You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by do...@apache.org on 2021/12/02 08:34:52 UTC

[rocketmq] 01/02: Finish the logic for RETRY message of logic queue

This is an automated email from the ASF dual-hosted git repository.

dongeforever pushed a commit to branch 5.0.0-alpha-static-topic
in repository https://gitbox.apache.org/repos/asf/rocketmq.git

commit ae7b6751c78acb6c6f32904dc429c4470c2e90b0
Author: dongeforever <do...@apache.org>
AuthorDate: Thu Dec 2 11:00:02 2021 +0800

    Finish the logic for RETRY message of logic queue
---
 .../broker/processor/SendMessageProcessor.java     |  8 ++++
 .../impl/consumer/DefaultMQPushConsumerImpl.java   | 56 ++++++++++------------
 ..._Topic_Logic_Queue_\350\256\276\350\256\241.md" |  8 ++--
 3 files changed, 36 insertions(+), 36 deletions(-)

diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java
index 03a5dba..c229ce3 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java
@@ -212,6 +212,14 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement
             return CompletableFuture.completedFuture(response);
         }
 
+        if (requestHeader.getOriginTopic() != null
+                && !msgExt.getTopic().equals(requestHeader.getOriginTopic())) {
+            //here just do some fence in case of some unexpected offset is income
+            response.setCode(ResponseCode.SYSTEM_ERROR);
+            response.setRemark("look message by offset failed to check the topic name" + requestHeader.getOffset());
+            return CompletableFuture.completedFuture(response);
+        }
+
         final String retryTopic = msgExt.getProperty(MessageConst.PROPERTY_RETRY_TOPIC);
         if (null == retryTopic) {
             MessageAccessor.putProperty(msgExt, MessageConst.PROPERTY_RETRY_TOPIC, msgExt.getTopic());
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java
index d00c4e8..9d158c6 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java
@@ -715,48 +715,42 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
         sendMessageBack(msg, delayLevel, null, mq);
     }
 
+
     private void sendMessageBack(MessageExt msg, int delayLevel, final String brokerName, final MessageQueue mq)
         throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
         try {
-            String desBrokerName = brokerName;
-            if (mq != null) {
-                String tmpBrokerName = this.mQClientFactory.getBrokerNameFromMessageQueue(mq);
-                if (tmpBrokerName != null) {
-                    desBrokerName = tmpBrokerName;
-                }
-            }
-            if (MixAll.LOGICAL_QUEUE_MOCK_BROKER_NAME.equals(desBrokerName)) {
-                desBrokerName = this.mQClientFactory.getBrokerNameFromMessageQueue(this.defaultMQPushConsumer.queueWithNamespace(new MessageQueue(msg.getTopic(), msg.getBrokerName(), msg.getQueueId())));
-            }
-
-            String brokerAddr = null;
-            if (null != desBrokerName) {
-                brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(desBrokerName);
+            if (MixAll.LOGICAL_QUEUE_MOCK_BROKER_NAME.equals(brokerName)
+                || (mq != null && MixAll.LOGICAL_QUEUE_MOCK_BROKER_NAME.equals(mq.getBrokerName()))) {
+                sendMessageBackAsNormalMessage(msg);
             } else {
-                RemotingHelper.parseSocketAddressAddr(msg.getStoreHost());
+                String brokerAddr = (null != brokerName) ? this.mQClientFactory.findBrokerAddressInPublish(brokerName)
+                        : RemotingHelper.parseSocketAddressAddr(msg.getStoreHost());
+                this.mQClientFactory.getMQClientAPIImpl().consumerSendMessageBack(brokerAddr, msg,
+                        this.defaultMQPushConsumer.getConsumerGroup(), delayLevel, 5000, getMaxReconsumeTimes());
             }
-            this.mQClientFactory.getMQClientAPIImpl().consumerSendMessageBack(brokerAddr, msg,
-                this.defaultMQPushConsumer.getConsumerGroup(), delayLevel, 5000, getMaxReconsumeTimes());
         } catch (Exception e) {
             log.error("sendMessageBack Exception, " + this.defaultMQPushConsumer.getConsumerGroup(), e);
+            sendMessageBackAsNormalMessage(msg);
+        } finally {
+            msg.setTopic(NamespaceUtil.withoutNamespace(msg.getTopic(), this.defaultMQPushConsumer.getNamespace()));
+        }
+    }
 
