Posted to commits@rocketmq.apache.org by "complone (via GitHub)" <gi...@apache.org> on 2023/02/14 02:46:11 UTC

[GitHub] [rocketmq-spring] complone opened a new pull request, #526: feat: batch message support

complone opened a new pull request, #526:
URL: https://github.com/apache/rocketmq-spring/pull/526

   ## What is the purpose of the change
   
   related to [issues-502](https://github.com/apache/rocketmq-spring/issues/502)
   ## Brief changelog
   
   XX
   
   ## Verifying this change
   
   XXXX
   
   Follow this checklist to help us incorporate your contribution quickly and easily. Note: `it would be helpful if you could complete the first 5 checklist items (the last one is optional) before requesting a community review of your PR`.
   
   - [ ] Make sure there is a [Github issue](https://github.com/apache/rocketmq/issues) filed for the change (usually before you start working on it). Trivial changes like typos do not require a Github issue. Your pull request should address just this issue, without pulling in other changes - one PR resolves one issue. 
   - [ ] Format the pull request title like `[ISSUE #123] Fix UnknownException when host config not exist`. Each commit in the pull request should have a meaningful subject line and body.
   - [ ] Write a pull request description that is detailed enough to understand what the pull request does, how, and why.
   - [ ] Write the necessary unit tests (over 80% coverage) to verify your logic; prefer mocks where cross-module dependencies exist.
   - [ ] Run `mvn -B clean apache-rat:check findbugs:findbugs checkstyle:checkstyle` to make sure basic checks pass. Run `mvn clean install -DskipITs` to make sure unit tests pass. Run `mvn clean test-compile failsafe:integration-test` to make sure integration tests pass.
   - [ ] If this contribution is large, please file an [Apache Individual Contributor License Agreement](http://www.apache.org/licenses/#clas).
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@rocketmq.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [rocketmq-spring] complone commented on pull request #526: [ISSUE#502] batch message support

Posted by "complone (via GitHub)" <gi...@apache.org>.
complone commented on PR #526:
URL: https://github.com/apache/rocketmq-spring/pull/526#issuecomment-1438081158

   The unit test testBatchGetMessages can serve as an example. I will add a sample this week.




[GitHub] [rocketmq-spring] complone commented on pull request #526: [ISSUE#502] batch message support

Posted by "complone (via GitHub)" <gi...@apache.org>.
complone commented on PR #526:
URL: https://github.com/apache/rocketmq-spring/pull/526#issuecomment-1430653103

   @aaron-ai @RongtongJin Can you help me start ci?
   
   




[GitHub] [rocketmq-spring] complone commented on a diff in pull request #526: [ISSUE#502] batch message support

Posted by "complone (via GitHub)" <gi...@apache.org>.
complone commented on code in PR #526:
URL: https://github.com/apache/rocketmq-spring/pull/526#discussion_r1117857576


##########
rocketmq-spring-boot/src/test/java/org/apache/rocketmq/spring/support/DefaultRocketMQListenerContainerTest.java:
##########
@@ -256,17 +276,171 @@ public void testSetRocketMQMessageListener() {
         assertEquals(anno.instanceName(), container.getInstanceName());
     }
 
+    @Test
+    public void testSelectorType() throws Exception {
+        DefaultRocketMQListenerContainer listenerContainer = new DefaultRocketMQListenerContainer();
+        listenerContainer.setConsumer(new DefaultMQPushConsumer());
+        Method initSelectorType = DefaultRocketMQListenerContainer.class.getDeclaredMethod("initSelectorType");
+        initSelectorType.setAccessible(true);
+
+        try {
+            listenerContainer.setRocketMQMessageListener(TagClass.class.getAnnotation(RocketMQMessageListener.class));
+            initSelectorType.invoke(listenerContainer);
+
+            listenerContainer.setRocketMQMessageListener(SQL92Class.class.getAnnotation(RocketMQMessageListener.class));
+            initSelectorType.invoke(listenerContainer);
+        } catch (Exception e) {
+
+        }
+    }
+
+    @Test
+    public void testBatchGetMessages() throws Exception {
+        DefaultRocketMQListenerContainer listenerContainer = new DefaultRocketMQListenerContainer();
+        listenerContainer.setConsumer(new DefaultMQPushConsumer());
+        Field messageType = DefaultRocketMQListenerContainer.class.getDeclaredField("messageType");
+        messageType.setAccessible(true);
+
+        ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() {
+            @Override
+            public Thread newThread(Runnable r) {
+                return new Thread(r, "TestScheduledThread");
+            }
+        });
+        Runnable r = () -> {
+            try {
+                MessageExt msg = new MessageExt();
+                msg.setMsgId("X_SVEN_AUGUSTUS_0001");
+                msg.setBody((exceptedString + " 1").getBytes(listenerContainer.getCharset()));
+                MessageExt msg2 = new MessageExt();
+                msg2.setMsgId("X_SVEN_AUGUSTUS_0002");
+                msg2.setBody((exceptedString + " 2").getBytes(listenerContainer.getCharset()));
+                List<MessageExt> messages = Arrays.asList(msg, msg2);
+
+                MessageListener l = listenerContainer.getConsumer().getMessageListener();
+                if (l instanceof MessageListenerConcurrently) {
+                    ((MessageListenerConcurrently) l).consumeMessage(messages, new ConsumeConcurrentlyContext(new MessageQueue()));
+                }
+                if (l instanceof MessageListenerOrderly) {
+                    ((MessageListenerOrderly) l).consumeMessage(messages, new ConsumeOrderlyContext(new MessageQueue()));
+                }
+            } catch (UnsupportedEncodingException e) {
+                e.printStackTrace();
+            }
+        };
+        messageType.set(listenerContainer, String.class);
+
+        // RocketMQBatchListener IN ConsumeMode.CONCURRENTLY, AND test for excepted
+        tryRocketMQBatchListener(listenerContainer, ConcurrentlyClass.class, scheduledExecutorService, r, exceptedString, true);// excepted
+
+        // RocketMQBatchListener IN ConsumeMode.CONCURRENTLY, AND test for not excepted
+        tryRocketMQBatchListener(listenerContainer, ConcurrentlyClass.class, scheduledExecutorService, r, notExceptedString, false);// not excepted
+
+        // RocketMQBatchListener IN ConsumeMode.ORDERLY, AND test for excepted
+        tryRocketMQBatchListener(listenerContainer, OrderlyClass.class, scheduledExecutorService, r, exceptedString, true);// excepted
+
+        // RocketMQBatchListener IN ConsumeMode.ORDERLY, AND test for not excepted
+        tryRocketMQBatchListener(listenerContainer, OrderlyClass.class, scheduledExecutorService, r, notExceptedString, false);// not excepted
+
+        // RocketMQListener IN ConsumeMode.CONCURRENTLY, AND test for excepted
+        tryRocketMQListener(listenerContainer, ConcurrentlyClass.class, scheduledExecutorService, r, exceptedString, true);// excepted
+
+        // RocketMQListener IN ConsumeMode.CONCURRENTLY, AND test for not excepted
+        tryRocketMQListener(listenerContainer, ConcurrentlyClass.class, scheduledExecutorService, r, notExceptedString, false);// not excepted
+
+        // RocketMQListener IN ConsumeMode.ORDERLY, AND test for excepted
+        tryRocketMQListener(listenerContainer, OrderlyClass.class, scheduledExecutorService, r, exceptedString, true);// excepted
+
+        // RocketMQListener IN ConsumeMode.ORDERLY, AND test for not excepted
+        tryRocketMQListener(listenerContainer, OrderlyClass.class, scheduledExecutorService, r, notExceptedString, false);// not excepted
+    }
+
+    private void tryRocketMQBatchListener(DefaultRocketMQListenerContainer listenerContainer,
+                                          final Class<?> rocketMQMessageListenerClass,
+                                          ScheduledExecutorService scheduledExecutorService, Runnable r, final String exceptedValue,
+                                          boolean exceptedTrueOrFalse) throws InvocationTargetException, IllegalAccessException, NoSuchMethodException, InterruptedException {
+        Method initConsumeMode = DefaultRocketMQListenerContainer.class.getDeclaredMethod("initConsumeMode");
+        initConsumeMode.setAccessible(true);
+
+        final Boolean[] result = new Boolean[] {Boolean.FALSE};
+
+        // RocketMQBatchListener IN ConsumeMode.CONCURRENTLY, AND test for excepted
+        final CountDownLatch countDownLatch = new CountDownLatch(1);
+        listenerContainer.setRocketMQBatchListener(new RocketMQBatchListener<String>() {
+            @Override
+            public void onMessages(List<String> messages) {
+                result[0] = messages.stream().anyMatch(m -> m.startsWith(exceptedValue));
+                countDownLatch.countDown();
+            }
+        });
+        listenerContainer.setRocketMQMessageListener(rocketMQMessageListenerClass.getAnnotation(RocketMQMessageListener.class));
+        initConsumeMode.invoke(listenerContainer);
+        scheduledExecutorService.schedule(r, 100, TimeUnit.MILLISECONDS);
+        countDownLatch.await(1000, TimeUnit.MILLISECONDS);
+        if (exceptedTrueOrFalse) {
+            assertThat(result[0]).isTrue(); // excepted
+        } else {
+            assertThat(result[0]).isFalse(); // not excepted
+        }
+    }
+
+    private void tryRocketMQListener(DefaultRocketMQListenerContainer listenerContainer,
+                                     final Class<?> rocketMQMessageListenerClass,
+                                     ScheduledExecutorService scheduledExecutorService, Runnable r, final String exceptedValue,
+                                     boolean exceptedTrueOrFalse) throws InvocationTargetException, IllegalAccessException, NoSuchMethodException, InterruptedException {
+        Method initConsumeMode = DefaultRocketMQListenerContainer.class.getDeclaredMethod("initConsumeMode");
+        initConsumeMode.setAccessible(true);
+
+        final Boolean[] result = new Boolean[] {Boolean.FALSE};
+
+        // RocketMQBatchListener IN ConsumeMode.CONCURRENTLY, AND test for excepted
+        final CountDownLatch countDownLatch = new CountDownLatch(1);
+        listenerContainer.setRocketMQListener(new RocketMQListener<String>() {
+            @Override
+            public void onMessage(String message) {
+                result[0] = message.startsWith(exceptedValue);
+                countDownLatch.countDown();
+            }
+        });
+        listenerContainer.setRocketMQMessageListener(rocketMQMessageListenerClass.getAnnotation(RocketMQMessageListener.class));
+        initConsumeMode.invoke(listenerContainer);
+        scheduledExecutorService.schedule(r, 100, TimeUnit.MILLISECONDS);
+        countDownLatch.await(1000, TimeUnit.MILLISECONDS);
+        if (exceptedTrueOrFalse) {
+            assertThat(result[0]).isTrue(); // excepted
+        } else {
+            assertThat(result[0]).isFalse(); // not excepted
+        }
+    }
+
+    @RocketMQMessageListener(consumerGroup = "consumerGroup1", topic = "test", selectorExpression = "*", selectorType = SelectorType.TAG)
+    static class TagClass {
+    }
+
+    @RocketMQMessageListener(consumerGroup = "consumerGroup1", topic = "test", selectorType = SelectorType.SQL92)
+    static class SQL92Class {
+    }
+
+    @RocketMQMessageListener(consumerGroup = "consumerGroup1", topic = "test", consumeMode = ConsumeMode.CONCURRENTLY)
+    static class ConcurrentlyClass {
+    }
+
+    @RocketMQMessageListener(consumerGroup = "consumerGroup1", topic = "test", consumeMode = ConsumeMode.ORDERLY)
+    static class OrderlyClass {
+    }
+
+
     @RocketMQMessageListener(consumerGroup = "abc1", topic = "test",
-            consumeMode = ConsumeMode.ORDERLY,
-            consumeThreadNumber = 3456,
-            messageModel = MessageModel.BROADCASTING,
-            selectorType = SelectorType.SQL92,
-            selectorExpression = "selectorExpression",
-            tlsEnable = "tlsEnable",
-            namespace = "namespace",
-            delayLevelWhenNextConsume = 1234,
-            suspendCurrentQueueTimeMillis = 2345,
-            instanceName = "instanceName"
+        consumeMode = ConsumeMode.ORDERLY,
+        consumeThreadNumber = 3456,
+        messageModel = MessageModel.BROADCASTING,
+        selectorType = SelectorType.SQL92,
+        selectorExpression = "selectorExpression",
+        tlsEnable = "tlsEnable",
+        namespace = "namespace",
+        delayLevelWhenNextConsume = 1234,
+        suspendCurrentQueueTimeMillis = 2345,
+        instanceName = "instanceName"

Review Comment:
   add consumeMessageBatchMaxSize





[GitHub] [rocketmq-spring] aaron-ai commented on pull request #526: [ISSUE#502] batch message support

Posted by "aaron-ai (via GitHub)" <gi...@apache.org>.
aaron-ai commented on PR #526:
URL: https://github.com/apache/rocketmq-spring/pull/526#issuecomment-1430735757

   > @aaron-ai @francisoliverlee Can you help me start ci?
   
   OK




[GitHub] [rocketmq-spring] RongtongJin commented on a diff in pull request #526: [ISSUE#502] batch message support

Posted by "RongtongJin (via GitHub)" <gi...@apache.org>.
RongtongJin commented on code in PR #526:
URL: https://github.com/apache/rocketmq-spring/pull/526#discussion_r1118184775


##########
rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/annotation/RocketMQMessageListener.java:
##########
@@ -143,6 +143,12 @@
      * The namespace of consumer.
      */
     String namespace() default "";
+    
+    /**
+     * The consumeMessageBatchMaxSize of consumer.
+     * @return
+     */
+    int consumeMessageBatchMaxSize() default 0;

Review Comment:
   Why does consumeMessageBatchMaxSize default to zero while delayLevelWhenNextConsume defaults to 1024?





[GitHub] [rocketmq-spring] complone commented on pull request #526: [ISSUE#502] batch message support

Posted by "complone (via GitHub)" <gi...@apache.org>.
complone commented on PR #526:
URL: https://github.com/apache/rocketmq-spring/pull/526#issuecomment-1438081939

   > Hi @complone , Could you add a sample to show how to use it ?
   
   The unit test ```testBatchGetMessages``` can serve as an example. I will add a sample this week.
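
Until a sample lands, the listener shape exercised by `testBatchGetMessages` can be sketched roughly as follows. This is a minimal illustration, not the PR's code: `BatchListener` is a locally defined, hypothetical stand-in for the PR's `RocketMQBatchListener<T>`, so the example runs without the RocketMQ dependency, and only the `onMessages(List<T>)` method name is taken from the diff. `CollectingListener` and the payload strings are likewise invented for the example.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class BatchListenerSketch {

    /** Local stand-in (assumption) for the PR's RocketMQBatchListener<T>. */
    interface BatchListener<T> {
        void onMessages(List<T> messages);
    }

    /** Hypothetical listener that records every payload it is handed. */
    static class CollectingListener implements BatchListener<String> {
        final List<String> seen = new ArrayList<>();
        int callbacks = 0;

        @Override
        public void onMessages(List<String> messages) {
            callbacks++;            // one callback per delivered batch
            seen.addAll(messages);  // all payloads of the batch arrive together
        }
    }

    public static void main(String[] args) {
        CollectingListener listener = new CollectingListener();
        // Two payloads delivered in a single call, as the unit test does.
        listener.onMessages(Arrays.asList("payload 1", "payload 2"));
        System.out.println(listener.callbacks + " callback, " + listener.seen.size() + " messages");
    }
}
```

The point of the batch interface is that the application sees the whole list in one invocation, rather than one invocation per message as with `RocketMQListener.onMessage`.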




[GitHub] [rocketmq-spring] panzhi33 commented on pull request #526: [ISSUE#502] batch message support

Posted by "panzhi33 (via GitHub)" <gi...@apache.org>.
panzhi33 commented on PR #526:
URL: https://github.com/apache/rocketmq-spring/pull/526#issuecomment-1438136000

   A consumeBatchSize parameter needs to be added; otherwise batch consumption will not take effect, and messages will still be delivered one at a time.
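
The effect described in this comment can be modelled with a small, dependency-free sketch. It assumes, as the comment implies, that the consumer hands the listener at most a configured number of messages per callback and that this limit is 1 unless raised. `toBatches` and `batchMaxSize` are hypothetical names for illustration; this is not the RocketMQ client's implementation.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class BatchSizeSketch {

    /** Splits pulled messages into chunks of at most batchMaxSize; each chunk models one listener callback. */
    static <T> List<List<T>> toBatches(List<T> pulled, int batchMaxSize) {
        if (batchMaxSize < 1) {
            throw new IllegalArgumentException("batchMaxSize must be >= 1");
        }
        List<List<T>> batches = new ArrayList<>();
        for (int i = 0; i < pulled.size(); i += batchMaxSize) {
            int end = Math.min(i + batchMaxSize, pulled.size());
            batches.add(new ArrayList<>(pulled.subList(i, end)));
        }
        return batches;
    }

    public static void main(String[] args) {
        List<String> pulled = Arrays.asList("m1", "m2", "m3", "m4", "m5");
        // With a limit of 1, a batch listener still sees one message per call.
        System.out.println(toBatches(pulled, 1));
        // With a limit of 2, the listener receives real batches.
        System.out.println(toBatches(pulled, 2)); // [[m1, m2], [m3, m4], [m5]]
    }
}
```

Under this model, registering a batch listener alone changes only the callback signature; the batch limit is what changes how many messages each callback receives.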

