Posted to commits@flink.apache.org by tw...@apache.org on 2020/06/23 16:31:37 UTC

[flink] branch release-1.11 updated (52fa6ab -> 77ff122)

This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a change to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git.


    from 52fa6ab  [FLINK-18348] RemoteInputChannel should checkError before checking partitionRequestClient
     new 8a87231  [hotfix][table] fix typos in PlannerBase javadoc
     new 880a5d7  [hotfix][table] fix typos in TableEnvironment javadoc
     new fc886eb  [hotfix][table] Code cleanup: use new methods introduced in FLIP-84 instead of deprecated methods
     new a2d90fe  [FLINK-17599][docs] Update documents due to FLIP-84
     new a4797e4  [FLINK-17599][docs] Add documents for DESCRIBE statement
     new 6818f94  [FLINK-17599][docs] Add documents for EXPLAIN statement
     new d9e3e20  [FLINK-17599][docs] Add documents for USE statement
     new 77ff122  [FLINK-17599][docs] Add documents for SHOW statement

The 8 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 docs/dev/table/catalogs.md                         | 100 ++++++--
 docs/dev/table/catalogs.zh.md                      | 101 ++++++--
 docs/dev/table/common.md                           | 210 +++++-----------
 docs/dev/table/common.zh.md                        | 201 +++++----------
 docs/dev/table/connect.md                          |   6 +-
 docs/dev/table/connect.zh.md                       |   6 +-
 docs/dev/table/sql/alter.md                        |  34 +--
 docs/dev/table/sql/alter.zh.md                     |  34 +--
 docs/dev/table/sql/create.md                       |  38 +--
 docs/dev/table/sql/create.zh.md                    |  44 ++--
 docs/dev/table/sql/describe.md                     | 202 +++++++++++++++
 docs/dev/table/sql/describe.zh.md                  | 201 +++++++++++++++
 docs/dev/table/sql/drop.md                         |  34 +--
 docs/dev/table/sql/drop.zh.md                      |  34 +--
 docs/dev/table/sql/explain.md                      | 180 +++++++++++++
 docs/dev/table/sql/explain.zh.md                   | 182 ++++++++++++++
 docs/dev/table/sql/index.md                        |   4 +
 docs/dev/table/sql/index.zh.md                     |   6 +-
 docs/dev/table/sql/insert.md                       |  92 +++++--
 docs/dev/table/sql/insert.zh.md                    |  91 +++++--
 docs/dev/table/sql/queries.md                      | 181 ++++++++------
 docs/dev/table/sql/queries.zh.md                   | 183 +++++++-------
 docs/dev/table/sql/show.md                         | 277 +++++++++++++++++++++
 docs/dev/table/sql/show.zh.md                      | 277 +++++++++++++++++++++
 docs/dev/table/sql/use.md                          | 200 +++++++++++++++
 docs/dev/table/sql/use.zh.md                       | 199 +++++++++++++++
 docs/dev/table/streaming/query_configuration.md    |   6 +-
 docs/dev/table/streaming/query_configuration.zh.md |   4 +-
 docs/dev/table/tableApi.md                         |  14 +-
 docs/dev/table/tableApi.zh.md                      |  14 +-
 .../jdbc/table/JdbcLookupTableITCase.java          |   2 +-
 .../flink/sql/tests/BatchSQLTestProgram.java       |   2 +-
 .../apache/flink/table/api/TableEnvironment.java   |   1 -
 .../table/planner/delegation/PlannerBase.scala     |   2 +-
 .../flink/table/api/TableEnvironmentTest.scala     |   5 +-
 .../validation/LegacyTableSinkValidationTest.scala |   3 +-
 .../planner/runtime/FileSystemITCaseBase.scala     |   4 +-
 .../runtime/batch/table/TableSinkITCase.scala      |  28 +--
 .../runtime/stream/table/TableSinkITCase.scala     |  86 +++----
 39 files changed, 2562 insertions(+), 726 deletions(-)
 create mode 100644 docs/dev/table/sql/describe.md
 create mode 100644 docs/dev/table/sql/describe.zh.md
 create mode 100644 docs/dev/table/sql/explain.md
 create mode 100644 docs/dev/table/sql/explain.zh.md
 create mode 100644 docs/dev/table/sql/show.md
 create mode 100644 docs/dev/table/sql/show.zh.md
 create mode 100644 docs/dev/table/sql/use.md
 create mode 100644 docs/dev/table/sql/use.zh.md


[flink] 05/08: [FLINK-17599][docs] Add documents for DESCRIBE statement

Posted by tw...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit a4797e43421d61d1d5eb82cc528b31e43ec73f04
Author: godfreyhe <go...@163.com>
AuthorDate: Wed Jun 10 10:34:51 2020 +0800

    [FLINK-17599][docs] Add documents for DESCRIBE statement
---
 docs/dev/table/sql/describe.md    | 202 ++++++++++++++++++++++++++++++++++++++
 docs/dev/table/sql/describe.zh.md | 201 +++++++++++++++++++++++++++++++++++++
 docs/dev/table/sql/index.md       |   1 +
 docs/dev/table/sql/index.zh.md    |   1 +
 docs/dev/table/sql/queries.md     |  18 +---
 docs/dev/table/sql/queries.zh.md  |  18 +---
 6 files changed, 407 insertions(+), 34 deletions(-)

diff --git a/docs/dev/table/sql/describe.md b/docs/dev/table/sql/describe.md
new file mode 100644
index 0000000..de18869
--- /dev/null
+++ b/docs/dev/table/sql/describe.md
@@ -0,0 +1,202 @@
+---
+title: "DESCRIBE Statements"
+nav-parent_id: sql
+nav-pos: 7
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+* This will be replaced by the TOC
+{:toc}
+
+DESCRIBE statements are used to describe the schema of a table or a view.
+
+
+## Run a DESCRIBE statement
+
+DESCRIBE statements can be executed with the `executeSql()` method of the `TableEnvironment`, or in the [SQL CLI]({{ site.baseurl }}/dev/table/sqlClient.html). For a successful DESCRIBE operation, the `executeSql()` method returns the schema of the given table; otherwise, an exception is thrown.
+
+The following examples show how to run a DESCRIBE statement in `TableEnvironment` and in SQL CLI.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+EnvironmentSettings settings = EnvironmentSettings.newInstance()...
+TableEnvironment tableEnv = TableEnvironment.create(settings);
+
+// register a table named "Orders"
+tableEnv.executeSql(
+        "CREATE TABLE Orders (" +
+        " `user` BIGINT NOT NULl," +
+        " product VARCHAR(32)," +
+        " amount INT," +
+        " ts TIMESTAMP(3)," +
+        " ptime AS PROCTIME()," +
+        " PRIMARY KEY(`user`) NOT ENFORCED," +
+        " WATERMARK FOR ts AS ts - INTERVAL '1' SECONDS" +
+        ") with (...)");
+
+// print the schema
+tableEnv.executeSql("DESCRIBE Orders").print();
+
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+val settings = EnvironmentSettings.newInstance()...
+val tableEnv = TableEnvironment.create(settings)
+
+// register a table named "Orders"
+tableEnv.executeSql(
+        "CREATE TABLE Orders (" +
+        " `user` BIGINT NOT NULl," +
+        " product VARCHAR(32)," +
+        " amount INT," +
+        " ts TIMESTAMP(3)," +
+        " ptime AS PROCTIME()," +
+        " PRIMARY KEY(`user`) NOT ENFORCED," +
+        " WATERMARK FOR ts AS ts - INTERVAL '1' SECONDS" +
+        ") with (...)")
+
+// print the schema
+tableEnv.executeSql("DESCRIBE Orders").print()
+
+{% endhighlight %}
+</div>
+
+<div data-lang="python" markdown="1">
+{% highlight python %}
+settings = EnvironmentSettings.new_instance()...
+table_env = StreamTableEnvironment.create(env, environment_settings=settings)
+
+# register a table named "Orders"
+table_env.execute_sql(
+        "CREATE TABLE Orders ("
+        " `user` BIGINT NOT NULL,"
+        " product VARCHAR(32),"
+        " amount INT,"
+        " ts TIMESTAMP(3),"
+        " ptime AS PROCTIME(),"
+        " PRIMARY KEY(`user`) NOT ENFORCED,"
+        " WATERMARK FOR ts AS ts - INTERVAL '1' SECONDS"
+        ") with (...)")
+
+# print the schema
+table_env.execute_sql("DESCRIBE Orders").print()
+
+{% endhighlight %}
+</div>
+
+<div data-lang="SQL CLI" markdown="1">
+{% highlight sql %}
+Flink SQL> CREATE TABLE Orders (
+>  `user` BIGINT NOT NULL,
+>  product VARCHAR(32),
+>  amount INT,
+>  ts TIMESTAMP(3),
+>  ptime AS PROCTIME(),
+>  PRIMARY KEY(`user`) NOT ENFORCED,
+>  WATERMARK FOR ts AS ts - INTERVAL '1' SECONDS
+> ) with (
+>  ...
+> );
+[INFO] Table has been created.
+
+Flink SQL> DESCRIBE Orders;
+
+{% endhighlight %}
+</div>
+</div>
+
+The result of the above example is:
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight text %}
+
++---------+----------------------------------+-------+-----------+-----------------+----------------------------+
+|    name |                             type |  null |       key | computed column |                  watermark |
++---------+----------------------------------+-------+-----------+-----------------+----------------------------+
+|    user |                           BIGINT | false | PRI(user) |                 |                            |
+| product |                      VARCHAR(32) |  true |           |                 |                            |
+|  amount |                              INT |  true |           |                 |                            |
+|      ts |           TIMESTAMP(3) *ROWTIME* |  true |           |                 | `ts` - INTERVAL '1' SECOND |
+|   ptime | TIMESTAMP(3) NOT NULL *PROCTIME* | false |           |      PROCTIME() |                            |
++---------+----------------------------------+-------+-----------+-----------------+----------------------------+
+5 rows in set
+
+{% endhighlight %}
+</div>
+<div data-lang="scala" markdown="1">
+{% highlight text %}
+
++---------+----------------------------------+-------+-----------+-----------------+----------------------------+
+|    name |                             type |  null |       key | computed column |                  watermark |
++---------+----------------------------------+-------+-----------+-----------------+----------------------------+
+|    user |                           BIGINT | false | PRI(user) |                 |                            |
+| product |                      VARCHAR(32) |  true |           |                 |                            |
+|  amount |                              INT |  true |           |                 |                            |
+|      ts |           TIMESTAMP(3) *ROWTIME* |  true |           |                 | `ts` - INTERVAL '1' SECOND |
+|   ptime | TIMESTAMP(3) NOT NULL *PROCTIME* | false |           |      PROCTIME() |                            |
++---------+----------------------------------+-------+-----------+-----------------+----------------------------+
+5 rows in set
+
+{% endhighlight %}
+</div>
+<div data-lang="python" markdown="1">
+{% highlight text %}
+
++---------+----------------------------------+-------+-----------+-----------------+----------------------------+
+|    name |                             type |  null |       key | computed column |                  watermark |
++---------+----------------------------------+-------+-----------+-----------------+----------------------------+
+|    user |                           BIGINT | false | PRI(user) |                 |                            |
+| product |                      VARCHAR(32) |  true |           |                 |                            |
+|  amount |                              INT |  true |           |                 |                            |
+|      ts |           TIMESTAMP(3) *ROWTIME* |  true |           |                 | `ts` - INTERVAL '1' SECOND |
+|   ptime | TIMESTAMP(3) NOT NULL *PROCTIME* | false |           |      PROCTIME() |                            |
++---------+----------------------------------+-------+-----------+-----------------+----------------------------+
+5 rows in set
+
+{% endhighlight %}
+</div>
+<div data-lang="SQL CLI" markdown="1">
+{% highlight text %}
+
+root
+ |-- user: BIGINT NOT NULL
+ |-- product: VARCHAR(32)
+ |-- amount: INT
+ |-- ts: TIMESTAMP(3) *ROWTIME*
+ |-- ptime: TIMESTAMP(3) NOT NULL *PROCTIME* AS PROCTIME()
+ |-- WATERMARK FOR ts AS `ts` - INTERVAL '1' SECOND
+ |-- CONSTRAINT PK_3599338 PRIMARY KEY (user)
+
+{% endhighlight %}
+</div>
+
+</div>
+
+
+{% top %}
+
+## Syntax
+
+{% highlight sql %}
+DESCRIBE [catalog_name.][db_name.]table_name
+{% endhighlight %}
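+
+For example, a table outside the current catalog and database can be described through its fully qualified path. A minimal sketch, assuming a hypothetical catalog `my_catalog` and database `my_db` are registered:
+
+{% highlight java %}
+// describe a table in the current catalog and database
+tableEnv.executeSql("DESCRIBE Orders").print();
+
+// describe a table through its fully qualified path
+// (my_catalog and my_db are illustrative names)
+tableEnv.executeSql("DESCRIBE my_catalog.my_db.Orders").print();
+{% endhighlight %}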
diff --git a/docs/dev/table/sql/describe.zh.md b/docs/dev/table/sql/describe.zh.md
new file mode 100644
index 0000000..cb532ca
--- /dev/null
+++ b/docs/dev/table/sql/describe.zh.md
@@ -0,0 +1,201 @@
+---
+title: "DESCRIBE 语句"
+nav-parent_id: sql
+nav-pos: 7
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+* This will be replaced by the TOC
+{:toc}
+
+DESCRIBE 语句用来描述一张表或者视图的 Schema。
+
+
+## 执行 DESCRIBE 语句
+
+DESCRIBE 语句可以通过 `TableEnvironment` 的 `executeSql()` 方法执行,也可以在 [SQL CLI]({{ site.baseurl }}/dev/table/sqlClient.html) 中执行。若 DESCRIBE 操作执行成功,`executeSql()` 方法会返回给定表的 Schema,否则会抛出异常。
+
+以下的例子展示了如何在 `TableEnvironment` 和 SQL CLI 中执行一个 DESCRIBE 语句。
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+EnvironmentSettings settings = EnvironmentSettings.newInstance()...
+TableEnvironment tableEnv = TableEnvironment.create(settings);
+
+// register a table named "Orders"
+tableEnv.executeSql(
+        "CREATE TABLE Orders (" +
+        " `user` BIGINT NOT NULl," +
+        " product VARCHAR(32)," +
+        " amount INT," +
+        " ts TIMESTAMP(3)," +
+        " ptime AS PROCTIME()," +
+        " PRIMARY KEY(`user`) NOT ENFORCED," +
+        " WATERMARK FOR ts AS ts - INTERVAL '1' SECONDS" +
+        ") with (...)");
+
+// print the schema
+tableEnv.executeSql("DESCRIBE Orders").print();
+
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+val settings = EnvironmentSettings.newInstance()...
+val tableEnv = TableEnvironment.create(settings)
+
+// register a table named "Orders"
+tableEnv.executeSql(
+        "CREATE TABLE Orders (" +
+        " `user` BIGINT NOT NULl," +
+        " product VARCHAR(32)," +
+        " amount INT," +
+        " ts TIMESTAMP(3)," +
+        " ptime AS PROCTIME()," +
+        " PRIMARY KEY(`user`) NOT ENFORCED," +
+        " WATERMARK FOR ts AS ts - INTERVAL '1' SECONDS" +
+        ") with (...)")
+
+// print the schema
+tableEnv.executeSql("DESCRIBE Orders").print()
+
+{% endhighlight %}
+</div>
+
+<div data-lang="python" markdown="1">
+{% highlight python %}
+settings = EnvironmentSettings.new_instance()...
+table_env = StreamTableEnvironment.create(env, environment_settings=settings)
+
+# register a table named "Orders"
+table_env.execute_sql(
+        "CREATE TABLE Orders ("
+        " `user` BIGINT NOT NULL,"
+        " product VARCHAR(32),"
+        " amount INT,"
+        " ts TIMESTAMP(3),"
+        " ptime AS PROCTIME(),"
+        " PRIMARY KEY(`user`) NOT ENFORCED,"
+        " WATERMARK FOR ts AS ts - INTERVAL '1' SECONDS"
+        ") with (...)")
+
+# print the schema
+table_env.execute_sql("DESCRIBE Orders").print()
+
+{% endhighlight %}
+</div>
+
+<div data-lang="SQL CLI" markdown="1">
+{% highlight sql %}
+Flink SQL> CREATE TABLE Orders (
+>  `user` BIGINT NOT NULL,
+>  product VARCHAR(32),
+>  amount INT,
+>  ts TIMESTAMP(3),
+>  ptime AS PROCTIME(),
+>  PRIMARY KEY(`user`) NOT ENFORCED,
+>  WATERMARK FOR ts AS ts - INTERVAL '1' SECONDS
+> ) with (
+>  ...
+> );
+[INFO] Table has been created.
+
+Flink SQL> DESCRIBE Orders;
+
+{% endhighlight %}
+</div>
+</div>
+
+上述例子执行的结果为:
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight text %}
+
++---------+----------------------------------+-------+-----------+-----------------+----------------------------+
+|    name |                             type |  null |       key | computed column |                  watermark |
++---------+----------------------------------+-------+-----------+-----------------+----------------------------+
+|    user |                           BIGINT | false | PRI(user) |                 |                            |
+| product |                      VARCHAR(32) |  true |           |                 |                            |
+|  amount |                              INT |  true |           |                 |                            |
+|      ts |           TIMESTAMP(3) *ROWTIME* |  true |           |                 | `ts` - INTERVAL '1' SECOND |
+|   ptime | TIMESTAMP(3) NOT NULL *PROCTIME* | false |           |      PROCTIME() |                            |
++---------+----------------------------------+-------+-----------+-----------------+----------------------------+
+5 rows in set
+
+{% endhighlight %}
+</div>
+<div data-lang="scala" markdown="1">
+{% highlight text %}
+
++---------+----------------------------------+-------+-----------+-----------------+----------------------------+
+|    name |                             type |  null |       key | computed column |                  watermark |
++---------+----------------------------------+-------+-----------+-----------------+----------------------------+
+|    user |                           BIGINT | false | PRI(user) |                 |                            |
+| product |                      VARCHAR(32) |  true |           |                 |                            |
+|  amount |                              INT |  true |           |                 |                            |
+|      ts |           TIMESTAMP(3) *ROWTIME* |  true |           |                 | `ts` - INTERVAL '1' SECOND |
+|   ptime | TIMESTAMP(3) NOT NULL *PROCTIME* | false |           |      PROCTIME() |                            |
++---------+----------------------------------+-------+-----------+-----------------+----------------------------+
+5 rows in set
+
+{% endhighlight %}
+</div>
+<div data-lang="python" markdown="1">
+{% highlight text %}
+
++---------+----------------------------------+-------+-----------+-----------------+----------------------------+
+|    name |                             type |  null |       key | computed column |                  watermark |
++---------+----------------------------------+-------+-----------+-----------------+----------------------------+
+|    user |                           BIGINT | false | PRI(user) |                 |                            |
+| product |                      VARCHAR(32) |  true |           |                 |                            |
+|  amount |                              INT |  true |           |                 |                            |
+|      ts |           TIMESTAMP(3) *ROWTIME* |  true |           |                 | `ts` - INTERVAL '1' SECOND |
+|   ptime | TIMESTAMP(3) NOT NULL *PROCTIME* | false |           |      PROCTIME() |                            |
++---------+----------------------------------+-------+-----------+-----------------+----------------------------+
+5 rows in set
+
+{% endhighlight %}
+</div>
+<div data-lang="SQL CLI" markdown="1">
+{% highlight text %}
+
+root
+ |-- user: BIGINT NOT NULL
+ |-- product: VARCHAR(32)
+ |-- amount: INT
+ |-- ts: TIMESTAMP(3) *ROWTIME*
+ |-- ptime: TIMESTAMP(3) NOT NULL *PROCTIME* AS PROCTIME()
+ |-- WATERMARK FOR ts AS `ts` - INTERVAL '1' SECOND
+ |-- CONSTRAINT PK_3599338 PRIMARY KEY (user)
+
+{% endhighlight %}
+</div>
+
+</div>
+
+{% top %}
+
+## 语法
+
+{% highlight sql %}
+DESCRIBE [catalog_name.][db_name.]table_name
+{% endhighlight %}
diff --git a/docs/dev/table/sql/index.md b/docs/dev/table/sql/index.md
index 73076e8..06f8bd3 100644
--- a/docs/dev/table/sql/index.md
+++ b/docs/dev/table/sql/index.md
@@ -34,6 +34,7 @@ This page lists all the statements supported in Flink SQL for now:
 - [ALTER TABLE, DATABASE, FUNCTION](alter.html)
 - [INSERT](insert.html)
 - [SQL HINTS](hints.html)
+- [DESCRIBE](describe.html)
 
 ## Data Types
 
diff --git a/docs/dev/table/sql/index.zh.md b/docs/dev/table/sql/index.zh.md
index 70130e8..00d34ab 100644
--- a/docs/dev/table/sql/index.zh.md
+++ b/docs/dev/table/sql/index.zh.md
@@ -34,6 +34,7 @@ under the License.
 - [ALTER TABLE, DATABASE, FUNCTION](alter.html)
 - [INSERT](insert.html)
 - [SQL HINTS](hints.html)
+- [DESCRIBE](describe.html)
 
 ## 数据类型
 
diff --git a/docs/dev/table/sql/queries.md b/docs/dev/table/sql/queries.md
index b667c29..9bfb953 100644
--- a/docs/dev/table/sql/queries.md
+++ b/docs/dev/table/sql/queries.md
@@ -382,7 +382,7 @@ String literals must be enclosed in single quotes (e.g., `SELECT 'Hello World'`)
 
 ## Operations
 
-### Show, Describe, and Use
+### Show and Use
 
 <div markdown="1">
 <table class="table table-bordered">
@@ -419,22 +419,6 @@ SHOW VIEWS;
     </tr>
     <tr>
       <td>
-        <strong>Describe</strong><br>
-        <span class="label label-primary">Batch</span> <span class="label label-primary">Streaming</span>
-      </td>
-      <td>
-			<p>Describe the schema of the given table.</p>
-{% highlight sql %}
-DESCRIBE myTable;
-{% endhighlight %}
-            <p>Describe the schema of the given view.</p>
-{% highlight sql %}
-DESCRIBE myView;
-{% endhighlight %}
-      </td>
-    </tr>    
-    <tr>
-      <td>
         <strong>Use</strong><br>
         <span class="label label-primary">Batch</span> <span class="label label-primary">Streaming</span>
       </td>
diff --git a/docs/dev/table/sql/queries.zh.md b/docs/dev/table/sql/queries.zh.md
index 62efaca..147b682 100644
--- a/docs/dev/table/sql/queries.zh.md
+++ b/docs/dev/table/sql/queries.zh.md
@@ -380,7 +380,7 @@ Flink SQL 对于标识符(表、属性、函数名)有类似于 Java 的词
 
 ## 操作符
 
-### Show, Describe 与 Use
+### Show 与 Use
 
 <div markdown="1">
 <table class="table table-bordered">
@@ -417,22 +417,6 @@ SHOW VIEWS;
     </tr>
     <tr>
       <td>
-        <strong>Describe</strong><br>
-        <span class="label label-primary">批处理</span> <span class="label label-primary">流处理</span>
-      </td>
-      <td>
-			<p>描述给定表的 Schema</p>
-{% highlight sql %}
-DESCRIBE myTable;
-{% endhighlight %}
-            <p>描述给定视图的 Schema</p>
-{% highlight sql %}
-DESCRIBE myView;
-{% endhighlight %}
-      </td>
-    </tr>    
-    <tr>
-      <td>
         <strong>Use</strong><br>
         <span class="label label-primary">批处理</span> <span class="label label-primary">流处理</span>
       </td>


[flink] 04/08: [FLINK-17599][docs] Update documents due to FLIP-84

Posted by tw...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit a2d90fe9b784332d1554901c675d649715c2af55
Author: godfreyhe <go...@163.com>
AuthorDate: Tue Jun 9 17:28:47 2020 +0800

    [FLINK-17599][docs] Update documents due to FLIP-84
---
 docs/dev/table/catalogs.md                         | 100 ++++++++--
 docs/dev/table/catalogs.zh.md                      | 101 ++++++++--
 docs/dev/table/common.md                           | 208 +++++++--------------
 docs/dev/table/common.zh.md                        | 199 ++++++--------------
 docs/dev/table/connect.md                          |   6 +-
 docs/dev/table/connect.zh.md                       |   6 +-
 docs/dev/table/sql/alter.md                        |  34 ++--
 docs/dev/table/sql/alter.zh.md                     |  34 ++--
 docs/dev/table/sql/create.md                       |  38 ++--
 docs/dev/table/sql/create.zh.md                    |  44 ++---
 docs/dev/table/sql/drop.md                         |  34 ++--
 docs/dev/table/sql/drop.zh.md                      |  34 ++--
 docs/dev/table/sql/index.zh.md                     |   2 +-
 docs/dev/table/sql/insert.md                       |  92 +++++++--
 docs/dev/table/sql/insert.zh.md                    |  91 +++++++--
 docs/dev/table/sql/queries.md                      | 110 +++++++++--
 docs/dev/table/sql/queries.zh.md                   | 112 +++++++++--
 docs/dev/table/streaming/query_configuration.md    |   6 +-
 docs/dev/table/streaming/query_configuration.zh.md |   4 +-
 docs/dev/table/tableApi.md                         |  14 +-
 docs/dev/table/tableApi.zh.md                      |  14 +-
 21 files changed, 786 insertions(+), 497 deletions(-)

diff --git a/docs/dev/table/catalogs.md b/docs/dev/table/catalogs.md
index 69b7e46..4ff3917 100644
--- a/docs/dev/table/catalogs.md
+++ b/docs/dev/table/catalogs.md
@@ -68,8 +68,6 @@ The set of properties will be passed to a discovery service where the service tr
 
 Users can use SQL DDL to create tables in catalogs in both Table API and SQL.
 
-For Table API:
-
 <div class="codetabs" markdown="1">
 <div data-lang="java" markdown="1">
 {% highlight java %}
@@ -82,19 +80,36 @@ Catalog catalog = new HiveCatalog("myhive", null, "<path_of_hive_conf>", "<hive_
 tableEnv.registerCatalog("myhive", catalog);
 
 // Create a catalog database
-tableEnv.sqlUpdate("CREATE DATABASE mydb WITH (...)");
+tableEnv.executeSql("CREATE DATABASE mydb WITH (...)");
 
 // Create a catalog table
-tableEnv.sqlUpdate("CREATE TABLE mytable (name STRING, age INT) WITH (...)");
+tableEnv.executeSql("CREATE TABLE mytable (name STRING, age INT) WITH (...)");
 
 tableEnv.listTables(); // should return the tables in current catalog and database.
 
 {% endhighlight %}
 </div>
-</div>
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+val tableEnv = ...
+
+// Create a HiveCatalog 
+val catalog = new HiveCatalog("myhive", null, "<path_of_hive_conf>", "<hive_version>")
+
+// Register the catalog
+tableEnv.registerCatalog("myhive", catalog)
 
-For SQL Client:
+// Create a catalog database
+tableEnv.executeSql("CREATE DATABASE mydb WITH (...)")
+
+// Create a catalog table
+tableEnv.executeSql("CREATE TABLE mytable (name STRING, age INT) WITH (...)")
 
+tableEnv.listTables() // should return the tables in current catalog and database.
+
+{% endhighlight %}
+</div>
+<div data-lang="SQL Client" markdown="1">
 {% highlight sql %}
 // the catalog should have been registered via yaml file
 Flink SQL> CREATE DATABASE mydb WITH (...);
@@ -104,17 +119,25 @@ Flink SQL> CREATE TABLE mytable (name STRING, age INT) WITH (...);
 Flink SQL> SHOW TABLES;
 mytable
 {% endhighlight %}
+</div>
+</div>
+
 
 For detailed information, please check out [Flink SQL CREATE DDL]({{ site.baseurl }}/dev/table/sql/create.html).
 
-### Using Java/Scala/Python API
+### Using Java/Scala
 
-Users can use Java, Scala, or Python API to create catalog tables programmatically.
+Users can use Java or Scala to create catalog tables programmatically.
 
 <div class="codetabs" markdown="1">
 <div data-lang="java" markdown="1">
 {% highlight java %}
-TableEnvironment tableEnv = ...
+import org.apache.flink.table.api.*;
+import org.apache.flink.table.catalog.*;
+import org.apache.flink.table.catalog.hive.HiveCatalog;
+import org.apache.flink.table.descriptors.Kafka;
+
+TableEnvironment tableEnv = TableEnvironment.create(EnvironmentSettings.newInstance().build());
 
 // Create a HiveCatalog 
 Catalog catalog = new HiveCatalog("myhive", null, "<path_of_hive_conf>", "<hive_version>");
@@ -123,7 +146,7 @@ Catalog catalog = new HiveCatalog("myhive", null, "<path_of_hive_conf>", "<hive_
 tableEnv.registerCatalog("myhive", catalog);
 
 // Create a catalog database 
-catalog.createDatabase("mydb", new CatalogDatabaseImpl(...))
+catalog.createDatabase("mydb", new CatalogDatabaseImpl(...));
 
 // Create a catalog table
 TableSchema schema = TableSchema.builder()
@@ -138,15 +161,58 @@ catalog.createTable(
             new Kafka()
                 .version("0.11")
                 ....
-                .startFromEarliest(),
+                .startFromEarliest()
+                .toProperties(),
             "my comment"
-        )
+        ),
+        false
     );
     
 List<String> tables = catalog.listTables("mydb"); // tables should contain "mytable"
 {% endhighlight %}
 
 </div>
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+import org.apache.flink.table.api._
+import org.apache.flink.table.catalog._
+import org.apache.flink.table.catalog.hive.HiveCatalog
+import org.apache.flink.table.descriptors.Kafka
+
+val tableEnv = TableEnvironment.create(EnvironmentSettings.newInstance.build)
+
+// Create a HiveCatalog 
+val catalog = new HiveCatalog("myhive", null, "<path_of_hive_conf>", "<hive_version>")
+
+// Register the catalog
+tableEnv.registerCatalog("myhive", catalog)
+
+// Create a catalog database 
+catalog.createDatabase("mydb", new CatalogDatabaseImpl(...))
+
+// Create a catalog table
+val schema = TableSchema.builder()
+    .field("name", DataTypes.STRING())
+    .field("age", DataTypes.INT())
+    .build()
+
+catalog.createTable(
+        new ObjectPath("mydb", "mytable"), 
+        new CatalogTableImpl(
+            schema,
+            new Kafka()
+                .version("0.11")
+                ....
+                .startFromEarliest()
+                .toProperties(),
+            "my comment"
+        ),
+        false
+    )
+    
+val tables = catalog.listTables("mydb") // tables should contain "mytable"
+{% endhighlight %}
+</div>
 </div>
 
 ## Catalog API
@@ -158,7 +224,7 @@ For detailed DDL information, please refer to [SQL CREATE DDL]({{ site.baseurl }
 ### Database operations
 
 <div class="codetabs" markdown="1">
-<div data-lang="Java" markdown="1">
+<div data-lang="Java/Scala" markdown="1">
 {% highlight java %}
 // create database
 catalog.createDatabase("mydb", new CatalogDatabaseImpl(...), false);
@@ -184,7 +250,7 @@ catalog.listDatabases("mycatalog");
 ### Table operations
 
 <div class="codetabs" markdown="1">
-<div data-lang="Java" markdown="1">
+<div data-lang="Java/Scala" markdown="1">
 {% highlight java %}
 // create table
 catalog.createTable(new ObjectPath("mydb", "mytable"), new CatalogTableImpl(...), false);
@@ -213,7 +279,7 @@ catalog.listTables("mydb");
 ### View operations
 
 <div class="codetabs" markdown="1">
-<div data-lang="Java" markdown="1">
+<div data-lang="Java/Scala" markdown="1">
 {% highlight java %}
 // create view
 catalog.createTable(new ObjectPath("mydb", "myview"), new CatalogViewImpl(...), false);
@@ -243,7 +309,7 @@ catalog.listViews("mydb");
 ### Partition operations
 
 <div class="codetabs" markdown="1">
-<div data-lang="Java" markdown="1">
+<div data-lang="Java/Scala" markdown="1">
 {% highlight java %}
 // create partition
 catalog.createPartition(
@@ -284,7 +350,7 @@ catalog.listPartitions(new ObjectPath("mydb", "mytable"), Arrays.asList(epr1, ..
 ### Function operations
 
 <div class="codetabs" markdown="1">
-<div data-lang="Java" markdown="1">
+<div data-lang="Java/Scala" markdown="1">
 {% highlight java %}
 // create function
 catalog.createFunction(new ObjectPath("mydb", "myfunc"), new CatalogFunctionImpl(...), false);
diff --git a/docs/dev/table/catalogs.zh.md b/docs/dev/table/catalogs.zh.md
index fd3cafd..5bd8e56 100644
--- a/docs/dev/table/catalogs.zh.md
+++ b/docs/dev/table/catalogs.zh.md
@@ -64,8 +64,6 @@ Catalog 是可扩展的,用户可以通过实现 `Catalog` 接口来开发自
 
 用户可以使用 DDL 通过 Table API 或者 SQL Client 在 Catalog 中创建表。
 
-使用 Table API:
-
 <div class="codetabs" markdown="1">
 <div data-lang="java" markdown="1">
 {% highlight java %}
@@ -78,19 +76,36 @@ Catalog catalog = new HiveCatalog("myhive", null, "<path_of_hive_conf>", "<hive_
 tableEnv.registerCatalog("myhive", catalog);
 
 // Create a catalog database
-tableEnv.sqlUpdate("CREATE DATABASE mydb WITH (...)");
+tableEnv.executeSql("CREATE DATABASE mydb WITH (...)");
 
 // Create a catalog table
-tableEnv.sqlUpdate("CREATE TABLE mytable (name STRING, age INT) WITH (...)");
+tableEnv.executeSql("CREATE TABLE mytable (name STRING, age INT) WITH (...)");
 
 tableEnv.listTables(); // should return the tables in current catalog and database.
 
 {% endhighlight %}
 </div>
-</div>
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+val tableEnv = ...
 
-使用 SQL Client:
+// Create a HiveCatalog 
+val catalog = new HiveCatalog("myhive", null, "<path_of_hive_conf>", "<hive_version>");
 
+// Register the catalog
+tableEnv.registerCatalog("myhive", catalog);
+
+// Create a catalog database
+tableEnv.executeSql("CREATE DATABASE mydb WITH (...)");
+
+// Create a catalog table
+tableEnv.executeSql("CREATE TABLE mytable (name STRING, age INT) WITH (...)");
+
+tableEnv.listTables(); // should return the tables in current catalog and database.
+
+{% endhighlight %}
+</div>
+<div data-lang="SQL Client" markdown="1">
 {% highlight sql %}
 // the catalog should have been registered via yaml file
 Flink SQL> CREATE DATABASE mydb WITH (...);
@@ -100,17 +115,25 @@ Flink SQL> CREATE TABLE mytable (name STRING, age INT) WITH (...);
 Flink SQL> SHOW TABLES;
 mytable
 {% endhighlight %}
+</div>
+</div>
+
 
 更多详细信息,请参考[Flink SQL CREATE DDL]({{ site.baseurl }}/zh/dev/table/sql/create.html)。
 
-### 使用 Java/Scala/Python API
+### 使用 Java/Scala
 
-用户可以用编程的方式使用Java、Scala 或者 Python API 来创建 Catalog 表。
+用户可以用编程的方式使用 Java 或者 Scala 来创建 Catalog 表。
 
 <div class="codetabs" markdown="1">
 <div data-lang="java" markdown="1">
 {% highlight java %}
-TableEnvironment tableEnv = ...
+import org.apache.flink.table.api.*;
+import org.apache.flink.table.catalog.*;
+import org.apache.flink.table.catalog.hive.HiveCatalog;
+import org.apache.flink.table.descriptors.Kafka;
+
+TableEnvironment tableEnv = TableEnvironment.create(EnvironmentSettings.newInstance().build());
 
 // Create a HiveCatalog
 Catalog catalog = new HiveCatalog("myhive", null, "<path_of_hive_conf>", "<hive_version>");
@@ -119,7 +142,7 @@ Catalog catalog = new HiveCatalog("myhive", null, "<path_of_hive_conf>", "<hive_
 tableEnv.registerCatalog("myhive", catalog);
 
 // Create a catalog database
-catalog.createDatabase("mydb", new CatalogDatabaseImpl(...))
+catalog.createDatabase("mydb", new CatalogDatabaseImpl(...));
 
 // Create a catalog table
 TableSchema schema = TableSchema.builder()
@@ -134,15 +157,59 @@ catalog.createTable(
             new Kafka()
                 .version("0.11")
                 ....
-                .startFromEarliest(),
+                .startFromEarliest()
+                .toProperties(),
             "my comment"
-        )
+        ),
+        false
     );
 
 List<String> tables = catalog.listTables("mydb"); // tables should contain "mytable"
 {% endhighlight %}
 
 </div>
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+import org.apache.flink.table.api._
+import org.apache.flink.table.catalog._
+import org.apache.flink.table.catalog.hive.HiveCatalog
+import org.apache.flink.table.descriptors.Kafka
+
+val tableEnv = TableEnvironment.create(EnvironmentSettings.newInstance.build)
+
+// Create a HiveCatalog
+val catalog = new HiveCatalog("myhive", null, "<path_of_hive_conf>", "<hive_version>")
+
+// Register the catalog
+tableEnv.registerCatalog("myhive", catalog)
+
+// Create a catalog database
+catalog.createDatabase("mydb", new CatalogDatabaseImpl(...))
+
+// Create a catalog table
+val schema = TableSchema.builder()
+    .field("name", DataTypes.STRING())
+    .field("age", DataTypes.INT())
+    .build()
+
+catalog.createTable(
+        new ObjectPath("mydb", "mytable"),
+        new CatalogTableImpl(
+            schema,
+            new Kafka()
+                .version("0.11")
+                ....
+                .startFromEarliest()
+                .toProperties(),
+            "my comment"
+        ),
+        false
+    )
+
+val tables = catalog.listTables("mydb") // tables should contain "mytable"
+{% endhighlight %}
+
+</div>
 </div>
 
 ## Catalog API
@@ -154,7 +221,7 @@ List<String> tables = catalog.listTables("mydb"); // tables should contain "myta
 ### 数据库操作
 
 <div class="codetabs" markdown="1">
-<div data-lang="Java" markdown="1">
+<div data-lang="Java/Scala" markdown="1">
 {% highlight java %}
 // create database
 catalog.createDatabase("mydb", new CatalogDatabaseImpl(...), false);
@@ -180,7 +247,7 @@ catalog.listDatabases("mycatalog");
 ### 表操作
 
 <div class="codetabs" markdown="1">
-<div data-lang="Java" markdown="1">
+<div data-lang="Java/Scala" markdown="1">
 {% highlight java %}
 // create table
 catalog.createTable(new ObjectPath("mydb", "mytable"), new CatalogTableImpl(...), false);
@@ -209,7 +276,7 @@ catalog.listTables("mydb");
 ### 视图操作
 
 <div class="codetabs" markdown="1">
-<div data-lang="Java" markdown="1">
+<div data-lang="Java/Scala" markdown="1">
 {% highlight java %}
 // create view
 catalog.createTable(new ObjectPath("mydb", "myview"), new CatalogViewImpl(...), false);
@@ -239,7 +306,7 @@ catalog.listViews("mydb");
 ### 分区操作
 
 <div class="codetabs" markdown="1">
-<div data-lang="Java" markdown="1">
+<div data-lang="Java/Scala" markdown="1">
 {% highlight java %}
 // create partition
 catalog.createPartition(
@@ -280,7 +347,7 @@ catalog.listPartitions(new ObjectPath("mydb", "mytable"), Arrays.asList(epr1, ..
 ### 函数操作
 
 <div class="codetabs" markdown="1">
-<div data-lang="Java" markdown="1">
+<div data-lang="Java/Scala" markdown="1">
 {% highlight java %}
 // create function
 catalog.createFunction(new ObjectPath("mydb", "myfunc"), new CatalogFunctionImpl(...), false);
diff --git a/docs/dev/table/common.md b/docs/dev/table/common.md
index a6fe84f..40bd4c8 100644
--- a/docs/dev/table/common.md
+++ b/docs/dev/table/common.md
@@ -35,7 +35,7 @@ Main Differences Between the Two Planners
 3. The implementations of `FilterableTableSource` for the old planner and the Blink planner are incompatible. The old planner will push down `PlannerExpression`s into `FilterableTableSource`, while the Blink planner will push down `Expression`s.
 4. String based key-value config options (Please see the documentation about [Configuration]({{ site.baseurl }}/dev/table/config.html) for details) are only used for the Blink planner.
 5. The implementation(`CalciteConfig`) of `PlannerConfig` in two planners is different.
-6. The Blink planner will optimize multiple-sinks into one DAG (supported only on `TableEnvironment`, not on `StreamTableEnvironment`). The old planner will always optimize each sink into a new DAG, where all DAGs are independent of each other.
+6. The Blink planner will optimize multiple sinks into one DAG on both `TableEnvironment` and `StreamTableEnvironment`. The old planner will always optimize each sink into a new DAG, where all DAGs are independent of each other.
 7. The old planner does not support catalog statistics now, while the Blink planner does.
 
 
@@ -62,10 +62,8 @@ Table tapiResult = tableEnv.from("table1").select(...);
 Table sqlResult  = tableEnv.sqlQuery("SELECT ... FROM table1 ... ");
 
 // emit a Table API result Table to a TableSink, same for SQL result
-tapiResult.insertInto("outputTable");
-
-// execute
-tableEnv.execute("java_job");
+TableResult tableResult = tapiResult.executeInsert("outputTable");
+tableResult...
 
 {% endhighlight %}
 </div>
@@ -87,10 +85,8 @@ val tapiResult = tableEnv.from("table1").select(...)
 val sqlResult  = tableEnv.sqlQuery("SELECT ... FROM table1 ...")
 
 // emit a Table API result Table to a TableSink, same for SQL result
-tapiResult.insertInto("outputTable")
-
-// execute
-tableEnv.execute("scala_job")
+val tableResult = tapiResult.executeInsert("outputTable")
+tableResult...
 
 {% endhighlight %}
 </div>
@@ -113,10 +109,8 @@ tapi_result = table_env.from_path("table1").select(...)
 sql_result  = table_env.sql_query("SELECT ... FROM table1 ...")
 
 # emit a Table API result Table to a TableSink, same for SQL result
-tapi_result.insert_into("outputTable")
-
-# execute
-table_env.execute("python_job")
+table_result = tapi_result.execute_insert("outputTable")
+table_result...
 
 {% endhighlight %}
 </div>
@@ -425,7 +419,7 @@ table_environment \
 
 <div data-lang="DDL" markdown="1">
 {% highlight sql %}
-tableEnvironment.sqlUpdate("CREATE [TEMPORARY] TABLE MyTable (...) WITH (...)")
+tableEnvironment.executeSql("CREATE [TEMPORARY] TABLE MyTable (...) WITH (...)")
 {% endhighlight %}
 </div>
 </div>
@@ -662,7 +656,7 @@ TableEnvironment tableEnv = ...; // see "Create a TableEnvironment" section
 // register "RevenueFrance" output table
 
 // compute revenue for all customers from France and emit to "RevenueFrance"
-tableEnv.sqlUpdate(
+tableEnv.executeSql(
     "INSERT INTO RevenueFrance " +
     "SELECT cID, cName, SUM(revenue) AS revSum " +
     "FROM Orders " +
@@ -670,7 +664,6 @@ tableEnv.sqlUpdate(
     "GROUP BY cID, cName"
   );
 
-// execute query
 {% endhighlight %}
 </div>
 
@@ -683,7 +676,7 @@ val tableEnv = ... // see "Create a TableEnvironment" section
 // register "RevenueFrance" output table
 
 // compute revenue for all customers from France and emit to "RevenueFrance"
-tableEnv.sqlUpdate("""
+tableEnv.executeSql("""
   |INSERT INTO RevenueFrance
   |SELECT cID, cName, SUM(revenue) AS revSum
   |FROM Orders
@@ -691,7 +684,6 @@ tableEnv.sqlUpdate("""
   |GROUP BY cID, cName
   """.stripMargin)
 
-// execute query
 {% endhighlight %}
 
 </div>
@@ -705,7 +697,7 @@ table_env = ... # see "Create a TableEnvironment" section
 # register "RevenueFrance" output table
 
 # compute revenue for all customers from France and emit to "RevenueFrance"
-table_env.sql_update(
+table_env.execute_sql(
     "INSERT INTO RevenueFrance "
     "SELECT cID, cName, SUM(revenue) AS revSum "
     "FROM Orders "
@@ -713,7 +705,6 @@ table_env.sql_update(
     "GROUP BY cID, cName"
 )
 
-# execute query
 {% endhighlight %}
 </div>
 </div>
@@ -738,7 +729,7 @@ A batch `Table` can only be written to a `BatchTableSink`, while a streaming `Ta
 
 Please see the documentation about [Table Sources & Sinks]({{ site.baseurl }}/dev/table/sourceSinks.html) for details about available sinks and instructions for how to implement a custom `TableSink`.
 
-The `Table.insertInto(String tableName)` method emits the `Table` to a registered `TableSink`. The method looks up the `TableSink` from the catalog by the name and validates that the schema of the `Table` is identical to the schema of the `TableSink`. 
+The `Table.executeInsert(String tableName)` method emits the `Table` to a registered `TableSink`. The method looks up the `TableSink` from the catalog by the name and validates that the schema of the `Table` is identical to the schema of the `TableSink`. 
 
 The following examples show how to emit a `Table`:
 
@@ -761,10 +752,10 @@ tableEnv.connect(new FileSystem("/path/to/file"))
 
 // compute a result Table using Table API operators and/or SQL queries
 Table result = ...
+
 // emit the result Table to the registered TableSink
-result.insertInto("CsvSinkTable");
+result.executeInsert("CsvSinkTable");
 
-// execute the program
 {% endhighlight %}
 </div>
 
@@ -788,9 +779,8 @@ tableEnv.connect(new FileSystem("/path/to/file"))
 val result: Table = ...
 
 // emit the result Table to the registered TableSink
-result.insertInto("CsvSinkTable")
+result.executeInsert("CsvSinkTable")
 
-// execute the program
 {% endhighlight %}
 </div>
 
@@ -800,7 +790,7 @@ result.insertInto("CsvSinkTable")
 table_env = ... # see "Create a TableEnvironment" section
 
 # create a TableSink
-t_env.connect(FileSystem().path("/path/to/file"))
+table_env.connect(FileSystem().path("/path/to/file"))
     .with_format(Csv()
                  .field_delimiter(',')
                  .deriveSchema())
@@ -814,9 +804,8 @@ t_env.connect(FileSystem().path("/path/to/file")))
 result = ...
 
 # emit the result Table to the registered TableSink
-result.insert_into("CsvSinkTable")
+result.execute_insert("CsvSinkTable")
 
-# execute the program
 {% endhighlight %}
 </div>
 </div>
@@ -839,8 +828,14 @@ Table API and SQL queries are translated into [DataStream]({{ site.baseurl }}/de
 
 a Table API or SQL query is translated when:
 
-* `TableEnvironment.execute()` is called. A `Table` (emitted to a `TableSink` through `Table.insertInto()`) or a SQL update query (specified through `TableEnvironment.sqlUpdate()`) will be buffered in `TableEnvironment` first. All sinks will be optimized into one DAG.
+* `TableEnvironment.executeSql()` is called. This method executes a given statement, and the SQL query is translated immediately once the method is called.
+* `Table.executeInsert()` is called. This method inserts the table content into the given sink path, and the Table API program is translated immediately once the method is called.
+* `Table.execute()` is called. This method collects the table content to the local client, and the Table API program is translated immediately once the method is called.
+* `StatementSet.execute()` is called. A `Table` (emitted to a sink through `StatementSet.addInsert()`) or an INSERT statement (specified through `StatementSet.addInsertSql()`) is buffered in the `StatementSet` first. It is translated once `StatementSet.execute()` is called, and all sinks are optimized into one DAG (see the sketch below).
 * A `Table` is translated when it is converted into a `DataStream` (see [Integration with DataStream and DataSet API](#integration-with-datastream-and-dataset-api)). Once translated, it's a regular DataStream program and is executed when `StreamExecutionEnvironment.execute()` is called.
+
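+A minimal sketch of this buffering behavior, assuming the tables `MySource`, `MySink1`, and `MySink2` are already registered:
+
+{% highlight java %}
+StatementSet stmtSet = tableEnv.createStatementSet();
+
+// both statements are only buffered here; nothing is translated yet
+stmtSet.addInsertSql("INSERT INTO MySink1 SELECT * FROM MySource");
+stmtSet.addInsert("MySink2", tableEnv.from("MySource"));
+
+// translation happens here; both sinks are optimized into one DAG
+stmtSet.execute();
+{% endhighlight %}
+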
+<span class="label label-danger">Attention</span> **Since 1.11 version, `sqlUpdate()` method and `insertInto()` method are deprecated. If the Table program is built from these two methods, we must use `StreamTableEnvironment.execute()` method instead of `StreamExecutionEnvironment.execute()` method to execute it.**
+
 </div>
 
 <div data-lang="Old planner" markdown="1">
@@ -849,18 +844,16 @@ Table API and SQL queries are translated into [DataStream]({{ site.baseurl }}/de
 1. Optimization of the logical plan
 2. Translation into a DataStream or DataSet program
 
-For streaming, a Table API or SQL query is translated when:
-
-* `TableEnvironment.execute()` is called. A `Table` (emitted to a `TableSink` through `Table.insertInto()`) or a SQL update query (specified through `TableEnvironment.sqlUpdate()`) will be buffered in `TableEnvironment` first. Each sink will be optimized independently. The execution graph contains multiple independent sub-DAGs.
-* A `Table` is translated when it is converted into a `DataStream` (see [Integration with DataStream and DataSet API](#integration-with-datastream-and-dataset-api)). Once translated, it's a regular DataStream program and is executed when `StreamExecutionEnvironment.execute()` is called.
+A Table API or SQL query is translated when:
 
-For batch, a Table API or SQL query is translated when:
+* `TableEnvironment.executeSql()` is called. This method executes a given statement, and the SQL query is translated immediately once the method is called.
+* `Table.executeInsert()` is called. This method inserts the table content into the given sink path, and the Table API program is translated immediately once the method is called.
+* `Table.execute()` is called. This method collects the table content to the local client, and the Table API program is translated immediately once the method is called.
+* `StatementSet.execute()` is called. A `Table` (emitted to a sink through `StatementSet.addInsert()`) or an INSERT statement (specified through `StatementSet.addInsertSql()`) is buffered in the `StatementSet` first. It is translated once `StatementSet.execute()` is called. Each sink is optimized independently, and the execution graph contains multiple independent sub-DAGs.
+* For streaming, a `Table` is translated when it is converted into a `DataStream` (see [Integration with DataStream and DataSet API](#integration-with-datastream-and-dataset-api)). Once translated, it's a regular DataStream program and is executed when `StreamExecutionEnvironment.execute()` is called. For batch, a `Table` is translated when it is converted into a `DataSet` (see [Integration with DataStream and DataSet API](#integration-with-datastream-and-dataset-api)). Once translated, it's a regular DataSet program and is executed when `ExecutionEnvironment.execute()` is called.
 
-* a `Table` is emitted to a `TableSink`, i.e., when `Table.insertInto()` is called.
-* a SQL update query is specified, i.e., when `TableEnvironment.sqlUpdate()` is called.
-* a `Table` is converted into a `DataSet` (see [Integration with DataStream and DataSet API](#integration-with-datastream-and-dataset-api)).
+<span class="label label-danger">Attention</span> **Since 1.11 version, `sqlUpdate()` method and `insertInto()` method are deprecated. For streaming, if the Table program is built from these two methods, we must use `StreamTableEnvironment.execute()` method instead of `StreamExecutionEnvironment.execute()` method to execute it. For batch, if the Table program is built from these two methods, we must use `BatchTableEnvironment.execute()` method instead of `ExecutionEnvironment.execute()`  [...]
 
-Once translated, a Table API or SQL query is handled like a regular DataSet program and is executed when `ExecutionEnvironment.execute()` is called.
 </div>
 
 </div>
@@ -1039,7 +1032,9 @@ val retractStream: DataStream[(Boolean, Row)] = tableEnv.toRetractStream[Row](ta
 </div>
 </div>
 
-**Note:** A detailed discussion about dynamic tables and their properties is given in the [Dynamic Tables](streaming/dynamic_tables.html) document.
+**Note:** A detailed discussion about dynamic tables and their properties is given in the [Dynamic Tables](streaming/dynamic_tables.html) document. 
+
+<span class="label label-danger">Attention</span> **Once the Table is converted to a DataStream, please use the `StreamExecutionEnvironment.execute()` method to execute the DataStream program.**
 
 #### Convert a Table into a DataSet
 
@@ -1084,6 +1079,8 @@ val dsTuple: DataSet[(String, Int)] = tableEnv.toDataSet[(String, Int)](table)
 </div>
 </div>
 
+<span class="label label-danger">Attention</span> **Once the Table is converted to a DataSet, we must use the ExecutionEnvironment.execute method to execute the DataSet program.**
+
 {% top %}
 
 ### Mapping of Data Types to Table Schema
@@ -1435,16 +1432,17 @@ It is possible to tweak the set of optimization rules which are applied in diffe
 </div>
 
 
-### Explaining a Table
+Explaining a Table
+------------------
 
 The Table API provides a mechanism to explain the logical and optimized query plans to compute a `Table`. 
-This is done through the `TableEnvironment.explain(table)` method or `TableEnvironment.explain()` method. `explain(table)` returns the plan of a given `Table`. `explain()` returns the result of a multiple sinks plan and is mainly used for the Blink planner. It returns a String describing three plans:
+This is done through the `Table.explain()` method or the `StatementSet.explain()` method. `Table.explain()` returns the plan of a single `Table`, while `StatementSet.explain()` returns the plan of multiple sinks. Both return a String describing three plans:
 
 1. the Abstract Syntax Tree of the relational query, i.e., the unoptimized logical query plan,
 2. the optimized logical query plan, and
 3. the physical execution plan.
 
-The following code shows an example and the corresponding output for given `Table` using `explain(table)`:
+The following code shows an example and the corresponding output for a given `Table` using the `Table.explain()` method:
 
 <div class="codetabs" markdown="1">
 <div data-lang="java" markdown="1">
@@ -1455,14 +1453,14 @@ StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
 DataStream<Tuple2<Integer, String>> stream1 = env.fromElements(new Tuple2<>(1, "hello"));
 DataStream<Tuple2<Integer, String>> stream2 = env.fromElements(new Tuple2<>(1, "hello"));
 
+// explain Table API
 Table table1 = tEnv.fromDataStream(stream1, $("count"), $("word"));
 Table table2 = tEnv.fromDataStream(stream2, $("count"), $("word"));
 Table table = table1
   .where($("word").like("F%"))
   .unionAll(table2);
+System.out.println(table.explain());
 
-String explanation = tEnv.explain(table);
-System.out.println(explanation);
 {% endhighlight %}
 </div>
 
@@ -1476,9 +1474,8 @@ val table2 = env.fromElements((1, "hello")).toTable(tEnv, $"count", $"word")
 val table = table1
   .where($"word".like("F%"))
   .unionAll(table2)
+println(table.explain())
 
-val explanation: String = tEnv.explain(table)
-println(explanation)
 {% endhighlight %}
 </div>
 
@@ -1492,51 +1489,16 @@ table2 = t_env.from_elements([(1, "hello")], ["count", "word"])
 table = table1 \
     .where("LIKE(word, 'F%')") \
     .union_all(table2)
+print(table.explain())
 
-explanation = t_env.explain(table)
-print(explanation)
 {% endhighlight %}
 </div>
 </div>
 
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
+The result of the above example is:
+<div>
 {% highlight text %}
-== Abstract Syntax Tree ==
-LogicalUnion(all=[true])
-  LogicalFilter(condition=[LIKE($1, _UTF-16LE'F%')])
-    FlinkLogicalDataStreamScan(id=[1], fields=[count, word])
-  FlinkLogicalDataStreamScan(id=[2], fields=[count, word])
-
-== Optimized Logical Plan ==
-DataStreamUnion(all=[true], union all=[count, word])
-  DataStreamCalc(select=[count, word], where=[LIKE(word, _UTF-16LE'F%')])
-    DataStreamScan(id=[1], fields=[count, word])
-  DataStreamScan(id=[2], fields=[count, word])
-
-== Physical Execution Plan ==
-Stage 1 : Data Source
-	content : collect elements with CollectionInputFormat
-
-Stage 2 : Data Source
-	content : collect elements with CollectionInputFormat
-
-	Stage 3 : Operator
-		content : from: (count, word)
-		ship_strategy : REBALANCE
-
-		Stage 4 : Operator
-			content : where: (LIKE(word, _UTF-16LE'F%')), select: (count, word)
-			ship_strategy : FORWARD
 
-			Stage 5 : Operator
-				content : from: (count, word)
-				ship_strategy : REBALANCE
-{% endhighlight %}
-</div>
-
-<div data-lang="scala" markdown="1">
-{% highlight text %}
 == Abstract Syntax Tree ==
 LogicalUnion(all=[true])
   LogicalFilter(condition=[LIKE($1, _UTF-16LE'F%')])
@@ -1567,62 +1529,11 @@ Stage 2 : Data Source
 			Stage 5 : Operator
 				content : from: (count, word)
 				ship_strategy : REBALANCE
-{% endhighlight %}
-</div>
-
-<div data-lang="python" markdown="1">
-{% highlight text %}
-== Abstract Syntax Tree ==
-LogicalUnion(all=[true])
-  LogicalFilter(condition=[LIKE($1, _UTF-16LE'F%')])
-    FlinkLogicalDataStreamScan(id=[3], fields=[count, word])
-  FlinkLogicalDataStreamScan(id=[6], fields=[count, word])
-
-== Optimized Logical Plan ==
-DataStreamUnion(all=[true], union all=[count, word])
-  DataStreamCalc(select=[count, word], where=[LIKE(word, _UTF-16LE'F%')])
-    DataStreamScan(id=[3], fields=[count, word])
-  DataStreamScan(id=[6], fields=[count, word])
 
-== Physical Execution Plan ==
-Stage 1 : Data Source
-	content : collect elements with CollectionInputFormat
-
-	Stage 2 : Operator
-		content : Flat Map
-		ship_strategy : FORWARD
-
-		Stage 3 : Operator
-			content : Map
-			ship_strategy : FORWARD
-
-Stage 4 : Data Source
-	content : collect elements with CollectionInputFormat
-
-	Stage 5 : Operator
-		content : Flat Map
-		ship_strategy : FORWARD
-
-		Stage 6 : Operator
-			content : Map
-			ship_strategy : FORWARD
-
-			Stage 7 : Operator
-				content : Map
-				ship_strategy : FORWARD
-
-				Stage 8 : Operator
-					content : where: (LIKE(word, _UTF-16LE'F%')), select: (count, word)
-					ship_strategy : FORWARD
-
-					Stage 9 : Operator
-						content : Map
-						ship_strategy : FORWARD
 {% endhighlight %}
 </div>
-</div>
 
-The following code shows an example and the corresponding output for multiple-sinks plan using `explain()`:
+The following code shows an example and the corresponding output for a multiple-sinks plan using the `StatementSet.explain()` method:
 
 <div class="codetabs" markdown="1">
 <div data-lang="java" markdown="1">
@@ -1651,14 +1562,16 @@ tEnv.connect(new FileSystem("/sink/path2"))
     .withFormat(new Csv().deriveSchema())
     .withSchema(schema)
     .createTemporaryTable("MySink2");
+
+StatementSet stmtSet = tEnv.createStatementSet();
 
 Table table1 = tEnv.from("MySource1").where($("word").like("F%"));
-table1.insertInto("MySink1");
+stmtSet.addInsert("MySink1", table1);
 
 Table table2 = table1.unionAll(tEnv.from("MySource2"));
-table2.insertInto("MySink2");
+stmtSet.addInsert("MySink2", table2);
 
-String explanation = tEnv.explain(false);
+String explanation = stmtSet.explain();
 System.out.println(explanation);
 
 {% endhighlight %}
@@ -1689,14 +1602,16 @@ tEnv.connect(new FileSystem("/sink/path2"))
     .withFormat(new Csv().deriveSchema())
     .withSchema(schema)
     .createTemporaryTable("MySink2")
+
+val stmtSet = tEnv.createStatementSet()
 
 val table1 = tEnv.from("MySource1").where($"word".like("F%"))
-table1.insertInto("MySink1")
+stmtSet.addInsert("MySink1", table1)
 
 val table2 = table1.unionAll(tEnv.from("MySource2"))
-table2.insertInto("MySink2")
+stmtSet.addInsert("MySink2", table2)
 
-val explanation = tEnv.explain(false)
+val explanation = stmtSet.explain()
 println(explanation)
 
 {% endhighlight %}
@@ -1727,15 +1642,18 @@ t_env.connect(FileSystem().path("/sink/path2")))
     .with_format(Csv().deriveSchema())
     .with_schema(schema)
     .create_temporary_table("MySink2")
+
+stmt_set = t_env.create_statement_set()
 
 table1 = t_env.from_path("MySource1").where("LIKE(word, 'F%')")
-table1.insert_into("MySink1")
+stmt_set.add_insert("MySink1", table1)
 
 table2 = table1.union_all(t_env.from_path("MySource2"))
-table2.insert_into("MySink2")
+stmt_set.add_insert("MySink2", table2)
 
-explanation = t_env.explain()
+explanation = stmt_set.explain()
 print(explanation)
+
 {% endhighlight %}
 </div>
 </div>
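
A condensed Java sketch of the multi-sink flow above (a sketch only, assuming `MySource1`, `MySource2`, `MySink1` and `MySink2` are registered as in the snippets; `explain()` is side-effect free, while `execute()` is what actually submits the DAG):

{% highlight java %}
TableEnvironment tEnv = ...; // see "Create a TableEnvironment" section

StatementSet stmtSet = tEnv.createStatementSet();

// buffer two inserts; nothing is translated or submitted yet
stmtSet.addInsert("MySink1", tEnv.from("MySource1").where($("word").like("F%")));
stmtSet.addInsertSql("INSERT INTO MySink2 SELECT * FROM MySource2");

// returns the AST, the optimized logical plan and the physical plan as a String,
// without running anything
System.out.println(stmtSet.explain());

// only now are all buffered sinks optimized into a single DAG and submitted
TableResult result = stmtSet.execute();
{% endhighlight %}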
diff --git a/docs/dev/table/common.zh.md b/docs/dev/table/common.zh.md
index 7bb7954..f295f39 100644
--- a/docs/dev/table/common.zh.md
+++ b/docs/dev/table/common.zh.md
@@ -35,7 +35,7 @@ Table API 和 SQL 集成在同一套 API 中。这套 API 的核心概念是`Tab
 3. 旧计划器和 Blink 计划器中 `FilterableTableSource` 的实现是不兼容的。旧计划器会将 `PlannerExpression` 下推至 `FilterableTableSource`,而 Blink 计划器则是将 `Expression` 下推。
 4. 基于字符串的键值配置选项仅在 Blink 计划器中使用。(详情参见 [配置]({{ site.baseurl }}/zh/dev/table/config.html) )
 5. `PlannerConfig` 在两种计划器中的实现(`CalciteConfig`)是不同的。
-6. Blink 计划器会将多sink(multiple-sinks)优化成一张有向无环图(DAG)(仅支持 `TableEnvironment`,不支持 `StreamTableEnvironment`)。旧计划器总是将每个sink都优化成一个新的有向无环图,且所有图相互独立。
+6. Blink 计划器会将多sink(multiple-sinks)优化成一张有向无环图(DAG),`TableEnvironment` 和 `StreamTableEnvironment` 都支持该特性。旧计划器总是将每个sink都优化成一个新的有向无环图,且所有图相互独立。
 7. 旧计划器目前不支持 catalog 统计数据,而 Blink 支持。
 
 
@@ -62,7 +62,8 @@ Table tapiResult = tableEnv.from("table1").select(...);
 Table sqlResult  = tableEnv.sqlQuery("SELECT ... FROM table1 ... ");
 
 // emit a Table API result Table to a TableSink, same for SQL result
-tapiResult.insertInto("outputTable");
+TableResult tableResult = tapiResult.executeInsert("outputTable");
+tableResult...
 
 // execute
 tableEnv.execute("java_job");
@@ -87,7 +88,8 @@ val tapiResult = tableEnv.from("table1").select(...)
 val sqlResult  = tableEnv.sqlQuery("SELECT ... FROM table1 ...")
 
 // emit a Table API result Table to a TableSink, same for SQL result
-tapiResult.insertInto("outputTable")
+val tableResult = tapiResult.executeInsert("outputTable")
+tableResult...
 
 // execute
 tableEnv.execute("scala_job")
@@ -113,7 +115,8 @@ tapi_result = table_env.from_path("table1").select(...)
 sql_result  = table_env.sql_query("SELECT ... FROM table1 ...")
 
 # emit a Table API result Table to a TableSink, same for SQL result
-tapi_result.insert_into("outputTable")
+table_result = tapi_result.execute_insert("outputTable")
+table_result...
 
 # execute
 table_env.execute("python_job")
@@ -405,7 +408,7 @@ table_environment \
 
 <div data-lang="DDL" markdown="1">
 {% highlight sql %}
-tableEnvironment.sqlUpdate("CREATE [TEMPORARY] TABLE MyTable (...) WITH (...)")
+tableEnvironment.executeSql("CREATE [TEMPORARY] TABLE MyTable (...) WITH (...)")
 {% endhighlight %}
 </div>
 </div>
@@ -641,7 +644,7 @@ TableEnvironment tableEnv = ...; // see "Create a TableEnvironment" section
 // register "RevenueFrance" output table
 
 // compute revenue for all customers from France and emit to "RevenueFrance"
-tableEnv.sqlUpdate(
+tableEnv.executeSql(
     "INSERT INTO RevenueFrance " +
     "SELECT cID, cName, SUM(revenue) AS revSum " +
     "FROM Orders " +
@@ -649,7 +652,6 @@ tableEnv.sqlUpdate(
     "GROUP BY cID, cName"
   );
 
-// execute query
 {% endhighlight %}
 </div>
 
@@ -662,7 +664,7 @@ val tableEnv = ... // see "Create a TableEnvironment" section
 // register "RevenueFrance" output table
 
 // compute revenue for all customers from France and emit to "RevenueFrance"
-tableEnv.sqlUpdate("""
+tableEnv.executeSql("""
   |INSERT INTO RevenueFrance
   |SELECT cID, cName, SUM(revenue) AS revSum
   |FROM Orders
@@ -670,7 +672,6 @@ tableEnv.sqlUpdate("""
   |GROUP BY cID, cName
   """.stripMargin)
 
-// execute query
 {% endhighlight %}
 
 </div>
@@ -684,7 +685,7 @@ table_env = ... # see "Create a TableEnvironment" section
 # register "RevenueFrance" output table
 
 # compute revenue for all customers from France and emit to "RevenueFrance"
-table_env.sql_update(
+table_env.execute_sql(
     "INSERT INTO RevenueFrance "
     "SELECT cID, cName, SUM(revenue) AS revSum "
     "FROM Orders "
@@ -692,7 +693,6 @@ table_env.sql_update(
     "GROUP BY cID, cName"
 )
 
-# execute query
 {% endhighlight %}
 </div>
 </div>
@@ -717,7 +717,7 @@ Table API 和 SQL 查询的混用非常简单因为它们都返回 `Table` 对
 
 请参考文档 [Table Sources & Sinks]({{ site.baseurl }}/zh/dev/table/sourceSinks.html) 以获取更多关于可用 Sink 的信息以及如何自定义 `TableSink`。
 
-方法 `Table.insertInto(String tableName)` 将 `Table` 发送至已注册的 `TableSink`。该方法通过名称在 catalog 中查找 `TableSink` 并确认`Table` schema 和 `TableSink` schema 一致。
+方法 `Table.executeInsert(String tableName)` 将 `Table` 发送至已注册的 `TableSink`。该方法通过名称在 catalog 中查找 `TableSink` 并确认`Table` schema 和 `TableSink` schema 一致。
 
 下面的示例演示如何输出 `Table`:
 
@@ -741,9 +741,8 @@ tableEnv.connect(new FileSystem("/path/to/file"))
 // compute a result Table using Table API operators and/or SQL queries
 Table result = ...
 // emit the result Table to the registered TableSink
-result.insertInto("CsvSinkTable");
+result.executeInsert("CsvSinkTable");
 
-// execute the program
 {% endhighlight %}
 </div>
 
@@ -767,9 +766,8 @@ tableEnv.connect(new FileSystem("/path/to/file"))
 val result: Table = ...
 
 // emit the result Table to the registered TableSink
-result.insertInto("CsvSinkTable")
+result.executeInsert("CsvSinkTable")
 
-// execute the program
 {% endhighlight %}
 </div>
 
@@ -779,7 +777,7 @@ result.insertInto("CsvSinkTable")
 table_env = ... # see "Create a TableEnvironment" section
 
 # create a TableSink
-t_env.connect(FileSystem().path("/path/to/file")))
+table_env.connect(FileSystem().path("/path/to/file")))
     .with_format(Csv()
                  .field_delimiter(',')
                  .deriveSchema())
@@ -793,9 +791,8 @@ t_env.connect(FileSystem().path("/path/to/file")))
 result = ...
 
 # emit the result Table to the registered TableSink
-result.insert_into("CsvSinkTable")
+result.execute_insert("CsvSinkTable")
 
-# execute the program
 {% endhighlight %}
 </div>
 </div>
@@ -818,8 +815,13 @@ result.insert_into("CsvSinkTable")
 
 Table API 或者 SQL 查询在下列情况下会被翻译:
 
-* 当 `TableEnvironment.execute()` 被调用时。`Table` (通过 `Table.insertInto()` 输出给 `TableSink`)和 SQL (通过调用 `TableEnvironment.sqlUpdate()`)会先被缓存到 `TableEnvironment` 中,所有的 sink 会被优化成一张有向无环图。
-* `Table` 被转换成 `DataStream` 时(参阅[与 DataStream 和 DataSet API 结合](#integration-with-datastream-and-dataset-api))。转换完成后,它就成为一个普通的 DataStream 程序,并且会在调用 `StreamExecutionEnvironment.execute()` 的时候被执行。
+* 当 `TableEnvironment.executeSql()` 被调用时。该方法是用来执行一个 SQL 语句,一旦该方法被调用, SQL 语句立即被翻译。
+* 当 `Table.executeInsert()` 被调用时。该方法是用来将一个表的内容插入到目标表中,一旦该方法被调用, TABLE API 程序立即被翻译。
+* 当 `Table.execute()` 被调用时。该方法是用来将一个表的内容收集到本地,一旦该方法被调用, TABLE API 程序立即被翻译。
+* 当 `StatementSet.execute()` 被调用时。`Table` (通过 `StatementSet.addInsert()` 输出给某个 `Sink`)和 INSERT 语句 (通过调用 `StatementSet.addInsertSql()`)会先被缓存到 `StatementSet` 中,`StatementSet.execute()` 方法被调用时,所有的 sink 会被优化成一张有向无环图。
+* 当 `Table` 被转换成 `DataStream` 时(参阅[与 DataStream 和 DataSet API 结合](#integration-with-datastream-and-dataset-api))。转换完成后,它就成为一个普通的 DataStream 程序,并会在调用 `StreamExecutionEnvironment.execute()` 时被执行。
+
+<span class="label label-danger">注意</span> **从 1.11 版本开始,`sqlUpdate` 方法 和 `insertInto` 方法被废弃,从这两个方法构建的 Table 程序必须通过 `StreamTableEnvironment.execute()` 方法执行,而不能通过 `StreamExecutionEnvironment.execute()` 方法来执行。**
 </div>
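
For readers skimming the list above: `TableEnvironment.executeSql()`, `Table.executeInsert()` and `Table.execute()` translate and submit eagerly, while a `StatementSet` only buffers work until its `execute()` is called. A minimal Java sketch of that difference (the table names `Source1`, `Sink1`, `Sink2` are hypothetical):

{% highlight java %}
TableEnvironment tEnv = ...; // see "Create a TableEnvironment" section

// eager: each of these calls translates the program and submits a job immediately
tEnv.executeSql("INSERT INTO Sink1 SELECT * FROM Source1");
tEnv.from("Source1").executeInsert("Sink1");
tEnv.from("Source1").execute(); // collects the table contents to the client

// lazy: inserts are only buffered here ...
StatementSet stmtSet = tEnv.createStatementSet();
stmtSet.addInsertSql("INSERT INTO Sink1 SELECT * FROM Source1");
stmtSet.addInsert("Sink2", tEnv.from("Source1"));
// ... and are translated into one DAG and submitted only on execute()
stmtSet.execute();
{% endhighlight %}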
 
 <div data-lang="Old planner" markdown="1">
@@ -828,18 +830,16 @@ Table API 和 SQL 查询会被翻译成 [DataStream]({{ site.baseurl }}/zh/dev/d
 1. 优化逻辑执行计划
 2. 翻译成 DataStream 或 DataSet 程序
 
-对于 Streaming 而言,Table API 或者 SQL 查询在下列情况下会被翻译:
-
-* 当 `TableEnvironment.execute()` 被调用时。`Table` (通过 `Table.insertInto()` 输出给 `TableSink`)和 SQL (通过调用 `TableEnvironment.sqlUpdate()`)会先被缓存到 `TableEnvironment` 中,每个 sink 会被单独优化。执行计划将包括多个独立的有向无环子图。
-* `Table` 被转换成 `DataStream` 时(参阅[与 DataStream 和 DataSet API 结合](#integration-with-datastream-and-dataset-api))。转换完成后,它就成为一个普通的 DataStream 程序,并且会在调用 `StreamExecutionEnvironment.execute()` 的时候被执行。
+Table API 或者 SQL 查询在下列情况下会被翻译:
 
-对于 Batch 而言,Table API 或者 SQL 查询在下列情况下会被翻译:
+* 当 `TableEnvironment.executeSql()` 被调用时。该方法是用来执行一个 SQL 语句,一旦该方法被调用, SQL 语句立即被翻译。
+* 当 `Table.executeInsert()` 被调用时。该方法是用来将一个表的内容插入到目标表中,一旦该方法被调用, TABLE API 程序立即被翻译。
+* 当 `Table.execute()` 被调用时。该方法是用来将一个表的内容收集到本地,一旦该方法被调用, TABLE API 程序立即被翻译。
+* 当 `StatementSet.execute()` 被调用时。`Table` (通过 `StatementSet.addInsert()` 输出给某个 `Sink`)和 INSERT 语句 (通过调用 `StatementSet.addInsertSql()`)会先被缓存到 `StatementSet` 中,`StatementSet.execute()` 方法被调用时,所有的 sink 会被优化成一张有向无环图。
+* 对于 Streaming 而言,当`Table` 被转换成 `DataStream` 时(参阅[与 DataStream 和 DataSet API 结合](#integration-with-datastream-and-dataset-api))触发翻译。转换完成后,它就成为一个普通的 DataStream 程序,并会在调用 `StreamExecutionEnvironment.execute()` 时被执行。对于 Batch 而言,`Table` 被转换成 `DataSet` 时(参阅[与 DataStream 和 DataSet API 结合](#integration-with-datastream-and-dataset-api))触发翻译。转换完成后,它就成为一个普通的 DataSet 程序,并会在调用 `ExecutionEnvironment.execute()` 时被执行。
 
-* `Table` 被输出给 `TableSink`,即当调用 `Table.insertInto()` 时。
-* SQL 更新语句执行时,即,当调用 `TableEnvironment.sqlUpdate()` 时。
-* `Table` 被转换成 `DataSet` 时(参阅[与 DataStream 和 DataSet API 结合](#integration-with-datastream-and-dataset-api))。
+<span class="label label-danger">注意</span> **从 1.11 版本开始,`sqlUpdate` 方法 和 `insertInto` 方法被废弃。对于 Streaming 而言,如果一个 Table 程序是从这两个方法构建出来的,必须通过 `StreamTableEnvironment.execute()` 方法执行,而不能通过 `StreamExecutionEnvironment.execute()` 方法执行;对于 Batch 而言,如果一个 Table 程序是从这两个方法构建出来的,必须通过 `BatchTableEnvironment.execute()` 方法执行,而不能通过 `ExecutionEnvironment.execute()` 方法执行。**
 
-翻译完成后,Table API 或者 SQL 查询会被当做普通的 DataSet 程序对待并且会在调用 `ExecutionEnvironment.execute()` 的时候被执行。
 </div>
 
 </div>
@@ -1023,6 +1023,8 @@ val retractStream: DataStream[(Boolean, Row)] = tableEnv.toRetractStream[Row](ta
 
 **注意:** 文档[动态表](streaming/dynamic_tables.html)给出了有关动态表及其属性的详细讨论。
 
+<span class="label label-danger">注意</span> **一旦 Table 被转化为 DataStream,必须使用 StreamExecutionEnvironment 的 execute 方法执行该 DataStream 作业。**
+
 #### 将表转换成 DataSet
 
 将 `Table` 转换成 `DataSet` 的过程如下:
@@ -1066,6 +1068,8 @@ val dsTuple: DataSet[(String, Int)] = tableEnv.toDataSet[(String, Int)](table)
 </div>
 </div>
 
+<span class="label label-danger">注意</span> **一旦 Table 被转化为 DataSet,必须使用 ExecutionEnvironment 的 execute 方法执行该 DataSet 作业。**
+
 {% top %}
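
A short Java sketch of the rule stated in the two notes above (illustrative only): once a `Table` has been converted, the resulting pipeline belongs to the DataStream API and must be launched through that environment's `execute()`:

{% highlight java %}
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

Table table = ...; // any Table

// after this conversion the program is a plain DataStream program
DataStream<Row> stream = tEnv.toAppendStream(table, Row.class);
stream.print();

// it only runs when the StreamExecutionEnvironment is executed
env.execute("converted-job");
{% endhighlight %}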
 
 ### 数据类型到 Table Schema 的映射
@@ -1417,16 +1421,17 @@ Apache Flink 利用 Apache Calcite 来优化和翻译查询。当前执行的优
 </div>
 
 
-### 解释表
+解释表
+------------------
 
 Table API 提供了一种机制来解释计算 `Table` 的逻辑和优化查询计划。
-这是通过 `TableEnvironment.explain(table)` 或者 `TableEnvironment.explain()` 完成的。`explain(table)` 返回给定 `Table` 的计划。 `explain()` 返回多 sink 计划的结果并且主要用于 Blink 计划器。它返回一个描述三种计划的字符串:
+这是通过 `Table.explain()` 方法或者 `StatementSet.explain()` 方法来完成的。`Table.explain()` 返回一个 Table 的计划。`StatementSet.explain()` 返回多 sink 计划的结果。它返回一个描述三种计划的字符串:
 
 1. 关系查询的抽象语法树(the Abstract Syntax Tree),即未优化的逻辑查询计划,
 2. 优化的逻辑查询计划,以及
 3. 物理执行计划。
 
-以下代码展示了一个示例以及对给定 `Table` 使用 `explain(table)` 的相应输出:
+以下代码展示了一个示例以及对给定 `Table` 使用 `Table.explain()` 方法的相应输出:
 
 <div class="codetabs" markdown="1">
 <div data-lang="java" markdown="1">
@@ -1437,14 +1442,14 @@ StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
 DataStream<Tuple2<Integer, String>> stream1 = env.fromElements(new Tuple2<>(1, "hello"));
 DataStream<Tuple2<Integer, String>> stream2 = env.fromElements(new Tuple2<>(1, "hello"));
 
+// explain Table API
 Table table1 = tEnv.fromDataStream(stream1, $("count"), $("word"));
 Table table2 = tEnv.fromDataStream(stream2, $("count"), $("word"));
 Table table = table1
   .where($("word").like("F%"))
   .unionAll(table2);
+System.out.println(table.explain());
 
-String explanation = tEnv.explain(table);
-System.out.println(explanation);
 {% endhighlight %}
 </div>
 
@@ -1458,9 +1463,8 @@ val table2 = env.fromElements((1, "hello")).toTable(tEnv, $"count", $"word")
 val table = table1
   .where($"word".like("F%"))
   .unionAll(table2)
+println(table.explain())
 
-val explanation: String = tEnv.explain(table)
-println(explanation)
 {% endhighlight %}
 </div>
 
@@ -1474,50 +1478,14 @@ table2 = t_env.from_elements([(1, "hello")], ["count", "word"])
 table = table1 \
     .where("LIKE(word, 'F%')") \
     .union_all(table2)
+print(table.explain())
 
-explanation = t_env.explain(table)
-print(explanation)
 {% endhighlight %}
 </div>
 </div>
 
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
-{% highlight text %}
-== Abstract Syntax Tree ==
-LogicalUnion(all=[true])
-  LogicalFilter(condition=[LIKE($1, _UTF-16LE'F%')])
-    FlinkLogicalDataStreamScan(id=[1], fields=[count, word])
-  FlinkLogicalDataStreamScan(id=[2], fields=[count, word])
-
-== Optimized Logical Plan ==
-DataStreamUnion(all=[true], union all=[count, word])
-  DataStreamCalc(select=[count, word], where=[LIKE(word, _UTF-16LE'F%')])
-    DataStreamScan(id=[1], fields=[count, word])
-  DataStreamScan(id=[2], fields=[count, word])
-
-== Physical Execution Plan ==
-Stage 1 : Data Source
-	content : collect elements with CollectionInputFormat
-
-Stage 2 : Data Source
-	content : collect elements with CollectionInputFormat
-
-	Stage 3 : Operator
-		content : from: (count, word)
-		ship_strategy : REBALANCE
-
-		Stage 4 : Operator
-			content : where: (LIKE(word, _UTF-16LE'F%')), select: (count, word)
-			ship_strategy : FORWARD
-
-			Stage 5 : Operator
-				content : from: (count, word)
-				ship_strategy : REBALANCE
-{% endhighlight %}
-</div>
-
-<div data-lang="scala" markdown="1">
+上述例子的结果是:
+<div>
 {% highlight text %}
 == Abstract Syntax Tree ==
 LogicalUnion(all=[true])
@@ -1552,59 +1520,7 @@ Stage 2 : Data Source
 {% endhighlight %}
 </div>
 
-<div data-lang="python" markdown="1">
-{% highlight text %}
-== Abstract Syntax Tree ==
-LogicalUnion(all=[true])
-  LogicalFilter(condition=[LIKE($1, _UTF-16LE'F%')])
-    FlinkLogicalDataStreamScan(id=[3], fields=[count, word])
-  FlinkLogicalDataStreamScan(id=[6], fields=[count, word])
-
-== Optimized Logical Plan ==
-DataStreamUnion(all=[true], union all=[count, word])
-  DataStreamCalc(select=[count, word], where=[LIKE(word, _UTF-16LE'F%')])
-    DataStreamScan(id=[3], fields=[count, word])
-  DataStreamScan(id=[6], fields=[count, word])
-
-== Physical Execution Plan ==
-Stage 1 : Data Source
-	content : collect elements with CollectionInputFormat
-
-	Stage 2 : Operator
-		content : Flat Map
-		ship_strategy : FORWARD
-
-		Stage 3 : Operator
-			content : Map
-			ship_strategy : FORWARD
-
-Stage 4 : Data Source
-	content : collect elements with CollectionInputFormat
-
-	Stage 5 : Operator
-		content : Flat Map
-		ship_strategy : FORWARD
-
-		Stage 6 : Operator
-			content : Map
-			ship_strategy : FORWARD
-
-			Stage 7 : Operator
-				content : Map
-				ship_strategy : FORWARD
-
-				Stage 8 : Operator
-					content : where: (LIKE(word, _UTF-16LE'F%')), select: (count, word)
-					ship_strategy : FORWARD
-
-					Stage 9 : Operator
-						content : Map
-						ship_strategy : FORWARD
-{% endhighlight %}
-</div>
-</div>
-
-以下代码展示了一个示例以及使用 `explain()` 的多 sink 计划的相应输出:
+以下代码展示了一个示例以及使用 `StatementSet.explain()` 的多 sink 计划的相应输出:
 
 <div class="codetabs" markdown="1">
 <div data-lang="java" markdown="1">
@@ -1634,13 +1550,15 @@ tEnv.connect(new FileSystem("/sink/path2"))
     .withSchema(schema)
     .createTemporaryTable("MySink2");
 
+StatementSet stmtSet = tEnv.createStatementSet();
+
 Table table1 = tEnv.from("MySource1").where($("word").like("F%"));
-table1.insertInto("MySink1");
+stmtSet.addInsert("MySink1", table1);
 
 Table table2 = table1.unionAll(tEnv.from("MySource2"));
-table2.insertInto("MySink2");
+stmtSet.addInsert("MySink2", table2);
 
-String explanation = tEnv.explain(false);
+String explanation = stmtSet.explain();
 System.out.println(explanation);
 
 {% endhighlight %}
@@ -1672,13 +1590,15 @@ tEnv.connect(new FileSystem("/sink/path2"))
     .withSchema(schema)
     .createTemporaryTable("MySink2")
 
+val stmtSet = tEnv.createStatementSet()
+
 val table1 = tEnv.from("MySource1").where($"word".like("F%"))
-table1.insertInto("MySink1")
+stmtSet.addInsert("MySink1", table1)
 
 val table2 = table1.unionAll(tEnv.from("MySource2"))
-table2.insertInto("MySink2")
+stmtSet.addInsert("MySink2", table2)
 
-val explanation = tEnv.explain(false)
+val explanation = stmtSet.explain()
 println(explanation)
 
 {% endhighlight %}
@@ -1710,14 +1630,17 @@ t_env.connect(FileSystem().path("/sink/path2")))
     .with_schema(schema)
     .create_temporary_table("MySink2")
 
+stmt_set = t_env.create_statement_set()
+
 table1 = t_env.from_path("MySource1").where("LIKE(word, 'F%')")
-table1.insert_into("MySink1")
+stmt_set.add_insert("MySink1", table1)
 
 table2 = table1.union_all(t_env.from_path("MySource2"))
-table2.insert_into("MySink2")
+stmt_set.add_insert("MySink2", table2)
 
-explanation = t_env.explain()
+explanation = stmt_set.explain()
 print(explanation)
+
 {% endhighlight %}
 </div>
 </div>
diff --git a/docs/dev/table/connect.md b/docs/dev/table/connect.md
index 0519752..7aa151b 100644
--- a/docs/dev/table/connect.md
+++ b/docs/dev/table/connect.md
@@ -91,7 +91,7 @@ The subsequent sections will cover each definition part ([connector](connect.htm
 <div class="codetabs" markdown="1">
 <div data-lang="DDL" markdown="1">
 {% highlight sql %}
-tableEnvironment.sqlUpdate(
+tableEnvironment.executeSql(
     "CREATE TABLE MyTable (\n" +
     "  ...    -- declare table schema \n" +
     ") WITH (\n" +
@@ -2078,7 +2078,7 @@ tableEnv.registerTableSink(
   sink);
 
 Table table = ...
-table.insertInto("csvOutputTable");
+table.executeInsert("csvOutputTable");
 {% endhighlight %}
 </div>
 
@@ -2099,7 +2099,7 @@ tableEnv.registerTableSink(
   sink)
 
 val table: Table = ???
-table.insertInto("csvOutputTable")
+table.executeInsert("csvOutputTable")
 {% endhighlight %}
 </div>
 
diff --git a/docs/dev/table/connect.zh.md b/docs/dev/table/connect.zh.md
index 41d87a1..622e04f2 100644
--- a/docs/dev/table/connect.zh.md
+++ b/docs/dev/table/connect.zh.md
@@ -91,7 +91,7 @@ The subsequent sections will cover each definition part ([connector](connect.htm
 <div class="codetabs" markdown="1">
 <div data-lang="DDL" markdown="1">
 {% highlight sql %}
-tableEnvironment.sqlUpdate(
+tableEnvironment.executeSql(
     "CREATE TABLE MyTable (\n" +
     "  ...    -- declare table schema \n" +
     ") WITH (\n" +
@@ -2074,7 +2074,7 @@ tableEnv.registerTableSink(
   sink);
 
 Table table = ...
-table.insertInto("csvOutputTable");
+table.executeInsert("csvOutputTable");
 {% endhighlight %}
 </div>
 
@@ -2095,7 +2095,7 @@ tableEnv.registerTableSink(
   sink)
 
 val table: Table = ???
-table.insertInto("csvOutputTable")
+table.executeInsert("csvOutputTable")
 {% endhighlight %}
 </div>
 
diff --git a/docs/dev/table/sql/alter.md b/docs/dev/table/sql/alter.md
index 82ae467..61d9098 100644
--- a/docs/dev/table/sql/alter.md
+++ b/docs/dev/table/sql/alter.md
@@ -35,7 +35,7 @@ Flink SQL supports the following ALTER statements for now:
 
 ## Run an ALTER statement
 
-ALTER statements can be executed with the `sqlUpdate()` method of the `TableEnvironment`, or executed in [SQL CLI]({{ site.baseurl }}/dev/table/sqlClient.html). The `sqlUpdate()` method returns nothing for a successful ALTER operation, otherwise will throw an exception.
+ALTER statements can be executed with the `executeSql()` method of the `TableEnvironment`, or executed in [SQL CLI]({{ site.baseurl }}/dev/table/sqlClient.html). The `executeSql()` method returns 'OK' for a successful ALTER operation; otherwise, it throws an exception.
 
 The following examples show how to run an ALTER statement in `TableEnvironment` and in SQL CLI.
 
@@ -46,16 +46,18 @@ EnvironmentSettings settings = EnvironmentSettings.newInstance()...
 TableEnvironment tableEnv = TableEnvironment.create(settings);
 
 // register a table named "Orders"
-tableEnv.sqlUpdate("CREATE TABLE Orders (`user` BIGINT, product STRING, amount INT) WITH (...)");
+tableEnv.executeSql("CREATE TABLE Orders (`user` BIGINT, product STRING, amount INT) WITH (...)");
 
 // a string array: ["Orders"]
-String[] tables = tableEnv.listTable();
+String[] tables = tableEnv.listTables();
+// or tableEnv.executeSql("SHOW TABLES").print();
 
 // rename "Orders" to "NewOrders"
-tableEnv.sqlUpdate("ALTER TABLE Orders RENAME TO NewOrders;");
+tableEnv.executeSql("ALTER TABLE Orders RENAME TO NewOrders;");
 
 // a string array: ["NewOrders"]
-String[] tables = tableEnv.listTable();
+String[] tables = tableEnv.listTables();
+// or tableEnv.executeSql("SHOW TABLES").print();
 {% endhighlight %}
 </div>
 
@@ -65,32 +67,36 @@ val settings = EnvironmentSettings.newInstance()...
 val tableEnv = TableEnvironment.create(settings)
 
 // register a table named "Orders"
-tableEnv.sqlUpdate("CREATE TABLE Orders (`user` BIGINT, product STRING, amount INT) WITH (...)");
+tableEnv.executeSql("CREATE TABLE Orders (`user` BIGINT, product STRING, amount INT) WITH (...)");
 
 // a string array: ["Orders"]
-val tables = tableEnv.listTable()
+val tables = tableEnv.listTables()
+// or tableEnv.executeSql("SHOW TABLES").print()
 
 // rename "Orders" to "NewOrders"
-tableEnv.sqlUpdate("ALTER TABLE Orders RENAME TO NewOrders;")
+tableEnv.executeSql("ALTER TABLE Orders RENAME TO NewOrders;")
 
 // a string array: ["NewOrders"]
-val tables = tableEnv.listTable()
+val tables = tableEnv.listTables()
+// or tableEnv.executeSql("SHOW TABLES").print()
 {% endhighlight %}
 </div>
 
 <div data-lang="python" markdown="1">
 {% highlight python %}
-settings = EnvironmentSettings.newInstance()...
-table_env = TableEnvironment.create(settings)
+settings = EnvironmentSettings.new_instance()...
+table_env = StreamTableEnvironment.create(env, settings)
 
 # a string array: ["Orders"]
-tables = tableEnv.listTable()
+tables = table_env.list_tables()
+# or table_env.execute_sql("SHOW TABLES").print()
 
 # rename "Orders" to "NewOrders"
-tableEnv.sqlUpdate("ALTER TABLE Orders RENAME TO NewOrders;")
+table_env.execute_sql("ALTER TABLE Orders RENAME TO NewOrders;")
 
 # a string array: ["NewOrders"]
-tables = tableEnv.listTable()
+tables = table_env.list_tables()
+# or table_env.execute_sql("SHOW TABLES").print()
 {% endhighlight %}
 </div>
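
To make the "returns 'OK'" wording above concrete: `executeSql()` hands back a `TableResult`, and its `print()` method (already used in the `SHOW TABLES` comments) renders that result. A sketch reusing the `Orders` table from the example:

{% highlight java %}
// a successful ALTER yields a single 'OK' result; a failure throws an exception instead
TableResult result = tableEnv.executeSql("ALTER TABLE Orders RENAME TO NewOrders");
result.print();
{% endhighlight %}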
 
diff --git a/docs/dev/table/sql/alter.zh.md b/docs/dev/table/sql/alter.zh.md
index 16e4cda..7d3eed1 100644
--- a/docs/dev/table/sql/alter.zh.md
+++ b/docs/dev/table/sql/alter.zh.md
@@ -35,7 +35,7 @@ Flink SQL 目前支持以下 ALTER 语句:
 
 ## 执行 ALTER 语句
 
-可以使用 `TableEnvironment` 中的 `sqlUpdate()` 方法执行 ALTER 语句,也可以在 [SQL CLI]({{ site.baseurl }}/zh/dev/table/sqlClient.html) 中执行 ALTER 语句。 若 ALTER 操作执行成功,`sqlUpdate()` 方法不返回任何内容,否则会抛出异常。
+可以使用 `TableEnvironment` 中的 `executeSql()` 方法执行 ALTER 语句,也可以在 [SQL CLI]({{ site.baseurl }}/zh/dev/table/sqlClient.html) 中执行 ALTER 语句。 若 ALTER 操作执行成功,`executeSql()` 方法返回 'OK',否则会抛出异常。
 
 以下的例子展示了如何在 `TableEnvironment` 和  SQL CLI 中执行一个 ALTER 语句。
 
@@ -46,16 +46,18 @@ EnvironmentSettings settings = EnvironmentSettings.newInstance()...
 TableEnvironment tableEnv = TableEnvironment.create(settings);
 
 // 注册名为 “Orders” 的表
-tableEnv.sqlUpdate("CREATE TABLE Orders (`user` BIGINT, product STRING, amount INT) WITH (...)");
+tableEnv.executeSql("CREATE TABLE Orders (`user` BIGINT, product STRING, amount INT) WITH (...)");
 
 // 字符串数组: ["Orders"]
-String[] tables = tableEnv.listTable();
+String[] tables = tableEnv.listTables();
+// or tableEnv.executeSql("SHOW TABLES").print();
 
 // 把 “Orders” 的表名改为 “NewOrders”
-tableEnv.sqlUpdate("ALTER TABLE Orders RENAME TO NewOrders;");
+tableEnv.executeSql("ALTER TABLE Orders RENAME TO NewOrders;");
 
 // 字符串数组:["NewOrders"]
-String[] tables = tableEnv.listTable();
+String[] tables = tableEnv.listTables();
+// or tableEnv.executeSql("SHOW TABLES").print();
 {% endhighlight %}
 </div>
 
@@ -65,32 +67,36 @@ val settings = EnvironmentSettings.newInstance()...
 val tableEnv = TableEnvironment.create(settings)
 
 // 注册名为 “Orders” 的表
-tableEnv.sqlUpdate("CREATE TABLE Orders (`user` BIGINT, product STRING, amount INT) WITH (...)");
+tableEnv.executeSql("CREATE TABLE Orders (`user` BIGINT, product STRING, amount INT) WITH (...)");
 
 // 字符串数组: ["Orders"]
-val tables = tableEnv.listTable()
+val tables = tableEnv.listTables()
+// or tableEnv.executeSql("SHOW TABLES").print()
 
 // 把 “Orders” 的表名改为 “NewOrders”
-tableEnv.sqlUpdate("ALTER TABLE Orders RENAME TO NewOrders;")
+tableEnv.executeSql("ALTER TABLE Orders RENAME TO NewOrders;")
 
 // 字符串数组:["NewOrders"]
-val tables = tableEnv.listTable()
+val tables = tableEnv.listTables()
+// or tableEnv.executeSql("SHOW TABLES").print()
 {% endhighlight %}
 </div>
 
 <div data-lang="python" markdown="1">
 {% highlight python %}
-settings = EnvironmentSettings.newInstance()...
-table_env = TableEnvironment.create(settings)
+settings = EnvironmentSettings.new_instance()...
+table_env = StreamTableEnvironment.create(env, settings)
 
 # 字符串数组: ["Orders"]
-tables = tableEnv.listTable()
+tables = table_env.list_tables()
+# or table_env.execute_sql("SHOW TABLES").print()
 
 # 把 “Orders” 的表名改为 “NewOrders”
-tableEnv.sqlUpdate("ALTER TABLE Orders RENAME TO NewOrders;")
+table_env.execute_sql("ALTER TABLE Orders RENAME TO NewOrders;")
 
 # 字符串数组:["NewOrders"]
-tables = tableEnv.listTable()
+tables = table_env.list_tables()
+# or table_env.execute_sql("SHOW TABLES").print()
 {% endhighlight %}
 </div>
 
diff --git a/docs/dev/table/sql/create.md b/docs/dev/table/sql/create.md
index 00eb2ca..4bc398c 100644
--- a/docs/dev/table/sql/create.md
+++ b/docs/dev/table/sql/create.md
@@ -36,7 +36,7 @@ Flink SQL supports the following CREATE statements for now:
 
 ## Run a CREATE statement
 
-CREATE statements can be executed with the `sqlUpdate()` method of the `TableEnvironment`, or executed in [SQL CLI]({{ site.baseurl }}/dev/table/sqlClient.html). The `sqlUpdate()` method returns nothing for a successful CREATE operation, otherwise will throw an exception.
+CREATE statements can be executed with the `executeSql()` method of the `TableEnvironment`, or executed in [SQL CLI]({{ site.baseurl }}/dev/table/sqlClient.html). The `executeSql()` method returns 'OK' for a successful CREATE operation; otherwise, it throws an exception.
 
 The following examples show how to run a CREATE statement in `TableEnvironment` and in SQL CLI.
 
@@ -48,16 +48,16 @@ TableEnvironment tableEnv = TableEnvironment.create(settings);
 
 // SQL query with a registered table
 // register a table named "Orders"
-tableEnv.sqlUpdate("CREATE TABLE Orders (`user` BIGINT, product STRING, amount INT) WITH (...)");
+tableEnv.executeSql("CREATE TABLE Orders (`user` BIGINT, product STRING, amount INT) WITH (...)");
 // run a SQL query on the Table and retrieve the result as a new Table
 Table result = tableEnv.sqlQuery(
   "SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%'");
 
-// SQL update with a registered table
+// Execute insert SQL with a registered table
 // register a TableSink
-tableEnv.sqlUpdate("CREATE TABLE RubberOrders(product STRING, amount INT) WITH (...)");
-// run a SQL update query on the Table and emit the result to the TableSink
-tableEnv.sqlUpdate(
+tableEnv.executeSql("CREATE TABLE RubberOrders(product STRING, amount INT) WITH (...)");
+// run an insert SQL on the Table and emit the result to the TableSink
+tableEnv.executeSql(
   "INSERT INTO RubberOrders SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%'");
 {% endhighlight %}
 </div>
@@ -69,38 +69,38 @@ val tableEnv = TableEnvironment.create(settings)
 
 // SQL query with a registered table
 // register a table named "Orders"
-tableEnv.sqlUpdate("CREATE TABLE Orders (`user` BIGINT, product STRING, amount INT) WITH (...)");
+tableEnv.executeSql("CREATE TABLE Orders (`user` BIGINT, product STRING, amount INT) WITH (...)");
 // run a SQL query on the Table and retrieve the result as a new Table
 val result = tableEnv.sqlQuery(
   "SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%'");
 
-// SQL update with a registered table
+// Execute insert SQL with a registered table
 // register a TableSink
-tableEnv.sqlUpdate("CREATE TABLE RubberOrders(product STRING, amount INT) WITH ('connector.path'='/path/to/file' ...)");
-// run a SQL update query on the Table and emit the result to the TableSink
-tableEnv.sqlUpdate(
+tableEnv.executeSql("CREATE TABLE RubberOrders(product STRING, amount INT) WITH ('connector.path'='/path/to/file' ...)");
+// run an insert SQL on the Table and emit the result to the TableSink
+tableEnv.executeSql(
   "INSERT INTO RubberOrders SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%'")
 {% endhighlight %}
 </div>
 
 <div data-lang="python" markdown="1">
 {% highlight python %}
-settings = EnvironmentSettings.newInstance()...
-table_env = TableEnvironment.create(settings)
+settings = EnvironmentSettings.new_instance()...
+table_env = StreamTableEnvironment.create(env, settings)
 
 # SQL query with a registered table
 # register a table named "Orders"
-tableEnv.sqlUpdate("CREATE TABLE Orders (`user` BIGINT, product STRING, amount INT) WITH (...)");
+table_env.execute_sql("CREATE TABLE Orders (`user` BIGINT, product STRING, amount INT) WITH (...)");
 # run a SQL query on the Table and retrieve the result as a new Table
-result = tableEnv.sqlQuery(
+result = table_env.sql_query(
   "SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%'");
 
-# SQL update with a registered table
+# Execute an INSERT SQL with a registered table
 # register a TableSink
-table_env.sql_update("CREATE TABLE RubberOrders(product STRING, amount INT) WITH (...)")
-# run a SQL update query on the Table and emit the result to the TableSink
+table_env.execute_sql("CREATE TABLE RubberOrders(product STRING, amount INT) WITH (...)")
+# run an INSERT SQL on the Table and emit the result to the TableSink
 table_env \
-    .sql_update("INSERT INTO RubberOrders SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%'")
+    .execute_sql("INSERT INTO RubberOrders SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%'")
 {% endhighlight %}
 </div>
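
A quick way to verify that a CREATE statement took effect, sketched with the `Orders` table from the example above (DESCRIBE is documented on the describe.md page added by this change):

{% highlight java %}
tableEnv.executeSql("CREATE TABLE Orders (`user` BIGINT, product STRING, amount INT) WITH (...)");
// print the schema of the newly created table
tableEnv.executeSql("DESCRIBE Orders").print();
{% endhighlight %}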
 
diff --git a/docs/dev/table/sql/create.zh.md b/docs/dev/table/sql/create.zh.md
index 34c9bc8..027debf 100644
--- a/docs/dev/table/sql/create.zh.md
+++ b/docs/dev/table/sql/create.zh.md
@@ -36,7 +36,7 @@ CREATE 语句用于向当前或指定的 [Catalog]({{ site.baseurl }}/zh/dev/tab
 
 ## 执行 CREATE 语句
 
-可以使用 `TableEnvironment` 中的 `sqlUpdate()` 方法执行 CREATE 语句,也可以在 [SQL CLI]({{ site.baseurl }}/zh/dev/table/sqlClient.html) 中执行 CREATE 语句。 若 CREATE 操作执行成功,`sqlUpdate()` 方法不返回任何内容,否则会抛出异常。
+可以使用 `TableEnvironment` 中的 `executeSql()` 方法执行 CREATE 语句,也可以在 [SQL CLI]({{ site.baseurl }}/zh/dev/table/sqlClient.html) 中执行 CREATE 语句。 若 CREATE 操作执行成功,`executeSql()` 方法返回 'OK',否则会抛出异常。
 
 以下的例子展示了如何在 `TableEnvironment` 和  SQL CLI 中执行一个 CREATE 语句。
 
@@ -46,18 +46,18 @@ CREATE 语句用于向当前或指定的 [Catalog]({{ site.baseurl }}/zh/dev/tab
 EnvironmentSettings settings = EnvironmentSettings.newInstance()...
 TableEnvironment tableEnv = TableEnvironment.create(settings);
 
-// 对已经已经注册的表进行 SQL 查询
+// 对已注册的表进行 SQL 查询
 // 注册名为 “Orders” 的表
-tableEnv.sqlUpdate("CREATE TABLE Orders (`user` BIGINT, product STRING, amount INT) WITH (...)");
+tableEnv.executeSql("CREATE TABLE Orders (`user` BIGINT, product STRING, amount INT) WITH (...)");
 // 在表上执行 SQL 查询,并把得到的结果作为一个新的表
 Table result = tableEnv.sqlQuery(
   "SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%'");
 
-// SQL 对已注册的表进行 update 操作
+// 对已注册的表进行 INSERT 操作
 // 注册 TableSink
-tableEnv.sqlUpdate("CREATE TABLE RubberOrders(product STRING, amount INT) WITH (...)");
-// 在表上执行 SQL 更新查询并向 TableSink 发出结果
-tableEnv.sqlUpdate(
+tableEnv.executeSql("CREATE TABLE RubberOrders(product STRING, amount INT) WITH (...)");
+// 在表上执行 INSERT 语句并向 TableSink 发出结果
+tableEnv.executeSql(
   "INSERT INTO RubberOrders SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%'");
 {% endhighlight %}
 </div>
@@ -67,40 +67,40 @@ tableEnv.sqlUpdate(
 val settings = EnvironmentSettings.newInstance()...
 val tableEnv = TableEnvironment.create(settings)
 
-// 对已经已经注册的表进行 SQL 查询
+// 对已注册的表进行 SQL 查询
 // 注册名为 “Orders” 的表
-tableEnv.sqlUpdate("CREATE TABLE Orders (`user` BIGINT, product STRING, amount INT) WITH (...)");
+tableEnv.executeSql("CREATE TABLE Orders (`user` BIGINT, product STRING, amount INT) WITH (...)");
 // 在表上执行 SQL 查询,并把得到的结果作为一个新的表
 val result = tableEnv.sqlQuery(
   "SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%'");
 
-// SQL 对已注册的表进行 update 操作
+// 对已注册的表进行 INSERT 操作
 // 注册 TableSink
-tableEnv.sqlUpdate("CREATE TABLE RubberOrders(product STRING, amount INT) WITH ('connector.path'='/path/to/file' ...)");
-// 在表上执行 SQL 更新查询并向 TableSink 发出结果
-tableEnv.sqlUpdate(
+tableEnv.executeSql("CREATE TABLE RubberOrders(product STRING, amount INT) WITH ('connector.path'='/path/to/file' ...)");
+// 在表上执行 INSERT 语句并向 TableSink 发出结果
+tableEnv.executeSql(
   "INSERT INTO RubberOrders SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%'")
 {% endhighlight %}
 </div>
 
 <div data-lang="python" markdown="1">
 {% highlight python %}
-settings = EnvironmentSettings.newInstance()...
-table_env = TableEnvironment.create(settings)
+settings = EnvironmentSettings.new_instance()...
+table_env = StreamTableEnvironment.create(env, settings)
 
-# 对已经已经注册的表进行 SQL 查询
+# 对已经注册的表进行 SQL 查询
 # 注册名为 “Orders” 的表
-tableEnv.sqlUpdate("CREATE TABLE Orders (`user` BIGINT, product STRING, amount INT) WITH (...)");
+table_env.execute_sql("CREATE TABLE Orders (`user` BIGINT, product STRING, amount INT) WITH (...)");
 # 在表上执行 SQL 查询,并把得到的结果作为一个新的表
-result = tableEnv.sqlQuery(
+result = table_env.sql_query(
   "SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%'");
 
-# SQL 对已注册的表进行 update 操作
+# 对已注册的表进行 INSERT 操作
 # 注册 TableSink
-table_env.sql_update("CREATE TABLE RubberOrders(product STRING, amount INT) WITH (...)")
-# 在表上执行 SQL 更新查询并向 TableSink 发出结果
+table_env.execute_sql("CREATE TABLE RubberOrders(product STRING, amount INT) WITH (...)")
+# 在表上执行 INSERT 语句并向 TableSink 发出结果
 table_env \
-    .sql_update("INSERT INTO RubberOrders SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%'")
+    .execute_sql("INSERT INTO RubberOrders SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%'")
 {% endhighlight %}
 </div>
 
diff --git a/docs/dev/table/sql/drop.md b/docs/dev/table/sql/drop.md
index bcaea0b..c12105f 100644
--- a/docs/dev/table/sql/drop.md
+++ b/docs/dev/table/sql/drop.md
@@ -36,7 +36,7 @@ Flink SQL supports the following DROP statements for now:
 
 ## Run a DROP statement
 
-DROP statements can be executed with the `sqlUpdate()` method of the `TableEnvironment`, or executed in [SQL CLI]({{ site.baseurl }}/dev/table/sqlClient.html). The `sqlUpdate()` method returns nothing for a successful DROP operation, otherwise will throw an exception.
+DROP statements can be executed with the `executeSql()` method of the `TableEnvironment`, or executed in [SQL CLI]({{ site.baseurl }}/dev/table/sqlClient.html). The `executeSql()` method returns 'OK' for a successful DROP operation; otherwise, it throws an exception.
 
 The following examples show how to run a DROP statement in `TableEnvironment` and in SQL CLI.
 
@@ -47,16 +47,18 @@ EnvironmentSettings settings = EnvironmentSettings.newInstance()...
 TableEnvironment tableEnv = TableEnvironment.create(settings);
 
 // register a table named "Orders"
-tableEnv.sqlUpdate("CREATE TABLE Orders (`user` BIGINT, product STRING, amount INT) WITH (...)");
+tableEnv.executeSql("CREATE TABLE Orders (`user` BIGINT, product STRING, amount INT) WITH (...)");
 
 // a string array: ["Orders"]
-String[] tables = tableEnv.listTable();
+String[] tables = tableEnv.listTables();
+// or tableEnv.executeSql("SHOW TABLES").print();
 
 // drop "Orders" table from catalog
-tableEnv.sqlUpdate("DROP TABLE Orders");
+tableEnv.executeSql("DROP TABLE Orders");
 
 // an empty string array
-String[] tables = tableEnv.listTable();
+String[] tables = tableEnv.listTables();
+// or tableEnv.executeSql("SHOW TABLES").print();
 {% endhighlight %}
 </div>
 
@@ -66,32 +68,36 @@ val settings = EnvironmentSettings.newInstance()...
 val tableEnv = TableEnvironment.create(settings)
 
 // register a table named "Orders"
-tableEnv.sqlUpdate("CREATE TABLE Orders (`user` BIGINT, product STRING, amount INT) WITH (...)");
+tableEnv.executeSql("CREATE TABLE Orders (`user` BIGINT, product STRING, amount INT) WITH (...)")
 
 // a string array: ["Orders"]
-val tables = tableEnv.listTable()
+val tables = tableEnv.listTables()
+// or tableEnv.executeSql("SHOW TABLES").print()
 
 // drop "Orders" table from catalog
-tableEnv.sqlUpdate("DROP TABLE Orders")
+tableEnv.executeSql("DROP TABLE Orders")
 
 // an empty string array
-val tables = tableEnv.listTable()
+val tables = tableEnv.listTables()
+// or tableEnv.executeSql("SHOW TABLES").print()
 {% endhighlight %}
 </div>
 
 <div data-lang="python" markdown="1">
 {% highlight python %}
-settings = EnvironmentSettings.newInstance()...
-table_env = TableEnvironment.create(settings)
+settings = EnvironmentSettings.new_instance()...
+table_env = StreamTableEnvironment.create(env, settings)
 
 # a string array: ["Orders"]
-tables = tableEnv.listTable()
+tables = table_env.list_tables()
+# or table_env.execute_sql("SHOW TABLES").print()
 
 # drop "Orders" table from catalog
-tableEnv.sqlUpdate("DROP TABLE Orders")
+table_env.execute_sql("DROP TABLE Orders")
 
 # an empty string array
-tables = tableEnv.listTable()
+tables = table_env.list_tables()
+# or table_env.execute_sql("SHOW TABLES").print()
 {% endhighlight %}
 </div>
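
Since `executeSql()` throws when the DROP target is missing, the `IF EXISTS` clause of the DROP grammar is the usual guard; a brief sketch:

{% highlight java %}
// dropping a non-existent table throws; IF EXISTS turns it into a no-op
tableEnv.executeSql("DROP TABLE IF EXISTS Orders");
{% endhighlight %}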
 
diff --git a/docs/dev/table/sql/drop.zh.md b/docs/dev/table/sql/drop.zh.md
index 8d1b76d..911ae97 100644
--- a/docs/dev/table/sql/drop.zh.md
+++ b/docs/dev/table/sql/drop.zh.md
@@ -36,7 +36,7 @@ Flink SQL 目前支持以下 DROP 语句:
 
 ## 执行 DROP 语句
 
-可以使用 `TableEnvironment` 中的 `sqlUpdate()` 方法执行 DROP 语句,也可以在 [SQL CLI]({{ site.baseurl }}/zh/dev/table/sqlClient.html) 中执行 DROP 语句。 若 DROP 操作执行成功,`sqlUpdate()` 方法不返回任何内容,否则会抛出异常。
+可以使用 `TableEnvironment` 中的 `executeSql()` 方法执行 DROP 语句,也可以在 [SQL CLI]({{ site.baseurl }}/zh/dev/table/sqlClient.html) 中执行 DROP 语句。 若 DROP 操作执行成功,`executeSql()` 方法返回 'OK',否则会抛出异常。
 
 以下的例子展示了如何在 `TableEnvironment` 和  SQL CLI 中执行一个 DROP 语句。
 
@@ -47,16 +47,18 @@ EnvironmentSettings settings = EnvironmentSettings.newInstance()...
 TableEnvironment tableEnv = TableEnvironment.create(settings);
 
 // 注册名为 “Orders” 的表
-tableEnv.sqlUpdate("CREATE TABLE Orders (`user` BIGINT, product STRING, amount INT) WITH (...)");
+tableEnv.executeSql("CREATE TABLE Orders (`user` BIGINT, product STRING, amount INT) WITH (...)");
 
 // 字符串数组: ["Orders"]
-String[] tables = tableEnv.listTable();
+String[] tables = tableEnv.listTables();
+// or tableEnv.executeSql("SHOW TABLES").print();
 
 // 从 catalog 删除 “Orders” 表
-tableEnv.sqlUpdate("DROP TABLE Orders");
+tableEnv.executeSql("DROP TABLE Orders");
 
 // 空字符串数组
-String[] tables = tableEnv.listTable();
+String[] tables = tableEnv.listTables();
+// or tableEnv.executeSql("SHOW TABLES").print();
 {% endhighlight %}
 </div>
 
@@ -66,32 +68,36 @@ val settings = EnvironmentSettings.newInstance()...
 val tableEnv = TableEnvironment.create(settings)
 
 // 注册名为 “Orders” 的表
-tableEnv.sqlUpdate("CREATE TABLE Orders (`user` BIGINT, product STRING, amount INT) WITH (...)");
+tableEnv.executeSql("CREATE TABLE Orders (`user` BIGINT, product STRING, amount INT) WITH (...)")
 
 // 字符串数组: ["Orders"]
-val tables = tableEnv.listTable()
+val tables = tableEnv.listTables()
+// or tableEnv.executeSql("SHOW TABLES").print()
 
 // 从 catalog 删除 “Orders” 表
-tableEnv.sqlUpdate("DROP TABLE Orders")
+tableEnv.executeSql("DROP TABLE Orders")
 
 // 空字符串数组
-val tables = tableEnv.listTable()
+val tables = tableEnv.listTables()
+// or tableEnv.executeSql("SHOW TABLES").print()
 {% endhighlight %}
 </div>
 
 <div data-lang="python" markdown="1">
 {% highlight python %}
-settings = EnvironmentSettings.newInstance()...
-table_env = TableEnvironment.create(settings)
+settings = EnvironmentSettings.new_instance()...
+table_env = StreamTableEnvironment.create(env, settings)
 
 # 字符串数组: ["Orders"]
-tables = tableEnv.listTable()
+tables = table_env.list_tables()
+# or table_env.execute_sql("SHOW TABLES").print()
 
 # 从 catalog 删除 “Orders” 表
-tableEnv.sqlUpdate("DROP TABLE Orders")
+table_env.execute_sql("DROP TABLE Orders")
 
 # 空字符串数组
-tables = tableEnv.listTable()
+tables = table_env.list_tables()
+# or table_env.execute_sql("SHOW TABLES").print()
 {% endhighlight %}
 </div>
 
diff --git a/docs/dev/table/sql/index.zh.md b/docs/dev/table/sql/index.zh.md
index b93f4ff..70130e8 100644
--- a/docs/dev/table/sql/index.zh.md
+++ b/docs/dev/table/sql/index.zh.md
@@ -28,7 +28,7 @@ under the License.
 
 本页面列出了目前 Flink SQL 所支持的所有语句:
 
-- [SELECT (查询)](queries.html)
+- [SELECT (Queries)](queries.html)
 - [CREATE TABLE, DATABASE, VIEW, FUNCTION](create.html)
 - [DROP TABLE, DATABASE, VIEW, FUNCTION](drop.html)
 - [ALTER TABLE, DATABASE, FUNCTION](alter.html)
diff --git a/docs/dev/table/sql/insert.md b/docs/dev/table/sql/insert.md
index 96052ad..01fa413 100644
--- a/docs/dev/table/sql/insert.md
+++ b/docs/dev/table/sql/insert.md
@@ -29,9 +29,10 @@ INSERT statements are used to add rows to a table.
 
 ## Run an INSERT statement
 
-INSERT statements are specified with the `sqlUpdate()` method of the `TableEnvironment` or executed in [SQL CLI]({{ site.baseurl }}/dev/table/sqlClient.html). The method `sqlUpdate()` for INSERT statements is a lazy execution, they will be executed only when `TableEnvironment.execute(jobName)` is invoked.
+A single INSERT statement can be executed through the `executeSql()` method of the `TableEnvironment`, or executed in [SQL CLI]({{ site.baseurl }}/dev/table/sqlClient.html). The `executeSql()` method for an INSERT statement submits a Flink job immediately and returns a `TableResult` instance that is associated with the submitted job.
+Multiple INSERT statements can be executed through the `addInsertSql()` method of a `StatementSet`, which can be created by the `TableEnvironment.createStatementSet()` method. The `addInsertSql()` method is lazy: the added statements are executed only when `StatementSet.execute()` is invoked.
 
-The following examples show how to run an INSERT statement in `TableEnvironment` and in SQL CLI.
+The following examples show how to run a single INSERT statement in `TableEnvironment` and in SQL CLI, and how to run multiple INSERT statements with a `StatementSet`.
 
 <div class="codetabs" markdown="1">
 <div data-lang="java" markdown="1">
@@ -40,12 +41,31 @@ EnvironmentSettings settings = EnvironmentSettings.newInstance()...
 TableEnvironment tEnv = TableEnvironment.create(settings);
 
 // register a source table named "Orders" and a sink table named "RubberOrders"
-tEnv.sqlUpdate("CREATE TABLE Orders (`user` BIGINT, product VARCHAR, amount INT) WITH (...)");
-tEnv.sqlUpdate("CREATE TABLE RubberOrders(product VARCHAR, amount INT) WITH (...)");
+tEnv.executeSql("CREATE TABLE Orders (`user` BIGINT, product VARCHAR, amount INT) WITH (...)");
+tEnv.executeSql("CREATE TABLE RubberOrders(product VARCHAR, amount INT) WITH (...)");
 
-// run a SQL update query on the registered source table and emit the result to registered sink table
-tEnv.sqlUpdate(
+// run a single INSERT query on the registered source table and emit the result to registered sink table
+TableResult tableResult1 = tEnv.executeSql(
   "INSERT INTO RubberOrders SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%'");
+// get job status through TableResult
+System.out.println(tableResult1.getJobClient().get().getJobStatus());
+
+//----------------------------------------------------------------------------
+// register another sink table named "GlassOrders" for multiple INSERT queries
+tEnv.executeSql("CREATE TABLE GlassOrders(product VARCHAR, amount INT) WITH (...)");
+
+// run multiple INSERT queries on the registered source table and emit the result to registered sink tables
+StatementSet stmtSet = tEnv.createStatementSet();
+// only a single INSERT query can be accepted by the `addInsertSql` method
+stmtSet.addInsertSql(
+  "INSERT INTO RubberOrders SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%'");
+stmtSet.addInsertSql(
+  "INSERT INTO GlassOrders SELECT product, amount FROM Orders WHERE product LIKE '%Glass%'");
+// execute all statements together
+TableResult tableResult2 = stmtSet.execute();
+// get job status through TableResult
+System.out.println(tableResult2.getJobClient().get().getJobStatus());
+
 {% endhighlight %}
 </div>
 
@@ -55,27 +75,65 @@ val settings = EnvironmentSettings.newInstance()...
 val tEnv = TableEnvironment.create(settings)
 
 // register a source table named "Orders" and a sink table named "RubberOrders"
-tEnv.sqlUpdate("CREATE TABLE Orders (`user` BIGINT, product STRING, amount INT) WITH (...)")
-tEnv.sqlUpdate("CREATE TABLE RubberOrders(product STRING, amount INT) WITH (...)")
+tEnv.executeSql("CREATE TABLE Orders (`user` BIGINT, product STRING, amount INT) WITH (...)")
+tEnv.executeSql("CREATE TABLE RubberOrders(product STRING, amount INT) WITH (...)")
 
-// run a SQL update query on the registered source table and emit the result to registered sink table
-tEnv.sqlUpdate(
+// run a single INSERT query on the registered source table and emit the result to registered sink table
+val tableResult1 = tEnv.executeSql(
   "INSERT INTO RubberOrders SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%'")
+// get job status through TableResult
+println(tableResult1.getJobClient().get().getJobStatus())
+
+//----------------------------------------------------------------------------
+// register another sink table named "GlassOrders" for multiple INSERT queries
+tEnv.executeSql("CREATE TABLE GlassOrders(product VARCHAR, amount INT) WITH (...)")
+
+// run multiple INSERT queries on the registered source table and emit the result to registered sink tables
+val stmtSet = tEnv.createStatementSet()
+// only a single INSERT query can be accepted by the `addInsertSql` method
+stmtSet.addInsertSql(
+  "INSERT INTO RubberOrders SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%'")
+stmtSet.addInsertSql(
+  "INSERT INTO GlassOrders SELECT product, amount FROM Orders WHERE product LIKE '%Glass%'")
+// execute all statements together
+val tableResult2 = stmtSet.execute()
+// get job status through TableResult
+println(tableResult2.getJobClient().get().getJobStatus())
+
 {% endhighlight %}
 </div>
 
 <div data-lang="python" markdown="1">
 {% highlight python %}
-settings = EnvironmentSettings.newInstance()...
-table_env = TableEnvironment.create(settings)
+settings = EnvironmentSettings.new_instance()...
+table_env = StreamTableEnvironment.create(env, settings)
 
 # register a source table named "Orders" and a sink table named "RubberOrders"
-table_env.sqlUpdate("CREATE TABLE Orders (`user` BIGINT, product STRING, amount INT) WITH (...)")
-table_env.sqlUpdate("CREATE TABLE RubberOrders(product STRING, amount INT) WITH (...)")
+table_env.execute_sql("CREATE TABLE Orders (`user` BIGINT, product STRING, amount INT) WITH (...)")
+table_env.execute_sql("CREATE TABLE RubberOrders(product STRING, amount INT) WITH (...)")
+
+# run a single INSERT query on the registered source table and emit the result to registered sink table
+table_result1 = table_env \
+    .execute_sql("INSERT INTO RubberOrders SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%'")
+# get job status through TableResult
+print(table_result1.get_job_client().get_job_status())
+
+#----------------------------------------------------------------------------
+# register another sink table named "GlassOrders" for multiple INSERT queries
+table_env.execute_sql("CREATE TABLE GlassOrders(product VARCHAR, amount INT) WITH (...)")
+
+# run multiple INSERT queries on the registered source table and emit the result to registered sink tables
+stmt_set = table_env.create_statement_set()
+# only a single INSERT query can be accepted by the `add_insert_sql` method
+stmt_set \
+    .add_insert_sql("INSERT INTO RubberOrders SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%'")
+stmt_set \
+    .add_insert_sql("INSERT INTO GlassOrders SELECT product, amount FROM Orders WHERE product LIKE '%Glass%'")
+# execute all statements together
+table_result2 = stmt_set.execute()
+# get job status through TableResult
+print(table_result2.get_job_client().get_job_status())
 
-# run a SQL update query on the registered source table and emit the result to registered sink table
-table_env \
-    .sqlUpdate("INSERT INTO RubberOrders SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%'")
 {% endhighlight %}
 </div>
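
One detail worth noting about the snippets above: `JobClient.getJobStatus()` returns a `CompletableFuture<JobStatus>`, so printing it directly shows the future rather than the status itself. A sketch of resolving it (assuming a job was actually submitted, so the `JobClient` is present):

{% highlight java %}
TableResult tableResult = tEnv.executeSql(
  "INSERT INTO RubberOrders SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%'");
tableResult.getJobClient().ifPresent(jobClient -> {
    // join() resolves the future to the job's current status, e.g. RUNNING or FINISHED
    System.out.println(jobClient.getJobStatus().join());
});
{% endhighlight %}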
 
diff --git a/docs/dev/table/sql/insert.zh.md b/docs/dev/table/sql/insert.zh.md
index 14ad9a4..863447c 100644
--- a/docs/dev/table/sql/insert.zh.md
+++ b/docs/dev/table/sql/insert.zh.md
@@ -29,9 +29,10 @@ INSERT 语句用来向表中添加行。
 
 ## 执行 INSERT 语句
 
-可以使用 `TableEnvironment` 中的 `sqlUpdate()` 方法执行 INSERT 语句,也可以在 [SQL CLI]({{ site.baseurl }}/zh/dev/table/sqlClient.html) 中执行 INSERT 语句。`sqlUpdate()` 方法执行 INSERT 语句时时懒执行的,只有当`TableEnvironment.execute(jobName)`被调用时才会被执行。
+单条 INSERT 语句,可以使用 `TableEnvironment` 中的 `executeSql()` 方法执行,也可以在 [SQL CLI]({{ site.baseurl }}/zh/dev/table/sqlClient.html) 中执行 INSERT 语句。`executeSql()` 方法执行 INSERT 语句时会立即提交一个 Flink 作业,并且返回一个 TableResult 对象,通过该对象可以获取 JobClient 方便地操作提交的作业。
+多条 INSERT 语句,使用 `TableEnvironment` 中的 `createStatementSet` 创建一个 `StatementSet` 对象,然后使用 `StatementSet` 中的 `addInsertSql()` 方法添加多条 INSERT 语句,最后通过 `StatementSet` 中的 `execute()` 方法来执行。
 
-以下的例子展示了如何在 `TableEnvironment` 和  SQL CLI 中执行一个 INSERT 语句。
+以下的例子展示了如何在 `TableEnvironment` 和  SQL CLI 中执行一条 INSERT 语句,或者通过 `StatementSet` 执行多条 INSERT 语句。
 
 <div class="codetabs" markdown="1">
 <div data-lang="java" markdown="1">
@@ -40,12 +41,31 @@ EnvironmentSettings settings = EnvironmentSettings.newInstance()...
 TableEnvironment tEnv = TableEnvironment.create(settings);
 
 // 注册一个 "Orders" 源表,和 "RubberOrders" 结果表
-tEnv.sqlUpdate("CREATE TABLE Orders (`user` BIGINT, product VARCHAR, amount INT) WITH (...)");
-tEnv.sqlUpdate("CREATE TABLE RubberOrders(product VARCHAR, amount INT) WITH (...)");
+tEnv.executeSql("CREATE TABLE Orders (`user` BIGINT, product VARCHAR, amount INT) WITH (...)");
+tEnv.executeSql("CREATE TABLE RubberOrders(product VARCHAR, amount INT) WITH (...)");
 
-// 运行一个 INSERT 语句,将源表的数据输出到结果表中
-tEnv.sqlUpdate(
+// 运行一条 INSERT 语句,将源表的数据输出到结果表中
+TableResult tableResult1 = tEnv.executeSql(
   "INSERT INTO RubberOrders SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%'");
+// 通过 TableResult 来获取作业状态
+System.out.println(tableResult1.getJobClient().get().getJobStatus());
+
+//----------------------------------------------------------------------------
+// 注册一个 "GlassOrders" 结果表用于运行多 INSERT 语句
+tEnv.executeSql("CREATE TABLE GlassOrders(product VARCHAR, amount INT) WITH (...)");
+
+// 运行多条 INSERT 语句,将原表数据输出到多个结果表中
+StatementSet stmtSet = tEnv.createStatementSet();
+// `addInsertSql` 方法每次只接收单条 INSERT 语句
+stmtSet.addInsertSql(
+  "INSERT INTO RubberOrders SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%'");
+stmtSet.addInsertSql(
+  "INSERT INTO GlassOrders SELECT product, amount FROM Orders WHERE product LIKE '%Glass%'");
+// 执行刚刚添加的所有 INSERT 语句
+TableResult tableResult2 = stmtSet.execute();
+// 通过 TableResult 来获取作业状态
+System.out.println(tableResult2.getJobClient().get().getJobStatus());
+
 {% endhighlight %}
 </div>
 
@@ -55,27 +75,66 @@ val settings = EnvironmentSettings.newInstance()...
 val tEnv = TableEnvironment.create(settings)
 
 // 注册一个 "Orders" 源表,和 "RubberOrders" 结果表
-tEnv.sqlUpdate("CREATE TABLE Orders (`user` BIGINT, product STRING, amount INT) WITH (...)")
-tEnv.sqlUpdate("CREATE TABLE RubberOrders(product STRING, amount INT) WITH (...)")
+tEnv.executeSql("CREATE TABLE Orders (`user` BIGINT, product STRING, amount INT) WITH (...)")
+tEnv.executeSql("CREATE TABLE RubberOrders(product STRING, amount INT) WITH (...)")
 
 // 运行一个 INSERT 语句,将源表的数据输出到结果表中
-tEnv.sqlUpdate(
+val tableResult1 = tEnv.executeSql(
   "INSERT INTO RubberOrders SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%'")
+// 通过 TableResult 来获取作业状态
+println(tableResult1.getJobClient().get().getJobStatus())
+
+//----------------------------------------------------------------------------
+// 注册一个 "GlassOrders" 结果表用于运行多 INSERT 语句
+tEnv.executeSql("CREATE TABLE GlassOrders(product VARCHAR, amount INT) WITH (...)");
+
+// 运行多个 INSERT 语句,将原表数据输出到多个结果表中
+val stmtSet = tEnv.createStatementSet()
+// `addInsertSql` 方法每次只接收单条 INSERT 语句
+stmtSet.addInsertSql(
+  "INSERT INTO RubberOrders SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%'")
+stmtSet.addInsertSql(
+  "INSERT INTO GlassOrders SELECT product, amount FROM Orders WHERE product LIKE '%Glass%'")
+// 执行刚刚添加的所有 INSERT 语句
+val tableResult2 = stmtSet.execute()
+// 通过 TableResult 来获取作业状态
+println(tableResult2.getJobClient().get().getJobStatus())
+
 {% endhighlight %}
 </div>
 
 <div data-lang="python" markdown="1">
 {% highlight python %}
-settings = EnvironmentSettings.newInstance()...
-table_env = TableEnvironment.create(settings)
+settings = EnvironmentSettings.new_instance()...
+table_env = StreamTableEnvironment.create(env, settings)
 
 # 注册一个 "Orders" 源表,和 "RubberOrders" 结果表
-table_env.sqlUpdate("CREATE TABLE Orders (`user` BIGINT, product STRING, amount INT) WITH (...)")
-table_env.sqlUpdate("CREATE TABLE RubberOrders(product STRING, amount INT) WITH (...)")
+table_env.executeSql("CREATE TABLE Orders (`user` BIGINT, product STRING, amount INT) WITH (...)")
+table_env.executeSql("CREATE TABLE RubberOrders(product STRING, amount INT) WITH (...)")
+
+# 运行一条 INSERT 语句,将源表的数据输出到结果表中
+table_result1 = table_env \
+    .execute_sql("INSERT INTO RubberOrders SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%'")
+# 通过 TableResult 来获取作业状态
+print(table_result1.get_job_client().get_job_status())
+
+#----------------------------------------------------------------------------
+# 注册一个 "GlassOrders" 结果表用于运行多 INSERT 语句
+table_env.execute_sql("CREATE TABLE GlassOrders(product VARCHAR, amount INT) WITH (...)")
+
+# 运行多条 INSERT 语句,将原表数据输出到多个结果表中
+stmt_set = table_env.create_statement_set()
+# `add_insert_sql` 方法每次只接收单条 INSERT 语句
+stmt_set \
+    .add_insert_sql("INSERT INTO RubberOrders SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%'")
+stmt_set \
+    .add_insert_sql("INSERT INTO GlassOrders SELECT product, amount FROM Orders WHERE product LIKE '%Glass%'")
+# 执行刚刚添加的所有 INSERT 语句
+table_result2 = stmt_set.execute()
+# 通过 TableResult 来获取作业状态
+print(table_result2.get_job_client().get_job_status())
+
 
-# 运行一个 INSERT 语句,将源表的数据输出到结果表中
-table_env \
-    .sqlUpdate("INSERT INTO RubberOrders SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%'")
 {% endhighlight %}
 </div>
 
diff --git a/docs/dev/table/sql/queries.md b/docs/dev/table/sql/queries.md
index add05e8..b667c29 100644
--- a/docs/dev/table/sql/queries.md
+++ b/docs/dev/table/sql/queries.md
@@ -25,7 +25,7 @@ under the License.
 * This will be replaced by the TOC
 {:toc}
 
-SELECT queries are specified with the `sqlQuery()` method of the `TableEnvironment`. The method returns the result of the SELECT query as a `Table`. A `Table` can be used in [subsequent SQL and Table API queries]({{ site.baseurl }}/dev/table/common.html#mixing-table-api-and-sql), be [converted into a DataSet or DataStream]({{ site.baseurl }}/dev/table/common.html#integration-with-datastream-and-dataset-api), or [written to a TableSink]({{ site.baseurl }}/dev/table/common.html#emit-a-tabl [...]
+SELECT statements and VALUES statements are specified with the `sqlQuery()` method of the `TableEnvironment`. The method returns the result of the SELECT statement (or the VALUES statements) as a `Table`. A `Table` can be used in [subsequent SQL and Table API queries]({{ site.baseurl }}/dev/table/common.html#mixing-table-api-and-sql), be [converted into a DataSet or DataStream]({{ site.baseurl }}/dev/table/common.html#integration-with-datastream-and-dataset-api), or [written to a TableSi [...]
 
 In order to access a table in a SQL query, it must be [registered in the TableEnvironment]({{ site.baseurl }}/dev/table/common.html#register-tables-in-the-catalog). A table can be registered from a [TableSource]({{ site.baseurl }}/dev/table/common.html#register-a-tablesource), [Table]({{ site.baseurl }}/dev/table/common.html#register-a-table), [CREATE TABLE statement](#create-table), [DataStream, or DataSet]({{ site.baseurl }}/dev/table/common.html#register-a-datastream-or-dataset-as-tab [...]
 
@@ -58,7 +58,6 @@ tableEnv.createTemporaryView("Orders", ds, $("user"), $("product"), $("amount"))
 Table result2 = tableEnv.sqlQuery(
   "SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%'");
 
-// SQL update with a registered table
 // create and register a TableSink
 final Schema schema = new Schema()
     .field("product", DataTypes.STRING())
@@ -69,8 +68,8 @@ tableEnv.connect(new FileSystem("/path/to/file"))
     .withSchema(schema)
     .createTemporaryTable("RubberOrders");
 
-// run a SQL update query on the Table and emit the result to the TableSink
-tableEnv.sqlUpdate(
+// run an INSERT statement on the Table and emit the result to the TableSink
+tableEnv.executeSql(
   "INSERT INTO RubberOrders SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%'");
 {% endhighlight %}
 </div>
@@ -95,7 +94,6 @@ tableEnv.createTemporaryView("Orders", ds, $"user", $"product", $"amount")
 val result2 = tableEnv.sqlQuery(
   "SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%'")
 
-// SQL update with a registered table
 // create and register a TableSink
 val schema = new Schema()
     .field("product", DataTypes.STRING())
@@ -106,8 +104,8 @@ tableEnv.connect(new FileSystem("/path/to/file"))
     .withSchema(schema)
     .createTemporaryTable("RubberOrders")
 
-// run a SQL update query on the Table and emit the result to the TableSink
-tableEnv.sqlUpdate(
+// run an INSERT statement on the Table and emit the result to the TableSink
+tableEnv.executeSql(
   "INSERT INTO RubberOrders SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%'")
 {% endhighlight %}
 </div>
@@ -123,7 +121,6 @@ table = table_env.from_elements(..., ['user', 'product', 'amount'])
 result = table_env \
     .sql_query("SELECT SUM(amount) FROM %s WHERE product LIKE '%%Rubber%%'" % table)
 
-# SQL update with a registered table
 # create and register a TableSink
 t_env.connect(FileSystem().path("/path/to/file")))
     .with_format(Csv()
@@ -134,16 +131,107 @@ t_env.connect(FileSystem().path("/path/to/file")))
                  .field("amount", DataTypes.BIGINT()))
     .create_temporary_table("RubberOrders")
 
-# run a SQL update query on the Table and emit the result to the TableSink
+# run an INSERT statement on the Table and emit the result to the TableSink
 table_env \
-    .sql_update("INSERT INTO RubberOrders SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%'")
+    .execute_sql("INSERT INTO RubberOrders SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%'")
 {% endhighlight %}
 </div>
 </div>
 
 {% top %}
 
-## Supported Syntax
+## Execute a Query
+
+A SELECT statement or a VALUES statement can be executed through the `TableEnvironment.executeSql()` method to collect the result content to the local client. The method returns the result of the SELECT statement (or the VALUES statement) as a `TableResult`. Similarly, a `Table` object can be executed through the `Table.execute()` method to collect the content of the query to the local client.
+The `TableResult.collect()` method returns a closeable row iterator. The select job does not finish until all result data has been collected, so the iterator should be actively closed through the `CloseableIterator#close()` method to avoid a resource leak.
+The select result can also be printed to the client console through the `TableResult.print()` method. The result data in a `TableResult` can be accessed only once, so `collect()` and `print()` must not both be called on the same instance.
+
+For streaming jobs, the `TableResult.collect()` and `TableResult.print()` methods guarantee end-to-end exactly-once record delivery. This requires checkpointing to be enabled. Checkpointing is disabled by default; it can be enabled by setting checkpointing properties (see the <a href="{{ site.baseurl }}/ops/config.html#checkpointing">checkpointing config</a> for details) through `TableConfig`.
+As a consequence, a result record can be accessed by the client only after its corresponding checkpoint completes.
+
+**Note:** Only append-only queries are currently supported in streaming mode.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);
+// enable checkpointing
+tableEnv.getConfig().getConfiguration().set(
+  ExecutionCheckpointingOptions.CHECKPOINTING_MODE, CheckpointingMode.EXACTLY_ONCE);
+tableEnv.getConfig().getConfiguration().set(
+  ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL, Duration.ofSeconds(10));
+
+tableEnv.executeSql("CREATE TABLE Orders (`user` BIGINT, product STRING, amount INT) WITH (...)");
+
+// execute SELECT statement
+TableResult tableResult1 = tableEnv.executeSql("SELECT * FROM Orders");
+// use try-with-resources statement to make sure the iterator will be closed automatically
+try (CloseableIterator<Row> it = tableResult1.collect()) {
+    while (it.hasNext()) {
+        Row row = it.next();
+        // handle row
+    }
+}
+
+// execute Table
+TableResult tableResult2 = tableEnv.sqlQuery("SELECT * FROM Orders").execute();
+tableResult2.print();
+
+{% endhighlight %}
+</div>
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+val env = StreamExecutionEnvironment.getExecutionEnvironment()
+val tableEnv = StreamTableEnvironment.create(env, settings)
+// enable checkpointing
+tableEnv.getConfig.getConfiguration.set(
+  ExecutionCheckpointingOptions.CHECKPOINTING_MODE, CheckpointingMode.EXACTLY_ONCE)
+tableEnv.getConfig.getConfiguration.set(
+  ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL, Duration.ofSeconds(10))
+
+tableEnv.executeSql("CREATE TABLE Orders (`user` BIGINT, product STRING, amount INT) WITH (...)")
+
+// execute SELECT statement
+val tableResult1 = tableEnv.executeSql("SELECT * FROM Orders")
+val it = tableResult1.collect()
+try while (it.hasNext) {
+  val row = it.next
+  // handle row
+}
+finally it.close() // close the iterator to avoid resource leak
+
+// execute Table
+val tableResult2 = tableEnv.sqlQuery("SELECT * FROM Orders").execute()
+tableResult2.print()
+
+{% endhighlight %}
+</div>
+<div data-lang="python" markdown="1">
+{% highlight python %}
+env = StreamExecutionEnvironment.get_execution_environment()
+table_env = StreamTableEnvironment.create(env, settings)
+# enable checkpointing
+table_env.get_config().get_configuration().set_string("execution.checkpointing.mode", "EXACTLY_ONCE")
+table_env.get_config().get_configuration().set_string("execution.checkpointing.interval", "10s")
+
+table_env.execute_sql("CREATE TABLE Orders (`user` BIGINT, product STRING, amount INT) WITH (...)")
+
+# execute SELECT statement
+table_result1 = table_env.execute_sql("SELECT * FROM Orders")
+table_result1.print()
+
+# execute Table
+table_result2 = table_env.sql_query("SELECT * FROM Orders").execute()
+table_result2.print()
+
+{% endhighlight %}
+</div>
+</div>
+
+{% top %}
+
+
+## Syntax
 
 Flink parses SQL using [Apache Calcite](https://calcite.apache.org/docs/reference.html), which supports standard ANSI SQL.
 
diff --git a/docs/dev/table/sql/queries.zh.md b/docs/dev/table/sql/queries.zh.md
index e4e5d69..62efaca 100644
--- a/docs/dev/table/sql/queries.zh.md
+++ b/docs/dev/table/sql/queries.zh.md
@@ -25,11 +25,11 @@ under the License.
 * This will be replaced by the TOC
 {:toc}
 
-SELECT 查询需要使用 `TableEnvironment` 的 `sqlQuery()` 方法加以指定。这个方法会以 `Table` 的形式返回 SELECT 的查询结果。 `Table` 可以被用于 [随后的SQL 与 Table API 查询]({{ site.baseurl }}/zh/dev/table/common.html#mixing-table-api-and-sql) 、 [转换为 DataSet 或 DataStream ]({{ site.baseurl }}/zh/dev/table/common.html#integration-with-datastream-and-dataset-api)或 [输出到 TableSink ]({{ site.baseurl }}/zh/dev/table/common.html#emit-a-table)。SQL 与 Table API 的查询可以进行无缝融合、整体优化并翻译为单一的程序。
+SELECT 语句和 VALUES 语句需要使用 `TableEnvironment` 的 `sqlQuery()` 方法加以指定。这个方法会以 `Table` 的形式返回 SELECT(或 VALUES)语句的查询结果。`Table` 可以被用于 [随后的 SQL 与 Table API 查询]({{ site.baseurl }}/zh/dev/table/common.html#mixing-table-api-and-sql)、[转换为 DataSet 或 DataStream]({{ site.baseurl }}/zh/dev/table/common.html#integration-with-datastream-and-dataset-api) 或 [输出到 TableSink]({{ site.baseurl }}/zh/dev/table/common.html#emit-a-table)。SQL 与 Table API 的查询可以进行无缝融合、整体优化并翻译为单一的程序。
 
 为了可以在 SQL 查询中访问到表,你需要先 [在 TableEnvironment 中注册表 ]({{ site.baseurl }}/zh/dev/table/common.html#register-tables-in-the-catalog)。表可以通过 [TableSource]({{ site.baseurl }}/zh/dev/table/common.html#register-a-tablesource)、 [Table]({{ site.baseurl }}/zh/dev/table/common.html#register-a-table)、[CREATE TABLE 语句](create.html)、 [DataStream 或 DataSet]({{ site.baseurl }}/zh/dev/table/common.html#register-a-datastream-or-dataset-as-table) 注册。 用户也可以通过 [向 TableEnvironment 中注册 catalog ]({{ site.baseurl }}/ [...]
 
-为方便起见 `Table.toString()` 将会在其 `TableEnvironment` 中自动使用一个唯一的名字注册表并返回表名。 因此, `Table` 对象可以如下文所示样例,直接内联到 SQL 查询中。
+为方便起见,`Table.toString()` 将会在其 `TableEnvironment` 中自动使用一个唯一的名字注册该表并返回表名。因此,`Table` 对象可以像下文样例中那样,直接内联到 SQL 语句中。
 
 **注意:** 查询若包括了不支持的 SQL 特性,将会抛出 `TableException`。批处理和流处理所支持的 SQL 特性将会在下述章节中列出。
 
@@ -58,7 +58,6 @@ tableEnv.createTemporaryView("Orders", ds, $("user"), $("product"), $("amount"))
 Table result2 = tableEnv.sqlQuery(
   "SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%'");
 
-// SQL 更新一个已经注册的表
 // 创建并注册一个 TableSink
 final Schema schema = new Schema()
     .field("product", DataTypes.STRING())
@@ -69,8 +68,8 @@ tableEnv.connect(new FileSystem("/path/to/file"))
     .withSchema(schema)
     .createTemporaryTable("RubberOrders");
 
-// 在表上执行更新语句并把结果发出到 TableSink
-tableEnv.sqlUpdate(
+// 在表上执行插入语句并把结果发出到 TableSink
+tableEnv.executeSql(
   "INSERT INTO RubberOrders SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%'");
 {% endhighlight %}
 </div>
@@ -88,14 +87,12 @@ val table = ds.toTable(tableEnv, $"user", $"product", $"amount")
 val result = tableEnv.sqlQuery(
   s"SELECT SUM(amount) FROM $table WHERE product LIKE '%Rubber%'")
 
-// SQL 查询一个已经注册的表
 // 使用名称 "Orders" 注册一个 DataStream 
 tableEnv.createTemporaryView("Orders", ds, $"user", $"product", $"amount")
 // 在表上执行 SQL 查询并得到以新表返回的结果
 val result2 = tableEnv.sqlQuery(
   "SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%'")
 
-// 使用 SQL 更新一个已经注册的表
 // 创建并注册一个 TableSink
 val schema = new Schema()
     .field("product", DataTypes.STRING())
@@ -106,8 +103,8 @@ tableEnv.connect(new FileSystem("/path/to/file"))
     .withSchema(schema)
     .createTemporaryTable("RubberOrders")
 
-// 在表上执行 SQL 更新操作,并把结果发出到 TableSink
-tableEnv.sqlUpdate(
+// 在表上执行插入操作,并把结果发出到 TableSink
+tableEnv.executeSql(
   "INSERT INTO RubberOrders SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%'")
 {% endhighlight %}
 </div>
@@ -123,7 +120,6 @@ table = table_env.from_elements(..., ['user', 'product', 'amount'])
 result = table_env \
     .sql_query("SELECT SUM(amount) FROM %s WHERE product LIKE '%%Rubber%%'" % table)
 
-# SQL 更新已经注册的表
 # 创建并注册 TableSink
 t_env.connect(FileSystem().path("/path/to/file")))
     .with_format(Csv()
@@ -134,16 +130,106 @@ t_env.connect(FileSystem().path("/path/to/file")))
                  .field("amount", DataTypes.BIGINT()))
     .create_temporary_table("RubberOrders")
 
-# 在表上执行 SQL 更新操作,并把结果发出到 TableSink
+# 在表上执行插入操作,并把结果发出到 TableSink
 table_env \
-    .sql_update("INSERT INTO RubberOrders SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%'")
+    .execute_sql("INSERT INTO RubberOrders SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%'")
+{% endhighlight %}
+</div>
+</div>
+
+{% top %}
+
+## 执行查询
+
+SELECT 语句或者 VALUES 语句可以通过 `TableEnvironment.executeSql()` 方法来执行,将查询结果收集到本地客户端。该方法返回 `TableResult` 对象用于包装查询的结果。和 SELECT 语句一样,`Table` 对象也可以通过 `Table.execute()` 方法执行,从而将 `Table` 的内容收集到本地客户端。
+`TableResult.collect()` 方法返回一个可关闭的行迭代器。在所有结果数据被收集完成之前,查询作业不会结束,因此应该通过 `CloseableIterator#close()` 方法主动关闭作业以防止资源泄露。
+还可以通过 `TableResult.print()` 方法将查询结果打印到本地控制台。`TableResult` 中的结果数据只能被访问一次,因此同一个实例上的 `collect()` 方法和 `print()` 方法只能二选一。
+
+对于流式作业,`TableResult.collect()` 方法和 `TableResult.print()` 方法保证端到端精确一次的数据交付,这要求开启 checkpointing。checkpointing 默认是关闭的,可以通过 `TableConfig` 设置 checkpointing 相关属性(请参考 <a href="{{ site.baseurl }}/zh/ops/config.html#checkpointing">checkpointing 配置</a>)来开启。
+因此,一条结果数据只有在其对应的 checkpoint 完成后才能在客户端被访问。
+
+**注意:** 对于流模式,当前仅支持追加(append-only)模式的查询语句。
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);
+// enable checkpointing
+tableEnv.getConfig().getConfiguration().set(
+  ExecutionCheckpointingOptions.CHECKPOINTING_MODE, CheckpointingMode.EXACTLY_ONCE);
+tableEnv.getConfig().getConfiguration().set(
+  ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL, Duration.ofSeconds(10));
+
+tableEnv.executeSql("CREATE TABLE Orders (`user` BIGINT, product STRING, amount INT) WITH (...)");
+
+// execute SELECT statement
+TableResult tableResult1 = tableEnv.executeSql("SELECT * FROM Orders");
+// use try-with-resources statement to make sure the iterator will be closed automatically
+try (CloseableIterator<Row> it = tableResult1.collect()) {
+    while (it.hasNext()) {
+        Row row = it.next();
+        // handle row
+    }
+}
+
+// execute Table
+TableResult tableResult2 = tableEnv.sqlQuery("SELECT * FROM Orders").execute();
+tableResult2.print();
+
+{% endhighlight %}
+</div>
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+val env = StreamExecutionEnvironment.getExecutionEnvironment()
+val tableEnv = StreamTableEnvironment.create(env, settings)
+// enable checkpointing
+tableEnv.getConfig.getConfiguration.set(
+  ExecutionCheckpointingOptions.CHECKPOINTING_MODE, CheckpointingMode.EXACTLY_ONCE)
+tableEnv.getConfig.getConfiguration.set(
+  ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL, Duration.ofSeconds(10))
+
+tableEnv.executeSql("CREATE TABLE Orders (`user` BIGINT, product STRING, amount INT) WITH (...)")
+
+// execute SELECT statement
+val tableResult1 = tableEnv.executeSql("SELECT * FROM Orders")
+val it = tableResult1.collect()
+try while (it.hasNext) {
+  val row = it.next
+  // handle row
+}
+finally it.close() // close the iterator to avoid resource leak
+
+// execute Table
+val tableResult2 = tableEnv.sqlQuery("SELECT * FROM Orders").execute()
+tableResult2.print()
+
+{% endhighlight %}
+</div>
+<div data-lang="python" markdown="1">
+{% highlight python %}
+env = StreamExecutionEnvironment.get_execution_environment()
+table_env = StreamTableEnvironment.create(env, settings)
+# enable checkpointing
+table_env.get_config().get_configuration().set_string("execution.checkpointing.mode", "EXACTLY_ONCE")
+table_env.get_config().get_configuration().set_string("execution.checkpointing.interval", "10s")
+
+table_env.execute_sql("CREATE TABLE Orders (`user` BIGINT, product STRING, amount INT) WITH (...)")
+
+# execute SELECT statement
+table_result1 = table_env.execute_sql("SELECT * FROM Orders")
+table_result1.print()
+
+# execute Table
+table_result2 = table_env.sql_query("SELECT * FROM Orders").execute()
+table_result2.print()
+
 {% endhighlight %}
 </div>
 </div>
 
 {% top %}
 
-## 支持的语法
+## 语法
 
 Flink 通过支持标准 ANSI SQL的 [Apache Calcite](https://calcite.apache.org/docs/reference.html) 解析 SQL。
 
diff --git a/docs/dev/table/streaming/query_configuration.md b/docs/dev/table/streaming/query_configuration.md
index bf84843..b677276 100644
--- a/docs/dev/table/streaming/query_configuration.md
+++ b/docs/dev/table/streaming/query_configuration.md
@@ -51,7 +51,7 @@ tableEnv.registerTableSink(
   sink);                       // table sink
 
 // emit result Table via a TableSink
-result.insertInto("outputTable");
+result.executeInsert("outputTable");
 
 // convert result Table into a DataStream<Row>
 DataStream<Row> stream = tableEnv.toAppendStream(result, Row.class);
@@ -82,7 +82,7 @@ tableEnv.registerTableSink(
   sink)                           // table sink
 
 // emit result Table via a TableSink
-result.insertInto("outputTable")
+result.executeInsert("outputTable")
 
 // convert result Table into a DataStream[Row]
 val stream: DataStream[Row] = result.toAppendStream[Row]
@@ -110,7 +110,7 @@ table_env.register_table_sink("outputTable",  # table name
                               sink)  # table sink
 
 # emit result Table via a TableSink
-result.insert_into("outputTable")
+result.execute_insert("outputTable")
 
 {% endhighlight %}
 </div>
diff --git a/docs/dev/table/streaming/query_configuration.zh.md b/docs/dev/table/streaming/query_configuration.zh.md
index bf84843..2fe7ab2 100644
--- a/docs/dev/table/streaming/query_configuration.zh.md
+++ b/docs/dev/table/streaming/query_configuration.zh.md
@@ -51,7 +51,7 @@ tableEnv.registerTableSink(
   sink);                       // table sink
 
 // emit result Table via a TableSink
-result.insertInto("outputTable");
+result.executeInsert("outputTable");
 
 // convert result Table into a DataStream<Row>
 DataStream<Row> stream = tableEnv.toAppendStream(result, Row.class);
@@ -82,7 +82,7 @@ tableEnv.registerTableSink(
   sink)                           // table sink
 
 // emit result Table via a TableSink
-result.insertInto("outputTable")
+result.executeInsert("outputTable")
 
 // convert result Table into a DataStream[Row]
 val stream: DataStream[Row] = result.toAppendStream[Row]
diff --git a/docs/dev/table/tableApi.md b/docs/dev/table/tableApi.md
index a4e1f5c..fb46087 100644
--- a/docs/dev/table/tableApi.md
+++ b/docs/dev/table/tableApi.md
@@ -2060,13 +2060,13 @@ result3 = in.order_by("a.asc").offset(10).fetch(5)
         <span class="label label-primary">Batch</span> <span class="label label-primary">Streaming</span>
       </td>
       <td>
-        <p>Similar to the INSERT INTO clause in a SQL query. Performs a insertion into a registered output table.</p>
+        <p>Similar to the `INSERT INTO` clause in a SQL query, the method performs an insertion into a registered output table. The `executeInsert()` method immediately submits a Flink job which executes the insert operation.</p>
 
         <p>Output tables must be registered in the TableEnvironment (see <a href="common.html#connector-tables">Connector tables</a>). Moreover, the schema of the registered table must match the schema of the query.</p>
 
 {% highlight java %}
 Table orders = tableEnv.from("Orders");
-orders.insertInto("OutOrders");
+orders.executeInsert("OutOrders");
 {% endhighlight %}
       </td>
     </tr>
@@ -2090,13 +2090,13 @@ orders.insertInto("OutOrders");
         <span class="label label-primary">Batch</span> <span class="label label-primary">Streaming</span>
       </td>
       <td>
-        <p>Similar to the INSERT INTO clause in a SQL query. Performs a insertion into a registered output table.</p>
+        <p>Similar to the `INSERT INTO` clause in a SQL query, the method performs an insertion into a registered output table. The `executeInsert()` method immediately submits a Flink job which executes the insert operation.</p>
 
         <p>Output tables must be registered in the TableEnvironment (see <a href="common.html#connector-tables">Connector tables</a>). Moreover, the schema of the registered table must match the schema of the query.</p>
 
 {% highlight scala %}
 val orders: Table = tableEnv.from("Orders")
-orders.insertInto("OutOrders")
+orders.executeInsert("OutOrders")
 {% endhighlight %}
       </td>
     </tr>
@@ -2120,13 +2120,13 @@ orders.insertInto("OutOrders")
         <span class="label label-primary">Batch</span> <span class="label label-primary">Streaming</span>
       </td>
       <td>
-        <p>Similar to the INSERT INTO clause in a SQL query. Performs a insertion into a registered output table.</p>
+        <p>Similar to the INSERT INTO clause in a SQL query, the method performs an insertion into a registered output table. The `execute_insert()` method immediately submits a Flink job which executes the insert operation.</p>
 
         <p>Output tables must be registered in the TableEnvironment (see <a href="common.html#register-a-tablesink">Register a TableSink</a>). Moreover, the schema of the registered table must match the schema of the query.</p>
 
 {% highlight python %}
-orders = table_env.from_path("Orders");
-orders.insert_into("OutOrders");
+orders = table_env.from_path("Orders")
+orders.execute_insert("OutOrders")
 {% endhighlight %}
       </td>
     </tr>
diff --git a/docs/dev/table/tableApi.zh.md b/docs/dev/table/tableApi.zh.md
index 0ac1d80..f22ec9c 100644
--- a/docs/dev/table/tableApi.zh.md
+++ b/docs/dev/table/tableApi.zh.md
@@ -2059,13 +2059,13 @@ result3 = in.order_by("a.asc").offset(10).fetch(5)
         <span class="label label-primary">Batch</span> <span class="label label-primary">Streaming</span>
       </td>
       <td>
-        <p>Similar to the INSERT INTO clause in a SQL query. Performs a insertion into a registered output table.</p>
+        <p>Similar to the `INSERT INTO` clause in a SQL query, the method performs an insertion into a registered output table. The `executeInsert()` method immediately submits a Flink job which executes the insert operation.</p>
 
         <p>Output tables must be registered in the TableEnvironment (see <a href="common.html#register-a-tablesink">Register a TableSink</a>). Moreover, the schema of the registered table must match the schema of the query.</p>
 
 {% highlight java %}
 Table orders = tableEnv.from("Orders");
-orders.insertInto("OutOrders");
+orders.executeInsert("OutOrders");
 {% endhighlight %}
       </td>
     </tr>
@@ -2089,13 +2089,13 @@ orders.insertInto("OutOrders");
         <span class="label label-primary">Batch</span> <span class="label label-primary">Streaming</span>
       </td>
       <td>
-        <p>Similar to the INSERT INTO clause in a SQL query. Performs a insertion into a registered output table.</p>
+        <p>Similar to the `INSERT INTO` clause in a SQL query, the method performs an insertion into a registered output table. The `executeInsert()` method immediately submits a Flink job which executes the insert operation.</p>
 
         <p>Output tables must be registered in the TableEnvironment (see <a href="common.html#connector-tables">Connector tables</a>). Moreover, the schema of the registered table must match the schema of the query.</p>
 
 {% highlight scala %}
 val orders: Table = tableEnv.from("Orders")
-orders.insertInto("OutOrders")
+orders.executeInsert("OutOrders")
 {% endhighlight %}
       </td>
     </tr>
@@ -2119,13 +2119,13 @@ orders.insertInto("OutOrders")
         <span class="label label-primary">批处理</span> <span class="label label-primary">流处理</span>
       </td>
       <td>
-        <p>类似于SQL请求中的INSERT INTO子句。将数据输出到一个已注册的输出表中。</p>
+        <p>类似于 SQL 查询中的 INSERT INTO 子句,该方法将数据输出到一个已注册的输出表中。`execute_insert()` 方法会立即提交一个 Flink 作业,触发插入操作。</p>
 
         <p>输出表必须先在TableEnvironment中注册(详见<a href="common.html#register-a-tablesink">注册一个TableSink</a>)。此外,注册的表的模式(schema)必须和请求的结果的模式(schema)相匹配。</p>
 
 {% highlight python %}
-orders = table_env.from_path("Orders");
-orders.insert_into("OutOrders");
+orders = table_env.from_path("Orders")
+orders.execute_insert("OutOrders")
 {% endhighlight %}
       </td>
     </tr>


[flink] 08/08: [FLINK-17599][docs] Add documents for SHOW statement

Posted by tw...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 77ff1222a3d7ebc19c6f5a3f4cec5080f3d57d57
Author: godfreyhe <go...@163.com>
AuthorDate: Mon Jun 15 16:30:05 2020 +0800

    [FLINK-17599][docs] Add documents for SHOW statement
---
 docs/dev/table/sql/index.md      |   1 +
 docs/dev/table/sql/index.zh.md   |   1 +
 docs/dev/table/sql/queries.md    |  39 ------
 docs/dev/table/sql/queries.zh.md |  39 ------
 docs/dev/table/sql/show.md       | 277 +++++++++++++++++++++++++++++++++++++++
 docs/dev/table/sql/show.zh.md    | 277 +++++++++++++++++++++++++++++++++++++++
 6 files changed, 556 insertions(+), 78 deletions(-)

diff --git a/docs/dev/table/sql/index.md b/docs/dev/table/sql/index.md
index db96b1e..63d549c 100644
--- a/docs/dev/table/sql/index.md
+++ b/docs/dev/table/sql/index.md
@@ -37,6 +37,7 @@ This page lists all the supported statements supported in Flink SQL for now:
 - [DESCRIBE](describe.html)
 - [EXPLAIN](explain.html)
 - [USE](use.html)
+- [SHOW](show.html)
 
 ## Data Types
 
diff --git a/docs/dev/table/sql/index.zh.md b/docs/dev/table/sql/index.zh.md
index f81474b..0def8c2 100644
--- a/docs/dev/table/sql/index.zh.md
+++ b/docs/dev/table/sql/index.zh.md
@@ -37,6 +37,7 @@ under the License.
 - [DESCRIBE](describe.html)
 - [EXPLAIN](explain.html)
 - [USE](use.html)
+- [SHOW](show.html)
 
 ## 数据类型
 
diff --git a/docs/dev/table/sql/queries.md b/docs/dev/table/sql/queries.md
index 0040072..df78622 100644
--- a/docs/dev/table/sql/queries.md
+++ b/docs/dev/table/sql/queries.md
@@ -382,45 +382,6 @@ String literals must be enclosed in single quotes (e.g., `SELECT 'Hello World'`)
 
 ## Operations
 
-### Show
-
-<div markdown="1">
-<table class="table table-bordered">
-  <thead>
-    <tr>
-      <th class="text-left" style="width: 20%">Operation</th>
-      <th class="text-center">Description</th>
-    </tr>
-  </thead>
-  <tbody>
-    <tr>
-      <td>
-        <strong>Show</strong><br>
-        <span class="label label-primary">Batch</span> <span class="label label-primary">Streaming</span>
-      </td>
-      <td>
-        <p>Show all catalogs</p>
-{% highlight sql %}
-SHOW CATALOGS;
-{% endhighlight %}
-		<p>Show all databases in the current catalog</p>
-{% highlight sql %}
-SHOW DATABASES;
-{% endhighlight %}
-		<p>Show all tables in the current database in the current catalog</p>
-{% highlight sql %}
-SHOW TABLES;
-{% endhighlight %}
-        <p>Show all views in the current database in the current catalog</p>
-{% highlight sql %}
-SHOW VIEWS;
-{% endhighlight %}
-      </td>
-    </tr>
-  </tbody>
-</table>
-</div>
-
 ### Scan, Projection, and Filter
 
 <div markdown="1">
diff --git a/docs/dev/table/sql/queries.zh.md b/docs/dev/table/sql/queries.zh.md
index d208380..059e1a3 100644
--- a/docs/dev/table/sql/queries.zh.md
+++ b/docs/dev/table/sql/queries.zh.md
@@ -380,45 +380,6 @@ Flink SQL 对于标识符(表、属性、函数名)有类似于 Java 的词
 
 ## 操作符
 
-### Show
-
-<div markdown="1">
-<table class="table table-bordered">
-  <thead>
-    <tr>
-      <th class="text-left" style="width: 20%">操作符</th>
-      <th class="text-center">描述</th>
-    </tr>
-  </thead>
-  <tbody>
-    <tr>
-      <td>
-        <strong>Show</strong><br>
-        <span class="label label-primary">批处理</span> <span class="label label-primary">流处理</span>
-      </td>
-      <td>
-        <p>显示所有 catalog</p>
-{% highlight sql %}
-SHOW CATALOGS;
-{% endhighlight %}
-    <p>显示当前 catalog 中所有的数据库</p>
-{% highlight sql %}
-SHOW DATABASES;
-{% endhighlight %}
-    <p>显示当前数据库、Catalog中的所有表</p>
-{% highlight sql %}
-SHOW TABLES;
-{% endhighlight %}
-        <p>显示当前数据库、Catalog中的所有视图</p>
-{% highlight sql %}
-SHOW VIEWS;
-{% endhighlight %}
-      </td>
-    </tr>
-  </tbody>
-</table>
-</div>
-
 ### Scan、Projection 与 Filter
 
 <div markdown="1">
diff --git a/docs/dev/table/sql/show.md b/docs/dev/table/sql/show.md
new file mode 100644
index 0000000..4c9e241
--- /dev/null
+++ b/docs/dev/table/sql/show.md
@@ -0,0 +1,277 @@
+---
+title: "SHOW Statements"
+nav-parent_id: sql
+nav-pos: 10
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+* This will be replaced by the TOC
+{:toc}
+
+SHOW statements are used to list all catalogs, all databases in the current catalog, all tables/views in the current catalog and the current database, or all functions, including temporary system functions, system functions, temporary catalog functions, and catalog functions in the current catalog and the current database.
+
+Flink SQL supports the following SHOW statements for now:
+- SHOW CATALOGS
+- SHOW DATABASES
+- SHOW TABLES 
+- SHOW VIEWS
+- SHOW FUNCTIONS
+
+
+## Run a SHOW statement
+
+SHOW statements can be executed with the `executeSql()` method of the `TableEnvironment`, or executed in [SQL CLI]({{ site.baseurl }}/dev/table/sqlClient.html). The `executeSql()` method returns the requested objects for a successful SHOW operation; otherwise, it throws an exception.
+
+The following examples show how to run a SHOW statement in `TableEnvironment` and in SQL CLI.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
+
+// show catalogs
+tEnv.executeSql("SHOW CATALOGS").print();
+// +-----------------+
+// |    catalog name |
+// +-----------------+
+// | default_catalog |
+// +-----------------+
+
+// show databases
+tEnv.executeSql("SHOW DATABASES").print();
+// +------------------+
+// |    database name |
+// +------------------+
+// | default_database |
+// +------------------+
+
+// create a table
+tEnv.executeSql("CREATE TABLE my_table (...) WITH (...)");
+// show tables
+tEnv.executeSql("SHOW TABLES").print();
+// +------------+
+// | table name |
+// +------------+
+// |   my_table |
+// +------------+
+
+// create a view
+tEnv.executeSql("CREATE VIEW my_view AS ...");
+// show views
+tEnv.executeSql("SHOW VIEWS").print();
+// +-----------+
+// | view name |
+// +-----------+
+// |   my_view |
+// +-----------+
+
+// show functions
+tEnv.executeSql("SHOW FUNCTIONS").print();
+// +---------------+
+// | function name |
+// +---------------+
+// |           mod |
+// |        sha256 |
+// |           ... |
+// +---------------+
+
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+val env = StreamExecutionEnvironment.getExecutionEnvironment()
+val tEnv = StreamTableEnvironment.create(env)
+
+// show catalogs
+tEnv.executeSql("SHOW CATALOGS").print()
+// +-----------------+
+// |    catalog name |
+// +-----------------+
+// | default_catalog |
+// +-----------------+
+
+// show databases
+tEnv.executeSql("SHOW DATABASES").print()
+// +------------------+
+// |    database name |
+// +------------------+
+// | default_database |
+// +------------------+
+
+// create a table
+tEnv.executeSql("CREATE TABLE my_table (...) WITH (...)")
+// show tables
+tEnv.executeSql("SHOW TABLES").print()
+// +------------+
+// | table name |
+// +------------+
+// |   my_table |
+// +------------+
+
+// create a view
+tEnv.executeSql("CREATE VIEW my_view AS ...")
+// show views
+tEnv.executeSql("SHOW VIEWS").print()
+// +-----------+
+// | view name |
+// +-----------+
+// |   my_view |
+// +-----------+
+
+// show functions
+tEnv.executeSql("SHOW FUNCTIONS").print()
+// +---------------+
+// | function name |
+// +---------------+
+// |           mod |
+// |        sha256 |
+// |           ... |
+// +---------------+
+
+{% endhighlight %}
+</div>
+
+<div data-lang="python" markdown="1">
+{% highlight python %}
+settings = EnvironmentSettings.new_instance()...
+table_env = StreamTableEnvironment.create(env, settings)
+
+# show catalogs
+table_env.execute_sql("SHOW CATALOGS").print()
+# +-----------------+
+# |    catalog name |
+# +-----------------+
+# | default_catalog |
+# +-----------------+
+
+# show databases
+table_env.execute_sql("SHOW DATABASES").print()
+# +------------------+
+# |    database name |
+# +------------------+
+# | default_database |
+# +------------------+
+
+# create a table
+table_env.execute_sql("CREATE TABLE my_table (...) WITH (...)")
+# show tables
+table_env.execute_sql("SHOW TABLES").print()
+# +------------+
+# | table name |
+# +------------+
+# |   my_table |
+# +------------+
+
+# create a view
+table_env.execute_sql("CREATE VIEW my_view AS ...")
+# show views
+table_env.execute_sql("SHOW VIEWS").print()
+# +-----------+
+# | view name |
+# +-----------+
+# |   my_view |
+# +-----------+
+
+# show functions
+table_env.execute_sql("SHOW FUNCTIONS").print()
+# +---------------+
+# | function name |
+# +---------------+
+# |           mod |
+# |        sha256 |
+# |           ... |
+# +---------------+
+
+{% endhighlight %}
+</div>
+
+<div data-lang="SQL CLI" markdown="1">
+{% highlight sql %}
+
+Flink SQL> SHOW CATALOGS;
+default_catalog
+
+Flink SQL> SHOW DATABASES;
+default_database
+
+Flink SQL> CREATE TABLE my_table (...) WITH (...);
+[INFO] Table has been created.
+
+Flink SQL> SHOW TABLES;
+my_table
+
+Flink SQL> CREATE VIEW my_view AS ...;
+[INFO] View has been created.
+
+Flink SQL> SHOW VIEWS;
+my_view
+
+Flink SQL> SHOW FUNCTIONS;
+mod
+sha256
+...
+
+{% endhighlight %}
+</div>
+</div>
+
+{% top %}
+
+## SHOW CATALOGS
+
+{% highlight sql %}
+SHOW CATALOGS
+{% endhighlight %}
+
+Show all catalogs.
+
+## SHOW DATABASES
+
+{% highlight sql %}
+SHOW DATABASES
+{% endhighlight %}
+
+Show all databases in the current catalog.
+
+## SHOW TABLES
+
+{% highlight sql %}
+SHOW TABLES
+{% endhighlight %}
+
+Show all tables in the current catalog and the current database.
+
+## SHOW VIEWS
+
+{% highlight sql %}
+SHOW VIEWS
+{% endhighlight %}
+
+Show all views in the current catalog and the current database.
+
+## SHOW FUNCTIONS
+
+{% highlight sql %}
+SHOW FUNCTIONS
+{% endhighlight %}
+
+Show all functions, including temporary system functions, system functions, temporary catalog functions, and catalog functions in the current catalog and the current database.
\ No newline at end of file
diff --git a/docs/dev/table/sql/show.zh.md b/docs/dev/table/sql/show.zh.md
new file mode 100644
index 0000000..9f6c648
--- /dev/null
+++ b/docs/dev/table/sql/show.zh.md
@@ -0,0 +1,277 @@
+---
+title: "SHOW 语句"
+nav-parent_id: sql
+nav-pos: 10
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+* This will be replaced by the TOC
+{:toc}
+
+SHOW 语句用于列出所有的 catalog,或者列出当前 catalog 中所有的 database,或者列出当前 catalog 和当前 database 中的所有表或视图,或者列出所有的 function,包括:临时系统 function、系统 function、临时 catalog function,以及当前 catalog 和 database 中的 catalog function。
+
+目前 Flink SQL 支持下列 SHOW 语句:
+- SHOW CATALOGS
+- SHOW DATABASES
+- SHOW TABLES 
+- SHOW VIEWS
+- SHOW FUNCTIONS
+
+
+## 执行 SHOW 语句
+
+可以使用 `TableEnvironment` 中的 `executeSql()` 方法执行 SHOW 语句,也可以在 [SQL CLI]({{ site.baseurl }}/zh/dev/table/sqlClient.html) 中执行 SHOW 语句。若 SHOW 操作执行成功,`executeSql()` 方法返回该语句所列出的对象,否则会抛出异常。
+
+以下的例子展示了如何在 `TableEnvironment` 和 SQL CLI 中执行一个 SHOW 语句。
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
+
+// show catalogs
+tEnv.executeSql("SHOW CATALOGS").print();
+// +-----------------+
+// |    catalog name |
+// +-----------------+
+// | default_catalog |
+// +-----------------+
+
+// show databases
+tEnv.executeSql("SHOW DATABASES").print();
+// +------------------+
+// |    database name |
+// +------------------+
+// | default_database |
+// +------------------+
+
+// create a table
+tEnv.executeSql("CREATE TABLE my_table (...) WITH (...)");
+// show tables
+tEnv.executeSql("SHOW TABLES").print();
+// +------------+
+// | table name |
+// +------------+
+// |   my_table |
+// +------------+
+
+// create a view
+tEnv.executeSql("CREATE VIEW my_view AS ...");
+// show views
+tEnv.executeSql("SHOW VIEWS").print();
+// +-----------+
+// | view name |
+// +-----------+
+// |   my_view |
+// +-----------+
+
+// show functions
+tEnv.executeSql("SHOW FUNCTIONS").print();
+// +---------------+
+// | function name |
+// +---------------+
+// |           mod |
+// |        sha256 |
+// |           ... |
+// +---------------+
+
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+val env = StreamExecutionEnvironment.getExecutionEnvironment()
+val tEnv = StreamTableEnvironment.create(env)
+
+// show catalogs
+tEnv.executeSql("SHOW CATALOGS").print()
+// +-----------------+
+// |    catalog name |
+// +-----------------+
+// | default_catalog |
+// +-----------------+
+
+// show databases
+tEnv.executeSql("SHOW DATABASES").print()
+// +------------------+
+// |    database name |
+// +------------------+
+// | default_database |
+// +------------------+
+
+// create a table
+tEnv.executeSql("CREATE TABLE my_table (...) WITH (...)")
+// show tables
+tEnv.executeSql("SHOW TABLES").print()
+// +------------+
+// | table name |
+// +------------+
+// |   my_table |
+// +------------+
+
+// create a view
+tEnv.executeSql("CREATE VIEW my_view AS ...")
+// show views
+tEnv.executeSql("SHOW VIEWS").print()
+// +-----------+
+// | view name |
+// +-----------+
+// |   my_view |
+// +-----------+
+
+// show functions
+tEnv.executeSql("SHOW FUNCTIONS").print()
+// +---------------+
+// | function name |
+// +---------------+
+// |           mod |
+// |        sha256 |
+// |           ... |
+// +---------------+
+
+{% endhighlight %}
+</div>
+
+<div data-lang="python" markdown="1">
+{% highlight python %}
+settings = EnvironmentSettings.new_instance()...
+table_env = StreamTableEnvironment.create(env, settings)
+
+# show catalogs
+table_env.execute_sql("SHOW CATALOGS").print()
+# +-----------------+
+# |    catalog name |
+# +-----------------+
+# | default_catalog |
+# +-----------------+
+
+# show databases
+table_env.execute_sql("SHOW DATABASES").print()
+# +------------------+
+# |    database name |
+# +------------------+
+# | default_database |
+# +------------------+
+
+# create a table
+table_env.execute_sql("CREATE TABLE my_table (...) WITH (...)")
+# show tables
+table_env.execute_sql("SHOW TABLES").print()
+# +------------+
+# | table name |
+# +------------+
+# |   my_table |
+# +------------+
+
+# create a view
+table_env.execute_sql("CREATE VIEW my_view AS ...")
+# show views
+table_env.execute_sql("SHOW VIEWS").print()
+# +-----------+
+# | view name |
+# +-----------+
+# |   my_view |
+# +-----------+
+
+# show functions
+table_env.execute_sql("SHOW FUNCTIONS").print()
+# +---------------+
+# | function name |
+# +---------------+
+# |           mod |
+# |        sha256 |
+# |           ... |
+# +---------------+
+
+{% endhighlight %}
+</div>
+
+<div data-lang="SQL CLI" markdown="1">
+{% highlight sql %}
+
+Flink SQL> SHOW CATALOGS;
+default_catalog
+
+Flink SQL> SHOW DATABASES;
+default_database
+
+Flink SQL> CREATE TABLE my_table (...) WITH (...);
+[INFO] Table has been created.
+
+Flink SQL> SHOW TABLES;
+my_table
+
+Flink SQL> CREATE VIEW my_view AS ...;
+[INFO] View has been created.
+
+Flink SQL> SHOW VIEWS;
+my_view
+
+Flink SQL> SHOW FUNCTIONS;
+mod
+sha256
+...
+
+{% endhighlight %}
+</div>
+</div>
+
+{% top %}
+
+## SHOW CATALOGS
+
+{% highlight sql %}
+SHOW CATALOGS
+{% endhighlight %}
+
+展示所有的 catalog。
+
+## SHOW DATABASES
+
+{% highlight sql %}
+SHOW DATABASES
+{% endhighlight %}
+
+展示当前 catalog 中所有的 database。
+
+## SHOW TABLES
+
+{% highlight sql %}
+SHOW TABLES
+{% endhighlight %}
+
+展示当前 catalog 和当前 database 中所有的表。
+
+## SHOW VIEWS
+
+{% highlight sql %}
+SHOW VIEWS
+{% endhighlight %}
+
+展示当前 catalog 和当前 database 中所有的视图。
+
+## SHOW FUNCTIONS
+
+{% highlight sql %}
+SHOW FUNCTIONS
+{% endhighlight %}
+
+展示所有的 function,包括:临时系统 function、系统 function、临时 catalog function,以及当前 catalog 和当前 database 中的 catalog function。
\ No newline at end of file


[flink] 07/08: [FLINK-17599][docs] Add documents for USE statement

Posted by tw...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit d9e3e2006b5acb85731daf4d8b189df2c87f3ac8
Author: godfreyhe <go...@163.com>
AuthorDate: Wed Jun 10 16:57:22 2020 +0800

    [FLINK-17599][docs] Add documents for USE statement
---
 docs/dev/table/sql/index.md      |   1 +
 docs/dev/table/sql/index.zh.md   |   1 +
 docs/dev/table/sql/queries.md    |  18 +---
 docs/dev/table/sql/queries.zh.md |  18 +---
 docs/dev/table/sql/use.md        | 200 +++++++++++++++++++++++++++++++++++++++
 docs/dev/table/sql/use.zh.md     | 199 ++++++++++++++++++++++++++++++++++++++
 6 files changed, 403 insertions(+), 34 deletions(-)

diff --git a/docs/dev/table/sql/index.md b/docs/dev/table/sql/index.md
index 5ac4ff1..db96b1e 100644
--- a/docs/dev/table/sql/index.md
+++ b/docs/dev/table/sql/index.md
@@ -36,6 +36,7 @@ This page lists all the supported statements supported in Flink SQL for now:
 - [SQL HINTS](hints.html)
 - [DESCRIBE](describe.html)
 - [EXPLAIN](explain.html)
+- [USE](use.html)
 
 ## Data Types
 
diff --git a/docs/dev/table/sql/index.zh.md b/docs/dev/table/sql/index.zh.md
index 9b220f7..f81474b 100644
--- a/docs/dev/table/sql/index.zh.md
+++ b/docs/dev/table/sql/index.zh.md
@@ -36,6 +36,7 @@ under the License.
 - [SQL HINTS](hints.html)
 - [DESCRIBE](describe.html)
 - [EXPLAIN](explain.html)
+- [USE](use.html)
 
 ## 数据类型
 
diff --git a/docs/dev/table/sql/queries.md b/docs/dev/table/sql/queries.md
index 9bfb953..0040072 100644
--- a/docs/dev/table/sql/queries.md
+++ b/docs/dev/table/sql/queries.md
@@ -382,7 +382,7 @@ String literals must be enclosed in single quotes (e.g., `SELECT 'Hello World'`)
 
 ## Operations
 
-### Show and Use
+### Show
 
 <div markdown="1">
 <table class="table table-bordered">
@@ -417,22 +417,6 @@ SHOW VIEWS;
 {% endhighlight %}
       </td>
     </tr>
-    <tr>
-      <td>
-        <strong>Use</strong><br>
-        <span class="label label-primary">Batch</span> <span class="label label-primary">Streaming</span>
-      </td>
-      <td>
-			<p>Set current catalog for the session </p>
-{% highlight sql %}
-USE CATALOG mycatalog;
-{% endhighlight %}
-            <p>Set current database of the current catalog for the session</p>
-{% highlight sql %}
-USE mydatabase;
-{% endhighlight %}
-      </td>
-    </tr>
   </tbody>
 </table>
 </div>
diff --git a/docs/dev/table/sql/queries.zh.md b/docs/dev/table/sql/queries.zh.md
index 147b682..d208380 100644
--- a/docs/dev/table/sql/queries.zh.md
+++ b/docs/dev/table/sql/queries.zh.md
@@ -380,7 +380,7 @@ Flink SQL 对于标识符(表、属性、函数名)有类似于 Java 的词
 
 ## 操作符
 
-### Show 与 Use
+### Show
 
 <div markdown="1">
 <table class="table table-bordered">
@@ -415,22 +415,6 @@ SHOW VIEWS;
 {% endhighlight %}
       </td>
     </tr>
-    <tr>
-      <td>
-        <strong>Use</strong><br>
-        <span class="label label-primary">批处理</span> <span class="label label-primary">流处理</span>
-      </td>
-      <td>
-      <p>为本次会话设置 catalog </p>
-{% highlight sql %}
-USE CATALOG mycatalog;
-{% endhighlight %}
-            <p>为会话设置一个属于当前 catalog 的数据库</p>
-{% highlight sql %}
-USE mydatabase;
-{% endhighlight %}
-      </td>
-    </tr>
   </tbody>
 </table>
 </div>
diff --git a/docs/dev/table/sql/use.md b/docs/dev/table/sql/use.md
new file mode 100644
index 0000000..983d2d4
--- /dev/null
+++ b/docs/dev/table/sql/use.md
@@ -0,0 +1,200 @@
+---
+title: "USE Statements"
+nav-parent_id: sql
+nav-pos: 9
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+* This will be replaced by the TOC
+{:toc}
+
+USE statements are used to set the current database or catalog.
+
+
+## Run a USE statement
+
+USE statements can be executed with the `executeSql()` method of the `TableEnvironment`, or executed in [SQL CLI]({{ site.baseurl }}/dev/table/sqlClient.html). The `executeSql()` method returns 'OK' for a successful USE operation; otherwise, it throws an exception.
+
+The following examples show how to run a USE statement in `TableEnvironment` and in SQL CLI.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
+
+// create a catalog
+tEnv.executeSql("CREATE CATALOG cat1 WITH (...)");
+tEnv.executeSql("SHOW CATALOGS").print();
+// +-----------------+
+// |    catalog name |
+// +-----------------+
+// | default_catalog |
+// | cat1            |
+// +-----------------+
+
+// change default catalog
+tEnv.executeSql("USE CATALOG cat1");
+
+tEnv.executeSql("SHOW DATABASES").print();
+// databases are empty
+// +---------------+
+// | database name |
+// +---------------+
+// +---------------+
+
+// create a database
+tEnv.executeSql("CREATE DATABASE db1 WITH (...)");
+tEnv.executeSql("SHOW DATABASES").print();
+// +---------------+
+// | database name |
+// +---------------+
+// |           db1 |
+// +---------------+
+
+// change default database
+tEnv.executeSql("USE db1");
+
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+val env = StreamExecutionEnvironment.getExecutionEnvironment()
+val tEnv = StreamTableEnvironment.create(env)
+
+// create a catalog
+tEnv.executeSql("CREATE CATALOG cat1 WITH (...)")
+tEnv.executeSql("SHOW CATALOGS").print()
+// +-----------------+
+// |    catalog name |
+// +-----------------+
+// | default_catalog |
+// | cat1            |
+// +-----------------+
+
+// change default catalog
+tEnv.executeSql("USE CATALOG cat1")
+
+tEnv.executeSql("SHOW DATABASES").print()
+// databases are empty
+// +---------------+
+// | database name |
+// +---------------+
+// +---------------+
+
+// create a database
+tEnv.executeSql("CREATE DATABASE db1 WITH (...)")
+tEnv.executeSql("SHOW DATABASES").print()
+// +---------------+
+// | database name |
+// +---------------+
+// |           db1 |
+// +---------------+
+
+// change default database
+tEnv.executeSql("USE db1")
+
+{% endhighlight %}
+</div>
+
+<div data-lang="python" markdown="1">
+{% highlight python %}
+settings = EnvironmentSettings.new_instance()...
+table_env = StreamTableEnvironment.create(env, settings)
+
+# create a catalog
+table_env.execute_sql("CREATE CATALOG cat1 WITH (...)")
+table_env.execute_sql("SHOW CATALOGS").print()
+# +-----------------+
+# |    catalog name |
+# +-----------------+
+# | default_catalog |
+# | cat1            |
+# +-----------------+
+
+# change default catalog
+table_env.execute_sql("USE CATALOG cat1")
+
+table_env.execute_sql("SHOW DATABASES").print()
+# databases are empty
+# +---------------+
+# | database name |
+# +---------------+
+# +---------------+
+
+# create a database
+table_env.execute_sql("CREATE DATABASE db1 WITH (...)")
+table_env.execute_sql("SHOW DATABASES").print()
+# +---------------+
+# | database name |
+# +---------------+
+# |           db1 |
+# +---------------+
+
+# change default database
+table_env.execute_sql("USE db1")
+
+{% endhighlight %}
+</div>
+
+<div data-lang="SQL CLI" markdown="1">
+{% highlight sql %}
+Flink SQL> CREATE CATALOG cat1 WITH (...);
+[INFO] Catalog has been created.
+
+Flink SQL> SHOW CATALOGS;
+default_catalog
+cat1
+
+Flink SQL> USE CATALOG cat1;
+
+Flink SQL> SHOW DATABASES;
+
+Flink SQL> CREATE DATABASE db1 WITH (...);
+[INFO] Database has been created.
+
+Flink SQL> SHOW DATABASES;
+db1
+
+Flink SQL> USE db1;
+
+{% endhighlight %}
+</div>
+</div>
+
+{% top %}
+
+## USE CATALOG
+
+{% highlight sql %}
+USE CATALOG catalog_name
+{% endhighlight %}
+
+Set the current catalog. All subsequent commands that do not explicitly specify a catalog will use this one. If the provided catalog does not exist, an exception is thrown. The default current catalog is `default_catalog`.
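+
+For example, switching to a (hypothetical) catalog named cat1:
+
+{% highlight sql %}
+USE CATALOG cat1;
+{% endhighlight %}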
+
+
+## USE
+
+{% highlight sql %}
+USE [catalog_name.]database_name
+{% endhighlight %}
+
+Set the current database. All subsequent commands that do not explicitly specify a database will use this one. If the provided database does not exist, an exception is thrown. The default current database is `default_database`.
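+
+As a minimal sketch (the catalog and database names are illustrative), the current database can be set with or without qualifying the catalog:
+
+{% highlight sql %}
+-- use database db1 in the current catalog
+USE db1;
+
+-- use database db1 in catalog cat1
+USE cat1.db1;
+{% endhighlight %}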
diff --git a/docs/dev/table/sql/use.zh.md b/docs/dev/table/sql/use.zh.md
new file mode 100644
index 0000000..128faf5
--- /dev/null
+++ b/docs/dev/table/sql/use.zh.md
@@ -0,0 +1,199 @@
+---
+title: "USE 语句"
+nav-parent_id: sql
+nav-pos: 9
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+* This will be replaced by the TOC
+{:toc}
+
+USE 语句用来设置当前的 catalog 或者 database。
+
+## 运行一个 USE 语句
+
+可以使用 `TableEnvironment` 中的 `executeSql()` 方法执行 USE 语句,也可以在 [SQL CLI]({{ site.baseurl }}/zh/dev/table/sqlClient.html) 中执行 USE 语句。若 USE 操作执行成功,`executeSql()` 方法返回 'OK',否则会抛出异常。
+
+以下的例子展示了如何在 `TableEnvironment` 和 SQL CLI 中执行一个 USE 语句。
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
+
+// create a catalog
+tEnv.executeSql("CREATE CATALOG cat1 WITH (...)");
+tEnv.executeSql("SHOW CATALOGS").print();
+// +-----------------+
+// |    catalog name |
+// +-----------------+
+// | default_catalog |
+// | cat1            |
+// +-----------------+
+
+// change default catalog
+tEnv.executeSql("USE CATALOG cat1");
+
+tEnv.executeSql("SHOW DATABASES").print();
+// databases are empty
+// +---------------+
+// | database name |
+// +---------------+
+// +---------------+
+
+// create a database
+tEnv.executeSql("CREATE DATABASE db1 WITH (...)");
+tEnv.executeSql("SHOW DATABASES").print();
+// +---------------+
+// | database name |
+// +---------------+
+// |           db1 |
+// +---------------+
+
+// change default database
+tEnv.executeSql("USE db1");
+
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+val env = StreamExecutionEnvironment.getExecutionEnvironment()
+val tEnv = StreamTableEnvironment.create(env)
+
+// create a catalog
+tEnv.executeSql("CREATE CATALOG cat1 WITH (...)")
+tEnv.executeSql("SHOW CATALOGS").print()
+// +-----------------+
+// |    catalog name |
+// +-----------------+
+// | default_catalog |
+// | cat1            |
+// +-----------------+
+
+// change default catalog
+tEnv.executeSql("USE CATALOG cat1")
+
+tEnv.executeSql("SHOW DATABASES").print()
+// databases are empty
+// +---------------+
+// | database name |
+// +---------------+
+// +---------------+
+
+// create a database
+tEnv.executeSql("CREATE DATABASE db1 WITH (...)")
+tEnv.executeSql("SHOW DATABASES").print()
+// +---------------+
+// | database name |
+// +---------------+
+// |           db1 |
+// +---------------+
+
+// change default database
+tEnv.executeSql("USE db1")
+
+{% endhighlight %}
+</div>
+
+<div data-lang="python" markdown="1">
+{% highlight python %}
+settings = EnvironmentSettings.new_instance()...
+table_env = StreamTableEnvironment.create(env, settings)
+
+# create a catalog
+table_env.execute_sql("CREATE CATALOG cat1 WITH (...)")
+table_env.execute_sql("SHOW CATALOGS").print()
+# +-----------------+
+# |    catalog name |
+# +-----------------+
+# | default_catalog |
+# | cat1            |
+# +-----------------+
+
+# change default catalog
+table_env.execute_sql("USE CATALOG cat1")
+
+table_env.execute_sql("SHOW DATABASES").print()
+# databases are empty
+# +---------------+
+# | database name |
+# +---------------+
+# +---------------+
+
+# create a database
+table_env.execute_sql("CREATE DATABASE db1 WITH (...)")
+table_env.execute_sql("SHOW DATABASES").print()
+# +---------------+
+# | database name |
+# +---------------+
+# |           db1 |
+# +---------------+
+
+# change default database
+table_env.execute_sql("USE db1")
+
+{% endhighlight %}
+</div>
+
+<div data-lang="SQL CLI" markdown="1">
+{% highlight sql %}
+Flink SQL> CREATE CATALOG cat1 WITH (...);
+[INFO] Catalog has been created.
+
+Flink SQL> SHOW CATALOGS;
+default_catalog
+cat1
+
+Flink SQL> USE CATALOG cat1;
+
+Flink SQL> SHOW DATABASES;
+
+Flink SQL> CREATE DATABASE db1 WITH (...);
+[INFO] Database has been created.
+
+Flink SQL> SHOW DATABASES;
+db1
+
+Flink SQL> USE db1;
+
+{% endhighlight %}
+</div>
+</div>
+
+{% top %}
+
+## USE CATALOG
+
+{% highlight sql %}
+USE CATALOG catalog_name
+{% endhighlight %}
+
+设置当前的 catalog。所有未显式指定 catalog 的后续命令都将使用此 catalog。如果指定的 catalog 不存在,则抛出异常。默认的当前 catalog 是 `default_catalog`。
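+
+例如,切换到一个(假设存在的)名为 cat1 的 catalog:
+
+{% highlight sql %}
+USE CATALOG cat1;
+{% endhighlight %}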
+
+
+## USE
+
+{% highlight sql %}
+USE [catalog_name.]database_name
+{% endhighlight %}
+
+设置当前的 database。所有未显式指定 database 的后续命令都将使用此 database。如果指定的 database 不存在,则抛出异常。默认的当前 database 是 `default_database`。
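+
+一个简单的示例(其中的 catalog 名和 database 名仅作演示),当前 database 可以带上或不带 catalog 限定符来设置:
+
+{% highlight sql %}
+-- 使用当前 catalog 中的 database db1
+USE db1;
+
+-- 使用 catalog cat1 中的 database db1
+USE cat1.db1;
+{% endhighlight %}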


[flink] 02/08: [hotfix][table] fix typos in TableEnvironment javadoc

Posted by tw...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 880a5d77e0ad93cff6f21704b9dbcf756efb3d0e
Author: godfreyhe <go...@163.com>
AuthorDate: Tue Jun 9 16:34:18 2020 +0800

    [hotfix][table] fix typos in TableEnvironment javadoc
---
 .../src/main/java/org/apache/flink/table/api/TableEnvironment.java       | 1 -
 1 file changed, 1 deletion(-)

diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableEnvironment.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableEnvironment.java
index 614b6e7..dfe5054e 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableEnvironment.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableEnvironment.java
@@ -326,7 +326,6 @@ public interface TableEnvironment {
 	 * Creates a table from a table source.
 	 *
 	 * @param source table source used as table
-	 * @deprecated use {@link #createTemporaryView(String, Table)}.
 	 */
 	@Deprecated
 	Table fromTableSource(TableSource<?> source);


[flink] 06/08: [FLINK-17599][docs] Add documents for EXPLAIN statement

Posted by tw...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 6818f94c5565360a1a1b476c09a81c95d1165006
Author: godfreyhe <go...@163.com>
AuthorDate: Wed Jun 10 13:23:31 2020 +0800

    [FLINK-17599][docs] Add documents for EXPLAIN statement
---
 docs/dev/table/common.md         |   2 +
 docs/dev/table/common.zh.md      |   2 +
 docs/dev/table/sql/explain.md    | 180 ++++++++++++++++++++++++++++++++++++++
 docs/dev/table/sql/explain.zh.md | 182 +++++++++++++++++++++++++++++++++++++++
 docs/dev/table/sql/index.md      |   1 +
 docs/dev/table/sql/index.zh.md   |   1 +
 6 files changed, 368 insertions(+)

diff --git a/docs/dev/table/common.md b/docs/dev/table/common.md
index 40bd4c8..e8a1861 100644
--- a/docs/dev/table/common.md
+++ b/docs/dev/table/common.md
@@ -1442,6 +1442,8 @@ This is done through the `Table.explain()` method or `StatementSet.explain()` me
 2. the optimized logical query plan, and
 3. the physical execution plan.
 
+`TableEnvironment.explainSql()` and `TableEnvironment.executeSql()` support executing an `EXPLAIN` statement to get the plans. Please refer to the [EXPLAIN]({{ site.baseurl }}/dev/table/sql/explain.html) page.
+
 The following code shows an example and the corresponding output for a given `Table` using the `Table.explain()` method:
 
 <div class="codetabs" markdown="1">
diff --git a/docs/dev/table/common.zh.md b/docs/dev/table/common.zh.md
index f295f39..0371bb8 100644
--- a/docs/dev/table/common.zh.md
+++ b/docs/dev/table/common.zh.md
@@ -1431,6 +1431,8 @@ Table API 提供了一种机制来解释计算 `Table` 的逻辑和优化查询
 2. the optimized logical query plan, and
 3. the physical execution plan.
 
+An `EXPLAIN` statement can also be executed via `TableEnvironment.explainSql()` or `TableEnvironment.executeSql()` to get these plans. Please refer to the [EXPLAIN]({{ site.baseurl }}/zh/dev/table/sql/explain.html) page.
+
 The following code shows an example and the corresponding output of calling the `Table.explain()` method on a given `Table`:
 
 <div class="codetabs" markdown="1">
diff --git a/docs/dev/table/sql/explain.md b/docs/dev/table/sql/explain.md
new file mode 100644
index 0000000..ae26b06
--- /dev/null
+++ b/docs/dev/table/sql/explain.md
@@ -0,0 +1,180 @@
+---
+title: "EXPLAIN Statements"
+nav-parent_id: sql
+nav-pos: 8
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+* This will be replaced by the TOC
+{:toc}
+
+EXPLAIN statements are used to explain the logical and optimized query plans of a query or an INSERT statement.
+
+
+## Run an EXPLAIN statement
+
+EXPLAIN statements can be executed with the `executeSql()` method of the `TableEnvironment`, or in the [SQL CLI]({{ site.baseurl }}/dev/table/sqlClient.html). The `executeSql()` method returns the explain result for a successful EXPLAIN operation; otherwise, it throws an exception.
+
+The following examples show how to run an EXPLAIN statement in `TableEnvironment` and in SQL CLI.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
+
+// register a table named "Orders"
+tEnv.executeSql("CREATE TABLE MyTable1 (count bigint, work VARCHAR(256) WITH (...)");
+tEnv.executeSql("CREATE TABLE MyTable2 (count bigint, work VARCHAR(256) WITH (...)");
+
+// explain SELECT statement through TableEnvironment.explainSql()
+String explanation = tEnv.explainSql(
+  "SELECT count, word FROM MyTable1 WHERE word LIKE 'F%' " +
+  "UNION ALL " + 
+  "SELECT count, word FROM MyTable2");
+System.out.println(explanation);
+
+// explain SELECT statement through TableEnvironment.executeSql()
+TableResult tableResult = tEnv.executeSql(
+  "EXPLAIN PLAN FOR " + 
+  "SELECT count, word FROM MyTable1 WHERE word LIKE 'F%' " +
+  "UNION ALL " + 
+  "SELECT count, word FROM MyTable2");
+tableResult.print();
+
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+val env = StreamExecutionEnvironment.getExecutionEnvironment()
+val tEnv = StreamTableEnvironment.create(env)
+
+// register a table named "Orders"
+tEnv.executeSql("CREATE TABLE MyTable1 (count bigint, work VARCHAR(256) WITH (...)")
+tEnv.executeSql("CREATE TABLE MyTable2 (count bigint, work VARCHAR(256) WITH (...)")
+
+// explain SELECT statement through TableEnvironment.explainSql()
+val explanation = tEnv.explainSql(
+  "SELECT count, word FROM MyTable1 WHERE word LIKE 'F%' " +
+  "UNION ALL " + 
+  "SELECT count, word FROM MyTable2")
+println(explanation)
+
+// explain SELECT statement through TableEnvironment.executeSql()
+val tableResult = tEnv.executeSql(
+  "EXPLAIN PLAN FOR " + 
+  "SELECT count, word FROM MyTable1 WHERE word LIKE 'F%' " +
+  "UNION ALL " + 
+  "SELECT count, word FROM MyTable2")
+tableResult.print()
+
+{% endhighlight %}
+</div>
+
+<div data-lang="python" markdown="1">
+{% highlight python %}
+settings = EnvironmentSettings.new_instance()...
+t_env = StreamTableEnvironment.create(env, settings)
+
+# register two tables named "MyTable1" and "MyTable2"
+t_env.execute_sql("CREATE TABLE MyTable1 (count bigint, word VARCHAR(256)) WITH (...)")
+t_env.execute_sql("CREATE TABLE MyTable2 (count bigint, word VARCHAR(256)) WITH (...)")
+
+# explain SELECT statement through TableEnvironment.explain_sql()
+explanation1 = t_env.explain_sql(
+    "SELECT count, word FROM MyTable1 WHERE word LIKE 'F%' "
+    "UNION ALL "
+    "SELECT count, word FROM MyTable2")
+print(explanation1)
+
+# explain SELECT statement through TableEnvironment.execute_sql()
+table_result = t_env.execute_sql(
+    "EXPLAIN PLAN FOR "
+    "SELECT count, word FROM MyTable1 WHERE word LIKE 'F%' "
+    "UNION ALL "
+    "SELECT count, word FROM MyTable2")
+table_result.print()
+
+{% endhighlight %}
+</div>
+
+<div data-lang="SQL CLI" markdown="1">
+{% highlight sql %}
+Flink SQL> CREATE TABLE MyTable1 (count bigint, word VARCHAR(256)) WITH (...);
+[INFO] Table has been created.
+
+Flink SQL> CREATE TABLE MyTable2 (count bigint, word VARCHAR(256)) WITH (...);
+[INFO] Table has been created.
+
+Flink SQL> EXPLAIN PLAN FOR SELECT count, word FROM MyTable1 WHERE word LIKE 'F%' 
+> UNION ALL 
+> SELECT count, word FROM MyTable2;
+
+{% endhighlight %}
+</div>
+</div>
+
+The `EXPLAIN` result is:
+<div>
+{% highlight text %}
+== Abstract Syntax Tree ==
+LogicalUnion(all=[true])
+  LogicalFilter(condition=[LIKE($1, _UTF-16LE'F%')])
+    FlinkLogicalTableSourceScan(table=[[default_catalog, default_database, MyTable1]], fields=[count, word])
+  FlinkLogicalTableSourceScan(table=[[default_catalog, default_database, MyTable2]], fields=[count, word])
+  
+
+== Optimized Logical Plan ==
+DataStreamUnion(all=[true], union all=[count, word])
+  DataStreamCalc(select=[count, word], where=[LIKE(word, _UTF-16LE'F%')])
+    TableSourceScan(table=[[default_catalog, default_database, MyTable1]], fields=[count, word])
+  TableSourceScan(table=[[default_catalog, default_database, MyTable2]], fields=[count, word])
+
+== Physical Execution Plan ==
+Stage 1 : Data Source
+	content : collect elements with CollectionInputFormat
+
+Stage 2 : Data Source
+	content : collect elements with CollectionInputFormat
+
+	Stage 3 : Operator
+		content : from: (count, word)
+		ship_strategy : REBALANCE
+
+		Stage 4 : Operator
+			content : where: (LIKE(word, _UTF-16LE'F%')), select: (count, word)
+			ship_strategy : FORWARD
+
+			Stage 5 : Operator
+				content : from: (count, word)
+				ship_strategy : REBALANCE
+{% endhighlight %}
+</div>
+
+{% top %}
+
+## Syntax
+
+{% highlight sql %}
+EXPLAIN PLAN FOR <query_statement_or_insert_statement>
+{% endhighlight %}
+
+For query syntax, please refer to the [Queries]({{ site.baseurl }}/dev/table/sql/queries.html#supported-syntax) page.
+For INSERT syntax, please refer to the [INSERT]({{ site.baseurl }}/dev/table/sql/insert.html) page.
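+
+The examples above explain a SELECT statement; an INSERT statement can be explained in the same way. As a minimal sketch (assuming a sink table `MySink` with a schema compatible to `MyTable1` has already been created):
+
+{% highlight sql %}
+-- MySink is a hypothetical sink table matching (count, word)
+EXPLAIN PLAN FOR
+INSERT INTO MySink
+SELECT count, word FROM MyTable1 WHERE word LIKE 'F%';
+{% endhighlight %}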
diff --git a/docs/dev/table/sql/explain.zh.md b/docs/dev/table/sql/explain.zh.md
new file mode 100644
index 0000000..e2f2c1e
--- /dev/null
+++ b/docs/dev/table/sql/explain.zh.md
@@ -0,0 +1,182 @@
+---
+title: "EXPLAIN 语句"
+nav-parent_id: sql
+nav-pos: 8
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+* This will be replaced by the TOC
+{:toc}
+
+EXPLAIN statements are used to explain the logical and optimized query plans of a query or an INSERT statement.
+
+
+## Run an EXPLAIN statement
+
+EXPLAIN statements can be executed with the `executeSql()` method of the `TableEnvironment`, or in the [SQL CLI]({{ site.baseurl }}/zh/dev/table/sqlClient.html). The `executeSql()` method returns the explain result for a successful EXPLAIN operation; otherwise, it throws an exception.
+
+The following examples show how to run an EXPLAIN statement in `TableEnvironment` and in SQL CLI.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
+
+// register a table named "Orders"
+tEnv.executeSql("CREATE TABLE MyTable1 (count bigint, work VARCHAR(256) WITH (...)");
+tEnv.executeSql("CREATE TABLE MyTable2 (count bigint, work VARCHAR(256) WITH (...)");
+
+// explain SELECT statement through TableEnvironment.explainSql()
+String explanation = tEnv.explainSql(
+  "SELECT count, word FROM MyTable1 WHERE word LIKE 'F%' " +
+  "UNION ALL " + 
+  "SELECT count, word FROM MyTable2");
+System.out.println(explanation);
+
+// explain SELECT statement through TableEnvironment.executeSql()
+TableResult tableResult = tEnv.executeSql(
+  "EXPLAIN PLAN FOR " + 
+  "SELECT count, word FROM MyTable1 WHERE word LIKE 'F%' " +
+  "UNION ALL " + 
+  "SELECT count, word FROM MyTable2");
+tableResult.print();
+
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+val env = StreamExecutionEnvironment.getExecutionEnvironment()
+val tEnv = StreamTableEnvironment.create(env)
+
+// register a table named "Orders"
+tEnv.executeSql("CREATE TABLE MyTable1 (count bigint, work VARCHAR(256) WITH (...)")
+tEnv.executeSql("CREATE TABLE MyTable2 (count bigint, work VARCHAR(256) WITH (...)")
+
+// explain SELECT statement through TableEnvironment.explainSql()
+val explanation = tEnv.explainSql(
+  "SELECT count, word FROM MyTable1 WHERE word LIKE 'F%' " +
+  "UNION ALL " + 
+  "SELECT count, word FROM MyTable2")
+println(explanation)
+
+// explain SELECT statement through TableEnvironment.executeSql()
+val tableResult = tEnv.executeSql(
+  "EXPLAIN PLAN FOR " + 
+  "SELECT count, word FROM MyTable1 WHERE word LIKE 'F%' " +
+  "UNION ALL " + 
+  "SELECT count, word FROM MyTable2")
+tableResult.print()
+
+{% endhighlight %}
+</div>
+
+<div data-lang="python" markdown="1">
+{% highlight python %}
+settings = EnvironmentSettings.new_instance()...
+t_env = StreamTableEnvironment.create(env, settings)
+
+# register two tables named "MyTable1" and "MyTable2"
+t_env.execute_sql("CREATE TABLE MyTable1 (count bigint, word VARCHAR(256)) WITH (...)")
+t_env.execute_sql("CREATE TABLE MyTable2 (count bigint, word VARCHAR(256)) WITH (...)")
+
+# explain SELECT statement through TableEnvironment.explain_sql()
+explanation1 = t_env.explain_sql(
+    "SELECT count, word FROM MyTable1 WHERE word LIKE 'F%' "
+    "UNION ALL "
+    "SELECT count, word FROM MyTable2")
+print(explanation1)
+
+# explain SELECT statement through TableEnvironment.execute_sql()
+table_result = t_env.execute_sql(
+    "EXPLAIN PLAN FOR "
+    "SELECT count, word FROM MyTable1 WHERE word LIKE 'F%' "
+    "UNION ALL "
+    "SELECT count, word FROM MyTable2")
+table_result.print()
+
+{% endhighlight %}
+</div>
+
+<div data-lang="SQL CLI" markdown="1">
+{% highlight sql %}
+Flink SQL> CREATE TABLE MyTable1 (count bigint, word VARCHAR(256)) WITH (...);
+[INFO] Table has been created.
+
+Flink SQL> CREATE TABLE MyTable2 (count bigint, word VARCHAR(256)) WITH (...);
+[INFO] Table has been created.
+
+Flink SQL> EXPLAIN PLAN FOR SELECT count, word FROM MyTable1 WHERE word LIKE 'F%' 
+> UNION ALL 
+> SELECT count, word FROM MyTable2;
+
+{% endhighlight %}
+</div>
+</div>
+
+The `EXPLAIN` result is:
+<div>
+{% highlight text %}
+== Abstract Syntax Tree ==
+LogicalUnion(all=[true])
+  LogicalFilter(condition=[LIKE($1, _UTF-16LE'F%')])
+    FlinkLogicalTableSourceScan(table=[[default_catalog, default_database, MyTable1]], fields=[count, word])
+  FlinkLogicalTableSourceScan(table=[[default_catalog, default_database, MyTable2]], fields=[count, word])
+  
+
+== Optimized Logical Plan ==
+DataStreamUnion(all=[true], union all=[count, word])
+  DataStreamCalc(select=[count, word], where=[LIKE(word, _UTF-16LE'F%')])
+    TableSourceScan(table=[[default_catalog, default_database, MyTable1]], fields=[count, word])
+  TableSourceScan(table=[[default_catalog, default_database, MyTable2]], fields=[count, word])
+
+== Physical Execution Plan ==
+Stage 1 : Data Source
+	content : collect elements with CollectionInputFormat
+
+Stage 2 : Data Source
+	content : collect elements with CollectionInputFormat
+
+	Stage 3 : Operator
+		content : from: (count, word)
+		ship_strategy : REBALANCE
+
+		Stage 4 : Operator
+			content : where: (LIKE(word, _UTF-16LE'F%')), select: (count, word)
+			ship_strategy : FORWARD
+
+			Stage 5 : Operator
+				content : from: (count, word)
+				ship_strategy : REBALANCE
+{% endhighlight %}
+</div>
+
+{% top %}
+
+## Syntax
+
+{% highlight sql %}
+EXPLAIN PLAN FOR <query_statement_or_insert_statement>
+{% endhighlight %}
+
+For query syntax, please refer to the [Queries]({{ site.baseurl }}/zh/dev/table/sql/queries.html#supported-syntax) page.
+For INSERT syntax, please refer to the [INSERT]({{ site.baseurl }}/zh/dev/table/sql/insert.html) page.
+
+{% top %}
diff --git a/docs/dev/table/sql/index.md b/docs/dev/table/sql/index.md
index 06f8bd3..5ac4ff1 100644
--- a/docs/dev/table/sql/index.md
+++ b/docs/dev/table/sql/index.md
@@ -35,6 +35,7 @@ This page lists all the supported statements supported in Flink SQL for now:
 - [INSERT](insert.html)
 - [SQL HINTS](hints.html)
 - [DESCRIBE](describe.html)
+- [EXPLAIN](explain.html)
 
 ## Data Types
 
diff --git a/docs/dev/table/sql/index.zh.md b/docs/dev/table/sql/index.zh.md
index 00d34ab..9b220f7 100644
--- a/docs/dev/table/sql/index.zh.md
+++ b/docs/dev/table/sql/index.zh.md
@@ -35,6 +35,7 @@ under the License.
 - [INSERT](insert.html)
 - [SQL HINTS](hints.html)
 - [DESCRIBE](describe.html)
+- [EXPLAIN](explain.html)
 
 ## Data Types
 


[flink] 01/08: [hotfix][table] fix typos in PlannerBase javadoc

Posted by tw...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 8a87231300a81ea0be77989c72221eaaff62b381
Author: godfreyhe <go...@163.com>
AuthorDate: Tue Jun 9 16:31:21 2020 +0800

    [hotfix][table] fix typos in PlannerBase javadoc
---
 .../scala/org/apache/flink/table/planner/delegation/PlannerBase.scala   | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/delegation/PlannerBase.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/delegation/PlannerBase.scala
index 451f4d7..fe9da90 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/delegation/PlannerBase.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/delegation/PlannerBase.scala
@@ -60,7 +60,7 @@ import java.util.function.{Function => JFunction, Supplier => JSupplier}
 import _root_.scala.collection.JavaConversions._
 
 /**
-  * Implementation of [[Planner]] for legacy Flink planner. It supports only streaming use cases.
+  * Implementation of [[Planner]] for blink planner. It supports only streaming use cases.
   * (The new [[org.apache.flink.table.sources.InputFormatTableSource]] should work, but will be
   * handled as streaming sources, and no batch specific optimizations will be applied).
   *


[flink] 03/08: [hotfix][table] Code cleanup: use new methods introduced in FLIP-84 instead of deprecated methods

Posted by tw...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit fc886eb9ee7308618353ae6fd688976abc6bd843
Author: godfreyhe <go...@163.com>
AuthorDate: Tue Jun 9 17:27:14 2020 +0800

    [hotfix][table] Code cleanup: use new methods introduced in FLIP-84 instead of deprecated methods
---
 .../jdbc/table/JdbcLookupTableITCase.java          |  2 +-
 .../flink/sql/tests/BatchSQLTestProgram.java       |  2 +-
 .../flink/table/api/TableEnvironmentTest.scala     |  5 +-
 .../validation/LegacyTableSinkValidationTest.scala |  3 +-
 .../planner/runtime/FileSystemITCaseBase.scala     |  4 +-
 .../runtime/batch/table/TableSinkITCase.scala      | 28 +++----
 .../runtime/stream/table/TableSinkITCase.scala     | 86 ++++++++--------------
 7 files changed, 45 insertions(+), 85 deletions(-)

diff --git a/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcLookupTableITCase.java b/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcLookupTableITCase.java
index 8babd64..3e3a449 100644
--- a/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcLookupTableITCase.java
+++ b/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcLookupTableITCase.java
@@ -144,7 +144,7 @@ public class JdbcLookupTableITCase extends JdbcLookupTestBase {
 		tEnv.createTemporaryView("T", t);
 
 		String cacheConfig = ", 'lookup.cache.max-rows'='4', 'lookup.cache.ttl'='10000', 'lookup.max-retries'='5'";
-		tEnv.sqlUpdate(
+		tEnv.executeSql(
 			String.format("create table lookup (" +
 				"  id1 INT," +
 				"  id2 VARCHAR," +
diff --git a/flink-end-to-end-tests/flink-batch-sql-test/src/main/java/org/apache/flink/sql/tests/BatchSQLTestProgram.java b/flink-end-to-end-tests/flink-batch-sql-test/src/main/java/org/apache/flink/sql/tests/BatchSQLTestProgram.java
index 1c9d92f..9698b48 100644
--- a/flink-end-to-end-tests/flink-batch-sql-test/src/main/java/org/apache/flink/sql/tests/BatchSQLTestProgram.java
+++ b/flink-end-to-end-tests/flink-batch-sql-test/src/main/java/org/apache/flink/sql/tests/BatchSQLTestProgram.java
@@ -48,7 +48,7 @@ import java.util.NoSuchElementException;
  *
  * <p>Parameters:
  * -outputPath output file path for CsvTableSink;
- * -sqlStatement SQL statement that will be executed as sqlUpdate
+ * -sqlStatement SQL statement that will be executed as executeSql
  */
 public class BatchSQLTestProgram {
 
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/api/TableEnvironmentTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/api/TableEnvironmentTest.scala
index b2073dc..f7731f1 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/api/TableEnvironmentTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/api/TableEnvironmentTest.scala
@@ -99,11 +99,8 @@ class TableEnvironmentTest {
     TestTableSourceSinks.createCsvTemporarySinkTable(
       tEnv, new TableSchema(Array("first"), Array(STRING)), "MySink", -1)
 
-    val table1 = tEnv.sqlQuery("select first from MyTable")
-    tEnv.insertInto(table1, "MySink")
-
     val expected = TableTestUtil.readFromResource("/explain/testStreamTableEnvironmentExplain.out")
-    val actual = tEnv.explain(false)
+    val actual = tEnv.explainSql("insert into MySink select first from MyTable")
     assertEquals(TableTestUtil.replaceStageId(expected), TableTestUtil.replaceStageId(actual))
   }
 
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/table/validation/LegacyTableSinkValidationTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/table/validation/LegacyTableSinkValidationTest.scala
index ce0293c..e17bdb5 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/table/validation/LegacyTableSinkValidationTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/table/validation/LegacyTableSinkValidationTest.scala
@@ -65,9 +65,8 @@ class LegacyTableSinkValidationTest extends TableTestBase {
     val schema = result.getSchema
     sink.configure(schema.getFieldNames, schema.getFieldTypes)
     tEnv.asInstanceOf[TableEnvironmentInternal].registerTableSinkInternal("testSink", sink)
-    tEnv.insertInto("testSink", result)
     // must fail because the table is an updating table without a full key
-    env.execute()
+    result.executeInsert("testSink")
   }
 
   @Test(expected = classOf[TableException])
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/FileSystemITCaseBase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/FileSystemITCaseBase.scala
index 28af0e9..b41aaf8 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/FileSystemITCaseBase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/FileSystemITCaseBase.scala
@@ -243,8 +243,8 @@ trait FileSystemITCaseBase {
 
   @Test
   def testProjectPushDown(): Unit = {
-    tableEnv.sqlUpdate("insert into partitionedTable select x, y, a, b from originalT")
-    tableEnv.execute("test")
+    execInsertSqlAndWaitResult(
+      tableEnv, "insert into partitionedTable select x, y, a, b from originalT")
 
     check(
       "select y, b, x from partitionedTable where a=3",
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/table/TableSinkITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/table/TableSinkITCase.scala
index f6aaf0f..9082c8b 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/table/TableSinkITCase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/table/TableSinkITCase.scala
@@ -48,12 +48,10 @@ class TableSinkITCase extends BatchTestBase {
 
     registerCollection("MyTable", data3, type3, "a, b, c", nullablesOfData3)
 
-    tEnv.from("MyTable")
+    val table = tEnv.from("MyTable")
       .where('a > 20)
       .select("12345", 55.cast(DataTypes.DECIMAL(10, 0)), "12345".cast(DataTypes.CHAR(5)))
-      .insertInto("sink")
-
-    tEnv.execute("job name")
+    execInsertTableAndWaitResult(table, "sink")
 
     val result = TestValuesTableFactory.getResults("sink")
     val expected = Seq("12345,55,12345")
@@ -76,12 +74,10 @@ class TableSinkITCase extends BatchTestBase {
 
     registerCollection("MyTable", data3, type3, "a, b, c", nullablesOfData3)
 
-    tEnv.from("MyTable")
+    val table = tEnv.from("MyTable")
       .where('a > 20)
       .select("12345", 55.cast(DataTypes.DECIMAL(10, 0)), "12345".cast(DataTypes.CHAR(5)))
-      .insertInto("sink")
-
-    tEnv.execute("job name")
+    execInsertTableAndWaitResult(table, "sink")
 
     val result = TestValuesTableFactory.getResults("sink")
     val expected = Seq("12345,55,12345")
@@ -104,11 +100,10 @@ class TableSinkITCase extends BatchTestBase {
 
     registerCollection("MyTable", simpleData2, simpleType2, "a, b", nullableOfSimpleData2)
 
-    tEnv.from("MyTable")
+    val table = tEnv.from("MyTable")
       .groupBy('a)
       .select('a, 'b.sum())
-      .insertInto("testSink")
-    tEnv.execute("")
+    execInsertTableAndWaitResult(table, "testSink")
 
     val result = TestValuesTableFactory.getResults("testSink")
     val expected = List(
@@ -135,11 +130,10 @@ class TableSinkITCase extends BatchTestBase {
 
     registerCollection("MyTable", simpleData2, simpleType2, "a, b", nullableOfSimpleData2)
 
-    tEnv.from("MyTable")
+    val table = tEnv.from("MyTable")
       .groupBy('a)
       .select('a, 'b.sum())
-      .insertInto("testSink")
-    tEnv.execute("")
+    execInsertTableAndWaitResult(table, "testSink")
 
     val result = TestValuesTableFactory.getResults("testSink")
     val expected = List(
@@ -177,11 +171,10 @@ class TableSinkITCase extends BatchTestBase {
          |  'sink-insert-only' = 'true'
          |)
          |""".stripMargin)
-    tEnv.sqlUpdate("INSERT INTO not_null_sink SELECT * FROM nullable_src")
 
     // default should fail, because there are null values in the source
     try {
-      tEnv.execute("job name")
+      execInsertSqlAndWaitResult("INSERT INTO not_null_sink SELECT * FROM nullable_src")
       fail("Execution should fail.")
     } catch {
       case t: Throwable =>
@@ -195,8 +188,7 @@ class TableSinkITCase extends BatchTestBase {
 
     // enable the drop enforcer so that the query can run
     tEnv.getConfig.getConfiguration.setString("table.exec.sink.not-null-enforcer", "drop")
-    tEnv.sqlUpdate("INSERT INTO not_null_sink SELECT * FROM nullable_src")
-    tEnv.execute("job name")
+    execInsertSqlAndWaitResult("INSERT INTO not_null_sink SELECT * FROM nullable_src")
 
     val result = TestValuesTableFactory.getResults("not_null_sink")
     val expected = List("book,1,12", "book,4,11", "fruit,3,44")
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/table/TableSinkITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/table/TableSinkITCase.scala
index c4cbae2..5c1c245 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/table/TableSinkITCase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/table/TableSinkITCase.scala
@@ -55,12 +55,10 @@ class TableSinkITCase extends StreamingTestBase {
          |)
          |""".stripMargin)
 
-    t.window(Tumble over 5.millis on 'rowtime as 'w)
+    val table = t.window(Tumble over 5.millis on 'rowtime as 'w)
       .groupBy('w)
       .select('w.end as 't, 'id.count as 'icnt, 'num.sum as 'nsum)
-      .insertInto("appendSink")
-
-    tEnv.execute("job name")
+    execInsertTableAndWaitResult(table, "appendSink")
 
     val result = TestValuesTableFactory.getResults("appendSink")
     val expected = List(
@@ -88,8 +86,7 @@ class TableSinkITCase extends StreamingTestBase {
          |  'sink-insert-only' = 'true'
          |)
          |""".stripMargin)
-    tEnv.sqlUpdate("INSERT INTO appendSink SELECT id, ROW(num, text) FROM src")
-    tEnv.execute("job name")
+    execInsertSqlAndWaitResult("INSERT INTO appendSink SELECT id, ROW(num, text) FROM src")
 
     val result = TestValuesTableFactory.getResults("appendSink")
     val expected = List(
@@ -115,11 +112,9 @@ class TableSinkITCase extends StreamingTestBase {
          |)
          |""".stripMargin)
 
-    ds1.join(ds2).where('b === 'e)
+    val table = ds1.join(ds2).where('b === 'e)
       .select('c, 'g)
-      .insertInto("appendSink")
-
-    tEnv.execute("job name")
+    execInsertTableAndWaitResult(table, "appendSink")
 
     val result = TestValuesTableFactory.getResults("appendSink")
     val expected = List("Hi,Hallo", "Hello,Hallo Welt", "Hello world,Hallo Welt")
@@ -144,12 +139,10 @@ class TableSinkITCase extends StreamingTestBase {
          |)
          |""".stripMargin)
 
-    t.select('id, 'num, 'text.charLength() as 'len)
+    val table = t.select('id, 'num, 'text.charLength() as 'len)
       .groupBy('len)
       .select('len, 'id.count as 'icnt, 'num.sum as 'nsum)
-      .insertInto("retractSink")
-
-    tEnv.execute("job name")
+    execInsertTableAndWaitResult(table, "retractSink")
 
     val result = TestValuesTableFactory.getResults("retractSink")
     val expected = List(
@@ -177,12 +170,10 @@ class TableSinkITCase extends StreamingTestBase {
          |)
          |""".stripMargin)
 
-    t.window(Tumble over 5.millis on 'rowtime as 'w)
+    val table = t.window(Tumble over 5.millis on 'rowtime as 'w)
       .groupBy('w)
       .select('w.end as 't, 'id.count as 'icnt, 'num.sum as 'nsum)
-      .insertInto("retractSink")
-
-    tEnv.execute("job name")
+    execInsertTableAndWaitResult(table, "retractSink")
 
     val rawResult = TestValuesTableFactory.getRawResults("retractSink")
     assertFalse(
@@ -218,15 +209,13 @@ class TableSinkITCase extends StreamingTestBase {
          |)
          |""".stripMargin)
 
-    t.select('id, 'num, 'text.charLength() as 'len, ('id > 0) as 'cTrue)
+    val table = t.select('id, 'num, 'text.charLength() as 'len, ('id > 0) as 'cTrue)
       .groupBy('len, 'cTrue)
       // test query field name is different with registered sink field name
       .select('len, 'id.count as 'count, 'cTrue)
       .groupBy('count, 'cTrue)
       .select('count, 'len.count as 'lencnt, 'cTrue)
-      .insertInto("upsertSink")
-
-    tEnv.execute("job name")
+    execInsertTableAndWaitResult(table, "upsertSink")
 
     val rawResult = TestValuesTableFactory.getRawResults("upsertSink")
     assertTrue(
@@ -257,13 +246,11 @@ class TableSinkITCase extends StreamingTestBase {
          |)
          |""".stripMargin)
 
-    t.window(Tumble over 5.millis on 'rowtime as 'w)
+    val table = t.window(Tumble over 5.millis on 'rowtime as 'w)
       .groupBy('w, 'num)
       // test query field name is different with registered sink field name
       .select('num, 'w.end as 'window_end, 'id.count as 'icnt)
-      .insertInto("upsertSink")
-
-    tEnv.execute("job name")
+    execInsertTableAndWaitResult(table, "upsertSink")
 
     val rawResult = TestValuesTableFactory.getRawResults("upsertSink")
     assertFalse(
@@ -303,12 +290,10 @@ class TableSinkITCase extends StreamingTestBase {
          |)
          |""".stripMargin)
 
-    t.window(Tumble over 5.millis on 'rowtime as 'w)
+    val table = t.window(Tumble over 5.millis on 'rowtime as 'w)
       .groupBy('w, 'num)
       .select('w.end as 'wend, 'id.count as 'cnt)
-      .insertInto("upsertSink")
-
-    tEnv.execute("job name")
+    execInsertTableAndWaitResult(table, "upsertSink")
 
     val rawResult = TestValuesTableFactory.getRawResults("upsertSink")
     assertFalse(
@@ -347,12 +332,10 @@ class TableSinkITCase extends StreamingTestBase {
          |)
          |""".stripMargin)
 
-    t.window(Tumble over 5.millis on 'rowtime as 'w)
+    val table = t.window(Tumble over 5.millis on 'rowtime as 'w)
       .groupBy('w, 'num)
       .select('num, 'id.count as 'cnt)
-      .insertInto("upsertSink")
-
-    tEnv.execute("job name")
+    execInsertTableAndWaitResult(table, "upsertSink")
 
     val rawResult = TestValuesTableFactory.getRawResults("upsertSink")
     assertFalse(
@@ -399,12 +382,10 @@ class TableSinkITCase extends StreamingTestBase {
     //   5, 5
     //   6, 6
 
-    t.groupBy('num)
+    val table = t.groupBy('num)
       .select('num, 'id.count as 'cnt)
       .where('cnt <= 3)
-      .insertInto("upsertSink")
-
-    tEnv.execute("job name")
+    execInsertTableAndWaitResult(table, "upsertSink")
 
     val result = TestValuesTableFactory.getResults("upsertSink")
     val expected = List("1,1", "2,2", "3,3")
@@ -429,15 +410,14 @@ class TableSinkITCase extends StreamingTestBase {
          |)
          |""".stripMargin)
 
-    t.window(Tumble over 5.milli on 'rowtime as 'w)
+    val table = t.window(Tumble over 5.milli on 'rowtime as 'w)
       .groupBy('num, 'w)
       .select('num, 'w.rowtime as 'rowtime1, 'w.rowtime as 'rowtime2)
-      .insertInto("sink")
 
     thrown.expect(classOf[TableException])
     thrown.expectMessage("Found more than one rowtime field: [rowtime1, rowtime2] " +
       "in the query when insert into 'default_catalog.default_database.sink'")
-    tEnv.execute("job name")
+    table.executeInsert("sink")
   }
 
   @Test
@@ -454,13 +434,11 @@ class TableSinkITCase extends StreamingTestBase {
          |)
          |""".stripMargin)
 
-    env.fromCollection(tupleData3)
+    val table = env.fromCollection(tupleData3)
       .toTable(tEnv, 'a, 'b, 'c)
       .where('a > 20)
       .select("12345", 55.cast(DataTypes.DECIMAL(10, 0)), "12345".cast(DataTypes.CHAR(5)))
-      .insertInto("sink")
-
-    tEnv.execute("job name")
+    execInsertTableAndWaitResult(table, "sink")
 
     val result = TestValuesTableFactory.getResults("sink")
     val expected = Seq("12345,55,12345")
@@ -482,13 +460,11 @@ class TableSinkITCase extends StreamingTestBase {
          |)
          |""".stripMargin)
 
-    env.fromCollection(tupleData3)
+    val table = env.fromCollection(tupleData3)
       .toTable(tEnv, 'a, 'b, 'c)
       .where('a > 20)
       .select("12345", 55.cast(DataTypes.DECIMAL(10, 0)), "12345".cast(DataTypes.CHAR(5)))
-      .insertInto("sink")
-
-    tEnv.execute("job name")
+    execInsertTableAndWaitResult(table, "sink")
 
     val result = TestValuesTableFactory.getResults("sink")
     val expected = Seq("12345,55,12345")
@@ -532,14 +508,13 @@ class TableSinkITCase extends StreamingTestBase {
         |  'sink-insert-only' = 'false'
         |)
         |""".stripMargin)
-    tEnv.sqlUpdate(
+    execInsertSqlAndWaitResult(
       """
         |INSERT INTO changelog_sink
         |SELECT product_id, user_name, SUM(order_price)
         |FROM orders
         |GROUP BY product_id, user_name
         |""".stripMargin)
-    tEnv.execute("job name")
 
     val rawResult = TestValuesTableFactory.getRawResults("changelog_sink")
     val expected = List(
@@ -584,14 +559,13 @@ class TableSinkITCase extends StreamingTestBase {
         |  'sink-insert-only' = 'false'
         |)
         |""".stripMargin)
-    tEnv.sqlUpdate(
+    execInsertSqlAndWaitResult(
       """
         |INSERT INTO final_sink
         |SELECT user_name, SUM(price) as total_pay
         |FROM changelog_source
         |GROUP BY user_name
         |""".stripMargin)
-    tEnv.execute("job name")
     val finalResult = TestValuesTableFactory.getResults("final_sink")
     val finalExpected = List(
       "user1,28.12", "user2,71.20", "user3,32.33", "user4,9.99")
@@ -623,11 +597,10 @@ class TableSinkITCase extends StreamingTestBase {
          |  'sink-insert-only' = 'true'
          |)
          |""".stripMargin)
-    tEnv.sqlUpdate("INSERT INTO not_null_sink SELECT * FROM nullable_src")
 
     // default should fail, because there are null values in the source
     try {
-      tEnv.execute("job name")
+      execInsertSqlAndWaitResult("INSERT INTO not_null_sink SELECT * FROM nullable_src")
       fail("Execution should fail.")
     } catch {
       case t: Throwable =>
@@ -641,8 +614,7 @@ class TableSinkITCase extends StreamingTestBase {
 
     // enable the drop enforcer so that the query can run
     tEnv.getConfig.getConfiguration.setString("table.exec.sink.not-null-enforcer", "drop")
-    tEnv.sqlUpdate("INSERT INTO not_null_sink SELECT * FROM nullable_src")
-    tEnv.execute("job name")
+    execInsertSqlAndWaitResult("INSERT INTO not_null_sink SELECT * FROM nullable_src")
 
     val result = TestValuesTableFactory.getResults("not_null_sink")
     val expected = List("book,1,12", "book,4,11", "fruit,3,44")