You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2015/03/29 13:50:39 UTC
[10/12] flink git commit: [FLINK-947] Add parser to Expression API
for exposing it to Java
http://git-wip-us.apache.org/repos/asf/flink/blob/d7d9b639/flink-staging/flink-expressions/src/test/scala/org/apache/flink/api/scala/expressions/test/GroupedAggreagationsITCase.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-expressions/src/test/scala/org/apache/flink/api/scala/expressions/test/GroupedAggreagationsITCase.scala b/flink-staging/flink-expressions/src/test/scala/org/apache/flink/api/scala/expressions/test/GroupedAggreagationsITCase.scala
new file mode 100644
index 0000000..f2e0286
--- /dev/null
+++ b/flink-staging/flink-expressions/src/test/scala/org/apache/flink/api/scala/expressions/test/GroupedAggreagationsITCase.scala
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.scala.expressions.test
+
+import org.apache.flink.api.expressions.ExpressionException
+import org.apache.flink.api.scala._
+import org.apache.flink.api.scala.expressions._
+import org.apache.flink.api.scala.util.CollectionDataSets
+import org.apache.flink.core.fs.FileSystem.WriteMode
+import org.apache.flink.test.util.MultipleProgramsTestBase
+import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
+import org.junit._
+import org.junit.rules.TemporaryFolder
+import org.junit.runner.RunWith
+import org.junit.runners.Parameterized
+
+@RunWith(classOf[Parameterized])
+class GroupedAggreagationsITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode) {
+ // NOTE(review): "Aggreagations" is a typo for "Aggregations"; renaming the class
+ // would also require renaming the containing file, so it is only flagged here.
+
+ // Temp file each test writes its result to; created fresh in before().
+ private var resultPath: String = null
+ // Expected file contents; after() compares it line-wise against resultPath.
+ private var expected: String = ""
+ private val _tempFolder = new TemporaryFolder()
+
+ @Rule
+ def tempFolder = _tempFolder
+
+ @Before
+ def before(): Unit = {
+ resultPath = tempFolder.newFile().toURI.toString
+ }
+
+ // Runs after every test, including the ones that expect an ExpressionException;
+ // those set expected = "" so the comparison still succeeds.
+ @After
+ def after: Unit = {
+ compareResultsByLinesInMemory(expected, resultPath)
+ }
+
+ // Grouping on a field name never declared via as(...) must be rejected.
+ @Test(expected = classOf[ExpressionException])
+ def testGroupingOnNonExistentField: Unit = {
+
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val ds = CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'c)
+ .groupBy('_foo)
+ .select('a.avg)
+
+ ds.writeAsText(resultPath, WriteMode.OVERWRITE)
+ env.execute()
+ expected = ""
+ }
+
+ @Test
+ def testGroupedAggregate: Unit = {
+
+ // grouped aggregation where the grouping key 'b is also part of the output
+
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val ds = CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'c)
+ .groupBy('b)
+ .select('b, 'a.sum)
+
+ ds.writeAsText(resultPath, WriteMode.OVERWRITE)
+ env.execute()
+ expected = "1,1\n" + "2,5\n" + "3,15\n" + "4,34\n" + "5,65\n" + "6,111\n"
+ }
+
+ @Test
+ def testGroupingKeyForwardIfNotUsed: Unit = {
+
+ // the grouping key needs to be forwarded to the intermediate DataSet, even
+ // if we don't want the key in the output
+
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val ds = CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'c)
+ .groupBy('b)
+ .select('a.sum)
+
+ ds.writeAsText(resultPath, WriteMode.OVERWRITE)
+ env.execute()
+ expected = "1\n" + "5\n" + "15\n" + "34\n" + "65\n" + "111\n"
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/d7d9b639/flink-staging/flink-expressions/src/test/scala/org/apache/flink/api/scala/expressions/test/JoinITCase.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-expressions/src/test/scala/org/apache/flink/api/scala/expressions/test/JoinITCase.scala b/flink-staging/flink-expressions/src/test/scala/org/apache/flink/api/scala/expressions/test/JoinITCase.scala
new file mode 100644
index 0000000..91b3b19
--- /dev/null
+++ b/flink-staging/flink-expressions/src/test/scala/org/apache/flink/api/scala/expressions/test/JoinITCase.scala
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.scala.expressions.test
+
+import org.apache.flink.api.expressions.ExpressionException
+import org.apache.flink.api.scala._
+import org.apache.flink.api.scala.expressions._
+import org.apache.flink.api.scala.util.CollectionDataSets
+import org.apache.flink.core.fs.FileSystem.WriteMode
+import org.apache.flink.test.util.MultipleProgramsTestBase
+import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
+import org.junit._
+import org.junit.rules.TemporaryFolder
+import org.junit.runner.RunWith
+import org.junit.runners.Parameterized
+
+@RunWith(classOf[Parameterized])
+class JoinITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode) {
+ // Temp file each test writes its result to; created fresh in before().
+ private var resultPath: String = null
+ // Expected file contents; after() compares it line-wise against resultPath.
+ private var expected: String = ""
+ private val _tempFolder = new TemporaryFolder()
+
+ @Rule
+ def tempFolder = _tempFolder
+
+ @Before
+ def before(): Unit = {
+ resultPath = tempFolder.newFile().toURI.toString
+ }
+
+ // Runs after every test; the failure tests set expected = "" so the
+ // comparison against the (empty) result file still succeeds.
+ @After
+ def after: Unit = {
+ compareResultsByLinesInMemory(expected, resultPath)
+ }
+
+ // Equi-join on a single key pair ('b === 'e), projecting one field per side.
+ @Test
+ def testJoin: Unit = {
+ val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+ val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).as('a, 'b, 'c)
+ val ds2 = CollectionDataSets.get5TupleDataSet(env).as('d, 'e, 'f, 'g, 'h)
+
+ val joinDs = ds1.join(ds2).where('b === 'e).select('c, 'g)
+
+ joinDs.writeAsCsv(resultPath, writeMode = WriteMode.OVERWRITE)
+ env.execute()
+ expected = "Hi,Hallo\n" + "Hello,Hallo Welt\n" + "Hello world,Hallo Welt\n"
+ }
+
+ // The join predicate combines key equality with an additional filter ('b < 2).
+ @Test
+ def testJoinWithFilter: Unit = {
+ val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+ val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).as('a, 'b, 'c)
+ val ds2 = CollectionDataSets.get5TupleDataSet(env).as('d, 'e, 'f, 'g, 'h)
+
+ val joinDs = ds1.join(ds2).where('b === 'e && 'b < 2).select('c, 'g)
+
+ joinDs.writeAsCsv(resultPath, writeMode = WriteMode.OVERWRITE)
+ env.execute()
+ expected = "Hi,Hallo\n"
+ }
+
+ // Join on two key pairs, expressed through filter(...) instead of where(...).
+ @Test
+ def testJoinWithMultipleKeys: Unit = {
+ val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+ val ds1 = CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'c)
+ val ds2 = CollectionDataSets.get5TupleDataSet(env).as('d, 'e, 'f, 'g, 'h)
+
+ val joinDs = ds1.join(ds2).filter('a === 'd && 'b === 'h).select('c, 'g)
+
+ joinDs.writeAsCsv(resultPath, writeMode = WriteMode.OVERWRITE)
+ env.execute()
+ expected = "Hi,Hallo\n" + "Hello,Hallo Welt\n" + "Hello world,Hallo Welt wie gehts?\n" +
+ "Hello world,ABC\n" + "I am fine.,HIJ\n" + "I am fine.,IJK\n"
+ }
+
+ // Referencing an undeclared field ('foo) in the join condition must fail.
+ @Test(expected = classOf[ExpressionException])
+ def testJoinNonExistingKey: Unit = {
+ val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+ val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).as('a, 'b, 'c)
+ val ds2 = CollectionDataSets.get5TupleDataSet(env).as('d, 'e, 'f, 'g, 'h)
+
+ val joinDs = ds1.join(ds2).where('foo === 'e).select('c, 'g)
+
+ joinDs.writeAsCsv(resultPath, writeMode = WriteMode.OVERWRITE)
+ env.execute()
+ expected = ""
+ }
+
+ // Comparing keys of mismatched types must be rejected (presumably 'a is numeric
+ // and 'g is a String in CollectionDataSets — verify against the test utilities).
+ @Test(expected = classOf[ExpressionException])
+ def testJoinWithNonMatchingKeyTypes: Unit = {
+ val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+ val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).as('a, 'b, 'c)
+ val ds2 = CollectionDataSets.get5TupleDataSet(env).as('d, 'e, 'f, 'g, 'h)
+
+ val joinDs = ds1.join(ds2).where('a === 'g).select('c, 'g)
+
+ joinDs.writeAsCsv(resultPath, writeMode = WriteMode.OVERWRITE)
+ env.execute()
+ expected = ""
+ }
+
+ // Both inputs declare a field named 'c, so resolving 'c in select is ambiguous.
+ @Test(expected = classOf[ExpressionException])
+ def testJoinWithAmbiguousFields: Unit = {
+ val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+ val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).as('a, 'b, 'c)
+ val ds2 = CollectionDataSets.get5TupleDataSet(env).as('d, 'e, 'f, 'g, 'c)
+
+ val joinDs = ds1.join(ds2).where('a === 'd).select('c, 'g)
+
+ joinDs.writeAsCsv(resultPath, writeMode = WriteMode.OVERWRITE)
+ env.execute()
+ expected = ""
+ }
+
+ // An aggregation ('g.count) applied directly to the join result.
+ @Test
+ def testJoinWithAggregation: Unit = {
+ val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+ val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).as('a, 'b, 'c)
+ val ds2 = CollectionDataSets.get5TupleDataSet(env).as('d, 'e, 'f, 'g, 'h)
+
+ val joinDs = ds1.join(ds2).where('a === 'd).select('g.count)
+
+ joinDs.writeAsCsv(resultPath, writeMode = WriteMode.OVERWRITE)
+ env.execute()
+ expected = "6"
+ }
+
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/d7d9b639/flink-staging/flink-expressions/src/test/scala/org/apache/flink/api/scala/expressions/test/SelectITCase.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-expressions/src/test/scala/org/apache/flink/api/scala/expressions/test/SelectITCase.scala b/flink-staging/flink-expressions/src/test/scala/org/apache/flink/api/scala/expressions/test/SelectITCase.scala
new file mode 100644
index 0000000..a799b60
--- /dev/null
+++ b/flink-staging/flink-expressions/src/test/scala/org/apache/flink/api/scala/expressions/test/SelectITCase.scala
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.scala.expressions.test
+
+import org.apache.flink.api.expressions.ExpressionException
+import org.apache.flink.api.scala._
+import org.apache.flink.api.scala.expressions._
+import org.apache.flink.api.scala.util.CollectionDataSets
+import org.apache.flink.core.fs.FileSystem.WriteMode
+import org.apache.flink.test.util.MultipleProgramsTestBase
+import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
+import org.junit._
+import org.junit.rules.TemporaryFolder
+import org.junit.runner.RunWith
+import org.junit.runners.Parameterized
+
+@RunWith(classOf[Parameterized])
+class SelectITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode) {
+ // Temp file each test writes its result to; created fresh in before().
+ private var resultPath: String = null
+ // Expected file contents; after() compares it line-wise against resultPath.
+ private var expected: String = ""
+ private val _tempFolder = new TemporaryFolder()
+
+ @Rule
+ def tempFolder = _tempFolder
+
+ @Before
+ def before(): Unit = {
+ resultPath = tempFolder.newFile().toURI.toString
+ }
+
+ // Runs after every test; the failure tests set a dummy expected value that is
+ // never compared because the exception aborts the test first.
+ @After
+ def after: Unit = {
+ compareResultsByLinesInMemory(expected, resultPath)
+ }
+
+ // Select all fields via toExpression, using the default tuple names '_1..'_3.
+ @Test
+ def testSimpleSelectAll: Unit = {
+
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val ds = CollectionDataSets.get3TupleDataSet(env).toExpression.select('_1, '_2, '_3)
+
+ ds.writeAsText(resultPath, WriteMode.OVERWRITE)
+ env.execute()
+ expected = "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello world\n" + "4,3,Hello world, " +
+ "how are you?\n" + "5,3,I am fine.\n" + "6,3,Luke Skywalker\n" + "7,4," +
+ "Comment#1\n" + "8,4,Comment#2\n" + "9,4,Comment#3\n" + "10,4,Comment#4\n" + "11,5," +
+ "Comment#5\n" + "12,5,Comment#6\n" + "13,5,Comment#7\n" + "14,5,Comment#8\n" + "15,5," +
+ "Comment#9\n" + "16,6,Comment#10\n" + "17,6,Comment#11\n" + "18,6,Comment#12\n" + "19," +
+ "6,Comment#13\n" + "20,6,Comment#14\n" + "21,6,Comment#15\n"
+ }
+
+ // Same as above, but with fields renamed through as(...) first.
+ @Test
+ def testSimpleSelectAllWithAs: Unit = {
+
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val ds = CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'c).select('a, 'b, 'c)
+
+ ds.writeAsText(resultPath, WriteMode.OVERWRITE)
+ env.execute()
+ expected = "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello world\n" + "4,3,Hello world, " +
+ "how are you?\n" + "5,3,I am fine.\n" + "6,3,Luke Skywalker\n" + "7,4," +
+ "Comment#1\n" + "8,4,Comment#2\n" + "9,4,Comment#3\n" + "10,4,Comment#4\n" + "11,5," +
+ "Comment#5\n" + "12,5,Comment#6\n" + "13,5,Comment#7\n" + "14,5,Comment#8\n" + "15,5," +
+ "Comment#9\n" + "16,6,Comment#10\n" + "17,6,Comment#11\n" + "18,6,Comment#12\n" + "19," +
+ "6,Comment#13\n" + "20,6,Comment#14\n" + "21,6,Comment#15\n"
+ }
+
+ // Rename fields inside select ('_1 as 'a) and reference the new names downstream.
+ @Test
+ def testSimpleSelectWithNaming: Unit = {
+
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val ds = CollectionDataSets.get3TupleDataSet(env).toExpression
+ .select('_1 as 'a, '_2 as 'b)
+ .select('a, 'b)
+
+ ds.writeAsText(resultPath, WriteMode.OVERWRITE)
+ env.execute()
+ expected = "1,1\n" + "2,2\n" + "3,2\n" + "4,3\n" + "5,3\n" + "6,3\n" + "7,4\n" +
+ "8,4\n" + "9,4\n" + "10,4\n" + "11,5\n" + "12,5\n" + "13,5\n" + "14,5\n" + "15,5\n" +
+ "16,6\n" + "17,6\n" + "18,6\n" + "19,6\n" + "20,6\n" + "21,6\n"
+ }
+
+ // as(...) with fewer names than tuple fields must fail.
+ // NOTE(review): method name should read "TooFew", not "ToFew".
+ @Test(expected = classOf[ExpressionException])
+ def testAsWithToFewFields: Unit = {
+
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val ds = CollectionDataSets.get3TupleDataSet(env).as('a, 'b)
+
+ ds.writeAsText(resultPath, WriteMode.OVERWRITE)
+ env.execute()
+ expected = "no"
+ }
+
+ // as(...) with more names than tuple fields must fail.
+ // NOTE(review): method name should read "TooMany", not "ToMany".
+ @Test(expected = classOf[ExpressionException])
+ def testAsWithToManyFields: Unit = {
+
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val ds = CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'c, 'd)
+
+ ds.writeAsText(resultPath, WriteMode.OVERWRITE)
+ env.execute()
+ expected = "no"
+ }
+
+ // Duplicate field name 'b in as(...) must fail.
+ @Test(expected = classOf[ExpressionException])
+ def testAsWithAmbiguousFields: Unit = {
+
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val ds = CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'b)
+
+ ds.writeAsText(resultPath, WriteMode.OVERWRITE)
+ env.execute()
+ expected = "no"
+ }
+
+
+ // Only plain field references are allowed in as(...); an expression such as
+ // 'b as 'c must be rejected.
+ @Test(expected = classOf[ExpressionException])
+ def testOnlyFieldRefInAs: Unit = {
+
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val ds = CollectionDataSets.get3TupleDataSet(env).as('a, 'b as 'c, 'd)
+
+ ds.writeAsText(resultPath, WriteMode.OVERWRITE)
+ env.execute()
+ expected = "no"
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/d7d9b639/flink-staging/flink-expressions/src/test/scala/org/apache/flink/api/scala/expressions/test/StringExpressionsITCase.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-expressions/src/test/scala/org/apache/flink/api/scala/expressions/test/StringExpressionsITCase.scala b/flink-staging/flink-expressions/src/test/scala/org/apache/flink/api/scala/expressions/test/StringExpressionsITCase.scala
new file mode 100644
index 0000000..c6c1113
--- /dev/null
+++ b/flink-staging/flink-expressions/src/test/scala/org/apache/flink/api/scala/expressions/test/StringExpressionsITCase.scala
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.scala.expressions.test
+
+import org.apache.flink.api.expressions.ExpressionException
+import org.apache.flink.api.scala._
+import org.apache.flink.api.scala.expressions._
+import org.apache.flink.core.fs.FileSystem.WriteMode
+import org.apache.flink.test.util.MultipleProgramsTestBase
+import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
+import org.junit._
+import org.junit.rules.TemporaryFolder
+import org.junit.runner.RunWith
+import org.junit.runners.Parameterized
+
+@RunWith(classOf[Parameterized])
+class StringExpressionsITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode) {
+ // Temp file each test writes its result to; created fresh in before().
+ private var resultPath: String = null
+ // Expected file contents; after() compares it line-wise against resultPath.
+ private var expected: String = ""
+ private val _tempFolder = new TemporaryFolder()
+
+ @Rule
+ def tempFolder = _tempFolder
+
+ @Before
+ def before(): Unit = {
+ resultPath = tempFolder.newFile().toURI.toString
+ }
+
+ // Runs after every test; the failure tests set an expected value that is never
+ // compared because the expected exception aborts the test first.
+ @After
+ def after: Unit = {
+ compareResultsByLinesInMemory(expected, resultPath)
+ }
+
+ // Two-argument substring where the end position comes from another field ('b).
+ @Test
+ def testSubstring: Unit = {
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val ds = env.fromElements(("AAAA", 2), ("BBBB", 1)).as('a, 'b)
+ .select('a.substring(0, 'b))
+
+ ds.writeAsText(resultPath, WriteMode.OVERWRITE)
+ env.execute()
+ expected = "AA\nB"
+ }
+
+ // Single-argument substring: from position 'b to the end of the string.
+ @Test
+ def testSubstringWithMaxEnd: Unit = {
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val ds = env.fromElements(("ABCD", 2), ("ABCD", 1)).as('a, 'b)
+ .select('a.substring('b))
+
+ ds.writeAsText(resultPath, WriteMode.OVERWRITE)
+ env.execute()
+ expected = "CD\nBCD"
+ }
+
+ // A non-integer (Double) end argument to substring must be rejected.
+ @Test(expected = classOf[ExpressionException])
+ def testNonWorkingSubstring1: Unit = {
+
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val ds = env.fromElements(("AAAA", 2.0), ("BBBB", 1.0)).as('a, 'b)
+ .select('a.substring(0, 'b))
+
+ ds.writeAsText(resultPath, WriteMode.OVERWRITE)
+ env.execute()
+ expected = "AAA\nBB"
+ }
+
+ // A non-integer (String) start argument to substring must be rejected.
+ @Test(expected = classOf[ExpressionException])
+ def testNonWorkingSubstring2: Unit = {
+
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val ds = env.fromElements(("AAAA", "c"), ("BBBB", "d")).as('a, 'b)
+ .select('a.substring('b, 15))
+
+ ds.writeAsText(resultPath, WriteMode.OVERWRITE)
+ env.execute()
+ expected = "AAA\nBB"
+ }
+
+
+}