Posted to issues@flink.apache.org by "jinguishi (JIRA)" <ji...@apache.org> on 2019/07/23 10:29:00 UTC
[jira] [Updated] (FLINK-13383) Customize the problem in the trigger
[ https://issues.apache.org/jira/browse/FLINK-13383?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
jinguishi updated FLINK-13383:
------------------------------
Priority: Blocker (was: Major)
> Customize the problem in the trigger
> ------------------------------------
>
> Key: FLINK-13383
> URL: https://issues.apache.org/jira/browse/FLINK-13383
> Project: Flink
> Issue Type: Bug
> Components: API / DataStream
> Affects Versions: 1.8.0
> Environment: The development environment is IntelliJ IDEA; the Flink version is 1.8
> Reporter: jinguishi
> Priority: Blocker
> Fix For: 1.8.0
>
> Attachments: WX20190723-174236.png, WechatIMG2.png
>
>
> I am using a tumbling time window with a custom trigger that combines time and count. In the trigger's onElement method, calling ctx.getCurrentWatermark() on the TriggerContext parameter returns a negative value.
> Screenshots are attached.
> The code is shown below:
> {code:java}
> import org.apache.flink.streaming.api.windowing.triggers.Trigger;
> import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
> import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
> import org.apache.flink.streaming.api.windowing.windows.Window;
>
> public class CountTrigger extends Trigger<Object, TimeWindow> {
>     private static final long serialVersionUID = 1L;
>
>     private int count = 0;
>     private int threshold;
>
>     private CountTrigger(int count) {
>         this.threshold = count;
>     }
>
>     @Override
>     public TriggerResult onElement(Object element, long timestamp, TimeWindow window, TriggerContext ctx) {
>         // The value read here is the one reported as negative.
>         long watermark = ctx.getCurrentWatermark();
>         ctx.registerEventTimeTimer(window.maxTimestamp());
>         if (count > threshold) {
>             count = 0;
>             return TriggerResult.FIRE;
>         } else {
>             count++;
>         }
>         System.out.println("onElement: " + element);
>         return TriggerResult.CONTINUE;
>     }
>
>     @Override
>     public TriggerResult onEventTime(long time, TimeWindow window, Trigger.TriggerContext ctx) throws Exception {
>         return TriggerResult.CONTINUE;
>     }
>
>     @Override
>     public TriggerResult onProcessingTime(long time, TimeWindow window, Trigger.TriggerContext ctx) throws Exception {
>         return TriggerResult.FIRE;
>     }
>
>     @Override
>     public void clear(TimeWindow window, Trigger.TriggerContext ctx) throws Exception {
>         ctx.deleteProcessingTimeTimer(window.maxTimestamp());
>     }
>
>     @Override
>     public String toString() {
>         return "CountTrigger";
>     }
>
>     public static <W extends Window> CountTrigger of(int threshold) {
>         return new CountTrigger(threshold);
>     }
> }
> {code}
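
One possible reading of the negative value, offered as an assumption because the report does not state the exact number: if the value seen is Long.MIN_VALUE (-9223372036854775808), it may simply mean that no watermark has reached the window operator yet, since that is the value the operator holds before the first watermark arrives. The sketch below is plain Java with no Flink dependency. WatermarkCheck, NO_WATERMARK_YET, hasWatermark and windowAlreadyPassed are hypothetical names used only for illustration and are not part of the Flink API.

```java
// Hypothetical helper, not part of Flink: interprets a value such as the one
// returned by TriggerContext#getCurrentWatermark() without depending on Flink.
final class WatermarkCheck {

    // Assumption: the operator reports Long.MIN_VALUE until the first
    // watermark has arrived, so this value means "no watermark yet".
    static final long NO_WATERMARK_YET = Long.MIN_VALUE;

    private WatermarkCheck() {
    }

    /** Returns true once at least one real watermark has been observed. */
    static boolean hasWatermark(long currentWatermark) {
        return currentWatermark != NO_WATERMARK_YET;
    }

    /** Returns true if event time has already reached the end of the window. */
    static boolean windowAlreadyPassed(long windowMaxTimestamp, long currentWatermark) {
        return hasWatermark(currentWatermark) && windowMaxTimestamp <= currentWatermark;
    }
}
```

Under that assumption, a trigger could call a check like hasWatermark(ctx.getCurrentWatermark()) before comparing the watermark with window.maxTimestamp(). It would also be worth confirming that the job uses event time and assigns timestamps and watermarks upstream of the window.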
--
This message was sent by Atlassian JIRA
(v7.6.14#76016)