Posted to user@flink.apache.org by Yuval Itzchakov <yu...@gmail.com> on 2021/08/23 11:38:03 UTC

Creating a generic ARRAY_AGG aggregate function for Flink SQL

Hi,

I'm trying to implement a generic ARRAY_AGG UDF (equivalent to the one
found in many data warehouses, e.g.
https://docs.snowflake.com/en/sql-reference/functions/array_agg.html) for
use in Flink SQL.

Using CollectAggFunction
<https://github.com/apache/flink/blob/master/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/aggfunctions/CollectAggFunction.java>
as a reference, I tried using ArrayData, producing a GenericArrayData as
the output type. The problem is that I need a way to convert from the
external format used in the UDF (e.g. String, Integer) to the internal
representation required by Flink (e.g. StringData). I haven't found a
straightforward way of going about that.
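
For illustration, here is a minimal plain-Java sketch of the conversion in
question. It is not Flink code: InternalString, converterFor and
toInternalArray are hypothetical stand-ins for an internal type such as
StringData and for the missing conversion step. The only point is that a
converter can be chosen once the element type is known up front.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

class ConversionSketch {
    /** Hypothetical stand-in for an internal string type; NOT Flink's StringData. */
    static final class InternalString {
        final byte[] utf8;

        InternalString(String s) {
            this.utf8 = s.getBytes(StandardCharsets.UTF_8);
        }

        @Override
        public String toString() {
            return new String(utf8, StandardCharsets.UTF_8);
        }
    }

    /** Picks a converter from the declared element class (assumed to be known up front). */
    static Function<Object, Object> converterFor(Class<?> elementClass) {
        if (elementClass == String.class) {
            return v -> v == null ? null : new InternalString((String) v);
        }
        // Assumption of this sketch: all other types need no conversion.
        return v -> v;
    }

    /** Converts the accumulated external values into an array of internal values. */
    static Object[] toInternalArray(List<?> values, Class<?> elementClass) {
        Function<Object, Object> converter = converterFor(elementClass);
        Object[] out = new Object[values.size()];
        for (int i = 0; i < out.length; i++) {
            out[i] = converter.apply(values.get(i));
        }
        return out;
    }

    public static void main(String[] args) {
        Object[] strings = toInternalArray(Arrays.asList("a", "b"), String.class);
        Object[] ints = toInternalArray(Arrays.asList(1, 2), Integer.class);
        System.out.println(strings[0].getClass().getSimpleName());
        System.out.println(ints[0].getClass().getSimpleName());
    }
}
```

Without the elementClass argument there is nothing to base the converter
choice on, which is the gap described above.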

Here is a gist of the implementation
<https://gist.github.com/YuvalItzchakov/5cc7b076d31d73e5c1f9b7b72b3c624b>.
Would appreciate any help on how to tackle this.

-- 
Best Regards,
Yuval Itzchakov.

Re: Creating a generic ARRAY_AGG aggregate function for Flink SQL

Posted by Yuval Itzchakov <yu...@gmail.com>.
Hi Guys,

@Ingo Bürk <in...@ververica.com> Thanks for that, but I need this function
sooner rather than later. I'd be happy to contribute it back once I get
it to work :)
@Caizhi Weng <ts...@gmail.com> The gist was not the correct version. I
can't return an Array[T], since it will always be an Array[Object] at
runtime, and that's not what I want. I'm actually returning ArrayData
(more specifically, a GenericArrayData). But for GenericArrayData the
elements of the array must be in the internal format, not the external
one. To do this, I need access to something like DataStructureConverters
<https://github.com/apache/flink/blob/master/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/data/conversion/DataStructureConverters.java>,
but I can't compute the type at runtime; I'd need it injected somehow
during the TypeInference stage.
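
The Array[T] point can be reproduced without Flink. Below is a minimal Java
sketch; the class and method names are illustrative only. Without a class
token the element type is erased, so the array built from an ArrayList is an
Object[] at runtime. A typed array is only possible when the element class is
supplied from outside, which is the role the TypeInference stage would have
to play here.

```java
import java.lang.reflect.Array;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class ErasureSketch {
    /** No element type available: for an ArrayList the result is an Object[]. */
    static <T> Object collectErased(List<T> items) {
        return items.toArray();
    }

    /** Element class supplied by the caller: the result is a real T[]. */
    @SuppressWarnings("unchecked")
    static <T> T[] collectTyped(List<T> items, Class<T> elementClass) {
        T[] out = (T[]) Array.newInstance(elementClass, items.size());
        return items.toArray(out);
    }

    public static void main(String[] args) {
        List<String> items = new ArrayList<>(Arrays.asList("a", "b"));
        // Prints the runtime array class of each variant.
        System.out.println(collectErased(items).getClass().getSimpleName());
        System.out.println(collectTyped(items, String.class).getClass().getSimpleName());
    }
}
```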

On Tue, Aug 24, 2021 at 9:38 AM Ingo Bürk <in...@ververica.com> wrote:

> Hi,
>
> just FYI, we do already have issues in JIRA for this:
> * https://issues.apache.org/jira/browse/FLINK-21949
> * https://issues.apache.org/jira/browse/FLINK-22484
>
>
> Best
> Ingo
>

-- 
Best Regards,
Yuval Itzchakov.

Re: Creating a generic ARRAY_AGG aggregate function for Flink SQL

Posted by Ingo Bürk <in...@ververica.com>.
Hi,

just FYI, we do already have issues in JIRA for this:
* https://issues.apache.org/jira/browse/FLINK-21949
* https://issues.apache.org/jira/browse/FLINK-22484


Best
Ingo

On Tue, Aug 24, 2021 at 8:23 AM Caizhi Weng <ts...@gmail.com> wrote:

> Hi!
>
> As far as I know, returning an array in the external data format from the
> getValue method is OK. Flink will do the conversion for you.
>
> Are you hitting an exception when using this array_agg? If so, what's
> the stack trace?
>
> You can also open a JIRA ticket to request built-in support for
> array_agg, as this function exists in many data warehouses.
>

Re: Creating a generic ARRAY_AGG aggregate function for Flink SQL

Posted by Caizhi Weng <ts...@gmail.com>.
Hi!

As far as I know, returning an array in the external data format from the
getValue method is OK. Flink will do the conversion for you.

Are you hitting an exception when using this array_agg? If so, what's
the stack trace?

You can also open a JIRA ticket to request built-in support for
array_agg, as this function exists in many data warehouses.
