Posted to issues@flink.apache.org by "Dian Fu (Jira)" <ji...@apache.org> on 2024/04/02 02:13:00 UTC
[jira] [Created] (FLINK-34985) It doesn't support to access fields by name for map function in thread mode
Dian Fu created FLINK-34985:
-------------------------------
Summary: It doesn't support to access fields by name for map function in thread mode
Key: FLINK-34985
URL: https://issues.apache.org/jira/browse/FLINK-34985
Project: Flink
Issue Type: Bug
Components: API / Python
Reporter: Dian Fu
Reported in the Slack channel: [https://apache-flink.slack.com/archives/C065944F9M2/p1711640068929589]
```
Hi all, I seem to be running into an issue when switching to thread mode in PyFlink. In a UDF, the {{Row}} seems to get converted into a tuple, and its fields can no longer be accessed by name. In process mode it works fine. The bug can be reproduced with this minimal example, which is close to the PyFlink docs:
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.common import Row
from pyflink.table import StreamTableEnvironment, DataTypes
from pyflink.table.udf import udf

env = StreamExecutionEnvironment.get_execution_environment()
t_env = StreamTableEnvironment.create(env)
t_env.get_config().set("parallelism.default", "1")

# This does work:
t_env.get_config().set("python.execution-mode", "process")

# This doesn't work:
#t_env.get_config().set("python.execution-mode", "thread")


def map_function(a: Row) -> Row:
    return Row(a.a + 1, a.b * a.b)


# map operation with a python general scalar function
func = udf(
    map_function,
    result_type=DataTypes.ROW(
        [
            DataTypes.FIELD("a", DataTypes.BIGINT()),
            DataTypes.FIELD("b", DataTypes.BIGINT()),
        ]
    ),
)

table = (
    t_env.from_elements(
        [(2, 4), (0, 0)],
        schema=DataTypes.ROW(
            [
                DataTypes.FIELD("a", DataTypes.BIGINT()),
                DataTypes.FIELD("b", DataTypes.BIGINT()),
            ]
        ),
    )
    .map(func)
    .alias("a", "b")
    .execute()
    .print()
)
```
The exception I get in this execution mode is:
2024-03-28 16:32:10 Caused by: pemja.core.PythonException: <class 'AttributeError'>: 'tuple' object has no attribute 'a'
2024-03-28 16:32:10 at /usr/local/lib/python3.10/dist-packages/pyflink/fn_execution/embedded/operation_utils.process_element(operation_utils.py:72)
2024-03-28 16:32:10 at /usr/local/lib/python3.10/dist-packages/pyflink/fn_execution/table/operations.process_element(operations.py:102)
2024-03-28 16:32:10 at <string>.<lambda>(<string>:1)
2024-03-28 16:32:10 at /opt/flink/wouter/minimal_example.map_function(minimal_example.py:19)
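The failure mode in the trace can be illustrated without Flink. The following is only a sketch, assuming (as the error message suggests) that in thread mode the function receives a plain tuple instead of a row with named fields. `NamedRow`, `map_by_name`, and `map_by_position` are hypothetical names for illustration; `NamedRow` is a stand-in and not PyFlink's `Row` class.

```python
from collections import namedtuple

# Hypothetical stand-in for a row with named fields "a" and "b"
# (an assumption for illustration, not pyflink.common.Row).
NamedRow = namedtuple("NamedRow", ["a", "b"])


def map_by_name(row):
    # Mirrors the UDF in the report: relies on attribute access.
    return (row.a + 1, row.b * row.b)


def map_by_position(row):
    # Positional access works for named rows and plain tuples alike.
    return (row[0] + 1, row[1] * row[1])


named = NamedRow(2, 4)
plain = (2, 4)

print(map_by_name(named))      # (3, 16)
print(map_by_position(plain))  # (3, 16)

try:
    # A plain tuple has no named fields, so attribute access fails.
    map_by_name(plain)
except AttributeError as exc:
    error_message = str(exc)
    print(error_message)  # 'tuple' object has no attribute 'a'
```

Positional access may therefore be a possible interim workaround in the UDF, but the report does not confirm this; it would need to be verified against PyFlink in thread mode.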