Posted to issues@flink.apache.org by "jinguishi (JIRA)" <ji...@apache.org> on 2019/07/23 10:29:00 UTC

[jira] [Updated] (FLINK-13383) Customize the problem in the trigger

     [ https://issues.apache.org/jira/browse/FLINK-13383?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

jinguishi updated FLINK-13383:
------------------------------
    Priority: Blocker  (was: Major)

> Customize the problem in the trigger
> ------------------------------------
>
>                 Key: FLINK-13383
>                 URL: https://issues.apache.org/jira/browse/FLINK-13383
>             Project: Flink
>          Issue Type: Bug
>          Components: API / DataStream
>    Affects Versions: 1.8.0
>         Environment: The development environment is IntelliJ IDEA; the Flink version is 1.8
>            Reporter: jinguishi
>            Priority: Blocker
>             Fix For: 1.8.0
>
>         Attachments: WX20190723-174236.png, WechatIMG2.png
>
>
> Using a tumbling time window, I created a custom trigger that combines a time condition with an element counter. In the onElement method, calling ctx.getCurrentWatermark() on the TriggerContext parameter returns a negative value.
> Screenshots are in the attachments.
> The code is shown below:
> {code:java}
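> import org.apache.flink.streaming.api.windowing.triggers.Trigger;
> import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
> import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
> import org.apache.flink.streaming.api.windowing.windows.Window;
>
> public class CountTrigger extends Trigger<Object, TimeWindow> {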
>     private static final long serialVersionUID = 1L;
>     private CountTrigger(int count) {
>         this.threshold = count;
>     }
>     private int count = 0;
>     private int threshold;
>     @Override
>     public TriggerResult onElement(Object element, long timestamp, TimeWindow window, TriggerContext ctx) {
>         long watermark = ctx.getCurrentWatermark();
>         ctx.registerEventTimeTimer(window.maxTimestamp());
>         if (count > threshold) {
>             count = 0;
>             return TriggerResult.FIRE;
>         } else {
>             count++;
>         }
>         System.out.println("onElement: " + element);
>         return TriggerResult.CONTINUE;
>     }
>     @Override
>     public TriggerResult onEventTime(long time, TimeWindow window, Trigger.TriggerContext ctx) throws Exception {
>         return TriggerResult.CONTINUE;
>     }
>     @Override
>     public TriggerResult onProcessingTime(long time, TimeWindow window, Trigger.TriggerContext ctx) throws Exception {
>         return TriggerResult.FIRE;
>     }
>     @Override
>     public void clear(TimeWindow window, Trigger.TriggerContext ctx) throws Exception {
>         // onElement registers an event-time timer, so that is the timer to delete here
>         ctx.deleteEventTimeTimer(window.maxTimestamp());
>     }
>     @Override
>     public String toString() {
>         return "CountTrigger";
>     }
>     public static <W extends Window> CountTrigger of(int threshold) {
>         return new CountTrigger(threshold);
>     }
> }
> {code}
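
Note on the negative value: Flink reports Long.MIN_VALUE as the current watermark until the first watermark reaches the operator, so a large negative reading inside onElement may simply mean that no watermark has arrived yet (for example, when no timestamp/watermark assigner is configured). The following is a minimal, Flink-free sketch of how a trigger could guard against that sentinel. The class and method names (WatermarkGuard, hasWatermark, windowEndPassed) are hypothetical and are not part of the Flink API or of the reporter's code.

```java
// Hypothetical helper, not part of Flink: illustrates handling of the
// "no watermark yet" sentinel that ctx.getCurrentWatermark() can return.
final class WatermarkGuard {

    // Assumption: the operator's watermark starts at Long.MIN_VALUE
    // and only advances once a watermark has been received.
    static final long NO_WATERMARK_YET = Long.MIN_VALUE;

    // True once at least one real watermark has been observed.
    static boolean hasWatermark(long currentWatermark) {
        return currentWatermark != NO_WATERMARK_YET;
    }

    // True when the watermark has reached the window's last timestamp,
    // i.e. the point at which an event-time trigger would normally fire.
    static boolean windowEndPassed(long currentWatermark, long windowMaxTimestamp) {
        return hasWatermark(currentWatermark) && currentWatermark >= windowMaxTimestamp;
    }

    public static void main(String[] args) {
        long windowMaxTimestamp = 59_999L; // last millisecond of a 60 s window starting at 0

        System.out.println(hasWatermark(Long.MIN_VALUE));
        System.out.println(windowEndPassed(Long.MIN_VALUE, windowMaxTimestamp));
        System.out.println(windowEndPassed(60_000L, windowMaxTimestamp));
    }
}
```

In a trigger, the value passed as currentWatermark would come from ctx.getCurrentWatermark() and windowMaxTimestamp from window.maxTimestamp().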



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)