Posted to dev@flink.apache.org by "awayne (Jira)" <ji...@apache.org> on 2021/02/22 09:54:00 UTC
[jira] [Created] (FLINK-21434) When a UDAF returns a ROW type with more than 14 fields, a crash happens
awayne created FLINK-21434:
------------------------------
Summary: When a UDAF returns a ROW type with more than 14 fields, a crash happens
Key: FLINK-21434
URL: https://issues.apache.org/jira/browse/FLINK-21434
Project: Flink
Issue Type: Bug
Components: API / Python
Affects Versions: 1.12.1
Environment: Python 3.7.5, PyFlink 1.12.1
Reporter: awayne
Code (a simple UDAF that returns a Row containing 15 fields):
{code:python}
from pyflink.common import Row
from pyflink.table.udf import AggregateFunction, udaf
from pyflink.table import DataTypes, EnvironmentSettings, StreamTableEnvironment


class Test(AggregateFunction):

    def create_accumulator(self):
        return Row(0, 0)

    def get_value(self, accumulator):
        # Always return a ROW with 15 FLOAT fields, one more than the
        # 14-field limit named in the summary.
        return Row(1.23, 1.23, 1.23, 1.23, 1.23, 1.23, 1.23,
                   1.23, 1.23, 1.23, 1.23, 1.23, 1.23, 1.23, 1.23)

    def accumulate(self, accumulator, a, b):
        # No-op: the result does not depend on the input rows.
        pass

    def get_result_type(self):
        # Must match the 15 fields returned by get_value.
        return DataTypes.ROW([
            DataTypes.FIELD("f1", DataTypes.FLOAT()),
            DataTypes.FIELD("f2", DataTypes.FLOAT()),
            DataTypes.FIELD("f3", DataTypes.FLOAT()),
            DataTypes.FIELD("f4", DataTypes.FLOAT()),
            DataTypes.FIELD("f5", DataTypes.FLOAT()),
            DataTypes.FIELD("f6", DataTypes.FLOAT()),
            DataTypes.FIELD("f7", DataTypes.FLOAT()),
            DataTypes.FIELD("f8", DataTypes.FLOAT()),
            DataTypes.FIELD("f9", DataTypes.FLOAT()),
            DataTypes.FIELD("f10", DataTypes.FLOAT()),
            DataTypes.FIELD("f11", DataTypes.FLOAT()),
            DataTypes.FIELD("f12", DataTypes.FLOAT()),
            DataTypes.FIELD("f13", DataTypes.FLOAT()),
            DataTypes.FIELD("f14", DataTypes.FLOAT()),
            DataTypes.FIELD("f15", DataTypes.FLOAT())
        ])

    def get_accumulator_type(self):
        return DataTypes.ROW([
            DataTypes.FIELD("f1", DataTypes.BIGINT()),
            DataTypes.FIELD("f2", DataTypes.BIGINT())])


def udaf_test():
    env_settings = EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build()
    table_env = StreamTableEnvironment.create(environment_settings=env_settings)
    test = udaf(Test())

    # Print sink whose `agg` column has the same 15-field ROW type.
    table_env.execute_sql("""
        CREATE TABLE print_sink (
            `name` STRING,
            `agg` ROW<f1 FLOAT, f2 FLOAT, f3 FLOAT, f4 FLOAT,
                      f5 FLOAT, f6 FLOAT, f7 FLOAT, f8 FLOAT,
                      f9 FLOAT, f10 FLOAT, f11 FLOAT, f12 FLOAT,
                      f13 FLOAT, f14 FLOAT, f15 FLOAT>
        ) WITH (
            'connector' = 'print'
        )
    """)

    # A single input row, grouped by name and aggregated with the UDAF.
    table = table_env.from_elements([(1, 2, "Lee")], ['value', 'count', 'name'])
    result_table = table.group_by(table.name) \
        .select(table.name, test(table.value, table.count))
    result_table.execute_insert("print_sink").wait()


if __name__ == "__main__":
    udaf_test()
{code}
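Because the report ties the failure to the number of ROW fields (15 fails, and the summary implies that 14 or fewer work), it can help to generate the sink schema for a given field count instead of editing the DDL by hand. The helper below is a hypothetical sketch, not part of PyFlink or of the original report; the names `row_fields` and `print_sink_ddl` are made up for illustration. It only builds the `print_sink` DDL string. `get_value` and `get_result_type` in `Test` would have to be changed to the same field count for a comparison run to be meaningful.

```python
def row_fields(n, field_type="FLOAT"):
    """Return 'f1 FLOAT, f2 FLOAT, ..., fn FLOAT' for an n-field ROW."""
    return ", ".join(f"f{i} {field_type}" for i in range(1, n + 1))


def print_sink_ddl(n):
    """Build the print_sink DDL from the report with an n-field `agg` column.

    Hypothetical helper for varying the field count around the 14/15 boundary.
    """
    # In PyFlink, the matching UDAF result type could be built the same way:
    # DataTypes.ROW([DataTypes.FIELD(f"f{i}", DataTypes.FLOAT())
    #                for i in range(1, n + 1)])
    return (
        "CREATE TABLE print_sink (\n"
        "  `name` STRING,\n"
        f"  `agg` ROW<{row_fields(n)}>\n"
        ") WITH (\n"
        "  'connector' = 'print'\n"
        ")"
    )


if __name__ == "__main__":
    # Print the 14-field and 15-field variants for comparison.
    for n in (14, 15):
        print(print_sink_ddl(n))
```

The resulting string can be passed to `table_env.execute_sql(...)` in place of the hand-written DDL.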
Exception:
{code:java}
Caused by: java.io.EOFException
at java.base/java.io.DataInputStream.readInt(DataInputStream.java:397)
at java.base/java.io.DataInputStream.readFloat(DataInputStream.java:451)
at org.apache.flink.api.common.typeutils.base.FloatSerializer.deserialize(FloatSerializer.java:72)
at org.apache.flink.api.common.typeutils.base.FloatSerializer.deserialize(FloatSerializer.java:30)
at org.apache.flink.table.runtime.typeutils.serializers.python.RowDataSerializer.deserialize(RowDataSerializer.java:106)
at org.apache.flink.table.runtime.typeutils.serializers.python.RowDataSerializer.deserialize(RowDataSerializer.java:49)
at org.apache.flink.table.runtime.typeutils.serializers.python.RowDataSerializer.deserialize(RowDataSerializer.java:106
{code}
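The innermost frames show `FloatSerializer.deserialize` calling `DataInputStream.readFloat`, which reads four bytes through `readInt` and throws `EOFException` if the stream ends before those bytes are available. In other words, the Java `RowDataSerializer` tried to read a float field after the serialized data had run out. The Python sketch below models only that symptom, assuming a plain big-endian IEEE 754 encoding like the one `java.io.DataOutputStream.writeFloat` produces. It is not PyFlink's actual wire format and does not explain why the mismatch appears above 14 fields; `write_floats` and `read_floats` are hypothetical names.

```python
import io
import struct


def write_floats(values):
    """Serialize floats as 4-byte big-endian IEEE 754 values
    (the layout of java.io.DataOutputStream.writeFloat)."""
    buf = io.BytesIO()
    for v in values:
        buf.write(struct.pack(">f", v))
    return buf.getvalue()


def read_floats(data, count):
    """Read `count` big-endian floats; raise EOFError if the data runs out,
    the Python analogue of DataInputStream.readFloat's EOFException."""
    stream = io.BytesIO(data)
    out = []
    for _ in range(count):
        chunk = stream.read(4)
        if len(chunk) < 4:
            raise EOFError(
                f"needed 4 bytes for field {len(out) + 1}, got {len(chunk)}")
        out.append(struct.unpack(">f", chunk)[0])
    return out


if __name__ == "__main__":
    # Hypothetical mismatch: the writer emits 14 fields, the reader expects 15.
    payload = write_floats([1.23] * 14)
    try:
        read_floats(payload, 15)
    except EOFError as e:
        print("EOF:", e)  # prints "EOF: needed 4 bytes for field 15, got 0"
```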
--
This message was sent by Atlassian Jira
(v8.3.4#803005)