Posted to user@flink.apache.org by shkob1 <sh...@gmail.com> on 2018/10/12 18:28:39 UTC

Custom Trigger + SQL Pattern

Hey!

I have a use case in which I'm grouping a stream by session id. So far
pretty standard, but note that I need to do it through SQL and not through
the Table API.
In my use case there are two trigger conditions: one is time (session
inactivity), the other is a specific event marked as a "last" event.
AFAIK SQL does not support custom triggers, so what I end up doing is a
GROUP BY in the SQL, then converting the result to a stream along with a
boolean field that marks whether at least one of the events was the end
event, and then adding my custom trigger on top of it.
It looks something like this:

    Table result = tableEnv.sqlQuery(
            "SELECT atLeastOneTrue(lastEvent), sessionId, COUNT(*) "
                    + "FROM source GROUP BY sessionId");
    tableEnv.toRetractStream(result, Row.class, streamQueryConfig)
            .filter(tuple -> tuple.f0) // keep accumulate messages, drop retractions
            .map(...)
            .returns(...)
            .keyBy("sessionId")
            .window(EventTimeSessionWindows.withGap(Time.hours(4)))
            .trigger(new SessionEndedByTimeOrEndTrigger())
            .process(...); // take the last element from the GROUP BY result

This seems like a weird workaround, doesn't it? My window is basically over
the SQL result rather than over the source stream. Ideally I would keyBy the
sessionId before running the SQL, but then a) would I need to register a
table per key? b) would I be able to use the custom trigger per window?

Basically I want to group by session id and have a window for every session
that supports both a time trigger and a custom trigger. Assuming I need to
use SQL (the reason is that the query is dynamically loaded), is there a
better solution for it?
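
For illustration, here is a minimal sketch of the decision logic such a
trigger might implement, written in plain Java with no Flink dependency. It
is not the actual SessionEndedByTimeOrEndTrigger: the class SessionEndSketch,
the Decision enum, and both method signatures are assumptions made up for
this sketch, loosely modeled on the onElement/onEventTime callbacks of a
Flink Trigger.

```java
// Plain-Java stand-in for the decision logic of a trigger that ends a session
// either on inactivity or on a "last" event. All names here are hypothetical.
final class SessionEndSketch {

    // Stand-in for a trigger result, reduced to the two outcomes needed here.
    enum Decision { CONTINUE, FIRE_AND_PURGE }

    // Called for each event of a session: fire as soon as the "last" marker arrives.
    static Decision onElement(boolean isLastEvent) {
        return isLastEvent ? Decision.FIRE_AND_PURGE : Decision.CONTINUE;
    }

    // Called when event time advances: fire once the inactivity gap has elapsed
    // since the most recent event of the session.
    static Decision onEventTime(long watermarkMillis, long lastEventMillis, long gapMillis) {
        return watermarkMillis >= lastEventMillis + gapMillis
                ? Decision.FIRE_AND_PURGE
                : Decision.CONTINUE;
    }

    public static void main(String[] args) {
        long fourHours = 4L * 60 * 60 * 1000;
        System.out.println(onElement(false));                      // CONTINUE
        System.out.println(onElement(true));                       // FIRE_AND_PURGE
        System.out.println(onEventTime(fourHours, 0L, fourHours)); // FIRE_AND_PURGE
    }
}
```

In a real Flink Trigger the time condition would normally be expressed by
registering an event-time timer for the end of the window rather than by
comparing timestamps by hand.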

--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Re: Custom Trigger + SQL Pattern

Posted by shkob1 <sh...@gmail.com>.
Following up on the actual question: is there a way to register a
KeyedStream as table(s) and have a trigger per key?




Re: Custom Trigger + SQL Pattern

Posted by Shahar Cizer Kobrinsky <sh...@gmail.com>.
Thanks for the answer Hequn!

To be honest I'm still trying to wrap my head around this solution, and
trying to think about whether it has advantages over mine.
My original thought was that my design is "backwards", because logically I
would want to:

   1. collect raw records
   2. partition them by session id into windows
   3. wait for the session to end (= accumulate)
   4. run the GROUP BY and aggregation on the ended session's rows, then
   emit the result
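
The four steps above can be sketched in plain Java, without any Flink
classes, as "accumulate first, aggregate once". This is only an illustration
of the idea: the class AccumulateThenAggregate, its methods, and the choice
of count and sum as the aggregation are all assumptions made up for the
sketch.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java sketch of "accumulate first, aggregate once". All names are hypothetical.
final class AccumulateThenAggregate {

    // Steps 1 and 2: raw records, partitioned by session id.
    private final Map<String, List<Long>> buffers = new HashMap<>();

    // Step 3: keep accumulating while the session is still open.
    void collect(String sessionId, long value) {
        buffers.computeIfAbsent(sessionId, id -> new ArrayList<>()).add(value);
    }

    // Step 4: the session ended, so aggregate its rows once and drop the buffer.
    // Returns {count, sum}; an unknown session yields {0, 0}.
    long[] endSession(String sessionId) {
        List<Long> rows = buffers.remove(sessionId);
        if (rows == null) {
            return new long[] {0L, 0L};
        }
        long sum = 0L;
        for (long v : rows) {
            sum += v;
        }
        return new long[] {rows.size(), sum};
    }

    public static void main(String[] args) {
        AccumulateThenAggregate sketch = new AccumulateThenAggregate();
        sketch.collect("s1", 10L);
        sketch.collect("s2", 1L);
        sketch.collect("s1", 5L);
        long[] result = sketch.endSession("s1");
        System.out.println(result[0] + " rows, sum " + result[1]); // 2 rows, sum 15
    }
}
```

The trade-off in this sketch is that every raw row stays buffered until the
session ends, in exchange for computing the aggregate only once per session.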

My solution though (and I assume yours too) instead runs the aggregation
for every incoming record, which seems wasteful. It doesn't save any state
storage either, as it's emitting to a retract stream anyway.
What do you think?

Shahar


On Tue, Oct 16, 2018 at 7:02 PM Hequn Cheng <ch...@gmail.com> wrote:

> Hi Shahar,
>
> The table function takes a single row but can output multiple rows. You can
> split the row based on the "last" event. The code looks like:
>
>     val sessionResult =
>       "SELECT " +
>         "  lastUDAF(line) AS lastEvents " +
>         "FROM MyTable " +
>         "GROUP BY SESSION(rowtime, INTERVAL '4' HOUR)"
>     val result =
>       s"SELECT lastEvent FROM ($sessionResult), " +
>         "LATERAL TABLE(splitUDTF(lastEvents)) AS T(lastEvent)"
>
> The lastUDAF is used to process data in a session window. As your
> lastEvent is based on either the window end or a special "last" event, the
> lastUDAF outputs multiple last events.
> After the window, we apply a splitUDTF to split the lastEvents into
> single events.
>
> Best, Hequn

Re: Custom Trigger + SQL Pattern

Posted by Hequn Cheng <ch...@gmail.com>.
Hi Shahar,

The table function takes a single row but can output multiple rows. You can
split the row based on the "last" event. The code looks like:

    val sessionResult =
      "SELECT " +
        "  lastUDAF(line) AS lastEvents " +
        "FROM MyTable " +
        "GROUP BY SESSION(rowtime, INTERVAL '4' HOUR)"
    val result =
      s"SELECT lastEvent FROM ($sessionResult), " +
        "LATERAL TABLE(splitUDTF(lastEvents)) AS T(lastEvent)"

The lastUDAF is used to process data in a session window. As your lastEvent
is based on either the window end or a special "last" event, the lastUDAF
outputs multiple last events.
After the window, we apply a splitUDTF to split the lastEvents into
single events.
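
To make the two steps concrete, here is a plain-Java emulation of one
possible reading of lastUDAF and splitUDTF. It uses no Flink classes and is
not the real implementation of either function: the Event type, the ";"
separator (payloads are assumed not to contain it), and the method names are
assumptions made up for this sketch.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-Java emulation of the "aggregate, then split" idea. All names are hypothetical.
final class LastEventsSketch {

    static final class Event {
        final String payload;
        final boolean last;

        Event(String payload, boolean last) {
            this.payload = payload;
            this.last = last;
        }
    }

    // Aggregate step: one value per session window, holding every event marked
    // "last" plus the final event of the window if it was not marked itself.
    static String lastEvents(List<Event> window) {
        List<String> picked = new ArrayList<>();
        for (int i = 0; i < window.size(); i++) {
            Event e = window.get(i);
            boolean windowEnd = i == window.size() - 1;
            if (e.last || windowEnd) {
                picked.add(e.payload);
            }
        }
        return String.join(";", picked);
    }

    // Table-function step: one input value in, zero or more rows out.
    static List<String> split(String lastEvents) {
        List<String> rows = new ArrayList<>();
        if (lastEvents.isEmpty()) {
            return rows;
        }
        rows.addAll(Arrays.asList(lastEvents.split(";")));
        return rows;
    }

    public static void main(String[] args) {
        List<Event> window = Arrays.asList(
                new Event("e1", false),
                new Event("e2", true),
                new Event("e3", false),
                new Event("e4", false));
        System.out.println(split(lastEvents(window))); // [e2, e4]
    }
}
```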

Best, Hequn


On Wed, Oct 17, 2018 at 12:38 AM Shahar Cizer Kobrinsky <
shahar.kobrinsky@gmail.com> wrote:

> I'm wondering how that works. It seems that a table function still
> takes a single row's values as input. Am I wrong (or at least that is
> how the examples show it)?
> What would the SQL look like?

Re: Custom Trigger + SQL Pattern

Posted by Shahar Cizer Kobrinsky <sh...@gmail.com>.
I'm wondering how that works. It seems that a table function still takes
a single row's values as input. Am I wrong (or at least that is how the
examples show it)?
What would the SQL look like?

On Fri, Oct 12, 2018 at 9:15 PM Hequn Cheng <ch...@gmail.com> wrote:

> Hi shkob1,
>
> > while one is time (session inactivity) the other is based on a specific
> > event marked as a "last" event.
> How about using a session window and a UDTF [1] to solve the problem? The
> session window may output multiple `last` elements. However, we can use a
> UDTF to split them into single ones. Thus, we can use SQL for the whole job.
>
> Best, Hequn.
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/udfs.html#table-functions

Re: Custom Trigger + SQL Pattern

Posted by Hequn Cheng <ch...@gmail.com>.
Hi shkob1,

> while one is time (session inactivity) the other is based on a specific
> event marked as a "last" event.

How about using a session window and a UDTF [1] to solve the problem? The
session window may output multiple `last` elements. However, we can use a
UDTF to split them into single ones. Thus, we can use SQL for the whole job.

Best, Hequn.

[1]
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/udfs.html#table-functions
