You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by "ni-ze (via GitHub)" <gi...@apache.org> on 2023/05/15 02:30:28 UTC

[GitHub] [rocketmq-streams] ni-ze commented on a diff in pull request #291: [ISSUE #290] Change the logic of watermark judging whether to solve the data

ni-ze commented on code in PR #291:
URL: https://github.com/apache/rocketmq-streams/pull/291#discussion_r1193273452


##########
core/src/main/java/org/apache/rocketmq/streams/core/function/supplier/WindowAccumulatorSupplier.java:
##########
@@ -120,17 +120,25 @@ public void process(V data) throws Throwable {
             K key = this.context.getKey();
             long time = this.context.getDataTime();
 
+            long sizeInterval = windowInfo.getWindowSize().toMillSecond();
+            long slideInterval = windowInfo.getWindowSlide().toMillSecond();
+            long lastWindowStart = time - (time + slideInterval) % slideInterval;
+            long lastWindowEnd = lastWindowStart + sizeInterval;
             long watermark = this.watermark(time - allowDelay, stateTopicMessageQueue);
-            if (time < watermark) {
+            if (lastWindowEnd < watermark) {

Review Comment:
   If we commit watermark after window fired instead of here, the above is right.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@rocketmq.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org