Posted to user@flink.apache.org by Dongwon Kim <ea...@gmail.com> on 2019/03/20 01:51:05 UTC
How to split tuple2 returned by UDAF into two columns in a result table
Hi,
I want to split the Tuple2 returned by AggregateFunction.getValue into two separate columns in the result table.
Let's consider the following example where myudaf returns Tuple2<Boolean, Boolean>:
Table table2 = table1
    .window(Slide.over("3.rows").every("1.rows").on("time").as("w"))
    .groupBy("w, name")
    .select("name, myudaf(col1, col2, col3) as (col4, col5)");
Then table2.printSchema() prints the following (with Flink 1.7.2):
root
|-- name: String
|-- col4: Java Tuple2<Boolean, Boolean>
whereas my expectation is
root
|-- name: String
|-- col4: Boolean
|-- col5: Boolean
When I define a scalar function that returns Tuple2 and use it like "myscalar(col1, col2, col3) as (col4, col5)", it works as expected.
Is there a way to split the tuple into two separate columns in Flink 1.7.2?
If not, do I have to define an additional UDF to flatten the tuple, or is there already one I can use?
- Dongwon
Re: How to split tuple2 returned by UDAF into two columns in a result table
Posted by Fabian Hueske <fh...@gmail.com>.
Hi Dongwon,
Couldn't you just return a tuple from the aggregation function and extract
the fields from the nested tuple using a value access function [1]?
Table table2 = table1
    .window(Slide.over("3.rows").every("1.rows").on("time").as("w"))
    .groupBy("w, name")
    .select("name, myudaf(col1, col2, col3) as x")
    .select("name, x.get(0) as col4, x.get(1) as col5");
Best, Fabian
[1] https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/table/functions.html#value-access-functions
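
For readers who want to see the idea in isolation, here is a minimal plain-Java sketch of what the two-step select above amounts to: the aggregate yields one composite value per group, and a later projection pulls its two fields out as separate columns. It does not depend on Flink. The class TupleSplitSketch, the accumulator Acc, and the "any true / all true" logic are hypothetical stand-ins, since the thread never shows what myudaf computes. The method names merely mirror the createAccumulator / accumulate / getValue shape of a Flink AggregateFunction, and Map.Entry stands in for Tuple2<Boolean, Boolean>.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.List;
import java.util.Map;

// Hypothetical, Flink-free model of an aggregate that returns a pair.
final class TupleSplitSketch {

    /** Assumed accumulator: tracks whether any / all inputs were true. */
    static final class Acc {
        boolean any = false;
        boolean all = true;
    }

    static Acc createAccumulator() {
        return new Acc();
    }

    /** Folds one input row into the accumulator. */
    static void accumulate(Acc acc, boolean value) {
        acc.any |= value;
        acc.all &= value;
    }

    /** Composite result, standing in for Tuple2<Boolean, Boolean>. */
    static Map.Entry<Boolean, Boolean> getValue(Acc acc) {
        return new SimpleEntry<>(acc.any, acc.all);
    }

    public static void main(String[] args) {
        Acc acc = createAccumulator();
        for (boolean v : List.of(true, false, true)) {
            accumulate(acc, v);
        }
        // First step: the aggregate yields a single composite column "x".
        Map.Entry<Boolean, Boolean> x = getValue(acc);
        // Second step: project the fields out, like x.get(0) and x.get(1).
        boolean col4 = x.getKey();
        boolean col5 = x.getValue();
        System.out.println(col4 + ", " + col5); // prints "true, false"
    }
}
```

The projection step only reads fields of a value that has already been computed, so it adds no second pass over the input rows.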
Re: How to split tuple2 returned by UDAF into two columns in a result table
Posted by Dongwon Kim <ea...@gmail.com>.
Another, related question:
Is there something like an aggregate table function?
In the above scenario, I have to apply an aggregate function and then apply
a table function solely to flatten the tuples, which seems quite inefficient.
Re: How to split tuple2 returned by UDAF into two columns in a result table
Posted by Dongwon Kim <ea...@gmail.com>.
Hi Kurt,
You're right; it is a table function, used like "mytablefunc(col1, col2, col3) as
(col4, col5)".
I'll have to define a custom UDTF for that purpose.
Thanks,
- Dongwon
Re: How to split tuple2 returned by UDAF into two columns in a result table
Posted by Kurt Young <yk...@gmail.com>.
Hi Dongwon,
AFAIK, Flink doesn't support usage like "myscalar(col1, col2, col3) as
(col4, col5)". Am I missing something?
If you want to split a Tuple2 into two separate columns, you can use a UDTF.
Best,
Kurt