You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2017/10/14 17:08:10 UTC
[5/8] flink git commit: [hotfix] [scala api] Move tests to correct
package
http://git-wip-us.apache.org/repos/asf/flink/blob/6a935978/flink-scala/src/test/scala/org/apache/flink/api/scala/runtime/tuple/base/TupleComparatorTestBase.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/test/scala/org/apache/flink/api/scala/runtime/tuple/base/TupleComparatorTestBase.scala b/flink-scala/src/test/scala/org/apache/flink/api/scala/runtime/tuple/base/TupleComparatorTestBase.scala
new file mode 100644
index 0000000..e0ac671
--- /dev/null
+++ b/flink-scala/src/test/scala/org/apache/flink/api/scala/runtime/tuple/base/TupleComparatorTestBase.scala
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.scala.runtime.tuple.base
+
+import org.apache.flink.api.common.typeutils.ComparatorTestBase
+import org.apache.flink.api.scala.typeutils.{CaseClassSerializer, CaseClassComparator}
+import org.junit.Assert._
+
+abstract class TupleComparatorTestBase[T <: Product] extends ComparatorTestBase[T] {
+ protected override def deepEquals(message: String, should: T, is: T) {
+ for (i <- 0 until should.productArity) {
+ assertEquals(should.productElement(i), is.productElement(i))
+ }
+ }
+}
+
http://git-wip-us.apache.org/repos/asf/flink/blob/6a935978/flink-scala/src/test/scala/org/apache/flink/api/scala/types/TypeInformationGenTest.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/test/scala/org/apache/flink/api/scala/types/TypeInformationGenTest.scala b/flink-scala/src/test/scala/org/apache/flink/api/scala/types/TypeInformationGenTest.scala
new file mode 100644
index 0000000..7d13bba
--- /dev/null
+++ b/flink-scala/src/test/scala/org/apache/flink/api/scala/types/TypeInformationGenTest.scala
@@ -0,0 +1,589 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.scala.types
+
+import org.apache.flink.api.common.typeinfo._
+import org.apache.flink.api.java.io.CollectionInputFormat
+import org.apache.flink.api.java.typeutils.TypeExtractorTest.CustomTuple
+import org.apache.flink.api.java.typeutils._
+import org.apache.flink.api.scala._
+import org.apache.flink.api.scala.typeutils.{CaseClassTypeInfo, UnitTypeInfo}
+import org.apache.flink.types.{IntValue, StringValue}
+
+import org.junit.{Assert, Test}
+
+case class CustomCaseClass(a: String, b: Int)
+
+case class UmlautCaseClass(ä: String, ß: Int)
+
+class CustomType(var myField1: String, var myField2: Int) {
+ def this() {
+ this(null, 0)
+ }
+}
+
+class MyObject[A](var a: A) {
+ def this() { this(null.asInstanceOf[A]) }
+}
+
+class TypeInformationGenTest {
+
+ @Test
+ def testJavaTuple(): Unit = {
+ val ti = createTypeInformation[org.apache.flink.api.java.tuple.Tuple3[Int, String, Integer]]
+
+ Assert.assertTrue(ti.isTupleType)
+ Assert.assertEquals(3, ti.getArity)
+ Assert.assertTrue(ti.isInstanceOf[TupleTypeInfoBase[_]])
+ val tti = ti.asInstanceOf[TupleTypeInfoBase[_]]
+ Assert.assertEquals(classOf[org.apache.flink.api.java.tuple.Tuple3[_, _, _]], tti.getTypeClass)
+ for (i <- 0 until 3) {
+ Assert.assertTrue(tti.getTypeAt(i).isInstanceOf[BasicTypeInfo[_]])
+ }
+
+ Assert.assertEquals(BasicTypeInfo.INT_TYPE_INFO, tti.getTypeAt(0))
+ Assert.assertEquals(BasicTypeInfo.STRING_TYPE_INFO, tti.getTypeAt(1))
+ Assert.assertEquals(BasicTypeInfo.INT_TYPE_INFO, tti.getTypeAt(2))
+ }
+
+ @Test
+ def testCustomJavaTuple(): Unit = {
+ val ti = createTypeInformation[CustomTuple]
+
+ Assert.assertTrue(ti.isTupleType)
+ Assert.assertEquals(2, ti.getArity)
+ Assert.assertTrue(ti.isInstanceOf[TupleTypeInfoBase[_]])
+ val tti = ti.asInstanceOf[TupleTypeInfoBase[_]]
+ Assert.assertEquals(classOf[CustomTuple], tti.getTypeClass)
+ for (i <- 0 until 2) {
+ Assert.assertTrue(tti.getTypeAt(i).isInstanceOf[BasicTypeInfo[_]])
+ }
+
+ Assert.assertEquals(BasicTypeInfo.STRING_TYPE_INFO, tti.getTypeAt(0))
+ Assert.assertEquals(BasicTypeInfo.INT_TYPE_INFO, tti.getTypeAt(1))
+ }
+
+ @Test
+ def testBasicType(): Unit = {
+ val ti = createTypeInformation[Boolean]
+
+ Assert.assertTrue(ti.isBasicType)
+ Assert.assertEquals(BasicTypeInfo.BOOLEAN_TYPE_INFO, ti)
+ Assert.assertEquals(classOf[java.lang.Boolean], ti.getTypeClass)
+ }
+
+ @Test
+ def testTypeParameters(): Unit = {
+
+ val data = Seq(1.0d, 2.0d)
+
+ def f[T: TypeInformation](data: Seq[T]): (T, Seq[T]) = {
+
+ val ti = createTypeInformation[(T, Seq[T])]
+
+ Assert.assertTrue(ti.isTupleType)
+ val ccti = ti.asInstanceOf[CaseClassTypeInfo[(T, Seq[T])]]
+ Assert.assertEquals(BasicTypeInfo.DOUBLE_TYPE_INFO, ccti.getTypeAt(0))
+
+ (data.head, data)
+ }
+
+ f(data)
+
+ }
+
+ @Test
+ def testGenericArrays(): Unit = {
+
+ class MyObject(var a: Int, var b: String) {
+ def this() = this(0, "")
+ }
+
+ val boolArray = Array(true, false)
+ val byteArray = Array(1.toByte, 2.toByte, 3.toByte)
+ val charArray= Array(1.toChar, 2.toChar, 3.toChar)
+ val shortArray = Array(1.toShort, 2.toShort, 3.toShort)
+ val intArray = Array(1, 2, 3)
+ val longArray = Array(1L, 2L, 3L)
+ val floatArray = Array(1.0f, 2.0f, 3.0f)
+ val doubleArray = Array(1.0, 2.0, 3.0)
+ val stringArray = Array("hey", "there")
+ val objectArray = Array(new MyObject(1, "hey"), new MyObject(2, "there"))
+
+ def getType[T: TypeInformation](arr: Array[T]): TypeInformation[Array[T]] = {
+ createTypeInformation[Array[T]]
+ }
+
+ Assert.assertEquals(
+ PrimitiveArrayTypeInfo.BOOLEAN_PRIMITIVE_ARRAY_TYPE_INFO,
+ getType(boolArray))
+
+ Assert.assertEquals(
+ PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO,
+ getType(byteArray))
+
+ Assert.assertEquals(
+ PrimitiveArrayTypeInfo.CHAR_PRIMITIVE_ARRAY_TYPE_INFO,
+ getType(charArray))
+
+ Assert.assertEquals(
+ PrimitiveArrayTypeInfo.SHORT_PRIMITIVE_ARRAY_TYPE_INFO,
+ getType(shortArray))
+
+ Assert.assertEquals(
+ PrimitiveArrayTypeInfo.INT_PRIMITIVE_ARRAY_TYPE_INFO,
+ getType(intArray))
+
+ Assert.assertEquals(
+ PrimitiveArrayTypeInfo.LONG_PRIMITIVE_ARRAY_TYPE_INFO,
+ getType(longArray))
+
+ Assert.assertEquals(
+ PrimitiveArrayTypeInfo.FLOAT_PRIMITIVE_ARRAY_TYPE_INFO,
+ getType(floatArray))
+
+ Assert.assertEquals(
+ PrimitiveArrayTypeInfo.DOUBLE_PRIMITIVE_ARRAY_TYPE_INFO,
+ getType(doubleArray))
+
+ Assert.assertEquals(
+ BasicArrayTypeInfo.STRING_ARRAY_TYPE_INFO,
+ getType(stringArray))
+
+ Assert.assertTrue(getType(objectArray).isInstanceOf[ObjectArrayTypeInfo[_, _]])
+ Assert.assertTrue(
+ getType(objectArray).asInstanceOf[ObjectArrayTypeInfo[_, _]]
+ .getComponentInfo.isInstanceOf[PojoTypeInfo[_]])
+ }
+
+ @Test
+ def testTupleWithBasicTypes(): Unit = {
+ val ti = createTypeInformation[(Int, Long, Double, Float, Boolean, String, Char, Short, Byte)]
+
+ Assert.assertTrue(ti.isTupleType)
+ Assert.assertEquals(9, ti.getArity)
+ Assert.assertTrue(ti.isInstanceOf[TupleTypeInfoBase[_]])
+ val tti = ti.asInstanceOf[TupleTypeInfoBase[_]]
+ Assert.assertEquals(classOf[Tuple9[_,_,_,_,_,_,_,_,_]], tti.getTypeClass)
+ for (i <- 0 until 9) {
+ Assert.assertTrue(tti.getTypeAt(i).isInstanceOf[BasicTypeInfo[_]])
+ }
+
+ Assert.assertEquals(BasicTypeInfo.INT_TYPE_INFO, tti.getTypeAt(0))
+ Assert.assertEquals(BasicTypeInfo.LONG_TYPE_INFO, tti.getTypeAt(1))
+ Assert.assertEquals(BasicTypeInfo.DOUBLE_TYPE_INFO, tti.getTypeAt(2))
+ Assert.assertEquals(BasicTypeInfo.FLOAT_TYPE_INFO, tti.getTypeAt(3))
+ Assert.assertEquals(BasicTypeInfo.BOOLEAN_TYPE_INFO, tti.getTypeAt(4))
+ Assert.assertEquals(BasicTypeInfo.STRING_TYPE_INFO, tti.getTypeAt(5))
+ Assert.assertEquals(BasicTypeInfo.CHAR_TYPE_INFO, tti.getTypeAt(6))
+ Assert.assertEquals(BasicTypeInfo.SHORT_TYPE_INFO, tti.getTypeAt(7))
+ Assert.assertEquals(BasicTypeInfo.BYTE_TYPE_INFO, tti.getTypeAt(8))
+ }
+
+ @Test
+ def testTupleWithTuples(): Unit = {
+ val ti = createTypeInformation[(Tuple1[String], Tuple1[Int], Tuple2[Long, Long])]
+
+ Assert.assertTrue(ti.isTupleType())
+ Assert.assertEquals(3, ti.getArity)
+ Assert.assertTrue(ti.isInstanceOf[TupleTypeInfoBase[_]])
+ val tti = ti.asInstanceOf[TupleTypeInfoBase[_]]
+ Assert.assertEquals(classOf[Tuple3[_, _, _]], tti.getTypeClass)
+ Assert.assertTrue(tti.getTypeAt(0).isTupleType())
+ Assert.assertTrue(tti.getTypeAt(1).isTupleType())
+ Assert.assertTrue(tti.getTypeAt(2).isTupleType())
+ Assert.assertEquals(classOf[Tuple1[_]], tti.getTypeAt(0).getTypeClass)
+ Assert.assertEquals(classOf[Tuple1[_]], tti.getTypeAt(1).getTypeClass)
+ Assert.assertEquals(classOf[Tuple2[_, _]], tti.getTypeAt(2).getTypeClass)
+ Assert.assertEquals(1, tti.getTypeAt(0).getArity)
+ Assert.assertEquals(1, tti.getTypeAt(1).getArity)
+ Assert.assertEquals(2, tti.getTypeAt(2).getArity)
+ Assert.assertEquals(BasicTypeInfo.STRING_TYPE_INFO,
+ tti.getTypeAt(0).asInstanceOf[TupleTypeInfoBase[_]].getTypeAt(0))
+ Assert.assertEquals(BasicTypeInfo.INT_TYPE_INFO,
+ tti.getTypeAt(1).asInstanceOf[TupleTypeInfoBase[_]].getTypeAt(0))
+ Assert.assertEquals(BasicTypeInfo.LONG_TYPE_INFO,
+ tti.getTypeAt(2).asInstanceOf[TupleTypeInfoBase[_]].getTypeAt(0))
+ Assert.assertEquals(BasicTypeInfo.LONG_TYPE_INFO,
+ tti.getTypeAt(2).asInstanceOf[TupleTypeInfoBase[_]].getTypeAt(1))
+ }
+
+ @Test
+ def testCaseClass(): Unit = {
+ val ti = createTypeInformation[CustomCaseClass]
+
+ Assert.assertTrue(ti.isTupleType)
+ Assert.assertEquals(2, ti.getArity)
+ Assert.assertEquals(
+ BasicTypeInfo.STRING_TYPE_INFO,
+ ti.asInstanceOf[TupleTypeInfoBase[_]].getTypeAt(0))
+ Assert.assertEquals(
+ BasicTypeInfo.INT_TYPE_INFO,
+ ti.asInstanceOf[TupleTypeInfoBase[_]].getTypeAt(1))
+ Assert.assertEquals(
+ classOf[CustomCaseClass],ti.asInstanceOf[TupleTypeInfoBase[_]].getTypeClass())
+ }
+
+ @Test
+ def testCustomType(): Unit = {
+ val ti = createTypeInformation[CustomType]
+
+ Assert.assertFalse(ti.isBasicType)
+ Assert.assertFalse(ti.isTupleType)
+ Assert.assertTrue(ti.isInstanceOf[PojoTypeInfo[_]])
+ Assert.assertEquals(ti.getTypeClass, classOf[CustomType])
+ }
+
+ @Test
+ def testTupleWithCustomType(): Unit = {
+ val ti = createTypeInformation[(Long, CustomType)]
+
+ Assert.assertTrue(ti.isTupleType)
+ Assert.assertEquals(2, ti.getArity)
+ val tti = ti.asInstanceOf[TupleTypeInfoBase[_]]
+ Assert.assertEquals(classOf[Tuple2[_, _]], tti.getTypeClass)
+ Assert.assertEquals(classOf[java.lang.Long], tti.getTypeAt(0).getTypeClass)
+ Assert.assertTrue(tti.getTypeAt(1).isInstanceOf[PojoTypeInfo[_]])
+ Assert.assertEquals(classOf[CustomType], tti.getTypeAt(1).getTypeClass)
+ }
+
+ @Test
+ def testValue(): Unit = {
+ val ti = createTypeInformation[StringValue]
+
+ Assert.assertFalse(ti.isBasicType)
+ Assert.assertFalse(ti.isTupleType)
+ Assert.assertTrue(ti.isInstanceOf[ValueTypeInfo[_]])
+ Assert.assertEquals(ti.getTypeClass, classOf[StringValue])
+ Assert.assertTrue(TypeExtractor.getForClass(classOf[StringValue])
+ .isInstanceOf[ValueTypeInfo[_]])
+ Assert.assertEquals(TypeExtractor.getForClass(classOf[StringValue]).getTypeClass,
+ ti.getTypeClass)
+ }
+
+ @Test
+ def testTupleOfValues(): Unit = {
+ val ti = createTypeInformation[(StringValue, IntValue)]
+ Assert.assertFalse(ti.isBasicType)
+ Assert.assertTrue(ti.isTupleType)
+ Assert.assertEquals(
+ classOf[StringValue],
+ ti.asInstanceOf[TupleTypeInfoBase[_]].getTypeAt(0).getTypeClass)
+ Assert.assertEquals(
+ classOf[IntValue],
+ ti.asInstanceOf[TupleTypeInfoBase[_]].getTypeAt(1).getTypeClass)
+ }
+
+
+ @Test
+ def testBasicArray(): Unit = {
+ val ti = createTypeInformation[Array[String]]
+
+ Assert.assertFalse(ti.isBasicType)
+ Assert.assertFalse(ti.isTupleType)
+ Assert.assertTrue(ti.isInstanceOf[BasicArrayTypeInfo[_, _]] ||
+ ti.isInstanceOf[ObjectArrayTypeInfo[_, _]])
+ if (ti.isInstanceOf[BasicArrayTypeInfo[_, _]]) {
+ Assert.assertEquals(BasicArrayTypeInfo.STRING_ARRAY_TYPE_INFO, ti)
+ }
+ else {
+ Assert.assertEquals(
+ BasicTypeInfo.STRING_TYPE_INFO,
+ ti.asInstanceOf[ObjectArrayTypeInfo[_, _]].getComponentInfo)
+ }
+ }
+
+ @Test
+ def testPrimitiveArray(): Unit = {
+ val ti = createTypeInformation[Array[Boolean]]
+
+ Assert.assertTrue(ti.isInstanceOf[PrimitiveArrayTypeInfo[_]])
+ Assert.assertEquals(ti, PrimitiveArrayTypeInfo.BOOLEAN_PRIMITIVE_ARRAY_TYPE_INFO)
+ }
+
+ @Test
+ def testCustomArray(): Unit = {
+ val ti = createTypeInformation[Array[CustomType]]
+ Assert.assertTrue(ti.isInstanceOf[ObjectArrayTypeInfo[_, _]])
+ Assert.assertEquals(
+ classOf[CustomType],
+ ti.asInstanceOf[ObjectArrayTypeInfo[_, _]].getComponentInfo.getTypeClass)
+ }
+
+ @Test
+ def testTupleArray(): Unit = {
+ val ti = createTypeInformation[Array[(String, String)]]
+
+ Assert.assertTrue(ti.isInstanceOf[ObjectArrayTypeInfo[_, _]])
+ val oati = ti.asInstanceOf[ObjectArrayTypeInfo[_, _]]
+ Assert.assertTrue(oati.getComponentInfo.isTupleType)
+ val tti = oati.getComponentInfo.asInstanceOf[TupleTypeInfoBase[_]]
+ Assert.assertEquals(BasicTypeInfo.STRING_TYPE_INFO, tti.getTypeAt(0))
+ Assert.assertEquals(BasicTypeInfo.STRING_TYPE_INFO, tti.getTypeAt(1))
+ }
+
+ @Test
+ def testMultidimensionalArrays(): Unit = {
+ // Tuple
+ {
+ val ti = createTypeInformation[Array[Array[(String, String)]]]
+
+ Assert.assertTrue(ti.isInstanceOf[ObjectArrayTypeInfo[_, _]])
+ val oati = ti.asInstanceOf[ObjectArrayTypeInfo[_, _]]
+ Assert.assertTrue(oati.getComponentInfo.isInstanceOf[ObjectArrayTypeInfo[_, _]])
+ val oati2 = oati.getComponentInfo.asInstanceOf[ObjectArrayTypeInfo[_, _]]
+ Assert.assertTrue(oati2.getComponentInfo.isTupleType)
+ val tti = oati2.getComponentInfo.asInstanceOf[TupleTypeInfoBase[_]]
+ Assert.assertEquals(BasicTypeInfo.STRING_TYPE_INFO, tti.getTypeAt(0))
+ Assert.assertEquals(BasicTypeInfo.STRING_TYPE_INFO, tti.getTypeAt(1))
+ }
+
+ // primitives
+ {
+ val ti = createTypeInformation[Array[Array[Int]]]
+
+ Assert.assertTrue(ti.isInstanceOf[ObjectArrayTypeInfo[_, _]])
+ val oati = ti.asInstanceOf[ObjectArrayTypeInfo[_, _]]
+ Assert.assertEquals(oati.getComponentInfo,
+ PrimitiveArrayTypeInfo.INT_PRIMITIVE_ARRAY_TYPE_INFO)
+ }
+
+ // basic types
+ {
+ val ti = createTypeInformation[Array[Array[Integer]]]
+
+ Assert.assertTrue(ti.isInstanceOf[ObjectArrayTypeInfo[_, _]])
+ val oati = ti.asInstanceOf[ObjectArrayTypeInfo[_, _]]
+ Assert.assertEquals(oati.getComponentInfo, BasicArrayTypeInfo.INT_ARRAY_TYPE_INFO)
+ }
+
+ // pojo
+ {
+ val ti = createTypeInformation[Array[Array[CustomType]]]
+
+ Assert.assertTrue(ti.isInstanceOf[ObjectArrayTypeInfo[_, _]])
+ val oati = ti.asInstanceOf[ObjectArrayTypeInfo[_, _]]
+ Assert.assertTrue(oati.getComponentInfo.isInstanceOf[ObjectArrayTypeInfo[_, _]])
+ val oati2 = oati.getComponentInfo.asInstanceOf[ObjectArrayTypeInfo[_, _]]
+ val tti = oati2.getComponentInfo.asInstanceOf[PojoTypeInfo[_]]
+ Assert.assertEquals(classOf[CustomType], tti.getTypeClass())
+ }
+ }
+
+ @Test
+ def testParamertizedCustomObject(): Unit = {
+ val ti = createTypeInformation[MyObject[String]]
+
+ Assert.assertTrue(ti.isInstanceOf[PojoTypeInfo[_]])
+ }
+
+ @Test
+ def testTupleWithPrimitiveArray(): Unit = {
+ val ti = createTypeInformation[(Array[Int], Array[Double], Array[Long],
+ Array[Byte], Array[Char], Array[Float], Array[Short], Array[Boolean],
+ Array[String])]
+
+ val tti = ti.asInstanceOf[TupleTypeInfoBase[_]]
+ Assert.assertEquals(PrimitiveArrayTypeInfo.INT_PRIMITIVE_ARRAY_TYPE_INFO, tti.getTypeAt(0))
+ Assert.assertEquals(PrimitiveArrayTypeInfo.DOUBLE_PRIMITIVE_ARRAY_TYPE_INFO, tti.getTypeAt(1))
+ Assert.assertEquals(PrimitiveArrayTypeInfo.LONG_PRIMITIVE_ARRAY_TYPE_INFO, tti.getTypeAt(2))
+ Assert.assertEquals(PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO, tti.getTypeAt(3))
+ Assert.assertEquals(PrimitiveArrayTypeInfo.CHAR_PRIMITIVE_ARRAY_TYPE_INFO, tti.getTypeAt(4))
+ Assert.assertEquals(PrimitiveArrayTypeInfo.FLOAT_PRIMITIVE_ARRAY_TYPE_INFO, tti.getTypeAt(5))
+ Assert.assertEquals(PrimitiveArrayTypeInfo.SHORT_PRIMITIVE_ARRAY_TYPE_INFO, tti.getTypeAt(6))
+ Assert.assertEquals(PrimitiveArrayTypeInfo.BOOLEAN_PRIMITIVE_ARRAY_TYPE_INFO, tti.getTypeAt(7))
+ Assert.assertEquals(BasicArrayTypeInfo.STRING_ARRAY_TYPE_INFO, tti.getTypeAt(8))
+ }
+
+ @Test
+ def testTrait(): Unit = {
+ trait TestTrait {
+ def foo() = 1
+ def bar(x: Int): Int
+ }
+
+ val ti = createTypeInformation[TestTrait]
+
+ Assert.assertTrue(ti.isInstanceOf[GenericTypeInfo[TestTrait]])
+ }
+
+ @Test
+ def testGetFlatFields(): Unit = {
+
+ val tupleTypeInfo = createTypeInformation[(Int, Int, Int, Int)].
+ asInstanceOf[CaseClassTypeInfo[(Int, Int, Int, Int)]]
+ Assert.assertEquals(0, tupleTypeInfo.getFlatFields("0").get(0).getPosition)
+ Assert.assertEquals(1, tupleTypeInfo.getFlatFields("1").get(0).getPosition)
+ Assert.assertEquals(2, tupleTypeInfo.getFlatFields("2").get(0).getPosition)
+ Assert.assertEquals(3, tupleTypeInfo.getFlatFields("3").get(0).getPosition)
+ Assert.assertEquals(0, tupleTypeInfo.getFlatFields("_1").get(0).getPosition)
+ Assert.assertEquals(1, tupleTypeInfo.getFlatFields("_2").get(0).getPosition)
+ Assert.assertEquals(2, tupleTypeInfo.getFlatFields("_3").get(0).getPosition)
+ Assert.assertEquals(3, tupleTypeInfo.getFlatFields("_4").get(0).getPosition)
+
+ val nestedTypeInfo = createTypeInformation[(Int, (Int, String, Long), Int, (Double, Double))].
+ asInstanceOf[CaseClassTypeInfo[(Int, (Int, String, Long), Int, (Double, Double))]]
+ Assert.assertEquals(0, nestedTypeInfo.getFlatFields("0").get(0).getPosition)
+ Assert.assertEquals(1, nestedTypeInfo.getFlatFields("1.0").get(0).getPosition)
+ Assert.assertEquals(2, nestedTypeInfo.getFlatFields("1.1").get(0).getPosition)
+ Assert.assertEquals(3, nestedTypeInfo.getFlatFields("1.2").get(0).getPosition)
+ Assert.assertEquals(4, nestedTypeInfo.getFlatFields("2").get(0).getPosition)
+ Assert.assertEquals(5, nestedTypeInfo.getFlatFields("3.0").get(0).getPosition)
+ Assert.assertEquals(6, nestedTypeInfo.getFlatFields("3.1").get(0).getPosition)
+ Assert.assertEquals(4, nestedTypeInfo.getFlatFields("_3").get(0).getPosition)
+ Assert.assertEquals(5, nestedTypeInfo.getFlatFields("_4._1").get(0).getPosition)
+ Assert.assertEquals(3, nestedTypeInfo.getFlatFields("1").size)
+ Assert.assertEquals(1, nestedTypeInfo.getFlatFields("1").get(0).getPosition)
+ Assert.assertEquals(2, nestedTypeInfo.getFlatFields("1").get(1).getPosition)
+ Assert.assertEquals(3, nestedTypeInfo.getFlatFields("1").get(2).getPosition)
+ Assert.assertEquals(3, nestedTypeInfo.getFlatFields("1.*").size)
+ Assert.assertEquals(1, nestedTypeInfo.getFlatFields("1.*").get(0).getPosition)
+ Assert.assertEquals(2, nestedTypeInfo.getFlatFields("1.*").get(1).getPosition)
+ Assert.assertEquals(3, nestedTypeInfo.getFlatFields("1.*").get(2).getPosition)
+ Assert.assertEquals(2, nestedTypeInfo.getFlatFields("3").size)
+ Assert.assertEquals(5, nestedTypeInfo.getFlatFields("3").get(0).getPosition)
+ Assert.assertEquals(6, nestedTypeInfo.getFlatFields("3").get(1).getPosition)
+ Assert.assertEquals(3, nestedTypeInfo.getFlatFields("_2").size)
+ Assert.assertEquals(1, nestedTypeInfo.getFlatFields("_2").get(0).getPosition)
+ Assert.assertEquals(2, nestedTypeInfo.getFlatFields("_2").get(1).getPosition)
+ Assert.assertEquals(3, nestedTypeInfo.getFlatFields("_2").get(2).getPosition)
+ Assert.assertEquals(2, nestedTypeInfo.getFlatFields("_4").size)
+ Assert.assertEquals(5, nestedTypeInfo.getFlatFields("_4").get(0).getPosition)
+ Assert.assertEquals(6, nestedTypeInfo.getFlatFields("_4").get(1).getPosition)
+ Assert.assertEquals(BasicTypeInfo.INT_TYPE_INFO,
+ nestedTypeInfo.getFlatFields("0").get(0).getType)
+ Assert.assertEquals(BasicTypeInfo.STRING_TYPE_INFO,
+ nestedTypeInfo.getFlatFields("1.1").get(0).getType)
+ Assert.assertEquals(BasicTypeInfo.LONG_TYPE_INFO,
+ nestedTypeInfo.getFlatFields("1").get(2).getType)
+ Assert.assertEquals(BasicTypeInfo.DOUBLE_TYPE_INFO,
+ nestedTypeInfo.getFlatFields("3").get(1).getType)
+
+ val deepNestedTupleTypeInfo = createTypeInformation[(Int, (Int, (Int, Int)), Int)].
+ asInstanceOf[CaseClassTypeInfo[(Int, (Int, (Int, Int)), Int)]]
+ Assert.assertEquals(3, deepNestedTupleTypeInfo.getFlatFields("1").size)
+ Assert.assertEquals(1, deepNestedTupleTypeInfo.getFlatFields("1").get(0).getPosition)
+ Assert.assertEquals(2, deepNestedTupleTypeInfo.getFlatFields("1").get(1).getPosition)
+ Assert.assertEquals(3, deepNestedTupleTypeInfo.getFlatFields("1").get(2).getPosition)
+ Assert.assertEquals(5, deepNestedTupleTypeInfo.getFlatFields("*").size)
+ Assert.assertEquals(0, deepNestedTupleTypeInfo.getFlatFields("*").get(0).getPosition)
+ Assert.assertEquals(1, deepNestedTupleTypeInfo.getFlatFields("*").get(1).getPosition)
+ Assert.assertEquals(2, deepNestedTupleTypeInfo.getFlatFields("*").get(2).getPosition)
+ Assert.assertEquals(3, deepNestedTupleTypeInfo.getFlatFields("*").get(3).getPosition)
+ Assert.assertEquals(4, deepNestedTupleTypeInfo.getFlatFields("*").get(4).getPosition)
+
+ val caseClassTypeInfo = createTypeInformation[CustomCaseClass].
+ asInstanceOf[CaseClassTypeInfo[CustomCaseClass]]
+ Assert.assertEquals(0, caseClassTypeInfo.getFlatFields("a").get(0).getPosition)
+ Assert.assertEquals(1, caseClassTypeInfo.getFlatFields("b").get(0).getPosition)
+ Assert.assertEquals(2, caseClassTypeInfo.getFlatFields("*").size)
+ Assert.assertEquals(0, caseClassTypeInfo.getFlatFields("*").get(0).getPosition)
+ Assert.assertEquals(1, caseClassTypeInfo.getFlatFields("*").get(1).getPosition)
+
+ val caseClassInTupleTypeInfo = createTypeInformation[(Int, UmlautCaseClass)].
+ asInstanceOf[CaseClassTypeInfo[(Int, UmlautCaseClass)]]
+ Assert.assertEquals(1, caseClassInTupleTypeInfo.getFlatFields("_2.ä").get(0).getPosition)
+ Assert.assertEquals(2, caseClassInTupleTypeInfo.getFlatFields("1.ß").get(0).getPosition)
+ Assert.assertEquals(2, caseClassInTupleTypeInfo.getFlatFields("1").size)
+ Assert.assertEquals(1, caseClassInTupleTypeInfo.getFlatFields("1.*").get(0).getPosition)
+ Assert.assertEquals(2, caseClassInTupleTypeInfo.getFlatFields("1").get(1).getPosition)
+ Assert.assertEquals(2, caseClassInTupleTypeInfo.getFlatFields("_2.*").size)
+ Assert.assertEquals(1, caseClassInTupleTypeInfo.getFlatFields("_2.*").get(0).getPosition)
+ Assert.assertEquals(2, caseClassInTupleTypeInfo.getFlatFields("_2").get(1).getPosition)
+ Assert.assertEquals(3, caseClassInTupleTypeInfo.getFlatFields("*").size)
+ Assert.assertEquals(0, caseClassInTupleTypeInfo.getFlatFields("*").get(0).getPosition)
+ Assert.assertEquals(1, caseClassInTupleTypeInfo.getFlatFields("*").get(1).getPosition)
+ Assert.assertEquals(2, caseClassInTupleTypeInfo.getFlatFields("*").get(2).getPosition)
+
+ }
+
+ @Test
+ def testFieldAtStringRef(): Unit = {
+
+ val tupleTypeInfo = createTypeInformation[(Int, Int, Int, Int)].
+ asInstanceOf[CaseClassTypeInfo[(Int, Int, Int, Int)]]
+ Assert.assertEquals(BasicTypeInfo.INT_TYPE_INFO, tupleTypeInfo.getTypeAt("0"))
+ Assert.assertEquals(BasicTypeInfo.INT_TYPE_INFO, tupleTypeInfo.getTypeAt("2"))
+ Assert.assertEquals(BasicTypeInfo.INT_TYPE_INFO, tupleTypeInfo.getTypeAt("_2"))
+ Assert.assertEquals(BasicTypeInfo.INT_TYPE_INFO, tupleTypeInfo.getTypeAt("_4"))
+
+ val nestedTypeInfo = createTypeInformation[(Int, (Int, String, Long), Int, (Double, Double))].
+ asInstanceOf[CaseClassTypeInfo[(Int, (Int, String, Long), Int, (Double, Double))]]
+ Assert.assertEquals(BasicTypeInfo.INT_TYPE_INFO, nestedTypeInfo.getTypeAt("0"))
+ Assert.assertEquals(BasicTypeInfo.INT_TYPE_INFO, nestedTypeInfo.getTypeAt("1.0"))
+ Assert.assertEquals(BasicTypeInfo.STRING_TYPE_INFO, nestedTypeInfo.getTypeAt("1.1"))
+ Assert.assertEquals(BasicTypeInfo.LONG_TYPE_INFO, nestedTypeInfo.getTypeAt("1.2"))
+ Assert.assertEquals(BasicTypeInfo.INT_TYPE_INFO, nestedTypeInfo.getTypeAt("2"))
+ Assert.assertEquals(BasicTypeInfo.DOUBLE_TYPE_INFO, nestedTypeInfo.getTypeAt("3.0"))
+ Assert.assertEquals(BasicTypeInfo.DOUBLE_TYPE_INFO, nestedTypeInfo.getTypeAt("3.1"))
+ Assert.assertEquals(BasicTypeInfo.INT_TYPE_INFO, nestedTypeInfo.getTypeAt("_3"))
+ Assert.assertEquals(BasicTypeInfo.DOUBLE_TYPE_INFO, nestedTypeInfo.getTypeAt("_4._1"))
+ Assert.assertEquals(createTypeInformation[(Int, String, Long)], nestedTypeInfo.getTypeAt("1"))
+ Assert.assertEquals(createTypeInformation[(Double, Double)], nestedTypeInfo.getTypeAt("3"))
+ Assert.assertEquals(createTypeInformation[(Int, String, Long)], nestedTypeInfo.getTypeAt("_2"))
+ Assert.assertEquals(createTypeInformation[(Double, Double)], nestedTypeInfo.getTypeAt("_4"))
+
+ val deepNestedTupleTypeInfo = createTypeInformation[(Int, (Int, (Int, Int)), Int)].
+ asInstanceOf[CaseClassTypeInfo[(Int, (Int, (Int, Int)), Int)]]
+ Assert.assertEquals(createTypeInformation[(Int, (Int, Int))],
+ deepNestedTupleTypeInfo.getTypeAt("1"))
+
+ val umlautCaseClassTypeInfo = createTypeInformation[UmlautCaseClass].
+ asInstanceOf[CaseClassTypeInfo[UmlautCaseClass]]
+ Assert.assertEquals(BasicTypeInfo.STRING_TYPE_INFO, umlautCaseClassTypeInfo.getTypeAt("ä"))
+ Assert.assertEquals(BasicTypeInfo.INT_TYPE_INFO, umlautCaseClassTypeInfo.getTypeAt("ß"))
+
+ val caseClassTypeInfo = createTypeInformation[CustomCaseClass].
+ asInstanceOf[CaseClassTypeInfo[CustomCaseClass]]
+ val caseClassInTupleTypeInfo = createTypeInformation[(Int, CustomCaseClass)].
+ asInstanceOf[CaseClassTypeInfo[(Int, CustomCaseClass)]]
+ Assert.assertEquals(BasicTypeInfo.STRING_TYPE_INFO, caseClassInTupleTypeInfo.getTypeAt("_2.a"))
+ Assert.assertEquals(BasicTypeInfo.INT_TYPE_INFO, caseClassInTupleTypeInfo.getTypeAt("1.b"))
+ Assert.assertEquals(caseClassTypeInfo, caseClassInTupleTypeInfo.getTypeAt("1"))
+ Assert.assertEquals(caseClassTypeInfo, caseClassInTupleTypeInfo.getTypeAt("_2"))
+
+ }
+
+ /**
+ * Tests the "implicit val scalaNothingTypeInfo" in
+ * flink-scala/src/main/scala/org/apache/flink/api/scala/package.scala
+ * This does not compile without that line.
+ */
+ @Test
+ def testNothingTypeInfoIsAvailableImplicitly() : Unit = {
+ def g() = {
+
+ def f[O: TypeInformation](x: O): Unit = {}
+
+ f(???) // O will be Nothing
+ }
+ // (Do not call g, because it throws NotImplementedError. This is a compile time test.)
+ }
+
+ @Test
+ def testUnit(): Unit = {
+ val ti = createTypeInformation[Unit]
+ Assert.assertTrue(ti.isInstanceOf[UnitTypeInfo])
+
+ // This checks the condition in checkCollection. If this fails with IllegalArgumentException,
+ // then things like "env.fromElements((),(),())" won't work.
+ import scala.collection.JavaConversions._
+ CollectionInputFormat.checkCollection(Seq((),(),()), (new UnitTypeInfo).getTypeClass())
+ }
+}
+
http://git-wip-us.apache.org/repos/asf/flink/blob/6a935978/flink-tests/src/test/resources/flink_11-kryo_registrations
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/resources/flink_11-kryo_registrations b/flink-tests/src/test/resources/flink_11-kryo_registrations
deleted file mode 100644
index 7000e62..0000000
--- a/flink-tests/src/test/resources/flink_11-kryo_registrations
+++ /dev/null
@@ -1,86 +0,0 @@
-0,int
-1,java.lang.String
-2,float
-3,boolean
-4,byte
-5,char
-6,short
-7,long
-8,double
-9,void
-10,scala.collection.convert.Wrappers$SeqWrapper
-11,scala.collection.convert.Wrappers$IteratorWrapper
-12,scala.collection.convert.Wrappers$MapWrapper
-13,scala.collection.convert.Wrappers$JListWrapper
-14,scala.collection.convert.Wrappers$JMapWrapper
-15,scala.Some
-16,scala.util.Left
-17,scala.util.Right
-18,scala.collection.immutable.Vector
-19,scala.collection.immutable.Set$Set1
-20,scala.collection.immutable.Set$Set2
-21,scala.collection.immutable.Set$Set3
-22,scala.collection.immutable.Set$Set4
-23,scala.collection.immutable.HashSet$HashTrieSet
-24,scala.collection.immutable.Map$Map1
-25,scala.collection.immutable.Map$Map2
-26,scala.collection.immutable.Map$Map3
-27,scala.collection.immutable.Map$Map4
-28,scala.collection.immutable.HashMap$HashTrieMap
-29,scala.collection.immutable.Range$Inclusive
-30,scala.collection.immutable.NumericRange$Inclusive
-31,scala.collection.immutable.NumericRange$Exclusive
-32,scala.collection.mutable.BitSet
-33,scala.collection.mutable.HashMap
-34,scala.collection.mutable.HashSet
-35,scala.collection.convert.Wrappers$IterableWrapper
-36,scala.Tuple1
-37,scala.Tuple2
-38,scala.Tuple3
-39,scala.Tuple4
-40,scala.Tuple5
-41,scala.Tuple6
-42,scala.Tuple7
-43,scala.Tuple8
-44,scala.Tuple9
-45,scala.Tuple10
-46,scala.Tuple11
-47,scala.Tuple12
-48,scala.Tuple13
-49,scala.Tuple14
-50,scala.Tuple15
-51,scala.Tuple16
-52,scala.Tuple17
-53,scala.Tuple18
-54,scala.Tuple19
-55,scala.Tuple20
-56,scala.Tuple21
-57,scala.Tuple22
-58,scala.Tuple1$mcJ$sp
-59,scala.Tuple1$mcI$sp
-60,scala.Tuple1$mcD$sp
-61,scala.Tuple2$mcJJ$sp
-62,scala.Tuple2$mcJI$sp
-63,scala.Tuple2$mcJD$sp
-64,scala.Tuple2$mcIJ$sp
-65,scala.Tuple2$mcII$sp
-66,scala.Tuple2$mcID$sp
-67,scala.Tuple2$mcDJ$sp
-68,scala.Tuple2$mcDI$sp
-69,scala.Tuple2$mcDD$sp
-70,scala.Symbol
-71,scala.reflect.ClassTag
-72,scala.runtime.BoxedUnit
-73,java.util.Arrays$ArrayList
-74,java.util.BitSet
-75,java.util.PriorityQueue
-76,java.util.regex.Pattern
-77,java.sql.Date
-78,java.sql.Time
-79,java.sql.Timestamp
-80,java.net.URI
-81,java.net.InetSocketAddress
-82,java.util.UUID
-83,java.util.Locale
-84,java.text.SimpleDateFormat
-85,org.apache.avro.generic.GenericData$Array
http://git-wip-us.apache.org/repos/asf/flink/blob/6a935978/flink-tests/src/test/scala/org/apache/flink/api/scala/DeltaIterationSanityCheckTest.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/DeltaIterationSanityCheckTest.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/DeltaIterationSanityCheckTest.scala
deleted file mode 100644
index 2775d09..0000000
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/DeltaIterationSanityCheckTest.scala
+++ /dev/null
@@ -1,169 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.scala
-
-import org.apache.flink.api.java.io.DiscardingOutputFormat
-import org.junit.Test
-import org.apache.flink.api.common.InvalidProgramException
-
-// Verify that the sanity checking in delta iterations works. We just
-// have a dummy job that is not meant to be executed. Only verify that
-// the join/coGroup inside the iteration is checked.
-class DeltaIterationSanityCheckTest extends Serializable {
-
- @Test
- def testCorrectJoinWithSolution1(): Unit = {
- val env = ExecutionEnvironment.getExecutionEnvironment
- val solutionInput = env.fromElements((1, "1"))
- val worksetInput = env.fromElements((2, "2"))
-
- val iteration = solutionInput.iterateDelta(worksetInput, 10, Array("_1")) { (s, ws) =>
- val result = s.join(ws).where("_1").equalTo("_1") { (l, r) => l }
- (result, ws)
- }
-
- iteration.output(new DiscardingOutputFormat[(Int, String)])
- }
-
- @Test
- def testCorrectJoinWithSolution2(): Unit = {
- val env = ExecutionEnvironment.getExecutionEnvironment
- val solutionInput = env.fromElements((1, "1"))
- val worksetInput = env.fromElements((2, "2"))
-
- val iteration = solutionInput.iterateDelta(worksetInput, 10, Array("_1")) { (s, ws) =>
- val result = ws.join(s).where("_1").equalTo("_1") { (l, r) => l }
- (result, ws)
- }
-
- iteration.output(new DiscardingOutputFormat[(Int,String)])
- }
-
- @Test(expected = classOf[InvalidProgramException])
- def testIncorrectJoinWithSolution1(): Unit = {
- val env = ExecutionEnvironment.getExecutionEnvironment
- val solutionInput = env.fromElements((1, "1"))
- val worksetInput = env.fromElements((2, "2"))
-
- val iteration = solutionInput.iterateDelta(worksetInput, 10, Array("_1")) { (s, ws) =>
- val result = s.join(ws).where("_2").equalTo("_2") { (l, r) => l }
- (result, ws)
- }
-
- iteration.output(new DiscardingOutputFormat[(Int,String)])
- }
-
- @Test(expected = classOf[InvalidProgramException])
- def testIncorrectJoinWithSolution2(): Unit = {
- val env = ExecutionEnvironment.getExecutionEnvironment
- val solutionInput = env.fromElements((1, "1"))
- val worksetInput = env.fromElements((2, "2"))
-
- val iteration = solutionInput.iterateDelta(worksetInput, 10, Array("_1")) { (s, ws) =>
- val result = ws.join(s).where("_2").equalTo("_2") { (l, r) => l }
- (result, ws)
- }
-
- iteration.output(new DiscardingOutputFormat[(Int,String)])
- }
-
- @Test(expected = classOf[InvalidProgramException])
- def testIncorrectJoinWithSolution3(): Unit = {
- val env = ExecutionEnvironment.getExecutionEnvironment
- val solutionInput = env.fromElements((1, "1"))
- val worksetInput = env.fromElements((2, "2"))
-
- val iteration = solutionInput.iterateDelta(worksetInput, 10, Array("_2")) { (s, ws) =>
- val result = ws.join(s).where("_1").equalTo("_1") { (l, r) => l }
- (result, ws)
- }
-
- iteration.output(new DiscardingOutputFormat[(Int,String)])
- }
-
- @Test
- def testCorrectCoGroupWithSolution1(): Unit = {
- val env = ExecutionEnvironment.getExecutionEnvironment
- val solutionInput = env.fromElements((1, "1"))
- val worksetInput = env.fromElements((2, "2"))
-
- val iteration = solutionInput.iterateDelta(worksetInput, 10, Array("_1")) { (s, ws) =>
- val result = s.coGroup(ws).where("_1").equalTo("_1") { (l, r) => l.min }
- (result, ws)
- }
-
- iteration.output(new DiscardingOutputFormat[(Int,String)])
- }
-
- @Test
- def testCorrectCoGroupWithSolution2(): Unit = {
- val env = ExecutionEnvironment.getExecutionEnvironment
- val solutionInput = env.fromElements((1, "1"))
- val worksetInput = env.fromElements((2, "2"))
-
- val iteration = solutionInput.iterateDelta(worksetInput, 10, Array("_1")) { (s, ws) =>
- val result = ws.coGroup(s).where("_1").equalTo("_1") { (l, r) => l.min }
- (result, ws)
- }
-
- iteration.output(new DiscardingOutputFormat[(Int,String)])
- }
-
- @Test(expected = classOf[InvalidProgramException])
- def testIncorrectCoGroupWithSolution1(): Unit = {
- val env = ExecutionEnvironment.getExecutionEnvironment
- val solutionInput = env.fromElements((1, "1"))
- val worksetInput = env.fromElements((2, "2"))
-
- val iteration = solutionInput.iterateDelta(worksetInput, 10, Array("_1")) { (s, ws) =>
- val result = s.coGroup(ws).where("_2").equalTo("_2") { (l, r) => l.min }
- (result, ws)
- }
-
- iteration.output(new DiscardingOutputFormat[(Int,String)])
- }
-
- @Test(expected = classOf[InvalidProgramException])
- def testIncorrectCoGroupWithSolution2(): Unit = {
- val env = ExecutionEnvironment.getExecutionEnvironment
- val solutionInput = env.fromElements((1, "1"))
- val worksetInput = env.fromElements((2, "2"))
-
- val iteration = solutionInput.iterateDelta(worksetInput, 10, Array("_1")) { (s, ws) =>
- val result = ws.coGroup(s).where("_2").equalTo("_2") { (l, r) => l.min }
- (result, ws)
- }
-
- iteration.output(new DiscardingOutputFormat[(Int,String)])
- }
-
- @Test(expected = classOf[InvalidProgramException])
- def testIncorrectCoGroupWithSolution3(): Unit = {
- val env = ExecutionEnvironment.getExecutionEnvironment
- val solutionInput = env.fromElements((1, "1"))
- val worksetInput = env.fromElements((2, "2"))
-
- val iteration = solutionInput.iterateDelta(worksetInput, 10, Array("_2")) { (s, ws) =>
- val result = ws.coGroup(s).where("_1").equalTo("_1") { (l, r) => l.min }
- (result, ws)
- }
-
- iteration.output(new DiscardingOutputFormat[(Int,String)])
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/6a935978/flink-tests/src/test/scala/org/apache/flink/api/scala/functions/SemanticPropertiesTranslationTest.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/functions/SemanticPropertiesTranslationTest.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/functions/SemanticPropertiesTranslationTest.scala
deleted file mode 100644
index 16c826f..0000000
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/functions/SemanticPropertiesTranslationTest.scala
+++ /dev/null
@@ -1,282 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.scala.functions
-
-import org.apache.flink.api.java.io.DiscardingOutputFormat
-import org.junit.Assert._
-import org.apache.flink.api.common.functions.RichJoinFunction
-import org.apache.flink.api.common.functions.RichMapFunction
-import org.apache.flink.api.common.operators.{GenericDataSinkBase, SingleInputSemanticProperties}
-import org.apache.flink.api.common.operators.base.{InnerJoinOperatorBase, MapOperatorBase}
-import org.apache.flink.api.common.operators.util.FieldSet
-import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields
-import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFieldsFirst
-import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFieldsSecond
-import org.junit.Test
-
-import org.apache.flink.api.scala._
-
-/**
- * This is a minimal test to verify that semantic annotations are evaluated against
- * the type information properly translated correctly to the common data flow API.
- *
- * This covers only the constant fields annotations currently !!!
- */
-class SemanticPropertiesTranslationTest {
- /**
- * A mapper that preserves all fields over a tuple data set.
- */
- @Test
- def translateUnaryFunctionAnnotationTuplesWildCard(): Unit = {
- try {
- val env = ExecutionEnvironment.getExecutionEnvironment
-
- val input = env.fromElements((3L, "test", 42))
- input.map(new WildcardForwardMapper[(Long, String, Int)])
- .output(new DiscardingOutputFormat[(Long, String, Int)])
-
- val plan = env.createProgramPlan()
-
- val sink: GenericDataSinkBase[_] = plan.getDataSinks.iterator.next
-
- val mapper: MapOperatorBase[_, _, _] = sink.getInput.asInstanceOf[MapOperatorBase[_, _, _]]
-
- val semantics: SingleInputSemanticProperties = mapper.getSemanticProperties
- val fw1: FieldSet = semantics.getForwardingTargetFields(0, 0)
- val fw2: FieldSet = semantics.getForwardingTargetFields(0, 1)
- val fw3: FieldSet = semantics.getForwardingTargetFields(0, 2)
-
- assertNotNull(fw1)
- assertNotNull(fw2)
- assertNotNull(fw3)
- assertTrue(fw1.contains(0))
- assertTrue(fw2.contains(1))
- assertTrue(fw3.contains(2))
- } catch {
- case e: Exception => {
- System.err.println(e.getMessage)
- e.printStackTrace()
- fail("Exception in test: " + e.getMessage)
- }
- }
- }
-
- /**
- * A mapper that preserves fields 0, 1, 2 of a tuple data set.
- */
- @Test
- def translateUnaryFunctionAnnotationTuples1(): Unit = {
- try {
- val env = ExecutionEnvironment.getExecutionEnvironment
-
- val input = env.fromElements((3L, "test", 42))
- input.map(new IndividualForwardMapper[Long, String, Int])
- .output(new DiscardingOutputFormat[(Long, String, Int)])
-
- val plan = env.createProgramPlan()
-
- val sink: GenericDataSinkBase[_] = plan.getDataSinks.iterator.next
-
- val mapper: MapOperatorBase[_, _, _] = sink.getInput.asInstanceOf[MapOperatorBase[_, _, _]]
-
- val semantics: SingleInputSemanticProperties = mapper.getSemanticProperties
- val fw1: FieldSet = semantics.getForwardingTargetFields(0, 0)
- val fw2: FieldSet = semantics.getForwardingTargetFields(0, 1)
- val fw3: FieldSet = semantics.getForwardingTargetFields(0, 2)
-
- assertNotNull(fw1)
- assertNotNull(fw2)
- assertNotNull(fw3)
- assertTrue(fw1.contains(0))
- assertTrue(fw2.contains(1))
- assertTrue(fw3.contains(2))
- } catch {
- case e: Exception => {
- System.err.println(e.getMessage)
- e.printStackTrace()
- fail("Exception in test: " + e.getMessage)
- }
- }
- }
-
- /**
- * A mapper that preserves field 1 of a tuple data set.
- */
- @Test
- def translateUnaryFunctionAnnotationTuples2(): Unit = {
- try {
- val env = ExecutionEnvironment.getExecutionEnvironment
-
- val input = env.fromElements((3L, "test", 42))
- input.map(new FieldTwoForwardMapper[Long, String, Int])
- .output(new DiscardingOutputFormat[(Long, String, Int)])
-
- val plan = env.createProgramPlan()
-
- val sink: GenericDataSinkBase[_] = plan.getDataSinks.iterator.next
-
- val mapper: MapOperatorBase[_, _, _] = sink.getInput.asInstanceOf[MapOperatorBase[_, _, _]]
-
- val semantics: SingleInputSemanticProperties = mapper.getSemanticProperties
- val fw1: FieldSet = semantics.getForwardingTargetFields(0, 0)
- val fw2: FieldSet = semantics.getForwardingTargetFields(0, 1)
- val fw3: FieldSet = semantics.getForwardingTargetFields(0, 2)
-
- assertNotNull(fw1)
- assertNotNull(fw2)
- assertNotNull(fw3)
- assertTrue(fw1.size == 0)
- assertTrue(fw3.size == 0)
- assertTrue(fw2.contains(1))
- } catch {
- case e: Exception => {
- System.err.println(e.getMessage)
- e.printStackTrace()
- fail("Exception in test: " + e.getMessage)
- }
- }
- }
-
- /**
- * A join that preserves tuple fields from both sides.
- */
- @Test
- def translateBinaryFunctionAnnotationTuples1(): Unit = {
- try {
- val env = ExecutionEnvironment.getExecutionEnvironment
-
- val input1 = env.fromElements((3L, "test"))
- val input2 = env.fromElements((3L, 3.1415))
-
- input1.join(input2).where(0).equalTo(0)(
- new ForwardingTupleJoin[Long, String, Long, Double])
- .output(new DiscardingOutputFormat[(String, Long)])
-
- val plan = env.createProgramPlan()
- val sink: GenericDataSinkBase[_] = plan.getDataSinks.iterator.next
-
- val join: InnerJoinOperatorBase[_, _, _, _] =
- sink.getInput.asInstanceOf[InnerJoinOperatorBase[_, _, _, _]]
-
- val semantics = join.getSemanticProperties
- val fw11: FieldSet = semantics.getForwardingTargetFields(0, 0)
- val fw12: FieldSet = semantics.getForwardingTargetFields(0, 1)
- val fw21: FieldSet = semantics.getForwardingTargetFields(1, 0)
- val fw22: FieldSet = semantics.getForwardingTargetFields(1, 1)
-
- assertNotNull(fw11)
- assertNotNull(fw21)
- assertNotNull(fw12)
- assertNotNull(fw22)
- assertEquals(0, fw11.size)
- assertEquals(0, fw22.size)
- assertTrue(fw12.contains(0))
- assertTrue(fw21.contains(1))
- }
- catch {
- case e: Exception => {
- System.err.println(e.getMessage)
- e.printStackTrace()
- fail("Exception in test: " + e.getMessage)
- }
- }
- }
-
- /**
- * A join that preserves tuple fields from both sides.
- */
- @Test
- def translateBinaryFunctionAnnotationTuples2(): Unit = {
- try {
- val env = ExecutionEnvironment.getExecutionEnvironment
-
- val input1 = env.fromElements((3L, "test"))
- val input2 = env.fromElements((3L, 42))
-
- input1.join(input2).where(0).equalTo(0)(
- new ForwardingBasicJoin[(Long, String), (Long, Int)])
- .output(new DiscardingOutputFormat[((Long, String), (Long, Int))])
-
- val plan = env.createProgramPlan()
- val sink: GenericDataSinkBase[_] = plan.getDataSinks.iterator.next
-
- val join: InnerJoinOperatorBase[_, _, _, _] =
- sink.getInput.asInstanceOf[InnerJoinOperatorBase[_, _, _, _]]
-
- val semantics = join.getSemanticProperties
- val fw11: FieldSet = semantics.getForwardingTargetFields(0, 0)
- val fw12: FieldSet = semantics.getForwardingTargetFields(0, 1)
- val fw21: FieldSet = semantics.getForwardingTargetFields(1, 0)
- val fw22: FieldSet = semantics.getForwardingTargetFields(1, 1)
-
- assertNotNull(fw11)
- assertNotNull(fw12)
- assertNotNull(fw21)
- assertNotNull(fw22)
- assertTrue(fw11.contains(0))
- assertTrue(fw12.contains(1))
- assertTrue(fw21.contains(2))
- assertTrue(fw22.contains(3))
- }
- catch {
- case e: Exception => {
- System.err.println(e.getMessage)
- e.printStackTrace()
- fail("Exception in test: " + e.getMessage)
- }
- }
- }
-}
-
-
-@ForwardedFields(Array("*"))
-class WildcardForwardMapper[T] extends RichMapFunction[T, T] {
- def map(value: T): T = {
- value
- }
-}
-
-@ForwardedFields(Array("0;1;2"))
-class IndividualForwardMapper[X, Y, Z] extends RichMapFunction[(X, Y, Z), (X, Y, Z)] {
- def map(value: (X, Y, Z)): (X, Y, Z) = {
- value
- }
-}
-
-@ForwardedFields(Array("_2"))
-class FieldTwoForwardMapper[X, Y, Z] extends RichMapFunction[(X, Y, Z), (X, Y, Z)] {
- def map(value: (X, Y ,Z)): (X, Y, Z) = {
- value
- }
-}
-
-@ForwardedFieldsFirst(Array("_2 -> _1"))
-@ForwardedFieldsSecond(Array("_1 -> _2"))
-class ForwardingTupleJoin[A, B, C, D] extends RichJoinFunction[(A, B), (C, D), (B, C)] {
- def join(first: (A, B), second: (C, D)): (B, C) = {
- (first._2, second._1)
- }
-}
-
-@ForwardedFieldsFirst(Array("* -> 0.*"))
-@ForwardedFieldsSecond(Array("* -> 1.*"))
-class ForwardingBasicJoin[A, B] extends RichJoinFunction[A, B, (A, B)] {
- def join(first: A, second: B): (A, B) = {
- (first, second)
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/6a935978/flink-tests/src/test/scala/org/apache/flink/api/scala/io/CollectionInputFormatTest.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/io/CollectionInputFormatTest.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/io/CollectionInputFormatTest.scala
deleted file mode 100644
index 92575f5..0000000
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/io/CollectionInputFormatTest.scala
+++ /dev/null
@@ -1,137 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.scala.io
-
-import org.apache.flink.api.common.ExecutionConfig
-import org.apache.flink.api.java.io.CollectionInputFormat
-import org.junit.Assert.assertEquals
-import org.junit.Assert.assertNotNull
-import org.junit.Assert.assertTrue
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo
-import org.apache.flink.core.io.GenericInputSplit
-import org.junit.Test
-import java.io.ByteArrayInputStream
-import java.io.ByteArrayOutputStream
-import java.io.ObjectInputStream
-import java.io.ObjectOutputStream
-import org.apache.flink.api.scala._
-import scala.collection.JavaConverters._
-
-class ElementType(val id: Int) {
- def this() {
- this(-1)
- }
-
- override def equals(obj: Any): Boolean = {
- if (obj != null && obj.isInstanceOf[ElementType]) {
- val et = obj.asInstanceOf[ElementType]
- et.id == this.id
- }
- else {
- false
- }
- }
-}
-
-class CollectionInputFormatTest {
-
- @Test
- def testSerializability(): Unit = {
-
- val inputCollection = Seq(new ElementType(1), new ElementType(2), new ElementType(3))
- val info = createTypeInformation[ElementType]
-
- val inputFormat: CollectionInputFormat[ElementType] = {
- new CollectionInputFormat[ElementType](
- inputCollection.asJava,
- info.createSerializer(new ExecutionConfig))
- }
-
- val buffer = new ByteArrayOutputStream
- val out = new ObjectOutputStream(buffer)
-
- out.writeObject(inputFormat)
-
- val in = new ObjectInputStream(new ByteArrayInputStream(buffer.toByteArray))
- val serializationResult: AnyRef = in.readObject
-
- assertNotNull(serializationResult)
- assertTrue(serializationResult.isInstanceOf[CollectionInputFormat[_]])
-
- val result = serializationResult.asInstanceOf[CollectionInputFormat[ElementType]]
- val inputSplit = new GenericInputSplit(0, 1)
- inputFormat.open(inputSplit)
- result.open(inputSplit)
-
- while (!inputFormat.reachedEnd && !result.reachedEnd) {
- val expectedElement = inputFormat.nextRecord(null)
- val actualElement = result.nextRecord(null)
- assertEquals(expectedElement, actualElement)
- }
- }
-
- @Test
- def testSerializabilityStrings(): Unit = {
- val data = Seq("To bey or not to be,--that is the question:--",
- "Whether 'tis nobler in the mind to suffer", "The slings and arrows of outrageous fortune",
- "Or to take arms against a sea of troubles,", "And by opposing end them?--To die," +
- "--to sleep,--", "No more; and by a sleep to say we end", "The heartache, " +
- "and the thousand natural shocks", "That flesh is heir to,--'tis a consummation",
- "Devoutly to be wish'd. To die,--to sleep;--", "To sleep! perchance to dream:--ay, " +
- "there's the rub;", "For in that sleep of death what dreams may come,",
- "When we have shuffled off this mortal coil,", "Must give us pause: there's the respect",
- "That makes calamity of so long life;", "For who would bear the whips and scorns of time,",
- "The oppressor's wrong, the proud man's contumely,", "The pangs of despis'd love, " +
- "the law's delay,", "The insolence of office, and the spurns",
- "That patient merit of the unworthy takes,", "When he himself might his quietus make",
- "With a bare bodkin? who would these fardels bear,", "To grunt and sweat under a weary " +
- "life,", "But that the dread of something after death,--", "The undiscover'd country, " +
- "from whose bourn", "No traveller returns,--puzzles the will,",
- "And makes us rather bear those ills we have", "Than fly to others that we know not of?",
- "Thus conscience does make cowards of us all;", "And thus the native hue of resolution",
- "Is sicklied o'er with the pale cast of thought;", "And enterprises of great pith and " +
- "moment,", "With this regard, their currents turn awry,", "And lose the name of action" +
- ".--Soft you now!", "The fair Ophelia!--Nymph, in thy orisons",
- "Be all my sins remember'd.")
-
- val inputFormat = new CollectionInputFormat[String](
- data.asJava,
- BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig))
- val baos = new ByteArrayOutputStream
- val oos = new ObjectOutputStream(baos)
-
- oos.writeObject(inputFormat)
- oos.close()
-
- val bais = new ByteArrayInputStream(baos.toByteArray)
- val ois = new ObjectInputStream(bais)
- val result: AnyRef = ois.readObject
-
- assertTrue(result.isInstanceOf[CollectionInputFormat[_]])
- var i: Int = 0
- val in = result.asInstanceOf[CollectionInputFormat[String]]
- in.open(new GenericInputSplit(0, 1))
-
- while (!in.reachedEnd) {
- assertEquals(data(i), in.nextRecord(""))
- i += 1
- }
- assertEquals(data.length, i)
- }
-}
-
http://git-wip-us.apache.org/repos/asf/flink/blob/6a935978/flink-tests/src/test/scala/org/apache/flink/api/scala/io/CsvInputFormatTest.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/io/CsvInputFormatTest.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/io/CsvInputFormatTest.scala
deleted file mode 100644
index 925ee78..0000000
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/io/CsvInputFormatTest.scala
+++ /dev/null
@@ -1,537 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.scala.io
-
-import java.io.{File, FileOutputStream, FileWriter, OutputStreamWriter}
-
-import org.apache.flink.api.java.io.PojoCsvInputFormat
-import org.apache.flink.api.java.io.TupleCsvInputFormat
-import org.apache.flink.api.java.io.CsvInputFormatTest.TwitterPOJO
-import org.apache.flink.api.java.typeutils.PojoTypeInfo
-import org.apache.flink.api.scala._
-import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo
-import org.apache.flink.configuration.Configuration
-import org.apache.flink.core.fs.{FileInputSplit, Path}
-import org.junit.Assert.{assertEquals, assertNotNull, assertNull, assertTrue, fail}
-import org.junit.Test
-
-import scala.collection.mutable.ArrayBuffer
-
-class CsvInputFormatTest {
-
- private final val PATH: Path = new Path("an/ignored/file/")
- private final val FIRST_PART: String = "That is the first part"
- private final val SECOND_PART: String = "That is the second part"
-
-
-
- @Test
- def ignoreSingleCharPrefixComments():Unit = {
- try {
- val fileContent = "#description of the data\n" +
- "#successive commented line\n" +
- "this is|1|2.0|\n" +
- "a test|3|4.0|\n" +
- "#next|5|6.0|\n"
- val split = createTempFile(fileContent)
- val format = new TupleCsvInputFormat[(String, Integer, Double)](
- PATH,
- createTypeInformation[(String, Integer, Double)]
- .asInstanceOf[CaseClassTypeInfo[(String, Integer, Double)]])
- format.setDelimiter("\n")
- format.setFieldDelimiter("|")
- format.setCommentPrefix("#")
- val parameters = new Configuration
- format.configure(parameters)
- format.open(split)
- var result: (String, Integer, Double) = null
- result = format.nextRecord(result)
- assertNotNull(result)
- assertEquals("this is", result._1)
- assertEquals(new Integer(1), result._2)
- assertEquals(2.0, result._3, 0.0001)
- result = format.nextRecord(result)
- assertNotNull(result)
- assertEquals("a test", result._1)
- assertEquals(new Integer(3), result._2)
- assertEquals(4.0, result._3, 0.0001)
- result = format.nextRecord(result)
- assertNull(result)
- assertTrue(format.reachedEnd)
- }
- catch {
- case ex: Exception => {
- ex.printStackTrace
- fail("Test failed due to a " + ex.getClass.getName + ": " + ex.getMessage)
- }
- }
- }
-
- @Test
- def ignoreMultiCharPrefixComments():Unit = {
- try {
- val fileContent = "//description of the data\n" +
- "//successive commented line\n" +
- "this is|1|2.0|\n" +
- "a test|3|4.0|\n" +
- "//next|5|6.0|\n"
- val split = createTempFile(fileContent)
- val format = new TupleCsvInputFormat[(String, Integer, Double)](
- PATH,
- createTypeInformation[(String, Integer, Double)]
- .asInstanceOf[CaseClassTypeInfo[(String, Integer, Double)]])
- format.setDelimiter("\n")
- format.setFieldDelimiter("|")
- format.setCommentPrefix("//")
- val parameters = new Configuration
- format.configure(parameters)
- format.open(split)
- var result: (String, Integer, Double) = null
- result = format.nextRecord(result)
- assertNotNull(result)
- assertEquals("this is", result._1)
- assertEquals(new Integer(1), result._2)
- assertEquals(2.0, result._3, 0.0001)
- result = format.nextRecord(result)
- assertNotNull(result)
- assertEquals("a test", result._1)
- assertEquals(new Integer(3), result._2)
- assertEquals(4.0, result._3, 0.0001)
- result = format.nextRecord(result)
- assertNull(result)
- assertTrue(format.reachedEnd)
- }
- catch {
- case ex: Exception => {
- ex.printStackTrace
- fail("Test failed due to a " + ex.getClass.getName + ": " + ex.getMessage)
- }
- }
- }
-
- @Test
- def readStringFields():Unit = {
- try {
- val fileContent = "abc|def|ghijk\nabc||hhg\n|||"
- val split = createTempFile(fileContent)
- val format = new TupleCsvInputFormat[(String, String, String)](
- PATH,
- createTypeInformation[(String, String, String)]
- .asInstanceOf[CaseClassTypeInfo[(String, String, String)]])
- format.setDelimiter("\n")
- format.setFieldDelimiter("|")
- val parameters = new Configuration
- format.configure(parameters)
- format.open(split)
- var result: (String, String, String) = null
- result = format.nextRecord(result)
- assertNotNull(result)
- assertEquals("abc", result._1)
- assertEquals("def", result._2)
- assertEquals("ghijk", result._3)
- result = format.nextRecord(result)
- assertNotNull(result)
- assertEquals("abc", result._1)
- assertEquals("", result._2)
- assertEquals("hhg", result._3)
- result = format.nextRecord(result)
- assertNotNull(result)
- assertEquals("", result._1)
- assertEquals("", result._2)
- assertEquals("", result._3)
- result = format.nextRecord(result)
- assertNull(result)
- assertTrue(format.reachedEnd)
- }
- catch {
- case ex: Exception => {
- ex.printStackTrace()
- fail("Test failed due to a " + ex.getClass.getName + ": " + ex.getMessage)
- }
- }
- }
-
- @Test
- def readMixedQuotedStringFields():Unit = {
- try {
- val fileContent = "abc|\"de|f\"|ghijk\n\"a|bc\"||hhg\n|||"
- val split = createTempFile(fileContent)
- val format = new TupleCsvInputFormat[(String, String, String)](
- PATH,
- createTypeInformation[(String, String, String)]
- .asInstanceOf[CaseClassTypeInfo[(String, String, String)]])
- format.setDelimiter("\n")
- format.enableQuotedStringParsing('"')
- format.setFieldDelimiter("|")
- val parameters = new Configuration
- format.configure(parameters)
- format.open(split)
- var result: (String, String, String) = null
- result = format.nextRecord(result)
- assertNotNull(result)
- assertEquals("abc", result._1)
- assertEquals("de|f", result._2)
- assertEquals("ghijk", result._3)
- result = format.nextRecord(result)
- assertNotNull(result)
- assertEquals("a|bc", result._1)
- assertEquals("", result._2)
- assertEquals("hhg", result._3)
- result = format.nextRecord(result)
- assertNotNull(result)
- assertEquals("", result._1)
- assertEquals("", result._2)
- assertEquals("", result._3)
- result = format.nextRecord(result)
- assertNull(result)
- assertTrue(format.reachedEnd)
- }
- catch {
- case ex: Exception => {
- ex.printStackTrace()
- fail("Test failed due to a " + ex.getClass.getName + ": " + ex.getMessage)
- }
- }
- }
-
- @Test
- def readStringFieldsWithTrailingDelimiters(): Unit = {
- try {
- val fileContent = "abc|-def|-ghijk\nabc|-|-hhg\n|-|-|-\n"
- val split = createTempFile(fileContent)
- val format = new TupleCsvInputFormat[(String, String, String)](
- PATH,
- createTypeInformation[(String, String, String)]
- .asInstanceOf[CaseClassTypeInfo[(String, String, String)]])
- format.setDelimiter("\n")
- format.setFieldDelimiter("|-")
- val parameters = new Configuration
- format.configure(parameters)
- format.open(split)
- var result: (String, String, String) = null
- result = format.nextRecord(result)
- assertNotNull(result)
- assertEquals("abc", result._1)
- assertEquals("def", result._2)
- assertEquals("ghijk", result._3)
- result = format.nextRecord(result)
- assertNotNull(result)
- assertEquals("abc", result._1)
- assertEquals("", result._2)
- assertEquals("hhg", result._3)
- result = format.nextRecord(result)
- assertNotNull(result)
- assertEquals("", result._1)
- assertEquals("", result._2)
- assertEquals("", result._3)
- result = format.nextRecord(result)
- assertNull(result)
- assertTrue(format.reachedEnd)
- }
- catch {
- case ex: Exception =>
- ex.printStackTrace()
- fail("Test failed due to a " + ex.getClass.getName + ": " + ex.getMessage)
- }
- }
-
- @Test
- def testIntegerFields(): Unit = {
- try {
- val fileContent = "111|222|333|444|555\n666|777|888|999|000|\n"
- val split = createTempFile(fileContent)
- val format = new TupleCsvInputFormat[(Int, Int, Int, Int, Int)](
- PATH, createTypeInformation[(Int, Int, Int, Int, Int)].
- asInstanceOf[CaseClassTypeInfo[(Int, Int, Int, Int, Int)]])
- format.setFieldDelimiter("|")
- format.configure(new Configuration)
- format.open(split)
- var result: (Int, Int, Int, Int, Int) = null
- result = format.nextRecord(result)
- assertNotNull(result)
- assertEquals(Integer.valueOf(111), result._1)
- assertEquals(Integer.valueOf(222), result._2)
- assertEquals(Integer.valueOf(333), result._3)
- assertEquals(Integer.valueOf(444), result._4)
- assertEquals(Integer.valueOf(555), result._5)
- result = format.nextRecord(result)
- assertNotNull(result)
- assertEquals(Integer.valueOf(666), result._1)
- assertEquals(Integer.valueOf(777), result._2)
- assertEquals(Integer.valueOf(888), result._3)
- assertEquals(Integer.valueOf(999), result._4)
- assertEquals(Integer.valueOf(0), result._5)
- result = format.nextRecord(result)
- assertNull(result)
- assertTrue(format.reachedEnd)
- }
- catch {
- case ex: Exception =>
- fail("Test failed due to a " + ex.getClass.getName + ": " + ex.getMessage)
- }
- }
-
- @Test
- def testReadFirstN(): Unit = {
- try {
- val fileContent = "111|x|222|x|333|x|444|x|555|x|\n" +
- "666|x|777|x|888|x|999|x|000|x|\n"
- val split = createTempFile(fileContent)
- val format = new TupleCsvInputFormat[(Int, Int)](
- PATH,
- createTypeInformation[(Int, Int)].asInstanceOf[CaseClassTypeInfo[(Int, Int)]])
- format.setFieldDelimiter("|x|")
- format.configure(new Configuration)
- format.open(split)
- var result: (Int, Int) = null
- result = format.nextRecord(result)
- assertNotNull(result)
- assertEquals(Integer.valueOf(111), result._1)
- assertEquals(Integer.valueOf(222), result._2)
- result = format.nextRecord(result)
- assertNotNull(result)
- assertEquals(Integer.valueOf(666), result._1)
- assertEquals(Integer.valueOf(777), result._2)
- result = format.nextRecord(result)
- assertNull(result)
- assertTrue(format.reachedEnd)
- }
- catch {
- case ex: Exception =>
- fail("Test failed due to a " + ex.getClass.getName + ": " + ex.getMessage)
- }
- }
- @Test
- def testReadSparseWithPositionSetter(): Unit = {
- try {
- val fileContent: String = "111|222|333|444|555|666|777|888|999|000|\n000|999|888|777|666" +
- "|555|444|333|222|111|"
- val split = createTempFile(fileContent)
- val format = new TupleCsvInputFormat[(Int, Int, Int)](
- PATH,
- createTypeInformation[(Int, Int, Int)].asInstanceOf[CaseClassTypeInfo[(Int, Int, Int)]],
- Array(0, 3, 7))
- format.setFieldDelimiter("|")
- format.configure(new Configuration)
- format.open(split)
- var result: (Int, Int, Int) = null
- result = format.nextRecord(result)
- assertNotNull(result)
- assertEquals(Integer.valueOf(111), result._1)
- assertEquals(Integer.valueOf(444), result._2)
- assertEquals(Integer.valueOf(888), result._3)
- result = format.nextRecord(result)
- assertNotNull(result)
- assertEquals(Integer.valueOf(0), result._1)
- assertEquals(Integer.valueOf(777), result._2)
- assertEquals(Integer.valueOf(333), result._3)
- result = format.nextRecord(result)
- assertNull(result)
- assertTrue(format.reachedEnd)
- }
- catch {
- case ex: Exception =>
- fail("Test failed due to a " + ex.getClass.getName + ": " + ex.getMessage)
- }
- }
-
- private def createTempFile(content: String): FileInputSplit = {
- val tempFile = File.createTempFile("test_contents", "tmp")
- tempFile.deleteOnExit()
- val wrt = new FileWriter(tempFile)
- wrt.write(content)
- wrt.close()
- new FileInputSplit(0, new Path(tempFile.toURI.toString), 0,
- tempFile.length,Array[String]("localhost"))
- }
-
- @Test
- def testWindowsLineEndRemoval(): Unit = {
- this.testRemovingTrailingCR("\n", "\n")
- this.testRemovingTrailingCR("\r\n", "\r\n")
- this.testRemovingTrailingCR("\r\n", "\n")
- }
-
- private def testRemovingTrailingCR(lineBreakerInFile: String, lineBreakerSetup: String) {
- var tempFile: File = null
- val fileContent = FIRST_PART + lineBreakerInFile + SECOND_PART + lineBreakerInFile
- try {
- tempFile = File.createTempFile("CsvInputFormatTest", "tmp")
- tempFile.deleteOnExit()
- tempFile.setWritable(true)
- val wrt = new OutputStreamWriter(new FileOutputStream(tempFile))
- wrt.write(fileContent)
- wrt.close()
- val inputFormat = new TupleCsvInputFormat[Tuple1[String]](new Path(tempFile.toURI.toString),
- createTypeInformation[Tuple1[String]].asInstanceOf[CaseClassTypeInfo[Tuple1[String]]])
- val parameters = new Configuration
- inputFormat.configure(parameters)
- inputFormat.setDelimiter(lineBreakerSetup)
- val splits = inputFormat.createInputSplits(1)
- inputFormat.open(splits(0))
- var result = inputFormat.nextRecord(null)
- assertNotNull("Expecting to not return null", result)
- assertEquals(FIRST_PART, result._1)
- result = inputFormat.nextRecord(result)
- assertNotNull("Expecting to not return null", result)
- assertEquals(SECOND_PART, result._1)
- }
- catch {
- case t: Throwable =>
- System.err.println("test failed with exception: " + t.getMessage)
- t.printStackTrace(System.err)
- fail("Test erroneous")
- }
- }
-
- class POJOItem(var field1: Int, var field2: String, var field3: Double) {
- def this() {
- this(-1, "", -1)
- }
- }
-
- case class CaseClassItem(field1: Int, field2: String, field3: Double)
-
- private def validatePOJOItem(format: PojoCsvInputFormat[POJOItem]): Unit = {
- var result = new POJOItem()
- result = format.nextRecord(result)
- assertEquals(123, result.field1)
- assertEquals("HELLO", result.field2)
- assertEquals(3.123, result.field3, 0.001)
-
- result = format.nextRecord(result)
- assertEquals(456, result.field1)
- assertEquals("ABC", result.field2)
- assertEquals(1.234, result.field3, 0.001)
- }
-
- private def validateCaseClassItem(format: TupleCsvInputFormat[CaseClassItem]): Unit = {
- var result = format.nextRecord(null)
- assertEquals(123, result.field1)
- assertEquals("HELLO", result.field2)
- assertEquals(3.123, result.field3, 0.001)
-
- result = format.nextRecord(null)
- assertEquals(456, result.field1)
- assertEquals("ABC", result.field2)
- assertEquals(1.234, result.field3, 0.001)
- }
-
- @Test
- def testPOJOType(): Unit = {
- val fileContent = "123,HELLO,3.123\n" + "456,ABC,1.234"
- val tempFile = createTempFile(fileContent)
- val typeInfo: PojoTypeInfo[POJOItem] = createTypeInformation[POJOItem]
- .asInstanceOf[PojoTypeInfo[POJOItem]]
- val format = new PojoCsvInputFormat[POJOItem](PATH, typeInfo)
-
- format.setDelimiter('\n')
- format.setFieldDelimiter(",")
- format.configure(new Configuration)
- format.open(tempFile)
-
- validatePOJOItem(format)
- }
-
- @Test
- def testCaseClass(): Unit = {
- val fileContent = "123,HELLO,3.123\n" + "456,ABC,1.234"
- val tempFile = createTempFile(fileContent)
- val typeInfo: CaseClassTypeInfo[CaseClassItem] =
- createTypeInformation[CaseClassItem]
- .asInstanceOf[CaseClassTypeInfo[CaseClassItem]]
- val format = new TupleCsvInputFormat[CaseClassItem](PATH, typeInfo)
-
- format.setDelimiter('\n')
- format.setFieldDelimiter(",")
- format.configure(new Configuration)
- format.open(tempFile)
-
- validateCaseClassItem(format)
- }
-
- @Test
- def testPOJOTypeWithFieldMapping(): Unit = {
- val fileContent = "HELLO,123,3.123\n" + "ABC,456,1.234"
- val tempFile = createTempFile(fileContent)
- val typeInfo: PojoTypeInfo[POJOItem] = createTypeInformation[POJOItem]
- .asInstanceOf[PojoTypeInfo[POJOItem]]
- val format = new PojoCsvInputFormat[POJOItem](
- PATH, typeInfo, Array("field2", "field1", "field3"))
-
- format.setDelimiter('\n')
- format.setFieldDelimiter(",")
- format.configure(new Configuration)
- format.open(tempFile)
-
- validatePOJOItem(format)
- }
-
- @Test
- def testPOJOTypeWithFieldSubsetAndDataSubset(): Unit = {
- val fileContent = "HELLO,123,NODATA,3.123,NODATA\n" + "ABC,456,NODATA,1.234,NODATA"
- val tempFile = createTempFile(fileContent)
- val typeInfo: PojoTypeInfo[POJOItem] = createTypeInformation[POJOItem]
- .asInstanceOf[PojoTypeInfo[POJOItem]]
- val format = new PojoCsvInputFormat[POJOItem](
- PATH, typeInfo, Array("field2", "field1", "field3"),
- Array(true, true, false, true, false))
-
- format.setDelimiter('\n')
- format.setFieldDelimiter(",")
- format.configure(new Configuration)
- format.open(tempFile)
-
- validatePOJOItem(format)
- }
-
- @Test
- def testPOJOSubclassType(): Unit = {
- val fileContent = "t1,foobar,tweet2\nt2,barfoo,tweet2"
- val tempFile = createTempFile(fileContent)
- val typeInfo: PojoTypeInfo[TwitterPOJO] = createTypeInformation[TwitterPOJO]
- .asInstanceOf[PojoTypeInfo[TwitterPOJO]]
- val format = new PojoCsvInputFormat[TwitterPOJO](PATH, typeInfo)
-
- format.setDelimiter('\n')
- format.setFieldDelimiter(",")
- format.configure(new Configuration)
- format.open(tempFile)
-
- val expected = for (line <- fileContent.split("\n")) yield {
- val elements = line.split(",")
- new TwitterPOJO(elements(0), elements(1), elements(2))
- }
-
- val actual = ArrayBuffer[TwitterPOJO]()
- var readNextElement = true
-
- while (readNextElement) {
- val element = format.nextRecord(new TwitterPOJO())
-
- if (element != null) {
- actual += element
- } else {
- readNextElement = false
- }
- }
-
- assert(expected.sameElements(actual))
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/6a935978/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/CaseClassComparatorTest.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/CaseClassComparatorTest.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/CaseClassComparatorTest.scala
deleted file mode 100644
index 1b48612..0000000
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/CaseClassComparatorTest.scala
+++ /dev/null
@@ -1,157 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.scala.runtime
-
-import java.util
-import java.util.{ArrayList, List, Random}
-
-import org.apache.flink.api.common.ExecutionConfig
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.common.typeutils.{CompositeType, TypeComparator}
-import org.apache.flink.api.scala._
-import org.apache.flink.core.memory._
-import org.apache.flink.runtime.operators.sort.{NormalizedKeySorter, QuickSort}
-
-import org.junit.Assert._
-import org.junit.Test
-import org.mockito.Mockito
-
-class CaseClassComparatorTest {
-
- case class CaseTestClass(a: Int, b: Int, c: Int, d: String)
-
- @Test
- def testNormalizedKeyGeneration(): Unit = {
- try {
-
- val typeInfo = implicitly[TypeInformation[CaseTestClass]]
- .asInstanceOf[CompositeType[CaseTestClass]]
-
- val serializer = typeInfo.createSerializer(new ExecutionConfig)
- val comparator = new FailingCompareDeserializedWrapper(
- typeInfo.createComparator(
- Array[Int](0, 2),
- Array[Boolean](true, true),
- 0,
- new ExecutionConfig))
-
- assertTrue(comparator.supportsNormalizedKey())
- assertEquals(8, comparator.getNormalizeKeyLen())
- assertFalse(comparator.isNormalizedKeyPrefixOnly(8))
-
- // validate the failing mock
- {
- val in1 : DataInputView = Mockito.mock(classOf[DataInputView])
- val in2 : DataInputView = Mockito.mock(classOf[DataInputView])
-
- try {
- comparator.compareSerialized(in1, in2)
- fail("should throw an exception")
- }
- catch {
- case e: UnsupportedOperationException => // fine
- case ee: Exception => fail("unexpected exception")
- }
- }
-
-
- val numMemSegs = 20
- val memory : util.List[MemorySegment] = new util.ArrayList[MemorySegment](numMemSegs)
- for (i <- 1 to numMemSegs) {
- memory.add(MemorySegmentFactory.allocateUnpooledSegment(32*1024))
- }
-
- val sorter : NormalizedKeySorter[CaseTestClass] = new NormalizedKeySorter[CaseTestClass](
- serializer, comparator, memory)
-
- val rnd = new Random()
- var moreToGo = true
- var num = 0
-
- while (moreToGo) {
- val next = CaseTestClass(rnd.nextInt(), rnd.nextInt(), rnd.nextInt(), "")
- moreToGo = sorter.write(next)
- num += 1
- }
-
- print(num)
-
- new QuickSort().sort(sorter)
- }
- catch {
- case e: Exception => {
- e.printStackTrace()
- fail(e.getMessage())
- }
- }
- }
-
- class FailingCompareDeserializedWrapper[T](wrapped: TypeComparator[T]) extends TypeComparator[T] {
-
- def hash(record: T) : Int = wrapped.hash(record)
-
- def setReference(toCompare: T) = wrapped.setReference(toCompare)
-
- def equalToReference(candidate: T): Boolean = wrapped.equalToReference(candidate)
-
- def compareToReference(referencedComparator: TypeComparator[T]): Int
- = wrapped.compareToReference(referencedComparator)
-
- override def supportsCompareAgainstReference(): Boolean
- = wrapped.supportsCompareAgainstReference()
-
- def compare(first: T, second: T): Int = wrapped.compare(first, second)
-
- def compareSerialized(firstSource: DataInputView, secondSource: DataInputView): Int = {
- throw new UnsupportedOperationException("Not Supported")
- }
-
- def supportsNormalizedKey(): Boolean = wrapped.supportsNormalizedKey()
-
- def supportsSerializationWithKeyNormalization(): Boolean
- = wrapped.supportsSerializationWithKeyNormalization()
-
- def getNormalizeKeyLen(): Int = wrapped.getNormalizeKeyLen()
-
- def isNormalizedKeyPrefixOnly(keyBytes: Int): Boolean
- = wrapped.isNormalizedKeyPrefixOnly(keyBytes)
-
- def putNormalizedKey(record: T, target: MemorySegment, offset: Int, numBytes: Int): Unit
- = wrapped.putNormalizedKey(record, target, offset, numBytes)
-
- def writeWithKeyNormalization(record: T, target: DataOutputView): Unit
- = wrapped.writeWithKeyNormalization(record, target)
-
- def readWithKeyDenormalization(reuse: T, source: DataInputView): T
- = wrapped.readWithKeyDenormalization(reuse, source)
-
- def invertNormalizedKey(): Boolean = wrapped.invertNormalizedKey()
-
- def duplicate(): TypeComparator[T] = new FailingCompareDeserializedWrapper(wrapped.duplicate())
-
- def extractKeys(record: Object, target: Array[Object], index: Int): Int
- = wrapped.extractKeys(record, target, index)
-
- def getFlatComparators(): Array[TypeComparator[_]] = wrapped.getFlatComparators()
-
- override def compareAgainstReference(keys: Array[Comparable[_]]): Int = {
- throw new UnsupportedOperationException("Workaround hack.")
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/6a935978/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/CaseClassNormalizedKeySortingTest.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/CaseClassNormalizedKeySortingTest.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/CaseClassNormalizedKeySortingTest.scala
new file mode 100644
index 0000000..ace93a2
--- /dev/null
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/CaseClassNormalizedKeySortingTest.scala
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.scala.runtime
+
+import org.apache.flink.api.common.ExecutionConfig
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.common.typeutils.{CompositeType, TypeComparator}
+import org.apache.flink.api.scala._
+import org.apache.flink.core.memory._
+import org.apache.flink.runtime.operators.sort.{NormalizedKeySorter, QuickSort}
+
+import java.util
+import java.util.Random
+
+import org.junit.Assert._
+import org.junit.Test
+import org.mockito.Mockito
+
+/** Test that verifies that the case class comparators properly
+ * generate normalized keys used during sorting.
+ */
+class CaseClassNormalizedKeySortingTest {
+
+ case class CaseTestClass(a: Int, b: Int, c: Int, d: String)
+
+ @Test
+ def testNormalizedKeyGeneration(): Unit = {
+ val typeInfo = implicitly[TypeInformation[CaseTestClass]]
+ .asInstanceOf[CompositeType[CaseTestClass]]
+
+ val serializer = typeInfo.createSerializer(new ExecutionConfig)
+ val comparator = new FailingCompareDeserializedWrapper(
+ typeInfo.createComparator(
+ Array[Int](0, 2),
+ Array[Boolean](true, true),
+ 0,
+ new ExecutionConfig))
+
+ assertTrue(comparator.supportsNormalizedKey())
+ assertEquals(8, comparator.getNormalizeKeyLen())
+ assertFalse(comparator.isNormalizedKeyPrefixOnly(8))
+
+ // validate the failing mock
+ {
+ val in1 : DataInputView = Mockito.mock(classOf[DataInputView])
+ val in2 : DataInputView = Mockito.mock(classOf[DataInputView])
+
+ try {
+ comparator.compareSerialized(in1, in2)
+ fail("should throw an exception")
+ }
+ catch {
+ case e: UnsupportedOperationException => // fine
+ case ee: Exception => fail("unexpected exception")
+ }
+ }
+
+
+ val numMemSegs = 20
+ val memory : util.List[MemorySegment] = new util.ArrayList[MemorySegment](numMemSegs)
+ for (i <- 1 to numMemSegs) {
+ memory.add(MemorySegmentFactory.allocateUnpooledSegment(32*1024))
+ }
+
+ val sorter : NormalizedKeySorter[CaseTestClass] = new NormalizedKeySorter[CaseTestClass](
+ serializer, comparator, memory)
+
+ val rnd = new Random()
+ var moreToGo = true
+ var num = 0
+
+ while (moreToGo) {
+ val next = CaseTestClass(rnd.nextInt(), rnd.nextInt(), rnd.nextInt(), "")
+ moreToGo = sorter.write(next)
+ num += 1
+ }
+
+ new QuickSort().sort(sorter)
+ }
+
+ class FailingCompareDeserializedWrapper[T](wrapped: TypeComparator[T]) extends TypeComparator[T] {
+
+ def hash(record: T) : Int = wrapped.hash(record)
+
+ def setReference(toCompare: T) = wrapped.setReference(toCompare)
+
+ def equalToReference(candidate: T): Boolean = wrapped.equalToReference(candidate)
+
+ def compareToReference(referencedComparator: TypeComparator[T]): Int
+ = wrapped.compareToReference(referencedComparator)
+
+ override def supportsCompareAgainstReference(): Boolean
+ = wrapped.supportsCompareAgainstReference()
+
+ def compare(first: T, second: T): Int = wrapped.compare(first, second)
+
+ def compareSerialized(firstSource: DataInputView, secondSource: DataInputView): Int = {
+ throw new UnsupportedOperationException("Not Supported")
+ }
+
+ def supportsNormalizedKey(): Boolean = wrapped.supportsNormalizedKey()
+
+ def supportsSerializationWithKeyNormalization(): Boolean
+ = wrapped.supportsSerializationWithKeyNormalization()
+
+ def getNormalizeKeyLen(): Int = wrapped.getNormalizeKeyLen()
+
+ def isNormalizedKeyPrefixOnly(keyBytes: Int): Boolean
+ = wrapped.isNormalizedKeyPrefixOnly(keyBytes)
+
+ def putNormalizedKey(record: T, target: MemorySegment, offset: Int, numBytes: Int): Unit
+ = wrapped.putNormalizedKey(record, target, offset, numBytes)
+
+ def writeWithKeyNormalization(record: T, target: DataOutputView): Unit
+ = wrapped.writeWithKeyNormalization(record, target)
+
+ def readWithKeyDenormalization(reuse: T, source: DataInputView): T
+ = wrapped.readWithKeyDenormalization(reuse, source)
+
+ def invertNormalizedKey(): Boolean = wrapped.invertNormalizedKey()
+
+ def duplicate(): TypeComparator[T] = new FailingCompareDeserializedWrapper(wrapped.duplicate())
+
+ def extractKeys(record: Object, target: Array[Object], index: Int): Int
+ = wrapped.extractKeys(record, target, index)
+
+ def getFlatComparators(): Array[TypeComparator[_]] = wrapped.getFlatComparators()
+
+ override def compareAgainstReference(keys: Array[Comparable[_]]): Int = {
+ throw new UnsupportedOperationException("Workaround hack.")
+ }
+ }
+}