You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@rocketmq.apache.org by GitBox <gi...@apache.org> on 2022/01/20 09:08:40 UTC

[GitHub] [rocketmq] duhenglucky commented on a change in pull request #3458: [ISSUE #3449] Delayed message supports asynchronous delivery

duhenglucky commented on a change in pull request #3458:
URL: https://github.com/apache/rocketmq/pull/3458#discussion_r788540984



##########
File path: store/src/test/java/org/apache/rocketmq/store/ScheduleMessageServiceTest.java
##########
@@ -71,14 +91,104 @@ public void testCorrectDelayOffset_whenInit() throws Exception {
 
     }
 
-    private MessageStore buildMessageStore() throws Exception {
+    private MessageStoreConfig buildMessageStoreConfig() throws Exception {
         MessageStoreConfig messageStoreConfig = new MessageStoreConfig();
         messageStoreConfig.setMappedFileSizeCommitLog(1024 * 1024 * 10);
         messageStoreConfig.setMappedFileSizeConsumeQueue(1024 * 1024 * 10);
         messageStoreConfig.setMaxHashSlotNum(10000);
         messageStoreConfig.setMaxIndexNum(100 * 100);
         messageStoreConfig.setFlushDiskType(FlushDiskType.SYNC_FLUSH);
         messageStoreConfig.setFlushIntervalConsumeQueue(1);
-        return new DefaultMessageStore(messageStoreConfig, new BrokerStatsManager("simpleTest", true), null, new BrokerConfig());
+        return messageStoreConfig;
+    }
+
+    @Test
+    public void testHandlePutResultTask() throws Exception {
+        DefaultMessageStore messageStore = mock(DefaultMessageStore.class);
+        MessageStoreConfig config = buildMessageStoreConfig();
+        config.setEnableScheduleMessageStats(false);
+        config.setEnableScheduleAsyncDeliver(true);
+        when(messageStore.getMessageStoreConfig()).thenReturn(config);
+        ScheduleMessageService scheduleMessageService = new ScheduleMessageService(messageStore);
+        scheduleMessageService.parseDelayLevel();
+
+        Field field = scheduleMessageService.getClass().getDeclaredField("deliverPendingTable");
+        field.setAccessible(true);
+        Map<Integer /* level */, LinkedBlockingQueue<ScheduleMessageService.PutResultProcess>> deliverPendingTable =
+            (Map<Integer, LinkedBlockingQueue<ScheduleMessageService.PutResultProcess>>) field.get(scheduleMessageService);
+
+        field = scheduleMessageService.getClass().getDeclaredField("offsetTable");
+        field.setAccessible(true);
+        ConcurrentMap<Integer /* level */, Long/* offset */> offsetTable =
+            (ConcurrentMap<Integer /* level */, Long/* offset */>) field.get(scheduleMessageService);
+        for (int i = 1; i <= scheduleMessageService.getMaxDelayLevel(); i++) {
+            offsetTable.put(i, 0L);
+        }
+
+        int deliverThreadPoolNums = Runtime.getRuntime().availableProcessors();

Review comment:
       Is it better to set _deliverThreadPoolNums_ to _delaylevel_?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@rocketmq.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org