You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by di...@apache.org on 2020/05/29 01:46:34 UTC

[flink] branch master updated: [FLINK-17946][python] Fix the bug that the config option 'pipeline.jars' doesn't work.

This is an automated email from the ASF dual-hosted git repository.

dianfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new cda25e5  [FLINK-17946][python] Fix the bug that the config option 'pipeline.jars' doesn't work.
cda25e5 is described below

commit cda25e5d185cf04c778ed4d173f9db6dfe3522ad
Author: Wei Zhong <we...@gmail.com>
AuthorDate: Wed May 27 21:09:27 2020 +0800

    [FLINK-17946][python] Fix the bug that the config option 'pipeline.jars' doesn't work.
    
    This closes #12372.
---
 docs/dev/table/python/common_questions.md          |   8 +-
 docs/dev/table/python/common_questions.zh.md       |   8 +-
 flink-python/pyflink/ml/api/base.py                |   2 +-
 flink-python/pyflink/table/statement_set.py        |   8 +-
 flink-python/pyflink/table/table.py                |  97 ++++++++++---------
 flink-python/pyflink/table/table_environment.py    |  30 +++---
 .../table/tests/test_table_environment_api.py      | 103 +++++++++++++++++++--
 7 files changed, 178 insertions(+), 78 deletions(-)

diff --git a/docs/dev/table/python/common_questions.md b/docs/dev/table/python/common_questions.md
index 89da24c..f0858e2 100644
--- a/docs/dev/table/python/common_questions.md
+++ b/docs/dev/table/python/common_questions.md
@@ -69,11 +69,11 @@ A PyFlink job may depend on jar files, i.e. connectors, Java UDFs, etc.
 You can specify the dependencies with the following Python Table APIs or through <a href="{{ site.baseurl }}/ops/cli.html#usage">command-line arguments</a> directly when submitting the job.
 
 {% highlight python %}
-# NOTE: Only local file URLs (start with "file://") are supported.
-table_env.get_config().set_configuration("pipeline.jars", "file:///my/jar/path/connector.jar;file:///my/jar/path/udf.jar")
+# NOTE: Only local file URLs (start with "file:") are supported.
+table_env.get_config().get_configuration().set_string("pipeline.jars", "file:///my/jar/path/connector.jar;file:///my/jar/path/udf.jar")
 
