You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by do...@apache.org on 2021/11/11 06:38:20 UTC
[rocketmq] branch 5.0.0-alpha-static-topic updated: Polish the
topic queue mapping context, and process the context for ConsumerManager
This is an automated email from the ASF dual-hosted git repository.
dongeforever pushed a commit to branch 5.0.0-alpha-static-topic
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/5.0.0-alpha-static-topic by this push:
new 32f58c7 Polish the topic queue mapping context, and process the context for ConsumerManager
32f58c7 is described below
commit 32f58c79827f92d5f66f7e44bb38250ddb1a0585
Author: dongeforever <do...@apache.org>
AuthorDate: Thu Nov 11 14:38:04 2021 +0800
Polish the topic queue mapping context, and process the context for ConsumerManager
---
.../broker/processor/ConsumerManageProcessor.java | 15 +++-
.../broker/processor/PullMessageProcessor.java | 38 +++-------
.../broker/processor/SendMessageProcessor.java | 71 +++++--------------
.../broker/topic/TopicQueueMappingManager.java | 81 ++++++++++++++++++++++
.../rocketmq/common/TopicQueueMappingContext.java | 20 +++++-
.../rocketmq/common/TopicQueueMappingDetail.java | 30 +++-----
.../protocol/header/PullMessageRequestHeader.java | 8 ++-
.../header/QueryConsumerOffsetRequestHeader.java | 8 ++-
.../protocol/header/SendMessageRequestHeader.java | 8 ++-
.../header/UpdateConsumerOffsetRequestHeader.java | 8 ++-
.../rocketmq/remoting/TopicQueueRequestHeader.java | 25 +++++++
.../remoting/protocol/RemotingCommand.java | 7 ++
12 files changed, 206 insertions(+), 113 deletions(-)
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/ConsumerManageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/ConsumerManageProcessor.java
index 77317a6..59ea6c6 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/ConsumerManageProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/ConsumerManageProcessor.java
@@ -20,6 +20,7 @@ import io.netty.channel.ChannelHandlerContext;
import java.util.List;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.broker.client.ConsumerGroupInfo;
+import org.apache.rocketmq.common.TopicQueueMappingContext;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.common.protocol.RequestCode;
@@ -109,6 +110,11 @@ public class ConsumerManageProcessor extends AsyncNettyRequestProcessor implemen
final UpdateConsumerOffsetRequestHeader requestHeader =
(UpdateConsumerOffsetRequestHeader) request
.decodeCommandCustomHeader(UpdateConsumerOffsetRequestHeader.class);
+ TopicQueueMappingContext mappingContext = this.brokerController.getTopicQueueMappingManager().buildTopicQueueMappingContext(requestHeader);
+ RemotingCommand rewriteResult = this.brokerController.getTopicQueueMappingManager().rewriteRequestForStaticTopic(requestHeader, mappingContext);
+ if (rewriteResult != null) {
+ return rewriteResult;
+ }
this.brokerController.getConsumerOffsetManager().commitOffset(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), requestHeader.getConsumerGroup(),
requestHeader.getTopic(), requestHeader.getQueueId(), requestHeader.getCommitOffset());
response.setCode(ResponseCode.SUCCESS);
@@ -126,6 +132,12 @@ public class ConsumerManageProcessor extends AsyncNettyRequestProcessor implemen
(QueryConsumerOffsetRequestHeader) request
.decodeCommandCustomHeader(QueryConsumerOffsetRequestHeader.class);
+ TopicQueueMappingContext mappingContext = this.brokerController.getTopicQueueMappingManager().buildTopicQueueMappingContext(requestHeader);
+ RemotingCommand rewriteResult = this.brokerController.getTopicQueueMappingManager().rewriteRequestForStaticTopic(requestHeader, mappingContext);
+ if (rewriteResult != null) {
+ return rewriteResult;
+ }
+
long offset =
this.brokerController.getConsumerOffsetManager().queryOffset(
requestHeader.getConsumerGroup(), requestHeader.getTopic(), requestHeader.getQueueId());
@@ -140,7 +152,8 @@ public class ConsumerManageProcessor extends AsyncNettyRequestProcessor implemen
requestHeader.getQueueId());
if (minOffset <= 0
&& !this.brokerController.getMessageStore().checkInDiskByConsumeOffset(
- requestHeader.getTopic(), requestHeader.getQueueId(), 0)) {
+ requestHeader.getTopic(), requestHeader.getQueueId(), 0)
+ && mappingContext.checkIfAsPhysical()) {
responseHeader.setOffset(0L);
response.setCode(ResponseCode.SUCCESS);
response.setRemark(null);
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java
index 8d7758a..79869b9 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java
@@ -80,6 +80,8 @@ import org.apache.rocketmq.store.PutMessageResult;
import org.apache.rocketmq.store.config.BrokerRole;
import org.apache.rocketmq.store.stats.BrokerStatsManager;
+import static org.apache.rocketmq.remoting.protocol.RemotingCommand.buildErrorResponse;
+
public class PullMessageProcessor extends AsyncNettyRequestProcessor implements NettyRequestProcessor {
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
private final BrokerController brokerController;
@@ -101,50 +103,26 @@ public class PullMessageProcessor extends AsyncNettyRequestProcessor implements
}
- private RemotingCommand buildErrorResponse(int code, String remark) {
- final RemotingCommand response = RemotingCommand.createResponseCommand(PullMessageResponseHeader.class);
- response.setCode(code);
- response.setRemark(remark);
- return response;
- }
-
- private TopicQueueMappingContext buildTopicQueueMappingContext(PullMessageRequestHeader requestHeader) {
- if (requestHeader.getPhysical() != null
- && Boolean.TRUE.equals(requestHeader.getPhysical())) {
- return null;
- }
- TopicQueueMappingDetail mappingDetail = this.brokerController.getTopicQueueMappingManager().getTopicQueueMapping(requestHeader.getTopic());
- if (mappingDetail == null) {
- //it is not static topic
- return null;
- }
- String topic = requestHeader.getTopic();
- Integer globalId = requestHeader.getQueueId();
- Long globalOffset = requestHeader.getQueueOffset();
-
- LogicQueueMappingItem mappingItem = mappingDetail.findLogicQueueMappingItem(globalId, globalOffset);
- return new TopicQueueMappingContext(topic, globalId, globalOffset, mappingDetail, mappingItem);
- }
private RemotingCommand rewriteRequestForStaticTopic(PullMessageRequestHeader requestHeader, TopicQueueMappingContext mappingContext) {
try {
- if (mappingContext == null) {
+ if (mappingContext.getMappingDetail() == null) {
return null;
}
+ TopicQueueMappingDetail mappingDetail = mappingContext.getMappingDetail();
String topic = mappingContext.getTopic();
Integer globalId = mappingContext.getGlobalId();
Long globalOffset = mappingContext.getGlobalOffset();
LogicQueueMappingItem mappingItem = mappingContext.getMappingItem();
if (mappingItem == null) {
- return buildErrorResponse(ResponseCode.NOT_LEADER_FOR_QUEUE, String.format("%s-%d cannot find mapping item in request process of current broker %s",
- topic, globalId, this.brokerController.getBrokerConfig().getBrokerName()));
+ return buildErrorResponse(ResponseCode.NOT_LEADER_FOR_QUEUE, String.format("%s-%d cannot find mapping item in request process of current broker %s", topic, globalId, mappingDetail.getBname()));
}
if (globalOffset < mappingItem.getStartOffset()) {
log.warn("{}-{} fetch offset {} smaller than the min mapping offset {}", topic, globalId, globalOffset, mappingItem.getStartOffset());
return buildErrorResponse(ResponseCode.PULL_OFFSET_MOVED, String.format("%s-%d fetch offset {} smaller than the min mapping offset {} in broker %s",
- topic, globalId, globalOffset, mappingItem.getStartOffset(), this.brokerController.getBrokerConfig().getBrokerName()));
+ topic, globalId, globalOffset, mappingItem.getStartOffset(), mappingDetail.getBname()));
}
//below are physical info
String bname = mappingItem.getBname();
@@ -157,7 +135,7 @@ public class PullMessageProcessor extends AsyncNettyRequestProcessor implements
requestHeader.setMaxMsgNums((int) Math.min(mappingItem.getEndOffset() - mappingItem.getStartOffset(), requestHeader.getMaxMsgNums()));
}
- if (this.brokerController.getBrokerConfig().getBrokerName().equals(bname)) {
+ if (mappingDetail.getBname().equals(bname)) {
//just let it go
return null;
}
@@ -278,7 +256,7 @@ public class PullMessageProcessor extends AsyncNettyRequestProcessor implements
return response;
}
- TopicQueueMappingContext mappingContext = buildTopicQueueMappingContext(requestHeader);
+ TopicQueueMappingContext mappingContext = this.brokerController.getTopicQueueMappingManager().buildTopicQueueMappingContext(requestHeader, false, requestHeader.getQueueOffset());
{
RemotingCommand rewriteResult = rewriteRequestForStaticTopic(requestHeader, mappingContext);
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java
index 52508a4..baa1024 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java
@@ -29,6 +29,7 @@ import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.broker.mqtrace.ConsumeMessageContext;
import org.apache.rocketmq.broker.mqtrace.ConsumeMessageHook;
import org.apache.rocketmq.broker.mqtrace.SendMessageContext;
+import org.apache.rocketmq.common.LogicQueueMappingItem;
import org.apache.rocketmq.common.MQVersion;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.TopicConfig;
@@ -53,6 +54,7 @@ import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig;
import org.apache.rocketmq.common.sysflag.MessageSysFlag;
import org.apache.rocketmq.common.sysflag.TopicSysFlag;
import org.apache.rocketmq.common.topic.TopicValidator;
+import org.apache.rocketmq.remoting.TopicQueueRequestHeader;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
import org.apache.rocketmq.remoting.netty.NettyRequestProcessor;
import org.apache.rocketmq.remoting.netty.RemotingResponseCallback;
@@ -63,6 +65,8 @@ import org.apache.rocketmq.store.config.MessageStoreConfig;
import org.apache.rocketmq.store.config.StorePathConfigHelper;
import org.apache.rocketmq.store.stats.BrokerStatsManager;
+import static org.apache.rocketmq.remoting.protocol.RemotingCommand.buildErrorResponse;
+
public class SendMessageProcessor extends AbstractSendMessageProcessor implements NettyRequestProcessor {
private List<ConsumeMessageHook> consumeMessageHookList;
@@ -99,8 +103,8 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement
if (requestHeader == null) {
return CompletableFuture.completedFuture(null);
}
- TopicQueueMappingContext mappingContext = buildTopicQueueMappingContext(requestHeader);
- RemotingCommand rewriteResult = rewriteRequestForStaticTopic(requestHeader, mappingContext);
+ TopicQueueMappingContext mappingContext = this.brokerController.getTopicQueueMappingManager().buildTopicQueueMappingContext(requestHeader, true, Long.MAX_VALUE);
+ RemotingCommand rewriteResult = this.brokerController.getTopicQueueMappingManager().rewriteRequestForStaticTopic(requestHeader, mappingContext);
if (rewriteResult != null) {
return CompletableFuture.completedFuture(rewriteResult);
}
@@ -115,68 +119,27 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement
}
-
- private RemotingCommand buildErrorResponse(int code, String remark) {
- final RemotingCommand response = RemotingCommand.createResponseCommand(null);
- response.setCode(code);
- response.setRemark(remark);
- return response;
- }
-
- private TopicQueueMappingContext buildTopicQueueMappingContext(SendMessageRequestHeader requestHeader) {
- if (requestHeader.getPhysical() != null
- && Boolean.TRUE.equals(requestHeader.getPhysical())) {
- return null;
- }
- TopicQueueMappingDetail mappingDetail = this.brokerController.getTopicQueueMappingManager().getTopicQueueMapping(requestHeader.getTopic());
- if (mappingDetail == null) {
- //it is not static topic
- return null;
- }
- return new TopicQueueMappingContext(requestHeader.getTopic(), requestHeader.getQueueId(), null, mappingDetail, null);
- }
/**
* If the response is not null, it meets some errors
- * @param requestHeader
* @return
*/
- private RemotingCommand rewriteRequestForStaticTopic(SendMessageRequestHeader requestHeader, TopicQueueMappingContext mappingContext) {
- try {
- if (mappingContext == null) {
- return null;
- }
- TopicQueueMappingDetail mappingDetail = mappingContext.getMappingDetail();
- Integer phyQueueId = null;
- //compatible with the old logic, but it fact, this should not happen
- if (requestHeader.getQueueId() < 0) {
- Iterator<Map.Entry<Integer, Integer>> it = mappingDetail.getCurrIdMap().entrySet().iterator();
- if (it.hasNext()) {
- phyQueueId = it.next().getValue();
- }
- } else {
- phyQueueId = mappingDetail.getCurrIdMap().get(requestHeader.getQueueId());
- }
- if (phyQueueId == null) {
- return buildErrorResponse(ResponseCode.NOT_LEADER_FOR_QUEUE, String.format("%s-%d does not exit in request process of current broker %s", requestHeader.getTopic(), requestHeader.getQueueId(), this.brokerController.getBrokerConfig().getBrokerName()));
- } else {
- requestHeader.setQueueId(phyQueueId);
- return null;
- }
- } catch (Throwable t) {
- return buildErrorResponse(ResponseCode.SYSTEM_ERROR, t.getMessage());
- }
- }
- private RemotingCommand rewriteResponseForStaticTopic(SendMessageRequestHeader requestHeader, SendMessageResponseHeader responseHeader, TopicQueueMappingContext mappingContext) {
+
+ private RemotingCommand rewriteResponseForStaticTopic(SendMessageResponseHeader responseHeader, TopicQueueMappingContext mappingContext) {
try {
- if (mappingContext == null) {
+ if (mappingContext.getMappingDetail() == null) {
return null;
}
TopicQueueMappingDetail mappingDetail = mappingContext.getMappingDetail();
- long staticLogicOffset = mappingDetail.computeStaticQueueOffset(mappingContext.getGlobalId(), responseHeader.getQueueOffset());
+ LogicQueueMappingItem mappingItem = mappingContext.getMappingItem();
+ if (mappingItem == null) {
+ return buildErrorResponse(ResponseCode.NOT_LEADER_FOR_QUEUE, String.format("%s-%d does not exit in request process of current broker %s", mappingContext.getTopic(), mappingContext.getGlobalId(), mappingDetail.getBname()));
+ }
+ //no need to care the broker name
+ long staticLogicOffset = mappingItem.computeStaticQueueOffset(responseHeader.getQueueOffset());
if (staticLogicOffset < 0) {
- return buildErrorResponse(ResponseCode.NOT_LEADER_FOR_QUEUE, String.format("%s-%d convert offset error in current broker %s", mappingContext.getTopic(), responseHeader.getQueueId(), this.brokerController.getBrokerConfig().getBrokerName()));
+ return buildErrorResponse(ResponseCode.NOT_LEADER_FOR_QUEUE, String.format("%s-%d convert offset error in current broker %s", mappingContext.getTopic(), mappingContext.getGlobalId(), mappingDetail.getBname()));
}
responseHeader.setQueueId(mappingContext.getGlobalId());
responseHeader.setQueueOffset(staticLogicOffset);
@@ -626,7 +589,7 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement
responseHeader.setQueueId(queueIdInt);
responseHeader.setQueueOffset(putMessageResult.getAppendMessageResult().getLogicsOffset());
- RemotingCommand rewriteResult = rewriteResponseForStaticTopic(requestHeader, responseHeader, mappingContext);
+ RemotingCommand rewriteResult = rewriteResponseForStaticTopic(responseHeader, mappingContext);
if (rewriteResult != null) {
return rewriteResult;
}
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicQueueMappingManager.java b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicQueueMappingManager.java
index 2f5d558..ae2e75d 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicQueueMappingManager.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicQueueMappingManager.java
@@ -17,21 +17,32 @@
package org.apache.rocketmq.broker.topic;
import com.alibaba.fastjson.JSON;
+import com.google.common.collect.ImmutableList;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.broker.BrokerPathConfigHelper;
import org.apache.rocketmq.common.ConfigManager;
import org.apache.rocketmq.common.DataVersion;
+import org.apache.rocketmq.common.LogicQueueMappingItem;
+import org.apache.rocketmq.common.TopicQueueMappingContext;
import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.common.protocol.ResponseCode;
import org.apache.rocketmq.common.protocol.body.TopicQueueMappingSerializeWrapper;
import org.apache.rocketmq.common.TopicQueueMappingDetail;
+import org.apache.rocketmq.common.protocol.header.UpdateConsumerOffsetRequestHeader;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
+import org.apache.rocketmq.remoting.TopicQueueRequestHeader;
+import org.apache.rocketmq.remoting.protocol.RemotingCommand;
+import java.util.List;
+import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
+import static org.apache.rocketmq.remoting.protocol.RemotingCommand.buildErrorResponse;
+
public class TopicQueueMappingManager extends ConfigManager {
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
private static final long LOCK_TIMEOUT_MILLIS = 3000;
@@ -94,5 +105,75 @@ public class TopicQueueMappingManager extends ConfigManager {
return dataVersion;
}
+ public TopicQueueMappingContext buildTopicQueueMappingContext(TopicQueueRequestHeader requestHeader) { // convenience overload of the three-argument variant
+ return buildTopicQueueMappingContext(requestHeader, false, Long.MAX_VALUE); // selectOneWhenMiss=false; globalOffset=Long.MAX_VALUE, i.e. no concrete offset is supplied
+ }
+
+ //Never returns a null context: when no mapping applies, the returned context simply carries a null mappingDetail and/or a null mappingItem
+ public TopicQueueMappingContext buildTopicQueueMappingContext(TopicQueueRequestHeader requestHeader, boolean selectOneWhenMiss, Long globalOffset) { // selectOneWhenMiss: pick a hosted queue when the request's queue id is negative
+ if (requestHeader.getPhysical() != null
+ && Boolean.TRUE.equals(requestHeader.getPhysical())) { // header is flagged physical: build a context without any mapping info
+ return new TopicQueueMappingContext(requestHeader.getTopic(), requestHeader.getQueueId(), null, null, null, null);
+ }
+ TopicQueueMappingDetail mappingDetail = getTopicQueueMapping(requestHeader.getTopic());
+ if (mappingDetail == null) {
+ //it is not a static topic
+ return new TopicQueueMappingContext(requestHeader.getTopic(), requestHeader.getQueueId(), null, null, null, null);
+ }
+ //If no mappingItem is found, the request encounters some errors
+ Integer globalId = requestHeader.getQueueId();
+ if (globalId < 0 && !selectOneWhenMiss) { // NOTE(review): globalId is auto-unboxed here; a null queue id would throw a NullPointerException
+ return new TopicQueueMappingContext(requestHeader.getTopic(), globalId, globalOffset, mappingDetail, null, null);
+ }
+
+ if (globalId < 0) { // only reached with selectOneWhenMiss == true
+ try {
+ if (!mappingDetail.getHostedQueues().isEmpty()) {
+ //do not check
+ globalId = mappingDetail.getHostedQueues().keySet().iterator().next(); // first key returned when iterating hostedQueues
+ }
+ } catch (Throwable ignored) { // NOTE(review): any failure is silently swallowed, leaving globalId negative
+ }
+ }
+ if (globalId < 0) { // still no usable queue id: return a context without a mapping item
+ return new TopicQueueMappingContext(requestHeader.getTopic(), globalId, globalOffset, mappingDetail, null, null);
+ }
+
+ ImmutableList<LogicQueueMappingItem> mappingItemList = null;
+ LogicQueueMappingItem mappingItem = null;
+
+ if (globalOffset == null
+ || Long.MAX_VALUE == globalOffset) { // no concrete offset: use the last item of the list, if any
+ mappingItemList = mappingDetail.getMappingInfo(globalId);
+ if (mappingItemList != null
+ && mappingItemList.size() > 0) {
+ mappingItem = mappingItemList.get(mappingItemList.size() - 1);
+ }
+ } else { // concrete offset: delegate the lookup to findLogicQueueMappingItem
+ mappingItemList = mappingDetail.getMappingInfo(globalId);
+ mappingItem = TopicQueueMappingDetail.findLogicQueueMappingItem(mappingItemList, globalOffset);
+ }
+ return new TopicQueueMappingContext(requestHeader.getTopic(), globalId, globalOffset, mappingDetail, mappingItemList, mappingItem);
+ }
+
+
+ public RemotingCommand rewriteRequestForStaticTopic(TopicQueueRequestHeader requestHeader, TopicQueueMappingContext mappingContext) { // returns null when the request may proceed, otherwise an error response
+ try {
+ if (mappingContext.getMappingDetail() == null) { // no mapping detail in the context: nothing to rewrite
+ return null;
+ }
+ TopicQueueMappingDetail mappingDetail = mappingContext.getMappingDetail();
+ LogicQueueMappingItem mappingItem = mappingContext.getMappingItem();
+ if (mappingItem == null
+ || !mappingDetail.getBname().equals(mappingItem.getBname())) { // no mapping item, or its broker name differs from the mapping detail's broker name
+ return buildErrorResponse(ResponseCode.NOT_LEADER_FOR_QUEUE, String.format("%s-%d does not exit in request process of current broker %s", requestHeader.getTopic(), requestHeader.getQueueId(), mappingDetail.getBname()));
+ }
+ requestHeader.setQueueId(mappingItem.getQueueId()); // overwrite the header's queue id with the mapping item's queue id
+ return null;
+ } catch (Throwable t) { // any failure is reported as SYSTEM_ERROR carrying only the throwable's message
+ return buildErrorResponse(ResponseCode.SYSTEM_ERROR, t.getMessage());
+ }
+ }
+
}
diff --git a/common/src/main/java/org/apache/rocketmq/common/TopicQueueMappingContext.java b/common/src/main/java/org/apache/rocketmq/common/TopicQueueMappingContext.java
index 50ac43e..ca759a1 100644
--- a/common/src/main/java/org/apache/rocketmq/common/TopicQueueMappingContext.java
+++ b/common/src/main/java/org/apache/rocketmq/common/TopicQueueMappingContext.java
@@ -16,21 +16,31 @@
*/
package org.apache.rocketmq.common;
+import com.google.common.collect.ImmutableList;
+
public class TopicQueueMappingContext {
private String topic;
private Integer globalId;
private Long globalOffset;
private TopicQueueMappingDetail mappingDetail;
+ private ImmutableList<LogicQueueMappingItem> mappingItemList;
private LogicQueueMappingItem mappingItem;
- public TopicQueueMappingContext(String topic, Integer globalId, Long globalOffset, TopicQueueMappingDetail mappingDetail, LogicQueueMappingItem mappingItem) {
+ public TopicQueueMappingContext(String topic, Integer globalId, Long globalOffset, TopicQueueMappingDetail mappingDetail, ImmutableList<LogicQueueMappingItem> mappingItemList, LogicQueueMappingItem mappingItem) {
this.topic = topic;
this.globalId = globalId;
this.globalOffset = globalOffset;
this.mappingDetail = mappingDetail;
+ this.mappingItemList = mappingItemList;
this.mappingItem = mappingItem;
}
+ public boolean checkIfAsPhysical() { // true when there is no mapping detail, no item list, or exactly one item whose logic offset is 0
+ return mappingDetail == null
+ || mappingItemList == null
+ || (mappingItemList.size() == 1 && mappingItemList.get(0).getLogicOffset() == 0);
+ }
+
public String getTopic() {
return topic;
}
@@ -63,6 +73,14 @@ public class TopicQueueMappingContext {
this.mappingDetail = mappingDetail;
}
+ public ImmutableList<LogicQueueMappingItem> getMappingItemList() { // may be null: the field is stored as passed to the constructor or setter, without a null check
+ return mappingItemList;
+ }
+
+ public void setMappingItemList(ImmutableList<LogicQueueMappingItem> mappingItemList) { // replaces the stored list; null is accepted as-is
+ this.mappingItemList = mappingItemList;
+ }
+
public LogicQueueMappingItem getMappingItem() {
return mappingItem;
}
diff --git a/common/src/main/java/org/apache/rocketmq/common/TopicQueueMappingDetail.java b/common/src/main/java/org/apache/rocketmq/common/TopicQueueMappingDetail.java
index 8d5c8e8..75f2c52 100644
--- a/common/src/main/java/org/apache/rocketmq/common/TopicQueueMappingDetail.java
+++ b/common/src/main/java/org/apache/rocketmq/common/TopicQueueMappingDetail.java
@@ -26,6 +26,7 @@ import java.util.concurrent.ConcurrentMap;
public class TopicQueueMappingDetail extends TopicQueueMappingInfo {
// the mapping info in current broker, do not register to nameserver
+ // make sure this value is not null
private ConcurrentMap<Integer/*global id*/, ImmutableList<LogicQueueMappingItem>> hostedQueues = new ConcurrentHashMap<Integer, ImmutableList<LogicQueueMappingItem>>();
public TopicQueueMappingDetail(String topic, int totalQueues, String bname) {
@@ -77,30 +78,14 @@ public class TopicQueueMappingDetail extends TopicQueueMappingInfo {
return tmpIdMap;
}
- public List<LogicQueueMappingItem> getMappingInfo(Integer globalId) {
+ public ImmutableList<LogicQueueMappingItem> getMappingInfo(Integer globalId) {
return hostedQueues.get(globalId);
}
- public long computeStaticQueueOffset(Integer globalId, long physicalLogicOffset) {
- List<LogicQueueMappingItem> mappingItems = getMappingInfo(globalId);
- if (mappingItems == null
- || mappingItems.isEmpty()) {
- return -1;
- }
- if (bname.equals(mappingItems.get(mappingItems.size() - 1).getBname())) {
- return mappingItems.get(mappingItems.size() - 1).computeStaticQueueOffset(physicalLogicOffset);
- }
- //Consider the "switch" process, reduce the error
- if (mappingItems.size() >= 2
- && bname.equals(mappingItems.get(mappingItems.size() - 2).getBname())) {
- return mappingItems.get(mappingItems.size() - 2).computeStaticQueueOffset(physicalLogicOffset);
- }
- return -1;
- }
- public LogicQueueMappingItem findLogicQueueMappingItem(Integer globalId, long logicOffset) {
- List<LogicQueueMappingItem> mappingItems = getMappingInfo(globalId);
+
+ public static LogicQueueMappingItem findLogicQueueMappingItem(ImmutableList<LogicQueueMappingItem> mappingItems, long logicOffset) {
if (mappingItems == null
|| mappingItems.isEmpty()) {
return null;
@@ -143,4 +128,11 @@ public class TopicQueueMappingDetail extends TopicQueueMappingInfo {
public ConcurrentMap<Integer, ImmutableList<LogicQueueMappingItem>> getHostedQueues() {
return hostedQueues;
}
+
+ public boolean checkIfAsPhysical(Integer globalId) { // true when hostedQueues has no entry for globalId, or exactly one item whose logic offset is 0
+ List<LogicQueueMappingItem> mappingItems = getMappingInfo(globalId);
+ return mappingItems == null
+ || (mappingItems.size() == 1
+ && mappingItems.get(0).getLogicOffset() == 0);
+ }
}
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/PullMessageRequestHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/PullMessageRequestHeader.java
index 1bce01f..5407964 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/header/PullMessageRequestHeader.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/PullMessageRequestHeader.java
@@ -20,12 +20,12 @@
*/
package org.apache.rocketmq.common.protocol.header;
-import org.apache.rocketmq.remoting.RequestHeader;
+import org.apache.rocketmq.remoting.TopicQueueRequestHeader;
import org.apache.rocketmq.remoting.annotation.CFNotNull;
import org.apache.rocketmq.remoting.annotation.CFNullable;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
-public class PullMessageRequestHeader extends RequestHeader {
+public class PullMessageRequestHeader extends TopicQueueRequestHeader {
@CFNotNull
private String consumerGroup;
@CFNotNull
@@ -60,18 +60,22 @@ public class PullMessageRequestHeader extends RequestHeader {
this.consumerGroup = consumerGroup;
}
+ @Override
public String getTopic() {
return topic;
}
+ @Override
public void setTopic(String topic) {
this.topic = topic;
}
+ @Override
public Integer getQueueId() {
return queueId;
}
+ @Override
public void setQueueId(Integer queueId) {
this.queueId = queueId;
}
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/QueryConsumerOffsetRequestHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/QueryConsumerOffsetRequestHeader.java
index 3b7f627..e4e132e 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/header/QueryConsumerOffsetRequestHeader.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/QueryConsumerOffsetRequestHeader.java
@@ -20,11 +20,11 @@
*/
package org.apache.rocketmq.common.protocol.header;
-import org.apache.rocketmq.remoting.CommandCustomHeader;
+import org.apache.rocketmq.remoting.TopicQueueRequestHeader;
import org.apache.rocketmq.remoting.annotation.CFNotNull;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
-public class QueryConsumerOffsetRequestHeader implements CommandCustomHeader {
+public class QueryConsumerOffsetRequestHeader extends TopicQueueRequestHeader {
@CFNotNull
private String consumerGroup;
@CFNotNull
@@ -44,18 +44,22 @@ public class QueryConsumerOffsetRequestHeader implements CommandCustomHeader {
this.consumerGroup = consumerGroup;
}
+ @Override
public String getTopic() {
return topic;
}
+ @Override
public void setTopic(String topic) {
this.topic = topic;
}
+ @Override
public Integer getQueueId() {
return queueId;
}
+ @Override
public void setQueueId(Integer queueId) {
this.queueId = queueId;
}
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/SendMessageRequestHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/SendMessageRequestHeader.java
index f9dcbff..0ec9795 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/header/SendMessageRequestHeader.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/SendMessageRequestHeader.java
@@ -20,12 +20,12 @@
*/
package org.apache.rocketmq.common.protocol.header;
-import org.apache.rocketmq.remoting.RequestHeader;
+import org.apache.rocketmq.remoting.TopicQueueRequestHeader;
import org.apache.rocketmq.remoting.annotation.CFNotNull;
import org.apache.rocketmq.remoting.annotation.CFNullable;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
-public class SendMessageRequestHeader extends RequestHeader {
+public class SendMessageRequestHeader extends TopicQueueRequestHeader {
@CFNotNull
private String producerGroup;
@CFNotNull
@@ -64,10 +64,12 @@ public class SendMessageRequestHeader extends RequestHeader {
this.producerGroup = producerGroup;
}
+ @Override
public String getTopic() {
return topic;
}
+ @Override
public void setTopic(String topic) {
this.topic = topic;
}
@@ -88,10 +90,12 @@ public class SendMessageRequestHeader extends RequestHeader {
this.defaultTopicQueueNums = defaultTopicQueueNums;
}
+ @Override
public Integer getQueueId() {
return queueId;
}
+ @Override
public void setQueueId(Integer queueId) {
this.queueId = queueId;
}
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/UpdateConsumerOffsetRequestHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/UpdateConsumerOffsetRequestHeader.java
index 3f44db6..e17fe36 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/header/UpdateConsumerOffsetRequestHeader.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/UpdateConsumerOffsetRequestHeader.java
@@ -20,11 +20,11 @@
*/
package org.apache.rocketmq.common.protocol.header;
-import org.apache.rocketmq.remoting.CommandCustomHeader;
+import org.apache.rocketmq.remoting.TopicQueueRequestHeader;
import org.apache.rocketmq.remoting.annotation.CFNotNull;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
-public class UpdateConsumerOffsetRequestHeader implements CommandCustomHeader {
+public class UpdateConsumerOffsetRequestHeader extends TopicQueueRequestHeader {
@CFNotNull
private String consumerGroup;
@CFNotNull
@@ -46,18 +46,22 @@ public class UpdateConsumerOffsetRequestHeader implements CommandCustomHeader {
this.consumerGroup = consumerGroup;
}
+ @Override
public String getTopic() {
return topic;
}
+ @Override
public void setTopic(String topic) {
this.topic = topic;
}
+ @Override
public Integer getQueueId() {
return queueId;
}
+ @Override
public void setQueueId(Integer queueId) {
this.queueId = queueId;
}
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/TopicQueueRequestHeader.java b/remoting/src/main/java/org/apache/rocketmq/remoting/TopicQueueRequestHeader.java
new file mode 100644
index 0000000..08c3fef
--- /dev/null
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/TopicQueueRequestHeader.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.remoting;
+
+public abstract class TopicQueueRequestHeader extends RequestHeader { // base type for request headers that expose a topic and a queue id
+ public abstract String getTopic(); // topic carried by the request
+ public abstract void setTopic(String topic);
+ public abstract Integer getQueueId(); // boxed Integer, so implementations may return null
+ public abstract void setQueueId(Integer queueId);
+
+}
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RemotingCommand.java b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RemotingCommand.java
index 51b6194..34a1790 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RemotingCommand.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RemotingCommand.java
@@ -110,6 +110,13 @@ public class RemotingCommand {
return createResponseCommand(RemotingSysResponseCode.SYSTEM_ERROR, "not set any response code", classHeader);
}
+ public static RemotingCommand buildErrorResponse(int code, String remark) { // shared helper: response command with the given code and remark
+ final RemotingCommand response = RemotingCommand.createResponseCommand(null); // null is passed as the custom header class
+ response.setCode(code);
+ response.setRemark(remark);
+ return response;
+ }
+
public static RemotingCommand createResponseCommand(int code, String remark,
Class<? extends CommandCustomHeader> classHeader) {
RemotingCommand cmd = new RemotingCommand();