Posted to commits@flink.apache.org by ja...@apache.org on 2019/08/23 03:43:56 UTC

[flink] branch release-1.9 updated: [FLINK-13354] [docs] Add documentation for how to use blink planner

This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.9 by this push:
     new 1dce494  [FLINK-13354] [docs] Add documentation for how to use blink planner
1dce494 is described below

commit 1dce49445263cca3733c3923c129c7c3c1f9b581
Author: godfreyhe <go...@163.com>
AuthorDate: Fri Aug 16 15:28:49 2019 +0800

    [FLINK-13354] [docs] Add documentation for how to use blink planner
---
 docs/dev/table/common.md    | 527 +++++++++++++++++++++++++++++++++----------
 docs/dev/table/common.zh.md | 535 ++++++++++++++++++++++++++++++++++----------
 2 files changed, 830 insertions(+), 232 deletions(-)

diff --git a/docs/dev/table/common.md b/docs/dev/table/common.md
index 1e786d1..41acfc6 100644
--- a/docs/dev/table/common.md
+++ b/docs/dev/table/common.md
@@ -27,6 +27,19 @@ The Table API and SQL are integrated in a joint API. The central concept of this
 * This will be replaced by the TOC
 {:toc}
 
+Main Differences Between the Two Planners
+-----------------------------------------
+
+1. Blink treats batch jobs as a special case of streaming. As such, the conversion between Table and DataSet is not supported either; batch jobs are not translated into `DataSet` programs but into `DataStream` programs, the same as streaming jobs.
+2. The Blink planner does not support `BatchTableSource`; use a bounded `StreamTableSource` instead.
+3. The Blink planner only supports the new `Catalog` and does not support `ExternalCatalog`, which is deprecated.
+4. The implementations of `FilterableTableSource` for the old planner and the Blink planner are incompatible. The old planner pushes down `PlannerExpression`s into `FilterableTableSource`, while the Blink planner pushes down `Expression`s.
+5. String-based key-value config options (see the documentation about [Configuration]({{ site.baseurl }}/dev/table/config.html) for details) are only used for the Blink planner.
+6. The implementation of `PlannerConfig` (`CalciteConfig`) differs between the two planners.
+7. The Blink planner optimizes multiple sinks into one DAG (supported only on `TableEnvironment`, not on `StreamTableEnvironment`). The old planner always optimizes each sink into a new DAG, where all DAGs are independent of each other.
+8. The old planner does not support catalog statistics yet, while the Blink planner does.
+
+
 Structure of Table API and SQL Programs
 ---------------------------------------
 
@@ -35,12 +48,9 @@ All Table API and SQL programs for batch and streaming follow the same pattern.
 <div class="codetabs" markdown="1">
 <div data-lang="java" markdown="1">
 {% highlight java %}
-// for batch programs use ExecutionEnvironment instead of StreamExecutionEnvironment
-StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 
-// create a TableEnvironment
-// for batch programs use BatchTableEnvironment instead of StreamTableEnvironment
-StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
+// create a TableEnvironment for a specific planner (batch or streaming)
+TableEnvironment tableEnv = ...; // see "Create a TableEnvironment" section
 
 // register a Table
 tableEnv.registerTable("table1", ...)            // or
@@ -58,18 +68,16 @@ Table sqlResult  = tableEnv.sqlQuery("SELECT ... FROM table2 ... ");
 tapiResult.insertInto("outputTable");
 
 // execute
-env.execute();
+tableEnv.execute("java_job");
 
 {% endhighlight %}
 </div>
 
 <div data-lang="scala" markdown="1">
 {% highlight scala %}
-// for batch programs use ExecutionEnvironment instead of StreamExecutionEnvironment
-val env = StreamExecutionEnvironment.getExecutionEnvironment
 
-// create a TableEnvironment
-val tableEnv = StreamTableEnvironment.create(env)
+// create a TableEnvironment for a specific planner (batch or streaming)
+val tableEnv = ... // see "Create a TableEnvironment" section
 
 // register a Table
 tableEnv.registerTable("table1", ...)           // or
@@ -87,18 +95,16 @@ val sqlResult  = tableEnv.sqlQuery("SELECT ... FROM table2 ...")
 tapiResult.insertInto("outputTable")
 
 // execute
-env.execute()
+tableEnv.execute("scala_job")
 
 {% endhighlight %}
 </div>
 
 <div data-lang="python" markdown="1">
 {% highlight python %}
-# for batch programs use ExecutionEnvironment instead of StreamExecutionEnvironment
-env = StreamExecutionEnvironment.get_execution_environment()
 
-# create a TableEnvironment
-table_env = StreamTableEnvironment.create(env)
+# create a TableEnvironment for a specific planner (batch or streaming)
+table_env = ... # see "Create a TableEnvironment" section
 
 # register a Table
 table_env.register_table("table1", ...)           # or
@@ -142,82 +148,153 @@ A `Table` is always bound to a specific `TableEnvironment`. It is not possible t
 
 A `TableEnvironment` is created by calling the static `BatchTableEnvironment.create()` or `StreamTableEnvironment.create()` method with a `StreamExecutionEnvironment` or an `ExecutionEnvironment` and an optional `TableConfig`. The `TableConfig` can be used to configure the `TableEnvironment` or to customize the query optimization and translation process (see [Query Optimization](#query-optimization)).
 
-Make sure to choose the `BatchTableEnvironment`/`StreamTableEnvironment` that matches your programming language.
+Make sure to choose the planner-specific `BatchTableEnvironment`/`StreamTableEnvironment` that matches your programming language.
+
+If both planner jars are on the classpath (the default behavior), you should explicitly set which planner to use in the current program.
 
 <div class="codetabs" markdown="1">
 <div data-lang="java" markdown="1">
 {% highlight java %}
-// ***************
-// STREAMING QUERY
-// ***************
+
+// **********************
+// FLINK STREAMING QUERY
+// **********************
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.EnvironmentSettings;
 import org.apache.flink.table.api.java.StreamTableEnvironment;
 
-StreamExecutionEnvironment sEnv = StreamExecutionEnvironment.getExecutionEnvironment();
-// create a TableEnvironment for streaming queries
-StreamTableEnvironment sTableEnv = StreamTableEnvironment.create(sEnv);
+EnvironmentSettings fsSettings = EnvironmentSettings.newInstance().useOldPlanner().inStreamingMode().build();
+StreamExecutionEnvironment fsEnv = StreamExecutionEnvironment.getExecutionEnvironment();
+StreamTableEnvironment fsTableEnv = StreamTableEnvironment.create(fsEnv, fsSettings);
+// or TableEnvironment fsTableEnv = TableEnvironment.create(fsSettings);
 
-// ***********
-// BATCH QUERY
-// ***********
+// ******************
+// FLINK BATCH QUERY
+// ******************
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.table.api.java.BatchTableEnvironment;
 
-ExecutionEnvironment bEnv = ExecutionEnvironment.getExecutionEnvironment();
-// create a TableEnvironment for batch queries
-BatchTableEnvironment bTableEnv = BatchTableEnvironment.create(bEnv);
+ExecutionEnvironment fbEnv = ExecutionEnvironment.getExecutionEnvironment();
+BatchTableEnvironment fbTableEnv = BatchTableEnvironment.create(fbEnv);
+
+// **********************
+// BLINK STREAMING QUERY
+// **********************
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.java.StreamTableEnvironment;
+
+StreamExecutionEnvironment bsEnv = StreamExecutionEnvironment.getExecutionEnvironment();
+EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
+StreamTableEnvironment bsTableEnv = StreamTableEnvironment.create(bsEnv, bsSettings);
+// or TableEnvironment bsTableEnv = TableEnvironment.create(bsSettings);
+
+// ******************
+// BLINK BATCH QUERY
+// ******************
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.TableEnvironment;
+
+EnvironmentSettings bbSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build();
+TableEnvironment bbTableEnv = TableEnvironment.create(bbSettings);
+
 {% endhighlight %}
 </div>
 
 <div data-lang="scala" markdown="1">
 {% highlight scala %}
-// ***************
-// STREAMING QUERY
-// ***************
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
+
+// **********************
+// FLINK STREAMING QUERY
+// **********************
+import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
+import org.apache.flink.table.api.EnvironmentSettings
 import org.apache.flink.table.api.scala.StreamTableEnvironment
 
-val sEnv = StreamExecutionEnvironment.getExecutionEnvironment
-// create a TableEnvironment for streaming queries
-val sTableEnv = StreamTableEnvironment.create(sEnv)
+val fsSettings = EnvironmentSettings.newInstance().useOldPlanner().inStreamingMode().build()
+val fsEnv = StreamExecutionEnvironment.getExecutionEnvironment
+val fsTableEnv = StreamTableEnvironment.create(fsEnv, fsSettings)
+// or val fsTableEnv = TableEnvironment.create(fsSettings)
 
-// ***********
-// BATCH QUERY
-// ***********
-import org.apache.flink.api.java.ExecutionEnvironment;
+// ******************
+// FLINK BATCH QUERY
+// ******************
+import org.apache.flink.api.scala.ExecutionEnvironment
 import org.apache.flink.table.api.scala.BatchTableEnvironment
 
-val bEnv = ExecutionEnvironment.getExecutionEnvironment
-// create a TableEnvironment for batch queries
-val bTableEnv = BatchTableEnvironment.create(bEnv)
+val fbEnv = ExecutionEnvironment.getExecutionEnvironment
+val fbTableEnv = BatchTableEnvironment.create(fbEnv)
+
+// **********************
+// BLINK STREAMING QUERY
+// **********************
+import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
+import org.apache.flink.table.api.EnvironmentSettings
+import org.apache.flink.table.api.scala.StreamTableEnvironment
+
+val bsEnv = StreamExecutionEnvironment.getExecutionEnvironment
+val bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
+val bsTableEnv = StreamTableEnvironment.create(bsEnv, bsSettings)
+// or val bsTableEnv = TableEnvironment.create(bsSettings)
+
+// ******************
+// BLINK BATCH QUERY
+// ******************
+import org.apache.flink.table.api.{EnvironmentSettings, TableEnvironment}
+
+val bbSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build()
+val bbTableEnv = TableEnvironment.create(bbSettings)
+
 {% endhighlight %}
 </div>
 
 <div data-lang="python" markdown="1">
 {% highlight python %}
-# ***************
-# STREAMING QUERY
-# ***************
+
+# **********************
+# FLINK STREAMING QUERY
+# **********************
 from pyflink.datastream import StreamExecutionEnvironment
-from pyflink.table import StreamTableEnvironment
+from pyflink.table import StreamTableEnvironment, EnvironmentSettings
 
-s_env = StreamExecutionEnvironment.get_execution_environment()
-# create a TableEnvironment for streaming queries
-s_table_env = StreamTableEnvironment.create(s_env)
+f_s_env = StreamExecutionEnvironment.get_execution_environment()
+f_s_settings = EnvironmentSettings.new_instance().use_old_planner().in_streaming_mode().build()
+f_s_t_env = StreamTableEnvironment.create(f_s_env, environment_settings=f_s_settings)
 
-# ***********
-# BATCH QUERY
-# ***********
+# ******************
+# FLINK BATCH QUERY
+# ******************
 from pyflink.dataset import ExecutionEnvironment
 from pyflink.table import BatchTableEnvironment
 
-b_env = ExecutionEnvironment.get_execution_environment()
-# create a TableEnvironment for batch queries
-b_table_env = BatchTableEnvironment.create(b_env)
+f_b_env = ExecutionEnvironment.get_execution_environment()
+f_b_t_env = BatchTableEnvironment.create(f_b_env, table_config)
+
+# **********************
+# BLINK STREAMING QUERY
+# **********************
+from pyflink.datastream import StreamExecutionEnvironment
+from pyflink.table import StreamTableEnvironment, EnvironmentSettings
+
+b_s_env = StreamExecutionEnvironment.get_execution_environment()
+b_s_settings = EnvironmentSettings.new_instance().use_blink_planner().in_streaming_mode().build()
+b_s_t_env = StreamTableEnvironment.create(b_s_env, environment_settings=b_s_settings)
+
+# ******************
+# BLINK BATCH QUERY
+# ******************
+from pyflink.table import EnvironmentSettings, BatchTableEnvironment
+
+b_b_settings = EnvironmentSettings.new_instance().use_blink_planner().in_batch_mode().build()
+b_b_t_env = BatchTableEnvironment.create(environment_settings=b_b_settings)
+
 {% endhighlight %}
 </div>
 </div>
 
+**Note:** If there is only one planner jar in the `/lib` directory, you can use `useAnyPlanner` (`use_any_planner` for Python) to create the `EnvironmentSettings`.
+
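+A minimal sketch of this, assuming exactly one planner jar is present in `/lib`:
+
+{% highlight java %}
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.TableEnvironment;
+
+// picks whichever planner implementation is found on the classpath
+EnvironmentSettings settings = EnvironmentSettings.newInstance()
+    .useAnyPlanner()
+    .inStreamingMode()
+    .build();
+TableEnvironment tableEnv = TableEnvironment.create(settings);
+{% endhighlight %}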
+
 {% top %}
 
 Register Tables in the Catalog
@@ -229,7 +306,7 @@ An input table can be registered from various sources:
 
 * an existing `Table` object, usually the result of a Table API or SQL query.
 * a `TableSource`, which accesses external data, such as a file, database, or messaging system. 
-* a `DataStream` or `DataSet` from a DataStream or DataSet program. Registering a `DataStream` or `DataSet` is discussed in the [Integration with DataStream and DataSet API](#integration-with-datastream-and-dataset-api) section.
+* a `DataStream` or `DataSet` from a DataStream (streaming jobs only) or DataSet (only batch jobs translated by the old planner) program. Registering a `DataStream` or `DataSet` is discussed in the [Integration with DataStream and DataSet API](#integration-with-datastream-and-dataset-api) section.
 
 An output table can be registered using a `TableSink`.
 
@@ -240,8 +317,8 @@ A `Table` is registered in a `TableEnvironment` as follows:
 <div class="codetabs" markdown="1">
 <div data-lang="java" markdown="1">
 {% highlight java %}
-// get a StreamTableEnvironment, works for BatchTableEnvironment equivalently
-StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
+// get a TableEnvironment
+TableEnvironment tableEnv = ...; // see "Create a TableEnvironment" section
 
 // table is the result of a simple projection query 
 Table projTable = tableEnv.scan("X").select(...);
@@ -254,7 +331,7 @@ tableEnv.registerTable("projectedTable", projTable);
 <div data-lang="scala" markdown="1">
 {% highlight scala %}
 // get a TableEnvironment
-val tableEnv = StreamTableEnvironment.create(env)
+val tableEnv = ... // see "Create a TableEnvironment" section
 
 // table is the result of a simple projection query 
 val projTable: Table = tableEnv.scan("X").select(...)
@@ -267,7 +344,7 @@ tableEnv.registerTable("projectedTable", projTable)
 <div data-lang="python" markdown="1">
 {% highlight python %}
 # get a TableEnvironment
-table_env = StreamTableEnvironment.create(env)
+table_env = ... # see "Create a TableEnvironment" section
 
 # table is the result of a simple projection query 
 proj_table = table_env.scan("X").select(...)
@@ -293,8 +370,8 @@ A `TableSource` is registered in a `TableEnvironment` as follows:
 <div class="codetabs" markdown="1">
 <div data-lang="java" markdown="1">
 {% highlight java %}
-// get a StreamTableEnvironment, works for BatchTableEnvironment equivalently
-StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
+// get a TableEnvironment
+TableEnvironment tableEnv = ...; // see "Create a TableEnvironment" section
 
 // create a TableSource
 TableSource csvSource = new CsvTableSource("/path/to/file", ...);
@@ -307,7 +384,7 @@ tableEnv.registerTableSource("CsvTable", csvSource);
 <div data-lang="scala" markdown="1">
 {% highlight scala %}
 // get a TableEnvironment
-val tableEnv = StreamTableEnvironment.create(env)
+val tableEnv = ... // see "Create a TableEnvironment" section
 
 // create a TableSource
 val csvSource: TableSource = new CsvTableSource("/path/to/file", ...)
@@ -320,7 +397,7 @@ tableEnv.registerTableSource("CsvTable", csvSource)
 <div data-lang="python" markdown="1">
 {% highlight python %}
 # get a TableEnvironment
-table_env = StreamTableEnvironment.create(env)
+table_env = ... # see "Create a TableEnvironment" section
 
 # create a TableSource
 csv_source = CsvTableSource("/path/to/file", ...)
@@ -331,6 +408,8 @@ table_env.register_table_source("csvTable", csv_source)
 </div>
 </div>
 
+**Note:** A `TableEnvironment` used with the Blink planner only accepts a `StreamTableSource`, a `LookupableTableSource`, or an `InputFormatTableSource`, and a `StreamTableSource` used with the Blink planner in batch mode must be bounded.
+
 {% top %}
 
 ### Register a TableSink
@@ -344,8 +423,8 @@ A `TableSink` is registered in a `TableEnvironment` as follows:
 <div class="codetabs" markdown="1">
 <div data-lang="java" markdown="1">
 {% highlight java %}
-// get a StreamTableEnvironment, works for BatchTableEnvironment equivalently
-StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
+// get a TableEnvironment
+TableEnvironment tableEnv = ...; // see "Create a TableEnvironment" section
 
 // create a TableSink
 TableSink csvSink = new CsvTableSink("/path/to/file", ...);
@@ -362,7 +441,7 @@ tableEnv.registerTableSink("CsvSinkTable", fieldNames, fieldTypes, csvSink);
 <div data-lang="scala" markdown="1">
 {% highlight scala %}
 // get a TableEnvironment
-val tableEnv = StreamTableEnvironment.create(env)
+val tableEnv = ... // see "Create a TableEnvironment" section
 
 // create a TableSink
 val csvSink: TableSink = new CsvTableSink("/path/to/file", ...)
@@ -379,7 +458,7 @@ tableEnv.registerTableSink("CsvSinkTable", fieldNames, fieldTypes, csvSink)
 <div data-lang="python" markdown="1">
 {% highlight python %}
 # get a TableEnvironment
-table_env = StreamTableEnvironment.create(env)
+table_env = ... # see "Create a TableEnvironment" section
 
 # define the field names and types
 field_names = ["a", "b", "c"]
@@ -406,8 +485,8 @@ An external catalog can be created by implementing the `ExternalCatalog` interfa
 <div class="codetabs" markdown="1">
 <div data-lang="java" markdown="1">
 {% highlight java %}
-// get a StreamTableEnvironment, works for BatchTableEnvironment equivalently
-StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
+// get a TableEnvironment
+TableEnvironment tableEnv = ...; // see "Create a TableEnvironment" section
 
 // create an external catalog
 ExternalCatalog catalog = new InMemoryExternalCatalog();
@@ -420,7 +499,7 @@ tableEnv.registerExternalCatalog("InMemCatalog", catalog);
 <div data-lang="scala" markdown="1">
 {% highlight scala %}
 // get a TableEnvironment
-val tableEnv = StreamTableEnvironment.create(env)
+val tableEnv = ... // see "Create a TableEnvironment" section
 
 // create an external catalog
 val catalog: ExternalCatalog = new InMemoryExternalCatalog
@@ -435,6 +514,8 @@ Once registered in a `TableEnvironment`, all tables defined in a `ExternalCatalo
 
 Currently, Flink provides an `InMemoryExternalCatalog` for demo and testing purposes. However, the `ExternalCatalog` interface can also be used to connect catalogs like HCatalog or Metastore to the Table API.
 
+**Note:** The Blink planner does not support the deprecated `ExternalCatalog`.
+
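+Since the Blink planner only understands the new `Catalog` interface (see the differences listed at the top of this page), the following is a minimal sketch of registering an in-memory catalog instead; the catalog name `my_catalog` is an arbitrary placeholder.
+
+{% highlight java %}
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.catalog.GenericInMemoryCatalog;
+
+// get a TableEnvironment
+TableEnvironment tableEnv = ...; // see "Create a TableEnvironment" section
+
+// register an in-memory catalog based on the new Catalog interface
+tableEnv.registerCatalog("my_catalog", new GenericInMemoryCatalog("my_catalog"));
+
+// optionally make it the current catalog
+tableEnv.useCatalog("my_catalog");
+{% endhighlight %}
+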
 {% top %}
 
 Query a Table 
@@ -453,8 +534,8 @@ The following example shows a simple Table API aggregation query:
 <div class="codetabs" markdown="1">
 <div data-lang="java" markdown="1">
 {% highlight java %}
-// get a StreamTableEnvironment, works for BatchTableEnvironment equivalently
-StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
+// get a TableEnvironment
+TableEnvironment tableEnv = ...; // see "Create a TableEnvironment" section
 
 // register Orders table
 
@@ -474,7 +555,7 @@ Table revenue = orders
 <div data-lang="scala" markdown="1">
 {% highlight scala %}
 // get a TableEnvironment
-val tableEnv = StreamTableEnvironment.create(env)
+val tableEnv = ... // see "Create a TableEnvironment" section
 
 // register Orders table
 
@@ -495,8 +576,8 @@ val revenue = orders
 
 <div data-lang="python" markdown="1">
 {% highlight python %}
-# get a StreamTableEnvironment, works for BatchTableEnvironment equivalently
-table_env = StreamTableEnvironment.create(env)
+# get a TableEnvironment
+table_env = ... # see "Create a TableEnvironment" section
 
 # register Orders table
 
@@ -527,8 +608,8 @@ The following example shows how to specify a query and return the result as a `T
 <div class="codetabs" markdown="1">
 <div data-lang="java" markdown="1">
 {% highlight java %}
-// get a StreamTableEnvironment, works for BatchTableEnvironment equivalently
-StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
+// get a TableEnvironment
+TableEnvironment tableEnv = ...; // see "Create a TableEnvironment" section
 
 // register Orders table
 
@@ -548,7 +629,7 @@ Table revenue = tableEnv.sqlQuery(
 <div data-lang="scala" markdown="1">
 {% highlight scala %}
 // get a TableEnvironment
-val tableEnv = StreamTableEnvironment.create(env)
+val tableEnv = ... // see "Create a TableEnvironment" section
 
 // register Orders table
 
@@ -568,8 +649,8 @@ val revenue = tableEnv.sqlQuery("""
 
 <div data-lang="python" markdown="1">
 {% highlight python %}
-# get a StreamTableEnvironment, works for BatchTableEnvironment equivalently
-table_env = StreamTableEnvironment.create(env)
+# get a TableEnvironment
+table_env = ... # see "Create a TableEnvironment" section
 
 # register Orders table
 
@@ -592,8 +673,8 @@ The following example shows how to specify an update query that inserts its resu
 <div class="codetabs" markdown="1">
 <div data-lang="java" markdown="1">
 {% highlight java %}
-// get a StreamTableEnvironment, works for BatchTableEnvironment equivalently
-StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
+// get a TableEnvironment
+TableEnvironment tableEnv = ...; // see "Create a TableEnvironment" section
 
 // register "Orders" table
 // register "RevenueFrance" output table
@@ -614,7 +695,7 @@ tableEnv.sqlUpdate(
 <div data-lang="scala" markdown="1">
 {% highlight scala %}
 // get a TableEnvironment
-val tableEnv = StreamTableEnvironment.create(env)
+val tableEnv = ... // see "Create a TableEnvironment" section
 
 // register "Orders" table
 // register "RevenueFrance" output table
@@ -635,8 +716,8 @@ tableEnv.sqlUpdate("""
 
 <div data-lang="python" markdown="1">
 {% highlight python %}
-# get a StreamTableEnvironment, works for BatchTableEnvironment equivalently
-table_env = StreamTableEnvironment.create(env)
+# get a TableEnvironment
+table_env = ... # see "Create a TableEnvironment" section
 
 # register "Orders" table
 # register "RevenueFrance" output table
@@ -682,8 +763,8 @@ The following examples shows how to emit a `Table`:
 <div class="codetabs" markdown="1">
 <div data-lang="java" markdown="1">
 {% highlight java %}
-// get a StreamTableEnvironment, works for BatchTableEnvironment equivalently
-StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
+// get a TableEnvironment
+TableEnvironment tableEnv = ...; // see "Create a TableEnvironment" section
 
 // create a TableSink
 TableSink sink = new CsvTableSink("/path/to/file", fieldDelim = "|");
@@ -705,7 +786,7 @@ result.insertInto("CsvSinkTable");
 <div data-lang="scala" markdown="1">
 {% highlight scala %}
 // get a TableEnvironment
-val tableEnv = StreamTableEnvironment.create(env)
+val tableEnv = ... // see "Create a TableEnvironment" section
 
 // create a TableSink
 val sink: TableSink = new CsvTableSink("/path/to/file", fieldDelim = "|")
@@ -728,7 +809,7 @@ result.insertInto("CsvSinkTable")
 <div data-lang="python" markdown="1">
 {% highlight python %}
 # get a TableEnvironment
-table_env = StreamTableEnvironment.create(env)
+table_env = ... # see "Create a TableEnvironment" section
 
 field_names = ["a", "b", "c"]
 field_types = [DataTypes.INT(), DataTypes.STRING(), DataTypes.BIGINT()]
@@ -755,10 +836,14 @@ result.insert_into("CsvSinkTable")
 Translate and Execute a Query
 -----------------------------
 
+The behavior of translating and executing a query is different for the two planners.
+
+<div class="codetabs" markdown="1">
+<div data-lang="Old planner" markdown="1">
 Table API and SQL queries are translated into [DataStream]({{ site.baseurl }}/dev/datastream_api.html) or [DataSet]({{ site.baseurl }}/dev/batch) programs depending on whether their input is a streaming or batch input. A query is internally represented as a logical query plan and is translated in two phases: 
 
-1. optimization of the logical plan, 
-2. translation into a DataStream or DataSet program.
+1. Optimization of the logical plan
+2. Translation into a DataStream or DataSet program
 
 A Table API or SQL query is translated when:
 
@@ -768,11 +853,37 @@ A Table API or SQL query is translated when:
 
 Once translated, a Table API or SQL query is handled like a regular DataStream or DataSet program and is executed when `StreamExecutionEnvironment.execute()` or `ExecutionEnvironment.execute()` is called.
 
+</div>
+
+<div data-lang="Blink planner" markdown="1">
+Table API and SQL queries are translated into [DataStream]({{ site.baseurl }}/dev/datastream_api.html) programs, regardless of whether their input is streaming or batch. A query is internally represented as a logical query plan and is translated in two phases:
+
+1. Optimization of the logical plan
+2. Translation into a DataStream program
+
+The behavior of translating a query differs between `TableEnvironment` and `StreamTableEnvironment`.
+
+For `TableEnvironment`, a Table API or SQL query is translated when `TableEnvironment.execute()` is called, because `TableEnvironment` optimizes multiple sinks into a single DAG.
+
+For `StreamTableEnvironment`, a Table API or SQL query is translated when:
+
+* a `Table` is emitted to a `TableSink`, i.e., when `Table.insertInto()` is called.
+* a SQL update query is specified, i.e., when `TableEnvironment.sqlUpdate()` is called.
+* a `Table` is converted into a `DataStream`.
+
+Once translated, a Table API or SQL query is handled like a regular DataStream program and is executed when `TableEnvironment.execute()` or `StreamExecutionEnvironment.execute()` is called.
+
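+For example, a minimal sketch of the `TableEnvironment` case; the tables `table1` and `outputTable` are placeholders that are assumed to be registered as described earlier:
+
+{% highlight java %}
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.TableEnvironment;
+
+EnvironmentSettings settings = EnvironmentSettings.newInstance()
+    .useBlinkPlanner().inStreamingMode().build();
+TableEnvironment tableEnv = TableEnvironment.create(settings);
+
+// register "table1" and "outputTable" ...
+
+// declaring the update does not translate the query yet
+tableEnv.sqlUpdate("INSERT INTO outputTable SELECT * FROM table1");
+
+// all declared sinks are optimized into one DAG, translated, and executed here
+tableEnv.execute("blink_job");
+{% endhighlight %}
+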
+</div>
+</div>
+
 {% top %}
 
 Integration with DataStream and DataSet API
 -------------------------------------------
 
+Both planners can integrate with the `DataStream` API when used for streaming. Only the old planner can integrate with the `DataSet` API; the Blink planner in batch mode cannot be combined with either.
+
+**Note:** The `DataSet` API discussed below is only relevant for the old planner used in batch mode.
+
 Table API and SQL queries can be easily integrated with and embedded into [DataStream]({{ site.baseurl }}/dev/datastream_api.html) and [DataSet]({{ site.baseurl }}/dev/batch) programs. For instance, it is possible to query an external table (for example from a RDBMS), do some pre-processing, such as filtering, projecting, aggregating, or joining with meta data, and then further process the data with either the DataStream or DataSet API (and any of the libraries built on top of these APIs [...]
 
 This interaction can be achieved by converting a `DataStream` or `DataSet` into a `Table` and vice versa. In this section, we describe how these conversions are done.
@@ -790,7 +901,7 @@ A `DataStream` or `DataSet` can be registered in a `TableEnvironment` as a Table
 {% highlight java %}
 // get StreamTableEnvironment
 // registration of a DataSet in a BatchTableEnvironment is equivalent
-StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
+StreamTableEnvironment tableEnv = ...; // see "Create a TableEnvironment" section
 
 DataStream<Tuple2<Long, String>> stream = ...
 
@@ -806,7 +917,7 @@ tableEnv.registerDataStream("myTable2", stream, "myLong, myString");
 {% highlight scala %}
 // get TableEnvironment 
 // registration of a DataSet is equivalent
-val tableEnv = StreamTableEnvironment.create(env)
+val tableEnv: StreamTableEnvironment = ... // see "Create a TableEnvironment" section
 
 val stream: DataStream[(Long, String)] = ...
 
@@ -832,7 +943,7 @@ Instead of registering a `DataStream` or `DataSet` in a `TableEnvironment`, it c
 {% highlight java %}
 // get StreamTableEnvironment
 // registration of a DataSet in a BatchTableEnvironment is equivalent
-StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
+StreamTableEnvironment tableEnv = ...; // see "Create a TableEnvironment" section
 
 DataStream<Tuple2<Long, String>> stream = ...
 
@@ -848,7 +959,7 @@ Table table2 = tableEnv.fromDataStream(stream, "myLong, myString");
 {% highlight scala %}
 // get TableEnvironment
 // registration of a DataSet is equivalent
-val tableEnv = StreamTableEnvironment.create(env)
+val tableEnv = ... // see "Create a TableEnvironment" section
 
 val stream: DataStream[(Long, String)] = ...
 
@@ -888,7 +999,7 @@ There are two modes to convert a `Table` into a `DataStream`:
 <div data-lang="java" markdown="1">
 {% highlight java %}
 // get StreamTableEnvironment. 
-StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
+StreamTableEnvironment tableEnv = ...; // see "Create a TableEnvironment" section
 
 // Table with two fields (String name, Integer age)
 Table table = ...
@@ -918,7 +1029,7 @@ DataStream<Tuple2<Boolean, Row>> retractStream =
 {% highlight scala %}
 // get TableEnvironment. 
 // registration of a DataSet is equivalent
-val tableEnv = StreamTableEnvironment.create(env)
+val tableEnv: StreamTableEnvironment = ... // see "Create a TableEnvironment" section
 
 // Table with two fields (String name, Integer age)
 val table: Table = ...
@@ -1002,7 +1113,7 @@ When defining a position-based mapping, the specified names must not exist in th
 <div data-lang="java" markdown="1">
 {% highlight java %}
 // get a StreamTableEnvironment, works for BatchTableEnvironment equivalently
-StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
+StreamTableEnvironment tableEnv = ...; // see "Create a TableEnvironment" section;
 
 DataStream<Tuple2<Long, Integer>> stream = ...
 
@@ -1020,7 +1131,7 @@ Table table = tableEnv.fromDataStream(stream, "myLong, myInt");
 <div data-lang="scala" markdown="1">
 {% highlight scala %}
 // get a TableEnvironment
-val tableEnv = StreamTableEnvironment.create(env)
+val tableEnv: StreamTableEnvironment = ... // see "Create a TableEnvironment" section
 
 val stream: DataStream[(Long, Int)] = ...
 
@@ -1046,7 +1157,7 @@ If no field names are specified, the default field names and field order of the
 <div data-lang="java" markdown="1">
 {% highlight java %}
 // get a StreamTableEnvironment, works for BatchTableEnvironment equivalently
-StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
+StreamTableEnvironment tableEnv = ...; // see "Create a TableEnvironment" section
 
 DataStream<Tuple2<Long, Integer>> stream = ...
 
@@ -1067,7 +1178,7 @@ Table table = tableEnv.fromDataStream(stream, "f1 as myInt, f0 as myLong");
 <div data-lang="scala" markdown="1">
 {% highlight scala %}
 // get a TableEnvironment
-val tableEnv = StreamTableEnvironment.create(env)
+val tableEnv: StreamTableEnvironment = ... // see "Create a TableEnvironment" section
 
 val stream: DataStream[(Long, Int)] = ...
 
@@ -1094,7 +1205,7 @@ Flink treats primitives (`Integer`, `Double`, `String`) or generic types (types
 <div data-lang="java" markdown="1">
 {% highlight java %}
 // get a StreamTableEnvironment, works for BatchTableEnvironment equivalently
-StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
+StreamTableEnvironment tableEnv = ...; // see "Create a TableEnvironment" section
 
 DataStream<Long> stream = ...
 
@@ -1109,7 +1220,7 @@ Table table = tableEnv.fromDataStream(stream, "myLong");
 <div data-lang="scala" markdown="1">
 {% highlight scala %}
 // get a TableEnvironment
-val tableEnv = StreamTableEnvironment.create(env)
+val tableEnv: StreamTableEnvironment = ... // see "Create a TableEnvironment" section
 
 val stream: DataStream[Long] = ...
 
@@ -1130,7 +1241,7 @@ Flink supports Scala's built-in tuples and provides its own tuple classes for Ja
 <div data-lang="java" markdown="1">
 {% highlight java %}
 // get a StreamTableEnvironment, works for BatchTableEnvironment equivalently
-StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
+StreamTableEnvironment tableEnv = ...; // see "Create a TableEnvironment" section
 
 DataStream<Tuple2<Long, String>> stream = ...
 
@@ -1154,7 +1265,7 @@ Table table = tableEnv.fromDataStream(stream, "f1 as 'myString', f0 as 'myLong'"
 <div data-lang="scala" markdown="1">
 {% highlight scala %}
 // get a TableEnvironment
-val tableEnv = StreamTableEnvironment.create(env)
+val tableEnv: StreamTableEnvironment = ... // see "Create a TableEnvironment" section
 
 val stream: DataStream[(Long, String)] = ...
 
@@ -1200,7 +1311,7 @@ When converting a POJO `DataStream` or `DataSet` into a `Table` without specifyi
 <div data-lang="java" markdown="1">
 {% highlight java %}
 // get a StreamTableEnvironment, works for BatchTableEnvironment equivalently
-StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
+StreamTableEnvironment tableEnv = ...; // see "Create a TableEnvironment" section
 
 // Person is a POJO with fields "name" and "age"
 DataStream<Person> stream = ...
@@ -1222,7 +1333,7 @@ Table table = tableEnv.fromDataStream(stream, "name as myName");
 <div data-lang="scala" markdown="1">
 {% highlight scala %}
 // get a TableEnvironment
-val tableEnv = StreamTableEnvironment.create(env)
+val tableEnv: StreamTableEnvironment = ... // see "Create a TableEnvironment" section
 
 // Person is a POJO with field names "name" and "age"
 val stream: DataStream[Person] = ...
@@ -1250,7 +1361,7 @@ The `Row` data type supports an arbitrary number of fields and fields with `null
 <div data-lang="java" markdown="1">
 {% highlight java %}
 // get a StreamTableEnvironment, works for BatchTableEnvironment equivalently
-StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
+StreamTableEnvironment tableEnv = ...; // see "Create a TableEnvironment" section
 
 // DataStream of Row with two fields "name" and "age" specified in `RowTypeInfo`
 DataStream<Row> stream = ...
@@ -1275,7 +1386,7 @@ Table table = tableEnv.fromDataStream(stream, "name as myName");
 <div data-lang="scala" markdown="1">
 {% highlight scala %}
 // get a TableEnvironment
-val tableEnv = StreamTableEnvironment.create(env)
+val tableEnv: StreamTableEnvironment = ... // see "Create a TableEnvironment" section
 
 // DataStream of Row with two fields "name" and "age" specified in `RowTypeInfo`
 val stream: DataStream[Row] = ...
@@ -1304,20 +1415,51 @@ val table: Table = tableEnv.fromDataStream(stream, 'name as 'myName)
 Query Optimization
 ------------------
 
-Apache Flink leverages Apache Calcite to optimize and translate queries. The optimization currently performed include projection and filter push-down, subquery decorrelation, and other kinds of query rewriting. Flink does not yet optimize the order of joins, but executes them in the same order as defined in the query (order of Tables in the `FROM` clause and/or order of join predicates in the `WHERE` clause).
+<div class="codetabs" markdown="1">
+<div data-lang="Old planner" markdown="1">
+
+Apache Flink leverages Apache Calcite to optimize and translate queries. The optimizations currently performed include projection and filter push-down, subquery decorrelation, and other kinds of query rewriting. The old planner does not yet optimize the order of joins, but executes them in the same order as defined in the query (order of Tables in the `FROM` clause and/or order of join predicates in the `WHERE` clause).
+
+It is possible to tweak the set of optimization rules which are applied in different phases by providing a `CalciteConfig` object. This can be created via a builder by calling `CalciteConfig.createBuilder()` and is provided to the `TableEnvironment` by calling `tableEnv.getConfig.setPlannerConfig(calciteConfig)`.
+
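+A minimal sketch of wiring in such a `CalciteConfig`; the concrete rule sets to add or replace are left out, and the `CalciteConfig` class referenced here is assumed to be the one shipped with the old planner module:
+
+{% highlight java %}
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.calcite.CalciteConfig;
+
+TableEnvironment tableEnv = ...; // see "Create a TableEnvironment" section
+
+CalciteConfig calciteConfig = CalciteConfig.createBuilder()
+    // ... add or replace optimization rule sets via the builder methods ...
+    .build();
+
+tableEnv.getConfig().setPlannerConfig(calciteConfig);
+{% endhighlight %}
+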
+</div>
+
+<div data-lang="Blink planner" markdown="1">
+
+Apache Flink leverages and extends Apache Calcite to perform sophisticated query optimization.
+This includes a series of rule-based and cost-based optimizations such as:
+
+* Subquery decorrelation based on Apache Calcite 
+* Project pruning
+* Partition pruning
+* Filter push-down 
+* Sub-plan deduplication to avoid duplicate computation 
+* Special subquery rewriting, including two parts:
+    * Converts IN and EXISTS into left semi-joins
+    * Converts NOT IN and NOT EXISTS into left anti-joins
+* Optional join reordering
+    * Enabled via `table.optimizer.join-reorder-enabled` 
+
+**Note:** IN/EXISTS/NOT IN/NOT EXISTS are currently only supported in conjunctive conditions in subquery rewriting.
+
+The optimizer makes intelligent decisions based not only on the plan but also on the rich statistics available from the data sources and fine-grained costs for each operator such as IO, CPU, network, and memory.
+
+Advanced users may provide a custom set of optimizations via a `CalciteConfig` object, which can be passed to the table environment by calling `TableEnvironment#getConfig#setPlannerConfig`.
+
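+As an example, a sketch of enabling the optional join reordering listed above through the string-based configuration; the pattern follows the [Configuration]({{ site.baseurl }}/dev/table/config.html) page and assumes a `TableEnvironment` created with the Blink planner:
+
+{% highlight java %}
+// see "Create a TableEnvironment" section (Blink planner)
+TableEnvironment tableEnv = ...;
+
+// enable the optional cost-based join reordering
+tableEnv.getConfig().getConfiguration()
+    .setString("table.optimizer.join-reorder-enabled", "true");
+{% endhighlight %}
+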
+</div>
+</div>
 
-It is possible to tweak the set of optimization rules which are applied in different phases by providing a `CalciteConfig` object. This can be created via a builder by calling `CalciteConfig.createBuilder())` and is provided to the TableEnvironment by calling `tableEnv.getConfig.setCalciteConfig(calciteConfig)`. 
 
 ### Explaining a Table
 
 The Table API provides a mechanism to explain the logical and optimized query plans to compute a `Table`. 
-This is done through the `TableEnvironment.explain(table)` method. It returns a String describing three plans: 
+This is done through the `TableEnvironment.explain(table)` method or the `TableEnvironment.explain()` method. `explain(table)` returns the plan of a given `Table`; `explain()` returns the plan of a multi-sink job and is mainly used for the Blink planner. Both return a String describing three plans:
 
 1. the Abstract Syntax Tree of the relational query, i.e., the unoptimized logical query plan,
 2. the optimized logical query plan, and
 3. the physical execution plan.
 
-The following code shows an example and the corresponding output:
+The following code shows an example and the corresponding output for a given `Table` using `explain(table)`:
 
 <div class="codetabs" markdown="1">
 <div data-lang="java" markdown="1">
@@ -1495,6 +1637,163 @@ Stage 4 : Data Source
 </div>
 </div>
 
+The following code shows an example and the corresponding output for a multi-sink plan using `explain()`:
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+
+EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
+TableEnvironment tEnv = TableEnvironment.create(settings);
+
+String[] fieldNames = { "count", "word" };
+TypeInformation[] fieldTypes = { Types.INT, Types.STRING };
+tEnv.registerTableSource("MySource1", new CsvTableSource("/source/path1", fieldNames, fieldTypes));
+tEnv.registerTableSource("MySource2", new CsvTableSource("/source/path2", fieldNames, fieldTypes));
+tEnv.registerTableSink("MySink1", new CsvTableSink("/sink/path1").configure(fieldNames, fieldTypes));
+tEnv.registerTableSink("MySink2", new CsvTableSink("/sink/path2").configure(fieldNames, fieldTypes));
+
+Table table1 = tEnv.scan("MySource1").where("LIKE(word, 'F%')");
+table1.insertInto("MySink1");
+
+Table table2 = table1.unionAll(tEnv.scan("MySource2"));
+table2.insertInto("MySink2");
+
+String explanation = tEnv.explain(false);
+System.out.println(explanation);
+
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+val settings = EnvironmentSettings.newInstance.useBlinkPlanner.inStreamingMode.build
+val tEnv = TableEnvironment.create(settings)
+
+val fieldNames = Array("count", "word")
+val fieldTypes = Array[TypeInformation[_]](Types.INT, Types.STRING)
+tEnv.registerTableSource("MySource1", new CsvTableSource("/source/path1", fieldNames, fieldTypes))
+tEnv.registerTableSource("MySource2", new CsvTableSource("/source/path2",fieldNames, fieldTypes))
+tEnv.registerTableSink("MySink1", new CsvTableSink("/sink/path1").configure(fieldNames, fieldTypes))
+tEnv.registerTableSink("MySink2", new CsvTableSink("/sink/path2").configure(fieldNames, fieldTypes))
+
+val table1 = tEnv.scan("MySource1").where("LIKE(word, 'F%')")
+table1.insertInto("MySink1")
+
+val table2 = table1.unionAll(tEnv.scan("MySource2"))
+table2.insertInto("MySink2")
+
+val explanation = tEnv.explain(false)
+println(explanation)
+
+{% endhighlight %}
+</div>
+
+<div data-lang="python" markdown="1">
+{% highlight python %}
+settings = EnvironmentSettings.new_instance().use_blink_planner().in_streaming_mode().build()
+t_env = TableEnvironment.create(environment_settings=settings)
+
+field_names = ["count", "word"]
+field_types = [DataTypes.INT(), DataTypes.STRING()]
+t_env.register_table_source("MySource1", CsvTableSource("/source/path1", field_names, field_types))
+t_env.register_table_source("MySource2", CsvTableSource("/source/path2", field_names, field_types))
+t_env.register_table_sink("MySink1", CsvTableSink("/sink/path1", field_names, field_types))
+t_env.register_table_sink("MySink2", CsvTableSink("/sink/path2", field_names, field_types))
+            
+table1 = t_env.scan("MySource1").where("LIKE(word, 'F%')")
+table1.insert_into("MySink1")
+
+table2 = table1.union_all(t_env.scan("MySource2"))
+table2.insert_into("MySink2")
+
+explanation = t_env.explain()
+print(explanation)
+{% endhighlight %}
+</div>
+</div>
+
+The result of the multi-sink plan is:
+<div>
+{% highlight text %}
+
+== Abstract Syntax Tree ==
+LogicalSink(name=[MySink1], fields=[count, word])
++- LogicalFilter(condition=[LIKE($1, _UTF-16LE'F%')])
+   +- LogicalTableScan(table=[[default_catalog, default_database, MySource1, source: [CsvTableSource(read fields: count, word)]]])
+
+LogicalSink(name=[MySink2], fields=[count, word])
++- LogicalUnion(all=[true])
+   :- LogicalFilter(condition=[LIKE($1, _UTF-16LE'F%')])
+   :  +- LogicalTableScan(table=[[default_catalog, default_database, MySource1, source: [CsvTableSource(read fields: count, word)]]])
+   +- LogicalTableScan(table=[[default_catalog, default_database, MySource2, source: [CsvTableSource(read fields: count, word)]]])
+
+== Optimized Logical Plan ==
+Calc(select=[count, word], where=[LIKE(word, _UTF-16LE'F%')], reuse_id=[1])
++- TableSourceScan(table=[[default_catalog, default_database, MySource1, source: [CsvTableSource(read fields: count, word)]]], fields=[count, word])
+
+Sink(name=[MySink1], fields=[count, word])
++- Reused(reference_id=[1])
+
+Sink(name=[MySink2], fields=[count, word])
++- Union(all=[true], union=[count, word])
+   :- Reused(reference_id=[1])
+   +- TableSourceScan(table=[[default_catalog, default_database, MySource2, source: [CsvTableSource(read fields: count, word)]]], fields=[count, word])
+
+== Physical Execution Plan ==
+Stage 1 : Data Source
+	content : collect elements with CollectionInputFormat
+
+	Stage 2 : Operator
+		content : CsvTableSource(read fields: count, word)
+		ship_strategy : REBALANCE
+
+		Stage 3 : Operator
+			content : SourceConversion(table:Buffer(default_catalog, default_database, MySource1, source: [CsvTableSource(read fields: count, word)]), fields:(count, word))
+			ship_strategy : FORWARD
+
+			Stage 4 : Operator
+				content : Calc(where: (word LIKE _UTF-16LE'F%'), select: (count, word))
+				ship_strategy : FORWARD
+
+				Stage 5 : Operator
+					content : SinkConversionToRow
+					ship_strategy : FORWARD
+
+					Stage 6 : Operator
+						content : Map
+						ship_strategy : FORWARD
+
+Stage 8 : Data Source
+	content : collect elements with CollectionInputFormat
+
+	Stage 9 : Operator
+		content : CsvTableSource(read fields: count, word)
+		ship_strategy : REBALANCE
+
+		Stage 10 : Operator
+			content : SourceConversion(table:Buffer(default_catalog, default_database, MySource2, source: [CsvTableSource(read fields: count, word)]), fields:(count, word))
+			ship_strategy : FORWARD
+
+			Stage 12 : Operator
+				content : SinkConversionToRow
+				ship_strategy : FORWARD
+
+				Stage 13 : Operator
+					content : Map
+					ship_strategy : FORWARD
+
+					Stage 7 : Data Sink
+						content : Sink: CsvTableSink(count, word)
+						ship_strategy : FORWARD
+
+						Stage 14 : Data Sink
+							content : Sink: CsvTableSink(count, word)
+							ship_strategy : FORWARD
+
+{% endhighlight %}
+</div>
+
 {% top %}
 
 
diff --git a/docs/dev/table/common.zh.md b/docs/dev/table/common.zh.md
index bc6f69c..c245373 100644
--- a/docs/dev/table/common.zh.md
+++ b/docs/dev/table/common.zh.md
@@ -27,6 +27,19 @@ The Table API and SQL are integrated in a joint API. The central concept of this
 * This will be replaced by the TOC
 {:toc}
 
+Main Differences Between the Two Planners
+-----------------------------------------
+
+1. Blink treats batch jobs as a special case of streaming. As such, the conversion between Table and DataSet is not supported either; batch jobs are not translated into `DataSet` programs but into `DataStream` programs, the same as streaming jobs.
+2. The Blink planner does not support `BatchTableSource`; use a bounded `StreamTableSource` instead.
+3. The Blink planner only supports the new `Catalog` and does not support `ExternalCatalog`, which is deprecated.
+4. The implementations of `FilterableTableSource` for the old planner and the Blink planner are incompatible. The old planner pushes down `PlannerExpression`s into `FilterableTableSource`, while the Blink planner pushes down `Expression`s.
+5. String-based key-value config options (see the documentation about [Configuration]({{ site.baseurl }}/dev/table/config.html) for details) are only used for the Blink planner.
+6. The implementation of `PlannerConfig` (`CalciteConfig`) differs between the two planners.
+7. The Blink planner optimizes multiple sinks into one DAG (supported only on `TableEnvironment`, not on `StreamTableEnvironment`). The old planner always optimizes each sink into a new DAG, where all DAGs are independent of each other.
+8. The old planner does not support catalog statistics yet, while the Blink planner does.
+
+
 Structure of Table API and SQL Programs
 ---------------------------------------
 
@@ -35,12 +48,9 @@ All Table API and SQL programs for batch and streaming follow the same pattern.
 <div class="codetabs" markdown="1">
 <div data-lang="java" markdown="1">
 {% highlight java %}
-// for batch programs use ExecutionEnvironment instead of StreamExecutionEnvironment
-StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 
-// create a TableEnvironment
-// for batch programs use BatchTableEnvironment instead of StreamTableEnvironment
-StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
+// create a TableEnvironment for a specific planner (batch or streaming)
+TableEnvironment tableEnv = ...; // see "Create a TableEnvironment" section
 
 // register a Table
 tableEnv.registerTable("table1", ...)            // or
@@ -58,18 +68,16 @@ Table sqlResult  = tableEnv.sqlQuery("SELECT ... FROM table2 ... ");
 tapiResult.insertInto("outputTable");
 
 // execute
-env.execute();
+tableEnv.execute("java_job");
 
 {% endhighlight %}
 </div>
 
 <div data-lang="scala" markdown="1">
 {% highlight scala %}
-// for batch programs use ExecutionEnvironment instead of StreamExecutionEnvironment
-val env = StreamExecutionEnvironment.getExecutionEnvironment
 
-// create a TableEnvironment
-val tableEnv = StreamTableEnvironment.create(env)
+// create a TableEnvironment for a specific planner (batch or streaming)
+val tableEnv = ... // see "Create a TableEnvironment" section
 
 // register a Table
 tableEnv.registerTable("table1", ...)           // or
@@ -87,18 +95,16 @@ val sqlResult  = tableEnv.sqlQuery("SELECT ... FROM table2 ...")
 tapiResult.insertInto("outputTable")
 
 // execute
-env.execute()
+tableEnv.execute("scala_job")
 
 {% endhighlight %}
 </div>
 
 <div data-lang="python" markdown="1">
 {% highlight python %}
-# for batch programs use ExecutionEnvironment instead of StreamExecutionEnvironment
-env = StreamExecutionEnvironment.get_execution_environment()
 
-# create a TableEnvironment
-table_env = StreamTableEnvironment.create(env)
+# create a TableEnvironment for a specific planner (batch or streaming)
+table_env = ... # see "Create a TableEnvironment" section
 
 # register a Table
 table_env.register_table("table1", ...)           # or
@@ -142,82 +148,153 @@ A `Table` is always bound to a specific `TableEnvironment`. It is not possible t
 
 A `TableEnvironment` is created by calling the static `BatchTableEnvironment.create()` or `StreamTableEnvironment.create()` method with a `StreamExecutionEnvironment` or an `ExecutionEnvironment` and an optional `TableConfig`. The `TableConfig` can be used to configure the `TableEnvironment` or to customize the query optimization and translation process (see [Query Optimization](#query-optimization)).
 
-Make sure to choose the `BatchTableEnvironment`/`StreamTableEnvironment` that matches your programming language.
+Make sure to choose the planner-specific `BatchTableEnvironment`/`StreamTableEnvironment` that matches your programming language.
+
+If both planner jars are on the classpath (the default behavior), you should explicitly set which planner to use in the current program.
 
 <div class="codetabs" markdown="1">
 <div data-lang="java" markdown="1">
 {% highlight java %}
-// ***************
-// STREAMING QUERY
-// ***************
+
+// **********************
+// FLINK STREAMING QUERY
+// **********************
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.EnvironmentSettings;
 import org.apache.flink.table.api.java.StreamTableEnvironment;
 
-StreamExecutionEnvironment sEnv = StreamExecutionEnvironment.getExecutionEnvironment();
-// create a TableEnvironment for streaming queries
-StreamTableEnvironment sTableEnv = StreamTableEnvironment.create(sEnv);
+EnvironmentSettings fsSettings = EnvironmentSettings.newInstance().useOldPlanner().inStreamingMode().build();
+StreamExecutionEnvironment fsEnv = StreamExecutionEnvironment.getExecutionEnvironment();
+StreamTableEnvironment fsTableEnv = StreamTableEnvironment.create(fsEnv, fsSettings);
+// or TableEnvironment fsTableEnv = TableEnvironment.create(fsSettings);
 
-// ***********
-// BATCH QUERY
-// ***********
-import org.apache.flink.api.java.ExecutionEnvironment
+// ******************
+// FLINK BATCH QUERY
+// ******************
+import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.table.api.java.BatchTableEnvironment;
 
-ExecutionEnvironment bEnv = ExecutionEnvironment.getExecutionEnvironment();
-// create a TableEnvironment for batch queries
-BatchTableEnvironment bTableEnv = BatchTableEnvironment.create(bEnv);
+ExecutionEnvironment fbEnv = ExecutionEnvironment.getExecutionEnvironment();
+BatchTableEnvironment fbTableEnv = BatchTableEnvironment.create(fbEnv);
+
+// **********************
+// BLINK STREAMING QUERY
+// **********************
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.java.StreamTableEnvironment;
+
+StreamExecutionEnvironment bsEnv = StreamExecutionEnvironment.getExecutionEnvironment();
+EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
+StreamTableEnvironment bsTableEnv = StreamTableEnvironment.create(bsEnv, bsSettings);
+// or TableEnvironment bsTableEnv = TableEnvironment.create(bsSettings);
+
+// ******************
+// BLINK BATCH QUERY
+// ******************
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.TableEnvironment;
+
+EnvironmentSettings bbSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build();
+TableEnvironment bbTableEnv = TableEnvironment.create(bbSettings);
+
 {% endhighlight %}
 </div>
 
 <div data-lang="scala" markdown="1">
 {% highlight scala %}
-// ***************
-// STREAMING QUERY
-// ***************
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
+
+// **********************
+// FLINK STREAMING QUERY
+// **********************
+import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
+import org.apache.flink.table.api.EnvironmentSettings
 import org.apache.flink.table.api.scala.StreamTableEnvironment
 
-val sEnv = StreamExecutionEnvironment.getExecutionEnvironment
-// create a TableEnvironment for streaming queries
-val sTableEnv = StreamTableEnvironment.create(sEnv)
+val fsSettings = EnvironmentSettings.newInstance().useOldPlanner().inStreamingMode().build()
+val fsEnv = StreamExecutionEnvironment.getExecutionEnvironment
+val fsTableEnv = StreamTableEnvironment.create(fsEnv, fsSettings)
+// or val fsTableEnv = TableEnvironment.create(fsSettings)
 
-// ***********
-// BATCH QUERY
-// ***********
-import org.apache.flink.api.java.ExecutionEnvironment
+// ******************
+// FLINK BATCH QUERY
+// ******************
+import org.apache.flink.api.scala.ExecutionEnvironment
 import org.apache.flink.table.api.scala.BatchTableEnvironment
 
-val bEnv = ExecutionEnvironment.getExecutionEnvironment
-// create a TableEnvironment for batch queries
-val bTableEnv = BatchTableEnvironment.create(bEnv)
+val fbEnv = ExecutionEnvironment.getExecutionEnvironment
+val fbTableEnv = BatchTableEnvironment.create(fbEnv)
+
+// **********************
+// BLINK STREAMING QUERY
+// **********************
+import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
+import org.apache.flink.table.api.EnvironmentSettings
+import org.apache.flink.table.api.scala.StreamTableEnvironment
+
+val bsEnv = StreamExecutionEnvironment.getExecutionEnvironment
+val bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
+val bsTableEnv = StreamTableEnvironment.create(bsEnv, bsSettings)
+// or val bsTableEnv = TableEnvironment.create(bsSettings)
+
+// ******************
+// BLINK BATCH QUERY
+// ******************
+import org.apache.flink.table.api.{EnvironmentSettings, TableEnvironment}
+
+val bbSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build()
+val bbTableEnv = TableEnvironment.create(bbSettings)
+
 {% endhighlight %}
 </div>
 
 <div data-lang="python" markdown="1">
 {% highlight python %}
-# ***************
-# STREAMING QUERY
-# ***************
+
+# **********************
+# FLINK STREAMING QUERY
+# **********************
 from pyflink.datastream import StreamExecutionEnvironment
-from pyflink.table import StreamTableEnvironment
+from pyflink.table import StreamTableEnvironment, EnvironmentSettings
 
-s_env = StreamExecutionEnvironment.get_execution_environment()
-# create a TableEnvironment for streaming queries
-s_table_env = StreamTableEnvironment.create(s_env)
+f_s_env = StreamExecutionEnvironment.get_execution_environment()
+f_s_settings = EnvironmentSettings.new_instance().use_old_planner().in_streaming_mode().build()
+f_s_t_env = StreamTableEnvironment.create(f_s_env, environment_settings=f_s_settings)
 
-# ***********
-# BATCH QUERY
-# ***********
+# ******************
+# FLINK BATCH QUERY
+# ******************
 from pyflink.dataset import ExecutionEnvironment
 from pyflink.table import BatchTableEnvironment
 
-b_env = ExecutionEnvironment.get_execution_environment()
-# create a TableEnvironment for batch queries
-b_table_env = BatchTableEnvironment.create(b_env)
+f_b_env = ExecutionEnvironment.get_execution_environment()
+f_b_t_env = BatchTableEnvironment.create(f_b_env, table_config)
+
+# **********************
+# BLINK STREAMING QUERY
+# **********************
+from pyflink.datastream import StreamExecutionEnvironment
+from pyflink.table import StreamTableEnvironment, EnvironmentSettings
+
+b_s_env = StreamExecutionEnvironment.get_execution_environment()
+b_s_settings = EnvironmentSettings.new_instance().use_blink_planner().in_streaming_mode().build()
+b_s_t_env = StreamTableEnvironment.create(b_s_env, environment_settings=b_s_settings)
+
+# ******************
+# BLINK BATCH QUERY
+# ******************
+from pyflink.table import EnvironmentSettings, BatchTableEnvironment
+
+b_b_settings = EnvironmentSettings.new_instance().use_blink_planner().in_batch_mode().build()
+b_b_t_env = BatchTableEnvironment.create(environment_settings=b_b_settings)
+
 {% endhighlight %}
 </div>
 </div>
 
+**Note:** If there is only one planner jar in the `/lib` directory, you can use `useAnyPlanner` (`use_any_planner` for Python) to create the `EnvironmentSettings`.
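+
+For example, a minimal sketch of creating a streaming `TableEnvironment` without committing to a specific planner (the variable names are illustrative; whichever planner jar is on the classpath will be picked up):
+
+{% highlight java %}
+EnvironmentSettings anySettings = EnvironmentSettings.newInstance()
+    .useAnyPlanner()      // use whichever planner is available on the classpath
+    .inStreamingMode()
+    .build();
+TableEnvironment anyTableEnv = TableEnvironment.create(anySettings);
+{% endhighlight %}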
+
+
 {% top %}
 
 Register Tables in the Catalog
@@ -229,7 +306,7 @@ An input table can be registered from various sources:
 
 * an existing `Table` object, usually the result of a Table API or SQL query.
 * a `TableSource`, which accesses external data, such as a file, database, or messaging system. 
-* a `DataStream` or `DataSet` from a DataStream or DataSet program. Registering a `DataStream` or `DataSet` is discussed in the [Integration with DataStream and DataSet API](#integration-with-datastream-and-dataset-api) section.
+* a `DataStream` or `DataSet` from a DataStream program (streaming jobs only) or a DataSet program (only for batch jobs translated by the old planner). Registering a `DataStream` or `DataSet` is discussed in the [Integration with DataStream and DataSet API](#integration-with-datastream-and-dataset-api) section.
 
 An output table can be registered using a `TableSink`.
 
@@ -240,10 +317,10 @@ A `Table` is registered in a `TableEnvironment` as follows:
 <div class="codetabs" markdown="1">
 <div data-lang="java" markdown="1">
 {% highlight java %}
-// get a StreamTableEnvironment, works for BatchTableEnvironment equivalently
-StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
+// get a TableEnvironment
+TableEnvironment tableEnv = ...; // see "Create a TableEnvironment" section
 
-// Table is the result of a simple projection query 
+// table is the result of a simple projection query 
 Table projTable = tableEnv.scan("X").select(...);
 
 // register the Table projTable as table "projectedTable"
@@ -254,9 +331,9 @@ tableEnv.registerTable("projectedTable", projTable);
 <div data-lang="scala" markdown="1">
 {% highlight scala %}
 // get a TableEnvironment
-val tableEnv = StreamTableEnvironment.create(env)
+val tableEnv = ... // see "Create a TableEnvironment" section
 
-// Table is the result of a simple projection query 
+// table is the result of a simple projection query 
 val projTable: Table = tableEnv.scan("X").select(...)
 
 // register the Table projTable as table "projectedTable"
@@ -267,9 +344,9 @@ tableEnv.registerTable("projectedTable", projTable)
 <div data-lang="python" markdown="1">
 {% highlight python %}
 # get a TableEnvironment
-table_env = StreamTableEnvironment.create(env)
+table_env = ... # see "Create a TableEnvironment" section
 
-# Table is the result of a simple projection query 
+# table is the result of a simple projection query 
 proj_table = table_env.scan("X").select(...)
 
 # register the Table projTable as table "projectedTable"
@@ -293,8 +370,8 @@ A `TableSource` is registered in a `TableEnvironment` as follows:
 <div class="codetabs" markdown="1">
 <div data-lang="java" markdown="1">
 {% highlight java %}
-// get a StreamTableEnvironment, works for BatchTableEnvironment equivalently
-StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
+// get a TableEnvironment
+TableEnvironment tableEnv = ...; // see "Create a TableEnvironment" section
 
 // create a TableSource
 TableSource csvSource = new CsvTableSource("/path/to/file", ...);
@@ -307,7 +384,7 @@ tableEnv.registerTableSource("CsvTable", csvSource);
 <div data-lang="scala" markdown="1">
 {% highlight scala %}
 // get a TableEnvironment
-val tableEnv = StreamTableEnvironment.create(env)
+val tableEnv = ... // see "Create a TableEnvironment" section
 
 // create a TableSource
 val csvSource: TableSource = new CsvTableSource("/path/to/file", ...)
@@ -320,7 +397,7 @@ tableEnv.registerTableSource("CsvTable", csvSource)
 <div data-lang="python" markdown="1">
 {% highlight python %}
 # get a TableEnvironment
-table_env = StreamTableEnvironment.create(env)
+table_env = ... # see "Create a TableEnvironment" section
 
 # create a TableSource
 csv_source = CsvTableSource("/path/to/file", ...)
@@ -331,6 +408,8 @@ table_env.register_table_source("csvTable", csv_source)
 </div>
 </div>
 
+**Note:** A `TableEnvironment` using the Blink planner only accepts `StreamTableSource`, `LookupableTableSource` and `InputFormatTableSource`, and a `StreamTableSource` used with the Blink planner in batch mode must be bounded.
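+
+For illustration, a minimal sketch of a bounded `StreamTableSource` that could be used with the Blink planner in batch mode (the class name `MyBoundedSource`, the field `word`, and the example rows are assumptions made for this sketch):
+
+{% highlight java %}
+import java.util.Arrays;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.api.Types;
+import org.apache.flink.table.sources.StreamTableSource;
+import org.apache.flink.types.Row;
+
+public class MyBoundedSource implements StreamTableSource<Row> {
+
+    @Override
+    public boolean isBounded() {
+        // marks the source as bounded, which is required for the Blink planner on batch
+        return true;
+    }
+
+    @Override
+    public DataStream<Row> getDataStream(StreamExecutionEnvironment execEnv) {
+        // a finite stream of two rows
+        return execEnv.fromCollection(
+            Arrays.asList(Row.of("hello"), Row.of("world")), getReturnType());
+    }
+
+    @Override
+    public TypeInformation<Row> getReturnType() {
+        return Types.ROW(new String[]{"word"}, new TypeInformation<?>[]{Types.STRING()});
+    }
+
+    @Override
+    public TableSchema getTableSchema() {
+        return TableSchema.builder().field("word", Types.STRING()).build();
+    }
+}
+{% endhighlight %}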
+
 {% top %}
 
 ### Register a TableSink
@@ -344,8 +423,8 @@ A `TableSink` is registered in a `TableEnvironment` as follows:
 <div class="codetabs" markdown="1">
 <div data-lang="java" markdown="1">
 {% highlight java %}
-// get a StreamTableEnvironment, works for BatchTableEnvironment equivalently
-StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
+// get a TableEnvironment
+TableEnvironment tableEnv = ...; // see "Create a TableEnvironment" section
 
 // create a TableSink
 TableSink csvSink = new CsvTableSink("/path/to/file", ...);
@@ -362,7 +441,7 @@ tableEnv.registerTableSink("CsvSinkTable", fieldNames, fieldTypes, csvSink);
 <div data-lang="scala" markdown="1">
 {% highlight scala %}
 // get a TableEnvironment
-val tableEnv = StreamTableEnvironment.create(env)
+val tableEnv = ... // see "Create a TableEnvironment" section
 
 // create a TableSink
 val csvSink: TableSink = new CsvTableSink("/path/to/file", ...)
@@ -379,7 +458,7 @@ tableEnv.registerTableSink("CsvSinkTable", fieldNames, fieldTypes, csvSink)
 <div data-lang="python" markdown="1">
 {% highlight python %}
 # get a TableEnvironment
-table_env = StreamTableEnvironment.create(env)
+table_env = ... # see "Create a TableEnvironment" section
 
 # define the field names and types
 field_names = ["a", "b", "c"]
@@ -406,8 +485,8 @@ An external catalog can be created by implementing the `ExternalCatalog` interfa
 <div class="codetabs" markdown="1">
 <div data-lang="java" markdown="1">
 {% highlight java %}
-// get a StreamTableEnvironment, works for BatchTableEnvironment equivalently
-StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
+// get a TableEnvironment
+TableEnvironment tableEnv = ...; // see "Create a TableEnvironment" section
 
 // create an external catalog
 ExternalCatalog catalog = new InMemoryExternalCatalog();
@@ -420,7 +499,7 @@ tableEnv.registerExternalCatalog("InMemCatalog", catalog);
 <div data-lang="scala" markdown="1">
 {% highlight scala %}
 // get a TableEnvironment
-val tableEnv = StreamTableEnvironment.create(env)
+val tableEnv = ... // see "Create a TableEnvironment" section
 
 // create an external catalog
 val catalog: ExternalCatalog = new InMemoryExternalCatalog
@@ -435,6 +514,8 @@ Once registered in a `TableEnvironment`, all tables defined in a `ExternalCatalo
 
 Currently, Flink provides an `InMemoryExternalCatalog` for demo and testing purposes. However, the `ExternalCatalog` interface can also be used to connect catalogs like HCatalog or Metastore to the Table API.
 
+**Note:** The Blink planner does not support external catalogs.
+
 {% top %}
 
 Query a Table 
@@ -453,8 +534,8 @@ The following example shows a simple Table API aggregation query:
 <div class="codetabs" markdown="1">
 <div data-lang="java" markdown="1">
 {% highlight java %}
-// get a StreamTableEnvironment, works for BatchTableEnvironment equivalently
-StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
+// get a TableEnvironment
+TableEnvironment tableEnv = ...; // see "Create a TableEnvironment" section
 
 // register Orders table
 
@@ -474,7 +555,7 @@ Table revenue = orders
 <div data-lang="scala" markdown="1">
 {% highlight scala %}
 // get a TableEnvironment
-val tableEnv = StreamTableEnvironment.create(env)
+val tableEnv = ... // see "Create a TableEnvironment" section
 
 // register Orders table
 
@@ -495,8 +576,8 @@ val revenue = orders
 
 <div data-lang="python" markdown="1">
 {% highlight python %}
-# get a StreamTableEnvironment, works for BatchTableEnvironment equivalently
-table_env = StreamTableEnvironment.create(env)
+# get a TableEnvironment
+table_env = ... # see "Create a TableEnvironment" section
 
 # register Orders table
 
@@ -527,8 +608,8 @@ The following example shows how to specify a query and return the result as a `T
 <div class="codetabs" markdown="1">
 <div data-lang="java" markdown="1">
 {% highlight java %}
-// get a StreamTableEnvironment, works for BatchTableEnvironment equivalently
-StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
+// get a TableEnvironment
+TableEnvironment tableEnv = ...; // see "Create a TableEnvironment" section
 
 // register Orders table
 
@@ -548,7 +629,7 @@ Table revenue = tableEnv.sqlQuery(
 <div data-lang="scala" markdown="1">
 {% highlight scala %}
 // get a TableEnvironment
-val tableEnv = StreamTableEnvironment.create(env)
+val tableEnv = ... // see "Create a TableEnvironment" section
 
 // register Orders table
 
@@ -568,8 +649,8 @@ val revenue = tableEnv.sqlQuery("""
 
 <div data-lang="python" markdown="1">
 {% highlight python %}
-# get a StreamTableEnvironment, works for BatchTableEnvironment equivalently
-table_env = StreamTableEnvironment.create(env)
+# get a TableEnvironment
+table_env = ... # see "Create a TableEnvironment" section
 
 # register Orders table
 
@@ -592,8 +673,8 @@ The following example shows how to specify an update query that inserts its resu
 <div class="codetabs" markdown="1">
 <div data-lang="java" markdown="1">
 {% highlight java %}
-// get a StreamTableEnvironment, works for BatchTableEnvironment equivalently
-StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
+// get a TableEnvironment
+TableEnvironment tableEnv = ...; // see "Create a TableEnvironment" section
 
 // register "Orders" table
 // register "RevenueFrance" output table
@@ -614,7 +695,7 @@ tableEnv.sqlUpdate(
 <div data-lang="scala" markdown="1">
 {% highlight scala %}
 // get a TableEnvironment
-val tableEnv = StreamTableEnvironment.create(env)
+val tableEnv = ... // see "Create a TableEnvironment" section
 
 // register "Orders" table
 // register "RevenueFrance" output table
@@ -635,8 +716,8 @@ tableEnv.sqlUpdate("""
 
 <div data-lang="python" markdown="1">
 {% highlight python %}
-# get a StreamTableEnvironment, works for BatchTableEnvironment equivalently
-table_env = StreamTableEnvironment.create(env)
+# get a TableEnvironment
+table_env = ... # see "Create a TableEnvironment" section
 
 # register "Orders" table
 # register "RevenueFrance" output table
@@ -682,8 +763,8 @@ The following examples shows how to emit a `Table`:
 <div class="codetabs" markdown="1">
 <div data-lang="java" markdown="1">
 {% highlight java %}
-// get a StreamTableEnvironment, works for BatchTableEnvironment equivalently
-StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
+// get a TableEnvironment
+TableEnvironment tableEnv = ...; // see "Create a TableEnvironment" section
 
 // create a TableSink
 TableSink sink = new CsvTableSink("/path/to/file", fieldDelim = "|");
@@ -705,7 +786,7 @@ result.insertInto("CsvSinkTable");
 <div data-lang="scala" markdown="1">
 {% highlight scala %}
 // get a TableEnvironment
-val tableEnv = StreamTableEnvironment.create(env)
+val tableEnv = ... // see "Create a TableEnvironment" section
 
 // create a TableSink
 val sink: TableSink = new CsvTableSink("/path/to/file", fieldDelim = "|")
@@ -728,7 +809,7 @@ result.insertInto("CsvSinkTable")
 <div data-lang="python" markdown="1">
 {% highlight python %}
 # get a TableEnvironment
-table_env = StreamTableEnvironment.create(env)
+table_env = ... # see "Create a TableEnvironment" section
 
 field_names = ["a", "b", "c"]
 field_types = [DataTypes.INT(), DataTypes.STRING(), DataTypes.BIGINT()]
@@ -755,10 +836,14 @@ result.insert_into("CsvSinkTable")
 Translate and Execute a Query
 -----------------------------
 
+The behavior of translating and executing a query is different for the two planners.
+
+<div class="codetabs" markdown="1">
+<div data-lang="Old planner" markdown="1">
 Table API and SQL queries are translated into [DataStream]({{ site.baseurl }}/dev/datastream_api.html) or [DataSet]({{ site.baseurl }}/dev/batch) programs depending on whether their input is a streaming or batch input. A query is internally represented as a logical query plan and is translated in two phases: 
 
-1. optimization of the logical plan, 
-2. translation into a DataStream or DataSet program.
+1. Optimization of the logical plan
+2. Translation into a DataStream or DataSet program
 
 A Table API or SQL query is translated when:
 
@@ -768,11 +853,37 @@ A Table API or SQL query is translated when:
 
 Once translated, a Table API or SQL query is handled like a regular DataStream or DataSet program and is executed when `StreamExecutionEnvironment.execute()` or `ExecutionEnvironment.execute()` is called.
 
+</div>
+
+<div data-lang="Blink planner" markdown="1">
+Table API and SQL queries are translated into [DataStream]({{ site.baseurl }}/dev/datastream_api.html) programs whether their input is streaming or batch. A query is internally represented as a logical query plan and is translated in two phases: 
+
+1. Optimization of the logical plan
+2. Translation into a DataStream program
+
+The behavior of translating a query differs between `TableEnvironment` and `StreamTableEnvironment`.
+
+For `TableEnvironment`, a Table API or SQL query is translated when `TableEnvironment.execute()` is called, because `TableEnvironment` optimizes multiple sinks into one DAG.
+
+For `StreamTableEnvironment`, a Table API or SQL query is translated when:
+
+* a `Table` is emitted to a `TableSink`, i.e., when `Table.insertInto()` is called.
+* a SQL update query is specified, i.e., when `TableEnvironment.sqlUpdate()` is called.
+* a `Table` is converted into a `DataStream`.
+
+Once translated, a Table API or SQL query is handled like a regular DataStream program and is executed when `TableEnvironment.execute()` or `StreamExecutionEnvironment.execute()` is called.
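+
+A minimal sketch of the `StreamTableEnvironment` case (the table names `Orders` and `OutOrders`, the columns, and the job name are assumptions made for this example):
+
+{% highlight java %}
+StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+EnvironmentSettings settings = EnvironmentSettings.newInstance()
+    .useBlinkPlanner().inStreamingMode().build();
+StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);
+
+// ... register an input table "Orders" and an output table "OutOrders" ...
+
+Table result = tableEnv.sqlQuery("SELECT product, amount FROM Orders");
+result.insertInto("OutOrders");  // the query is translated into a DataStream program here
+
+env.execute("example job");      // the translated program is executed here
+{% endhighlight %}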
+
+</div>
+</div>
+
 {% top %}
 
 Integration with DataStream and DataSet API
 -------------------------------------------
 
+Both planners can integrate with the `DataStream` API when running on streaming input. Only the old planner can integrate with the `DataSet` API; the Blink planner in batch mode cannot be combined with either API.
+
+**Note:** The `DataSet` API discussed below is only relevant for the old planner in batch mode.
+
 Table API and SQL queries can be easily integrated with and embedded into [DataStream]({{ site.baseurl }}/dev/datastream_api.html) and [DataSet]({{ site.baseurl }}/dev/batch) programs. For instance, it is possible to query an external table (for example from a RDBMS), do some pre-processing, such as filtering, projecting, aggregating, or joining with meta data, and then further process the data with either the DataStream or DataSet API (and any of the libraries built on top of these APIs [...]
 
 This interaction can be achieved by converting a `DataStream` or `DataSet` into a `Table` and vice versa. In this section, we describe how these conversions are done.
@@ -790,7 +901,7 @@ A `DataStream` or `DataSet` can be registered in a `TableEnvironment` as a Table
 {% highlight java %}
 // get StreamTableEnvironment
 // registration of a DataSet in a BatchTableEnvironment is equivalent
-StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
+StreamTableEnvironment tableEnv = ...; // see "Create a TableEnvironment" section
 
 DataStream<Tuple2<Long, String>> stream = ...
 
@@ -806,7 +917,7 @@ tableEnv.registerDataStream("myTable2", stream, "myLong, myString");
 {% highlight scala %}
 // get TableEnvironment 
 // registration of a DataSet is equivalent
-val tableEnv = StreamTableEnvironment.create(env)
+val tableEnv: StreamTableEnvironment = ... // see "Create a TableEnvironment" section
 
 val stream: DataStream[(Long, String)] = ...
 
@@ -832,7 +943,7 @@ Instead of registering a `DataStream` or `DataSet` in a `TableEnvironment`, it c
 {% highlight java %}
 // get StreamTableEnvironment
 // registration of a DataSet in a BatchTableEnvironment is equivalent
-StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
+StreamTableEnvironment tableEnv = ...; // see "Create a TableEnvironment" section
 
 DataStream<Tuple2<Long, String>> stream = ...
 
@@ -848,7 +959,7 @@ Table table2 = tableEnv.fromDataStream(stream, "myLong, myString");
 {% highlight scala %}
 // get TableEnvironment
 // registration of a DataSet is equivalent
-val tableEnv = StreamTableEnvironment.create(env)
+val tableEnv = ... // see "Create a TableEnvironment" section
 
 val stream: DataStream[(Long, String)] = ...
 
@@ -888,7 +999,7 @@ There are two modes to convert a `Table` into a `DataStream`:
 <div data-lang="java" markdown="1">
 {% highlight java %}
 // get StreamTableEnvironment. 
-StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
+StreamTableEnvironment tableEnv = ...; // see "Create a TableEnvironment" section
 
 // Table with two fields (String name, Integer age)
 Table table = ...
@@ -918,7 +1029,7 @@ DataStream<Tuple2<Boolean, Row>> retractStream =
 {% highlight scala %}
 // get TableEnvironment. 
 // registration of a DataSet is equivalent
-val tableEnv = StreamTableEnvironment.create(env)
+val tableEnv: StreamTableEnvironment = ... // see "Create a TableEnvironment" section
 
 // Table with two fields (String name, Integer age)
 val table: Table = ...
@@ -1002,7 +1113,7 @@ When defining a position-based mapping, the specified names must not exist in th
 <div data-lang="java" markdown="1">
 {% highlight java %}
 // get a StreamTableEnvironment, works for BatchTableEnvironment equivalently
-StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
+StreamTableEnvironment tableEnv = ...; // see "Create a TableEnvironment" section
 
 DataStream<Tuple2<Long, Integer>> stream = ...
 
@@ -1020,7 +1131,7 @@ Table table = tableEnv.fromDataStream(stream, "myLong, myInt");
 <div data-lang="scala" markdown="1">
 {% highlight scala %}
 // get a TableEnvironment
-val tableEnv = StreamTableEnvironment.create(env)
+val tableEnv: StreamTableEnvironment = ... // see "Create a TableEnvironment" section
 
 val stream: DataStream[(Long, Int)] = ...
 
@@ -1046,7 +1157,7 @@ If no field names are specified, the default field names and field order of the
 <div data-lang="java" markdown="1">
 {% highlight java %}
 // get a StreamTableEnvironment, works for BatchTableEnvironment equivalently
-StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
+StreamTableEnvironment tableEnv = ...; // see "Create a TableEnvironment" section
 
 DataStream<Tuple2<Long, Integer>> stream = ...
 
@@ -1067,7 +1178,7 @@ Table table = tableEnv.fromDataStream(stream, "f1 as myInt, f0 as myLong");
 <div data-lang="scala" markdown="1">
 {% highlight scala %}
 // get a TableEnvironment
-val tableEnv = StreamTableEnvironment.create(env)
+val tableEnv: StreamTableEnvironment = ... // see "Create a TableEnvironment" section
 
 val stream: DataStream[(Long, Int)] = ...
 
@@ -1094,7 +1205,7 @@ Flink treats primitives (`Integer`, `Double`, `String`) or generic types (types
 <div data-lang="java" markdown="1">
 {% highlight java %}
 // get a StreamTableEnvironment, works for BatchTableEnvironment equivalently
-StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
+StreamTableEnvironment tableEnv = ...; // see "Create a TableEnvironment" section
 
 DataStream<Long> stream = ...
 
@@ -1109,7 +1220,7 @@ Table table = tableEnv.fromDataStream(stream, "myLong");
 <div data-lang="scala" markdown="1">
 {% highlight scala %}
 // get a TableEnvironment
-val tableEnv = StreamTableEnvironment.create(env)
+val tableEnv: StreamTableEnvironment = ... // see "Create a TableEnvironment" section
 
 val stream: DataStream[Long] = ...
 
@@ -1130,7 +1241,7 @@ Flink supports Scala's built-in tuples and provides its own tuple classes for Ja
 <div data-lang="java" markdown="1">
 {% highlight java %}
 // get a StreamTableEnvironment, works for BatchTableEnvironment equivalently
-StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
+StreamTableEnvironment tableEnv = ...; // see "Create a TableEnvironment" section
 
 DataStream<Tuple2<Long, String>> stream = ...
 
@@ -1154,7 +1265,7 @@ Table table = tableEnv.fromDataStream(stream, "f1 as 'myString', f0 as 'myLong'"
 <div data-lang="scala" markdown="1">
 {% highlight scala %}
 // get a TableEnvironment
-val tableEnv = StreamTableEnvironment.create(env)
+val tableEnv: StreamTableEnvironment = ... // see "Create a TableEnvironment" section
 
 val stream: DataStream[(Long, String)] = ...
 
@@ -1200,7 +1311,7 @@ When converting a POJO `DataStream` or `DataSet` into a `Table` without specifyi
 <div data-lang="java" markdown="1">
 {% highlight java %}
 // get a StreamTableEnvironment, works for BatchTableEnvironment equivalently
-StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
+StreamTableEnvironment tableEnv = ...; // see "Create a TableEnvironment" section
 
 // Person is a POJO with fields "name" and "age"
 DataStream<Person> stream = ...
@@ -1222,7 +1333,7 @@ Table table = tableEnv.fromDataStream(stream, "name as myName");
 <div data-lang="scala" markdown="1">
 {% highlight scala %}
 // get a TableEnvironment
-val tableEnv = StreamTableEnvironment.create(env)
+val tableEnv: StreamTableEnvironment = ... // see "Create a TableEnvironment" section
 
 // Person is a POJO with field names "name" and "age"
 val stream: DataStream[Person] = ...
@@ -1250,7 +1361,7 @@ The `Row` data type supports an arbitrary number of fields and fields with `null
 <div data-lang="java" markdown="1">
 {% highlight java %}
 // get a StreamTableEnvironment, works for BatchTableEnvironment equivalently
-StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
+StreamTableEnvironment tableEnv = ...; // see "Create a TableEnvironment" section
 
 // DataStream of Row with two fields "name" and "age" specified in `RowTypeInfo`
 DataStream<Row> stream = ...
@@ -1275,7 +1386,7 @@ Table table = tableEnv.fromDataStream(stream, "name as myName");
 <div data-lang="scala" markdown="1">
 {% highlight scala %}
 // get a TableEnvironment
-val tableEnv = StreamTableEnvironment.create(env)
+val tableEnv: StreamTableEnvironment = ... // see "Create a TableEnvironment" section
 
 // DataStream of Row with two fields "name" and "age" specified in `RowTypeInfo`
 val stream: DataStream[Row] = ...
@@ -1304,20 +1415,51 @@ val table: Table = tableEnv.fromDataStream(stream, 'name as 'myName)
 Query Optimization
 ------------------
 
-Apache Flink leverages Apache Calcite to optimize and translate queries. The optimization currently performed include projection and filter push-down, subquery decorrelation, and other kinds of query rewriting. Flink does not yet optimize the order of joins, but executes them in the same order as defined in the query (order of Tables in the `FROM` clause and/or order of join predicates in the `WHERE` clause).
+<div class="codetabs" markdown="1">
+<div data-lang="Old planner" markdown="1">
+
+Apache Flink leverages Apache Calcite to optimize and translate queries. The optimizations currently performed include projection and filter push-down, subquery decorrelation, and other kinds of query rewriting. The old planner does not yet optimize the order of joins, but executes them in the same order as defined in the query (order of Tables in the `FROM` clause and/or order of join predicates in the `WHERE` clause).
+
+It is possible to tweak the set of optimization rules which are applied in different phases by providing a `CalciteConfig` object. This can be created via a builder by calling `CalciteConfig.createBuilder()` and is provided to the TableEnvironment by calling `tableEnv.getConfig.setPlannerConfig(calciteConfig)`.
+
+</div>
+
+<div data-lang="Blink planner" markdown="1">
+
+Apache Flink leverages and extends Apache Calcite to perform sophisticated query optimization.
+This includes a series of rule and cost-based optimizations such as:
+
+* Subquery decorrelation based on Apache Calcite 
+* Project pruning
+* Partition pruning
+* Filter push-down 
+* Sub-plan deduplication to avoid duplicate computation 
+* Special subquery rewriting, including two parts:
+    * Converts IN and EXISTS into left semi-joins
+    * Converts NOT IN and NOT EXISTS into left anti-join 
+* Optional join reordering
+    * Enabled via `table.optimizer.join-reorder-enabled` 
+
+**Note:** IN/EXISTS/NOT IN/NOT EXISTS are currently only supported in conjunctive conditions in subquery rewriting.
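+
+For example, a hedged sketch of switching on the optional join reordering through the string-based configuration option listed above (the variable names and planner/mode choice are illustrative):
+
+{% highlight java %}
+EnvironmentSettings settings = EnvironmentSettings.newInstance()
+    .useBlinkPlanner().inBatchMode().build();
+TableEnvironment tEnv = TableEnvironment.create(settings);
+
+// enable the Blink planner's optional join reordering (disabled by default)
+tEnv.getConfig().getConfiguration()
+    .setString("table.optimizer.join-reorder-enabled", "true");
+{% endhighlight %}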
+
+The optimizer makes intelligent decisions based not only on the plan but also on rich statistics available from the data sources and fine-grained costs for each operator such as IO, CPU, network, and memory.
+
+Advanced users may apply custom optimizations via a `CalciteConfig` object that can be passed to the table environment by calling `TableEnvironment#getConfig#setPlannerConfig`.
+
+</div>
+</div>
 
-It is possible to tweak the set of optimization rules which are applied in different phases by providing a `CalciteConfig` object. This can be created via a builder by calling `CalciteConfig.createBuilder())` and is provided to the TableEnvironment by calling `tableEnv.getConfig.setCalciteConfig(calciteConfig)`. 
 
 ### Explaining a Table
 
 The Table API provides a mechanism to explain the logical and optimized query plans to compute a `Table`. 
-This is done through the `TableEnvironment.explain(table)` method. It returns a String describing three plans: 
+This is done through the `TableEnvironment.explain(table)` or `TableEnvironment.explain()` method. `explain(table)` returns the plan of a given `Table`, while `explain()` returns the plan of a multiple-sinks job and is mainly used for the Blink planner. Both return a String describing three plans: 
 
 1. the Abstract Syntax Tree of the relational query, i.e., the unoptimized logical query plan,
 2. the optimized logical query plan, and
 3. the physical execution plan.
 
-The following code shows an example and the corresponding output:
+The following code shows an example and the corresponding output for a given `Table` using `explain(table)`:
 
 <div class="codetabs" markdown="1">
 <div data-lang="java" markdown="1">
@@ -1495,6 +1637,163 @@ Stage 4 : Data Source
 </div>
 </div>
 
+The following code shows an example and the corresponding output for a multiple-sinks plan using `explain()`:
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+
+EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
+TableEnvironment tEnv = TableEnvironment.create(settings);
+
+String[] fieldNames = { "count", "word" };
+TypeInformation[] fieldTypes = { Types.INT, Types.STRING };
+tEnv.registerTableSource("MySource1", new CsvTableSource("/source/path1", fieldNames, fieldTypes));
+tEnv.registerTableSource("MySource2", new CsvTableSource("/source/path2", fieldNames, fieldTypes));
+tEnv.registerTableSink("MySink1", new CsvTableSink("/sink/path1").configure(fieldNames, fieldTypes));
+tEnv.registerTableSink("MySink2", new CsvTableSink("/sink/path2").configure(fieldNames, fieldTypes));
+
+Table table1 = tEnv.scan("MySource1").where("LIKE(word, 'F%')");
+table1.insertInto("MySink1");
+
+Table table2 = table1.unionAll(tEnv.scan("MySource2"));
+table2.insertInto("MySink2");
+
+String explanation = tEnv.explain(false);
+System.out.println(explanation);
+
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+val settings = EnvironmentSettings.newInstance.useBlinkPlanner.inStreamingMode.build
+val tEnv = TableEnvironment.create(settings)
+
+val fieldNames = Array("count", "word")
+val fieldTypes = Array[TypeInformation[_]](Types.INT, Types.STRING)
+tEnv.registerTableSource("MySource1", new CsvTableSource("/source/path1", fieldNames, fieldTypes))
+tEnv.registerTableSource("MySource2", new CsvTableSource("/source/path2",fieldNames, fieldTypes))
+tEnv.registerTableSink("MySink1", new CsvTableSink("/sink/path1").configure(fieldNames, fieldTypes))
+tEnv.registerTableSink("MySink2", new CsvTableSink("/sink/path2").configure(fieldNames, fieldTypes))
+
+val table1 = tEnv.scan("MySource1").where("LIKE(word, 'F%')")
+table1.insertInto("MySink1")
+
+val table2 = table1.unionAll(tEnv.scan("MySource2"))
+table2.insertInto("MySink2")
+
+val explanation = tEnv.explain(false)
+println(explanation)
+
+{% endhighlight %}
+</div>
+
+<div data-lang="python" markdown="1">
+{% highlight python %}
+settings = EnvironmentSettings.new_instance().use_blink_planner().in_streaming_mode().build()
+t_env = TableEnvironment.create(environment_settings=settings)
+
+field_names = ["count", "word"]
+field_types = [DataTypes.INT(), DataTypes.STRING()]
+t_env.register_table_source("MySource1", CsvTableSource("/source/path1", field_names, field_types))
+t_env.register_table_source("MySource2", CsvTableSource("/source/path2", field_names, field_types))
+t_env.register_table_sink("MySink1", CsvTableSink("/sink/path1", field_names, field_types))
+t_env.register_table_sink("MySink2", CsvTableSink("/sink/path2", field_names, field_types))
+            
+table1 = t_env.scan("MySource1").where("LIKE(word, 'F%')")
+table1.insert_into("MySink1")
+
+table2 = table1.union_all(t_env.scan("MySource2"))
+table2.insert_into("MySink2")
+
+explanation = t_env.explain()
+print(explanation)
+{% endhighlight %}
+</div>
+</div>
+
+The result of the multiple-sinks plan is:
+<div>
+{% highlight text %}
+
+== Abstract Syntax Tree ==
+LogicalSink(name=[MySink1], fields=[count, word])
++- LogicalFilter(condition=[LIKE($1, _UTF-16LE'F%')])
+   +- LogicalTableScan(table=[[default_catalog, default_database, MySource1, source: [CsvTableSource(read fields: count, word)]]])
+
+LogicalSink(name=[MySink2], fields=[count, word])
++- LogicalUnion(all=[true])
+   :- LogicalFilter(condition=[LIKE($1, _UTF-16LE'F%')])
+   :  +- LogicalTableScan(table=[[default_catalog, default_database, MySource1, source: [CsvTableSource(read fields: count, word)]]])
+   +- LogicalTableScan(table=[[default_catalog, default_database, MySource2, source: [CsvTableSource(read fields: count, word)]]])
+
+== Optimized Logical Plan ==
+Calc(select=[count, word], where=[LIKE(word, _UTF-16LE'F%')], reuse_id=[1])
++- TableSourceScan(table=[[default_catalog, default_database, MySource1, source: [CsvTableSource(read fields: count, word)]]], fields=[count, word])
+
+Sink(name=[MySink1], fields=[count, word])
++- Reused(reference_id=[1])
+
+Sink(name=[MySink2], fields=[count, word])
++- Union(all=[true], union=[count, word])
+   :- Reused(reference_id=[1])
+   +- TableSourceScan(table=[[default_catalog, default_database, MySource2, source: [CsvTableSource(read fields: count, word)]]], fields=[count, word])
+
+== Physical Execution Plan ==
+Stage 1 : Data Source
+	content : collect elements with CollectionInputFormat
+
+	Stage 2 : Operator
+		content : CsvTableSource(read fields: count, word)
+		ship_strategy : REBALANCE
+
+		Stage 3 : Operator
+			content : SourceConversion(table:Buffer(default_catalog, default_database, MySource1, source: [CsvTableSource(read fields: count, word)]), fields:(count, word))
+			ship_strategy : FORWARD
+
+			Stage 4 : Operator
+				content : Calc(where: (word LIKE _UTF-16LE'F%'), select: (count, word))
+				ship_strategy : FORWARD
+
+				Stage 5 : Operator
+					content : SinkConversionToRow
+					ship_strategy : FORWARD
+
+					Stage 6 : Operator
+						content : Map
+						ship_strategy : FORWARD
+
+Stage 8 : Data Source
+	content : collect elements with CollectionInputFormat
+
+	Stage 9 : Operator
+		content : CsvTableSource(read fields: count, word)
+		ship_strategy : REBALANCE
+
+		Stage 10 : Operator
+			content : SourceConversion(table:Buffer(default_catalog, default_database, MySource2, source: [CsvTableSource(read fields: count, word)]), fields:(count, word))
+			ship_strategy : FORWARD
+
+			Stage 12 : Operator
+				content : SinkConversionToRow
+				ship_strategy : FORWARD
+
+				Stage 13 : Operator
+					content : Map
+					ship_strategy : FORWARD
+
+					Stage 7 : Data Sink
+						content : Sink: CsvTableSink(count, word)
+						ship_strategy : FORWARD
+
+						Stage 14 : Data Sink
+							content : Sink: CsvTableSink(count, word)
+							ship_strategy : FORWARD
+
+{% endhighlight %}
+</div>
+
 {% top %}