Posted to commits@flink.apache.org by se...@apache.org on 2017/10/14 17:08:12 UTC

[7/8] flink git commit: [hotfix] [scala api] Move tests to correct package

[hotfix] [scala api] Move tests to correct package

We previously had all Scala API unit tests in the flink-tests
project, because Eclipse could not expand macros in 'test' sources
that were declared in 'main'.

Because we no longer support Eclipse for developing Flink itself
(only for using Flink to develop Flink-based applications),
we can now move the tests to their natural location and simplify
some of the dependency structures.
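
For context, the Scala API derives TypeInformation at compile time through
the 'createTypeInformation' macro defined in the 'org.apache.flink.api.scala'
package object. Below is a minimal, illustrative sketch of the pattern the
tests rely on (the object name is made up); Eclipse's incremental compiler
could not expand such a macro from 'main' while compiling 'test' sources of
the same project:

    import org.apache.flink.api.common.typeinfo.TypeInformation
    import org.apache.flink.api.scala._  // brings the createTypeInformation macro into scope

    object MacroUsageSketch {
      def main(args: Array[String]): Unit = {
        // The TypeInformation is materialized by macro expansion at compile
        // time, so the macro implementation must already be compiled here.
        val info: TypeInformation[(Int, String)] = createTypeInformation[(Int, String)]
        println(info)
      }
    }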


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6a935978
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6a935978
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6a935978

Branch: refs/heads/master
Commit: 6a9359784a80b5136e38775815c0362c3294c1dc
Parents: f81af45
Author: Stephan Ewen <se...@apache.org>
Authored: Fri Oct 13 22:21:30 2017 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Sat Oct 14 17:01:56 2017 +0200

----------------------------------------------------------------------
 flink-scala/pom.xml                             |  22 +-
 .../flink/api/operator/MaxByOperatorTest.scala  | 169 ------
 .../flink/api/operator/MinByOperatorTest.scala  | 173 ------
 .../api/operator/SelectByFunctionTest.scala     | 219 -------
 .../scala/DeltaIterationSanityCheckTest.scala   | 169 ++++++
 .../flink/api/scala/MaxByOperatorTest.scala     | 169 ++++++
 .../flink/api/scala/MinByOperatorTest.scala     | 172 ++++++
 .../flink/api/scala/SelectByFunctionTest.scala  | 218 +++++++
 .../SemanticPropertiesTranslationTest.scala     | 282 +++++++++
 .../scala/io/CollectionInputFormatTest.scala    | 137 +++++
 .../flink/api/scala/io/CsvInputFormatTest.scala | 565 +++++++++++++++++
 .../api/scala/metrics/ScalaGaugeTest.scala      |   7 +-
 .../scala/runtime/EnumValueComparatorTest.scala |  53 ++
 .../runtime/GenericPairComparatorTest.scala     |  76 +++
 .../runtime/KryoGenericTypeSerializerTest.scala | 220 +++++++
 .../ScalaSpecialTypesSerializerTest.scala       | 203 +++++++
 .../runtime/TraversableSerializerTest.scala     | 198 ++++++
 .../scala/runtime/TupleComparatorILD2Test.scala |  56 ++
 .../scala/runtime/TupleComparatorILD3Test.scala |  59 ++
 .../runtime/TupleComparatorILDC3Test.scala      |  59 ++
 .../runtime/TupleComparatorILDX1Test.scala      |  55 ++
 .../runtime/TupleComparatorILDXC2Test.scala     |  55 ++
 .../scala/runtime/TupleComparatorISD1Test.scala |  54 ++
 .../scala/runtime/TupleComparatorISD2Test.scala |  54 ++
 .../scala/runtime/TupleComparatorISD3Test.scala |  58 ++
 .../api/scala/runtime/TupleSerializerTest.scala | 236 +++++++
 .../runtime/TupleSerializerTestInstance.scala   |  90 +++
 .../tuple/base/PairComparatorTestBase.scala     | 103 ++++
 .../tuple/base/TupleComparatorTestBase.scala    |  31 +
 .../scala/types/TypeInformationGenTest.scala    | 589 ++++++++++++++++++
 .../test/resources/flink_11-kryo_registrations  |  86 ---
 .../scala/DeltaIterationSanityCheckTest.scala   | 169 ------
 .../SemanticPropertiesTranslationTest.scala     | 282 ---------
 .../scala/io/CollectionInputFormatTest.scala    | 137 -----
 .../flink/api/scala/io/CsvInputFormatTest.scala | 537 ----------------
 .../scala/runtime/CaseClassComparatorTest.scala | 157 -----
 .../CaseClassNormalizedKeySortingTest.scala     | 149 +++++
 .../scala/runtime/EnumValueComparatorTest.scala |  53 --
 .../runtime/GenericPairComparatorTest.scala     |  76 ---
 .../runtime/KryoGenericTypeSerializerTest.scala | 311 ----------
 .../ScalaSpecialTypesSerializerTest.scala       | 203 -------
 .../runtime/TraversableSerializerTest.scala     | 198 ------
 .../scala/runtime/TupleComparatorILD2Test.scala |  56 --
 .../scala/runtime/TupleComparatorILD3Test.scala |  59 --
 .../runtime/TupleComparatorILDC3Test.scala      |  59 --
 .../runtime/TupleComparatorILDX1Test.scala      |  55 --
 .../runtime/TupleComparatorILDXC2Test.scala     |  55 --
 .../scala/runtime/TupleComparatorISD1Test.scala |  54 --
 .../scala/runtime/TupleComparatorISD2Test.scala |  54 --
 .../scala/runtime/TupleComparatorISD3Test.scala |  58 --
 .../api/scala/runtime/TupleSerializerTest.scala | 236 -------
 .../runtime/TupleSerializerTestInstance.scala   |  90 ---
 .../tuple/base/PairComparatorTestBase.scala     | 103 ----
 .../tuple/base/TupleComparatorTestBase.scala    |  31 -
 .../scala/types/TypeInformationGenTest.scala    | 608 -------------------
 55 files changed, 4132 insertions(+), 4295 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/6a935978/flink-scala/pom.xml
----------------------------------------------------------------------
diff --git a/flink-scala/pom.xml b/flink-scala/pom.xml
index c3cc554..93152df 100644
--- a/flink-scala/pom.xml
+++ b/flink-scala/pom.xml
@@ -75,9 +75,15 @@ under the License.
 
 		<dependency>
 			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-test-utils_${scala.binary.version}</artifactId>
+			<artifactId>flink-test-utils-junit</artifactId>
 			<version>${project.version}</version>
-			<type>test-jar</type>
+			<scope>test</scope>
+		</dependency>
+
+		<dependency>
+			<groupId>com.twitter</groupId>
+			<artifactId>chill_${scala.binary.version}</artifactId>
+			<version>${chill.version}</version>
 			<scope>test</scope>
 		</dependency>
 
@@ -89,6 +95,18 @@ under the License.
 			<type>test-jar</type>
 		</dependency>
 
+		<dependency>
+			<groupId>joda-time</groupId>
+			<artifactId>joda-time</artifactId>
+			<scope>test</scope>
+		</dependency>
+
+		<dependency>
+			<groupId>org.joda</groupId>
+			<artifactId>joda-convert</artifactId>
+			<scope>test</scope>
+		</dependency>
+
 	</dependencies>
 
 	<build>
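
The chill test dependency added above is presumably what the relocated Kryo
serializer tests need. A minimal sketch, assuming chill's ScalaKryoInstantiator
and an illustrative object name, of the kind of serialization round trip such
tests exercise:

    import java.io.ByteArrayOutputStream

    import com.esotericsoftware.kryo.io.{Input, Output}
    import com.twitter.chill.ScalaKryoInstantiator

    object ChillRoundTripSketch {
      def main(args: Array[String]): Unit = {
        // chill pre-registers serializers for Scala types that plain Kryo
        // does not handle out of the box
        val kryo = (new ScalaKryoInstantiator).newKryo()

        val original: Either[String, Int] = Left("flink")

        // serialize into a byte array
        val bos = new ByteArrayOutputStream()
        val output = new Output(bos)
        kryo.writeClassAndObject(output, original)
        output.close()

        // deserialize and verify the round trip
        val input = new Input(bos.toByteArray)
        val copy = kryo.readClassAndObject(input)
        input.close()

        assert(copy == original)
      }
    }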

http://git-wip-us.apache.org/repos/asf/flink/blob/6a935978/flink-scala/src/test/scala/org/apache/flink/api/operator/MaxByOperatorTest.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/test/scala/org/apache/flink/api/operator/MaxByOperatorTest.scala b/flink-scala/src/test/scala/org/apache/flink/api/operator/MaxByOperatorTest.scala
deleted file mode 100644
index 523cd5d..0000000
--- a/flink-scala/src/test/scala/org/apache/flink/api/operator/MaxByOperatorTest.scala
+++ /dev/null
@@ -1,169 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.operator
-
-import org.apache.flink.api.common.InvalidProgramException
-import org.apache.flink.api.scala._
-import org.junit.Test
-import org.junit.Assert
-
-class MaxByOperatorTest {
-
-  private val emptyTupleData = List[(Int, Long, String, Long, Int)]()
-  private val customTypeData = List[CustomType]()
-
-  @Test
-  def testMaxByKeyFieldsDataset(): Unit = {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val collection = env.fromCollection(emptyTupleData)
-    try {
-      collection.maxBy(0, 1, 2, 3, 4)
-    } catch {
-      case e : Exception => Assert.fail();
-    }
-  }
-
-  /**
-    * This test validates that an index which is out of bounds throws an
-    * IndexOutOfBOundsExcpetion.
-    */
-  @Test(expected = classOf[IndexOutOfBoundsException])
-  def testOutOfTupleBoundsDataset1() {
-
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val collection = env.fromCollection(emptyTupleData)
-
-    // should not work, key out of tuple bounds
-    collection.maxBy(5)
-  }
-
-  /**
-    * This test validates that an index which is out of bounds throws an
-    * IndexOutOfBOundsExcpetion.
-    */
-  @Test(expected = classOf[IndexOutOfBoundsException])
-  def testOutOfTupleBoundsDataset2() {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val collection = env.fromCollection(emptyTupleData)
-
-    // should not work, key out of tuple bounds
-    collection.maxBy(-1)
-  }
-
-  /**
-    * This test validates that an index which is out of bounds throws an
-    * IndexOutOfBOundsExcpetion.
-    */
-  @Test(expected = classOf[IndexOutOfBoundsException])
-  def testOutOfTupleBoundsDataset3() {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val collection = env.fromCollection(emptyTupleData)
-
-    // should not work, key out of tuple bounds
-    collection.maxBy(1, 2, 3, 4, -1)
-  }
-
-  /**
-    * This test validates that no exceptions is thrown when an empty grouping
-    * calls maxBy().
-    */
-  @Test
-  def testMaxByKeyFieldsGrouping() {
-
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val groupDs = env.fromCollection(emptyTupleData).groupBy(0)
-    // should work
-    try {
-      groupDs.maxBy(4, 0, 1, 2, 3)
-    } catch {
-      case e : Exception => Assert.fail();
-    }
-  }
-
-  /**
-    * This test validates that an InvalidProgrammException is thrown when maxBy
-    * is used on a custom data type.
-    */
-  @Test(expected = classOf[InvalidProgramException])
-  def testCustomKeyFieldsDataset() {
-
-    val env = ExecutionEnvironment.getExecutionEnvironment
-
-    val customDS = env.fromCollection(customTypeData)
-    // should not work: groups on custom type
-    customDS.maxBy(0)
-  }
-
-  /**
-    * This test validates that an InvalidProgrammException is thrown when maxBy
-    * is used on a custom data type.
-    */
-  @Test(expected = classOf[InvalidProgramException])
-  def testCustomKeyFieldsGrouping() {
-
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val groupDs: GroupedDataSet[CustomType] = env.fromCollection(customTypeData).groupBy(0)
-
-    groupDs.maxBy(0)
-  }
-  /**
-    * This test validates that an index which is out of bounds throws an
-    * IndexOutOfBOundsExcpetion.
-    */
-  @Test(expected = classOf[IndexOutOfBoundsException])
-  def testOutOfTupleBoundsGrouping1() {
-
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val groupDs = env.fromCollection(emptyTupleData).groupBy(0)
-    groupDs.maxBy(5)
-  }
-
-  /**
-    * This test validates that an index which is out of bounds throws an
-    * IndexOutOfBOundsExcpetion.
-    */
-  @Test(expected = classOf[IndexOutOfBoundsException])
-  def testOutOfTupleBoundsGrouping2() {
-
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val groupDs = env.fromCollection(emptyTupleData).groupBy(0)
-    groupDs.maxBy(-1)
-  }
-
-  /**
-    * This test validates that an index which is out of bounds throws an
-    * IndexOutOfBOundsExcpetion.
-    */
-  @Test(expected = classOf[IndexOutOfBoundsException])
-  def testOutOfTupleBoundsGrouping3() {
-
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val groupDs = env.fromCollection(emptyTupleData).groupBy(0)
-    groupDs.maxBy(1, 2, 3, 4, -1)
-  }
-
-  class CustomType(var myInt: Int, var myLong: Long, var myString: String) {
-    def this() {
-      this(0, 0, "")
-    }
-
-    override def toString: String = {
-      myInt + "," + myLong + "," + myString
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/6a935978/flink-scala/src/test/scala/org/apache/flink/api/operator/MinByOperatorTest.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/test/scala/org/apache/flink/api/operator/MinByOperatorTest.scala b/flink-scala/src/test/scala/org/apache/flink/api/operator/MinByOperatorTest.scala
deleted file mode 100644
index f9d5249..0000000
--- a/flink-scala/src/test/scala/org/apache/flink/api/operator/MinByOperatorTest.scala
+++ /dev/null
@@ -1,173 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.operator
-
-import org.apache.flink.api.common.InvalidProgramException
-import org.apache.flink.api.scala.ExecutionEnvironment
-import org.apache.flink.api.scala._
-import org.junit.Test
-import org.junit.Assert
-
-class MinByOperatorTest {
-  private val emptyTupleData = List[(Int, Long, String, Long, Int)]()
-  private val customTypeData = List[CustomType]()
-
-  @Test
-  def testMinByKeyFieldsDataset(): Unit = {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val collection = env.fromCollection(emptyTupleData)
-    try {
-      collection.minBy(4, 0, 1, 2, 3)
-    } catch {
-      case e : Exception => Assert.fail();
-    }
-  }
-
-  /**
-    * This test validates that an index which is out of bounds throws an
-    * IndexOutOfBOundsExcpetion.
-    */
-  @Test(expected = classOf[IndexOutOfBoundsException])
-  def testOutOfTupleBoundsDataset1() {
-
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val collection = env.fromCollection(emptyTupleData)
-
-    // should not work, key out of tuple bounds
-    collection.minBy(5)
-  }
-
-  /**
-    * This test validates that an index which is out of bounds throws an
-    * IndexOutOfBOundsExcpetion.
-    */
-  @Test(expected = classOf[IndexOutOfBoundsException])
-  def testOutOfTupleBoundsDataset2() {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val collection = env.fromCollection(emptyTupleData)
-
-    // should not work, key out of tuple bounds
-    collection.minBy(-1)
-  }
-
-  /**
-    * This test validates that an index which is out of bounds throws an
-    * IndexOutOfBOundsExcpetion.
-    */
-  @Test(expected = classOf[IndexOutOfBoundsException])
-  def testOutOfTupleBoundsDataset3() {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val collection = env.fromCollection(emptyTupleData)
-
-    // should not work, key out of tuple bounds
-    collection.minBy(1, 2, 3, 4, -1)
-  }
-
-  /**
-    * This test validates that an InvalidProgrammException is thrown when minBy
-    * is used on a custom data type.
-    */
-  @Test(expected = classOf[InvalidProgramException])
-  def testCustomKeyFieldsDataset() {
-
-    val env = ExecutionEnvironment.getExecutionEnvironment
-
-    val customDS = env.fromCollection(customTypeData)
-    // should not work: groups on custom type
-    customDS.minBy(0)
-  }
-
-  /**
-    * This test validates that no exceptions is thrown when an empty grouping
-    * calls minBy().
-    */
-  @Test
-  def testMinByKeyFieldsGrouping() {
-
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val groupDs = env.fromCollection(emptyTupleData).groupBy(0)
-    // should work
-    try {
-      groupDs.minBy(4, 0, 1, 2, 3)
-    } catch {
-      case e : Exception => Assert.fail()
-    }
-  }
-
-  /**
-    * This test validates that an InvalidProgrammException is thrown when minBy
-    * is used on a custom data type.
-    */
-  @Test(expected = classOf[InvalidProgramException])
-  def testCustomKeyFieldsGrouping() {
-
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val groupDs: GroupedDataSet[CustomType] = env.fromCollection(customTypeData).groupBy(0)
-
-    groupDs.minBy(0)
-  }
-
-  /**
-    * This test validates that an index which is out of bounds throws an
-    * IndexOutOfBOundsExcpetion.
-    */
-  @Test(expected = classOf[IndexOutOfBoundsException])
-  def testOutOfTupleBoundsGrouping1() {
-
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val groupDs = env.fromCollection(emptyTupleData).groupBy(0)
-
-    groupDs.minBy(5)
-  }
-
-  /**
-    * This test validates that an index which is out of bounds throws an
-    * IndexOutOfBOundsExcpetion.
-    */
-  @Test(expected = classOf[IndexOutOfBoundsException])
-  def testOutOfTupleBoundsGrouping2() {
-
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val groupDs = env.fromCollection(emptyTupleData).groupBy(0)
-
-    groupDs.minBy(-1)
-  }
-
-  /**s
-    * This test validates that an index which is out of bounds throws an
-    * IndexOutOfBOundsExcpetion.
-    */
-  @Test(expected = classOf[IndexOutOfBoundsException])
-  def testOutOfTupleBoundsGrouping3() {
-
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val groupDs = env.fromCollection(emptyTupleData).groupBy(0)
-
-    groupDs.minBy(1, 2, 3, 4, -1)
-  }
-
-  class CustomType(var myInt: Int, var myLong: Long, var myString: String) {
-    def this() {
-      this(0, 0, "")
-    }
-
-    override def toString: String = {
-      myInt + "," + myLong + "," + myString
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/6a935978/flink-scala/src/test/scala/org/apache/flink/api/operator/SelectByFunctionTest.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/test/scala/org/apache/flink/api/operator/SelectByFunctionTest.scala b/flink-scala/src/test/scala/org/apache/flink/api/operator/SelectByFunctionTest.scala
deleted file mode 100644
index d6af6bd..0000000
--- a/flink-scala/src/test/scala/org/apache/flink/api/operator/SelectByFunctionTest.scala
+++ /dev/null
@@ -1,219 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.operator
-
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.java.typeutils.TupleTypeInfoBase
-import org.apache.flink.api.scala.{SelectByMaxFunction, SelectByMinFunction}
-import org.apache.flink.api.scala._
-import org.junit.{Assert, Test}
-
-/**
-  *
-  */
-class SelectByFunctionTest {
-
-   val tupleTypeInfo = implicitly[TypeInformation[(Int, Long, String, Long, Int)]]
-    .asInstanceOf[TupleTypeInfoBase[(Int, Long, String, Long, Int)]]
-
-   private val bigger  = (10, 100L, "HelloWorld", 200L, 20)
-   private val smaller = (5, 50L, "Hello", 50L, 15)
-
-   //Special case where only the last value determines if bigger or smaller
-   private val specialCaseBigger = (10, 100L, "HelloWorld", 200L, 17)
-   private val specialCaseSmaller  = (5, 50L, "Hello", 50L, 17)
-
-  /**
-    * This test validates whether the order of tuples has
-    *
-    * any impact on the outcome and if the bigger tuple is returned.
-    */
-  @Test
-  def testMaxByComparison(): Unit = {
-    val a1 = Array(0)
-    val maxByTuple = new SelectByMaxFunction(tupleTypeInfo, a1)
-      try {
-        Assert.assertSame("SelectByMax must return bigger tuple",
-          bigger, maxByTuple.reduce(smaller, bigger))
-        Assert.assertSame("SelectByMax must return bigger tuple",
-          bigger, maxByTuple.reduce(bigger, smaller))
-      } catch {
-        case e : Exception =>
-          Assert.fail("No exception should be thrown while comparing both tuples")
-      }
-  }
-
-  // ----------------------- MAXIMUM FUNCTION TEST BELOW --------------------------
-
-  /**
-    * This test cases checks when two tuples only differ in one value, but this value is not
-    * in the fields list. In that case it should be seen as equal
-    * and then the first given tuple (value1) should be returned by reduce().
-    */
-  @Test
-  def testMaxByComparisonSpecialCase1() : Unit = {
-    val a1 = Array(0, 3)
-    val maxByTuple = new SelectByMaxFunction(tupleTypeInfo, a1)
-
-    try {
-      Assert.assertSame("SelectByMax must return the first given tuple",
-        specialCaseBigger, maxByTuple.reduce(specialCaseBigger, bigger))
-      Assert.assertSame("SelectByMax must return the first given tuple",
-        bigger, maxByTuple.reduce(bigger, specialCaseBigger))
-    } catch {
-      case e : Exception => Assert.fail("No exception should be thrown " +
-        "while comparing both tuples")
-    }
-  }
-
-  /**
-    * This test cases checks when two tuples only differ in one value.
-    */
-  @Test
-  def testMaxByComparisonSpecialCase2() : Unit = {
-    val a1 = Array(0, 2, 1, 4, 3)
-    val maxByTuple = new SelectByMaxFunction(tupleTypeInfo, a1)
-    try {
-      Assert.assertSame("SelectByMax must return bigger tuple",
-        bigger, maxByTuple.reduce(specialCaseBigger, bigger))
-      Assert.assertSame("SelectByMax must return bigger tuple",
-        bigger, maxByTuple.reduce(bigger, specialCaseBigger))
-    } catch {
-      case e : Exception => Assert.fail("No exception should be thrown" +
-        " while comparing both tuples")
-    }
-  }
-
-  /**
-    * This test validates that equality is independent of the amount of used indices.
-    */
-  @Test
-  def testMaxByComparisonMultiple(): Unit = {
-    val a1 = Array(0, 1, 2, 3, 4)
-    val maxByTuple = new SelectByMaxFunction(tupleTypeInfo, a1)
-    try {
-      Assert.assertSame("SelectByMax must return bigger tuple",
-        bigger, maxByTuple.reduce(smaller, bigger))
-      Assert.assertSame("SelectByMax must return bigger tuple",
-        bigger, maxByTuple.reduce(bigger, smaller))
-    } catch {
-      case e : Exception => Assert.fail("No exception should be thrown " +
-        "while comparing both tuples")
-    }
-  }
-
-  /**
-    * Checks whether reduce does behave as expected if both values are the same object.
-    */
-  @Test
-  def testMaxByComparisonMustReturnATuple() : Unit = {
-    val a1 = Array(0)
-    val maxByTuple = new SelectByMaxFunction(tupleTypeInfo, a1)
-
-    try {
-      Assert.assertSame("SelectByMax must return bigger tuple",
-        bigger, maxByTuple.reduce(bigger, bigger))
-      Assert.assertSame("SelectByMax must return smaller tuple",
-        smaller, maxByTuple.reduce(smaller, smaller))
-    } catch {
-      case e : Exception => Assert.fail("No exception should be thrown" +
-        " while comparing both tuples")
-    }
-  }
-
-  // ----------------------- MINIMUM FUNCTION TEST BELOW --------------------------
-
-  /**
-    * This test validates whether the order of tuples has any impact
-    * on the outcome and if the smaller tuple is returned.
-    */
-  @Test
-  def testMinByComparison() : Unit = {
-    val a1 = Array(0)
-    val minByTuple = new SelectByMinFunction(tupleTypeInfo, a1)
-    try {
-      Assert.assertSame("SelectByMin must return smaller tuple",
-        smaller, minByTuple.reduce(smaller, bigger))
-      Assert.assertSame("SelectByMin must return smaller tuple",
-        smaller, minByTuple.reduce(bigger, smaller))
-    } catch {
-      case e : Exception => Assert.fail("No exception should be thrown " +
-        "while comparing both tuples")
-    }
-  }
-
-  /**
-    * This test cases checks when two tuples only differ in one value, but this value is not
-    * in the fields list. In that case it should be seen as equal and
-    * then the first given tuple (value1) should be returned by reduce().
-    */
-  @Test
-  def testMinByComparisonSpecialCase1() : Unit = {
-    val a1 = Array(0, 3)
-    val minByTuple = new SelectByMinFunction(tupleTypeInfo, a1)
-
-    try {
-      Assert.assertSame("SelectByMin must return the first given tuple",
-        specialCaseBigger, minByTuple.reduce(specialCaseBigger, bigger))
-      Assert.assertSame("SelectByMin must return the first given tuple",
-        bigger, minByTuple.reduce(bigger, specialCaseBigger))
-    } catch {
-      case e : Exception => Assert.fail("No exception should be thrown " +
-        "while comparing both tuples")
-    }
-  }
-
-  /**
-    * This test validates that when two tuples only differ in one value
-    * and that value's index is given at construction time. The smaller tuple must be returned
-    * then.
-    */
-  @Test
-  def  testMinByComparisonSpecialCase2() : Unit = {
-    val a1 = Array(0, 2, 1, 4, 3)
-    val minByTuple = new SelectByMinFunction(tupleTypeInfo, a1)
-
-    try {
-      Assert.assertSame("SelectByMin must return smaller tuple",
-        smaller, minByTuple.reduce(specialCaseSmaller, smaller))
-      Assert.assertSame("SelectByMin must return smaller tuple",
-        smaller, minByTuple.reduce(smaller, specialCaseSmaller))
-    } catch {
-      case e : Exception => Assert.fail("No exception should be thrown" +
-        " while comparing both tuples")
-    }
-  }
-
-  /**
-    * Checks whether reduce does behave as expected if both values are the same object.
-    */
-  @Test
-  def testMinByComparisonMultiple() : Unit =  {
-    val a1 = Array(0, 1, 2, 3, 4)
-    val minByTuple  = new SelectByMinFunction(tupleTypeInfo, a1)
-    try {
-      Assert.assertSame("SelectByMin must return smaller tuple",
-        smaller, minByTuple.reduce(smaller, bigger))
-      Assert.assertSame("SelectByMin must return smaller tuple",
-        smaller, minByTuple.reduce(bigger, smaller))
-    } catch {
-      case e : Exception => Assert.fail("No exception should be thrown" +
-        " while comparing both tuples")
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/6a935978/flink-scala/src/test/scala/org/apache/flink/api/scala/DeltaIterationSanityCheckTest.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/test/scala/org/apache/flink/api/scala/DeltaIterationSanityCheckTest.scala b/flink-scala/src/test/scala/org/apache/flink/api/scala/DeltaIterationSanityCheckTest.scala
new file mode 100644
index 0000000..2775d09
--- /dev/null
+++ b/flink-scala/src/test/scala/org/apache/flink/api/scala/DeltaIterationSanityCheckTest.scala
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.scala
+
+import org.apache.flink.api.java.io.DiscardingOutputFormat
+import org.junit.Test
+import org.apache.flink.api.common.InvalidProgramException
+
+// Verify that the sanity checking in delta iterations works. We just
+// have dummy jobs that are not meant to be executed; we only verify
+// that the join/coGroup inside the iteration is checked.
+class DeltaIterationSanityCheckTest extends Serializable {
+
+  @Test
+  def testCorrectJoinWithSolution1(): Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val solutionInput = env.fromElements((1, "1"))
+    val worksetInput = env.fromElements((2, "2"))
+
+    val iteration = solutionInput.iterateDelta(worksetInput, 10, Array("_1")) { (s, ws) =>
+      val result = s.join(ws).where("_1").equalTo("_1") { (l, r) => l }
+      (result, ws)
+    }
+
+    iteration.output(new DiscardingOutputFormat[(Int, String)])
+  }
+
+  @Test
+  def testCorrectJoinWithSolution2(): Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val solutionInput = env.fromElements((1, "1"))
+    val worksetInput = env.fromElements((2, "2"))
+
+    val iteration = solutionInput.iterateDelta(worksetInput, 10, Array("_1")) { (s, ws) =>
+      val result = ws.join(s).where("_1").equalTo("_1") { (l, r) => l }
+      (result, ws)
+    }
+
+    iteration.output(new DiscardingOutputFormat[(Int,String)])
+  }
+
+  @Test(expected = classOf[InvalidProgramException])
+  def testIncorrectJoinWithSolution1(): Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val solutionInput = env.fromElements((1, "1"))
+    val worksetInput = env.fromElements((2, "2"))
+
+    val iteration = solutionInput.iterateDelta(worksetInput, 10, Array("_1")) { (s, ws) =>
+      val result = s.join(ws).where("_2").equalTo("_2") { (l, r) => l }
+      (result, ws)
+    }
+
+    iteration.output(new DiscardingOutputFormat[(Int,String)])
+  }
+
+  @Test(expected = classOf[InvalidProgramException])
+  def testIncorrectJoinWithSolution2(): Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val solutionInput = env.fromElements((1, "1"))
+    val worksetInput = env.fromElements((2, "2"))
+
+    val iteration = solutionInput.iterateDelta(worksetInput, 10, Array("_1")) { (s, ws) =>
+      val result = ws.join(s).where("_2").equalTo("_2") { (l, r) => l }
+      (result, ws)
+    }
+
+    iteration.output(new DiscardingOutputFormat[(Int,String)])  
+  }
+
+  @Test(expected = classOf[InvalidProgramException])
+  def testIncorrectJoinWithSolution3(): Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val solutionInput = env.fromElements((1, "1"))
+    val worksetInput = env.fromElements((2, "2"))
+
+    val iteration = solutionInput.iterateDelta(worksetInput, 10, Array("_2")) { (s, ws) =>
+      val result = ws.join(s).where("_1").equalTo("_1") { (l, r) => l }
+      (result, ws)
+    }
+
+    iteration.output(new DiscardingOutputFormat[(Int,String)])
+   }
+
+  @Test
+  def testCorrectCoGroupWithSolution1(): Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val solutionInput = env.fromElements((1, "1"))
+    val worksetInput = env.fromElements((2, "2"))
+
+    val iteration = solutionInput.iterateDelta(worksetInput, 10, Array("_1")) { (s, ws) =>
+      val result = s.coGroup(ws).where("_1").equalTo("_1") { (l, r) => l.min }
+      (result, ws)
+    }
+
+    iteration.output(new DiscardingOutputFormat[(Int,String)])
+  }
+
+  @Test
+  def testCorrectCoGroupWithSolution2(): Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val solutionInput = env.fromElements((1, "1"))
+    val worksetInput = env.fromElements((2, "2"))
+
+    val iteration = solutionInput.iterateDelta(worksetInput, 10, Array("_1")) { (s, ws) =>
+      val result = ws.coGroup(s).where("_1").equalTo("_1") { (l, r) => l.min }
+      (result, ws)
+    }
+
+    iteration.output(new DiscardingOutputFormat[(Int,String)])
+  }
+
+  @Test(expected = classOf[InvalidProgramException])
+  def testIncorrectCoGroupWithSolution1(): Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val solutionInput = env.fromElements((1, "1"))
+    val worksetInput = env.fromElements((2, "2"))
+
+    val iteration = solutionInput.iterateDelta(worksetInput, 10, Array("_1")) { (s, ws) =>
+      val result = s.coGroup(ws).where("_2").equalTo("_2") { (l, r) => l.min }
+      (result, ws)
+    }
+
+    iteration.output(new DiscardingOutputFormat[(Int,String)])
+  }
+
+  @Test(expected = classOf[InvalidProgramException])
+  def testIncorrectCoGroupWithSolution2(): Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val solutionInput = env.fromElements((1, "1"))
+    val worksetInput = env.fromElements((2, "2"))
+
+    val iteration = solutionInput.iterateDelta(worksetInput, 10, Array("_1")) { (s, ws) =>
+      val result = ws.coGroup(s).where("_2").equalTo("_2") { (l, r) => l.min }
+      (result, ws)
+    }
+
+    iteration.output(new DiscardingOutputFormat[(Int,String)])  
+  }
+
+  @Test(expected = classOf[InvalidProgramException])
+  def testIncorrectCoGroupWithSolution3(): Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val solutionInput = env.fromElements((1, "1"))
+    val worksetInput = env.fromElements((2, "2"))
+
+    val iteration = solutionInput.iterateDelta(worksetInput, 10, Array("_2")) { (s, ws) =>
+      val result = ws.coGroup(s).where("_1").equalTo("_1") { (l, r) => l.min }
+      (result, ws)
+    }
+
+    iteration.output(new DiscardingOutputFormat[(Int,String)])
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/6a935978/flink-scala/src/test/scala/org/apache/flink/api/scala/MaxByOperatorTest.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/test/scala/org/apache/flink/api/scala/MaxByOperatorTest.scala b/flink-scala/src/test/scala/org/apache/flink/api/scala/MaxByOperatorTest.scala
new file mode 100644
index 0000000..4266449
--- /dev/null
+++ b/flink-scala/src/test/scala/org/apache/flink/api/scala/MaxByOperatorTest.scala
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.scala
+
+import org.apache.flink.api.common.InvalidProgramException
+
+import org.junit.Test
+import org.junit.Assert
+
+class MaxByOperatorTest {
+
+  private val emptyTupleData = List[(Int, Long, String, Long, Int)]()
+  private val customTypeData = List[CustomType]()
+
+  @Test
+  def testMaxByKeyFieldsDataset(): Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val collection = env.fromCollection(emptyTupleData)
+    try {
+      collection.maxBy(0, 1, 2, 3, 4)
+    } catch {
+      case e : Exception => Assert.fail();
+    }
+  }
+
+  /**
+    * This test validates that an index which is out of bounds throws an
+    * IndexOutOfBoundsException.
+    */
+  @Test(expected = classOf[IndexOutOfBoundsException])
+  def testOutOfTupleBoundsDataset1() {
+
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val collection = env.fromCollection(emptyTupleData)
+
+    // should not work, key out of tuple bounds
+    collection.maxBy(5)
+  }
+
+  /**
+    * This test validates that an index which is out of bounds throws an
+    * IndexOutOfBoundsException.
+    */
+  @Test(expected = classOf[IndexOutOfBoundsException])
+  def testOutOfTupleBoundsDataset2() {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val collection = env.fromCollection(emptyTupleData)
+
+    // should not work, key out of tuple bounds
+    collection.maxBy(-1)
+  }
+
+  /**
+    * This test validates that an index which is out of bounds throws an
+    * IndexOutOfBoundsException.
+    */
+  @Test(expected = classOf[IndexOutOfBoundsException])
+  def testOutOfTupleBoundsDataset3() {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val collection = env.fromCollection(emptyTupleData)
+
+    // should not work, key out of tuple bounds
+    collection.maxBy(1, 2, 3, 4, -1)
+  }
+
+  /**
+    * This test validates that no exception is thrown when an empty grouping
+    * calls maxBy().
+    */
+  @Test
+  def testMaxByKeyFieldsGrouping() {
+
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val groupDs = env.fromCollection(emptyTupleData).groupBy(0)
+    // should work
+    try {
+      groupDs.maxBy(4, 0, 1, 2, 3)
+    } catch {
+      case e : Exception => Assert.fail();
+    }
+  }
+
+  /**
+    * This test validates that an InvalidProgramException is thrown when maxBy
+    * is used on a custom data type.
+    */
+  @Test(expected = classOf[InvalidProgramException])
+  def testCustomKeyFieldsDataset() {
+
+    val env = ExecutionEnvironment.getExecutionEnvironment
+
+    val customDS = env.fromCollection(customTypeData)
+    // should not work: groups on custom type
+    customDS.maxBy(0)
+  }
+
+  /**
+    * This test validates that an InvalidProgramException is thrown when maxBy
+    * is used on a custom data type.
+    */
+  @Test(expected = classOf[InvalidProgramException])
+  def testCustomKeyFieldsGrouping() {
+
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val groupDs: GroupedDataSet[CustomType] = env.fromCollection(customTypeData).groupBy(0)
+
+    groupDs.maxBy(0)
+  }
+  /**
+    * This test validates that an index which is out of bounds throws an
+    * IndexOutOfBoundsException.
+    */
+  @Test(expected = classOf[IndexOutOfBoundsException])
+  def testOutOfTupleBoundsGrouping1() {
+
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val groupDs = env.fromCollection(emptyTupleData).groupBy(0)
+    groupDs.maxBy(5)
+  }
+
+  /**
+    * This test validates that an index which is out of bounds throws an
+    * IndexOutOfBoundsException.
+    */
+  @Test(expected = classOf[IndexOutOfBoundsException])
+  def testOutOfTupleBoundsGrouping2() {
+
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val groupDs = env.fromCollection(emptyTupleData).groupBy(0)
+    groupDs.maxBy(-1)
+  }
+
+  /**
+    * This test validates that an index which is out of bounds throws an
+    * IndexOutOfBoundsException.
+    */
+  @Test(expected = classOf[IndexOutOfBoundsException])
+  def testOutOfTupleBoundsGrouping3() {
+
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val groupDs = env.fromCollection(emptyTupleData).groupBy(0)
+    groupDs.maxBy(1, 2, 3, 4, -1)
+  }
+
+  class CustomType(var myInt: Int, var myLong: Long, var myString: String) {
+    def this() {
+      this(0, 0, "")
+    }
+
+    override def toString: String = {
+      myInt + "," + myLong + "," + myString
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/6a935978/flink-scala/src/test/scala/org/apache/flink/api/scala/MinByOperatorTest.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/test/scala/org/apache/flink/api/scala/MinByOperatorTest.scala b/flink-scala/src/test/scala/org/apache/flink/api/scala/MinByOperatorTest.scala
new file mode 100644
index 0000000..5e659ad
--- /dev/null
+++ b/flink-scala/src/test/scala/org/apache/flink/api/scala/MinByOperatorTest.scala
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.scala
+
+import org.apache.flink.api.common.InvalidProgramException
+
+import org.junit.Test
+import org.junit.Assert
+
+class MinByOperatorTest {
+  private val emptyTupleData = List[(Int, Long, String, Long, Int)]()
+  private val customTypeData = List[CustomType]()
+
+  @Test
+  def testMinByKeyFieldsDataset(): Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val collection = env.fromCollection(emptyTupleData)
+    try {
+      collection.minBy(4, 0, 1, 2, 3)
+    } catch {
+      case e : Exception => Assert.fail();
+    }
+  }
+
+  /**
+    * This test validates that an index which is out of bounds throws an
+    * IndexOutOfBoundsException.
+    */
+  @Test(expected = classOf[IndexOutOfBoundsException])
+  def testOutOfTupleBoundsDataset1() {
+
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val collection = env.fromCollection(emptyTupleData)
+
+    // should not work, key out of tuple bounds
+    collection.minBy(5)
+  }
+
+  /**
+    * This test validates that an index which is out of bounds throws an
+    * IndexOutOfBoundsException.
+    */
+  @Test(expected = classOf[IndexOutOfBoundsException])
+  def testOutOfTupleBoundsDataset2() {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val collection = env.fromCollection(emptyTupleData)
+
+    // should not work, key out of tuple bounds
+    collection.minBy(-1)
+  }
+
+  /**
+    * This test validates that an index which is out of bounds throws an
+    * IndexOutOfBoundsException.
+    */
+  @Test(expected = classOf[IndexOutOfBoundsException])
+  def testOutOfTupleBoundsDataset3() {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val collection = env.fromCollection(emptyTupleData)
+
+    // should not work, key out of tuple bounds
+    collection.minBy(1, 2, 3, 4, -1)
+  }
+
+  /**
+    * This test validates that an InvalidProgramException is thrown when minBy
+    * is used on a custom data type.
+    */
+  @Test(expected = classOf[InvalidProgramException])
+  def testCustomKeyFieldsDataset() {
+
+    val env = ExecutionEnvironment.getExecutionEnvironment
+
+    val customDS = env.fromCollection(customTypeData)
+    // should not work: groups on custom type
+    customDS.minBy(0)
+  }
+
+  /**
+    * This test validates that no exception is thrown when an empty grouping
+    * calls minBy().
+    */
+  @Test
+  def testMinByKeyFieldsGrouping() {
+
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val groupDs = env.fromCollection(emptyTupleData).groupBy(0)
+    // should work
+    try {
+      groupDs.minBy(4, 0, 1, 2, 3)
+    } catch {
+      case e : Exception => Assert.fail()
+    }
+  }
+
+  /**
+    * This test validates that an InvalidProgramException is thrown when minBy
+    * is used on a custom data type.
+    */
+  @Test(expected = classOf[InvalidProgramException])
+  def testCustomKeyFieldsGrouping() {
+
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val groupDs: GroupedDataSet[CustomType] = env.fromCollection(customTypeData).groupBy(0)
+
+    groupDs.minBy(0)
+  }
+
+  /**
+    * This test validates that an index which is out of bounds throws an
+    * IndexOutOfBoundsException.
+    */
+  @Test(expected = classOf[IndexOutOfBoundsException])
+  def testOutOfTupleBoundsGrouping1() {
+
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val groupDs = env.fromCollection(emptyTupleData).groupBy(0)
+
+    groupDs.minBy(5)
+  }
+
+  /**
+    * This test validates that an index which is out of bounds throws an
+    * IndexOutOfBoundsException.
+    */
+  @Test(expected = classOf[IndexOutOfBoundsException])
+  def testOutOfTupleBoundsGrouping2() {
+
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val groupDs = env.fromCollection(emptyTupleData).groupBy(0)
+
+    groupDs.minBy(-1)
+  }
+
+  /**
+    * This test validates that an index which is out of bounds throws an
+    * IndexOutOfBoundsException.
+    */
+  @Test(expected = classOf[IndexOutOfBoundsException])
+  def testOutOfTupleBoundsGrouping3() {
+
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val groupDs = env.fromCollection(emptyTupleData).groupBy(0)
+
+    groupDs.minBy(1, 2, 3, 4, -1)
+  }
+
+  class CustomType(var myInt: Int, var myLong: Long, var myString: String) {
+    def this() {
+      this(0, 0, "")
+    }
+
+    override def toString: String = {
+      myInt + "," + myLong + "," + myString
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/6a935978/flink-scala/src/test/scala/org/apache/flink/api/scala/SelectByFunctionTest.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/test/scala/org/apache/flink/api/scala/SelectByFunctionTest.scala b/flink-scala/src/test/scala/org/apache/flink/api/scala/SelectByFunctionTest.scala
new file mode 100644
index 0000000..a614700
--- /dev/null
+++ b/flink-scala/src/test/scala/org/apache/flink/api/scala/SelectByFunctionTest.scala
@@ -0,0 +1,218 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.scala
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.typeutils.TupleTypeInfoBase
+
+import org.junit.{Assert, Test}
+
+/**
+  * Tests for the SelectByMaxFunction and SelectByMinFunction reduce functions.
+  */
+class SelectByFunctionTest {
+
+   val tupleTypeInfo = implicitly[TypeInformation[(Int, Long, String, Long, Int)]]
+    .asInstanceOf[TupleTypeInfoBase[(Int, Long, String, Long, Int)]]
+
+   private val bigger  = (10, 100L, "HelloWorld", 200L, 20)
+   private val smaller = (5, 50L, "Hello", 50L, 15)
+
+   //Special case where only the last value determines if bigger or smaller
+   private val specialCaseBigger = (10, 100L, "HelloWorld", 200L, 17)
+   private val specialCaseSmaller  = (5, 50L, "Hello", 50L, 17)
+
+  /**
+    * This test validates whether the order of tuples has any
+    * impact on the outcome and if the bigger tuple is
+    * returned.
+    */
+  @Test
+  def testMaxByComparison(): Unit = {
+    val a1 = Array(0)
+    val maxByTuple = new SelectByMaxFunction(tupleTypeInfo, a1)
+      try {
+        Assert.assertSame("SelectByMax must return bigger tuple",
+          bigger, maxByTuple.reduce(smaller, bigger))
+        Assert.assertSame("SelectByMax must return bigger tuple",
+          bigger, maxByTuple.reduce(bigger, smaller))
+      } catch {
+        case e : Exception =>
+          Assert.fail("No exception should be thrown while comparing both tuples")
+      }
+  }
+
+  // ----------------------- MAXIMUM FUNCTION TEST BELOW --------------------------
+
+  /**
+    * This test case checks when two tuples differ only in one value, but this value is not
+    * in the fields list. In that case it should be seen as equal
+    * and then the first given tuple (value1) should be returned by reduce().
+    */
+  @Test
+  def testMaxByComparisonSpecialCase1() : Unit = {
+    val a1 = Array(0, 3)
+    val maxByTuple = new SelectByMaxFunction(tupleTypeInfo, a1)
+
+    try {
+      Assert.assertSame("SelectByMax must return the first given tuple",
+        specialCaseBigger, maxByTuple.reduce(specialCaseBigger, bigger))
+      Assert.assertSame("SelectByMax must return the first given tuple",
+        bigger, maxByTuple.reduce(bigger, specialCaseBigger))
+    } catch {
+      case e : Exception => Assert.fail("No exception should be thrown " +
+        "while comparing both tuples")
+    }
+  }
+
+  /**
+    * This test case checks when two tuples differ only in one value.
+    */
+  @Test
+  def testMaxByComparisonSpecialCase2() : Unit = {
+    val a1 = Array(0, 2, 1, 4, 3)
+    val maxByTuple = new SelectByMaxFunction(tupleTypeInfo, a1)
+    try {
+      Assert.assertSame("SelectByMax must return bigger tuple",
+        bigger, maxByTuple.reduce(specialCaseBigger, bigger))
+      Assert.assertSame("SelectByMax must return bigger tuple",
+        bigger, maxByTuple.reduce(bigger, specialCaseBigger))
+    } catch {
+      case e : Exception => Assert.fail("No exception should be thrown" +
+        " while comparing both tuples")
+    }
+  }
+
+  /**
+    * This test validates that equality is independent of the number of indices used.
+    */
+  @Test
+  def testMaxByComparisonMultiple(): Unit = {
+    val a1 = Array(0, 1, 2, 3, 4)
+    val maxByTuple = new SelectByMaxFunction(tupleTypeInfo, a1)
+    try {
+      Assert.assertSame("SelectByMax must return bigger tuple",
+        bigger, maxByTuple.reduce(smaller, bigger))
+      Assert.assertSame("SelectByMax must return bigger tuple",
+        bigger, maxByTuple.reduce(bigger, smaller))
+    } catch {
+      case e : Exception => Assert.fail("No exception should be thrown " +
+        "while comparing both tuples")
+    }
+  }
+
+  /**
+    * Checks whether reduce behaves as expected if both values are the same object.
+    */
+  @Test
+  def testMaxByComparisonMustReturnATuple() : Unit = {
+    val a1 = Array(0)
+    val maxByTuple = new SelectByMaxFunction(tupleTypeInfo, a1)
+
+    try {
+      Assert.assertSame("SelectByMax must return bigger tuple",
+        bigger, maxByTuple.reduce(bigger, bigger))
+      Assert.assertSame("SelectByMax must return smaller tuple",
+        smaller, maxByTuple.reduce(smaller, smaller))
+    } catch {
+      case e : Exception => Assert.fail("No exception should be thrown" +
+        " while comparing both tuples")
+    }
+  }
+
+  // ----------------------- MINIMUM FUNCTION TEST BELOW --------------------------
+
+  /**
+    * This test validates whether the order of tuples has any impact
+    * on the outcome and if the smaller tuple is returned.
+    */
+  @Test
+  def testMinByComparison() : Unit = {
+    val a1 = Array(0)
+    val minByTuple = new SelectByMinFunction(tupleTypeInfo, a1)
+    try {
+      Assert.assertSame("SelectByMin must return smaller tuple",
+        smaller, minByTuple.reduce(smaller, bigger))
+      Assert.assertSame("SelectByMin must return smaller tuple",
+        smaller, minByTuple.reduce(bigger, smaller))
+    } catch {
+      case e : Exception => Assert.fail("No exception should be thrown " +
+        "while comparing both tuples")
+    }
+  }
+
+  /**
+    * This test case checks when two tuples differ only in one value, but this value is not
+    * in the fields list. In that case it should be seen as equal and
+    * then the first given tuple (value1) should be returned by reduce().
+    */
+  @Test
+  def testMinByComparisonSpecialCase1() : Unit = {
+    val a1 = Array(0, 3)
+    val minByTuple = new SelectByMinFunction(tupleTypeInfo, a1)
+
+    try {
+      Assert.assertSame("SelectByMin must return the first given tuple",
+        specialCaseBigger, minByTuple.reduce(specialCaseBigger, bigger))
+      Assert.assertSame("SelectByMin must return the first given tuple",
+        bigger, minByTuple.reduce(bigger, specialCaseBigger))
+    } catch {
+      case e : Exception => Assert.fail("No exception should be thrown " +
+        "while comparing both tuples")
+    }
+  }
+
+  /**
+    * This test validates that when two tuples differ only in one value,
+    * and that value's index is given at construction time, the smaller
+    * tuple must be returned.
+    */
+  @Test
+  def  testMinByComparisonSpecialCase2() : Unit = {
+    val a1 = Array(0, 2, 1, 4, 3)
+    val minByTuple = new SelectByMinFunction(tupleTypeInfo, a1)
+
+    try {
+      Assert.assertSame("SelectByMin must return smaller tuple",
+        smaller, minByTuple.reduce(specialCaseSmaller, smaller))
+      Assert.assertSame("SelectByMin must return smaller tuple",
+        smaller, minByTuple.reduce(smaller, specialCaseSmaller))
+    } catch {
+      case e : Exception => Assert.fail("No exception should be thrown" +
+        " while comparing both tuples")
+    }
+  }
+
+  /**
+    * Checks whether reduce behaves as expected if both values are the same object.
+    */
+  @Test
+  def testMinByComparisonMultiple() : Unit =  {
+    val a1 = Array(0, 1, 2, 3, 4)
+    val minByTuple  = new SelectByMinFunction(tupleTypeInfo, a1)
+    try {
+      Assert.assertSame("SelectByMin must return smaller tuple",
+        smaller, minByTuple.reduce(smaller, bigger))
+      Assert.assertSame("SelectByMin must return smaller tuple",
+        smaller, minByTuple.reduce(bigger, smaller))
+    } catch {
+      case e : Exception => Assert.fail("No exception should be thrown" +
+        " while comparing both tuples")
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/6a935978/flink-scala/src/test/scala/org/apache/flink/api/scala/functions/SemanticPropertiesTranslationTest.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/test/scala/org/apache/flink/api/scala/functions/SemanticPropertiesTranslationTest.scala b/flink-scala/src/test/scala/org/apache/flink/api/scala/functions/SemanticPropertiesTranslationTest.scala
new file mode 100644
index 0000000..16c826f
--- /dev/null
+++ b/flink-scala/src/test/scala/org/apache/flink/api/scala/functions/SemanticPropertiesTranslationTest.scala
@@ -0,0 +1,282 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.scala.functions
+
+import org.apache.flink.api.java.io.DiscardingOutputFormat
+import org.junit.Assert._
+import org.apache.flink.api.common.functions.RichJoinFunction
+import org.apache.flink.api.common.functions.RichMapFunction
+import org.apache.flink.api.common.operators.{GenericDataSinkBase, SingleInputSemanticProperties}
+import org.apache.flink.api.common.operators.base.{InnerJoinOperatorBase, MapOperatorBase}
+import org.apache.flink.api.common.operators.util.FieldSet
+import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields
+import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFieldsFirst
+import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFieldsSecond
+import org.junit.Test
+
+import org.apache.flink.api.scala._
+
+/**
+ * This is a minimal test to verify that semantic annotations are evaluated
+ * against the type information and translated correctly to the common data flow API.
+ *
+ * Currently this covers only the constant fields annotations.
+ */
+class SemanticPropertiesTranslationTest {
+  /**
+   * Tests a mapper that forwards all fields of a tuple data set via a
+   * wildcard annotation.
+   */
+  @Test
+  def translateUnaryFunctionAnnotationTuplesWildCard(): Unit = {
+    try {
+      val env = ExecutionEnvironment.getExecutionEnvironment
+
+      val input = env.fromElements((3L, "test", 42))
+      input.map(new WildcardForwardMapper[(Long, String, Int)])
+        .output(new DiscardingOutputFormat[(Long, String, Int)])
+
+      val plan = env.createProgramPlan()
+
+      val sink: GenericDataSinkBase[_] = plan.getDataSinks.iterator.next
+
+      val mapper: MapOperatorBase[_, _, _] = sink.getInput.asInstanceOf[MapOperatorBase[_, _, _]]
+
+      val semantics: SingleInputSemanticProperties = mapper.getSemanticProperties
+      val fw1: FieldSet = semantics.getForwardingTargetFields(0, 0)
+      val fw2: FieldSet = semantics.getForwardingTargetFields(0, 1)
+      val fw3: FieldSet = semantics.getForwardingTargetFields(0, 2)
+
+      assertNotNull(fw1)
+      assertNotNull(fw2)
+      assertNotNull(fw3)
+      assertTrue(fw1.contains(0))
+      assertTrue(fw2.contains(1))
+      assertTrue(fw3.contains(2))
+    } catch {
+      case e: Exception => {
+        System.err.println(e.getMessage)
+        e.printStackTrace()
+        fail("Exception in test: " + e.getMessage)
+      }
+    }
+  }
+
+  /**
+   * Tests a mapper that individually forwards fields 0, 1, and 2 of a tuple
+   * data set.
+   */
+  @Test
+  def translateUnaryFunctionAnnotationTuples1(): Unit = {
+    try {
+      val env = ExecutionEnvironment.getExecutionEnvironment
+
+      val input = env.fromElements((3L, "test", 42))
+      input.map(new IndividualForwardMapper[Long, String, Int])
+        .output(new DiscardingOutputFormat[(Long, String, Int)])
+
+      val plan = env.createProgramPlan()
+
+      val sink: GenericDataSinkBase[_] = plan.getDataSinks.iterator.next
+
+      val mapper: MapOperatorBase[_, _, _] = sink.getInput.asInstanceOf[MapOperatorBase[_, _, _]]
+
+      val semantics: SingleInputSemanticProperties = mapper.getSemanticProperties
+      val fw1: FieldSet = semantics.getForwardingTargetFields(0, 0)
+      val fw2: FieldSet = semantics.getForwardingTargetFields(0, 1)
+      val fw3: FieldSet = semantics.getForwardingTargetFields(0, 2)
+
+      assertNotNull(fw1)
+      assertNotNull(fw2)
+      assertNotNull(fw3)
+      assertTrue(fw1.contains(0))
+      assertTrue(fw2.contains(1))
+      assertTrue(fw3.contains(2))
+    } catch {
+      case e: Exception => {
+        System.err.println(e.getMessage)
+        e.printStackTrace()
+        fail("Exception in test: " + e.getMessage)
+      }
+    }
+  }
+
+  /**
+   * Tests a mapper that forwards only field 1 (annotated as "_2") of a tuple
+   * data set.
+   */
+  @Test
+  def translateUnaryFunctionAnnotationTuples2(): Unit = {
+    try {
+      val env = ExecutionEnvironment.getExecutionEnvironment
+
+      val input = env.fromElements((3L, "test", 42))
+      input.map(new FieldTwoForwardMapper[Long, String, Int])
+        .output(new DiscardingOutputFormat[(Long, String, Int)])
+
+      val plan = env.createProgramPlan()
+
+      val sink: GenericDataSinkBase[_] = plan.getDataSinks.iterator.next
+
+      val mapper: MapOperatorBase[_, _, _] = sink.getInput.asInstanceOf[MapOperatorBase[_, _, _]]
+
+      val semantics: SingleInputSemanticProperties = mapper.getSemanticProperties
+      val fw1: FieldSet = semantics.getForwardingTargetFields(0, 0)
+      val fw2: FieldSet = semantics.getForwardingTargetFields(0, 1)
+      val fw3: FieldSet = semantics.getForwardingTargetFields(0, 2)
+
+      assertNotNull(fw1)
+      assertNotNull(fw2)
+      assertNotNull(fw3)
+      assertEquals(0, fw1.size)
+      assertEquals(0, fw3.size)
+      assertTrue(fw2.contains(1))
+    } catch {
+      case e: Exception => {
+        System.err.println(e.getMessage)
+        e.printStackTrace()
+        fail("Exception in test: " + e.getMessage)
+      }
+    }
+  }
+
+  /**
+   * Tests a join that forwards one field from each input, swapping their
+   * positions in the output tuple.
+   */
+  @Test
+  def translateBinaryFunctionAnnotationTuples1(): Unit = {
+    try {
+      val env = ExecutionEnvironment.getExecutionEnvironment
+
+      val input1 = env.fromElements((3L, "test"))
+      val input2 = env.fromElements((3L, 3.1415))
+
+      input1.join(input2).where(0).equalTo(0)(
+        new ForwardingTupleJoin[Long, String, Long, Double])
+        .output(new DiscardingOutputFormat[(String, Long)])
+
+      val plan = env.createProgramPlan()
+      val sink: GenericDataSinkBase[_] = plan.getDataSinks.iterator.next
+
+      val join: InnerJoinOperatorBase[_, _, _, _] =
+        sink.getInput.asInstanceOf[InnerJoinOperatorBase[_, _, _, _]]
+
+      val semantics = join.getSemanticProperties
+      val fw11: FieldSet = semantics.getForwardingTargetFields(0, 0)
+      val fw12: FieldSet = semantics.getForwardingTargetFields(0, 1)
+      val fw21: FieldSet = semantics.getForwardingTargetFields(1, 0)
+      val fw22: FieldSet = semantics.getForwardingTargetFields(1, 1)
+
+      assertNotNull(fw11)
+      assertNotNull(fw21)
+      assertNotNull(fw12)
+      assertNotNull(fw22)
+      assertEquals(0, fw11.size)
+      assertEquals(0, fw22.size)
+      assertTrue(fw12.contains(0))
+      assertTrue(fw21.contains(1))
+    }
+    catch {
+      case e: Exception => {
+        System.err.println(e.getMessage)
+        e.printStackTrace()
+        fail("Exception in test: " + e.getMessage)
+      }
+    }
+  }
+
+  /**
+   * Tests a join that forwards both inputs unchanged into the two fields of
+   * a nested output tuple.
+   */
+  @Test
+  def translateBinaryFunctionAnnotationTuples2(): Unit = {
+    try {
+      val env = ExecutionEnvironment.getExecutionEnvironment
+
+      val input1 = env.fromElements((3L, "test"))
+      val input2 = env.fromElements((3L, 42))
+
+      input1.join(input2).where(0).equalTo(0)(
+        new ForwardingBasicJoin[(Long, String), (Long, Int)])
+        .output(new DiscardingOutputFormat[((Long, String), (Long, Int))])
+
+      val plan = env.createProgramPlan()
+      val sink: GenericDataSinkBase[_] = plan.getDataSinks.iterator.next
+
+      val join: InnerJoinOperatorBase[_, _, _, _] =
+        sink.getInput.asInstanceOf[InnerJoinOperatorBase[_, _, _, _]]
+
+      val semantics = join.getSemanticProperties
+      val fw11: FieldSet = semantics.getForwardingTargetFields(0, 0)
+      val fw12: FieldSet = semantics.getForwardingTargetFields(0, 1)
+      val fw21: FieldSet = semantics.getForwardingTargetFields(1, 0)
+      val fw22: FieldSet = semantics.getForwardingTargetFields(1, 1)
+
+      assertNotNull(fw11)
+      assertNotNull(fw12)
+      assertNotNull(fw21)
+      assertNotNull(fw22)
+      assertTrue(fw11.contains(0))
+      assertTrue(fw12.contains(1))
+      assertTrue(fw21.contains(2))
+      assertTrue(fw22.contains(3))
+    }
+    catch {
+      case e: Exception => {
+        System.err.println(e.getMessage)
+        e.printStackTrace()
+        fail("Exception in test: " + e.getMessage)
+      }
+    }
+  }
+}
+
+
+@ForwardedFields(Array("*"))
+class WildcardForwardMapper[T] extends RichMapFunction[T, T] {
+  def map(value: T): T = {
+    value
+  }
+}
+
+@ForwardedFields(Array("0;1;2"))
+class IndividualForwardMapper[X, Y, Z] extends RichMapFunction[(X, Y, Z), (X, Y, Z)] {
+  def map(value: (X, Y, Z)): (X, Y, Z) = {
+    value
+  }
+}
+
+@ForwardedFields(Array("_2"))
+class FieldTwoForwardMapper[X, Y, Z] extends RichMapFunction[(X, Y, Z), (X, Y, Z)] {
+  def map(value: (X, Y, Z)): (X, Y, Z) = {
+    value
+  }
+}
+
+@ForwardedFieldsFirst(Array("_2 -> _1"))
+@ForwardedFieldsSecond(Array("_1 -> _2"))
+class ForwardingTupleJoin[A, B, C, D] extends RichJoinFunction[(A, B), (C, D), (B, C)] {
+  def join(first: (A, B), second: (C, D)): (B, C) = {
+    (first._2, second._1)
+  }
+}
+
+@ForwardedFieldsFirst(Array("* -> 0.*"))
+@ForwardedFieldsSecond(Array("* -> 1.*"))
+class ForwardingBasicJoin[A, B] extends RichJoinFunction[A, B, (A, B)] {
+  def join(first: A, second: B): (A, B) = {
+    (first, second)
+  }
+}
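
A usage note on the annotations above: forwarded-fields declarations tell the
optimizer which input fields reach the output unchanged, so existing
partitioning or sort orders on those fields can be reused. A minimal, hedged
sketch of how an application might declare one (the function and field choice
are invented for illustration, using the same annotation API as these tests):

    import org.apache.flink.api.common.functions.RichMapFunction
    import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields

    // The key in field _1 is copied through untouched; only _2 changes.
    @ForwardedFields(Array("_1"))
    class ScaleValue extends RichMapFunction[(Long, Double), (Long, Double)] {
      def map(value: (Long, Double)): (Long, Double) =
        (value._1, value._2 * 2.0)
    }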

http://git-wip-us.apache.org/repos/asf/flink/blob/6a935978/flink-scala/src/test/scala/org/apache/flink/api/scala/io/CollectionInputFormatTest.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/test/scala/org/apache/flink/api/scala/io/CollectionInputFormatTest.scala b/flink-scala/src/test/scala/org/apache/flink/api/scala/io/CollectionInputFormatTest.scala
new file mode 100644
index 0000000..92575f5
--- /dev/null
+++ b/flink-scala/src/test/scala/org/apache/flink/api/scala/io/CollectionInputFormatTest.scala
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.scala.io
+
+import org.apache.flink.api.common.ExecutionConfig
+import org.apache.flink.api.java.io.CollectionInputFormat
+import org.junit.Assert.assertEquals
+import org.junit.Assert.assertNotNull
+import org.junit.Assert.assertTrue
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo
+import org.apache.flink.core.io.GenericInputSplit
+import org.junit.Test
+import java.io.ByteArrayInputStream
+import java.io.ByteArrayOutputStream
+import java.io.ObjectInputStream
+import java.io.ObjectOutputStream
+import org.apache.flink.api.scala._
+import scala.collection.JavaConverters._
+
+class ElementType(val id: Int) {
+  def this() {
+    this(-1)
+  }
+
+  override def equals(obj: Any): Boolean = obj match {
+    case et: ElementType => et.id == this.id
+    case _ => false
+  }
+
+  // equals is overridden, so hashCode must stay consistent with it
+  override def hashCode(): Int = id
+}
+
+class CollectionInputFormatTest {
+
+  @Test
+  def testSerializability(): Unit = {
+
+    val inputCollection = Seq(new ElementType(1), new ElementType(2), new ElementType(3))
+    val info = createTypeInformation[ElementType]
+
+    val inputFormat: CollectionInputFormat[ElementType] = {
+      new CollectionInputFormat[ElementType](
+        inputCollection.asJava,
+        info.createSerializer(new ExecutionConfig))
+    }
+
+    val buffer = new ByteArrayOutputStream
+    val out = new ObjectOutputStream(buffer)
+
+    out.writeObject(inputFormat)
+
+    val in = new ObjectInputStream(new ByteArrayInputStream(buffer.toByteArray))
+    val serializationResult: AnyRef = in.readObject
+
+    assertNotNull(serializationResult)
+    assertTrue(serializationResult.isInstanceOf[CollectionInputFormat[_]])
+
+    val result = serializationResult.asInstanceOf[CollectionInputFormat[ElementType]]
+    val inputSplit = new GenericInputSplit(0, 1)
+    inputFormat.open(inputSplit)
+    result.open(inputSplit)
+
+    while (!inputFormat.reachedEnd && !result.reachedEnd) {
+      val expectedElement = inputFormat.nextRecord(null)
+      val actualElement = result.nextRecord(null)
+      assertEquals(expectedElement, actualElement)
+    }
+  }
+
+  @Test
+  def testSerializabilityStrings(): Unit = {
+    val data = Seq("To bey or not to be,--that is the question:--",
+      "Whether 'tis nobler in the mind to suffer", "The slings and arrows of outrageous fortune",
+      "Or to take arms against a sea of troubles,", "And by opposing end them?--To die," +
+        "--to sleep,--", "No more; and by a sleep to say we end", "The heartache, " +
+        "and the thousand natural shocks", "That flesh is heir to,--'tis a consummation",
+      "Devoutly to be wish'd. To die,--to sleep;--", "To sleep! perchance to dream:--ay, " +
+        "there's the rub;", "For in that sleep of death what dreams may come,",
+      "When we have shuffled off this mortal coil,", "Must give us pause: there's the respect",
+      "That makes calamity of so long life;", "For who would bear the whips and scorns of time,",
+      "The oppressor's wrong, the proud man's contumely,", "The pangs of despis'd love, " +
+        "the law's delay,", "The insolence of office, and the spurns",
+      "That patient merit of the unworthy takes,", "When he himself might his quietus make",
+      "With a bare bodkin? who would these fardels bear,", "To grunt and sweat under a weary " +
+        "life,", "But that the dread of something after death,--", "The undiscover'd country, " +
+        "from whose bourn", "No traveller returns,--puzzles the will,",
+      "And makes us rather bear those ills we have", "Than fly to others that we know not of?",
+      "Thus conscience does make cowards of us all;", "And thus the native hue of resolution",
+      "Is sicklied o'er with the pale cast of thought;", "And enterprises of great pith and " +
+        "moment,", "With this regard, their currents turn awry,", "And lose the name of action" +
+        ".--Soft you now!", "The fair Ophelia!--Nymph, in thy orisons",
+      "Be all my sins remember'd.")
+
+    val inputFormat = new CollectionInputFormat[String](
+      data.asJava,
+      BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig))
+    val baos = new ByteArrayOutputStream
+    val oos = new ObjectOutputStream(baos)
+
+    oos.writeObject(inputFormat)
+    oos.close()
+
+    val bais = new ByteArrayInputStream(baos.toByteArray)
+    val ois = new ObjectInputStream(bais)
+    val result: AnyRef = ois.readObject
+
+    assertTrue(result.isInstanceOf[CollectionInputFormat[_]])
+    var i: Int = 0
+    val in = result.asInstanceOf[CollectionInputFormat[String]]
+    in.open(new GenericInputSplit(0, 1))
+
+    while (!in.reachedEnd) {
+      assertEquals(data(i), in.nextRecord(""))
+      i += 1
+    }
+    assertEquals(data.length, i)
+  }
+}
+
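
Both tests above hand-roll the same Java-serialization round trip. A small
helper of the kind one might factor out (hypothetical, not part of this
commit) makes the pattern explicit:

    import java.io.{ByteArrayInputStream, ByteArrayOutputStream,
      ObjectInputStream, ObjectOutputStream}

    object SerializationRoundTrip {
      // Serialize with plain Java serialization and read the copy back.
      def roundTrip[T <: AnyRef](obj: T): T = {
        val baos = new ByteArrayOutputStream()
        val oos = new ObjectOutputStream(baos)
        try oos.writeObject(obj) finally oos.close()
        val ois = new ObjectInputStream(new ByteArrayInputStream(baos.toByteArray))
        try ois.readObject().asInstanceOf[T] finally ois.close()
      }
    }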