You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by do...@apache.org on 2021/12/08 09:28:17 UTC
[rocketmq] 02/02: Finish the test for topicStats
This is an automated email from the ASF dual-hosted git repository.
dongeforever pushed a commit to branch 5.0.0-alpha-static-topic
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
commit 48db31b4815af2d2a3c07c1944a69f572e4425d7
Author: dongeforever <do...@apache.org>
AuthorDate: Wed Dec 8 17:27:55 2021 +0800
Finish the test for topicStats
---
.../broker/processor/AdminBrokerProcessor.java | 97 ++++-
.../broker/processor/ConsumerManageProcessor.java | 12 +-
.../broker/processor/PullMessageProcessor.java | 16 +-
.../broker/topic/TopicQueueMappingManager.java | 34 +-
.../header/GetTopicStatsInfoRequestHeader.java | 5 +-
.../apache/rocketmq/common/rpc/RequestBuilder.java | 16 +-
.../apache/rocketmq/common/rpc/RpcClientImpl.java | 1 +
.../rocketmq/common/rpc/RpcRequestHeader.java | 32 +-
.../common/rpc/TopicQueueRequestHeader.java | 14 +-
...eRequestHeader.java => TopicRequestHeader.java} | 22 +-
.../rocketmq/test/statictopic/StaticTopicIT.java | 405 ++++++---------------
11 files changed, 278 insertions(+), 376 deletions(-)
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
index cca96f1..fb4a711 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
@@ -30,6 +30,7 @@ import org.apache.rocketmq.broker.filter.ConsumerFilterData;
import org.apache.rocketmq.broker.filter.ExpressionMessageFilter;
import org.apache.rocketmq.broker.transaction.queue.TransactionalMessageUtil;
import org.apache.rocketmq.common.AclConfig;
+import org.apache.rocketmq.common.BrokerConfig;
import org.apache.rocketmq.common.MQVersion;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.PlainAccessConfig;
@@ -104,6 +105,7 @@ import org.apache.rocketmq.common.protocol.header.ViewBrokerStatsDataRequestHead
import org.apache.rocketmq.common.protocol.header.filtersrv.RegisterFilterServerRequestHeader;
import org.apache.rocketmq.common.protocol.header.filtersrv.RegisterFilterServerResponseHeader;
import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
+import org.apache.rocketmq.common.rpc.RpcClient;
import org.apache.rocketmq.common.rpc.RpcClientUtils;
import org.apache.rocketmq.common.rpc.RpcException;
import org.apache.rocketmq.common.rpc.RpcRequest;
@@ -159,9 +161,13 @@ import static org.apache.rocketmq.remoting.protocol.RemotingCommand.buildErrorRe
public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements NettyRequestProcessor {
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
private final BrokerController brokerController;
+ private final RpcClient rpcClient;
+ private final BrokerConfig brokerConfig;
public AdminBrokerProcessor(final BrokerController brokerController) {
this.brokerController = brokerController;
+ this.brokerConfig = brokerController.getBrokerConfig();
+ this.rpcClient = brokerController.getBrokerOuterAPI().getRpcClient();
}
@Override
@@ -650,7 +656,7 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements
break;
}
} else {
- requestHeader.setPhysical(true);
+ requestHeader.setLo(false);
requestHeader.setTimestamp(timestamp);
requestHeader.setQueueId(item.getQueueId());
requestHeader.setBname(item.getBname());
@@ -720,7 +726,7 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements
assert maxItem != null;
assert maxItem.getLogicOffset() >= 0;
requestHeader.setBname(maxItem.getBname());
- requestHeader.setPhysical(true);
+ requestHeader.setLo(false);
requestHeader.setQueueId(mappingItem.getQueueId());
long maxPhysicalOffset = Long.MAX_VALUE;
@@ -770,7 +776,7 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements
return response;
}
- private CompletableFuture<RpcResponse> handleGetMinOffsetForStaticTopic(GetMinOffsetRequestHeader requestHeader, TopicQueueMappingContext mappingContext) {
+ private CompletableFuture<RpcResponse> handleGetMinOffsetForStaticTopic(RpcRequest request, TopicQueueMappingContext mappingContext) {
if (mappingContext.getMappingDetail() == null) {
return null;
}
@@ -778,14 +784,14 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements
if (!mappingContext.isLeader()) {
//this may not
return CompletableFuture.completedFuture(new RpcResponse(new RpcException(ResponseCode.NOT_LEADER_FOR_QUEUE,
- String.format("%s-%d is not leader in current broker %s", mappingContext.getTopic(), mappingContext.getGlobalId(), mappingDetail.getBname()))));
+ String.format("%s-%d is not leader in broker %s, request code %d", mappingContext.getTopic(), mappingContext.getGlobalId(), mappingDetail.getBname(), request.getCode()))));
};
-
+ GetMinOffsetRequestHeader requestHeader = (GetMinOffsetRequestHeader) request.getHeader();
LogicQueueMappingItem mappingItem = TopicQueueMappingUtils.findLogicQueueMappingItem(mappingContext.getMappingItemList(), 0L, true);
assert mappingItem != null;
try {
requestHeader.setBname(mappingItem.getBname());
- requestHeader.setPhysical(true);
+ requestHeader.setLo(false);
requestHeader.setQueueId(mappingItem.getQueueId());
long physicalOffset;
//run in local
@@ -815,7 +821,7 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements
assert request.getCode() == RequestCode.GET_MIN_OFFSET;
GetMinOffsetRequestHeader requestHeader = (GetMinOffsetRequestHeader) request.getHeader();
TopicQueueMappingContext mappingContext = this.brokerController.getTopicQueueMappingManager().buildTopicQueueMappingContext(requestHeader, false);
- CompletableFuture<RpcResponse> rewriteResult = handleGetMinOffsetForStaticTopic(requestHeader, mappingContext);
+ CompletableFuture<RpcResponse> rewriteResult = handleGetMinOffsetForStaticTopic(request, mappingContext);
if (rewriteResult != null) {
return rewriteResult;
}
@@ -851,7 +857,7 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements
assert mappingItem != null;
try {
requestHeader.setBname(mappingItem.getBname());
- requestHeader.setPhysical(true);
+ requestHeader.setLo(false);
RpcRequest rpcRequest = new RpcRequest(RequestCode.GET_EARLIEST_MSG_STORETIME, requestHeader, null);
//TODO check if it is in current broker
RpcResponse rpcResponse = this.brokerController.getBrokerOuterAPI().getRpcClient().invoke(rpcRequest, this.brokerController.getBrokerConfig().getForwardTimeout()).get();
@@ -1006,6 +1012,70 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements
return response;
}
+ private RpcResponse handleGetTopicStatsInfoForStaticTopic(RpcRequest request, TopicQueueMappingContext mappingContext) {
+ try {
+ assert request.getCode() == RequestCode.GET_TOPIC_STATS_INFO;
+ if (mappingContext.getMappingDetail() == null) {
+ return null;
+ }
+ final GetTopicStatsInfoRequestHeader requestHeader = (GetTopicStatsInfoRequestHeader) request.getHeader();
+ String topic = requestHeader.getTopic();
+ TopicQueueMappingDetail mappingDetail = mappingContext.getMappingDetail();
+ Map<Integer, LogicQueueMappingItem[]> qidItemMap = new HashMap<>();
+ Set<String> brokers = new HashSet<>();
+ mappingDetail.getHostedQueues().forEach((qid, items) -> {
+ if (TopicQueueMappingUtils.checkIfLeader(items, mappingDetail)) {
+ LogicQueueMappingItem[] itemPair = new LogicQueueMappingItem[2];
+ itemPair[0] = TopicQueueMappingUtils.findLogicQueueMappingItem(items, 0, true);
+ itemPair[1] = TopicQueueMappingUtils.findLogicQueueMappingItem(items, Long.MAX_VALUE, true);
+ assert itemPair[0] != null && itemPair[1] != null;
+ qidItemMap.put(qid, itemPair);
+ brokers.add(itemPair[0].getBname());
+ brokers.add(itemPair[1].getBname());
+ }
+ });
+ Map<String, TopicStatsTable> statsTable = new HashMap<>();
+ for (String broker: brokers) {
+ GetTopicStatsInfoRequestHeader header = new GetTopicStatsInfoRequestHeader();
+ header.setTopic(topic);
+ header.setBname(broker);
+ header.setLo(false);
+ RpcRequest rpcRequest = new RpcRequest(RequestCode.GET_TOPIC_STATS_INFO, header, null);
+ RpcResponse rpcResponse = rpcClient.invoke(rpcRequest, brokerConfig.getForwardTimeout()).get();
+ if (rpcResponse.getException() != null) {
+ throw rpcResponse.getException();
+ }
+ statsTable.put(broker, (TopicStatsTable) rpcResponse.getBody());
+ }
+ TopicStatsTable topicStatsTable = new TopicStatsTable();
+ qidItemMap.forEach( (qid, itemPair) -> {
+ LogicQueueMappingItem minItem = itemPair[0];
+ LogicQueueMappingItem maxItem = itemPair[1];
+ TopicOffset minTopicOffset = statsTable.get(minItem.getBname()).getOffsetTable().get(new MessageQueue(topic, minItem.getBname(), minItem.getQueueId()));
+ TopicOffset maxTopicOffset = statsTable.get(maxItem.getBname()).getOffsetTable().get(new MessageQueue(topic, maxItem.getBname(), maxItem.getQueueId()));
+
+ assert minTopicOffset != null && maxTopicOffset != null;
+
+ long min = minItem.computeStaticQueueOffsetLoosely(minTopicOffset.getMinOffset());
+ if (min < 0)
+ min = 0;
+ long max = maxItem.computeStaticQueueOffsetStrictly(maxTopicOffset.getMaxOffset());
+ if (max < 0)
+ max = 0;
+ long timestamp = maxTopicOffset.getLastUpdateTimestamp();
+
+ TopicOffset topicOffset = new TopicOffset();
+ topicOffset.setMinOffset(min);
+ topicOffset.setMaxOffset(max);
+ topicOffset.setLastUpdateTimestamp(timestamp);
+ topicStatsTable.getOffsetTable().put(new MessageQueue(topic, MixAll.LOGICAL_QUEUE_MOCK_BROKER_NAME, qid), topicOffset);
+ });
+ return new RpcResponse(ResponseCode.SUCCESS, null, topicStatsTable);
+ } catch (Throwable t) {
+ return new RpcResponse(new RpcException(ResponseCode.SYSTEM_ERROR, t.getMessage(), t));
+ }
+ }
+
private RemotingCommand getTopicStatsInfo(ChannelHandlerContext ctx,
RemotingCommand request) throws RemotingCommandException {
final RemotingCommand response = RemotingCommand.createResponseCommand(null);
@@ -1019,8 +1089,17 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements
response.setRemark("topic[" + topic + "] not exist");
return response;
}
-
TopicStatsTable topicStatsTable = new TopicStatsTable();
+ TopicQueueMappingContext mappingContext = this.brokerController.getTopicQueueMappingManager().buildTopicQueueMappingContext(requestHeader, false);
+ RpcResponse rpcResponse = handleGetTopicStatsInfoForStaticTopic(new RpcRequest(RequestCode.GET_TOPIC_STATS_INFO, requestHeader, null), mappingContext);
+ if (rpcResponse != null) {
+ if (rpcResponse.getException() != null) {
+ return RpcClientUtils.createCommandForRpcResponse(rpcResponse);
+ } else {
+ topicStatsTable.getOffsetTable().putAll(((TopicStatsTable)rpcResponse.getBody()).getOffsetTable());
+ }
+ }
+
for (int i = 0; i < topicConfig.getWriteQueueNums(); i++) {
MessageQueue mq = new MessageQueue();
mq.setTopic(topic);
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/ConsumerManageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/ConsumerManageProcessor.java
index 0443396..66abe62 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/ConsumerManageProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/ConsumerManageProcessor.java
@@ -153,11 +153,8 @@ public class ConsumerManageProcessor extends AsyncNettyRequestProcessor implemen
//by default, it is -1
long offset = -1;
//double read, first from leader, then from second leader
- for (int i = 1; i <= 2; i++) {
- if (itemList.size() - i < 0) {
- break;
- }
- LogicQueueMappingItem mappingItem = itemList.get(itemList.size() - i);
+ for (int i = itemList.size() - 1; i >= 0; i--) {
+ LogicQueueMappingItem mappingItem = itemList.get(i);
if (mappingItem.getBname().equals(mappingDetail.getBname())) {
offset = this.brokerController.getConsumerOffsetManager().queryOffset(requestHeader.getConsumerGroup(), requestHeader.getTopic(), mappingItem.getQueueId());
if (offset >= 0) {
@@ -170,7 +167,7 @@ public class ConsumerManageProcessor extends AsyncNettyRequestProcessor implemen
//maybe we need to reconstruct an object
requestHeader.setBname(mappingItem.getBname());
requestHeader.setQueueId(mappingItem.getQueueId());
- requestHeader.setPhysical(true);
+ requestHeader.setLo(false);
requestHeader.setSetZeroIfNotFound(false);
RpcRequest rpcRequest = new RpcRequest(RequestCode.QUERY_CONSUMER_OFFSET, requestHeader, null);
RpcResponse rpcResponse = this.brokerController.getBrokerOuterAPI().getRpcClient().invoke(rpcRequest, this.brokerController.getBrokerConfig().getForwardTimeout()).get();
@@ -179,7 +176,8 @@ public class ConsumerManageProcessor extends AsyncNettyRequestProcessor implemen
}
if (rpcResponse.getCode() == ResponseCode.SUCCESS) {
offset = ((QueryConsumerOffsetResponseHeader) rpcResponse.getHeader()).getOffset();
- } else if (rpcResponse.getCode() == ResponseCode.PULL_NOT_FOUND){
+ break;
+ } else if (rpcResponse.getCode() == ResponseCode.QUERY_NOT_FOUND){
continue;
} else {
//this should not happen
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java
index 9b8135e..0f1ca23 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java
@@ -134,18 +134,14 @@ public class PullMessageProcessor extends AsyncNettyRequestProcessor implements
&& requestHeader.getMaxMsgNums() != null) {
requestHeader.setMaxMsgNums((int) Math.min(mappingItem.getEndOffset() - mappingItem.getStartOffset(), requestHeader.getMaxMsgNums()));
}
- int sysFlag = requestHeader.getSysFlag();
- if (!mappingContext.isLeader()) {
- sysFlag = PullSysFlag.clearCommitOffsetFlag(sysFlag);
- requestHeader.setSysFlag(sysFlag);
- }
if (mappingDetail.getBname().equals(bname)) {
//just let it go, do the local pull process
return null;
}
- requestHeader.setPhysical(true);
+ int sysFlag = requestHeader.getSysFlag();
+ requestHeader.setLo(false);
requestHeader.setBname(bname);
sysFlag = PullSysFlag.clearSuspendFlag(sysFlag);
sysFlag = PullSysFlag.clearCommitOffsetFlag(sysFlag);
@@ -189,11 +185,13 @@ public class PullMessageProcessor extends AsyncNettyRequestProcessor implements
long minOffset = responseHeader.getMinOffset();
long maxOffset = responseHeader.getMaxOffset();
int responseCode = code;
+
//consider the following situations
// 1. read from slave, currently not supported
// 2. the middle queue is truncated because of deleting commitlog
if (code != ResponseCode.SUCCESS) {
//note the currentItem maybe both the leader and the earliest
+ boolean isRevised = false;
if (leaderItem.getGen() == currentItem.getGen()) {
//read the leader
if (requestOffset > maxOffset) {
@@ -228,6 +226,7 @@ public class PullMessageProcessor extends AsyncNettyRequestProcessor implements
//just move to another item
LogicQueueMappingItem nextItem = TopicQueueMappingUtils.findNext(mappingContext.getMappingItemList(), currentItem, true);
if (nextItem != null) {
+ isRevised = true;
currentItem = nextItem;
nextBeginOffset = currentItem.getStartOffset();
minOffset = currentItem.getStartOffset();
@@ -244,7 +243,8 @@ public class PullMessageProcessor extends AsyncNettyRequestProcessor implements
}
//read from the middle item, ignore the PULL_OFFSET_MOVED
- if (leaderItem.getGen() != currentItem.getGen()
+ if (!isRevised
+ && leaderItem.getGen() != currentItem.getGen()
&& earlistItem.getGen() != currentItem.getGen()) {
if (requestOffset < minOffset) {
nextBeginOffset = minOffset;
@@ -289,7 +289,6 @@ public class PullMessageProcessor extends AsyncNettyRequestProcessor implements
return null;
}
} catch (Throwable t) {
- t.printStackTrace();
return buildErrorResponse(ResponseCode.SYSTEM_ERROR, t.getMessage());
}
}
@@ -443,6 +442,7 @@ public class PullMessageProcessor extends AsyncNettyRequestProcessor implements
return response;
}
+
MessageFilter messageFilter;
if (this.brokerController.getBrokerConfig().isFilterSupportRetry()) {
messageFilter = new ExpressionForRetryMessageFilter(subscriptionData, consumerFilterData,
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicQueueMappingManager.java b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicQueueMappingManager.java
index e76d25e..6965053 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicQueueMappingManager.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicQueueMappingManager.java
@@ -30,6 +30,7 @@ import org.apache.rocketmq.common.protocol.header.GetTopicConfigRequestHeader;
import org.apache.rocketmq.common.rpc.RpcRequest;
import org.apache.rocketmq.common.rpc.RpcResponse;
import org.apache.rocketmq.common.rpc.TopicQueueRequestHeader;
+import org.apache.rocketmq.common.rpc.TopicRequestHeader;
import org.apache.rocketmq.common.statictopic.LogicQueueMappingItem;
import org.apache.rocketmq.common.statictopic.TopicConfigAndQueueMapping;
import org.apache.rocketmq.common.statictopic.TopicQueueMappingContext;
@@ -190,28 +191,37 @@ public class TopicQueueMappingManager extends ConfigManager {
return dataVersion;
}
- public TopicQueueMappingContext buildTopicQueueMappingContext(TopicQueueRequestHeader requestHeader) {
+ public TopicQueueMappingContext buildTopicQueueMappingContext(TopicRequestHeader requestHeader) {
return buildTopicQueueMappingContext(requestHeader, false);
}
//Do not return a null context
- public TopicQueueMappingContext buildTopicQueueMappingContext(TopicQueueRequestHeader requestHeader, boolean selectOneWhenMiss) {
- if (requestHeader.getPhysical() != null
- && Boolean.TRUE.equals(requestHeader.getPhysical())) {
- return new TopicQueueMappingContext(requestHeader.getTopic(), requestHeader.getQueueId(), null, null, null);
+ public TopicQueueMappingContext buildTopicQueueMappingContext(TopicRequestHeader requestHeader, boolean selectOneWhenMiss) {
+ //should disable logic queue explicitly, otherwise the old client may cause dirty data to newly created static topic
+ if (requestHeader.getLo() != null
+ && Boolean.FALSE.equals(requestHeader.getLo())) {
+ return new TopicQueueMappingContext(requestHeader.getTopic(), null, null, null, null);
}
- TopicQueueMappingDetail mappingDetail = getTopicQueueMapping(requestHeader.getTopic());
+ String topic = requestHeader.getTopic();
+ Integer globalId = null;
+ if (requestHeader instanceof TopicQueueRequestHeader) {
+ globalId = ((TopicQueueRequestHeader) requestHeader).getQueueId();
+ }
+
+ TopicQueueMappingDetail mappingDetail = getTopicQueueMapping(topic);
if (mappingDetail == null) {
//it is not static topic
- return new TopicQueueMappingContext(requestHeader.getTopic(), requestHeader.getQueueId(), null, null, null);
+ return new TopicQueueMappingContext(topic, null, null, null, null);
}
-
assert mappingDetail.getBname().equals(this.brokerController.getBrokerConfig().getBrokerName());
+ if (globalId == null) {
+ return new TopicQueueMappingContext(topic, null, mappingDetail, null, null);
+ }
+
//If not find mappingItem, it encounters some errors
- Integer globalId = requestHeader.getQueueId();
if (globalId < 0 && !selectOneWhenMiss) {
- return new TopicQueueMappingContext(requestHeader.getTopic(), globalId, mappingDetail, null, null);
+ return new TopicQueueMappingContext(topic, globalId, mappingDetail, null, null);
}
if (globalId < 0) {
@@ -224,7 +234,7 @@ public class TopicQueueMappingManager extends ConfigManager {
}
}
if (globalId < 0) {
- return new TopicQueueMappingContext(requestHeader.getTopic(), globalId, mappingDetail, null, null);
+ return new TopicQueueMappingContext(topic, globalId, mappingDetail, null, null);
}
List<LogicQueueMappingItem> mappingItemList = TopicQueueMappingDetail.getMappingInfo(mappingDetail, globalId);
@@ -233,7 +243,7 @@ public class TopicQueueMappingManager extends ConfigManager {
&& mappingItemList.size() > 0) {
leaderItem = mappingItemList.get(mappingItemList.size() - 1);
}
- return new TopicQueueMappingContext(requestHeader.getTopic(), globalId, mappingDetail, mappingItemList, leaderItem);
+ return new TopicQueueMappingContext(topic, globalId, mappingDetail, mappingItemList, leaderItem);
}
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetTopicStatsInfoRequestHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetTopicStatsInfoRequestHeader.java
index 8e921b2..f98e150 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetTopicStatsInfoRequestHeader.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetTopicStatsInfoRequestHeader.java
@@ -17,12 +17,11 @@
package org.apache.rocketmq.common.protocol.header;
-import org.apache.rocketmq.common.rpc.RpcRequestHeader;
-import org.apache.rocketmq.remoting.CommandCustomHeader;
+import org.apache.rocketmq.common.rpc.TopicRequestHeader;
import org.apache.rocketmq.remoting.annotation.CFNotNull;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
-public class GetTopicStatsInfoRequestHeader extends RpcRequestHeader {
+public class GetTopicStatsInfoRequestHeader extends TopicRequestHeader {
@CFNotNull
private String topic;
diff --git a/common/src/main/java/org/apache/rocketmq/common/rpc/RequestBuilder.java b/common/src/main/java/org/apache/rocketmq/common/rpc/RequestBuilder.java
index 4b5c62b..f9478e4 100644
--- a/common/src/main/java/org/apache/rocketmq/common/rpc/RequestBuilder.java
+++ b/common/src/main/java/org/apache/rocketmq/common/rpc/RequestBuilder.java
@@ -25,7 +25,7 @@ public class RequestBuilder {
}
try {
RpcRequestHeader requestHeader = (RpcRequestHeader) requestHeaderClass.newInstance();
- requestHeader.setOneway(oneway);
+ requestHeader.setOway(oneway);
requestHeader.setBname(destBrokerName);
return requestHeader;
} catch (Throwable t) {
@@ -37,26 +37,26 @@ public class RequestBuilder {
return buildTopicQueueRequestHeader(requestCode, null, mq.getBrokerName(), mq.getTopic(), mq.getQueueId(), null);
}
- public static TopicQueueRequestHeader buildTopicQueueRequestHeader(int requestCode, MessageQueue mq, Boolean physical) {
- return buildTopicQueueRequestHeader(requestCode, null, mq.getBrokerName(), mq.getTopic(), mq.getQueueId(), physical);
+ public static TopicQueueRequestHeader buildTopicQueueRequestHeader(int requestCode, MessageQueue mq, Boolean logic) {
+ return buildTopicQueueRequestHeader(requestCode, null, mq.getBrokerName(), mq.getTopic(), mq.getQueueId(), logic);
}
- public static TopicQueueRequestHeader buildTopicQueueRequestHeader(int requestCode, Boolean oneway, MessageQueue mq, Boolean physical) {
- return buildTopicQueueRequestHeader(requestCode, oneway, mq.getBrokerName(), mq.getTopic(), mq.getQueueId(), physical);
+ public static TopicQueueRequestHeader buildTopicQueueRequestHeader(int requestCode, Boolean oneway, MessageQueue mq, Boolean logic) {
+ return buildTopicQueueRequestHeader(requestCode, oneway, mq.getBrokerName(), mq.getTopic(), mq.getQueueId(), logic);
}
- public static TopicQueueRequestHeader buildTopicQueueRequestHeader(int requestCode, Boolean oneway, String destBrokerName, String topic, int queueId, Boolean physical) {
+ public static TopicQueueRequestHeader buildTopicQueueRequestHeader(int requestCode, Boolean oneway, String destBrokerName, String topic, int queueId, Boolean logic) {
Class requestHeaderClass = requestCodeMap.get(requestCode);
if (requestHeaderClass == null) {
throw new UnsupportedOperationException("unknown " + requestCode);
}
try {
TopicQueueRequestHeader requestHeader = (TopicQueueRequestHeader) requestHeaderClass.newInstance();
- requestHeader.setOneway(oneway);
+ requestHeader.setOway(oneway);
requestHeader.setBname(destBrokerName);
requestHeader.setTopic(topic);
requestHeader.setQueueId(queueId);
- requestHeader.setPhysical(physical);
+ requestHeader.setLo(logic);
return requestHeader;
} catch (Throwable t) {
throw new RuntimeException(t);
diff --git a/common/src/main/java/org/apache/rocketmq/common/rpc/RpcClientImpl.java b/common/src/main/java/org/apache/rocketmq/common/rpc/RpcClientImpl.java
index eaa22be..97879d1 100644
--- a/common/src/main/java/org/apache/rocketmq/common/rpc/RpcClientImpl.java
+++ b/common/src/main/java/org/apache/rocketmq/common/rpc/RpcClientImpl.java
@@ -214,6 +214,7 @@ public class RpcClientImpl implements RpcClient {
}
case ResponseCode.QUERY_NOT_FOUND: {
rpcResponsePromise.setSuccess(new RpcResponse(responseCommand.getCode(), null, null));
+ break;
}
default:{
rpcResponsePromise.setSuccess(new RpcResponse(new RpcException(responseCommand.getCode(), "unknown remote error")));
diff --git a/common/src/main/java/org/apache/rocketmq/common/rpc/RpcRequestHeader.java b/common/src/main/java/org/apache/rocketmq/common/rpc/RpcRequestHeader.java
index 577865e..593df81 100644
--- a/common/src/main/java/org/apache/rocketmq/common/rpc/RpcRequestHeader.java
+++ b/common/src/main/java/org/apache/rocketmq/common/rpc/RpcRequestHeader.java
@@ -20,13 +20,13 @@ import org.apache.rocketmq.remoting.CommandCustomHeader;
public abstract class RpcRequestHeader implements CommandCustomHeader {
//the namespace name
- protected String namespace;
+ protected String ns;
//if the data has been namespaced
- protected Boolean namespaced;
+ protected Boolean nsd;
//the abstract remote addr name, usually the physical broker name
protected String bname;
-
- protected Boolean oneway;
+ //oneway
+ protected Boolean oway;
public String getBname() {
return bname;
@@ -36,27 +36,27 @@ public abstract class RpcRequestHeader implements CommandCustomHeader {
this.bname = bname;
}
- public Boolean getOneway() {
- return oneway;
+ public Boolean getOway() {
+ return oway;
}
- public void setOneway(Boolean oneway) {
- this.oneway = oneway;
+ public void setOway(Boolean oway) {
+ this.oway = oway;
}
- public String getNamespace() {
- return namespace;
+ public String getNs() {
+ return ns;
}
- public void setNamespace(String namespace) {
- this.namespace = namespace;
+ public void setNs(String ns) {
+ this.ns = ns;
}
- public Boolean getNamespaced() {
- return namespaced;
+ public Boolean getNsd() {
+ return nsd;
}
- public void setNamespaced(Boolean namespaced) {
- this.namespaced = namespaced;
+ public void setNsd(Boolean nsd) {
+ this.nsd = nsd;
}
}
diff --git a/common/src/main/java/org/apache/rocketmq/common/rpc/TopicQueueRequestHeader.java b/common/src/main/java/org/apache/rocketmq/common/rpc/TopicQueueRequestHeader.java
index 5d0a151..660f046 100644
--- a/common/src/main/java/org/apache/rocketmq/common/rpc/TopicQueueRequestHeader.java
+++ b/common/src/main/java/org/apache/rocketmq/common/rpc/TopicQueueRequestHeader.java
@@ -16,20 +16,8 @@
*/
package org.apache.rocketmq.common.rpc;
-public abstract class TopicQueueRequestHeader extends RpcRequestHeader {
- //Physical or Logical
- protected Boolean physical;
+public abstract class TopicQueueRequestHeader extends TopicRequestHeader {
- public Boolean getPhysical() {
- return physical;
- }
-
- public void setPhysical(Boolean physical) {
- this.physical = physical;
- }
-
- public abstract String getTopic();
- public abstract void setTopic(String topic);
public abstract Integer getQueueId();
public abstract void setQueueId(Integer queueId);
diff --git a/common/src/main/java/org/apache/rocketmq/common/rpc/TopicQueueRequestHeader.java b/common/src/main/java/org/apache/rocketmq/common/rpc/TopicRequestHeader.java
similarity index 71%
copy from common/src/main/java/org/apache/rocketmq/common/rpc/TopicQueueRequestHeader.java
copy to common/src/main/java/org/apache/rocketmq/common/rpc/TopicRequestHeader.java
index 5d0a151..a70cded 100644
--- a/common/src/main/java/org/apache/rocketmq/common/rpc/TopicQueueRequestHeader.java
+++ b/common/src/main/java/org/apache/rocketmq/common/rpc/TopicRequestHeader.java
@@ -16,21 +16,17 @@
*/
package org.apache.rocketmq.common.rpc;
-public abstract class TopicQueueRequestHeader extends RpcRequestHeader {
- //Physical or Logical
- protected Boolean physical;
-
- public Boolean getPhysical() {
- return physical;
- }
-
- public void setPhysical(Boolean physical) {
- this.physical = physical;
- }
+public abstract class TopicRequestHeader extends RpcRequestHeader {
+ //logical
+ protected Boolean lo;
public abstract String getTopic();
public abstract void setTopic(String topic);
- public abstract Integer getQueueId();
- public abstract void setQueueId(Integer queueId);
+ public Boolean getLo() {
+ return lo;
+ }
+ public void setLo(Boolean lo) {
+ this.lo = lo;
+ }
}
diff --git a/test/src/test/java/org/apache/rocketmq/test/statictopic/StaticTopicIT.java b/test/src/test/java/org/apache/rocketmq/test/statictopic/StaticTopicIT.java
index fdefb06..e89dc3a 100644
--- a/test/src/test/java/org/apache/rocketmq/test/statictopic/StaticTopicIT.java
+++ b/test/src/test/java/org/apache/rocketmq/test/statictopic/StaticTopicIT.java
@@ -1,9 +1,12 @@
package org.apache.rocketmq.test.statictopic;
-import com.alibaba.fastjson.JSON;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
import org.apache.log4j.Logger;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.common.admin.ConsumeStats;
+import org.apache.rocketmq.common.admin.TopicStatsTable;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.rpc.ClientMetadata;
@@ -86,71 +89,35 @@ public class StaticTopicIT extends BaseConf {
}
-
- @Test
- public void testCreateProduceConsumeStaticTopic() throws Exception {
- String topic = "static" + MQRandomUtils.getRandomTopic();
- RMQNormalProducer producer = getProducer(nsAddr, topic);
- RMQNormalConsumer consumer = getConsumer(nsAddr, topic, "*", new RMQNormalListener());
-
- int queueNum = 10;
- int msgEachQueue = 100;
- //create static topic
- Map<String, TopicConfigAndQueueMapping> localBrokerConfigMap = MQAdminTestUtils.createStaticTopic(topic, queueNum, getBrokers(), defaultMQAdminExt);
- //check the static topic config
- {
- Map<String, TopicConfigAndQueueMapping> remoteBrokerConfigMap = MQAdminUtils.examineTopicConfigAll(topic, defaultMQAdminExt);
- Assert.assertEquals(brokerNum, remoteBrokerConfigMap.size());
- for (Map.Entry<String, TopicConfigAndQueueMapping> entry: remoteBrokerConfigMap.entrySet()) {
- String broker = entry.getKey();
- TopicConfigAndQueueMapping configMapping = entry.getValue();
- TopicConfigAndQueueMapping localConfigMapping = localBrokerConfigMap.get(broker);
- Assert.assertNotNull(localConfigMapping);
- Assert.assertEquals(configMapping, localConfigMapping);
- }
- TopicQueueMappingUtils.checkNameEpochNumConsistence(topic, remoteBrokerConfigMap);
- Map<Integer, TopicQueueMappingOne> globalIdMap = TopicQueueMappingUtils.checkAndBuildMappingItems(new ArrayList<>(getMappingDetailFromConfig(remoteBrokerConfigMap.values())), false, true);
- Assert.assertEquals(queueNum, globalIdMap.size());
- }
- //check the route data
+ private void sendMessagesAndCheck(RMQNormalProducer producer, Set<String> targetBrokers, String topic, int queueNum, int msgEachQueue, int gen) throws Exception {
+ ClientMetadata clientMetadata = MQAdminUtils.getBrokerAndTopicMetadata(topic, defaultMQAdminExt);
List<MessageQueue> messageQueueList = producer.getMessageQueue();
Assert.assertEquals(queueNum, messageQueueList.size());
- producer.setDebug(true);
for (int i = 0; i < queueNum; i++) {
MessageQueue messageQueue = messageQueueList.get(i);
Assert.assertEquals(topic, messageQueue.getTopic());
- Assert.assertEquals(i, messageQueue.getQueueId());
Assert.assertEquals(MixAll.LOGICAL_QUEUE_MOCK_BROKER_NAME, messageQueue.getBrokerName());
+ Assert.assertEquals(i, messageQueue.getQueueId());
+ String destBrokerName = clientMetadata.getBrokerNameFromMessageQueue(messageQueue);
+ Assert.assertTrue(targetBrokers.contains(destBrokerName));
}
- //send and consume the msg
for(MessageQueue messageQueue: messageQueueList) {
producer.send(msgEachQueue, messageQueue);
}
+ Assert.assertEquals(0, producer.getSendErrorMsg().size());
//leave time for the cq to be built
- Thread.sleep(500);
+ Thread.sleep(100);
for(MessageQueue messageQueue: messageQueueList) {
Assert.assertEquals(0, defaultMQAdminExt.minOffset(messageQueue));
- Assert.assertEquals(msgEachQueue, defaultMQAdminExt.maxOffset(messageQueue));
+ Assert.assertEquals(msgEachQueue + gen * TopicQueueMappingUtils.DEFAULT_BLOCK_SEQ_SIZE, defaultMQAdminExt.maxOffset(messageQueue));
}
- Assert.assertEquals(msgEachQueue * queueNum, producer.getAllOriginMsg().size());
- Assert.assertEquals(0, producer.getSendErrorMsg().size());
-
- consumer.getListener().waitForMessageConsume(producer.getAllMsgBody(), 3000);
- assertThat(VerifyUtils.getFilterdMessage(producer.getAllMsgBody(),
- consumer.getListener().getAllMsgBody()))
- .containsExactlyElementsIn(producer.getAllMsgBody());
- Map<Integer, List<MessageExt>> messagesByQueue = computeMessageByQueue(consumer.getListener().getAllOriginMsg());
- Assert.assertEquals(queueNum, messagesByQueue.size());
- for (int i = 0; i < queueNum; i++) {
- List<MessageExt> messageExts = messagesByQueue.get(i);
- Assert.assertEquals(msgEachQueue, messageExts.size());
- for (int j = 0; j < msgEachQueue; j++) {
- Assert.assertEquals(j, messageExts.get(j).getQueueOffset());
- }
+ TopicStatsTable topicStatsTable = defaultMQAdminExt.examineTopicStats(topic);
+ for(MessageQueue messageQueue: messageQueueList) {
+ Assert.assertEquals(0, topicStatsTable.getOffsetTable().get(messageQueue).getMinOffset());
+ Assert.assertEquals(msgEachQueue + gen * TopicQueueMappingUtils.DEFAULT_BLOCK_SEQ_SIZE, topicStatsTable.getOffsetTable().get(messageQueue).getMaxOffset());
}
}
-
private Map<Integer, List<MessageExt>> computeMessageByQueue(Collection<Object> msgs) {
Map<Integer, List<MessageExt>> messagesByQueue = new HashMap<>();
for (Object object : msgs) {
@@ -171,217 +138,133 @@ public class StaticTopicIT extends BaseConf {
return messagesByQueue;
}
- @Test
- public void testDoubleReadCheckConsumerOffset() throws Exception {
- String topic = "static" + MQRandomUtils.getRandomTopic();
- String group = initConsumerGroup();
- RMQNormalProducer producer = getProducer(nsAddr, topic);
+ private void consumeMessagesAndCheck(RMQNormalProducer producer, RMQNormalConsumer consumer, String topic, int queueNum, int msgEachQueue, int startGen, int genNum) {
+ consumer.getListener().waitForMessageConsume(producer.getAllMsgBody(), 30000);
+ /*System.out.println("produce:" + producer.getAllMsgBody().size());
+ System.out.println("consume:" + consumer.getListener().getAllMsgBody().size());*/
- RMQNormalConsumer consumer = getConsumer(nsAddr, group, topic, "*", new RMQNormalListener());
-
- //System.out.printf("Group:%s\n", consumer.getConsumerGroup());
- //System.out.printf("Topic:%s\n", topic);
-
- int queueNum = 10;
- int msgEachQueue = 100;
- //create static topic
- {
- Set<String> targetBrokers = new HashSet<>();
- targetBrokers.add(broker1Name);
- MQAdminTestUtils.createStaticTopic(topic, queueNum, targetBrokers, defaultMQAdminExt);
- }
- //produce the messages
- {
- List<MessageQueue> messageQueueList = producer.getMessageQueue();
- for(MessageQueue messageQueue: messageQueueList) {
- producer.send(msgEachQueue, messageQueue);
- }
- Assert.assertEquals(0, producer.getSendErrorMsg().size());
- Assert.assertEquals(msgEachQueue * queueNum, producer.getAllMsgBody().size());
- }
-
- consumer.getListener().waitForMessageConsume(producer.getAllMsgBody(), 3000);
assertThat(VerifyUtils.getFilterdMessage(producer.getAllMsgBody(),
consumer.getListener().getAllMsgBody()))
.containsExactlyElementsIn(producer.getAllMsgBody());
- producer.shutdown();
- consumer.shutdown();
+ Map<Integer, List<MessageExt>> messagesByQueue = computeMessageByQueue(consumer.getListener().getAllOriginMsg());
+ Assert.assertEquals(queueNum, messagesByQueue.size());
+ for (int i = 0; i < queueNum; i++) {
+ List<MessageExt> messageExts = messagesByQueue.get(i);
+ /*for (MessageExt messageExt:messageExts) {
+ System.out.printf("%d %d\n", messageExt.getQueueId(), messageExt.getQueueOffset());
+ }*/
+ int totalEachQueue = msgEachQueue * genNum;
+ Assert.assertEquals(totalEachQueue, messageExts.size());
+ for (int j = 0; j < totalEachQueue; j++) {
+ MessageExt messageExt = messageExts.get(j);
+ int currGen = startGen + j / msgEachQueue;
+ Assert.assertEquals(topic, messageExt.getTopic());
+ Assert.assertEquals(MixAll.LOGICAL_QUEUE_MOCK_BROKER_NAME, messageExt.getBrokerName());
+ Assert.assertEquals(i, messageExt.getQueueId());
+ Assert.assertEquals((j % msgEachQueue) + currGen * TopicQueueMappingUtils.DEFAULT_BLOCK_SEQ_SIZE, messageExt.getQueueOffset());
+ }
+ }
+ }
- //remapping the static topic
- {
- Set<String> targetBrokers = new HashSet<>();
- targetBrokers.add(broker2Name);
- MQAdminTestUtils.remappingStaticTopic(topic, targetBrokers, defaultMQAdminExt);
- }
- //make the metadata
- Thread.sleep(500);
- //System.out.printf("Group:%s\n", consumer.getConsumerGroup());
+ @Test
+ public void testCreateProduceConsumeStaticTopic() throws Exception {
+ String topic = "static" + MQRandomUtils.getRandomTopic();
+ RMQNormalProducer producer = getProducer(nsAddr, topic);
+ RMQNormalConsumer consumer = getConsumer(nsAddr, topic, "*", new RMQNormalListener());
+ int queueNum = 10;
+ int msgEachQueue = 100;
+ //create static topic
+ Map<String, TopicConfigAndQueueMapping> localBrokerConfigMap = MQAdminTestUtils.createStaticTopic(topic, queueNum, getBrokers(), defaultMQAdminExt);
+ //check the static topic config
{
- producer = getProducer(nsAddr, topic);
- ClientMetadata clientMetadata = MQAdminUtils.getBrokerAndTopicMetadata(topic, defaultMQAdminExt);
- //just refresh the metadata
- List<MessageQueue> messageQueueList = producer.getMessageQueue();
- for(MessageQueue messageQueue: messageQueueList) {
- producer.send(msgEachQueue, messageQueue);
- Assert.assertEquals(broker2Name, clientMetadata.getBrokerNameFromMessageQueue(messageQueue));
- }
- Assert.assertEquals(0, producer.getSendErrorMsg().size());
- Assert.assertEquals(msgEachQueue * queueNum, producer.getAllMsgBody().size());
- for(MessageQueue messageQueue: messageQueueList) {
- Assert.assertEquals(0, defaultMQAdminExt.minOffset(messageQueue));
- Assert.assertEquals(msgEachQueue + TopicQueueMappingUtils.DEFAULT_BLOCK_SEQ_SIZE, defaultMQAdminExt.maxOffset(messageQueue));
- }
- //leave the time to build the cq
- Thread.sleep(100);
- }
- {
- consumer = getConsumer(nsAddr, group, topic, "*", new RMQNormalListener());
- consumer.getListener().waitForMessageConsume(producer.getAllMsgBody(), 6000);
- //System.out.printf("Consume %d\n", consumer.getListener().getAllMsgBody().size());
- assertThat(VerifyUtils.getFilterdMessage(producer.getAllMsgBody(),
- consumer.getListener().getAllMsgBody()))
- .containsExactlyElementsIn(producer.getAllMsgBody());
-
- Map<Integer, List<MessageExt>> messagesByQueue = computeMessageByQueue(consumer.getListener().getAllOriginMsg());
-
- Assert.assertEquals(queueNum, messagesByQueue.size());
- for (int i = 0; i < queueNum; i++) {
- List<MessageExt> messageExts = messagesByQueue.get(i);
- Assert.assertEquals(msgEachQueue, messageExts.size());
- for (int j = 0; j < msgEachQueue; j++) {
- Assert.assertEquals(j + TopicQueueMappingUtils.DEFAULT_BLOCK_SEQ_SIZE, messageExts.get(j).getQueueOffset());
- }
+ Map<String, TopicConfigAndQueueMapping> remoteBrokerConfigMap = MQAdminUtils.examineTopicConfigAll(topic, defaultMQAdminExt);
+ Assert.assertEquals(brokerNum, remoteBrokerConfigMap.size());
+ for (Map.Entry<String, TopicConfigAndQueueMapping> entry: remoteBrokerConfigMap.entrySet()) {
+ String broker = entry.getKey();
+ TopicConfigAndQueueMapping configMapping = entry.getValue();
+ TopicConfigAndQueueMapping localConfigMapping = localBrokerConfigMap.get(broker);
+ Assert.assertNotNull(localConfigMapping);
+ Assert.assertEquals(configMapping, localConfigMapping);
}
+ TopicQueueMappingUtils.checkNameEpochNumConsistence(topic, remoteBrokerConfigMap);
+ Map<Integer, TopicQueueMappingOne> globalIdMap = TopicQueueMappingUtils.checkAndBuildMappingItems(new ArrayList<>(getMappingDetailFromConfig(remoteBrokerConfigMap.values())), false, true);
+ Assert.assertEquals(queueNum, globalIdMap.size());
}
+ //send and check
+ sendMessagesAndCheck(producer, getBrokers(), topic, queueNum, msgEachQueue, 0);
+ //consume and check
+ consumeMessagesAndCheck(producer, consumer, topic, queueNum, msgEachQueue, 0, 1);
}
+
@Test
public void testRemappingProduceConsumeStaticTopic() throws Exception {
String topic = "static" + MQRandomUtils.getRandomTopic();
RMQNormalProducer producer = getProducer(nsAddr, topic);
RMQNormalConsumer consumer = getConsumer(nsAddr, topic, "*", new RMQNormalListener());
-
- int queueNum = 10;
+ int queueNum = 1;
int msgEachQueue = 100;
- //create static topic
+ //create, send and consume
{
- Set<String> targetBrokers = new HashSet<>();
- targetBrokers.add(broker1Name);
+ Set<String> targetBrokers = ImmutableSet.of(broker1Name);
MQAdminTestUtils.createStaticTopic(topic, queueNum, targetBrokers, defaultMQAdminExt);
+ sendMessagesAndCheck(producer, targetBrokers, topic, queueNum, msgEachQueue, 0);
+ consumeMessagesAndCheck(producer, consumer, topic, queueNum, msgEachQueue, 0, 1);
}
- //System.out.printf("%s %s\n", broker1Name, clientMetadata.findMasterBrokerAddr(broker1Name));
- //System.out.printf("%s %s\n", broker2Name, clientMetadata.findMasterBrokerAddr(broker2Name));
-
- //produce the messages
- {
- List<MessageQueue> messageQueueList = producer.getMessageQueue();
- for (int i = 0; i < queueNum; i++) {
- MessageQueue messageQueue = messageQueueList.get(i);
- Assert.assertEquals(i, messageQueue.getQueueId());
- Assert.assertEquals(MixAll.LOGICAL_QUEUE_MOCK_BROKER_NAME, messageQueue.getBrokerName());
- }
- for(MessageQueue messageQueue: messageQueueList) {
- producer.send(msgEachQueue, messageQueue);
- }
- Assert.assertEquals(0, producer.getSendErrorMsg().size());
- //leave the time to build the cq
- Thread.sleep(100);
- for(MessageQueue messageQueue: messageQueueList) {
- //Assert.assertEquals(0, defaultMQAdminExt.minOffset(messageQueue));
- Assert.assertEquals(msgEachQueue, defaultMQAdminExt.maxOffset(messageQueue));
- }
- }
-
- consumer.getListener().waitForMessageConsume(producer.getAllMsgBody(), 3000);
- assertThat(VerifyUtils.getFilterdMessage(producer.getAllMsgBody(),
- consumer.getListener().getAllMsgBody()))
- .containsExactlyElementsIn(producer.getAllMsgBody());
-
+ System.out.println("=============================================================");
//remapping the static topic
{
- Set<String> targetBrokers = new HashSet<>();
- targetBrokers.add(broker2Name);
+ Set<String> targetBrokers = ImmutableSet.of(broker2Name);
MQAdminTestUtils.remappingStaticTopic(topic, targetBrokers, defaultMQAdminExt);
Map<String, TopicConfigAndQueueMapping> remoteBrokerConfigMap = MQAdminUtils.examineTopicConfigAll(topic, defaultMQAdminExt);
-
TopicQueueMappingUtils.checkNameEpochNumConsistence(topic, remoteBrokerConfigMap);
Map<Integer, TopicQueueMappingOne> globalIdMap = TopicQueueMappingUtils.checkAndBuildMappingItems(new ArrayList<>(getMappingDetailFromConfig(remoteBrokerConfigMap.values())), false, true);
Assert.assertEquals(queueNum, globalIdMap.size());
for (TopicQueueMappingOne mappingOne: globalIdMap.values()) {
Assert.assertEquals(broker2Name, mappingOne.getBname());
+ Assert.assertEquals(TopicQueueMappingUtils.DEFAULT_BLOCK_SEQ_SIZE, mappingOne.getItems().get(mappingOne.getItems().size() - 1).getLogicOffset());
}
- }
- //leave the time to refresh the metadata
- Thread.sleep(500);
- producer.setDebug(true);
- {
- ClientMetadata clientMetadata = MQAdminUtils.getBrokerAndTopicMetadata(topic, defaultMQAdminExt);
- List<MessageQueue> messageQueueList = producer.getMessageQueue();
- for (int i = 0; i < queueNum; i++) {
- MessageQueue messageQueue = messageQueueList.get(i);
- Assert.assertEquals(i, messageQueue.getQueueId());
- Assert.assertEquals(MixAll.LOGICAL_QUEUE_MOCK_BROKER_NAME, messageQueue.getBrokerName());
- String destBrokerName = clientMetadata.getBrokerNameFromMessageQueue(messageQueue);
- Assert.assertEquals(destBrokerName, broker2Name);
- }
-
- for(MessageQueue messageQueue: messageQueueList) {
- producer.send(msgEachQueue, messageQueue);
- }
- Assert.assertEquals(0, producer.getSendErrorMsg().size());
- //leave the time to build the cq
- Thread.sleep(100);
- for(MessageQueue messageQueue: messageQueueList) {
- Assert.assertEquals(0, defaultMQAdminExt.minOffset(messageQueue));
- Assert.assertEquals(msgEachQueue + TopicQueueMappingUtils.DEFAULT_BLOCK_SEQ_SIZE, defaultMQAdminExt.maxOffset(messageQueue));
- }
- }
- {
- consumer.getListener().waitForMessageConsume(producer.getAllMsgBody(), 30000);
- assertThat(VerifyUtils.getFilterdMessage(producer.getAllMsgBody(),
- consumer.getListener().getAllMsgBody()))
- .containsExactlyElementsIn(producer.getAllMsgBody());
- Map<Integer, List<MessageExt>> messagesByQueue = computeMessageByQueue(consumer.getListener().getAllOriginMsg());
- Assert.assertEquals(queueNum, messagesByQueue.size());
- for (int i = 0; i < queueNum; i++) {
- List<MessageExt> messageExts = messagesByQueue.get(i);
- Assert.assertEquals(msgEachQueue * 2, messageExts.size());
- for (int j = 0; j < msgEachQueue; j++) {
- Assert.assertEquals(j, messageExts.get(j).getQueueOffset());
- }
- for (int j = msgEachQueue; j < msgEachQueue * 2; j++) {
- Assert.assertEquals(j + TopicQueueMappingUtils.DEFAULT_BLOCK_SEQ_SIZE - msgEachQueue, messageExts.get(j).getQueueOffset());
- }
- }
+ Thread.sleep(500);
+ sendMessagesAndCheck(producer, targetBrokers, topic, queueNum, msgEachQueue, 1);
+ consumeMessagesAndCheck(producer, consumer, topic, queueNum, msgEachQueue, 0, 2);
}
}
- public void sendMessagesAndCheck(RMQNormalProducer producer, String broker, String topic, int queueNum, int msgEachQueue, long baseOffset) throws Exception {
- ClientMetadata clientMetadata = MQAdminUtils.getBrokerAndTopicMetadata(topic, defaultMQAdminExt);
- List<MessageQueue> messageQueueList = producer.getMessageQueue();
- Assert.assertEquals(queueNum, messageQueueList.size());
- for (int i = 0; i < queueNum; i++) {
- MessageQueue messageQueue = messageQueueList.get(i);
- Assert.assertEquals(i, messageQueue.getQueueId());
- Assert.assertEquals(MixAll.LOGICAL_QUEUE_MOCK_BROKER_NAME, messageQueue.getBrokerName());
- String destBrokerName = clientMetadata.getBrokerNameFromMessageQueue(messageQueue);
- Assert.assertEquals(destBrokerName, broker);
- }
+ @Test
+ public void testDoubleReadCheckConsumerOffset() throws Exception {
+ String topic = "static" + MQRandomUtils.getRandomTopic();
+ String group = initConsumerGroup();
+ RMQNormalProducer producer = getProducer(nsAddr, topic);
+ RMQNormalConsumer consumer = getConsumer(nsAddr, group, topic, "*", new RMQNormalListener());
- for(MessageQueue messageQueue: messageQueueList) {
- producer.send(msgEachQueue, messageQueue);
+ int queueNum = 10;
+ int msgEachQueue = 100;
+ //create static topic
+ {
+ Set<String> targetBrokers = ImmutableSet.of(broker1Name);
+ MQAdminTestUtils.createStaticTopic(topic, queueNum, targetBrokers, defaultMQAdminExt);
+ sendMessagesAndCheck(producer, targetBrokers, topic, queueNum, msgEachQueue, 0);
+ consumeMessagesAndCheck(producer, consumer, topic, queueNum, msgEachQueue, 0, 1);
}
- Assert.assertEquals(0, producer.getSendErrorMsg().size());
- //leave the time to build the cq
- Thread.sleep(100);
- for(MessageQueue messageQueue: messageQueueList) {
- Assert.assertEquals(0, defaultMQAdminExt.minOffset(messageQueue));
- Assert.assertEquals(msgEachQueue + baseOffset, defaultMQAdminExt.maxOffset(messageQueue));
+ producer.shutdown();
+ consumer.shutdown();
+ //use a new producer
+ producer = getProducer(nsAddr, topic);
+
+ List<String> brokers = ImmutableList.of(broker2Name, broker3Name, broker1Name);
+ for (int i = 0; i < brokers.size(); i++) {
+ Set<String> targetBrokers = ImmutableSet.of(brokers.get(i));
+ MQAdminTestUtils.remappingStaticTopic(topic, targetBrokers, defaultMQAdminExt);
+ //leave time for the metadata to refresh
+ Thread.sleep(500);
+ sendMessagesAndCheck(producer, targetBrokers, topic, queueNum, msgEachQueue, i + 1);
}
+ consumer = getConsumer(nsAddr, group, topic, "*", new RMQNormalListener());
+ consumeMessagesAndCheck(producer, consumer, topic, queueNum, msgEachQueue, 1, brokers.size());
}
@@ -393,32 +276,29 @@ public class StaticTopicIT extends BaseConf {
int msgEachQueue = 100;
//create to broker1Name
{
- Set<String> targetBrokers = new HashSet<>();
- targetBrokers.add(broker1Name);
+ Set<String> targetBrokers = ImmutableSet.of(broker1Name);
MQAdminTestUtils.createStaticTopic(topic, queueNum, targetBrokers, defaultMQAdminExt);
//leave the time to refresh the metadata
Thread.sleep(500);
- sendMessagesAndCheck(producer, broker1Name, topic, queueNum, msgEachQueue, 0L);
+ sendMessagesAndCheck(producer, targetBrokers, topic, queueNum, msgEachQueue, 0);
}
//remapping to broker2Name
{
- Set<String> targetBrokers = new HashSet<>();
- targetBrokers.add(broker2Name);
+ Set<String> targetBrokers = ImmutableSet.of(broker2Name);
MQAdminTestUtils.remappingStaticTopic(topic, targetBrokers, defaultMQAdminExt);
//leave the time to refresh the metadata
Thread.sleep(500);
- sendMessagesAndCheck(producer, broker2Name, topic, queueNum, msgEachQueue, TopicQueueMappingUtils.DEFAULT_BLOCK_SEQ_SIZE);
+ sendMessagesAndCheck(producer, targetBrokers, topic, queueNum, msgEachQueue, 1);
}
//remapping to broker3Name
{
- Set<String> targetBrokers = new HashSet<>();
- targetBrokers.add(broker3Name);
+ Set<String> targetBrokers = ImmutableSet.of(broker3Name);
MQAdminTestUtils.remappingStaticTopic(topic, targetBrokers, defaultMQAdminExt);
//leave the time to refresh the metadata
Thread.sleep(500);
- sendMessagesAndCheck(producer, broker3Name, topic, queueNum, msgEachQueue, TopicQueueMappingUtils.DEFAULT_BLOCK_SEQ_SIZE * 2);
+ sendMessagesAndCheck(producer, targetBrokers, topic, queueNum, msgEachQueue, 2);
}
// 1 -> 2 -> 3, currently 1 should not have any mappings
@@ -469,10 +349,7 @@ public class StaticTopicIT extends BaseConf {
for (List<LogicQueueMappingItem> items : config3.getMappingDetail().getHostedQueues().values()) {
Assert.assertEquals(1, items.size());
}
-
}
-
-
}
@@ -482,42 +359,18 @@ public class StaticTopicIT extends BaseConf {
RMQNormalProducer producer = getProducer(nsAddr, topic);
int queueNum = 10;
int msgEachQueue = 100;
- //create static topic
+ //create and send
{
- Set<String> targetBrokers = new HashSet<>();
- targetBrokers.add(broker1Name);
+ Set<String> targetBrokers = ImmutableSet.of(broker1Name);
MQAdminTestUtils.createStaticTopic(topic, queueNum, targetBrokers, defaultMQAdminExt);
- }
- //System.out.printf("%s %s\n", broker1Name, clientMetadata.findMasterBrokerAddr(broker1Name));
- //System.out.printf("%s %s\n", broker2Name, clientMetadata.findMasterBrokerAddr(broker2Name));
-
- //produce the messages
- {
- List<MessageQueue> messageQueueList = producer.getMessageQueue();
- for (int i = 0; i < queueNum; i++) {
- MessageQueue messageQueue = messageQueueList.get(i);
- Assert.assertEquals(i, messageQueue.getQueueId());
- Assert.assertEquals(MixAll.LOGICAL_QUEUE_MOCK_BROKER_NAME, messageQueue.getBrokerName());
- }
- for(MessageQueue messageQueue: messageQueueList) {
- producer.send(msgEachQueue, messageQueue);
- }
- Assert.assertEquals(0, producer.getSendErrorMsg().size());
- //leave the time to build the cq
- Thread.sleep(100);
- for(MessageQueue messageQueue: messageQueueList) {
- //Assert.assertEquals(0, defaultMQAdminExt.minOffset(messageQueue));
- Assert.assertEquals(msgEachQueue, defaultMQAdminExt.maxOffset(messageQueue));
- }
+ sendMessagesAndCheck(producer, targetBrokers, topic, queueNum, msgEachQueue, 0);
}
//remapping the static topic with -1 logic offset
{
- Set<String> targetBrokers = new HashSet<>();
- targetBrokers.add(broker2Name);
+ Set<String> targetBrokers = ImmutableSet.of(broker2Name);
MQAdminTestUtils.remappingStaticTopicWithNegativeLogicOffset(topic, targetBrokers, defaultMQAdminExt);
Map<String, TopicConfigAndQueueMapping> remoteBrokerConfigMap = MQAdminUtils.examineTopicConfigAll(topic, defaultMQAdminExt);
-
TopicQueueMappingUtils.checkNameEpochNumConsistence(topic, remoteBrokerConfigMap);
Map<Integer, TopicQueueMappingOne> globalIdMap = TopicQueueMappingUtils.checkAndBuildMappingItems(new ArrayList<>(getMappingDetailFromConfig(remoteBrokerConfigMap.values())), false, true);
Assert.assertEquals(queueNum, globalIdMap.size());
@@ -525,32 +378,10 @@ public class StaticTopicIT extends BaseConf {
Assert.assertEquals(broker2Name, mappingOne.getBname());
Assert.assertEquals(-1, mappingOne.getItems().get(mappingOne.getItems().size() - 1).getLogicOffset());
}
- }
- //leave the time to refresh the metadata
- Thread.sleep(500);
- producer.setDebug(true);
- {
- ClientMetadata clientMetadata = MQAdminUtils.getBrokerAndTopicMetadata(topic, defaultMQAdminExt);
- List<MessageQueue> messageQueueList = producer.getMessageQueue();
- for (int i = 0; i < queueNum; i++) {
- MessageQueue messageQueue = messageQueueList.get(i);
- Assert.assertEquals(i, messageQueue.getQueueId());
- String destBrokerName = clientMetadata.getBrokerNameFromMessageQueue(messageQueue);
- Assert.assertEquals(destBrokerName, broker2Name);
- }
-
- for(MessageQueue messageQueue: messageQueueList) {
- producer.send(msgEachQueue, messageQueue);
- }
- Assert.assertEquals(0, producer.getSendErrorMsg().size());
- Assert.assertEquals(queueNum * msgEachQueue * 2, producer.getAllOriginMsg().size());
- //leave the time to build the cq
- Thread.sleep(100);
- for(MessageQueue messageQueue: messageQueueList) {
- Assert.assertEquals(0, defaultMQAdminExt.minOffset(messageQueue));
- //the max offset should still be msgEachQueue
- Assert.assertEquals(msgEachQueue, defaultMQAdminExt.maxOffset(messageQueue));
- }
+ //leave the time to refresh the metadata
+ Thread.sleep(500);
+ //the remapping above used logic offset -1, so the gen here should still be 0
+ sendMessagesAndCheck(producer, targetBrokers, topic, queueNum, msgEachQueue, 0);
}
}