Posted to user@flink.apache.org by "Barth, Torben" <T....@Fraport.de> on 2020/12/17 18:39:13 UTC

Pyflink UDF with ARRAY as input

Dear List,

I have a table with the following structure:

my_table
-- Key: String
-- List_element: ARRAY<ROW<`integer_element` INT, `string_element` STRING >>

I want to define a UDF that extracts information from "List_element", but I cannot access the contents of the array inside the UDF. I tried something like:

@udf(result_type=DataTypes.STRING())
def get_string_element(my_list):
    my_string = 'xxx'
    for element in my_list:
        if element.integer_element == 2:
            my_string = element.string_element
    return my_string


table_env.create_temporary_function("get_string_element", get_string_element)
# use the function in Python Table API
my_table.select("get_string_element(List_element)")

Unfortunately, I cannot get it to work. Does anybody know the correct way to extract this information?

Any comments or ideas are very welcome.

Thanks
Torben

Fraport AG Frankfurt Airport Services Worldwide, 60547 Frankfurt am Main, Sitz der Gesellschaft: Frankfurt am Main, Amtsgericht Frankfurt am Main: HRB 7042, Umsatzsteuer-Identifikationsnummer: DE 114150623, Vorsitzender des Aufsichtsrates: Michael Boddenberg – Hessischer Minister der Finanzen; Vorstand: Dr. Stefan Schulte (Vorsitzender), Anke Giesen, Michael Müller, Dr. Pierre Dominique Prümm, Dr. Matthias Zieschang

Re: Pyflink UDF with ARRAY as input

Posted by Xingbo Huang <hx...@gmail.com>.
Hi Torben,

It is indeed a bug, and I have created a JIRA issue [1]. The workaround is
to access the row fields by index rather than by name (written against
release-1.12):

    # Imports added for completeness (module paths assumed for release-1.12).
    from pyflink.common import Row
    from pyflink.datastream import StreamExecutionEnvironment
    from pyflink.table import DataTypes, EnvironmentSettings, StreamTableEnvironment
    from pyflink.table.udf import udf

    env = StreamExecutionEnvironment.get_execution_environment()
    env.set_parallelism(1)
    t_env = StreamTableEnvironment.create(
        env,
        environment_settings=EnvironmentSettings.new_instance()
        .in_streaming_mode().use_blink_planner().build())

    @udf(result_type=DataTypes.STRING())
    def get_string_element(my_list):
        my_string = 'xxx'
        for element in my_list:
            # Access the ROW fields by position, not by name.
            if element[0] == 2:
                my_string = element[1]
        return my_string

    t = t_env.from_elements(
        [("1", [Row(3, "flink")]),
         ("3", [Row(2, "pyflink")]),
         ("2", [Row(2, "python")])],
        DataTypes.ROW(
            [DataTypes.FIELD("Key", DataTypes.STRING()),
             DataTypes.FIELD("List_element",
                             DataTypes.ARRAY(DataTypes.ROW(
                                 [DataTypes.FIELD("integer_element", DataTypes.INT()),
                                  DataTypes.FIELD("string_element", DataTypes.STRING())])))]))
    print(t.select(get_string_element(t.List_element)).to_pandas())
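
To keep the field names readable while still using positional access, the
lookup logic can be sketched in plain Python, without Flink. This assumes the
rows reach the UDF as positional sequences, which is what the index-based
workaround relies on. FIELD_INDEX and the "wanted"/"default" parameters are
hypothetical names for illustration, not part of the PyFlink API.

```python
# Plain-Python sketch; rows are modeled as tuples (an assumption).

# Hypothetical name-to-position mapping, mirroring
# ROW<integer_element INT, string_element STRING>.
FIELD_INDEX = {"integer_element": 0, "string_element": 1}


def get_string_element(my_list, wanted=2, default="xxx"):
    """Return string_element of the last row whose integer_element == wanted."""
    result = default
    for element in my_list:
        if element[FIELD_INDEX["integer_element"]] == wanted:
            result = element[FIELD_INDEX["string_element"]]
    return result


rows = [(3, "flink"), (2, "pyflink"), (2, "python")]
print(get_string_element(rows))             # python (last match wins)
print(get_string_element([(3, "flink")]))   # xxx (no match, default kept)
```

Keeping the mapping in one place means only FIELD_INDEX has to change if the
ROW schema is reordered.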



[1] https://issues.apache.org/jira/browse/FLINK-20666

Best,
Xingbo
