You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2014/09/22 14:28:44 UTC
[02/60] Rewrite the Scala API as (somewhat) thin Layer on Java API
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b8131fa7/flink-scala/src/test/scala/org/apache/flink/api/scala/io/CollectionInputFormatTest.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/test/scala/org/apache/flink/api/scala/io/CollectionInputFormatTest.scala b/flink-scala/src/test/scala/org/apache/flink/api/scala/io/CollectionInputFormatTest.scala
new file mode 100644
index 0000000..1535eed
--- /dev/null
+++ b/flink-scala/src/test/scala/org/apache/flink/api/scala/io/CollectionInputFormatTest.scala
@@ -0,0 +1,133 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.scala.io
+
+import org.apache.flink.api.java.io.CollectionInputFormat
+import org.junit.Assert.assertEquals
+import org.junit.Assert.assertNotNull
+import org.junit.Assert.assertTrue
+import org.apache.flink.api.java.typeutils.BasicTypeInfo
+import org.apache.flink.core.io.GenericInputSplit
+import org.junit.Test
+import java.io.ByteArrayInputStream
+import java.io.ByteArrayOutputStream
+import java.io.ObjectInputStream
+import java.io.ObjectOutputStream
+import org.apache.flink.api.scala._
+import scala.collection.JavaConverters._
+
+/**
+ * Minimal element type used by the serialization round-trip test.
+ * Equality and hashing are both defined purely by `id`.
+ */
+class ElementType(val id: Int) {
+  def this() = this(-1)
+
+  // A null or foreign argument falls through to the default case and is never equal.
+  override def equals(obj: Any): Boolean = obj match {
+    case other: ElementType => other.id == this.id
+    case _ => false
+  }
+
+  // Must stay consistent with equals so that equal elements land in the same hash bucket.
+  override def hashCode: Int = id
+}
+
+class CollectionInputFormatTest {
+
+ /**
+  * Round-trips a CollectionInputFormat of custom elements through Java serialization and
+  * checks that the restored format yields the same records as the original one.
+  */
+ @Test
+ def testSerializability(): Unit = {
+
+   val inputCollection = Seq(new ElementType(1), new ElementType(2), new ElementType(3))
+   val info = createTypeInformation[ElementType]
+
+   val inputFormat = new CollectionInputFormat[ElementType](
+     inputCollection.asJava, info.createSerializer())
+
+   val buffer = new ByteArrayOutputStream
+   val out = new ObjectOutputStream(buffer)
+
+   out.writeObject(inputFormat)
+   // ObjectOutputStream buffers internally; close it so that every byte has reached
+   // `buffer` before the array is read back.
+   out.close()
+
+   val in = new ObjectInputStream(new ByteArrayInputStream(buffer.toByteArray))
+   val serializationResult: AnyRef = in.readObject
+   in.close()
+
+   assertNotNull(serializationResult)
+   assertTrue(serializationResult.isInstanceOf[CollectionInputFormat[_]])
+
+   val result = serializationResult.asInstanceOf[CollectionInputFormat[ElementType]]
+   val inputSplit = new GenericInputSplit
+   inputFormat.open(inputSplit)
+   result.open(inputSplit)
+
+   var compared = 0
+   while (!inputFormat.reachedEnd && !result.reachedEnd) {
+     val expectedElement = inputFormat.nextRecord(null)
+     val actualElement = result.nextRecord(null)
+     assertEquals(expectedElement, actualElement)
+     compared += 1
+   }
+
+   // Without these checks a restored format that is empty or shorter than the original
+   // would let the loop above end early and the test pass vacuously.
+   assertTrue(inputFormat.reachedEnd)
+   assertTrue(result.reachedEnd)
+   assertEquals(inputCollection.length, compared)
+ }
+
+ /**
+  * Serializes a CollectionInputFormat of strings, reads it back and verifies that the
+  * restored format returns every line in order and nothing more.
+  */
+ @Test
+ def testSerializabilityStrings(): Unit = {
+   val lines = Seq("To bey or not to be,--that is the question:--",
+     "Whether 'tis nobler in the mind to suffer", "The slings and arrows of outrageous fortune",
+     "Or to take arms against a sea of troubles,", "And by opposing end them?--To die," +
+     "--to sleep,--", "No more; and by a sleep to say we end", "The heartache, " +
+     "and the thousand natural shocks", "That flesh is heir to,--'tis a consummation",
+     "Devoutly to be wish'd. To die,--to sleep;--", "To sleep! perchance to dream:--ay, " +
+     "there's the rub;", "For in that sleep of death what dreams may come,",
+     "When we have shuffled off this mortal coil,", "Must give us pause: there's the respect",
+     "That makes calamity of so long life;", "For who would bear the whips and scorns of time,",
+     "The oppressor's wrong, the proud man's contumely,", "The pangs of despis'd love, " +
+     "the law's delay,", "The insolence of office, and the spurns",
+     "That patient merit of the unworthy takes,", "When he himself might his quietus make",
+     "With a bare bodkin? who would these fardels bear,", "To grunt and sweat under a weary " +
+     "life,", "But that the dread of something after death,--", "The undiscover'd country, " +
+     "from whose bourn", "No traveller returns,--puzzles the will,",
+     "And makes us rather bear those ills we have", "Than fly to others that we know not of?",
+     "Thus conscience does make cowards of us all;", "And thus the native hue of resolution",
+     "Is sicklied o'er with the pale cast of thought;", "And enterprises of great pith and " +
+     "moment,", "With this regard, their currents turn awry,", "And lose the name of action" +
+     ".--Soft you now!", "The fair Ophelia!--Nymph, in thy orisons",
+     "Be all my sins remember'd.")
+
+   val original = new CollectionInputFormat[String](
+     lines.asJava,
+     BasicTypeInfo.STRING_TYPE_INFO.createSerializer)
+
+   // Write the format into an in-memory byte array.
+   val bytes = new ByteArrayOutputStream
+   val objectOut = new ObjectOutputStream(bytes)
+   objectOut.writeObject(original)
+   objectOut.close()
+
+   // Read it back from those bytes.
+   val objectIn = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
+   val restored: AnyRef = objectIn.readObject
+   assertTrue(restored.isInstanceOf[CollectionInputFormat[_]])
+
+   val format = restored.asInstanceOf[CollectionInputFormat[String]]
+   format.open(new GenericInputSplit)
+
+   var position: Int = 0
+   while (!format.reachedEnd) {
+     assertEquals(lines(position), format.nextRecord(""))
+     position += 1
+   }
+   assertEquals(lines.length, position)
+ }
+}
+
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b8131fa7/flink-scala/src/test/scala/org/apache/flink/api/scala/io/CsvInputFormatTest.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/test/scala/org/apache/flink/api/scala/io/CsvInputFormatTest.scala b/flink-scala/src/test/scala/org/apache/flink/api/scala/io/CsvInputFormatTest.scala
new file mode 100644
index 0000000..e85b259
--- /dev/null
+++ b/flink-scala/src/test/scala/org/apache/flink/api/scala/io/CsvInputFormatTest.scala
@@ -0,0 +1,288 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.scala.io
+
+import org.apache.flink.api.scala.operators.ScalaCsvInputFormat
+import org.junit.Assert.assertEquals
+import org.junit.Assert.assertNotNull
+import org.junit.Assert.assertNull
+import org.junit.Assert.assertTrue
+import org.junit.Assert.fail
+import java.io.File
+import java.io.FileOutputStream
+import java.io.FileWriter
+import java.io.OutputStreamWriter
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.core.fs.FileInputSplit
+import org.apache.flink.core.fs.Path
+import org.junit.Test
+import org.apache.flink.api.scala._
+
+class CsvInputFormatTest {
+
+ private final val PATH: Path = new Path("an/ignored/file/")
+ private final val FIRST_PART: String = "That is the first part"
+ private final val SECOND_PART: String = "That is the second part"
+
+ /**
+  * Parses three '|'-separated string columns from a file whose last line has no trailing
+  * newline, including records with empty fields.
+  */
+ @Test
+ def readStringFields(): Unit = {
+   try {
+     val input = createTempFile("abc|def|ghijk\nabc||hhg\n|||")
+     val csv = new ScalaCsvInputFormat[(String, String, String)](
+       PATH, createTypeInformation[(String, String, String)])
+     csv.setDelimiter("\n")
+     csv.setFieldDelimiter('|')
+     csv.configure(new Configuration)
+     csv.open(input)
+
+     var row: (String, String, String) = null
+
+     // Fetches the next record (handing in the previous one) and compares all three columns.
+     def expectRow(first: String, second: String, third: String): Unit = {
+       row = csv.nextRecord(row)
+       assertNotNull(row)
+       assertEquals(first, row._1)
+       assertEquals(second, row._2)
+       assertEquals(third, row._3)
+     }
+
+     expectRow("abc", "def", "ghijk")
+     expectRow("abc", "", "hhg")
+     expectRow("", "", "")
+
+     // A fourth read must signal the end of the input.
+     row = csv.nextRecord(row)
+     assertNull(row)
+     assertTrue(csv.reachedEnd)
+   }
+   catch {
+     case failure: Exception =>
+       failure.printStackTrace()
+       val description = failure.getClass.getName + ": " + failure.getMessage
+       fail("Test failed due to a " + description)
+   }
+ }
+
+ /**
+  * Same as readStringFields, but the file content ends with a record delimiter; the
+  * trailing newline must not produce an additional record.
+  */
+ @Test
+ def readStringFieldsWithTrailingDelimiters(): Unit = {
+   try {
+     val input = createTempFile("abc|def|ghijk\nabc||hhg\n|||\n")
+     val csv = new ScalaCsvInputFormat[(String, String, String)](
+       PATH, createTypeInformation[(String, String, String)])
+     csv.setDelimiter("\n")
+     csv.setFieldDelimiter('|')
+     csv.configure(new Configuration)
+     csv.open(input)
+
+     var row: (String, String, String) = null
+
+     // Fetches the next record (handing in the previous one) and compares all three columns.
+     def expectRow(first: String, second: String, third: String): Unit = {
+       row = csv.nextRecord(row)
+       assertNotNull(row)
+       assertEquals(first, row._1)
+       assertEquals(second, row._2)
+       assertEquals(third, row._3)
+     }
+
+     expectRow("abc", "def", "ghijk")
+     expectRow("abc", "", "hhg")
+     expectRow("", "", "")
+
+     // Nothing may follow the third record.
+     row = csv.nextRecord(row)
+     assertNull(row)
+     assertTrue(csv.reachedEnd)
+   }
+   catch {
+     case failure: Exception =>
+       failure.printStackTrace()
+       val description = failure.getClass.getName + ": " + failure.getMessage
+       fail("Test failed due to a " + description)
+   }
+ }
+
+ /**
+  * Parses two records of five '|'-separated integer columns; the second line carries an
+  * extra trailing field delimiter.
+  */
+ @Test
+ def testIntegerFields(): Unit = {
+   try {
+     val fileContent = "111|222|333|444|555\n666|777|888|999|000|\n"
+     val split = createTempFile(fileContent)
+     val format = new ScalaCsvInputFormat[(Int, Int, Int, Int, Int)](
+       PATH, createTypeInformation[(Int, Int, Int, Int, Int)])
+     format.setFieldDelimiter('|')
+     format.configure(new Configuration)
+     format.open(split)
+     var result: (Int, Int, Int, Int, Int) = null
+     result = format.nextRecord(result)
+     assertNotNull(result)
+     assertEquals(Integer.valueOf(111), result._1)
+     assertEquals(Integer.valueOf(222), result._2)
+     assertEquals(Integer.valueOf(333), result._3)
+     assertEquals(Integer.valueOf(444), result._4)
+     assertEquals(Integer.valueOf(555), result._5)
+     result = format.nextRecord(result)
+     assertNotNull(result)
+     assertEquals(Integer.valueOf(666), result._1)
+     assertEquals(Integer.valueOf(777), result._2)
+     assertEquals(Integer.valueOf(888), result._3)
+     assertEquals(Integer.valueOf(999), result._4)
+     // The text "000" in the file parses to zero; a plain 0 avoids the obsolete
+     // leading-zero (octal style) integer literal.
+     assertEquals(Integer.valueOf(0), result._5)
+     result = format.nextRecord(result)
+     assertNull(result)
+     assertTrue(format.reachedEnd)
+   }
+   catch {
+     case ex: Exception =>
+       fail("Test failed due to a " + ex.getClass.getName + ": " + ex.getMessage)
+   }
+ }
+
+ /**
+  * Reads only the first two integer columns of each five-column line by declaring a
+  * two-field tuple type.
+  */
+ @Test
+ def testReadFirstN(): Unit = {
+   try {
+     val input = createTempFile("111|222|333|444|555|\n666|777|888|999|000|\n")
+     val csv = new ScalaCsvInputFormat[(Int, Int)](PATH, createTypeInformation[(Int, Int)])
+     csv.setFieldDelimiter('|')
+     csv.configure(new Configuration)
+     csv.open(input)
+
+     var pair: (Int, Int) = null
+
+     // Fetches the next record (handing in the previous one) and compares both columns.
+     def expectPair(first: Int, second: Int): Unit = {
+       pair = csv.nextRecord(pair)
+       assertNotNull(pair)
+       assertEquals(Integer.valueOf(first), pair._1)
+       assertEquals(Integer.valueOf(second), pair._2)
+     }
+
+     expectPair(111, 222)
+     expectPair(666, 777)
+
+     // Both lines consumed: the next read must report the end of the input.
+     pair = csv.nextRecord(pair)
+     assertNull(pair)
+     assertTrue(csv.reachedEnd)
+   }
+   catch {
+     case failure: Exception =>
+       val description = failure.getClass.getName + ": " + failure.getMessage
+       fail("Test failed due to a " + description)
+   }
+ }
+
+ /**
+  * Selects columns 0, 3 and 7 of ten-column lines via `setFields` and checks the values
+  * read from both lines.
+  */
+ @Test
+ def testReadSparseWithPositionSetter(): Unit = {
+   try {
+     val fileContent: String = "111|222|333|444|555|666|777|888|999|000|\n000|999|888|777|666" +
+       "|555|444|333|222|111|"
+     val split = createTempFile(fileContent)
+     val format = new ScalaCsvInputFormat[(Int, Int, Int)](
+       PATH,
+       createTypeInformation[(Int, Int, Int)])
+     format.setFieldDelimiter('|')
+     format.setFields(Array(0, 3, 7), Array(classOf[Integer], classOf[Integer], classOf[Integer]))
+     format.configure(new Configuration)
+     format.open(split)
+     var result: (Int, Int, Int) = null
+     result = format.nextRecord(result)
+     assertNotNull(result)
+     assertEquals(Integer.valueOf(111), result._1)
+     assertEquals(Integer.valueOf(444), result._2)
+     assertEquals(Integer.valueOf(888), result._3)
+     result = format.nextRecord(result)
+     assertNotNull(result)
+     // The text "000" in the file parses to zero; a plain 0 avoids the obsolete
+     // leading-zero (octal style) integer literal.
+     assertEquals(Integer.valueOf(0), result._1)
+     assertEquals(Integer.valueOf(777), result._2)
+     assertEquals(Integer.valueOf(333), result._3)
+     result = format.nextRecord(result)
+     assertNull(result)
+     assertTrue(format.reachedEnd)
+   }
+   catch {
+     case ex: Exception =>
+       fail("Test failed due to a " + ex.getClass.getName + ": " + ex.getMessage)
+   }
+ }
+
+ /**
+  * `setFields` is expected to refuse field positions that are not in ascending order
+  * by throwing an IllegalArgumentException.
+  */
+ @Test
+ def testReadSparseWithShuffledPositions(): Unit = {
+   try {
+     val csv = new ScalaCsvInputFormat[(Int, Int, Int)](
+       PATH,
+       createTypeInformation[(Int, Int, Int)])
+     csv.setFieldDelimiter('|')
+
+     try {
+       csv.setFields(Array(8, 1, 3), Array(classOf[Integer],classOf[Integer],classOf[Integer]))
+       fail("Input sequence should have been rejected.")
+     }
+     catch {
+       // This is the rejection the test is looking for.
+       case _: IllegalArgumentException =>
+     }
+   }
+   catch {
+     case failure: Exception =>
+       val description = failure.getClass.getName + ": " + failure.getMessage
+       fail("Test failed due to a " + description)
+   }
+ }
+
+ /**
+  * Writes `content` to a fresh temporary file (deleted on JVM exit) and returns a single
+  * input split that spans the whole file.
+  *
+  * @param content the text to store in the file
+  * @return a split starting at offset 0 with the length of the written file
+  */
+ private def createTempFile(content: String): FileInputSplit = {
+   val tempFile = File.createTempFile("test_contents", "tmp")
+   tempFile.deleteOnExit()
+   val wrt = new FileWriter(tempFile)
+   try {
+     wrt.write(content)
+   } finally {
+     // Release the file handle even when writing fails.
+     wrt.close()
+   }
+   new FileInputSplit(0, new Path(tempFile.toURI.toString), 0,
+     tempFile.length, Array[String]("localhost"))
+ }
+
+ /**
+  * Runs the trailing-CR check for each combination of line break used in the file and
+  * line break configured on the format.
+  */
+ @Test
+ def testWindowsLineEndRemoval(): Unit = {
+   // (line break written to the file, line break configured as delimiter)
+   val combinations = Seq(("\n", "\n"), ("\r\n", "\r\n"), ("\r\n", "\n"))
+   for ((inFile, configured) <- combinations) {
+     testRemovingTrailingCR(inFile, configured)
+   }
+ }
+
+ /**
+  * Writes two lines separated by `lineBreakerInFile`, reads them back with
+  * `lineBreakerSetup` configured as record delimiter and expects both lines to come back
+  * without any trailing line-break characters.
+  *
+  * @param lineBreakerInFile line break written after each line of the file
+  * @param lineBreakerSetup  record delimiter set on the input format
+  */
+ private def testRemovingTrailingCR(
+     lineBreakerInFile: String,
+     lineBreakerSetup: String): Unit = {
+   val fileContent = FIRST_PART + lineBreakerInFile + SECOND_PART + lineBreakerInFile
+   try {
+     val tempFile = File.createTempFile("CsvInputFormatTest", "tmp")
+     tempFile.deleteOnExit()
+     tempFile.setWritable(true)
+     val wrt = new OutputStreamWriter(new FileOutputStream(tempFile))
+     try {
+       wrt.write(fileContent)
+     } finally {
+       // Release the file handle even when writing fails.
+       wrt.close()
+     }
+     val inputFormat = new ScalaCsvInputFormat[Tuple1[String]](new Path(tempFile.toURI.toString),
+       createTypeInformation[Tuple1[String]])
+     val parameters = new Configuration
+     inputFormat.configure(parameters)
+     inputFormat.setDelimiter(lineBreakerSetup)
+     val splits = inputFormat.createInputSplits(1)
+     inputFormat.open(splits(0))
+     var result = inputFormat.nextRecord(null)
+     assertNotNull("Expecting to not return null", result)
+     assertEquals(FIRST_PART, result._1)
+     result = inputFormat.nextRecord(result)
+     assertNotNull("Expecting to not return null", result)
+     assertEquals(SECOND_PART, result._1)
+   }
+   catch {
+     // Only exceptions are translated into a test failure. Assertion errors raised above
+     // propagate unchanged so their message reaches the test report, and fatal JVM
+     // errors are not intercepted.
+     case e: Exception =>
+       System.err.println("test failed with exception: " + e.getMessage)
+       e.printStackTrace(System.err)
+       fail("Test erroneous")
+   }
+ }
+}
+
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b8131fa7/flink-scala/src/test/scala/org/apache/flink/api/scala/operators/AggregateOperatorTest.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/test/scala/org/apache/flink/api/scala/operators/AggregateOperatorTest.scala b/flink-scala/src/test/scala/org/apache/flink/api/scala/operators/AggregateOperatorTest.scala
new file mode 100644
index 0000000..ae9fe22
--- /dev/null
+++ b/flink-scala/src/test/scala/org/apache/flink/api/scala/operators/AggregateOperatorTest.scala
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.scala.operators
+
+import org.junit.Assert
+import org.apache.flink.api.common.InvalidProgramException
+import org.apache.flink.api.java.aggregation.Aggregations
+import org.apache.flink.api.java.aggregation.UnsupportedAggregationTypeException
+import org.junit.Test
+
+import org.apache.flink.api.scala._
+
+class AggregateOperatorTest {
+
+ private final val emptyTupleData = Array[(Int, Long, String, Long, Int)]()
+ private final val tupleTypeInfo = createTypeInformation[(Int, Long, String, Long, Int)]
+ private final val emptyLongData = Array[Long]()
+
+ /**
+  * Checks which `aggregate` calls are accepted while the program is being built: a valid
+  * tuple field, an out-of-range field position, and a data set that is not made of tuples.
+  */
+ @Test
+ def testFieldsAggregate(): Unit = {
+   val env = ExecutionEnvironment.getExecutionEnvironment
+
+   val tupleDs = env.fromCollection(emptyTupleData)
+
+   // should work
+   try {
+     tupleDs.aggregate(Aggregations.SUM, 1)
+   } catch {
+     case e: Exception => Assert.fail("Aggregation on a valid field was rejected: " + e)
+   }
+
+   // should not work: index out of bounds
+   try {
+     tupleDs.aggregate(Aggregations.SUM, 10)
+     Assert.fail("Aggregation on field position 10 should have been rejected.")
+   } catch {
+     case iae: IllegalArgumentException => // expected
+     case e: Exception => Assert.fail("Expected an IllegalArgumentException but got: " + e)
+   }
+
+   val longDs = env.fromCollection(emptyLongData)
+
+   // should not work: not applied to tuple DataSet
+   try {
+     longDs.aggregate(Aggregations.MIN, 1)
+     Assert.fail("Aggregation on a non-tuple data set should have been rejected.")
+   } catch {
+     case ipe: InvalidProgramException => // expected
+     case e: Exception => Assert.fail("Expected an InvalidProgramException but got: " + e)
+   }
+ }
+
+ /**
+  * Chains aggregations over several fields and expects SUM over the String field
+  * (position 2) to raise an UnsupportedAggregationTypeException.
+  */
+ @Test
+ def testAggregationTypes(): Unit = {
+   try {
+     val tuples = ExecutionEnvironment.getExecutionEnvironment.fromCollection(emptyTupleData)
+
+     // These chained aggregations must be accepted.
+     tuples.aggregate(Aggregations.SUM, 0).aggregate(Aggregations.MIN, 4)
+     tuples.aggregate(Aggregations.MIN, 2).aggregate(Aggregations.SUM, 1)
+
+     try {
+       tuples.aggregate(Aggregations.SUM, 2)
+       Assert.fail()
+     } catch {
+       // This is the rejection the test is looking for.
+       case _: UnsupportedAggregationTypeException =>
+     }
+   } catch {
+     case problem: Exception =>
+       System.err.println(problem.getMessage)
+       problem.printStackTrace()
+       Assert.fail(problem.getMessage)
+   }
+ }
+
+}
+
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b8131fa7/flink-scala/src/test/scala/org/apache/flink/api/scala/operators/CoGroupOperatorTest.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/test/scala/org/apache/flink/api/scala/operators/CoGroupOperatorTest.scala b/flink-scala/src/test/scala/org/apache/flink/api/scala/operators/CoGroupOperatorTest.scala
new file mode 100644
index 0000000..3608a50
--- /dev/null
+++ b/flink-scala/src/test/scala/org/apache/flink/api/scala/operators/CoGroupOperatorTest.scala
@@ -0,0 +1,213 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.scala.operators
+
+import java.io.Serializable
+import org.apache.flink.api.common.InvalidProgramException
+import org.junit.Assert
+import org.junit.Ignore
+import org.junit.Test
+import org.apache.flink.api.scala._
+
+class CoGroupOperatorTest {
+
+ private val emptyTupleData = Array[(Int, Long, String, Long, Int)]()
+ // Single-element fixture of the custom type; never reassigned, hence a val.
+ private val customTypeData = Array[CustomType](new CustomType())
+
+ /** Co-grouping two tuple data sets on field 0 of each side must be accepted. */
+ @Test
+ def testCoGroupKeyFields1(): Unit = {
+   val env = ExecutionEnvironment.getExecutionEnvironment
+   val ds1 = env.fromCollection(emptyTupleData)
+   val ds2 = env.fromCollection(emptyTupleData)
+
+   // Should work
+   try {
+     ds1.coGroup(ds2).where(0).equalTo(0)
+   }
+   catch {
+     // Report the cause instead of failing without any message.
+     case e: Exception => Assert.fail("coGroup on matching key fields was rejected: " + e)
+   }
+ }
+
+ /** Key field 0 (Int) against key field 2 (String): the key types do not match. */
+ @Test(expected = classOf[InvalidProgramException])
+ def testCoGroupKeyFields2(): Unit = {
+   val environment = ExecutionEnvironment.getExecutionEnvironment
+   val left = environment.fromCollection(emptyTupleData)
+   val right = environment.fromCollection(emptyTupleData)
+
+   // Must be rejected because the key types are incompatible.
+   left.coGroup(right).where(0).equalTo(2)
+ }
+
+ /** Two key fields on the left against one on the right: the key counts differ. */
+ @Test(expected = classOf[InvalidProgramException])
+ def testCoGroupKeyFields3(): Unit = {
+   val environment = ExecutionEnvironment.getExecutionEnvironment
+   val left = environment.fromCollection(emptyTupleData)
+   val right = environment.fromCollection(emptyTupleData)
+
+   // Must be rejected because the number of key fields is incompatible.
+   left.coGroup(right).where(0, 1).equalTo(2)
+ }
+
+ /** Position 5 does not exist in the five-field tuple, so it cannot serve as a key. */
+ @Test(expected = classOf[IllegalArgumentException])
+ def testCoGroupKeyFields4(): Unit = {
+   val environment = ExecutionEnvironment.getExecutionEnvironment
+   val left = environment.fromCollection(emptyTupleData)
+   val right = environment.fromCollection(emptyTupleData)
+
+   // Must be rejected because the field position is out of range.
+   left.coGroup(right).where(5).equalTo(0)
+ }
+
+ /** Negative field positions are not valid keys. */
+ @Test(expected = classOf[IllegalArgumentException])
+ def testCoGroupKeyFields5(): Unit = {
+   val environment = ExecutionEnvironment.getExecutionEnvironment
+   val left = environment.fromCollection(emptyTupleData)
+   val right = environment.fromCollection(emptyTupleData)
+
+   // Must be rejected because the field position is negative.
+   left.coGroup(right).where(-1).equalTo(-1)
+ }
+
+ /**
+  * A field position key used against a data set of a custom (non-tuple) type.
+  *
+  * NOTE(review): position 5 is also out of range for the five-field tuple on the left
+  * (compare testCoGroupKeyFields4), so the expected exception may already come from
+  * `where(5)` rather than from the custom-type side -- confirm the intended input.
+  */
+ @Test(expected = classOf[IllegalArgumentException])
+ def testCoGroupKeyFields6(): Unit = {
+   val environment: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+   val tuples = environment.fromCollection(emptyTupleData)
+   val customs = environment.fromCollection(customTypeData)
+
+   // Must be rejected: field position key on a custom data type.
+   tuples.coGroup(customs).where(5).equalTo(0)
+ }
+
+ /** Disabled: the key-expression call is commented out, so only the setup is executed. */
+ @Ignore
+ @Test
+ def testCoGroupKeyExpressions1(): Unit = {
+   val environment: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+   val left = environment.fromCollection(customTypeData)
+   val right = environment.fromCollection(customTypeData)
+
+   // Would be expected to work once enabled.
+   try {
+//     left.coGroup(right).where("i").equalTo("i");
+
+   } catch {
+     case _: Exception => Assert.fail()
+   }
+ }
+
+ /** Disabled: the key-expression call is commented out, so only the setup is executed. */
+ @Ignore
+ @Test(expected = classOf[InvalidProgramException])
+ def testCoGroupKeyExpressions2(): Unit = {
+   val environment = ExecutionEnvironment.getExecutionEnvironment
+   val left = environment.fromCollection(customTypeData)
+   val right = environment.fromCollection(customTypeData)
+
+   // Meant to be rejected: incompatible key types.
+//   left.coGroup(right).where("i").equalTo("s")
+ }
+
+ /** Disabled: the key-expression call is commented out, so only the setup is executed. */
+ @Ignore
+ @Test(expected = classOf[InvalidProgramException])
+ def testCoGroupKeyExpressions3(): Unit = {
+   val environment = ExecutionEnvironment.getExecutionEnvironment
+   val left = environment.fromCollection(customTypeData)
+   val right = environment.fromCollection(customTypeData)
+
+   // Meant to be rejected: incompatible number of keys.
+//   left.coGroup(right).where("i", "s").equalTo("s")
+ }
+
+ /** Disabled: the key-expression call is commented out, so only the setup is executed. */
+ @Ignore
+ @Test(expected = classOf[IllegalArgumentException])
+ def testCoGroupKeyExpressions4(): Unit = {
+   val environment = ExecutionEnvironment.getExecutionEnvironment
+   val left = environment.fromCollection(customTypeData)
+   val right = environment.fromCollection(customTypeData)
+
+   // Meant to be rejected: the key does not exist.
+//   left.coGroup(right).where("myNonExistent").equalTo("i")
+ }
+
+ /** Co-grouping two custom-type data sets with key selector functions must be accepted. */
+ @Test
+ def testCoGroupKeySelectors1(): Unit = {
+   val env = ExecutionEnvironment.getExecutionEnvironment
+   val ds1 = env.fromCollection(customTypeData)
+   val ds2 = env.fromCollection(customTypeData)
+
+   // Should work
+   try {
+     ds1.coGroup(ds2).where { _.l } equalTo { _.l }
+   }
+   catch {
+     // Report the cause instead of failing without any message.
+     case e: Exception => Assert.fail("coGroup with key selectors was rejected: " + e)
+   }
+ }
+
+ /** A key selector on the custom-type side combined with field position 3 on the tuple side. */
+ @Test
+ def testCoGroupKeyMixing1(): Unit = {
+   val env = ExecutionEnvironment.getExecutionEnvironment
+   val ds1 = env.fromCollection(customTypeData)
+   val ds2 = env.fromCollection(emptyTupleData)
+
+   // Should work
+   try {
+     ds1.coGroup(ds2).where { _.l}.equalTo(3)
+   }
+   catch {
+     // Report the cause instead of failing without any message.
+     case e: Exception =>
+       Assert.fail("coGroup mixing a key selector and a field position was rejected: " + e)
+   }
+ }
+
+ /** Field position 3 on the tuple side combined with a key selector on the custom-type side. */
+ @Test
+ def testCoGroupKeyMixing2(): Unit = {
+   val env = ExecutionEnvironment.getExecutionEnvironment
+   val ds1 = env.fromCollection(emptyTupleData)
+   val ds2 = env.fromCollection(customTypeData)
+
+   // Should work
+   try {
+     ds1.coGroup(ds2).where(3).equalTo { _.l }
+   }
+   catch {
+     // Report the cause instead of failing without any message.
+     case e: Exception =>
+       Assert.fail("coGroup mixing a field position and a key selector was rejected: " + e)
+   }
+ }
+
+ /** Tuple field 2 (String) against a selector returning the Long field `l`: types differ. */
+ @Test(expected = classOf[InvalidProgramException])
+ def testCoGroupKeyMixing3(): Unit = {
+   val environment = ExecutionEnvironment.getExecutionEnvironment
+   val tuples = environment.fromCollection(emptyTupleData)
+   val customs = environment.fromCollection(customTypeData)
+
+   // Must be rejected because the key types are incompatible.
+   tuples.coGroup(customs).where(2).equalTo { _.l }
+ }
+
+ /** Two field position keys on the tuple side against a single key selector. */
+ @Test(expected = classOf[InvalidProgramException])
+ def testCoGroupKeyMixing4(): Unit = {
+   val environment = ExecutionEnvironment.getExecutionEnvironment
+   val tuples = environment.fromCollection(emptyTupleData)
+   val customs = environment.fromCollection(customTypeData)
+
+   // Must be rejected: more than one field position key.
+   tuples.coGroup(customs).where(1, 3).equalTo { _.l }
+ }
+}
+
+
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b8131fa7/flink-scala/src/test/scala/org/apache/flink/api/scala/operators/CustomType.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/test/scala/org/apache/flink/api/scala/operators/CustomType.scala b/flink-scala/src/test/scala/org/apache/flink/api/scala/operators/CustomType.scala
new file mode 100644
index 0000000..3b82f4c
--- /dev/null
+++ b/flink-scala/src/test/scala/org/apache/flink/api/scala/operators/CustomType.scala
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.scala.operators
+
+import java.io.Serializable
+
+/**
+ * A custom data type that is used by the operator tests.
+ *
+ * All three fields are mutable; the no-argument constructor sets them to (0, 0, null).
+ */
+class CustomType(var i: Int, var l: Long, var s: String) extends Serializable {
+  def this() = this(0, 0, null)
+
+  /** Renders the fields as "i,l,s"; a null `s` appears as the text "null". */
+  override def toString: String = s"$i,$l,$s"
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b8131fa7/flink-scala/src/test/scala/org/apache/flink/api/scala/operators/DistinctOperatorTest.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/test/scala/org/apache/flink/api/scala/operators/DistinctOperatorTest.scala b/flink-scala/src/test/scala/org/apache/flink/api/scala/operators/DistinctOperatorTest.scala
new file mode 100644
index 0000000..693cd87
--- /dev/null
+++ b/flink-scala/src/test/scala/org/apache/flink/api/scala/operators/DistinctOperatorTest.scala
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.scala.operators
+
+import org.junit.Assert
+import org.apache.flink.api.common.InvalidProgramException
+import org.junit.Test
+
+import org.apache.flink.api.scala._
+
+class DistinctOperatorTest {
+
+ private val emptyTupleData = Array[(Int, Long, String, Long, Int)]()
+ private val customTypeData = Array[CustomType](new CustomType())
+ private val emptyLongData = Array[Long]()
+
+ /** `distinct` on field 0 of a tuple data set must be accepted. */
+ @Test
+ def testDistinctByKeyFields1(): Unit = {
+   val env = ExecutionEnvironment.getExecutionEnvironment
+   val tupleDs = env.fromCollection(emptyTupleData)
+
+   // Should work
+   try {
+     tupleDs.distinct(0)
+   }
+   catch {
+     // Report the cause instead of failing without any message.
+     case e: Exception => Assert.fail("distinct on a valid tuple field was rejected: " + e)
+   }
+ }
+
+ /** A field position key cannot be applied to a data set of plain Long values. */
+ @Test(expected = classOf[InvalidProgramException])
+ def testDistinctByKeyFields2(): Unit = {
+   val longs = ExecutionEnvironment.getExecutionEnvironment.fromCollection(emptyLongData)
+
+   // Must be rejected: distinct with a field position on a basic type.
+   longs.distinct(0)
+ }
+
+ /** A field position key cannot be applied to a data set of the custom type. */
+ @Test(expected = classOf[InvalidProgramException])
+ def testDistinctByKeyFields3(): Unit = {
+   val customs = ExecutionEnvironment.getExecutionEnvironment.fromCollection(customTypeData)
+
+   // Must be rejected: field position key on a custom type.
+   customs.distinct(0)
+ }
+
+ /** `distinct` without arguments on a tuple data set must be accepted. */
+ @Test
+ def testDistinctByKeyFields4(): Unit = {
+   val tuples = ExecutionEnvironment.getExecutionEnvironment.fromCollection(emptyTupleData)
+
+   // Expected to succeed without throwing.
+   tuples.distinct()
+ }
+
+ /** `distinct` without arguments is expected to be refused for the custom type. */
+ @Test(expected = classOf[InvalidProgramException])
+ def testDistinctByKeyFields5(): Unit = {
+   val customs = ExecutionEnvironment.getExecutionEnvironment.fromCollection(customTypeData)
+
+   // Must be rejected: distinct on a custom type.
+   customs.distinct()
+ }
+
+ /** Negative field positions are not valid distinct keys. */
+ @Test(expected = classOf[IllegalArgumentException])
+ def testDistinctByKeyFields6(): Unit = {
+   val tuples = ExecutionEnvironment.getExecutionEnvironment.fromCollection(emptyTupleData)
+
+   // Must be rejected: negative field position key.
+   tuples.distinct(-1)
+ }
+
+ /** `distinct` with a key selector function on a custom-type data set must be accepted. */
+ @Test
+ def testDistinctByKeySelector1(): Unit = {
+   val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+   try {
+     val customDs = env.fromCollection(customTypeData)
+     customDs.distinct {_.l}
+   }
+   catch {
+     // Report the cause instead of failing without any message.
+     case e: Exception => Assert.fail("distinct with a key selector was rejected: " + e)
+   }
+ }
+}
+
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b8131fa7/flink-scala/src/test/scala/org/apache/flink/api/scala/operators/GroupingTest.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/test/scala/org/apache/flink/api/scala/operators/GroupingTest.scala b/flink-scala/src/test/scala/org/apache/flink/api/scala/operators/GroupingTest.scala
new file mode 100644
index 0000000..841fd52
--- /dev/null
+++ b/flink-scala/src/test/scala/org/apache/flink/api/scala/operators/GroupingTest.scala
@@ -0,0 +1,182 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.scala.operators
+
+import org.junit.Assert
+import org.apache.flink.api.common.InvalidProgramException
+import org.apache.flink.api.common.operators.Order
+import org.junit.Ignore
+import org.junit.Test
+
+import org.apache.flink.api.scala._
+
+
+/**
+ * Checks how `groupBy` and `sortGroup` react to valid and invalid key specifications.
+ *
+ * The tests only build operators on data sets created from small in-memory collections;
+ * none of them executes a program. Negative tests declare the exception they expect on the
+ * `@Test` annotation. Positive tests simply let unexpected exceptions propagate so that JUnit
+ * reports the real cause together with its stack trace.
+ */
+class GroupingTest {
+
+  private val emptyTupleData = Array[(Int, Long, String, Long, Int)]()
+  private val customTypeData = Array[CustomType](new CustomType())
+  private val emptyLongData = Array[Long]()
+
+  @Test
+  def testGroupByKeyFields1(): Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tupleDs = env.fromCollection(emptyTupleData)
+
+    // should work
+    tupleDs.groupBy(0)
+  }
+
+  @Test(expected = classOf[InvalidProgramException])
+  def testGroupByKeyFields2(): Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val longDs = env.fromCollection(emptyLongData)
+
+    // should not work, grouping on basic type
+    longDs.groupBy(0)
+  }
+
+  @Test(expected = classOf[InvalidProgramException])
+  def testGroupByKeyFields3(): Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val customDs = env.fromCollection(customTypeData)
+
+    // should not work, field position key on custom type
+    customDs.groupBy(0)
+  }
+
+  @Test(expected = classOf[IllegalArgumentException])
+  def testGroupByKeyFields4(): Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tupleDs = env.fromCollection(emptyTupleData)
+
+    // should not work, field position out of range (the tuple has five fields: 0 to 4)
+    tupleDs.groupBy(5)
+  }
+
+  @Test(expected = classOf[IllegalArgumentException])
+  def testGroupByKeyFields5(): Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tupleDs = env.fromCollection(emptyTupleData)
+
+    // should not work, negative field position
+    tupleDs.groupBy(-1)
+  }
+
+  // Ignored: the key expression call below is still commented out, so there is nothing to test.
+  @Ignore
+  @Test
+  def testGroupByKeyExpressions1(): Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val ds = env.fromCollection(customTypeData)
+
+    // should work
+//    ds.groupBy("i");
+  }
+
+  // Ignored: the key expression call below is still commented out, so there is nothing to test.
+  @Ignore
+  @Test(expected = classOf[UnsupportedOperationException])
+  def testGroupByKeyExpressions2(): Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    // declared before the commented-out call so that the call compiles once it is enabled
+    val longDs = env.fromCollection(emptyLongData)
+
+    // should not work: groups on basic type
+//    longDs.groupBy("l");
+  }
+
+  @Ignore
+  @Test(expected = classOf[InvalidProgramException])
+  def testGroupByKeyExpressions3(): Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val customDs = env.fromCollection(customTypeData)
+
+    // should not work: groups on custom type
+    customDs.groupBy(0)
+  }
+
+  // Ignored: the key expression call below is still commented out, so there is nothing to test.
+  @Ignore
+  @Test(expected = classOf[IllegalArgumentException])
+  def testGroupByKeyExpressions4(): Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val ds = env.fromCollection(customTypeData)
+
+    // should not work, non-existent field
+//    ds.groupBy("myNonExistent");
+  }
+
+  @Test
+  def testGroupByKeySelector1(): Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val customDs = env.fromCollection(customTypeData)
+
+    // should work: key selector function on a custom type
+    customDs.groupBy { _.l }
+  }
+
+  @Test
+  def testGroupSortKeyFields1(): Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tupleDs = env.fromCollection(emptyTupleData)
+
+    // should work
+    tupleDs.groupBy(0).sortGroup(0, Order.ASCENDING)
+  }
+
+  @Test(expected = classOf[IllegalArgumentException])
+  def testGroupSortKeyFields2(): Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tupleDs = env.fromCollection(emptyTupleData)
+
+    // should not work, field position out of range
+    tupleDs.groupBy(0).sortGroup(5, Order.ASCENDING)
+  }
+
+  @Test(expected = classOf[InvalidProgramException])
+  def testGroupSortKeyFields3(): Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val longDs = env.fromCollection(emptyLongData)
+
+    // should not work: sorts groups of a basic (Long) type by field position
+    longDs.groupBy { x: Long => x }.sortGroup(0, Order.ASCENDING)
+  }
+
+  @Test
+  def testChainedGroupSortKeyFields(): Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tupleDs = env.fromCollection(emptyTupleData)
+
+    // should work: two sort keys chained on the same grouping
+    tupleDs.groupBy(0).sortGroup(0, Order.ASCENDING).sortGroup(2, Order.DESCENDING)
+  }
+}
+
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b8131fa7/flink-scala/src/test/scala/org/apache/flink/api/scala/operators/JoinOperatorTest.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/test/scala/org/apache/flink/api/scala/operators/JoinOperatorTest.scala b/flink-scala/src/test/scala/org/apache/flink/api/scala/operators/JoinOperatorTest.scala
new file mode 100644
index 0000000..fff9857
--- /dev/null
+++ b/flink-scala/src/test/scala/org/apache/flink/api/scala/operators/JoinOperatorTest.scala
@@ -0,0 +1,200 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.scala.operators
+
+import org.junit.Assert
+import org.apache.flink.api.common.InvalidProgramException
+import org.junit.Ignore
+import org.junit.Test
+
+import org.apache.flink.api.scala._
+
+/**
+ * Checks how `join(...).where(...).equalTo(...)` reacts to valid and invalid key specifications.
+ *
+ * The tests only build operators on data sets created from small in-memory collections;
+ * none of them executes a program. Negative tests declare the exception they expect on the
+ * `@Test` annotation. Positive tests simply let unexpected exceptions propagate so that JUnit
+ * reports the real cause together with its stack trace.
+ */
+class JoinOperatorTest {
+
+  private val emptyTupleData = Array[(Int, Long, String, Long, Int)]()
+  private val customTypeData = Array[CustomType](new CustomType())
+  private val emptyLongData = Array[Long]()
+
+  @Test
+  def testJoinKeyFields1(): Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val ds1 = env.fromCollection(emptyTupleData)
+    val ds2 = env.fromCollection(emptyTupleData)
+
+    // should work
+    ds1.join(ds2).where(0).equalTo(0)
+  }
+
+  @Test(expected = classOf[InvalidProgramException])
+  def testJoinKeyFields2(): Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val ds1 = env.fromCollection(emptyTupleData)
+    val ds2 = env.fromCollection(emptyTupleData)
+
+    // should not work: field 0 is an Int, field 2 is a String
+    ds1.join(ds2).where(0).equalTo(2)
+  }
+
+  @Test(expected = classOf[InvalidProgramException])
+  def testJoinKeyFields3(): Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val ds1 = env.fromCollection(emptyTupleData)
+    val ds2 = env.fromCollection(emptyTupleData)
+
+    // should not work: two key fields on the first input, one on the second
+    ds1.join(ds2).where(0, 1).equalTo(2)
+  }
+
+  @Test(expected = classOf[IllegalArgumentException])
+  def testJoinKeyFields4(): Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val ds1 = env.fromCollection(emptyTupleData)
+    val ds2 = env.fromCollection(emptyTupleData)
+
+    // should not work: field position 5 is out of range (the tuple has five fields: 0 to 4)
+    ds1.join(ds2).where(5).equalTo(0)
+  }
+
+  @Test(expected = classOf[IllegalArgumentException])
+  def testJoinKeyFields5(): Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val ds1 = env.fromCollection(emptyTupleData)
+    val ds2 = env.fromCollection(emptyTupleData)
+
+    // should not work: negative field positions
+    ds1.join(ds2).where(-1).equalTo(-1)
+  }
+
+  @Test(expected = classOf[IllegalArgumentException])
+  def testJoinKeyFields6(): Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val ds1 = env.fromCollection(emptyTupleData)
+    val ds2 = env.fromCollection(customTypeData)
+
+    // should not work: field position 5 is out of range for the tuple input
+    // (the second input is a custom type)
+    ds1.join(ds2).where(5).equalTo(0)
+  }
+
+  // Ignored: the key expression call below is still commented out, so there is nothing to test.
+  @Ignore
+  @Test
+  def testJoinKeyExpressions1(): Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val ds1 = env.fromCollection(customTypeData)
+    val ds2 = env.fromCollection(customTypeData)
+
+    // should work
+//    ds1.join(ds2).where("i").equalTo("i")
+  }
+
+  // Ignored: the key expression call below is still commented out, so there is nothing to test.
+  @Ignore
+  @Test(expected = classOf[InvalidProgramException])
+  def testJoinKeyExpressions2(): Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val ds1 = env.fromCollection(customTypeData)
+    val ds2 = env.fromCollection(customTypeData)
+
+    // should not work, incompatible join key types
+//    ds1.join(ds2).where("i").equalTo("s")
+  }
+
+  // Ignored: the key expression call below is still commented out, so there is nothing to test.
+  @Ignore
+  @Test(expected = classOf[InvalidProgramException])
+  def testJoinKeyExpressions3(): Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val ds1 = env.fromCollection(customTypeData)
+    val ds2 = env.fromCollection(customTypeData)
+
+    // should not work, incompatible number of keys
+//    ds1.join(ds2).where("i", "s").equalTo("i")
+  }
+
+  // Ignored: the key expression call below is still commented out, so there is nothing to test.
+  @Ignore
+  @Test(expected = classOf[IllegalArgumentException])
+  def testJoinKeyExpressions4(): Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val ds1 = env.fromCollection(customTypeData)
+    val ds2 = env.fromCollection(customTypeData)
+
+    // should not work, join key non-existent
+//    ds1.join(ds2).where("myNonExistent").equalTo("i")
+  }
+
+  @Test
+  def testJoinKeySelectors1(): Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val ds1 = env.fromCollection(customTypeData)
+    val ds2 = env.fromCollection(customTypeData)
+
+    // should work
+    ds1.join(ds2).where { _.l }.equalTo { _.l }
+  }
+
+  @Test
+  def testJoinKeyMixing1(): Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val ds1 = env.fromCollection(customTypeData)
+    val ds2 = env.fromCollection(emptyTupleData)
+
+    // should work
+    ds1.join(ds2).where { _.l }.equalTo(3)
+  }
+
+  @Test
+  def testJoinKeyMixing2(): Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val ds1 = env.fromCollection(emptyTupleData)
+    val ds2 = env.fromCollection(customTypeData)
+
+    // should work
+    ds1.join(ds2).where(3).equalTo { _.l }
+  }
+
+  @Test(expected = classOf[InvalidProgramException])
+  def testJoinKeyMixing3(): Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val ds1 = env.fromCollection(emptyTupleData)
+    val ds2 = env.fromCollection(customTypeData)
+
+    // should not work, incompatible types
+    ds1.join(ds2).where(2).equalTo { _.l }
+  }
+
+  @Test(expected = classOf[InvalidProgramException])
+  def testJoinKeyMixing4(): Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val ds1 = env.fromCollection(emptyTupleData)
+    val ds2 = env.fromCollection(customTypeData)
+
+    // should not work, more than one field position key
+    ds1.join(ds2).where(1, 3).equalTo { _.l }
+  }
+}
+
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b8131fa7/flink-scala/src/test/scala/org/apache/flink/api/scala/operators/translation/AggregateTranslationTest.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/test/scala/org/apache/flink/api/scala/operators/translation/AggregateTranslationTest.scala b/flink-scala/src/test/scala/org/apache/flink/api/scala/operators/translation/AggregateTranslationTest.scala
new file mode 100644
index 0000000..7501f82
--- /dev/null
+++ b/flink-scala/src/test/scala/org/apache/flink/api/scala/operators/translation/AggregateTranslationTest.scala
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.scala.operators.translation
+
+import org.apache.flink.api.common.Plan
+import org.apache.flink.api.common.operators.base.{GenericDataSourceBase, GroupReduceOperatorBase}
+
+import org.apache.flink.api.java.aggregation.Aggregations
+import org.apache.flink.api.scala._
+import org.junit.Assert.{assertEquals, assertTrue, fail}
+import org.junit.Test
+
+/**
+ * Inspects the program plan produced for a grouped aggregation, without executing it.
+ */
+class AggregateTranslationTest {
+
+  // The sink's input must be a combinable group-reduce keyed on field 0 that reads
+  // directly from the data source and has no explicit degree of parallelism (-1).
+  @Test
+  def translateAggregate(): Unit = {
+    try {
+      val parallelism = 8
+      val env = ExecutionEnvironment.createLocalEnvironment(parallelism)
+
+      val source = env.fromElements((3.141592, "foobar", 77L))
+      val aggregated =
+        source.groupBy(0).aggregate(Aggregations.MIN, 1).aggregate(Aggregations.SUM, 2)
+      aggregated.print()
+
+      val plan: Plan = env.createProgramPlan()
+      val sink = plan.getDataSinks.iterator.next
+      val reducer = sink.getInput.asInstanceOf[GroupReduceOperatorBase[_, _, _]]
+
+      assertEquals(1, reducer.getKeyColumns(0).length)
+      assertEquals(0, reducer.getKeyColumns(0)(0))
+      assertEquals(-1, reducer.getDegreeOfParallelism)
+      assertTrue(reducer.isCombinable)
+      assertTrue(reducer.getInput.isInstanceOf[GenericDataSourceBase[_, _]])
+    } catch {
+      case e: Exception =>
+        System.err.println(e.getMessage)
+        e.printStackTrace()
+        fail("Test caused an error: " + e.getMessage)
+    }
+  }
+}
+
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b8131fa7/flink-scala/src/test/scala/org/apache/flink/api/scala/operators/translation/DeltaIterationTranslationTest.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/test/scala/org/apache/flink/api/scala/operators/translation/DeltaIterationTranslationTest.scala b/flink-scala/src/test/scala/org/apache/flink/api/scala/operators/translation/DeltaIterationTranslationTest.scala
new file mode 100644
index 0000000..5631cbb
--- /dev/null
+++ b/flink-scala/src/test/scala/org/apache/flink/api/scala/operators/translation/DeltaIterationTranslationTest.scala
@@ -0,0 +1,250 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.scala.operators.translation
+
+import org.apache.flink.api.common.functions.{RichCoGroupFunction, RichMapFunction,
+RichJoinFunction}
+import org.apache.flink.api.java.operators.translation.WrappingFunction
+import org.junit.Assert.assertArrayEquals
+import org.junit.Assert.assertEquals
+import org.junit.Assert.fail
+import org.apache.flink.api.common.{InvalidProgramException, Plan}
+import org.apache.flink.api.common.aggregators.LongSumAggregator
+import org.apache.flink.api.common.operators.base.DeltaIterationBase
+import org.apache.flink.api.common.operators.base.GenericDataSinkBase
+import org.apache.flink.api.common.operators.base.JoinOperatorBase
+import org.apache.flink.api.common.operators.base.MapOperatorBase
+import org.junit.Test
+
+import org.apache.flink.util.Collector
+
+import org.apache.flink.api.scala._
+
+/**
+ * Inspects the program plan produced for delta iterations and checks that joins and co-groups
+ * with the solution set are rejected when their keys differ from the solution set keys.
+ *
+ * The tests only create program plans; none of them executes a program.
+ */
+class DeltaIterationTranslationTest {
+
+  // Builds a delta iteration with explicit names, parallelism and an aggregator, and checks
+  // that all of these settings as well as the user functions show up in the plan.
+  @Test
+  def testCorrectTranslation(): Unit = {
+    try {
+      val JOB_NAME = "Test JobName"
+      val ITERATION_NAME = "Test Name"
+      val BEFORE_NEXT_WORKSET_MAP = "Some Mapper"
+      val AGGREGATOR_NAME = "AggregatorName"
+      val ITERATION_KEYS = Array(2)
+      val NUM_ITERATIONS = 13
+      val DEFAULT_DOP = 133
+      val ITERATION_DOP = 77
+
+      val env = ExecutionEnvironment.getExecutionEnvironment
+      env.setDegreeOfParallelism(DEFAULT_DOP)
+
+      val initialSolutionSet = env.fromElements((3.44, 5L, "abc"))
+      val initialWorkSet = env.fromElements((1.23, "abc"))
+
+      val result = initialSolutionSet.iterateDelta(initialWorkSet, NUM_ITERATIONS, ITERATION_KEYS) {
+        (s, ws) =>
+          val wsSelfJoin = ws.map(new IdentityMapper[(Double, String)]())
+            .join(ws).where(1).equalTo(1) { (l, r) => Some(l) }
+
+          val joined = wsSelfJoin.join(s).where(1).equalTo(2).apply(new SolutionWorksetJoin)
+          (joined, joined.map(new NextWorksetMapper).name(BEFORE_NEXT_WORKSET_MAP))
+      }
+      result.name(ITERATION_NAME)
+        .setParallelism(ITERATION_DOP)
+        .registerAggregator(AGGREGATOR_NAME, new LongSumAggregator)
+
+      // two sinks on the same iteration result; both must point at the same iteration operator
+      result.print()
+      result.writeAsText("/dev/null")
+
+      val p: Plan = env.createProgramPlan(JOB_NAME)
+      assertEquals(JOB_NAME, p.getJobName)
+      assertEquals(DEFAULT_DOP, p.getDefaultParallelism)
+
+      val sinks = p.getDataSinks.iterator
+      val sink1: GenericDataSinkBase[_] = sinks.next
+      val sink2: GenericDataSinkBase[_] = sinks.next
+
+      val iteration: DeltaIterationBase[_, _] =
+        sink1.getInput.asInstanceOf[DeltaIterationBase[_, _]]
+
+      assertEquals(iteration, sink2.getInput)
+      assertEquals(NUM_ITERATIONS, iteration.getMaximumNumberOfIterations)
+      assertArrayEquals(ITERATION_KEYS, iteration.getSolutionSetKeyFields)
+      assertEquals(ITERATION_DOP, iteration.getDegreeOfParallelism)
+      assertEquals(ITERATION_NAME, iteration.getName)
+
+      // walk the step function backwards: next workset mapper and solution set join first,
+      // then the workset self join and the identity mapper feeding it
+      val nextWorksetMapper: MapOperatorBase[_, _, _] =
+        iteration.getNextWorkset.asInstanceOf[MapOperatorBase[_, _, _]]
+      val solutionSetJoin: JoinOperatorBase[_, _, _, _] =
+        iteration.getSolutionSetDelta.asInstanceOf[JoinOperatorBase[_, _, _, _]]
+      val worksetSelfJoin: JoinOperatorBase[_, _, _, _] =
+        solutionSetJoin.getFirstInput.asInstanceOf[JoinOperatorBase[_, _, _, _]]
+      val worksetMapper: MapOperatorBase[_, _, _] =
+        worksetSelfJoin.getFirstInput.asInstanceOf[MapOperatorBase[_, _, _]]
+
+      assertEquals(classOf[IdentityMapper[_]], worksetMapper.getUserCodeWrapper.getUserCodeClass)
+
+      assertEquals(
+        classOf[NextWorksetMapper],
+        nextWorksetMapper.getUserCodeWrapper.getUserCodeClass)
+
+      // the join function may be wrapped by the translation; look through the wrapper if so
+      if (solutionSetJoin.getUserCodeWrapper.getUserCodeObject.isInstanceOf[WrappingFunction[_]]) {
+        val wf: WrappingFunction[_] = solutionSetJoin.getUserCodeWrapper.getUserCodeObject
+          .asInstanceOf[WrappingFunction[_]]
+        assertEquals(classOf[SolutionWorksetJoin],
+          wf.getWrappedFunction.getClass)
+      }
+      else {
+        assertEquals(classOf[SolutionWorksetJoin],
+          solutionSetJoin.getUserCodeWrapper.getUserCodeClass)
+      }
+
+      assertEquals(BEFORE_NEXT_WORKSET_MAP, nextWorksetMapper.getName)
+      assertEquals(AGGREGATOR_NAME, iteration.getAggregators.getAllRegisteredAggregators.iterator
+        .next.getName)
+    }
+    catch {
+      case e: Exception => {
+        System.err.println(e.getMessage)
+        e.printStackTrace()
+        fail(e.getMessage)
+      }
+    }
+  }
+
+  // The solution set is keyed on field 0, so joining it on field 2 must be rejected,
+  // whichever side of the join the solution set is on.
+  @Test
+  def testRejectWhenSolutionSetKeysDontMatchJoin(): Unit = {
+    try {
+      val env = ExecutionEnvironment.getExecutionEnvironment
+
+      val initialSolutionSet = env.fromElements((3.44, 5L, "abc"))
+      val initialWorkSet = env.fromElements((1.23, "abc"))
+
+      initialSolutionSet.iterateDelta(initialWorkSet, 10, Array(0)) {
+        (s, ws) =>
+          try {
+            ws.join(s).where(1).equalTo(2)
+            fail("Accepted invalid program.")
+          } catch {
+            case e: InvalidProgramException => // all good
+          }
+          try {
+            s.join(ws).where(2).equalTo(1)
+            fail("Accepted invalid program.")
+          } catch {
+            case e: InvalidProgramException => // all good
+          }
+          (s, ws)
+      }
+    } catch {
+      case e: Exception => {
+        System.err.println(e.getMessage)
+        e.printStackTrace()
+        fail(e.getMessage)
+      }
+    }
+  }
+
+  // Same check as for joins, but with coGroup: the solution set is keyed on field 0,
+  // so co-grouping it on field 2 must be rejected on either side.
+  @Test
+  def testRejectWhenSolutionSetKeysDontMatchCoGroup(): Unit = {
+    try {
+      val env = ExecutionEnvironment.getExecutionEnvironment
+
+      val initialSolutionSet = env.fromElements((3.44, 5L, "abc"))
+      val initialWorkSet = env.fromElements((1.23, "abc"))
+
+      initialSolutionSet.iterateDelta(initialWorkSet, 10, Array(0)) {
+        (s, ws) =>
+          try {
+            ws.coGroup(s).where(1).equalTo(2)
+            fail("Accepted invalid program.")
+          } catch {
+            case e: InvalidProgramException => // all good
+          }
+          try {
+            s.coGroup(ws).where(2).equalTo(1)
+            fail("Accepted invalid program.")
+          } catch {
+            case e: InvalidProgramException => // all good
+          }
+          (s, ws)
+      }
+    } catch {
+      case e: Exception => {
+        System.err.println(e.getMessage)
+        e.printStackTrace()
+        fail(e.getMessage)
+      }
+    }
+  }
+}
+
+// Join function stub for the translation tests; it always returns null.
+class SolutionWorksetJoin
+  extends RichJoinFunction[(Double, String), (Double, Long, String), (Double, Long, String)] {
+
+  override def join(
+      first: (Double, String),
+      second: (Double, Long, String)): (Double, Long, String) = null
+}
+
+// Map function stub for the translation tests; it always returns null.
+class NextWorksetMapper extends RichMapFunction[(Double, Long, String), (Double, String)] {
+
+  override def map(value: (Double, Long, String)): (Double, String) = null
+}
+
+// Map function that returns every element unchanged.
+class IdentityMapper[T] extends RichMapFunction[T, T] {
+
+  override def map(value: T): T = value
+}
+
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b8131fa7/flink-scala/src/test/scala/org/apache/flink/api/scala/operators/translation/DistinctTranslationTest.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/test/scala/org/apache/flink/api/scala/operators/translation/DistinctTranslationTest.scala b/flink-scala/src/test/scala/org/apache/flink/api/scala/operators/translation/DistinctTranslationTest.scala
new file mode 100644
index 0000000..4ba9652
--- /dev/null
+++ b/flink-scala/src/test/scala/org/apache/flink/api/scala/operators/translation/DistinctTranslationTest.scala
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.scala.operators.translation
+
+import org.apache.flink.api.common.operators.base.GroupReduceOperatorBase
+import org.junit.Assert
+import org.junit.Test
+
+import org.apache.flink.api.scala._
+
+/**
+ * Inspects the program plan produced for `distinct`, without executing it.
+ */
+class DistinctTranslationTest {
+
+  // The operator feeding the sink must be a group-reduce that is marked as combinable.
+  @Test
+  def testCombinable(): Unit = {
+    try {
+      val env = ExecutionEnvironment.getExecutionEnvironment
+      val elements = env.fromElements("1", "2", "1", "3")
+
+      val deduplicated = elements.distinct { x => x }
+      deduplicated.print()
+
+      val plan = env.createProgramPlan()
+      val sinkInput = plan.getDataSinks.iterator.next.getInput
+      val reduceOp = sinkInput.asInstanceOf[GroupReduceOperatorBase[_, _, _]]
+
+      Assert.assertTrue(reduceOp.isCombinable)
+    } catch {
+      case e: Exception =>
+        e.printStackTrace()
+        Assert.fail(e.getMessage)
+    }
+  }
+}
+
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b8131fa7/flink-scala/src/test/scala/org/apache/flink/api/scala/operators/translation/ReduceTranslationTest.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/test/scala/org/apache/flink/api/scala/operators/translation/ReduceTranslationTest.scala b/flink-scala/src/test/scala/org/apache/flink/api/scala/operators/translation/ReduceTranslationTest.scala
new file mode 100644
index 0000000..36aa4f2
--- /dev/null
+++ b/flink-scala/src/test/scala/org/apache/flink/api/scala/operators/translation/ReduceTranslationTest.scala
@@ -0,0 +1,136 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.scala.operators.translation
+
+import org.apache.flink.api.java.operators.translation.{KeyExtractingMapper,
+PlanUnwrappingReduceOperator}
+import org.apache.flink.api.java.typeutils.{TupleTypeInfo, BasicTypeInfo}
+import org.junit.Assert._
+import org.apache.flink.api.common.operators.base.GenericDataSinkBase
+import org.apache.flink.api.common.operators.base.GenericDataSourceBase
+import org.apache.flink.api.common.operators.base.MapOperatorBase
+import org.apache.flink.api.common.operators.base.ReduceOperatorBase
+import org.junit.Test
+
+import org.apache.flink.api.scala._
+
+/**
+ * Inspects the program plans produced for non-grouped and grouped reduce operations,
+ * without executing them.
+ */
+class ReduceTranslationTest {
+
+  // Non-grouped reduce: the reducer has no key columns and reads directly from the source.
+  @Test
+  def translateNonGroupedReduce(): Unit = {
+    try {
+      val parallelism = 8
+      val env = ExecutionEnvironment.createLocalEnvironment(parallelism)
+
+      val initialData = env.fromElements((3.141592, "foobar", 77L)).setParallelism(1)
+
+      initialData.reduce { (v1, v2) => v1 }.print()
+
+      val plan = env.createProgramPlan()
+      val sink: GenericDataSinkBase[_] = plan.getDataSinks.iterator.next
+      val reducer: ReduceOperatorBase[_, _] = sink.getInput.asInstanceOf[ReduceOperatorBase[_, _]]
+
+      assertEquals(initialData.set.getType, reducer.getOperatorInfo.getInputType)
+      assertEquals(initialData.set.getType, reducer.getOperatorInfo.getOutputType)
+      assertTrue(reducer.getKeyColumns(0) == null || reducer.getKeyColumns(0).length == 0)
+      assertTrue(reducer.getDegreeOfParallelism == 1 || reducer.getDegreeOfParallelism == -1)
+      assertTrue(reducer.getInput.isInstanceOf[GenericDataSourceBase[_, _]])
+    } catch {
+      case e: Exception =>
+        System.err.println(e.getMessage)
+        e.printStackTrace()
+        fail("Test caused an error: " + e.getMessage)
+    }
+  }
+
+  // Reduce grouped by field position: the reducer is keyed on column 2 and reads directly
+  // from the source, i.e. no extra mapper is inserted.
+  @Test
+  def translateGroupedReduceNoMapper(): Unit = {
+    try {
+      val parallelism: Int = 8
+      val env = ExecutionEnvironment.createLocalEnvironment(parallelism)
+
+      val initialData = env.fromElements((3.141592, "foobar", 77L)).setParallelism(1)
+
+      initialData.groupBy(2).reduce { (v1, v2) => v1 }.print()
+
+      val plan = env.createProgramPlan()
+      val sink: GenericDataSinkBase[_] = plan.getDataSinks.iterator.next
+      val reducer: ReduceOperatorBase[_, _] = sink.getInput.asInstanceOf[ReduceOperatorBase[_, _]]
+
+      assertEquals(initialData.set.getType, reducer.getOperatorInfo.getInputType)
+      assertEquals(initialData.set.getType, reducer.getOperatorInfo.getOutputType)
+      assertTrue(
+        reducer.getDegreeOfParallelism == parallelism || reducer.getDegreeOfParallelism == -1)
+      assertArrayEquals(Array[Int](2), reducer.getKeyColumns(0))
+      assertTrue(reducer.getInput.isInstanceOf[GenericDataSourceBase[_, _]])
+    } catch {
+      case e: Exception =>
+        System.err.println(e.getMessage)
+        e.printStackTrace()
+        fail("Test caused an error: " + e.getMessage)
+    }
+  }
+
+  // Reduce grouped by key selector: the plan is source -> key extracting mapper ->
+  // reducer on (key, value) pairs -> mapper that projects the key away -> sink.
+  @Test
+  def translateGroupedReduceWithKeyExtractor(): Unit = {
+    try {
+      val parallelism: Int = 8
+      val env = ExecutionEnvironment.createLocalEnvironment(parallelism)
+
+      val initialData = env.fromElements((3.141592, "foobar", 77L)).setParallelism(1)
+
+      initialData.groupBy { _._2 }.reduce { (v1, v2) => v1 }.setParallelism(4).print()
+
+      val plan = env.createProgramPlan()
+      val sink: GenericDataSinkBase[_] = plan.getDataSinks.iterator.next
+
+      // walk the plan backwards from the sink
+      val keyProjector: MapOperatorBase[_, _, _] =
+        sink.getInput.asInstanceOf[MapOperatorBase[_, _, _]]
+      val reducer: PlanUnwrappingReduceOperator[_, _] =
+        keyProjector.getInput.asInstanceOf[PlanUnwrappingReduceOperator[_, _]]
+      val keyExtractor: MapOperatorBase[_, _, _] =
+        reducer.getInput.asInstanceOf[MapOperatorBase[_, _, _]]
+
+      assertEquals(1, keyExtractor.getDegreeOfParallelism)
+      assertEquals(4, reducer.getDegreeOfParallelism)
+      assertEquals(4, keyProjector.getDegreeOfParallelism)
+
+      val keyValueInfo = new TupleTypeInfo(
+        BasicTypeInfo.STRING_TYPE_INFO,
+        createTypeInformation[(Double, String, Long)])
+
+      assertEquals(initialData.set.getType, keyExtractor.getOperatorInfo.getInputType)
+      assertEquals(keyValueInfo, keyExtractor.getOperatorInfo.getOutputType)
+      assertEquals(keyValueInfo, reducer.getOperatorInfo.getInputType)
+      assertEquals(keyValueInfo, reducer.getOperatorInfo.getOutputType)
+      assertEquals(keyValueInfo, keyProjector.getOperatorInfo.getInputType)
+      assertEquals(initialData.set.getType, keyProjector.getOperatorInfo.getOutputType)
+      assertEquals(
+        classOf[KeyExtractingMapper[_, _]],
+        keyExtractor.getUserCodeWrapper.getUserCodeClass)
+      assertTrue(keyExtractor.getInput.isInstanceOf[GenericDataSourceBase[_, _]])
+    } catch {
+      case e: Exception =>
+        System.err.println(e.getMessage)
+        e.printStackTrace()
+        fail("Test caused an error: " + e.getMessage)
+    }
+  }
+}
+
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b8131fa7/flink-scala/src/test/scala/org/apache/flink/api/scala/runtime/GenericPairComparatorTest.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/test/scala/org/apache/flink/api/scala/runtime/GenericPairComparatorTest.scala b/flink-scala/src/test/scala/org/apache/flink/api/scala/runtime/GenericPairComparatorTest.scala
new file mode 100644
index 0000000..95a1b77
--- /dev/null
+++ b/flink-scala/src/test/scala/org/apache/flink/api/scala/runtime/GenericPairComparatorTest.scala
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.scala.runtime
+
+import org.apache.flink.api.common.typeutils.{TypeComparator, TypeSerializer}
+import org.apache.flink.api.common.typeutils.base.{DoubleComparator, DoubleSerializer, IntComparator, IntSerializer}
+
+import org.apache.flink.api.java.typeutils.runtime.{GenericPairComparator, TupleComparator}
+import org.apache.flink.api.scala.runtime.tuple.base.PairComparatorTestBase
+import org.apache.flink.api.scala.typeutils.ScalaTupleComparator
+
+/** Exercises a GenericPairComparator assembled from two ScalaTupleComparators. */
+class GenericPairComparatorTest
+  extends PairComparatorTestBase[(Int, String, Double), (Int, Float, Long, Double)] {
+
+  override protected def createComparator(ascending: Boolean):
+    GenericPairComparator[(Int, String, Double), (Int, Float, Long, Double)] = {
+    // Both sides are keyed on an Int then a Double: positions (0, 2) left, (0, 3) right.
+    def keyComparators = Array[TypeComparator[_]](
+      new IntComparator(ascending), new DoubleComparator(ascending))
+    def keySerializers = Array[TypeSerializer[_]](
+      IntSerializer.INSTANCE, DoubleSerializer.INSTANCE)
+
+    val leftComparator = new ScalaTupleComparator[(Int, String, Double)](
+      Array(0, 2), keyComparators, keySerializers)
+    val rightComparator = new ScalaTupleComparator[(Int, Float, Long, Double)](
+      Array(0, 3), keyComparators, keySerializers)
+
+    new GenericPairComparator[(Int, String, Double), (Int, Float, Long, Double)](
+      leftComparator, rightComparator)
+  }
+
+  protected def getSortedTestData:
+    (Array[(Int, String, Double)], Array[(Int, Float, Long, Double)]) = {
+    // Returned as (first type's data, second type's data).
+    (leftRows, rightRows)
+  }
+
+  // Positions 0 and 2 of row i equal positions 0 and 3 of row i in the other array.
+  private val leftRows = Array(
+    (4, "hello", 20.0),
+    (4, "world", 23.2),
+    (5, "hello", 18.0),
+    (5, "world", 19.2),
+    (6, "hello", 16.0),
+    (6, "world", 17.2),
+    (7, "hello", 14.0),
+    (7, "world", 15.2))
+
+  // Same key values as above, spread over a four-field tuple.
+  private val rightRows = Array(
+    (4, 0.11f, 14L, 20.0),
+    (4, 0.221f, 15L, 23.2),
+    (5, 0.33f, 15L, 18.0),
+    (5, 0.44f, 20L, 19.2),
+    (6, 0.55f, 20L, 16.0),
+    (6, 0.66f, 29L, 17.2),
+    (7, 0.77f, 29L, 14.0),
+    (7, 0.88f, 34L, 15.2))
+}
+
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b8131fa7/flink-scala/src/test/scala/org/apache/flink/api/scala/runtime/TupleComparatorILD2Test.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/test/scala/org/apache/flink/api/scala/runtime/TupleComparatorILD2Test.scala b/flink-scala/src/test/scala/org/apache/flink/api/scala/runtime/TupleComparatorILD2Test.scala
new file mode 100644
index 0000000..8e23744
--- /dev/null
+++ b/flink-scala/src/test/scala/org/apache/flink/api/scala/runtime/TupleComparatorILD2Test.scala
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.scala.runtime
+
+import org.apache.flink.api.common.typeutils.TypeComparator
+import org.apache.flink.api.common.typeutils.TypeSerializer
+import org.apache.flink.api.java.typeutils.TupleTypeInfoBase
+import org.apache.flink.api.scala.runtime.tuple.base.TupleComparatorTestBase
+
+import org.apache.flink.api.scala._
+
+
+/** Comparator test for (Int, Long, Double) tuples keyed on fields 0 and 1. */
+class TupleComparatorILD2Test extends TupleComparatorTestBase[(Int, Long, Double)] {
+
+  protected def createComparator(ascending: Boolean): TypeComparator[(Int, Long, Double)] = {
+    val keyPositions = Array(0, 1)
+    val tupleInfo = createTypeInformation[(Int, Long, Double)]
+      .asInstanceOf[TupleTypeInfoBase[(Int, Long, Double)]]
+    // One sort-direction flag per key position, all equal to `ascending`.
+    tupleInfo.createComparator(keyPositions, Array.fill(keyPositions.length)(ascending))
+  }
+
+  protected def createSerializer: TypeSerializer[(Int, Long, Double)] =
+    createTypeInformation[(Int, Long, Double)].createSerializer()
+
+  protected def getSortedTestData: Array[(Int, Long, Double)] = sortedTuples
+
+  // Ascending on field 0, then field 1; field 2 is not a key field.
+  private val sortedTuples = Array(
+    (4, 14L, 20.0),
+    (4, 15L, 23.2),
+    (5, 15L, 20.0),
+    (5, 20L, 20.0),
+    (6, 20L, 23.2),
+    (6, 29L, 20.0),
+    (7, 29L, 20.0),
+    (7, 34L, 23.2)
+  )
+}
+
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b8131fa7/flink-scala/src/test/scala/org/apache/flink/api/scala/runtime/TupleComparatorILD3Test.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/test/scala/org/apache/flink/api/scala/runtime/TupleComparatorILD3Test.scala b/flink-scala/src/test/scala/org/apache/flink/api/scala/runtime/TupleComparatorILD3Test.scala
new file mode 100644
index 0000000..6907165
--- /dev/null
+++ b/flink-scala/src/test/scala/org/apache/flink/api/scala/runtime/TupleComparatorILD3Test.scala
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.scala.runtime
+
+import org.apache.flink.api.common.typeutils.{TypeComparator, TypeSerializer}
+import org.apache.flink.api.java.typeutils.TupleTypeInfoBase
+import org.apache.flink.api.scala._
+import org.apache.flink.api.scala.runtime.tuple.base.TupleComparatorTestBase
+
+
+/** Comparator test for (Int, Long, Double) tuples keyed on fields 0, 1 and 2. */
+class TupleComparatorILD3Test extends TupleComparatorTestBase[(Int, Long, Double)] {
+
+  protected def createComparator(ascending: Boolean): TypeComparator[(Int, Long, Double)] = {
+    val keyPositions = Array(0, 1, 2)
+    val tupleInfo = createTypeInformation[(Int, Long, Double)]
+      .asInstanceOf[TupleTypeInfoBase[(Int, Long, Double)]]
+    // One sort-direction flag per key position, all equal to `ascending`.
+    tupleInfo.createComparator(keyPositions, Array.fill(keyPositions.length)(ascending))
+  }
+
+  protected def createSerializer: TypeSerializer[(Int, Long, Double)] =
+    createTypeInformation[(Int, Long, Double)].createSerializer()
+
+  protected def getSortedTestData: Array[(Int, Long, Double)] = sortedTuples
+
+  // Ascending on field 0, then field 1, then field 2.
+  private val sortedTuples = Array(
+    (4, 4L, 20.0),
+    (4, 4L, 23.2),
+    (4, 9L, 20.0),
+    (5, 4L, 20.0),
+    (5, 4L, 23.2),
+    (5, 9L, 20.0),
+    (6, 4L, 20.0),
+    (6, 4L, 23.2)
+  )
+}
+
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b8131fa7/flink-scala/src/test/scala/org/apache/flink/api/scala/runtime/TupleComparatorILDC3Test.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/test/scala/org/apache/flink/api/scala/runtime/TupleComparatorILDC3Test.scala b/flink-scala/src/test/scala/org/apache/flink/api/scala/runtime/TupleComparatorILDC3Test.scala
new file mode 100644
index 0000000..445aaa1
--- /dev/null
+++ b/flink-scala/src/test/scala/org/apache/flink/api/scala/runtime/TupleComparatorILDC3Test.scala
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.scala.runtime
+
+import org.apache.flink.api.common.typeutils.{TypeSerializer, TypeComparator}
+import org.apache.flink.api.java.typeutils.TupleTypeInfoBase
+import org.apache.flink.api.scala._
+import org.apache.flink.api.scala.runtime.tuple.base.TupleComparatorTestBase
+
+
+/** Comparator test for (Int, Long, Double) tuples keyed on fields 2, 0 and 1, in that order. */
+class TupleComparatorILDC3Test extends TupleComparatorTestBase[(Int, Long, Double)] {
+
+  protected def createComparator(ascending: Boolean): TypeComparator[(Int, Long, Double)] = {
+    val keyPositions = Array(2, 0, 1)
+    val tupleInfo = createTypeInformation[(Int, Long, Double)]
+      .asInstanceOf[TupleTypeInfoBase[(Int, Long, Double)]]
+    // One sort-direction flag per key position, all equal to `ascending`.
+    tupleInfo.createComparator(keyPositions, Array.fill(keyPositions.length)(ascending))
+  }
+
+  protected def createSerializer: TypeSerializer[(Int, Long, Double)] =
+    createTypeInformation[(Int, Long, Double)].createSerializer()
+
+  protected def getSortedTestData: Array[(Int, Long, Double)] = sortedTuples
+
+  // Ascending when compared on field 2 first, then field 0, then field 1.
+  private val sortedTuples = Array(
+    (4, 4L, 20.0),
+    (5, 1L, 20.0),
+    (5, 2L, 20.0),
+    (5, 10L, 23.0),
+    (5, 19L, 24.0),
+    (5, 20L, 24.0),
+    (5, 24L, 25.0),
+    (5, 25L, 25.0)
+  )
+}
+
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b8131fa7/flink-scala/src/test/scala/org/apache/flink/api/scala/runtime/TupleComparatorILDX1Test.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/test/scala/org/apache/flink/api/scala/runtime/TupleComparatorILDX1Test.scala b/flink-scala/src/test/scala/org/apache/flink/api/scala/runtime/TupleComparatorILDX1Test.scala
new file mode 100644
index 0000000..518705f
--- /dev/null
+++ b/flink-scala/src/test/scala/org/apache/flink/api/scala/runtime/TupleComparatorILDX1Test.scala
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.scala.runtime
+
+import org.apache.flink.api.common.typeutils.{TypeSerializer, TypeComparator}
+import org.apache.flink.api.java.typeutils.TupleTypeInfoBase
+import org.apache.flink.api.scala.runtime.tuple.base.TupleComparatorTestBase
+
+import org.apache.flink.api.scala._
+
+/** Comparator test for (Int, Long, Double) tuples keyed on field 1 only. */
+class TupleComparatorILDX1Test extends TupleComparatorTestBase[(Int, Long, Double)] {
+
+  protected def createComparator(ascending: Boolean): TypeComparator[(Int, Long, Double)] = {
+    val keyPositions = Array(1)
+    val tupleInfo = createTypeInformation[(Int, Long, Double)]
+      .asInstanceOf[TupleTypeInfoBase[(Int, Long, Double)]]
+    // One sort-direction flag per key position, all equal to `ascending`.
+    tupleInfo.createComparator(keyPositions, Array.fill(keyPositions.length)(ascending))
+  }
+
+  protected def createSerializer: TypeSerializer[(Int, Long, Double)] =
+    createTypeInformation[(Int, Long, Double)].createSerializer()
+
+  protected def getSortedTestData: Array[(Int, Long, Double)] = sortedTuples
+
+  // Ascending on field 1; fields 0 and 2 are not key fields.
+  private val sortedTuples = Array(
+    (4, 4L, 20.0),
+    (4, 5L, 23.2),
+    (4, 9L, 20.0),
+    (4, 10L, 24.0),
+    (4, 19L, 23.2),
+    (4, 20L, 24.0),
+    (4, 24L, 20.0),
+    (4, 25L, 23.2)
+  )
+}
+
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b8131fa7/flink-scala/src/test/scala/org/apache/flink/api/scala/runtime/TupleComparatorILDXC2Test.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/test/scala/org/apache/flink/api/scala/runtime/TupleComparatorILDXC2Test.scala b/flink-scala/src/test/scala/org/apache/flink/api/scala/runtime/TupleComparatorILDXC2Test.scala
new file mode 100644
index 0000000..0976da8
--- /dev/null
+++ b/flink-scala/src/test/scala/org/apache/flink/api/scala/runtime/TupleComparatorILDXC2Test.scala
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.scala.runtime
+
+import org.apache.flink.api.common.typeutils.{TypeSerializer, TypeComparator}
+import org.apache.flink.api.java.typeutils.TupleTypeInfoBase
+import org.apache.flink.api.scala.runtime.tuple.base.TupleComparatorTestBase
+
+import org.apache.flink.api.scala._
+
+/** Comparator test for (Int, Long, Double) tuples keyed on fields 2 and 1, in that order. */
+class TupleComparatorILDXC2Test extends TupleComparatorTestBase[(Int, Long, Double)] {
+
+  protected def createComparator(ascending: Boolean): TypeComparator[(Int, Long, Double)] = {
+    val keyPositions = Array(2, 1)
+    val tupleInfo = createTypeInformation[(Int, Long, Double)]
+      .asInstanceOf[TupleTypeInfoBase[(Int, Long, Double)]]
+    // One sort-direction flag per key position, all equal to `ascending`.
+    tupleInfo.createComparator(keyPositions, Array.fill(keyPositions.length)(ascending))
+  }
+
+  protected def createSerializer: TypeSerializer[(Int, Long, Double)] =
+    createTypeInformation[(Int, Long, Double)].createSerializer()
+
+  protected def getSortedTestData: Array[(Int, Long, Double)] = sortedTuples
+
+  // Ascending when compared on field 2 first, then field 1; field 0 is not a key field.
+  private val sortedTuples = Array(
+    (4, 4L, 20.0),
+    (4, 5L, 20.0),
+    (4, 3L, 23.0),
+    (4, 19L, 23.0),
+    (4, 17L, 24.0),
+    (4, 18L, 24.0),
+    (4, 24L, 25.0),
+    (4, 25L, 25.0)
+  )
+}
+
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b8131fa7/flink-scala/src/test/scala/org/apache/flink/api/scala/runtime/TupleComparatorISD1Test.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/test/scala/org/apache/flink/api/scala/runtime/TupleComparatorISD1Test.scala b/flink-scala/src/test/scala/org/apache/flink/api/scala/runtime/TupleComparatorISD1Test.scala
new file mode 100644
index 0000000..7478701
--- /dev/null
+++ b/flink-scala/src/test/scala/org/apache/flink/api/scala/runtime/TupleComparatorISD1Test.scala
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.scala.runtime
+
+import org.apache.flink.api.common.typeutils.{TypeSerializer, TypeComparator}
+import org.apache.flink.api.java.typeutils.TupleTypeInfoBase
+import org.apache.flink.api.scala._
+import org.apache.flink.api.scala.runtime.tuple.base.TupleComparatorTestBase
+
+/** Comparator test for (Int, String, Double) tuples keyed on field 0 only. */
+class TupleComparatorISD1Test extends TupleComparatorTestBase[(Int, String, Double)] {
+
+  protected def createComparator(ascending: Boolean): TypeComparator[(Int, String, Double)] = {
+    val keyPositions = Array(0)
+    val tupleInfo = createTypeInformation[(Int, String, Double)]
+      .asInstanceOf[TupleTypeInfoBase[(Int, String, Double)]]
+    // One sort-direction flag per key position, all equal to `ascending`.
+    tupleInfo.createComparator(keyPositions, Array.fill(keyPositions.length)(ascending))
+  }
+
+  protected def createSerializer: TypeSerializer[(Int, String, Double)] =
+    createTypeInformation[(Int, String, Double)].createSerializer()
+
+  protected def getSortedTestData: Array[(Int, String, Double)] = sortedTuples
+
+  // Ascending on field 0; fields 1 and 2 are not key fields.
+  private val sortedTuples = Array(
+    (4, "hello", 20.0),
+    (5, "hello", 23.2),
+    (6, "world", 20.0),
+    (7, "hello", 20.0),
+    (8, "hello", 23.2),
+    (9, "world", 20.0),
+    (10, "hello", 20.0),
+    (11, "hello", 23.2)
+  )
+}
+