You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2014/09/30 16:57:14 UTC

[1/2] [FLINK-1005] Extend TypeSerializer interface to handle non-mutable object deserialization more efficiently.

Repository: incubator-flink
Updated Branches:
  refs/heads/master ea4c8828c -> 76d4a75e8


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/76d4a75e/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/FloatValueSerializerTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/FloatValueSerializerTest.java b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/FloatValueSerializerTest.java
new file mode 100644
index 0000000..13aba1f
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/FloatValueSerializerTest.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.typeutils.base;
+
+import java.util.Random;
+
+import org.apache.flink.api.common.typeutils.SerializerTestBase;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.types.FloatValue;
+
+/**
+ * A test for the {@link FloatValueSerializer}.
+ */
+public class FloatValueSerializerTest extends SerializerTestBase<FloatValue> {
+	
+	@Override
+	protected TypeSerializer<FloatValue> createSerializer() {
+		return new FloatValueSerializer();
+	}
+	
+	@Override
+	protected int getLength() {
+		return 4;
+	}
+	
+	@Override
+	protected Class<FloatValue> getTypeClass() {
+		return FloatValue.class;
+	}
+	
+	@Override
+	protected FloatValue[] getTestData() {
+		Random rnd = new Random(874597969123412341L);
+		float rndFloat = rnd.nextFloat() * Float.MAX_VALUE;
+		
+		return new FloatValue[] {new FloatValue(0), new FloatValue(1), new FloatValue(-1),
+							new FloatValue(Float.MAX_VALUE), new FloatValue(Float.MIN_VALUE),
+							new FloatValue(rndFloat), new FloatValue(-rndFloat),
+							new FloatValue(Float.NaN),
+							new FloatValue(Float.NEGATIVE_INFINITY), new FloatValue(Float.POSITIVE_INFINITY)};
+	}
+}	

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/76d4a75e/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/IntValueSerializerTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/IntValueSerializerTest.java b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/IntValueSerializerTest.java
new file mode 100644
index 0000000..80d63b0
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/IntValueSerializerTest.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.typeutils.base;
+
+import java.util.Random;
+
+import org.apache.flink.api.common.typeutils.SerializerTestBase;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.types.IntValue;
+
+/**
+ * A test for the {@link IntValueSerializer}.
+ */
+public class IntValueSerializerTest extends SerializerTestBase<IntValue> {
+	
+	@Override
+	protected TypeSerializer<IntValue> createSerializer() {
+		return new IntValueSerializer();
+	}
+	
+	@Override
+	protected int getLength() {
+		return 4;
+	}
+	
+	@Override
+	protected Class<IntValue> getTypeClass() {
+		return IntValue.class;
+	}
+	
+	@Override
+	protected IntValue[] getTestData() {
+		Random rnd = new Random(874597969123412341L);
+		int rndInt = rnd.nextInt();
+		
+		return new IntValue[] {new IntValue(0), new IntValue(1), new IntValue(-1),
+							new IntValue(Integer.MAX_VALUE), new IntValue(Integer.MIN_VALUE),
+							new IntValue(rndInt), new IntValue(-rndInt)};
+	}
+}	

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/76d4a75e/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/LongValueSerializerTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/LongValueSerializerTest.java b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/LongValueSerializerTest.java
new file mode 100644
index 0000000..533b809
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/LongValueSerializerTest.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.typeutils.base;
+
+import java.util.Random;
+
+import org.apache.flink.api.common.typeutils.SerializerTestBase;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.types.LongValue;
+
+/**
+ * A test for the {@link LongValueSerializer}.
+ */
+public class LongValueSerializerTest extends SerializerTestBase<LongValue> {
+	
+	@Override
+	protected TypeSerializer<LongValue> createSerializer() {
+		return new LongValueSerializer();
+	}
+	
+	@Override
+	protected int getLength() {
+		return 8;
+	}
+	
+	@Override
+	protected Class<LongValue> getTypeClass() {
+		return LongValue.class;
+	}
+	
+	@Override
+	protected LongValue[] getTestData() {
+		Random rnd = new Random(874597969123412341L);
+		long rndLong = rnd.nextLong();
+		
+		return new LongValue[] {new LongValue(0L), new LongValue(1L), new LongValue(-1L),
+							new LongValue(Long.MAX_VALUE), new LongValue(Long.MIN_VALUE),
+							new LongValue(rndLong), new LongValue(-rndLong)};
+	}
+}	

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/76d4a75e/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/ShortValueSerializerTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/ShortValueSerializerTest.java b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/ShortValueSerializerTest.java
new file mode 100644
index 0000000..4ef6816
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/ShortValueSerializerTest.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.typeutils.base;
+
+import java.util.Random;
+
+import org.apache.flink.api.common.typeutils.SerializerTestBase;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.types.ShortValue;
+
+/**
+ * A test for the {@link ShortValueSerializer}.
+ */
+public class ShortValueSerializerTest extends SerializerTestBase<ShortValue> {
+	
+	@Override
+	protected TypeSerializer<ShortValue> createSerializer() {
+		return new ShortValueSerializer();
+	}
+	
+	@Override
+	protected int getLength() {
+		return 2;
+	}
+	
+	@Override
+	protected Class<ShortValue> getTypeClass() {
+		return ShortValue.class;
+	}
+	
+	@Override
+	protected ShortValue[] getTestData() {
+		Random rnd = new Random(874597969123412341L);
+		int rndInt = rnd.nextInt(32767);
+		
+		return new ShortValue[] {new ShortValue((short) 0), new ShortValue((short) 1), new ShortValue((short) -1),
+							new ShortValue((short) rndInt), new ShortValue((short) -rndInt),
+							new ShortValue((short) -32767), new ShortValue((short) 32768)};
+	}
+}
+	

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/76d4a75e/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/StringValueSerializerTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/StringValueSerializerTest.java b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/StringValueSerializerTest.java
new file mode 100644
index 0000000..c20cd92
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/StringValueSerializerTest.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.typeutils.base;
+
+import org.apache.flink.api.common.typeutils.SerializerTestBase;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.types.StringValue;
+
+/**
+ * A test for the {@link StringValueSerializer}.
+ */
+public class StringValueSerializerTest extends SerializerTestBase<StringValue> {
+	
+	@Override
+	protected TypeSerializer<StringValue> createSerializer() {
+		return new StringValueSerializer();
+	}
+	
+	@Override
+	protected int getLength() {
+		return -1;
+	}
+	
+	@Override
+	protected Class<StringValue> getTypeClass() {
+		return StringValue.class;
+	}
+	
+	@Override
+	protected StringValue[] getTestData() {
+		return new StringValue[] {
+				new StringValue("a"),
+				new StringValue(""),
+				new StringValue("bcd"),
+				new StringValue("jbmbmner8 jhk hj \n \t üäßß@µ"),
+				new StringValue(""),
+				new StringValue("non-empty")};
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/76d4a75e/flink-core/src/test/resources/logback-test.xml
----------------------------------------------------------------------
diff --git a/flink-core/src/test/resources/logback-test.xml b/flink-core/src/test/resources/logback-test.xml
index 8b3bb27..4f484cb 100644
--- a/flink-core/src/test/resources/logback-test.xml
+++ b/flink-core/src/test/resources/logback-test.xml
@@ -26,4 +26,8 @@
     <root level="WARN">
         <appender-ref ref="STDOUT"/>
     </root>
+    
+    <logger name="org.apache.flink.api.common.io.DelimitedInputFormat" level="OFF"/>
+    <logger name="org.apache.flink.api.common.io.FileInputFormat" level="OFF"/>
+    <logger name="org.apache.flink.configuration.GlobalConfiguration" level="OFF"/>
 </configuration>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/76d4a75e/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/AvroSerializer.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/AvroSerializer.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/AvroSerializer.java
index d93030d..31a04c9 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/AvroSerializer.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/AvroSerializer.java
@@ -36,7 +36,7 @@ import com.esotericsoftware.kryo.Kryo;
  *
  * @param <T> The type serialized.
  */
-public class AvroSerializer<T> extends TypeSerializer<T> {
+public final class AvroSerializer<T> extends TypeSerializer<T> {
 
 	private static final long serialVersionUID = 1L;
 	
@@ -89,10 +89,15 @@ public class AvroSerializer<T> extends TypeSerializer<T> {
 	}
 
 	@Override
+	public T copy(T from) {
+		checkKryoInitialized();
+		return this.kryo.copy(from);
+	}
+	
+	@Override
 	public T copy(T from, T reuse) {
 		checkKryoInitialized();
-		reuse = this.kryo.copy(from);
-		return reuse;
+		return this.kryo.copy(from);
 	}
 
 	@Override
@@ -106,6 +111,13 @@ public class AvroSerializer<T> extends TypeSerializer<T> {
 		this.encoder.setOut(target);
 		this.writer.write(value, this.encoder);
 	}
+	
+	@Override
+	public T deserialize(DataInputView source) throws IOException {
+		checkAvroInitialized();
+		this.decoder.setIn(source);
+		return this.reader.read(null, this.decoder);
+	}
 
 	@Override
 	public T deserialize(T reuse, DataInputView source) throws IOException {
@@ -146,4 +158,21 @@ public class AvroSerializer<T> extends TypeSerializer<T> {
 			this.kryo.register(type);
 		}
 	}
+	
+	// --------------------------------------------------------------------------------------------
+	
+	@Override
+	public int hashCode() {
+		return 0x42fba55c + this.type.hashCode() + this.typeToInstantiate.hashCode();
+	}
+	
+	@Override
+	public boolean equals(Object obj) {
+		if (obj.getClass() == AvroSerializer.class) {
+			AvroSerializer<?> other = (AvroSerializer<?>) obj;
+			return this.type == other.type && this.typeToInstantiate == other.typeToInstantiate;
+		} else {
+			return false;
+		}
+	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/76d4a75e/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/CopyableValueSerializer.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/CopyableValueSerializer.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/CopyableValueSerializer.java
index fa3a91e..9d12d7e 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/CopyableValueSerializer.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/CopyableValueSerializer.java
@@ -55,7 +55,12 @@ public class CopyableValueSerializer<T extends CopyableValue<T>> extends TypeSer
 	public T createInstance() {
 		return InstantiationUtil.instantiate(this.valueClass);
 	}
-
+	
+	@Override
+	public T copy(T from) {
+		return copy(from, createInstance());
+	}
+	
 	@Override
 	public T copy(T from, T reuse) {
 		from.copyTo(reuse);
@@ -74,6 +79,11 @@ public class CopyableValueSerializer<T extends CopyableValue<T>> extends TypeSer
 	}
 
 	@Override
+	public T deserialize(DataInputView source) throws IOException {
+		return deserialize(createInstance(), source);
+	}
+	
+	@Override
 	public T deserialize(T reuse, DataInputView source) throws IOException {
 		reuse.read(source);
 		return reuse;
@@ -92,4 +102,19 @@ public class CopyableValueSerializer<T extends CopyableValue<T>> extends TypeSer
 			instance = createInstance();
 		}
 	}
+	
+	@Override
+	public int hashCode() {
+		return this.valueClass.hashCode() + 9231;
+	}
+	
+	@Override
+	public boolean equals(Object obj) {
+		if (obj.getClass() == CopyableValueSerializer.class) {
+			CopyableValueSerializer<?> other = (CopyableValueSerializer<?>) obj;
+			return this.valueClass == other.valueClass;
+		} else {
+			return false;
+		}
+	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/76d4a75e/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/KryoSerializer.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/KryoSerializer.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/KryoSerializer.java
index 9e2a1f3..a3acb20 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/KryoSerializer.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/KryoSerializer.java
@@ -21,6 +21,7 @@ package org.apache.flink.api.java.typeutils.runtime;
 import com.esotericsoftware.kryo.Kryo;
 import com.esotericsoftware.kryo.io.Input;
 import com.esotericsoftware.kryo.io.Output;
+
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
@@ -75,10 +76,15 @@ public class KryoSerializer<T> extends TypeSerializer<T> {
 	}
 
 	@Override
+	public T copy(T from) {
+		checkKryoInitialized();
+		return kryo.copy(from);
+	}
+	
+	@Override
 	public T copy(T from, T reuse) {
 		checkKryoInitialized();
-		reuse = kryo.copy(from);
-		return reuse;
+		return kryo.copy(from);
 	}
 
 	@Override
@@ -100,15 +106,19 @@ public class KryoSerializer<T> extends TypeSerializer<T> {
 	}
 
 	@Override
-	public T deserialize(T reuse, DataInputView source) throws IOException {
+	public T deserialize(DataInputView source) throws IOException {
 		checkKryoInitialized();
 		if (source != previousIn) {
 			DataInputViewStream inputStream = new DataInputViewStream(source);
 			input = new NoFetchingInput(inputStream);
 			previousIn = source;
 		}
-		reuse = kryo.readObject(input, typeToInstantiate);
-		return reuse;
+		return kryo.readObject(input, typeToInstantiate);
+	}
+	
+	@Override
+	public T deserialize(T reuse, DataInputView source) throws IOException {
+		return deserialize(source);
 	}
 
 	@Override
@@ -121,6 +131,25 @@ public class KryoSerializer<T> extends TypeSerializer<T> {
 		T tmp = deserialize(copyInstance, source);
 		serialize(tmp, target);
 	}
+	
+	// --------------------------------------------------------------------------------------------
+	
+	@Override
+	public int hashCode() {
+		return type.hashCode() + 31 * typeToInstantiate.hashCode();
+	}
+	
+	@Override
+	public boolean equals(Object obj) {
+		if (obj != null && obj instanceof KryoSerializer) {
+			KryoSerializer<?> other = (KryoSerializer<?>) obj;
+			return other.type == this.type && other.typeToInstantiate == this.typeToInstantiate;
+		} else {
+			return false;
+		}
+	}
+	
+	// --------------------------------------------------------------------------------------------
 
 	private final void checkKryoInitialized() {
 		if (this.kryo == null) {

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/76d4a75e/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java
index ed48c42..71f2cd8 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java
@@ -131,6 +131,28 @@ public final class PojoSerializer<T> extends TypeSerializer<T> {
 	}
 
 	@Override
+	public T copy(T from) {
+		T target;
+		try {
+			target = clazz.newInstance();
+		}
+		catch (Throwable t) {
+			throw new RuntimeException("Cannot instantiate class.", t);
+		}
+		
+		try {
+			for (int i = 0; i < numFields; i++) {
+				Object copy = fieldSerializers[i].copy(fields[i].get(from));
+				fields[i].set(target, copy);
+			}
+		}
+		catch (IllegalAccessException e) {
+			throw new RuntimeException("Error during POJO copy, this should not happen since we check the fields before.");
+		}
+		return target;
+	}
+	
+	@Override
 	public T copy(T from, T reuse) {
 		try {
 			for (int i = 0; i < numFields; i++) {
@@ -165,6 +187,28 @@ public final class PojoSerializer<T> extends TypeSerializer<T> {
 	}
 
 	@Override
+	public T deserialize(DataInputView source) throws IOException {
+		T target;
+		try {
+			target = clazz.newInstance();
+		}
+		catch (Throwable t) {
+			throw new RuntimeException("Cannot instantiate class.", t);
+		}
+		
+		try {
+			for (int i = 0; i < numFields; i++) {
+				Object field = fieldSerializers[i].deserialize(source);
+				fields[i].set(target, field);
+			}
+		} catch (IllegalAccessException e) {
+			throw new RuntimeException("Error during POJO copy, this should not happen since we check the fields" +
+					"before.");
+		}
+		return target;
+	}
+	
+	@Override
 	public T deserialize(T reuse, DataInputView source) throws IOException {
 		try {
 			for (int i = 0; i < numFields; i++) {

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/76d4a75e/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializer.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializer.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializer.java
index 5d5c08f..12bec12 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializer.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializer.java
@@ -31,7 +31,6 @@ public final class TupleSerializer<T extends Tuple> extends TupleSerializerBase<
 
 	private static final long serialVersionUID = 1L;
 	
-	@SuppressWarnings("unchecked")
 	public TupleSerializer(Class<T> tupleClass, TypeSerializer<?>[] fieldSerializers) {
 		super(tupleClass, fieldSerializers);
 	}
@@ -69,6 +68,11 @@ public final class TupleSerializer<T extends Tuple> extends TupleSerializerBase<
 	}
 
 	@Override
+	public T copy(T from) {
+		return copy(from, instantiateRaw());
+	}
+	
+	@Override
 	public T copy(T from, T reuse) {
 		for (int i = 0; i < arity; i++) {
 			Object copy = fieldSerializers[i].copy(from.getField(i), reuse.getField(i));
@@ -91,6 +95,16 @@ public final class TupleSerializer<T extends Tuple> extends TupleSerializerBase<
 	}
 
 	@Override
+	public T deserialize(DataInputView source) throws IOException {
+		T tuple = instantiateRaw();
+		for (int i = 0; i < arity; i++) {
+			Object field = fieldSerializers[i].deserialize(source);
+			tuple.setField(field, i);
+		}
+		return tuple;
+	}
+	
+	@Override
 	public T deserialize(T reuse, DataInputView source) throws IOException {
 		for (int i = 0; i < arity; i++) {
 			Object field = fieldSerializers[i].deserialize(reuse.getField(i), source);
@@ -98,4 +112,13 @@ public final class TupleSerializer<T extends Tuple> extends TupleSerializerBase<
 		}
 		return reuse;
 	}
+	
+	private T instantiateRaw() {
+		try {
+			return tupleClass.newInstance();
+		}
+		catch (Exception e) {
+			throw new RuntimeException("Cannot instantiate tuple.", e);
+		}
+	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/76d4a75e/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerBase.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerBase.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerBase.java
index db71b56..08df7d3 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerBase.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerBase.java
@@ -28,6 +28,8 @@ import java.util.Arrays;
 
 public abstract class TupleSerializerBase<T> extends TypeSerializer<T> {
 
+	private static final long serialVersionUID = 1L;
+
 	protected final Class<T> tupleClass;
 
 	protected final TypeSerializer<Object>[] fieldSerializers;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/76d4a75e/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/ValueSerializer.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/ValueSerializer.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/ValueSerializer.java
index 8bea523..69a5ff6 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/ValueSerializer.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/ValueSerializer.java
@@ -72,10 +72,15 @@ public class ValueSerializer<T extends Value> extends TypeSerializer<T> {
 	}
 
 	@Override
+	public T copy(T from) {
+		checkKryoInitialized();
+		return this.kryo.copy(from);
+	}
+	
+	@Override
 	public T copy(T from, T reuse) {
 		checkKryoInitialized();
-		reuse = this.kryo.copy(from);
-		return reuse;
+		return this.kryo.copy(from);
 	}
 
 	@Override
@@ -89,6 +94,11 @@ public class ValueSerializer<T extends Value> extends TypeSerializer<T> {
 	}
 
 	@Override
+	public T deserialize(DataInputView source) throws IOException {
+		return deserialize(createInstance(), source);
+	}
+	
+	@Override
 	public T deserialize(T reuse, DataInputView source) throws IOException {
 		reuse.read(source);
 		return reuse;
@@ -111,4 +121,21 @@ public class ValueSerializer<T extends Value> extends TypeSerializer<T> {
 			this.kryo.register(type);
 		}
 	}
+	
+	// --------------------------------------------------------------------------------------------
+	
+	@Override
+	public int hashCode() {
+		return this.type.hashCode() + 17;
+	}
+	
+	@Override
+	public boolean equals(Object obj) {
+		if (obj.getClass() == ValueSerializer.class) {
+			ValueSerializer<?> other = (ValueSerializer<?>) obj;
+			return this.type == other.type;
+		} else {
+			return false;
+		}
+	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/76d4a75e/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/WritableSerializer.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/WritableSerializer.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/WritableSerializer.java
index 6fcf730..d5a0470 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/WritableSerializer.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/WritableSerializer.java
@@ -48,10 +48,15 @@ public class WritableSerializer<T extends Writable> extends TypeSerializer<T> {
 	}
 	
 	@Override
+	public T copy(T from) {
+		checkKryoInitialized();
+		return this.kryo.copy(from);
+	}
+	
+	@Override
 	public T copy(T from, T reuse) {
 		checkKryoInitialized();
-		reuse = this.kryo.copy(from);
-		return reuse;
+		return this.kryo.copy(from);
 	}
 	
 	@Override
@@ -65,6 +70,11 @@ public class WritableSerializer<T extends Writable> extends TypeSerializer<T> {
 	}
 	
 	@Override
+	public T deserialize(DataInputView source) throws IOException {
+		return deserialize(createInstance(), source);
+	}
+	
+	@Override
 	public T deserialize(T reuse, DataInputView source) throws IOException {
 		reuse.readFields(source);
 		return reuse;
@@ -102,4 +112,20 @@ public class WritableSerializer<T extends Writable> extends TypeSerializer<T> {
 			this.kryo.register(typeClass);
 		}
 	}
+	// --------------------------------------------------------------------------------------------
+	
+	@Override
+	public int hashCode() {
+		return this.typeClass.hashCode() + 177;
+	}
+	
+	@Override
+	public boolean equals(Object obj) {
+		if (obj.getClass() == WritableSerializer.class) {
+			WritableSerializer<?> other = (WritableSerializer<?>) obj;
+			return this.typeClass == other.typeClass;
+		} else {
+			return false;
+		}
+	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/76d4a75e/flink-runtime/src/test/java/org/apache/flink/runtime/operators/resettable/SpillingResettableIteratorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/resettable/SpillingResettableIteratorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/resettable/SpillingResettableIteratorTest.java
index f3b63d3..2bc11f1 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/resettable/SpillingResettableIteratorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/resettable/SpillingResettableIteratorTest.java
@@ -16,24 +16,21 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.runtime.operators.resettable;
 
 import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.NoSuchElementException;
 
-import org.junit.Assert;
-
 import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.IntValueSerializer;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.memorymanager.DefaultMemoryManager;
 import org.apache.flink.runtime.memorymanager.MemoryManager;
-import org.apache.flink.runtime.operators.resettable.SpillingResettableIterator;
 import org.apache.flink.runtime.operators.testutils.DummyInvokable;
-import org.apache.flink.runtime.operators.testutils.types.IntValueSerializer;
 import org.apache.flink.types.IntValue;
+import org.junit.Assert;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/76d4a75e/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/types/IntListSerializer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/types/IntListSerializer.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/types/IntListSerializer.java
index 05b55da..2134bcd 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/types/IntListSerializer.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/types/IntListSerializer.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.operators.testutils.types;
 
 import java.io.IOException;
+import java.util.Arrays;
 
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.core.memory.DataInputView;
@@ -44,9 +45,14 @@ public class IntListSerializer extends TypeSerializer<IntList> {
 	}
 	
 	@Override
+	public IntList copy(IntList from) {
+		return new IntList(from.getKey(), Arrays.copyOf(from.getValue(), from.getValue().length));
+	}
+	
+	@Override
 	public IntList copy(IntList from, IntList reuse) {
 		reuse.setKey(from.getKey());
-		reuse.setValue(from.getValue());
+		reuse.setValue(Arrays.copyOf(from.getValue(), from.getValue().length));
 		return reuse;
 	}
 	
@@ -74,6 +80,11 @@ public class IntListSerializer extends TypeSerializer<IntList> {
 	}
 
 	@Override
+	public IntList deserialize(DataInputView source) throws IOException {
+		return deserialize(new IntList(), source);
+	}
+	
+	@Override
 	public IntList deserialize(IntList record, DataInputView source) throws IOException {
 		int key = source.readInt();
 		record.setKey(key);

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/76d4a75e/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/types/IntPairSerializer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/types/IntPairSerializer.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/types/IntPairSerializer.java
index 9533627..361585d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/types/IntPairSerializer.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/types/IntPairSerializer.java
@@ -46,6 +46,11 @@ public class IntPairSerializer extends TypeSerializer<IntPair> {
 	public IntPair createInstance() {
 		return new IntPair();
 	}
+	
+	@Override
+	public IntPair copy(IntPair from) {
+		return new IntPair(from.getKey(), from.getValue());
+	}
 
 	@Override
 	public IntPair copy(IntPair from, IntPair reuse) {
@@ -67,6 +72,11 @@ public class IntPairSerializer extends TypeSerializer<IntPair> {
 	}
 
 	@Override
+	public IntPair deserialize(DataInputView source) throws IOException {
+		return new IntPair(source.readInt(), source.readInt());
+	}
+	
+	@Override
 	public IntPair deserialize(IntPair reuse, DataInputView source) throws IOException {
 		reuse.setKey(source.readInt());
 		reuse.setValue(source.readInt());

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/76d4a75e/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/types/IntValueSerializer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/types/IntValueSerializer.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/types/IntValueSerializer.java
deleted file mode 100644
index ab24e92..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/types/IntValueSerializer.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.runtime.operators.testutils.types;
-
-import java.io.IOException;
-
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.types.IntValue;
-
-
-public class IntValueSerializer extends TypeSerializer<IntValue> {
-
-	private static final long serialVersionUID = 1L;
-
-	@Override
-	public boolean isImmutableType() {
-		return false;
-	}
-
-	@Override
-	public boolean isStateful() {
-		return false;
-	}
-	@Override
-	public IntValue createInstance() {
-		return new IntValue();
-	}
-
-	@Override
-	public IntValue copy(IntValue from, IntValue reuse) {
-		reuse.setValue(from.getValue());
-		return reuse;
-	}
-
-
-	@Override
-	public int getLength() {
-		return 4;
-	}
-
-	@Override
-	public void serialize(IntValue record, DataOutputView target) throws IOException {
-		target.writeInt(record.getValue());
-	}
-
-	@Override
-	public IntValue deserialize(IntValue reuse, DataInputView source) throws IOException {
-		reuse.setValue(source.readInt());
-		return reuse;
-	}
-
-	@Override
-	public void copy(DataInputView source, DataOutputView target) throws IOException {
-		target.write(source, 4);
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/76d4a75e/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/types/StringPairSerializer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/types/StringPairSerializer.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/types/StringPairSerializer.java
index 8ba46c2..a38633c 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/types/StringPairSerializer.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/types/StringPairSerializer.java
@@ -45,6 +45,10 @@ public class StringPairSerializer extends TypeSerializer<StringPair> {
 	}
 	
 	@Override
+	public StringPair copy(StringPair from) {
+		return new StringPair(from.getKey(), from.getValue());
+	}
+	@Override
 	public StringPair copy(StringPair from, StringPair reuse) {
 		reuse.setKey(from.getKey());
 		reuse.setValue(from.getValue());
@@ -63,6 +67,11 @@ public class StringPairSerializer extends TypeSerializer<StringPair> {
 	}
 
 	@Override
+	public StringPair deserialize(DataInputView source) throws IOException {
+		return new StringPair(StringValue.readString(source), StringValue.readString(source));
+	}
+	
+	@Override
 	public StringPair deserialize(StringPair record, DataInputView source) throws IOException {
 		record.setKey(StringValue.readString(source));
 		record.setValue(StringValue.readString(source));

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/76d4a75e/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/CaseClassSerializer.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/CaseClassSerializer.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/CaseClassSerializer.scala
index b452b12..f9cd10c 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/CaseClassSerializer.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/CaseClassSerializer.scala
@@ -31,8 +31,11 @@ abstract class CaseClassSerializer[T <: Product](
     scalaFieldSerializers: Array[TypeSerializer[_]])
   extends TupleSerializerBase[T](clazz, scalaFieldSerializers) {
 
+  @transient var fields : Array[AnyRef] = _
+  
+  
   def createInstance: T = {
-    val fields: Array[AnyRef] = new Array(arity)
+    initArray()
     for (i <- 0 until arity) {
       fields(i) = fieldSerializers(i).createInstance()
     }
@@ -40,7 +43,11 @@ abstract class CaseClassSerializer[T <: Product](
   }
 
   def copy(from: T, reuse: T): T = {
-    val fields: Array[AnyRef] = new Array(arity)
+    copy(from)
+  }
+  
+  def copy(from: T): T = {
+    initArray()
     for (i <- 0 until arity) {
       fields(i) = from.productElement(i).asInstanceOf[AnyRef]
     }
@@ -55,11 +62,25 @@ abstract class CaseClassSerializer[T <: Product](
   }
 
   def deserialize(reuse: T, source: DataInputView): T = {
-    val fields: Array[AnyRef] = new Array(arity)
+    initArray()
     for (i <- 0 until arity) {
       val field = reuse.productElement(i).asInstanceOf[AnyRef]
       fields(i) = fieldSerializers(i).deserialize(field, source)
     }
     createInstance(fields)
   }
+  
+  def deserialize(source: DataInputView): T = {
+    initArray()
+    for (i <- 0 until arity) {
+      fields(i) = fieldSerializers(i).deserialize(source)
+    }
+    createInstance(fields)
+  }
+  
+  def initArray() = {
+    if (fields == null) {
+      fields = new Array[AnyRef](arity)
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/76d4a75e/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/types/VertexWithAdjacencyListSerializer.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/types/VertexWithAdjacencyListSerializer.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/types/VertexWithAdjacencyListSerializer.java
index fdfd873..822b4f2 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/types/VertexWithAdjacencyListSerializer.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/types/VertexWithAdjacencyListSerializer.java
@@ -20,11 +20,11 @@ package org.apache.flink.test.iterative.nephele.customdanglingpagerank.types;
 
 import java.io.IOException;
 
-import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.TypeSerializerSingleton;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
 
-public final class VertexWithAdjacencyListSerializer extends TypeSerializer<VertexWithAdjacencyList> {
+public final class VertexWithAdjacencyListSerializer extends TypeSerializerSingleton<VertexWithAdjacencyList> {
 
 	private static final long serialVersionUID = 1L;
 	
@@ -45,6 +45,14 @@ public final class VertexWithAdjacencyListSerializer extends TypeSerializer<Vert
 	}
 
 	@Override
+	public VertexWithAdjacencyList copy(VertexWithAdjacencyList from) {
+		VertexWithAdjacencyList copy = new VertexWithAdjacencyList(from.getVertexID(), new long[from.getNumTargets()]);
+		copy.setNumTargets(from.getNumTargets());
+		System.arraycopy(from.getTargets(), 0, copy.getTargets(), 0, from.getNumTargets());
+		return copy;
+	}
+	
+	@Override
 	public VertexWithAdjacencyList copy(VertexWithAdjacencyList from, VertexWithAdjacencyList reuse) {
 		if (reuse.getTargets().length < from.getTargets().length) {
 			reuse.setTargets(new long[from.getTargets().length]);
@@ -75,6 +83,11 @@ public final class VertexWithAdjacencyListSerializer extends TypeSerializer<Vert
 	}
 
 	@Override
+	public VertexWithAdjacencyList deserialize(DataInputView source) throws IOException {
+		return deserialize(new VertexWithAdjacencyList(), source);
+	}
+	
+	@Override
 	public VertexWithAdjacencyList deserialize(VertexWithAdjacencyList target, DataInputView source) throws IOException {
 		target.setVertexID(source.readLong());
 		
@@ -101,16 +114,4 @@ public final class VertexWithAdjacencyListSerializer extends TypeSerializer<Vert
 		target.writeInt(numTargets);
 		target.write(source, numTargets * 8);
 	}
-	
-	// --------------------------------------------------------------------------------------------
-	
-	@Override
-	public int hashCode() {
-		return 3;
-	}
-	
-	@Override
-	public boolean equals(Object obj) {
-		return obj != null && obj.getClass() == VertexWithAdjacencyListSerializer.class;
-	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/76d4a75e/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/types/VertexWithRankAndDanglingSerializer.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/types/VertexWithRankAndDanglingSerializer.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/types/VertexWithRankAndDanglingSerializer.java
index 5189203..e972cd1 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/types/VertexWithRankAndDanglingSerializer.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/types/VertexWithRankAndDanglingSerializer.java
@@ -20,11 +20,11 @@ package org.apache.flink.test.iterative.nephele.customdanglingpagerank.types;
 
 import java.io.IOException;
 
-import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.TypeSerializerSingleton;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
 
-public final class VertexWithRankAndDanglingSerializer extends TypeSerializer<VertexWithRankAndDangling> {
+public final class VertexWithRankAndDanglingSerializer extends TypeSerializerSingleton<VertexWithRankAndDangling> {
 
 	private static final long serialVersionUID = 1L;
 	
@@ -45,6 +45,11 @@ public final class VertexWithRankAndDanglingSerializer extends TypeSerializer<Ve
 	}
 
 	@Override
+	public VertexWithRankAndDangling copy(VertexWithRankAndDangling from) {
+		return new VertexWithRankAndDangling(from.getVertexID(), from.getRank(), from.isDangling());
+	}
+	
+	@Override
 	public VertexWithRankAndDangling copy(VertexWithRankAndDangling from, VertexWithRankAndDangling reuse) {
 		reuse.setVertexID(from.getVertexID());
 		reuse.setRank(from.getRank());
@@ -65,6 +70,11 @@ public final class VertexWithRankAndDanglingSerializer extends TypeSerializer<Ve
 	}
 
 	@Override
+	public VertexWithRankAndDangling deserialize(DataInputView source) throws IOException {
+		return new VertexWithRankAndDangling(source.readLong(), source.readDouble(), source.readBoolean());
+	}
+	
+	@Override
 	public VertexWithRankAndDangling deserialize(VertexWithRankAndDangling target, DataInputView source) throws IOException {
 		target.setVertexID(source.readLong());
 		target.setRank(source.readDouble());
@@ -76,16 +86,4 @@ public final class VertexWithRankAndDanglingSerializer extends TypeSerializer<Ve
 	public void copy(DataInputView source, DataOutputView target) throws IOException {
 		target.write(source, 17);
 	}
-	
-	// --------------------------------------------------------------------------------------------
-	
-	@Override
-	public int hashCode() {
-		return 2;
-	}
-	
-	@Override
-	public boolean equals(Object obj) {
-		return obj != null && obj.getClass() == VertexWithRankAndDanglingSerializer.class;
-	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/76d4a75e/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/types/VertexWithRankSerializer.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/types/VertexWithRankSerializer.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/types/VertexWithRankSerializer.java
index 1065633..928d4f4 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/types/VertexWithRankSerializer.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/types/VertexWithRankSerializer.java
@@ -20,11 +20,11 @@ package org.apache.flink.test.iterative.nephele.customdanglingpagerank.types;
 
 import java.io.IOException;
 
-import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.TypeSerializerSingleton;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
 
-public final class VertexWithRankSerializer extends TypeSerializer<VertexWithRank> {
+public final class VertexWithRankSerializer extends TypeSerializerSingleton<VertexWithRank> {
 
 	private static final long serialVersionUID = 1L;
 	
@@ -43,6 +43,11 @@ public final class VertexWithRankSerializer extends TypeSerializer<VertexWithRan
 	public VertexWithRank createInstance() {
 		return new VertexWithRank();
 	}
+	
+	@Override
+	public VertexWithRank copy(VertexWithRank from) {
+		return new VertexWithRank(from.getVertexID(), from.getRank());
+	}
 
 	@Override
 	public VertexWithRank copy(VertexWithRank from, VertexWithRank reuse) {
@@ -63,6 +68,11 @@ public final class VertexWithRankSerializer extends TypeSerializer<VertexWithRan
 	}
 
 	@Override
+	public VertexWithRank deserialize(DataInputView source) throws IOException {
+		return new VertexWithRank(source.readLong(), source.readDouble());
+	}
+	
+	@Override
 	public VertexWithRank deserialize(VertexWithRank target, DataInputView source) throws IOException {
 		target.setVertexID(source.readLong());
 		target.setRank(source.readDouble());
@@ -73,16 +83,4 @@ public final class VertexWithRankSerializer extends TypeSerializer<VertexWithRan
 	public void copy(DataInputView source, DataOutputView target) throws IOException {
 		target.write(source, 16);
 	}
-	
-	// --------------------------------------------------------------------------------------------
-	
-	@Override
-	public int hashCode() {
-		return 1;
-	}
-	
-	@Override
-	public boolean equals(Object obj) {
-		return obj != null && obj.getClass() == VertexWithRankSerializer.class;
-	}
 }


[2/2] git commit: [FLINK-1005] Extend TypeSerializer interface to handle non-mutable object deserialization more efficiently.

Posted by se...@apache.org.
[FLINK-1005] Extend TypeSerializer interface to handle non-mutable object deserialization more efficiently.


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/76d4a75e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/76d4a75e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/76d4a75e

Branch: refs/heads/master
Commit: 76d4a75e823c31a899f2143fb6be185b90e55532
Parents: ea4c882
Author: Stephan Ewen <se...@apache.org>
Authored: Mon Jul 7 19:39:24 2014 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Tue Sep 30 15:10:38 2014 +0200

----------------------------------------------------------------------
 .../streamrecord/StreamRecordSerializer.java    | 22 +++++-
 .../api/common/typeutils/TypeSerializer.java    | 42 ++++++-----
 .../typeutils/base/BooleanSerializer.java       | 14 +++-
 .../typeutils/base/BooleanValueSerializer.java  | 15 +++-
 .../common/typeutils/base/ByteSerializer.java   | 15 +++-
 .../typeutils/base/ByteValueSerializer.java     | 13 +++-
 .../common/typeutils/base/CharSerializer.java   | 15 +++-
 .../typeutils/base/CharValueSerializer.java     | 14 +++-
 .../common/typeutils/base/DoubleSerializer.java | 16 +++--
 .../typeutils/base/DoubleValueSerializer.java   | 13 +++-
 .../common/typeutils/base/FloatSerializer.java  | 16 +++--
 .../typeutils/base/FloatValueSerializer.java    | 15 +++-
 .../typeutils/base/GenericArraySerializer.java  | 38 ++++++++--
 .../common/typeutils/base/IntSerializer.java    | 15 +++-
 .../typeutils/base/IntValueSerializer.java      | 13 +++-
 .../common/typeutils/base/LongSerializer.java   | 15 +++-
 .../typeutils/base/LongValueSerializer.java     | 13 +++-
 .../common/typeutils/base/ShortSerializer.java  | 15 +++-
 .../typeutils/base/ShortValueSerializer.java    | 13 +++-
 .../common/typeutils/base/StringSerializer.java | 15 +++-
 .../typeutils/base/StringValueSerializer.java   | 40 ++++++++++-
 .../typeutils/base/TypeSerializerSingleton.java | 38 ++++++++++
 .../array/BooleanPrimitiveArraySerializer.java  | 26 ++++---
 .../array/BytePrimitiveArraySerializer.java     | 27 ++++---
 .../array/CharPrimitiveArraySerializer.java     | 27 ++++---
 .../array/DoublePrimitiveArraySerializer.java   | 29 +++++---
 .../array/FloatPrimitiveArraySerializer.java    | 27 ++++---
 .../base/array/IntPrimitiveArraySerializer.java | 27 ++++---
 .../array/LongPrimitiveArraySerializer.java     | 29 +++++---
 .../array/ShortPrimitiveArraySerializer.java    | 27 ++++---
 .../base/array/StringArraySerializer.java       | 30 +++++---
 .../typeutils/record/RecordSerializer.java      | 11 ++-
 .../common/typeutils/SerializerTestBase.java    | 49 ++++++++++++-
 .../typeutils/SerializerTestInstance.java       |  4 +-
 .../base/BooleanValueSerializerTest.java        | 56 +++++++++++++++
 .../typeutils/base/ByteValueSerializerTest.java | 57 +++++++++++++++
 .../typeutils/base/CharValueSerializerTest.java | 56 +++++++++++++++
 .../base/DoubleValueSerializerTest.java         | 58 +++++++++++++++
 .../base/FloatValueSerializerTest.java          | 58 +++++++++++++++
 .../typeutils/base/IntValueSerializerTest.java  | 56 +++++++++++++++
 .../typeutils/base/LongValueSerializerTest.java | 56 +++++++++++++++
 .../base/ShortValueSerializerTest.java          | 57 +++++++++++++++
 .../base/StringValueSerializerTest.java         | 55 ++++++++++++++
 flink-core/src/test/resources/logback-test.xml  |  4 ++
 .../java/typeutils/runtime/AvroSerializer.java  | 35 ++++++++-
 .../runtime/CopyableValueSerializer.java        | 27 ++++++-
 .../java/typeutils/runtime/KryoSerializer.java  | 39 ++++++++--
 .../java/typeutils/runtime/PojoSerializer.java  | 44 ++++++++++++
 .../java/typeutils/runtime/TupleSerializer.java | 25 ++++++-
 .../typeutils/runtime/TupleSerializerBase.java  |  2 +
 .../java/typeutils/runtime/ValueSerializer.java | 31 +++++++-
 .../typeutils/runtime/WritableSerializer.java   | 30 +++++++-
 .../SpillingResettableIteratorTest.java         |  7 +-
 .../testutils/types/IntListSerializer.java      | 13 +++-
 .../testutils/types/IntPairSerializer.java      | 10 +++
 .../testutils/types/IntValueSerializer.java     | 75 --------------------
 .../testutils/types/StringPairSerializer.java   |  9 +++
 .../scala/typeutils/CaseClassSerializer.scala   | 27 ++++++-
 .../VertexWithAdjacencyListSerializer.java      | 29 ++++----
 .../VertexWithRankAndDanglingSerializer.java    | 26 ++++---
 .../types/VertexWithRankSerializer.java         | 26 ++++---
 61 files changed, 1405 insertions(+), 301 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/76d4a75e/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamrecord/StreamRecordSerializer.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamrecord/StreamRecordSerializer.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamrecord/StreamRecordSerializer.java
index 932bae0..85faa9e 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamrecord/StreamRecordSerializer.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamrecord/StreamRecordSerializer.java
@@ -62,6 +62,15 @@ public final class StreamRecordSerializer<T> extends TypeSerializer<StreamRecord
 			throw new RuntimeException("Cannot instantiate StreamRecord.", e);
 		}
 	}
+	
+	@Override
+	public StreamRecord<T> copy(StreamRecord<T> from) {
+		StreamRecord<T> rec = new StreamRecord<T>();
+		rec.isTuple = from.isTuple;
+		rec.setId(from.getId().copy());
+		rec.setObject(typeSerializer.copy(from.getObject()));
+		return rec;
+	}
 
 	@Override
 	public StreamRecord<T> copy(StreamRecord<T> from, StreamRecord<T> reuse) {
@@ -81,10 +90,18 @@ public final class StreamRecordSerializer<T> extends TypeSerializer<StreamRecord
 		value.getId().write(target);
 		typeSerializer.serialize(value.getObject(), target);
 	}
+	
+	@Override
+	public StreamRecord<T> deserialize(DataInputView source) throws IOException {
+		StreamRecord<T> record = new StreamRecord<T>();
+		record.isTuple = this.isTuple;
+		record.getId().read(source);
+		record.setObject(typeSerializer.deserialize(source));
+		return record;
+	}
 
 	@Override
-	public StreamRecord<T> deserialize(StreamRecord<T> reuse, DataInputView source)
-			throws IOException {
+	public StreamRecord<T> deserialize(StreamRecord<T> reuse, DataInputView source) throws IOException {
 		reuse.getId().read(source);
 		reuse.setObject(typeSerializer.deserialize(reuse.getObject(), source));
 		return reuse;
@@ -94,5 +111,4 @@ public final class StreamRecordSerializer<T> extends TypeSerializer<StreamRecord
 	public void copy(DataInputView source, DataOutputView target) throws IOException {
 		// Needs to be implemented
 	}
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/76d4a75e/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializer.java
index 9be4a89..87d7e20 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializer.java
@@ -73,10 +73,21 @@ public abstract class TypeSerializer<T> implements Serializable {
 	public abstract T createInstance();
 
 	/**
-	 * Creates a copy from the given element, storing the copied result in the given reuse element if type is mutable.
+	 * Creates a deep copy of the given element in a new element.
 	 * 
 	 * @param from The element reuse be copied.
-	 * @param reuse The element to be reused.
+	 * @return A deep copy of the element.
+	 */
+	public abstract T copy(T from);
+	
+	/**
+	 * Creates a copy from the given element.
+	 * The method makes an attempt to store the copy in the given reuse element, if the type is mutable.
+	 * This is, however, not guaranteed.
+	 * 
+	 * @param from The element to be copied.
+	 * @param reuse The element to be reused. May or may not be used.
+	 * @return A deep copy of the element.
 	 */
 	public abstract T copy(T from, T reuse);
 	
@@ -103,10 +114,22 @@ public abstract class TypeSerializer<T> implements Serializable {
 	public abstract void serialize(T record, DataOutputView target) throws IOException;
 
 	/**
+	 * De-serializes a record from the given source input view.
+	 * 
+	 * @param source The input view from which to read the data.
+	 * @result The deserialized element.
+	 * 
+	 * @throws IOException Thrown, if the de-serialization encountered an I/O related error. Typically raised by the
+	 *                     input view, which may have an underlying I/O channel from which it reads.
+	 */
+	public abstract T deserialize(DataInputView source) throws IOException;
+	
+	/**
 	 * De-serializes a record from the given source input view into the given reuse record instance if mutable.
 	 * 
 	 * @param reuse The record instance into which to de-serialize the data.
 	 * @param source The input view from which to read the data.
+	 * @result The deserialized element.
 	 * 
 	 * @throws IOException Thrown, if the de-serialization encountered an I/O related error. Typically raised by the
 	 *                     input view, which may have an underlying I/O channel from which it reads.
@@ -126,19 +149,4 @@ public abstract class TypeSerializer<T> implements Serializable {
 	 * @throws IOException Thrown if any of the two views raises an exception.
 	 */
 	public abstract void copy(DataInputView source, DataOutputView target) throws IOException;
-	
-	// --------------------------------------------------------------------------------------------
-	//  Default Utilities: Hash code and equals are pre-defined for singleton serializers, where
-	//                     all instances are equal
-	// --------------------------------------------------------------------------------------------
-	
-	@Override
-	public int hashCode() {
-		return getClass().hashCode();
-	}
-	
-	@Override
-	public boolean equals(Object obj) {
-		return obj != null && obj.getClass() == this.getClass();
-	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/76d4a75e/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/BooleanSerializer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/BooleanSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/BooleanSerializer.java
index 02a72c5..ecfb3c2 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/BooleanSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/BooleanSerializer.java
@@ -20,12 +20,10 @@ package org.apache.flink.api.common.typeutils.base;
 
 import java.io.IOException;
 
-import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
 
-
-public class BooleanSerializer extends TypeSerializer<Boolean> {
+public final class BooleanSerializer extends TypeSerializerSingleton<Boolean> {
 
 	private static final long serialVersionUID = 1L;
 	
@@ -49,6 +47,11 @@ public class BooleanSerializer extends TypeSerializer<Boolean> {
 	}
 
 	@Override
+	public Boolean copy(Boolean from) {
+		return from;
+	}
+	
+	@Override
 	public Boolean copy(Boolean from, Boolean reuse) {
 		return from;
 	}
@@ -64,6 +67,11 @@ public class BooleanSerializer extends TypeSerializer<Boolean> {
 	}
 
 	@Override
+	public Boolean deserialize(DataInputView source) throws IOException {
+		return Boolean.valueOf(source.readBoolean());
+	}
+	
+	@Override
 	public Boolean deserialize(Boolean reuse, DataInputView source) throws IOException {
 		return Boolean.valueOf(source.readBoolean());
 	}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/76d4a75e/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/BooleanValueSerializer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/BooleanValueSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/BooleanValueSerializer.java
index ddab3bb..4795055 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/BooleanValueSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/BooleanValueSerializer.java
@@ -20,13 +20,12 @@ package org.apache.flink.api.common.typeutils.base;
 
 import java.io.IOException;
 
-import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.types.BooleanValue;
 
 
-public class BooleanValueSerializer extends TypeSerializer<BooleanValue> {
+public final class BooleanValueSerializer extends TypeSerializerSingleton<BooleanValue> {
 
 	private static final long serialVersionUID = 1L;
 	
@@ -49,6 +48,13 @@ public class BooleanValueSerializer extends TypeSerializer<BooleanValue> {
 	}
 
 	@Override
+	public BooleanValue copy(BooleanValue from) {
+		BooleanValue result = new BooleanValue();
+		result.setValue(from.getValue());
+		return result;
+	}
+	
+	@Override
 	public BooleanValue copy(BooleanValue from, BooleanValue reuse) {
 		reuse.setValue(from.getValue());
 		return reuse;
@@ -65,6 +71,11 @@ public class BooleanValueSerializer extends TypeSerializer<BooleanValue> {
 	}
 
 	@Override
+	public BooleanValue deserialize(DataInputView source) throws IOException {
+		return deserialize(new BooleanValue(), source);
+	}
+	
+	@Override
 	public BooleanValue deserialize(BooleanValue reuse, DataInputView source) throws IOException {
 		reuse.read(source);
 		return reuse;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/76d4a75e/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ByteSerializer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ByteSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ByteSerializer.java
index df52c13..32f3edd 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ByteSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ByteSerializer.java
@@ -20,12 +20,11 @@ package org.apache.flink.api.common.typeutils.base;
 
 import java.io.IOException;
 
-import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
 
 
-public class ByteSerializer extends TypeSerializer<Byte> {
+public final class ByteSerializer extends TypeSerializerSingleton<Byte> {
 
 	private static final long serialVersionUID = 1L;
 	
@@ -50,6 +49,11 @@ public class ByteSerializer extends TypeSerializer<Byte> {
 	}
 
 	@Override
+	public Byte copy(Byte from) {
+		return from;
+	}
+	
+	@Override
 	public Byte copy(Byte from, Byte reuse) {
 		return from;
 	}
@@ -65,9 +69,14 @@ public class ByteSerializer extends TypeSerializer<Byte> {
 	}
 
 	@Override
-	public Byte deserialize(Byte reuse, DataInputView source) throws IOException {
+	public Byte deserialize(DataInputView source) throws IOException {
 		return Byte.valueOf(source.readByte());
 	}
+	
+	@Override
+	public Byte deserialize(Byte reuse, DataInputView source) throws IOException {
+		return deserialize(source);
+	}
 
 	@Override
 	public void copy(DataInputView source, DataOutputView target) throws IOException {

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/76d4a75e/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ByteValueSerializer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ByteValueSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ByteValueSerializer.java
index ab3df38..24cc98b 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ByteValueSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ByteValueSerializer.java
@@ -20,13 +20,12 @@ package org.apache.flink.api.common.typeutils.base;
 
 import java.io.IOException;
 
-import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.types.ByteValue;
 
 
-public class ByteValueSerializer extends TypeSerializer<ByteValue> {
+public final class ByteValueSerializer extends TypeSerializerSingleton<ByteValue> {
 
 	private static final long serialVersionUID = 1L;
 	
@@ -49,6 +48,11 @@ public class ByteValueSerializer extends TypeSerializer<ByteValue> {
 	}
 
 	@Override
+	public ByteValue copy(ByteValue from) {
+		return copy(from, new ByteValue());
+	}
+	
+	@Override
 	public ByteValue copy(ByteValue from, ByteValue reuse) {
 		reuse.setValue(from.getValue());
 		return reuse;
@@ -65,6 +69,11 @@ public class ByteValueSerializer extends TypeSerializer<ByteValue> {
 	}
 
 	@Override
+	public ByteValue deserialize(DataInputView source) throws IOException {
+		return deserialize(new ByteValue(), source);
+	}
+	
+	@Override
 	public ByteValue deserialize(ByteValue reuse, DataInputView source) throws IOException {
 		reuse.read(source);
 		return reuse;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/76d4a75e/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/CharSerializer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/CharSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/CharSerializer.java
index 4ef9a56..c46d3a0 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/CharSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/CharSerializer.java
@@ -20,12 +20,11 @@ package org.apache.flink.api.common.typeutils.base;
 
 import java.io.IOException;
 
-import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
 
 
-public class CharSerializer extends TypeSerializer<Character> {
+public final class CharSerializer extends TypeSerializerSingleton<Character> {
 
 	private static final long serialVersionUID = 1L;
 	
@@ -50,6 +49,11 @@ public class CharSerializer extends TypeSerializer<Character> {
 	}
 
 	@Override
+	public Character copy(Character from) {
+		return from;
+	}
+	
+	@Override
 	public Character copy(Character from, Character reuse) {
 		return from;
 	}
@@ -65,9 +69,14 @@ public class CharSerializer extends TypeSerializer<Character> {
 	}
 
 	@Override
-	public Character deserialize(Character reuse, DataInputView source) throws IOException {
+	public Character deserialize(DataInputView source) throws IOException {
 		return Character.valueOf(source.readChar());
 	}
+	
+	@Override
+	public Character deserialize(Character reuse, DataInputView source) throws IOException {
+		return deserialize(source);
+	}
 
 	@Override
 	public void copy(DataInputView source, DataOutputView target) throws IOException {

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/76d4a75e/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/CharValueSerializer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/CharValueSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/CharValueSerializer.java
index 4946743..71a8ef4 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/CharValueSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/CharValueSerializer.java
@@ -20,13 +20,11 @@ package org.apache.flink.api.common.typeutils.base;
 
 import java.io.IOException;
 
-import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.types.CharValue;
 
-
-public class CharValueSerializer extends TypeSerializer<CharValue> {
+public class CharValueSerializer extends TypeSerializerSingleton<CharValue> {
 
 	private static final long serialVersionUID = 1L;
 	
@@ -47,6 +45,11 @@ public class CharValueSerializer extends TypeSerializer<CharValue> {
 	public CharValue createInstance() {
 		return new CharValue();
 	}
+	
+	@Override
+	public CharValue copy(CharValue from) {
+		return copy(from, new CharValue());
+	}
 
 	@Override
 	public CharValue copy(CharValue from, CharValue reuse) {
@@ -63,6 +66,11 @@ public class CharValueSerializer extends TypeSerializer<CharValue> {
 	public void serialize(CharValue record, DataOutputView target) throws IOException {
 		record.write(target);
 	}
+	
+	@Override
+	public CharValue deserialize(DataInputView source) throws IOException {
+		return deserialize(new CharValue(), source);
+	}
 
 	@Override
 	public CharValue deserialize(CharValue reuse, DataInputView source) throws IOException {

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/76d4a75e/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/DoubleSerializer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/DoubleSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/DoubleSerializer.java
index fc8f55d..8e09f7c 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/DoubleSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/DoubleSerializer.java
@@ -20,12 +20,10 @@ package org.apache.flink.api.common.typeutils.base;
 
 import java.io.IOException;
 
-import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
 
-
-public class DoubleSerializer extends TypeSerializer<Double> {
+public final class DoubleSerializer extends TypeSerializerSingleton<Double> {
 
 	private static final long serialVersionUID = 1L;
 	
@@ -50,6 +48,11 @@ public class DoubleSerializer extends TypeSerializer<Double> {
 	}
 
 	@Override
+	public Double copy(Double from) {
+		return from;
+	}
+	
+	@Override
 	public Double copy(Double from, Double reuse) {
 		return from;
 	}
@@ -65,9 +68,14 @@ public class DoubleSerializer extends TypeSerializer<Double> {
 	}
 
 	@Override
-	public Double deserialize(Double reuse, DataInputView source) throws IOException {
+	public Double deserialize(DataInputView source) throws IOException {
 		return Double.valueOf(source.readDouble());
 	}
+	
+	@Override
+	public Double deserialize(Double reuse, DataInputView source) throws IOException {
+		return deserialize(source);
+	}
 
 	@Override
 	public void copy(DataInputView source, DataOutputView target) throws IOException {

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/76d4a75e/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/DoubleValueSerializer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/DoubleValueSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/DoubleValueSerializer.java
index a19f83e..f4c7f37 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/DoubleValueSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/DoubleValueSerializer.java
@@ -20,13 +20,12 @@ package org.apache.flink.api.common.typeutils.base;
 
 import java.io.IOException;
 
-import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.types.DoubleValue;
 
 
-public class DoubleValueSerializer extends TypeSerializer<DoubleValue> {
+public final class DoubleValueSerializer extends TypeSerializerSingleton<DoubleValue> {
 
 	private static final long serialVersionUID = 1L;
 	
@@ -49,6 +48,11 @@ public class DoubleValueSerializer extends TypeSerializer<DoubleValue> {
 	}
 
 	@Override
+	public DoubleValue copy(DoubleValue from) {
+		return copy(from, new DoubleValue());
+	}
+	
+	@Override
 	public DoubleValue copy(DoubleValue from, DoubleValue reuse) {
 		reuse.setValue(from.getValue());
 		return reuse;
@@ -65,6 +69,11 @@ public class DoubleValueSerializer extends TypeSerializer<DoubleValue> {
 	}
 
 	@Override
+	public DoubleValue deserialize(DataInputView source) throws IOException {
+		return deserialize(new DoubleValue(), source);
+	}
+	
+	@Override
 	public DoubleValue deserialize(DoubleValue reuse, DataInputView source) throws IOException {
 		reuse.read(source);
 		return reuse;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/76d4a75e/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/FloatSerializer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/FloatSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/FloatSerializer.java
index c0c7917..b1a46b0 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/FloatSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/FloatSerializer.java
@@ -20,12 +20,10 @@ package org.apache.flink.api.common.typeutils.base;
 
 import java.io.IOException;
 
-import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
 
-
-public class FloatSerializer extends TypeSerializer<Float> {
+public final class FloatSerializer extends TypeSerializerSingleton<Float> {
 
 	private static final long serialVersionUID = 1L;
 	
@@ -50,6 +48,11 @@ public class FloatSerializer extends TypeSerializer<Float> {
 	}
 
 	@Override
+	public Float copy(Float from) {
+		return from;
+	}
+	
+	@Override
 	public Float copy(Float from, Float reuse) {
 		return from;
 	}
@@ -65,9 +68,14 @@ public class FloatSerializer extends TypeSerializer<Float> {
 	}
 
 	@Override
-	public Float deserialize(Float reuse, DataInputView source) throws IOException {
+	public Float deserialize(DataInputView source) throws IOException {
 		return Float.valueOf(source.readFloat());
 	}
+	
+	@Override
+	public Float deserialize(Float reuse, DataInputView source) throws IOException {
+		return deserialize(source);
+	}
 
 	@Override
 	public void copy(DataInputView source, DataOutputView target) throws IOException {

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/76d4a75e/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/FloatValueSerializer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/FloatValueSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/FloatValueSerializer.java
index 5660dea..6ebb268 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/FloatValueSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/FloatValueSerializer.java
@@ -20,13 +20,12 @@ package org.apache.flink.api.common.typeutils.base;
 
 import java.io.IOException;
 
-import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.types.FloatValue;
 
 
-public class FloatValueSerializer extends TypeSerializer<FloatValue> {
+public class FloatValueSerializer extends TypeSerializerSingleton<FloatValue> {
 
 	private static final long serialVersionUID = 1L;
 	
@@ -49,6 +48,11 @@ public class FloatValueSerializer extends TypeSerializer<FloatValue> {
 	}
 
 	@Override
+	public FloatValue copy(FloatValue from) {
+		return copy(from, new FloatValue());
+	}
+	
+	@Override
 	public FloatValue copy(FloatValue from, FloatValue reuse) {
 		reuse.setValue(from.getValue());
 		return reuse;
@@ -65,6 +69,11 @@ public class FloatValueSerializer extends TypeSerializer<FloatValue> {
 	}
 
 	@Override
+	public FloatValue deserialize(DataInputView source) throws IOException {
+		return deserialize(new FloatValue(), source);
+	}
+	
+	@Override
 	public FloatValue deserialize(FloatValue reuse, DataInputView source) throws IOException {
 		reuse.read(source);
 		return reuse;
@@ -72,6 +81,6 @@ public class FloatValueSerializer extends TypeSerializer<FloatValue> {
 
 	@Override
 	public void copy(DataInputView source, DataOutputView target) throws IOException {
-		target.writeDouble(source.readFloat());
+		target.writeFloat(source.readFloat());
 	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/76d4a75e/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/GenericArraySerializer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/GenericArraySerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/GenericArraySerializer.java
index cfdb132..504b41b 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/GenericArraySerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/GenericArraySerializer.java
@@ -27,9 +27,11 @@ import org.apache.flink.core.memory.DataOutputView;
 
 
 /**
+ * A serializer for arrays of objects.
+ * 
  * @param <C> The component type
  */
-public class GenericArraySerializer<C> extends TypeSerializer<C[]> {
+public final class GenericArraySerializer<C> extends TypeSerializer<C[]> {
 
 	private static final long serialVersionUID = 1L;
 
@@ -40,7 +42,6 @@ public class GenericArraySerializer<C> extends TypeSerializer<C[]> {
 	private final C[] EMPTY;
 	
 	
-	
 	public GenericArraySerializer(Class<C> componentClass, TypeSerializer<C> componentSerializer) {
 		if (componentClass == null || componentSerializer == null) {
 			throw new NullPointerException();
@@ -68,7 +69,7 @@ public class GenericArraySerializer<C> extends TypeSerializer<C[]> {
 	}
 
 	@Override
-	public C[] copy(C[] from, C[] reuse) {
+	public C[] copy(C[] from) {
 		C[] copy = create(from.length);
 
 		for (int i = 0; i < copy.length; i++) {
@@ -77,6 +78,11 @@ public class GenericArraySerializer<C> extends TypeSerializer<C[]> {
 
 		return copy;
 	}
+	
+	@Override
+	public C[] copy(C[] from, C[] reuse) {
+		return copy(from);
+	}
 
 	@Override
 	public int getLength() {
@@ -98,6 +104,24 @@ public class GenericArraySerializer<C> extends TypeSerializer<C[]> {
 	}
 
 	@Override
+	public C[] deserialize(DataInputView source) throws IOException {
+		int len = source.readInt();
+		
+		C[] array = create(len);
+		
+		for (int i = 0; i < len; i++) {
+			boolean isNonNull = source.readBoolean();
+			if (isNonNull) {
+				array[i] = componentSerializer.deserialize(source);
+			} else {
+				array[i] = null;
+			}
+		}
+		
+		return array;
+	}
+	
+	@Override
 	public C[] deserialize(C[] reuse, DataInputView source) throws IOException {
 		int len = source.readInt();
 		
@@ -108,7 +132,13 @@ public class GenericArraySerializer<C> extends TypeSerializer<C[]> {
 		for (int i = 0; i < len; i++) {
 			boolean isNonNull = source.readBoolean();
 			if (isNonNull) {
-				reuse[i] = componentSerializer.deserialize(componentSerializer.createInstance(), source);
+				C ri = reuse[i];
+				if (ri == null) {
+					ri = componentSerializer.deserialize(source);
+				} else {
+					ri = componentSerializer.deserialize(ri, source);
+				}
+				reuse[i] = ri;
 			} else {
 				reuse[i] = null;
 			}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/76d4a75e/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/IntSerializer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/IntSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/IntSerializer.java
index 28192cd..2937b2a 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/IntSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/IntSerializer.java
@@ -20,12 +20,11 @@ package org.apache.flink.api.common.typeutils.base;
 
 import java.io.IOException;
 
-import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
 
 
-public class IntSerializer extends TypeSerializer<Integer> {
+public final class IntSerializer extends TypeSerializerSingleton<Integer> {
 
 	private static final long serialVersionUID = 1L;
 	
@@ -50,6 +49,11 @@ public class IntSerializer extends TypeSerializer<Integer> {
 	}
 
 	@Override
+	public Integer copy(Integer from) {
+		return from;
+	}
+	
+	@Override
 	public Integer copy(Integer from, Integer reuse) {
 		return from;
 	}
@@ -65,9 +69,14 @@ public class IntSerializer extends TypeSerializer<Integer> {
 	}
 
 	@Override
-	public Integer deserialize(Integer reuse, DataInputView source) throws IOException {
+	public Integer deserialize(DataInputView source) throws IOException {
 		return Integer.valueOf(source.readInt());
 	}
+	
+	@Override
+	public Integer deserialize(Integer reuse, DataInputView source) throws IOException {
+		return deserialize(source);
+	}
 
 	@Override
 	public void copy(DataInputView source, DataOutputView target) throws IOException {

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/76d4a75e/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/IntValueSerializer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/IntValueSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/IntValueSerializer.java
index 9cf72d9..ec1f345 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/IntValueSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/IntValueSerializer.java
@@ -20,13 +20,12 @@ package org.apache.flink.api.common.typeutils.base;
 
 import java.io.IOException;
 
-import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.types.IntValue;
 
 
-public class IntValueSerializer extends TypeSerializer<IntValue> {
+public final class IntValueSerializer extends TypeSerializerSingleton<IntValue> {
 
 	private static final long serialVersionUID = 1L;
 	
@@ -49,6 +48,11 @@ public class IntValueSerializer extends TypeSerializer<IntValue> {
 	}
 
 	@Override
+	public IntValue copy(IntValue from) {
+		return copy(from, new IntValue());
+	}
+	
+	@Override
 	public IntValue copy(IntValue from, IntValue reuse) {
 		reuse.setValue(from.getValue());
 		return reuse;
@@ -65,6 +69,11 @@ public class IntValueSerializer extends TypeSerializer<IntValue> {
 	}
 
 	@Override
+	public IntValue deserialize(DataInputView source) throws IOException {
+		return deserialize(new IntValue(), source);
+	}
+	
+	@Override
 	public IntValue deserialize(IntValue reuse, DataInputView source) throws IOException {
 		reuse.read(source);
 		return reuse;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/76d4a75e/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/LongSerializer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/LongSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/LongSerializer.java
index 2ca2b84..6b25596 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/LongSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/LongSerializer.java
@@ -20,12 +20,11 @@ package org.apache.flink.api.common.typeutils.base;
 
 import java.io.IOException;
 
-import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
 
 
-public class LongSerializer extends TypeSerializer<Long> {
+public final class LongSerializer extends TypeSerializerSingleton<Long> {
 
 	private static final long serialVersionUID = 1L;
 	
@@ -50,6 +49,11 @@ public class LongSerializer extends TypeSerializer<Long> {
 	}
 
 	@Override
+	public Long copy(Long from) {
+		return from;
+	}
+	
+	@Override
 	public Long copy(Long from, Long reuse) {
 		return from;
 	}
@@ -65,9 +69,14 @@ public class LongSerializer extends TypeSerializer<Long> {
 	}
 
 	@Override
-	public Long deserialize(Long reuse, DataInputView source) throws IOException {
+	public Long deserialize(DataInputView source) throws IOException {
 		return Long.valueOf(source.readLong());
 	}
+	
+	@Override
+	public Long deserialize(Long reuse, DataInputView source) throws IOException {
+		return deserialize(source);
+	}
 
 	@Override
 	public void copy(DataInputView source, DataOutputView target) throws IOException {

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/76d4a75e/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/LongValueSerializer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/LongValueSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/LongValueSerializer.java
index ea45dcc..95caf04 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/LongValueSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/LongValueSerializer.java
@@ -20,13 +20,12 @@ package org.apache.flink.api.common.typeutils.base;
 
 import java.io.IOException;
 
-import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.types.LongValue;
 
 
-public class LongValueSerializer extends TypeSerializer<LongValue> {
+public final class LongValueSerializer extends TypeSerializerSingleton<LongValue> {
 
 	private static final long serialVersionUID = 1L;
 	
@@ -49,6 +48,11 @@ public class LongValueSerializer extends TypeSerializer<LongValue> {
 	}
 
 	@Override
+	public LongValue copy(LongValue from) {
+		return copy(from, new LongValue());
+	}
+	
+	@Override
 	public LongValue copy(LongValue from, LongValue reuse) {
 		reuse.setValue(from.getValue());
 		return reuse;
@@ -65,6 +69,11 @@ public class LongValueSerializer extends TypeSerializer<LongValue> {
 	}
 
 	@Override
+	public LongValue deserialize(DataInputView source) throws IOException {
+		return deserialize(new LongValue(), source);
+	}
+	
+	@Override
 	public LongValue deserialize(LongValue reuse, DataInputView source) throws IOException {
 		reuse.read(source);
 		return reuse;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/76d4a75e/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ShortSerializer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ShortSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ShortSerializer.java
index 156732c..c6e7870 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ShortSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ShortSerializer.java
@@ -20,12 +20,11 @@ package org.apache.flink.api.common.typeutils.base;
 
 import java.io.IOException;
 
-import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
 
 
-public class ShortSerializer extends TypeSerializer<Short> {
+public final class ShortSerializer extends TypeSerializerSingleton<Short> {
 
 	private static final long serialVersionUID = 1L;
 	
@@ -50,6 +49,11 @@ public class ShortSerializer extends TypeSerializer<Short> {
 	}
 
 	@Override
+	public Short copy(Short from) {
+		return from;
+	}
+	
+	@Override
 	public Short copy(Short from, Short reuse) {
 		return from;
 	}
@@ -65,9 +69,14 @@ public class ShortSerializer extends TypeSerializer<Short> {
 	}
 
 	@Override
-	public Short deserialize(Short reuse, DataInputView source) throws IOException {
+	public Short deserialize(DataInputView source) throws IOException {
 		return Short.valueOf(source.readShort());
 	}
+	
+	@Override
+	public Short deserialize(Short reuse, DataInputView source) throws IOException {
+		return deserialize(source);
+	}
 
 	@Override
 	public void copy(DataInputView source, DataOutputView target) throws IOException {

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/76d4a75e/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ShortValueSerializer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ShortValueSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ShortValueSerializer.java
index ac46b46..ab58987 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ShortValueSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ShortValueSerializer.java
@@ -20,13 +20,12 @@ package org.apache.flink.api.common.typeutils.base;
 
 import java.io.IOException;
 
-import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.types.ShortValue;
 
 
-public class ShortValueSerializer extends TypeSerializer<ShortValue> {
+public final class ShortValueSerializer extends TypeSerializerSingleton<ShortValue> {
 
 	private static final long serialVersionUID = 1L;
 	
@@ -49,6 +48,11 @@ public class ShortValueSerializer extends TypeSerializer<ShortValue> {
 	}
 
 	@Override
+	public ShortValue copy(ShortValue from) {
+		return copy(from, new ShortValue());
+	}
+	
+	@Override
 	public ShortValue copy(ShortValue from, ShortValue reuse) {
 		reuse.setValue(from.getValue());
 		return reuse;
@@ -65,6 +69,11 @@ public class ShortValueSerializer extends TypeSerializer<ShortValue> {
 	}
 
 	@Override
+	public ShortValue deserialize(DataInputView source) throws IOException {
+		return deserialize(new ShortValue(), source);
+	}
+	
+	@Override
 	public ShortValue deserialize(ShortValue reuse, DataInputView source) throws IOException {
 		reuse.read(source);
 		return reuse;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/76d4a75e/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/StringSerializer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/StringSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/StringSerializer.java
index d71b521..71221a2 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/StringSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/StringSerializer.java
@@ -20,13 +20,12 @@ package org.apache.flink.api.common.typeutils.base;
 
 import java.io.IOException;
 
-import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.types.StringValue;
 
 
-public class StringSerializer extends TypeSerializer<String> {
+public final class StringSerializer extends TypeSerializerSingleton<String> {
 
 	private static final long serialVersionUID = 1L;
 	
@@ -50,6 +49,11 @@ public class StringSerializer extends TypeSerializer<String> {
 	}
 
 	@Override
+	public String copy(String from) {
+		return from;
+	}
+	
+	@Override
 	public String copy(String from, String reuse) {
 		return from;
 	}
@@ -65,9 +69,14 @@ public class StringSerializer extends TypeSerializer<String> {
 	}
 
 	@Override
-	public String deserialize(String record, DataInputView source) throws IOException {
+	public String deserialize(DataInputView source) throws IOException {
 		return StringValue.readString(source);
 	}
+	
+	@Override
+	public String deserialize(String record, DataInputView source) throws IOException {
+		return deserialize(source);
+	}
 
 	@Override
 	public void copy(DataInputView source, DataOutputView target) throws IOException {

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/76d4a75e/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/StringValueSerializer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/StringValueSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/StringValueSerializer.java
index 352330f..c5d5b55 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/StringValueSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/StringValueSerializer.java
@@ -20,16 +20,17 @@ package org.apache.flink.api.common.typeutils.base;
 
 import java.io.IOException;
 
-import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.types.StringValue;
 
 
-public class StringValueSerializer extends TypeSerializer<StringValue> {
+public final class StringValueSerializer extends TypeSerializerSingleton<StringValue> {
 
 	private static final long serialVersionUID = 1L;
 	
+	private static final int HIGH_BIT = 0x1 << 7;
+	
 	public static final StringValueSerializer INSTANCE = new StringValueSerializer();
 	
 	
@@ -49,6 +50,11 @@ public class StringValueSerializer extends TypeSerializer<StringValue> {
 	}
 
 	@Override
+	public StringValue copy(StringValue from) {
+		return copy(from, new StringValue());
+	}
+	
+	@Override
 	public StringValue copy(StringValue from, StringValue reuse) {
 		reuse.setValue(from);
 		return reuse;
@@ -65,6 +71,11 @@ public class StringValueSerializer extends TypeSerializer<StringValue> {
 	}
 
 	@Override
+	public StringValue deserialize(DataInputView source) throws IOException {
+		return deserialize(new StringValue(), source);
+	}
+	
+	@Override
 	public StringValue deserialize(StringValue reuse, DataInputView source) throws IOException {
 		reuse.read(source);
 		return reuse;
@@ -72,6 +83,29 @@ public class StringValueSerializer extends TypeSerializer<StringValue> {
 
 	@Override
 	public void copy(DataInputView source, DataOutputView target) throws IOException {
-		StringValue.copyString(source, target);
+		int len = source.readUnsignedByte();
+		target.writeByte(len);
+
+		if (len >= HIGH_BIT) {
+			int shift = 7;
+			int curr;
+			len = len & 0x7f;
+			while ((curr = source.readUnsignedByte()) >= HIGH_BIT) {
+				target.writeByte(curr);
+				len |= (curr & 0x7f) << shift;
+				shift += 7;
+			}
+			target.writeByte(curr);
+			len |= curr << shift;
+		}
+
+		for (int i = 0; i < len; i++) {
+			int c = source.readUnsignedByte();
+			target.writeByte(c);
+			while (c >= HIGH_BIT) {
+				c = source.readUnsignedByte();
+				target.writeByte(c);
+			}
+		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/76d4a75e/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/TypeSerializerSingleton.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/TypeSerializerSingleton.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/TypeSerializerSingleton.java
new file mode 100644
index 0000000..979d5ab
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/TypeSerializerSingleton.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.typeutils.base;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+
+public abstract class TypeSerializerSingleton<T> extends TypeSerializer<T>{
+
+	private static final long serialVersionUID = 8766687317209282373L;
+
+	// --------------------------------------------------------------------------------------------
+	
+	@Override
+	public int hashCode() {
+		return super.hashCode();
+	}
+	
+	@Override
+	public boolean equals(Object obj) {
+		return obj != null && obj.getClass() == this.getClass();
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/76d4a75e/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/BooleanPrimitiveArraySerializer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/BooleanPrimitiveArraySerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/BooleanPrimitiveArraySerializer.java
index 0093463..e9941a8 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/BooleanPrimitiveArraySerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/BooleanPrimitiveArraySerializer.java
@@ -20,14 +20,14 @@ package org.apache.flink.api.common.typeutils.base.array;
 
 import java.io.IOException;
 
-import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.TypeSerializerSingleton;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
 
 /**
- * A serializer for long arrays.
+ * A serializer for boolean arrays.
  */
-public class BooleanPrimitiveArraySerializer extends TypeSerializer<boolean[]>{
+public final class BooleanPrimitiveArraySerializer extends TypeSerializerSingleton<boolean[]>{
 
 	private static final long serialVersionUID = 1L;
 	
@@ -52,13 +52,18 @@ public class BooleanPrimitiveArraySerializer extends TypeSerializer<boolean[]>{
 	}
 
 	@Override
-	public boolean[] copy(boolean[] from, boolean[] reuse) {
+	public boolean[] copy(boolean[] from) {
 		boolean[] copy = new boolean[from.length];
 		System.arraycopy(from, 0, copy, 0, from.length);
 		return copy;
 	}
 
 	@Override
+	public boolean[] copy(boolean[] from, boolean[] reuse) {
+		return copy(from);
+	}
+
+	@Override
 	public int getLength() {
 		return -1;
 	}
@@ -79,15 +84,20 @@ public class BooleanPrimitiveArraySerializer extends TypeSerializer<boolean[]>{
 
 
 	@Override
-	public boolean[] deserialize(boolean[] reuse, DataInputView source) throws IOException {
+	public boolean[] deserialize(DataInputView source) throws IOException {
 		final int len = source.readInt();
-		reuse = new boolean[len];
+		boolean[] result = new boolean[len];
 		
 		for (int i = 0; i < len; i++) {
-			reuse[i] = source.readBoolean();
+			result[i] = source.readBoolean();
 		}
 		
-		return reuse;
+		return result;
+	}
+	
+	@Override
+	public boolean[] deserialize(boolean[] reuse, DataInputView source) throws IOException {
+		return deserialize(source);
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/76d4a75e/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/BytePrimitiveArraySerializer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/BytePrimitiveArraySerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/BytePrimitiveArraySerializer.java
index fa0638a..aaf867f 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/BytePrimitiveArraySerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/BytePrimitiveArraySerializer.java
@@ -20,14 +20,14 @@ package org.apache.flink.api.common.typeutils.base.array;
 
 import java.io.IOException;
 
-import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.TypeSerializerSingleton;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
 
 /**
- * A serializer for long arrays.
+ * A serializer for byte arrays.
  */
-public class BytePrimitiveArraySerializer extends TypeSerializer<byte[]>{
+public final class BytePrimitiveArraySerializer extends TypeSerializerSingleton<byte[]>{
 
 	private static final long serialVersionUID = 1L;
 	
@@ -51,11 +51,16 @@ public class BytePrimitiveArraySerializer extends TypeSerializer<byte[]>{
 	}
 
 	@Override
-	public byte[] copy(byte[] from, byte[] reuse) {
+	public byte[] copy(byte[] from) {
 		byte[] copy = new byte[from.length];
 		System.arraycopy(from, 0, copy, 0, from.length);
 		return copy;
 	}
+	
+	@Override
+	public byte[] copy(byte[] from, byte[] reuse) {
+		return copy(from);
+	}
 
 	@Override
 	public int getLength() {
@@ -74,13 +79,17 @@ public class BytePrimitiveArraySerializer extends TypeSerializer<byte[]>{
 		target.write(record);
 	}
 
-
 	@Override
-	public byte[] deserialize(byte[] reuse, DataInputView source) throws IOException {
+	public byte[] deserialize(DataInputView source) throws IOException {
 		final int len = source.readInt();
-		reuse = new byte[len];
-		source.readFully(reuse);
-		return reuse;
+		byte[] result = new byte[len];
+		source.readFully(result);
+		return result;
+	}
+	
+	@Override
+	public byte[] deserialize(byte[] reuse, DataInputView source) throws IOException {
+		return deserialize(source);
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/76d4a75e/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/CharPrimitiveArraySerializer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/CharPrimitiveArraySerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/CharPrimitiveArraySerializer.java
index 639d4b6..64632bd 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/CharPrimitiveArraySerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/CharPrimitiveArraySerializer.java
@@ -20,14 +20,14 @@ package org.apache.flink.api.common.typeutils.base.array;
 
 import java.io.IOException;
 
-import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.TypeSerializerSingleton;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
 
 /**
- * A serializer for long arrays.
+ * A serializer for char arrays.
  */
-public class CharPrimitiveArraySerializer extends TypeSerializer<char[]>{
+public final class CharPrimitiveArraySerializer extends TypeSerializerSingleton<char[]>{
 
 	private static final long serialVersionUID = 1L;
 	
@@ -52,11 +52,16 @@ public class CharPrimitiveArraySerializer extends TypeSerializer<char[]>{
 	}
 
 	@Override
-	public char[] copy(char[] from, char[] reuse) {
+	public char[] copy(char[] from) {
 		char[] copy = new char[from.length];
 		System.arraycopy(from, 0, copy, 0, from.length);
 		return copy;
 	}
+	
+	@Override
+	public char[] copy(char[] from, char[] reuse) {
+		return copy(from);
+	}
 
 	@Override
 	public int getLength() {
@@ -77,17 +82,21 @@ public class CharPrimitiveArraySerializer extends TypeSerializer<char[]>{
 		}
 	}
 
-
 	@Override
-	public char[] deserialize(char[] reuse, DataInputView source) throws IOException {
+	public char[] deserialize(DataInputView source) throws IOException {
 		final int len = source.readInt();
-		reuse = new char[len];
+		char[] result = new char[len];
 		
 		for (int i = 0; i < len; i++) {
-			reuse[i] = source.readChar();
+			result[i] = source.readChar();
 		}
 		
-		return reuse;
+		return result;
+	}
+
+	@Override
+	public char[] deserialize(char[] reuse, DataInputView source) throws IOException {
+		return deserialize(source);
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/76d4a75e/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/DoublePrimitiveArraySerializer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/DoublePrimitiveArraySerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/DoublePrimitiveArraySerializer.java
index 2b089a3..846ae74 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/DoublePrimitiveArraySerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/DoublePrimitiveArraySerializer.java
@@ -20,14 +20,14 @@ package org.apache.flink.api.common.typeutils.base.array;
 
 import java.io.IOException;
 
-import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.TypeSerializerSingleton;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
 
 /**
- * A serializer for long arrays.
+ * A serializer for double arrays.
  */
-public class DoublePrimitiveArraySerializer extends TypeSerializer<double[]>{
+public final class DoublePrimitiveArraySerializer extends TypeSerializerSingleton<double[]>{
 
 	private static final long serialVersionUID = 1L;
 	
@@ -50,15 +50,20 @@ public class DoublePrimitiveArraySerializer extends TypeSerializer<double[]>{
 	public double[] createInstance() {
 		return EMPTY;
 	}
-
+	
 	@Override
-	public double[] copy(double[] from, double[] reuse) {
+	public double[] copy(double[] from) {
 		double[] copy = new double[from.length];
 		System.arraycopy(from, 0, copy, 0, from.length);
 		return copy;
 	}
 
 	@Override
+	public double[] copy(double[] from, double[] reuse) {
+		return copy(from);
+	}
+
+	@Override
 	public int getLength() {
 		return -1;
 	}
@@ -77,17 +82,21 @@ public class DoublePrimitiveArraySerializer extends TypeSerializer<double[]>{
 		}
 	}
 
-
 	@Override
-	public double[] deserialize(double[] reuse, DataInputView source) throws IOException {
+	public double[] deserialize(DataInputView source) throws IOException {
 		final int len = source.readInt();
-		reuse = new double[len];
+		double[] result = new double[len];
 		
 		for (int i = 0; i < len; i++) {
-			reuse[i] = source.readDouble();
+			result[i] = source.readDouble();
 		}
 		
-		return reuse;
+		return result;
+	}
+	
+	@Override
+	public double[] deserialize(double[] reuse, DataInputView source) throws IOException {
+		return deserialize(source);
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/76d4a75e/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/FloatPrimitiveArraySerializer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/FloatPrimitiveArraySerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/FloatPrimitiveArraySerializer.java
index 897292e..8f42ac8 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/FloatPrimitiveArraySerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/FloatPrimitiveArraySerializer.java
@@ -20,14 +20,14 @@ package org.apache.flink.api.common.typeutils.base.array;
 
 import java.io.IOException;
 
-import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.TypeSerializerSingleton;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
 
 /**
- * A serializer for long arrays.
+ * A serializer for float arrays.
  */
-public class FloatPrimitiveArraySerializer extends TypeSerializer<float[]>{
+public final class FloatPrimitiveArraySerializer extends TypeSerializerSingleton<float[]>{
 
 	private static final long serialVersionUID = 1L;
 	
@@ -52,11 +52,16 @@ public class FloatPrimitiveArraySerializer extends TypeSerializer<float[]>{
 	}
 
 	@Override
-	public float[] copy(float[] from, float[] reuse) {
+	public float[] copy(float[] from) {
 		float[] copy = new float[from.length];
 		System.arraycopy(from, 0, copy, 0, from.length);
 		return copy;
 	}
+	
+	@Override
+	public float[] copy(float[] from, float[] reuse) {
+		return copy(from);
+	}
 
 	@Override
 	public int getLength() {
@@ -77,17 +82,21 @@ public class FloatPrimitiveArraySerializer extends TypeSerializer<float[]>{
 		}
 	}
 
-
 	@Override
-	public float[] deserialize(float[] reuse, DataInputView source) throws IOException {
+	public float[] deserialize(DataInputView source) throws IOException {
 		final int len = source.readInt();
-		reuse = new float[len];
+		float[] result = new float[len];
 		
 		for (int i = 0; i < len; i++) {
-			reuse[i] = source.readFloat();
+			result[i] = source.readFloat();
 		}
 		
-		return reuse;
+		return result;
+	}
+	
+	@Override
+	public float[] deserialize(float[] reuse, DataInputView source) throws IOException {
+		return deserialize(source);
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/76d4a75e/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/IntPrimitiveArraySerializer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/IntPrimitiveArraySerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/IntPrimitiveArraySerializer.java
index aeaf35e..2ab056c 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/IntPrimitiveArraySerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/IntPrimitiveArraySerializer.java
@@ -20,14 +20,14 @@ package org.apache.flink.api.common.typeutils.base.array;
 
 import java.io.IOException;
 
-import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.TypeSerializerSingleton;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
 
 /**
- * A serializer for long arrays.
+ * A serializer for int arrays.
  */
-public class IntPrimitiveArraySerializer extends TypeSerializer<int[]>{
+public class IntPrimitiveArraySerializer extends TypeSerializerSingleton<int[]>{
 
 	private static final long serialVersionUID = 1L;
 	
@@ -52,11 +52,16 @@ public class IntPrimitiveArraySerializer extends TypeSerializer<int[]>{
 	}
 
 	@Override
-	public int[] copy(int[] from, int[] reuse) {
+	public int[] copy(int[] from) {
 		int[] copy = new int[from.length];
 		System.arraycopy(from, 0, copy, 0, from.length);
 		return copy;
 	}
+	
+	@Override
+	public int[] copy(int[] from, int[] reuse) {
+		return copy(from);
+	}
 
 	@Override
 	public int getLength() {
@@ -77,17 +82,21 @@ public class IntPrimitiveArraySerializer extends TypeSerializer<int[]>{
 		}
 	}
 
-
 	@Override
-	public int[] deserialize(int[] reuse, DataInputView source) throws IOException {
+	public int[] deserialize(DataInputView source) throws IOException {
 		final int len = source.readInt();
-		reuse = new int[len];
+		int[] result = new int[len];
 		
 		for (int i = 0; i < len; i++) {
-			reuse[i] = source.readInt();
+			result[i] = source.readInt();
 		}
 		
-		return reuse;
+		return result;
+	}
+	
+	@Override
+	public int[] deserialize(int[] reuse, DataInputView source) throws IOException {
+		return deserialize(source);
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/76d4a75e/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/LongPrimitiveArraySerializer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/LongPrimitiveArraySerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/LongPrimitiveArraySerializer.java
index 953e8ff..5d34dfe 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/LongPrimitiveArraySerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/LongPrimitiveArraySerializer.java
@@ -20,14 +20,14 @@ package org.apache.flink.api.common.typeutils.base.array;
 
 import java.io.IOException;
 
-import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.TypeSerializerSingleton;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
 
 /**
  * A serializer for long arrays.
  */
-public class LongPrimitiveArraySerializer extends TypeSerializer<long[]>{
+public final class LongPrimitiveArraySerializer extends TypeSerializerSingleton<long[]>{
 
 	private static final long serialVersionUID = 1L;
 	
@@ -52,10 +52,15 @@ public class LongPrimitiveArraySerializer extends TypeSerializer<long[]>{
 	}
 
 	@Override
+	public long[] copy(long[] from) {
+		long[] result = new long[from.length];
+		System.arraycopy(from, 0, result, 0, from.length);
+		return result;
+	}
+	
+	@Override
 	public long[] copy(long[] from, long[] reuse) {
-		reuse = new long[from.length];
-		System.arraycopy(from, 0, reuse, 0, from.length);
-		return reuse;
+		return copy(from);
 	}
 
 	@Override
@@ -77,17 +82,21 @@ public class LongPrimitiveArraySerializer extends TypeSerializer<long[]>{
 		}
 	}
 
-
 	@Override
-	public long[] deserialize(long[] reuse, DataInputView source) throws IOException {
+	public long[] deserialize(DataInputView source) throws IOException {
 		final int len = source.readInt();
-		reuse = new long[len];
+		long[] array = new long[len];
 		
 		for (int i = 0; i < len; i++) {
-			reuse[i] = source.readLong();
+			array[i] = source.readLong();
 		}
 		
-		return reuse;
+		return array;
+	}
+	
+	@Override
+	public long[] deserialize(long[] reuse, DataInputView source) throws IOException {
+		return deserialize(source);
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/76d4a75e/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/ShortPrimitiveArraySerializer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/ShortPrimitiveArraySerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/ShortPrimitiveArraySerializer.java
index 014fd05..2f37033 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/ShortPrimitiveArraySerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/ShortPrimitiveArraySerializer.java
@@ -20,14 +20,14 @@ package org.apache.flink.api.common.typeutils.base.array;
 
 import java.io.IOException;
 
-import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.TypeSerializerSingleton;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
 
 /**
- * A serializer for long arrays.
+ * A serializer for short arrays.
  */
-public class ShortPrimitiveArraySerializer extends TypeSerializer<short[]>{
+public final class ShortPrimitiveArraySerializer extends TypeSerializerSingleton<short[]>{
 
 	private static final long serialVersionUID = 1L;
 	
@@ -52,11 +52,16 @@ public class ShortPrimitiveArraySerializer extends TypeSerializer<short[]>{
 	}
 
 	@Override
-	public short[] copy(short[] from, short[] reuse) {
+	public short[] copy(short[] from) {
 		short[] copy = new short[from.length];
 		System.arraycopy(from, 0, copy, 0, from.length);
 		return copy;
 	}
+	
+	@Override
+	public short[] copy(short[] from, short[] reuse) {
+		return copy(from);
+	}
 
 	@Override
 	public int getLength() {
@@ -77,17 +82,21 @@ public class ShortPrimitiveArraySerializer extends TypeSerializer<short[]>{
 		}
 	}
 
-
 	@Override
-	public short[] deserialize(short[] reuse, DataInputView source) throws IOException {
+	public short[] deserialize(DataInputView source) throws IOException {
 		final int len = source.readInt();
-		reuse = new short[len];
+		short[] array = new short[len];
 		
 		for (int i = 0; i < len; i++) {
-			reuse[i] = source.readShort();
+			array[i] = source.readShort();
 		}
 		
-		return reuse;
+		return array;
+	}
+
+	@Override
+	public short[] deserialize(short[] reuse, DataInputView source) throws IOException {
+		return deserialize(source);
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/76d4a75e/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/StringArraySerializer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/StringArraySerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/StringArraySerializer.java
index c6c1826..d5ab030 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/StringArraySerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/StringArraySerializer.java
@@ -20,7 +20,7 @@ package org.apache.flink.api.common.typeutils.base.array;
 
 import java.io.IOException;
 
-import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.TypeSerializerSingleton;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.types.StringValue;
@@ -29,7 +29,7 @@ import org.apache.flink.types.StringValue;
 /**
  * A serializer for String arrays. Specialized for efficiency.
  */
-public class StringArraySerializer extends TypeSerializer<String[]>{
+public final class StringArraySerializer extends TypeSerializerSingleton<String[]>{
 
 	private static final long serialVersionUID = 1L;
 	
@@ -54,10 +54,15 @@ public class StringArraySerializer extends TypeSerializer<String[]>{
 	}
 
 	@Override
+	public String[] copy(String[] from) {
+		String[] target = new String[from.length];
+		System.arraycopy(from, 0, target, 0, from.length);
+		return target;
+	}
+	
+	@Override
 	public String[] copy(String[] from, String[] reuse) {
-		reuse = new String[from.length];
-		System.arraycopy(from, 0, reuse, 0, from.length);
-		return reuse;
+		return copy(from);
 	}
 
 	@Override
@@ -65,7 +70,6 @@ public class StringArraySerializer extends TypeSerializer<String[]>{
 		return -1;
 	}
 
-
 	@Override
 	public void serialize(String[] record, DataOutputView target) throws IOException {
 		if (record == null) {
@@ -79,17 +83,21 @@ public class StringArraySerializer extends TypeSerializer<String[]>{
 		}
 	}
 
-
 	@Override
-	public String[] deserialize(String[] reuse, DataInputView source) throws IOException {
+	public String[] deserialize(DataInputView source) throws IOException {
 		final int len = source.readInt();
-		reuse = new String[len];
+		String[] array = new String[len];
 		
 		for (int i = 0; i < len; i++) {
-			reuse[i] = StringValue.readString(source);
+			array[i] = StringValue.readString(source);
 		}
 		
-		return reuse;
+		return array;
+	}
+	
+	@Override
+	public String[] deserialize(String[] reuse, DataInputView source) throws IOException {
+		return deserialize(source);
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/76d4a75e/flink-core/src/main/java/org/apache/flink/api/common/typeutils/record/RecordSerializer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/record/RecordSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/record/RecordSerializer.java
index 7e8762e..7b72e89 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/record/RecordSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/record/RecordSerializer.java
@@ -67,12 +67,16 @@ public final class RecordSerializer extends TypeSerializer<Record> {
 	}
 
 	@Override
+	public Record copy(Record from) {
+		return from.createCopy();
+	}
+	
+	@Override
 	public Record copy(Record from, Record reuse) {
 		from.copyTo(reuse);
 		return reuse;
 	}
 	
-
 	@Override
 	public int getLength() {
 		return -1;
@@ -86,6 +90,11 @@ public final class RecordSerializer extends TypeSerializer<Record> {
 	}
 
 	@Override
+	public Record deserialize(DataInputView source) throws IOException {
+		return deserialize(new Record(), source);
+	}
+	
+	@Override
 	public Record deserialize(Record target, DataInputView source) throws IOException {
 		target.deserialize(source);
 		return target;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/76d4a75e/flink-core/src/test/java/org/apache/flink/api/common/typeutils/SerializerTestBase.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/SerializerTestBase.java b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/SerializerTestBase.java
index 6ca3f6c..d509284 100644
--- a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/SerializerTestBase.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/SerializerTestBase.java
@@ -89,6 +89,24 @@ public abstract class SerializerTestBase<T> {
 	}
 	
 	@Test
+	public void testCopy() {
+		try {
+			TypeSerializer<T> serializer = getSerializer();
+			T[] testData = getData();
+			
+			for (T datum : testData) {
+				T copy = serializer.copy(datum);
+				deepEquals("Copied element is not equal to the original element.", datum, copy);
+			}
+		}
+		catch (Exception e) {
+			System.err.println(e.getMessage());
+			e.printStackTrace();
+			fail("Exception in test: " + e.getMessage());
+		}
+	}
+	
+	@Test
 	public void testCopyIntoNewElements() {
 		try {
 			TypeSerializer<T> serializer = getSerializer();
@@ -184,7 +202,36 @@ public abstract class SerializerTestBase<T> {
 	}
 	
 	@Test
-	public void testSerializeAsSequence() {
+	public void testSerializeAsSequenceNoReuse() {
+		try {
+			TypeSerializer<T> serializer = getSerializer();
+			T[] testData = getData();
+			
+			TestOutputView out = new TestOutputView();
+			for (T value : testData) {
+				serializer.serialize(value, out);
+			}
+			
+			TestInputView in = out.getInputView();
+			
+			int num = 0;
+			while (in.available() > 0) {
+				T deserialized = serializer.deserialize(in);
+				deepEquals("Deserialized value if wrong.", testData[num], deserialized);
+				num++;
+			}
+			
+			assertEquals("Wrong number of elements deserialized.", testData.length, num);
+		}
+		catch (Exception e) {
+			System.err.println(e.getMessage());
+			e.printStackTrace();
+			fail("Exception in test: " + e.getMessage());
+		}
+	}
+	
+	@Test
+	public void testSerializeAsSequenceReusingValues() {
 		try {
 			TypeSerializer<T> serializer = getSerializer();
 			T[] testData = getData();

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/76d4a75e/flink-core/src/test/java/org/apache/flink/api/common/typeutils/SerializerTestInstance.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/SerializerTestInstance.java b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/SerializerTestInstance.java
index 2eb52c0..5b63633 100644
--- a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/SerializerTestInstance.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/SerializerTestInstance.java
@@ -69,11 +69,13 @@ public class
 	public void testAll() {
 		testInstantiate();
 		testGetLength();
+		testCopy();
 		testCopyIntoNewElements();
 		testCopyIntoReusedElements();
 		testSerializeIndividually();
 		testSerializeIndividuallyReusingValues();
-		testSerializeAsSequence();
+		testSerializeAsSequenceNoReuse();
+		testSerializeAsSequenceReusingValues();
 		testSerializedCopyIndividually();
 		testSerializedCopyAsSequence();
 		testSerializabilityAndEquals();

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/76d4a75e/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/BooleanValueSerializerTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/BooleanValueSerializerTest.java b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/BooleanValueSerializerTest.java
new file mode 100644
index 0000000..43f1a57
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/BooleanValueSerializerTest.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.typeutils.base;
+
+import java.util.Random;
+
+import org.apache.flink.api.common.typeutils.SerializerTestBase;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.types.BooleanValue;
+
+/**
+ * A test for the {@link BooleanValueSerializer}.
+ */
+public class BooleanValueSerializerTest extends SerializerTestBase<BooleanValue> {
+	
+	@Override
+	protected TypeSerializer<BooleanValue> createSerializer() {
+		return new BooleanValueSerializer();
+	}
+	
+	@Override
+	protected int getLength() {
+		return 1;
+	}
+	
+	@Override
+	protected Class<BooleanValue> getTypeClass() {
+		return BooleanValue.class;
+	}
+	
+	@Override
+	protected BooleanValue[] getTestData() {
+		Random rnd = new Random(874597969123412341L);
+		
+		return new BooleanValue[] {new BooleanValue(true), new BooleanValue(false),
+								new BooleanValue(rnd.nextBoolean()),
+								new BooleanValue(rnd.nextBoolean()),
+								new BooleanValue(rnd.nextBoolean())};
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/76d4a75e/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/ByteValueSerializerTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/ByteValueSerializerTest.java b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/ByteValueSerializerTest.java
new file mode 100644
index 0000000..0e16629
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/ByteValueSerializerTest.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.typeutils.base;
+
+import java.util.Random;
+
+import org.apache.flink.api.common.typeutils.SerializerTestBase;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.types.ByteValue;
+
+/**
+ * A test for the {@link ByteValueSerializer}.
+ */
+public class ByteValueSerializerTest extends SerializerTestBase<ByteValue> {
+	
+	@Override
+	protected TypeSerializer<ByteValue> createSerializer() {
+		return new ByteValueSerializer();
+	}
+	
+	@Override
+	protected int getLength() {
+		return 1;
+	}
+	
+	@Override
+	protected Class<ByteValue> getTypeClass() {
+		return ByteValue.class;
+	}
+	
+	@Override
+	protected ByteValue[] getTestData() {
+		Random rnd = new Random(874597969123412341L);
+		byte byteArray[] = new byte[1];
+		rnd.nextBytes(byteArray);
+		
+		return new ByteValue[] {new ByteValue((byte) 0), new ByteValue((byte) 1), new ByteValue((byte) -1), 
+							new ByteValue(Byte.MAX_VALUE), new ByteValue(Byte.MIN_VALUE),
+							new ByteValue(byteArray[0]), new ByteValue((byte) -byteArray[0])};
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/76d4a75e/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/CharValueSerializerTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/CharValueSerializerTest.java b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/CharValueSerializerTest.java
new file mode 100644
index 0000000..ac83666
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/CharValueSerializerTest.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.typeutils.base;
+
+import java.util.Random;
+
+import org.apache.flink.api.common.typeutils.SerializerTestBase;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.types.CharValue;
+
+/**
+ * A test for the {@link CharValueSerializer}.
+ */
+public class CharValueSerializerTest extends SerializerTestBase<CharValue> {
+	
+	@Override
+	protected TypeSerializer<CharValue> createSerializer() {
+		return new CharValueSerializer();
+	}
+	
+	@Override
+	protected int getLength() {
+		return 2;
+	}
+	
+	@Override
+	protected Class<CharValue> getTypeClass() {
+		return CharValue.class;
+	}
+	
+	@Override
+	protected CharValue[] getTestData() {
+		Random rnd = new Random(874597969123412341L);
+		int rndInt = rnd.nextInt((int) Character.MAX_VALUE);
+		
+		return new CharValue[] {new CharValue('a'), new CharValue('@'), new CharValue('ä'),
+								new CharValue('1'), new CharValue((char) rndInt),
+								new CharValue(Character.MAX_VALUE), new CharValue(Character.MIN_VALUE)};
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/76d4a75e/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/DoubleValueSerializerTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/DoubleValueSerializerTest.java b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/DoubleValueSerializerTest.java
new file mode 100644
index 0000000..c0a2b24
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/DoubleValueSerializerTest.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.typeutils.base;
+
+import java.util.Random;
+
+import org.apache.flink.api.common.typeutils.SerializerTestBase;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.types.DoubleValue;
+
+/**
+ * A test for the {@link DoubleValueSerializer}.
+ */
+public class DoubleValueSerializerTest extends SerializerTestBase<DoubleValue> {
+	
+	@Override
+	protected TypeSerializer<DoubleValue> createSerializer() {
+		return new DoubleValueSerializer();
+	}
+	
+	@Override
+	protected int getLength() {
+		return 8;
+	}
+	
+	@Override
+	protected Class<DoubleValue> getTypeClass() {
+		return DoubleValue.class;
+	}
+	
+	@Override
+	protected DoubleValue[] getTestData() {
+		Random rnd = new Random(874597969123412341L);
+		Double rndDouble = rnd.nextDouble() * Double.MAX_VALUE;
+		
+		return new DoubleValue[] {new DoubleValue(0), new DoubleValue(1), new DoubleValue(-1),
+							new DoubleValue(Double.MAX_VALUE), new DoubleValue(Double.MIN_VALUE),
+							new DoubleValue(rndDouble), new DoubleValue(-rndDouble),
+							new DoubleValue(Double.NaN),
+							new DoubleValue(Double.NEGATIVE_INFINITY), new DoubleValue(Double.POSITIVE_INFINITY)};
+	}
+}