You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by do...@apache.org on 2021/12/02 08:34:52 UTC
[rocketmq] 01/02: Finish the logic for RETRY message of logic queue
This is an automated email from the ASF dual-hosted git repository.
dongeforever pushed a commit to branch 5.0.0-alpha-static-topic
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
commit ae7b6751c78acb6c6f32904dc429c4470c2e90b0
Author: dongeforever <do...@apache.org>
AuthorDate: Thu Dec 2 11:00:02 2021 +0800
Finish the logic for RETRY message of logic queue
---
.../broker/processor/SendMessageProcessor.java | 8 ++++
.../impl/consumer/DefaultMQPushConsumerImpl.java | 56 ++++++++++------------
..._Topic_Logic_Queue_\350\256\276\350\256\241.md" | 8 ++--
3 files changed, 36 insertions(+), 36 deletions(-)
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java
index 03a5dba..c229ce3 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java
@@ -212,6 +212,14 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement
return CompletableFuture.completedFuture(response);
}
+ if (requestHeader.getOriginTopic() != null
+ && !msgExt.getTopic().equals(requestHeader.getOriginTopic())) {
+ //here just do some fence in case of some unexpected offset is income
+ response.setCode(ResponseCode.SYSTEM_ERROR);
+ response.setRemark("look message by offset failed to check the topic name" + requestHeader.getOffset());
+ return CompletableFuture.completedFuture(response);
+ }
+
final String retryTopic = msgExt.getProperty(MessageConst.PROPERTY_RETRY_TOPIC);
if (null == retryTopic) {
MessageAccessor.putProperty(msgExt, MessageConst.PROPERTY_RETRY_TOPIC, msgExt.getTopic());
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java
index d00c4e8..9d158c6 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java
@@ -715,48 +715,42 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
sendMessageBack(msg, delayLevel, null, mq);
}
+
private void sendMessageBack(MessageExt msg, int delayLevel, final String brokerName, final MessageQueue mq)
throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
try {
- String desBrokerName = brokerName;
- if (mq != null) {
- String tmpBrokerName = this.mQClientFactory.getBrokerNameFromMessageQueue(mq);
- if (tmpBrokerName != null) {
- desBrokerName = tmpBrokerName;
- }
- }
- if (MixAll.LOGICAL_QUEUE_MOCK_BROKER_NAME.equals(desBrokerName)) {
- desBrokerName = this.mQClientFactory.getBrokerNameFromMessageQueue(this.defaultMQPushConsumer.queueWithNamespace(new MessageQueue(msg.getTopic(), msg.getBrokerName(), msg.getQueueId())));
- }
-
- String brokerAddr = null;
- if (null != desBrokerName) {
- brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(desBrokerName);
+ if (MixAll.LOGICAL_QUEUE_MOCK_BROKER_NAME.equals(brokerName)
+ || (mq != null && MixAll.LOGICAL_QUEUE_MOCK_BROKER_NAME.equals(mq.getBrokerName()))) {
+ sendMessageBackAsNormalMessage(msg);
} else {
- RemotingHelper.parseSocketAddressAddr(msg.getStoreHost());
+ String brokerAddr = (null != brokerName) ? this.mQClientFactory.findBrokerAddressInPublish(brokerName)
+ : RemotingHelper.parseSocketAddressAddr(msg.getStoreHost());
+ this.mQClientFactory.getMQClientAPIImpl().consumerSendMessageBack(brokerAddr, msg,
+ this.defaultMQPushConsumer.getConsumerGroup(), delayLevel, 5000, getMaxReconsumeTimes());
}
- this.mQClientFactory.getMQClientAPIImpl().consumerSendMessageBack(brokerAddr, msg,
- this.defaultMQPushConsumer.getConsumerGroup(), delayLevel, 5000, getMaxReconsumeTimes());
} catch (Exception e) {
log.error("sendMessageBack Exception, " + this.defaultMQPushConsumer.getConsumerGroup(), e);
+ sendMessageBackAsNormalMessage(msg);
+ } finally {
+ msg.setTopic(NamespaceUtil.withoutNamespace(msg.getTopic(), this.defaultMQPushConsumer.getNamespace()));
+ }
+ }
- Message newMsg = new Message(MixAll.getRetryTopic(this.defaultMQPushConsumer.getConsumerGroup()), msg.getBody());
+ private void sendMessageBackAsNormalMessage(MessageExt msg) throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
+ Message newMsg = new Message(MixAll.getRetryTopic(this.defaultMQPushConsumer.getConsumerGroup()), msg.getBody());
- String originMsgId = MessageAccessor.getOriginMessageId(msg);
- MessageAccessor.setOriginMessageId(newMsg, UtilAll.isBlank(originMsgId) ? msg.getMsgId() : originMsgId);
+ String originMsgId = MessageAccessor.getOriginMessageId(msg);
+ MessageAccessor.setOriginMessageId(newMsg, UtilAll.isBlank(originMsgId) ? msg.getMsgId() : originMsgId);
- newMsg.setFlag(msg.getFlag());
- MessageAccessor.setProperties(newMsg, msg.getProperties());
- MessageAccessor.putProperty(newMsg, MessageConst.PROPERTY_RETRY_TOPIC, msg.getTopic());
- MessageAccessor.setReconsumeTime(newMsg, String.valueOf(msg.getReconsumeTimes() + 1));
- MessageAccessor.setMaxReconsumeTimes(newMsg, String.valueOf(getMaxReconsumeTimes()));
- MessageAccessor.clearProperty(newMsg, MessageConst.PROPERTY_TRANSACTION_PREPARED);
- newMsg.setDelayTimeLevel(3 + msg.getReconsumeTimes());
+ newMsg.setFlag(msg.getFlag());
+ MessageAccessor.setProperties(newMsg, msg.getProperties());
+ MessageAccessor.putProperty(newMsg, MessageConst.PROPERTY_RETRY_TOPIC, msg.getTopic());
+ MessageAccessor.setReconsumeTime(newMsg, String.valueOf(msg.getReconsumeTimes() + 1));
+ MessageAccessor.setMaxReconsumeTimes(newMsg, String.valueOf(getMaxReconsumeTimes()));
+ MessageAccessor.clearProperty(newMsg, MessageConst.PROPERTY_TRANSACTION_PREPARED);
+ newMsg.setDelayTimeLevel(3 + msg.getReconsumeTimes());
- this.mQClientFactory.getDefaultMQProducer().send(newMsg);
- } finally {
- msg.setTopic(NamespaceUtil.withoutNamespace(msg.getTopic(), this.defaultMQPushConsumer.getNamespace()));
- }
+ this.mQClientFactory.getDefaultMQProducer().send(newMsg);
}
void ackAsync(MessageExt message, String consumerGroup) {
diff --git "a/docs/cn/statictopic/RocketMQ_Static_Topic_Logic_Queue_\350\256\276\350\256\241.md" "b/docs/cn/statictopic/RocketMQ_Static_Topic_Logic_Queue_\350\256\276\350\256\241.md"
index 9d41f00..d2e8e10 100644
--- "a/docs/cn/statictopic/RocketMQ_Static_Topic_Logic_Queue_\350\256\276\350\256\241.md"
+++ "b/docs/cn/statictopic/RocketMQ_Static_Topic_Logic_Queue_\350\256\276\350\256\241.md"
@@ -385,10 +385,9 @@ logicOffset的决策,依赖于上一个 PhysicalQueue 的最大位点。
* 所有位点相关的API,需要考虑 MappingItem endOffset,因为超过了 endOffset 可能已经不属于 当前 LogicQueue 了
* 新建 MappingItem,需要先获取 旧 MappingItem 的 endOffset
-当前实现,为了保证简洁,禁止 PhysicalQueue 被重复利用,每次更新映射都会让物理层面的 writeQueues++ 和 readQueues++
+当前实现,为了保证简洁,禁止 PhysicalQueue 被重复利用,每次更新映射都会让物理层面的 writeQueues++ 和 readQueues++。
后续实现,可以考虑复用已经被清除掉的Physical,也即已经没有数据,位点从0开始。
-
#### 备机更新映射
当前,admin操作都是要求在Master操作的。因此,没有这个问题。
Command操作时,提前预判Master是否存在,如果不存在,则提前报错,减少中间失败率。
@@ -411,10 +410,9 @@ Command操作时,提前预判Master是否存在,如果不存在,则提前
#### 拉取消息时的 中断问题
当1个 PhysicalQueue 被拉取干净时,需要修正 nextBeginOffset 到下一个 PhysicalQueue。
如果没有处理好,则直接会导致拉取中断,无法前进。
-
-
#### pullResult 位点由谁设置的问题
-类似于Batch,由客户端设置,避免服务端解开消息。
+类似于Batch,由客户端设置,避免服务端解开消息:
+在PullResultExt中新增字段 offsetDelta。
#### 远程读的性能问题
从实战经验来看,性能损耗几乎不计。
#### 使用习惯的改变