Posted to commits@flink.apache.org by tw...@apache.org on 2017/07/13 10:18:22 UTC

[13/44] flink git commit: [FLINK-6617][table] Improve JAVA and SCALA logical plans consistent test

http://git-wip-us.apache.org/repos/asf/flink/blob/ecde7bc1/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/SortITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/SortITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/SortITCase.scala
deleted file mode 100644
index 09d3c04..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/SortITCase.scala
+++ /dev/null
@@ -1,226 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.api.scala.batch.table
-
-import org.apache.flink.api.scala.util.CollectionDataSets
-import org.apache.flink.api.scala.{ExecutionEnvironment, _}
-import org.apache.flink.table.api.TableEnvironment
-import org.apache.flink.table.api.scala._
-import org.apache.flink.table.api.scala.batch.utils.SortTestUtils._
-import org.apache.flink.table.api.scala.batch.utils.TableProgramsClusterTestBase
-import org.apache.flink.table.api.scala.batch.utils.TableProgramsTestBase.TableConfigMode
-import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
-import org.apache.flink.test.util.TestBaseUtils
-import org.apache.flink.types.Row
-import org.junit._
-import org.junit.runner.RunWith
-import org.junit.runners.Parameterized
-
-import scala.collection.JavaConverters._
-import scala.collection.mutable
-
-@RunWith(classOf[Parameterized])
-class SortITCase(mode: TestExecutionMode, configMode: TableConfigMode)
-    extends TableProgramsClusterTestBase(mode, configMode) {
-
-  private def getExecutionEnvironment = {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    // set the parallelism explicitly to make sure the query is executed in
-    // a distributed manner
-    env.setParallelism(3)
-    env
-  }
-
-  @Test
-  def testOrderByDesc(): Unit = {
-    val env = getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    val ds = CollectionDataSets.get3TupleDataSet(env)
-    val t = ds.toTable(tEnv).orderBy('_1.desc)
-    implicit def tupleOrdering[T <: Product] = Ordering.by((x : T) =>
-      - x.productElement(0).asInstanceOf[Int] )
-
-    val expected = sortExpectedly(tupleDataSetStrings)
-    // squash all rows inside a partition into one element
-    val results = t.toDataSet[Row].mapPartition(rows => {
-      // the rows need to be copied in object reuse mode
-      val copied = new mutable.ArrayBuffer[Row]
-      rows.foreach(r => copied += Row.copy(r))
-      Seq(copied)
-    }).collect()
-
-    val result = results
-        .filterNot(_.isEmpty)
-        // sort all partitions by their head element to verify the order across partitions
-        .sortBy(_.head)(Ordering.by((r : Row) => -r.getField(0).asInstanceOf[Int]))
-        .reduceLeft(_ ++ _)
-
-    TestBaseUtils.compareOrderedResultAsText(result.asJava, expected)
-  }
-
-  @Test
-  def testOrderByAsc(): Unit = {
-    val env = getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    val ds = CollectionDataSets.get3TupleDataSet(env)
-    val t = ds.toTable(tEnv).orderBy('_1.asc)
-    implicit def tupleOrdering[T <: Product] = Ordering.by((x : T) =>
-      x.productElement(0).asInstanceOf[Int] )
-
-    val expected = sortExpectedly(tupleDataSetStrings)
-    // squash all rows inside a partition into one element
-    val results = t.toDataSet[Row].mapPartition(rows => {
-      // the rows need to be copied in object reuse mode
-      val copied = new mutable.ArrayBuffer[Row]
-      rows.foreach(r => copied += Row.copy(r))
-      Seq(copied)
-    }).collect()
-
-    val result = results
-        .filterNot(_.isEmpty)
-        // sort all partitions by their head element to verify the order across partitions
-        .sortBy(_.head)(Ordering.by((r : Row) => r.getField(0).asInstanceOf[Int]))
-        .reduceLeft(_ ++ _)
-
-    TestBaseUtils.compareOrderedResultAsText(result.asJava, expected)
-  }
-
-  @Test
-  def testOrderByMultipleFieldsDifferentDirections(): Unit = {
-    val env = getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    val ds = CollectionDataSets.get3TupleDataSet(env)
-    val t = ds.toTable(tEnv).orderBy('_2.asc, '_1.desc)
-    implicit def tupleOrdering[T <: Product] = Ordering.by((x : T) =>
-      (x.productElement(1).asInstanceOf[Long], - x.productElement(0).asInstanceOf[Int]) )
-
-    val expected = sortExpectedly(tupleDataSetStrings)
-    // squash all rows inside a partition into one element
-    val results = t.toDataSet[Row].mapPartition(rows => {
-      // the rows need to be copied in object reuse mode
-      val copied = new mutable.ArrayBuffer[Row]
-      rows.foreach(r => copied += Row.copy(r))
-      Seq(copied)
-    }).collect()
-
-    def rowOrdering = Ordering.by((r : Row) => {
-      // ordering for this tuple will fall into the previous defined tupleOrdering,
-      // so we just need to return the field by their defining sequence
-      (r.getField(0).asInstanceOf[Int], r.getField(1).asInstanceOf[Long])
-    })
-
-    val result = results
-        .filterNot(_.isEmpty)
-        // sort all partitions by their head element to verify the order across partitions
-        .sortBy(_.head)(rowOrdering)
-        .reduceLeft(_ ++ _)
-
-    TestBaseUtils.compareOrderedResultAsText(result.asJava, expected)
-  }
-
-  @Test
-  def testOrderByOffset(): Unit = {
-    val env = getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    val ds = CollectionDataSets.get3TupleDataSet(env)
-    val t = ds.toTable(tEnv).orderBy('_1.asc).limit(3)
-    implicit def tupleOrdering[T <: Product] = Ordering.by((x : T) =>
-      x.productElement(0).asInstanceOf[Int] )
-
-    val expected = sortExpectedly(tupleDataSetStrings, 3, 21)
-    // squash all rows inside a partition into one element
-    val results = t.toDataSet[Row].mapPartition(rows => {
-      // the rows need to be copied in object reuse mode
-      val copied = new mutable.ArrayBuffer[Row]
-      rows.foreach(r => copied += Row.copy(r))
-      Seq(copied)
-    }).collect()
-
-    val result = results
-        .filterNot(_.isEmpty)
-        // sort all partitions by their head element to verify the order across partitions
-        .sortBy(_.head)(Ordering.by((r : Row) => r.getField(0).asInstanceOf[Int]))
-        .reduceLeft(_ ++ _)
-
-    TestBaseUtils.compareOrderedResultAsText(result.asJava, expected)
-  }
-
-  @Test
-  def testOrderByOffsetAndFetch(): Unit = {
-    val env = getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    val ds = CollectionDataSets.get3TupleDataSet(env)
-    val t = ds.toTable(tEnv).orderBy('_1.desc).limit(3, 5)
-    implicit def tupleOrdering[T <: Product] = Ordering.by((x : T) =>
-      - x.productElement(0).asInstanceOf[Int] )
-
-    val expected = sortExpectedly(tupleDataSetStrings, 3, 8)
-    // squash all rows inside a partition into one element
-    val results = t.toDataSet[Row].mapPartition(rows => {
-      // the rows need to be copied in object reuse mode
-      val copied = new mutable.ArrayBuffer[Row]
-      rows.foreach(r => copied += Row.copy(r))
-      Seq(copied)
-    }).collect()
-
-    val result = results
-        .filterNot(_.isEmpty)
-        // sort all partitions by their head element to verify the order across partitions
-        .sortBy(_.head)(Ordering.by((r : Row) => -r.getField(0).asInstanceOf[Int]))
-        .reduceLeft(_ ++ _)
-
-    TestBaseUtils.compareOrderedResultAsText(result.asJava, expected)
-  }
-
-  @Test
-  def testOrderByFetch(): Unit = {
-    val env = getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    val ds = CollectionDataSets.get3TupleDataSet(env)
-    val t = ds.toTable(tEnv).orderBy('_1.asc).limit(0, 5)
-    implicit def tupleOrdering[T <: Product] = Ordering.by((x : T) =>
-      x.productElement(0).asInstanceOf[Int] )
-
-    val expected = sortExpectedly(tupleDataSetStrings, 0, 5)
-    // squash all rows inside a partition into one element
-    val results = t.toDataSet[Row].mapPartition(rows => {
-      // the rows need to be copied in object reuse mode
-      val copied = new mutable.ArrayBuffer[Row]
-      rows.foreach(r => copied += Row.copy(r))
-      Seq(copied)
-    }).collect()
-
-    implicit def rowOrdering = Ordering.by((r : Row) => r.getField(0).asInstanceOf[Int])
-
-    val result = results
-        .filterNot(_.isEmpty)
-        // sort all partitions by their head element to verify the order across partitions
-        .sortBy(_.head)
-        .reduceLeft(_ ++ _)
-
-    TestBaseUtils.compareOrderedResultAsText(result.asJava, expected)
-  }
-
-}
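
The ITCase deleted above checks a global ORDER BY on a parallelism-3 cluster by squashing every partition into a single buffer, ordering the non-empty buffers by their head element, and concatenating them. Below is a minimal, Flink-free sketch of that verification idea; the partition buffers are hypothetical stand-ins for what mapPartition collects in the test.

// A plain-Scala sketch of the cross-partition ordering check used in the
// deleted SortITCase. The buffers below are hypothetical stand-ins for the
// per-partition row buffers produced by mapPartition.
object SortVerificationSketch {
  def main(args: Array[String]): Unit = {
    val partitions: Seq[Seq[Int]] = Seq(Seq(4, 5, 6), Seq(1, 2, 3), Seq.empty, Seq(7, 8))

    val result = partitions
      .filterNot(_.isEmpty)   // drop empty partitions
      .sortBy(_.head)         // order partitions by their first (smallest) element
      .reduceLeft(_ ++ _)     // concatenate to obtain the global order

    // the concatenation must equal the globally sorted data
    assert(result == partitions.flatten.sorted, s"unexpected order: $result")
    println(result.mkString(", "))
  }
}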

http://git-wip-us.apache.org/repos/asf/flink/blob/ecde7bc1/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/UserDefinedTableFunctionTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/UserDefinedTableFunctionTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/UserDefinedTableFunctionTest.scala
index 4d7f6cb..079b10a 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/UserDefinedTableFunctionTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/UserDefinedTableFunctionTest.scala
@@ -17,97 +17,15 @@
  */
 package org.apache.flink.table.api.scala.batch.table
 
-import org.apache.flink.api.java.{DataSet => JDataSet, ExecutionEnvironment => JavaExecutionEnv}
+import org.apache.flink.api.scala._
 import org.apache.flink.table.api.scala._
-import org.apache.flink.api.scala.{DataSet, ExecutionEnvironment => ScalaExecutionEnv, _}
-import org.apache.flink.api.java.typeutils.RowTypeInfo
-import org.apache.flink.types.Row
 import org.apache.flink.table.utils.TableTestUtil._
-import org.apache.flink.table.utils.{PojoTableFunc, TableFunc2, _}
-import org.apache.flink.table.api.{Table, TableEnvironment, Types}
+import org.apache.flink.table.utils._
 import org.junit.Test
-import org.mockito.Mockito._
 
 class UserDefinedTableFunctionTest extends TableTestBase {
 
   @Test
-  def testJavaScalaTableAPIEquality(): Unit = {
-    // mock
-    val ds = mock(classOf[DataSet[Row]])
-    val jDs = mock(classOf[JDataSet[Row]])
-    val typeInfo = new RowTypeInfo(Seq(Types.INT, Types.LONG, Types.STRING): _*)
-    when(ds.javaSet).thenReturn(jDs)
-    when(jDs.getType).thenReturn(typeInfo)
-
-    // Scala environment
-    val env = mock(classOf[ScalaExecutionEnv])
-    val tableEnv = TableEnvironment.getTableEnvironment(env)
-    val in1 = ds.toTable(tableEnv).as('a, 'b, 'c)
-
-    // Java environment
-    val javaEnv = mock(classOf[JavaExecutionEnv])
-    val javaTableEnv = TableEnvironment.getTableEnvironment(javaEnv)
-    val in2 = javaTableEnv.fromDataSet(jDs).as("a, b, c")
-    javaTableEnv.registerTable("MyTable", in2)
-
-    // test cross join
-    val func1 = new TableFunc1
-    javaTableEnv.registerFunction("func1", func1)
-    var scalaTable = in1.join(func1('c) as 's).select('c, 's)
-    var javaTable = in2.join(new Table(javaTableEnv, "func1(c).as(s)")).select("c, s")
-    verifyTableEquals(scalaTable, javaTable)
-
-    // test left outer join
-    scalaTable = in1.leftOuterJoin(func1('c) as 's).select('c, 's)
-    javaTable = in2.leftOuterJoin(new Table(javaTableEnv, "as(func1(c), s)")).select("c, s")
-    verifyTableEquals(scalaTable, javaTable)
-
-    // test overloading
-    scalaTable = in1.join(func1('c, "$") as 's).select('c, 's)
-    javaTable = in2.join(new Table(javaTableEnv, "func1(c, '$') as (s)")).select("c, s")
-    verifyTableEquals(scalaTable, javaTable)
-
-    // test custom result type
-    val func2 = new TableFunc2
-    javaTableEnv.registerFunction("func2", func2)
-    scalaTable = in1.join(func2('c) as ('name, 'len)).select('c, 'name, 'len)
-    javaTable = in2.join(new Table(javaTableEnv, "func2(c).as(name, len)")).select("c, name, len")
-    verifyTableEquals(scalaTable, javaTable)
-
-    // test hierarchy generic type
-    val hierarchy = new HierarchyTableFunction
-    javaTableEnv.registerFunction("hierarchy", hierarchy)
-    scalaTable = in1.join(hierarchy('c) as ('name, 'adult, 'len))
-      .select('c, 'name, 'len, 'adult)
-    javaTable = in2.join(new Table(javaTableEnv, "AS(hierarchy(c), name, adult, len)"))
-      .select("c, name, len, adult")
-    verifyTableEquals(scalaTable, javaTable)
-
-    // test pojo type
-    val pojo = new PojoTableFunc
-    javaTableEnv.registerFunction("pojo", pojo)
-    scalaTable = in1.join(pojo('c))
-      .select('c, 'name, 'age)
-    javaTable = in2.join(new Table(javaTableEnv, "pojo(c)"))
-      .select("c, name, age")
-    verifyTableEquals(scalaTable, javaTable)
-
-    // test with filter
-    scalaTable = in1.join(func2('c) as ('name, 'len))
-      .select('c, 'name, 'len).filter('len > 2)
-    javaTable = in2.join(new Table(javaTableEnv, "func2(c) as (name, len)"))
-      .select("c, name, len").filter("len > 2")
-    verifyTableEquals(scalaTable, javaTable)
-
-    // test with scalar function
-    scalaTable = in1.join(func1('c.substring(2)) as 's)
-      .select('a, 'c, 's)
-    javaTable = in2.join(new Table(javaTableEnv, "func1(substring(c, 2)) as (s)"))
-      .select("a, c, s")
-    verifyTableEquals(scalaTable, javaTable)
-  }
-
-  @Test
   def testCrossJoin(): Unit = {
     val util = batchTestUtil()
     val table = util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)

http://git-wip-us.apache.org/repos/asf/flink/blob/ecde7bc1/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/stringexpr/AggregationsStringExpressionTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/stringexpr/AggregationsStringExpressionTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/stringexpr/AggregationsStringExpressionTest.scala
index eda958c..b085160 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/stringexpr/AggregationsStringExpressionTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/stringexpr/AggregationsStringExpressionTest.scala
@@ -19,24 +19,19 @@
 package org.apache.flink.table.api.scala.batch.table.stringexpr
 
 import org.apache.flink.api.scala._
-import org.apache.flink.api.scala.util.CollectionDataSets
-import org.apache.flink.table.api.TableEnvironment
-import org.apache.flink.table.api.scala._
-import org.apache.flink.table.utils.TableTestBase
 import org.apache.flink.table.api.java.utils.UserDefinedAggFunctions.WeightedAvgWithMergeAndReset
+import org.apache.flink.table.api.scala._
 import org.apache.flink.table.api.scala.batch.utils.LogicalPlanFormatUtils
 import org.apache.flink.table.functions.aggfunctions.CountAggFunction
+import org.apache.flink.table.utils.TableTestBase
 import org.junit._
 
 class AggregationsStringExpressionTest extends TableTestBase {
 
   @Test
   def testAggregationTypes(): Unit = {
-
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-
-    val t = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv)
+    val util = batchTestUtil()
+    val t = util.addTable[(Int, Long, String)]("Table3")
 
     val t1 = t.select('_1.sum, '_1.sum0, '_1.min, '_1.max, '_1.count, '_1.avg)
     val t2 = t.select("_1.sum, _1.sum0, _1.min, _1.max, _1.count, _1.avg")
@@ -46,13 +41,8 @@ class AggregationsStringExpressionTest extends TableTestBase {
 
   @Test
   def testWorkingAggregationDataTypes(): Unit = {
-
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-
-    val t = env.fromElements(
-      (1: Byte, 1: Short, 1, 1L, 1.0f, 1.0d, "Hello"),
-      (2: Byte, 2: Short, 2, 2L, 2.0f, 2.0d, "Ciao")).toTable(tEnv)
+    val util = batchTestUtil()
+    val t = util.addTable[(Byte, Short, Int, Long, Float, Double, String)]("Table7")
 
     val t1 = t.select('_1.avg, '_2.avg, '_3.avg, '_4.avg, '_5.avg, '_6.avg, '_7.count)
     val t2 = t.select("_1.avg, _2.avg, _3.avg, _4.avg, _5.avg, _6.avg, _7.count")
@@ -62,13 +52,8 @@ class AggregationsStringExpressionTest extends TableTestBase {
 
   @Test
   def testProjection(): Unit = {
-
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-
-    val t = env.fromElements(
-      (1: Byte, 1: Short),
-      (2: Byte, 2: Short)).toTable(tEnv)
+    val util = batchTestUtil()
+    val t = util.addTable[(Byte, Short)]("Table2")
 
     val t1 = t.select('_1.avg, '_1.sum, '_1.count, '_2.avg, '_2.sum)
     val t2 = t.select("_1.avg, _1.sum, _1.count, _2.avg, _2.sum")
@@ -78,11 +63,8 @@ class AggregationsStringExpressionTest extends TableTestBase {
 
   @Test
   def testAggregationWithArithmetic(): Unit = {
-
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-
-    val t = env.fromElements((1f, "Hello"), (2f, "Ciao")).toTable(tEnv)
+    val util = batchTestUtil()
+    val t = util.addTable[(Long, String)]("Table2")
 
     val t1 = t.select(('_1 + 2).avg + 2, '_2.count + 5)
     val t2 = t.select("(_1 + 2).avg + 2, _2.count + 5")
@@ -92,11 +74,8 @@ class AggregationsStringExpressionTest extends TableTestBase {
 
   @Test
   def testAggregationWithTwoCount(): Unit = {
-
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-
-    val t = env.fromElements((1f, "Hello"), (2f, "Ciao")).toTable(tEnv)
+    val util = batchTestUtil()
+    val t = util.addTable[(Long, String)]("Table2")
 
     val t1 = t.select('_1.count, '_2.count)
     val t2 = t.select("_1.count, _2.count")
@@ -106,13 +85,8 @@ class AggregationsStringExpressionTest extends TableTestBase {
 
   @Test
   def testAggregationAfterProjection(): Unit = {
-
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-
-    val t = env.fromElements(
-      (1: Byte, 1: Short, 1, 1L, 1.0f, 1.0d, "Hello"),
-      (2: Byte, 2: Short, 2, 2L, 2.0f, 2.0d, "Ciao")).toTable(tEnv)
+    val util = batchTestUtil()
+    val t = util.addTable[(Byte, Short, Int, Long, Float, Double, String)]("Table7")
 
     val t1 = t.select('_1, '_2, '_3)
       .select('_1.avg, '_2.sum, '_3.count)
@@ -125,10 +99,8 @@ class AggregationsStringExpressionTest extends TableTestBase {
 
   @Test
   def testDistinct(): Unit = {
-    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-
-    val ds = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
+    val util = batchTestUtil()
+    val ds = util.addTable[(Int, Long, String)]("Table3",'a, 'b, 'c)
 
     val distinct = ds.select('b).distinct()
     val distinct2 = ds.select("b").distinct()
@@ -138,10 +110,8 @@ class AggregationsStringExpressionTest extends TableTestBase {
 
   @Test
   def testDistinctAfterAggregate(): Unit = {
-    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-
-    val ds = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c, 'd, 'e)
+    val util = batchTestUtil()
+    val ds = util.addTable[(Int, Long, Int, String, Long)]("Table5", 'a, 'b, 'c, 'd, 'e)
 
     val distinct = ds.groupBy('a, 'e).select('e).distinct()
     val distinct2 = ds.groupBy("a, e").select("e").distinct()
@@ -154,11 +124,8 @@ class AggregationsStringExpressionTest extends TableTestBase {
 
   @Test
   def testGroupedAggregate(): Unit = {
-
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-
-    val t = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
+    val util = batchTestUtil()
+    val t = util.addTable[(Int, Long, String)]("Table3", 'a, 'b, 'c)
 
     val t1 = t.groupBy('b).select('b, 'a.sum)
     val t2 = t.groupBy("b").select("b, a.sum")
@@ -168,11 +135,8 @@ class AggregationsStringExpressionTest extends TableTestBase {
 
   @Test
   def testGroupingKeyForwardIfNotUsed(): Unit = {
-
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-
-    val t = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
+    val util = batchTestUtil()
+    val t = util.addTable[(Int, Long, String)]("Table3", 'a, 'b, 'c)
 
     val t1 = t.groupBy('b).select('a.sum)
     val t2 = t.groupBy("b").select("a.sum")
@@ -182,11 +146,8 @@ class AggregationsStringExpressionTest extends TableTestBase {
 
   @Test
   def testGroupNoAggregation(): Unit = {
-
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-
-    val t = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
+    val util = batchTestUtil()
+    val t = util.addTable[(Int, Long, String)]("Table3", 'a, 'b, 'c)
 
     val t1 = t
       .groupBy('b)
@@ -205,11 +166,8 @@ class AggregationsStringExpressionTest extends TableTestBase {
 
   @Test
   def testGroupedAggregateWithConstant1(): Unit = {
-
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-
-    val t = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
+    val util = batchTestUtil()
+    val t = util.addTable[(Int, Long, String)]("Table3", 'a, 'b, 'c)
 
     val t1 = t.select('a, 4 as 'four, 'b)
       .groupBy('four, 'a)
@@ -224,11 +182,8 @@ class AggregationsStringExpressionTest extends TableTestBase {
 
   @Test
   def testGroupedAggregateWithConstant2(): Unit = {
-
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-
-    val t = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
+    val util = batchTestUtil()
+    val t = util.addTable[(Int, Long, String)]("Table3", 'a, 'b, 'c)
 
     val t1 = t.select('b, 4 as 'four, 'a)
       .groupBy('b, 'four)
@@ -242,11 +197,8 @@ class AggregationsStringExpressionTest extends TableTestBase {
 
   @Test
   def testGroupedAggregateWithExpression(): Unit = {
-
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-
-    val t = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c, 'd, 'e)
+    val util = batchTestUtil()
+    val t = util.addTable[(Int, Long, Int, String, Long)]("Table5", 'a, 'b, 'c, 'd, 'e)
 
     val t1 = t.groupBy('e, 'b % 3)
       .select('c.min, 'e, 'a.avg, 'd.count)
@@ -258,11 +210,8 @@ class AggregationsStringExpressionTest extends TableTestBase {
 
   @Test
   def testGroupedAggregateWithFilter(): Unit = {
-
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-
-    val t = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
+    val util = batchTestUtil()
+    val t = util.addTable[(Int, Long, String)]("Table3", 'a, 'b, 'c)
 
     val t1 = t.groupBy('b)
       .select('b, 'a.sum)
@@ -295,16 +244,13 @@ class AggregationsStringExpressionTest extends TableTestBase {
 
   @Test
   def testAggregateWithUDAGG(): Unit = {
-
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-
-    val t = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
+    val util = batchTestUtil()
+    val t = util.addTable[(Int, Long, String)]("Table3", 'a, 'b, 'c)
 
     val myCnt = new CountAggFunction
-    tEnv.registerFunction("myCnt", myCnt)
+    util.tableEnv.registerFunction("myCnt", myCnt)
     val myWeightedAvg = new WeightedAvgWithMergeAndReset
-    tEnv.registerFunction("myWeightedAvg", myWeightedAvg)
+    util.tableEnv.registerFunction("myWeightedAvg", myWeightedAvg)
 
     val t1 = t.select(myCnt('a) as 'aCnt, myWeightedAvg('b, 'a) as 'wAvg)
     val t2 = t.select("myCnt(a) as aCnt, myWeightedAvg(b, a) as wAvg")
@@ -323,16 +269,14 @@ class AggregationsStringExpressionTest extends TableTestBase {
 
   @Test
   def testGroupedAggregateWithUDAGG(): Unit = {
+    val util = batchTestUtil()
+    val t = util.addTable[(Int, Long, String)]("Table3", 'a, 'b, 'c)
 
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-
-    val t = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
 
     val myCnt = new CountAggFunction
-    tEnv.registerFunction("myCnt", myCnt)
+    util.tableEnv.registerFunction("myCnt", myCnt)
     val myWeightedAvg = new WeightedAvgWithMergeAndReset
-    tEnv.registerFunction("myWeightedAvg", myWeightedAvg)
+    util.tableEnv.registerFunction("myWeightedAvg", myWeightedAvg)
 
     val t1 = t.groupBy('b)
       .select('b, myCnt('a) + 9 as 'aCnt, myWeightedAvg('b, 'a) * 2 as 'wAvg, myWeightedAvg('a, 'a))
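
The hunks above all apply the same conversion: instead of creating an ExecutionEnvironment and turning a CollectionDataSets source into a Table, the test extends TableTestBase, obtains a batchTestUtil() and registers a typed table under a name, so the Scala-expression and string-expression variants can be compared as logical plans without running a job. The following is a condensed sketch of that test shape; the class and method names are illustrative, and it assumes the verifyTableEquals helper provided by the table test utilities used elsewhere in this patch.

import org.apache.flink.api.scala._
import org.apache.flink.table.api.scala._
import org.apache.flink.table.utils.TableTestBase
import org.junit.Test

// Illustrative class and test names; the structure mirrors the converted tests above.
class GroupedSumStringExpressionSketch extends TableTestBase {

  @Test
  def testGroupedSum(): Unit = {
    val util = batchTestUtil()
    // register a (Int, Long, String) table under a name instead of executing a DataSet
    val t = util.addTable[(Int, Long, String)]("Table3", 'a, 'b, 'c)

    // the same query, once via Scala expressions and once via string expressions
    val t1 = t.groupBy('b).select('b, 'a.sum)
    val t2 = t.groupBy("b").select("b, a.sum")

    // compare the two logical plans instead of comparing job results
    verifyTableEquals(t1, t2)
  }
}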

http://git-wip-us.apache.org/repos/asf/flink/blob/ecde7bc1/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/stringexpr/CalcStringExpressionTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/stringexpr/CalcStringExpressionTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/stringexpr/CalcStringExpressionTest.scala
index 381b8c8..9736ec1 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/stringexpr/CalcStringExpressionTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/stringexpr/CalcStringExpressionTest.scala
@@ -21,21 +21,20 @@ package org.apache.flink.table.api.scala.batch.table.stringexpr
 import java.sql.{Date, Time, Timestamp}
 
 import org.apache.flink.api.scala._
-import org.apache.flink.api.scala.util.CollectionDataSets
+import org.apache.flink.api.scala.util.CollectionDataSets.CustomType
+import org.apache.flink.table.api.Types
 import org.apache.flink.table.api.scala._
 import org.apache.flink.table.api.scala.batch.utils.LogicalPlanFormatUtils
-import org.apache.flink.table.api.{TableEnvironment, Types}
 import org.apache.flink.table.expressions.Literal
+import org.apache.flink.table.utils.TableTestBase
 import org.junit._
 
-class CalcStringExpressionTest {
+class CalcStringExpressionTest extends TableTestBase {
 
   @Test
   def testSimpleSelectAllWithAs(): Unit = {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-
-    val t = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
+    val util = batchTestUtil()
+    val t = util.addTable[(Int, Long, String)]("Table3",'a, 'b, 'c)
 
     val t1 = t.select('a, 'b, 'c)
     val t2 = t.select("a, b, c")
@@ -48,10 +47,8 @@ class CalcStringExpressionTest {
 
   @Test
   def testSimpleSelectWithNaming(): Unit = {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-
-    val t = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv)
+    val util = batchTestUtil()
+    val t = util.addTable[(Int, Long, String)]("Table3")
 
     val t1 = t
       .select('_1 as 'a, '_2 as 'b, '_1 as 'c)
@@ -69,10 +66,8 @@ class CalcStringExpressionTest {
 
   @Test
   def testSimpleSelectRenameAll(): Unit = {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-
-    val t = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv)
+    val util = batchTestUtil()
+    val t = util.addTable[(Int, Long, String)]("Table3")
 
     val t1 = t
       .select('_1 as 'a, '_2 as 'b, '_3 as 'c)
@@ -90,10 +85,8 @@ class CalcStringExpressionTest {
 
   @Test
   def testSelectStar(): Unit = {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-
-    val t = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
+    val util = batchTestUtil()
+    val t = util.addTable[(Int, Long, String)]("Table3",'a, 'b, 'c)
 
     val t1 = t.select('*)
     val t2 = t.select("*")
@@ -106,10 +99,8 @@ class CalcStringExpressionTest {
 
   @Test
   def testAllRejectingFilter(): Unit = {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-
-    val ds = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
+    val util = batchTestUtil()
+    val ds = util.addTable[(Int, Long, String)]("Table3",'a, 'b, 'c)
 
     val t1 = ds.filter( Literal(false) )
     val t2 = ds.filter("faLsE")
@@ -122,10 +113,8 @@ class CalcStringExpressionTest {
 
   @Test
   def testAllPassingFilter(): Unit = {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-
-    val ds = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
+    val util = batchTestUtil()
+    val ds = util.addTable[(Int, Long, String)]("Table3",'a, 'b, 'c)
 
     val t1 = ds.filter( Literal(true) )
     val t2 = ds.filter("trUe")
@@ -138,10 +127,8 @@ class CalcStringExpressionTest {
 
   @Test
   def testFilterOnStringTupleField(): Unit = {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-
-    val ds = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
+    val util = batchTestUtil()
+    val ds = util.addTable[(Int, Long, String)]("Table3",'a, 'b, 'c)
 
     val t1 = ds.filter( 'c.like("%world%") )
     val t2 = ds.filter("c.like('%world%')")
@@ -154,10 +141,8 @@ class CalcStringExpressionTest {
 
   @Test
   def testFilterOnIntegerTupleField(): Unit = {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-
-    val ds = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
+    val util = batchTestUtil()
+    val ds = util.addTable[(Int, Long, String)]("Table3",'a, 'b, 'c)
 
     val t1 = ds.filter( 'a % 2 === 0 )
     val t2 = ds.filter( "a % 2 = 0 ")
@@ -170,10 +155,8 @@ class CalcStringExpressionTest {
 
   @Test
   def testNotEquals(): Unit = {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-
-    val ds = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
+    val util = batchTestUtil()
+    val ds = util.addTable[(Int, Long, String)]("Table3",'a, 'b, 'c)
 
     val t1 = ds.filter( 'a % 2 !== 0 )
     val t2 = ds.filter("a % 2 <> 0")
@@ -186,10 +169,8 @@ class CalcStringExpressionTest {
 
   @Test
   def testDisjunctivePredicate(): Unit = {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-
-    val ds = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
+    val util = batchTestUtil()
+    val ds = util.addTable[(Int, Long, String)]("Table3",'a, 'b, 'c)
 
     val t1 = ds.filter( 'a < 2 || 'a > 20)
     val t2 = ds.filter("a < 2 || a > 20")
@@ -202,10 +183,8 @@ class CalcStringExpressionTest {
 
   @Test
   def testConsecutiveFilters(): Unit = {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-
-    val ds = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
+    val util = batchTestUtil()
+    val ds = util.addTable[(Int, Long, String)]("Table3",'a, 'b, 'c)
 
     val t1 = ds.filter('a % 2 !== 0).filter('b % 2 === 0)
     val t2 = ds.filter("a % 2 != 0").filter("b % 2 = 0")
@@ -218,10 +197,8 @@ class CalcStringExpressionTest {
 
   @Test
   def testFilterBasicType(): Unit = {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-
-    val ds = CollectionDataSets.getStringDataSet(env).toTable(tEnv, 'a)
+    val util = batchTestUtil()
+    val ds = util.addTable[String]("Table3",'a)
 
     val t1 = ds.filter( 'a.like("H%") )
     val t2 = ds.filter( "a.like('H%')" )
@@ -234,11 +211,8 @@ class CalcStringExpressionTest {
 
   @Test
   def testFilterOnCustomType(): Unit = {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-
-    val ds = CollectionDataSets.getCustomTypeDataSet(env)
-    val t = ds.toTable(tEnv, 'myInt as 'i, 'myLong as 'l, 'myString as 's)
+    val util = batchTestUtil()
+    val t = util.addTable[CustomType]("Table3",'myInt as 'i, 'myLong as 'l, 'myString as 's)
 
     val t1 = t.filter( 's.like("%a%") )
     val t2 = t.filter("s.like('%a%')")
@@ -251,10 +225,8 @@ class CalcStringExpressionTest {
 
   @Test
   def testSimpleCalc(): Unit = {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-
-    val t = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv)
+    val util = batchTestUtil()
+    val t = util.addTable[(Int, Long, String)]("Table3")
 
     val t1 = t.select('_1, '_2, '_3)
       .where('_1 < 7)
@@ -272,10 +244,8 @@ class CalcStringExpressionTest {
 
   @Test
   def testCalcWithTwoFilters(): Unit = {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-
-    val t = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv)
+    val util = batchTestUtil()
+    val t = util.addTable[(Int, Long, String)]("Table3")
 
     val t1 = t.select('_1, '_2, '_3)
       .where('_1 < 7 && '_2 === 3)
@@ -297,10 +267,8 @@ class CalcStringExpressionTest {
 
   @Test
   def testCalcWithAggregation(): Unit = {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-
-    val t = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv)
+    val util = batchTestUtil()
+    val t = util.addTable[(Int, Long, String)]("Table3")
 
     val t1 = t.select('_1, '_2, '_3)
       .where('_1 < 15)
@@ -325,11 +293,9 @@ class CalcStringExpressionTest {
 
   @Test
   def testCalcJoin(): Unit = {
-    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-
-    val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
-    val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'd, 'e, 'f, 'g, 'h)
+    val util = batchTestUtil()
+    val ds1 = util.addTable[(Int, Long, String)]("Table3",'a, 'b, 'c)
+    val ds2 = util.addTable[(Int, Long, Int, String, Long)]("Table5", 'd, 'e, 'f, 'g, 'h)
 
     val t1 = ds1.select('a, 'b).join(ds2).where('b === 'e).select('a, 'b, 'd, 'e, 'f)
       .where('b > 1).select('a, 'd).where('d === 2)
@@ -344,17 +310,9 @@ class CalcStringExpressionTest {
 
   @Test
   def testAdvancedDataTypes(): Unit = {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-
-    val t = env
-      .fromElements((
-        BigDecimal("78.454654654654654").bigDecimal,
-        BigDecimal("4E+9999").bigDecimal,
-        Date.valueOf("1984-07-12"),
-        Time.valueOf("14:34:24"),
-        Timestamp.valueOf("1984-07-12 14:34:24")))
-      .toTable(tEnv, 'a, 'b, 'c, 'd, 'e)
+    val util = batchTestUtil()
+    val t = util
+      .addTable[(BigDecimal, BigDecimal, Date, Time, Timestamp)]("Table5", 'a, 'b, 'c, 'd, 'e)
 
     val t1 = t.select('a, 'b, 'c, 'd, 'e, BigDecimal("11.2"), BigDecimal("11.2").bigDecimal,
         "1984-07-12".cast(Types.SQL_DATE), "14:34:24".cast(Types.SQL_TIME),
@@ -371,9 +329,8 @@ class CalcStringExpressionTest {
 
   @Test
   def testIntegerBiggerThan128(): Unit = {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tableEnv = TableEnvironment.getTableEnvironment(env)
-    val t = env.fromElements((300, 1L, "Hello")).toTable(tableEnv, 'a, 'b, 'c)
+    val util = batchTestUtil()
+    val t = util.addTable[(Int, Long, String)]("Table3",'a, 'b, 'c)
 
     val t1 = t.filter('a === 300)
     val t2 = t.filter("a = 300")

http://git-wip-us.apache.org/repos/asf/flink/blob/ecde7bc1/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/stringexpr/CastingStringExpressionTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/stringexpr/CastingStringExpressionTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/stringexpr/CastingStringExpressionTest.scala
index 8a2db41..8ca5745 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/stringexpr/CastingStringExpressionTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/stringexpr/CastingStringExpressionTest.scala
@@ -27,7 +27,7 @@ import org.junit._
 class CastingStringExpressionTest {
 
   @Test
-  def testNumericAutocastInArithmetic() {
+  def testNumericAutoCastInArithmetic() {
 
     val env = ExecutionEnvironment.getExecutionEnvironment
     val tableEnv = TableEnvironment.getTableEnvironment(env)
@@ -47,7 +47,7 @@ class CastingStringExpressionTest {
 
   @Test
   @throws[Exception]
-  def testNumericAutocastInComparison() {
+  def testNumericAutoCastInComparison() {
     val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
     val tableEnv = TableEnvironment.getTableEnvironment(env)
 

http://git-wip-us.apache.org/repos/asf/flink/blob/ecde7bc1/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/stringexpr/JoinStringExpressionTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/stringexpr/JoinStringExpressionTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/stringexpr/JoinStringExpressionTest.scala
index b2f683c..067441d 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/stringexpr/JoinStringExpressionTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/stringexpr/JoinStringExpressionTest.scala
@@ -19,22 +19,19 @@
 package org.apache.flink.table.api.scala.batch.table.stringexpr
 
 import org.apache.flink.api.scala._
-import org.apache.flink.api.scala.util.CollectionDataSets
-import org.apache.flink.table.api.TableEnvironment
 import org.apache.flink.table.api.scala._
 import org.apache.flink.table.api.scala.batch.utils.LogicalPlanFormatUtils
 import org.apache.flink.table.expressions.Literal
+import org.apache.flink.table.utils.TableTestBase
 import org.junit._
 
-class JoinStringExpressionTest {
+class JoinStringExpressionTest extends TableTestBase {
 
   @Test
   def testJoin(): Unit = {
-    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-
-    val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
-    val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'd, 'e, 'f, 'g, 'h)
+    val util = batchTestUtil()
+    val ds1 = util.addTable[(Int, Long, String)]("Table3",'a, 'b, 'c)
+    val ds2 = util.addTable[(Int, Long, Int, String, Long)]("Table5", 'd, 'e, 'f, 'g, 'h)
 
     val t1Scala = ds1.join(ds2).where('b === 'e).select('c, 'g)
     val t1Java = ds1.join(ds2).where("b === e").select("c, g")
@@ -47,12 +44,9 @@ class JoinStringExpressionTest {
 
   @Test
   def testJoinWithFilter(): Unit = {
-
-    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-
-    val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv).as('a, 'b, 'c)
-    val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv).as('d, 'e, 'f, 'g, 'h)
+    val util = batchTestUtil()
+    val ds1 = util.addTable[(Int, Long, String)]("Table3",'a, 'b, 'c)
+    val ds2 = util.addTable[(Int, Long, Int, String, Long)]("Table5", 'd, 'e, 'f, 'g, 'h)
 
     val t1Scala = ds1.join(ds2).where('b === 'e && 'b < 2).select('c, 'g)
     val t1Java = ds1.join(ds2).where("b === e && b < 2").select("c, g")
@@ -65,11 +59,9 @@ class JoinStringExpressionTest {
 
   @Test
   def testJoinWithJoinFilter(): Unit = {
-    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-
-    val ds1 = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
-    val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'd, 'e, 'f, 'g, 'h)
+    val util = batchTestUtil()
+    val ds1 = util.addTable[(Int, Long, String)]("Table3",'a, 'b, 'c)
+    val ds2 = util.addTable[(Int, Long, Int, String, Long)]("Table5", 'd, 'e, 'f, 'g, 'h)
 
     val t1Scala = ds1.join(ds2).where('b === 'e && 'a < 6 && 'h < 'b).select('c, 'g)
     val t1Java = ds1.join(ds2).where("b === e && a < 6 && h < b").select("c, g")
@@ -82,11 +74,9 @@ class JoinStringExpressionTest {
 
   @Test
   def testJoinWithMultipleKeys(): Unit = {
-    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-
-    val ds1 = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
-    val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'd, 'e, 'f, 'g, 'h)
+    val util = batchTestUtil()
+    val ds1 = util.addTable[(Int, Long, String)]("Table3",'a, 'b, 'c)
+    val ds2 = util.addTable[(Int, Long, Int, String, Long)]("Table5", 'd, 'e, 'f, 'g, 'h)
 
     val t1Scala = ds1.join(ds2).filter('a === 'd && 'b === 'h).select('c, 'g)
     val t1Java = ds1.join(ds2).filter("a === d && b === h").select("c, g")
@@ -99,11 +89,9 @@ class JoinStringExpressionTest {
 
   @Test
   def testJoinWithAggregation(): Unit = {
-    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-
-    val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
-    val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'd, 'e, 'f, 'g, 'h)
+    val util = batchTestUtil()
+    val ds1 = util.addTable[(Int, Long, String)]("Table3",'a, 'b, 'c)
+    val ds2 = util.addTable[(Int, Long, Int, String, Long)]("Table5", 'd, 'e, 'f, 'g, 'h)
 
     val t1Scala = ds1.join(ds2).where('a === 'd).select('g.count)
     val t1Java = ds1.join(ds2).where("a === d").select("g.count")
@@ -118,11 +106,9 @@ class JoinStringExpressionTest {
 
   @Test
   def testJoinWithGroupedAggregation(): Unit = {
-    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-
-    val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
-    val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'd, 'e, 'f, 'g, 'h)
+    val util = batchTestUtil()
+    val ds1 = util.addTable[(Int, Long, String)]("Table3",'a, 'b, 'c)
+    val ds2 = util.addTable[(Int, Long, Int, String, Long)]("Table5", 'd, 'e, 'f, 'g, 'h)
 
     val t1 = ds1.join(ds2)
       .where('a === 'd)
@@ -143,12 +129,10 @@ class JoinStringExpressionTest {
 
   @Test
   def testJoinPushThroughJoin(): Unit = {
-    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-
-    val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
-    val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'd, 'e, 'f, 'g, 'h)
-    val ds3 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 'j, 'k, 'l)
+    val util = batchTestUtil()
+    val ds1 = util.addTable[(Int, Long, String)]("Table3",'a, 'b, 'c)
+    val ds3 = util.addTable[(Int, Long, String)]("Table4",'j, 'k, 'l)
+    val ds2 = util.addTable[(Int, Long, Int, String, Long)]("Table5", 'd, 'e, 'f, 'g, 'h)
 
     val t1 = ds1.join(ds2)
       .where(Literal(true))
@@ -169,11 +153,9 @@ class JoinStringExpressionTest {
 
   @Test
   def testJoinWithDisjunctivePred(): Unit = {
-    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-
-    val ds1 = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
-    val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'd, 'e, 'f, 'g, 'h)
+    val util = batchTestUtil()
+    val ds1 = util.addTable[(Int, Long, String)]("Table3",'a, 'b, 'c)
+    val ds2 = util.addTable[(Int, Long, Int, String, Long)]("Table5", 'd, 'e, 'f, 'g, 'h)
 
     val t1 = ds1.join(ds2).filter('a === 'd && ('b === 'e || 'b === 'e - 10)).select('c, 'g)
     val t2 = ds1.join(ds2).filter("a = d && (b = e || b = e - 10)").select("c, g")
@@ -186,11 +168,9 @@ class JoinStringExpressionTest {
 
   @Test
   def testJoinWithExpressionPreds(): Unit = {
-    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-
-    val ds1 = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
-    val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'd, 'e, 'f, 'g, 'h)
+    val util = batchTestUtil()
+    val ds1 = util.addTable[(Int, Long, String)]("Table3",'a, 'b, 'c)
+    val ds2 = util.addTable[(Int, Long, Int, String, Long)]("Table5", 'd, 'e, 'f, 'g, 'h)
 
     val t1 = ds1.join(ds2).filter('b === 'h + 1 && 'a - 1 === 'd + 2).select('c, 'g)
     val t2 = ds1.join(ds2).filter("b = h + 1 && a - 1 = d + 2").select("c, g")
@@ -203,12 +183,9 @@ class JoinStringExpressionTest {
 
   @Test
   def testLeftJoinWithMultipleKeys(): Unit = {
-    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-    tEnv.getConfig.setNullCheck(true)
-
-    val ds1 = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
-    val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'd, 'e, 'f, 'g, 'h)
+    val util = batchTestUtil()
+    val ds1 = util.addTable[(Int, Long, String)]("Table3",'a, 'b, 'c)
+    val ds2 = util.addTable[(Int, Long, Int, String, Long)]("Table5", 'd, 'e, 'f, 'g, 'h)
 
     val t1 = ds1.leftOuterJoin(ds2, 'a === 'd && 'b === 'h).select('c, 'g)
     val t2 = ds1.leftOuterJoin(ds2, "a = d && b = h").select("c, g")
@@ -221,12 +198,9 @@ class JoinStringExpressionTest {
 
   @Test
   def testRightJoinWithMultipleKeys(): Unit = {
-    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-    tEnv.getConfig.setNullCheck(true)
-
-    val ds1 = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
-    val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'd, 'e, 'f, 'g, 'h)
+    val util = batchTestUtil()
+    val ds1 = util.addTable[(Int, Long, String)]("Table3",'a, 'b, 'c)
+    val ds2 = util.addTable[(Int, Long, Int, String, Long)]("Table5", 'd, 'e, 'f, 'g, 'h)
 
     val t1 = ds1.rightOuterJoin(ds2, 'a === 'd && 'b === 'h).select('c, 'g)
     val t2 = ds1.rightOuterJoin(ds2, "a = d && b = h").select("c, g")
@@ -239,12 +213,9 @@ class JoinStringExpressionTest {
 
   @Test
   def testFullOuterJoinWithMultipleKeys(): Unit = {
-    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-    tEnv.getConfig.setNullCheck(true)
-
-    val ds1 = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
-    val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'd, 'e, 'f, 'g, 'h)
+    val util = batchTestUtil()
+    val ds1 = util.addTable[(Int, Long, String)]("Table3",'a, 'b, 'c)
+    val ds2 = util.addTable[(Int, Long, Int, String, Long)]("Table5", 'd, 'e, 'f, 'g, 'h)
 
     val t1 = ds1.fullOuterJoin(ds2, 'a === 'd && 'b === 'h).select('c, 'g)
     val t2 = ds1.fullOuterJoin(ds2, "a = d && b = h").select("c, g")

http://git-wip-us.apache.org/repos/asf/flink/blob/ecde7bc1/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/stringexpr/UserDefinedTableFunctionStringExpressionTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/stringexpr/UserDefinedTableFunctionStringExpressionTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/stringexpr/UserDefinedTableFunctionStringExpressionTest.scala
new file mode 100644
index 0000000..8cf9979
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/stringexpr/UserDefinedTableFunctionStringExpressionTest.scala
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.api.scala.batch.table.stringexpr
+
+import org.apache.flink.api.scala._
+import org.apache.flink.table.api.scala._
+import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.types.Row
+import org.apache.flink.table.utils.{PojoTableFunc, TableFunc2, _}
+import org.apache.flink.table.api.{Table, Types}
+import org.junit.Test
+
+class UserDefinedTableFunctionStringExpressionTest extends TableTestBase {
+
+  @Test
+  def testJoin(): Unit = {
+    val util = batchTestUtil()
+
+    val typeInfo = new RowTypeInfo(Seq(Types.INT, Types.LONG, Types.STRING): _*)
+    val sTab = util.addTable[(Int, Long, String)]("Table1", 'a, 'b, 'c)
+    val jTab = util.addJavaTable[Row](typeInfo, "Table2", "a, b, c")
+
+    // test cross join
+    val func1 = new TableFunc1
+    util.javaTableEnv.registerFunction("func1", func1)
+    var scalaTable = sTab.join(func1('c) as 's).select('c, 's)
+    var javaTable = jTab.join(new Table(util.javaTableEnv, "func1(c).as(s)")).select("c, s")
+    verifyTableEquals(scalaTable, javaTable)
+
+    // test left outer join
+    scalaTable = sTab.leftOuterJoin(func1('c) as 's).select('c, 's)
+    javaTable = jTab.leftOuterJoin(new Table(util.javaTableEnv, "as(func1(c), s)")).select("c, s")
+    verifyTableEquals(scalaTable, javaTable)
+
+    // test overloading
+    scalaTable = sTab.join(func1('c, "$") as 's).select('c, 's)
+    javaTable = jTab.join(new Table(util.javaTableEnv, "func1(c, '$') as (s)")).select("c, s")
+    verifyTableEquals(scalaTable, javaTable)
+
+    // test custom result type
+    val func2 = new TableFunc2
+    util.javaTableEnv.registerFunction("func2", func2)
+    scalaTable = sTab.join(func2('c) as('name, 'len)).select('c, 'name, 'len)
+    javaTable = jTab.join(
+      new Table(util.javaTableEnv, "func2(c).as(name, len)")).select("c, name, len")
+    verifyTableEquals(scalaTable, javaTable)
+
+    // test hierarchy generic type
+    val hierarchy = new HierarchyTableFunction
+    util.javaTableEnv.registerFunction("hierarchy", hierarchy)
+    scalaTable = sTab.join(hierarchy('c) as('name, 'adult, 'len)).select('c, 'name, 'len, 'adult)
+    javaTable = jTab.join(new Table(util.javaTableEnv, "AS(hierarchy(c), name, adult, len)"))
+      .select("c, name, len, adult")
+    verifyTableEquals(scalaTable, javaTable)
+
+    // test pojo type
+    val pojo = new PojoTableFunc
+    util.javaTableEnv.registerFunction("pojo", pojo)
+    scalaTable = sTab.join(pojo('c)).select('c, 'name, 'age)
+    javaTable = jTab.join(new Table(util.javaTableEnv, "pojo(c)")).select("c, name, age")
+    verifyTableEquals(scalaTable, javaTable)
+
+    // test with filter
+    scalaTable = sTab.join(func2('c) as('name, 'len)).select('c, 'name, 'len).filter('len > 2)
+    javaTable = jTab.join(new Table(util.javaTableEnv, "func2(c) as (name, len)"))
+      .select("c, name, len").filter("len > 2")
+    verifyTableEquals(scalaTable, javaTable)
+
+    // test with scalar function
+    scalaTable = sTab.join(func1('c.substring(2)) as 's).select('a, 'c, 's)
+    javaTable = jTab.join(
+      new Table(util.javaTableEnv, "func1(substring(c, 2)) as (s)")).select("a, c, s")
+    verifyTableEquals(scalaTable, javaTable)
+  }
+}
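
The new test above drives the same table-function queries through the Scala and Java Table APIs against tables that are backed only by type information, replacing the Mockito-based setup removed earlier in this patch. Here is a trimmed-down sketch of that setup, reduced to the single cross-join check; the class and method names are illustrative.

import org.apache.flink.api.java.typeutils.RowTypeInfo
import org.apache.flink.api.scala._
import org.apache.flink.table.api.scala._
import org.apache.flink.table.api.{Table, Types}
import org.apache.flink.table.utils.{TableFunc1, TableTestBase}
import org.apache.flink.types.Row
import org.junit.Test

// Illustrative class and test names; the calls mirror the new test file above.
class JavaScalaUdtfEqualitySketch extends TableTestBase {

  @Test
  def testCrossJoinEquality(): Unit = {
    val util = batchTestUtil()

    // a Scala-API table and a Java-API table with the same schema
    val typeInfo = new RowTypeInfo(Seq(Types.INT, Types.LONG, Types.STRING): _*)
    val sTab = util.addTable[(Int, Long, String)]("Table1", 'a, 'b, 'c)
    val jTab = util.addJavaTable[Row](typeInfo, "Table2", "a, b, c")

    // the table function is registered in the Java environment, where it is
    // referenced by name inside the string expression
    val func1 = new TableFunc1
    util.javaTableEnv.registerFunction("func1", func1)

    val scalaTable = sTab.join(func1('c) as 's).select('c, 's)
    val javaTable = jTab.join(new Table(util.javaTableEnv, "func1(c).as(s)")).select("c, s")

    // both logical plans must be identical
    verifyTableEquals(scalaTable, javaTable)
  }
}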

http://git-wip-us.apache.org/repos/asf/flink/blob/ecde7bc1/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/validation/AggregationsValidationTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/validation/AggregationsValidationTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/validation/AggregationsValidationTest.scala
index 8e90fa8..25ecd96 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/validation/AggregationsValidationTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/validation/AggregationsValidationTest.scala
@@ -19,24 +19,24 @@
 package org.apache.flink.table.api.scala.batch.table.validation
 
 import org.apache.flink.api.scala._
-import org.apache.flink.api.scala.util.CollectionDataSets
 import org.apache.flink.table.api.java.utils.UserDefinedAggFunctions.{OverAgg0, WeightedAvgWithMergeAndReset}
 import org.apache.flink.table.api.scala._
 import org.apache.flink.table.api.{TableEnvironment, ValidationException}
+import org.apache.flink.table.utils.TableTestBase
 import org.junit._
 
-class AggregationsValidationTest {
+class AggregationsValidationTest extends TableTestBase {
 
   /**
     * OVER clause is necessary for [[OverAgg0]] window function.
     */
   @Test(expected = classOf[ValidationException])
   def testOverAggregation(): Unit = {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tableEnv = TableEnvironment.getTableEnvironment(env)
-    val input = CollectionDataSets.get3TupleDataSet(env).toTable(tableEnv, 'a, 'b, 'c)
+    val util = batchTestUtil()
+    val t = util.addTable[(Int, Long, String)]("Table3",'a, 'b, 'c)
+
     val overAgg = new OverAgg0
-    input.select('c.count, overAgg('b, 'a))
+    t.select('c.count, overAgg('b, 'a))
   }
 
   @Test(expected = classOf[ValidationException])
@@ -52,199 +52,176 @@ class AggregationsValidationTest {
 
   @Test(expected = classOf[ValidationException])
   def testNoNestedAggregations(): Unit = {
+    val util = batchTestUtil()
+    val t = util.addTable[(String, Int)]("Table2")
 
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-
-    env.fromElements(("Hello", 1)).toTable(tEnv)
-      // Must fail. Sum aggregation can not be chained.
-      .select('_2.sum.sum)
+    // Must fail. Sum aggregation can not be chained.
+    t.select('_2.sum.sum)
   }
 
   @Test(expected = classOf[ValidationException])
   def testGroupingOnNonExistentField(): Unit = {
+    val util = batchTestUtil()
+    val t = util.addTable[(Int, Long, String)]("Table3", 'a, 'b, 'c)
 
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-
-    CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
-      // must fail. '_foo not a valid field
-      .groupBy('_foo)
-      .select('a.avg)
+    // must fail. '_foo is not a valid field
+    t.groupBy('_foo)
+      .select('a.avg)
   }
 
   @Test(expected = classOf[ValidationException])
   def testGroupingInvalidSelection(): Unit = {
+    val util = batchTestUtil()
+    val t = util.addTable[(Int, Long, String)]("Table3", 'a, 'b, 'c)
 
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-
-    CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
-      .groupBy('a, 'b)
-      // must fail. 'c is not a grouping key or aggregation
-      .select('c)
+    t.groupBy('a, 'b)
+      // must fail. 'c is not a grouping key or aggregation
+      .select('c)
   }
 
   @Test(expected = classOf[ValidationException])
   def testAggregationOnNonExistingField(): Unit = {
 
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
+    val util = batchTestUtil()
+    val t = util.addTable[(Int, Long, String)]("Table3", 'a, 'b, 'c)
 
-    val t = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv)
-      // Must fail. Field 'foo does not exist.
-      .select('foo.avg)
+    // Must fail. Field 'foo does not exist.
+    t.select('foo.avg)
   }
 
   @Test(expected = classOf[ValidationException])
   @throws[Exception]
   def testInvalidUdAggArgs() {
-    val env= ExecutionEnvironment.getExecutionEnvironment
-    val tableEnv = TableEnvironment.getTableEnvironment(env)
+    val util = batchTestUtil()
+    val t = util.addTable[(Int, Long, String)]("Table3", 'a, 'b, 'c)
 
     val myWeightedAvg = new WeightedAvgWithMergeAndReset
 
-    val input = CollectionDataSets.get3TupleDataSet(env).toTable(tableEnv, 'a, 'b, 'c)
-    input
-      // must fail. UDAGG does not accept String type
-      .select(myWeightedAvg('c, 'a))
+    // must fail. UDAGG does not accept String type
+    t.select(myWeightedAvg('c, 'a))
   }
 
   @Test(expected = classOf[ValidationException])
   @throws[Exception]
   def testGroupingInvalidUdAggArgs() {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tableEnv = TableEnvironment.getTableEnvironment(env)
+    val util = batchTestUtil()
+    val t = util.addTable[(Int, Long, String)]("Table3", 'a, 'b, 'c)
 
     val myWeightedAvg = new WeightedAvgWithMergeAndReset
 
-    val input = CollectionDataSets.get3TupleDataSet(env).toTable(tableEnv, 'a, 'b, 'c)
-    input
-      .groupBy('b)
-      // must fail. UDAGG does not accept String type
-      .select(myWeightedAvg('c, 'a))
+    t.groupBy('b)
+      // must fail. UDAGG does not accept String type
+      .select(myWeightedAvg('c, 'a))
   }
 
   @Test(expected = classOf[ValidationException])
   @throws[Exception]
   def testGroupingNestedUdAgg() {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tableEnv = TableEnvironment.getTableEnvironment(env)
+    val util = batchTestUtil()
+    val t = util.addTable[(Int, Long, String)]("Table3", 'a, 'b, 'c)
 
     val myWeightedAvg = new WeightedAvgWithMergeAndReset
 
-    val input = CollectionDataSets.get3TupleDataSet(env).toTable(tableEnv, 'a, 'b, 'c)
-    input
-      .groupBy('c)
-      // must fail. UDAGG does not accept String type
-      .select(myWeightedAvg(myWeightedAvg('b, 'a), 'a))
+    t.groupBy('c)
+      // must fail. Aggregation on aggregation is not allowed
+      .select(myWeightedAvg(myWeightedAvg('b, 'a), 'a))
   }
 
   @Test(expected = classOf[ValidationException])
   @throws[Exception]
   def testAggregationOnNonExistingFieldJava() {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tableEnv = TableEnvironment.getTableEnvironment(env)
-    val table = CollectionDataSets.get3TupleDataSet(env).toTable(tableEnv)
-    table.select("foo.avg")
+    val util = batchTestUtil()
+    val t = util.addTable[(Int, Long, String)]("Table3", 'a, 'b, 'c)
+
+    t.select("foo.avg")
   }
 
   @Test(expected = classOf[ValidationException])
   @throws[Exception]
   def testNonWorkingAggregationDataTypesJava() {
-    val env= ExecutionEnvironment.getExecutionEnvironment
-    val tableEnv = TableEnvironment.getTableEnvironment(env)
-    val table = env.fromElements((1f, "Hello")).toTable(tableEnv)
+    val util = batchTestUtil()
+    val t = util.addTable[(Long, String)]("Table2",'b, 'c)
     // Must fail. Cannot compute SUM aggregate on String field.
-    table.select("f1.sum")
+    t.select("c.sum")
   }
 
   @Test(expected = classOf[ValidationException])
   @throws[Exception]
   def testNoNestedAggregationsJava() {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tableEnv = TableEnvironment.getTableEnvironment(env)
-    val table = env.fromElements((1f, "Hello")).toTable(tableEnv)
+    val util = batchTestUtil()
+    val t = util.addTable[(Long, String)]("Table2",'b, 'c)
     // Must fail. Aggregation on aggregation not allowed.
-    table.select("f0.sum.sum")
+    t.select("b.sum.sum")
   }
 
   @Test(expected = classOf[ValidationException])
   @throws[Exception]
   def testGroupingOnNonExistentFieldJava() {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tableEnv = TableEnvironment.getTableEnvironment(env)
-    val input = CollectionDataSets.get3TupleDataSet(env).toTable(tableEnv, 'a, 'b, 'c)
-    input
-      // must fail. Field foo is not in input
-      .groupBy("foo")
-      .select("a.avg")
+    val util = batchTestUtil()
+    val t = util.addTable[(Int, Long, String)]("Table3", 'a, 'b, 'c)
+
+    // must fail. Field foo is not in input
+    t.groupBy("foo")
+    .select("a.avg")
   }
 
   @Test(expected = classOf[ValidationException])
   @throws[Exception]
   def testGroupingInvalidSelectionJava() {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tableEnv = TableEnvironment.getTableEnvironment(env)
-    val input = CollectionDataSets.get3TupleDataSet(env).toTable(tableEnv, 'a, 'b, 'c)
-    input
-      .groupBy("a, b")
-      // must fail. Field c is not a grouping key or aggregation
-      .select("c")
+    val util = batchTestUtil()
+    val t = util.addTable[(Int, Long, String)]("Table3", 'a, 'b, 'c)
+
+    t.groupBy("a, b")
+    // must fail. Field c is not a grouping key or aggregation
+    .select("c")
   }
 
   @Test(expected = classOf[ValidationException])
   @throws[Exception]
   def testUnknownUdAggJava() {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tableEnv = TableEnvironment.getTableEnvironment(env)
-    val input = CollectionDataSets.get3TupleDataSet(env).toTable(tableEnv, 'a, 'b, 'c)
-    input
-      // must fail. unknown is not known
-      .select("unknown(c)")
+    val util = batchTestUtil()
+    val t = util.addTable[(Int, Long, String)]("Table3", 'a, 'b, 'c)
+
+    // must fail. unknown is not known
+    t.select("unknown(c)")
   }
 
   @Test(expected = classOf[ValidationException])
   @throws[Exception]
   def testGroupingUnknownUdAggJava() {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tableEnv = TableEnvironment.getTableEnvironment(env)
-    val input = CollectionDataSets.get3TupleDataSet(env).toTable(tableEnv, 'a, 'b, 'c)
-    input
-      .groupBy("a, b")
-      // must fail. unknown is not known
-      .select("unknown(c)")
+    val util = batchTestUtil()
+    val t = util.addTable[(Int, Long, String)]("Table3", 'a, 'b, 'c)
+
+    t.groupBy("a, b")
+    // must fail. unknown is not known
+    .select("unknown(c)")
   }
 
   @Test(expected = classOf[ValidationException])
   @throws[Exception]
   def testInvalidUdAggArgsJava() {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tableEnv = TableEnvironment.getTableEnvironment(env)
+    val util = batchTestUtil()
+    val t = util.addTable[(Int, Long, String)]("Table3", 'a, 'b, 'c)
 
     val myWeightedAvg = new WeightedAvgWithMergeAndReset
-    tableEnv.registerFunction("myWeightedAvg", myWeightedAvg)
+    util.tableEnv.registerFunction("myWeightedAvg", myWeightedAvg)
 
-    val input = CollectionDataSets.get3TupleDataSet(env).toTable(tableEnv, 'a, 'b, 'c)
-    input
-      // must fail. UDAGG does not accept String type
-      .select("myWeightedAvg(c, a)")
+    // must fail. UDAGG does not accept String type
+    t.select("myWeightedAvg(c, a)")
   }
 
   @Test(expected = classOf[ValidationException])
   @throws[Exception]
   def testGroupingInvalidUdAggArgsJava() {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tableEnv = TableEnvironment.getTableEnvironment(env)
+    val util = batchTestUtil()
+    val t = util.addTable[(Int, Long, String)]("Table3", 'a, 'b, 'c)
 
     val myWeightedAvg = new WeightedAvgWithMergeAndReset
-    tableEnv.registerFunction("myWeightedAvg", myWeightedAvg)
+    util.tableEnv.registerFunction("myWeightedAvg", myWeightedAvg)
 
-    val input = CollectionDataSets.get3TupleDataSet(env).toTable(tableEnv, 'a, 'b, 'c)
-    input
-      .groupBy("b")
-      // must fail. UDAGG does not accept String type
-      .select("myWeightedAvg(c, a)")
+    t.groupBy("b")
+    // must fail. UDAGG does not accept String type
+    .select("myWeightedAvg(c, a)")
   }
 
 }
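
For reference, a minimal sketch of the TableTestBase style these validation tests are migrated to; no ExecutionEnvironment or DataSet is needed because only plan validation is exercised. The field 'd is an illustrative non-existent field:

import org.apache.flink.api.scala._
import org.apache.flink.table.api.ValidationException
import org.apache.flink.table.api.scala._
import org.apache.flink.table.utils.TableTestBase
import org.junit.Test

class ValidationPatternSketch extends TableTestBase {

  @Test(expected = classOf[ValidationException])
  def testAggregationOnUnknownField(): Unit = {
    val util = batchTestUtil()
    val t = util.addTable[(Int, Long, String)]("Table3", 'a, 'b, 'c)

    // must fail. 'd is not a field of the registered table
    t.select('d.sum)
  }
}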

http://git-wip-us.apache.org/repos/asf/flink/blob/ecde7bc1/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/validation/CalcValidationTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/validation/CalcValidationTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/validation/CalcValidationTest.scala
index 846585b..e2a5dac 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/validation/CalcValidationTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/validation/CalcValidationTest.scala
@@ -19,117 +19,105 @@
 package org.apache.flink.table.api.scala.batch.table.validation
 
 import org.apache.flink.api.scala._
-import org.apache.flink.api.scala.util.CollectionDataSets
-import org.apache.flink.table.api.{TableEnvironment, TableException, ValidationException}
 import org.apache.flink.table.api.scala._
+import org.apache.flink.table.api.{TableException, ValidationException}
+import org.apache.flink.table.utils.TableTestBase
 import org.apache.flink.types.Row
 import org.junit.Assert._
 import org.junit._
 
-class CalcValidationTest {
+class CalcValidationTest extends TableTestBase {
 
   @Test(expected = classOf[ValidationException])
   def testSelectInvalidFieldFields(): Unit = {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-
-    CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
+    val util = batchTestUtil()
+    val t = util.addTable[(Int, Long, String)]("Table3",'a, 'b, 'c)
       // must fail. Field 'foo does not exist
       .select('a, 'foo)
   }
 
   @Test(expected = classOf[ValidationException])
   def testSelectAmbiguousRenaming(): Unit = {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-
-    CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
+    val util = batchTestUtil()
+    val t = util.addTable[(Int, Long, String)]("Table3",'a, 'b, 'c)
       // must fail. 'a and 'b are both renamed to 'foo
       .select('a + 1 as 'foo, 'b + 2 as 'foo).toDataSet[Row].print()
   }
 
   @Test(expected = classOf[ValidationException])
   def testSelectAmbiguousRenaming2(): Unit = {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-
-    CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
+    val util = batchTestUtil()
+    val t = util.addTable[(Int, Long, String)]("Table3",'a, 'b, 'c)
       // must fail. 'a and 'b are both renamed to 'a
       .select('a, 'b as 'a).toDataSet[Row].print()
   }
 
   @Test(expected = classOf[ValidationException])
   def testFilterInvalidFieldName(): Unit = {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-
-    val ds = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
+    val util = batchTestUtil()
+    val t = util.addTable[(Int, Long, String)]("Table3",'a, 'b, 'c)
 
     // must fail. Field 'foo does not exist
-    ds.filter( 'foo === 2 )
+    t.filter( 'foo === 2 )
   }
 
   @Test(expected = classOf[ValidationException])
   @throws[Exception]
   def testSelectInvalidField() {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tableEnv = TableEnvironment.getTableEnvironment(env)
-    val ds = CollectionDataSets.get3TupleDataSet(env).toTable(tableEnv, 'a, 'b, 'c)
+    val util = batchTestUtil()
+    val t = util.addTable[(Int, Long, String)]("Table3",'a, 'b, 'c)
 
     // Must fail. Field foo does not exist
-    ds.select("a + 1, foo + 2")
+    t.select("a + 1, foo + 2")
   }
 
   @Test(expected = classOf[ValidationException])
   @throws[Exception]
   def testSelectAmbiguousFieldNames() {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tableEnv = TableEnvironment.getTableEnvironment(env)
-    val ds = CollectionDataSets.get3TupleDataSet(env).toTable(tableEnv, 'a, 'b, 'c)
+    val util = batchTestUtil()
+    val t = util.addTable[(Int, Long, String)]("Table3",'a, 'b, 'c)
 
     // Must fail. Field foo does not exist
-    ds.select("a + 1 as foo, b + 2 as foo")
+    t.select("a + 1 as foo, b + 2 as foo")
   }
 
   @Test(expected = classOf[ValidationException])
   @throws[Exception]
   def testFilterInvalidField() {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tableEnv = TableEnvironment.getTableEnvironment(env)
-    val table = CollectionDataSets.get3TupleDataSet(env).toTable(tableEnv, 'a, 'b, 'c)
+    val util = batchTestUtil()
+    val t = util.addTable[(Int, Long, String)]("Table3",'a, 'b, 'c)
 
     // Must fail. Field foo does not exist.
-    table.filter("foo = 17")
+    t.filter("foo = 17")
   }
 
   @Test
   def testAliasStarException(): Unit = {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
+    val util = batchTestUtil()
 
     try {
-      CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, '*, 'b, 'c)
+      util.addTable[(Int, Long, String)]("Table1", '*, 'b, 'c)
       fail("TableException expected")
     } catch {
       case _: TableException => //ignore
     }
 
     try {
-      CollectionDataSets.get3TupleDataSet(env).toTable(tEnv)
-        .select('_1 as '*, '_2 as 'b, '_1 as 'c)
+      util.addTable[(Int, Long, String)]("Table2")
+        .select('_1 as '*, '_2 as 'b, '_1 as 'c)
       fail("ValidationException expected")
     } catch {
       case _: ValidationException => //ignore
     }
 
     try {
-      CollectionDataSets.get3TupleDataSet(env).toTable(tEnv).as('*, 'b, 'c)
+      util.addTable[(Int, Long, String)]("Table3").as('*, 'b, 'c)
       fail("ValidationException expected")
     } catch {
       case _: ValidationException => //ignore
     }
-try {
-      CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c).select('*, 'b)
+    try {
+      util.addTable[(Int, Long, String)]("Table4", 'a, 'b, 'c).select('*, 'b)
       fail("ValidationException expected")
     } catch {
       case _: ValidationException => //ignore

http://git-wip-us.apache.org/repos/asf/flink/blob/ecde7bc1/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/validation/CompositeFlatteningValidationTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/validation/CompositeFlatteningValidationTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/validation/CompositeFlatteningValidationTest.scala
new file mode 100644
index 0000000..0ac205a
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/validation/CompositeFlatteningValidationTest.scala
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.api.scala.batch.table.validation
+
+import org.apache.flink.table.api.ValidationException
+import org.apache.flink.table.utils.TableTestBase
+import org.apache.flink.api.scala._
+import org.apache.flink.table.api.scala._
+import org.junit.Test
+
+class CompositeFlatteningValidationTest extends TableTestBase {
+
+  @Test(expected = classOf[ValidationException])
+  def testDuplicateFlattening(): Unit = {
+    val util = batchTestUtil()
+    val table = util.addTable[((Int, Long), (String, Boolean), String)]("MyTable", 'a, 'b, 'c)
+
+    table.select('a.flatten(), 'a.flatten())
+  }
+
+}
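
For contrast, a minimal sketch of the valid single-flattening case under the same schema; this is an assumption about the flattening behaviour for illustration, not part of this commit:

import org.apache.flink.api.scala._
import org.apache.flink.table.api.scala._
import org.apache.flink.table.utils.TableTestBase
import org.junit.Test

class CompositeFlatteningSketch extends TableTestBase {

  @Test
  def testSingleFlattening(): Unit = {
    val util = batchTestUtil()
    val table = util.addTable[((Int, Long), (String, Boolean), String)]("MyTable", 'a, 'b, 'c)

    // flattening 'a once expands it into its component fields; no exception expected
    table.select('a.flatten(), 'c)
  }
}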

http://git-wip-us.apache.org/repos/asf/flink/blob/ecde7bc1/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/validation/GroupWindowValidationTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/validation/GroupWindowValidationTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/validation/GroupWindowValidationTest.scala
new file mode 100644
index 0000000..b8e2b97
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/validation/GroupWindowValidationTest.scala
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.api.scala.batch.table.validation
+
+import org.apache.flink.api.scala._
+import org.apache.flink.table.api.ValidationException
+import org.apache.flink.table.api.java.utils.UserDefinedAggFunctions.{OverAgg0, WeightedAvgWithMerge}
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.utils.TableTestBase
+import org.junit.Test
+
+class GroupWindowValidationTest extends TableTestBase {
+
+  //===============================================================================================
+  // Common test
+  //===============================================================================================
+
+  /**
+    * OVER clause is necessary for [[OverAgg0]] window function.
+    */
+  @Test(expected = classOf[ValidationException])
+  def testOverAggregation(): Unit = {
+    val util = batchTestUtil()
+    val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
+    val overAgg = new OverAgg0
+    table
+      .window(Tumble over 5.milli on 'long as 'w)
+      .groupBy('string, 'w)
+      .select(overAgg('long, 'int))
+  }
+
+  @Test(expected = classOf[ValidationException])
+  def testGroupByWithoutWindowAlias(): Unit = {
+    val util = batchTestUtil()
+    val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
+
+    table
+      .window(Tumble over 5.milli on 'long as 'w)
+      .groupBy('string)
+      .select('string, 'int.count)
+  }
+
+  @Test(expected = classOf[ValidationException])
+  def testInvalidRowTimeRef(): Unit = {
+    val util = batchTestUtil()
+    val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
+
+    table
+      .window(Tumble over 5.milli on 'long as 'w)
+      .groupBy('w, 'string)
+      .select('string, 'int.count)
+      .window(Slide over 5.milli every 1.milli on 'int as 'w2) // 'int does not exist in the input.
+      .groupBy('w2)
+      .select('string)
+  }
+
+  //===============================================================================================
+  // Tumbling Windows
+  //===============================================================================================
+
+  @Test(expected = classOf[ValidationException])
+  def testInvalidProcessingTimeDefinition(): Unit = {
+    val util = batchTestUtil()
+    // proctime is not allowed
+    util.addTable[(Long, Int, String)]('long.proctime, 'int, 'string)
+  }
+
+  @Test(expected = classOf[ValidationException])
+  def testInvalidProcessingTimeDefinition2(): Unit = {
+    val util = batchTestUtil()
+    // proctime is not allowed
+    util.addTable[(Long, Int, String)]('long, 'int, 'string, 'proctime.proctime)
+  }
+
+  @Test(expected = classOf[ValidationException])
+  def testInvalidEventTimeDefinition(): Unit = {
+    val util = batchTestUtil()
+    // definition must not extend schema
+    util.addTable[(Long, Int, String)]('long, 'int, 'string, 'rowtime.rowtime)
+  }
+
+  @Test(expected = classOf[ValidationException])
+  def testTumblingGroupWindowWithInvalidUdAggArgs(): Unit = {
+    val util = batchTestUtil()
+    val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
+
+    val myWeightedAvg = new WeightedAvgWithMerge
+
+    table
+      .window(Tumble over 2.minutes on 'rowtime as 'w)
+      .groupBy('w, 'long)
+      // invalid function arguments
+      .select(myWeightedAvg('int, 'string))
+  }
+
+  @Test(expected = classOf[ValidationException])
+  def testAllTumblingGroupWindowWithInvalidUdAggArgs(): Unit = {
+    val util = batchTestUtil()
+    val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
+
+    val myWeightedAvg = new WeightedAvgWithMerge
+
+    table
+      .window(Tumble over 2.minutes on 'rowtime as 'w)
+      .groupBy('w)
+      // invalid function arguments
+      .select(myWeightedAvg('int, 'string))
+  }
+
+  //===============================================================================================
+  // Sliding Windows
+  //===============================================================================================
+
+  @Test(expected = classOf[ValidationException])
+  def testSlidingGroupWindowWithInvalidUdAggArgs(): Unit = {
+    val util = batchTestUtil()
+    val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
+
+    val myWeightedAvg = new WeightedAvgWithMerge
+
+    table
+      .window(Slide over 2.minutes every 1.minute on 'rowtime as 'w)
+      .groupBy('w, 'long)
+      // invalid function arguments
+      .select(myWeightedAvg('int, 'string))
+  }
+
+  @Test(expected = classOf[ValidationException])
+  def testAllSlidingGroupWindowWithInvalidUdAggArgs(): Unit = {
+    val util = batchTestUtil()
+    val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
+
+    val myWeightedAvg = new WeightedAvgWithMerge
+
+    table
+      .window(Slide over 2.minutes every 1.minute on 'long as 'w)
+      .groupBy('w)
+      // invalid function arguments
+      .select(myWeightedAvg('int, 'string))
+  }
+
+  @Test(expected = classOf[ValidationException])
+  def testSessionGroupWindowWithInvalidUdAggArgs(): Unit = {
+    val util = batchTestUtil()
+    val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
+
+    val myWeightedAvg = new WeightedAvgWithMerge
+
+    table
+      .window(Session withGap 2.minutes on 'rowtime as 'w)
+      .groupBy('w, 'long)
+      // invalid function arguments
+      .select(myWeightedAvg('int, 'string))
+  }
+
+  @Test(expected = classOf[ValidationException])
+  def testAllSessionGroupWindowWithInvalidUdAggArgs(): Unit = {
+    val util = batchTestUtil()
+    val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
+
+    val myWeightedAvg = new WeightedAvgWithMerge
+
+    table
+      .window(Session withGap 2.minutes on 'rowtime as 'w)
+      .groupBy('w)
+      // invalid function arguments
+      .select(myWeightedAvg('int, 'string))
+  }
+
+}
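
For contrast, a minimal sketch of a group window that should pass validation under the same schema; this is an assumption for illustration, not part of this commit. The window alias is used in groupBy and the aggregated field is numeric:

import org.apache.flink.api.scala._
import org.apache.flink.table.api.scala._
import org.apache.flink.table.utils.TableTestBase
import org.junit.Test

class GroupWindowSketch extends TableTestBase {

  @Test
  def testValidTumblingWindow(): Unit = {
    val util = batchTestUtil()
    val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)

    // 'w appears in the groupBy and 'int.count is a valid aggregate
    table
      .window(Tumble over 5.milli on 'long as 'w)
      .groupBy('w, 'string)
      .select('string, 'int.count)
  }
}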