You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2015/09/17 14:04:05 UTC

[3/7] flink git commit: [FLINK-2637] [api-breaking] [types] Adds equals and hashCode method to TypeInformations and TypeSerializers - Fixes ObjectArrayTypeInfo - Makes CompositeTypes serializable - Adds test for equality relation's symmetric property - A

http://git-wip-us.apache.org/repos/asf/flink/blob/b49b1552/flink-java/src/main/java/org/apache/flink/api/java/typeutils/EnumTypeInfo.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/EnumTypeInfo.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/EnumTypeInfo.java
index 024eb71..de59c36 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/EnumTypeInfo.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/EnumTypeInfo.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.api.java.typeutils;
 
+import com.google.common.base.Preconditions;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.typeinfo.AtomicType;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
@@ -38,12 +39,12 @@ public class EnumTypeInfo<T extends Enum<T>> extends TypeInformation<T> implemen
 	private final Class<T> typeClass;
 
 	public EnumTypeInfo(Class<T> typeClass) {
-		if (typeClass == null) {
-			throw new NullPointerException();
-		}
+		Preconditions.checkNotNull(typeClass, "Enum type class must not be null.");
+
 		if (!Enum.class.isAssignableFrom(typeClass) ) {
 			throw new IllegalArgumentException("EnumTypeInfo can only be used for subclasses of " + Enum.class.getName());
 		}
+
 		this.typeClass = typeClass;
 	}
 
@@ -98,13 +99,22 @@ public class EnumTypeInfo<T extends Enum<T>> extends TypeInformation<T> implemen
 	
 	@Override
 	public int hashCode() {
-		return typeClass.hashCode() ^ 0xd3a2646c;
+		return typeClass.hashCode();
 	}
