Posted to dev@rocketmq.apache.org by "zhengsh (via GitHub)" <gi...@apache.org> on 2024/03/14 05:52:48 UTC
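[D] rocketmq 5.1, client version 4.9.5: ordered messages are occasionally lost during client-side consumption [rocketmq]
GitHub user zhengsh created a discussion: rocketmq 5.1, client version 4.9.5: ordered messages are occasionally lost during client-side consumption
When consuming ordered messages, the consumer side occasionally loses messages.
Strangely, the lost messages can still be found when querying rocketmq.
The consumer code is as follows: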
`
public void start() throws Exception {
    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(groupNameXX);
    consumer.setNamesrvAddr(nameServerXX);
    // Subscribe to every topic with no tag filtering.
    for (String topic : topics) {
        consumer.subscribe(topic, "*");
    }
    consumer.setPullBatchSize(pullBatchSize());
    consumer.setConsumeMessageBatchMaxSize(pullBatchSize());
    consumer.setPullInterval(pullInterval());
    consumer.setConsumeThreadMin(threadNum);
    consumer.setConsumeThreadMax(threadNum);
    consumer.setSuspendCurrentQueueTimeMillis(100L);
    // The ConsumeOrderlyContext parameter selects the orderly listener overload.
    consumer.registerMessageListener((List<MessageExt> msgs, ConsumeOrderlyContext context) -> {
        return consume(msgs);
    });
    log.info("consumer start topic:{}, groupName:{}", String.join(",", topics), groupNameXX);
    try {
        consumer.start();
        log.info("consumer consumerGroup[{}] started", groupNameXX);
    } catch (MQClientException e) {
        log.error("start error ", e);
    }
}

private ConsumeOrderlyStatus consume(List<MessageExt> messageExts) {
    String batchId = UUID.randomUUID().toString();
    try {
        long start = System.currentTimeMillis();
        // business logic ...
        return ConsumeOrderlyStatus.SUCCESS;
    } catch (Throwable e) {
        log.error("consume error", e);
        // Ask the client to retry this batch after a short pause.
        return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
    }
}
`
GitHub link: https://github.com/apache/rocketmq/discussions/7916
----
This is an automatically sent email for dev@rocketmq.apache.org.
To unsubscribe, please send an email to: dev-unsubscribe@rocketmq.apache.org
Re: [D] rocketmq 5.1, client version 4.9.5: ordered messages are occasionally lost during client-side consumption [rocketmq]
Posted by "zhengsh (via GitHub)" <gi...@apache.org>.
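GitHub user zhengsh edited a discussion: rocketmq 5.1, client version 4.9.5: ordered messages are occasionally lost during client-side consumption
When consuming ordered messages, the consumer side occasionally loses messages.
Strangely, the lost messages can still be found when querying rocketmq-admin.
The consumer code is as follows: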
`
public void start() throws Exception {
    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(groupNameXX);
    consumer.setNamesrvAddr(nameServerXX);
    // Subscribe to every topic with no tag filtering.
    for (String topic : topics) {
        consumer.subscribe(topic, "*");
    }
    consumer.setPullBatchSize(pullBatchSize());
    consumer.setConsumeMessageBatchMaxSize(pullBatchSize());
    consumer.setPullInterval(pullInterval());
    consumer.setConsumeThreadMin(threadNum);
    consumer.setConsumeThreadMax(threadNum);
    consumer.setSuspendCurrentQueueTimeMillis(100L);
    // The ConsumeOrderlyContext parameter selects the orderly listener overload.
    consumer.registerMessageListener((List<MessageExt> msgs, ConsumeOrderlyContext context) -> {
        return consume(msgs);
    });
    log.info("consumer start topic:{}, groupName:{}", String.join(",", topics), groupNameXX);
    try {
        consumer.start();
        log.info("consumer consumerGroup[{}] started", groupNameXX);
    } catch (MQClientException e) {
        log.error("start error ", e);
    }
}

private ConsumeOrderlyStatus consume(List<MessageExt> messageExts) {
    String batchId = UUID.randomUUID().toString();
    try {
        long start = System.currentTimeMillis();
        // business logic ...
        return ConsumeOrderlyStatus.SUCCESS;
    } catch (Throwable e) {
        log.error("consume error", e);
        // Ask the client to retry this batch after a short pause.
        return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
    }
}
`
Our current workaround is to resend the affected message through rocketmq.
GitHub link: https://github.com/apache/rocketmq/discussions/7916