You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by li...@apache.org on 2023/01/09 09:34:51 UTC
[rocketmq] branch develop updated: Fix restNum calculation in pop consumption mode (#5843)
This is an automated email from the ASF dual-hosted git repository.
lizhimin pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new 4450391d1 Fix restNum calculation in pop consumption mode (#5843)
4450391d1 is described below
commit 4450391d1679d46e4182189b0f11dcffb27cc498
Author: SSpirits <ad...@lv5.moe>
AuthorDate: Mon Jan 9 17:34:43 2023 +0800
Fix restNum calculation in pop consumption mode (#5843)
---
.../broker/processor/PopMessageProcessor.java | 70 +++++++++++-----------
1 file changed, 34 insertions(+), 36 deletions(-)
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java
index 0f4de599a..5dca6c67b 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java
@@ -588,8 +588,8 @@ public class PopMessageProcessor implements NettyRequestProcessor {
}
return CompletableFuture.completedFuture(result);
}).thenApply(result -> {
- atomicRestNum.set(brokerController.getMessageStore().getMaxOffsetInQueue(topic, queueId) - atomicOffset.get() + atomicRestNum.get());
if (result == null) {
+ atomicRestNum.set(brokerController.getMessageStore().getMaxOffsetInQueue(topic, queueId) - atomicOffset.get() + atomicRestNum.get());
return atomicRestNum.get();
}
if (!result.getMessageMapedList().isEmpty()) {
@@ -632,46 +632,44 @@ public class PopMessageProcessor implements NettyRequestProcessor {
// this.brokerController.getConsumerOffsetManager().commitOffset(channel.remoteAddress().toString(), requestHeader.getConsumerGroup(), topic,
// queueId, getMessageTmpResult.getNextBeginOffset());
}
- atomicRestNum.set(result.getMaxOffset() - result.getNextBeginOffset() + atomicRestNum.get());
- if (result != null) {
- String brokerName = brokerController.getBrokerConfig().getBrokerName();
- for (SelectMappedBufferResult mapedBuffer : result.getMessageMapedList()) {
- // We should not recode buffer for normal topic message
- if (!isRetry) {
- getMessageResult.addMessage(mapedBuffer);
- } else {
- List<MessageExt> messageExtList = MessageDecoder.decodesBatch(mapedBuffer.getByteBuffer(),
- true, false, true);
- mapedBuffer.release();
- for (MessageExt messageExt : messageExtList) {
- try {
- String ckInfo = ExtraInfoUtil.buildExtraInfo(finalOffset, popTime, requestHeader.getInvisibleTime(),
- reviveQid, messageExt.getTopic(), brokerName, messageExt.getQueueId(), messageExt.getQueueOffset());
- messageExt.getProperties().putIfAbsent(MessageConst.PROPERTY_POP_CK, ckInfo);
-
- // Set retry message topic to origin topic and clear message store size to recode
- messageExt.setTopic(requestHeader.getTopic());
- messageExt.setStoreSize(0);
-
- byte[] encode = MessageDecoder.encode(messageExt, false);
- ByteBuffer buffer = ByteBuffer.wrap(encode);
- SelectMappedBufferResult tmpResult =
- new SelectMappedBufferResult(mapedBuffer.getStartOffset(), buffer, encode.length, null);
- getMessageResult.addMessage(tmpResult);
- } catch (Exception e) {
- POP_LOGGER.error("Exception in recode retry message buffer, topic={}", topic, e);
- }
+ atomicRestNum.set(result.getMaxOffset() - result.getNextBeginOffset() + atomicRestNum.get());
+ String brokerName = brokerController.getBrokerConfig().getBrokerName();
+ for (SelectMappedBufferResult mapedBuffer : result.getMessageMapedList()) {
+ // We should not recode buffer for normal topic message
+ if (!isRetry) {
+ getMessageResult.addMessage(mapedBuffer);
+ } else {
+ List<MessageExt> messageExtList = MessageDecoder.decodesBatch(mapedBuffer.getByteBuffer(),
+ true, false, true);
+ mapedBuffer.release();
+ for (MessageExt messageExt : messageExtList) {
+ try {
+ String ckInfo = ExtraInfoUtil.buildExtraInfo(finalOffset, popTime, requestHeader.getInvisibleTime(),
+ reviveQid, messageExt.getTopic(), brokerName, messageExt.getQueueId(), messageExt.getQueueOffset());
+ messageExt.getProperties().putIfAbsent(MessageConst.PROPERTY_POP_CK, ckInfo);
+
+ // Set retry message topic to origin topic and clear message store size to recode
+ messageExt.setTopic(requestHeader.getTopic());
+ messageExt.setStoreSize(0);
+
+ byte[] encode = MessageDecoder.encode(messageExt, false);
+ ByteBuffer buffer = ByteBuffer.wrap(encode);
+ SelectMappedBufferResult tmpResult =
+ new SelectMappedBufferResult(mapedBuffer.getStartOffset(), buffer, encode.length, null);
+ getMessageResult.addMessage(tmpResult);
+ } catch (Exception e) {
+ POP_LOGGER.error("Exception in recode retry message buffer, topic={}", topic, e);
}
}
}
- this.brokerController.getPopInflightMessageCounter().incrementInFlightMessageNum(
- topic,
- requestHeader.getConsumerGroup(),
- queueId,
- result.getMessageCount()
- );
}
+ this.brokerController.getPopInflightMessageCounter().incrementInFlightMessageNum(
+ topic,
+ requestHeader.getConsumerGroup(),
+ queueId,
+ result.getMessageCount()
+ );
return atomicRestNum.get();
}).whenComplete((result, throwable) -> {
if (throwable != null) {