Posted to commits@flink.apache.org by tw...@apache.org on 2017/07/13 10:18:27 UTC

[18/44] flink git commit: [FLINK-6617] [table] Restructuring of tests

http://git-wip-us.apache.org/repos/asf/flink/blob/f1fafc0e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/utils/TableProgramsTestBase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/utils/TableProgramsTestBase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/utils/TableProgramsTestBase.scala
new file mode 100644
index 0000000..e2d0473
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/utils/TableProgramsTestBase.scala
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.utils
+
+import org.apache.flink.table.api.TableConfig
+import org.apache.flink.table.runtime.utils.TableProgramsTestBase.TableConfigMode
+import org.apache.flink.test.util.MultipleProgramsTestBase
+import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
+
+class TableProgramsTestBase(
+    mode: TestExecutionMode,
+    tableConfigMode: TableConfigMode)
+  extends MultipleProgramsTestBase(mode) {
+
+  def config: TableConfig = {
+    val conf = new TableConfig
+    tableConfigMode match {
+      case TableProgramsTestBase.NO_NULL =>
+        conf.setNullCheck(false)
+      case _ => // keep default
+    }
+    conf
+  }
+}
+
+object TableProgramsTestBase {
+  case class TableConfigMode(nullCheck: Boolean)
+
+  val DEFAULT = TableConfigMode(nullCheck = true)
+  val NO_NULL = TableConfigMode(nullCheck = false)
+
+}
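
For context, a minimal sketch of a test that could build on this base class (the class name, body, and environment setup are illustrative, not part of this commit; the constructor and the config accessor are as defined above):

    import org.apache.flink.api.scala.ExecutionEnvironment
    import org.apache.flink.table.api.TableEnvironment
    import org.apache.flink.table.runtime.utils.TableProgramsTestBase
    import org.apache.flink.table.runtime.utils.TableProgramsTestBase.TableConfigMode
    import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
    import org.junit.Test

    // Hypothetical parameterized ITCase; runs once per execution/config mode.
    class ExampleTableITCase(mode: TestExecutionMode, configMode: TableConfigMode)
      extends TableProgramsTestBase(mode, configMode) {

      @Test
      def testWithConfiguredEnvironment(): Unit = {
        val env = ExecutionEnvironment.getExecutionEnvironment
        // `config` already reflects the chosen TableConfigMode,
        // e.g. null checks are disabled for NO_NULL
        val tEnv = TableEnvironment.getTableEnvironment(env, config)
        // ... register tables and run the actual assertions against tEnv
      }
    }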

http://git-wip-us.apache.org/repos/asf/flink/blob/f1fafc0e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/utils/TimeTestUtil.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/utils/TimeTestUtil.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/utils/TimeTestUtil.scala
new file mode 100644
index 0000000..fab1e4a
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/utils/TimeTestUtil.scala
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.utils
+
+import org.apache.flink.streaming.api.functions.source.SourceFunction
+import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext
+import org.apache.flink.streaming.api.watermark.Watermark
+
+object TimeTestUtil {
+
+  class EventTimeSourceFunction[T](
+      dataWithTimestampList: Seq[Either[(Long, T), Long]]) extends SourceFunction[T] {
+    override def run(ctx: SourceContext[T]): Unit = {
+      dataWithTimestampList.foreach {
+        case Left(t) => ctx.collectWithTimestamp(t._2, t._1)
+        case Right(w) => ctx.emitWatermark(new Watermark(w))
+      }
+    }
+
+    override def cancel(): Unit = {} // bounded test source; nothing to cancel
+  }
+}
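
For context, a minimal sketch of how this source can drive an event-time test (the environment setup and data are illustrative, not part of this commit):

    import org.apache.flink.streaming.api.TimeCharacteristic
    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.table.runtime.utils.TimeTestUtil.EventTimeSourceFunction

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    // Left((timestamp, value)) emits a timestamped record,
    // Right(timestamp) emits a watermark at that time.
    val data: Seq[Either[(Long, String), Long]] = Seq(
      Left((1000L, "a")),
      Left((2000L, "b")),
      Right(2000L),
      Left((3000L, "c")),
      Right(4000L))

    val stream: DataStream[String] =
      env.addSource(new EventTimeSourceFunction[String](data))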

http://git-wip-us.apache.org/repos/asf/flink/blob/f1fafc0e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/utils/UserDefinedFunctionTestUtils.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/utils/UserDefinedFunctionTestUtils.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/utils/UserDefinedFunctionTestUtils.scala
new file mode 100644
index 0000000..176afcd
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/utils/UserDefinedFunctionTestUtils.scala
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.utils
+
+import java.io.File
+
+import com.google.common.base.Charsets
+import com.google.common.io.Files
+import org.apache.flink.api.scala.ExecutionEnvironment
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
+
+object UserDefinedFunctionTestUtils {
+
+  def setJobParameters(env: ExecutionEnvironment, parameters: Map[String, String]): Unit = {
+    val conf = new Configuration()
+    parameters.foreach {
+      case (k, v) => conf.setString(k, v)
+    }
+    env.getConfig.setGlobalJobParameters(conf)
+  }
+
+  def setJobParameters(env: StreamExecutionEnvironment, parameters: Map[String, String]): Unit = {
+    val conf = new Configuration()
+    parameters.foreach {
+      case (k, v) => conf.setString(k, v)
+    }
+    env.getConfig.setGlobalJobParameters(conf)
+  }
+
+  def writeCacheFile(fileName: String, contents: String): String = {
+    val tempFile = File.createTempFile(this.getClass.getName + "-" + fileName, "tmp")
+    tempFile.deleteOnExit()
+    Files.write(contents, tempFile, Charsets.UTF_8)
+    tempFile.getAbsolutePath
+  }
+}
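
For context, a minimal sketch of how these helpers are typically used: parameters set via setJobParameters become global job parameters, which a UDF can read in open(). The scalar function below is illustrative, not part of this commit:

    import org.apache.flink.api.scala._
    import org.apache.flink.table.functions.{FunctionContext, ScalarFunction}
    import org.apache.flink.table.runtime.utils.UserDefinedFunctionTestUtils.setJobParameters

    // Hypothetical UDF reading a job parameter with a default fallback.
    class GreetingFunc extends ScalarFunction {
      private var greeting: String = _

      override def open(context: FunctionContext): Unit = {
        greeting = context.getJobParameter("greeting", "hello")
      }

      def eval(name: String): String = s"$greeting $name"
    }

    val env = ExecutionEnvironment.getExecutionEnvironment
    setJobParameters(env, Map("greeting" -> "hi"))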

http://git-wip-us.apache.org/repos/asf/flink/blob/f1fafc0e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/sinks/validation/TableSinksValidationTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/sinks/validation/TableSinksValidationTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/sinks/validation/TableSinksValidationTest.scala
deleted file mode 100644
index c800289..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/sinks/validation/TableSinksValidationTest.scala
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.table.sinks.validation
-
-import org.apache.flink.table.api.scala._
-import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
-import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase
-import org.apache.flink.table.api.{TableEnvironment, TableException}
-import org.apache.flink.table.api.scala.stream.utils.StreamTestData
-import org.apache.flink.table.runtime.datastream.table.TestAppendSink
-import org.junit.Test
-
-class TableSinksValidationTest extends StreamingMultipleProgramsTestBase {
-
-  @Test(expected = classOf[TableException])
-  def testAppendSinkOnUpdatingTable(): Unit = {
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-
-    val t = StreamTestData.get3TupleDataStream(env).toTable(tEnv, 'id, 'num, 'text)
-
-    t.groupBy('text)
-    .select('text, 'id.count, 'num.sum)
-    .writeToSink(new TestAppendSink)
-
-    // must fail because table is not append-only
-    env.execute()
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/f1fafc0e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/sources/TableSourceTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/sources/TableSourceTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/sources/TableSourceTest.scala
deleted file mode 100644
index b01453d..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/sources/TableSourceTest.scala
+++ /dev/null
@@ -1,84 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.sources
-
-import org.apache.flink.table.api.Types
-import org.apache.flink.table.utils.TableTestUtil._
-import org.junit.{Assert, Test}
-
-class TableSourceTest extends TableSourceTestBase {
-
-  @Test
-  def testTableSourceScanToString(): Unit = {
-    val (tableSource1, _) = filterableTableSource
-    val (tableSource2, _) = filterableTableSource
-    val util = batchTestUtil()
-    val tEnv = util.tableEnv
-
-    tEnv.registerTableSource("table1", tableSource1)
-    tEnv.registerTableSource("table2", tableSource2)
-
-    val table1 = tEnv.scan("table1").where("amount > 2")
-    val table2 = tEnv.scan("table2").where("amount > 2")
-    val result = table1.unionAll(table2)
-
-    val expected = binaryNode(
-      "DataSetUnion",
-      batchFilterableSourceTableNode(
-        "table1",
-        Array("name", "id", "amount", "price"),
-        "'amount > 2"),
-      batchFilterableSourceTableNode(
-        "table2",
-        Array("name", "id", "amount", "price"),
-        "'amount > 2"),
-      term("union", "name, id, amount, price")
-    )
-    util.verifyTable(result, expected)
-  }
-
-  @Test
-  def testCsvTableSourceBuilder(): Unit = {
-    val source1 = CsvTableSource.builder()
-      .path("/path/to/csv")
-      .field("myfield", Types.STRING)
-      .field("myfield2", Types.INT)
-      .quoteCharacter(';')
-      .fieldDelimiter("#")
-      .lineDelimiter("\r\n")
-      .commentPrefix("%%")
-      .ignoreFirstLine()
-      .ignoreParseErrors()
-      .build()
-
-    val source2 = new CsvTableSource(
-      "/path/to/csv",
-      Array("myfield", "myfield2"),
-      Array(Types.STRING, Types.INT),
-      "#",
-      "\r\n",
-      ';',
-      true,
-      "%%",
-      true)
-
-    Assert.assertEquals(source1, source2)
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/f1fafc0e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/sources/TableSourceTestBase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/sources/TableSourceTestBase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/sources/TableSourceTestBase.scala
deleted file mode 100644
index d408694..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/sources/TableSourceTestBase.scala
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.table.sources
-
-import org.apache.flink.table.utils.{CommonTestData, TableTestBase, TestFilterableTableSource}
-
-class TableSourceTestBase extends TableTestBase {
-
-  protected val projectedFields: Array[String] = Array("last", "id", "score")
-  protected val noCalcFields: Array[String] = Array("id", "score", "first")
-
-  def filterableTableSource:(TableSource[_], String) = {
-    val tableSource = new TestFilterableTableSource
-    (tableSource, "filterableTable")
-  }
-
-  def csvTable: (CsvTableSource, String) = {
-    val csvTable = CommonTestData.getCsvTableSource
-    val tableName = "csvTable"
-    (csvTable, tableName)
-  }
-
-  def batchSourceTableNode(sourceName: String, fields: Array[String]): String = {
-    s"BatchTableSourceScan(table=[[$sourceName]], fields=[${fields.mkString(", ")}])"
-  }
-
-  def streamSourceTableNode(sourceName: String, fields: Array[String] ): String = {
-    s"StreamTableSourceScan(table=[[$sourceName]], fields=[${fields.mkString(", ")}])"
-  }
-
-  def batchFilterableSourceTableNode(
-      sourceName: String,
-      fields: Array[String],
-      exp: String): String = {
-    "BatchTableSourceScan(" +
-      s"table=[[$sourceName]], fields=[${fields.mkString(", ")}], source=[filter=[$exp]])"
-  }
-
-  def streamFilterableSourceTableNode(
-      sourceName: String,
-      fields: Array[String],
-      exp: String): String = {
-    "StreamTableSourceScan(" +
-      s"table=[[$sourceName]], fields=[${fields.mkString(", ")}], source=[filter=[$exp]])"
-  }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/f1fafc0e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/sources/batch/TableSourceTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/sources/batch/TableSourceTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/sources/batch/TableSourceTest.scala
deleted file mode 100644
index f670452..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/sources/batch/TableSourceTest.scala
+++ /dev/null
@@ -1,421 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.sources.batch
-
-import org.apache.flink.table.api.Types
-import org.apache.flink.table.api.scala._
-import org.apache.flink.table.expressions.utils._
-import org.apache.flink.table.sources.{CsvTableSource, TableSource}
-import org.apache.flink.table.utils.TableTestUtil._
-import org.apache.flink.table.utils.{CommonTestData, TableTestBase, TestFilterableTableSource}
-import org.junit.{Assert, Test}
-
-class TableSourceTest extends TableTestBase {
-
-  private val projectedFields: Array[String] = Array("last", "id", "score")
-  private val noCalcFields: Array[String] = Array("id", "score", "first")
-
-  @Test
-  def testTableSourceScanToString(): Unit = {
-    val (tableSource1, _) = filterableTableSource
-    val (tableSource2, _) = filterableTableSource
-    val util = batchTestUtil()
-    val tEnv = util.tableEnv
-
-    tEnv.registerTableSource("table1", tableSource1)
-    tEnv.registerTableSource("table2", tableSource2)
-
-    val table1 = tEnv.scan("table1").where("amount > 2")
-    val table2 = tEnv.scan("table2").where("amount > 2")
-    val result = table1.unionAll(table2)
-
-    val expected = binaryNode(
-      "DataSetUnion",
-      batchFilterableSourceTableNode(
-        "table1",
-        Array("name", "id", "amount", "price"),
-        "'amount > 2"),
-      batchFilterableSourceTableNode(
-        "table2",
-        Array("name", "id", "amount", "price"),
-        "'amount > 2"),
-      term("union", "name, id, amount, price")
-    )
-    util.verifyTable(result, expected)
-  }
-
-  // batch plan
-
-  @Test
-  def testBatchProjectableSourceScanPlanTableApi(): Unit = {
-    val (tableSource, tableName) = csvTable
-    val util = batchTestUtil()
-    val tEnv = util.tableEnv
-
-    tEnv.registerTableSource(tableName, tableSource)
-
-    val result = tEnv
-      .scan(tableName)
-      .select('last.upperCase(), 'id.floor(), 'score * 2)
-
-    val expected = unaryNode(
-      "DataSetCalc",
-      batchSourceTableNode(tableName, projectedFields),
-      term("select", "UPPER(last) AS _c0", "FLOOR(id) AS _c1", "*(score, 2) AS _c2")
-    )
-
-    util.verifyTable(result, expected)
-  }
-
-  @Test
-  def testBatchProjectableSourceScanPlanSQL(): Unit = {
-    val (tableSource, tableName) = csvTable
-    val util = batchTestUtil()
-
-    util.tableEnv.registerTableSource(tableName, tableSource)
-
-    val sqlQuery = s"SELECT `last`, floor(id), score * 2 FROM $tableName"
-
-    val expected = unaryNode(
-      "DataSetCalc",
-      batchSourceTableNode(tableName, projectedFields),
-      term("select", "last", "FLOOR(id) AS EXPR$1", "*(score, 2) AS EXPR$2")
-    )
-
-    util.verifySql(sqlQuery, expected)
-  }
-
-  @Test
-  def testBatchProjectableSourceScanNoIdentityCalc(): Unit = {
-    val (tableSource, tableName) = csvTable
-    val util = batchTestUtil()
-    val tEnv = util.tableEnv
-
-    tEnv.registerTableSource(tableName, tableSource)
-
-    val result = tEnv
-      .scan(tableName)
-      .select('id, 'score, 'first)
-
-    val expected = batchSourceTableNode(tableName, noCalcFields)
-    util.verifyTable(result, expected)
-  }
-
-  @Test
-  def testBatchFilterableWithoutPushDown(): Unit = {
-    val (tableSource, tableName) = filterableTableSource
-    val util = batchTestUtil()
-    val tEnv = util.tableEnv
-
-    tEnv.registerTableSource(tableName, tableSource)
-
-    val result = tEnv
-        .scan(tableName)
-        .select('price, 'id, 'amount)
-        .where("price * 2 < 32")
-
-    val expected = unaryNode(
-      "DataSetCalc",
-      batchSourceTableNode(
-        tableName,
-        Array("name", "id", "amount", "price")),
-      term("select", "price", "id", "amount"),
-      term("where", "<(*(price, 2), 32)")
-    )
-
-    util.verifyTable(result, expected)
-  }
-
-  @Test
-  def testBatchFilterablePartialPushDown(): Unit = {
-    val (tableSource, tableName) = filterableTableSource
-    val util = batchTestUtil()
-    val tEnv = util.tableEnv
-
-    tEnv.registerTableSource(tableName, tableSource)
-
-    val result = tEnv
-      .scan(tableName)
-      .where("amount > 2 && price * 2 < 32")
-      .select('price, 'name.lowerCase(), 'amount)
-
-    val expected = unaryNode(
-      "DataSetCalc",
-      batchFilterableSourceTableNode(
-        tableName,
-        Array("name", "id", "amount", "price"),
-        "'amount > 2"),
-      term("select", "price", "LOWER(name) AS _c1", "amount"),
-      term("where", "<(*(price, 2), 32)")
-    )
-    util.verifyTable(result, expected)
-  }
-
-  @Test
-  def testBatchFilterableFullyPushedDown(): Unit = {
-    val (tableSource, tableName) = filterableTableSource
-    val util = batchTestUtil()
-    val tEnv = util.tableEnv
-
-    tEnv.registerTableSource(tableName, tableSource)
-
-    val result = tEnv
-        .scan(tableName)
-        .select('price, 'id, 'amount)
-        .where("amount > 2 && amount < 32")
-
-    val expected = unaryNode(
-      "DataSetCalc",
-      batchFilterableSourceTableNode(
-        tableName,
-        Array("name", "id", "amount", "price"),
-        "'amount > 2 && 'amount < 32"),
-      term("select", "price", "id", "amount")
-    )
-    util.verifyTable(result, expected)
-  }
-
-  @Test
-  def testBatchFilterableWithUnconvertedExpression(): Unit = {
-    val (tableSource, tableName) = filterableTableSource
-    val util = batchTestUtil()
-    val tEnv = util.tableEnv
-
-    tEnv.registerTableSource(tableName, tableSource)
-
-    val result = tEnv
-        .scan(tableName)
-        .select('price, 'id, 'amount)
-        .where("amount > 2 && (amount < 32 || amount.cast(LONG) > 10)") // cast can not be converted
-
-    val expected = unaryNode(
-      "DataSetCalc",
-      batchFilterableSourceTableNode(
-        tableName,
-        Array("name", "id", "amount", "price"),
-        "'amount > 2"),
-      term("select", "price", "id", "amount"),
-      term("where", "OR(<(amount, 32), >(CAST(amount), 10))")
-    )
-    util.verifyTable(result, expected)
-  }
-
-  @Test
-  def testBatchFilterableWithUDF(): Unit = {
-    val (tableSource, tableName) = filterableTableSource
-    val util = batchTestUtil()
-    val tEnv = util.tableEnv
-
-    tEnv.registerTableSource(tableName, tableSource)
-    val func = Func0
-    tEnv.registerFunction("func0", func)
-
-    val result = tEnv
-        .scan(tableName)
-        .select('price, 'id, 'amount)
-        .where("amount > 2 && func0(amount) < 32")
-
-    val expected = unaryNode(
-      "DataSetCalc",
-      batchFilterableSourceTableNode(
-        tableName,
-        Array("name", "id", "amount", "price"),
-        "'amount > 2"),
-      term("select", "price", "id", "amount"),
-      term("where", s"<(${func.functionIdentifier}(amount), 32)")
-    )
-
-    util.verifyTable(result, expected)
-  }
-
-  // stream plan
-
-  @Test
-  def testStreamProjectableSourceScanPlanTableApi(): Unit = {
-    val (tableSource, tableName) = csvTable
-    val util = streamTestUtil()
-    val tEnv = util.tableEnv
-
-    tEnv.registerTableSource(tableName, tableSource)
-
-    val result = tEnv
-      .scan(tableName)
-      .select('last, 'id.floor(), 'score * 2)
-
-    val expected = unaryNode(
-      "DataStreamCalc",
-      streamSourceTableNode(tableName, projectedFields),
-      term("select", "last", "FLOOR(id) AS _c1", "*(score, 2) AS _c2")
-    )
-
-    util.verifyTable(result, expected)
-  }
-
-  @Test
-  def testStreamProjectableSourceScanPlanSQL(): Unit = {
-    val (tableSource, tableName) = csvTable
-    val util = streamTestUtil()
-
-    util.tableEnv.registerTableSource(tableName, tableSource)
-
-    val sqlQuery = s"SELECT `last`, floor(id), score * 2 FROM $tableName"
-
-    val expected = unaryNode(
-      "DataStreamCalc",
-      streamSourceTableNode(tableName, projectedFields),
-      term("select", "last", "FLOOR(id) AS EXPR$1", "*(score, 2) AS EXPR$2")
-    )
-
-    util.verifySql(sqlQuery, expected)
-  }
-
-  @Test
-  def testStreamProjectableSourceScanNoIdentityCalc(): Unit = {
-    val (tableSource, tableName) = csvTable
-    val util = streamTestUtil()
-    val tEnv = util.tableEnv
-
-    tEnv.registerTableSource(tableName, tableSource)
-
-    val result = tEnv
-      .scan(tableName)
-      .select('id, 'score, 'first)
-
-    val expected = streamSourceTableNode(tableName, noCalcFields)
-    util.verifyTable(result, expected)
-  }
-
-  @Test
-  def testStreamFilterableSourceScanPlanTableApi(): Unit = {
-    val (tableSource, tableName) = filterableTableSource
-    val util = streamTestUtil()
-    val tEnv = util.tableEnv
-
-    tEnv.registerTableSource(tableName, tableSource)
-
-    val result = tEnv
-      .scan(tableName)
-      .select('price, 'id, 'amount)
-      .where("amount > 2 && price * 2 < 32")
-
-    val expected = unaryNode(
-      "DataStreamCalc",
-      streamFilterableSourceTableNode(
-        tableName,
-        Array("name", "id", "amount", "price"),
-        "'amount > 2"),
-      term("select", "price", "id", "amount"),
-      term("where", "<(*(price, 2), 32)")
-    )
-
-    util.verifyTable(result, expected)
-  }
-
-  // csv builder
-
-  @Test
-  def testCsvTableSourceBuilder(): Unit = {
-    val source1 = CsvTableSource.builder()
-      .path("/path/to/csv")
-      .field("myfield", Types.STRING)
-      .field("myfield2", Types.INT)
-      .quoteCharacter(';')
-      .fieldDelimiter("#")
-      .lineDelimiter("\r\n")
-      .commentPrefix("%%")
-      .ignoreFirstLine()
-      .ignoreParseErrors()
-      .build()
-
-    val source2 = new CsvTableSource(
-      "/path/to/csv",
-      Array("myfield", "myfield2"),
-      Array(Types.STRING, Types.INT),
-      "#",
-      "\r\n",
-      ';',
-      true,
-      "%%",
-      true)
-
-    Assert.assertEquals(source1, source2)
-  }
-
-  @Test(expected = classOf[IllegalArgumentException])
-  def testCsvTableSourceBuilderWithNullPath(): Unit = {
-    CsvTableSource.builder()
-      .field("myfield", Types.STRING)
-      // should fail, path is not defined
-      .build()
-  }
-
-  @Test(expected = classOf[IllegalArgumentException])
-  def testCsvTableSourceBuilderWithDuplicateFieldName(): Unit = {
-    CsvTableSource.builder()
-      .path("/path/to/csv")
-      .field("myfield", Types.STRING)
-      // should fail, field names must not be duplicate
-      .field("myfield", Types.INT)
-  }
-
-  @Test(expected = classOf[IllegalArgumentException])
-  def testCsvTableSourceBuilderWithEmptyField(): Unit = {
-    CsvTableSource.builder()
-      .path("/path/to/csv")
-      // should fail, no fields are defined
-      .build()
-  }
-
-  // utils
-
-  def filterableTableSource:(TableSource[_], String) = {
-    val tableSource = new TestFilterableTableSource
-    (tableSource, "filterableTable")
-  }
-
-  def csvTable: (CsvTableSource, String) = {
-    val csvTable = CommonTestData.getCsvTableSource
-    val tableName = "csvTable"
-    (csvTable, tableName)
-  }
-
-  def batchSourceTableNode(sourceName: String, fields: Array[String]): String = {
-    s"BatchTableSourceScan(table=[[$sourceName]], fields=[${fields.mkString(", ")}])"
-  }
-
-  def streamSourceTableNode(sourceName: String, fields: Array[String] ): String = {
-    s"StreamTableSourceScan(table=[[$sourceName]], fields=[${fields.mkString(", ")}])"
-  }
-
-  def batchFilterableSourceTableNode(
-      sourceName: String,
-      fields: Array[String],
-      exp: String): String = {
-    "BatchTableSourceScan(" +
-      s"table=[[$sourceName]], fields=[${fields.mkString(", ")}], source=[filter=[$exp]])"
-  }
-
-  def streamFilterableSourceTableNode(
-      sourceName: String,
-      fields: Array[String],
-      exp: String): String = {
-    "StreamTableSourceScan(" +
-      s"table=[[$sourceName]], fields=[${fields.mkString(", ")}], source=[filter=[$exp]])"
-  }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/f1fafc0e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/sources/stream/sql/TableSourceTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/sources/stream/sql/TableSourceTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/sources/stream/sql/TableSourceTest.scala
deleted file mode 100644
index e6d6480..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/sources/stream/sql/TableSourceTest.scala
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.sources.stream.sql
-
-import org.apache.flink.table.sources.TableSourceTestBase
-import org.apache.flink.table.utils.TableTestUtil._
-import org.junit.Test
-
-class TableSourceTest extends TableSourceTestBase {
-
-  @Test
-  def testProjectableSourceScanPlan(): Unit = {
-    val (tableSource, tableName) = csvTable
-    val util = streamTestUtil()
-
-    util.tableEnv.registerTableSource(tableName, tableSource)
-
-    val sqlQuery = s"SELECT `last`, floor(id), score * 2 FROM $tableName"
-
-    val expected = unaryNode(
-      "DataStreamCalc",
-      streamSourceTableNode(tableName, projectedFields),
-      term("select", "last", "FLOOR(id) AS EXPR$1", "*(score, 2) AS EXPR$2")
-    )
-
-    util.verifySql(sqlQuery, expected)
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/f1fafc0e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/sources/stream/table/TableSourceTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/sources/stream/table/TableSourceTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/sources/stream/table/TableSourceTest.scala
deleted file mode 100644
index c43913b..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/sources/stream/table/TableSourceTest.scala
+++ /dev/null
@@ -1,91 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.sources.stream.table
-
-import org.apache.flink.table.api.scala._
-import org.apache.flink.table.sources.TableSourceTestBase
-import org.apache.flink.table.utils.TableTestUtil._
-import org.junit.Test
-
-class TableSourceTest extends TableSourceTestBase {
-
-  @Test
-  def testProjectableSourceScanPlan(): Unit = {
-    val (tableSource, tableName) = csvTable
-    val util = streamTestUtil()
-    val tEnv = util.tableEnv
-
-    tEnv.registerTableSource(tableName, tableSource)
-
-    val result = tEnv
-      .scan(tableName)
-      .select('last, 'id.floor(), 'score * 2)
-
-    val expected = unaryNode(
-      "DataStreamCalc",
-      streamSourceTableNode(tableName, projectedFields),
-      term("select", "last", "FLOOR(id) AS _c1", "*(score, 2) AS _c2")
-    )
-
-    util.verifyTable(result, expected)
-  }
-
-  @Test
-  def testProjectableSourceScanNoIdentityCalc(): Unit = {
-    val (tableSource, tableName) = csvTable
-    val util = streamTestUtil()
-    val tEnv = util.tableEnv
-
-    tEnv.registerTableSource(tableName, tableSource)
-
-    val result = tEnv
-      .scan(tableName)
-      .select('id, 'score, 'first)
-
-    val expected = streamSourceTableNode(tableName, noCalcFields)
-    util.verifyTable(result, expected)
-  }
-
-  @Test
-  def testFilterableSourceScanPlan(): Unit = {
-    val (tableSource, tableName) = filterableTableSource
-    val util = streamTestUtil()
-    val tEnv = util.tableEnv
-
-    tEnv.registerTableSource(tableName, tableSource)
-
-    val result = tEnv
-      .scan(tableName)
-      .select('price, 'id, 'amount)
-      .where("amount > 2 && price * 2 < 32")
-
-    val expected = unaryNode(
-      "DataStreamCalc",
-      streamFilterableSourceTableNode(
-        tableName,
-        Array("name", "id", "amount", "price"),
-        "'amount > 2"),
-      term("select", "price", "id", "amount"),
-      term("where", "<(*(price, 2), 32)")
-    )
-
-    util.verifyTable(result, expected)
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/f1fafc0e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/sources/validation/TableSourceBalidationTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/sources/validation/TableSourceBalidationTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/sources/validation/TableSourceBalidationTest.scala
deleted file mode 100644
index e575ab8..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/sources/validation/TableSourceBalidationTest.scala
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.sources.validation
-
-import org.apache.flink.table.api.Types
-import org.apache.flink.table.sources.CsvTableSource
-import org.apache.flink.table.sources.TableSourceTestBase
-import org.junit.Test
-
-class TableSourceBalidationTest extends TableSourceTestBase {
-
-  @Test(expected = classOf[IllegalArgumentException])
-  def testCsvTableSourceBuilderWithNullPath(): Unit = {
-    CsvTableSource.builder()
-      .field("myfield", Types.STRING)
-      // should fail, path is not defined
-      .build()
-  }
-
-  @Test(expected = classOf[IllegalArgumentException])
-  def testCsvTableSourceBuilderWithDuplicateFieldName(): Unit = {
-    CsvTableSource.builder()
-      .path("/path/to/csv")
-      .field("myfield", Types.STRING)
-      // should fail, field names must not be duplicate
-      .field("myfield", Types.INT)
-  }
-
-  @Test(expected = classOf[IllegalArgumentException])
-  def testCsvTableSourceBuilderWithEmptyField(): Unit = {
-    CsvTableSource.builder()
-      .path("/path/to/csv")
-      // should fail, no fields are defined
-      .build()
-  }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/f1fafc0e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/CommonTestData.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/CommonTestData.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/CommonTestData.scala
deleted file mode 100644
index 6a5c52f..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/CommonTestData.scala
+++ /dev/null
@@ -1,178 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.utils
-
-import java.io.{File, FileOutputStream, OutputStreamWriter}
-import java.util
-
-import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
-import org.apache.flink.api.java.typeutils.TypeExtractor
-import org.apache.flink.api.java.{DataSet, ExecutionEnvironment}
-import org.apache.flink.table.api.TableSchema
-import org.apache.flink.table.sources.{BatchTableSource, CsvTableSource}
-import org.apache.flink.table.catalog._
-
-object CommonTestData {
-
-  def getCsvTableSource: CsvTableSource = {
-    val csvRecords = Seq(
-      "First#Id#Score#Last",
-      "Mike#1#12.3#Smith",
-      "Bob#2#45.6#Taylor",
-      "Sam#3#7.89#Miller",
-      "Peter#4#0.12#Smith",
-      "% Just a comment",
-      "Liz#5#34.5#Williams",
-      "Sally#6#6.78#Miller",
-      "Alice#7#90.1#Smith",
-      "Kelly#8#2.34#Williams"
-    )
-
-    val tempFilePath = writeToTempFile(csvRecords.mkString("$"), "csv-test", "tmp")
-    new CsvTableSource(
-      tempFilePath,
-      Array("first", "id", "score", "last"),
-      Array(
-        BasicTypeInfo.STRING_TYPE_INFO,
-        BasicTypeInfo.INT_TYPE_INFO,
-        BasicTypeInfo.DOUBLE_TYPE_INFO,
-        BasicTypeInfo.STRING_TYPE_INFO
-      ),
-      fieldDelim = "#",
-      rowDelim = "$",
-      ignoreFirstLine = true,
-      ignoreComments = "%"
-    )
-  }
-
-  def getInMemoryTestCatalog: ExternalCatalog = {
-    val csvRecord1 = Seq(
-      "1#1#Hi",
-      "2#2#Hello",
-      "3#2#Hello world"
-    )
-    val tempFilePath1 = writeToTempFile(csvRecord1.mkString("$"), "csv-test1", "tmp")
-    val properties1 = new util.HashMap[String, String]()
-    properties1.put("path", tempFilePath1)
-    properties1.put("fieldDelim", "#")
-    properties1.put("rowDelim", "$")
-    val externalCatalogTable1 = ExternalCatalogTable(
-      "csv",
-      new TableSchema(
-        Array("a", "b", "c"),
-        Array(
-          BasicTypeInfo.INT_TYPE_INFO,
-          BasicTypeInfo.LONG_TYPE_INFO,
-          BasicTypeInfo.STRING_TYPE_INFO)),
-      properties1
-    )
-
-    val csvRecord2 = Seq(
-      "1#1#0#Hallo#1",
-      "2#2#1#Hallo Welt#2",
-      "2#3#2#Hallo Welt wie#1",
-      "3#4#3#Hallo Welt wie gehts?#2",
-      "3#5#4#ABC#2",
-      "3#6#5#BCD#3",
-      "4#7#6#CDE#2",
-      "4#8#7#DEF#1",
-      "4#9#8#EFG#1",
-      "4#10#9#FGH#2",
-      "5#11#10#GHI#1",
-      "5#12#11#HIJ#3",
-      "5#13#12#IJK#3",
-      "5#14#13#JKL#2",
-      "5#15#14#KLM#2"
-    )
-    val tempFilePath2 = writeToTempFile(csvRecord2.mkString("$"), "csv-test2", "tmp")
-    val properties2 = new util.HashMap[String, String]()
-    properties2.put("path", tempFilePath2)
-    properties2.put("fieldDelim", "#")
-    properties2.put("rowDelim", "$")
-    val externalCatalogTable2 = ExternalCatalogTable(
-      "csv",
-      new TableSchema(
-        Array("d", "e", "f", "g", "h"),
-        Array(
-          BasicTypeInfo.INT_TYPE_INFO,
-          BasicTypeInfo.LONG_TYPE_INFO,
-          BasicTypeInfo.INT_TYPE_INFO,
-          BasicTypeInfo.STRING_TYPE_INFO,
-          BasicTypeInfo.LONG_TYPE_INFO)
-      ),
-      properties2
-    )
-    val catalog = new InMemoryExternalCatalog("test")
-    val db1 = new InMemoryExternalCatalog("db1")
-    val db2 = new InMemoryExternalCatalog("db2")
-    catalog.createSubCatalog("db1", db1, ignoreIfExists = false)
-    catalog.createSubCatalog("db2", db2, ignoreIfExists = false)
-
-    // Register the table with both catalogs
-    catalog.createTable("tb1", externalCatalogTable1, ignoreIfExists = false)
-    db1.createTable("tb1", externalCatalogTable1, ignoreIfExists = false)
-    db2.createTable("tb2", externalCatalogTable2, ignoreIfExists = false)
-    catalog
-  }
-
-  private def writeToTempFile(
-      contents: String,
-      filePrefix: String,
-      fileSuffix: String,
-      charset: String = "UTF-8"): String = {
-    val tempFile = File.createTempFile(filePrefix, fileSuffix)
-    tempFile.deleteOnExit()
-    val tmpWriter = new OutputStreamWriter(new FileOutputStream(tempFile), charset)
-    tmpWriter.write(contents)
-    tmpWriter.close()
-    tempFile.getAbsolutePath
-  }
-
-  def getNestedTableSource: BatchTableSource[Person] = {
-    new BatchTableSource[Person] {
-      override def getDataSet(execEnv: ExecutionEnvironment): DataSet[Person] = {
-        val executionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-        executionEnvironment.fromCollection(
-          util.Arrays.asList(
-            new Person("Mike", "Smith", new Address("5th Ave", "New-York")),
-            new Person("Sally", "Miller", new Address("Potsdamer Platz", "Berlin")),
-            new Person("Bob", "Taylor", new Address("Pearse Street", "Dublin"))),
-          getReturnType
-        )
-      }
-
-      override def getReturnType: TypeInformation[Person] = {
-        TypeExtractor.getForClass(classOf[Person])
-      }
-    }
-  }
-
-  class Person(var firstName: String, var lastName: String, var address: Address) {
-    def this() {
-      this(null, null, null)
-    }
-  }
-
-  class Address(var street: String, var city: String) {
-    def this() {
-      this(null, null)
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/f1fafc0e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/LogicalPlanFormatUtils.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/LogicalPlanFormatUtils.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/LogicalPlanFormatUtils.scala
new file mode 100644
index 0000000..92d8100
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/LogicalPlanFormatUtils.scala
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.utils
+
+object LogicalPlanFormatUtils {
+  private val tempPattern = """TMP_\d+""".r
+
+  def formatTempTableId(preStr: String): String = {
+    val str = preStr.replaceAll("ArrayBuffer\\(", "List\\(")
+    val minId = getMinTempTableId(str)
+    tempPattern.replaceAllIn(str, s => "TMP_" + (s.matched.substring(4).toInt - minId))
+  }
+
+  private def getMinTempTableId(logicalStr: String): Long = {
+    val found = tempPattern.findAllIn(logicalStr).map(s => {
+      s.substring(4).toInt
+    })
+    if (found.isEmpty) {
+      0L
+    } else {
+      found.min
+    }
+  }
+}
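
For context, a small illustration of what formatTempTableId normalizes (the plan strings are made up for this example): generated temp-table ids are shifted so that logically identical plans from different runs compare equal.

    import org.apache.flink.table.utils.LogicalPlanFormatUtils.formatTempTableId

    // Two logically identical plans that differ only in the generated TMP ids.
    val planA = "LogicalProject(select=[TMP_7, TMP_8])"
    val planB = "LogicalProject(select=[TMP_42, TMP_43])"

    // Both normalize to "LogicalProject(select=[TMP_0, TMP_1])".
    assert(formatTempTableId(planA) == formatTempTableId(planB))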

http://git-wip-us.apache.org/repos/asf/flink/blob/f1fafc0e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/TableTestBase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/TableTestBase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/TableTestBase.scala
index 6657d50..d99aac1 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/TableTestBase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/TableTestBase.scala
@@ -20,19 +20,16 @@ package org.apache.flink.table.utils
 
 import org.apache.calcite.plan.RelOptUtil
 import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.java.{DataSet => JDataSet}
-import org.apache.flink.table.api.{Table, TableEnvironment}
-import org.apache.flink.table.api.scala._
+import org.apache.flink.api.java.{DataSet => JDataSet, ExecutionEnvironment => JExecutionEnvironment}
 import org.apache.flink.api.scala.{DataSet, ExecutionEnvironment}
-import org.apache.flink.api.java.{ExecutionEnvironment => JExecutionEnvironment}
 import org.apache.flink.streaming.api.TimeCharacteristic
-import org.apache.flink.table.expressions.Expression
-import org.apache.flink.table.functions.{ScalarFunction, TableFunction}
 import org.apache.flink.streaming.api.datastream.{DataStream => JDataStream}
 import org.apache.flink.streaming.api.environment.{StreamExecutionEnvironment => JStreamExecutionEnvironment}
 import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
-import org.apache.flink.table.api.scala.batch.utils.LogicalPlanFormatUtils
-import org.apache.flink.table.functions.AggregateFunction
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.api.{Table, TableEnvironment}
+import org.apache.flink.table.expressions.Expression
+import org.apache.flink.table.functions.{AggregateFunction, ScalarFunction, TableFunction}
 import org.junit.Assert.assertEquals
 import org.junit.Rule
 import org.junit.rules.ExpectedException

http://git-wip-us.apache.org/repos/asf/flink/blob/f1fafc0e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/UserDefinedFunctionTestUtils.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/UserDefinedFunctionTestUtils.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/UserDefinedFunctionTestUtils.scala
deleted file mode 100644
index deaedc9..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/UserDefinedFunctionTestUtils.scala
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.utils
-
-import java.io.File
-
-import com.google.common.base.Charsets
-import com.google.common.io.Files
-import org.apache.flink.api.scala.ExecutionEnvironment
-import org.apache.flink.configuration.Configuration
-import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
-
-object UserDefinedFunctionTestUtils {
-
-  def setJobParameters(env: ExecutionEnvironment, parameters: Map[String, String]): Unit = {
-    val conf = new Configuration()
-    parameters.foreach {
-      case (k, v) => conf.setString(k, v)
-    }
-    env.getConfig.setGlobalJobParameters(conf)
-  }
-
-  def setJobParameters(env: StreamExecutionEnvironment, parameters: Map[String, String]): Unit = {
-    val conf = new Configuration()
-    parameters.foreach {
-      case (k, v) => conf.setString(k, v)
-    }
-    env.getConfig.setGlobalJobParameters(conf)
-  }
-
-  def writeCacheFile(fileName: String, contents: String): String = {
-    val tempFile = File.createTempFile(this.getClass.getName + "-" + fileName, "tmp")
-    tempFile.deleteOnExit()
-    Files.write(contents, tempFile, Charsets.UTF_8)
-    tempFile.getAbsolutePath
-  }
-}