Posted to commits@flink.apache.org by ji...@apache.org on 2020/09/04 01:17:52 UTC

[flink] branch master updated: [FLINK-19118][python] Support Expression in the operations of Python Table API (#13304)

This is an automated email from the ASF dual-hosted git repository.

jincheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new a8cc62a  [FLINK-19118][python] Support Expression in the operations of Python Table API (#13304)
a8cc62a is described below

commit a8cc62a901dabe6c4d877b97db6024715b68174a
Author: Dian Fu <di...@apache.org>
AuthorDate: Fri Sep 4 09:15:58 2020 +0800

    [FLINK-19118][python] Support Expression in the operations of Python Table API (#13304)
---
 .../pyflink/table/examples/batch/word_count.py     |   7 +-
 flink-python/pyflink/table/table.py                | 341 +++++++++++++--------
 flink-python/pyflink/table/table_environment.py    |  74 ++---
 flink-python/pyflink/table/tests/test_aggregate.py |   2 +-
 flink-python/pyflink/table/tests/test_calc.py      |   9 +-
 .../pyflink/table/tests/test_column_operation.py   |   8 +-
 flink-python/pyflink/table/tests/test_correlate.py |   9 +-
 .../pyflink/table/tests/test_dependency.py         |  27 +-
 flink-python/pyflink/table/tests/test_explain.py   |   3 +-
 flink-python/pyflink/table/tests/test_join.py      |  12 +-
 .../pyflink/table/tests/test_pandas_conversion.py  |   6 +-
 .../pyflink/table/tests/test_pandas_udf.py         |   5 +-
 .../pyflink/table/tests/test_schema_operation.py   |   4 +-
 flink-python/pyflink/table/tests/test_sort.py      |   2 +-
 .../table/tests/test_table_environment_api.py      |   4 +-
 flink-python/pyflink/table/tests/test_window.py    |  21 +-
 flink-python/pyflink/table/utils.py                |  11 +-
 flink-python/pyflink/table/window.py               | 108 ++++---
 18 files changed, 380 insertions(+), 273 deletions(-)
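
In short: every relational operation in the Python Table API now also accepts typed
Expression objects, while the old expression strings keep working. A minimal sketch of
the two styles side by side (the environment setup mirrors word_count.py below; the
table and column names are illustrative, not part of this commit):

    from pyflink.dataset import ExecutionEnvironment
    from pyflink.table import BatchTableEnvironment, TableConfig
    from pyflink.table import expressions as expr

    exec_env = ExecutionEnvironment.get_execution_environment()
    t_env = BatchTableEnvironment.create(exec_env, TableConfig())
    tab = t_env.from_elements([(1, 'Hi'), (2, 'Hello')], ['a', 'b'])

    # String style, still accepted after this commit:
    tab.group_by("b").select("b, a.sum as total")

    # Expression style added by FLINK-19118:
    tab.group_by(tab.b).select(tab.b, tab.a.sum.alias('total'))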

diff --git a/flink-python/pyflink/table/examples/batch/word_count.py b/flink-python/pyflink/table/examples/batch/word_count.py
index 1a5bf2c..3f317af 100644
--- a/flink-python/pyflink/table/examples/batch/word_count.py
+++ b/flink-python/pyflink/table/examples/batch/word_count.py
@@ -23,6 +23,7 @@ import tempfile
 
 from pyflink.dataset import ExecutionEnvironment
 from pyflink.table import BatchTableEnvironment, TableConfig
+from pyflink.table import expressions as expr
 
 
 def word_count():
@@ -65,9 +66,9 @@ def word_count():
     t_env.execute_sql(sink_ddl)
 
     elements = [(word, 1) for word in content.split(" ")]
-    t_env.from_elements(elements, ["word", "count"]) \
-         .group_by("word") \
-         .select("word, count(1) as count") \
+    table = t_env.from_elements(elements, ["word", "count"])
+    table.group_by(table.word) \
+         .select(table.word, expr.lit(1).count.alias('count')) \
          .insert_into("Results")
 
     t_env.execute("word_count")
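
Since insert_into is deprecated in 1.11 (see the notes in table.py below), the same
pipeline can also target the non-deprecated API; a sketch, assuming the "Results" sink
defined by the DDL earlier in word_count.py:

    table = t_env.from_elements(elements, ["word", "count"])
    table.group_by(table.word) \
         .select(table.word, expr.lit(1).count.alias('count')) \
         .execute_insert("Results")

execute_insert submits the job by itself, so the trailing t_env.execute("word_count")
call would be dropped in that variant.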
diff --git a/flink-python/pyflink/table/table.py b/flink-python/pyflink/table/table.py
index 2369aaf..10f3a3d 100644
--- a/flink-python/pyflink/table/table.py
+++ b/flink-python/pyflink/table/table.py
@@ -19,13 +19,18 @@
 import warnings
 
 from py4j.java_gateway import get_method
+from typing import Union
 
 from pyflink.java_gateway import get_gateway
+from pyflink.table import ExplainDetail
+from pyflink.table.expression import Expression, _get_java_expression
+from pyflink.table.expressions import col
 from pyflink.table.serializers import ArrowSerializer
 from pyflink.table.table_result import TableResult
 from pyflink.table.table_schema import TableSchema
 from pyflink.table.types import create_arrow_schema
-from pyflink.table.utils import tz_convert_from_internal
+from pyflink.table.utils import tz_convert_from_internal, to_expression_jarray
+from pyflink.table.window import OverWindow, GroupWindow
 
 from pyflink.util.utils import to_jarray
 from pyflink.util.utils import to_j_explain_detail_arr
@@ -66,7 +71,25 @@ class Table(object):
         self._j_table = j_table
         self._t_env = t_env
 
-    def select(self, fields):
+    def __str__(self):
+        return self._j_table.toString()
+
+    def __getattr__(self, name) -> Expression:
+        """
+        Returns the :class:`Expression` of the column `name`.
+
+        Example:
+        ::
+
+            >>> tab.select(tab.a)
+        """
+        if name not in self.get_schema().get_field_names():
+            raise AttributeError(
+                "The current table has no column named '%s', available columns: [%s]"
+                % (name, ', '.join(self.get_schema().get_field_names())))
+        return col(name)
+
+    def select(self, *fields: Union[str, Expression]):
         """
         Performs a selection operation. Similar to a SQL SELECT statement. The field expressions
         can contain complex expressions.
@@ -74,29 +97,35 @@ class Table(object):
         Example:
         ::
 
+            >>> from pyflink.table import expressions as expr
+            >>> tab.select(tab.key, expr.concat(tab.value, 'hello'))
+            >>> tab.select(expr.col('key'), expr.concat(expr.col('value'), 'hello'))
+
             >>> tab.select("key, value + 'hello'")
 
-        :param fields: Expression string.
-        :type fields: str
         :return: The result table.
         :rtype: pyflink.table.Table
         """
-        return Table(self._j_table.select(fields), self._t_env)
+        if all(isinstance(f, Expression) for f in fields):
+            return Table(self._j_table.select(to_expression_jarray(fields)), self._t_env)
+        else:
+            assert len(fields) == 1
+            assert isinstance(fields[0], str)
+            return Table(self._j_table.select(fields[0]), self._t_env)
 
-    def alias(self, field, *fields):
+    def alias(self, field: str, *fields: str):
         """
         Renames the fields of the expression result. Use this to disambiguate fields before
-        joining to operations.
+        joining two tables.
 
         Example:
         ::
 
-            >>> tab.alias("a", "b")
+            >>> tab.alias("a", "b", "c")
+            >>> tab.alias("a, b, c")
 
         :param field: Field alias.
-        :type field: str
         :param fields: Additional field aliases.
-        :type fields: str
         :return: The result table.
         :rtype: pyflink.table.Table
         """
@@ -104,7 +133,7 @@ class Table(object):
         extra_fields = to_jarray(gateway.jvm.String, fields)
         return Table(get_method(self._j_table, "as")(field, extra_fields), self._t_env)
 
-    def filter(self, predicate):
+    def filter(self, predicate: Union[str, Expression[bool]]):
         """
         Filters out elements that don't pass the filter predicate. Similar to a SQL WHERE
         clause.
@@ -112,16 +141,16 @@ class Table(object):
         Example:
         ::
 
+            >>> tab.filter(tab.name == 'Fred')
             >>> tab.filter("name = 'Fred'")
 
         :param predicate: Predicate expression string.
-        :type predicate: str
         :return: The result table.
         :rtype: pyflink.table.Table
         """
-        return Table(self._j_table.filter(predicate), self._t_env)
+        return Table(self._j_table.filter(_get_java_expression(predicate)), self._t_env)
 
-    def where(self, predicate):
+    def where(self, predicate: Union[str, Expression[bool]]):
         """
         Filters out elements that don't pass the filter predicate. Similar to a SQL WHERE
         clause.
@@ -129,16 +158,16 @@ class Table(object):
         Example:
         ::
 
+            >>> tab.where(tab.name == 'Fred')
             >>> tab.where("name = 'Fred'")
 
         :param predicate: Predicate expression string.
-        :type predicate: str
         :return: The result table.
         :rtype: pyflink.table.Table
         """
-        return Table(self._j_table.where(predicate), self._t_env)
+        return Table(self._j_table.where(_get_java_expression(predicate)), self._t_env)
 
-    def group_by(self, fields):
+    def group_by(self, *fields: Union[str, Expression]):
         """
         Groups the elements on some grouping keys. Use this before a selection with aggregations
         to perform the aggregation on a per-group basis. Similar to a SQL GROUP BY statement.
@@ -146,14 +175,19 @@ class Table(object):
         Example:
         ::
 
+            >>> tab.group_by(tab.key).select(tab.key, tab.value.avg)
             >>> tab.group_by("key").select("key, value.avg")
 
         :param fields: Group keys.
-        :type fields: str
         :return: The grouped table.
         :rtype: pyflink.table.GroupedTable
         """
-        return GroupedTable(self._j_table.groupBy(fields), self._t_env)
+        if all(isinstance(f, Expression) for f in fields):
+            return GroupedTable(self._j_table.groupBy(to_expression_jarray(fields)), self._t_env)
+        else:
+            assert len(fields) == 1
+            assert isinstance(fields[0], str)
+            return GroupedTable(self._j_table.groupBy(fields[0]), self._t_env)
 
     def distinct(self):
         """
@@ -162,14 +196,14 @@ class Table(object):
         Example:
         ::
 
-            >>> tab.select("key, value").distinct()
+            >>> tab.select(tab.key, tab.value).distinct()
 
         :return: The result table.
         :rtype: pyflink.table.Table
         """
         return Table(self._j_table.distinct(), self._t_env)
 
-    def join(self, right, join_predicate=None):
+    def join(self, right: 'Table', join_predicate: Union[str, Expression[bool]] = None):
         """
         Joins two :class:`~pyflink.table.Table`. Similar to a SQL join. The fields of the two joined
         operations must not overlap, use :func:`~pyflink.table.Table.alias` to rename fields if
@@ -183,22 +217,22 @@ class Table(object):
         Example:
         ::
 
-            >>> left.join(right).where("a = b && c > 3").select("a, b, d")
-            >>> left.join(right, "a = b")
+            >>> left.join(right).where((left.a == right.b) & (left.c > 3))
+            >>> left.join(right).where("a = b && c > 3")
+            >>> left.join(right, left.a == right.b)
 
         :param right: Right table.
-        :type right: pyflink.table.Table
         :param join_predicate: Optional, the join predicate expression string.
-        :type join_predicate: str
         :return: The result table.
         :rtype: pyflink.table.Table
         """
         if join_predicate is not None:
-            return Table(self._j_table.join(right._j_table, join_predicate), self._t_env)
+            return Table(self._j_table.join(
+                right._j_table, _get_java_expression(join_predicate)), self._t_env)
         else:
             return Table(self._j_table.join(right._j_table), self._t_env)
 
-    def left_outer_join(self, right, join_predicate=None):
+    def left_outer_join(self, right: 'Table', join_predicate: Union[str, Expression[bool]] = None):
         """
         Joins two :class:`~pyflink.table.Table`. Similar to a SQL left outer join. The fields of
         the two joined operations must not overlap, use :func:`~pyflink.table.Table.alias` to
@@ -212,22 +246,22 @@ class Table(object):
         Example:
         ::
 
-            >>> left.left_outer_join(right).select("a, b, d")
-            >>> left.left_outer_join(right, "a = b").select("a, b, d")
+            >>> left.left_outer_join(right)
+            >>> left.left_outer_join(right, left.a == right.b)
+            >>> left.left_outer_join(right, "a = b")
 
         :param right: Right table.
-        :type right: pyflink.table.Table
         :param join_predicate: Optional, the join predicate expression string.
-        :type join_predicate: str
         :return: The result table.
         :rtype: pyflink.table.Table
         """
         if join_predicate is None:
             return Table(self._j_table.leftOuterJoin(right._j_table), self._t_env)
         else:
-            return Table(self._j_table.leftOuterJoin(right._j_table, join_predicate), self._t_env)
+            return Table(self._j_table.leftOuterJoin(
+                right._j_table, _get_java_expression(join_predicate)), self._t_env)
 
-    def right_outer_join(self, right, join_predicate):
+    def right_outer_join(self, right: 'Table', join_predicate: Union[str, Expression[bool]]):
         """
         Joins two :class:`~pyflink.table.Table`. Similar to a SQL right outer join. The fields of
         the two joined operations must not overlap, use :func:`~pyflink.table.Table.alias` to
@@ -241,18 +275,18 @@ class Table(object):
         Example:
         ::
 
-            >>> left.right_outer_join(right, "a = b").select("a, b, d")
+            >>> left.right_outer_join(right, left.a == right.b)
+            >>> left.right_outer_join(right, "a = b")
 
         :param right: Right table.
-        :type right: pyflink.table.Table
         :param join_predicate: The join predicate expression string.
-        :type join_predicate: str
         :return: The result table.
         :rtype: pyflink.table.Table
         """
-        return Table(self._j_table.rightOuterJoin(right._j_table, join_predicate), self._t_env)
+        return Table(self._j_table.rightOuterJoin(
+            right._j_table, _get_java_expression(join_predicate)), self._t_env)
 
-    def full_outer_join(self, right, join_predicate):
+    def full_outer_join(self, right: 'Table', join_predicate: Union[str, Expression[bool]]):
         """
         Joins two :class:`~pyflink.table.Table`. Similar to a SQL full outer join. The fields of
         the two joined operations must not overlap, use :func:`~pyflink.table.Table.alias` to
@@ -266,18 +300,20 @@ class Table(object):
         Example:
         ::
 
-            >>> left.full_outer_join(right, "a = b").select("a, b, d")
+            >>> left.full_outer_join(right, left.a == right.b)
+            >>> left.full_outer_join(right, "a = b")
 
         :param right: Right table.
-        :type right: pyflink.table.Table
         :param join_predicate: The join predicate expression string.
-        :type join_predicate: str
         :return: The result table.
         :rtype: pyflink.table.Table
         """
-        return Table(self._j_table.fullOuterJoin(right._j_table, join_predicate), self._t_env)
+        return Table(self._j_table.fullOuterJoin(
+            right._j_table, _get_java_expression(join_predicate)), self._t_env)
 
-    def join_lateral(self, table_function_call, join_predicate=None):
+    def join_lateral(self,
+                     table_function_call: Union[str, Expression],
+                     join_predicate: Union[str, Expression[bool]] = None):
         """
         Joins this Table with a user-defined TableFunction. This join is similar to a SQL inner
         join but works with a table function. Each row of the table is joined with the rows
@@ -290,21 +326,27 @@ class Table(object):
            ...     "java.table.function.class.name")
             >>> tab.join_lateral("split(text, ' ') as (b)", "a = b")
 
+            >>> from pyflink.table import expressions as expr
+            >>> tab.join_lateral(expr.call('split', tab.text, ' ').alias('b'),
+            ...                  expr.col('a') == expr.col('b'))
+
         :param table_function_call: An expression representing a table function call.
-        :type table_function_call: str
         :param join_predicate: Optional, the join predicate expression string; the join is
                                ON TRUE if not provided.
-        :type join_predicate: str
         :return: The result Table.
         :rtype: pyflink.table.Table
         """
         if join_predicate is None:
-            return Table(self._j_table.joinLateral(table_function_call), self._t_env)
+            return Table(self._j_table.joinLateral(
+                _get_java_expression(table_function_call)), self._t_env)
         else:
-            return Table(self._j_table.joinLateral(table_function_call, join_predicate),
-                         self._t_env)
+            return Table(self._j_table.joinLateral(
+                _get_java_expression(table_function_call),
+                _get_java_expression(join_predicate)),
+                self._t_env)
 
-    def left_outer_join_lateral(self, table_function_call, join_predicate=None):
+    def left_outer_join_lateral(self,
+                                table_function_call: Union[str, Expression],
+                                join_predicate: Union[str, Expression[bool]] = None):
         """
         Joins this Table with a user-defined TableFunction. This join is similar to
         a SQL left outer join but works with a table function. Each row of the table is joined
@@ -317,22 +359,25 @@ class Table(object):
             >>> t_env.create_java_temporary_system_function("split",
             ...     "java.table.function.class.name")
             >>> tab.left_outer_join_lateral("split(text, ' ') as (b)")
+            >>> from pyflink.table import expressions as expr
+            >>> tab.left_outer_join_lateral(expr.call('split', tab.text, ' ').alias('b'))
 
         :param table_function_call: An expression representing a table function call.
-        :type table_function_call: str
         :param join_predicate: Optional, the join predicate expression string; the join is
                                ON TRUE if not provided.
-        :type join_predicate: str
         :return: The result Table.
         :rtype: pyflink.table.Table
         """
         if join_predicate is None:
-            return Table(self._j_table.leftOuterJoinLateral(table_function_call), self._t_env)
+            return Table(self._j_table.leftOuterJoinLateral(
+                _get_java_expression(table_function_call)), self._t_env)
         else:
-            return Table(self._j_table.leftOuterJoinLateral(table_function_call, join_predicate),
-                         self._t_env)
+            return Table(self._j_table.leftOuterJoinLateral(
+                _get_java_expression(table_function_call),
+                _get_java_expression(join_predicate)),
+                self._t_env)
 
-    def minus(self, right):
+    def minus(self, right: 'Table'):
         """
         Minus of two :class:`~pyflink.table.Table` with duplicate records removed.
         Similar to a SQL EXCEPT clause. Minus returns records from the left table that do not
@@ -349,13 +394,12 @@ class Table(object):
             >>> left.minus(right)
 
         :param right: Right table.
-        :type right: pyflink.table.Table
         :return: The result table.
         :rtype: pyflink.table.Table
         """
         return Table(self._j_table.minus(right._j_table), self._t_env)
 
-    def minus_all(self, right):
+    def minus_all(self, right: 'Table'):
         """
         Minus of two :class:`~pyflink.table.Table`. Similar to a SQL EXCEPT ALL clause.
         MinusAll returns the records that do not exist in
@@ -373,13 +417,12 @@ class Table(object):
             >>> left.minus_all(right)
 
         :param right: Right table.
-        :type right: pyflink.table.Table
         :return: The result table.
         :rtype: pyflink.table.Table
         """
         return Table(self._j_table.minusAll(right._j_table), self._t_env)
 
-    def union(self, right):
+    def union(self, right: 'Table'):
         """
         Unions two :class:`~pyflink.table.Table` with duplicate records removed.
         Similar to a SQL UNION. The fields of the two union operations must fully overlap.
@@ -394,13 +437,12 @@ class Table(object):
             >>> left.union(right)
 
         :param right: Right table.
-        :type right: pyflink.table.Table
         :return: The result table.
         :rtype: pyflink.table.Table
         """
         return Table(self._j_table.union(right._j_table), self._t_env)
 
-    def union_all(self, right):
+    def union_all(self, right: 'Table'):
         """
         Unions two :class:`~pyflink.table.Table`. Similar to a SQL UNION ALL. The fields of the
         two union operations must fully overlap.
@@ -415,13 +457,12 @@ class Table(object):
             >>> left.union_all(right)
 
         :param right: Right table.
-        :type right: pyflink.table.Table
         :return: The result table.
         :rtype: pyflink.table.Table
         """
         return Table(self._j_table.unionAll(right._j_table), self._t_env)
 
-    def intersect(self, right):
+    def intersect(self, right: 'Table'):
         """
         Intersects two :class:`~pyflink.table.Table` with duplicate records removed. Intersect
         returns records that exist in both tables. If a record is present in one or both tables
@@ -439,13 +480,12 @@ class Table(object):
             >>> left.intersect(right)
 
         :param right: Right table.
-        :type right: pyflink.table.Table
         :return: The result table.
         :rtype: pyflink.table.Table
         """
         return Table(self._j_table.intersect(right._j_table), self._t_env)
 
-    def intersect_all(self, right):
+    def intersect_all(self, right: 'Table'):
         """
         Intersects two :class:`~pyflink.table.Table`. IntersectAll returns records that exist in
         both tables. If a record is present in both tables more than once, it is returned as many
@@ -463,13 +503,12 @@ class Table(object):
             >>> left.intersect_all(right)
 
         :param right: Right table.
-        :type right: pyflink.table.Table
         :return: The result table.
         :rtype: pyflink.table.Table
         """
         return Table(self._j_table.intersectAll(right._j_table), self._t_env)
 
-    def order_by(self, fields):
+    def order_by(self, *fields: Union[str, Expression]):
         """
         Sorts the given :class:`~pyflink.table.Table`. Similar to SQL ORDER BY.
         The resulting Table is globally sorted across all parallel partitions.
@@ -477,19 +516,24 @@ class Table(object):
         Example:
         ::
 
+            >>> tab.order_by(tab.name.desc)
             >>> tab.order_by("name.desc")
 
         For unbounded tables, this operation requires a sorting on a time attribute or a subsequent
         fetch operation.
 
         :param fields: Order fields expression string.
-        :type fields: str
         :return: The result table.
         :rtype: pyflink.table.Table
         """
-        return Table(self._j_table.orderBy(fields), self._t_env)
+        if all(isinstance(f, Expression) for f in fields):
+            return Table(self._j_table.orderBy(to_expression_jarray(fields)), self._t_env)
+        else:
+            assert len(fields) == 1
+            assert isinstance(fields[0], str)
+            return Table(self._j_table.orderBy(fields[0]), self._t_env)
 
-    def offset(self, offset):
+    def offset(self, offset: int):
         """
         Limits a (possibly sorted) result from an offset position.
 
@@ -501,20 +545,20 @@ class Table(object):
         ::
 
             # skips the first 3 rows and returns all following rows.
+            >>> tab.order_by(tab.name.desc).offset(3)
             >>> tab.order_by("name.desc").offset(3)
             # skips the first 10 rows and returns the next 5 rows.
-            >>> tab.order_by("name.desc").offset(10).fetch(5)
+            >>> tab.order_by(tab.name.desc).offset(10).fetch(5)
 
         For unbounded tables, this operation requires a subsequent fetch operation.
 
         :param offset: Number of records to skip.
-        :type offset: int
         :return: The result table.
         :rtype: pyflink.table.Table
         """
         return Table(self._j_table.offset(offset), self._t_env)
 
-    def fetch(self, fetch):
+    def fetch(self, fetch: int):
         """
         Limits a (possibly sorted) result to the first n rows.
 
@@ -527,21 +571,21 @@ class Table(object):
         Returns the first 3 records.
         ::
 
+            >>> tab.order_by(tab.name.desc).fetch(3)
             >>> tab.order_by("name.desc").fetch(3)
 
         Skips the first 10 rows and returns the next 5 rows.
         ::
 
-            >>> tab.order_by("name.desc").offset(10).fetch(5)
+            >>> tab.order_by(tab.name.desc).offset(10).fetch(5)
 
         :param fetch: The number of records to return. Fetch must be >= 0.
-        :type fetch: int
         :return: The result table.
         :rtype: pyflink.table.Table
         """
         return Table(self._j_table.fetch(fetch), self._t_env)
 
-    def window(self, window):
+    def window(self, window: GroupWindow):
         """
         Defines group window on the records of a table.
 
@@ -565,20 +609,23 @@ class Table(object):
         Example:
         ::
 
-            >>> tab.window(Tumble.over("10.minutes").on("rowtime").alias("w")) \\
-            ...     .group_by("w") \\
-            ...     .select("a.sum as a, w.start as b, w.end as c, w.rowtime as d")
+            >>> from pyflink.table import expressions as expr
+            >>> tab.window(Tumble.over(expr.lit(10).minutes).on(tab.rowtime).alias('w')) \\
+            ...     .group_by(col('w')) \\
+            ...     .select(tab.a.sum.alias('a'),
+            ...             col('w').start.alias('b'),
+            ...             col('w').end.alias('c'),
+            ...             col('w').rowtime.alias('d'))
 
         :param window: A :class:`~pyflink.table.window.GroupWindow` created from
                        :class:`~pyflink.table.window.Tumble`, :class:`~pyflink.table.window.Session`
                        or :class:`~pyflink.table.window.Slide`.
-        :type window: pyflink.table.window.GroupWindow
         :return: A group windowed table.
         :rtype: GroupWindowedTable
         """
         return GroupWindowedTable(self._j_table.window(window._java_window), self._t_env)
 
-    def over_window(self, *over_windows):
+    def over_window(self, *over_windows: OverWindow):
         """
         Defines over-windows on the records of a table.
 
@@ -588,9 +635,10 @@ class Table(object):
         Example:
         ::
 
-            >>> table.window(Over.partition_by("c").order_by("rowTime") \\
-            ...     .preceding("10.seconds").alias("ow")) \\
-            ...     .select("c, b.count over ow, e.sum over ow")
+            >>> from pyflink.table import expressions as expr
+            >>> tab.over_window(Over.partition_by(tab.c).order_by(tab.rowtime) \\
+            ...     .preceding(expr.lit(10).seconds).alias("ow")) \\
+            ...     .select(tab.c, tab.b.count.over(col('ow')), tab.e.sum.over(col('ow')))
 
         .. note::
 
@@ -603,7 +651,6 @@ class Table(object):
             Over-windows for batch tables are currently not supported.
 
         :param over_windows: over windows created from :class:`~pyflink.table.window.Over`.
-        :type over_windows: pyflink.table.window.OverWindow
         :return: An over windowed table.
         :rtype: pyflink.table.OverWindowedTable
         """
@@ -612,7 +659,7 @@ class Table(object):
                                  [item._java_over_window for item in over_windows])
         return OverWindowedTable(self._j_table.window(window_array), self._t_env)
 
-    def add_columns(self, fields):
+    def add_columns(self, *fields: Union[str, Expression]):
         """
         Adds additional columns. Similar to a SQL SELECT statement. The field expressions
         can contain complex expressions, but can not contain aggregations. It will throw an
@@ -621,16 +668,22 @@ class Table(object):
         Example:
         ::
 
+            >>> from pyflink.table import expressions as expr
+            >>> tab.add_columns((tab.a + 1).alias('a1'), expr.concat(tab.b, 'sunny').alias('b1'))
             >>> tab.add_columns("a + 1 as a1, concat(b, 'sunny') as b1")
 
         :param fields: Column list string.
-        :type fields: str
         :return: The result table.
         :rtype: pyflink.table.Table
         """
-        return Table(self._j_table.addColumns(fields), self._t_env)
+        if all(isinstance(f, Expression) for f in fields):
+            return Table(self._j_table.addColumns(to_expression_jarray(fields)), self._t_env)
+        else:
+            assert len(fields) == 1
+            assert isinstance(fields[0], str)
+            return Table(self._j_table.addColumns(fields[0]), self._t_env)
 
-    def add_or_replace_columns(self, fields):
+    def add_or_replace_columns(self, *fields: Union[str, Expression]):
         """
         Adds additional columns. Similar to a SQL SELECT statement. The field expressions
         can contain complex expressions, but can not contain aggregations. Existing fields will be
@@ -640,16 +693,24 @@ class Table(object):
         Example:
         ::
 
+            >>> from pyflink.table import expressions as expr
+            >>> tab.add_or_replace_columns((tab.a + 1).alias('a1'),
+            ...                            expr.concat(tab.b, 'sunny').alias('b1'))
             >>> tab.add_or_replace_columns("a + 1 as a1, concat(b, 'sunny') as b1")
 
         :param fields: Column list string.
-        :type fields: str
         :return: The result table.
         :rtype: pyflink.table.Table
         """
-        return Table(self._j_table.addOrReplaceColumns(fields), self._t_env)
+        if all(isinstance(f, Expression) for f in fields):
+            return Table(self._j_table.addOrReplaceColumns(to_expression_jarray(fields)),
+                         self._t_env)
+        else:
+            assert len(fields) == 1
+            assert isinstance(fields[0], str)
+            return Table(self._j_table.addOrReplaceColumns(fields[0]), self._t_env)
 
-    def rename_columns(self, fields):
+    def rename_columns(self, *fields: Union[str, Expression]):
         """
         Renames existing columns. Similar to a field alias statement. The field expressions
         should be alias expressions, and only the existing fields can be renamed.
@@ -657,32 +718,44 @@ class Table(object):
         Example:
         ::
 
+            >>> tab.rename_columns(tab.a.alias('a1'), tab.b.alias('b1'))
             >>> tab.rename_columns("a as a1, b as b1")
 
         :param fields: Column list string.
-        :type fields: str
         :return: The result table.
         :rtype: pyflink.table.Table
         """
-        return Table(self._j_table.renameColumns(fields), self._t_env)
+        if all(isinstance(f, Expression) for f in fields):
+            return Table(self._j_table.renameColumns(to_expression_jarray(fields)),
+                         self._t_env)
+        else:
+            assert len(fields) == 1
+            assert isinstance(fields[0], str)
+            return Table(self._j_table.renameColumns(fields[0]), self._t_env)
 
-    def drop_columns(self, fields):
+    def drop_columns(self, *fields: Union[str, Expression]):
         """
         Drops existing columns. The field expressions should be field reference expressions.
 
         Example:
         ::
 
+            >>> tab.drop_columns(tab.a, tab.b)
             >>> tab.drop_columns("a, b")
 
         :param fields: Column list string.
-        :type fields: str
         :return: The result table.
         :rtype: pyflink.table.Table
         """
-        return Table(self._j_table.dropColumns(fields), self._t_env)
+        if all(isinstance(f, Expression) for f in fields):
+            return Table(self._j_table.dropColumns(to_expression_jarray(fields)),
+                         self._t_env)
+        else:
+            assert len(fields) == 1
+            assert isinstance(fields[0], str)
+            return Table(self._j_table.dropColumns(fields[0]), self._t_env)
 
-    def insert_into(self, table_path):
+    def insert_into(self, table_path: str):
         """
         Writes the :class:`~pyflink.table.Table` to a :class:`~pyflink.table.TableSink` that was
         registered under the specified name. For the path resolution algorithm see
@@ -695,7 +768,6 @@ class Table(object):
 
         :param table_path: The path of the registered :class:`~pyflink.table.TableSink` to which
                the :class:`~pyflink.table.Table` is written.
-        :type table_path: str
 
         .. note:: Deprecated in 1.11. Use :func:`execute_insert` for single sink,
                   use :class:`TableEnvironment`#:func:`create_statement_set`
@@ -717,7 +789,7 @@ class Table(object):
 
             >>> pdf = pd.DataFrame(np.random.rand(1000, 2))
             >>> table = table_env.from_pandas(pdf, ["a", "b"])
-            >>> table.filter("a > 0.5").to_pandas()
+            >>> table.filter(table.a > 0.5).to_pandas()
 
         :return: the result pandas DataFrame.
 
@@ -766,7 +838,7 @@ class Table(object):
         """
         self._j_table.printSchema()
 
-    def execute_insert(self, table_path, overwrite=False):
+    def execute_insert(self, table_path: str, overwrite: bool = False):
         """
         Writes the :class:`~pyflink.table.Table` to a :class:`~pyflink.table.TableSink` that was
         registered under the specified name, and then execute the insert operation.
@@ -779,10 +851,8 @@ class Table(object):
 
         :param table_path: The path of the registered :class:`~pyflink.table.TableSink` to which
                the :class:`~pyflink.table.Table` is written.
-        :type table_path: str
         :param overwrite: The flag that indicates whether the insert should overwrite
                existing data or not.
-        :type overwrite: bool
         :return: The table result.
 
         .. versionadded:: 1.11.0
@@ -806,24 +876,19 @@ class Table(object):
         self._t_env._before_execute()
         return TableResult(self._j_table.execute())
 
-    def explain(self, *extra_details):
+    def explain(self, *extra_details: ExplainDetail) -> str:
         """
         Returns the AST of this table and the execution plan.
 
         :param extra_details: The extra explain details which the explain result should include,
                               e.g. estimated cost, changelog mode for streaming
-        :type extra_details: tuple[ExplainDetail] (variable-length arguments of ExplainDetail)
         :return: The statement for which the AST and execution plan will be returned.
-        :rtype: str
 
         .. versionadded:: 1.11.0
         """
         j_extra_details = to_j_explain_detail_arr(extra_details)
         return self._j_table.explain(j_extra_details)
 
-    def __str__(self):
-        return self._j_table.toString()
-
 
 class GroupedTable(object):
     """
@@ -834,7 +899,7 @@ class GroupedTable(object):
         self._j_table = java_table
         self._t_env = t_env
 
-    def select(self, fields):
+    def select(self, *fields: Union[str, Expression]):
         """
         Performs a selection operation on a grouped table. Similar to an SQL SELECT statement.
         The field expressions can contain complex expressions and aggregations.
@@ -842,15 +907,20 @@ class GroupedTable(object):
         Example:
         ::
 
-            >>> tab.group_by("key").select("key, value.avg + ' The average' as average")
+            >>> tab.group_by(tab.key).select(tab.key, tab.value.avg.alias('average'))
+            >>> tab.group_by("key").select("key, value.avg as average")
 
 
         :param fields: Expression string that contains group keys and aggregate function calls.
-        :type fields: str
         :return: The result table.
         :rtype: pyflink.table.Table
         """
-        return Table(self._j_table.select(fields), self._t_env)
+        if all(isinstance(f, Expression) for f in fields):
+            return Table(self._j_table.select(to_expression_jarray(fields)), self._t_env)
+        else:
+            assert len(fields) == 1
+            assert isinstance(fields[0], str)
+            return Table(self._j_table.select(fields[0]), self._t_env)
 
 
 class GroupWindowedTable(object):
@@ -862,7 +932,7 @@ class GroupWindowedTable(object):
         self._j_table = java_group_windowed_table
         self._t_env = t_env
 
-    def group_by(self, fields):
+    def group_by(self, *fields: Union[str, Expression]):
         """
         Groups the elements by a mandatory window and one or more optional grouping attributes.
         The window is specified by referring to its alias.
@@ -877,14 +947,25 @@ class GroupWindowedTable(object):
         Example:
         ::
 
-            >>> tab.window(group_window.alias("w")).group_by("w, key").select("key, value.avg")
+            >>> from pyflink.table import expressions as expr
+            >>> tab.window(Tumble.over(expr.lit(10).minutes).on(tab.rowtime).alias('w')) \\
+            ...     .group_by(col('w')) \\
+            ...     .select(tab.a.sum.alias('a'),
+            ...             col('w').start.alias('b'),
+            ...             col('w').end.alias('c'),
+            ...             col('w').rowtime.alias('d'))
 
         :param fields: Group keys.
-        :type fields: str
         :return: A window grouped table.
         :rtype: pyflink.table.WindowGroupedTable
         """
-        return WindowGroupedTable(self._j_table.groupBy(fields), self._t_env)
+        if all(isinstance(f, Expression) for f in fields):
+            return WindowGroupedTable(
+                self._j_table.groupBy(to_expression_jarray(fields)), self._t_env)
+        else:
+            assert len(fields) == 1
+            assert isinstance(fields[0], str)
+            return WindowGroupedTable(self._j_table.groupBy(fields[0]), self._t_env)
 
 
 class WindowGroupedTable(object):
@@ -896,7 +977,7 @@ class WindowGroupedTable(object):
         self._j_table = java_window_grouped_table
         self._t_env = t_env
 
-    def select(self, fields):
+    def select(self, *fields: Union[str, Expression]):
         """
         Performs a selection operation on a window grouped table. Similar to an SQL SELECT
         statement.
@@ -905,14 +986,21 @@ class WindowGroupedTable(object):
         Example:
         ::
 
+            >>> window_grouped_table.select(col('key'),
+            ...                             col('window').start,
+            ...                             col('value').avg.alias('valavg'))
             >>> window_grouped_table.select("key, window.start, value.avg as valavg")
 
         :param fields: Expression string.
-        :type fields: str
         :return: The result table.
         :rtype: pyflink.table.Table
         """
-        return Table(self._j_table.select(fields), self._t_env)
+        if all(isinstance(f, Expression) for f in fields):
+            return Table(self._j_table.select(to_expression_jarray(fields)), self._t_env)
+        else:
+            assert len(fields) == 1
+            assert isinstance(fields[0], str)
+            return Table(self._j_table.select(fields[0]), self._t_env)
 
 
 class OverWindowedTable(object):
@@ -928,7 +1016,7 @@ class OverWindowedTable(object):
         self._j_table = java_over_windowed_table
         self._t_env = t_env
 
-    def select(self, fields):
+    def select(self, *fields: Union[str, Expression]):
         """
         Performs a selection operation on an over windowed table. Similar to an SQL SELECT
         statement.
@@ -937,11 +1025,18 @@ class OverWindowedTable(object):
         Example:
         ::
 
+            >>> over_windowed_table.select(col('c'),
+            ...                            col('b').count.over(col('ow')),
+            ...                            col('e').sum.over(col('ow')))
             >>> over_windowed_table.select("c, b.count over ow, e.sum over ow")
 
         :param fields: Expression string.
-        :type fields: str
         :return: The result table.
         :rtype: pyflink.table.Table
         """
-        return Table(self._j_table.select(fields), self._t_env)
+        if all(isinstance(f, Expression) for f in fields):
+            return Table(self._j_table.select(to_expression_jarray(fields)), self._t_env)
+        else:
+            assert len(fields) == 1
+            assert isinstance(fields[0], str)
+            return Table(self._j_table.select(fields[0]), self._t_env)
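
Two points worth pulling out of the table.py diff above. First, attribute access such
as tab.a comes from the new Table.__getattr__: it returns col('a') when the schema
contains that column and raises AttributeError otherwise. Second, every operation
repeats the same dispatch contract: a varargs call is either all Expression objects or
exactly one expression string. A condensed, standalone sketch of that contract (the
Expression stub stands in for pyflink.table.expression.Expression; not code from this
commit):

    class Expression:
        """Stub standing in for pyflink.table.expression.Expression."""

    def dispatch(*fields):
        # Mirrors the pattern in Table.select/group_by/order_by/add_columns/...
        if all(isinstance(f, Expression) for f in fields):
            return "expressions", list(fields)
        assert len(fields) == 1 and isinstance(fields[0], str), \
            "fields must be all Expressions or a single expression string"
        return "string", fields[0]

    print(dispatch("a, b + 1"))                   # ('string', 'a, b + 1')
    print(dispatch(Expression(), Expression()))   # ('expressions', [...])

Mixing the two styles in one call, e.g. tab.select(tab.a, "b"), trips the assertion,
which is why the migrated tests below switch each call wholesale.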
diff --git a/flink-python/pyflink/table/table_environment.py b/flink-python/pyflink/table/table_environment.py
index c868c1b..882c5ff 100644
--- a/flink-python/pyflink/table/table_environment.py
+++ b/flink-python/pyflink/table/table_environment.py
@@ -124,24 +124,21 @@ class TableEnvironment(object):
         warnings.warn("Deprecated in 1.11.", DeprecationWarning)
         return Table(self._j_tenv.fromTableSource(table_source._j_table_source), self)
 
-    def register_catalog(self, catalog_name, catalog):
+    def register_catalog(self, catalog_name: str, catalog: Catalog):
         """
         Registers a :class:`~pyflink.table.catalog.Catalog` under a unique name.
         All tables registered in the :class:`~pyflink.table.catalog.Catalog` can be accessed.
 
         :param catalog_name: The name under which the catalog will be registered.
-        :type catalog_name: str
         :param catalog: The catalog to register.
-        :type catalog: pyflink.table.catalog.Catalog
         """
         self._j_tenv.registerCatalog(catalog_name, catalog._j_catalog)
 
-    def get_catalog(self, catalog_name):
+    def get_catalog(self, catalog_name: str):
         """
         Gets a registered :class:`~pyflink.table.catalog.Catalog` by name.
 
         :param catalog_name: The name to look up the :class:`~pyflink.table.catalog.Catalog`.
-        :type catalog_name: str
         :return: The requested catalog, None if there is no
                  registered catalog with given name.
         :rtype: pyflink.table.catalog.Catalog
@@ -297,7 +294,7 @@ class TableEnvironment(object):
         else:
             self._j_tenv.createFunction(path, java_function, ignore_if_exists)
 
-    def drop_function(self, path) -> bool:
+    def drop_function(self, path: str) -> bool:
         """
         Drops a catalog function registered in the given path.
 
@@ -384,7 +381,7 @@ class TableEnvironment(object):
         java_function = function.java_user_defined_function()
         self._j_tenv.createTemporaryFunction(path, java_function)
 
-    def drop_temporary_function(self, path) -> bool:
+    def drop_temporary_function(self, path: str) -> bool:
         """
         Drops a temporary system function registered under the given name.
 
@@ -400,7 +397,7 @@ class TableEnvironment(object):
         """
         return self._j_tenv.dropTemporaryFunction(path)
 
-    def register_table(self, name, table):
+    def register_table(self, name: str, table: Table):
         """
         Registers a :class:`~pyflink.table.Table` under a unique name in the TableEnvironment's
         catalog. Registered tables can be referenced in SQL queries.
@@ -412,16 +409,14 @@ class TableEnvironment(object):
             >>> table_env.register_table("source", tab)
 
         :param name: The name under which the table will be registered.
-        :type name: str
         :param table: The table to register.
-        :type table: pyflink.table.Table
 
         .. note:: Deprecated in 1.10. Use :func:`create_temporary_view` instead.
         """
         warnings.warn("Deprecated in 1.10. Use create_temporary_view instead.", DeprecationWarning)
         self._j_tenv.registerTable(name, table._j_table)
 
-    def register_table_source(self, name, table_source):
+    def register_table_source(self, name: str, table_source):
         """
         Registers an external :class:`~pyflink.table.TableSource` in this
         :class:`~pyflink.table.TableEnvironment`'s catalog. Registered tables can be referenced in
@@ -437,7 +432,6 @@ class TableEnvironment(object):
             ...                                                 DataTypes.STRING()]))
 
         :param name: The name under which the table source is registered.
-        :type name: str
         :param table_source: The table source to register.
         :type table_source: pyflink.table.TableSource
 
@@ -446,7 +440,7 @@ class TableEnvironment(object):
         warnings.warn("Deprecated in 1.10. Use connect instead.", DeprecationWarning)
         self._j_tenv.registerTableSourceInternal(name, table_source._j_table_source)
 
-    def register_table_sink(self, name, table_sink):
+    def register_table_sink(self, name: str, table_sink):
         """
         Registers an external :class:`~pyflink.table.TableSink` with given field names and types in
         this :class:`~pyflink.table.TableEnvironment`'s catalog. Registered sink tables can be
@@ -462,7 +456,6 @@ class TableEnvironment(object):
             ...                                            "./2.csv"))
 
         :param name: The name under which the table sink is registered.
-        :type name: str
         :param table_sink: The table sink to register.
         :type table_sink: pyflink.table.TableSink
 
@@ -471,7 +464,7 @@ class TableEnvironment(object):
         warnings.warn("Deprecated in 1.10. Use connect instead.", DeprecationWarning)
         self._j_tenv.registerTableSinkInternal(name, table_sink._j_table_sink)
 
-    def scan(self, *table_path):
+    def scan(self, *table_path: str):
         """
         Scans a registered table and returns the resulting :class:`~pyflink.table.Table`.
         A table to scan must be registered in the TableEnvironment. It can be either directly
@@ -493,7 +486,6 @@ class TableEnvironment(object):
             >>> tab = table_env.scan("catalogName", "dbName", "tableName")
 
         :param table_path: The path of the table to scan.
-        :type table_path: str
         :throws: Exception if no table is found using the given table path.
         :return: The resulting table.
         :rtype: pyflink.table.Table
@@ -506,7 +498,7 @@ class TableEnvironment(object):
         j_table = self._j_tenv.scan(j_table_paths)
         return Table(j_table, self)
 
-    def from_path(self, path):
+    def from_path(self, path: str):
         """
         Reads a registered table and returns the resulting :class:`~pyflink.table.Table`.
 
@@ -534,7 +526,6 @@ class TableEnvironment(object):
             >>> tab = table_env.from_path("catalogName.`db.Name`.`Table`")
 
         :param path: The path of a table API object to scan.
-        :type path: str
         :return: Either a table or virtual table (=view).
         :rtype: pyflink.table.Table
 
@@ -544,7 +535,7 @@ class TableEnvironment(object):
         """
         return Table(get_method(self._j_tenv, "from")(path), self)
 
-    def insert_into(self, target_path, table):
+    def insert_into(self, target_path: str, table: Table):
         """
         Instructs to write the content of a :class:`~pyflink.table.Table` API object into a table.
 
@@ -559,9 +550,7 @@ class TableEnvironment(object):
 
         :param target_path: The path of the registered :class:`~pyflink.table.TableSink` to which
                             the :class:`~pyflink.table.Table` is written.
-        :type target_path: str
         :param table: The Table to write to the sink.
-        :type table: pyflink.table.Table
 
         .. versionchanged:: 1.10.0
             The signature is changed, e.g. the parameter *table_path_continued* was removed and
@@ -734,17 +723,15 @@ class TableEnvironment(object):
         else:
             return self._j_tenv.explain(table._j_table, extended)
 
-    def explain_sql(self, stmt, *extra_details):
+    def explain_sql(self, stmt: str, *extra_details) -> str:
         """
         Returns the AST of the specified statement and the execution plan.
 
         :param stmt: The statement for which the AST and execution plan will be returned.
-        :type stmt: str
         :param extra_details: The extra explain details which the explain result should include,
                               e.g. estimated cost, changelog mode for streaming
         :type extra_details: tuple[ExplainDetail] (variable-length arguments of ExplainDetail)
         :return: The statement for which the AST and execution plan will be returned.
-        :rtype: str
 
         .. versionadded:: 1.11.0
         """
@@ -752,7 +739,7 @@ class TableEnvironment(object):
         j_extra_details = to_j_explain_detail_arr(extra_details)
         return self._j_tenv.explainSql(stmt, j_extra_details)
 
-    def sql_query(self, query):
+    def sql_query(self, query: str):
         """
         Evaluates a SQL query on registered tables and retrieves the result as a
         :class:`~pyflink.table.Table`.
@@ -770,14 +757,13 @@ class TableEnvironment(object):
             >>> table_env.sql_query("SELECT * FROM %s" % table)
 
         :param query: The sql query string.
-        :type query: str
         :return: The result table.
         :rtype: pyflink.table.Table
         """
         j_table = self._j_tenv.sqlQuery(query)
         return Table(j_table, self)
 
-    def execute_sql(self, stmt):
+    def execute_sql(self, stmt: str):
         """
         Execute the given single statement, and return the execution result.
 
@@ -808,7 +794,7 @@ class TableEnvironment(object):
         _j_statement_set = self._j_tenv.createStatementSet()
         return StatementSet(_j_statement_set, self)
 
-    def sql_update(self, stmt):
+    def sql_update(self, stmt: str):
         """
         Evaluates a SQL statement such as INSERT, UPDATE or DELETE or a DDL statement
 
@@ -877,7 +863,6 @@ class TableEnvironment(object):
             >>> table_env.execute("MyJob")
 
         :param stmt: The SQL statement to evaluate.
-        :type stmt: str
 
         .. note:: Deprecated in 1.11. Use :func:`execute_sql` for single statement,
                   use :func:`create_statement_set` for multiple DML statements.
@@ -897,7 +882,7 @@ class TableEnvironment(object):
         """
         return self._j_tenv.getCurrentCatalog()
 
-    def use_catalog(self, catalog_name):
+    def use_catalog(self, catalog_name: str):
         """
         Sets the current catalog to the given value. It also sets the default
         database to the catalog's default one.
@@ -947,7 +932,7 @@ class TableEnvironment(object):
         """
         self._j_tenv.useCatalog(catalog_name)
 
-    def get_current_database(self):
+    def get_current_database(self) -> str:
         """
         Gets the current default database name of the running session.
 
@@ -958,7 +943,7 @@ class TableEnvironment(object):
         """
         return self._j_tenv.getCurrentDatabase()
 
-    def use_database(self, database_name):
+    def use_database(self, database_name: str):
         """
         Sets the current default database. It has to exist in the current catalog. That path will
         be used as the default one when looking for unqualified object names.
@@ -1004,7 +989,6 @@ class TableEnvironment(object):
         .. seealso:: :func:`~pyflink.table.TableEnvironment.use_catalog`
 
         :param database_name: The name of the database to set as the current database.
-        :type database_name: str
         """
         self._j_tenv.useDatabase(database_name)
 
@@ -1057,7 +1041,7 @@ class TableEnvironment(object):
         """
         pass
 
-    def register_java_function(self, name, function_class_name):
+    def register_java_function(self, name: str, function_class_name: str):
         """
         Registers a java user defined function under a unique name. Replaces already existing
         user-defined functions under this name. The acceptable function type contains
@@ -1094,7 +1078,7 @@ class TableEnvironment(object):
         else:
             self._j_tenv.registerFunction(name, java_function)
 
-    def register_function(self, name, function):
+    def register_function(self, name: str, function):
         """
         Registers a python user-defined function under a unique name. Replaces already existing
         user-defined function under this name.
@@ -1136,7 +1120,7 @@ class TableEnvironment(object):
         else:
             self._j_tenv.registerFunction(name, java_function)
 
-    def create_temporary_view(self, view_path, table):
+    def create_temporary_view(self, view_path: str, table: Table):
         """
         Registers a :class:`~pyflink.table.Table` API object as a temporary view similar to SQL
         temporary views.
@@ -1148,22 +1132,19 @@ class TableEnvironment(object):
         :param view_path: The path under which the view will be registered. See also the
                           :class:`~pyflink.table.TableEnvironment` class description for the format
                           of the path.
-        :type view_path: str
         :param table: The view to register.
-        :type table: pyflink.table.Table
 
         .. versionadded:: 1.10.0
         """
         self._j_tenv.createTemporaryView(view_path, table._j_table)
 
-    def add_python_file(self, file_path):
+    def add_python_file(self, file_path: str):
         """
         Adds a python dependency which could be python files, python packages or
         local directories. They will be added to the PYTHONPATH of the python UDF worker.
         Please make sure that these dependencies can be imported.
 
         :param file_path: The path of the python dependency.
-        :type file_path: str
 
         .. versionadded:: 1.10.0
         """
@@ -1177,7 +1158,9 @@ class TableEnvironment(object):
         self.get_config().get_configuration().set_string(
             jvm.PythonOptions.PYTHON_FILES.key(), python_files)
 
-    def set_python_requirements(self, requirements_file_path, requirements_cache_dir=None):
+    def set_python_requirements(self,
+                                requirements_file_path: str,
+                                requirements_cache_dir: str = None):
         """
         Specifies a requirements.txt file which defines the third-party dependencies.
         These dependencies will be installed to a temporary directory and added to the
@@ -1206,10 +1189,8 @@ class TableEnvironment(object):
             SetupTools (version >= 37.0.0).
 
         :param requirements_file_path: The path of "requirements.txt" file.
-        :type requirements_file_path: str
         :param requirements_cache_dir: The path of the local directory which contains the
                                        installation packages.
-        :type requirements_cache_dir: str
 
         .. versionadded:: 1.10.0
         """
@@ -1221,7 +1202,7 @@ class TableEnvironment(object):
         self.get_config().get_configuration().set_string(
             jvm.PythonOptions.PYTHON_REQUIREMENTS.key(), python_requirements)
 
-    def add_python_archive(self, archive_path, target_dir=None):
+    def add_python_archive(self, archive_path: str, target_dir: str = None):
         """
         Adds a python archive file. The file will be extracted to the working directory of
         python UDF worker.
@@ -1268,9 +1249,7 @@ class TableEnvironment(object):
             The other archive formats such as tar, tar.gz, 7z, rar, etc are not supported.
 
         :param archive_path: The archive file path.
-        :type archive_path: str
         :param target_dir: Optional, the target dir name that the archive file extracted to.
-        :type target_dir: str
 
         .. versionadded:: 1.10.0
         """
@@ -1288,7 +1267,7 @@ class TableEnvironment(object):
         self.get_config().get_configuration().set_string(
             jvm.PythonOptions.PYTHON_ARCHIVES.key(), python_files)
 
-    def execute(self, job_name):
+    def execute(self, job_name: str):
         """
         Triggers the program execution. The environment will execute all parts of
         the program.
@@ -1306,7 +1285,6 @@ class TableEnvironment(object):
             this method is called (e.g. timezone).
 
         :param job_name: Desired name of the job.
-        :type job_name: str
         :return: The result of the job execution, containing elapsed time and accumulators.
 
         .. note:: Deprecated in 1.11. Use :func:`execute_sql` for single sink,
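
The remaining diffs migrate the tests from expression strings to Expression objects.
The recurring translations, summarized (reusing the t_env sketch near the top of this
mail; column names follow the tests):

    t = t_env.from_elements([(1, 'Hi', 'Hello')], ['a', 'b', 'c'])

    # "a > 1 && b = 'Hello'"  ->  (t.a > 1) & (t.b == 'Hello')   # && -> &, = -> ==
    # "a.sum, c as b"         ->  t.a.sum, t.c.alias('b')        # "as" -> .alias()
    # "a + 1 as b"            ->  (t.a + 1).alias('b')           # parenthesize first
    result = t.where((t.a > 1) & (t.b == 'Hello')) \
              .select(t.a + 1, t.b, t.c.alias('d'))

The parentheses around each comparison matter: & binds tighter than > and == in
Python, so an unparenthesized version would not parse as intended.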
diff --git a/flink-python/pyflink/table/tests/test_aggregate.py b/flink-python/pyflink/table/tests/test_aggregate.py
index f0e70b9..ceac66c 100644
--- a/flink-python/pyflink/table/tests/test_aggregate.py
+++ b/flink-python/pyflink/table/tests/test_aggregate.py
@@ -23,7 +23,7 @@ class StreamTableAggregateTests(PyFlinkStreamTableTestCase):
 
     def test_group_by(self):
         t = self.t_env.from_elements([(1, 'Hi', 'Hello')], ['a', 'b', 'c'])
-        result = t.group_by("c").select("a.sum, c as b")
+        result = t.group_by(t.c).select(t.a.sum, t.c.alias('b'))
         query_operation = result._j_table.getQueryOperation().getChildren().get(0)
         self.assertEqual("[c]", query_operation.getGroupingExpressions().toString())
         self.assertEqual("[as(sum(a), 'EXPR$0')]",
diff --git a/flink-python/pyflink/table/tests/test_calc.py b/flink-python/pyflink/table/tests/test_calc.py
index 53faf59..ecb2c8d 100644
--- a/flink-python/pyflink/table/tests/test_calc.py
+++ b/flink-python/pyflink/table/tests/test_calc.py
@@ -31,21 +31,22 @@ class StreamTableCalcTests(PyFlinkStreamTableTestCase):
 
     def test_select(self):
         t = self.t_env.from_elements([(1, 'hi', 'hello')], ['a', 'b', 'c'])
-        result = t.select("a + 1, b, c")
+        result = t.select(t.a + 1, t.b, t.c)
         query_operation = result._j_table.getQueryOperation()
         self.assertEqual('[plus(a, 1), b, c]',
                          query_operation.getProjectList().toString())
 
     def test_alias(self):
         t = self.t_env.from_elements([(1, 'Hi', 'Hello')], ['a', 'b', 'c'])
-        result = t.alias("d, e, f").select("d, e, f")
+        t = t.alias("d, e, f")
+        result = t.select(t.d, t.e, t.f)
         table_schema = result._j_table.getQueryOperation().getTableSchema()
         self.assertEqual(['d', 'e', 'f'], list(table_schema.getFieldNames()))
 
     def test_where(self):
         t_env = self.t_env
         t = t_env.from_elements([(1, 'Hi', 'Hello')], ['a', 'b', 'c'])
-        result = t.where("a > 1 && b = 'Hello'")
+        result = t.where((t.a > 1) & (t.b == 'Hello'))
         query_operation = result._j_table.getQueryOperation()
         self.assertEqual("and("
                          "greaterThan(a, 1), "
@@ -54,7 +55,7 @@ class StreamTableCalcTests(PyFlinkStreamTableTestCase):
 
     def test_filter(self):
         t = self.t_env.from_elements([(1, 'Hi', 'Hello')], ['a', 'b', 'c'])
-        result = t.filter("a > 1 && b = 'Hello'")
+        result = t.filter((t.a > 1) & (t.b == 'Hello'))
         query_operation = result._j_table.getQueryOperation()
         self.assertEqual("and("
                          "greaterThan(a, 1), "
diff --git a/flink-python/pyflink/table/tests/test_column_operation.py b/flink-python/pyflink/table/tests/test_column_operation.py
index 023d546..818299c 100644
--- a/flink-python/pyflink/table/tests/test_column_operation.py
+++ b/flink-python/pyflink/table/tests/test_column_operation.py
@@ -23,7 +23,7 @@ class StreamTableColumnsOperationTests(PyFlinkStreamTableTestCase):
 
     def test_add_columns(self):
         t = self.t_env.from_elements([(1, 'Hi', 'Hello')], ['a', 'b', 'c'])
-        result = t.select("a").add_columns("a + 1 as b, a + 2 as c")
+        result = t.select(t.a).add_columns((t.a + 1).alias('b'), (t.a + 2).alias('c'))
         query_operation = result._j_table.getQueryOperation()
         self.assertEqual('[a, plus(a, 1), '
                          'plus(a, 2)]',
@@ -31,7 +31,7 @@ class StreamTableColumnsOperationTests(PyFlinkStreamTableTestCase):
 
     def test_add_or_replace_columns(self):
         t = self.t_env.from_elements([(1, 'Hi', 'Hello')], ['a', 'b', 'c'])
-        result = t.select("a").add_or_replace_columns("a + 1 as b, a + 2 as a")
+        result = t.select("a").add_or_replace_columns((t.a + 1).alias('b'), (t.a + 2).alias('a'))
         query_operation = result._j_table.getQueryOperation()
         self.assertEqual('[plus(a, 2), '
                          'plus(a, 1)]',
@@ -39,13 +39,13 @@ class StreamTableColumnsOperationTests(PyFlinkStreamTableTestCase):
 
     def test_rename_columns(self):
         t = self.t_env.from_elements([(1, 'Hi', 'Hello')], ['a', 'b', 'c'])
-        result = t.select("a, b, c").rename_columns("a as d, c as f, b as e")
+        result = t.select("a, b, c").rename_columns(t.a.alias('d'), t.c.alias('f'), t.b.alias('e'))
         table_schema = result._j_table.getQueryOperation().getTableSchema()
         self.assertEqual(['d', 'e', 'f'], list(table_schema.getFieldNames()))
 
     def test_drop_columns(self):
         t = self.t_env.from_elements([(1, 'Hi', 'Hello')], ['a', 'b', 'c'])
-        result = t.select("a, b, c").drop_columns("a, c")
+        result = t.drop_columns(t.a, t.c)
         query_operation = result._j_table.getQueryOperation()
         self.assertEqual('[b]', query_operation.getProjectList().toString())
 
diff --git a/flink-python/pyflink/table/tests/test_correlate.py b/flink-python/pyflink/table/tests/test_correlate.py
index 24f1503..7a41aaf 100644
--- a/flink-python/pyflink/table/tests/test_correlate.py
+++ b/flink-python/pyflink/table/tests/test_correlate.py
@@ -15,6 +15,7 @@
 #  See the License for the specific language governing permissions and
 # limitations under the License.
 ################################################################################
+from pyflink.table import expressions as expr
 from pyflink.testing.test_case_utils import PyFlinkStreamTableTestCase
 
 
@@ -39,7 +40,8 @@ class CorrelateTests(PyFlinkStreamTableTestCase):
                                                     "org.apache.flink.table.utils.TableFunc1")
         source = t_env.from_elements([("1", "1#3#5#7"), ("2", "2#4#6#8")], ["id", "words"])
 
-        result = source.join_lateral("split(words) as (word)", "id = word")
+        result = source.join_lateral(expr.call('split', source.words).alias('word'),
+                                     expr.col('id') == expr.col('word'))
 
         query_operation = result._j_table.getQueryOperation()
         self.assertEqual('INNER', query_operation.getJoinType().toString())
@@ -53,7 +55,7 @@ class CorrelateTests(PyFlinkStreamTableTestCase):
                                                     "org.apache.flink.table.utils.TableFunc1")
         source = t_env.from_elements([("1", "1#3#5#7"), ("2", "2#4#6#8")], ["id", "words"])
 
-        result = source.left_outer_join_lateral("split(words) as (word)")
+        result = source.left_outer_join_lateral(expr.call('split', source.words).alias('word'))
 
         query_operation = result._j_table.getQueryOperation()
         self.assertEqual('LEFT_OUTER', query_operation.getJoinType().toString())
@@ -67,7 +69,8 @@ class CorrelateTests(PyFlinkStreamTableTestCase):
         source = t_env.from_elements([("1", "1#3#5#7"), ("2", "2#4#6#8")], ["id", "words"])
 
         # only support "true" as the join predicate currently
-        result = source.left_outer_join_lateral("split(words) as (word)", "true")
+        result = source.left_outer_join_lateral(expr.call('split', source.words).alias('word'),
+                                                expr.lit(True))
 
         query_operation = result._j_table.getQueryOperation()
         self.assertEqual('LEFT_OUTER', query_operation.getJoinType().toString())
diff --git a/flink-python/pyflink/table/tests/test_dependency.py b/flink-python/pyflink/table/tests/test_dependency.py
index 5f6e682..d9adc71 100644
--- a/flink-python/pyflink/table/tests/test_dependency.py
+++ b/flink-python/pyflink/table/tests/test_dependency.py
@@ -23,6 +23,7 @@ import uuid
 
 from pyflink.pyflink_gateway_server import on_windows
 from pyflink.table import DataTypes
+from pyflink.table import expressions as expr
 from pyflink.table.udf import udf
 from pyflink.testing import source_sink_utils
 from pyflink.testing.test_case_utils import (PyFlinkBlinkStreamTableTestCase,
@@ -45,13 +46,13 @@ class DependencyTests(object):
             from test_dependency_manage_lib import add_two
             return add_two(i)
 
-        self.t_env.create_temporary_system_function("add_two", udf(plus_two, DataTypes.BIGINT(),
-                                                    DataTypes.BIGINT()))
+        self.t_env.create_temporary_system_function(
+            "add_two", udf(plus_two, DataTypes.BIGINT(), DataTypes.BIGINT()))
         table_sink = source_sink_utils.TestAppendSink(
             ['a', 'b'], [DataTypes.BIGINT(), DataTypes.BIGINT()])
         self.t_env.register_table_sink("Results", table_sink)
         t = self.t_env.from_elements([(1, 2), (2, 5), (3, 1)], ['a', 'b'])
-        exec_insert_table(t.select("add_two(a), a"), "Results")
+        exec_insert_table(t.select(expr.call("add_two", t.a), t.a), "Results")
 
         actual = source_sink_utils.results()
         self.assert_equals(actual, ["3,1", "4,2", "5,3"])
@@ -76,11 +77,11 @@ class FlinkBatchDependencyTests(PyFlinkBatchTableTestCase):
             from test_dependency_manage_lib import add_two
             return add_two(i)
 
-        self.t_env.create_temporary_system_function("add_two", udf(plus_two, DataTypes.BIGINT(),
-                                                    DataTypes.BIGINT()))
+        self.t_env.create_temporary_system_function(
+            "add_two", udf(plus_two, DataTypes.BIGINT(), DataTypes.BIGINT()))
 
-        t = self.t_env.from_elements([(1, 2), (2, 5), (3, 1)], ['a', 'b'])\
-            .select("add_two(a), a")
+        t = self.t_env.from_elements([(1, 2), (2, 5), (3, 1)], ['a', 'b'])
+        t = t.select(expr.call('add_two', t.a), t.a)
 
         result = self.collect(t)
         self.assertEqual(result, ["3,1", "4,2", "5,3"])
@@ -112,7 +113,7 @@ class BlinkStreamDependencyTests(DependencyTests, PyFlinkBlinkStreamTableTestCas
             ['a', 'b'], [DataTypes.BIGINT(), DataTypes.BIGINT()])
         self.t_env.register_table_sink("Results", table_sink)
         t = self.t_env.from_elements([(1, 2), (2, 5), (3, 1)], ['a', 'b'])
-        exec_insert_table(t.select("check_requirements(a), a"), "Results")
+        exec_insert_table(t.select(expr.call('check_requirements', t.a), t.a), "Results")
 
         actual = source_sink_utils.results()
         self.assert_equals(actual, ["1,1", "2,2", "3,3"])
@@ -158,7 +159,7 @@ class BlinkStreamDependencyTests(DependencyTests, PyFlinkBlinkStreamTableTestCas
             ['a', 'b'], [DataTypes.BIGINT(), DataTypes.BIGINT()])
         self.t_env.register_table_sink("Results", table_sink)
         t = self.t_env.from_elements([(1, 2), (2, 5), (3, 1)], ['a', 'b'])
-        exec_insert_table(t.select("add_one(a), a"), "Results")
+        exec_insert_table(t.select(expr.call('add_one', t.a), t.a), "Results")
 
         actual = source_sink_utils.results()
         self.assert_equals(actual, ["2,1", "3,2", "4,3"])
@@ -184,7 +185,7 @@ class BlinkStreamDependencyTests(DependencyTests, PyFlinkBlinkStreamTableTestCas
             ['a', 'b'], [DataTypes.BIGINT(), DataTypes.BIGINT()])
         self.t_env.register_table_sink("Results", table_sink)
         t = self.t_env.from_elements([(1, 2), (2, 5), (3, 1)], ['a', 'b'])
-        exec_insert_table(t.select("add_from_file(a), a"), "Results")
+        exec_insert_table(t.select(expr.call('add_from_file', t.a), t.a), "Results")
 
         actual = source_sink_utils.results()
         self.assert_equals(actual, ["3,1", "4,2", "5,3"])
@@ -226,8 +227,10 @@ class BlinkStreamDependencyTests(DependencyTests, PyFlinkBlinkStreamTableTestCas
             ['a', 'b'], [DataTypes.BIGINT(), DataTypes.BIGINT()])
         self.t_env.register_table_sink("Results", table_sink)
         t = self.t_env.from_elements([(1, 2), (2, 5), (3, 1)], ['a', 'b'])
-        exec_insert_table(t.select("check_python_exec(a), check_pyflink_gateway_disabled(a)"),
-                          "Results")
+        exec_insert_table(t.select(
+            expr.call('check_python_exec', t.a),
+            expr.call('check_pyflink_gateway_disabled', t.a)),
+            "Results")
 
         actual = source_sink_utils.results()
         self.assert_equals(actual, ["1,1", "2,2", "3,3"])
diff --git a/flink-python/pyflink/table/tests/test_explain.py b/flink-python/pyflink/table/tests/test_explain.py
index 1dccaba..c651fec 100644
--- a/flink-python/pyflink/table/tests/test_explain.py
+++ b/flink-python/pyflink/table/tests/test_explain.py
@@ -24,7 +24,8 @@ class StreamTableExplainTests(PyFlinkStreamTableTestCase):
 
     def test_explain(self):
         t = self.t_env.from_elements([(1, 'Hi', 'Hello')], ['a', 'b', 'c'])
-        result = t.group_by("c").select("a.sum, c as b").explain(ExplainDetail.CHANGELOG_MODE)
+        result = t.group_by("c").select(t.a.sum, t.c.alias('b')).explain(
+            ExplainDetail.CHANGELOG_MODE)
 
         assert isinstance(result, str)
 
diff --git a/flink-python/pyflink/table/tests/test_join.py b/flink-python/pyflink/table/tests/test_join.py
index f05c9e7..8ee4b9f 100644
--- a/flink-python/pyflink/table/tests/test_join.py
+++ b/flink-python/pyflink/table/tests/test_join.py
@@ -25,7 +25,7 @@ class StreamTableJoinTests(PyFlinkStreamTableTestCase):
         t_env = self.t_env
         t1 = t_env.from_elements([(1, "Hi", "Hello")], ['a', 'b', 'c'])
         t2 = t_env.from_elements([(2, "Flink")], ['d', 'e'])
-        result = t1.join(t2, "a = d")
+        result = t1.join(t2, t1.a == t2.d)
 
         query_operation = result._j_table.getQueryOperation()
         self.assertEqual('INNER', query_operation.getJoinType().toString())
@@ -37,7 +37,7 @@ class StreamTableJoinTests(PyFlinkStreamTableTestCase):
         t_env = self.t_env
         t1 = t_env.from_elements([(1, "Hi", "Hello")], ['a', 'b', 'c'])
         t2 = t_env.from_elements([(2, "Flink")], ['d', 'e'])
-        result = t1.join(t2).where("a = d")
+        result = t1.join(t2).where(t1.a == t2.d)
 
         query_operation = result._j_table.getQueryOperation().getChildren().get(0)
         self.assertEqual('INNER', query_operation.getJoinType().toString())
@@ -48,7 +48,7 @@ class StreamTableJoinTests(PyFlinkStreamTableTestCase):
         t_env = self.t_env
         t1 = t_env.from_elements([(1, "Hi", "Hello")], ['a', 'b', 'c'])
         t2 = t_env.from_elements([(2, "Flink")], ['d', 'e'])
-        result = t1.left_outer_join(t2, "a = d")
+        result = t1.left_outer_join(t2, t1.a == t2.d)
 
         query_operation = result._j_table.getQueryOperation()
         self.assertEqual('LEFT_OUTER', query_operation.getJoinType().toString())
@@ -60,7 +60,7 @@ class StreamTableJoinTests(PyFlinkStreamTableTestCase):
         t_env = self.t_env
         t1 = t_env.from_elements([(1, "Hi", "Hello")], ['a', 'b', 'c'])
         t2 = t_env.from_elements([(2, "Flink")], ['d', 'e'])
-        result = t1.left_outer_join(t2).where("a = d")
+        result = t1.left_outer_join(t2).where(t1.a == t2.d)
 
         query_operation = result._j_table.getQueryOperation().getChildren().get(0)
         self.assertEqual('LEFT_OUTER', query_operation.getJoinType().toString())
@@ -71,7 +71,7 @@ class StreamTableJoinTests(PyFlinkStreamTableTestCase):
         t_env = self.t_env
         t1 = t_env.from_elements([(1, "Hi", "Hello")], ['a', 'b', 'c'])
         t2 = t_env.from_elements([(2, "Flink")], ['d', 'e'])
-        result = t1.right_outer_join(t2, "a = d")
+        result = t1.right_outer_join(t2, t1.a == t2.d)
 
         query_operation = result._j_table.getQueryOperation()
         self.assertEqual('RIGHT_OUTER', query_operation.getJoinType().toString())
@@ -84,7 +84,7 @@ class StreamTableJoinTests(PyFlinkStreamTableTestCase):
         t1 = t_env.from_elements([(1, "Hi", "Hello")], ['a', 'b', 'c'])
         t2 = t_env.from_elements([(2, "Flink")], ['d', 'e'])
 
-        result = t1.full_outer_join(t2, "a = d")
+        result = t1.full_outer_join(t2, t1.a == t2.d)
         query_operation = result._j_table.getQueryOperation()
         self.assertEqual('FULL_OUTER', query_operation.getJoinType().toString())
         self.assertEqual('equals(a, d)',
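
With expression-based predicates the join condition is ordinary Python rather than an embedded DSL string, and the same condition can be supplied inline or applied afterwards via where. A short sketch, assuming t_env is available:

    t1 = t_env.from_elements([(1, "Hi")], ['a', 'b'])
    t2 = t_env.from_elements([(1, "Flink")], ['d', 'e'])

    joined = t1.join(t2, t1.a == t2.d)           # condition inline
    joined = t1.join(t2).where(t1.a == t2.d)     # condition applied afterwards
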
diff --git a/flink-python/pyflink/table/tests/test_pandas_conversion.py b/flink-python/pyflink/table/tests/test_pandas_conversion.py
index 7a06393..37689e8 100644
--- a/flink-python/pyflink/table/tests/test_pandas_conversion.py
+++ b/flink-python/pyflink/table/tests/test_pandas_conversion.py
@@ -118,7 +118,7 @@ class PandasConversionITTests(PandasConversionTestBase):
         table = self.t_env.from_pandas(self.pdf, self.data_type, 5)
         self.assertEqual(self.data_type, table.get_schema().to_row_data_type())
 
-        table = table.filter("f2 < 2")
+        table = table.filter(table.f2 < 2)
         table_sink = source_sink_utils.TestAppendSink(
             self.data_type.field_names(),
             self.data_type.field_types())
@@ -139,12 +139,12 @@ class PandasConversionITTests(PandasConversionTestBase):
 
     def test_empty_to_pandas(self):
         table = self.t_env.from_pandas(self.pdf, self.data_type)
-        pdf = table.filter("f1 < 0").to_pandas()
+        pdf = table.filter(table.f1 < 0).to_pandas()
         self.assertTrue(pdf.empty)
 
     def test_to_pandas_for_retract_table(self):
         table = self.t_env.from_pandas(self.pdf, self.data_type)
-        result_pdf = table.group_by("f1").select("max(f2) as f2").to_pandas()
+        result_pdf = table.group_by(table.f1).select(table.f2.max.alias('f2')).to_pandas()
         import pandas as pd
         import numpy as np
         assert_frame_equal(result_pdf, pd.DataFrame(data={'f2': np.int16([2])}))
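
The expression style carries over to the pandas bridge: a DataFrame enters via from_pandas, is transformed with expression-based operations, and leaves via to_pandas. A minimal sketch, assuming a blink-planner TableEnvironment t_env:

    import pandas as pd

    pdf = pd.DataFrame({'f1': [1, 2, 3], 'f2': [4, 5, 6]})
    table = t_env.from_pandas(pdf)
    # expression-based filter instead of the old string "f1 < 3"
    result_pdf = table.filter(table.f1 < 3).to_pandas()
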
diff --git a/flink-python/pyflink/table/tests/test_pandas_udf.py b/flink-python/pyflink/table/tests/test_pandas_udf.py
index 19b74f9..4c8fd69 100644
--- a/flink-python/pyflink/table/tests/test_pandas_udf.py
+++ b/flink-python/pyflink/table/tests/test_pandas_udf.py
@@ -21,6 +21,7 @@ import decimal
 import pytz
 
 from pyflink.table import DataTypes, Row
+from pyflink.table import expressions as E
 from pyflink.table.tests.test_udf import SubtractOne
 from pyflink.table.udf import udf
 from pyflink.testing import source_sink_utils
@@ -58,8 +59,8 @@ class PandasUDFITTests(object):
 
         t = self.t_env.from_elements([(1, 2, 3), (2, 5, 6), (3, 1, 9)], ['a', 'b', 'c'])
         exec_insert_table(
-            t.where("add_one(b) <= 3").select("a, b + 1, add(a + 1, subtract_one(c)) + 2, "
-                                              "add(add_one(a), 1L)"),
+            t.where(E.call('add_one', t.b) <= 3)
+             .select("a, b + 1, add(a + 1, subtract_one(c)) + 2, add(add_one(a), 1L)"),
             "Results")
         actual = source_sink_utils.results()
         self.assert_equals(actual, ["1,3,6,3", "3,2,14,5"])
diff --git a/flink-python/pyflink/table/tests/test_schema_operation.py b/flink-python/pyflink/table/tests/test_schema_operation.py
index b31b640..3df9dd3 100644
--- a/flink-python/pyflink/table/tests/test_schema_operation.py
+++ b/flink-python/pyflink/table/tests/test_schema_operation.py
@@ -24,12 +24,12 @@ class StreamTableSchemaTests(PyFlinkStreamTableTestCase):
 
     def test_print_schema(self):
         t = self.t_env.from_elements([(1, 'Hi', 'Hello')], ['a', 'b', 'c'])
-        result = t.group_by("c").select("a.sum, c as b")
+        result = t.group_by(t.c).select(t.a.sum, t.c.alias('b'))
         result.print_schema()
 
     def test_get_schema(self):
         t = self.t_env.from_elements([(1, 'Hi', 'Hello')], ['a', 'b', 'c'])
-        result = t.group_by("c").select("a.sum as a, c as b")
+        result = t.group_by(t.c).select(t.a.sum.alias('a'), t.c.alias('b'))
         schema = result.get_schema()
 
         assert schema == TableSchema(["a", "b"], [DataTypes.BIGINT(), DataTypes.STRING()])
diff --git a/flink-python/pyflink/table/tests/test_sort.py b/flink-python/pyflink/table/tests/test_sort.py
index e4ef078..59857f8 100644
--- a/flink-python/pyflink/table/tests/test_sort.py
+++ b/flink-python/pyflink/table/tests/test_sort.py
@@ -23,7 +23,7 @@ class BatchTableSortTests(PyFlinkBatchTableTestCase):
 
     def test_order_by_offset_fetch(self):
         t = self.t_env.from_elements([(1, "Hello")], ["a", "b"])
-        result = t.order_by("a.desc").offset(2).fetch(2)
+        result = t.order_by(t.a.desc).offset(2).fetch(2)
 
         query_operation = result._j_table.getQueryOperation()
         self.assertEqual(2, query_operation.getOffset())
diff --git a/flink-python/pyflink/table/tests/test_table_environment_api.py b/flink-python/pyflink/table/tests/test_table_environment_api.py
index a791656..9da1986 100644
--- a/flink-python/pyflink/table/tests/test_table_environment_api.py
+++ b/flink-python/pyflink/table/tests/test_table_environment_api.py
@@ -56,7 +56,7 @@ class TableEnvironmentTest(object):
             .add('c', DataTypes.STRING())
         t_env = self.t_env
         t = t_env.from_elements([], schema)
-        result = t.select("1 + a, b, c")
+        result = t.select(t.a + 1, t.b, t.c)
 
         actual = result.explain()
 
@@ -69,7 +69,7 @@ class TableEnvironmentTest(object):
             .add('c', DataTypes.STRING())
         t_env = self.t_env
         t = t_env.from_elements([], schema)
-        result = t.select("1 + a, b, c")
+        result = t.select(t.a + 1, t.b, t.c)
 
         actual = result.explain(ExplainDetail.ESTIMATED_COST, ExplainDetail.CHANGELOG_MODE)
 
diff --git a/flink-python/pyflink/table/tests/test_window.py b/flink-python/pyflink/table/tests/test_window.py
index d093c8a..0f820ab 100644
--- a/flink-python/pyflink/table/tests/test_window.py
+++ b/flink-python/pyflink/table/tests/test_window.py
@@ -18,6 +18,7 @@
 
 from py4j.protocol import Py4JJavaError
 
+from pyflink.table import expressions as expr
 from pyflink.table.window import Session, Slide, Tumble, Over
 from pyflink.testing.test_case_utils import PyFlinkStreamTableTestCase, PyFlinkBatchTableTestCase
 
@@ -28,8 +29,12 @@ class StreamTableWindowTests(PyFlinkStreamTableTestCase):
         t_env = self.t_env
         t = t_env.from_elements([(1, 1, "Hello")], ['a', 'b', 'c'])
 
-        result = t.over_window(Over.partition_by("c").order_by("a")
-                               .preceding("2.rows").following("current_row").alias("w"))
+        result = t.over_window(
+            Over.partition_by("c")
+                .order_by("a")
+                .preceding(expr.row_interval(2))
+                .following(expr.CURRENT_ROW)
+                .alias("w"))
 
         self.assertRaisesRegex(
             Py4JJavaError, "Ordering must be defined on a time attribute",
@@ -40,8 +45,8 @@ class BatchTableWindowTests(PyFlinkBatchTableTestCase):
 
     def test_tumble_window(self):
         t = self.t_env.from_elements([(1, 1, "Hello")], ["a", "b", "c"])
-        result = t.window(Tumble.over("2.rows").on("a").alias("w"))\
-            .group_by("w, c").select("b.sum")
+        result = t.window(Tumble.over(expr.row_interval(2)).on("a").alias("w"))\
+            .group_by(expr.col('w'), expr.col('c')).select(t.b.sum)
 
         query_operation = result._j_table.getQueryOperation().getChildren().get(0)
         self.assertEqual('[c]', query_operation.getGroupingExpressions().toString())
@@ -50,8 +55,8 @@ class BatchTableWindowTests(PyFlinkBatchTableTestCase):
 
     def test_slide_window(self):
         t = self.t_env.from_elements([(1000, 1, "Hello")], ["a", "b", "c"])
-        result = t.window(Slide.over("2.seconds").every("1.seconds").on("a").alias("w"))\
-            .group_by("w, c").select("b.sum")
+        result = t.window(Slide.over(expr.lit(2).seconds).every(expr.lit(1).seconds).on("a")
+                          .alias("w")).group_by(expr.col('w'), expr.col('c')).select(t.b.sum)
 
         query_operation = result._j_table.getQueryOperation().getChildren().get(0)
         self.assertEqual('[c]', query_operation.getGroupingExpressions().toString())
@@ -60,8 +65,8 @@ class BatchTableWindowTests(PyFlinkBatchTableTestCase):
 
     def test_session_window(self):
         t = self.t_env.from_elements([(1000, 1, "Hello")], ["a", "b", "c"])
-        result = t.window(Session.with_gap("1.seconds").on("a").alias("w"))\
-            .group_by("w, c").select("b.sum")
+        result = t.window(Session.with_gap(expr.lit(1).seconds).on("a").alias("w"))\
+            .group_by(expr.col('w'), expr.col('c')).select(t.b.sum)
 
         query_operation = result._j_table.getQueryOperation().getChildren().get(0)
         self.assertEqual('[c]', query_operation.getGroupingExpressions().toString())
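
Across all three window types the interval arguments follow one rule: time intervals are built with expr.lit(n).seconds (or .minutes, etc.) and row-count intervals with expr.row_interval(n). A condensed sketch of the tumbling case, assuming t_env exists:

    from pyflink.table import expressions as expr
    from pyflink.table.window import Tumble

    t = t_env.from_elements([(1000, 1, "Hello")], ["a", "b", "c"])
    result = t.window(Tumble.over(expr.lit(10).seconds).on("a").alias("w")) \
        .group_by(expr.col("w"), expr.col("c")) \
        .select(t.b.sum)
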
diff --git a/flink-python/pyflink/table/utils.py b/flink-python/pyflink/table/utils.py
index b20bea0..52434e0 100644
--- a/flink-python/pyflink/table/utils.py
+++ b/flink-python/pyflink/table/utils.py
@@ -15,8 +15,9 @@
 #  See the License for the specific language governing permissions and
 # limitations under the License.
 ################################################################################
-
+from pyflink.java_gateway import get_gateway
 from pyflink.table.types import DataType, LocalZonedTimestampType
+from pyflink.util.utils import to_jarray
 
 
 def pandas_to_arrow(schema, timezone, field_types, series):
@@ -67,3 +68,11 @@ def tz_convert_to_internal(s, t: DataType, local_tz):
         elif is_datetime64tz_dtype(s.dtype):
             return s.dt.tz_convert(local_tz).dt.tz_localize(None)
     return s
+
+
+def to_expression_jarray(exprs):
+    """
+    Converts a Python list of Expressions to a Java array of Expressions.
+    """
+    gateway = get_gateway()
+    return to_jarray(gateway.jvm.Expression, [expr._j_expr for expr in exprs])
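
This helper is the glue between the two APIs: every Python Expression wraps a Java Expression in its _j_expr attribute, and the Table operations pass their whole argument list through it as a single Java Expression[]. A hypothetical illustration of the call-site pattern (the real call sites are the Table methods in table.py):

    from pyflink.table.utils import to_expression_jarray

    def select(self, *fields):
        # fields is a tuple of pyflink Expressions; hand their Java
        # counterparts to the underlying Java Table as one array
        return Table(self._j_table.select(to_expression_jarray(fields)), self._t_env)
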
diff --git a/flink-python/pyflink/table/window.py b/flink-python/pyflink/table/window.py
index 218d640..3c99430 100644
--- a/flink-python/pyflink/table/window.py
+++ b/flink-python/pyflink/table/window.py
@@ -16,7 +16,11 @@
 # limitations under the License.
 ################################################################################
 from py4j.java_gateway import get_method
+from typing import Union
+
 from pyflink.java_gateway import get_gateway
+from pyflink.table import Expression
+from pyflink.table.expression import _get_java_expression
 
 __all__ = [
     'Tumble',
@@ -55,11 +59,16 @@ class Tumble(object):
     Example:
     ::
 
+        >>> from pyflink.table import expressions as expr
+        >>> Tumble.over(expr.lit(10).minutes) \\
+        ...       .on(expr.col("rowtime")) \\
+        ...       .alias("w")
+
         >>> Tumble.over("10.minutes").on("rowtime").alias("w")
     """
 
     @classmethod
-    def over(cls, size):
+    def over(cls, size: Union[str, Expression]):
         """
         Creates a tumbling window. Tumbling windows are fixed-size, consecutive, non-overlapping
         windows of a specified fixed length. For example, a tumbling window of 5 minutes size
@@ -68,9 +77,7 @@ class Tumble(object):
         :param size: The size of the window as time or row-count interval.
         :return: A partially defined tumbling window.
         """
-        # type: (str) -> TumbleWithSize
-        return TumbleWithSize(
-            get_gateway().jvm.Tumble.over(size))
+        return TumbleWithSize(get_gateway().jvm.Tumble.over(_get_java_expression(size)))
 
 
 class TumbleWithSize(object):
@@ -85,7 +92,7 @@ class TumbleWithSize(object):
     def __init__(self, java_window):
         self._java_window = java_window
 
-    def on(self, time_field):
+    def on(self, time_field: Union[str, Expression]):
         """
         Specifies the time attribute on which rows are grouped.
 
@@ -97,8 +104,7 @@ class TumbleWithSize(object):
         :param time_field: Time attribute for streaming and batch tables.
         :return: A tumbling window on event-time/processing-time.
         """
-        # type: (str) -> TumbleWithSizeOnTime
-        return TumbleWithSizeOnTime(self._java_window.on(time_field))
+        return TumbleWithSizeOnTime(self._java_window.on(_get_java_expression(time_field)))
 
 
 class TumbleWithSizeOnTime(object):
@@ -109,7 +115,7 @@ class TumbleWithSizeOnTime(object):
     def __init__(self, java_window):
         self._java_window = java_window
 
-    def alias(self, alias):
+    def alias(self, alias: str):
         """
         Assigns an alias for this window that the following
         :func:`~pyflink.table.GroupWindowedTable.group_by` and
@@ -120,7 +126,6 @@ class TumbleWithSizeOnTime(object):
         :param alias: Alias for this window.
         :return: This window.
         """
-        # type: (str) -> GroupWindow
         return GroupWindow(get_method(self._java_window, "as")(alias))
 
 
@@ -133,12 +138,17 @@ class Session(object):
     Example:
     ::
 
+        >>> from pyflink.table import expressions as expr
+        >>> Session.with_gap(expr.lit(10).minutes) \\
+        ...        .on(expr.col("rowtime")) \\
+        ...        .alias("w")
+
         >>> Session.with_gap("10.minutes").on("rowtime").alias("w")
 
     """
 
     @classmethod
-    def with_gap(cls, gap):
+    def with_gap(cls, gap: Union[str, Expression]):
         """
         Creates a session window. The boundaries of session windows are defined by
         intervals of inactivity, i.e., a session window is closed if no event appears for a defined
@@ -148,9 +158,7 @@ class Session(object):
                     closing the session window.
         :return: A partially defined session window.
         """
-        # type: (str) -> SessionWithGap
-        return SessionWithGap(
-            get_gateway().jvm.Session.withGap(gap))
+        return SessionWithGap(get_gateway().jvm.Session.withGap(_get_java_expression(gap)))
 
 
 class SessionWithGap(object):
@@ -165,7 +173,7 @@ class SessionWithGap(object):
     def __init__(self, java_window):
         self._java_window = java_window
 
-    def on(self, time_field):
+    def on(self, time_field: Union[str, Expression]):
         """
         Specifies the time attribute on which rows are grouped.
 
@@ -177,8 +185,7 @@ class SessionWithGap(object):
         :param time_field: Time attribute for streaming and batch tables.
         :return: A session window on event-time/processing-time.
         """
-        # type: (str) -> SessionWithGapOnTime
-        return SessionWithGapOnTime(self._java_window.on(time_field))
+        return SessionWithGapOnTime(self._java_window.on(_get_java_expression(time_field)))
 
 
 class SessionWithGapOnTime(object):
@@ -189,7 +196,7 @@ class SessionWithGapOnTime(object):
     def __init__(self, java_window):
         self._java_window = java_window
 
-    def alias(self, alias):
+    def alias(self, alias: str):
         """
         Assigns an alias for this window that the following
         :func:`~pyflink.table.GroupWindowedTable.group_by` and
@@ -200,7 +207,6 @@ class SessionWithGapOnTime(object):
         :param alias: Alias for this window.
         :return: This window.
         """
-        # type: (str) -> GroupWindow
         return GroupWindow(get_method(self._java_window, "as")(alias))
 
 
@@ -217,11 +223,17 @@ class Slide(object):
     Example:
     ::
 
+        >>> from pyflink.table import expressions as expr
+        >>> Slide.over(expr.lit(10).minutes) \\
+        ...      .every(expr.lit(5).minutes) \\
+        ...      .on(expr.col("rowtime")) \\
+        ...      .alias("w")
+
         >>> Slide.over("10.minutes").every("5.minutes").on("rowtime").alias("w")
     """
 
     @classmethod
-    def over(cls, size):
+    def over(cls, size: Union[str, Expression]):
         """
         Creates a sliding window. Sliding windows have a fixed size and slide by
         a specified slide interval. If the slide interval is smaller than the window size, sliding
@@ -234,9 +246,7 @@ class Slide(object):
         :param size: The size of the window as time or row-count interval.
         :return: A partially specified sliding window.
         """
-        # type: (str) -> SlideWithSize
-        return SlideWithSize(
-            get_gateway().jvm.Slide.over(size))
+        return SlideWithSize(get_gateway().jvm.Slide.over(_get_java_expression(size)))
 
 
 class SlideWithSize(object):
@@ -248,7 +258,7 @@ class SlideWithSize(object):
     def __init__(self, java_window):
         self._java_window = java_window
 
-    def every(self, slide):
+    def every(self, slide: Union[str, Expression]):
         """
         Specifies the window's slide as time or row-count interval.
 
@@ -262,8 +272,7 @@ class SlideWithSize(object):
         :param slide: The slide of the window either as time or row-count interval.
         :return: A sliding window.
         """
-        # type: (str) -> SlideWithSizeAndSlide
-        return SlideWithSizeAndSlide(self._java_window.every(slide))
+        return SlideWithSizeAndSlide(self._java_window.every(_get_java_expression(slide)))
 
 
 class SlideWithSizeAndSlide(object):
@@ -278,7 +287,7 @@ class SlideWithSizeAndSlide(object):
     def __init__(self, java_window):
         self._java_window = java_window
 
-    def on(self, time_field):
+    def on(self, time_field: Union[str, Expression]):
         """
         Specifies the time attribute on which rows are grouped.
 
@@ -288,7 +297,7 @@ class SlideWithSizeAndSlide(object):
         For batch tables you can specify grouping on a timestamp or long attribute.
         """
-        # type: (str) -> SlideWithSizeAndSlideOnTime
-        return SlideWithSizeAndSlideOnTime(self._java_window.on(time_field))
+        return SlideWithSizeAndSlideOnTime(self._java_window.on(_get_java_expression(time_field)))
 
 
 class SlideWithSizeAndSlideOnTime(object):
@@ -299,7 +308,7 @@ class SlideWithSizeAndSlideOnTime(object):
     def __init__(self, java_window):
         self._java_window = java_window
 
-    def alias(self, alias):
+    def alias(self, alias: str):
         """
         Assigns an alias for this window that the following
         :func:`~pyflink.table.GroupWindowedTable.group_by` and
@@ -310,9 +319,7 @@ class SlideWithSizeAndSlideOnTime(object):
         :param alias: Alias for this window.
         :return: This window.
         """
-        # type: (str) -> GroupWindow
-        return GroupWindow(
-            get_method(self._java_window, "as")(alias))
+        return GroupWindow(get_method(self._java_window, "as")(alias))
 
 
 class Over(object):
@@ -325,11 +332,17 @@ class Over(object):
     Example:
     ::
 
+        >>> from pyflink.table import expressions as expr
+        >>> Over.partition_by(col("a")) \\
+        ...     .order_by(col("rowtime")) \\
+        ...     .preceding(expr.UNBOUNDED_RANGE) \\
+        ...     .alias("w")
+
         >>> Over.partition_by("a").order_by("rowtime").preceding("unbounded_range").alias("w")
     """
 
     @classmethod
-    def order_by(cls, order_by):
+    def order_by(cls, order_by: Union[str, Expression]):
         """
         Specifies the time attribute on which rows are ordered.
 
@@ -339,11 +352,11 @@ class Over(object):
         :param order_by: Field reference.
         :return: An over window with defined order.
         """
-        # type: (str) -> OverWindowPartitionedOrdered
-        return OverWindowPartitionedOrdered(get_gateway().jvm.Over.orderBy(order_by))
+        return OverWindowPartitionedOrdered(get_gateway().jvm.Over.orderBy(
+            _get_java_expression(order_by)))
 
     @classmethod
-    def partition_by(cls, partition_by):
+    def partition_by(cls, partition_by: Union[str, Expression]):
         """
         Partitions the elements on some partition keys.
 
@@ -353,8 +366,8 @@ class Over(object):
         :param partition_by: List of field references.
         :return: An over window with defined partitioning.
         """
-        # type: (str) -> OverWindowPartitioned
-        return OverWindowPartitioned(get_gateway().jvm.Over.partitionBy(partition_by))
+        return OverWindowPartitioned(get_gateway().jvm.Over.partitionBy(
+            _get_java_expression(partition_by)))
 
 
 class OverWindowPartitionedOrdered(object):
@@ -365,26 +378,24 @@ class OverWindowPartitionedOrdered(object):
     def __init__(self, java_over_window):
         self._java_over_window = java_over_window
 
-    def alias(self, alias):
+    def alias(self, alias: str):
         """
         Assigns an alias for this window that the following
         :func:`~pyflink.table.OverWindowedTable.select` clause can refer to.
 
         :param alias: Alias for this over window.
         :return: The fully defined over window.
         """
-        # type: (str) -> OverWindow
         return OverWindow(get_method(self._java_over_window, "as")(alias))
 
-    def preceding(self, preceding):
+    def preceding(self, preceding: Union[str, Expression]):
         """
         Set the preceding offset (based on time or row-count intervals) for the over window.
 
         :param preceding: Preceding offset relative to the current row.
         :return: An over window with defined preceding.
         """
-        # type: (str) -> OverWindowPartitionedOrderedPreceding
         return OverWindowPartitionedOrderedPreceding(
-            self._java_over_window.preceding(preceding))
+            self._java_over_window.preceding(_get_java_expression(preceding)))
 
 
 class OverWindowPartitionedOrderedPreceding(object):
@@ -395,7 +406,7 @@ class OverWindowPartitionedOrderedPreceding(object):
     def __init__(self, java_over_window):
         self._java_over_window = java_over_window
 
-    def alias(self, alias):
+    def alias(self, alias: str):
         """
         Assigns an alias for this window that the following
         :func:`~pyflink.table.OverWindowedTable.select` clause can refer to.
@@ -403,19 +414,17 @@ class OverWindowPartitionedOrderedPreceding(object):
         :param alias: Alias for this over window.
         :return: The fully defined over window.
         """
-        # type: (str) -> OverWindow
         return OverWindow(get_method(self._java_over_window, "as")(alias))
 
-    def following(self, following):
+    def following(self, following: Union[str, Expression]):
         """
         Set the following offset (based on time or row-count intervals) for the over window.
 
         :param following: Following offset relative to the current row.
         :return: An over window with defined following.
         """
-        # type: (str) -> OverWindowPartitionedOrderedPreceding
         return OverWindowPartitionedOrderedPreceding(
-            self._java_over_window.following(following))
+            self._java_over_window.following(_get_java_expression(following)))
 
 
 class OverWindowPartitioned(object):
@@ -426,7 +435,7 @@ class OverWindowPartitioned(object):
     def __init__(self, java_over_window):
         self._java_over_window = java_over_window
 
-    def order_by(self, order_by):
+    def order_by(self, order_by: Union[str, Expression]):
         """
         Specifies the time attribute on which rows are ordered.
 
@@ -438,7 +447,8 @@ class OverWindowPartitioned(object):
         :param order_by: Field reference.
         :return: An over window with defined order.
         """
-        return OverWindowPartitionedOrdered(self._java_over_window.orderBy(order_by))
+        return OverWindowPartitionedOrdered(self._java_over_window.orderBy(
+            _get_java_expression(order_by)))
 
 
 class OverWindow(object):