You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@flink.apache.org by "xljtswf (Jira)" <ji...@apache.org> on 2022/11/25 08:45:00 UTC

[jira] [Created] (FLINK-30208) avoid unconditional state update in CountTrigger#onElement

xljtswf created FLINK-30208:
-------------------------------

             Summary: avoid unconditional state update in CountTrigger#onElement
                 Key: FLINK-30208
                 URL: https://issues.apache.org/jira/browse/FLINK-30208
             Project: Flink
          Issue Type: Improvement
          Components: API / DataStream
            Reporter: xljtswf


In current CountTrigger#onElement, when one element is received, the state is updated unconditionally, and we then fetch the state again to check whether we need to clear the state. This implies we may update the state 2 times to process one element. I suppose to make following simplification:

    public TriggerResult onElement(Object element, long timestamp, W window, TriggerContext ctx)
            throws Exception {
        TriggerResult triggerResult;
        if (maxCount > 1) {
            ReducingState<Long> countState = ctx.getPartitionedState(stateDesc);
            Long currentCount = countState.get();
            if (currentCount == null || currentCount < maxCount - 1) {
                countState.add(1L);
                triggerResult = TriggerResult.CONTINUE;
            } else {
                countState.clear();
                triggerResult = TriggerResult.FIRE;
            }
        } else {
            triggerResult = TriggerResult.FIRE;
        }
        return triggerResult;
    }

If this is approved, I will make a pr then.
Thanks!

 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)