You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by ji...@apache.org on 2022/02/08 02:43:51 UTC
[rocketmq] 08/17: Convert the consumer offset too
This is an automated email from the ASF dual-hosted git repository.
jinrongtong pushed a commit to branch 5.0.0-alpha
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
commit b3f9fbd3a72b2bdf54fed315b71f8361b300e107
Author: dongeforever <do...@apache.org>
AuthorDate: Wed Jan 5 17:01:11 2022 +0800
Convert the consumer offset too
---
.../broker/processor/AdminBrokerProcessor.java | 21 +++----
.../broker/processor/ConsumerManageProcessor.java | 68 +++++++++++++++++++++-
.../broker/topic/TopicQueueMappingManager.java | 2 +-
.../apache/rocketmq/common/rpc/RpcClientImpl.java | 25 ++++++++
..._Topic_Logic_Queue_\350\256\276\350\256\241.md" | 9 ++-
5 files changed, 106 insertions(+), 19 deletions(-)
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
index 6505263..568a728 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
@@ -1176,8 +1176,6 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements
continue;
}
- TopicQueueMappingDetail mappingDetail = this.brokerController.getTopicQueueMappingManager().getTopicQueueMapping(topic);
-
{
SubscriptionData findSubscriptionData =
this.brokerController.getConsumerManager().findSubscriptionData(requestHeader.getConsumerGroup(), topic);
@@ -1208,26 +1206,21 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements
// the consumerOffset cannot be zero for static topic because of the "double read check" strategy
// just remain the logic for dynamic topic
// maybe we should remove it in the future
- if (mappingDetail == null) {
- if (consumerOffset < 0)
- consumerOffset = 0;
- }
+ if (consumerOffset < 0)
+ consumerOffset = 0;
offsetWrapper.setBrokerOffset(brokerOffset);
offsetWrapper.setConsumerOffset(consumerOffset);
// the consumeOffset is not in this broker for static topic
// and may get the wrong result
- if (mappingDetail == null) {
- long timeOffset = consumerOffset - 1;
- if (timeOffset >= 0) {
- long lastTimestamp = this.brokerController.getMessageStore().getMessageStoreTimeStamp(topic, i, timeOffset);
- if (lastTimestamp > 0) {
- offsetWrapper.setLastTimestamp(lastTimestamp);
- }
+ long timeOffset = consumerOffset - 1;
+ if (timeOffset >= 0) {
+ long lastTimestamp = this.brokerController.getMessageStore().getMessageStoreTimeStamp(topic, i, timeOffset);
+ if (lastTimestamp > 0) {
+ offsetWrapper.setLastTimestamp(lastTimestamp);
}
}
-
consumeStats.getOffsetTable().put(mq, offsetWrapper);
}
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/ConsumerManageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/ConsumerManageProcessor.java
index 04e705b..a266442 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/ConsumerManageProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/ConsumerManageProcessor.java
@@ -29,11 +29,15 @@ import org.apache.rocketmq.common.protocol.header.QueryConsumerOffsetRequestHead
import org.apache.rocketmq.common.protocol.header.QueryConsumerOffsetResponseHeader;
import org.apache.rocketmq.common.protocol.header.UpdateConsumerOffsetRequestHeader;
import org.apache.rocketmq.common.protocol.header.UpdateConsumerOffsetResponseHeader;
+import org.apache.rocketmq.common.rpc.RpcClientUtils;
import org.apache.rocketmq.common.rpc.RpcRequest;
import org.apache.rocketmq.common.rpc.RpcResponse;
+import org.apache.rocketmq.common.rpc.TopicQueueRequestHeader;
import org.apache.rocketmq.common.statictopic.LogicQueueMappingItem;
import org.apache.rocketmq.common.statictopic.TopicQueueMappingContext;
import org.apache.rocketmq.common.statictopic.TopicQueueMappingDetail;
+import org.apache.rocketmq.common.statictopic.TopicQueueMappingUtils;
+import org.apache.rocketmq.common.sysflag.PullSysFlag;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.remoting.common.RemotingHelper;
@@ -110,6 +114,37 @@ public class ConsumerManageProcessor extends AsyncNettyRequestProcessor implemen
return response;
}
+ public RemotingCommand rewriteRequestForStaticTopic(final UpdateConsumerOffsetRequestHeader requestHeader, final TopicQueueMappingContext mappingContext) {
+ try {
+ if (mappingContext.getMappingDetail() == null) {
+ return null;
+ }
+ TopicQueueMappingDetail mappingDetail = mappingContext.getMappingDetail();
+ if (!mappingContext.isLeader()) {
+ return buildErrorResponse(ResponseCode.NOT_LEADER_FOR_QUEUE, String.format("%s-%d does not exit in request process of current broker %s", requestHeader.getTopic(), requestHeader.getQueueId(), mappingDetail.getBname()));
+ }
+ Long globalOffset = requestHeader.getCommitOffset();
+ LogicQueueMappingItem mappingItem = TopicQueueMappingUtils.findLogicQueueMappingItem(mappingContext.getMappingItemList(), globalOffset, true);
+ requestHeader.setQueueId(mappingItem.getQueueId());
+ requestHeader.setLo(false);
+ requestHeader.setBname(mappingItem.getBname());
+ requestHeader.setCommitOffset(mappingItem.computePhysicalQueueOffset(globalOffset));
+ //leader, let it go, do not need to rewrite the response
+ if (mappingDetail.getBname().equals(mappingItem.getBname())) {
+ return null;
+ }
+ RpcRequest rpcRequest = new RpcRequest(RequestCode.UPDATE_CONSUMER_OFFSET, requestHeader, null);
+ RpcResponse rpcResponse = this.brokerController.getBrokerOuterAPI().getRpcClient().invoke(rpcRequest, this.brokerController.getBrokerConfig().getForwardTimeout()).get();
+ if (rpcResponse.getException() != null) {
+ throw rpcResponse.getException();
+ }
+ return RpcClientUtils.createCommandForRpcResponse(rpcResponse);
+ } catch (Throwable t) {
+ return buildErrorResponse(ResponseCode.SYSTEM_ERROR, t.getMessage());
+ }
+ }
+
+
private RemotingCommand updateConsumerOffset(ChannelHandlerContext ctx, RemotingCommand request)
throws RemotingCommandException {
final RemotingCommand response =
@@ -119,7 +154,7 @@ public class ConsumerManageProcessor extends AsyncNettyRequestProcessor implemen
.decodeCommandCustomHeader(UpdateConsumerOffsetRequestHeader.class);
TopicQueueMappingContext mappingContext = this.brokerController.getTopicQueueMappingManager().buildTopicQueueMappingContext(requestHeader);
- RemotingCommand rewriteResult = this.brokerController.getTopicQueueMappingManager().rewriteRequestForStaticTopic(requestHeader, mappingContext);
+ RemotingCommand rewriteResult = rewriteRequestForStaticTopic(requestHeader, mappingContext);
if (rewriteResult != null) {
return rewriteResult;
}
@@ -144,6 +179,7 @@ public class ConsumerManageProcessor extends AsyncNettyRequestProcessor implemen
if (mappingItemList.size() == 1
&& mappingItemList.get(0).getLogicOffset() == 0) {
//as physical, just let it go
+ mappingContext.setCurrentItem(mappingItemList.get(0));
requestHeader.setQueueId(mappingContext.getLeaderItem().getQueueId());
return null;
}
@@ -154,6 +190,7 @@ public class ConsumerManageProcessor extends AsyncNettyRequestProcessor implemen
//double read, first from leader, then from second leader
for (int i = itemList.size() - 1; i >= 0; i--) {
LogicQueueMappingItem mappingItem = itemList.get(i);
+ mappingContext.setCurrentItem(mappingItem);
if (mappingItem.getBname().equals(mappingDetail.getBname())) {
offset = this.brokerController.getConsumerOffsetManager().queryOffset(requestHeader.getConsumerGroup(), requestHeader.getTopic(), mappingItem.getQueueId());
if (offset >= 0) {
@@ -194,9 +231,31 @@ public class ConsumerManageProcessor extends AsyncNettyRequestProcessor implemen
response.setCode(ResponseCode.QUERY_NOT_FOUND);
response.setRemark("Not found, maybe this group consumer boot first");
}
+ RemotingCommand rewriteResponseResult = rewriteResponseForStaticTopic(requestHeader, responseHeader, mappingContext, response.getCode());
+ if (rewriteResponseResult != null) {
+ return rewriteResponseResult;
+ }
return response;
} catch (Throwable t) {
- t.printStackTrace();
+ return buildErrorResponse(ResponseCode.SYSTEM_ERROR, t.getMessage());
+ }
+ }
+
+
+ public RemotingCommand rewriteResponseForStaticTopic(final QueryConsumerOffsetRequestHeader requestHeader, final QueryConsumerOffsetResponseHeader responseHeader,
+ final TopicQueueMappingContext mappingContext, final int code) {
+ try {
+ if (mappingContext.getMappingDetail() == null) {
+ return null;
+ }
+ if (code != ResponseCode.SUCCESS) {
+ return null;
+ }
+ LogicQueueMappingItem item = mappingContext.getCurrentItem();
+ responseHeader.setOffset(item.computeStaticQueueOffsetStrictly(responseHeader.getOffset()));
+ //no need to construct new object
+ return null;
+ } catch (Throwable t) {
return buildErrorResponse(ResponseCode.SYSTEM_ERROR, t.getMessage());
}
}
@@ -245,6 +304,11 @@ public class ConsumerManageProcessor extends AsyncNettyRequestProcessor implemen
}
}
+ RemotingCommand rewriteResponseResult = rewriteResponseForStaticTopic(requestHeader, responseHeader, mappingContext, response.getCode());
+ if (rewriteResponseResult != null) {
+ return rewriteResponseResult;
+ }
+
return response;
}
}
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicQueueMappingManager.java b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicQueueMappingManager.java
index 56fc792..dd7e708 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicQueueMappingManager.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicQueueMappingManager.java
@@ -189,7 +189,7 @@ public class TopicQueueMappingManager extends ConfigManager {
//Do not return a null context
public TopicQueueMappingContext buildTopicQueueMappingContext(TopicRequestHeader requestHeader, boolean selectOneWhenMiss) {
- //should disable logic queue explicitly, otherwise the old client may cause dirty data to newly created static topic
+ // if lo is explicitly set to false, it may be a forwarded request
if (requestHeader.getLo() != null
&& Boolean.FALSE.equals(requestHeader.getLo())) {
return new TopicQueueMappingContext(requestHeader.getTopic(), null, null, null, null);
diff --git a/common/src/main/java/org/apache/rocketmq/common/rpc/RpcClientImpl.java b/common/src/main/java/org/apache/rocketmq/common/rpc/RpcClientImpl.java
index 62e6ec1..3782ab0 100644
--- a/common/src/main/java/org/apache/rocketmq/common/rpc/RpcClientImpl.java
+++ b/common/src/main/java/org/apache/rocketmq/common/rpc/RpcClientImpl.java
@@ -28,6 +28,8 @@ import org.apache.rocketmq.common.protocol.header.GetMinOffsetResponseHeader;
import org.apache.rocketmq.common.protocol.header.PullMessageResponseHeader;
import org.apache.rocketmq.common.protocol.header.QueryConsumerOffsetResponseHeader;
import org.apache.rocketmq.common.protocol.header.SearchOffsetResponseHeader;
+import org.apache.rocketmq.common.protocol.header.UpdateConsumerOffsetRequestHeader;
+import org.apache.rocketmq.common.protocol.header.UpdateConsumerOffsetResponseHeader;
import org.apache.rocketmq.common.statictopic.TopicConfigAndQueueMapping;
import org.apache.rocketmq.remoting.InvokeCallback;
import org.apache.rocketmq.remoting.RemotingClient;
@@ -101,6 +103,9 @@ public class RpcClientImpl implements RpcClient {
case RequestCode.QUERY_CONSUMER_OFFSET:
rpcResponsePromise = handleQueryConsumerOffset(addr, request, timeoutMs);
break;
+ case RequestCode.UPDATE_CONSUMER_OFFSET:
+ rpcResponsePromise = handleUpdateConsumerOffset(addr, request, timeoutMs);
+ break;
case RequestCode.GET_TOPIC_STATS_INFO:
rpcResponsePromise = handleCommonBodyRequest(addr, request, timeoutMs, TopicStatsTable.class);
break;
@@ -234,6 +239,26 @@ public class RpcClientImpl implements RpcClient {
return rpcResponsePromise;
}
+ public Promise<RpcResponse> handleUpdateConsumerOffset(String addr, RpcRequest rpcRequest, long timeoutMillis) throws Exception {
+ final Promise<RpcResponse> rpcResponsePromise = createResponseFuture();
+
+ RemotingCommand requestCommand = RpcClientUtils.createCommandForRpcRequest(rpcRequest);
+ RemotingCommand responseCommand = this.remotingClient.invokeSync(addr, requestCommand, timeoutMillis);
+ assert responseCommand != null;
+ switch (responseCommand.getCode()) {
+ case ResponseCode.SUCCESS: {
+ UpdateConsumerOffsetResponseHeader responseHeader =
+ (UpdateConsumerOffsetResponseHeader) responseCommand.decodeCommandCustomHeader(UpdateConsumerOffsetResponseHeader.class);
+ rpcResponsePromise.setSuccess(new RpcResponse(responseCommand.getCode(), responseHeader, responseCommand.getBody()));
+ break;
+ }
+ default: {
+ rpcResponsePromise.setSuccess(new RpcResponse(new RpcException(responseCommand.getCode(), "unknown remote error")));
+ }
+ }
+ return rpcResponsePromise;
+ }
+
public Promise<RpcResponse> handleCommonBodyRequest(final String addr, RpcRequest rpcRequest, long timeoutMillis, Class bodyClass) throws Exception {
final Promise<RpcResponse> rpcResponsePromise = createResponseFuture();
RemotingCommand requestCommand = RpcClientUtils.createCommandForRpcRequest(rpcRequest);
diff --git "a/docs/cn/statictopic/RocketMQ_Static_Topic_Logic_Queue_\350\256\276\350\256\241.md" "b/docs/cn/statictopic/RocketMQ_Static_Topic_Logic_Queue_\350\256\276\350\256\241.md"
index c06d83f..ac1cc6b 100644
--- "a/docs/cn/statictopic/RocketMQ_Static_Topic_Logic_Queue_\350\256\276\350\256\241.md"
+++ "b/docs/cn/statictopic/RocketMQ_Static_Topic_Logic_Queue_\350\256\276\350\256\241.md"
@@ -7,6 +7,8 @@
| 2021-12-03 | 增加代码走读的说明| dongforever |
| 2021-12-10 | 引入Scope概念,保留『多集群动态零耦合』的集群设计模型 | dongforever |
| 2021-12-23 | 梳理待完成事项;讨论Admin接口的适配方式 | dongforever |
+| 2022-01-05 | Offset存储改成『转换制』,以更好适配原有逻辑 | dongforever |
+
@@ -342,8 +344,9 @@ UpdateStaticTopic 命令会自动计算预期的分布情况,包括但不限
#### consumerOffsets 系列
-Offset的存储,无需转换,直接存储在 LogicQueue 所对应的最新 PhysicalQueue 中。
-读取时,采取『Double-Read-Check』机制。
+Offset的存储,进行转换,存储在对应PhysicalQueue 所在的 Broker上面。
+读取时,采取『Double-Read-Check』机制,并进行转换。
+这样可以最大程度与 PhysicalQueue 的相关逻辑进行适配,比如 ConsumerProgress 可以看到『最近拉取时间』。
#### Client
@@ -454,6 +457,8 @@ User 接口,使用范围广泛如多语言等,应该尽可能简单,把适
#### 阻止Pop模式、事务消息、定时消息使用 LogicQueue
不兼容 事务消息和定时消息。
LogicQueue 当前不支持Pop模式消费。
+#### Nameserver 相关生命周期完善
+目前没有处理Nameserver中Mapping数据的生命周期
#### ConsumeQueue 的 correctMinOffset 逻辑存在缺陷
可能导致 LogicQueue 无法清除已经过期的 MappingItem。
#### getOffsetInQueueByTime 语义有缺陷