You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by "Aaron-PanJi (via GitHub)" <gi...@apache.org> on 2023/05/11 08:58:49 UTC

[GitHub] [rocketmq] Aaron-PanJi opened a new issue, #6734: Can push consumers provide a single message consumption API?

Aaron-PanJi opened a new issue, #6734:
URL: https://github.com/apache/rocketmq/issues/6734

   ### Is Your Feature Request Related to a Problem?
   
   In a batch of messages, if the last few messages fail to consume, other messages that successfully consume must also return a failure result, causing duplicate consumption.
   
   ### Describe the Solution You'd Like
   
   Consumers provide a single message consumption callback method, which processes one message at a time and returns the consumption result after each message is processed.
   
   ### Describe Alternatives You've Considered
   
   Set a relatively small pullBatchSize parameter value and pull a small number of messages at once to avoid messages that fail to consume affecting messages that succeed in consumption. But this approach affects performance and is not very recommended.
   
   ### Additional Context
   
   None


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@rocketmq.apache.org.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [rocketmq] cserwen commented on issue #6734: Can push consumers provide a single message consumption API?

Posted by "cserwen (via GitHub)" <gi...@apache.org>.
cserwen commented on issue #6734:
URL: https://github.com/apache/rocketmq/issues/6734#issuecomment-1545037525

   Actually, you can set the ackIndex  when use `DefaultMQPushConsumer`. The value can mark the consumed index of a batch msgs.
   ```java
   DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("Test");
   consumer.subscribe("TopicTest", "*");
   consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
       for (int i = 0; i < msgs.size(); i++) {
           // handle msg
           if (!handle(msg)) {
               context.setAckIndex(i - 1);
           }
       }
       
       // Just return success, more detail can see ConsumeMessageConcurrentlyService#processConsumeResult;
       return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
   });
   consumer.start();
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@rocketmq.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org