You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2015/09/17 13:24:07 UTC

[4/9] flink git commit: [FLINK-2637] [api-breaking] [types] Adds equals and hashCode method to TypeInformations and TypeSerializers - Fixes ObjectArrayTypeInfo - Makes CompositeTypes serializable - Adds test for equality relation's symmetric property - A

http://git-wip-us.apache.org/repos/asf/flink/blob/8ca853e0/flink-java/src/test/java/org/apache/flink/api/java/typeutils/EnumTypeInfoTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/EnumTypeInfoTest.java b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/EnumTypeInfoTest.java
new file mode 100644
index 0000000..b200566
--- /dev/null
+++ b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/EnumTypeInfoTest.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils;
+
+import org.apache.flink.util.TestLogger;
+import org.junit.Test;
+import static org.junit.Assert.*;
+
+public class EnumTypeInfoTest extends TestLogger {
+
+	enum TestEnum {
+		ONE, TWO
+	}
+
+	enum AlternativeEnum {
+		ONE, TWO
+	}
+
+	@Test
+	public void testEnumTypeEquality() {
+		EnumTypeInfo<TestEnum> enumTypeInfo1 = new EnumTypeInfo<TestEnum>(TestEnum.class);
+		EnumTypeInfo<TestEnum> enumTypeInfo2 = new EnumTypeInfo<TestEnum>(TestEnum.class);
+
+		assertEquals(enumTypeInfo1, enumTypeInfo2);
+		assertEquals(enumTypeInfo1.hashCode(), enumTypeInfo2.hashCode());
+	}
+
+	@Test
+	public void testEnumTypeInequality() {
+		EnumTypeInfo<TestEnum> enumTypeInfo1 = new EnumTypeInfo<TestEnum>(TestEnum.class);
+		EnumTypeInfo<AlternativeEnum> enumTypeInfo2 = new EnumTypeInfo<AlternativeEnum>(AlternativeEnum.class);
+
+		assertNotEquals(enumTypeInfo1, enumTypeInfo2);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/8ca853e0/flink-java/src/test/java/org/apache/flink/api/java/typeutils/GenericTypeInfoTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/GenericTypeInfoTest.java b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/GenericTypeInfoTest.java
new file mode 100644
index 0000000..fad43df
--- /dev/null
+++ b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/GenericTypeInfoTest.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils;
+
+import org.apache.flink.util.TestLogger;
+import org.junit.Test;
+import static org.junit.Assert.*;
+
+public class GenericTypeInfoTest extends TestLogger {
+
+	static class TestClass {}
+	static class AlternativeClass {}
+
+	@Test
+	public void testGenericTypeInfoEquality() {
+		GenericTypeInfo<TestClass> tpeInfo1 = new GenericTypeInfo<>(TestClass.class);
+		GenericTypeInfo<TestClass> tpeInfo2 = new GenericTypeInfo<>(TestClass.class);
+
+		assertEquals(tpeInfo1, tpeInfo2);
+		assertEquals(tpeInfo1.hashCode(), tpeInfo2.hashCode());
+	}
+
+	@Test
+	public void testGenericTypeInfoInequality() {
+		GenericTypeInfo<TestClass> tpeInfo1 = new GenericTypeInfo<>(TestClass.class);
+		GenericTypeInfo<AlternativeClass> tpeInfo2 = new GenericTypeInfo<>(AlternativeClass.class);
+
+		assertNotEquals(tpeInfo1, tpeInfo2);
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/8ca853e0/flink-java/src/test/java/org/apache/flink/api/java/typeutils/MissingTypeInfoTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/MissingTypeInfoTest.java b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/MissingTypeInfoTest.java
new file mode 100644
index 0000000..ee57475
--- /dev/null
+++ b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/MissingTypeInfoTest.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils;
+
+import org.apache.flink.api.common.functions.InvalidTypesException;
+import org.apache.flink.util.TestLogger;
+import org.junit.Test;
+
+import static org.junit.Assert.*;
+
+public class MissingTypeInfoTest extends TestLogger {
+	static final String functionName = "foobar";
+	static final InvalidTypesException testException = new InvalidTypesException("Test exception.");
+
+	@Test
+	public void testMissingTypeInfoEquality() {
+		MissingTypeInfo tpeInfo1 = new MissingTypeInfo(functionName, testException);
+		MissingTypeInfo tpeInfo2 = new MissingTypeInfo(functionName, testException);
+
+		assertEquals(tpeInfo1, tpeInfo2);
+		assertEquals(tpeInfo1.hashCode(), tpeInfo2.hashCode());
+	}
+
+	@Test
+	public void testMissingTypeInfoInequality() {
+		MissingTypeInfo tpeInfo1 = new MissingTypeInfo(functionName, testException);
+		MissingTypeInfo tpeInfo2 = new MissingTypeInfo("alt" + functionName, testException);
+
+		assertNotEquals(tpeInfo1, tpeInfo2);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/8ca853e0/flink-java/src/test/java/org/apache/flink/api/java/typeutils/ObjectArrayTypeInfoTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/ObjectArrayTypeInfoTest.java b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/ObjectArrayTypeInfoTest.java
new file mode 100644
index 0000000..f3b39c0
--- /dev/null
+++ b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/ObjectArrayTypeInfoTest.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils;
+
+import org.apache.flink.util.TestLogger;
+import org.junit.Test;
+
+import java.util.ArrayList;
+
+import static org.junit.Assert.*;
+
+public class ObjectArrayTypeInfoTest extends TestLogger {
+
+	public static class TestClass{}
+
+	@Test
+	public void testObjectArrayTypeInfoEquality() {
+		ObjectArrayTypeInfo<TestClass[], TestClass> tpeInfo1 = ObjectArrayTypeInfo.getInfoFor(
+			TestClass[].class,
+			new GenericTypeInfo<TestClass>(TestClass.class));
+
+		ObjectArrayTypeInfo<TestClass[], TestClass> tpeInfo2 = ObjectArrayTypeInfo.getInfoFor(
+			TestClass[].class,
+			new GenericTypeInfo<TestClass>(TestClass.class));
+
+		assertEquals(tpeInfo1, tpeInfo2);
+		assertEquals(tpeInfo1.hashCode(), tpeInfo2.hashCode());
+	}
+
+	@Test
+	public void testObjectArrayTypeInfoInequality() {
+		ObjectArrayTypeInfo<TestClass[], TestClass> tpeInfo1 = ObjectArrayTypeInfo.getInfoFor(
+			TestClass[].class,
+			new GenericTypeInfo<TestClass>(TestClass.class));
+
+		ObjectArrayTypeInfo<TestClass[], TestClass> tpeInfo2 = ObjectArrayTypeInfo.getInfoFor(
+			TestClass[].class,
+			new PojoTypeInfo<TestClass>(TestClass.class, new ArrayList<PojoField>()));
+
+		assertNotEquals(tpeInfo1, tpeInfo2);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/8ca853e0/flink-java/src/test/java/org/apache/flink/api/java/typeutils/PojoTypeInfoTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/PojoTypeInfoTest.java b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/PojoTypeInfoTest.java
index 12b7913..2fe1357 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/PojoTypeInfoTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/PojoTypeInfoTest.java
@@ -21,12 +21,15 @@ package org.apache.flink.api.java.typeutils;
 import static org.junit.Assert.*;
 
 import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.util.InstantiationUtil;
 import org.junit.Test;
 
+import java.io.IOException;
+
 public class PojoTypeInfoTest {
 
 	@Test
-	public void testEquals() {
+	public void testPojoTypeInfoEquality() {
 		try {
 			TypeInformation<TestPojo> info1 = TypeExtractor.getForClass(TestPojo.class);
 			TypeInformation<TestPojo> info2 = TypeExtractor.getForClass(TestPojo.class);
@@ -42,6 +45,35 @@ public class PojoTypeInfoTest {
 			fail(e.getMessage());
 		}
 	}
+
+	@Test
+	public void testPojoTypeInfoInequality() {
+		try {
+			TypeInformation<TestPojo> info1 = TypeExtractor.getForClass(TestPojo.class);
+			TypeInformation<AlternatePojo> info2 = TypeExtractor.getForClass(AlternatePojo.class);
+
+			assertTrue(info1 instanceof PojoTypeInfo);
+			assertTrue(info2 instanceof PojoTypeInfo);
+
+			assertFalse(info1.equals(info2));
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+
+	@Test
+	public void testSerializabilityOfPojoTypeInfo() throws IOException, ClassNotFoundException {
+		PojoTypeInfo<TestPojo> pojoTypeInfo = (PojoTypeInfo<TestPojo>)TypeExtractor.getForClass(TestPojo.class);
+
+		byte[] serializedPojoTypeInfo = InstantiationUtil.serializeObject(pojoTypeInfo);
+		PojoTypeInfo<TestPojo> deserializedPojoTypeInfo = (PojoTypeInfo<TestPojo>)InstantiationUtil.deserializeObject(
+			serializedPojoTypeInfo,
+			getClass().getClassLoader());
+
+		assertEquals(pojoTypeInfo, deserializedPojoTypeInfo);
+	}
 	
 	public static final class TestPojo {
 		
@@ -60,4 +92,22 @@ public class PojoTypeInfoTest {
 			return aString;
 		}
 	}
+
+	public static final class AlternatePojo {
+
+		public int someInt;
+
+		private String aString;
+
+		public Double[] doubleArray;
+
+
+		public void setaString(String aString) {
+			this.aString = aString;
+		}
+
+		public String getaString() {
+			return aString;
+		}
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8ca853e0/flink-java/src/test/java/org/apache/flink/api/java/typeutils/RecordTypeInfoTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/RecordTypeInfoTest.java b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/RecordTypeInfoTest.java
new file mode 100644
index 0000000..7aeb062
--- /dev/null
+++ b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/RecordTypeInfoTest.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils;
+
+import org.apache.flink.util.TestLogger;
+import org.junit.Test;
+
+import static org.junit.Assert.*;
+
+public class RecordTypeInfoTest extends TestLogger {
+
+	@Test
+	public void testRecordTypeInfoEquality() {
+		RecordTypeInfo tpeInfo1 = new RecordTypeInfo();
+		RecordTypeInfo tpeInfo2 = new RecordTypeInfo();
+
+		assertEquals(tpeInfo1, tpeInfo2);
+		assertEquals(tpeInfo1.hashCode(), tpeInfo2.hashCode());
+	}
+
+	@Test
+	public void testRecordTypeInfoInequality() {
+		RecordTypeInfo tpeInfo1 = new RecordTypeInfo();
+		MissingTypeInfo tpeInfo2 = new MissingTypeInfo("foobar");
+
+		assertNotEquals(tpeInfo1, tpeInfo2);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/8ca853e0/flink-java/src/test/java/org/apache/flink/api/java/typeutils/TupleTypeInfoTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/TupleTypeInfoTest.java b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/TupleTypeInfoTest.java
new file mode 100644
index 0000000..b6cff34
--- /dev/null
+++ b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/TupleTypeInfoTest.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple1;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.util.TestLogger;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class TupleTypeInfoTest extends TestLogger {
+
+	@Test
+	public void testTupleTypeInfoSymmetricEqualityRelation() {
+		TupleTypeInfo<Tuple1<Integer>> tupleTypeInfo = new TupleTypeInfo<>(BasicTypeInfo.INT_TYPE_INFO);
+		// Anonymous TupleTypeInfoBase subclass built from Tuple1.class and INT_TYPE_INFO; its overrides only return placeholders.
+		TupleTypeInfoBase<Tuple1> anonymousTupleTypeInfo = new TupleTypeInfoBase<Tuple1>(
+			(Class<Tuple1>)Tuple1.class,
+			(TypeInformation<?>)BasicTypeInfo.INT_TYPE_INFO) {
+
+			private static final long serialVersionUID = -7985593598027660836L;
+
+			@Override
+			public TypeSerializer<Tuple1> createSerializer(ExecutionConfig config) {
+				return null;
+			}
+
+			@Override
+			protected TypeComparatorBuilder<Tuple1> createTypeComparatorBuilder() {
+				return null;
+			}
+
+			@Override
+			public String[] getFieldNames() {
+				return new String[0];
+			}
+
+			@Override
+			public int getFieldIndex(String fieldName) {
+				return 0;
+			}
+		};
+
+		boolean tupleVsAnonymous = tupleTypeInfo.equals(anonymousTupleTypeInfo);
+		boolean anonymousVsTuple = anonymousTupleTypeInfo.equals(tupleTypeInfo);
+		// Either outcome is accepted as long as both directions agree; assertEquals reports both values on failure.
+		Assert.assertEquals("Equality relation should be symmetric", tupleVsAnonymous, anonymousVsTuple);
+	}
+
+	@Test
+	public void testTupleTypeInfoEquality() {
+		TupleTypeInfo<Tuple2<Integer, String>> tupleTypeInfo1 = new TupleTypeInfo<>(
+			BasicTypeInfo.INT_TYPE_INFO,
+			BasicTypeInfo.STRING_TYPE_INFO);
+
+		TupleTypeInfo<Tuple2<Integer, String>> tupleTypeInfo2 = new TupleTypeInfo<>(
+			BasicTypeInfo.INT_TYPE_INFO,
+			BasicTypeInfo.STRING_TYPE_INFO);
+
+		Assert.assertEquals(tupleTypeInfo1, tupleTypeInfo2);
+		Assert.assertEquals(tupleTypeInfo1.hashCode(), tupleTypeInfo2.hashCode());
+	}
+
+	@Test
+	public void testTupleTypeInfoInequality() {
+		TupleTypeInfo<Tuple2<Integer, String>> tupleTypeInfo1 = new TupleTypeInfo<>(
+			BasicTypeInfo.INT_TYPE_INFO,
+			BasicTypeInfo.STRING_TYPE_INFO);
+
+		TupleTypeInfo<Tuple2<Integer, Boolean>> tupleTypeInfo2 = new TupleTypeInfo<>(
+			BasicTypeInfo.INT_TYPE_INFO,
+			BasicTypeInfo.BOOLEAN_TYPE_INFO);
+
+		Assert.assertNotEquals(tupleTypeInfo1, tupleTypeInfo2);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/8ca853e0/flink-java/src/test/java/org/apache/flink/api/java/typeutils/TypeInfoParserTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/TypeInfoParserTest.java b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/TypeInfoParserTest.java
index 9fe8174..e225460 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/TypeInfoParserTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/TypeInfoParserTest.java
@@ -181,14 +181,14 @@ public class TypeInfoParserTest {
 				+ ">");
 		Assert.assertTrue(ti instanceof PojoTypeInfo);
 		PojoTypeInfo<?> pti = (PojoTypeInfo<?>) ti;
-		Assert.assertEquals("array", pti.getPojoFieldAt(0).field.getName());
-		Assert.assertTrue(pti.getPojoFieldAt(0).type instanceof BasicArrayTypeInfo);
-		Assert.assertEquals("basic", pti.getPojoFieldAt(1).field.getName());
-		Assert.assertTrue(pti.getPojoFieldAt(1).type instanceof BasicTypeInfo);
-		Assert.assertEquals("hadoopCitizen", pti.getPojoFieldAt(2).field.getName());
-		Assert.assertTrue(pti.getPojoFieldAt(2).type instanceof WritableTypeInfo);
-		Assert.assertEquals("tuple", pti.getPojoFieldAt(3).field.getName());
-		Assert.assertTrue(pti.getPojoFieldAt(3).type instanceof TupleTypeInfo);
+		Assert.assertEquals("array", pti.getPojoFieldAt(0).getField().getName());
+		Assert.assertTrue(pti.getPojoFieldAt(0).getTypeInformation() instanceof BasicArrayTypeInfo);
+		Assert.assertEquals("basic", pti.getPojoFieldAt(1).getField().getName());
+		Assert.assertTrue(pti.getPojoFieldAt(1).getTypeInformation() instanceof BasicTypeInfo);
+		Assert.assertEquals("hadoopCitizen", pti.getPojoFieldAt(2).getField().getName());
+		Assert.assertTrue(pti.getPojoFieldAt(2).getTypeInformation() instanceof WritableTypeInfo);
+		Assert.assertEquals("tuple", pti.getPojoFieldAt(3).getField().getName());
+		Assert.assertTrue(pti.getPojoFieldAt(3).getTypeInformation() instanceof TupleTypeInfo);
 	}
 	
 	@Test
@@ -198,12 +198,12 @@ public class TypeInfoParserTest {
 		TupleTypeInfo<?> tti = (TupleTypeInfo<?>) ti;
 		Assert.assertTrue(tti.getTypeAt(0) instanceof BasicTypeInfo);
 		Assert.assertTrue(tti.getTypeAt(1) instanceof TupleTypeInfo);
-		TupleTypeInfo<?> tti2 = (TupleTypeInfo<?>) tti.getTypeAt(1);
+		TupleTypeInfo<?> tti2 = (TupleTypeInfo<?>)(Object)tti.getTypeAt(1);
 		Assert.assertTrue(tti2.getTypeAt(0) instanceof BasicTypeInfo);
 		Assert.assertTrue(tti2.getTypeAt(1) instanceof PojoTypeInfo);
 		PojoTypeInfo<?> pti = (PojoTypeInfo<?>) tti2.getTypeAt(1);
-		Assert.assertEquals("basic", pti.getPojoFieldAt(0).field.getName());
-		Assert.assertTrue(pti.getPojoFieldAt(0).type instanceof BasicTypeInfo);
+		Assert.assertEquals("basic", pti.getPojoFieldAt(0).getField().getName());
+		Assert.assertTrue(pti.getPojoFieldAt(0).getTypeInformation() instanceof BasicTypeInfo);
 	}
 	
 	public static class MyWritable implements Writable {
@@ -232,7 +232,7 @@ public class TypeInfoParserTest {
 		TypeInformation<?> ti = TypeInfoParser.parse("java.lang.Class[]");
 
 		Assert.assertTrue(ti instanceof ObjectArrayTypeInfo<?, ?>);
-		Assert.assertEquals(Class.class, ((ObjectArrayTypeInfo<?, ?>) ti).getComponentType());
+		Assert.assertEquals(Class.class, ((ObjectArrayTypeInfo<?, ?>) ti).getComponentInfo().getTypeClass());
 		
 		TypeInformation<?> ti2 = TypeInfoParser.parse("Tuple2<Integer,Double>[]");
 

http://git-wip-us.apache.org/repos/asf/flink/blob/8ca853e0/flink-java/src/test/java/org/apache/flink/api/java/typeutils/ValueTypeInfoTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/ValueTypeInfoTest.java b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/ValueTypeInfoTest.java
new file mode 100644
index 0000000..4a579c8
--- /dev/null
+++ b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/ValueTypeInfoTest.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils;
+
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.types.Record;
+import org.apache.flink.types.Value;
+import org.apache.flink.util.TestLogger;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.IOException;
+
+import static org.junit.Assert.*;
+
+public class ValueTypeInfoTest extends TestLogger {
+
+	public static class TestClass implements Value {
+		private static final long serialVersionUID = -492760806806568285L;
+
+		@Override
+		public void write(DataOutputView out) throws IOException {
+
+		}
+
+		@Override
+		public void read(DataInputView in) throws IOException {
+
+		}
+	}
+
+	public static class AlternativeClass implements Value {
+
+		private static final long serialVersionUID = -163437084575260172L;
+
+		@Override
+		public void write(DataOutputView out) throws IOException {
+
+		}
+
+		@Override
+		public void read(DataInputView in) throws IOException {
+
+		}
+	}
+
+	@Test
+	public void testValueTypeInfoEquality() {
+		ValueTypeInfo<TestClass> tpeInfo1 = new ValueTypeInfo<>(TestClass.class);
+		ValueTypeInfo<TestClass> tpeInfo2 = new ValueTypeInfo<>(TestClass.class);
+
+		assertEquals(tpeInfo1, tpeInfo2);
+		assertEquals(tpeInfo1.hashCode(), tpeInfo2.hashCode());
+	}
+
+	@Test
+	public void testValueTypeInfoInequality() {
+		ValueTypeInfo<TestClass> tpeInfo1 = new ValueTypeInfo<>(TestClass.class);
+		ValueTypeInfo<AlternativeClass> tpeInfo2 = new ValueTypeInfo<>(AlternativeClass.class);
+		// Type infos constructed for two different Value classes must not compare equal.
+		assertNotEquals(tpeInfo1, tpeInfo2);
+	}
+
+	@Test
+	public void testValueTypeEqualsWithNull() throws Exception {
+		ValueTypeInfo<Record> tpeInfo = new ValueTypeInfo<>(Record.class);
+
+		Assert.assertFalse(tpeInfo.equals(null));
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/8ca853e0/flink-java/src/test/java/org/apache/flink/api/java/typeutils/WritableTypeInfoTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/WritableTypeInfoTest.java b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/WritableTypeInfoTest.java
new file mode 100644
index 0000000..2ab0021
--- /dev/null
+++ b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/WritableTypeInfoTest.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils;
+
+import org.apache.flink.util.TestLogger;
+import org.apache.hadoop.io.Writable;
+import org.junit.Test;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import static org.junit.Assert.*;
+
+public class WritableTypeInfoTest extends TestLogger {
+
+	public static class TestClass implements Writable {
+		@Override
+		public void write(DataOutput dataOutput) throws IOException {
+
+		}
+
+		@Override
+		public void readFields(DataInput dataInput) throws IOException {
+
+		}
+	}
+
+	public static class AlternateClass implements Writable {
+		@Override
+		public void write(DataOutput dataOutput) throws IOException {
+
+		}
+
+		@Override
+		public void readFields(DataInput dataInput) throws IOException {
+
+		}
+	}
+
+
+	@Test
+	public void testWritableTypeInfoEquality() {
+		WritableTypeInfo<TestClass> tpeInfo1 = new WritableTypeInfo<>(TestClass.class);
+		WritableTypeInfo<TestClass> tpeInfo2 = new WritableTypeInfo<>(TestClass.class);
+
+		assertEquals(tpeInfo1, tpeInfo2);
+		assertEquals(tpeInfo1.hashCode(), tpeInfo2.hashCode());
+	}
+
+	@Test
+	public void testWritableTypeInfoInequality() {
+		WritableTypeInfo<TestClass> tpeInfo1 = new WritableTypeInfo<>(TestClass.class);
+		WritableTypeInfo<AlternateClass> tpeInfo2 = new WritableTypeInfo<>(AlternateClass.class);
+
+		assertNotEquals(tpeInfo1, tpeInfo2);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/8ca853e0/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/types/IntListSerializer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/types/IntListSerializer.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/types/IntListSerializer.java
index 69dfeb9..e872526 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/types/IntListSerializer.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/types/IntListSerializer.java
@@ -106,4 +106,25 @@ public class IntListSerializer extends TypeSerializer<IntList> {
 			target.writeInt(source.readInt());
 		}
 	}
+
+	@Override
+	public boolean equals(Object obj) {
+		if (obj instanceof IntListSerializer) {
+			IntListSerializer other = (IntListSerializer) obj;
+
+			return other.canEqual(this);
+		} else {
+			return false;
+		}
+	}
+
+	@Override
+	public boolean canEqual(Object obj) {
+		return obj instanceof IntListSerializer;
+	}
+
+	@Override
+	public int hashCode() {
+		return IntListSerializer.class.hashCode();
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8ca853e0/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/types/IntPairSerializer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/types/IntPairSerializer.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/types/IntPairSerializer.java
index c2571cc..e4a9264 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/types/IntPairSerializer.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/types/IntPairSerializer.java
@@ -88,6 +88,27 @@ public class IntPairSerializer extends TypeSerializer<IntPair> {
 		target.write(source, 8);
 	}
 
+	@Override
+	public boolean equals(Object obj) {
+		if (obj instanceof IntPairSerializer) {
+			IntPairSerializer other = (IntPairSerializer) obj;
+
+			return other.canEqual(this);
+		} else {
+			return false;
+		}
+	}
+
+	@Override
+	public boolean canEqual(Object obj) {
+		return obj instanceof IntPairSerializer;
+	}
+
+	@Override
+	public int hashCode() {
+		return IntPairSerializer.class.hashCode();
+	}
+
 	public static final class IntPairSerializerFactory implements TypeSerializerFactory<IntPair> {
 
 		@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/8ca853e0/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/types/StringPairSerializer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/types/StringPairSerializer.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/types/StringPairSerializer.java
index 388e8bd..b62b097 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/types/StringPairSerializer.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/types/StringPairSerializer.java
@@ -83,4 +83,25 @@ public class StringPairSerializer extends TypeSerializer<StringPair> {
 		StringValue.writeString(StringValue.readString(source), target);
 		StringValue.writeString(StringValue.readString(source), target);
 	}
+
+	@Override
+	public boolean equals(Object obj) {
+		if (obj instanceof StringPairSerializer) {
+			StringPairSerializer other = (StringPairSerializer) obj;
+
+			return other.canEqual(this);
+		} else {
+			return false;
+		}
+	}
+
+	@Override
+	public boolean canEqual(Object obj) {
+		return obj instanceof StringPairSerializer;
+	}
+
+	@Override
+	public int hashCode() {
+		return StringPairSerializer.class.hashCode();
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8ca853e0/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala
index 344b186..5265134 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala
@@ -296,7 +296,7 @@ class ExecutionEnvironment(javaEnv: JavaEnv) {
                 "Field \"" + pojoFields(i) + "\" not part of POJO type " +
                   info.getTypeClass.getCanonicalName);
             }
-            classesBuf += info.getPojoFieldAt(pos).`type`.getTypeClass
+            classesBuf += info.getPojoFieldAt(pos).getTypeInformation().getTypeClass
           }
         }
       case _ => throw new IllegalArgumentException("Type information is not valid.")

http://git-wip-us.apache.org/repos/asf/flink/blob/8ca853e0/flink-scala/src/main/scala/org/apache/flink/api/scala/codegen/TypeInformationGen.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/codegen/TypeInformationGen.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/codegen/TypeInformationGen.scala
index 07f7205..aa76fcc 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/codegen/TypeInformationGen.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/codegen/TypeInformationGen.scala
@@ -121,7 +121,7 @@ private[flink] trait TypeInformationGen[C <: Context] {
             fieldSerializers(i) = types(i).createSerializer(executionConfig)
           }
 
-          new CaseClassSerializer[T](tupleType, fieldSerializers) {
+          new CaseClassSerializer[T](getTypeClass(), fieldSerializers) {
             override def createInstance(fields: Array[AnyRef]): T = {
               instance.splice
             }

http://git-wip-us.apache.org/repos/asf/flink/blob/8ca853e0/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/CaseClassSerializer.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/CaseClassSerializer.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/CaseClassSerializer.scala
index 2a76c37..3a015f7 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/CaseClassSerializer.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/CaseClassSerializer.scala
@@ -17,7 +17,6 @@
  */
 package org.apache.flink.api.scala.typeutils
 
-import org.apache.commons.lang.SerializationUtils
 import org.apache.flink.api.common.typeutils.TypeSerializer
 import org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase
 import org.apache.flink.core.memory.{DataOutputView, DataInputView}
@@ -29,19 +28,25 @@ import org.apache.flink.core.memory.{DataOutputView, DataInputView}
 abstract class CaseClassSerializer[T <: Product](
     clazz: Class[T],
     scalaFieldSerializers: Array[TypeSerializer[_]])
-  extends TupleSerializerBase[T](clazz, scalaFieldSerializers) with Cloneable {
+  extends TupleSerializerBase[T](clazz, scalaFieldSerializers)
+  with Cloneable {
 
   @transient var fields : Array[AnyRef] = _
 
   @transient var instanceCreationFailed : Boolean = false
 
   override def duplicate = {
-    val result = this.clone().asInstanceOf[CaseClassSerializer[T]]
+    clone().asInstanceOf[CaseClassSerializer[T]]
+  }
+
+  @throws[CloneNotSupportedException]
+  override protected def clone(): Object = {
+    val result = super.clone().asInstanceOf[CaseClassSerializer[T]]
 
-    // set transient fields to null and make copy of serializers
+    // achieve a deep copy by duplicating the field serializers
+    result.fieldSerializers.transform(_.duplicate())
     result.fields = null
     result.instanceCreationFailed = false
-    result.fieldSerializers = fieldSerializers.map(_.duplicate())
 
     result
   }

http://git-wip-us.apache.org/repos/asf/flink/blob/8ca853e0/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/CaseClassTypeInfo.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/CaseClassTypeInfo.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/CaseClassTypeInfo.scala
index 0c8049d..37c7431 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/CaseClassTypeInfo.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/CaseClassTypeInfo.scala
@@ -18,19 +18,19 @@
 
 package org.apache.flink.api.scala.typeutils
 
+import java.util
 import java.util.regex.{Pattern, Matcher}
 
 import org.apache.flink.api.common.ExecutionConfig
 import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.common.typeinfo.AtomicType
-import org.apache.flink.api.common.typeutils.CompositeType.InvalidFieldReferenceException
-import org.apache.flink.api.common.typeutils.CompositeType.FlatFieldDescriptor
+import org.apache.flink.api.common.typeutils.CompositeType.{TypeComparatorBuilder,
+InvalidFieldReferenceException, FlatFieldDescriptor}
 import org.apache.flink.api.common.typeutils._
 import org.apache.flink.api.java.operators.Keys.ExpressionKeys
-import org.apache.flink.api.java.typeutils.{TupleTypeInfoBase, PojoTypeInfo}
-import PojoTypeInfo.NamedFlatFieldDescriptor
+import org.apache.flink.api.java.typeutils.TupleTypeInfoBase
 
 import scala.collection.JavaConverters._
+import scala.collection.mutable.ArrayBuffer
 
 /**
  * TypeInformation for Case Classes. Creation and access is different from
@@ -38,7 +38,7 @@ import scala.collection.JavaConverters._
  */
 abstract class CaseClassTypeInfo[T <: Product](
     clazz: Class[T],
-    typeParamTypeInfos: Array[TypeInformation[_]],
+    val typeParamTypeInfos: Array[TypeInformation[_]],
     fieldTypes: Seq[TypeInformation[_]],
     val fieldNames: Seq[String])
   extends TupleTypeInfoBase[T](clazz, fieldTypes: _*) {
@@ -63,46 +63,19 @@ abstract class CaseClassTypeInfo[T <: Product](
     fields map { x => fieldNames.indexOf(x) }
   }
 
-  /*
-   * Comparator construction
-   */
-  var fieldComparators: Array[TypeComparator[_]] = null
-  var logicalKeyFields : Array[Int] = null
-  var comparatorHelperIndex = 0
-
-  override protected def initializeNewComparator(localKeyCount: Int): Unit = {
-    fieldComparators = new Array(localKeyCount)
-    logicalKeyFields = new Array(localKeyCount)
-    comparatorHelperIndex = 0
-  }
-
-  override protected def addCompareField(fieldId: Int, comparator: TypeComparator[_]): Unit = {
-    fieldComparators(comparatorHelperIndex) = comparator
-    logicalKeyFields(comparatorHelperIndex) = fieldId
-    comparatorHelperIndex += 1
-  }
-
-  override protected def getNewComparator(executionConfig: ExecutionConfig): TypeComparator[T] = {
-    val finalLogicalKeyFields = logicalKeyFields.take(comparatorHelperIndex)
-    val finalComparators = fieldComparators.take(comparatorHelperIndex)
-    val maxKey = finalLogicalKeyFields.max
-
-    // create serializers only up to the last key, fields after that are not needed
-    val fieldSerializers = types.take(maxKey + 1).map(_.createSerializer(executionConfig))
-    new CaseClassComparator[T](finalLogicalKeyFields, finalComparators, fieldSerializers.toArray)
-  }
-
   override def getFlatFields(
       fieldExpression: String,
       offset: Int,
       result: java.util.List[FlatFieldDescriptor]): Unit = {
     val matcher: Matcher = PATTERN_NESTED_FIELDS_WILDCARD.matcher(fieldExpression)
+
     if (!matcher.matches) {
       throw new InvalidFieldReferenceException("Invalid tuple field reference \"" +
         fieldExpression + "\".")
     }
 
     var field: String = matcher.group(0)
+
     if ((field == ExpressionKeys.SELECT_ALL_CHAR) ||
       (field == ExpressionKeys.SELECT_ALL_CHAR_SCALA)) {
       var keyPosition: Int = 0
@@ -116,59 +89,58 @@ abstract class CaseClassTypeInfo[T <: Product](
         }
         keyPosition += 1
       }
-      return
     } else {
       field = matcher.group(1)
-    }
-
-    val intFieldMatcher = PATTERN_INT_FIELD.matcher(field)
-    if(intFieldMatcher.matches()) {
-      // convert 0-indexed integer field into 1-indexed name field
-      field = "_" + (Integer.valueOf(field) + 1)
-    }
 
-    var pos = offset
-    val tail = matcher.group(3)
-    if (tail == null) {
+      val intFieldMatcher = PATTERN_INT_FIELD.matcher(field)
+      if(intFieldMatcher.matches()) {
+        // convert 0-indexed integer field into 1-indexed name field
+        field = "_" + (Integer.valueOf(field) + 1)
+      }
 
-      for (i <- 0 until fieldNames.length) {
-        if (field == fieldNames(i)) {
-          // found field
-          fieldTypes(i) match {
-            case ct: CompositeType[_] =>
-              ct.getFlatFields("*", pos, result)
-              return
-            case _ =>
-              result.add(new FlatFieldDescriptor(pos, fieldTypes(i)))
-              return
+      val tail = matcher.group(3)
+
+      if (tail == null) {
+        def extractFlatFields(index: Int, pos: Int): Unit = {
+          if (index >= fieldNames.size) {
+            throw new InvalidFieldReferenceException("Unable to find field \"" + field +
+              "\" in type " + this + ".")
+          } else if (field == fieldNames(index)) {
+            // found field
+            fieldTypes(index) match {
+              case ct: CompositeType[_] =>
+                ct.getFlatFields("*", pos, result)
+              case _ =>
+                result.add(new FlatFieldDescriptor(pos, fieldTypes(index)))
+            }
+          } else {
+            // skipping over non-matching fields
+            extractFlatFields(index + 1, pos + fieldTypes(index).getTotalFields())
           }
-        } else {
-          // skipping over non-matching fields
-          pos += fieldTypes(i).getTotalFields
         }
-      }
-      throw new InvalidFieldReferenceException("Unable to find field \"" + field +
-        "\" in type " + this + ".")
-    } else {
-      var pos = offset
-      for (i <- 0 until fieldNames.length) {
-        if (field == fieldNames(i)) {
-          // found field
-          fieldTypes(i) match {
-            case ct: CompositeType[_] =>
-              ct.getFlatFields(tail, pos, result)
-              return
-            case _ =>
-              throw new InvalidFieldReferenceException("Nested field expression \"" + tail +
-                "\" not possible on atomic type " + fieldTypes(i) + ".")
+
+        extractFlatFields(0, offset)
+      } else {
+        def extractFlatFields(index: Int, pos: Int): Unit = {
+          if (index >= fieldNames.size) {
+            throw new InvalidFieldReferenceException("Unable to find field \"" + field +
+              "\" in type " + this + ".")
+          } else if (field == fieldNames(index)) {
+            // found field
+            fieldTypes(index) match {
+              case ct: CompositeType[_] =>
+                ct.getFlatFields(tail, pos, result)
+              case _ =>
+                throw new InvalidFieldReferenceException("Nested field expression \"" + tail +
+                  "\" not possible on atomic type " + fieldTypes(index) + ".")
+            }
+          } else {
+            extractFlatFields(index + 1, pos + fieldTypes(index).getTotalFields())
           }
-        } else {
-          // skipping over non-matching fields
-          pos += fieldTypes(i).getTotalFields
         }
+
+        extractFlatFields(0, offset)
       }
-      throw new InvalidFieldReferenceException("Unable to find field \"" + field +
-        "\" in type " + this + ".")
     }
   }
 
@@ -195,7 +167,7 @@ abstract class CaseClassTypeInfo[T <: Product](
       field = "_" + (Integer.valueOf(field) + 1)
     }
 
-    for (i <- 0 until fieldNames.length) {
+    for (i <- fieldNames.indices) {
       if (fieldNames(i) == field) {
         if (tail == null) {
           return getTypeAt(i)
@@ -225,9 +197,57 @@ abstract class CaseClassTypeInfo[T <: Product](
     }
   }
 
-  override def toString = clazz.getName + "(" + fieldNames.zip(types).map {
-    case (n, t) => n + ": " + t}
-    .mkString(", ") + ")"
+  override def createTypeComparatorBuilder(): TypeComparatorBuilder[T] = {
+    new CaseClassTypeComparatorBuilder
+  }
+
+  private class CaseClassTypeComparatorBuilder extends TypeComparatorBuilder[T] {
+    val fieldComparators: ArrayBuffer[TypeComparator[_]] = new ArrayBuffer[TypeComparator[_]]()
+    val logicalKeyFields: ArrayBuffer[Int] = new ArrayBuffer[Int]()
+
+    override def initializeTypeComparatorBuilder(size: Int): Unit = {}
+
+    override def addComparatorField(fieldId: Int, comparator: TypeComparator[_]): Unit = {
+      fieldComparators += comparator
+      logicalKeyFields += fieldId
+    }
+
+    override def createTypeComparator(config: ExecutionConfig): TypeComparator[T] = {
+      val maxIndex = logicalKeyFields.max
+
+      new CaseClassComparator[T](
+        logicalKeyFields.toArray,
+        fieldComparators.toArray,
+        types.take(maxIndex + 1).map(_.createSerializer(config))
+      )
+    }
+  }
+
+  override def toString: String = {
+    clazz.getName + "(" + fieldNames.zip(types).map {
+      case (n, t) => n + ": " + t
+    }.mkString(", ") + ")"
+  }
 
   override def isCaseClass = true
+
+  override def equals(obj: Any): Boolean = {
+    obj match {
+      case caseClass: CaseClassTypeInfo[_] =>
+        caseClass.canEqual(this) &&
+        super.equals(caseClass) &&
+        typeParamTypeInfos.sameElements(caseClass.typeParamTypeInfos) &&
+        fieldNames.equals(caseClass.fieldNames)
+      case _ => false
+    }
+  }
+
+  override def hashCode(): Int = {
+    31 * (31 * super.hashCode() + fieldNames.hashCode()) +
+      util.Arrays.hashCode(typeParamTypeInfos.asInstanceOf[Array[AnyRef]])
+  }
+
+  override def canEqual(obj: Any): Boolean = {
+    obj.isInstanceOf[CaseClassTypeInfo[_]]
+  }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8ca853e0/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/EitherSerializer.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/EitherSerializer.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/EitherSerializer.scala
index 65628a0..2efc207 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/EitherSerializer.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/EitherSerializer.scala
@@ -86,11 +86,20 @@ class EitherSerializer[A, B, T <: Either[A, B]](
   }
 
   override def equals(obj: Any): Boolean = {
-    if (obj != null && obj.isInstanceOf[EitherSerializer[_, _, _]]) {
-      val other = obj.asInstanceOf[EitherSerializer[_, _, _]]
-      other.leftSerializer.equals(leftSerializer) && other.rightSerializer.equals(rightSerializer)
-    } else {
-      false
+    obj match {
+      case eitherSerializer: EitherSerializer[_, _, _] =>
+        eitherSerializer.canEqual(this) &&
+        leftSerializer.equals(eitherSerializer.leftSerializer) &&
+        rightSerializer.equals(eitherSerializer.rightSerializer)
+      case _ => false
     }
   }
+
+  override def canEqual(obj: Any): Boolean = {
+    obj.isInstanceOf[EitherSerializer[_, _, _]]
+  }
+
+  override def hashCode(): Int = {
+    31 * leftSerializer.hashCode() + rightSerializer.hashCode()
+  }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8ca853e0/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/EitherTypeInfo.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/EitherTypeInfo.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/EitherTypeInfo.scala
index a1cded7..2beebde 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/EitherTypeInfo.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/EitherTypeInfo.scala
@@ -27,9 +27,9 @@ import scala.collection.JavaConverters._
  * TypeInformation [[Either]].
  */
 class EitherTypeInfo[A, B, T <: Either[A, B]](
-    clazz: Class[T],
-    leftTypeInfo: TypeInformation[A],
-    rightTypeInfo: TypeInformation[B])
+    val clazz: Class[T],
+    val leftTypeInfo: TypeInformation[A],
+    val rightTypeInfo: TypeInformation[B])
   extends TypeInformation[T] {
 
   override def isBasicType: Boolean = false
@@ -55,5 +55,24 @@ class EitherTypeInfo[A, B, T <: Either[A, B]](
     new EitherSerializer(leftSerializer, rightSerializer)
   }
 
+  override def equals(obj: Any): Boolean = {
+    obj match {
+      case eitherTypeInfo: EitherTypeInfo[_, _, _] =>
+        eitherTypeInfo.canEqual(this) &&
+        clazz.equals(eitherTypeInfo.clazz) &&
+        leftTypeInfo.equals(eitherTypeInfo.leftTypeInfo) &&
+        rightTypeInfo.equals(eitherTypeInfo.rightTypeInfo)
+      case _ => false
+    }
+  }
+
+  override def canEqual(obj: Any): Boolean = {
+    obj.isInstanceOf[EitherTypeInfo[_, _, _]]
+  }
+
+  override def hashCode(): Int = {
+    31 * (31 * clazz.hashCode() + leftTypeInfo.hashCode()) + rightTypeInfo.hashCode()
+  }
+
   override def toString = s"Either[$leftTypeInfo, $rightTypeInfo]"
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8ca853e0/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/EnumValueSerializer.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/EnumValueSerializer.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/EnumValueSerializer.scala
index 8d03676..7c3bc95 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/EnumValueSerializer.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/EnumValueSerializer.scala
@@ -51,15 +51,18 @@ class EnumValueSerializer[E <: Enumeration](val enum: E) extends TypeSerializer[
   override def deserialize(reuse: T, source: DataInputView): T = deserialize(source)
 
   override def equals(obj: Any): Boolean = {
-    if (obj != null && obj.isInstanceOf[EnumValueSerializer[_]]) {
-      val other = obj.asInstanceOf[EnumValueSerializer[_]]
-      this.enum == other.enum
-    } else {
-      false
+    obj match {
+      case enumValueSerializer: EnumValueSerializer[_] =>
+        enumValueSerializer.canEqual(this) && enum == enumValueSerializer.enum
+      case _ => false
     }
   }
 
   override def hashCode(): Int = {
     enum.hashCode()
   }
+
+  override def canEqual(obj: scala.Any): Boolean = {
+    obj.isInstanceOf[EnumValueSerializer[_]]
+  }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8ca853e0/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/EnumValueTypeInfo.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/EnumValueTypeInfo.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/EnumValueTypeInfo.scala
index c66e4bc..e3d665e 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/EnumValueTypeInfo.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/EnumValueTypeInfo.scala
@@ -26,7 +26,7 @@ import scala.collection.JavaConverters._
 /**
  * TypeInformation for [[Enumeration]] values.
  */
-class EnumValueTypeInfo[E <: Enumeration](enum: E, clazz: Class[E#Value])
+class EnumValueTypeInfo[E <: Enumeration](val enum: E, val clazz: Class[E#Value])
   extends TypeInformation[E#Value] with AtomicType[E#Value] {
 
   type T = E#Value
@@ -49,4 +49,22 @@ class EnumValueTypeInfo[E <: Enumeration](enum: E, clazz: Class[E#Value])
   }
 
   override def toString = clazz.getCanonicalName
+
+  override def hashCode(): Int = {
+    31 * enum.hashCode() + clazz.hashCode()
+  }
+
+  override def equals(obj: Any): Boolean = {
+    obj match {
+      case enumValueTypeInfo: EnumValueTypeInfo[E] =>
+        enumValueTypeInfo.canEqual(this) &&
+          enum.equals(enumValueTypeInfo.enum) &&
+          clazz.equals(enumValueTypeInfo.clazz)
+      case _ => false
+    }
+  }
+
+  override def canEqual(obj: Any): Boolean = {
+    obj.isInstanceOf[EnumValueTypeInfo[E]]
+  }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8ca853e0/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/NothingSerializer.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/NothingSerializer.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/NothingSerializer.scala
index 147a060..a6a7954 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/NothingSerializer.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/NothingSerializer.scala
@@ -56,6 +56,17 @@ class NothingSerializer extends TypeSerializer[Any] {
     throw new RuntimeException("This must not be used. You encountered a bug.")
 
   override def equals(obj: Any): Boolean = {
-    obj != null && obj.isInstanceOf[NothingSerializer]
+    obj match {
+      case nothingSerializer: NothingSerializer => nothingSerializer.canEqual(this)
+      case _ => false
+    }
+  }
+
+  override def canEqual(obj: scala.Any): Boolean = {
+    obj.isInstanceOf[NothingSerializer]
+  }
+
+  override def hashCode(): Int = {
+    classOf[NothingSerializer].hashCode()
   }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8ca853e0/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/OptionSerializer.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/OptionSerializer.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/OptionSerializer.scala
index 488710d..af2091b 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/OptionSerializer.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/OptionSerializer.scala
@@ -71,11 +71,18 @@ class OptionSerializer[A](val elemSerializer: TypeSerializer[A])
   override def deserialize(reuse: Option[A], source: DataInputView): Option[A] = deserialize(source)
 
   override def equals(obj: Any): Boolean = {
-    if (obj != null && obj.isInstanceOf[OptionSerializer[_]]) {
-      val other = obj.asInstanceOf[OptionSerializer[_]]
-      other.elemSerializer.equals(elemSerializer)
-    } else {
-      false
+    obj match {
+      case optionSerializer: OptionSerializer[_] =>
+        optionSerializer.canEqual(this) && elemSerializer.equals(optionSerializer.elemSerializer)
+      case _ => false
     }
   }
+
+  override def canEqual(obj: scala.Any): Boolean = {
+    obj.isInstanceOf[OptionSerializer[_]]
+  }
+
+  override def hashCode(): Int = {
+    elemSerializer.hashCode()
+  }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8ca853e0/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/OptionTypeInfo.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/OptionTypeInfo.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/OptionTypeInfo.scala
index 510b604..2aff2dd 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/OptionTypeInfo.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/OptionTypeInfo.scala
@@ -26,7 +26,7 @@ import scala.collection.JavaConverters._
 /**
  * TypeInformation for [[Option]].
  */
-class OptionTypeInfo[A, T <: Option[A]](elemTypeInfo: TypeInformation[A])
+class OptionTypeInfo[A, T <: Option[A]](private val elemTypeInfo: TypeInformation[A])
   extends TypeInformation[T] {
 
   override def isBasicType: Boolean = false
@@ -49,4 +49,20 @@ class OptionTypeInfo[A, T <: Option[A]](elemTypeInfo: TypeInformation[A])
   }
 
   override def toString = s"Option[$elemTypeInfo]"
+
+  override def equals(obj: Any): Boolean = {
+    obj match {
+      case optTpe: OptionTypeInfo[_, _] =>
+        optTpe.canEqual(this) && elemTypeInfo.equals(optTpe.elemTypeInfo)
+      case _ => false
+    }
+  }
+
+  def canEqual(obj: Any): Boolean = {
+    obj.isInstanceOf[OptionTypeInfo[_, _]]
+  }
+
+  override def hashCode: Int = {
+    elemTypeInfo.hashCode()
+  }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8ca853e0/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/TraversableSerializer.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/TraversableSerializer.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/TraversableSerializer.scala
index 38fd14b..d242863 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/TraversableSerializer.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/TraversableSerializer.scala
@@ -134,11 +134,18 @@ abstract class TraversableSerializer[T <: TraversableOnce[E], E](
   }
 
   override def equals(obj: Any): Boolean = {
-    if (obj != null && obj.isInstanceOf[TraversableSerializer[_, _]]) {
-      val other = obj.asInstanceOf[TraversableSerializer[_, _]]
-      other.elementSerializer.equals(elementSerializer)
-    } else {
-      false
+    obj match {
+      case other: TraversableSerializer[_, _] =>
+        other.canEqual(this) && elementSerializer.equals(other.elementSerializer)
+      case _ => false
     }
   }
+
+  override def hashCode(): Int = {
+    elementSerializer.hashCode()
+  }
+
+  override def canEqual(obj: Any): Boolean = {
+    obj.isInstanceOf[TraversableSerializer[_, _]]
+  }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8ca853e0/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/TraversableTypeInfo.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/TraversableTypeInfo.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/TraversableTypeInfo.scala
index 76067bb..8948b0c 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/TraversableTypeInfo.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/TraversableTypeInfo.scala
@@ -23,13 +23,11 @@ import org.apache.flink.api.common.typeutils.TypeSerializer
 
 import scala.collection.JavaConverters._
 
-import scala.collection.generic.CanBuildFrom
-
 /**
  * TypeInformation for Scala Collections.
  */
 abstract class TraversableTypeInfo[T <: TraversableOnce[E], E](
-    clazz: Class[T],
+    val clazz: Class[T],
     val elementTypeInfo: TypeInformation[E])
   extends TypeInformation[T] {
 
@@ -41,17 +39,25 @@ abstract class TraversableTypeInfo[T <: TraversableOnce[E], E](
   override def getTypeClass: Class[T] = clazz
   override def getGenericParameters = List[TypeInformation[_]](elementTypeInfo).asJava
 
-
   def createSerializer(executionConfig: ExecutionConfig): TypeSerializer[T]
 
   override def equals(other: Any): Boolean = {
-    if (other.isInstanceOf[TraversableTypeInfo[_, _]]) {
-      val otherTrav = other.asInstanceOf[TraversableTypeInfo[_, _]]
-      otherTrav.getTypeClass == getTypeClass && otherTrav.elementTypeInfo == elementTypeInfo
-    } else {
-      false
+    other match {
+      case traversable: TraversableTypeInfo[_, _] =>
+        traversable.canEqual(this) &&
+        clazz == traversable.clazz &&
+        elementTypeInfo.equals(traversable.elementTypeInfo)
+      case _ => false
     }
   }
 
+  override def hashCode(): Int = {
+    31 * clazz.hashCode() + elementTypeInfo.hashCode()
+  }
+
+  override def canEqual(obj: Any): Boolean = {
+    obj.isInstanceOf[TraversableTypeInfo[_, _]]
+  }
+
   override def toString = s"$clazz[$elementTypeInfo]"
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8ca853e0/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/TrySerializer.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/TrySerializer.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/TrySerializer.scala
index b7ceadf..99aae21 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/TrySerializer.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/TrySerializer.scala
@@ -27,7 +27,9 @@ import scala.util.{Success, Try, Failure}
 /**
  * Serializer for [[scala.util.Try]].
  */
-class TrySerializer[A](val elemSerializer: TypeSerializer[A], executionConfig: ExecutionConfig)
+class TrySerializer[A](
+    private val elemSerializer: TypeSerializer[A],
+    private val executionConfig: ExecutionConfig)
   extends TypeSerializer[Try[A]] {
 
   override def duplicate: TrySerializer[A] = this
@@ -80,11 +82,18 @@ class TrySerializer[A](val elemSerializer: TypeSerializer[A], executionConfig: E
   override def deserialize(reuse: Try[A], source: DataInputView): Try[A] = deserialize(source)
 
   override def equals(obj: Any): Boolean = {
-    if (obj != null && obj.isInstanceOf[TrySerializer[_]]) {
-      val other = obj.asInstanceOf[TrySerializer[_]]
-      other.elemSerializer.equals(elemSerializer)
-    } else {
-      false
+    obj match {
+      case other: TrySerializer[_] =>
+        other.canEqual(this) && elemSerializer.equals(other.elemSerializer)
+      case _ => false
     }
   }
+
+  override def canEqual(obj: Any): Boolean = {
+    obj.isInstanceOf[TrySerializer[_]]
+  }
+
+  override def hashCode(): Int = {
+    31 * elemSerializer.hashCode() + executionConfig.hashCode()
+  }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8ca853e0/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/TryTypeInfo.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/TryTypeInfo.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/TryTypeInfo.scala
index 3749b37..f3f2ce2 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/TryTypeInfo.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/TryTypeInfo.scala
@@ -28,7 +28,7 @@ import scala.util.Try
 /**
  * TypeInformation for [[scala.util.Try]].
  */
-class TryTypeInfo[A, T <: Try[A]](elemTypeInfo: TypeInformation[A])
+class TryTypeInfo[A, T <: Try[A]](val elemTypeInfo: TypeInformation[A])
   extends TypeInformation[T] {
 
   override def isBasicType: Boolean = false
@@ -49,5 +49,22 @@ class TryTypeInfo[A, T <: Try[A]](elemTypeInfo: TypeInformation[A])
     }
   }
 
+  override def equals(obj: Any): Boolean = {
+    obj match {
+      case tryTypeInfo: TryTypeInfo[_, _] =>
+        tryTypeInfo.canEqual(this) &&
+        elemTypeInfo.equals(tryTypeInfo.elemTypeInfo)
+      case _ => false
+    }
+  }
+
+  override def canEqual(obj: Any): Boolean = {
+    obj.isInstanceOf[TryTypeInfo[_, _]]
+  }
+
+  override def hashCode(): Int = {
+    elemTypeInfo.hashCode()
+  }
+
   override def toString = s"Try[$elemTypeInfo]"
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8ca853e0/flink-scala/src/test/scala/org/apache/flink/api/scala/typeutils/CaseClassTypeInfoTest.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/test/scala/org/apache/flink/api/scala/typeutils/CaseClassTypeInfoTest.scala b/flink-scala/src/test/scala/org/apache/flink/api/scala/typeutils/CaseClassTypeInfoTest.scala
new file mode 100644
index 0000000..479483f
--- /dev/null
+++ b/flink-scala/src/test/scala/org/apache/flink/api/scala/typeutils/CaseClassTypeInfoTest.scala
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.scala.typeutils
+
+import org.apache.flink.api.common.ExecutionConfig
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo
+import org.apache.flink.api.common.typeutils.TypeSerializer
+import org.apache.flink.util.TestLogger
+import org.junit.Test
+import org.scalatest.junit.JUnitSuiteLike
+
+class CaseClassTypeInfoTest extends TestLogger with JUnitSuiteLike {
+
+  @Test
+  def testCaseClassTypeInfoEquality(): Unit = {
+    val tpeInfo1 = new CaseClassTypeInfo[Tuple2[Int, String]](
+      classOf[Tuple2[Int, String]],
+      Array(),
+      Array(BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO),
+      Array("_1", "_2")) {
+      override def createSerializer(config: ExecutionConfig): TypeSerializer[(Int, String)] = ???
+    }
+
+    val tpeInfo2 = new CaseClassTypeInfo[Tuple2[Int, String]](
+      classOf[Tuple2[Int, String]],
+      Array(),
+      Array(BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO),
+      Array("_1", "_2")) {
+      override def createSerializer(config: ExecutionConfig): TypeSerializer[(Int, String)] = ???
+    }
+
+    assert(tpeInfo1.equals(tpeInfo2))
+    assert(tpeInfo1.hashCode() == tpeInfo2.hashCode())
+  }
+
+  @Test
+  def testCaseClassTypeInfoInequality(): Unit = {
+    val tpeInfo1 = new CaseClassTypeInfo[Tuple2[Int, String]](
+      classOf[Tuple2[Int, String]],
+      Array(),
+      Array(BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO),
+      Array("_1", "_2")) {
+      override def createSerializer(config: ExecutionConfig): TypeSerializer[(Int, String)] = ???
+    }
+
+    val tpeInfo2 = new CaseClassTypeInfo[Tuple2[Int, Boolean]](
+      classOf[Tuple2[Int, Boolean]],
+      Array(),
+      Array(BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.BOOLEAN_TYPE_INFO),
+      Array("_1", "_2")) {
+      override def createSerializer(config: ExecutionConfig): TypeSerializer[(Int, Boolean)] = ???
+    }
+
+    assert(!tpeInfo1.equals(tpeInfo2))
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/8ca853e0/flink-scala/src/test/scala/org/apache/flink/api/scala/typeutils/EitherTypeInfoTest.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/test/scala/org/apache/flink/api/scala/typeutils/EitherTypeInfoTest.scala b/flink-scala/src/test/scala/org/apache/flink/api/scala/typeutils/EitherTypeInfoTest.scala
new file mode 100644
index 0000000..e23a6a0
--- /dev/null
+++ b/flink-scala/src/test/scala/org/apache/flink/api/scala/typeutils/EitherTypeInfoTest.scala
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.scala.typeutils
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo
+import org.apache.flink.util.TestLogger
+import org.junit.Test
+import org.scalatest.junit.JUnitSuiteLike
+
+class EitherTypeInfoTest extends TestLogger with JUnitSuiteLike {
+
+  @Test
+  def testEitherTypeEquality(): Unit = {
+    val eitherTypeInfo1 = new EitherTypeInfo[Integer, String, Either[Integer, String]](
+      classOf[Either[Integer,String]],
+      BasicTypeInfo.INT_TYPE_INFO,
+      BasicTypeInfo.STRING_TYPE_INFO
+    )
+    val eitherTypeInfo2 = new EitherTypeInfo[Integer, String, Either[Integer, String]](
+      classOf[Either[Integer,String]],
+      BasicTypeInfo.INT_TYPE_INFO,
+      BasicTypeInfo.STRING_TYPE_INFO
+    )
+
+    assert(eitherTypeInfo1.equals(eitherTypeInfo2))
+    assert(eitherTypeInfo1.hashCode() == eitherTypeInfo2.hashCode())
+  }
+
+  @Test
+  def testEitherTypeInequality(): Unit = {
+    val eitherTypeInfo1 = new EitherTypeInfo[Integer, Integer, Either[Integer, Integer]](
+      classOf[Either[Integer,Integer]],
+      BasicTypeInfo.INT_TYPE_INFO,
+      BasicTypeInfo.INT_TYPE_INFO
+    )
+    val eitherTypeInfo2 = new EitherTypeInfo[Integer, String, Either[Integer, String]](
+      classOf[Either[Integer,String]],
+      BasicTypeInfo.INT_TYPE_INFO,
+      BasicTypeInfo.STRING_TYPE_INFO
+    )
+    assert(!eitherTypeInfo1.equals(eitherTypeInfo2))
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/8ca853e0/flink-scala/src/test/scala/org/apache/flink/api/scala/typeutils/EnumValueTypeInfoTest.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/test/scala/org/apache/flink/api/scala/typeutils/EnumValueTypeInfoTest.scala b/flink-scala/src/test/scala/org/apache/flink/api/scala/typeutils/EnumValueTypeInfoTest.scala
new file mode 100644
index 0000000..acd6a39
--- /dev/null
+++ b/flink-scala/src/test/scala/org/apache/flink/api/scala/typeutils/EnumValueTypeInfoTest.scala
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.scala.typeutils
+
+import org.apache.flink.api.scala.typeutils.AlternateEnumeration.AlternateEnumeration
+import org.apache.flink.api.scala.typeutils.TestEnumeration.TestEnumeration
+import org.apache.flink.util.TestLogger
+import org.junit.Test
+import org.scalatest.junit.JUnitSuiteLike
+
+class EnumValueTypeInfoTest extends TestLogger with JUnitSuiteLike {
+
+  @Test
+  def testEnumValueTypeInfoEquality(): Unit = {
+    val enumTypeInfo1 = new EnumValueTypeInfo[TestEnumeration.type](
+      TestEnumeration,
+      classOf[TestEnumeration])
+    val enumTypeInfo2 = new EnumValueTypeInfo[TestEnumeration.type](
+      TestEnumeration,
+      classOf[TestEnumeration])
+
+    assert(enumTypeInfo1.equals(enumTypeInfo2))
+    assert(enumTypeInfo1.hashCode() == enumTypeInfo2.hashCode())
+  }
+
+  @Test
+  def testEnumValueTypeInfoInequality(): Unit = {
+    val enumTypeInfo1 = new EnumValueTypeInfo[TestEnumeration.type](
+      TestEnumeration,
+      classOf[TestEnumeration])
+    val enumTypeInfo2 = new EnumValueTypeInfo[AlternateEnumeration.type](
+      AlternateEnumeration,
+      classOf[AlternateEnumeration])
+
+    assert(!enumTypeInfo1.equals(enumTypeInfo2))
+  }
+
+}
+
+object TestEnumeration extends Enumeration {
+  type TestEnumeration = Value
+  val ONE = this.Value
+}
+
+object AlternateEnumeration extends Enumeration {
+  type AlternateEnumeration = Value
+  val TWO = this.Value
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/8ca853e0/flink-scala/src/test/scala/org/apache/flink/api/scala/typeutils/OptionTypeInfoTest.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/test/scala/org/apache/flink/api/scala/typeutils/OptionTypeInfoTest.scala b/flink-scala/src/test/scala/org/apache/flink/api/scala/typeutils/OptionTypeInfoTest.scala
new file mode 100644
index 0000000..b765658
--- /dev/null
+++ b/flink-scala/src/test/scala/org/apache/flink/api/scala/typeutils/OptionTypeInfoTest.scala
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.scala.typeutils
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo
+import org.apache.flink.api.java.typeutils.GenericTypeInfo
+import org.apache.flink.util.TestLogger
+import org.junit.Test
+import org.scalatest.junit.{JUnitSuiteLike, JUnitSuite}
+
+class OptionTypeInfoTest extends TestLogger with JUnitSuiteLike {
+
+  @Test
+  def testOptionTypeEquality: Unit = {
+    val optionTypeInfo1 = new OptionTypeInfo[Integer, Option[Integer]](BasicTypeInfo.INT_TYPE_INFO)
+    val optionTypeInfo2 = new OptionTypeInfo[Integer, Option[Integer]](BasicTypeInfo.INT_TYPE_INFO)
+
+    assert(optionTypeInfo1.equals(optionTypeInfo2))
+    assert(optionTypeInfo1.hashCode == optionTypeInfo2.hashCode)
+  }
+
+  @Test
+  def testOptionTypeInequality: Unit = {
+    val optionTypeInfo1 = new OptionTypeInfo[Integer, Option[Integer]](BasicTypeInfo.INT_TYPE_INFO)
+    val optionTypeInfo2 = new OptionTypeInfo[String, Option[String]](BasicTypeInfo.STRING_TYPE_INFO)
+
+    assert(!optionTypeInfo1.equals(optionTypeInfo2))
+  }
+
+  @Test
+  def testOptionTypeInequalityWithDifferentType: Unit = {
+    val optionTypeInfo = new OptionTypeInfo[Integer, Option[Integer]](BasicTypeInfo.INT_TYPE_INFO)
+    val genericTypeInfo = new GenericTypeInfo[Double](classOf[Double])
+
+    assert(!optionTypeInfo.equals(genericTypeInfo))
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/flink/blob/8ca853e0/flink-scala/src/test/scala/org/apache/flink/api/scala/typeutils/TraversableTypeInfoTest.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/test/scala/org/apache/flink/api/scala/typeutils/TraversableTypeInfoTest.scala b/flink-scala/src/test/scala/org/apache/flink/api/scala/typeutils/TraversableTypeInfoTest.scala
new file mode 100644
index 0000000..e83b326
--- /dev/null
+++ b/flink-scala/src/test/scala/org/apache/flink/api/scala/typeutils/TraversableTypeInfoTest.scala
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.scala.typeutils
+
+import org.apache.flink.api.common.ExecutionConfig
+import org.apache.flink.api.common.typeinfo.{TypeInformation, BasicTypeInfo}
+import org.apache.flink.api.common.typeutils.TypeSerializer
+import org.apache.flink.util.TestLogger
+import org.junit.Test
+import org.scalatest.junit.JUnitSuiteLike
+
+class TraversableTypeInfoTest extends TestLogger with JUnitSuiteLike {
+
+  @Test
+  def testTraversableTypeInfoEquality(): Unit = {
+    val tpeInfo1 = new TraversableTypeInfo[Seq[Int], Int](
+      classOf[Seq[Int]],
+      BasicTypeInfo.INT_TYPE_INFO.asInstanceOf[TypeInformation[Int]]) {
+      override def createSerializer(executionConfig: ExecutionConfig): TypeSerializer[Seq[Int]] =
+        ???
+    }
+
+    val tpeInfo2 = new TraversableTypeInfo[Seq[Int], Int](
+      classOf[Seq[Int]],
+      BasicTypeInfo.INT_TYPE_INFO.asInstanceOf[TypeInformation[Int]]) {
+      override def createSerializer(executionConfig: ExecutionConfig): TypeSerializer[Seq[Int]] =
+        ???
+    }
+
+    assert(tpeInfo1.equals(tpeInfo2))
+  }
+
+  @Test
+  def testTraversableTypeInfoInequality(): Unit = {
+    val tpeInfo1 = new TraversableTypeInfo[Seq[Int], Int](
+      classOf[Seq[Int]],
+      BasicTypeInfo.INT_TYPE_INFO.asInstanceOf[TypeInformation[Int]]) {
+      override def createSerializer(executionConfig: ExecutionConfig): TypeSerializer[Seq[Int]] =
+        ???
+    }
+
+    val tpeInfo2 = new TraversableTypeInfo[List[Int], Int](
+      classOf[List[Int]],
+      BasicTypeInfo.INT_TYPE_INFO.asInstanceOf[TypeInformation[Int]]) {
+      override def createSerializer(executionConfig: ExecutionConfig): TypeSerializer[List[Int]] =
+        ???
+    }
+
+    assert(!tpeInfo1.equals(tpeInfo2))
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/8ca853e0/flink-scala/src/test/scala/org/apache/flink/api/scala/typeutils/TryTypeInfoTest.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/test/scala/org/apache/flink/api/scala/typeutils/TryTypeInfoTest.scala b/flink-scala/src/test/scala/org/apache/flink/api/scala/typeutils/TryTypeInfoTest.scala
new file mode 100644
index 0000000..3a5fc80
--- /dev/null
+++ b/flink-scala/src/test/scala/org/apache/flink/api/scala/typeutils/TryTypeInfoTest.scala
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.scala.typeutils
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo
+import org.apache.flink.api.java.typeutils.GenericTypeInfo
+import org.apache.flink.util.TestLogger
+import org.junit.Test
+import org.scalatest.junit.JUnitSuiteLike
+
+import scala.util.Try
+
+class TryTypeInfoTest extends TestLogger with JUnitSuiteLike {
+
+  @Test
+  def testTryTypeEquality: Unit = {
+    val tryTypeInfo1 = new TryTypeInfo[Integer, Try[Integer]](BasicTypeInfo.INT_TYPE_INFO)
+    val tryTypeInfo2 = new TryTypeInfo[Integer, Try[Integer]](BasicTypeInfo.INT_TYPE_INFO)
+
+    assert(tryTypeInfo1.equals(tryTypeInfo2))
+    assert(tryTypeInfo1.hashCode == tryTypeInfo2.hashCode)
+  }
+
+  @Test
+  def testTryTypeInequality: Unit = {
+    val tryTypeInfo1 = new TryTypeInfo[Integer, Try[Integer]](BasicTypeInfo.INT_TYPE_INFO)
+    val tryTypeInfo2 = new TryTypeInfo[String, Try[String]](BasicTypeInfo.STRING_TYPE_INFO)
+
+    assert(!tryTypeInfo1.equals(tryTypeInfo2))
+  }
+
+  @Test
+  def testTryTypeInequalityWithDifferentType: Unit = {
+    val tryTypeInfo = new TryTypeInfo[Integer, Try[Integer]](BasicTypeInfo.INT_TYPE_INFO)
+    val genericTypeInfo = new GenericTypeInfo[Double](classOf[Double])
+
+    assert(!tryTypeInfo.equals(genericTypeInfo))
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/8ca853e0/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/StreamWindowSerializer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/StreamWindowSerializer.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/StreamWindowSerializer.java
index 229cb4a..529850f 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/StreamWindowSerializer.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/StreamWindowSerializer.java
@@ -20,6 +20,7 @@ package org.apache.flink.streaming.api.windowing;
 
 import java.io.IOException;
 
+import com.google.common.base.Preconditions;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
@@ -37,6 +38,8 @@ public final class StreamWindowSerializer<T> extends TypeSerializer<StreamWindow
 	TypeSerializer<Boolean> boolSerializer = BooleanSerializer.INSTANCE;
 
 	public StreamWindowSerializer(TypeInformation<T> typeInfo, ExecutionConfig conf) {
+		Preconditions.checkNotNull(typeInfo);
+
 		this.typeSerializer = typeInfo.createSerializer(conf);
 	}
 
@@ -118,6 +121,27 @@ public final class StreamWindowSerializer<T> extends TypeSerializer<StreamWindow
 	}
 
 	@Override
+	public boolean equals(Object obj) {
+		if (obj instanceof StreamWindowSerializer) {
+			StreamWindowSerializer<?> other = (StreamWindowSerializer<?>) obj;
+
+			return other.canEqual(this) && typeSerializer.equals(other.typeSerializer);
+		} else {
+			return false;
+		}
+	}
+
+	@Override
+	public boolean canEqual(Object obj) {
+		return obj instanceof StreamWindowSerializer;
+	}
+
+	@Override
+	public int hashCode() {
+		return typeSerializer.hashCode();
+	}
+
+	@Override
 	public TypeSerializer<StreamWindow<T>> duplicate() {
 		return this;
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/8ca853e0/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/StreamWindowTypeInfo.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/StreamWindowTypeInfo.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/StreamWindowTypeInfo.java
index 3bcc253..2c0a999 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/StreamWindowTypeInfo.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/StreamWindowTypeInfo.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.streaming.api.windowing;
 
+import com.google.common.base.Preconditions;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
@@ -25,10 +26,11 @@ import org.apache.flink.api.common.typeutils.TypeSerializer;
 public class StreamWindowTypeInfo<T> extends TypeInformation<StreamWindow<T>> {
 
 	private static final long serialVersionUID = 1L;
-	TypeInformation<T> innerType;
+
+	final TypeInformation<T> innerType;
 
 	public StreamWindowTypeInfo(TypeInformation<T> innerType) {
-		this.innerType = innerType;
+		this.innerType = Preconditions.checkNotNull(innerType);
 	}
 
 	public TypeInformation<T> getInnerType() {
@@ -50,10 +52,10 @@ public class StreamWindowTypeInfo<T> extends TypeInformation<StreamWindow<T>> {
 		return innerType.getArity();
 	}
 
+	@SuppressWarnings("unchecked")
 	@Override
 	public Class<StreamWindow<T>> getTypeClass() {
-		// TODO Auto-generated method stub
-		return null;
+		return (Class<StreamWindow<T>>)(Object)StreamWindow.class;
 	}
 
 	@Override
@@ -67,6 +69,34 @@ public class StreamWindowTypeInfo<T> extends TypeInformation<StreamWindow<T>> {
 	}
 
 	@Override
+	public String toString() {
+		return getClass().getSimpleName() + "<" + innerType + ">";
+	}
+
+	@Override
+	public boolean equals(Object obj) {
+		if (obj instanceof StreamWindowTypeInfo) {
+			@SuppressWarnings("unchecked")
+			StreamWindowTypeInfo<T> streamWindowTypeInfo = (StreamWindowTypeInfo<T>) obj;
+
+			return streamWindowTypeInfo.canEqual(this) &&
+				innerType.equals(streamWindowTypeInfo.innerType);
+		} else {
+			return false;
+		}
+	}
+
+	@Override
+	public int hashCode() {
+		return innerType.hashCode();
+	}
+
+	@Override
+	public boolean canEqual(Object obj) {
+		return obj instanceof StreamWindowTypeInfo;
+	}
+
+	@Override
 	public int getTotalFields() {
 		return innerType.getTotalFields();
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/8ca853e0/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/streamrecord/MultiplexingStreamRecordSerializer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/streamrecord/MultiplexingStreamRecordSerializer.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/streamrecord/MultiplexingStreamRecordSerializer.java
index 156e5d6..d4363cd 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/streamrecord/MultiplexingStreamRecordSerializer.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/streamrecord/MultiplexingStreamRecordSerializer.java
@@ -164,4 +164,25 @@ public final class MultiplexingStreamRecordSerializer<T> extends TypeSerializer<
 			typeSerializer.copy(source, target);
 		}
 	}
+
+	@Override
+	public boolean equals(Object obj) {
+		if (obj instanceof MultiplexingStreamRecordSerializer) {
+			MultiplexingStreamRecordSerializer<?> other = (MultiplexingStreamRecordSerializer<?>) obj;
+
+			return other.canEqual(this) && typeSerializer.equals(other.typeSerializer);
+		} else {
+			return false;
+		}
+	}
+
+	@Override
+	public boolean canEqual(Object obj) {
+		return obj instanceof MultiplexingStreamRecordSerializer;
+	}
+
+	@Override
+	public int hashCode() {
+		return typeSerializer.hashCode();
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8ca853e0/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamRecordSerializer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamRecordSerializer.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamRecordSerializer.java
index e58d3c8..d47da50 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamRecordSerializer.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamRecordSerializer.java
@@ -122,4 +122,25 @@ public final class StreamRecordSerializer<T> extends TypeSerializer<StreamRecord
 	public void copy(DataInputView source, DataOutputView target) throws IOException {
 		typeSerializer.copy(source, target);
 	}
+
+	@Override
+	public boolean equals(Object obj) {
+		if (obj instanceof StreamRecordSerializer) {
+			StreamRecordSerializer<?> other = (StreamRecordSerializer<?>) obj;
+
+			return other.canEqual(this) && typeSerializer.equals(other.typeSerializer);
+		} else {
+			return false;
+		}
+	}
+
+	@Override
+	public boolean canEqual(Object obj) {
+		return obj instanceof StreamRecordSerializer;
+	}
+
+	@Override
+	public int hashCode() {
+		return typeSerializer.hashCode();
+	}
 }