Posted to commits@flink.apache.org by fh...@apache.org on 2017/09/20 08:13:16 UTC

[1/4] flink git commit: [FLINK-6442] [table] Add registration for TableSinks and INSERT INTO support for SQL and Table API.

Repository: flink
Updated Branches:
  refs/heads/master 73a2443d4 -> 2cb37cb93


http://git-wip-us.apache.org/repos/asf/flink/blob/2cb37cb9/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/TableTestBase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/TableTestBase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/TableTestBase.scala
index d99aac1..0a0d12e 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/TableTestBase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/TableTestBase.scala
@@ -178,7 +178,7 @@ case class BatchTableTestUtil() extends TableTestUtil {
   }
 
   def verifySql(query: String, expected: String): Unit = {
-    verifyTable(tableEnv.sql(query), expected)
+    verifyTable(tableEnv.sqlQuery(query), expected)
   }
 
   def verifyTable(resultTable: Table, expected: String): Unit = {
@@ -197,7 +197,7 @@ case class BatchTableTestUtil() extends TableTestUtil {
   }
 
   def printSql(query: String): Unit = {
-    printTable(tableEnv.sql(query))
+    printTable(tableEnv.sqlQuery(query))
   }
 
 }
@@ -256,7 +256,7 @@ case class StreamTableTestUtil() extends TableTestUtil {
   }
 
   def verifySql(query: String, expected: String): Unit = {
-    verifyTable(tableEnv.sql(query), expected)
+    verifyTable(tableEnv.sqlQuery(query), expected)
   }
 
   def verifyTable(resultTable: Table, expected: String): Unit = {
@@ -276,7 +276,7 @@ case class StreamTableTestUtil() extends TableTestUtil {
   }
 
   def printSql(query: String): Unit = {
-    printTable(tableEnv.sql(query))
+    printTable(tableEnv.sqlQuery(query))
   }
 
 }
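
The rename above is source-compatible: `sql()` remains as a deprecated alias that forwards to `sqlQuery()` (see the TableEnvironment diff below), so existing call sites keep compiling with a deprecation warning. A minimal migration sketch in Scala, assuming a registered "Orders" table as in the docs:

    // before this commit (now deprecated)
    val oldResult = tableEnv.sql("SELECT product, amount FROM Orders")

    // after this commit
    val newResult = tableEnv.sqlQuery("SELECT product, amount FROM Orders")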


[3/4] flink git commit: [FLINK-6442] [table] Add registration for TableSinks and INSERT INTO support for SQL and Table API.

Posted by fh...@apache.org.
[FLINK-6442] [table] Add registration for TableSinks and INSERT INTO support for SQL and Table API.

This closes #3829.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/2cb37cb9
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/2cb37cb9
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/2cb37cb9

Branch: refs/heads/master
Commit: 2cb37cb937c6f225ad7afe829a28a6eda043ffc1
Parents: df5efe9
Author: lincoln-lil <li...@gmail.com>
Authored: Thu May 4 17:52:34 2017 +0800
Committer: Fabian Hueske <fh...@apache.org>
Committed: Wed Sep 20 10:12:13 2017 +0200

----------------------------------------------------------------------
 docs/dev/table/common.md                        | 153 ++++++++++++--
 docs/dev/table/sql.md                           |  83 ++++++--
 docs/dev/table/tableApi.md                      | 105 ++++++++--
 docs/dev/table/udfs.md                          |  20 +-
 .../addons/hbase/HBaseConnectorITCase.java      |   8 +-
 .../flink/table/examples/java/WordCountSQL.java |   2 +-
 .../table/examples/scala/StreamSQLExample.scala |   2 +-
 .../table/examples/scala/WordCountSQL.scala     |   2 +-
 .../flink/table/api/BatchTableEnvironment.scala |  50 ++++-
 .../table/api/StreamTableEnvironment.scala      |  60 +++++-
 .../flink/table/api/TableEnvironment.scala      | 202 +++++++++++++++++--
 .../apache/flink/table/api/queryConfig.scala    |   1 +
 .../org/apache/flink/table/api/table.scala      |  41 +++-
 .../flink/table/calcite/FlinkPlannerImpl.scala  |   6 +-
 .../table/plan/schema/TableSinkTable.scala      |  45 +++++
 .../runtime/batch/JavaTableSourceITCase.java    |   2 +-
 .../runtime/batch/sql/GroupingSetsITCase.java   |   6 +-
 .../table/runtime/batch/sql/JavaSqlITCase.java  |  12 +-
 .../table/runtime/stream/sql/JavaSqlITCase.java |   8 +-
 .../api/batch/BatchTableEnvironmentTest.scala   |   4 +-
 .../sql/validation/CalcValidationTest.scala     |   2 +-
 .../validation/InsertIntoValidationTest.scala   |  77 +++++++
 .../sql/validation/JoinValidationTest.scala     |  22 +-
 .../validation/OverWindowValidationTest.scala   |   4 +-
 .../sql/validation/SortValidationTest.scala     |   2 +-
 .../validation/InsertIntoValidationTest.scala   |  61 ++++++
 .../api/stream/StreamTableEnvironmentTest.scala |   4 +-
 .../flink/table/api/stream/sql/JoinTest.scala   |   4 +-
 .../validation/InsertIntoValidationTest.scala   |  87 ++++++++
 .../validation/OverWindowValidationTest.scala   |   6 +-
 .../validation/CorrelateValidationTest.scala    |   6 +-
 .../validation/InsertIntoValidationTest.scala   |  68 +++++++
 .../validation/TableSinksValidationTest.scala   |  27 ++-
 .../expressions/utils/ExpressionTestBase.scala  |   1 -
 .../plan/ExpressionReductionRulesTest.scala     |   4 +-
 .../flink/table/plan/RetractionRulesTest.scala  |   2 +-
 .../plan/TimeIndicatorConversionTest.scala      |   8 +-
 .../runtime/batch/sql/AggregateITCase.scala     |  34 ++--
 .../table/runtime/batch/sql/CalcITCase.scala    |  26 +--
 .../table/runtime/batch/sql/JoinITCase.scala    |  44 ++--
 .../runtime/batch/sql/SetOperatorsITCase.scala  |  24 +--
 .../table/runtime/batch/sql/SortITCase.scala    |   8 +-
 .../batch/sql/TableEnvironmentITCase.scala      |  35 +++-
 .../runtime/batch/sql/TableSourceITCase.scala   |   4 +-
 .../table/runtime/batch/table/CalcITCase.scala  |   6 +-
 .../batch/table/TableEnvironmentITCase.scala    |  24 +++
 .../runtime/stream/TimeAttributesITCase.scala   |   4 +-
 .../table/runtime/stream/sql/JoinITCase.scala   |   4 +-
 .../runtime/stream/sql/OverWindowITCase.scala   |  30 +--
 .../table/runtime/stream/sql/SortITCase.scala   |   2 +-
 .../table/runtime/stream/sql/SqlITCase.scala    |  59 ++++--
 .../runtime/stream/sql/TableSourceITCase.scala  |   2 +-
 .../runtime/stream/table/TableSinkITCase.scala  |  32 ++-
 .../flink/table/utils/MemoryTableSinkUtil.scala |  84 ++++++++
 .../table/utils/MockTableEnvironment.scala      |   6 +
 .../flink/table/utils/TableTestBase.scala       |   8 +-
 56 files changed, 1371 insertions(+), 262 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/2cb37cb9/docs/dev/table/common.md
----------------------------------------------------------------------
diff --git a/docs/dev/table/common.md b/docs/dev/table/common.md
index fed2b6d..acd5711 100644
--- a/docs/dev/table/common.md
+++ b/docs/dev/table/common.md
@@ -50,7 +50,7 @@ tableEnv.registerExternalCatalog("extCat", ...);
 // create a Table from a Table API query
 Table tapiResult = tableEnv.scan("table1").select(...);
 // create a Table from a SQL query
-Table sqlResult  = tableEnv.sql("SELECT ... FROM table2 ... ");
+Table sqlResult  = tableEnv.sqlQuery("SELECT ... FROM table2 ... ");
 
 // emit a Table API result Table to a TableSink, same for SQL result
 tapiResult.writeToSink(...);
@@ -77,7 +77,7 @@ tableEnv.registerExternalCatalog("extCat", ...)
 // create a Table from a Table API query
 val tapiResult = tableEnv.scan("table1").select(...)
 // Create a Table from a SQL query
-val sqlResult  = tableEnv.sql("SELECT ... FROM table2 ...")
+val sqlResult  = tableEnv.sqlQuery("SELECT ... FROM table2 ...")
 
 // emit a Table API result Table to a TableSink, same for SQL result
 tapiResult.writeToSink(...)
@@ -149,18 +149,18 @@ val bTableEnv = TableEnvironment.getTableEnvironment(bEnv)
 
 {% top %}
 
-Register a Table in the Catalog
+Register Tables in the Catalog
 -------------------------------
 
-A `TableEnvironment` has an internal catalog of tables, organized by table name. Table API or SQL queries can access tables which are registered in the catalog, by referencing them by name. 
+A `TableEnvironment` maintains a catalog of tables which are registered by name. There are two types of tables, *input tables* and *output tables*. Input tables can be referenced in Table API and SQL queries and provide input data. Output tables can be used to emit the result of a Table API or SQL query to an external system.
 
-A `TableEnvironment` allows you to register a table from various sources:
+An input table can be registered from various sources:
 
 * an existing `Table` object, usually the result of a Table API or SQL query.
 * a `TableSource`, which accesses external data, such as a file, database, or messaging system. 
-* a `DataStream` or `DataSet` from a DataStream or DataSet program.
+* a `DataStream` or `DataSet` from a DataStream or DataSet program. Registering a `DataStream` or `DataSet` is discussed in the [Integration with DataStream and DataSet API](#integration-with-datastream-and-dataset-api) section.
 
-Registering a `DataStream` or `DataSet` as a table is discussed in the [Integration with DataStream and DataSet API](#integration-with-datastream-and-dataset-api) section.
+An output table can be registered using a `TableSink`.
 
 ### Register a Table
 
@@ -200,7 +200,7 @@ tableEnv.registerTable("projectedTable", projTable)
 
 ### Register a TableSource
 
-A `TableSource` provides access to external data which is stored in a storage systems such as a database (MySQL, HBase, ...), a file with specific encoding (CSV, Apache \[Parquet, Avro, ORC\], ...), or a messaging system (Apache Kafka, RabbitMQ, ...). 
+A `TableSource` provides access to external data which is stored in a storage system such as a database (MySQL, HBase, ...), a file with a specific encoding (CSV, Apache \[Parquet, Avro, ORC\], ...), or a messaging system (Apache Kafka, RabbitMQ, ...). 
 
 Flink aims to provide TableSources for common data formats and storage systems. Please have a look at the [Table Sources and Sinks]({{ site.baseurl }}/dev/table/sourceSinks.html) page for a list of supported TableSources and instructions for how to build a custom `TableSource`.
 
@@ -236,6 +236,52 @@ tableEnv.registerTableSource("CsvTable", csvSource)
 
 {% top %}
 
+### Register a TableSink
+
+A registered `TableSink` can be used to [emit the result of a Table API or SQL query](common.html#emit-a-table) to an external storage system, such as a database, key-value store, message queue, or file system (in different encodings, e.g., CSV, Apache \[Parquet, Avro, ORC\], ...).
+
+Flink aims to provide TableSinks for common data formats and storage systems. Please see the [Table Sources and Sinks]({{ site.baseurl }}/dev/table/sourceSinks.html) page for details about available sinks and instructions for how to implement a custom `TableSink`.
+
+A `TableSink` is registered in a `TableEnvironment` as follows:
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+// get a StreamTableEnvironment, works for BatchTableEnvironment equivalently
+StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
+
+// create a TableSink
+TableSink csvSink = new CsvTableSink("/path/to/file", ...);
+
+// define the field names and types
+String[] fieldNames = {"a", "b", "c"};
+TypeInformation[] fieldTypes = {Types.INT, Types.STRING, Types.LONG};
+
+// register the TableSink as table "CsvSinkTable"
+tableEnv.registerTableSink("CsvSinkTable", fieldNames, fieldTypes, csvSink);
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+// get a TableEnvironment
+val tableEnv = TableEnvironment.getTableEnvironment(env)
+
+// create a TableSink
+val csvSink: TableSink = new CsvTableSink("/path/to/file", ...)
+
+// define the field names and types
+val fieldNames: Array[String] = Array("a", "b", "c")
+val fieldTypes: Array[TypeInformation[_]] = Array(Types.INT, Types.STRING, Types.LONG)
+
+// register the TableSink as table "CsvSinkTable"
+tableEnv.registerTableSink("CsvSinkTable", fieldNames, fieldTypes, csvSink)
+{% endhighlight %}
+</div>
+</div>
+
+{% top %}
+
 Register an External Catalog
 ----------------------------
 
@@ -342,7 +388,7 @@ Flink's SQL integration is based on [Apache Calcite](https://calcite.apache.org)
 
 The [SQL]({{ site.baseurl }}/dev/table/sql.html) document describes Flink's SQL support for streaming and batch tables.
 
-The following example shows how to specify a query and return the result as a Table.
+The following example shows how to specify a query and return the result as a `Table`.
 
 <div class="codetabs" markdown="1">
 <div data-lang="java" markdown="1">
@@ -353,7 +399,7 @@ StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
 // register Orders table
 
 // compute revenue for all customers from France
-Table revenue = tableEnv.sql(
+Table revenue = tableEnv.sqlQuery(
     "SELECT cID, cName, SUM(revenue) AS revSum " +
     "FROM Orders " +
     "WHERE cCountry = 'FRANCE' " +
@@ -373,7 +419,7 @@ val tableEnv = TableEnvironment.getTableEnvironment(env)
 // register Orders table
 
 // compute revenue for all customers from France
-Table revenue = tableEnv.sql(""" 
+val revenue = tableEnv.sqlQuery("""
   |SELECT cID, cName, SUM(revenue) AS revSum
   |FROM Orders
   |WHERE cCountry = 'FRANCE'
@@ -387,6 +433,53 @@ Table revenue = tableEnv.sql("""
 </div>
 </div>
 
+The following example shows how to specify an update query that inserts its result into a registered table.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+// get a StreamTableEnvironment, works for BatchTableEnvironment equivalently
+StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
+
+// register "Orders" table
+// register "RevenueFrance" output table
+
+// compute revenue for all customers from France and emit to "RevenueFrance"
+tableEnv.sqlUpdate(
+    "INSERT INTO RevenueFrance " +
+    "SELECT cID, cName, SUM(revenue) AS revSum " +
+    "FROM Orders " +
+    "WHERE cCountry = 'FRANCE' " +
+    "GROUP BY cID, cName"
+  );
+
+// execute query
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+// get a TableEnvironment
+val tableEnv = TableEnvironment.getTableEnvironment(env)
+
+// register "Orders" table
+// register "RevenueFrance" output table
+
+// compute revenue for all customers from France and emit to "RevenueFrance"
+tableEnv.sqlUpdate("""
+  |INSERT INTO RevenueFrance
+  |SELECT cID, cName, SUM(revenue) AS revSum
+  |FROM Orders
+  |WHERE cCountry = 'FRANCE'
+  |GROUP BY cID, cName
+  """.stripMargin)
+
+// execute query
+{% endhighlight %}
+
+</div>
+</div>
+
 {% top %}
 
 ### Mixing Table API and SQL
@@ -401,12 +494,19 @@ Table API and SQL queries can be easily mixed because both return `Table` object
 Emit a Table 
 ------------
 
-In order to emit a `Table`, it can be written to a `TableSink`. A `TableSink` is a generic interface to support a wide variety of file formats (e.g. CSV, Apache Parquet, Apache Avro), storage systems (e.g., JDBC, Apache HBase, Apache Cassandra, Elasticsearch), or messaging systems (e.g., Apache Kafka, RabbitMQ). 
+A `Table` is emitted by writing it to a `TableSink`. A `TableSink` is a generic interface to support a wide variety of file formats (e.g. CSV, Apache Parquet, Apache Avro), storage systems (e.g., JDBC, Apache HBase, Apache Cassandra, Elasticsearch), or messaging systems (e.g., Apache Kafka, RabbitMQ). 
 
-A batch `Table` can only be written to a `BatchTableSink`, while a streaming table requires either an `AppendStreamTableSink`, a `RetractStreamTableSink`, or an `UpsertStreamTableSink`. 
+A batch `Table` can only be written to a `BatchTableSink`, while a streaming `Table` requires either an `AppendStreamTableSink`, a `RetractStreamTableSink`, or an `UpsertStreamTableSink`. 
 
 Please see the documentation about [Table Sources & Sinks]({{ site.baseurl }}/dev/table/sourceSinks.html) for details about available sinks and instructions for how to implement a custom `TableSink`.
 
+There are two ways to emit a table:
+
+1. The `Table.writeToSink(TableSink sink)` method emits the table using the provided `TableSink` and automatically configures the sink with the schema of the table to emit.
+2. The `Table.insertInto(String sinkTable)` method looks up a `TableSink` that was registered with a specific schema under the provided name in the `TableEnvironment`'s catalog. The schema of the table to emit is validated against the schema of the registered `TableSink`.
+
+The following example shows how to emit a `Table`:
+
 <div class="codetabs" markdown="1">
 <div data-lang="java" markdown="1">
 {% highlight java %}
@@ -419,9 +519,18 @@ Table result = ...
 // create a TableSink
 TableSink sink = new CsvTableSink("/path/to/file", fieldDelim = "|");
 
-// write the result Table to the TableSink
+// METHOD 1:
+//   Emit the result Table to the TableSink via the writeToSink() method
 result.writeToSink(sink);
 
+// METHOD 2:
+//   Register the TableSink with a specific schema
+String[] fieldNames = {"a", "b", "c"};
+TypeInformation[] fieldTypes = {Types.INT, Types.STRING, Types.LONG};
+tableEnv.registerTableSink("CsvSinkTable", fieldNames, fieldTypes, sink);
+//   Emit the result Table to the registered TableSink via the insertInto() method
+result.insertInto("CsvSinkTable");
+
 // execute the program
 {% endhighlight %}
 </div>
@@ -437,9 +546,18 @@ val result: Table = ...
 // create a TableSink
 val sink: TableSink = new CsvTableSink("/path/to/file", fieldDelim = "|")
 
-// write the result Table to the TableSink
+// METHOD 1:
+//   Emit the result Table to the TableSink via the writeToSink() method
 result.writeToSink(sink)
 
+// METHOD 2:
+//   Register the TableSink with a specific schema
+val fieldNames: Array[String] = Array("a", "b", "c")
+val fieldTypes: Array[TypeInformation[_]] = Array(Types.INT, Types.STRING, Types.LONG)
+tableEnv.registerTableSink("CsvSinkTable", fieldNames, fieldTypes, sink)
+//   Emit the result Table to the registered TableSink via the insertInto() method
+result.insertInto("CsvSinkTable")
+
 // execute the program
 {% endhighlight %}
 </div>
@@ -458,8 +576,9 @@ Table API and SQL queries are translated into [DataStream]({{ site.baseurl }}/de
 
 A Table API or SQL query is translated when:
 
-* the `Table` is emitted to a `TableSink`, i.e., when `Table.writeToSink()` is called.
-* the `Table` is converted into a `DataStream` or `DataSet` (see [Integration with DataStream and DataSet API](#integration-with-dataStream-and-dataSet-api)).
+* a `Table` is emitted to a `TableSink`, i.e., when `Table.writeToSink()` or `Table.insertInto()` is called.
+* a SQL update query is specified, i.e., when `TableEnvironment.sqlUpdate()` is called.
+* a `Table` is converted into a `DataStream` or `DataSet` (see [Integration with DataStream and DataSet API](#integration-with-datastream-and-dataset-api)).
 
 Once translated, a Table API or SQL query is handled like a regular DataStream or DataSet program and is executed when `StreamExecutionEnvironment.execute()` or `ExecutionEnvironment.execute()` is called.
 

http://git-wip-us.apache.org/repos/asf/flink/blob/2cb37cb9/docs/dev/table/sql.md
----------------------------------------------------------------------
diff --git a/docs/dev/table/sql.md b/docs/dev/table/sql.md
index fa4e3f3..b9205ab 100644
--- a/docs/dev/table/sql.md
+++ b/docs/dev/table/sql.md
@@ -49,15 +49,25 @@ DataStream<Tuple3<Long, String, Integer>> ds = env.addSource(...);
 
 // SQL query with an inlined (unregistered) table
 Table table = tableEnv.toTable(ds, "user, product, amount");
-Table result = tableEnv.sql(
+Table result = tableEnv.sqlQuery(
   "SELECT SUM(amount) FROM " + table + " WHERE product LIKE '%Rubber%'");
 
 // SQL query with a registered table
 // register the DataStream as table "Orders"
 tableEnv.registerDataStream("Orders", ds, "user, product, amount");
 // run a SQL query on the Table and retrieve the result as a new Table
-Table result2 = tableEnv.sql(
+Table result2 = tableEnv.sqlQuery(
   "SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%'");
+
+// SQL update with a registered table
+// create and register a TableSink
+TableSink csvSink = new CsvTableSink("/path/to/file", ...);
+String[] fieldNames = {"product", "amount"};
+TypeInformation[] fieldTypes = {Types.STRING, Types.INT};
+tableEnv.registerTableSink("RubberOrders", fieldNames, fieldTypes, csvSink);
+// run a SQL update query on the Table and emit the result to the TableSink
+tableEnv.sqlUpdate(
+  "INSERT INTO RubberOrders SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%'");
 {% endhighlight %}
 </div>
 
@@ -71,15 +81,25 @@ val ds: DataStream[(Long, String, Integer)] = env.addSource(...)
 
 // SQL query with an inlined (unregistered) table
 val table = ds.toTable(tableEnv, 'user, 'product, 'amount)
-val result = tableEnv.sql(
+val result = tableEnv.sqlQuery(
   s"SELECT SUM(amount) FROM $table WHERE product LIKE '%Rubber%'")
 
 // SQL query with a registered table
 // register the DataStream under the name "Orders"
 tableEnv.registerDataStream("Orders", ds, 'user, 'product, 'amount)
 // run a SQL query on the Table and retrieve the result as a new Table
-val result2 = tableEnv.sql(
+val result2 = tableEnv.sqlQuery(
   "SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%'")
+
+// SQL update with a registered table
+// create and register a TableSink
+val csvSink: TableSink = new CsvTableSink("/path/to/file", ...)
+val fieldNames: Array[String] = Array("product", "amount")
+val fieldTypes: Array[TypeInformation[_]] = Array(Types.STRING, Types.INT)
+tableEnv.registerTableSink("RubberOrders", fieldNames, fieldTypes, csvSink)
+// run a SQL update query on the Table and emit the result to the TableSink
+tableEnv.sqlUpdate(
+  "INSERT INTO RubberOrders SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%'")
 {% endhighlight %}
 </div>
 </div>
@@ -89,7 +109,7 @@ val result2 = tableEnv.sql(
 Supported Syntax
 ----------------
 
-Flink parses SQL using [Apache Calcite](https://calcite.apache.org/docs/reference.html), which supports standard ANSI SQL. DML and DDL statements are not supported by Flink.
+Flink parses SQL using [Apache Calcite](https://calcite.apache.org/docs/reference.html), which supports standard ANSI SQL. DDL statements are not supported by Flink.
 
 The following BNF-grammar describes the superset of supported SQL features in batch and streaming queries. The [Operations](#operations) section shows examples for the supported features and indicates which features are only supported for batch or streaming queries.
 
@@ -156,6 +176,10 @@ groupItem:
   | ROLLUP '(' expression [, expression ]* ')'
   | GROUPING SETS '(' groupItem [, groupItem ]* ')'
 
+insert:
+  INSERT INTO tableReference
+  query
+
 ```
 
 Flink SQL uses a lexical policy for identifier (table, attribute, function names) similar to Java:
@@ -566,6 +590,39 @@ LIMIT 3
 
 {% top %}
 
+### Insert
+
+<div markdown="1">
+<table class="table table-bordered">
+  <thead>
+    <tr>
+      <th class="text-left" style="width: 20%">Operation</th>
+      <th class="text-center">Description</th>
+    </tr>
+  </thead>
+  <tbody>
+    <tr>
+      <td>
+        <strong>Insert Into</strong><br>
+        <span class="label label-primary">Batch</span> <span class="label label-primary">Streaming</span>
+      </td>
+      <td>
+        <p>Output tables must be registered in the TableEnvironment (see <a href="common.html#register-a-tablesink">Register a TableSink</a>). Moreover, the schema of the registered table must match the schema of the query.</p>
+
+{% highlight sql %}
+INSERT INTO OutputTable
+SELECT users, tag
+FROM Orders
+{% endhighlight %}
+      </td>
+    </tr>
+
+  </tbody>
+</table>
+</div>
+
+{% top %}
+
 ### Group Windows
 
 Group windows are defined in the `GROUP BY` clause of a SQL query. Just like queries with regular `GROUP BY` clauses, queries with a `GROUP BY` clause that includes a group window function compute a single result row per group. The following group windows functions are supported for SQL on batch and streaming tables.
@@ -649,22 +706,22 @@ DataStream<Tuple3<Long, String, Integer>> ds = env.addSource(...);
 tableEnv.registerDataStream("Orders", ds, "user, product, amount, proctime.proctime, rowtime.rowtime");
 
 // compute SUM(amount) per day (in event-time)
-Table result1 = tableEnv.sql(
+Table result1 = tableEnv.sqlQuery(
   "SELECT user, " +
   "  TUMBLE_START(rowtime, INTERVAL '1' DAY) as wStart,  " +
   "  SUM(amount) FROM Orders " +
   "GROUP BY TUMBLE(rowtime, INTERVAL '1' DAY), user");
 
 // compute SUM(amount) per day (in processing-time)
-Table result2 = tableEnv.sql(
+Table result2 = tableEnv.sqlQuery(
   "SELECT user, SUM(amount) FROM Orders GROUP BY TUMBLE(proctime, INTERVAL '1' DAY), user");
 
 // compute every hour the SUM(amount) of the last 24 hours in event-time
-Table result3 = tableEnv.sql(
+Table result3 = tableEnv.sqlQuery(
   "SELECT product, SUM(amount) FROM Orders GROUP BY HOP(rowtime, INTERVAL '1' HOUR, INTERVAL '1' DAY), product");
 
 // compute SUM(amount) per session with 12 hour inactivity gap (in event-time)
-Table result4 = tableEnv.sql(
+Table result4 = tableEnv.sqlQuery(
   "SELECT user, " +
   "  SESSION_START(rowtime, INTERVAL '12' HOUR) AS sStart, " +
   "  SESSION_END(rowtime, INTERVAL '12' HOUR) AS snd, " +
@@ -686,7 +743,7 @@ val ds: DataStream[(Long, String, Int)] = env.addSource(...)
 tableEnv.registerDataStream("Orders", ds, 'user, 'product, 'amount, 'proctime.proctime, 'rowtime.rowtime)
 
 // compute SUM(amount) per day (in event-time)
-val result1 = tableEnv.sql(
+val result1 = tableEnv.sqlQuery(
     """
       |SELECT
       |  user,
@@ -697,15 +754,15 @@ val result1 = tableEnv.sql(
     """.stripMargin)
 
 // compute SUM(amount) per day (in processing-time)
-val result2 = tableEnv.sql(
+val result2 = tableEnv.sqlQuery(
   "SELECT user, SUM(amount) FROM Orders GROUP BY TUMBLE(proctime, INTERVAL '1' DAY), user")
 
 // compute every hour the SUM(amount) of the last 24 hours in event-time
-val result3 = tableEnv.sql(
+val result3 = tableEnv.sqlQuery(
   "SELECT product, SUM(amount) FROM Orders GROUP BY HOP(rowtime, INTERVAL '1' HOUR, INTERVAL '1' DAY), product")
 
 // compute SUM(amount) per session with 12 hour inactivity gap (in event-time)
-val result4 = tableEnv.sql(
+val result4 = tableEnv.sqlQuery(
     """
       |SELECT
       |  user,

http://git-wip-us.apache.org/repos/asf/flink/blob/2cb37cb9/docs/dev/table/tableApi.md
----------------------------------------------------------------------
diff --git a/docs/dev/table/tableApi.md b/docs/dev/table/tableApi.md
index fd57111..0a2acab 100644
--- a/docs/dev/table/tableApi.md
+++ b/docs/dev/table/tableApi.md
@@ -961,7 +961,52 @@ val result = left.select('a, 'b, 'c).where('a.in(right));
 
 <div class="codetabs" markdown="1">
 <div data-lang="java" markdown="1">
+<table class="table table-bordered">
+  <thead>
+    <tr>
+      <th class="text-left" style="width: 20%">Operators</th>
+      <th class="text-center">Description</th>
+    </tr>
+  </thead>
+  <tbody>
+    <tr>
+      <td>
+        <strong>Order By</strong><br>
+        <span class="label label-primary">Batch</span>
+      </td>
+      <td>
+        <p>Similar to a SQL ORDER BY clause. Returns records globally sorted across all parallel partitions.</p>
+{% highlight java %}
+Table in = tableEnv.fromDataSet(ds, "a, b, c");
+Table result = in.orderBy("a.asc");
+{% endhighlight %}
+      </td>
+    </tr>
+
+    <tr>
+      <td>
+        <strong>Limit</strong><br>
+        <span class="label label-primary">Batch</span>
+      </td>
+      <td>
+        <p>Similar to a SQL LIMIT clause. Limits a sorted result to a specified number of records from an offset position. Limit is technically part of the Order By operator and thus must be preceded by it.</p>
+{% highlight java %}
+Table in = tableEnv.fromDataSet(ds, "a, b, c");
+Table result = in.orderBy("a.asc").limit(3); // returns all records beginning with the 4th record
+{% endhighlight %}
+or
+{% highlight java %}
+Table in = tableEnv.fromDataSet(ds, "a, b, c");
+Table result = in.orderBy("a.asc").limit(3, 5); // returns 5 records beginning with the 4th record
+{% endhighlight %}
+      </td>
+    </tr>
+
+  </tbody>
+</table>
+</div>
 
+<div data-lang="scala" markdown="1">
 <table class="table table-bordered">
   <thead>
     <tr>
@@ -970,7 +1015,7 @@ val result = left.select('a, 'b, 'c).where('a.in(right));
     </tr>
   </thead>
   <tbody>
-  	<tr>
+    <tr>
       <td>
         <strong>Order By</strong><br>
         <span class="label label-primary">Batch</span>
@@ -1005,9 +1050,13 @@ val result = in.orderBy('a.asc).limit(3, 5); // returns 5 records beginning with
 
   </tbody>
 </table>
-
 </div>
-<div data-lang="scala" markdown="1">
+</div>
+
+### Insert
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
 
 <table class="table table-bordered">
   <thead>
@@ -1017,35 +1066,49 @@ val result = in.orderBy('a.asc).limit(3, 5); // returns 5 records beginning with
     </tr>
   </thead>
   <tbody>
-  	<tr>
+    <tr>
       <td>
-        <strong>Order By</strong><br>
-        <span class="label label-primary">Batch</span>
+        <strong>Insert Into</strong><br>
+        <span class="label label-primary">Batch</span> <span class="label label-primary">Streaming</span>
       </td>
       <td>
-        <p>Similar to a SQL ORDER BY clause. Returns records globally sorted across all parallel partitions.</p>
+        <p>Similar to the INSERT INTO clause in a SQL query. Performs an insertion into a registered output table.</p>
+
+        <p>Output tables must be registered in the TableEnvironment (see <a href="common.html#register-a-tablesink">Register a TableSink</a>). Moreover, the schema of the registered table must match the schema of the query.</p>
+
 {% highlight java %}
-Table in = tableEnv.fromDataSet(ds, "a, b, c");
-Table result = in.orderBy("a.asc");
+Table orders = tableEnv.scan("Orders");
+orders.insertInto("OutOrders");
 {% endhighlight %}
       </td>
     </tr>
 
+  </tbody>
+</table>
+</div>
+
+<div data-lang="scala" markdown="1">
+<table class="table table-bordered">
+  <thead>
+    <tr>
+      <th class="text-left" style="width: 20%">Operators</th>
+      <th class="text-center">Description</th>
+    </tr>
+  </thead>
+  <tbody>
     <tr>
       <td>
-        <strong>Limit</strong><br>
-        <span class="label label-primary">Batch</span>
+        <strong>Insert Into</strong><br>
+        <span class="label label-primary">Batch</span> <span class="label label-primary">Streaming</span>
       </td>
       <td>
-        <p>Similar to a SQL LIMIT clause. Limits a sorted result to a specified number of records from an offset position. Limit is technically part of the Order By operator and thus must be preceded by it.</p>
-{% highlight java %}
-Table in = tableEnv.fromDataSet(ds, "a, b, c");
-Table result = in.orderBy("a.asc").limit(3); // returns unlimited number of records beginning with the 4th record
-{% endhighlight %}
-or
-{% highlight java %}
-Table in = tableEnv.fromDataSet(ds, "a, b, c");
-Table result = in.orderBy("a.asc").limit(3, 5); // returns 5 records beginning with the 4th record
+        <p>Similar to the INSERT INTO clause in a SQL query. Performs an insertion into a registered output table.</p>
+
+        <p>Output tables must be registered in the TableEnvironment (see <a href="common.html#register-a-tablesink">Register a TableSink</a>). Moreover, the schema of the registered table must match the schema of the query.</p>
+
+{% highlight scala %}
+val orders: Table = tableEnv.scan("Orders")
+orders.insertInto("OutOrders")
 {% endhighlight %}
       </td>
     </tr>
@@ -1055,6 +1118,8 @@ Table result = in.orderBy("a.asc").limit(3, 5); // returns 5 records beginning w
 </div>
 </div>
 
+{% top %}
+
 ### Group Windows
 
 Group window aggregates group rows into finite groups based on time or row-count intervals and evaluate aggregation functions once per group. For batch tables, windows are a convenient shortcut to group records by time intervals.
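
The Table API counterpart works the same way as the SQL path; a short Scala sketch of `insertInto` (imports as in the sketch after the common.md diff, sink path and table names are placeholders):

    import org.apache.flink.table.api.Table

    // register the output table first; its schema must match the emitted table
    tableEnv.registerTableSink(
      "OutOrders",
      Array("user", "product", "amount"),
      Array[TypeInformation[_]](Types.LONG, Types.STRING, Types.INT),
      new CsvTableSink("/tmp/out-orders", ","))

    // look up the registered sink by name and emit into it
    val orders: Table = tableEnv.scan("Orders")
    orders.insertInto("OutOrders")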

http://git-wip-us.apache.org/repos/asf/flink/blob/2cb37cb9/docs/dev/table/udfs.md
----------------------------------------------------------------------
diff --git a/docs/dev/table/udfs.md b/docs/dev/table/udfs.md
index 6c9bc1a..eef7db6 100644
--- a/docs/dev/table/udfs.md
+++ b/docs/dev/table/udfs.md
@@ -72,7 +72,7 @@ tableEnv.registerFunction("hashCode", new HashCode(10));
 myTable.select("string, string.hashCode(), hashCode(string)");
 
 // use the function in SQL API
-tableEnv.sql("SELECT string, HASHCODE(string) FROM MyTable");
+tableEnv.sqlQuery("SELECT string, HASHCODE(string) FROM MyTable");
 {% endhighlight %}
 </div>
 
@@ -93,7 +93,7 @@ myTable.select('string, hashCode('string))
 
 // register and use the function in SQL
 tableEnv.registerFunction("hashCode", new HashCode(10))
-tableEnv.sql("SELECT string, HASHCODE(string) FROM MyTable");
+tableEnv.sqlQuery("SELECT string, HASHCODE(string) FROM MyTable");
 {% endhighlight %}
 </div>
 </div>
@@ -176,9 +176,9 @@ myTable.leftOuterJoin("split(a) as (word, length)").select("a, word, length");
 
 // Use the table function in SQL with LATERAL and TABLE keywords.
 // CROSS JOIN a table function (equivalent to "join" in Table API).
-tableEnv.sql("SELECT a, word, length FROM MyTable, LATERAL TABLE(split(a)) as T(word, length)");
+tableEnv.sqlQuery("SELECT a, word, length FROM MyTable, LATERAL TABLE(split(a)) as T(word, length)");
 // LEFT JOIN a table function (equivalent to "leftOuterJoin" in Table API).
-tableEnv.sql("SELECT a, word, length FROM MyTable LEFT JOIN LATERAL TABLE(split(a)) as T(word, length) ON TRUE");
+tableEnv.sqlQuery("SELECT a, word, length FROM MyTable LEFT JOIN LATERAL TABLE(split(a)) as T(word, length) ON TRUE");
 {% endhighlight %}
 </div>
 
@@ -206,9 +206,9 @@ tableEnv.registerFunction("split", new Split("#"))
 
 // Use the table function in SQL with LATERAL and TABLE keywords.
 // CROSS JOIN a table function (equivalent to "join" in Table API)
-tableEnv.sql("SELECT a, word, length FROM MyTable, LATERAL TABLE(split(a)) as T(word, length)");
+tableEnv.sqlQuery("SELECT a, word, length FROM MyTable, LATERAL TABLE(split(a)) as T(word, length)");
 // LEFT JOIN a table function (equivalent to "leftOuterJoin" in Table API)
-tableEnv.sql("SELECT a, word, length FROM MyTable LEFT JOIN TABLE(split(a)) as T(word, length) ON TRUE");
+tableEnv.sqlQuery("SELECT a, word, length FROM MyTable LEFT JOIN TABLE(split(a)) as T(word, length) ON TRUE");
 {% endhighlight %}
 **IMPORTANT:** Do not implement TableFunction as a Scala object. Scala object is a singleton and will cause concurrency issues.
 </div>
@@ -572,7 +572,7 @@ StreamTableEnvironment tEnv = ...
 tEnv.registerFunction("wAvg", new WeightedAvg());
 
 // use function
-tEnv.sql("SELECT user, wAvg(points, level) AS avgPoints FROM userScores GROUP BY user");
+tEnv.sqlQuery("SELECT user, wAvg(points, level) AS avgPoints FROM userScores GROUP BY user");
 
 {% endhighlight %}
 </div>
@@ -649,7 +649,7 @@ val tEnv: StreamTableEnvironment = ???
 tEnv.registerFunction("wAvg", new WeightedAvg())
 
 // use function
-tEnv.sql("SELECT user, wAvg(points, level) AS avgPoints FROM userScores GROUP BY user")
+tEnv.sqlQuery("SELECT user, wAvg(points, level) AS avgPoints FROM userScores GROUP BY user")
 
 {% endhighlight %}
 </div>
@@ -720,7 +720,7 @@ tableEnv.registerFunction("hashCode", new HashCode());
 myTable.select("string, string.hashCode(), hashCode(string)");
 
 // use the function in SQL
-tableEnv.sql("SELECT string, HASHCODE(string) FROM MyTable");
+tableEnv.sqlQuery("SELECT string, HASHCODE(string) FROM MyTable");
 {% endhighlight %}
 </div>
 
@@ -748,7 +748,7 @@ myTable.select('string, hashCode('string))
 
 // register and use the function in SQL
 tableEnv.registerFunction("hashCode", hashCode)
-tableEnv.sql("SELECT string, HASHCODE(string) FROM MyTable");
+tableEnv.sqlQuery("SELECT string, HASHCODE(string) FROM MyTable");
 {% endhighlight %}
 
 </div>
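
The udfs.md changes above are again only the `sql()` to `sqlQuery()` rename; for completeness, a self-contained Scala sketch of the scalar function those docs use:

    import org.apache.flink.table.functions.ScalarFunction

    // the HashCode function from the docs, parameterized by a factor
    class HashCode(factor: Int) extends ScalarFunction {
      def eval(s: String): Int = s.hashCode() * factor
    }

    // register it, then call it through the renamed sqlQuery() method
    tableEnv.registerFunction("hashCode", new HashCode(10))
    val result = tableEnv.sqlQuery("SELECT string, HASHCODE(string) FROM MyTable")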

http://git-wip-us.apache.org/repos/asf/flink/blob/2cb37cb9/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/HBaseConnectorITCase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/HBaseConnectorITCase.java b/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/HBaseConnectorITCase.java
index 5d71ca5..3da4230 100644
--- a/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/HBaseConnectorITCase.java
+++ b/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/HBaseConnectorITCase.java
@@ -155,7 +155,7 @@ public class HBaseConnectorITCase extends HBaseTestingClusterAutostarter {
 		hbaseTable.addColumn(FAMILY3, F3COL3, String.class);
 		tableEnv.registerTableSource("hTable", hbaseTable);
 
-		Table result = tableEnv.sql(
+		Table result = tableEnv.sqlQuery(
 			"SELECT " +
 				"  h.family1.col1, " +
 				"  h.family2.col1, " +
@@ -196,7 +196,7 @@ public class HBaseConnectorITCase extends HBaseTestingClusterAutostarter {
 		hbaseTable.addColumn(FAMILY3, F3COL3, String.class);
 		tableEnv.registerTableSource("hTable", hbaseTable);
 
-		Table result = tableEnv.sql(
+		Table result = tableEnv.sqlQuery(
 			"SELECT " +
 				"  h.family1.col1, " +
 				"  h.family3.col1, " +
@@ -236,7 +236,7 @@ public class HBaseConnectorITCase extends HBaseTestingClusterAutostarter {
 		hbaseTable.addColumn(FAMILY3, F3COL3, String.class);
 		tableEnv.registerTableSource("hTable", hbaseTable);
 
-		Table result = tableEnv.sql(
+		Table result = tableEnv.sqlQuery(
 			"SELECT * FROM hTable AS h"
 		);
 		DataSet<Row> resultSet = tableEnv.toDataSet(result, Row.class);
@@ -270,7 +270,7 @@ public class HBaseConnectorITCase extends HBaseTestingClusterAutostarter {
 		tableEnv.registerFunction("toUTF8", new ToUTF8());
 		tableEnv.registerFunction("toLong", new ToLong());
 
-		Table result = tableEnv.sql(
+		Table result = tableEnv.sqlQuery(
 			"SELECT " +
 				"  toUTF8(h.family2.col1), " +
 				"  toLong(h.family2.col2) " +

http://git-wip-us.apache.org/repos/asf/flink/blob/2cb37cb9/flink-examples/flink-examples-table/src/main/java/org/apache/flink/table/examples/java/WordCountSQL.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-examples-table/src/main/java/org/apache/flink/table/examples/java/WordCountSQL.java b/flink-examples/flink-examples-table/src/main/java/org/apache/flink/table/examples/java/WordCountSQL.java
index 65efc17..22f0553 100644
--- a/flink-examples/flink-examples-table/src/main/java/org/apache/flink/table/examples/java/WordCountSQL.java
+++ b/flink-examples/flink-examples-table/src/main/java/org/apache/flink/table/examples/java/WordCountSQL.java
@@ -53,7 +53,7 @@ public class WordCountSQL {
 		tEnv.registerDataSet("WordCount", input, "word, frequency");
 
 		// run a SQL query on the Table and retrieve the result as a new Table
-		Table table = tEnv.sql(
+		Table table = tEnv.sqlQuery(
 			"SELECT word, SUM(frequency) as frequency FROM WordCount GROUP BY word");
 
 		DataSet<WC> result = tEnv.toDataSet(table, WC.class);

http://git-wip-us.apache.org/repos/asf/flink/blob/2cb37cb9/flink-examples/flink-examples-table/src/main/scala/org/apache/flink/table/examples/scala/StreamSQLExample.scala
----------------------------------------------------------------------
diff --git a/flink-examples/flink-examples-table/src/main/scala/org/apache/flink/table/examples/scala/StreamSQLExample.scala b/flink-examples/flink-examples-table/src/main/scala/org/apache/flink/table/examples/scala/StreamSQLExample.scala
index 665913e..3297aec 100644
--- a/flink-examples/flink-examples-table/src/main/scala/org/apache/flink/table/examples/scala/StreamSQLExample.scala
+++ b/flink-examples/flink-examples-table/src/main/scala/org/apache/flink/table/examples/scala/StreamSQLExample.scala
@@ -58,7 +58,7 @@ object StreamSQLExample {
     tEnv.registerDataStream("OrderB", orderB, 'user, 'product, 'amount)
 
     // union the two tables
-    val result = tEnv.sql(
+    val result = tEnv.sqlQuery(
       "SELECT * FROM OrderA WHERE amount > 2 UNION ALL " +
         "SELECT * FROM OrderB WHERE amount < 2")
 

http://git-wip-us.apache.org/repos/asf/flink/blob/2cb37cb9/flink-examples/flink-examples-table/src/main/scala/org/apache/flink/table/examples/scala/WordCountSQL.scala
----------------------------------------------------------------------
diff --git a/flink-examples/flink-examples-table/src/main/scala/org/apache/flink/table/examples/scala/WordCountSQL.scala b/flink-examples/flink-examples-table/src/main/scala/org/apache/flink/table/examples/scala/WordCountSQL.scala
index a8b8268..55bbdb5 100644
--- a/flink-examples/flink-examples-table/src/main/scala/org/apache/flink/table/examples/scala/WordCountSQL.scala
+++ b/flink-examples/flink-examples-table/src/main/scala/org/apache/flink/table/examples/scala/WordCountSQL.scala
@@ -48,7 +48,7 @@ object WordCountSQL {
     tEnv.registerDataSet("WordCount", input, 'word, 'frequency)
 
     // run a SQL query on the Table and retrieve the result as a new Table
-    val table = tEnv.sql("SELECT word, SUM(frequency) FROM WordCount GROUP BY word")
+    val table = tEnv.sqlQuery("SELECT word, SUM(frequency) FROM WordCount GROUP BY word")
 
     table.toDataSet[WC].print()
   }

http://git-wip-us.apache.org/repos/asf/flink/blob/2cb37cb9/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala
index a9d60dd..bca5826 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala
@@ -36,7 +36,7 @@ import org.apache.flink.table.expressions.{Expression, TimeAttribute}
 import org.apache.flink.table.plan.nodes.FlinkConventions
 import org.apache.flink.table.plan.nodes.dataset.DataSetRel
 import org.apache.flink.table.plan.rules.FlinkRuleSets
-import org.apache.flink.table.plan.schema.{DataSetTable, RowSchema, TableSourceTable}
+import org.apache.flink.table.plan.schema.{DataSetTable, RowSchema, TableSinkTable, TableSourceTable}
 import org.apache.flink.table.runtime.MapRunner
 import org.apache.flink.table.sinks.{BatchTableSink, TableSink}
 import org.apache.flink.table.sources.{BatchTableSource, TableSource}
@@ -106,6 +106,54 @@ abstract class BatchTableEnvironment(
   }
 
   /**
+    * Registers an external [[TableSink]] with given field names and types in this
+    * [[TableEnvironment]]'s catalog.
+    * Registered sink tables can be referenced in SQL DML statements.
+    *
+    * Example:
+    *
+    * {{{
+    *   // create a table sink and its field names and types
+    *   val fieldNames: Array[String] = Array("a", "b", "c")
+    *   val fieldTypes: Array[TypeInformation[_]] = Array(Types.STRING, Types.INT, Types.LONG)
+    *   val tableSink: BatchTableSink[_] = new YourTableSinkImpl(...)
+    *
+    *   // register the table sink in the catalog
+    *   tableEnv.registerTableSink("output_table", fieldNames, fieldTypes, tableSink)
+    *
+    *   // use the registered sink
+    *   tableEnv.sqlUpdate("INSERT INTO output_table SELECT a, b, c FROM sourceTable")
+    * }}}
+    *
+    * @param name       The name under which the [[TableSink]] is registered.
+    * @param fieldNames The field names to register with the [[TableSink]].
+    * @param fieldTypes The field types to register with the [[TableSink]].
+    * @param tableSink  The [[TableSink]] to register.
+    */
+  def registerTableSink(
+      name: String,
+      fieldNames: Array[String],
+      fieldTypes: Array[TypeInformation[_]],
+      tableSink: TableSink[_]): Unit = {
+
+    checkValidTableName(name)
+    if (fieldNames == null) throw TableException("fieldNames must not be null.")
+    if (fieldTypes == null) throw TableException("fieldTypes must not be null.")
+    if (fieldNames.length == 0) throw new TableException("fieldNames must not be empty.")
+    if (fieldNames.length != fieldTypes.length) {
+      throw new TableException("Same number of field names and types required.")
+    }
+
+    tableSink match {
+      case batchTableSink: BatchTableSink[_] =>
+        val configuredSink = batchTableSink.configure(fieldNames, fieldTypes)
+        registerTableInternal(name, new TableSinkTable(configuredSink))
+      case _ =>
+        throw new TableException("Only BatchTableSink can be registered in BatchTableEnvironment.")
+    }
+  }
+
+  /**
     * Writes a [[Table]] to a [[TableSink]].
     *
     * Internally, the [[Table]] is translated into a [[DataSet]] and handed over to the
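
A usage sketch for the new batch-side method above, assuming a registered "sourceTable" as in the scaladoc example; `CsvTableSink` implements `BatchTableSink`, so it passes the match:

    val bEnv = org.apache.flink.api.scala.ExecutionEnvironment.getExecutionEnvironment
    val bTableEnv = TableEnvironment.getTableEnvironment(bEnv)

    bTableEnv.registerTableSink(
      "output_table",
      Array("a", "b", "c"),
      Array[TypeInformation[_]](Types.STRING, Types.INT, Types.LONG),
      new CsvTableSink("/tmp/output", ","))

    bTableEnv.sqlUpdate("INSERT INTO output_table SELECT a, b, c FROM sourceTable")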

http://git-wip-us.apache.org/repos/asf/flink/blob/2cb37cb9/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
index 8d8cebb..c7cc61b 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
@@ -42,12 +42,12 @@ import org.apache.flink.table.expressions._
 import org.apache.flink.table.plan.nodes.FlinkConventions
 import org.apache.flink.table.plan.nodes.datastream.{DataStreamRel, UpdateAsRetractionTrait}
 import org.apache.flink.table.plan.rules.FlinkRuleSets
-import org.apache.flink.table.plan.schema.{DataStreamTable, RowSchema, StreamTableSourceTable}
 import org.apache.flink.table.plan.util.UpdatingPlanChecker
 import org.apache.flink.table.runtime.conversion._
+import org.apache.flink.table.plan.schema.{DataStreamTable, RowSchema, StreamTableSourceTable, TableSinkTable}
 import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo}
 import org.apache.flink.table.runtime.{CRowMapRunner, OutputRowtimeProcessFunction}
-import org.apache.flink.table.sinks.{AppendStreamTableSink, RetractStreamTableSink, TableSink, UpsertStreamTableSink}
+import org.apache.flink.table.sinks._
 import org.apache.flink.table.sources.{DefinedRowtimeAttribute, StreamTableSource, TableSource}
 import org.apache.flink.table.typeutils.{TimeIndicatorTypeInfo, TypeCheckUtils}
 
@@ -79,7 +79,7 @@ abstract class StreamTableEnvironment(
   // the naming pattern for internally registered tables.
   private val internalNamePattern = "^_DataStreamTable_[0-9]+$".r
 
-  def queryConfig: StreamQueryConfig = new StreamQueryConfig
+  override def queryConfig: StreamQueryConfig = new StreamQueryConfig
 
   /**
     * Checks if the chosen table name is valid.
@@ -130,6 +130,60 @@ abstract class StreamTableEnvironment(
   }
 
   /**
+    * Registers an external [[TableSink]] with given field names and types in this
+    * [[TableEnvironment]]'s catalog.
+    * Registered sink tables can be referenced in SQL DML statements.
+    *
+    * Example:
+    *
+    * {{{
+    *   // create a table sink and its field names and types
+    *   val fieldNames: Array[String] = Array("a", "b", "c")
+    *   val fieldTypes: Array[TypeInformation[_]] = Array(Types.STRING, Types.INT, Types.LONG)
+    *   val tableSink: StreamTableSink[_] = new YourTableSinkImpl(...)
+    *
+    *   // register the table sink in the catalog
+    *   tableEnv.registerTableSink("output_table", fieldNames, fieldTypes, tableSink)
+    *
+    *   // use the registered sink
+    *   tableEnv.sqlUpdate("INSERT INTO output_table SELECT a, b, c FROM sourceTable")
+    * }}}
+    *
+    * @param name The name under which the [[TableSink]] is registered.
+    * @param fieldNames The field names to register with the [[TableSink]].
+    * @param fieldTypes The field types to register with the [[TableSink]].
+    * @param tableSink The [[TableSink]] to register.
+    */
+  def registerTableSink(
+      name: String,
+      fieldNames: Array[String],
+      fieldTypes: Array[TypeInformation[_]],
+      tableSink: TableSink[_]): Unit = {
+
+    checkValidTableName(name)
+    if (fieldNames == null) throw TableException("fieldNames must not be null.")
+    if (fieldTypes == null) throw TableException("fieldTypes must not be null.")
+    if (fieldNames.length == 0) throw new TableException("fieldNames must not be empty.")
+    if (fieldNames.length != fieldTypes.length) {
+      throw new TableException("Same number of field names and types required.")
+    }
+
+    tableSink match {
+      case streamTableSink@(
+        _: AppendStreamTableSink[_] |
+        _: UpsertStreamTableSink[_] |
+        _: RetractStreamTableSink[_]) =>
+
+        val configuredSink = streamTableSink.configure(fieldNames, fieldTypes)
+        registerTableInternal(name, new TableSinkTable(configuredSink))
+      case _ =>
+        throw new TableException(
+          "Only AppendStreamTableSink, UpsertStreamTableSink, and RetractStreamTableSink can be " +
+            "registered in StreamTableEnvironment.")
+    }
+  }
+
+  /**
     * Writes a [[Table]] to a [[TableSink]].
     *
     * Internally, the [[Table]] is translated into a [[DataStream]] and handed over to the
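
The streaming variant above additionally checks the sink type: only `AppendStreamTableSink`, `UpsertStreamTableSink`, or `RetractStreamTableSink` instances are accepted, and `configure(fieldNames, fieldTypes)` returns the copy that is actually registered. A hedged sketch, again with `CsvTableSink` (an `AppendStreamTableSink`) and placeholder names:

    val sEnv = StreamExecutionEnvironment.getExecutionEnvironment
    val sTableEnv = TableEnvironment.getTableEnvironment(sEnv)

    // passes the match above; any other TableSink would throw a TableException
    sTableEnv.registerTableSink(
      "output_table",
      Array("a", "b", "c"),
      Array[TypeInformation[_]](Types.STRING, Types.INT, Types.LONG),
      new CsvTableSink("/tmp/stream-output", ","))

    sTableEnv.sqlUpdate("INSERT INTO output_table SELECT a, b, c FROM sourceTable")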

http://git-wip-us.apache.org/repos/asf/flink/blob/2cb37cb9/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
index 2e9e18f..0424cf8 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
@@ -31,7 +31,7 @@ import org.apache.calcite.rel.RelNode
 import org.apache.calcite.rel.`type`.RelDataType
 import org.apache.calcite.schema.SchemaPlus
 import org.apache.calcite.schema.impl.AbstractTable
-import org.apache.calcite.sql.SqlOperatorTable
+import org.apache.calcite.sql._
 import org.apache.calcite.sql.parser.SqlParser
 import org.apache.calcite.sql.util.ChainedSqlOperatorTable
 import org.apache.calcite.tools._
@@ -49,13 +49,13 @@ import org.apache.flink.table.api.scala.{BatchTableEnvironment => ScalaBatchTabl
 import org.apache.flink.table.calcite.{FlinkPlannerImpl, FlinkRelBuilder, FlinkTypeFactory, FlinkTypeSystem}
 import org.apache.flink.table.catalog.{ExternalCatalog, ExternalCatalogSchema}
 import org.apache.flink.table.codegen.{ExpressionReducer, FunctionCodeGenerator, GeneratedFunction}
-import org.apache.flink.table.expressions.{Alias, Expression, UnresolvedFieldReference, _}
-import org.apache.flink.table.functions.utils.UserDefinedFunctionUtils.{checkForInstantiation, checkNotSingleton, createScalarSqlFunction, createTableSqlFunctions, _}
+import org.apache.flink.table.expressions._
+import org.apache.flink.table.functions.utils.UserDefinedFunctionUtils._
 import org.apache.flink.table.functions.{AggregateFunction, ScalarFunction, TableFunction}
 import org.apache.flink.table.plan.cost.DataSetCostFactory
 import org.apache.flink.table.plan.logical.{CatalogNode, LogicalRelNode}
 import org.apache.flink.table.plan.rules.FlinkRuleSets
-import org.apache.flink.table.plan.schema.{RelTable, RowSchema}
+import org.apache.flink.table.plan.schema.{RelTable, RowSchema, TableSinkTable}
 import org.apache.flink.table.sinks.TableSink
 import org.apache.flink.table.sources.{DefinedFieldNames, TableSource}
 import org.apache.flink.table.typeutils.TimeIndicatorTypeInfo
@@ -110,6 +110,13 @@ abstract class TableEnvironment(val config: TableConfig) {
   /** Returns the table config to define the runtime behavior of the Table API. */
   def getConfig: TableConfig = config
 
+  /** Returns the [[QueryConfig]] that corresponds to the concrete type of this TableEnvironment. */
+  private[flink] def queryConfig: QueryConfig = this match {
+    case _: BatchTableEnvironment => new BatchQueryConfig
+    case _: StreamTableEnvironment => new StreamQueryConfig
+    case _ => null
+  }
+
   /**
     * Returns the operator table for this environment including a custom Calcite configuration.
     */
@@ -276,7 +283,8 @@ abstract class TableEnvironment(val config: TableConfig) {
             s"${t.msg}\n" +
             s"Please check the documentation for the set of currently supported SQL features.")
       case a: AssertionError =>
-        throw a.getCause
+        // keep original exception stack for caller
+        throw a
     }
     output
   }
@@ -414,6 +422,22 @@ abstract class TableEnvironment(val config: TableConfig) {
   def registerTableSource(name: String, tableSource: TableSource[_]): Unit
 
   /**
+    * Registers an external [[TableSink]] with given field names and types in this
+    * [[TableEnvironment]]'s catalog.
+    * Registered sink tables can be referenced in SQL DML statements.
+    *
+    * @param name The name under which the [[TableSink]] is registered.
+    * @param fieldNames The field names to register with the [[TableSink]].
+    * @param fieldTypes The field types to register with the [[TableSink]].
+    * @param tableSink The [[TableSink]] to register.
+    */
+  def registerTableSink(
+      name: String,
+      fieldNames: Array[String],
+      fieldTypes: Array[TypeInformation[_]],
+      tableSink: TableSink[_]): Unit
+
+  /**
     * Replaces a registered Table with another Table under the same name.
     * We use this method to replace a [[org.apache.flink.table.plan.schema.DataStreamTable]]
     * with a [[org.apache.calcite.schema.TranslatableTable]].
@@ -489,9 +513,10 @@ abstract class TableEnvironment(val config: TableConfig) {
   /**
     * Evaluates a SQL query on registered tables and retrieves the result as a [[Table]].
     *
-    * All tables referenced by the query must be registered in the TableEnvironment. But
-    * [[Table.toString]] will automatically register an unique table name and return the
-    * table name. So it allows to call SQL directly on tables like this:
+    * All tables referenced by the query must be registered in the TableEnvironment.
+    * A [[Table]] is automatically registered when its [[toString]] method is called, for example
+    * when it is embedded into a String.
+    * Hence, SQL queries can directly reference a [[Table]] as follows:
     *
     * {{{
     *   val table: Table = ...
@@ -502,16 +527,110 @@ abstract class TableEnvironment(val config: TableConfig) {
     * @param query The SQL query to evaluate.
     * @return The result of the query as Table.
     */
+  @deprecated("Please use sqlQuery() instead.")
   def sql(query: String): Table = {
+    sqlQuery(query)
+  }
+
+  /**
+    * Evaluates a SQL query on registered tables and retrieves the result as a [[Table]].
+    *
+    * All tables referenced by the query must be registered in the TableEnvironment.
+    * A [[Table]] is automatically registered when its [[toString]] method is called, for example
+    * when it is embedded into a String.
+    * Hence, SQL queries can directly reference a [[Table]] as follows:
+    *
+    * {{{
+    *   val table: Table = ...
+    *   // the table is not registered to the table environment
+    *   tEnv.sqlQuery(s"SELECT * FROM $table")
+    * }}}
+    *
+    * @param query The SQL query to evaluate.
+    * @return The result of the query as Table
+    */
+  def sqlQuery(query: String): Table = {
     val planner = new FlinkPlannerImpl(getFrameworkConfig, getPlanner, getTypeFactory)
     // parse the sql query
     val parsed = planner.parse(query)
-    // validate the sql query
-    val validated = planner.validate(parsed)
-    // transform to a relational tree
-    val relational = planner.rel(validated)
+    if (null != parsed && parsed.getKind.belongsTo(SqlKind.QUERY)) {
+      // validate the sql query
+      val validated = planner.validate(parsed)
+      // transform to a relational tree
+      val relational = planner.rel(validated)
+      new Table(this, LogicalRelNode(relational.rel))
+    } else {
+      throw new TableException(
+        "Unsupported SQL query! sqlQuery() only accepts SQL queries of type " +
+          "SELECT, UNION, INTERSECT, EXCEPT, VALUES, and ORDER_BY.")
+    }
+  }
+
+  /**
+    * Evaluates a SQL statement such as INSERT, UPDATE, or DELETE, or a DDL statement.
+    * NOTE: Currently only SQL INSERT statements are supported.
+    *
+    * All tables referenced by the query must be registered in the TableEnvironment.
+    * A [[Table]] is automatically registered when its [[toString]] method is called, for example
+    * when it is embedded into a String.
+    * Hence, SQL queries can directly reference a [[Table]] as follows:
+    *
+    * {{{
+    *   // register the table sink into which the result is inserted.
+    *   tEnv.registerTableSink("sinkTable", fieldNames, fieldTypes, tableSink)
+    *   val sourceTable: Table = ...
+    *   // sourceTable is not registered to the table environment
+    *   tEnv.sqlUpdate(s"INSERT INTO sinkTable SELECT * FROM $sourceTable")
+    * }}}
+    *
+    * @param stmt The SQL statement to evaluate.
+    */
+  def sqlUpdate(stmt: String): Unit = {
+    sqlUpdate(stmt, this.queryConfig)
+  }
 
-    new Table(this, LogicalRelNode(relational.rel))
+  /**
+    * Evaluates a SQL statement such as INSERT, UPDATE, or DELETE, or a DDL statement.
+    * NOTE: Currently only SQL INSERT statements are supported.
+    *
+    * All tables referenced by the query must be registered in the TableEnvironment.
+    * A [[Table]] is automatically registered when its [[toString]] method is called, for example
+    * when it is embedded into a String.
+    * Hence, SQL queries can directly reference a [[Table]] as follows:
+    *
+    * {{{
+    *   // register the table sink into which the result is inserted.
+    *   tEnv.registerTableSink("sinkTable", fieldNames, fieldTypes, tableSink)
+    *   val sourceTable: Table = ...
+    *   // sourceTable is not registered to the table environment
+    *   tEnv.sqlUpdate(s"INSERT INTO sinkTable SELECT * FROM $sourceTable")
+    * }}}
+    *
+    * @param stmt The SQL statement to evaluate.
+    * @param config The [[QueryConfig]] to use.
+    */
+  def sqlUpdate(stmt: String, config: QueryConfig): Unit = {
+    val planner = new FlinkPlannerImpl(getFrameworkConfig, getPlanner, getTypeFactory)
+    // parse the sql query
+    val parsed = planner.parse(stmt)
+    parsed match {
+      case insert: SqlInsert =>
+        // validate the SQL query
+        val query = insert.getSource
+        planner.validate(query)
+
+        // get query result as Table
+        val queryResult = new Table(this, LogicalRelNode(planner.rel(query).rel))
+
+        // get name of sink table
+        val targetTableName = insert.getTargetTable.asInstanceOf[SqlIdentifier].names.get(0)
+
+        // insert query result into sink table
+        insertInto(queryResult, targetTableName, config)
+      case _ =>
+        throw new TableException(
+          "Unsupported SQL query! sqlUpdate() only accepts SQL statements of type INSERT.")
+    }
   }
 
   /**
@@ -519,11 +638,62 @@ abstract class TableEnvironment(val config: TableConfig) {
     *
     * @param table The [[Table]] to write.
     * @param sink The [[TableSink]] to write the [[Table]] to.
+    * @param conf The [[QueryConfig]] to use.
     * @tparam T The data type that the [[TableSink]] expects.
     */
   private[flink] def writeToSink[T](table: Table, sink: TableSink[T], conf: QueryConfig): Unit
 
   /**
+    * Writes the [[Table]] to a [[TableSink]] that was registered under the specified name.
+    *
+    * @param table The table to write to the TableSink.
+    * @param sinkTableName The name of the registered TableSink.
+    * @param conf The query configuration to use.
+    */
+  private[flink] def insertInto(table: Table, sinkTableName: String, conf: QueryConfig): Unit = {
+
+    // check that sink table exists
+    if (null == sinkTableName) throw TableException("Name of TableSink must not be null.")
+    if (sinkTableName.isEmpty) throw TableException("Name of TableSink must not be empty.")
+    if (!isRegistered(sinkTableName)) {
+      throw TableException(s"No table was registered under the name $sinkTableName.")
+    }
+
+    getTable(sinkTableName) match {
+      case s: TableSinkTable[_] =>
+        val tableSink = s.tableSink
+        // validate schema of source table and table sink
+        val srcFieldTypes = table.getSchema.getTypes
+        val sinkFieldTypes = tableSink.getFieldTypes
+
+        if (srcFieldTypes.length != sinkFieldTypes.length ||
+          srcFieldTypes.zip(sinkFieldTypes).exists{case (srcF, snkF) => srcF != snkF}) {
+
+          val srcFieldNames = table.getSchema.getColumnNames
+          val sinkFieldNames = tableSink.getFieldNames
+
+          // format table and table sink schema strings
+          val srcSchema = srcFieldNames.zip(srcFieldTypes)
+            .map{case (n, t) => s"$n: ${t.getTypeClass.getSimpleName}"}
+            .mkString("[", ", ", "]")
+          val sinkSchema = sinkFieldNames.zip(sinkFieldTypes)
+            .map{case (n, t) => s"$n: ${t.getTypeClass.getSimpleName}"}
+            .mkString("[", ", ", "]")
+
+          throw ValidationException(
+            s"Field types of query result and registered TableSink $sinkTableName do not match.\n" +
+              s"Query result schema: $srcSchema\n" +
+              s"TableSink schema:    $sinkSchema")
+        }
+        // emit the table to the configured table sink
+        writeToSink(table, tableSink, conf)
+      case _ =>
+        throw TableException(s"The table registered as $sinkTableName is not a TableSink. " +
+          s"You can only emit query results to a registered TableSink.")
+    }
+  }
+
+  /**
     * Registers a Calcite [[AbstractTable]] in the TableEnvironment's catalog.
     *
     * @param name The name under which the table will be registered.
@@ -554,10 +724,14 @@ abstract class TableEnvironment(val config: TableConfig) {
     * @param name The table name to check.
     * @return true, if a table is registered under the name, false otherwise.
     */
-  protected def isRegistered(name: String): Boolean = {
+  protected[flink] def isRegistered(name: String): Boolean = {
     rootSchema.getTableNames.contains(name)
   }
 
+  private def getTable(name: String): org.apache.calcite.schema.Table = {
+    rootSchema.getTable(name)
+  }
+
   protected def getRowType(name: String): RelDataType = {
     rootSchema.getTable(name).getRowType(typeFactory)
   }
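
Taken together, registerTableSink() and sqlUpdate() compose into a short
end-to-end program. A minimal sketch (the sink name, output path, and the
registered source table "sourceTable" are illustrative; CsvTableSink is used
here only as an example of a concrete TableSink):

    import org.apache.flink.api.common.typeinfo.TypeInformation
    import org.apache.flink.table.api.Types
    import org.apache.flink.table.sinks.CsvTableSink

    // schema under which the sink is registered in the catalog
    val fieldNames = Array("id", "name")
    val fieldTypes: Array[TypeInformation[_]] = Array(Types.LONG, Types.STRING)

    // register the sink under a name so that SQL DML can address it
    tEnv.registerTableSink(
      "csvSink", fieldNames, fieldTypes, new CsvTableSink("/tmp/result.csv", "|"))

    // emit a query result into the registered sink; insertInto() checks that the
    // query schema matches the registered sink schema
    tEnv.sqlUpdate("INSERT INTO csvSink SELECT id, name FROM sourceTable")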

http://git-wip-us.apache.org/repos/asf/flink/blob/2cb37cb9/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/queryConfig.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/queryConfig.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/queryConfig.scala
index c8fbab7..4aa5543 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/queryConfig.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/queryConfig.scala
@@ -19,6 +19,7 @@
 package org.apache.flink.table.api
 
 import _root_.java.io.Serializable
+
 import org.apache.flink.api.common.time.Time
 
 class QueryConfig private[table] extends Serializable {}

http://git-wip-us.apache.org/repos/asf/flink/blob/2cb37cb9/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala
index 2298575..30ed98e 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala
@@ -25,6 +25,7 @@ import org.apache.flink.table.expressions.{Alias, Asc, Expression, ExpressionPar
 import org.apache.flink.table.functions.utils.UserDefinedFunctionUtils
 import org.apache.flink.table.plan.ProjectionTranslator._
 import org.apache.flink.table.plan.logical.{Minus, _}
+import org.apache.flink.table.plan.schema.TableSinkTable
 import org.apache.flink.table.sinks.TableSink
 
 import _root_.scala.annotation.varargs
@@ -762,13 +763,10 @@ class Table(
     * @tparam T The data type that the [[TableSink]] expects.
     */
   def writeToSink[T](sink: TableSink[T]): Unit = {
-
-    def queryConfig = this.tableEnv match {
-      case s: StreamTableEnvironment => s.queryConfig
-      case b: BatchTableEnvironment => new BatchQueryConfig
-      case _ => null
+    val queryConfig = Option(this.tableEnv) match {
+      case None => null
+      case _ => this.tableEnv.queryConfig
     }
-
     writeToSink(sink, queryConfig)
   }
 
@@ -800,6 +798,37 @@ class Table(
   }
 
   /**
+    * Writes the [[Table]] to a [[TableSink]] that was registered under the specified name.
+    *
+    * A batch [[Table]] can only be written to a
+    * [[org.apache.flink.table.sinks.BatchTableSink]], a streaming [[Table]] requires a
+    * [[org.apache.flink.table.sinks.AppendStreamTableSink]], a
+    * [[org.apache.flink.table.sinks.RetractStreamTableSink]], or an
+    * [[org.apache.flink.table.sinks.UpsertStreamTableSink]].
+    *
+    * @param tableName Name of the registered [[TableSink]] to which the [[Table]] is written.
+    */
+  def insertInto(tableName: String): Unit = {
+    tableEnv.insertInto(this, tableName, this.tableEnv.queryConfig)
+  }
+
+  /**
+    * Writes the [[Table]] to a [[TableSink]] that was registered under the specified name.
+    *
+    * A batch [[Table]] can only be written to a
+    * [[org.apache.flink.table.sinks.BatchTableSink]], a streaming [[Table]] requires a
+    * [[org.apache.flink.table.sinks.AppendStreamTableSink]], a
+    * [[org.apache.flink.table.sinks.RetractStreamTableSink]], or an
+    * [[org.apache.flink.table.sinks.UpsertStreamTableSink]].
+    *
+    * @param tableName Name of the [[TableSink]] to which the [[Table]] is written.
+    * @param conf The [[QueryConfig]] to use.
+    */
+  def insertInto(tableName: String, conf: QueryConfig): Unit = {
+    tableEnv.insertInto(this, tableName, conf)
+  }
+
+  /**
     * Groups the records of a table by assigning them to windows defined by a time or row interval.
     *
     * For streaming tables of infinite size, grouping into windows is required to define finite
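
The Table API counterpart of the SQL INSERT INTO path is symmetric. A minimal
sketch, assuming "orders" is a registered source table, "sinkTable" was
registered via registerTableSink(), and org.apache.flink.table.api.scala._ is
imported for the expression syntax:

    // select into a registered sink with the environment's default QueryConfig
    tEnv.scan("orders")
      .select('id, 'name)
      .insertInto("sinkTable")

    // or pass an explicit QueryConfig, e.g. for streaming state retention
    // tEnv.scan("orders").select('id, 'name).insertInto("sinkTable", tEnv.queryConfig)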

http://git-wip-us.apache.org/repos/asf/flink/blob/2cb37cb9/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkPlannerImpl.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkPlannerImpl.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkPlannerImpl.scala
index beb2436..4d9acaa 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkPlannerImpl.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkPlannerImpl.scala
@@ -57,7 +57,6 @@ class FlinkPlannerImpl(
   val defaultSchema: SchemaPlus = config.getDefaultSchema
 
   var validator: FlinkCalciteSqlValidator = _
-  var validatedSqlNode: SqlNode = _
   var root: RelRoot = _
 
   private def ready() {
@@ -85,16 +84,15 @@ class FlinkPlannerImpl(
     validator = new FlinkCalciteSqlValidator(operatorTable, createCatalogReader, typeFactory)
     validator.setIdentifierExpansion(true)
     try {
-      validatedSqlNode = validator.validate(sqlNode)
+      validator.validate(sqlNode)
     }
     catch {
       case e: RuntimeException =>
         throw new ValidationException(s"SQL validation failed. ${e.getMessage}", e)
     }
-    validatedSqlNode
   }
 
-  def rel(sql: SqlNode): RelRoot = {
+  def rel(validatedSqlNode: SqlNode): RelRoot = {
     try {
       assert(validatedSqlNode != null)
       val rexBuilder: RexBuilder = createRexBuilder
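
With the mutable validatedSqlNode field gone, the planner no longer carries
state between validate() and rel(); callers thread the validated node through
explicitly. The calling pattern, as used by sqlQuery() above:

    val parsed = planner.parse(query)         // SqlNode
    val validated = planner.validate(parsed)  // throws ValidationException on failure
    val root = planner.rel(validated)         // RelRoot for the validated node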

http://git-wip-us.apache.org/repos/asf/flink/blob/2cb37cb9/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/TableSinkTable.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/TableSinkTable.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/TableSinkTable.scala
new file mode 100644
index 0000000..f5e80d5
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/TableSinkTable.scala
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.schema
+
+import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeFactory}
+import org.apache.calcite.schema.Statistic
+import org.apache.calcite.schema.impl.AbstractTable
+import org.apache.flink.table.calcite.FlinkTypeFactory
+import org.apache.flink.table.plan.stats.FlinkStatistic
+import org.apache.flink.table.sinks.TableSink
+
+/** Table which defines an external table via a [[TableSink]] */
+class TableSinkTable[T](
+    val tableSink: TableSink[T],
+    val statistic: FlinkStatistic = FlinkStatistic.UNKNOWN)
+  extends AbstractTable {
+
+  override def getRowType(typeFactory: RelDataTypeFactory): RelDataType = {
+    val flinkTypeFactory = typeFactory.asInstanceOf[FlinkTypeFactory]
+    flinkTypeFactory.buildLogicalRowType(tableSink.getFieldNames, tableSink.getFieldTypes)
+  }
+
+  /**
+    * Returns the statistics of the table.
+    *
+    * @return The statistics of the table.
+    */
+  override def getStatistic: Statistic = statistic
+}
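
A registered sink becomes visible to the Calcite catalog through this wrapper:
insertInto() (see TableEnvironment above) pattern-matches the looked-up table
against TableSinkTable and validates the query schema against the sink's field
types. A small sketch (flinkTypeFactory is assumed to be in scope):

    // wrap a TableSink so the planner can resolve its schema
    val sinkTable = new TableSinkTable(tableSink)
    // the row type is derived from the sink's field names and field types
    val rowType = sinkTable.getRowType(flinkTypeFactory)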

http://git-wip-us.apache.org/repos/asf/flink/blob/2cb37cb9/flink-libraries/flink-table/src/test/java/org/apache/flink/table/runtime/batch/JavaTableSourceITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/table/runtime/batch/JavaTableSourceITCase.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/table/runtime/batch/JavaTableSourceITCase.java
index eb97afe..672b6fd 100644
--- a/flink-libraries/flink-table/src/test/java/org/apache/flink/table/runtime/batch/JavaTableSourceITCase.java
+++ b/flink-libraries/flink-table/src/test/java/org/apache/flink/table/runtime/batch/JavaTableSourceITCase.java
@@ -80,7 +80,7 @@ public class JavaTableSourceITCase extends TableProgramsCollectionTestBase {
 		tableEnv.registerTableSource("persons", csvTable);
 
 		Table result = tableEnv
-			.sql("SELECT `last`, FLOOR(id), score * 2 FROM persons WHERE score < 20");
+			.sqlQuery("SELECT `last`, FLOOR(id), score * 2 FROM persons WHERE score < 20");
 
 		DataSet<Row> resultSet = tableEnv.toDataSet(result, Row.class);
 		List<Row> results = resultSet.collect();

http://git-wip-us.apache.org/repos/asf/flink/blob/2cb37cb9/flink-libraries/flink-table/src/test/java/org/apache/flink/table/runtime/batch/sql/GroupingSetsITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/table/runtime/batch/sql/GroupingSetsITCase.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/table/runtime/batch/sql/GroupingSetsITCase.java
index 3c8a1cc..455e8ce 100644
--- a/flink-libraries/flink-table/src/test/java/org/apache/flink/table/runtime/batch/sql/GroupingSetsITCase.java
+++ b/flink-libraries/flink-table/src/test/java/org/apache/flink/table/runtime/batch/sql/GroupingSetsITCase.java
@@ -186,7 +186,7 @@ public class GroupingSetsITCase extends TableProgramsClusterTestBase {
 	 * @param expected Expected result.
 	 */
 	private void checkSql(String query, String expected) throws Exception {
-		Table resultTable = tableEnv.sql(query);
+		Table resultTable = tableEnv.sqlQuery(query);
 		DataSet<Row> resultDataSet = tableEnv.toDataSet(resultTable, Row.class);
 		List<Row> results = resultDataSet.collect();
 		TestBaseUtils.compareResultAsText(results, expected);
@@ -204,12 +204,12 @@ public class GroupingSetsITCase extends TableProgramsClusterTestBase {
 		};
 
 		// Execute first query and store results
-		Table resultTable1 = tableEnv.sql(query1);
+		Table resultTable1 = tableEnv.sqlQuery(query1);
 		DataSet<Row> resultDataSet1 = tableEnv.toDataSet(resultTable1, Row.class);
 		List<String> results1 = resultDataSet1.map(mapFunction).collect();
 
 		// Execute second query and store results
-		Table resultTable2 = tableEnv.sql(query2);
+		Table resultTable2 = tableEnv.sqlQuery(query2);
 		DataSet<Row> resultDataSet2 = tableEnv.toDataSet(resultTable2, Row.class);
 		List<String> results2 = resultDataSet2.map(mapFunction).collect();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/2cb37cb9/flink-libraries/flink-table/src/test/java/org/apache/flink/table/runtime/batch/sql/JavaSqlITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/table/runtime/batch/sql/JavaSqlITCase.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/table/runtime/batch/sql/JavaSqlITCase.java
index c5a394a..f9693fd 100644
--- a/flink-libraries/flink-table/src/test/java/org/apache/flink/table/runtime/batch/sql/JavaSqlITCase.java
+++ b/flink-libraries/flink-table/src/test/java/org/apache/flink/table/runtime/batch/sql/JavaSqlITCase.java
@@ -61,7 +61,7 @@ public class JavaSqlITCase extends TableProgramsCollectionTestBase {
 		String sqlQuery = "VALUES (1, 'Test', TRUE, DATE '1944-02-24', 12.4444444444444445)," +
 			"(2, 'Hello', TRUE, DATE '1944-02-24', 12.666666665)," +
 			"(3, 'World', FALSE, DATE '1944-12-24', 12.54444445)";
-		Table result = tableEnv.sql(sqlQuery);
+		Table result = tableEnv.sqlQuery(sqlQuery);
 
 		DataSet<Row> resultSet = tableEnv.toDataSet(result, Row.class);
 
@@ -83,7 +83,7 @@ public class JavaSqlITCase extends TableProgramsCollectionTestBase {
 		tableEnv.registerTable("T", in);
 
 		String sqlQuery = "SELECT a, c FROM T";
-		Table result = tableEnv.sql(sqlQuery);
+		Table result = tableEnv.sqlQuery(sqlQuery);
 
 		DataSet<Row> resultSet = tableEnv.toDataSet(result, Row.class);
 		List<Row> results = resultSet.collect();
@@ -106,7 +106,7 @@ public class JavaSqlITCase extends TableProgramsCollectionTestBase {
 		tableEnv.registerDataSet("DataSetTable", ds, "x, y, z");
 
 		String sqlQuery = "SELECT x FROM DataSetTable WHERE z LIKE '%Hello%'";
-		Table result = tableEnv.sql(sqlQuery);
+		Table result = tableEnv.sqlQuery(sqlQuery);
 
 		DataSet<Row> resultSet = tableEnv.toDataSet(result, Row.class);
 		List<Row> results = resultSet.collect();
@@ -123,7 +123,7 @@ public class JavaSqlITCase extends TableProgramsCollectionTestBase {
 		tableEnv.registerDataSet("AggTable", ds, "x, y, z");
 
 		String sqlQuery = "SELECT sum(x), min(x), max(x), count(y), avg(x) FROM AggTable";
-		Table result = tableEnv.sql(sqlQuery);
+		Table result = tableEnv.sqlQuery(sqlQuery);
 
 		DataSet<Row> resultSet = tableEnv.toDataSet(result, Row.class);
 		List<Row> results = resultSet.collect();
@@ -143,7 +143,7 @@ public class JavaSqlITCase extends TableProgramsCollectionTestBase {
 		tableEnv.registerDataSet("t2", ds2, "d, e, f, g, h");
 
 		String sqlQuery = "SELECT c, g FROM t1, t2 WHERE b = e";
-		Table result = tableEnv.sql(sqlQuery);
+		Table result = tableEnv.sqlQuery(sqlQuery);
 
 		DataSet<Row> resultSet = tableEnv.toDataSet(result, Row.class);
 		List<Row> results = resultSet.collect();
@@ -168,7 +168,7 @@ public class JavaSqlITCase extends TableProgramsCollectionTestBase {
 		tableEnv.registerDataSet("t1", ds1, "a, b");
 
 		String sqlQuery = "SELECT b['foo'] FROM t1";
-		Table result = tableEnv.sql(sqlQuery);
+		Table result = tableEnv.sqlQuery(sqlQuery);
 
 		DataSet<Row> resultSet = tableEnv.toDataSet(result, Row.class);
 		List<Row> results = resultSet.collect();

http://git-wip-us.apache.org/repos/asf/flink/blob/2cb37cb9/flink-libraries/flink-table/src/test/java/org/apache/flink/table/runtime/stream/sql/JavaSqlITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/table/runtime/stream/sql/JavaSqlITCase.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/table/runtime/stream/sql/JavaSqlITCase.java
index c6368d4..f3d0309 100644
--- a/flink-libraries/flink-table/src/test/java/org/apache/flink/table/runtime/stream/sql/JavaSqlITCase.java
+++ b/flink-libraries/flink-table/src/test/java/org/apache/flink/table/runtime/stream/sql/JavaSqlITCase.java
@@ -68,7 +68,7 @@ public class JavaSqlITCase extends StreamingMultipleProgramsTestBase {
 		tableEnv.registerTable("MyTableRow", in);
 
 		String sqlQuery = "SELECT a,c FROM MyTableRow";
-		Table result = tableEnv.sql(sqlQuery);
+		Table result = tableEnv.sqlQuery(sqlQuery);
 
 		DataStream<Row> resultSet = tableEnv.toAppendStream(result, Row.class);
 		resultSet.addSink(new StreamITCase.StringSink<Row>());
@@ -93,7 +93,7 @@ public class JavaSqlITCase extends StreamingMultipleProgramsTestBase {
 		tableEnv.registerTable("MyTable", in);
 
 		String sqlQuery = "SELECT * FROM MyTable";
-		Table result = tableEnv.sql(sqlQuery);
+		Table result = tableEnv.sqlQuery(sqlQuery);
 
 		DataStream<Row> resultSet = tableEnv.toAppendStream(result, Row.class);
 		resultSet.addSink(new StreamITCase.StringSink<Row>());
@@ -117,7 +117,7 @@ public class JavaSqlITCase extends StreamingMultipleProgramsTestBase {
 		tableEnv.registerDataStream("MyTable", ds, "a, b, c, d, e");
 
 		String sqlQuery = "SELECT a, b, e FROM MyTable WHERE c < 4";
-		Table result = tableEnv.sql(sqlQuery);
+		Table result = tableEnv.sqlQuery(sqlQuery);
 
 		DataStream<Row> resultSet = tableEnv.toAppendStream(result, Row.class);
 		resultSet.addSink(new StreamITCase.StringSink<Row>());
@@ -148,7 +148,7 @@ public class JavaSqlITCase extends StreamingMultipleProgramsTestBase {
 		String sqlQuery = "SELECT * FROM T1 " +
 							"UNION ALL " +
 							"(SELECT a, b, c FROM T2 WHERE a	< 3)";
-		Table result = tableEnv.sql(sqlQuery);
+		Table result = tableEnv.sqlQuery(sqlQuery);
 
 		DataStream<Row> resultSet = tableEnv.toAppendStream(result, Row.class);
 		resultSet.addSink(new StreamITCase.StringSink<Row>());

http://git-wip-us.apache.org/repos/asf/flink/blob/2cb37cb9/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/BatchTableEnvironmentTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/BatchTableEnvironmentTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/BatchTableEnvironmentTest.scala
index c9a7049..dde5569 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/BatchTableEnvironmentTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/BatchTableEnvironmentTest.scala
@@ -31,7 +31,7 @@ class BatchTableEnvironmentTest extends TableTestBase {
     val util = batchTestUtil()
     val table = util.addTable[(Long, Int, String)]("tableName", 'a, 'b, 'c)
 
-    val sqlTable = util.tableEnv.sql(s"SELECT a, b, c FROM $table WHERE b > 12")
+    val sqlTable = util.tableEnv.sqlQuery(s"SELECT a, b, c FROM $table WHERE b > 12")
 
     val expected = unaryNode(
       "DataSetCalc",
@@ -43,7 +43,7 @@ class BatchTableEnvironmentTest extends TableTestBase {
 
     val table2 = util.addTable[(Long, Int, String)]('d, 'e, 'f)
 
-    val sqlTable2 = util.tableEnv.sql(s"SELECT d, e, f FROM $table, $table2 WHERE c = d")
+    val sqlTable2 = util.tableEnv.sqlQuery(s"SELECT d, e, f FROM $table, $table2 WHERE c = d")
 
     val join = unaryNode(
       "DataSetJoin",

http://git-wip-us.apache.org/repos/asf/flink/blob/2cb37cb9/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/validation/CalcValidationTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/validation/CalcValidationTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/validation/CalcValidationTest.scala
index 9aada9a..69b12b2 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/validation/CalcValidationTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/validation/CalcValidationTest.scala
@@ -34,6 +34,6 @@ class CalcValidationTest extends TableTestBase {
 
     val sqlQuery = "SELECT a, foo FROM MyTable"
 
-    util.tableEnv.sql(sqlQuery)
+    util.tableEnv.sqlQuery(sqlQuery)
   }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/2cb37cb9/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/validation/InsertIntoValidationTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/validation/InsertIntoValidationTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/validation/InsertIntoValidationTest.scala
new file mode 100644
index 0000000..ef9e6a3
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/validation/InsertIntoValidationTest.scala
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.batch.sql.validation
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.scala._
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.api.{Types, ValidationException}
+import org.apache.flink.table.utils.{MemoryTableSinkUtil, TableTestBase}
+import org.junit._
+
+class InsertIntoValidationTest extends TableTestBase {
+
+  @Test(expected = classOf[ValidationException])
+  def testInconsistentLengthInsert(): Unit = {
+    val util = batchTestUtil()
+    util.addTable[(Int, Long, String)]("sourceTable", 'a, 'b, 'c)
+
+    val fieldNames = Array("d", "e")
+    val fieldTypes: Array[TypeInformation[_]] = Array(Types.INT, Types.LONG)
+    val sink = new MemoryTableSinkUtil.UnsafeMemoryAppendTableSink
+    util.tableEnv.registerTableSink("targetTable", fieldNames, fieldTypes, sink)
+
+    val sql = "INSERT INTO targetTable SELECT a, b, c FROM sourceTable"
+
+    // must fail because table sink schema has too few fields
+    util.tableEnv.sqlUpdate(sql)
+  }
+
+  @Test(expected = classOf[ValidationException])
+  def testUnmatchedTypesInsert(): Unit = {
+    val util = batchTestUtil()
+    util.addTable[(Int, Long, String)]("sourceTable", 'a, 'b, 'c)
+
+    val fieldNames = Array("d", "e", "f")
+    val fieldTypes: Array[TypeInformation[_]] = Array(Types.STRING, Types.INT, Types.LONG)
+    val sink = new MemoryTableSinkUtil.UnsafeMemoryAppendTableSink
+    util.tableEnv.registerTableSink("targetTable", fieldNames, fieldTypes, sink)
+
+    val sql = "INSERT INTO targetTable SELECT a, b, c FROM sourceTable"
+
+    // must fail because types of table sink do not match query result
+    util.tableEnv.sqlUpdate(sql)
+  }
+
+  @Test(expected = classOf[ValidationException])
+  def testUnsupportedPartialInsert(): Unit = {
+    val util = batchTestUtil()
+    util.addTable[(Int, Long, String)]("sourceTable", 'a, 'b, 'c)
+
+    val fieldNames = Array("d", "e", "f")
+    val fieldTypes = util.tableEnv.scan("sourceTable").getSchema.getTypes
+    val sink = new MemoryTableSinkUtil.UnsafeMemoryAppendTableSink
+    util.tableEnv.registerTableSink("targetTable", fieldNames, fieldTypes, sink)
+
+    val sql = "INSERT INTO targetTable (d, f) SELECT a, c FROM sourceTable"
+
+    // must fail because partial insert is not supported yet.
+    util.tableEnv.sqlUpdate(sql, util.tableEnv.queryConfig)
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/2cb37cb9/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/validation/JoinValidationTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/validation/JoinValidationTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/validation/JoinValidationTest.scala
index 90bcfec..d9e0e10 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/validation/JoinValidationTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/validation/JoinValidationTest.scala
@@ -35,7 +35,7 @@ class JoinValidationTest extends TableTestBase {
 
     val sqlQuery = "SELECT c, g FROM Table3, Table5 WHERE foo = e"
 
-    util.tableEnv.sql(sqlQuery)
+    util.tableEnv.sqlQuery(sqlQuery)
   }
 
   @Test(expected = classOf[TableException])
@@ -46,7 +46,7 @@ class JoinValidationTest extends TableTestBase {
 
     val sqlQuery = "SELECT c, g FROM Table3, Table5 WHERE a = g"
 
-    util.tableEnv.sql(sqlQuery).toDataSet[Row]
+    util.tableEnv.sqlQuery(sqlQuery).toDataSet[Row]
   }
 
   @Test(expected = classOf[ValidationException])
@@ -57,7 +57,7 @@ class JoinValidationTest extends TableTestBase {
 
     val sqlQuery = "SELECT c, g FROM Table3, Table5 WHERE a = d"
 
-    util.tableEnv.sql(sqlQuery).toDataSet[Row]
+    util.tableEnv.sqlQuery(sqlQuery).toDataSet[Row]
   }
 
   @Test(expected = classOf[TableException])
@@ -68,7 +68,7 @@ class JoinValidationTest extends TableTestBase {
 
     val sqlQuery = "SELECT c, g FROM Table3, Table5 WHERE d = f"
 
-    util.tableEnv.sql(sqlQuery).toDataSet[Row]
+    util.tableEnv.sqlQuery(sqlQuery).toDataSet[Row]
   }
 
   @Test(expected = classOf[TableException])
@@ -79,7 +79,7 @@ class JoinValidationTest extends TableTestBase {
 
     val sqlQuery = "SELECT a, a1 FROM Table3 CROSS JOIN Table4"
 
-    util.tableEnv.sql(sqlQuery).toDataSet[Row]
+    util.tableEnv.sqlQuery(sqlQuery).toDataSet[Row]
   }
 
   @Test(expected = classOf[TableException])
@@ -90,7 +90,7 @@ class JoinValidationTest extends TableTestBase {
 
     val sqlQuery = "SELECT c, g FROM Table3 RIGHT OUTER JOIN Table5 ON b = e and a > d"
 
-    util.tableEnv.sql(sqlQuery).toDataSet[Row]
+    util.tableEnv.sqlQuery(sqlQuery).toDataSet[Row]
   }
 
   @Test(expected = classOf[TableException])
@@ -101,7 +101,7 @@ class JoinValidationTest extends TableTestBase {
 
     val sqlQuery = "SELECT c, g FROM Table3 LEFT OUTER JOIN Table5 ON b = e and a > d"
 
-    util.tableEnv.sql(sqlQuery).toDataSet[Row]
+    util.tableEnv.sqlQuery(sqlQuery).toDataSet[Row]
   }
 
   @Test(expected = classOf[TableException])
@@ -112,7 +112,7 @@ class JoinValidationTest extends TableTestBase {
 
     val sqlQuery = "SELECT c, g FROM Table3 FULL OUTER JOIN Table5 ON b = e and a > d"
 
-    util.tableEnv.sql(sqlQuery).toDataSet[Row]
+    util.tableEnv.sqlQuery(sqlQuery).toDataSet[Row]
   }
 
   @Test(expected = classOf[TableException])
@@ -123,7 +123,7 @@ class JoinValidationTest extends TableTestBase {
 
     val sqlQuery = "SELECT c, g FROM Table3 RIGHT OUTER JOIN Table5 ON b = e and e > 3"
 
-    util.tableEnv.sql(sqlQuery).toDataSet[Row]
+    util.tableEnv.sqlQuery(sqlQuery).toDataSet[Row]
   }
 
   @Test(expected = classOf[TableException])
@@ -134,7 +134,7 @@ class JoinValidationTest extends TableTestBase {
 
     val sqlQuery = "SELECT c, g FROM Table3 LEFT OUTER JOIN Table5 ON b = e and b > 3"
 
-    util.tableEnv.sql(sqlQuery).toDataSet[Row]
+    util.tableEnv.sqlQuery(sqlQuery).toDataSet[Row]
   }
 
   @Test(expected = classOf[TableException])
@@ -145,6 +145,6 @@ class JoinValidationTest extends TableTestBase {
 
     val sqlQuery = "SELECT c, g FROM Table3 FULL OUTER JOIN Table5 ON b = e and b > 3"
 
-    util.tableEnv.sql(sqlQuery).toDataSet[Row]
+    util.tableEnv.sqlQuery(sqlQuery).toDataSet[Row]
   }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/2cb37cb9/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/validation/OverWindowValidationTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/validation/OverWindowValidationTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/validation/OverWindowValidationTest.scala
index 7e72a21..dfbdd5a 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/validation/OverWindowValidationTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/validation/OverWindowValidationTest.scala
@@ -41,7 +41,7 @@ class OverWindowValidationTest extends TableTestBase {
     util.addFunction("overAgg", new OverAgg0)
 
     val sqlQuery = "SELECT overAgg(b, a) FROM T"
-    util.tableEnv.sql(sqlQuery)
+    util.tableEnv.sqlQuery(sqlQuery)
   }
 
   /**
@@ -55,6 +55,6 @@ class OverWindowValidationTest extends TableTestBase {
 
     val sqlQuery = "SELECT overAgg(b, a) FROM T GROUP BY TUMBLE(ts, INTERVAL '2' HOUR)"
 
-    util.tableEnv.sql(sqlQuery)
+    util.tableEnv.sqlQuery(sqlQuery)
   }
 }


http://git-wip-us.apache.org/repos/asf/flink/blob/2cb37cb9/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/validation/SortValidationTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/validation/SortValidationTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/validation/SortValidationTest.scala
index d3f9b9f..cfc8067 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/validation/SortValidationTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/validation/SortValidationTest.scala
@@ -34,6 +34,6 @@ class SortValidationTest extends TableTestBase {
 
     val sqlQuery = "SELECT * FROM MyTable LIMIT 5"
 
-    util.tableEnv.sql(sqlQuery).toDataSet[Row]
+    util.tableEnv.sqlQuery(sqlQuery).toDataSet[Row]
   }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/2cb37cb9/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/table/validation/InsertIntoValidationTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/table/validation/InsertIntoValidationTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/table/validation/InsertIntoValidationTest.scala
new file mode 100644
index 0000000..2cfe931
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/table/validation/InsertIntoValidationTest.scala
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.batch.table.validation
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.scala._
+import org.apache.flink.table.api.{Types, ValidationException}
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.utils.{MemoryTableSinkUtil, TableTestBase}
+import org.junit._
+
+class InsertIntoValidationTest extends TableTestBase {
+
+  @Test(expected = classOf[ValidationException])
+  def testInconsistentLengthInsert(): Unit = {
+    val util = batchTestUtil()
+    util.addTable[(Int, Long, String)]("sourceTable", 'a, 'b, 'c)
+
+    val fieldNames = Array("d", "e")
+    val fieldTypes: Array[TypeInformation[_]] = Array(Types.INT, Types.LONG)
+    val sink = new MemoryTableSinkUtil.UnsafeMemoryAppendTableSink
+    util.tableEnv.registerTableSink("targetTable", fieldNames, fieldTypes, sink)
+
+    // must fail because TableSink accepts fewer fields.
+    util.tableEnv.scan("sourceTable")
+      .select('a, 'b, 'c)
+      .insertInto("targetTable")
+  }
+
+  @Test(expected = classOf[ValidationException])
+  def testUnmatchedTypesInsert(): Unit = {
+    val util = batchTestUtil()
+    util.addTable[(Int, Long, String)]("sourceTable", 'a, 'b, 'c)
+
+    val fieldNames = Array("d", "e", "f")
+    val fieldTypes: Array[TypeInformation[_]] = Array(Types.STRING, Types.INT, Types.LONG)
+    val sink = new MemoryTableSinkUtil.UnsafeMemoryAppendTableSink
+    util.tableEnv.registerTableSink("targetTable", fieldNames, fieldTypes, sink)
+
+    // must fail because types of result and TableSink do not match.
+    util.tableEnv.scan("sourceTable")
+      .select('a, 'b, 'c)
+      .insertInto("targetTable")
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/2cb37cb9/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/StreamTableEnvironmentTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/StreamTableEnvironmentTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/StreamTableEnvironmentTest.scala
index 0943ea6..1b99679 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/StreamTableEnvironmentTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/StreamTableEnvironmentTest.scala
@@ -41,7 +41,7 @@ class StreamTableEnvironmentTest extends TableTestBase {
     val util = streamTestUtil()
     val table = util.addTable[(Long, Int, String)]("tableName", 'a, 'b, 'c)
 
-    val sqlTable = util.tableEnv.sql(s"SELECT a, b, c FROM $table WHERE b > 12")
+    val sqlTable = util.tableEnv.sqlQuery(s"SELECT a, b, c FROM $table WHERE b > 12")
 
     val expected = unaryNode(
       "DataStreamCalc",
@@ -53,7 +53,7 @@ class StreamTableEnvironmentTest extends TableTestBase {
 
     val table2 = util.addTable[(Long, Int, String)]('d, 'e, 'f)
 
-    val sqlTable2 = util.tableEnv.sql(s"SELECT d, e, f FROM $table2 " +
+    val sqlTable2 = util.tableEnv.sqlQuery(s"SELECT d, e, f FROM $table2 " +
         s"UNION ALL SELECT a, b, c FROM $table")
 
     val expected2 = binaryNode(

http://git-wip-us.apache.org/repos/asf/flink/blob/2cb37cb9/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/JoinTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/JoinTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/JoinTest.scala
index 640fd26..e066fe4 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/JoinTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/JoinTest.scala
@@ -206,7 +206,7 @@ class JoinTest extends TableTestBase {
     val query =
       "SELECT t1.a, t2.b FROM MyTable as t1 join MyTable2 as t2 on t1.a = t2.a and " + timeSql
 
-    val resultTable = streamUtil.tableEnv.sql(query)
+    val resultTable = streamUtil.tableEnv.sqlQuery(query)
     val relNode = resultTable.getRelNode
     val joinNode = relNode.getInput(0).asInstanceOf[LogicalJoin]
     val rexNode = joinNode.getCondition
@@ -230,7 +230,7 @@ class JoinTest extends TableTestBase {
       query: String,
       expectCondStr: String): Unit = {
 
-    val resultTable = streamUtil.tableEnv.sql(query)
+    val resultTable = streamUtil.tableEnv.sqlQuery(query)
     val relNode = resultTable.getRelNode
     val joinNode = relNode.getInput(0).asInstanceOf[LogicalJoin]
     val joinInfo = joinNode.analyzeCondition

http://git-wip-us.apache.org/repos/asf/flink/blob/2cb37cb9/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/validation/InsertIntoValidationTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/validation/InsertIntoValidationTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/validation/InsertIntoValidationTest.scala
new file mode 100644
index 0000000..3045100
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/validation/InsertIntoValidationTest.scala
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.stream.sql.validation
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.api.{TableEnvironment, Types, ValidationException}
+import org.apache.flink.table.runtime.utils.StreamTestData
+import org.apache.flink.table.utils.MemoryTableSinkUtil
+import org.junit.Test
+
+class InsertIntoValidationTest {
+
+  @Test(expected = classOf[ValidationException])
+  def testInconsistentLengthInsert(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+
+    val t = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv).as('a, 'b, 'c)
+    tEnv.registerTable("sourceTable", t)
+
+    val fieldNames = Array("d", "e")
+    val fieldTypes: Array[TypeInformation[_]] = Array(Types.INT, Types.LONG)
+    val sink = new MemoryTableSinkUtil.UnsafeMemoryAppendTableSink
+    tEnv.registerTableSink("targetTable", fieldNames, fieldTypes, sink)
+
+    val sql = "INSERT INTO targetTable SELECT a, b, c FROM sourceTable"
+
+    // must fail because table sink has too few fields.
+    tEnv.sqlUpdate(sql)
+  }
+
+  @Test(expected = classOf[ValidationException])
+  def testUnmatchedTypesInsert(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+
+    val t = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv).as('a, 'b, 'c)
+    tEnv.registerTable("sourceTable", t)
+
+    val fieldNames = Array("d", "e", "f")
+    val fieldTypes: Array[TypeInformation[_]] = Array(Types.STRING, Types.INT, Types.LONG)
+    val sink = new MemoryTableSinkUtil.UnsafeMemoryAppendTableSink
+    tEnv.registerTableSink("targetTable", fieldNames, fieldTypes, sink)
+
+    val sql = "INSERT INTO targetTable SELECT a, b, c FROM sourceTable"
+
+    // must fail because field types of table sink are incompatible.
+    tEnv.sqlUpdate(sql)
+  }
+
+  @Test(expected = classOf[ValidationException])
+  def testUnsupportedPartialInsert(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+
+    val t = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv).as('a, 'b, 'c)
+    tEnv.registerTable("sourceTable", t)
+
+    val fieldNames = Array("d", "e", "f")
+    val fieldTypes = tEnv.scan("sourceTable").getSchema.getTypes
+    val sink = new MemoryTableSinkUtil.UnsafeMemoryAppendTableSink
+    tEnv.registerTableSink("targetTable", fieldNames, fieldTypes, sink)
+
+    val sql = "INSERT INTO targetTable (d, f) SELECT a, c FROM sourceTable"
+
+    // must fail because we don't support partial insert yet.
+    tEnv.sqlUpdate(sql)
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/2cb37cb9/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/validation/OverWindowValidationTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/validation/OverWindowValidationTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/validation/OverWindowValidationTest.scala
index 413cca7..d04b6d0 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/validation/OverWindowValidationTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/validation/OverWindowValidationTest.scala
@@ -43,7 +43,7 @@ class OverWindowValidationTest extends TableTestBase {
       "sum(a) OVER (PARTITION BY b ORDER BY proctime RANGE UNBOUNDED preceding) " +
       "from T1"
 
-    streamUtil.tableEnv.sql(sqlQuery).toAppendStream[Row]
+    streamUtil.tableEnv.sqlQuery(sqlQuery).toAppendStream[Row]
   }
 
   /**
@@ -55,7 +55,7 @@ class OverWindowValidationTest extends TableTestBase {
 
     val sqlQuery = "SELECT overAgg(c, a) FROM MyTable"
 
-    streamUtil.tableEnv.sql(sqlQuery)
+    streamUtil.tableEnv.sqlQuery(sqlQuery)
   }
 
   /**
@@ -66,6 +66,6 @@ class OverWindowValidationTest extends TableTestBase {
     streamUtil.addFunction("overAgg", new OverAgg0)
 
     val sqlQuery = "SELECT overAgg(c, a) FROM MyTable"
-    streamUtil.tableEnv.sql(sqlQuery)
+    streamUtil.tableEnv.sqlQuery(sqlQuery)
   }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/2cb37cb9/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/validation/CorrelateValidationTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/validation/CorrelateValidationTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/validation/CorrelateValidationTest.scala
index dbc7d46..f58feed 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/validation/CorrelateValidationTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/validation/CorrelateValidationTest.scala
@@ -145,7 +145,7 @@ class CorrelateValidationTest extends TableTestBase {
       ), "Undefined function: NONEXIST")
     // SQL API call
     expectExceptionThrown(
-      util.tableEnv.sql("SELECT * FROM MyTable, LATERAL TABLE(nonexist(a))"),
+      util.tableEnv.sqlQuery("SELECT * FROM MyTable, LATERAL TABLE(nonexist(a))"),
       "No match found for function signature nonexist(<NUMERIC>)")
 
 
@@ -160,7 +160,7 @@ class CorrelateValidationTest extends TableTestBase {
     // SQL API call
     // NOTE: it doesn't throw an exception but an AssertionError, maybe a Calcite bug
     expectExceptionThrown(
-      util.tableEnv.sql("SELECT * FROM MyTable, LATERAL TABLE(func0(a))"),
+      util.tableEnv.sqlQuery("SELECT * FROM MyTable, LATERAL TABLE(func0(a))"),
       null,
       classOf[AssertionError])
 
@@ -172,7 +172,7 @@ class CorrelateValidationTest extends TableTestBase {
       "Given parameters of function 'FUNC2' do not match any signature")
     // SQL API call
     expectExceptionThrown(
-      util.tableEnv.sql("SELECT * FROM MyTable, LATERAL TABLE(func2(c, c))"),
+      util.tableEnv.sqlQuery("SELECT * FROM MyTable, LATERAL TABLE(func2(c, c))"),
       "No match found for function signature func2(<CHARACTER>, <CHARACTER>)")
   }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/2cb37cb9/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/validation/InsertIntoValidationTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/validation/InsertIntoValidationTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/validation/InsertIntoValidationTest.scala
new file mode 100644
index 0000000..2fcfd6c
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/validation/InsertIntoValidationTest.scala
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.stream.table.validation
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.api.{TableEnvironment, Types, ValidationException}
+import org.apache.flink.table.runtime.utils.StreamTestData
+import org.apache.flink.table.utils.MemoryTableSinkUtil
+import org.junit.Test
+
+class InsertIntoValidationTest {
+
+  @Test(expected = classOf[ValidationException])
+  def testInconsistentLengthInsert(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+
+    val t = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv).as('a, 'b, 'c)
+    tEnv.registerTable("sourceTable", t)
+
+    val fieldNames = Array("d", "f")
+    val fieldTypes: Array[TypeInformation[_]] = Array(Types.INT, Types.LONG)
+    val sink = new MemoryTableSinkUtil.UnsafeMemoryAppendTableSink
+    tEnv.registerTableSink("targetTable", fieldNames, fieldTypes, sink)
+
+    // must fail because table sink has too few fields.
+    tEnv.scan("sourceTable")
+      .select('a, 'b, 'c)
+      .insertInto("targetTable")
+  }
+
+  @Test(expected = classOf[ValidationException])
+  def testUnmatchedTypesInsert(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+
+    val t = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv).as('a, 'b, 'c)
+    tEnv.registerTable("sourceTable", t)
+
+    val fieldNames = Array("d", "e", "f")
+    val fieldTypes: Array[TypeInformation[_]] = Array(Types.STRING, Types.INT, Types.LONG)
+    val sink = new MemoryTableSinkUtil.UnsafeMemoryAppendTableSink
+    tEnv.registerTableSink("targetTable", fieldNames, fieldTypes, sink)
+
+    // must fail because field types of table sink are incompatible.
+    tEnv.scan("sourceTable")
+      .select('a, 'b, 'c)
+      .insertInto("targetTable")
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/2cb37cb9/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/validation/TableSinksValidationTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/validation/TableSinksValidationTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/validation/TableSinksValidationTest.scala
index ab87cd3..628925b 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/validation/TableSinksValidationTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/validation/TableSinksValidationTest.scala
@@ -18,10 +18,12 @@
 
 package org.apache.flink.table.api.validation
 
+import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.scala._
-import org.apache.flink.table.api.TableException
+import org.apache.flink.table.api.{TableException, Types}
 import org.apache.flink.table.api.scala._
 import org.apache.flink.table.runtime.stream.table.TestAppendSink
+import org.apache.flink.table.utils.MemoryTableSinkUtil.UnsafeMemoryAppendTableSink
 import org.apache.flink.table.utils.TableTestBase
 import org.junit.Test
 
@@ -39,4 +41,27 @@ class TableSinksValidationTest extends TableTestBase {
     .writeToSink(new TestAppendSink)
   }
 
+  @Test(expected = classOf[TableException])
+  def testSinkTableRegistrationUsingExistedTableName(): Unit = {
+    val util = streamTestUtil()
+    util.addTable[(Int, String)]("TargetTable", 'id, 'text)
+
+    val fieldNames = Array("a", "b", "c")
+    val fieldTypes: Array[TypeInformation[_]] = Array(Types.STRING, Types.INT, Types.LONG)
+    // table name already registered
+    util.tableEnv
+      .registerTableSink("TargetTable", fieldNames, fieldTypes, new UnsafeMemoryAppendTableSink)
+  }
+
+  @Test(expected = classOf[TableException])
+  def testRegistrationWithInconsistentFieldNamesAndTypesLength(): Unit = {
+    val util = streamTestUtil()
+
+    // inconsistent length of field names and types
+    val fieldNames = Array("a", "b", "c")
+    val fieldTypes: Array[TypeInformation[_]] = Array(Types.STRING, Types.LONG)
+
+    util.tableEnv
+      .registerTableSink("TargetTable", fieldNames, fieldTypes, new UnsafeMemoryAppendTableSink)
+  }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/2cb37cb9/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/utils/ExpressionTestBase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/utils/ExpressionTestBase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/utils/ExpressionTestBase.scala
index 44842f7..dd6e00e 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/utils/ExpressionTestBase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/utils/ExpressionTestBase.scala
@@ -23,7 +23,6 @@ import java.util.concurrent.Future
 
 import com.google.common.collect.ImmutableList
 import org.apache.calcite.plan.hep.{HepMatchOrder, HepPlanner, HepProgramBuilder}
-import org.apache.calcite.rel.logical.LogicalTableScan
 import org.apache.calcite.rex.RexNode
 import org.apache.calcite.sql.`type`.SqlTypeName._
 import org.apache.calcite.sql2rel.RelDecorrelator

http://git-wip-us.apache.org/repos/asf/flink/blob/2cb37cb9/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/ExpressionReductionRulesTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/ExpressionReductionRulesTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/ExpressionReductionRulesTest.scala
index a15f1d1..b4ad9ca 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/ExpressionReductionRulesTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/ExpressionReductionRulesTest.scala
@@ -430,7 +430,7 @@ class ExpressionReductionRulesTest extends TableTestBase {
 
     util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
 
-    val newTable = util.tableEnv.sql("SELECT 1 + 1 + a AS a FROM MyTable")
+    val newTable = util.tableEnv.sqlQuery("SELECT 1 + 1 + a AS a FROM MyTable")
 
     util.tableEnv.registerTable("NewTable", newTable)
 
@@ -448,7 +448,7 @@ class ExpressionReductionRulesTest extends TableTestBase {
 
     util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
 
-    val newTable = util.tableEnv.sql("SELECT 1 + 1 + a AS a FROM MyTable")
+    val newTable = util.tableEnv.sqlQuery("SELECT 1 + 1 + a AS a FROM MyTable")
 
     util.tableEnv.registerTable("NewTable", newTable)
 

http://git-wip-us.apache.org/repos/asf/flink/blob/2cb37cb9/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/RetractionRulesTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/RetractionRulesTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/RetractionRulesTest.scala
index 535bbf5..999a808 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/RetractionRulesTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/RetractionRulesTest.scala
@@ -286,7 +286,7 @@ class RetractionRulesTest extends TableTestBase {
 class StreamTableTestForRetractionUtil extends StreamTableTestUtil {
 
   def verifySqlTrait(query: String, expected: String): Unit = {
-    verifyTableTrait(tableEnv.sql(query), expected)
+    verifyTableTrait(tableEnv.sqlQuery(query), expected)
   }
 
   def verifyTableTrait(resultTable: Table, expected: String): Unit = {

http://git-wip-us.apache.org/repos/asf/flink/blob/2cb37cb9/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/TimeIndicatorConversionTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/TimeIndicatorConversionTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/TimeIndicatorConversionTest.scala
index ab80c65..cfff326 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/TimeIndicatorConversionTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/TimeIndicatorConversionTest.scala
@@ -275,7 +275,7 @@ class TimeIndicatorConversionTest extends TableTestBase {
     val util = streamTestUtil()
     util.addTable[(Long, Int)]("MyTable" , 'long, 'int, 'proctime.proctime)
 
-    val result = util.tableEnv.sql("SELECT COUNT(long) FROM MyTable GROUP BY proctime")
+    val result = util.tableEnv.sqlQuery("SELECT COUNT(long) FROM MyTable GROUP BY proctime")
 
     val expected = unaryNode(
       "DataStreamCalc",
@@ -300,7 +300,7 @@ class TimeIndicatorConversionTest extends TableTestBase {
     val util = streamTestUtil()
     util.addTable[(Long, Int)]("MyTable" , 'long, 'int, 'proctime.proctime)
 
-    val result = util.tableEnv.sql("SELECT MIN(proctime) FROM MyTable GROUP BY long")
+    val result = util.tableEnv.sqlQuery("SELECT MIN(proctime) FROM MyTable GROUP BY long")
 
     val expected = unaryNode(
       "DataStreamCalc",
@@ -325,7 +325,7 @@ class TimeIndicatorConversionTest extends TableTestBase {
     val util = streamTestUtil()
     util.addTable[(Long, Long, Int)]("MyTable", 'rowtime.rowtime, 'long, 'int)
 
-    val result = util.tableEnv.sql(
+    val result = util.tableEnv.sqlQuery(
       "SELECT TUMBLE_END(rowtime, INTERVAL '0.1' SECOND) AS `rowtime`, `long`, " +
         "SUM(`int`) FROM MyTable " +
         "GROUP BY `long`, TUMBLE(rowtime, INTERVAL '0.1' SECOND)")
@@ -355,7 +355,7 @@ class TimeIndicatorConversionTest extends TableTestBase {
     val util = streamTestUtil()
     util.addTable[(Long, Long, Int)]("MyTable", 'rowtime.rowtime, 'long, 'int)
 
-    val result = util.tableEnv.sql("SELECT MIN(rowtime), long FROM MyTable " +
+    val result = util.tableEnv.sqlQuery("SELECT MIN(rowtime), long FROM MyTable " +
       "GROUP BY long, TUMBLE(rowtime, INTERVAL '0.1' SECOND)")
 
     val expected = unaryNode(

http://git-wip-us.apache.org/repos/asf/flink/blob/2cb37cb9/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/sql/AggregateITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/sql/AggregateITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/sql/AggregateITCase.scala
index 39b8371..465a88c 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/sql/AggregateITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/sql/AggregateITCase.scala
@@ -52,7 +52,7 @@ class AggregateITCase(
     val ds = CollectionDataSets.get3TupleDataSet(env)
     tEnv.registerDataSet("MyTable", ds)
 
-    val result = tEnv.sql(sqlQuery)
+    val result = tEnv.sqlQuery(sqlQuery)
 
     val expected = "231,1,21,21,11"
     val results = result.toDataSet[Row].collect()
@@ -70,7 +70,7 @@ class AggregateITCase(
     val ds = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv)
     tEnv.registerTable("MyTable", ds)
 
-    val result = tEnv.sql(sqlQuery)
+    val result = tEnv.sqlQuery(sqlQuery)
 
     val expected = "231"
     val results = result.toDataSet[Row].collect()
@@ -88,7 +88,7 @@ class AggregateITCase(
     val ds = CollectionDataSets.get3TupleDataSet(env)
     tEnv.registerDataSet("MyTable", ds)
 
-    val result = tEnv.sql(sqlQuery)
+    val result = tEnv.sqlQuery(sqlQuery)
 
     val expected = "231"
     val results = result.toDataSet[Row].collect()
@@ -109,7 +109,7 @@ class AggregateITCase(
       (2: Byte, 2: Short, 2, 2L, 2.0f, 2.0d, "Ciao")).toTable(tEnv, 'a, 'b, 'c, 'd, 'e, 'f, 'g)
     tEnv.registerTable("MyTable", ds)
 
-    val result = tEnv.sql(sqlQuery)
+    val result = tEnv.sqlQuery(sqlQuery)
 
     val expected = "1,1,1,1,1.5,1.5,2,Ciao,Ciao,Hello,Ciao,3.0"
     val results = result.toDataSet[Row].collect()
@@ -128,7 +128,7 @@ class AggregateITCase(
     val ds = env.fromElements((1: Byte, 1: Short), (2: Byte, 2: Short)).toTable(tEnv, 'a, 'b)
     tEnv.registerTable("MyTable", ds)
 
-    val result = tEnv.sql(sqlQuery)
+    val result = tEnv.sqlQuery(sqlQuery)
 
     val expected = "1,3,2,1,3"
     val results = result.toDataSet[Row].collect()
@@ -147,7 +147,7 @@ class AggregateITCase(
     val ds = env.fromElements((1f, "Hello"), (2f, "Ciao")).toTable(tEnv, 'a, 'b)
     tEnv.registerTable("MyTable", ds)
 
-    val result = tEnv.sql(sqlQuery)
+    val result = tEnv.sqlQuery(sqlQuery)
 
     val expected = "5.5,7"
     val results = result.toDataSet[Row].collect()
@@ -165,7 +165,7 @@ class AggregateITCase(
     val ds = env.fromElements((1f, "Hello"), (2f, "Ciao")).toTable(tEnv)
     tEnv.registerTable("MyTable", ds)
 
-    val result = tEnv.sql(sqlQuery)
+    val result = tEnv.sqlQuery(sqlQuery)
 
     val expected = "2,2"
     val results = result.toDataSet[Row].collect()
@@ -187,7 +187,7 @@ class AggregateITCase(
       (2: Byte, 2: Short, 2, 2L, 2.0f, 2.0d, "Ciao")).toTable(tEnv)
     tEnv.registerTable("MyTable", ds)
 
-    val result = tEnv.sql(sqlQuery)
+    val result = tEnv.sqlQuery(sqlQuery)
 
     val expected = "1,3,2"
     val results = result.toDataSet[Row].collect()
@@ -205,7 +205,7 @@ class AggregateITCase(
     val ds = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv)
     tEnv.registerTable("MyTable", ds)
 
-    val result = tEnv.sql(sqlQuery)
+    val result = tEnv.sqlQuery(sqlQuery)
 
     val expected = "231,21"
     val results = result.toDataSet[Row].collect()
@@ -223,7 +223,7 @@ class AggregateITCase(
     val ds = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv)
     tEnv.registerTable("MyTable", ds)
 
-    val result = tEnv.sql(sqlQuery)
+    val result = tEnv.sqlQuery(sqlQuery)
 
     val expected =
       "6,18,6\n5,13,5\n4,8,4\n3,5,3\n2,2,2\n1,1,1"
@@ -243,7 +243,7 @@ class AggregateITCase(
     val ds = CollectionDataSets.get3TupleDataSet(env)
     tEnv.registerDataSet("MyTable", ds)
 
-    val result = tEnv.sql(sqlQuery).toDataSet[Row].collect()
+    val result = tEnv.sqlQuery(sqlQuery).toDataSet[Row].collect()
 
     val expected =
       "6,null,18,1\n5,null,13,1\n4,null,8,1\n3,null,5,1\n2,null,2,1\n1,null,1,1\n" +
@@ -280,9 +280,9 @@ class AggregateITCase(
 
     tEnv.registerTable("MyTable", ds)
 
-    val result = tEnv.sql(sqlQuery)
-    val result2 = tEnv.sql(sqlQuery2)
-    val result3 = tEnv.sql(sqlQuery3)
+    val result = tEnv.sqlQuery(sqlQuery)
+    val result2 = tEnv.sqlQuery(sqlQuery2)
+    val result3 = tEnv.sqlQuery(sqlQuery3)
 
     val results = result.toDataSet[Row].collect()
     val expected = Seq.empty
@@ -315,7 +315,7 @@ class AggregateITCase(
       .map(x => (x._1, x._2, x._3, new Timestamp(x._1 * 1000)))
     tEnv.registerDataSet("T", ds, 'a, 'b, 'c, 'ts)
 
-    val result = tEnv.sql(sqlQuery).toDataSet[Row].collect()
+    val result = tEnv.sqlQuery(sqlQuery).toDataSet[Row].collect()
     val expected = Seq(
       "1,1,1,1,1",
       "2,2,1,2,2", "2,3,1,2,3",
@@ -348,7 +348,7 @@ class AggregateITCase(
       .map(x => (x._1, x._2, x._3, new Timestamp(x._1 * 1000)))
     tEnv.registerDataSet("T", ds, 'a, 'b, 'c, 'ts)
 
-    val result = tEnv.sql(sqlQuery).toDataSet[Row].collect()
+    val result = tEnv.sqlQuery(sqlQuery).toDataSet[Row].collect()
     val expected = Seq(
       "1,1,1,1,1","1,1,1,1,1",
       "2,5,2,2,2","2,5,2,2,2",
@@ -383,7 +383,7 @@ class AggregateITCase(
       .map(x => (x._1, x._2, x._3, new Timestamp(x._1 * 1000)))
     tEnv.registerDataSet("T", ds, 'a, 'b, 'c, 'ts)
 
-    val result = tEnv.sql(sqlQuery).toDataSet[Row].collect()
+    val result = tEnv.sqlQuery(sqlQuery).toDataSet[Row].collect()
     val expected = Seq(
       "2,10,39,6,3,7",
       "16,21,111,6,6,18"

http://git-wip-us.apache.org/repos/asf/flink/blob/2cb37cb9/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/sql/CalcITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/sql/CalcITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/sql/CalcITCase.scala
index 711182c..b891a7d 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/sql/CalcITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/sql/CalcITCase.scala
@@ -53,7 +53,7 @@ class CalcITCase(
     val ds = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv).as('a, 'b, 'c)
     tEnv.registerTable("MyTable", ds)
 
-    val result = tEnv.sql(sqlQuery)
+    val result = tEnv.sqlQuery(sqlQuery)
 
     val expected = "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello world\n" +
       "4,3,Hello world, how are you?\n" + "5,3,I am fine.\n" + "6,3,Luke Skywalker\n" +
@@ -77,7 +77,7 @@ class CalcITCase(
     val ds = CollectionDataSets.get3TupleDataSet(env)
     tEnv.registerDataSet("MyTable", ds, 'a, 'b, 'c)
 
-    val result = tEnv.sql(sqlQuery)
+    val result = tEnv.sqlQuery(sqlQuery)
 
     val expected = "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello world\n" +
       "4,3,Hello world, how are you?\n" + "5,3,I am fine.\n" + "6,3,Luke Skywalker\n" +
@@ -101,7 +101,7 @@ class CalcITCase(
     val ds = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv).as('a, 'b, 'c)
     tEnv.registerTable("MyTable", ds)
 
-    val result = tEnv.sql(sqlQuery)
+    val result = tEnv.sqlQuery(sqlQuery)
 
     val expected = "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello world\n" +
       "4,3,Hello world, how are you?\n" + "5,3,I am fine.\n" + "6,3,Luke Skywalker\n" +
@@ -125,7 +125,7 @@ class CalcITCase(
     val ds = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv)
     tEnv.registerTable("MyTable", ds)
 
-    val result = tEnv.sql(sqlQuery)
+    val result = tEnv.sqlQuery(sqlQuery)
 
     val expected = "1,1\n" + "2,2\n" + "3,2\n" + "4,3\n" + "5,3\n" + "6,3\n" + "7,4\n" +
       "8,4\n" + "9,4\n" + "10,4\n" + "11,5\n" + "12,5\n" + "13,5\n" + "14,5\n" + "15,5\n" +
@@ -146,7 +146,7 @@ class CalcITCase(
     val ds = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv).as('a, 'b, 'c)
     tEnv.registerTable("MyTable", ds)
 
-    tEnv.sql(sqlQuery)
+    tEnv.sqlQuery(sqlQuery)
   }
 
   @Test
@@ -160,7 +160,7 @@ class CalcITCase(
     val ds = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv)
     tEnv.registerTable("MyTable", ds)
 
-    val result = tEnv.sql(sqlQuery)
+    val result = tEnv.sqlQuery(sqlQuery)
 
     val expected = "\n"
     val results = result.toDataSet[Row].collect()
@@ -178,7 +178,7 @@ class CalcITCase(
     val ds = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv)
     tEnv.registerTable("MyTable", ds)
 
-    val result = tEnv.sql(sqlQuery)
+    val result = tEnv.sqlQuery(sqlQuery)
 
     val expected = "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello world\n" + "4,3,Hello world, " +
       "how are you?\n" + "5,3,I am fine.\n" + "6,3,Luke Skywalker\n" + "7,4," +
@@ -201,7 +201,7 @@ class CalcITCase(
     val ds = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv).as('a, 'b, 'c)
     tEnv.registerTable("MyTable", ds)
 
-    val result = tEnv.sql(sqlQuery)
+    val result = tEnv.sqlQuery(sqlQuery)
 
     val expected = "3,2,Hello world\n" + "4,3,Hello world, how are you?\n"
     val results = result.toDataSet[Row].collect()
@@ -219,7 +219,7 @@ class CalcITCase(
     val ds = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv).as('a, 'b, 'c)
     tEnv.registerTable("MyTable", ds)
 
-    val result = tEnv.sql(sqlQuery)
+    val result = tEnv.sqlQuery(sqlQuery)
 
     val expected = "2,2,Hello\n" + "4,3,Hello world, how are you?\n" +
       "6,3,Luke Skywalker\n" + "8,4," + "Comment#2\n" + "10,4,Comment#4\n" +
@@ -240,7 +240,7 @@ class CalcITCase(
     val ds = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv).as('a, 'b, 'c)
     tEnv.registerTable("MyTable", ds)
 
-    val result = tEnv.sql(sqlQuery)
+    val result = tEnv.sqlQuery(sqlQuery)
 
     val expected = "1,1,Hi\n" + "21,6,Comment#15\n"
     val results = result.toDataSet[Row].collect()
@@ -258,7 +258,7 @@ class CalcITCase(
     val ds = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv).as('a, 'b, 'c)
     tEnv.registerTable("MyTable", ds)
 
-    val result = tEnv.sql(sqlQuery)
+    val result = tEnv.sqlQuery(sqlQuery)
 
     val expected = "3,2,Hello world\n" + "7,4,Comment#1\n" +
       "9,4,Comment#3\n" + "17,6,Comment#11\n" +
@@ -281,7 +281,7 @@ class CalcITCase(
       Timestamp.valueOf("1984-07-12 14:34:24")))
     tEnv.registerDataSet("MyTable", ds, 'a, 'b, 'c)
 
-    val result = tEnv.sql(sqlQuery)
+    val result = tEnv.sqlQuery(sqlQuery)
 
     val expected = "1984-07-12,14:34:24,1984-07-12 14:34:24.0," +
       "1984-07-12,14:34:24,1984-07-12 14:34:24.0"
@@ -300,7 +300,7 @@ class CalcITCase(
     val ds = env.fromElements("a", "b", "c")
     tEnv.registerDataSet("MyTable", ds, 'text)
 
-    val result = tEnv.sql("SELECT hashCode(text) FROM MyTable")
+    val result = tEnv.sqlQuery("SELECT hashCode(text) FROM MyTable")
 
     val expected = "97\n98\n99"
     val results = result.toDataSet[Row].collect()

http://git-wip-us.apache.org/repos/asf/flink/blob/2cb37cb9/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/sql/JoinITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/sql/JoinITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/sql/JoinITCase.scala
index 681b4b5..6a17cb4 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/sql/JoinITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/sql/JoinITCase.scala
@@ -49,7 +49,7 @@ class JoinITCase(
     tEnv.registerTable("Table3", ds1)
     tEnv.registerTable("Table5", ds2)
 
-    val result = tEnv.sql(sqlQuery)
+    val result = tEnv.sqlQuery(sqlQuery)
 
     val expected = "Hi,Hallo\n" + "Hello,Hallo Welt\n" + "Hello world,Hallo Welt\n"
     val results = result.toDataSet[Row].collect()
@@ -69,7 +69,7 @@ class JoinITCase(
     tEnv.registerTable("Table3", ds1)
     tEnv.registerTable("Table5", ds2)
 
-    val result = tEnv.sql(sqlQuery)
+    val result = tEnv.sqlQuery(sqlQuery)
 
     val expected = "Hi,Hallo\n"
     val results = result.toDataSet[Row].collect()
@@ -89,7 +89,7 @@ class JoinITCase(
     tEnv.registerTable("Table3", ds1)
     tEnv.registerTable("Table5", ds2)
 
-    val result = tEnv.sql(sqlQuery)
+    val result = tEnv.sqlQuery(sqlQuery)
 
     val expected = "Hello world, how are you?,Hallo Welt wie\n" + "I am fine.,Hallo Welt wie\n"
     val results = result.toDataSet[Row].collect()
@@ -109,7 +109,7 @@ class JoinITCase(
     tEnv.registerTable("Table3", ds1)
     tEnv.registerTable("Table5", ds2)
 
-    val result = tEnv.sql(sqlQuery)
+    val result = tEnv.sqlQuery(sqlQuery)
 
     val expected = "Hi,Hallo\n" + "Hello,Hallo Welt\n" + "Hello world,Hallo Welt wie gehts?\n" +
       "Hello world,ABC\n" + "I am fine.,HIJ\n" + "I am fine.,IJK\n"
@@ -130,7 +130,7 @@ class JoinITCase(
     tEnv.registerTable("Table3", ds1)
     tEnv.registerTable("Table5", ds2)
 
-    val result = tEnv.sql(sqlQuery)
+    val result = tEnv.sqlQuery(sqlQuery)
     val expected = "1,Hi\n" + "2,Hello\n" + "1,Hello\n" +
       "2,Hello world\n" + "2,Hello world\n" + "3,Hello world\n"
     val results = result.toDataSet[Row].collect()
@@ -150,7 +150,7 @@ class JoinITCase(
     tEnv.registerDataSet("Table3", ds1, 'a, 'b, 'c)
     tEnv.registerDataSet("Table5", ds2, 'd, 'e, 'f, 'g, 'h)
 
-    val result = tEnv.sql(sqlQuery)
+    val result = tEnv.sqlQuery(sqlQuery)
 
     val expected = "6,6"
     val results = result.toDataSet[Row].collect()
@@ -170,7 +170,7 @@ class JoinITCase(
     tEnv.registerTable("Table3", ds1)
     tEnv.registerTable("Table5", ds2)
 
-    val result = tEnv.sql(sqlQuery)
+    val result = tEnv.sqlQuery(sqlQuery)
 
     val expected = "6,6"
     val results = result.toDataSet[Row].collect()
@@ -196,7 +196,7 @@ class JoinITCase(
       "null,CDE\n" + "null,DEF\n" + "null,EFG\n" + "null,FGH\n" + "null,GHI\n" + "null,HIJ\n" +
       "null,IJK\n" + "null,JKL\n" + "null,KLM"
 
-    val results = tEnv.sql(sqlQuery).toDataSet[Row].collect()
+    val results = tEnv.sqlQuery(sqlQuery).toDataSet[Row].collect()
     TestBaseUtils.compareResultAsText(results.asJava, expected)
   }
 
@@ -218,7 +218,7 @@ class JoinITCase(
       "null,Hallo Welt wie\n" + "null,Hallo Welt wie gehts?\n" + "null,ABC\n" + "null,BCD\n" +
       "null,CDE\n" + "null,DEF\n" + "null,EFG\n" + "null,FGH\n" + "null,GHI\n" + "null,HIJ\n" +
       "null,IJK\n" + "null,JKL\n" + "null,KLM"
-    val results = tEnv.sql(sqlQuery).toDataSet[Row].collect()
+    val results = tEnv.sqlQuery(sqlQuery).toDataSet[Row].collect()
     TestBaseUtils.compareResultAsText(results.asJava, expected)
   }
 
@@ -240,7 +240,7 @@ class JoinITCase(
       "null,Hallo Welt wie\n" + "null,Hallo Welt wie gehts?\n" + "null,ABC\n" + "null,BCD\n" +
       "null,CDE\n" + "null,DEF\n" + "null,EFG\n" + "null,FGH\n" + "null,GHI\n" + "null,HIJ\n" +
       "null,IJK\n" + "null,JKL\n" + "null,KLM"
-    val results = tEnv.sql(sqlQuery).toDataSet[Row].collect()
+    val results = tEnv.sqlQuery(sqlQuery).toDataSet[Row].collect()
     TestBaseUtils.compareResultAsText(results.asJava, expected)
   }
 
@@ -257,7 +257,7 @@ class JoinITCase(
       "3,1,1,Hi\n" +
       "3,2,2,Hello\n" +
       "3,3,2,Hello world"
-    val result = tEnv.sql(sqlQuery2).collect()
+    val result = tEnv.sqlQuery(sqlQuery2).collect()
     TestBaseUtils.compareResultAsText(result.asJava, expected)
   }
 
@@ -274,7 +274,7 @@ class JoinITCase(
       "1,1,Hi,3\n" +
       "2,2,Hello,3\n" +
       "3,2,Hello world,3"
-    val result = tEnv.sql(sqlQuery1).collect()
+    val result = tEnv.sqlQuery(sqlQuery1).collect()
     TestBaseUtils.compareResultAsText(result.asJava, expected)
   }
 
@@ -287,7 +287,7 @@ class JoinITCase(
     tEnv.registerTable("A", table)
 
     val sqlQuery1 = "SELECT * FROM A CROSS JOIN (SELECT count(*) FROM A HAVING count(*) < 0)"
-    val result = tEnv.sql(sqlQuery1).count()
+    val result = tEnv.sqlQuery(sqlQuery1).count()
     Assert.assertEquals(0, result)
   }
 
@@ -305,7 +305,7 @@ class JoinITCase(
     tEnv.registerTable("B", ds2)
 
 
-    val result = tEnv.sql(sqlQuery)
+    val result = tEnv.sqlQuery(sqlQuery)
     val expected = Seq(
           "1,null",
           "2,null", "2,null",
@@ -331,7 +331,7 @@ class JoinITCase(
     tEnv.registerTable("A", ds1)
     tEnv.registerTable("B", ds2)
 
-    val result = tEnv.sql(sqlQuery)
+    val result = tEnv.sqlQuery(sqlQuery)
     val expected = Seq(
       "1,null", "2,null", "2,null", "3,3", "3,3",
       "3,3", "4,null", "4,null", "4,null",
@@ -355,7 +355,7 @@ class JoinITCase(
     tEnv.registerTable("A", ds1)
     tEnv.registerTable("B", ds2)
 
-    val result = tEnv.sql(sqlQuery)
+    val result = tEnv.sqlQuery(sqlQuery)
     val expected = Seq(
       "1,3", "2,3", "2,3", "3,null", "3,null",
       "3,null", "4,null", "4,null", "4,null",
@@ -380,7 +380,7 @@ class JoinITCase(
     tEnv.registerTable("A", ds2)
     tEnv.registerTable("B", ds1)
 
-    val result = tEnv.sql(sqlQuery)
+    val result = tEnv.sqlQuery(sqlQuery)
 
     val expected = Seq(
       "2,null", "3,null", "1,null").mkString("\n")
@@ -402,7 +402,7 @@ class JoinITCase(
     tEnv.registerTable("A", ds1)
     tEnv.registerTable("B", ds2)
 
-    val result = tEnv.sql(sqlQuery)
+    val result = tEnv.sqlQuery(sqlQuery)
 
     val expected = Seq(
       "1,null", "2,null", "2,null", "3,3", "3,3",
@@ -427,7 +427,7 @@ class JoinITCase(
     tEnv.registerTable("A", ds1)
     tEnv.registerTable("B", ds2)
 
-    val result = tEnv.sql(sqlQuery)
+    val result = tEnv.sqlQuery(sqlQuery)
 
     val expected = Seq(
       "1,null", "2,null", "2,null", "3,null", "3,null",
@@ -453,7 +453,7 @@ class JoinITCase(
     tEnv.registerTable("t1", ds1)
     tEnv.registerTable("t2", ds2)
 
-    val result = tEnv.sql(sqlQuery)
+    val result = tEnv.sqlQuery(sqlQuery)
     val expected = Seq(
       "1,null,null",
       "2,null,null", "2,null,null",
@@ -481,7 +481,7 @@ class JoinITCase(
 
     val sqlQuery = "SELECT a, s FROM T, UNNEST(T.c) as A (s)"
 
-    val result = tEnv.sql(sqlQuery)
+    val result = tEnv.sqlQuery(sqlQuery)
 
     val expected = List("1,Hi", "1,w", "2,Hello", "2,k", "3,Hello world", "3,x")
     val results = result.toDataSet[Row].collect().toList
@@ -508,7 +508,7 @@ class JoinITCase(
       "  UNNEST(tf.b) as A (x, y) " +
       "WHERE x > a"
 
-    val result = tEnv.sql(sqlQuery)
+    val result = tEnv.sqlQuery(sqlQuery)
 
     val expected = List(
       "1,[(12,45.6), (2,45.612)],12,45.6",

http://git-wip-us.apache.org/repos/asf/flink/blob/2cb37cb9/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/sql/SetOperatorsITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/sql/SetOperatorsITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/sql/SetOperatorsITCase.scala
index b0e6fe8..d965e0c 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/sql/SetOperatorsITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/sql/SetOperatorsITCase.scala
@@ -52,7 +52,7 @@ class SetOperatorsITCase(
     tEnv.registerDataSet("t1", ds1, 'a, 'b, 'c)
     tEnv.registerDataSet("t2", ds2, 'd, 'e, 'f)
 
-    val result = tEnv.sql(sqlQuery)
+    val result = tEnv.sqlQuery(sqlQuery)
 
     val expected = "Hi\n" + "Hello\n" + "Hello world\n" + "Hi\n" + "Hello\n" + "Hello world\n"
     val results = result.toDataSet[Row].collect()
@@ -72,7 +72,7 @@ class SetOperatorsITCase(
     tEnv.registerDataSet("t1", ds1, 'a, 'b, 'c)
     tEnv.registerDataSet("t2", ds2, 'd, 'e, 'f)
 
-    val result = tEnv.sql(sqlQuery)
+    val result = tEnv.sqlQuery(sqlQuery)
 
     val expected = "Hi\n" + "Hello\n" + "Hello world\n"
     val results = result.toDataSet[Row].collect()
@@ -94,7 +94,7 @@ class SetOperatorsITCase(
     tEnv.registerDataSet("t1", ds1, 'a, 'b, 'c)
     tEnv.registerDataSet("t2", ds2, 'a, 'b, 'd, 'c, 'e)
 
-    val result = tEnv.sql(sqlQuery)
+    val result = tEnv.sqlQuery(sqlQuery)
 
     val expected = "Hi\n" + "Hallo\n"
     val results = result.toDataSet[Row].collect()
@@ -115,7 +115,7 @@ class SetOperatorsITCase(
     tEnv.registerDataSet("t1", ds1, 'a, 'b, 'c)
     tEnv.registerDataSet("t2", ds2, 'a, 'b, 'd, 'c, 'e)
 
-    val result = tEnv.sql(sqlQuery)
+    val result = tEnv.sqlQuery(sqlQuery)
 
     val expected = "18"
     val results = result.toDataSet[Row].collect()
@@ -135,7 +135,7 @@ class SetOperatorsITCase(
     tEnv.registerDataSet("t1", ds1, 'a, 'b, 'c)
     tEnv.registerDataSet("t2", ds2, 'a, 'b, 'c)
 
-    val result = tEnv.sql(sqlQuery)
+    val result = tEnv.sqlQuery(sqlQuery)
 
     val expected = "Hello\n" + "Hello world\n"
     val results = result.toDataSet[Row].collect()
@@ -161,7 +161,7 @@ class SetOperatorsITCase(
     tEnv.registerDataSet("t1", ds1, 'c)
     tEnv.registerDataSet("t2", ds2, 'c)
 
-    val result = tEnv.sql(sqlQuery)
+    val result = tEnv.sqlQuery(sqlQuery)
 
     val expected = "1\n1"
     val results = result.toDataSet[Row].collect()
@@ -183,7 +183,7 @@ class SetOperatorsITCase(
     tEnv.registerDataSet("t1", ds1, 'a, 'b, 'c)
     tEnv.registerDataSet("t2", ds2, 'a, 'b, 'd, 'c, 'e)
 
-    val result = tEnv.sql(sqlQuery)
+    val result = tEnv.sqlQuery(sqlQuery)
 
     val expected = "Hi\n"
     val results = result.toDataSet[Row].collect()
@@ -208,7 +208,7 @@ class SetOperatorsITCase(
     tEnv.registerDataSet("t1", ds1, 'a, 'b, 'c)
     tEnv.registerDataSet("t2", ds2, 'a, 'b, 'c)
 
-    val result = tEnv.sql(sqlQuery)
+    val result = tEnv.sqlQuery(sqlQuery)
 
     val expected = "Hi\n" + "Hello\n"
     val results = result.toDataSet[Row].collect()
@@ -234,7 +234,7 @@ class SetOperatorsITCase(
     tEnv.registerDataSet("t1", ds1, 'c)
     tEnv.registerDataSet("t2", ds2, 'c)
 
-    val result = tEnv.sql(sqlQuery)
+    val result = tEnv.sqlQuery(sqlQuery)
 
     val expected = "1\n2\n2"
     val results = result.toDataSet[Row].collect()
@@ -254,7 +254,7 @@ class SetOperatorsITCase(
     tEnv.registerDataSet("t1", ds1, 'a, 'b, 'c)
     tEnv.registerDataSet("t2", ds2, 'a, 'b, 'c)
 
-    val result = tEnv.sql(sqlQuery)
+    val result = tEnv.sqlQuery(sqlQuery)
 
     val expected = "Hello\n" + "Hello world\n"
     val results = result.toDataSet[Row].collect()
@@ -271,7 +271,7 @@ class SetOperatorsITCase(
     tEnv.registerTable("Table3", ds1)
     tEnv.registerTable("Table5", ds2)
 
-    val result = tEnv.sql("SELECT d FROM Table5 WHERE d IN (SELECT a FROM Table3)")
+    val result = tEnv.sqlQuery("SELECT d FROM Table5 WHERE d IN (SELECT a FROM Table3)")
 
     val expected = Seq("1", "2", "2", "3", "3", "3").mkString("\n")
     val results = result.toDataSet[Row].collect()
@@ -288,7 +288,7 @@ class SetOperatorsITCase(
     tEnv.registerTable("Table3", ds1)
     tEnv.registerTable("Table5", ds2)
 
-    val result = tEnv.sql("SELECT d IN (SELECT a FROM Table3) FROM Table5")
+    val result = tEnv.sqlQuery("SELECT d IN (SELECT a FROM Table3) FROM Table5")
 
     val expected = Seq("false", "false", "false", "false", "false", "false", "false",
       "false", "false", "true", "true", "true", "true", "true", "true").mkString("\n")

http://git-wip-us.apache.org/repos/asf/flink/blob/2cb37cb9/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/sql/SortITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/sql/SortITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/sql/SortITCase.scala
index 4672ec3..66943fc 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/sql/SortITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/sql/SortITCase.scala
@@ -62,7 +62,7 @@ class SortITCase(mode: TestExecutionMode, configMode: TableConfigMode)
 
     val expected = sortExpectedly(tupleDataSetStrings)
     // squash all rows inside a partition into one element
-    val results = tEnv.sql(sqlQuery).toDataSet[Row].mapPartition(rows => {
+    val results = tEnv.sqlQuery(sqlQuery).toDataSet[Row].mapPartition(rows => {
       // the rows need to be copied in object reuse mode
       val copied = new mutable.ArrayBuffer[Row]
       rows.foreach(r => copied += Row.copy(r))
@@ -99,7 +99,7 @@ class SortITCase(mode: TestExecutionMode, configMode: TableConfigMode)
 
     val expected = sortExpectedly(tupleDataSetStrings, 2, 21)
     // squash all rows inside a partition into one element
-    val results = tEnv.sql(sqlQuery).toDataSet[Row].mapPartition(rows => {
+    val results = tEnv.sqlQuery(sqlQuery).toDataSet[Row].mapPartition(rows => {
       // the rows need to be copied in object reuse mode
       val copied = new mutable.ArrayBuffer[Row]
       rows.foreach(r => copied += Row.copy(r))
@@ -130,7 +130,7 @@ class SortITCase(mode: TestExecutionMode, configMode: TableConfigMode)
 
     val expected = sortExpectedly(tupleDataSetStrings, 2, 7)
     // squash all rows inside a partition into one element
-    val results = tEnv.sql(sqlQuery).toDataSet[Row].mapPartition(rows => {
+    val results = tEnv.sqlQuery(sqlQuery).toDataSet[Row].mapPartition(rows => {
       // the rows need to be copied in object reuse mode
       val copied = new mutable.ArrayBuffer[Row]
       rows.foreach(r => copied += Row.copy(r))
@@ -161,7 +161,7 @@ class SortITCase(mode: TestExecutionMode, configMode: TableConfigMode)
 
     val expected = sortExpectedly(tupleDataSetStrings, 0, 5)
     // squash all rows inside a partition into one element
-    val results = tEnv.sql(sqlQuery).toDataSet[Row].mapPartition(rows => {
+    val results = tEnv.sqlQuery(sqlQuery).toDataSet[Row].mapPartition(rows => {
       // the rows need to be copied in object reuse mode
       val copied = new mutable.ArrayBuffer[Row]
       rows.foreach(r => copied += Row.copy(r))

http://git-wip-us.apache.org/repos/asf/flink/blob/2cb37cb9/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/sql/TableEnvironmentITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/sql/TableEnvironmentITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/sql/TableEnvironmentITCase.scala
index b7f1bb1..be8278f 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/sql/TableEnvironmentITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/sql/TableEnvironmentITCase.scala
@@ -24,8 +24,10 @@ import org.apache.flink.table.api.TableEnvironment
 import org.apache.flink.table.api.scala._
 import org.apache.flink.table.runtime.utils.TableProgramsCollectionTestBase
 import org.apache.flink.table.runtime.utils.TableProgramsTestBase.TableConfigMode
+import org.apache.flink.table.utils.MemoryTableSinkUtil
 import org.apache.flink.test.util.TestBaseUtils
 import org.apache.flink.types.Row
+import org.junit.Assert.assertEquals
 import org.junit._
 import org.junit.runner.RunWith
 import org.junit.runners.Parameterized
@@ -48,7 +50,7 @@ class TableEnvironmentITCase(
 
     val sqlQuery = "SELECT * FROM MyTable WHERE a > 9"
 
-    val result = tEnv.sql(sqlQuery).select('a.avg, 'b.sum, 'c.count)
+    val result = tEnv.sqlQuery(sqlQuery).select('a.avg, 'b.sum, 'c.count)
 
     val expected = "15,65,12"
     val results = result.toDataSet[Row].collect()
@@ -68,7 +70,7 @@ class TableEnvironmentITCase(
 
     val sqlQuery = "SELECT avg(a) as a1, sum(b) as b1, count(c) as c1 FROM MyTable"
 
-    val result = tEnv.sql(sqlQuery).select('a1 + 1, 'b1 - 5, 'c1)
+    val result = tEnv.sqlQuery(sqlQuery).select('a1 + 1, 'b1 - 5, 'c1)
 
     val expected = "16,60,12"
     val results = result.toDataSet[Row].collect()
@@ -85,11 +87,11 @@ class TableEnvironmentITCase(
     tEnv.registerTable("MyTable", t)
 
     val sqlQuery = "SELECT a as aa FROM MyTable WHERE b = 6"
-    val result1 = tEnv.sql(sqlQuery)
+    val result1 = tEnv.sqlQuery(sqlQuery)
     tEnv.registerTable("ResTable", result1)
 
     val sqlQuery2 = "SELECT count(aa) FROM ResTable"
-    val result2 = tEnv.sql(sqlQuery2)
+    val result2 = tEnv.sqlQuery(sqlQuery2)
 
     val expected = "6"
     val results = result2.toDataSet[Row].collect()
@@ -106,11 +108,34 @@ class TableEnvironmentITCase(
     val ds = env.fromElements(((12, true), "Hello")).toTable(tEnv).as('a1, 'a2)
     tEnv.registerTable("MyTable", ds)
 
-    val result = tEnv.sql(sqlQuery)
+    val result = tEnv.sqlQuery(sqlQuery)
 
     val expected = "Hello,true\n"
 
     val results = result.toDataSet[Row].collect()
     TestBaseUtils.compareResultAsText(results.asJava, expected)
   }
+
+  @Test
+  def testInsertIntoMemoryTable(): Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    MemoryTableSinkUtil.clear
+
+    val t = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv).as('a, 'b, 'c)
+    tEnv.registerTable("sourceTable", t)
+
+    val fieldNames = Array("d", "e", "f")
+    val fieldTypes = tEnv.scan("sourceTable").getSchema.getTypes
+    val sink = new MemoryTableSinkUtil.UnsafeMemoryAppendTableSink
+    tEnv.registerTableSink("targetTable", fieldNames, fieldTypes, sink)
+
+    val sql = "INSERT INTO targetTable SELECT a, b, c FROM sourceTable"
+    tEnv.sqlUpdate(sql)
+    env.execute()
+
+    val expected = List("1,1,Hi", "2,2,Hello", "3,2,Hello world")
+    assertEquals(expected.sorted, MemoryTableSinkUtil.results.sorted)
+  }
+
 }
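
The renamed sqlQuery and the new sqlUpdate separate queries from update
statements. A minimal sketch against the tables registered in the test above,
assuming the usual org.apache.flink.table.api imports: sqlQuery returns a
Table for further transformation, while sqlUpdate executes a DML statement
such as INSERT INTO and returns no result.

    // a query: produces a Table that can be transformed or registered
    val result: Table = tEnv.sqlQuery("SELECT a, b, c FROM sourceTable")
    // an update statement: emits into the registered sink, returns nothing
    tEnv.sqlUpdate("INSERT INTO targetTable SELECT a, b, c FROM sourceTable")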

http://git-wip-us.apache.org/repos/asf/flink/blob/2cb37cb9/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/sql/TableSourceITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/sql/TableSourceITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/sql/TableSourceITCase.scala
index 504ab90..187d096 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/sql/TableSourceITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/sql/TableSourceITCase.scala
@@ -44,7 +44,7 @@ class TableSourceITCase(
     val tEnv = TableEnvironment.getTableEnvironment(env, config)
 
     tEnv.registerTableSource("csvTable", csvTable)
-    val results = tEnv.sql(
+    val results = tEnv.sqlQuery(
       "SELECT id, `first`, `last`, score FROM csvTable").collect()
 
     val expected = Seq(
@@ -67,7 +67,7 @@ class TableSourceITCase(
 
     tableEnv.registerTableSource("NestedPersons", nestedTable)
 
-    val result = tableEnv.sql("SELECT NestedPersons.firstName, NestedPersons.lastName," +
+    val result = tableEnv.sqlQuery("SELECT NestedPersons.firstName, NestedPersons.lastName," +
         "NestedPersons.address.street, NestedPersons.address.city AS city " +
         "FROM NestedPersons " +
         "WHERE NestedPersons.address.city LIKE 'Dublin'").collect()

http://git-wip-us.apache.org/repos/asf/flink/blob/2cb37cb9/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/table/CalcITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/table/CalcITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/table/CalcITCase.scala
index 116f690..e947c3f 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/table/CalcITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/table/CalcITCase.scala
@@ -446,7 +446,7 @@ class CalcITCase(
 
     val sqlQuery = "SELECT c FROM t1 where RichFunc2(c)='ABC#Hello'"
 
-    val result = tEnv.sql(sqlQuery)
+    val result = tEnv.sqlQuery(sqlQuery)
 
     val expected = "Hello"
     val results = result.toDataSet[Row].collect()
@@ -467,7 +467,7 @@ class CalcITCase(
 
     val sqlQuery = "SELECT c FROM t1 where RichFunc3(c)=true"
 
-    val result = tEnv.sql(sqlQuery)
+    val result = tEnv.sqlQuery(sqlQuery)
 
     val expected = "Hello"
     val results = result.toDataSet[Row].collect()
@@ -488,7 +488,7 @@ class CalcITCase(
     val sqlQuery = "SELECT c FROM t1 where " +
       "RichFunc2(c)='Abc#Hello' or RichFunc1(a)=3 and b=2"
 
-    val result = tEnv.sql(sqlQuery)
+    val result = tEnv.sqlQuery(sqlQuery)
 
     val expected = "Hello\nHello world"
     val results = result.toDataSet[Row].collect()

http://git-wip-us.apache.org/repos/asf/flink/blob/2cb37cb9/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/table/TableEnvironmentITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/table/TableEnvironmentITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/table/TableEnvironmentITCase.scala
index 2e23161..725c580 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/table/TableEnvironmentITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/table/TableEnvironmentITCase.scala
@@ -26,8 +26,10 @@ import org.apache.flink.table.api.TableEnvironment
 import org.apache.flink.table.api.scala._
 import org.apache.flink.table.runtime.utils.TableProgramsTestBase.TableConfigMode
 import org.apache.flink.table.runtime.utils.{TableProgramsCollectionTestBase, TableProgramsTestBase}
+import org.apache.flink.table.utils.MemoryTableSinkUtil
 import org.apache.flink.test.util.TestBaseUtils
 import org.apache.flink.types.Row
+import org.junit.Assert.assertEquals
 import org.junit._
 import org.junit.runner.RunWith
 import org.junit.runners.Parameterized
@@ -162,6 +164,28 @@ class TableEnvironmentITCase(
     TestBaseUtils.compareResultAsText(results.asJava, expected)
   }
 
+  @Test
+  def testInsertIntoMemoryTable(): Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    MemoryTableSinkUtil.clear
+
+    val t = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv).as('a, 'b, 'c)
+    tEnv.registerTable("sourceTable", t)
+
+    val fieldNames = Array("d", "e", "f")
+    val fieldTypes = tEnv.scan("sourceTable").getSchema.getTypes
+    val sink = new MemoryTableSinkUtil.UnsafeMemoryAppendTableSink
+    tEnv.registerTableSink("targetTable", fieldNames, fieldTypes, sink)
+
+    tEnv.scan("sourceTable")
+      .select('a, 'b, 'c)
+      .insertInto("targetTable")
+    env.execute()
+
+    val expected = List("1,1,Hi", "2,2,Hello", "3,2,Hello world")
+    assertEquals(expected.sorted, MemoryTableSinkUtil.results.sorted)
+  }
 }
 
 object TableEnvironmentITCase {
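
The Table API insertInto above is equivalent to the SQL INSERT INTO used in
the batch SQL variant of this test; a sketch assuming the same registered
"sourceTable" and "targetTable":

    // same pipeline expressed as a SQL update statement
    tEnv.sqlUpdate("INSERT INTO targetTable SELECT a, b, c FROM sourceTable")
    env.execute()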

http://git-wip-us.apache.org/repos/asf/flink/blob/2cb37cb9/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/TimeAttributesITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/TimeAttributesITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/TimeAttributesITCase.scala
index 24d8695..ec65cf7 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/TimeAttributesITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/TimeAttributesITCase.scala
@@ -229,7 +229,7 @@ class TimeAttributesITCase extends StreamingMultipleProgramsTestBase {
     val table = stream.toTable(tEnv, 'rowtime.rowtime, 'int, 'double, 'float, 'bigdec, 'string)
     tEnv.registerTable("MyTable", table)
 
-    val t = tEnv.sql("SELECT COUNT(`rowtime`) FROM MyTable " +
+    val t = tEnv.sqlQuery("SELECT COUNT(`rowtime`) FROM MyTable " +
       "GROUP BY TUMBLE(rowtime, INTERVAL '0.003' SECOND)")
 
     val results = t.toAppendStream[Row]
@@ -292,7 +292,7 @@ class TimeAttributesITCase extends StreamingMultipleProgramsTestBase {
     tEnv.registerTable("T1", table)
     val querySql = "select rowtime as ts, string as msg from T1"
 
-    val results = tEnv.sql(querySql).toAppendStream[Pojo1]
+    val results = tEnv.sqlQuery(querySql).toAppendStream[Pojo1]
     results.addSink(new StreamITCase.StringSink[Pojo1])
     env.execute()
 

http://git-wip-us.apache.org/repos/asf/flink/blob/2cb37cb9/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/JoinITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/JoinITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/JoinITCase.scala
index ab7925b..e40da7a 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/JoinITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/JoinITCase.scala
@@ -60,7 +60,7 @@ class JoinITCase extends StreamingWithStateTestBase {
     tEnv.registerTable("T1", t1)
     tEnv.registerTable("T2", t2)
 
-    val result = tEnv.sql(sqlQuery).toAppendStream[Row]
+    val result = tEnv.sqlQuery(sqlQuery).toAppendStream[Row]
     result.addSink(new StreamITCase.StringSink[Row])
     env.execute()
   }
@@ -97,7 +97,7 @@ class JoinITCase extends StreamingWithStateTestBase {
     tEnv.registerTable("T1", t1)
     tEnv.registerTable("T2", t2)
 
-    val result = tEnv.sql(sqlQuery).toAppendStream[Row]
+    val result = tEnv.sqlQuery(sqlQuery).toAppendStream[Row]
     result.addSink(new StreamITCase.StringSink[Row])
     env.execute()
   }

http://git-wip-us.apache.org/repos/asf/flink/blob/2cb37cb9/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/OverWindowITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/OverWindowITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/OverWindowITCase.scala
index cc47a69..4884513 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/OverWindowITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/OverWindowITCase.scala
@@ -68,7 +68,7 @@ class OverWindowITCase extends StreamingWithStateTestBase {
       "    PARTITION BY a ORDER BY proctime ROWS BETWEEN 4 PRECEDING AND CURRENT ROW) " +
       "FROM MyTable"
 
-    val result = tEnv.sql(sqlQuery).toAppendStream[Row]
+    val result = tEnv.sqlQuery(sqlQuery).toAppendStream[Row]
     result.addSink(new StreamITCase.StringSink[Row])
     env.execute()
 
@@ -110,7 +110,7 @@ class OverWindowITCase extends StreamingWithStateTestBase {
       "  MIN(c) OVER (" +
       "    ORDER BY proctime ROWS BETWEEN 10 PRECEDING AND CURRENT ROW) " +
       "FROM MyTable"
-    val result = tEnv.sql(sqlQuery).toAppendStream[Row]
+    val result = tEnv.sqlQuery(sqlQuery).toAppendStream[Row]
     result.addSink(new StreamITCase.StringSink[Row])
     env.execute()
 
@@ -153,7 +153,7 @@ class OverWindowITCase extends StreamingWithStateTestBase {
       "sum(a) OVER (PARTITION BY c ORDER BY proctime RANGE UNBOUNDED preceding) " +
       "from T1"
 
-    val result = tEnv.sql(sqlQuery).toAppendStream[Row]
+    val result = tEnv.sqlQuery(sqlQuery).toAppendStream[Row]
     result.addSink(new StreamITCase.StringSink[Row])
     env.execute()
 
@@ -181,7 +181,7 @@ class OverWindowITCase extends StreamingWithStateTestBase {
       " OVER (PARTITION BY c ORDER BY proctime ROWS BETWEEN UNBOUNDED preceding AND CURRENT ROW) " +
       "as cnt1 from T1)"
 
-    val result = tEnv.sql(sqlQuery).toAppendStream[Row]
+    val result = tEnv.sqlQuery(sqlQuery).toAppendStream[Row]
     result.addSink(new StreamITCase.StringSink[Row])
     env.execute()
 
@@ -215,7 +215,7 @@ class OverWindowITCase extends StreamingWithStateTestBase {
       "sum(a) OVER (ORDER BY proctime RANGE UNBOUNDED preceding) " +
       "from T1"
 
-    val result = tEnv.sql(sqlQuery).toAppendStream[Row](queryConfig)
+    val result = tEnv.sqlQuery(sqlQuery).toAppendStream[Row](queryConfig)
     result.addSink(new StreamITCase.StringSink[Row])
     env.execute()
 
@@ -240,7 +240,7 @@ class OverWindowITCase extends StreamingWithStateTestBase {
       "count(a) OVER (ORDER BY proctime ROWS BETWEEN UNBOUNDED preceding AND CURRENT ROW) " +
       "from T1"
 
-    val result = tEnv.sql(sqlQuery).toAppendStream[Row]
+    val result = tEnv.sqlQuery(sqlQuery).toAppendStream[Row]
     result.addSink(new StreamITCase.StringSink[Row])
     env.execute()
 
@@ -302,7 +302,7 @@ class OverWindowITCase extends StreamingWithStateTestBase {
       "    BETWEEN INTERVAL '1' SECOND PRECEDING AND CURRENT ROW)" +
       " FROM T1"
 
-    val result = tEnv.sql(sqlQuery).toAppendStream[Row]
+    val result = tEnv.sqlQuery(sqlQuery).toAppendStream[Row]
     result.addSink(new StreamITCase.StringSink[Row])
     env.execute()
 
@@ -363,7 +363,7 @@ class OverWindowITCase extends StreamingWithStateTestBase {
       "    OVER (PARTITION BY c ORDER BY rowtime ROWS BETWEEN 2 PRECEDING AND CURRENT ROW) " +
       "FROM T1"
 
-    val result = tEnv.sql(sqlQuery).toAppendStream[Row]
+    val result = tEnv.sqlQuery(sqlQuery).toAppendStream[Row]
     result.addSink(new StreamITCase.StringSink[Row])
     env.execute()
 
@@ -431,7 +431,7 @@ class OverWindowITCase extends StreamingWithStateTestBase {
       "    OVER (ORDER BY rowtime RANGE BETWEEN INTERVAL '1' SECOND PRECEDING AND CURRENT ROW) " +
       " FROM T1"
 
-    val result = tEnv.sql(sqlQuery).toAppendStream[Row]
+    val result = tEnv.sqlQuery(sqlQuery).toAppendStream[Row]
     result.addSink(new StreamITCase.StringSink[Row])
     env.execute()
 
@@ -492,7 +492,7 @@ class OverWindowITCase extends StreamingWithStateTestBase {
       "  SUM(a) OVER (ORDER BY rowtime ROWS BETWEEN 2 preceding AND CURRENT ROW) " +
       "FROM T1"
 
-    val result = tEnv.sql(sqlQuery).toAppendStream[Row]
+    val result = tEnv.sqlQuery(sqlQuery).toAppendStream[Row]
     result.addSink(new StreamITCase.StringSink[Row])
     env.execute()
 
@@ -553,7 +553,7 @@ class OverWindowITCase extends StreamingWithStateTestBase {
 
     tEnv.registerTable("T1", t1)
 
-    val result = tEnv.sql(sqlQuery).toAppendStream[Row]
+    val result = tEnv.sqlQuery(sqlQuery).toAppendStream[Row]
     result.addSink(new StreamITCase.StringSink[Row])
     env.execute()
 
@@ -619,7 +619,7 @@ class OverWindowITCase extends StreamingWithStateTestBase {
 
     tEnv.registerTable("T1", t1)
 
-    val result = tEnv.sql(sqlQuery).toAppendStream[Row]
+    val result = tEnv.sqlQuery(sqlQuery).toAppendStream[Row]
     result.addSink(new StreamITCase.StringSink[Row])
     env.execute()
 
@@ -681,7 +681,7 @@ class OverWindowITCase extends StreamingWithStateTestBase {
 
     tEnv.registerTable("T1", t1)
 
-    val result = tEnv.sql(sqlQuery).toAppendStream[Row]
+    val result = tEnv.sqlQuery(sqlQuery).toAppendStream[Row]
     result.addSink(new StreamITCase.StringSink[Row])
     env.execute()
 
@@ -742,7 +742,7 @@ class OverWindowITCase extends StreamingWithStateTestBase {
 
     tEnv.registerTable("T1", t1)
 
-    val result = tEnv.sql(sqlQuery).toAppendStream[Row]
+    val result = tEnv.sqlQuery(sqlQuery).toAppendStream[Row]
     result.addSink(new StreamITCase.StringSink[Row])
     env.execute()
 
@@ -814,7 +814,7 @@ class OverWindowITCase extends StreamingWithStateTestBase {
 
     tEnv.registerTable("T1", t1)
 
-    val result = tEnv.sql(sqlQuery).toAppendStream[Row]
+    val result = tEnv.sqlQuery(sqlQuery).toAppendStream[Row]
     result.addSink(new StreamITCase.StringSink[Row])
     env.execute()
 

http://git-wip-us.apache.org/repos/asf/flink/blob/2cb37cb9/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/SortITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/SortITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/SortITCase.scala
index 2c59f8c..19db2a0 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/SortITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/SortITCase.scala
@@ -87,7 +87,7 @@ class SortITCase extends StreamingWithStateTestBase {
     val  sqlQuery = "SELECT b FROM T1 ORDER BY rowtime, b ASC "
       
       
-    val result = tEnv.sql(sqlQuery).toAppendStream[Row]
+    val result = tEnv.sqlQuery(sqlQuery).toAppendStream[Row]
     result.addSink(new StringRowSelectorSink(0)).setParallelism(1)
     env.execute()
     

http://git-wip-us.apache.org/repos/asf/flink/blob/2cb37cb9/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/SqlITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/SqlITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/SqlITCase.scala
index 5398c6d..2c82d9c 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/SqlITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/SqlITCase.scala
@@ -18,16 +18,21 @@
 
 package org.apache.flink.table.runtime.stream.sql
 
+import java.util
+
 import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
 import org.apache.flink.api.java.typeutils.RowTypeInfo
 import org.apache.flink.api.scala._
 import org.apache.flink.streaming.api.TimeCharacteristic
 import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
-import org.apache.flink.table.api.TableEnvironment
+import org.apache.flink.table.api.{TableEnvironment, Types}
 import org.apache.flink.table.api.scala._
 import org.apache.flink.table.runtime.utils.TimeTestUtil.EventTimeSourceFunction
 import org.apache.flink.table.runtime.utils.{StreamITCase, StreamTestData, StreamingWithStateTestBase}
 import org.apache.flink.types.Row
+import org.apache.flink.table.utils.MemoryTableSinkUtil
+
+import scala.collection.JavaConverters._
 import org.junit.Assert._
 import org.junit._
 
@@ -58,7 +63,7 @@ class SqlITCase extends StreamingWithStateTestBase {
     val t = ds.toTable(tEnv).as('a, 'b, 'c)
     tEnv.registerTable("MyTableRow", t)
 
-    val result = tEnv.sql(sqlQuery).toAppendStream[Row]
+    val result = tEnv.sqlQuery(sqlQuery).toAppendStream[Row]
     result.addSink(new StreamITCase.StringSink[Row])
     env.execute()
 
@@ -79,7 +84,7 @@ class SqlITCase extends StreamingWithStateTestBase {
     val t = StreamTestData.get3TupleDataStream(env).toTable(tEnv).as('a, 'b, 'c)
     tEnv.registerTable("MyTable", t)
 
-    val result = tEnv.sql(sqlQuery).toRetractStream[Row]
+    val result = tEnv.sqlQuery(sqlQuery).toRetractStream[Row]
     result.addSink(new StreamITCase.RetractingSink).setParallelism(1)
     env.execute()
 
@@ -100,7 +105,7 @@ class SqlITCase extends StreamingWithStateTestBase {
     val t = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv).as('a, 'b, 'c)
     tEnv.registerTable("MyTable", t)
 
-    val result = tEnv.sql(sqlQuery).toAppendStream[Row]
+    val result = tEnv.sqlQuery(sqlQuery).toAppendStream[Row]
     result.addSink(new StreamITCase.StringSink[Row])
     env.execute()
 
@@ -121,7 +126,7 @@ class SqlITCase extends StreamingWithStateTestBase {
     val t = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv).as('a, 'b, 'c)
     tEnv.registerTable("MyTable", t)
 
-    val result = tEnv.sql(sqlQuery).toAppendStream[Row]
+    val result = tEnv.sqlQuery(sqlQuery).toAppendStream[Row]
     result.addSink(new StreamITCase.StringSink[Row])
     env.execute()
 
@@ -142,7 +147,7 @@ class SqlITCase extends StreamingWithStateTestBase {
     val t = StreamTestData.getSmall3TupleDataStream(env)
     tEnv.registerDataStream("MyTable", t)
 
-    val result = tEnv.sql(sqlQuery).toAppendStream[Row]
+    val result = tEnv.sqlQuery(sqlQuery).toAppendStream[Row]
     result.addSink(new StreamITCase.StringSink[Row])
     env.execute()
 
@@ -166,7 +171,7 @@ class SqlITCase extends StreamingWithStateTestBase {
     val t2 = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv).as('a, 'b, 'c)
     tEnv.registerTable("T2", t2)
 
-    val result = tEnv.sql(sqlQuery).toAppendStream[Row]
+    val result = tEnv.sqlQuery(sqlQuery).toAppendStream[Row]
     result.addSink(new StreamITCase.StringSink[Row])
     env.execute()
 
@@ -193,7 +198,7 @@ class SqlITCase extends StreamingWithStateTestBase {
     val t2 = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv).as('a, 'b, 'c)
     tEnv.registerTable("T2", t2)
 
-    val result = tEnv.sql(sqlQuery).toAppendStream[Row]
+    val result = tEnv.sqlQuery(sqlQuery).toAppendStream[Row]
     result.addSink(new StreamITCase.StringSink[Row])
     env.execute()
 
@@ -219,7 +224,7 @@ class SqlITCase extends StreamingWithStateTestBase {
     val t2 = StreamTestData.get3TupleDataStream(env)
     tEnv.registerDataStream("T2", t2, 'a, 'b, 'c)
 
-    val result = tEnv.sql(sqlQuery).toAppendStream[Row]
+    val result = tEnv.sqlQuery(sqlQuery).toAppendStream[Row]
     result.addSink(new StreamITCase.StringSink[Row])
     env.execute()
 
@@ -244,7 +249,7 @@ class SqlITCase extends StreamingWithStateTestBase {
 
     val sqlQuery = "SELECT a, b, s FROM T, UNNEST(T.b) AS A (s)"
 
-    val result = tEnv.sql(sqlQuery).toAppendStream[Row]
+    val result = tEnv.sqlQuery(sqlQuery).toAppendStream[Row]
     result.addSink(new StreamITCase.StringSink[Row])
     env.execute()
 
@@ -276,7 +281,7 @@ class SqlITCase extends StreamingWithStateTestBase {
 
     val sqlQuery = "SELECT a, s FROM T, UNNEST(T.c) AS A (s)"
 
-    val result = tEnv.sql(sqlQuery).toAppendStream[Row]
+    val result = tEnv.sqlQuery(sqlQuery).toAppendStream[Row]
     result.addSink(new StreamITCase.StringSink[Row])
     env.execute()
 
@@ -306,7 +311,7 @@ class SqlITCase extends StreamingWithStateTestBase {
 
     val sqlQuery = "SELECT a, b, s, t FROM T, UNNEST(T.b) AS A (s, t) WHERE s > 13"
 
-    val result = tEnv.sql(sqlQuery).toAppendStream[Row]
+    val result = tEnv.sqlQuery(sqlQuery).toAppendStream[Row]
     result.addSink(new StreamITCase.StringSink[Row])
     env.execute()
 
@@ -365,5 +370,33 @@ class SqlITCase extends StreamingWithStateTestBase {
     assertEquals(expected.sorted, StreamITCase.testResults.sorted)
   }
 
-}
+  @Test
+  def testInsertIntoMemoryTable(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    MemoryTableSinkUtil.clear()
 
+    val t = StreamTestData.getSmall3TupleDataStream(env)
+      .assignAscendingTimestamps(x => x._2)
+      .toTable(tEnv, 'a, 'b, 'c, 'rowtime.rowtime)
+    tEnv.registerTable("sourceTable", t)
+
+    val fieldNames = Array("d", "e", "f", "t")
+    val fieldTypes = Array(Types.INT, Types.LONG, Types.STRING, Types.SQL_TIMESTAMP)
+      .asInstanceOf[Array[TypeInformation[_]]]
+    val sink = new MemoryTableSinkUtil.UnsafeMemoryAppendTableSink
+    tEnv.registerTableSink("targetTable", fieldNames, fieldTypes, sink)
+
+    val sql = "INSERT INTO targetTable SELECT a, b, c, rowtime FROM sourceTable"
+    tEnv.sqlUpdate(sql)
+    env.execute()
+
+    val expected = List(
+      "1,1,Hi,1970-01-01 00:00:00.001",
+      "2,2,Hello,1970-01-01 00:00:00.002",
+      "3,2,Hello world,1970-01-01 00:00:00.002")
+    assertEquals(expected.sorted, MemoryTableSinkUtil.results.sorted)
+  }
+
+}

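For reference, the new SQL write path exercised by testInsertIntoMemoryTable above
takes two steps: register a sink under a table name, then run an INSERT INTO
statement against it. A minimal sketch, assuming tEnv is a TableEnvironment with a
table "Orders" already registered and mySink is some TableSink implementation
(both names are illustrative, not taken from the patch):

  import org.apache.flink.api.common.typeinfo.TypeInformation
  import org.apache.flink.table.api.Types

  // describe the sink's schema and register it under a table name
  val fieldNames = Array("product", "amount")
  val fieldTypes: Array[TypeInformation[_]] = Array(Types.STRING, Types.INT)
  tEnv.registerTableSink("OutputTable", fieldNames, fieldTypes, mySink)

  // sqlQuery() supersedes sql() for queries that return a Table; statements
  // without a result, such as INSERT INTO, go through the new sqlUpdate()
  tEnv.sqlUpdate("INSERT INTO OutputTable SELECT product, amount FROM Orders")
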
http://git-wip-us.apache.org/repos/asf/flink/blob/2cb37cb9/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/TableSourceITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/TableSourceITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/TableSourceITCase.scala
index be876a8..30ada56 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/TableSourceITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/TableSourceITCase.scala
@@ -43,7 +43,7 @@ class TableSourceITCase extends StreamingMultipleProgramsTestBase {
 
     tEnv.registerTableSource("persons", csvTable)
 
-    tEnv.sql(
+    tEnv.sqlQuery(
       "SELECT id, `first`, `last`, score FROM persons WHERE id < 4 ")
       .toAppendStream[Row]
       .addSink(new StreamITCase.StringSink[Row])

http://git-wip-us.apache.org/repos/asf/flink/blob/2cb37cb9/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/TableSinkITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/TableSinkITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/TableSinkITCase.scala
index 830359f..c5b82fe 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/TableSinkITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/TableSinkITCase.scala
@@ -20,7 +20,6 @@ package org.apache.flink.table.runtime.stream.table
 
 import java.io.File
 import java.lang.{Boolean => JBool}
-import java.sql.Timestamp
 
 import org.apache.flink.api.common.functions.MapFunction
 import org.apache.flink.api.common.typeinfo.TypeInformation
@@ -37,6 +36,7 @@ import org.apache.flink.table.api.scala._
 import org.apache.flink.table.api.{TableEnvironment, TableException, Types}
 import org.apache.flink.table.runtime.utils.{StreamITCase, StreamTestData}
 import org.apache.flink.table.sinks._
+import org.apache.flink.table.utils.MemoryTableSinkUtil
 import org.apache.flink.test.util.TestBaseUtils
 import org.apache.flink.types.Row
 import org.apache.flink.util.Collector
@@ -49,6 +49,36 @@ import scala.collection.JavaConverters._
 class TableSinkITCase extends StreamingMultipleProgramsTestBase {
 
   @Test
+  def testInsertIntoRegisteredTableSink(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    MemoryTableSinkUtil.clear()
+
+    val input = StreamTestData.get3TupleDataStream(env)
+      .assignAscendingTimestamps(r => r._2)
+    val fieldNames = Array("d", "e", "t")
+    val fieldTypes: Array[TypeInformation[_]] = Array(Types.STRING, Types.SQL_TIMESTAMP, Types.LONG)
+    val sink = new MemoryTableSinkUtil.UnsafeMemoryAppendTableSink
+    tEnv.registerTableSink("targetTable", fieldNames, fieldTypes, sink)
+
+    val results = input.toTable(tEnv, 'a, 'b, 'c, 't.rowtime)
+      .where('a < 3 || 'a > 19)
+      .select('c, 't, 'b)
+      .insertInto("targetTable")
+    env.execute()
+
+    val expected = Seq(
+      "Hi,1970-01-01 00:00:00.001,1",
+      "Hello,1970-01-01 00:00:00.002,2",
+      "Comment#14,1970-01-01 00:00:00.006,6",
+      "Comment#15,1970-01-01 00:00:00.006,6").mkString("\n")
+
+    TestBaseUtils.compareResultAsText(MemoryTableSinkUtil.results.asJava, expected)
+  }
+
+  @Test
   def testStreamTableSink(): Unit = {
 
     val tmpFile = File.createTempFile("flink-table-sink-test", ".tmp")

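The Table API counterpart shown in testInsertIntoRegisteredTableSink above is
Table.insertInto(String), which emits a Table to a sink registered under the
given name. A minimal sketch, assuming orders is an existing Table, the Scala
Table API implicits are imported, and "OutputTable" was registered as sketched
earlier (the filter predicate is purely illustrative):

  orders
    .filter('amount > 100)
    .insertInto("OutputTable")
  env.execute()
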
http://git-wip-us.apache.org/repos/asf/flink/blob/2cb37cb9/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/MemoryTableSinkUtil.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/MemoryTableSinkUtil.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/MemoryTableSinkUtil.scala
new file mode 100644
index 0000000..29b2e94
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/MemoryTableSinkUtil.scala
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.utils
+
+import org.apache.flink.api.common.io.RichOutputFormat
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.DataSet
+import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.datastream.DataStream
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction
+import org.apache.flink.table.sinks.{AppendStreamTableSink, BatchTableSink, TableSinkBase}
+import org.apache.flink.types.Row
+
+import scala.collection.mutable
+
+object MemoryTableSinkUtil {
+  var results: mutable.MutableList[String] = mutable.MutableList.empty[String]
+
+  def clear(): Unit = {
+    MemoryTableSinkUtil.results.clear()
+  }
+
+  final class UnsafeMemoryAppendTableSink
+    extends TableSinkBase[Row] with BatchTableSink[Row]
+    with AppendStreamTableSink[Row] {
+
+    override def getOutputType: TypeInformation[Row] = {
+      new RowTypeInfo(getFieldTypes, getFieldNames)
+    }
+
+    override protected def copy: TableSinkBase[Row] = {
+      new UnsafeMemoryAppendTableSink
+    }
+
+    override def emitDataSet(dataSet: DataSet[Row]): Unit = {
+      dataSet.output(new MemoryCollectionOutputFormat)
+    }
+
+    override def emitDataStream(dataStream: DataStream[Row]): Unit = {
+      dataStream.addSink(new MemoryAppendSink)
+    }
+  }
+
+  private class MemoryAppendSink extends RichSinkFunction[Row]() {
+
+    override def invoke(value: Row): Unit = {
+      results.synchronized {
+        results += value.toString
+      }
+    }
+  }
+
+  private class MemoryCollectionOutputFormat extends RichOutputFormat[Row] {
+
+    override def configure(parameters: Configuration): Unit = {}
+
+    override def open(taskNumber: Int, numTasks: Int): Unit = {}
+
+    override def writeRecord(record: Row): Unit = {
+      results.synchronized {
+        results += record.toString
+      }
+    }
+
+    override def close(): Unit = {}
+  }
+}

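A note on the utility above: the single sink class covers both execution modes,
BatchTableSink via emitDataSet and AppendStreamTableSink via emitDataStream, and
both paths append to the shared results buffer inside results.synchronized so
that parallel sink instances cannot interleave writes. The buffer is
process-global mutable state (hence the "Unsafe" prefix), which is why the tests
call MemoryTableSinkUtil.clear() before running.
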
http://git-wip-us.apache.org/repos/asf/flink/blob/2cb37cb9/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/MockTableEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/MockTableEnvironment.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/MockTableEnvironment.scala
index c4e2433..ff7c79d 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/MockTableEnvironment.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/MockTableEnvironment.scala
@@ -19,6 +19,7 @@
 package org.apache.flink.table.utils
 
 import org.apache.calcite.tools.RuleSet
+import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.table.api.{QueryConfig, Table, TableConfig, TableEnvironment}
 import org.apache.flink.table.sinks.TableSink
 import org.apache.flink.table.sources.TableSource
@@ -40,4 +41,9 @@ class MockTableEnvironment extends TableEnvironment(new TableConfig) {
 
   override protected def getBuiltInPhysicalOptRuleSet: RuleSet = ???
 
+  override def registerTableSink(
+      name: String,
+      fieldNames: Array[String],
+      fieldTypes: Array[TypeInformation[_]],
+      tableSink: TableSink[_]): Unit = ???
 }


[4/4] flink git commit: [FLINK-7357] [table] Fix translation of group window queries with window props and HAVING.

Posted by fh...@apache.org.
[FLINK-7357] [table] Fix translation of group window queries with window props and HAVING.

This closes #4521.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/df5efe9c
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/df5efe9c
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/df5efe9c

Branch: refs/heads/master
Commit: df5efe9cead172994abb2fd4858a27caacd9468c
Parents: 73a2443
Author: Rong Rong <ro...@uber.com>
Authored: Thu Aug 10 10:46:25 2017 -0700
Committer: Fabian Hueske <fh...@apache.org>
Committed: Wed Sep 20 10:12:13 2017 +0200

----------------------------------------------------------------------
 .../flink/table/plan/rules/FlinkRuleSets.scala  |   4 +-
 .../common/WindowStartEndPropertiesRule.scala   | 169 ++++++++++++-------
 .../table/api/batch/sql/GroupWindowTest.scala   |  41 +++++
 .../table/api/stream/sql/GroupWindowTest.scala  |  38 +++++
 .../table/runtime/stream/sql/SqlITCase.scala    |  51 ++++++
 5 files changed, 241 insertions(+), 62 deletions(-)
----------------------------------------------------------------------
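
Concretely, the queries fixed here combine window start/end auxiliary functions
with a HAVING clause. A sketch of the affected query shape, adapted from the
tests added below:

  val sql =
    """
      |SELECT c, COUNT(a) AS v,
      |  HOP_START(rowtime, INTERVAL '1' MINUTE, INTERVAL '1' MINUTE) AS windowStart
      |FROM T1
      |GROUP BY HOP(rowtime, INTERVAL '1' MINUTE, INTERVAL '1' MINUTE), c
      |HAVING SUM(b) > 1
    """.stripMargin

The HAVING clause plans as a LogicalFilter between the two LogicalProjects, so
the previous Project/Project/WindowAggregate pattern of
WindowStartEndPropertiesRule could not match; the new
WindowStartEndPropertiesHavingRule matches Project/Filter/Project/WindowAggregate
and rewrites the auxiliary calls inside the filter condition as well.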


http://git-wip-us.apache.org/repos/asf/flink/blob/df5efe9c/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala
index a81c7d2..073a8cc 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala
@@ -147,7 +147,8 @@ object FlinkRuleSets {
 
     // Transform window to LogicalWindowAggregate
     DataSetLogicalWindowAggregateRule.INSTANCE,
-    WindowStartEndPropertiesRule.INSTANCE
+    WindowStartEndPropertiesRule.INSTANCE,
+    WindowStartEndPropertiesHavingRule.INSTANCE
   )
 
   /**
@@ -179,6 +180,7 @@ object FlinkRuleSets {
     // Transform window to LogicalWindowAggregate
     DataStreamLogicalWindowAggregateRule.INSTANCE,
     WindowStartEndPropertiesRule.INSTANCE,
+    WindowStartEndPropertiesHavingRule.INSTANCE,
 
     // simplify expressions rules
     ReduceExpressionsRule.FILTER_INSTANCE,

http://git-wip-us.apache.org/repos/asf/flink/blob/df5efe9c/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/common/WindowStartEndPropertiesRule.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/common/WindowStartEndPropertiesRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/common/WindowStartEndPropertiesRule.scala
index 14e9b21..33190e6 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/common/WindowStartEndPropertiesRule.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/common/WindowStartEndPropertiesRule.scala
@@ -18,20 +18,19 @@
 
 package org.apache.flink.table.plan.rules.common
 
-import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall}
-import org.apache.calcite.rel.logical.LogicalProject
+import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall, RelOptRuleOperand}
+import org.apache.calcite.rel.logical.{LogicalFilter, LogicalProject}
 import org.apache.calcite.rex.{RexCall, RexNode}
 import org.apache.calcite.sql.fun.SqlStdOperatorTable
+import org.apache.calcite.tools.RelBuilder
 import org.apache.flink.table.calcite.FlinkRelBuilder.NamedWindowProperty
 import org.apache.flink.table.expressions.{WindowEnd, WindowStart}
 import org.apache.flink.table.plan.logical.rel.LogicalWindowAggregate
 
 import scala.collection.JavaConversions._
 
-class WindowStartEndPropertiesRule
-  extends RelOptRule(
-    WindowStartEndPropertiesRule.WINDOW_EXPRESSION_RULE_PREDICATE,
-    "WindowStartEndPropertiesRule") {
+abstract class WindowStartEndPropertiesBaseRule(rulePredicate: RelOptRuleOperand, ruleName: String)
+  extends RelOptRule(rulePredicate, ruleName) {
 
   override def matches(call: RelOptRuleCall): Boolean = {
     val project = call.rel(0).asInstanceOf[LogicalProject]
@@ -49,61 +48,24 @@ class WindowStartEndPropertiesRule
     project.getProjects.exists(hasGroupAuxiliaries)
   }
 
-  override def onMatch(call: RelOptRuleCall): Unit = {
-
-    val project = call.rel(0).asInstanceOf[LogicalProject]
-    val innerProject = call.rel(1).asInstanceOf[LogicalProject]
-    val agg = call.rel(2).asInstanceOf[LogicalWindowAggregate]
-
-    // Retrieve window start and end properties
-    val transformed = call.builder()
-    val rexBuilder = transformed.getRexBuilder
-    transformed.push(LogicalWindowAggregate.create(
-      agg.getWindow,
-      Seq(
-        NamedWindowProperty("w$start", WindowStart(agg.getWindow.aliasAttribute)),
-        NamedWindowProperty("w$end", WindowEnd(agg.getWindow.aliasAttribute))
-      ), agg)
-    )
-
-    // forward window start and end properties
-    transformed.project(
-      innerProject.getProjects ++ Seq(transformed.field("w$start"), transformed.field("w$end")))
-
-    def replaceGroupAuxiliaries(node: RexNode): RexNode = {
-      node match {
-        case c: RexCall if WindowStartEndPropertiesRule.isWindowStart(c) =>
-          // replace expression by access to window start
-          rexBuilder.makeCast(c.getType, transformed.field("w$start"), false)
-        case c: RexCall if WindowStartEndPropertiesRule.isWindowEnd(c) =>
-          // replace expression by access to window end
-          rexBuilder.makeCast(c.getType, transformed.field("w$end"), false)
-        case c: RexCall =>
-          // replace expressions in children
-          val newOps = c.getOperands.map(replaceGroupAuxiliaries)
-          c.clone(c.getType, newOps)
-        case x =>
-          // preserve expression
-          x
-      }
+  def replaceGroupAuxiliaries(node: RexNode, relBuilder: RelBuilder): RexNode = {
+    val rexBuilder = relBuilder.getRexBuilder
+    node match {
+      case c: RexCall if isWindowStart(c) =>
+        // replace expression by access to window start
+        rexBuilder.makeCast(c.getType, relBuilder.field("w$start"), false)
+      case c: RexCall if isWindowEnd(c) =>
+        // replace expression by access to window end
+        rexBuilder.makeCast(c.getType, relBuilder.field("w$end"), false)
+      case c: RexCall =>
+        // replace expressions in children
+        val newOps = c.getOperands.map(x => replaceGroupAuxiliaries(x, relBuilder))
+        c.clone(c.getType, newOps)
+      case x =>
+        // preserve expression
+        x
     }
-
-    // replace window auxiliary function by access to window properties
-    transformed.project(
-      project.getProjects.map(replaceGroupAuxiliaries)
-    )
-    val res = transformed.build()
-    call.transformTo(res)
   }
-}
-
-object WindowStartEndPropertiesRule {
-  private val WINDOW_EXPRESSION_RULE_PREDICATE =
-    RelOptRule.operand(classOf[LogicalProject],
-      RelOptRule.operand(classOf[LogicalProject],
-        RelOptRule.operand(classOf[LogicalWindowAggregate], RelOptRule.none())))
-
-  val INSTANCE = new WindowStartEndPropertiesRule
 
   /** Checks if a RexNode is a window start auxiliary function. */
   private def isWindowStart(node: RexNode): Boolean = {
@@ -113,7 +75,7 @@ object WindowStartEndPropertiesRule {
           case SqlStdOperatorTable.TUMBLE_START |
                SqlStdOperatorTable.HOP_START |
                SqlStdOperatorTable.SESSION_START
-            => true
+          => true
           case _ => false
         }
       case _ => false
@@ -128,10 +90,95 @@ object WindowStartEndPropertiesRule {
           case SqlStdOperatorTable.TUMBLE_END |
                SqlStdOperatorTable.HOP_END |
                SqlStdOperatorTable.SESSION_END
-            => true
+          => true
           case _ => false
         }
       case _ => false
     }
   }
 }
+
+object WindowStartEndPropertiesRule {
+
+  val INSTANCE = new WindowStartEndPropertiesBaseRule(
+    RelOptRule.operand(classOf[LogicalProject],
+      RelOptRule.operand(classOf[LogicalProject],
+        RelOptRule.operand(classOf[LogicalWindowAggregate], RelOptRule.none()))),
+    "WindowStartEndPropertiesRule") {
+
+    override def onMatch(call: RelOptRuleCall): Unit = {
+
+      val project = call.rel(0).asInstanceOf[LogicalProject]
+      val innerProject = call.rel(1).asInstanceOf[LogicalProject]
+      val agg = call.rel(2).asInstanceOf[LogicalWindowAggregate]
+
+      // Retrieve window start and end properties
+      val builder = call.builder()
+      builder.push(LogicalWindowAggregate.create(
+        agg.getWindow,
+        Seq(
+          NamedWindowProperty("w$start", WindowStart(agg.getWindow.aliasAttribute)),
+          NamedWindowProperty("w$end", WindowEnd(agg.getWindow.aliasAttribute))),
+        agg)
+      )
+
+      // forward window start and end properties
+      builder.project(
+        innerProject.getProjects ++ Seq(builder.field("w$start"), builder.field("w$end")))
+
+      // replace window auxiliary function by access to window properties
+      builder.project(
+        project.getProjects.map(expr => replaceGroupAuxiliaries(expr, builder))
+      )
+      val res = builder.build()
+      call.transformTo(res)
+    }
+  }
+}
+
+object WindowStartEndPropertiesHavingRule {
+
+  val INSTANCE = new WindowStartEndPropertiesBaseRule(
+    RelOptRule.operand(classOf[LogicalProject],
+      RelOptRule.operand(classOf[LogicalFilter],
+        RelOptRule.operand(classOf[LogicalProject],
+          RelOptRule.operand(classOf[LogicalWindowAggregate], RelOptRule.none())))),
+    "WindowStartEndPropertiesHavingRule") {
+
+    override def onMatch(call: RelOptRuleCall): Unit = {
+
+      val project = call.rel(0).asInstanceOf[LogicalProject]
+      val filter = call.rel(1).asInstanceOf[LogicalFilter]
+      val innerProject = call.rel(2).asInstanceOf[LogicalProject]
+      val agg = call.rel(3).asInstanceOf[LogicalWindowAggregate]
+
+      // Retrieve window start and end properties
+      val builder = call.builder()
+      builder.push(LogicalWindowAggregate.create(
+        agg.getWindow,
+        Seq(
+          NamedWindowProperty("w$start", WindowStart(agg.getWindow.aliasAttribute)),
+          NamedWindowProperty("w$end", WindowEnd(agg.getWindow.aliasAttribute))),
+        agg)
+      )
+
+      // forward window start and end properties
+      builder.project(
+        innerProject.getProjects ++ Seq(builder.field("w$start"), builder.field("w$end")))
+
+      // replace window auxiliary functions in the HAVING condition by window property accesses
+      builder.filter(
+        filter.getChildExps.map(expr => replaceGroupAuxiliaries(expr, builder))
+      )
+
+      // replace window auxiliary functions in the projection by window property accesses
+      builder.project(
+        project.getProjects.map(expr => replaceGroupAuxiliaries(expr, builder))
+      )
+
+      val res = builder.build()
+      call.transformTo(res)
+    }
+  }
+}
+

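Design note: the refactoring above extracts the shared matches() check and the
RexNode rewrite into WindowStartEndPropertiesBaseRule, so the two concrete rules
differ only in their operand pattern and in whether a filter condition also
needs rewriting.
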
http://git-wip-us.apache.org/repos/asf/flink/blob/df5efe9c/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/GroupWindowTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/GroupWindowTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/GroupWindowTest.scala
index e77087c..a78aa8c 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/GroupWindowTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/GroupWindowTest.scala
@@ -256,4 +256,45 @@ class GroupWindowTest extends TableTestBase {
 
     util.verifySql(sqlQuery, expected)
   }
+
+  @Test
+  def testExpressionOnWindowHavingFunction(): Unit = {
+    val util = batchTestUtil()
+    util.addTable[(Int, Long, String, Timestamp)]("T", 'a, 'b, 'c, 'ts)
+
+    val sql =
+      "SELECT " +
+        "  COUNT(*), " +
+        "  HOP_START(ts, INTERVAL '15' MINUTE, INTERVAL '1' MINUTE) " +
+        "FROM T " +
+        "GROUP BY HOP(ts, INTERVAL '15' MINUTE, INTERVAL '1' MINUTE) " +
+        "HAVING " +
+        "  SUM(a) > 0 AND " +
+        "  QUARTER(HOP_START(ts, INTERVAL '15' MINUTE, INTERVAL '1' MINUTE)) = 1"
+
+    val expected =
+      unaryNode(
+        "DataSetCalc",
+        unaryNode(
+          "DataSetWindowAggregate",
+          unaryNode(
+            "DataSetCalc",
+            batchTableNode(0),
+            term("select", "ts, a")
+          ),
+          term("window", SlidingGroupWindow('w$, 'ts, 60000.millis, 900000.millis)),
+          term("select",
+            "COUNT(*) AS EXPR$0",
+            "SUM(a) AS $f1",
+            "start('w$) AS w$start",
+            "end('w$) AS w$end")
+        ),
+        term("select", "EXPR$0", "CAST(w$start) AS w$start"),
+        term("where",
+          "AND(>($f1, 0), " +
+            "=(EXTRACT_DATE(FLAG(QUARTER), /INT(Reinterpret(CAST(w$start)), 86400000)), 1))")
+      )
+
+    util.verifySql(sql, expected)
+  }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/df5efe9c/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/GroupWindowTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/GroupWindowTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/GroupWindowTest.scala
index 4823903..722c4f0 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/GroupWindowTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/GroupWindowTest.scala
@@ -143,4 +143,42 @@ class GroupWindowTest extends TableTestBase {
 
     streamUtil.verifySql(sql, expected)
   }
+
+  @Test
+  def testExpressionOnWindowHavingFunction(): Unit = {
+    val sql =
+      "SELECT " +
+        "  COUNT(*), " +
+        "  HOP_START(rowtime, INTERVAL '15' MINUTE, INTERVAL '1' MINUTE) " +
+        "FROM MyTable " +
+        "GROUP BY HOP(rowtime, INTERVAL '15' MINUTE, INTERVAL '1' MINUTE) " +
+        "HAVING " +
+        "  SUM(a) > 0 AND " +
+        "  QUARTER(HOP_START(rowtime, INTERVAL '15' MINUTE, INTERVAL '1' MINUTE)) = 1"
+
+    val expected =
+      unaryNode(
+        "DataStreamCalc",
+        unaryNode(
+          "DataStreamGroupWindowAggregate",
+          unaryNode(
+            "DataStreamCalc",
+            streamTableNode(0),
+            term("select", "rowtime, a")
+          ),
+          term("window", SlidingGroupWindow('w$, 'rowtime, 60000.millis, 900000.millis)),
+          term("select",
+            "COUNT(*) AS EXPR$0",
+            "SUM(a) AS $f1",
+            "start('w$) AS w$start",
+            "end('w$) AS w$end")
+        ),
+        term("select", "EXPR$0", "w$start"),
+        term("where",
+          "AND(>($f1, 0), " +
+            "=(EXTRACT_DATE(FLAG(QUARTER), /INT(Reinterpret(w$start), 86400000)), 1))")
+      )
+
+    streamUtil.verifySql(sql, expected)
+  }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/df5efe9c/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/SqlITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/SqlITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/SqlITCase.scala
index d2f9a9a..5398c6d 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/SqlITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/SqlITCase.scala
@@ -21,9 +21,11 @@ package org.apache.flink.table.runtime.stream.sql
 import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
 import org.apache.flink.api.java.typeutils.RowTypeInfo
 import org.apache.flink.api.scala._
+import org.apache.flink.streaming.api.TimeCharacteristic
 import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
 import org.apache.flink.table.api.TableEnvironment
 import org.apache.flink.table.api.scala._
+import org.apache.flink.table.runtime.utils.TimeTestUtil.EventTimeSourceFunction
 import org.apache.flink.table.runtime.utils.{StreamITCase, StreamTestData, StreamingWithStateTestBase}
 import org.apache.flink.types.Row
 import org.junit.Assert._
@@ -314,5 +316,54 @@ class SqlITCase extends StreamingWithStateTestBase {
     assertEquals(expected.sorted, StreamITCase.testResults.sorted)
   }
 
+  @Test
+  def testHopStartEndWithHaving(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+    env.setStateBackend(getStateBackend)
+    StreamITCase.clear
+    env.setParallelism(1)
+
+    val sqlQueryHopStartEndWithHaving =
+      """
+        |SELECT
+        |  c AS k,
+        |  COUNT(a) AS v,
+        |  HOP_START(rowtime, INTERVAL '1' MINUTE, INTERVAL '1' MINUTE) AS windowStart,
+        |  HOP_END(rowtime, INTERVAL '1' MINUTE, INTERVAL '1' MINUTE) AS windowEnd
+        |FROM T1
+        |GROUP BY HOP(rowtime, INTERVAL '1' MINUTE, INTERVAL '1' MINUTE), c
+        |HAVING
+        |  SUM(b) > 1 AND
+        |    QUARTER(HOP_START(rowtime, INTERVAL '1' MINUTE, INTERVAL '1' MINUTE)) = 1
+      """.stripMargin
+
+    val data = Seq(
+      Left(14000005L, (1, 1L, "Hi")),
+      Left(14000000L, (2, 1L, "Hello")),
+      Left(14000002L, (3, 1L, "Hello")),
+      Right(14000010L),
+      Left(8640000000L, (4, 1L, "Hello")), // data in a different quarter, filtered out by HAVING
+      Left(8640000001L, (4, 1L, "Hello")),
+      Right(8640000010L)
+    )
+
+    val t1 = env.addSource(new EventTimeSourceFunction[(Int, Long, String)](data))
+      .toTable(tEnv, 'a, 'b, 'c, 'rowtime.rowtime)
+
+    tEnv.registerTable("T1", t1)
+
+    val resultHopStartEndWithHaving = tEnv.sql(sqlQueryHopStartEndWithHaving).toAppendStream[Row]
+    resultHopStartEndWithHaving.addSink(new StreamITCase.StringSink[Row])
+
+    env.execute()
+
+    val expected = List(
+      "Hello,2,1970-01-01 03:53:00.0,1970-01-01 03:54:00.0"
+    )
+    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+  }
+
 }