You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by li...@apache.org on 2023/01/09 09:34:51 UTC

[rocketmq] branch develop updated: Fix restNum calculation in pop consumption mode (#5843)

This is an automated email from the ASF dual-hosted git repository.

lizhimin pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new 4450391d1 Fix restNum calculation in pop consumption mode (#5843)
4450391d1 is described below

commit 4450391d1679d46e4182189b0f11dcffb27cc498
Author: SSpirits <ad...@lv5.moe>
AuthorDate: Mon Jan 9 17:34:43 2023 +0800

    Fix restNum calculation in pop consumption mode (#5843)
---
 .../broker/processor/PopMessageProcessor.java      | 70 +++++++++++-----------
 1 file changed, 34 insertions(+), 36 deletions(-)

diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java
index 0f4de599a..5dca6c67b 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java
@@ -588,8 +588,8 @@ public class PopMessageProcessor implements NettyRequestProcessor {
                 }
                 return CompletableFuture.completedFuture(result);
             }).thenApply(result -> {
-                atomicRestNum.set(brokerController.getMessageStore().getMaxOffsetInQueue(topic, queueId) - atomicOffset.get() + atomicRestNum.get());
                 if (result == null) {
+                    atomicRestNum.set(brokerController.getMessageStore().getMaxOffsetInQueue(topic, queueId) - atomicOffset.get() + atomicRestNum.get());
                     return atomicRestNum.get();
                 }
                 if (!result.getMessageMapedList().isEmpty()) {
@@ -632,46 +632,44 @@ public class PopMessageProcessor implements NettyRequestProcessor {
 //                this.brokerController.getConsumerOffsetManager().commitOffset(channel.remoteAddress().toString(), requestHeader.getConsumerGroup(), topic,
 //                        queueId, getMessageTmpResult.getNextBeginOffset());
                 }
-                atomicRestNum.set(result.getMaxOffset() - result.getNextBeginOffset() + atomicRestNum.get());
 
-                if (result != null) {
-                    String brokerName = brokerController.getBrokerConfig().getBrokerName();
-                    for (SelectMappedBufferResult mapedBuffer : result.getMessageMapedList()) {
-                        // We should not recode buffer for normal topic message
-                        if (!isRetry) {
-                            getMessageResult.addMessage(mapedBuffer);
-                        } else {
-                            List<MessageExt> messageExtList = MessageDecoder.decodesBatch(mapedBuffer.getByteBuffer(),
-                                true, false, true);
-                            mapedBuffer.release();
-                            for (MessageExt messageExt : messageExtList) {
-                                try {
-                                    String ckInfo = ExtraInfoUtil.buildExtraInfo(finalOffset, popTime, requestHeader.getInvisibleTime(),
-                                        reviveQid, messageExt.getTopic(), brokerName, messageExt.getQueueId(), messageExt.getQueueOffset());
-                                    messageExt.getProperties().putIfAbsent(MessageConst.PROPERTY_POP_CK, ckInfo);
-
-                                    // Set retry message topic to origin topic and clear message store size to recode
-                                    messageExt.setTopic(requestHeader.getTopic());
-                                    messageExt.setStoreSize(0);
-
-                                    byte[] encode = MessageDecoder.encode(messageExt, false);
-                                    ByteBuffer buffer = ByteBuffer.wrap(encode);
-                                    SelectMappedBufferResult tmpResult =
-                                        new SelectMappedBufferResult(mapedBuffer.getStartOffset(), buffer, encode.length, null);
-                                    getMessageResult.addMessage(tmpResult);
-                                } catch (Exception e) {
-                                    POP_LOGGER.error("Exception in recode retry message buffer, topic={}", topic, e);
-                                }
+                atomicRestNum.set(result.getMaxOffset() - result.getNextBeginOffset() + atomicRestNum.get());
+                String brokerName = brokerController.getBrokerConfig().getBrokerName();
+                for (SelectMappedBufferResult mapedBuffer : result.getMessageMapedList()) {
+                    // We should not recode buffer for normal topic message
+                    if (!isRetry) {
+                        getMessageResult.addMessage(mapedBuffer);
+                    } else {
+                        List<MessageExt> messageExtList = MessageDecoder.decodesBatch(mapedBuffer.getByteBuffer(),
+                            true, false, true);
+                        mapedBuffer.release();
+                        for (MessageExt messageExt : messageExtList) {
+                            try {
+                                String ckInfo = ExtraInfoUtil.buildExtraInfo(finalOffset, popTime, requestHeader.getInvisibleTime(),
+                                    reviveQid, messageExt.getTopic(), brokerName, messageExt.getQueueId(), messageExt.getQueueOffset());
+                                messageExt.getProperties().putIfAbsent(MessageConst.PROPERTY_POP_CK, ckInfo);
+
+                                // Set retry message topic to origin topic and clear message store size to recode
+                                messageExt.setTopic(requestHeader.getTopic());
+                                messageExt.setStoreSize(0);
+
+                                byte[] encode = MessageDecoder.encode(messageExt, false);
+                                ByteBuffer buffer = ByteBuffer.wrap(encode);
+                                SelectMappedBufferResult tmpResult =
+                                    new SelectMappedBufferResult(mapedBuffer.getStartOffset(), buffer, encode.length, null);
+                                getMessageResult.addMessage(tmpResult);
+                            } catch (Exception e) {
+                                POP_LOGGER.error("Exception in recode retry message buffer, topic={}", topic, e);
                             }
                         }
                     }
-                    this.brokerController.getPopInflightMessageCounter().incrementInFlightMessageNum(
-                        topic,
-                        requestHeader.getConsumerGroup(),
-                        queueId,
-                        result.getMessageCount()
-                    );
                 }
+                this.brokerController.getPopInflightMessageCounter().incrementInFlightMessageNum(
+                    topic,
+                    requestHeader.getConsumerGroup(),
+                    queueId,
+                    result.getMessageCount()
+                );
                 return atomicRestNum.get();
             }).whenComplete((result, throwable) -> {
                 if (throwable != null) {