Posted to commits@flink.apache.org by tw...@apache.org on 2017/07/13 10:18:46 UTC

[37/44] flink git commit: [FLINK-6617] [table] Restructuring of tests

http://git-wip-us.apache.org/repos/asf/flink/blob/f1fafc0e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/validation/JoinValidationTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/validation/JoinValidationTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/validation/JoinValidationTest.scala
deleted file mode 100644
index a377ca3..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/validation/JoinValidationTest.scala
+++ /dev/null
@@ -1,240 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.api.scala.batch.table.validation
-
-import org.apache.flink.api.scala._
-import org.apache.flink.api.scala.util.CollectionDataSets
-import org.apache.flink.table.api.scala._
-import org.apache.flink.table.api.{TableEnvironment, TableException, ValidationException}
-import org.apache.flink.table.utils.TableTestBase
-import org.apache.flink.types.Row
-import org.junit._
-
-class JoinValidationTest extends TableTestBase {
-
-  @Test(expected = classOf[ValidationException])
-  def testJoinNonExistingKey(): Unit = {
-    val util = batchTestUtil()
-    val ds1 = util.addTable[(Int, Long, String)]("Table3",'a, 'b, 'c)
-    val ds2 = util.addTable[(Int, Long, Int, String, Long)]("Table5", 'd, 'e, 'f, 'g, 'h)
-
-    ds1.join(ds2)
-      // must fail. Field 'foo does not exist
-      .where('foo === 'e)
-      .select('c, 'g)
-  }
-
-  @Test(expected = classOf[ValidationException])
-  def testJoinWithNonMatchingKeyTypes(): Unit = {
-    val util = batchTestUtil()
-    val ds1 = util.addTable[(Int, Long, String)]("Table3",'a, 'b, 'c)
-    val ds2 = util.addTable[(Int, Long, Int, String, Long)]("Table5", 'd, 'e, 'f, 'g, 'h)
-
-    ds1.join(ds2)
-      // must fail. Field 'a is Int, and 'g is String
-      .where('a === 'g)
-      .select('c, 'g).collect()
-  }
-
-  @Test(expected = classOf[ValidationException])
-  def testJoinWithAmbiguousFields(): Unit = {
-    val util = batchTestUtil()
-    val ds1 = util.addTable[(Int, Long, String)]("Table3",'a, 'b, 'c)
-    val ds2 = util.addTable[(Int, Long, Int, String, Long)]("Table5", 'd, 'e, 'f, 'g, 'c)
-
-    ds1.join(ds2)
-      // must fail. Both inputs share the same field 'c
-      .where('a === 'd)
-      .select('c, 'g)
-  }
-
-  @Test(expected = classOf[TableException])
-  def testNoEqualityJoinPredicate1(): Unit = {
-    val util = batchTestUtil()
-    val ds1 = util.addTable[(Int, Long, String)]("Table3",'a, 'b, 'c)
-    val ds2 = util.addTable[(Int, Long, Int, String, Long)]("Table5", 'd, 'e, 'f, 'g, 'h)
-
-    ds1.join(ds2)
-      // must fail. No equality join predicate
-      .where('d === 'f)
-      .select('c, 'g).collect()
-  }
-
-  @Test(expected = classOf[TableException])
-  def testNoEqualityJoinPredicate2(): Unit = {
-    val util = batchTestUtil()
-    val ds1 = util.addTable[(Int, Long, String)]("Table3",'a, 'b, 'c)
-    val ds2 = util.addTable[(Int, Long, Int, String, Long)]("Table5", 'd, 'e, 'f, 'g, 'h)
-
-    ds1.join(ds2)
-      // must fail. No equality join predicate
-      .where('a < 'd)
-      .select('c, 'g).collect()
-  }
-
-  @Test(expected = classOf[ValidationException])
-  def testJoinTablesFromDifferentEnvs(): Unit = {
-    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv1 = TableEnvironment.getTableEnvironment(env)
-    val tEnv2 = TableEnvironment.getTableEnvironment(env)
-    val ds1 = CollectionDataSets.getSmall3TupleDataSet(env)
-    val ds2 = CollectionDataSets.get5TupleDataSet(env)
-    val in1 = tEnv1.fromDataSet(ds1, 'a, 'b, 'c)
-    val in2 = tEnv2.fromDataSet(ds2, 'd, 'e, 'f, 'g, 'c)
-
-    // Must fail. Tables are bound to different TableEnvironments.
-    in1.join(in2).where('b === 'e).select('c, 'g)
-  }
-
-  @Test(expected = classOf[ValidationException])
-  def testNoJoinCondition(): Unit = {
-    val util = batchTestUtil()
-    val ds1 = util.addTable[(Int, Long, String)]("Table3",'a, 'b, 'c)
-    val ds2 = util.addTable[(Int, Long, Int, String, Long)]("Table5", 'd, 'e, 'f, 'g, 'h)
-
-    ds2.leftOuterJoin(ds1, 'b === 'd && 'b < 3).select('c, 'g)
-  }
-
-  @Test(expected = classOf[ValidationException])
-  def testNoEquiJoin(): Unit = {
-    val util = batchTestUtil()
-    val ds1 = util.addTable[(Int, Long, String)]("Table3",'a, 'b, 'c)
-    val ds2 = util.addTable[(Int, Long, Int, String, Long)]("Table5", 'd, 'e, 'f, 'g, 'h)
-
-    ds2.leftOuterJoin(ds1, 'b < 'd).select('c, 'g)
-  }
-
-  @Test(expected = classOf[ValidationException])
-  @throws[Exception]
-  def testJoinNonExistingKeyJava() {
-    val util = batchTestUtil()
-    val ds1 = util.addTable[(Int, Long, String)]("Table3",'a, 'b, 'c)
-    val ds2 = util.addTable[(Int, Long, Int, String, Long)]("Table5", 'd, 'e, 'f, 'g, 'h)
-
-    // Must fail. Field foo does not exist.
-    ds1.join(ds2).where("foo === e").select("c, g")
-  }
-
-  @Test(expected = classOf[ValidationException])
-  @throws[Exception]
-  def testJoinWithNonMatchingKeyTypesJava() {
-    val util = batchTestUtil()
-    val ds1 = util.addTable[(Int, Long, String)]("Table3",'a, 'b, 'c)
-    val ds2 = util.addTable[(Int, Long, Int, String, Long)]("Table5", 'd, 'e, 'f, 'g, 'c)
-    ds1.join(ds2)
-      // Must fail. Types of join fields are not compatible (Integer and String)
-    .where("a === g").select("c, g")
-  }
-
-  @Test(expected = classOf[ValidationException])
-  @throws[Exception]
-  def testJoinWithAmbiguousFieldsJava() {
-    val util = batchTestUtil()
-    val ds1 = util.addTable[(Int, Long, String)]("Table3",'a, 'b, 'c)
-    val ds2 = util.addTable[(Int, Long, Int, String, Long)]("Table5", 'd, 'e, 'f, 'g, 'c)
-  // Must fail. Join inputs have overlapping field names.
-    ds1.join(ds2).where("a === d").select("c, g")
-  }
-
-  @Test(expected = classOf[ValidationException])
-  def testJoinTablesFromDifferentEnvsJava() {
-    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv1 = TableEnvironment.getTableEnvironment(env)
-    val tEnv2 = TableEnvironment.getTableEnvironment(env)
-    val ds1 = CollectionDataSets.getSmall3TupleDataSet(env)
-    val ds2 = CollectionDataSets.get5TupleDataSet(env)
-    val in1 = tEnv1.fromDataSet(ds1, 'a, 'b, 'c)
-    val in2 = tEnv2.fromDataSet(ds2, 'd, 'e, 'f, 'g, 'c)
-    // Must fail. Tables are bound to different TableEnvironments.
-    in1.join(in2).where("a === d").select("g.count")
-  }
-
-  @Test(expected = classOf[ValidationException])
-  def testRightJoinWithNonEquiJoinPredicate(): Unit = {
-    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-    tEnv.getConfig.setNullCheck(true)
-
-    val ds1 = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
-    val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'd, 'e, 'f, 'g, 'h)
-
-    ds1.rightOuterJoin(ds2, 'a === 'd && 'b < 'h).select('c, 'g).toDataSet[Row].collect()
-  }
-
-  @Test(expected = classOf[ValidationException])
-  def testLeftJoinWithLocalPredicate(): Unit = {
-    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-    tEnv.getConfig.setNullCheck(true)
-
-    val ds1 = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
-    val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'd, 'e, 'f, 'g, 'h)
-
-    ds1.leftOuterJoin(ds2, 'a === 'd && 'b < 3).select('c, 'g).toDataSet[Row].collect()
-  }
-
-  @Test(expected = classOf[ValidationException])
-  def testFullJoinWithLocalPredicate(): Unit = {
-    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-    tEnv.getConfig.setNullCheck(true)
-
-    val ds1 = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
-    val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'd, 'e, 'f, 'g, 'h)
-
-    ds1.fullOuterJoin(ds2, 'a === 'd && 'b < 3).select('c, 'g).toDataSet[Row].collect()
-  }
-
-  @Test(expected = classOf[ValidationException])
-  def testRightJoinWithLocalPredicate(): Unit = {
-    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-    tEnv.getConfig.setNullCheck(true)
-
-    val ds1 = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
-    val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'd, 'e, 'f, 'g, 'h)
-
-    ds1.rightOuterJoin(ds2, 'a === 'd && 'b < 3).select('c, 'g).toDataSet[Row].collect()
-  }
-
-  @Test(expected = classOf[ValidationException])
-  def testLeftJoinWithNonEquiJoinPredicate(): Unit = {
-    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-    tEnv.getConfig.setNullCheck(true)
-
-    val ds1 = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
-    val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'd, 'e, 'f, 'g, 'h)
-
-    ds1.leftOuterJoin(ds2, 'a === 'd && 'b < 'h).select('c, 'g).toDataSet[Row].collect()
-  }
-
-  @Test(expected = classOf[ValidationException])
-  def testFullJoinWithNonEquiJoinPredicate(): Unit = {
-    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-    tEnv.getConfig.setNullCheck(true)
-
-    val ds1 = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
-    val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'd, 'e, 'f, 'g, 'h)
-
-    ds1.fullOuterJoin(ds2, 'a === 'd && 'b < 'h).select('c, 'g).toDataSet[Row].collect()
-  }
-
-}
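
For contrast with the failure cases above, the same two tables join successfully when the
predicate is a plain equality between compatible fields within one TableEnvironment (a
minimal sketch using the same test util):

    val util = batchTestUtil()
    val ds1 = util.addTable[(Int, Long, String)]("Table3", 'a, 'b, 'c)
    val ds2 = util.addTable[(Int, Long, Int, String, Long)]("Table5", 'd, 'e, 'f, 'g, 'h)
    // 'b and 'e are both Long and the predicate is an equality, so validation passes
    ds1.join(ds2).where('b === 'e).select('c, 'g)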

http://git-wip-us.apache.org/repos/asf/flink/blob/f1fafc0e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/validation/SetOperatorsValidationTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/validation/SetOperatorsValidationTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/validation/SetOperatorsValidationTest.scala
deleted file mode 100644
index c5586bf..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/validation/SetOperatorsValidationTest.scala
+++ /dev/null
@@ -1,112 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.api.scala.batch.table.validation
-
-import org.apache.flink.api.scala._
-import org.apache.flink.api.scala.util.CollectionDataSets
-import org.apache.flink.table.api.scala._
-import org.apache.flink.table.api.{TableEnvironment, ValidationException}
-import org.apache.flink.table.utils.TableTestBase
-import org.junit._
-
-class SetOperatorsValidationTest extends TableTestBase {
-
-  @Test(expected = classOf[ValidationException])
-  def testUnionDifferentColumnSize(): Unit = {
-    val util = batchTestUtil()
-    val ds1 = util.addTable[(Int, Long, String)]("Table3",'a, 'b, 'c)
-    val ds2 = util.addTable[(Int, Long, Int, String, Long)]("Table5", 'a, 'b, 'd, 'c, 'e)
-
-    // must fail. Union inputs have different column sizes.
-    ds1.unionAll(ds2)
-  }
-
-  @Test(expected = classOf[ValidationException])
-  def testUnionDifferentFieldTypes(): Unit = {
-    val util = batchTestUtil()
-    val ds1 = util.addTable[(Int, Long, String)]("Table3",'a, 'b, 'c)
-    val ds2 = util.addTable[(Int, Long, Int, String, Long)]("Table5", 'a, 'b, 'c, 'd, 'e)
-      .select('a, 'b, 'c)
-
-    // must fail. Union inputs have different field types.
-    ds1.unionAll(ds2)
-  }
-
-  @Test(expected = classOf[ValidationException])
-  def testUnionTablesFromDifferentEnvs(): Unit = {
-    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv1 = TableEnvironment.getTableEnvironment(env)
-    val tEnv2 = TableEnvironment.getTableEnvironment(env)
-
-    val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv1, 'a, 'b, 'c)
-    val ds2 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv2, 'a, 'b, 'c)
-
-    // Must fail. Tables are bound to different TableEnvironments.
-    ds1.unionAll(ds2).select('c)
-  }
-
-  @Test(expected = classOf[ValidationException])
-  def testMinusDifferentFieldTypes(): Unit = {
-    val util = batchTestUtil()
-    val ds1 = util.addTable[(Int, Long, String)]("Table3",'a, 'b, 'c)
-    val ds2 = util.addTable[(Int, Long, Int, String, Long)]("Table5", 'a, 'b, 'c, 'd, 'e)
-      .select('a, 'b, 'c)
-
-    // must fail. Minus inputs have different field types.
-    ds1.minus(ds2)
-  }
-
-  @Test(expected = classOf[ValidationException])
-  def testMinusAllTablesFromDifferentEnvs(): Unit = {
-    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv1 = TableEnvironment.getTableEnvironment(env)
-    val tEnv2 = TableEnvironment.getTableEnvironment(env)
-
-    val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv1, 'a, 'b, 'c)
-    val ds2 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv2, 'a, 'b, 'c)
-
-    // Must fail. Tables are bound to different TableEnvironments.
-    ds1.minusAll(ds2).select('c)
-  }
-
-  @Test(expected = classOf[ValidationException])
-  def testIntersectWithDifferentFieldTypes(): Unit = {
-    val util = batchTestUtil()
-    val ds1 = util.addTable[(Int, Long, String)]("Table3",'a, 'b, 'c)
-    val ds2 = util.addTable[(Int, Long, Int, String, Long)]("Table5", 'a, 'b, 'c, 'd, 'e)
-      .select('a, 'b, 'c)
-
-    // must fail. Intersect inputs have different field types.
-    ds1.intersect(ds2)
-  }
-
-  @Test(expected = classOf[ValidationException])
-  def testIntersectTablesFromDifferentEnvs(): Unit = {
-    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv1 = TableEnvironment.getTableEnvironment(env)
-    val tEnv2 = TableEnvironment.getTableEnvironment(env)
-
-    val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv1, 'a, 'b, 'c)
-    val ds2 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv2, 'a, 'b, 'c)
-
-    // Must fail. Tables are bound to different TableEnvironments.
-    ds1.intersect(ds2).select('c)
-  }
-
-}
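
For reference, the same union validates when both inputs have identical schemas and live in
a single TableEnvironment (sketch):

    val env = ExecutionEnvironment.getExecutionEnvironment
    val tEnv = TableEnvironment.getTableEnvironment(env)
    val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
    val ds2 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
    ds1.unionAll(ds2).select('c)  // same column count, types, and environment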

http://git-wip-us.apache.org/repos/asf/flink/blob/f1fafc0e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/validation/SortValidationTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/validation/SortValidationTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/validation/SortValidationTest.scala
deleted file mode 100644
index a82d4d6..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/validation/SortValidationTest.scala
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.api.scala.batch.table.validation
-
-import org.apache.flink.api.scala._
-import org.apache.flink.table.api.ValidationException
-import org.apache.flink.table.api.scala._
-import org.apache.flink.table.utils.TableTestBase
-import org.apache.flink.types.Row
-import org.junit._
-
-class SortValidationTest extends TableTestBase {
-
-  @Test(expected = classOf[ValidationException])
-  def testFetchWithoutOrder(): Unit = {
-    val util = batchTestUtil()
-    val ds = util.addTable[(Int, Long, String)]("Table3", 'a, 'b, 'c)
-
-    val t = ds.limit(0, 5).toDataSet[Row].collect()
-  }
-
-}
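
The valid form pairs limit(offset, fetch) with a preceding orderBy (a minimal sketch):

    val util = batchTestUtil()
    val ds = util.addTable[(Int, Long, String)]("Table3", 'a, 'b, 'c)
    ds.orderBy('a.asc).limit(0, 5)  // fetch the first five rows of the sorted table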

http://git-wip-us.apache.org/repos/asf/flink/blob/f1fafc0e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/utils/LogicalPlanFormatUtils.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/utils/LogicalPlanFormatUtils.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/utils/LogicalPlanFormatUtils.scala
deleted file mode 100644
index 4a69271..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/utils/LogicalPlanFormatUtils.scala
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.api.scala.batch.utils
-
-object LogicalPlanFormatUtils {
-  private val tempPattern = """TMP_\d+""".r
-
-  def formatTempTableId(preStr: String): String = {
-    val str = preStr.replaceAll("ArrayBuffer\\(", "List\\(")
-    val minId = getMinTempTableId(str)
-    tempPattern.replaceAllIn(str, s => "TMP_" + (s.matched.substring(4).toInt - minId) )
-  }
-
-  private def getMinTempTableId(logicalStr: String): Long = {
-    val found = tempPattern.findAllIn(logicalStr).map(s => {
-      s.substring(4).toInt
-    })
-    if (found.isEmpty) {
-      0L
-    } else {
-      found.min
-    }
-  }
-}
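
A worked example of the normalization (hypothetical plan fragment, not from the test suite):

    // the minimum temp id is 4, so TMP_4/TMP_6 are rebased to TMP_0/TMP_2,
    // and the ArrayBuffer( prefix is rewritten to List(
    LogicalPlanFormatUtils.formatTempTableId("ArrayBuffer(TMP_4, TMP_6)")
    // => "List(TMP_0, TMP_2)"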

http://git-wip-us.apache.org/repos/asf/flink/blob/f1fafc0e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/utils/SortTestUtils.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/utils/SortTestUtils.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/utils/SortTestUtils.scala
deleted file mode 100644
index ef425d3..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/utils/SortTestUtils.scala
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.api.scala.batch.utils
-
-object SortTestUtils {
-
-  val tupleDataSetStrings = List((1, 1L, "Hi")
-    ,(2, 2L, "Hello")
-    ,(3, 2L, "Hello world")
-    ,(4, 3L, "Hello world, how are you?")
-    ,(5, 3L, "I am fine.")
-    ,(6, 3L, "Luke Skywalker")
-    ,(7, 4L, "Comment#1")
-    ,(8, 4L, "Comment#2")
-    ,(9, 4L, "Comment#3")
-    ,(10, 4L, "Comment#4")
-    ,(11, 5L, "Comment#5")
-    ,(12, 5L, "Comment#6")
-    ,(13, 5L, "Comment#7")
-    ,(14, 5L, "Comment#8")
-    ,(15, 5L, "Comment#9")
-    ,(16, 6L, "Comment#10")
-    ,(17, 6L, "Comment#11")
-    ,(18, 6L, "Comment#12")
-    ,(19, 6L, "Comment#13")
-    ,(20, 6L, "Comment#14")
-    ,(21, 6L, "Comment#15"))
-
-  def sortExpectedly(dataSet: List[Product])
-                    (implicit ordering: Ordering[Product]): String = 
-    sortExpectedly(dataSet, 0, dataSet.length)
-
-  def sortExpectedly(dataSet: List[Product], start: Int, end: Int)
-                    (implicit ordering: Ordering[Product]): String = {
-    dataSet
-      .sorted(ordering)
-      .slice(start, end)
-      .mkString("\n")
-      .replaceAll("[\\(\\)]", "")
-  }
-
-}
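
Usage sketch (the implicit ordering below is an assumption for illustration; the actual
tests supply their own):

    implicit val ord: Ordering[Product] =
      Ordering.by((p: Product) => p.productElement(0).asInstanceOf[Int])
    val expected = SortTestUtils.sortExpectedly(SortTestUtils.tupleDataSetStrings)
    // => "1,1,Hi\n2,2,Hello\n..." sorted by the first field, parentheses stripped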

http://git-wip-us.apache.org/repos/asf/flink/blob/f1fafc0e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/utils/TableProgramsClusterTestBase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/utils/TableProgramsClusterTestBase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/utils/TableProgramsClusterTestBase.scala
deleted file mode 100644
index 712b818..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/utils/TableProgramsClusterTestBase.scala
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.api.scala.batch.utils
-
-import java.util
-
-import org.apache.flink.table.api.scala.batch.utils.TableProgramsTestBase.TableConfigMode
-import org.apache.flink.table.runtime.dataset.table.DataSetWindowAggregateITCase
-import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
-import org.junit.runners.Parameterized
-
-import scala.collection.JavaConversions._
-
-/**
-  * This test base provides full cluster-like integration tests for batch programs. Since
-  * such tests are expensive, only runtime operator tests
-  * (e.g. [[DataSetWindowAggregateITCase]]) should use this test base.
-  */
-class TableProgramsClusterTestBase(
-    executionMode: TestExecutionMode,
-    tableConfigMode: TableConfigMode)
-    extends TableProgramsTestBase(executionMode, tableConfigMode) {
-}
-
-object TableProgramsClusterTestBase {
-
-  @Parameterized.Parameters(name = "Execution mode = {0}, Table config = {1}")
-  def parameters(): util.Collection[Array[java.lang.Object]] = {
-    Seq[Array[AnyRef]](
-      Array(TestExecutionMode.CLUSTER, TableProgramsTestBase.DEFAULT),
-      Array(TestExecutionMode.CLUSTER_OBJECT_REUSE, TableProgramsTestBase.DEFAULT)
-    )
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/f1fafc0e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/utils/TableProgramsCollectionTestBase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/utils/TableProgramsCollectionTestBase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/utils/TableProgramsCollectionTestBase.scala
deleted file mode 100644
index cd7c12d..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/utils/TableProgramsCollectionTestBase.scala
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.api.scala.batch.utils
-
-import java.util
-
-import org.apache.flink.table.api.scala.batch.utils.TableProgramsTestBase.TableConfigMode
-import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
-import org.junit.runners.Parameterized
-
-import scala.collection.JavaConversions._
-
-/**
-  * This test base provides lightweight integration tests for batch programs. However, it does
-  * not test everything (e.g. combiners). Runtime operator tests should
-  * use [[TableProgramsClusterTestBase]].
-  */
-class TableProgramsCollectionTestBase(
-    tableConfigMode: TableConfigMode)
-    extends TableProgramsTestBase(TestExecutionMode.COLLECTION, tableConfigMode) {
-}
-
-object TableProgramsCollectionTestBase {
-
-  @Parameterized.Parameters(name = "Table config = {0}")
-  def parameters(): util.Collection[Array[java.lang.Object]] = {
-    Seq[Array[AnyRef]](Array(TableProgramsTestBase.DEFAULT))
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/f1fafc0e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/utils/TableProgramsTestBase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/utils/TableProgramsTestBase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/utils/TableProgramsTestBase.scala
deleted file mode 100644
index cf9d947..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/utils/TableProgramsTestBase.scala
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.api.scala.batch.utils
-
-import org.apache.flink.table.api.TableConfig
-import org.apache.flink.table.api.scala.batch.utils.TableProgramsTestBase.{NO_NULL, TableConfigMode}
-import org.apache.flink.test.util.MultipleProgramsTestBase
-import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
-
-class TableProgramsTestBase(
-    mode: TestExecutionMode,
-    tableConfigMode: TableConfigMode)
-  extends MultipleProgramsTestBase(mode) {
-
-  def config: TableConfig = {
-    val conf = new TableConfig
-    tableConfigMode match {
-      case NO_NULL =>
-        conf.setNullCheck(false)
-      case _ => // keep default
-    }
-    conf
-  }
-}
-
-object TableProgramsTestBase {
-  case class TableConfigMode(nullCheck: Boolean)
-
-  val DEFAULT = TableConfigMode(nullCheck = true)
-  val NO_NULL = TableConfigMode(nullCheck = false)
-
-}
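
A minimal sketch of the config mechanism only (hypothetical class name; real ITCases are
wired through the @Parameterized runner, as in TableEnvironmentValidationTest below):

    class MyNoNullITCase extends TableProgramsCollectionTestBase(TableProgramsTestBase.NO_NULL) {
      // config now yields a TableConfig with nullCheck = false
    }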

http://git-wip-us.apache.org/repos/asf/flink/blob/f1fafc0e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/validation/TableEnvironmentValidationTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/validation/TableEnvironmentValidationTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/validation/TableEnvironmentValidationTest.scala
deleted file mode 100644
index 030a415..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/validation/TableEnvironmentValidationTest.scala
+++ /dev/null
@@ -1,244 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.table.api.scala.batch.validation
-
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo.{DOUBLE_TYPE_INFO, INT_TYPE_INFO, STRING_TYPE_INFO}
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.java.typeutils.{GenericTypeInfo, RowTypeInfo, TupleTypeInfo, TypeExtractor}
-import org.apache.flink.api.scala._
-import org.apache.flink.api.scala.util.CollectionDataSets
-import org.apache.flink.table.api.scala._
-import org.apache.flink.table.api.scala.batch.utils.TableProgramsCollectionTestBase
-import org.apache.flink.table.api.scala.batch.utils.TableProgramsTestBase.TableConfigMode
-import org.apache.flink.table.api.scala.batch.{CClass, PojoClass}
-import org.apache.flink.table.api.{TableEnvironment, TableException}
-import org.apache.flink.table.expressions.{Alias, UnresolvedFieldReference}
-import org.apache.flink.table.runtime.types.CRowTypeInfo
-import org.apache.flink.types.Row
-import org.junit.Assert.assertTrue
-import org.junit._
-import org.junit.runner.RunWith
-import org.junit.runners.Parameterized
-
-@RunWith(classOf[Parameterized])
-class TableEnvironmentValidationTest(
-    configMode: TableConfigMode)
-  extends TableProgramsCollectionTestBase(configMode) {
-
-  val env = ExecutionEnvironment.getExecutionEnvironment
-  val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-  val tupleType = new TupleTypeInfo(
-    INT_TYPE_INFO,
-    STRING_TYPE_INFO,
-    DOUBLE_TYPE_INFO)
-
-  val rowType = new RowTypeInfo(INT_TYPE_INFO, STRING_TYPE_INFO,DOUBLE_TYPE_INFO)
-
-  val cRowType = new CRowTypeInfo(rowType)
-
-  val caseClassType: TypeInformation[CClass] = implicitly[TypeInformation[CClass]]
-
-  val pojoType: TypeInformation[PojoClass] = TypeExtractor.createTypeInfo(classOf[PojoClass])
-
-  val atomicType = INT_TYPE_INFO
-
-  val genericRowType = new GenericTypeInfo[Row](classOf[Row])
-
-
-  @Test(expected = classOf[TableException])
-  def testGetFieldInfoPojoNames1(): Unit = {
-    tEnv.getFieldInfo(
-      pojoType,
-      Array(
-        UnresolvedFieldReference("name1"),
-        UnresolvedFieldReference("name2"),
-        UnresolvedFieldReference("name3")
-      ))
-  }
-
-  @Test(expected = classOf[TableException])
-  def testGetFieldInfoAtomicName2(): Unit = {
-    tEnv.getFieldInfo(
-      atomicType,
-      Array(
-        UnresolvedFieldReference("name1"),
-        UnresolvedFieldReference("name2")
-      ))
-  }
-
-  @Test(expected = classOf[TableException])
-  def testGetFieldInfoTupleAlias3(): Unit = {
-    tEnv.getFieldInfo(
-      tupleType,
-      Array(
-        Alias(UnresolvedFieldReference("xxx"), "name1"),
-        Alias(UnresolvedFieldReference("yyy"), "name2"),
-        Alias(UnresolvedFieldReference("zzz"), "name3")
-      ))
-  }
-
-  @Test(expected = classOf[TableException])
-  def testGetFieldInfoCClassAlias3(): Unit = {
-    tEnv.getFieldInfo(
-      caseClassType,
-      Array(
-        Alias(UnresolvedFieldReference("xxx"), "name1"),
-        Alias(UnresolvedFieldReference("yyy"), "name2"),
-        Alias(UnresolvedFieldReference("zzz"), "name3")
-      ))
-  }
-
-  @Test(expected = classOf[TableException])
-  def testGetFieldInfoPojoAlias3(): Unit = {
-    tEnv.getFieldInfo(
-      pojoType,
-      Array(
-        Alias(UnresolvedFieldReference("xxx"), "name1"),
-        Alias(UnresolvedFieldReference("yyy"), "name2"),
-        Alias(UnresolvedFieldReference("zzz"), "name3")
-      ))
-  }
-
-  @Test(expected = classOf[TableException])
-  def testGetFieldInfoAtomicAlias(): Unit = {
-    tEnv.getFieldInfo(
-      atomicType,
-      Array(
-        Alias(UnresolvedFieldReference("name1"), "name2")
-      ))
-  }
-
-  @Test(expected = classOf[TableException])
-  def testGetFieldInfoGenericRowAlias(): Unit = {
-    tEnv.getFieldInfo(
-      genericRowType,
-      Array(UnresolvedFieldReference("first")))
-  }
-
-  @Test(expected = classOf[TableException])
-  def testRegisterExistingDataSet(): Unit = {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    val ds1 = CollectionDataSets.get3TupleDataSet(env)
-    tEnv.registerDataSet("MyTable", ds1)
-    val ds2 = CollectionDataSets.get5TupleDataSet(env)
-    // Must fail. Name is already in use.
-    tEnv.registerDataSet("MyTable", ds2)
-  }
-
-  @Test(expected = classOf[TableException])
-  def testScanUnregisteredTable(): Unit = {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-    // Must fail. No table registered under that name.
-    tEnv.scan("someTable")
-  }
-
-  @Test(expected = classOf[TableException])
-  def testRegisterExistingTable(): Unit = {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    val t1 = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv)
-    tEnv.registerTable("MyTable", t1)
-    val t2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv)
-    // Must fail. Name is already in use.
-    tEnv.registerDataSet("MyTable", t2)
-  }
-
-  @Test(expected = classOf[TableException])
-  def testRegisterTableFromOtherEnv(): Unit = {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv1 = TableEnvironment.getTableEnvironment(env, config)
-    val tEnv2 = TableEnvironment.getTableEnvironment(env, config)
-
-    val t1 = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv1)
-    // Must fail. Table is bound to different TableEnvironment.
-    tEnv2.registerTable("MyTable", t1)
-  }
-
-  @Test(expected = classOf[TableException])
-  def testToTableWithToManyFields(): Unit = {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    CollectionDataSets.get3TupleDataSet(env)
-      // Must fail. Number of fields does not match.
-      .toTable(tEnv, 'a, 'b, 'c, 'd)
-  }
-
-  @Test(expected = classOf[TableException])
-  def testToTableWithAmbiguousFields(): Unit = {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    CollectionDataSets.get3TupleDataSet(env)
-      // Must fail. Field names not unique.
-      .toTable(tEnv, 'a, 'b, 'b)
-  }
-
-  @Test(expected = classOf[TableException])
-  def testToTableWithNonFieldReference1(): Unit = {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    // Must fail. as() can only have field references
-    CollectionDataSets.get3TupleDataSet(env)
-      .toTable(tEnv, 'a + 1, 'b, 'c)
-  }
-
-  @Test(expected = classOf[TableException])
-  def testToTableWithNonFieldReference2(): Unit = {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    // Must fail. as() can only have field references
-    CollectionDataSets.get3TupleDataSet(env)
-      .toTable(tEnv, 'a as 'foo, 'b, 'c)
-  }
-
-  @Test(expected = classOf[TableException])
-  def testGenericRow() {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tableEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    // use a null value to enforce GenericType
-    val dataSet = env.fromElements(Row.of(null))
-    assertTrue(dataSet.getType().isInstanceOf[GenericTypeInfo[_]])
-    assertTrue(dataSet.getType().getTypeClass == classOf[Row])
-
-    // Must fail. Cannot import DataSet<Row> with GenericTypeInfo.
-    tableEnv.fromDataSet(dataSet)
-  }
-
-  @Test(expected = classOf[TableException])
-  def testGenericRowWithAlias() {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tableEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    // use a null value to enforce GenericType
-    val dataSet = env.fromElements(Row.of(null))
-    assertTrue(dataSet.getType().isInstanceOf[GenericTypeInfo[_]])
-    assertTrue(dataSet.getType().getTypeClass == classOf[Row])
-
-    // Must fail. Cannot import DataSet<Row> with GenericTypeInfo.
-    tableEnv.fromDataSet(dataSet, "nullField")
-  }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/f1fafc0e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/TableEnvironmentTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/TableEnvironmentTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/TableEnvironmentTest.scala
deleted file mode 100644
index ea09f32..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/TableEnvironmentTest.scala
+++ /dev/null
@@ -1,120 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.api.scala.stream
-
-import java.lang.{Integer => JInt, Long => JLong}
-
-import org.apache.flink.api.java.tuple.{Tuple5 => JTuple5}
-import org.apache.flink.api.java.typeutils.TupleTypeInfo
-import org.apache.flink.api.scala._
-import org.apache.flink.streaming.api.TimeCharacteristic
-import org.apache.flink.streaming.api.datastream.DataStream
-import org.apache.flink.streaming.api.environment.{StreamExecutionEnvironment => JStreamExecEnv}
-import org.apache.flink.table.api.java.{StreamTableEnvironment => JStreamTableEnv}
-import org.apache.flink.table.api.scala._
-import org.apache.flink.table.api.{TableEnvironment, Types}
-import org.apache.flink.table.utils.TableTestBase
-import org.junit.Test
-import org.mockito.Mockito.{mock, when}
-
-class TableEnvironmentTest extends TableTestBase {
-
-  @Test
-  def testProctimeAttribute(): Unit = {
-    val util = streamTestUtil()
-    // cannot replace an attribute with proctime
-    util.addTable[(Long, Int, String, Int, Long)]('a, 'b, 'c, 'd, 'e, 'pt.proctime)
-  }
-
-  @Test
-  def testReplacedRowtimeAttribute(): Unit = {
-    val util = streamTestUtil()
-    util.addTable[(Long, Int, String, Int, Long)]('rt.rowtime, 'b, 'c, 'd, 'e)
-  }
-
-  @Test
-  def testAppendedRowtimeAttribute(): Unit = {
-    val util = streamTestUtil()
-    util.addTable[(Long, Int, String, Int, Long)]('a, 'b, 'c, 'd, 'e, 'rt.rowtime)
-  }
-
-  @Test
-  def testRowtimeAndProctimeAttribute1(): Unit = {
-    val util = streamTestUtil()
-    util.addTable[(Long, Int, String, Int, Long)]('a, 'b, 'c, 'd, 'e, 'rt.rowtime, 'pt.proctime)
-  }
-
-  @Test
-  def testRowtimeAndProctimeAttribute2(): Unit = {
-    val util = streamTestUtil()
-    util.addTable[(Long, Int, String, Int, Long)]('a, 'b, 'c, 'd, 'e, 'pt.proctime, 'rt.rowtime)
-  }
-
-  @Test
-  def testRowtimeAndProctimeAttribute3(): Unit = {
-    val util = streamTestUtil()
-    util.addTable[(Long, Int, String, Int, Long)]('rt.rowtime, 'b, 'c, 'd, 'e, 'pt.proctime)
-  }
-
-  @Test
-  def testProctimeAttributeParsed(): Unit = {
-    val (jTEnv, ds) = prepareSchemaExpressionParser
-    jTEnv.fromDataStream(ds, "a, b, c, d, e, pt.proctime")
-  }
-
-  @Test
-  def testReplacingRowtimeAttributeParsed(): Unit = {
-    val (jTEnv, ds) = prepareSchemaExpressionParser
-    jTEnv.fromDataStream(ds, "a.rowtime, b, c, d, e")
-  }
-
-  @Test
-  def testAppedingRowtimeAttributeParsed(): Unit = {
-    val (jTEnv, ds) = prepareSchemaExpressionParser
-    jTEnv.fromDataStream(ds, "a, b, c, d, e, rt.rowtime")
-  }
-
-  @Test
-  def testRowtimeAndProctimeAttributeParsed1(): Unit = {
-    val (jTEnv, ds) = prepareSchemaExpressionParser
-    jTEnv.fromDataStream(ds, "a, b, c, d, e, pt.proctime, rt.rowtime")
-  }
-
-  @Test
-  def testRowtimeAndProctimeAttributeParsed2(): Unit = {
-    val (jTEnv, ds) = prepareSchemaExpressionParser
-    jTEnv.fromDataStream(ds, "rt.rowtime, b, c, d, e, pt.proctime")
-  }
-
-  private def prepareSchemaExpressionParser:
-    (JStreamTableEnv, DataStream[JTuple5[JLong, JInt, String, JInt, JLong]]) = {
-
-    val jStreamExecEnv = mock(classOf[JStreamExecEnv])
-    when(jStreamExecEnv.getStreamTimeCharacteristic).thenReturn(TimeCharacteristic.EventTime)
-    val jTEnv = TableEnvironment.getTableEnvironment(jStreamExecEnv)
-
-    val sType = new TupleTypeInfo(Types.LONG, Types.INT, Types.STRING, Types.INT, Types.LONG)
-      .asInstanceOf[TupleTypeInfo[JTuple5[JLong, JInt, String, JInt, JLong]]]
-    val ds = mock(classOf[DataStream[JTuple5[JLong, JInt, String, JInt, JLong]]])
-    when(ds.getType).thenReturn(sType)
-
-    (jTEnv, ds)
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/f1fafc0e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/TableSchemaTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/TableSchemaTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/TableSchemaTest.scala
deleted file mode 100644
index 1f976b7..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/TableSchemaTest.scala
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.table.api.scala.stream
-
-import org.apache.flink.api.scala._
-import org.apache.flink.table.api.Types
-import org.apache.flink.table.api.scala._
-import org.apache.flink.table.utils.TableTestBase
-import org.junit.Assert.{assertEquals, assertTrue}
-import org.junit.Test
-
-class TableSchemaTest extends TableTestBase {
-
-  @Test
-  def testStreamTableSchema(): Unit = {
-    val util = streamTestUtil()
-    val table = util.addTable[(Int, String)]("MyTable", 'a, 'b)
-    val schema = table.getSchema
-
-    assertEquals("a", schema.getColumnNames.apply(0))
-    assertEquals("b", schema.getColumnNames.apply(1))
-
-    assertEquals(Types.INT, schema.getTypes.apply(0))
-    assertEquals(Types.STRING, schema.getTypes.apply(1))
-
-    val expectedString = "root\n" +
-      " |-- a: Integer\n" +
-      " |-- b: String\n"
-    assertEquals(expectedString, schema.toString)
-
-    assertTrue(schema.getColumnName(3).isEmpty)
-    assertTrue(schema.getType(-1).isEmpty)
-    assertTrue(schema.getType("c").isEmpty)
-  }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/f1fafc0e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/ExpressionReductionTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/ExpressionReductionTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/ExpressionReductionTest.scala
deleted file mode 100644
index af9ce74..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/ExpressionReductionTest.scala
+++ /dev/null
@@ -1,136 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.table.api.scala.stream.sql
-
-import org.apache.flink.api.scala._
-import org.apache.flink.table.api.scala._
-import org.apache.flink.table.utils.TableTestBase
-import org.apache.flink.table.utils.TableTestUtil._
-import org.junit.Test
-
-class ExpressionReductionTest extends TableTestBase {
-
-  @Test
-  def testReduceCalcExpression(): Unit = {
-    val util = streamTestUtil()
-    util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
-
-    val sqlQuery = "SELECT " +
-      "(3+4)+a, " +
-      "b+(1+2), " +
-      "CASE 11 WHEN 1 THEN 'a' ELSE 'b' END, " +
-      "TRIM(BOTH ' STRING '),  " +
-      "'test' || 'string', " +
-      "NULLIF(1, 1), " +
-      "TIMESTAMP '1990-10-14 23:00:00.123' + INTERVAL '10 00:00:01' DAY TO SECOND, " +
-      "EXTRACT(DAY FROM INTERVAL '19 12:10:10.123' DAY TO SECOND(3)),  " +
-      "1 IS NULL, " +
-      "'TEST' LIKE '%EST', " +
-      "FLOOR(2.5), " +
-      "'TEST' IN ('west', 'TEST', 'rest'), " +
-      "CAST(TRUE AS VARCHAR) || 'X'" +
-      "FROM MyTable WHERE a>(1+7)"
-
-    val expected = unaryNode(
-      "DataStreamCalc",
-      streamTableNode(0),
-      term("select",
-        "+(7, a) AS EXPR$0",
-        "+(b, 3) AS EXPR$1",
-        "'b' AS EXPR$2",
-        "'STRING' AS EXPR$3",
-        "'teststring' AS EXPR$4",
-        "null AS EXPR$5",
-        "1990-10-24 23:00:01.123 AS EXPR$6",
-        "19 AS EXPR$7",
-        "false AS EXPR$8",
-        "true AS EXPR$9",
-        "2 AS EXPR$10",
-        "true AS EXPR$11",
-        "'trueX' AS EXPR$12"
-      ),
-      term("where", ">(a, 8)")
-    )
-
-    util.verifySql(sqlQuery, expected)
-  }
-
-  @Test
-  def testReduceProjectExpression(): Unit = {
-    val util = streamTestUtil()
-    util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
-
-    val sqlQuery = "SELECT " +
-      "(3+4)+a, " +
-      "b+(1+2), " +
-      "CASE 11 WHEN 1 THEN 'a' ELSE 'b' END, " +
-      "TRIM(BOTH ' STRING '),  " +
-      "'test' || 'string', " +
-      "NULLIF(1, 1), " +
-      "TIMESTAMP '1990-10-14 23:00:00.123' + INTERVAL '10 00:00:01' DAY TO SECOND, " +
-      "EXTRACT(DAY FROM INTERVAL '19 12:10:10.123' DAY TO SECOND(3)),  " +
-      "1 IS NULL, " +
-      "'TEST' LIKE '%EST', " +
-      "FLOOR(2.5), " +
-      "'TEST' IN ('west', 'TEST', 'rest'), " +
-      "CAST(TRUE AS VARCHAR) || 'X'" +
-      "FROM MyTable"
-
-    val expected = unaryNode(
-      "DataStreamCalc",
-      streamTableNode(0),
-      term("select",
-        "+(7, a) AS EXPR$0",
-        "+(b, 3) AS EXPR$1",
-        "'b' AS EXPR$2",
-        "'STRING' AS EXPR$3",
-        "'teststring' AS EXPR$4",
-        "null AS EXPR$5",
-        "1990-10-24 23:00:01.123 AS EXPR$6",
-        "19 AS EXPR$7",
-        "false AS EXPR$8",
-        "true AS EXPR$9",
-        "2 AS EXPR$10",
-        "true AS EXPR$11",
-        "'trueX' AS EXPR$12"
-      )
-    )
-
-    util.verifySql(sqlQuery, expected)
-  }
-
-  @Test
-  def testReduceFilterExpression(): Unit = {
-    val util = streamTestUtil()
-    util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
-
-    val sqlQuery = "SELECT " +
-      "*" +
-      "FROM MyTable WHERE a>(1+7)"
-
-    val expected = unaryNode(
-      "DataStreamCalc",
-      streamTableNode(0),
-      term("select", "a", "b", "c"),
-      term("where", ">(a, 8)")
-    )
-
-    util.verifySql(sqlQuery, expected)
-  }
-
-}
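
These expected plans document the planner's compile-time expression reduction: constant
subexpressions such as "(3+4)+a" fold to "+(7, a)", "CASE 11 WHEN 1 THEN 'a' ELSE 'b' END"
collapses to the constant 'b', and the filter "a>(1+7)" becomes ">(a, 8)" before execution.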

http://git-wip-us.apache.org/repos/asf/flink/blob/f1fafc0e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/OverWindowTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/OverWindowTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/OverWindowTest.scala
deleted file mode 100644
index a79d48f..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/OverWindowTest.scala
+++ /dev/null
@@ -1,504 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.table.api.scala.stream.sql
-
-import org.apache.flink.api.scala._
-import org.apache.flink.table.api.scala._
-import org.apache.flink.table.utils.TableTestUtil._
-import org.apache.flink.table.utils.{StreamTableTestUtil, TableTestBase}
-import org.junit.Test
-
-class OverWindowTest extends TableTestBase {
-  private val streamUtil: StreamTableTestUtil = streamTestUtil()
-  streamUtil.addTable[(Int, String, Long)](
-    "MyTable",
-    'a, 'b, 'c,
-    'proctime.proctime, 'rowtime.rowtime)
-
-  @Test
-  def testProcTimeBoundedPartitionedRowsOver() = {
-    val sql = "SELECT " +
-      "c, " +
-      "count(a) OVER (PARTITION BY c ORDER BY proctime ROWS BETWEEN 2 preceding AND " +
-      "CURRENT ROW) as cnt1, " +
-      "sum(a) OVER (PARTITION BY c ORDER BY proctime ROWS BETWEEN 2 preceding AND " +
-      "CURRENT ROW) as sum1 " +
-      "from MyTable"
-
-    val expected =
-      unaryNode(
-        "DataStreamCalc",
-        unaryNode(
-          "DataStreamOverAggregate",
-          unaryNode(
-            "DataStreamCalc",
-            streamTableNode(0),
-            term("select", "a", "c", "proctime")
-          ),
-          term("partitionBy", "c"),
-          term("orderBy", "proctime"),
-          term("rows", "BETWEEN 2 PRECEDING AND CURRENT ROW"),
-          term("select", "a", "c", "proctime", "COUNT(a) AS w0$o0, $SUM0(a) AS w0$o1")
-        ),
-        term("select", "c", "w0$o0 AS cnt1, CASE(>(w0$o0, 0), CAST(w0$o1), null) AS sum1")
-      )
-    streamUtil.verifySql(sql, expected)
-  }
-
-  @Test
-  def testProcTimeBoundedPartitionedRangeOver() = {
-
-    val sqlQuery =
-      "SELECT a, " +
-        "  AVG(c) OVER (PARTITION BY a ORDER BY proctime " +
-        "    RANGE BETWEEN INTERVAL '2' HOUR PRECEDING AND CURRENT ROW) AS avgA " +
-        "FROM MyTable"
-
-    val expected =
-      unaryNode(
-        "DataStreamCalc",
-        unaryNode(
-          "DataStreamOverAggregate",
-          unaryNode(
-            "DataStreamCalc",
-            streamTableNode(0),
-            term("select", "a", "c", "proctime")
-          ),
-          term("partitionBy", "a"),
-          term("orderBy", "proctime"),
-          term("range", "BETWEEN 7200000 PRECEDING AND CURRENT ROW"),
-          term(
-            "select",
-            "a",
-            "c",
-            "proctime",
-            "COUNT(c) AS w0$o0",
-            "$SUM0(c) AS w0$o1"
-          )
-        ),
-        term("select", "a", "/(CASE(>(w0$o0, 0)", "CAST(w0$o1), null), w0$o0) AS avgA")
-      )
-
-    streamUtil.verifySql(sqlQuery, expected)
-  }
-
-  @Test
-  def testProcTimeBoundedNonPartitionedRangeOver() = {
-
-    val sqlQuery =
-      "SELECT a, " +
-        "  COUNT(c) OVER (ORDER BY proctime " +
-        "    RANGE BETWEEN INTERVAL '10' SECOND PRECEDING AND CURRENT ROW) " +
-        "FROM MyTable"
-
-    val expected =
-      unaryNode(
-        "DataStreamCalc",
-        unaryNode(
-          "DataStreamOverAggregate",
-          unaryNode(
-            "DataStreamCalc",
-            streamTableNode(0),
-            term("select", "a", "c", "proctime")
-          ),
-          term("orderBy", "proctime"),
-          term("range", "BETWEEN 10000 PRECEDING AND CURRENT ROW"),
-          term("select", "a", "c", "proctime", "COUNT(c) AS w0$o0")
-        ),
-        term("select", "a", "w0$o0 AS $1")
-      )
-
-    streamUtil.verifySql(sqlQuery, expected)
-  }
-
-  @Test
-  def testProcTimeBoundedNonPartitionedRowsOver() = {
-    val sql = "SELECT " +
-      "c, " +
-      "count(a) OVER (ORDER BY proctime ROWS BETWEEN 2 preceding AND " +
-      "CURRENT ROW)" +
-      "from MyTable"
-
-    val expected =
-      unaryNode(
-        "DataStreamCalc",
-        unaryNode(
-          "DataStreamOverAggregate",
-          unaryNode(
-            "DataStreamCalc",
-            streamTableNode(0),
-            term("select", "a", "c", "proctime")
-          ),
-          term("orderBy", "proctime"),
-          term("rows", "BETWEEN 2 PRECEDING AND CURRENT ROW"),
-          term("select", "a", "c", "proctime", "COUNT(a) AS w0$o0")
-        ),
-        term("select", "c", "w0$o0 AS $1")
-      )
-    streamUtil.verifySql(sql, expected)
-  }
-
-  @Test
-  def testProcTimeUnboundedPartitionedRangeOver() = {
-    val sql = "SELECT " +
-      "c, " +
-      "count(a) OVER (PARTITION BY c ORDER BY proctime RANGE UNBOUNDED preceding) as cnt1, " +
-      "sum(a) OVER (PARTITION BY c ORDER BY proctime RANGE UNBOUNDED preceding) as cnt2 " +
-      "from MyTable"
-
-    val expected =
-      unaryNode(
-        "DataStreamCalc",
-        unaryNode(
-          "DataStreamOverAggregate",
-          unaryNode(
-            "DataStreamCalc",
-            streamTableNode(0),
-            term("select", "a", "c", "proctime")
-          ),
-          term("partitionBy", "c"),
-          term("orderBy", "proctime"),
-          term("range", "BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW"),
-          term("select", "a", "c", "proctime", "COUNT(a) AS w0$o0", "$SUM0(a) AS w0$o1")
-        ),
-        term("select", "c", "w0$o0 AS cnt1", "CASE(>(w0$o0, 0)", "CAST(w0$o1), null) AS cnt2")
-      )
-    streamUtil.verifySql(sql, expected)
-  }
-
-  @Test
-  def testProcTimeUnboundedPartitionedRowsOver() = {
-    val sql = "SELECT " +
-      "c, " +
-      "count(a) OVER (PARTITION BY c ORDER BY proctime ROWS BETWEEN UNBOUNDED preceding AND " +
-      "CURRENT ROW) " +
-      "from MyTable"
-
-    val expected =
-      unaryNode(
-        "DataStreamCalc",
-        unaryNode(
-          "DataStreamOverAggregate",
-          streamTableNode(0),
-          term("partitionBy", "c"),
-          term("orderBy", "proctime"),
-          term("rows", "BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW"),
-          term("select", "a", "b", "c", "proctime", "rowtime", "COUNT(a) AS w0$o0")
-        ),
-        term("select", "c", "w0$o0 AS $1")
-      )
-    streamUtil.verifySql(sql, expected)
-  }
-
-  @Test
-  def testProcTimeUnboundedNonPartitionedRangeOver() = {
-    val sql = "SELECT " +
-      "c, " +
-      "count(a) OVER (ORDER BY proctime RANGE UNBOUNDED preceding) as cnt1, " +
-      "sum(a) OVER (ORDER BY proctime RANGE UNBOUNDED preceding) as cnt2 " +
-      "from MyTable"
-
-    val expected =
-      unaryNode(
-        "DataStreamCalc",
-        unaryNode(
-          "DataStreamOverAggregate",
-          unaryNode(
-            "DataStreamCalc",
-            streamTableNode(0),
-            term("select", "a", "c", "proctime")
-          ),
-          term("orderBy", "proctime"),
-          term("range", "BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW"),
-          term("select", "a", "c", "proctime", "COUNT(a) AS w0$o0", "$SUM0(a) AS w0$o1")
-        ),
-        term("select", "c", "w0$o0 AS cnt1", "CASE(>(w0$o0, 0)", "CAST(w0$o1), null) AS cnt2")
-      )
-    streamUtil.verifySql(sql, expected)
-  }
-
-  @Test
-  def testProcTimeUnboundedNonPartitionedRowsOver() = {
-    val sql = "SELECT " +
-      "c, " +
-      "count(a) OVER (ORDER BY proctime ROWS BETWEEN UNBOUNDED preceding AND " +
-      "CURRENT ROW) " +
-      "from MyTable"
-
-    val expected =
-      unaryNode(
-        "DataStreamCalc",
-        unaryNode(
-          "DataStreamOverAggregate",
-          streamTableNode(0),
-          term("orderBy", "proctime"),
-          term("rows", "BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW"),
-          term("select", "a", "b", "c", "proctime", "rowtime", "COUNT(a) AS w0$o0")
-        ),
-        term("select", "c", "w0$o0 AS $1")
-      )
-    streamUtil.verifySql(sql, expected)
-  }
-
-  @Test
-  def testRowTimeBoundedPartitionedRowsOver() = {
-    val sql = "SELECT " +
-      "c, " +
-      "count(a) OVER (PARTITION BY c ORDER BY rowtime ROWS BETWEEN 5 preceding AND " +
-      "CURRENT ROW) " +
-      "from MyTable"
-
-    val expected =
-      unaryNode(
-        "DataStreamCalc",
-        unaryNode(
-          "DataStreamOverAggregate",
-          unaryNode(
-            "DataStreamCalc",
-            streamTableNode(0),
-            term("select", "a", "c", "rowtime")
-          ),
-          term("partitionBy", "c"),
-          term("orderBy", "rowtime"),
-          term("rows", "BETWEEN 5 PRECEDING AND CURRENT ROW"),
-          term("select", "a", "c", "rowtime", "COUNT(a) AS w0$o0")
-        ),
-        term("select", "c", "w0$o0 AS $1")
-      )
-    streamUtil.verifySql(sql, expected)
-  }
-
-  @Test
-  def testRowTimeBoundedPartitionedRangeOver() = {
-    val sql = "SELECT " +
-      "c, " +
-      "count(a) OVER (PARTITION BY c ORDER BY rowtime " +
-      "RANGE BETWEEN INTERVAL '1' SECOND  preceding AND CURRENT ROW) " +
-      "from MyTable"
-
-    val expected =
-      unaryNode(
-        "DataStreamCalc",
-        unaryNode(
-          "DataStreamOverAggregate",
-          unaryNode(
-            "DataStreamCalc",
-            streamTableNode(0),
-            term("select", "a", "c", "rowtime")
-          ),
-          term("partitionBy", "c"),
-          term("orderBy", "rowtime"),
-          term("range", "BETWEEN 1000 PRECEDING AND CURRENT ROW"),
-          term("select", "a", "c", "rowtime", "COUNT(a) AS w0$o0")
-        ),
-        term("select", "c", "w0$o0 AS $1")
-      )
-    streamUtil.verifySql(sql, expected)
-  }
-
-  @Test
-  def testRowTimeBoundedNonPartitionedRowsOver() = {
-    val sql = "SELECT " +
-      "c, " +
-      "count(a) OVER (ORDER BY rowtime ROWS BETWEEN 5 preceding AND " +
-      "CURRENT ROW) " +
-      "from MyTable"
-
-    val expected =
-      unaryNode(
-        "DataStreamCalc",
-        unaryNode(
-          "DataStreamOverAggregate",
-          unaryNode(
-            "DataStreamCalc",
-            streamTableNode(0),
-            term("select", "a", "c", "rowtime")
-          ),
-          term("orderBy", "rowtime"),
-          term("rows", "BETWEEN 5 PRECEDING AND CURRENT ROW"),
-          term("select", "a", "c", "rowtime", "COUNT(a) AS w0$o0")
-        ),
-        term("select", "c", "w0$o0 AS $1")
-      )
-    streamUtil.verifySql(sql, expected)
-  }
-
-  @Test
-  def testRowTimeBoundedNonPartitionedRangeOver() = {
-    val sql = "SELECT " +
-      "c, " +
-      "count(a) OVER (ORDER BY rowtime " +
-      "RANGE BETWEEN INTERVAL '1' SECOND  preceding AND CURRENT ROW) as cnt1 " +
-      "from MyTable"
-
-    val expected =
-      unaryNode(
-        "DataStreamCalc",
-        unaryNode(
-          "DataStreamOverAggregate",
-          unaryNode(
-            "DataStreamCalc",
-            streamTableNode(0),
-            term("select", "a", "c", "rowtime")
-          ),
-          term("orderBy", "rowtime"),
-          term("range", "BETWEEN 1000 PRECEDING AND CURRENT ROW"),
-          term("select", "a", "c", "rowtime", "COUNT(a) AS w0$o0")
-        ),
-        term("select", "c", "w0$o0 AS $1")
-      )
-    streamUtil.verifySql(sql, expected)
-  }
-
-  @Test
-  def testRowTimeUnboundedPartitionedRangeOver() = {
-    val sql = "SELECT " +
-      "c, " +
-      "count(a) OVER (PARTITION BY c ORDER BY rowtime RANGE UNBOUNDED preceding) as cnt1, " +
-      "sum(a) OVER (PARTITION BY c ORDER BY rowtime RANGE UNBOUNDED preceding) as cnt2 " +
-      "from MyTable"
-
-    val expected =
-      unaryNode(
-        "DataStreamCalc",
-        unaryNode(
-          "DataStreamOverAggregate",
-          unaryNode(
-            "DataStreamCalc",
-            streamTableNode(0),
-            term("select", "a", "c", "rowtime")
-          ),
-          term("partitionBy", "c"),
-          term("orderBy", "rowtime"),
-          term("range", "BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW"),
-          term("select", "a", "c", "rowtime", "COUNT(a) AS w0$o0", "$SUM0(a) AS w0$o1")
-        ),
-        term("select", "c", "w0$o0 AS cnt1", "CASE(>(w0$o0, 0)", "CAST(w0$o1), null) AS cnt2")
-      )
-    streamUtil.verifySql(sql, expected)
-  }
-
-  @Test
-  def testRowTimeUnboundedPartitionedRowsOver() = {
-    val sql = "SELECT " +
-      "c, " +
-      "count(a) OVER (PARTITION BY c ORDER BY rowtime ROWS UNBOUNDED preceding) as cnt1, " +
-      "sum(a) OVER (PARTITION BY c ORDER BY rowtime ROWS UNBOUNDED preceding) as cnt2 " +
-      "from MyTable"
-
-    val expected =
-      unaryNode(
-        "DataStreamCalc",
-        unaryNode(
-          "DataStreamOverAggregate",
-          unaryNode(
-            "DataStreamCalc",
-            streamTableNode(0),
-            term("select", "a", "c", "rowtime")
-          ),
-          term("partitionBy", "c"),
-          term("orderBy", "rowtime"),
-          term("rows", "BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW"),
-          term(
-            "select",
-            "a",
-            "c",
-            "rowtime",
-            "COUNT(a) AS w0$o0",
-            "$SUM0(a) AS w0$o1"
-          )
-        ),
-        term(
-          "select",
-          "c",
-          "w0$o0 AS cnt1",
-          "CASE(>(w0$o0, 0)",
-          "CAST(w0$o1), null) AS cnt2"
-        )
-      )
-    streamUtil.verifySql(sql, expected)
-  }
-
-  @Test
-  def testRowTimeUnboundedNonPartitionedRangeOver() = {
-    val sql = "SELECT " +
-      "c, " +
-      "count(a) OVER (ORDER BY rowtime RANGE UNBOUNDED preceding) as cnt1, " +
-      "sum(a) OVER (ORDER BY rowtime RANGE UNBOUNDED preceding) as cnt2 " +
-      "from MyTable"
-
-    val expected =
-      unaryNode(
-        "DataStreamCalc",
-        unaryNode(
-          "DataStreamOverAggregate",
-          unaryNode(
-            "DataStreamCalc",
-            streamTableNode(0),
-            term("select", "a", "c", "rowtime")
-          ),
-          term("orderBy", "rowtime"),
-          term("range", "BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW"),
-          term("select", "a", "c", "rowtime", "COUNT(a) AS w0$o0", "$SUM0(a) AS w0$o1")
-        ),
-        term("select", "c", "w0$o0 AS cnt1", "CASE(>(w0$o0, 0)", "CAST(w0$o1), null) AS cnt2")
-      )
-    streamUtil.verifySql(sql, expected)
-  }
-
-  @Test
-  def testRowTimeUnboundedNonPartitionedRowsOver() = {
-    val sql = "SELECT " +
-      "c, " +
-      "count(a) OVER (ORDER BY rowtime ROWS UNBOUNDED preceding) as cnt1, " +
-      "sum(a) OVER (ORDER BY rowtime ROWS UNBOUNDED preceding) as cnt2 " +
-      "from MyTable"
-
-    val expected =
-      unaryNode(
-        "DataStreamCalc",
-        unaryNode(
-          "DataStreamOverAggregate",
-          unaryNode(
-            "DataStreamCalc",
-            streamTableNode(0),
-            term("select", "a", "c", "rowtime")
-          ),
-          term("orderBy", "rowtime"),
-          term("rows", "BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW"),
-          term(
-            "select",
-            "a",
-            "c",
-            "rowtime",
-            "COUNT(a) AS w0$o0",
-            "$SUM0(a) AS w0$o1"
-          )
-        ),
-        term(
-          "select",
-          "c",
-          "w0$o0 AS cnt1",
-          "CASE(>(w0$o0, 0)",
-          "CAST(w0$o1), null) AS cnt2"
-        )
-      )
-    streamUtil.verifySql(sql, expected)
-  }
-}
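
A note on the plan-matching helpers used throughout these tests: term, unaryNode, and streamTableNode come from org.apache.flink.table.utils.TableTestUtil and render the expected optimized plan as a string. Because term joins its values with ", ", several terms occasionally appear fused into one string literal (e.g. "COUNT(a) AS w0$o0, $SUM0(a) AS w0$o1"); the rendered plan is identical either way. A minimal sketch of the assumed string format (illustration only, not the actual TableTestUtil source):

    // Sketch only: assumed behavior of the TableTestUtil string helpers.
    object PlanStringSketch {
      // term("select", "a", "c") == "select=[a, c]"
      def term(name: String, values: String*): String =
        s"$name=[${values.mkString(", ")}]"

      // unaryNode renders an operator followed by its single input.
      def unaryNode(node: String, input: String, terms: String*): String =
        s"$node(${terms.mkString(", ")})\n$input"

      def main(args: Array[String]): Unit =
        println(unaryNode("DataStreamCalc", "DataStreamScan(table=[[MyTable]])",
          term("select", "a", "c", "proctime")))
    }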

http://git-wip-us.apache.org/repos/asf/flink/blob/f1fafc0e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/SortITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/SortITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/SortITCase.scala
deleted file mode 100644
index 2340591..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/SortITCase.scala
+++ /dev/null
@@ -1,122 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.api.scala.stream.sql
-
-import org.apache.flink.api.scala._
-import org.apache.flink.streaming.api.TimeCharacteristic
-import org.apache.flink.streaming.api.functions.sink.RichSinkFunction
-import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
-import org.apache.flink.table.api.scala.stream.sql.SortITCase.StringRowSelectorSink
-import org.apache.flink.table.api.scala.stream.sql.TimeTestUtil.EventTimeSourceFunction
-import org.apache.flink.table.api.TableEnvironment
-import org.apache.flink.table.api.scala._
-import org.apache.flink.table.api.scala.stream.utils.{StreamITCase, StreamingWithStateTestBase}
-import org.apache.flink.types.Row
-
-import org.junit.Assert._
-import org.junit._
-
-import scala.collection.mutable
-
-class SortITCase extends StreamingWithStateTestBase {
-
-  @Test
-  def testEventTimeOrderBy(): Unit = {
-    val data = Seq(
-      Left((1500L, (1L, 15, "Hello"))),
-      Left((1600L, (1L, 16, "Hello"))),
-      Left((1000L, (1L, 1, "Hello"))),
-      Left((2000L, (2L, 2, "Hello"))),
-      Right(1000L),
-      Left((2000L, (2L, 2, "Hello"))),
-      Left((2000L, (2L, 3, "Hello"))),
-      Left((3000L, (3L, 3, "Hello"))),
-      Left((2000L, (3L, 1, "Hello"))),
-      Right(2000L),
-      Left((4000L, (4L, 4, "Hello"))),
-      Right(3000L),
-      Left((5000L, (5L, 5, "Hello"))),
-      Right(5000L),
-      Left((6000L, (6L, 65, "Hello"))),
-      Left((6000L, (6L, 6, "Hello"))),
-      Left((6000L, (6L, 67, "Hello"))),
-      Left((6000L, (6L, -1, "Hello"))),
-      Left((6000L, (6L, 6, "Hello"))),
-      Right(7000L),
-      Left((9000L, (6L, 9, "Hello"))),
-      Left((8500L, (6L, 18, "Hello"))),
-      Left((9000L, (6L, 7, "Hello"))),
-      Right(10000L),
-      Left((10000L, (7L, 7, "Hello World"))),
-      Left((11000L, (7L, 77, "Hello World"))),
-      Left((11000L, (7L, 17, "Hello World"))),
-      Right(12000L),
-      Left((14000L, (7L, 18, "Hello World"))),
-      Right(14000L),
-      Left((15000L, (8L, 8, "Hello World"))),
-      Right(17000L),
-      Left((20000L, (20L, 20, "Hello World"))), 
-      Right(19000L))
-
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-    env.setStateBackend(getStateBackend)
-    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-    StreamITCase.clear
-
-    val t1 = env.addSource(new EventTimeSourceFunction[(Long, Int, String)](data))
-      .toTable(tEnv, 'a, 'b, 'c, 'rowtime.rowtime)
-      
-    tEnv.registerTable("T1", t1)
-
-    val sqlQuery = "SELECT b FROM T1 ORDER BY rowtime, b ASC"
-
-    val result = tEnv.sql(sqlQuery).toAppendStream[Row]
-    result.addSink(new StringRowSelectorSink(0)).setParallelism(1)
-    env.execute()
-    
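-    // Rows arrive out of order; the event-time sort emits, at each watermark,
-    // the rows whose timestamps are now final, ordered by (rowtime, b). The
-    // groups in the expected output below line up with the watermarks above.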
-    val expected = mutable.MutableList(
-      "1", "15", "16",
-      "1", "2", "2", "3",
-      "3",
-      "4",
-      "5",
-      "-1", "6", "6", "65", "67",
-      "18", "7", "9",
-      "7", "17", "77", 
-      "18",
-      "8",
-      "20")
-    assertEquals(expected, SortITCase.testResults)
-  }
-}
-
-object SortITCase {
-
-  final class StringRowSelectorSink(private val field: Int) extends RichSinkFunction[Row]() {
-    override def invoke(value: Row): Unit = {
-      testResults.synchronized {
-        testResults += value.getField(field).toString
-      }
-    }
-  }
-  
-  var testResults: mutable.MutableList[String] = mutable.MutableList.empty[String]
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/f1fafc0e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/SortTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/SortTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/SortTest.scala
deleted file mode 100644
index 1d50fc1..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/SortTest.scala
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.table.api.scala.stream.sql
-
-import org.apache.flink.api.scala._
-import org.apache.flink.table.api.TableException
-import org.apache.flink.table.api.scala._
-import org.apache.flink.table.utils.TableTestUtil._
-import org.apache.flink.table.utils.{StreamTableTestUtil, TableTestBase}
-import org.junit.Test
-
-class SortTest extends TableTestBase {
-  private val streamUtil: StreamTableTestUtil = streamTestUtil()
-  streamUtil.addTable[(Int, String, Long)]("MyTable", 'a, 'b, 'c,
-      'proctime.proctime, 'rowtime.rowtime)
-  
-  @Test
-  def testSortProcessingTime() = {
-
-    val sqlQuery = "SELECT a FROM MyTable ORDER BY proctime, c"
-
-    val expected =
-      unaryNode(
-        "DataStreamCalc",
-        unaryNode("DataStreamSort",
-          streamTableNode(0),
-          term("orderBy", "proctime ASC", "c ASC")),
-        term("select", "a", "TIME_MATERIALIZATION(proctime) AS proctime", "c"))
-
-    streamUtil.verifySql(sqlQuery, expected)
-  }
-
-  @Test
-  def testSortRowTime() = {
-
-    val sqlQuery = "SELECT a FROM MyTable ORDER BY rowtime, c"
-      
-    val expected =
-      unaryNode(
-        "DataStreamCalc",
-        unaryNode("DataStreamSort",
-          streamTableNode(0),
-          term("orderBy", "rowtime ASC, c ASC")),
-        term("select", "a", "TIME_MATERIALIZATION(rowtime) AS rowtime", "c"))
-       
-    streamUtil.verifySql(sqlQuery, expected)
-  }
-
-  // test should fail because time order is descending
-  @Test(expected = classOf[TableException])
-  def testSortProcessingTimeDesc() = {
-
-    val sqlQuery = "SELECT a FROM MyTable ORDER BY proctime DESC, c"
-    streamUtil.verifySql(sqlQuery, "")
-  }
-
-  // test should fail because time is not the primary order field
-  @Test(expected = classOf[TableException])
-  def testSortProcessingTimeSecondaryField() = {
-
-    val sqlQuery = "SELECT a FROM MyTable ORDER BY c, proctime"
-    streamUtil.verifySql(sqlQuery, "")
-  }
-  
-}
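
The two negative tests encode the streaming sort restriction: the time attribute must be the primary sort field, in ascending order. A hypothetical helper expressing that rule (sketch only, not the planner's actual validation code):

    // Hypothetical check mirroring the rule the two tests above exercise:
    // a streaming ORDER BY must lead with the time attribute, ascending.
    def isValidStreamingSort(
        orderBy: Seq[(String, Boolean)], // (field, isAscending)
        timeAttr: String): Boolean =
      orderBy.headOption.exists { case (field, asc) => field == timeAttr && asc }

    isValidStreamingSort(Seq(("proctime", true), ("c", true)), "proctime")  // true
    isValidStreamingSort(Seq(("proctime", false), ("c", true)), "proctime") // false (DESC)
    isValidStreamingSort(Seq(("c", true), ("proctime", true)), "proctime")  // false (not primary)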

http://git-wip-us.apache.org/repos/asf/flink/blob/f1fafc0e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/TimeTestUtil.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/TimeTestUtil.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/TimeTestUtil.scala
deleted file mode 100644
index b883870..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/TimeTestUtil.scala
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.api.scala.stream.sql
-
-import org.apache.flink.streaming.api.functions.source.SourceFunction
-import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext
-import org.apache.flink.streaming.api.watermark.Watermark
-
-object TimeTestUtil {
-  
-  class EventTimeSourceFunction[T](
-      dataWithTimestampList: Seq[Either[(Long, T), Long]]) extends SourceFunction[T] {
-    override def run(ctx: SourceContext[T]): Unit = {
-      dataWithTimestampList.foreach {
-        case Left(t) => ctx.collectWithTimestamp(t._2, t._1)
-        case Right(w) => ctx.emitWatermark(new Watermark(w))
-      }
-    }
-
-    override def cancel(): Unit = {} // bounded test source; nothing to cancel
-  }
-  
-}
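
A usage sketch for this helper (assumed element type, mirroring how SortITCase above drives it): timestamped elements travel on the Left, watermarks on the Right.

    // Sketch: feeding EventTimeSourceFunction with timestamped elements
    // (Left) interleaved with watermarks (Right); assumes an event-time
    // StreamExecutionEnvironment and the TimeTestUtil object defined above.
    import org.apache.flink.streaming.api.scala._

    val data: Seq[Either[(Long, (Long, Int, String)), Long]] = Seq(
      Left((1000L, (1L, 1, "Hello"))), // element with event time 1000
      Right(1000L),                    // watermark 1000 finalizes earlier rows
      Left((2000L, (2L, 2, "Hello"))))

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val stream = env.addSource(
      new TimeTestUtil.EventTimeSourceFunction[(Long, Int, String)](data))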

http://git-wip-us.apache.org/repos/asf/flink/blob/f1fafc0e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/UserDefinedTableFunctionTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/UserDefinedTableFunctionTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/UserDefinedTableFunctionTest.scala
deleted file mode 100644
index 58eedd0..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/UserDefinedTableFunctionTest.scala
+++ /dev/null
@@ -1,237 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.api.scala.stream.sql
-
-import org.apache.flink.api.scala._
-import org.apache.flink.table.api.scala._
-import org.apache.flink.table.utils._
-import org.apache.flink.table.utils.TableTestUtil._
-import org.junit.Test
-
-class UserDefinedTableFunctionTest extends TableTestBase {
-
-  @Test
-  def testCrossJoin(): Unit = {
-    val util = streamTestUtil()
-    val func1 = new TableFunc1
-    util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
-    util.addFunction("func1", func1)
-
-    val sqlQuery = "SELECT c, s FROM MyTable, LATERAL TABLE(func1(c)) AS T(s)"
-
-    val expected = unaryNode(
-      "DataStreamCalc",
-      unaryNode(
-        "DataStreamCorrelate",
-        streamTableNode(0),
-        term("invocation", "func1($cor0.c)"),
-        term("function", func1.getClass.getCanonicalName),
-        term("rowType",
-             "RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, VARCHAR(2147483647) f0)"),
-        term("joinType", "INNER")
-      ),
-      term("select", "c", "f0 AS s")
-    )
-
-    util.verifySql(sqlQuery, expected)
-
-    // test overloading
-
-    val sqlQuery2 = "SELECT c, s FROM MyTable, LATERAL TABLE(func1(c, '$')) AS T(s)"
-
-    val expected2 = unaryNode(
-      "DataStreamCalc",
-      unaryNode(
-        "DataStreamCorrelate",
-        streamTableNode(0),
-        term("invocation", "func1($cor0.c, '$')"),
-        term("function", func1.getClass.getCanonicalName),
-        term("rowType",
-             "RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, VARCHAR(2147483647) f0)"),
-        term("joinType", "INNER")
-      ),
-      term("select", "c", "f0 AS s")
-    )
-
-    util.verifySql(sqlQuery2, expected2)
-  }
-
-  @Test
-  def testLeftOuterJoin(): Unit = {
-    val util = streamTestUtil()
-    val func1 = new TableFunc1
-    util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
-    util.addFunction("func1", func1)
-
-    val sqlQuery = "SELECT c, s FROM MyTable LEFT JOIN LATERAL TABLE(func1(c)) AS T(s) ON TRUE"
-
-    val expected = unaryNode(
-      "DataStreamCalc",
-      unaryNode(
-        "DataStreamCorrelate",
-        streamTableNode(0),
-        term("invocation", "func1($cor0.c)"),
-        term("function", func1.getClass.getCanonicalName),
-        term("rowType",
-             "RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, VARCHAR(2147483647) f0)"),
-        term("joinType", "LEFT")
-      ),
-      term("select", "c", "f0 AS s")
-    )
-
-    util.verifySql(sqlQuery, expected)
-  }
-
-  @Test
-  def testCustomType(): Unit = {
-    val util = streamTestUtil()
-    val func2 = new TableFunc2
-    util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
-    util.addFunction("func2", func2)
-
-    val sqlQuery = "SELECT c, name, len FROM MyTable, LATERAL TABLE(func2(c)) AS T(name, len)"
-
-    val expected = unaryNode(
-      "DataStreamCalc",
-      unaryNode(
-        "DataStreamCorrelate",
-        streamTableNode(0),
-        term("invocation", "func2($cor0.c)"),
-        term("function", func2.getClass.getCanonicalName),
-        term("rowType",
-             "RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, " +
-               "VARCHAR(2147483647) f0, INTEGER f1)"),
-        term("joinType", "INNER")
-      ),
-      term("select", "c", "f0 AS name", "f1 AS len")
-    )
-
-    util.verifySql(sqlQuery, expected)
-  }
-
-  @Test
-  def testHierarchyType(): Unit = {
-    val util = streamTestUtil()
-    util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
-    val function = new HierarchyTableFunction
-    util.addFunction("hierarchy", function)
-
-    val sqlQuery = "SELECT c, T.* FROM MyTable, LATERAL TABLE(hierarchy(c)) AS T(name, adult, len)"
-
-    val expected = unaryNode(
-      "DataStreamCalc",
-      unaryNode(
-        "DataStreamCorrelate",
-        streamTableNode(0),
-        term("invocation", "hierarchy($cor0.c)"),
-        term("function", function.getClass.getCanonicalName),
-        term("rowType",
-             "RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c," +
-               " VARCHAR(2147483647) f0, BOOLEAN f1, INTEGER f2)"),
-        term("joinType", "INNER")
-      ),
-      term("select", "c", "f0 AS name", "f1 AS adult", "f2 AS len")
-    )
-
-    util.verifySql(sqlQuery, expected)
-  }
-
-  @Test
-  def testPojoType(): Unit = {
-    val util = streamTestUtil()
-    util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
-    val function = new PojoTableFunc
-    util.addFunction("pojo", function)
-
-    val sqlQuery = "SELECT c, name, age FROM MyTable, LATERAL TABLE(pojo(c))"
-
-    val expected = unaryNode(
-      "DataStreamCalc",
-      unaryNode(
-        "DataStreamCorrelate",
-        streamTableNode(0),
-        term("invocation", "pojo($cor0.c)"),
-        term("function", function.getClass.getCanonicalName),
-        term("rowType",
-             "RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c," +
-               " INTEGER age, VARCHAR(2147483647) name)"),
-        term("joinType", "INNER")
-      ),
-      term("select", "c", "name", "age")
-    )
-
-    util.verifySql(sqlQuery, expected)
-  }
-
-  @Test
-  def testFilter(): Unit = {
-    val util = streamTestUtil()
-    val func2 = new TableFunc2
-    util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
-    util.addFunction("func2", func2)
-
-    val sqlQuery = "SELECT c, name, len FROM MyTable, LATERAL TABLE(func2(c)) AS T(name, len) " +
-      "WHERE len > 2"
-
-    val expected = unaryNode(
-      "DataStreamCalc",
-      unaryNode(
-        "DataStreamCorrelate",
-        streamTableNode(0),
-        term("invocation", "func2($cor0.c)"),
-        term("function", func2.getClass.getCanonicalName),
-        term("rowType",
-             "RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, " +
-               "VARCHAR(2147483647) f0, INTEGER f1)"),
-        term("joinType", "INNER"),
-        term("condition", ">($1, 2)")
-      ),
-      term("select", "c", "f0 AS name", "f1 AS len")
-    )
-
-    util.verifySql(sqlQuery, expected)
-  }
-
-  @Test
-  def testScalarFunction(): Unit = {
-    val util = streamTestUtil()
-    val func1 = new TableFunc1
-    util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
-    util.addFunction("func1", func1)
-
-    val sqlQuery = "SELECT c, s FROM MyTable, LATERAL TABLE(func1(SUBSTRING(c, 2))) AS T(s)"
-
-    val expected = unaryNode(
-      "DataStreamCalc",
-      unaryNode(
-        "DataStreamCorrelate",
-        streamTableNode(0),
-        term("invocation", "func1(SUBSTRING($cor0.c, 2))"),
-        term("function", func1.getClass.getCanonicalName),
-        term("rowType",
-             "RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, VARCHAR(2147483647) f0)"),
-        term("joinType", "INNER")
-      ),
-      term("select", "c", "f0 AS s")
-    )
-
-    util.verifySql(sqlQuery, expected)
-  }
-}
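
For context, TableFunc1, the UDTF most of these tests register, lives in org.apache.flink.table.utils and is unchanged by this commit. A sketch of its assumed shape (illustration only):

    // Sketch of the assumed shape of TableFunc1; the real class is defined
    // in org.apache.flink.table.utils and is not part of this diff.
    import org.apache.flink.table.functions.TableFunction

    class TableFunc1 extends TableFunction[String] {
      def eval(str: String): Unit =
        if (str.contains("#")) str.split("#").foreach(collect)

      // Overloaded variant exercised by "func1(c, '$')" above.
      def eval(str: String, prefix: String): Unit =
        if (str.contains("#")) str.split("#").foreach(s => collect(prefix + s))
    }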