Posted to commits@flink.apache.org by ji...@apache.org on 2019/05/19 23:09:54 UTC

[flink] branch master updated: [FLINK-12407][python] Add all table operators to align with the Java Table API.

This is an automated email from the ASF dual-hosted git repository.

jincheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new b3604f7  [FLINK-12407][python] Add all table operators to align with the Java Table API.
b3604f7 is described below

commit b3604f7bee7456b8533e9ea222a833a2624e36c2
Author: Wei Zhong <we...@gmail.com>
AuthorDate: Fri May 10 15:37:15 2019 +0800

    [FLINK-12407][python] Add all table operators to align with the Java Table API.
    
    Brief change log:
     - Add all the operators except the ones that involve UDFs, such as map/flatMap/aggregate/flatAggregate (a short usage sketch follows below)
     - Add test cases for all operators, such as test_aggregate, test_column_operation, test_join, test_window, test_calc, etc.
     - Add documentation for all operators
     - Add a table_completeness test to check whether the Python Table API is aligned with the Java Table API
     - Add an environment_completeness test to check whether the Python Environment API is aligned with the Java Environment API
    
    This closes #8401.
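
    A minimal usage sketch chaining a few of the newly added operators, assuming an "Orders" source table and a "Result" sink are already registered (it mirrors the documentation examples added below):

    from pyflink.table import TableEnvironment, TableConfig

    # batch environment, as in the documentation examples below
    t_env = TableEnvironment.create(TableConfig.Builder().as_batch_execution().build())

    # "Orders" is assumed to have the schema (a, b, c, rowtime)
    orders = t_env.scan("Orders")

    # chain a few of the newly added operators
    result = orders \
        .add_columns("concat(c, 'sunny') as c2") \
        .group_by("a") \
        .select("a, b.sum as total") \
        .order_by("a.asc")

    result.insert_into("Result")
    t_env.execute()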
---
 docs/dev/table/tableApi.md                         | 632 ++++++++++++++++++++-
 docs/dev/table/tableApi.zh.md                      | 632 ++++++++++++++++++++-
 flink-python/pyflink/table/__init__.py             |   7 +-
 flink-python/pyflink/table/query_config.py         | 111 ++++
 flink-python/pyflink/table/table.py                | 525 ++++++++++++++++-
 flink-python/pyflink/table/table_config.py         | 126 +++-
 flink-python/pyflink/table/table_environment.py    | 143 ++++-
 .../tests/{test_calc.py => test_aggregate.py}      |  38 +-
 flink-python/pyflink/table/tests/test_calc.py      |  73 ++-
 .../pyflink/table/tests/test_column_operation.py   | 127 +++++
 .../table/tests/{test_calc.py => test_distinct.py} |  38 +-
 .../table/tests/test_environment_completeness.py   |   5 +-
 flink-python/pyflink/table/tests/test_join.py      | 217 +++++++
 .../tests/{test_calc.py => test_print_schema.py}   |  39 +-
 .../pyflink/table/tests/test_set_operation.py      | 183 ++++++
 ...st_environment_completeness.py => test_sort.py} |  47 +-
 .../pyflink/table/tests/test_table_completeness.py |   6 +-
 .../table/tests/test_table_environment_api.py      | 428 ++++++++++++++
 flink-python/pyflink/table/tests/test_window.py    | 111 ++++
 flink-python/pyflink/table/window.py               | 449 +++++++++++++++
 flink-python/pyflink/testing/test_case_utils.py    |  46 +-
 21 files changed, 3817 insertions(+), 166 deletions(-)

diff --git a/docs/dev/table/tableApi.md b/docs/dev/table/tableApi.md
index 3cd3479..2143bf7 100644
--- a/docs/dev/table/tableApi.md
+++ b/docs/dev/table/tableApi.md
@@ -93,6 +93,31 @@ val result = orders
 {% endhighlight %}
 
 </div>
+<div data-lang="python" markdown="1">
+
+The Python Table API is enabled by `from pyflink.table import *`.
+
+The following example shows how a Python Table API program is constructed and how expressions are specified as strings.
+
+{% highlight python %}
+from pyflink.table import *
+
+# environment configuration
+t_env = TableEnvironment.create(TableConfig.Builder().as_batch_execution().build())
+
+# register Orders table and Result table sink in table environment
+# ...
+
+# specify table program
+orders = t_env.scan("Orders")  # schema (a, b, c, rowtime)
+
+orders.group_by("a").select("a, b.count as cnt").insert_into("result")
+
+t_env.execute()
+
+{% endhighlight %}
+
+</div>
 </div>
 
 The next example shows a more complex Table API program. The program scans again the `Orders` table. It filters null values, normalizes the field `a` of type String, and calculates for each hour and product `a` the average billing amount `b`.
@@ -135,6 +160,24 @@ val result: Table = orders
 {% endhighlight %}
 
 </div>
+
+<div data-lang="python" markdown="1">
+
+{% highlight python %}
+# environment configuration
+# ...
+
+# specify table program
+orders = t_env.scan("Orders")  # schema (a, b, c, rowtime)
+
+result = orders.filter("a.isNotNull && b.isNotNull && c.isNotNull") \
+               .select("a.lowerCase() as a, b, rowtime") \
+               .window(Tumble.over("1.hour").on("rowtime").alias("hourlyWindow")) \
+               .group_by("hourlyWindow, a") \
+               .select("a, hourlyWindow.end as hour, b.avg as avgBillingAmount")
+{% endhighlight %}
+
+</div>
 </div>
 
 Since the Table API is a unified API for batch and streaming data, both example programs can be executed on batch and streaming inputs without any modification of the table program itself. In both cases, the program produces the same results given that streaming records are not late (see [Streaming Concepts](streaming) for details).
@@ -316,7 +359,7 @@ val result = orders.where('b === "red")
   		<td>
         <p>Similar to the FROM clause in a SQL query. Performs a scan of a registered table.</p>
 {% highlight python %}
-orders = table_env.scan("Orders");
+orders = table_env.scan("Orders")
 {% endhighlight %}
       </td>
   	</tr>
@@ -328,12 +371,12 @@ orders = table_env.scan("Orders");
       <td>
         <p>Similar to a SQL SELECT statement. Performs a select operation.</p>
 {% highlight python %}
-orders = table_env.scan("Orders");
-result = orders.select("a, c as d");
+orders = table_env.scan("Orders")
+result = orders.select("a, c as d")
 {% endhighlight %}
         <p>You can use star (<code>*</code>) to act as a wild card, selecting all of the columns in the table.</p>
 {% highlight python %}
-result = orders.select("*");
+result = orders.select("*")
 {% endhighlight %}
 </td>
         </tr>
@@ -345,8 +388,8 @@ result = orders.select("*");
       <td>
         <p>Renames fields.</p>
 {% highlight python %}
-orders = table_env.scan("Orders");
-result = orders.alias("x, y, z, t");
+orders = table_env.scan("Orders")
+result = orders.alias("x, y, z, t")
 {% endhighlight %}
       </td>
     </tr>
@@ -359,13 +402,13 @@ result = orders.alias("x, y, z, t");
       <td>
         <p>Similar to a SQL WHERE clause. Filters out rows that do not pass the filter predicate.</p>
 {% highlight python %}
-orders = table_env.scan("Orders");
-result = orders.where("b === 'red'");
+orders = table_env.scan("Orders")
+result = orders.where("b === 'red'")
 {% endhighlight %}
 or
 {% highlight python %}
-orders = table_env.scan("Orders");
-result = orders.filter("a % 2 === 0");
+orders = table_env.scan("Orders")
+result = orders.filter("a % 2 === 0")
 {% endhighlight %}
       </td>
     </tr>
@@ -511,6 +554,73 @@ val result = orders.renameColumns('b as 'b2, 'c as 'c2)
   </tbody>
 </table>
 </div>
+<div data-lang="python" markdown="1">
+
+<table class="table table-bordered">
+  <thead>
+    <tr>
+      <th class="text-left" style="width: 20%">Operators</th>
+      <th class="text-center">Description</th>
+    </tr>
+  </thead>
+  <tbody>
+  <tr>
+          <td>
+            <strong>AddColumns</strong><br>
+            <span class="label label-primary">Batch</span> <span class="label label-primary">Streaming</span>
+          </td>
+          <td>
+          <p>Performs a field add operation. It will throw an exception if the added fields already exist.</p>
+{% highlight python %}
+orders = table_env.scan("Orders")
+result = orders.add_columns("concat(c, 'sunny')")
+{% endhighlight %}
+</td>
+        </tr>
+        
+ <tr>
+     <td>
+                    <strong>AddOrReplaceColumns</strong><br>
+                    <span class="label label-primary">Batch</span> <span class="label label-primary">Streaming</span>
+                  </td>
+                  <td>
+                  <p>Performs a field add operation. Existing fields will be replaced if an added column name is the same as an existing column name. Moreover, if the added fields have duplicate field names, the last one is used.</p>
+{% highlight python %}
+orders = table_env.scan("Orders")
+result = orders.add_or_replace_columns("concat(c, 'sunny') as desc")
+{% endhighlight %}
+                  </td>
+                </tr>
+         <tr>
+                  <td>
+                    <strong>DropColumns</strong><br>
+                    <span class="label label-primary">Batch</span> <span class="label label-primary">Streaming</span>
+                  </td>
+                  <td>
+                  <p>Performs a field drop operation. The field expressions should be field reference expressions, and only existing fields can be dropped.</p>
+{% highlight python %}
+orders = table_env.scan("Orders")
+result = orders.drop_columns("b, c")
+{% endhighlight %}
+                  </td>
+                </tr>
+         <tr>
+                  <td>
+                    <strong>RenameColumns</strong><br>
+                    <span class="label label-primary">Batch</span> <span class="label label-primary">Streaming</span>
+                  </td>
+                  <td>
+                  <p>Performs a field rename operation. The field expressions should be alias expressions, and only the existing fields can be renamed.</p>
+{% highlight python %}
+orders = table_env.scan("Orders")
+result = orders.rename_columns("b as b2, c as c2")
+{% endhighlight %}
+                  </td>
+                </tr>
+  </tbody>
+</table>
+
+</div>
 </div>
 
 {% top %}
@@ -758,6 +868,111 @@ val result = orders.distinct()
   </tbody>
 </table>
 </div>
+<div data-lang="python" markdown="1">
+
+<table class="table table-bordered">
+  <thead>
+    <tr>
+      <th class="text-left" style="width: 20%">Operators</th>
+      <th class="text-center">Description</th>
+    </tr>
+  </thead>
+  <tbody>
+    <tr>
+      <td>
+        <strong>GroupBy Aggregation</strong><br>
+        <span class="label label-primary">Batch</span> <span class="label label-primary">Streaming</span><br>
+        <span class="label label-info">Result Updating</span>
+      </td>
+      <td>
+        <p>Similar to a SQL GROUP BY clause. Groups the rows on the grouping keys with a following running aggregation operator to aggregate rows group-wise.</p>
+{% highlight python %}
+orders = table_env.scan("Orders")
+result = orders.group_by("a").select("a, b.sum as d")
+{% endhighlight %}
+        <p><b>Note:</b> For streaming queries the required state to compute the query result might grow infinitely depending on the type of aggregation and the number of distinct grouping keys. Please provide a query configuration with valid retention interval to prevent excessive state size. See <a href="streaming/query_configuration.html">Query Configuration</a> for details.</p>
+      </td>
+    </tr>
+    <tr>
+    	<td>
+        <strong>GroupBy Window Aggregation</strong><br>
+        <span class="label label-primary">Batch</span> <span class="label label-primary">Streaming</span>
+      </td>
+    	<td>
+        <p>Groups and aggregates a table on a <a href="#group-windows">group window</a> and possibly one or more grouping keys.</p>
+{% highlight python %}
+orders = table_env.scan("Orders")
+result = orders.window(Tumble.over("5.minutes").on("rowtime").alias("w")) \
+               .group_by("a, w") \
+               .select("a, w.start, w.end, w.rowtime, b.sum as d")
+{% endhighlight %}
+      </td>
+    </tr>
+    <tr>
+    	<td>
+        <strong>Over Window Aggregation</strong><br>
+        <span class="label label-primary">Streaming</span>
+      </td>
+      <td>
+       <p>Similar to a SQL OVER clause. Over window aggregates are computed for each row, based on a window (range) of preceding and succeeding rows. See the <a href="#over-windows">over windows section</a> for more details.</p>
+{% highlight python %}
+orders = table_env.scan("Orders")
+result = orders.over_window(Over.partition_by("a").order_by("rowtime") \
+      	       .preceding("UNBOUNDED_RANGE").following("CURRENT_RANGE") \
+               .alias("w")) \
+               .select("a, b.avg over w, b.max over w, b.min over w")
+{% endhighlight %}
+       <p><b>Note:</b> All aggregates must be defined over the same window, i.e., same partitioning, sorting, and range. Currently, only windows with PRECEDING (UNBOUNDED and bounded) to CURRENT ROW range are supported. Ranges with FOLLOWING are not supported yet. ORDER BY must be specified on a single <a href="streaming/time_attributes.html">time attribute</a>.</p>
+      </td>
+    </tr>
+    <tr>
+      <td>
+        <strong>Distinct Aggregation</strong><br>
+        <span class="label label-primary">Batch</span> <span class="label label-primary">Streaming</span> <br>
+        <span class="label label-info">Result Updating</span>
+      </td>
+      <td>
+        <p>Similar to a SQL DISTINCT aggregation clause such as COUNT(DISTINCT a). Distinct aggregation declares that an aggregation function (built-in or user-defined) is only applied on distinct input values. Distinct can be applied to <b>GroupBy Aggregation</b>, <b>GroupBy Window Aggregation</b> and <b>Over Window Aggregation</b>.</p>
+{% highlight python %}
+orders = table_env.scan("Orders")
+# Distinct aggregation on group by
+group_by_distinct_result = orders.group_by("a") \
+                                 .select("a, b.sum.distinct as d")
+# Distinct aggregation on time window group by
+group_by_window_distinct_result = orders.window(
+    Tumble.over("5.minutes").on("rowtime").alias("w")).group_by("a, w") \
+    .select("a, b.sum.distinct as d")
+# Distinct aggregation on over window
+result = orders.over_window(Over
+                       .partition_by("a")
+                       .order_by("rowtime")
+                       .preceding("UNBOUNDED_RANGE")
+                       .alias("w")) \
+                       .select(
+                       "a, b.avg.distinct over w, b.max over w, b.min over w")
+{% endhighlight %}
+        <p><b>Note:</b> For streaming queries the required state to compute the query result might grow infinitely depending on the number of distinct fields. Please provide a query configuration with valid retention interval to prevent excessive state size. See <a href="streaming/query_configuration.html">Query Configuration</a> for details.</p>
+      </td>
+    </tr>
+    <tr>
+      <td>
+        <strong>Distinct</strong><br>
+        <span class="label label-primary">Batch</span> <span class="label label-primary">Streaming</span> <br>
+        <span class="label label-info">Result Updating</span>
+      </td>
+      <td>
+        <p>Similar to a SQL DISTINCT clause. Returns records with distinct value combinations.</p>
+{% highlight python %}
+orders = table_env.scan("Orders")
+result = orders.distinct()
+{% endhighlight %}
+        <p><b>Note:</b> For streaming queries the required state to compute the query result might grow infinitely depending on the number of distinct fields. Please provide a query configuration with valid retention interval to prevent excessive state size. See <a href="streaming/query_configuration.html">Query Configuration</a> for details.</p>
+      </td>
+    </tr>
+  </tbody>
+</table>
+
+</div>
 </div>
 
 {% top %}
@@ -1053,6 +1268,94 @@ val result = orders
   </tbody>
 </table>
 </div>
+<div data-lang="python" markdown="1">
+
+<table class="table table-bordered">
+  <thead>
+    <tr>
+      <th class="text-left" style="width: 20%">Operators</th>
+      <th class="text-center">Description</th>
+    </tr>
+  </thead>
+  <tbody>
+  	<tr>
+      <td>
+        <strong>Inner Join</strong><br>
+        <span class="label label-primary">Batch</span>
+        <span class="label label-primary">Streaming</span>
+      </td>
+      <td>
+        <p>Similar to a SQL JOIN clause. Joins two tables. Both tables must have distinct field names and at least one equality join predicate must be defined through join operator or using a where or filter operator.</p>
+{% highlight python %}
+left = table_env.scan("Source1").select("a, b, c")
+right = table_env.scan("Source2").select("d, e, f")
+result = left.join(right).where("a = d").select("a, b, e")
+{% endhighlight %}
+<p><b>Note:</b> For streaming queries the required state to compute the query result might grow infinitely depending on the number of distinct input rows. Please provide a query configuration with valid retention interval to prevent excessive state size. See <a href="streaming/query_configuration.html">Query Configuration</a> for details.</p>
+      </td>
+    </tr>
+
+    <tr>
+      <td>
+        <strong>Outer Join</strong><br>
+        <span class="label label-primary">Batch</span>
+        <span class="label label-primary">Streaming</span>
+        <span class="label label-info">Result Updating</span>
+      </td>
+      <td>
+        <p>Similar to SQL LEFT/RIGHT/FULL OUTER JOIN clauses. Joins two tables. Both tables must have distinct field names and at least one equality join predicate must be defined.</p>
+{% highlight python %}
+left = table_env.scan("Source1").select("a, b, c")
+right = table_env.scan("Source2").select("d, e, f")
+
+left_outer_result = left.left_outer_join(right, "a = d").select("a, b, e")
+right_outer_result = left.right_outer_join(right, "a = d").select("a, b, e")
+full_outer_result = left.full_outer_join(right, "a = d").select("a, b, e")
+{% endhighlight %}
+<p><b>Note:</b> For streaming queries the required state to compute the query result might grow infinitely depending on the number of distinct input rows. Please provide a query configuration with valid retention interval to prevent excessive state size. See <a href="streaming/query_configuration.html">Query Configuration</a> for details.</p>
+      </td>
+    </tr>
+    <tr>
+      <td><strong>Time-windowed Join</strong><br>
+        <span class="label label-primary">Batch</span>
+        <span class="label label-primary">Streaming</span>
+      </td>
+      <td>
+        <p>Currently not supported in the Python API.</p>
+      </td>
+    </tr>
+    <tr>
+    	<td>
+        <strong>Inner Join with Table Function</strong><br>
+        <span class="label label-primary">Batch</span> <span class="label label-primary">Streaming</span>
+      </td>
+    	<td>
+        <p>Currently not supported in the Python API.</p>
+      </td>
+    </tr>
+    <tr>
+    	<td>
+        <strong>Left Outer Join with Table Function</strong><br>
+        <span class="label label-primary">Batch</span> <span class="label label-primary">Streaming</span>
+      </td>
+      <td>
+        <p>Currently not supported in the Python API.</p>
+      </td>
+    </tr>
+    <tr>
+      <td>
+        <strong>Join with Temporal Table</strong><br>
+        <span class="label label-primary">Streaming</span>
+      </td>
+      <td>
+        <p>Currently not supported in the Python API.</p>
+      </td>
+    </tr>
+
+  </tbody>
+</table>
+
+</div>
 </div>
 
 {% top %}
@@ -1306,6 +1609,132 @@ val result = left.select('a, 'b, 'c).where('a.in(right))
   </tbody>
 </table>
 </div>
+<div data-lang="python" markdown="1">
+
+<table class="table table-bordered">
+  <thead>
+    <tr>
+      <th class="text-left" style="width: 20%">Operators</th>
+      <th class="text-center">Description</th>
+    </tr>
+  </thead>
+  <tbody>
+  	<tr>
+      <td>
+        <strong>Union</strong><br>
+        <span class="label label-primary">Batch</span>
+      </td>
+      <td>
+        <p>Similar to a SQL UNION clause. Unions two tables with duplicate records removed. Both tables must have identical field types.</p>
+{% highlight python %}
+left = table_env.scan("Source1").select("a, b, c")
+right = table_env.scan("Source2").select("a, b, c")
+result = left.union(right)
+{% endhighlight %}
+      </td>
+    </tr>
+
+    <tr>
+      <td>
+        <strong>UnionAll</strong><br>
+        <span class="label label-primary">Batch</span> <span class="label label-primary">Streaming</span>
+      </td>
+      <td>
+        <p>Similar to a SQL UNION ALL clause. Unions two tables. Both tables must have identical field types.</p>
+{% highlight python %}
+left = table_env.scan("Source1").select("a, b, c")
+right = table_env.scan("Source2").select("a, b, c")
+result = left.union_all(right)
+{% endhighlight %}
+      </td>
+    </tr>
+
+    <tr>
+      <td>
+        <strong>Intersect</strong><br>
+        <span class="label label-primary">Batch</span>
+      </td>
+      <td>
+        <p>Similar to a SQL INTERSECT clause. Intersect returns records that exist in both tables. If a record is present in one or both tables more than once, it is returned just once, i.e., the resulting table has no duplicate records. Both tables must have identical field types.</p>
+{% highlight python %}
+left = table_env.scan("Source1").select("a, b, c")
+right = table_env.scan("Source2").select("a, b, c")
+result = left.intersect(right)
+{% endhighlight %}
+      </td>
+    </tr>
+
+    <tr>
+      <td>
+        <strong>IntersectAll</strong><br>
+        <span class="label label-primary">Batch</span>
+      </td>
+      <td>
+        <p>Similar to a SQL INTERSECT ALL clause. IntersectAll returns records that exist in both tables. If a record is present in both tables more than once, it is returned as many times as it is present in both tables, i.e., the resulting table might have duplicate records. Both tables must have identical field types.</p>
+{% highlight python %}
+left = table_env.scan("Source1").select("a, b, c")
+right = table_env.scan("Source2").select("a, b, c")
+result = left.intersect_all(right)
+{% endhighlight %}
+      </td>
+    </tr>
+
+    <tr>
+      <td>
+        <strong>Minus</strong><br>
+        <span class="label label-primary">Batch</span>
+      </td>
+      <td>
+        <p>Similar to a SQL EXCEPT clause. Minus returns records from the left table that do not exist in the right table. Duplicate records in the left table are returned exactly once, i.e., duplicates are removed. Both tables must have identical field types.</p>
+{% highlight python %}
+left = table_env.scan("Source1").select("a, b, c")
+right = table_env.scan("Source2").select("a, b, c")
+result = left.minus(right)
+{% endhighlight %}
+      </td>
+    </tr>
+
+    <tr>
+      <td>
+        <strong>MinusAll</strong><br>
+        <span class="label label-primary">Batch</span>
+      </td>
+      <td>
+        <p>Similar to a SQL EXCEPT ALL clause. MinusAll returns the records that do not exist in the right table. A record that is present n times in the left table and m times in the right table is returned (n - m) times, i.e., as many duplicates as are present in the right table are removed. Both tables must have identical field types.</p>
+{% highlight python %}
+left = table_env.scan("Source1").select("a, b, c")
+right = table_env.scan("Source2").select("a, b, c")
+result = left.minus_all(right)
+{% endhighlight %}
+      </td>
+    </tr>
+
+    <tr>
+      <td>
+        <strong>In</strong><br>
+        <span class="label label-primary">Batch</span> <span class="label label-primary">Streaming</span>
+      </td>
+      <td>
+        <p>Similar to a SQL IN clause. In returns true if an expression exists in a given table sub-query. The sub-query table must consist of one column. This column must have the same data type as the expression.</p>
+{% highlight python %}
+left = table_env.scan("Source1").select("a, b, c")
+right = table_env.scan("Source2").select("a")
+
+# using implicit registration
+result = left.select("a, b, c").where("a.in(%s)" % right)
+
+# using explicit registration
+table_env.register_table("RightTable", right)
+result = left.select("a, b, c").where("a.in(RightTable)")
+{% endhighlight %}
+
+        <p><b>Note:</b> For streaming queries the operation is rewritten in a join and group operation. The required state to compute the query result might grow infinitely depending on the number of distinct input rows. Please provide a query configuration with valid retention interval to prevent excessive state size. See <a href="streaming/query_configuration.html">Query Configuration</a> for details.</p>
+      </td>
+    </tr>
+  </tbody>
+</table>
+
+</div>
 </div>
 
 {% top %}
@@ -1410,6 +1839,54 @@ val result3: Table = in.orderBy('a.asc).offset(10).fetch(5)
   </tbody>
 </table>
 </div>
+<div data-lang="python" markdown="1">
+<table class="table table-bordered">
+  <thead>
+    <tr>
+      <th class="text-left" style="width: 20%">Operators</th>
+      <th class="text-center">Description</th>
+    </tr>
+  </thead>
+  <tbody>
+    <tr>
+      <td>
+        <strong>Order By</strong><br>
+        <span class="label label-primary">Batch</span>
+      </td>
+      <td>
+        <p>Similar to a SQL ORDER BY clause. Returns records globally sorted across all parallel partitions.</p>
+{% highlight python %}
+table = table_env.scan("Source1").select("a, b, c")
+result = table.order_by("a.asc")
+{% endhighlight %}
+      </td>
+    </tr>
+
+    <tr>
+      <td>
+        <strong>Offset &amp; Fetch</strong><br>
+        <span class="label label-primary">Batch</span>
+      </td>
+      <td>
+        <p>Similar to the SQL OFFSET and FETCH clauses. Offset and Fetch limit the number of records returned from a sorted result. Offset and Fetch are technically part of the Order By operator and thus must be preceded by it.</p>
+{% highlight python %}
+table = table_env.scan("Source1").select("a, b, c")
+
+# returns the first 5 records from the sorted result
+result1 = table.order_by("a.asc").fetch(5)
+
+# skips the first 3 records and returns all following records from the sorted result
+result2 = table.order_by("a.asc").offset(3)
+
+# skips the first 10 records and returns the next 5 records from the sorted result
+result3 = table.order_by("a.asc").offset(10).fetch(5)
+{% endhighlight %}
+      </td>
+    </tr>
+
+  </tbody>
+</table>
+</div>
 </div>
 
 ### Insert
@@ -1475,6 +1952,36 @@ orders.insertInto("OutOrders")
   </tbody>
 </table>
 </div>
+<div data-lang="python" markdown="1">
+
+<table class="table table-bordered">
+  <thead>
+    <tr>
+      <th class="text-left" style="width: 20%">Operators</th>
+      <th class="text-center">Description</th>
+    </tr>
+  </thead>
+  <tbody>
+    <tr>
+      <td>
+        <strong>Insert Into</strong><br>
+        <span class="label label-primary">Batch</span> <span class="label label-primary">Streaming</span>
+      </td>
+      <td>
+        <p>Similar to the INSERT INTO clause in a SQL query. Performs an insertion into a registered output table.</p>
+
+        <p>Output tables must be registered in the TableEnvironment (see <a href="common.html#register-a-tablesink">Register a TableSink</a>). Moreover, the schema of the registered table must match the schema of the query.</p>
+
+{% highlight python %}
+orders = table_env.scan("Orders")
+orders.insert_into("OutOrders")
+{% endhighlight %}
+      </td>
+    </tr>
+
+  </tbody>
+</table>
+</div>
 </div>
 
 {% top %}
@@ -1504,6 +2011,14 @@ val table = input
   .select('b.sum)  // aggregate
 {% endhighlight %}
 </div>
+
+<div data-lang="python" markdown="1">
+{% highlight python %}
+# define window with alias w, group the table by window w, then aggregate
+table = input.window([GroupWindow w].alias("w")) \
+             .group_by("w").select("b.sum")
+{% endhighlight %}
+</div>
 </div>
 
 In streaming environments, window aggregates can only be computed in parallel if they group on one or more attributes in addition to the window, i.e., the `groupBy(...)` clause references a window alias and at least one additional attribute. A `groupBy(...)` clause that only references a window alias (such as in the example above) can only be evaluated by a single, non-parallel task. 
@@ -1527,6 +2042,15 @@ val table = input
   .select('a, 'b.sum)  // aggregate
 {% endhighlight %}
 </div>
+
+<div data-lang="python" markdown="1">
+{% highlight python %}
+# define window with alias w, group the table by attribute a and window w,
+# then aggregate
+table = input.window([GroupWindow w].alias("w")) \
+             .group_by("w, a").select("b.sum")
+{% endhighlight %}
+</div>
 </div>
 
 Window properties such as the start, end, or rowtime timestamp of a time window can be added in the select statement as a property of the window alias as `w.start`, `w.end`, and `w.rowtime`, respectively. The window start and rowtime timestamps are the inclusive lower and upper window boundaries. In contrast, the window end timestamp is the exclusive upper window boundary. For example a tumbling window of 30 minutes that starts at 2pm would have `14:00:00.000` as start timestamp, `14:29: [...]
@@ -1549,6 +2073,16 @@ val table = input
   .select('a, 'w.start, 'w.end, 'w.rowtime, 'b.count) // aggregate and add window start, end, and rowtime timestamps
 {% endhighlight %}
 </div>
+
+<div data-lang="python" markdown="1">
+{% highlight python %}
+# define window with alias w, group the table by attribute a and window w,
+# then aggregate and add window start, end, and rowtime timestamps
+table = input.window([GroupWindow w].alias("w")) \
+             .group_by("w, a") \
+             .select("a, w.start, w.end, w.rowtime, b.count")
+{% endhighlight %}
+</div>
 </div>
 
 The `Window` parameter defines how rows are mapped to windows. `Window` is not an interface that users can implement. Instead, the Table API provides a set of predefined `Window` classes with specific semantics, which are translated into underlying `DataStream` or `DataSet` operations. The supported window definitions are listed below.
@@ -1609,6 +2143,19 @@ Tumbling windows are defined by using the `Tumble` class as follows:
 .window(Tumble over 10.rows on 'proctime as 'w)
 {% endhighlight %}
 </div>
+
+<div data-lang="python" markdown="1">
+{% highlight python %}
+# Tumbling Event-time Window
+.window(Tumble.over("10.minutes").on("rowtime").alias("w"))
+
+# Tumbling Processing-time Window (assuming a processing-time attribute "proctime")
+.window(Tumble.over("10.minutes").on("proctime").alias("w"))
+
+# Tumbling Row-count Window (assuming a processing-time attribute "proctime")
+.window(Tumble.over("10.rows").on("proctime").alias("w"));
+{% endhighlight %}
+</div>
 </div>
 
 #### Slide (Sliding Windows)
@@ -1671,6 +2218,19 @@ Sliding windows are defined by using the `Slide` class as follows:
 .window(Slide over 10.rows every 5.rows on 'proctime as 'w)
 {% endhighlight %}
 </div>
+
+<div data-lang="python" markdown="1">
+{% highlight python %}
+# Sliding Event-time Window
+.window(Slide.over("10.minutes").every("5.minutes").on("rowtime").alias("w"))
+
+# Sliding Processing-time window (assuming a processing-time attribute "proctime")
+.window(Slide.over("10.minutes").every("5.minutes").on("proctime").alias("w"))
+
+# Sliding Row-count window (assuming a processing-time attribute "proctime")
+.window(Slide.over("10.rows").every("5.rows").on("proctime").alias("w"))
+{% endhighlight %}
+</div>
 </div>
 
 #### Session (Session Windows)
@@ -1723,6 +2283,16 @@ A session window is defined by using the `Session` class as follows:
 .window(Session withGap 10.minutes on 'proctime as 'w)
 {% endhighlight %}
 </div>
+
+<div data-lang="python" markdown="1">
+{% highlight python %}
+# Session Event-time Window
+.window(Session.withGap("10.minutes").on("rowtime").alias("w"))
+
+# Session Processing-time Window (assuming a processing-time attribute "proctime")
+.window(Session.withGap("10.minutes").on("proctime").alias("w"))
+{% endhighlight %}
+</div>
 </div>
 
 {% top %}
@@ -1731,7 +2301,7 @@ A session window is defined by using the `Session` class as follows:
 
 Over window aggregates are known from standard SQL (`OVER` clause) and defined in the `SELECT` clause of a query. Unlike group windows, which are specified in the `GROUP BY` clause, over windows do not collapse rows. Instead over window aggregates compute an aggregate for each input row over a range of its neighboring rows. 
 
-Over windows are defined using the `window(w: OverWindow*)` clause and referenced via an alias in the `select()` method. The following example shows how to define an over window aggregation on a table.
+Over windows are defined using the `window(w: OverWindow*)` clause (`over_window(*OverWindow)` in the Python API) and referenced via an alias in the `select()` method. The following example shows how to define an over window aggregation on a table.
 
 <div class="codetabs" markdown="1">
 <div data-lang="java" markdown="1">
@@ -1749,6 +2319,14 @@ val table = input
   .select('a, 'b.sum over 'w, 'c.min over 'w) // aggregate over the over window w
 {% endhighlight %}
 </div>
+
+<div data-lang="python" markdown="1">
+{% highlight python %}
+# define over window with alias w and aggregate over the over window w
+table = input.over_window([OverWindow w].alias("w")) \
+             .select("a, b.sum over w, c.min over w")
+{% endhighlight %}
+</div>
 </div>
 
 The `OverWindow` defines a range of rows over which aggregates are computed. `OverWindow` is not an interface that users can implement. Instead, the Table API provides the `Over` class to configure the properties of the over window. Over windows can be defined on event-time or processing-time and on ranges specified as time interval or row-count. The supported over window definitions are exposed as methods on `Over` (and other classes) and are listed below:
@@ -1855,6 +2433,22 @@ The `OverWindow` defines a range of rows over which aggregates are computed. `Ov
 .window(Over partitionBy 'a orderBy 'proctime preceding UNBOUNDED_ROW as 'w)
 {% endhighlight %}
 </div>
+
+<div data-lang="python" markdown="1">
+{% highlight python %}
+# Unbounded Event-time over window (assuming an event-time attribute "rowtime")
+.over_window(Over.partition_by("a").order_by("rowtime").preceding("unbounded_range").alias("w"))
+
+# Unbounded Processing-time over window (assuming a processing-time attribute "proctime")
+.over_window(Over.partition_by("a").order_by("proctime").preceding("unbounded_range").alias("w"))
+
+# Unbounded Event-time Row-count over window (assuming an event-time attribute "rowtime")
+.over_window(Over.partition_by("a").order_by("rowtime").preceding("unbounded_row").alias("w"))
+ 
+# Unbounded Processing-time Row-count over window (assuming a processing-time attribute "proctime")
+.over_window(Over.partition_by("a").order_by("proctime").preceding("unbounded_row").alias("w"))
+{% endhighlight %}
+</div>
 </div>
 
 #### Bounded Over Windows
@@ -1890,6 +2484,22 @@ The `OverWindow` defines a range of rows over which aggregates are computed. `Ov
 .window(Over partitionBy 'a orderBy 'proctime preceding 10.rows as 'w)
 {% endhighlight %}
 </div>
+
+<div data-lang="python" markdown="1">
+{% highlight python %}
+# Bounded Event-time over window (assuming an event-time attribute "rowtime")
+.over_window(Over.partition_by("a").order_by("rowtime").preceding("1.minutes").alias("w"))
+
+# Bounded Processing-time over window (assuming a processing-time attribute "proctime")
+.over_window(Over.partition_by("a").order_by("proctime").preceding("1.minutes").alias("w"))
+
+# Bounded Event-time Row-count over window (assuming an event-time attribute "rowtime")
+.over_window(Over.partition_by("a").order_by("rowtime").preceding("10.rows").alias("w"))
+ 
+# Bounded Processing-time Row-count over window (assuming a processing-time attribute "proctime")
+.over_window(Over.partition_by("a").order_by("proctime").preceding("10.rows").alias("w"))
+{% endhighlight %}
+</div>
 </div>
 
 {% top %}
diff --git a/docs/dev/table/tableApi.zh.md b/docs/dev/table/tableApi.zh.md
index 4aa1c98..acb1bb9 100644
--- a/docs/dev/table/tableApi.zh.md
+++ b/docs/dev/table/tableApi.zh.md
@@ -93,6 +93,31 @@ val result = orders
 {% endhighlight %}
 
 </div>
+<div data-lang="python" markdown="1">
+
+使用`from pyflink.table import *`来导入Python Table API。
+
+下面这个例子演示了如何组织一个Python Table API程序,以及字符串形式的表达式用法。
+
+{% highlight python %}
+from pyflink.table import *
+
+# environment configuration
+t_env = TableEnvironment.create(TableConfig.Builder().as_batch_execution().build())
+
+# register Orders table and Result table sink in table environment
+# ...
+
+# specify table program
+orders = t_env.scan("Orders")  # schema (a, b, c, rowtime)
+
+orders.group_by("a").select("a, b.count as cnt").insert_into("result")
+
+t_env.execute()
+
+{% endhighlight %}
+
+</div>
 </div>
 
 The next example shows a more complex Table API program. The program scans again the `Orders` table. It filters null values, normalizes the field `a` of type String, and calculates for each hour and product `a` the average billing amount `b`.
@@ -135,6 +160,24 @@ val result: Table = orders
 {% endhighlight %}
 
 </div>
+
+<div data-lang="python" markdown="1">
+
+{% highlight python %}
+# environment configuration
+# ...
+
+# specify table program
+orders = t_env.scan("Orders")  # schema (a, b, c, rowtime)
+
+result = orders.filter("a.isNotNull && b.isNotNull && c.isNotNull") \
+               .select("a.lowerCase() as a, b, rowtime") \
+               .window(Tumble.over("1.hour").on("rowtime").alias("hourlyWindow")) \
+               .group_by("hourlyWindow, a") \
+               .select("a, hourlyWindow.end as hour, b.avg as avgBillingAmount")
+{% endhighlight %}
+
+</div>
 </div>
 
 Since the Table API is a unified API for batch and streaming data, both example programs can be executed on batch and streaming inputs without any modification of the table program itself. In both cases, the program produces the same results given that streaming records are not late (see [Streaming Concepts](streaming) for details).
@@ -316,7 +359,7 @@ val result = orders.where('b === "red")
   		<td>
         <p>类似于SQL请求中的FROM子句,将一个环境中已注册的表转换成Table对象。</p>
 {% highlight python %}
-orders = table_env.scan("Orders");
+orders = table_env.scan("Orders")
 {% endhighlight %}
       </td>
   	</tr>
@@ -328,12 +371,12 @@ orders = table_env.scan("Orders");
       <td>
         <p>类似于SQL请求中的SELECT子句,执行一个select操作。</p>
 {% highlight python %}
-orders = table_env.scan("Orders");
-result = orders.select("a, c as d");
+orders = table_env.scan("Orders")
+result = orders.select("a, c as d")
 {% endhighlight %}
         <p>您可以使用星号 (<code>*</code>) 表示选择表中的所有列。</p>
 {% highlight python %}
-result = orders.select("*");
+result = orders.select("*")
 {% endhighlight %}
 </td>
         </tr>
@@ -345,8 +388,8 @@ result = orders.select("*");
       <td>
         <p>重命名字段。</p>
 {% highlight python %}
-orders = table_env.scan("Orders");
-result = orders.alias("x, y, z, t");
+orders = table_env.scan("Orders")
+result = orders.alias("x, y, z, t")
 {% endhighlight %}
       </td>
     </tr>
@@ -359,13 +402,13 @@ result = orders.alias("x, y, z, t");
       <td>
         <p>类似于SQL请求中的WHERE子句,过滤掉表中不满足条件的行。</p>
 {% highlight python %}
-orders = table_env.scan("Orders");
-result = orders.where("b === 'red'");
+orders = table_env.scan("Orders")
+result = orders.where("b === 'red'")
 {% endhighlight %}
 or
 {% highlight python %}
-orders = table_env.scan("Orders");
-result = orders.filter("a % 2 === 0");
+orders = table_env.scan("Orders")
+result = orders.filter("a % 2 === 0")
 {% endhighlight %}
       </td>
     </tr>
@@ -511,6 +554,73 @@ val result = orders.renameColumns('b as 'b2, 'c as 'c2)
   </tbody>
 </table>
 </div>
+<div data-lang="python" markdown="1">
+
+<table class="table table-bordered">
+  <thead>
+    <tr>
+      <th class="text-left" style="width: 20%">操作</th>
+      <th class="text-center">描述</th>
+    </tr>
+  </thead>
+  <tbody>
+  <tr>
+          <td>
+            <strong>AddColumns</strong><br>
+            <span class="label label-primary">批处理</span> <span class="label label-primary">流处理</span>
+          </td>
+          <td>
+          <p>执行新增字段操作。如果欲添加字段已经存在,将会抛出异常。</p>
+{% highlight python %}
+orders = table_env.scan("Orders")
+result = orders.add_columns("concat(c, 'sunny')")
+{% endhighlight %}
+</td>
+        </tr>
+        
+ <tr>
+     <td>
+                    <strong>AddOrReplaceColumns</strong><br>
+                    <span class="label label-primary">批处理</span> <span class="label label-primary">流处理</span>
+                  </td>
+                  <td>
+                  <p>执行新增字段操作。如果欲添加字段已经存在,将会替换该字段。如果新增字段列表中有同名字段,取最靠后的为有效字段。</p>
+{% highlight python %}
+orders = table_env.scan("Orders")
+result = orders.add_or_replace_columns("concat(c, 'sunny') as desc")
+{% endhighlight %}
+                  </td>
+                </tr>
+         <tr>
+                  <td>
+                    <strong>DropColumns</strong><br>
+                    <span class="label label-primary">批处理</span> <span class="label label-primary">流处理</span>
+                  </td>
+                  <td>
+                  <p>执行删除字段操作。参数必须是字段列表,并且必须是已经存在的字段才能被删除。</p>
+{% highlight python %}
+orders = table_env.scan("Orders")
+result = orders.drop_columns("b, c")
+{% endhighlight %}
+                  </td>
+                </tr>
+         <tr>
+                  <td>
+                    <strong>RenameColumns</strong><br>
+                    <span class="label label-primary">批处理</span> <span class="label label-primary">流处理</span>
+                  </td>
+                  <td>
+                  <p>执行重命名字段操作。参数必须是字段别名(例:b as b2)列表,并且必须是已经存在的字段才能被重命名。</p>
+{% highlight python %}
+orders = table_env.scan("Orders")
+result = orders.rename_columns("b as b2, c as c2")
+{% endhighlight %}
+                  </td>
+                </tr>
+  </tbody>
+</table>
+
+</div>
 </div>
 
 {% top %}
@@ -757,6 +867,111 @@ val result = orders.distinct()
   </tbody>
 </table>
 </div>
+<div data-lang="python" markdown="1">
+
+<table class="table table-bordered">
+  <thead>
+    <tr>
+      <th class="text-left" style="width: 20%">操作</th>
+      <th class="text-center">描述</th>
+    </tr>
+  </thead>
+  <tbody>
+    <tr>
+      <td>
+        <strong>GroupBy Aggregation</strong><br>
+        <span class="label label-primary">批处理</span> <span class="label label-primary">流处理</span><br>
+        <span class="label label-info">结果持续更新</span>
+      </td>
+      <td>
+        <p>类似于SQL的GROUP BY子句。将数据按照指定字段进行分组,之后对各组内数据执行聚合操作。</p>
+{% highlight python %}
+orders = table_env.scan("Orders")
+result = orders.group_by("a").select("a, b.sum as d")
+{% endhighlight %}
+        <p><b>注意:</b> 对于流式查询,计算查询结果所需的状态(state)可能会无限增长,具体情况取决于聚合操作的类型和分组的数量。您可能需要在查询配置中设置状态保留时间,以防止状态过大。详情请看<a href="streaming/query_configuration.html">查询配置</a>。</p>
+      </td>
+    </tr>
+    <tr>
+    	<td>
+        <strong>GroupBy Window Aggregation</strong><br>
+        <span class="label label-primary">批处理</span> <span class="label label-primary">流处理</span>
+      </td>
+    	<td>
+        <p>在一个窗口上分组和聚合数据,可包含其它分组字段。</p>
+{% highlight python %}
+orders = table_env.scan("Orders")
+result = orders.window(Tumble.over("5.minutes").on("rowtime").alias("w")) \
+               .group_by("a, w") \
+               .select("a, w.start, w.end, w.rowtime, b.sum as d")
+{% endhighlight %}
+      </td>
+    </tr>
+    <tr>
+    	<td>
+        <strong>Over Window Aggregation</strong><br>
+        <span class="label label-primary">流处理</span>
+      </td>
+      <td>
+       <p>类似于SQL中的OVER开窗函数。Over窗口聚合对每一行都进行一次聚合计算,聚合的对象是以当前行的位置为基准,向前向后取一个区间范围内的所有数据。详情请见<a href="#over-windows">Over窗口</a>一节。</p>
+{% highlight python %}
+orders = table_env.scan("Orders")
+result = orders.over_window(Over.partition_by("a").order_by("rowtime") \
+      	       .preceding("UNBOUNDED_RANGE").following("CURRENT_RANGE") \
+               .alias("w")) \
+               .select("a, b.avg over w, b.max over w, b.min over w")
+{% endhighlight %}
+       <p><b>注意:</b> 所有的聚合操作必须在同一个窗口上定义,即分组,排序,范围等属性必须一致。目前,窗口区间范围的向前(PRECEDING)取值没有限制,可以为无界(UNBOUNDED),但是向后(FOLLOWING)只支持当前行(CURRENT ROW),其它向后范围取值暂不支持。排序(ORDER BY)属性必须指定单个<a href="streaming/time_attributes.html">时间属性</a>。</p>
+      </td>
+    </tr>
+    <tr>
+      <td>
+        <strong>Distinct Aggregation</strong><br>
+        <span class="label label-primary">批处理</span> <span class="label label-primary">流处理</span> <br>
+        <span class="label label-info">结果持续更新</span>
+      </td>
+      <td>
+        <p>类似于SQL聚合函数中的的DISTINCT关键字比如COUNT(DISTINCT a)。带有distinct标记的聚合函数只会接受不重复的输入,重复输入将被丢弃。这个去重特性可以在<b>分组聚合(GroupBy Aggregation)</b>,<b>分组窗口聚合(GroupBy Window Aggregation)</b>以及<b>Over窗口聚合(Over Window Aggregation)</b>上使用。</p>
+{% highlight python %}
+orders = table_env.scan("Orders")
+# Distinct aggregation on group by
+group_by_distinct_result = orders.group_by("a") \
+                                 .select("a, b.sum.distinct as d")
+# Distinct aggregation on time window group by
+group_by_window_distinct_result = orders.window(
+    Tumble.over("5.minutes").on("rowtime").alias("w")).group_by("a, w") \
+    .select("a, b.sum.distinct as d")
+# Distinct aggregation on over window
+result = orders.over_window(Over
+                       .partition_by("a")
+                       .order_by("rowtime")
+                       .preceding("UNBOUNDED_RANGE")
+                       .alias("w")) \
+                       .select(
+                       "a, b.avg.distinct over w, b.max over w, b.min over w")
+{% endhighlight %}
+        <p><b>注意:</b> 对于流式查询,计算查询结果所需的状态(state)可能会无限增长,具体情况取决于执行去重判断时参与判断的字段的数量。您可能需要在查询配置中设置状态保留时间,以防止状态过大。详情请看<a href="streaming/query_configuration.html">查询配置</a>。</p>
+      </td>
+    </tr>
+    <tr>
+      <td>
+        <strong>Distinct</strong><br>
+        <span class="label label-primary">批处理</span> <span class="label label-primary">流处理</span> <br>
+        <span class="label label-info">结果持续更新</span>
+      </td>
+      <td>
+        <p>类似于SQL中的DISTINCT子句。返回去重后的数据。</p>
+{% highlight python %}
+orders = table_env.scan("Orders")
+result = orders.distinct()
+{% endhighlight %}
+        <p><b>注意:</b> 对于流式查询,计算查询结果所需的状态(state)可能会无限增长,具体情况取决于执行去重判断时参与判断的字段的数量。您可能需要在查询配置中设置状态保留时间,以防止状态过大。详情请看<a href="streaming/query_configuration.html">查询配置</a>。</p>
+      </td>
+    </tr>
+  </tbody>
+</table>
+
+</div>
 </div>
 
 {% top %}
@@ -1052,6 +1267,94 @@ val result = orders
   </tbody>
 </table>
 </div>
+<div data-lang="python" markdown="1">
+
+<table class="table table-bordered">
+  <thead>
+    <tr>
+      <th class="text-left" style="width: 20%">操作</th>
+      <th class="text-center">描述</th>
+    </tr>
+  </thead>
+  <tbody>
+  	<tr>
+      <td>
+        <strong>Inner Join</strong><br>
+        <span class="label label-primary">批处理</span>
+        <span class="label label-primary">流处理</span>
+      </td>
+      <td>
+        <p>类似于SQL的JOIN子句。对两张表执行内连接操作。两张表必须具有不同的字段名称,并且必须在join方法或者随后的where或filter方法中定义至少一个等值连接条件。</p>
+{% highlight python %}
+left = table_env.scan("Source1").select("a, b, c")
+right = table_env.scan("Source2").select("d, e, f")
+result = left.join(right).where("a = d").select("a, b, e")
+{% endhighlight %}
+<p><b>注意:</b> 对于流式查询,计算查询结果所需的状态(state)可能会无限增长,具体取决于不重复的输入行的数量。您可能需要在查询配置中设置状态保留时间,以防止状态过大。详情请看<a href="streaming/query_configuration.html">查询配置</a>。</p>
+      </td>
+    </tr>
+
+    <tr>
+      <td>
+        <strong>Outer Join</strong><br>
+        <span class="label label-primary">批处理</span>
+        <span class="label label-primary">流处理</span>
+        <span class="label label-info">结果持续更新</span>
+      </td>
+      <td>
+        <p>类似于SQL的LEFT/RIGHT/FULL OUTER JOIN子句。对两张表执行外连接操作。两张表必须具有不同的字段名称,并且必须定义至少一个等值连接条件。</p>
+{% highlight python %}
+left = table_env.scan("Source1").select("a, b, c")
+right = table_env.scan("Source2").select("d, e, f")
+
+left_outer_result = left.left_outer_join(right, "a = d").select("a, b, e")
+right_outer_result = left.right_outer_join(right, "a = d").select("a, b, e")
+full_outer_result = left.full_outer_join(right, "a = d").select("a, b, e")
+{% endhighlight %}
+<p><b>注意:</b> 对于流式查询,计算查询结果所需的状态(state)可能会无限增长,具体取决于不重复的输入行的数量。您可能需要在查询配置中设置状态保留时间,以防止状态过大。详情请看<a href="streaming/query_configuration.html">查询配置</a>。</p>
+      </td>
+    </tr>
+    <tr>
+      <td><strong>Time-windowed Join</strong><br>
+        <span class="label label-primary">批处理</span>
+        <span class="label label-primary">流处理</span>
+      </td>
+      <td>
+        <p>Python API暂不支持。</p>
+      </td>
+    </tr>
+    <tr>
+    	<td>
+        <strong>Inner Join with Table Function</strong><br>
+        <span class="label label-primary">批处理</span> <span class="label label-primary">流处理</span>
+      </td>
+    	<td>
+        <p>Python API暂不支持。</p>
+      </td>
+    </tr>
+    <tr>
+    	<td>
+        <strong>Left Outer Join with Table Function</strong><br>
+        <span class="label label-primary">批处理</span> <span class="label label-primary">流处理</span>
+      </td>
+      <td>
+        <p>Python API暂不支持。</p>
+      </td>
+    </tr>
+    <tr>
+      <td>
+        <strong>Join with Temporal Table</strong><br>
+        <span class="label label-primary">流处理</span>
+      </td>
+      <td>
+        <p>Python API暂不支持。</p>
+      </td>
+    </tr>
+
+  </tbody>
+</table>
+
+</div>
 </div>
 
 {% top %}
@@ -1305,6 +1608,132 @@ val result = left.select('a, 'b, 'c).where('a.in(right))
   </tbody>
 </table>
 </div>
+<div data-lang="python" markdown="1">
+
+<table class="table table-bordered">
+  <thead>
+    <tr>
+      <th class="text-left" style="width: 20%">操作</th>
+      <th class="text-center">描述</th>
+    </tr>
+  </thead>
+  <tbody>
+  	<tr>
+      <td>
+        <strong>Union</strong><br>
+        <span class="label label-primary">批处理</span>
+      </td>
+      <td>
+        <p>类似于SQL的UNION子句。将两张表组合成一张表,这张表拥有二者去除重复后的全部数据。两张表的字段和类型必须完全一致。</p>
+{% highlight python %}
+left = table_env.scan("Source1").select("a, b, c")
+right = table_env.scan("Source2").select("a, b, c")
+result = left.union(right)
+{% endhighlight %}
+      </td>
+    </tr>
+
+    <tr>
+      <td>
+        <strong>UnionAll</strong><br>
+        <span class="label label-primary">批处理</span> <span class="label label-primary">流处理</span>
+      </td>
+      <td>
+        <p>类似于SQL的UNION ALL子句。将两张表组合成一张表,这张表拥有二者的全部数据。两张表的字段和类型必须完全一致。</p>
+{% highlight python %}
+left = table_env.scan("Source1").select("a, b, c")
+right = table_env.scan("Source2").select("a, b, c")
+result = left.union_all(right)
+{% endhighlight %}
+      </td>
+    </tr>
+
+    <tr>
+      <td>
+        <strong>Intersect</strong><br>
+        <span class="label label-primary">批处理</span>
+      </td>
+      <td>
+        <p>类似于SQL的INTERSECT子句。Intersect返回在两张表中都存在的数据。如果一个记录在两张表中不止出现一次,则只返回一次,即结果表没有重复记录。两张表的字段和类型必须完全一致。</p>
+{% highlight python %}
+left = table_env.scan("Source1").select("a, b, c")
+right = table_env.scan("Source2").select("a, b, c")
+result = left.intersect(right)
+{% endhighlight %}
+      </td>
+    </tr>
+
+    <tr>
+      <td>
+        <strong>IntersectAll</strong><br>
+        <span class="label label-primary">批处理</span>
+      </td>
+      <td>
+        <p>类似于SQL的INTERSECT ALL子句。IntersectAll返回在两张表中都存在的数据。如果一个记录在两张表中不止出现一次,则按照它在两张表中都出现的次数返回,即结果表可能包含重复数据。两张表的字段和类型必须完全一致。</p>
+{% highlight python %}
+left = table_env.scan("Source1").select("a, b, c")
+right = table_env.scan("Source2").select("a, b, c")
+result = left.intersect_all(right)
+{% endhighlight %}
+      </td>
+    </tr>
+
+    <tr>
+      <td>
+        <strong>Minus</strong><br>
+        <span class="label label-primary">批处理</span>
+      </td>
+      <td>
+        <p>类似于SQL的EXCEPT子句。Minus返回仅存在于左表,不存在于右表中的数据。左表中的相同数据只会返回一次,即数据会被去重。两张表的字段和类型必须完全一致。</p>
+{% highlight python %}
+left = table_env.scan("Source1").select("a, b, c")
+right = table_env.scan("Source2").select("a, b, c")
+result = left.minus(right)
+{% endhighlight %}
+      </td>
+    </tr>
+
+    <tr>
+      <td>
+        <strong>MinusAll</strong><br>
+        <span class="label label-primary">批处理</span>
+      </td>
+      <td>
+        <p>类似于SQL的EXCEPT ALL子句。MinusAll返回仅存在于左表,不存在于右表中的数据。如果一条数据在左表中出现了n次,在右表中出现了m次,最终这条数据将会被返回(n - m)次,即按右表中出现的次数来移除数据。两张表的字段和类型必须完全一致。</p>
+{% highlight python %}
+left = table_env.scan("Source1").select("a, b, c")
+right = table_env.scan("Source2").select("a, b, c")
+result = left.minus_all(right)
+{% endhighlight %}
+      </td>
+    </tr>
+
+    <tr>
+      <td>
+        <strong>In</strong><br>
+        <span class="label label-primary">批处理</span> <span class="label label-primary">流处理</span>
+      </td>
+      <td>
+        <p>类似于SQL的IN子句。如果In左边表达式的值在给定的子查询结果中则返回true。子查询的结果必须为单列。此列数据类型必须和表达式一致。</p>
+{% highlight python %}
+left = table_env.scan("Source1").select("a, b, c")
+right = table_env.scan("Source2").select("a")
+
+# using implicit registration
+result = left.select("a, b, c").where("a.in(%s)" % right)
+
+# using explicit registration
+table_env.register_table("RightTable", right)
+result = left.select("a, b, c").where("a.in(RightTable)")
+{% endhighlight %}
+
+        <p><b>注意:</b> 对于流式查询,这个操作会被替换成一个连接操作和一个分组操作。计算查询结果所需的状态(state)可能会无限增长,具体取决于不重复的输入行的数量。您可能需要在查询配置中设置状态保留时间,以防止状态过大。详情请看<a href="streaming/query_configuration.html">查询配置</a>。</p>
+      </td>
+    </tr>
+  </tbody>
+</table>
+
+</div>
 </div>
 
 {% top %}
@@ -1409,6 +1838,54 @@ val result3: Table = in.orderBy('a.asc).offset(10).fetch(5)
   </tbody>
 </table>
 </div>
+<div data-lang="python" markdown="1">
+<table class="table table-bordered">
+  <thead>
+    <tr>
+      <th class="text-left" style="width: 20%">操作</th>
+      <th class="text-center">描述</th>
+    </tr>
+  </thead>
+  <tbody>
+    <tr>
+      <td>
+        <strong>Order By</strong><br>
+        <span class="label label-primary">批处理</span>
+      </td>
+      <td>
+        <p>类似于SQL的ORDER BY子句。返回包括所有子并发分区内所有数据的全局排序结果。</p>
+{% highlight python %}
+table = table_env.scan("Source1").select("a, b, c")
+result = table.order_by("a.asc")
+{% endhighlight %}
+      </td>
+    </tr>
+
+    <tr>
+      <td>
+        <strong>Offset &amp; Fetch</strong><br>
+        <span class="label label-primary">批处理</span>
+      </td>
+      <td>
+        <p>类似于SQL的OFFSET和FETCH子句。Offset和Fetch从已排序的结果中返回指定数量的数据。Offset和Fetch在技术上是Order By操作的一部分,因此必须紧跟其后出现。</p>
+{% highlight python %}
+table = table_env.scan("Source1").select("a, b, c")
+
+# returns the first 5 records from the sorted result
+result1 = table.order_by("a.asc").fetch(5)
+
+# skips the first 3 records and returns all following records from the sorted result
+result2 = table.order_by("a.asc").offset(3)
+
+# skips the first 10 records and returns the next 5 records from the sorted result
+result3 = table.order_by("a.asc").offset(10).fetch(5)
+{% endhighlight %}
+      </td>
+    </tr>
+
+  </tbody>
+</table>
+</div>
 </div>
 
 ### Insert
@@ -1474,6 +1951,36 @@ orders.insertInto("OutOrders")
   </tbody>
 </table>
 </div>
+<div data-lang="python" markdown="1">
+
+<table class="table table-bordered">
+  <thead>
+    <tr>
+      <th class="text-left" style="width: 20%">操作</th>
+      <th class="text-center">描述</th>
+    </tr>
+  </thead>
+  <tbody>
+    <tr>
+      <td>
+        <strong>Insert Into</strong><br>
+        <span class="label label-primary">批处理</span> <span class="label label-primary">流处理</span>
+      </td>
+      <td>
+        <p>类似于SQL请求中的INSERT INTO子句。将数据输出到一个已注册的输出表中。</p>
+
+        <p>输出表必须先在TableEnvironment中注册(详见<a href="common.html#register-a-tablesink">注册一个TableSink</a>)。此外,注册的表的模式(schema)必须和请求的结果的模式(schema)相匹配。</p>
+
+{% highlight python %}
+orders = table_env.scan("Orders")
+orders.insert_into("OutOrders")
+{% endhighlight %}
+      </td>
+    </tr>
+
+  </tbody>
+</table>
+</div>
 </div>
 
 {% top %}
@@ -1503,6 +2010,14 @@ val table = input
   .select('b.sum)  // aggregate
 {% endhighlight %}
 </div>
+
+<div data-lang="python" markdown="1">
+{% highlight python %}
+# define window with alias w, group the table by window w, then aggregate
+table = input.window([GroupWindow w].alias("w")) \
+             .group_by("w").select("b.sum")
+{% endhighlight %}
+</div>
 </div>
 
 In streaming environments, window aggregates can only be computed in parallel if they group on one or more attributes in addition to the window, i.e., the `groupBy(...)` clause references a window alias and at least one additional attribute. A `groupBy(...)` clause that only references a window alias (such as in the example above) can only be evaluated by a single, non-parallel task.
@@ -1526,6 +2041,15 @@ val table = input
   .select('a, 'b.sum)  // aggregate
 {% endhighlight %}
 </div>
+
+<div data-lang="python" markdown="1">
+{% highlight python %}
+# define window with alias w, group the table by attribute a and window w,
+# then aggregate
+table = input.window([GroupWindow w].alias("w")) \
+             .group_by("w, a").select("b.sum")
+{% endhighlight %}
+</div>
 </div>
 
 Window properties such as the start, end, or rowtime timestamp of a time window can be added in the select statement as a property of the window alias as `w.start`, `w.end`, and `w.rowtime`, respectively. The window start and rowtime timestamps are the inclusive lower and upper window boundaries. In contrast, the window end timestamp is the exclusive upper window boundary. For example a tumbling window of 30 minutes that starts at 2pm would have `14:00:00.000` as start timestamp, `14:29: [...]
@@ -1548,6 +2072,16 @@ val table = input
   .select('a, 'w.start, 'w.end, 'w.rowtime, 'b.count) // aggregate and add window start, end, and rowtime timestamps
 {% endhighlight %}
 </div>
+
+<div data-lang="python" markdown="1">
+{% highlight python %}
+# define window with alias w, group the table by attribute a and window w,
+# then aggregate and add window start, end, and rowtime timestamps
+table = input.window([GroupWindow w].alias("w")) \
+             .group_by("w, a") \
+             .select("a, w.start, w.end, w.rowtime, b.count")
+{% endhighlight %}
+</div>
 </div>
 
 The `Window` parameter defines how rows are mapped to windows. `Window` is not an interface that users can implement. Instead, the Table API provides a set of predefined `Window` classes with specific semantics, which are translated into underlying `DataStream` or `DataSet` operations. The supported window definitions are listed below.
@@ -1608,6 +2142,19 @@ Tumbling windows are defined by using the `Tumble` class as follows:
 .window(Tumble over 10.rows on 'proctime as 'w)
 {% endhighlight %}
 </div>
+
+<div data-lang="python" markdown="1">
+{% highlight python %}
+# Tumbling Event-time Window
+.window(Tumble.over("10.minutes").on("rowtime").alias("w"))
+
+# Tumbling Processing-time Window (assuming a processing-time attribute "proctime")
+.window(Tumble.over("10.minutes").on("proctime").alias("w"))
+
+# Tumbling Row-count Window (assuming a processing-time attribute "proctime")
+.window(Tumble.over("10.rows").on("proctime").alias("w"));
+{% endhighlight %}
+</div>
 </div>
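
For reference, a hedged sketch that plugs a tumbling event-time window into the group-windowed aggregation pattern shown earlier, assuming a table `input` with attributes `a`, `b` and an event-time attribute `rowtime`:

{% highlight python %}
from pyflink.table import Tumble

# 10-minute tumbling windows keyed on attribute a; emits window bounds with the aggregate
result = input.window(Tumble.over("10.minutes").on("rowtime").alias("w")) \
              .group_by("w, a") \
              .select("a, w.start, w.end, b.sum")
{% endhighlight %}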
 
 #### Slide (Sliding Windows)
@@ -1670,6 +2217,19 @@ Sliding windows are defined by using the `Slide` class as follows:
 .window(Slide over 10.rows every 5.rows on 'proctime as 'w)
 {% endhighlight %}
 </div>
+
+<div data-lang="python" markdown="1">
+{% highlight python %}
+# Sliding Event-time Window
+.window(Slide.over("10.minutes").every("5.minutes").on("rowtime").alias("w"))
+
+# Sliding Processing-time window (assuming a processing-time attribute "proctime")
+.window(Slide.over("10.minutes").every("5.minutes").on("proctime").alias("w"))
+
+# Sliding Row-count window (assuming a processing-time attribute "proctime")
+.window(Slide.over("10.rows").every("5.rows").on("proctime").alias("w"))
+{% endhighlight %}
+</div>
 </div>
 
 #### Session (Session Windows)
@@ -1722,6 +2282,16 @@ A session window is defined by using the `Session` class as follows:
 .window(Session withGap 10.minutes on 'proctime as 'w)
 {% endhighlight %}
 </div>
+
+<div data-lang="python" markdown="1">
+{% highlight python %}
+# Session Event-time Window
+.window(Session.with_gap("10.minutes").on("rowtime").alias("w"))
+
+# Session Processing-time Window (assuming a processing-time attribute "proctime")
+.window(Session.with_gap("10.minutes").on("proctime").alias("w"))
+{% endhighlight %}
+</div>
 </div>
 
 {% top %}
@@ -1730,7 +2300,7 @@ A session window is defined by using the `Session` class as follows:
 
 Over window aggregates are known from standard SQL (`OVER` clause) and defined in the `SELECT` clause of a query. Unlike group windows, which are specified in the `GROUP BY` clause, over windows do not collapse rows. Instead over window aggregates compute an aggregate for each input row over a range of its neighboring rows.
 
-Over windows are defined using the `window(w: OverWindow*)` clause and referenced via an alias in the `select()` method. The following example shows how to define an over window aggregation on a table.
+Over windows are defined using the `window(w: OverWindow*)` clause (`over_window(*OverWindow)` in the Python API) and referenced via an alias in the `select()` method. The following example shows how to define an over window aggregation on a table.
 
 <div class="codetabs" markdown="1">
 <div data-lang="java" markdown="1">
@@ -1748,6 +2318,14 @@ val table = input
   .select('a, 'b.sum over 'w, 'c.min over 'w) // aggregate over the over window w
 {% endhighlight %}
 </div>
+
+<div data-lang="python" markdown="1">
+{% highlight python %}
+# define over window with alias w and aggregate over the over window w
+table = input.over_window([OverWindow w].alias("w")) \
+             .select("a, b.sum over w, c.min over w")
+{% endhighlight %}
+</div>
 </div>
 
 The `OverWindow` defines a range of rows over which aggregates are computed. `OverWindow` is not an interface that users can implement. Instead, the Table API provides the `Over` class to configure the properties of the over window. Over windows can be defined on event-time or processing-time and on ranges specified as time interval or row-count. The supported over window definitions are exposed as methods on `Over` (and other classes) and are listed below:
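
A hedged sketch that ties one of the over-window definitions below to the aggregation above, assuming a table `input` with attributes `a`, `b`, `c` and an event-time attribute `rowtime`:

{% highlight python %}
from pyflink.table import Over

# for each row, aggregate over the preceding minute of rows that share the same value of a
result = input \
    .over_window(Over.partition_by("a").order_by("rowtime").preceding("1.minutes").alias("w")) \
    .select("a, b.sum over w, c.min over w")
{% endhighlight %}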
@@ -1854,6 +2432,22 @@ The `OverWindow` defines a range of rows over which aggregates are computed. `Ov
 .window(Over partitionBy 'a orderBy 'proctime preceding UNBOUNDED_ROW as 'w)
 {% endhighlight %}
 </div>
+
+<div data-lang="python" markdown="1">
+{% highlight python %}
+# Unbounded Event-time over window (assuming an event-time attribute "rowtime")
+.over_window(Over.partition_by("a").order_by("rowtime").preceding("unbounded_range").alias("w"))
+
+# Unbounded Processing-time over window (assuming a processing-time attribute "proctime")
+.over_window(Over.partition_by("a").order_by("proctime").preceding("unbounded_range").alias("w"))
+
+# Unbounded Event-time Row-count over window (assuming an event-time attribute "rowtime")
+.over_window(Over.partition_by("a").order_by("rowtime").preceding("unbounded_row").alias("w"))
+ 
+# Unbounded Processing-time Row-count over window (assuming a processing-time attribute "proctime")
+.over_window(Over.partition_by("a").order_by("proctime").preceding("unbounded_row").alias("w"))
+{% endhighlight %}
+</div>
 </div>
 
 #### Bounded Over Windows
@@ -1889,6 +2483,22 @@ The `OverWindow` defines a range of rows over which aggregates are computed. `Ov
 .window(Over partitionBy 'a orderBy 'proctime preceding 10.rows as 'w)
 {% endhighlight %}
 </div>
+
+<div data-lang="python" markdown="1">
+{% highlight python %}
+# Bounded Event-time over window (assuming an event-time attribute "rowtime")
+.over_window(Over.partition_by("a").order_by("rowtime").preceding("1.minutes").alias("w"))
+
+# Bounded Processing-time over window (assuming a processing-time attribute "proctime")
+.over_window(Over.partition_by("a").order_by("proctime").preceding("1.minutes").alias("w"))
+
+# Bounded Event-time Row-count over window (assuming an event-time attribute "rowtime")
+.over_window(Over.partition_by("a").order_by("rowtime").preceding("10.rows").alias("w"))
+ 
+# Bounded Processing-time Row-count over window (assuming a processing-time attribute "proctime")
+.over_window(Over.partition_by("a").order_by("proctime").preceding("10.rows").alias("w"))
+{% endhighlight %}
+</div>
 </div>
 
 {% top %}
diff --git a/flink-python/pyflink/table/__init__.py b/flink-python/pyflink/table/__init__.py
index 05575fc..dcbc0ab 100644
--- a/flink-python/pyflink/table/__init__.py
+++ b/flink-python/pyflink/table/__init__.py
@@ -39,6 +39,7 @@ from pyflink.table.table_environment import (TableEnvironment, StreamTableEnviro
 from pyflink.table.table_sink import TableSink, CsvTableSink
 from pyflink.table.table_source import TableSource, CsvTableSource
 from pyflink.table.types import DataTypes
+from pyflink.table.window import Tumble, Session, Slide, Over
 
 __all__ = [
     'TableEnvironment',
@@ -50,5 +51,9 @@ __all__ = [
     'TableSource',
     'CsvTableSink',
     'CsvTableSource',
-    'DataTypes'
+    'DataTypes',
+    'Tumble',
+    'Session',
+    'Slide',
+    'Over',
 ]
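
With the exports above, the window helpers become importable from the package root; a minimal sketch (the time attribute and field names are assumptions):

{% highlight python %}
from pyflink.table import Tumble, Slide, Session, Over

tumble = Tumble.over("10.minutes").on("rowtime").alias("w")
slide = Slide.over("10.minutes").every("5.minutes").on("rowtime").alias("w")
session = Session.with_gap("30.minutes").on("rowtime").alias("w")
over = Over.partition_by("a").order_by("rowtime").preceding("unbounded_range").alias("w")
{% endhighlight %}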
diff --git a/flink-python/pyflink/table/query_config.py b/flink-python/pyflink/table/query_config.py
new file mode 100644
index 0000000..9a2411c
--- /dev/null
+++ b/flink-python/pyflink/table/query_config.py
@@ -0,0 +1,111 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+from abc import ABCMeta
+from datetime import timedelta
+from py4j.compat import long
+
+from pyflink.java_gateway import get_gateway
+
+
+class QueryConfig(object):
+    """
+    The :class:`QueryConfig` holds parameters to configure the behavior of queries.
+    """
+
+    __metaclass__ = ABCMeta
+
+    def __init__(self, j_query_config):
+        self._j_query_config = j_query_config
+
+
+class StreamQueryConfig(QueryConfig):
+    """
+    The :class:`StreamQueryConfig` holds parameters to configure the behavior of streaming queries.
+    """
+
+    def __init__(self, j_stream_query_config=None):
+        if j_stream_query_config is not None:
+            self._j_stream_query_config = j_stream_query_config
+        else:
+            self._j_stream_query_config = get_gateway().jvm.StreamQueryConfig()
+        super(StreamQueryConfig, self).__init__(self._j_stream_query_config)
+
+    def with_idle_state_retention_time(self, min_time, max_time):
+        """
+        Specifies a minimum and a maximum time interval for how long idle state, i.e., state which
+        was not updated, will be retained.
+
+        State will never be cleared until it was idle for less than the minimum time and will never
+        be kept if it was idle for more than the maximum time.
+
+        When new data arrives for previously cleaned-up state, the new data will be handled as if it
+        was the first data. This can result in previous results being overwritten.
+
+        Set to ``datetime.timedelta()`` (zero) to never clean up the state.
+
+        .. note::
+            Cleaning up state requires additional bookkeeping which becomes less expensive for
+            larger differences of min_time and max_time. The difference between min_time and
+            max_time must be at least ``datetime.timedelta(minutes=5)`` (5 minutes).
+
+        :param min_time: The minimum time interval for which idle state is retained. Set to
+                         ``datetime.timedelta()`` (zero) to never clean up the state.
+        :param max_time: The maximum time interval for which idle state is retained. Must be at
+                         least 5 minutes greater than min_time. Set to
+                         ``datetime.timedelta()`` (zero) to never clean up the state.
+        :return: :class:`StreamQueryConfig`
+        """
+        #  type: (timedelta, timedelta) -> StreamQueryConfig
+        j_time_class = get_gateway().jvm.org.apache.flink.api.common.time.Time
+        j_min_time = j_time_class.milliseconds(long(round(min_time.total_seconds() * 1000)))
+        j_max_time = j_time_class.milliseconds(long(round(max_time.total_seconds() * 1000)))
+        self._j_stream_query_config = \
+            self._j_stream_query_config.withIdleStateRetentionTime(j_min_time, j_max_time)
+        return self
+
+    def get_min_idle_state_retention_time(self):
+        """
+        State might be cleared and removed if it was not updated for the defined period of time.
+
+        :return: The minimum time until state which was not updated will be retained.
+        """
+        #  type: () -> int
+        return self._j_stream_query_config.getMinIdleStateRetentionTime()
+
+    def get_max_idle_state_retention_time(self):
+        """
+        State will be cleared and removed if it was not updated for the defined period of time.
+
+        :return: The maximum time until state which was not updated will be retained.
+        """
+        #  type: () -> int
+        return self._j_stream_query_config.getMaxIdleStateRetentionTime()
+
+
+class BatchQueryConfig(QueryConfig):
+    """
+    The :class:`BatchQueryConfig` holds parameters to configure the behavior of batch queries.
+    """
+
+    def __init__(self, j_batch_query_config=None):
+        self._jvm = get_gateway().jvm
+        if j_batch_query_config is not None:
+            self._j_batch_query_config = j_batch_query_config
+        else:
+            self._j_batch_query_config = self._jvm.BatchQueryConfig()
+        super(BatchQueryConfig, self).__init__(self._j_batch_query_config)
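
A hedged usage sketch of StreamQueryConfig, assuming a streaming TableEnvironment named `t_env` is already available:

{% highlight python %}
from datetime import timedelta

# obtain the query config from the streaming environment and set idle state retention
q_config = t_env.query_config()
q_config.with_idle_state_retention_time(timedelta(hours=12), timedelta(hours=24))

# the retention bounds are reported back in milliseconds
print(q_config.get_min_idle_state_retention_time())  # 43200000
print(q_config.get_max_idle_state_retention_time())  # 86400000
{% endhighlight %}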
diff --git a/flink-python/pyflink/table/table.py b/flink-python/pyflink/table/table.py
index 2321c59..ca7fdfb 100644
--- a/flink-python/pyflink/table/table.py
+++ b/flink-python/pyflink/table/table.py
@@ -15,8 +15,16 @@
 #  See the License for the specific language governing permissions and
 # limitations under the License.
 ################################################################################
+import sys
 
 from py4j.java_gateway import get_method
+from pyflink.java_gateway import get_gateway
+
+from pyflink.table.window import GroupWindow
+from pyflink.util.utils import to_jarray
+
+if sys.version > '3':
+    xrange = range
 
 __all__ = ['Table']
 
@@ -33,7 +41,7 @@ class Table(object):
     Example:
     ::
         >>> t_config = TableConfig.Builder().as_streaming_execution().set_parallelism(1).build()
-        >>> t_env = TableEnvironment.get_table_environment(t_config)
+        >>> t_env = TableEnvironment.create(t_config)
         >>> ...
         >>> t_env.register_table_source("source", ...)
         >>> t = t_env.scan("source")
@@ -70,6 +78,7 @@ class Table(object):
         """
         Renames the fields of the expression result. Use this to disambiguate fields before
         joining to operations.
+
         Example:
         ::
             >>> tab.alias("a, b")
@@ -83,6 +92,7 @@ class Table(object):
         """
         Filters out elements that don't pass the filter predicate. Similar to a SQL WHERE
         clause.
+
         Example:
         ::
             >>> tab.filter("name = 'Fred'")
@@ -96,9 +106,9 @@ class Table(object):
         """
         Filters out elements that don't pass the filter predicate. Similar to a SQL WHERE
         clause.
+
         Example:
         ::
-
             >>> tab.where("name = 'Fred'")
 
         :param predicate: Predicate expression string.
@@ -106,6 +116,402 @@ class Table(object):
         """
         return Table(self._j_table.where(predicate))
 
+    def group_by(self, fields):
+        """
+        Groups the elements on some grouping keys. Use this before a selection with aggregations
+        to perform the aggregation on a per-group basis. Similar to a SQL GROUP BY statement.
+
+        Example:
+        ::
+            >>> tab.group_by("key").select("key, value.avg")
+
+        :param fields: Group keys.
+        :return: The grouped table.
+        """
+        return GroupedTable(self._j_table.groupBy(fields))
+
+    def distinct(self):
+        """
+        Removes duplicate values and returns only distinct (different) values.
+
+        Example:
+        ::
+            >>> tab.select("key, value").distinct()
+
+        :return: Result table.
+        """
+        return Table(self._j_table.distinct())
+
+    def join(self, right, join_predicate=None):
+        """
+        Joins two :class:`Table`s. Similar to a SQL join. The fields of the two joined
+        operations must not overlap, use :func:`~pyflink.table.Table.alias` to rename fields if
+        necessary. You can use where and select clauses after a join to further specify the
+        behaviour of the join.
+
+        .. note::
+            Both tables must be bound to the same :class:`TableEnvironment` .
+
+        Example:
+        ::
+            >>> left.join(right).where("a = b && c > 3").select("a, b, d")
+            >>> left.join(right, "a = b")
+
+        :param right: Right table.
+        :param join_predicate: Optional, the join predicate expression string.
+        :return: Result table.
+        """
+        if join_predicate is not None:
+            return Table(self._j_table.join(right._j_table, join_predicate))
+        else:
+            return Table(self._j_table.join(right._j_table))
+
+    def left_outer_join(self, right, join_predicate=None):
+        """
+        Joins two :class:`Table`s. Similar to a SQL left outer join. The fields of the two joined
+        operations must not overlap, use :func:`~pyflink.table.Table.alias` to rename fields if
+        necessary.
+
+        .. note::
+            Both tables must be bound to the same :class:`TableEnvironment` and its
+            :class:`TableConfig` must have null check enabled (default).
+
+        Example:
+        ::
+            >>> left.left_outer_join(right).select("a, b, d")
+            >>> left.left_outer_join(right, "a = b").select("a, b, d")
+
+        :param right: Right table.
+        :param join_predicate: Optional, the join predicate expression string.
+        :return: Result table.
+        """
+        if join_predicate is None:
+            return Table(self._j_table.leftOuterJoin(right._j_table))
+        else:
+            return Table(self._j_table.leftOuterJoin(right._j_table, join_predicate))
+
+    def right_outer_join(self, right, join_predicate):
+        """
+        Joins two :class:`Table`s. Similar to a SQL right outer join. The fields of the two joined
+        operations must not overlap, use :func:`~pyflink.table.Table.alias` to rename fields if
+        necessary.
+
+        .. note::
+            Both tables must be bound to the same :class:`TableEnvironment` and its
+            :class:`TableConfig` must have null check enabled (default).
+
+        Example:
+        ::
+            >>> left.right_outer_join(right, "a = b").select("a, b, d")
+
+        :param right: Right table.
+        :param join_predicate: The join predicate expression string.
+        :return: Result table.
+        """
+        return Table(self._j_table.rightOuterJoin(right._j_table, join_predicate))
+
+    def full_outer_join(self, right, join_predicate):
+        """
+        Joins two :class:`Table`s. Similar to a SQL full outer join. The fields of the two joined
+        operations must not overlap, use :func:`~pyflink.table.Table.alias` to rename fields if
+        necessary.
+
+        .. note::
+            Both tables must be bound to the same :class:`TableEnvironment` and its
+            :class:`TableConfig` must have null check enabled (default).
+
+        Example:
+        ::
+            >>> left.full_outer_join(right, "a = b").select("a, b, d")
+
+        :param right: Right table.
+        :param join_predicate: The join predicate expression string.
+        :return: Result table.
+        """
+        return Table(self._j_table.fullOuterJoin(right._j_table, join_predicate))
+
+    def minus(self, right):
+        """
+        Minus of two :class:`Table`s with duplicate records removed.
+        Similar to a SQL EXCEPT clause. Minus returns records from the left table that do not
+        exist in the right table. Duplicate records in the left table are returned
+        exactly once, i.e., duplicates are removed. Both tables must have identical field types.
+
+        .. note::
+            Both tables must be bound to the same :class:`TableEnvironment`.
+
+        Example:
+        ::
+            >>> left.minus(right)
+
+        :param right: Right table.
+        :return: Result table.
+        """
+        return Table(self._j_table.minus(right._j_table))
+
+    def minus_all(self, right):
+        """
+        Minus of two :class:`Table`s. Similar to a SQL EXCEPT ALL clause.
+        MinusAll returns the records from the left table that do not exist in
+        the right table. A record that is present n times in the left table and m times
+        in the right table is returned (n - m) times, i.e., as many duplicates as are present
+        in the right table are removed. Both tables must have identical field types.
+
+        .. note::
+            Both tables must be bound to the same :class:`TableEnvironment`.
+
+        Example:
+        ::
+            >>> left.minus_all(right)
+
+        :param right: Right table.
+        :return: Result table.
+        """
+        return Table(self._j_table.minusAll(right._j_table))
+
+    def union(self, right):
+        """
+        Unions two :class:`Table`s with duplicate records removed.
+        Similar to a SQL UNION. The fields of the two union operations must fully overlap.
+
+        .. note::
+            Both tables must be bound to the same :class:`TableEnvironment`.
+
+        Example:
+        ::
+            >>> left.union(right)
+
+        :param right: Right table.
+        :return: Result table.
+        """
+        return Table(self._j_table.union(right._j_table))
+
+    def union_all(self, right):
+        """
+        Unions two :class:`Table`s. Similar to a SQL UNION ALL. The fields of the two union
+        operations must fully overlap.
+
+        .. note::
+            Both tables must be bound to the same :class:`TableEnvironment`.
+
+        Example:
+        ::
+            >>> left.union_all(right)
+
+        :param right: Right table.
+        :return: Result table.
+        """
+        return Table(self._j_table.unionAll(right._j_table))
+
+    def intersect(self, right):
+        """
+        Intersects two :class:`Table`s with duplicate records removed. Intersect returns records
+        that exist in both tables. If a record is present in one or both tables more than once,
+        it is returned just once, i.e., the resulting table has no duplicate records. Similar to a
+        SQL INTERSECT. The fields of the two intersect operations must fully overlap.
+
+        .. note::
+            Both tables must be bound to the same :class:`TableEnvironment`.
+
+        Example:
+        ::
+            >>> left.intersect(right)
+
+        :param right: Right table.
+        :return: Result table.
+        """
+        return Table(self._j_table.intersect(right._j_table))
+
+    def intersect_all(self, right):
+        """
+        Intersects two :class:`Table`s. IntersectAll returns records that exist in both tables.
+        If a record is present in both tables more than once, it is returned as many times as it
+        is present in both tables, i.e., the resulting table might have duplicate records. Similar
+        to an SQL INTERSECT ALL. The fields of the two intersect operations must fully overlap.
+
+        .. note::
+            Both tables must be bound to the same :class:`TableEnvironment`.
+
+        Example:
+        ::
+            >>> left.intersect_all(right)
+
+        :param right: Right table.
+        :return: Result table.
+        """
+        return Table(self._j_table.intersectAll(right._j_table))
+
+    def order_by(self, fields):
+        """
+        Sorts the given :class:`Table`. Similar to SQL ORDER BY.
+        The resulting Table is globally sorted across all parallel partitions.
+
+        Example:
+        ::
+            >>> tab.order_by("name.desc")
+
+        :param fields: Order fields expression string.
+        :return: Result table.
+        """
+        return Table(self._j_table.orderBy(fields))
+
+    def offset(self, offset):
+        """
+        Limits a sorted result from an offset position.
+        Similar to a SQL OFFSET clause. Offset is technically part of the Order By operator and
+        thus must be preceded by it.
+        :func:`~pyflink.table.Table.offset` can be combined with a subsequent
+        :func:`~pyflink.table.Table.fetch` call to return n rows after skipping the first o rows.
+
+        Example:
+        ::
+            # skips the first 3 rows and returns all following rows.
+            >>> tab.order_by("name.desc").offset(3)
+            # skips the first 10 rows and returns the next 5 rows.
+            >>> tab.order_by("name.desc").offset(10).fetch(5)
+
+        :param offset: Number of records to skip.
+        :return: Result table.
+        """
+        return Table(self._j_table.offset(offset))
+
+    def fetch(self, fetch):
+        """
+        Limits a sorted result to the first n rows.
+        Similar to a SQL FETCH clause. Fetch is technically part of the Order By operator and
+        thus must be preceded by it.
+        :func:`~pyflink.table.Table.fetch` can be combined with a preceding
+        :func:`~pyflink.table.Table.offset` call to return n rows after skipping the first o rows.
+
+        Example:
+
+        Returns the first 3 records.
+        ::
+            >>> tab.order_by("name.desc").fetch(3)
+
+        Skips the first 10 rows and returns the next 5 rows.
+        ::
+            >>> tab.order_by("name.desc").offset(10).fetch(5)
+
+        :param fetch: The number of records to return. Fetch must be >= 0.
+        :return: Result table.
+        """
+        return Table(self._j_table.fetch(fetch))
+
+    def window(self, window):
+        """
+        Defines group window on the records of a table.
+
+        A group window groups the records of a table by assigning them to windows defined by a time
+        or row interval.
+
+        For streaming tables of infinite size, grouping into windows is required to define finite
+        groups on which group-based aggregates can be computed.
+
+        For batch tables of finite size, windowing essentially provides shortcuts for time-based
+        groupBy.
+
+        .. note::
+            Computing windowed aggregates on a streaming table is only a parallel operation
+            if additional grouping attributes are added to the
+            :func:`~pyflink.table.GroupWindowedTable.group_by` clause.
+            If the :func:`~pyflink.table.GroupWindowedTable.group_by` only references a GroupWindow
+            alias, the streamed table will be processed by a single task, i.e., with parallelism 1.
+
+        :param window: A :class:`GroupWindow` created from :class:`Tumble`, :class:`Session` or
+                        :class:`Slide`.
+        :return: A :class:`GroupWindowedTable`.
+        """
+        # type: (GroupWindow) -> GroupWindowedTable
+        return GroupWindowedTable(self._j_table.window(window._java_window))
+
+    def over_window(self, *over_windows):
+        """
+        Defines over-windows on the records of a table.
+
+        An over-window defines for each record an interval of records over which aggregation
+        functions can be computed.
+
+        Example:
+        ::
+            >>> table.over_window(Over.partition_by("c").order_by("rowTime")\
+            ...     .preceding("10.seconds").alias("ow"))\
+            ...     .select("c, b.count over ow, e.sum over ow")
+
+        .. note::
+            Computing over window aggregates on a streaming table is only a parallel
+            operation if the window is partitioned. Otherwise, the whole stream will be processed
+            by a single task, i.e., with parallelism 1.
+
+        .. note::
+            Over-windows for batch tables are currently not supported.
+
+        :param over_windows: :class:`OverWindow`s created from :class:`Over`.
+        :return: A :class:`OverWindowedTable`.
+        """
+        gateway = get_gateway()
+        window_array = to_jarray(gateway.jvm.OverWindow,
+                                 [item._java_over_window for item in over_windows])
+        return OverWindowedTable(self._j_table.window(window_array))
+
+    def add_columns(self, fields):
+        """
+        Adds additional columns. Similar to a SQL SELECT statement. The field expressions
+        can contain complex expressions, but can not contain aggregations. It will throw an
+        exception if the added fields already exist.
+
+        Example:
+        ::
+            >>> tab.add_columns("a + 1 as a1, concat(b, 'sunny') as b1")
+
+        :param fields: Column list string.
+        :return: Result table.
+        """
+        return Table(self._j_table.addColumns(fields))
+
+    def add_or_replace_columns(self, fields):
+        """
+        Adds additional columns. Similar to a SQL SELECT statement. The field expressions
+        can contain complex expressions, but can not contain aggregations. Existing fields will be
+        replaced if an added column name is the same as an existing column name. Moreover, if the
+        added fields have duplicate field names, then the last one is used.
+
+        Example:
+        ::
+            >>> tab.add_or_replace_columns("a + 1 as a1, concat(b, 'sunny') as b1")
+
+        :param fields: Column list string.
+        :return: Result table.
+        """
+        return Table(self._j_table.addOrReplaceColumns(fields))
+
+    def rename_columns(self, fields):
+        """
+        Renames existing columns. Similar to a field alias statement. The field expressions
+        should be alias expressions, and only the existing fields can be renamed.
+
+        Example:
+        ::
+            >>> tab.rename_columns("a as a1, b as b1")
+
+        :param fields: Column list string.
+        :return: Result table.
+        """
+        return Table(self._j_table.renameColumns(fields))
+
+    def drop_columns(self, fields):
+        """
+        Drops existing columns. The field expressions should be field reference expressions.
+
+        Example:
+        ::
+            >>> tab.drop_columns("a, b")
+
+        :param fields: Column list string.
+        :return: Result table.
+        """
+        return Table(self._j_table.dropColumns(fields))
+
     def insert_into(self, table_name):
         """
         Writes the :class:`Table` to a :class:`TableSink` that was registered under
@@ -118,3 +524,118 @@ class Table(object):
         :param table_name: Name of the :class:`TableSink` to which the :class:`Table` is written.
         """
         self._j_table.insertInto(table_name)
+
+    def print_schema(self):
+        """
+        Prints the schema of this table to the console in a tree format.
+        """
+        self._j_table.printSchema()
+
+    def __str__(self):
+        return self._j_table.toString()
+
+
+class GroupedTable(object):
+    """
+    A table that has been grouped on a set of grouping keys.
+    """
+
+    def __init__(self, java_table):
+        self._j_table = java_table
+
+    def select(self, fields):
+        """
+        Performs a selection operation on a grouped table. Similar to an SQL SELECT statement.
+        The field expressions can contain complex expressions and aggregations.
+
+        Example:
+        ::
+            >>> tab.group_by("key").select("key, value.avg + ' The average' as average")
+
+
+        :param fields: Expression string that contains group keys and aggregate function calls.
+        :return: Result table.
+        """
+        return Table(self._j_table.select(fields))
+
+
+class GroupWindowedTable(object):
+    """
+    A table that has been windowed for :class:`GroupWindow`s.
+    """
+
+    def __init__(self, java_group_windowed_table):
+        self._j_table = java_group_windowed_table
+
+    def group_by(self, fields):
+        """
+        Groups the elements by a mandatory window and one or more optional grouping attributes.
+        The window is specified by referring to its alias.
+
+        If no additional grouping attribute is specified and if the input is a streaming table,
+        the aggregation will be performed by a single task, i.e., with parallelism 1.
+
+        Aggregations are performed per group and defined by a subsequent
+        :func:`~pyflink.table.WindowGroupedTable.select` clause similar to SQL SELECT-GROUP-BY
+        query.
+
+        Example:
+        ::
+            >>> tab.window(groupWindow.alias("w")).group_by("w, key").select("key, value.avg")
+
+        :param fields: Group keys.
+        :return: A :class:`WindowGroupedTable`.
+        """
+        return WindowGroupedTable(self._j_table.groupBy(fields))
+
+
+class WindowGroupedTable(object):
+    """
+    A table that has been windowed and grouped for :class:`GroupWindow`s.
+    """
+
+    def __init__(self, java_window_grouped_table):
+        self._j_table = java_window_grouped_table
+
+    def select(self, fields):
+        """
+        Performs a selection operation on a window grouped table. Similar to an SQL SELECT
+        statement.
+        The field expressions can contain complex expressions and aggregations.
+
+        Example:
+        ::
+            >>> window_grouped_table.select("key, window.start, value.avg as valavg")
+
+        :param fields: Expression string.
+        :return: Result table.
+        """
+        return Table(self._j_table.select(fields))
+
+
+class OverWindowedTable(object):
+    """
+    A table that has been windowed for :class:`OverWindow`s.
+
+    Unlike group windows, which are specified in the GROUP BY clause, over windows do not collapse
+    rows. Instead over window aggregates compute an aggregate for each input row over a range of
+    its neighboring rows.
+    """
+
+    def __init__(self, java_over_windowed_table):
+        self._j_table = java_over_windowed_table
+
+    def select(self, fields):
+        """
+        Performs a selection operation on an over windowed table. Similar to an SQL SELECT
+        statement.
+        The field expressions can contain complex expressions and aggregations.
+
+        Example:
+        ::
+            >>> over_windowed_table.select("c, b.count over ow, e.sum over ow")
+
+        :param fields: Expression string.
+        :return: Result table.
+        """
+        return Table(self._j_table.select(fields))
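
A hedged sketch that chains several of the operators added above on a batch environment, assuming tables "Orders" and "Customers" (with compatible schemas) and a sink "TopCustomers" are already registered with `t_env`:

{% highlight python %}
orders = t_env.scan("Orders").alias("a, b, c")
customers = t_env.scan("Customers").alias("d, e")

# join, aggregate per customer, sort and keep the top 10 rows
result = orders.join(customers, "a = d") \
               .group_by("e") \
               .select("e, b.sum as total") \
               .order_by("total.desc") \
               .offset(0).fetch(10)

result.insert_into("TopCustomers")
t_env.execute()
{% endhighlight %}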
diff --git a/flink-python/pyflink/table/table_config.py b/flink-python/pyflink/table/table_config.py
index abb8842..f02765c 100644
--- a/flink-python/pyflink/table/table_config.py
+++ b/flink-python/pyflink/table/table_config.py
@@ -15,9 +15,15 @@
 #  See the License for the specific language governing permissions and
 # limitations under the License.
 ################################################################################
+import sys
+
+from pyflink.java_gateway import get_gateway
 
 __all__ = ['TableConfig']
 
+if sys.version > '3':
+    unicode = str
+
 
 class TableConfig(object):
     """
@@ -25,37 +31,84 @@ class TableConfig(object):
     """
 
     def __init__(self):
-        self._is_stream = None
-        self._parallelism = None
+        self._jvm = get_gateway().jvm
+        self._j_table_config = self._jvm.TableConfig()
+        self._is_stream = None  # type: bool
+        self._parallelism = None  # type: int
 
-    @property
     def is_stream(self):
+        """
+        Returns True if the execution mode is streaming and False if it is batch.
+        """
         return self._is_stream
 
-    @is_stream.setter
-    def is_stream(self, is_stream):
+    def _set_stream(self, is_stream):
         self._is_stream = is_stream
 
-    @property
     def parallelism(self):
+        """
+        The parallelism for all operations.
+        """
         return self._parallelism
 
-    @parallelism.setter
-    def parallelism(self, parallelism):
+    def _set_parallelism(self, parallelism):
         self._parallelism = parallelism
 
+    def timezone(self):
+        """
+        Returns the timezone id, either an abbreviation such as "PST", a full name such as
+        "America/Los_Angeles", or a custom timezone_id such as "GMT-8:00".
+        """
+        return self._j_table_config.getTimeZone().getID()
+
+    def _set_timezone(self, timezone_id):
+        if timezone_id is not None and isinstance(timezone_id, (str, unicode)):
+            j_timezone = self._jvm.java.util.TimeZone.getTimeZone(timezone_id)
+            self._j_table_config.setTimeZone(j_timezone)
+        else:
+            raise Exception("TableConfig.timezone should be a string!")
+
+    def null_check(self):
+        """
+        A boolean value, "True" enables NULL check and "False" disables NULL check.
+        """
+        return self._j_table_config.getNullCheck()
+
+    def _set_null_check(self, null_check):
+        if null_check is not None and isinstance(null_check, bool):
+            self._j_table_config.setNullCheck(null_check)
+        else:
+            raise Exception("TableConfig.null_check should be a bool value!")
+
+    def max_generated_code_length(self):
+        """
+        The current threshold where generated code will be split into sub-function calls. Java has
+        a maximum method length of 64 KB. This setting allows for finer granularity if necessary.
+        Default is 64000.
+        """
+        return self._j_table_config.getMaxGeneratedCodeLength()
+
+    def _set_max_generated_code_length(self, max_generated_code_length):
+        if max_generated_code_length is not None and isinstance(max_generated_code_length, int):
+            self._j_table_config.setMaxGeneratedCodeLength(max_generated_code_length)
+        else:
+            raise Exception("TableConfig.max_generated_code_length should be a int value!")
+
     class Builder(object):
 
         def __init__(self):
-            self._is_stream = None
-            self._parallelism = None
+            self._is_stream = None  # type: bool
+            self._parallelism = None  # type: int
+            self._timezone_id = None  # type: str
+            self._null_check = None  # type: bool
+            self._max_generated_code_length = None  # type: int
 
         def as_streaming_execution(self):
             """
             Configures streaming execution mode.
             If this method is called, :class:`StreamTableEnvironment` will be created.
 
-            :return: Builder
+            :return: :class:`TableConfig.Builder`
             """
             self._is_stream = True
             return self
@@ -65,7 +118,7 @@ class TableConfig(object):
             Configures batch execution mode.
             If this method is called, :class:`BatchTableEnvironment` will be created.
 
-            :return: Builder
+            :return: :class:`TableConfig.Builder`
             """
             self._is_stream = False
             return self
@@ -75,11 +128,46 @@ class TableConfig(object):
             Sets the parallelism for all operations.
 
             :param parallelism: The parallelism.
-            :return: Builder
+            :return: :class:`TableConfig.Builder`
             """
             self._parallelism = parallelism
             return self
 
+        def set_timezone(self, time_zone_id):
+            """
+            Sets the timezone for date/time/timestamp conversions.
+
+            :param time_zone_id: The time zone ID in string format, either an abbreviation such as
+                                 "PST", a full name such as "America/Los_Angeles", or a custom ID
+                                 such as "GMT-8:00".
+            :return: :class:`TableConfig.Builder`
+            """
+            self._timezone_id = time_zone_id
+            return self
+
+        def set_null_check(self, null_check):
+            """
+            Sets the NULL check. If enabled, all fields need to be checked for NULL first.
+
+            :param null_check: A boolean value, "True" enables NULL check and "False" disables
+                               NULL check.
+            :return: :class:`TableConfig.Builder`
+            """
+            self._null_check = null_check
+            return self
+
+        def set_max_generated_code_length(self, max_length):
+            """
+            Sets the current threshold where generated code will be split into sub-function calls.
+            Java has a maximum method length of 64 KB. This setting allows for finer granularity if
+            necessary. Default is 64000.
+
+            :param max_length: The maximum method length of generated java code.
+            :return: :class:`TableConfig.Builder`
+            """
+            self._max_generated_code_length = max_length
+            return self
+
         def build(self):
             """
             Builds :class:`TableConfig` object.
@@ -87,6 +175,14 @@ class TableConfig(object):
             :return: TableConfig
             """
             config = TableConfig()
-            config.parallelism = self._parallelism
-            config.is_stream = self._is_stream
+            if self._parallelism is not None:
+                config._set_parallelism(self._parallelism)
+            if self._is_stream is not None:
+                config._set_stream(self._is_stream)
+            if self._timezone_id is not None:
+                config._set_timezone(self._timezone_id)
+            if self._null_check is not None:
+                config._set_null_check(self._null_check)
+            if self._max_generated_code_length is not None:
+                config._set_max_generated_code_length(self._max_generated_code_length)
             return config
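
A hedged sketch of the extended builder; all values shown are illustrative:

{% highlight python %}
from pyflink.table import TableConfig, TableEnvironment

t_config = TableConfig.Builder() \
    .as_streaming_execution() \
    .set_parallelism(1) \
    .set_timezone("GMT-8:00") \
    .set_null_check(True) \
    .set_max_generated_code_length(64000) \
    .build()

t_env = TableEnvironment.create(t_config)
{% endhighlight %}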
diff --git a/flink-python/pyflink/table/table_environment.py b/flink-python/pyflink/table/table_environment.py
index d3494ee..5b9886d 100644
--- a/flink-python/pyflink/table/table_environment.py
+++ b/flink-python/pyflink/table/table_environment.py
@@ -16,7 +16,10 @@
 # limitations under the License.
 ################################################################################
 
-from abc import ABCMeta
+from abc import ABCMeta, abstractmethod
+
+from pyflink.table.query_config import StreamQueryConfig, BatchQueryConfig, QueryConfig
+from pyflink.table.table_config import TableConfig
 
 from pyflink.java_gateway import get_gateway
 from pyflink.table import Table
@@ -56,7 +59,7 @@ class TableEnvironment(object):
         :param name: The name under which the table will be registered.
         :param table: The table to register.
         """
-        self._j_tenv.registerTable(name, table._java_table)
+        self._j_tenv.registerTable(name, table._j_table)
 
     def register_table_source(self, name, table_source):
         """
@@ -111,6 +114,75 @@ class TableEnvironment(object):
         j_table = self._j_tenv.scan(j_table_paths)
         return Table(j_table)
 
+    def list_tables(self):
+        """
+        Gets the names of all tables registered in this environment.
+
+        :return: List of table names.
+        """
+        j_table_name_array = self._j_tenv.listTables()
+        return [item for item in j_table_name_array]
+
+    def explain(self, table):
+        """
+        Returns the AST of the specified Table API and SQL queries and the execution plan to compute
+        the result of the given :class:`Table`.
+
+        :param table: The table to be explained.
+        :return: The AST and the execution plan of the given table as a string.
+        """
+        return self._j_tenv.explain(table._j_table)
+
+    def sql_query(self, query):
+        """
+        Evaluates a SQL query on registered tables and retrieves the result as a :class:`Table`.
+
+        All tables referenced by the query must be registered in the TableEnvironment.
+
+        A :class:`Table` is automatically registered when its :func:`~Table.__str__` method is
+        called, for example when it is embedded into a String.
+
+        Hence, SQL queries can directly reference a :class:`Table` as follows:
+
+        ::
+            >>> table = ...
+            # the table is not registered to the table environment
+            >>> t_env.sql_query("SELECT * FROM %s" % table)
+
+        :param query: The sql query string.
+        :return: The result :class:`Table`.
+        """
+        j_table = self._j_tenv.sqlQuery(query)
+        return Table(j_table)
+
+    def sql_update(self, stmt, query_config=None):
+        """
+        Evaluates a SQL statement such as INSERT, UPDATE or DELETE, or a DDL statement.
+
+        .. note::
+            Currently only SQL INSERT statements are supported.
+
+        All tables referenced by the query must be registered in the TableEnvironment.
+        A :class:`Table` is automatically registered when its :func:`~Table.__str__` method is
+        called, for example when it is embedded into a String.
+        Hence, SQL queries can directly reference a :class:`Table` as follows:
+
+        ::
+            # register the table sink into which the result is inserted.
+            >>> t_env.register_table_sink("sink_table", field_names, fields_types, table_sink)
+            >>> source_table = ...
+            # source_table is not registered to the table environment
+            >>> tEnv.sql_update(s"INSERT INTO sink_table SELECT * FROM %s" % source_table)
+
+        :param stmt: The SQL statement to evaluate.
+        :param query_config: The :class:`QueryConfig` to use.
+        """
+        # type: (str, QueryConfig) -> None
+        if query_config is not None:
+            self._j_tenv.sqlUpdate(stmt, query_config._j_query_config)
+        else:
+            self._j_tenv.sqlUpdate(stmt)
+
     def execute(self, job_name=None):
         """
         Triggers the program execution.
@@ -122,8 +194,21 @@ class TableEnvironment(object):
         else:
             self._j_tenv.execEnv().execute()
 
+    @abstractmethod
+    def get_config(self):
+        """
+        Returns the table config to define the runtime behavior of the Table API.
+
+        :return: Current :class:`TableConfig`.
+        """
+        pass
+
+    @abstractmethod
+    def query_config(self):
+        pass
+
     @classmethod
-    def get_table_environment(cls, table_config):
+    def create(cls, table_config):
         """
         Returns a :class:`StreamTableEnvironment` or a :class:`BatchTableEnvironment`
         which matches the :class:`TableConfig`'s content.
@@ -132,17 +217,19 @@ class TableEnvironment(object):
         :return: Desired :class:`TableEnvironment`.
         """
         gateway = get_gateway()
-        if table_config.is_stream:
+        if table_config.is_stream():
             j_execution_env = gateway.jvm.StreamExecutionEnvironment.getExecutionEnvironment()
-            j_tenv = gateway.jvm.StreamTableEnvironment.create(j_execution_env)
+            j_tenv = gateway.jvm.StreamTableEnvironment.create(
+                j_execution_env, table_config._j_table_config)
             t_env = StreamTableEnvironment(j_tenv)
         else:
             j_execution_env = gateway.jvm.ExecutionEnvironment.getExecutionEnvironment()
-            j_tenv = gateway.jvm.BatchTableEnvironment.create(j_execution_env)
+            j_tenv = gateway.jvm.BatchTableEnvironment.create(
+                j_execution_env, table_config._j_table_config)
             t_env = BatchTableEnvironment(j_tenv)
 
-        if table_config.parallelism is not None:
+        if table_config.parallelism() is not None:
-            t_env._j_tenv.execEnv().setParallelism(table_config.parallelism)
+            t_env._j_tenv.execEnv().setParallelism(table_config.parallelism())
 
         return t_env
 
@@ -153,9 +240,51 @@ class StreamTableEnvironment(TableEnvironment):
         self._j_tenv = j_tenv
         super(StreamTableEnvironment, self).__init__(j_tenv)
 
+    def get_config(self):
+        """
+        Returns the table config to define the runtime behavior of the Table API.
+
+        :return: Current :class:`TableConfig`.
+        """
+        table_config = TableConfig()
+        table_config._j_table_config = self._j_tenv.getConfig()
+        table_config._set_stream(True)
+        table_config._set_parallelism(self._j_tenv.execEnv().getParallelism())
+        return table_config
+
+    def query_config(self):
+        """
+        Returns a :class:`StreamQueryConfig` that holds parameters to configure the behavior of
+        streaming queries.
+
+        :return: A new :class:`StreamQueryConfig`.
+        """
+        return StreamQueryConfig(self._j_tenv.queryConfig())
+
 
 class BatchTableEnvironment(TableEnvironment):
 
     def __init__(self, j_tenv):
         self._j_tenv = j_tenv
         super(BatchTableEnvironment, self).__init__(j_tenv)
+
+    def get_config(self):
+        """
+        Returns the table config to define the runtime behavior of the Table API.
+
+        :return: Current :class:`TableConfig`.
+        """
+        table_config = TableConfig()
+        table_config._j_table_config = self._j_tenv.getConfig()
+        table_config._set_stream(False)
+        table_config._set_parallelism(self._j_tenv.execEnv().getParallelism())
+        return table_config
+
+    def query_config(self):
+        """
+        Returns the :class:`BatchQueryConfig` that holds parameters to configure the behavior of
+        batch queries.
+
+        :return: A new :class:`BatchQueryConfig`.
+        """
+        return BatchQueryConfig(self._j_tenv.queryConfig())
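
A hedged sketch that combines the new methods above on a streaming environment; the "Orders" source and "Results" sink are assumed to be registered already:

{% highlight python %}
# run a SQL query on a registered table and register the result under a new name
filtered = t_env.sql_query("SELECT a, b FROM Orders WHERE a > 0")
t_env.register_table("FilteredOrders", filtered)

# write into a registered sink, passing the streaming query configuration
q_config = t_env.query_config()
t_env.sql_update("INSERT INTO Results SELECT * FROM FilteredOrders", q_config)

t_env.execute("python table api job")
{% endhighlight %}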
diff --git a/flink-python/pyflink/table/tests/test_calc.py b/flink-python/pyflink/table/tests/test_aggregate.py
similarity index 68%
copy from flink-python/pyflink/table/tests/test_calc.py
copy to flink-python/pyflink/table/tests/test_aggregate.py
index 70829dc..b37673d 100644
--- a/flink-python/pyflink/table/tests/test_calc.py
+++ b/flink-python/pyflink/table/tests/test_aggregate.py
@@ -1,4 +1,4 @@
-# ###############################################################################
+################################################################################
 #  Licensed to the Apache Software Foundation (ASF) under one
 #  or more contributor license agreements.  See the NOTICE file
 #  distributed with this work for additional information
@@ -18,44 +18,34 @@
 
 import os
 
-from pyflink.table.table_source import CsvTableSource
 from pyflink.table.types import DataTypes
 from pyflink.testing import source_sink_utils
 from pyflink.testing.test_case_utils import PyFlinkStreamTableTestCase
 
 
-class TableTests(PyFlinkStreamTableTestCase):
+class StreamTableAggregateTests(PyFlinkStreamTableTestCase):
 
-    def test_select(self):
+    def test_group_by(self):
         source_path = os.path.join(self.tempdir + '/streaming.csv')
-        with open(source_path, 'w') as f:
-            lines = '1,hi,hello\n' + '2,hi,hello\n'
-            f.write(lines)
-            f.close()
-
         field_names = ["a", "b", "c"]
         field_types = [DataTypes.INT, DataTypes.STRING, DataTypes.STRING]
-
+        data = [(1, "Hi", "Hello"), (2, "Hello", "Hello"), (2, "Hello", "Hello")]
+        csv_source = self.prepare_csv_source(source_path, data, field_types, field_names)
         t_env = self.t_env
-
-        # register Orders table in table environment
-        t_env.register_table_source(
-            "Orders",
-            CsvTableSource(source_path, field_names, field_types))
-
+        t_env.register_table_source("Source", csv_source)
+        source = t_env.scan("Source")
+        field_names = ["a", "b"]
+        field_types = [DataTypes.INT, DataTypes.STRING]
         t_env.register_table_sink(
             "Results",
-            field_names, field_types, source_sink_utils.TestAppendSink())
-
-        t_env.scan("Orders") \
-             .where("a > 0") \
-             .select("a + 1, b, c") \
-             .insert_into("Results")
+            field_names, field_types, source_sink_utils.TestRetractSink())
 
+        result = source.group_by("c").select("a.sum, c as b")
+        result.insert_into("Results")
         t_env.execute()
-
         actual = source_sink_utils.results()
-        expected = ['2,hi,hello', '3,hi,hello']
+
+        expected = ['5,Hello']
         self.assert_equals(actual, expected)
 
 
diff --git a/flink-python/pyflink/table/tests/test_calc.py b/flink-python/pyflink/table/tests/test_calc.py
index 70829dc..137f322 100644
--- a/flink-python/pyflink/table/tests/test_calc.py
+++ b/flink-python/pyflink/table/tests/test_calc.py
@@ -1,4 +1,4 @@
-# ###############################################################################
+################################################################################
 #  Licensed to the Apache Software Foundation (ASF) under one
 #  or more contributor license agreements.  See the NOTICE file
 #  distributed with this work for additional information
@@ -24,7 +24,7 @@ from pyflink.testing import source_sink_utils
 from pyflink.testing.test_case_utils import PyFlinkStreamTableTestCase
 
 
-class TableTests(PyFlinkStreamTableTestCase):
+class StreamTableCalcTests(PyFlinkStreamTableTestCase):
 
     def test_select(self):
         source_path = os.path.join(self.tempdir + '/streaming.csv')
@@ -32,30 +32,87 @@ class TableTests(PyFlinkStreamTableTestCase):
             lines = '1,hi,hello\n' + '2,hi,hello\n'
             f.write(lines)
             f.close()
-
         field_names = ["a", "b", "c"]
         field_types = [DataTypes.INT, DataTypes.STRING, DataTypes.STRING]
-
         t_env = self.t_env
-
         # register Orders table in table environment
         t_env.register_table_source(
             "Orders",
             CsvTableSource(source_path, field_names, field_types))
-
         t_env.register_table_sink(
             "Results",
             field_names, field_types, source_sink_utils.TestAppendSink())
 
         t_env.scan("Orders") \
-             .where("a > 0") \
              .select("a + 1, b, c") \
              .insert_into("Results")
+        t_env.execute()
+        actual = source_sink_utils.results()
+
+        expected = ['2,hi,hello', '3,hi,hello']
+        self.assert_equals(actual, expected)
 
+    def test_alias(self):
+        source_path = os.path.join(self.tempdir + '/streaming.csv')
+        field_names = ["a", "b", "c"]
+        field_types = [DataTypes.INT, DataTypes.STRING, DataTypes.STRING]
+        data = [(1, "Hi", "Hello"), (2, "Hello", "Hello")]
+        csv_source = self.prepare_csv_source(source_path, data, field_types, field_names)
+        t_env = self.t_env
+        t_env.register_table_source("Source", csv_source)
+        source = t_env.scan("Source")
+        t_env.register_table_sink(
+            "Results",
+            field_names, field_types, source_sink_utils.TestAppendSink())
+
+        result = source.alias("d, e, f").select("d, e, f")
+        result.insert_into("Results")
         t_env.execute()
+        actual = source_sink_utils.results()
+
+        expected = ['1,Hi,Hello', '2,Hello,Hello']
+        self.assert_equals(actual, expected)
 
+    def test_where(self):
+        source_path = os.path.join(self.tempdir + '/streaming.csv')
+        field_names = ["a", "b", "c"]
+        field_types = [DataTypes.INT, DataTypes.STRING, DataTypes.STRING]
+        data = [(1, "Hi", "Hello"), (2, "Hello", "Hello")]
+        csv_source = self.prepare_csv_source(source_path, data, field_types, field_names)
+        t_env = self.t_env
+        t_env.register_table_source("Source", csv_source)
+        source = t_env.scan("Source")
+        t_env.register_table_sink(
+            "Results",
+            field_names, field_types, source_sink_utils.TestAppendSink())
+
+        result = source.where("a > 1 && b = 'Hello'")
+        result.insert_into("Results")
+        t_env.execute()
         actual = source_sink_utils.results()
-        expected = ['2,hi,hello', '3,hi,hello']
+
+        expected = ['2,Hello,Hello']
+        self.assert_equals(actual, expected)
+
+    def test_filter(self):
+        source_path = os.path.join(self.tempdir + '/streaming.csv')
+        field_names = ["a", "b", "c"]
+        field_types = [DataTypes.INT, DataTypes.STRING, DataTypes.STRING]
+        data = [(1, "Hi", "Hello"), (2, "Hello", "Hello")]
+        csv_source = self.prepare_csv_source(source_path, data, field_types, field_names)
+        t_env = self.t_env
+        t_env.register_table_source("Source", csv_source)
+        source = t_env.scan("Source")
+        t_env.register_table_sink(
+            "Results",
+            field_names, field_types, source_sink_utils.TestAppendSink())
+
+        result = source.filter("a > 1 && b = 'Hello'")
+        result.insert_into("Results")
+        t_env.execute()
+        actual = source_sink_utils.results()
+
+        expected = ['2,Hello,Hello']
         self.assert_equals(actual, expected)
 
 
diff --git a/flink-python/pyflink/table/tests/test_column_operation.py b/flink-python/pyflink/table/tests/test_column_operation.py
new file mode 100644
index 0000000..d0fbc46
--- /dev/null
+++ b/flink-python/pyflink/table/tests/test_column_operation.py
@@ -0,0 +1,127 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+import os
+
+from pyflink.table.types import DataTypes
+from pyflink.testing import source_sink_utils
+from pyflink.testing.test_case_utils import PyFlinkStreamTableTestCase
+
+
+class StreamTableColumnsOperationTests(PyFlinkStreamTableTestCase):
+
+    def test_add_columns(self):
+        source_path = os.path.join(self.tempdir + '/streaming.csv')
+        field_names = ["a", "b", "c"]
+        field_types = [DataTypes.INT, DataTypes.STRING, DataTypes.STRING]
+        data = [(1, "Hi", "Hello"), (2, "Hello", "Hello")]
+        csv_source = self.prepare_csv_source(source_path, data, field_types, field_names)
+        t_env = self.t_env
+        t_env.register_table_source("Source", csv_source)
+        source = t_env.scan("Source")
+        field_types = [DataTypes.INT, DataTypes.INT, DataTypes.INT]
+        t_env.register_table_sink(
+            "Results",
+            field_names, field_types, source_sink_utils.TestAppendSink())
+
+        result = source.select("a").add_columns("a + 1 as b, a + 2 as c")
+        result.insert_into("Results")
+        t_env.execute()
+        actual = source_sink_utils.results()
+
+        expected = ['1,2,3', '2,3,4']
+        self.assert_equals(actual, expected)
+
+    def test_add_or_replace_columns(self):
+        source_path = os.path.join(self.tempdir + '/streaming.csv')
+        field_names = ["a", "b", "c"]
+        field_types = [DataTypes.INT, DataTypes.STRING, DataTypes.STRING]
+        data = [(1, "Hi", "Hello"), (2, "Hello", "Hello")]
+        csv_source = self.prepare_csv_source(source_path, data, field_types, field_names)
+        t_env = self.t_env
+        t_env.register_table_source("Source", csv_source)
+        source = t_env.scan("Source")
+        field_names = ["a", "b"]
+        field_types = [DataTypes.INT, DataTypes.INT]
+        t_env.register_table_sink(
+            "Results",
+            field_names, field_types, source_sink_utils.TestAppendSink())
+
+        result = source.select("a").add_or_replace_columns("a + 1 as b, a + 2 as a")
+        result.insert_into("Results")
+        t_env.execute()
+        actual = source_sink_utils.results()
+
+        expected = ['3,2', '4,3']
+        self.assert_equals(actual, expected)
+
+    def test_rename_columns(self):
+        source_path = os.path.join(self.tempdir + '/streaming.csv')
+        field_names = ["a", "b", "c"]
+        field_types = [DataTypes.INT, DataTypes.STRING, DataTypes.STRING]
+        data = [(1, "Hi", "Hello"), (2, "Hello", "Hello")]
+        csv_source = self.prepare_csv_source(source_path, data, field_types, field_names)
+        t_env = self.t_env
+        t_env.register_table_source("Source", csv_source)
+        source = t_env.scan("Source")
+        field_names = ["d", "e", "f"]
+        t_env.register_table_sink(
+            "Results",
+            field_names, field_types, source_sink_utils.TestAppendSink())
+
+        result = source.select("a, b, c").rename_columns("a as d, c as f, b as e").select("d, e, f")
+        result.insert_into("Results")
+        t_env.execute()
+        actual = source_sink_utils.results()
+
+        expected = ['1,Hi,Hello', '2,Hello,Hello']
+        self.assert_equals(actual, expected)
+
+    def test_drop_columns(self):
+        source_path = os.path.join(self.tempdir + '/streaming.csv')
+        field_names = ["a", "b", "c"]
+        field_types = [DataTypes.INT, DataTypes.STRING, DataTypes.STRING]
+        data = [(1, "Hi", "Hello"), (2, "Hello", "Hello")]
+        csv_source = self.prepare_csv_source(source_path, data, field_types, field_names)
+        t_env = self.t_env
+        t_env.register_table_source("Source", csv_source)
+        source = t_env.scan("Source")
+        field_names = ["b"]
+        field_types = [DataTypes.STRING]
+        t_env.register_table_sink(
+            "Results",
+            field_names, field_types, source_sink_utils.TestAppendSink())
+
+        result = source.select("a, b, c").drop_columns("a, c").select("b")
+        result.insert_into("Results")
+        t_env.execute()
+        actual = source_sink_utils.results()
+
+        expected = ['Hi', 'Hello']
+        self.assert_equals(actual, expected)
+
+
+if __name__ == '__main__':
+    import unittest
+
+    try:
+        import xmlrunner
+        testRunner = xmlrunner.XMLTestRunner(output='target/test-reports')
+    except ImportError:
+        testRunner = None
+    unittest.main(testRunner=testRunner, verbosity=2)
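
As a reading aid, the column operators exercised above (`add_columns`, `add_or_replace_columns`, `rename_columns`, `drop_columns`) can be chained as sketched below. This is illustrative only and not part of the patch; the batch environment, CSV paths and schemas are placeholder assumptions.

{% highlight python %}
from pyflink.table.table_config import TableConfig
from pyflink.table.table_environment import TableEnvironment
from pyflink.table.table_sink import CsvTableSink
from pyflink.table.table_source import CsvTableSource
from pyflink.table.types import DataTypes

t_env = TableEnvironment.create(TableConfig.Builder().as_batch_execution().build())

# Placeholder input with rows such as "1,Hi,Hello".
t_env.register_table_source(
    "Source",
    CsvTableSource("/tmp/input.csv", ["a", "b", "c"],
                   [DataTypes.INT, DataTypes.STRING, DataTypes.STRING]))
t_env.register_table_sink(
    "Results",
    ["id", "flag"], [DataTypes.INT, DataTypes.INT],
    CsvTableSink("/tmp/output.csv"))

# Append a derived column, overwrite an existing one, rename, then drop the rest.
t_env.scan("Source") \
    .add_columns("a + 1 as d") \
    .add_or_replace_columns("a + 2 as a") \
    .rename_columns("a as id, d as flag") \
    .drop_columns("b, c") \
    .insert_into("Results")

t_env.execute()
{% endhighlight %}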
diff --git a/flink-python/pyflink/table/tests/test_calc.py b/flink-python/pyflink/table/tests/test_distinct.py
similarity index 68%
copy from flink-python/pyflink/table/tests/test_calc.py
copy to flink-python/pyflink/table/tests/test_distinct.py
index 70829dc..8f5b05a 100644
--- a/flink-python/pyflink/table/tests/test_calc.py
+++ b/flink-python/pyflink/table/tests/test_distinct.py
@@ -1,4 +1,4 @@
-# ###############################################################################
+################################################################################
 #  Licensed to the Apache Software Foundation (ASF) under one
 #  or more contributor license agreements.  See the NOTICE file
 #  distributed with this work for additional information
@@ -18,44 +18,34 @@
 
 import os
 
-from pyflink.table.table_source import CsvTableSource
 from pyflink.table.types import DataTypes
 from pyflink.testing import source_sink_utils
 from pyflink.testing.test_case_utils import PyFlinkStreamTableTestCase
 
 
-class TableTests(PyFlinkStreamTableTestCase):
+class StreamTableDistinctTests(PyFlinkStreamTableTestCase):
 
-    def test_select(self):
+    def test_distinct(self):
         source_path = os.path.join(self.tempdir + '/streaming.csv')
-        with open(source_path, 'w') as f:
-            lines = '1,hi,hello\n' + '2,hi,hello\n'
-            f.write(lines)
-            f.close()
-
         field_names = ["a", "b", "c"]
         field_types = [DataTypes.INT, DataTypes.STRING, DataTypes.STRING]
-
+        data = [(1, "Hi", "Hello"), (2, "Hello", "Hello"), (2, "Hello", "Hello")]
+        csv_source = self.prepare_csv_source(source_path, data, field_types, field_names)
         t_env = self.t_env
-
-        # register Orders table in table environment
-        t_env.register_table_source(
-            "Orders",
-            CsvTableSource(source_path, field_names, field_types))
-
+        t_env.register_table_source("Source", csv_source)
+        source = t_env.scan("Source")
+        field_names = ["a", "b"]
+        field_types = [DataTypes.INT, DataTypes.STRING]
         t_env.register_table_sink(
             "Results",
-            field_names, field_types, source_sink_utils.TestAppendSink())
-
-        t_env.scan("Orders") \
-             .where("a > 0") \
-             .select("a + 1, b, c") \
-             .insert_into("Results")
+            field_names, field_types, source_sink_utils.TestRetractSink())
 
+        result = source.distinct().select("a, c as b")
+        result.insert_into("Results")
         t_env.execute()
-
         actual = source_sink_utils.results()
-        expected = ['2,hi,hello', '3,hi,hello']
+
+        expected = ['1,Hello', '2,Hello']
         self.assert_equals(actual, expected)
 
 
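A minimal sketch of `distinct` outside the test harness, using `explain` (also added in this patch) to inspect the resulting plan; the environment setup and CSV path are placeholder assumptions.

{% highlight python %}
from pyflink.table.table_config import TableConfig
from pyflink.table.table_environment import TableEnvironment
from pyflink.table.table_source import CsvTableSource
from pyflink.table.types import DataTypes

t_env = TableEnvironment.create(TableConfig.Builder().as_batch_execution().build())
t_env.register_table_source(
    "Source",
    CsvTableSource("/tmp/input.csv", ["a", "b", "c"],
                   [DataTypes.INT, DataTypes.STRING, DataTypes.STRING]))

# distinct() removes duplicate rows of the input table.
result = t_env.scan("Source").distinct().select("a, b")
print(t_env.explain(result))
{% endhighlight %}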
diff --git a/flink-python/pyflink/table/tests/test_environment_completeness.py b/flink-python/pyflink/table/tests/test_environment_completeness.py
index bc0e8eb..136c7a0 100644
--- a/flink-python/pyflink/table/tests/test_environment_completeness.py
+++ b/flink-python/pyflink/table/tests/test_environment_completeness.py
@@ -38,8 +38,9 @@ class EnvironmentAPICompletenessTests(PythonAPICompletenessTestCase):
         # registerFunction and listUserDefinedFunctions should be supported when UDFs supported.
         # registerExternalCatalog, getRegisteredExternalCatalog and listTables
         # should be supported when catalog supported in python.
-        return {'registerExternalCatalog', 'getRegisteredExternalCatalog',
-                'registerFunction', 'listUserDefinedFunctions', 'listTables'}
+        # getCompletionHints has been deprecated. It will be removed in the next release.
+        return {'registerExternalCatalog', 'getRegisteredExternalCatalog', 'connect',
+                'registerFunction', 'listUserDefinedFunctions', 'listTables', 'getCompletionHints'}
 
 
 if __name__ == '__main__':
diff --git a/flink-python/pyflink/table/tests/test_join.py b/flink-python/pyflink/table/tests/test_join.py
new file mode 100644
index 0000000..eb524b6
--- /dev/null
+++ b/flink-python/pyflink/table/tests/test_join.py
@@ -0,0 +1,217 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+import os
+
+from pyflink.table.types import DataTypes
+from pyflink.testing import source_sink_utils
+from pyflink.testing.test_case_utils import PyFlinkStreamTableTestCase
+
+
+class StreamTableJoinTests(PyFlinkStreamTableTestCase):
+
+    def test_join_without_where(self):
+        source_path = os.path.join(self.tempdir + '/streaming.csv')
+        field_names = ["a", "b", "c"]
+        field_types = [DataTypes.INT, DataTypes.STRING, DataTypes.STRING]
+        data = [(1, "Hi", "Hello"), (2, "Hi", "Hello"), (3, "Hello", "Hello")]
+        csv_source = self.prepare_csv_source(source_path, data, field_types, field_names)
+        source_path2 = os.path.join(self.tempdir + '/streaming2.csv')
+        field_names2 = ["d", "e"]
+        field_types2 = [DataTypes.INT, DataTypes.STRING]
+        data2 = [(2, "Flink"), (3, "Python"), (3, "Flink")]
+        csv_source2 = self.prepare_csv_source(source_path2, data2, field_types2, field_names2)
+        t_env = self.t_env
+        t_env.register_table_source("Source1", csv_source)
+        t_env.register_table_source("Source2", csv_source2)
+        source1 = t_env.scan("Source1")
+        source2 = t_env.scan("Source2")
+        field_names = ["a", "b"]
+        field_types = [DataTypes.INT, DataTypes.STRING]
+        t_env.register_table_sink(
+            "Results",
+            field_names, field_types, source_sink_utils.TestRetractSink())
+
+        result = source1.join(source2, "a = d").select("a, b + e")
+        result.insert_into("Results")
+        t_env.execute()
+        actual = source_sink_utils.results()
+
+        expected = ['2,HiFlink', '3,HelloPython', '3,HelloFlink']
+        self.assert_equals(actual, expected)
+
+    def test_join_with_where(self):
+        source_path = os.path.join(self.tempdir + '/streaming.csv')
+        field_names = ["a", "b", "c"]
+        field_types = [DataTypes.INT, DataTypes.STRING, DataTypes.STRING]
+        data = [(1, "Hi", "Hello"), (2, "Hi", "Hello"), (3, "Hello", "Hello")]
+        csv_source = self.prepare_csv_source(source_path, data, field_types, field_names)
+        source_path2 = os.path.join(self.tempdir + '/streaming2.csv')
+        field_names2 = ["d", "e"]
+        field_types2 = [DataTypes.INT, DataTypes.STRING]
+        data2 = [(2, "Flink"), (3, "Python"), (3, "Flink")]
+        csv_source2 = self.prepare_csv_source(source_path2, data2, field_types2, field_names2)
+        t_env = self.t_env
+        t_env.register_table_source("Source1", csv_source)
+        t_env.register_table_source("Source2", csv_source2)
+        source1 = t_env.scan("Source1")
+        source2 = t_env.scan("Source2")
+        field_names = ["a", "b"]
+        field_types = [DataTypes.INT, DataTypes.STRING]
+        t_env.register_table_sink(
+            "Results",
+            field_names, field_types, source_sink_utils.TestRetractSink())
+
+        result = source1.join(source2).where("a = d").select("a, b + e")
+        result.insert_into("Results")
+        t_env.execute()
+        actual = source_sink_utils.results()
+
+        expected = ['2,HiFlink', '3,HelloPython', '3,HelloFlink']
+        self.assert_equals(actual, expected)
+
+    def test_left_outer_join_without_where(self):
+        source_path = os.path.join(self.tempdir + '/streaming.csv')
+        field_names = ["a", "b", "c"]
+        field_types = [DataTypes.INT, DataTypes.STRING, DataTypes.STRING]
+        data = [(1, "Hi", "Hello"), (2, "Hi", "Hello"), (3, "Hello", "Hello")]
+        csv_source = self.prepare_csv_source(source_path, data, field_types, field_names)
+        source_path2 = os.path.join(self.tempdir + '/streaming2.csv')
+        field_names2 = ["d", "e"]
+        field_types2 = [DataTypes.INT, DataTypes.STRING]
+        data2 = [(2, "Flink"), (3, "Python"), (3, "Flink")]
+        csv_source2 = self.prepare_csv_source(source_path2, data2, field_types2, field_names2)
+        t_env = self.t_env
+        t_env.register_table_source("Source1", csv_source)
+        t_env.register_table_source("Source2", csv_source2)
+        source1 = t_env.scan("Source1")
+        source2 = t_env.scan("Source2")
+        field_names = ["a", "b"]
+        field_types = [DataTypes.INT, DataTypes.STRING]
+        t_env.register_table_sink(
+            "Results",
+            field_names, field_types, source_sink_utils.TestRetractSink())
+
+        result = source1.left_outer_join(source2, "a = d").select("a, b + e")
+        result.insert_into("Results")
+        t_env.execute()
+        actual = source_sink_utils.results()
+
+        expected = ['1,null', '2,HiFlink', '3,HelloPython', '3,HelloFlink']
+        self.assert_equals(actual, expected)
+
+    def test_left_outer_join_with_where(self):
+        source_path = os.path.join(self.tempdir + '/streaming.csv')
+        field_names = ["a", "b", "c"]
+        field_types = [DataTypes.INT, DataTypes.STRING, DataTypes.STRING]
+        data = [(1, "Hi", "Hello"), (2, "Hi", "Hello"), (3, "Hello", "Hello")]
+        csv_source = self.prepare_csv_source(source_path, data, field_types, field_names)
+        source_path2 = os.path.join(self.tempdir + '/streaming2.csv')
+        field_names2 = ["d", "e"]
+        field_types2 = [DataTypes.INT, DataTypes.STRING]
+        data2 = [(2, "Flink"), (3, "Python"), (3, "Flink")]
+        csv_source2 = self.prepare_csv_source(source_path2, data2, field_types2, field_names2)
+        t_env = self.t_env
+        t_env.register_table_source("Source1", csv_source)
+        t_env.register_table_source("Source2", csv_source2)
+        source1 = t_env.scan("Source1")
+        source2 = t_env.scan("Source2")
+        field_names = ["a", "b"]
+        field_types = [DataTypes.INT, DataTypes.STRING]
+        t_env.register_table_sink(
+            "Results",
+            field_names, field_types, source_sink_utils.TestRetractSink())
+
+        result = source1.left_outer_join(source2).where("a = d").select("a, b + e")
+        result.insert_into("Results")
+        t_env.execute()
+        actual = source_sink_utils.results()
+
+        expected = ['2,HiFlink', '3,HelloPython', '3,HelloFlink']
+        self.assert_equals(actual, expected)
+
+    def test_right_outer_join(self):
+        source_path = os.path.join(self.tempdir + '/streaming.csv')
+        field_names = ["a", "b", "c"]
+        field_types = [DataTypes.INT, DataTypes.STRING, DataTypes.STRING]
+        data = [(1, "Hi", "Hello"), (2, "Hi", "Hello"), (3, "Hello", "Hello")]
+        csv_source = self.prepare_csv_source(source_path, data, field_types, field_names)
+        source_path2 = os.path.join(self.tempdir + '/streaming2.csv')
+        field_names2 = ["d", "e"]
+        field_types2 = [DataTypes.INT, DataTypes.STRING]
+        data2 = [(2, "Flink"), (3, "Python"), (4, "Flink")]
+        csv_source2 = self.prepare_csv_source(source_path2, data2, field_types2, field_names2)
+        t_env = self.t_env
+        t_env.register_table_source("Source1", csv_source)
+        t_env.register_table_source("Source2", csv_source2)
+        source1 = t_env.scan("Source1")
+        source2 = t_env.scan("Source2")
+        field_names = ["a", "b"]
+        field_types = [DataTypes.INT, DataTypes.STRING]
+        t_env.register_table_sink(
+            "Results",
+            field_names, field_types, source_sink_utils.TestRetractSink())
+
+        result = source1.right_outer_join(source2, "a = d").select("d, b + e")
+        result.insert_into("Results")
+        t_env.execute()
+        actual = source_sink_utils.results()
+
+        expected = ['2,HiFlink', '3,HelloPython', '4,null']
+        self.assert_equals(actual, expected)
+
+    def test_full_outer_join(self):
+        source_path = os.path.join(self.tempdir + '/streaming.csv')
+        field_names = ["a", "b", "c"]
+        field_types = [DataTypes.INT, DataTypes.STRING, DataTypes.STRING]
+        data = [(1, "Hi", "Hello"), (2, "Hi", "Hello"), (3, "Hello", "Hello")]
+        csv_source = self.prepare_csv_source(source_path, data, field_types, field_names)
+        source_path2 = os.path.join(self.tempdir + '/streaming2.csv')
+        field_names2 = ["d", "e"]
+        field_types2 = [DataTypes.INT, DataTypes.STRING]
+        data2 = [(2, "Flink"), (3, "Python"), (4, "Flink")]
+        csv_source2 = self.prepare_csv_source(source_path2, data2, field_types2, field_names2)
+        t_env = self.t_env
+        t_env.register_table_source("Source1", csv_source)
+        t_env.register_table_source("Source2", csv_source2)
+        source1 = t_env.scan("Source1")
+        source2 = t_env.scan("Source2")
+        field_names = ["a", "b", "c"]
+        field_types = [DataTypes.INT, DataTypes.INT, DataTypes.STRING]
+        t_env.register_table_sink(
+            "Results",
+            field_names, field_types, source_sink_utils.TestRetractSink())
+
+        result = source1.full_outer_join(source2, "a = d").select("a, d, b + e")
+        result.insert_into("Results")
+        t_env.execute()
+        actual = source_sink_utils.results()
+
+        expected = ['1,null,null', '2,2,HiFlink', '3,3,HelloPython', 'null,4,null']
+        self.assert_equals(actual, expected)
+
+
+if __name__ == '__main__':
+    import unittest
+
+    try:
+        import xmlrunner
+        testRunner = xmlrunner.XMLTestRunner(output='target/test-reports')
+    except ImportError:
+        testRunner = None
+    unittest.main(testRunner=testRunner, verbosity=2)
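
For orientation, a condensed sketch of the join operators covered above; both inputs are registered in the same TableEnvironment, and the CSV paths and schemas are placeholders rather than part of the patch.

{% highlight python %}
from pyflink.table.table_config import TableConfig
from pyflink.table.table_environment import TableEnvironment
from pyflink.table.table_sink import CsvTableSink
from pyflink.table.table_source import CsvTableSource
from pyflink.table.types import DataTypes

t_env = TableEnvironment.create(TableConfig.Builder().as_batch_execution().build())
t_env.register_table_source(
    "Left",
    CsvTableSource("/tmp/left.csv", ["a", "b", "c"],
                   [DataTypes.INT, DataTypes.STRING, DataTypes.STRING]))
t_env.register_table_source(
    "Right",
    CsvTableSource("/tmp/right.csv", ["d", "e"],
                   [DataTypes.INT, DataTypes.STRING]))
t_env.register_table_sink(
    "Results",
    ["a", "b", "e"], [DataTypes.INT, DataTypes.STRING, DataTypes.STRING],
    CsvTableSink("/tmp/joined.csv"))

left = t_env.scan("Left")
right = t_env.scan("Right")

# Inner join on the key columns; left_outer_join, right_outer_join and
# full_outer_join accept the same arguments (the other table plus a predicate).
left.join(right, "a = d").select("a, b, e").insert_into("Results")

t_env.execute()
{% endhighlight %}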
diff --git a/flink-python/pyflink/table/tests/test_calc.py b/flink-python/pyflink/table/tests/test_print_schema.py
similarity index 63%
copy from flink-python/pyflink/table/tests/test_calc.py
copy to flink-python/pyflink/table/tests/test_print_schema.py
index 70829dc..45fbc11 100644
--- a/flink-python/pyflink/table/tests/test_calc.py
+++ b/flink-python/pyflink/table/tests/test_print_schema.py
@@ -1,4 +1,4 @@
-# ###############################################################################
+################################################################################
 #  Licensed to the Apache Software Foundation (ASF) under one
 #  or more contributor license agreements.  See the NOTICE file
 #  distributed with this work for additional information
@@ -18,45 +18,30 @@
 
 import os
 
-from pyflink.table.table_source import CsvTableSource
 from pyflink.table.types import DataTypes
 from pyflink.testing import source_sink_utils
 from pyflink.testing.test_case_utils import PyFlinkStreamTableTestCase
 
 
-class TableTests(PyFlinkStreamTableTestCase):
+class StreamTableSchemaTests(PyFlinkStreamTableTestCase):
 
-    def test_select(self):
+    def test_print_schema(self):
         source_path = os.path.join(self.tempdir + '/streaming.csv')
-        with open(source_path, 'w') as f:
-            lines = '1,hi,hello\n' + '2,hi,hello\n'
-            f.write(lines)
-            f.close()
-
         field_names = ["a", "b", "c"]
         field_types = [DataTypes.INT, DataTypes.STRING, DataTypes.STRING]
-
+        data = [(1, "Hi", "Hello"), (2, "Hello", "Hello"), (2, "Hello", "Hello")]
+        csv_source = self.prepare_csv_source(source_path, data, field_types, field_names)
         t_env = self.t_env
-
-        # register Orders table in table environment
-        t_env.register_table_source(
-            "Orders",
-            CsvTableSource(source_path, field_names, field_types))
-
+        t_env.register_table_source("Source", csv_source)
+        source = t_env.scan("Source")
+        field_names = ["a", "b"]
+        field_types = [DataTypes.INT, DataTypes.STRING]
         t_env.register_table_sink(
             "Results",
-            field_names, field_types, source_sink_utils.TestAppendSink())
-
-        t_env.scan("Orders") \
-             .where("a > 0") \
-             .select("a + 1, b, c") \
-             .insert_into("Results")
-
-        t_env.execute()
+            field_names, field_types, source_sink_utils.TestRetractSink())
 
-        actual = source_sink_utils.results()
-        expected = ['2,hi,hello', '3,hi,hello']
-        self.assert_equals(actual, expected)
+        result = source.group_by("c").select("a.sum, c as b")
+        result.print_schema()
 
 
 if __name__ == '__main__':
diff --git a/flink-python/pyflink/table/tests/test_set_operation.py b/flink-python/pyflink/table/tests/test_set_operation.py
new file mode 100644
index 0000000..151e895
--- /dev/null
+++ b/flink-python/pyflink/table/tests/test_set_operation.py
@@ -0,0 +1,183 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.

+################################################################################
+import os
+
+from pyflink.table.types import DataTypes
+from pyflink.testing import source_sink_utils
+
+from pyflink.testing.test_case_utils import PyFlinkBatchTableTestCase, PyFlinkStreamTableTestCase
+
+
+class StreamTableSetOperationTests(PyFlinkStreamTableTestCase):
+
+    def test_union_all(self):
+        source_path = os.path.join(self.tempdir + '/streaming.csv')
+        field_names = ["a", "b", "c"]
+        field_types = [DataTypes.INT, DataTypes.STRING, DataTypes.STRING]
+        data = [(1, "Hi", "Hello"), (2, "Hi", "Hello"), (3, "Hello", "Hello")]
+        csv_source = self.prepare_csv_source(source_path, data, field_types, field_names)
+        source_path2 = os.path.join(self.tempdir + '/streaming2.csv')
+        data2 = [(2, "Hi", "Hello"), (3, "Hello", "Python"), (4, "Hi", "Flink")]
+        csv_source2 = self.prepare_csv_source(source_path2, data2, field_types, field_names)
+        t_env = self.t_env
+        t_env.register_table_source("Source1", csv_source)
+        t_env.register_table_source("Source2", csv_source2)
+        source1 = t_env.scan("Source1")
+        source2 = t_env.scan("Source2")
+        field_names = ["a", "b", "c"]
+        field_types = [DataTypes.INT, DataTypes.STRING, DataTypes.STRING]
+        t_env.register_table_sink(
+            "Results",
+            field_names, field_types, source_sink_utils.TestAppendSink())
+
+        result = source1.union_all(source2)
+        result.insert_into("Results")
+        t_env.execute()
+
+        actual = source_sink_utils.results()
+        expected = ['1,Hi,Hello',
+                    '2,Hi,Hello',
+                    '2,Hi,Hello',
+                    '3,Hello,Hello',
+                    '3,Hello,Python',
+                    '4,Hi,Flink']
+        self.assert_equals(actual, expected)
+
+
+class BatchTableSetOperationTests(PyFlinkBatchTableTestCase):
+
+    def test_minus(self):
+        source_path = os.path.join(self.tempdir + '/streaming.csv')
+        field_names = ["a", "b", "c"]
+        field_types = [DataTypes.INT, DataTypes.STRING, DataTypes.STRING]
+        data = [(1, "Hi", "Hello"), (1, "Hi", "Hello"), (3, "Hello", "Hello")]
+        csv_source = self.prepare_csv_source(source_path, data, field_types, field_names)
+        source_path2 = os.path.join(self.tempdir + '/streaming2.csv')
+        data2 = [(3, "Hello", "Hello"), (3, "Hello", "Python"), (4, "Hi", "Flink")]
+        csv_source2 = self.prepare_csv_source(source_path2, data2, field_types, field_names)
+        t_env = self.t_env
+        t_env.register_table_source("Source1", csv_source)
+        t_env.register_table_source("Source2", csv_source2)
+        source1 = t_env.scan("Source1")
+        source2 = t_env.scan("Source2")
+
+        result = source1.minus(source2)
+        actual = self.collect(result)
+
+        expected = ['1,Hi,Hello']
+        self.assert_equals(actual, expected)
+
+    def test_minus_all(self):
+        source_path = os.path.join(self.tempdir + '/streaming.csv')
+        field_names = ["a", "b", "c"]
+        field_types = [DataTypes.INT, DataTypes.STRING, DataTypes.STRING]
+        data = [(1, "Hi", "Hello"), (1, "Hi", "Hello"), (3, "Hello", "Hello")]
+        csv_source = self.prepare_csv_source(source_path, data, field_types, field_names)
+        source_path2 = os.path.join(self.tempdir + '/streaming2.csv')
+        data2 = [(3, "Hello", "Hello"), (3, "Hello", "Python"), (4, "Hi", "Flink")]
+        csv_source2 = self.prepare_csv_source(source_path2, data2, field_types, field_names)
+        t_env = self.t_env
+        t_env.register_table_source("Source1", csv_source)
+        t_env.register_table_source("Source2", csv_source2)
+        source1 = t_env.scan("Source1")
+        source2 = t_env.scan("Source2")
+
+        result = source1.minus_all(source2)
+        actual = self.collect(result)
+
+        expected = ['1,Hi,Hello',
+                    '1,Hi,Hello']
+        self.assert_equals(actual, expected)
+
+    def test_union(self):
+        source_path = os.path.join(self.tempdir + '/streaming.csv')
+        field_names = ["a", "b", "c"]
+        field_types = [DataTypes.INT, DataTypes.STRING, DataTypes.STRING]
+        data = [(1, "Hi", "Hello"), (2, "Hi", "Hello"), (3, "Hello", "Hello")]
+        csv_source = self.prepare_csv_source(source_path, data, field_types, field_names)
+        source_path2 = os.path.join(self.tempdir + '/streaming2.csv')
+        data2 = [(2, "Hi", "Hello"), (3, "Hello", "Python"), (4, "Hi", "Flink")]
+        csv_source2 = self.prepare_csv_source(source_path2, data2, field_types, field_names)
+        t_env = self.t_env
+        t_env.register_table_source("Source1", csv_source)
+        t_env.register_table_source("Source2", csv_source2)
+        source1 = t_env.scan("Source1")
+        source2 = t_env.scan("Source2")
+
+        result = source1.union(source2)
+        actual = self.collect(result)
+
+        expected = ['1,Hi,Hello',
+                    '2,Hi,Hello',
+                    '3,Hello,Hello',
+                    '3,Hello,Python',
+                    '4,Hi,Flink']
+        self.assert_equals(actual, expected)
+
+    def test_intersect(self):
+        source_path = os.path.join(self.tempdir + '/streaming.csv')
+        field_names = ["a", "b", "c"]
+        field_types = [DataTypes.INT, DataTypes.STRING, DataTypes.STRING]
+        data = [(1, "Hi", "Hello"), (2, "Hi", "Hello"), (2, "Hi", "Hello")]
+        csv_source = self.prepare_csv_source(source_path, data, field_types, field_names)
+        source_path2 = os.path.join(self.tempdir + '/streaming2.csv')
+        data2 = [(2, "Hi", "Hello"), (2, "Hi", "Hello"), (4, "Hi", "Flink")]
+        csv_source2 = self.prepare_csv_source(source_path2, data2, field_types, field_names)
+        t_env = self.t_env
+        t_env.register_table_source("Source1", csv_source)
+        t_env.register_table_source("Source2", csv_source2)
+        source1 = t_env.scan("Source1")
+        source2 = t_env.scan("Source2")
+
+        result = source1.intersect(source2)
+        actual = self.collect(result)
+
+        expected = ['2,Hi,Hello']
+        self.assert_equals(actual, expected)
+
+    def test_intersect_all(self):
+        source_path = os.path.join(self.tempdir + '/streaming.csv')
+        field_names = ["a", "b", "c"]
+        field_types = [DataTypes.INT, DataTypes.STRING, DataTypes.STRING]
+        data = [(1, "Hi", "Hello"), (2, "Hi", "Hello"), (2, "Hi", "Hello")]
+        csv_source = self.prepare_csv_source(source_path, data, field_types, field_names)
+        source_path2 = os.path.join(self.tempdir + '/streaming2.csv')
+        data2 = [(2, "Hi", "Hello"), (2, "Hi", "Hello"), (4, "Hi", "Flink")]
+        csv_source2 = self.prepare_csv_source(source_path2, data2, field_types, field_names)
+        t_env = self.t_env
+        t_env.register_table_source("Source1", csv_source)
+        t_env.register_table_source("Source2", csv_source2)
+        source1 = t_env.scan("Source1")
+        source2 = t_env.scan("Source2")
+
+        result = source1.intersect_all(source2)
+        actual = self.collect(result)
+
+        expected = ['2,Hi,Hello', '2,Hi,Hello']
+        self.assert_equals(actual, expected)
+
+
+if __name__ == '__main__':
+    import unittest
+
+    try:
+        import xmlrunner
+        testRunner = xmlrunner.XMLTestRunner(output='target/test-reports')
+    except ImportError:
+        testRunner = None
+    unittest.main(testRunner=testRunner, verbosity=2)
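
A sketch of the set operations added here (`union`/`union_all`, `minus`/`minus_all`, `intersect`/`intersect_all`); except for `union_all`, the tests above run them on the batch environment, so the sketch uses batch execution. Paths and schemas are placeholders.

{% highlight python %}
from pyflink.table.table_config import TableConfig
from pyflink.table.table_environment import TableEnvironment
from pyflink.table.table_sink import CsvTableSink
from pyflink.table.table_source import CsvTableSource
from pyflink.table.types import DataTypes

t_env = TableEnvironment.create(TableConfig.Builder().as_batch_execution().build())
for name, path in [("S1", "/tmp/s1.csv"), ("S2", "/tmp/s2.csv")]:
    t_env.register_table_source(
        name,
        CsvTableSource(path, ["a", "b", "c"],
                       [DataTypes.INT, DataTypes.STRING, DataTypes.STRING]))
t_env.register_table_sink(
    "Results",
    ["a", "b", "c"], [DataTypes.INT, DataTypes.STRING, DataTypes.STRING],
    CsvTableSink("/tmp/union.csv"))

s1 = t_env.scan("S1")
s2 = t_env.scan("S2")

# union removes duplicates, union_all keeps them; minus and intersect (and
# their *_all variants) follow the same pattern and require identical schemas.
s1.union(s2).insert_into("Results")

t_env.execute()
{% endhighlight %}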
diff --git a/flink-python/pyflink/table/tests/test_environment_completeness.py b/flink-python/pyflink/table/tests/test_sort.py
similarity index 56%
copy from flink-python/pyflink/table/tests/test_environment_completeness.py
copy to flink-python/pyflink/table/tests/test_sort.py
index bc0e8eb..f916b93 100644
--- a/flink-python/pyflink/table/tests/test_environment_completeness.py
+++ b/flink-python/pyflink/table/tests/test_sort.py
@@ -16,30 +16,29 @@
 # limitations under the License.
 ################################################################################
 
-from pyflink.testing.test_case_utils import PythonAPICompletenessTestCase
-from pyflink.table import TableEnvironment
-
-
-class EnvironmentAPICompletenessTests(PythonAPICompletenessTestCase):
-    """
-    Tests whether the Python :class:`TableEnvironment` is consistent with
-    Java `org.apache.flink.table.api.TableEnvironment`.
-    """
-    @classmethod
-    def python_class(cls):
-        return TableEnvironment
-
-    @classmethod
-    def java_class(cls):
-        return "org.apache.flink.table.api.TableEnvironment"
-
-    @classmethod
-    def excluded_methods(cls):
-        # registerFunction and listUserDefinedFunctions should be supported when UDFs supported.
-        # registerExternalCatalog, getRegisteredExternalCatalog and listTables
-        # should be supported when catalog supported in python.
-        return {'registerExternalCatalog', 'getRegisteredExternalCatalog',
-                'registerFunction', 'listUserDefinedFunctions', 'listTables'}
+import os
+
+from pyflink.table.types import DataTypes
+from pyflink.testing.test_case_utils import PyFlinkBatchTableTestCase
+
+
+class BatchTableSortTests(PyFlinkBatchTableTestCase):
+
+    def test_order_by_offset_fetch(self):
+        source_path = os.path.join(self.tempdir + '/streaming.csv')
+        field_names = ["a", "b"]
+        field_types = [DataTypes.INT, DataTypes.STRING]
+        data = [(1, "Hello"), (2, "Hello"), (3, "Flink"), (4, "Python")]
+        csv_source = self.prepare_csv_source(source_path, data, field_types, field_names)
+        t_env = self.t_env
+        t_env.register_table_source("Source", csv_source)
+        source = t_env.scan("Source")
+
+        result = source.order_by("a.desc").offset(2).fetch(2).select("a, b")
+        actual = self.collect(result)
+
+        expected = ['2,Hello', '1,Hello']
+        self.assert_equals(actual, expected)
 
 
 if __name__ == '__main__':
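
The sorting operators (`order_by`, `offset`, `fetch`) compose as below; a full sort needs batch execution, and the path and schema are placeholders.

{% highlight python %}
from pyflink.table.table_config import TableConfig
from pyflink.table.table_environment import TableEnvironment
from pyflink.table.table_sink import CsvTableSink
from pyflink.table.table_source import CsvTableSource
from pyflink.table.types import DataTypes

t_env = TableEnvironment.create(TableConfig.Builder().as_batch_execution().build())
t_env.register_table_source(
    "Source",
    CsvTableSource("/tmp/input.csv", ["a", "b"],
                   [DataTypes.INT, DataTypes.STRING]))
t_env.register_table_sink(
    "Results",
    ["a", "b"], [DataTypes.INT, DataTypes.STRING],
    CsvTableSink("/tmp/sorted.csv"))

# Sort descending on "a", skip the first two rows, then keep at most two.
t_env.scan("Source") \
    .order_by("a.desc") \
    .offset(2) \
    .fetch(2) \
    .insert_into("Results")

t_env.execute()
{% endhighlight %}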
diff --git a/flink-python/pyflink/table/tests/test_table_completeness.py b/flink-python/pyflink/table/tests/test_table_completeness.py
index 05d8075..3e366f0 100644
--- a/flink-python/pyflink/table/tests/test_table_completeness.py
+++ b/flink-python/pyflink/table/tests/test_table_completeness.py
@@ -37,7 +37,11 @@ class TableAPICompletenessTests(PythonAPICompletenessTestCase):
     @classmethod
     def excluded_methods(cls):
         # row-based operators should be supported when UDFs supported in python.
-        return {'map', 'flatMap', 'flatAggregate',  'aggregate'}
+        # The getSchema method returns a TableSchema. Implementing TableSchema requires a
+        # complete type system, which does not exist yet; it will be added after
+        # FLINK-12408 is merged, so we exclude getSchema for the time being.
+        return {'map', 'flatMap', 'flatAggregate',  'aggregate', 'leftOuterJoinLateral',
+                'createTemporalTableFunction', 'joinLateral', 'getTableOperation', 'getSchema'}
 
     @classmethod
     def java_method_name(cls, python_method_name):
diff --git a/flink-python/pyflink/table/tests/test_table_environment_api.py b/flink-python/pyflink/table/tests/test_table_environment_api.py
new file mode 100644
index 0000000..0abe244
--- /dev/null
+++ b/flink-python/pyflink/table/tests/test_table_environment_api.py
@@ -0,0 +1,428 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+import datetime
+import os
+import tempfile
+
+from py4j.compat import unicode
+from pyflink.table.table_environment import TableEnvironment
+from pyflink.table.table_config import TableConfig
+from pyflink.table.table_sink import CsvTableSink
+from pyflink.table.types import DataTypes
+from pyflink.testing import source_sink_utils
+from pyflink.testing.test_case_utils import PyFlinkBatchTableTestCase, PyFlinkStreamTableTestCase
+
+
+class StreamTableEnvironmentTests(PyFlinkStreamTableTestCase):
+
+    def test_register_scan(self):
+        source_path = os.path.join(self.tempdir + '/streaming.csv')
+        field_names = ["a", "b", "c"]
+        field_types = [DataTypes.INT, DataTypes.STRING, DataTypes.STRING]
+        data = [(1, "Hi", "Hello"), (2, "Hello", "Hello")]
+        csv_source = self.prepare_csv_source(source_path, data, field_types, field_names)
+        t_env = self.t_env
+        t_env.register_table_source("Source", csv_source)
+        field_names = ["a", "b", "c"]
+        field_types = [DataTypes.INT, DataTypes.STRING, DataTypes.STRING]
+        t_env.register_table_sink(
+            "Results",
+            field_names, field_types, source_sink_utils.TestAppendSink())
+
+        result = t_env.scan("Source")
+        result.insert_into("Results")
+        t_env.execute()
+        actual = source_sink_utils.results()
+
+        expected = ['1,Hi,Hello', '2,Hello,Hello']
+        self.assert_equals(actual, expected)
+
+    def test_register_table_source_sink(self):
+        source_path = os.path.join(self.tempdir + '/streaming.csv')
+        field_names = ["a", "b", "c"]
+        field_types = [DataTypes.INT, DataTypes.STRING, DataTypes.STRING]
+        data = [(1, "Hi", "Hello")]
+        csv_source = self.prepare_csv_source(source_path, data, field_types, field_names)
+        t_env = self.t_env
+
+        t_env.register_table_source("Orders", csv_source)
+        t_env.register_table_sink(
+            "Sinks",
+            field_names, field_types, source_sink_utils.TestAppendSink())
+        t_env.scan("Orders").insert_into("Sinks")
+        t_env.execute()
+        actual = source_sink_utils.results()
+
+        expected = ['1,Hi,Hello']
+        self.assert_equals(actual, expected)
+
+    def test_from_table_source(self):
+        source_path = os.path.join(self.tempdir + '/streaming.csv')
+        field_names = ["a", "b", "c"]
+        field_types = [DataTypes.INT, DataTypes.STRING, DataTypes.STRING]
+        data = [(1, "Hi", "Hello"), (2, "Hi", "Hello")]
+        csv_source = self.prepare_csv_source(source_path, data, field_types, field_names)
+        t_env = self.t_env
+        t_env.register_table_sink(
+            "Sinks",
+            field_names, field_types, source_sink_utils.TestAppendSink())
+
+        source = t_env.from_table_source(csv_source)
+        source.insert_into("Sinks")
+        t_env.execute()
+        actual = source_sink_utils.results()
+
+        expected = ['1,Hi,Hello', '2,Hi,Hello']
+        self.assert_equals(actual, expected)
+
+    def test_list_tables(self):
+        source_path = os.path.join(self.tempdir + '/streaming.csv')
+        field_names = ["a", "b", "c"]
+        field_types = [DataTypes.INT, DataTypes.STRING, DataTypes.STRING]
+        data = []
+        csv_source = self.prepare_csv_source(source_path, data, field_types, field_names)
+        t_env = self.t_env
+        t_env.register_table_source("Orders", csv_source)
+        t_env.register_table_sink(
+            "Sinks",
+            field_names, field_types, source_sink_utils.TestAppendSink())
+        t_env.register_table_sink(
+            "Results",
+            field_names, field_types, source_sink_utils.TestAppendSink())
+
+        actual = t_env.list_tables()
+
+        expected = ['Orders', 'Results', 'Sinks']
+        self.assert_equals(actual, expected)
+
+    def test_explain(self):
+        source_path = os.path.join(self.tempdir + '/streaming.csv')
+        field_names = ["a", "b", "c"]
+        field_types = [DataTypes.INT, DataTypes.STRING, DataTypes.STRING]
+        data = []
+        csv_source = self.prepare_csv_source(source_path, data, field_types, field_names)
+        t_env = self.t_env
+        t_env.register_table_source("Source", csv_source)
+        source = t_env.scan("Source")
+        result = source.alias("a, b, c").select("1 + a, b, c")
+
+        actual = t_env.explain(result)
+
+        assert isinstance(actual, str) or isinstance(actual, unicode)
+
+    def test_sql_query(self):
+        source_path = os.path.join(self.tempdir + '/streaming.csv')
+        field_names = ["a", "b", "c"]
+        field_types = [DataTypes.INT, DataTypes.STRING, DataTypes.STRING]
+        data = [(1, "Hi", "Hello"), (2, "Hello", "Hello")]
+        csv_source = self.prepare_csv_source(source_path, data, field_types, field_names)
+        t_env = self.t_env
+        t_env.register_table_source("Source", csv_source)
+        source = t_env.scan("Source")
+        t_env.register_table_sink(
+            "sinks",
+            field_names, field_types, source_sink_utils.TestAppendSink())
+
+        result = t_env.sql_query("select a + 1, b, c from %s" % source)
+        result.insert_into("sinks")
+        t_env.execute()
+        actual = source_sink_utils.results()
+
+        expected = ['2,Hi,Hello', '3,Hello,Hello']
+        self.assert_equals(actual, expected)
+
+    def test_sql_update(self):
+        source_path = os.path.join(self.tempdir + '/streaming.csv')
+        field_names = ["a", "b", "c"]
+        field_types = [DataTypes.INT, DataTypes.STRING, DataTypes.STRING]
+        data = [(1, "Hi", "Hello"), (2, "Hello", "Hello")]
+        csv_source = self.prepare_csv_source(source_path, data, field_types, field_names)
+        t_env = self.t_env
+        t_env.register_table_source("source", csv_source)
+        t_env.register_table_sink(
+            "sinks",
+            field_names, field_types, source_sink_utils.TestAppendSink())
+
+        t_env.sql_update("insert into sinks select * from source")
+        t_env.execute("test_sql_job")
+
+        actual = source_sink_utils.results()
+        expected = ['1,Hi,Hello', '2,Hello,Hello']
+        self.assert_equals(actual, expected)
+
+    def test_sql_update_with_query_config(self):
+        source_path = os.path.join(self.tempdir + '/streaming.csv')
+        field_names = ["a", "b", "c"]
+        field_types = [DataTypes.INT, DataTypes.STRING, DataTypes.STRING]
+        data = [(1, "Hi", "Hello"), (2, "Hello", "Hello")]
+        csv_source = self.prepare_csv_source(source_path, data, field_types, field_names)
+        t_env = self.t_env
+        t_env.register_table_source("source", csv_source)
+        t_env.register_table_sink(
+            "sinks",
+            field_names, field_types, source_sink_utils.TestAppendSink())
+        query_config = t_env.query_config()
+        query_config.with_idle_state_retention_time(
+            datetime.timedelta(days=1), datetime.timedelta(days=2))
+
+        t_env.sql_update("insert into sinks select * from source", query_config)
+        t_env.execute("test_sql_job")
+
+        actual = source_sink_utils.results()
+        expected = ['1,Hi,Hello', '2,Hello,Hello']
+        self.assert_equals(actual, expected)
+
+    def test_query_config(self):
+        t_env = self.t_env
+        query_config = t_env.query_config()
+
+        query_config.with_idle_state_retention_time(
+            datetime.timedelta(days=1), datetime.timedelta(days=2))
+
+        assert query_config.get_max_idle_state_retention_time() == 2 * 24 * 3600 * 1000
+        assert query_config.get_min_idle_state_retention_time() == 24 * 3600 * 1000
+
+    def test_table_config(self):
+
+        table_config = TableConfig.Builder()\
+            .as_streaming_execution()\
+            .set_timezone("Asia/Shanghai")\
+            .set_max_generated_code_length(64000)\
+            .set_null_check(True)\
+            .set_parallelism(4).build()
+
+        assert table_config.parallelism() == 4
+        assert table_config.null_check() is True
+        assert table_config.max_generated_code_length() == 64000
+        assert table_config.timezone() == "Asia/Shanghai"
+        assert table_config.is_stream() is True
+
+    def test_create_table_environment(self):
+        table_config = TableConfig.Builder()\
+            .set_parallelism(2)\
+            .set_max_generated_code_length(32000)\
+            .set_null_check(False)\
+            .set_timezone("Asia/Shanghai")\
+            .as_streaming_execution()\
+            .build()
+
+        t_env = TableEnvironment.create(table_config)
+
+        read_table_config = t_env.get_config()
+        assert read_table_config.parallelism() == 2
+        assert read_table_config.null_check() is False
+        assert read_table_config.max_generated_code_length() == 32000
+        assert read_table_config.timezone() == "Asia/Shanghai"
+        assert read_table_config.is_stream() is True
+
+
+class BatchTableEnvironmentTests(PyFlinkBatchTableTestCase):
+
+    def test_register_scan(self):
+        source_path = os.path.join(self.tempdir + '/streaming.csv')
+        field_names = ["a", "b", "c"]
+        field_types = [DataTypes.INT, DataTypes.STRING, DataTypes.STRING]
+        data = [(1, "Hi", "Hello"), (2, "Hello", "Hello")]
+        csv_source = self.prepare_csv_source(source_path, data, field_types, field_names)
+        t_env = self.t_env
+        t_env.register_table_source("Source", csv_source)
+
+        result = t_env.scan("Source")
+        actual = self.collect(result)
+
+        expected = ['1,Hi,Hello', '2,Hello,Hello']
+        self.assert_equals(actual, expected)
+
+    def test_register_table_source_sink(self):
+        source_path = os.path.join(self.tempdir + '/streaming.csv')
+        field_names = ["a", "b", "c"]
+        field_types = [DataTypes.INT, DataTypes.STRING, DataTypes.STRING]
+        data = [(1, "Hi", "Hello")]
+        csv_source = self.prepare_csv_source(source_path, data, field_types, field_names)
+        t_env = self.t_env
+        tmp_dir = tempfile.gettempdir()
+        tmp_csv = tmp_dir + '/streaming2.csv'
+        if os.path.isfile(tmp_csv):
+            os.remove(tmp_csv)
+
+        t_env.register_table_source("Orders", csv_source)
+        t_env.register_table_sink(
+            "Results",
+            field_names, field_types, CsvTableSink(tmp_csv))
+        t_env.scan("Orders").insert_into("Results")
+        t_env.execute()
+
+        with open(tmp_csv, 'r') as f:
+            lines = f.read()
+            assert lines == '1,Hi,Hello\n'
+
+    def test_from_table_source(self):
+        source_path = os.path.join(self.tempdir + '/streaming.csv')
+        field_names = ["a", "b", "c"]
+        field_types = [DataTypes.INT, DataTypes.STRING, DataTypes.STRING]
+        data = [(1, "Hi", "Hello"), (2, "Hi", "Hello")]
+        csv_source = self.prepare_csv_source(source_path, data, field_types, field_names)
+        t_env = self.t_env
+
+        source = t_env.from_table_source(csv_source)
+        actual = self.collect(source)
+
+        expected = ['1,Hi,Hello', '2,Hi,Hello']
+        self.assert_equals(actual, expected)
+
+    def test_list_tables(self):
+        source_path = os.path.join(self.tempdir + '/streaming.csv')
+        field_names = ["a", "b", "c"]
+        field_types = [DataTypes.INT, DataTypes.STRING, DataTypes.STRING]
+        data = []
+        csv_source = self.prepare_csv_source(source_path, data, field_types, field_names)
+        t_env = self.t_env
+        t_env.register_table_source("Orders", csv_source)
+        tmp_dir = tempfile.gettempdir()
+        tmp_csv = tmp_dir + '/streaming2.csv'
+        t_env.register_table_sink(
+            "Sinks",
+            field_names, field_types, CsvTableSink(tmp_csv))
+        t_env.register_table_sink(
+            "Results",
+            field_names, field_types, CsvTableSink(tmp_csv))
+
+        actual = t_env.list_tables()
+
+        expected = ['Orders', 'Results', 'Sinks']
+        self.assert_equals(actual, expected)
+
+    def test_explain(self):
+        source_path = os.path.join(self.tempdir + '/streaming.csv')
+        field_names = ["a", "b", "c"]
+        field_types = [DataTypes.INT, DataTypes.STRING, DataTypes.STRING]
+        data = []
+        csv_source = self.prepare_csv_source(source_path, data, field_types, field_names)
+        t_env = self.t_env
+        t_env.register_table_source("Source", csv_source)
+        source = t_env.scan("Source")
+        result = source.alias("a, b, c").select("1 + a, b, c")
+
+        actual = t_env.explain(result)
+
+        assert isinstance(actual, str) or isinstance(actual, unicode)
+
+    def test_sql_query(self):
+        source_path = os.path.join(self.tempdir + '/streaming.csv')
+        field_names = ["a", "b", "c"]
+        field_types = [DataTypes.INT, DataTypes.STRING, DataTypes.STRING]
+        data = [(1, "Hi", "Hello"), (2, "Hello", "Hello")]
+        csv_source = self.prepare_csv_source(source_path, data, field_types, field_names)
+        t_env = self.t_env
+        t_env.register_table_source("Source", csv_source)
+        source = t_env.scan("Source")
+        tmp_dir = tempfile.gettempdir()
+        tmp_csv = tmp_dir + '/streaming2.csv'
+        if os.path.isfile(tmp_csv):
+            os.remove(tmp_csv)
+        t_env.register_table_sink(
+            "sinks",
+            field_names, field_types, CsvTableSink(tmp_csv))
+
+        result = t_env.sql_query("select a + 1, b, c from %s" % source)
+        result.insert_into("sinks")
+        t_env.execute()
+
+        with open(tmp_csv, 'r') as f:
+            lines = f.read()
+            assert lines == '2,Hi,Hello\n' + '3,Hello,Hello\n'
+
+    def test_sql_update(self):
+        source_path = os.path.join(self.tempdir + '/streaming.csv')
+        field_names = ["a", "b", "c"]
+        field_types = [DataTypes.INT, DataTypes.STRING, DataTypes.STRING]
+        data = [(1, "Hi", "Hello"), (2, "Hello", "Hello")]
+        csv_source = self.prepare_csv_source(source_path, data, field_types, field_names)
+        t_env = self.t_env
+        t_env.register_table_source("source", csv_source)
+        tmp_dir = tempfile.gettempdir()
+        tmp_csv = tmp_dir + '/streaming2.csv'
+        if os.path.isfile(tmp_csv):
+            os.remove(tmp_csv)
+        t_env.register_table_sink(
+            "sinks",
+            field_names, field_types, CsvTableSink(tmp_csv))
+
+        t_env.sql_update("insert into sinks select * from source")
+        t_env.execute("test_sql_job")
+
+        with open(tmp_csv, 'r') as f:
+            lines = f.read()
+            assert lines == '1,Hi,Hello\n' + '2,Hello,Hello\n'
+
+    def test_sql_update_with_query_config(self):
+        source_path = os.path.join(self.tempdir + '/streaming.csv')
+        field_names = ["a", "b", "c"]
+        field_types = [DataTypes.INT, DataTypes.STRING, DataTypes.STRING]
+        data = [(1, "Hi", "Hello"), (2, "Hello", "Hello")]
+        csv_source = self.prepare_csv_source(source_path, data, field_types, field_names)
+        t_env = self.t_env
+        t_env.register_table_source("source", csv_source)
+        tmp_dir = tempfile.gettempdir()
+        tmp_csv = tmp_dir + '/streaming2.csv'
+        if os.path.isfile(tmp_csv):
+            os.remove(tmp_csv)
+        t_env.register_table_sink(
+            "sinks",
+            field_names, field_types, CsvTableSink(tmp_csv))
+        query_config = t_env.query_config()
+
+        t_env.sql_update("insert into sinks select * from source", query_config)
+        t_env.execute("test_sql_job")
+
+        with open(tmp_csv, 'r') as f:
+            lines = f.read()
+            assert lines == '1,Hi,Hello\n' + '2,Hello,Hello\n'
+
+    def test_table_config(self):
+
+        table_config = TableConfig.Builder()\
+            .as_batch_execution()\
+            .set_timezone("Asia/Shanghai")\
+            .set_max_generated_code_length(64000)\
+            .set_null_check(True)\
+            .set_parallelism(4).build()
+
+        assert table_config.parallelism() == 4
+        assert table_config.null_check() is True
+        assert table_config.max_generated_code_length() == 64000
+        assert table_config.timezone() == "Asia/Shanghai"
+        assert table_config.is_stream() is False
+
+    def test_create_table_environment(self):
+        table_config = TableConfig.Builder()\
+            .set_parallelism(2)\
+            .set_max_generated_code_length(32000)\
+            .set_null_check(False)\
+            .set_timezone("Asia/Shanghai")\
+            .as_batch_execution()\
+            .build()
+
+        t_env = TableEnvironment.create(table_config)
+
+        read_table_config = t_env.get_config()
+        assert read_table_config.parallelism() == 2
+        assert read_table_config.null_check() is False
+        assert read_table_config.max_generated_code_length() == 32000
+        assert read_table_config.timezone() == "Asia/Shanghai"
+        assert read_table_config.is_stream() is False
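
To tie the environment-level pieces together, a sketch that builds a TableConfig, creates a TableEnvironment and runs a SQL INSERT through `sql_update`; the config values, table names and CSV paths below are placeholder assumptions.

{% highlight python %}
from pyflink.table.table_config import TableConfig
from pyflink.table.table_environment import TableEnvironment
from pyflink.table.table_sink import CsvTableSink
from pyflink.table.table_source import CsvTableSource
from pyflink.table.types import DataTypes

config = TableConfig.Builder() \
    .as_batch_execution() \
    .set_parallelism(2) \
    .set_timezone("Asia/Shanghai") \
    .build()
t_env = TableEnvironment.create(config)

t_env.register_table_source(
    "source",
    CsvTableSource("/tmp/input.csv", ["a", "b", "c"],
                   [DataTypes.INT, DataTypes.STRING, DataTypes.STRING]))
t_env.register_table_sink(
    "sinks",
    ["a", "b", "c"], [DataTypes.INT, DataTypes.STRING, DataTypes.STRING],
    CsvTableSink("/tmp/output.csv"))

# Plain SQL against the registered tables; execute() takes an optional job name.
t_env.sql_update("insert into sinks select * from source")
t_env.execute("sql_update_job")
{% endhighlight %}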
diff --git a/flink-python/pyflink/table/tests/test_window.py b/flink-python/pyflink/table/tests/test_window.py
new file mode 100644
index 0000000..32d5305
--- /dev/null
+++ b/flink-python/pyflink/table/tests/test_window.py
@@ -0,0 +1,111 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+import os
+
+from py4j.protocol import Py4JJavaError
+
+from pyflink.table.window import Session, Slide, Tumble
+from pyflink.table import Over
+from pyflink.table.types import DataTypes
+from pyflink.testing.test_case_utils import PyFlinkStreamTableTestCase, PyFlinkBatchTableTestCase
+
+
+class StreamTableWindowTests(PyFlinkStreamTableTestCase):
+
+    def test_over_window(self):
+        source_path = os.path.join(self.tempdir + '/streaming.csv')
+        field_names = ["a", "b", "c"]
+        field_types = [DataTypes.LONG, DataTypes.INT, DataTypes.STRING]
+        data = [(1, 1, "Hello"), (2, 2, "Hello"), (3, 4, "Hello"), (4, 8, "Hello")]
+        csv_source = self.prepare_csv_source(source_path, data, field_types, field_names)
+        t_env = self.t_env
+        t_env.register_table_source("Source", csv_source)
+        source = t_env.scan("Source")
+
+        result = source.over_window(Over.partition_by("c").order_by("a")
+                                    .preceding("2.rows").following("current_row").alias("w"))
+
+        self.assertRaisesRegexp(
+            Py4JJavaError, "Ordering must be defined on a time attribute",
+            result.select, "b.sum over w")
+
+
+class BatchTableWindowTests(PyFlinkBatchTableTestCase):
+
+    def test_tumble_window(self):
+        source_path = os.path.join(self.tempdir + '/streaming.csv')
+        field_names = ["a", "b", "c"]
+        field_types = [DataTypes.LONG, DataTypes.INT, DataTypes.STRING]
+        data = [(1, 1, "Hello"), (2, 2, "Hello"), (3, 4, "Hello"), (4, 8, "Hello")]
+        csv_source = self.prepare_csv_source(source_path, data, field_types, field_names)
+        t_env = self.t_env
+        t_env.register_table_source("Source", csv_source)
+        source = t_env.scan("Source")
+
+        result = source.window(Tumble.over("2.rows").on("a").alias("w"))\
+            .group_by("w, c").select("b.sum")
+        actual = self.collect(result)
+
+        expected = ['3', '12']
+        self.assert_equals(actual, expected)
+
+    def test_slide_window(self):
+        source_path = os.path.join(self.tempdir + '/streaming.csv')
+        field_names = ["a", "b", "c"]
+        field_types = [DataTypes.LONG, DataTypes.INT, DataTypes.STRING]
+        data = [(1000, 1, "Hello"), (2000, 2, "Hello"), (3000, 4, "Hello"), (4000, 8, "Hello")]
+        csv_source = self.prepare_csv_source(source_path, data, field_types, field_names)
+        t_env = self.t_env
+        t_env.register_table_source("Source", csv_source)
+        source = t_env.scan("Source")
+
+        result = source.window(Slide.over("2.seconds").every("1.seconds").on("a").alias("w"))\
+            .group_by("w, c").select("b.sum")
+        actual = self.collect(result)
+
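+
+        # 2-second windows sliding every second, so each row contributes to two window sums.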
+        expected = ['1', '3', '6', '12', '8']
+        self.assert_equals(actual, expected)
+
+    def test_session_window(self):
+        source_path = os.path.join(self.tempdir, 'streaming.csv')
+        field_names = ["a", "b", "c"]
+        field_types = [DataTypes.LONG, DataTypes.INT, DataTypes.STRING]
+        data = [(1000, 1, "Hello"), (2000, 2, "Hello"), (4000, 4, "Hello"), (5000, 8, "Hello")]
+        csv_source = self.prepare_csv_source(source_path, data, field_types, field_names)
+        t_env = self.t_env
+        t_env.register_table_source("Source", csv_source)
+        source = t_env.scan("Source")
+
+        result = source.window(Session.with_gap("1.seconds").on("a").alias("w"))\
+            .group_by("w, c").select("b.sum")
+        actual = self.collect(result)
+
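+
+        # The 2-second gap between 2000 and 4000 splits the rows into two sessions.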
+        expected = ['3', '12']
+        self.assert_equals(actual, expected)
+
+
+if __name__ == '__main__':
+    import unittest
+
+    try:
+        import xmlrunner
+        testRunner = xmlrunner.XMLTestRunner(output='target/test-reports')
+    except ImportError:
+        testRunner = None
+    unittest.main(testRunner=testRunner, verbosity=2)
diff --git a/flink-python/pyflink/table/window.py b/flink-python/pyflink/table/window.py
new file mode 100644
index 0000000..96e495e
--- /dev/null
+++ b/flink-python/pyflink/table/window.py
@@ -0,0 +1,449 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+from py4j.java_gateway import get_method
+from pyflink.java_gateway import get_gateway
+
+__all__ = [
+    'Tumble',
+    'Session',
+    'Slide',
+    'Over',
+    'GroupWindow',
+    'OverWindow'
+]
+
+
+class GroupWindow(object):
+    """
+    A group window specification.
+
+    Group windows group rows based on time or row-count intervals and are therefore essentially
+    a special type of groupBy. Just like groupBy, group windows allow aggregates to be computed
+    on groups of elements.
+
+    Infinite streaming tables can only be grouped into time or row intervals. Hence window
+    grouping is required to apply aggregations on streaming tables.
+
+    For finite batch tables, group windows provide shortcuts for time-based groupBy.
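+
+    Example (an illustrative sketch; assumes a table ``tab`` with a time attribute ``rowtime``
+    and fields ``b`` and ``c``):
+    ::
+        >>> windowed = tab.window(Tumble.over("10.minutes").on("rowtime").alias("w"))
+        >>> windowed.group_by("w, c").select("c, b.sum, w.start, w.end")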
+    """
+
+    def __init__(self, java_window):
+        self._java_window = java_window
+
+
+class Tumble(object):
+    """
+    Helper class for creating a tumbling window. Tumbling windows are consecutive, non-overlapping
+    windows of a specified fixed length. For example, a tumbling window of 5 minutes groups
+    elements in 5-minute intervals.
+
+    Example:
+    ::
+        >>> Tumble.over("10.minutes").on("rowtime").alias("w")
+    """
+
+    @classmethod
+    def over(cls, size):
+        """
+        Creates a tumbling window. Tumbling windows are fixed-size, consecutive, non-overlapping
+        windows of a specified fixed length. For example, a tumbling window of 5 minutes
+        groups elements in 5-minute intervals.
+
+        :param size: The size of the window as time or row-count interval.
+        :return: A partially defined tumbling window.
+        """
+        # type: (str) -> TumbleWithSize
+        return TumbleWithSize(
+            get_gateway().jvm.Tumble.over(size))
+
+
+class TumbleWithSize(object):
+    """
+    Tumbling window.
+
+    For streaming tables you can specify grouping by an event-time or processing-time attribute.
+
+    For batch tables you can specify grouping on a timestamp or long attribute.
+    """
+
+    def __init__(self, java_window):
+        self._java_window = java_window
+
+    def on(self, time_field):
+        """
+        Specifies the time attribute on which rows are grouped.
+
+        For streaming tables you can specify grouping by an event-time or processing-time
+        attribute.
+
+        For batch tables you can specify grouping on a timestamp or long attribute.
+
+        :param time_field: Time attribute for streaming and batch tables.
+        :return: A tumbling window on event-time/processing-time.
+        """
+        # type: (str) -> TumbleWithSizeOnTime
+        return TumbleWithSizeOnTime(self._java_window.on(time_field))
+
+
+class TumbleWithSizeOnTime(object):
+    """
+    Tumbling window on time. You need to assign an alias for the window.
+    """
+
+    def __init__(self, java_window):
+        self._java_window = java_window
+
+    def alias(self, alias):
+        """
+        Assigns an alias for this window that the following
+        :func:`~pyflink.table.GroupWindowedTable.group_by` and
+        :func:`~pyflink.table.WindowGroupedTable.select` clauses can refer to. The
+        :func:`~pyflink.table.WindowGroupedTable.select` statement can access window properties
+        such as the window start or end time.
+
+        :param alias: Alias for this window.
+        :return: This window.
+        """
+        # type: (str) -> GroupWindow
+        return GroupWindow(get_method(self._java_window, "as")(alias))
+
+
+class Session(object):
+    """
+    Helper class for creating a session window. The boundary of a session window is defined by
+    intervals of inactivity, i.e., a session window closes if no event appears for a defined
+    gap period.
+
+    Example:
+    ::
+        >>> Session.with_gap("10.minutes").on("rowtime").alias("w")
+
+    """
+
+    @classmethod
+    def with_gap(cls, gap):
+        """
+        Creates a session window. The boundary of a session window is defined by
+        intervals of inactivity, i.e., a session window closes if no event appears for a defined
+        gap period.
+
+        :param gap: Specifies how long (as interval of milliseconds) to wait for new data before
+                    closing the session window.
+        :return: A partially defined session window.
+        """
+        # type: (str) -> SessionWithGap
+        return SessionWithGap(
+            get_gateway().jvm.Session.withGap(gap))
+
+
+class SessionWithGap(object):
+    """
+    Session window.
+
+    For streaming tables you can specify grouping by an event-time or processing-time attribute.
+
+    For batch tables you can specify grouping on a timestamp or long attribute.
+    """
+
+    def __init__(self, java_window):
+        self._java_window = java_window
+
+    def on(self, time_field):
+        """
+        Specifies the time attribute on which rows are grouped.
+
+        For streaming tables you can specify grouping by an event-time or processing-time
+        attribute.
+
+        For batch tables you can specify grouping on a timestamp or long attribute.
+
+        :param time_field: Time attribute for streaming and batch tables.
+        :return: A session window on event-time/processing-time.
+        """
+        # type: (str) -> SessionWithGapOnTime
+        return SessionWithGapOnTime(self._java_window.on(time_field))
+
+
+class SessionWithGapOnTime(object):
+    """
+    Session window on time. You need to assign an alias for the window.
+    """
+
+    def __init__(self, java_window):
+        self._java_window = java_window
+
+    def alias(self, alias):
+        """
+        Assigns an alias for this window that the following
+        :func:`~pyflink.table.GroupWindowedTable.group_by` and
+        :func:`~pyflink.table.WindowGroupedTable.select` clause can refer to.
+        :func:`~pyflink.table.WindowGroupedTable.select` statement can access window properties
+        such as window start or end time.
+
+        :param alias: Alias for this window.
+        :return: This window.
+        """
+        # type: (str) -> GroupWindow
+        return GroupWindow(get_method(self._java_window, "as")(alias))
+
+
+class Slide(object):
+    """
+    Helper class for creating a sliding window. Sliding windows have a fixed size and slide by
+    a specified slide interval. If the slide interval is smaller than the window size, sliding
+    windows are overlapping. Thus, an element can be assigned to multiple windows.
+
+    For example, a sliding window of size 15 minutes with 5 minutes sliding interval groups
+    elements of 15 minutes and evaluates every five minutes. Each element is contained in three
+    consecutive window evaluations.
+
+    Example:
+    ::
+        >>> Slide.over("10.minutes").every("5.minutes").on("rowtime").alias("w")
+    """
+
+    @classmethod
+    def over(cls, size):
+        """
+        Creates a sliding window. Sliding windows have a fixed size and slide by
+        a specified slide interval. If the slide interval is smaller than the window size, sliding
+        windows are overlapping. Thus, an element can be assigned to multiple windows.
+
+        For example, a sliding window of size 15 minutes with 5 minutes sliding interval groups
+        elements of 15 minutes and evaluates every five minutes. Each element is contained in three
+        consecutive window evaluations.
+
+        :param size: The size of the window as time or row-count interval.
+        :return: A partially specified sliding window.
+        """
+        # type: (str) -> SlideWithSize
+        return SlideWithSize(
+            get_gateway().jvm.Slide.over(size))
+
+
+class SlideWithSize(object):
+    """
+    Partially specified sliding window. The size of the window is specified as either a time or
+    row-count interval.
+    """
+
+    def __init__(self, java_window):
+        self._java_window = java_window
+
+    def every(self, slide):
+        """
+        Specifies the window's slide as time or row-count interval.
+
+        The slide determines the interval in which windows are started. Hence, sliding windows can
+        overlap if the slide is smaller than the size of the window.
+
+        For example, you could have windows of size 15 minutes that slide by 3 minutes. With
+        this, 15 minutes' worth of elements are grouped every 3 minutes and each row contributes
+        to 5 windows.
+
+        :param slide: The slide of the window either as time or row-count interval.
+        :return: A sliding window.
+        """
+        # type: (str) -> SlideWithSizeAndSlide
+        return SlideWithSizeAndSlide(self._java_window.every(slide))
+
+
+class SlideWithSizeAndSlide(object):
+    """
+    Sliding window. The size of the window is specified as either a time or row-count interval.
+
+    For streaming tables you can specify grouping by an event-time or processing-time attribute.
+
+    For batch tables you can specify grouping on a timestamp or long attribute.
+    """
+
+    def __init__(self, java_window):
+        self._java_window = java_window
+
+    def on(self, time_field):
+        """
+        Specifies the time attribute on which rows are grouped.
+
+        For streaming tables you can specify grouping by an event-time or processing-time
+        attribute.
+
+        For batch tables you can specify grouping on a timestamp or long attribute.
+
+        :param time_field: Time attribute for streaming and batch tables.
+        :return: A sliding window on event-time/processing-time.
+        """
+        # type: (str) -> SlideWithSizeAndSlideOnTime
+        return SlideWithSizeAndSlideOnTime(self._java_window.on(time_field))
+
+
+class SlideWithSizeAndSlideOnTime(object):
+    """
+    Sliding window on time. You need to assign an alias for the window.
+    """
+
+    def __init__(self, java_window):
+        self._java_window = java_window
+
+    def alias(self, alias):
+        """
+        Assigns an alias for this window that the following
+        :func:`~pyflink.table.GroupWindowedTable.group_by` and
+        :func:`~pyflink.table.WindowGroupedTable.select` clauses can refer to. The
+        :func:`~pyflink.table.WindowGroupedTable.select` statement can access window properties
+        such as the window start or end time.
+
+        :param alias: Alias for this window.
+        :return: This window.
+        """
+        # type: (str) -> GroupWindow
+        return GroupWindow(
+            get_method(self._java_window, "as")(alias))
+
+
+class Over(object):
+    """
+    Helper class for creating an over window. Similar to SQL, over window aggregates compute an
+    aggregate for each input row over a range of its neighboring rows.
+
+    Over-windows for batch tables are currently not supported.
+
+    Example:
+    ::
+        >>> Over.partition_by("a").order_by("rowtime").preceding("unbounded_range").alias("w")
+    """
+
+    @classmethod
+    def order_by(cls, order_by):
+        """
+        Specifies the time attribute on which rows are ordered.
+
+        For streaming tables, reference a rowtime or proctime time attribute here
+        to specify the time mode.
+
+        :param order_by: Field reference.
+        :return: An over window with defined order.
+        """
+        # type: (str) -> OverWindowPartitionedOrdered
+        return OverWindowPartitionedOrdered(get_gateway().jvm.Over.orderBy(order_by))
+
+    @classmethod
+    def partition_by(cls, partition_by):
+        """
+        Partitions the elements on some partition keys.
+
+        Each partition is individually sorted and aggregate functions are applied to each
+        partition separately.
+
+        :param partition_by: List of field references.
+        :return: An over window with defined partitioning.
+        """
+        # type: (str) -> OverWindowPartitioned
+        return OverWindowPartitioned(get_gateway().jvm.Over.partitionBy(partition_by))
+
+
+class OverWindowPartitionedOrdered(object):
+    """
+    Partially defined over window with (optional) partitioning and order.
+    """
+
+    def __init__(self, java_over_window):
+        self._java_over_window = java_over_window
+
+    def alias(self, alias):
+        """
+        Assigns an alias for this window that the following
+        :func:`~pyflink.table.OverWindowedTable.select` clause can refer to.
+
+        :param alias: Alias for this over window.
+        :return: The fully defined over window.
+        """
+        # type: (str) -> OverWindow
+        return OverWindow(get_method(self._java_over_window, "as")(alias))
+
+    def preceding(self, preceding):
+        """
+        Set the preceding offset (based on time or row-count intervals) for the over window.
+
+        :param preceding: Preceding offset relative to the current row.
+        :return: An over window with defined preceding.
+        """
+        # type: (str) -> OverWindowPartitionedOrderedPreceding
+        return OverWindowPartitionedOrderedPreceding(
+            self._java_over_window.preceding(preceding))
+
+
+class OverWindowPartitionedOrderedPreceding(object):
+    """
+    Partially defined over window with (optional) partitioning, order, and preceding.
+    """
+
+    def __init__(self, java_over_window):
+        self._java_over_window = java_over_window
+
+    def alias(self, alias):
+        """
+        Assigns an alias for this window that the following
+        :func:`~pyflink.table.OverWindowedTable.select` clause can refer to.
+
+        :param alias: Alias for this over window.
+        :return: The fully defined over window.
+        """
+        # type: (str) -> OverWindow
+        return OverWindow(get_method(self._java_over_window, "as")(alias))
+
+    def following(self, following):
+        """
+        Set the following offset (based on time or row-count intervals) for the over window.
+
+        :param following: Following offset relative to the current row.
+        :return: An over window with defined following.
+        """
+        # type: (str) -> OverWindowPartitionedOrderedPreceding
+        return OverWindowPartitionedOrderedPreceding(
+            self._java_over_window.following(following))
+
+
+class OverWindowPartitioned(object):
+    """
+    Partially defined over window with partitioning.
+    """
+
+    def __init__(self, java_over_window):
+        self._java_over_window = java_over_window
+
+    def order_by(self, order_by):
+        """
+        Specifies the time attribute on which rows are ordered.
+
+        For streaming tables, reference a rowtime or proctime time attribute here
+        to specify the time mode.
+
+        For batch tables, refer to a timestamp or long attribute.
+
+        :param order_by: Field reference.
+        :return: An over window with defined order.
+        """
+        return OverWindowPartitionedOrdered(self._java_over_window.orderBy(order_by))
+
+
+class OverWindow(object):
+    """
+    An over window specification.
+
+    Similar to SQL, over window aggregates compute an aggregate for each input row over a range
+    of its neighboring rows.
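+
+    Example (an illustrative sketch; assumes a table ``tab`` with a time attribute ``rowtime``
+    and fields ``a`` and ``b``):
+    ::
+        >>> window = Over.partition_by("a").order_by("rowtime").preceding("2.rows").alias("w")
+        >>> tab.over_window(window).select("b.sum over w")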
+    """
+
+    def __init__(self, java_over_window):
+        self._java_over_window = java_over_window
diff --git a/flink-python/pyflink/testing/test_case_utils.py b/flink-python/pyflink/testing/test_case_utils.py
index dd05daf..78819db 100644
--- a/flink-python/pyflink/testing/test_case_utils.py
+++ b/flink-python/pyflink/testing/test_case_utils.py
@@ -24,6 +24,9 @@ import tempfile
 import unittest
 from abc import abstractmethod
 
+from py4j.java_gateway import JavaObject
+from pyflink.table.table_source import CsvTableSource
+
 from pyflink.find_flink_home import _find_flink_home
 from pyflink.table import TableEnvironment, TableConfig
 from pyflink.java_gateway import get_gateway
@@ -59,9 +62,13 @@ class PyFlinkTestCase(unittest.TestCase):
 
     @classmethod
     def assert_equals(cls, actual, expected):
-        actual_py_list = cls.to_py_list(actual)
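+        # The actual result may be a Java collection or a plain Python list.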
+        if isinstance(actual, JavaObject):
+            actual_py_list = cls.to_py_list(actual)
+        else:
+            actual_py_list = actual
         actual_py_list.sort()
         expected.sort()
+        assert len(actual_py_list) == len(expected)
         assert all(x == y for x, y in zip(actual_py_list, expected))
 
     @classmethod
@@ -71,6 +78,21 @@ class PyFlinkTestCase(unittest.TestCase):
             py_list.append(actual.apply(i))
         return py_list
 
+    @classmethod
+    def prepare_csv_source(cls, path, data, data_types, fields):
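+        # Writes the test rows as comma-separated lines to ``path`` and returns a
+        # CsvTableSource over the generated file.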
+        if os.path.isfile(path):
+            os.remove(path)
+        csv_data = ""
+        for item in data:
+            if isinstance(item, (list, tuple)):
+                csv_data += ",".join([str(element) for element in item]) + "\n"
+            else:
+                csv_data += str(item) + "\n"
+        with open(path, 'w') as f:
+            f.write(csv_data)
+        return CsvTableSource(path, fields, data_types)
+
 
 class PyFlinkStreamTableTestCase(PyFlinkTestCase):
     """
@@ -79,8 +101,8 @@ class PyFlinkStreamTableTestCase(PyFlinkTestCase):
 
     def setUp(self):
         super(PyFlinkStreamTableTestCase, self).setUp()
-        self.t_config = TableConfig.Builder().as_streaming_execution().set_parallelism(4).build()
-        self.t_env = TableEnvironment.get_table_environment(self.t_config)
+        self.t_config = TableConfig.Builder().as_streaming_execution().set_parallelism(1).build()
+        self.t_env = TableEnvironment.create(self.t_config)
 
 
 class PyFlinkBatchTableTestCase(PyFlinkTestCase):
@@ -90,8 +112,16 @@ class PyFlinkBatchTableTestCase(PyFlinkTestCase):
 
     def setUp(self):
         super(PyFlinkBatchTableTestCase, self).setUp()
-        self.t_config = TableConfig.Builder().as_batch_execution().set_parallelism(4).build()
-        self.t_env = TableEnvironment.get_table_environment(self.t_config)
+        self.t_config = TableConfig.Builder().as_batch_execution().set_parallelism(1).build()
+        self.t_env = TableEnvironment.create(self.t_config)
+
+    def collect(self, table):
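+        # Converts the table into a DataSet of Rows via the Java TableEnvironment and
+        # collects the results as their string representations.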
+        j_table = table._j_table
+        gateway = get_gateway()
+        row_result = self.t_env._j_tenv\
+            .toDataSet(j_table, gateway.jvm.Class.forName("org.apache.flink.types.Row")).collect()
+        string_result = [java_row.toString() for java_row in row_result]
+        return string_result
 
 
 class PythonAPICompletenessTestCase(unittest.TestCase):
@@ -125,10 +155,8 @@ class PythonAPICompletenessTestCase(unittest.TestCase):
         python_methods = cls.get_python_class_methods(cls.python_class())
         missing_methods = java_methods - python_methods - cls.excluded_methods()
         if len(missing_methods) > 0:
-            print(missing_methods)
-            print('The Exception should be raised after FLINK-12407 is merged.')
-            # raise Exception('Methods: %s in Java class %s have not been added in Python class %s.'
-            #                % (missing_methods, cls.java_class(), cls.python_class()))
+            raise Exception('Methods: %s in Java class %s have not been added in Python class %s.'
+                            % (missing_methods, cls.java_class(), cls.python_class()))
 
     @classmethod
     def java_method_name(cls, python_method_name):