You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by di...@apache.org on 2020/05/29 01:46:34 UTC
[flink] branch master updated: [FLINK-17946][python] Fix the bug
that the config option 'pipeline.jars' doesn't work.
This is an automated email from the ASF dual-hosted git repository.
dianfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new cda25e5 [FLINK-17946][python] Fix the bug that the config option 'pipeline.jars' doesn't work.
cda25e5 is described below
commit cda25e5d185cf04c778ed4d173f9db6dfe3522ad
Author: Wei Zhong <we...@gmail.com>
AuthorDate: Wed May 27 21:09:27 2020 +0800
[FLINK-17946][python] Fix the bug that the config option 'pipeline.jars' doesn't work.
This closes #12372.
---
docs/dev/table/python/common_questions.md | 8 +-
docs/dev/table/python/common_questions.zh.md | 8 +-
flink-python/pyflink/ml/api/base.py | 2 +-
flink-python/pyflink/table/statement_set.py | 8 +-
flink-python/pyflink/table/table.py | 97 ++++++++++---------
flink-python/pyflink/table/table_environment.py | 30 +++---
.../table/tests/test_table_environment_api.py | 103 +++++++++++++++++++--
7 files changed, 178 insertions(+), 78 deletions(-)
diff --git a/docs/dev/table/python/common_questions.md b/docs/dev/table/python/common_questions.md
index 89da24c..f0858e2 100644
--- a/docs/dev/table/python/common_questions.md
+++ b/docs/dev/table/python/common_questions.md
@@ -69,11 +69,11 @@ A PyFlink job may depend on jar files, i.e. connectors, Java UDFs, etc.
You can specify the dependencies with the following Python Table APIs or through <a href="{{ site.baseurl }}/ops/cli.html#usage">command-line arguments</a> directly when submitting the job.
{% highlight python %}
-# NOTE: Only local file URLs (start with "file://") are supported.
-table_env.get_config().set_configuration("pipeline.jars", "file:///my/jar/path/connector.jar;file:///my/jar/path/udf.jar")
+# NOTE: Only local file URLs (start with "file:") are supported.
+table_env.get_config().get_configuration().set_string("pipeline.jars", "file:///my/jar/path/connector.jar;file:///my/jar/path/udf.jar")
-# NOTE: The Paths must specify a protocol (e.g. file://) and users should ensure that the URLs are accessible on both the client and the cluster.
-table_env.get_config().set_configuration("pipeline.classpaths", "file:///my/jar/path/connector.jar;file:///my/jar/path/udf.jar")
+# NOTE: The Paths must specify a protocol (e.g. "file") and users should ensure that the URLs are accessible on both the client and the cluster.
+table_env.get_config().get_configuration().set_string("pipeline.classpaths", "file:///my/jar/path/connector.jar;file:///my/jar/path/udf.jar")
{% endhighlight %}
For details about the APIs of adding Java dependency, you can refer to [the relevant documentation]({{ site.baseurl }}/dev/table/python/dependency_management.html#java-dependency)
diff --git a/docs/dev/table/python/common_questions.zh.md b/docs/dev/table/python/common_questions.zh.md
index f11f83c..58dbdc6 100644
--- a/docs/dev/table/python/common_questions.zh.md
+++ b/docs/dev/table/python/common_questions.zh.md
@@ -69,11 +69,11 @@ A PyFlink job may depend on jar files, i.e. connectors, Java UDFs, etc.
You can specify the dependencies with the following Python Table APIs or through <a href="{{ site.baseurl }}/zh/ops/cli.html#usage">command-line arguments</a> directly when submitting the job.
{% highlight python %}
-# NOTE: Only local file URLs (start with "file://") are supported.
-table_env.get_config().set_configuration("pipeline.jars", "file:///my/jar/path/connector.jar;file:///my/jar/path/udf.jar")
+# NOTE: Only local file URLs (start with "file:") are supported.
+table_env.get_config().get_configuration().set_string("pipeline.jars", "file:///my/jar/path/connector.jar;file:///my/jar/path/udf.jar")
-# NOTE: The Paths must specify a protocol (e.g. file://) and users should ensure that the URLs are accessible on both the client and the cluster.
-table_env.get_config().set_configuration("pipeline.classpaths", "file:///my/jar/path/connector.jar;file:///my/jar/path/udf.jar")
+# NOTE: The Paths must specify a protocol (e.g. "file") and users should ensure that the URLs are accessible on both the client and the cluster.
+table_env.get_config().get_configuration().set_string("pipeline.classpaths", "file:///my/jar/path/connector.jar;file:///my/jar/path/udf.jar")
{% endhighlight %}
For details about the APIs of adding Java dependency, you can refer to [the relevant documentation]({{ site.baseurl }}/zh/dev/table/python/dependency_management.html#java-dependency)
diff --git a/flink-python/pyflink/ml/api/base.py b/flink-python/pyflink/ml/api/base.py
index 3674e6c..6b7c39d 100644
--- a/flink-python/pyflink/ml/api/base.py
+++ b/flink-python/pyflink/ml/api/base.py
@@ -110,7 +110,7 @@ class JavaTransformer(Transformer):
:returns: the transformed table
"""
self._convert_params_to_java(self._j_obj)
- return Table(self._j_obj.transform(table_env._j_tenv, table._j_table))
+ return Table(self._j_obj.transform(table_env._j_tenv, table._j_table), table_env)
class Model(Transformer):
diff --git a/flink-python/pyflink/table/statement_set.py b/flink-python/pyflink/table/statement_set.py
index 9bf5dc6..005f384 100644
--- a/flink-python/pyflink/table/statement_set.py
+++ b/flink-python/pyflink/table/statement_set.py
@@ -16,6 +16,7 @@
# limitations under the License.
################################################################################
+from pyflink.table.table_result import TableResult
from pyflink.util.utils import to_j_explain_detail_arr
__all__ = ['StatementSet']
@@ -34,8 +35,9 @@ class StatementSet(object):
"""
- def __init__(self, _j_statement_set):
+ def __init__(self, _j_statement_set, t_env):
self._j_statement_set = _j_statement_set
+ self._t_env = t_env
def add_insert_sql(self, stmt):
"""
@@ -89,5 +91,5 @@ class StatementSet(object):
:return: execution result.
"""
- # TODO convert java TableResult to python TableResult once FLINK-17303 is finished
- return self._j_statement_set.execute()
+ self._t_env._before_execute()
+ return TableResult(self._j_statement_set.execute())
diff --git a/flink-python/pyflink/table/table.py b/flink-python/pyflink/table/table.py
index f5b5687..7174006 100644
--- a/flink-python/pyflink/table/table.py
+++ b/flink-python/pyflink/table/table.py
@@ -22,6 +22,7 @@ from py4j.java_gateway import get_method
from pyflink.java_gateway import get_gateway
from pyflink.table.serializers import ArrowSerializer
+from pyflink.table.table_result import TableResult
from pyflink.table.table_schema import TableSchema
from pyflink.table.types import create_arrow_schema
from pyflink.table.utils import tz_convert_from_internal
@@ -62,8 +63,9 @@ class Table(object):
the expression syntax.
"""
- def __init__(self, j_table):
+ def __init__(self, j_table, t_env):
self._j_table = j_table
+ self._t_env = t_env
def select(self, fields):
"""
@@ -80,7 +82,7 @@ class Table(object):
:return: The result table.
:rtype: pyflink.table.Table
"""
- return Table(self._j_table.select(fields))
+ return Table(self._j_table.select(fields), self._t_env)
def alias(self, field, *fields):
"""
@@ -101,7 +103,7 @@ class Table(object):
"""
gateway = get_gateway()
extra_fields = to_jarray(gateway.jvm.String, fields)
- return Table(get_method(self._j_table, "as")(field, extra_fields))
+ return Table(get_method(self._j_table, "as")(field, extra_fields), self._t_env)
def filter(self, predicate):
"""
@@ -118,7 +120,7 @@ class Table(object):
:return: The result table.
:rtype: pyflink.table.Table
"""
- return Table(self._j_table.filter(predicate))
+ return Table(self._j_table.filter(predicate), self._t_env)
def where(self, predicate):
"""
@@ -135,7 +137,7 @@ class Table(object):
:return: The result table.
:rtype: pyflink.table.Table
"""
- return Table(self._j_table.where(predicate))
+ return Table(self._j_table.where(predicate), self._t_env)
def group_by(self, fields):
"""
@@ -152,7 +154,7 @@ class Table(object):
:return: The grouped table.
:rtype: pyflink.table.GroupedTable
"""
- return GroupedTable(self._j_table.groupBy(fields))
+ return GroupedTable(self._j_table.groupBy(fields), self._t_env)
def distinct(self):
"""
@@ -166,7 +168,7 @@ class Table(object):
:return: The result table.
:rtype: pyflink.table.Table
"""
- return Table(self._j_table.distinct())
+ return Table(self._j_table.distinct(), self._t_env)
def join(self, right, join_predicate=None):
"""
@@ -193,9 +195,9 @@ class Table(object):
:rtype: pyflink.table.Table
"""
if join_predicate is not None:
- return Table(self._j_table.join(right._j_table, join_predicate))
+ return Table(self._j_table.join(right._j_table, join_predicate), self._t_env)
else:
- return Table(self._j_table.join(right._j_table))
+ return Table(self._j_table.join(right._j_table), self._t_env)
def left_outer_join(self, right, join_predicate=None):
"""
@@ -222,9 +224,9 @@ class Table(object):
:rtype: pyflink.table.Table
"""
if join_predicate is None:
- return Table(self._j_table.leftOuterJoin(right._j_table))
+ return Table(self._j_table.leftOuterJoin(right._j_table), self._t_env)
else:
- return Table(self._j_table.leftOuterJoin(right._j_table, join_predicate))
+ return Table(self._j_table.leftOuterJoin(right._j_table, join_predicate), self._t_env)
def right_outer_join(self, right, join_predicate):
"""
@@ -249,7 +251,7 @@ class Table(object):
:return: The result table.
:rtype: pyflink.table.Table
"""
- return Table(self._j_table.rightOuterJoin(right._j_table, join_predicate))
+ return Table(self._j_table.rightOuterJoin(right._j_table, join_predicate), self._t_env)
def full_outer_join(self, right, join_predicate):
"""
@@ -274,7 +276,7 @@ class Table(object):
:return: The result table.
:rtype: pyflink.table.Table
"""
- return Table(self._j_table.fullOuterJoin(right._j_table, join_predicate))
+ return Table(self._j_table.fullOuterJoin(right._j_table, join_predicate), self._t_env)
def join_lateral(self, table_function_call, join_predicate=None):
"""
@@ -297,9 +299,10 @@ class Table(object):
:rtype: pyflink.table.Table
"""
if join_predicate is None:
- return Table(self._j_table.joinLateral(table_function_call))
+ return Table(self._j_table.joinLateral(table_function_call), self._t_env)
else:
- return Table(self._j_table.joinLateral(table_function_call, join_predicate))
+ return Table(self._j_table.joinLateral(table_function_call, join_predicate),
+ self._t_env)
def left_outer_join_lateral(self, table_function_call, join_predicate=None):
"""
@@ -323,9 +326,10 @@ class Table(object):
:rtype: pyflink.table.Table
"""
if join_predicate is None:
- return Table(self._j_table.leftOuterJoinLateral(table_function_call))
+ return Table(self._j_table.leftOuterJoinLateral(table_function_call), self._t_env)
else:
- return Table(self._j_table.leftOuterJoinLateral(table_function_call, join_predicate))
+ return Table(self._j_table.leftOuterJoinLateral(table_function_call, join_predicate),
+ self._t_env)
def minus(self, right):
"""
@@ -348,7 +352,7 @@ class Table(object):
:return: The result table.
:rtype: pyflink.table.Table
"""
- return Table(self._j_table.minus(right._j_table))
+ return Table(self._j_table.minus(right._j_table), self._t_env)
def minus_all(self, right):
"""
@@ -372,7 +376,7 @@ class Table(object):
:return: The result table.
:rtype: pyflink.table.Table
"""
- return Table(self._j_table.minusAll(right._j_table))
+ return Table(self._j_table.minusAll(right._j_table), self._t_env)
def union(self, right):
"""
@@ -393,7 +397,7 @@ class Table(object):
:return: The result table.
:rtype: pyflink.table.Table
"""
- return Table(self._j_table.union(right._j_table))
+ return Table(self._j_table.union(right._j_table), self._t_env)
def union_all(self, right):
"""
@@ -414,7 +418,7 @@ class Table(object):
:return: The result table.
:rtype: pyflink.table.Table
"""
- return Table(self._j_table.unionAll(right._j_table))
+ return Table(self._j_table.unionAll(right._j_table), self._t_env)
def intersect(self, right):
"""
@@ -438,7 +442,7 @@ class Table(object):
:return: The result table.
:rtype: pyflink.table.Table
"""
- return Table(self._j_table.intersect(right._j_table))
+ return Table(self._j_table.intersect(right._j_table), self._t_env)
def intersect_all(self, right):
"""
@@ -462,7 +466,7 @@ class Table(object):
:return: The result table.
:rtype: pyflink.table.Table
"""
- return Table(self._j_table.intersectAll(right._j_table))
+ return Table(self._j_table.intersectAll(right._j_table), self._t_env)
def order_by(self, fields):
"""
@@ -479,7 +483,7 @@ class Table(object):
:return: The result table.
:rtype: pyflink.table.Table
"""
- return Table(self._j_table.orderBy(fields))
+ return Table(self._j_table.orderBy(fields), self._t_env)
def offset(self, offset):
"""
@@ -502,7 +506,7 @@ class Table(object):
:return: The result table.
:rtype: pyflink.table.Table
"""
- return Table(self._j_table.offset(offset))
+ return Table(self._j_table.offset(offset), self._t_env)
def fetch(self, fetch):
"""
@@ -529,7 +533,7 @@ class Table(object):
:return: The result table.
:rtype: pyflink.table.Table
"""
- return Table(self._j_table.fetch(fetch))
+ return Table(self._j_table.fetch(fetch), self._t_env)
def window(self, window):
"""
@@ -566,7 +570,7 @@ class Table(object):
:return: A group windowed table.
:rtype: GroupWindowedTable
"""
- return GroupWindowedTable(self._j_table.window(window._java_window))
+ return GroupWindowedTable(self._j_table.window(window._java_window), self._t_env)
def over_window(self, *over_windows):
"""
@@ -600,7 +604,7 @@ class Table(object):
gateway = get_gateway()
window_array = to_jarray(gateway.jvm.OverWindow,
[item._java_over_window for item in over_windows])
- return OverWindowedTable(self._j_table.window(window_array))
+ return OverWindowedTable(self._j_table.window(window_array), self._t_env)
def add_columns(self, fields):
"""
@@ -618,7 +622,7 @@ class Table(object):
:return: The result table.
:rtype: pyflink.table.Table
"""
- return Table(self._j_table.addColumns(fields))
+ return Table(self._j_table.addColumns(fields), self._t_env)
def add_or_replace_columns(self, fields):
"""
@@ -637,7 +641,7 @@ class Table(object):
:return: The result table.
:rtype: pyflink.table.Table
"""
- return Table(self._j_table.addOrReplaceColumns(fields))
+ return Table(self._j_table.addOrReplaceColumns(fields), self._t_env)
def rename_columns(self, fields):
"""
@@ -654,7 +658,7 @@ class Table(object):
:return: The result table.
:rtype: pyflink.table.Table
"""
- return Table(self._j_table.renameColumns(fields))
+ return Table(self._j_table.renameColumns(fields), self._t_env)
def drop_columns(self, fields):
"""
@@ -670,7 +674,7 @@ class Table(object):
:return: The result table.
:rtype: pyflink.table.Table
"""
- return Table(self._j_table.dropColumns(fields))
+ return Table(self._j_table.dropColumns(fields), self._t_env)
def insert_into(self, table_path):
"""
@@ -709,6 +713,7 @@ class Table(object):
:return: the result pandas DataFrame.
"""
+ self._t_env._before_execute()
gateway = get_gateway()
max_arrow_batch_size = self._j_table.getTableEnvironment().getConfig().getConfiguration()\
.getInteger(gateway.jvm.org.apache.flink.python.PythonOptions.MAX_ARROW_BATCH_SIZE)
@@ -770,8 +775,8 @@ class Table(object):
:type overwrite: bool
:return: The table result.
"""
- # TODO convert java TableResult to python TableResult once FLINK-17303 is finished
- self._j_table.executeInsert(table_path, overwrite)
+ self._t_env._before_execute()
+ return TableResult(self._j_table.executeInsert(table_path, overwrite))
def execute(self):
"""
@@ -784,8 +789,8 @@ class Table(object):
:return: The content of the table.
"""
- # TODO convert java TableResult to python TableResult once FLINK-17303 is finished
- self._j_table.execute()
+ self._t_env._before_execute()
+ return TableResult(self._j_table.execute())
def explain(self, *extra_details):
"""
@@ -809,8 +814,9 @@ class GroupedTable(object):
A table that has been grouped on a set of grouping keys.
"""
- def __init__(self, java_table):
+ def __init__(self, java_table, t_env):
self._j_table = java_table
+ self._t_env = t_env
def select(self, fields):
"""
@@ -828,7 +834,7 @@ class GroupedTable(object):
:return: The result table.
:rtype: pyflink.table.Table
"""
- return Table(self._j_table.select(fields))
+ return Table(self._j_table.select(fields), self._t_env)
class GroupWindowedTable(object):
@@ -836,8 +842,9 @@ class GroupWindowedTable(object):
A table that has been windowed for :class:`~pyflink.table.GroupWindow`.
"""
- def __init__(self, java_group_windowed_table):
+ def __init__(self, java_group_windowed_table, t_env):
self._j_table = java_group_windowed_table
+ self._t_env = t_env
def group_by(self, fields):
"""
@@ -861,7 +868,7 @@ class GroupWindowedTable(object):
:return: A window grouped table.
:rtype: pyflink.table.WindowGroupedTable
"""
- return WindowGroupedTable(self._j_table.groupBy(fields))
+ return WindowGroupedTable(self._j_table.groupBy(fields), self._t_env)
class WindowGroupedTable(object):
@@ -869,8 +876,9 @@ class WindowGroupedTable(object):
A table that has been windowed and grouped for :class:`~pyflink.table.window.GroupWindow`.
"""
- def __init__(self, java_window_grouped_table):
+ def __init__(self, java_window_grouped_table, t_env):
self._j_table = java_window_grouped_table
+ self._t_env = t_env
def select(self, fields):
"""
@@ -888,7 +896,7 @@ class WindowGroupedTable(object):
:return: The result table.
:rtype: pyflink.table.Table
"""
- return Table(self._j_table.select(fields))
+ return Table(self._j_table.select(fields), self._t_env)
class OverWindowedTable(object):
@@ -900,8 +908,9 @@ class OverWindowedTable(object):
its neighboring rows.
"""
- def __init__(self, java_over_windowed_table):
+ def __init__(self, java_over_windowed_table, t_env):
self._j_table = java_over_windowed_table
+ self._t_env = t_env
def select(self, fields):
"""
@@ -919,4 +928,4 @@ class OverWindowedTable(object):
:return: The result table.
:rtype: pyflink.table.Table
"""
- return Table(self._j_table.select(fields))
+ return Table(self._j_table.select(fields), self._t_env)
diff --git a/flink-python/pyflink/table/table_environment.py b/flink-python/pyflink/table/table_environment.py
index 1d2be8f..3e7b70b 100644
--- a/flink-python/pyflink/table/table_environment.py
+++ b/flink-python/pyflink/table/table_environment.py
@@ -117,7 +117,7 @@ class TableEnvironment(object):
:return: The result table.
:rtype: pyflink.table.Table
"""
- return Table(self._j_tenv.fromTableSource(table_source._j_table_source))
+ return Table(self._j_tenv.fromTableSource(table_source._j_table_source), self)
def register_catalog(self, catalog_name, catalog):
"""
@@ -251,7 +251,7 @@ class TableEnvironment(object):
gateway = get_gateway()
j_table_paths = utils.to_jarray(gateway.jvm.String, table_path)
j_table = self._j_tenv.scan(j_table_paths)
- return Table(j_table)
+ return Table(j_table, self)
def from_path(self, path):
"""
@@ -289,7 +289,7 @@ class TableEnvironment(object):
.. seealso:: :func:`use_database`
.. versionadded:: 1.10.0
"""
- return Table(get_method(self._j_tenv, "from")(path))
+ return Table(get_method(self._j_tenv, "from")(path), self)
def insert_into(self, target_path, table):
"""
@@ -518,7 +518,7 @@ class TableEnvironment(object):
:rtype: pyflink.table.Table
"""
j_table = self._j_tenv.sqlQuery(query)
- return Table(j_table)
+ return Table(j_table, self)
def execute_sql(self, stmt):
"""
@@ -532,6 +532,7 @@ class TableEnvironment(object):
the affected row count for `DML` (-1 means unknown),
or a string message ("OK") for other statements.
"""
+ self._before_execute()
return TableResult(self._j_tenv.executeSql(stmt))
def create_statement_set(self):
@@ -544,7 +545,7 @@ class TableEnvironment(object):
:rtype: pyflink.table.StatementSet
"""
_j_statement_set = self._j_tenv.createStatementSet()
- return StatementSet(_j_statement_set)
+ return StatementSet(_j_statement_set, self)
def sql_update(self, stmt):
"""
@@ -1041,11 +1042,7 @@ class TableEnvironment(object):
"""
warnings.warn("Deprecated in 1.11. Use execute_sql for single sink, "
"use create_statement_set for multiple sinks.", DeprecationWarning)
- jvm = get_gateway().jvm
- jars_key = jvm.org.apache.flink.configuration.PipelineOptions.JARS.key()
- classpaths_key = jvm.org.apache.flink.configuration.PipelineOptions.CLASSPATHS.key()
- self._add_jars_to_j_env_config(jars_key)
- self._add_jars_to_j_env_config(classpaths_key)
+ self._before_execute()
return JobExecutionResult(self._j_tenv.execute(job_name))
def from_elements(self, elements, schema=None, verify_schema=True):
@@ -1181,7 +1178,7 @@ class TableEnvironment(object):
j_table_source = PythonInputFormatTableSource(
j_input_format, row_type_info)
- return Table(self._j_tenv.fromTableSource(j_table_source))
+ return Table(self._j_tenv.fromTableSource(j_table_source), self)
finally:
os.unlink(temp_file.name)
@@ -1262,7 +1259,7 @@ class TableEnvironment(object):
j_arrow_table_source = \
jvm.org.apache.flink.table.runtime.arrow.ArrowUtils.createArrowTableSource(
data_type, temp_file.name)
- return Table(self._j_tenv.fromTableSource(j_arrow_table_source))
+ return Table(self._j_tenv.fromTableSource(j_arrow_table_source), self)
finally:
os.unlink(temp_file.name)
@@ -1281,7 +1278,7 @@ class TableEnvironment(object):
jar_urls_set = set([jvm.java.net.URL(url).toString() for url in jar_urls.split(";")])
j_configuration = get_j_env_configuration(self)
if j_configuration.containsKey(config_key):
- for url in j_configuration.getString(config_key).split(";"):
+ for url in j_configuration.getString(config_key, "").split(";"):
jar_urls_set.add(url)
j_configuration.setString(config_key, ";".join(jar_urls_set))
@@ -1325,6 +1322,13 @@ class TableEnvironment(object):
function_catalog = function_catalog_field.get(self._j_tenv)
return function_catalog
+ def _before_execute(self):
+ jvm = get_gateway().jvm
+ jars_key = jvm.org.apache.flink.configuration.PipelineOptions.JARS.key()
+ classpaths_key = jvm.org.apache.flink.configuration.PipelineOptions.CLASSPATHS.key()
+ self._add_jars_to_j_env_config(jars_key)
+ self._add_jars_to_j_env_config(classpaths_key)
+
class StreamTableEnvironment(TableEnvironment):
diff --git a/flink-python/pyflink/table/tests/test_table_environment_api.py b/flink-python/pyflink/table/tests/test_table_environment_api.py
index 87215e3..8a2b7fc 100644
--- a/flink-python/pyflink/table/tests/test_table_environment_api.py
+++ b/flink-python/pyflink/table/tests/test_table_environment_api.py
@@ -368,12 +368,101 @@ class StreamTableEnvironmentTests(TableEnvironmentTest, PyFlinkStreamTableTestCa
self.assert_equals(results, ['2,hi,hello\n', '3,hello,hello\n'])
def test_set_jars(self):
- self.verify_set_java_dependencies("pipeline.jars")
+ self.verify_set_java_dependencies("pipeline.jars", self.execute_with_t_env)
+
+ def test_set_jars_with_execute_sql(self):
+ self.verify_set_java_dependencies("pipeline.jars", self.execute_with_execute_sql)
+
+ def test_set_jars_with_statement_set(self):
+ self.verify_set_java_dependencies("pipeline.jars", self.execute_with_statement_set)
+
+ def test_set_jars_with_table(self):
+ self.verify_set_java_dependencies("pipeline.jars", self.execute_with_table)
+
+ def test_set_jars_with_table_execute_insert(self):
+ self.verify_set_java_dependencies("pipeline.jars", self.execute_with_table_execute_insert)
+
+ def test_set_jars_with_table_to_pandas(self):
+ self.verify_set_java_dependencies("pipeline.jars", self.execute_with_table_to_pandas)
def test_set_classpaths(self):
- self.verify_set_java_dependencies("pipeline.classpaths")
+ self.verify_set_java_dependencies("pipeline.classpaths", self.execute_with_t_env)
+
+ def test_set_classpaths_with_execute_sql(self):
+ self.verify_set_java_dependencies("pipeline.classpaths", self.execute_with_execute_sql)
+
+ def test_set_classpaths_with_statement_set(self):
+ self.verify_set_java_dependencies("pipeline.classpaths", self.execute_with_statement_set)
+
+ def test_set_classpaths_with_table(self):
+ self.verify_set_java_dependencies("pipeline.classpaths", self.execute_with_table)
+
+ def test_set_classpaths_with_table_execute_insert(self):
+ self.verify_set_java_dependencies(
+ "pipeline.classpaths", self.execute_with_table_execute_insert)
+
+ def test_set_classpaths_with_table_to_pandas(self):
+ self.verify_set_java_dependencies("pipeline.classpaths", self.execute_with_table_to_pandas)
+
+ def execute_with_t_env(self, t_env):
+ source = t_env.from_elements([(1, "Hi"), (2, "Hello")], ["a", "b"])
+ source.select("func1(a, b), func2(a, b)").insert_into("sink")
+ t_env.execute("test")
+ actual = source_sink_utils.results()
+ expected = ['1 and Hi,1 or Hi', '2 and Hello,2 or Hello']
+ self.assert_equals(actual, expected)
+
+ @staticmethod
+ def execute_with_execute_sql(t_env):
+ source = t_env.from_elements([(1, "Hi"), (2, "Hello")], ["a", "b"])
+ t_env.create_temporary_view("source", source)
+ t_env.execute_sql("select func1(a, b), func2(a, b) from source") \
+ .get_job_client() \
+ .get_job_execution_result(
+ get_gateway().jvm.Thread.currentThread().getContextClassLoader()) \
+ .result()
+
+ def execute_with_statement_set(self, t_env):
+ source = t_env.from_elements([(1, "Hi"), (2, "Hello")], ["a", "b"])
+ result = source.select("func1(a, b), func2(a, b)")
+ t_env.create_statement_set().add_insert("sink", result).execute() \
+ .get_job_client() \
+ .get_job_execution_result(
+ get_gateway().jvm.Thread.currentThread().getContextClassLoader()) \
+ .result()
+ actual = source_sink_utils.results()
+ expected = ['1 and Hi,1 or Hi', '2 and Hello,2 or Hello']
+ self.assert_equals(actual, expected)
+
+ @staticmethod
+ def execute_with_table(t_env):
+ source = t_env.from_elements([(1, "Hi"), (2, "Hello")], ["a", "b"])
+ result = source.select("func1(a, b), func2(a, b)")
+ result.execute() \
+ .get_job_client() \
+ .get_job_execution_result(
+ get_gateway().jvm.Thread.currentThread().getContextClassLoader()) \
+ .result()
+
+ def execute_with_table_execute_insert(self, t_env):
+ source = t_env.from_elements([(1, "Hi"), (2, "Hello")], ["a", "b"])
+ result = source.select("func1(a, b), func2(a, b)")
+ result.execute_insert("sink") \
+ .get_job_client() \
+ .get_job_execution_result(
+ get_gateway().jvm.Thread.currentThread().getContextClassLoader()) \
+ .result()
+ actual = source_sink_utils.results()
+ expected = ['1 and Hi,1 or Hi', '2 and Hello,2 or Hello']
+ self.assert_equals(actual, expected)
- def verify_set_java_dependencies(self, config_key):
+ @staticmethod
+ def execute_with_table_to_pandas(t_env):
+ source = t_env.from_elements([(1, "Hi"), (2, "Hello")], ["a", "b"])
+ result = source.select("func1(a, b), func2(a, b)")
+ result.to_pandas()
+
+ def verify_set_java_dependencies(self, config_key, executor):
original_class_loader = \
get_gateway().jvm.Thread.currentThread().getContextClassLoader()
try:
@@ -397,17 +486,13 @@ class StreamTableEnvironmentTests(TableEnvironmentTest, PyFlinkStreamTableTestCa
self.assertEqual(first_class_loader, second_class_loader)
- source = self.t_env.from_elements([(1, "Hi"), (2, "Hello")], ["a", "b"])
self.t_env.register_java_function("func1", func1_class_name)
self.t_env.register_java_function("func2", func2_class_name)
table_sink = source_sink_utils.TestAppendSink(
["a", "b"], [DataTypes.STRING(), DataTypes.STRING()])
self.t_env.register_table_sink("sink", table_sink)
- source.select("func1(a, b), func2(a, b)").insert_into("sink")
- self.t_env.execute("test")
- actual = source_sink_utils.results()
- expected = ['1 and Hi,1 or Hi', '2 and Hello,2 or Hello']
- self.assert_equals(actual, expected)
+
+ executor(self.t_env)
finally:
get_gateway().jvm.Thread.currentThread().setContextClassLoader(original_class_loader)