-	
+
+	@Override
+	public boolean canEqual(Object obj) {
+		return obj instanceof EnumTypeInfo;
+	}
+
 	@Override
 	public boolean equals(Object obj) {
 		if (obj instanceof EnumTypeInfo) {
-			return typeClass == ((EnumTypeInfo<?>) obj).typeClass;
+			@SuppressWarnings("unchecked")
+			EnumTypeInfo<T> enumTypeInfo = (EnumTypeInfo<T>) obj;
+
+			return enumTypeInfo.canEqual(this) &&
+				typeClass == enumTypeInfo.typeClass;
 		} else {
 			return false;
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/b49b1552/flink-java/src/main/java/org/apache/flink/api/java/typeutils/GenericTypeInfo.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/GenericTypeInfo.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/GenericTypeInfo.java
index 5caf8f2..7e7aa68 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/GenericTypeInfo.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/GenericTypeInfo.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.api.java.typeutils;
 
+import com.google.common.base.Preconditions;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.typeinfo.AtomicType;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
@@ -34,7 +35,7 @@ public class GenericTypeInfo<T> extends TypeInformation<T> implements AtomicType
 	private final Class<T> typeClass;
 
 	public GenericTypeInfo(Class<T> typeClass) {
-		this.typeClass = typeClass;
+		this.typeClass = Preconditions.checkNotNull(typeClass);
 	}
 
 	@Override
@@ -88,13 +89,21 @@ public class GenericTypeInfo<T> extends TypeInformation<T> implements AtomicType
 
 	@Override
 	public int hashCode() {
-		return typeClass.hashCode() ^ 0x165667b1;
+		return typeClass.hashCode();
+	}
+
+	@Override
+	public boolean canEqual(Object obj) {
+		return obj instanceof GenericTypeInfo;
 	}
 
 	@Override
 	public boolean equals(Object obj) {
-		if (obj.getClass() == GenericTypeInfo.class) {
-			return typeClass == ((GenericTypeInfo<?>) obj).typeClass;
+		if (obj instanceof GenericTypeInfo) {
+			@SuppressWarnings("unchecked")
+			GenericTypeInfo<T> genericTypeInfo = (GenericTypeInfo<T>) obj;
+
+			return typeClass == genericTypeInfo.typeClass;
 		} else {
 			return false;
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/b49b1552/flink-java/src/main/java/org/apache/flink/api/java/typeutils/MissingTypeInfo.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/MissingTypeInfo.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/MissingTypeInfo.java
index 227c68c..1dd7f01 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/MissingTypeInfo.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/MissingTypeInfo.java
@@ -31,8 +31,8 @@ public class MissingTypeInfo extends TypeInformation<InvalidTypesException> {
 
 	private static final long serialVersionUID = -4212082837126702723L;
 	
-	private String functionName;
-	private InvalidTypesException typeException;
+	private final String functionName;
+	private final InvalidTypesException typeException;
 
 	
 	public MissingTypeInfo(String functionName) {
@@ -87,6 +87,34 @@ public class MissingTypeInfo extends TypeInformation<InvalidTypesException> {
 	}
 
 	@Override
+	public String toString() {
+		return getClass().getSimpleName() + "<" + functionName + ", " + typeException.getMessage() + ">";
+	}
+
+	@Override
+	public boolean equals(Object obj) {
+		if (obj instanceof MissingTypeInfo) {
+			MissingTypeInfo missingTypeInfo = (MissingTypeInfo) obj;
+
+			return missingTypeInfo.canEqual(this) &&
+				functionName.equals(missingTypeInfo.functionName) &&
+				typeException.equals(missingTypeInfo.typeException);
+		} else {
+			return false;
+		}
+	}
+
+	@Override
+	public int hashCode() {
+		return 31 * functionName.hashCode() + typeException.hashCode();
+	}
+
+	@Override
+	public boolean canEqual(Object obj) {
+		return obj instanceof MissingTypeInfo;
+	}
+
+	@Override
 	public int getTotalFields() {
 		throw new UnsupportedOperationException("The missing type information cannot be used as a type information.");
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/b49b1552/flink-java/src/main/java/org/apache/flink/api/java/typeutils/ObjectArrayTypeInfo.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/ObjectArrayTypeInfo.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/ObjectArrayTypeInfo.java
index 6806122..150c976 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/ObjectArrayTypeInfo.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/ObjectArrayTypeInfo.java
@@ -19,13 +19,9 @@
 package org.apache.flink.api.java.typeutils;
 
 import java.lang.reflect.Array;
-import java.lang.reflect.GenericArrayType;
-import java.lang.reflect.ParameterizedType;
-import java.lang.reflect.Type;
 
+import com.google.common.base.Preconditions;
 import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.functions.InvalidTypesException;
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.base.GenericArraySerializer;
@@ -34,21 +30,12 @@ public class ObjectArrayTypeInfo<T, C> extends TypeInformation<T> {
 
 	private static final long serialVersionUID = 1L;
 	
-	private final Type arrayType;
-	private final Type componentType;
+	private final Class<T> arrayType;
 	private final TypeInformation<C> componentInfo;
 
-	@SuppressWarnings("unchecked")
-	private ObjectArrayTypeInfo(Type arrayType, Type componentType) {
-		this.arrayType = arrayType;
-		this.componentType = componentType;
-		this.componentInfo = (TypeInformation<C>) TypeExtractor.createTypeInfo(componentType);
-	}
-
-	private ObjectArrayTypeInfo(Type arrayType, Type componentType, TypeInformation<C> componentInfo) {
-		this.arrayType = arrayType;
-		this.componentType = componentType;
-		this.componentInfo = componentInfo;
+	private ObjectArrayTypeInfo(Class<T> arrayType, TypeInformation<C> componentInfo) {
+		this.arrayType = Preconditions.checkNotNull(arrayType);
+		this.componentInfo = Preconditions.checkNotNull(componentInfo);
 	}
 
 	// --------------------------------------------------------------------------------------------
@@ -76,29 +63,9 @@ public class ObjectArrayTypeInfo<T, C> extends TypeInformation<T> {
 	@SuppressWarnings("unchecked")
 	@Override
 	public Class<T> getTypeClass() {
-		// if arrayType is a Class
-		if (arrayType instanceof Class) {
-			return (Class<T>) arrayType;
-		}
-
-		// if arrayType is a GenericArrayType
-		Class<?> componentClass = (Class<?>) ((ParameterizedType) componentType).getRawType();
-
-		try {
-			return (Class<T>) Class.forName("[L" + componentClass.getName() + ";");
-		} catch (ClassNotFoundException e) {
-			throw new RuntimeException("Cannot create non-generic type class.", e);
-		}
-	}
-
-	public Type getType() {
 		return arrayType;
 	}
 
-	public Type getComponentType() {
-		return this.componentType;
-	}
-
 	public TypeInformation<C> getComponentInfo() {
 		return componentInfo;
 	}
@@ -111,15 +78,9 @@ public class ObjectArrayTypeInfo<T, C> extends TypeInformation<T> {
 	@SuppressWarnings("unchecked")
 	@Override
 	public TypeSerializer<T> createSerializer(ExecutionConfig executionConfig) {
-		// use raw type for serializer if generic array type
-		if (this.componentType instanceof GenericArrayType) {
-			ParameterizedType paramType = (ParameterizedType) ((GenericArrayType) this.componentType).getGenericComponentType();
-
-			return (TypeSerializer<T>) new GenericArraySerializer<C>((Class<C>) paramType.getRawType(),
-					this.componentInfo.createSerializer(executionConfig));
-		} else {
-			return (TypeSerializer<T>) new GenericArraySerializer<C>((Class<C>) this.componentType, this.componentInfo.createSerializer(executionConfig));
-		}
+		return (TypeSerializer<T>) new GenericArraySerializer<C>(
+			componentInfo.getTypeClass(),
+			componentInfo.createSerializer(executionConfig));
 	}
 
 	@Override
@@ -128,38 +89,37 @@ public class ObjectArrayTypeInfo<T, C> extends TypeInformation<T> {
 	}
 
 	@Override
-	public boolean equals(Object o) {
-		if (this == o) {
-			return true;
-		}
-		if (o == null || getClass() != o.getClass()) {
+	public boolean equals(Object obj) {
+		if (obj instanceof ObjectArrayTypeInfo) {
+			@SuppressWarnings("unchecked")
+			ObjectArrayTypeInfo<T, C> objectArrayTypeInfo = (ObjectArrayTypeInfo<T, C>)obj;
+
+			return objectArrayTypeInfo.canEqual(this) &&
+				arrayType == objectArrayTypeInfo.arrayType &&
+				componentInfo.equals(objectArrayTypeInfo.componentInfo);
+		} else {
 			return false;
 		}
+	}
 
-		ObjectArrayTypeInfo<?, ?> that = (ObjectArrayTypeInfo<?, ?>) o;
-		return this.arrayType.equals(that.arrayType) && this.componentType.equals(that.componentType);
+	@Override
+	public boolean canEqual(Object obj) {
+		return obj instanceof ObjectArrayTypeInfo;
 	}
 
 	@Override
 	public int hashCode() {
-		return 31 * this.arrayType.hashCode() + this.componentType.hashCode();
+		return 31 * this.arrayType.hashCode() + this.componentInfo.hashCode();
 	}
 
 	// --------------------------------------------------------------------------------------------
 
-	public static <T, C> ObjectArrayTypeInfo<T, C> getInfoFor(Type type, TypeInformation<C> componentInfo) {
-		// generic array type e.g. for Tuples
-		if (type instanceof GenericArrayType) {
-			GenericArrayType genericArray = (GenericArrayType) type;
-			return new ObjectArrayTypeInfo<T, C>(type, genericArray.getGenericComponentType(), componentInfo);
-		}
-		// for tuples without generics (e.g. generated by the TypeInformation parser)
-		// and multidimensional arrays (e.g. in scala)
-		else if (type instanceof Class<?> && ((Class<?>) type).isArray()
-				&& BasicTypeInfo.getInfoFor((Class<?>) type) == null) {
-			return new ObjectArrayTypeInfo<T, C>(type, ((Class<?>) type).getComponentType(), componentInfo);
-		}
-		throw new InvalidTypesException("The given type is not a valid object array.");
+	public static <T, C> ObjectArrayTypeInfo<T, C> getInfoFor(Class<T> arrayClass, TypeInformation<C> componentInfo) {
+		Preconditions.checkNotNull(arrayClass);
+		Preconditions.checkNotNull(componentInfo);
+		Preconditions.checkArgument(arrayClass.isArray(), "Class " + arrayClass + " must be an array.");
+
+		return new ObjectArrayTypeInfo<T, C>(arrayClass, componentInfo);
 	}
 
 	/**
@@ -170,20 +130,12 @@ public class ObjectArrayTypeInfo<T, C> extends TypeInformation<T> {
 	 * This must be used in cases where the complete type of the array is not available as a
 	 * {@link java.lang.reflect.Type} or {@link java.lang.Class}.
 	 */
+	@SuppressWarnings("unchecked")
 	public static <T, C> ObjectArrayTypeInfo<T, C> getInfoFor(TypeInformation<C> componentInfo) {
-		return new ObjectArrayTypeInfo<T, C>(
-				Array.newInstance(componentInfo.getTypeClass(), 0).getClass(),
-				componentInfo.getTypeClass(),
-				componentInfo);
-	}
+		Preconditions.checkNotNull(componentInfo);
 
-	@SuppressWarnings("unchecked")
-	public static <T, C> ObjectArrayTypeInfo<T, C> getInfoFor(Type type) {
-		// class type e.g. for POJOs
-		if (type instanceof Class<?> && ((Class<?>) type).isArray() && BasicTypeInfo.getInfoFor((Class<C>) type) == null) {
-			Class<C> array = (Class<C>) type;
-			return new ObjectArrayTypeInfo<T, C>(type, array.getComponentType());
-		}
-		throw new InvalidTypesException("The given type is not a valid object array.");
+		return new ObjectArrayTypeInfo<T, C>(
+			(Class<T>)Array.newInstance(componentInfo.getTypeClass(), 0).getClass(),
+			componentInfo);
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/b49b1552/flink-java/src/main/java/org/apache/flink/api/java/typeutils/PojoField.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/PojoField.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/PojoField.java
index 91a7a5e..4ad0ac2 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/PojoField.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/PojoField.java
@@ -23,16 +23,29 @@ import java.io.ObjectInputStream;
 import java.io.ObjectOutputStream;
 import java.io.Serializable;
 import java.lang.reflect.Field;
+import java.util.Objects;
 
+import com.google.common.base.Preconditions;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 
 public class PojoField implements Serializable {
-	public transient Field field;
-	public TypeInformation<?> type;
+
+	private static final long serialVersionUID = 1975295846436559363L;
+
+	private transient Field field;
+	private final TypeInformation<?> type;
 
 	public PojoField(Field field, TypeInformation<?> type) {
-		this.field = field;
-		this.type = type;
+		this.field = Preconditions.checkNotNull(field);
+		this.type = Preconditions.checkNotNull(type);
+	}
+
+	public Field getField() {
+		return field;
+	}
+
+	public TypeInformation<?> getTypeInformation() {
+		return type;
 	}
 
 	private void writeObject(ObjectOutputStream out)
@@ -68,4 +81,25 @@ public class PojoField implements Serializable {
 	public String toString() {
 		return "PojoField " + field.getDeclaringClass() + "." + field.getName() + " (" + type + ")";
 	}
+
+	@Override
+	public boolean equals(Object obj) {
+		if (obj instanceof PojoField) {
+			PojoField other = (PojoField) obj;
+
+			return other.canEqual(this) && type.equals(other.type) &&
+				Objects.equals(field, other.field);
+		} else {
+			return false;
+		}
+	}
+
+	@Override
+	public int hashCode() {
+		return Objects.hash(field, type);
+	}
+
+	public boolean canEqual(Object obj) {
+		return obj instanceof PojoField;
+	}
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/b49b1552/flink-java/src/main/java/org/apache/flink/api/java/typeutils/PojoTypeInfo.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/PojoTypeInfo.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/PojoTypeInfo.java
index 273a907..f7e4e42 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/PojoTypeInfo.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/PojoTypeInfo.java
@@ -22,12 +22,12 @@ import java.lang.reflect.Field;
 import java.lang.reflect.Modifier;
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.Collections;
 import java.util.Comparator;
 import java.util.List;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
+import com.google.common.base.Preconditions;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.CompositeType;
@@ -40,8 +40,6 @@ import org.apache.flink.api.java.operators.Keys.ExpressionKeys;
 
 import com.google.common.base.Joiner;
 import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 /**
  * TypeInformation for "Java Beans"-style types. Flink refers to them as POJOs,
@@ -59,8 +57,6 @@ public class PojoTypeInfo<T> extends CompositeType<T> {
 	
 	private static final long serialVersionUID = 1L;
 
-	private static final Logger LOG = LoggerFactory.getLogger(PojoTypeInfo.class);
-
 	private final static String REGEX_FIELD = "[\\p{L}_\\$][\\p{L}\\p{Digit}_\\$]*";
 	private final static String REGEX_NESTED_FIELDS = "("+REGEX_FIELD+")(\\.(.+))?";
 	private final static String REGEX_NESTED_FIELDS_WILDCARD = REGEX_NESTED_FIELDS
@@ -70,31 +66,32 @@ public class PojoTypeInfo<T> extends CompositeType<T> {
 	private static final Pattern PATTERN_NESTED_FIELDS = Pattern.compile(REGEX_NESTED_FIELDS);
 	private static final Pattern PATTERN_NESTED_FIELDS_WILDCARD = Pattern.compile(REGEX_NESTED_FIELDS_WILDCARD);
 
-	private final Class<T> typeClass;
-
-	private PojoField[] fields;
+	private final PojoField[] fields;
 	
-	private int totalFields;
+	private final int totalFields;
 
 	public PojoTypeInfo(Class<T> typeClass, List<PojoField> fields) {
 		super(typeClass);
-		this.typeClass = typeClass;
-		List<PojoField> tempFields = new ArrayList<PojoField>(fields);
-		Collections.sort(tempFields, new Comparator<PojoField>() {
+
+		Preconditions.checkArgument(Modifier.isPublic(typeClass.getModifiers()),
+				"POJO " + typeClass + " is not public");
+
+		this.fields = fields.toArray(new PojoField[fields.size()]);
+
+		Arrays.sort(this.fields, new Comparator<PojoField>() {
 			@Override
 			public int compare(PojoField o1, PojoField o2) {
-				return o1.field.getName().compareTo(o2.field.getName());
+				return o1.getField().getName().compareTo(o2.getField().getName());
 			}
 		});
-		this.fields = tempFields.toArray(new PojoField[tempFields.size()]);
-		
-		// check if POJO is public
-		if(!Modifier.isPublic(typeClass.getModifiers())) {
-			throw new RuntimeException("POJO "+typeClass+" is not public");
-		}
+
+		int counterFields = 0;
+
 		for(PojoField field : fields) {
-			totalFields += field.type.getTotalFields();
+			counterFields += field.getTypeInformation().getTotalFields();
 		}
+
+		totalFields = counterFields;
 	}
 
 	@Override
@@ -119,11 +116,6 @@ public class PojoTypeInfo<T> extends CompositeType<T> {
 	}
 
 	@Override
-	public Class<T> getTypeClass() {
-		return typeClass;
-	}
-
-	@Override
 	public boolean isSortKeyType() {
 		// Support for sorting POJOs that implement Comparable is not implemented yet.
 		// Since the order of fields in a POJO type is not well defined, sorting on fields
@@ -145,12 +137,16 @@ public class PojoTypeInfo<T> extends CompositeType<T> {
 			// handle select all
 			int keyPosition = 0;
 			for(PojoField pField : fields) {
-				if(pField.type instanceof CompositeType) {
-					CompositeType<?> cType = (CompositeType<?>)pField.type;
+				if(pField.getTypeInformation() instanceof CompositeType) {
+					CompositeType<?> cType = (CompositeType<?>)pField.getTypeInformation();
 					cType.getFlatFields(String.valueOf(ExpressionKeys.SELECT_ALL_CHAR), offset + keyPosition, result);
 					keyPosition += cType.getTotalFields()-1;
 				} else {
-					result.add(new NamedFlatFieldDescriptor(pField.field.getName(), offset + keyPosition, pField.type));
+					result.add(
+						new NamedFlatFieldDescriptor(
+							pField.getField().getName(),
+							offset + keyPosition,
+							pField.getTypeInformation()));
 				}
 				keyPosition++;
 			}
@@ -163,9 +159,9 @@ public class PojoTypeInfo<T> extends CompositeType<T> {
 		int fieldPos = -1;
 		TypeInformation<?> fieldType = null;
 		for (int i = 0; i < fields.length; i++) {
-			if (fields[i].field.getName().equals(field)) {
+			if (fields[i].getField().getName().equals(field)) {
 				fieldPos = i;
-				fieldType = fields[i].type;
+				fieldType = fields[i].getTypeInformation();
 				break;
 			}
 		}
@@ -181,7 +177,6 @@ public class PojoTypeInfo<T> extends CompositeType<T> {
 				}
 				// add all fields of composite type
 				((CompositeType<?>) fieldType).getFlatFields("*", offset, result);
-				return;
 			} else {
 				// we found the field to add
 				// compute flat field position by adding skipped fields
@@ -190,8 +185,6 @@ public class PojoTypeInfo<T> extends CompositeType<T> {
 					flatFieldPos += this.getTypeAt(i).getTotalFields();
 				}
 				result.add(new FlatFieldDescriptor(flatFieldPos, fieldType));
-				// nothing left to do
-				return;
 			}
 		} else {
 			if(fieldType instanceof CompositeType<?>) {
@@ -200,8 +193,6 @@ public class PojoTypeInfo<T> extends CompositeType<T> {
 					offset += this.getTypeAt(i).getTotalFields();
 				}
 				((CompositeType<?>) fieldType).getFlatFields(tail, offset, result);
-				// nothing left to do
-				return;
 			} else {
 				throw new InvalidFieldReferenceException("Nested field expression \""+tail+"\" not possible on atomic type "+fieldType+".");
 			}
@@ -226,9 +217,9 @@ public class PojoTypeInfo<T> extends CompositeType<T> {
 		int fieldPos = -1;
 		TypeInformation<?> fieldType = null;
 		for (int i = 0; i < fields.length; i++) {
-			if (fields[i].field.getName().equals(field)) {
+			if (fields[i].getField().getName().equals(field)) {
 				fieldPos = i;
-				fieldType = fields[i].type;
+				fieldType = fields[i].getTypeInformation();
 				break;
 			}
 		}
@@ -255,10 +246,15 @@ public class PojoTypeInfo<T> extends CompositeType<T> {
 			throw new IndexOutOfBoundsException();
 		}
 		@SuppressWarnings("unchecked")
-		TypeInformation<X> typed = (TypeInformation<X>) fields[pos].type;
+		TypeInformation<X> typed = (TypeInformation<X>) fields[pos].getTypeInformation();
 		return typed;
 	}
 
+	@Override
+	protected TypeComparatorBuilder<T> createTypeComparatorBuilder() {
+		return new PojoTypeComparatorBuilder();
+	}
+
 	// used for testing. Maybe use mockito here
 	public PojoField getPojoFieldAt(int pos) {
 		if (pos < 0 || pos >= this.fields.length) {
@@ -267,42 +263,10 @@ public class PojoTypeInfo<T> extends CompositeType<T> {
 		return this.fields[pos];
 	}
 
-	/**
-	 * Comparator creation
-	 */
-	private TypeComparator<?>[] fieldComparators;
-	private Field[] keyFields;
-	private int comparatorHelperIndex = 0;
-	@Override
-	protected void initializeNewComparator(int keyCount) {
-		fieldComparators = new TypeComparator<?>[keyCount];
-		keyFields = new Field[keyCount];
-		comparatorHelperIndex = 0;
-	}
-
-	@Override
-	protected void addCompareField(int fieldId, TypeComparator<?> comparator) {
-		fieldComparators[comparatorHelperIndex] = comparator;
-		keyFields[comparatorHelperIndex] = fields[fieldId].field;
-		comparatorHelperIndex++;
-	}
-
-	@Override
-	protected TypeComparator<T> getNewComparator(ExecutionConfig config) {
-		// first remove the null array fields
-		final Field[] finalKeyFields = Arrays.copyOf(keyFields, comparatorHelperIndex);
-		@SuppressWarnings("rawtypes")
-		final TypeComparator[] finalFieldComparators = Arrays.copyOf(fieldComparators, comparatorHelperIndex);
-		if(finalFieldComparators.length == 0 || finalKeyFields.length == 0 ||  finalFieldComparators.length != finalKeyFields.length) {
-			throw new IllegalArgumentException("Pojo comparator creation has a bug");
-		}
-		return new PojoComparator<T>(finalKeyFields, finalFieldComparators, createSerializer(config), typeClass);
-	}
-
 	public String[] getFieldNames() {
 		String[] result = new String[fields.length];
 		for (int i = 0; i < fields.length; i++) {
-			result[i] = fields[i].field.getName();
+			result[i] = fields[i].getField().getName();
 		}
 		return result;
 	}
@@ -310,7 +274,7 @@ public class PojoTypeInfo<T> extends CompositeType<T> {
 	@Override
 	public int getFieldIndex(String fieldName) {
 		for (int i = 0; i < fields.length; i++) {
-			if (fields[i].field.getName().equals(fieldName)) {
+			if (fields[i].getField().getName().equals(fieldName)) {
 				return i;
 			}
 		}
@@ -320,46 +284,106 @@ public class PojoTypeInfo<T> extends CompositeType<T> {
 	@Override
 	public TypeSerializer<T> createSerializer(ExecutionConfig config) {
 		if(config.isForceKryoEnabled()) {
-			return new KryoSerializer<T>(this.typeClass, config);
+			return new KryoSerializer<T>(getTypeClass(), config);
 		}
 		if(config.isForceAvroEnabled()) {
-			return new AvroSerializer<T>(this.typeClass);
+			return new AvroSerializer<T>(getTypeClass());
 		}
 
 		TypeSerializer<?>[] fieldSerializers = new TypeSerializer<?>[fields.length ];
 		Field[] reflectiveFields = new Field[fields.length];
 
 		for (int i = 0; i < fields.length; i++) {
-			fieldSerializers[i] = fields[i].type.createSerializer(config);
-			reflectiveFields[i] = fields[i].field;
+			fieldSerializers[i] = fields[i].getTypeInformation().createSerializer(config);
+			reflectiveFields[i] = fields[i].getField();
 		}
 
-		return new PojoSerializer<T>(this.typeClass, fieldSerializers, reflectiveFields, config);
+		return new PojoSerializer<T>(getTypeClass(), fieldSerializers, reflectiveFields, config);
 	}
-
-	// --------------------------------------------------------------------------------------------
 	
 	@Override
 	public boolean equals(Object obj) {
-		return (obj instanceof PojoTypeInfo) && ((PojoTypeInfo<?>) obj).typeClass == this.typeClass;
+		if (obj instanceof PojoTypeInfo) {
+			@SuppressWarnings("unchecked")
+			PojoTypeInfo<T> pojoTypeInfo = (PojoTypeInfo<T>)obj;
+
+			return pojoTypeInfo.canEqual(this) &&
+				super.equals(pojoTypeInfo) &&
+				Arrays.equals(fields, pojoTypeInfo.fields) &&
+				totalFields == pojoTypeInfo.totalFields;
+		} else {
+			return false;
+		}
 	}
 	
 	@Override
 	public int hashCode() {
-		return typeClass.hashCode() + 1387562934;
+		return 31 * (31 * Arrays.hashCode(fields) + totalFields) + super.hashCode();
+	}
+
+	@Override
+	public boolean canEqual(Object obj) {
+		return obj instanceof PojoTypeInfo;
 	}
 	
 	@Override
 	public String toString() {
 		List<String> fieldStrings = new ArrayList<String>();
 		for (PojoField field : fields) {
-			fieldStrings.add(field.field.getName() + ": " + field.type.toString());
+			fieldStrings.add(field.getField().getName() + ": " + field.getTypeInformation().toString());
 		}
-		return "PojoType<" + typeClass.getName()
+		return "PojoType<" + getTypeClass().getName()
 				+ ", fields = [" + Joiner.on(", ").join(fieldStrings) + "]"
 				+ ">";
 	}
 
+	// --------------------------------------------------------------------------------------------
+
+	private class PojoTypeComparatorBuilder implements TypeComparatorBuilder<T> {
+
+		private ArrayList<TypeComparator> fieldComparators;
+		private ArrayList<Field> keyFields;
+
+		public PojoTypeComparatorBuilder() {
+			fieldComparators = new ArrayList<TypeComparator>();
+			keyFields = new ArrayList<Field>();
+		}
+
+
+		@Override
+		public void initializeTypeComparatorBuilder(int size) {
+			fieldComparators.ensureCapacity(size);
+			keyFields.ensureCapacity(size);
+		}
+
+		@Override
+		public void addComparatorField(int fieldId, TypeComparator<?> comparator) {
+			fieldComparators.add(comparator);
+			keyFields.add(fields[fieldId].getField());
+		}
+
+		@Override
+		public TypeComparator<T> createTypeComparator(ExecutionConfig config) {
+			Preconditions.checkState(
+				keyFields.size() > 0,
+				"No keys were defined for the PojoTypeComparatorBuilder.");
+
+			Preconditions.checkState(
+				fieldComparators.size() > 0,
+				"No type comparators were defined for the PojoTypeComparatorBuilder.");
+
+			Preconditions.checkState(
+				keyFields.size() == fieldComparators.size(),
+				"Number of key fields and field comparators is not equal.");
+
+			return new PojoComparator<T>(
+				keyFields.toArray(new Field[keyFields.size()]),
+				fieldComparators.toArray(new TypeComparator[fieldComparators.size()]),
+				createSerializer(config),
+				getTypeClass());
+		}
+	}
+
 	public static class NamedFlatFieldDescriptor extends FlatFieldDescriptor {
 
 		private String fieldName;

http://git-wip-us.apache.org/repos/asf/flink/blob/b49b1552/flink-java/src/main/java/org/apache/flink/api/java/typeutils/RecordTypeInfo.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/RecordTypeInfo.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/RecordTypeInfo.java
index 9857eb6..e9ce102 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/RecordTypeInfo.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/RecordTypeInfo.java
@@ -70,12 +70,22 @@ public class RecordTypeInfo extends TypeInformation<Record> {
 	
 	@Override
 	public int hashCode() {
-		return Record.class.hashCode() ^ 0x165667b1;
+		return Record.class.hashCode();
 	}
-	
+
+	@Override
+	public boolean canEqual(Object obj) {
+		return obj instanceof RecordTypeInfo;
+	}
+
 	@Override
 	public boolean equals(Object obj) {
-		return obj.getClass() == RecordTypeInfo.class;
+		if (obj instanceof RecordTypeInfo) {
+			RecordTypeInfo recordTypeInfo = (RecordTypeInfo) obj;
+			return recordTypeInfo.canEqual(this);
+		} else {
+			return false;
+		}
 	}
 	
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/b49b1552/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TupleTypeInfo.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TupleTypeInfo.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TupleTypeInfo.java
index 618b190..30710e5 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TupleTypeInfo.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TupleTypeInfo.java
@@ -18,8 +18,12 @@
 
 package org.apache.flink.api.java.typeutils;
 
+import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 
+import com.google.common.base.Preconditions;
+import com.google.common.primitives.Ints;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.functions.InvalidTypesException;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
@@ -52,10 +56,13 @@ public final class TupleTypeInfo<T extends Tuple> extends TupleTypeInfoBase<T> {
 
 	public TupleTypeInfo(Class<T> tupleType, TypeInformation<?>... types) {
 		super(tupleType, types);
-		if (types == null || types.length > Tuple.MAX_ARITY) {
-			throw new IllegalArgumentException();
-		}
+
+		Preconditions.checkArgument(
+			types.length <= Tuple.MAX_ARITY,
+			"The tuple type exceeds the maximum supported arity.");
+
 		this.fieldNames = new String[types.length];
+
 		for (int i = 0; i < types.length; i++) {
 			fieldNames[i] = "f" + i;
 		}
@@ -78,7 +85,7 @@ public final class TupleTypeInfo<T extends Tuple> extends TupleTypeInfoBase<T> {
 	@SuppressWarnings("unchecked")
 	@Override
 	public TupleSerializer<T> createSerializer(ExecutionConfig executionConfig) {
-		if (this.tupleType == Tuple0.class) {
+		if (getTypeClass() == Tuple0.class) {
 			return (TupleSerializer<T>) Tuple0Serializer.INSTANCE;
 		}
 
@@ -91,48 +98,65 @@ public final class TupleTypeInfo<T extends Tuple> extends TupleTypeInfoBase<T> {
 		
 		return new TupleSerializer<T>(tupleClass, fieldSerializers);
 	}
-	
-	/**
-	 * Comparator creation
-	 */
-	private TypeComparator<?>[] fieldComparators;
-	private int[] logicalKeyFields;
-	private int comparatorHelperIndex = 0;
-	
-	@Override
-	protected void initializeNewComparator(int localKeyCount) {
-		fieldComparators = new TypeComparator<?>[localKeyCount];
-		logicalKeyFields = new int[localKeyCount];
-		comparatorHelperIndex = 0;
-	}
 
 	@Override
-	protected void addCompareField(int fieldId, TypeComparator<?> comparator) {
-		fieldComparators[comparatorHelperIndex] = comparator;
-		logicalKeyFields[comparatorHelperIndex] = fieldId;
-		comparatorHelperIndex++;
+	protected TypeComparatorBuilder<T> createTypeComparatorBuilder() {
+		return new TupleTypeComparatorBuilder();
 	}
 
-	@Override
-	protected TypeComparator<T> getNewComparator(ExecutionConfig executionConfig) {
-		@SuppressWarnings("rawtypes")
-		final TypeComparator[] finalFieldComparators = Arrays.copyOf(fieldComparators, comparatorHelperIndex);
-		final int[] finalLogicalKeyFields = Arrays.copyOf(logicalKeyFields, comparatorHelperIndex);
-		//final TypeSerializer[] finalFieldSerializers = Arrays.copyOf(fieldSerializers, comparatorHelperIndex);
-		// create the serializers for the prefix up to highest key position
-		int maxKey = 0;
-		for(int key : finalLogicalKeyFields) {
-			maxKey = Math.max(maxKey, key);
+	private class TupleTypeComparatorBuilder implements TypeComparatorBuilder<T> {
+
+		private final ArrayList<TypeComparator> fieldComparators = new ArrayList<TypeComparator>();
+		private final ArrayList<Integer> logicalKeyFields = new ArrayList<Integer>();
+
+		@Override
+		public void initializeTypeComparatorBuilder(int size) {
+			fieldComparators.ensureCapacity(size);
+			logicalKeyFields.ensureCapacity(size);
 		}
-		TypeSerializer<?>[] fieldSerializers = new TypeSerializer<?>[maxKey + 1];
-		for (int i = 0; i <= maxKey; i++) {
-			fieldSerializers[i] = types[i].createSerializer(executionConfig);
+
+		@Override
+		public void addComparatorField(int fieldId, TypeComparator<?> comparator) {
+			fieldComparators.add(comparator);
+			logicalKeyFields.add(fieldId);
 		}
-		if(finalFieldComparators.length == 0 || finalLogicalKeyFields.length == 0 || fieldSerializers.length == 0 
-				|| finalFieldComparators.length != finalLogicalKeyFields.length) {
-			throw new IllegalArgumentException("Tuple comparator creation has a bug");
+
+		@Override
+		public TypeComparator<T> createTypeComparator(ExecutionConfig config) {
+			Preconditions.checkState(
+				fieldComparators.size() > 0,
+				"No field comparators were defined for the TupleTypeComparatorBuilder."
+			);
+
+			Preconditions.checkState(
+				logicalKeyFields.size() > 0,
+				"No key fields were defined for the TupleTypeComparatorBuilder."
+			);
+
+			Preconditions.checkState(
+				fieldComparators.size() == logicalKeyFields.size(),
+				"The number of field comparators and key fields is not equal."
+			);
+
+			final int maxKey = Collections.max(logicalKeyFields);
+
+			Preconditions.checkState(
+				maxKey >= 0,
+				"The maximum key field must be greater or equal than 0."
+			);
+
+			TypeSerializer<?>[] fieldSerializers = new TypeSerializer<?>[maxKey + 1];
+
+			for (int i = 0; i <= maxKey; i++) {
+				fieldSerializers[i] = types[i].createSerializer(config);
+			}
+
+			return new TupleComparator<T>(
+				Ints.toArray(logicalKeyFields),
+				fieldComparators.toArray(new TypeComparator[fieldComparators.size()]),
+				fieldSerializers
+			);
 		}
-		return new TupleComparator<T>(finalLogicalKeyFields, finalFieldComparators, fieldSerializers);
 	}
 	
 	// --------------------------------------------------------------------------------------------
@@ -142,17 +166,22 @@ public final class TupleTypeInfo<T extends Tuple> extends TupleTypeInfoBase<T> {
 		if (obj instanceof TupleTypeInfo) {
 			@SuppressWarnings("unchecked")
 			TupleTypeInfo<T> other = (TupleTypeInfo<T>) obj;
-			return ((this.tupleType == null && other.tupleType == null) || this.tupleType.equals(other.tupleType)) &&
-					Arrays.deepEquals(this.types, other.types);
-			
+			return other.canEqual(this) &&
+				super.equals(other) &&
+				Arrays.equals(fieldNames, other.fieldNames);
 		} else {
 			return false;
 		}
 	}
+
+	@Override
+	public boolean canEqual(Object obj) {
+		return obj instanceof TupleTypeInfo;
+	}
 	
 	@Override
 	public int hashCode() {
-		return this.types.hashCode() ^ Arrays.deepHashCode(this.types);
+		return 31 * super.hashCode() + Arrays.hashCode(fieldNames);
 	}
 	
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/b49b1552/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TupleTypeInfoBase.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TupleTypeInfoBase.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TupleTypeInfoBase.java
index a2d937f..469476f 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TupleTypeInfoBase.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TupleTypeInfoBase.java
@@ -23,6 +23,7 @@ import java.util.List;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
+import com.google.common.base.Preconditions;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.CompositeType;
 import org.apache.flink.api.java.operators.Keys.ExpressionKeys;
@@ -45,17 +46,20 @@ public abstract class TupleTypeInfoBase<T> extends CompositeType<T> {
 	
 	protected final TypeInformation<?>[] types;
 	
-	protected final Class<T> tupleType;
-
-	private int totalFields;
+	private final int totalFields;
 
 	public TupleTypeInfoBase(Class<T> tupleType, TypeInformation<?>... types) {
 		super(tupleType);
-		this.tupleType = tupleType;
-		this.types = types;
+
+		this.types = Preconditions.checkNotNull(types);
+
+		int fieldCounter = 0;
+
 		for(TypeInformation<?> type : types) {
-			totalFields += type.getTotalFields();
+			fieldCounter += type.getTotalFields();
 		}
+
+		totalFields = fieldCounter;
 	}
 
 	@Override
@@ -83,11 +87,6 @@ public abstract class TupleTypeInfoBase<T> extends CompositeType<T> {
 	}
 
 	@Override
-	public Class<T> getTypeClass() {
-		return tupleType;
-	}
-
-	@Override
 	public void getFlatFields(String fieldExpression, int offset, List<FlatFieldDescriptor> result) {
 
 		Matcher matcher = PATTERN_NESTED_FIELDS_WILDCARD.matcher(fieldExpression);
@@ -109,53 +108,49 @@ public abstract class TupleTypeInfoBase<T> extends CompositeType<T> {
 				}
 				keyPosition++;
 			}
-			return;
-		}
+		} else {
+			String fieldStr = matcher.group(1);
+			Matcher fieldMatcher = PATTERN_FIELD.matcher(fieldStr);
 
-		String fieldStr = matcher.group(1);
-		Matcher fieldMatcher = PATTERN_FIELD.matcher(fieldStr);
-		if (!fieldMatcher.matches()) {
-			throw new RuntimeException("Invalid matcher pattern");
-		}
-		field = fieldMatcher.group(2);
-		int fieldPos = Integer.valueOf(field);
+			if (!fieldMatcher.matches()) {
+				throw new RuntimeException("Invalid matcher pattern");
+			}
 
-		if (fieldPos >= this.getArity()) {
-			throw new InvalidFieldReferenceException("Tuple field expression \"" + fieldStr + "\" out of bounds of " + this.toString() + ".");
-		}
-		TypeInformation<?> fieldType = this.getTypeAt(fieldPos);
-		String tail = matcher.group(5);
-		if(tail == null) {
-			if(fieldType instanceof CompositeType) {
-				// forward offsets
-				for(int i=0; i<fieldPos; i++) {
-					offset += this.getTypeAt(i).getTotalFields();
-				}
-				// add all fields of composite type
-				((CompositeType<?>) fieldType).getFlatFields("*", offset, result);
-				return;
-			} else {
-				// we found the field to add
-				// compute flat field position by adding skipped fields
-				int flatFieldPos = offset;
-				for(int i=0; i<fieldPos; i++) {
-					flatFieldPos += this.getTypeAt(i).getTotalFields();
-				}
-				result.add(new FlatFieldDescriptor(flatFieldPos, fieldType));
-				// nothing left to do
-				return;
+			field = fieldMatcher.group(2);
+			int fieldPos = Integer.valueOf(field);
+
+			if (fieldPos >= this.getArity()) {
+				throw new InvalidFieldReferenceException("Tuple field expression \"" + fieldStr + "\" out of bounds of " + this.toString() + ".");
 			}
-		} else {
-			if(fieldType instanceof CompositeType<?>) {
-				// forward offset
-				for(int i=0; i<fieldPos; i++) {
-					offset += this.getTypeAt(i).getTotalFields();
+			TypeInformation<?> fieldType = this.getTypeAt(fieldPos);
+			String tail = matcher.group(5);
+			if (tail == null) {
+				if (fieldType instanceof CompositeType) {
+					// forward offsets
+					for (int i = 0; i < fieldPos; i++) {
+						offset += this.getTypeAt(i).getTotalFields();
+					}
+					// add all fields of composite type
+					((CompositeType<?>) fieldType).getFlatFields("*", offset, result);
+				} else {
+					// we found the field to add
+					// compute flat field position by adding skipped fields
+					int flatFieldPos = offset;
+					for (int i = 0; i < fieldPos; i++) {
+						flatFieldPos += this.getTypeAt(i).getTotalFields();
+					}
+					result.add(new FlatFieldDescriptor(flatFieldPos, fieldType));
 				}
-				((CompositeType<?>) fieldType).getFlatFields(tail, offset, result);
-				// nothing left to do
-				return;
 			} else {
-				throw new InvalidFieldReferenceException("Nested field expression \""+tail+"\" not possible on atomic type "+fieldType+".");
+				if (fieldType instanceof CompositeType<?>) {
+					// forward offset
+					for (int i = 0; i < fieldPos; i++) {
+						offset += this.getTypeAt(i).getTotalFields();
+					}
+					((CompositeType<?>) fieldType).getFlatFields(tail, offset, result);
+				} else {
+					throw new InvalidFieldReferenceException("Nested field expression \"" + tail + "\" not possible on atomic type " + fieldType + ".");
+				}
 			}
 		}
 	}
@@ -213,17 +208,24 @@ public abstract class TupleTypeInfoBase<T> extends CompositeType<T> {
 		if (obj instanceof TupleTypeInfoBase) {
 			@SuppressWarnings("unchecked")
 			TupleTypeInfoBase<T> other = (TupleTypeInfoBase<T>) obj;
-			return ((this.tupleType == null && other.tupleType == null) || this.tupleType.equals(other.tupleType)) &&
-					Arrays.deepEquals(this.types, other.types);
-			
+
+			return other.canEqual(this) &&
+				super.equals(other) &&
+				Arrays.equals(types, other.types) &&
+				totalFields == other.totalFields;
 		} else {
 			return false;
 		}
 	}
+
+	@Override
+	public boolean canEqual(Object obj) {
+		return obj instanceof TupleTypeInfoBase;
+	}
 	
 	@Override
 	public int hashCode() {
-		return this.types.hashCode() ^ Arrays.deepHashCode(this.types);
+		return 31 * (31 * super.hashCode() + Arrays.hashCode(types)) + totalFields;
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/b49b1552/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
index 8f5d599..0196b5d 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
@@ -526,12 +526,33 @@ public class TypeExtractor {
 				} catch (ClassNotFoundException e) {
 					throw new InvalidTypesException("Could not convert GenericArrayType to Class.");
 				}
+
 				return getForClass(classArray);
+			} else {
+				TypeInformation<?> componentInfo = createTypeInfoWithTypeHierarchy(
+					typeHierarchy,
+					genericArray.getGenericComponentType(),
+					in1Type,
+					in2Type);
+
+				Class<OUT> classArray;
+
+				try {
+					String componentClassName = componentInfo.getTypeClass().getName();
+					String resultingClassName;
+
+					if (componentClassName.startsWith("[")) {
+						resultingClassName = "[" + componentClassName;
+					} else {
+						resultingClassName = "[L" + componentClassName + ";";
+					}
+					classArray = (Class<OUT>) Class.forName(resultingClassName);
+				} catch (ClassNotFoundException e) {
+					throw new InvalidTypesException("Could not convert GenericArrayType to Class.");
+				}
+
+				return ObjectArrayTypeInfo.getInfoFor(classArray, componentInfo);
 			}
-			
-			TypeInformation<?> componentInfo = createTypeInfoWithTypeHierarchy(typeHierarchy, genericArray.getGenericComponentType(),
-					in1Type, in2Type);
-			return ObjectArrayTypeInfo.getInfoFor(t, componentInfo);
 		}
 		// objects with generics are treated as Class first
 		else if (t instanceof ParameterizedType) {
@@ -1188,7 +1209,13 @@ public class TypeExtractor {
 			
 			// object arrays
 			else {
-				return ObjectArrayTypeInfo.getInfoFor(clazz);
+				TypeInformation<?> componentTypeInfo = createTypeInfoWithTypeHierarchy(
+					typeHierarchy,
+					clazz.getComponentType(),
+					in1Type,
+					in2Type);
+
+				return ObjectArrayTypeInfo.getInfoFor(clazz, componentTypeInfo);
 			}
 		}
 		
@@ -1481,8 +1508,8 @@ public class TypeExtractor {
 	private static TypeInformation<?> getTypeOfPojoField(TypeInformation<?> pojoInfo, Field field) {
 		for (int j = 0; j < pojoInfo.getArity(); j++) {
 			PojoField pf = ((PojoTypeInfo<?>) pojoInfo).getPojoFieldAt(j);
-			if (pf.field.getName().equals(field.getName())) {
-				return pf.type;
+			if (pf.getField().getName().equals(field.getName())) {
+				return pf.getTypeInformation();
 			}
 		}
 		return null;

http://git-wip-us.apache.org/repos/asf/flink/blob/b49b1552/flink-java/src/main/java/org/apache/flink/api/java/typeutils/ValueTypeInfo.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/ValueTypeInfo.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/ValueTypeInfo.java
index e61acd8..0b4823e 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/ValueTypeInfo.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/ValueTypeInfo.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.api.java.typeutils;
 
+import com.google.common.base.Preconditions;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.functions.InvalidTypesException;
 import org.apache.flink.api.common.typeinfo.AtomicType;
@@ -52,17 +53,13 @@ public class ValueTypeInfo<T extends Value> extends TypeInformation<T> implement
 	private static final long serialVersionUID = 1L;
 	
 	private final Class<T> type;
-
 	
 	public ValueTypeInfo(Class<T> type) {
-		if (type == null) {
-			throw new NullPointerException();
-		}
-		if (!Value.class.isAssignableFrom(type) && !type.equals(Value.class)) {
-			throw new IllegalArgumentException("ValueTypeInfo can only be used for subclasses of " + Value.class.getName());
-		}
-		
-		this.type = type;
+		this.type = Preconditions.checkNotNull(type);
+
+		Preconditions.checkArgument(
+			Value.class.isAssignableFrom(type) || type.equals(Value.class),
+			"ValueTypeInfo can only be used for subclasses of " + Value.class.getName());
 	}
 	
 	@Override
@@ -136,17 +133,26 @@ public class ValueTypeInfo<T extends Value> extends TypeInformation<T> implement
 	
 	@Override
 	public int hashCode() {
-		return this.type.hashCode() ^ 0xd3a2646c;
+		return this.type.hashCode();
 	}
 	
 	@Override
 	public boolean equals(Object obj) {
-		if (obj.getClass() == ValueTypeInfo.class) {
-			return type == ((ValueTypeInfo<?>) obj).type;
+		if (obj instanceof ValueTypeInfo) {
+			@SuppressWarnings("unchecked")
+			ValueTypeInfo<T> valueTypeInfo = (ValueTypeInfo<T>) obj;
+
+			return valueTypeInfo.canEqual(this) &&
+				type == valueTypeInfo.type;
 		} else {
 			return false;
 		}
 	}
+
+	@Override
+	public boolean canEqual(Object obj) {
+		return obj instanceof ValueTypeInfo;
+	}
 	
 	@Override
 	public String toString() {
@@ -155,7 +161,7 @@ public class ValueTypeInfo<T extends Value> extends TypeInformation<T> implement
 	
 	// --------------------------------------------------------------------------------------------
 	
-	static final <X extends Value> TypeInformation<X> getValueTypeInfo(Class<X> typeClass) {
+	static <X extends Value> TypeInformation<X> getValueTypeInfo(Class<X> typeClass) {
 		if (Value.class.isAssignableFrom(typeClass) && !typeClass.equals(Value.class)) {
 			return new ValueTypeInfo<X>(typeClass);
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/b49b1552/flink-java/src/main/java/org/apache/flink/api/java/typeutils/WritableTypeInfo.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/WritableTypeInfo.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/WritableTypeInfo.java
index a91b888..6c140d9 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/WritableTypeInfo.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/WritableTypeInfo.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.api.java.typeutils;
 
+import com.google.common.base.Preconditions;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.functions.InvalidTypesException;
 import org.apache.flink.api.common.typeinfo.AtomicType;
@@ -41,13 +42,11 @@ public class WritableTypeInfo<T extends Writable> extends TypeInformation<T> imp
 	private final Class<T> typeClass;
 	
 	public WritableTypeInfo(Class<T> typeClass) {
-		if (typeClass == null) {
-			throw new NullPointerException();
-		}
-		if (!Writable.class.isAssignableFrom(typeClass) || typeClass == Writable.class) {
-			throw new IllegalArgumentException("WritableTypeInfo can only be used for subclasses of " + Writable.class.getName());
-		}
-		this.typeClass = typeClass;
+		this.typeClass = Preconditions.checkNotNull(typeClass);
+
+		Preconditions.checkArgument(
+			Writable.class.isAssignableFrom(typeClass) && !typeClass.equals(Writable.class),
+			"WritableTypeInfo can only be used for subclasses of " + Writable.class.getName());
 	}
 
 	@SuppressWarnings({ "rawtypes", "unchecked" })
@@ -104,12 +103,26 @@ public class WritableTypeInfo<T extends Writable> extends TypeInformation<T> imp
 	
 	@Override
 	public int hashCode() {
-		return typeClass.hashCode() ^ 0xd3a2646c;
+		return typeClass.hashCode();
 	}
 	
 	@Override
 	public boolean equals(Object obj) {
-		return obj.getClass() == WritableTypeInfo.class && typeClass == ((WritableTypeInfo<?>) obj).typeClass;
+		if (obj instanceof WritableTypeInfo) {
+			@SuppressWarnings("unchecked")
+			WritableTypeInfo<T> writableTypeInfo = (WritableTypeInfo<T>) obj;
+
+			return writableTypeInfo.canEqual(this) &&
+				typeClass == writableTypeInfo.typeClass;
+
+		} else {
+			return false;
+		}
+	}
+
+	@Override
+	public boolean canEqual(Object obj) {
+		return obj instanceof WritableTypeInfo;
 	}
 	
 	// --------------------------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/b49b1552/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/AvroSerializer.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/AvroSerializer.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/AvroSerializer.java
index 34dc500..26bf4ce 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/AvroSerializer.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/AvroSerializer.java
@@ -25,6 +25,7 @@ import java.io.IOException;
 import com.esotericsoftware.kryo.KryoException;
 import com.esotericsoftware.kryo.io.Input;
 import com.esotericsoftware.kryo.io.Output;
+import com.google.common.base.Preconditions;
 import org.apache.avro.generic.GenericData;
 import org.apache.avro.reflect.ReflectDatumReader;
 import org.apache.avro.reflect.ReflectDatumWriter;
@@ -69,14 +70,10 @@ public final class AvroSerializer<T> extends TypeSerializer<T> {
 	}
 	
 	public AvroSerializer(Class<T> type, Class<? extends T> typeToInstantiate) {
-		if (type == null || typeToInstantiate == null) {
-			throw new NullPointerException();
-		}
+		this.type = Preconditions.checkNotNull(type);
+		this.typeToInstantiate = Preconditions.checkNotNull(typeToInstantiate);
 		
 		InstantiationUtil.checkForInstantiation(typeToInstantiate);
-		
-		this.type = type;
-		this.typeToInstantiate = typeToInstantiate;
 	}
 
 	// --------------------------------------------------------------------------------------------
@@ -192,16 +189,25 @@ public final class AvroSerializer<T> extends TypeSerializer<T> {
 	
 	@Override
 	public int hashCode() {
-		return 0x42fba55c + this.type.hashCode() + this.typeToInstantiate.hashCode();
+		return 31 * this.type.hashCode() + this.typeToInstantiate.hashCode();
 	}
 	
 	@Override
 	public boolean equals(Object obj) {
-		if (obj.getClass() == AvroSerializer.class) {
-			AvroSerializer<?> other = (AvroSerializer<?>) obj;
-			return this.type == other.type && this.typeToInstantiate == other.typeToInstantiate;
+		if (obj instanceof AvroSerializer) {
+			@SuppressWarnings("unchecked")
+			AvroSerializer<T> avroSerializer = (AvroSerializer<T>) obj;
+
+			return avroSerializer.canEqual(this) &&
+				type == avroSerializer.type &&
+				typeToInstantiate == avroSerializer.typeToInstantiate;
 		} else {
 			return false;
 		}
 	}
+
+	@Override
+	public boolean canEqual(Object obj) {
+		return obj instanceof AvroSerializer;
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/b49b1552/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/CopyableValueSerializer.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/CopyableValueSerializer.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/CopyableValueSerializer.java
index 193d495..9e46f27 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/CopyableValueSerializer.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/CopyableValueSerializer.java
@@ -20,6 +20,7 @@ package org.apache.flink.api.java.typeutils.runtime;
 
 import java.io.IOException;
 
+import com.google.common.base.Preconditions;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
@@ -38,7 +39,7 @@ public class CopyableValueSerializer<T extends CopyableValue<T>> extends TypeSer
 	
 	
 	public CopyableValueSerializer(Class<T> valueClass) {
-		this.valueClass = valueClass;
+		this.valueClass = Preconditions.checkNotNull(valueClass);
 	}
 
 	@Override
@@ -105,16 +106,24 @@ public class CopyableValueSerializer<T extends CopyableValue<T>> extends TypeSer
 	
 	@Override
 	public int hashCode() {
-		return this.valueClass.hashCode() + 9231;
+		return this.valueClass.hashCode();
 	}
 	
 	@Override
 	public boolean equals(Object obj) {
-		if (obj.getClass() == CopyableValueSerializer.class) {
-			CopyableValueSerializer<?> other = (CopyableValueSerializer<?>) obj;
-			return this.valueClass == other.valueClass;
+		if (obj instanceof CopyableValueSerializer) {
+			@SuppressWarnings("unchecked")
+			CopyableValueSerializer<T> copyableValueSerializer = (CopyableValueSerializer<T>) obj;
+
+			return copyableValueSerializer.canEqual(this) &&
+				valueClass == copyableValueSerializer.valueClass;
 		} else {
 			return false;
 		}
 	}
+
+	@Override
+	public boolean canEqual(Object obj) {
+		return obj instanceof CopyableValueSerializer;
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/b49b1552/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java
index 5d4553d..de24956 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java
@@ -30,7 +30,9 @@ import java.util.LinkedHashMap;
 import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 
+import com.google.common.base.Preconditions;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
@@ -51,33 +53,33 @@ public final class PojoSerializer<T> extends TypeSerializer<T> {
 
 	private final Class<T> clazz;
 
-	private TypeSerializer<Object>[] fieldSerializers;
+	private final TypeSerializer<Object>[] fieldSerializers;
 
-	// We need to handle these ourselves in writeObject()/readObject()
-	private transient Field[] fields;
-
-	private int numFields;
+	private final int numFields;
 
-	private transient Map<Class<?>, TypeSerializer<?>> subclassSerializerCache;
-	private transient ClassLoader cl;
+	private final Map<Class<?>, Integer> registeredClasses;
 
-	private Map<Class<?>, Integer> registeredClasses;
-
-	private TypeSerializer<?>[] registeredSerializers;
+	private final TypeSerializer<?>[] registeredSerializers;
 
 	private final ExecutionConfig executionConfig;
 
+	private transient Map<Class<?>, TypeSerializer<?>> subclassSerializerCache;
+	private transient ClassLoader cl;
+	// We need to handle these ourselves in writeObject()/readObject()
+	private transient Field[] fields;
+
 	@SuppressWarnings("unchecked")
 	public PojoSerializer(
 			Class<T> clazz,
 			TypeSerializer<?>[] fieldSerializers,
 			Field[] fields,
 			ExecutionConfig executionConfig) {
-		this.clazz = clazz;
-		this.fieldSerializers = (TypeSerializer<Object>[]) fieldSerializers;
-		this.fields = fields;
+
+		this.clazz = Preconditions.checkNotNull(clazz);
+		this.fieldSerializers = (TypeSerializer<Object>[]) Preconditions.checkNotNull(fieldSerializers);
+		this.fields = Preconditions.checkNotNull(fields);
 		this.numFields = fieldSerializers.length;
-		this.executionConfig = executionConfig;
+		this.executionConfig = Preconditions.checkNotNull(executionConfig);
 
 		LinkedHashSet<Class<?>> registeredPojoTypes = executionConfig.getRegisteredPojoTypes();
 
@@ -563,23 +565,28 @@ public final class PojoSerializer<T> extends TypeSerializer<T> {
 	
 	@Override
 	public int hashCode() {
-		int hashCode = numFields * 47;
-		for (TypeSerializer<?> ser : this.fieldSerializers) {
-			hashCode = (hashCode << 7) | (hashCode >>> -7);
-			hashCode += ser.hashCode();
-		}
-		return hashCode;
+		return 31 * (31 * Arrays.hashCode(fieldSerializers) + Arrays.hashCode(registeredSerializers)) +
+			Objects.hash(clazz, numFields, registeredClasses);
 	}
 	
 	@Override
 	public boolean equals(Object obj) {
-		if (obj != null && obj instanceof PojoSerializer) {
-			PojoSerializer<?> otherTS = (PojoSerializer<?>) obj;
-			return (otherTS.clazz == this.clazz) &&
-					Arrays.deepEquals(this.fieldSerializers, otherTS.fieldSerializers);
-		}
-		else {
+		if (obj instanceof PojoSerializer) {
+			PojoSerializer<?> other = (PojoSerializer<?>) obj;
+
+			return other.canEqual(this) &&
+				clazz == other.clazz &&
+				Arrays.equals(fieldSerializers, other.fieldSerializers) &&
+				Arrays.equals(registeredSerializers, other.registeredSerializers) &&
+				numFields == other.numFields &&
+				registeredClasses.equals(other.registeredClasses);
+		} else {
 			return false;
 		}
 	}
+
+	@Override
+	public boolean canEqual(Object obj) {
+		return obj instanceof PojoSerializer;
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/b49b1552/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/Tuple0Serializer.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/Tuple0Serializer.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/Tuple0Serializer.java
index 246cecf..a06ff1a 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/Tuple0Serializer.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/Tuple0Serializer.java
@@ -13,6 +13,7 @@
 package org.apache.flink.api.java.typeutils.runtime;
 
 import java.io.IOException;
+
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.tuple.Tuple0;
 import org.apache.flink.core.memory.DataInputView;
@@ -94,12 +95,23 @@ public class Tuple0Serializer extends TupleSerializer<Tuple0> {
 
 	@Override
 	public int hashCode() {
-		return 1837461876;
+		return Tuple0Serializer.class.hashCode();
 	}
 
 	@Override
 	public boolean equals(Object obj) {
-		return obj == this || obj instanceof Tuple0Serializer;
+		if (obj instanceof Tuple0Serializer) {
+			Tuple0Serializer other = (Tuple0Serializer) obj;
+
+			return other.canEqual(this);
+		} else {
+			return false;
+		}
+	}
+
+	@Override
+	public boolean canEqual(Object obj) {
+		return obj instanceof Tuple0Serializer;
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/b49b1552/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerBase.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerBase.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerBase.java
index f041736..bf3c7a1 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerBase.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerBase.java
@@ -18,12 +18,14 @@
 
 package org.apache.flink.api.java.typeutils.runtime;
 
+import com.google.common.base.Preconditions;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
 
 import java.io.IOException;
 import java.util.Arrays;
+import java.util.Objects;
 
 
 public abstract class TupleSerializerBase<T> extends TypeSerializer<T> {
@@ -32,14 +34,14 @@ public abstract class TupleSerializerBase<T> extends TypeSerializer<T> {
 
 	protected final Class<T> tupleClass;
 
-	protected TypeSerializer<Object>[] fieldSerializers;
+	protected final TypeSerializer<Object>[] fieldSerializers;
 
 	protected final int arity;
 
 	@SuppressWarnings("unchecked")
 	public TupleSerializerBase(Class<T> tupleClass, TypeSerializer<?>[] fieldSerializers) {
-		this.tupleClass = tupleClass;
-		this.fieldSerializers = (TypeSerializer<Object>[]) fieldSerializers;
+		this.tupleClass = Preconditions.checkNotNull(tupleClass);
+		this.fieldSerializers = (TypeSerializer<Object>[]) Preconditions.checkNotNull(fieldSerializers);
 		this.arity = fieldSerializers.length;
 	}
 	
@@ -74,23 +76,25 @@ public abstract class TupleSerializerBase<T> extends TypeSerializer<T> {
 	
 	@Override
 	public int hashCode() {
-		int hashCode = arity * 47;
-		for (TypeSerializer<?> ser : this.fieldSerializers) {
-			hashCode = (hashCode << 7) | (hashCode >>> -7);
-			hashCode += ser.hashCode();
-		}
-		return hashCode;
+		return 31 * Arrays.hashCode(fieldSerializers) + Objects.hash(tupleClass, arity);
 	}
 	
 	@Override
 	public boolean equals(Object obj) {
-		if (obj != null && obj instanceof TupleSerializerBase) {
-			TupleSerializerBase<?> otherTS = (TupleSerializerBase<?>) obj;
-			return (otherTS.tupleClass == this.tupleClass) && 
-					Arrays.deepEquals(this.fieldSerializers, otherTS.fieldSerializers);
-		}
-		else {
+		if (obj instanceof TupleSerializerBase) {
+			TupleSerializerBase<?> other = (TupleSerializerBase<?>) obj;
+
+			return other.canEqual(this) &&
+				tupleClass == other.tupleClass &&
+				Arrays.equals(fieldSerializers, other.fieldSerializers) &&
+				arity == other.arity;
+		} else {
 			return false;
 		}
 	}
+
+	@Override
+	public boolean canEqual(Object obj) {
+		return obj instanceof TupleSerializerBase;
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/b49b1552/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/ValueSerializer.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/ValueSerializer.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/ValueSerializer.java
index ad1b0f0..179ef19 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/ValueSerializer.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/ValueSerializer.java
@@ -20,6 +20,7 @@ package org.apache.flink.api.java.typeutils.runtime;
 
 import java.io.IOException;
 
+import com.google.common.base.Preconditions;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
@@ -47,11 +48,7 @@ public class ValueSerializer<T extends Value> extends TypeSerializer<T> {
 	// --------------------------------------------------------------------------------------------
 	
 	public ValueSerializer(Class<T> type) {
-		if (type == null) {
-			throw new NullPointerException();
-		}
-		
-		this.type = type;
+		this.type = Preconditions.checkNotNull(type);
 	}
 
 	// --------------------------------------------------------------------------------------------
@@ -126,16 +123,22 @@ public class ValueSerializer<T extends Value> extends TypeSerializer<T> {
 	
 	@Override
 	public int hashCode() {
-		return this.type.hashCode() + 17;
+		return this.type.hashCode();
 	}
 	
 	@Override
 	public boolean equals(Object obj) {
-		if (obj.getClass() == ValueSerializer.class) {
+		if (obj instanceof ValueSerializer) {
 			ValueSerializer<?> other = (ValueSerializer<?>) obj;
-			return this.type == other.type;
+
+			return other.canEqual(this) && type == other.type;
 		} else {
 			return false;
 		}
 	}
+
+	@Override
+	public boolean canEqual(Object obj) {
+		return obj instanceof ValueSerializer;
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/b49b1552/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/WritableSerializer.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/WritableSerializer.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/WritableSerializer.java
index 60012ee..d854f52 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/WritableSerializer.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/WritableSerializer.java
@@ -121,16 +121,22 @@ public class WritableSerializer<T extends Writable> extends TypeSerializer<T> {
 	
 	@Override
 	public int hashCode() {
-		return this.typeClass.hashCode() + 177;
+		return this.typeClass.hashCode();
 	}
 	
 	@Override
 	public boolean equals(Object obj) {
-		if (obj.getClass() == WritableSerializer.class) {
+		if (obj instanceof WritableSerializer) {
 			WritableSerializer<?> other = (WritableSerializer<?>) obj;
-			return this.typeClass == other.typeClass;
+
+			return other.canEqual(this) && typeClass == other.typeClass;
 		} else {
 			return false;
 		}
 	}
+
+	@Override
+	public boolean canEqual(Object obj) {
+		return obj instanceof WritableSerializer;
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/b49b1552/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializer.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializer.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializer.java
index 8ae3562..28473a2 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializer.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializer.java
@@ -25,6 +25,7 @@ import com.esotericsoftware.kryo.factories.ReflectionSerializerFactory;
 import com.esotericsoftware.kryo.io.Input;
 import com.esotericsoftware.kryo.io.Output;
 import com.esotericsoftware.kryo.serializers.JavaSerializer;
+import com.google.common.base.Preconditions;
 import com.twitter.chill.ScalaKryoInstantiator;
 
 import org.apache.avro.generic.GenericData;
@@ -44,6 +45,7 @@ import java.io.IOException;
 import java.lang.reflect.Modifier;
 import java.util.LinkedHashSet;
 import java.util.List;
+import java.util.Objects;
 
 /**
  * A type serializer that serializes its type using the Kryo serialization
@@ -83,10 +85,7 @@ public class KryoSerializer<T> extends TypeSerializer<T> {
 	// ------------------------------------------------------------------------
 
 	public KryoSerializer(Class<T> type, ExecutionConfig executionConfig){
-		if(type == null){
-			throw new NullPointerException("Type class cannot be null.");
-		}
-		this.type = type;
+		this.type = Preconditions.checkNotNull(type);
 
 		this.defaultSerializers = executionConfig.getDefaultKryoSerializers();
 		this.defaultSerializerClasses = executionConfig.getDefaultKryoSerializerClasses();
@@ -240,19 +239,34 @@ public class KryoSerializer<T> extends TypeSerializer<T> {
 	
 	@Override
 	public int hashCode() {
-		return type.hashCode();
+		return Objects.hash(
+			type,
+			registeredTypes,
+			registeredTypesWithSerializerClasses,
+			defaultSerializerClasses);
 	}
 	
 	@Override
 	public boolean equals(Object obj) {
-		if (obj != null && obj instanceof KryoSerializer) {
+		if (obj instanceof KryoSerializer) {
 			KryoSerializer<?> other = (KryoSerializer<?>) obj;
-			return other.type == this.type;
+
+			// we cannot include the Serializers here because they don't implement the equals method
+			return other.canEqual(this) &&
+				type == other.type &&
+				registeredTypes.equals(other.registeredTypes) &&
+				registeredTypesWithSerializerClasses.equals(other.registeredTypesWithSerializerClasses) &&
+				defaultSerializerClasses.equals(other.defaultSerializerClasses);
 		} else {
 			return false;
 		}
 	}
-	
+
+	@Override
+	public boolean canEqual(Object obj) {
+		return obj instanceof KryoSerializer;
+	}
+
 	// --------------------------------------------------------------------------------------------
 
 	private void checkKryoInitialized() {

http://git-wip-us.apache.org/repos/asf/flink/blob/b49b1552/flink-java/src/test/java/org/apache/flink/api/java/io/CollectionInputFormatTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/io/CollectionInputFormatTest.java b/flink-java/src/test/java/org/apache/flink/api/java/io/CollectionInputFormatTest.java
index ad491d0..ebaa44d 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/io/CollectionInputFormatTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/io/CollectionInputFormatTest.java
@@ -43,6 +43,7 @@ import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
+import java.util.Objects;
 
 public class CollectionInputFormatTest {
 	
@@ -265,8 +266,8 @@ public class CollectionInputFormatTest {
 
 		private static final long serialVersionUID = 1L;
 		
-		private boolean failOnRead;
-		private boolean failOnWrite;
+		private final boolean failOnRead;
+		private final boolean failOnWrite;
 		
 		public TestSerializer(boolean failOnRead, boolean failOnWrite) {
 			this.failOnRead = failOnRead;
@@ -331,5 +332,26 @@ public class CollectionInputFormatTest {
 		public void copy(DataInputView source, DataOutputView target) throws IOException {
 			target.writeInt(source.readInt());
 		}
+
+		@Override
+		public boolean equals(Object obj) {
+			if (obj instanceof TestSerializer) {
+				TestSerializer other = (TestSerializer) obj;
+
+				return other.canEqual(this) && failOnRead == other.failOnRead && failOnWrite == other.failOnWrite;
+			} else {
+				return false;
+			}
+		}
+
+		@Override
+		public boolean canEqual(Object obj) {
+			return obj instanceof TestSerializer;
+		}
+
+		@Override
+		public int hashCode() {
+			return Objects.hash(failOnRead, failOnWrite);
+		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/b49b1552/flink-java/src/test/java/org/apache/flink/api/java/type/extractor/PojoTypeExtractionTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/type/extractor/PojoTypeExtractionTest.java b/flink-java/src/test/java/org/apache/flink/api/java/type/extractor/PojoTypeExtractionTest.java
index 34fde20..96ba264 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/type/extractor/PojoTypeExtractionTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/type/extractor/PojoTypeExtractionTest.java
@@ -336,37 +336,37 @@ public class PojoTypeExtractionTest {
 				tupleSeen = false, objectSeen = false, writableSeen = false, collectionSeen = false;
 		for(int i = 0; i < pojoTypeComplexNested.getArity(); i++) {
 			PojoField field = pojoTypeComplexNested.getPojoFieldAt(i);
-			String name = field.field.getName();
+			String name = field.getField().getName();
 			if(name.equals("date")) {
 				if(dateSeen) {
 					Assert.fail("already seen");
 				}
 				dateSeen = true;
-				Assert.assertEquals(BasicTypeInfo.DATE_TYPE_INFO, field.type);
-				Assert.assertEquals(Date.class, field.type.getTypeClass());
+				Assert.assertEquals(BasicTypeInfo.DATE_TYPE_INFO, field.getTypeInformation());
+				Assert.assertEquals(Date.class, field.getTypeInformation().getTypeClass());
 			} else if(name.equals("someNumberWithÜnicödeNäme")) {
 				if(intSeen) {
 					Assert.fail("already seen");
 				}
 				intSeen = true;
-				Assert.assertEquals(BasicTypeInfo.INT_TYPE_INFO, field.type);
-				Assert.assertEquals(Integer.class, field.type.getTypeClass());
+				Assert.assertEquals(BasicTypeInfo.INT_TYPE_INFO, field.getTypeInformation());
+				Assert.assertEquals(Integer.class, field.getTypeInformation().getTypeClass());
 			} else if(name.equals("someFloat")) {
 				if(floatSeen) {
 					Assert.fail("already seen");
 				}
 				floatSeen = true;
-				Assert.assertEquals(BasicTypeInfo.FLOAT_TYPE_INFO, field.type);
-				Assert.assertEquals(Float.class, field.type.getTypeClass());
+				Assert.assertEquals(BasicTypeInfo.FLOAT_TYPE_INFO, field.getTypeInformation());
+				Assert.assertEquals(Float.class, field.getTypeInformation().getTypeClass());
 			} else if(name.equals("word")) {
 				if(tupleSeen) {
 					Assert.fail("already seen");
 				}
 				tupleSeen = true;
-				Assert.assertTrue(field.type instanceof TupleTypeInfo<?>);
-				Assert.assertEquals(Tuple3.class, field.type.getTypeClass());
+				Assert.assertTrue(field.getTypeInformation() instanceof TupleTypeInfo<?>);
+				Assert.assertEquals(Tuple3.class, field.getTypeInformation().getTypeClass());
 				// do some more advanced checks on the tuple
-				TupleTypeInfo<?> tupleTypeFromComplexNested = (TupleTypeInfo<?>) field.type;
+				TupleTypeInfo<?> tupleTypeFromComplexNested = (TupleTypeInfo<?>) field.getTypeInformation();
 				Assert.assertEquals(BasicTypeInfo.LONG_TYPE_INFO, tupleTypeFromComplexNested.getTypeAt(0));
 				Assert.assertEquals(BasicTypeInfo.LONG_TYPE_INFO, tupleTypeFromComplexNested.getTypeAt(1));
 				Assert.assertEquals(BasicTypeInfo.STRING_TYPE_INFO, tupleTypeFromComplexNested.getTypeAt(2));
@@ -375,21 +375,21 @@ public class PojoTypeExtractionTest {
 					Assert.fail("already seen");
 				}
 				objectSeen = true;
-				Assert.assertEquals(new GenericTypeInfo<Object>(Object.class), field.type);
-				Assert.assertEquals(Object.class, field.type.getTypeClass());
+				Assert.assertEquals(new GenericTypeInfo<Object>(Object.class), field.getTypeInformation());
+				Assert.assertEquals(Object.class, field.getTypeInformation().getTypeClass());
 			} else if(name.equals("hadoopCitizen")) {
 				if(writableSeen) {
 					Assert.fail("already seen");
 				}
 				writableSeen = true;
-				Assert.assertEquals(new WritableTypeInfo<MyWritable>(MyWritable.class), field.type);
-				Assert.assertEquals(MyWritable.class, field.type.getTypeClass());
+				Assert.assertEquals(new WritableTypeInfo<MyWritable>(MyWritable.class), field.getTypeInformation());
+				Assert.assertEquals(MyWritable.class, field.getTypeInformation().getTypeClass());
 			} else if(name.equals("collection")) {
 				if(collectionSeen) {
 					Assert.fail("already seen");
 				}
 				collectionSeen = true;
-				Assert.assertEquals(new GenericTypeInfo(List.class), field.type);
+				Assert.assertEquals(new GenericTypeInfo(List.class), field.getTypeInformation());
 
 			} else {
 				Assert.fail("field "+field+" is not expected");
@@ -428,28 +428,28 @@ public class PojoTypeExtractionTest {
 		PojoTypeInfo<?> pojoTypeForClass = (PojoTypeInfo<?>) typeInformation;
 		for(int i = 0; i < pojoTypeForClass.getArity(); i++) {
 			PojoField field = pojoTypeForClass.getPojoFieldAt(i);
-			String name = field.field.getName();
+			String name = field.getField().getName();
 			if(name.equals("somethingFancy")) {
 				if(arrayListSeen) {
 					Assert.fail("already seen");
 				}
 				arrayListSeen = true;
-				Assert.assertTrue(field.type instanceof GenericTypeInfo);
-				Assert.assertEquals(ArrayList.class, field.type.getTypeClass());
+				Assert.assertTrue(field.getTypeInformation() instanceof GenericTypeInfo);
+				Assert.assertEquals(ArrayList.class, field.getTypeInformation().getTypeClass());
 			} else if(name.equals("fancyIds")) {
 				if(multisetSeen) {
 					Assert.fail("already seen");
 				}
 				multisetSeen = true;
-				Assert.assertTrue(field.type instanceof GenericTypeInfo);
-				Assert.assertEquals(HashMultiset.class, field.type.getTypeClass());
+				Assert.assertTrue(field.getTypeInformation() instanceof GenericTypeInfo);
+				Assert.assertEquals(HashMultiset.class, field.getTypeInformation().getTypeClass());
 			} else if(name.equals("fancyArray")) {
 				if(strArraySeen) {
 					Assert.fail("already seen");
 				}
 				strArraySeen = true;
-				Assert.assertEquals(BasicArrayTypeInfo.STRING_ARRAY_TYPE_INFO, field.type);
-				Assert.assertEquals(String[].class, field.type.getTypeClass());
+				Assert.assertEquals(BasicArrayTypeInfo.STRING_ARRAY_TYPE_INFO, field.getTypeInformation());
+				Assert.assertEquals(String[].class, field.getTypeInformation().getTypeClass());
 			} else if(Arrays.asList("date", "someNumberWithÜnicödeNäme", "someFloat", "word", "nothing", "hadoopCitizen", "collection").contains(name)) {
 				// ignore these, they are inherited from the ComplexNestedClass
 			}
@@ -479,13 +479,13 @@ public class PojoTypeExtractionTest {
 		PojoTypeInfo<?> pojoTypeForClass = (PojoTypeInfo<?>) typeInformation;
 		for(int i = 0; i < pojoTypeForClass.getArity(); i++) {
 			PojoField field = pojoTypeForClass.getPojoFieldAt(i);
-			String name = field.field.getName();
+			String name = field.getField().getName();
 			if(name.equals("special")) {
-				Assert.assertEquals(BasicTypeInfo.INT_TYPE_INFO, field.type);
+				Assert.assertEquals(BasicTypeInfo.INT_TYPE_INFO, field.getTypeInformation());
 			} else if(name.equals("f0") || name.equals("f1")) {
-				Assert.assertEquals(BasicTypeInfo.STRING_TYPE_INFO, field.type);
+				Assert.assertEquals(BasicTypeInfo.STRING_TYPE_INFO, field.getTypeInformation());
 			} else if(name.equals("f2")) {
-				Assert.assertEquals(BasicTypeInfo.LONG_TYPE_INFO, field.type);
+				Assert.assertEquals(BasicTypeInfo.LONG_TYPE_INFO, field.getTypeInformation());
 			} else {
 				Assert.fail("unexpected field");
 			}
@@ -499,15 +499,15 @@ public class PojoTypeExtractionTest {
 		PojoTypeInfo<?> pojoTypeForClass = (PojoTypeInfo<?>) typeForClass;
 		for(int i = 0; i < pojoTypeForClass.getArity(); i++) {
 			PojoField field = pojoTypeForClass.getPojoFieldAt(i);
-			String name = field.field.getName();
+			String name = field.getField().getName();
 			if(name.equals("field1")) {
-				Assert.assertEquals(BasicTypeInfo.INT_TYPE_INFO, field.type);
+				Assert.assertEquals(BasicTypeInfo.INT_TYPE_INFO, field.getTypeInformation());
 			} else if (name.equals("field2")) {
-				Assert.assertEquals(BasicTypeInfo.LONG_TYPE_INFO, field.type);
+				Assert.assertEquals(BasicTypeInfo.LONG_TYPE_INFO, field.getTypeInformation());
 			} else if (name.equals("field3")) {
-				Assert.assertEquals(BasicTypeInfo.STRING_TYPE_INFO, field.type);
+				Assert.assertEquals(BasicTypeInfo.STRING_TYPE_INFO, field.getTypeInformation());
 			} else if (name.equals("key")) {
-				Assert.assertEquals(BasicTypeInfo.INT_TYPE_INFO, field.type);
+				Assert.assertEquals(BasicTypeInfo.INT_TYPE_INFO, field.getTypeInformation());
 			} else {
 				Assert.fail("Unexpected field "+field);
 			}
@@ -525,13 +525,13 @@ public class PojoTypeExtractionTest {
 		PojoTypeInfo<?> pojoTypeForClass = (PojoTypeInfo<?>) typeForClass;
 		for(int i = 0; i < pojoTypeForClass.getArity(); i++) {
 			PojoField field = pojoTypeForClass.getPojoFieldAt(i);
-			String name = field.field.getName();
+			String name = field.getField().getName();
 			if(name.equals("field1")) {
-				Assert.assertEquals(new GenericTypeInfo<Object>(Object.class), field.type);
+				Assert.assertEquals(new GenericTypeInfo<Object>(Object.class), field.getTypeInformation());
 			} else if (name.equals("field2")) {
-				Assert.assertEquals(new GenericTypeInfo<Object>(Object.class), field.type);
+				Assert.assertEquals(new GenericTypeInfo<Object>(Object.class), field.getTypeInformation());
 			} else if (name.equals("key")) {
-				Assert.assertEquals(BasicTypeInfo.INT_TYPE_INFO, field.type);
+				Assert.assertEquals(BasicTypeInfo.INT_TYPE_INFO, field.getTypeInformation());
 			} else {
 				Assert.fail("Unexpected field "+field);
 			}
@@ -546,14 +546,14 @@ public class PojoTypeExtractionTest {
 		PojoTypeInfo<?> pojoTypeForClass = (PojoTypeInfo<?>) typeForClass;
 		for(int i = 0; i < pojoTypeForClass.getArity(); i++) {
 			PojoField field = pojoTypeForClass.getPojoFieldAt(i);
-			String name = field.field.getName();
+			String name = field.getField().getName();
 			if(name.equals("field1")) {
-				Assert.assertTrue(field.type instanceof PojoTypeInfo<?>); // From tuple is pojo (not tuple type!)
+				Assert.assertTrue(field.getTypeInformation() instanceof PojoTypeInfo<?>); // From tuple is pojo (not tuple type!)
 			} else if (name.equals("field2")) {
-				Assert.assertTrue(field.type instanceof TupleTypeInfo<?>);
-				Assert.assertTrue( ((TupleTypeInfo<?>)field.type).getTypeAt(0).equals(BasicTypeInfo.STRING_TYPE_INFO) );
+				Assert.assertTrue(field.getTypeInformation() instanceof TupleTypeInfo<?>);
+				Assert.assertTrue( ((TupleTypeInfo<?>)field.getTypeInformation()).getTypeAt(0).equals(BasicTypeInfo.STRING_TYPE_INFO) );
 			} else if (name.equals("key")) {
-				Assert.assertEquals(BasicTypeInfo.INT_TYPE_INFO, field.type);
+				Assert.assertEquals(BasicTypeInfo.INT_TYPE_INFO, field.getTypeInformation());
 			} else {
 				Assert.fail("Unexpected field "+field);
 			}
@@ -641,13 +641,13 @@ public class PojoTypeExtractionTest {
 		PojoTypeInfo<?> pti = (PojoTypeInfo<?>) ti;
 		for(int i = 0; i < pti.getArity(); i++) {
 			PojoField field = pti.getPojoFieldAt(i);
-			String name = field.field.getName();
+			String name = field.getField().getName();
 			if(name.equals("field1")) {
-				Assert.assertEquals(BasicTypeInfo.STRING_TYPE_INFO, field.type);
+				Assert.assertEquals(BasicTypeInfo.STRING_TYPE_INFO, field.getTypeInformation());
 			} else if (name.equals("field2")) {
-				Assert.assertEquals(BasicTypeInfo.STRING_TYPE_INFO, field.type);
+				Assert.assertEquals(BasicTypeInfo.STRING_TYPE_INFO, field.getTypeInformation());
 			} else if (name.equals("key")) {
-				Assert.assertEquals(BasicTypeInfo.INT_TYPE_INFO, field.type);
+				Assert.assertEquals(BasicTypeInfo.INT_TYPE_INFO, field.getTypeInformation());
 			} else {
 				Assert.fail("Unexpected field "+field);
 			}
@@ -680,15 +680,15 @@ public class PojoTypeExtractionTest {
 		PojoTypeInfo<?> pti = (PojoTypeInfo<?>) ti;
 		for(int i = 0; i < pti.getArity(); i++) {
 			PojoField field = pti.getPojoFieldAt(i);
-			String name = field.field.getName();
+			String name = field.getField().getName();
 			if(name.equals("extraField")) {
-				Assert.assertEquals(BasicTypeInfo.CHAR_TYPE_INFO, field.type);
+				Assert.assertEquals(BasicTypeInfo.CHAR_TYPE_INFO, field.getTypeInformation());
 			} else if (name.equals("f0")) {
-				Assert.assertEquals(BasicTypeInfo.BOOLEAN_TYPE_INFO, field.type);
+				Assert.assertEquals(BasicTypeInfo.BOOLEAN_TYPE_INFO, field.getTypeInformation());
 			} else if (name.equals("f1")) {
-				Assert.assertEquals(BasicTypeInfo.BOOLEAN_TYPE_INFO, field.type);
+				Assert.assertEquals(BasicTypeInfo.BOOLEAN_TYPE_INFO, field.getTypeInformation());
 			} else if (name.equals("f2")) {
-				Assert.assertEquals(BasicTypeInfo.LONG_TYPE_INFO, field.type);
+				Assert.assertEquals(BasicTypeInfo.LONG_TYPE_INFO, field.getTypeInformation());
 			} else {
 				Assert.fail("Unexpected field "+field);
 			}
@@ -831,7 +831,7 @@ public class PojoTypeExtractionTest {
 	public void testRecursivePojo1() {
 		TypeInformation<?> ti = TypeExtractor.createTypeInfo(RecursivePojo1.class);
 		Assert.assertTrue(ti instanceof PojoTypeInfo);
-		Assert.assertEquals(GenericTypeInfo.class, ((PojoTypeInfo) ti).getPojoFieldAt(0).type.getClass());
+		Assert.assertEquals(GenericTypeInfo.class, ((PojoTypeInfo) ti).getPojoFieldAt(0).getTypeInformation().getClass());
 	}
 
 	@Test
@@ -839,8 +839,8 @@ public class PojoTypeExtractionTest {
 		TypeInformation<?> ti = TypeExtractor.createTypeInfo(RecursivePojo2.class);
 		Assert.assertTrue(ti instanceof PojoTypeInfo);
 		PojoField pf = ((PojoTypeInfo) ti).getPojoFieldAt(0);
-		Assert.assertTrue(pf.type instanceof TupleTypeInfo);
-		Assert.assertEquals(GenericTypeInfo.class, ((TupleTypeInfo) pf.type).getTypeAt(0).getClass());
+		Assert.assertTrue(pf.getTypeInformation() instanceof TupleTypeInfo);
+		Assert.assertEquals(GenericTypeInfo.class, ((TupleTypeInfo) pf.getTypeInformation()).getTypeAt(0).getClass());
 	}
 
 	@Test
@@ -848,8 +848,8 @@ public class PojoTypeExtractionTest {
 		TypeInformation<?> ti = TypeExtractor.createTypeInfo(RecursivePojo3.class);
 		Assert.assertTrue(ti instanceof PojoTypeInfo);
 		PojoField pf = ((PojoTypeInfo) ti).getPojoFieldAt(0);
-		Assert.assertTrue(pf.type instanceof PojoTypeInfo);
-		Assert.assertEquals(GenericTypeInfo.class, ((PojoTypeInfo) pf.type).getPojoFieldAt(0).type.getClass());
+		Assert.assertTrue(pf.getTypeInformation() instanceof PojoTypeInfo);
+		Assert.assertEquals(GenericTypeInfo.class, ((PojoTypeInfo) pf.getTypeInformation()).getPojoFieldAt(0).getTypeInformation().getClass());
 	}
 
 	public static class FooBarPojo {

http://git-wip-us.apache.org/repos/asf/flink/blob/b49b1552/flink-java/src/test/java/org/apache/flink/api/java/type/extractor/TypeExtractorTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/type/extractor/TypeExtractorTest.java b/flink-java/src/test/java/org/apache/flink/api/java/type/extractor/TypeExtractorTest.java
index d27a82b..7f0cf5b 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/type/extractor/TypeExtractorTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/type/extractor/TypeExtractorTest.java
@@ -66,6 +66,8 @@ import org.apache.hadoop.io.Writable;
 import org.junit.Assert;
 import org.junit.Test;
 
+import javax.xml.bind.TypeConstraintException;
+
 
 public class TypeExtractorTest {
 
@@ -1237,7 +1239,7 @@ public class TypeExtractorTest {
 		TypeInformation<?> ti = TypeExtractor.getMapReturnTypes(function, (TypeInformation) TypeInfoParser.parse("org.apache.flink.api.java.type.extractor.TypeExtractorTest$CustomArrayObject[]"));
 
 		Assert.assertTrue(ti instanceof ObjectArrayTypeInfo<?, ?>);
-		Assert.assertEquals(CustomArrayObject.class, ((ObjectArrayTypeInfo<?, ?>) ti).getComponentType());
+		Assert.assertEquals(CustomArrayObject.class, ((ObjectArrayTypeInfo<?, ?>) ti).getComponentInfo().getTypeClass());
 	}
 
 	@SuppressWarnings({ "rawtypes", "unchecked" })