You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by do...@apache.org on 2022/01/05 09:43:18 UTC

[rocketmq] branch 5.0.0-alpha updated (d5701ae -> 16128db)

This is an automated email from the ASF dual-hosted git repository.

dongeforever pushed a change to branch 5.0.0-alpha
in repository https://gitbox.apache.org/repos/asf/rocketmq.git.


    from d5701ae  [ISSUE #3679] Support topic attributes (#3698)
     new add737f  Convert the consumer offset too
     new ba53f93  Fix test for consumer offset
     new 16128db  Fix check style

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../broker/processor/AdminBrokerProcessor.java     | 15 ++---
 .../broker/processor/ConsumerManageProcessor.java  | 68 +++++++++++++++++++++-
 .../broker/topic/TopicQueueMappingManager.java     |  2 +-
 .../protocol/header/CreateTopicRequestHeader.java  |  2 -
 .../apache/rocketmq/common/rpc/RpcClientImpl.java  | 31 ++++++++--
 .../common/statictopic/TopicQueueMappingOne.java   |  5 +-
 ..._Topic_Logic_Queue_\350\256\276\350\256\241.md" |  9 ++-
 .../rocketmq/test/statictopic/StaticTopicIT.java   |  3 +
 8 files changed, 110 insertions(+), 25 deletions(-)

[rocketmq] 02/03: Fix test for consumer offset

Posted by do...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

dongeforever pushed a commit to branch 5.0.0-alpha
in repository https://gitbox.apache.org/repos/asf/rocketmq.git

commit ba53f9364fbfc69c56bef25f1f771e61bcfc01d0
Author: dongeforever <do...@apache.org>
AuthorDate: Wed Jan 5 17:11:31 2022 +0800

    Fix test for consumer offset
---
 .../apache/rocketmq/broker/processor/AdminBrokerProcessor.java | 10 ++++++----
 .../org/apache/rocketmq/test/statictopic/StaticTopicIT.java    |  3 +++
 2 files changed, 9 insertions(+), 4 deletions(-)

diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
index 568a728..5b8a19f 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
@@ -1176,6 +1176,8 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements
                 continue;
             }
 
