You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tw...@apache.org on 2017/07/13 10:18:40 UTC
[31/44] flink git commit: [FLINK-6617] [table] Restructuring of tests
http://git-wip-us.apache.org/repos/asf/flink/blob/f1fafc0e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/catalog/batch/table/ExternalCatalogTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/catalog/batch/table/ExternalCatalogTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/catalog/batch/table/ExternalCatalogTest.scala
deleted file mode 100644
index fab6c08..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/catalog/batch/table/ExternalCatalogTest.scala
+++ /dev/null
@@ -1,94 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.catalog.batch.table
-
-import org.apache.flink.table.api.scala._
-import org.apache.flink.table.catalog.utils.ExternalCatalogTestBase
-import org.apache.flink.table.utils.CommonTestData
-import org.apache.flink.table.utils.TableTestUtil._
-import org.junit.Test
-
-/**
- * Test for external catalog query plan.
- */
-class ExternalCatalogTest extends ExternalCatalogTestBase {
-
- @Test
- def test(): Unit = {
- val util = batchTestUtil()
- val tEnv = util.tableEnv
-
- tEnv.registerExternalCatalog("test", CommonTestData.getInMemoryTestCatalog)
-
- val table1 = tEnv.scan("test", "db1", "tb1")
- val table2 = tEnv.scan("test", "db2", "tb2")
- val result = table2
- .select('d * 2, 'e, 'g.upperCase())
- .unionAll(table1.select('a * 2, 'b, 'c.upperCase()))
-
- val expected = binaryNode(
- "DataSetUnion",
- unaryNode(
- "DataSetCalc",
- sourceBatchTableNode(table2Path, table2ProjectedFields),
- term("select", "*(d, 2) AS _c0", "e", "UPPER(g) AS _c2")
- ),
- unaryNode(
- "DataSetCalc",
- sourceBatchTableNode(table1Path, table1ProjectedFields),
- term("select", "*(a, 2) AS _c0", "b", "UPPER(c) AS _c2")
- ),
- term("union", "_c0", "e", "_c2")
- )
-
- util.verifyTable(result, expected)
- }
-
- @Test
- def testTopLevelTable(): Unit = {
- val util = batchTestUtil()
- val tEnv = util.tableEnv
-
- tEnv.registerExternalCatalog("test", CommonTestData.getInMemoryTestCatalog)
-
- val table1 = tEnv.scan("test", "tb1")
- val table2 = tEnv.scan("test", "db2", "tb2")
- val result = table2
- .select('d * 2, 'e, 'g.upperCase())
- .unionAll(table1.select('a * 2, 'b, 'c.upperCase()))
-
- val expected = binaryNode(
- "DataSetUnion",
- unaryNode(
- "DataSetCalc",
- sourceBatchTableNode(table2Path, table2ProjectedFields),
- term("select", "*(d, 2) AS _c0", "e", "UPPER(g) AS _c2")
- ),
- unaryNode(
- "DataSetCalc",
- sourceBatchTableNode(table1TopLevelPath, table1ProjectedFields),
- term("select", "*(a, 2) AS _c0", "b", "UPPER(c) AS _c2")
- ),
- term("union", "_c0", "e", "_c2")
- )
-
- util.verifyTable(result, expected)
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/f1fafc0e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/catalog/stream/sql/ExternalCatalogTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/catalog/stream/sql/ExternalCatalogTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/catalog/stream/sql/ExternalCatalogTest.scala
deleted file mode 100644
index 786c4a1..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/catalog/stream/sql/ExternalCatalogTest.scala
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.catalog.stream.sql
-
-import org.apache.flink.table.catalog.utils.ExternalCatalogTestBase
-import org.apache.flink.table.utils.CommonTestData
-import org.apache.flink.table.utils.TableTestUtil._
-import org.junit.Test
-
-/**
- * Test for external catalog query plan.
- */
-class ExternalCatalogTest extends ExternalCatalogTestBase {
-
- @Test
- def test(): Unit = {
- val util = streamTestUtil()
-
- util.tableEnv.registerExternalCatalog("test", CommonTestData.getInMemoryTestCatalog)
-
- val sqlQuery = "SELECT d * 2, e, g FROM test.db2.tb2 WHERE d < 3 UNION ALL " +
- "(SELECT a * 2, b, c FROM test.db1.tb1)"
-
- val expected = binaryNode(
- "DataStreamUnion",
- unaryNode(
- "DataStreamCalc",
- sourceStreamTableNode(table2Path, table2ProjectedFields),
- term("select", "*(d, 2) AS EXPR$0", "e", "g"),
- term("where", "<(d, 3)")),
- unaryNode(
- "DataStreamCalc",
- sourceStreamTableNode(table1Path, table1ProjectedFields),
- term("select", "*(a, 2) AS EXPR$0", "b", "c")
- ),
- term("union all", "EXPR$0", "e", "g"))
-
- util.verifySql(sqlQuery, expected)
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/f1fafc0e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/catalog/stream/table/ExternalCatalogTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/catalog/stream/table/ExternalCatalogTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/catalog/stream/table/ExternalCatalogTest.scala
deleted file mode 100644
index 79d3f99..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/catalog/stream/table/ExternalCatalogTest.scala
+++ /dev/null
@@ -1,65 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.catalog.stream.table
-
-import org.apache.flink.table.api.scala._
-import org.apache.flink.table.catalog.utils.ExternalCatalogTestBase
-import org.apache.flink.table.utils.CommonTestData
-import org.apache.flink.table.utils.TableTestUtil._
-import org.junit.Test
-
-/**
- * Test for external catalog query plan.
- */
-class ExternalCatalogTest extends ExternalCatalogTestBase {
-
- @Test
- def testStreamTableApi(): Unit = {
- val util = streamTestUtil()
- val tEnv = util.tableEnv
-
- util.tableEnv.registerExternalCatalog("test", CommonTestData.getInMemoryTestCatalog)
-
- val table1 = tEnv.scan("test", "db1", "tb1")
- val table2 = tEnv.scan("test", "db2", "tb2")
-
- val result = table2.where("d < 3")
- .select('d * 2, 'e, 'g.upperCase())
- .unionAll(table1.select('a * 2, 'b, 'c.upperCase()))
-
- val expected = binaryNode(
- "DataStreamUnion",
- unaryNode(
- "DataStreamCalc",
- sourceStreamTableNode(table2Path, table2ProjectedFields),
- term("select", "*(d, 2) AS _c0", "e", "UPPER(g) AS _c2"),
- term("where", "<(d, 3)")
- ),
- unaryNode(
- "DataStreamCalc",
- sourceStreamTableNode(table1Path, table1ProjectedFields),
- term("select", "*(a, 2) AS _c0", "b", "UPPER(c) AS _c2")
- ),
- term("union all", "_c0", "e", "_c2")
- )
-
- util.verifyTable(result, expected)
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/f1fafc0e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/catalog/utils/ExternalCatalogTestBase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/catalog/utils/ExternalCatalogTestBase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/catalog/utils/ExternalCatalogTestBase.scala
deleted file mode 100644
index 19fbd36..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/catalog/utils/ExternalCatalogTestBase.scala
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.table.catalog.utils
-
-import org.apache.flink.table.utils.TableTestBase
-
-class ExternalCatalogTestBase extends TableTestBase {
- protected val table1Path: Array[String] = Array("test", "db1", "tb1")
- protected val table1TopLevelPath: Array[String] = Array("test", "tb1")
- protected val table1ProjectedFields: Array[String] = Array("a", "b", "c")
- protected val table2Path: Array[String] = Array("test", "db2", "tb2")
- protected val table2ProjectedFields: Array[String] = Array("d", "e", "g")
-
- def sourceBatchTableNode(sourceTablePath: Array[String], fields: Array[String]): String = {
- s"BatchTableSourceScan(table=[[${sourceTablePath.mkString(", ")}]], " +
- s"fields=[${fields.mkString(", ")}])"
- }
-
- def sourceStreamTableNode(sourceTablePath: Array[String], fields: Array[String]): String = {
- s"StreamTableSourceScan(table=[[${sourceTablePath.mkString(", ")}]], " +
- s"fields=[${fields.mkString(", ")}])"
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/f1fafc0e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/CompositeAccessTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/CompositeAccessTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/CompositeAccessTest.scala
index 558e5b3..bd1c199 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/CompositeAccessTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/CompositeAccessTest.scala
@@ -19,10 +19,10 @@
package org.apache.flink.table.expressions
import org.apache.flink.table.api.scala._
-import org.apache.flink.table.expressions.utils.CompositeAccessTestBase
+import org.apache.flink.table.expressions.utils.CompositeTypeTestBase
import org.junit.Test
-class CompositeAccessTest extends CompositeAccessTestBase {
+class CompositeAccessTest extends CompositeTypeTestBase {
@Test
def testGetField(): Unit = {
@@ -120,7 +120,6 @@ class CompositeAccessTest extends CompositeAccessTestBase {
testTableApi('f5.flatten(), "f5.flatten()", "13")
}
-
}
http://git-wip-us.apache.org/repos/asf/flink/blob/f1fafc0e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/MapTypeTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/MapTypeTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/MapTypeTest.scala
index 7b09733..b2b8016 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/MapTypeTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/MapTypeTest.scala
@@ -31,5 +31,4 @@ class MapTypeTest extends MapTypeTestBase {
testSqlApi("f3[1]", "null")
testSqlApi("f3[12]", "a")
}
-
}
http://git-wip-us.apache.org/repos/asf/flink/blob/f1fafc0e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/NonDeterministicTests.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/NonDeterministicTests.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/NonDeterministicTests.scala
index bcc53af..d3b606b 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/NonDeterministicTests.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/NonDeterministicTests.scala
@@ -82,7 +82,7 @@ class NonDeterministicTests extends ExpressionTestBase {
// ----------------------------------------------------------------------------------------------
- override def testData: Any = new Row(0)
+ override def testData: Row = new Row(0)
override def typeInfo: TypeInformation[Any] =
new RowTypeInfo().asInstanceOf[TypeInformation[Any]]
http://git-wip-us.apache.org/repos/asf/flink/blob/f1fafc0e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ScalarFunctionsTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ScalarFunctionsTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ScalarFunctionsTest.scala
index ef021b7..02745b5 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ScalarFunctionsTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ScalarFunctionsTest.scala
@@ -20,10 +20,10 @@ package org.apache.flink.table.expressions
import org.apache.flink.table.api.Types
import org.apache.flink.table.api.scala._
-import org.apache.flink.table.expressions.utils.ScalarFunctionsTestBase
+import org.apache.flink.table.expressions.utils.ScalarTypesTestBase
import org.junit.Test
-class ScalarFunctionsTest extends ScalarFunctionsTestBase {
+class ScalarFunctionsTest extends ScalarTypesTestBase {
// ----------------------------------------------------------------------------------------------
// String functions
@@ -347,7 +347,6 @@ class ScalarFunctionsTest extends ScalarFunctionsTestBase {
"mod(44, 3)",
"MOD(44, 3)",
"2")
-
}
@Test
http://git-wip-us.apache.org/repos/asf/flink/blob/f1fafc0e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ScalarOperatorsTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ScalarOperatorsTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ScalarOperatorsTest.scala
index 30f8a4c..738413e 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ScalarOperatorsTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ScalarOperatorsTest.scala
@@ -18,9 +18,13 @@
package org.apache.flink.table.expressions
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.typeutils.RowTypeInfo
import org.apache.flink.table.api.Types
import org.apache.flink.table.api.scala._
-import org.apache.flink.table.expressions.utils.{ScalarOperatorsTestBase, ShouldNotExecuteFunc}
+import org.apache.flink.table.expressions.utils.{ExpressionTestBase, ScalarOperatorsTestBase, ShouldNotExecuteFunc}
+import org.apache.flink.table.functions.ScalarFunction
+import org.apache.flink.types.Row
import org.junit.Test
class ScalarOperatorsTest extends ScalarOperatorsTestBase {
@@ -211,5 +215,4 @@ class ScalarOperatorsTest extends ScalarOperatorsTestBase {
"trueX")
testTableApi(12.isNull, "12.isNull", "false")
}
-
}
http://git-wip-us.apache.org/repos/asf/flink/blob/f1fafc0e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/SqlExpressionTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/SqlExpressionTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/SqlExpressionTest.scala
index 5cc8bd1..c631212 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/SqlExpressionTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/SqlExpressionTest.scala
@@ -114,6 +114,7 @@ class SqlExpressionTest extends ExpressionTestBase {
testSqlApi("SIGN(-1.1)", "-1")
testSqlApi("ROUND(-12.345, 2)", "-12.35")
testSqlApi("PI", "3.141592653589793")
+ testSqlApi("E()", "2.718281828459045")
}
@Test
http://git-wip-us.apache.org/repos/asf/flink/blob/f1fafc0e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/TemporalTypesTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/TemporalTypesTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/TemporalTypesTest.scala
index 4f47fd5..1d761c3 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/TemporalTypesTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/TemporalTypesTest.scala
@@ -540,7 +540,7 @@ class TemporalTypesTest extends ExpressionTestBase {
// ----------------------------------------------------------------------------------------------
- def testData = {
+ def testData: Row = {
val testData = new Row(11)
testData.setField(0, Date.valueOf("1990-10-14"))
testData.setField(1, Time.valueOf("10:20:45"))
@@ -556,7 +556,7 @@ class TemporalTypesTest extends ExpressionTestBase {
testData
}
- def typeInfo = {
+ def typeInfo: TypeInformation[Any] = {
new RowTypeInfo(
Types.SQL_DATE,
Types.SQL_TIME,
http://git-wip-us.apache.org/repos/asf/flink/blob/f1fafc0e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/UserDefinedScalarFunctionTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/UserDefinedScalarFunctionTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/UserDefinedScalarFunctionTest.scala
index f0432cf..9b3407e 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/UserDefinedScalarFunctionTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/UserDefinedScalarFunctionTest.scala
@@ -25,9 +25,9 @@ import org.apache.flink.api.common.typeinfo.{BasicArrayTypeInfo, TypeInformation
import org.apache.flink.api.java.typeutils.RowTypeInfo
import org.apache.flink.types.Row
import org.apache.flink.table.api.{Types, ValidationException}
-import org.apache.flink.table.api.java.utils.UserDefinedScalarFunctions._
+import org.apache.flink.table.runtime.utils.JavaUserDefinedScalarFunctions._
import org.apache.flink.table.api.scala._
-import org.apache.flink.table.expressions.utils._
+import org.apache.flink.table.expressions.utils.{ExpressionTestBase, _}
import org.apache.flink.table.functions.ScalarFunction
import org.junit.Test
http://git-wip-us.apache.org/repos/asf/flink/blob/f1fafc0e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/batch/sql/ExpressionReductionTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/batch/sql/ExpressionReductionTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/batch/sql/ExpressionReductionTest.scala
deleted file mode 100644
index c132335..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/batch/sql/ExpressionReductionTest.scala
+++ /dev/null
@@ -1,154 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.table.expressions.batch.sql
-
-import org.apache.flink.api.scala._
-import org.apache.flink.table.api.scala._
-import org.apache.flink.table.utils.TableTestBase
-import org.apache.flink.table.utils.TableTestUtil._
-import org.junit.Test
-
-class ExpressionReductionTest extends TableTestBase {
-
- @Test
- def testReduceCalcExpression(): Unit = {
- val util = batchTestUtil()
- util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
-
- val sqlQuery = "SELECT " +
- "(3+4)+a, " +
- "b+(1+2), " +
- "CASE 11 WHEN 1 THEN 'a' ELSE 'b' END, " +
- "TRIM(BOTH ' STRING '), " +
- "'test' || 'string', " +
- "NULLIF(1, 1), " +
- "TIMESTAMP '1990-10-14 23:00:00.123' + INTERVAL '10 00:00:01' DAY TO SECOND, " +
- "EXTRACT(DAY FROM INTERVAL '19 12:10:10.123' DAY TO SECOND(3)), " +
- "1 IS NULL, " +
- "'TEST' LIKE '%EST', " +
- "FLOOR(2.5), " +
- "'TEST' IN ('west', 'TEST', 'rest'), " +
- "CAST(TRUE AS VARCHAR) || 'X'" +
- "FROM MyTable WHERE a>(1+7)"
-
- val expected = unaryNode(
- "DataSetCalc",
- batchTableNode(0),
- term("select",
- "+(7, a) AS EXPR$0",
- "+(b, 3) AS EXPR$1",
- "'b' AS EXPR$2",
- "'STRING' AS EXPR$3",
- "'teststring' AS EXPR$4",
- "null AS EXPR$5",
- "1990-10-24 23:00:01.123 AS EXPR$6",
- "19 AS EXPR$7",
- "false AS EXPR$8",
- "true AS EXPR$9",
- "2 AS EXPR$10",
- "true AS EXPR$11",
- "'trueX' AS EXPR$12"
- ),
- term("where", ">(a, 8)")
- )
-
- util.verifySql(sqlQuery, expected)
- }
-
- @Test
- def testReduceProjectExpression(): Unit = {
- val util = batchTestUtil()
- util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
-
- val sqlQuery = "SELECT " +
- "(3+4)+a, " +
- "b+(1+2), " +
- "CASE 11 WHEN 1 THEN 'a' ELSE 'b' END, " +
- "TRIM(BOTH ' STRING '), " +
- "'test' || 'string', " +
- "NULLIF(1, 1), " +
- "TIMESTAMP '1990-10-14 23:00:00.123' + INTERVAL '10 00:00:01' DAY TO SECOND, " +
- "EXTRACT(DAY FROM INTERVAL '19 12:10:10.123' DAY TO SECOND(3)), " +
- "1 IS NULL, " +
- "'TEST' LIKE '%EST', " +
- "FLOOR(2.5), " +
- "'TEST' IN ('west', 'TEST', 'rest'), " +
- "CAST(TRUE AS VARCHAR) || 'X'" +
- "FROM MyTable"
-
- val expected = unaryNode(
- "DataSetCalc",
- batchTableNode(0),
- term("select",
- "+(7, a) AS EXPR$0",
- "+(b, 3) AS EXPR$1",
- "'b' AS EXPR$2",
- "'STRING' AS EXPR$3",
- "'teststring' AS EXPR$4",
- "null AS EXPR$5",
- "1990-10-24 23:00:01.123 AS EXPR$6",
- "19 AS EXPR$7",
- "false AS EXPR$8",
- "true AS EXPR$9",
- "2 AS EXPR$10",
- "true AS EXPR$11",
- "'trueX' AS EXPR$12"
- )
- )
-
- util.verifySql(sqlQuery, expected)
- }
-
- @Test
- def testReduceFilterExpression(): Unit = {
- val util = batchTestUtil()
- util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
-
- val sqlQuery = "SELECT " +
- "*" +
- "FROM MyTable WHERE a>(1+7)"
-
- val expected = unaryNode(
- "DataSetCalc",
- batchTableNode(0),
- term("select", "a", "b", "c"),
- term("where", ">(a, 8)")
- )
-
- util.verifySql(sqlQuery, expected)
- }
-
- @Test
- def testNestedTablesReduction(): Unit = {
- val util = batchTestUtil()
-
- util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
-
- val newTable = util.tableEnv.sql("SELECT 1 + 1 + a AS a FROM MyTable")
-
- util.tableEnv.registerTable("NewTable", newTable)
-
- val sqlQuery = "SELECT a FROM NewTable"
-
- // 1+1 should be normalized to 2
- val expected = unaryNode("DataSetCalc", batchTableNode(0), term("select", "+(2, a) AS a"))
-
- util.verifySql(sqlQuery, expected)
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/f1fafc0e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/batch/table/ExpressionReductionTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/batch/table/ExpressionReductionTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/batch/table/ExpressionReductionTest.scala
deleted file mode 100644
index 595063a..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/batch/table/ExpressionReductionTest.scala
+++ /dev/null
@@ -1,119 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.table.expressions.batch.table
-
-import org.apache.flink.api.scala._
-import org.apache.flink.table.api.Types
-import org.apache.flink.table.api.scala._
-import org.apache.flink.table.utils.TableTestBase
-import org.apache.flink.table.utils.TableTestUtil._
-import org.junit.Test
-
-class ExpressionReductionTest extends TableTestBase {
-
- @Test
- def testReduceCalcExpression(): Unit = {
- val util = batchTestUtil()
- val table = util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
-
- val result = table
- .where('a > (1 + 7))
- .select((3 + 4).toExpr + 6,
- (11 === 1) ? ("a", "b"),
- " STRING ".trim,
- "test" + "string",
- "1990-10-14 23:00:00.123".toTimestamp + 10.days + 1.second,
- 1.isNull,
- "TEST".like("%EST"),
- 2.5.toExpr.floor(),
- true.cast(Types.STRING) + "X")
-
- val expected = unaryNode(
- "DataSetCalc",
- batchTableNode(0),
- term("select",
- "13 AS _c0",
- "'b' AS _c1",
- "'STRING' AS _c2",
- "'teststring' AS _c3",
- "1990-10-24 23:00:01.123 AS _c4",
- "false AS _c5",
- "true AS _c6",
- "2E0 AS _c7",
- "'trueX' AS _c8"
- ),
- term("where", ">(a, 8)")
- )
-
- util.verifyTable(result, expected)
- }
-
- @Test
- def testReduceProjectExpression(): Unit = {
- val util = batchTestUtil()
- val table = util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
-
- val result = table
- .select((3 + 4).toExpr + 6,
- (11 === 1) ? ("a", "b"),
- " STRING ".trim,
- "test" + "string",
- "1990-10-14 23:00:00.123".toTimestamp + 10.days + 1.second,
- 1.isNull,
- "TEST".like("%EST"),
- 2.5.toExpr.floor(),
- true.cast(Types.STRING) + "X")
-
- val expected = unaryNode(
- "DataSetCalc",
- batchTableNode(0),
- term("select",
- "13 AS _c0",
- "'b' AS _c1",
- "'STRING' AS _c2",
- "'teststring' AS _c3",
- "1990-10-24 23:00:01.123 AS _c4",
- "false AS _c5",
- "true AS _c6",
- "2E0 AS _c7",
- "'trueX' AS _c8"
- )
- )
-
- util.verifyTable(result, expected)
- }
-
- @Test
- def testReduceFilterExpression(): Unit = {
- val util = batchTestUtil()
- val table = util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
-
- val result = table
- .where('a > (1 + 7))
-
- val expected = unaryNode(
- "DataSetCalc",
- batchTableNode(0),
- term("select", "a", "b", "c"),
- term("where", ">(a, 8)")
- )
-
- util.verifyTable(result, expected)
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/f1fafc0e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/stream/sql/ExpressionReductionTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/stream/sql/ExpressionReductionTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/stream/sql/ExpressionReductionTest.scala
deleted file mode 100644
index 0a22d8c..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/stream/sql/ExpressionReductionTest.scala
+++ /dev/null
@@ -1,154 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.table.expressions.stream.sql
-
-import org.apache.flink.api.scala._
-import org.apache.flink.table.api.scala._
-import org.apache.flink.table.utils.TableTestBase
-import org.apache.flink.table.utils.TableTestUtil._
-import org.junit.Test
-
-class ExpressionReductionTest extends TableTestBase {
-
- @Test
- def testReduceCalcExpression(): Unit = {
- val util = streamTestUtil()
- util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
-
- val sqlQuery = "SELECT " +
- "(3+4)+a, " +
- "b+(1+2), " +
- "CASE 11 WHEN 1 THEN 'a' ELSE 'b' END, " +
- "TRIM(BOTH ' STRING '), " +
- "'test' || 'string', " +
- "NULLIF(1, 1), " +
- "TIMESTAMP '1990-10-14 23:00:00.123' + INTERVAL '10 00:00:01' DAY TO SECOND, " +
- "EXTRACT(DAY FROM INTERVAL '19 12:10:10.123' DAY TO SECOND(3)), " +
- "1 IS NULL, " +
- "'TEST' LIKE '%EST', " +
- "FLOOR(2.5), " +
- "'TEST' IN ('west', 'TEST', 'rest'), " +
- "CAST(TRUE AS VARCHAR) || 'X'" +
- "FROM MyTable WHERE a>(1+7)"
-
- val expected = unaryNode(
- "DataStreamCalc",
- streamTableNode(0),
- term("select",
- "+(7, a) AS EXPR$0",
- "+(b, 3) AS EXPR$1",
- "'b' AS EXPR$2",
- "'STRING' AS EXPR$3",
- "'teststring' AS EXPR$4",
- "null AS EXPR$5",
- "1990-10-24 23:00:01.123 AS EXPR$6",
- "19 AS EXPR$7",
- "false AS EXPR$8",
- "true AS EXPR$9",
- "2 AS EXPR$10",
- "true AS EXPR$11",
- "'trueX' AS EXPR$12"
- ),
- term("where", ">(a, 8)")
- )
-
- util.verifySql(sqlQuery, expected)
- }
-
- @Test
- def testReduceProjectExpression(): Unit = {
- val util = streamTestUtil()
- util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
-
- val sqlQuery = "SELECT " +
- "(3+4)+a, " +
- "b+(1+2), " +
- "CASE 11 WHEN 1 THEN 'a' ELSE 'b' END, " +
- "TRIM(BOTH ' STRING '), " +
- "'test' || 'string', " +
- "NULLIF(1, 1), " +
- "TIMESTAMP '1990-10-14 23:00:00.123' + INTERVAL '10 00:00:01' DAY TO SECOND, " +
- "EXTRACT(DAY FROM INTERVAL '19 12:10:10.123' DAY TO SECOND(3)), " +
- "1 IS NULL, " +
- "'TEST' LIKE '%EST', " +
- "FLOOR(2.5), " +
- "'TEST' IN ('west', 'TEST', 'rest'), " +
- "CAST(TRUE AS VARCHAR) || 'X'" +
- "FROM MyTable"
-
- val expected = unaryNode(
- "DataStreamCalc",
- streamTableNode(0),
- term("select",
- "+(7, a) AS EXPR$0",
- "+(b, 3) AS EXPR$1",
- "'b' AS EXPR$2",
- "'STRING' AS EXPR$3",
- "'teststring' AS EXPR$4",
- "null AS EXPR$5",
- "1990-10-24 23:00:01.123 AS EXPR$6",
- "19 AS EXPR$7",
- "false AS EXPR$8",
- "true AS EXPR$9",
- "2 AS EXPR$10",
- "true AS EXPR$11",
- "'trueX' AS EXPR$12"
- )
- )
-
- util.verifySql(sqlQuery, expected)
- }
-
- @Test
- def testReduceFilterExpression(): Unit = {
- val util = streamTestUtil()
- util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
-
- val sqlQuery = "SELECT " +
- "*" +
- "FROM MyTable WHERE a>(1+7)"
-
- val expected = unaryNode(
- "DataStreamCalc",
- streamTableNode(0),
- term("select", "a", "b", "c"),
- term("where", ">(a, 8)")
- )
-
- util.verifySql(sqlQuery, expected)
- }
-
- @Test
- def testNestedTablesReduction(): Unit = {
- val util = streamTestUtil()
-
- util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
-
- val newTable = util.tableEnv.sql("SELECT 1 + 1 + a AS a FROM MyTable")
-
- util.tableEnv.registerTable("NewTable", newTable)
-
- val sqlQuery = "SELECT a FROM NewTable"
-
- // 1+1 should be normalized to 2
- val expected = unaryNode("DataStreamCalc", streamTableNode(0), term("select", "+(2, a) AS a"))
-
- util.verifySql(sqlQuery, expected)
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/f1fafc0e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/stream/table/ExpressionReductionTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/stream/table/ExpressionReductionTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/stream/table/ExpressionReductionTest.scala
deleted file mode 100644
index a62bcd3..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/stream/table/ExpressionReductionTest.scala
+++ /dev/null
@@ -1,137 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.table.expressions.stream.table
-
-import org.apache.flink.api.scala._
-import org.apache.flink.table.api.Types
-import org.apache.flink.table.api.scala._
-import org.apache.flink.table.utils.TableTestBase
-import org.apache.flink.table.utils.TableTestUtil._
-import org.junit.Test
-
-class ExpressionReductionTest extends TableTestBase {
-
- @Test
- def testReduceCalcExpression(): Unit = {
- val util = streamTestUtil()
- val table = util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
-
- val result = table
- .where('a > (1 + 7))
- .select((3 + 4).toExpr + 6,
- (11 === 1) ? ("a", "b"),
- " STRING ".trim,
- "test" + "string",
- "1990-10-14 23:00:00.123".toTimestamp + 10.days + 1.second,
- 1.isNull,
- "TEST".like("%EST"),
- 2.5.toExpr.floor(),
- true.cast(Types.STRING) + "X")
-
- val expected = unaryNode(
- "DataStreamCalc",
- streamTableNode(0),
- term("select",
- "13 AS _c0",
- "'b' AS _c1",
- "'STRING' AS _c2",
- "'teststring' AS _c3",
- "1990-10-24 23:00:01.123 AS _c4",
- "false AS _c5",
- "true AS _c6",
- "2E0 AS _c7",
- "'trueX' AS _c8"
- ),
- term("where", ">(a, 8)")
- )
-
- util.verifyTable(result, expected)
- }
-
- @Test
- def testReduceProjectExpression(): Unit = {
- val util = streamTestUtil()
- val table = util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
-
- val result = table
- .select((3 + 4).toExpr + 6,
- (11 === 1) ? ("a", "b"),
- " STRING ".trim,
- "test" + "string",
- "1990-10-14 23:00:00.123".toTimestamp + 10.days + 1.second,
- 1.isNull,
- "TEST".like("%EST"),
- 2.5.toExpr.floor(),
- true.cast(Types.STRING) + "X")
-
- val expected = unaryNode(
- "DataStreamCalc",
- streamTableNode(0),
- term("select",
- "13 AS _c0",
- "'b' AS _c1",
- "'STRING' AS _c2",
- "'teststring' AS _c3",
- "1990-10-24 23:00:01.123 AS _c4",
- "false AS _c5",
- "true AS _c6",
- "2E0 AS _c7",
- "'trueX' AS _c8"
- )
- )
-
- util.verifyTable(result, expected)
- }
-
- @Test
- def testReduceFilterExpression(): Unit = {
- val util = streamTestUtil()
- val table = util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
-
- val result = table
- .where('a > (1 + 7))
-
- val expected = unaryNode(
- "DataStreamCalc",
- streamTableNode(0),
- term("select", "a", "b", "c"),
- term("where", ">(a, 8)")
- )
-
- util.verifyTable(result, expected)
- }
-
- @Test
- def testNestedTablesReduction(): Unit = {
- val util = streamTestUtil()
-
- util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
-
- val newTable = util.tableEnv.sql("SELECT 1 + 1 + a AS a FROM MyTable")
-
- util.tableEnv.registerTable("NewTable", newTable)
-
- val sqlQuery = "SELECT a FROM NewTable"
-
- // 1+1 should be normalized to 2
- val expected = unaryNode("DataStreamCalc", streamTableNode(0), term("select", "+(2, a) AS a"))
-
- util.verifySql(sqlQuery, expected)
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/f1fafc0e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/utils/CompositeAccessTestBase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/utils/CompositeAccessTestBase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/utils/CompositeAccessTestBase.scala
deleted file mode 100644
index 7243f1d..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/utils/CompositeAccessTestBase.scala
+++ /dev/null
@@ -1,79 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.expressions.utils
-
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.java.typeutils.{RowTypeInfo, TupleTypeInfo, TypeExtractor}
-import org.apache.flink.api.scala.createTypeInformation
-import org.apache.flink.table.api.Types
-import org.apache.flink.table.expressions.utils.CompositeAccessTestBase.{MyCaseClass, MyCaseClass2, MyPojo}
-import org.apache.flink.types.Row
-
-class CompositeAccessTestBase extends ExpressionTestBase {
-
- def testData = {
- val testData = new Row(8)
- testData.setField(0, MyCaseClass(42, "Bob", booleanField = true))
- testData.setField(1, MyCaseClass2(MyCaseClass(25, "Timo", booleanField = false)))
- testData.setField(2, ("a", "b"))
- testData.setField(3, new org.apache.flink.api.java.tuple.Tuple2[String, String]("a", "b"))
- testData.setField(4, new MyPojo())
- testData.setField(5, 13)
- testData.setField(6, MyCaseClass2(null))
- testData.setField(7, Tuple1(true))
- testData
- }
-
- def typeInfo = {
- new RowTypeInfo(
- createTypeInformation[MyCaseClass],
- createTypeInformation[MyCaseClass2],
- createTypeInformation[(String, String)],
- new TupleTypeInfo(Types.STRING, Types.STRING),
- TypeExtractor.createTypeInfo(classOf[MyPojo]),
- Types.INT,
- createTypeInformation[MyCaseClass2],
- createTypeInformation[Tuple1[Boolean]]
- ).asInstanceOf[TypeInformation[Any]]
- }
-
-}
-
-object CompositeAccessTestBase {
- case class MyCaseClass(intField: Int, stringField: String, booleanField: Boolean)
-
- case class MyCaseClass2(objectField: MyCaseClass)
-
- class MyPojo {
- private var myInt: Int = 0
- private var myString: String = "Hello"
-
- def getMyInt = myInt
-
- def setMyInt(value: Int) = {
- myInt = value
- }
-
- def getMyString = myString
-
- def setMyString(value: String) = {
- myString = myString
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/f1fafc0e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/utils/CompositeTypeTestBase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/utils/CompositeTypeTestBase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/utils/CompositeTypeTestBase.scala
new file mode 100644
index 0000000..8f7360e
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/utils/CompositeTypeTestBase.scala
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.expressions.utils
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.typeutils.{RowTypeInfo, TupleTypeInfo, TypeExtractor}
+import org.apache.flink.api.scala.createTypeInformation
+import org.apache.flink.table.api.Types
+import org.apache.flink.table.expressions.utils.CompositeTypeTestBase.{MyCaseClass, MyCaseClass2, MyPojo}
+import org.apache.flink.types.Row
+
+class CompositeTypeTestBase extends ExpressionTestBase {
+
+ def testData: Row = {
+ val testData = new Row(8)
+ testData.setField(0, MyCaseClass(42, "Bob", booleanField = true))
+ testData.setField(1, MyCaseClass2(MyCaseClass(25, "Timo", booleanField = false)))
+ testData.setField(2, ("a", "b"))
+ testData.setField(3, new org.apache.flink.api.java.tuple.Tuple2[String, String]("a", "b"))
+ testData.setField(4, new MyPojo())
+ testData.setField(5, 13)
+ testData.setField(6, MyCaseClass2(null))
+ testData.setField(7, Tuple1(true))
+ testData
+ }
+
+ def typeInfo: TypeInformation[Any] = {
+ new RowTypeInfo(
+ createTypeInformation[MyCaseClass],
+ createTypeInformation[MyCaseClass2],
+ createTypeInformation[(String, String)],
+ new TupleTypeInfo(Types.STRING, Types.STRING),
+ TypeExtractor.createTypeInfo(classOf[MyPojo]),
+ Types.INT,
+ createTypeInformation[MyCaseClass2],
+ createTypeInformation[Tuple1[Boolean]]
+ ).asInstanceOf[TypeInformation[Any]]
+ }
+}
+
+object CompositeTypeTestBase {
+ case class MyCaseClass(intField: Int, stringField: String, booleanField: Boolean)
+
+ case class MyCaseClass2(objectField: MyCaseClass)
+
+ class MyPojo {
+ private var myInt: Int = 0
+ private var myString: String = "Hello"
+
+ def getMyInt: Int = myInt
+
+ def setMyInt(value: Int): Unit = {
+ myInt = value
+ }
+
+ def getMyString: String = myString
+
+ def setMyString(value: String): Unit = {
+ myString = myString
+ }
+ }
+}
+
http://git-wip-us.apache.org/repos/asf/flink/blob/f1fafc0e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/utils/MapTypeTestBase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/utils/MapTypeTestBase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/utils/MapTypeTestBase.scala
index eacf052..d90d9df 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/utils/MapTypeTestBase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/utils/MapTypeTestBase.scala
@@ -49,5 +49,4 @@ class MapTypeTestBase extends ExpressionTestBase {
new MapTypeInfo(BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO)
).asInstanceOf[TypeInformation[Any]]
}
-
}
http://git-wip-us.apache.org/repos/asf/flink/blob/f1fafc0e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/utils/ScalarFunctionsTestBase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/utils/ScalarFunctionsTestBase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/utils/ScalarFunctionsTestBase.scala
deleted file mode 100644
index c526a01..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/utils/ScalarFunctionsTestBase.scala
+++ /dev/null
@@ -1,105 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.expressions.utils
-
-import java.sql.{Date, Time, Timestamp}
-
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.java.typeutils.RowTypeInfo
-import org.apache.flink.table.api.Types
-import org.apache.flink.types.Row
-
-class ScalarFunctionsTestBase extends ExpressionTestBase {
-
- def testData = {
- val testData = new Row(33)
- testData.setField(0, "This is a test String.")
- testData.setField(1, true)
- testData.setField(2, 42.toByte)
- testData.setField(3, 43.toShort)
- testData.setField(4, 44.toLong)
- testData.setField(5, 4.5.toFloat)
- testData.setField(6, 4.6)
- testData.setField(7, 3)
- testData.setField(8, " This is a test String. ")
- testData.setField(9, -42.toByte)
- testData.setField(10, -43.toShort)
- testData.setField(11, -44.toLong)
- testData.setField(12, -4.5.toFloat)
- testData.setField(13, -4.6)
- testData.setField(14, -3)
- testData.setField(15, BigDecimal("-1231.1231231321321321111").bigDecimal)
- testData.setField(16, Date.valueOf("1996-11-10"))
- testData.setField(17, Time.valueOf("06:55:44"))
- testData.setField(18, Timestamp.valueOf("1996-11-10 06:55:44.333"))
- testData.setField(19, 1467012213000L) // +16979 07:23:33.000
- testData.setField(20, 25) // +2-01
- testData.setField(21, null)
- testData.setField(22, BigDecimal("2").bigDecimal)
- testData.setField(23, "%This is a test String.")
- testData.setField(24, "*_This is a test String.")
- testData.setField(25, 0.42.toByte)
- testData.setField(26, 0.toShort)
- testData.setField(27, 0.toLong)
- testData.setField(28, 0.45.toFloat)
- testData.setField(29, 0.46)
- testData.setField(30, 1)
- testData.setField(31, BigDecimal("-0.1231231321321321111").bigDecimal)
- testData.setField(32, -1)
- testData
- }
-
- def typeInfo = {
- new RowTypeInfo(
- Types.STRING,
- Types.BOOLEAN,
- Types.BYTE,
- Types.SHORT,
- Types.LONG,
- Types.FLOAT,
- Types.DOUBLE,
- Types.INT,
- Types.STRING,
- Types.BYTE,
- Types.SHORT,
- Types.LONG,
- Types.FLOAT,
- Types.DOUBLE,
- Types.INT,
- Types.DECIMAL,
- Types.SQL_DATE,
- Types.SQL_TIME,
- Types.SQL_TIMESTAMP,
- Types.INTERVAL_MILLIS,
- Types.INTERVAL_MONTHS,
- Types.BOOLEAN,
- Types.DECIMAL,
- Types.STRING,
- Types.STRING,
- Types.BYTE,
- Types.SHORT,
- Types.LONG,
- Types.FLOAT,
- Types.DOUBLE,
- Types.INT,
- Types.DECIMAL,
- Types.INT).asInstanceOf[TypeInformation[Any]]
-
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/f1fafc0e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/utils/ScalarOperatorsTestBase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/utils/ScalarOperatorsTestBase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/utils/ScalarOperatorsTestBase.scala
index 0a77d29..2d22843 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/utils/ScalarOperatorsTestBase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/utils/ScalarOperatorsTestBase.scala
@@ -26,7 +26,7 @@ import org.apache.flink.types.Row
class ScalarOperatorsTestBase extends ExpressionTestBase {
- def testData = {
+ def testData: Row = {
val testData = new Row(13)
testData.setField(0, 1: Byte)
testData.setField(1, 1: Short)
@@ -44,7 +44,7 @@ class ScalarOperatorsTestBase extends ExpressionTestBase {
testData
}
- def typeInfo = {
+ def typeInfo: TypeInformation[Any] = {
new RowTypeInfo(
Types.BYTE,
Types.SHORT,
@@ -65,5 +65,4 @@ class ScalarOperatorsTestBase extends ExpressionTestBase {
override def functions: Map[String, ScalarFunction] = Map(
"shouldNotExecuteFunc" -> ShouldNotExecuteFunc
)
-
}
http://git-wip-us.apache.org/repos/asf/flink/blob/f1fafc0e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/utils/ScalarTypesTestBase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/utils/ScalarTypesTestBase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/utils/ScalarTypesTestBase.scala
new file mode 100644
index 0000000..41fe52c
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/utils/ScalarTypesTestBase.scala
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.expressions.utils
+
+import java.sql.{Date, Time, Timestamp}
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.table.api.Types
+import org.apache.flink.types.Row
+
+class ScalarTypesTestBase extends ExpressionTestBase {
+
+ def testData: Row = {
+ val testData = new Row(34)
+ testData.setField(0, "This is a test String.")
+ testData.setField(1, true)
+ testData.setField(2, 42.toByte)
+ testData.setField(3, 43.toShort)
+ testData.setField(4, 44.toLong)
+ testData.setField(5, 4.5.toFloat)
+ testData.setField(6, 4.6)
+ testData.setField(7, 3)
+ testData.setField(8, " This is a test String. ")
+ testData.setField(9, -42.toByte)
+ testData.setField(10, -43.toShort)
+ testData.setField(11, -44.toLong)
+ testData.setField(12, -4.5.toFloat)
+ testData.setField(13, -4.6)
+ testData.setField(14, -3)
+ testData.setField(15, BigDecimal("-1231.1231231321321321111").bigDecimal)
+ testData.setField(16, Date.valueOf("1996-11-10"))
+ testData.setField(17, Time.valueOf("06:55:44"))
+ testData.setField(18, Timestamp.valueOf("1996-11-10 06:55:44.333"))
+ testData.setField(19, 1467012213000L) // +16979 07:23:33.000
+ testData.setField(20, 25) // +2-01
+ testData.setField(21, null)
+ testData.setField(22, BigDecimal("2").bigDecimal)
+ testData.setField(23, "%This is a test String.")
+ testData.setField(24, "*_This is a test String.")
+ testData.setField(25, 0.42.toByte)
+ testData.setField(26, 0.toShort)
+ testData.setField(27, 0.toLong)
+ testData.setField(28, 0.45.toFloat)
+ testData.setField(29, 0.46)
+ testData.setField(30, 1)
+ testData.setField(31, BigDecimal("-0.1231231321321321111").bigDecimal)
+ testData.setField(32, -1)
+ testData.setField(33, null)
+ testData
+ }
+
+ def typeInfo: TypeInformation[Any] = {
+ new RowTypeInfo(
+ Types.STRING,
+ Types.BOOLEAN,
+ Types.BYTE,
+ Types.SHORT,
+ Types.LONG,
+ Types.FLOAT,
+ Types.DOUBLE,
+ Types.INT,
+ Types.STRING,
+ Types.BYTE,
+ Types.SHORT,
+ Types.LONG,
+ Types.FLOAT,
+ Types.DOUBLE,
+ Types.INT,
+ Types.DECIMAL,
+ Types.SQL_DATE,
+ Types.SQL_TIME,
+ Types.SQL_TIMESTAMP,
+ Types.INTERVAL_MILLIS,
+ Types.INTERVAL_MONTHS,
+ Types.BOOLEAN,
+ Types.DECIMAL,
+ Types.STRING,
+ Types.STRING,
+ Types.BYTE,
+ Types.SHORT,
+ Types.LONG,
+ Types.FLOAT,
+ Types.DOUBLE,
+ Types.INT,
+ Types.DECIMAL,
+ Types.INT,
+ Types.STRING).asInstanceOf[TypeInformation[Any]]
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/f1fafc0e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/utils/UserDefinedScalarFunctions.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/utils/UserDefinedScalarFunctions.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/utils/UserDefinedScalarFunctions.scala
deleted file mode 100644
index 5285569..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/utils/UserDefinedScalarFunctions.scala
+++ /dev/null
@@ -1,270 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.expressions.utils
-
-import java.sql.{Date, Time, Timestamp}
-
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.table.api.Types
-import org.apache.flink.table.functions.{ScalarFunction, FunctionContext}
-import org.junit.Assert
-
-import scala.annotation.varargs
-import scala.collection.mutable
-import scala.io.Source
-
-case class SimplePojo(name: String, age: Int)
-
-object Func0 extends ScalarFunction {
- def eval(index: Int): Int = {
- index
- }
-}
-
-object Func1 extends ScalarFunction {
- def eval(index: Integer): Integer = {
- index + 1
- }
-}
-
-object Func2 extends ScalarFunction {
- def eval(index: Integer, str: String, pojo: SimplePojo): String = {
- s"$index and $str and $pojo"
- }
-}
-
-object Func3 extends ScalarFunction {
- def eval(index: Integer, str: String): String = {
- s"$index and $str"
- }
-}
-
-object Func4 extends ScalarFunction {
- def eval(): Integer = {
- null
- }
-}
-
-object Func5 extends ScalarFunction {
- def eval(): Int = {
- -1
- }
-}
-
-object Func6 extends ScalarFunction {
- def eval(date: Date, time: Time, timestamp: Timestamp): (Date, Time, Timestamp) = {
- (date, time, timestamp)
- }
-}
-
-object Func7 extends ScalarFunction {
- def eval(a: Integer, b: Integer): Integer = {
- a + b
- }
-}
-
-object Func8 extends ScalarFunction {
- def eval(a: Int): String = {
- "a"
- }
-
- def eval(a: Int, b: Int): String = {
- "b"
- }
-
- def eval(a: String, b: String): String = {
- "c"
- }
-}
-
-object Func9 extends ScalarFunction {
- def eval(a: Int, b: Int, c: Long): String = {
- s"$a and $b and $c"
- }
-}
-
-object Func10 extends ScalarFunction {
- def eval(c: Long): Long = {
- c
- }
-
- override def getResultType(signature: Array[Class[_]]): TypeInformation[_] = {
- Types.SQL_TIMESTAMP
- }
-}
-
-object Func11 extends ScalarFunction {
- def eval(a: Int, b: Long): String = {
- s"$a and $b"
- }
-}
-
-object Func12 extends ScalarFunction {
- def eval(a: Long): Long = {
- a
- }
-
- override def getResultType(signature: Array[Class[_]]): TypeInformation[_] = {
- Types.INTERVAL_MILLIS
- }
-}
-
-object ShouldNotExecuteFunc extends ScalarFunction {
- def eval(s: String): Boolean = {
- throw new Exception("This func should never be executed")
- }
-}
-
-class RichFunc0 extends ScalarFunction {
- var openCalled = false
- var closeCalled = false
-
- override def open(context: FunctionContext): Unit = {
- super.open(context)
- if (openCalled) {
- Assert.fail("Open called more than once.")
- } else {
- openCalled = true
- }
- if (closeCalled) {
- Assert.fail("Close called before open.")
- }
- }
-
- def eval(index: Int): Int = {
- if (!openCalled) {
- Assert.fail("Open was not called before eval.")
- }
- if (closeCalled) {
- Assert.fail("Close called before eval.")
- }
-
- index + 1
- }
-
- override def close(): Unit = {
- super.close()
- if (closeCalled) {
- Assert.fail("Close called more than once.")
- } else {
- closeCalled = true
- }
- if (!openCalled) {
- Assert.fail("Open was not called before close.")
- }
- }
-}
-
-class RichFunc1 extends ScalarFunction {
- var added = Int.MaxValue
-
- override def open(context: FunctionContext): Unit = {
- added = context.getJobParameter("int.value", "0").toInt
- }
-
- def eval(index: Int): Int = {
- index + added
- }
-
- override def close(): Unit = {
- added = Int.MaxValue
- }
-}
-
-class RichFunc2 extends ScalarFunction {
- var prefix = "ERROR_VALUE"
-
- override def open(context: FunctionContext): Unit = {
- prefix = context.getJobParameter("string.value", "")
- }
-
- def eval(value: String): String = {
- prefix + "#" + value
- }
-
- override def close(): Unit = {
- prefix = "ERROR_VALUE"
- }
-}
-
-class RichFunc3 extends ScalarFunction {
- private val words = mutable.HashSet[String]()
-
- override def open(context: FunctionContext): Unit = {
- val file = context.getCachedFile("words")
- for (line <- Source.fromFile(file.getCanonicalPath).getLines) {
- words.add(line.trim)
- }
- }
-
- def eval(value: String): Boolean = {
- words.contains(value)
- }
-
- override def close(): Unit = {
- words.clear()
- }
-}
-
-class Func13(prefix: String) extends ScalarFunction {
- def eval(a: String): String = {
- s"$prefix-$a"
- }
-}
-
-object Func14 extends ScalarFunction {
-
- @varargs
- def eval(a: Int*): Int = {
- a.sum
- }
-}
-
-object Func15 extends ScalarFunction {
-
- @varargs
- def eval(a: String, b: Int*): String = {
- a + b.length
- }
-
- def eval(a: String): String = {
- a
- }
-}
-
-object Func16 extends ScalarFunction {
-
- def eval(a: Seq[String]): String = {
- a.mkString(", ")
- }
-}
-
-object Func17 extends ScalarFunction {
-
- // Without @varargs, we will throw an exception
- def eval(a: String*): String = {
- a.mkString(", ")
- }
-}
-
-object Func18 extends ScalarFunction {
- def eval(str: String, prefix: String): Boolean = {
- str.startsWith(prefix)
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/f1fafc0e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/utils/userDefinedScalarFunctions.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/utils/userDefinedScalarFunctions.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/utils/userDefinedScalarFunctions.scala
new file mode 100644
index 0000000..5285569
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/utils/userDefinedScalarFunctions.scala
@@ -0,0 +1,270 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.expressions.utils
+
+import java.sql.{Date, Time, Timestamp}
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.table.api.Types
+import org.apache.flink.table.functions.{ScalarFunction, FunctionContext}
+import org.junit.Assert
+
+import scala.annotation.varargs
+import scala.collection.mutable
+import scala.io.Source
+
+case class SimplePojo(name: String, age: Int)
+
+object Func0 extends ScalarFunction {
+ def eval(index: Int): Int = {
+ index
+ }
+}
+
+object Func1 extends ScalarFunction {
+ def eval(index: Integer): Integer = {
+ index + 1
+ }
+}
+
+object Func2 extends ScalarFunction {
+ def eval(index: Integer, str: String, pojo: SimplePojo): String = {
+ s"$index and $str and $pojo"
+ }
+}
+
+object Func3 extends ScalarFunction {
+ def eval(index: Integer, str: String): String = {
+ s"$index and $str"
+ }
+}
+
+object Func4 extends ScalarFunction {
+ def eval(): Integer = {
+ null
+ }
+}
+
+object Func5 extends ScalarFunction {
+ def eval(): Int = {
+ -1
+ }
+}
+
+object Func6 extends ScalarFunction {
+ def eval(date: Date, time: Time, timestamp: Timestamp): (Date, Time, Timestamp) = {
+ (date, time, timestamp)
+ }
+}
+
+object Func7 extends ScalarFunction {
+ def eval(a: Integer, b: Integer): Integer = {
+ a + b
+ }
+}
+
+object Func8 extends ScalarFunction {
+ def eval(a: Int): String = {
+ "a"
+ }
+
+ def eval(a: Int, b: Int): String = {
+ "b"
+ }
+
+ def eval(a: String, b: String): String = {
+ "c"
+ }
+}
+
+object Func9 extends ScalarFunction {
+ def eval(a: Int, b: Int, c: Long): String = {
+ s"$a and $b and $c"
+ }
+}
+
+object Func10 extends ScalarFunction {
+ def eval(c: Long): Long = {
+ c
+ }
+
+ override def getResultType(signature: Array[Class[_]]): TypeInformation[_] = {
+ Types.SQL_TIMESTAMP
+ }
+}
+
+object Func11 extends ScalarFunction {
+ def eval(a: Int, b: Long): String = {
+ s"$a and $b"
+ }
+}
+
+object Func12 extends ScalarFunction {
+ def eval(a: Long): Long = {
+ a
+ }
+
+ override def getResultType(signature: Array[Class[_]]): TypeInformation[_] = {
+ Types.INTERVAL_MILLIS
+ }
+}
+
+object ShouldNotExecuteFunc extends ScalarFunction {
+ def eval(s: String): Boolean = {
+ throw new Exception("This func should never be executed")
+ }
+}
+
+class RichFunc0 extends ScalarFunction {
+ var openCalled = false
+ var closeCalled = false
+
+ override def open(context: FunctionContext): Unit = {
+ super.open(context)
+ if (openCalled) {
+ Assert.fail("Open called more than once.")
+ } else {
+ openCalled = true
+ }
+ if (closeCalled) {
+ Assert.fail("Close called before open.")
+ }
+ }
+
+ def eval(index: Int): Int = {
+ if (!openCalled) {
+ Assert.fail("Open was not called before eval.")
+ }
+ if (closeCalled) {
+ Assert.fail("Close called before eval.")
+ }
+
+ index + 1
+ }
+
+ override def close(): Unit = {
+ super.close()
+ if (closeCalled) {
+ Assert.fail("Close called more than once.")
+ } else {
+ closeCalled = true
+ }
+ if (!openCalled) {
+ Assert.fail("Open was not called before close.")
+ }
+ }
+}
+
+class RichFunc1 extends ScalarFunction {
+ var added = Int.MaxValue
+
+ override def open(context: FunctionContext): Unit = {
+ added = context.getJobParameter("int.value", "0").toInt
+ }
+
+ def eval(index: Int): Int = {
+ index + added
+ }
+
+ override def close(): Unit = {
+ added = Int.MaxValue
+ }
+}
+
+class RichFunc2 extends ScalarFunction {
+ var prefix = "ERROR_VALUE"
+
+ override def open(context: FunctionContext): Unit = {
+ prefix = context.getJobParameter("string.value", "")
+ }
+
+ def eval(value: String): String = {
+ prefix + "#" + value
+ }
+
+ override def close(): Unit = {
+ prefix = "ERROR_VALUE"
+ }
+}
+
+class RichFunc3 extends ScalarFunction {
+ private val words = mutable.HashSet[String]()
+
+ override def open(context: FunctionContext): Unit = {
+ val file = context.getCachedFile("words")
+ for (line <- Source.fromFile(file.getCanonicalPath).getLines) {
+ words.add(line.trim)
+ }
+ }
+
+ def eval(value: String): Boolean = {
+ words.contains(value)
+ }
+
+ override def close(): Unit = {
+ words.clear()
+ }
+}
+
+class Func13(prefix: String) extends ScalarFunction {
+ def eval(a: String): String = {
+ s"$prefix-$a"
+ }
+}
+
+object Func14 extends ScalarFunction {
+
+ @varargs
+ def eval(a: Int*): Int = {
+ a.sum
+ }
+}
+
+object Func15 extends ScalarFunction {
+
+ @varargs
+ def eval(a: String, b: Int*): String = {
+ a + b.length
+ }
+
+ def eval(a: String): String = {
+ a
+ }
+}
+
+object Func16 extends ScalarFunction {
+
+ def eval(a: Seq[String]): String = {
+ a.mkString(", ")
+ }
+}
+
+object Func17 extends ScalarFunction {
+
+ // Without @varargs, we will throw an exception
+ def eval(a: String*): String = {
+ a.mkString(", ")
+ }
+}
+
+object Func18 extends ScalarFunction {
+ def eval(str: String, prefix: String): Boolean = {
+ str.startsWith(prefix)
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/f1fafc0e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/validation/CompositeAccessValidationTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/validation/CompositeAccessValidationTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/validation/CompositeAccessValidationTest.scala
index 5542533..829fe65 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/validation/CompositeAccessValidationTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/validation/CompositeAccessValidationTest.scala
@@ -20,10 +20,10 @@ package org.apache.flink.table.expressions.validation
import org.apache.flink.table.api.ValidationException
import org.apache.flink.table.api.scala._
-import org.apache.flink.table.expressions.utils.CompositeAccessTestBase
+import org.apache.flink.table.expressions.utils.CompositeTypeTestBase
import org.junit.Test
-class CompositeAccessValidationTest extends CompositeAccessTestBase {
+class CompositeAccessValidationTest extends CompositeTypeTestBase {
@Test(expected = classOf[ValidationException])
def testWrongSqlFieldFull(): Unit = {
@@ -54,7 +54,6 @@ class CompositeAccessValidationTest extends CompositeAccessTestBase {
def testWrongStringKeyField2(): Unit = {
testTableApi("fail", "f0.get('fghj')", "fail")
}
-
}
http://git-wip-us.apache.org/repos/asf/flink/blob/f1fafc0e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/validation/ScalarFunctionsValidationTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/validation/ScalarFunctionsValidationTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/validation/ScalarFunctionsValidationTest.scala
index 819dbbce..deb4518 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/validation/ScalarFunctionsValidationTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/validation/ScalarFunctionsValidationTest.scala
@@ -20,10 +20,10 @@ package org.apache.flink.table.expressions.validation
import org.apache.flink.table.api.ValidationException
import org.apache.flink.table.api.scala._
-import org.apache.flink.table.expressions.utils.ScalarFunctionsTestBase
+import org.apache.flink.table.expressions.utils.ScalarTypesTestBase
import org.junit.Test
-class ScalarFunctionsValidationTest extends ScalarFunctionsTestBase {
+class ScalarFunctionsValidationTest extends ScalarTypesTestBase {
// ----------------------------------------------------------------------------------------------
// String functions
@@ -40,5 +40,4 @@ class ScalarFunctionsValidationTest extends ScalarFunctionsTestBase {
// Must fail. Parameter of substring must be an Integer not a String.
testTableApi("test".substring("test".toExpr), "FAIL", "FAIL")
}
-
}
http://git-wip-us.apache.org/repos/asf/flink/blob/f1fafc0e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/validation/ScalarOperatorsValidationTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/validation/ScalarOperatorsValidationTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/validation/ScalarOperatorsValidationTest.scala
index 68c0b3f..9dcc990 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/validation/ScalarOperatorsValidationTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/validation/ScalarOperatorsValidationTest.scala
@@ -44,5 +44,4 @@ class ScalarOperatorsValidationTest extends ScalarOperatorsTestBase {
def testInvalidStringComparison2(): Unit = {
testTableApi("w" > 4.toExpr, "FAIL", "FAIL")
}
-
}
http://git-wip-us.apache.org/repos/asf/flink/blob/f1fafc0e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/functions/aggfunctions/AggFunctionTestBase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/functions/aggfunctions/AggFunctionTestBase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/functions/aggfunctions/AggFunctionTestBase.scala
deleted file mode 100644
index 39b9ec3..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/functions/aggfunctions/AggFunctionTestBase.scala
+++ /dev/null
@@ -1,156 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.table.functions.aggfunctions
-
-import java.lang.reflect.Method
-import java.math.BigDecimal
-import java.util.{ArrayList => JArrayList, List => JList}
-import org.apache.flink.table.functions.AggregateFunction
-import org.apache.flink.table.functions.utils.UserDefinedFunctionUtils._
-import org.junit.Assert.assertEquals
-import org.junit.Test
-
-/**
- * Base class for aggregate function test
- *
- * @tparam T the type for the aggregation result
- */
-abstract class AggFunctionTestBase[T, ACC] {
- def inputValueSets: Seq[Seq[_]]
-
- def expectedResults: Seq[T]
-
- def aggregator: AggregateFunction[T, ACC]
-
- val accType = aggregator.getClass.getMethod("createAccumulator").getReturnType
-
- def accumulateFunc: Method = aggregator.getClass.getMethod("accumulate", accType, classOf[Any])
-
- def retractFunc: Method = null
-
- @Test
- // test aggregate and retract functions without partial merge
- def testAccumulateAndRetractWithoutMerge(): Unit = {
- // iterate over input sets
- for ((vals, expected) <- inputValueSets.zip(expectedResults)) {
- val accumulator = accumulateVals(vals)
- val result = aggregator.getValue(accumulator)
- validateResult[T](expected, result)
-
- if (ifMethodExistInFunction("retract", aggregator)) {
- retractVals(accumulator, vals)
- val expectedAccum = aggregator.createAccumulator()
- //The two accumulators should be exactly same
- validateResult[ACC](expectedAccum, accumulator)
- }
- }
- }
-
- @Test
- def testAggregateWithMerge(): Unit = {
-
- if (ifMethodExistInFunction("merge", aggregator)) {
- val mergeFunc =
- aggregator.getClass.getMethod("merge", accType, classOf[java.lang.Iterable[ACC]])
- // iterate over input sets
- for ((vals, expected) <- inputValueSets.zip(expectedResults)) {
- //equally split the vals sequence into two sequences
- val (firstVals, secondVals) = vals.splitAt(vals.length / 2)
-
- //1. verify merge with accumulate
- val accumulators: JList[ACC] = new JArrayList[ACC]()
- accumulators.add(accumulateVals(secondVals))
-
- val acc = accumulateVals(firstVals)
-
- mergeFunc.invoke(aggregator, acc.asInstanceOf[Object], accumulators)
- val result = aggregator.getValue(acc)
- validateResult[T](expected, result)
-
- //2. verify merge with accumulate & retract
- if (ifMethodExistInFunction("retract", aggregator)) {
- retractVals(acc, vals)
- val expectedAccum = aggregator.createAccumulator()
- //The two accumulators should be exactly same
- validateResult[ACC](expectedAccum, acc)
- }
- }
-
- // iterate over input sets
- for ((vals, expected) <- inputValueSets.zip(expectedResults)) {
- //3. test partial merge with an empty accumulator
- val accumulators: JList[ACC] = new JArrayList[ACC]()
- accumulators.add(aggregator.createAccumulator())
-
- val acc = accumulateVals(vals)
-
- mergeFunc.invoke(aggregator, acc.asInstanceOf[Object], accumulators)
- val result = aggregator.getValue(acc)
- validateResult[T](expected, result)
- }
- }
- }
-
- @Test
- // test aggregate functions with resetAccumulator
- def testResetAccumulator(): Unit = {
-
- if (ifMethodExistInFunction("resetAccumulator", aggregator)) {
- val resetAccFunc = aggregator.getClass.getMethod("resetAccumulator", accType)
- // iterate over input sets
- for ((vals, expected) <- inputValueSets.zip(expectedResults)) {
- val accumulator = accumulateVals(vals)
- resetAccFunc.invoke(aggregator, accumulator.asInstanceOf[Object])
- val expectedAccum = aggregator.createAccumulator()
- //The accumulator after reset should be exactly same as the new accumulator
- validateResult[ACC](expectedAccum, accumulator)
- }
- }
- }
-
- private def validateResult[T](expected: T, result: T): Unit = {
- (expected, result) match {
- case (e: DecimalSumWithRetractAccumulator, r: DecimalSumWithRetractAccumulator) =>
- // BigDecimal.equals() value and scale but we are only interested in value.
- assert(e.f0.compareTo(r.f0) == 0 && e.f1 == r.f1)
- case (e: DecimalAvgAccumulator, r: DecimalAvgAccumulator) =>
- // BigDecimal.equals() value and scale but we are only interested in value.
- assert(e.f0.compareTo(r.f0) == 0 && e.f1 == r.f1)
- case (e: BigDecimal, r: BigDecimal) =>
- // BigDecimal.equals() value and scale but we are only interested in value.
- assert(e.compareTo(r) == 0)
- case _ =>
- assertEquals(expected, result)
- }
- }
-
- private def accumulateVals(vals: Seq[_]): ACC = {
- val accumulator = aggregator.createAccumulator()
- vals.foreach(
- v =>
- accumulateFunc.invoke(aggregator, accumulator.asInstanceOf[Object], v.asInstanceOf[Object])
- )
- accumulator
- }
-
- private def retractVals(accumulator:ACC, vals: Seq[_]) = {
- vals.foreach(
- v => retractFunc.invoke(aggregator, accumulator.asInstanceOf[Object], v.asInstanceOf[Object])
- )
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/f1fafc0e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/functions/aggfunctions/AvgFunctionTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/functions/aggfunctions/AvgFunctionTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/functions/aggfunctions/AvgFunctionTest.scala
deleted file mode 100644
index fd510ff..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/functions/aggfunctions/AvgFunctionTest.scala
+++ /dev/null
@@ -1,191 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.table.functions.aggfunctions
-
-import java.math.BigDecimal
-import org.apache.flink.table.functions.AggregateFunction
-
-/**
- * Test case for built-in average aggregate function
- *
- * @tparam T the type for the aggregation result
- */
-abstract class AvgAggFunctionTestBase[T: Numeric, ACC] extends AggFunctionTestBase[T, ACC] {
-
- private val numeric: Numeric[T] = implicitly[Numeric[T]]
-
- def minVal: T
-
- def maxVal: T
-
- override def inputValueSets: Seq[Seq[T]] = Seq(
- Seq(
- minVal,
- minVal,
- null.asInstanceOf[T],
- minVal,
- minVal,
- null.asInstanceOf[T],
- minVal,
- minVal,
- minVal
- ),
- Seq(
- maxVal,
- maxVal,
- null.asInstanceOf[T],
- maxVal,
- maxVal,
- null.asInstanceOf[T],
- maxVal,
- maxVal,
- maxVal
- ),
- Seq(
- minVal,
- maxVal,
- null.asInstanceOf[T],
- numeric.fromInt(0),
- numeric.negate(maxVal),
- numeric.negate(minVal),
- null.asInstanceOf[T]
- ),
- Seq(
- numeric.fromInt(1),
- numeric.fromInt(2),
- null.asInstanceOf[T],
- numeric.fromInt(3),
- numeric.fromInt(4),
- numeric.fromInt(5),
- null.asInstanceOf[T]
- ),
- Seq(
- null.asInstanceOf[T],
- null.asInstanceOf[T],
- null.asInstanceOf[T],
- null.asInstanceOf[T],
- null.asInstanceOf[T],
- null.asInstanceOf[T]
- )
- )
-
- override def expectedResults: Seq[T] = Seq(
- minVal,
- maxVal,
- numeric.fromInt(0),
- numeric.fromInt(3),
- null.asInstanceOf[T]
- )
-
- override def retractFunc = aggregator.getClass.getMethod("retract", accType, classOf[Any])
-}
-
-class ByteAvgAggFunctionTest extends AvgAggFunctionTestBase[Byte, IntegralAvgAccumulator] {
-
- override def minVal = (Byte.MinValue + 1).toByte
-
- override def maxVal = (Byte.MaxValue - 1).toByte
-
- override def aggregator = new ByteAvgAggFunction()
-}
-
-class ShortAvgAggFunctionTest extends AvgAggFunctionTestBase[Short, IntegralAvgAccumulator] {
-
- override def minVal = (Short.MinValue + 1).toShort
-
- override def maxVal = (Short.MaxValue - 1).toShort
-
- override def aggregator = new ShortAvgAggFunction()
-}
-
-class IntAvgAggFunctionTest extends AvgAggFunctionTestBase[Int, IntegralAvgAccumulator] {
-
- override def minVal = Int.MinValue + 1
-
- override def maxVal = Int.MaxValue - 1
-
- override def aggregator = new IntAvgAggFunction()
-}
-
-class LongAvgAggFunctionTest extends AvgAggFunctionTestBase[Long, BigIntegralAvgAccumulator] {
-
- override def minVal = Long.MinValue + 1
-
- override def maxVal = Long.MaxValue - 1
-
- override def aggregator = new LongAvgAggFunction()
-}
-
-class FloatAvgAggFunctionTest extends AvgAggFunctionTestBase[Float, FloatingAvgAccumulator] {
-
- override def minVal = Float.MinValue
-
- override def maxVal = Float.MaxValue
-
- override def aggregator = new FloatAvgAggFunction()
-}
-
-class DoubleAvgAggFunctionTest extends AvgAggFunctionTestBase[Double, FloatingAvgAccumulator] {
-
- override def minVal = Float.MinValue
-
- override def maxVal = Float.MaxValue
-
- override def aggregator = new DoubleAvgAggFunction()
-}
-
-class DecimalAvgAggFunctionTest extends AggFunctionTestBase[BigDecimal, DecimalAvgAccumulator] {
-
- override def inputValueSets: Seq[Seq[_]] = Seq(
- Seq(
- new BigDecimal("987654321000000"),
- new BigDecimal("-0.000000000012345"),
- null,
- new BigDecimal("0.000000000012345"),
- new BigDecimal("-987654321000000"),
- null,
- new BigDecimal("0")
- ),
- Seq(
- new BigDecimal("987654321000000"),
- new BigDecimal("-0.000000000012345"),
- null,
- new BigDecimal("0.000000000012345"),
- new BigDecimal("-987654321000000"),
- null,
- new BigDecimal("5")
- ),
- Seq(
- null,
- null,
- null,
- null
- )
- )
-
- override def expectedResults: Seq[BigDecimal] = Seq(
- BigDecimal.ZERO,
- BigDecimal.ONE,
- null
- )
-
- override def aggregator: AggregateFunction[BigDecimal, DecimalAvgAccumulator] =
- new DecimalAvgAggFunction()
-
- override def retractFunc = aggregator.getClass.getMethod("retract", accType, classOf[Any])
-}