Posted to commits@flink.apache.org by fh...@apache.org on 2016/12/16 15:46:36 UTC

[07/47] flink git commit: [FLINK-4704] [table] Refactor package structure of flink-table.

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/runtime/datastream/DataStreamCorrelateITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/runtime/datastream/DataStreamCorrelateITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/runtime/datastream/DataStreamCorrelateITCase.scala
deleted file mode 100644
index 21fe157..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/runtime/datastream/DataStreamCorrelateITCase.scala
+++ /dev/null
@@ -1,91 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.table.runtime.datastream
-
-import org.apache.flink.api.scala._
-import org.apache.flink.api.scala.stream.utils.StreamITCase
-import org.apache.flink.api.scala.table._
-import org.apache.flink.api.table.utils.TableFunc0
-import org.apache.flink.api.table.TableEnvironment
-import org.apache.flink.types.Row
-import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
-import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase
-import org.junit.Assert._
-import org.junit.Test
-
-import scala.collection.mutable
-
-class DataStreamCorrelateITCase extends StreamingMultipleProgramsTestBase {
-
-  @Test
-  def testCrossJoin(): Unit = {
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-    StreamITCase.clear
-
-    val t = testData(env).toTable(tEnv).as('a, 'b, 'c)
-    val func0 = new TableFunc0
-
-    val result = t
-      .join(func0('c) as('d, 'e))
-      .select('c, 'd, 'e)
-      .toDataStream[Row]
-
-    result.addSink(new StreamITCase.StringSink)
-    env.execute()
-
-    val expected = mutable.MutableList("Jack#22,Jack,22", "John#19,John,19", "Anna#44,Anna,44")
-    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
-  }
-
-  @Test
-  def testLeftOuterJoin(): Unit = {
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-    StreamITCase.clear
-
-    val t = testData(env).toTable(tEnv).as('a, 'b, 'c)
-    val func0 = new TableFunc0
-
-    val result = t
-      .leftOuterJoin(func0('c) as('d, 'e))
-      .select('c, 'd, 'e)
-      .toDataStream[Row]
-
-    result.addSink(new StreamITCase.StringSink)
-    env.execute()
-
-    val expected = mutable.MutableList(
-      "nosharp,null,null", "Jack#22,Jack,22",
-      "John#19,John,19", "Anna#44,Anna,44")
-    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
-  }
-
-  private def testData(
-    env: StreamExecutionEnvironment)
-  : DataStream[(Int, Long, String)] = {
-
-    val data = new mutable.MutableList[(Int, Long, String)]
-    data.+=((1, 1L, "Jack#22"))
-    data.+=((2, 2L, "John#19"))
-    data.+=((3, 2L, "Anna#44"))
-    data.+=((4, 3L, "nosharp"))
-    env.fromCollection(data)
-  }
-
-}
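
For reference, a minimal sketch of how the cross join above reads after the package
refactoring, with import paths assumed from the files added later in this commit
(org.apache.flink.table.api.* instead of org.apache.flink.api.table.*):

  import org.apache.flink.api.scala._
  import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
  import org.apache.flink.table.api.TableEnvironment
  import org.apache.flink.table.api.scala._
  import org.apache.flink.types.Row

  val env = StreamExecutionEnvironment.getExecutionEnvironment
  val tEnv = TableEnvironment.getTableEnvironment(env)

  val t = env.fromElements((1, 1L, "Jack#22")).toTable(tEnv).as('a, 'b, 'c)
  val func0 = new TableFunc0  // now expected under org.apache.flink.table.utils

  // cross join with a table function: one row per element the function emits
  val result = t
    .join(func0('c) as ('d, 'e))
    .select('c, 'd, 'e)
    .toDataStream[Row]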

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/utils/TableTestBase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/utils/TableTestBase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/utils/TableTestBase.scala
deleted file mode 100644
index b281dfc..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/utils/TableTestBase.scala
+++ /dev/null
@@ -1,229 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.table.utils
-
-import org.apache.calcite.plan.RelOptUtil
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.java.{DataSet => JDataSet}
-import org.apache.flink.api.scala.table._
-import org.apache.flink.api.scala.{DataSet, ExecutionEnvironment}
-import org.apache.flink.api.table.expressions.Expression
-import org.apache.flink.api.table.functions.{ScalarFunction, TableFunction}
-import org.apache.flink.api.table.{Table, TableEnvironment}
-import org.apache.flink.streaming.api.datastream.{DataStream => JDataStream}
-import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
-import org.junit.Assert.assertEquals
-import org.mockito.Mockito.{mock, when}
-
-/**
-  * Base class for Table API / SQL plan tests.
-  */
-class TableTestBase {
-
-  def batchTestUtil(): BatchTableTestUtil = {
-    BatchTableTestUtil()
-  }
-
-  def streamTestUtil(): StreamTableTestUtil = {
-    StreamTableTestUtil()
-  }
-
-  def verifyTableEquals(expected: Table, actual: Table): Unit = {
-    assertEquals(
-      "Logical plans do not match",
-      RelOptUtil.toString(expected.getRelNode),
-      RelOptUtil.toString(actual.getRelNode))
-  }
-
-}
-
-abstract class TableTestUtil {
-
-  private var counter = 0
-
-  def addTable[T: TypeInformation](fields: Expression*): Table = {
-    addTable[T](s"Table${counter += 1}", fields: _*)
-  }
-
-  def addTable[T: TypeInformation](name: String, fields: Expression*): Table
-  def addFunction[T: TypeInformation](name: String, function: TableFunction[T]): TableFunction[T]
-  def addFunction(name: String, function: ScalarFunction): Unit
-
-  def verifySql(query: String, expected: String): Unit
-  def verifyTable(resultTable: Table, expected: String): Unit
-
-  // the print methods are for debugging purposes only
-  def printTable(resultTable: Table): Unit
-  def printSql(query: String): Unit
-}
-
-object TableTestUtil {
-
-  // these methods currently just simplify string construction;
-  // we could replace them with real logic later
-
-  def unaryNode(node: String, input: String, term: String*): String = {
-    s"""$node(${term.mkString(", ")})
-       |$input
-       |""".stripMargin.stripLineEnd
-  }
-
-  def binaryNode(node: String, left: String, right: String, term: String*): String = {
-    s"""$node(${term.mkString(", ")})
-       |$left
-       |$right
-       |""".stripMargin.stripLineEnd
-  }
-
-  def values(node: String, term: String*): String = {
-    s"$node(${term.mkString(", ")})"
-  }
-
-  def term(term: AnyRef, value: AnyRef*): String = {
-    s"$term=[${value.mkString(", ")}]"
-  }
-
-  def tuples(value: List[AnyRef]*): String = {
-    val listValues = value.map(listValue => s"{ ${listValue.mkString(", ")} }")
-    term("tuples", "[" + listValues.mkString(", ") + "]")
-  }
-
-  def batchTableNode(idx: Int): String = {
-    s"DataSetScan(table=[[_DataSetTable_$idx]])"
-  }
-
-  def streamTableNode(idx: Int): String = {
-    s"DataStreamScan(table=[[_DataStreamTable_$idx]])"
-  }
-
-}
-
-case class BatchTableTestUtil() extends TableTestUtil {
-
-  val env = mock(classOf[ExecutionEnvironment])
-  val tEnv = TableEnvironment.getTableEnvironment(env)
-
-  def addTable[T: TypeInformation](
-      name: String,
-      fields: Expression*)
-    : Table = {
-    val ds = mock(classOf[DataSet[T]])
-    val jDs = mock(classOf[JDataSet[T]])
-    when(ds.javaSet).thenReturn(jDs)
-    val typeInfo: TypeInformation[T] = implicitly[TypeInformation[T]]
-    when(jDs.getType).thenReturn(typeInfo)
-
-    val t = ds.toTable(tEnv, fields: _*)
-    tEnv.registerTable(name, t)
-    t
-  }
-
-  def addFunction[T: TypeInformation](
-      name: String,
-      function: TableFunction[T])
-    : TableFunction[T] = {
-    tEnv.registerFunction(name, function)
-    function
-  }
-
-  def addFunction(name: String, function: ScalarFunction): Unit = {
-    tEnv.registerFunction(name, function)
-  }
-
-  def verifySql(query: String, expected: String): Unit = {
-    verifyTable(tEnv.sql(query), expected)
-  }
-
-  def verifyTable(resultTable: Table, expected: String): Unit = {
-    val relNode = resultTable.getRelNode
-    val optimized = tEnv.optimize(relNode)
-    val actual = RelOptUtil.toString(optimized)
-    assertEquals(
-      expected.split("\n").map(_.trim).mkString("\n"),
-      actual.split("\n").map(_.trim).mkString("\n"))
-  }
-
-  def printTable(resultTable: Table): Unit = {
-    val relNode = resultTable.getRelNode
-    val optimized = tEnv.optimize(relNode)
-    println(RelOptUtil.toString(optimized))
-  }
-
-  def printSql(query: String): Unit = {
-    printTable(tEnv.sql(query))
-  }
-}
-
-case class StreamTableTestUtil() extends TableTestUtil {
-
-  val env = mock(classOf[StreamExecutionEnvironment])
-  val tEnv = TableEnvironment.getTableEnvironment(env)
-
-  def addTable[T: TypeInformation](
-      name: String,
-      fields: Expression*)
-    : Table = {
-
-    val ds = mock(classOf[DataStream[T]])
-    val jDs = mock(classOf[JDataStream[T]])
-    when(ds.javaStream).thenReturn(jDs)
-    val typeInfo: TypeInformation[T] = implicitly[TypeInformation[T]]
-    when(jDs.getType).thenReturn(typeInfo)
-
-    val t = ds.toTable(tEnv, fields: _*)
-    tEnv.registerTable(name, t)
-    t
-  }
-
-  def addFunction[T: TypeInformation](
-      name: String,
-      function: TableFunction[T])
-    : TableFunction[T] = {
-    tEnv.registerFunction(name, function)
-    function
-  }
-
-  def addFunction(name: String, function: ScalarFunction): Unit = {
-    tEnv.registerFunction(name, function)
-  }
-
-  def verifySql(query: String, expected: String): Unit = {
-    verifyTable(tEnv.sql(query), expected)
-  }
-
-  def verifyTable(resultTable: Table, expected: String): Unit = {
-    val relNode = resultTable.getRelNode
-    val optimized = tEnv.optimize(relNode)
-    val actual = RelOptUtil.toString(optimized)
-    assertEquals(
-      expected.split("\n").map(_.trim).mkString("\n"),
-      actual.split("\n").map(_.trim).mkString("\n"))
-  }
-
-  // the print methods are for debugging purposes only
-  def printTable(resultTable: Table): Unit = {
-    val relNode = resultTable.getRelNode
-    val optimized = tEnv.optimize(relNode)
-    println(RelOptUtil.toString(optimized))
-  }
-
-  def printSql(query: String): Unit = {
-    printTable(tEnv.sql(query))
-  }
-}
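
The string helpers in TableTestUtil above compose a RelNode-style plan string
bottom-up. As a minimal sketch (operator and field names borrowed from the
aggregation tests later in this commit), the nesting works like this:

  import org.apache.flink.table.utils.TableTestUtil._  // assumed post-refactoring path

  val expected = unaryNode(
    "DataSetAggregate",
    batchTableNode(0),
    term("select", "COUNT(c) AS EXPR$0")
  )
  // expected ==
  //   DataSetAggregate(select=[COUNT(c) AS EXPR$0])
  //   DataSetScan(table=[[_DataSetTable_0]])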

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/utils/UserDefinedTableFunctions.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/utils/UserDefinedTableFunctions.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/utils/UserDefinedTableFunctions.scala
deleted file mode 100644
index 4291b29..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/utils/UserDefinedTableFunctions.scala
+++ /dev/null
@@ -1,117 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.table.utils
-
-import java.lang.Boolean
-
-import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
-import org.apache.flink.api.java.tuple.Tuple3
-import org.apache.flink.types.Row
-import org.apache.flink.api.table.functions.TableFunction
-import org.apache.flink.api.java.typeutils.RowTypeInfo
-
-
-case class SimpleUser(name: String, age: Int)
-
-class TableFunc0 extends TableFunction[SimpleUser] {
-  // expects input elements in the format "<string>#<int>"
-  def eval(user: String): Unit = {
-    if (user.contains("#")) {
-      val splits = user.split("#")
-      collect(SimpleUser(splits(0), splits(1).toInt))
-    }
-  }
-}
-
-class TableFunc1 extends TableFunction[String] {
-  def eval(str: String): Unit = {
-    if (str.contains("#")){
-      str.split("#").foreach(collect)
-    }
-  }
-
-  def eval(str: String, prefix: String): Unit = {
-    if (str.contains("#")) {
-      str.split("#").foreach(s => collect(prefix + s))
-    }
-  }
-}
-
-
-class TableFunc2 extends TableFunction[Row] {
-  def eval(str: String): Unit = {
-    if (str.contains("#")) {
-      str.split("#").foreach({ s =>
-        val row = new Row(2)
-        row.setField(0, s)
-        row.setField(1, s.length)
-        collect(row)
-      })
-    }
-  }
-
-  override def getResultType: TypeInformation[Row] = {
-    new RowTypeInfo(BasicTypeInfo.STRING_TYPE_INFO,
-                    BasicTypeInfo.INT_TYPE_INFO)
-  }
-}
-
-class HierarchyTableFunction extends SplittableTableFunction[Boolean, Integer] {
-  def eval(user: String) {
-    if (user.contains("#")) {
-      val splits = user.split("#")
-      val age = splits(1).toInt
-      collect(new Tuple3[String, Boolean, Integer](splits(0), age >= 20, age))
-    }
-  }
-}
-
-abstract class SplittableTableFunction[A, B] extends TableFunction[Tuple3[String, A, B]] {}
-
-class PojoTableFunc extends TableFunction[PojoUser] {
-  def eval(user: String) {
-    if (user.contains("#")) {
-      val splits = user.split("#")
-      collect(new PojoUser(splits(0), splits(1).toInt))
-    }
-  }
-}
-
-class PojoUser() {
-  var name: String = _
-  var age: Int = 0
-
-  def this(name: String, age: Int) {
-    this()
-    this.name = name
-    this.age = age
-  }
-}
-
-// ----------------------------------------------------------------------------------------------
-// Invalid Table Functions
-// ----------------------------------------------------------------------------------------------
-
-
-// used to check that table functions defined as Scala objects are rejected
-object ObjectTableFunction extends TableFunction[Integer] {
-  def eval(a: Int, b: Int): Unit = {
-    collect(a)
-    collect(b)
-  }
-}
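
A hedged sketch of how these functions are exercised from SQL (the LATERAL TABLE
syntax is the one used by the correlate tests in this module; the registration
name below is illustrative):

  // assuming a StreamExecutionEnvironment `env` and a registered table MyTable
  val tEnv = TableEnvironment.getTableEnvironment(env)
  tEnv.registerFunction("split", new TableFunc1)

  // cross join against the table function: one output row per collected element
  val result = tEnv.sql(
    "SELECT c, s FROM MyTable, LATERAL TABLE(split(c)) AS T(s)")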

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/AggregationTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/AggregationTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/AggregationTest.scala
new file mode 100644
index 0000000..708e007
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/AggregationTest.scala
@@ -0,0 +1,261 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table
+
+import org.apache.flink.api.scala._
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.utils.TableTestBase
+import org.apache.flink.table.utils.TableTestUtil._
+import org.junit.Test
+
+/**
+  * Tests for aggregation plans.
+  */
+class AggregationTest extends TableTestBase {
+
+  @Test
+  def testAggregateQueryBatchSQL(): Unit = {
+    val util = batchTestUtil()
+    util.addTable[(Int, Long, Int)]("MyTable", 'a, 'b, 'c)
+
+    val sqlQuery = "SELECT avg(a), sum(b), count(c) FROM MyTable"
+
+    val setValues = unaryNode(
+      "DataSetValues",
+      batchTableNode(0),
+      tuples(List(null,null,null)),
+      term("values","a","b","c")
+    )
+    val union = unaryNode(
+      "DataSetUnion",
+      setValues,
+      term("union","a","b","c")
+    )
+
+    val aggregate = unaryNode(
+      "DataSetAggregate",
+      union,
+      term("select",
+        "AVG(a) AS EXPR$0",
+        "SUM(b) AS EXPR$1",
+        "COUNT(c) AS EXPR$2")
+    )
+    util.verifySql(sqlQuery, aggregate)
+  }
+
+  @Test
+  def testAggregateWithFilterQueryBatchSQL(): Unit = {
+    val util = batchTestUtil()
+    util.addTable[(Int, Long, Int)]("MyTable", 'a, 'b, 'c)
+
+    val sqlQuery = "SELECT avg(a), sum(b), count(c) FROM MyTable WHERE a = 1"
+
+    val calcNode = unaryNode(
+      "DataSetCalc",
+      batchTableNode(0),
+      term("select", "a", "b", "c"),
+      term("where", "=(a, 1)")
+    )
+
+    val setValues = unaryNode(
+      "DataSetValues",
+      calcNode,
+      tuples(List(null,null,null)),
+      term("values","a","b","c")
+    )
+
+    val union = unaryNode(
+      "DataSetUnion",
+      setValues,
+      term("union","a","b","c")
+    )
+
+    val aggregate = unaryNode(
+      "DataSetAggregate",
+      union,
+      term("select",
+        "AVG(a) AS EXPR$0",
+        "SUM(b) AS EXPR$1",
+        "COUNT(c) AS EXPR$2")
+    )
+    util.verifySql(sqlQuery, aggregate)
+  }
+
+  @Test
+  def testAggregateGroupQueryBatchSQL(): Unit = {
+    val util = batchTestUtil()
+    util.addTable[(Int, Long, Int)]("MyTable", 'a, 'b, 'c)
+
+    val sqlQuery = "SELECT avg(a), sum(b), count(c) FROM MyTable GROUP BY a"
+
+    val aggregate = unaryNode(
+        "DataSetAggregate",
+        batchTableNode(0),
+        term("groupBy", "a"),
+        term("select",
+          "a",
+          "AVG(a) AS EXPR$0",
+          "SUM(b) AS EXPR$1",
+          "COUNT(c) AS EXPR$2")
+    )
+    val expected = unaryNode(
+        "DataSetCalc",
+        aggregate,
+        term("select",
+          "EXPR$0",
+          "EXPR$1",
+          "EXPR$2")
+    )
+    util.verifySql(sqlQuery, expected)
+  }
+
+  @Test
+  def testAggregateGroupWithFilterQueryBatchSQL(): Unit = {
+    val util = batchTestUtil()
+    util.addTable[(Int, Long, Int)]("MyTable", 'a, 'b, 'c)
+
+    val sqlQuery = "SELECT avg(a), sum(b), count(c) FROM MyTable WHERE a = 1 GROUP BY a"
+
+    val calcNode = unaryNode(
+      "DataSetCalc",
+      batchTableNode(0),
+      term("select","a", "b", "c") ,
+      term("where","=(a, 1)")
+    )
+
+    val aggregate = unaryNode(
+        "DataSetAggregate",
+        calcNode,
+        term("groupBy", "a"),
+        term("select",
+          "a",
+          "AVG(a) AS EXPR$0",
+          "SUM(b) AS EXPR$1",
+          "COUNT(c) AS EXPR$2")
+    )
+    val expected = unaryNode(
+        "DataSetCalc",
+        aggregate,
+        term("select",
+          "EXPR$0",
+          "EXPR$1",
+          "EXPR$2")
+    )
+    util.verifySql(sqlQuery, expected)
+  }
+
+  @Test
+  def testAggregateGroupWithFilterTableApi(): Unit = {
+
+    val util = batchTestUtil()
+    val sourceTable = util.addTable[(Int, Long, Int)]("MyTable", 'a, 'b, 'c)
+
+    val resultTable = sourceTable.groupBy('a)
+      .select('a, 'a.avg, 'b.sum, 'c.count)
+      .where('a === 1)
+
+    val calcNode = unaryNode(
+      "DataSetCalc",
+      batchTableNode(0),
+      term("select", "a", "b", "c"),
+      term("where", "=(a, 1)")
+    )
+
+    val expected = unaryNode(
+      "DataSetAggregate",
+      calcNode,
+      term("groupBy", "a"),
+      term("select",
+        "a",
+        "AVG(a) AS TMP_0",
+        "SUM(b) AS TMP_1",
+        "COUNT(c) AS TMP_2")
+    )
+
+    util.verifyTable(resultTable, expected)
+  }
+
+  @Test
+  def testAggregateTableApi(): Unit = {
+    val util = batchTestUtil()
+    val sourceTable = util.addTable[(Int, Long, Int)]("MyTable", 'a, 'b, 'c)
+    val resultTable = sourceTable.select('a.avg, 'b.sum, 'c.count)
+
+    val setValues = unaryNode(
+      "DataSetValues",
+      batchTableNode(0),
+      tuples(List(null,null,null)),
+      term("values","a","b","c")
+    )
+    val union = unaryNode(
+      "DataSetUnion",
+      setValues,
+      term("union","a","b","c")
+    )
+
+    val expected = unaryNode(
+      "DataSetAggregate",
+      union,
+      term("select",
+        "AVG(a) AS TMP_0",
+        "SUM(b) AS TMP_1",
+        "COUNT(c) AS TMP_2")
+    )
+    util.verifyTable(resultTable, expected)
+  }
+
+  @Test
+  def testAggregateWithFilterTableApi(): Unit = {
+    val util = batchTestUtil()
+    val sourceTable = util.addTable[(Int, Long, Int)]("MyTable", 'a, 'b, 'c)
+
+    val resultTable = sourceTable.select('a, 'b, 'c).where('a === 1)
+      .select('a.avg, 'b.sum, 'c.count)
+
+    val calcNode = unaryNode(
+      "DataSetCalc",
+      batchTableNode(0),
+      term("select", "a", "b", "c"),
+      term("where", "=(a, 1)")
+    )
+
+    val setValues = unaryNode(
+      "DataSetValues",
+      calcNode,
+      tuples(List(null,null,null)),
+      term("values","a","b","c")
+    )
+
+    val union = unaryNode(
+      "DataSetUnion",
+      setValues,
+      term("union","a","b","c")
+    )
+
+    val expected = unaryNode(
+      "DataSetAggregate",
+      union,
+      term("select",
+        "AVG(a) AS TMP_0",
+        "SUM(b) AS TMP_1",
+        "COUNT(c) AS TMP_2")
+    )
+
+    util.verifyTable(resultTable, expected)
+  }
+}
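
A note on the DataSetValues/DataSetUnion pair in the non-grouped plans above:
a global aggregate must produce a result row even for empty input, so the
planner unions the input with a single all-NULL row before aggregating. The
helper nesting mirrors that shape (a sketch, field names as in the tests above):

  val expected = unaryNode(
    "DataSetAggregate",
    unaryNode(
      "DataSetUnion",
      unaryNode(
        "DataSetValues",
        batchTableNode(0),
        tuples(List(null, null, null)),  // the injected all-NULL row
        term("values", "a", "b", "c")
      ),
      term("union", "a", "b", "c")
    ),
    term("select", "COUNT(c) AS EXPR$0")
  )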

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/CalciteConfigBuilderTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/CalciteConfigBuilderTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/CalciteConfigBuilderTest.scala
new file mode 100644
index 0000000..09a4c4e
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/CalciteConfigBuilderTest.scala
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table
+
+import org.apache.calcite.rel.rules.{CalcSplitRule, CalcMergeRule, FilterMergeRule}
+import org.apache.calcite.sql.fun.{SqlStdOperatorTable, OracleSqlOperatorTable}
+import org.apache.calcite.tools.RuleSets
+import org.apache.flink.table.calcite.{CalciteConfigBuilder, CalciteConfig}
+import org.junit.Test
+import org.junit.Assert._
+
+import scala.collection.JavaConverters._
+
+class CalciteConfigBuilderTest {
+
+  @Test
+  def testDefaultRules(): Unit = {
+
+    val cc: CalciteConfig = new CalciteConfigBuilder()
+      .build()
+
+    assertFalse(cc.replacesRuleSet)
+    assertFalse(cc.getRuleSet.isDefined)
+  }
+
+  @Test
+  def testReplaceRules(): Unit = {
+
+    val cc: CalciteConfig = new CalciteConfigBuilder()
+      .replaceRuleSet(RuleSets.ofList(FilterMergeRule.INSTANCE))
+      .build()
+
+    assertTrue(cc.replacesRuleSet)
+    assertTrue(cc.getRuleSet.isDefined)
+    val cSet = cc.getRuleSet.get.iterator().asScala.toSet
+    assertEquals(1, cSet.size)
+    assertTrue(cSet.contains(FilterMergeRule.INSTANCE))
+  }
+
+  @Test
+  def testReplaceAddRules(): Unit = {
+
+    val cc: CalciteConfig = new CalciteConfigBuilder()
+      .replaceRuleSet(RuleSets.ofList(FilterMergeRule.INSTANCE))
+      .addRuleSet(RuleSets.ofList(CalcMergeRule.INSTANCE, CalcSplitRule.INSTANCE))
+      .build()
+
+    assertTrue(cc.replacesRuleSet)
+    assertTrue(cc.getRuleSet.isDefined)
+    val cSet = cc.getRuleSet.get.iterator().asScala.toSet
+    assertEquals(3, cSet.size)
+    assertTrue(cSet.contains(FilterMergeRule.INSTANCE))
+    assertTrue(cSet.contains(CalcMergeRule.INSTANCE))
+    assertTrue(cSet.contains(CalcSplitRule.INSTANCE))
+  }
+
+  @Test
+  def testAddRules(): Unit = {
+
+    val cc: CalciteConfig = new CalciteConfigBuilder()
+      .addRuleSet(RuleSets.ofList(FilterMergeRule.INSTANCE))
+      .build()
+
+    assertFalse(cc.replacesRuleSet)
+    assertTrue(cc.getRuleSet.isDefined)
+    val cSet = cc.getRuleSet.get.iterator().asScala.toSet
+    assertEquals(1, cSet.size)
+    assertTrue(cSet.contains(FilterMergeRule.INSTANCE))
+  }
+
+  @Test
+  def testAddAddRules(): Unit = {
+
+    val cc: CalciteConfig = new CalciteConfigBuilder()
+      .addRuleSet(RuleSets.ofList(FilterMergeRule.INSTANCE))
+      .addRuleSet(RuleSets.ofList(CalcMergeRule.INSTANCE, CalcSplitRule.INSTANCE))
+      .build()
+
+    assertFalse(cc.replacesRuleSet)
+    assertTrue(cc.getRuleSet.isDefined)
+    val cSet = cc.getRuleSet.get.iterator().asScala.toSet
+    assertEquals(3, cSet.size)
+    assertTrue(cSet.contains(FilterMergeRule.INSTANCE))
+    assertTrue(cSet.contains(CalcMergeRule.INSTANCE))
+    assertTrue(cSet.contains(CalcSplitRule.INSTANCE))
+  }
+
+  @Test
+  def testDefaultOperatorTable(): Unit = {
+
+    val cc: CalciteConfig = new CalciteConfigBuilder()
+      .build()
+
+    assertFalse(cc.replacesSqlOperatorTable)
+    assertFalse(cc.getSqlOperatorTable.isDefined)
+  }
+
+  @Test
+  def testReplaceOperatorTable(): Unit = {
+
+    val oracleTable = new OracleSqlOperatorTable
+
+    val cc: CalciteConfig = new CalciteConfigBuilder()
+      .replaceSqlOperatorTable(oracleTable)
+      .build()
+
+    val oracleOps = oracleTable.getOperatorList.asScala
+
+    assertTrue(cc.replacesSqlOperatorTable)
+    assertTrue(cc.getSqlOperatorTable.isDefined)
+    val ops = cc.getSqlOperatorTable.get.getOperatorList
+      .asScala.toSet
+    assertEquals(oracleOps.size, ops.size)
+    for (o <- oracleOps) {
+      assertTrue(ops.contains(o))
+    }
+  }
+
+  @Test
+  def testReplaceAddOperatorTable(): Unit = {
+
+    val oracleTable = new OracleSqlOperatorTable
+    val stdTable = new SqlStdOperatorTable
+
+    val cc: CalciteConfig = new CalciteConfigBuilder()
+      .replaceSqlOperatorTable(oracleTable)
+      .addSqlOperatorTable(stdTable)
+      .build()
+
+    val oracleOps = oracleTable.getOperatorList.asScala
+    val stdOps = stdTable.getOperatorList.asScala
+
+    assertTrue(cc.replacesSqlOperatorTable)
+    assertTrue(cc.getSqlOperatorTable.isDefined)
+    val ops = cc.getSqlOperatorTable.get.getOperatorList
+      .asScala.toSet
+    assertEquals(oracleOps.size + stdOps.size, ops.size)
+    for (o <- oracleOps) {
+      assertTrue(ops.contains(o))
+    }
+    for (o <- stdOps) {
+      assertTrue(ops.contains(o))
+    }
+  }
+
+  @Test
+  def testAddOperatorTable(): Unit = {
+
+    val oracleTable = new OracleSqlOperatorTable
+
+    val cc: CalciteConfig = new CalciteConfigBuilder()
+      .addSqlOperatorTable(oracleTable)
+      .build()
+
+    val oracleOps = oracleTable.getOperatorList.asScala
+
+    assertFalse(cc.replacesSqlOperatorTable)
+    assertTrue(cc.getSqlOperatorTable.isDefined)
+    val ops = cc.getSqlOperatorTable.get.getOperatorList
+      .asScala.toSet
+    assertEquals(oracleOps.size, ops.size)
+    for (o <- oracleOps) {
+      assertTrue(ops.contains(o))
+    }
+  }
+
+  @Test
+  def testAddAddOperatorTable(): Unit = {
+
+    val oracleTable = new OracleSqlOperatorTable
+    val stdTable = new SqlStdOperatorTable
+
+    val cc: CalciteConfig = new CalciteConfigBuilder()
+      .addSqlOperatorTable(oracleTable)
+      .addSqlOperatorTable(stdTable)
+      .build()
+
+    val oracleOps = oracleTable.getOperatorList.asScala
+    val stdOps = stdTable.getOperatorList.asScala
+
+    assertFalse(cc.replacesSqlOperatorTable)
+    assertTrue(cc.getSqlOperatorTable.isDefined)
+    val ops = cc.getSqlOperatorTable.get.getOperatorList
+      .asScala.toSet
+    assertEquals(oracleOps.size + stdOps.size, ops.size)
+    for (o <- oracleOps) {
+      assertTrue(ops.contains(o))
+    }
+    for (o <- stdOps) {
+      assertTrue(ops.contains(o))
+    }
+  }
+}
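
The built CalciteConfig is meant to be handed to a TableEnvironment; a minimal
sketch, assuming the TableConfig#setCalciteConfig hook that this builder feeds:

  val cc: CalciteConfig = new CalciteConfigBuilder()
    .addRuleSet(RuleSets.ofList(FilterMergeRule.INSTANCE))
    .build()

  // apply the customized planner configuration (assumed API)
  tableEnv.getConfig.setCalciteConfig(cc)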

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/CompositeFlatteningTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/CompositeFlatteningTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/CompositeFlatteningTest.scala
new file mode 100644
index 0000000..0055fc2
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/CompositeFlatteningTest.scala
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.scala.createTypeInformation
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.CompositeFlatteningTest.{TestCaseClass, giveMeCaseClass}
+import org.apache.flink.table.api.ValidationException
+import org.apache.flink.table.functions.ScalarFunction
+import org.apache.flink.table.utils.TableTestBase
+import org.apache.flink.table.utils.TableTestUtil._
+import org.junit.Test
+
+
+class CompositeFlatteningTest extends TableTestBase {
+
+  @Test(expected = classOf[ValidationException])
+  def testDuplicateFlattening(): Unit = {
+    val util = batchTestUtil()
+    val table = util.addTable[((Int, Long), (String, Boolean), String)]("MyTable", 'a, 'b, 'c)
+
+    table.select('a.flatten(), 'a.flatten())
+  }
+
+  @Test
+  def testMultipleFlatteningsTable(): Unit = {
+    val util = batchTestUtil()
+    val table = util.addTable[((Int, Long), (String, Boolean), String)]("MyTable", 'a, 'b, 'c)
+
+    val result = table.select('a.flatten(), 'c, 'b.flatten())
+
+    val expected = unaryNode(
+      "DataSetCalc",
+      batchTableNode(0),
+      term("select",
+        "a._1 AS a$_1",
+        "a._2 AS a$_2",
+        "c",
+        "b._1 AS b$_1",
+        "b._2 AS b$_2"
+      )
+    )
+
+    util.verifyTable(result, expected)
+  }
+
+  @Test
+  def testMultipleFlatteningsSql(): Unit = {
+    val util = batchTestUtil()
+    util.addTable[((Int, Long), (String, Boolean), String)]("MyTable", 'a, 'b, 'c)
+
+    val expected = unaryNode(
+      "DataSetCalc",
+      batchTableNode(0),
+      term("select",
+        "a._1 AS _1",
+        "a._2 AS _2",
+        "c",
+        "b._1 AS _10",
+        "b._2 AS _20"
+      )
+    )
+
+    util.verifySql(
+      "SELECT MyTable.a.*, c, MyTable.b.* FROM MyTable",
+      expected)
+  }
+
+  @Test
+  def testNestedFlattenings(): Unit = {
+    val util = batchTestUtil()
+    val table = util
+      .addTable[((((String, TestCaseClass), Boolean), String), String)]("MyTable", 'a, 'b)
+
+    val result = table.select('a.flatten(), 'b.flatten())
+
+    val expected = unaryNode(
+      "DataSetCalc",
+      batchTableNode(0),
+      term("select",
+        "a._1 AS a$_1",
+        "a._2 AS a$_2",
+        "b"
+      )
+    )
+
+    util.verifyTable(result, expected)
+  }
+
+  @Test
+  def testScalarFunctionAccess(): Unit = {
+    val util = batchTestUtil()
+    val table = util
+      .addTable[(String, Int)]("MyTable", 'a, 'b)
+
+    val result = table.select(
+      giveMeCaseClass().get("my"),
+      giveMeCaseClass().get("clazz"),
+      giveMeCaseClass().flatten())
+
+    val expected = unaryNode(
+      "DataSetCalc",
+      batchTableNode(0),
+      term("select",
+        "org.apache.flink.table.CompositeFlatteningTest.giveMeCaseClass$().my AS _c0",
+        "org.apache.flink.table.CompositeFlatteningTest.giveMeCaseClass$().clazz AS _c1",
+        "org.apache.flink.table.CompositeFlatteningTest.giveMeCaseClass$().my AS _c2",
+        "org.apache.flink.table.CompositeFlatteningTest.giveMeCaseClass$().clazz AS _c3"
+      )
+    )
+
+    util.verifyTable(result, expected)
+  }
+
+}
+
+object CompositeFlatteningTest {
+
+  case class TestCaseClass(my: String, clazz: Int)
+
+  object giveMeCaseClass extends ScalarFunction {
+    def eval(): TestCaseClass = {
+      TestCaseClass("hello", 42)
+    }
+
+    override def getResultType(signature: Array[Class[_]]): TypeInformation[_] = {
+      createTypeInformation[TestCaseClass]
+    }
+  }
+}
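
A short sketch of what flatten() does, matching the expected plans above: each
member of a composite field becomes its own column, named <field>$<member>:

  val util = batchTestUtil()
  val table = util.addTable[((Int, Long), String)]("T", 'a, 'b)
  val result = table.select('a.flatten(), 'b)
  // resulting columns, per the plans above: a$_1, a$_2, b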

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/ExpressionReductionTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/ExpressionReductionTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/ExpressionReductionTest.scala
new file mode 100644
index 0000000..b660243
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/ExpressionReductionTest.scala
@@ -0,0 +1,425 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table
+
+import org.apache.flink.api.scala._
+import org.apache.flink.table.api.Types
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.utils.TableTestBase
+import org.apache.flink.table.utils.TableTestUtil._
+import org.junit.Test
+
+class ExpressionReductionTest extends TableTestBase {
+
+  @Test
+  def testReduceCalcExpressionForBatchSQL(): Unit = {
+    val util = batchTestUtil()
+    util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
+
+    val sqlQuery = "SELECT " +
+      "(3+4)+a, " +
+      "b+(1+2), " +
+      "CASE 11 WHEN 1 THEN 'a' ELSE 'b' END, " +
+      "TRIM(BOTH ' STRING '),  " +
+      "'test' || 'string', " +
+      "NULLIF(1, 1), " +
+      "TIMESTAMP '1990-10-14 23:00:00.123' + INTERVAL '10 00:00:01' DAY TO SECOND, " +
+      "EXTRACT(DAY FROM INTERVAL '19 12:10:10.123' DAY TO SECOND(3)),  " +
+      "1 IS NULL, " +
+      "'TEST' LIKE '%EST', " +
+      "FLOOR(2.5), " +
+      "'TEST' IN ('west', 'TEST', 'rest'), " +
+      "CAST(TRUE AS VARCHAR) || 'X'" +
+      "FROM MyTable WHERE a>(1+7)"
+
+    val expected = unaryNode(
+      "DataSetCalc",
+      batchTableNode(0),
+      term("select",
+        "+(7, a) AS EXPR$0",
+        "+(b, 3) AS EXPR$1",
+        "'b' AS EXPR$2",
+        "'STRING' AS EXPR$3",
+        "'teststring' AS EXPR$4",
+        "null AS EXPR$5",
+        "1990-10-24 23:00:01 AS EXPR$6",
+        "19 AS EXPR$7",
+        "false AS EXPR$8",
+        "true AS EXPR$9",
+        "2 AS EXPR$10",
+        "true AS EXPR$11",
+        "'trueX' AS EXPR$12"
+      ),
+      term("where", ">(a, 8)")
+    )
+
+    util.verifySql(sqlQuery, expected)
+  }
+
+  @Test
+  def testReduceProjectExpressionForBatchSQL(): Unit = {
+    val util = batchTestUtil()
+    util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
+
+    val sqlQuery = "SELECT " +
+      "(3+4)+a, " +
+      "b+(1+2), " +
+      "CASE 11 WHEN 1 THEN 'a' ELSE 'b' END, " +
+      "TRIM(BOTH ' STRING '),  " +
+      "'test' || 'string', " +
+      "NULLIF(1, 1), " +
+      "TIMESTAMP '1990-10-14 23:00:00.123' + INTERVAL '10 00:00:01' DAY TO SECOND, " +
+      "EXTRACT(DAY FROM INTERVAL '19 12:10:10.123' DAY TO SECOND(3)),  " +
+      "1 IS NULL, " +
+      "'TEST' LIKE '%EST', " +
+      "FLOOR(2.5), " +
+      "'TEST' IN ('west', 'TEST', 'rest'), " +
+      "CAST(TRUE AS VARCHAR) || 'X'" +
+      "FROM MyTable"
+
+    val expected = unaryNode(
+      "DataSetCalc",
+      batchTableNode(0),
+      term("select",
+        "+(7, a) AS EXPR$0",
+        "+(b, 3) AS EXPR$1",
+        "'b' AS EXPR$2",
+        "'STRING' AS EXPR$3",
+        "'teststring' AS EXPR$4",
+        "null AS EXPR$5",
+        "1990-10-24 23:00:01 AS EXPR$6",
+        "19 AS EXPR$7",
+        "false AS EXPR$8",
+        "true AS EXPR$9",
+        "2 AS EXPR$10",
+        "true AS EXPR$11",
+        "'trueX' AS EXPR$12"
+      )
+    )
+
+    util.verifySql(sqlQuery, expected)
+  }
+
+  @Test
+  def testReduceFilterExpressionForBatchSQL(): Unit = {
+    val util = batchTestUtil()
+    util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
+
+    val sqlQuery = "SELECT " +
+      "*" +
+      "FROM MyTable WHERE a>(1+7)"
+
+    val expected = unaryNode(
+      "DataSetCalc",
+      batchTableNode(0),
+      term("select", "a", "b", "c"),
+      term("where", ">(a, 8)")
+    )
+
+    util.verifySql(sqlQuery, expected)
+  }
+
+  @Test
+  def testReduceCalcExpressionForBatchTableAPI(): Unit = {
+    val util = batchTestUtil()
+    val table = util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
+
+    val result = table
+      .where('a > (1 + 7))
+      .select((3 + 4).toExpr + 6,
+              (11 === 1) ? ("a", "b"),
+              " STRING ".trim,
+              "test" + "string",
+              "1990-10-14 23:00:00.123".toTimestamp + 10.days + 1.second,
+              1.isNull,
+              "TEST".like("%EST"),
+              2.5.toExpr.floor(),
+              true.cast(Types.STRING) + "X")
+
+    val expected = unaryNode(
+      "DataSetCalc",
+      batchTableNode(0),
+      term("select",
+        "13 AS _c0",
+        "'b' AS _c1",
+        "'STRING' AS _c2",
+        "'teststring' AS _c3",
+        "1990-10-24 23:00:01 AS _c4",
+        "false AS _c5",
+        "true AS _c6",
+        "2E0 AS _c7",
+        "'trueX' AS _c8"
+      ),
+      term("where", ">(a, 8)")
+    )
+
+    util.verifyTable(result, expected)
+  }
+
+  @Test
+  def testReduceProjectExpressionForBatchTableAPI(): Unit = {
+    val util = batchTestUtil()
+    val table = util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
+
+    val result = table
+      .select((3 + 4).toExpr + 6,
+              (11 === 1) ? ("a", "b"),
+              " STRING ".trim,
+              "test" + "string",
+              "1990-10-14 23:00:00.123".toTimestamp + 10.days + 1.second,
+              1.isNull,
+              "TEST".like("%EST"),
+              2.5.toExpr.floor(),
+              true.cast(Types.STRING) + "X")
+
+    val expected = unaryNode(
+      "DataSetCalc",
+      batchTableNode(0),
+      term("select",
+        "13 AS _c0",
+        "'b' AS _c1",
+        "'STRING' AS _c2",
+        "'teststring' AS _c3",
+        "1990-10-24 23:00:01 AS _c4",
+        "false AS _c5",
+        "true AS _c6",
+        "2E0 AS _c7",
+        "'trueX' AS _c8"
+      )
+    )
+
+    util.verifyTable(result, expected)
+  }
+
+  @Test
+  def testReduceFilterExpressionForBatchTableAPI(): Unit = {
+    val util = batchTestUtil()
+    val table = util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
+
+    val result = table
+      .where('a > (1 + 7))
+
+    val expected = unaryNode(
+      "DataSetCalc",
+      batchTableNode(0),
+      term("select", "a", "b", "c"),
+      term("where", ">(a, 8)")
+    )
+
+    util.verifyTable(result, expected)
+  }
+
+  @Test
+  def testReduceCalcExpressionForStreamSQL(): Unit = {
+    val util = streamTestUtil()
+    util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
+
+    val sqlQuery = "SELECT " +
+      "(3+4)+a, " +
+      "b+(1+2), " +
+      "CASE 11 WHEN 1 THEN 'a' ELSE 'b' END, " +
+      "TRIM(BOTH ' STRING '),  " +
+      "'test' || 'string', " +
+      "NULLIF(1, 1), " +
+      "TIMESTAMP '1990-10-14 23:00:00.123' + INTERVAL '10 00:00:01' DAY TO SECOND, " +
+      "EXTRACT(DAY FROM INTERVAL '19 12:10:10.123' DAY TO SECOND(3)),  " +
+      "1 IS NULL, " +
+      "'TEST' LIKE '%EST', " +
+      "FLOOR(2.5), " +
+      "'TEST' IN ('west', 'TEST', 'rest'), " +
+      "CAST(TRUE AS VARCHAR) || 'X'" +
+      "FROM MyTable WHERE a>(1+7)"
+
+    val expected = unaryNode(
+      "DataStreamCalc",
+      streamTableNode(0),
+      term("select",
+        "+(7, a) AS EXPR$0",
+        "+(b, 3) AS EXPR$1",
+        "'b' AS EXPR$2",
+        "'STRING' AS EXPR$3",
+        "'teststring' AS EXPR$4",
+        "null AS EXPR$5",
+        "1990-10-24 23:00:01 AS EXPR$6",
+        "19 AS EXPR$7",
+        "false AS EXPR$8",
+        "true AS EXPR$9",
+        "2 AS EXPR$10",
+        "true AS EXPR$11",
+        "'trueX' AS EXPR$12"
+      ),
+      term("where", ">(a, 8)")
+    )
+
+    util.verifySql(sqlQuery, expected)
+  }
+
+  @Test
+  def testReduceProjectExpressionForStreamSQL(): Unit = {
+    val util = streamTestUtil()
+    util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
+
+    val sqlQuery = "SELECT " +
+      "(3+4)+a, " +
+      "b+(1+2), " +
+      "CASE 11 WHEN 1 THEN 'a' ELSE 'b' END, " +
+      "TRIM(BOTH ' STRING '),  " +
+      "'test' || 'string', " +
+      "NULLIF(1, 1), " +
+      "TIMESTAMP '1990-10-14 23:00:00.123' + INTERVAL '10 00:00:01' DAY TO SECOND, " +
+      "EXTRACT(DAY FROM INTERVAL '19 12:10:10.123' DAY TO SECOND(3)),  " +
+      "1 IS NULL, " +
+      "'TEST' LIKE '%EST', " +
+      "FLOOR(2.5), " +
+      "'TEST' IN ('west', 'TEST', 'rest'), " +
+      "CAST(TRUE AS VARCHAR) || 'X'" +
+      "FROM MyTable"
+
+    val expected = unaryNode(
+      "DataStreamCalc",
+      streamTableNode(0),
+      term("select",
+        "+(7, a) AS EXPR$0",
+        "+(b, 3) AS EXPR$1",
+        "'b' AS EXPR$2",
+        "'STRING' AS EXPR$3",
+        "'teststring' AS EXPR$4",
+        "null AS EXPR$5",
+        "1990-10-24 23:00:01 AS EXPR$6",
+        "19 AS EXPR$7",
+        "false AS EXPR$8",
+        "true AS EXPR$9",
+        "2 AS EXPR$10",
+        "true AS EXPR$11",
+        "'trueX' AS EXPR$12"
+      )
+    )
+
+    util.verifySql(sqlQuery, expected)
+  }
+
+  @Test
+  def testReduceFilterExpressionForStreamSQL(): Unit = {
+    val util = streamTestUtil()
+    util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
+
+    val sqlQuery = "SELECT " +
+      "*" +
+      "FROM MyTable WHERE a>(1+7)"
+
+    val expected = unaryNode(
+      "DataStreamCalc",
+      streamTableNode(0),
+      term("select", "a", "b", "c"),
+      term("where", ">(a, 8)")
+    )
+
+    util.verifySql(sqlQuery, expected)
+  }
+
+  @Test
+  def testReduceCalcExpressionForStreamTableAPI(): Unit = {
+    val util = streamTestUtil()
+    val table = util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
+
+    val result = table
+      .where('a > (1 + 7))
+      .select((3 + 4).toExpr + 6,
+              (11 === 1) ? ("a", "b"),
+              " STRING ".trim,
+              "test" + "string",
+              "1990-10-14 23:00:00.123".toTimestamp + 10.days + 1.second,
+              1.isNull,
+              "TEST".like("%EST"),
+              2.5.toExpr.floor(),
+              true.cast(Types.STRING) + "X")
+
+    val expected = unaryNode(
+      "DataStreamCalc",
+      streamTableNode(0),
+      term("select",
+        "13 AS _c0",
+        "'b' AS _c1",
+        "'STRING' AS _c2",
+        "'teststring' AS _c3",
+        "1990-10-24 23:00:01 AS _c4",
+        "false AS _c5",
+        "true AS _c6",
+        "2E0 AS _c7",
+        "'trueX' AS _c8"
+      ),
+      term("where", ">(a, 8)")
+    )
+
+    util.verifyTable(result, expected)
+  }
+
+  @Test
+  def testReduceProjectExpressionForStreamTableAPI(): Unit = {
+    val util = streamTestUtil()
+    val table = util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
+
+    val result = table
+      .select((3 + 4).toExpr + 6,
+              (11 === 1) ? ("a", "b"),
+              " STRING ".trim,
+              "test" + "string",
+              "1990-10-14 23:00:00.123".toTimestamp + 10.days + 1.second,
+              1.isNull,
+              "TEST".like("%EST"),
+              2.5.toExpr.floor(),
+              true.cast(Types.STRING) + "X")
+
+    val expected = unaryNode(
+      "DataStreamCalc",
+      streamTableNode(0),
+      term("select",
+        "13 AS _c0",
+        "'b' AS _c1",
+        "'STRING' AS _c2",
+        "'teststring' AS _c3",
+        "1990-10-24 23:00:01 AS _c4",
+        "false AS _c5",
+        "true AS _c6",
+        "2E0 AS _c7",
+        "'trueX' AS _c8"
+      )
+    )
+
+    util.verifyTable(result, expected)
+  }
+
+  @Test
+  def testReduceFilterExpressionForStreamTableAPI(): Unit = {
+    val util = streamTestUtil()
+    val table = util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
+
+    val result = table
+      .where('a > (1 + 7))
+
+    val expected = unaryNode(
+      "DataStreamCalc",
+      streamTableNode(0),
+      term("select", "a", "b", "c"),
+      term("where", ">(a, 8)")
+    )
+
+    util.verifyTable(result, expected)
+  }
+
+}
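
The pattern these tests pin down: constant subexpressions are folded during
optimization, before code generation, so they appear pre-evaluated in the plan.
A minimal sketch in the same style (EXPR$ naming as in the tests above):

  val util = batchTestUtil()
  util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)

  util.verifySql(
    "SELECT a + (1 + 2) FROM MyTable",
    unaryNode(
      "DataSetCalc",
      batchTableNode(0),
      term("select", "+(a, 3) AS EXPR$0")
    )
  )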

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/TableEnvironmentTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/TableEnvironmentTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/TableEnvironmentTest.scala
new file mode 100644
index 0000000..b90de97
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/TableEnvironmentTest.scala
@@ -0,0 +1,293 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table
+
+import org.apache.calcite.tools.RuleSet
+import org.apache.flink.api.scala._
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo._
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.typeutils.{TupleTypeInfo, TypeExtractor}
+import org.apache.flink.table.api.{Table, TableConfig, TableEnvironment, TableException}
+import org.apache.flink.table.expressions.{Alias, UnresolvedFieldReference}
+import org.apache.flink.table.sinks.TableSink
+import org.junit.Test
+import org.junit.Assert.assertEquals
+
+class TableEnvironmentTest {
+
+  val tEnv = new MockTableEnvironment
+
+  val tupleType = new TupleTypeInfo(
+    INT_TYPE_INFO,
+    STRING_TYPE_INFO,
+    DOUBLE_TYPE_INFO)
+
+  val caseClassType = implicitly[TypeInformation[CClass]]
+
+  val pojoType = TypeExtractor.createTypeInfo(classOf[PojoClass])
+
+  val atomicType = INT_TYPE_INFO
+
+  @Test
+  def testGetFieldInfoTuple(): Unit = {
+    val fieldInfo = tEnv.getFieldInfo(tupleType)
+
+    fieldInfo._1.zip(Array("f0", "f1", "f2")).foreach(x => assertEquals(x._2, x._1))
+    fieldInfo._2.zip(Array(0, 1, 2)).foreach(x => assertEquals(x._2, x._1))
+  }
+
+  @Test
+  def testGetFieldInfoCClass(): Unit = {
+    val fieldInfo = tEnv.getFieldInfo(caseClassType)
+
+    fieldInfo._1.zip(Array("cf1", "cf2", "cf3")).foreach(x => assertEquals(x._2, x._1))
+    fieldInfo._2.zip(Array(0, 1, 2)).foreach(x => assertEquals(x._2, x._1))
+  }
+
+  @Test
+  def testGetFieldInfoPojo(): Unit = {
+    val fieldInfo = tEnv.getFieldInfo(pojoType)
+
+    fieldInfo._1.zip(Array("pf1", "pf2", "pf3")).foreach(x => assertEquals(x._2, x._1))
+    fieldInfo._2.zip(Array(0, 1, 2)).foreach(x => assertEquals(x._2, x._1))
+  }
+
+  @Test(expected = classOf[TableException])
+  def testGetFieldInfoAtomic(): Unit = {
+    tEnv.getFieldInfo(atomicType)
+  }
+
+  @Test
+  def testGetFieldInfoTupleNames(): Unit = {
+    val fieldInfo = tEnv.getFieldInfo(
+      tupleType,
+      Array(
+        new UnresolvedFieldReference("name1"),
+        new UnresolvedFieldReference("name2"),
+        new UnresolvedFieldReference("name3")
+    ))
+
+    fieldInfo._1.zip(Array("name1", "name2", "name3")).foreach(x => assertEquals(x._2, x._1))
+    fieldInfo._2.zip(Array(0, 1, 2)).foreach(x => assertEquals(x._2, x._1))
+  }
+
+  @Test
+  def testGetFieldInfoCClassNames(): Unit = {
+    val fieldInfo = tEnv.getFieldInfo(
+      caseClassType,
+      Array(
+        new UnresolvedFieldReference("name1"),
+        new UnresolvedFieldReference("name2"),
+        new UnresolvedFieldReference("name3")
+    ))
+
+    fieldInfo._1.zip(Array("name1", "name2", "name3")).foreach(x => assertEquals(x._2, x._1))
+    fieldInfo._2.zip(Array(0, 1, 2)).foreach(x => assertEquals(x._2, x._1))
+  }
+
+  @Test(expected = classOf[TableException])
+  def testGetFieldInfoPojoNames1(): Unit = {
+    tEnv.getFieldInfo(
+      pojoType,
+      Array(
+        new UnresolvedFieldReference("name1"),
+        new UnresolvedFieldReference("name2"),
+        new UnresolvedFieldReference("name3")
+      ))
+  }
+
+  @Test
+  def testGetFieldInfoPojoNames2(): Unit = {
+    val fieldInfo = tEnv.getFieldInfo(
+      pojoType,
+      Array(
+        new UnresolvedFieldReference("pf3"),
+        new UnresolvedFieldReference("pf1"),
+        new UnresolvedFieldReference("pf2")
+      ))
+
+    fieldInfo._1.zip(Array("pf3", "pf1", "pf2")).foreach(x => assertEquals(x._2, x._1))
+    fieldInfo._2.zip(Array(2, 0, 1)).foreach(x => assertEquals(x._2, x._1))
+  }
+
+  @Test
+  def testGetFieldInfoAtomicName1(): Unit = {
+    val fieldInfo = tEnv.getFieldInfo(
+      atomicType,
+      Array(new UnresolvedFieldReference("name"))
+    )
+
+    fieldInfo._1.zip(Array("name")).foreach(x => assertEquals(x._2, x._1))
+    fieldInfo._2.zip(Array(0)).foreach(x => assertEquals(x._2, x._1))
+  }
+
+  @Test(expected = classOf[TableException])
+  def testGetFieldInfoAtomicName2(): Unit = {
+    tEnv.getFieldInfo(
+      atomicType,
+      Array(
+        new UnresolvedFieldReference("name1"),
+        new UnresolvedFieldReference("name2")
+      ))
+  }
+
+  @Test
+  def testGetFieldInfoTupleAlias1(): Unit = {
+    val fieldInfo = tEnv.getFieldInfo(
+      tupleType,
+      Array(
+        new Alias(UnresolvedFieldReference("f0"), "name1"),
+        new Alias(UnresolvedFieldReference("f1"), "name2"),
+        new Alias(UnresolvedFieldReference("f2"), "name3")
+      ))
+
+    fieldInfo._1.zip(Array("name1", "name2", "name3")).foreach(x => assertEquals(x._2, x._1))
+    fieldInfo._2.zip(Array(0, 1, 2)).foreach(x => assertEquals(x._2, x._1))
+  }
+
+  @Test
+  def testGetFieldInfoTupleAlias2(): Unit = {
+    val fieldInfo = tEnv.getFieldInfo(
+      tupleType,
+      Array(
+        new Alias(UnresolvedFieldReference("f2"), "name1"),
+        new Alias(UnresolvedFieldReference("f0"), "name2"),
+        new Alias(UnresolvedFieldReference("f1"), "name3")
+      ))
+
+    fieldInfo._1.zip(Array("name1", "name2", "name3")).foreach(x => assertEquals(x._2, x._1))
+    fieldInfo._2.zip(Array(2, 0, 1)).foreach(x => assertEquals(x._2, x._1))
+  }
+
+  @Test(expected = classOf[TableException])
+  def testGetFieldInfoTupleAlias3(): Unit = {
+    tEnv.getFieldInfo(
+      tupleType,
+      Array(
+        new Alias(UnresolvedFieldReference("xxx"), "name1"),
+        new Alias(UnresolvedFieldReference("yyy"), "name2"),
+        new Alias(UnresolvedFieldReference("zzz"), "name3")
+      ))
+  }
+
+  @Test
+  def testGetFieldInfoCClassAlias1(): Unit = {
+    val fieldInfo = tEnv.getFieldInfo(
+      caseClassType,
+      Array(
+        new Alias(new UnresolvedFieldReference("cf1"), "name1"),
+        new Alias(new UnresolvedFieldReference("cf2"), "name2"),
+        new Alias(new UnresolvedFieldReference("cf3"), "name3")
+      ))
+
+    fieldInfo._1.zip(Array("name1", "name2", "name3")).foreach(x => assertEquals(x._2, x._1))
+    fieldInfo._2.zip(Array(0, 1, 2)).foreach(x => assertEquals(x._2, x._1))
+  }
+
+  @Test
+  def testGetFieldInfoCClassAlias2(): Unit = {
+    val fieldInfo = tEnv.getFieldInfo(
+      caseClassType,
+      Array(
+        new Alias(new UnresolvedFieldReference("cf3"), "name1"),
+        new Alias(new UnresolvedFieldReference("cf1"), "name2"),
+        new Alias(new UnresolvedFieldReference("cf2"), "name3")
+      ))
+
+    fieldInfo._1.zip(Array("name1", "name2", "name3")).foreach(x => assertEquals(x._2, x._1))
+    fieldInfo._2.zip(Array(2, 0, 1)).foreach(x => assertEquals(x._2, x._1))
+  }
+
+  @Test(expected = classOf[TableException])
+  def testGetFieldInfoCClassAlias3(): Unit = {
+    tEnv.getFieldInfo(
+      caseClassType,
+      Array(
+        new Alias(new UnresolvedFieldReference("xxx"), "name1"),
+        new Alias(new UnresolvedFieldReference("yyy"), "name2"),
+        new Alias(new UnresolvedFieldReference("zzz"), "name3")
+      ))
+  }
+
+  @Test
+  def testGetFieldInfoPojoAlias1(): Unit = {
+    val fieldInfo = tEnv.getFieldInfo(
+      pojoType,
+      Array(
+        new Alias(new UnresolvedFieldReference("pf1"), "name1"),
+        new Alias(new UnresolvedFieldReference("pf2"), "name2"),
+        new Alias(new UnresolvedFieldReference("pf3"), "name3")
+      ))
+
+    fieldInfo._1.zip(Array("name1", "name2", "name3")).foreach(x => assertEquals(x._2, x._1))
+    fieldInfo._2.zip(Array(0, 1, 2)).foreach(x => assertEquals(x._2, x._1))
+  }
+
+  @Test
+  def testGetFieldInfoPojoAlias2(): Unit = {
+    val fieldInfo = tEnv.getFieldInfo(
+      pojoType,
+      Array(
+        new Alias(new UnresolvedFieldReference("pf3"), "name1"),
+        new Alias(new UnresolvedFieldReference("pf1"), "name2"),
+        new Alias(new UnresolvedFieldReference("pf2"), "name3")
+      ))
+
+    fieldInfo._1.zip(Array("name1", "name2", "name3")).foreach(x => assertEquals(x._2, x._1))
+    fieldInfo._2.zip(Array(2, 0, 1)).foreach(x => assertEquals(x._2, x._1))
+  }
+
+  @Test(expected = classOf[TableException])
+  def testGetFieldInfoPojoAlias3(): Unit = {
+    tEnv.getFieldInfo(
+      pojoType,
+      Array(
+        new Alias(new UnresolvedFieldReference("xxx"), "name1"),
+        new Alias(new UnresolvedFieldReference("yyy"), "name2"),
+        new Alias(new UnresolvedFieldReference("zzz"), "name3")
+      ))
+  }
+
+  @Test(expected = classOf[TableException])
+  def testGetFieldInfoAtomicAlias(): Unit = {
+    tEnv.getFieldInfo(
+      atomicType,
+      Array(
+        new Alias(new UnresolvedFieldReference("name1"), "name2")
+      ))
+  }
+
+}
+
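+// A minimal stub: the abstract members are left as ??? (which throws
+// scala.NotImplementedError), since the getFieldInfo tests above only exercise
+// logic implemented in the TableEnvironment base class itself.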
+class MockTableEnvironment extends TableEnvironment(new TableConfig) {
+
+  override private[flink] def writeToSink[T](table: Table, sink: TableSink[T]): Unit = ???
+
+  override protected def checkValidTableName(name: String): Unit = ???
+
+  override protected def getBuiltInRuleSet: RuleSet = ???
+
+  override def sql(query: String): Table = ???
+}
+
+case class CClass(cf1: Int, cf2: String, cf3: Double)
+
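+// PojoClass is deliberately a Flink POJO: public mutable fields plus a
+// no-argument constructor, as required for the POJO TypeInformation behind
+// pojoType.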
+class PojoClass(var pf1: Int, var pf2: String, var pf3: Double) {
+  def this() = this(0, "", 0.0)
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/ExplainTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/ExplainTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/ExplainTest.scala
new file mode 100644
index 0000000..a323ec9
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/ExplainTest.scala
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.scala.batch
+
+import org.apache.flink.api.scala._
+import org.apache.flink.table.api.TableEnvironment
+import org.apache.flink.table.api.scala._
+import org.apache.flink.test.util.MultipleProgramsTestBase
+import org.junit.Assert.assertEquals
+import org.junit._
+
+class ExplainTest
+  extends MultipleProgramsTestBase(MultipleProgramsTestBase.TestExecutionMode.CLUSTER) {
+
+  val testFilePath = getClass.getResource("/").getFile
+
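+  // Each test compares the output of TableEnvironment.explain() against a
+  // reference plan checked in under src/test/scala/resources; both sides are
+  // normalized to Unix line endings so the comparison also works on Windows.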
+  @Test
+  def testFilterWithoutExtended(): Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+
+    val table = env.fromElements((1, "hello"))
+      .toTable(tEnv, 'a, 'b)
+      .filter("a % 2 = 0")
+
+    val result = tEnv.explain(table).replaceAll("\\r\\n", "\n")
+    val source = scala.io.Source.fromFile(testFilePath +
+      "../../src/test/scala/resources/testFilter0.out").mkString.replaceAll("\\r\\n", "\n")
+    assertEquals(source, result)
+  }
+
+  @Test
+  def testFilterWithExtended(): Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+
+    val table = env.fromElements((1, "hello"))
+      .toTable(tEnv, 'a, 'b)
+      .filter("a % 2 = 0")
+
+    val result = tEnv.explain(table, true).replaceAll("\\r\\n", "\n")
+    val source = scala.io.Source.fromFile(testFilePath +
+      "../../src/test/scala/resources/testFilter1.out").mkString.replaceAll("\\r\\n", "\n")
+    assertEquals(source, result)
+  }
+
+  @Test
+  def testJoinWithoutExtended(): Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+
+    val table1 = env.fromElements((1, "hello")).toTable(tEnv, 'a, 'b)
+    val table2 = env.fromElements((1, "hello")).toTable(tEnv, 'c, 'd)
+    val table = table1.join(table2).where("b = d").select("a, c")
+
+    val result = tEnv.explain(table).replaceAll("\\r\\n", "\n")
+    val source = scala.io.Source.fromFile(testFilePath +
+      "../../src/test/scala/resources/testJoin0.out").mkString.replaceAll("\\r\\n", "\n")
+    assertEquals(source, result)
+  }
+
+  @Test
+  def testJoinWithExtended(): Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+
+    val table1 = env.fromElements((1, "hello")).toTable(tEnv, 'a, 'b)
+    val table2 = env.fromElements((1, "hello")).toTable(tEnv, 'c, 'd)
+    val table = table1.join(table2).where("b = d").select("a, c")
+
+    val result = tEnv.explain(table, true).replaceAll("\\r\\n", "\n")
+    val source = scala.io.Source.fromFile(testFilePath +
+      "../../src/test/scala/resources/testJoin1.out").mkString.replaceAll("\\r\\n", "\n")
+    assertEquals(source, result)
+  }
+
+  @Test
+  def testUnionWithoutExtended(): Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+
+    val table1 = env.fromElements((1, "hello")).toTable(tEnv, 'count, 'word)
+    val table2 = env.fromElements((1, "hello")).toTable(tEnv, 'count, 'word)
+    val table = table1.unionAll(table2)
+
+    val result = tEnv.explain(table).replaceAll("\\r\\n", "\n")
+    val source = scala.io.Source.fromFile(testFilePath +
+      "../../src/test/scala/resources/testUnion0.out").mkString.replaceAll("\\r\\n", "\n")
+    assertEquals(source, result)
+  }
+
+  @Test
+  def testUnionWithExtended(): Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+
+    val table1 = env.fromElements((1, "hello")).toTable(tEnv, 'count, 'word)
+    val table2 = env.fromElements((1, "hello")).toTable(tEnv, 'count, 'word)
+    val table = table1.unionAll(table2)
+
+    val result = tEnv.explain(table, true).replaceAll("\\r\\n", "\n")
+    val source = scala.io.Source.fromFile(testFilePath +
+      "../../src/test/scala/resources/testUnion1.out").mkString.replaceAll("\\r\\n", "\n")
+    assertEquals(source, result)
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/ProjectableTableSourceITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/ProjectableTableSourceITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/ProjectableTableSourceITCase.scala
new file mode 100644
index 0000000..ec4dc59
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/ProjectableTableSourceITCase.scala
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.scala.batch
+
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
+import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.api.java.{DataSet => JavaSet, ExecutionEnvironment => JavaExecEnv}
+import org.apache.flink.api.scala.ExecutionEnvironment
+import org.apache.flink.table.api.scala.batch.utils.TableProgramsTestBase
+import org.apache.flink.table.api.scala.batch.utils.TableProgramsTestBase.TableConfigMode
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.sources.{BatchTableSource, ProjectableTableSource}
+import org.apache.flink.table.api.TableEnvironment
+import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
+import org.apache.flink.test.util.TestBaseUtils
+import org.apache.flink.types.Row
+import org.junit.{Before, Test}
+import org.junit.runner.RunWith
+import org.junit.runners.Parameterized
+
+import scala.collection.JavaConverters._
+
+@RunWith(classOf[Parameterized])
+class ProjectableTableSourceITCase(
+    mode: TestExecutionMode,
+    configMode: TableConfigMode)
+  extends TableProgramsTestBase(mode, configMode) {
+
+  private val tableName = "MyTable"
+  private var tableEnv: BatchTableEnvironment = _
+
+  @Before
+  def initTableEnv(): Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    tableEnv = TableEnvironment.getTableEnvironment(env, config)
+    tableEnv.registerTableSource(tableName, new TestProjectableTableSource)
+  }
+
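+  // Both queries read only 'id' and 'name' (plus the filter column 'amount'),
+  // so the optimizer can push the projection into TestProjectableTableSource
+  // via projectFields below and skip the unused 'price' column.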
+  @Test
+  def testTableAPI(): Unit = {
+    val results = tableEnv
+                  .scan(tableName)
+                  .where("amount < 4")
+                  .select("id, name")
+                  .collect()
+
+    val expected = Seq(
+      "0,Record_0", "1,Record_1", "2,Record_2", "3,Record_3", "16,Record_16",
+      "17,Record_17", "18,Record_18", "19,Record_19", "32,Record_32").mkString("\n")
+    TestBaseUtils.compareResultAsText(results.asJava, expected)
+  }
+
+  @Test
+  def testSQL(): Unit = {
+    val results = tableEnv
+                  .sql(s"select id, name from $tableName where amount < 4 ")
+                  .collect()
+
+    val expected = Seq(
+      "0,Record_0", "1,Record_1", "2,Record_2", "3,Record_3", "16,Record_16",
+      "17,Record_17", "18,Record_18", "19,Record_19", "32,Record_32").mkString("\n")
+    TestBaseUtils.compareResultAsText(results.asJava, expected)
+  }
+}
+
+class TestProjectableTableSource(
+    fieldTypes: Array[TypeInformation[_]],
+    fieldNames: Array[String])
+  extends BatchTableSource[Row] with ProjectableTableSource[Row] {
+
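+  /** No-arg constructor used when the source is registered; it fixes the full
+    * four-column schema before any projection is applied. */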
+  def this() = this(
+    fieldTypes = Array(
+      BasicTypeInfo.STRING_TYPE_INFO,
+      BasicTypeInfo.LONG_TYPE_INFO,
+      BasicTypeInfo.INT_TYPE_INFO,
+      BasicTypeInfo.DOUBLE_TYPE_INFO),
+    fieldNames = Array[String]("name", "id", "amount", "price")
+  )
+
+  /** Returns the data of the table as a [[org.apache.flink.api.java.DataSet]]. */
+  override def getDataSet(execEnv: JavaExecEnv): JavaSet[Row] = {
+    execEnv.fromCollection(generateDynamicCollection(33, fieldNames).asJava, getReturnType)
+  }
+
+  /** Returns the types of the table fields. */
+  override def getFieldTypes: Array[TypeInformation[_]] = fieldTypes
+
+  /** Returns the names of the table fields. */
+  override def getFieldsNames: Array[String] = fieldNames
+
+  /** Returns the [[TypeInformation]] for the return type. */
+  override def getReturnType: TypeInformation[Row] = new RowTypeInfo(fieldTypes: _*)
+
+  /** Returns the number of fields of the table. */
+  override def getNumberOfFields: Int = fieldNames.length
+
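+  /** Returns a copy of this source that produces only the requested fields,
+    * allowing the planner to push projections into the scan. */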
+  override def projectFields(fields: Array[Int]): TestProjectableTableSource = {
+    val projectedFieldTypes = new Array[TypeInformation[_]](fields.length)
+    val projectedFieldNames = new Array[String](fields.length)
+
+    fields.zipWithIndex.foreach { case (from, to) =>
+      projectedFieldTypes(to) = fieldTypes(from)
+      projectedFieldNames(to) = fieldNames(from)
+    }
+    new TestProjectableTableSource(projectedFieldTypes, projectedFieldNames)
+  }
+
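+  /** Generates `num` rows in which 'amount' cycles through 0..15 (cnt % 16);
+    * with num = 33, the predicate "amount < 4" matches ids 0-3, 16-19 and 32,
+    * exactly the result set asserted in the tests above. */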
+  private def generateDynamicCollection(num: Int, fieldNames: Array[String]): Seq[Row] = {
+    for {cnt <- 0 until num}
+      yield {
+        val row = new Row(fieldNames.length)
+        fieldNames.zipWithIndex.foreach {
+          case ("name", idx) =>
+            row.setField(idx, "Record_" + cnt)
+          case ("id", idx) =>
+            row.setField(idx, cnt.toLong)
+          case ("amount", idx) =>
+            row.setField(idx, cnt % 16)
+          case ("price", idx) =>
+            row.setField(idx, cnt.toDouble / 3)
+          case (unknown, _) =>
+            throw new IllegalArgumentException(s"unknown field name $unknown")
+        }
+        row
+      }
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/TableEnvironmentITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/TableEnvironmentITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/TableEnvironmentITCase.scala
new file mode 100644
index 0000000..961e575
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/TableEnvironmentITCase.scala
@@ -0,0 +1,273 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.scala.batch
+
+import java.util
+
+import org.apache.flink.api.scala._
+import org.apache.flink.table.api.scala.batch.utils.TableProgramsTestBase
+import org.apache.flink.table.api.scala.batch.utils.TableProgramsTestBase.TableConfigMode
+import org.apache.flink.table.api.scala._
+import org.apache.flink.api.scala.util.CollectionDataSets
+import org.apache.flink.types.Row
+import org.apache.flink.table.api.{TableEnvironment, TableException}
+import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
+import org.apache.flink.test.util.TestBaseUtils
+import org.junit._
+import org.junit.runner.RunWith
+import org.junit.runners.Parameterized
+
+import scala.collection.JavaConverters._
+
+@RunWith(classOf[Parameterized])
+class TableEnvironmentITCase(
+    mode: TestExecutionMode,
+    configMode: TableConfigMode)
+  extends TableProgramsTestBase(mode, configMode) {
+
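+  // A tuple DataSet registered without field expressions exposes the default
+  // Tuple field names _1, _2, _3; testRegisterWithFields covers the aliased
+  // variant.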
+  @Test
+  def testSimpleRegister(): Unit = {
+
+    val tableName = "MyTable"
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+    val ds = CollectionDataSets.get3TupleDataSet(env)
+    tEnv.registerDataSet(tableName, ds)
+    val t = tEnv.scan(tableName).select('_1, '_2, '_3)
+
+    val expected = "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello world\n" +
+      "4,3,Hello world, how are you?\n" + "5,3,I am fine.\n" + "6,3,Luke Skywalker\n" +
+      "7,4,Comment#1\n" + "8,4,Comment#2\n" + "9,4,Comment#3\n" + "10,4,Comment#4\n" +
+      "11,5,Comment#5\n" + "12,5,Comment#6\n" + "13,5,Comment#7\n" + "14,5,Comment#8\n" +
+      "15,5,Comment#9\n" + "16,6,Comment#10\n" + "17,6,Comment#11\n" + "18,6,Comment#12\n" +
+      "19,6,Comment#13\n" + "20,6,Comment#14\n" + "21,6,Comment#15\n"
+    val results = t.toDataSet[Row].collect()
+    TestBaseUtils.compareResultAsText(results.asJava, expected)
+  }
+
+  @Test
+  def testRegisterWithFields(): Unit = {
+
+    val tableName = "MyTable"
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+    val ds = CollectionDataSets.get3TupleDataSet(env)
+    tEnv.registerDataSet(tableName, ds, 'a, 'b, 'c)
+    val t = tEnv.scan(tableName).select('a, 'b)
+
+    val expected = "1,1\n" + "2,2\n" + "3,2\n" + "4,3\n" + "5,3\n" + "6,3\n" +
+      "7,4\n" + "8,4\n" + "9,4\n" + "10,4\n" + "11,5\n" + "12,5\n" + "13,5\n" + "14,5\n" +
+      "15,5\n" + "16,6\n" + "17,6\n" + "18,6\n" + "19,6\n" + "20,6\n" + "21,6\n"
+    val results = t.toDataSet[Row].collect()
+    TestBaseUtils.compareResultAsText(results.asJava, expected)
+  }
+
+  @Test(expected = classOf[TableException])
+  def testRegisterExistingDataSet(): Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+    val ds1 = CollectionDataSets.get3TupleDataSet(env)
+    tEnv.registerDataSet("MyTable", ds1)
+    val ds2 = CollectionDataSets.get5TupleDataSet(env)
+    // Must fail. Name is already in use.
+    tEnv.registerDataSet("MyTable", ds2)
+  }
+
+  @Test(expected = classOf[TableException])
+  def testScanUnregisteredTable(): Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env, config)
+    // Must fail. No table registered under that name.
+    tEnv.scan("someTable")
+  }
+
+  @Test
+  def testTableRegister(): Unit = {
+
+    val tableName = "MyTable"
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+    val t = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
+    tEnv.registerTable(tableName, t)
+
+    val regT = tEnv.scan(tableName).select('a, 'b).filter('a > 8)
+
+    val expected = "9,4\n" + "10,4\n" +
+      "11,5\n" + "12,5\n" + "13,5\n" + "14,5\n" +
+      "15,5\n" + "16,6\n" + "17,6\n" + "18,6\n" +
+      "19,6\n" + "20,6\n" + "21,6\n"
+
+    val results = regT.toDataSet[Row].collect()
+    TestBaseUtils.compareResultAsText(results.asJava, expected)
+  }
+
+  @Test(expected = classOf[TableException])
+  def testRegisterExistingTable(): Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+    val t1 = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv)
+    tEnv.registerTable("MyTable", t1)
+    val t2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv)
+    // Must fail. Name is already in use.
+    tEnv.registerDataSet("MyTable", t2)
+  }
+
+  @Test(expected = classOf[TableException])
+  def testRegisterTableFromOtherEnv(): Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv1 = TableEnvironment.getTableEnvironment(env, config)
+    val tEnv2 = TableEnvironment.getTableEnvironment(env, config)
+
+    val t1 = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv1)
+    // Must fail. Table is bound to different TableEnvironment.
+    tEnv2.registerTable("MyTable", t1)
+  }
+
+  @Test
+  def testToTable(): Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+    val t = CollectionDataSets.get3TupleDataSet(env)
+      .toTable(tEnv, 'a, 'b, 'c)
+      .select('a, 'b, 'c)
+
+    val expected = "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello world\n" +
+      "4,3,Hello world, how are you?\n" + "5,3,I am fine.\n" + "6,3,Luke Skywalker\n" +
+      "7,4,Comment#1\n" + "8,4,Comment#2\n" + "9,4,Comment#3\n" + "10,4,Comment#4\n" +
+      "11,5,Comment#5\n" + "12,5,Comment#6\n" + "13,5,Comment#7\n" + "14,5,Comment#8\n" +
+      "15,5,Comment#9\n" + "16,6,Comment#10\n" + "17,6,Comment#11\n" + "18,6,Comment#12\n" +
+      "19,6,Comment#13\n" + "20,6,Comment#14\n" + "21,6,Comment#15\n"
+    val results = t.toDataSet[Row].collect()
+    TestBaseUtils.compareResultAsText(results.asJava, expected)
+  }
+
+  @Test
+  def testToTableFromCaseClass(): Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+    val data = List(
+      SomeCaseClass("Peter", 28, 4000.00, "Sales"),
+      SomeCaseClass("Anna", 56, 10000.00, "Engineering"),
+      SomeCaseClass("Lucy", 42, 6000.00, "HR"))
+
+    val t = env.fromCollection(data)
+      .toTable(tEnv, 'a, 'b, 'c, 'd)
+      .select('a, 'b, 'c, 'd)
+
+    val expected: String =
+      "Peter,28,4000.0,Sales\n" +
+      "Anna,56,10000.0,Engineering\n" +
+      "Lucy,42,6000.0,HR\n"
+    val results = t.toDataSet[Row].collect()
+    TestBaseUtils.compareResultAsText(results.asJava, expected)
+  }
+
+  @Test
+  def testToTableFromAndToCaseClass(): Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+    val data = List(
+      SomeCaseClass("Peter", 28, 4000.00, "Sales"),
+      SomeCaseClass("Anna", 56, 10000.00, "Engineering"),
+      SomeCaseClass("Lucy", 42, 6000.00, "HR"))
+
+    val t = env.fromCollection(data)
+      .toTable(tEnv, 'a, 'b, 'c, 'd)
+      .select('a, 'b, 'c, 'd)
+
+    val expected: String =
+      "SomeCaseClass(Peter,28,4000.0,Sales)\n" +
+      "SomeCaseClass(Anna,56,10000.0,Engineering)\n" +
+      "SomeCaseClass(Lucy,42,6000.0,HR)\n"
+    val results = t.toDataSet[SomeCaseClass].collect()
+    TestBaseUtils.compareResultAsText(results.asJava, expected)
+  }
+
+  @Test(expected = classOf[TableException])
+  def testToTableWithTooFewFields(): Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+    CollectionDataSets.get3TupleDataSet(env)
+      // Must fail. Number of fields does not match.
+      .toTable(tEnv, 'a, 'b)
+  }
+
+  @Test(expected = classOf[TableException])
+  def testToTableWithTooManyFields(): Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+    CollectionDataSets.get3TupleDataSet(env)
+      // Must fail. Number of fields does not match.
+      .toTable(tEnv, 'a, 'b, 'c, 'd)
+  }
+
+  @Test(expected = classOf[TableException])
+  def testToTableWithAmbiguousFields(): Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+    CollectionDataSets.get3TupleDataSet(env)
+      // Must fail. Field names not unique.
+      .toTable(tEnv, 'a, 'b, 'b)
+  }
+
+  @Test(expected = classOf[TableException])
+  def testToTableWithNonFieldReference1(): Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+    // Must fail. as() can only have field references
+    CollectionDataSets.get3TupleDataSet(env)
+      .toTable(tEnv, 'a + 1, 'b, 'c)
+  }
+
+  @Test(expected = classOf[TableException])
+  def testToTableWithNonFieldReference2(): Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+    // Must fail. as() can only have field references
+    CollectionDataSets.get3TupleDataSet(env)
+      .toTable(tEnv, 'a as 'foo, 'b, 'c)
+  }
+}
+
+object TableEnvironmentITCase {
+
+  @Parameterized.Parameters(name = "Execution mode = {0}, Table config = {1}")
+  def parameters(): util.Collection[Array[java.lang.Object]] = {
+    Seq[Array[AnyRef]](
+      Array(TestExecutionMode.COLLECTION, TableProgramsTestBase.DEFAULT),
+      Array(TestExecutionMode.COLLECTION, TableProgramsTestBase.EFFICIENT)).asJava
+  }
+}
+
+case class SomeCaseClass(name: String, age: Int, salary: Double, department: String) {
+  def this() { this("", 0, 0.0, "") }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/TableSinkITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/TableSinkITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/TableSinkITCase.scala
new file mode 100644
index 0000000..8bc7874
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/TableSinkITCase.scala
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.scala.batch
+
+import java.io.File
+
+import org.apache.flink.api.scala.{ExecutionEnvironment, _}
+import org.apache.flink.table.api.scala.batch.utils.TableProgramsTestBase
+import org.apache.flink.table.api.scala.batch.utils.TableProgramsTestBase.TableConfigMode
+import org.apache.flink.table.api.scala._
+import org.apache.flink.api.scala.util.CollectionDataSets
+import org.apache.flink.table.api.TableEnvironment
+import org.apache.flink.table.sinks.CsvTableSink
+import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
+import org.apache.flink.test.util.TestBaseUtils
+import org.junit.Test
+import org.junit.runner.RunWith
+import org.junit.runners.Parameterized
+
+
+@RunWith(classOf[Parameterized])
+class TableSinkITCase(
+    mode: TestExecutionMode,
+    configMode: TableConfigMode)
+  extends TableProgramsTestBase(mode, configMode) {
+
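+  // Writes a filtered projection through a CsvTableSink at parallelism 4; the
+  // sink then emits a directory of part files, which
+  // TestBaseUtils.compareResultsByLinesInMemory reads back and compares to the
+  // expected lines irrespective of order.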
+  @Test
+  def testBatchTableSink(): Unit = {
+
+    val tmpFile = File.createTempFile("flink-table-sink-test", ".tmp")
+    tmpFile.deleteOnExit()
+    val path = tmpFile.toURI.toString
+
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env, config)
+    env.setParallelism(4)
+
+    val input = CollectionDataSets.get3TupleDataSet(env)
+      .map(x => x).setParallelism(4) // increase DOP to 4
+
+    input.toTable(tEnv, 'a, 'b, 'c)
+      .where('a < 5 || 'a > 17)
+      .select('c, 'b)
+      .writeToSink(new CsvTableSink(path, fieldDelim = "|"))
+
+    env.execute()
+
+    val expected = Seq(
+      "Hi|1", "Hello|2", "Hello world|2", "Hello world, how are you?|3",
+      "Comment#12|6", "Comment#13|6", "Comment#14|6", "Comment#15|6").mkString("\n")
+
+    TestBaseUtils.compareResultsByLinesInMemory(expected, path)
+  }
+
+}