You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by ji...@apache.org on 2022/02/08 02:43:51 UTC

[rocketmq] 08/17: Convert the consumer offset too

This is an automated email from the ASF dual-hosted git repository.

jinrongtong pushed a commit to branch 5.0.0-alpha
in repository https://gitbox.apache.org/repos/asf/rocketmq.git

commit b3f9fbd3a72b2bdf54fed315b71f8361b300e107
Author: dongeforever <do...@apache.org>
AuthorDate: Wed Jan 5 17:01:11 2022 +0800

    Convert the consumer offset too
---
 .../broker/processor/AdminBrokerProcessor.java     | 21 +++----
 .../broker/processor/ConsumerManageProcessor.java  | 68 +++++++++++++++++++++-
 .../broker/topic/TopicQueueMappingManager.java     |  2 +-
 .../apache/rocketmq/common/rpc/RpcClientImpl.java  | 25 ++++++++
 ..._Topic_Logic_Queue_\350\256\276\350\256\241.md" |  9 ++-
 5 files changed, 106 insertions(+), 19 deletions(-)

diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
index 6505263..568a728 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
@@ -1176,8 +1176,6 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements
                 continue;
             }
 
-            TopicQueueMappingDetail mappingDetail = this.brokerController.getTopicQueueMappingManager().getTopicQueueMapping(topic);
-
             {
                 SubscriptionData findSubscriptionData =
                     this.brokerController.getConsumerManager().findSubscriptionData(requestHeader.getConsumerGroup(), topic);
@@ -1208,26 +1206,21 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements
                 // the consumerOffset cannot be zero for static topic because of the "double read check" strategy
                 // just remain the logic for dynamic topic
                 // maybe we should remove it in the future
-                if (mappingDetail == null) {
-                    if (consumerOffset < 0)
-                        consumerOffset = 0;
-                }
+                if (consumerOffset < 0)
+                    consumerOffset = 0;
 
                 offsetWrapper.setBrokerOffset(brokerOffset);
                 offsetWrapper.setConsumerOffset(consumerOffset);
 
                 // the consumeOffset is not in this broker for static topic
                 // and may get the wrong result
-                if (mappingDetail == null) {
-                    long timeOffset = consumerOffset - 1;
-                    if (timeOffset >= 0) {
-                        long lastTimestamp = this.brokerController.getMessageStore().getMessageStoreTimeStamp(topic, i, timeOffset);
-                        if (lastTimestamp > 0) {
-                            offsetWrapper.setLastTimestamp(lastTimestamp);
-                        }
+                long timeOffset = consumerOffset - 1;
+                if (timeOffset >= 0) {
+                    long lastTimestamp = this.brokerController.getMessageStore().getMessageStoreTimeStamp(topic, i, timeOffset);
+                    if (lastTimestamp > 0) {
+                        offsetWrapper.setLastTimestamp(lastTimestamp);
                     }
                 }
-
                 consumeStats.getOffsetTable().put(mq, offsetWrapper);
             }
 
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/ConsumerManageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/ConsumerManageProcessor.java
index 04e705b..a266442 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/ConsumerManageProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/ConsumerManageProcessor.java
@@ -29,11 +29,15 @@ import org.apache.rocketmq.common.protocol.header.QueryConsumerOffsetRequestHead
 import org.apache.rocketmq.common.protocol.header.QueryConsumerOffsetResponseHeader;
 import org.apache.rocketmq.common.protocol.header.UpdateConsumerOffsetRequestHeader;
 import org.apache.rocketmq.common.protocol.header.UpdateConsumerOffsetResponseHeader;
+import org.apache.rocketmq.common.rpc.RpcClientUtils;
 import org.apache.rocketmq.common.rpc.RpcRequest;
 import org.apache.rocketmq.common.rpc.RpcResponse;
+import org.apache.rocketmq.common.rpc.TopicQueueRequestHeader;
 import org.apache.rocketmq.common.statictopic.LogicQueueMappingItem;
 import org.apache.rocketmq.common.statictopic.TopicQueueMappingContext;
 import org.apache.rocketmq.common.statictopic.TopicQueueMappingDetail;
+import org.apache.rocketmq.common.statictopic.TopicQueueMappingUtils;
+import org.apache.rocketmq.common.sysflag.PullSysFlag;
 import org.apache.rocketmq.logging.InternalLogger;
 import org.apache.rocketmq.logging.InternalLoggerFactory;
 import org.apache.rocketmq.remoting.common.RemotingHelper;
@@ -110,6 +114,37 @@ public class ConsumerManageProcessor extends AsyncNettyRequestProcessor implemen
         return response;
     }
 
+    public  RemotingCommand rewriteRequestForStaticTopic(final UpdateConsumerOffsetRequestHeader requestHeader, final TopicQueueMappingContext mappingContext) {
+        try {
+            if (mappingContext.getMappingDetail() == null) {
+                return null;
+            }
+            TopicQueueMappingDetail mappingDetail = mappingContext.getMappingDetail();
+            if (!mappingContext.isLeader()) {
+                return buildErrorResponse(ResponseCode.NOT_LEADER_FOR_QUEUE, String.format("%s-%d does not exit in request process of current broker %s", requestHeader.getTopic(), requestHeader.getQueueId(), mappingDetail.getBname()));
+            }
+            Long globalOffset = requestHeader.getCommitOffset();
+            LogicQueueMappingItem mappingItem = TopicQueueMappingUtils.findLogicQueueMappingItem(mappingContext.getMappingItemList(), globalOffset, true);
+            requestHeader.setQueueId(mappingItem.getQueueId());
+            requestHeader.setLo(false);
+            requestHeader.setBname(mappingItem.getBname());
+            requestHeader.setCommitOffset(mappingItem.computePhysicalQueueOffset(globalOffset));
+            //leader, let it go, do not need to rewrite the response
+            if (mappingDetail.getBname().equals(mappingItem.getBname())) {
+                return null;
+            }
+            RpcRequest rpcRequest = new RpcRequest(RequestCode.UPDATE_CONSUMER_OFFSET, requestHeader, null);
+            RpcResponse rpcResponse = this.brokerController.getBrokerOuterAPI().getRpcClient().invoke(rpcRequest, this.brokerController.getBrokerConfig().getForwardTimeout()).get();
+            if (rpcResponse.getException() != null) {
+                throw rpcResponse.getException();
+            }
+            return RpcClientUtils.createCommandForRpcResponse(rpcResponse);
+        } catch (Throwable t) {
+            return buildErrorResponse(ResponseCode.SYSTEM_ERROR, t.getMessage());
+        }
+    }
+
+
     private RemotingCommand updateConsumerOffset(ChannelHandlerContext ctx, RemotingCommand request)
         throws RemotingCommandException {
         final RemotingCommand response =
@@ -119,7 +154,7 @@ public class ConsumerManageProcessor extends AsyncNettyRequestProcessor implemen
                 .decodeCommandCustomHeader(UpdateConsumerOffsetRequestHeader.class);
         TopicQueueMappingContext mappingContext = this.brokerController.getTopicQueueMappingManager().buildTopicQueueMappingContext(requestHeader);
 
-        RemotingCommand rewriteResult  = this.brokerController.getTopicQueueMappingManager().rewriteRequestForStaticTopic(requestHeader, mappingContext);
+        RemotingCommand rewriteResult  =  rewriteRequestForStaticTopic(requestHeader, mappingContext);
         if (rewriteResult != null) {
             return rewriteResult;
         }
@@ -144,6 +179,7 @@ public class ConsumerManageProcessor extends AsyncNettyRequestProcessor implemen
             if (mappingItemList.size() == 1
                     &&  mappingItemList.get(0).getLogicOffset() == 0) {
                 //as physical, just let it go
+                mappingContext.setCurrentItem(mappingItemList.get(0));
                 requestHeader.setQueueId(mappingContext.getLeaderItem().getQueueId());
                 return null;
             }
@@ -154,6 +190,7 @@ public class ConsumerManageProcessor extends AsyncNettyRequestProcessor implemen
             //double read, first from leader, then from second leader
             for (int i = itemList.size() - 1; i >= 0; i--) {
                 LogicQueueMappingItem mappingItem = itemList.get(i);
+                mappingContext.setCurrentItem(mappingItem);
                 if (mappingItem.getBname().equals(mappingDetail.getBname())) {
                     offset = this.brokerController.getConsumerOffsetManager().queryOffset(requestHeader.getConsumerGroup(), requestHeader.getTopic(), mappingItem.getQueueId());
                     if (offset >= 0) {
@@ -194,9 +231,31 @@ public class ConsumerManageProcessor extends AsyncNettyRequestProcessor implemen
                 response.setCode(ResponseCode.QUERY_NOT_FOUND);
                 response.setRemark("Not found, maybe this group consumer boot first");
             }
+            RemotingCommand rewriteResponseResult = rewriteResponseForStaticTopic(requestHeader, responseHeader, mappingContext, response.getCode());
+            if (rewriteResponseResult != null) {
+                return rewriteResponseResult;
+            }
             return response;
         } catch (Throwable t) {
-            t.printStackTrace();
+            return buildErrorResponse(ResponseCode.SYSTEM_ERROR, t.getMessage());
+        }
+    }
+
+
+    public  RemotingCommand rewriteResponseForStaticTopic(final QueryConsumerOffsetRequestHeader requestHeader, final QueryConsumerOffsetResponseHeader responseHeader,
+        final TopicQueueMappingContext mappingContext, final int code) {
+        try {
+            if (mappingContext.getMappingDetail() == null) {
+                return null;
+            }
+            if (code != ResponseCode.SUCCESS) {
+                return null;
+            }
+            LogicQueueMappingItem item = mappingContext.getCurrentItem();
+            responseHeader.setOffset(item.computeStaticQueueOffsetStrictly(responseHeader.getOffset()));
+            //no need to construct new object
+            return null;
+        } catch (Throwable t) {
             return buildErrorResponse(ResponseCode.SYSTEM_ERROR, t.getMessage());
         }
     }
@@ -245,6 +304,11 @@ public class ConsumerManageProcessor extends AsyncNettyRequestProcessor implemen
             }
         }
 
