Posted to commits@flink.apache.org by ji...@apache.org on 2019/07/01 04:06:02 UTC

[flink] branch master updated: [FLINK-12897][python][docs] Improve the Python Table API docs by adding more examples.

This is an automated email from the ASF dual-hosted git repository.

jincheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 534d53b  [FLINK-12897][python][docs] Improve the Python Table API docs by adding more examples.
534d53b is described below

commit 534d53bf828404ef3dcd7aad9e7a8b6528606254
Author: Wei Zhong <we...@gmail.com>
AuthorDate: Thu Jun 27 20:30:36 2019 +0800

    [FLINK-12897][python][docs] Improve the Python Table API docs by adding more examples.
    
    This closes #8916
---
 docs/dev/event_time.md                             |  11 +
 docs/dev/event_time.zh.md                          |  11 +
 docs/dev/event_timestamps_watermarks.md            |   6 +
 docs/dev/event_timestamps_watermarks.zh.md         |   6 +
 docs/dev/stream/state/checkpointing.md             |  28 ++
 docs/dev/stream/state/checkpointing.zh.md          |  28 ++
 docs/dev/stream/state/state_backends.md            |   6 +
 docs/dev/stream/state/state_backends.zh.md         |   6 +
 docs/dev/table/common.md                           | 210 ++++++++++++-
 docs/dev/table/common.zh.md                        | 206 ++++++++++++-
 docs/dev/table/connect.md                          | 335 ++++++++++++++++++++
 docs/dev/table/connect.zh.md                       | 336 +++++++++++++++++++++
 docs/dev/table/functions.md                        |  40 ++-
 docs/dev/table/functions.zh.md                     |  40 ++-
 docs/dev/table/sql.md                              |  24 ++
 docs/dev/table/sql.zh.md                           |  25 ++
 docs/dev/table/streaming/query_configuration.md    |  35 +++
 docs/dev/table/streaming/query_configuration.zh.md |  35 +++
 docs/dev/table/streaming/time_attributes.md        |  11 +
 docs/dev/table/streaming/time_attributes.zh.md     |  11 +
 docs/dev/table/tableApi.md                         |  12 +-
 docs/dev/table/tableApi.zh.md                      |  12 +-
 .../pyflink/dataset/execution_environment.py       |   8 +-
 .../datastream/stream_execution_environment.py     |   4 +-
 flink-python/pyflink/table/__init__.py             |   3 +-
 flink-python/pyflink/table/descriptors.py          |  10 +-
 flink-python/pyflink/table/query_config.py         |   9 +
 flink-python/pyflink/table/sinks.py                |  12 +-
 flink-python/pyflink/table/table.py                |  17 +-
 flink-python/pyflink/table/table_environment.py    | 167 ++++++++--
 30 files changed, 1564 insertions(+), 100 deletions(-)

diff --git a/docs/dev/event_time.md b/docs/dev/event_time.md
index 1d747aa..5790746 100644
--- a/docs/dev/event_time.md
+++ b/docs/dev/event_time.md
@@ -131,6 +131,17 @@ stream
     .addSink(...)
 {% endhighlight %}
 </div>
+<div data-lang="python" markdown="1">
+{% highlight python %}
+env = StreamExecutionEnvironment.get_execution_environment()
+
+env.set_stream_time_characteristic(TimeCharacteristic.ProcessingTime)
+
+# alternatively:
+# env.set_stream_time_characteristic(TimeCharacteristic.IngestionTime)
+# env.set_stream_time_characteristic(TimeCharacteristic.EventTime)
+{% endhighlight %}
+</div>
 </div>
 
 
diff --git a/docs/dev/event_time.zh.md b/docs/dev/event_time.zh.md
index 1d747aa..5790746 100644
--- a/docs/dev/event_time.zh.md
+++ b/docs/dev/event_time.zh.md
@@ -131,6 +131,17 @@ stream
     .addSink(...)
 {% endhighlight %}
 </div>
+<div data-lang="python" markdown="1">
+{% highlight python %}
+env = StreamExecutionEnvironment.get_execution_environment()
+
+env.set_stream_time_characteristic(TimeCharacteristic.ProcessingTime)
+
+# alternatively:
+# env.set_stream_time_characteristic(TimeCharacteristic.IngestionTime)
+# env.set_stream_time_characteristic(TimeCharacteristic.EventTime)
+{% endhighlight %}
+</div>
 </div>
 
 
diff --git a/docs/dev/event_timestamps_watermarks.md b/docs/dev/event_timestamps_watermarks.md
index cb1c5d4..fdb716b 100644
--- a/docs/dev/event_timestamps_watermarks.md
+++ b/docs/dev/event_timestamps_watermarks.md
@@ -44,6 +44,12 @@ val env = StreamExecutionEnvironment.getExecutionEnvironment
 env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
 {% endhighlight %}
 </div>
+<div data-lang="python" markdown="1">
+{% highlight python %}
+env = StreamExecutionEnvironment.get_execution_environment()
+env.set_stream_time_characteristic(TimeCharacteristic.EventTime)
+{% endhighlight %}
+</div>
 </div>
 
 ## Assigning Timestamps
diff --git a/docs/dev/event_timestamps_watermarks.zh.md b/docs/dev/event_timestamps_watermarks.zh.md
index cb1c5d4..fdb716b 100644
--- a/docs/dev/event_timestamps_watermarks.zh.md
+++ b/docs/dev/event_timestamps_watermarks.zh.md
@@ -44,6 +44,12 @@ val env = StreamExecutionEnvironment.getExecutionEnvironment
 env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
 {% endhighlight %}
 </div>
+<div data-lang="python" markdown="1">
+{% highlight python %}
+env = StreamExecutionEnvironment.get_execution_environment()
+env.set_stream_time_characteristic(TimeCharacteristic.EventTime)
+{% endhighlight %}
+</div>
 </div>
 
 ## Assigning Timestamps
diff --git a/docs/dev/stream/state/checkpointing.md b/docs/dev/stream/state/checkpointing.md
index fab1a49..c193fc3 100644
--- a/docs/dev/stream/state/checkpointing.md
+++ b/docs/dev/stream/state/checkpointing.md
@@ -132,6 +132,34 @@ env.getCheckpointConfig.setFailTasksOnCheckpointingErrors(false)
 env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)
 {% endhighlight %}
 </div>
+<div data-lang="python" markdown="1">
+{% highlight python %}
+env = StreamExecutionEnvironment.get_execution_environment()
+
+# start a checkpoint every 1000 ms
+env.enable_checkpointing(1000)
+
+# advanced options:
+
+# set mode to exactly-once (this is the default)
+env.get_checkpoint_config().set_checkpointing_mode(CheckpointingMode.EXACTLY_ONCE)
+
+# make sure 500 ms of progress happen between checkpoints
+env.get_checkpoint_config().set_min_pause_between_checkpoints(500)
+
+# checkpoints have to complete within one minute, or are discarded
+env.get_checkpoint_config().set_checkpoint_timeout(60000)
+
+# allow only one checkpoint to be in progress at the same time
+env.get_checkpoint_config().set_max_concurrent_checkpoints(1)
+
+# enable externalized checkpoints which are retained after job cancellation
+env.get_checkpoint_config().enable_externalized_checkpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
+
+# allow job recovery fallback to checkpoint when there is a more recent savepoint
+env.get_checkpoint_config().set_prefer_checkpoint_for_recovery(True)
+{% endhighlight %}
+</div>
 </div>
 
 ### Related Config Options
diff --git a/docs/dev/stream/state/checkpointing.zh.md b/docs/dev/stream/state/checkpointing.zh.md
index fab1a49..c193fc3 100644
--- a/docs/dev/stream/state/checkpointing.zh.md
+++ b/docs/dev/stream/state/checkpointing.zh.md
@@ -132,6 +132,34 @@ env.getCheckpointConfig.setFailTasksOnCheckpointingErrors(false)
 env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)
 {% endhighlight %}
 </div>
+<div data-lang="python" markdown="1">
+{% highlight python %}
+env = StreamExecutionEnvironment.get_execution_environment()
+
+# start a checkpoint every 1000 ms
+env.enable_checkpointing(1000)
+
+# advanced options:
+
+# set mode to exactly-once (this is the default)
+env.get_checkpoint_config().set_checkpointing_mode(CheckpointingMode.EXACTLY_ONCE)
+
+# make sure 500 ms of progress happen between checkpoints
+env.get_checkpoint_config().set_min_pause_between_checkpoints(500)
+
+# checkpoints have to complete within one minute, or are discarded
+env.get_checkpoint_config().set_checkpoint_timeout(60000)
+
+# allow only one checkpoint to be in progress at the same time
+env.get_checkpoint_config().set_max_concurrent_checkpoints(1)
+
+# enable externalized checkpoints which are retained after job cancellation
+env.get_checkpoint_config().enable_externalized_checkpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
+
+# allow job recovery fallback to checkpoint when there is a more recent savepoint
+env.get_checkpoint_config().set_prefer_checkpoint_for_recovery(True)
+{% endhighlight %}
+</div>
 </div>
 
 ### Related Config Options
diff --git a/docs/dev/stream/state/state_backends.md b/docs/dev/stream/state/state_backends.md
index 8e32f8e..4cd6d06 100644
--- a/docs/dev/stream/state/state_backends.md
+++ b/docs/dev/stream/state/state_backends.md
@@ -43,6 +43,12 @@ val env = StreamExecutionEnvironment.getExecutionEnvironment()
 env.setStateBackend(...)
 {% endhighlight %}
 </div>
+<div data-lang="python" markdown="1">
+{% highlight python %}
+env = StreamExecutionEnvironment.get_execution_environment()
+env.set_state_backend(...)
+{% endhighlight %}
+</div>
 </div>
 
 {% top %}
diff --git a/docs/dev/stream/state/state_backends.zh.md b/docs/dev/stream/state/state_backends.zh.md
index 8e32f8e..4cd6d06 100644
--- a/docs/dev/stream/state/state_backends.zh.md
+++ b/docs/dev/stream/state/state_backends.zh.md
@@ -43,6 +43,12 @@ val env = StreamExecutionEnvironment.getExecutionEnvironment()
 env.setStateBackend(...)
 {% endhighlight %}
 </div>
+<div data-lang="python" markdown="1">
+{% highlight python %}
+env = StreamExecutionEnvironment.get_execution_environment()
+env.set_state_backend(...)
+{% endhighlight %}
+</div>
 </div>
 
 {% top %}
diff --git a/docs/dev/table/common.md b/docs/dev/table/common.md
index d6655f7..705a978 100644
--- a/docs/dev/table/common.md
+++ b/docs/dev/table/common.md
@@ -80,7 +80,7 @@ tableEnv.registerTableSink("outputTable", ...);
 
 // create a Table from a Table API query
 val tapiResult = tableEnv.scan("table1").select(...)
-// Create a Table from a SQL query
+// create a Table from a SQL query
 val sqlResult  = tableEnv.sqlQuery("SELECT ... FROM table2 ...")
 
 // emit a Table API result Table to a TableSink, same for SQL result
@@ -91,6 +91,35 @@ env.execute()
 
 {% endhighlight %}
 </div>
+
+<div data-lang="python" markdown="1">
+{% highlight python %}
+# for batch programs use ExecutionEnvironment instead of StreamExecutionEnvironment
+env = StreamExecutionEnvironment.get_execution_environment()
+
+# create a TableEnvironment
+table_env = StreamTableEnvironment.create(env)
+
+# register a Table
+table_env.register_table("table1", ...)           # or
+table_env.register_table_source("table2", ...)
+
+# register an output Table
+table_env.register_table_sink("outputTable", ...)
+
+# create a Table from a Table API query
+tapi_result = table_env.scan("table1").select(...)
+# create a Table from a SQL query
+sql_result  = table_env.sql_query("SELECT ... FROM table2 ...")
+
+# emit a Table API result Table to a TableSink, same for SQL result
+tapi_result.insert_into("outputTable")
+
+# execute
+env.execute()
+
+{% endhighlight %}
+</div>
 </div>
 
 **Note:** Table API and SQL queries can be easily integrated with and embedded into DataStream or DataSet programs. Have a look at the [Integration with DataStream and DataSet API](#integration-with-datastream-and-dataset-api) section to learn how DataStreams and DataSets can be converted into Tables and vice versa.
@@ -121,6 +150,7 @@ Make sure to choose the `BatchTableEnvironment`/`StreamTableEnvironment` that ma
 // ***************
 // STREAMING QUERY
 // ***************
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.table.api.java.StreamTableEnvironment;
 
 StreamExecutionEnvironment sEnv = StreamExecutionEnvironment.getExecutionEnvironment();
@@ -130,6 +160,7 @@ StreamTableEnvironment sTableEnv = StreamTableEnvironment.create(sEnv);
 // ***********
 // BATCH QUERY
 // ***********
+import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.table.api.java.BatchTableEnvironment;
 
 ExecutionEnvironment bEnv = ExecutionEnvironment.getExecutionEnvironment();
@@ -143,6 +174,7 @@ BatchTableEnvironment bTableEnv = BatchTableEnvironment.create(bEnv);
 // ***************
 // STREAMING QUERY
 // ***************
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
 import org.apache.flink.table.api.scala.StreamTableEnvironment
 
 val sEnv = StreamExecutionEnvironment.getExecutionEnvironment
@@ -152,6 +184,7 @@ val sTableEnv = StreamTableEnvironment.create(sEnv)
 // ***********
 // BATCH QUERY
 // ***********
+import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.table.api.scala.BatchTableEnvironment
 
 val bEnv = ExecutionEnvironment.getExecutionEnvironment
@@ -159,6 +192,30 @@ val bEnv = ExecutionEnvironment.getExecutionEnvironment
 val bTableEnv = BatchTableEnvironment.create(bEnv)
 {% endhighlight %}
 </div>
+
+<div data-lang="python" markdown="1">
+{% highlight python %}
+# ***************
+# STREAMING QUERY
+# ***************
+from pyflink.datastream import StreamExecutionEnvironment
+from pyflink.table import StreamTableEnvironment
+
+s_env = StreamExecutionEnvironment.get_execution_environment()
+# create a TableEnvironment for streaming queries
+s_table_env = StreamTableEnvironment.create(s_env)
+
+# ***********
+# BATCH QUERY
+# ***********
+from pyflink.dataset import ExecutionEnvironment
+from pyflink.table import BatchTableEnvironment
+
+b_env = ExecutionEnvironment.get_execution_environment()
+# create a TableEnvironment for batch queries
+b_table_env = BatchTableEnvironment.create(b_env)
+{% endhighlight %}
+</div>
 </div>
 
 {% top %}
@@ -186,10 +243,10 @@ A `Table` is registered in a `TableEnvironment` as follows:
 // get a StreamTableEnvironment, works for BatchTableEnvironment equivalently
 StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
 
-// Table is the result of a simple projection query 
+// table is the result of a simple projection query 
 Table projTable = tableEnv.scan("X").select(...);
 
-// register the Table projTable as table "projectedX"
+// register the Table projTable as table "projectedTable"
 tableEnv.registerTable("projectedTable", projTable);
 {% endhighlight %}
 </div>
@@ -199,13 +256,26 @@ tableEnv.registerTable("projectedTable", projTable);
 // get a TableEnvironment
 val tableEnv = StreamTableEnvironment.create(env)
 
-// Table is the result of a simple projection query 
+// table is the result of a simple projection query 
 val projTable: Table = tableEnv.scan("X").select(...)
 
-// register the Table projTable as table "projectedX"
+// register the Table projTable as table "projectedTable"
 tableEnv.registerTable("projectedTable", projTable)
 {% endhighlight %}
 </div>
+
+<div data-lang="python" markdown="1">
+{% highlight python %}
+# get a TableEnvironment
+table_env = StreamTableEnvironment.create(env)
+
+# table is the result of a simple projection query 
+proj_table = table_env.scan("X").select(...)
+
+# register the Table projTable as table "projectedTable"
+table_env.register_table("projectedTable", proj_table)
+{% endhighlight %}
+</div>
 </div>
 
 **Note:** A registered `Table` is treated similarly to a `VIEW` as known from relational database systems, i.e., the query that defines the `Table` is not optimized but will be inlined when another query references the registered `Table`. If multiple queries reference the same registered `Table`, it will be inlined for each referencing query and executed multiple times, i.e., the result of the registered `Table` will *not* be shared.
@@ -246,6 +316,19 @@ val csvSource: TableSource = new CsvTableSource("/path/to/file", ...)
 tableEnv.registerTableSource("CsvTable", csvSource)
 {% endhighlight %}
 </div>
+
+<div data-lang="python" markdown="1">
+{% highlight python %}
+# get a TableEnvironment
+table_env = StreamTableEnvironment.create(env)
+
+# create a TableSource
+csv_source = CsvTableSource("/path/to/file", ...)
+
+# register the TableSource as table "csvTable"
+table_env.register_table_source("csvTable", csv_source)
+{% endhighlight %}
+</div>
 </div>
 
 {% top %}
@@ -292,6 +375,23 @@ val fieldTypes: Array[TypeInformation[_]] = Array(Types.INT, Types.STRING, Types
 tableEnv.registerTableSink("CsvSinkTable", fieldNames, fieldTypes, csvSink)
 {% endhighlight %}
 </div>
+
+<div data-lang="python" markdown="1">
+{% highlight python %}
+# get a TableEnvironment
+table_env = StreamTableEnvironment.create(env)
+
+# define the field names and types
+field_names = ["a", "b", "c"]
+field_types = [DataTypes.INT(), DataTypes.STRING(), DataTypes.BIGINT()]
+
+# create a TableSink
+csv_sink = CsvTableSink(field_names, field_types, "/path/to/file", ...)
+
+# register the TableSink as table "CsvSinkTable"
+table_env.register_table_sink("CsvSinkTable", csv_sink)
+{% endhighlight %}
+</div>
 </div>
 
 {% top %}
@@ -392,6 +492,26 @@ val revenue = orders
 
 **Note:** The Scala Table API uses Scala Symbols, which start with a single tick (`'`) to reference the attributes of a `Table`. The Table API uses Scala implicits. Make sure to import `org.apache.flink.api.scala._` and `org.apache.flink.table.api.scala._` in order to use Scala implicit conversions.
 </div>
+
+<div data-lang="python" markdown="1">
+{% highlight python %}
+# get a StreamTableEnvironment, works for BatchTableEnvironment equivalently
+table_env = StreamTableEnvironment.create(env)
+
+# register Orders table
+
+# scan registered Orders table
+orders = table_env.scan("Orders")
+# compute revenue for all customers from France
+revenue = orders \
+    .filter("cCountry === 'FRANCE'") \
+    .group_by("cID, cName") \
+    .select("cID, cName, revenue.sum AS revSum")
+
+# emit or convert Table
+# execute query
+{% endhighlight %}
+</div>
 </div>
 
 {% top %}
@@ -445,6 +565,26 @@ val revenue = tableEnv.sqlQuery("""
 {% endhighlight %}
 
 </div>
+
+<div data-lang="python" markdown="1">
+{% highlight python %}
+# get a StreamTableEnvironment, works for BatchTableEnvironment equivalently
+table_env = StreamTableEnvironment.create(env)
+
+# register Orders table
+
+# compute revenue for all customers from France
+revenue = table_env.sql_query(
+    "SELECT cID, cName, SUM(revenue) AS revSum "
+    "FROM Orders "
+    "WHERE cCountry = 'FRANCE' "
+    "GROUP BY cID, cName"
+)
+
+# emit or convert Table
+# execute query
+{% endhighlight %}
+</div>
 </div>
 
 The following example shows how to specify an update query that inserts its result into a registered table.
@@ -492,6 +632,27 @@ tableEnv.sqlUpdate("""
 {% endhighlight %}
 
 </div>
+
+<div data-lang="python" markdown="1">
+{% highlight python %}
+# get a StreamTableEnvironment, works for BatchTableEnvironment equivalently
+table_env = StreamTableEnvironment.create(env)
+
+# register "Orders" table
+# register "RevenueFrance" output table
+
+# compute revenue for all customers from France and emit to "RevenueFrance"
+table_env.sql_update(
+    "INSERT INTO RevenueFrance "
+    "SELECT cID, cName, SUM(revenue) AS revSum "
+    "FROM Orders "
+    "WHERE cCountry = 'FRANCE' "
+    "GROUP BY cID, cName"
+)
+
+# execute query
+{% endhighlight %}
+</div>
 </div>
 
 {% top %}
@@ -563,6 +724,29 @@ result.insertInto("CsvSinkTable")
 // execute the program
 {% endhighlight %}
 </div>
+
+<div data-lang="python" markdown="1">
+{% highlight python %}
+# get a TableEnvironment
+table_env = StreamTableEnvironment.create(env)
+
+field_names = ["a", "b", "c"]
+field_types = [DataTypes.INT(), DataTypes.STRING(), DataTypes.BIGINT()]
+
+# create a TableSink
+sink = CsvTableSink(field_names, field_types, "/path/to/file", "|")
+
+table_env.register_table_sink("CsvSinkTable", sink)
+
+# compute a result Table using Table API operators and/or SQL queries
+result = ...
+
+# emit the result Table to the registered TableSink
+result.insert_into("CsvSinkTable")
+
+# execute the program
+{% endhighlight %}
+</div>
 </div>
 
 {% top %}
@@ -1170,6 +1354,22 @@ val explanation: String = tEnv.explain(table)
 println(explanation)
 {% endhighlight %}
 </div>
+
+<div data-lang="python" markdown="1">
+{% highlight python %}
+env = StreamExecutionEnvironment.get_execution_environment()
+t_env = StreamTableEnvironment.create(env)
+
+table1 = t_env.from_elements([(1, "hello")], ["count", "word"])
+table2 = t_env.from_elements([(1, "hello")], ["count", "word"])
+table = table1 \
+    .where("LIKE(word, 'F%')") \
+    .union_all(table2)
+
+explanation = t_env.explain(table)
+print(explanation)
+{% endhighlight %}
+</div>
 </div>
 
 {% highlight text %}
diff --git a/docs/dev/table/common.zh.md b/docs/dev/table/common.zh.md
index 5ab8664..2de8f74 100644
--- a/docs/dev/table/common.zh.md
+++ b/docs/dev/table/common.zh.md
@@ -80,7 +80,7 @@ tableEnv.registerTableSink("outputTable", ...);
 
 // create a Table from a Table API query
 val tapiResult = tableEnv.scan("table1").select(...)
-// Create a Table from a SQL query
+// create a Table from a SQL query
 val sqlResult  = tableEnv.sqlQuery("SELECT ... FROM table2 ...")
 
 // emit a Table API result Table to a TableSink, same for SQL result
@@ -91,6 +91,35 @@ env.execute()
 
 {% endhighlight %}
 </div>
+
+<div data-lang="python" markdown="1">
+{% highlight python %}
+# for batch programs use ExecutionEnvironment instead of StreamExecutionEnvironment
+env = StreamExecutionEnvironment.get_execution_environment()
+
+# create a TableEnvironment
+table_env = StreamTableEnvironment.create(env)
+
+# register a Table
+table_env.register_table("table1", ...)           # or
+table_env.register_table_source("table2", ...)
+
+# register an output Table
+table_env.register_table_sink("outputTable", ...)
+
+# create a Table from a Table API query
+tapi_result = table_env.scan("table1").select(...)
+# create a Table from a SQL query
+sql_result  = table_env.sql_query("SELECT ... FROM table2 ...")
+
+# emit a Table API result Table to a TableSink, same for SQL result
+tapi_result.insert_into("outputTable")
+
+# execute
+env.execute()
+
+{% endhighlight %}
+</div>
 </div>
 
 **Note:** Table API and SQL queries can be easily integrated with and embedded into DataStream or DataSet programs. Have a look at the [Integration with DataStream and DataSet API](#integration-with-datastream-and-dataset-api) section to learn how DataStreams and DataSets can be converted into Tables and vice versa.
@@ -121,6 +150,7 @@ Make sure to choose the `BatchTableEnvironment`/`StreamTableEnvironment` that ma
 // ***************
 // STREAMING QUERY
 // ***************
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.table.api.java.StreamTableEnvironment;
 
 StreamExecutionEnvironment sEnv = StreamExecutionEnvironment.getExecutionEnvironment();
@@ -130,6 +160,7 @@ StreamTableEnvironment sTableEnv = StreamTableEnvironment.create(sEnv);
 // ***********
 // BATCH QUERY
 // ***********
+import org.apache.flink.api.java.ExecutionEnvironment
 import org.apache.flink.table.api.java.BatchTableEnvironment;
 
 ExecutionEnvironment bEnv = ExecutionEnvironment.getExecutionEnvironment();
@@ -143,6 +174,7 @@ BatchTableEnvironment bTableEnv = BatchTableEnvironment.create(bEnv);
 // ***************
 // STREAMING QUERY
 // ***************
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
 import org.apache.flink.table.api.scala.StreamTableEnvironment
 
 val sEnv = StreamExecutionEnvironment.getExecutionEnvironment
@@ -152,6 +184,7 @@ val sTableEnv = StreamTableEnvironment.create(sEnv)
 // ***********
 // BATCH QUERY
 // ***********
+import org.apache.flink.api.java.ExecutionEnvironment
 import org.apache.flink.table.api.scala.BatchTableEnvironment
 
 val bEnv = ExecutionEnvironment.getExecutionEnvironment
@@ -159,6 +192,30 @@ val bEnv = ExecutionEnvironment.getExecutionEnvironment
 val bTableEnv = BatchTableEnvironment.create(bEnv)
 {% endhighlight %}
 </div>
+
+<div data-lang="python" markdown="1">
+{% highlight python %}
+# ***************
+# STREAMING QUERY
+# ***************
+from pyflink.datastream import StreamExecutionEnvironment
+from pyflink.table import StreamTableEnvironment
+
+s_env = StreamExecutionEnvironment.get_execution_environment()
+# create a TableEnvironment for streaming queries
+s_table_env = StreamTableEnvironment.create(s_env)
+
+# ***********
+# BATCH QUERY
+# ***********
+from pyflink.dataset import ExecutionEnvironment
+from pyflink.table import BatchTableEnvironment
+
+b_env = ExecutionEnvironment.get_execution_environment()
+# create a TableEnvironment for batch queries
+b_table_env = BatchTableEnvironment.create(b_env)
+{% endhighlight %}
+</div>
 </div>
 
 {% top %}
@@ -189,7 +246,7 @@ StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
 // Table is the result of a simple projection query 
 Table projTable = tableEnv.scan("X").select(...);
 
-// register the Table projTable as table "projectedX"
+// register the Table projTable as table "projectedTable"
 tableEnv.registerTable("projectedTable", projTable);
 {% endhighlight %}
 </div>
@@ -202,10 +259,23 @@ val tableEnv = StreamTableEnvironment.create(env)
 // Table is the result of a simple projection query 
 val projTable: Table = tableEnv.scan("X").select(...)
 
-// register the Table projTable as table "projectedX"
+// register the Table projTable as table "projectedTable"
 tableEnv.registerTable("projectedTable", projTable)
 {% endhighlight %}
 </div>
+
+<div data-lang="python" markdown="1">
+{% highlight python %}
+# get a TableEnvironment
+table_env = StreamTableEnvironment.create(env)
+
+# Table is the result of a simple projection query 
+proj_table = table_env.scan("X").select(...)
+
+# register the Table projTable as table "projectedTable"
+table_env.register_table("projectedTable", proj_table)
+{% endhighlight %}
+</div>
 </div>
 
 **Note:** A registered `Table` is treated similarly to a `VIEW` as known from relational database systems, i.e., the query that defines the `Table` is not optimized but will be inlined when another query references the registered `Table`. If multiple queries reference the same registered `Table`, it will be inlined for each referencing query and executed multiple times, i.e., the result of the registered `Table` will *not* be shared.
@@ -246,6 +316,19 @@ val csvSource: TableSource = new CsvTableSource("/path/to/file", ...)
 tableEnv.registerTableSource("CsvTable", csvSource)
 {% endhighlight %}
 </div>
+
+<div data-lang="python" markdown="1">
+{% highlight python %}
+# get a TableEnvironment
+table_env = StreamTableEnvironment.create(env)
+
+# create a TableSource
+csv_source = CsvTableSource("/path/to/file", ...)
+
+# register the TableSource as table "csvTable"
+table_env.register_table_source("csvTable", csv_source)
+{% endhighlight %}
+</div>
 </div>
 
 {% top %}
@@ -292,6 +375,23 @@ val fieldTypes: Array[TypeInformation[_]] = Array(Types.INT, Types.STRING, Types
 tableEnv.registerTableSink("CsvSinkTable", fieldNames, fieldTypes, csvSink)
 {% endhighlight %}
 </div>
+
+<div data-lang="python" markdown="1">
+{% highlight python %}
+# get a TableEnvironment
+table_env = StreamTableEnvironment.create(env)
+
+# define the field names and types
+field_names = ["a", "b", "c"]
+field_types = [DataTypes.INT(), DataTypes.STRING(), DataTypes.BIGINT()]
+
+# create a TableSink
+csv_sink = CsvTableSink(field_names, field_types, "/path/to/file", ...)
+
+# register the TableSink as table "CsvSinkTable"
+table_env.register_table_sink("CsvSinkTable", csv_sink)
+{% endhighlight %}
+</div>
 </div>
 
 {% top %}
@@ -392,6 +492,26 @@ val revenue = orders
 
 **Note:** The Scala Table API uses Scala Symbols, which start with a single tick (`'`) to reference the attributes of a `Table`. The Table API uses Scala implicits. Make sure to import `org.apache.flink.api.scala._` and `org.apache.flink.table.api.scala._` in order to use Scala implicit conversions.
 </div>
+
+<div data-lang="python" markdown="1">
+{% highlight python %}
+# get a StreamTableEnvironment, works for BatchTableEnvironment equivalently
+table_env = StreamTableEnvironment.create(env)
+
+# register Orders table
+
+# scan registered Orders table
+orders = table_env.scan("Orders")
+# compute revenue for all customers from France
+revenue = orders \
+    .filter("cCountry === 'FRANCE'") \
+    .group_by("cID, cName") \
+    .select("cID, cName, revenue.sum AS revSum")
+
+# emit or convert Table
+# execute query
+{% endhighlight %}
+</div>
 </div>
 
 {% top %}
@@ -445,6 +565,26 @@ val revenue = tableEnv.sqlQuery("""
 {% endhighlight %}
 
 </div>
+
+<div data-lang="python" markdown="1">
+{% highlight python %}
+# get a StreamTableEnvironment, works for BatchTableEnvironment equivalently
+table_env = StreamTableEnvironment.create(env)
+
+# register Orders table
+
+# compute revenue for all customers from France
+revenue = table_env.sql_query(
+    "SELECT cID, cName, SUM(revenue) AS revSum "
+    "FROM Orders "
+    "WHERE cCountry = 'FRANCE' "
+    "GROUP BY cID, cName"
+)
+
+# emit or convert Table
+# execute query
+{% endhighlight %}
+</div>
 </div>
 
 The following example shows how to specify an update query that inserts its result into a registered table.
@@ -492,6 +632,27 @@ tableEnv.sqlUpdate("""
 {% endhighlight %}
 
 </div>
+
+<div data-lang="python" markdown="1">
+{% highlight python %}
+# get a StreamTableEnvironment, works for BatchTableEnvironment equivalently
+table_env = StreamTableEnvironment.create(env)
+
+# register "Orders" table
+# register "RevenueFrance" output table
+
+# compute revenue for all customers from France and emit to "RevenueFrance"
+table_env.sql_update(
+    "INSERT INTO RevenueFrance "
+    "SELECT cID, cName, SUM(revenue) AS revSum "
+    "FROM Orders "
+    "WHERE cCountry = 'FRANCE' "
+    "GROUP BY cID, cName"
+)
+
+# execute query
+{% endhighlight %}
+</div>
 </div>
 
 {% top %}
@@ -563,6 +724,29 @@ result.insertInto("CsvSinkTable")
 // execute the program
 {% endhighlight %}
 </div>
+
+<div data-lang="python" markdown="1">
+{% highlight python %}
+# get a TableEnvironment
+table_env = StreamTableEnvironment.create(env)
+
+field_names = ["a", "b", "c"]
+field_types = [DataTypes.INT(), DataTypes.STRING(), DataTypes.BIGINT()]
+
+# create a TableSink
+sink = CsvTableSink(field_names, field_types, "/path/to/file", "|")
+
+table_env.register_table_sink("CsvSinkTable", sink)
+
+# compute a result Table using Table API operators and/or SQL queries
+result = ...
+
+# emit the result Table to the registered TableSink
+result.insert_into("CsvSinkTable")
+
+# execute the program
+{% endhighlight %}
+</div>
 </div>
 
 {% top %}
@@ -1170,6 +1354,22 @@ val explanation: String = tEnv.explain(table)
 println(explanation)
 {% endhighlight %}
 </div>
+
+<div data-lang="python" markdown="1">
+{% highlight python %}
+env = StreamExecutionEnvironment.get_execution_environment()
+t_env = StreamTableEnvironment.create(env)
+
+table1 = t_env.from_elements([(1, "hello")], ["count", "word"])
+table2 = t_env.from_elements([(1, "hello")], ["count", "word"])
+table = table1 \
+    .where("LIKE(word, 'F%')") \
+    .union_all(table2)
+
+explanation = t_env.explain(table)
+print(explanation)
+{% endhighlight %}
+</div>
 </div>
 
 {% highlight text %}
diff --git a/docs/dev/table/connect.md b/docs/dev/table/connect.md
index 98e0891..e365ee6 100644
--- a/docs/dev/table/connect.md
+++ b/docs/dev/table/connect.md
@@ -101,6 +101,17 @@ tableEnvironment
 {% endhighlight %}
 </div>
 
+<div data-lang="python" markdown="1">
+{% highlight python %}
+table_environment \
+    .connect(...) \
+    .with_format(...) \
+    .with_schema(...) \
+    .in_append_mode() \
+    .register_table_source("MyTable")
+{% endhighlight %}
+</div>
+
 <div data-lang="YAML" markdown="1">
 {% highlight yaml %}
 name: MyTable
@@ -170,6 +181,50 @@ tableEnvironment
 {% endhighlight %}
 </div>
 
+<div data-lang="python" markdown="1">
+{% highlight python %}
+table_environment \
+    .connect(  # declare the external system to connect to
+        Kafka()
+        .version("0.10")
+        .topic("test-input")
+        .start_from_earliest()
+        .property("zookeeper.connect", "localhost:2181")
+        .property("bootstrap.servers", "localhost:9092")
+    ) \
+    .with_format(  # declare a format for this system
+        Avro()
+        .avro_schema(
+            "{"
+            "  \"namespace\": \"org.myorganization\","
+            "  \"type\": \"record\","
+            "  \"name\": \"UserMessage\","
+            "    \"fields\": ["
+            "      {\"name\": \"timestamp\", \"type\": \"string\"},"
+            "      {\"name\": \"user\", \"type\": \"long\"},"
+            "      {\"name\": \"message\", \"type\": [\"string\", \"null\"]}"
+            "    ]"
+            "}"
+        )
+    ) \
+    .with_schema(  # declare the schema of the table
+        Schema()
+        .field("rowtime", DataTypes.TIMESTAMP())
+        .rowtime(
+            Rowtime()
+            .timestamps_from_field("timestamp")
+            .watermarks_periodic_bounded(60000)
+        )
+        .field("user", DataTypes.BIGINT())
+        .field("message", DataTypes.STRING())
+    ) \
+    .in_append_mode() \
+    .register_table_source("MyUserTable")  
+    # specify the update-mode for streaming tables and
+    # register as source, sink, or both and under a name
+{% endhighlight %}
+</div>
+
 <div data-lang="YAML" markdown="1">
 {% highlight yaml %}
 tables:
@@ -248,6 +303,17 @@ The following example shows a simple schema without time attributes and one-to-o
 {% endhighlight %}
 </div>
 
+<div data-lang="python" markdown="1">
+{% highlight python %}
+.with_schema(
+    Schema()
+    .field("MyField1", DataTypes.INT())  # required: specify the fields of the table (in this order)
+    .field("MyField2", DataTypes.STRING())
+    .field("MyField3", DataTypes.BOOLEAN())
+)
+{% endhighlight %}
+</div>
+
 <div data-lang="YAML" markdown="1">
 {% highlight yaml %}
 schema:
@@ -278,6 +344,20 @@ For *each field*, the following properties can be declared in addition to the co
 {% endhighlight %}
 </div>
 
+<div data-lang="python" markdown="1">
+{% highlight python %}
+.with_schema(
+    Schema()
+    .field("MyField1", DataTypes.TIMESTAMP())
+    .proctime()  # optional: declares this field as a processing-time attribute
+    .field("MyField2", DataTypes.TIMESTAMP())
+    .rowtime(...)  # optional: declares this field as an event-time attribute
+    .field("MyField3", DataTypes.BOOLEAN())
+    .from_origin_field("mf3")  # optional: original field in the input that is referenced/aliased by this field
+)
+{% endhighlight %}
+</div>
+
 <div data-lang="YAML" markdown="1">
 {% highlight yaml %}
 schema:
@@ -330,6 +410,32 @@ The following timestamp extractors are supported:
 {% endhighlight %}
 </div>
 
+<div data-lang="python" markdown="1">
+{% highlight python %}
+# Converts an existing BIGINT or TIMESTAMP field in the input into the rowtime attribute.
+.rowtime(
+    Rowtime()
+    .timestamps_from_field("ts_field")  # required: original field name in the input
+)
+
+# Converts the assigned timestamps into the rowtime attribute
+# and thus preserves the assigned timestamps from the source.
+# This requires a source that assigns timestamps (e.g., Kafka 0.10+).
+.rowtime(
+    Rowtime()
+    .timestamps_from_source()
+)
+
+# Sets a custom timestamp extractor to be used for the rowtime attribute.
+# The extractor must extend `org.apache.flink.table.sources.tsextractors.TimestampExtractor`.
+# Since the Python API cannot accept a Java object, the fully qualified class name of the extractor is required.
+.rowtime(
+    Rowtime()
+    .timestamps_from_extractor(...)
+)
+{% endhighlight %}
+</div>
+
 <div data-lang="YAML" markdown="1">
 {% highlight yaml %}
 # Converts an existing BIGINT or TIMESTAMP field in the input into the rowtime attribute.
@@ -376,6 +482,32 @@ The following watermark strategies are supported:
 {% endhighlight %}
 </div>
 
+<div data-lang="python" markdown="1">
+{% highlight python %}
+# Sets a watermark strategy for ascending rowtime attributes. Emits a watermark of the maximum
+# observed timestamp so far minus 1. Rows that have a timestamp equal to the max timestamp
+# are not late.
+.rowtime(
+    Rowtime()
+    .watermarks_periodic_ascending()
+)
+
+# Sets a built-in watermark strategy for rowtime attributes which are out-of-order by a bounded time interval.
+# Emits watermarks which are the maximum observed timestamp minus the specified delay.
+.rowtime(
+    Rowtime()
+    .watermarks_periodic_bounded(2000)  # delay in milliseconds
+)
+
+# Sets a built-in watermark strategy which indicates the watermarks should be preserved from the
+# underlying DataStream API and thus preserves the assigned watermarks from the source.
+.rowtime(
+    Rowtime()
+    .watermarks_from_source()
+)
+{% endhighlight %}
+</div>
+
 <div data-lang="YAML" markdown="1">
 {% highlight yaml %}
 # Sets a watermark strategy for ascending rowtime attributes. Emits a watermark of the maximum
@@ -457,6 +589,13 @@ For streaming queries, it is required to declare how to perform the [conversion
 {% endhighlight %}
 </div>
 
+<div data-lang="python" markdown="1">
+{% highlight python %}
+.connect(...) \
+    .in_append_mode()  # otherwise: in_upsert_mode() or in_retract_mode()
+{% endhighlight %}
+</div>
+
 <div data-lang="YAML" markdown="1">
 {% highlight yaml %}
 tables:
@@ -497,6 +636,15 @@ The file system connector allows for reading and writing from a local or distrib
 {% endhighlight %}
 </div>
 
+<div data-lang="python" markdown="1">
+{% highlight python %}
+.connect(
+    FileSystem()
+    .path("file:///path/to/whatever")  # required: path to a file or directory
+)
+{% endhighlight %}
+</div>
+
 <div data-lang="YAML" markdown="1">
 {% highlight yaml %}
 connector:
@@ -548,6 +696,32 @@ The Kafka connector allows for reading and writing from and to an Apache Kafka t
 {% endhighlight %}
 </div>
 
+<div data-lang="python" markdown="1">
+{% highlight python %}
+.connect(
+    Kafka()
+    .version("0.11")  # required: valid connector versions are
+                      # "0.8", "0.9", "0.10", "0.11", and "universal"
+    .topic("...")     # required: topic name from which the table is read
+    
+    # optional: connector specific properties
+    .property("zookeeper.connect", "localhost:2181")
+    .property("bootstrap.servers", "localhost:9092")
+    .property("group.id", "testGroup")
+
+    # optional: select a startup mode for Kafka offsets
+    .start_from_earliest()
+    .start_from_latest()
+    .start_from_specific_offsets(...)
+
+    # optional: output partitioning from Flink's partitions into Kafka's partitions
+    .sink_partitioner_fixed()        # each Flink partition ends up in at-most one Kafka partition (default)
+    .sink_partitioner_round_robin()  # a Flink partition is distributed to Kafka partitions round-robin
+    .sink_partitioner_custom("full.qualified.custom.class.name")  # use a custom FlinkKafkaPartitioner subclass
+)
+{% endhighlight %}
+</div>
+
 <div data-lang="YAML" markdown="1">
 {% highlight yaml %}
 connector:
@@ -648,6 +822,44 @@ The connector can be defined as follows:
 {% endhighlight %}
 </div>
 
+<div data-lang="python" markdown="1">
+{% highlight python %}
+.connect(
+    Elasticsearch()
+    .version("6")                      # required: valid connector versions are "6"
+    .host("localhost", 9200, "http")   # required: one or more Elasticsearch hosts to connect to
+    .index("MyUsers")                  # required: Elasticsearch index
+    .document_type("user")             # required: Elasticsearch document type
+
+    .key_delimiter("$")       # optional: delimiter for composite keys ("_" by default)
+                              #   e.g., "$" would result in IDs "KEY1$KEY2$KEY3"
+    .key_null_literal("n/a")  # optional: representation for null fields in keys ("null" by default)
+
+    # optional: failure handling strategy in case a request to Elasticsearch fails (fail by default)
+    .failure_handler_fail()             # optional: throws an exception if a request fails and causes a job failure
+    .failure_handler_ignore()           #   or ignores failures and drops the request
+    .failure_handler_retry_rejected()   #   or re-adds requests that have failed due to queue capacity saturation
+    .failure_handler_custom(...)        #   or custom failure handling with an ActionRequestFailureHandler subclass
+
+    # optional: configure how to buffer elements before sending them in bulk to the cluster for efficiency
+    .disable_flush_on_checkpoint()      # optional: disables flushing on checkpoint (see notes below!)
+    .bulk_flush_max_actions(42)         # optional: maximum number of actions to buffer for each bulk request
+    .bulk_flush_max_size("42 mb")       # optional: maximum size of buffered actions in bytes per bulk request
+                                        #   (only MB granularity is supported)
+    .bulk_flush_interval(60000)         # optional: bulk flush interval (in milliseconds)
+
+    .bulk_flush_backoff_constant()      # optional: use a constant backoff type
+    .bulk_flush_backoff_exponential()   #   or use an exponential backoff type
+    .bulk_flush_backoff_max_retries(3)  # optional: maximum number of retries
+    .bulk_flush_backoff_delay(30000)    # optional: delay between each backoff attempt (in milliseconds)
+
+    # optional: connection properties to be used during REST communication to Elasticsearch
+    .connection_max_retry_timeout(3)    # optional: maximum timeout (in milliseconds) between retries
+    .connection_path_prefix("/v1")      # optional: prefix string to be added to every REST communication
+)
+{% endhighlight %}
+</div>
+
 <div data-lang="YAML" markdown="1">
 {% highlight yaml %}
 connector:
@@ -754,6 +966,34 @@ The CSV format can be used as follows:
 {% endhighlight %}
 </div>
 
+<div data-lang="python" markdown="1">
+{% highlight python %}
+.with_format(
+    Csv()
+
+    # required: define the schema either by using type information
+    .schema(DataTypes.ROW(...))
+
+    # or use the table's schema
+    .derive_schema()
+
+    .field_delimiter(';')          # optional: field delimiter character (',' by default)
+    .line_delimiter("\r\n")        # optional: line delimiter ("\n" by default;
+                                   #   otherwise "\r" or "\r\n" are allowed)
+    .quote_character('\'')         # optional: quote character for enclosing field values ('"' by default)
+    .allow_comments()              # optional: ignores comment lines that start with '#' (disabled by default);
+                                   #   if enabled, make sure to also ignore parse errors to allow empty rows
+    .ignore_parse_errors()         # optional: skip fields and rows with parse errors instead of failing;
+                                   #   fields are set to null in case of errors
+    .array_element_delimiter("|")  # optional: the array element delimiter string for separating
+                                   #   array and row element values (";" by default)
+    .escape_character('\\')        # optional: escape character for escaping values (disabled by default)
+    .null_literal("n/a")           # optional: null literal string that is interpreted as a
+                                   #   null value (disabled by default)
+)
+{% endhighlight %}
+</div>
+
 <div data-lang="YAML" markdown="1">
 {% highlight yaml %}
 format:
@@ -872,6 +1112,37 @@ The JSON format can be used as follows:
 {% endhighlight %}
 </div>
 
+<div data-lang="python" markdown="1">
+{% highlight python %}
+.with_format(
+    Json()
+    .fail_on_missing_field(True)   # optional: flag whether to fail if a field is missing or not, False by default
+
+    # required: define the schema either by using type information which parses numbers to corresponding types
+    .schema(DataTypes.ROW(...))
+
+    # or by using a JSON schema which parses to DECIMAL and TIMESTAMP
+    .json_schema(
+        "{"
+        "  type: 'object',"
+        "  properties: {"
+        "    lon: {"
+        "      type: 'number'"
+        "    },"
+        "    rideTime: {"
+        "      type: 'string',"
+        "      format: 'date-time'"
+        "    }"
+        "  }"
+        "}"
+    )
+
+    # or use the table's schema
+    .derive_schema()
+)
+{% endhighlight %}
+</div>
+
 <div data-lang="YAML" markdown="1">
 {% highlight yaml %}
 format:
@@ -1004,6 +1275,29 @@ The Avro format can be used as follows:
 {% endhighlight %}
 </div>
 
+<div data-lang="python" markdown="1">
+{% highlight python %}
+.with_format(
+    Avro()
+
+    # required: define the schema either by using an Avro specific record class
+    .record_class("full.qualified.user.class.name")
+
+    # or by using an Avro schema
+    .avro_schema(
+        "{"
+        "  \"type\": \"record\","
+        "  \"name\": \"test\","
+        "  \"fields\" : ["
+        "    {\"name\": \"a\", \"type\": \"long\"},"
+        "    {\"name\": \"b\", \"type\": \"string\"}"
+        "  ]"
+        "}"
+    )
+)
+{% endhighlight %}
+</div>
+
 <div data-lang="YAML" markdown="1">
 {% highlight yaml %}
 format:
@@ -1083,6 +1377,22 @@ Use the old one for stream/batch filesystem operations for now.
 {% endhighlight %}
 </div>
 
+<div data-lang="python" markdown="1">
+{% highlight python %}
+.with_format(
+    OldCsv()
+    .field("field1", DataTypes.STRING())    # required: ordered format fields
+    .field("field2", DataTypes.TIMESTAMP())
+    .field_delimiter(",")                   # optional: string delimiter "," by default
+    .line_delimiter("\n")                   # optional: string delimiter "\n" by default
+    .quote_character('"')                   # optional: single character for string values, empty by default
+    .comment_prefix('#')                    # optional: string to indicate comments, empty by default
+    .ignore_first_line()                    # optional: ignore the first line, by default it is not skipped
+    .ignore_parse_errors()                  # optional: skip records with parse error instead of failing by default
+)
+{% endhighlight %}
+</div>
+
 <div data-lang="YAML" markdown="1">
 {% highlight yaml %}
 format:
@@ -1221,6 +1531,31 @@ val table: Table = ???
 table.insertInto("csvOutputTable")
 {% endhighlight %}
 </div>
+
+<div data-lang="python" markdown="1">
+{% highlight python %}
+
+field_names = ["f0", "f1"]
+field_types = [DataTypes.STRING(), DataTypes.INT()]
+
+sink = CsvTableSink(
+    field_names,
+    field_types,
+    path,                 # output path
+    "|",                  # optional: delimit files by '|'
+    1,                    # optional: write to a single file
+    WriteMode.OVERWRITE  # optional: override existing files
+)
+
+table_env.register_table_sink(
+    "csvOutputTable",
+    sink
+)
+
+table = ...
+table.insert_into("csvOutputTable")
+{% endhighlight %}
+</div>
 </div>
 
 ### JDBCAppendTableSink
diff --git a/docs/dev/table/connect.zh.md b/docs/dev/table/connect.zh.md
index 30dd05e..a685f72 100644
--- a/docs/dev/table/connect.zh.md
+++ b/docs/dev/table/connect.zh.md
@@ -101,6 +101,17 @@ tableEnvironment
 {% endhighlight %}
 </div>
 
+<div data-lang="python" markdown="1">
+{% highlight python %}
+table_environment \
+    .connect(...) \
+    .with_format(...) \
+    .with_schema(...) \
+    .in_append_mode() \
+    .register_table_source("MyTable")
+{% endhighlight %}
+</div>
+
 <div data-lang="YAML" markdown="1">
 {% highlight yaml %}
 name: MyTable
@@ -170,6 +181,50 @@ tableEnvironment
 {% endhighlight %}
 </div>
 
+<div data-lang="python" markdown="1">
+{% highlight python %}
+table_environment \
+    .connect(  # declare the external system to connect to
+        Kafka()
+        .version("0.10")
+        .topic("test-input")
+        .start_from_earliest()
+        .property("zookeeper.connect", "localhost:2181")
+        .property("bootstrap.servers", "localhost:9092")
+    ) \
+    .with_format(  # declare a format for this system
+        Avro()
+        .avro_schema(
+            "{"
+            "  \"namespace\": \"org.myorganization\","
+            "  \"type\": \"record\","
+            "  \"name\": \"UserMessage\","
+            "    \"fields\": ["
+            "      {\"name\": \"timestamp\", \"type\": \"string\"},"
+            "      {\"name\": \"user\", \"type\": \"long\"},"
+            "      {\"name\": \"message\", \"type\": [\"string\", \"null\"]}"
+            "    ]"
+            "}"
+        )
+    ) \
+    .with_schema(  # declare the schema of the table
+        Schema()
+        .field("rowtime", DataTypes.TIMESTAMP())
+        .rowtime(
+            Rowtime()
+            .timestamps_from_field("timestamp")
+            .watermarks_periodic_bounded(60000)
+        )
+        .field("user", DataTypes.BIGINT())
+        .field("message", DataTypes.STRING())
+    ) \
+    .in_append_mode() \
+    .register_table_source("MyUserTable")  
+    # specify the update-mode for streaming tables and
+    # register as source, sink, or both and under a name
+{% endhighlight %}
+</div>
+
 <div data-lang="YAML" markdown="1">
 {% highlight yaml %}
 tables:
@@ -248,6 +303,17 @@ The following example shows a simple schema without time attributes and one-to-o
 {% endhighlight %}
 </div>
 
+<div data-lang="python" markdown="1">
+{% highlight python %}
+.with_schema(
+    Schema()
+    .field("MyField1", DataTypes.INT())  # required: specify the fields of the table (in this order)
+    .field("MyField2", DataTypes.STRING())
+    .field("MyField3", DataTypes.BOOLEAN())
+)
+{% endhighlight %}
+</div>
+
 <div data-lang="YAML" markdown="1">
 {% highlight yaml %}
 schema:
@@ -278,6 +344,20 @@ For *each field*, the following properties can be declared in addition to the co
 {% endhighlight %}
 </div>
 
+<div data-lang="python" markdown="1">
+{% highlight python %}
+.with_schema(
+    Schema()
+    .field("MyField1", DataTypes.TIMESTAMP())
+    .proctime()  # optional: declares this field as a processing-time attribute
+    .field("MyField2", DataTypes.TIMESTAMP())
+    .rowtime(...)  # optional: declares this field as an event-time attribute
+    .field("MyField3", DataTypes.BOOLEAN())
+    .from_origin_field("mf3")  # optional: original field in the input that is referenced/aliased by this field
+)
+{% endhighlight %}
+</div>
+
 <div data-lang="YAML" markdown="1">
 {% highlight yaml %}
 schema:
@@ -330,6 +410,32 @@ The following timestamp extractors are supported:
 {% endhighlight %}
 </div>
 
+<div data-lang="python" markdown="1">
+{% highlight python %}
+# Converts an existing BIGINT or TIMESTAMP field in the input into the rowtime attribute.
+.rowtime(
+    Rowtime()
+    .timestamps_from_field("ts_field")  # required: original field name in the input
+)
+
+# Converts the assigned timestamps into the rowtime attribute
+# and thus preserves the assigned timestamps from the source.
+# This requires a source that assigns timestamps (e.g., Kafka 0.10+).
+.rowtime(
+    Rowtime()
+    .timestamps_from_source()
+)
+
+# Sets a custom timestamp extractor to be used for the rowtime attribute.
+# The extractor must extend `org.apache.flink.table.sources.tsextractors.TimestampExtractor`.
+# Since the Python API cannot accept a Java object, the fully qualified class name of the extractor is required.
+.rowtime(
+    Rowtime()
+    .timestamps_from_extractor(...)
+)
+{% endhighlight %}
+</div>
+
 <div data-lang="YAML" markdown="1">
 {% highlight yaml %}
 # Converts an existing BIGINT or TIMESTAMP field in the input into the rowtime attribute.
@@ -376,6 +482,32 @@ The following watermark strategies are supported:
 {% endhighlight %}
 </div>
 
+<div data-lang="python" markdown="1">
+{% highlight python %}
+# Sets a watermark strategy for ascending rowtime attributes. Emits a watermark of the maximum
+# observed timestamp so far minus 1. Rows that have a timestamp equal to the max timestamp
+# are not late.
+.rowtime(
+    Rowtime()
+    .watermarks_periodic_ascending()
+)
+
+# Sets a built-in watermark strategy for rowtime attributes which are out-of-order by a bounded time interval.
+# Emits watermarks which are the maximum observed timestamp minus the specified delay.
+.rowtime(
+    Rowtime()
+    .watermarks_periodic_bounded(2000)  # delay in milliseconds
+)
+
+# Sets a built-in watermark strategy which indicates the watermarks should be preserved from the
+# underlying DataStream API and thus preserves the assigned watermarks from the source.
+.rowtime(
+    Rowtime()
+    .watermarks_from_source()
+)
+{% endhighlight %}
+</div>
+
 <div data-lang="YAML" markdown="1">
 {% highlight yaml %}
 # Sets a watermark strategy for ascending rowtime attributes. Emits a watermark of the maximum
@@ -457,6 +589,13 @@ For streaming queries, it is required to declare how to perform the [conversion
 {% endhighlight %}
 </div>
 
+<div data-lang="python" markdown="1">
+{% highlight python %}
+.connect(...) \
+    .in_append_mode()  # otherwise: in_upsert_mode() or in_retract_mode()
+{% endhighlight %}
+</div>
+
 <div data-lang="YAML" markdown="1">
 {% highlight yaml %}
 tables:
@@ -497,6 +636,15 @@ The file system connector allows for reading and writing from a local or distrib
 {% endhighlight %}
 </div>
 
+<div data-lang="python" markdown="1">
+{% highlight python %}
+.connect(
+    FileSystem()
+    .path("file:///path/to/whatever")  # required: path to a file or directory
+)
+{% endhighlight %}
+</div>
+
 <div data-lang="YAML" markdown="1">
 {% highlight yaml %}
 connector:
@@ -548,6 +696,32 @@ The Kafka connector allows for reading and writing from and to an Apache Kafka t
 {% endhighlight %}
 </div>
 
+<div data-lang="python" markdown="1">
+{% highlight python %}
+.connect(
+    Kafka()
+    .version("0.11")  # required: valid connector versions are
+                      # "0.8", "0.9", "0.10", "0.11", and "universal"
+    .topic("...")     # required: topic name from which the table is read
+    
+    # optional: connector specific properties
+    .property("zookeeper.connect", "localhost:2181")
+    .property("bootstrap.servers", "localhost:9092")
+    .property("group.id", "testGroup")
+
+    # optional: select a startup mode for Kafka offsets
+    .start_from_earliest()
+    .start_from_latest()
+    .start_from_specific_offsets(...)
+
+    # optional: output partitioning from Flink's partitions into Kafka's partitions
+    .sink_partitioner_fixed()        # each Flink partition ends up in at-most one Kafka partition (default)
+    .sink_partitioner_round_robin()  # a Flink partition is distributed to Kafka partitions round-robin
+    .sink_partitioner_custom("full.qualified.custom.class.name")  # use a custom FlinkKafkaPartitioner subclass
+)
+{% endhighlight %}
+</div>
+
 <div data-lang="YAML" markdown="1">
 {% highlight yaml %}
 connector:
@@ -648,6 +822,44 @@ The connector can be defined as follows:
 {% endhighlight %}
 </div>
 
+<div data-lang="python" markdown="1">
+{% highlight python %}
+.connect(
+    Elasticsearch()
+    .version("6")                      # required: valid connector versions are "6"
+    .host("localhost", 9200, "http")   # required: one or more Elasticsearch hosts to connect to
+    .index("MyUsers")                  # required: Elasticsearch index
+    .document_type("user")             # required: Elasticsearch document type
+
+    .key_delimiter("$")       # optional: delimiter for composite keys ("_" by default)
+                              #   e.g., "$" would result in IDs "KEY1$KEY2$KEY3"
+    .key_null_literal("n/a")  # optional: representation for null fields in keys ("null" by default)
+
+    # optional: failure handling strategy in case a request to Elasticsearch fails (fail by default)
+    .failure_handler_fail()             # optional: throws an exception if a request fails and causes a job failure
+    .failure_handler_ignore()           #   or ignores failures and drops the request
+    .failure_handler_retry_rejected()   #   or re-adds requests that have failed due to queue capacity saturation
+    .failure_handler_custom(...)        #   or custom failure handling with an ActionRequestFailureHandler subclass
+
+    # optional: configure how to buffer elements before sending them in bulk to the cluster for efficiency
+    .disable_flush_on_checkpoint()      # optional: disables flushing on checkpoint (see notes below!)
+    .bulk_flush_max_actions(42)         # optional: maximum number of actions to buffer for each bulk request
+    .bulk_flush_max_size("42 mb")       # optional: maximum size of buffered actions in bytes per bulk request
+                                        #   (only MB granularity is supported)
+    .bulk_flush_interval(60000)         # optional: bulk flush interval (in milliseconds)
+
+    .bulk_flush_backoff_constant()      # optional: use a constant backoff type
+    .bulk_flush_backoff_exponential()   #   or use an exponential backoff type
+    .bulk_flush_backoff_max_retries(3)  # optional: maximum number of retries
+    .bulk_flush_backoff_delay(30000)    # optional: delay between each backoff attempt (in milliseconds)
+
+    # optional: connection properties to be used during REST communication to Elasticsearch
+    .connection_max_retry_timeout(3)    # optional: maximum timeout (in milliseconds) between retries
+    .connection_path_prefix("/v1")      # optional: prefix string to be added to every REST communication
+)
+{% endhighlight %}
+</div>
+
 <div data-lang="YAML" markdown="1">
 {% highlight yaml %}
 connector:
@@ -754,6 +966,34 @@ The CSV format can be used as follows:
 {% endhighlight %}
 </div>
 
+<div data-lang="python" markdown="1">
+{% highlight python %}
+.with_format(
+    Csv()
+
+    # required: define the schema either by using type information
+    .schema(DataTypes.ROW(...))
+
+    # or use the table's schema
+    .derive_schema()
+
+    .field_delimiter(";")          # optional: field delimiter character ("," by default)
+    .line_delimiter("\r\n")        # optional: line delimiter ("\n" by default;
+                                   #   otherwise "\r" or "\r\n" are allowed)
+    .quote_character("'")          # optional: quote character for enclosing field values ('"' by default)
+    .allow_comments()              # optional: ignores comment lines that start with "#" (disabled by default);
+                                   #   if enabled, make sure to also ignore parse errors to allow empty rows
+    .ignore_parse_errors()         # optional: skip fields and rows with parse errors instead of failing;
+                                   #   fields are set to null in case of errors
+    .array_element_delimiter("|")  # optional: the array element delimiter string for separating
+                                   #   array and row element values (";" by default)
+    .escape_character("\\")        # optional: escape character for escaping values (disabled by default)
+    .null_literal("n/a")           # optional: null literal string that is interpreted as a
+                                   #   null value (disabled by default)
+)
+{% endhighlight %}
+</div>
+
 <div data-lang="YAML" markdown="1">
 {% highlight yaml %}
 format:
@@ -872,6 +1112,37 @@ The JSON format can be used as follows:
 {% endhighlight %}
 </div>
 
+<div data-lang="python" markdown="1">
+{% highlight python %}
+.with_format(
+    Json()
+    .fail_on_missing_field(True)   # optional: flag whether to fail if a field is missing or not, False by default
+
+    # required: define the schema either by using type information which parses numbers to corresponding types
+    .schema(DataTypes.ROW(...))
+
+    # or by using a JSON schema which parses to DECIMAL and TIMESTAMP
+    .json_schema(
+        "{"
+        "  type: 'object',"
+        "  properties: {"
+        "    lon: {"
+        "      type: 'number'"
+        "    },"
+        "    rideTime: {"
+        "      type: 'string',"
+        "      format: 'date-time'"
+        "    }"
+        "  }"
+        "}"
+    )
+
+    # or use the table's schema
+    .derive_schema()
+)
+{% endhighlight %}
+</div>
+
 <div data-lang="YAML" markdown="1">
 {% highlight yaml %}
 format:
@@ -1004,6 +1275,29 @@ The Avro format can be used as follows:
 {% endhighlight %}
 </div>
 
+<div data-lang="python" markdown="1">
+{% highlight python %}
+.with_format(
+    Avro()
+
+    # required: define the schema either by using an Avro specific record class
+    .record_class("full.qualified.user.class.name")
+
+    # or by using an Avro schema
+    .avro_schema(
+        "{"
+        "  \"type\": \"record\","
+        "  \"name\": \"test\","
+        "  \"fields\" : ["
+        "    {\"name\": \"a\", \"type\": \"long\"},"
+        "    {\"name\": \"b\", \"type\": \"string\"}"
+        "  ]"
+        "}"
+    )
+)
+{% endhighlight %}
+</div>
+
 <div data-lang="YAML" markdown="1">
 {% highlight yaml %}
 format:
@@ -1083,6 +1377,22 @@ Use the old one for stream/batch filesystem operations for now.
 {% endhighlight %}
 </div>
 
+<div data-lang="python" markdown="1">
+{% highlight python %}
+.with_format(
+    OldCsv()
+    .field("field1", DataTypes.STRING())    # required: ordered format fields
+    .field("field2", DataTypes.TIMESTAMP())
+    .field_delimiter(",")                   # optional: string delimiter "," by default
+    .line_delimiter("\n")                   # optional: string delimiter "\n" by default
+    .quote_character('"')                   # optional: single character for string values, empty by default
+    .comment_prefix('#')                    # optional: string to indicate comments, empty by default
+    .ignore_first_line()                    # optional: ignore the first line, by default it is not skipped
+    .ignore_parse_errors()                  # optional: skip records with parse errors instead of failing (disabled by default)
+)
+{% endhighlight %}
+</div>
+
 <div data-lang="YAML" markdown="1">
 {% highlight yaml %}
 format:
@@ -1221,6 +1531,32 @@ val table: Table = ???
 table.insertInto("csvOutputTable")
 {% endhighlight %}
 </div>
+
+
+<div data-lang="python" markdown="1">
+{% highlight python %}
+
+field_names = ["f0", "f1"]
+field_types = [DataTypes.STRING(), DataTypes.INT()]
+
+sink = CsvTableSink(
+    field_names,
+    field_types,
+    path,                 # output path
+    "|",                  # optional: delimit files by '|'
+    1,                    # optional: write to a single file
+    WriteMode.OVERWRITE   # optional: overwrite existing files
+)
+
+table_env.register_table_sink(
+    "csvOutputTable",
+    sink
+)
+
+table = ...
+table.insert_into("csvOutputTable")
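+
+# note: as with other Table API programs, the files are only written once the
+# program is executed, e.g. via the execution environment's execute() method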
+{% endhighlight %}
+</div>
 </div>
 
 ### JDBCAppendTableSink
diff --git a/docs/dev/table/functions.md b/docs/dev/table/functions.md
index 3179838..0939a9a 100644
--- a/docs/dev/table/functions.md
+++ b/docs/dev/table/functions.md
@@ -316,7 +316,7 @@ value NOT IN (sub-query)
 </table>
 </div>
 
-<div data-lang="java" markdown="1">
+<div data-lang="Java/Python" markdown="1">
 <table class="table table-bordered">
   <thead>
     <tr>
@@ -429,7 +429,7 @@ STRING1.like(STRING2)
     <tr>
       <td>
         {% highlight java %}
-STRING.similar(STRING)
+STRING1.similar(STRING2)
 {% endhighlight %}
       </td>
       <td>
@@ -796,7 +796,7 @@ boolean IS NOT UNKNOWN
 </table>
 </div>
 
-<div data-lang="java" markdown="1">
+<div data-lang="Java/Python" markdown="1">
 <table class="table table-bordered">
   <thead>
     <tr>
@@ -1471,7 +1471,7 @@ TRUNCATE(numeric1, integer2)
 </table>
 </div>
 
-<div data-lang="java" markdown="1">
+<div data-lang="Java/Python" markdown="1">
 <table class="table table-bordered">
   <thead>
     <tr>
@@ -2693,7 +2693,7 @@ TO_BASE64(string)
 </table>
 </div>
 
-<div data-lang="java" markdown="1">
+<div data-lang="Java/Python" markdown="1">
 
 <table class="table table-bordered">
   <thead>
@@ -3545,7 +3545,7 @@ TIMESTAMPDIFF(timepointunit, timepoint1, timepoint2)
 </table>
 </div>
 
-<div data-lang="java" markdown="1">
+<div data-lang="Java/Python" markdown="1">
 <table class="table table-bordered">
   <thead>
     <tr>
@@ -4185,7 +4185,7 @@ COALESCE(value1, value2 [, value3 ]* )
 </table>
 </div>
 
-<div data-lang="java" markdown="1">
+<div data-lang="Java/Python" markdown="1">
 <table class="table table-bordered">
   <thead>
     <tr>
@@ -4267,7 +4267,7 @@ CAST(value AS type)
 </table>
 </div>
 
-<div data-lang="java" markdown="1">
+<div data-lang="Java/Python" markdown="1">
 <table class="table table-bordered">
   <thead>
     <tr>
@@ -4393,7 +4393,7 @@ map ‘[’ value ‘]’
 </table>
 </div>
 
-<div data-lang="java" markdown="1">
+<div data-lang="Java/Python" markdown="1">
 <table class="table table-bordered">
   <thead>
     <tr>
@@ -4581,7 +4581,7 @@ MAP ‘[’ value1, value2 [, value3, value4 ]* ‘]’
 </table>
 
 </div>
-<div data-lang="java" markdown="1">
+<div data-lang="Java/Python" markdown="1">
 <table class="table table-bordered">
   <thead>
     <tr>
@@ -4738,7 +4738,7 @@ tableName.compositeType.*
 </table>
 </div>
 
-<div data-lang="java" markdown="1">
+<div data-lang="Java/Python" markdown="1">
 <table class="table table-bordered">
   <thead>
     <tr>
@@ -4856,7 +4856,7 @@ GROUPING_ID(expression1 [, expression2]* )
 </table>
 </div>
 
-<div data-lang="Java" markdown="1">
+<div data-lang="Java/Python" markdown="1">
 <table class="table table-bordered">
   <thead>
     <tr>
@@ -4977,7 +4977,7 @@ SHA2(string, hashLength)
 </table>
 </div>
 
-<div data-lang="java" markdown="1">
+<div data-lang="Java/Python" markdown="1">
 <table class="table table-bordered">
   <thead>
     <tr>
@@ -5180,7 +5180,7 @@ STRING.sha2(INT)
 </table>
 </div>
 
-<div data-lang="java" markdown="1">
+<div data-lang="Java/Python" markdown="1">
 <table class="table table-bordered">
   <thead>
     <tr>
@@ -5372,7 +5372,7 @@ COLLECT([ ALL | DISTINCT ] expression)
 
 </div>
 
-<div data-lang="java" markdown="1">
+<div data-lang="Java/Python" markdown="1">
 <table class="table table-bordered">
   <thead>
     <tr>
@@ -5832,7 +5832,7 @@ The usage of the column function is illustrated in the following table. (Suppose
 
 
 <div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
+<div data-lang="Java/Python" markdown="1">
 
 <table class="table table-bordered">
   <thead>
@@ -6077,6 +6077,14 @@ table
    .select(withColumns('a to 'b), myUDAgg(myUDF(withColumns(5 to 20))))
 {% endhighlight %}
 </div>
+
+<div data-lang="python" markdown="1">
+{% highlight python %}
+table \
+    .group_by("withColumns(1 to 3)") \
+    .select("withColumns(a to b), myUDAgg(myUDF(withColumns(5 to 20)))")
+{% endhighlight %}
+</div>
 </div>
 
 <span class="label label-info">Note</span> Column functions are only used in Table API.
diff --git a/docs/dev/table/functions.zh.md b/docs/dev/table/functions.zh.md
index aaba9ca..76a03e0 100644
--- a/docs/dev/table/functions.zh.md
+++ b/docs/dev/table/functions.zh.md
@@ -316,7 +316,7 @@ value NOT IN (sub-query)
 </table>
 </div>
 
-<div data-lang="java" markdown="1">
+<div data-lang="Java/Python" markdown="1">
 <table class="table table-bordered">
   <thead>
     <tr>
@@ -429,7 +429,7 @@ STRING1.like(STRING2)
     <tr>
       <td>
         {% highlight java %}
-STRING.similar(STRING)
+STRING1.similar(STRING2)
 {% endhighlight %}
       </td>
       <td>
@@ -796,7 +796,7 @@ boolean IS NOT UNKNOWN
 </table>
 </div>
 
-<div data-lang="java" markdown="1">
+<div data-lang="Java/Python" markdown="1">
 <table class="table table-bordered">
   <thead>
     <tr>
@@ -1471,7 +1471,7 @@ TRUNCATE(numeric1, integer2)
 </table>
 </div>
 
-<div data-lang="java" markdown="1">
+<div data-lang="Java/Python" markdown="1">
 <table class="table table-bordered">
   <thead>
     <tr>
@@ -2693,7 +2693,7 @@ TO_BASE64(string)
 </table>
 </div>
 
-<div data-lang="java" markdown="1">
+<div data-lang="Java/Python" markdown="1">
 
 <table class="table table-bordered">
   <thead>
@@ -3545,7 +3545,7 @@ TIMESTAMPDIFF(timepointunit, timepoint1, timepoint2)
 </table>
 </div>
 
-<div data-lang="java" markdown="1">
+<div data-lang="Java/Python" markdown="1">
 <table class="table table-bordered">
   <thead>
     <tr>
@@ -4185,7 +4185,7 @@ COALESCE(value1, value2 [, value3 ]* )
 </table>
 </div>
 
-<div data-lang="java" markdown="1">
+<div data-lang="Java/Python" markdown="1">
 <table class="table table-bordered">
   <thead>
     <tr>
@@ -4267,7 +4267,7 @@ CAST(value AS type)
 </table>
 </div>
 
-<div data-lang="java" markdown="1">
+<div data-lang="Java/Python" markdown="1">
 <table class="table table-bordered">
   <thead>
     <tr>
@@ -4393,7 +4393,7 @@ map ‘[’ value ‘]’
 </table>
 </div>
 
-<div data-lang="java" markdown="1">
+<div data-lang="Java/Python" markdown="1">
 <table class="table table-bordered">
   <thead>
     <tr>
@@ -4581,7 +4581,7 @@ MAP ‘[’ value1, value2 [, value3, value4 ]* ‘]’
 </table>
 
 </div>
-<div data-lang="java" markdown="1">
+<div data-lang="Java/Python" markdown="1">
 <table class="table table-bordered">
   <thead>
     <tr>
@@ -4738,7 +4738,7 @@ tableName.compositeType.*
 </table>
 </div>
 
-<div data-lang="java" markdown="1">
+<div data-lang="Java/Python" markdown="1">
 <table class="table table-bordered">
   <thead>
     <tr>
@@ -4856,7 +4856,7 @@ GROUPING_ID(expression1 [, expression2]* )
 </table>
 </div>
 
-<div data-lang="Java" markdown="1">
+<div data-lang="Java/Python" markdown="1">
 <table class="table table-bordered">
   <thead>
     <tr>
@@ -4977,7 +4977,7 @@ SHA2(string, hashLength)
 </table>
 </div>
 
-<div data-lang="java" markdown="1">
+<div data-lang="Java/Python" markdown="1">
 <table class="table table-bordered">
   <thead>
     <tr>
@@ -5180,7 +5180,7 @@ STRING.sha2(INT)
 </table>
 </div>
 
-<div data-lang="java" markdown="1">
+<div data-lang="Java/Python" markdown="1">
 <table class="table table-bordered">
   <thead>
     <tr>
@@ -5372,7 +5372,7 @@ COLLECT([ ALL | DISTINCT ] expression)
 
 </div>
 
-<div data-lang="java" markdown="1">
+<div data-lang="Java/Python" markdown="1">
 <table class="table table-bordered">
   <thead>
     <tr>
@@ -5832,7 +5832,7 @@ The usage of the column function is illustrated in the following table. (Suppose
 
 
 <div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
+<div data-lang="Java/Python" markdown="1">
 
 <table class="table table-bordered">
   <thead>
@@ -6077,6 +6077,14 @@ table
    .select(withColumns('a to 'b), myUDAgg(myUDF(withColumns(5 to 20))))
 {% endhighlight %}
 </div>
+
+<div data-lang="python" markdown="1">
+{% highlight python %}
+table \
+    .group_by("withColumns(1 to 3)") \
+    .select("withColumns(a to b), myUDAgg(myUDF(withColumns(5 to 20)))")
+{% endhighlight %}
+</div>
 </div>
 
 <span class="label label-info">Note</span> Column functions are only used in Table API.
diff --git a/docs/dev/table/sql.md b/docs/dev/table/sql.md
index 18ade21..f297409 100644
--- a/docs/dev/table/sql.md
+++ b/docs/dev/table/sql.md
@@ -102,6 +102,30 @@ tableEnv.sqlUpdate(
   "INSERT INTO RubberOrders SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%'")
 {% endhighlight %}
 </div>
+
+<div data-lang="python" markdown="1">
+{% highlight python %}
+env = StreamExecutionEnvironment.get_execution_environment()
+table_env = StreamTableEnvironment.create(env)
+
+# SQL query with an inlined (unregistered) table
+# element data types: BIGINT, STRING, BIGINT
+table = table_env.from_elements(..., ['user', 'product', 'amount'])
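+# note: "%%" is an escaped "%" because the query string is formatted with Python's "%" operator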
+result = table_env \
+    .sql_query("SELECT SUM(amount) FROM %s WHERE product LIKE '%%Rubber%%'" % table)
+
+# SQL update with a registered table
+# create and register a TableSink
+table_env.register_table("Orders", table)
+field_names = ["product", "amount"]
+field_types = [DataTypes.STRING(), DataTypes.BIGINT()]
+csv_sink = CsvTableSink(field_names, field_types, "/path/to/file", ...)
+table_env.register_table_sink("RubberOrders", csv_sink)
+# run a SQL update query on the Table and emit the result to the TableSink
+table_env \
+    .sql_update("INSERT INTO RubberOrders SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%'")
+{% endhighlight %}
+</div>
 </div>
 
 {% top %}
diff --git a/docs/dev/table/sql.zh.md b/docs/dev/table/sql.zh.md
index 6f16c9c..60c4571 100644
--- a/docs/dev/table/sql.zh.md
+++ b/docs/dev/table/sql.zh.md
@@ -102,6 +102,31 @@ tableEnv.sqlUpdate(
   "INSERT INTO RubberOrders SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%'")
 {% endhighlight %}
 </div>
+
+
+<div data-lang="python" markdown="1">
+{% highlight python %}
+env = StreamExecutionEnvironment.get_execution_environment()
+table_env = StreamTableEnvironment.create(env)
+
+# SQL query with an inlined (unregistered) table
+# element data types: BIGINT, STRING, BIGINT
+table = table_env.from_elements(..., ['user', 'product', 'amount'])
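+# note: "%%" is an escaped "%" because the query string is formatted with Python's "%" operator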
+result = table_env \
+    .sql_query("SELECT SUM(amount) FROM %s WHERE product LIKE '%%Rubber%%'" % table)
+
+# SQL update with a registered table
+# create and register a TableSink
+table_env.register_table("Orders", table)
+field_names = ["product", "amount"]
+field_types = [DataTypes.STRING(), DataTypes.BIGINT()]
+csv_sink = CsvTableSink(field_names, field_types, "/path/to/file", ...)
+table_env.register_table_sink("RubberOrders", csv_sink)
+# run a SQL update query on the Table and emit the result to the TableSink
+table_env \
+    .sql_update("INSERT INTO RubberOrders SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%'")
+{% endhighlight %}
+</div>
 </div>
 
 {% top %}
diff --git a/docs/dev/table/streaming/query_configuration.md b/docs/dev/table/streaming/query_configuration.md
index 58ee506..1e0527d 100644
--- a/docs/dev/table/streaming/query_configuration.md
+++ b/docs/dev/table/streaming/query_configuration.md
@@ -89,6 +89,31 @@ val stream: DataStream[Row] = result.toAppendStream[Row](qConfig)
 
 {% endhighlight %}
 </div>
+<div data-lang="python" markdown="1">
+{% highlight python %}
+env = StreamExecutionEnvironment.get_execution_environment()
+table_env = StreamTableEnvironment.create(env)
+
+# create a query configuration
+q_config = StreamQueryConfig()
+# set query parameters
+q_config.with_idle_state_retention_time(timedelta(hours=12), timedelta(hours=24))
+
+# define query
+result = ...
+
+# create TableSink
+sink = ...
+
+# register TableSink
+table_env.register_table_sink("outputTable",  # table name
+                              sink)  # table sink
+
+# emit result Table via a TableSink
+result.insert_into("outputTable", q_config)
+
+{% endhighlight %}
+</div>
 </div>
 
 In the following we describe the parameters of the `QueryConfig` and how they affect the accuracy and resource consumption of a query.
@@ -137,6 +162,16 @@ qConfig.withIdleStateRetentionTime(Time.hours(12), Time.hours(24))
 
 {% endhighlight %}
 </div>
+<div data-lang="python" markdown="1">
+{% highlight python %}
+
+q_config = ...  # type: StreamQueryConfig
+
+# set idle state retention time: min = 12 hours, max = 24 hours
+q_config.with_idle_state_retention_time(timedelta(hours=12), timedelta(hours=24))
+
+{% endhighlight %}
+</div>
 </div>
 
 Cleaning up state requires additional bookkeeping which becomes less expensive for larger differences of `minTime` and `maxTime`. The difference between `minTime` and `maxTime` must be at least 5 minutes.
diff --git a/docs/dev/table/streaming/query_configuration.zh.md b/docs/dev/table/streaming/query_configuration.zh.md
index 58ee506..1e0527d 100644
--- a/docs/dev/table/streaming/query_configuration.zh.md
+++ b/docs/dev/table/streaming/query_configuration.zh.md
@@ -89,6 +89,31 @@ val stream: DataStream[Row] = result.toAppendStream[Row](qConfig)
 
 {% endhighlight %}
 </div>
+<div data-lang="python" markdown="1">
+{% highlight python %}
+env = StreamExecutionEnvironment.get_execution_environment()
+table_env = StreamTableEnvironment.create(env)
+
+# create a query configuration
+q_config = StreamQueryConfig()
+# set query parameters
+q_config.with_idle_state_retention_time(timedelta(hours=12), timedelta(hours=24))
+
+# define query
+result = ...
+
+# create TableSink
+sink = ...
+
+# register TableSink
+table_env.register_table_sink("outputTable",  # table name
+                              sink)  # table sink
+
+# emit result Table via a TableSink
+result.insert_into("outputTable", q_config)
+
+{% endhighlight %}
+</div>
 </div>
 
 In the following we describe the parameters of the `QueryConfig` and how they affect the accuracy and resource consumption of a query.
@@ -137,6 +162,16 @@ qConfig.withIdleStateRetentionTime(Time.hours(12), Time.hours(24))
 
 {% endhighlight %}
 </div>
+<div data-lang="python" markdown="1">
+{% highlight python %}
+
+q_config = ...  # type: StreamQueryConfig
+
+# set idle state retention time: min = 12 hours, max = 24 hours
+q_config.with_idle_state_retention_time(timedelta(hours=12), timedelta(hours=24))
+
+{% endhighlight %}
+</div>
 </div>
 
 Cleaning up state requires additional bookkeeping which becomes less expensive for larger differences of `minTime` and `maxTime`. The difference between `minTime` and `maxTime` must be at least 5 minutes.
diff --git a/docs/dev/table/streaming/time_attributes.md b/docs/dev/table/streaming/time_attributes.md
index 9014d2a..20e4301 100644
--- a/docs/dev/table/streaming/time_attributes.md
+++ b/docs/dev/table/streaming/time_attributes.md
@@ -69,6 +69,17 @@ env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime) // default
 // env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
 {% endhighlight %}
 </div>
+<div data-lang="python" markdown="1">
+{% highlight python %}
+env = StreamExecutionEnvironment.get_execution_environment()
+
+env.set_stream_time_characteristic(TimeCharacteristic.ProcessingTime)  # default
+
+# alternatively:
+# env.set_stream_time_characteristic(TimeCharacteristic.IngestionTime)
+# env.set_stream_time_characteristic(TimeCharacteristic.EventTime)
+{% endhighlight %}
+</div>
 </div>
 
 Processing time
diff --git a/docs/dev/table/streaming/time_attributes.zh.md b/docs/dev/table/streaming/time_attributes.zh.md
index 9014d2a..20e4301 100644
--- a/docs/dev/table/streaming/time_attributes.zh.md
+++ b/docs/dev/table/streaming/time_attributes.zh.md
@@ -69,6 +69,17 @@ env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime) // default
 // env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
 {% endhighlight %}
 </div>
+<div data-lang="python" markdown="1">
+{% highlight python %}
+env = StreamExecutionEnvironment.get_execution_environment()
+
+env.set_stream_time_characteristic(TimeCharacteristic.ProcessingTime)  # default
+
+# alternatively:
+# env.set_stream_time_characteristic(TimeCharacteristic.IngestionTime)
+# env.set_stream_time_characteristic(TimeCharacteristic.EventTime)
+{% endhighlight %}
+</div>
 </div>
 
 Processing time
diff --git a/docs/dev/table/tableApi.md b/docs/dev/table/tableApi.md
index 3995b97..4fb9f75 100644
--- a/docs/dev/table/tableApi.md
+++ b/docs/dev/table/tableApi.md
@@ -101,9 +101,11 @@ The following example shows how a Python Table API program is constructed and ho
 
 {% highlight python %}
 from pyflink.table import *
+from pyflink.dataset import *
 
 # environment configuration
-t_env = TableEnvironment.create(TableConfig.Builder().as_batch_execution().build())
+env = ExecutionEnvironment.get_execution_environment()
+t_env = BatchTableEnvironment.create(env, TableConfig())
 
 # register Orders table and Result table sink in table environment
 # ...
@@ -113,7 +115,7 @@ orders = t_env.scan("Orders")  # schema (a, b, c, rowtime)
 
 orders.group_by("a").select("a, b.count as cnt").insert_into("result")
 
-t_env.execute()
+env.execute()
 
 {% endhighlight %}
 
@@ -2287,10 +2289,10 @@ A session window is defined by using the `Session` class as follows:
 <div data-lang="python" markdown="1">
 {% highlight python %}
 # Session Event-time Window
-.window(Session.withGap("10.minutes").on("rowtime").alias("w"))
+.window(Session.with_gap("10.minutes").on("rowtime").alias("w"))
 
 # Session Processing-time Window (assuming a processing-time attribute "proctime")
-.window(Session.withGap("10.minutes").on("proctime").alias("w"))
+.window(Session.with_gap("10.minutes").on("proctime").alias("w"))
 {% endhighlight %}
 </div>
 </div>
@@ -2324,7 +2326,7 @@ val table = input
 {% highlight python %}
 # define over window with alias w and aggregate over the over window w
 table = input.over_window([OverWindow w].alias("w")) \
-             .select("a, b.sum over w, c.min over w")
+    .select("a, b.sum over w, c.min over w")
 {% endhighlight %}
 </div>
 </div>
diff --git a/docs/dev/table/tableApi.zh.md b/docs/dev/table/tableApi.zh.md
index ddcdd5d..33ad6f9 100644
--- a/docs/dev/table/tableApi.zh.md
+++ b/docs/dev/table/tableApi.zh.md
@@ -101,9 +101,11 @@ val result = orders
 
 {% highlight python %}
 from pyflink.table import *
+from pyflink.dataset import *
 
 # environment configuration
-t_env = TableEnvironment.create(TableConfig.Builder().as_batch_execution().build())
+env = ExecutionEnvironment.get_execution_environment()
+t_env = BatchTableEnvironment.create(env, TableConfig())
 
 # register Orders table and Result table sink in table environment
 # ...
@@ -113,7 +115,7 @@ orders = t_env.scan("Orders")  # schema (a, b, c, rowtime)
 
 orders.group_by("a").select("a, b.count as cnt").insert_into("result")
 
-t_env.execute()
+env.execute()
 
 {% endhighlight %}
 
@@ -2286,10 +2288,10 @@ A session window is defined by using the `Session` class as follows:
 <div data-lang="python" markdown="1">
 {% highlight python %}
 # Session Event-time Window
-.window(Session.withGap("10.minutes").on("rowtime").alias("w"))
+.window(Session.with_gap("10.minutes").on("rowtime").alias("w"))
 
 # Session Processing-time Window (assuming a processing-time attribute "proctime")
-.window(Session.withGap("10.minutes").on("proctime").alias("w"))
+.window(Session.with_gap("10.minutes").on("proctime").alias("w"))
 {% endhighlight %}
 </div>
 </div>
@@ -2323,7 +2325,7 @@ val table = input
 {% highlight python %}
 # define over window with alias w and aggregate over the over window w
 table = input.over_window([OverWindow w].alias("w")) \
-             .select("a, b.sum over w, c.min over w")
+    .select("a, b.sum over w, c.min over w")
 {% endhighlight %}
 </div>
 </div>
diff --git a/flink-python/pyflink/dataset/execution_environment.py b/flink-python/pyflink/dataset/execution_environment.py
index 75fe47d..89115a3 100644
--- a/flink-python/pyflink/dataset/execution_environment.py
+++ b/flink-python/pyflink/dataset/execution_environment.py
@@ -70,7 +70,7 @@ class ExecutionEnvironment(object):
         """
         Gets the config object that defines execution parameters.
 
-        :return: The environment's execution configuration.
+        :return: An :class:`ExecutionConfig` object, the environment's execution configuration.
         """
         return ExecutionConfig(self._j_execution_environment.getConfig())
 
@@ -179,15 +179,15 @@ class ExecutionEnvironment(object):
         """
         return self._j_execution_environment.getExecutionPlan()
 
-    @classmethod
-    def get_execution_environment(cls):
+    @staticmethod
+    def get_execution_environment():
         """
         Creates an execution environment that represents the context in which the program is
         currently executed. If the program is invoked standalone, this method returns a local
         execution environment. If the program is invoked from within the command line client to be
         submitted to a cluster, this method returns the execution environment of this cluster.
 
-        :return: The execution environment of the context in which the program is executed.
+        :return: The :class:`ExecutionEnvironment` of the context in which the program is executed.
         """
         gateway = get_gateway()
         j_execution_environment = gateway.jvm.org.apache.flink.api.java.ExecutionEnvironment\
diff --git a/flink-python/pyflink/datastream/stream_execution_environment.py b/flink-python/pyflink/datastream/stream_execution_environment.py
index f8a4897..f5451d7 100644
--- a/flink-python/pyflink/datastream/stream_execution_environment.py
+++ b/flink-python/pyflink/datastream/stream_execution_environment.py
@@ -422,8 +422,8 @@ class StreamExecutionEnvironment(object):
         """
         return self._j_stream_execution_environment.getExecutionPlan()
 
-    @classmethod
-    def get_execution_environment(cls):
+    @staticmethod
+    def get_execution_environment():
         """
         Creates an execution environment that represents the context in which the
         program is currently executed. If the program is invoked standalone, this
diff --git a/flink-python/pyflink/table/__init__.py b/flink-python/pyflink/table/__init__.py
index 66528bf..ac5991d 100644
--- a/flink-python/pyflink/table/__init__.py
+++ b/flink-python/pyflink/table/__init__.py
@@ -59,7 +59,7 @@ from pyflink.table.table import Table, GroupedTable, GroupWindowedTable, OverWin
 from pyflink.table.table_config import TableConfig
 from pyflink.table.table_environment import (TableEnvironment, StreamTableEnvironment,
                                              BatchTableEnvironment)
-from pyflink.table.sinks import TableSink, CsvTableSink
+from pyflink.table.sinks import TableSink, CsvTableSink, WriteMode
 from pyflink.table.sources import TableSource, CsvTableSource
 from pyflink.table.types import DataTypes, UserDefinedType, Row
 from pyflink.table.table_schema import TableSchema
@@ -78,6 +78,7 @@ __all__ = [
     'BatchQueryConfig',
     'TableSink',
     'TableSource',
+    'WriteMode',
     'CsvTableSink',
     'CsvTableSource',
     'DataTypes',
diff --git a/flink-python/pyflink/table/descriptors.py b/flink-python/pyflink/table/descriptors.py
index d31561c..cb8fc7b 100644
--- a/flink-python/pyflink/table/descriptors.py
+++ b/flink-python/pyflink/table/descriptors.py
@@ -814,9 +814,9 @@ class Kafka(ConnectorDescriptor):
             |    Flink Sinks --------- Kafka Partitions
             |        1    ---------------->    1
             |        2    ---------------->    2
-            |                                  3
-            |                                  4
-            |                                  5
+            |             ................     3
+            |             ................     4
+            |             ................     5
 
         :return: This object.
         """
@@ -1199,6 +1199,8 @@ class ConnectTableDescriptor(Descriptor):
 class StreamTableDescriptor(ConnectTableDescriptor):
     """
     Descriptor for specifying a table source and/or sink in a streaming environment.
+
+    .. seealso:: parent class: :class:`ConnectTableDescriptor`
     """
 
     def __init__(self, j_stream_table_descriptor):
@@ -1259,6 +1261,8 @@ class StreamTableDescriptor(ConnectTableDescriptor):
 class BatchTableDescriptor(ConnectTableDescriptor):
     """
     Descriptor for specifying a table source and/or sink in a batch environment.
+
+    .. seealso:: parent class: :class:`ConnectTableDescriptor`
     """
 
     def __init__(self, j_batch_table_descriptor):
diff --git a/flink-python/pyflink/table/query_config.py b/flink-python/pyflink/table/query_config.py
index d39109a..69b6488 100644
--- a/flink-python/pyflink/table/query_config.py
+++ b/flink-python/pyflink/table/query_config.py
@@ -36,6 +36,15 @@ class QueryConfig(object):
 class StreamQueryConfig(QueryConfig):
     """
     The :class:`StreamQueryConfig` holds parameters to configure the behavior of streaming queries.
+
+    Example:
+    ::
+
+        >>> query_config = StreamQueryConfig() \\
+        ...     .with_idle_state_retention_time(datetime.timedelta(days=1),
+        ...                                     datetime.timedelta(days=3))
+        >>> table_env.sql_update("...", query_config)
+
     """
 
     def __init__(self, j_stream_query_config=None):
diff --git a/flink-python/pyflink/table/sinks.py b/flink-python/pyflink/table/sinks.py
index 9722602..4aa968f 100644
--- a/flink-python/pyflink/table/sinks.py
+++ b/flink-python/pyflink/table/sinks.py
@@ -20,7 +20,7 @@ from pyflink.java_gateway import get_gateway
 from pyflink.table.types import _to_java_type, DataType
 from pyflink.util import utils
 
-__all__ = ['TableSink', 'CsvTableSink']
+__all__ = ['TableSink', 'CsvTableSink', 'WriteMode']
 
 
 class TableSink(object):
@@ -41,12 +41,20 @@ class CsvTableSink(TableSink):
     """
     A simple :class:`TableSink` to emit data as CSV files.
 
+    Example:
+    ::
+
+        >>> CsvTableSink(["a", "b"], [DataTypes.INT(), DataTypes.STRING()],
+        ...              "/csv/file/path", "|", 1, WriteMode.OVERWRITE)
+
     :param field_names: The list of field names.
     :param field_types: The list of field data types.
     :param path: The output path to write the Table to.
     :param field_delimiter: The field delimiter.
     :param num_files: The number of files to write to.
-    :param write_mode: The write mode to specify whether existing files are overwritten or not.
+    :param write_mode: The write mode that specifies whether existing files are overwritten;
+                       valid values are :data:`WriteMode.NO_OVERWRITE`
+                       and :data:`WriteMode.OVERWRITE`.
     """
 
     def __init__(self, field_names, field_types, path, field_delimiter=',', num_files=1,
diff --git a/flink-python/pyflink/table/table.py b/flink-python/pyflink/table/table.py
index 508ea17..fda8ebf 100644
--- a/flink-python/pyflink/table/table.py
+++ b/flink-python/pyflink/table/table.py
@@ -49,7 +49,7 @@ class Table(object):
         >>> t_env.register_table_source("source", ...)
         >>> t = t_env.scan("source")
         >>> t.select(...)
-        ...
+        >>> ...
         >>> t_env.register_table_sink("result", ...)
         >>> t.insert_into("result")
         >>> env.execute()
@@ -452,6 +452,13 @@ class Table(object):
             If the :func:`~pyflink.table.GroupWindowedTable.group_by` only references a GroupWindow
             alias, the streamed table will be processed by a single task, i.e., with parallelism 1.
 
+        Example:
+        ::
+
+            >>> tab.window(Tumble.over("10.minutes").on("rowtime").alias("w")) \\
+            ...     .group_by("w") \\
+            ...     .select("a.sum as a, w.start as b, w.end as c, w.rowtime as d")
+
         :param window: A :class:`pyflink.table.window.GroupWindow` created from
                        :class:`pyflink.table.window.Tumble`, :class:`pyflink.table.window.Session`
                        or :class:`pyflink.table.window.Slide`.
@@ -470,8 +477,8 @@ class Table(object):
         Example:
         ::
 
-            >>> table.window(Over.partition_by("c").order_by("rowTime")\\
-            ...     .preceding("10.seconds").alias("ow"))\\
+            >>> table.window(Over.partition_by("c").order_by("rowTime") \\
+            ...     .preceding("10.seconds").alias("ow")) \\
             ...     .select("c, b.count over ow, e.sum over ow")
 
         .. note::
@@ -563,7 +570,7 @@ class Table(object):
         Example:
         ::
 
-            >>> tab.insert_into("print")
+            >>> tab.insert_into("sink")
 
         :param table_path: The first part of the path of the registered :class:`TableSink` to which
                the :class:`Table` is written. This is to ensure at least the name of the
@@ -641,7 +648,7 @@ class GroupWindowedTable(object):
         Example:
         ::
 
-            >>> tab.window(groupWindow.alias("w")).group_by("w, key").select("key, value.avg")
+            >>> tab.window(group_window.alias("w")).group_by("w, key").select("key, value.avg")
 
         :param fields: Group keys.
         :return: A :class:`WindowGroupedTable`.
diff --git a/flink-python/pyflink/table/table_environment.py b/flink-python/pyflink/table/table_environment.py
index 9c4eabe..19b5199 100644
--- a/flink-python/pyflink/table/table_environment.py
+++ b/flink-python/pyflink/table/table_environment.py
@@ -59,7 +59,7 @@ class TableEnvironment(object):
 
             >>> csv_table_source = CsvTableSource(
             ...     csv_file_path, ['a', 'b'], [DataTypes.STRING(), DataTypes.BIGINT()])
-            ... table_env.from_table_source(csv_table_source)
+            >>> table_env.from_table_source(csv_table_source)
 
         :param table_source: The table source used as table.
         :return: The result :class:`Table`.
@@ -95,6 +95,12 @@ class TableEnvironment(object):
         Registers a :class:`Table` under a unique name in the TableEnvironment's catalog.
         Registered tables can be referenced in SQL queries.
 
+        Example:
+        ::
+
+            >>> tab = table_env.from_elements([(1, 'Hi'), (2, 'Hello')], ['a', 'b'])
+            >>> table_env.register_table("source", tab)
+
         :param name: The name under which the table will be registered.
         :param table: The table to register.
         """
@@ -105,6 +111,15 @@ class TableEnvironment(object):
         Registers an external :class:`TableSource` in this :class:`TableEnvironment`'s catalog.
         Registered tables can be referenced in SQL queries.
 
+        Example:
+        ::
+
+            >>> table_env.register_table_source("source",
+            ...                                 CsvTableSource("./1.csv",
+            ...                                                ["a", "b"],
+            ...                                                [DataTypes.INT(),
+            ...                                                 DataTypes.STRING()]))
+
         :param name: The name under which the :class:`TableSource` is registered.
         :param table_source: The :class:`TableSource` to register.
         """
@@ -116,6 +131,15 @@ class TableEnvironment(object):
         :class:`TableEnvironment`'s catalog.
         Registered sink tables can be referenced in SQL DML statements.
 
+        Example:
+        ::
+
+            >>> table_env.register_table_sink("sink",
+            ...                               CsvTableSink(["a", "b"],
+            ...                                            [DataTypes.INT(),
+            ...                                             DataTypes.STRING()],
+            ...                                            "./2.csv"))
+
         :param name: The name under which the :class:`TableSink` is registered.
         :param table_sink: The :class:`TableSink` to register.
         """
@@ -135,12 +159,12 @@ class TableEnvironment(object):
         Scanning a directly registered table
         ::
 
-            >>> tab = t_env.scan("tableName")
+            >>> tab = table_env.scan("tableName")
 
         Scanning a table from a registered catalog
         ::
 
-            >>> tab = t_env.scan("catalogName", "dbName", "tableName")
+            >>> tab = table_env.scan("catalogName", "dbName", "tableName")
 
         :param table_path: The path of the table to scan.
         :throws: Exception if no table is found using the given table path.
@@ -160,8 +184,8 @@ class TableEnvironment(object):
         Example:
         ::
 
-            >>> tab = t_env.scan("tableName")
-            >>> t_env.insert_into(tab, "print")
+            >>> tab = table_env.scan("tableName")
+            >>> table_env.insert_into(tab, "sink")
 
         :param table: :class:`Table` to write to the sink.
         :param table_path: The first part of the path of the registered :class:`TableSink` to which
@@ -225,7 +249,7 @@ class TableEnvironment(object):
 
             >>> table = ...
             # the table is not registered to the table environment
-            >>> t_env.sql_query("SELECT * FROM %s" % table)
+            >>> table_env.sql_query("SELECT * FROM %s" % table)
 
         :param query: The sql query string.
         :return: The result :class:`Table`.
@@ -248,10 +272,10 @@ class TableEnvironment(object):
         ::
 
             # register the table sink into which the result is inserted.
-            >>> t_env.register_table_sink("sink_table", table_sink)
+            >>> table_env.register_table_sink("sink_table", table_sink)
             >>> source_table = ...
             # source_table is not registered to the table environment
-            >>> tEnv.sql_update(s"INSERT INTO sink_table SELECT * FROM %s" % source_table)
+            >>> table_env.sql_update("INSERT INTO sink_table SELECT * FROM %s" % source_table)
 
         :param stmt: The SQL statement to evaluate.
         :param query_config: The :class:`QueryConfig` to use.
@@ -404,16 +428,16 @@ class TableEnvironment(object):
         Example:
         ::
 
-            >>> table_env\\
+            >>> table_env \\
             ...     .connect(ExternalSystemXYZ()
-            ...              .version("0.11"))\\
+            ...              .version("0.11")) \\
             ...     .with_format(Json()
-            ...                 .json_schema("{...}")
-            ...                 .fail_on_missing_field(False))\\
+            ...                  .json_schema("{...}")
+            ...                  .fail_on_missing_field(False)) \\
             ...     .with_schema(Schema()
-            ...                 .field("user-name", "VARCHAR")
-            ...                 .from_origin_field("u_name")
-            ...                 .field("count", "DECIMAL"))\\
+            ...                  .field("user-name", "VARCHAR")
+            ...                  .from_origin_field("u_name")
+            ...                  .field("count", "DECIMAL")) \\
             ...     .register_table_source("MyTable")
 
         :param connector_descriptor: Connector descriptor describing the external system.
@@ -425,11 +449,50 @@ class TableEnvironment(object):
     def from_elements(self, elements, schema=None, verify_schema=True):
         """
         Creates a table from a collection of elements.
+        The element types must be acceptable atomic types or acceptable composite types.
+        All elements must be of the same type.
+        If the element types are composite types, the composite types must be strictly equal,
+        and their subtypes must also be acceptable types.
+        e.g. if the elements are tuples, the tuples must be of equal length and the element
+        types of the tuples must match in order.
+
+        The built-in acceptable atomic element types contain:
+
+        **int**, **long**, **str**, **unicode**, **bool**,
+        **float**, **bytearray**, **datetime.date**, **datetime.time**, **datetime.datetime**,
+        **datetime.timedelta**, **decimal.Decimal**
+
+        The built-in acceptable composite element types contain:
+
+        **list**, **tuple**, **dict**, **array**, :class:`pyflink.table.Row`
+
+        If the element type is a composite type, it will be unboxed.
+        e.g. table_env.from_elements([(1, 'Hi'), (2, 'Hello')]) will return a table like:
+
+        +----+-------+
+        | _1 |  _2   |
+        +====+=======+
+        | 1  |  Hi   |
+        +----+-------+
+        | 2  | Hello |
+        +----+-------+
+
+        "_1" and "_2" are generated field names.
 
         Example:
         ::
 
+            # use the second parameter to specify custom field names
             >>> table_env.from_elements([(1, 'Hi'), (2, 'Hello')], ['a', 'b'])
+            # use the second parameter to specify a custom table schema
+            >>> table_env.from_elements([(1, 'Hi'), (2, 'Hello')],
+            ...                         DataTypes.ROW([DataTypes.FIELD("a", DataTypes.INT()),
+            ...                                        DataTypes.FIELD("b", DataTypes.STRING())]))
+            # use the third parameter to control whether to verify the elements against the schema
+            >>> table_env.from_elements([(1, 'Hi'), (2, 'Hello')],
+            ...                         DataTypes.ROW([DataTypes.FIELD("a", DataTypes.INT()),
+            ...                                        DataTypes.FIELD("b", DataTypes.STRING())]),
+            ...                         False)
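+            # rely on the generated field names "_1" and "_2"
+            >>> table_env.from_elements([(1, 'Hi'), (2, 'Hello')])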
 
         :param elements: The elements to create a table from.
         :param schema: The schema of the table.
@@ -541,16 +604,16 @@ class StreamTableEnvironment(TableEnvironment):
         registering a table source as "MyTable":
         ::
 
-            >>> table_env\\
+            >>> table_env \\
             ...     .connect(ExternalSystemXYZ()
-            ...              .version("0.11"))\\
+            ...              .version("0.11")) \\
             ...     .with_format(Json()
-            ...                 .json_schema("{...}")
-            ...                 .fail_on_missing_field(False))\\
+            ...                  .json_schema("{...}")
+            ...                  .fail_on_missing_field(False)) \\
             ...     .with_schema(Schema()
-            ...                 .field("user-name", "VARCHAR")
-            ...                 .from_origin_field("u_name")
-            ...                 .field("count", "DECIMAL"))\\
+            ...                  .field("user-name", "VARCHAR")
+            ...                  .from_origin_field("u_name")
+            ...                  .field("count", "DECIMAL")) \\
             ...     .register_table_source("MyTable")
 
         :param connector_descriptor: Connector descriptor describing the external system.
@@ -560,8 +623,28 @@ class StreamTableEnvironment(TableEnvironment):
         return StreamTableDescriptor(
             self._j_tenv.connect(connector_descriptor._j_connector_descriptor))
 
-    @classmethod
-    def create(cls, stream_execution_environment, table_config=None):
+    @staticmethod
+    def create(stream_execution_environment, table_config=None):
+        """
+        Creates a :class:`TableEnvironment` for a :class:`StreamExecutionEnvironment`.
+
+        Example:
+        ::
+
+            >>> env = StreamExecutionEnvironment.get_execution_environment()
+            # create without TableConfig
+            >>> table_env = StreamTableEnvironment.create(env)
+            # create with TableConfig
+            >>> table_config = TableConfig()
+            >>> table_config.set_null_check(False)
+            >>> table_env = StreamTableEnvironment.create(env, table_config)
+
+        :param stream_execution_environment: The :class:`StreamExecutionEnvironment` of the
+                                             TableEnvironment.
+        :param table_config: The configuration of the TableEnvironment, optional.
+        :return: The :class:`StreamTableEnvironment` created from the given
+                 StreamExecutionEnvironment and configuration.
+        """
         gateway = get_gateway()
         if table_config is not None:
             j_tenv = gateway.jvm.StreamTableEnvironment.create(
@@ -608,16 +691,16 @@ class BatchTableEnvironment(TableEnvironment):
         registering a table source as "MyTable":
         ::
 
-            >>> table_env\\
+            >>> table_env \\
             ...     .connect(ExternalSystemXYZ()
-            ...              .version("0.11"))\\
+            ...              .version("0.11")) \\
             ...     .with_format(Json()
-            ...                 .json_schema("{...}")
-            ...                 .fail_on_missing_field(False))\\
+            ...                  .json_schema("{...}")
+            ...                  .fail_on_missing_field(False)) \\
             ...     .with_schema(Schema()
-            ...                 .field("user-name", "VARCHAR")
-            ...                 .from_origin_field("u_name")
-            ...                 .field("count", "DECIMAL"))\\
+            ...                  .field("user-name", "VARCHAR")
+            ...                  .from_origin_field("u_name")
+            ...                  .field("count", "DECIMAL")) \\
             ...     .register_table_source("MyTable")
 
         :param connector_descriptor: Connector descriptor describing the external system.
@@ -627,8 +710,26 @@ class BatchTableEnvironment(TableEnvironment):
         return BatchTableDescriptor(
             self._j_tenv.connect(connector_descriptor._j_connector_descriptor))
 
-    @classmethod
-    def create(cls, execution_environment, table_config=None):
+    @staticmethod
+    def create(execution_environment, table_config=None):
+        """
+        Creates a :class:`TableEnvironment` for a batch :class:`ExecutionEnvironment`.
+
+        Example:
+        ::
+
+            >>> env = ExecutionEnvironment.get_execution_environment()
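+            # create without TableConfig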
+            >>> table_env = BatchTableEnvironment.create(env)
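+            # create with TableConfig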
+            >>> table_config = TableConfig()
+            >>> table_config.set_null_check(False)
+            >>> table_env = BatchTableEnvironment.create(env, table_config)
+
+        :param execution_environment: The batch :class:`ExecutionEnvironment` of the
+                                      TableEnvironment.
+        :param table_config: The configuration of the TableEnvironment, optional.
+        :return: The :class:`BatchTableEnvironment` created from the given ExecutionEnvironment
+                 and configuration.
+        """
         gateway = get_gateway()
         if table_config is not None:
             j_tenv = gateway.jvm.BatchTableEnvironment.create(