Posted to issues@flink.apache.org by "Huang Xingbo (Jira)" <ji...@apache.org> on 2022/03/03 09:39:00 UTC
[jira] [Created] (FLINK-26462) Release Testing: Running Python UDF in different Execution Mode
Huang Xingbo created FLINK-26462:
------------------------------------
Summary: Release Testing: Running Python UDF in different Execution Mode
Key: FLINK-26462
URL: https://issues.apache.org/jira/browse/FLINK-26462
Project: Flink
Issue Type: Improvement
Components: API / Python
Affects Versions: 1.15.0
Reporter: Huang Xingbo
Fix For: 1.15.0
h1. Setup
Prepare a Python Virtual Environment
{code:bash}
$ cd flink-python/dev
$ ./lint-python.sh -s basic
$ source .conda/bin/activate
{code}
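Before running the test jobs, it can help to confirm that PyFlink is importable from the activated environment. This is a small sanity-check sketch of mine, not part of the official setup steps:

```python
import importlib.util

def is_installed(pkg: str) -> bool:
    """Return True if the package can be imported from the current environment."""
    return importlib.util.find_spec(pkg) is not None

if __name__ == "__main__":
    # After activating the conda env, apache-flink should be importable as `pyflink`.
    print("pyflink available:", is_installed("pyflink"))
```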
h1. Test
# Write a Python UDF job named demo.py using process mode
{code:python}
from pyflink.table.table_environment import TableEnvironment
from pyflink.table.environment_settings import EnvironmentSettings
from pyflink.table.udf import ScalarFunction, udf
from pyflink.table import DataTypes, expressions as expr


class SubtractOne(ScalarFunction):
    def eval(self, i):
        return i - 1


@udf(input_types=[DataTypes.BIGINT(), DataTypes.BIGINT()], result_type=DataTypes.BIGINT())
def add(i, j):
    return i + j


def main():
    t_env = TableEnvironment.create(EnvironmentSettings.in_batch_mode())
    # process mode
    t_env.get_config().get_configuration().set_string("python.execution-mode", "process")
    # optional
    t_env.get_config().get_configuration().set_string("parallelism.default", "2")

    add_one = udf(lambda i: i + 1, result_type=DataTypes.BIGINT())
    subtract_one = udf(SubtractOne(), result_type=DataTypes.BIGINT())

    t = t_env.from_elements([(1, 2, 1), (2, 5, 2), (3, 1, 3)], ['a', 'b', 'c'])
    result = t.select(add(add_one(t.a), subtract_one(t.b)), t.c, expr.lit(1))
    print(result.to_pandas())


if __name__ == '__main__':
    main()
{code}
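The expected values in the first column can be sanity-checked with plain Python, independent of Flink. This sketch re-implements only the UDF arithmetic from the job above:

```python
# Pure-Python versions of the three UDFs in demo.py.
def add_one(i):
    return i + 1

def subtract_one(i):
    return i - 1

def add(i, j):
    return i + j

# The input rows passed to from_elements in the job.
rows = [(1, 2, 1), (2, 5, 2), (3, 1, 3)]

# For each row, the first output column is add(add_one(a), subtract_one(b)).
for a, b, c in rows:
    print(add(add_one(a), subtract_one(b)), c, 1)
# First column: (1+1)+(2-1)=3, (2+1)+(5-1)=7, (3+1)+(1-1)=4
```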
# Run the Python UDF job and check the result
{code:bash}
$ python demo.py
   _c0  c  _c2
0    3  1    1
1    7  2    1
2    4  3    1
{code}
# Change the Python UDF job to multi-thread mode
{code:python}
from pyflink.table.table_environment import TableEnvironment
from pyflink.table.environment_settings import EnvironmentSettings
from pyflink.table.udf import ScalarFunction, udf
from pyflink.table import DataTypes, expressions as expr


class SubtractOne(ScalarFunction):
    def eval(self, i):
        return i - 1


@udf(input_types=[DataTypes.BIGINT(), DataTypes.BIGINT()], result_type=DataTypes.BIGINT())
def add(i, j):
    return i + j


def main():
    t_env = TableEnvironment.create(EnvironmentSettings.in_batch_mode())
    # multi-thread mode
    t_env.get_config().get_configuration().set_string("python.execution-mode", "multi-thread")
    t_env.get_config().get_configuration().set_string("parallelism.default", "2")

    add_one = udf(lambda i: i + 1, result_type=DataTypes.BIGINT())
    subtract_one = udf(SubtractOne(), result_type=DataTypes.BIGINT())

    t = t_env.from_elements([(1, 2, 1), (2, 5, 2), (3, 1, 3)], ['a', 'b', 'c'])
    result = t.select(add(add_one(t.a), subtract_one(t.b)), t.c, expr.lit(1))
    print(result.to_pandas())


if __name__ == '__main__':
    main()
{code}
# Run the Python UDF job again and verify the result matches the process-mode output
{code:bash}
$ python demo.py
   _c0  c  _c2
0    3  1    1
1    7  2    1
2    4  3    1
{code}
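The point of this step is that switching python.execution-mode must not change the job's output. A trivial sketch of that check, using the row values printed by the two runs above:

```python
# Rows (_c0, c, _c2) as printed by each run of demo.py.
process_rows = [(3, 1, 1), (7, 2, 1), (4, 3, 1)]
multi_thread_rows = [(3, 1, 1), (7, 2, 1), (4, 3, 1)]

# The release test passes only if both execution modes agree.
assert process_rows == multi_thread_rows
print("process and multi-thread results match")
```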
--
This message was sent by Atlassian Jira
(v8.20.1#820001)