You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2017/10/14 17:08:10 UTC

[5/8] flink git commit: [hotfix] [scala api] Move tests to correct package

http://git-wip-us.apache.org/repos/asf/flink/blob/6a935978/flink-scala/src/test/scala/org/apache/flink/api/scala/runtime/tuple/base/TupleComparatorTestBase.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/test/scala/org/apache/flink/api/scala/runtime/tuple/base/TupleComparatorTestBase.scala b/flink-scala/src/test/scala/org/apache/flink/api/scala/runtime/tuple/base/TupleComparatorTestBase.scala
new file mode 100644
index 0000000..e0ac671
--- /dev/null
+++ b/flink-scala/src/test/scala/org/apache/flink/api/scala/runtime/tuple/base/TupleComparatorTestBase.scala
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.scala.runtime.tuple.base
+
+import org.apache.flink.api.common.typeutils.ComparatorTestBase
+import org.apache.flink.api.scala.typeutils.{CaseClassSerializer, CaseClassComparator}
+import org.junit.Assert._
+
+abstract class TupleComparatorTestBase[T <: Product] extends ComparatorTestBase[T] {
+  protected override def deepEquals(message: String, should: T, is: T): Unit = {
+    for (i <- 0 until should.productArity) { // field-by-field over the expected arity
+      assertEquals(message, should.productElement(i), is.productElement(i))
+    }
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/flink/blob/6a935978/flink-scala/src/test/scala/org/apache/flink/api/scala/types/TypeInformationGenTest.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/test/scala/org/apache/flink/api/scala/types/TypeInformationGenTest.scala b/flink-scala/src/test/scala/org/apache/flink/api/scala/types/TypeInformationGenTest.scala
new file mode 100644
index 0000000..7d13bba
--- /dev/null
+++ b/flink-scala/src/test/scala/org/apache/flink/api/scala/types/TypeInformationGenTest.scala
@@ -0,0 +1,589 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.scala.types
+
+import org.apache.flink.api.common.typeinfo._
+import org.apache.flink.api.java.io.CollectionInputFormat
+import org.apache.flink.api.java.typeutils.TypeExtractorTest.CustomTuple
+import org.apache.flink.api.java.typeutils._
+import org.apache.flink.api.scala._
+import org.apache.flink.api.scala.typeutils.{CaseClassTypeInfo, UnitTypeInfo}
+import org.apache.flink.types.{IntValue, StringValue}
+
+import org.junit.{Assert, Test}
+
+case class CustomCaseClass(a: String, b: Int)
+
+case class UmlautCaseClass(ä: String, ß: Int)
+
+class CustomType(var myField1: String, var myField2: Int) {
+  def this() {
+    this(null, 0)
+  }
+}
+
+class MyObject[A](var a: A) {
+  def this() { this(null.asInstanceOf[A]) }
+}
+
+class TypeInformationGenTest {
+
+  @Test
+  def testJavaTuple(): Unit = {
+    val ti = createTypeInformation[org.apache.flink.api.java.tuple.Tuple3[Int, String, Integer]]
+
+    Assert.assertTrue(ti.isTupleType)
+    Assert.assertEquals(3, ti.getArity)
+    Assert.assertTrue(ti.isInstanceOf[TupleTypeInfoBase[_]])
+    val tti = ti.asInstanceOf[TupleTypeInfoBase[_]]
+    Assert.assertEquals(classOf[org.apache.flink.api.java.tuple.Tuple3[_, _, _]], tti.getTypeClass)
+    for (i <- 0 until 3) {
+      Assert.assertTrue(tti.getTypeAt(i).isInstanceOf[BasicTypeInfo[_]])
+    }
+
+    Assert.assertEquals(BasicTypeInfo.INT_TYPE_INFO, tti.getTypeAt(0))
+    Assert.assertEquals(BasicTypeInfo.STRING_TYPE_INFO, tti.getTypeAt(1))
+    Assert.assertEquals(BasicTypeInfo.INT_TYPE_INFO, tti.getTypeAt(2))
+  }
+
+  @Test
+  def testCustomJavaTuple(): Unit = {
+    val ti = createTypeInformation[CustomTuple]
+
+    Assert.assertTrue(ti.isTupleType)
+    Assert.assertEquals(2, ti.getArity)
+    Assert.assertTrue(ti.isInstanceOf[TupleTypeInfoBase[_]])
+    val tti = ti.asInstanceOf[TupleTypeInfoBase[_]]
+    Assert.assertEquals(classOf[CustomTuple], tti.getTypeClass)
+    for (i <- 0 until 2) {
+      Assert.assertTrue(tti.getTypeAt(i).isInstanceOf[BasicTypeInfo[_]])
+    }
+
+    Assert.assertEquals(BasicTypeInfo.STRING_TYPE_INFO, tti.getTypeAt(0))
+    Assert.assertEquals(BasicTypeInfo.INT_TYPE_INFO, tti.getTypeAt(1))
+  }
+
+  @Test
+  def testBasicType(): Unit = {
+    val ti = createTypeInformation[Boolean]
+
+    Assert.assertTrue(ti.isBasicType)
+    Assert.assertEquals(BasicTypeInfo.BOOLEAN_TYPE_INFO, ti)
+    Assert.assertEquals(classOf[java.lang.Boolean], ti.getTypeClass)
+  }
+
+  @Test
+  def testTypeParameters(): Unit = {
+
+    val data = Seq(1.0d, 2.0d)
+
+    def f[T: TypeInformation](data: Seq[T]): (T, Seq[T]) = {
+
+      val ti = createTypeInformation[(T, Seq[T])]
+
+      Assert.assertTrue(ti.isTupleType)
+      val ccti = ti.asInstanceOf[CaseClassTypeInfo[(T, Seq[T])]]
+      Assert.assertEquals(BasicTypeInfo.DOUBLE_TYPE_INFO, ccti.getTypeAt(0))
+
+      (data.head, data)
+    }
+
+    f(data)
+
+  }
+
+  @Test
+  def testGenericArrays(): Unit = {
+
+    class MyObject(var a: Int, var b: String) {
+      def this() = this(0, "")
+    }
+
+    val boolArray = Array(true, false)
+    val byteArray = Array(1.toByte, 2.toByte, 3.toByte)
+    val charArray= Array(1.toChar, 2.toChar, 3.toChar)
+    val shortArray = Array(1.toShort, 2.toShort, 3.toShort)
+    val intArray = Array(1, 2, 3)
+    val longArray = Array(1L, 2L, 3L)
+    val floatArray = Array(1.0f, 2.0f, 3.0f)
+    val doubleArray = Array(1.0, 2.0, 3.0)
+    val stringArray = Array("hey", "there")
+    val objectArray = Array(new MyObject(1, "hey"), new MyObject(2, "there"))
+
+    def getType[T: TypeInformation](arr: Array[T]): TypeInformation[Array[T]] = {
+      createTypeInformation[Array[T]]
+    }
+
+    Assert.assertEquals(
+      PrimitiveArrayTypeInfo.BOOLEAN_PRIMITIVE_ARRAY_TYPE_INFO,
+      getType(boolArray))
+
+    Assert.assertEquals(
+      PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO,
+      getType(byteArray))
+
+    Assert.assertEquals(
+      PrimitiveArrayTypeInfo.CHAR_PRIMITIVE_ARRAY_TYPE_INFO,
+      getType(charArray))
+
+    Assert.assertEquals(
+      PrimitiveArrayTypeInfo.SHORT_PRIMITIVE_ARRAY_TYPE_INFO,
+      getType(shortArray))
+
+    Assert.assertEquals(
+      PrimitiveArrayTypeInfo.INT_PRIMITIVE_ARRAY_TYPE_INFO,
+      getType(intArray))
+
+    Assert.assertEquals(
+      PrimitiveArrayTypeInfo.LONG_PRIMITIVE_ARRAY_TYPE_INFO,
+      getType(longArray))
+
+    Assert.assertEquals(
+      PrimitiveArrayTypeInfo.FLOAT_PRIMITIVE_ARRAY_TYPE_INFO,
+      getType(floatArray))
+
+    Assert.assertEquals(
+      PrimitiveArrayTypeInfo.DOUBLE_PRIMITIVE_ARRAY_TYPE_INFO,
+      getType(doubleArray))
+
+    Assert.assertEquals(
+      BasicArrayTypeInfo.STRING_ARRAY_TYPE_INFO,
+      getType(stringArray))
+
+    Assert.assertTrue(getType(objectArray).isInstanceOf[ObjectArrayTypeInfo[_, _]])
+    Assert.assertTrue(
+      getType(objectArray).asInstanceOf[ObjectArrayTypeInfo[_, _]]
+        .getComponentInfo.isInstanceOf[PojoTypeInfo[_]])
+  }
+
+  @Test
+  def testTupleWithBasicTypes(): Unit = {
+    val ti = createTypeInformation[(Int, Long, Double, Float, Boolean, String, Char, Short, Byte)]
+
+    Assert.assertTrue(ti.isTupleType)
+    Assert.assertEquals(9, ti.getArity)
+    Assert.assertTrue(ti.isInstanceOf[TupleTypeInfoBase[_]])
+    val tti = ti.asInstanceOf[TupleTypeInfoBase[_]]
+    Assert.assertEquals(classOf[Tuple9[_,_,_,_,_,_,_,_,_]], tti.getTypeClass)
+    for (i <- 0 until 9) {
+      Assert.assertTrue(tti.getTypeAt(i).isInstanceOf[BasicTypeInfo[_]])
+    }
+
+    Assert.assertEquals(BasicTypeInfo.INT_TYPE_INFO, tti.getTypeAt(0))
+    Assert.assertEquals(BasicTypeInfo.LONG_TYPE_INFO, tti.getTypeAt(1))
+    Assert.assertEquals(BasicTypeInfo.DOUBLE_TYPE_INFO, tti.getTypeAt(2))
+    Assert.assertEquals(BasicTypeInfo.FLOAT_TYPE_INFO, tti.getTypeAt(3))
+    Assert.assertEquals(BasicTypeInfo.BOOLEAN_TYPE_INFO, tti.getTypeAt(4))
+    Assert.assertEquals(BasicTypeInfo.STRING_TYPE_INFO, tti.getTypeAt(5))
+    Assert.assertEquals(BasicTypeInfo.CHAR_TYPE_INFO, tti.getTypeAt(6))
+    Assert.assertEquals(BasicTypeInfo.SHORT_TYPE_INFO, tti.getTypeAt(7))
+    Assert.assertEquals(BasicTypeInfo.BYTE_TYPE_INFO, tti.getTypeAt(8))
+  }
+
+  @Test
+  def testTupleWithTuples(): Unit = {
+    val ti = createTypeInformation[(Tuple1[String], Tuple1[Int], Tuple2[Long, Long])]
+
+    Assert.assertTrue(ti.isTupleType())
+    Assert.assertEquals(3, ti.getArity)
+    Assert.assertTrue(ti.isInstanceOf[TupleTypeInfoBase[_]])
+    val tti = ti.asInstanceOf[TupleTypeInfoBase[_]]
+    Assert.assertEquals(classOf[Tuple3[_, _, _]], tti.getTypeClass)
+    Assert.assertTrue(tti.getTypeAt(0).isTupleType())
+    Assert.assertTrue(tti.getTypeAt(1).isTupleType())
+    Assert.assertTrue(tti.getTypeAt(2).isTupleType())
+    Assert.assertEquals(classOf[Tuple1[_]], tti.getTypeAt(0).getTypeClass)
+    Assert.assertEquals(classOf[Tuple1[_]], tti.getTypeAt(1).getTypeClass)
+    Assert.assertEquals(classOf[Tuple2[_, _]], tti.getTypeAt(2).getTypeClass)
+    Assert.assertEquals(1, tti.getTypeAt(0).getArity)
+    Assert.assertEquals(1, tti.getTypeAt(1).getArity)
+    Assert.assertEquals(2, tti.getTypeAt(2).getArity)
+    Assert.assertEquals(BasicTypeInfo.STRING_TYPE_INFO,
+      tti.getTypeAt(0).asInstanceOf[TupleTypeInfoBase[_]].getTypeAt(0))
+    Assert.assertEquals(BasicTypeInfo.INT_TYPE_INFO,
+      tti.getTypeAt(1).asInstanceOf[TupleTypeInfoBase[_]].getTypeAt(0))
+    Assert.assertEquals(BasicTypeInfo.LONG_TYPE_INFO,
+      tti.getTypeAt(2).asInstanceOf[TupleTypeInfoBase[_]].getTypeAt(0))
+    Assert.assertEquals(BasicTypeInfo.LONG_TYPE_INFO,
+      tti.getTypeAt(2).asInstanceOf[TupleTypeInfoBase[_]].getTypeAt(1))
+  }
+
+  @Test
+  def testCaseClass(): Unit = {
+    val ti = createTypeInformation[CustomCaseClass]
+
+    Assert.assertTrue(ti.isTupleType)
+    Assert.assertEquals(2, ti.getArity)
+    Assert.assertEquals(
+      BasicTypeInfo.STRING_TYPE_INFO,
+      ti.asInstanceOf[TupleTypeInfoBase[_]].getTypeAt(0))
+    Assert.assertEquals(
+      BasicTypeInfo.INT_TYPE_INFO,
+      ti.asInstanceOf[TupleTypeInfoBase[_]].getTypeAt(1))
+    Assert.assertEquals(
+      classOf[CustomCaseClass],ti.asInstanceOf[TupleTypeInfoBase[_]].getTypeClass())
+  }
+
+  @Test
+  def testCustomType(): Unit = {
+    val ti = createTypeInformation[CustomType]
+    // A class with var fields and a no-arg constructor is expected to be analyzed as a POJO.
+    Assert.assertFalse(ti.isBasicType)
+    Assert.assertFalse(ti.isTupleType)
+    Assert.assertTrue(ti.isInstanceOf[PojoTypeInfo[_]])
+    Assert.assertEquals(classOf[CustomType], ti.getTypeClass)
+  }
+
+  @Test
+  def testTupleWithCustomType(): Unit = {
+    val ti = createTypeInformation[(Long, CustomType)]
+
+    Assert.assertTrue(ti.isTupleType)
+    Assert.assertEquals(2, ti.getArity)
+    val tti = ti.asInstanceOf[TupleTypeInfoBase[_]]
+    Assert.assertEquals(classOf[Tuple2[_, _]], tti.getTypeClass)
+    Assert.assertEquals(classOf[java.lang.Long], tti.getTypeAt(0).getTypeClass)
+    Assert.assertTrue(tti.getTypeAt(1).isInstanceOf[PojoTypeInfo[_]])
+    Assert.assertEquals(classOf[CustomType], tti.getTypeAt(1).getTypeClass)
+  }
+
+  @Test
+  def testValue(): Unit = {
+    val ti = createTypeInformation[StringValue]
+    // Expect a ValueTypeInfo whose type class agrees with the Java TypeExtractor's result.
+    Assert.assertFalse(ti.isBasicType)
+    Assert.assertFalse(ti.isTupleType)
+    Assert.assertTrue(ti.isInstanceOf[ValueTypeInfo[_]])
+    Assert.assertEquals(classOf[StringValue], ti.getTypeClass)
+    Assert.assertTrue(TypeExtractor.getForClass(classOf[StringValue])
+      .isInstanceOf[ValueTypeInfo[_]])
+    Assert.assertEquals(TypeExtractor.getForClass(classOf[StringValue]).getTypeClass,
+      ti.getTypeClass)
+  }
+
+  @Test
+  def testTupleOfValues(): Unit = {
+    val ti = createTypeInformation[(StringValue, IntValue)]
+    Assert.assertFalse(ti.isBasicType)
+    Assert.assertTrue(ti.isTupleType)
+    Assert.assertEquals(
+      classOf[StringValue],
+      ti.asInstanceOf[TupleTypeInfoBase[_]].getTypeAt(0).getTypeClass)
+    Assert.assertEquals(
+      classOf[IntValue],
+      ti.asInstanceOf[TupleTypeInfoBase[_]].getTypeAt(1).getTypeClass)
+  }
+
+
+  @Test
+  def testBasicArray(): Unit = {
+    val ti = createTypeInformation[Array[String]]
+
+    Assert.assertFalse(ti.isBasicType)
+    Assert.assertFalse(ti.isTupleType)
+    Assert.assertTrue(ti.isInstanceOf[BasicArrayTypeInfo[_, _]] ||
+      ti.isInstanceOf[ObjectArrayTypeInfo[_, _]])
+    if (ti.isInstanceOf[BasicArrayTypeInfo[_, _]]) {
+      Assert.assertEquals(BasicArrayTypeInfo.STRING_ARRAY_TYPE_INFO, ti)
+    }
+    else {
+      Assert.assertEquals(
+        BasicTypeInfo.STRING_TYPE_INFO,
+        ti.asInstanceOf[ObjectArrayTypeInfo[_, _]].getComponentInfo)
+    }
+  }
+
+  @Test
+  def testPrimitiveArray(): Unit = {
+    val ti = createTypeInformation[Array[Boolean]]
+    // Array[Boolean] is expected to map to the primitive boolean-array type info.
+    Assert.assertTrue(ti.isInstanceOf[PrimitiveArrayTypeInfo[_]])
+    Assert.assertEquals(PrimitiveArrayTypeInfo.BOOLEAN_PRIMITIVE_ARRAY_TYPE_INFO, ti)
+  }
+
+  @Test
+  def testCustomArray(): Unit = {
+    val ti = createTypeInformation[Array[CustomType]]
+    Assert.assertTrue(ti.isInstanceOf[ObjectArrayTypeInfo[_, _]])
+    Assert.assertEquals(
+      classOf[CustomType],
+      ti.asInstanceOf[ObjectArrayTypeInfo[_, _]].getComponentInfo.getTypeClass)
+  }
+
+  @Test
+  def testTupleArray(): Unit = {
+    val ti = createTypeInformation[Array[(String, String)]]
+
+    Assert.assertTrue(ti.isInstanceOf[ObjectArrayTypeInfo[_, _]])
+    val oati = ti.asInstanceOf[ObjectArrayTypeInfo[_, _]]
+    Assert.assertTrue(oati.getComponentInfo.isTupleType)
+    val tti = oati.getComponentInfo.asInstanceOf[TupleTypeInfoBase[_]]
+    Assert.assertEquals(BasicTypeInfo.STRING_TYPE_INFO, tti.getTypeAt(0))
+    Assert.assertEquals(BasicTypeInfo.STRING_TYPE_INFO, tti.getTypeAt(1))
+  }
+  
+  @Test
+  def testMultidimensionalArrays(): Unit = {
+    // tuples: both array levels are object arrays, the innermost component is a tuple type
+    {
+      val ti = createTypeInformation[Array[Array[(String, String)]]]
+
+      Assert.assertTrue(ti.isInstanceOf[ObjectArrayTypeInfo[_, _]])
+      val oati = ti.asInstanceOf[ObjectArrayTypeInfo[_, _]]
+      Assert.assertTrue(oati.getComponentInfo.isInstanceOf[ObjectArrayTypeInfo[_, _]])
+      val oati2 = oati.getComponentInfo.asInstanceOf[ObjectArrayTypeInfo[_, _]]
+      Assert.assertTrue(oati2.getComponentInfo.isTupleType)
+      val tti = oati2.getComponentInfo.asInstanceOf[TupleTypeInfoBase[_]]
+      Assert.assertEquals(BasicTypeInfo.STRING_TYPE_INFO, tti.getTypeAt(0))
+      Assert.assertEquals(BasicTypeInfo.STRING_TYPE_INFO, tti.getTypeAt(1))
+    }
+
+    // primitives: the outer level is an object array whose component is the primitive int array
+    {
+      val ti = createTypeInformation[Array[Array[Int]]]
+
+      Assert.assertTrue(ti.isInstanceOf[ObjectArrayTypeInfo[_, _]])
+      val oati = ti.asInstanceOf[ObjectArrayTypeInfo[_, _]]
+      Assert.assertEquals(PrimitiveArrayTypeInfo.INT_PRIMITIVE_ARRAY_TYPE_INFO,
+        oati.getComponentInfo)
+    }
+
+    // basic types: boxed Integer arrays use the basic (non-primitive) array type info
+    {
+      val ti = createTypeInformation[Array[Array[Integer]]]
+
+      Assert.assertTrue(ti.isInstanceOf[ObjectArrayTypeInfo[_, _]])
+      val oati = ti.asInstanceOf[ObjectArrayTypeInfo[_, _]]
+      Assert.assertEquals(BasicArrayTypeInfo.INT_ARRAY_TYPE_INFO, oati.getComponentInfo)
+    }
+
+    // pojo: both array levels are object arrays, the innermost component is the POJO type
+    {
+      val ti = createTypeInformation[Array[Array[CustomType]]]
+
+      Assert.assertTrue(ti.isInstanceOf[ObjectArrayTypeInfo[_, _]])
+      val oati = ti.asInstanceOf[ObjectArrayTypeInfo[_, _]]
+      Assert.assertTrue(oati.getComponentInfo.isInstanceOf[ObjectArrayTypeInfo[_, _]])
+      val oati2 = oati.getComponentInfo.asInstanceOf[ObjectArrayTypeInfo[_, _]]
+      val tti = oati2.getComponentInfo.asInstanceOf[PojoTypeInfo[_]]
+      Assert.assertEquals(classOf[CustomType], tti.getTypeClass())
+    }
+  }
+
+  @Test
+  def testParamertizedCustomObject(): Unit = {
+    val ti = createTypeInformation[MyObject[String]]
+
+    Assert.assertTrue(ti.isInstanceOf[PojoTypeInfo[_]])
+  }
+
+  @Test
+  def testTupleWithPrimitiveArray(): Unit = {
+    val ti = createTypeInformation[(Array[Int], Array[Double], Array[Long],
+      Array[Byte], Array[Char], Array[Float], Array[Short], Array[Boolean],
+      Array[String])]
+
+    val tti = ti.asInstanceOf[TupleTypeInfoBase[_]]
+    Assert.assertEquals(PrimitiveArrayTypeInfo.INT_PRIMITIVE_ARRAY_TYPE_INFO, tti.getTypeAt(0))
+    Assert.assertEquals(PrimitiveArrayTypeInfo.DOUBLE_PRIMITIVE_ARRAY_TYPE_INFO, tti.getTypeAt(1))
+    Assert.assertEquals(PrimitiveArrayTypeInfo.LONG_PRIMITIVE_ARRAY_TYPE_INFO, tti.getTypeAt(2))
+    Assert.assertEquals(PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO, tti.getTypeAt(3))
+    Assert.assertEquals(PrimitiveArrayTypeInfo.CHAR_PRIMITIVE_ARRAY_TYPE_INFO, tti.getTypeAt(4))
+    Assert.assertEquals(PrimitiveArrayTypeInfo.FLOAT_PRIMITIVE_ARRAY_TYPE_INFO, tti.getTypeAt(5))
+    Assert.assertEquals(PrimitiveArrayTypeInfo.SHORT_PRIMITIVE_ARRAY_TYPE_INFO, tti.getTypeAt(6))
+    Assert.assertEquals(PrimitiveArrayTypeInfo.BOOLEAN_PRIMITIVE_ARRAY_TYPE_INFO, tti.getTypeAt(7))
+    Assert.assertEquals(BasicArrayTypeInfo.STRING_ARRAY_TYPE_INFO, tti.getTypeAt(8))
+  }
+
+  @Test
+  def testTrait(): Unit = {
+    trait TestTrait {
+      def foo() = 1
+      def bar(x: Int): Int
+    }
+
+    val ti = createTypeInformation[TestTrait]
+
+    Assert.assertTrue(ti.isInstanceOf[GenericTypeInfo[TestTrait]])
+  }
+
+  @Test
+  def testGetFlatFields(): Unit = {
+
+    val tupleTypeInfo = createTypeInformation[(Int, Int, Int, Int)].
+      asInstanceOf[CaseClassTypeInfo[(Int, Int, Int, Int)]]
+    Assert.assertEquals(0, tupleTypeInfo.getFlatFields("0").get(0).getPosition)
+    Assert.assertEquals(1, tupleTypeInfo.getFlatFields("1").get(0).getPosition)
+    Assert.assertEquals(2, tupleTypeInfo.getFlatFields("2").get(0).getPosition)
+    Assert.assertEquals(3, tupleTypeInfo.getFlatFields("3").get(0).getPosition)
+    Assert.assertEquals(0, tupleTypeInfo.getFlatFields("_1").get(0).getPosition)
+    Assert.assertEquals(1, tupleTypeInfo.getFlatFields("_2").get(0).getPosition)
+    Assert.assertEquals(2, tupleTypeInfo.getFlatFields("_3").get(0).getPosition)
+    Assert.assertEquals(3, tupleTypeInfo.getFlatFields("_4").get(0).getPosition)
+
+    val nestedTypeInfo = createTypeInformation[(Int, (Int, String, Long), Int, (Double, Double))].
+      asInstanceOf[CaseClassTypeInfo[(Int, (Int, String, Long), Int, (Double, Double))]]
+    Assert.assertEquals(0, nestedTypeInfo.getFlatFields("0").get(0).getPosition)
+    Assert.assertEquals(1, nestedTypeInfo.getFlatFields("1.0").get(0).getPosition)
+    Assert.assertEquals(2, nestedTypeInfo.getFlatFields("1.1").get(0).getPosition)
+    Assert.assertEquals(3, nestedTypeInfo.getFlatFields("1.2").get(0).getPosition)
+    Assert.assertEquals(4, nestedTypeInfo.getFlatFields("2").get(0).getPosition)
+    Assert.assertEquals(5, nestedTypeInfo.getFlatFields("3.0").get(0).getPosition)
+    Assert.assertEquals(6, nestedTypeInfo.getFlatFields("3.1").get(0).getPosition)
+    Assert.assertEquals(4, nestedTypeInfo.getFlatFields("_3").get(0).getPosition)
+    Assert.assertEquals(5, nestedTypeInfo.getFlatFields("_4._1").get(0).getPosition)
+    Assert.assertEquals(3, nestedTypeInfo.getFlatFields("1").size)
+    Assert.assertEquals(1, nestedTypeInfo.getFlatFields("1").get(0).getPosition)
+    Assert.assertEquals(2, nestedTypeInfo.getFlatFields("1").get(1).getPosition)
+    Assert.assertEquals(3, nestedTypeInfo.getFlatFields("1").get(2).getPosition)
+    Assert.assertEquals(3, nestedTypeInfo.getFlatFields("1.*").size)
+    Assert.assertEquals(1, nestedTypeInfo.getFlatFields("1.*").get(0).getPosition)
+    Assert.assertEquals(2, nestedTypeInfo.getFlatFields("1.*").get(1).getPosition)
+    Assert.assertEquals(3, nestedTypeInfo.getFlatFields("1.*").get(2).getPosition)
+    Assert.assertEquals(2, nestedTypeInfo.getFlatFields("3").size)
+    Assert.assertEquals(5, nestedTypeInfo.getFlatFields("3").get(0).getPosition)
+    Assert.assertEquals(6, nestedTypeInfo.getFlatFields("3").get(1).getPosition)
+    Assert.assertEquals(3, nestedTypeInfo.getFlatFields("_2").size)
+    Assert.assertEquals(1, nestedTypeInfo.getFlatFields("_2").get(0).getPosition)
+    Assert.assertEquals(2, nestedTypeInfo.getFlatFields("_2").get(1).getPosition)
+    Assert.assertEquals(3, nestedTypeInfo.getFlatFields("_2").get(2).getPosition)
+    Assert.assertEquals(2, nestedTypeInfo.getFlatFields("_4").size)
+    Assert.assertEquals(5, nestedTypeInfo.getFlatFields("_4").get(0).getPosition)
+    Assert.assertEquals(6, nestedTypeInfo.getFlatFields("_4").get(1).getPosition)
+    Assert.assertEquals(BasicTypeInfo.INT_TYPE_INFO,
+      nestedTypeInfo.getFlatFields("0").get(0).getType)
+    Assert.assertEquals(BasicTypeInfo.STRING_TYPE_INFO,
+      nestedTypeInfo.getFlatFields("1.1").get(0).getType)
+    Assert.assertEquals(BasicTypeInfo.LONG_TYPE_INFO,
+      nestedTypeInfo.getFlatFields("1").get(2).getType)
+    Assert.assertEquals(BasicTypeInfo.DOUBLE_TYPE_INFO,
+      nestedTypeInfo.getFlatFields("3").get(1).getType)
+
+    val deepNestedTupleTypeInfo = createTypeInformation[(Int, (Int, (Int, Int)), Int)].
+      asInstanceOf[CaseClassTypeInfo[(Int, (Int, (Int, Int)), Int)]]
+    Assert.assertEquals(3, deepNestedTupleTypeInfo.getFlatFields("1").size)
+    Assert.assertEquals(1, deepNestedTupleTypeInfo.getFlatFields("1").get(0).getPosition)
+    Assert.assertEquals(2, deepNestedTupleTypeInfo.getFlatFields("1").get(1).getPosition)
+    Assert.assertEquals(3, deepNestedTupleTypeInfo.getFlatFields("1").get(2).getPosition)
+    Assert.assertEquals(5, deepNestedTupleTypeInfo.getFlatFields("*").size)
+    Assert.assertEquals(0, deepNestedTupleTypeInfo.getFlatFields("*").get(0).getPosition)
+    Assert.assertEquals(1, deepNestedTupleTypeInfo.getFlatFields("*").get(1).getPosition)
+    Assert.assertEquals(2, deepNestedTupleTypeInfo.getFlatFields("*").get(2).getPosition)
+    Assert.assertEquals(3, deepNestedTupleTypeInfo.getFlatFields("*").get(3).getPosition)
+    Assert.assertEquals(4, deepNestedTupleTypeInfo.getFlatFields("*").get(4).getPosition)
+
+    val caseClassTypeInfo = createTypeInformation[CustomCaseClass].
+      asInstanceOf[CaseClassTypeInfo[CustomCaseClass]]
+    Assert.assertEquals(0, caseClassTypeInfo.getFlatFields("a").get(0).getPosition)
+    Assert.assertEquals(1, caseClassTypeInfo.getFlatFields("b").get(0).getPosition)
+    Assert.assertEquals(2, caseClassTypeInfo.getFlatFields("*").size)
+    Assert.assertEquals(0, caseClassTypeInfo.getFlatFields("*").get(0).getPosition)
+    Assert.assertEquals(1, caseClassTypeInfo.getFlatFields("*").get(1).getPosition)
+
+    val caseClassInTupleTypeInfo = createTypeInformation[(Int, UmlautCaseClass)].
+      asInstanceOf[CaseClassTypeInfo[(Int, UmlautCaseClass)]]
+    Assert.assertEquals(1, caseClassInTupleTypeInfo.getFlatFields("_2.ä").get(0).getPosition)
+    Assert.assertEquals(2, caseClassInTupleTypeInfo.getFlatFields("1.ß").get(0).getPosition)
+    Assert.assertEquals(2, caseClassInTupleTypeInfo.getFlatFields("1").size)
+    Assert.assertEquals(1, caseClassInTupleTypeInfo.getFlatFields("1.*").get(0).getPosition)
+    Assert.assertEquals(2, caseClassInTupleTypeInfo.getFlatFields("1").get(1).getPosition)
+    Assert.assertEquals(2, caseClassInTupleTypeInfo.getFlatFields("_2.*").size)
+    Assert.assertEquals(1, caseClassInTupleTypeInfo.getFlatFields("_2.*").get(0).getPosition)
+    Assert.assertEquals(2, caseClassInTupleTypeInfo.getFlatFields("_2").get(1).getPosition)
+    Assert.assertEquals(3, caseClassInTupleTypeInfo.getFlatFields("*").size)
+    Assert.assertEquals(0, caseClassInTupleTypeInfo.getFlatFields("*").get(0).getPosition)
+    Assert.assertEquals(1, caseClassInTupleTypeInfo.getFlatFields("*").get(1).getPosition)
+    Assert.assertEquals(2, caseClassInTupleTypeInfo.getFlatFields("*").get(2).getPosition)
+
+  }
+
+  @Test
+  def testFieldAtStringRef(): Unit = {
+
+    val tupleTypeInfo = createTypeInformation[(Int, Int, Int, Int)].
+      asInstanceOf[CaseClassTypeInfo[(Int, Int, Int, Int)]]
+    Assert.assertEquals(BasicTypeInfo.INT_TYPE_INFO, tupleTypeInfo.getTypeAt("0"))
+    Assert.assertEquals(BasicTypeInfo.INT_TYPE_INFO, tupleTypeInfo.getTypeAt("2"))
+    Assert.assertEquals(BasicTypeInfo.INT_TYPE_INFO, tupleTypeInfo.getTypeAt("_2"))
+    Assert.assertEquals(BasicTypeInfo.INT_TYPE_INFO, tupleTypeInfo.getTypeAt("_4"))
+
+    val nestedTypeInfo = createTypeInformation[(Int, (Int, String, Long), Int, (Double, Double))].
+      asInstanceOf[CaseClassTypeInfo[(Int, (Int, String, Long), Int, (Double, Double))]]
+    Assert.assertEquals(BasicTypeInfo.INT_TYPE_INFO, nestedTypeInfo.getTypeAt("0"))
+    Assert.assertEquals(BasicTypeInfo.INT_TYPE_INFO, nestedTypeInfo.getTypeAt("1.0"))
+    Assert.assertEquals(BasicTypeInfo.STRING_TYPE_INFO, nestedTypeInfo.getTypeAt("1.1"))
+    Assert.assertEquals(BasicTypeInfo.LONG_TYPE_INFO, nestedTypeInfo.getTypeAt("1.2"))
+    Assert.assertEquals(BasicTypeInfo.INT_TYPE_INFO, nestedTypeInfo.getTypeAt("2"))
+    Assert.assertEquals(BasicTypeInfo.DOUBLE_TYPE_INFO, nestedTypeInfo.getTypeAt("3.0"))
+    Assert.assertEquals(BasicTypeInfo.DOUBLE_TYPE_INFO, nestedTypeInfo.getTypeAt("3.1"))
+    Assert.assertEquals(BasicTypeInfo.INT_TYPE_INFO, nestedTypeInfo.getTypeAt("_3"))
+    Assert.assertEquals(BasicTypeInfo.DOUBLE_TYPE_INFO, nestedTypeInfo.getTypeAt("_4._1"))
+    Assert.assertEquals(createTypeInformation[(Int, String, Long)], nestedTypeInfo.getTypeAt("1"))
+    Assert.assertEquals(createTypeInformation[(Double, Double)], nestedTypeInfo.getTypeAt("3"))
+    Assert.assertEquals(createTypeInformation[(Int, String, Long)], nestedTypeInfo.getTypeAt("_2"))
+    Assert.assertEquals(createTypeInformation[(Double, Double)], nestedTypeInfo.getTypeAt("_4"))
+
+    val deepNestedTupleTypeInfo = createTypeInformation[(Int, (Int, (Int, Int)), Int)].
+      asInstanceOf[CaseClassTypeInfo[(Int, (Int, (Int, Int)), Int)]]
+    Assert.assertEquals(createTypeInformation[(Int, (Int, Int))],
+      deepNestedTupleTypeInfo.getTypeAt("1"))
+
+    val umlautCaseClassTypeInfo = createTypeInformation[UmlautCaseClass].
+      asInstanceOf[CaseClassTypeInfo[UmlautCaseClass]]
+    Assert.assertEquals(BasicTypeInfo.STRING_TYPE_INFO, umlautCaseClassTypeInfo.getTypeAt("ä"))
+    Assert.assertEquals(BasicTypeInfo.INT_TYPE_INFO, umlautCaseClassTypeInfo.getTypeAt("ß"))
+
+    val caseClassTypeInfo = createTypeInformation[CustomCaseClass].
+      asInstanceOf[CaseClassTypeInfo[CustomCaseClass]]
+    val caseClassInTupleTypeInfo = createTypeInformation[(Int, CustomCaseClass)].
+      asInstanceOf[CaseClassTypeInfo[(Int, CustomCaseClass)]]
+    Assert.assertEquals(BasicTypeInfo.STRING_TYPE_INFO, caseClassInTupleTypeInfo.getTypeAt("_2.a"))
+    Assert.assertEquals(BasicTypeInfo.INT_TYPE_INFO, caseClassInTupleTypeInfo.getTypeAt("1.b"))
+    Assert.assertEquals(caseClassTypeInfo, caseClassInTupleTypeInfo.getTypeAt("1"))
+    Assert.assertEquals(caseClassTypeInfo, caseClassInTupleTypeInfo.getTypeAt("_2"))
+
+  }
+
+  /**
+   * Tests the "implicit val scalaNothingTypeInfo" in
+   * flink-scala/src/main/scala/org/apache/flink/api/scala/package.scala
+   * This does not compile without that line.
+   */
+  @Test
+  def testNothingTypeInfoIsAvailableImplicitly() : Unit = {
+    def g() = {
+
+      def f[O: TypeInformation](x: O): Unit = {}
+
+      f(???) // O will be Nothing
+    }
+    // (Do not call g, because it throws NotImplementedError. This is a compile time test.)
+  }
+
+  @Test
+  def testUnit(): Unit = {
+    val ti = createTypeInformation[Unit]
+    Assert.assertTrue(ti.isInstanceOf[UnitTypeInfo])
+
+    // This checks the condition in checkCollection. If this fails with IllegalArgumentException,
+    // then things like "env.fromElements((),(),())" won't work.
+    import scala.collection.JavaConverters._ // explicit .asJava, no implicit conversions
+    CollectionInputFormat.checkCollection(Seq((),(),()).asJava, (new UnitTypeInfo).getTypeClass())
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/flink/blob/6a935978/flink-tests/src/test/resources/flink_11-kryo_registrations
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/resources/flink_11-kryo_registrations b/flink-tests/src/test/resources/flink_11-kryo_registrations
deleted file mode 100644
index 7000e62..0000000
--- a/flink-tests/src/test/resources/flink_11-kryo_registrations
+++ /dev/null
@@ -1,86 +0,0 @@
-0,int
-1,java.lang.String
-2,float
-3,boolean
-4,byte
-5,char
-6,short
-7,long
-8,double
-9,void
-10,scala.collection.convert.Wrappers$SeqWrapper
-11,scala.collection.convert.Wrappers$IteratorWrapper
-12,scala.collection.convert.Wrappers$MapWrapper
-13,scala.collection.convert.Wrappers$JListWrapper
-14,scala.collection.convert.Wrappers$JMapWrapper
-15,scala.Some
-16,scala.util.Left
-17,scala.util.Right
-18,scala.collection.immutable.Vector
-19,scala.collection.immutable.Set$Set1
-20,scala.collection.immutable.Set$Set2
-21,scala.collection.immutable.Set$Set3
-22,scala.collection.immutable.Set$Set4
-23,scala.collection.immutable.HashSet$HashTrieSet
-24,scala.collection.immutable.Map$Map1
-25,scala.collection.immutable.Map$Map2
-26,scala.collection.immutable.Map$Map3
-27,scala.collection.immutable.Map$Map4
-28,scala.collection.immutable.HashMap$HashTrieMap
-29,scala.collection.immutable.Range$Inclusive
-30,scala.collection.immutable.NumericRange$Inclusive
-31,scala.collection.immutable.NumericRange$Exclusive
-32,scala.collection.mutable.BitSet
-33,scala.collection.mutable.HashMap
-34,scala.collection.mutable.HashSet
-35,scala.collection.convert.Wrappers$IterableWrapper
-36,scala.Tuple1
-37,scala.Tuple2
-38,scala.Tuple3
-39,scala.Tuple4
-40,scala.Tuple5
-41,scala.Tuple6
-42,scala.Tuple7
-43,scala.Tuple8
-44,scala.Tuple9
-45,scala.Tuple10
-46,scala.Tuple11
-47,scala.Tuple12
-48,scala.Tuple13
-49,scala.Tuple14
-50,scala.Tuple15
-51,scala.Tuple16
-52,scala.Tuple17
-53,scala.Tuple18
-54,scala.Tuple19
-55,scala.Tuple20
-56,scala.Tuple21
-57,scala.Tuple22
-58,scala.Tuple1$mcJ$sp
-59,scala.Tuple1$mcI$sp
-60,scala.Tuple1$mcD$sp
-61,scala.Tuple2$mcJJ$sp
-62,scala.Tuple2$mcJI$sp
-63,scala.Tuple2$mcJD$sp
-64,scala.Tuple2$mcIJ$sp
-65,scala.Tuple2$mcII$sp
-66,scala.Tuple2$mcID$sp
-67,scala.Tuple2$mcDJ$sp
-68,scala.Tuple2$mcDI$sp
-69,scala.Tuple2$mcDD$sp
-70,scala.Symbol
-71,scala.reflect.ClassTag
-72,scala.runtime.BoxedUnit
-73,java.util.Arrays$ArrayList
-74,java.util.BitSet
-75,java.util.PriorityQueue
-76,java.util.regex.Pattern
-77,java.sql.Date
-78,java.sql.Time
-79,java.sql.Timestamp
-80,java.net.URI
-81,java.net.InetSocketAddress
-82,java.util.UUID
-83,java.util.Locale
-84,java.text.SimpleDateFormat
-85,org.apache.avro.generic.GenericData$Array

http://git-wip-us.apache.org/repos/asf/flink/blob/6a935978/flink-tests/src/test/scala/org/apache/flink/api/scala/DeltaIterationSanityCheckTest.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/DeltaIterationSanityCheckTest.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/DeltaIterationSanityCheckTest.scala
deleted file mode 100644
index 2775d09..0000000
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/DeltaIterationSanityCheckTest.scala
+++ /dev/null
@@ -1,169 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.scala
-
-import org.apache.flink.api.java.io.DiscardingOutputFormat
-import org.junit.Test
-import org.apache.flink.api.common.InvalidProgramException
-
-/**
- * Checks the sanity validation of delta iterations.
- *
- * Every test only assembles a dummy program that is never executed. The tests verify
- * that a join/coGroup with the solution set inside the iteration is accepted when it
- * uses the solution set key, and rejected with an `InvalidProgramException` otherwise.
- */
-class DeltaIterationSanityCheckTest extends Serializable {
-
-  /** Creates the solution set input and the workset input on one environment. */
-  private def inputs() = {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    (env.fromElements((1, "1")), env.fromElements((2, "2")))
-  }
-
-  @Test
-  def testCorrectJoinWithSolution1(): Unit = {
-    val (solution, workset) = inputs()
-
-    val iteration = solution.iterateDelta(workset, 10, Array("_1")) { (s, ws) =>
-      val joined = s.join(ws).where("_1").equalTo("_1") { (l, r) => l }
-      (joined, ws)
-    }
-
-    iteration.output(new DiscardingOutputFormat[(Int, String)])
-  }
-
-  @Test
-  def testCorrectJoinWithSolution2(): Unit = {
-    val (solution, workset) = inputs()
-
-    val iteration = solution.iterateDelta(workset, 10, Array("_1")) { (s, ws) =>
-      val joined = ws.join(s).where("_1").equalTo("_1") { (l, r) => l }
-      (joined, ws)
-    }
-
-    iteration.output(new DiscardingOutputFormat[(Int, String)])
-  }
-
-  @Test(expected = classOf[InvalidProgramException])
-  def testIncorrectJoinWithSolution1(): Unit = {
-    val (solution, workset) = inputs()
-
-    // Joining on "_2" does not match the solution set key "_1".
-    val iteration = solution.iterateDelta(workset, 10, Array("_1")) { (s, ws) =>
-      val joined = s.join(ws).where("_2").equalTo("_2") { (l, r) => l }
-      (joined, ws)
-    }
-
-    iteration.output(new DiscardingOutputFormat[(Int, String)])
-  }
-
-  @Test(expected = classOf[InvalidProgramException])
-  def testIncorrectJoinWithSolution2(): Unit = {
-    val (solution, workset) = inputs()
-
-    // Joining on "_2" does not match the solution set key "_1".
-    val iteration = solution.iterateDelta(workset, 10, Array("_1")) { (s, ws) =>
-      val joined = ws.join(s).where("_2").equalTo("_2") { (l, r) => l }
-      (joined, ws)
-    }
-
-    iteration.output(new DiscardingOutputFormat[(Int, String)])
-  }
-
-  @Test(expected = classOf[InvalidProgramException])
-  def testIncorrectJoinWithSolution3(): Unit = {
-    val (solution, workset) = inputs()
-
-    // Joining on "_1" does not match the solution set key "_2".
-    val iteration = solution.iterateDelta(workset, 10, Array("_2")) { (s, ws) =>
-      val joined = ws.join(s).where("_1").equalTo("_1") { (l, r) => l }
-      (joined, ws)
-    }
-
-    iteration.output(new DiscardingOutputFormat[(Int, String)])
-  }
-
-  @Test
-  def testCorrectCoGroupWithSolution1(): Unit = {
-    val (solution, workset) = inputs()
-
-    val iteration = solution.iterateDelta(workset, 10, Array("_1")) { (s, ws) =>
-      val grouped = s.coGroup(ws).where("_1").equalTo("_1") { (l, r) => l.min }
-      (grouped, ws)
-    }
-
-    iteration.output(new DiscardingOutputFormat[(Int, String)])
-  }
-
-  @Test
-  def testCorrectCoGroupWithSolution2(): Unit = {
-    val (solution, workset) = inputs()
-
-    val iteration = solution.iterateDelta(workset, 10, Array("_1")) { (s, ws) =>
-      val grouped = ws.coGroup(s).where("_1").equalTo("_1") { (l, r) => l.min }
-      (grouped, ws)
-    }
-
-    iteration.output(new DiscardingOutputFormat[(Int, String)])
-  }
-
-  @Test(expected = classOf[InvalidProgramException])
-  def testIncorrectCoGroupWithSolution1(): Unit = {
-    val (solution, workset) = inputs()
-
-    // Grouping on "_2" does not match the solution set key "_1".
-    val iteration = solution.iterateDelta(workset, 10, Array("_1")) { (s, ws) =>
-      val grouped = s.coGroup(ws).where("_2").equalTo("_2") { (l, r) => l.min }
-      (grouped, ws)
-    }
-
-    iteration.output(new DiscardingOutputFormat[(Int, String)])
-  }
-
-  @Test(expected = classOf[InvalidProgramException])
-  def testIncorrectCoGroupWithSolution2(): Unit = {
-    val (solution, workset) = inputs()
-
-    // Grouping on "_2" does not match the solution set key "_1".
-    val iteration = solution.iterateDelta(workset, 10, Array("_1")) { (s, ws) =>
-      val grouped = ws.coGroup(s).where("_2").equalTo("_2") { (l, r) => l.min }
-      (grouped, ws)
-    }
-
-    iteration.output(new DiscardingOutputFormat[(Int, String)])
-  }
-
-  @Test(expected = classOf[InvalidProgramException])
-  def testIncorrectCoGroupWithSolution3(): Unit = {
-    val (solution, workset) = inputs()
-
-    // Grouping on "_1" does not match the solution set key "_2".
-    val iteration = solution.iterateDelta(workset, 10, Array("_2")) { (s, ws) =>
-      val grouped = ws.coGroup(s).where("_1").equalTo("_1") { (l, r) => l.min }
-      (grouped, ws)
-    }
-
-    iteration.output(new DiscardingOutputFormat[(Int, String)])
-  }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/6a935978/flink-tests/src/test/scala/org/apache/flink/api/scala/functions/SemanticPropertiesTranslationTest.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/functions/SemanticPropertiesTranslationTest.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/functions/SemanticPropertiesTranslationTest.scala
deleted file mode 100644
index 16c826f..0000000
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/functions/SemanticPropertiesTranslationTest.scala
+++ /dev/null
@@ -1,282 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.scala.functions
-
-import org.apache.flink.api.java.io.DiscardingOutputFormat
-import org.junit.Assert._
-import org.apache.flink.api.common.functions.RichJoinFunction
-import org.apache.flink.api.common.functions.RichMapFunction
-import org.apache.flink.api.common.operators.{GenericDataSinkBase, SingleInputSemanticProperties}
-import org.apache.flink.api.common.operators.base.{InnerJoinOperatorBase, MapOperatorBase}
-import org.apache.flink.api.common.operators.util.FieldSet
-import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields
-import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFieldsFirst
-import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFieldsSecond
-import org.junit.Test
-
-import org.apache.flink.api.scala._
-
-/**
- * Minimal test verifying that semantic annotations are evaluated against the type
- * information and translated correctly to the common data flow API.
- *
- * Only the forwarded-fields annotations (`ForwardedFields`, `ForwardedFieldsFirst`,
- * `ForwardedFieldsSecond`) are covered currently.
- *
- * Unexpected exceptions are left to propagate, so that JUnit reports them together
- * with their original type and stack trace.
- */
-class SemanticPropertiesTranslationTest {
-
-  /**
-   * Creates the program plan of `env` and returns the semantic properties of the map
-   * operator that feeds the first data sink of that plan.
-   */
-  private def mapSemantics(env: ExecutionEnvironment): SingleInputSemanticProperties = {
-    val sink: GenericDataSinkBase[_] = env.createProgramPlan().getDataSinks.iterator.next
-    sink.getInput.asInstanceOf[MapOperatorBase[_, _, _]].getSemanticProperties
-  }
-
-  /**
-   * Creates the program plan of `env` and returns the semantic properties of the inner
-   * join operator that feeds the first data sink of that plan.
-   */
-  private def joinSemantics(env: ExecutionEnvironment) = {
-    val sink: GenericDataSinkBase[_] = env.createProgramPlan().getDataSinks.iterator.next
-    sink.getInput.asInstanceOf[InnerJoinOperatorBase[_, _, _, _]].getSemanticProperties
-  }
-
-  /**
-   * A mapper that preserves all fields over a tuple data set.
-   */
-  @Test
-  def translateUnaryFunctionAnnotationTuplesWildCard(): Unit = {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-
-    val input = env.fromElements((3L, "test", 42))
-    input.map(new WildcardForwardMapper[(Long, String, Int)])
-      .output(new DiscardingOutputFormat[(Long, String, Int)])
-
-    val semantics = mapSemantics(env)
-    val fw1: FieldSet = semantics.getForwardingTargetFields(0, 0)
-    val fw2: FieldSet = semantics.getForwardingTargetFields(0, 1)
-    val fw3: FieldSet = semantics.getForwardingTargetFields(0, 2)
-
-    assertNotNull(fw1)
-    assertNotNull(fw2)
-    assertNotNull(fw3)
-    assertTrue(fw1.contains(0))
-    assertTrue(fw2.contains(1))
-    assertTrue(fw3.contains(2))
-  }
-
-  /**
-   * A mapper that preserves fields 0, 1, 2 of a tuple data set.
-   */
-  @Test
-  def translateUnaryFunctionAnnotationTuples1(): Unit = {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-
-    val input = env.fromElements((3L, "test", 42))
-    input.map(new IndividualForwardMapper[Long, String, Int])
-      .output(new DiscardingOutputFormat[(Long, String, Int)])
-
-    val semantics = mapSemantics(env)
-    val fw1: FieldSet = semantics.getForwardingTargetFields(0, 0)
-    val fw2: FieldSet = semantics.getForwardingTargetFields(0, 1)
-    val fw3: FieldSet = semantics.getForwardingTargetFields(0, 2)
-
-    assertNotNull(fw1)
-    assertNotNull(fw2)
-    assertNotNull(fw3)
-    assertTrue(fw1.contains(0))
-    assertTrue(fw2.contains(1))
-    assertTrue(fw3.contains(2))
-  }
-
-  /**
-   * A mapper that preserves field 1 of a tuple data set.
-   */
-  @Test
-  def translateUnaryFunctionAnnotationTuples2(): Unit = {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-
-    val input = env.fromElements((3L, "test", 42))
-    input.map(new FieldTwoForwardMapper[Long, String, Int])
-      .output(new DiscardingOutputFormat[(Long, String, Int)])
-
-    val semantics = mapSemantics(env)
-    val fw1: FieldSet = semantics.getForwardingTargetFields(0, 0)
-    val fw2: FieldSet = semantics.getForwardingTargetFields(0, 1)
-    val fw3: FieldSet = semantics.getForwardingTargetFields(0, 2)
-
-    assertNotNull(fw1)
-    assertNotNull(fw2)
-    assertNotNull(fw3)
-    // Only field 1 is forwarded; fields 0 and 2 have no forwarding targets.
-    assertEquals(0, fw1.size)
-    assertEquals(0, fw3.size)
-    assertTrue(fw2.contains(1))
-  }
-
-  /**
-   * A join that preserves tuple fields from both sides.
-   */
-  @Test
-  def translateBinaryFunctionAnnotationTuples1(): Unit = {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-
-    val input1 = env.fromElements((3L, "test"))
-    val input2 = env.fromElements((3L, 3.1415))
-
-    input1.join(input2).where(0).equalTo(0)(
-      new ForwardingTupleJoin[Long, String, Long, Double])
-      .output(new DiscardingOutputFormat[(String, Long)])
-
-    val semantics = joinSemantics(env)
-    val fw11: FieldSet = semantics.getForwardingTargetFields(0, 0)
-    val fw12: FieldSet = semantics.getForwardingTargetFields(0, 1)
-    val fw21: FieldSet = semantics.getForwardingTargetFields(1, 0)
-    val fw22: FieldSet = semantics.getForwardingTargetFields(1, 1)
-
-    assertNotNull(fw11)
-    assertNotNull(fw21)
-    assertNotNull(fw12)
-    assertNotNull(fw22)
-    assertEquals(0, fw11.size)
-    assertEquals(0, fw22.size)
-    assertTrue(fw12.contains(0))
-    assertTrue(fw21.contains(1))
-  }
-
-  /**
-   * A join that preserves tuple fields from both sides.
-   */
-  @Test
-  def translateBinaryFunctionAnnotationTuples2(): Unit = {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-
-    val input1 = env.fromElements((3L, "test"))
-    val input2 = env.fromElements((3L, 42))
-
-    input1.join(input2).where(0).equalTo(0)(
-      new ForwardingBasicJoin[(Long, String), (Long, Int)])
-      .output(new DiscardingOutputFormat[((Long, String), (Long, Int))])
-
-    val semantics = joinSemantics(env)
-    val fw11: FieldSet = semantics.getForwardingTargetFields(0, 0)
-    val fw12: FieldSet = semantics.getForwardingTargetFields(0, 1)
-    val fw21: FieldSet = semantics.getForwardingTargetFields(1, 0)
-    val fw22: FieldSet = semantics.getForwardingTargetFields(1, 1)
-
-    assertNotNull(fw11)
-    assertNotNull(fw12)
-    assertNotNull(fw21)
-    assertNotNull(fw22)
-    assertTrue(fw11.contains(0))
-    assertTrue(fw12.contains(1))
-    assertTrue(fw21.contains(2))
-    assertTrue(fw22.contains(3))
-  }
-}
-
-
-/**
- * Identity mapper annotated with the wildcard forwarded-fields expression "*".
- *
- * @tparam T type of the records, which are returned unchanged
- */
-@ForwardedFields(Array("*"))
-class WildcardForwardMapper[T] extends RichMapFunction[T, T] {
-  override def map(value: T): T = value
-}
-
-/**
- * Identity mapper over 3-tuples annotated with the forwarded fields "0;1;2".
- *
- * @tparam X type of the first tuple field
- * @tparam Y type of the second tuple field
- * @tparam Z type of the third tuple field
- */
-@ForwardedFields(Array("0;1;2"))
-class IndividualForwardMapper[X, Y, Z] extends RichMapFunction[(X, Y, Z), (X, Y, Z)] {
-  override def map(value: (X, Y, Z)): (X, Y, Z) = value
-}
-
-/**
- * Identity mapper over 3-tuples annotated with the single forwarded field "_2".
- *
- * @tparam X type of the first tuple field
- * @tparam Y type of the second tuple field
- * @tparam Z type of the third tuple field
- */
-@ForwardedFields(Array("_2"))
-class FieldTwoForwardMapper[X, Y, Z] extends RichMapFunction[(X, Y, Z), (X, Y, Z)] {
-  override def map(value: (X, Y, Z)): (X, Y, Z) = value
-}
-
-/**
- * Join function that emits the second field of the first input together with the
- * first field of the second input, annotated accordingly ("_2 -> _1" for the first
- * input, "_1 -> _2" for the second input).
- *
- * @tparam A type of the first field of the first input
- * @tparam B type of the second field of the first input
- * @tparam C type of the first field of the second input
- * @tparam D type of the second field of the second input
- */
-@ForwardedFieldsFirst(Array("_2 -> _1"))
-@ForwardedFieldsSecond(Array("_1 -> _2"))
-class ForwardingTupleJoin[A, B, C, D] extends RichJoinFunction[(A, B), (C, D), (B, C)] {
-  override def join(first: (A, B), second: (C, D)): (B, C) = {
-    val (_, left) = first
-    val (right, _) = second
-    (left, right)
-  }
-}
-
-/**
- * Join function that pairs both inputs into a tuple, annotated with "* -> 0.*" for
- * the first input and "* -> 1.*" for the second input.
- *
- * @tparam A type of the first input
- * @tparam B type of the second input
- */
-@ForwardedFieldsFirst(Array("* -> 0.*"))
-@ForwardedFieldsSecond(Array("* -> 1.*"))
-class ForwardingBasicJoin[A, B] extends RichJoinFunction[A, B, (A, B)] {
-  override def join(first: A, second: B): (A, B) = first -> second
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/6a935978/flink-tests/src/test/scala/org/apache/flink/api/scala/io/CollectionInputFormatTest.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/io/CollectionInputFormatTest.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/io/CollectionInputFormatTest.scala
deleted file mode 100644
index 92575f5..0000000
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/io/CollectionInputFormatTest.scala
+++ /dev/null
@@ -1,137 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.scala.io
-
-import org.apache.flink.api.common.ExecutionConfig
-import org.apache.flink.api.java.io.CollectionInputFormat
-import org.junit.Assert.assertEquals
-import org.junit.Assert.assertNotNull
-import org.junit.Assert.assertTrue
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo
-import org.apache.flink.core.io.GenericInputSplit
-import org.junit.Test
-import java.io.ByteArrayInputStream
-import java.io.ByteArrayOutputStream
-import java.io.ObjectInputStream
-import java.io.ObjectOutputStream
-import org.apache.flink.api.scala._
-import scala.collection.JavaConverters._
-
-/**
- * Simple element type whose equality is defined solely by its `id`.
- *
- * @param id identifier compared by `equals` and used as the hash code
- */
-class ElementType(val id: Int) {
-
-  /** Creates an element with the id -1. */
-  def this() = this(-1)
-
-  /** Two elements are equal exactly when their ids match; `null` is never equal. */
-  override def equals(obj: Any): Boolean = obj match {
-    case other: ElementType => other.id == id
-    case _ => false
-  }
-
-  /** Kept consistent with `equals`: equal elements produce the same hash code. */
-  override def hashCode(): Int = id
-}
-
-/**
- * Verifies that a `CollectionInputFormat` survives Java serialization and afterwards
- * returns the same records as before.
- */
-class CollectionInputFormatTest {
-
-  /**
-   * Serializes a format over three `ElementType` values and compares the records of the
-   * original and of the deserialized copy one by one.
-   */
-  @Test
-  def testSerializability(): Unit = {
-
-    val inputCollection = Seq(new ElementType(1), new ElementType(2), new ElementType(3))
-    val info = createTypeInformation[ElementType]
-
-    val inputFormat: CollectionInputFormat[ElementType] = {
-      new CollectionInputFormat[ElementType](
-        inputCollection.asJava,
-        info.createSerializer(new ExecutionConfig))
-    }
-
-    val buffer = new ByteArrayOutputStream
-    val out = new ObjectOutputStream(buffer)
-    try {
-      out.writeObject(inputFormat)
-    } finally {
-      out.close()
-    }
-
-    val in = new ObjectInputStream(new ByteArrayInputStream(buffer.toByteArray))
-    val serializationResult: AnyRef =
-      try {
-        in.readObject
-      } finally {
-        in.close()
-      }
-
-    assertNotNull(serializationResult)
-    assertTrue(serializationResult.isInstanceOf[CollectionInputFormat[_]])
-
-    val result = serializationResult.asInstanceOf[CollectionInputFormat[ElementType]]
-    val inputSplit = new GenericInputSplit(0, 1)
-    inputFormat.open(inputSplit)
-    result.open(inputSplit)
-
-    var compared = 0
-    while (!inputFormat.reachedEnd && !result.reachedEnd) {
-      val expectedElement = inputFormat.nextRecord(null)
-      val actualElement = result.nextRecord(null)
-      assertEquals(expectedElement, actualElement)
-      compared += 1
-    }
-
-    // Without these checks the loop would also pass if one side returned too few records.
-    assertEquals(inputCollection.size, compared)
-    assertTrue(inputFormat.reachedEnd)
-    assertTrue(result.reachedEnd)
-  }
-
-  /**
-   * Serializes a format over a list of strings and checks that the deserialized copy
-   * returns every string in the original order.
-   */
-  @Test
-  def testSerializabilityStrings(): Unit = {
-    val data = Seq("To bey or not to be,--that is the question:--",
-      "Whether 'tis nobler in the mind to suffer", "The slings and arrows of outrageous fortune",
-      "Or to take arms against a sea of troubles,", "And by opposing end them?--To die," +
-        "--to sleep,--", "No more; and by a sleep to say we end", "The heartache, " +
-        "and the thousand natural shocks", "That flesh is heir to,--'tis a consummation",
-      "Devoutly to be wish'd. To die,--to sleep;--", "To sleep! perchance to dream:--ay, " +
-        "there's the rub;", "For in that sleep of death what dreams may come,",
-      "When we have shuffled off this mortal coil,", "Must give us pause: there's the respect",
-      "That makes calamity of so long life;", "For who would bear the whips and scorns of time,",
-      "The oppressor's wrong, the proud man's contumely,", "The pangs of despis'd love, " +
-        "the law's delay,", "The insolence of office, and the spurns",
-      "That patient merit of the unworthy takes,", "When he himself might his quietus make",
-      "With a bare bodkin? who would these fardels bear,", "To grunt and sweat under a weary " +
-        "life,", "But that the dread of something after death,--", "The undiscover'd country, " +
-        "from whose bourn", "No traveller returns,--puzzles the will,",
-      "And makes us rather bear those ills we have", "Than fly to others that we know not of?",
-      "Thus conscience does make cowards of us all;", "And thus the native hue of resolution",
-      "Is sicklied o'er with the pale cast of thought;", "And enterprises of great pith and " +
-        "moment,", "With this regard, their currents turn awry,", "And lose the name of action" +
-        ".--Soft you now!", "The fair Ophelia!--Nymph, in thy orisons",
-      "Be all my sins remember'd.")
-
-    val inputFormat = new CollectionInputFormat[String](
-      data.asJava,
-      BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig))
-    val baos = new ByteArrayOutputStream
-    val oos = new ObjectOutputStream(baos)
-    try {
-      oos.writeObject(inputFormat)
-    } finally {
-      oos.close()
-    }
-
-    val ois = new ObjectInputStream(new ByteArrayInputStream(baos.toByteArray))
-    val result: AnyRef =
-      try {
-        ois.readObject
-      } finally {
-        ois.close()
-      }
-
-    assertTrue(result.isInstanceOf[CollectionInputFormat[_]])
-    var i: Int = 0
-    val in = result.asInstanceOf[CollectionInputFormat[String]]
-    in.open(new GenericInputSplit(0, 1))
-
-    while (!in.reachedEnd) {
-      assertEquals(data(i), in.nextRecord(""))
-      i += 1
-    }
-    assertEquals(data.length, i)
-  }
-}
-

http://git-wip-us.apache.org/repos/asf/flink/blob/6a935978/flink-tests/src/test/scala/org/apache/flink/api/scala/io/CsvInputFormatTest.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/io/CsvInputFormatTest.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/io/CsvInputFormatTest.scala
deleted file mode 100644
index 925ee78..0000000
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/io/CsvInputFormatTest.scala
+++ /dev/null
@@ -1,537 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.scala.io
-
-import java.io.{File, FileOutputStream, FileWriter, OutputStreamWriter}
-
-import org.apache.flink.api.java.io.PojoCsvInputFormat
-import org.apache.flink.api.java.io.TupleCsvInputFormat
-import org.apache.flink.api.java.io.CsvInputFormatTest.TwitterPOJO
-import org.apache.flink.api.java.typeutils.PojoTypeInfo
-import org.apache.flink.api.scala._
-import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo
-import org.apache.flink.configuration.Configuration
-import org.apache.flink.core.fs.{FileInputSplit, Path}
-import org.junit.Assert.{assertEquals, assertNotNull, assertNull, assertTrue, fail}
-import org.junit.Test
-
-import scala.collection.mutable.ArrayBuffer
-
-class CsvInputFormatTest {
-
-  // Path handed to the input format constructors. NOTE(review): the tests open explicit
-  // splits afterwards, so this path presumably is never read - confirm.
-  private final val PATH: Path = new Path("an/ignored/file/")
-  // Text fixtures; their use is not visible in this part of the file.
-  private final val FIRST_PART: String = "That is the first part"
-  private final val SECOND_PART: String = "That is the second part"
-
-
-
-  /**
-   * Lines starting with the single-character comment prefix "#" are skipped, so only
-   * the two data rows are returned before the format reports its end.
-   *
-   * Exceptions are left to propagate so that JUnit reports them with their stack trace.
-   */
-  @Test
-  def ignoreSingleCharPrefixComments(): Unit = {
-    val fileContent = "#description of the data\n" +
-      "#successive commented line\n" +
-      "this is|1|2.0|\n" +
-      "a test|3|4.0|\n" +
-      "#next|5|6.0|\n"
-    val split = createTempFile(fileContent)
-    val format = new TupleCsvInputFormat[(String, Integer, Double)](
-      PATH,
-      createTypeInformation[(String, Integer, Double)]
-        .asInstanceOf[CaseClassTypeInfo[(String, Integer, Double)]])
-    format.setDelimiter("\n")
-    format.setFieldDelimiter("|")
-    format.setCommentPrefix("#")
-    format.configure(new Configuration)
-    format.open(split)
-
-    // Each call receives the previously returned record as its reuse object.
-    val first = format.nextRecord(null)
-    assertNotNull(first)
-    assertEquals("this is", first._1)
-    assertEquals(Integer.valueOf(1), first._2)
-    assertEquals(2.0, first._3, 0.0001)
-
-    val second = format.nextRecord(first)
-    assertNotNull(second)
-    assertEquals("a test", second._1)
-    assertEquals(Integer.valueOf(3), second._2)
-    assertEquals(4.0, second._3, 0.0001)
-
-    assertNull(format.nextRecord(second))
-    assertTrue(format.reachedEnd)
-  }
-
-  /**
-   * Lines starting with the multi-character comment prefix "//" are skipped, so only
-   * the two data rows are returned before the format reports its end.
-   *
-   * Exceptions are left to propagate so that JUnit reports them with their stack trace.
-   */
-  @Test
-  def ignoreMultiCharPrefixComments(): Unit = {
-    val fileContent = "//description of the data\n" +
-      "//successive commented line\n" +
-      "this is|1|2.0|\n" +
-      "a test|3|4.0|\n" +
-      "//next|5|6.0|\n"
-    val split = createTempFile(fileContent)
-    val format = new TupleCsvInputFormat[(String, Integer, Double)](
-      PATH,
-      createTypeInformation[(String, Integer, Double)]
-        .asInstanceOf[CaseClassTypeInfo[(String, Integer, Double)]])
-    format.setDelimiter("\n")
-    format.setFieldDelimiter("|")
-    format.setCommentPrefix("//")
-    format.configure(new Configuration)
-    format.open(split)
-
-    // Each call receives the previously returned record as its reuse object.
-    val first = format.nextRecord(null)
-    assertNotNull(first)
-    assertEquals("this is", first._1)
-    assertEquals(Integer.valueOf(1), first._2)
-    assertEquals(2.0, first._3, 0.0001)
-
-    val second = format.nextRecord(first)
-    assertNotNull(second)
-    assertEquals("a test", second._1)
-    assertEquals(Integer.valueOf(3), second._2)
-    assertEquals(4.0, second._3, 0.0001)
-
-    assertNull(format.nextRecord(second))
-    assertTrue(format.reachedEnd)
-  }
-
-  /**
-   * Reads three-column string records separated by "|", including empty fields and a
-   * last line that consists of delimiters only.
-   *
-   * Exceptions are left to propagate so that JUnit reports them with their stack trace.
-   */
-  @Test
-  def readStringFields(): Unit = {
-    val fileContent = "abc|def|ghijk\nabc||hhg\n|||"
-    val split = createTempFile(fileContent)
-    val format = new TupleCsvInputFormat[(String, String, String)](
-      PATH,
-      createTypeInformation[(String, String, String)]
-        .asInstanceOf[CaseClassTypeInfo[(String, String, String)]])
-    format.setDelimiter("\n")
-    format.setFieldDelimiter("|")
-    format.configure(new Configuration)
-    format.open(split)
-
-    // Each call receives the previously returned record as its reuse object.
-    var record: (String, String, String) = null
-    Seq(("abc", "def", "ghijk"), ("abc", "", "hhg"), ("", "", "")).foreach { expected =>
-      record = format.nextRecord(record)
-      assertNotNull(record)
-      assertEquals(expected, record)
-    }
-
-    assertNull(format.nextRecord(record))
-    assertTrue(format.reachedEnd)
-  }
-
-  /**
-   * Reads a mix of quoted and unquoted string fields: with quoted string parsing
-   * enabled for '"', a field delimiter inside quotes belongs to the field value.
-   *
-   * Exceptions are left to propagate so that JUnit reports them with their stack trace.
-   */
-  @Test
-  def readMixedQuotedStringFields(): Unit = {
-    val fileContent = "abc|\"de|f\"|ghijk\n\"a|bc\"||hhg\n|||"
-    val split = createTempFile(fileContent)
-    val format = new TupleCsvInputFormat[(String, String, String)](
-      PATH,
-      createTypeInformation[(String, String, String)]
-        .asInstanceOf[CaseClassTypeInfo[(String, String, String)]])
-    format.setDelimiter("\n")
-    format.enableQuotedStringParsing('"')
-    format.setFieldDelimiter("|")
-    format.configure(new Configuration)
-    format.open(split)
-
-    // Each call receives the previously returned record as its reuse object.
-    var record: (String, String, String) = null
-    Seq(("abc", "de|f", "ghijk"), ("a|bc", "", "hhg"), ("", "", "")).foreach { expected =>
-      record = format.nextRecord(record)
-      assertNotNull(record)
-      assertEquals(expected, record)
-    }
-
-    assertNull(format.nextRecord(record))
-    assertTrue(format.reachedEnd)
-  }
-
-  /**
-   * Reads three-column string records separated by the two-character delimiter "|-",
-   * where the last line carries a trailing field delimiter.
-   *
-   * Exceptions are left to propagate so that JUnit reports them with their stack trace.
-   */
-  @Test
-  def readStringFieldsWithTrailingDelimiters(): Unit = {
-    val fileContent = "abc|-def|-ghijk\nabc|-|-hhg\n|-|-|-\n"
-    val split = createTempFile(fileContent)
-    val format = new TupleCsvInputFormat[(String, String, String)](
-      PATH,
-      createTypeInformation[(String, String, String)]
-        .asInstanceOf[CaseClassTypeInfo[(String, String, String)]])
-    format.setDelimiter("\n")
-    format.setFieldDelimiter("|-")
-    format.configure(new Configuration)
-    format.open(split)
-
-    // Each call receives the previously returned record as its reuse object.
-    var record: (String, String, String) = null
-    Seq(("abc", "def", "ghijk"), ("abc", "", "hhg"), ("", "", "")).foreach { expected =>
-      record = format.nextRecord(record)
-      assertNotNull(record)
-      assertEquals(expected, record)
-    }
-
-    assertNull(format.nextRecord(record))
-    assertTrue(format.reachedEnd)
-  }
-
-  @Test
-  def testIntegerFields(): Unit = {
-    try {
-      val fileContent = "111|222|333|444|555\n666|777|888|999|000|\n"
-      val split = createTempFile(fileContent)
-      val format = new TupleCsvInputFormat[(Int, Int, Int, Int, Int)](
-        PATH, createTypeInformation[(Int, Int, Int, Int, Int)].
-          asInstanceOf[CaseClassTypeInfo[(Int, Int, Int, Int, Int)]])
-      format.setFieldDelimiter("|")
-      format.configure(new Configuration)
-      format.open(split)
-      var result: (Int, Int, Int, Int, Int) = null
-      result = format.nextRecord(result)
-      assertNotNull(result)
-      assertEquals(Integer.valueOf(111), result._1)
-      assertEquals(Integer.valueOf(222), result._2)
-      assertEquals(Integer.valueOf(333), result._3)
-      assertEquals(Integer.valueOf(444), result._4)
-      assertEquals(Integer.valueOf(555), result._5)
-      result = format.nextRecord(result)
-      assertNotNull(result)
-      assertEquals(Integer.valueOf(666), result._1)
-      assertEquals(Integer.valueOf(777), result._2)
-      assertEquals(Integer.valueOf(888), result._3)
-      assertEquals(Integer.valueOf(999), result._4)
-      assertEquals(Integer.valueOf(0), result._5)
-      result = format.nextRecord(result)
-      assertNull(result)
-      assertTrue(format.reachedEnd)
-    }
-    catch {
-      case ex: Exception =>
-        fail("Test failed due to a " + ex.getClass.getName + ": " + ex.getMessage)
-    }
-  }
-
-  @Test
-  def testReadFirstN(): Unit = {
-    try {
-      val fileContent = "111|x|222|x|333|x|444|x|555|x|\n" +
-        "666|x|777|x|888|x|999|x|000|x|\n"
-      val split = createTempFile(fileContent)
-      val format = new TupleCsvInputFormat[(Int, Int)](
-        PATH,
-        createTypeInformation[(Int, Int)].asInstanceOf[CaseClassTypeInfo[(Int, Int)]])
-      format.setFieldDelimiter("|x|")
-      format.configure(new Configuration)
-      format.open(split)
-      var result: (Int, Int) = null
-      result = format.nextRecord(result)
-      assertNotNull(result)
-      assertEquals(Integer.valueOf(111), result._1)
-      assertEquals(Integer.valueOf(222), result._2)
-      result = format.nextRecord(result)
-      assertNotNull(result)
-      assertEquals(Integer.valueOf(666), result._1)
-      assertEquals(Integer.valueOf(777), result._2)
-      result = format.nextRecord(result)
-      assertNull(result)
-      assertTrue(format.reachedEnd)
-    }
-    catch {
-      case ex: Exception =>
-        fail("Test failed due to a " + ex.getClass.getName + ": " + ex.getMessage)
-    }
-  }
-  @Test
-  def testReadSparseWithPositionSetter(): Unit = {
-    try {
-      val fileContent: String = "111|222|333|444|555|666|777|888|999|000|\n000|999|888|777|666" +
-        "|555|444|333|222|111|"
-      val split = createTempFile(fileContent)
-      val format = new TupleCsvInputFormat[(Int, Int, Int)](
-        PATH,
-        createTypeInformation[(Int, Int, Int)].asInstanceOf[CaseClassTypeInfo[(Int, Int, Int)]],
-        Array(0, 3, 7))
-      format.setFieldDelimiter("|")
-      format.configure(new Configuration)
-      format.open(split)
-      var result: (Int, Int, Int) = null
-      result = format.nextRecord(result)
-      assertNotNull(result)
-      assertEquals(Integer.valueOf(111), result._1)
-      assertEquals(Integer.valueOf(444), result._2)
-      assertEquals(Integer.valueOf(888), result._3)
-      result = format.nextRecord(result)
-      assertNotNull(result)
-      assertEquals(Integer.valueOf(0), result._1)
-      assertEquals(Integer.valueOf(777), result._2)
-      assertEquals(Integer.valueOf(333), result._3)
-      result = format.nextRecord(result)
-      assertNull(result)
-      assertTrue(format.reachedEnd)
-    }
-    catch {
-      case ex: Exception =>
-        fail("Test failed due to a " + ex.getClass.getName + ": " + ex.getMessage)
-    }
-  }
-
-  private def createTempFile(content: String): FileInputSplit = {
-    val tempFile = File.createTempFile("test_contents", "tmp")
-    tempFile.deleteOnExit()
-    val wrt = new FileWriter(tempFile)
-    wrt.write(content)
-    wrt.close()
-    new FileInputSplit(0, new Path(tempFile.toURI.toString), 0,
-      tempFile.length,Array[String]("localhost"))
-  }
-
-  @Test
-  def testWindowsLineEndRemoval(): Unit = {
-    this.testRemovingTrailingCR("\n", "\n")
-    this.testRemovingTrailingCR("\r\n", "\r\n")
-    this.testRemovingTrailingCR("\r\n", "\n")
-  }
-
-  private def testRemovingTrailingCR(lineBreakerInFile: String, lineBreakerSetup: String) {
-    var tempFile: File = null
-    val fileContent = FIRST_PART + lineBreakerInFile + SECOND_PART + lineBreakerInFile
-    try {
-      tempFile = File.createTempFile("CsvInputFormatTest", "tmp")
-      tempFile.deleteOnExit()
-      tempFile.setWritable(true)
-      val wrt = new OutputStreamWriter(new FileOutputStream(tempFile))
-      wrt.write(fileContent)
-      wrt.close()
-      val inputFormat = new TupleCsvInputFormat[Tuple1[String]](new Path(tempFile.toURI.toString),
-        createTypeInformation[Tuple1[String]].asInstanceOf[CaseClassTypeInfo[Tuple1[String]]])
-      val parameters = new Configuration
-      inputFormat.configure(parameters)
-      inputFormat.setDelimiter(lineBreakerSetup)
-      val splits = inputFormat.createInputSplits(1)
-      inputFormat.open(splits(0))
-      var result = inputFormat.nextRecord(null)
-      assertNotNull("Expecting to not return null", result)
-      assertEquals(FIRST_PART, result._1)
-      result = inputFormat.nextRecord(result)
-      assertNotNull("Expecting to not return null", result)
-      assertEquals(SECOND_PART, result._1)
-    }
-    catch {
-      case t: Throwable =>
-        System.err.println("test failed with exception: " + t.getMessage)
-        t.printStackTrace(System.err)
-        fail("Test erroneous")
-    }
-  }
-
-  class POJOItem(var field1: Int, var field2: String, var field3: Double) {
-    def this() {
-      this(-1, "", -1)
-    }
-  }
-
-  case class CaseClassItem(field1: Int, field2: String, field3: Double)
-
-  private def validatePOJOItem(format: PojoCsvInputFormat[POJOItem]): Unit = {
-    var result = new POJOItem()
-    result = format.nextRecord(result)
-    assertEquals(123, result.field1)
-    assertEquals("HELLO", result.field2)
-    assertEquals(3.123, result.field3, 0.001)
-
-    result = format.nextRecord(result)
-    assertEquals(456, result.field1)
-    assertEquals("ABC", result.field2)
-    assertEquals(1.234, result.field3, 0.001)
-  }
-
-  private def validateCaseClassItem(format: TupleCsvInputFormat[CaseClassItem]): Unit = {
-    var result = format.nextRecord(null)
-    assertEquals(123, result.field1)
-    assertEquals("HELLO", result.field2)
-    assertEquals(3.123, result.field3, 0.001)
-
-    result = format.nextRecord(null)
-    assertEquals(456, result.field1)
-    assertEquals("ABC", result.field2)
-    assertEquals(1.234, result.field3, 0.001)
-  }
-
-  @Test
-  def testPOJOType(): Unit = {
-    val fileContent = "123,HELLO,3.123\n" + "456,ABC,1.234"
-    val tempFile = createTempFile(fileContent)
-    val typeInfo: PojoTypeInfo[POJOItem] = createTypeInformation[POJOItem]
-      .asInstanceOf[PojoTypeInfo[POJOItem]]
-    val format = new PojoCsvInputFormat[POJOItem](PATH, typeInfo)
-
-    format.setDelimiter('\n')
-    format.setFieldDelimiter(",")
-    format.configure(new Configuration)
-    format.open(tempFile)
-
-    validatePOJOItem(format)
-  }
-
-  @Test
-  def testCaseClass(): Unit = {
-    val fileContent = "123,HELLO,3.123\n" + "456,ABC,1.234"
-    val tempFile = createTempFile(fileContent)
-    val typeInfo: CaseClassTypeInfo[CaseClassItem] = 
-      createTypeInformation[CaseClassItem]
-      .asInstanceOf[CaseClassTypeInfo[CaseClassItem]]
-    val format = new TupleCsvInputFormat[CaseClassItem](PATH, typeInfo)
-
-    format.setDelimiter('\n')
-    format.setFieldDelimiter(",")
-    format.configure(new Configuration)
-    format.open(tempFile)
-
-    validateCaseClassItem(format)
-  }
-
-  @Test
-  def testPOJOTypeWithFieldMapping(): Unit = {
-    val fileContent = "HELLO,123,3.123\n" + "ABC,456,1.234"
-    val tempFile = createTempFile(fileContent)
-    val typeInfo: PojoTypeInfo[POJOItem] = createTypeInformation[POJOItem]
-      .asInstanceOf[PojoTypeInfo[POJOItem]]
-    val format = new PojoCsvInputFormat[POJOItem](
-      PATH, typeInfo, Array("field2", "field1", "field3"))
-
-    format.setDelimiter('\n')
-    format.setFieldDelimiter(",")
-    format.configure(new Configuration)
-    format.open(tempFile)
-
-    validatePOJOItem(format)
-  }
-  
-  @Test
-  def testPOJOTypeWithFieldSubsetAndDataSubset(): Unit = {
-    val fileContent = "HELLO,123,NODATA,3.123,NODATA\n" + "ABC,456,NODATA,1.234,NODATA"
-    val tempFile = createTempFile(fileContent)
-    val typeInfo: PojoTypeInfo[POJOItem] = createTypeInformation[POJOItem]
-      .asInstanceOf[PojoTypeInfo[POJOItem]]
-    val format = new PojoCsvInputFormat[POJOItem](
-      PATH, typeInfo, Array("field2", "field1", "field3"), 
-      Array(true, true, false, true, false))
-
-    format.setDelimiter('\n')
-    format.setFieldDelimiter(",")
-    format.configure(new Configuration)
-    format.open(tempFile)
-
-    validatePOJOItem(format)
-  }
-
-  @Test
-  def testPOJOSubclassType(): Unit = {
-    val fileContent = "t1,foobar,tweet2\nt2,barfoo,tweet2"
-    val tempFile = createTempFile(fileContent)
-    val typeInfo: PojoTypeInfo[TwitterPOJO] = createTypeInformation[TwitterPOJO]
-      .asInstanceOf[PojoTypeInfo[TwitterPOJO]]
-    val format = new PojoCsvInputFormat[TwitterPOJO](PATH, typeInfo)
-
-    format.setDelimiter('\n')
-    format.setFieldDelimiter(",")
-    format.configure(new Configuration)
-    format.open(tempFile)
-
-    val expected = for (line <- fileContent.split("\n")) yield {
-      val elements = line.split(",")
-      new TwitterPOJO(elements(0), elements(1), elements(2))
-    }
-
-    val actual = ArrayBuffer[TwitterPOJO]()
-    var readNextElement = true
-
-    while (readNextElement) {
-      val element = format.nextRecord(new TwitterPOJO())
-
-      if (element != null) {
-        actual += element
-      } else {
-        readNextElement = false
-      }
-    }
-
-    assert(expected.sameElements(actual))
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/6a935978/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/CaseClassComparatorTest.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/CaseClassComparatorTest.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/CaseClassComparatorTest.scala
deleted file mode 100644
index 1b48612..0000000
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/CaseClassComparatorTest.scala
+++ /dev/null
@@ -1,157 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.scala.runtime
-
-import java.util
-import java.util.{ArrayList, List, Random}
-
-import org.apache.flink.api.common.ExecutionConfig
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.common.typeutils.{CompositeType, TypeComparator}
-import org.apache.flink.api.scala._
-import org.apache.flink.core.memory._
-import org.apache.flink.runtime.operators.sort.{NormalizedKeySorter, QuickSort}
-
-import org.junit.Assert._
-import org.junit.Test
-import org.mockito.Mockito
-
-class CaseClassComparatorTest {
-
-  case class CaseTestClass(a: Int, b: Int, c: Int, d: String)
-  
-  @Test
-  def testNormalizedKeyGeneration(): Unit = {
-    try {
-      
-      val typeInfo = implicitly[TypeInformation[CaseTestClass]]
-                                     .asInstanceOf[CompositeType[CaseTestClass]]
-      
-      val serializer = typeInfo.createSerializer(new ExecutionConfig)
-      val comparator = new FailingCompareDeserializedWrapper(
-          typeInfo.createComparator(
-            Array[Int](0, 2),
-            Array[Boolean](true, true),
-            0,
-            new ExecutionConfig))
-      
-      assertTrue(comparator.supportsNormalizedKey())
-      assertEquals(8, comparator.getNormalizeKeyLen())
-      assertFalse(comparator.isNormalizedKeyPrefixOnly(8))
-      
-      // validate the failing mock
-      {
-        val in1 : DataInputView = Mockito.mock(classOf[DataInputView])
-        val in2 : DataInputView = Mockito.mock(classOf[DataInputView])
-        
-        try {
-          comparator.compareSerialized(in1, in2)
-          fail("should throw an exception")
-        }
-        catch {
-          case e: UnsupportedOperationException => // fine
-          case ee: Exception => fail("unexpected exception")
-        }
-      }
-      
-      
-      val numMemSegs = 20
-      val memory : util.List[MemorySegment] = new util.ArrayList[MemorySegment](numMemSegs)
-      for (i <- 1 to numMemSegs) {
-        memory.add(MemorySegmentFactory.allocateUnpooledSegment(32*1024))
-      }
-      
-      val sorter : NormalizedKeySorter[CaseTestClass] = new NormalizedKeySorter[CaseTestClass](
-               serializer, comparator, memory)
-      
-      val rnd = new Random()
-      var moreToGo = true
-      var num = 0
-      
-      while (moreToGo) {
-        val next = CaseTestClass(rnd.nextInt(), rnd.nextInt(), rnd.nextInt(), "")
-        moreToGo = sorter.write(next)
-        num += 1
-      }
-      
-      print(num)
-      
-      new QuickSort().sort(sorter)
-    }
-    catch {
-      case e: Exception => {
-        e.printStackTrace()
-        fail(e.getMessage())
-      }
-    }
-  }
-  
-  class FailingCompareDeserializedWrapper[T](wrapped: TypeComparator[T]) extends TypeComparator[T] {
-
-    def hash(record: T) : Int = wrapped.hash(record)
-
-    def setReference(toCompare: T) = wrapped.setReference(toCompare)
-
-    def equalToReference(candidate: T): Boolean = wrapped.equalToReference(candidate)
-    
-    def compareToReference(referencedComparator: TypeComparator[T]): Int 
-      = wrapped.compareToReference(referencedComparator)
-    
-    override def supportsCompareAgainstReference(): Boolean 
-      = wrapped.supportsCompareAgainstReference()
-    
-    def compare(first: T, second: T): Int = wrapped.compare(first, second)
-    
-    def compareSerialized(firstSource: DataInputView, secondSource: DataInputView): Int = {
-      throw new UnsupportedOperationException("Not Supported")
-    }
-    
-    def supportsNormalizedKey(): Boolean = wrapped.supportsNormalizedKey()
-    
-    def supportsSerializationWithKeyNormalization(): Boolean
-      = wrapped.supportsSerializationWithKeyNormalization()
-    
-    def getNormalizeKeyLen(): Int = wrapped.getNormalizeKeyLen()
-    
-    def isNormalizedKeyPrefixOnly(keyBytes: Int): Boolean
-      = wrapped.isNormalizedKeyPrefixOnly(keyBytes)
-    
-    def putNormalizedKey(record: T, target: MemorySegment, offset: Int, numBytes: Int): Unit
-      = wrapped.putNormalizedKey(record, target, offset, numBytes)
-    
-    def writeWithKeyNormalization(record: T, target: DataOutputView): Unit
-      = wrapped.writeWithKeyNormalization(record, target)
-    
-    def readWithKeyDenormalization(reuse: T, source: DataInputView): T
-      = wrapped.readWithKeyDenormalization(reuse, source)
-    
-    def invertNormalizedKey(): Boolean = wrapped.invertNormalizedKey()
-    
-    def duplicate(): TypeComparator[T] = new FailingCompareDeserializedWrapper(wrapped.duplicate())
-    
-    def extractKeys(record: Object, target: Array[Object], index: Int): Int
-      = wrapped.extractKeys(record, target, index)
-    
-    def getFlatComparators(): Array[TypeComparator[_]] = wrapped.getFlatComparators()
-    
-    override def compareAgainstReference(keys: Array[Comparable[_]]): Int = {
-      throw new UnsupportedOperationException("Workaround hack.")
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/6a935978/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/CaseClassNormalizedKeySortingTest.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/CaseClassNormalizedKeySortingTest.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/CaseClassNormalizedKeySortingTest.scala
new file mode 100644
index 0000000..ace93a2
--- /dev/null
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/CaseClassNormalizedKeySortingTest.scala
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.scala.runtime
+
+import org.apache.flink.api.common.ExecutionConfig
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.common.typeutils.{CompositeType, TypeComparator}
+import org.apache.flink.api.scala._
+import org.apache.flink.core.memory._
+import org.apache.flink.runtime.operators.sort.{NormalizedKeySorter, QuickSort}
+
+import java.util
+import java.util.Random
+
+import org.junit.Assert._
+import org.junit.Test
+import org.mockito.Mockito
+
+/** Test that verifies that the case class comparators properly
+ * generate normalized keys used during sorting.
+ */
+class CaseClassNormalizedKeySortingTest {
+
+  case class CaseTestClass(a: Int, b: Int, c: Int, d: String) // records written to the sorter
+
+  /** Fills a [[NormalizedKeySorter]] with random records and sorts it with a comparator
+   * whose `compareSerialized` always throws, so the sort must complete without calling it.
+   */
+  @Test
+  def testNormalizedKeyGeneration(): Unit = {
+    val typeInfo = implicitly[TypeInformation[CaseTestClass]]
+      .asInstanceOf[CompositeType[CaseTestClass]]
+
+    val serializer = typeInfo.createSerializer(new ExecutionConfig)
+    val comparator = new FailingCompareDeserializedWrapper(
+      typeInfo.createComparator(
+        Array[Int](0, 2),
+        Array[Boolean](true, true),
+        0,
+        new ExecutionConfig))
+
+    assertTrue(comparator.supportsNormalizedKey())
+    assertEquals(8, comparator.getNormalizeKeyLen())
+    assertFalse(comparator.isNormalizedKeyPrefixOnly(8))
+
+    // validate the failing mock
+    {
+      val in1 : DataInputView = Mockito.mock(classOf[DataInputView])
+      val in2 : DataInputView = Mockito.mock(classOf[DataInputView])
+
+      try {
+        comparator.compareSerialized(in1, in2)
+        fail("should throw an exception")
+      } catch {
+        case e: UnsupportedOperationException => // fine
+        case ee: Exception => fail("unexpected exception")
+      }
+    }
+
+    val numMemSegs = 20
+    val memory : util.List[MemorySegment] = new util.ArrayList[MemorySegment](numMemSegs)
+    for (_ <- 1 to numMemSegs) {
+      memory.add(MemorySegmentFactory.allocateUnpooledSegment(32*1024))
+    }
+
+    val sorter : NormalizedKeySorter[CaseTestClass] = new NormalizedKeySorter[CaseTestClass](
+      serializer, comparator, memory)
+
+    // keep writing random records until the sorter's write() returns false
+    val rnd = new Random()
+    var moreToGo = true
+    while (moreToGo) {
+      val next = CaseTestClass(rnd.nextInt(), rnd.nextInt(), rnd.nextInt(), "")
+      moreToGo = sorter.write(next)
+    }
+
+    // compareSerialized throws, so any call to it during the sort fails the test
+    new QuickSort().sort(sorter)
+  }
+
+  /** Comparator that forwards every call to `wrapped`, except `compareSerialized` and
+   * `compareAgainstReference`, which always throw an `UnsupportedOperationException`.
+   * Sorting with it therefore fails as soon as serialized records are compared.
+   */
+  class FailingCompareDeserializedWrapper[T](wrapped: TypeComparator[T]) extends TypeComparator[T] {
+
+    // ---- operations that always fail ----
+    def compareSerialized(firstSource: DataInputView, secondSource: DataInputView): Int =
+      throw new UnsupportedOperationException("Not Supported")
+
+    override def compareAgainstReference(keys: Array[Comparable[_]]): Int =
+      throw new UnsupportedOperationException("Workaround hack.")
+
+    // ---- reference handling, delegated ----
+    def setReference(toCompare: T): Unit = wrapped.setReference(toCompare)
+    def equalToReference(candidate: T): Boolean = wrapped.equalToReference(candidate)
+
+    def compareToReference(referencedComparator: TypeComparator[T]): Int =
+      wrapped.compareToReference(referencedComparator)
+
+    override def supportsCompareAgainstReference(): Boolean =
+      wrapped.supportsCompareAgainstReference()
+
+    // ---- hashing, comparison, duplication and key extraction, delegated ----
+    def hash(record: T): Int = wrapped.hash(record)
+    def compare(first: T, second: T): Int = wrapped.compare(first, second)
+    def duplicate(): TypeComparator[T] = new FailingCompareDeserializedWrapper(wrapped.duplicate())
+    def getFlatComparators(): Array[TypeComparator[_]] = wrapped.getFlatComparators()
+
+    def extractKeys(record: Object, target: Array[Object], index: Int): Int =
+      wrapped.extractKeys(record, target, index)
+
+    // ---- normalized keys, delegated ----
+    def supportsNormalizedKey(): Boolean = wrapped.supportsNormalizedKey()
+    def getNormalizeKeyLen(): Int = wrapped.getNormalizeKeyLen()
+    def invertNormalizedKey(): Boolean = wrapped.invertNormalizedKey()
+
+    def supportsSerializationWithKeyNormalization(): Boolean =
+      wrapped.supportsSerializationWithKeyNormalization()
+
+    def isNormalizedKeyPrefixOnly(keyBytes: Int): Boolean =
+      wrapped.isNormalizedKeyPrefixOnly(keyBytes)
+
+    def putNormalizedKey(record: T, target: MemorySegment, offset: Int, numBytes: Int): Unit =
+      wrapped.putNormalizedKey(record, target, offset, numBytes)
+
+    def writeWithKeyNormalization(record: T, target: DataOutputView): Unit =
+      wrapped.writeWithKeyNormalization(record, target)
+
+    def readWithKeyDenormalization(reuse: T, source: DataInputView): T =
+      wrapped.readWithKeyDenormalization(reuse, source)
+  }
+}