Posted to issues@flink.apache.org by "Huang Xingbo (Jira)" <ji...@apache.org> on 2022/03/08 02:00:00 UTC
[jira] [Assigned] (FLINK-26462) Release Testing: Running Python UDF in different Execution Mode
[ https://issues.apache.org/jira/browse/FLINK-26462?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Huang Xingbo reassigned FLINK-26462:
------------------------------------
Assignee: Galen Warren
> Release Testing: Running Python UDF in different Execution Mode
> ---------------------------------------------------------------
>
> Key: FLINK-26462
> URL: https://issues.apache.org/jira/browse/FLINK-26462
> Project: Flink
> Issue Type: Improvement
> Components: API / Python
> Affects Versions: 1.15.0
> Reporter: Huang Xingbo
> Assignee: Galen Warren
> Priority: Blocker
> Labels: release-testing
> Fix For: 1.15.0
>
>
> h1. Setup
> * Build the Flink source code
> {code:bash}
> $ cd {flink-source-code}
> $ mvn clean install -DskipTests
> {code}
> * Prepare a Python Virtual Environment
> {code:bash}
> $ cd flink-python/dev
> $ ./lint-python.sh -s basic
> $ source .conda/bin/activate
> {code}
> * Install PyFlink from source code. For more details, you can refer to the [doc|https://nightlies.apache.org/flink/flink-docs-master/docs/flinkdev/building/#build-pyflink]
> {code:bash}
> $ cd flink-python/apache-flink-libraries
> $ python setup.py sdist
> $ pip install dist/*.tar.gz
> $ cd ..
> $ pip install -r dev/dev-requirements.txt
> $ python setup.py sdist
> $ pip install dist/*.tar.gz
> {code}
> h1. Test
> * Write a Python UDF job named demo.py that runs in process mode
> {code:python}
> from pyflink.table.table_environment import TableEnvironment
> from pyflink.table.environment_settings import EnvironmentSettings
> from pyflink.table.udf import ScalarFunction, udf
> from pyflink.table import DataTypes, expressions as expr
>
>
> class SubtractOne(ScalarFunction):
>
>     def eval(self, i):
>         return i - 1
>
>
> @udf(input_types=[DataTypes.BIGINT(), DataTypes.BIGINT()], result_type=DataTypes.BIGINT())
> def add(i, j):
>     return i + j
>
>
> def main():
>     t_env = TableEnvironment.create(EnvironmentSettings.in_batch_mode())
>     # process mode
>     t_env.get_config().get_configuration().set_string("python.execution-mode", "process")
>     # optional: run the job with parallelism 2
>     t_env.get_config().get_configuration().set_string("parallelism.default", "2")
>
>     add_one = udf(lambda i: i + 1, result_type=DataTypes.BIGINT())
>     subtract_one = udf(SubtractOne(), result_type=DataTypes.BIGINT())
>     t = t_env.from_elements([(1, 2, 1), (2, 5, 2), (3, 1, 3)], ['a', 'b', 'c'])
>     result = t.select(add(add_one(t.a), subtract_one(t.b)), t.c, expr.lit(1))
>     print(result.to_pandas())
>
>
> if __name__ == '__main__':
>     main()
> {code}
> * Run the Python UDF job and check the result
> {code:bash}
> $ python demo.py
>    _c0  c  _c2
> 0    3  1    1
> 1    7  2    1
> 2    4  3    1
> {code}
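> As a sanity check on the output above: the UDF chain computes add(add_one(a), subtract_one(b)) = (a + 1) + (b - 1) = a + b, so the expected _c0 column can be verified with plain Python, independent of the execution mode:

```python
# Plain-Python model of the UDF chain from demo.py (no Flink needed).
def add_one(i):
    return i + 1

def subtract_one(i):
    return i - 1

def add(i, j):
    return i + j

rows = [(1, 2, 1), (2, 5, 2), (3, 1, 3)]
c0 = [add(add_one(a), subtract_one(b)) for a, b, _c in rows]
print(c0)  # [3, 7, 4] -- matches the _c0 column printed by the job
```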
> * Change the Python UDF job to multi-thread mode
> {code:python}
> from pyflink.table.table_environment import TableEnvironment
> from pyflink.table.environment_settings import EnvironmentSettings
> from pyflink.table.udf import ScalarFunction, udf
> from pyflink.table import DataTypes, expressions as expr
>
>
> class SubtractOne(ScalarFunction):
>
>     def eval(self, i):
>         return i - 1
>
>
> @udf(input_types=[DataTypes.BIGINT(), DataTypes.BIGINT()], result_type=DataTypes.BIGINT())
> def add(i, j):
>     return i + j
>
>
> def main():
>     t_env = TableEnvironment.create(EnvironmentSettings.in_batch_mode())
>     # multi-thread mode
>     t_env.get_config().get_configuration().set_string("python.execution-mode", "multi-thread")
>     t_env.get_config().get_configuration().set_string("parallelism.default", "2")
>
>     add_one = udf(lambda i: i + 1, result_type=DataTypes.BIGINT())
>     subtract_one = udf(SubtractOne(), result_type=DataTypes.BIGINT())
>     t = t_env.from_elements([(1, 2, 1), (2, 5, 2), (3, 1, 3)], ['a', 'b', 'c'])
>     result = t.select(add(add_one(t.a), subtract_one(t.b)), t.c, expr.lit(1))
>     print(result.to_pandas())
>
>
> if __name__ == '__main__':
>     main()
> {code}
> * Run the Python UDF job again and verify that the result is the same as in process mode
> {code:bash}
> $ python demo.py
>    _c0  c  _c2
> 0    3  1    1
> 1    7  2    1
> 2    4  3    1
> {code}
--
This message was sent by Atlassian Jira
(v8.20.1#820001)