You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@flink.apache.org by "awayne (Jira)" <ji...@apache.org> on 2021/02/22 09:54:00 UTC

[jira] [Created] (FLINK-21434) When a UDAF returns a ROW type with more than 14 fields, a crash happens

awayne created FLINK-21434:
------------------------------

             Summary: When a UDAF returns a ROW type with more than 14 fields, a crash happens
                 Key: FLINK-21434
                 URL: https://issues.apache.org/jira/browse/FLINK-21434
             Project: Flink
          Issue Type: Bug
          Components: API / Python
    Affects Versions: 1.12.1
         Environment: python 3.7.5

pyflink 1.12.1
            Reporter: awayne


 Code (a simple UDAF that returns a Row containing 15 fields):
{code:python}
from pyflink.common import Row
from pyflink.table.udf import AggregateFunction, udaf
from pyflink.table import DataTypes, EnvironmentSettings, StreamTableEnvironment

class Test(AggregateFunction):
  """Minimal UDAF whose result is a 15-field ROW of FLOAT values.

  The accumulator is a two-field BIGINT row that is never updated; the
  function exists only to emit a wide ROW result (fields f1..f15).
  """

  def create_accumulator(self):
    # Two-slot accumulator, matching get_accumulator_type().
    return Row(0, 0)

  def get_value(self, accumulator):
    # The accumulator is ignored; emit the same constant in all 15 slots.
    return Row(*[1.23 for _ in range(15)])

  def accumulate(self, accumulator, a, b):
    # Intentionally a no-op: the inputs do not affect the result.
    pass

  def get_result_type(self):
    # ROW<f1 FLOAT, f2 FLOAT, ..., f15 FLOAT>
    float_fields = [DataTypes.FIELD(f"f{idx}", DataTypes.FLOAT())
                    for idx in range(1, 16)]
    return DataTypes.ROW(float_fields)

  def get_accumulator_type(self):
    # ROW<f1 BIGINT, f2 BIGINT>
    bigint_fields = [DataTypes.FIELD(col, DataTypes.BIGINT())
                     for col in ("f1", "f2")]
    return DataTypes.ROW(bigint_fields)


def udaf_test():
  """Run the 15-field ROW UDAF over a one-row table into a print sink.

  This is the FLINK-21434 reproduction: a streaming (Blink planner) job
  groups by `name`, applies `Test` to (value, count) and inserts the
  result into a `print` connector table whose `agg` column is a
  15-field FLOAT ROW.
  """
  settings = (EnvironmentSettings.new_instance()
              .in_streaming_mode()
              .use_blink_planner()
              .build())
  t_env = StreamTableEnvironment.create(environment_settings=settings)
  wide_row_agg = udaf(Test())

  # Sink DDL; its `agg` column has the same shape as Test.get_result_type().
  sink_ddl = """
      CREATE TABLE print_sink (
          `name` STRING,
          `agg` ROW<f1 FLOAT, f2 FLOAT, f3 FLOAT, f4 FLOAT,
                    f5 FLOAT, f6 FLOAT, f7 FLOAT, f8 FLOAT,
                    f9 FLOAT, f10 FLOAT, f11 FLOAT, f12 FLOAT,
                    f13 FLOAT, f14 FLOAT, f15 FLOAT>
      ) WITH (
          'connector' = 'print'
      )
  """
  t_env.execute_sql(sink_ddl)

  source = t_env.from_elements([(1, 2, "Lee")], ['value', 'count', 'name'])
  grouped = source.group_by(source.name)
  aggregated = grouped.select(source.name,
                              wide_row_agg(source.value, source.count))
  # Block until the insert job finishes.
  aggregated.execute_insert("print_sink").wait()


if __name__ == "__main__":
  # Run the reproduction only when executed as a script, not on import.
  udaf_test()

{code}
Exception:
{code:java}
Caused by: java.io.EOFException
	at java.base/java.io.DataInputStream.readInt(DataInputStream.java:397)
	at java.base/java.io.DataInputStream.readFloat(DataInputStream.java:451)
	at org.apache.flink.api.common.typeutils.base.FloatSerializer.deserialize(FloatSerializer.java:72)
	at org.apache.flink.api.common.typeutils.base.FloatSerializer.deserialize(FloatSerializer.java:30)
	at org.apache.flink.table.runtime.typeutils.serializers.python.RowDataSerializer.deserialize(RowDataSerializer.java:106)
	at org.apache.flink.table.runtime.typeutils.serializers.python.RowDataSerializer.deserialize(RowDataSerializer.java:49)
	at org.apache.flink.table.runtime.typeutils.serializers.python.RowDataSerializer.deserialize(RowDataSerializer.java:106
{code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)