-# NOTE: The Paths must specify a protocol (e.g. file://) and users should ensure that the URLs are accessible on both the client and the cluster.
-table_env.get_config().set_configuration("pipeline.classpaths", "file:///my/jar/path/connector.jar;file:///my/jar/path/udf.jar")
+# NOTE: The Paths must specify a protocol (e.g. "file") and users should ensure that the URLs are accessible on both the client and the cluster.
+table_env.get_config().get_configuration().set_string("pipeline.classpaths", "file:///my/jar/path/connector.jar;file:///my/jar/path/udf.jar")
 {% endhighlight %}
 
 For details about the APIs of adding Java dependency, you can refer to [the relevant documentation]({{ site.baseurl }}/dev/table/python/dependency_management.html##java-dependency)
diff --git a/docs/dev/table/python/common_questions.zh.md b/docs/dev/table/python/common_questions.zh.md
index f11f83c..58dbdc6 100644
--- a/docs/dev/table/python/common_questions.zh.md
+++ b/docs/dev/table/python/common_questions.zh.md
@@ -69,11 +69,11 @@ A PyFlink job may depend on jar files, i.e. connectors, Java UDFs, etc.
 You can specify the dependencies with the following Python Table APIs or through <a href="{{ site.baseurl }}/zh/ops/cli.html#usage">command-line arguments</a> directly when submitting the job.
 
 {% highlight python %}
-# NOTE: Only local file URLs (start with "file://") are supported.
-table_env.get_config().set_configuration("pipeline.jars", "file:///my/jar/path/connector.jar;file:///my/jar/path/udf.jar")
+# NOTE: Only local file URLs (start with "file:") are supported.
+table_env.get_config().get_configuration().set_string("pipeline.jars", "file:///my/jar/path/connector.jar;file:///my/jar/path/udf.jar")
 
-# NOTE: The Paths must specify a protocol (e.g. file://) and users should ensure that the URLs are accessible on both the client and the cluster.
-table_env.get_config().set_configuration("pipeline.classpaths", "file:///my/jar/path/connector.jar;file:///my/jar/path/udf.jar")
+# NOTE: The Paths must specify a protocol (e.g. "file") and users should ensure that the URLs are accessible on both the client and the cluster.
+table_env.get_config().get_configuration().set_string("pipeline.classpaths", "file:///my/jar/path/connector.jar;file:///my/jar/path/udf.jar")
 {% endhighlight %}
 
 For details about the APIs of adding Java dependency, you can refer to [the relevant documentation]({{ site.baseurl }}/zh/dev/table/python/dependency_management.html##java-dependency)
diff --git a/flink-python/pyflink/ml/api/base.py b/flink-python/pyflink/ml/api/base.py
index 3674e6c..6b7c39d 100644
--- a/flink-python/pyflink/ml/api/base.py
+++ b/flink-python/pyflink/ml/api/base.py
@@ -110,7 +110,7 @@ class JavaTransformer(Transformer):
         :returns: the transformed table
         """
         self._convert_params_to_java(self._j_obj)
-        return Table(self._j_obj.transform(table_env._j_tenv, table._j_table))
+        return Table(self._j_obj.transform(table_env._j_tenv, table._j_table), table_env)
 
 
 class Model(Transformer):
diff --git a/flink-python/pyflink/table/statement_set.py b/flink-python/pyflink/table/statement_set.py
index 9bf5dc6..005f384 100644
--- a/flink-python/pyflink/table/statement_set.py
+++ b/flink-python/pyflink/table/statement_set.py
@@ -16,6 +16,7 @@
 # limitations under the License.
 ################################################################################
 
+from pyflink.table.table_result import TableResult
 from pyflink.util.utils import to_j_explain_detail_arr
 
 __all__ = ['StatementSet']
@@ -34,8 +35,9 @@ class StatementSet(object):
 
     """
 
-    def __init__(self, _j_statement_set):
+    def __init__(self, _j_statement_set, t_env):
         self._j_statement_set = _j_statement_set
+        self._t_env = t_env
 
     def add_insert_sql(self, stmt):
         """
@@ -89,5 +91,5 @@ class StatementSet(object):
 
         :return: execution result.
         """
-        # TODO convert java TableResult to python TableResult once FLINK-17303 is finished
-        return self._j_statement_set.execute()
+        self._t_env._before_execute()
+        return TableResult(self._j_statement_set.execute())
diff --git a/flink-python/pyflink/table/table.py b/flink-python/pyflink/table/table.py
index f5b5687..7174006 100644
--- a/flink-python/pyflink/table/table.py
+++ b/flink-python/pyflink/table/table.py
@@ -22,6 +22,7 @@ from py4j.java_gateway import get_method
 
 from pyflink.java_gateway import get_gateway
 from pyflink.table.serializers import ArrowSerializer
+from pyflink.table.table_result import TableResult
 from pyflink.table.table_schema import TableSchema
 from pyflink.table.types import create_arrow_schema
 from pyflink.table.utils import tz_convert_from_internal
@@ -62,8 +63,9 @@ class Table(object):
     the expression syntax.
     """
 
-    def __init__(self, j_table):
+    def __init__(self, j_table, t_env):
         self._j_table = j_table
+        self._t_env = t_env
 
     def select(self, fields):
         """
@@ -80,7 +82,7 @@ class Table(object):
         :return: The result table.
         :rtype: pyflink.table.Table
         """
-        return Table(self._j_table.select(fields))
+        return Table(self._j_table.select(fields), self._t_env)
 
     def alias(self, field, *fields):
         """
@@ -101,7 +103,7 @@ class Table(object):
         """
         gateway = get_gateway()
         extra_fields = to_jarray(gateway.jvm.String, fields)
-        return Table(get_method(self._j_table, "as")(field, extra_fields))
+        return Table(get_method(self._j_table, "as")(field, extra_fields), self._t_env)
 
     def filter(self, predicate):
         """
@@ -118,7 +120,7 @@ class Table(object):
         :return: The result table.
         :rtype: pyflink.table.Table
         """
-        return Table(self._j_table.filter(predicate))
+        return Table(self._j_table.filter(predicate), self._t_env)
 
     def where(self, predicate):
         """
@@ -135,7 +137,7 @@ class Table(object):
         :return: The result table.
         :rtype: pyflink.table.Table
         """
-        return Table(self._j_table.where(predicate))
+        return Table(self._j_table.where(predicate), self._t_env)
 
     def group_by(self, fields):
         """
@@ -152,7 +154,7 @@ class Table(object):
         :return: The grouped table.
         :rtype: pyflink.table.GroupedTable
         """
-        return GroupedTable(self._j_table.groupBy(fields))
+        return GroupedTable(self._j_table.groupBy(fields), self._t_env)
 
     def distinct(self):
         """
@@ -166,7 +168,7 @@ class Table(object):
         :return: The result table.
         :rtype: pyflink.table.Table
         """
-        return Table(self._j_table.distinct())
+        return Table(self._j_table.distinct(), self._t_env)
 
     def join(self, right, join_predicate=None):
         """
@@ -193,9 +195,9 @@ class Table(object):
         :rtype: pyflink.table.Table
         """
         if join_predicate is not None:
-            return Table(self._j_table.join(right._j_table, join_predicate))
+            return Table(self._j_table.join(right._j_table, join_predicate), self._t_env)
         else:
-            return Table(self._j_table.join(right._j_table))
+            return Table(self._j_table.join(right._j_table), self._t_env)
 
     def left_outer_join(self, right, join_predicate=None):
         """
@@ -222,9 +224,9 @@ class Table(object):
         :rtype: pyflink.table.Table
         """
         if join_predicate is None:
-            return Table(self._j_table.leftOuterJoin(right._j_table))
+            return Table(self._j_table.leftOuterJoin(right._j_table), self._t_env)
         else:
-            return Table(self._j_table.leftOuterJoin(right._j_table, join_predicate))
+            return Table(self._j_table.leftOuterJoin(right._j_table, join_predicate), self._t_env)
 
     def right_outer_join(self, right, join_predicate):
         """
@@ -249,7 +251,7 @@ class Table(object):
         :return: The result table.
         :rtype: pyflink.table.Table
         """
-        return Table(self._j_table.rightOuterJoin(right._j_table, join_predicate))
+        return Table(self._j_table.rightOuterJoin(right._j_table, join_predicate), self._t_env)
 
     def full_outer_join(self, right, join_predicate):
         """
@@ -274,7 +276,7 @@ class Table(object):
         :return: The result table.
         :rtype: pyflink.table.Table
         """
-        return Table(self._j_table.fullOuterJoin(right._j_table, join_predicate))
+        return Table(self._j_table.fullOuterJoin(right._j_table, join_predicate), self._t_env)
 
     def join_lateral(self, table_function_call, join_predicate=None):
         """
@@ -297,9 +299,10 @@ class Table(object):
         :rtype: pyflink.table.Table
         """
         if join_predicate is None:
-            return Table(self._j_table.joinLateral(table_function_call))
+            return Table(self._j_table.joinLateral(table_function_call), self._t_env)
         else:
-            return Table(self._j_table.joinLateral(table_function_call, join_predicate))
+            return Table(self._j_table.joinLateral(table_function_call, join_predicate),
+                         self._t_env)
 
     def left_outer_join_lateral(self, table_function_call, join_predicate=None):
         """
@@ -323,9 +326,10 @@ class Table(object):
         :rtype: pyflink.table.Table
         """
         if join_predicate is None:
-            return Table(self._j_table.leftOuterJoinLateral(table_function_call))
+            return Table(self._j_table.leftOuterJoinLateral(table_function_call), self._t_env)
         else:
-            return Table(self._j_table.leftOuterJoinLateral(table_function_call, join_predicate))
+            return Table(self._j_table.leftOuterJoinLateral(table_function_call, join_predicate),
+                         self._t_env)
 
     def minus(self, right):
         """
@@ -348,7 +352,7 @@ class Table(object):
         :return: The result table.
         :rtype: pyflink.table.Table
         """
-        return Table(self._j_table.minus(right._j_table))
+        return Table(self._j_table.minus(right._j_table), self._t_env)
 
     def minus_all(self, right):
         """
@@ -372,7 +376,7 @@ class Table(object):
         :return: The result table.
         :rtype: pyflink.table.Table
         """
-        return Table(self._j_table.minusAll(right._j_table))
+        return Table(self._j_table.minusAll(right._j_table), self._t_env)
 
     def union(self, right):
         """
@@ -393,7 +397,7 @@ class Table(object):
         :return: The result table.
         :rtype: pyflink.table.Table
         """
-        return Table(self._j_table.union(right._j_table))
+        return Table(self._j_table.union(right._j_table), self._t_env)
 
     def union_all(self, right):
         """
@@ -414,7 +418,7 @@ class Table(object):
         :return: The result table.
         :rtype: pyflink.table.Table
         """
-        return Table(self._j_table.unionAll(right._j_table))
+        return Table(self._j_table.unionAll(right._j_table), self._t_env)
 
     def intersect(self, right):
         """
@@ -438,7 +442,7 @@ class Table(object):
         :return: The result table.
         :rtype: pyflink.table.Table
         """
-        return Table(self._j_table.intersect(right._j_table))
+        return Table(self._j_table.intersect(right._j_table), self._t_env)
 
     def intersect_all(self, right):
         """
@@ -462,7 +466,7 @@ class Table(object):
         :return: The result table.
         :rtype: pyflink.table.Table
         """
-        return Table(self._j_table.intersectAll(right._j_table))
+        return Table(self._j_table.intersectAll(right._j_table), self._t_env)
 
     def order_by(self, fields):
         """
@@ -479,7 +483,7 @@ class Table(object):
         :return: The result table.
         :rtype: pyflink.table.Table
         """
-        return Table(self._j_table.orderBy(fields))
+        return Table(self._j_table.orderBy(fields), self._t_env)
 
     def offset(self, offset):
         """
@@ -502,7 +506,7 @@ class Table(object):
         :return: The result table.
         :rtype: pyflink.table.Table
         """
-        return Table(self._j_table.offset(offset))
+        return Table(self._j_table.offset(offset), self._t_env)
 
     def fetch(self, fetch):
         """
@@ -529,7 +533,7 @@ class Table(object):
         :return: The result table.
         :rtype: pyflink.table.Table
         """
-        return Table(self._j_table.fetch(fetch))
+        return Table(self._j_table.fetch(fetch), self._t_env)
 
     def window(self, window):
         """
@@ -566,7 +570,7 @@ class Table(object):
         :return: A group windowed table.
         :rtype: GroupWindowedTable
         """
-        return GroupWindowedTable(self._j_table.window(window._java_window))
+        return GroupWindowedTable(self._j_table.window(window._java_window), self._t_env)
 
     def over_window(self, *over_windows):
         """
@@ -600,7 +604,7 @@ class Table(object):
         gateway = get_gateway()
         window_array = to_jarray(gateway.jvm.OverWindow,
                                  [item._java_over_window for item in over_windows])
-        return OverWindowedTable(self._j_table.window(window_array))
+        return OverWindowedTable(self._j_table.window(window_array), self._t_env)
 
     def add_columns(self, fields):
         """
@@ -618,7 +622,7 @@ class Table(object):
         :return: The result table.
         :rtype: pyflink.table.Table
         """
-        return Table(self._j_table.addColumns(fields))
+        return Table(self._j_table.addColumns(fields), self._t_env)
 
     def add_or_replace_columns(self, fields):
         """
@@ -637,7 +641,7 @@ class Table(object):
         :return: The result table.
         :rtype: pyflink.table.Table
         """
-        return Table(self._j_table.addOrReplaceColumns(fields))
+        return Table(self._j_table.addOrReplaceColumns(fields), self._t_env)
 
     def rename_columns(self, fields):
         """
@@ -654,7 +658,7 @@ class Table(object):
         :return: The result table.
         :rtype: pyflink.table.Table
         """
-        return Table(self._j_table.renameColumns(fields))
+        return Table(self._j_table.renameColumns(fields), self._t_env)
 
     def drop_columns(self, fields):
         """
@@ -670,7 +674,7 @@ class Table(object):
         :return: The result table.
         :rtype: pyflink.table.Table
         """
-        return Table(self._j_table.dropColumns(fields))
+        return Table(self._j_table.dropColumns(fields), self._t_env)
 
     def insert_into(self, table_path):
         """
@@ -709,6 +713,7 @@ class Table(object):
 
         :return: the result pandas DataFrame.
         """
+        self._t_env._before_execute()
         gateway = get_gateway()
         max_arrow_batch_size = self._j_table.getTableEnvironment().getConfig().getConfiguration()\
             .getInteger(gateway.jvm.org.apache.flink.python.PythonOptions.MAX_ARROW_BATCH_SIZE)
@@ -770,8 +775,8 @@ class Table(object):
         :type overwrite: bool
         :return: The table result.
         """
-        # TODO convert java TableResult to python TableResult once FLINK-17303 is finished
-        self._j_table.executeInsert(table_path, overwrite)
+        self._t_env._before_execute()
+        return TableResult(self._j_table.executeInsert(table_path, overwrite))
 
     def execute(self):
         """
@@ -784,8 +789,8 @@ class Table(object):
 
         :return: The content of the table.
         """
-        # TODO convert java TableResult to python TableResult once FLINK-17303 is finished
-        self._j_table.execute()
+        self._t_env._before_execute()
+        return TableResult(self._j_table.execute())
 
     def explain(self, *extra_details):
         """
@@ -809,8 +814,9 @@ class GroupedTable(object):
     A table that has been grouped on a set of grouping keys.
     """
 
-    def __init__(self, java_table):
+    def __init__(self, java_table, t_env):
         self._j_table = java_table
+        self._t_env = t_env
 
     def select(self, fields):
         """
@@ -828,7 +834,7 @@ class GroupedTable(object):
         :return: The result table.
         :rtype: pyflink.table.Table
         """
-        return Table(self._j_table.select(fields))
+        return Table(self._j_table.select(fields), self._t_env)
 
 
 class GroupWindowedTable(object):
@@ -836,8 +842,9 @@ class GroupWindowedTable(object):
     A table that has been windowed for :class:`~pyflink.table.GroupWindow`.
     """
 
-    def __init__(self, java_group_windowed_table):
+    def __init__(self, java_group_windowed_table, t_env):
         self._j_table = java_group_windowed_table
+        self._t_env = t_env
 
     def group_by(self, fields):
         """
@@ -861,7 +868,7 @@ class GroupWindowedTable(object):
         :return: A window grouped table.
         :rtype: pyflink.table.WindowGroupedTable
         """
-        return WindowGroupedTable(self._j_table.groupBy(fields))
+        return WindowGroupedTable(self._j_table.groupBy(fields), self._t_env)
 
 
 class WindowGroupedTable(object):
@@ -869,8 +876,9 @@ class WindowGroupedTable(object):
     A table that has been windowed and grouped for :class:`~pyflink.table.window.GroupWindow`.
     """
 
-    def __init__(self, java_window_grouped_table):
+    def __init__(self, java_window_grouped_table, t_env):
         self._j_table = java_window_grouped_table
+        self._t_env = t_env
 
     def select(self, fields):
         """
@@ -888,7 +896,7 @@ class WindowGroupedTable(object):
         :return: The result table.
         :rtype: pyflink.table.Table
         """
-        return Table(self._j_table.select(fields))
+        return Table(self._j_table.select(fields), self._t_env)
 
 
 class OverWindowedTable(object):
@@ -900,8 +908,9 @@ class OverWindowedTable(object):
     its neighboring rows.
     """
 
-    def __init__(self, java_over_windowed_table):
+    def __init__(self, java_over_windowed_table, t_env):
         self._j_table = java_over_windowed_table
+        self._t_env = t_env
 
     def select(self, fields):
         """
@@ -919,4 +928,4 @@ class OverWindowedTable(object):
         :return: The result table.
         :rtype: pyflink.table.Table
         """
-        return Table(self._j_table.select(fields))
+        return Table(self._j_table.select(fields), self._t_env)
diff --git a/flink-python/pyflink/table/table_environment.py b/flink-python/pyflink/table/table_environment.py
index 1d2be8f..3e7b70b 100644
--- a/flink-python/pyflink/table/table_environment.py
+++ b/flink-python/pyflink/table/table_environment.py
@@ -117,7 +117,7 @@ class TableEnvironment(object):
         :return: The result table.
         :rtype: pyflink.table.Table
         """
-        return Table(self._j_tenv.fromTableSource(table_source._j_table_source))
+        return Table(self._j_tenv.fromTableSource(table_source._j_table_source), self)
 
     def register_catalog(self, catalog_name, catalog):
         """
@@ -251,7 +251,7 @@ class TableEnvironment(object):
         gateway = get_gateway()
         j_table_paths = utils.to_jarray(gateway.jvm.String, table_path)
         j_table = self._j_tenv.scan(j_table_paths)
-        return Table(j_table)
+        return Table(j_table, self)
 
     def from_path(self, path):
         """
@@ -289,7 +289,7 @@ class TableEnvironment(object):
         .. seealso:: :func:`use_database`
         .. versionadded:: 1.10.0
         """
-        return Table(get_method(self._j_tenv, "from")(path))
+        return Table(get_method(self._j_tenv, "from")(path), self)
 
     def insert_into(self, target_path, table):
         """
@@ -518,7 +518,7 @@ class TableEnvironment(object):
         :rtype: pyflink.table.Table
         """
         j_table = self._j_tenv.sqlQuery(query)
-        return Table(j_table)
+        return Table(j_table, self)
 
     def execute_sql(self, stmt):
         """
@@ -532,6 +532,7 @@ class TableEnvironment(object):
                 the affected row count for `DML` (-1 means unknown),
                 or a string message ("OK") for other statements.
         """
+        self._before_execute()
         return TableResult(self._j_tenv.executeSql(stmt))
 
     def create_statement_set(self):
@@ -544,7 +545,7 @@ class TableEnvironment(object):
         :rtype: pyflink.table.StatementSet
         """
         _j_statement_set = self._j_tenv.createStatementSet()
-        return StatementSet(_j_statement_set)
+        return StatementSet(_j_statement_set, self)
 
     def sql_update(self, stmt):
         """
@@ -1041,11 +1042,7 @@ class TableEnvironment(object):
         """
         warnings.warn("Deprecated in 1.11. Use execute_sql for single sink, "
                       "use create_statement_set for multiple sinks.", DeprecationWarning)
-        jvm = get_gateway().jvm
-        jars_key = jvm.org.apache.flink.configuration.PipelineOptions.JARS.key()
-        classpaths_key = jvm.org.apache.flink.configuration.PipelineOptions.CLASSPATHS.key()
-        self._add_jars_to_j_env_config(jars_key)
-        self._add_jars_to_j_env_config(classpaths_key)
+        self._before_execute()
         return JobExecutionResult(self._j_tenv.execute(job_name))
 
     def from_elements(self, elements, schema=None, verify_schema=True):
@@ -1181,7 +1178,7 @@ class TableEnvironment(object):
             j_table_source = PythonInputFormatTableSource(
                 j_input_format, row_type_info)
 
-            return Table(self._j_tenv.fromTableSource(j_table_source))
+            return Table(self._j_tenv.fromTableSource(j_table_source), self)
         finally:
             os.unlink(temp_file.name)
 
@@ -1262,7 +1259,7 @@ class TableEnvironment(object):
             j_arrow_table_source = \
                 jvm.org.apache.flink.table.runtime.arrow.ArrowUtils.createArrowTableSource(
                     data_type, temp_file.name)
-            return Table(self._j_tenv.fromTableSource(j_arrow_table_source))
+            return Table(self._j_tenv.fromTableSource(j_arrow_table_source), self)
         finally:
             os.unlink(temp_file.name)
 
@@ -1281,7 +1278,7 @@ class TableEnvironment(object):
             jar_urls_set = set([jvm.java.net.URL(url).toString() for url in jar_urls.split(";")])
             j_configuration = get_j_env_configuration(self)
             if j_configuration.containsKey(config_key):
-                for url in j_configuration.getString(config_key).split(";"):
+                for url in j_configuration.getString(config_key, "").split(";"):
                     jar_urls_set.add(url)
             j_configuration.setString(config_key, ";".join(jar_urls_set))
 
@@ -1325,6 +1322,13 @@ class TableEnvironment(object):
         function_catalog = function_catalog_field.get(self._j_tenv)
         return function_catalog
 
+    def _before_execute(self):
+        jvm = get_gateway().jvm
+        jars_key = jvm.org.apache.flink.configuration.PipelineOptions.JARS.key()
+        classpaths_key = jvm.org.apache.flink.configuration.PipelineOptions.CLASSPATHS.key()
+        self._add_jars_to_j_env_config(jars_key)
+        self._add_jars_to_j_env_config(classpaths_key)
+
 
 class StreamTableEnvironment(TableEnvironment):
 
diff --git a/flink-python/pyflink/table/tests/test_table_environment_api.py b/flink-python/pyflink/table/tests/test_table_environment_api.py
index 87215e3..8a2b7fc 100644
--- a/flink-python/pyflink/table/tests/test_table_environment_api.py
+++ b/flink-python/pyflink/table/tests/test_table_environment_api.py
@@ -368,12 +368,101 @@ class StreamTableEnvironmentTests(TableEnvironmentTest, PyFlinkStreamTableTestCa
         self.assert_equals(results, ['2,hi,hello\n', '3,hello,hello\n'])
 
     def test_set_jars(self):
-        self.verify_set_java_dependencies("pipeline.jars")
+        self.verify_set_java_dependencies("pipeline.jars", self.execute_with_t_env)
+
+    def test_set_jars_with_execute_sql(self):
+        self.verify_set_java_dependencies("pipeline.jars", self.execute_with_execute_sql)
+
+    def test_set_jars_with_statement_set(self):
+        self.verify_set_java_dependencies("pipeline.jars", self.execute_with_statement_set)
+
+    def test_set_jars_with_table(self):
+        self.verify_set_java_dependencies("pipeline.jars", self.execute_with_table)
+
+    def test_set_jars_with_table_execute_insert(self):
+        self.verify_set_java_dependencies("pipeline.jars", self.execute_with_table_execute_insert)
+
+    def test_set_jars_with_table_to_pandas(self):
+        self.verify_set_java_dependencies("pipeline.jars", self.execute_with_table_to_pandas)
 
     def test_set_classpaths(self):
-        self.verify_set_java_dependencies("pipeline.classpaths")
+        self.verify_set_java_dependencies("pipeline.classpaths", self.execute_with_t_env)
+
+    def test_set_classpaths_with_execute_sql(self):
+        self.verify_set_java_dependencies("pipeline.classpaths", self.execute_with_execute_sql)
+
+    def test_set_classpaths_with_statement_set(self):
+        self.verify_set_java_dependencies("pipeline.classpaths", self.execute_with_statement_set)
+
+    def test_set_classpaths_with_table(self):
+        self.verify_set_java_dependencies("pipeline.classpaths", self.execute_with_table)
+
+    def test_set_classpaths_with_table_execute_insert(self):
+        self.verify_set_java_dependencies(
+            "pipeline.classpaths", self.execute_with_table_execute_insert)
+
+    def test_set_classpaths_with_table_to_pandas(self):
+        self.verify_set_java_dependencies("pipeline.classpaths", self.execute_with_table_to_pandas)
+
+    def execute_with_t_env(self, t_env):
+        source = t_env.from_elements([(1, "Hi"), (2, "Hello")], ["a", "b"])
+        source.select("func1(a, b), func2(a, b)").insert_into("sink")
+        t_env.execute("test")
+        actual = source_sink_utils.results()
+        expected = ['1 and Hi,1 or Hi', '2 and Hello,2 or Hello']
+        self.assert_equals(actual, expected)
+
+    @staticmethod
+    def execute_with_execute_sql(t_env):
+        source = t_env.from_elements([(1, "Hi"), (2, "Hello")], ["a", "b"])
+        t_env.create_temporary_view("source", source)
+        t_env.execute_sql("select func1(a, b), func2(a, b) from source") \
+            .get_job_client() \
+            .get_job_execution_result(
+                get_gateway().jvm.Thread.currentThread().getContextClassLoader()) \
+            .result()
+
+    def execute_with_statement_set(self, t_env):
+        source = t_env.from_elements([(1, "Hi"), (2, "Hello")], ["a", "b"])
+        result = source.select("func1(a, b), func2(a, b)")
+        t_env.create_statement_set().add_insert("sink", result).execute() \
+            .get_job_client() \
+            .get_job_execution_result(
+                get_gateway().jvm.Thread.currentThread().getContextClassLoader()) \
+            .result()
+        actual = source_sink_utils.results()
+        expected = ['1 and Hi,1 or Hi', '2 and Hello,2 or Hello']
+        self.assert_equals(actual, expected)
+
+    @staticmethod
+    def execute_with_table(t_env):
+        source = t_env.from_elements([(1, "Hi"), (2, "Hello")], ["a", "b"])
+        result = source.select("func1(a, b), func2(a, b)")
+        result.execute() \
+            .get_job_client() \
+            .get_job_execution_result(
+                get_gateway().jvm.Thread.currentThread().getContextClassLoader()) \
+            .result()
+
+    def execute_with_table_execute_insert(self, t_env):
+        source = t_env.from_elements([(1, "Hi"), (2, "Hello")], ["a", "b"])
+        result = source.select("func1(a, b), func2(a, b)")
+        result.execute_insert("sink") \
+            .get_job_client() \
+            .get_job_execution_result(
+                get_gateway().jvm.Thread.currentThread().getContextClassLoader()) \
+            .result()
+        actual = source_sink_utils.results()
+        expected = ['1 and Hi,1 or Hi', '2 and Hello,2 or Hello']
+        self.assert_equals(actual, expected)
 
-    def verify_set_java_dependencies(self, config_key):
+    @staticmethod
+    def execute_with_table_to_pandas(t_env):
+        source = t_env.from_elements([(1, "Hi"), (2, "Hello")], ["a", "b"])
+        result = source.select("func1(a, b), func2(a, b)")
+        result.to_pandas()
+
+    def verify_set_java_dependencies(self, config_key, executor):
         original_class_loader = \
             get_gateway().jvm.Thread.currentThread().getContextClassLoader()
         try:
@@ -397,17 +486,13 @@ class StreamTableEnvironmentTests(TableEnvironmentTest, PyFlinkStreamTableTestCa
 
             self.assertEqual(first_class_loader, second_class_loader)
 
-            source = self.t_env.from_elements([(1, "Hi"), (2, "Hello")], ["a", "b"])
             self.t_env.register_java_function("func1", func1_class_name)
             self.t_env.register_java_function("func2", func2_class_name)
             table_sink = source_sink_utils.TestAppendSink(
                 ["a", "b"], [DataTypes.STRING(), DataTypes.STRING()])
             self.t_env.register_table_sink("sink", table_sink)
-            source.select("func1(a, b), func2(a, b)").insert_into("sink")
-            self.t_env.execute("test")
-            actual = source_sink_utils.results()
-            expected = ['1 and Hi,1 or Hi', '2 and Hello,2 or Hello']
-            self.assert_equals(actual, expected)
+
+            executor(self.t_env)
         finally:
             get_gateway().jvm.Thread.currentThread().setContextClassLoader(original_class_loader)