Posted to user@flink.apache.org by Piyush Shrivastava <pi...@yahoo.co.in> on 2016/04/25 11:22:10 UTC

Custom Trigger Implementation

Hi all,
I want to implement a custom Trigger which fires a GlobalWindow after 1 minute the first time and every 20 seconds after that.
I believe I cannot get this logic right in the implementation of my custom Trigger. Please help me with this.
Here is the code of my custom Trigger:

public class TradeTrigger<W extends Window> extends Trigger<Object, W> {

    /**
     * 
     */
    private static final long serialVersionUID = 1L;
      
    private TradeTrigger() {
    }
    
    @Override
    public TriggerResult onElement(
            Object element,
            long timestamp,
            W window,
            org.apache.flink.streaming.api.windowing.triggers.Trigger.TriggerContext ctx)
            throws Exception {
        
        ctx.registerEventTimeTimer(timestamp);
        return TriggerResult.CONTINUE;
        
    }

    @Override
    public TriggerResult onEventTime(
            long timestamp,
            W window,
            org.apache.flink.streaming.api.windowing.triggers.Trigger.TriggerContext ctx)
            throws Exception {
    
        ValueState<Boolean> state = ctx.getPartitionedState(new ValueStateDescriptor<Boolean>("flag", Boolean.TYPE, false));
        
        if(state.value()==false){
            ctx.registerEventTimeTimer(timestamp+60000);
            state.update(true);
            return TriggerResult.FIRE;
        }else{
            System.out.println(""+state.value());
            ctx.registerEventTimeTimer(timestamp+20000);
            return TriggerResult.FIRE;
        }
    }

    @Override
    public TriggerResult onProcessingTime(
            long arg0,
            W arg1,
            org.apache.flink.streaming.api.windowing.triggers.Trigger.TriggerContext arg2)
            throws Exception {
        // TODO Auto-generated method stub
        return TriggerResult.CONTINUE;
    }
    
     
    public static <W extends Window> TradeTrigger<W> of() {
        return new TradeTrigger<>();
    }

    
}

Thanks and Regards,
Piyush Shrivastava
http://webograffiti.com

Re: Custom Trigger Implementation

Posted by Kostas Kloudas <k....@data-artisans.com>.
Good to hear that!

Kostas



Re: Custom Trigger Implementation

Posted by Piyush Shrivastava <pi...@yahoo.co.in>.
Thanks a lot Kostas. This solved my problem.
Thanks and Regards,
Piyush Shrivastava
http://webograffiti.com
 


Re: Custom Trigger Implementation

Posted by Kostas Kloudas <k....@data-artisans.com>.
Hi,

Let me also add that you should override the clear() method in order to clear your state
and delete the pending timers.
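
As a rough illustration of what such a clean-up involves, here is a plain-Java model with no Flink dependency. The class TriggerCleanupModel and all of its members are hypothetical names; the TreeSet merely stands in for the timer service. The point it tries to show is that a pending timer can only be deleted if its timestamp has been remembered somewhere, so the trigger has to keep it alongside the flag.

```java
import java.util.TreeSet;

// Plain-Java model of a trigger's per-window bookkeeping; not Flink code.
class TriggerCleanupModel {
    private final TreeSet<Long> timers = new TreeSet<>(); // stands in for the timer service
    private boolean firstTimerSet = false;                // models the "flag" state
    private Long pendingTimer = null;                     // timestamp of the outstanding timer

    // Models onElement: the first element schedules the initial firing.
    void onElement(long timestamp) {
        if (!firstTimerSet) {
            register(timestamp + 60_000L);
            firstTimerSet = true;
        }
    }

    // Models onEventTime: the timer has fired, schedule the next one 20 s later.
    void onEventTime(long timestamp) {
        timers.remove(timestamp);
        register(timestamp + 20_000L);
    }

    // Models clear(): delete the outstanding timer and reset the state.
    void clear() {
        if (pendingTimer != null) {
            timers.remove(pendingTimer);
            pendingTimer = null;
        }
        firstTimerSet = false;
    }

    private void register(long timestamp) {
        timers.add(timestamp);
        pendingTimer = timestamp;
    }

    int pendingTimers() {
        return timers.size();
    }

    boolean isFirstTimerSet() {
        return firstTimerSet;
    }
}
```

In a real trigger the two fields would live in partitioned state rather than in instance fields, and the removal would go through the trigger context; the exact method names depend on the Flink version, so check the Trigger Javadoc for the release in use.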

Kostas



Re: Custom Trigger Implementation

Posted by Kostas Kloudas <k....@data-artisans.com>.
Hi Piyush,

In the onElement function, you register a timer every time you receive an element. 

When the next watermark arrives, in the flag==false case, this leads to every element
adding a timer for its timestamp+60000 ms. The same holds for the flag==true case, with a 20000 ms interval.
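
To make the effect concrete, here is a small plain-Java model of the trigger as posted. It has no Flink dependency, and TimerCascadeModel and countFirings are hypothetical names for illustration. It assumes that timers are a set of timestamps which fire in order once the watermark has passed them, and that registering the same timestamp twice has no extra effect. Each element seeds its own chain of timers, so the number of firings grows with the number of elements.

```java
import java.util.TreeSet;

// Plain-Java model of the trigger as originally posted; not Flink code.
class TimerCascadeModel {

    // Returns how many times the window fires up to the given watermark.
    static int countFirings(long[] elementTimestamps, long watermark) {
        TreeSet<Long> timers = new TreeSet<>();
        boolean flag = false; // models the "flag" ValueState
        int firings = 0;

        // onElement: every element registers a timer at its own timestamp.
        for (long ts : elementTimestamps) {
            timers.add(ts);
        }

        // The watermark advances: due timers fire in timestamp order.
        while (!timers.isEmpty() && timers.first() <= watermark) {
            long t = timers.pollFirst();
            // onEventTime: every firing registers yet another timer.
            if (!flag) {
                timers.add(t + 60_000L);
                flag = true;
            } else {
                timers.add(t + 20_000L);
            }
            firings++;
        }
        return firings;
    }
}
```

With three elements at 1 s, 2 s and 3 s and a watermark of 200 s, this model reports 28 firings, against 8 for a single element at 1 s.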

What you can try is to set the initial timer for 60 sec only once, at the first element, and then
set all subsequent timers in onEventTime with a 20 sec interval.
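
The suggestion can be sketched with a plain-Java model (no Flink dependency; FixedTriggerModel and its members are hypothetical names). It assumes the first minute is counted from the first element's event timestamp, which is one possible reading of the requirement. Only the first element registers a timer, and each firing schedules exactly one successor, so there is a single chain of timers however many elements arrive.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

// Plain-Java model of the suggested fix; not Flink code.
class FixedTriggerModel {
    static final long FIRST_DELAY_MS = 60_000L;
    static final long INTERVAL_MS = 20_000L;

    private final TreeSet<Long> timers = new TreeSet<>();
    private final List<Long> firings = new ArrayList<>();
    private boolean firstTimerSet = false; // would be partitioned state in a real trigger

    // Models onElement: only the first element registers a timer.
    void onElement(long timestamp) {
        if (!firstTimerSet) {
            timers.add(timestamp + FIRST_DELAY_MS);
            firstTimerSet = true;
        }
    }

    // Models the watermark advancing: each due timer fires and schedules the next.
    void advanceWatermark(long watermark) {
        while (!timers.isEmpty() && timers.first() <= watermark) {
            long t = timers.pollFirst();
            firings.add(t);              // corresponds to returning FIRE
            timers.add(t + INTERVAL_MS); // next firing 20 s later
        }
    }

    // Timestamps at which the window fired so far.
    List<Long> firings() {
        return firings;
    }
}
```

Feeding this model elements at 1 s, 2 s and 3 s and then advancing the watermark to 200 s gives seven firings, at 61 s, 81 s, 101 s, 121 s, 141 s, 161 s and 181 s.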

For an example of a custom trigger, you can look here:
https://github.com/kl0u/flink-examples/blob/master/src/main/java/com/dataartisans/flinksolo/beam_comparison/customTriggers/EventTimeTriggerWithEarlyAndLateFiring.java

I hope this helps.
Let me know if you need any more help.

Kostas
