Posted to user@flink.apache.org by aj <aj...@gmail.com> on 2020/05/21 06:47:20 UTC

Flink Window with multiple trigger condition

Hello All,

I am getting a lot of user events in a stream. There are different types
of events, and now I want to build some aggregation metrics per user by
grouping the events into buckets.

My condition for windowing is :

1. Start the window for a user when an event with event_name: *"search"* arrives
for that user.
2. Trigger the window when
      either 30 mins have passed since the start of the window
       OR
       an event with event_type: *"start"* appears.

After that, I want to calculate some aggregations over the events in the window.
I know this can be done using a process function, but I am stuck on creating a
window with a multi-condition trigger.

Please help me understand how to create this type of window with multiple
trigger conditions, i.e. either time passing or a certain event arriving.


-- 
Thanks & Regards,
Anuj Jain


<http://www.cse.iitm.ac.in/%7Eanujjain/>

Re: Re: Re: Flink Window with multiple trigger condition

Posted by aj <aj...@gmail.com>.
Thanks Yun.

I have converted the code to use a keyed process function rather than a
flatMap, and with a registered timer it worked.
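
For anyone who finds this thread later, here is a minimal sketch of the approach
that worked for me. The Event and SessionSummary types and the aggregation call
are illustrative placeholders, not the exact production code:

import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

public class UserSessionProcessFunction
        extends KeyedProcessFunction<Long, Event, SessionSummary> {

    private transient ListState<Event> bufferedEvents;  // events of the currently open session
    private transient ValueState<Long> timerTimestamp;  // timestamp of the registered 30-min timer

    @Override
    public void open(Configuration config) {
        bufferedEvents = getRuntimeContext().getListState(
                new ListStateDescriptor<>("buffered-events", Event.class));
        timerTimestamp = getRuntimeContext().getState(
                new ValueStateDescriptor<>("timer-ts", Long.class));
    }

    @Override
    public void processElement(Event event, Context ctx, Collector<SessionSummary> out) throws Exception {
        if ("search".equals(event.getName()) && timerTimestamp.value() == null) {
            // the "search" event opens the session and schedules the 30-minute timeout
            long fireAt = ctx.timerService().currentProcessingTime() + 30 * 60 * 1000L;
            ctx.timerService().registerProcessingTimeTimer(fireAt);
            timerTimestamp.update(fireAt);
        }
        if (timerTimestamp.value() != null) {
            bufferedEvents.add(event);
            if ("start".equals(event.getName())) {
                // the "start" event closes the session early: cancel the timer and emit now
                ctx.timerService().deleteProcessingTimeTimer(timerTimestamp.value());
                emitAndClear(out);
            }
        }
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<SessionSummary> out) throws Exception {
        // 30 minutes elapsed without a "start" event
        emitAndClear(out);
    }

    private void emitAndClear(Collector<SessionSummary> out) throws Exception {
        SessionSummary summary = new SessionSummary();
        for (Event event : bufferedEvents.get()) {
            summary.add(event); // aggregate however the use case requires
        }
        out.collect(summary);
        bufferedEvents.clear();
        timerTimestamp.clear();
    }
}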

On Fri, May 29, 2020 at 11:13 AM Yun Gao <yu...@aliyun.com> wrote:

> Hi,
>
>      I think you could use a *timer* to achieve that. In a *processFunction*
> you could register a timer for a specific time (event time or processing time)
> and get a callback at that point. It can be registered like
>
>
> ctx.timerService().registerEventTimeTimer(current.lastModified + 60000);
>
>
>     More details on timers can be found in [1] and an example is in [2].
> In that example, a timer is registered in the last line of the
> *processElement* method, and the callback is implemented by overriding the
> *onTimer* method.
>
>    [1]
> https://ci.apache.org/projects/flink/flink-docs-master/dev/stream/operators/process_function.html#timers
>    [2]
> https://ci.apache.org/projects/flink/flink-docs-master/dev/stream/operators/process_function.html#example
>
>
> ------------------Original Mail ------------------
> *Sender:*aj <aj...@gmail.com>
> *Send Date:*Fri May 29 02:07:33 2020
> *Recipients:*Yun Gao <yu...@aliyun.com>
> *CC:*user <us...@flink.apache.org>
> *Subject:*Re: Re: Flink Window with multiple trigger condition
>
>> Hi,
>>
>> I have implemented the solution below and it is working fine, but the
>> biggest problem is that if no event arrives for the user after 30 min,
>> I am not able to trigger, because I compute the time difference from
>> incoming events. So the window only triggers when the next event comes,
>> but I want it to trigger right at the 30-minute mark.
>>
>> Please help me improve this and solve the above problem.
>>
>>
>>
>> public class DemandSessionFlatMap extends RichFlatMapFunction<Tuple2<Long, GenericRecord>, DemandSessionSummaryTuple> {
>>
>>     private static final Logger LOGGER = LoggerFactory.getLogger(DemandSessionFlatMap.class);
>>
>>     private transient ValueState<Tuple3<String, Long, Long>> timeState; // maintain session_id starttime and endtime
>>     private transient MapState<String, DemandSessionSummaryTuple> sessionSummary; // map for hex9 and summarytuple
>>
>>     @Override
>>     public void open(Configuration config) {
>>
>>         ValueStateDescriptor<Tuple3<String, Long, Long>> timeDescriptor =
>>                 new ValueStateDescriptor<>(
>>                         "time_state", // the state name
>>                         TypeInformation.of(new TypeHint<Tuple3<String, Long, Long>>() {
>>                         }), // type information
>>                         Tuple3.of(null, 0L, 0L)); // default value of the state, if nothing was set
>>         timeState = getRuntimeContext().getState(timeDescriptor);
>>
>>         MapStateDescriptor<String, DemandSessionSummaryTuple> descriptor =
>>                 new MapStateDescriptor<String, DemandSessionSummaryTuple>("demand_session",
>>                         TypeInformation.of(new TypeHint<String>() {
>>                         }), TypeInformation.of(new TypeHint<DemandSessionSummaryTuple>() {
>>                 }));
>>         sessionSummary = getRuntimeContext().getMapState(descriptor);
>>
>>     }
>>
>>     @Override
>>     public void flatMap(Tuple2<Long, GenericRecord> recordTuple2, Collector<DemandSessionSummaryTuple> collector) throws Exception {
>>         GenericRecord record = recordTuple2.f1;
>>         String event_name = record.get("event_name").toString();
>>         long event_ts = (Long) record.get("event_ts");
>>         Tuple3<String, Long, Long> currentTimeState = timeState.value();
>>
>>         if (event_name.equals("search_list_keyless") && currentTimeState.f1 == 0) {
>>             currentTimeState.f1 = event_ts;
>>             String demandSessionId = UUID.randomUUID().toString();
>>             currentTimeState.f0 = demandSessionId;
>>         }
>>
>>         long timeDiff = event_ts - currentTimeState.f1;
>>
>>         if (event_name.equals("keyless_start_trip") || timeDiff >= 1800000) {
>>             Tuple3<String, Long, Long> finalCurrentTimeState = currentTimeState;
>>             sessionSummary.entries().forEach( tuple ->{
>>                 String key = tuple.getKey();
>>                 DemandSessionSummaryTuple sessionSummaryTuple = tuple.getValue();
>>                 try {
>>                     sessionSummaryTuple.setEndTime(finalCurrentTimeState.f2);
>>                     collector.collect(sessionSummaryTuple);
>>                 } catch (Exception e) {
>>                     e.printStackTrace();
>>                 }
>>
>>             });
>>             timeState.clear();
>>             sessionSummary.clear();
>>             currentTimeState = timeState.value();
>>         }
>>
>>         if (event_name.equals("search_list_keyless") && currentTimeState.f1 == 0) {
>>             currentTimeState.f1 = event_ts;
>>             String demandSessionId = UUID.randomUUID().toString();
>>             currentTimeState.f0 = demandSessionId;
>>         }
>>         currentTimeState.f2 = event_ts;
>>
>>         if (currentTimeState.f1 > 0) {
>>             String search_hex9 = record.get("search_hex9") != null ? record.get("search_hex9").toString() : null;
>>             DemandSessionSummaryTuple currentTuple = sessionSummary.get(search_hex9) != null ? sessionSummary.get(search_hex9) : new DemandSessionSummaryTuple();
>>
>>             if (sessionSummary.get(search_hex9) == null) {
>>                 currentTuple.setSearchHex9(search_hex9);
>>                 currentTuple.setUserId(recordTuple2.f0);
>>                 currentTuple.setStartTime(currentTimeState.f1);
>>                 currentTuple.setDemandSessionId(currentTimeState.f0);
>>             }
>>
>>             if (event_name.equals("search_list_keyless")) {
>>                 currentTuple.setTotalSearch(currentTuple.getTotalSearch() + 1);
>>                 SearchSummaryCalculation(record, currentTuple);
>>             }
>>             sessionSummary.put(search_hex9, currentTuple);
>>         }
>>         timeState.update(currentTimeState);
>>     }
>>
>>
>>
>>
>>
>>
>> On Sun, May 24, 2020 at 10:57 PM Yun Gao <yu...@aliyun.com> wrote:
>>
>>> Hi,
>>>
>>>        First, sorry that I'm not an expert on windows, so please correct me
>>> if I'm wrong, but from my side it seems the assigner might also be a
>>> problem in addition to the trigger: currently Flink window assigners are
>>> all based on time (processing time or event time), and it might be hard
>>> to implement an event-driven window assigner that starts assigning
>>> elements to a window only after certain elements have been received.
>>>
>>>       What comes to mind is that a possible alternative is to use
>>> the low-level *KeyedProcessFunction* directly: you could register a
>>> timer 30 mins in the future when the "*search*" event is received and write
>>> the time of the search event into state. The following events are then
>>> saved to state since the flag is set. Once the "*start*" event is received
>>> or the timer fires, you could load all the events from state, do the
>>> aggregation, and cancel the timer if it was triggered by the "*start*"
>>> event. A simpler case is [1]; it does not cover stopping the aggregation
>>> when a special event is received, but that logic could be added to it.
>>>
>>> [1]
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/stream/operators/process_function.html#example
>>>
>>> Best,
>>>  Yun
>>>
>>>
>>>
>>> ------------------Original Mail ------------------
>>> *Sender:*aj <aj...@gmail.com>
>>> *Send Date:*Sun May 24 01:10:55 2020
>>> *Recipients:*Tzu-Li (Gordon) Tai <tz...@apache.org>
>>> *CC:*user <us...@flink.apache.org>
>>> *Subject:*Re: Flink Window with multiple trigger condition
>>>
>>>>
>>>> I am still not able to get very far after reading the material. Please help
>>>> with some basic code to start building this window and trigger.
>>>>
>>>> Another option I am thinking of is to just use a RichFlatMap function and
>>>> use keyed state to build this logic. Is that the correct approach?
>>>>
>>>>
>>>>
>>>> On Fri, May 22, 2020 at 4:52 PM aj <aj...@gmail.com> wrote:
>>>>
>>>>>
>>>>>
>>>>> I was also thinking of using a processing-time window, but that will not
>>>>> work for me. I want to start the window when the user's "*search*"
>>>>> event arrives, so for each user the window will start from the *search*
>>>>> event.
>>>>>  The Tumbling window has a fixed start and end time, so it will not be
>>>>> suitable in my case.
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> On Fri, May 22, 2020 at 10:23 AM Tzu-Li (Gordon) Tai <
>>>>> tzulitai@apache.org> wrote:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> To achieve what you have in mind, I think what you have to do is to
>>>>>> use a
>>>>>> processing time window of 30 mins, and have a custom trigger that
>>>>>> matches
>>>>>> the "start" event in the `onElement` method and return
>>>>>> TriggerResult.FIRE_AND_PURGE.
>>>>>>
>>>>>> That way, the window fires either when the processing time has
>>>>>> passed, or
>>>>>> the start event was received.
>>>>>>
>>>>>> Cheers,
>>>>>> Gordon
>>>>>>
>>>>>>
>>>>>>
>>>>>> --
>>>>>> Sent from:
>>>>>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>>>>>>
>>>>>
>>>>>
>>>>> --
>>>>> Thanks & Regards,
>>>>> Anuj Jain
>>>>> Mob. : +91- 8588817877
>>>>> Skype : anuj.jain07
>>>>> <http://www.oracle.com/>
>>>>>
>>>>>
>>>>> <http://www.cse.iitm.ac.in/%7Eanujjain/>
>>>>>
>>>>
>>>>
>>>> --
>>>> Thanks & Regards,
>>>> Anuj Jain
>>>> Mob. : +91- 8588817877
>>>> Skype : anuj.jain07
>>>> <http://www.oracle.com/>
>>>>
>>>>
>>>> <http://www.cse.iitm.ac.in/%7Eanujjain/>
>>>>
>>>
>>
>> --
>> Thanks & Regards,
>> Anuj Jain
>> Mob. : +91- 8588817877
>> Skype : anuj.jain07
>> <http://www.oracle.com/>
>>
>>
>> <http://www.cse.iitm.ac.in/%7Eanujjain/>
>>
>

-- 
Thanks & Regards,
Anuj Jain
Mob. : +91- 8588817877
Skype : anuj.jain07
<http://www.oracle.com/>


<http://www.cse.iitm.ac.in/%7Eanujjain/>

Re: Re: Re: Flink Window with multiple trigger condition

Posted by Yun Gao <yu...@aliyun.com>.
Hi,
     I think you could use a timer to achieve that. In a processFunction you could register a timer for a specific time (event time or processing time) and get a callback at that point. It can be registered like 
ctx.timerService().registerEventTimeTimer(current.lastModified + 60000);
    More details on timers can be found in [1] and an example is in [2]. In that example, a timer is registered in the last line of the processElement method, and the callback is implemented by overriding the onTimer method.

   [1] https://ci.apache.org/projects/flink/flink-docs-master/dev/stream/operators/process_function.html#timers
   [2] https://ci.apache.org/projects/flink/flink-docs-master/dev/stream/operators/process_function.html#example
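
A minimal self-contained illustration of that pattern (using processing time for simplicity; event-time timers work the same way via registerEventTimeTimer, and the key/value types and 60 s timeout below are just placeholders):

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

// Emits the key once no further element has arrived for it within 60 seconds.
public class TimeoutFunction extends KeyedProcessFunction<String, String, String> {

    private transient ValueState<Long> lastModified;

    @Override
    public void open(Configuration config) {
        lastModified = getRuntimeContext().getState(
                new ValueStateDescriptor<>("last-modified", Long.class));
    }

    @Override
    public void processElement(String value, Context ctx, Collector<String> out) throws Exception {
        long now = ctx.timerService().currentProcessingTime();
        lastModified.update(now);
        // ask to be called back 60 seconds from now
        ctx.timerService().registerProcessingTimeTimer(now + 60000);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
        // fires when the registered time is reached; only react if the key stayed idle
        Long last = lastModified.value();
        if (last != null && timestamp >= last + 60000) {
            out.collect(ctx.getCurrentKey());
        }
    }
}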



 ------------------Original Mail ------------------
Sender:aj <aj...@gmail.com>
Send Date:Fri May 29 02:07:33 2020
Recipients:Yun Gao <yu...@aliyun.com>
CC:user <us...@flink.apache.org>
Subject:Re: Re: Flink Window with multiple trigger condition

Hi,

I have implemented the solution below and it is working fine, but the biggest problem is that if no event arrives for the user after 30 min then I am not able to trigger, because I compute the time difference from
incoming events. So the window only triggers when the next event comes, but I want it to trigger right at the 30-minute mark. 

So please help me improve this and solve the above problem.



public class DemandSessionFlatMap extends RichFlatMapFunction<Tuple2<Long, GenericRecord>, DemandSessionSummaryTuple> {

    private static final Logger LOGGER = LoggerFactory.getLogger(DemandSessionFlatMap.class);

    private transient ValueState<Tuple3<String, Long, Long>> timeState; // maintain session_id starttime and endtime 
    private transient MapState<String, DemandSessionSummaryTuple> sessionSummary; // map for hex9 and summarytuple

    @Override
    public void open(Configuration config) {

        ValueStateDescriptor<Tuple3<String, Long, Long>> timeDescriptor =
                new ValueStateDescriptor<>(
                        "time_state", // the state name
                        TypeInformation.of(new TypeHint<Tuple3<String, Long, Long>>() {
                        }), // type information
                        Tuple3.of(null, 0L, 0L)); // default value of the state, if nothing was set
        timeState = getRuntimeContext().getState(timeDescriptor);

        MapStateDescriptor<String, DemandSessionSummaryTuple> descriptor =
                new MapStateDescriptor<String, DemandSessionSummaryTuple>("demand_session",
                        TypeInformation.of(new TypeHint<String>() {
                        }), TypeInformation.of(new TypeHint<DemandSessionSummaryTuple>() {
                }));
        sessionSummary = getRuntimeContext().getMapState(descriptor);

    }

    @Override
    public void flatMap(Tuple2<Long, GenericRecord> recordTuple2, Collector<DemandSessionSummaryTuple> collector) throws Exception {
        GenericRecord record = recordTuple2.f1;
        String event_name = record.get("event_name").toString();
        long event_ts = (Long) record.get("event_ts");
        Tuple3<String, Long, Long> currentTimeState = timeState.value();

        if (event_name.equals("search_list_keyless") && currentTimeState.f1 == 0) {
            currentTimeState.f1 = event_ts;
            String demandSessionId = UUID.randomUUID().toString();
            currentTimeState.f0 = demandSessionId;
        }

        long timeDiff = event_ts - currentTimeState.f1;

        if (event_name.equals("keyless_start_trip") || timeDiff >= 1800000) {
            Tuple3<String, Long, Long> finalCurrentTimeState = currentTimeState;
            sessionSummary.entries().forEach( tuple ->{
                String key = tuple.getKey();
                DemandSessionSummaryTuple sessionSummaryTuple = tuple.getValue();
                try {
                    sessionSummaryTuple.setEndTime(finalCurrentTimeState.f2);
                    collector.collect(sessionSummaryTuple);
                } catch (Exception e) {
                    e.printStackTrace();
                }

            });
            timeState.clear();
            sessionSummary.clear();
            currentTimeState = timeState.value();
        }

        if (event_name.equals("search_list_keyless") && currentTimeState.f1 == 0) {
            currentTimeState.f1 = event_ts;
            String demandSessionId = UUID.randomUUID().toString();
            currentTimeState.f0 = demandSessionId;
        }
        currentTimeState.f2 = event_ts;

        if (currentTimeState.f1 > 0) {
            String search_hex9 = record.get("search_hex9") != null ? record.get("search_hex9").toString() : null;
            DemandSessionSummaryTuple currentTuple = sessionSummary.get(search_hex9) != null ? sessionSummary.get(search_hex9) : new DemandSessionSummaryTuple();

            if (sessionSummary.get(search_hex9) == null) {
                currentTuple.setSearchHex9(search_hex9);
                currentTuple.setUserId(recordTuple2.f0);
                currentTuple.setStartTime(currentTimeState.f1);
                currentTuple.setDemandSessionId(currentTimeState.f0);
            }

            if (event_name.equals("search_list_keyless")) {
                currentTuple.setTotalSearch(currentTuple.getTotalSearch() + 1);
                SearchSummaryCalculation(record, currentTuple);
            }
            sessionSummary.put(search_hex9, currentTuple);
        }
        timeState.update(currentTimeState);
    }





On Sun, May 24, 2020 at 10:57 PM Yun Gao <yu...@aliyun.com> wrote:

Hi,

       First, sorry that I'm not an expert on windows, so please correct me if I'm wrong, but from my side it seems the assigner might also be a problem in addition to the trigger: currently Flink window assigners are all based on time (processing time or event time), and it might be hard to implement an event-driven window assigner that starts assigning elements to a window only after certain elements have been received. 
      What comes to mind is that a possible alternative is to use the low-level KeyedProcessFunction directly: you could register a timer 30 mins in the future when the "search" event is received and write the time of the search event into state. The following events are then saved to state since the flag is set. Once the "start" event is received or the timer fires, you could load all the events from state, do the aggregation, and cancel the timer if it was triggered by the "start" event. A simpler case is [1]; it does not cover stopping the aggregation when a special event is received, but that logic could be added to it.

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/stream/operators/process_function.html#example

Best,
 Yun




 ------------------Original Mail ------------------
Sender:aj <aj...@gmail.com>
Send Date:Sun May 24 01:10:55 2020
Recipients:Tzu-Li (Gordon) Tai <tz...@apache.org>
CC:user <us...@flink.apache.org>
Subject:Re: Flink Window with multiple trigger condition


I am still not able to get very far after reading the material. Please help with some basic code to start building this window and trigger. 

Another option I am thinking of is to just use a RichFlatMap function and use keyed state to build this logic. Is that the correct approach? 



On Fri, May 22, 2020 at 4:52 PM aj <aj...@gmail.com> wrote:


I was also thinking of using a processing-time window, but that will not work for me. I want to start the window when the user's "search" event arrives, so for each user the window will start from the search event. 
 The Tumbling window has a fixed start and end time, so it will not be suitable in my case. 




On Fri, May 22, 2020 at 10:23 AM Tzu-Li (Gordon) Tai <tz...@apache.org> wrote:
Hi,

To achieve what you have in mind, I think what you have to do is to use a
processing time window of 30 mins, and have a custom trigger that matches
the "start" event in the `onElement` method and return
TriggerResult.FIRE_AND_PURGE.

That way, the window fires either when the processing time has passed, or
the start event was received.

Cheers,
Gordon



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


-- 
Thanks & Regards,
Anuj Jain
Mob. : +91- 8588817877 
Skype : anuj.jain07





-- 
Thanks & Regards,
Anuj Jain
Mob. : +91- 8588817877 
Skype : anuj.jain07





-- 
Thanks & Regards,
Anuj Jain
Mob. : +91- 8588817877 
Skype : anuj.jain07




Re: Re: Flink Window with multiple trigger condition

Posted by aj <aj...@gmail.com>.
Hi,

I have implemented the solution below and it is working fine, but the biggest
problem is that if no event arrives for the user after 30 min then I am
not able to trigger, because I compute the time difference from
incoming events. So the window only triggers when the next event comes,
but I want it to trigger right at the 30-minute mark.

So please help me improve this and solve the above problem.



public class DemandSessionFlatMap extends RichFlatMapFunction<Tuple2<Long, GenericRecord>, DemandSessionSummaryTuple> {

    private static final Logger LOGGER = LoggerFactory.getLogger(DemandSessionFlatMap.class);

    private transient ValueState<Tuple3<String, Long, Long>> timeState; // maintain session_id starttime and endtime
    private transient MapState<String, DemandSessionSummaryTuple> sessionSummary; // map for hex9 and summarytuple

    @Override
    public void open(Configuration config) {

        ValueStateDescriptor<Tuple3<String, Long, Long>> timeDescriptor =
                new ValueStateDescriptor<>(
                        "time_state", // the state name
                        TypeInformation.of(new TypeHint<Tuple3<String, Long, Long>>() {
                        }), // type information
                        Tuple3.of(null, 0L, 0L)); // default value of the state, if nothing was set
        timeState = getRuntimeContext().getState(timeDescriptor);

        MapStateDescriptor<String, DemandSessionSummaryTuple> descriptor =
                new MapStateDescriptor<String, DemandSessionSummaryTuple>("demand_session",
                        TypeInformation.of(new TypeHint<String>() {
                        }), TypeInformation.of(new TypeHint<DemandSessionSummaryTuple>() {
                }));
        sessionSummary = getRuntimeContext().getMapState(descriptor);

    }

    @Override
    public void flatMap(Tuple2<Long, GenericRecord> recordTuple2, Collector<DemandSessionSummaryTuple> collector) throws Exception {
        GenericRecord record = recordTuple2.f1;
        String event_name = record.get("event_name").toString();
        long event_ts = (Long) record.get("event_ts");
        Tuple3<String, Long, Long> currentTimeState = timeState.value();

        if (event_name.equals("search_list_keyless") && currentTimeState.f1 == 0) {
            currentTimeState.f1 = event_ts;
            String demandSessionId = UUID.randomUUID().toString();
            currentTimeState.f0 = demandSessionId;
        }

        long timeDiff = event_ts - currentTimeState.f1;

        if (event_name.equals("keyless_start_trip") || timeDiff >= 1800000) {
            Tuple3<String, Long, Long> finalCurrentTimeState = currentTimeState;
            sessionSummary.entries().forEach( tuple ->{
                String key = tuple.getKey();
                DemandSessionSummaryTuple sessionSummaryTuple = tuple.getValue();
                try {
                    sessionSummaryTuple.setEndTime(finalCurrentTimeState.f2);
                    collector.collect(sessionSummaryTuple);
                } catch (Exception e) {
                    e.printStackTrace();
                }

            });
            timeState.clear();
            sessionSummary.clear();
            currentTimeState = timeState.value();
        }

        if (event_name.equals("search_list_keyless") && currentTimeState.f1 == 0) {
            currentTimeState.f1 = event_ts;
            String demandSessionId = UUID.randomUUID().toString();
            currentTimeState.f0 = demandSessionId;
        }
        currentTimeState.f2 = event_ts;

        if (currentTimeState.f1 > 0) {
            String search_hex9 = record.get("search_hex9") != null ? record.get("search_hex9").toString() : null;
            DemandSessionSummaryTuple currentTuple = sessionSummary.get(search_hex9) != null ? sessionSummary.get(search_hex9) : new DemandSessionSummaryTuple();

            if (sessionSummary.get(search_hex9) == null) {
                currentTuple.setSearchHex9(search_hex9);
                currentTuple.setUserId(recordTuple2.f0);
                currentTuple.setStartTime(currentTimeState.f1);
                currentTuple.setDemandSessionId(currentTimeState.f0);
            }

            if (event_name.equals("search_list_keyless")) {
                currentTuple.setTotalSearch(currentTuple.getTotalSearch() + 1);
                SearchSummaryCalculation(record, currentTuple);
            }
            sessionSummary.put(search_hex9, currentTuple);
        }
        timeState.update(currentTimeState);
    }






On Sun, May 24, 2020 at 10:57 PM Yun Gao <yu...@aliyun.com> wrote:

> Hi,
>
>        First, sorry that I'm not an expert on windows, so please correct me if
> I'm wrong, but from my side it seems the assigner might also be a problem
> in addition to the trigger: currently Flink window assigners are all
> based on time (processing time or event time), and it might be hard to
> implement an event-driven window assigner that starts assigning elements to
> a window only after certain elements have been received.
>
>       What comes to mind is that a possible alternative is to use the
> low-level *KeyedProcessFunction* directly: you could register a timer 30
> mins in the future when the "*search*" event is received and write the time
> of the search event into state. The following events are then
> saved to state since the flag is set. Once the "*start*" event is received
> or the timer fires, you could load all the events from state, do the
> aggregation, and cancel the timer if it was triggered by the "*start*"
> event. A simpler case is [1]; it does not cover stopping the aggregation
> when a special event is received, but that logic could be added to it.
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/stream/operators/process_function.html#example
>
> Best,
>  Yun
>
>
>
> ------------------Original Mail ------------------
> *Sender:*aj <aj...@gmail.com>
> *Send Date:*Sun May 24 01:10:55 2020
> *Recipients:*Tzu-Li (Gordon) Tai <tz...@apache.org>
> *CC:*user <us...@flink.apache.org>
> *Subject:*Re: Flink Window with multiple trigger condition
>
>>
>> I am still not able to get very far after reading the material. Please help with
>> some basic code to start building this window and trigger.
>>
>> Another option I am thinking of is to just use a RichFlatMap function and use
>> keyed state to build this logic. Is that the correct approach?
>>
>>
>>
>> On Fri, May 22, 2020 at 4:52 PM aj <aj...@gmail.com> wrote:
>>
>>>
>>>
>>> I was also thinking of using a processing-time window, but that will not
>>> work for me. I want to start the window when the user's "*search*" event
>>> arrives, so for each user the window will start from the *search* event.
>>>  The Tumbling window has a fixed start and end time, so it will not be
>>> suitable in my case.
>>>
>>>
>>>
>>>
>>> On Fri, May 22, 2020 at 10:23 AM Tzu-Li (Gordon) Tai <
>>> tzulitai@apache.org> wrote:
>>>
>>>> Hi,
>>>>
>>>> To achieve what you have in mind, I think what you have to do is to use
>>>> a
>>>> processing time window of 30 mins, and have a custom trigger that
>>>> matches
>>>> the "start" event in the `onElement` method and return
>>>> TriggerResult.FIRE_AND_PURGE.
>>>>
>>>> That way, the window fires either when the processing time has passed,
>>>> or
>>>> the start event was received.
>>>>
>>>> Cheers,
>>>> Gordon
>>>>
>>>>
>>>>
>>>> --
>>>> Sent from:
>>>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>>>>
>>>
>>>
>>> --
>>> Thanks & Regards,
>>> Anuj Jain
>>> Mob. : +91- 8588817877
>>> Skype : anuj.jain07
>>> <http://www.oracle.com/>
>>>
>>>
>>> <http://www.cse.iitm.ac.in/%7Eanujjain/>
>>>
>>
>>
>> --
>> Thanks & Regards,
>> Anuj Jain
>> Mob. : +91- 8588817877
>> Skype : anuj.jain07
>> <http://www.oracle.com/>
>>
>>
>> <http://www.cse.iitm.ac.in/%7Eanujjain/>
>>
>

-- 
Thanks & Regards,
Anuj Jain
Mob. : +91- 8588817877
Skype : anuj.jain07
<http://www.oracle.com/>


<http://www.cse.iitm.ac.in/%7Eanujjain/>

Re: Re: Flink Window with multiple trigger condition

Posted by Yun Gao <yu...@aliyun.com>.
Hi,

       First, sorry that I'm not an expert on windows, so please correct me if I'm wrong, but from my side it seems the assigner might also be a problem in addition to the trigger: currently Flink window assigners are all based on time (processing time or event time), and it might be hard to implement an event-driven window assigner that starts assigning elements to a window only after certain elements have been received. 
      What comes to mind is that a possible alternative is to use the low-level KeyedProcessFunction directly: you could register a timer 30 mins in the future when the "search" event is received and write the time of the search event into state. The following events are then saved to state since the flag is set. Once the "start" event is received or the timer fires, you could load all the events from state, do the aggregation, and cancel the timer if it was triggered by the "start" event. A simpler case is [1]; it does not cover stopping the aggregation when a special event is received, but that logic could be added to it.

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/stream/operators/process_function.html#example
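
To sketch how such a function could be wired into the job (UserEventSource and DemandSessionProcessFunction below are hypothetical names, not existing classes):

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class DemandSessionJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.addSource(new UserEventSource())               // hypothetical source emitting (userId, record) tuples
           .keyBy(tuple -> tuple.f0)                       // key by user id so state and timers are scoped per user
           .process(new DemandSessionProcessFunction())    // hypothetical KeyedProcessFunction implementing the logic above
           .print();

        env.execute("demand-session-aggregation");
    }
}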

Best,
 Yun




 ------------------Original Mail ------------------
Sender:aj <aj...@gmail.com>
Send Date:Sun May 24 01:10:55 2020
Recipients:Tzu-Li (Gordon) Tai <tz...@apache.org>
CC:user <us...@flink.apache.org>
Subject:Re: Flink Window with multiple trigger condition


I am still not able to get very far after reading the material. Please help with some basic code to start building this window and trigger. 

Another option I am thinking of is to just use a RichFlatMap function and use keyed state to build this logic. Is that the correct approach? 



On Fri, May 22, 2020 at 4:52 PM aj <aj...@gmail.com> wrote:


I was also thinking of using a processing-time window, but that will not work for me. I want to start the window when the user's "search" event arrives, so for each user the window will start from the search event. 
 The Tumbling window has a fixed start and end time, so it will not be suitable in my case. 




On Fri, May 22, 2020 at 10:23 AM Tzu-Li (Gordon) Tai <tz...@apache.org> wrote:
Hi,

To achieve what you have in mind, I think what you have to do is to use a
processing time window of 30 mins, and have a custom trigger that matches
the "start" event in the `onElement` method and return
TriggerResult.FIRE_AND_PURGE.

That way, the window fires either when the processing time has passed, or
the start event was received.

Cheers,
Gordon



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


-- 
Thanks & Regards,
Anuj Jain
Mob. : +91- 8588817877 
Skype : anuj.jain07





-- 
Thanks & Regards,
Anuj Jain
Mob. : +91- 8588817877 
Skype : anuj.jain07




Re: Flink Window with multiple trigger condition

Posted by aj <aj...@gmail.com>.
I am still not able to get very far after reading the material. Please help with
some basic code to start building this window and trigger.

Another option I am thinking of is to just use a RichFlatMap function and use
keyed state to build this logic. Is that the correct approach?



On Fri, May 22, 2020 at 4:52 PM aj <aj...@gmail.com> wrote:

>
>
> I was also thinking of using a processing-time window, but that will not
> work for me. I want to start the window when the user's "*search*" event
> arrives, so for each user the window will start from the *search* event.
>  The Tumbling window has a fixed start and end time, so it will not be suitable
> in my case.
>
>
>
>
> On Fri, May 22, 2020 at 10:23 AM Tzu-Li (Gordon) Tai <tz...@apache.org>
> wrote:
>
>> Hi,
>>
>> To achieve what you have in mind, I think what you have to do is to use a
>> processing time window of 30 mins, and have a custom trigger that matches
>> the "start" event in the `onElement` method and return
>> TriggerResult.FIRE_AND_PURGE.
>>
>> That way, the window fires either when the processing time has passed, or
>> the start event was received.
>>
>> Cheers,
>> Gordon
>>
>>
>>
>> --
>> Sent from:
>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>>
>
>
> --
> Thanks & Regards,
> Anuj Jain
> Mob. : +91- 8588817877
> Skype : anuj.jain07
> <http://www.oracle.com/>
>
>
> <http://www.cse.iitm.ac.in/%7Eanujjain/>
>


-- 
Thanks & Regards,
Anuj Jain
Mob. : +91- 8588817877
Skype : anuj.jain07
<http://www.oracle.com/>


<http://www.cse.iitm.ac.in/%7Eanujjain/>

Re: Flink Window with multiple trigger condition

Posted by aj <aj...@gmail.com>.
I was also thinking of using a processing-time window, but that will not work
for me. I want to start the window when the user's "*search*" event arrives,
so for each user the window will start from the *search* event.
 The Tumbling window has a fixed start and end time, so it will not be suitable
in my case.




On Fri, May 22, 2020 at 10:23 AM Tzu-Li (Gordon) Tai <tz...@apache.org>
wrote:

> Hi,
>
> To achieve what you have in mind, I think what you have to do is to use a
> processing time window of 30 mins, and have a custom trigger that matches
> the "start" event in the `onElement` method and return
> TriggerResult.FIRE_AND_PURGE.
>
> That way, the window fires either when the processing time has passed, or
> the start event was received.
>
> Cheers,
> Gordon
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>


-- 
Thanks & Regards,
Anuj Jain
Mob. : +91- 8588817877
Skype : anuj.jain07
<http://www.oracle.com/>


<http://www.cse.iitm.ac.in/%7Eanujjain/>

Re: Flink Window with multiple trigger condition

Posted by "Tzu-Li (Gordon) Tai" <tz...@apache.org>.
Hi,

To achieve what you have in mind, I think what you have to do is to use a
processing time window of 30 mins, and have a custom trigger that matches
the "start" event in the `onElement` method and returns
TriggerResult.FIRE_AND_PURGE.

That way, the window fires either when the processing time has passed, or
the start event was received.
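
A rough sketch of such a trigger (assuming the stream is keyed and windowed into 30-minute processing-time windows; the Event type and its getName() accessor are placeholders):

import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

public class StartOrEndOfWindowTrigger extends Trigger<Event, TimeWindow> {

    @Override
    public TriggerResult onElement(Event element, long timestamp, TimeWindow window, TriggerContext ctx) {
        if ("start".equals(element.getName())) {
            return TriggerResult.FIRE_AND_PURGE;                 // close early on the "start" event
        }
        ctx.registerProcessingTimeTimer(window.maxTimestamp());  // otherwise wait for the window end
        return TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) {
        return TriggerResult.FIRE_AND_PURGE;                     // the 30 minutes of processing time elapsed
    }

    @Override
    public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) {
        return TriggerResult.CONTINUE;                           // event time is not used here
    }

    @Override
    public void clear(TimeWindow window, TriggerContext ctx) {
        ctx.deleteProcessingTimeTimer(window.maxTimestamp());
    }
}

It would be attached with something like .window(TumblingProcessingTimeWindows.of(Time.minutes(30))).trigger(new StartOrEndOfWindowTrigger()).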

Cheers,
Gordon



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Re: Flink Window with multiple trigger condition

Posted by aj <aj...@gmail.com>.
A session window is defined by a gap of inactivity, and I do not have that
requirement.

Starting the window only on the "*search event*" is a part I will take up later.

Let's say in the first phase I want to start the window on any event that
appears for that user.

For example :

*Scenario -1*
t1 ----- user1  ---- event1 ( window start)
t1 +5 mins ----- user1 ----- event2
t1 + 10 mins --- user1 ---- event3
t1 + 15 mins ----- user1 ---- event4 === start-type event (terminate the window
as event_type "*Start*" arrived and calculate the aggregate over the events
collected above)

t1 + 16 mins --- user1 ---- event5 starts a new window


*Scenario -2*
t1 ----- user1  ---- event1 ( window start)
t1 +5 mins ----- user1 ----- event2
t1 + 10 mins --- user1 ---- event3
t1 + 30 mins ----- user1 ---- event4 (terminates the window as 30 mins
elapsed and calculate the aggregate over the events collected above)

t1 + 31 mins --- user1 ---- event5 starts a new window

This is what I want to implement. I have read about triggers but did not
understand how to trigger when either the time has passed or an event with
event_type == *"start"* has arrived. Which functions of the Trigger class do I
have to implement, and how do I check these 2 conditions on each arriving event?

Please help me implement this. It would help if you could provide a basic
skeleton of the functions I need to implement; I am not clear how to start.

On Thu, May 21, 2020 at 4:59 PM Jiayi Liao <bu...@gmail.com> wrote:

>
> According to your description, it's more like a session window scenario
> rather than a tumbling window; you can find more details in [1]. But the
> difference between your window and Flink's session window is that, in your
> case, the session length is defined by the first element. I'm afraid that
> Flink doesn't have an implementation for such a scenario, so you may need
> to create your own WindowAssigner.
>
> For the trigger, yes, you can always implement a trigger to determine the
> lifecycle of a window.
>
>
>
> [1].
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/windows.html#session-windows
>
>
>
> Best,
> Jiayi Liao
>
> On Thu, May 21, 2020 at 5:51 PM aj <aj...@gmail.com> wrote:
>
>> Hi Liao,
>>
>> Thanks for the response. I am reading up on all of this, as I have never
>> implemented it before.
>>
>> > Start the window for a user when an event with event_name: *"search"*
>> arrives for that user.
>>
>> I'm not very sure this is the right way to define the window in your business
>> if you use event time, because there may exist out-of-order events that
>> happened after the "search" event but arrive before the "search" event, which will
>> be discarded because you haven't assigned a window yet. (If I understand
>> correctly)
>>
>> *Yes, you are right. I raised this concern to the business team and we are
>> still in discussion.*
>>
>> But let's say I do not need the above condition, and I want to start the
>> window whenever the first event for a particular user appears and
>> then bucket the events with the same conditions (either 30 mins from the
>> start of the window is reached or an event_type: *"start"* appears). In
>> that case, can I use *TumblingProcessingTimeWindows* of 30 mins,
>> and on that put a custom trigger that processes the window before the
>> 30 mins if an event_type: *"start"* arrives?
>> Is my understanding correct that if, say, a *start* event arrives at
>> 20 mins from the window start, then that window will be closed and processed,
>> and events arriving after that will be assigned to a new window rather than
>> the window continuing until 30 mins?
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>> On Thu, May 21, 2020 at 2:55 PM Jiayi Liao <bu...@gmail.com>
>> wrote:
>>
>>> Hi,
>>>
>>>
>>> > Start the window for a user when an event with event_name: *"search"*
>>> arrives for that user.
>>>
>>> I'm not very sure this is the right way to define the window in your
>>> business if you use event time, because there may exist out-of-order events
>>> that happened after the "search" event but arrive before the "search" event,
>>> which will be discarded because you haven't assigned a window yet. (If I
>>> understand correctly)
>>>
>>> Back to the problem, I think you can implement your own #WindowAssigner,
>>> which will create a window based on the event's name. Take a look at our
>>> existing implementations like #TumblingEventTimeWindows.
>>>
>>> > Trigger the window when either 30 mins have passed since the start of
>>> the window or an event_type: *"start"* appears
>>>
>>> This can also be implemented with a custom #Trigger. The timing of
>>> being triggered can be set by registering timers with Flink's internal
>>> timer service. Take a look at #EventTimeTrigger; it's easy to implement.
>>>
>>>
>>> Best,
>>> Jiayi Liao
>>>
>>> On Thu, May 21, 2020 at 2:47 PM aj <aj...@gmail.com> wrote:
>>>
>>>>
>>>> Hello All,
>>>>
>>>> I am getting a lot of user events in a stream. There are different
>>>> types of events, and now I want to build some aggregation metrics per user
>>>> by grouping the events into buckets.
>>>>
>>>> My condition for windowing is :
>>>>
>>>> 1. Start the window for a user when an event with event_name: *"search"*
>>>> arrives for that user.
>>>> 2. Trigger the window when
>>>>       either 30 mins have passed since the start of the window
>>>>        OR
>>>>        an event with event_type: *"start"* appears.
>>>>
>>>> After that, I want to calculate some aggregations over the events in the
>>>> window. I know this can be done using a process function, but I am stuck
>>>> on creating a window with a multi-condition trigger.
>>>>
>>>> Please help me understand how to create this type of window with multiple
>>>> trigger conditions, i.e. either time passing or a certain event arriving.
>>>>
>>>>
>>>> --
>>>> Thanks & Regards,
>>>> Anuj Jain
>>>>
>>>>
>>>> <http://www.cse.iitm.ac.in/%7Eanujjain/>
>>>>
>>>
>>
>> --
>> Thanks & Regards,
>> Anuj Jain
>> Mob. : +91- 8588817877
>> Skype : anuj.jain07
>> <http://www.oracle.com/>
>>
>>
>> <http://www.cse.iitm.ac.in/%7Eanujjain/>
>>
>

-- 
Thanks & Regards,
Anuj Jain
Mob. : +91- 8588817877
Skype : anuj.jain07
<http://www.oracle.com/>


<http://www.cse.iitm.ac.in/%7Eanujjain/>