You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tw...@apache.org on 2017/07/13 10:18:27 UTC
[18/44] flink git commit: [FLINK-6617] [table] Restructuring of tests
http://git-wip-us.apache.org/repos/asf/flink/blob/f1fafc0e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/utils/TableProgramsTestBase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/utils/TableProgramsTestBase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/utils/TableProgramsTestBase.scala
new file mode 100644
index 0000000..e2d0473
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/utils/TableProgramsTestBase.scala
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.utils
+
+import org.apache.flink.table.api.TableConfig
+import org.apache.flink.table.runtime.utils.TableProgramsTestBase.TableConfigMode
+import org.apache.flink.test.util.MultipleProgramsTestBase
+import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
+
+class TableProgramsTestBase(
+ mode: TestExecutionMode,
+ tableConfigMode: TableConfigMode)
+ extends MultipleProgramsTestBase(mode) {
+
+ def config: TableConfig = { // builds a fresh TableConfig for the selected TableConfigMode
+ val tableConfig = new TableConfig
+ // NO_NULL is the only mode that changes a setting on the fresh config
+ if (tableConfigMode == TableProgramsTestBase.NO_NULL) {
+ tableConfig.setNullCheck(false)
+ }
+ // any other mode: keep default
+ tableConfig
+ }
+}
+
+object TableProgramsTestBase {
+ case class TableConfigMode(nullCheck: Boolean)
+
+ val DEFAULT = TableConfigMode(nullCheck = true)
+ val NO_NULL = TableConfigMode(nullCheck = false)
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/f1fafc0e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/utils/TimeTestUtil.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/utils/TimeTestUtil.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/utils/TimeTestUtil.scala
new file mode 100644
index 0000000..fab1e4a
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/utils/TimeTestUtil.scala
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.utils
+
+import org.apache.flink.streaming.api.functions.source.SourceFunction
+import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext
+import org.apache.flink.streaming.api.watermark.Watermark
+
+object TimeTestUtil {
+
+ class EventTimeSourceFunction[T](
+ dataWithTimestampList: Seq[Either[(Long, T), Long]]) extends SourceFunction[T] {
+ override def run(ctx: SourceContext[T]): Unit = {
+ dataWithTimestampList.foreach {
+ case Left(t) => ctx.collectWithTimestamp(t._2, t._1)
+ case Right(w) => ctx.emitWatermark(new Watermark(w))
+ }
+ }
+
+ override def cancel(): Unit = () // no-op: run() only replays a finite list and then returns
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/f1fafc0e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/utils/UserDefinedFunctionTestUtils.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/utils/UserDefinedFunctionTestUtils.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/utils/UserDefinedFunctionTestUtils.scala
new file mode 100644
index 0000000..176afcd
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/utils/UserDefinedFunctionTestUtils.scala
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.utils
+
+import java.io.File
+
+import com.google.common.base.Charsets
+import com.google.common.io.Files
+import org.apache.flink.api.scala.ExecutionEnvironment
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
+
+object UserDefinedFunctionTestUtils {
+
+ def setJobParameters(env: ExecutionEnvironment, parameters: Map[String, String]): Unit = {
+ val jobParameters = new Configuration() // receives every entry of `parameters`
+ for ((key, value) <- parameters) {
+ jobParameters.setString(key, value)
+ }
+ env.getConfig.setGlobalJobParameters(jobParameters) // set as the env's global job parameters
+ }
+
+ def setJobParameters(env: StreamExecutionEnvironment, parameters: Map[String, String]): Unit = {
+ val jobParameters = new Configuration() // receives every entry of `parameters`
+ parameters.foreach { entry =>
+ jobParameters.setString(entry._1, entry._2)
+ }
+ env.getConfig.setGlobalJobParameters(jobParameters) // set as the env's global job parameters
+ }
+
+ def writeCacheFile(fileName: String, contents: String): String = { // returns the file's path
+ val tempFile = File.createTempFile(this.getClass.getName + "-" + fileName, ".tmp") // dotted suffix
+ tempFile.deleteOnExit() // requests deletion of the file when the JVM terminates
+ Files.write(contents, tempFile, Charsets.UTF_8) // contents are written UTF-8 encoded
+ tempFile.getAbsolutePath
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/f1fafc0e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/sinks/validation/TableSinksValidationTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/sinks/validation/TableSinksValidationTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/sinks/validation/TableSinksValidationTest.scala
deleted file mode 100644
index c800289..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/sinks/validation/TableSinksValidationTest.scala
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.table.sinks.validation
-
-import org.apache.flink.table.api.scala._
-import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
-import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase
-import org.apache.flink.table.api.{TableEnvironment, TableException}
-import org.apache.flink.table.api.scala.stream.utils.StreamTestData
-import org.apache.flink.table.runtime.datastream.table.TestAppendSink
-import org.junit.Test
-
-class TableSinksValidationTest extends StreamingMultipleProgramsTestBase {
-
- @Test(expected = classOf[TableException])
- def testAppendSinkOnUpdatingTable(): Unit = {
- val env = StreamExecutionEnvironment.getExecutionEnvironment
- val tEnv = TableEnvironment.getTableEnvironment(env)
-
- val t = StreamTestData.get3TupleDataStream(env).toTable(tEnv, 'id, 'num, 'text)
-
- t.groupBy('text)
- .select('text, 'id.count, 'num.sum)
- .writeToSink(new TestAppendSink)
-
- // must fail because table is not append-only
- env.execute()
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/f1fafc0e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/sources/TableSourceTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/sources/TableSourceTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/sources/TableSourceTest.scala
deleted file mode 100644
index b01453d..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/sources/TableSourceTest.scala
+++ /dev/null
@@ -1,84 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.sources
-
-import org.apache.flink.table.api.Types
-import org.apache.flink.table.utils.TableTestUtil._
-import org.junit.{Assert, Test}
-
-class TableSourceTest extends TableSourceTestBase {
-
- @Test
- def testTableSourceScanToString(): Unit = {
- val (tableSource1, _) = filterableTableSource
- val (tableSource2, _) = filterableTableSource
- val util = batchTestUtil()
- val tEnv = util.tableEnv
-
- tEnv.registerTableSource("table1", tableSource1)
- tEnv.registerTableSource("table2", tableSource2)
-
- val table1 = tEnv.scan("table1").where("amount > 2")
- val table2 = tEnv.scan("table2").where("amount > 2")
- val result = table1.unionAll(table2)
-
- val expected = binaryNode(
- "DataSetUnion",
- batchFilterableSourceTableNode(
- "table1",
- Array("name", "id", "amount", "price"),
- "'amount > 2"),
- batchFilterableSourceTableNode(
- "table2",
- Array("name", "id", "amount", "price"),
- "'amount > 2"),
- term("union", "name, id, amount, price")
- )
- util.verifyTable(result, expected)
- }
-
- @Test
- def testCsvTableSourceBuilder(): Unit = {
- val source1 = CsvTableSource.builder()
- .path("/path/to/csv")
- .field("myfield", Types.STRING)
- .field("myfield2", Types.INT)
- .quoteCharacter(';')
- .fieldDelimiter("#")
- .lineDelimiter("\r\n")
- .commentPrefix("%%")
- .ignoreFirstLine()
- .ignoreParseErrors()
- .build()
-
- val source2 = new CsvTableSource(
- "/path/to/csv",
- Array("myfield", "myfield2"),
- Array(Types.STRING, Types.INT),
- "#",
- "\r\n",
- ';',
- true,
- "%%",
- true)
-
- Assert.assertEquals(source1, source2)
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/f1fafc0e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/sources/TableSourceTestBase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/sources/TableSourceTestBase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/sources/TableSourceTestBase.scala
deleted file mode 100644
index d408694..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/sources/TableSourceTestBase.scala
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.table.sources
-
-import org.apache.flink.table.utils.{CommonTestData, TableTestBase, TestFilterableTableSource}
-
-class TableSourceTestBase extends TableTestBase {
-
- protected val projectedFields: Array[String] = Array("last", "id", "score")
- protected val noCalcFields: Array[String] = Array("id", "score", "first")
-
- def filterableTableSource:(TableSource[_], String) = {
- val tableSource = new TestFilterableTableSource
- (tableSource, "filterableTable")
- }
-
- def csvTable: (CsvTableSource, String) = {
- val csvTable = CommonTestData.getCsvTableSource
- val tableName = "csvTable"
- (csvTable, tableName)
- }
-
- def batchSourceTableNode(sourceName: String, fields: Array[String]): String = {
- s"BatchTableSourceScan(table=[[$sourceName]], fields=[${fields.mkString(", ")}])"
- }
-
- def streamSourceTableNode(sourceName: String, fields: Array[String] ): String = {
- s"StreamTableSourceScan(table=[[$sourceName]], fields=[${fields.mkString(", ")}])"
- }
-
- def batchFilterableSourceTableNode(
- sourceName: String,
- fields: Array[String],
- exp: String): String = {
- "BatchTableSourceScan(" +
- s"table=[[$sourceName]], fields=[${fields.mkString(", ")}], source=[filter=[$exp]])"
- }
-
- def streamFilterableSourceTableNode(
- sourceName: String,
- fields: Array[String],
- exp: String): String = {
- "StreamTableSourceScan(" +
- s"table=[[$sourceName]], fields=[${fields.mkString(", ")}], source=[filter=[$exp]])"
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/f1fafc0e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/sources/batch/TableSourceTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/sources/batch/TableSourceTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/sources/batch/TableSourceTest.scala
deleted file mode 100644
index f670452..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/sources/batch/TableSourceTest.scala
+++ /dev/null
@@ -1,421 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.sources.batch
-
-import org.apache.flink.table.api.Types
-import org.apache.flink.table.api.scala._
-import org.apache.flink.table.expressions.utils._
-import org.apache.flink.table.sources.{CsvTableSource, TableSource}
-import org.apache.flink.table.utils.TableTestUtil._
-import org.apache.flink.table.utils.{CommonTestData, TableTestBase, TestFilterableTableSource}
-import org.junit.{Assert, Test}
-
-class TableSourceTest extends TableTestBase {
-
- private val projectedFields: Array[String] = Array("last", "id", "score")
- private val noCalcFields: Array[String] = Array("id", "score", "first")
-
- @Test
- def testTableSourceScanToString(): Unit = {
- val (tableSource1, _) = filterableTableSource
- val (tableSource2, _) = filterableTableSource
- val util = batchTestUtil()
- val tEnv = util.tableEnv
-
- tEnv.registerTableSource("table1", tableSource1)
- tEnv.registerTableSource("table2", tableSource2)
-
- val table1 = tEnv.scan("table1").where("amount > 2")
- val table2 = tEnv.scan("table2").where("amount > 2")
- val result = table1.unionAll(table2)
-
- val expected = binaryNode(
- "DataSetUnion",
- batchFilterableSourceTableNode(
- "table1",
- Array("name", "id", "amount", "price"),
- "'amount > 2"),
- batchFilterableSourceTableNode(
- "table2",
- Array("name", "id", "amount", "price"),
- "'amount > 2"),
- term("union", "name, id, amount, price")
- )
- util.verifyTable(result, expected)
- }
-
- // batch plan
-
- @Test
- def testBatchProjectableSourceScanPlanTableApi(): Unit = {
- val (tableSource, tableName) = csvTable
- val util = batchTestUtil()
- val tEnv = util.tableEnv
-
- tEnv.registerTableSource(tableName, tableSource)
-
- val result = tEnv
- .scan(tableName)
- .select('last.upperCase(), 'id.floor(), 'score * 2)
-
- val expected = unaryNode(
- "DataSetCalc",
- batchSourceTableNode(tableName, projectedFields),
- term("select", "UPPER(last) AS _c0", "FLOOR(id) AS _c1", "*(score, 2) AS _c2")
- )
-
- util.verifyTable(result, expected)
- }
-
- @Test
- def testBatchProjectableSourceScanPlanSQL(): Unit = {
- val (tableSource, tableName) = csvTable
- val util = batchTestUtil()
-
- util.tableEnv.registerTableSource(tableName, tableSource)
-
- val sqlQuery = s"SELECT `last`, floor(id), score * 2 FROM $tableName"
-
- val expected = unaryNode(
- "DataSetCalc",
- batchSourceTableNode(tableName, projectedFields),
- term("select", "last", "FLOOR(id) AS EXPR$1", "*(score, 2) AS EXPR$2")
- )
-
- util.verifySql(sqlQuery, expected)
- }
-
- @Test
- def testBatchProjectableSourceScanNoIdentityCalc(): Unit = {
- val (tableSource, tableName) = csvTable
- val util = batchTestUtil()
- val tEnv = util.tableEnv
-
- tEnv.registerTableSource(tableName, tableSource)
-
- val result = tEnv
- .scan(tableName)
- .select('id, 'score, 'first)
-
- val expected = batchSourceTableNode(tableName, noCalcFields)
- util.verifyTable(result, expected)
- }
-
- @Test
- def testBatchFilterableWithoutPushDown(): Unit = {
- val (tableSource, tableName) = filterableTableSource
- val util = batchTestUtil()
- val tEnv = util.tableEnv
-
- tEnv.registerTableSource(tableName, tableSource)
-
- val result = tEnv
- .scan(tableName)
- .select('price, 'id, 'amount)
- .where("price * 2 < 32")
-
- val expected = unaryNode(
- "DataSetCalc",
- batchSourceTableNode(
- tableName,
- Array("name", "id", "amount", "price")),
- term("select", "price", "id", "amount"),
- term("where", "<(*(price, 2), 32)")
- )
-
- util.verifyTable(result, expected)
- }
-
- @Test
- def testBatchFilterablePartialPushDown(): Unit = {
- val (tableSource, tableName) = filterableTableSource
- val util = batchTestUtil()
- val tEnv = util.tableEnv
-
- tEnv.registerTableSource(tableName, tableSource)
-
- val result = tEnv
- .scan(tableName)
- .where("amount > 2 && price * 2 < 32")
- .select('price, 'name.lowerCase(), 'amount)
-
- val expected = unaryNode(
- "DataSetCalc",
- batchFilterableSourceTableNode(
- tableName,
- Array("name", "id", "amount", "price"),
- "'amount > 2"),
- term("select", "price", "LOWER(name) AS _c1", "amount"),
- term("where", "<(*(price, 2), 32)")
- )
- util.verifyTable(result, expected)
- }
-
- @Test
- def testBatchFilterableFullyPushedDown(): Unit = {
- val (tableSource, tableName) = filterableTableSource
- val util = batchTestUtil()
- val tEnv = util.tableEnv
-
- tEnv.registerTableSource(tableName, tableSource)
-
- val result = tEnv
- .scan(tableName)
- .select('price, 'id, 'amount)
- .where("amount > 2 && amount < 32")
-
- val expected = unaryNode(
- "DataSetCalc",
- batchFilterableSourceTableNode(
- tableName,
- Array("name", "id", "amount", "price"),
- "'amount > 2 && 'amount < 32"),
- term("select", "price", "id", "amount")
- )
- util.verifyTable(result, expected)
- }
-
- @Test
- def testBatchFilterableWithUnconvertedExpression(): Unit = {
- val (tableSource, tableName) = filterableTableSource
- val util = batchTestUtil()
- val tEnv = util.tableEnv
-
- tEnv.registerTableSource(tableName, tableSource)
-
- val result = tEnv
- .scan(tableName)
- .select('price, 'id, 'amount)
- .where("amount > 2 && (amount < 32 || amount.cast(LONG) > 10)") // cast can not be converted
-
- val expected = unaryNode(
- "DataSetCalc",
- batchFilterableSourceTableNode(
- tableName,
- Array("name", "id", "amount", "price"),
- "'amount > 2"),
- term("select", "price", "id", "amount"),
- term("where", "OR(<(amount, 32), >(CAST(amount), 10))")
- )
- util.verifyTable(result, expected)
- }
-
- @Test
- def testBatchFilterableWithUDF(): Unit = {
- val (tableSource, tableName) = filterableTableSource
- val util = batchTestUtil()
- val tEnv = util.tableEnv
-
- tEnv.registerTableSource(tableName, tableSource)
- val func = Func0
- tEnv.registerFunction("func0", func)
-
- val result = tEnv
- .scan(tableName)
- .select('price, 'id, 'amount)
- .where("amount > 2 && func0(amount) < 32")
-
- val expected = unaryNode(
- "DataSetCalc",
- batchFilterableSourceTableNode(
- tableName,
- Array("name", "id", "amount", "price"),
- "'amount > 2"),
- term("select", "price", "id", "amount"),
- term("where", s"<(${func.functionIdentifier}(amount), 32)")
- )
-
- util.verifyTable(result, expected)
- }
-
- // stream plan
-
- @Test
- def testStreamProjectableSourceScanPlanTableApi(): Unit = {
- val (tableSource, tableName) = csvTable
- val util = streamTestUtil()
- val tEnv = util.tableEnv
-
- tEnv.registerTableSource(tableName, tableSource)
-
- val result = tEnv
- .scan(tableName)
- .select('last, 'id.floor(), 'score * 2)
-
- val expected = unaryNode(
- "DataStreamCalc",
- streamSourceTableNode(tableName, projectedFields),
- term("select", "last", "FLOOR(id) AS _c1", "*(score, 2) AS _c2")
- )
-
- util.verifyTable(result, expected)
- }
-
- @Test
- def testStreamProjectableSourceScanPlanSQL(): Unit = {
- val (tableSource, tableName) = csvTable
- val util = streamTestUtil()
-
- util.tableEnv.registerTableSource(tableName, tableSource)
-
- val sqlQuery = s"SELECT `last`, floor(id), score * 2 FROM $tableName"
-
- val expected = unaryNode(
- "DataStreamCalc",
- streamSourceTableNode(tableName, projectedFields),
- term("select", "last", "FLOOR(id) AS EXPR$1", "*(score, 2) AS EXPR$2")
- )
-
- util.verifySql(sqlQuery, expected)
- }
-
- @Test
- def testStreamProjectableSourceScanNoIdentityCalc(): Unit = {
- val (tableSource, tableName) = csvTable
- val util = streamTestUtil()
- val tEnv = util.tableEnv
-
- tEnv.registerTableSource(tableName, tableSource)
-
- val result = tEnv
- .scan(tableName)
- .select('id, 'score, 'first)
-
- val expected = streamSourceTableNode(tableName, noCalcFields)
- util.verifyTable(result, expected)
- }
-
- @Test
- def testStreamFilterableSourceScanPlanTableApi(): Unit = {
- val (tableSource, tableName) = filterableTableSource
- val util = streamTestUtil()
- val tEnv = util.tableEnv
-
- tEnv.registerTableSource(tableName, tableSource)
-
- val result = tEnv
- .scan(tableName)
- .select('price, 'id, 'amount)
- .where("amount > 2 && price * 2 < 32")
-
- val expected = unaryNode(
- "DataStreamCalc",
- streamFilterableSourceTableNode(
- tableName,
- Array("name", "id", "amount", "price"),
- "'amount > 2"),
- term("select", "price", "id", "amount"),
- term("where", "<(*(price, 2), 32)")
- )
-
- util.verifyTable(result, expected)
- }
-
- // csv builder
-
- @Test
- def testCsvTableSourceBuilder(): Unit = {
- val source1 = CsvTableSource.builder()
- .path("/path/to/csv")
- .field("myfield", Types.STRING)
- .field("myfield2", Types.INT)
- .quoteCharacter(';')
- .fieldDelimiter("#")
- .lineDelimiter("\r\n")
- .commentPrefix("%%")
- .ignoreFirstLine()
- .ignoreParseErrors()
- .build()
-
- val source2 = new CsvTableSource(
- "/path/to/csv",
- Array("myfield", "myfield2"),
- Array(Types.STRING, Types.INT),
- "#",
- "\r\n",
- ';',
- true,
- "%%",
- true)
-
- Assert.assertEquals(source1, source2)
- }
-
- @Test(expected = classOf[IllegalArgumentException])
- def testCsvTableSourceBuilderWithNullPath(): Unit = {
- CsvTableSource.builder()
- .field("myfield", Types.STRING)
- // should fail, path is not defined
- .build()
- }
-
- @Test(expected = classOf[IllegalArgumentException])
- def testCsvTableSourceBuilderWithDuplicateFieldName(): Unit = {
- CsvTableSource.builder()
- .path("/path/to/csv")
- .field("myfield", Types.STRING)
- // should fail, field name must no be duplicate
- .field("myfield", Types.INT)
- }
-
- @Test(expected = classOf[IllegalArgumentException])
- def testCsvTableSourceBuilderWithEmptyField(): Unit = {
- CsvTableSource.builder()
- .path("/path/to/csv")
- // should fail, field can be empty
- .build()
- }
-
- // utils
-
- def filterableTableSource:(TableSource[_], String) = {
- val tableSource = new TestFilterableTableSource
- (tableSource, "filterableTable")
- }
-
- def csvTable: (CsvTableSource, String) = {
- val csvTable = CommonTestData.getCsvTableSource
- val tableName = "csvTable"
- (csvTable, tableName)
- }
-
- def batchSourceTableNode(sourceName: String, fields: Array[String]): String = {
- s"BatchTableSourceScan(table=[[$sourceName]], fields=[${fields.mkString(", ")}])"
- }
-
- def streamSourceTableNode(sourceName: String, fields: Array[String] ): String = {
- s"StreamTableSourceScan(table=[[$sourceName]], fields=[${fields.mkString(", ")}])"
- }
-
- def batchFilterableSourceTableNode(
- sourceName: String,
- fields: Array[String],
- exp: String): String = {
- "BatchTableSourceScan(" +
- s"table=[[$sourceName]], fields=[${fields.mkString(", ")}], source=[filter=[$exp]])"
- }
-
- def streamFilterableSourceTableNode(
- sourceName: String,
- fields: Array[String],
- exp: String): String = {
- "StreamTableSourceScan(" +
- s"table=[[$sourceName]], fields=[${fields.mkString(", ")}], source=[filter=[$exp]])"
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/f1fafc0e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/sources/stream/sql/TableSourceTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/sources/stream/sql/TableSourceTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/sources/stream/sql/TableSourceTest.scala
deleted file mode 100644
index e6d6480..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/sources/stream/sql/TableSourceTest.scala
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.sources.stream.sql
-
-import org.apache.flink.table.sources.TableSourceTestBase
-import org.apache.flink.table.utils.TableTestUtil._
-import org.junit.Test
-
-class TableSourceTest extends TableSourceTestBase {
-
- @Test
- def testProjectableSourceScanPlan(): Unit = {
- val (tableSource, tableName) = csvTable
- val util = streamTestUtil()
-
- util.tableEnv.registerTableSource(tableName, tableSource)
-
- val sqlQuery = s"SELECT `last`, floor(id), score * 2 FROM $tableName"
-
- val expected = unaryNode(
- "DataStreamCalc",
- streamSourceTableNode(tableName, projectedFields),
- term("select", "last", "FLOOR(id) AS EXPR$1", "*(score, 2) AS EXPR$2")
- )
-
- util.verifySql(sqlQuery, expected)
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/f1fafc0e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/sources/stream/table/TableSourceTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/sources/stream/table/TableSourceTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/sources/stream/table/TableSourceTest.scala
deleted file mode 100644
index c43913b..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/sources/stream/table/TableSourceTest.scala
+++ /dev/null
@@ -1,91 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.sources.stream.table
-
-import org.apache.flink.table.api.scala._
-import org.apache.flink.table.sources.TableSourceTestBase
-import org.apache.flink.table.utils.TableTestUtil._
-import org.junit.Test
-
-class TableSourceTest extends TableSourceTestBase {
-
- @Test
- def testProjectableSourceScanPlan(): Unit = {
- val (tableSource, tableName) = csvTable
- val util = streamTestUtil()
- val tEnv = util.tableEnv
-
- tEnv.registerTableSource(tableName, tableSource)
-
- val result = tEnv
- .scan(tableName)
- .select('last, 'id.floor(), 'score * 2)
-
- val expected = unaryNode(
- "DataStreamCalc",
- streamSourceTableNode(tableName, projectedFields),
- term("select", "last", "FLOOR(id) AS _c1", "*(score, 2) AS _c2")
- )
-
- util.verifyTable(result, expected)
- }
-
- @Test
- def testProjectableSourceScanNoIdentityCalc(): Unit = {
- val (tableSource, tableName) = csvTable
- val util = streamTestUtil()
- val tEnv = util.tableEnv
-
- tEnv.registerTableSource(tableName, tableSource)
-
- val result = tEnv
- .scan(tableName)
- .select('id, 'score, 'first)
-
- val expected = streamSourceTableNode(tableName, noCalcFields)
- util.verifyTable(result, expected)
- }
-
- @Test
- def testFilterableSourceScanPlan(): Unit = {
- val (tableSource, tableName) = filterableTableSource
- val util = streamTestUtil()
- val tEnv = util.tableEnv
-
- tEnv.registerTableSource(tableName, tableSource)
-
- val result = tEnv
- .scan(tableName)
- .select('price, 'id, 'amount)
- .where("amount > 2 && price * 2 < 32")
-
- val expected = unaryNode(
- "DataStreamCalc",
- streamFilterableSourceTableNode(
- tableName,
- Array("name", "id", "amount", "price"),
- "'amount > 2"),
- term("select", "price", "id", "amount"),
- term("where", "<(*(price, 2), 32)")
- )
-
- util.verifyTable(result, expected)
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/f1fafc0e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/sources/validation/TableSourceBalidationTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/sources/validation/TableSourceBalidationTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/sources/validation/TableSourceBalidationTest.scala
deleted file mode 100644
index e575ab8..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/sources/validation/TableSourceBalidationTest.scala
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.sources.validation
-
-import org.apache.flink.table.api.Types
-import org.apache.flink.table.sources.CsvTableSource
-import org.apache.flink.table.sources.TableSourceTestBase
-import org.junit.Test
-
-class TableSourceBalidationTest extends TableSourceTestBase {
-
- @Test(expected = classOf[IllegalArgumentException])
- def testCsvTableSourceBuilderWithNullPath(): Unit = {
- CsvTableSource.builder()
- .field("myfield", Types.STRING)
- // should fail, path is not defined
- .build()
- }
-
- @Test(expected = classOf[IllegalArgumentException])
- def testCsvTableSourceBuilderWithDuplicateFieldName(): Unit = {
- CsvTableSource.builder()
- .path("/path/to/csv")
- .field("myfield", Types.STRING)
- // should fail, field name must no be duplicate
- .field("myfield", Types.INT)
- }
-
- @Test(expected = classOf[IllegalArgumentException])
- def testCsvTableSourceBuilderWithEmptyField(): Unit = {
- CsvTableSource.builder()
- .path("/path/to/csv")
- // should fail, field can be empty
- .build()
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/f1fafc0e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/CommonTestData.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/CommonTestData.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/CommonTestData.scala
deleted file mode 100644
index 6a5c52f..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/CommonTestData.scala
+++ /dev/null
@@ -1,178 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.utils
-
-import java.io.{File, FileOutputStream, OutputStreamWriter}
-import java.util
-
-import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
-import org.apache.flink.api.java.typeutils.TypeExtractor
-import org.apache.flink.api.java.{DataSet, ExecutionEnvironment}
-import org.apache.flink.table.api.TableSchema
-import org.apache.flink.table.sources.{BatchTableSource, CsvTableSource}
-import org.apache.flink.table.catalog._
-
-object CommonTestData {
-
- def getCsvTableSource: CsvTableSource = {
- val csvRecords = Seq(
- "First#Id#Score#Last",
- "Mike#1#12.3#Smith",
- "Bob#2#45.6#Taylor",
- "Sam#3#7.89#Miller",
- "Peter#4#0.12#Smith",
- "% Just a comment",
- "Liz#5#34.5#Williams",
- "Sally#6#6.78#Miller",
- "Alice#7#90.1#Smith",
- "Kelly#8#2.34#Williams"
- )
-
- val tempFilePath = writeToTempFile(csvRecords.mkString("$"), "csv-test", "tmp")
- new CsvTableSource(
- tempFilePath,
- Array("first", "id", "score", "last"),
- Array(
- BasicTypeInfo.STRING_TYPE_INFO,
- BasicTypeInfo.INT_TYPE_INFO,
- BasicTypeInfo.DOUBLE_TYPE_INFO,
- BasicTypeInfo.STRING_TYPE_INFO
- ),
- fieldDelim = "#",
- rowDelim = "$",
- ignoreFirstLine = true,
- ignoreComments = "%"
- )
- }
-
- def getInMemoryTestCatalog: ExternalCatalog = {
- val csvRecord1 = Seq(
- "1#1#Hi",
- "2#2#Hello",
- "3#2#Hello world"
- )
- val tempFilePath1 = writeToTempFile(csvRecord1.mkString("$"), "csv-test1", "tmp")
- val properties1 = new util.HashMap[String, String]()
- properties1.put("path", tempFilePath1)
- properties1.put("fieldDelim", "#")
- properties1.put("rowDelim", "$")
- val externalCatalogTable1 = ExternalCatalogTable(
- "csv",
- new TableSchema(
- Array("a", "b", "c"),
- Array(
- BasicTypeInfo.INT_TYPE_INFO,
- BasicTypeInfo.LONG_TYPE_INFO,
- BasicTypeInfo.STRING_TYPE_INFO)),
- properties1
- )
-
- val csvRecord2 = Seq(
- "1#1#0#Hallo#1",
- "2#2#1#Hallo Welt#2",
- "2#3#2#Hallo Welt wie#1",
- "3#4#3#Hallo Welt wie gehts?#2",
- "3#5#4#ABC#2",
- "3#6#5#BCD#3",
- "4#7#6#CDE#2",
- "4#8#7#DEF#1",
- "4#9#8#EFG#1",
- "4#10#9#FGH#2",
- "5#11#10#GHI#1",
- "5#12#11#HIJ#3",
- "5#13#12#IJK#3",
- "5#14#13#JKL#2",
- "5#15#14#KLM#2"
- )
- val tempFilePath2 = writeToTempFile(csvRecord2.mkString("$"), "csv-test2", "tmp")
- val properties2 = new util.HashMap[String, String]()
- properties2.put("path", tempFilePath2)
- properties2.put("fieldDelim", "#")
- properties2.put("rowDelim", "$")
- val externalCatalogTable2 = ExternalCatalogTable(
- "csv",
- new TableSchema(
- Array("d", "e", "f", "g", "h"),
- Array(
- BasicTypeInfo.INT_TYPE_INFO,
- BasicTypeInfo.LONG_TYPE_INFO,
- BasicTypeInfo.INT_TYPE_INFO,
- BasicTypeInfo.STRING_TYPE_INFO,
- BasicTypeInfo.LONG_TYPE_INFO)
- ),
- properties2
- )
- val catalog = new InMemoryExternalCatalog("test")
- val db1 = new InMemoryExternalCatalog("db1")
- val db2 = new InMemoryExternalCatalog("db2")
- catalog.createSubCatalog("db1", db1, ignoreIfExists = false)
- catalog.createSubCatalog("db2", db2, ignoreIfExists = false)
-
- // Register the table with both catalogs
- catalog.createTable("tb1", externalCatalogTable1, ignoreIfExists = false)
- db1.createTable("tb1", externalCatalogTable1, ignoreIfExists = false)
- db2.createTable("tb2", externalCatalogTable2, ignoreIfExists = false)
- catalog
- }
-
- private def writeToTempFile(
- contents: String,
- filePrefix: String,
- fileSuffix: String,
- charset: String = "UTF-8"): String = {
- val tempFile = File.createTempFile(filePrefix, fileSuffix)
- tempFile.deleteOnExit()
- val tmpWriter = new OutputStreamWriter(new FileOutputStream(tempFile), charset)
- tmpWriter.write(contents)
- tmpWriter.close()
- tempFile.getAbsolutePath
- }
-
- def getNestedTableSource: BatchTableSource[Person] = {
- new BatchTableSource[Person] {
- override def getDataSet(execEnv: ExecutionEnvironment): DataSet[Person] = {
- val executionEnvironment = ExecutionEnvironment.getExecutionEnvironment
- executionEnvironment.fromCollection(
- util.Arrays.asList(
- new Person("Mike", "Smith", new Address("5th Ave", "New-York")),
- new Person("Sally", "Miller", new Address("Potsdamer Platz", "Berlin")),
- new Person("Bob", "Taylor", new Address("Pearse Street", "Dublin"))),
- getReturnType
- )
- }
-
- override def getReturnType: TypeInformation[Person] = {
- TypeExtractor.getForClass(classOf[Person])
- }
- }
- }
-
- class Person(var firstName: String, var lastName: String, var address: Address) {
- def this() {
- this(null, null, null)
- }
- }
-
- class Address(var street: String, var city: String) {
- def this() {
- this(null, null)
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/f1fafc0e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/LogicalPlanFormatUtils.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/LogicalPlanFormatUtils.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/LogicalPlanFormatUtils.scala
new file mode 100644
index 0000000..92d8100
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/LogicalPlanFormatUtils.scala
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.utils
+
+object LogicalPlanFormatUtils {
+ private val tempPattern = """TMP_\d+""".r
+
+ def formatTempTableId(preStr: String): String = {
+ val str = preStr.replaceAll("ArrayBuffer\\(", "List\\(")
+ val minId = getMinTempTableId(str)
+ tempPattern.replaceAllIn(str, s => "TMP_" + (s.matched.substring(4).toInt - minId) )
+ }
+
+ private def getMinTempTableId(logicalStr: String): Long = {
+ val found = tempPattern.findAllIn(logicalStr).map(s => {
+ s.substring(4).toInt
+ })
+ if (found.isEmpty) {
+ 0L
+ } else {
+ found.min
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/f1fafc0e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/TableTestBase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/TableTestBase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/TableTestBase.scala
index 6657d50..d99aac1 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/TableTestBase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/TableTestBase.scala
@@ -20,19 +20,16 @@ package org.apache.flink.table.utils
import org.apache.calcite.plan.RelOptUtil
import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.java.{DataSet => JDataSet}
-import org.apache.flink.table.api.{Table, TableEnvironment}
-import org.apache.flink.table.api.scala._
+import org.apache.flink.api.java.{DataSet => JDataSet, ExecutionEnvironment => JExecutionEnvironment}
import org.apache.flink.api.scala.{DataSet, ExecutionEnvironment}
-import org.apache.flink.api.java.{ExecutionEnvironment => JExecutionEnvironment}
import org.apache.flink.streaming.api.TimeCharacteristic
-import org.apache.flink.table.expressions.Expression
-import org.apache.flink.table.functions.{ScalarFunction, TableFunction}
import org.apache.flink.streaming.api.datastream.{DataStream => JDataStream}
import org.apache.flink.streaming.api.environment.{StreamExecutionEnvironment => JStreamExecutionEnvironment}
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
-import org.apache.flink.table.api.scala.batch.utils.LogicalPlanFormatUtils
-import org.apache.flink.table.functions.AggregateFunction
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.api.{Table, TableEnvironment}
+import org.apache.flink.table.expressions.Expression
+import org.apache.flink.table.functions.{AggregateFunction, ScalarFunction, TableFunction}
import org.junit.Assert.assertEquals
import org.junit.Rule
import org.junit.rules.ExpectedException
http://git-wip-us.apache.org/repos/asf/flink/blob/f1fafc0e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/UserDefinedFunctionTestUtils.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/UserDefinedFunctionTestUtils.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/UserDefinedFunctionTestUtils.scala
deleted file mode 100644
index deaedc9..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/UserDefinedFunctionTestUtils.scala
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.utils
-
-import java.io.File
-
-import com.google.common.base.Charsets
-import com.google.common.io.Files
-import org.apache.flink.api.scala.ExecutionEnvironment
-import org.apache.flink.configuration.Configuration
-import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
-
-object UserDefinedFunctionTestUtils {
-
- def setJobParameters(env: ExecutionEnvironment, parameters: Map[String, String]): Unit = {
- val conf = new Configuration()
- parameters.foreach {
- case (k, v) => conf.setString(k, v)
- }
- env.getConfig.setGlobalJobParameters(conf)
- }
-
- def setJobParameters(env: StreamExecutionEnvironment, parameters: Map[String, String]): Unit = {
- val conf = new Configuration()
- parameters.foreach {
- case (k, v) => conf.setString(k, v)
- }
- env.getConfig.setGlobalJobParameters(conf)
- }
-
- def writeCacheFile(fileName: String, contents: String): String = {
- val tempFile = File.createTempFile(this.getClass.getName + "-" + fileName, "tmp")
- tempFile.deleteOnExit()
- Files.write(contents, tempFile, Charsets.UTF_8)
- tempFile.getAbsolutePath
- }
-}