Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/03/31 08:58:53 UTC
[GitHub] [flink] SteNicholas commented on a change in pull request #19259: [FLINK-23890] impove cep operator timer register policy
SteNicholas commented on a change in pull request #19259:
URL: https://github.com/apache/flink/pull/19259#discussion_r839347821
##########
File path: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFAState.java
##########
@@ -97,6 +100,18 @@ public void setNewPartialMatches(PriorityQueue<ComputationState> newPartialMatch
this.partialMatches = newPartialMatches;
}
+ public boolean isNewStartPartialMatch() {
+ return isNewStartPartialMatch;
+ }
+
+ public void resetNewStartPartialMatch() {
Review comment:
Could `resetNewStartPartialMatch` and `setNewStartPartiailMatch` be combined into one method?
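For illustration, a minimal sketch of what a single combined method might look like. The class `NfaStateFlagSketch` and the setter signature are hypothetical stand-ins, not the actual `NFAState` code from the PR:

```java
// Hypothetical stand-in for NFAState, reduced to the one flag under discussion.
public class NfaStateFlagSketch {

    // True when a new partial match was started from the start state.
    private boolean newStartPartialMatch;

    public boolean isNewStartPartialMatch() {
        return newStartPartialMatch;
    }

    // A single setter that takes the target value replaces the separate
    // set/reset pair: pass true to mark, false to clear.
    public void setNewStartPartialMatch(boolean newStartPartialMatch) {
        this.newStartPartialMatch = newStartPartialMatch;
    }
}
```

With this shape, callers would write `setNewStartPartialMatch(true)` where the flag is raised and `setNewStartPartialMatch(false)` where it is cleared.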
##########
File path: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CepOperator.java
##########
@@ -417,6 +405,9 @@ private void processEvent(NFAState nfaState, IN event, long timestamp) throws Ex
timestamp,
afterMatchSkipStrategy,
cepTimerService);
+ if (nfa.getWindowTime() > 0 && nfaState.isNewStartPartialMatch()) {
+ registerTimer(timestamp + nfa.getWindowTime());
Review comment:
If the `windowTime` of the `NFA` equals 0, should this register a timer for {@code current watermark + 1}?
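To make the question concrete, here is a rough sketch of the policy being asked about, written as a pure function. The class `CepTimerPolicySketch`, the method `nextTimer`, and its parameters are hypothetical; the `windowTime > 0` branch mirrors the diff above, while the fallback branch is only the alternative raised in this comment, not behavior confirmed by the PR:

```java
import java.util.OptionalLong;

// Hypothetical helper, not part of CepOperator: returns the timestamp a timer
// would be registered for, or empty if no timer should be registered.
public class CepTimerPolicySketch {

    static OptionalLong nextTimer(
            long windowTime,
            boolean newStartPartialMatch,
            long eventTimestamp,
            long currentWatermark) {
        if (windowTime > 0) {
            // As in the diff: only a newly started partial match needs a timer,
            // firing when its window expires.
            return newStartPartialMatch
                    ? OptionalLong.of(eventTimestamp + windowTime)
                    : OptionalLong.empty();
        }
        // Assumption from the review question: with no window configured,
        // fall back to a timer just past the current watermark.
        return OptionalLong.of(currentWatermark + 1);
    }
}
```

Whether the fallback branch is needed depends on how partial matches without a window are expected to be processed, which is the point the question leaves open.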
##########
File path: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java
##########
@@ -333,6 +337,10 @@ private boolean isStateTimedOut(final ComputationState state, final long timesta
boolean shouldDiscardPath = false;
for (final ComputationState newComputationState : newComputationStates) {
+ if (isStartState(computationState) && newComputationState.getStartTimestamp() > 0) {
+ nfaState.setNewStartPartiailMatch();
Review comment:
If the `computationState` is in the start state and the start timestamp equals 0, what is the behavior in this situation?