You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by "Huang Xingbo (Jira)" <ji...@apache.org> on 2022/03/03 09:39:00 UTC

[jira] [Created] (FLINK-26462) Release Testing: Running Python UDF in different Execution Mode

Huang Xingbo created FLINK-26462:
------------------------------------

             Summary: Release Testing: Running Python UDF in different Execution Mode
                 Key: FLINK-26462
                 URL: https://issues.apache.org/jira/browse/FLINK-26462
             Project: Flink
          Issue Type: Improvement
          Components: API / Python
    Affects Versions: 1.15.0
            Reporter: Huang Xingbo
             Fix For: 1.15.0


h1. Setup
Prepare a Python Virtual Environment

{code:bash}
$ cd flink-python/dev
$ ./lint-python.sh -s basic
$ source .conda/bin/activate
{code}

h1. Test
# Write a python udf job named demo.py in process mode

{code:python}
from pyflink.table.table_environment import TableEnvironment
from pyflink.table.environment_settings import EnvironmentSettings
from pyflink.table.udf import ScalarFunction, udf
from pyflink.table import DataTypes, expressions as expr

class SubtractOne(ScalarFunction):
    def eval(self, i):
        return i - 1

@udf(input_types=[DataTypes.BIGINT(), DataTypes.BIGINT()], result_type=DataTypes.BIGINT())
def add(i, j):
    return i + j

def main():
    t_env = TableEnvironment.create(EnvironmentSettings.in_batch_mode())
    # process mode !
    t_env.get_config().get_configuration().set_string("python.execution-mode", "process")
    # optinal values
    t_env.get_config().get_configuration().set_string("parallelism.default", "2")
    add_one = udf(lambda i: i + 1, result_type=DataTypes.BIGINT())
    subtract_one = udf(SubtractOne(), result_type=DataTypes.BIGINT())

    t = t_env.from_elements([(1, 2, 1), (2, 5, 2), (3, 1, 3)], ['a', 'b', 'c'])
    result = t.select(add(add_one(t.a), subtract_one(t.b)), t.c, expr.lit(1))
    print(result.to_pandas())

if __name__ == '__main__':
    main()
{code}

# run the python udf job and watch the result
{code:bash}
$ python demo.py
   _c0  c  _c2
0    3  1    1
1    7  2    1
2    4  3    1
{code}

# change the python udf job to multi-thread mode

{code:python}
from pyflink.table.table_environment import TableEnvironment
from pyflink.table.environment_settings import EnvironmentSettings
from pyflink.table.udf import ScalarFunction, udf
from pyflink.table import DataTypes, expressions as expr

class SubtractOne(ScalarFunction):
    def eval(self, i):
        return i - 1

@udf(input_types=[DataTypes.BIGINT(), DataTypes.BIGINT()], result_type=DataTypes.BIGINT())
def add(i, j):
    return i + j

def main():
    t_env = TableEnvironment.create(EnvironmentSettings.in_batch_mode())
    # multi-thread mode
    t_env.get_config().get_configuration().set_string("python.execution-mode", "multi-thread")
    t_env.get_config().get_configuration().set_string("parallelism.default", "2")
    add_one = udf(lambda i: i + 1, result_type=DataTypes.BIGINT())
    subtract_one = udf(SubtractOne(), result_type=DataTypes.BIGINT())

    t = t_env.from_elements([(1, 2, 1), (2, 5, 2), (3, 1, 3)], ['a', 'b', 'c'])
    result = t.select(add(add_one(t.a), subtract_one(t.b)), t.c, expr.lit(1))
    print(result.to_pandas())

if __name__ == '__main__':
    main()
{code}

# run the python udf job and watch the result
{code:bash}
$ python demo.py
   _c0  c  _c2
0    3  1    1
1    7  2    1
2    4  3    1
{code}



--
This message was sent by Atlassian Jira
(v8.20.1#820001)