+        RemotingCommand rewriteResponseResult = rewriteResponseForStaticTopic(requestHeader, responseHeader, mappingContext, response.getCode());
+        if (rewriteResponseResult != null) {
+            return rewriteResponseResult;
+        }
+
         return response;
     }
 }
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicQueueMappingManager.java b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicQueueMappingManager.java
index 56fc792..dd7e708 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicQueueMappingManager.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicQueueMappingManager.java
@@ -189,7 +189,7 @@ public class TopicQueueMappingManager extends ConfigManager {
 
     //Do not return a null context
     public TopicQueueMappingContext buildTopicQueueMappingContext(TopicRequestHeader requestHeader, boolean selectOneWhenMiss) {
-        //should disable logic queue explicitly, otherwise the old client may cause dirty data to newly created static topic
+        // if lo is set to false explicitly, it maybe the forwarded request
         if (requestHeader.getLo() != null
                 && Boolean.FALSE.equals(requestHeader.getLo())) {
             return new TopicQueueMappingContext(requestHeader.getTopic(), null, null, null, null);
diff --git a/common/src/main/java/org/apache/rocketmq/common/rpc/RpcClientImpl.java b/common/src/main/java/org/apache/rocketmq/common/rpc/RpcClientImpl.java
index 62e6ec1..3782ab0 100644
--- a/common/src/main/java/org/apache/rocketmq/common/rpc/RpcClientImpl.java
+++ b/common/src/main/java/org/apache/rocketmq/common/rpc/RpcClientImpl.java
@@ -28,6 +28,8 @@ import org.apache.rocketmq.common.protocol.header.GetMinOffsetResponseHeader;
 import org.apache.rocketmq.common.protocol.header.PullMessageResponseHeader;
 import org.apache.rocketmq.common.protocol.header.QueryConsumerOffsetResponseHeader;
 import org.apache.rocketmq.common.protocol.header.SearchOffsetResponseHeader;
+import org.apache.rocketmq.common.protocol.header.UpdateConsumerOffsetRequestHeader;
+import org.apache.rocketmq.common.protocol.header.UpdateConsumerOffsetResponseHeader;
 import org.apache.rocketmq.common.statictopic.TopicConfigAndQueueMapping;
 import org.apache.rocketmq.remoting.InvokeCallback;
 import org.apache.rocketmq.remoting.RemotingClient;
@@ -101,6 +103,9 @@ public class RpcClientImpl implements RpcClient {
                 case RequestCode.QUERY_CONSUMER_OFFSET:
                     rpcResponsePromise = handleQueryConsumerOffset(addr, request, timeoutMs);
                     break;
+                case RequestCode.UPDATE_CONSUMER_OFFSET:
+                    rpcResponsePromise = handleUpdateConsumerOffset(addr, request, timeoutMs);
+                    break;
                 case RequestCode.GET_TOPIC_STATS_INFO:
                     rpcResponsePromise = handleCommonBodyRequest(addr, request, timeoutMs, TopicStatsTable.class);
                     break;
@@ -234,6 +239,26 @@ public class RpcClientImpl implements RpcClient {
         return rpcResponsePromise;
     }
 
+    public Promise<RpcResponse> handleUpdateConsumerOffset(String addr, RpcRequest rpcRequest, long timeoutMillis) throws Exception {
+        final Promise<RpcResponse> rpcResponsePromise = createResponseFuture();
+
+        RemotingCommand requestCommand = RpcClientUtils.createCommandForRpcRequest(rpcRequest);
+        RemotingCommand responseCommand = this.remotingClient.invokeSync(addr, requestCommand, timeoutMillis);
+        assert responseCommand != null;
+        switch (responseCommand.getCode()) {
+            case ResponseCode.SUCCESS: {
+                UpdateConsumerOffsetResponseHeader responseHeader =
+                    (UpdateConsumerOffsetResponseHeader) responseCommand.decodeCommandCustomHeader(UpdateConsumerOffsetResponseHeader.class);
+                rpcResponsePromise.setSuccess(new RpcResponse(responseCommand.getCode(), responseHeader, responseCommand.getBody()));
+                break;
+            }
+            default: {
+                rpcResponsePromise.setSuccess(new RpcResponse(new RpcException(responseCommand.getCode(), "unknown remote error")));
+            }
+        }
+        return rpcResponsePromise;
+    }
+
     public Promise<RpcResponse> handleCommonBodyRequest(final String addr, RpcRequest rpcRequest, long timeoutMillis, Class bodyClass) throws Exception {
         final Promise<RpcResponse> rpcResponsePromise = createResponseFuture();
         RemotingCommand requestCommand = RpcClientUtils.createCommandForRpcRequest(rpcRequest);
diff --git "a/docs/cn/statictopic/RocketMQ_Static_Topic_Logic_Queue_\350\256\276\350\256\241.md" "b/docs/cn/statictopic/RocketMQ_Static_Topic_Logic_Queue_\350\256\276\350\256\241.md"
index c06d83f..ac1cc6b 100644
--- "a/docs/cn/statictopic/RocketMQ_Static_Topic_Logic_Queue_\350\256\276\350\256\241.md"
+++ "b/docs/cn/statictopic/RocketMQ_Static_Topic_Logic_Queue_\350\256\276\350\256\241.md"
@@ -7,6 +7,8 @@
 | 2021-12-03 | 增加代码走读的说明| dongforever |
 | 2021-12-10 | 引入Scope概念,保留『多集群动态零耦合』的集群设计模型 | dongforever |
 | 2021-12-23 | 梳理待完成事项;讨论Admin接口的适配方式 | dongforever |
+| 2021-01-05 | Offset存储改成『转换制』,以更好适配原有逻辑 | dongforever |
+
 
 
 
@@ -342,8 +344,9 @@ UpdateStaticTopic 命令会自动计算预期的分布情况,包括但不限
 
 
 #### consumerOffsets 系列
-Offset的存储,无需转换,直接存储在 LogicQueue 所对应的最新 PhysicalQueue 中。
-读取时,采取『Double-Read-Check』机制。
+Offset的存储,进行转换,存储在对应PhysicalQueue 所在的 Broker上面。  
+读取时,采取『Double-Read-Check』机制,并进行转换。  
+这样可以最大程度与 PhysicalQueue 的相关逻辑进行适配,比如 ConsumerProgress 可以看到『最近拉取时间』。 
 
 #### Client
 
@@ -454,6 +457,8 @@ User 接口,使用范围广泛如多语言等,应该尽可能简单,把适
 #### 阻止Pop模式、事务消息、定时消息使用 LogicQueue
 不兼容 事务消息和定时消息。  
 LogicQueue 当前不支持Pop模式消费。
+#### Nameserver 相关生命周期完善
+目前没有处理Nameserver中Mapping数据的生命周期
 #### ConsumeQueue 的 correctMinOffset 逻辑存在缺陷
 可能导致 LogicQueue 无法清除已经过期的 MappingItem。
 #### getOffsetInQueueByTime 语义有缺陷