Posted to commits@flink.apache.org by fh...@apache.org on 2017/09/20 08:13:18 UTC

[3/4] flink git commit: [FLINK-6442] [table] Add registration for TableSinks and INSERT INTO support for SQL and Table API.

[FLINK-6442] [table] Add registration for TableSinks and INSERT INTO support for SQL and Table API.

This closes #3829.
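
For illustration, the new API end to end: register a TableSink in the TableEnvironment's catalog, then emit a query result to it either with INSERT INTO via the new sqlUpdate() or with the new Table.insertInto(). A minimal sketch, assuming a CsvTableSink and a pre-registered "Orders" table with schema (user: Long, product: String, amount: Int):

{% highlight scala %}
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.{TableEnvironment, Types}
import org.apache.flink.table.api.scala._
import org.apache.flink.table.sinks.CsvTableSink

val env = StreamExecutionEnvironment.getExecutionEnvironment
val tEnv = TableEnvironment.getTableEnvironment(env)
// assumption: a table "Orders" (user: Long, product: String, amount: Int) is registered

// 1) register a TableSink under a name, with an explicit schema
tEnv.registerTableSink("RubberOrders",
  Array("product", "amount"),
  Array[TypeInformation[_]](Types.STRING, Types.INT),
  new CsvTableSink("/path/to/file", "|"))

// 2a) emit a query result via SQL ...
tEnv.sqlUpdate(
  "INSERT INTO RubberOrders SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%'")

// 2b) ... or via the Table API
tEnv.scan("Orders")
  .where('product.like("%Rubber%"))
  .select('product, 'amount)
  .insertInto("RubberOrders")

env.execute()
{% endhighlight %}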


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/2cb37cb9
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/2cb37cb9
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/2cb37cb9

Branch: refs/heads/master
Commit: 2cb37cb937c6f225ad7afe829a28a6eda043ffc1
Parents: df5efe9
Author: lincoln-lil <li...@gmail.com>
Authored: Thu May 4 17:52:34 2017 +0800
Committer: Fabian Hueske <fh...@apache.org>
Committed: Wed Sep 20 10:12:13 2017 +0200

----------------------------------------------------------------------
 docs/dev/table/common.md                        | 153 ++++++++++++--
 docs/dev/table/sql.md                           |  83 ++++++--
 docs/dev/table/tableApi.md                      | 105 ++++++++--
 docs/dev/table/udfs.md                          |  20 +-
 .../addons/hbase/HBaseConnectorITCase.java      |   8 +-
 .../flink/table/examples/java/WordCountSQL.java |   2 +-
 .../table/examples/scala/StreamSQLExample.scala |   2 +-
 .../table/examples/scala/WordCountSQL.scala     |   2 +-
 .../flink/table/api/BatchTableEnvironment.scala |  50 ++++-
 .../table/api/StreamTableEnvironment.scala      |  60 +++++-
 .../flink/table/api/TableEnvironment.scala      | 202 +++++++++++++++++--
 .../apache/flink/table/api/queryConfig.scala    |   1 +
 .../org/apache/flink/table/api/table.scala      |  41 +++-
 .../flink/table/calcite/FlinkPlannerImpl.scala  |   6 +-
 .../table/plan/schema/TableSinkTable.scala      |  45 +++++
 .../runtime/batch/JavaTableSourceITCase.java    |   2 +-
 .../runtime/batch/sql/GroupingSetsITCase.java   |   6 +-
 .../table/runtime/batch/sql/JavaSqlITCase.java  |  12 +-
 .../table/runtime/stream/sql/JavaSqlITCase.java |   8 +-
 .../api/batch/BatchTableEnvironmentTest.scala   |   4 +-
 .../sql/validation/CalcValidationTest.scala     |   2 +-
 .../validation/InsertIntoValidationTest.scala   |  77 +++++++
 .../sql/validation/JoinValidationTest.scala     |  22 +-
 .../validation/OverWindowValidationTest.scala   |   4 +-
 .../sql/validation/SortValidationTest.scala     |   2 +-
 .../validation/InsertIntoValidationTest.scala   |  61 ++++++
 .../api/stream/StreamTableEnvironmentTest.scala |   4 +-
 .../flink/table/api/stream/sql/JoinTest.scala   |   4 +-
 .../validation/InsertIntoValidationTest.scala   |  87 ++++++++
 .../validation/OverWindowValidationTest.scala   |   6 +-
 .../validation/CorrelateValidationTest.scala    |   6 +-
 .../validation/InsertIntoValidationTest.scala   |  68 +++++++
 .../validation/TableSinksValidationTest.scala   |  27 ++-
 .../expressions/utils/ExpressionTestBase.scala  |   1 -
 .../plan/ExpressionReductionRulesTest.scala     |   4 +-
 .../flink/table/plan/RetractionRulesTest.scala  |   2 +-
 .../plan/TimeIndicatorConversionTest.scala      |   8 +-
 .../runtime/batch/sql/AggregateITCase.scala     |  34 ++--
 .../table/runtime/batch/sql/CalcITCase.scala    |  26 +--
 .../table/runtime/batch/sql/JoinITCase.scala    |  44 ++--
 .../runtime/batch/sql/SetOperatorsITCase.scala  |  24 +--
 .../table/runtime/batch/sql/SortITCase.scala    |   8 +-
 .../batch/sql/TableEnvironmentITCase.scala      |  35 +++-
 .../runtime/batch/sql/TableSourceITCase.scala   |   4 +-
 .../table/runtime/batch/table/CalcITCase.scala  |   6 +-
 .../batch/table/TableEnvironmentITCase.scala    |  24 +++
 .../runtime/stream/TimeAttributesITCase.scala   |   4 +-
 .../table/runtime/stream/sql/JoinITCase.scala   |   4 +-
 .../runtime/stream/sql/OverWindowITCase.scala   |  30 +--
 .../table/runtime/stream/sql/SortITCase.scala   |   2 +-
 .../table/runtime/stream/sql/SqlITCase.scala    |  59 ++++--
 .../runtime/stream/sql/TableSourceITCase.scala  |   2 +-
 .../runtime/stream/table/TableSinkITCase.scala  |  32 ++-
 .../flink/table/utils/MemoryTableSinkUtil.scala |  84 ++++++++
 .../table/utils/MockTableEnvironment.scala      |   6 +
 .../flink/table/utils/TableTestBase.scala       |   8 +-
 56 files changed, 1371 insertions(+), 262 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/2cb37cb9/docs/dev/table/common.md
----------------------------------------------------------------------
diff --git a/docs/dev/table/common.md b/docs/dev/table/common.md
index fed2b6d..acd5711 100644
--- a/docs/dev/table/common.md
+++ b/docs/dev/table/common.md
@@ -50,7 +50,7 @@ tableEnv.registerExternalCatalog("extCat", ...);
 // create a Table from a Table API query
 Table tapiResult = tableEnv.scan("table1").select(...);
 // create a Table from a SQL query
-Table sqlResult  = tableEnv.sql("SELECT ... FROM table2 ... ");
+Table sqlResult  = tableEnv.sqlQuery("SELECT ... FROM table2 ... ");
 
 // emit a Table API result Table to a TableSink, same for SQL result
 tapiResult.writeToSink(...);
@@ -77,7 +77,7 @@ tableEnv.registerExternalCatalog("extCat", ...)
 // create a Table from a Table API query
 val tapiResult = tableEnv.scan("table1").select(...)
 // Create a Table from a SQL query
-val sqlResult  = tableEnv.sql("SELECT ... FROM table2 ...")
+val sqlResult  = tableEnv.sqlQuery("SELECT ... FROM table2 ...")
 
 // emit a Table API result Table to a TableSink, same for SQL result
 tapiResult.writeToSink(...)
@@ -149,18 +149,18 @@ val bTableEnv = TableEnvironment.getTableEnvironment(bEnv)
 
 {% top %}
 
-Register a Table in the Catalog
+Register Tables in the Catalog
 -------------------------------
 
-A `TableEnvironment` has an internal catalog of tables, organized by table name. Table API or SQL queries can access tables which are registered in the catalog, by referencing them by name. 
+A `TableEnvironment` maintains a catalog of tables which are registered by name. There are two types of tables, *input tables* and *output tables*. Input tables can be referenced in Table API and SQL queries and provide input data. Output tables can be used to emit the result of a Table API or SQL query to an external system.
 
-A `TableEnvironment` allows you to register a table from various sources:
+An input table can be registered from various sources:
 
 * an existing `Table` object, usually the result of a Table API or SQL query.
 * a `TableSource`, which accesses external data, such as a file, database, or messaging system. 
-* a `DataStream` or `DataSet` from a DataStream or DataSet program.
+* a `DataStream` or `DataSet` from a DataStream or DataSet program. Registering a `DataStream` or `DataSet` is discussed in the [Integration with DataStream and DataSet API](#integration-with-datastream-and-dataset-api) section.
 
-Registering a `DataStream` or `DataSet` as a table is discussed in the [Integration with DataStream and DataSet API](#integration-with-datastream-and-dataset-api) section.
+An output table can be registered using a `TableSink`.
 
 ### Register a Table
 
@@ -200,7 +200,7 @@ tableEnv.registerTable("projectedTable", projTable)
 
 ### Register a TableSource
 
-A `TableSource` provides access to external data which is stored in a storage systems such as a database (MySQL, HBase, ...), a file with specific encoding (CSV, Apache \[Parquet, Avro, ORC\], ...), or a messaging system (Apache Kafka, RabbitMQ, ...). 
+A `TableSource` provides access to external data which is stored in a storage system such as a database (MySQL, HBase, ...), a file with a specific encoding (CSV, Apache \[Parquet, Avro, ORC\], ...), or a messaging system (Apache Kafka, RabbitMQ, ...). 
 
 Flink aims to provide TableSources for common data formats and storage systems. Please have a look at the [Table Sources and Sinks]({{ site.baseurl }}/dev/table/sourceSinks.html) page for a list of supported TableSources and instructions for how to build a custom `TableSource`.
 
@@ -236,6 +236,52 @@ tableEnv.registerTableSource("CsvTable", csvSource)
 
 {% top %}
 
+### Register a TableSink
+
+A registered `TableSink` can be used to [emit the result of a Table API or SQL query](common.html#emit-a-table) to an external storage system, such as a database, key-value store, message queue, or file system (in different encodings, e.g., CSV, Apache \[Parquet, Avro, ORC\], ...).
+
+Flink aims to provide TableSinks for common data formats and storage systems. Please see the [Table Sources and Sinks]({{ site.baseurl }}/dev/table/sourceSinks.html) page for details about available sinks and instructions for how to implement a custom `TableSink`.
+
+A `TableSink` is registered in a `TableEnvironment` as follows:
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+// get a StreamTableEnvironment, works for BatchTableEnvironment equivalently
+StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
+
+// create a TableSink
+TableSink csvSink = new CsvTableSink("/path/to/file", ...);
+
+// define the field names and types
+String[] fieldNames = {"a", "b", "c"};
+TypeInformation[] fieldTypes = {Types.INT, Types.STRING, Types.LONG};
+
+// register the TableSink as table "CsvSinkTable"
+tableEnv.registerTableSink("CsvSinkTable", fieldNames, fieldTypes, csvSink);
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+// get a TableEnvironment
+val tableEnv = TableEnvironment.getTableEnvironment(env)
+
+// create a TableSink
+val csvSink: TableSink = new CsvTableSink("/path/to/file", ...)
+
+// define the field names and types
+val fieldNames: Array[String] = Array("a", "b", "c")
+val fieldTypes: Array[TypeInformation[_]] = Array(Types.INT, Types.STRING, Types.LONG)
+
+// register the TableSink as table "CsvSinkTable"
+tableEnv.registerTableSink("CsvSinkTable", fieldNames, fieldTypes, csvSink)
+{% endhighlight %}
+</div>
+</div>
+
+{% top %}
+
 Register an External Catalog
 ----------------------------
 
@@ -342,7 +388,7 @@ Flink's SQL integration is based on [Apache Calcite](https://calcite.apache.org)
 
 The [SQL]({{ site.baseurl }}/dev/table/sql.html) document describes Flink's SQL support for streaming and batch tables.
 
-The following example shows how to specify a query and return the result as a Table.
+The following example shows how to specify a query and return the result as a `Table`.
 
 <div class="codetabs" markdown="1">
 <div data-lang="java" markdown="1">
@@ -353,7 +399,7 @@ StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
 // register Orders table
 
 // compute revenue for all customers from France
-Table revenue = tableEnv.sql(
+Table revenue = tableEnv.sqlQuery(
     "SELECT cID, cName, SUM(revenue) AS revSum " +
     "FROM Orders " +
     "WHERE cCountry = 'FRANCE' " +
@@ -373,7 +419,7 @@ val tableEnv = TableEnvironment.getTableEnvironment(env)
 // register Orders table
 
 // compute revenue for all customers from France
-Table revenue = tableEnv.sql(""" 
+val revenue = tableEnv.sqlQuery("""
   |SELECT cID, cName, SUM(revenue) AS revSum
   |FROM Orders
   |WHERE cCountry = 'FRANCE'
@@ -387,6 +433,53 @@ Table revenue = tableEnv.sql("""
 </div>
 </div>
 
+The following example shows how to specify an update query that inserts its result into a registered table.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+// get a StreamTableEnvironment, works for BatchTableEnvironment equivalently
+StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
+
+// register "Orders" table
+// register "RevenueFrance" output table
+
+// compute revenue for all customers from France and emit to "RevenueFrance"
+tableEnv.sqlUpdate(
+    "INSERT INTO RevenueFrance " +
+    "SELECT cID, cName, SUM(revenue) AS revSum " +
+    "FROM Orders " +
+    "WHERE cCountry = 'FRANCE' " +
+    "GROUP BY cID, cName"
+  );
+
+// execute query
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+// get a TableEnvironment
+val tableEnv = TableEnvironment.getTableEnvironment(env)
+
+// register "Orders" table
+// register "RevenueFrance" output table
+
+// compute revenue for all customers from France and emit to "RevenueFrance"
+tableEnv.sqlUpdate("""
+  |INSERT INTO RevenueFrance
+  |SELECT cID, cName, SUM(revenue) AS revSum
+  |FROM Orders
+  |WHERE cCountry = 'FRANCE'
+  |GROUP BY cID, cName
+  """.stripMargin)
+
+// execute query
+{% endhighlight %}
+
+</div>
+</div>
+
 {% top %}
 
 ### Mixing Table API and SQL
@@ -401,12 +494,19 @@ Table API and SQL queries can be easily mixed because both return `Table` object
 Emit a Table 
 ------------
 
-In order to emit a `Table`, it can be written to a `TableSink`. A `TableSink` is a generic interface to support a wide variety of file formats (e.g. CSV, Apache Parquet, Apache Avro), storage systems (e.g., JDBC, Apache HBase, Apache Cassandra, Elasticsearch), or messaging systems (e.g., Apache Kafka, RabbitMQ). 
+A `Table` is emitted by writing it to a `TableSink`. A `TableSink` is a generic interface to support a wide variety of file formats (e.g. CSV, Apache Parquet, Apache Avro), storage systems (e.g., JDBC, Apache HBase, Apache Cassandra, Elasticsearch), or messaging systems (e.g., Apache Kafka, RabbitMQ). 
 
-A batch `Table` can only be written to a `BatchTableSink`, while a streaming table requires either an `AppendStreamTableSink`, a `RetractStreamTableSink`, or an `UpsertStreamTableSink`. 
+A batch `Table` can only be written to a `BatchTableSink`, while a streaming `Table` requires either an `AppendStreamTableSink`, a `RetractStreamTableSink`, or an `UpsertStreamTableSink`. 
 
 Please see the documentation about [Table Sources & Sinks]({{ site.baseurl }}/dev/table/sourceSinks.html) for details about available sinks and instructions for how to implement a custom `TableSink`.
 
+There are two ways to emit a table:
+
+1. The `Table.writeToSink(TableSink sink)` method emits the table using the provided `TableSink` and automatically configures the sink with the schema of the table to emit.
+2. The `Table.insertInto(String sinkTable)` method looks up a `TableSink` that was registered with a specific schema under the provided name in the `TableEnvironment`'s catalog. The schema of the table to emit is validated against the schema of the registered `TableSink`.
+
+The following examples show how to emit a `Table`:
+
 <div class="codetabs" markdown="1">
 <div data-lang="java" markdown="1">
 {% highlight java %}
@@ -419,9 +519,18 @@ Table result = ...
 // create a TableSink
 TableSink sink = new CsvTableSink("/path/to/file", "|"); // "|" is the field delimiter
 
-// write the result Table to the TableSink
+// METHOD 1:
+//   Emit the result Table to the TableSink via the writeToSink() method
 result.writeToSink(sink);
 
+// METHOD 2:
+//   Register the TableSink with a specific schema
+String[] fieldNames = {"a", "b", "c"};
+TypeInformation[] fieldTypes = {Types.INT, Types.STRING, Types.LONG};
+tableEnv.registerTableSink("CsvSinkTable", fieldNames, fieldTypes, sink);
+//   Emit the result Table to the registered TableSink via the insertInto() method
+result.insertInto("CsvSinkTable");
+
 // execute the program
 {% endhighlight %}
 </div>
@@ -437,9 +546,18 @@ val result: Table = ...
 // create a TableSink
 val sink: TableSink = new CsvTableSink("/path/to/file", fieldDelim = "|")
 
-// write the result Table to the TableSink
+// METHOD 1:
+//   Emit the result Table to the TableSink via the writeToSink() method
 result.writeToSink(sink)
 
+// METHOD 2:
+//   Register the TableSink with a specific schema
+val fieldNames: Array[String] = Array("a", "b", "c")
+val fieldTypes: Array[TypeInformation[_]] = Array(Types.INT, Types.STRING, Types.LONG)
+tableEnv.registerTableSink("CsvSinkTable", fieldNames, fieldTypes, sink)
+//   Emit the result Table to the registered TableSink via the insertInto() method
+result.insertInto("CsvSinkTable")
+
 // execute the program
 {% endhighlight %}
 </div>
@@ -458,8 +576,9 @@ Table API and SQL queries are translated into [DataStream]({{ site.baseurl }}/de
 
 A Table API or SQL query is translated when:
 
-* the `Table` is emitted to a `TableSink`, i.e., when `Table.writeToSink()` is called.
-* the `Table` is converted into a `DataStream` or `DataSet` (see [Integration with DataStream and DataSet API](#integration-with-dataStream-and-dataSet-api)).
+* a `Table` is emitted to a `TableSink`, i.e., when `Table.writeToSink()` or `Table.insertInto()` is called.
+* a SQL update query is specified, i.e., when `TableEnvironment.sqlUpdate()` is called.
+* a `Table` is converted into a `DataStream` or `DataSet` (see [Integration with DataStream and DataSet API](#integration-with-dataStream-and-dataSet-api)).
 
 Once translated, a Table API or SQL query is handled like a regular DataStream or DataSet program and is executed when `StreamExecutionEnvironment.execute()` or `ExecutionEnvironment.execute()` is called.
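
To make the extended list of translation triggers concrete: sqlUpdate() plans the INSERT statement when it is called, but nothing is executed before execute(). A sketch (table registration elided, names illustrative):

{% highlight scala %}
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.TableEnvironment

val env = StreamExecutionEnvironment.getExecutionEnvironment
val tEnv = TableEnvironment.getTableEnvironment(env)

// ... register input table "Orders" and output table "OutOrders" ...

// translated here: the statement is parsed, validated, and planned
tEnv.sqlUpdate("INSERT INTO OutOrders SELECT * FROM Orders")

// executed here, like any other DataStream program
env.execute()
{% endhighlight %}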
 

http://git-wip-us.apache.org/repos/asf/flink/blob/2cb37cb9/docs/dev/table/sql.md
----------------------------------------------------------------------
diff --git a/docs/dev/table/sql.md b/docs/dev/table/sql.md
index fa4e3f3..b9205ab 100644
--- a/docs/dev/table/sql.md
+++ b/docs/dev/table/sql.md
@@ -49,15 +49,25 @@ DataStream<Tuple3<Long, String, Integer>> ds = env.addSource(...);
 
 // SQL query with an inlined (unregistered) table
 Table table = tableEnv.fromDataStream(ds, "user, product, amount");
-Table result = tableEnv.sql(
+Table result = tableEnv.sqlQuery(
   "SELECT SUM(amount) FROM " + table + " WHERE product LIKE '%Rubber%'");
 
 // SQL query with a registered table
 // register the DataStream as table "Orders"
 tableEnv.registerDataStream("Orders", ds, "user, product, amount");
 // run a SQL query on the Table and retrieve the result as a new Table
-Table result2 = tableEnv.sql(
+Table result2 = tableEnv.sqlQuery(
   "SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%'");
+
+// SQL update with a registered table
+// create and register a TableSink
+TableSink csvSink = new CsvTableSink("/path/to/file", ...);
+String[] fieldNames = {"product", "amount"};
+TypeInformation[] fieldTypes = {Types.STRING, Types.INT};
+tableEnv.registerTableSink("RubberOrders", fieldNames, fieldTypes, csvSink);
+// run a SQL update query on the Table and emit the result to the TableSink
+tableEnv.sqlUpdate(
+  "INSERT INTO RubberOrders SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%'");
 {% endhighlight %}
 </div>
 
@@ -71,15 +81,25 @@ val ds: DataStream[(Long, String, Integer)] = env.addSource(...)
 
 // SQL query with an inlined (unregistered) table
 val table = ds.toTable(tableEnv, 'user, 'product, 'amount)
-val result = tableEnv.sql(
+val result = tableEnv.sqlQuery(
   s"SELECT SUM(amount) FROM $table WHERE product LIKE '%Rubber%'")
 
 // SQL query with a registered table
 // register the DataStream under the name "Orders"
 tableEnv.registerDataStream("Orders", ds, 'user, 'product, 'amount)
 // run a SQL query on the Table and retrieve the result as a new Table
-val result2 = tableEnv.sql(
+val result2 = tableEnv.sqlQuery(
   "SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%'")
+
+// SQL update with a registered table
+// create and register a TableSink
+val csvSink: TableSink = new CsvTableSink("/path/to/file", ...)
+val fieldNames: Array[String] = Array("product", "amount")
+val fieldTypes: Array[TypeInformation[_]] = Array(Types.STRING, Types.INT)
+tableEnv.registerTableSink("RubberOrders", fieldNames, fieldTypes, csvSink)
+// run a SQL update query on the Table and emit the result to the TableSink
+tableEnv.sqlUpdate(
+  "INSERT INTO RubberOrders SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%'")
 {% endhighlight %}
 </div>
 </div>
@@ -89,7 +109,7 @@ val result2 = tableEnv.sql(
 Supported Syntax
 ----------------
 
-Flink parses SQL using [Apache Calcite](https://calcite.apache.org/docs/reference.html), which supports standard ANSI SQL. DML and DDL statements are not supported by Flink.
+Flink parses SQL using [Apache Calcite](https://calcite.apache.org/docs/reference.html), which supports standard ANSI SQL. DDL statements are not supported by Flink.
 
 The following BNF-grammar describes the superset of supported SQL features in batch and streaming queries. The [Operations](#operations) section shows examples for the supported features and indicates which features are only supported for batch or streaming queries.
 
@@ -156,6 +176,10 @@ groupItem:
   | ROLLUP '(' expression [, expression ]* ')'
   | GROUPING SETS '(' groupItem [, groupItem ]* ')'
 
+insert:
+  INSERT INTO tableReference
+  query
+
 ```
 
 Flink SQL uses a lexical policy for identifiers (table, attribute, function names) similar to Java:
@@ -566,6 +590,39 @@ LIMIT 3
 
 {% top %}
 
+### Insert
+
+<div markdown="1">
+<table class="table table-bordered">
+  <thead>
+    <tr>
+      <th class="text-left" style="width: 20%">Operation</th>
+      <th class="text-center">Description</th>
+    </tr>
+  </thead>
+  <tbody>
+    <tr>
+      <td>
+        <strong>Insert Into</strong><br>
+        <span class="label label-primary">Batch</span> <span class="label label-primary">Streaming</span>
+      </td>
+      <td>
+        <p>Output tables must be registered in the TableEnvironment (see <a href="common.html#register-a-tablesink">Register a TableSink</a>). Moreover, the schema of the registered table must match the schema of the query.</p>
+
+{% highlight sql %}
+INSERT INTO OutputTable
+SELECT users, tag
+FROM Orders
+{% endhighlight %}
+      </td>
+    </tr>
+
+  </tbody>
+</table>
+</div>
+
+{% top %}
+
 ### Group Windows
 
 Group windows are defined in the `GROUP BY` clause of a SQL query. Just like queries with regular `GROUP BY` clauses, queries with a `GROUP BY` clause that includes a group window function compute a single result row per group. The following group windows functions are supported for SQL on batch and streaming tables.
@@ -649,22 +706,22 @@ DataStream<Tuple3<Long, String, Integer>> ds = env.addSource(...);
 tableEnv.registerDataStream("Orders", ds, "user, product, amount, proctime.proctime, rowtime.rowtime");
 
 // compute SUM(amount) per day (in event-time)
-Table result1 = tableEnv.sql(
+Table result1 = tableEnv.sqlQuery(
   "SELECT user, " +
   "  TUMBLE_START(rowtime, INTERVAL '1' DAY) as wStart,  " +
   "  SUM(amount) FROM Orders " +
   "GROUP BY TUMBLE(rowtime, INTERVAL '1' DAY), user");
 
 // compute SUM(amount) per day (in processing-time)
-Table result2 = tableEnv.sql(
+Table result2 = tableEnv.sqlQuery(
   "SELECT user, SUM(amount) FROM Orders GROUP BY TUMBLE(proctime, INTERVAL '1' DAY), user");
 
 // compute every hour the SUM(amount) of the last 24 hours in event-time
-Table result3 = tableEnv.sql(
+Table result3 = tableEnv.sqlQuery(
   "SELECT product, SUM(amount) FROM Orders GROUP BY HOP(rowtime, INTERVAL '1' HOUR, INTERVAL '1' DAY), product");
 
 // compute SUM(amount) per session with 12 hour inactivity gap (in event-time)
-Table result4 = tableEnv.sql(
+Table result4 = tableEnv.sqlQuery(
   "SELECT user, " +
   "  SESSION_START(rowtime, INTERVAL '12' HOUR) AS sStart, " +
   "  SESSION_END(rowtime, INTERVAL '12' HOUR) AS snd, " +
@@ -686,7 +743,7 @@ val ds: DataStream[(Long, String, Int)] = env.addSource(...)
 tableEnv.registerDataStream("Orders", ds, 'user, 'product, 'amount, 'proctime.proctime, 'rowtime.rowtime)
 
 // compute SUM(amount) per day (in event-time)
-val result1 = tableEnv.sql(
+val result1 = tableEnv.sqlQuery(
     """
       |SELECT
       |  user,
@@ -697,15 +754,15 @@ val result1 = tableEnv.sql(
     """.stripMargin)
 
 // compute SUM(amount) per day (in processing-time)
-val result2 = tableEnv.sql(
+val result2 = tableEnv.sqlQuery(
   "SELECT user, SUM(amount) FROM Orders GROUP BY TUMBLE(proctime, INTERVAL '1' DAY), user")
 
 // compute every hour the SUM(amount) of the last 24 hours in event-time
-val result3 = tableEnv.sql(
+val result3 = tableEnv.sqlQuery(
   "SELECT product, SUM(amount) FROM Orders GROUP BY HOP(rowtime, INTERVAL '1' HOUR, INTERVAL '1' DAY), product")
 
 // compute SUM(amount) per session with 12 hour inactivity gap (in event-time)
-val result4 = tableEnv.sql(
+val result4 = tableEnv.sqlQuery(
     """
       |SELECT
       |  user,
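
A detail of INSERT INTO that the section above only states in passing: the schema of the query result is validated against the schema of the registered TableSink (see TableEnvironment.insertInto() later in this diff), and a mismatch fails with a ValidationException. A sketch, reusing the "Orders" schema (user: Long, product: String, amount: Int) from the examples above:

{% highlight scala %}
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.{TableEnvironment, Types}
import org.apache.flink.table.sinks.CsvTableSink

val tEnv = TableEnvironment.getTableEnvironment(StreamExecutionEnvironment.getExecutionEnvironment)
// assumption: "Orders" (user: Long, product: String, amount: Int) is registered

// sink registered as (product: String, amount: Int)
tEnv.registerTableSink("RubberOrders",
  Array("product", "amount"),
  Array[TypeInformation[_]](Types.STRING, Types.INT),
  new CsvTableSink("/path/to/file", ","))

// OK: the query schema (String, Int) matches the sink schema
tEnv.sqlUpdate("INSERT INTO RubberOrders SELECT product, amount FROM Orders")

// fails with a ValidationException: query schema (String, Long) vs. sink schema (String, Int)
tEnv.sqlUpdate("INSERT INTO RubberOrders SELECT product, user FROM Orders")
{% endhighlight %}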

http://git-wip-us.apache.org/repos/asf/flink/blob/2cb37cb9/docs/dev/table/tableApi.md
----------------------------------------------------------------------
diff --git a/docs/dev/table/tableApi.md b/docs/dev/table/tableApi.md
index fd57111..0a2acab 100644
--- a/docs/dev/table/tableApi.md
+++ b/docs/dev/table/tableApi.md
@@ -961,7 +961,52 @@ val result = left.select('a, 'b, 'c).where('a.in(right));
 
 <div class="codetabs" markdown="1">
 <div data-lang="java" markdown="1">
+<table class="table table-bordered">
+  <thead>
+    <tr>
+      <th class="text-left" style="width: 20%">Operators</th>
+      <th class="text-center">Description</th>
+    </tr>
+  </thead>
+  <tbody>
+    <tr>
+      <td>
+        <strong>Order By</strong><br>
+        <span class="label label-primary">Batch</span>
+      </td>
+      <td>
+        <p>Similar to a SQL ORDER BY clause. Returns records globally sorted across all parallel partitions.</p>
+{% highlight java %}
+Table in = tableEnv.fromDataSet(ds, "a, b, c");
+Table result = in.orderBy("a.asc");
+{% endhighlight %}
+      </td>
+    </tr>
+
+    <tr>
+      <td>
+        <strong>Limit</strong><br>
+        <span class="label label-primary">Batch</span>
+      </td>
+      <td>
+        <p>Similar to a SQL LIMIT clause. Limits a sorted result to a specified number of records from an offset position. Limit is technically part of the Order By operator and thus must be preceded by it.</p>
+{% highlight java %}
+Table in = tableEnv.fromDataSet(ds, "a, b, c");
+Table result = in.orderBy("a.asc").limit(3); // returns unlimited number of records beginning with the 4th record
+{% endhighlight %}
+or
+{% highlight java %}
+Table in = tableEnv.fromDataSet(ds, "a, b, c");
+Table result = in.orderBy("a.asc").limit(3, 5); // returns 5 records beginning with the 4th record
+{% endhighlight %}
+      </td>
+    </tr>
+
+  </tbody>
+</table>
+</div>
 
+<div data-lang="scala" markdown="1">
 <table class="table table-bordered">
   <thead>
     <tr>
@@ -970,7 +1015,7 @@ val result = left.select('a, 'b, 'c).where('a.in(right));
     </tr>
   </thead>
   <tbody>
-  	<tr>
+    <tr>
       <td>
         <strong>Order By</strong><br>
         <span class="label label-primary">Batch</span>
@@ -1005,9 +1050,13 @@ val result = in.orderBy('a.asc).limit(3, 5); // returns 5 records beginning with
 
   </tbody>
 </table>
-
 </div>
-<div data-lang="scala" markdown="1">
+</div>
+
+### Insert
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
 
 <table class="table table-bordered">
   <thead>
@@ -1017,35 +1066,49 @@ val result = in.orderBy('a.asc).limit(3, 5); // returns 5 records beginning with
     </tr>
   </thead>
   <tbody>
-  	<tr>
+    <tr>
       <td>
-        <strong>Order By</strong><br>
-        <span class="label label-primary">Batch</span>
+        <strong>Insert Into</strong><br>
+        <span class="label label-primary">Batch</span> <span class="label label-primary">Streaming</span>
       </td>
       <td>
-        <p>Similar to a SQL ORDER BY clause. Returns records globally sorted across all parallel partitions.</p>
+        <p>Similar to the INSERT INTO clause in a SQL query. Performs an insertion into a registered output table.</p>
+
+        <p>Output tables must be registered in the TableEnvironment (see <a href="common.html#register-a-tablesink">Register a TableSink</a>). Moreover, the schema of the registered table must match the schema of the query.</p>
+
 {% highlight java %}
-Table in = tableEnv.fromDataSet(ds, "a, b, c");
-Table result = in.orderBy("a.asc");
+Table orders = tableEnv.scan("Orders");
+orders.insertInto("OutOrders");
 {% endhighlight %}
       </td>
     </tr>
 
+  </tbody>
+</table>
+</div>
+
+<div data-lang="scala" markdown="1">
+<table class="table table-bordered">
+  <thead>
+    <tr>
+      <th class="text-left" style="width: 20%">Operators</th>
+      <th class="text-center">Description</th>
+    </tr>
+  </thead>
+  <tbody>
     <tr>
       <td>
-        <strong>Limit</strong><br>
-        <span class="label label-primary">Batch</span>
+        <strong>Insert Into</strong><br>
+        <span class="label label-primary">Batch</span> <span class="label label-primary">Streaming</span>
       </td>
       <td>
-        <p>Similar to a SQL LIMIT clause. Limits a sorted result to a specified number of records from an offset position. Limit is technically part of the Order By operator and thus must be preceded by it.</p>
-{% highlight java %}
-Table in = tableEnv.fromDataSet(ds, "a, b, c");
-Table result = in.orderBy("a.asc").limit(3); // returns unlimited number of records beginning with the 4th record
-{% endhighlight %}
-or
-{% highlight java %}
-Table in = tableEnv.fromDataSet(ds, "a, b, c");
-Table result = in.orderBy("a.asc").limit(3, 5); // returns 5 records beginning with the 4th record
+        <p>Similar to the INSERT INTO clause in a SQL query. Performs an insertion into a registered output table.</p>
+
+        <p>Output tables must be registered in the TableEnvironment (see <a href="common.html#register-a-tablesink">Register a TableSink</a>). Moreover, the schema of the registered table must match the schema of the query.</p>
+
+{% highlight scala %}
+val orders: Table = tableEnv.scan("Orders")
+orders.insertInto("OutOrders")
 {% endhighlight %}
       </td>
     </tr>
@@ -1055,6 +1118,8 @@ Table result = in.orderBy("a.asc").limit(3, 5); // returns 5 records beginning w
 </div>
 </div>
 
+{% top %}
+
 ### Group Windows
 
 Group window aggregates group rows into finite groups based on time or row-count intervals and evaluate aggregation functions once per group. For batch tables, windows are a convenient shortcut to group records by time intervals.
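
The insertInto() examples in the Insert section above presuppose that "OutOrders" is already registered. Putting both halves together, as a sketch (sink type, path, and schema are illustrative):

{% highlight scala %}
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.{Table, TableEnvironment, Types}
import org.apache.flink.table.sinks.CsvTableSink

val tEnv = TableEnvironment.getTableEnvironment(StreamExecutionEnvironment.getExecutionEnvironment)

// register the output table once ...
tEnv.registerTableSink("OutOrders",
  Array("user", "product", "amount"),
  Array[TypeInformation[_]](Types.LONG, Types.STRING, Types.INT),
  new CsvTableSink("/path/to/file", ","))

// ... then any Table with a matching schema can be emitted into it
val orders: Table = tEnv.scan("Orders") // assumption: "Orders" is registered
orders.insertInto("OutOrders")
{% endhighlight %}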

http://git-wip-us.apache.org/repos/asf/flink/blob/2cb37cb9/docs/dev/table/udfs.md
----------------------------------------------------------------------
diff --git a/docs/dev/table/udfs.md b/docs/dev/table/udfs.md
index 6c9bc1a..eef7db6 100644
--- a/docs/dev/table/udfs.md
+++ b/docs/dev/table/udfs.md
@@ -72,7 +72,7 @@ tableEnv.registerFunction("hashCode", new HashCode(10));
 myTable.select("string, string.hashCode(), hashCode(string)");
 
 // use the function in SQL API
-tableEnv.sql("SELECT string, HASHCODE(string) FROM MyTable");
+tableEnv.sqlQuery("SELECT string, HASHCODE(string) FROM MyTable");
 {% endhighlight %}
 </div>
 
@@ -93,7 +93,7 @@ myTable.select('string, hashCode('string))
 
 // register and use the function in SQL
 tableEnv.registerFunction("hashCode", new HashCode(10))
-tableEnv.sql("SELECT string, HASHCODE(string) FROM MyTable");
+tableEnv.sqlQuery("SELECT string, HASHCODE(string) FROM MyTable");
 {% endhighlight %}
 </div>
 </div>
@@ -176,9 +176,9 @@ myTable.leftOuterJoin("split(a) as (word, length)").select("a, word, length");
 
 // Use the table function in SQL with LATERAL and TABLE keywords.
 // CROSS JOIN a table function (equivalent to "join" in Table API).
-tableEnv.sql("SELECT a, word, length FROM MyTable, LATERAL TABLE(split(a)) as T(word, length)");
+tableEnv.sqlQuery("SELECT a, word, length FROM MyTable, LATERAL TABLE(split(a)) as T(word, length)");
 // LEFT JOIN a table function (equivalent to "leftOuterJoin" in Table API).
-tableEnv.sql("SELECT a, word, length FROM MyTable LEFT JOIN LATERAL TABLE(split(a)) as T(word, length) ON TRUE");
+tableEnv.sqlQuery("SELECT a, word, length FROM MyTable LEFT JOIN LATERAL TABLE(split(a)) as T(word, length) ON TRUE");
 {% endhighlight %}
 </div>
 
@@ -206,9 +206,9 @@ tableEnv.registerFunction("split", new Split("#"))
 
 // Use the table function in SQL with LATERAL and TABLE keywords.
 // CROSS JOIN a table function (equivalent to "join" in Table API)
-tableEnv.sql("SELECT a, word, length FROM MyTable, LATERAL TABLE(split(a)) as T(word, length)");
+tableEnv.sqlQuery("SELECT a, word, length FROM MyTable, LATERAL TABLE(split(a)) as T(word, length)");
 // LEFT JOIN a table function (equivalent to "leftOuterJoin" in Table API)
-tableEnv.sql("SELECT a, word, length FROM MyTable LEFT JOIN TABLE(split(a)) as T(word, length) ON TRUE");
+tableEnv.sqlQuery("SELECT a, word, length FROM MyTable LEFT JOIN TABLE(split(a)) as T(word, length) ON TRUE");
 {% endhighlight %}
**IMPORTANT:** Do not implement TableFunction as a Scala object. A Scala object is a singleton and will cause concurrency issues.
 </div>
@@ -572,7 +572,7 @@ StreamTableEnvironment tEnv = ...
 tEnv.registerFunction("wAvg", new WeightedAvg());
 
 // use function
-tEnv.sql("SELECT user, wAvg(points, level) AS avgPoints FROM userScores GROUP BY user");
+tEnv.sqlQuery("SELECT user, wAvg(points, level) AS avgPoints FROM userScores GROUP BY user");
 
 {% endhighlight %}
 </div>
@@ -649,7 +649,7 @@ val tEnv: StreamTableEnvironment = ???
 tEnv.registerFunction("wAvg", new WeightedAvg())
 
 // use function
-tEnv.sql("SELECT user, wAvg(points, level) AS avgPoints FROM userScores GROUP BY user")
+tEnv.sqlQuery("SELECT user, wAvg(points, level) AS avgPoints FROM userScores GROUP BY user")
 
 {% endhighlight %}
 </div>
@@ -720,7 +720,7 @@ tableEnv.registerFunction("hashCode", new HashCode());
 myTable.select("string, string.hashCode(), hashCode(string)");
 
 // use the function in SQL
-tableEnv.sql("SELECT string, HASHCODE(string) FROM MyTable");
+tableEnv.sqlQuery("SELECT string, HASHCODE(string) FROM MyTable");
 {% endhighlight %}
 </div>
 
@@ -748,7 +748,7 @@ myTable.select('string, hashCode('string))
 
 // register and use the function in SQL
 tableEnv.registerFunction("hashCode", hashCode)
-tableEnv.sql("SELECT string, HASHCODE(string) FROM MyTable");
+tableEnv.sqlQuery("SELECT string, HASHCODE(string) FROM MyTable");
 {% endhighlight %}
 
 </div>
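
To illustrate the IMPORTANT note above: a minimal sketch of the recommended shape for a table function (the class body is a simplified stand-in for the documentation's Split example):

{% highlight scala %}
import org.apache.flink.table.functions.TableFunction

// avoid: a Scala `object` is a JVM-wide singleton, so parallel task
// instances would share one instance and concurrency issues can follow
// object Split extends TableFunction[String] { ... }

// prefer: a class, instantiated explicitly when registering the function
class Split(separator: String) extends TableFunction[String] {
  def eval(str: String): Unit = str.split(separator).foreach(collect)
}
{% endhighlight %}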

http://git-wip-us.apache.org/repos/asf/flink/blob/2cb37cb9/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/HBaseConnectorITCase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/HBaseConnectorITCase.java b/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/HBaseConnectorITCase.java
index 5d71ca5..3da4230 100644
--- a/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/HBaseConnectorITCase.java
+++ b/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/HBaseConnectorITCase.java
@@ -155,7 +155,7 @@ public class HBaseConnectorITCase extends HBaseTestingClusterAutostarter {
 		hbaseTable.addColumn(FAMILY3, F3COL3, String.class);
 		tableEnv.registerTableSource("hTable", hbaseTable);
 
-		Table result = tableEnv.sql(
+		Table result = tableEnv.sqlQuery(
 			"SELECT " +
 				"  h.family1.col1, " +
 				"  h.family2.col1, " +
@@ -196,7 +196,7 @@ public class HBaseConnectorITCase extends HBaseTestingClusterAutostarter {
 		hbaseTable.addColumn(FAMILY3, F3COL3, String.class);
 		tableEnv.registerTableSource("hTable", hbaseTable);
 
-		Table result = tableEnv.sql(
+		Table result = tableEnv.sqlQuery(
 			"SELECT " +
 				"  h.family1.col1, " +
 				"  h.family3.col1, " +
@@ -236,7 +236,7 @@ public class HBaseConnectorITCase extends HBaseTestingClusterAutostarter {
 		hbaseTable.addColumn(FAMILY3, F3COL3, String.class);
 		tableEnv.registerTableSource("hTable", hbaseTable);
 
-		Table result = tableEnv.sql(
+		Table result = tableEnv.sqlQuery(
 			"SELECT * FROM hTable AS h"
 		);
 		DataSet<Row> resultSet = tableEnv.toDataSet(result, Row.class);
@@ -270,7 +270,7 @@ public class HBaseConnectorITCase extends HBaseTestingClusterAutostarter {
 		tableEnv.registerFunction("toUTF8", new ToUTF8());
 		tableEnv.registerFunction("toLong", new ToLong());
 
-		Table result = tableEnv.sql(
+		Table result = tableEnv.sqlQuery(
 			"SELECT " +
 				"  toUTF8(h.family2.col1), " +
 				"  toLong(h.family2.col2) " +

http://git-wip-us.apache.org/repos/asf/flink/blob/2cb37cb9/flink-examples/flink-examples-table/src/main/java/org/apache/flink/table/examples/java/WordCountSQL.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-examples-table/src/main/java/org/apache/flink/table/examples/java/WordCountSQL.java b/flink-examples/flink-examples-table/src/main/java/org/apache/flink/table/examples/java/WordCountSQL.java
index 65efc17..22f0553 100644
--- a/flink-examples/flink-examples-table/src/main/java/org/apache/flink/table/examples/java/WordCountSQL.java
+++ b/flink-examples/flink-examples-table/src/main/java/org/apache/flink/table/examples/java/WordCountSQL.java
@@ -53,7 +53,7 @@ public class WordCountSQL {
 		tEnv.registerDataSet("WordCount", input, "word, frequency");
 
 		// run a SQL query on the Table and retrieve the result as a new Table
-		Table table = tEnv.sql(
+		Table table = tEnv.sqlQuery(
 			"SELECT word, SUM(frequency) as frequency FROM WordCount GROUP BY word");
 
 		DataSet<WC> result = tEnv.toDataSet(table, WC.class);

http://git-wip-us.apache.org/repos/asf/flink/blob/2cb37cb9/flink-examples/flink-examples-table/src/main/scala/org/apache/flink/table/examples/scala/StreamSQLExample.scala
----------------------------------------------------------------------
diff --git a/flink-examples/flink-examples-table/src/main/scala/org/apache/flink/table/examples/scala/StreamSQLExample.scala b/flink-examples/flink-examples-table/src/main/scala/org/apache/flink/table/examples/scala/StreamSQLExample.scala
index 665913e..3297aec 100644
--- a/flink-examples/flink-examples-table/src/main/scala/org/apache/flink/table/examples/scala/StreamSQLExample.scala
+++ b/flink-examples/flink-examples-table/src/main/scala/org/apache/flink/table/examples/scala/StreamSQLExample.scala
@@ -58,7 +58,7 @@ object StreamSQLExample {
     tEnv.registerDataStream("OrderB", orderB, 'user, 'product, 'amount)
 
     // union the two tables
-    val result = tEnv.sql(
+    val result = tEnv.sqlQuery(
       "SELECT * FROM OrderA WHERE amount > 2 UNION ALL " +
         "SELECT * FROM OrderB WHERE amount < 2")
 

http://git-wip-us.apache.org/repos/asf/flink/blob/2cb37cb9/flink-examples/flink-examples-table/src/main/scala/org/apache/flink/table/examples/scala/WordCountSQL.scala
----------------------------------------------------------------------
diff --git a/flink-examples/flink-examples-table/src/main/scala/org/apache/flink/table/examples/scala/WordCountSQL.scala b/flink-examples/flink-examples-table/src/main/scala/org/apache/flink/table/examples/scala/WordCountSQL.scala
index a8b8268..55bbdb5 100644
--- a/flink-examples/flink-examples-table/src/main/scala/org/apache/flink/table/examples/scala/WordCountSQL.scala
+++ b/flink-examples/flink-examples-table/src/main/scala/org/apache/flink/table/examples/scala/WordCountSQL.scala
@@ -48,7 +48,7 @@ object WordCountSQL {
     tEnv.registerDataSet("WordCount", input, 'word, 'frequency)
 
     // run a SQL query on the Table and retrieve the result as a new Table
-    val table = tEnv.sql("SELECT word, SUM(frequency) FROM WordCount GROUP BY word")
+    val table = tEnv.sqlQuery("SELECT word, SUM(frequency) FROM WordCount GROUP BY word")
 
     table.toDataSet[WC].print()
   }

http://git-wip-us.apache.org/repos/asf/flink/blob/2cb37cb9/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala
index a9d60dd..bca5826 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala
@@ -36,7 +36,7 @@ import org.apache.flink.table.expressions.{Expression, TimeAttribute}
 import org.apache.flink.table.plan.nodes.FlinkConventions
 import org.apache.flink.table.plan.nodes.dataset.DataSetRel
 import org.apache.flink.table.plan.rules.FlinkRuleSets
-import org.apache.flink.table.plan.schema.{DataSetTable, RowSchema, TableSourceTable}
+import org.apache.flink.table.plan.schema.{DataSetTable, RowSchema, TableSinkTable, TableSourceTable}
 import org.apache.flink.table.runtime.MapRunner
 import org.apache.flink.table.sinks.{BatchTableSink, TableSink}
 import org.apache.flink.table.sources.{BatchTableSource, TableSource}
@@ -106,6 +106,54 @@ abstract class BatchTableEnvironment(
   }
 
   /**
+    * Registers an external [[TableSink]] with given field names and types in this
+    * [[TableEnvironment]]'s catalog.
+    * Registered sink tables can be referenced in SQL DML statements.
+    *
+    * Example:
+    *
+    * {{{
+    *   // create a table sink and its field names and types
+    *   val fieldNames: Array[String] = Array("a", "b", "c")
+    *   val fieldTypes: Array[TypeInformation[_]] = Array(Types.STRING, Types.INT, Types.LONG)
+    *   val tableSink: BatchTableSink = new YourTableSinkImpl(...)
+    *
+    *   // register the table sink in the catalog
+    *   tableEnv.registerTableSink("output_table", fieldNames, fieldsTypes, tableSink)
+    *
+    *   // use the registered sink
+    *   tableEnv.sqlUpdate("INSERT INTO output_table SELECT a, b, c FROM sourceTable")
+    * }}}
+    *
+    * @param name       The name under which the [[TableSink]] is registered.
+    * @param fieldNames The field names to register with the [[TableSink]].
+    * @param fieldTypes The field types to register with the [[TableSink]].
+    * @param tableSink  The [[TableSink]] to register.
+    */
+  def registerTableSink(
+      name: String,
+      fieldNames: Array[String],
+      fieldTypes: Array[TypeInformation[_]],
+      tableSink: TableSink[_]): Unit = {
+
+    checkValidTableName(name)
+    if (fieldNames == null) throw TableException("fieldNames must not be null.")
+    if (fieldTypes == null) throw TableException("fieldTypes must not be null.")
+    if (fieldNames.length == 0) throw new TableException("fieldNames must not be empty.")
+    if (fieldNames.length != fieldTypes.length) {
+      throw new TableException("Same number of field names and types required.")
+    }
+
+    tableSink match {
+      case batchTableSink: BatchTableSink[_] =>
+        val configuredSink = batchTableSink.configure(fieldNames, fieldTypes)
+        registerTableInternal(name, new TableSinkTable(configuredSink))
+      case _ =>
+        throw new TableException("Only BatchTableSink can be registered in BatchTableEnvironment.")
+    }
+  }
+
+  /**
     * Writes a [[Table]] to a [[TableSink]].
     *
     * Internally, the [[Table]] is translated into a [[DataSet]] and handed over to the
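
The match at the end of the new batch registerTableSink() is the environment-specific part: only a BatchTableSink passes; anything else is rejected at registration time. A usage sketch (assuming CsvTableSink, which implements BatchTableSink):

{% highlight scala %}
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.table.api.{TableEnvironment, Types}
import org.apache.flink.table.sinks.CsvTableSink

val tEnv = TableEnvironment.getTableEnvironment(ExecutionEnvironment.getExecutionEnvironment)

// accepted: CsvTableSink is a BatchTableSink
tEnv.registerTableSink("csvOut",
  Array("a", "b", "c"),
  Array[TypeInformation[_]](Types.INT, Types.STRING, Types.LONG),
  new CsvTableSink("/path/to/file", ","))

// a sink that is not a BatchTableSink is rejected with
// TableException("Only BatchTableSink can be registered in BatchTableEnvironment.")
{% endhighlight %}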

http://git-wip-us.apache.org/repos/asf/flink/blob/2cb37cb9/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
index 8d8cebb..c7cc61b 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
@@ -42,12 +42,12 @@ import org.apache.flink.table.expressions._
 import org.apache.flink.table.plan.nodes.FlinkConventions
 import org.apache.flink.table.plan.nodes.datastream.{DataStreamRel, UpdateAsRetractionTrait}
 import org.apache.flink.table.plan.rules.FlinkRuleSets
-import org.apache.flink.table.plan.schema.{DataStreamTable, RowSchema, StreamTableSourceTable}
 import org.apache.flink.table.plan.util.UpdatingPlanChecker
 import org.apache.flink.table.runtime.conversion._
+import org.apache.flink.table.plan.schema.{DataStreamTable, RowSchema, StreamTableSourceTable, TableSinkTable}
 import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo}
 import org.apache.flink.table.runtime.{CRowMapRunner, OutputRowtimeProcessFunction}
-import org.apache.flink.table.sinks.{AppendStreamTableSink, RetractStreamTableSink, TableSink, UpsertStreamTableSink}
+import org.apache.flink.table.sinks._
 import org.apache.flink.table.sources.{DefinedRowtimeAttribute, StreamTableSource, TableSource}
 import org.apache.flink.table.typeutils.{TimeIndicatorTypeInfo, TypeCheckUtils}
 
@@ -79,7 +79,7 @@ abstract class StreamTableEnvironment(
   // the naming pattern for internally registered tables.
   private val internalNamePattern = "^_DataStreamTable_[0-9]+$".r
 
-  def queryConfig: StreamQueryConfig = new StreamQueryConfig
+  override def queryConfig: StreamQueryConfig = new StreamQueryConfig
 
   /**
     * Checks if the chosen table name is valid.
@@ -130,6 +130,60 @@ abstract class StreamTableEnvironment(
   }
 
   /**
+    * Registers an external [[TableSink]] with given field names and types in this
+    * [[TableEnvironment]]'s catalog.
+    * Registered sink tables can be referenced in SQL DML statements.
+    *
+    * Example:
+    *
+    * {{{
+    *   // create a table sink and its field names and types
+    *   val fieldNames: Array[String] = Array("a", "b", "c")
+    *   val fieldTypes: Array[TypeInformation[_]] = Array(Types.STRING, Types.INT, Types.LONG)
+    *   val tableSink: StreamTableSink = new YourTableSinkImpl(...)
+    *
+    *   // register the table sink in the catalog
+    *   tableEnv.registerTableSink("output_table", fieldNames, fieldsTypes, tableSink)
+    *
+    *   // use the registered sink
+    *   tableEnv.sqlUpdate("INSERT INTO output_table SELECT a, b, c FROM sourceTable")
+    * }}}
+    *
+    * @param name The name under which the [[TableSink]] is registered.
+    * @param fieldNames The field names to register with the [[TableSink]].
+    * @param fieldTypes The field types to register with the [[TableSink]].
+    * @param tableSink The [[TableSink]] to register.
+    */
+  def registerTableSink(
+      name: String,
+      fieldNames: Array[String],
+      fieldTypes: Array[TypeInformation[_]],
+      tableSink: TableSink[_]): Unit = {
+
+    checkValidTableName(name)
+    if (fieldNames == null) throw TableException("fieldNames must not be null.")
+    if (fieldTypes == null) throw TableException("fieldTypes must not be null.")
+    if (fieldNames.length == 0) throw new TableException("fieldNames must not be empty.")
+    if (fieldNames.length != fieldTypes.length) {
+      throw new TableException("Same number of field names and types required.")
+    }
+
+    tableSink match {
+      case streamTableSink@(
+        _: AppendStreamTableSink[_] |
+        _: UpsertStreamTableSink[_] |
+        _: RetractStreamTableSink[_]) =>
+
+        val configuredSink = streamTableSink.configure(fieldNames, fieldTypes)
+        registerTableInternal(name, new TableSinkTable(configuredSink))
+      case _ =>
+        throw new TableException(
+          "Only AppendStreamTableSink, UpsertStreamTableSink, and RetractStreamTableSink can be " +
+            "registered in StreamTableEnvironment.")
+    }
+  }
+
+  /**
     * Writes a [[Table]] to a [[TableSink]].
     *
     * Internally, the [[Table]] is translated into a [[DataStream]] and handed over to the
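
The streaming variant accepts three sink flavors; configure() binds the sink to the given schema before it enters the catalog. A sketch of what passes the match (sink choice and schema are illustrative; CsvTableSink also implements AppendStreamTableSink):

{% highlight scala %}
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.{TableEnvironment, Types}
import org.apache.flink.table.sinks.CsvTableSink

val tEnv = TableEnvironment.getTableEnvironment(StreamExecutionEnvironment.getExecutionEnvironment)

// accepted: AppendStreamTableSink (insert-only results),
//           RetractStreamTableSink (updates as add/retract messages),
//           UpsertStreamTableSink (updates as upserts on a key)
tEnv.registerTableSink("csvOut",
  Array("a", "b", "c"),
  Array[TypeInformation[_]](Types.INT, Types.STRING, Types.LONG),
  new CsvTableSink("/path/to/file", ","))
{% endhighlight %}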

http://git-wip-us.apache.org/repos/asf/flink/blob/2cb37cb9/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
index 2e9e18f..0424cf8 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
@@ -31,7 +31,7 @@ import org.apache.calcite.rel.RelNode
 import org.apache.calcite.rel.`type`.RelDataType
 import org.apache.calcite.schema.SchemaPlus
 import org.apache.calcite.schema.impl.AbstractTable
-import org.apache.calcite.sql.SqlOperatorTable
+import org.apache.calcite.sql._
 import org.apache.calcite.sql.parser.SqlParser
 import org.apache.calcite.sql.util.ChainedSqlOperatorTable
 import org.apache.calcite.tools._
@@ -49,13 +49,13 @@ import org.apache.flink.table.api.scala.{BatchTableEnvironment => ScalaBatchTabl
 import org.apache.flink.table.calcite.{FlinkPlannerImpl, FlinkRelBuilder, FlinkTypeFactory, FlinkTypeSystem}
 import org.apache.flink.table.catalog.{ExternalCatalog, ExternalCatalogSchema}
 import org.apache.flink.table.codegen.{ExpressionReducer, FunctionCodeGenerator, GeneratedFunction}
-import org.apache.flink.table.expressions.{Alias, Expression, UnresolvedFieldReference, _}
-import org.apache.flink.table.functions.utils.UserDefinedFunctionUtils.{checkForInstantiation, checkNotSingleton, createScalarSqlFunction, createTableSqlFunctions, _}
+import org.apache.flink.table.expressions._
+import org.apache.flink.table.functions.utils.UserDefinedFunctionUtils._
 import org.apache.flink.table.functions.{AggregateFunction, ScalarFunction, TableFunction}
 import org.apache.flink.table.plan.cost.DataSetCostFactory
 import org.apache.flink.table.plan.logical.{CatalogNode, LogicalRelNode}
 import org.apache.flink.table.plan.rules.FlinkRuleSets
-import org.apache.flink.table.plan.schema.{RelTable, RowSchema}
+import org.apache.flink.table.plan.schema.{RelTable, RowSchema, TableSinkTable}
 import org.apache.flink.table.sinks.TableSink
 import org.apache.flink.table.sources.{DefinedFieldNames, TableSource}
 import org.apache.flink.table.typeutils.TimeIndicatorTypeInfo
@@ -110,6 +110,13 @@ abstract class TableEnvironment(val config: TableConfig) {
   /** Returns the table config to define the runtime behavior of the Table API. */
   def getConfig: TableConfig = config
 
+  /** Returns a [[QueryConfig]] that depends on the concrete type of this TableEnvironment. */
+  private[flink] def queryConfig: QueryConfig = this match {
+    case _: BatchTableEnvironment => new BatchQueryConfig
+    case _: StreamTableEnvironment => new StreamQueryConfig
+    case _ => null
+  }
+
   /**
     * Returns the operator table for this environment including a custom Calcite configuration.
     */
@@ -276,7 +283,8 @@ abstract class TableEnvironment(val config: TableConfig) {
             s"${t.msg}\n" +
             s"Please check the documentation for the set of currently supported SQL features.")
       case a: AssertionError =>
-        throw a.getCause
+        // keep original exception stack for caller
+        throw a
     }
     output
   }
@@ -414,6 +422,22 @@ abstract class TableEnvironment(val config: TableConfig) {
   def registerTableSource(name: String, tableSource: TableSource[_]): Unit
 
   /**
+    * Registers an external [[TableSink]] with given field names and types in this
+    * [[TableEnvironment]]'s catalog.
+    * Registered sink tables can be referenced in SQL DML statements.
+    *
+    * @param name The name under which the [[TableSink]] is registered.
+    * @param fieldNames The field names to register with the [[TableSink]].
+    * @param fieldTypes The field types to register with the [[TableSink]].
+    * @param tableSink The [[TableSink]] to register.
+    */
+  def registerTableSink(
+      name: String,
+      fieldNames: Array[String],
+      fieldTypes: Array[TypeInformation[_]],
+      tableSink: TableSink[_]): Unit
+
+  /**
     * Replaces a registered Table with another Table under the same name.
     * We use this method to replace a [[org.apache.flink.table.plan.schema.DataStreamTable]]
     * with a [[org.apache.calcite.schema.TranslatableTable]].
@@ -489,9 +513,10 @@ abstract class TableEnvironment(val config: TableConfig) {
   /**
     * Evaluates a SQL query on registered tables and retrieves the result as a [[Table]].
     *
-    * All tables referenced by the query must be registered in the TableEnvironment. But
-    * [[Table.toString]] will automatically register an unique table name and return the
-    * table name. So it allows to call SQL directly on tables like this:
+    * All tables referenced by the query must be registered in the TableEnvironment.
+    * A [[Table]] is automatically registered when its [[toString]] method is called, for example
+    * when it is embedded into a String.
+    * Hence, SQL queries can directly reference a [[Table]] as follows:
     *
     * {{{
     *   val table: Table = ...
@@ -502,16 +527,110 @@ abstract class TableEnvironment(val config: TableConfig) {
     * @param query The SQL query to evaluate.
     * @return The result of the query as Table.
     */
+  @deprecated("Please use sqlQuery() instead.")
   def sql(query: String): Table = {
+    sqlQuery(query)
+  }
+
+  /**
+    * Evaluates a SQL query on registered tables and retrieves the result as a [[Table]].
+    *
+    * All tables referenced by the query must be registered in the TableEnvironment.
+    * A [[Table]] is automatically registered when its [[toString]] method is called, for example
+    * when it is embedded into a String.
+    * Hence, SQL queries can directly reference a [[Table]] as follows:
+    *
+    * {{{
+    *   val table: Table = ...
+    *   // the table is not registered to the table environment
+    *   tEnv.sqlQuery(s"SELECT * FROM $table")
+    * }}}
+    *
+    * @param query The SQL query to evaluate.
+    * @return The result of the query as Table
+    */
+  def sqlQuery(query: String): Table = {
     val planner = new FlinkPlannerImpl(getFrameworkConfig, getPlanner, getTypeFactory)
     // parse the sql query
     val parsed = planner.parse(query)
-    // validate the sql query
-    val validated = planner.validate(parsed)
-    // transform to a relational tree
-    val relational = planner.rel(validated)
+    if (null != parsed && parsed.getKind.belongsTo(SqlKind.QUERY)) {
+      // validate the sql query
+      val validated = planner.validate(parsed)
+      // transform to a relational tree
+      val relational = planner.rel(validated)
+      new Table(this, LogicalRelNode(relational.rel))
+    } else {
+      throw new TableException(
+        "Unsupported SQL query! sqlQuery() only accepts SQL queries of type " +
+          "SELECT, UNION, INTERSECT, EXCEPT, VALUES, and ORDER_BY.")
+    }
+  }
+
+  /**
+    * Evaluates a SQL statement such as INSERT, UPDATE or DELETE, or a DDL statement.
+    * NOTE: Currently only SQL INSERT statements are supported.
+    *
+    * All tables referenced by the query must be registered in the TableEnvironment.
+    * A [[Table]] is automatically registered when its [[toString]] method is called, for example
+    * when it is embedded into a String.
+    * Hence, SQL queries can directly reference a [[Table]] as follows:
+    *
+    * {{{
+    *   // register the table sink into which the result is inserted.
+    *   tEnv.registerTableSink("sinkTable", fieldNames, fieldsTypes, tableSink)
+    *   val sourceTable: Table = ...
+    *   // sourceTable is not registered to the table environment
+    *   tEnv.sqlUpdate(s"INSERT INTO sinkTable SELECT * FROM $sourceTable")
+    * }}}
+    *
+    * @param stmt The SQL statement to evaluate.
+    */
+  def sqlUpdate(stmt: String): Unit = {
+    sqlUpdate(stmt, this.queryConfig)
+  }
 
-    new Table(this, LogicalRelNode(relational.rel))
+  /**
+    * Evaluates a SQL statement such as INSERT, UPDATE or DELETE, or a DDL statement.
+    * NOTE: Currently only SQL INSERT statements are supported.
+    *
+    * All tables referenced by the query must be registered in the TableEnvironment.
+    * A [[Table]] is automatically registered when its [[toString]] method is called, for example
+    * when it is embedded into a String.
+    * Hence, SQL queries can directly reference a [[Table]] as follows:
+    *
+    * {{{
+    *   // register the table sink into which the result is inserted.
+    *   tEnv.registerTableSink("sinkTable", fieldNames, fieldsTypes, tableSink)
+    *   val sourceTable: Table = ...
+    *   // sourceTable is not registered to the table environment
+    *   tEnv.sqlUpdate(s"INSERT INTO sinkTable SELECT * FROM $sourceTable")
+    * }}}
+    *
+    * @param stmt The SQL statement to evaluate.
+    * @param config The [[QueryConfig]] to use.
+    */
+  def sqlUpdate(stmt: String, config: QueryConfig): Unit = {
+    val planner = new FlinkPlannerImpl(getFrameworkConfig, getPlanner, getTypeFactory)
+    // parse the sql query
+    val parsed = planner.parse(stmt)
+    parsed match {
+      case insert: SqlInsert =>
+        // validate the SQL query
+        val query = insert.getSource
+        planner.validate(query)
+
+        // get query result as Table
+        val queryResult = new Table(this, LogicalRelNode(planner.rel(query).rel))
+
+        // get name of sink table
+        val targetTableName = insert.getTargetTable.asInstanceOf[SqlIdentifier].names.get(0)
+
+        // insert query result into sink table
+        insertInto(queryResult, targetTableName, config)
+      case _ =>
+        throw new TableException(
+          "Unsupported SQL query! sqlUpdate() only accepts SQL statements of type INSERT.")
+    }
   }
 
   /**
@@ -519,11 +638,62 @@ abstract class TableEnvironment(val config: TableConfig) {
     *
     * @param table The [[Table]] to write.
     * @param sink The [[TableSink]] to write the [[Table]] to.
+    * @param conf The [[QueryConfig]] to use.
     * @tparam T The data type that the [[TableSink]] expects.
     */
   private[flink] def writeToSink[T](table: Table, sink: TableSink[T], conf: QueryConfig): Unit
 
   /**
+    * Writes the [[Table]] to a [[TableSink]] that was registered under the specified name.
+    *
+    * @param table The table to write to the TableSink.
+    * @param sinkTableName The name of the registered TableSink.
+    * @param conf The query configuration to use.
+    */
+  private[flink] def insertInto(table: Table, sinkTableName: String, conf: QueryConfig): Unit = {
+
+    // check that sink table exists
+    if (null == sinkTableName) throw TableException("Name of TableSink must not be null.")
+    if (sinkTableName.isEmpty) throw TableException("Name of TableSink must not be empty.")
+    if (!isRegistered(sinkTableName)) {
+      throw TableException(s"No table was registered under the name $sinkTableName.")
+    }
+
+    getTable(sinkTableName) match {
+      case s: TableSinkTable[_] =>
+        val tableSink = s.tableSink
+        // validate schema of source table and table sink
+        val srcFieldTypes = table.getSchema.getTypes
+        val sinkFieldTypes = tableSink.getFieldTypes
+
+        if (srcFieldTypes.length != sinkFieldTypes.length ||
+          srcFieldTypes.zip(sinkFieldTypes).exists { case (srcF, snkF) => srcF != snkF }) {
+
+          val srcFieldNames = table.getSchema.getColumnNames
+          val sinkFieldNames = tableSink.getFieldNames
+
+          // format table and table sink schema strings
+          val srcSchema = srcFieldNames.zip(srcFieldTypes)
+            .map { case (n, t) => s"$n: ${t.getTypeClass.getSimpleName}" }
+            .mkString("[", ", ", "]")
+          val sinkSchema = sinkFieldNames.zip(sinkFieldTypes)
+            .map { case (n, t) => s"$n: ${t.getTypeClass.getSimpleName}" }
+            .mkString("[", ", ", "]")
+
+          throw ValidationException(
+            s"Field types of query result and registered TableSink $sinkTableName do not match.\n" +
+              s"Query result schema: $srcSchema\n" +
+              s"TableSink schema:    $sinkSchema")
+        }
+        // emit the table to the configured table sink
+        writeToSink(table, tableSink, conf)
+      case _ =>
+        throw TableException(s"The table registered as $sinkTableName is not a TableSink. " +
+          s"You can only emit query results to a registered TableSink.")
+    }
+  }
+
+  /**
     * Registers a Calcite [[AbstractTable]] in the TableEnvironment's catalog.
     *
     * @param name The name under which the table will be registered.
@@ -554,10 +724,14 @@ abstract class TableEnvironment(val config: TableConfig) {
     * @param name The table name to check.
     * @return true, if a table is registered under the name, false otherwise.
     */
-  protected def isRegistered(name: String): Boolean = {
+  protected[flink] def isRegistered(name: String): Boolean = {
     rootSchema.getTableNames.contains(name)
   }
 
+  private def getTable(name: String): org.apache.calcite.schema.Table = {
+    rootSchema.getTable(name)
+  }
+
   protected def getRowType(name: String): RelDataType = {
     rootSchema.getTable(name).getRowType(typeFactory)
   }

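To see how the pieces above fit together end to end, here is a minimal sketch of the new INSERT INTO support in Scala. The table names, the field list, and the CsvTableSink target path are assumptions chosen for illustration; only registerTableSink(), sqlUpdate(), and insertInto() come from this commit.

    import org.apache.flink.api.common.typeinfo.TypeInformation
    import org.apache.flink.api.scala._
    import org.apache.flink.table.api.{TableEnvironment, Types}
    import org.apache.flink.table.api.scala._
    import org.apache.flink.table.sinks.CsvTableSink

    val env = ExecutionEnvironment.getExecutionEnvironment
    val tEnv = TableEnvironment.getTableEnvironment(env)

    // register a source table and a sink table in the catalog
    tEnv.registerDataSet("sourceTable", env.fromElements((1, "a"), (2, "b")), 'id, 'name)
    val fieldNames = Array("id", "name")
    val fieldTypes: Array[TypeInformation[_]] = Array(Types.INT, Types.STRING)
    tEnv.registerTableSink("sinkTable", fieldNames, fieldTypes,
      new CsvTableSink("/tmp/result.csv", ","))

    // sqlUpdate() parses the INSERT statement, validates the source query,
    // and delegates to insertInto(), which checks the sink schema
    tEnv.sqlUpdate("INSERT INTO sinkTable SELECT id, name FROM sourceTable")
    env.execute()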
http://git-wip-us.apache.org/repos/asf/flink/blob/2cb37cb9/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/queryConfig.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/queryConfig.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/queryConfig.scala
index c8fbab7..4aa5543 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/queryConfig.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/queryConfig.scala
@@ -19,6 +19,7 @@
 package org.apache.flink.table.api
 
 import _root_.java.io.Serializable
+
 import org.apache.flink.api.common.time.Time
 
 class QueryConfig private[table] extends Serializable {}

http://git-wip-us.apache.org/repos/asf/flink/blob/2cb37cb9/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala
index 2298575..30ed98e 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala
@@ -25,6 +25,7 @@ import org.apache.flink.table.expressions.{Alias, Asc, Expression, ExpressionPar
 import org.apache.flink.table.functions.utils.UserDefinedFunctionUtils
 import org.apache.flink.table.plan.ProjectionTranslator._
 import org.apache.flink.table.plan.logical.{Minus, _}
+import org.apache.flink.table.plan.schema.TableSinkTable
 import org.apache.flink.table.sinks.TableSink
 
 import _root_.scala.annotation.varargs
@@ -762,13 +763,10 @@ class Table(
     * @tparam T The data type that the [[TableSink]] expects.
     */
   def writeToSink[T](sink: TableSink[T]): Unit = {
-
-    def queryConfig = this.tableEnv match {
-      case s: StreamTableEnvironment => s.queryConfig
-      case b: BatchTableEnvironment => new BatchQueryConfig
-      case _ => null
+    val queryConfig = Option(this.tableEnv) match {
+      case None => null
+      case _ => this.tableEnv.queryConfig
     }
-
     writeToSink(sink, queryConfig)
   }
 
@@ -800,6 +798,37 @@ class Table(
   }
 
   /**
+    * Writes the [[Table]] to a [[TableSink]] that was registered under the specified name.
+    *
+    * A batch [[Table]] can only be written to a
+    * [[org.apache.flink.table.sinks.BatchTableSink]]; a streaming [[Table]] requires a
+    * [[org.apache.flink.table.sinks.AppendStreamTableSink]], a
+    * [[org.apache.flink.table.sinks.RetractStreamTableSink]], or an
+    * [[org.apache.flink.table.sinks.UpsertStreamTableSink]].
+    *
+    * @param tableName Name of the registered [[TableSink]] to which the [[Table]] is written.
+    */
+  def insertInto(tableName: String): Unit = {
+    tableEnv.insertInto(this, tableName, this.tableEnv.queryConfig)
+  }
+
+  /**
+    * Writes the [[Table]] to a [[TableSink]] that was registered under the specified name.
+    *
+    * A batch [[Table]] can only be written to a
+    * [[org.apache.flink.table.sinks.BatchTableSink]]; a streaming [[Table]] requires a
+    * [[org.apache.flink.table.sinks.AppendStreamTableSink]], a
+    * [[org.apache.flink.table.sinks.RetractStreamTableSink]], or an
+    * [[org.apache.flink.table.sinks.UpsertStreamTableSink]].
+    *
+    * @param tableName Name of the [[TableSink]] to which the [[Table]] is written.
+    * @param conf The [[QueryConfig]] to use.
+    */
+  def insertInto(tableName: String, conf: QueryConfig): Unit = {
+    tableEnv.insertInto(this, tableName, conf)
+  }
+
+  /**
     * Groups the records of a table by assigning them to windows defined by a time or row interval.
     *
     * For streaming tables of infinite size, grouping into windows is required to define finite

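The new Table.insertInto() methods added above are the Table API counterpart to sqlUpdate(). A minimal sketch, assuming the same "sourceTable" and "sinkTable" registrations as in the previous example:

    // equivalent to the sqlUpdate() call above, but via the Table API;
    // the no-arg variant uses the environment's default QueryConfig
    val result = tEnv.scan("sourceTable").select('id, 'name)
    result.insertInto("sinkTable")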
http://git-wip-us.apache.org/repos/asf/flink/blob/2cb37cb9/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkPlannerImpl.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkPlannerImpl.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkPlannerImpl.scala
index beb2436..4d9acaa 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkPlannerImpl.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkPlannerImpl.scala
@@ -57,7 +57,6 @@ class FlinkPlannerImpl(
   val defaultSchema: SchemaPlus = config.getDefaultSchema
 
   var validator: FlinkCalciteSqlValidator = _
-  var validatedSqlNode: SqlNode = _
   var root: RelRoot = _
 
   private def ready() {
@@ -85,16 +84,15 @@ class FlinkPlannerImpl(
     validator = new FlinkCalciteSqlValidator(operatorTable, createCatalogReader, typeFactory)
     validator.setIdentifierExpansion(true)
     try {
-      validatedSqlNode = validator.validate(sqlNode)
+      validator.validate(sqlNode)
     }
     catch {
       case e: RuntimeException =>
         throw new ValidationException(s"SQL validation failed. ${e.getMessage}", e)
     }
-    validatedSqlNode
   }
 
-  def rel(sql: SqlNode): RelRoot = {
+  def rel(validatedSqlNode: SqlNode): RelRoot = {
     try {
       assert(validatedSqlNode != null)
       val rexBuilder: RexBuilder = createRexBuilder

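The FlinkPlannerImpl change above removes the mutable validatedSqlNode field, so rel() no longer reads planner state; callers thread the validated node through explicitly, as sqlQuery() and sqlUpdate() now do:

    // parse, validate, and translate are now explicit, stateless steps
    val parsed = planner.parse(stmt)         // SqlNode
    val validated = planner.validate(parsed) // validated SqlNode
    val relRoot = planner.rel(validated)     // RelRoot for the optimizer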
http://git-wip-us.apache.org/repos/asf/flink/blob/2cb37cb9/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/TableSinkTable.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/TableSinkTable.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/TableSinkTable.scala
new file mode 100644
index 0000000..f5e80d5
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/TableSinkTable.scala
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.schema
+
+import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeFactory}
+import org.apache.calcite.schema.Statistic
+import org.apache.calcite.schema.impl.AbstractTable
+import org.apache.flink.table.calcite.FlinkTypeFactory
+import org.apache.flink.table.plan.stats.FlinkStatistic
+import org.apache.flink.table.sinks.TableSink
+
+/** A table which defines an external table via a [[TableSink]]. */
+class TableSinkTable[T](
+    val tableSink: TableSink[T],
+    val statistic: FlinkStatistic = FlinkStatistic.UNKNOWN)
+  extends AbstractTable {
+
+  override def getRowType(typeFactory: RelDataTypeFactory): RelDataType = {
+    val flinkTypeFactory = typeFactory.asInstanceOf[FlinkTypeFactory]
+    flinkTypeFactory.buildLogicalRowType(tableSink.getFieldNames, tableSink.getFieldTypes)
+  }
+
+  /**
+    * Returns the statistics of the current table.
+    *
+    * @return The statistics of the current table.
+    */
+  override def getStatistic: Statistic = statistic
+}

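TableSinkTable is what makes a registered sink visible in the Calcite catalog and to the schema check in insertInto(). A sketch of how registerTableSink() presumably wires it up; the configure() call follows the TableSink interface, but the actual registration code is not part of this hunk:

    // hypothetical: fix the declared schema on the sink, then register the
    // wrapped sink under the given name so getTable() can find it
    val configuredSink = tableSink.configure(fieldNames, fieldTypes)
    registerTableInternal(name, new TableSinkTable(configuredSink))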
http://git-wip-us.apache.org/repos/asf/flink/blob/2cb37cb9/flink-libraries/flink-table/src/test/java/org/apache/flink/table/runtime/batch/JavaTableSourceITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/table/runtime/batch/JavaTableSourceITCase.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/table/runtime/batch/JavaTableSourceITCase.java
index eb97afe..672b6fd 100644
--- a/flink-libraries/flink-table/src/test/java/org/apache/flink/table/runtime/batch/JavaTableSourceITCase.java
+++ b/flink-libraries/flink-table/src/test/java/org/apache/flink/table/runtime/batch/JavaTableSourceITCase.java
@@ -80,7 +80,7 @@ public class JavaTableSourceITCase extends TableProgramsCollectionTestBase {
 		tableEnv.registerTableSource("persons", csvTable);
 
 		Table result = tableEnv
-			.sql("SELECT `last`, FLOOR(id), score * 2 FROM persons WHERE score < 20");
+			.sqlQuery("SELECT `last`, FLOOR(id), score * 2 FROM persons WHERE score < 20");
 
 		DataSet<Row> resultSet = tableEnv.toDataSet(result, Row.class);
 		List<Row> results = resultSet.collect();

http://git-wip-us.apache.org/repos/asf/flink/blob/2cb37cb9/flink-libraries/flink-table/src/test/java/org/apache/flink/table/runtime/batch/sql/GroupingSetsITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/table/runtime/batch/sql/GroupingSetsITCase.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/table/runtime/batch/sql/GroupingSetsITCase.java
index 3c8a1cc..455e8ce 100644
--- a/flink-libraries/flink-table/src/test/java/org/apache/flink/table/runtime/batch/sql/GroupingSetsITCase.java
+++ b/flink-libraries/flink-table/src/test/java/org/apache/flink/table/runtime/batch/sql/GroupingSetsITCase.java
@@ -186,7 +186,7 @@ public class GroupingSetsITCase extends TableProgramsClusterTestBase {
 	 * @param expected Expected result.
 	 */
 	private void checkSql(String query, String expected) throws Exception {
-		Table resultTable = tableEnv.sql(query);
+		Table resultTable = tableEnv.sqlQuery(query);
 		DataSet<Row> resultDataSet = tableEnv.toDataSet(resultTable, Row.class);
 		List<Row> results = resultDataSet.collect();
 		TestBaseUtils.compareResultAsText(results, expected);
@@ -204,12 +204,12 @@ public class GroupingSetsITCase extends TableProgramsClusterTestBase {
 		};
 
 		// Execute first query and store results
-		Table resultTable1 = tableEnv.sql(query1);
+		Table resultTable1 = tableEnv.sqlQuery(query1);
 		DataSet<Row> resultDataSet1 = tableEnv.toDataSet(resultTable1, Row.class);
 		List<String> results1 = resultDataSet1.map(mapFunction).collect();
 
 		// Execute second query and store results
-		Table resultTable2 = tableEnv.sql(query2);
+		Table resultTable2 = tableEnv.sqlQuery(query2);
 		DataSet<Row> resultDataSet2 = tableEnv.toDataSet(resultTable2, Row.class);
 		List<String> results2 = resultDataSet2.map(mapFunction).collect();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/2cb37cb9/flink-libraries/flink-table/src/test/java/org/apache/flink/table/runtime/batch/sql/JavaSqlITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/table/runtime/batch/sql/JavaSqlITCase.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/table/runtime/batch/sql/JavaSqlITCase.java
index c5a394a..f9693fd 100644
--- a/flink-libraries/flink-table/src/test/java/org/apache/flink/table/runtime/batch/sql/JavaSqlITCase.java
+++ b/flink-libraries/flink-table/src/test/java/org/apache/flink/table/runtime/batch/sql/JavaSqlITCase.java
@@ -61,7 +61,7 @@ public class JavaSqlITCase extends TableProgramsCollectionTestBase {
 		String sqlQuery = "VALUES (1, 'Test', TRUE, DATE '1944-02-24', 12.4444444444444445)," +
 			"(2, 'Hello', TRUE, DATE '1944-02-24', 12.666666665)," +
 			"(3, 'World', FALSE, DATE '1944-12-24', 12.54444445)";
-		Table result = tableEnv.sql(sqlQuery);
+		Table result = tableEnv.sqlQuery(sqlQuery);
 
 		DataSet<Row> resultSet = tableEnv.toDataSet(result, Row.class);
 
@@ -83,7 +83,7 @@ public class JavaSqlITCase extends TableProgramsCollectionTestBase {
 		tableEnv.registerTable("T", in);
 
 		String sqlQuery = "SELECT a, c FROM T";
-		Table result = tableEnv.sql(sqlQuery);
+		Table result = tableEnv.sqlQuery(sqlQuery);
 
 		DataSet<Row> resultSet = tableEnv.toDataSet(result, Row.class);
 		List<Row> results = resultSet.collect();
@@ -106,7 +106,7 @@ public class JavaSqlITCase extends TableProgramsCollectionTestBase {
 		tableEnv.registerDataSet("DataSetTable", ds, "x, y, z");
 
 		String sqlQuery = "SELECT x FROM DataSetTable WHERE z LIKE '%Hello%'";
-		Table result = tableEnv.sql(sqlQuery);
+		Table result = tableEnv.sqlQuery(sqlQuery);
 
 		DataSet<Row> resultSet = tableEnv.toDataSet(result, Row.class);
 		List<Row> results = resultSet.collect();
@@ -123,7 +123,7 @@ public class JavaSqlITCase extends TableProgramsCollectionTestBase {
 		tableEnv.registerDataSet("AggTable", ds, "x, y, z");
 
 		String sqlQuery = "SELECT sum(x), min(x), max(x), count(y), avg(x) FROM AggTable";
-		Table result = tableEnv.sql(sqlQuery);
+		Table result = tableEnv.sqlQuery(sqlQuery);
 
 		DataSet<Row> resultSet = tableEnv.toDataSet(result, Row.class);
 		List<Row> results = resultSet.collect();
@@ -143,7 +143,7 @@ public class JavaSqlITCase extends TableProgramsCollectionTestBase {
 		tableEnv.registerDataSet("t2", ds2, "d, e, f, g, h");
 
 		String sqlQuery = "SELECT c, g FROM t1, t2 WHERE b = e";
-		Table result = tableEnv.sql(sqlQuery);
+		Table result = tableEnv.sqlQuery(sqlQuery);
 
 		DataSet<Row> resultSet = tableEnv.toDataSet(result, Row.class);
 		List<Row> results = resultSet.collect();
@@ -168,7 +168,7 @@ public class JavaSqlITCase extends TableProgramsCollectionTestBase {
 		tableEnv.registerDataSet("t1", ds1, "a, b");
 
 		String sqlQuery = "SELECT b['foo'] FROM t1";
-		Table result = tableEnv.sql(sqlQuery);
+		Table result = tableEnv.sqlQuery(sqlQuery);
 
 		DataSet<Row> resultSet = tableEnv.toDataSet(result, Row.class);
 		List<Row> results = resultSet.collect();

http://git-wip-us.apache.org/repos/asf/flink/blob/2cb37cb9/flink-libraries/flink-table/src/test/java/org/apache/flink/table/runtime/stream/sql/JavaSqlITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/table/runtime/stream/sql/JavaSqlITCase.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/table/runtime/stream/sql/JavaSqlITCase.java
index c6368d4..f3d0309 100644
--- a/flink-libraries/flink-table/src/test/java/org/apache/flink/table/runtime/stream/sql/JavaSqlITCase.java
+++ b/flink-libraries/flink-table/src/test/java/org/apache/flink/table/runtime/stream/sql/JavaSqlITCase.java
@@ -68,7 +68,7 @@ public class JavaSqlITCase extends StreamingMultipleProgramsTestBase {
 		tableEnv.registerTable("MyTableRow", in);
 
 		String sqlQuery = "SELECT a,c FROM MyTableRow";
-		Table result = tableEnv.sql(sqlQuery);
+		Table result = tableEnv.sqlQuery(sqlQuery);
 
 		DataStream<Row> resultSet = tableEnv.toAppendStream(result, Row.class);
 		resultSet.addSink(new StreamITCase.StringSink<Row>());
@@ -93,7 +93,7 @@ public class JavaSqlITCase extends StreamingMultipleProgramsTestBase {
 		tableEnv.registerTable("MyTable", in);
 
 		String sqlQuery = "SELECT * FROM MyTable";
-		Table result = tableEnv.sql(sqlQuery);
+		Table result = tableEnv.sqlQuery(sqlQuery);
 
 		DataStream<Row> resultSet = tableEnv.toAppendStream(result, Row.class);
 		resultSet.addSink(new StreamITCase.StringSink<Row>());
@@ -117,7 +117,7 @@ public class JavaSqlITCase extends StreamingMultipleProgramsTestBase {
 		tableEnv.registerDataStream("MyTable", ds, "a, b, c, d, e");
 
 		String sqlQuery = "SELECT a, b, e FROM MyTable WHERE c < 4";
-		Table result = tableEnv.sql(sqlQuery);
+		Table result = tableEnv.sqlQuery(sqlQuery);
 
 		DataStream<Row> resultSet = tableEnv.toAppendStream(result, Row.class);
 		resultSet.addSink(new StreamITCase.StringSink<Row>());
@@ -148,7 +148,7 @@ public class JavaSqlITCase extends StreamingMultipleProgramsTestBase {
 		String sqlQuery = "SELECT * FROM T1 " +
 							"UNION ALL " +
 							"(SELECT a, b, c FROM T2 WHERE a	< 3)";
-		Table result = tableEnv.sql(sqlQuery);
+		Table result = tableEnv.sqlQuery(sqlQuery);
 
 		DataStream<Row> resultSet = tableEnv.toAppendStream(result, Row.class);
 		resultSet.addSink(new StreamITCase.StringSink<Row>());

http://git-wip-us.apache.org/repos/asf/flink/blob/2cb37cb9/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/BatchTableEnvironmentTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/BatchTableEnvironmentTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/BatchTableEnvironmentTest.scala
index c9a7049..dde5569 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/BatchTableEnvironmentTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/BatchTableEnvironmentTest.scala
@@ -31,7 +31,7 @@ class BatchTableEnvironmentTest extends TableTestBase {
     val util = batchTestUtil()
     val table = util.addTable[(Long, Int, String)]("tableName", 'a, 'b, 'c)
 
-    val sqlTable = util.tableEnv.sql(s"SELECT a, b, c FROM $table WHERE b > 12")
+    val sqlTable = util.tableEnv.sqlQuery(s"SELECT a, b, c FROM $table WHERE b > 12")
 
     val expected = unaryNode(
       "DataSetCalc",
@@ -43,7 +43,7 @@ class BatchTableEnvironmentTest extends TableTestBase {
 
     val table2 = util.addTable[(Long, Int, String)]('d, 'e, 'f)
 
-    val sqlTable2 = util.tableEnv.sql(s"SELECT d, e, f FROM $table, $table2 WHERE c = d")
+    val sqlTable2 = util.tableEnv.sqlQuery(s"SELECT d, e, f FROM $table, $table2 WHERE c = d")
 
     val join = unaryNode(
       "DataSetJoin",

http://git-wip-us.apache.org/repos/asf/flink/blob/2cb37cb9/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/validation/CalcValidationTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/validation/CalcValidationTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/validation/CalcValidationTest.scala
index 9aada9a..69b12b2 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/validation/CalcValidationTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/validation/CalcValidationTest.scala
@@ -34,6 +34,6 @@ class CalcValidationTest extends TableTestBase {
 
     val sqlQuery = "SELECT a, foo FROM MyTable"
 
-    util.tableEnv.sql(sqlQuery)
+    util.tableEnv.sqlQuery(sqlQuery)
   }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/2cb37cb9/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/validation/InsertIntoValidationTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/validation/InsertIntoValidationTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/validation/InsertIntoValidationTest.scala
new file mode 100644
index 0000000..ef9e6a3
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/validation/InsertIntoValidationTest.scala
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.batch.sql.validation
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.scala._
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.api.{Types, ValidationException}
+import org.apache.flink.table.utils.{MemoryTableSinkUtil, TableTestBase}
+import org.junit._
+
+class InsertIntoValidationTest extends TableTestBase {
+
+  @Test(expected = classOf[ValidationException])
+  def testInconsistentLengthInsert(): Unit = {
+    val util = batchTestUtil()
+    util.addTable[(Int, Long, String)]("sourceTable", 'a, 'b, 'c)
+
+    val fieldNames = Array("d", "e")
+    val fieldTypes: Array[TypeInformation[_]] = Array(Types.INT, Types.LONG)
+    val sink = new MemoryTableSinkUtil.UnsafeMemoryAppendTableSink
+    util.tableEnv.registerTableSink("targetTable", fieldNames, fieldTypes, sink)
+
+    val sql = "INSERT INTO targetTable SELECT a, b, c FROM sourceTable"
+
+    // must fail because table sink schema has too few fields
+    util.tableEnv.sqlUpdate(sql)
+  }
+
+  @Test(expected = classOf[ValidationException])
+  def testUnmatchedTypesInsert(): Unit = {
+    val util = batchTestUtil()
+    util.addTable[(Int, Long, String)]("sourceTable", 'a, 'b, 'c)
+
+    val fieldNames = Array("d", "e", "f")
+    val fieldTypes: Array[TypeInformation[_]] = Array(Types.STRING, Types.INT, Types.LONG)
+    val sink = new MemoryTableSinkUtil.UnsafeMemoryAppendTableSink
+    util.tableEnv.registerTableSink("targetTable", fieldNames, fieldTypes, sink)
+
+    val sql = "INSERT INTO targetTable SELECT a, b, c FROM sourceTable"
+
+    // must fail because types of table sink do not match query result
+    util.tableEnv.sqlUpdate(sql)
+  }
+
+  @Test(expected = classOf[ValidationException])
+  def testUnsupportedPartialInsert(): Unit = {
+    val util = batchTestUtil()
+    util.addTable[(Int, Long, String)]("sourceTable", 'a, 'b, 'c)
+
+    val fieldNames = Array("d", "e", "f")
+    val fieldTypes = util.tableEnv.scan("sourceTable").getSchema.getTypes
+    val sink = new MemoryTableSinkUtil.UnsafeMemoryAppendTableSink
+    util.tableEnv.registerTableSink("targetTable", fieldNames, fieldTypes, sink)
+
+    val sql = "INSERT INTO targetTable (d, f) SELECT a, c FROM sourceTable"
+
+    // must fail because partial insert is not supported yet.
+    util.tableEnv.sqlUpdate(sql, util.tableEnv.queryConfig)
+  }
+}

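For reference, the schema check in insertInto() above rejects the testUnmatchedTypesInsert case with a message along these lines (the rendered simple names follow the formatting code in TableEnvironment and are stated here as an assumption):

    org.apache.flink.table.api.ValidationException:
    Field types of query result and registered TableSink targetTable do not match.
    Query result schema: [a: Integer, b: Long, c: String]
    TableSink schema:    [d: String, e: Integer, f: Long]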
http://git-wip-us.apache.org/repos/asf/flink/blob/2cb37cb9/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/validation/JoinValidationTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/validation/JoinValidationTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/validation/JoinValidationTest.scala
index 90bcfec..d9e0e10 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/validation/JoinValidationTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/validation/JoinValidationTest.scala
@@ -35,7 +35,7 @@ class JoinValidationTest extends TableTestBase {
 
     val sqlQuery = "SELECT c, g FROM Table3, Table5 WHERE foo = e"
 
-    util.tableEnv.sql(sqlQuery)
+    util.tableEnv.sqlQuery(sqlQuery)
   }
 
   @Test(expected = classOf[TableException])
@@ -46,7 +46,7 @@ class JoinValidationTest extends TableTestBase {
 
     val sqlQuery = "SELECT c, g FROM Table3, Table5 WHERE a = g"
 
-    util.tableEnv.sql(sqlQuery).toDataSet[Row]
+    util.tableEnv.sqlQuery(sqlQuery).toDataSet[Row]
   }
 
   @Test(expected = classOf[ValidationException])
@@ -57,7 +57,7 @@ class JoinValidationTest extends TableTestBase {
 
     val sqlQuery = "SELECT c, g FROM Table3, Table5 WHERE a = d"
 
-    util.tableEnv.sql(sqlQuery).toDataSet[Row]
+    util.tableEnv.sqlQuery(sqlQuery).toDataSet[Row]
   }
 
   @Test(expected = classOf[TableException])
@@ -68,7 +68,7 @@ class JoinValidationTest extends TableTestBase {
 
     val sqlQuery = "SELECT c, g FROM Table3, Table5 WHERE d = f"
 
-    util.tableEnv.sql(sqlQuery).toDataSet[Row]
+    util.tableEnv.sqlQuery(sqlQuery).toDataSet[Row]
   }
 
   @Test(expected = classOf[TableException])
@@ -79,7 +79,7 @@ class JoinValidationTest extends TableTestBase {
 
     val sqlQuery = "SELECT a, a1 FROM Table3 CROSS JOIN Table4"
 
-    util.tableEnv.sql(sqlQuery).toDataSet[Row]
+    util.tableEnv.sqlQuery(sqlQuery).toDataSet[Row]
   }
 
   @Test(expected = classOf[TableException])
@@ -90,7 +90,7 @@ class JoinValidationTest extends TableTestBase {
 
     val sqlQuery = "SELECT c, g FROM Table3 RIGHT OUTER JOIN Table5 ON b = e and a > d"
 
-    util.tableEnv.sql(sqlQuery).toDataSet[Row]
+    util.tableEnv.sqlQuery(sqlQuery).toDataSet[Row]
   }
 
   @Test(expected = classOf[TableException])
@@ -101,7 +101,7 @@ class JoinValidationTest extends TableTestBase {
 
     val sqlQuery = "SELECT c, g FROM Table3 LEFT OUTER JOIN Table5 ON b = e and a > d"
 
-    util.tableEnv.sql(sqlQuery).toDataSet[Row]
+    util.tableEnv.sqlQuery(sqlQuery).toDataSet[Row]
   }
 
   @Test(expected = classOf[TableException])
@@ -112,7 +112,7 @@ class JoinValidationTest extends TableTestBase {
 
     val sqlQuery = "SELECT c, g FROM Table3 FULL OUTER JOIN Table5 ON b = e and a > d"
 
-    util.tableEnv.sql(sqlQuery).toDataSet[Row]
+    util.tableEnv.sqlQuery(sqlQuery).toDataSet[Row]
   }
 
   @Test(expected = classOf[TableException])
@@ -123,7 +123,7 @@ class JoinValidationTest extends TableTestBase {
 
     val sqlQuery = "SELECT c, g FROM Table3 RIGHT OUTER JOIN Table5 ON b = e and e > 3"
 
-    util.tableEnv.sql(sqlQuery).toDataSet[Row]
+    util.tableEnv.sqlQuery(sqlQuery).toDataSet[Row]
   }
 
   @Test(expected = classOf[TableException])
@@ -134,7 +134,7 @@ class JoinValidationTest extends TableTestBase {
 
     val sqlQuery = "SELECT c, g FROM Table3 LEFT OUTER JOIN Table5 ON b = e and b > 3"
 
-    util.tableEnv.sql(sqlQuery).toDataSet[Row]
+    util.tableEnv.sqlQuery(sqlQuery).toDataSet[Row]
   }
 
   @Test(expected = classOf[TableException])
@@ -145,6 +145,6 @@ class JoinValidationTest extends TableTestBase {
 
     val sqlQuery = "SELECT c, g FROM Table3 FULL OUTER JOIN Table5 ON b = e and b > 3"
 
-    util.tableEnv.sql(sqlQuery).toDataSet[Row]
+    util.tableEnv.sqlQuery(sqlQuery).toDataSet[Row]
   }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/2cb37cb9/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/validation/OverWindowValidationTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/validation/OverWindowValidationTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/validation/OverWindowValidationTest.scala
index 7e72a21..dfbdd5a 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/validation/OverWindowValidationTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/validation/OverWindowValidationTest.scala
@@ -41,7 +41,7 @@ class OverWindowValidationTest extends TableTestBase {
     util.addFunction("overAgg", new OverAgg0)
 
     val sqlQuery = "SELECT overAgg(b, a) FROM T"
-    util.tableEnv.sql(sqlQuery)
+    util.tableEnv.sqlQuery(sqlQuery)
   }
 
   /**
@@ -55,6 +55,6 @@ class OverWindowValidationTest extends TableTestBase {
 
     val sqlQuery = "SELECT overAgg(b, a) FROM T GROUP BY TUMBLE(ts, INTERVAL '2' HOUR)"
 
-    util.tableEnv.sql(sqlQuery)
+    util.tableEnv.sqlQuery(sqlQuery)
   }
 }