You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by "starmilkxin (via GitHub)" <gi...@apache.org> on 2023/05/22 15:11:14 UTC

[GitHub] [rocketmq-streams] starmilkxin opened a new pull request, #296: Add watermark in IdleWindowScaner

starmilkxin opened a new pull request, #296:
URL: https://github.com/apache/rocketmq-streams/pull/296

   fire idleWindow according to watermark and windowEnd


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@rocketmq.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [rocketmq-streams] ni-ze closed pull request #296: Add watermark in IdleWindowScaner

Posted by "ni-ze (via GitHub)" <gi...@apache.org>.
ni-ze closed pull request #296: Add watermark in IdleWindowScaner
URL: https://github.com/apache/rocketmq-streams/pull/296


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@rocketmq.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [rocketmq-streams] starmilkxin commented on pull request #296: Add watermark in IdleWindowScaner

Posted by "starmilkxin (via GitHub)" <gi...@apache.org>.
starmilkxin commented on PR #296:
URL: https://github.com/apache/rocketmq-streams/pull/296#issuecomment-1630287692

   I did some simple tests, and the windowEnd will change with the arrival of data.
   Here the sessionTimeout of sessionWindow is 5 seconds. Each message sent causes it to keep updating windowEnd. Pause for 4 seconds after each message is sent, a total of 5 pieces of data are sent, and it takes 4 * 4 + 5 = 21 seconds.
   
   ```java
   public class UserProducer {
       private static final String topic = "windowCount";
       private static final Random random = new Random();
       public static void main(String[] args) throws Exception {
   
           DefaultMQProducer producer = new DefaultMQProducer("Pro_Group");
   
           producer.setNamesrvAddr("localhost:9876");
   
           producer.start();
   
           long time = 4000000000000L;
   
           for (int i = 0; i < 5; i++) {
               User user = new User("小红" + i, 13, time + 4000*i);
               byte[] body = JSON.toJSONBytes(user);
               Message msg = new Message(topic, "", body);
               SendResult sendResult = producer.send(msg);
               System.out.printf("%s\n", user);
               Thread.sleep(4000);
           }
   
           //Shut down once the producer instance is not longer in use.
           producer.shutdown();
       }
   }
   
   User{name='小红0', age=13, timeStamp=4000000000000}
   User{name='小红1', age=13, timeStamp=4000000004000}
   User{name='小红2', age=13, timeStamp=4000000008000}
   User{name='小红3', age=13, timeStamp=4000000012000}
   User{name='小红4', age=13, timeStamp=4000000016000}
   ```
   
   ```java
   // AccumulatorSessionWindowFire.fire
                   logger.info("fire session,windowKey={}, search keyPrefix={}, window: [{} - {}]",
                           windowKey, state.getKey().toString(), Utils.format(windowBegin), Utils.format(windowEnd));
   
   2023-07-11 00:27:49.121 [ScanIdleWindowThread_1] INFO  o.a.r.s.c.window.fire.AccumulatorSessionWindowFire - fire session,windowKey=sessionWindowCount-ROCKETMQ-COUNT-00004@SessionWindowAccumulatorProcessor&&31a7fab88e791062b8f7a30640648be6&&1689006466145&&1689006445131, search keyPrefix=13, window: [2023-07-11 00:27:25 - 2023-07-11 00:27:46]
   ```
   
   ```java
   // PrintSupplier.process
                   String template = "(key=%s, value=%s)";
   
                   Data<Object, T> result = new Data<>(this.context.getKey(), data, this.context.getDataTime(), header);
                   String format = String.format(template, result.getKey(), data.toString());
   
                   System.out.println(format);
   
   
   [2023-07-11 00:27:25 - 2023-07-11 00:27:46](key=13, value=5)
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@rocketmq.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [rocketmq-streams] ni-ze commented on pull request #296: Add watermark in IdleWindowScaner

Posted by "ni-ze (via GitHub)" <gi...@apache.org>.
ni-ze commented on PR #296:
URL: https://github.com/apache/rocketmq-streams/pull/296#issuecomment-1615838163

   I am ok with that, but the sessionTimeOut is work with the session window, this kind of window has not windowEnd.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@rocketmq.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [rocketmq-streams] ni-ze merged pull request #296: Add watermark in IdleWindowScaner

Posted by "ni-ze (via GitHub)" <gi...@apache.org>.
ni-ze merged PR #296:
URL: https://github.com/apache/rocketmq-streams/pull/296


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@rocketmq.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org