Posted to issues@flink.apache.org by "Huang Xingbo (Jira)" <ji...@apache.org> on 2022/03/08 02:00:00 UTC

[jira] [Assigned] (FLINK-26462) Release Testing: Running Python UDF in different Execution Mode

     [ https://issues.apache.org/jira/browse/FLINK-26462?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Huang Xingbo reassigned FLINK-26462:
------------------------------------

    Assignee: Galen Warren

> Release Testing: Running Python UDF in different Execution Mode
> ---------------------------------------------------------------
>
>                 Key: FLINK-26462
>                 URL: https://issues.apache.org/jira/browse/FLINK-26462
>             Project: Flink
>          Issue Type: Improvement
>          Components: API / Python
>    Affects Versions: 1.15.0
>            Reporter: Huang Xingbo
>            Assignee: Galen Warren
>            Priority: Blocker
>              Labels: release-testing
>             Fix For: 1.15.0
>
>
> h1. Setup
> * Build and install the Flink source code
> {code:bash}
> $ cd {flink-source-code}
> $ mvn clean install -DskipTests
> {code}
> * Prepare a Python Virtual Environment
> {code:bash}
> $ cd flink-python/dev
> $ ./lint-python.sh -s basic
> $ source .conda/bin/activate
> {code}
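> * Optionally (not part of the original steps), confirm that the activated environment's interpreter is the one in use
> {code:python}
> # Minimal sanity check: the interpreter should come from the conda
> # environment created by lint-python.sh above.
> import sys
>
> print(sys.executable)  # expected to point into flink-python/dev/.conda
> {code}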
> * Install PyFlink from source. For more details, refer to the [docs|https://nightlies.apache.org/flink/flink-docs-master/docs/flinkdev/building/#build-pyflink]
> {code:bash}
> $ cd flink-python/apache-flink-libraries
> $ python setup.py sdist
> $ pip install dist/*.tar.gz
> $ cd ..
> $ pip install -r dev/dev-requirements.txt
> $ python setup.py sdist
> $ pip install dist/*.tar.gz
> {code}
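> * As an extra sanity check (not part of the original steps), verify that PyFlink imports from the virtual environment and that the Table API entry points used below are available
> {code:python}
> # Sanity-check sketch: a failed import here means the sdist installs above
> # did not land in the active environment.
> import pyflink
> from pyflink.table import EnvironmentSettings, TableEnvironment
>
> print(pyflink.__file__)  # should point into the .conda environment
> {code}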
> h1. Test
> * Write a Python UDF job named demo.py that uses process mode, in which each Python UDF runs in a separate Python worker process
> {code:python}
> from pyflink.table import DataTypes, EnvironmentSettings, TableEnvironment, expressions as expr
> from pyflink.table.udf import ScalarFunction, udf
>
>
> class SubtractOne(ScalarFunction):
>     def eval(self, i):
>         return i - 1
>
>
> @udf(input_types=[DataTypes.BIGINT(), DataTypes.BIGINT()], result_type=DataTypes.BIGINT())
> def add(i, j):
>     return i + j
>
>
> def main():
>     t_env = TableEnvironment.create(EnvironmentSettings.in_batch_mode())
>     # process mode: each Python UDF runs in a separate Python worker process
>     t_env.get_config().get_configuration().set_string("python.execution-mode", "process")
>     # optional: raise the default parallelism
>     t_env.get_config().get_configuration().set_string("parallelism.default", "2")
>     add_one = udf(lambda i: i + 1, result_type=DataTypes.BIGINT())
>     subtract_one = udf(SubtractOne(), result_type=DataTypes.BIGINT())
>     t = t_env.from_elements([(1, 2, 1), (2, 5, 2), (3, 1, 3)], ['a', 'b', 'c'])
>     result = t.select(add(add_one(t.a), subtract_one(t.b)), t.c, expr.lit(1))
>     print(result.to_pandas())
>
>
> if __name__ == '__main__':
>     main()
> {code}
> * Run the Python UDF job and check the result
> {code:bash}
> $ python demo.py
>    _c0  c  _c2
> 0    3  1    1
> 1    7  2    1
> 2    4  3    1
> {code}
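> * The unnamed select expressions surface as auto-generated columns (_c0, _c2). Purely as an optional readability tweak (illustrative names, not part of the test plan), the expressions can be aliased, as in the sketch below
> {code:python}
> # Optional tweak: alias the computed columns so the pandas output is
> # easier to compare across execution modes.
> result = t.select(
>     add(add_one(t.a), subtract_one(t.b)).alias("total"),
>     t.c,
>     expr.lit(1).alias("one"))
> {code}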
> * Change the Python UDF job to multi-thread mode, in which the Python UDFs run in the same process as the Java operator instead of in separate Python worker processes
> {code:python}
> from pyflink.table import DataTypes, EnvironmentSettings, TableEnvironment, expressions as expr
> from pyflink.table.udf import ScalarFunction, udf
>
>
> class SubtractOne(ScalarFunction):
>     def eval(self, i):
>         return i - 1
>
>
> @udf(input_types=[DataTypes.BIGINT(), DataTypes.BIGINT()], result_type=DataTypes.BIGINT())
> def add(i, j):
>     return i + j
>
>
> def main():
>     t_env = TableEnvironment.create(EnvironmentSettings.in_batch_mode())
>     # multi-thread mode: the Python UDFs run embedded in the JVM process
>     t_env.get_config().get_configuration().set_string("python.execution-mode", "multi-thread")
>     t_env.get_config().get_configuration().set_string("parallelism.default", "2")
>     add_one = udf(lambda i: i + 1, result_type=DataTypes.BIGINT())
>     subtract_one = udf(SubtractOne(), result_type=DataTypes.BIGINT())
>     t = t_env.from_elements([(1, 2, 1), (2, 5, 2), (3, 1, 3)], ['a', 'b', 'c'])
>     result = t.select(add(add_one(t.a), subtract_one(t.b)), t.c, expr.lit(1))
>     print(result.to_pandas())
>
>
> if __name__ == '__main__':
>     main()
> {code}
> * Run the Python UDF job again and check that the result matches the process-mode output
> {code:bash}
> $ python demo.py
>    _c0  c  _c2
> 0    3  1    1
> 1    7  2    1
> 2    4  3    1
> {code}
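> * The two jobs differ only in the python.execution-mode setting, so one hypothetical variant (not part of the test plan) is to read the mode from the command line and run a single demo.py twice
> {code:python}
> # Hypothetical variant: pick the execution mode from argv so the same
> # script exercises both modes; replaces the hard-coded set_string call.
> import sys
>
> mode = sys.argv[1] if len(sys.argv) > 1 else "process"  # "process" or "multi-thread"
> t_env.get_config().get_configuration().set_string("python.execution-mode", mode)
> {code}
> Run {{python demo.py process}} and {{python demo.py multi-thread}} and check that both outputs match.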



--
This message was sent by Atlassian Jira
(v8.20.1#820001)