Posted to commits@flink.apache.org by al...@apache.org on 2014/09/22 14:28:44 UTC

[02/60] Rewrite the Scala API as (somewhat) thin Layer on Java API
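
The series replaces the previous standalone Scala API with thin wrappers that delegate to the Java API. As a rough sketch of that wrapping pattern (hypothetical names, not the committed classes; the real flink-scala wrappers also thread TypeInformation through implicits):

    import org.apache.flink.api.java.{DataSet => JavaDataSet}

    // A Scala DataSet holds the underlying Java DataSet and forwards each
    // operation to it, re-wrapping the Java result in a new Scala wrapper.
    class DataSet[T](private val javaSet: JavaDataSet[T]) {
      def union(other: DataSet[T]): DataSet[T] =
        new DataSet[T](javaSet.union(other.javaSet))
    }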

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b8131fa7/flink-scala/src/test/scala/org/apache/flink/api/scala/io/CollectionInputFormatTest.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/test/scala/org/apache/flink/api/scala/io/CollectionInputFormatTest.scala b/flink-scala/src/test/scala/org/apache/flink/api/scala/io/CollectionInputFormatTest.scala
new file mode 100644
index 0000000..1535eed
--- /dev/null
+++ b/flink-scala/src/test/scala/org/apache/flink/api/scala/io/CollectionInputFormatTest.scala
@@ -0,0 +1,133 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.scala.io
+
+import org.apache.flink.api.java.io.CollectionInputFormat
+import org.junit.Assert.assertEquals
+import org.junit.Assert.assertNotNull
+import org.junit.Assert.assertTrue
+import org.apache.flink.api.java.typeutils.BasicTypeInfo
+import org.apache.flink.core.io.GenericInputSplit
+import org.junit.Test
+import java.io.ByteArrayInputStream
+import java.io.ByteArrayOutputStream
+import java.io.ObjectInputStream
+import java.io.ObjectOutputStream
+import org.apache.flink.api.scala._
+import scala.collection.JavaConverters._
+
+class ElementType(val id: Int) {
+  def this() {
+    this(-1)
+  }
+
+  // Pattern matching covers both the null check (null matches no case)
+  // and the cast in a single idiomatic step.
+  override def equals(obj: Any): Boolean = obj match {
+    case et: ElementType => et.id == this.id
+    case _ => false
+  }
+
+  // equals is overridden, so hashCode must stay consistent with it.
+  override def hashCode(): Int = id
+}
+
+class CollectionInputFormatTest {
+
+  @Test
+  def testSerializability(): Unit = {
+
+    val inputCollection = Seq(new ElementType(1), new ElementType(2), new ElementType(3))
+    val info = createTypeInformation[ElementType]
+
+    val inputFormat = new CollectionInputFormat[ElementType](
+      inputCollection.asJava, info.createSerializer())
+
+    val buffer = new ByteArrayOutputStream
+    val out = new ObjectOutputStream(buffer)
+
+    out.writeObject(inputFormat)
+
+    val in = new ObjectInputStream(new ByteArrayInputStream(buffer.toByteArray))
+    val serializationResult: AnyRef = in.readObject
+
+    assertNotNull(serializationResult)
+    assertTrue(serializationResult.isInstanceOf[CollectionInputFormat[_]])
+
+    val result = serializationResult.asInstanceOf[CollectionInputFormat[ElementType]]
+    val inputSplit = new GenericInputSplit
+    inputFormat.open(inputSplit)
+    result.open(inputSplit)
+
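+    // Read the original format and its deserialized copy in lockstep; both
+    // must produce the same sequence of records.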
+    while (!inputFormat.reachedEnd && !result.reachedEnd) {
+      val expectedElement = inputFormat.nextRecord(null)
+      val actualElement = result.nextRecord(null)
+      assertEquals(expectedElement, actualElement)
+    }
+  }
+
+  @Test
+  def testSerializabilityStrings(): Unit = {
+    val data = Seq("To bey or not to be,--that is the question:--",
+      "Whether 'tis nobler in the mind to suffer", "The slings and arrows of outrageous fortune",
+      "Or to take arms against a sea of troubles,", "And by opposing end them?--To die," +
+        "--to sleep,--", "No more; and by a sleep to say we end", "The heartache, " +
+        "and the thousand natural shocks", "That flesh is heir to,--'tis a consummation",
+      "Devoutly to be wish'd. To die,--to sleep;--", "To sleep! perchance to dream:--ay, " +
+        "there's the rub;", "For in that sleep of death what dreams may come,",
+      "When we have shuffled off this mortal coil,", "Must give us pause: there's the respect",
+      "That makes calamity of so long life;", "For who would bear the whips and scorns of time,",
+      "The oppressor's wrong, the proud man's contumely,", "The pangs of despis'd love, " +
+        "the law's delay,", "The insolence of office, and the spurns",
+      "That patient merit of the unworthy takes,", "When he himself might his quietus make",
+      "With a bare bodkin? who would these fardels bear,", "To grunt and sweat under a weary " +
+        "life,", "But that the dread of something after death,--", "The undiscover'd country, " +
+        "from whose bourn", "No traveller returns,--puzzles the will,",
+      "And makes us rather bear those ills we have", "Than fly to others that we know not of?",
+      "Thus conscience does make cowards of us all;", "And thus the native hue of resolution",
+      "Is sicklied o'er with the pale cast of thought;", "And enterprises of great pith and " +
+        "moment,", "With this regard, their currents turn awry,", "And lose the name of action" +
+        ".--Soft you now!", "The fair Ophelia!--Nymph, in thy orisons",
+      "Be all my sins remember'd.")
+
+    val inputFormat = new CollectionInputFormat[String](
+      data.asJava,
+      BasicTypeInfo.STRING_TYPE_INFO.createSerializer)
+    val baos = new ByteArrayOutputStream
+    val oos = new ObjectOutputStream(baos)
+
+    oos.writeObject(inputFormat)
+    oos.close()
+
+    val bais = new ByteArrayInputStream(baos.toByteArray)
+    val ois = new ObjectInputStream(bais)
+    val result: AnyRef = ois.readObject
+
+    assertTrue(result.isInstanceOf[CollectionInputFormat[_]])
+    var i: Int = 0
+    val in = result.asInstanceOf[CollectionInputFormat[String]]
+    in.open(new GenericInputSplit)
+
+    while (!in.reachedEnd) {
+      assertEquals(data(i), in.nextRecord(""))
+      i += 1
+    }
+    assertEquals(data.length, i)
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b8131fa7/flink-scala/src/test/scala/org/apache/flink/api/scala/io/CsvInputFormatTest.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/test/scala/org/apache/flink/api/scala/io/CsvInputFormatTest.scala b/flink-scala/src/test/scala/org/apache/flink/api/scala/io/CsvInputFormatTest.scala
new file mode 100644
index 0000000..e85b259
--- /dev/null
+++ b/flink-scala/src/test/scala/org/apache/flink/api/scala/io/CsvInputFormatTest.scala
@@ -0,0 +1,288 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.scala.io
+
+import org.apache.flink.api.scala.operators.ScalaCsvInputFormat
+import org.junit.Assert.assertEquals
+import org.junit.Assert.assertNotNull
+import org.junit.Assert.assertNull
+import org.junit.Assert.assertTrue
+import org.junit.Assert.fail
+import java.io.File
+import java.io.FileOutputStream
+import java.io.FileWriter
+import java.io.OutputStreamWriter
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.core.fs.FileInputSplit
+import org.apache.flink.core.fs.Path
+import org.junit.Test
+import org.apache.flink.api.scala._
+
+class CsvInputFormatTest {
+
+  private final val PATH: Path = new Path("an/ignored/file/")
+  private final val FIRST_PART: String = "That is the first part"
+  private final val SECOND_PART: String = "That is the second part"
+
+  @Test
+  def readStringFields(): Unit = {
+    try {
+      val fileContent = "abc|def|ghijk\nabc||hhg\n|||"
+      val split = createTempFile(fileContent)
+      val format = new ScalaCsvInputFormat[(String, String, String)](
+        PATH, createTypeInformation[(String, String, String)])
+      format.setDelimiter("\n")
+      format.setFieldDelimiter('|')
+      val parameters = new Configuration
+      format.configure(parameters)
+      format.open(split)
+      var result: (String, String, String) = null
+      result = format.nextRecord(result)
+      assertNotNull(result)
+      assertEquals("abc", result._1)
+      assertEquals("def", result._2)
+      assertEquals("ghijk", result._3)
+      result = format.nextRecord(result)
+      assertNotNull(result)
+      assertEquals("abc", result._1)
+      assertEquals("", result._2)
+      assertEquals("hhg", result._3)
+      result = format.nextRecord(result)
+      assertNotNull(result)
+      assertEquals("", result._1)
+      assertEquals("", result._2)
+      assertEquals("", result._3)
+      result = format.nextRecord(result)
+      assertNull(result)
+      assertTrue(format.reachedEnd)
+    }
+    catch {
+      case ex: Exception => {
+        ex.printStackTrace()
+        fail("Test failed due to a " + ex.getClass.getName + ": " + ex.getMessage)
+      }
+    }
+  }
+
+  @Test
+  def readStringFieldsWithTrailingDelimiters(): Unit = {
+    try {
+      val fileContent = "abc|def|ghijk\nabc||hhg\n|||\n"
+      val split = createTempFile(fileContent)
+      val format = new ScalaCsvInputFormat[(String, String, String)](
+        PATH, createTypeInformation[(String, String, String)])
+      format.setDelimiter("\n")
+      format.setFieldDelimiter('|')
+      val parameters = new Configuration
+      format.configure(parameters)
+      format.open(split)
+      var result: (String, String, String) = null
+      result = format.nextRecord(result)
+      assertNotNull(result)
+      assertEquals("abc", result._1)
+      assertEquals("def", result._2)
+      assertEquals("ghijk", result._3)
+      result = format.nextRecord(result)
+      assertNotNull(result)
+      assertEquals("abc", result._1)
+      assertEquals("", result._2)
+      assertEquals("hhg", result._3)
+      result = format.nextRecord(result)
+      assertNotNull(result)
+      assertEquals("", result._1)
+      assertEquals("", result._2)
+      assertEquals("", result._3)
+      result = format.nextRecord(result)
+      assertNull(result)
+      assertTrue(format.reachedEnd)
+    }
+    catch {
+      case ex: Exception =>
+        ex.printStackTrace()
+        fail("Test failed due to a " + ex.getClass.getName + ": " + ex.getMessage)
+    }
+  }
+
+  @Test
+  def testIntegerFields(): Unit = {
+    try {
+      val fileContent = "111|222|333|444|555\n666|777|888|999|000|\n"
+      val split = createTempFile(fileContent)
+      val format = new ScalaCsvInputFormat[(Int, Int, Int, Int, Int)](
+        PATH, createTypeInformation[(Int, Int, Int, Int, Int)])
+      format.setFieldDelimiter('|')
+      format.configure(new Configuration)
+      format.open(split)
+      var result: (Int, Int, Int, Int, Int) = null
+      result = format.nextRecord(result)
+      assertNotNull(result)
+      assertEquals(Integer.valueOf(111), result._1)
+      assertEquals(Integer.valueOf(222), result._2)
+      assertEquals(Integer.valueOf(333), result._3)
+      assertEquals(Integer.valueOf(444), result._4)
+      assertEquals(Integer.valueOf(555), result._5)
+      result = format.nextRecord(result)
+      assertNotNull(result)
+      assertEquals(Integer.valueOf(666), result._1)
+      assertEquals(Integer.valueOf(777), result._2)
+      assertEquals(Integer.valueOf(888), result._3)
+      assertEquals(Integer.valueOf(999), result._4)
+      assertEquals(Integer.valueOf(0), result._5)
+      result = format.nextRecord(result)
+      assertNull(result)
+      assertTrue(format.reachedEnd)
+    }
+    catch {
+      case ex: Exception =>
+        fail("Test failed due to a " + ex.getClass.getName + ": " + ex.getMessage)
+    }
+  }
+
+  @Test
+  def testReadFirstN(): Unit = {
+    try {
+      val fileContent = "111|222|333|444|555|\n666|777|888|999|000|\n"
+      val split = createTempFile(fileContent)
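+      // The target tuple has only two fields, so only the first two columns
+      // of each five-column line are parsed.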
+      val format = new ScalaCsvInputFormat[(Int, Int)](PATH, createTypeInformation[(Int, Int)])
+      format.setFieldDelimiter('|')
+      format.configure(new Configuration)
+      format.open(split)
+      var result: (Int, Int) = null
+      result = format.nextRecord(result)
+      assertNotNull(result)
+      assertEquals(Integer.valueOf(111), result._1)
+      assertEquals(Integer.valueOf(222), result._2)
+      result = format.nextRecord(result)
+      assertNotNull(result)
+      assertEquals(Integer.valueOf(666), result._1)
+      assertEquals(Integer.valueOf(777), result._2)
+      result = format.nextRecord(result)
+      assertNull(result)
+      assertTrue(format.reachedEnd)
+    }
+    catch {
+      case ex: Exception =>
+        fail("Test failed due to a " + ex.getClass.getName + ": " + ex.getMessage)
+    }
+  }
+
+  @Test
+  def testReadSparseWithPositionSetter(): Unit = {
+    try {
+      val fileContent: String = "111|222|333|444|555|666|777|888|999|000|\n000|999|888|777|666" +
+        "|555|444|333|222|111|"
+      val split = createTempFile(fileContent)
+      val format = new ScalaCsvInputFormat[(Int, Int, Int)](
+        PATH,
+        createTypeInformation[(Int, Int, Int)])
+      format.setFieldDelimiter('|')
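+      // Project source columns 0, 3 and 7 of each line onto the three tuple fields.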
+      format.setFields(Array(0, 3, 7), Array(classOf[Integer], classOf[Integer], classOf[Integer]))
+      format.configure(new Configuration)
+      format.open(split)
+      var result: (Int, Int, Int) = null
+      result = format.nextRecord(result)
+      assertNotNull(result)
+      assertEquals(Integer.valueOf(111), result._1)
+      assertEquals(Integer.valueOf(444), result._2)
+      assertEquals(Integer.valueOf(888), result._3)
+      result = format.nextRecord(result)
+      assertNotNull(result)
+      assertEquals(Integer.valueOf(0), result._1)
+      assertEquals(Integer.valueOf(777), result._2)
+      assertEquals(Integer.valueOf(333), result._3)
+      result = format.nextRecord(result)
+      assertNull(result)
+      assertTrue(format.reachedEnd)
+    }
+    catch {
+      case ex: Exception =>
+        fail("Test failed due to a " + ex.getClass.getName + ": " + ex.getMessage)
+    }
+  }
+
+  @Test
+  def testReadSparseWithShuffledPositions(): Unit = {
+    try {
+      val format = new ScalaCsvInputFormat[(Int, Int, Int)](
+        PATH,
+        createTypeInformation[(Int, Int, Int)])
+      format.setFieldDelimiter('|')
+      try {
+        format.setFields(Array(8, 1, 3),
+          Array(classOf[Integer], classOf[Integer], classOf[Integer]))
+        fail("Input sequence should have been rejected.")
+      }
+      catch {
+        case e: IllegalArgumentException => // ignore
+      }
+    }
+    catch {
+      case ex: Exception =>
+        fail("Test failed due to a " + ex.getClass.getName + ": " + ex.getMessage)
+    }
+  }
+
+  private def createTempFile(content: String): FileInputSplit = {
+    val tempFile = File.createTempFile("test_contents", "tmp")
+    tempFile.deleteOnExit()
+    val wrt = new FileWriter(tempFile)
+    wrt.write(content)
+    wrt.close()
+    new FileInputSplit(0, new Path(tempFile.toURI.toString), 0,
+      tempFile.length, Array[String]("localhost"))
+  }
+
+  @Test
+  def testWindowsLineEndRemoval(): Unit = {
+    this.testRemovingTrailingCR("\n", "\n")
+    this.testRemovingTrailingCR("\r\n", "\r\n")
+    this.testRemovingTrailingCR("\r\n", "\n")
+  }
+
+  private def testRemovingTrailingCR(lineBreakerInFile: String, lineBreakerSetup: String) {
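+    // Writes the two parts separated by lineBreakerInFile and parses with
+    // lineBreakerSetup as the record delimiter; a trailing '\r' from Windows
+    // line endings must not leak into the parsed records.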
+    var tempFile: File = null
+    val fileContent = FIRST_PART + lineBreakerInFile + SECOND_PART + lineBreakerInFile
+    try {
+      tempFile = File.createTempFile("CsvInputFormatTest", "tmp")
+      tempFile.deleteOnExit()
+      tempFile.setWritable(true)
+      val wrt = new OutputStreamWriter(new FileOutputStream(tempFile))
+      wrt.write(fileContent)
+      wrt.close()
+      val inputFormat = new ScalaCsvInputFormat[Tuple1[String]](new Path(tempFile.toURI.toString),
+        createTypeInformation[Tuple1[String]])
+      val parameters = new Configuration
+      inputFormat.configure(parameters)
+      inputFormat.setDelimiter(lineBreakerSetup)
+      val splits = inputFormat.createInputSplits(1)
+      inputFormat.open(splits(0))
+      var result = inputFormat.nextRecord(null)
+      assertNotNull("Expecting to not return null", result)
+      assertEquals(FIRST_PART, result._1)
+      result = inputFormat.nextRecord(result)
+      assertNotNull("Expecting to not return null", result)
+      assertEquals(SECOND_PART, result._1)
+    }
+    catch {
+      case t: Throwable =>
+        System.err.println("test failed with exception: " + t.getMessage)
+        t.printStackTrace(System.err)
+        fail("Test erroneous")
+    }
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b8131fa7/flink-scala/src/test/scala/org/apache/flink/api/scala/operators/AggregateOperatorTest.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/test/scala/org/apache/flink/api/scala/operators/AggregateOperatorTest.scala b/flink-scala/src/test/scala/org/apache/flink/api/scala/operators/AggregateOperatorTest.scala
new file mode 100644
index 0000000..ae9fe22
--- /dev/null
+++ b/flink-scala/src/test/scala/org/apache/flink/api/scala/operators/AggregateOperatorTest.scala
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.scala.operators
+
+import org.junit.Assert
+import org.apache.flink.api.common.InvalidProgramException
+import org.apache.flink.api.java.aggregation.Aggregations
+import org.apache.flink.api.java.aggregation.UnsupportedAggregationTypeException
+import org.junit.Test
+
+import org.apache.flink.api.scala._
+
+class AggregateOperatorTest {
+
+  private final val emptyTupleData = Array[(Int, Long, String, Long, Int)]()
+  private final val tupleTypeInfo = createTypeInformation[(Int, Long, String, Long, Int)]
+  private final val emptyLongData = Array[Long]()
+
+  @Test
+  def testFieldsAggregate(): Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+
+    val tupleDs = env.fromCollection(emptyTupleData)
+
+    // should work
+    try {
+      tupleDs.aggregate(Aggregations.SUM, 1)
+    } catch {
+      case e: Exception => Assert.fail()
+    }
+
+    // should not work: index out of bounds
+    try {
+      tupleDs.aggregate(Aggregations.SUM, 10)
+      Assert.fail()
+    } catch {
+      case iae: IllegalArgumentException =>
+      case e: Exception => Assert.fail()
+    }
+
+    val longDs = env.fromCollection(emptyLongData)
+
+    // should not work: not applied to tuple DataSet
+    try {
+      longDs.aggregate(Aggregations.MIN, 1)
+      Assert.fail()
+    } catch {
+      case uoe: InvalidProgramException =>
+      case e: Exception => Assert.fail()
+    }
+  }
+
+  @Test
+  def testAggregationTypes(): Unit = {
+    try {
+      val env = ExecutionEnvironment.getExecutionEnvironment
+
+      val tupleDs = env.fromCollection(emptyTupleData)
+
+      tupleDs.aggregate(Aggregations.SUM, 0).aggregate(Aggregations.MIN, 4)
+      tupleDs.aggregate(Aggregations.MIN, 2).aggregate(Aggregations.SUM, 1)
+      try {
+        tupleDs.aggregate(Aggregations.SUM, 2)
+        Assert.fail()
+      } catch {
+        case iae: UnsupportedAggregationTypeException =>
+      }
+    } catch {
+      case e: Exception => {
+        System.err.println(e.getMessage)
+        e.printStackTrace()
+        Assert.fail(e.getMessage)
+      }
+    }
+  }
+
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b8131fa7/flink-scala/src/test/scala/org/apache/flink/api/scala/operators/CoGroupOperatorTest.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/test/scala/org/apache/flink/api/scala/operators/CoGroupOperatorTest.scala b/flink-scala/src/test/scala/org/apache/flink/api/scala/operators/CoGroupOperatorTest.scala
new file mode 100644
index 0000000..3608a50
--- /dev/null
+++ b/flink-scala/src/test/scala/org/apache/flink/api/scala/operators/CoGroupOperatorTest.scala
@@ -0,0 +1,213 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.scala.operators
+
+import java.io.Serializable
+import org.apache.flink.api.common.InvalidProgramException
+import org.junit.Assert
+import org.junit.Ignore
+import org.junit.Test
+import org.apache.flink.api.scala._
+
+class CoGroupOperatorTest {
+
+  private val emptyTupleData = Array[(Int, Long, String, Long, Int)]()
+  private val customTypeData = Array[CustomType](new CustomType())
+
+  @Test
+  def testCoGroupKeyFields1(): Unit =  {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val ds1 = env.fromCollection(emptyTupleData)
+    val ds2 = env.fromCollection(emptyTupleData)
+
+    // Should work
+    try {
+      ds1.coGroup(ds2).where(0).equalTo(0)
+    }
+    catch {
+      case e: Exception => Assert.fail()
+    }
+  }
+
+  @Test(expected = classOf[InvalidProgramException])
+  def testCoGroupKeyFields2(): Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val ds1 = env.fromCollection(emptyTupleData)
+    val ds2 = env.fromCollection(emptyTupleData)
+
+    // Should not work, incompatible key types
+    ds1.coGroup(ds2).where(0).equalTo(2)
+  }
+
+  @Test(expected = classOf[InvalidProgramException])
+  def testCoGroupKeyFields3(): Unit =  {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val ds1 = env.fromCollection(emptyTupleData)
+    val ds2 = env.fromCollection(emptyTupleData)
+
+    // Should not work, incompatible number of key fields
+    ds1.coGroup(ds2).where(0, 1).equalTo(2)
+  }
+
+  @Test(expected = classOf[IllegalArgumentException])
+  def testCoGroupKeyFields4(): Unit =  {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val ds1 = env.fromCollection(emptyTupleData)
+    val ds2 = env.fromCollection(emptyTupleData)
+
+    // Should not work, field position out of range
+    ds1.coGroup(ds2).where(5).equalTo(0)
+  }
+
+  @Test(expected = classOf[IllegalArgumentException])
+  def testCoGroupKeyFields5(): Unit =  {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val ds1 = env.fromCollection(emptyTupleData)
+    val ds2 = env.fromCollection(emptyTupleData)
+
+    // Should not work, negative field position
+    ds1.coGroup(ds2).where(-1).equalTo(-1)
+  }
+
+  @Test(expected = classOf[IllegalArgumentException])
+  def testCoGroupKeyFields6(): Unit = {
+    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+    val ds1 = env.fromCollection(emptyTupleData)
+    val ds2 = env.fromCollection(customTypeData)
+
+    // Should not work, field position key on custom data type
+    ds1.coGroup(ds2).where(5).equalTo(0)
+  }
+
+  @Ignore
+  @Test
+  def testCoGroupKeyExpressions1(): Unit =  {
+    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+    val ds1 = env.fromCollection(customTypeData)
+    val ds2 = env.fromCollection(customTypeData)
+
+    // Should work
+    try {
+//      ds1.coGroup(ds2).where("i").equalTo("i");
+    } catch {
+      case e: Exception => Assert.fail()
+    }
+  }
+
+  @Ignore
+  @Test(expected = classOf[InvalidProgramException])
+  def testCoGroupKeyExpressions2(): Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val ds1 = env.fromCollection(customTypeData)
+    val ds2 = env.fromCollection(customTypeData)
+
+    // should not work, incompatible key types
+//    ds1.coGroup(ds2).where("i").equalTo("s")
+  }
+
+  @Ignore
+  @Test(expected = classOf[InvalidProgramException])
+  def testCoGroupKeyExpressions3(): Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val ds1 = env.fromCollection(customTypeData)
+    val ds2 = env.fromCollection(customTypeData)
+
+    // should not work, incompatible number of keys
+//    ds1.coGroup(ds2).where("i", "s").equalTo("s")
+  }
+
+  @Ignore
+  @Test(expected = classOf[IllegalArgumentException])
+  def testCoGroupKeyExpressions4(): Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val ds1 = env.fromCollection(customTypeData)
+    val ds2 = env.fromCollection(customTypeData)
+
+    // should not work, key non-existent
+//    ds1.coGroup(ds2).where("myNonExistent").equalTo("i")
+  }
+
+  @Test
+  def testCoGroupKeySelectors1(): Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val ds1 = env.fromCollection(customTypeData)
+    val ds2 = env.fromCollection(customTypeData)
+
+    // Should work
+    try {
+      ds1.coGroup(ds2).where { _.l } equalTo { _.l }
+    }
+    catch {
+      case e: Exception => Assert.fail()
+    }
+  }
+
+  @Test
+  def testCoGroupKeyMixing1(): Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val ds1 = env.fromCollection(customTypeData)
+    val ds2 = env.fromCollection(emptyTupleData)
+
+    // Should work
+    try {
+      ds1.coGroup(ds2).where { _.l }.equalTo(3)
+    }
+    catch {
+      case e: Exception => Assert.fail()
+    }
+  }
+
+  @Test
+  def testCoGroupKeyMixing2(): Unit =  {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val ds1 = env.fromCollection(emptyTupleData)
+    val ds2 = env.fromCollection(customTypeData)
+
+    // Should work
+    try {
+      ds1.coGroup(ds2).where(3).equalTo { _.l }
+    }
+    catch {
+      case e: Exception => Assert.fail()
+    }
+  }
+
+  @Test(expected = classOf[InvalidProgramException])
+  def testCoGroupKeyMixing3(): Unit =  {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val ds1 = env.fromCollection(emptyTupleData)
+    val ds2 = env.fromCollection(customTypeData)
+
+    // Should not work, incompatible types
+    ds1.coGroup(ds2).where(2).equalTo { _.l }
+  }
+
+  @Test(expected = classOf[InvalidProgramException])
+  def testCoGroupKeyMixing4(): Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val ds1 = env.fromCollection(emptyTupleData)
+    val ds2 = env.fromCollection(customTypeData)
+
+    // Should not work, more than one field position key
+    ds1.coGroup(ds2).where(1, 3).equalTo { _.l }
+  }
+}
+
+

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b8131fa7/flink-scala/src/test/scala/org/apache/flink/api/scala/operators/CustomType.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/test/scala/org/apache/flink/api/scala/operators/CustomType.scala b/flink-scala/src/test/scala/org/apache/flink/api/scala/operators/CustomType.scala
new file mode 100644
index 0000000..3b82f4c
--- /dev/null
+++ b/flink-scala/src/test/scala/org/apache/flink/api/scala/operators/CustomType.scala
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.scala.operators
+
+import java.io.Serializable
+
+/**
+ * A custom data type that is used by the operator Tests.
+ */
+class CustomType(var i:Int, var l: Long, var s: String) extends Serializable {
+  def this() {
+    this(0, 0, null)
+  }
+
+  override def toString: String = {
+    i + "," + l + "," + s
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b8131fa7/flink-scala/src/test/scala/org/apache/flink/api/scala/operators/DistinctOperatorTest.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/test/scala/org/apache/flink/api/scala/operators/DistinctOperatorTest.scala b/flink-scala/src/test/scala/org/apache/flink/api/scala/operators/DistinctOperatorTest.scala
new file mode 100644
index 0000000..693cd87
--- /dev/null
+++ b/flink-scala/src/test/scala/org/apache/flink/api/scala/operators/DistinctOperatorTest.scala
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.scala.operators
+
+import org.junit.Assert
+import org.apache.flink.api.common.InvalidProgramException
+import org.junit.Test
+
+import org.apache.flink.api.scala._
+
+class DistinctOperatorTest {
+
+  private val emptyTupleData = Array[(Int, Long, String, Long, Int)]()
+  private val customTypeData = Array[CustomType](new CustomType())
+  private val emptyLongData = Array[Long]()
+
+  @Test
+  def testDistinctByKeyFields1(): Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tupleDs = env.fromCollection(emptyTupleData)
+
+    // Should work
+    try {
+      tupleDs.distinct(0)
+    }
+    catch {
+      case e: Exception => Assert.fail()
+    }
+  }
+
+  @Test(expected = classOf[InvalidProgramException])
+  def testDistinctByKeyFields2(): Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val longDs = env.fromCollection(emptyLongData)
+
+    // should not work: distinct on basic type
+    longDs.distinct(0)
+  }
+
+  @Test(expected = classOf[InvalidProgramException])
+  def testDistinctByKeyFields3(): Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val customDs = env.fromCollection(customTypeData)
+
+    // should not work: field position key on custom type
+    customDs.distinct(0)
+  }
+
+  @Test
+  def testDistinctByKeyFields4(): Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tupleDs = env.fromCollection(emptyTupleData)
+
+    // should work
+    tupleDs.distinct()
+  }
+
+  @Test(expected = classOf[InvalidProgramException])
+  def testDistinctByKeyFields5(): Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val customDs = env.fromCollection(customTypeData)
+
+    // should not work: distinct on custom type
+    customDs.distinct()
+  }
+
+  @Test(expected = classOf[IllegalArgumentException])
+  def testDistinctByKeyFields6(): Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tupleDs = env.fromCollection(emptyTupleData)
+
+    // should not work, negative field position key
+    tupleDs.distinct(-1)
+  }
+
+  @Test
+  def testDistinctByKeySelector1(): Unit = {
+    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+    try {
+      val customDs = env.fromCollection(customTypeData)
+      customDs.distinct { _.l }
+    }
+    catch {
+      case e: Exception => Assert.fail()
+    }
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b8131fa7/flink-scala/src/test/scala/org/apache/flink/api/scala/operators/GroupingTest.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/test/scala/org/apache/flink/api/scala/operators/GroupingTest.scala b/flink-scala/src/test/scala/org/apache/flink/api/scala/operators/GroupingTest.scala
new file mode 100644
index 0000000..841fd52
--- /dev/null
+++ b/flink-scala/src/test/scala/org/apache/flink/api/scala/operators/GroupingTest.scala
@@ -0,0 +1,182 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.scala.operators
+
+import org.junit.Assert
+import org.apache.flink.api.common.InvalidProgramException
+import org.apache.flink.api.common.operators.Order
+import org.junit.Ignore
+import org.junit.Test
+
+import org.apache.flink.api.scala._
+
+class GroupingTest {
+
+  private val emptyTupleData = Array[(Int, Long, String, Long, Int)]()
+  private val customTypeData = Array[CustomType](new CustomType())
+  private val emptyLongData = Array[Long]()
+
+  @Test
+  def testGroupByKeyFields1(): Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tupleDs = env.fromCollection(emptyTupleData)
+
+    // should work
+    try {
+      tupleDs.groupBy(0)
+    }
+    catch {
+      case e: Exception => Assert.fail()
+    }
+  }
+
+  @Test(expected = classOf[InvalidProgramException])
+  def testGroupByKeyFields2(): Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val longDs = env.fromCollection(emptyLongData)
+
+    // should not work, grouping on basic type
+    longDs.groupBy(0)
+  }
+
+  @Test(expected = classOf[InvalidProgramException])
+  def testGroupByKeyFields3(): Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val customDs = env.fromCollection(customTypeData)
+
+    // should not work, field position key on custom type
+    customDs.groupBy(0)
+  }
+
+  @Test(expected = classOf[IllegalArgumentException])
+  def testGroupByKeyFields4(): Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tupleDs = env.fromCollection(emptyTupleData)
+
+    // should not work, field position out of range
+    tupleDs.groupBy(5)
+  }
+
+  @Test(expected = classOf[IllegalArgumentException])
+  def testGroupByKeyFields5(): Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tupleDs = env.fromCollection(emptyTupleData)
+
+    // should not work, negative field position
+    tupleDs.groupBy(-1)
+  }
+
+  @Ignore
+  @Test
+  def testGroupByKeyExpressions1(): Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val ds = env.fromCollection(customTypeData)
+
+    // should work
+    try {
+//      ds.groupBy("i");
+    }
+    catch {
+      case e: Exception => Assert.fail()
+    }
+  }
+
+  @Ignore
+  @Test(expected = classOf[UnsupportedOperationException])
+  def testGroupByKeyExpressions2(): Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val longDs = env.fromCollection(emptyLongData)
+
+    // should not work: groups on basic type
+//    longDs.groupBy("l");
+  }
+
+  @Ignore
+  @Test(expected = classOf[InvalidProgramException])
+  def testGroupByKeyExpressions3(): Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val customDs = env.fromCollection(customTypeData)
+
+    // should not work: groups on custom type
+    customDs.groupBy(0)
+  }
+
+  @Ignore
+  @Test(expected = classOf[IllegalArgumentException])
+  def testGroupByKeyExpressions4(): Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val ds = env.fromCollection(customTypeData)
+
+    // should not work, non-existent field
+//    ds.groupBy("myNonExistent");
+  }
+
+  @Test
+  def testGroupByKeySelector1(): Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    try {
+      val customDs = env.fromCollection(customTypeData)
+      customDs.groupBy { _.l }
+    }
+    catch {
+      case e: Exception => Assert.fail()
+    }
+  }
+
+  @Test
+  def testGroupSortKeyFields1(): Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tupleDs = env.fromCollection(emptyTupleData)
+    try {
+      tupleDs.groupBy(0).sortGroup(0, Order.ASCENDING)
+    }
+    catch {
+      case e: Exception => Assert.fail()
+    }
+  }
+
+  @Test(expected = classOf[IllegalArgumentException])
+  def testGroupSortKeyFields2(): Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tupleDs = env.fromCollection(emptyTupleData)
+
+    // should not work, field position out of range
+    tupleDs.groupBy(0).sortGroup(5, Order.ASCENDING)
+  }
+
+  @Test(expected = classOf[InvalidProgramException])
+  def testGroupSortKeyFields3(): Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val longDs = env.fromCollection(emptyLongData)
+    longDs.groupBy { x: Long => x }.sortGroup(0, Order.ASCENDING)
+  }
+
+  @Test
+  def testChainedGroupSortKeyFields(): Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tupleDs = env.fromCollection(emptyTupleData)
+    try {
+      tupleDs.groupBy(0).sortGroup(0, Order.ASCENDING).sortGroup(2, Order.DESCENDING)
+    }
+    catch {
+      case e: Exception => Assert.fail()
+    }
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b8131fa7/flink-scala/src/test/scala/org/apache/flink/api/scala/operators/JoinOperatorTest.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/test/scala/org/apache/flink/api/scala/operators/JoinOperatorTest.scala b/flink-scala/src/test/scala/org/apache/flink/api/scala/operators/JoinOperatorTest.scala
new file mode 100644
index 0000000..fff9857
--- /dev/null
+++ b/flink-scala/src/test/scala/org/apache/flink/api/scala/operators/JoinOperatorTest.scala
@@ -0,0 +1,200 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.scala.operators
+
+import org.junit.Assert
+import org.apache.flink.api.common.InvalidProgramException
+import org.junit.Ignore
+import org.junit.Test
+
+import org.apache.flink.api.scala._
+
+class JoinOperatorTest {
+
+  private val emptyTupleData = Array[(Int, Long, String, Long, Int)]()
+  private val customTypeData = Array[CustomType](new CustomType())
+  private val emptyLongData = Array[Long]()
+
+  @Test
+  def testJoinKeyFields1(): Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val ds1 = env.fromCollection(emptyTupleData)
+    val ds2 = env.fromCollection(emptyTupleData)
+    try {
+      ds1.join(ds2).where(0).equalTo(0)
+    }
+    catch {
+      case e: Exception => Assert.fail()
+    }
+  }
+
+  @Test(expected = classOf[InvalidProgramException])
+  def testJoinKeyFields2(): Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val ds1 = env.fromCollection(emptyTupleData)
+    val ds2 = env.fromCollection(emptyTupleData)
+    ds1.join(ds2).where(0).equalTo(2)
+  }
+
+  @Test(expected = classOf[InvalidProgramException])
+  def testJoinKeyFields3(): Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val ds1 = env.fromCollection(emptyTupleData)
+    val ds2 = env.fromCollection(emptyTupleData)
+    ds1.join(ds2).where(0, 1).equalTo(2)
+  }
+
+  @Test(expected = classOf[IllegalArgumentException])
+  def testJoinKeyFields4(): Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val ds1 = env.fromCollection(emptyTupleData)
+    val ds2 = env.fromCollection(emptyTupleData)
+    ds1.join(ds2).where(5).equalTo(0)
+  }
+
+  @Test(expected = classOf[IllegalArgumentException])
+  def testJoinKeyFields5(): Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val ds1 = env.fromCollection(emptyTupleData)
+    val ds2 = env.fromCollection(emptyTupleData)
+    ds1.join(ds2).where(-1).equalTo(-1)
+  }
+
+  @Test(expected = classOf[IllegalArgumentException])
+  def testJoinKeyFields6(): Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val ds1 = env.fromCollection(emptyTupleData)
+    val ds2 = env.fromCollection(customTypeData)
+    ds1.join(ds2).where(5).equalTo(0)
+  }
+
+  @Ignore
+  @Test
+  def testJoinKeyExpressions1(): Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val ds1 = env.fromCollection(customTypeData)
+    val ds2 = env.fromCollection(customTypeData)
+
+    // should work
+    try {
+//      ds1.join(ds2).where("i").equalTo("i")
+    }
+    catch {
+      case e: Exception => Assert.fail()
+    }
+  }
+
+  @Ignore
+  @Test(expected = classOf[InvalidProgramException])
+  def testJoinKeyExpressions2(): Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val ds1 = env.fromCollection(customTypeData)
+    val ds2 = env.fromCollection(customTypeData)
+
+    // should not work, incompatible join key types
+//    ds1.join(ds2).where("i").equalTo("s")
+  }
+
+  @Ignore
+  @Test(expected = classOf[InvalidProgramException])
+  def testJoinKeyExpressions3(): Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val ds1 = env.fromCollection(customTypeData)
+    val ds2 = env.fromCollection(customTypeData)
+
+    // should not work, incompatible number of keys
+//    ds1.join(ds2).where("i", "s").equalTo("i")
+  }
+
+  @Ignore
+  @Test(expected = classOf[IllegalArgumentException])
+  def testJoinKeyExpressions4(): Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val ds1 = env.fromCollection(customTypeData)
+    val ds2 = env.fromCollection(customTypeData)
+
+    // should not work, join key non-existent
+//    ds1.join(ds2).where("myNonExistent").equalTo("i")
+  }
+
+  @Test
+  def testJoinKeySelectors1(): Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val ds1 = env.fromCollection(customTypeData)
+    val ds2 = env.fromCollection(customTypeData)
+
+    // should work
+    try {
+      ds1.join(ds2).where { _.l } equalTo { _.l }
+    }
+    catch {
+      case e: Exception => Assert.fail()
+    }
+  }
+
+  @Test
+  def testJoinKeyMixing1(): Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val ds1 = env.fromCollection(customTypeData)
+    val ds2 = env.fromCollection(emptyTupleData)
+
+    // should work
+    try {
+      ds1.join(ds2).where { _.l }.equalTo(3)
+    }
+    catch {
+      case e: Exception => Assert.fail()
+    }
+  }
+
+  @Test
+  def testJoinKeyMixing2(): Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val ds1 = env.fromCollection(emptyTupleData)
+    val ds2 = env.fromCollection(customTypeData)
+
+    // should work
+    try {
+      ds1.join(ds2).where(3).equalTo { _.l }
+    }
+    catch {
+      case e: Exception => Assert.fail()
+    }
+  }
+
+  @Test(expected = classOf[InvalidProgramException])
+  def testJoinKeyMixing3(): Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val ds1 = env.fromCollection(emptyTupleData)
+    val ds2 = env.fromCollection(customTypeData)
+
+    // should not work, incompatible types
+    ds1.join(ds2).where(2).equalTo { _.l }
+  }
+
+  @Test(expected = classOf[InvalidProgramException])
+  def testJoinKeyMixing4(): Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val ds1 = env.fromCollection(emptyTupleData)
+    val ds2 = env.fromCollection(customTypeData)
+
+    // should not work, more than one field position key
+    ds1.join(ds2).where(1, 3).equalTo { _.l }
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b8131fa7/flink-scala/src/test/scala/org/apache/flink/api/scala/operators/translation/AggregateTranslationTest.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/test/scala/org/apache/flink/api/scala/operators/translation/AggregateTranslationTest.scala b/flink-scala/src/test/scala/org/apache/flink/api/scala/operators/translation/AggregateTranslationTest.scala
new file mode 100644
index 0000000..7501f82
--- /dev/null
+++ b/flink-scala/src/test/scala/org/apache/flink/api/scala/operators/translation/AggregateTranslationTest.scala
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.scala.operators.translation
+
+import org.apache.flink.api.common.Plan
+import org.apache.flink.api.common.operators.base.{GenericDataSourceBase, GroupReduceOperatorBase}
+
+import org.apache.flink.api.java.aggregation.Aggregations
+import org.apache.flink.api.scala._
+import org.junit.Assert.{assertEquals, assertTrue, fail}
+import org.junit.Test
+
+class AggregateTranslationTest {
+  @Test
+  def translateAggregate(): Unit =  {
+    try {
+      val DOP = 8
+
+      val env = ExecutionEnvironment.createLocalEnvironment(DOP)
+
+      val initialData = env.fromElements((3.141592, "foobar", 77L))
+
+      initialData.groupBy(0).aggregate(Aggregations.MIN, 1).aggregate(Aggregations.SUM, 2).print()
+
+      val p: Plan = env.createProgramPlan()
+      val sink = p.getDataSinks.iterator.next
+
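+      // The two chained aggregations translate into a single combinable GroupReduce.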
+      val reducer = sink.getInput.asInstanceOf[GroupReduceOperatorBase[_, _, _]]
+
+      assertEquals(1, reducer.getKeyColumns(0).length)
+      assertEquals(0, reducer.getKeyColumns(0)(0))
+      assertEquals(-1, reducer.getDegreeOfParallelism)
+      assertTrue(reducer.isCombinable)
+      assertTrue(reducer.getInput.isInstanceOf[GenericDataSourceBase[_, _]])
+    }
+    catch {
+      case e: Exception => {
+        System.err.println(e.getMessage)
+        e.printStackTrace()
+        fail("Test caused an error: " + e.getMessage)
+      }
+    }
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b8131fa7/flink-scala/src/test/scala/org/apache/flink/api/scala/operators/translation/DeltaIterationTranslationTest.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/test/scala/org/apache/flink/api/scala/operators/translation/DeltaIterationTranslationTest.scala b/flink-scala/src/test/scala/org/apache/flink/api/scala/operators/translation/DeltaIterationTranslationTest.scala
new file mode 100644
index 0000000..5631cbb
--- /dev/null
+++ b/flink-scala/src/test/scala/org/apache/flink/api/scala/operators/translation/DeltaIterationTranslationTest.scala
@@ -0,0 +1,250 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.scala.operators.translation
+
+import org.apache.flink.api.common.functions.{RichCoGroupFunction, RichMapFunction,
+  RichJoinFunction}
+import org.apache.flink.api.java.operators.translation.WrappingFunction
+import org.junit.Assert.assertArrayEquals
+import org.junit.Assert.assertEquals
+import org.junit.Assert.fail
+import org.apache.flink.api.common.{InvalidProgramException, Plan}
+import org.apache.flink.api.common.aggregators.LongSumAggregator
+import org.apache.flink.api.common.operators.base.DeltaIterationBase
+import org.apache.flink.api.common.operators.base.GenericDataSinkBase
+import org.apache.flink.api.common.operators.base.JoinOperatorBase
+import org.apache.flink.api.common.operators.base.MapOperatorBase
+import org.junit.Test
+
+import org.apache.flink.util.Collector
+
+import org.apache.flink.api.scala._
+
+class DeltaIterationTranslationTest {
+
+  @Test
+  def testCorrectTranslation(): Unit = {
+    try {
+      val JOB_NAME = "Test JobName"
+      val ITERATION_NAME = "Test Name"
+      val BEFORE_NEXT_WORKSET_MAP = "Some Mapper"
+      val AGGREGATOR_NAME = "AggregatorName"
+      val ITERATION_KEYS = Array(2)
+      val NUM_ITERATIONS = 13
+      val DEFAULT_DOP = 133
+      val ITERATION_DOP = 77
+
+      val env = ExecutionEnvironment.getExecutionEnvironment
+      env.setDegreeOfParallelism(DEFAULT_DOP)
+
+      val initialSolutionSet = env.fromElements((3.44, 5L, "abc"))
+      val initialWorkSet = env.fromElements((1.23, "abc"))
+
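+      // The step function passed to iterateDelta receives (solution set, workset)
+      // and must return (solution set delta, next workset).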
+      val result = initialSolutionSet.iterateDelta(initialWorkSet, NUM_ITERATIONS, ITERATION_KEYS) {
+        (s, ws) =>
+          val wsSelfJoin = ws.map(new IdentityMapper[(Double, String)]())
+            .join(ws).where(1).equalTo(1) { (l, r) => Some(l) }
+
+          val joined = wsSelfJoin.join(s).where(1).equalTo(2).apply(new SolutionWorksetJoin)
+          (joined, joined.map(new NextWorksetMapper).name(BEFORE_NEXT_WORKSET_MAP))
+      }
+      result.name(ITERATION_NAME)
+        .setParallelism(ITERATION_DOP)
+        .registerAggregator(AGGREGATOR_NAME, new LongSumAggregator)
+
+      result.print()
+      result.writeAsText("/dev/null")
+
+      val p: Plan = env.createProgramPlan(JOB_NAME)
+      assertEquals(JOB_NAME, p.getJobName)
+      assertEquals(DEFAULT_DOP, p.getDefaultParallelism)
+      var sink1: GenericDataSinkBase[_] = null
+      var sink2: GenericDataSinkBase[_] = null
+      val sinks = p.getDataSinks.iterator
+      sink1 = sinks.next
+      sink2 = sinks.next
+
+      val iteration: DeltaIterationBase[_, _] =
+        sink1.getInput.asInstanceOf[DeltaIterationBase[_,_]]
+
+      assertEquals(iteration, sink2.getInput)
+      assertEquals(NUM_ITERATIONS, iteration.getMaximumNumberOfIterations)
+      assertArrayEquals(ITERATION_KEYS, iteration.getSolutionSetKeyFields)
+      assertEquals(ITERATION_DOP, iteration.getDegreeOfParallelism)
+      assertEquals(ITERATION_NAME, iteration.getName)
+
+      val nextWorksetMapper: MapOperatorBase[_, _, _] =
+        iteration.getNextWorkset.asInstanceOf[MapOperatorBase[_, _, _]]
+      val solutionSetJoin: JoinOperatorBase[_, _, _, _] =
+        iteration.getSolutionSetDelta.asInstanceOf[JoinOperatorBase[_, _, _, _]]
+      val worksetSelfJoin: JoinOperatorBase[_, _, _, _] =
+        solutionSetJoin.getFirstInput.asInstanceOf[JoinOperatorBase[_, _, _, _]]
+      val worksetMapper: MapOperatorBase[_, _, _] =
+        worksetSelfJoin.getFirstInput.asInstanceOf[MapOperatorBase[_, _, _]]
+
+      assertEquals(classOf[IdentityMapper[_]], worksetMapper.getUserCodeWrapper.getUserCodeClass)
+
+      assertEquals(classOf[NextWorksetMapper],
+        nextWorksetMapper.getUserCodeWrapper.getUserCodeClass)
+
+      if (solutionSetJoin.getUserCodeWrapper.getUserCodeObject.isInstanceOf[WrappingFunction[_]]) {
+        val wf: WrappingFunction[_] = solutionSetJoin.getUserCodeWrapper.getUserCodeObject
+          .asInstanceOf[WrappingFunction[_]]
+        assertEquals(classOf[SolutionWorksetJoin],
+          wf.getWrappedFunction.getClass)
+      }
+      else {
+        assertEquals(classOf[SolutionWorksetJoin],
+          solutionSetJoin.getUserCodeWrapper.getUserCodeClass)
+      }
+
+      assertEquals(BEFORE_NEXT_WORKSET_MAP, nextWorksetMapper.getName)
+      assertEquals(AGGREGATOR_NAME, iteration.getAggregators.getAllRegisteredAggregators.iterator
+        .next.getName)
+    }
+    catch {
+      case e: Exception => {
+        System.err.println(e.getMessage)
+        e.printStackTrace()
+        fail(e.getMessage)
+      }
+    }
+  }
+
+  @Test
+  def testRejectWhenSolutionSetKeysDontMatchJoin(): Unit = {
+    try {
+      val env = ExecutionEnvironment.getExecutionEnvironment
+
+      val initialSolutionSet = env.fromElements((3.44, 5L, "abc"))
+      val initialWorkSet = env.fromElements((1.23, "abc"))
+
+      val iteration = initialSolutionSet.iterateDelta(initialWorkSet, 10, Array(0)) {
+        (s, ws) =>
+          try {
+            ws.join(s).where(1).equalTo(2)
+            fail("Accepted invalid program.")
+          } catch {
+            case e: InvalidProgramException => // all good
+          }
+          try {
+            s.join(ws).where(2).equalTo(1)
+            fail("Accepted invalid program.")
+          } catch {
+            case e: InvalidProgramException => // all good
+          }
+          (s, ws)
+      }
+    } catch {
+      case e: Exception => {
+        System.err.println(e.getMessage)
+        e.printStackTrace()
+        fail(e.getMessage)
+      }
+    }
+  }
+
+  @Test
+  def testRejectWhenSolutionSetKeysDontMatchCoGroup(): Unit = {
+    try {
+      val env = ExecutionEnvironment.getExecutionEnvironment
+
+      val initialSolutionSet = env.fromElements((3.44, 5L, "abc"))
+      val initialWorkSet = env.fromElements((1.23, "abc"))
+
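+      // Same check as for join above: the solution set key is field 0, so these
+      // coGroup key choices must be rejected.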
+      val iteration = initialSolutionSet.iterateDelta(initialWorkSet, 10, Array(0)) {
+        (s, ws) =>
+          try {
+            ws.coGroup(s).where(1).equalTo(2)
+            fail("Accepted invalid program.")
+          } catch {
+            case e: InvalidProgramException => // all good
+          }
+          try {
+            s.coGroup(ws).where(2).equalTo(1)
+            fail("Accepted invalid program.")
+          } catch {
+            case e: InvalidProgramException => // all good
+          }
+          (s, ws)
+      }
+    } catch {
+      case e: Exception => {
+        System.err.println(e.getMessage)
+        e.printStackTrace()
+        fail(e.getMessage)
+      }
+    }
+  }
+}
+
+class SolutionWorksetJoin
+  extends RichJoinFunction[(Double, String), (Double, Long, String), (Double, Long, String)] {
+  def join(first: (Double, String), second: (Double, Long, String)): (Double, Long, String) = {
+    null // stub: only the plan translation is tested, the result is never used
+  }
+}
+
+class NextWorksetMapper extends RichMapFunction[(Double, Long, String), (Double, String)] {
+  def map(value: (Double, Long, String)): (Double, String) = {
+    null // stub: only the plan translation is tested
+  }
+}
+
+class IdentityMapper[T] extends RichMapFunction[T, T] {
+  def map(value: T): T = {
+    value
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b8131fa7/flink-scala/src/test/scala/org/apache/flink/api/scala/operators/translation/DistinctTranslationTest.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/test/scala/org/apache/flink/api/scala/operators/translation/DistinctTranslationTest.scala b/flink-scala/src/test/scala/org/apache/flink/api/scala/operators/translation/DistinctTranslationTest.scala
new file mode 100644
index 0000000..4ba9652
--- /dev/null
+++ b/flink-scala/src/test/scala/org/apache/flink/api/scala/operators/translation/DistinctTranslationTest.scala
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.scala.operators.translation
+
+import org.apache.flink.api.common.operators.base.GroupReduceOperatorBase
+import org.junit.Assert
+import org.junit.Test
+
+import org.apache.flink.api.scala._
+
+class DistinctTranslationTest {
+  @Test
+  def testCombinable(): Unit = {
+    try {
+      val env = ExecutionEnvironment.getExecutionEnvironment
+      val input = env.fromElements("1", "2", "1", "3")
+
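+      // distinct with a key extractor translates to a GroupReduce, which must be
+      // marked combinable so duplicates can be eliminated before the shuffle.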
+      val op = input.distinct { x => x }
+      op.print()
+
+      val p = env.createProgramPlan()
+
+      val reduceOp =
+        p.getDataSinks.iterator.next.getInput.asInstanceOf[GroupReduceOperatorBase[_, _, _]]
+
+      Assert.assertTrue(reduceOp.isCombinable)
+    }
+    catch {
+      case e: Exception => {
+        e.printStackTrace()
+        Assert.fail(e.getMessage)
+      }
+    }
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b8131fa7/flink-scala/src/test/scala/org/apache/flink/api/scala/operators/translation/ReduceTranslationTest.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/test/scala/org/apache/flink/api/scala/operators/translation/ReduceTranslationTest.scala b/flink-scala/src/test/scala/org/apache/flink/api/scala/operators/translation/ReduceTranslationTest.scala
new file mode 100644
index 0000000..36aa4f2
--- /dev/null
+++ b/flink-scala/src/test/scala/org/apache/flink/api/scala/operators/translation/ReduceTranslationTest.scala
@@ -0,0 +1,136 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.scala.operators.translation
+
+import org.apache.flink.api.java.operators.translation.{KeyExtractingMapper,
+  PlanUnwrappingReduceOperator}
+import org.apache.flink.api.java.typeutils.{TupleTypeInfo, BasicTypeInfo}
+import org.junit.Assert._
+import org.apache.flink.api.common.operators.base.GenericDataSinkBase
+import org.apache.flink.api.common.operators.base.GenericDataSourceBase
+import org.apache.flink.api.common.operators.base.MapOperatorBase
+import org.apache.flink.api.common.operators.base.ReduceOperatorBase
+import org.junit.Test
+
+import org.apache.flink.api.scala._
+
+class ReduceTranslationTest {
+  @Test
+  def translateNonGroupedReduce(): Unit = {
+    try {
+      val DOP = 8
+      val env = ExecutionEnvironment.createLocalEnvironment(DOP)
+
+      val initialData = env.fromElements((3.141592, "foobar", 77L)).setParallelism(1)
+
+      initialData reduce { (v1, v2) => v1 } print()
+
+      val p = env.createProgramPlan()
+      val sink: GenericDataSinkBase[_] = p.getDataSinks.iterator.next
+      val reducer: ReduceOperatorBase[_, _] = sink.getInput.asInstanceOf[ReduceOperatorBase[_, _]]
+
+      assertEquals(initialData.set.getType, reducer.getOperatorInfo.getInputType)
+      assertEquals(initialData.set.getType, reducer.getOperatorInfo.getOutputType)
+      assertTrue(reducer.getKeyColumns(0) == null || reducer.getKeyColumns(0).length == 0)
+      assertTrue(reducer.getDegreeOfParallelism == 1 || reducer.getDegreeOfParallelism == -1)
+      assertTrue(reducer.getInput.isInstanceOf[GenericDataSourceBase[_, _]])
+    }
+    catch {
+      case e: Exception => {
+        System.err.println(e.getMessage)
+        e.printStackTrace()
+        fail("Test caused an error: " + e.getMessage)
+      }
+    }
+  }
+
+  @Test
+  def translateGroupedReduceNoMapper(): Unit = {
+    try {
+      val DOP: Int = 8
+      val env = ExecutionEnvironment.createLocalEnvironment(DOP)
+
+      val initialData = env.fromElements((3.141592, "foobar", 77L)).setParallelism(1)
+
+      initialData.groupBy(2) reduce { (v1, v2) => v1 } print()
+
+      val p = env.createProgramPlan()
+
+      val sink: GenericDataSinkBase[_] = p.getDataSinks.iterator.next
+      val reducer: ReduceOperatorBase[_, _] = sink.getInput.asInstanceOf[ReduceOperatorBase[_, _]]
+      assertEquals(initialData.set.getType, reducer.getOperatorInfo.getInputType)
+      assertEquals(initialData.set.getType, reducer.getOperatorInfo.getOutputType)
+      assertTrue(reducer.getDegreeOfParallelism == DOP || reducer.getDegreeOfParallelism == -1)
+      assertArrayEquals(Array[Int](2), reducer.getKeyColumns(0))
+      assertTrue(reducer.getInput.isInstanceOf[GenericDataSourceBase[_, _]])
+    }
+    catch {
+      case e: Exception => {
+        System.err.println(e.getMessage)
+        e.printStackTrace()
+        fail("Test caused an error: " + e.getMessage)
+      }
+    }
+  }
+
+  @Test
+  def translateGroupedReduceWithKeyExtractor(): Unit = {
+    try {
+      val DOP: Int = 8
+      val env = ExecutionEnvironment.createLocalEnvironment(DOP)
+
+      val initialData = env.fromElements((3.141592, "foobar", 77L)).setParallelism(1)
+
+      initialData.groupBy { _._2 }.reduce { (v1, v2) => v1 }.setParallelism(4).print()
+
+      val p = env.createProgramPlan()
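+      // Grouping by a key extractor is translated into three chained operators,
+      // traversed here from the sink backwards:
+      // KeyExtractingMapper -> PlanUnwrappingReduceOperator -> key-removing mapper.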
+      val sink: GenericDataSinkBase[_] = p.getDataSinks.iterator.next
+      val keyProjector: MapOperatorBase[_, _, _] = sink.getInput.asInstanceOf[MapOperatorBase[_,
+        _, _]]
+      val reducer: PlanUnwrappingReduceOperator[_, _] = keyProjector.getInput
+        .asInstanceOf[PlanUnwrappingReduceOperator[_, _]]
+      val keyExtractor: MapOperatorBase[_, _, _] = reducer.getInput
+        .asInstanceOf[MapOperatorBase[_, _, _]]
+      assertEquals(1, keyExtractor.getDegreeOfParallelism)
+      assertEquals(4, reducer.getDegreeOfParallelism)
+      assertEquals(4, keyProjector.getDegreeOfParallelism)
+      val keyValueInfo = new TupleTypeInfo(
+        BasicTypeInfo.STRING_TYPE_INFO,
+        createTypeInformation[(Double, String, Long)])
+      assertEquals(initialData.set.getType, keyExtractor.getOperatorInfo.getInputType)
+      assertEquals(keyValueInfo, keyExtractor.getOperatorInfo.getOutputType)
+      assertEquals(keyValueInfo, reducer.getOperatorInfo.getInputType)
+      assertEquals(keyValueInfo, reducer.getOperatorInfo.getOutputType)
+      assertEquals(keyValueInfo, keyProjector.getOperatorInfo.getInputType)
+      assertEquals(initialData.set.getType, keyProjector.getOperatorInfo.getOutputType)
+      assertEquals(classOf[KeyExtractingMapper[_, _]],
+        keyExtractor.getUserCodeWrapper.getUserCodeClass)
+      assertTrue(keyExtractor.getInput.isInstanceOf[GenericDataSourceBase[_, _]])
+    }
+    catch {
+      case e: Exception => {
+        System.err.println(e.getMessage)
+        e.printStackTrace()
+        fail("Test caused an error: " + e.getMessage)
+      }
+    }
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b8131fa7/flink-scala/src/test/scala/org/apache/flink/api/scala/runtime/GenericPairComparatorTest.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/test/scala/org/apache/flink/api/scala/runtime/GenericPairComparatorTest.scala b/flink-scala/src/test/scala/org/apache/flink/api/scala/runtime/GenericPairComparatorTest.scala
new file mode 100644
index 0000000..95a1b77
--- /dev/null
+++ b/flink-scala/src/test/scala/org/apache/flink/api/scala/runtime/GenericPairComparatorTest.scala
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.scala.runtime
+
+import org.apache.flink.api.common.typeutils.{TypeComparator, TypeSerializer}
+import org.apache.flink.api.common.typeutils.base.{
+  DoubleComparator, DoubleSerializer, IntComparator, IntSerializer}
+
+import org.apache.flink.api.java.typeutils.runtime.{GenericPairComparator, TupleComparator}
+import org.apache.flink.api.scala.runtime.tuple.base.PairComparatorTestBase
+import org.apache.flink.api.scala.typeutils.ScalaTupleComparator
+
+class GenericPairComparatorTest
+  extends PairComparatorTestBase[(Int, String, Double), (Int, Float, Long, Double)] {
+
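+  // Compares key fields (0, 2) of the first tuple type against key fields (0, 3)
+  // of the second; both projections are (Int, Double), so the same element
+  // comparators and serializers can be used on either side.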
+  override protected def createComparator(ascending: Boolean):
+  GenericPairComparator[(Int, String, Double), (Int, Float, Long, Double)] = {
+    val fields1 = Array[Int](0, 2)
+    val fields2 = Array[Int](0, 3)
+
+    val comps1 =
+      Array[TypeComparator[_]](new IntComparator(ascending), new DoubleComparator(ascending))
+    val comps2 =
+      Array[TypeComparator[_]](new IntComparator(ascending), new DoubleComparator(ascending))
+
+    val sers1 =
+      Array[TypeSerializer[_]](IntSerializer.INSTANCE, DoubleSerializer.INSTANCE)
+    val sers2 =
+      Array[TypeSerializer[_]](IntSerializer.INSTANCE, DoubleSerializer.INSTANCE)
+
+    val comp1 = new ScalaTupleComparator[(Int, String, Double)](fields1, comps1, sers1)
+    val comp2 = new ScalaTupleComparator[(Int, Float, Long, Double)](fields2, comps2, sers2)
+
+    new GenericPairComparator[(Int, String, Double), (Int, Float, Long, Double)](comp1, comp2)
+  }
+
+  protected def getSortedTestData:
+  (Array[(Int, String, Double)], Array[(Int, Float, Long, Double)]) = {
+    (dataISD, dataIFLD)
+  }
+
+  private val dataISD = Array(
+    (4, "hello", 20.0),
+    (4, "world", 23.2),
+    (5, "hello", 18.0),
+    (5, "world", 19.2),
+    (6, "hello", 16.0),
+    (6, "world", 17.2),
+    (7, "hello", 14.0),
+    (7,"world", 15.2))
+
+  private val dataIFLD = Array(
+    (4, 0.11f, 14L, 20.0),
+    (4, 0.221f, 15L, 23.2),
+    (5, 0.33f, 15L, 18.0),
+    (5, 0.44f, 20L, 19.2),
+    (6, 0.55f, 20L, 16.0),
+    (6, 0.66f, 29L, 17.2),
+    (7, 0.77f, 29L, 14.0),
+    (7, 0.88f, 34L, 15.2))
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b8131fa7/flink-scala/src/test/scala/org/apache/flink/api/scala/runtime/TupleComparatorILD2Test.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/test/scala/org/apache/flink/api/scala/runtime/TupleComparatorILD2Test.scala b/flink-scala/src/test/scala/org/apache/flink/api/scala/runtime/TupleComparatorILD2Test.scala
new file mode 100644
index 0000000..8e23744
--- /dev/null
+++ b/flink-scala/src/test/scala/org/apache/flink/api/scala/runtime/TupleComparatorILD2Test.scala
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.scala.runtime
+
+import org.apache.flink.api.common.typeutils.TypeComparator
+import org.apache.flink.api.common.typeutils.TypeSerializer
+import org.apache.flink.api.java.typeutils.TupleTypeInfoBase
+import org.apache.flink.api.scala.runtime.tuple.base.TupleComparatorTestBase
+
+import org.apache.flink.api.scala._
+
+
+class TupleComparatorILD2Test extends TupleComparatorTestBase[(Int, Long, Double)] {
+
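+  // Comparator over key fields (0, 1) of an (Int, Long, Double) tuple; the test
+  // base verifies the ordering of getSortedTestData under this comparator.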
+  protected def createComparator(ascending: Boolean): TypeComparator[(Int, Long, Double)] = {
+    val ti = createTypeInformation[(Int, Long, Double)]
+    ti.asInstanceOf[TupleTypeInfoBase[(Int, Long, Double)]]
+      .createComparator(Array(0, 1), Array(ascending, ascending))
+  }
+
+  protected def createSerializer: TypeSerializer[(Int, Long, Double)] = {
+    val ti = createTypeInformation[(Int, Long, Double)]
+    ti.createSerializer()
+  }
+
+  protected def getSortedTestData: Array[(Int, Long, Double)] = {
+    dataILD
+  }
+
+  private val dataILD = Array(
+    (4, 14L, 20.0),
+    (4, 15L, 23.2),
+    (5, 15L, 20.0),
+    (5, 20L, 20.0),
+    (6, 20L, 23.2),
+    (6, 29L, 20.0),
+    (7, 29L, 20.0),
+    (7, 34L, 23.2)
+  )
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b8131fa7/flink-scala/src/test/scala/org/apache/flink/api/scala/runtime/TupleComparatorILD3Test.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/test/scala/org/apache/flink/api/scala/runtime/TupleComparatorILD3Test.scala b/flink-scala/src/test/scala/org/apache/flink/api/scala/runtime/TupleComparatorILD3Test.scala
new file mode 100644
index 0000000..6907165
--- /dev/null
+++ b/flink-scala/src/test/scala/org/apache/flink/api/scala/runtime/TupleComparatorILD3Test.scala
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.scala.runtime
+
+import org.apache.flink.api.common.typeutils.{TypeComparator, TypeSerializer}
+import org.apache.flink.api.java.typeutils.TupleTypeInfoBase
+import org.apache.flink.api.scala._
+import org.apache.flink.api.scala.runtime.tuple.base.TupleComparatorTestBase
+
+
+class TupleComparatorILD3Test extends TupleComparatorTestBase[(Int, Long, Double)] {
+
+  protected def createComparator(ascending: Boolean): TypeComparator[(Int, Long, Double)] = {
+    val ti = createTypeInformation[(Int, Long, Double)]
+    ti.asInstanceOf[TupleTypeInfoBase[(Int, Long, Double)]]
+      .createComparator(Array(0, 1, 2), Array(ascending, ascending, ascending))
+  }
+
+  protected def createSerializer: TypeSerializer[(Int, Long, Double)] = {
+    val ti = createTypeInformation[(Int, Long, Double)]
+    ti.createSerializer()
+  }
+
+  protected def getSortedTestData: Array[(Int, Long, Double)] = {
+    dataILD
+  }
+
+  private val dataILD = Array(
+    (4, 4L, 20.0),
+    (4, 4L, 23.2),
+    (4, 9L, 20.0),
+    (5, 4L, 20.0),
+    (5, 4L, 23.2),
+    (5, 9L, 20.0),
+    (6, 4L, 20.0),
+    (6, 4L, 23.2)
+  )
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b8131fa7/flink-scala/src/test/scala/org/apache/flink/api/scala/runtime/TupleComparatorILDC3Test.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/test/scala/org/apache/flink/api/scala/runtime/TupleComparatorILDC3Test.scala b/flink-scala/src/test/scala/org/apache/flink/api/scala/runtime/TupleComparatorILDC3Test.scala
new file mode 100644
index 0000000..445aaa1
--- /dev/null
+++ b/flink-scala/src/test/scala/org/apache/flink/api/scala/runtime/TupleComparatorILDC3Test.scala
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.scala.runtime
+
+import org.apache.flink.api.common.typeutils.{TypeSerializer, TypeComparator}
+import org.apache.flink.api.java.typeutils.TupleTypeInfoBase
+import org.apache.flink.api.scala._
+import org.apache.flink.api.scala.runtime.tuple.base.TupleComparatorTestBase
+
+
+class TupleComparatorILDC3Test extends TupleComparatorTestBase[(Int, Long, Double)] {
+
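+  // Key order (2, 0, 1): compare the Double first, then the Int, then the Long;
+  // the test data below is sorted accordingly.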
+  protected def createComparator(ascending: Boolean): TypeComparator[(Int, Long, Double)] = {
+    val ti = createTypeInformation[(Int, Long, Double)]
+    ti.asInstanceOf[TupleTypeInfoBase[(Int, Long, Double)]]
+      .createComparator(Array(2, 0, 1), Array(ascending, ascending, ascending))
+  }
+
+  protected def createSerializer: TypeSerializer[(Int, Long, Double)] = {
+    val ti = createTypeInformation[(Int, Long, Double)]
+    ti.createSerializer()
+  }
+
+  protected def getSortedTestData: Array[(Int, Long, Double)] = {
+    dataILD
+  }
+
+  private val dataILD = Array(
+    (4, 4L, 20.0),
+    (5, 1L, 20.0),
+    (5, 2L, 20.0),
+    (5, 10L, 23.0),
+    (5, 19L, 24.0),
+    (5, 20L, 24.0),
+    (5, 24L, 25.0),
+    (5, 25L, 25.0)
+  )
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b8131fa7/flink-scala/src/test/scala/org/apache/flink/api/scala/runtime/TupleComparatorILDX1Test.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/test/scala/org/apache/flink/api/scala/runtime/TupleComparatorILDX1Test.scala b/flink-scala/src/test/scala/org/apache/flink/api/scala/runtime/TupleComparatorILDX1Test.scala
new file mode 100644
index 0000000..518705f
--- /dev/null
+++ b/flink-scala/src/test/scala/org/apache/flink/api/scala/runtime/TupleComparatorILDX1Test.scala
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.scala.runtime
+
+import org.apache.flink.api.common.typeutils.{TypeSerializer, TypeComparator}
+import org.apache.flink.api.java.typeutils.TupleTypeInfoBase
+import org.apache.flink.api.scala.runtime.tuple.base.TupleComparatorTestBase
+
+import org.apache.flink.api.scala._
+
+class TupleComparatorILDX1Test extends TupleComparatorTestBase[(Int, Long, Double)] {
+
+  protected def createComparator(ascending: Boolean): TypeComparator[(Int, Long, Double)] = {
+    val ti = createTypeInformation[(Int, Long, Double)]
+    ti.asInstanceOf[TupleTypeInfoBase[(Int, Long, Double)]]
+      .createComparator(Array(1), Array(ascending))
+  }
+
+  protected def createSerializer: TypeSerializer[(Int, Long, Double)] = {
+    val ti = createTypeInformation[(Int, Long, Double)]
+    ti.createSerializer()
+  }
+
+  protected def getSortedTestData: Array[(Int, Long, Double)] = {
+    dataILD
+  }
+
+  private val dataILD = Array(
+    (4, 4L, 20.0),
+    (4, 5L, 23.2),
+    (4, 9L, 20.0),
+    (4, 10L, 24.0),
+    (4, 19L, 23.2),
+    (4, 20L, 24.0),
+    (4, 24L, 20.0),
+    (4, 25L, 23.2)
+  )
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b8131fa7/flink-scala/src/test/scala/org/apache/flink/api/scala/runtime/TupleComparatorILDXC2Test.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/test/scala/org/apache/flink/api/scala/runtime/TupleComparatorILDXC2Test.scala b/flink-scala/src/test/scala/org/apache/flink/api/scala/runtime/TupleComparatorILDXC2Test.scala
new file mode 100644
index 0000000..0976da8
--- /dev/null
+++ b/flink-scala/src/test/scala/org/apache/flink/api/scala/runtime/TupleComparatorILDXC2Test.scala
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.scala.runtime
+
+import org.apache.flink.api.common.typeutils.{TypeSerializer, TypeComparator}
+import org.apache.flink.api.java.typeutils.TupleTypeInfoBase
+import org.apache.flink.api.scala.runtime.tuple.base.TupleComparatorTestBase
+
+import org.apache.flink.api.scala._
+
+class TupleComparatorILDXC2Test extends TupleComparatorTestBase[(Int, Long, Double)] {
+
+  protected def createComparator(ascending: Boolean): TypeComparator[(Int, Long, Double)] = {
+    val ti = createTypeInformation[(Int, Long, Double)]
+    ti.asInstanceOf[TupleTypeInfoBase[(Int, Long, Double)]]
+      .createComparator(Array(2, 1), Array(ascending, ascending))
+  }
+
+  protected def createSerializer: TypeSerializer[(Int, Long, Double)] = {
+    val ti = createTypeInformation[(Int, Long, Double)]
+    ti.createSerializer()
+  }
+
+  protected def getSortedTestData: Array[(Int, Long, Double)] = {
+    dataILD
+  }
+
+  private val dataILD = Array(
+    (4, 4L, 20.0),
+    (4, 5L, 20.0),
+    (4, 3L, 23.0),
+    (4, 19L, 23.0),
+    (4, 17L, 24.0),
+    (4, 18L, 24.0),
+    (4, 24L, 25.0),
+    (4, 25L, 25.0)
+  )
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b8131fa7/flink-scala/src/test/scala/org/apache/flink/api/scala/runtime/TupleComparatorISD1Test.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/test/scala/org/apache/flink/api/scala/runtime/TupleComparatorISD1Test.scala b/flink-scala/src/test/scala/org/apache/flink/api/scala/runtime/TupleComparatorISD1Test.scala
new file mode 100644
index 0000000..7478701
--- /dev/null
+++ b/flink-scala/src/test/scala/org/apache/flink/api/scala/runtime/TupleComparatorISD1Test.scala
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.scala.runtime
+
+import org.apache.flink.api.common.typeutils.{TypeSerializer, TypeComparator}
+import org.apache.flink.api.java.typeutils.TupleTypeInfoBase
+import org.apache.flink.api.scala._
+import org.apache.flink.api.scala.runtime.tuple.base.TupleComparatorTestBase
+
+class TupleComparatorISD1Test extends TupleComparatorTestBase[(Int, String, Double)] {
+
+  protected def createComparator(ascending: Boolean): TypeComparator[(Int, String, Double)] = {
+    val ti = createTypeInformation[(Int, String, Double)]
+    ti.asInstanceOf[TupleTypeInfoBase[(Int, String, Double)]]
+      .createComparator(Array(0), Array(ascending))
+  }
+
+  protected def createSerializer: TypeSerializer[(Int, String, Double)] = {
+    val ti = createTypeInformation[(Int, String, Double)]
+    ti.createSerializer()
+  }
+
+  protected def getSortedTestData: Array[(Int, String, Double)] = {
+    dataISD
+  }
+
+  private val dataISD = Array(
+    (4, "hello", 20.0),
+    (5, "hello", 23.2),
+    (6, "world", 20.0),
+    (7, "hello", 20.0),
+    (8, "hello", 23.2),
+    (9, "world", 20.0),
+    (10, "hello", 20.0),
+    (11, "hello", 23.2)
+  )
+}
+