Posted to commits@rocketmq.apache.org by "starmilkxin (via GitHub)" <gi...@apache.org> on 2023/05/13 14:03:51 UTC

[GitHub] [rocketmq-streams] starmilkxin opened a new pull request, #291: [ISSUE #290] Change the logic of watermark judging whether to solve the data

starmilkxin opened a new pull request, #291:
URL: https://github.com/apache/rocketmq-streams/pull/291

   [ISSUE #290](https://github.com/apache/rocketmq-streams/issues/290)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@rocketmq.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [rocketmq-streams] starmilkxin closed pull request #291: [ISSUE #290] Change the logic of watermark judging whether to solve the data

Posted by "starmilkxin (via GitHub)" <gi...@apache.org>.
starmilkxin closed pull request #291: [ISSUE #290] Change the logic of watermark judging whether to solve the data
URL: https://github.com/apache/rocketmq-streams/pull/291




[GitHub] [rocketmq-streams] ni-ze commented on a diff in pull request #291: [ISSUE #290] Change the logic of watermark judging whether to solve the data

Posted by "ni-ze (via GitHub)" <gi...@apache.org>.
ni-ze commented on code in PR #291:
URL: https://github.com/apache/rocketmq-streams/pull/291#discussion_r1193273452


##########
core/src/main/java/org/apache/rocketmq/streams/core/function/supplier/WindowAccumulatorSupplier.java:
##########
@@ -120,17 +120,25 @@ public void process(V data) throws Throwable {
             K key = this.context.getKey();
             long time = this.context.getDataTime();
 
+            long sizeInterval = windowInfo.getWindowSize().toMillSecond();
+            long slideInterval = windowInfo.getWindowSlide().toMillSecond();
+            long lastWindowStart = time - (time + slideInterval) % slideInterval;
+            long lastWindowEnd = lastWindowStart + sizeInterval;
             long watermark = this.watermark(time - allowDelay, stateTopicMessageQueue);
-            if (time < watermark) {
+            if (lastWindowEnd < watermark) {

Review Comment:
   If we commit the watermark after the window has fired, rather than here, then the check above is correct.
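
   To make the two conditions in the diff concrete, here is a minimal standalone sketch. The class and method names are hypothetical and are not part of rocketmq-streams; only the arithmetic is copied from the diff. The window size, slide, event time, and watermark values are made up for illustration, and the sketch does not model when the watermark is committed, which is the point the comment above depends on.

```java
// Hypothetical helper, not rocketmq-streams code: it only mirrors the
// arithmetic shown in the diff so the two late-data checks can be compared.
class WatermarkLateCheck {

    // Start of the latest sliding window that contains `time`, as in the diff.
    // For non-negative `time` this equals time - (time % slideMillis).
    static long lastWindowStart(long time, long slideMillis) {
        return time - (time + slideMillis) % slideMillis;
    }

    // End of that window: its start plus the window size.
    static long lastWindowEnd(long time, long sizeMillis, long slideMillis) {
        return lastWindowStart(time, slideMillis) + sizeMillis;
    }

    // Old condition: the record is late as soon as its event time is behind the watermark.
    static boolean lateByEventTime(long time, long watermark) {
        return time < watermark;
    }

    // New condition: the record is late only when the latest window that
    // contains it has already ended before the watermark.
    static boolean lateByWindowEnd(long time, long sizeMillis, long slideMillis, long watermark) {
        return lastWindowEnd(time, sizeMillis, slideMillis) < watermark;
    }

    public static void main(String[] args) {
        long size = 10_000L;      // assumed window size in ms
        long slide = 5_000L;      // assumed window slide in ms
        long time = 12_000L;      // assumed event time of the record
        long watermark = 14_000L; // assumed current watermark

        // Latest window containing the record is [10000, 20000).
        System.out.println(lateByEventTime(time, watermark));              // true
        System.out.println(lateByWindowEnd(time, size, slide, watermark)); // false
    }
}
```

   With these assumed values the old check treats the record as late, while the new check still accepts it because its latest window, [10000, 20000), has not yet ended relative to the watermark.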


