You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by do...@apache.org on 2021/12/02 08:34:53 UTC
[rocketmq] 02/02: Finish the logic for double-read-check
This is an automated email from the ASF dual-hosted git repository.
dongeforever pushed a commit to branch 5.0.0-alpha-static-topic
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
commit fad17e1ecc4a8884f2a020dfcb4c271d018995c2
Author: dongeforever <do...@apache.org>
AuthorDate: Thu Dec 2 16:34:33 2021 +0800
Finish the logic for double-read-check
---
.../broker/processor/AdminBrokerProcessor.java | 1 +
.../broker/processor/ConsumerManageProcessor.java | 85 +++++++++++++-
.../broker/topic/TopicQueueMappingManager.java | 3 +-
.../consumer/store/RemoteBrokerOffsetStore.java | 6 +-
.../client/exception/OffsetNotFoundException.java | 15 +++
.../rocketmq/client/impl/MQClientAPIImpl.java | 5 +-
.../client/impl/consumer/RebalancePushImpl.java | 1 +
.../apache/rocketmq/common/rpc/RpcClientImpl.java | 29 +++++
.../apache/rocketmq/test/smoke/StaticTopicIT.java | 128 +++++++++++++++++----
9 files changed, 242 insertions(+), 31 deletions(-)
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
index 16bf677..33d3025 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
@@ -709,6 +709,7 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements
offset = mappingItem.computeStaticQueueOffsetUpToEnd(offset);
+
final RemotingCommand response = RemotingCommand.createResponseCommand(GetMaxOffsetResponseHeader.class);
final GetMaxOffsetResponseHeader responseHeader = (GetMaxOffsetResponseHeader) response.readCustomHeader();
responseHeader.setOffset(offset);
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/ConsumerManageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/ConsumerManageProcessor.java
index 930f171..aad9454 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/ConsumerManageProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/ConsumerManageProcessor.java
@@ -20,8 +20,14 @@ import io.netty.channel.ChannelHandlerContext;
import java.util.List;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.broker.client.ConsumerGroupInfo;
+import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.common.rpc.RpcRequest;
+import org.apache.rocketmq.common.rpc.RpcResponse;
+import org.apache.rocketmq.common.rpc.TopicQueueRequestHeader;
+import org.apache.rocketmq.common.statictopic.LogicQueueMappingItem;
import org.apache.rocketmq.common.statictopic.TopicQueueMappingContext;
import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.common.statictopic.TopicQueueMappingDetail;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.common.protocol.RequestCode;
import org.apache.rocketmq.common.protocol.ResponseCode;
@@ -39,6 +45,8 @@ import org.apache.rocketmq.remoting.netty.AsyncNettyRequestProcessor;
import org.apache.rocketmq.remoting.netty.NettyRequestProcessor;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
+import static org.apache.rocketmq.remoting.protocol.RemotingCommand.buildErrorResponse;
+
public class ConsumerManageProcessor extends AsyncNettyRequestProcessor implements NettyRequestProcessor {
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
@@ -111,6 +119,7 @@ public class ConsumerManageProcessor extends AsyncNettyRequestProcessor implemen
(UpdateConsumerOffsetRequestHeader) request
.decodeCommandCustomHeader(UpdateConsumerOffsetRequestHeader.class);
TopicQueueMappingContext mappingContext = this.brokerController.getTopicQueueMappingManager().buildTopicQueueMappingContext(requestHeader);
+
RemotingCommand rewriteResult = this.brokerController.getTopicQueueMappingManager().rewriteRequestForStaticTopic(requestHeader, mappingContext);
if (rewriteResult != null) {
return rewriteResult;
@@ -122,6 +131,76 @@ public class ConsumerManageProcessor extends AsyncNettyRequestProcessor implemen
return response;
}
+
+ public RemotingCommand rewriteRequestForStaticTopic(QueryConsumerOffsetRequestHeader requestHeader, TopicQueueMappingContext mappingContext) {
+ try {
+ if (mappingContext.getMappingDetail() == null) {
+ return null;
+ }
+ TopicQueueMappingDetail mappingDetail = mappingContext.getMappingDetail();
+ if (!mappingContext.isLeader()) {
+ return buildErrorResponse(ResponseCode.NOT_LEADER_FOR_QUEUE, String.format("%s-%d does not exit in request process of current broker %s", requestHeader.getTopic(), requestHeader.getQueueId(), mappingDetail.getBname()));
+ }
+ if (mappingContext.checkIfAsPhysical()) {
+ //let it go
+ requestHeader.setQueueId(mappingContext.getLeaderItem().getQueueId());
+ return null;
+ }
+ //double read check
+ List<LogicQueueMappingItem> itemList = mappingContext.getMappingItemList();
+ //by default, it is -1
+ long offset = -1;
+ //double read, first from leader, then from second leader
+ for (int i = 1; i <= 2; i++) {
+ if (itemList.size() - i < 0) {
+ break;
+ }
+ LogicQueueMappingItem mappingItem = itemList.get(itemList.size() - i);
+ if (mappingItem.getBname().equals(mappingDetail.getBname())) {
+ offset = this.brokerController.getConsumerOffsetManager().queryOffset(requestHeader.getConsumerGroup(), requestHeader.getTopic(), mappingItem.getQueueId());
+ if (offset >= 0) {
+ break;
+ } else {
+ //not found
+ continue;
+ }
+ } else {
+ //maybe we need to reconstruct an object
+ requestHeader.setBname(mappingItem.getBname());
+ requestHeader.setQueueId(mappingItem.getQueueId());
+ requestHeader.setPhysical(true);
+ RpcRequest rpcRequest = new RpcRequest(RequestCode.QUERY_CONSUMER_OFFSET, requestHeader, null);
+ RpcResponse rpcResponse = this.brokerController.getBrokerOuterAPI().getRpcClient().invoke(rpcRequest, this.brokerController.getBrokerConfig().getForwardTimeout()).get();
+ if (rpcResponse.getException() != null) {
+ throw rpcResponse.getException();
+ }
+ if (rpcResponse.getCode() == ResponseCode.SUCCESS) {
+ offset = ((QueryConsumerOffsetResponseHeader) rpcResponse.getHeader()).getOffset();
+ } else if (rpcResponse.getCode() == ResponseCode.PULL_NOT_FOUND){
+ continue;
+ } else {
+ //this should not happen
+ throw new RuntimeException("Unknown response code " + rpcResponse.getCode());
+ }
+ }
+ }
+ final RemotingCommand response = RemotingCommand.createResponseCommand(QueryConsumerOffsetResponseHeader.class);
+ final QueryConsumerOffsetResponseHeader responseHeader = (QueryConsumerOffsetResponseHeader) response.readCustomHeader();
+ if (offset >= 0) {
+ responseHeader.setOffset(offset);
+ response.setCode(ResponseCode.SUCCESS);
+ response.setRemark(null);
+ } else {
+ response.setCode(ResponseCode.QUERY_NOT_FOUND);
+ response.setRemark("Not found, maybe this group consumer boot first");
+ }
+ return response;
+ } catch (Throwable t) {
+ t.printStackTrace();
+ return buildErrorResponse(ResponseCode.SYSTEM_ERROR, t.getMessage());
+ }
+ }
+
private RemotingCommand queryConsumerOffset(ChannelHandlerContext ctx, RemotingCommand request)
throws RemotingCommandException {
final RemotingCommand response =
@@ -132,8 +211,9 @@ public class ConsumerManageProcessor extends AsyncNettyRequestProcessor implemen
(QueryConsumerOffsetRequestHeader) request
.decodeCommandCustomHeader(QueryConsumerOffsetRequestHeader.class);
+
TopicQueueMappingContext mappingContext = this.brokerController.getTopicQueueMappingManager().buildTopicQueueMappingContext(requestHeader);
- RemotingCommand rewriteResult = this.brokerController.getTopicQueueMappingManager().rewriteRequestForStaticTopic(requestHeader, mappingContext);
+ RemotingCommand rewriteResult = rewriteRequestForStaticTopic(requestHeader, mappingContext);
if (rewriteResult != null) {
return rewriteResult;
}
@@ -152,8 +232,7 @@ public class ConsumerManageProcessor extends AsyncNettyRequestProcessor implemen
requestHeader.getQueueId());
if (minOffset <= 0
&& !this.brokerController.getMessageStore().checkInDiskByConsumeOffset(
- requestHeader.getTopic(), requestHeader.getQueueId(), 0)
- && mappingContext.checkIfAsPhysical()) {
+ requestHeader.getTopic(), requestHeader.getQueueId(), 0)) {
responseHeader.setOffset(0L);
response.setCode(ResponseCode.SUCCESS);
response.setRemark(null);
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicQueueMappingManager.java b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicQueueMappingManager.java
index 1dd9cbf..9be3717 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicQueueMappingManager.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicQueueMappingManager.java
@@ -196,9 +196,8 @@ public class TopicQueueMappingManager extends ConfigManager {
return new TopicQueueMappingContext(requestHeader.getTopic(), globalId, mappingDetail, null, null);
}
- List<LogicQueueMappingItem> mappingItemList = null;
+ List<LogicQueueMappingItem> mappingItemList = TopicQueueMappingDetail.getMappingInfo(mappingDetail, globalId);
LogicQueueMappingItem leaderItem = null;
- mappingItemList = TopicQueueMappingDetail.getMappingInfo(mappingDetail, globalId);
if (mappingItemList != null
&& mappingItemList.size() > 0) {
leaderItem = mappingItemList.get(mappingItemList.size() - 1);
diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/store/RemoteBrokerOffsetStore.java b/client/src/main/java/org/apache/rocketmq/client/consumer/store/RemoteBrokerOffsetStore.java
index 7364856..91d12a0 100644
--- a/client/src/main/java/org/apache/rocketmq/client/consumer/store/RemoteBrokerOffsetStore.java
+++ b/client/src/main/java/org/apache/rocketmq/client/consumer/store/RemoteBrokerOffsetStore.java
@@ -25,11 +25,13 @@ import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.client.exception.OffsetNotFoundException;
import org.apache.rocketmq.client.impl.FindBrokerResult;
import org.apache.rocketmq.client.impl.factory.MQClientInstance;
import org.apache.rocketmq.client.log.ClientLogger;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.UtilAll;
+import org.apache.rocketmq.common.protocol.ResponseCode;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.protocol.header.QueryConsumerOffsetRequestHeader;
@@ -94,7 +96,7 @@ public class RemoteBrokerOffsetStore implements OffsetStore {
return brokerOffset;
}
// No offset in broker
- catch (MQBrokerException e) {
+ catch (OffsetNotFoundException e) {
return -1;
}
//Other exceptions
@@ -108,7 +110,7 @@ public class RemoteBrokerOffsetStore implements OffsetStore {
}
}
- return -1;
+ return -3;
}
@Override
diff --git a/client/src/main/java/org/apache/rocketmq/client/exception/OffsetNotFoundException.java b/client/src/main/java/org/apache/rocketmq/client/exception/OffsetNotFoundException.java
new file mode 100644
index 0000000..c3d275f
--- /dev/null
+++ b/client/src/main/java/org/apache/rocketmq/client/exception/OffsetNotFoundException.java
@@ -0,0 +1,15 @@
+package org.apache.rocketmq.client.exception;
+
+public class OffsetNotFoundException extends MQBrokerException {
+
+ public OffsetNotFoundException() {
+ }
+
+ public OffsetNotFoundException(int responseCode, String errorMessage) {
+ super(responseCode, errorMessage);
+ }
+
+ public OffsetNotFoundException(int responseCode, String errorMessage, String brokerAddr) {
+ super(responseCode, errorMessage, brokerAddr);
+ }
+}
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
index 4feb225..7538866 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
@@ -30,6 +30,7 @@ import org.apache.rocketmq.client.consumer.PullStatus;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.exception.MQRedirectException;
+import org.apache.rocketmq.client.exception.OffsetNotFoundException;
import org.apache.rocketmq.client.hook.SendMessageContext;
import org.apache.rocketmq.client.impl.consumer.PullResultExt;
import org.apache.rocketmq.client.impl.factory.MQClientInstance;
@@ -1232,9 +1233,11 @@ public class MQClientAPIImpl {
case ResponseCode.SUCCESS: {
QueryConsumerOffsetResponseHeader responseHeader =
(QueryConsumerOffsetResponseHeader) response.decodeCommandCustomHeader(QueryConsumerOffsetResponseHeader.class);
-
return responseHeader.getOffset();
}
+ case ResponseCode.PULL_NOT_FOUND:{
+ throw new OffsetNotFoundException(response.getCode(), response.getRemark(), addr);
+ }
default:
break;
}
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalancePushImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalancePushImpl.java
index 09d1521..d28a046 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalancePushImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalancePushImpl.java
@@ -29,6 +29,7 @@ import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.constant.ConsumeInitMode;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.common.protocol.ResponseCode;
import org.apache.rocketmq.common.protocol.heartbeat.ConsumeType;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
diff --git a/common/src/main/java/org/apache/rocketmq/common/rpc/RpcClientImpl.java b/common/src/main/java/org/apache/rocketmq/common/rpc/RpcClientImpl.java
index 4e00bc3..713bbf9 100644
--- a/common/src/main/java/org/apache/rocketmq/common/rpc/RpcClientImpl.java
+++ b/common/src/main/java/org/apache/rocketmq/common/rpc/RpcClientImpl.java
@@ -9,6 +9,7 @@ import org.apache.rocketmq.common.protocol.ResponseCode;
import org.apache.rocketmq.common.protocol.header.GetEarliestMsgStoretimeResponseHeader;
import org.apache.rocketmq.common.protocol.header.GetMinOffsetResponseHeader;
import org.apache.rocketmq.common.protocol.header.PullMessageResponseHeader;
+import org.apache.rocketmq.common.protocol.header.QueryConsumerOffsetResponseHeader;
import org.apache.rocketmq.common.protocol.header.SearchOffsetResponseHeader;
import org.apache.rocketmq.remoting.InvokeCallback;
import org.apache.rocketmq.remoting.RemotingClient;
@@ -75,6 +76,9 @@ public class RpcClientImpl implements RpcClient {
case RequestCode.GET_EARLIEST_MSG_STORETIME:
rpcResponsePromise = handleGetEarliestMsgStoretime(addr, request, timeoutMs);
break;
+ case RequestCode.QUERY_CONSUMER_OFFSET:
+ rpcResponsePromise = handleQueryConsumerOffset(addr, request, timeoutMs);
+ break;
default:
throw new RpcException(ResponseCode.REQUEST_CODE_NOT_SUPPORTED, "Unknown request code " + request.getCode());
}
@@ -176,6 +180,31 @@ public class RpcClientImpl implements RpcClient {
return rpcResponsePromise;
}
+
+
+ public Promise<RpcResponse> handleQueryConsumerOffset(String addr, RpcRequest rpcRequest, long timeoutMillis) throws Exception {
+ final Promise<RpcResponse> rpcResponsePromise = createResponseFuture();
+
+ RemotingCommand requestCommand = RpcClientUtils.createCommandForRpcRequest(rpcRequest);
+ RemotingCommand responseCommand = this.remotingClient.invokeSync(addr, requestCommand, timeoutMillis);
+ assert responseCommand != null;
+ switch (responseCommand.getCode()) {
+ case ResponseCode.SUCCESS: {
+ QueryConsumerOffsetResponseHeader responseHeader =
+ (QueryConsumerOffsetResponseHeader) responseCommand.decodeCommandCustomHeader(QueryConsumerOffsetResponseHeader.class);
+ rpcResponsePromise.setSuccess(new RpcResponse(responseCommand.getCode(), responseHeader, responseCommand.getBody()));
+ break;
+ }
+ case ResponseCode.QUERY_NOT_FOUND: {
+ rpcResponsePromise.setSuccess(new RpcResponse(responseCommand.getCode(), null, null));
+ }
+ default:{
+ rpcResponsePromise.setSuccess(new RpcResponse(new RpcException(responseCommand.getCode(), "unknown remote error")));
+ }
+ }
+ return rpcResponsePromise;
+ }
+
public Promise<RpcResponse> handleGetMinOffset(String addr, RpcRequest rpcRequest, long timeoutMillis) throws Exception {
final Promise<RpcResponse> rpcResponsePromise = createResponseFuture();
diff --git a/test/src/test/java/org/apache/rocketmq/test/smoke/StaticTopicIT.java b/test/src/test/java/org/apache/rocketmq/test/smoke/StaticTopicIT.java
index abe1eee..c1cc60b 100644
--- a/test/src/test/java/org/apache/rocketmq/test/smoke/StaticTopicIT.java
+++ b/test/src/test/java/org/apache/rocketmq/test/smoke/StaticTopicIT.java
@@ -28,6 +28,7 @@ import org.junit.Test;
import sun.jvm.hotspot.runtime.aarch64.AARCH64CurrentFrameGuess;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
@@ -144,30 +145,123 @@ public class StaticTopicIT extends BaseConf {
assertThat(VerifyUtils.getFilterdMessage(producer.getAllMsgBody(),
consumer.getListener().getAllMsgBody()))
.containsExactlyElementsIn(producer.getAllMsgBody());
+ Map<Integer, List<MessageExt>> messagesByQueue = computeMessageByQueue(consumer.getListener().getAllOriginMsg());
+ Assert.assertEquals(queueNum, messagesByQueue.size());
+ for (int i = 0; i < queueNum; i++) {
+ List<MessageExt> messageExts = messagesByQueue.get(i);
+ Assert.assertEquals(msgEachQueue, messageExts.size());
+ for (int j = 0; j < msgEachQueue; j++) {
+ Assert.assertEquals(j, messageExts.get(j).getQueueOffset());
+ }
+ }
+ }
+
+
+ private Map<Integer, List<MessageExt>> computeMessageByQueue(Collection<Object> msgs) {
Map<Integer, List<MessageExt>> messagesByQueue = new HashMap<>();
- for (Object object : consumer.getListener().getAllOriginMsg()) {
+ for (Object object : msgs) {
MessageExt messageExt = (MessageExt) object;
if (!messagesByQueue.containsKey(messageExt.getQueueId())) {
messagesByQueue.put(messageExt.getQueueId(), new ArrayList<>());
}
messagesByQueue.get(messageExt.getQueueId()).add(messageExt);
}
- Assert.assertEquals(queueNum, messagesByQueue.size());
- for (int i = 0; i < queueNum; i++) {
- List<MessageExt> messageExts = messagesByQueue.get(i);
- Assert.assertEquals(msgEachQueue, messageExts.size());
- Collections.sort(messageExts, new Comparator<MessageExt>() {
+ for (List<MessageExt> msgEachQueue: messagesByQueue.values()) {
+ Collections.sort(msgEachQueue, new Comparator<MessageExt>() {
@Override
public int compare(MessageExt o1, MessageExt o2) {
return (int) (o1.getQueueOffset() - o2.getQueueOffset());
}
});
- for (int j = 0; j < msgEachQueue; j++) {
- Assert.assertEquals(j, messageExts.get(j).getQueueOffset());
- }
}
+ return messagesByQueue;
}
+ @Test
+ public void testDoubleReadCheck() throws Exception {
+ String topic = "static" + MQRandomUtils.getRandomTopic();
+ String group = initConsumerGroup();
+ RMQNormalProducer producer = getProducer(nsAddr, topic);
+
+ RMQNormalConsumer consumer = getConsumer(nsAddr, group, topic, "*", new RMQNormalListener());
+
+ //System.out.printf("Group:%s\n", consumer.getConsumerGroup());
+ //System.out.printf("Topic:%s\n", topic);
+
+ int queueNum = 10;
+ int msgEachQueue = 100;
+ //create static topic
+ {
+ Set<String> targetBrokers = new HashSet<>();
+ targetBrokers.add(broker1Name);
+ createStaticTopic(topic, queueNum, targetBrokers);
+ }
+ //produce the messages
+ {
+ List<MessageQueue> messageQueueList = producer.getMessageQueue();
+ for(MessageQueue messageQueue: messageQueueList) {
+ producer.send(msgEachQueue, messageQueue);
+ }
+ Assert.assertEquals(0, producer.getSendErrorMsg().size());
+ Assert.assertEquals(msgEachQueue * queueNum, producer.getAllMsgBody().size());
+ }
+
+ consumer.getListener().waitForMessageConsume(producer.getAllMsgBody(), 3000);
+ assertThat(VerifyUtils.getFilterdMessage(producer.getAllMsgBody(),
+ consumer.getListener().getAllMsgBody()))
+ .containsExactlyElementsIn(producer.getAllMsgBody());
+ producer.shutdown();
+ consumer.shutdown();
+
+ //remapping the static topic
+ {
+ Set<String> targetBrokers = new HashSet<>();
+ targetBrokers.add(broker2Name);
+ remappingStaticTopic(topic, targetBrokers);
+
+ }
+ //make the metadata
+ Thread.sleep(500);
+ //System.out.printf("Group:%s\n", consumer.getConsumerGroup());
+
+ {
+ producer = getProducer(nsAddr, topic);
+ //just refresh the metadata
+ defaultMQAdminExt.examineTopicConfigAll(clientMetadata, topic);
+ List<MessageQueue> messageQueueList = producer.getMessageQueue();
+ for(MessageQueue messageQueue: messageQueueList) {
+ producer.send(msgEachQueue, messageQueue);
+ Assert.assertEquals(broker2Name, clientMetadata.getBrokerNameFromMessageQueue(messageQueue));
+ }
+ Assert.assertEquals(0, producer.getSendErrorMsg().size());
+ Assert.assertEquals(msgEachQueue * queueNum, producer.getAllMsgBody().size());
+ for(MessageQueue messageQueue: messageQueueList) {
+ Assert.assertEquals(0, defaultMQAdminExt.minOffset(messageQueue));
+ Assert.assertEquals(msgEachQueue + TopicQueueMappingUtils.DEFAULT_BLOCK_SEQ_SIZE, defaultMQAdminExt.maxOffset(messageQueue));
+ }
+ //leave the time to build the cq
+ Thread.sleep(100);
+ }
+ {
+ consumer = getConsumer(nsAddr, group, topic, "*", new RMQNormalListener());
+ consumer.getListener().waitForMessageConsume(producer.getAllMsgBody(), 6000);
+ //System.out.printf("Consume %d\n", consumer.getListener().getAllMsgBody().size());
+ assertThat(VerifyUtils.getFilterdMessage(producer.getAllMsgBody(),
+ consumer.getListener().getAllMsgBody()))
+ .containsExactlyElementsIn(producer.getAllMsgBody());
+
+ Map<Integer, List<MessageExt>> messagesByQueue = computeMessageByQueue(consumer.getListener().getAllOriginMsg());
+
+ Assert.assertEquals(queueNum, messagesByQueue.size());
+ for (int i = 0; i < queueNum; i++) {
+ List<MessageExt> messageExts = messagesByQueue.get(i);
+ Assert.assertEquals(msgEachQueue, messageExts.size());
+ for (int j = 0; j < msgEachQueue; j++) {
+ Assert.assertEquals(j + TopicQueueMappingUtils.DEFAULT_BLOCK_SEQ_SIZE, messageExts.get(j).getQueueOffset());
+ }
+ }
+ }
+ }
@Test
public void testRemappingProduceConsumeStaticTopic() throws Exception {
@@ -175,6 +269,7 @@ public class StaticTopicIT extends BaseConf {
RMQNormalProducer producer = getProducer(nsAddr, topic);
RMQNormalConsumer consumer = getConsumer(nsAddr, topic, "*", new RMQNormalListener());
+
int queueNum = 10;
int msgEachQueue = 100;
//create static topic
@@ -254,24 +349,11 @@ public class StaticTopicIT extends BaseConf {
assertThat(VerifyUtils.getFilterdMessage(producer.getAllMsgBody(),
consumer.getListener().getAllMsgBody()))
.containsExactlyElementsIn(producer.getAllMsgBody());
- Map<Integer, List<MessageExt>> messagesByQueue = new HashMap<>();
- for (Object object : consumer.getListener().getAllOriginMsg()) {
- MessageExt messageExt = (MessageExt) object;
- if (!messagesByQueue.containsKey(messageExt.getQueueId())) {
- messagesByQueue.put(messageExt.getQueueId(), new ArrayList<>());
- }
- messagesByQueue.get(messageExt.getQueueId()).add(messageExt);
- }
+ Map<Integer, List<MessageExt>> messagesByQueue = computeMessageByQueue(consumer.getListener().getAllOriginMsg());
Assert.assertEquals(queueNum, messagesByQueue.size());
for (int i = 0; i < queueNum; i++) {
List<MessageExt> messageExts = messagesByQueue.get(i);
Assert.assertEquals(msgEachQueue * 2, messageExts.size());
- Collections.sort(messageExts, new Comparator<MessageExt>() {
- @Override
- public int compare(MessageExt o1, MessageExt o2) {
- return (int) (o1.getQueueOffset() - o2.getQueueOffset());
- }
- });
for (int j = 0; j < msgEachQueue; j++) {
Assert.assertEquals(j, messageExts.get(j).getQueueOffset());
}