Posted to dev@rocketmq.apache.org by "zhengsh (via GitHub)" <gi...@apache.org> on 2024/03/14 05:52:48 UTC

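[D] rocketmq 5.1 with client version 4.9.5: ordered messages are occasionally lost on the consumer side [rocketmq]

GitHub user zhengsh created a discussion: rocketmq 5.1 with client version 4.9.5: ordered messages are occasionally lost on the consumer side

When consuming ordered messages, the consumer side occasionally loses a message.
Strangely, the lost message can still be queried in rocketmq.
The consumer code is as follows: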
`
    public void start() throws Exception {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(groupNameXX);
        consumer.setNamesrvAddr(nameServerXX);
        for (String topic : topics) {
            consumer.subscribe(topic, "*");
        }
        consumer.setPullBatchSize(pullBatchSize());
        // The listener receives up to this many messages per call.
        consumer.setConsumeMessageBatchMaxSize(pullBatchSize());
        consumer.setPullInterval(pullInterval());
        consumer.setConsumeThreadMin(threadNum);
        consumer.setConsumeThreadMax(threadNum);
        consumer.setSuspendCurrentQueueTimeMillis(100L);
        // Typed lambda parameters select the MessageListenerOrderly overload.
        consumer.registerMessageListener((List<MessageExt> msgs, ConsumeOrderlyContext context) -> {
            return consume(msgs);
        });
        log.info("consumer start topic:{}, groupName:{}", String.join(",", topics), groupNameXX);

        try {
            consumer.start();
            log.info("consumer consumerGroup[{}] started", groupNameXX);
        } catch (MQClientException e) {
            log.error("start error ", e);
        }
    }

    private ConsumeOrderlyStatus consume(List<MessageExt> messageExts) {
        String batchId = UUID.randomUUID().toString();
        try {
            long start = System.currentTimeMillis();
            // business logic ...
            return ConsumeOrderlyStatus.SUCCESS;
        } catch (Throwable e) {
            // Ask the client to retry this batch after a short pause.
            log.error("consume error", e);
            return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
        }
    }
`


GitHub link: https://github.com/apache/rocketmq/discussions/7916

----
This is an automatically sent email for dev@rocketmq.apache.org.
To unsubscribe, please send an email to: dev-unsubscribe@rocketmq.apache.org


Re: [D] rocketmq 5.1 with client version 4.9.5: ordered messages are occasionally lost on the consumer side [rocketmq]

Posted by "zhengsh (via GitHub)" <gi...@apache.org>.
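GitHub user zhengsh edited a discussion: rocketmq 5.1 with client version 4.9.5: ordered messages are occasionally lost on the consumer side

When consuming ordered messages, the consumer side occasionally loses a message.
Strangely, the lost message can still be queried in rocketmq-admin.

The consumer code is as follows: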
`
    public void start() throws Exception {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(groupNameXX);
        consumer.setNamesrvAddr(nameServerXX);
        for (String topic : topics) {
            consumer.subscribe(topic, "*");
        }
        consumer.setPullBatchSize(pullBatchSize());
        // The listener receives up to this many messages per call.
        consumer.setConsumeMessageBatchMaxSize(pullBatchSize());
        consumer.setPullInterval(pullInterval());
        consumer.setConsumeThreadMin(threadNum);
        consumer.setConsumeThreadMax(threadNum);
        consumer.setSuspendCurrentQueueTimeMillis(100L);
        // Typed lambda parameters select the MessageListenerOrderly overload.
        consumer.registerMessageListener((List<MessageExt> msgs, ConsumeOrderlyContext context) -> {
            return consume(msgs);
        });
        log.info("consumer start topic:{}, groupName:{}", String.join(",", topics), groupNameXX);

        try {
            consumer.start();
            log.info("consumer consumerGroup[{}] started", groupNameXX);
        } catch (MQClientException e) {
            log.error("start error ", e);
        }
    }

    private ConsumeOrderlyStatus consume(List<MessageExt> messageExts) {
        String batchId = UUID.randomUUID().toString();
        try {
            long start = System.currentTimeMillis();
            // business logic ...
            return ConsumeOrderlyStatus.SUCCESS;
        } catch (Throwable e) {
            // Ask the client to retry this batch after a short pause.
            log.error("consume error", e);
            return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
        }
    }
`

Our current workaround is to resend the affected message through rocketmq.
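
To tell whether a message never reached the listener or was delivered and then dropped inside the business logic, one option is to record the queue offset of every message handed to `consume`. The following is only a minimal sketch of that idea, not part of the original consumer: `OffsetGapDetector` and its `record` method are hypothetical names, and it assumes that offsets within one queue arrive consecutively when the subscription expression is `"*"`.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical diagnostic helper: remembers the last offset seen per queue and counts skipped offsets. */
final class OffsetGapDetector {
    // Last offset seen for each queue, keyed by a caller-chosen string that identifies the queue.
    private final Map<String, Long> lastOffsetByQueue = new HashMap<>();

    /**
     * Records one consumed offset.
     * Returns how many offsets were skipped since the previous record for the same queue,
     * or 0 for the first record of a queue and for redelivered (not newer) offsets.
     */
    synchronized long record(String queueKey, long queueOffset) {
        Long last = lastOffsetByQueue.get(queueKey);
        if (last == null) {
            // No baseline yet, for example right after startup or a rebalance.
            lastOffsetByQueue.put(queueKey, queueOffset);
            return 0L;
        }
        if (queueOffset <= last) {
            // Same batch retried after SUSPEND_CURRENT_QUEUE_A_MOMENT: not a gap.
            return 0L;
        }
        lastOffsetByQueue.put(queueKey, queueOffset);
        return queueOffset - last - 1;
    }

    public static void main(String[] args) {
        OffsetGapDetector detector = new OffsetGapDetector();
        detector.record("q0", 10L);
        detector.record("q0", 11L);
        long skipped = detector.record("q0", 14L); // offsets 12 and 13 were never recorded
        System.out.println("skipped offsets: " + skipped); // prints "skipped offsets: 2"
    }
}
```

In the listener, the key could be built from `context.getMessageQueue()` and the offset taken from `MessageExt.getQueueOffset()`, logging a warning whenever `record` returns a non-zero value. A warning would suggest the message was skipped before reaching `consume`; no warning for the offset of a "lost" message would point at the business logic instead.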

