You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by do...@apache.org on 2021/11/10 06:57:29 UTC
[rocketmq] branch 5.0.0-alpha-static-topic updated: Polish the
rewrite logic for SendProcessor
This is an automated email from the ASF dual-hosted git repository.
dongeforever pushed a commit to branch 5.0.0-alpha-static-topic
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/5.0.0-alpha-static-topic by this push:
new 36a82fa Polish the rewrite logic for SendProcessor
36a82fa is described below
commit 36a82fa67785aa4e98a2b6a6ceeda576e9cf66b7
Author: dongeforever <do...@apache.org>
AuthorDate: Wed Nov 10 14:57:19 2021 +0800
Polish the rewrite logic for SendProcessor
---
.../broker/processor/SendMessageProcessor.java | 29 ++++++++++++++++------
.../rocketmq/common/TopicQueueMappingDetail.java | 23 +++++++++++++++++
2 files changed, 45 insertions(+), 7 deletions(-)
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java
index 9daebf4..7b88c75 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java
@@ -17,6 +17,7 @@
package org.apache.rocketmq.broker.processor;
import java.net.SocketAddress;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
@@ -126,16 +127,28 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement
try {
TopicQueueMappingDetail mappingDetail = this.brokerController.getTopicQueueMappingManager().getTopicQueueMapping(requestHeader.getTopic());
if (mappingDetail == null) {
+ //it is not static topic
return null;
}
- if (!mappingDetail.getCurrIdMap().containsKey(requestHeader.getQueueId())) {
- return buildErrorResponse(ResponseCode.NOT_LEADER_FOR_QUEUE, String.format("%s-%d does not exit in current broker %s", requestHeader.getTopic(), requestHeader.getQueueId(), this.brokerController.getBrokerConfig().getBrokerName()));
+ Integer phyQueueId = null;
+ //compatible with the old logic, but in fact, this should not happen
+ if (requestHeader.getQueueId() < 0) {
+ Iterator<Map.Entry<Integer, Integer>> it = mappingDetail.getCurrIdMap().entrySet().iterator();
+ if (it.hasNext()) {
+ phyQueueId = it.next().getValue();
+ }
+ } else {
+ phyQueueId = mappingDetail.getCurrIdMap().get(requestHeader.getQueueId());
+ }
+ if (phyQueueId == null) {
+ return buildErrorResponse(ResponseCode.NOT_LEADER_FOR_QUEUE, String.format("%s-%d does not exit in request process of current broker %s", requestHeader.getTopic(), requestHeader.getQueueId(), this.brokerController.getBrokerConfig().getBrokerName()));
+ } else {
+ requestHeader.setQueueId(phyQueueId);
+ return null;
}
- requestHeader.setQueueId(mappingDetail.getCurrIdMap().get(requestHeader.getQueueId()));
} catch (Throwable t) {
return buildErrorResponse(ResponseCode.SYSTEM_ERROR, t.getMessage());
}
- return null;
}
private RemotingCommand rewriteResponseForStaticTopic(String topic, SendMessageResponseHeader responseHeader) {
@@ -144,14 +157,16 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement
if (mappingDetail == null) {
return null;
}
- if (!mappingDetail.getCurrIdMap().containsKey(responseHeader.getQueueId())) {
- return buildErrorResponse(ResponseCode.NOT_LEADER_FOR_QUEUE, String.format("%s-%d does not exit in current broker %s", topic, responseHeader.getQueueId(), this.brokerController.getBrokerConfig().getBrokerName()));
+ Integer globalId = mappingDetail.getCurrIdMapRevert().get(responseHeader.getQueueId());
+ if (globalId == null) {
+ return buildErrorResponse(ResponseCode.NOT_LEADER_FOR_QUEUE, String.format("%s-%d does not exist in response process of current broker %s", topic, responseHeader.getQueueId(), this.brokerController.getBrokerConfig().getBrokerName()));
}
- long staticLogicOffset = mappingDetail.convertToLogicOffset(responseHeader.getQueueId(), responseHeader.getQueueOffset());
+ long staticLogicOffset = mappingDetail.convertToLogicOffset(globalId, responseHeader.getQueueOffset());
if (staticLogicOffset < 0) {
return buildErrorResponse(ResponseCode.NOT_LEADER_FOR_QUEUE, String.format("%s-%d convert offset error in current broker %s", topic, responseHeader.getQueueId(), this.brokerController.getBrokerConfig().getBrokerName()));
}
+ responseHeader.setQueueId(globalId);
responseHeader.setQueueOffset(staticLogicOffset);
} catch (Throwable t) {
return buildErrorResponse(ResponseCode.SYSTEM_ERROR, t.getMessage());
diff --git a/common/src/main/java/org/apache/rocketmq/common/TopicQueueMappingDetail.java b/common/src/main/java/org/apache/rocketmq/common/TopicQueueMappingDetail.java
index a181130..0021310 100644
--- a/common/src/main/java/org/apache/rocketmq/common/TopicQueueMappingDetail.java
+++ b/common/src/main/java/org/apache/rocketmq/common/TopicQueueMappingDetail.java
@@ -27,6 +27,9 @@ public class TopicQueueMappingDetail extends TopicQueueMappingInfo {
// the mapping info in current broker, do not register to nameserver
ConcurrentMap<Integer/*global id*/, ImmutableList<LogicQueueMappingItem>> hostedQueues = new ConcurrentHashMap<Integer, ImmutableList<LogicQueueMappingItem>>();
+ transient ConcurrentMap<Integer/*physicalId*/, Integer/*logicId*/> currIdMapRevert = new ConcurrentHashMap<Integer, Integer>();
+
+
public TopicQueueMappingDetail(String topic, int totalQueues, String bname) {
super(topic, totalQueues, bname);
@@ -45,6 +48,18 @@ public class TopicQueueMappingDetail extends TopicQueueMappingInfo {
public void buildIdMap() {
this.currIdMap = buildIdMap(LEVEL_0);
this.prevIdMap = buildIdMap(LEVEL_1);
+ // Recompute the reverse lookup from the currIdMap that was just built above.
+ this.currIdMapRevert = revert(this.currIdMap);
+ }
+
+ /**
+ * Builds the inverse of an id map: every (key, value) entry of {@code original}
+ * becomes a (value, key) entry of the result. If several keys share a value,
+ * the entry visited last wins.
+ *
+ * @param original the map to invert; may be null or empty
+ * @return a new map holding the inverted entries, empty when {@code original} is null or empty
+ */
+ public ConcurrentMap<Integer, Integer> revert(ConcurrentMap<Integer, Integer> original) {
+ ConcurrentMap<Integer, Integer> tmpIdMap = new ConcurrentHashMap<Integer, Integer>();
+ if (original == null || original.isEmpty()) {
+ return tmpIdMap;
+ }
+ // Walk the source map, not tmpIdMap: tmpIdMap is still empty here, so looping over it copies nothing.
+ for (Map.Entry<Integer, Integer> entry : original.entrySet()) {
+ tmpIdMap.put(entry.getValue(), entry.getKey());
+ }
+ return tmpIdMap;
}
public ConcurrentMap<Integer, Integer> buildIdMap(int level) {
@@ -107,6 +122,14 @@ public class TopicQueueMappingDetail extends TopicQueueMappingInfo {
return topicQueueMappingInfo;
}
+ /**
+ * Returns the currIdMapRevert field itself (no copy is made).
+ */
+ public ConcurrentMap<Integer, Integer> getCurrIdMapRevert() {
+ return currIdMapRevert;
+ }
+
+ /**
+ * Replaces the currIdMapRevert field with the given map; the reference is stored as-is.
+ */
+ public void setCurrIdMapRevert(ConcurrentMap<Integer, Integer> currIdMapRevert) {
+ this.currIdMapRevert = currIdMapRevert;
+ }
+
public int getTotalQueues() {
return totalQueues;
}