You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2017/09/20 08:13:17 UTC

[2/4] flink git commit: [FLINK-6442] [table] Add registration for TableSinks and INSERT INTO support for SQL and Table API.

http://git-wip-us.apache.org/repos/asf/flink/blob/2cb37cb9/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/validation/SortValidationTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/validation/SortValidationTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/validation/SortValidationTest.scala
index d3f9b9f..cfc8067 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/validation/SortValidationTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/validation/SortValidationTest.scala
@@ -34,6 +34,6 @@ class SortValidationTest extends TableTestBase {
 
     val sqlQuery = "SELECT * FROM MyTable LIMIT 5"
 
-    util.tableEnv.sql(sqlQuery).toDataSet[Row]
+    util.tableEnv.sqlQuery(sqlQuery).toDataSet[Row]
   }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/2cb37cb9/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/table/validation/InsertIntoValidationTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/table/validation/InsertIntoValidationTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/table/validation/InsertIntoValidationTest.scala
new file mode 100644
index 0000000..2cfe931
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/table/validation/InsertIntoValidationTest.scala
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.batch.table.validation
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.scala._
+import org.apache.flink.table.api.{Types, ValidationException}
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.utils.{MemoryTableSinkUtil, TableTestBase}
+import org.junit._
+
+class InsertIntoValidationTest extends TableTestBase {
+
+  @Test(expected = classOf[ValidationException])
+  def testInconsistentLengthInsert(): Unit = {
+    val util = batchTestUtil()
+    util.addTable[(Int, Long, String)]("sourceTable", 'a, 'b, 'c)
+
+    val fieldNames = Array("d", "e")
+    val fieldTypes: Array[TypeInformation[_]] = Array(Types.INT, Types.LONG)
+    val sink = new MemoryTableSinkUtil.UnsafeMemoryAppendTableSink
+    util.tableEnv.registerTableSink("targetTable", fieldNames, fieldTypes, sink)
+
+    // must fail because TableSink accepts fewer fields.
+    util.tableEnv.scan("sourceTable")
+      .select('a, 'b, 'c)
+      .insertInto("targetTable")
+  }
+
+  @Test(expected = classOf[ValidationException])
+  def testUnmatchedTypesInsert(): Unit = {
+    val util = batchTestUtil()
+    util.addTable[(Int, Long, String)]("sourceTable", 'a, 'b, 'c)
+
+    val fieldNames = Array("d", "e", "f")
+    val fieldTypes: Array[TypeInformation[_]] = Array(Types.STRING, Types.INT, Types.LONG)
+    val sink = new MemoryTableSinkUtil.UnsafeMemoryAppendTableSink
+    util.tableEnv.registerTableSink("targetTable", fieldNames, fieldTypes, sink)
+
+    // must fail because types of result and TableSink do not match.
+    util.tableEnv.scan("sourceTable")
+      .select('a, 'b, 'c)
+      .insertInto("targetTable")
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/2cb37cb9/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/StreamTableEnvironmentTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/StreamTableEnvironmentTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/StreamTableEnvironmentTest.scala
index 0943ea6..1b99679 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/StreamTableEnvironmentTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/StreamTableEnvironmentTest.scala
@@ -41,7 +41,7 @@ class StreamTableEnvironmentTest extends TableTestBase {
     val util = streamTestUtil()
     val table = util.addTable[(Long, Int, String)]("tableName", 'a, 'b, 'c)
 
-    val sqlTable = util.tableEnv.sql(s"SELECT a, b, c FROM $table WHERE b > 12")
+    val sqlTable = util.tableEnv.sqlQuery(s"SELECT a, b, c FROM $table WHERE b > 12")
 
     val expected = unaryNode(
       "DataStreamCalc",
@@ -53,7 +53,7 @@ class StreamTableEnvironmentTest extends TableTestBase {
 
     val table2 = util.addTable[(Long, Int, String)]('d, 'e, 'f)
 
-    val sqlTable2 = util.tableEnv.sql(s"SELECT d, e, f FROM $table2 " +
+    val sqlTable2 = util.tableEnv.sqlQuery(s"SELECT d, e, f FROM $table2 " +
         s"UNION ALL SELECT a, b, c FROM $table")
 
     val expected2 = binaryNode(

http://git-wip-us.apache.org/repos/asf/flink/blob/2cb37cb9/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/JoinTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/JoinTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/JoinTest.scala
index 640fd26..e066fe4 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/JoinTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/JoinTest.scala
@@ -206,7 +206,7 @@ class JoinTest extends TableTestBase {
     val query =
       "SELECT t1.a, t2.b FROM MyTable as t1 join MyTable2 as t2 on t1.a = t2.a and " + timeSql
 
-    val resultTable = streamUtil.tableEnv.sql(query)
+    val resultTable = streamUtil.tableEnv.sqlQuery(query)
     val relNode = resultTable.getRelNode
     val joinNode = relNode.getInput(0).asInstanceOf[LogicalJoin]
     val rexNode = joinNode.getCondition
@@ -230,7 +230,7 @@ class JoinTest extends TableTestBase {
       query: String,
       expectCondStr: String): Unit = {
 
-    val resultTable = streamUtil.tableEnv.sql(query)
+    val resultTable = streamUtil.tableEnv.sqlQuery(query)
     val relNode = resultTable.getRelNode
     val joinNode = relNode.getInput(0).asInstanceOf[LogicalJoin]
     val joinInfo = joinNode.analyzeCondition

http://git-wip-us.apache.org/repos/asf/flink/blob/2cb37cb9/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/validation/InsertIntoValidationTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/validation/InsertIntoValidationTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/validation/InsertIntoValidationTest.scala
new file mode 100644
index 0000000..3045100
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/validation/InsertIntoValidationTest.scala
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.stream.sql.validation
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.api.{TableEnvironment, Types, ValidationException}
+import org.apache.flink.table.runtime.utils.StreamTestData
+import org.apache.flink.table.utils.MemoryTableSinkUtil
+import org.junit.Test
+
+class InsertIntoValidationTest {
+
+  @Test(expected = classOf[ValidationException])
+  def testInconsistentLengthInsert(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+
+    val t = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv).as('a, 'b, 'c)
+    tEnv.registerTable("sourceTable", t)
+
+    val fieldNames = Array("d", "e")
+    val fieldTypes: Array[TypeInformation[_]] = Array(Types.INT, Types.LONG)
+    val sink = new MemoryTableSinkUtil.UnsafeMemoryAppendTableSink
+    tEnv.registerTableSink("targetTable", fieldNames, fieldTypes, sink)
+
+    val sql = "INSERT INTO targetTable SELECT a, b, c FROM sourceTable"
+
+    // must fail because table sink has too few fields.
+    tEnv.sqlUpdate(sql)
+  }
+
+  @Test(expected = classOf[ValidationException])
+  def testUnmatchedTypesInsert(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+
+    val t = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv).as('a, 'b, 'c)
+    tEnv.registerTable("sourceTable", t)
+
+    val fieldNames = Array("d", "e", "f")
+    val fieldTypes: Array[TypeInformation[_]] = Array(Types.STRING, Types.INT, Types.LONG)
+    val sink = new MemoryTableSinkUtil.UnsafeMemoryAppendTableSink
+    tEnv.registerTableSink("targetTable", fieldNames, fieldTypes, sink)
+
+    val sql = "INSERT INTO targetTable SELECT a, b, c FROM sourceTable"
+
+    // must fail because field types of table sink are incompatible.
+    tEnv.sqlUpdate(sql)
+  }
+
+  @Test(expected = classOf[ValidationException])
+  def testUnsupportedPartialInsert(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+
+    val t = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv).as('a, 'b, 'c)
+    tEnv.registerTable("sourceTable", t)
+
+    val fieldNames = Array("d", "e", "f")
+    val fieldTypes = tEnv.scan("sourceTable").getSchema.getTypes
+    val sink = new MemoryTableSinkUtil.UnsafeMemoryAppendTableSink
+    tEnv.registerTableSink("targetTable", fieldNames, fieldTypes, sink)
+
+    val sql = "INSERT INTO targetTable (d, f) SELECT a, c FROM sourceTable"
+
+    // must fail because we don't support partial insert yet.
+    tEnv.sqlUpdate(sql)
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/2cb37cb9/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/validation/OverWindowValidationTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/validation/OverWindowValidationTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/validation/OverWindowValidationTest.scala
index 413cca7..d04b6d0 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/validation/OverWindowValidationTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/validation/OverWindowValidationTest.scala
@@ -43,7 +43,7 @@ class OverWindowValidationTest extends TableTestBase {
       "sum(a) OVER (PARTITION BY b ORDER BY proctime RANGE UNBOUNDED preceding) " +
       "from T1"
 
-    streamUtil.tableEnv.sql(sqlQuery).toAppendStream[Row]
+    streamUtil.tableEnv.sqlQuery(sqlQuery).toAppendStream[Row]
   }
 
   /**
@@ -55,7 +55,7 @@ class OverWindowValidationTest extends TableTestBase {
 
     val sqlQuery = "SELECT overAgg(c, a) FROM MyTable"
 
-    streamUtil.tableEnv.sql(sqlQuery)
+    streamUtil.tableEnv.sqlQuery(sqlQuery)
   }
 
   /**
@@ -66,6 +66,6 @@ class OverWindowValidationTest extends TableTestBase {
     streamUtil.addFunction("overAgg", new OverAgg0)
 
     val sqlQuery = "SELECT overAgg(c, a) FROM MyTable"
-    streamUtil.tableEnv.sql(sqlQuery)
+    streamUtil.tableEnv.sqlQuery(sqlQuery)
   }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/2cb37cb9/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/validation/CorrelateValidationTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/validation/CorrelateValidationTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/validation/CorrelateValidationTest.scala
index dbc7d46..f58feed 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/validation/CorrelateValidationTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/validation/CorrelateValidationTest.scala
@@ -145,7 +145,7 @@ class CorrelateValidationTest extends TableTestBase {
       ), "Undefined function: NONEXIST")
     // SQL API call
     expectExceptionThrown(
-      util.tableEnv.sql("SELECT * FROM MyTable, LATERAL TABLE(nonexist(a))"),
+      util.tableEnv.sqlQuery("SELECT * FROM MyTable, LATERAL TABLE(nonexist(a))"),
       "No match found for function signature nonexist(<NUMERIC>)")
 
 
@@ -160,7 +160,7 @@ class CorrelateValidationTest extends TableTestBase {
     // SQL API call
     // NOTE: it doesn't throw an exception but an AssertionError, maybe a Calcite bug
     expectExceptionThrown(
-      util.tableEnv.sql("SELECT * FROM MyTable, LATERAL TABLE(func0(a))"),
+      util.tableEnv.sqlQuery("SELECT * FROM MyTable, LATERAL TABLE(func0(a))"),
       null,
       classOf[AssertionError])
 
@@ -172,7 +172,7 @@ class CorrelateValidationTest extends TableTestBase {
       "Given parameters of function 'FUNC2' do not match any signature")
     // SQL API call
     expectExceptionThrown(
-      util.tableEnv.sql("SELECT * FROM MyTable, LATERAL TABLE(func2(c, c))"),
+      util.tableEnv.sqlQuery("SELECT * FROM MyTable, LATERAL TABLE(func2(c, c))"),
       "No match found for function signature func2(<CHARACTER>, <CHARACTER>)")
   }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/2cb37cb9/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/validation/InsertIntoValidationTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/validation/InsertIntoValidationTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/validation/InsertIntoValidationTest.scala
new file mode 100644
index 0000000..2fcfd6c
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/validation/InsertIntoValidationTest.scala
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.stream.table.validation
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.api.{TableEnvironment, Types, ValidationException}
+import org.apache.flink.table.runtime.utils.StreamTestData
+import org.apache.flink.table.utils.MemoryTableSinkUtil
+import org.junit.Test
+
+class InsertIntoValidationTest {
+
+  @Test(expected = classOf[ValidationException])
+  def testInconsistentLengthInsert(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+
+    val t = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv).as('a, 'b, 'c)
+    tEnv.registerTable("sourceTable", t)
+
+    val fieldNames = Array("d", "f")
+    val fieldTypes: Array[TypeInformation[_]] = Array(Types.INT, Types.LONG)
+    val sink = new MemoryTableSinkUtil.UnsafeMemoryAppendTableSink
+    tEnv.registerTableSink("targetTable", fieldNames, fieldTypes, sink)
+
+    // must fail because table sink has too few fields.
+    tEnv.scan("sourceTable")
+      .select('a, 'b, 'c)
+      .insertInto("targetTable")
+  }
+
+  @Test(expected = classOf[ValidationException])
+  def testUnmatchedTypesInsert(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+
+    val t = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv).as('a, 'b, 'c)
+    tEnv.registerTable("sourceTable", t)
+
+    val fieldNames = Array("d", "e", "f")
+    val fieldTypes: Array[TypeInformation[_]] = Array(Types.STRING, Types.INT, Types.LONG)
+    val sink = new MemoryTableSinkUtil.UnsafeMemoryAppendTableSink
+    tEnv.registerTableSink("targetTable", fieldNames, fieldTypes, sink)
+
+    // must fail because field types of table sink are incompatible.
+    tEnv.scan("sourceTable")
+      .select('a, 'b, 'c)
+      .insertInto("targetTable")
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/2cb37cb9/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/validation/TableSinksValidationTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/validation/TableSinksValidationTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/validation/TableSinksValidationTest.scala
index ab87cd3..628925b 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/validation/TableSinksValidationTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/validation/TableSinksValidationTest.scala
@@ -18,10 +18,12 @@
 
 package org.apache.flink.table.api.validation
 
+import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.scala._
-import org.apache.flink.table.api.TableException
+import org.apache.flink.table.api.{TableException, Types}
 import org.apache.flink.table.api.scala._
 import org.apache.flink.table.runtime.stream.table.TestAppendSink
+import org.apache.flink.table.utils.MemoryTableSinkUtil.UnsafeMemoryAppendTableSink
 import org.apache.flink.table.utils.TableTestBase
 import org.junit.Test
 
@@ -39,4 +41,27 @@ class TableSinksValidationTest extends TableTestBase {
     .writeToSink(new TestAppendSink)
   }
 
+  @Test(expected = classOf[TableException])
+  def testSinkTableRegistrationUsingExistedTableName(): Unit = {
+    val util = streamTestUtil()
+    util.addTable[(Int, String)]("TargetTable", 'id, 'text)
+
+    val fieldNames = Array("a", "b", "c")
+    val fieldTypes: Array[TypeInformation[_]] = Array(Types.STRING, Types.INT, Types.LONG)
+    // table name already registered
+    util.tableEnv
+      .registerTableSink("TargetTable", fieldNames, fieldTypes, new UnsafeMemoryAppendTableSink)
+  }
+
+  @Test(expected = classOf[TableException])
+  def testRegistrationWithInconsistentFieldNamesAndTypesLength(): Unit = {
+    val util = streamTestUtil()
+
+    // inconsistent length of field names and types
+    val fieldNames = Array("a", "b", "c")
+    val fieldTypes: Array[TypeInformation[_]] = Array(Types.STRING, Types.LONG)
+
+    util.tableEnv
+      .registerTableSink("TargetTable", fieldNames, fieldTypes, new UnsafeMemoryAppendTableSink)
+  }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/2cb37cb9/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/utils/ExpressionTestBase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/utils/ExpressionTestBase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/utils/ExpressionTestBase.scala
index 44842f7..dd6e00e 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/utils/ExpressionTestBase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/utils/ExpressionTestBase.scala
@@ -23,7 +23,6 @@ import java.util.concurrent.Future
 
 import com.google.common.collect.ImmutableList
 import org.apache.calcite.plan.hep.{HepMatchOrder, HepPlanner, HepProgramBuilder}
-import org.apache.calcite.rel.logical.LogicalTableScan
 import org.apache.calcite.rex.RexNode
 import org.apache.calcite.sql.`type`.SqlTypeName._
 import org.apache.calcite.sql2rel.RelDecorrelator

http://git-wip-us.apache.org/repos/asf/flink/blob/2cb37cb9/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/ExpressionReductionRulesTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/ExpressionReductionRulesTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/ExpressionReductionRulesTest.scala
index a15f1d1..b4ad9ca 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/ExpressionReductionRulesTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/ExpressionReductionRulesTest.scala
@@ -430,7 +430,7 @@ class ExpressionReductionRulesTest extends TableTestBase {
 
     util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
 
-    val newTable = util.tableEnv.sql("SELECT 1 + 1 + a AS a FROM MyTable")
+    val newTable = util.tableEnv.sqlQuery("SELECT 1 + 1 + a AS a FROM MyTable")
 
     util.tableEnv.registerTable("NewTable", newTable)
 
@@ -448,7 +448,7 @@ class ExpressionReductionRulesTest extends TableTestBase {
 
     util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
 
-    val newTable = util.tableEnv.sql("SELECT 1 + 1 + a AS a FROM MyTable")
+    val newTable = util.tableEnv.sqlQuery("SELECT 1 + 1 + a AS a FROM MyTable")
 
     util.tableEnv.registerTable("NewTable", newTable)
 

http://git-wip-us.apache.org/repos/asf/flink/blob/2cb37cb9/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/RetractionRulesTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/RetractionRulesTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/RetractionRulesTest.scala
index 535bbf5..999a808 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/RetractionRulesTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/RetractionRulesTest.scala
@@ -286,7 +286,7 @@ class RetractionRulesTest extends TableTestBase {
 class StreamTableTestForRetractionUtil extends StreamTableTestUtil {
 
   def verifySqlTrait(query: String, expected: String): Unit = {
-    verifyTableTrait(tableEnv.sql(query), expected)
+    verifyTableTrait(tableEnv.sqlQuery(query), expected)
   }
 
   def verifyTableTrait(resultTable: Table, expected: String): Unit = {

http://git-wip-us.apache.org/repos/asf/flink/blob/2cb37cb9/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/TimeIndicatorConversionTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/TimeIndicatorConversionTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/TimeIndicatorConversionTest.scala
index ab80c65..cfff326 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/TimeIndicatorConversionTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/TimeIndicatorConversionTest.scala
@@ -275,7 +275,7 @@ class TimeIndicatorConversionTest extends TableTestBase {
     val util = streamTestUtil()
     util.addTable[(Long, Int)]("MyTable" , 'long, 'int, 'proctime.proctime)
 
-    val result = util.tableEnv.sql("SELECT COUNT(long) FROM MyTable GROUP BY proctime")
+    val result = util.tableEnv.sqlQuery("SELECT COUNT(long) FROM MyTable GROUP BY proctime")
 
     val expected = unaryNode(
       "DataStreamCalc",
@@ -300,7 +300,7 @@ class TimeIndicatorConversionTest extends TableTestBase {
     val util = streamTestUtil()
     util.addTable[(Long, Int)]("MyTable" , 'long, 'int, 'proctime.proctime)
 
-    val result = util.tableEnv.sql("SELECT MIN(proctime) FROM MyTable GROUP BY long")
+    val result = util.tableEnv.sqlQuery("SELECT MIN(proctime) FROM MyTable GROUP BY long")
 
     val expected = unaryNode(
       "DataStreamCalc",
@@ -325,7 +325,7 @@ class TimeIndicatorConversionTest extends TableTestBase {
     val util = streamTestUtil()
     util.addTable[(Long, Long, Int)]("MyTable", 'rowtime.rowtime, 'long, 'int)
 
-    val result = util.tableEnv.sql(
+    val result = util.tableEnv.sqlQuery(
       "SELECT TUMBLE_END(rowtime, INTERVAL '0.1' SECOND) AS `rowtime`, `long`, " +
         "SUM(`int`) FROM MyTable " +
         "GROUP BY `long`, TUMBLE(rowtime, INTERVAL '0.1' SECOND)")
@@ -355,7 +355,7 @@ class TimeIndicatorConversionTest extends TableTestBase {
     val util = streamTestUtil()
     util.addTable[(Long, Long, Int)]("MyTable", 'rowtime.rowtime, 'long, 'int)
 
-    val result = util.tableEnv.sql("SELECT MIN(rowtime), long FROM MyTable " +
+    val result = util.tableEnv.sqlQuery("SELECT MIN(rowtime), long FROM MyTable " +
       "GROUP BY long, TUMBLE(rowtime, INTERVAL '0.1' SECOND)")
 
     val expected = unaryNode(

http://git-wip-us.apache.org/repos/asf/flink/blob/2cb37cb9/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/sql/AggregateITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/sql/AggregateITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/sql/AggregateITCase.scala
index 39b8371..465a88c 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/sql/AggregateITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/sql/AggregateITCase.scala
@@ -52,7 +52,7 @@ class AggregateITCase(
     val ds = CollectionDataSets.get3TupleDataSet(env)
     tEnv.registerDataSet("MyTable", ds)
 
-    val result = tEnv.sql(sqlQuery)
+    val result = tEnv.sqlQuery(sqlQuery)
 
     val expected = "231,1,21,21,11"
     val results = result.toDataSet[Row].collect()
@@ -70,7 +70,7 @@ class AggregateITCase(
     val ds = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv)
     tEnv.registerTable("MyTable", ds)
 
-    val result = tEnv.sql(sqlQuery)
+    val result = tEnv.sqlQuery(sqlQuery)
 
     val expected = "231"
     val results = result.toDataSet[Row].collect()
@@ -88,7 +88,7 @@ class AggregateITCase(
     val ds = CollectionDataSets.get3TupleDataSet(env)
     tEnv.registerDataSet("MyTable", ds)
 
-    val result = tEnv.sql(sqlQuery)
+    val result = tEnv.sqlQuery(sqlQuery)
 
     val expected = "231"
     val results = result.toDataSet[Row].collect()
@@ -109,7 +109,7 @@ class AggregateITCase(
       (2: Byte, 2: Short, 2, 2L, 2.0f, 2.0d, "Ciao")).toTable(tEnv, 'a, 'b, 'c, 'd, 'e, 'f, 'g)
     tEnv.registerTable("MyTable", ds)
 
-    val result = tEnv.sql(sqlQuery)
+    val result = tEnv.sqlQuery(sqlQuery)
 
     val expected = "1,1,1,1,1.5,1.5,2,Ciao,Ciao,Hello,Ciao,3.0"
     val results = result.toDataSet[Row].collect()
@@ -128,7 +128,7 @@ class AggregateITCase(
     val ds = env.fromElements((1: Byte, 1: Short), (2: Byte, 2: Short)).toTable(tEnv, 'a, 'b)
     tEnv.registerTable("MyTable", ds)
 
-    val result = tEnv.sql(sqlQuery)
+    val result = tEnv.sqlQuery(sqlQuery)
 
     val expected = "1,3,2,1,3"
     val results = result.toDataSet[Row].collect()
@@ -147,7 +147,7 @@ class AggregateITCase(
     val ds = env.fromElements((1f, "Hello"), (2f, "Ciao")).toTable(tEnv, 'a, 'b)
     tEnv.registerTable("MyTable", ds)
 
-    val result = tEnv.sql(sqlQuery)
+    val result = tEnv.sqlQuery(sqlQuery)
 
     val expected = "5.5,7"
     val results = result.toDataSet[Row].collect()
@@ -165,7 +165,7 @@ class AggregateITCase(
     val ds = env.fromElements((1f, "Hello"), (2f, "Ciao")).toTable(tEnv)
     tEnv.registerTable("MyTable", ds)
 
-    val result = tEnv.sql(sqlQuery)
+    val result = tEnv.sqlQuery(sqlQuery)
 
     val expected = "2,2"
     val results = result.toDataSet[Row].collect()
@@ -187,7 +187,7 @@ class AggregateITCase(
       (2: Byte, 2: Short, 2, 2L, 2.0f, 2.0d, "Ciao")).toTable(tEnv)
     tEnv.registerTable("MyTable", ds)
 
-    val result = tEnv.sql(sqlQuery)
+    val result = tEnv.sqlQuery(sqlQuery)
 
     val expected = "1,3,2"
     val results = result.toDataSet[Row].collect()
@@ -205,7 +205,7 @@ class AggregateITCase(
     val ds = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv)
     tEnv.registerTable("MyTable", ds)
 
-    val result = tEnv.sql(sqlQuery)
+    val result = tEnv.sqlQuery(sqlQuery)
 
     val expected = "231,21"
     val results = result.toDataSet[Row].collect()
@@ -223,7 +223,7 @@ class AggregateITCase(
     val ds = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv)
     tEnv.registerTable("MyTable", ds)
 
-    val result = tEnv.sql(sqlQuery)
+    val result = tEnv.sqlQuery(sqlQuery)
 
     val expected =
       "6,18,6\n5,13,5\n4,8,4\n3,5,3\n2,2,2\n1,1,1"
@@ -243,7 +243,7 @@ class AggregateITCase(
     val ds = CollectionDataSets.get3TupleDataSet(env)
     tEnv.registerDataSet("MyTable", ds)
 
-    val result = tEnv.sql(sqlQuery).toDataSet[Row].collect()
+    val result = tEnv.sqlQuery(sqlQuery).toDataSet[Row].collect()
 
     val expected =
       "6,null,18,1\n5,null,13,1\n4,null,8,1\n3,null,5,1\n2,null,2,1\n1,null,1,1\n" +
@@ -280,9 +280,9 @@ class AggregateITCase(
 
     tEnv.registerTable("MyTable", ds)
 
-    val result = tEnv.sql(sqlQuery)
-    val result2 = tEnv.sql(sqlQuery2)
-    val result3 = tEnv.sql(sqlQuery3)
+    val result = tEnv.sqlQuery(sqlQuery)
+    val result2 = tEnv.sqlQuery(sqlQuery2)
+    val result3 = tEnv.sqlQuery(sqlQuery3)
 
     val results = result.toDataSet[Row].collect()
     val expected = Seq.empty
@@ -315,7 +315,7 @@ class AggregateITCase(
       .map(x => (x._1, x._2, x._3, new Timestamp(x._1 * 1000)))
     tEnv.registerDataSet("T", ds, 'a, 'b, 'c, 'ts)
 
-    val result = tEnv.sql(sqlQuery).toDataSet[Row].collect()
+    val result = tEnv.sqlQuery(sqlQuery).toDataSet[Row].collect()
     val expected = Seq(
       "1,1,1,1,1",
       "2,2,1,2,2", "2,3,1,2,3",
@@ -348,7 +348,7 @@ class AggregateITCase(
       .map(x => (x._1, x._2, x._3, new Timestamp(x._1 * 1000)))
     tEnv.registerDataSet("T", ds, 'a, 'b, 'c, 'ts)
 
-    val result = tEnv.sql(sqlQuery).toDataSet[Row].collect()
+    val result = tEnv.sqlQuery(sqlQuery).toDataSet[Row].collect()
     val expected = Seq(
       "1,1,1,1,1","1,1,1,1,1",
       "2,5,2,2,2","2,5,2,2,2",
@@ -383,7 +383,7 @@ class AggregateITCase(
       .map(x => (x._1, x._2, x._3, new Timestamp(x._1 * 1000)))
     tEnv.registerDataSet("T", ds, 'a, 'b, 'c, 'ts)
 
-    val result = tEnv.sql(sqlQuery).toDataSet[Row].collect()
+    val result = tEnv.sqlQuery(sqlQuery).toDataSet[Row].collect()
     val expected = Seq(
       "2,10,39,6,3,7",
       "16,21,111,6,6,18"

http://git-wip-us.apache.org/repos/asf/flink/blob/2cb37cb9/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/sql/CalcITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/sql/CalcITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/sql/CalcITCase.scala
index 711182c..b891a7d 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/sql/CalcITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/sql/CalcITCase.scala
@@ -53,7 +53,7 @@ class CalcITCase(
     val ds = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv).as('a, 'b, 'c)
     tEnv.registerTable("MyTable", ds)
 
-    val result = tEnv.sql(sqlQuery)
+    val result = tEnv.sqlQuery(sqlQuery)
 
     val expected = "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello world\n" +
       "4,3,Hello world, how are you?\n" + "5,3,I am fine.\n" + "6,3,Luke Skywalker\n" +
@@ -77,7 +77,7 @@ class CalcITCase(
     val ds = CollectionDataSets.get3TupleDataSet(env)
     tEnv.registerDataSet("MyTable", ds, 'a, 'b, 'c)
 
-    val result = tEnv.sql(sqlQuery)
+    val result = tEnv.sqlQuery(sqlQuery)
 
     val expected = "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello world\n" +
       "4,3,Hello world, how are you?\n" + "5,3,I am fine.\n" + "6,3,Luke Skywalker\n" +
@@ -101,7 +101,7 @@ class CalcITCase(
     val ds = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv).as('a, 'b, 'c)
     tEnv.registerTable("MyTable", ds)
 
-    val result = tEnv.sql(sqlQuery)
+    val result = tEnv.sqlQuery(sqlQuery)
 
     val expected = "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello world\n" +
       "4,3,Hello world, how are you?\n" + "5,3,I am fine.\n" + "6,3,Luke Skywalker\n" +
@@ -125,7 +125,7 @@ class CalcITCase(
     val ds = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv)
     tEnv.registerTable("MyTable", ds)
 
-    val result = tEnv.sql(sqlQuery)
+    val result = tEnv.sqlQuery(sqlQuery)
 
     val expected = "1,1\n" + "2,2\n" + "3,2\n" + "4,3\n" + "5,3\n" + "6,3\n" + "7,4\n" +
       "8,4\n" + "9,4\n" + "10,4\n" + "11,5\n" + "12,5\n" + "13,5\n" + "14,5\n" + "15,5\n" +
@@ -146,7 +146,7 @@ class CalcITCase(
     val ds = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv).as('a, 'b, 'c)
     tEnv.registerTable("MyTable", ds)
 
-    tEnv.sql(sqlQuery)
+    tEnv.sqlQuery(sqlQuery)
   }
 
   @Test
@@ -160,7 +160,7 @@ class CalcITCase(
     val ds = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv)
     tEnv.registerTable("MyTable", ds)
 
-    val result = tEnv.sql(sqlQuery)
+    val result = tEnv.sqlQuery(sqlQuery)
 
     val expected = "\n"
     val results = result.toDataSet[Row].collect()
@@ -178,7 +178,7 @@ class CalcITCase(
     val ds = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv)
     tEnv.registerTable("MyTable", ds)
 
-    val result = tEnv.sql(sqlQuery)
+    val result = tEnv.sqlQuery(sqlQuery)
 
     val expected = "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello world\n" + "4,3,Hello world, " +
       "how are you?\n" + "5,3,I am fine.\n" + "6,3,Luke Skywalker\n" + "7,4," +
@@ -201,7 +201,7 @@ class CalcITCase(
     val ds = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv).as('a, 'b, 'c)
     tEnv.registerTable("MyTable", ds)
 
-    val result = tEnv.sql(sqlQuery)
+    val result = tEnv.sqlQuery(sqlQuery)
 
     val expected = "3,2,Hello world\n" + "4,3,Hello world, how are you?\n"
     val results = result.toDataSet[Row].collect()
@@ -219,7 +219,7 @@ class CalcITCase(
     val ds = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv).as('a, 'b, 'c)
     tEnv.registerTable("MyTable", ds)
 
-    val result = tEnv.sql(sqlQuery)
+    val result = tEnv.sqlQuery(sqlQuery)
 
     val expected = "2,2,Hello\n" + "4,3,Hello world, how are you?\n" +
       "6,3,Luke Skywalker\n" + "8,4," + "Comment#2\n" + "10,4,Comment#4\n" +
@@ -240,7 +240,7 @@ class CalcITCase(
     val ds = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv).as('a, 'b, 'c)
     tEnv.registerTable("MyTable", ds)
 
-    val result = tEnv.sql(sqlQuery)
+    val result = tEnv.sqlQuery(sqlQuery)
 
     val expected = "1,1,Hi\n" + "21,6,Comment#15\n"
     val results = result.toDataSet[Row].collect()
@@ -258,7 +258,7 @@ class CalcITCase(
     val ds = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv).as('a, 'b, 'c)
     tEnv.registerTable("MyTable", ds)
 
-    val result = tEnv.sql(sqlQuery)
+    val result = tEnv.sqlQuery(sqlQuery)
 
     val expected = "3,2,Hello world\n" + "7,4,Comment#1\n" +
       "9,4,Comment#3\n" + "17,6,Comment#11\n" +
@@ -281,7 +281,7 @@ class CalcITCase(
       Timestamp.valueOf("1984-07-12 14:34:24")))
     tEnv.registerDataSet("MyTable", ds, 'a, 'b, 'c)
 
-    val result = tEnv.sql(sqlQuery)
+    val result = tEnv.sqlQuery(sqlQuery)
 
     val expected = "1984-07-12,14:34:24,1984-07-12 14:34:24.0," +
       "1984-07-12,14:34:24,1984-07-12 14:34:24.0"
@@ -300,7 +300,7 @@ class CalcITCase(
     val ds = env.fromElements("a", "b", "c")
     tEnv.registerDataSet("MyTable", ds, 'text)
 
-    val result = tEnv.sql("SELECT hashCode(text) FROM MyTable")
+    val result = tEnv.sqlQuery("SELECT hashCode(text) FROM MyTable")
 
     val expected = "97\n98\n99"
     val results = result.toDataSet[Row].collect()

http://git-wip-us.apache.org/repos/asf/flink/blob/2cb37cb9/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/sql/JoinITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/sql/JoinITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/sql/JoinITCase.scala
index 681b4b5..6a17cb4 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/sql/JoinITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/sql/JoinITCase.scala
@@ -49,7 +49,7 @@ class JoinITCase(
     tEnv.registerTable("Table3", ds1)
     tEnv.registerTable("Table5", ds2)
 
-    val result = tEnv.sql(sqlQuery)
+    val result = tEnv.sqlQuery(sqlQuery)
 
     val expected = "Hi,Hallo\n" + "Hello,Hallo Welt\n" + "Hello world,Hallo Welt\n"
     val results = result.toDataSet[Row].collect()
@@ -69,7 +69,7 @@ class JoinITCase(
     tEnv.registerTable("Table3", ds1)
     tEnv.registerTable("Table5", ds2)
 
-    val result = tEnv.sql(sqlQuery)
+    val result = tEnv.sqlQuery(sqlQuery)
 
     val expected = "Hi,Hallo\n"
     val results = result.toDataSet[Row].collect()
@@ -89,7 +89,7 @@ class JoinITCase(
     tEnv.registerTable("Table3", ds1)
     tEnv.registerTable("Table5", ds2)
 
-    val result = tEnv.sql(sqlQuery)
+    val result = tEnv.sqlQuery(sqlQuery)
 
     val expected = "Hello world, how are you?,Hallo Welt wie\n" + "I am fine.,Hallo Welt wie\n"
     val results = result.toDataSet[Row].collect()
@@ -109,7 +109,7 @@ class JoinITCase(
     tEnv.registerTable("Table3", ds1)
     tEnv.registerTable("Table5", ds2)
 
-    val result = tEnv.sql(sqlQuery)
+    val result = tEnv.sqlQuery(sqlQuery)
 
     val expected = "Hi,Hallo\n" + "Hello,Hallo Welt\n" + "Hello world,Hallo Welt wie gehts?\n" +
       "Hello world,ABC\n" + "I am fine.,HIJ\n" + "I am fine.,IJK\n"
@@ -130,7 +130,7 @@ class JoinITCase(
     tEnv.registerTable("Table3", ds1)
     tEnv.registerTable("Table5", ds2)
 
-    val result = tEnv.sql(sqlQuery)
+    val result = tEnv.sqlQuery(sqlQuery)
     val expected = "1,Hi\n" + "2,Hello\n" + "1,Hello\n" +
       "2,Hello world\n" + "2,Hello world\n" + "3,Hello world\n"
     val results = result.toDataSet[Row].collect()
@@ -150,7 +150,7 @@ class JoinITCase(
     tEnv.registerDataSet("Table3", ds1, 'a, 'b, 'c)
     tEnv.registerDataSet("Table5", ds2, 'd, 'e, 'f, 'g, 'h)
 
-    val result = tEnv.sql(sqlQuery)
+    val result = tEnv.sqlQuery(sqlQuery)
 
     val expected = "6,6"
     val results = result.toDataSet[Row].collect()
@@ -170,7 +170,7 @@ class JoinITCase(
     tEnv.registerTable("Table3", ds1)
     tEnv.registerTable("Table5", ds2)
 
-    val result = tEnv.sql(sqlQuery)
+    val result = tEnv.sqlQuery(sqlQuery)
 
     val expected = "6,6"
     val results = result.toDataSet[Row].collect()
@@ -196,7 +196,7 @@ class JoinITCase(
       "null,CDE\n" + "null,DEF\n" + "null,EFG\n" + "null,FGH\n" + "null,GHI\n" + "null,HIJ\n" +
       "null,IJK\n" + "null,JKL\n" + "null,KLM"
 
-    val results = tEnv.sql(sqlQuery).toDataSet[Row].collect()
+    val results = tEnv.sqlQuery(sqlQuery).toDataSet[Row].collect()
     TestBaseUtils.compareResultAsText(results.asJava, expected)
   }
 
@@ -218,7 +218,7 @@ class JoinITCase(
       "null,Hallo Welt wie\n" + "null,Hallo Welt wie gehts?\n" + "null,ABC\n" + "null,BCD\n" +
       "null,CDE\n" + "null,DEF\n" + "null,EFG\n" + "null,FGH\n" + "null,GHI\n" + "null,HIJ\n" +
       "null,IJK\n" + "null,JKL\n" + "null,KLM"
-    val results = tEnv.sql(sqlQuery).toDataSet[Row].collect()
+    val results = tEnv.sqlQuery(sqlQuery).toDataSet[Row].collect()
     TestBaseUtils.compareResultAsText(results.asJava, expected)
   }
 
@@ -240,7 +240,7 @@ class JoinITCase(
       "null,Hallo Welt wie\n" + "null,Hallo Welt wie gehts?\n" + "null,ABC\n" + "null,BCD\n" +
       "null,CDE\n" + "null,DEF\n" + "null,EFG\n" + "null,FGH\n" + "null,GHI\n" + "null,HIJ\n" +
       "null,IJK\n" + "null,JKL\n" + "null,KLM"
-    val results = tEnv.sql(sqlQuery).toDataSet[Row].collect()
+    val results = tEnv.sqlQuery(sqlQuery).toDataSet[Row].collect()
     TestBaseUtils.compareResultAsText(results.asJava, expected)
   }
 
@@ -257,7 +257,7 @@ class JoinITCase(
       "3,1,1,Hi\n" +
       "3,2,2,Hello\n" +
       "3,3,2,Hello world"
-    val result = tEnv.sql(sqlQuery2).collect()
+    val result = tEnv.sqlQuery(sqlQuery2).collect()
     TestBaseUtils.compareResultAsText(result.asJava, expected)
   }
 
@@ -274,7 +274,7 @@ class JoinITCase(
       "1,1,Hi,3\n" +
       "2,2,Hello,3\n" +
       "3,2,Hello world,3"
-    val result = tEnv.sql(sqlQuery1).collect()
+    val result = tEnv.sqlQuery(sqlQuery1).collect()
     TestBaseUtils.compareResultAsText(result.asJava, expected)
   }
 
@@ -287,7 +287,7 @@ class JoinITCase(
     tEnv.registerTable("A", table)
 
     val sqlQuery1 = "SELECT * FROM A CROSS JOIN (SELECT count(*) FROM A HAVING count(*) < 0)"
-    val result = tEnv.sql(sqlQuery1).count()
+    val result = tEnv.sqlQuery(sqlQuery1).count()
     Assert.assertEquals(0, result)
   }
 
@@ -305,7 +305,7 @@ class JoinITCase(
     tEnv.registerTable("B", ds2)
 
 
-    val result = tEnv.sql(sqlQuery)
+    val result = tEnv.sqlQuery(sqlQuery)
     val expected = Seq(
           "1,null",
           "2,null", "2,null",
@@ -331,7 +331,7 @@ class JoinITCase(
     tEnv.registerTable("A", ds1)
     tEnv.registerTable("B", ds2)
 
-    val result = tEnv.sql(sqlQuery)
+    val result = tEnv.sqlQuery(sqlQuery)
     val expected = Seq(
       "1,null", "2,null", "2,null", "3,3", "3,3",
       "3,3", "4,null", "4,null", "4,null",
@@ -355,7 +355,7 @@ class JoinITCase(
     tEnv.registerTable("A", ds1)
     tEnv.registerTable("B", ds2)
 
-    val result = tEnv.sql(sqlQuery)
+    val result = tEnv.sqlQuery(sqlQuery)
     val expected = Seq(
       "1,3", "2,3", "2,3", "3,null", "3,null",
       "3,null", "4,null", "4,null", "4,null",
@@ -380,7 +380,7 @@ class JoinITCase(
     tEnv.registerTable("A", ds2)
     tEnv.registerTable("B", ds1)
 
-    val result = tEnv.sql(sqlQuery)
+    val result = tEnv.sqlQuery(sqlQuery)
 
     val expected = Seq(
       "2,null", "3,null", "1,null").mkString("\n")
@@ -402,7 +402,7 @@ class JoinITCase(
     tEnv.registerTable("A", ds1)
     tEnv.registerTable("B", ds2)
 
-    val result = tEnv.sql(sqlQuery)
+    val result = tEnv.sqlQuery(sqlQuery)
 
     val expected = Seq(
       "1,null", "2,null", "2,null", "3,3", "3,3",
@@ -427,7 +427,7 @@ class JoinITCase(
     tEnv.registerTable("A", ds1)
     tEnv.registerTable("B", ds2)
 
-    val result = tEnv.sql(sqlQuery)
+    val result = tEnv.sqlQuery(sqlQuery)
 
     val expected = Seq(
       "1,null", "2,null", "2,null", "3,null", "3,null",
@@ -453,7 +453,7 @@ class JoinITCase(
     tEnv.registerTable("t1", ds1)
     tEnv.registerTable("t2", ds2)
 
-    val result = tEnv.sql(sqlQuery)
+    val result = tEnv.sqlQuery(sqlQuery)
     val expected = Seq(
       "1,null,null",
       "2,null,null", "2,null,null",
@@ -481,7 +481,7 @@ class JoinITCase(
 
     val sqlQuery = "SELECT a, s FROM T, UNNEST(T.c) as A (s)"
 
-    val result = tEnv.sql(sqlQuery)
+    val result = tEnv.sqlQuery(sqlQuery)
 
     val expected = List("1,Hi", "1,w", "2,Hello", "2,k", "3,Hello world", "3,x")
     val results = result.toDataSet[Row].collect().toList
@@ -508,7 +508,7 @@ class JoinITCase(
       "  UNNEST(tf.b) as A (x, y) " +
       "WHERE x > a"
 
-    val result = tEnv.sql(sqlQuery)
+    val result = tEnv.sqlQuery(sqlQuery)
 
     val expected = List(
       "1,[(12,45.6), (2,45.612)],12,45.6",

http://git-wip-us.apache.org/repos/asf/flink/blob/2cb37cb9/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/sql/SetOperatorsITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/sql/SetOperatorsITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/sql/SetOperatorsITCase.scala
index b0e6fe8..d965e0c 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/sql/SetOperatorsITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/sql/SetOperatorsITCase.scala
@@ -52,7 +52,7 @@ class SetOperatorsITCase(
     tEnv.registerDataSet("t1", ds1, 'a, 'b, 'c)
     tEnv.registerDataSet("t2", ds2, 'd, 'e, 'f)
 
-    val result = tEnv.sql(sqlQuery)
+    val result = tEnv.sqlQuery(sqlQuery)
 
     val expected = "Hi\n" + "Hello\n" + "Hello world\n" + "Hi\n" + "Hello\n" + "Hello world\n"
     val results = result.toDataSet[Row].collect()
@@ -72,7 +72,7 @@ class SetOperatorsITCase(
     tEnv.registerDataSet("t1", ds1, 'a, 'b, 'c)
     tEnv.registerDataSet("t2", ds2, 'd, 'e, 'f)
 
-    val result = tEnv.sql(sqlQuery)
+    val result = tEnv.sqlQuery(sqlQuery)
 
     val expected = "Hi\n" + "Hello\n" + "Hello world\n"
     val results = result.toDataSet[Row].collect()
@@ -94,7 +94,7 @@ class SetOperatorsITCase(
     tEnv.registerDataSet("t1", ds1, 'a, 'b, 'c)
     tEnv.registerDataSet("t2", ds2, 'a, 'b, 'd, 'c, 'e)
 
-    val result = tEnv.sql(sqlQuery)
+    val result = tEnv.sqlQuery(sqlQuery)
 
     val expected = "Hi\n" + "Hallo\n"
     val results = result.toDataSet[Row].collect()
@@ -115,7 +115,7 @@ class SetOperatorsITCase(
     tEnv.registerDataSet("t1", ds1, 'a, 'b, 'c)
     tEnv.registerDataSet("t2", ds2, 'a, 'b, 'd, 'c, 'e)
 
-    val result = tEnv.sql(sqlQuery)
+    val result = tEnv.sqlQuery(sqlQuery)
 
     val expected = "18"
     val results = result.toDataSet[Row].collect()
@@ -135,7 +135,7 @@ class SetOperatorsITCase(
     tEnv.registerDataSet("t1", ds1, 'a, 'b, 'c)
     tEnv.registerDataSet("t2", ds2, 'a, 'b, 'c)
 
-    val result = tEnv.sql(sqlQuery)
+    val result = tEnv.sqlQuery(sqlQuery)
 
     val expected = "Hello\n" + "Hello world\n"
     val results = result.toDataSet[Row].collect()
@@ -161,7 +161,7 @@ class SetOperatorsITCase(
     tEnv.registerDataSet("t1", ds1, 'c)
     tEnv.registerDataSet("t2", ds2, 'c)
 
-    val result = tEnv.sql(sqlQuery)
+    val result = tEnv.sqlQuery(sqlQuery)
 
     val expected = "1\n1"
     val results = result.toDataSet[Row].collect()
@@ -183,7 +183,7 @@ class SetOperatorsITCase(
     tEnv.registerDataSet("t1", ds1, 'a, 'b, 'c)
     tEnv.registerDataSet("t2", ds2, 'a, 'b, 'd, 'c, 'e)
 
-    val result = tEnv.sql(sqlQuery)
+    val result = tEnv.sqlQuery(sqlQuery)
 
     val expected = "Hi\n"
     val results = result.toDataSet[Row].collect()
@@ -208,7 +208,7 @@ class SetOperatorsITCase(
     tEnv.registerDataSet("t1", ds1, 'a, 'b, 'c)
     tEnv.registerDataSet("t2", ds2, 'a, 'b, 'c)
 
-    val result = tEnv.sql(sqlQuery)
+    val result = tEnv.sqlQuery(sqlQuery)
 
     val expected = "Hi\n" + "Hello\n"
     val results = result.toDataSet[Row].collect()
@@ -234,7 +234,7 @@ class SetOperatorsITCase(
     tEnv.registerDataSet("t1", ds1, 'c)
     tEnv.registerDataSet("t2", ds2, 'c)
 
-    val result = tEnv.sql(sqlQuery)
+    val result = tEnv.sqlQuery(sqlQuery)
 
     val expected = "1\n2\n2"
     val results = result.toDataSet[Row].collect()
@@ -254,7 +254,7 @@ class SetOperatorsITCase(
     tEnv.registerDataSet("t1", ds1, 'a, 'b, 'c)
     tEnv.registerDataSet("t2", ds2, 'a, 'b, 'c)
 
-    val result = tEnv.sql(sqlQuery)
+    val result = tEnv.sqlQuery(sqlQuery)
 
     val expected = "Hello\n" + "Hello world\n"
     val results = result.toDataSet[Row].collect()
@@ -271,7 +271,7 @@ class SetOperatorsITCase(
     tEnv.registerTable("Table3", ds1)
     tEnv.registerTable("Table5", ds2)
 
-    val result = tEnv.sql("SELECT d FROM Table5 WHERE d IN (SELECT a FROM Table3)")
+    val result = tEnv.sqlQuery("SELECT d FROM Table5 WHERE d IN (SELECT a FROM Table3)")
 
     val expected = Seq("1", "2", "2", "3", "3", "3").mkString("\n")
     val results = result.toDataSet[Row].collect()
@@ -288,7 +288,7 @@ class SetOperatorsITCase(
     tEnv.registerTable("Table3", ds1)
     tEnv.registerTable("Table5", ds2)
 
-    val result = tEnv.sql("SELECT d IN (SELECT a FROM Table3) FROM Table5")
+    val result = tEnv.sqlQuery("SELECT d IN (SELECT a FROM Table3) FROM Table5")
 
     val expected = Seq("false", "false", "false", "false", "false", "false", "false",
       "false", "false", "true", "true", "true", "true", "true", "true").mkString("\n")

http://git-wip-us.apache.org/repos/asf/flink/blob/2cb37cb9/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/sql/SortITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/sql/SortITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/sql/SortITCase.scala
index 4672ec3..66943fc 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/sql/SortITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/sql/SortITCase.scala
@@ -62,7 +62,7 @@ class SortITCase(mode: TestExecutionMode, configMode: TableConfigMode)
 
     val expected = sortExpectedly(tupleDataSetStrings)
     // squash all rows inside a partition into one element
-    val results = tEnv.sql(sqlQuery).toDataSet[Row].mapPartition(rows => {
+    val results = tEnv.sqlQuery(sqlQuery).toDataSet[Row].mapPartition(rows => {
       // the rows need to be copied in object reuse mode
       val copied = new mutable.ArrayBuffer[Row]
       rows.foreach(r => copied += Row.copy(r))
@@ -99,7 +99,7 @@ class SortITCase(mode: TestExecutionMode, configMode: TableConfigMode)
 
     val expected = sortExpectedly(tupleDataSetStrings, 2, 21)
     // squash all rows inside a partition into one element
-    val results = tEnv.sql(sqlQuery).toDataSet[Row].mapPartition(rows => {
+    val results = tEnv.sqlQuery(sqlQuery).toDataSet[Row].mapPartition(rows => {
       // the rows need to be copied in object reuse mode
       val copied = new mutable.ArrayBuffer[Row]
       rows.foreach(r => copied += Row.copy(r))
@@ -130,7 +130,7 @@ class SortITCase(mode: TestExecutionMode, configMode: TableConfigMode)
 
     val expected = sortExpectedly(tupleDataSetStrings, 2, 7)
     // squash all rows inside a partition into one element
-    val results = tEnv.sql(sqlQuery).toDataSet[Row].mapPartition(rows => {
+    val results = tEnv.sqlQuery(sqlQuery).toDataSet[Row].mapPartition(rows => {
       // the rows need to be copied in object reuse mode
       val copied = new mutable.ArrayBuffer[Row]
       rows.foreach(r => copied += Row.copy(r))
@@ -161,7 +161,7 @@ class SortITCase(mode: TestExecutionMode, configMode: TableConfigMode)
 
     val expected = sortExpectedly(tupleDataSetStrings, 0, 5)
     // squash all rows inside a partition into one element
-    val results = tEnv.sql(sqlQuery).toDataSet[Row].mapPartition(rows => {
+    val results = tEnv.sqlQuery(sqlQuery).toDataSet[Row].mapPartition(rows => {
       // the rows need to be copied in object reuse mode
       val copied = new mutable.ArrayBuffer[Row]
       rows.foreach(r => copied += Row.copy(r))

http://git-wip-us.apache.org/repos/asf/flink/blob/2cb37cb9/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/sql/TableEnvironmentITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/sql/TableEnvironmentITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/sql/TableEnvironmentITCase.scala
index b7f1bb1..be8278f 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/sql/TableEnvironmentITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/sql/TableEnvironmentITCase.scala
@@ -24,8 +24,10 @@ import org.apache.flink.table.api.TableEnvironment
 import org.apache.flink.table.api.scala._
 import org.apache.flink.table.runtime.utils.TableProgramsCollectionTestBase
 import org.apache.flink.table.runtime.utils.TableProgramsTestBase.TableConfigMode
+import org.apache.flink.table.utils.MemoryTableSinkUtil
 import org.apache.flink.test.util.TestBaseUtils
 import org.apache.flink.types.Row
+import org.junit.Assert.assertEquals
 import org.junit._
 import org.junit.runner.RunWith
 import org.junit.runners.Parameterized
@@ -48,7 +50,7 @@ class TableEnvironmentITCase(
 
     val sqlQuery = "SELECT * FROM MyTable WHERE a > 9"
 
-    val result = tEnv.sql(sqlQuery).select('a.avg, 'b.sum, 'c.count)
+    val result = tEnv.sqlQuery(sqlQuery).select('a.avg, 'b.sum, 'c.count)
 
     val expected = "15,65,12"
     val results = result.toDataSet[Row].collect()
@@ -68,7 +70,7 @@ class TableEnvironmentITCase(
 
     val sqlQuery = "SELECT avg(a) as a1, sum(b) as b1, count(c) as c1 FROM MyTable"
 
-    val result = tEnv.sql(sqlQuery).select('a1 + 1, 'b1 - 5, 'c1)
+    val result = tEnv.sqlQuery(sqlQuery).select('a1 + 1, 'b1 - 5, 'c1)
 
     val expected = "16,60,12"
     val results = result.toDataSet[Row].collect()
@@ -85,11 +87,11 @@ class TableEnvironmentITCase(
     tEnv.registerTable("MyTable", t)
 
     val sqlQuery = "SELECT a as aa FROM MyTable WHERE b = 6"
-    val result1 = tEnv.sql(sqlQuery)
+    val result1 = tEnv.sqlQuery(sqlQuery)
     tEnv.registerTable("ResTable", result1)
 
     val sqlQuery2 = "SELECT count(aa) FROM ResTable"
-    val result2 = tEnv.sql(sqlQuery2)
+    val result2 = tEnv.sqlQuery(sqlQuery2)
 
     val expected = "6"
     val results = result2.toDataSet[Row].collect()
@@ -106,11 +108,34 @@ class TableEnvironmentITCase(
     val ds = env.fromElements(((12, true), "Hello")).toTable(tEnv).as('a1, 'a2)
     tEnv.registerTable("MyTable", ds)
 
-    val result = tEnv.sql(sqlQuery)
+    val result = tEnv.sqlQuery(sqlQuery)
 
     val expected = "Hello,true\n"
 
     val results = result.toDataSet[Row].collect()
     TestBaseUtils.compareResultAsText(results.asJava, expected)
   }
+
+  @Test
+  def testInsertIntoMemoryTable(): Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    MemoryTableSinkUtil.clear
+
+    val t = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv).as('a, 'b, 'c)
+    tEnv.registerTable("sourceTable", t)
+
+    val fieldNames = Array("d", "e", "f")
+    val fieldTypes = tEnv.scan("sourceTable").getSchema.getTypes
+    val sink = new MemoryTableSinkUtil.UnsafeMemoryAppendTableSink
+    tEnv.registerTableSink("targetTable", fieldNames, fieldTypes, sink)
+
+    val sql = "INSERT INTO targetTable SELECT a, b, c FROM sourceTable"
+    tEnv.sqlUpdate(sql)
+    env.execute()
+
+    val expected = List("1,1,Hi", "2,2,Hello", "3,2,Hello world")
+    assertEquals(expected.sorted, MemoryTableSinkUtil.results.sorted)
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/2cb37cb9/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/sql/TableSourceITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/sql/TableSourceITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/sql/TableSourceITCase.scala
index 504ab90..187d096 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/sql/TableSourceITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/sql/TableSourceITCase.scala
@@ -44,7 +44,7 @@ class TableSourceITCase(
     val tEnv = TableEnvironment.getTableEnvironment(env, config)
 
     tEnv.registerTableSource("csvTable", csvTable)
-    val results = tEnv.sql(
+    val results = tEnv.sqlQuery(
       "SELECT id, `first`, `last`, score FROM csvTable").collect()
 
     val expected = Seq(
@@ -67,7 +67,7 @@ class TableSourceITCase(
 
     tableEnv.registerTableSource("NestedPersons", nestedTable)
 
-    val result = tableEnv.sql("SELECT NestedPersons.firstName, NestedPersons.lastName," +
+    val result = tableEnv.sqlQuery("SELECT NestedPersons.firstName, NestedPersons.lastName," +
         "NestedPersons.address.street, NestedPersons.address.city AS city " +
         "FROM NestedPersons " +
         "WHERE NestedPersons.address.city LIKE 'Dublin'").collect()

http://git-wip-us.apache.org/repos/asf/flink/blob/2cb37cb9/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/table/CalcITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/table/CalcITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/table/CalcITCase.scala
index 116f690..e947c3f 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/table/CalcITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/table/CalcITCase.scala
@@ -446,7 +446,7 @@ class CalcITCase(
 
     val sqlQuery = "SELECT c FROM t1 where RichFunc2(c)='ABC#Hello'"
 
-    val result = tEnv.sql(sqlQuery)
+    val result = tEnv.sqlQuery(sqlQuery)
 
     val expected = "Hello"
     val results = result.toDataSet[Row].collect()
@@ -467,7 +467,7 @@ class CalcITCase(
 
     val sqlQuery = "SELECT c FROM t1 where RichFunc3(c)=true"
 
-    val result = tEnv.sql(sqlQuery)
+    val result = tEnv.sqlQuery(sqlQuery)
 
     val expected = "Hello"
     val results = result.toDataSet[Row].collect()
@@ -488,7 +488,7 @@ class CalcITCase(
     val sqlQuery = "SELECT c FROM t1 where " +
       "RichFunc2(c)='Abc#Hello' or RichFunc1(a)=3 and b=2"
 
-    val result = tEnv.sql(sqlQuery)
+    val result = tEnv.sqlQuery(sqlQuery)
 
     val expected = "Hello\nHello world"
     val results = result.toDataSet[Row].collect()

http://git-wip-us.apache.org/repos/asf/flink/blob/2cb37cb9/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/table/TableEnvironmentITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/table/TableEnvironmentITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/table/TableEnvironmentITCase.scala
index 2e23161..725c580 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/table/TableEnvironmentITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/table/TableEnvironmentITCase.scala
@@ -26,8 +26,10 @@ import org.apache.flink.table.api.TableEnvironment
 import org.apache.flink.table.api.scala._
 import org.apache.flink.table.runtime.utils.TableProgramsTestBase.TableConfigMode
 import org.apache.flink.table.runtime.utils.{TableProgramsCollectionTestBase, TableProgramsTestBase}
+import org.apache.flink.table.utils.MemoryTableSinkUtil
 import org.apache.flink.test.util.TestBaseUtils
 import org.apache.flink.types.Row
+import org.junit.Assert.assertEquals
 import org.junit._
 import org.junit.runner.RunWith
 import org.junit.runners.Parameterized
@@ -162,6 +164,28 @@ class TableEnvironmentITCase(
     TestBaseUtils.compareResultAsText(results.asJava, expected)
   }
 
+  @Test
+  def testInsertIntoMemoryTable(): Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    MemoryTableSinkUtil.clear
+
+    val t = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv).as('a, 'b, 'c)
+    tEnv.registerTable("sourceTable", t)
+
+    val fieldNames = Array("d", "e", "f")
+    val fieldTypes = tEnv.scan("sourceTable").getSchema.getTypes
+    val sink = new MemoryTableSinkUtil.UnsafeMemoryAppendTableSink
+    tEnv.registerTableSink("targetTable", fieldNames, fieldTypes, sink)
+
+    tEnv.scan("sourceTable")
+      .select('a, 'b, 'c)
+      .insertInto("targetTable")
+    env.execute()
+
+    val expected = List("1,1,Hi", "2,2,Hello", "3,2,Hello world")
+    assertEquals(expected.sorted, MemoryTableSinkUtil.results.sorted)
+  }
 }
 
 object TableEnvironmentITCase {

http://git-wip-us.apache.org/repos/asf/flink/blob/2cb37cb9/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/TimeAttributesITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/TimeAttributesITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/TimeAttributesITCase.scala
index 24d8695..ec65cf7 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/TimeAttributesITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/TimeAttributesITCase.scala
@@ -229,7 +229,7 @@ class TimeAttributesITCase extends StreamingMultipleProgramsTestBase {
     val table = stream.toTable(tEnv, 'rowtime.rowtime, 'int, 'double, 'float, 'bigdec, 'string)
     tEnv.registerTable("MyTable", table)
 
-    val t = tEnv.sql("SELECT COUNT(`rowtime`) FROM MyTable " +
+    val t = tEnv.sqlQuery("SELECT COUNT(`rowtime`) FROM MyTable " +
       "GROUP BY TUMBLE(rowtime, INTERVAL '0.003' SECOND)")
 
     val results = t.toAppendStream[Row]
@@ -292,7 +292,7 @@ class TimeAttributesITCase extends StreamingMultipleProgramsTestBase {
     tEnv.registerTable("T1", table)
     val querySql = "select rowtime as ts, string as msg from T1"
 
-    val results = tEnv.sql(querySql).toAppendStream[Pojo1]
+    val results = tEnv.sqlQuery(querySql).toAppendStream[Pojo1]
     results.addSink(new StreamITCase.StringSink[Pojo1])
     env.execute()
 

http://git-wip-us.apache.org/repos/asf/flink/blob/2cb37cb9/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/JoinITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/JoinITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/JoinITCase.scala
index ab7925b..e40da7a 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/JoinITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/JoinITCase.scala
@@ -60,7 +60,7 @@ class JoinITCase extends StreamingWithStateTestBase {
     tEnv.registerTable("T1", t1)
     tEnv.registerTable("T2", t2)
 
-    val result = tEnv.sql(sqlQuery).toAppendStream[Row]
+    val result = tEnv.sqlQuery(sqlQuery).toAppendStream[Row]
     result.addSink(new StreamITCase.StringSink[Row])
     env.execute()
   }
@@ -97,7 +97,7 @@ class JoinITCase extends StreamingWithStateTestBase {
     tEnv.registerTable("T1", t1)
     tEnv.registerTable("T2", t2)
 
-    val result = tEnv.sql(sqlQuery).toAppendStream[Row]
+    val result = tEnv.sqlQuery(sqlQuery).toAppendStream[Row]
     result.addSink(new StreamITCase.StringSink[Row])
     env.execute()
   }

http://git-wip-us.apache.org/repos/asf/flink/blob/2cb37cb9/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/OverWindowITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/OverWindowITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/OverWindowITCase.scala
index cc47a69..4884513 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/OverWindowITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/OverWindowITCase.scala
@@ -68,7 +68,7 @@ class OverWindowITCase extends StreamingWithStateTestBase {
       "    PARTITION BY a ORDER BY proctime ROWS BETWEEN 4 PRECEDING AND CURRENT ROW) " +
       "FROM MyTable"
 
-    val result = tEnv.sql(sqlQuery).toAppendStream[Row]
+    val result = tEnv.sqlQuery(sqlQuery).toAppendStream[Row]
     result.addSink(new StreamITCase.StringSink[Row])
     env.execute()
 
@@ -110,7 +110,7 @@ class OverWindowITCase extends StreamingWithStateTestBase {
       "  MIN(c) OVER (" +
       "    ORDER BY proctime ROWS BETWEEN 10 PRECEDING AND CURRENT ROW) " +
       "FROM MyTable"
-    val result = tEnv.sql(sqlQuery).toAppendStream[Row]
+    val result = tEnv.sqlQuery(sqlQuery).toAppendStream[Row]
     result.addSink(new StreamITCase.StringSink[Row])
     env.execute()
 
@@ -153,7 +153,7 @@ class OverWindowITCase extends StreamingWithStateTestBase {
       "sum(a) OVER (PARTITION BY c ORDER BY proctime RANGE UNBOUNDED preceding) " +
       "from T1"
 
-    val result = tEnv.sql(sqlQuery).toAppendStream[Row]
+    val result = tEnv.sqlQuery(sqlQuery).toAppendStream[Row]
     result.addSink(new StreamITCase.StringSink[Row])
     env.execute()
 
@@ -181,7 +181,7 @@ class OverWindowITCase extends StreamingWithStateTestBase {
       " OVER (PARTITION BY c ORDER BY proctime ROWS BETWEEN UNBOUNDED preceding AND CURRENT ROW) " +
       "as cnt1 from T1)"
 
-    val result = tEnv.sql(sqlQuery).toAppendStream[Row]
+    val result = tEnv.sqlQuery(sqlQuery).toAppendStream[Row]
     result.addSink(new StreamITCase.StringSink[Row])
     env.execute()
 
@@ -215,7 +215,7 @@ class OverWindowITCase extends StreamingWithStateTestBase {
       "sum(a) OVER (ORDER BY proctime RANGE UNBOUNDED preceding) " +
       "from T1"
 
-    val result = tEnv.sql(sqlQuery).toAppendStream[Row](queryConfig)
+    val result = tEnv.sqlQuery(sqlQuery).toAppendStream[Row](queryConfig)
     result.addSink(new StreamITCase.StringSink[Row])
     env.execute()
 
@@ -240,7 +240,7 @@ class OverWindowITCase extends StreamingWithStateTestBase {
       "count(a) OVER (ORDER BY proctime ROWS BETWEEN UNBOUNDED preceding AND CURRENT ROW) " +
       "from T1"
 
-    val result = tEnv.sql(sqlQuery).toAppendStream[Row]
+    val result = tEnv.sqlQuery(sqlQuery).toAppendStream[Row]
     result.addSink(new StreamITCase.StringSink[Row])
     env.execute()
 
@@ -302,7 +302,7 @@ class OverWindowITCase extends StreamingWithStateTestBase {
       "    BETWEEN INTERVAL '1' SECOND PRECEDING AND CURRENT ROW)" +
       " FROM T1"
 
-    val result = tEnv.sql(sqlQuery).toAppendStream[Row]
+    val result = tEnv.sqlQuery(sqlQuery).toAppendStream[Row]
     result.addSink(new StreamITCase.StringSink[Row])
     env.execute()
 
@@ -363,7 +363,7 @@ class OverWindowITCase extends StreamingWithStateTestBase {
       "    OVER (PARTITION BY c ORDER BY rowtime ROWS BETWEEN 2 PRECEDING AND CURRENT ROW) " +
       "FROM T1"
 
-    val result = tEnv.sql(sqlQuery).toAppendStream[Row]
+    val result = tEnv.sqlQuery(sqlQuery).toAppendStream[Row]
     result.addSink(new StreamITCase.StringSink[Row])
     env.execute()
 
@@ -431,7 +431,7 @@ class OverWindowITCase extends StreamingWithStateTestBase {
       "    OVER (ORDER BY rowtime RANGE BETWEEN INTERVAL '1' SECOND PRECEDING AND CURRENT ROW) " +
       " FROM T1"
 
-    val result = tEnv.sql(sqlQuery).toAppendStream[Row]
+    val result = tEnv.sqlQuery(sqlQuery).toAppendStream[Row]
     result.addSink(new StreamITCase.StringSink[Row])
     env.execute()
 
@@ -492,7 +492,7 @@ class OverWindowITCase extends StreamingWithStateTestBase {
       "  SUM(a) OVER (ORDER BY rowtime ROWS BETWEEN 2 preceding AND CURRENT ROW) " +
       "FROM T1"
 
-    val result = tEnv.sql(sqlQuery).toAppendStream[Row]
+    val result = tEnv.sqlQuery(sqlQuery).toAppendStream[Row]
     result.addSink(new StreamITCase.StringSink[Row])
     env.execute()
 
@@ -553,7 +553,7 @@ class OverWindowITCase extends StreamingWithStateTestBase {
 
     tEnv.registerTable("T1", t1)
 
-    val result = tEnv.sql(sqlQuery).toAppendStream[Row]
+    val result = tEnv.sqlQuery(sqlQuery).toAppendStream[Row]
     result.addSink(new StreamITCase.StringSink[Row])
     env.execute()
 
@@ -619,7 +619,7 @@ class OverWindowITCase extends StreamingWithStateTestBase {
 
     tEnv.registerTable("T1", t1)
 
-    val result = tEnv.sql(sqlQuery).toAppendStream[Row]
+    val result = tEnv.sqlQuery(sqlQuery).toAppendStream[Row]
     result.addSink(new StreamITCase.StringSink[Row])
     env.execute()
 
@@ -681,7 +681,7 @@ class OverWindowITCase extends StreamingWithStateTestBase {
 
     tEnv.registerTable("T1", t1)
 
-    val result = tEnv.sql(sqlQuery).toAppendStream[Row]
+    val result = tEnv.sqlQuery(sqlQuery).toAppendStream[Row]
     result.addSink(new StreamITCase.StringSink[Row])
     env.execute()
 
@@ -742,7 +742,7 @@ class OverWindowITCase extends StreamingWithStateTestBase {
 
     tEnv.registerTable("T1", t1)
 
-    val result = tEnv.sql(sqlQuery).toAppendStream[Row]
+    val result = tEnv.sqlQuery(sqlQuery).toAppendStream[Row]
     result.addSink(new StreamITCase.StringSink[Row])
     env.execute()
 
@@ -814,7 +814,7 @@ class OverWindowITCase extends StreamingWithStateTestBase {
 
     tEnv.registerTable("T1", t1)
 
-    val result = tEnv.sql(sqlQuery).toAppendStream[Row]
+    val result = tEnv.sqlQuery(sqlQuery).toAppendStream[Row]
     result.addSink(new StreamITCase.StringSink[Row])
     env.execute()
 

http://git-wip-us.apache.org/repos/asf/flink/blob/2cb37cb9/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/SortITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/SortITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/SortITCase.scala
index 2c59f8c..19db2a0 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/SortITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/SortITCase.scala
@@ -87,7 +87,7 @@ class SortITCase extends StreamingWithStateTestBase {
     val  sqlQuery = "SELECT b FROM T1 ORDER BY rowtime, b ASC "
       
       
-    val result = tEnv.sql(sqlQuery).toAppendStream[Row]
+    val result = tEnv.sqlQuery(sqlQuery).toAppendStream[Row]
     result.addSink(new StringRowSelectorSink(0)).setParallelism(1)
     env.execute()
     

http://git-wip-us.apache.org/repos/asf/flink/blob/2cb37cb9/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/SqlITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/SqlITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/SqlITCase.scala
index 5398c6d..2c82d9c 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/SqlITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/SqlITCase.scala
@@ -18,16 +18,21 @@
 
 package org.apache.flink.table.runtime.stream.sql
 
+import java.util
+
 import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
 import org.apache.flink.api.java.typeutils.RowTypeInfo
 import org.apache.flink.api.scala._
 import org.apache.flink.streaming.api.TimeCharacteristic
 import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
-import org.apache.flink.table.api.TableEnvironment
+import org.apache.flink.table.api.{TableEnvironment, Types}
 import org.apache.flink.table.api.scala._
 import org.apache.flink.table.runtime.utils.TimeTestUtil.EventTimeSourceFunction
 import org.apache.flink.table.runtime.utils.{StreamITCase, StreamTestData, StreamingWithStateTestBase}
 import org.apache.flink.types.Row
+import org.apache.flink.table.utils.MemoryTableSinkUtil
+
+import scala.collection.JavaConverters._
 import org.junit.Assert._
 import org.junit._
 
@@ -58,7 +63,7 @@ class SqlITCase extends StreamingWithStateTestBase {
     val t = ds.toTable(tEnv).as('a, 'b, 'c)
     tEnv.registerTable("MyTableRow", t)
 
-    val result = tEnv.sql(sqlQuery).toAppendStream[Row]
+    val result = tEnv.sqlQuery(sqlQuery).toAppendStream[Row]
     result.addSink(new StreamITCase.StringSink[Row])
     env.execute()
 
@@ -79,7 +84,7 @@ class SqlITCase extends StreamingWithStateTestBase {
     val t = StreamTestData.get3TupleDataStream(env).toTable(tEnv).as('a, 'b, 'c)
     tEnv.registerTable("MyTable", t)
 
-    val result = tEnv.sql(sqlQuery).toRetractStream[Row]
+    val result = tEnv.sqlQuery(sqlQuery).toRetractStream[Row]
     result.addSink(new StreamITCase.RetractingSink).setParallelism(1)
     env.execute()
 
@@ -100,7 +105,7 @@ class SqlITCase extends StreamingWithStateTestBase {
     val t = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv).as('a, 'b, 'c)
     tEnv.registerTable("MyTable", t)
 
-    val result = tEnv.sql(sqlQuery).toAppendStream[Row]
+    val result = tEnv.sqlQuery(sqlQuery).toAppendStream[Row]
     result.addSink(new StreamITCase.StringSink[Row])
     env.execute()
 
@@ -121,7 +126,7 @@ class SqlITCase extends StreamingWithStateTestBase {
     val t = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv).as('a, 'b, 'c)
     tEnv.registerTable("MyTable", t)
 
-    val result = tEnv.sql(sqlQuery).toAppendStream[Row]
+    val result = tEnv.sqlQuery(sqlQuery).toAppendStream[Row]
     result.addSink(new StreamITCase.StringSink[Row])
     env.execute()
 
@@ -142,7 +147,7 @@ class SqlITCase extends StreamingWithStateTestBase {
     val t = StreamTestData.getSmall3TupleDataStream(env)
     tEnv.registerDataStream("MyTable", t)
 
-    val result = tEnv.sql(sqlQuery).toAppendStream[Row]
+    val result = tEnv.sqlQuery(sqlQuery).toAppendStream[Row]
     result.addSink(new StreamITCase.StringSink[Row])
     env.execute()
 
@@ -166,7 +171,7 @@ class SqlITCase extends StreamingWithStateTestBase {
     val t2 = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv).as('a, 'b, 'c)
     tEnv.registerTable("T2", t2)
 
-    val result = tEnv.sql(sqlQuery).toAppendStream[Row]
+    val result = tEnv.sqlQuery(sqlQuery).toAppendStream[Row]
     result.addSink(new StreamITCase.StringSink[Row])
     env.execute()
 
@@ -193,7 +198,7 @@ class SqlITCase extends StreamingWithStateTestBase {
     val t2 = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv).as('a, 'b, 'c)
     tEnv.registerTable("T2", t2)
 
-    val result = tEnv.sql(sqlQuery).toAppendStream[Row]
+    val result = tEnv.sqlQuery(sqlQuery).toAppendStream[Row]
     result.addSink(new StreamITCase.StringSink[Row])
     env.execute()
 
@@ -219,7 +224,7 @@ class SqlITCase extends StreamingWithStateTestBase {
     val t2 = StreamTestData.get3TupleDataStream(env)
     tEnv.registerDataStream("T2", t2, 'a, 'b, 'c)
 
-    val result = tEnv.sql(sqlQuery).toAppendStream[Row]
+    val result = tEnv.sqlQuery(sqlQuery).toAppendStream[Row]
     result.addSink(new StreamITCase.StringSink[Row])
     env.execute()
 
@@ -244,7 +249,7 @@ class SqlITCase extends StreamingWithStateTestBase {
 
     val sqlQuery = "SELECT a, b, s FROM T, UNNEST(T.b) AS A (s)"
 
-    val result = tEnv.sql(sqlQuery).toAppendStream[Row]
+    val result = tEnv.sqlQuery(sqlQuery).toAppendStream[Row]
     result.addSink(new StreamITCase.StringSink[Row])
     env.execute()
 
@@ -276,7 +281,7 @@ class SqlITCase extends StreamingWithStateTestBase {
 
     val sqlQuery = "SELECT a, s FROM T, UNNEST(T.c) AS A (s)"
 
-    val result = tEnv.sql(sqlQuery).toAppendStream[Row]
+    val result = tEnv.sqlQuery(sqlQuery).toAppendStream[Row]
     result.addSink(new StreamITCase.StringSink[Row])
     env.execute()
 
@@ -306,7 +311,7 @@ class SqlITCase extends StreamingWithStateTestBase {
 
     val sqlQuery = "SELECT a, b, s, t FROM T, UNNEST(T.b) AS A (s, t) WHERE s > 13"
 
-    val result = tEnv.sql(sqlQuery).toAppendStream[Row]
+    val result = tEnv.sqlQuery(sqlQuery).toAppendStream[Row]
     result.addSink(new StreamITCase.StringSink[Row])
     env.execute()
 
@@ -365,5 +370,33 @@ class SqlITCase extends StreamingWithStateTestBase {
     assertEquals(expected.sorted, StreamITCase.testResults.sorted)
   }
 
-}
+  @Test
+  def testInsertIntoMemoryTable(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    MemoryTableSinkUtil.clear
 
+    val t = StreamTestData.getSmall3TupleDataStream(env)
+      .assignAscendingTimestamps(x => x._2)
+      .toTable(tEnv, 'a, 'b, 'c, 'rowtime.rowtime)
+    tEnv.registerTable("sourceTable", t)
+
+    val fieldNames = Array("d", "e", "f", "t")
+    val fieldTypes: Array[TypeInformation[_]] =
+      Array(Types.INT, Types.LONG, Types.STRING, Types.SQL_TIMESTAMP)
+    val sink = new MemoryTableSinkUtil.UnsafeMemoryAppendTableSink
+    tEnv.registerTableSink("targetTable", fieldNames, fieldTypes, sink)
+
+    val sql = "INSERT INTO targetTable SELECT a, b, c, rowtime FROM sourceTable"
+    tEnv.sqlUpdate(sql)
+    env.execute()
+    // both sides are sorted below, so the check does not depend on arrival order
+    val expected = List(
+      "1,1,Hi,1970-01-01 00:00:00.001",
+      "2,2,Hello,1970-01-01 00:00:00.002",
+      "3,2,Hello world,1970-01-01 00:00:00.002")
+    assertEquals(expected.sorted, MemoryTableSinkUtil.results.sorted)
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/2cb37cb9/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/TableSourceITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/TableSourceITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/TableSourceITCase.scala
index be876a8..30ada56 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/TableSourceITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/TableSourceITCase.scala
@@ -43,7 +43,7 @@ class TableSourceITCase extends StreamingMultipleProgramsTestBase {
 
     tEnv.registerTableSource("persons", csvTable)
 
-    tEnv.sql(
+    tEnv.sqlQuery(
       "SELECT id, `first`, `last`, score FROM persons WHERE id < 4 ")
       .toAppendStream[Row]
       .addSink(new StreamITCase.StringSink[Row])

http://git-wip-us.apache.org/repos/asf/flink/blob/2cb37cb9/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/TableSinkITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/TableSinkITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/TableSinkITCase.scala
index 830359f..c5b82fe 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/TableSinkITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/TableSinkITCase.scala
@@ -20,7 +20,6 @@ package org.apache.flink.table.runtime.stream.table
 
 import java.io.File
 import java.lang.{Boolean => JBool}
-import java.sql.Timestamp
 
 import org.apache.flink.api.common.functions.MapFunction
 import org.apache.flink.api.common.typeinfo.TypeInformation
@@ -37,6 +36,7 @@ import org.apache.flink.table.api.scala._
 import org.apache.flink.table.api.{TableEnvironment, TableException, Types}
 import org.apache.flink.table.runtime.utils.{StreamITCase, StreamTestData}
 import org.apache.flink.table.sinks._
+import org.apache.flink.table.utils.MemoryTableSinkUtil
 import org.apache.flink.test.util.TestBaseUtils
 import org.apache.flink.types.Row
 import org.apache.flink.util.Collector
@@ -49,6 +49,36 @@ import scala.collection.JavaConverters._
 class TableSinkITCase extends StreamingMultipleProgramsTestBase {
 
   @Test
+  def testInsertIntoRegisteredTableSink(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    MemoryTableSinkUtil.clear
+
+    val input = StreamTestData.get3TupleDataStream(env)
+      .assignAscendingTimestamps(r => r._2)
+    val fieldNames = Array("d", "e", "t")
+    val fieldTypes: Array[TypeInformation[_]] = Array(Types.STRING, Types.SQL_TIMESTAMP, Types.LONG)
+    val sink = new MemoryTableSinkUtil.UnsafeMemoryAppendTableSink
+    tEnv.registerTableSink("targetTable", fieldNames, fieldTypes, sink)
+    // the insert is issued as a statement; its outcome is checked through the sink buffer
+    input.toTable(tEnv, 'a, 'b, 'c, 't.rowtime)
+      .where('a < 3 || 'a > 19)
+      .select('c, 't, 'b)
+      .insertInto("targetTable")
+    env.execute()
+
+    val expected = Seq(
+      "Hi,1970-01-01 00:00:00.001,1",
+      "Hello,1970-01-01 00:00:00.002,2",
+      "Comment#14,1970-01-01 00:00:00.006,6",
+      "Comment#15,1970-01-01 00:00:00.006,6").mkString("\n")
+
+    TestBaseUtils.compareResultAsText(MemoryTableSinkUtil.results.asJava, expected)
+  }
+
+  @Test
   def testStreamTableSink(): Unit = {
 
     val tmpFile = File.createTempFile("flink-table-sink-test", ".tmp")

http://git-wip-us.apache.org/repos/asf/flink/blob/2cb37cb9/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/MemoryTableSinkUtil.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/MemoryTableSinkUtil.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/MemoryTableSinkUtil.scala
new file mode 100644
index 0000000..29b2e94
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/MemoryTableSinkUtil.scala
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.utils
+
+import org.apache.flink.api.common.io.RichOutputFormat
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.DataSet
+import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.datastream.DataStream
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction
+import org.apache.flink.table.sinks.{AppendStreamTableSink, BatchTableSink, TableSinkBase}
+import org.apache.flink.types.Row
+
+import scala.collection.mutable
+
+object MemoryTableSinkUtil {
+  /** String form of every row written by the sinks defined in this object. */
+  var results: mutable.MutableList[String] = mutable.MutableList.empty[String]
+
+  /** Empties `results` under the same monitor the sinks hold while appending. */
+  def clear(): Unit = results.synchronized {
+    results.clear()
+  }
+
+  /** Table sink for batch and append-stream use that records rows in `results`. */
+  final class UnsafeMemoryAppendTableSink
+    extends TableSinkBase[Row] with BatchTableSink[Row]
+    with AppendStreamTableSink[Row] {
+    override def getOutputType: TypeInformation[Row] =
+      new RowTypeInfo(getFieldTypes, getFieldNames)
+
+    override protected def copy: TableSinkBase[Row] =
+      new UnsafeMemoryAppendTableSink
+
+    override def emitDataSet(dataSet: DataSet[Row]): Unit = {
+      dataSet.output(new MemoryCollectionOutputFormat)
+    }
+
+    override def emitDataStream(dataStream: DataStream[Row]): Unit = {
+      dataStream.addSink(new MemoryAppendSink)
+    }
+  }
+
+  /** Streaming sink function appending each row's `toString` to `results`. */
+  private class MemoryAppendSink extends RichSinkFunction[Row]() {
+    override def invoke(value: Row): Unit = {
+      results.synchronized {
+        results += value.toString
+      }
+    }
+  }
+
+  /** Batch output format appending each record's `toString` to `results`. */
+  private class MemoryCollectionOutputFormat extends RichOutputFormat[Row] {
+    override def configure(parameters: Configuration): Unit = {}
+
+    override def open(taskNumber: Int, numTasks: Int): Unit = {}
+
+    override def writeRecord(record: Row): Unit = {
+      results.synchronized {
+        results += record.toString
+      }
+    }
+
+    override def close(): Unit = {}
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/2cb37cb9/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/MockTableEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/MockTableEnvironment.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/MockTableEnvironment.scala
index c4e2433..ff7c79d 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/MockTableEnvironment.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/MockTableEnvironment.scala
@@ -19,6 +19,7 @@
 package org.apache.flink.table.utils
 
 import org.apache.calcite.tools.RuleSet
+import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.table.api.{QueryConfig, Table, TableConfig, TableEnvironment}
 import org.apache.flink.table.sinks.TableSink
 import org.apache.flink.table.sources.TableSource
@@ -40,4 +41,9 @@ class MockTableEnvironment extends TableEnvironment(new TableConfig) {
 
   override protected def getBuiltInPhysicalOptRuleSet: RuleSet = ???
 
+  override def registerTableSink(
+      name: String,
+      fieldNames: Array[String],
+      fieldTypes: Array[TypeInformation[_]],
+      tableSink: TableSink[_]): Unit = ??? // left unimplemented in this mock; `???` throws if called
 }