Posted to user@flink.apache.org by Dongwon Kim <ea...@gmail.com> on 2019/03/20 01:51:05 UTC

How to split tuple2 returned by UDAF into two columns in a result table

Hi,

I want to split the Tuple2 returned by AggregateFunction.getValue into two separate columns of the result table.

Consider the following example, where myudaf returns Tuple2<Boolean, Boolean>:

  Table table2 = table1
      .window(Slide.over("3.rows").every("1.rows").on("time").as("w"))
      .groupBy("w, name")
      .select("name, myudaf(col1, col2, col3) as (col4, col5)");

With Flink 1.7.2, table2.printSchema() then prints

  root
    |-- name: String
    |-- col4: Java Tuple2<Boolean, Boolean>

whereas my expectation is

  root
    |-- name: String
    |-- col4: Boolean
    |-- col5: Boolean

When I define a scalar function that returns a Tuple2 and use it as "myscalar(col1, col2, col3) as (col4, col5)", it works as expected.

Is there a way to split the tuple into two separate columns in Flink 1.7.2?
If not, do I have to define an additional UDF to flatten the tuple, or is there already one I can use?

- Dongwon

Re: How to split tuple2 returned by UDAF into two columns in a result table

Posted by Fabian Hueske <fh...@gmail.com>.
Hi Dongwon,

Couldn't you just return a tuple from the aggregation function and extract
the fields from the nested tuple using a value access function [1]?

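Table table2 = table1
      .window(Slide.over("3.rows").every("1.rows").on("time").as("w"))
      .groupBy("w, name")
      // x is a single column holding the Tuple2 returned by myudaf
      .select("name, myudaf(col1, col2, col3) as x")
      // value access: read the tuple's fields by position
      .select("name, x.get(0) as col4, x.get(1) as col5");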

Best, Fabian

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/table/functions.html#value-access-functions
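For readers without a Flink setup, the plain-Java sketch below models the two steps of the query above. The hypothetical `MyUdaf` borrows the method names of Flink's `AggregateFunction` contract (`createAccumulator`, `accumulate`, `getValue`) but is not Flink code, and it takes two inputs instead of three for brevity. `Pair` stands in for `Tuple2<Boolean, Boolean>`, and the aggregation logic (col4 is true if any `col1` in the window is true, col5 if every `col2` is) is invented, since the thread never says what `myudaf` computes. Step one yields a single composite value `x` per window; step two reads its fields by position, as `x.get(0)` and `x.get(1)` do.

```java
import java.util.List;

public class ValueAccessDemo {
    // Stand-in for Flink's Tuple2<Boolean, Boolean>, whose fields are also named f0 and f1.
    record Pair(boolean f0, boolean f1) {
        // Mirrors positional access such as x.get(0) and x.get(1).
        boolean get(int i) {
            if (i == 0) return f0;
            if (i == 1) return f1;
            throw new IndexOutOfBoundsException(i);
        }
    }

    // Mutable accumulator holding the running state of one window.
    static class Acc {
        boolean any = false; // becomes true once any col1 is true
        boolean all = true;  // stays true while every col2 is true
    }

    // Hypothetical aggregate that only follows the AggregateFunction method names.
    static class MyUdaf {
        Acc createAccumulator() { return new Acc(); }

        void accumulate(Acc acc, boolean col1, boolean col2) {
            acc.any |= col1;
            acc.all &= col2;
        }

        Pair getValue(Acc acc) { return new Pair(acc.any, acc.all); }
    }

    // Each element of window is one input row {col1, col2}; returns {col4, col5}.
    static boolean[] aggregateThenSplit(List<boolean[]> window) {
        MyUdaf udaf = new MyUdaf();
        Acc acc = udaf.createAccumulator();
        for (boolean[] row : window) {
            udaf.accumulate(acc, row[0], row[1]);
        }
        Pair x = udaf.getValue(acc);               // step 1: one composite column x
        return new boolean[] {x.get(0), x.get(1)}; // step 2: col4, col5
    }

    public static void main(String[] args) {
        boolean[] cols = aggregateThenSplit(List.of(
                new boolean[] {false, true},
                new boolean[] {true, true},
                new boolean[] {false, false}));
        System.out.println(cols[0] + " " + cols[1]); // true false
    }
}
```

The aggregate itself still produces one composite column; the split happens in an ordinary projection afterwards, so no extra UDF is needed.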

On Wed, Mar 20, 2019 at 07:15 Dongwon Kim <eastcirclek@gmail.com> wrote:

> Another related question:
>
> Is there something like an aggregate table function?
> In the above scenario, I have to apply an aggregate function and then
> apply a table function solely to flatten tuples, which seems quite
> inefficient.

Re: How to split tuple2 returned by UDAF into two columns in a result table

Posted by Dongwon Kim <ea...@gmail.com>.
Another related question:

Is there something like an aggregate table function?
In the above scenario, I have to apply an aggregate function and then apply
a table function solely to flatten tuples, which seems quite inefficient.
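The function type asked about here, one that aggregates and emits its result directly as several columns, is not available in Flink 1.7.2. Later Flink releases added `TableAggregateFunction`, which emits rows through `emitValue(accumulator, collector)` and is applied with `flatAggregate`; check the documentation of your version for the exact API. The plain-Java sketch below only models that shape and is not Flink code: the hypothetical `AnyAllTableAgg` accumulates like a UDAF but, instead of returning one `Tuple2` from `getValue`, emits the two flags as one flat row, so no separate flattening step is needed. `Consumer` stands in for Flink's `Collector`, and the aggregation logic (col4: any `col1` true, col5: every `col2` true) is invented for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

public class TableAggDemo {
    // Stand-in for one flat result row with columns (col4, col5).
    record Row2(boolean col4, boolean col5) {}

    static class Acc {
        boolean any = false; // becomes true once any col1 is true
        boolean all = true;  // stays true while every col2 is true
    }

    // Hypothetical table aggregate: it accumulates like a UDAF, but its result
    // leaves through emitValue as finished rows instead of one composite value.
    static class AnyAllTableAgg {
        Acc createAccumulator() { return new Acc(); }

        void accumulate(Acc acc, boolean col1, boolean col2) {
            acc.any |= col1;
            acc.all &= col2;
        }

        // Consumer plays the role of a collector; this sketch emits one row,
        // but a table aggregate may emit zero or several.
        void emitValue(Acc acc, Consumer<Row2> out) {
            out.accept(new Row2(acc.any, acc.all));
        }
    }

    // Each element of window is one input row {col1, col2}.
    static List<Row2> run(List<boolean[]> window) {
        AnyAllTableAgg agg = new AnyAllTableAgg();
        Acc acc = agg.createAccumulator();
        for (boolean[] row : window) {
            agg.accumulate(acc, row[0], row[1]);
        }
        List<Row2> rows = new ArrayList<>();
        agg.emitValue(acc, rows::add);
        return rows;
    }

    public static void main(String[] args) {
        System.out.println(run(List.of(new boolean[] {true, true}, new boolean[] {false, true})));
        // prints [Row2[col4=true, col5=true]]
    }
}
```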



On Wed, Mar 20, 2019 at 1:09 PM Dongwon Kim <ea...@gmail.com> wrote:

> Hi Kurt,
> You're right; it is a table function, used like "mytablefunc(col1, col2,
> col3) as (col4, col5)".
> I have to define a custom UDTF for that purpose.
> Thanks,
>
> - Dongwon

Re: How to split tuple2 returned by UDAF into two columns in a result table

Posted by Dongwon Kim <ea...@gmail.com>.
Hi Kurt,
You're right; it is a table function, used like "mytablefunc(col1, col2, col3) as (col4, col5)".
I have to define a custom UDTF for that purpose.
Thanks,

- Dongwon

On Wed, Mar 20, 2019 at 12:04 PM Kurt Young <yk...@gmail.com> wrote:

> Hi Dongwon,
>
> AFAIK, Flink doesn't support usage like "myscalar(col1, col2, col3) as
> (col4, col5)". Am I missing something?
>
> If you want to split a Tuple2 into two separate columns, you can use a UDTF.
>
> Best,
> Kurt

Re: How to split tuple2 returned by UDAF into two columns in a result table

Posted by Kurt Young <yk...@gmail.com>.
Hi Dongwon,

AFAIK, Flink doesn't support usage like "myscalar(col1, col2, col3) as
(col4, col5)". Am I missing something?

If you want to split a Tuple2 into two separate columns, you can use a UDTF.

Best,
Kurt
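A UDTF can split the tuple because it may emit a row with several columns for each input value. In Flink 1.7 such a function extends `TableFunction`, exposes one or more public `eval` methods, and emits output with `collect(...)`; when the output type is `Row`, its field types generally have to be declared, for example by overriding `getResultType`. The sketch below models only that contract in plain Java so it runs without Flink: `SplitPair` is a hypothetical stand-in for such a function, and `Pair` and `SplitRow` stand in for `Tuple2<Boolean, Boolean>` and the `(col4, col5)` row.

```java
import java.util.ArrayList;
import java.util.List;

public class SplitPairDemo {
    // Stand-in for Tuple2<Boolean, Boolean>.
    record Pair(Boolean f0, Boolean f1) {}

    // Stand-in for the output row (col4, col5).
    record SplitRow(Boolean col4, Boolean col5) {}

    // Models the table-function contract: eval runs once per input value
    // and may emit zero or more rows through collect.
    static class SplitPair {
        private final List<SplitRow> emitted = new ArrayList<>();

        protected void collect(SplitRow row) {
            emitted.add(row);
        }

        public void eval(Pair p) {
            if (p == null) {
                return; // illustrative choice: no row for a null tuple
            }
            collect(new SplitRow(p.f0(), p.f1())); // exactly one flat row per tuple
        }

        List<SplitRow> emitted() {
            return emitted;
        }
    }

    public static void main(String[] args) {
        SplitPair split = new SplitPair();
        split.eval(new Pair(true, false));
        split.eval(null);
        System.out.println(split.emitted()); // [SplitRow[col4=true, col5=false]]
    }
}
```

Because the function emits exactly one row per non-null tuple, joining it against the aggregated table adds no rows; it only reshapes the composite column into two flat ones.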


On Wed, Mar 20, 2019 at 9:59 AM Dongwon Kim <ea...@gmail.com> wrote:

> Hi,
>
> I want to split the Tuple2 returned by AggregateFunction.getValue into two
> separate columns of the result table.