+            TopicQueueMappingDetail mappingDetail = this.brokerController.getTopicQueueMappingManager().getTopicQueueMapping(topic);
+
             {
                 SubscriptionData findSubscriptionData =
                     this.brokerController.getConsumerManager().findSubscriptionData(requestHeader.getConsumerGroup(), topic);
@@ -1206,14 +1208,14 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements
                 // the consumerOffset cannot be zero for static topic because of the "double read check" strategy
                 // just remain the logic for dynamic topic
                 // maybe we should remove it in the future
-                if (consumerOffset < 0)
-                    consumerOffset = 0;
+                if (mappingDetail == null) {
+                    if (consumerOffset < 0)
+                        consumerOffset = 0;
+                }
 
                 offsetWrapper.setBrokerOffset(brokerOffset);
                 offsetWrapper.setConsumerOffset(consumerOffset);
 
-                // the consumeOffset is not in this broker for static topic
-                // and may get the wrong result
                 long timeOffset = consumerOffset - 1;
                 if (timeOffset >= 0) {
                     long lastTimestamp = this.brokerController.getMessageStore().getMessageStoreTimeStamp(topic, i, timeOffset);
diff --git a/test/src/test/java/org/apache/rocketmq/test/statictopic/StaticTopicIT.java b/test/src/test/java/org/apache/rocketmq/test/statictopic/StaticTopicIT.java
index 3e8f146..5b3e5fe 100644
--- a/test/src/test/java/org/apache/rocketmq/test/statictopic/StaticTopicIT.java
+++ b/test/src/test/java/org/apache/rocketmq/test/statictopic/StaticTopicIT.java
@@ -292,6 +292,7 @@ public class StaticTopicIT extends BaseConf {
         String group = initConsumerGroup();
         RMQNormalProducer producer = getProducer(nsAddr, topic);
         RMQNormalConsumer consumer = getConsumer(nsAddr, group, topic, "*", new RMQNormalListener());
+        long start = System.currentTimeMillis();
 
         int queueNum = 10;
         int msgEachQueue = 100;
@@ -314,6 +315,7 @@ public class StaticTopicIT extends BaseConf {
             Assert.assertNotNull(wrapper);
             Assert.assertEquals(msgEachQueue, wrapper.getBrokerOffset());
             Assert.assertEquals(msgEachQueue, wrapper.getConsumerOffset());
+            Assert.assertTrue(wrapper.getLastTimestamp() > start);
         }
 
         List<String> brokers = ImmutableList.of(broker2Name, broker3Name, broker1Name);
@@ -332,6 +334,7 @@ public class StaticTopicIT extends BaseConf {
             Assert.assertNotNull(wrapper);
             Assert.assertEquals(msgEachQueue + brokers.size() * TopicQueueMappingUtils.DEFAULT_BLOCK_SEQ_SIZE, wrapper.getBrokerOffset());
             Assert.assertEquals(msgEachQueue, wrapper.getConsumerOffset());
+            Assert.assertTrue(wrapper.getLastTimestamp() > start);
         }
         consumer = getConsumer(nsAddr, group, topic, "*", new RMQNormalListener());
         consumeMessagesAndCheck(producer, consumer, topic, queueNum, msgEachQueue, 1, brokers.size());

[rocketmq] 03/03: Fix check style

Posted by do...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

dongeforever pushed a commit to branch 5.0.0-alpha
in repository https://gitbox.apache.org/repos/asf/rocketmq.git

commit 16128dbbd88bbd797671ae02a52a73eec24838a3
Author: dongeforever <do...@apache.org>
AuthorDate: Wed Jan 5 17:29:26 2022 +0800

    Fix check style
---
 .../rocketmq/common/protocol/header/CreateTopicRequestHeader.java | 2 --
 .../main/java/org/apache/rocketmq/common/rpc/RpcClientImpl.java   | 8 +++-----
 .../apache/rocketmq/common/statictopic/TopicQueueMappingOne.java  | 5 +----
 3 files changed, 4 insertions(+), 11 deletions(-)

diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/CreateTopicRequestHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/CreateTopicRequestHeader.java
index 2e381b3..c3c59d4 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/header/CreateTopicRequestHeader.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/CreateTopicRequestHeader.java
@@ -26,8 +26,6 @@ import org.apache.rocketmq.remoting.annotation.CFNotNull;
 import org.apache.rocketmq.remoting.annotation.CFNullable;
 import org.apache.rocketmq.remoting.exception.RemotingCommandException;
 
-import java.util.Map;
-
 public class CreateTopicRequestHeader implements CommandCustomHeader {
     @CFNotNull
     private String topic;
diff --git a/common/src/main/java/org/apache/rocketmq/common/rpc/RpcClientImpl.java b/common/src/main/java/org/apache/rocketmq/common/rpc/RpcClientImpl.java
index 3782ab0..c5cbc74 100644
--- a/common/src/main/java/org/apache/rocketmq/common/rpc/RpcClientImpl.java
+++ b/common/src/main/java/org/apache/rocketmq/common/rpc/RpcClientImpl.java
@@ -18,6 +18,9 @@ package org.apache.rocketmq.common.rpc;
 
 import io.netty.util.concurrent.ImmediateEventExecutor;
 import io.netty.util.concurrent.Promise;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Future;
 import org.apache.rocketmq.common.admin.TopicStatsTable;
 import org.apache.rocketmq.common.message.MessageQueue;
 import org.apache.rocketmq.common.protocol.RequestCode;
@@ -28,7 +31,6 @@ import org.apache.rocketmq.common.protocol.header.GetMinOffsetResponseHeader;
 import org.apache.rocketmq.common.protocol.header.PullMessageResponseHeader;
 import org.apache.rocketmq.common.protocol.header.QueryConsumerOffsetResponseHeader;
 import org.apache.rocketmq.common.protocol.header.SearchOffsetResponseHeader;
-import org.apache.rocketmq.common.protocol.header.UpdateConsumerOffsetRequestHeader;
 import org.apache.rocketmq.common.protocol.header.UpdateConsumerOffsetResponseHeader;
 import org.apache.rocketmq.common.statictopic.TopicConfigAndQueueMapping;
 import org.apache.rocketmq.remoting.InvokeCallback;
@@ -37,10 +39,6 @@ import org.apache.rocketmq.remoting.netty.ResponseFuture;
 import org.apache.rocketmq.remoting.protocol.RemotingCommand;
 import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
 
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.Future;
-
 public class RpcClientImpl implements RpcClient {
 
     private ClientMetadata clientMetadata;
diff --git a/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingOne.java b/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingOne.java
index 636f1d5..597b6ab 100644
--- a/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingOne.java
+++ b/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingOne.java
@@ -16,11 +16,8 @@
  */
 package org.apache.rocketmq.common.statictopic;
 
-import org.apache.commons.lang3.builder.EqualsBuilder;
-import org.apache.commons.lang3.builder.HashCodeBuilder;
-import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
-
 import java.util.List;
+import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
 
 public class TopicQueueMappingOne extends RemotingSerializable {
 

[rocketmq] 01/03: Convert the consumer offset too

Posted by do...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

dongeforever pushed a commit to branch 5.0.0-alpha
in repository https://gitbox.apache.org/repos/asf/rocketmq.git

commit add737fdb9e984f175b42d3d19fff6d9c2085b39
Author: dongeforever <do...@apache.org>
AuthorDate: Wed Jan 5 17:01:11 2022 +0800

    Convert the consumer offset too
---
 .../broker/processor/AdminBrokerProcessor.java     | 21 +++----
 .../broker/processor/ConsumerManageProcessor.java  | 68 +++++++++++++++++++++-
 .../broker/topic/TopicQueueMappingManager.java     |  2 +-
 .../apache/rocketmq/common/rpc/RpcClientImpl.java  | 25 ++++++++
 ..._Topic_Logic_Queue_\350\256\276\350\256\241.md" |  9 ++-
 5 files changed, 106 insertions(+), 19 deletions(-)

diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
index 6505263..568a728 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
@@ -1176,8 +1176,6 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements
                 continue;
             }
 
-            TopicQueueMappingDetail mappingDetail = this.brokerController.getTopicQueueMappingManager().getTopicQueueMapping(topic);
-
             {
                 SubscriptionData findSubscriptionData =
                     this.brokerController.getConsumerManager().findSubscriptionData(requestHeader.getConsumerGroup(), topic);
@@ -1208,26 +1206,21 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements
                 // the consumerOffset cannot be zero for static topic because of the "double read check" strategy
                 // just remain the logic for dynamic topic
                 // maybe we should remove it in the future
-                if (mappingDetail == null) {
-                    if (consumerOffset < 0)
-                        consumerOffset = 0;
-                }
+                if (consumerOffset < 0)
+                    consumerOffset = 0;
 
                 offsetWrapper.setBrokerOffset(brokerOffset);
                 offsetWrapper.setConsumerOffset(consumerOffset);
 
                 // the consumeOffset is not in this broker for static topic
                 // and may get the wrong result
-                if (mappingDetail == null) {
-                    long timeOffset = consumerOffset - 1;
-                    if (timeOffset >= 0) {
-                        long lastTimestamp = this.brokerController.getMessageStore().getMessageStoreTimeStamp(topic, i, timeOffset);
-                        if (lastTimestamp > 0) {
-                            offsetWrapper.setLastTimestamp(lastTimestamp);
-                        }
+                long timeOffset = consumerOffset - 1;
+                if (timeOffset >= 0) {
+                    long lastTimestamp = this.brokerController.getMessageStore().getMessageStoreTimeStamp(topic, i, timeOffset);
+                    if (lastTimestamp > 0) {
+                        offsetWrapper.setLastTimestamp(lastTimestamp);
                     }
                 }
-
                 consumeStats.getOffsetTable().put(mq, offsetWrapper);
             }
 
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/ConsumerManageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/ConsumerManageProcessor.java
index 04e705b..a266442 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/ConsumerManageProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/ConsumerManageProcessor.java
@@ -29,11 +29,15 @@ import org.apache.rocketmq.common.protocol.header.QueryConsumerOffsetRequestHead
 import org.apache.rocketmq.common.protocol.header.QueryConsumerOffsetResponseHeader;
 import org.apache.rocketmq.common.protocol.header.UpdateConsumerOffsetRequestHeader;
 import org.apache.rocketmq.common.protocol.header.UpdateConsumerOffsetResponseHeader;
+import org.apache.rocketmq.common.rpc.RpcClientUtils;
 import org.apache.rocketmq.common.rpc.RpcRequest;
 import org.apache.rocketmq.common.rpc.RpcResponse;
+import org.apache.rocketmq.common.rpc.TopicQueueRequestHeader;
 import org.apache.rocketmq.common.statictopic.LogicQueueMappingItem;
 import org.apache.rocketmq.common.statictopic.TopicQueueMappingContext;
 import org.apache.rocketmq.common.statictopic.TopicQueueMappingDetail;
+import org.apache.rocketmq.common.statictopic.TopicQueueMappingUtils;
+import org.apache.rocketmq.common.sysflag.PullSysFlag;
 import org.apache.rocketmq.logging.InternalLogger;
 import org.apache.rocketmq.logging.InternalLoggerFactory;
 import org.apache.rocketmq.remoting.common.RemotingHelper;
@@ -110,6 +114,37 @@ public class ConsumerManageProcessor extends AsyncNettyRequestProcessor implemen
         return response;
     }
 
+    public  RemotingCommand rewriteRequestForStaticTopic(final UpdateConsumerOffsetRequestHeader requestHeader, final TopicQueueMappingContext mappingContext) {
+        try {
+            if (mappingContext.getMappingDetail() == null) {
+                return null;
+            }
+            TopicQueueMappingDetail mappingDetail = mappingContext.getMappingDetail();
+            if (!mappingContext.isLeader()) {
+                return buildErrorResponse(ResponseCode.NOT_LEADER_FOR_QUEUE, String.format("%s-%d does not exit in request process of current broker %s", requestHeader.getTopic(), requestHeader.getQueueId(), mappingDetail.getBname()));
+            }
+            Long globalOffset = requestHeader.getCommitOffset();
+            LogicQueueMappingItem mappingItem = TopicQueueMappingUtils.findLogicQueueMappingItem(mappingContext.getMappingItemList(), globalOffset, true);
+            requestHeader.setQueueId(mappingItem.getQueueId());
+            requestHeader.setLo(false);
+            requestHeader.setBname(mappingItem.getBname());
+            requestHeader.setCommitOffset(mappingItem.computePhysicalQueueOffset(globalOffset));
+            //leader, let it go, do not need to rewrite the response
+            if (mappingDetail.getBname().equals(mappingItem.getBname())) {
+                return null;
+            }
+            RpcRequest rpcRequest = new RpcRequest(RequestCode.UPDATE_CONSUMER_OFFSET, requestHeader, null);
+            RpcResponse rpcResponse = this.brokerController.getBrokerOuterAPI().getRpcClient().invoke(rpcRequest, this.brokerController.getBrokerConfig().getForwardTimeout()).get();
+            if (rpcResponse.getException() != null) {
+                throw rpcResponse.getException();
+            }
+            return RpcClientUtils.createCommandForRpcResponse(rpcResponse);
+        } catch (Throwable t) {
+            return buildErrorResponse(ResponseCode.SYSTEM_ERROR, t.getMessage());
+        }
+    }
+
+
     private RemotingCommand updateConsumerOffset(ChannelHandlerContext ctx, RemotingCommand request)
         throws RemotingCommandException {
         final RemotingCommand response =
@@ -119,7 +154,7 @@ public class ConsumerManageProcessor extends AsyncNettyRequestProcessor implemen
                 .decodeCommandCustomHeader(UpdateConsumerOffsetRequestHeader.class);
         TopicQueueMappingContext mappingContext = this.brokerController.getTopicQueueMappingManager().buildTopicQueueMappingContext(requestHeader);
 
-        RemotingCommand rewriteResult  = this.brokerController.getTopicQueueMappingManager().rewriteRequestForStaticTopic(requestHeader, mappingContext);
+        RemotingCommand rewriteResult  =  rewriteRequestForStaticTopic(requestHeader, mappingContext);
         if (rewriteResult != null) {
             return rewriteResult;
         }
@@ -144,6 +179,7 @@ public class ConsumerManageProcessor extends AsyncNettyRequestProcessor implemen
             if (mappingItemList.size() == 1
                     &&  mappingItemList.get(0).getLogicOffset() == 0) {
                 //as physical, just let it go
+                mappingContext.setCurrentItem(mappingItemList.get(0));
                 requestHeader.setQueueId(mappingContext.getLeaderItem().getQueueId());
                 return null;
             }
@@ -154,6 +190,7 @@ public class ConsumerManageProcessor extends AsyncNettyRequestProcessor implemen
             //double read, first from leader, then from second leader
             for (int i = itemList.size() - 1; i >= 0; i--) {
                 LogicQueueMappingItem mappingItem = itemList.get(i);
+                mappingContext.setCurrentItem(mappingItem);
                 if (mappingItem.getBname().equals(mappingDetail.getBname())) {
                     offset = this.brokerController.getConsumerOffsetManager().queryOffset(requestHeader.getConsumerGroup(), requestHeader.getTopic(), mappingItem.getQueueId());
                     if (offset >= 0) {
@@ -194,9 +231,31 @@ public class ConsumerManageProcessor extends AsyncNettyRequestProcessor implemen
                 response.setCode(ResponseCode.QUERY_NOT_FOUND);
                 response.setRemark("Not found, maybe this group consumer boot first");
             }
+            RemotingCommand rewriteResponseResult = rewriteResponseForStaticTopic(requestHeader, responseHeader, mappingContext, response.getCode());
+            if (rewriteResponseResult != null) {
+                return rewriteResponseResult;
+            }
             return response;
         } catch (Throwable t) {
-            t.printStackTrace();
+            return buildErrorResponse(ResponseCode.SYSTEM_ERROR, t.getMessage());
+        }
+    }
+
+
+    public  RemotingCommand rewriteResponseForStaticTopic(final QueryConsumerOffsetRequestHeader requestHeader, final QueryConsumerOffsetResponseHeader responseHeader,
+        final TopicQueueMappingContext mappingContext, final int code) {
+        try {
+            if (mappingContext.getMappingDetail() == null) {
+                return null;
+            }
+            if (code != ResponseCode.SUCCESS) {
+                return null;
+            }
+            LogicQueueMappingItem item = mappingContext.getCurrentItem();
+            responseHeader.setOffset(item.computeStaticQueueOffsetStrictly(responseHeader.getOffset()));
+            //no need to construct new object
+            return null;
+        } catch (Throwable t) {
             return buildErrorResponse(ResponseCode.SYSTEM_ERROR, t.getMessage());
         }
     }
@@ -245,6 +304,11 @@ public class ConsumerManageProcessor extends AsyncNettyRequestProcessor implemen
             }
         }
 
+        RemotingCommand rewriteResponseResult = rewriteResponseForStaticTopic(requestHeader, responseHeader, mappingContext, response.getCode());
+        if (rewriteResponseResult != null) {
+            return rewriteResponseResult;
+        }
+
         return response;
     }
 }
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicQueueMappingManager.java b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicQueueMappingManager.java
index 56fc792..dd7e708 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicQueueMappingManager.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicQueueMappingManager.java
@@ -189,7 +189,7 @@ public class TopicQueueMappingManager extends ConfigManager {
 
     //Do not return a null context
     public TopicQueueMappingContext buildTopicQueueMappingContext(TopicRequestHeader requestHeader, boolean selectOneWhenMiss) {
-        //should disable logic queue explicitly, otherwise the old client may cause dirty data to newly created static topic
+        // if lo is explicitly set to false, it may be a forwarded request
         if (requestHeader.getLo() != null
                 && Boolean.FALSE.equals(requestHeader.getLo())) {
             return new TopicQueueMappingContext(requestHeader.getTopic(), null, null, null, null);
diff --git a/common/src/main/java/org/apache/rocketmq/common/rpc/RpcClientImpl.java b/common/src/main/java/org/apache/rocketmq/common/rpc/RpcClientImpl.java
index 62e6ec1..3782ab0 100644
--- a/common/src/main/java/org/apache/rocketmq/common/rpc/RpcClientImpl.java
+++ b/common/src/main/java/org/apache/rocketmq/common/rpc/RpcClientImpl.java
@@ -28,6 +28,8 @@ import org.apache.rocketmq.common.protocol.header.GetMinOffsetResponseHeader;
 import org.apache.rocketmq.common.protocol.header.PullMessageResponseHeader;
 import org.apache.rocketmq.common.protocol.header.QueryConsumerOffsetResponseHeader;
 import org.apache.rocketmq.common.protocol.header.SearchOffsetResponseHeader;
+import org.apache.rocketmq.common.protocol.header.UpdateConsumerOffsetRequestHeader;
+import org.apache.rocketmq.common.protocol.header.UpdateConsumerOffsetResponseHeader;
 import org.apache.rocketmq.common.statictopic.TopicConfigAndQueueMapping;
 import org.apache.rocketmq.remoting.InvokeCallback;
 import org.apache.rocketmq.remoting.RemotingClient;
@@ -101,6 +103,9 @@ public class RpcClientImpl implements RpcClient {
                 case RequestCode.QUERY_CONSUMER_OFFSET:
                     rpcResponsePromise = handleQueryConsumerOffset(addr, request, timeoutMs);
                     break;
+                case RequestCode.UPDATE_CONSUMER_OFFSET:
+                    rpcResponsePromise = handleUpdateConsumerOffset(addr, request, timeoutMs);
+                    break;
                 case RequestCode.GET_TOPIC_STATS_INFO:
                     rpcResponsePromise = handleCommonBodyRequest(addr, request, timeoutMs, TopicStatsTable.class);
                     break;
@@ -234,6 +239,26 @@ public class RpcClientImpl implements RpcClient {
         return rpcResponsePromise;
     }
 
+    public Promise<RpcResponse> handleUpdateConsumerOffset(String addr, RpcRequest rpcRequest, long timeoutMillis) throws Exception {
+        final Promise<RpcResponse> rpcResponsePromise = createResponseFuture();
+
+        RemotingCommand requestCommand = RpcClientUtils.createCommandForRpcRequest(rpcRequest);
+        RemotingCommand responseCommand = this.remotingClient.invokeSync(addr, requestCommand, timeoutMillis);
+        assert responseCommand != null;
+        switch (responseCommand.getCode()) {
+            case ResponseCode.SUCCESS: {
+                UpdateConsumerOffsetResponseHeader responseHeader =
+                    (UpdateConsumerOffsetResponseHeader) responseCommand.decodeCommandCustomHeader(UpdateConsumerOffsetResponseHeader.class);
+                rpcResponsePromise.setSuccess(new RpcResponse(responseCommand.getCode(), responseHeader, responseCommand.getBody()));
+                break;
+            }
+            default: {
+                rpcResponsePromise.setSuccess(new RpcResponse(new RpcException(responseCommand.getCode(), "unknown remote error")));
+            }
+        }
+        return rpcResponsePromise;
+    }
+
     public Promise<RpcResponse> handleCommonBodyRequest(final String addr, RpcRequest rpcRequest, long timeoutMillis, Class bodyClass) throws Exception {
         final Promise<RpcResponse> rpcResponsePromise = createResponseFuture();
         RemotingCommand requestCommand = RpcClientUtils.createCommandForRpcRequest(rpcRequest);
diff --git "a/docs/cn/statictopic/RocketMQ_Static_Topic_Logic_Queue_\350\256\276\350\256\241.md" "b/docs/cn/statictopic/RocketMQ_Static_Topic_Logic_Queue_\350\256\276\350\256\241.md"
index c06d83f..ac1cc6b 100644
--- "a/docs/cn/statictopic/RocketMQ_Static_Topic_Logic_Queue_\350\256\276\350\256\241.md"
+++ "b/docs/cn/statictopic/RocketMQ_Static_Topic_Logic_Queue_\350\256\276\350\256\241.md"
@@ -7,6 +7,8 @@
 | 2021-12-03 | 增加代码走读的说明| dongforever |
 | 2021-12-10 | 引入Scope概念,保留『多集群动态零耦合』的集群设计模型 | dongforever |
 | 2021-12-23 | 梳理待完成事项;讨论Admin接口的适配方式 | dongforever |
+| 2022-01-05 | Offset存储改成『转换制』,以更好适配原有逻辑 | dongforever |
+
 
 
 
@@ -342,8 +344,9 @@ UpdateStaticTopic 命令会自动计算预期的分布情况,包括但不限
 
 
 #### consumerOffsets 系列
-Offset的存储,无需转换,直接存储在 LogicQueue 所对应的最新 PhysicalQueue 中。
-读取时,采取『Double-Read-Check』机制。
+Offset的存储,进行转换,存储在对应PhysicalQueue 所在的 Broker上面。  
+读取时,采取『Double-Read-Check』机制,并进行转换。  
+这样可以最大程度与 PhysicalQueue 的相关逻辑进行适配,比如 ConsumerProgress 可以看到『最近拉取时间』。 
 
 #### Client
 
@@ -454,6 +457,8 @@ User 接口,使用范围广泛如多语言等,应该尽可能简单,把适
 #### 阻止Pop模式、事务消息、定时消息使用 LogicQueue
 不兼容 事务消息和定时消息。  
 LogicQueue 当前不支持Pop模式消费。
+#### Nameserver 相关生命周期完善
+目前没有处理Nameserver中Mapping数据的生命周期
 #### ConsumeQueue 的 correctMinOffset 逻辑存在缺陷
 可能导致 LogicQueue 无法清除已经过期的 MappingItem。
 #### getOffsetInQueueByTime 语义有缺陷