Posted to issues@flink.apache.org by "Dian Fu (Jira)" <ji...@apache.org> on 2024/04/02 02:13:00 UTC

[jira] [Created] (FLINK-34985) It doesn't support to access fields by name for map function in thread mode

Dian Fu created FLINK-34985:
-------------------------------

             Summary: It doesn't support to access fields by name for map function in thread mode
                 Key: FLINK-34985
                 URL: https://issues.apache.org/jira/browse/FLINK-34985
             Project: Flink
          Issue Type: Bug
          Components: API / Python
            Reporter: Dian Fu


Reported in slack channel: [https://apache-flink.slack.com/archives/C065944F9M2/p1711640068929589]

Hi all, I seem to be running into an issue when switching to thread mode in PyFlink. In a UDF, the {{Row}} seems to get converted into a tuple, and you can no longer access fields by name. In process mode it works fine. The bug can easily be reproduced with this minimal example, which is close to the PyFlink docs:
```
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.common import Row
from pyflink.table import StreamTableEnvironment, DataTypes
from pyflink.table.udf import udf

env = StreamExecutionEnvironment.get_execution_environment()
t_env = StreamTableEnvironment.create(env)
t_env.get_config().set("parallelism.default", "1")


# This does work:
t_env.get_config().set("python.execution-mode", "process")

# This doesn't work:
#t_env.get_config().set("python.execution-mode", "thread")


def map_function(a: Row) -> Row:
    return Row(a.a + 1, a.b * a.b)


# map operation with a python general scalar function
func = udf(
    map_function,
    result_type=DataTypes.ROW(
        [
            DataTypes.FIELD("a", DataTypes.BIGINT()),
            DataTypes.FIELD("b", DataTypes.BIGINT()),
        ]
    ),
)
table = (
    t_env.from_elements(
        [(2, 4), (0, 0)],
        schema=DataTypes.ROW(
            [
                DataTypes.FIELD("a", DataTypes.BIGINT()),
                DataTypes.FIELD("b", DataTypes.BIGINT()),
            ]
        ),
    )
    .map(func)
    .alias("a", "b")
    .execute()
    .print()
)
```
 
The exception I get in thread mode is:
2024-03-28 16:32:10 Caused by: pemja.core.PythonException: <class 'AttributeError'>: 'tuple' object has no attribute 'a'
2024-03-28 16:32:10     at /usr/local/lib/python3.10/dist-packages/pyflink/fn_execution/embedded/operation_utils.process_element(operation_utils.py:72)
2024-03-28 16:32:10     at /usr/local/lib/python3.10/dist-packages/pyflink/fn_execution/table/operations.process_element(operations.py:102)
2024-03-28 16:32:10     at <string>.<lambda>(<string>:1)
2024-03-28 16:32:10     at /opt/flink/wouter/minimal_example.map_function(minimal_example.py:19)
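
A possible stopgap, sketched here without PyFlink so it runs standalone: read the fields by position instead of by name. This assumes that thread mode hands the UDF a plain tuple in schema order, as the exception above suggests; that assumption has not been verified against the PyFlink source. The names `map_values`, `fails_by_name`, and `NamedRow` are hypothetical and exist only for illustration.

```python
from collections import namedtuple

# Hypothetical stand-in for a row with named fields; this is NOT PyFlink's Row.
NamedRow = namedtuple("NamedRow", ["a", "b"])


def map_values(a):
    """Positional variant of map_function from the report.

    Integer indexing works on a plain tuple and on any object that
    supports integer indexing, so it does not depend on field names.
    """
    return a[0] + 1, a[1] * a[1]


def fails_by_name(value):
    """Return True if reading the field `a` by name raises AttributeError."""
    try:
        value.a
    except AttributeError:
        return True
    return False


if __name__ == "__main__":
    # A plain tuple has no attribute `a`, which matches the reported error.
    print(fails_by_name((2, 4)))
    # A value with named fields can still be read by name.
    print(fails_by_name(NamedRow(2, 4)))
    # Positional access gives the same result for both kinds of input.
    print(map_values((2, 4)), map_values(NamedRow(2, 4)))
```

In the actual UDF the result would presumably still be wrapped in `Row(...)` as in the reproduction. Positional access trades readability for portability between the two execution modes, so it is only a workaround until the issue is fixed.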