-            Message newMsg = new Message(MixAll.getRetryTopic(this.defaultMQPushConsumer.getConsumerGroup()), msg.getBody());
+    private void sendMessageBackAsNormalMessage(MessageExt msg) throws  RemotingException, MQBrokerException, InterruptedException, MQClientException {
+        Message newMsg = new Message(MixAll.getRetryTopic(this.defaultMQPushConsumer.getConsumerGroup()), msg.getBody());
 
-            String originMsgId = MessageAccessor.getOriginMessageId(msg);
-            MessageAccessor.setOriginMessageId(newMsg, UtilAll.isBlank(originMsgId) ? msg.getMsgId() : originMsgId);
+        String originMsgId = MessageAccessor.getOriginMessageId(msg);
+        MessageAccessor.setOriginMessageId(newMsg, UtilAll.isBlank(originMsgId) ? msg.getMsgId() : originMsgId);
 
-            newMsg.setFlag(msg.getFlag());
-            MessageAccessor.setProperties(newMsg, msg.getProperties());
-            MessageAccessor.putProperty(newMsg, MessageConst.PROPERTY_RETRY_TOPIC, msg.getTopic());
-            MessageAccessor.setReconsumeTime(newMsg, String.valueOf(msg.getReconsumeTimes() + 1));
-            MessageAccessor.setMaxReconsumeTimes(newMsg, String.valueOf(getMaxReconsumeTimes()));
-            MessageAccessor.clearProperty(newMsg, MessageConst.PROPERTY_TRANSACTION_PREPARED);
-            newMsg.setDelayTimeLevel(3 + msg.getReconsumeTimes());
+        newMsg.setFlag(msg.getFlag());
+        MessageAccessor.setProperties(newMsg, msg.getProperties());
+        MessageAccessor.putProperty(newMsg, MessageConst.PROPERTY_RETRY_TOPIC, msg.getTopic());
+        MessageAccessor.setReconsumeTime(newMsg, String.valueOf(msg.getReconsumeTimes() + 1));
+        MessageAccessor.setMaxReconsumeTimes(newMsg, String.valueOf(getMaxReconsumeTimes()));
+        MessageAccessor.clearProperty(newMsg, MessageConst.PROPERTY_TRANSACTION_PREPARED);
+        newMsg.setDelayTimeLevel(3 + msg.getReconsumeTimes());
 
-            this.mQClientFactory.getDefaultMQProducer().send(newMsg);
-        } finally {
-            msg.setTopic(NamespaceUtil.withoutNamespace(msg.getTopic(), this.defaultMQPushConsumer.getNamespace()));
-        }
+        this.mQClientFactory.getDefaultMQProducer().send(newMsg);
     }
 
     void ackAsync(MessageExt message, String consumerGroup) {
diff --git "a/docs/cn/statictopic/RocketMQ_Static_Topic_Logic_Queue_\350\256\276\350\256\241.md" "b/docs/cn/statictopic/RocketMQ_Static_Topic_Logic_Queue_\350\256\276\350\256\241.md"
index 9d41f00..d2e8e10 100644
--- "a/docs/cn/statictopic/RocketMQ_Static_Topic_Logic_Queue_\350\256\276\350\256\241.md"
+++ "b/docs/cn/statictopic/RocketMQ_Static_Topic_Logic_Queue_\350\256\276\350\256\241.md"
@@ -385,10 +385,9 @@ logicOffset的决策,依赖于上一个 PhysicalQueue 的最大位点。
 * 所有位点相关的API,需要考虑 MappingItem endOffset,因为超过了 endOffset 可能已经不属于 当前 LogicQueue 了
 * 新建 MappingItem,需要先获取 旧 MappingItem 的 endOffset  
 
-当前实现,为了保证简洁,禁止 PhysicalQueue 被重复利用,每次更新映射都会让物理层面的 writeQueues++ 和 readQueues++
+当前实现,为了保证简洁,禁止 PhysicalQueue 被重复利用,每次更新映射都会让物理层面的 writeQueues++ 和 readQueues++。  
 后续实现,可以考虑复用已经被清除掉的Physical,也即已经没有数据,位点从0开始。
 
-
 #### 备机更新映射
 当前,admin操作都是要求在Master操作的。因此,没有这个问题。  
 Command操作时,提前预判Master是否存在,如果不存在,则提前报错,减少中间失败率。  
@@ -411,10 +410,9 @@ Command操作时,提前预判Master是否存在,如果不存在,则提前
 #### 拉取消息时的 中断问题
 当1个 PhysicalQueue 被拉取干净时,需要修正 nextBeginOffset 到下一个 PhysicalQueue。
 如果没有处理好,则直接会导致拉取中断,无法前进。
-
-
 #### pullResult 位点由谁设置的问题
-类似于Batch,由客户端设置,避免服务端解开消息。
+类似于Batch,由客户端设置,避免服务端解开消息:  
+在PullResultExt中新增字段 offsetDelta。
 #### 远程读的性能问题
 从实战经验来看,性能损耗几乎不计。
 #### 使用习惯的改变