You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2015/11/30 16:29:53 UTC

flink git commit: [FLINK-3088] [serialization] Fix copy method of TypeSerializer which use Kryo

Repository: flink
Updated Branches:
  refs/heads/master 209ae6c91 -> 20ebad048


[FLINK-3088] [serialization] Fix copy method of TypeSerializer which use Kryo

Some TypeSerializer, WritableSerializer, ValueSerializer, and AvroSerializer, and
comparators, WritableComparator and ValueComparator, use Kryo to copy records.
In case where the Kryo serializer cannot copy the record, the copy method fails.
This is however not necessary, because one can copy the element by serializing
the record to a byte array and deserializing it from this array. This PR adds
this behaviour to the respective classes.

Adds KryoUtils tool with copy method to avoid code duplication

This closes #1415.

Adds comments to KryoUtils functions


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/20ebad04
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/20ebad04
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/20ebad04

Branch: refs/heads/master
Commit: 20ebad048d3b4621a6e801a4d24a15d4468f2ba6
Parents: 209ae6c
Author: Till Rohrmann <tr...@apache.org>
Authored: Fri Nov 27 15:14:15 2015 +0100
Committer: Till Rohrmann <tr...@apache.org>
Committed: Mon Nov 30 16:29:22 2015 +0100

----------------------------------------------------------------------
 .../apache/flink/util/InstantiationUtil.java    | 11 ++-
 .../java/typeutils/runtime/AvroSerializer.java  | 30 ++-----
 .../api/java/typeutils/runtime/KryoUtils.java   | 87 ++++++++++++++++++++
 .../java/typeutils/runtime/ValueComparator.java |  9 +-
 .../java/typeutils/runtime/ValueSerializer.java | 12 ++-
 .../typeutils/runtime/WritableComparator.java   |  9 +-
 .../typeutils/runtime/WritableSerializer.java   | 17 +++-
 .../runtime/ValueComparatorUUIDTest.java        | 46 +++++++++++
 .../api/java/typeutils/runtime/ValueID.java     | 72 ++++++++++++++++
 .../runtime/ValueSerializerUUIDTest.java        | 50 +++++++++++
 .../runtime/WritableComparatorUUIDTest.java     | 46 +++++++++++
 .../api/java/typeutils/runtime/WritableID.java  | 78 ++++++++++++++++++
 .../runtime/WritableSerializerTest.java         |  1 -
 .../runtime/WritableSerializerUUIDTest.java     | 50 +++++++++++
 14 files changed, 487 insertions(+), 31 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/20ebad04/flink-core/src/main/java/org/apache/flink/util/InstantiationUtil.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/util/InstantiationUtil.java b/flink-core/src/main/java/org/apache/flink/util/InstantiationUtil.java
index 8ce3e85..4f6ee32 100644
--- a/flink-core/src/main/java/org/apache/flink/util/InstantiationUtil.java
+++ b/flink-core/src/main/java/org/apache/flink/util/InstantiationUtil.java
@@ -279,9 +279,16 @@ public final class InstantiationUtil {
 		}
 
 		InputViewDataInputStreamWrapper inputViewWrapper = new InputViewDataInputStreamWrapper(new DataInputStream(new ByteArrayInputStream(buf)));
+		return serializer.deserialize(inputViewWrapper);
+	}
+
+	public static <T> T deserializeFromByteArray(TypeSerializer<T> serializer, T reuse, byte[] buf) throws IOException {
+		if (buf == null) {
+			throw new NullPointerException("Byte array to deserialize from must not be null.");
+		}
 
-		T record = serializer.createInstance();
-		return serializer.deserialize(record, inputViewWrapper);
+		InputViewDataInputStreamWrapper inputViewWrapper = new InputViewDataInputStreamWrapper(new DataInputStream(new ByteArrayInputStream(buf)));
+		return serializer.deserialize(reuse, inputViewWrapper);
 	}
 	
 	@SuppressWarnings("unchecked")

