You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/03/31 08:58:53 UTC

[GitHub] [flink] SteNicholas commented on a change in pull request #19259: [FLINK-23890] impove cep operator timer register policy

SteNicholas commented on a change in pull request #19259:
URL: https://github.com/apache/flink/pull/19259#discussion_r839347821



##########
File path: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFAState.java
##########
@@ -97,6 +100,18 @@ public void setNewPartialMatches(PriorityQueue<ComputationState> newPartialMatch
         this.partialMatches = newPartialMatches;
     }
 
+    public boolean isNewStartPartialMatch() {
+        return isNewStartPartialMatch;
+    }
+
+    public void resetNewStartPartialMatch() {

Review comment:
       Do the `resetNewStartPartialMatch`  and `setNewStartPartiailMatch` combine to one method?

##########
File path: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CepOperator.java
##########
@@ -417,6 +405,9 @@ private void processEvent(NFAState nfaState, IN event, long timestamp) throws Ex
                             timestamp,
                             afterMatchSkipStrategy,
                             cepTimerService);
+            if (nfa.getWindowTime() > 0 && nfaState.isNewStartPartialMatch()) {
+                registerTimer(timestamp + nfa.getWindowTime());

Review comment:
       If the `windowTime` of the `NFA` equals to 0, should this registers a time for {@code current watermark + 1}?

##########
File path: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java
##########
@@ -333,6 +337,10 @@ private boolean isStateTimedOut(final ComputationState state, final long timesta
             boolean shouldDiscardPath = false;
             for (final ComputationState newComputationState : newComputationStates) {
 
+                if (isStartState(computationState) && newComputationState.getStartTimestamp() > 0) {
+                    nfaState.setNewStartPartiailMatch();

Review comment:
       If the `computationState` is in start state and the start timestamp equal to 0, what's the behaivor of this situation?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org