http://git-wip-us.apache.org/repos/asf/flink/blob/20ebad04/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/AvroSerializer.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/AvroSerializer.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/AvroSerializer.java
index 26bf4ce..bc04367 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/AvroSerializer.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/AvroSerializer.java
@@ -18,13 +18,8 @@
 
 package org.apache.flink.api.java.typeutils.runtime;
 
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 
-import com.esotericsoftware.kryo.KryoException;
-import com.esotericsoftware.kryo.io.Input;
-import com.esotericsoftware.kryo.io.Output;
 import com.google.common.base.Preconditions;
 import org.apache.avro.generic.GenericData;
 import org.apache.avro.reflect.ReflectDatumReader;
@@ -37,6 +32,7 @@ import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.util.InstantiationUtil;
 
 import com.esotericsoftware.kryo.Kryo;
+import org.objenesis.strategy.StdInstantiatorStrategy;
 
 
 /**
@@ -96,28 +92,15 @@ public final class AvroSerializer<T> extends TypeSerializer<T> {
 	@Override
 	public T copy(T from) {
 		checkKryoInitialized();
-		return this.kryo.copy(from);
+
+		return KryoUtils.copy(from, kryo, this);
 	}
 	
 	@Override
 	public T copy(T from, T reuse) {
 		checkKryoInitialized();
-		try {
-			return this.kryo.copy(from);
-		} catch(KryoException ke) {
-			// kryo was unable to copy it, so we do it through serialization:
-			ByteArrayOutputStream baout = new ByteArrayOutputStream();
-			Output output = new Output(baout);
-
-			kryo.writeObject(output, from);
-
-			output.close();
-
-			ByteArrayInputStream bain = new ByteArrayInputStream(baout.toByteArray());
-			Input input = new Input(bain);
 
-			return (T)kryo.readObject(input, from.getClass());
-		}
+		return KryoUtils.copy(from, reuse, kryo, this);
 	}
 
 	@Override
@@ -174,6 +157,11 @@ public final class AvroSerializer<T> extends TypeSerializer<T> {
 	private void checkKryoInitialized() {
 		if (this.kryo == null) {
 			this.kryo = new Kryo();
+
+			Kryo.DefaultInstantiatorStrategy instantiatorStrategy = new Kryo.DefaultInstantiatorStrategy();
+			instantiatorStrategy.setFallbackInstantiatorStrategy(new StdInstantiatorStrategy());
+			kryo.setInstantiatorStrategy(instantiatorStrategy);
+
 			// register Avro types.
 			this.kryo.register(GenericData.Array.class, new Serializers.SpecificInstanceCollectionSerializerForArrayList());
 			this.kryo.register(Utf8.class);

http://git-wip-us.apache.org/repos/asf/flink/blob/20ebad04/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/KryoUtils.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/KryoUtils.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/KryoUtils.java
new file mode 100644
index 0000000..faf5646
--- /dev/null
+++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/KryoUtils.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils.runtime;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.KryoException;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.util.InstantiationUtil;
+
+import java.io.IOException;
+
+/**
+ * Convenience methods for Kryo
+ */
+public class KryoUtils {
+
+	/**
+	 * Tries to copy the given record from using the provided Kryo instance. If this fails, then
+	 * the record from is copied by serializing it into a byte buffer and deserializing it from
+	 * there.
+	 *
+	 * @param from Element to copy
+	 * @param kryo Kryo instance to use
+	 * @param serializer TypeSerializer which is used in case of a Kryo failure
+	 * @param <T> Type of the element to be copied
+	 * @return Copied element
+	 */
+	public static <T> T copy(T from, Kryo kryo, TypeSerializer<T> serializer) {
+		try {
+			return kryo.copy(from);
+		} catch (KryoException ke) {
+			// Kryo could not copy the object --> try to serialize/deserialize the object
+			try {
+				byte[] byteArray = InstantiationUtil.serializeToByteArray(serializer, from);
+
+				return InstantiationUtil.deserializeFromByteArray(serializer, byteArray);
+			} catch (IOException ioe) {
+				throw new RuntimeException("Could not copy object by serializing/deserializing" +
+					" it.", ioe);
+			}
+		}
+	}
+
+	/**
+	 * Tries to copy the given record from using the provided Kryo instance. If this fails, then
+	 * the record from is copied by serializing it into a byte buffer and deserializing it from
+	 * there.
+	 *
+	 * @param from Element to copy
+	 * @param reuse Reuse element for the deserialization
+	 * @param kryo Kryo instance to use
+	 * @param serializer TypeSerializer which is used in case of a Kryo failure
+	 * @param <T> Type of the element to be copied
+	 * @return Copied element
+	 */
+	public static <T> T copy(T from, T reuse, Kryo kryo, TypeSerializer<T> serializer) {
+		try {
+			return kryo.copy(from);
+		} catch (KryoException ke) {
+			// Kryo could not copy the object --> try to serialize/deserialize the object
+			try {
+				byte[] byteArray = InstantiationUtil.serializeToByteArray(serializer, from);
+
+				return InstantiationUtil.deserializeFromByteArray(serializer, reuse, byteArray);
+			} catch (IOException ioe) {
+				throw new RuntimeException("Could not copy object by serializing/deserializing" +
+					" it.", ioe);
+			}
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/20ebad04/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/ValueComparator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/ValueComparator.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/ValueComparator.java
index 0393c8a..4b9629a 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/ValueComparator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/ValueComparator.java
@@ -29,6 +29,7 @@ import org.apache.flink.types.Value;
 import org.apache.flink.util.InstantiationUtil;
 
 import com.esotericsoftware.kryo.Kryo;
+import org.objenesis.strategy.StdInstantiatorStrategy;
 
 /**
  * Comparator for all Value types that extend Key
@@ -63,7 +64,8 @@ public class ValueComparator<T extends Value & Comparable<T>> extends TypeCompar
 	@Override
 	public void setReference(T toCompare) {
 		checkKryoInitialized();
-		reference = this.kryo.copy(toCompare);
+
+		reference = KryoUtils.copy(toCompare, kryo, new ValueSerializer<T>(type));
 	}
 
 	@Override
@@ -138,6 +140,11 @@ public class ValueComparator<T extends Value & Comparable<T>> extends TypeCompar
 	private void checkKryoInitialized() {
 		if (this.kryo == null) {
 			this.kryo = new Kryo();
+
+			Kryo.DefaultInstantiatorStrategy instantiatorStrategy = new Kryo.DefaultInstantiatorStrategy();
+			instantiatorStrategy.setFallbackInstantiatorStrategy(new StdInstantiatorStrategy());
+			kryo.setInstantiatorStrategy(instantiatorStrategy);
+
 			this.kryo.setAsmEnabled(true);
 			this.kryo.register(type);
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/20ebad04/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/ValueSerializer.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/ValueSerializer.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/ValueSerializer.java
index 179ef19..9329866 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/ValueSerializer.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/ValueSerializer.java
@@ -28,6 +28,7 @@ import org.apache.flink.types.Value;
 import org.apache.flink.util.InstantiationUtil;
 
 import com.esotericsoftware.kryo.Kryo;
+import org.objenesis.strategy.StdInstantiatorStrategy;
 
 /**
  * Serializer for {@link Value} types. Uses the value's serialization methods, and uses
@@ -71,13 +72,15 @@ public class ValueSerializer<T extends Value> extends TypeSerializer<T> {
 	@Override
 	public T copy(T from) {
 		checkKryoInitialized();
-		return this.kryo.copy(from);
+
+		return KryoUtils.copy(from, kryo, this);
 	}
 	
 	@Override
 	public T copy(T from, T reuse) {
 		checkKryoInitialized();
-		return this.kryo.copy(from);
+
+		return KryoUtils.copy(from, reuse, kryo, this);
 	}
 
 	@Override
@@ -114,6 +117,11 @@ public class ValueSerializer<T extends Value> extends TypeSerializer<T> {
 	private void checkKryoInitialized() {
 		if (this.kryo == null) {
 			this.kryo = new Kryo();
+
+			Kryo.DefaultInstantiatorStrategy instantiatorStrategy = new Kryo.DefaultInstantiatorStrategy();
+			instantiatorStrategy.setFallbackInstantiatorStrategy(new StdInstantiatorStrategy());
+			kryo.setInstantiatorStrategy(instantiatorStrategy);
+
 			this.kryo.setAsmEnabled(true);
 			this.kryo.register(type);
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/20ebad04/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/WritableComparator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/WritableComparator.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/WritableComparator.java
index 6bb8d8b..a03369a 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/WritableComparator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/WritableComparator.java
@@ -29,6 +29,7 @@ import org.apache.flink.util.InstantiationUtil;
 import org.apache.hadoop.io.Writable;
 
 import com.esotericsoftware.kryo.Kryo;
+import org.objenesis.strategy.StdInstantiatorStrategy;
 
 public class WritableComparator<T extends Writable & Comparable<T>> extends TypeComparator<T> {
 	
@@ -60,7 +61,8 @@ public class WritableComparator<T extends Writable & Comparable<T>> extends Type
 	@Override
 	public void setReference(T toCompare) {
 		checkKryoInitialized();
-		reference = this.kryo.copy(toCompare);
+
+		reference = KryoUtils.copy(toCompare, kryo, new WritableSerializer<T>(type));
 	}
 	
 	@Override
@@ -163,6 +165,11 @@ public class WritableComparator<T extends Writable & Comparable<T>> extends Type
 	private void checkKryoInitialized() {
 		if (this.kryo == null) {
 			this.kryo = new Kryo();
+
+			Kryo.DefaultInstantiatorStrategy instantiatorStrategy = new Kryo.DefaultInstantiatorStrategy();
+			instantiatorStrategy.setFallbackInstantiatorStrategy(new StdInstantiatorStrategy());
+			kryo.setInstantiatorStrategy(instantiatorStrategy);
+
 			this.kryo.setAsmEnabled(true);
 			this.kryo.register(type);
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/20ebad04/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/WritableSerializer.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/WritableSerializer.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/WritableSerializer.java
index d854f52..258d92c 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/WritableSerializer.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/WritableSerializer.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.api.java.typeutils.runtime;
 
-import java.io.IOException;
 
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.core.memory.DataInputView;
@@ -28,6 +27,9 @@ import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.Writable;
 
 import com.esotericsoftware.kryo.Kryo;
+import org.objenesis.strategy.StdInstantiatorStrategy;
+
+import java.io.IOException;
 
 public class WritableSerializer<T extends Writable> extends TypeSerializer<T> {
 	
@@ -51,17 +53,21 @@ public class WritableSerializer<T extends Writable> extends TypeSerializer<T> {
 		}
 		return InstantiationUtil.instantiate(typeClass);
 	}
+
+
 	
 	@Override
 	public T copy(T from) {
 		checkKryoInitialized();
-		return this.kryo.copy(from);
+
+		return KryoUtils.copy(from, kryo, this);
 	}
 	
 	@Override
 	public T copy(T from, T reuse) {
 		checkKryoInitialized();
-		return this.kryo.copy(from);
+
+		return KryoUtils.copy(from, reuse, kryo, this);
 	}
 	
 	@Override
@@ -113,6 +119,11 @@ public class WritableSerializer<T extends Writable> extends TypeSerializer<T> {
 	private void checkKryoInitialized() {
 		if (this.kryo == null) {
 			this.kryo = new Kryo();
+
+			Kryo.DefaultInstantiatorStrategy instantiatorStrategy = new Kryo.DefaultInstantiatorStrategy();
+			instantiatorStrategy.setFallbackInstantiatorStrategy(new StdInstantiatorStrategy());
+			kryo.setInstantiatorStrategy(instantiatorStrategy);
+
 			this.kryo.setAsmEnabled(true);
 			this.kryo.register(typeClass);
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/20ebad04/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/ValueComparatorUUIDTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/ValueComparatorUUIDTest.java b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/ValueComparatorUUIDTest.java
new file mode 100644
index 0000000..a2c3e1e
--- /dev/null
+++ b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/ValueComparatorUUIDTest.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils.runtime;
+
+import org.apache.flink.api.common.typeutils.ComparatorTestBase;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+
+import java.util.UUID;
+
+public class ValueComparatorUUIDTest extends ComparatorTestBase<ValueID> {
+	@Override
+	protected TypeComparator<ValueID> createComparator(boolean ascending) {
+		return new ValueComparator<>(ascending, ValueID.class);
+	}
+
+	@Override
+	protected TypeSerializer<ValueID> createSerializer() {
+		return new ValueSerializer<>(ValueID.class);
+	}
+
+	@Override
+	protected ValueID[] getSortedTestData() {
+		return new ValueID[] {
+			new ValueID(new UUID(0, 0)),
+			new ValueID(new UUID(1, 0)),
+			new ValueID(new UUID(1, 1))
+		};
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/20ebad04/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/ValueID.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/ValueID.java b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/ValueID.java
new file mode 100644
index 0000000..d644485
--- /dev/null
+++ b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/ValueID.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils.runtime;
+
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.types.Value;
+
+import java.io.IOException;
+import java.util.UUID;
+
+public class ValueID implements Value, Comparable<ValueID> {
+	private static final long serialVersionUID = -562791433077971752L;
+
+	private UUID id;
+
+	public ValueID() {
+		id = UUID.randomUUID();
+	}
+
+	public ValueID(UUID id) {
+		this.id = id;
+	}
+
+	@Override
+	public int compareTo(ValueID o) {
+		return id.compareTo(o.id);
+	}
+
+	@Override
+	public void write(DataOutputView out) throws IOException {
+		out.writeLong(id.getMostSignificantBits());
+		out.writeLong(id.getLeastSignificantBits());
+	}
+
+	@Override
+	public void read(DataInputView in) throws IOException {
+		id = new UUID(in.readLong(), in.readLong());
+	}
+
+	@Override
+	public boolean equals(Object obj) {
+		if (obj instanceof ValueID) {
+			ValueID other = (ValueID) obj;
+
+			return id.equals(other.id);
+		} else {
+			return false;
+		}
+	}
+
+	@Override
+	public int hashCode() {
+		return id.hashCode();
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/20ebad04/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/ValueSerializerUUIDTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/ValueSerializerUUIDTest.java b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/ValueSerializerUUIDTest.java
new file mode 100644
index 0000000..9c07a5e
--- /dev/null
+++ b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/ValueSerializerUUIDTest.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils.runtime;
+
+import org.apache.flink.api.common.typeutils.SerializerTestBase;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+
+import java.util.UUID;
+
+public class ValueSerializerUUIDTest extends SerializerTestBase<ValueID> {
+	@Override
+	protected TypeSerializer<ValueID> createSerializer() {
+		return new ValueSerializer<>(ValueID.class);
+	}
+
+	@Override
+	protected int getLength() {
+		return -1;
+	}
+
+	@Override
+	protected Class<ValueID> getTypeClass() {
+		return ValueID.class;
+	}
+
+	@Override
+	protected ValueID[] getTestData() {
+		return new ValueID[] {
+			new ValueID(new UUID(0, 0)),
+			new ValueID(new UUID(1, 0)),
+			new ValueID(new UUID(1, 1))
+		};
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/20ebad04/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/WritableComparatorUUIDTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/WritableComparatorUUIDTest.java b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/WritableComparatorUUIDTest.java
new file mode 100644
index 0000000..94e759d
--- /dev/null
+++ b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/WritableComparatorUUIDTest.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils.runtime;
+
+import org.apache.flink.api.common.typeutils.ComparatorTestBase;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+
+import java.util.UUID;
+
+public class WritableComparatorUUIDTest extends ComparatorTestBase<WritableID> {
+	@Override
+	protected TypeComparator<WritableID> createComparator(boolean ascending) {
+		return new WritableComparator<>(ascending, WritableID.class);
+	}
+
+	@Override
+	protected TypeSerializer<WritableID> createSerializer() {
+		return new WritableSerializer<>(WritableID.class);
+	}
+
+	@Override
+	protected WritableID[] getSortedTestData() {
+		return new WritableID[] {
+			new WritableID(new UUID(0, 0)),
+			new WritableID(new UUID(1, 0)),
+			new WritableID(new UUID(1, 1))
+		};
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/20ebad04/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/WritableID.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/WritableID.java b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/WritableID.java
new file mode 100644
index 0000000..4274cf6
--- /dev/null
+++ b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/WritableID.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils.runtime;
+
+import org.apache.hadoop.io.WritableComparable;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.UUID;
+
+public class WritableID implements WritableComparable<WritableID> {
+	private UUID uuid;
+
+	public WritableID() {
+		this.uuid = UUID.randomUUID();
+	}
+
+	public WritableID(UUID uuid) {
+		this.uuid = uuid;
+	}
+
+	@Override
+	public int compareTo(WritableID o) {
+		return this.uuid.compareTo(o.uuid);
+	}
+
+	@Override
+	public void write(DataOutput dataOutput) throws IOException {
+		dataOutput.writeLong(uuid.getMostSignificantBits());
+		dataOutput.writeLong(uuid.getLeastSignificantBits());
+	}
+
+	@Override
+	public void readFields(DataInput dataInput) throws IOException {
+		this.uuid = new UUID(dataInput.readLong(), dataInput.readLong());
+	}
+
+	@Override
+	public String toString() {
+		return uuid.toString();
+	}
+
+	@Override
+	public boolean equals(Object o) {
+		if (this == o) {
+			return true;
+		}
+		if (o == null || getClass() != o.getClass()) {
+			return false;
+		}
+
+		WritableID id = (WritableID) o;
+
+		return !(uuid != null ? !uuid.equals(id.uuid) : id.uuid != null);
+	}
+
+	@Override
+	public int hashCode() {
+		return uuid != null ? uuid.hashCode() : 0;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/20ebad04/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/WritableSerializerTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/WritableSerializerTest.java b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/WritableSerializerTest.java
index 557c0e4..bb5f4d4 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/WritableSerializerTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/WritableSerializerTest.java
@@ -47,5 +47,4 @@ public class WritableSerializerTest {
 		
 		testInstance.testAll();
 	}
-	
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/20ebad04/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/WritableSerializerUUIDTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/WritableSerializerUUIDTest.java b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/WritableSerializerUUIDTest.java
new file mode 100644
index 0000000..2af7730
--- /dev/null
+++ b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/WritableSerializerUUIDTest.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils.runtime;
+
+import org.apache.flink.api.common.typeutils.SerializerTestBase;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+
+import java.util.UUID;
+
+public class WritableSerializerUUIDTest extends SerializerTestBase<WritableID> {
+	@Override
+	protected TypeSerializer<WritableID> createSerializer() {
+		return new WritableSerializer<>(WritableID.class);
+	}
+
+	@Override
+	protected int getLength() {
+		return -1;
+	}
+
+	@Override
+	protected Class<WritableID> getTypeClass() {
+		return WritableID.class;
+	}
+
+	@Override
+	protected WritableID[] getTestData() {
+		return new WritableID[] {
+			new WritableID(new UUID(0, 0)),
+			new WritableID(new UUID(1, 0)),
+			new WritableID(new UUID(1, 1))
+		};
+	}
+}