You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by du...@apache.org on 2019/01/02 16:45:31 UTC

[rocketmq] 07/14: Add subscription, consumer offset, sendback etc. management module

This is an automated email from the ASF dual-hosted git repository.

duhengforever pushed a commit to branch snode
in repository https://gitbox.apache.org/repos/asf/rocketmq.git

commit 20da514fe758fb23ccc8d18cfa68c7cc348887d9
Author: duhenglucky <du...@gmail.com>
AuthorDate: Sat Dec 29 19:41:02 2018 +0800

    Add subscription, consumer offset, sendback etc. management module
---
 .../apache/rocketmq/broker/BrokerController.java   |   2 +-
 .../rocketmq/broker/longpolling/PullRequest.java   |  19 ++
 .../broker/longpolling/PullRequestHoldService.java |  45 ++--
 .../broker/processor/SendMessageProcessor.java     |  34 +--
 .../processor/SnodePullMessageProcessor.java       |  73 ++---
 .../org/apache/rocketmq/client/ClientConfig.java   |   2 +-
 .../consumer/store/RemoteBrokerOffsetStore.java    |  11 +-
 .../rocketmq/client/impl/MQClientAPIImpl.java      |  10 +-
 .../impl/consumer/DefaultMQPullConsumerImpl.java   |   2 +-
 .../impl/consumer/DefaultMQPushConsumerImpl.java   |   2 +-
 .../client/impl/factory/MQClientInstance.java      |  10 +-
 .../impl/producer/DefaultMQProducerImpl.java       |   2 +-
 .../org/apache/rocketmq/common/BrokerConfig.java   |  12 +
 .../rocketmq/common/protocol/ResponseCode.java     |   4 +
 .../header/ConsumerSendMsgBackRequestHeader.java   |  25 +-
 .../protocol/header/GetMaxOffsetRequestHeader.java |  10 +
 .../protocol/header/GetMinOffsetRequestHeader.java |  11 +
 .../header/QueryConsumerOffsetRequestHeader.java   |  10 +
 .../protocol/header/SearchOffsetRequestHeader.java |   9 +
 .../protocol/header/SendMessageRequestHeader.java  |  29 +-
 .../header/SendMessageRequestHeaderV2.java         |   4 +-
 .../header/UpdateConsumerOffsetRequestHeader.java  |  10 +
 .../rocketmq/example/quickstart/Consumer.java      |   3 +-
 .../namesrv/processor/DefaultRequestProcessor.java |   1 -
 .../remoting/netty/NettyRemotingAbstract.java      |   3 +-
 .../transport/rocketmq/NettyRemotingClient.java    |   1 +
 .../org/apache/rocketmq/snode/SnodeController.java | 110 ++++++--
 .../snode/client/ClientHousekeepingService.java    |   2 +-
 .../rocketmq/snode/client/ConsumerGroupInfo.java   |  10 +-
 .../rocketmq/snode/client/ConsumerManager.java     |  23 +-
 .../client/DefaultConsumerIdsChangeListener.java   |   2 +-
 .../rocketmq/snode/client/ProducerManager.java     |  74 +++---
 .../snode/client/SubscriptionGroupManager.java     |  20 +-
 .../apache/rocketmq/snode/config/SnodeConfig.java  |  12 +
 .../rocketmq/snode/constant/SnodeConstant.java     |   2 +
 .../SnodeException.java}                           |  29 +-
 .../snode/offset/ConsumerOffsetManager.java        | 243 +++++++++++++++++
 .../snode/processor/ConsumerManageProcessor.java   | 115 +++++++-
 .../snode/processor/HearbeatProcessor.java         |  64 ++++-
 .../snode/processor/PullMessageProcessor.java      |  56 +++-
 .../snode/processor/SendMessageProcessor.java      |   3 +-
 .../{SnodeOuterService.java => EnodeService.java}  |  33 ++-
 .../rocketmq/snode/service/NnodeService.java       |  47 ++++
 .../snode/service/impl/EnodeServiceImpl.java       | 295 +++++++++++++++++++++
 .../snode/service/impl/NnodeServiceImpl.java       | 208 +++++++++++++++
 .../snode/service/impl/ScheduledServiceImpl.java   |  51 +++-
 .../snode/service/impl/SnodeOuterServiceImpl.java  | 280 -------------------
 47 files changed, 1503 insertions(+), 520 deletions(-)

diff --git a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
index 9639f65..eff8fd4 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
@@ -515,7 +515,7 @@ public class BrokerController {
          */
         this.remotingServer.registerProcessor(RequestCode.PULL_MESSAGE, this.pullMessageProcessor, this.pullMessageExecutor);
         this.pullMessageProcessor.registerConsumeMessageHook(consumeMessageHookList);
-        this.remotingServer.registerProcessor(RequestCode.SNODE_PULL_MESSAGE, this.snodePullMessageProcessor,pullMessageExecutor);
+        this.remotingServer.registerProcessor(RequestCode.SNODE_PULL_MESSAGE, this.snodePullMessageProcessor, pullMessageExecutor);
         this.snodePullMessageProcessor.registerConsumeMessageHook(consumeMessageHookList);
         /**
          * QueryMessageProcessor
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/longpolling/PullRequest.java b/broker/src/main/java/org/apache/rocketmq/broker/longpolling/PullRequest.java
index 045ab9b..e64b0e9 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/longpolling/PullRequest.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/longpolling/PullRequest.java
@@ -29,6 +29,7 @@ public class PullRequest {
     private final long pullFromThisOffset;
     private final SubscriptionData subscriptionData;
     private final MessageFilter messageFilter;
+    private final boolean snodeRequest;
 
     public PullRequest(RemotingCommand requestCommand, Channel clientChannel, long timeoutMillis, long suspendTimestamp,
         long pullFromThisOffset, SubscriptionData subscriptionData,
@@ -40,6 +41,20 @@ public class PullRequest {
         this.pullFromThisOffset = pullFromThisOffset;
         this.subscriptionData = subscriptionData;
         this.messageFilter = messageFilter;
+        this.snodeRequest = false;
+    }
+
+    public PullRequest(RemotingCommand requestCommand, Channel clientChannel, long timeoutMillis, long suspendTimestamp,
+        long pullFromThisOffset, SubscriptionData subscriptionData,
+        MessageFilter messageFilter, boolean snodeRequest) {
+        this.requestCommand = requestCommand;
+        this.clientChannel = clientChannel;
+        this.timeoutMillis = timeoutMillis;
+        this.suspendTimestamp = suspendTimestamp;
+        this.pullFromThisOffset = pullFromThisOffset;
+        this.subscriptionData = subscriptionData;
+        this.messageFilter = messageFilter;
+        this.snodeRequest = snodeRequest;
     }
 
     public RemotingCommand getRequestCommand() {
@@ -69,4 +84,8 @@ public class PullRequest {
     public MessageFilter getMessageFilter() {
         return messageFilter;
     }
+
+    public boolean isSnodeRequest() {
+        return snodeRequest;
+    }
 }
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/longpolling/PullRequestHoldService.java b/broker/src/main/java/org/apache/rocketmq/broker/longpolling/PullRequestHoldService.java
index af6addc..ee02017 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/longpolling/PullRequestHoldService.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/longpolling/PullRequestHoldService.java
@@ -127,34 +127,37 @@ public class PullRequestHoldService extends ServiceThread {
                     if (newestOffset <= request.getPullFromThisOffset()) {
                         newestOffset = this.brokerController.getMessageStore().getMaxOffsetInQueue(topic, queueId);
                     }
+                    try {
+                        if (newestOffset > request.getPullFromThisOffset()) {
+                            boolean match = request.getMessageFilter().isMatchedByConsumeQueue(tagsCode,
+                                new ConsumeQueueExt.CqExtUnit(tagsCode, msgStoreTime, filterBitMap));
+                            // match by bit map, need eval again when properties is not null.
+                            if (match && properties != null) {
+                                match = request.getMessageFilter().isMatchedByCommitLog(null, properties);
+                            }
 
-                    if (newestOffset > request.getPullFromThisOffset()) {
-                        boolean match = request.getMessageFilter().isMatchedByConsumeQueue(tagsCode,
-                            new ConsumeQueueExt.CqExtUnit(tagsCode, msgStoreTime, filterBitMap));
-                        // match by bit map, need eval again when properties is not null.
-                        if (match && properties != null) {
-                            match = request.getMessageFilter().isMatchedByCommitLog(null, properties);
-                        }
-
-                        if (match) {
-                            try {
-                                if (request.getMessageFilter() == null && request.getSubscriptionData() == null) {
-                                    this.brokerController.getSnodePullMessageProcessor().executeRequestWhenWakeup(request.getClientChannel(),
-                                        request.getRequestCommand());
-                                } else {
-                                    this.brokerController.getPullMessageProcessor().executeRequestWhenWakeup(request.getClientChannel(),
-                                        request.getRequestCommand());
+                            if (match) {
+                                try {
+                                    if (request.isSnodeRequest()) {
+                                        this.brokerController.getSnodePullMessageProcessor().executeRequestWhenWakeup(request.getClientChannel(),
+                                            request.getRequestCommand());
+                                    } else {
+                                        this.brokerController.getPullMessageProcessor().executeRequestWhenWakeup(request.getClientChannel(),
+                                            request.getRequestCommand());
+                                    }
+                                } catch (Throwable e) {
+                                    log.error("execute request when wakeup failed.", e);
                                 }
-                            } catch (Throwable e) {
-                                log.error("execute request when wakeup failed.", e);
+                                continue;
                             }
-                            continue;
                         }
+                    } catch (Exception ex) {
+                        log.error("Error occurred:{}", ex);
                     }
 
                     if (System.currentTimeMillis() >= (request.getSuspendTimestamp() + request.getTimeoutMillis())) {
                         try {
-                            if (request.getMessageFilter() == null && request.getSubscriptionData() == null) {
+                            if (request.isSnodeRequest()) {
                                 this.brokerController.getSnodePullMessageProcessor().executeRequestWhenWakeup(request.getClientChannel(),
                                     request.getRequestCommand());
                             } else {
@@ -166,7 +169,6 @@ public class PullRequestHoldService extends ServiceThread {
                         }
                         continue;
                     }
-
                     replayList.add(request);
                 }
 
@@ -177,3 +179,4 @@ public class PullRequestHoldService extends ServiceThread {
         }
     }
 }
+
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java
index b7e7a61..5f1c2f1 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java
@@ -63,7 +63,7 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement
 
     @Override
     public RemotingCommand processRequest(ChannelHandlerContext ctx,
-                                          RemotingCommand request) throws RemotingCommandException {
+        RemotingCommand request) throws RemotingCommandException {
         SendMessageContext mqtraceContext;
         switch (request.getCode()) {
             case RequestCode.CONSUMER_SEND_MSG_BACK:
@@ -99,7 +99,7 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement
         throws RemotingCommandException {
         final RemotingCommand response = RemotingCommand.createResponseCommand(null);
         final ConsumerSendMsgBackRequestHeader requestHeader =
-            (ConsumerSendMsgBackRequestHeader)request.decodeCommandCustomHeader(ConsumerSendMsgBackRequestHeader.class);
+            (ConsumerSendMsgBackRequestHeader) request.decodeCommandCustomHeader(ConsumerSendMsgBackRequestHeader.class);
 
         if (this.hasConsumeMessageHook() && !UtilAll.isBlank(requestHeader.getOriginMsgId())) {
 
@@ -249,8 +249,8 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement
     }
 
     private boolean handleRetryAndDLQ(SendMessageRequestHeader requestHeader, RemotingCommand response,
-                                      RemotingCommand request,
-                                      MessageExt msg, TopicConfig topicConfig) {
+        RemotingCommand request,
+        MessageExt msg, TopicConfig topicConfig) {
         String newTopic = requestHeader.getTopic();
         if (null != newTopic && newTopic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
             String groupName = newTopic.substring(MixAll.RETRY_GROUP_TOPIC_PREFIX.length());
@@ -293,12 +293,12 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement
     }
 
     private RemotingCommand sendMessage(final ChannelHandlerContext ctx,
-                                        final RemotingCommand request,
-                                        final SendMessageContext sendMessageContext,
-                                        final SendMessageRequestHeader requestHeader) throws RemotingCommandException {
+        final RemotingCommand request,
+        final SendMessageContext sendMessageContext,
+        final SendMessageRequestHeader requestHeader) throws RemotingCommandException {
 
         final RemotingCommand response = RemotingCommand.createResponseCommand(SendMessageResponseHeader.class);
-        final SendMessageResponseHeader responseHeader = (SendMessageResponseHeader)response.readCustomHeader();
+        final SendMessageResponseHeader responseHeader = (SendMessageResponseHeader) response.readCustomHeader();
 
         response.setOpaque(request.getOpaque());
 
@@ -366,9 +366,9 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement
     }
 
     private RemotingCommand handlePutMessageResult(PutMessageResult putMessageResult, RemotingCommand response,
-                                                   RemotingCommand request, MessageExt msg,
-                                                   SendMessageResponseHeader responseHeader, SendMessageContext sendMessageContext, ChannelHandlerContext ctx,
-                                                   int queueIdInt) {
+        RemotingCommand request, MessageExt msg,
+        SendMessageResponseHeader responseHeader, SendMessageContext sendMessageContext, ChannelHandlerContext ctx,
+        int queueIdInt) {
         if (putMessageResult == null) {
             response.setCode(ResponseCode.SYSTEM_ERROR);
             response.setRemark("store putMessage return null");
@@ -448,7 +448,7 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement
 
                 int commercialBaseCount = brokerController.getBrokerConfig().getCommercialBaseCount();
                 int wroteSize = putMessageResult.getAppendMessageResult().getWroteBytes();
-                int incValue = (int)Math.ceil(wroteSize / BrokerStatsManager.SIZE_PER_COUNT) * commercialBaseCount;
+                int incValue = (int) Math.ceil(wroteSize / BrokerStatsManager.SIZE_PER_COUNT) * commercialBaseCount;
 
                 sendMessageContext.setCommercialSendStats(BrokerStatsManager.StatsType.SEND_SUCCESS);
                 sendMessageContext.setCommercialSendTimes(incValue);
@@ -459,7 +459,7 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement
         } else {
             if (hasSendMessageHook()) {
                 int wroteSize = request.getBody().length;
-                int incValue = (int)Math.ceil(wroteSize / BrokerStatsManager.SIZE_PER_COUNT);
+                int incValue = (int) Math.ceil(wroteSize / BrokerStatsManager.SIZE_PER_COUNT);
 
                 sendMessageContext.setCommercialSendStats(BrokerStatsManager.StatsType.SEND_FAILURE);
                 sendMessageContext.setCommercialSendTimes(incValue);
@@ -471,12 +471,12 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement
     }
 
     private RemotingCommand sendBatchMessage(final ChannelHandlerContext ctx,
-                                             final RemotingCommand request,
-                                             final SendMessageContext sendMessageContext,
-                                             final SendMessageRequestHeader requestHeader) throws RemotingCommandException {
+        final RemotingCommand request,
+        final SendMessageContext sendMessageContext,
+        final SendMessageRequestHeader requestHeader) throws RemotingCommandException {
 
         final RemotingCommand response = RemotingCommand.createResponseCommand(SendMessageResponseHeader.class);
-        final SendMessageResponseHeader responseHeader = (SendMessageResponseHeader)response.readCustomHeader();
+        final SendMessageResponseHeader responseHeader = (SendMessageResponseHeader) response.readCustomHeader();
 
         response.setOpaque(request.getOpaque());
 
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/SnodePullMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/SnodePullMessageProcessor.java
index 8beb6fa..788c498 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/SnodePullMessageProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/SnodePullMessageProcessor.java
@@ -24,7 +24,6 @@ import io.netty.channel.FileRegion;
 import java.nio.ByteBuffer;
 import java.util.List;
 import org.apache.rocketmq.broker.BrokerController;
-import org.apache.rocketmq.broker.client.ConsumerGroupInfo;
 import org.apache.rocketmq.broker.filter.ConsumerFilterData;
 import org.apache.rocketmq.broker.filter.ConsumerFilterManager;
 import org.apache.rocketmq.broker.filter.ExpressionForRetryMessageFilter;
@@ -34,22 +33,18 @@ import org.apache.rocketmq.broker.mqtrace.ConsumeMessageContext;
 import org.apache.rocketmq.broker.mqtrace.ConsumeMessageHook;
 import org.apache.rocketmq.broker.pagecache.ManyMessageTransfer;
 import org.apache.rocketmq.common.MixAll;
-import org.apache.rocketmq.common.TopicConfig;
 import org.apache.rocketmq.common.TopicFilterType;
 import org.apache.rocketmq.common.constant.LoggerName;
 import org.apache.rocketmq.common.constant.PermName;
 import org.apache.rocketmq.common.filter.ExpressionType;
 import org.apache.rocketmq.common.filter.FilterAPI;
-import org.apache.rocketmq.common.help.FAQUrl;
 import org.apache.rocketmq.common.message.MessageDecoder;
 import org.apache.rocketmq.common.message.MessageQueue;
 import org.apache.rocketmq.common.protocol.ResponseCode;
 import org.apache.rocketmq.common.protocol.header.PullMessageRequestHeader;
 import org.apache.rocketmq.common.protocol.header.PullMessageResponseHeader;
-import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
 import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
 import org.apache.rocketmq.common.protocol.topic.OffsetMovedEvent;
-import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig;
 import org.apache.rocketmq.common.sysflag.PullSysFlag;
 import org.apache.rocketmq.logging.InternalLogger;
 import org.apache.rocketmq.logging.InternalLoggerFactory;
@@ -95,23 +90,50 @@ public class SnodePullMessageProcessor implements NettyRequestProcessor {
 
         response.setOpaque(request.getOpaque());
 
-        log.info("receive PullMessage request command, {}", request);
         final boolean hasSuspendFlag = PullSysFlag.hasSuspendFlag(requestHeader.getSysFlag());
         final boolean hasCommitOffsetFlag = PullSysFlag.hasCommitOffsetFlag(requestHeader.getSysFlag());
-        final boolean hasSubscriptionFlag = PullSysFlag.hasSubscriptionFlag(requestHeader.getSysFlag());
+
+        ConsumerFilterData consumerFilterData = null;
+        SubscriptionData subscriptionData;
+        try {
+            subscriptionData = FilterAPI.build(
+                requestHeader.getTopic(), requestHeader.getSubscription(), requestHeader.getExpressionType()
+            );
+            if (!ExpressionType.isTagType(subscriptionData.getExpressionType())) {
+                consumerFilterData = ConsumerFilterManager.build(
+                    requestHeader.getTopic(), requestHeader.getConsumerGroup(), requestHeader.getSubscription(),
+                    requestHeader.getExpressionType(), requestHeader.getSubVersion()
+                );
+                assert consumerFilterData != null;
+            }
+        } catch (Exception e) {
+            log.warn("Parse the consumer's subscription[{}] failed, group: {}", requestHeader.getSubscription(),
+                requestHeader.getConsumerGroup());
+            response.setCode(ResponseCode.SUBSCRIPTION_PARSE_FAILED);
+            response.setRemark("parse the consumer's subscription failed");
+            return response;
+        }
 
         final long suspendTimeoutMillisLong = hasSuspendFlag ? requestHeader.getSuspendTimeoutMillis() : 0;
         if (!PermName.isReadable(this.brokerController.getBrokerConfig().getBrokerPermission())) {
             response.setCode(ResponseCode.NO_PERMISSION);
-            response.setRemark(String.format("the broker[%s] pulling message is forbidden", this.brokerController.getBrokerConfig().getBrokerIP1()));
+            response.setRemark(String.format("The broker[%s] pulling message is forbidden", this.brokerController.getBrokerConfig().getBrokerIP1()));
             return response;
         }
 
+        MessageFilter messageFilter;
+        if (this.brokerController.getBrokerConfig().isFilterSupportRetry()) {
+            messageFilter = new ExpressionForRetryMessageFilter(subscriptionData, consumerFilterData,
+                this.brokerController.getConsumerFilterManager());
+        } else {
+            messageFilter = new ExpressionMessageFilter(subscriptionData, consumerFilterData,
+                this.brokerController.getConsumerFilterManager());
+        }
+
         final GetMessageResult getMessageResult =
             this.brokerController.getMessageStore().getMessage(requestHeader.getConsumerGroup(), requestHeader.getTopic(),
-                requestHeader.getQueueId(), requestHeader.getQueueOffset(), requestHeader.getMaxMsgNums(), null);
+                requestHeader.getQueueId(), requestHeader.getQueueOffset(), requestHeader.getMaxMsgNums(), messageFilter);
 
-        log.info("Get message response:{}",getMessageResult);
         if (getMessageResult != null) {
             response.setRemark(getMessageResult.getStatus().name());
             responseHeader.setNextBeginOffset(getMessageResult.getNextBeginOffset());
@@ -136,19 +158,6 @@ public class SnodePullMessageProcessor implements NettyRequestProcessor {
                     break;
             }
 
-//            if (this.brokerController.getBrokerConfig().isSlaveReadEnable()) {
-//                // consume too slow ,redirect to another machine
-//                if (getMessageResult.isSuggestPullingFromSlave()) {
-//                    responseHeader.setSuggestWhichBrokerId(subscriptionGroupConfig.getWhichBrokerWhenConsumeSlowly());
-//                }
-//                // consume ok
-//                else {
-//                    responseHeader.setSuggestWhichBrokerId(subscriptionGroupConfig.getBrokerId());
-//                }
-//            } else {
-//                responseHeader.setSuggestWhichBrokerId(MixAll.MASTER_ID);
-//            }
-
             switch (getMessageResult.getStatus()) {
                 case FOUND:
                     response.setCode(ResponseCode.SUCCESS);
@@ -162,7 +171,7 @@ public class SnodePullMessageProcessor implements NettyRequestProcessor {
                         response.setCode(ResponseCode.PULL_OFFSET_MOVED);
 
                         // XXX: warn and notify me
-                        log.info("the broker store no queue data, fix the request offset {} to {}, Topic: {} QueueId: {} Consumer Group: {}",
+                        log.info("The broker store no queue data, fix the request offset {} to {}, Topic: {} QueueId: {} Consumer Group: {}",
                             requestHeader.getQueueOffset(),
                             getMessageResult.getNextBeginOffset(),
                             requestHeader.getTopic(),
@@ -182,7 +191,7 @@ public class SnodePullMessageProcessor implements NettyRequestProcessor {
                 case OFFSET_OVERFLOW_BADLY:
                     response.setCode(ResponseCode.PULL_OFFSET_MOVED);
                     // XXX: warn and notify me
-                    log.info("the request offset: {} over flow badly, broker max offset: {}, consumer: {}",
+                    log.info("The request offset: {} over flow badly, broker max offset: {}, consumer: {}",
                         requestHeader.getQueueOffset(), getMessageResult.getMaxOffset(), channel.remoteAddress());
                     break;
                 case OFFSET_OVERFLOW_ONE:
@@ -190,7 +199,7 @@ public class SnodePullMessageProcessor implements NettyRequestProcessor {
                     break;
                 case OFFSET_TOO_SMALL:
                     response.setCode(ResponseCode.PULL_OFFSET_MOVED);
-                    log.info("the request offset too small. group={}, topic={}, requestOffset={}, brokerMinOffset={}, clientIp={}",
+                    log.info("The request offset too small. group={}, topic={}, requestOffset={}, brokerMinOffset={}, clientIp={}",
                         requestHeader.getConsumerGroup(), requestHeader.getTopic(), requestHeader.getQueueOffset(),
                         getMessageResult.getMinOffset(), channel.remoteAddress());
                     break;
@@ -267,12 +276,12 @@ public class SnodePullMessageProcessor implements NettyRequestProcessor {
                                 public void operationComplete(ChannelFuture future) throws Exception {
                                     getMessageResult.release();
                                     if (!future.isSuccess()) {
-                                        log.error("transfer many message by pagecache failed, {}", channel.remoteAddress(), future.cause());
+                                        log.error("Transfer many message by pagecache failed, {}", channel.remoteAddress(), future.cause());
                                     }
                                 }
                             });
                         } catch (Throwable e) {
-                            log.error("transfer many message by pagecache exception", e);
+                            log.error("Transfer many message by pagecache exception", e);
                             getMessageResult.release();
                         }
 
@@ -291,7 +300,7 @@ public class SnodePullMessageProcessor implements NettyRequestProcessor {
                         long offset = requestHeader.getQueueOffset();
                         int queueId = requestHeader.getQueueId();
                         PullRequest pullRequest = new PullRequest(request, channel, pollingTimeMills,
-                            this.brokerController.getMessageStore().now(), offset, null, null);
+                            this.brokerController.getMessageStore().now(), offset, subscriptionData, messageFilter, true);
                         this.brokerController.getPullRequestHoldService().suspendPullRequest(topic, queueId, pullRequest);
                         response = null;
                         break;
@@ -407,7 +416,7 @@ public class SnodePullMessageProcessor implements NettyRequestProcessor {
     }
 
     public void executeRequestWhenWakeup(final Channel channel,
-        final RemotingCommand request) throws RemotingCommandException {
+        final RemotingCommand request) {
         Runnable run = new Runnable() {
             @Override
             public void run() {
@@ -422,7 +431,7 @@ public class SnodePullMessageProcessor implements NettyRequestProcessor {
                                 @Override
                                 public void operationComplete(ChannelFuture future) throws Exception {
                                     if (!future.isSuccess()) {
-                                        log.error("processRequestWrapper response to {} failed",
+                                        log.error("ProcessRequestWrapper snode response to {} failed",
                                             future.channel().remoteAddress(), future.cause());
                                         log.error(request.toString());
                                         log.error(response.toString());
@@ -430,7 +439,7 @@ public class SnodePullMessageProcessor implements NettyRequestProcessor {
                                 }
                             });
                         } catch (Throwable e) {
-                            log.error("processRequestWrapper process request over, but response failed", e);
+                            log.error("ProcessRequestWrapper snode process request over, but response failed", e);
                             log.error(request.toString());
                             log.error(response.toString());
                         }
diff --git a/client/src/main/java/org/apache/rocketmq/client/ClientConfig.java b/client/src/main/java/org/apache/rocketmq/client/ClientConfig.java
index 562810f..48fb934 100644
--- a/client/src/main/java/org/apache/rocketmq/client/ClientConfig.java
+++ b/client/src/main/java/org/apache/rocketmq/client/ClientConfig.java
@@ -45,7 +45,7 @@ public class ClientConfig {
     private int persistConsumerOffsetInterval = 1000 * 5;
     private boolean unitMode = false;
     private String unitName;
-    private boolean vipChannelEnabled = Boolean.parseBoolean(System.getProperty(SEND_MESSAGE_WITH_VIP_CHANNEL_PROPERTY, "true"));
+    private boolean vipChannelEnabled = Boolean.parseBoolean(System.getProperty(SEND_MESSAGE_WITH_VIP_CHANNEL_PROPERTY, "false"));
 
     private boolean useTLS = TlsSystemConfig.tlsEnable;
 
diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/store/RemoteBrokerOffsetStore.java b/client/src/main/java/org/apache/rocketmq/client/consumer/store/RemoteBrokerOffsetStore.java
index c1524e1..f3e2f4e 100644
--- a/client/src/main/java/org/apache/rocketmq/client/consumer/store/RemoteBrokerOffsetStore.java
+++ b/client/src/main/java/org/apache/rocketmq/client/consumer/store/RemoteBrokerOffsetStore.java
@@ -16,6 +16,7 @@
  */
 package org.apache.rocketmq.client.consumer.store;
 
+import com.sun.org.apache.regexp.internal.RE;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
@@ -187,8 +188,7 @@ public class RemoteBrokerOffsetStore implements OffsetStore {
     }
 
     /**
-     * Update the Consumer Offset in one way, once the Master is off, updated to Slave,
-     * here need to be optimized.
+     * Update the Consumer Offset in one way, once the Master is off, updated to Slave, here need to be optimized.
      */
     private void updateConsumeOffsetToBroker(MessageQueue mq, long offset) throws RemotingException,
         MQBrokerException, InterruptedException, MQClientException {
@@ -196,8 +196,7 @@ public class RemoteBrokerOffsetStore implements OffsetStore {
     }
 
     /**
-     * Update the Consumer Offset synchronously, once the Master is off, updated to Slave,
-     * here need to be optimized.
+     * Update the Consumer Offset synchronously, once the Master is off, updated to Slave, here need to be optimized.
      */
     @Override
     public void updateConsumeOffsetToBroker(MessageQueue mq, long offset, boolean isOneway) throws RemotingException,
@@ -215,7 +214,7 @@ public class RemoteBrokerOffsetStore implements OffsetStore {
             requestHeader.setConsumerGroup(this.groupName);
             requestHeader.setQueueId(mq.getQueueId());
             requestHeader.setCommitOffset(offset);
-
+            requestHeader.setEnodeName(mq.getBrokerName());
             if (isOneway) {
                 this.mQClientFactory.getMQClientAPIImpl().updateConsumerOffsetOneway(
                     findBrokerResult.getBrokerAddr(), requestHeader, 1000 * 5);
@@ -242,7 +241,7 @@ public class RemoteBrokerOffsetStore implements OffsetStore {
             requestHeader.setTopic(mq.getTopic());
             requestHeader.setConsumerGroup(this.groupName);
             requestHeader.setQueueId(mq.getQueueId());
-
+            requestHeader.setEnodeName(mq.getBrokerName());
             return this.mQClientFactory.getMQClientAPIImpl().queryConsumerOffset(
                 findBrokerResult.getBrokerAddr(), requestHeader, 1000 * 5);
         } else {
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
index 6302cd0..a7aead1 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
@@ -353,8 +353,7 @@ public class MQClientAPIImpl {
         final long timeoutMillis,
         final RemotingCommand request
     ) throws RemotingException, MQBrokerException, InterruptedException {
-        String addrS = "localhost:11911";//TODO FIXME
-        RemotingCommand response = this.remotingClient.invokeSync(addrS, request, timeoutMillis);
+        RemotingCommand response = this.remotingClient.invokeSync(addr, request, timeoutMillis);
         assert response != null;
         return this.processSendResponse(brokerName, msg, response);
     }
@@ -566,7 +565,6 @@ public class MQClientAPIImpl {
     ) throws RemotingException, MQBrokerException, InterruptedException {
         requestHeader.setEnodeAddr(addr);
         RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.SNODE_PULL_MESSAGE, requestHeader);
-         addr = "localhost:11911"; //TODO FIXME
         switch (communicationMode) {
             case ONEWAY:
                 assert false;
@@ -649,7 +647,6 @@ public class MQClientAPIImpl {
 
         PullMessageResponseHeader responseHeader =
             (PullMessageResponseHeader) response.decodeCommandCustomHeader(PullMessageResponseHeader.class);
-        log.info("response header: {}", responseHeader.getSuggestWhichBrokerId());
         return new PullResultExt(pullStatus, responseHeader.getNextBeginOffset(), responseHeader.getMinOffset(),
             responseHeader.getMaxOffset(), null, responseHeader.getSuggestWhichBrokerId(), response.getBody());
     }
@@ -734,8 +731,7 @@ public class MQClientAPIImpl {
         GetConsumerListByGroupRequestHeader requestHeader = new GetConsumerListByGroupRequestHeader();
         requestHeader.setConsumerGroup(consumerGroup);
         RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_CONSUMER_LIST_BY_GROUP, requestHeader);
-        String addrS = "localhost:11911";//TODO FIXME
-        RemotingCommand response = this.remotingClient.invokeSync(addrS,
+        RemotingCommand response = this.remotingClient.invokeSync(addr,
             request, timeoutMillis);
         assert response != null;
         switch (response.getCode()) {
@@ -940,6 +936,7 @@ public class MQClientAPIImpl {
     }
 
     public void consumerSendMessageBack(
+        final String brokerName,
         final String addr,
         final MessageExt msg,
         final String consumerGroup,
@@ -956,6 +953,7 @@ public class MQClientAPIImpl {
         requestHeader.setDelayLevel(delayLevel);
         requestHeader.setOriginMsgId(msg.getMsgId());
         requestHeader.setMaxReconsumeTimes(maxConsumeRetryTimes);
+        requestHeader.setEnodeName(brokerName);
 
         RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr),
             request, timeoutMillis);
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java
index 420d89b..20a72d8 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java
@@ -495,7 +495,7 @@ public class DefaultMQPullConsumerImpl implements MQConsumerInner {
                 consumerGroup = this.defaultMQPullConsumer.getConsumerGroup();
             }
 
-            this.mQClientFactory.getMQClientAPIImpl().consumerSendMessageBack(brokerAddr, msg, consumerGroup, delayLevel, 3000,
+            this.mQClientFactory.getMQClientAPIImpl().consumerSendMessageBack(brokerName, brokerAddr, msg, consumerGroup, delayLevel, 3000,
                 this.defaultMQPullConsumer.getMaxReconsumeTimes());
         } catch (Exception e) {
             log.error("sendMessageBack Exception, " + this.defaultMQPullConsumer.getConsumerGroup(), e);
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java
index 393ef92..2f3cc97 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java
@@ -499,7 +499,7 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
         try {
             String brokerAddr = (null != brokerName) ? this.mQClientFactory.findBrokerAddressInPublish(brokerName)
                 : RemotingHelper.parseSocketAddressAddr(msg.getStoreHost());
-            this.mQClientFactory.getMQClientAPIImpl().consumerSendMessageBack(brokerAddr, msg,
+            this.mQClientFactory.getMQClientAPIImpl().consumerSendMessageBack(brokerName, brokerAddr, msg,
                 this.defaultMQPushConsumer.getConsumerGroup(), delayLevel, 5000, getMaxReconsumeTimes());
         } catch (Exception e) {
             log.error("sendMessageBack Exception, " + this.defaultMQPushConsumer.getConsumerGroup(), e);
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java b/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java
index 984e2cc..4e9d9ba 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java
@@ -537,8 +537,7 @@ public class MQClientInstance {
                             }
 
                             try {
-                                String addrS = "localhost:11911"; //TODO FIXME
-                                int version = this.mQClientAPIImpl.sendHearbeat(addrS, heartbeatData, 3000);
+                                int version = this.mQClientAPIImpl.sendHearbeat(addr, heartbeatData, 3000);
                                 if (!this.brokerVersionTable.containsKey(brokerName)) {
                                     this.brokerVersionTable.put(brokerName, new HashMap<String, Integer>(4));
                                 }
@@ -548,7 +547,7 @@ public class MQClientInstance {
                                     log.info(heartbeatData.toString());
                                 }
                             } catch (Exception e) {
-                                log.error("send heart beat error:{}",e);
+                                log.error("send heart beat error:{}", e);
                                 if (this.isBrokerInNameServer(addr)) {
                                     log.info("send heart beat to broker[{} {} {}] failed", brokerName, id, addr);
                                 } else {
@@ -722,9 +721,10 @@ public class MQClientInstance {
 
         return false;
     }
+
     /**
-     * This method will be removed in the version 5.0.0,because filterServer was removed,and method <code>subscribe(final String topic, final MessageSelector messageSelector)</code>
-     * is recommended.
+     * This method will be removed in the version 5.0.0,because filterServer was removed,and method
+     * <code>subscribe(final String topic, final MessageSelector messageSelector)</code> is recommended.
      */
     @Deprecated
     private void uploadFilterClassToAllFilterServer(final String consumerGroup, final String fullClassName,
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
index 9ada834..291834f 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
@@ -733,7 +733,7 @@ public class DefaultMQProducerImpl implements MQProducerInner {
                 requestHeader.setReconsumeTimes(0);
                 requestHeader.setUnitMode(this.isUnitMode());
                 requestHeader.setBatch(msg instanceof MessageBatch);
-                requestHeader.setEnodeAddr(brokerAddr);
+                requestHeader.setEnodeName(mq.getBrokerName());
                 if (requestHeader.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
                     String reconsumeTimes = MessageAccessor.getReconsumeTime(msg);
                     if (reconsumeTimes != null) {
diff --git a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
index f81af21..de0b8e2 100644
--- a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
+++ b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
@@ -171,6 +171,18 @@ public class BrokerConfig {
     @ImportantField
     private long transactionCheckInterval = 60 * 1000;
 
+
+    @ImportantField
+    private boolean transactionEnable = true;
+
+    public boolean isTransactionEnable() {
+        return transactionEnable;
+    }
+
+    public void setTransactionEnable(boolean transactionEnable) {
+        this.transactionEnable = transactionEnable;
+    }
+
     public boolean isTraceOn() {
         return traceOn;
     }
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/ResponseCode.java b/common/src/main/java/org/apache/rocketmq/common/protocol/ResponseCode.java
index f62c4ea..97d433b 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/ResponseCode.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/ResponseCode.java
@@ -73,4 +73,8 @@ public class ResponseCode extends RemotingSysResponseCode {
     public static final int CONSUME_MSG_TIMEOUT = 207;
 
     public static final int NO_MESSAGE = 208;
+
+    public static final int QUERY_OFFSET_ERROR = 210;
+
+    public static final int PARAMETER_ERROR = 211;
 }
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/ConsumerSendMsgBackRequestHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/ConsumerSendMsgBackRequestHeader.java
index bd8fbb4..8ead5ef 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/header/ConsumerSendMsgBackRequestHeader.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/ConsumerSendMsgBackRequestHeader.java
@@ -35,6 +35,8 @@ public class ConsumerSendMsgBackRequestHeader implements CommandCustomHeader {
     private boolean unitMode = false;
     private Integer maxReconsumeTimes;
 
+    private String enodeName;
+
     @Override
     public void checkFields() throws RemotingCommandException {
 
@@ -96,9 +98,24 @@ public class ConsumerSendMsgBackRequestHeader implements CommandCustomHeader {
         this.maxReconsumeTimes = maxReconsumeTimes;
     }
 
-    @Override
-    public String toString() {
-        return "ConsumerSendMsgBackRequestHeader [group=" + group + ", originTopic=" + originTopic + ", originMsgId=" + originMsgId
-            + ", delayLevel=" + delayLevel + ", unitMode=" + unitMode + ", maxReconsumeTimes=" + maxReconsumeTimes + "]";
+    public String getEnodeName() {
+        return enodeName;
+    }
+
+    public void setEnodeName(String enodeName) {
+        this.enodeName = enodeName;
+    }
+
+    @Override public String toString() {
+        return "ConsumerSendMsgBackRequestHeader{" +
+            "offset=" + offset +
+            ", group='" + group + '\'' +
+            ", delayLevel=" + delayLevel +
+            ", originMsgId='" + originMsgId + '\'' +
+            ", originTopic='" + originTopic + '\'' +
+            ", unitMode=" + unitMode +
+            ", maxReconsumeTimes=" + maxReconsumeTimes +
+            ", enodeName='" + enodeName + '\'' +
+            '}';
     }
 }
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetMaxOffsetRequestHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetMaxOffsetRequestHeader.java
index 871309d..1b5f951 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetMaxOffsetRequestHeader.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetMaxOffsetRequestHeader.java
@@ -30,6 +30,8 @@ public class GetMaxOffsetRequestHeader implements CommandCustomHeader {
     @CFNotNull
     private Integer queueId;
 
+    private String enodeName;
+
     @Override
     public void checkFields() throws RemotingCommandException {
     }
@@ -49,4 +51,12 @@ public class GetMaxOffsetRequestHeader implements CommandCustomHeader {
     public void setQueueId(Integer queueId) {
         this.queueId = queueId;
     }
+
+    public String getEnodeName() {
+        return enodeName;
+    }
+
+    public void setEnodeName(String enodeName) {
+        this.enodeName = enodeName;
+    }
 }
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetMinOffsetRequestHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetMinOffsetRequestHeader.java
index 6fb8ed4..1ac771b 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetMinOffsetRequestHeader.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetMinOffsetRequestHeader.java
@@ -27,9 +27,12 @@ import org.apache.rocketmq.remoting.exception.RemotingCommandException;
 public class GetMinOffsetRequestHeader implements CommandCustomHeader {
     @CFNotNull
     private String topic;
+
     @CFNotNull
     private Integer queueId;
 
+    private String enodeName;
+
     @Override
     public void checkFields() throws RemotingCommandException {
     }
@@ -49,4 +52,12 @@ public class GetMinOffsetRequestHeader implements CommandCustomHeader {
     public void setQueueId(Integer queueId) {
         this.queueId = queueId;
     }
+
+    public String getEnodeName() {
+        return enodeName;
+    }
+
+    public void setEnodeName(String enodeName) {
+        this.enodeName = enodeName;
+    }
 }
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/QueryConsumerOffsetRequestHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/QueryConsumerOffsetRequestHeader.java
index 3b7f627..2034c97 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/header/QueryConsumerOffsetRequestHeader.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/QueryConsumerOffsetRequestHeader.java
@@ -32,6 +32,8 @@ public class QueryConsumerOffsetRequestHeader implements CommandCustomHeader {
     @CFNotNull
     private Integer queueId;
 
+    private String enodeName;
+
     @Override
     public void checkFields() throws RemotingCommandException {
     }
@@ -59,4 +61,12 @@ public class QueryConsumerOffsetRequestHeader implements CommandCustomHeader {
     public void setQueueId(Integer queueId) {
         this.queueId = queueId;
     }
+
+    public String getEnodeName() {
+        return enodeName;
+    }
+
+    public void setEnodeName(String enodeName) {
+        this.enodeName = enodeName;
+    }
 }
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/SearchOffsetRequestHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/SearchOffsetRequestHeader.java
index 5ea2e24..4db36b5 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/header/SearchOffsetRequestHeader.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/SearchOffsetRequestHeader.java
@@ -32,6 +32,8 @@ public class SearchOffsetRequestHeader implements CommandCustomHeader {
     @CFNotNull
     private Long timestamp;
 
+    private String enodeName;
+
     @Override
     public void checkFields() throws RemotingCommandException {
 
@@ -61,4 +63,11 @@ public class SearchOffsetRequestHeader implements CommandCustomHeader {
         this.timestamp = timestamp;
     }
 
+    public String getEnodeName() {
+        return enodeName;
+    }
+
+    public void setEnodeName(String enodeName) {
+        this.enodeName = enodeName;
+    }
 }
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/SendMessageRequestHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/SendMessageRequestHeader.java
index a032911..bab833b 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/header/SendMessageRequestHeader.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/SendMessageRequestHeader.java
@@ -53,7 +53,7 @@ public class SendMessageRequestHeader implements CommandCustomHeader {
 
     private Integer maxReconsumeTimes;
 
-    private String enodeAddr;
+    private String enodeName;
 
     @Override
     public void checkFields() throws RemotingCommandException {
@@ -163,11 +163,30 @@ public class SendMessageRequestHeader implements CommandCustomHeader {
         this.batch = batch;
     }
 
-    public String getEnodeAddr() {
-        return enodeAddr;
+    public String getEnodeName() {
+        return enodeName;
     }
 
-    public void setEnodeAddr(String enodeAddr) {
-        this.enodeAddr = enodeAddr;
+    public void setEnodeName(String enodeName) {
+        this.enodeName = enodeName;
+    }
+
+    @Override public String toString() {
+        return "SendMessageRequestHeader{" +
+            "producerGroup='" + producerGroup + '\'' +
+            ", topic='" + topic + '\'' +
+            ", defaultTopic='" + defaultTopic + '\'' +
+            ", defaultTopicQueueNums=" + defaultTopicQueueNums +
+            ", queueId=" + queueId +
+            ", sysFlag=" + sysFlag +
+            ", bornTimestamp=" + bornTimestamp +
+            ", flag=" + flag +
+            ", properties='" + properties + '\'' +
+            ", reconsumeTimes=" + reconsumeTimes +
+            ", unitMode=" + unitMode +
+            ", batch=" + batch +
+            ", maxReconsumeTimes=" + maxReconsumeTimes +
+            ", enodeName='" + enodeName + '\'' +
+            '}';
     }
 }
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/SendMessageRequestHeaderV2.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/SendMessageRequestHeaderV2.java
index 9602805..ed6babe 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/header/SendMessageRequestHeaderV2.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/SendMessageRequestHeaderV2.java
@@ -71,7 +71,7 @@ public class SendMessageRequestHeaderV2 implements CommandCustomHeader {
         v1.setUnitMode(v2.k);
         v1.setMaxReconsumeTimes(v2.l);
         v1.setBatch(v2.m);
-        v1.setEnodeAddr(v2.n);
+        v1.setEnodeName(v2.n);
         return v1;
     }
 
@@ -90,7 +90,7 @@ public class SendMessageRequestHeaderV2 implements CommandCustomHeader {
         v2.k = v1.isUnitMode();
         v2.l = v1.getMaxReconsumeTimes();
         v2.m = v1.isBatch();
-        v2.n = v1.getEnodeAddr();
+        v2.n = v1.getEnodeName();
         return v2;
     }
 
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/UpdateConsumerOffsetRequestHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/UpdateConsumerOffsetRequestHeader.java
index 3f44db6..dd4d3b4 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/header/UpdateConsumerOffsetRequestHeader.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/UpdateConsumerOffsetRequestHeader.java
@@ -34,6 +34,8 @@ public class UpdateConsumerOffsetRequestHeader implements CommandCustomHeader {
     @CFNotNull
     private Long commitOffset;
 
+    private String enodeName;
+
     @Override
     public void checkFields() throws RemotingCommandException {
     }
@@ -69,4 +71,12 @@ public class UpdateConsumerOffsetRequestHeader implements CommandCustomHeader {
     public void setCommitOffset(Long commitOffset) {
         this.commitOffset = commitOffset;
     }
+
+    public String getEnodeName() {
+        return enodeName;
+    }
+
+    public void setEnodeName(String enodeName) {
+        this.enodeName = enodeName;
+    }
 }
diff --git a/example/src/main/java/org/apache/rocketmq/example/quickstart/Consumer.java b/example/src/main/java/org/apache/rocketmq/example/quickstart/Consumer.java
index 6d3b936..8d667a4 100644
--- a/example/src/main/java/org/apache/rocketmq/example/quickstart/Consumer.java
+++ b/example/src/main/java/org/apache/rocketmq/example/quickstart/Consumer.java
@@ -35,7 +35,7 @@ public class Consumer {
         /*
          * Instantiate with specified consumer group name.
          */
-        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_4");
+        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("RocketMQ5");
 
         /*
          * Specify name server addresses.
@@ -58,7 +58,6 @@ public class Consumer {
          * Subscribe one more more topics to consume.
          */
         consumer.subscribe("TopicTest", "*");
-
         /*
          *  Register callback to execute on arrival of messages fetched from brokers.
          */
diff --git a/namesrv/src/main/java/org/apache/rocketmq/namesrv/processor/DefaultRequestProcessor.java b/namesrv/src/main/java/org/apache/rocketmq/namesrv/processor/DefaultRequestProcessor.java
index 3b12d49..0af8c98 100644
--- a/namesrv/src/main/java/org/apache/rocketmq/namesrv/processor/DefaultRequestProcessor.java
+++ b/namesrv/src/main/java/org/apache/rocketmq/namesrv/processor/DefaultRequestProcessor.java
@@ -379,7 +379,6 @@ public class DefaultRequestProcessor implements NettyRequestProcessor {
 
     private RemotingCommand getBrokerClusterInfo(ChannelHandlerContext ctx, RemotingCommand request) {
         final RemotingCommand response = RemotingCommand.createResponseCommand(null);
-
         byte[] content = this.namesrvController.getRouteInfoManager().getAllClusterInfo();
         response.setBody(content);
 
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java
index cae2bf4..8d3f54b 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java
@@ -296,8 +296,7 @@ public abstract class NettyRemotingAbstract {
                 responseFuture.release();
             }
         } else {
-            log.warn("receive response, but not matched any request, " + RemotingHelper.parseChannelRemoteAddr(ctx.channel()));
-            log.warn(cmd.toString());
+            log.warn("receive response, but not matched any request: {}, cmd: {}", RemotingHelper.parseChannelRemoteAddr(ctx.channel()), cmd);
         }
     }
 
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/transport/rocketmq/NettyRemotingClient.java b/remoting/src/main/java/org/apache/rocketmq/remoting/transport/rocketmq/NettyRemotingClient.java
index 4e691f1..55a1d3d 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/transport/rocketmq/NettyRemotingClient.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/transport/rocketmq/NettyRemotingClient.java
@@ -181,6 +181,7 @@ public class NettyRemotingClient extends NettyRemotingClientAbstract implements
         throws InterruptedException, RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException {
         long beginStartTime = System.currentTimeMillis();
         final Channel channel = this.getAndCreateChannel(addr, timeoutMillis);
+
         if (channel != null && channel.isActive()) {
             try {
                 if (this.rpcHook != null) {
diff --git a/rocketmq-snode/src/main/java/org/apache/rocketmq/snode/SnodeController.java b/rocketmq-snode/src/main/java/org/apache/rocketmq/snode/SnodeController.java
index 50337df..cb8d662 100644
--- a/rocketmq-snode/src/main/java/org/apache/rocketmq/snode/SnodeController.java
+++ b/rocketmq-snode/src/main/java/org/apache/rocketmq/snode/SnodeController.java
@@ -26,6 +26,8 @@ import org.apache.rocketmq.common.protocol.RequestCode;
 import org.apache.rocketmq.common.utils.ThreadUtils;
 import org.apache.rocketmq.logging.InternalLogger;
 import org.apache.rocketmq.logging.InternalLoggerFactory;
+import org.apache.rocketmq.remoting.RemotingClient;
+import org.apache.rocketmq.remoting.RemotingClientFactory;
 import org.apache.rocketmq.remoting.RemotingServer;
 import org.apache.rocketmq.remoting.RemotingServerFactory;
 import org.apache.rocketmq.remoting.netty.NettyClientConfig;
@@ -37,14 +39,17 @@ import org.apache.rocketmq.snode.client.DefaultConsumerIdsChangeListener;
 import org.apache.rocketmq.snode.client.ProducerManager;
 import org.apache.rocketmq.snode.client.SubscriptionGroupManager;
 import org.apache.rocketmq.snode.config.SnodeConfig;
+import org.apache.rocketmq.snode.offset.ConsumerOffsetManager;
 import org.apache.rocketmq.snode.processor.ConsumerManageProcessor;
 import org.apache.rocketmq.snode.processor.HearbeatProcessor;
 import org.apache.rocketmq.snode.processor.PullMessageProcessor;
 import org.apache.rocketmq.snode.processor.SendMessageProcessor;
+import org.apache.rocketmq.snode.service.EnodeService;
+import org.apache.rocketmq.snode.service.NnodeService;
 import org.apache.rocketmq.snode.service.ScheduledService;
-import org.apache.rocketmq.snode.service.SnodeOuterService;
+import org.apache.rocketmq.snode.service.impl.EnodeServiceImpl;
+import org.apache.rocketmq.snode.service.impl.NnodeServiceImpl;
 import org.apache.rocketmq.snode.service.impl.ScheduledServiceImpl;
-import org.apache.rocketmq.snode.service.impl.SnodeOuterServiceImpl;
 
 public class SnodeController {
     private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.SNODE_LOGGER_NAME);
@@ -52,17 +57,25 @@ public class SnodeController {
     private final SnodeConfig snodeConfig;
     private final NettyServerConfig nettyServerConfig;
     private final NettyClientConfig nettyClientConfig;
+    private RemotingClient remotingClient;
     private RemotingServer snodeServer;
-    private ExecutorService sendMessageExcutor;
+    private ExecutorService sendMessageExecutor;
     private ExecutorService heartbeatExecutor;
-    private ExecutorService pullMessageExcutor;
-    private SnodeOuterService snodeOuterService;
-    private ExecutorService consumerManagerExcutor;
+    private ExecutorService pullMessageExecutor;
+    private ExecutorService consumerManageExecutor;
+    private EnodeService enodeService;
+    private NnodeService nnodeService;
+    private ExecutorService consumerManagerExecutor;
     private ScheduledService scheduledService;
     private ProducerManager producerManager;
     private ConsumerManager consumerManager;
     private ClientHousekeepingService clientHousekeepingService;
     private SubscriptionGroupManager subscriptionGroupManager;
+    private ConsumerOffsetManager consumerOffsetManager;
+    private ConsumerManageProcessor consumerManageProcessor;
+    private SendMessageProcessor sendMessageProcessor;
+    private PullMessageProcessor pullMessageProcessor;
+    private HearbeatProcessor hearbeatProcessor;
 
     private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl(
         "SnodeControllerScheduledThread"));
@@ -73,9 +86,12 @@ public class SnodeController {
         this.nettyClientConfig = nettyClientConfig;
         this.nettyServerConfig = nettyServerConfig;
         this.snodeConfig = snodeConfig;
-        this.snodeOuterService = SnodeOuterServiceImpl.getInstance(this);
-        this.scheduledService = new ScheduledServiceImpl(this.snodeOuterService, this.snodeConfig);
-        this.sendMessageExcutor = ThreadUtils.newThreadPoolExecutor(
+        this.enodeService = new EnodeServiceImpl(this);
+        this.nnodeService = new NnodeServiceImpl(this);
+        this.scheduledService = new ScheduledServiceImpl(this);
+        this.remotingClient = RemotingClientFactory.createInstance().init(this.getNettyClientConfig(), null);
+
+        this.sendMessageExecutor = ThreadUtils.newThreadPoolExecutor(
             snodeConfig.getSnodeSendMessageMinPoolSize(),
             snodeConfig.getSnodeSendMessageMaxPoolSize(),
             3000,
@@ -84,7 +100,7 @@ public class SnodeController {
             "SnodeSendMessageThread",
             false);
 
-        this.pullMessageExcutor = ThreadUtils.newThreadPoolExecutor(
+        this.pullMessageExecutor = ThreadUtils.newThreadPoolExecutor(
             snodeConfig.getSnodeSendMessageMinPoolSize(),
             snodeConfig.getSnodeSendMessageMaxPoolSize(),
             3000,
@@ -102,7 +118,7 @@ public class SnodeController {
             "SnodeHeartbeatThread",
             true);
 
-        this.consumerManagerExcutor = ThreadUtils.newThreadPoolExecutor(
+        this.consumerManagerExecutor = ThreadUtils.newThreadPoolExecutor(
             snodeConfig.getSnodeSendMessageMinPoolSize(),
             snodeConfig.getSnodeSendMessageMaxPoolSize(),
             3000,
@@ -111,8 +127,17 @@ public class SnodeController {
             "SnodePullMessageThread",
             false);
 
+        this.consumerManageExecutor = ThreadUtils.newThreadPoolExecutor(
+            snodeConfig.getSnodeSendMessageMinPoolSize(),
+            snodeConfig.getSnodeSendMessageMaxPoolSize(),
+            3000,
+            TimeUnit.MILLISECONDS,
+            new ArrayBlockingQueue<Runnable>(snodeConfig.getSnodeSendThreadPoolQueueCapacity()),
+            "ConsumerManagerThread",
+            false);
+
         if (this.snodeConfig.getNamesrvAddr() != null) {
-            this.snodeOuterService.updateNameServerAddressList(this.snodeConfig.getNamesrvAddr());
+            this.nnodeService.updateNnodeAddressList(this.snodeConfig.getNamesrvAddr());
             log.info("Set user specified name server address: {}", this.snodeConfig.getNamesrvAddr());
         }
 
@@ -122,6 +147,11 @@ public class SnodeController {
         this.consumerManager = new ConsumerManager(consumerIdsChangeListener);
         this.subscriptionGroupManager = new SubscriptionGroupManager(this);
         this.clientHousekeepingService = new ClientHousekeepingService(this.producerManager, this.consumerManager);
+        this.consumerOffsetManager = new ConsumerOffsetManager(this);
+        this.consumerManageProcessor = new ConsumerManageProcessor(this);
+        this.sendMessageProcessor = new SendMessageProcessor(this);
+        this.hearbeatProcessor = new HearbeatProcessor(this);
+        this.pullMessageProcessor = new PullMessageProcessor(this);
     }
 
     public SnodeConfig getSnodeConfig() {
@@ -135,26 +165,34 @@ public class SnodeController {
     }
 
     public void registerProcessor() {
-        snodeServer.registerProcessor(RequestCode.SEND_MESSAGE_V2, new SendMessageProcessor(this), sendMessageExcutor);
-        snodeServer.registerProcessor(RequestCode.HEART_BEAT, new HearbeatProcessor(this), heartbeatExecutor);
-        snodeServer.registerProcessor(RequestCode.SNODE_PULL_MESSAGE, new PullMessageProcessor(this), pullMessageExcutor);
-        snodeServer.registerProcessor(RequestCode.GET_CONSUMER_LIST_BY_GROUP, new ConsumerManageProcessor(this), consumerManagerExcutor);
+        this.snodeServer.registerProcessor(RequestCode.SEND_MESSAGE_V2, sendMessageProcessor, this.sendMessageExecutor);
+        this.snodeServer.registerProcessor(RequestCode.CONSUMER_SEND_MSG_BACK, sendMessageProcessor, this.sendMessageExecutor);
+        this.snodeServer.registerProcessor(RequestCode.HEART_BEAT, hearbeatProcessor, this.heartbeatExecutor);
+        this.snodeServer.registerProcessor(RequestCode.UNREGISTER_CLIENT, hearbeatProcessor, this.heartbeatExecutor);
+        this.snodeServer.registerProcessor(RequestCode.SNODE_PULL_MESSAGE, pullMessageProcessor, this.pullMessageExecutor);
+        this.snodeServer.registerProcessor(RequestCode.GET_CONSUMER_LIST_BY_GROUP, consumerManageProcessor, this.consumerManageExecutor);
+        this.snodeServer.registerProcessor(RequestCode.UPDATE_CONSUMER_OFFSET, consumerManageProcessor, this.consumerManageExecutor);
+        this.snodeServer.registerProcessor(RequestCode.QUERY_CONSUMER_OFFSET, consumerManageProcessor, this.consumerManageExecutor);
+        this.snodeServer.registerProcessor(RequestCode.GET_MIN_OFFSET, consumerManageProcessor, this.consumerManageExecutor);
+        this.snodeServer.registerProcessor(RequestCode.GET_MAX_OFFSET, consumerManageProcessor, this.consumerManageExecutor);
+        this.snodeServer.registerProcessor(RequestCode.SEARCH_OFFSET_BY_TIMESTAMP, consumerManageProcessor, this.consumerManageExecutor);
     }
 
     public void start() {
         initialize();
         this.snodeServer.start();
-        this.snodeOuterService.start();
+        this.remotingClient.start();
         this.scheduledService.startScheduleTask();
         this.clientHousekeepingService.start(this.snodeConfig.getHouseKeepingInterval());
     }
 
     public void shutdown() {
-        this.sendMessageExcutor.shutdown();
-        this.pullMessageExcutor.shutdown();
+        this.sendMessageExecutor.shutdown();
+        this.pullMessageExecutor.shutdown();
         this.heartbeatExecutor.shutdown();
+        this.consumerManagerExecutor.shutdown();
         this.scheduledExecutorService.shutdown();
-        this.snodeOuterService.shutdown();
+        this.remotingClient.shutdown();
         this.scheduledService.shutdown();
         this.clientHousekeepingService.shutdown();
     }
@@ -195,11 +233,35 @@ public class SnodeController {
         return nettyClientConfig;
     }
 
-    public SnodeOuterService getSnodeOuterService() {
-        return snodeOuterService;
+    public EnodeService getEnodeService() {
+        return enodeService;
+    }
+
+    public void setEnodeService(EnodeService enodeService) {
+        this.enodeService = enodeService;
+    }
+
+    public NnodeService getNnodeService() {
+        return nnodeService;
+    }
+
+    public void setNnodeService(NnodeService nnodeService) {
+        this.nnodeService = nnodeService;
+    }
+
+    public RemotingClient getRemotingClient() {
+        return remotingClient;
+    }
+
+    public void setRemotingClient(RemotingClient remotingClient) {
+        this.remotingClient = remotingClient;
+    }
+
+    public ConsumerOffsetManager getConsumerOffsetManager() {
+        return consumerOffsetManager;
     }
 
-    public void setSnodeOuterService(SnodeOuterService snodeOuterService) {
-        this.snodeOuterService = snodeOuterService;
+    public void setConsumerOffsetManager(ConsumerOffsetManager consumerOffsetManager) {
+        this.consumerOffsetManager = consumerOffsetManager;
     }
 }
diff --git a/rocketmq-snode/src/main/java/org/apache/rocketmq/snode/client/ClientHousekeepingService.java b/rocketmq-snode/src/main/java/org/apache/rocketmq/snode/client/ClientHousekeepingService.java
index e71ea0a..02598b9 100644
--- a/rocketmq-snode/src/main/java/org/apache/rocketmq/snode/client/ClientHousekeepingService.java
+++ b/rocketmq-snode/src/main/java/org/apache/rocketmq/snode/client/ClientHousekeepingService.java
@@ -54,7 +54,7 @@ public class ClientHousekeepingService implements ChannelEventListener {
 
     private void scanExceptionChannel() {
         this.producerManager.scanNotActiveChannel();
-        //this.consumerManager.scanNotActiveChannel();
+        this.consumerManager.scanNotActiveChannel();
     }
 
     public void shutdown() {
diff --git a/rocketmq-snode/src/main/java/org/apache/rocketmq/snode/client/ConsumerGroupInfo.java b/rocketmq-snode/src/main/java/org/apache/rocketmq/snode/client/ConsumerGroupInfo.java
index 9b366a5..89b02fd 100644
--- a/rocketmq-snode/src/main/java/org/apache/rocketmq/snode/client/ConsumerGroupInfo.java
+++ b/rocketmq-snode/src/main/java/org/apache/rocketmq/snode/client/ConsumerGroupInfo.java
@@ -36,9 +36,9 @@ public class ConsumerGroupInfo {
     private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
     private final String groupName;
     private final ConcurrentMap<String/* Topic */, SubscriptionData> subscriptionTable =
-        new ConcurrentHashMap<String, SubscriptionData>();
+        new ConcurrentHashMap<>();
     private final ConcurrentMap<Channel, ClientChannelInfo> channelInfoTable =
-        new ConcurrentHashMap<Channel, ClientChannelInfo>(16);
+        new ConcurrentHashMap<>(16);
     private volatile ConsumeType consumeType;
     private volatile MessageModel messageModel;
     private volatile ConsumeFromWhere consumeFromWhere;
@@ -124,7 +124,7 @@ public class ConsumerGroupInfo {
         if (null == infoOld) {
             ClientChannelInfo prev = this.channelInfoTable.put(infoNew.getChannel(), infoNew);
             if (null == prev) {
-                log.info("new consumer connected, group: {} {} {} channel: {}", this.groupName, consumeType,
+                log.info("New consumer connected, group: {} {} {} channel: {}", this.groupName, consumeType,
                     messageModel, infoNew.toString());
                 updated = true;
             }
@@ -155,13 +155,13 @@ public class ConsumerGroupInfo {
                 SubscriptionData prev = this.subscriptionTable.putIfAbsent(sub.getTopic(), sub);
                 if (null == prev) {
                     updated = true;
-                    log.info("subscription changed, add new topic, group: {} {}",
+                    log.info("Subscription changed, add new topic, group: {} {}",
                         this.groupName,
                         sub.toString());
                 }
             } else if (sub.getSubVersion() > old.getSubVersion()) {
                 if (this.consumeType == ConsumeType.CONSUME_PASSIVELY) {
-                    log.info("subscription changed, group: {} OLD: {} NEW: {}",
+                    log.info("Subscription changed, group: {} OLD: {} NEW: {}",
                         this.groupName,
                         old.toString(),
                         sub.toString()
diff --git a/rocketmq-snode/src/main/java/org/apache/rocketmq/snode/client/ConsumerManager.java b/rocketmq-snode/src/main/java/org/apache/rocketmq/snode/client/ConsumerManager.java
index 8d3b665..a0bab83 100644
--- a/rocketmq-snode/src/main/java/org/apache/rocketmq/snode/client/ConsumerManager.java
+++ b/rocketmq-snode/src/main/java/org/apache/rocketmq/snode/client/ConsumerManager.java
@@ -37,20 +37,18 @@ public class ConsumerManager {
     private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
     private static final long CHANNEL_EXPIRED_TIMEOUT = 1000 * 120;
     private final ConcurrentMap<String/* Group */, ConsumerGroupInfo> consumerTable =
-        new ConcurrentHashMap<String, ConsumerGroupInfo>(1024);
+        new ConcurrentHashMap<>(1024);
     private final ConsumerIdsChangeListener consumerIdsChangeListener;
 
     public ConsumerManager(final ConsumerIdsChangeListener consumerIdsChangeListener) {
         this.consumerIdsChangeListener = consumerIdsChangeListener;
     }
 
-    public ClientChannelInfo findChannel(final String group, final String clientId) {
-        ConsumerGroupInfo consumerGroupInfo = this.consumerTable.get(group);
-        if (consumerGroupInfo != null) {
-            return consumerGroupInfo.findChannel(clientId);
-        }
-        return null;
-    }
+    /**
+     * public ClientChannelInfo findChannel(final String group, final String clientId) { ConsumerGroupInfo
+     * consumerGroupInfo = this.consumerTable.get(group); if (consumerGroupInfo != null) { return
+     * consumerGroupInfo.findChannel(clientId); } return null; }
+     **/
 
     public SubscriptionData findSubscriptionData(final String group, final String topic) {
         ConsumerGroupInfo consumerGroupInfo = this.getConsumerGroupInfo(group);
@@ -84,12 +82,11 @@ public class ConsumerManager {
                 if (info.getChannelInfoTable().isEmpty()) {
                     ConsumerGroupInfo remove = this.consumerTable.remove(next.getKey());
                     if (remove != null) {
-                        log.info("unregister consumer ok, no any connection, and remove consumer group, {}",
+                        log.info("Unregister consumer ok, no any connection, and remove consumer group, {}",
                             next.getKey());
                         this.consumerIdsChangeListener.handle(ConsumerGroupEvent.UNREGISTER, next.getKey());
                     }
                 }
-
                 this.consumerIdsChangeListener.handle(ConsumerGroupEvent.CHANGE, next.getKey(), info.getAllChannel());
             }
         }
@@ -130,7 +127,7 @@ public class ConsumerManager {
             if (consumerGroupInfo.getChannelInfoTable().isEmpty()) {
                 ConsumerGroupInfo remove = this.consumerTable.remove(group);
                 if (remove != null) {
-                    log.info("unregister consumer ok, no any connection, and remove consumer group, {}", group);
+                    log.info("Unregister consumer ok, no any connection, and remove consumer group, {}", group);
 
                     this.consumerIdsChangeListener.handle(ConsumerGroupEvent.UNREGISTER, group);
                 }
@@ -157,7 +154,7 @@ public class ConsumerManager {
                 long diff = System.currentTimeMillis() - clientChannelInfo.getLastUpdateTimestamp();
                 if (diff > CHANNEL_EXPIRED_TIMEOUT) {
                     log.warn(
-                        "SCAN: remove expired channel from ConsumerManager consumerTable. channel={}, consumerGroup={}",
+                        "SCAN: Remove expired channel from ConsumerManager consumerTable. channel={}, consumerGroup={}",
                         RemotingHelper.parseChannelRemoteAddr(clientChannelInfo.getChannel()), group);
                     RemotingUtil.closeChannel(clientChannelInfo.getChannel());
                     itChannel.remove();
@@ -166,7 +163,7 @@ public class ConsumerManager {
 
             if (channelInfoTable.isEmpty()) {
                 log.warn(
-                    "SCAN: remove expired channel from ConsumerManager consumerTable, all clear, consumerGroup={}",
+                    "SCAN: Remove expired channel from ConsumerManager consumerTable, all clear, consumerGroup={}",
                     group);
                 it.remove();
             }
diff --git a/rocketmq-snode/src/main/java/org/apache/rocketmq/snode/client/DefaultConsumerIdsChangeListener.java b/rocketmq-snode/src/main/java/org/apache/rocketmq/snode/client/DefaultConsumerIdsChangeListener.java
index cb7c164..1f46c95 100644
--- a/rocketmq-snode/src/main/java/org/apache/rocketmq/snode/client/DefaultConsumerIdsChangeListener.java
+++ b/rocketmq-snode/src/main/java/org/apache/rocketmq/snode/client/DefaultConsumerIdsChangeListener.java
@@ -43,7 +43,7 @@ public class DefaultConsumerIdsChangeListener implements ConsumerIdsChangeListen
                 List<Channel> channels = (List<Channel>) args[0];
                 if (channels != null && snodeController.getSnodeConfig().isNotifyConsumerIdsChangedEnable()) {
                     for (Channel chl : channels) {
-                        this.snodeController.getSnodeOuterService().notifyConsumerIdsChanged(chl, group);
+                        this.snodeController.getEnodeService().notifyConsumerIdsChanged(chl, group);
                     }
                 }
                 break;
diff --git a/rocketmq-snode/src/main/java/org/apache/rocketmq/snode/client/ProducerManager.java b/rocketmq-snode/src/main/java/org/apache/rocketmq/snode/client/ProducerManager.java
index b80c027..4513c7d 100644
--- a/rocketmq-snode/src/main/java/org/apache/rocketmq/snode/client/ProducerManager.java
+++ b/rocketmq-snode/src/main/java/org/apache/rocketmq/snode/client/ProducerManager.java
@@ -36,11 +36,11 @@ public class ProducerManager {
     private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
     private static final long LOCK_TIMEOUT_MILLIS = 3000;
     private static final long CHANNEL_EXPIRED_TIMEOUT = 1000 * 120;
-    private static final int GET_AVALIABLE_CHANNEL_RETRY_COUNT = 3;
     private final Lock groupChannelLock = new ReentrantLock();
     private final HashMap<String /* group name */, HashMap<Channel, ClientChannelInfo>> groupChannelTable =
-        new HashMap<String, HashMap<Channel, ClientChannelInfo>>();
+        new HashMap<>();
     private PositiveAtomicCounter positiveAtomicCounter = new PositiveAtomicCounter();
+
     public ProducerManager() {
 
     }
@@ -144,7 +144,7 @@ public class ProducerManager {
                     clientChannelInfoFound = channelTable.get(clientChannelInfo.getChannel());
                     if (null == clientChannelInfoFound) {
                         channelTable.put(clientChannelInfo.getChannel(), clientChannelInfo);
-                        log.info("new producer connected, group: {} channel: {}", group,
+                        log.info("New producer connected, group: {} channel: {}", group,
                             clientChannelInfo.toString());
                     }
                 } finally {
@@ -158,7 +158,7 @@ public class ProducerManager {
                 log.warn("ProducerManager registerProducer lock timeout");
             }
         } catch (InterruptedException e) {
-            log.error("", e);
+            log.error("Register Producer error: {}", e);
         }
     }
 
@@ -170,13 +170,13 @@ public class ProducerManager {
                     if (null != channelTable && !channelTable.isEmpty()) {
                         ClientChannelInfo old = channelTable.remove(clientChannelInfo.getChannel());
                         if (old != null) {
-                            log.info("unregister a producer[{}] from groupChannelTable {}", group,
+                            log.info("Unregister a producer[{}] from groupChannelTable {}", group,
                                 clientChannelInfo.toString());
                         }
 
                         if (channelTable.isEmpty()) {
                             this.groupChannelTable.remove(group);
-                            log.info("unregister a producer group[{}] from groupChannelTable", group);
+                            log.info("Unregister a producer group[{}] from groupChannelTable", group);
                         }
                     }
                 } finally {
@@ -190,35 +190,35 @@ public class ProducerManager {
         }
     }
 
-    public Channel getAvaliableChannel(String groupId) {
-        HashMap<Channel, ClientChannelInfo> channelClientChannelInfoHashMap = groupChannelTable.get(groupId);
-        List<Channel> channelList = new ArrayList<Channel>();
-        if (channelClientChannelInfoHashMap != null) {
-            for (Channel channel : channelClientChannelInfoHashMap.keySet()) {
-                channelList.add(channel);
-            }
-            int size = channelList.size();
-            if (0 == size) {
-                log.warn("Channel list is empty. groupId={}", groupId);
-                return null;
-            }
-
-            int index = positiveAtomicCounter.incrementAndGet() % size;
-            Channel channel = channelList.get(index);
-            int count = 0;
-            boolean isOk = channel.isActive() && channel.isWritable();
-            while (count++ < GET_AVALIABLE_CHANNEL_RETRY_COUNT) {
-                if (isOk) {
-                    return channel;
-                }
-                index = (++index) % size;
-                channel = channelList.get(index);
-                isOk = channel.isActive() && channel.isWritable();
-            }
-        } else {
-            log.warn("Check transaction failed, channel table is empty. groupId={}", groupId);
-            return null;
-        }
-        return null;
-    }
+//    public Channel getAvaliableChannel(String groupId) {
+//        HashMap<Channel, ClientChannelInfo> channelClientChannelInfoHashMap = groupChannelTable.get(groupId);
+//        List<Channel> channelList = new ArrayList<Channel>();
+//        if (channelClientChannelInfoHashMap != null) {
+//            for (Channel channel : channelClientChannelInfoHashMap.keySet()) {
+//                channelList.add(channel);
+//            }
+//            int size = channelList.size();
+//            if (0 == size) {
+//                log.warn("Channel list is empty. groupId={}", groupId);
+//                return null;
+//            }
+//
+//            int index = positiveAtomicCounter.incrementAndGet() % size;
+//            Channel channel = channelList.get(index);
+//            int count = 0;
+//            boolean isOk = channel.isActive() && channel.isWritable();
+//            while (count++ < GET_AVALIABLE_CHANNEL_RETRY_COUNT) {
+//                if (isOk) {
+//                    return channel;
+//                }
+//                index = (++index) % size;
+//                channel = channelList.get(index);
+//                isOk = channel.isActive() && channel.isWritable();
+//            }
+//        } else {
+//            log.warn("Check transaction failed, channel table is empty. groupId={}", groupId);
+//            return null;
+//        }
+//        return null;
+//    }
 }
diff --git a/rocketmq-snode/src/main/java/org/apache/rocketmq/snode/client/SubscriptionGroupManager.java b/rocketmq-snode/src/main/java/org/apache/rocketmq/snode/client/SubscriptionGroupManager.java
index 3c6799e..8d83d7a 100644
--- a/rocketmq-snode/src/main/java/org/apache/rocketmq/snode/client/SubscriptionGroupManager.java
+++ b/rocketmq-snode/src/main/java/org/apache/rocketmq/snode/client/SubscriptionGroupManager.java
@@ -38,12 +38,6 @@ public class SubscriptionGroupManager extends ConfigManager {
     private final DataVersion dataVersion = new DataVersion();
     private transient SnodeController snodeController;
 
-    public enum SUBSCRIPTION_EVENT {
-        CREATE,
-        UPDATE,
-        DELETE
-    }
-
     public SubscriptionGroupManager() {
         this.init();
     }
@@ -104,14 +98,14 @@ public class SubscriptionGroupManager extends ConfigManager {
     public void updateSubscriptionGroupConfig(final SubscriptionGroupConfig config) {
         SubscriptionGroupConfig old = this.subscriptionGroupTable.put(config.getGroupName(), config);
         if (old != null) {
-            log.info("update subscription group config, old: {} new: {}", old, config);
+            log.info("Update subscription group config, old: {} new: {}", old, config);
         } else {
-            log.info("create new subscription group, {}", config);
+            log.info("Create new subscription group, {}", config);
         }
 
         this.dataVersion.nextVersion();
 
-        this.persistToEnode(SUBSCRIPTION_EVENT.UPDATE, config);
+        this.persistToEnode(config);
     }
 
     public void disableConsume(final String groupName) {
@@ -133,7 +127,7 @@ public class SubscriptionGroupManager extends ConfigManager {
                     log.info("auto create a subscription group, {}", subscriptionGroupConfig.toString());
                 }
                 this.dataVersion.nextVersion();
-                this.persistToEnode(SUBSCRIPTION_EVENT.CREATE, subscriptionGroupConfig);
+                this.persistToEnode(subscriptionGroupConfig);
             }
         }
 
@@ -188,13 +182,13 @@ public class SubscriptionGroupManager extends ConfigManager {
         if (old != null) {
             log.info("delete subscription group OK, subscription group:{}", old);
             this.dataVersion.nextVersion();
-            this.persistToEnode(SUBSCRIPTION_EVENT.DELETE, old);
+            this.persistToEnode(old);
         } else {
             log.warn("delete subscription group failed, subscription groupName: {} not exist", groupName);
         }
     }
 
-    void persistToEnode(SUBSCRIPTION_EVENT event, SubscriptionGroupConfig config) {
-
+    void persistToEnode(SubscriptionGroupConfig config) {
+        this.snodeController.getEnodeService().persistSubscriptionGroupConfig(config);
     }
 }
diff --git a/rocketmq-snode/src/main/java/org/apache/rocketmq/snode/config/SnodeConfig.java b/rocketmq-snode/src/main/java/org/apache/rocketmq/snode/config/SnodeConfig.java
index 725cf6a..6ef55b1 100644
--- a/rocketmq-snode/src/main/java/org/apache/rocketmq/snode/config/SnodeConfig.java
+++ b/rocketmq-snode/src/main/java/org/apache/rocketmq/snode/config/SnodeConfig.java
@@ -21,6 +21,8 @@ import org.apache.rocketmq.common.constant.LoggerName;
 import org.apache.rocketmq.logging.InternalLogger;
 import org.apache.rocketmq.logging.InternalLoggerFactory;
 
+import static org.apache.rocketmq.client.ClientConfig.SEND_MESSAGE_WITH_VIP_CHANNEL_PROPERTY;
+
 public class SnodeConfig {
 
     private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.COMMON_LOGGER_NAME);
@@ -60,6 +62,8 @@ public class SnodeConfig {
 
     private int listenPort = 11911;
 
+    private boolean vipChannelEnabled = Boolean.parseBoolean(System.getProperty(SEND_MESSAGE_WITH_VIP_CHANNEL_PROPERTY, "true"));
+
     public void setSnodeHeartBeatInterval(long snodeHeartBeatInterval) {
         this.snodeHeartBeatInterval = snodeHeartBeatInterval;
     }
@@ -220,4 +224,12 @@ public class SnodeConfig {
     public void setAutoCreateSubscriptionGroup(boolean autoCreateSubscriptionGroup) {
         this.autoCreateSubscriptionGroup = autoCreateSubscriptionGroup;
     }
+
+    public boolean isVipChannelEnabled() {
+        return vipChannelEnabled;
+    }
+
+    public void setVipChannelEnabled(boolean vipChannelEnabled) {
+        this.vipChannelEnabled = vipChannelEnabled;
+    }
 }
diff --git a/rocketmq-snode/src/main/java/org/apache/rocketmq/snode/constant/SnodeConstant.java b/rocketmq-snode/src/main/java/org/apache/rocketmq/snode/constant/SnodeConstant.java
index 2ba91b2..1f5c7dd 100644
--- a/rocketmq-snode/src/main/java/org/apache/rocketmq/snode/constant/SnodeConstant.java
+++ b/rocketmq-snode/src/main/java/org/apache/rocketmq/snode/constant/SnodeConstant.java
@@ -22,4 +22,6 @@ public class SnodeConstant {
 
     public static final long defaultTimeoutMills = 3000L;
 
+    public static final long CONSUMER_TIMEOUT_MILLIS_WHEN_SUSPEND = 1000 * 30;
+
 }
diff --git a/rocketmq-snode/src/main/java/org/apache/rocketmq/snode/service/SendTransferService.java b/rocketmq-snode/src/main/java/org/apache/rocketmq/snode/exception/SnodeException.java
similarity index 50%
rename from rocketmq-snode/src/main/java/org/apache/rocketmq/snode/service/SendTransferService.java
rename to rocketmq-snode/src/main/java/org/apache/rocketmq/snode/exception/SnodeException.java
index 6dad57c..e9bd114 100644
--- a/rocketmq-snode/src/main/java/org/apache/rocketmq/snode/service/SendTransferService.java
+++ b/rocketmq-snode/src/main/java/org/apache/rocketmq/snode/exception/SnodeException.java
@@ -1,4 +1,4 @@
-package org.apache.rocketmq.snode.service;/*
+package org.apache.rocketmq.snode.exception;/*
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
@@ -15,12 +15,27 @@ package org.apache.rocketmq.snode.service;/*
  * limitations under the License.
  */
 
-import org.apache.rocketmq.remoting.protocol.RemotingCommand;
+import org.apache.rocketmq.common.UtilAll;
+import org.apache.rocketmq.common.help.FAQUrl;
 
-public interface SendTransferService {
-    RemotingCommand sendMessage(RemotingCommand request);
+public class SnodeException extends RuntimeException {
+    private static final long serialVersionUID = 5975020272601250368L;
 
-    boolean start();
+    private final int responseCode;
+    private final String errorMessage;
 
-    void shutdown();
-}
+    public SnodeException(int responseCode, String errorMessage) {
+        super(FAQUrl.attachDefaultURL("CODE: " + UtilAll.responseCode2String(responseCode) + "  DESC: "
+            + errorMessage));
+        this.responseCode = responseCode;
+        this.errorMessage = errorMessage;
+    }
+
+    public int getResponseCode() {
+        return responseCode;
+    }
+
+    public String getErrorMessage() {
+        return errorMessage;
+    }
+}
\ No newline at end of file
diff --git a/rocketmq-snode/src/main/java/org/apache/rocketmq/snode/offset/ConsumerOffsetManager.java b/rocketmq-snode/src/main/java/org/apache/rocketmq/snode/offset/ConsumerOffsetManager.java
new file mode 100644
index 0000000..c177ccf
--- /dev/null
+++ b/rocketmq-snode/src/main/java/org/apache/rocketmq/snode/offset/ConsumerOffsetManager.java
@@ -0,0 +1,243 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.snode.offset;
+
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.common.protocol.ResponseCode;
+import org.apache.rocketmq.common.protocol.header.GetMinOffsetResponseHeader;
+import org.apache.rocketmq.logging.InternalLogger;
+import org.apache.rocketmq.logging.InternalLoggerFactory;
+import org.apache.rocketmq.remoting.exception.RemotingCommandException;
+import org.apache.rocketmq.remoting.exception.RemotingConnectException;
+import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
+import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
+import org.apache.rocketmq.remoting.protocol.RemotingCommand;
+import org.apache.rocketmq.remoting.serialize.RemotingSerializable;
+import org.apache.rocketmq.snode.SnodeController;
+import org.apache.rocketmq.snode.exception.SnodeException;
+
+public class ConsumerOffsetManager {
+    private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
+    private static final String TOPIC_GROUP_SEPARATOR = "@";
+
+    private ConcurrentMap<String/* topic@group */, ConcurrentMap<Integer, Long>> offsetTable =
+        new ConcurrentHashMap<String, ConcurrentMap<Integer, Long>>(512);
+
+    private transient SnodeController snodeController;
+
+    public ConsumerOffsetManager() {
+    }
+
+    public ConsumerOffsetManager(SnodeController brokerController) {
+        this.snodeController = brokerController;
+    }
+
+    public void scanUnsubscribedTopic(String enodeName) throws InterruptedException, RemotingTimeoutException,
+        RemotingSendRequestException, RemotingConnectException, RemotingCommandException {
+        Iterator<Entry<String, ConcurrentMap<Integer, Long>>> it = this.offsetTable.entrySet().iterator();
+        while (it.hasNext()) {
+            Entry<String, ConcurrentMap<Integer, Long>> next = it.next();
+            String topicAtGroup = next.getKey();
+            String[] arrays = topicAtGroup.split(TOPIC_GROUP_SEPARATOR);
+            if (arrays.length == 2) {
+                String topic = arrays[0];
+                String group = arrays[1];
+
+                if (null == snodeController.getConsumerManager().findSubscriptionData(group, topic)
+                    && this.offsetBehindMuchThanData(enodeName, topic, next.getValue())) {
+                    it.remove();
+                    log.warn("Remove topic offset, {}", topicAtGroup);
+                }
+            }
+        }
+    }
+
+    private String buildKey(final String enodeName, final String topic, final String consumerGroup) {
+        if (enodeName == null || topic == null || consumerGroup == null) {
+            log.warn("Build key parameter error enodeName: {}, topic: {} consumerGroup:{}",
+                enodeName, topic, consumerGroup);
+            throw new SnodeException(ResponseCode.PARAMETER_ERROR, "Build key parameter error!");
+        }
+        StringBuilder sb = new StringBuilder(50);
+        sb.append(enodeName).append(TOPIC_GROUP_SEPARATOR).append(topic).append(TOPIC_GROUP_SEPARATOR).append(consumerGroup);
+        return sb.toString();
+    }
+
+    private boolean offsetBehindMuchThanData(final String enodeName, final String topic,
+        ConcurrentMap<Integer, Long> table) throws InterruptedException, RemotingTimeoutException,
+        RemotingSendRequestException, RemotingConnectException, RemotingCommandException {
+        Iterator<Entry<Integer, Long>> it = table.entrySet().iterator();
+        boolean result = !table.isEmpty();
+
+        while (it.hasNext() && result) {
+            Entry<Integer, Long> next = it.next();
+            RemotingCommand remotingCommand = this.snodeController.getEnodeService().getMinOffsetInQueue(enodeName, topic, next.getKey());
+            long minOffsetInStore = 0;
+            if (remotingCommand != null) {
+                switch (remotingCommand.getCode()) {
+                    case ResponseCode.SUCCESS: {
+                        GetMinOffsetResponseHeader responseHeader =
+                            (GetMinOffsetResponseHeader) remotingCommand.decodeCommandCustomHeader(GetMinOffsetResponseHeader.class);
+                        minOffsetInStore = responseHeader.getOffset();
+                    }
+                    default:
+                        break;
+                }
+            } else {
+                throw new SnodeException(ResponseCode.QUERY_OFFSET_ERROR, "Query min offset error!");
+            }
+            long offsetInPersist = next.getValue();
+            result = offsetInPersist <= minOffsetInStore;
+        }
+        return result;
+    }
+
+    public Set<String> whichTopicByConsumer(final String group) {
+        Set<String> topics = new HashSet<String>();
+
+        Iterator<Entry<String, ConcurrentMap<Integer, Long>>> it = this.offsetTable.entrySet().iterator();
+        while (it.hasNext()) {
+            Entry<String, ConcurrentMap<Integer, Long>> next = it.next();
+            String topicAtGroup = next.getKey();
+            String[] arrays = topicAtGroup.split(TOPIC_GROUP_SEPARATOR);
+            if (arrays.length == 2) {
+                if (group.equals(arrays[1])) {
+                    topics.add(arrays[0]);
+                }
+            }
+        }
+
+        return topics;
+    }
+
+    public Set<String> whichGroupByTopic(final String topic) {
+        Set<String> groups = new HashSet<String>();
+        Iterator<Entry<String, ConcurrentMap<Integer, Long>>> it = this.offsetTable.entrySet().iterator();
+        while (it.hasNext()) {
+            Entry<String, ConcurrentMap<Integer, Long>> next = it.next();
+            String topicAtGroup = next.getKey();
+            String[] arrays = topicAtGroup.split(TOPIC_GROUP_SEPARATOR);
+            if (arrays.length == 2) {
+                if (topic.equals(arrays[0])) {
+                    groups.add(arrays[1]);
+                }
+            }
+        }
+
+        return groups;
+    }
+
+    public void commitOffset(final String enodeName, final String clientHost, final String group, final String topic,
+        final int queueId,
+        final long offset) {
+        // topic@group
+        String key = buildKey(enodeName, topic, group);
+        this.commitOffset(clientHost, key, queueId, offset);
+    }
+
+    private void commitOffset(final String clientHost, final String key, final int queueId, final long offset) {
+        ConcurrentMap<Integer, Long> map = this.offsetTable.get(key);
+        if (null == map) {
+            map = new ConcurrentHashMap<>(32);
+            map.put(queueId, offset);
+            this.offsetTable.put(key, map);
+        } else {
+            Long storeOffset = map.put(queueId, offset);
+            if (storeOffset != null && offset < storeOffset) {
+                log.warn("[NOTIFYME]update consumer offset less than store. clientHost={}, key={}, queueId={}, requestOffset={}, storeOffset={}", clientHost, key, queueId, offset, storeOffset);
+            }
+        }
+    }
+
+    public long queryOffset(final String enodeName, final String group, final String topic, final int queueId) {
+        String key = buildKey(enodeName, topic, group);
+        ConcurrentMap<Integer, Long> map = this.offsetTable.get(key);
+        if (null != map) {
+            Long offset = map.get(queueId);
+            if (offset != null)
+                return offset;
+        }
+
+        return -1;
+    }
+
+    public String encode() {
+        return this.encode(false);
+    }
+
+    public void decode(String jsonString) {
+        if (jsonString != null) {
+            ConsumerOffsetManager obj = RemotingSerializable.fromJson(jsonString, ConsumerOffsetManager.class);
+            if (obj != null) {
+                this.offsetTable = obj.offsetTable;
+            }
+        }
+    }
+
+    public String encode(final boolean prettyFormat) {
+        return RemotingSerializable.toJson(this, prettyFormat);
+    }
+
+    public ConcurrentMap<String, ConcurrentMap<Integer, Long>> getOffsetTable() {
+        return offsetTable;
+    }
+
+    public void setOffsetTable(ConcurrentHashMap<String, ConcurrentMap<Integer, Long>> offsetTable) {
+        this.offsetTable = offsetTable;
+    }
+
+
+    public Map<Integer, Long> queryOffset(final String enodeName, final String group, final String topic) {
+        // topic@group
+        String key = buildKey(enodeName, topic, group);
+        return this.offsetTable.get(key);
+    }
+
+    public void cloneOffset(final String srcGroup, final String destGroup, final String topic) {
+        ConcurrentMap<Integer, Long> offsets = this.offsetTable.get(topic + TOPIC_GROUP_SEPARATOR + srcGroup);
+        if (offsets != null) {
+            this.offsetTable.put(topic + TOPIC_GROUP_SEPARATOR + destGroup, new ConcurrentHashMap<Integer, Long>(offsets));
+        }
+    }
+
+    public void persist() {
+        for (Entry<String, ConcurrentMap<Integer, Long>> offSetEntry : this.offsetTable.entrySet()) {
+            ConcurrentHashMap<Integer, Long> map = (ConcurrentHashMap<Integer, Long>) offSetEntry.getValue();
+            String key = offSetEntry.getKey();
+            String[] keys = key.split(TOPIC_GROUP_SEPARATOR);
+            if (keys.length == 3) {
+                String enodeName = keys[0];
+                String topic = keys[1];
+                String consumerGroup = keys[2];
+                for (Entry<Integer, Long> queueEntry : map.entrySet()) {
+                    Integer queueId = queueEntry.getKey();
+                    Long offset = queueEntry.getValue();
+                    this.snodeController.getEnodeService().persistOffsetToEnode(enodeName, consumerGroup, topic, queueId, offset);
+                }
+            } else {
+                log.error("Persist offset split keys error:{}", key);
+            }
+        }
+    }
+}
diff --git a/rocketmq-snode/src/main/java/org/apache/rocketmq/snode/processor/ConsumerManageProcessor.java b/rocketmq-snode/src/main/java/org/apache/rocketmq/snode/processor/ConsumerManageProcessor.java
index aec6f04..3e53795 100644
--- a/rocketmq-snode/src/main/java/org/apache/rocketmq/snode/processor/ConsumerManageProcessor.java
+++ b/rocketmq-snode/src/main/java/org/apache/rocketmq/snode/processor/ConsumerManageProcessor.java
@@ -24,10 +24,20 @@ import org.apache.rocketmq.common.protocol.ResponseCode;
 import org.apache.rocketmq.common.protocol.header.GetConsumerListByGroupRequestHeader;
 import org.apache.rocketmq.common.protocol.header.GetConsumerListByGroupResponseBody;
 import org.apache.rocketmq.common.protocol.header.GetConsumerListByGroupResponseHeader;
+import org.apache.rocketmq.common.protocol.header.GetMaxOffsetRequestHeader;
+import org.apache.rocketmq.common.protocol.header.GetMinOffsetRequestHeader;
+import org.apache.rocketmq.common.protocol.header.QueryConsumerOffsetRequestHeader;
+import org.apache.rocketmq.common.protocol.header.QueryConsumerOffsetResponseHeader;
+import org.apache.rocketmq.common.protocol.header.SearchOffsetRequestHeader;
+import org.apache.rocketmq.common.protocol.header.UpdateConsumerOffsetRequestHeader;
+import org.apache.rocketmq.common.protocol.header.UpdateConsumerOffsetResponseHeader;
 import org.apache.rocketmq.logging.InternalLogger;
 import org.apache.rocketmq.logging.InternalLoggerFactory;
 import org.apache.rocketmq.remoting.common.RemotingHelper;
 import org.apache.rocketmq.remoting.exception.RemotingCommandException;
+import org.apache.rocketmq.remoting.exception.RemotingConnectException;
+import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
+import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
 import org.apache.rocketmq.remoting.netty.NettyRequestProcessor;
 import org.apache.rocketmq.remoting.protocol.RemotingCommand;
 import org.apache.rocketmq.snode.SnodeController;
@@ -38,16 +48,27 @@ public class ConsumerManageProcessor implements NettyRequestProcessor {
 
     private final SnodeController snodeController;
 
-    public ConsumerManageProcessor(final SnodeController snodeController) {
-        this.snodeController = snodeController;
+    public ConsumerManageProcessor(final SnodeController brokerController) {
+        this.snodeController = brokerController;
     }
 
     @Override
     public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request)
-        throws RemotingCommandException {
+        throws InterruptedException, RemotingTimeoutException,
+        RemotingSendRequestException, RemotingConnectException, RemotingCommandException {
         switch (request.getCode()) {
             case RequestCode.GET_CONSUMER_LIST_BY_GROUP:
                 return this.getConsumerListByGroup(ctx, request);
+            case RequestCode.UPDATE_CONSUMER_OFFSET:
+                return this.updateConsumerOffset(ctx, request);
+            case RequestCode.QUERY_CONSUMER_OFFSET:
+                return this.queryConsumerOffset(ctx, request);
+            case RequestCode.SEARCH_OFFSET_BY_TIMESTAMP:
+                return searchOffsetByTimestamp(ctx, request);
+            case RequestCode.GET_MAX_OFFSET:
+                return getMaxOffset(ctx, request);
+            case RequestCode.GET_MIN_OFFSET:
+                return getMinOffset(ctx, request);
             default:
                 break;
         }
@@ -59,6 +80,45 @@ public class ConsumerManageProcessor implements NettyRequestProcessor {
         return false;
     }
 
+    public RemotingCommand searchOffsetByTimestamp(ChannelHandlerContext ctx,
+        RemotingCommand request) throws RemotingCommandException {
+        final SearchOffsetRequestHeader requestHeader =
+            (SearchOffsetRequestHeader) request
+                .decodeCommandCustomHeader(SearchOffsetRequestHeader.class);
+        try {
+            return this.snodeController.getEnodeService().getOffsetByTimestamp(requestHeader.getEnodeName(), request);
+        } catch (Exception ex) {
+            log.error("Search offset by timestamp error:{}", ex);
+        }
+        return null;
+    }
+
+    public RemotingCommand getMinOffset(ChannelHandlerContext ctx,
+        RemotingCommand request) throws RemotingCommandException {
+        final GetMinOffsetRequestHeader requestHeader =
+            (GetMinOffsetRequestHeader) request
+                .decodeCommandCustomHeader(GetMinOffsetRequestHeader.class);
+        try {
+            return this.snodeController.getEnodeService().getMinOffsetInQueue(requestHeader.getEnodeName(), requestHeader.getTopic(), requestHeader.getQueueId());
+        } catch (Exception ex) {
+            log.error("Get min offset error:{}", ex);
+        }
+        return null;
+    }
+
+    public RemotingCommand getMaxOffset(ChannelHandlerContext ctx,
+        RemotingCommand request) throws RemotingCommandException {
+        final GetMaxOffsetRequestHeader requestHeader =
+            (GetMaxOffsetRequestHeader) request
+                .decodeCommandCustomHeader(GetMaxOffsetRequestHeader.class);
+        try {
+            return this.snodeController.getEnodeService().getMaxOffsetInQueue(requestHeader.getEnodeName(), request);
+        } catch (Exception ex) {
+            log.error("Get min offset error:{}", ex);
+        }
+        return null;
+    }
+
     public RemotingCommand getConsumerListByGroup(ChannelHandlerContext ctx, RemotingCommand request)
         throws RemotingCommandException {
         final RemotingCommand response =
@@ -80,7 +140,7 @@ public class ConsumerManageProcessor implements NettyRequestProcessor {
                 response.setRemark(null);
                 return response;
             } else {
-                log.warn("Get all client failed, {} {}", requestHeader.getConsumerGroup(),
+                log.warn("GetAllClientId failed, {} {}", requestHeader.getConsumerGroup(),
                     RemotingHelper.parseChannelRemoteAddr(ctx.channel()));
             }
         } else {
@@ -89,8 +149,53 @@ public class ConsumerManageProcessor implements NettyRequestProcessor {
         }
 
         response.setCode(ResponseCode.SYSTEM_ERROR);
-        response.setRemark("no consumer for this group, " + requestHeader.getConsumerGroup());
+        response.setRemark("No consumer for this group, " + requestHeader.getConsumerGroup());
         return response;
     }
 
+    private RemotingCommand updateConsumerOffset(ChannelHandlerContext ctx, RemotingCommand request)
+        throws RemotingCommandException {
+        final RemotingCommand response =
+            RemotingCommand.createResponseCommand(UpdateConsumerOffsetResponseHeader.class);
+        final UpdateConsumerOffsetRequestHeader requestHeader =
+            (UpdateConsumerOffsetRequestHeader) request
+                .decodeCommandCustomHeader(UpdateConsumerOffsetRequestHeader.class);
+        this.snodeController.getConsumerOffsetManager().commitOffset(requestHeader.getEnodeName(), RemotingHelper.parseChannelRemoteAddr(ctx.channel()), requestHeader.getConsumerGroup(),
+            requestHeader.getTopic(), requestHeader.getQueueId(), requestHeader.getCommitOffset());
+        response.setCode(ResponseCode.SUCCESS);
+        response.setRemark(null);
+        return response;
+    }
+
+    private RemotingCommand queryConsumerOffset(ChannelHandlerContext ctx, RemotingCommand request)
+        throws InterruptedException, RemotingTimeoutException,
+        RemotingSendRequestException, RemotingConnectException, RemotingCommandException {
+        final RemotingCommand response =
+            RemotingCommand.createResponseCommand(QueryConsumerOffsetResponseHeader.class);
+        final QueryConsumerOffsetResponseHeader responseHeader =
+            (QueryConsumerOffsetResponseHeader) response.readCustomHeader();
+        final QueryConsumerOffsetRequestHeader requestHeader =
+            (QueryConsumerOffsetRequestHeader) request
+                .decodeCommandCustomHeader(QueryConsumerOffsetRequestHeader.class);
+
+        long offset =
+            this.snodeController.getConsumerOffsetManager().queryOffset(requestHeader.getEnodeName(),
+                requestHeader.getConsumerGroup(), requestHeader.getTopic(), requestHeader.getQueueId());
+
+        if (offset < 0) {
+            log.info("Load offset from enode server, enodeName: {}, consumer group: {}, topic: {}, queueId: {}",
+                requestHeader.getEnodeName(),
+                requestHeader.getConsumerGroup(),
+                requestHeader.getTopic(),
+                requestHeader.getQueueId());
+            return this.snodeController.getEnodeService().loadOffset(requestHeader.getEnodeName(), requestHeader.getConsumerGroup(), requestHeader.getTopic(),
+                requestHeader.getQueueId());
+        } else {
+            responseHeader.setOffset(offset);
+            response.setCode(ResponseCode.SUCCESS);
+            response.setRemark(null);
+            return response;
+        }
+    }
 }
+
diff --git a/rocketmq-snode/src/main/java/org/apache/rocketmq/snode/processor/HearbeatProcessor.java b/rocketmq-snode/src/main/java/org/apache/rocketmq/snode/processor/HearbeatProcessor.java
index c26ed7c..a0af26a 100644
--- a/rocketmq-snode/src/main/java/org/apache/rocketmq/snode/processor/HearbeatProcessor.java
+++ b/rocketmq-snode/src/main/java/org/apache/rocketmq/snode/processor/HearbeatProcessor.java
@@ -15,12 +15,20 @@
  * limitations under the License.
  */
 package org.apache.rocketmq.snode.processor;
+
 import io.netty.channel.ChannelHandlerContext;
+import org.apache.rocketmq.common.MixAll;
 import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.common.constant.PermName;
+import org.apache.rocketmq.common.protocol.RequestCode;
+import org.apache.rocketmq.common.protocol.ResponseCode;
+import org.apache.rocketmq.common.protocol.header.UnregisterClientRequestHeader;
+import org.apache.rocketmq.common.protocol.header.UnregisterClientResponseHeader;
 import org.apache.rocketmq.common.protocol.heartbeat.ConsumerData;
 import org.apache.rocketmq.common.protocol.heartbeat.HeartbeatData;
 import org.apache.rocketmq.common.protocol.heartbeat.ProducerData;
 import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig;
+import org.apache.rocketmq.common.sysflag.TopicSysFlag;
 import org.apache.rocketmq.logging.InternalLogger;
 import org.apache.rocketmq.logging.InternalLoggerFactory;
 import org.apache.rocketmq.remoting.common.RemotingHelper;
@@ -38,7 +46,19 @@ public class HearbeatProcessor implements NettyRequestProcessor {
     }
 
     @Override
-    public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) {
+    public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) throws Exception {
+        switch (request.getCode()) {
+            case RequestCode.HEART_BEAT:
+                return heartbeat(ctx, request);
+            case RequestCode.UNREGISTER_CLIENT:
+                return unregister(ctx, request);
+            default:
+                break;
+        }
+        return null;
+    }
+
+    private RemotingCommand heartbeat(ChannelHandlerContext ctx, RemotingCommand request) {
         HeartbeatData heartbeatData = HeartbeatData.decode(request.getBody(), HeartbeatData.class);
         ClientChannelInfo clientChannelInfo = new ClientChannelInfo(
             ctx.channel(),
@@ -55,11 +75,15 @@ public class HearbeatProcessor implements NettyRequestProcessor {
         }
 
         if (heartbeatData.getConsumerDataSet() != null) {
+            log.info("ConsumerDataSet: {}", heartbeatData.getConsumerDataSet());
             for (ConsumerData data : heartbeatData.getConsumerDataSet()) {
                 SubscriptionGroupConfig subscriptionGroupConfig =
                     this.snodeController.getSubscriptionGroupManager().findSubscriptionGroupConfig(
                         data.getGroupName());
                 boolean isNotifyConsumerIdsChangedEnable = subscriptionGroupConfig.isNotifyConsumerIdsChangedEnable();
+                if (null != subscriptionGroupConfig) {
+                    isNotifyConsumerIdsChangedEnable = subscriptionGroupConfig.isNotifyConsumerIdsChangedEnable();
+                }
                 boolean changed = this.snodeController.getConsumerManager().registerConsumer(
                     data.getGroupName(),
                     clientChannelInfo,
@@ -79,6 +103,44 @@ public class HearbeatProcessor implements NettyRequestProcessor {
             }
         }
         RemotingCommand response = RemotingCommand.createResponseCommand(null);
+        response.setCode(ResponseCode.SUCCESS);
+        response.setRemark(null);
+        return response;
+    }
+
+    private RemotingCommand unregister(ChannelHandlerContext ctx, RemotingCommand request) throws Exception {
+        final RemotingCommand response =
+            RemotingCommand.createResponseCommand(UnregisterClientResponseHeader.class);
+        final UnregisterClientRequestHeader requestHeader =
+            (UnregisterClientRequestHeader) request.decodeCommandCustomHeader(UnregisterClientRequestHeader.class);
+
+        ClientChannelInfo clientChannelInfo = new ClientChannelInfo(
+            ctx.channel(),
+            requestHeader.getClientID(),
+            request.getLanguage(),
+            request.getVersion());
+        {
+            final String group = requestHeader.getProducerGroup();
+            if (group != null) {
+                this.snodeController.getProducerManager().unregisterProducer(group, clientChannelInfo);
+            }
+        }
+
+        {
+            final String group = requestHeader.getConsumerGroup();
+            if (group != null) {
+                SubscriptionGroupConfig subscriptionGroupConfig =
+                    this.snodeController.getSubscriptionGroupManager().findSubscriptionGroupConfig(group);
+                boolean isNotifyConsumerIdsChangedEnable = true;
+                if (null != subscriptionGroupConfig) {
+                    isNotifyConsumerIdsChangedEnable = subscriptionGroupConfig.isNotifyConsumerIdsChangedEnable();
+                }
+                this.snodeController.getConsumerManager().unregisterConsumer(group, clientChannelInfo, isNotifyConsumerIdsChangedEnable);
+            }
+        }
+
+        response.setCode(ResponseCode.SUCCESS);
+        response.setRemark(null);
         return response;
     }
 
diff --git a/rocketmq-snode/src/main/java/org/apache/rocketmq/snode/processor/PullMessageProcessor.java b/rocketmq-snode/src/main/java/org/apache/rocketmq/snode/processor/PullMessageProcessor.java
index 6e474bc..a636f87 100644
--- a/rocketmq-snode/src/main/java/org/apache/rocketmq/snode/processor/PullMessageProcessor.java
+++ b/rocketmq-snode/src/main/java/org/apache/rocketmq/snode/processor/PullMessageProcessor.java
@@ -18,11 +18,19 @@ package org.apache.rocketmq.snode.processor;/*
 import io.netty.channel.ChannelHandlerContext;
 import java.util.concurrent.CompletableFuture;
 import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.common.help.FAQUrl;
+import org.apache.rocketmq.common.protocol.ResponseCode;
+import org.apache.rocketmq.common.protocol.header.PullMessageRequestHeader;
+import org.apache.rocketmq.common.protocol.header.PullMessageResponseHeader;
+import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
+import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
+import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig;
 import org.apache.rocketmq.logging.InternalLogger;
 import org.apache.rocketmq.logging.InternalLoggerFactory;
 import org.apache.rocketmq.remoting.netty.NettyRequestProcessor;
 import org.apache.rocketmq.remoting.protocol.RemotingCommand;
 import org.apache.rocketmq.snode.SnodeController;
+import org.apache.rocketmq.snode.client.ConsumerGroupInfo;
 
 public class PullMessageProcessor implements NettyRequestProcessor {
     private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.SNODE_LOGGER_NAME);
@@ -34,8 +42,52 @@ public class PullMessageProcessor implements NettyRequestProcessor {
     }
 
     @Override
-    public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) {
-        CompletableFuture<RemotingCommand> responseFuture = snodeController.getSnodeOuterService().pullMessage(ctx, request);
+    public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) throws Exception {
+        RemotingCommand response = RemotingCommand.createResponseCommand(PullMessageResponseHeader.class);
+
+        final PullMessageRequestHeader requestHeader =
+            (PullMessageRequestHeader) request.decodeCommandCustomHeader(PullMessageRequestHeader.class);
+
+        ConsumerGroupInfo consumerGroupInfo = snodeController.getConsumerManager().getConsumerGroupInfo(requestHeader.getConsumerGroup());
+
+        SubscriptionGroupConfig subscriptionGroupConfig =
+            this.snodeController.getSubscriptionGroupManager().findSubscriptionGroupConfig(requestHeader.getConsumerGroup());
+        if (null == subscriptionGroupConfig) {
+            response.setCode(ResponseCode.SUBSCRIPTION_GROUP_NOT_EXIST);
+            response.setRemark(String.format("Subscription group [%s] does not exist, %s", requestHeader.getConsumerGroup(), FAQUrl.suggestTodo(FAQUrl.SUBSCRIPTION_GROUP_NOT_EXIST)));
+            return response;
+        }
+
+        if (!subscriptionGroupConfig.isConsumeEnable()) {
+            response.setCode(ResponseCode.NO_PERMISSION);
+            response.setRemark("Subscription group no permission, " + requestHeader.getConsumerGroup());
+            return response;
+        }
+
+        if (!subscriptionGroupConfig.isConsumeBroadcastEnable()
+            && consumerGroupInfo.getMessageModel() == MessageModel.BROADCASTING) {
+            response.setCode(ResponseCode.NO_PERMISSION);
+            response.setRemark("The consumer group[" + requestHeader.getConsumerGroup() + "] can not consume by broadcast way");
+            return response;
+        }
+
+        SubscriptionData subscriptionData = consumerGroupInfo.findSubscriptionData(requestHeader.getTopic());
+        if (null == subscriptionData) {
+            log.warn("The consumer's subscription not exist, group: {}, topic:{}", requestHeader.getConsumerGroup(), requestHeader.getTopic());
+            response.setCode(ResponseCode.SUBSCRIPTION_NOT_EXIST);
+            response.setRemark("The consumer's subscription not exist" + FAQUrl.suggestTodo(FAQUrl.SAME_GROUP_DIFFERENT_TOPIC));
+            return response;
+        }
+
+        if (subscriptionData.getSubVersion() < requestHeader.getSubVersion()) {
+            log.warn("The broker's subscription is not latest, group: {} {}", requestHeader.getConsumerGroup(),
+                subscriptionData.getSubString());
+            response.setCode(ResponseCode.SUBSCRIPTION_NOT_LATEST);
+            response.setRemark("The consumer's subscription not latest");
+            return response;
+        }
+
+        CompletableFuture<RemotingCommand> responseFuture = snodeController.getEnodeService().pullMessage(ctx, request);
         responseFuture.whenComplete((data, ex) -> {
             if (ex == null) {
                 this.snodeController.getSnodeServer().sendResponse(ctx, data);
diff --git a/rocketmq-snode/src/main/java/org/apache/rocketmq/snode/processor/SendMessageProcessor.java b/rocketmq-snode/src/main/java/org/apache/rocketmq/snode/processor/SendMessageProcessor.java
index 3f52ed3..15e4294 100644
--- a/rocketmq-snode/src/main/java/org/apache/rocketmq/snode/processor/SendMessageProcessor.java
+++ b/rocketmq-snode/src/main/java/org/apache/rocketmq/snode/processor/SendMessageProcessor.java
@@ -35,7 +35,8 @@ public class SendMessageProcessor implements NettyRequestProcessor {
 
     @Override
     public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) {
-        CompletableFuture<RemotingCommand> responseFuture = snodeController.getSnodeOuterService().sendMessage(request);
+        log.info("-----Receive sendback request: {}", request);
+        CompletableFuture<RemotingCommand> responseFuture = snodeController.getEnodeService().sendMessage(request);
         responseFuture.whenComplete((data, ex) -> {
             if (ex == null) {
                 snodeController.getSnodeServer().sendResponse(ctx, data);
diff --git a/rocketmq-snode/src/main/java/org/apache/rocketmq/snode/service/SnodeOuterService.java b/rocketmq-snode/src/main/java/org/apache/rocketmq/snode/service/EnodeService.java
similarity index 56%
rename from rocketmq-snode/src/main/java/org/apache/rocketmq/snode/service/SnodeOuterService.java
rename to rocketmq-snode/src/main/java/org/apache/rocketmq/snode/service/EnodeService.java
index a1ffdd8..cf7c1e9 100644
--- a/rocketmq-snode/src/main/java/org/apache/rocketmq/snode/service/SnodeOuterService.java
+++ b/rocketmq-snode/src/main/java/org/apache/rocketmq/snode/service/EnodeService.java
@@ -17,17 +17,17 @@ package org.apache.rocketmq.snode.service;/*
 
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelHandlerContext;
-import io.netty.util.concurrent.CompleteFuture;
 import java.util.concurrent.CompletableFuture;
 import org.apache.rocketmq.client.exception.MQBrokerException;
 import org.apache.rocketmq.common.TopicConfig;
+import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig;
+import org.apache.rocketmq.remoting.exception.RemotingCommandException;
 import org.apache.rocketmq.remoting.exception.RemotingConnectException;
 import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
 import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
 import org.apache.rocketmq.remoting.protocol.RemotingCommand;
-import org.apache.rocketmq.snode.config.SnodeConfig;
 
-public interface SnodeOuterService {
+public interface EnodeService {
     void sendHearbeat(RemotingCommand remotingCommand);
 
     CompletableFuture<RemotingCommand> sendMessage(final RemotingCommand request);
@@ -35,22 +35,29 @@ public interface SnodeOuterService {
     CompletableFuture<RemotingCommand> pullMessage(final ChannelHandlerContext context,
         final RemotingCommand remotingCommand);
 
-    void saveSubscriptionData(RemotingCommand remotingCommand);
+    void notifyConsumerIdsChanged(final Channel channel, final String consumerGroup);
 
-    void start();
+    RemotingCommand creatTopic(String enodeName, TopicConfig topicConfig)throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException ;
 
-    void shutdown();
+    void updateEnodeAddr(String clusterName) throws InterruptedException, RemotingTimeoutException,
+        RemotingSendRequestException, RemotingConnectException, MQBrokerException;
 
-    void registerSnode(SnodeConfig snodeConfig);
+    boolean persistSubscriptionGroupConfig(SubscriptionGroupConfig subscriptionGroupConfig);
 
-    void updateNameServerAddressList(final String addrs);
+    void persistOffsetToEnode(String enodeName, String groupName, String topic, int queueId, long offset);
 
-    String fetchNameServerAddr();
+    RemotingCommand loadOffset(String enodeName, String consumerGroup, String topic,
+        int queueId) throws InterruptedException, RemotingTimeoutException,
+        RemotingSendRequestException, RemotingConnectException;
 
-    void updateEnodeAddr(String clusterName) throws InterruptedException, RemotingTimeoutException,
-        RemotingSendRequestException, RemotingConnectException, MQBrokerException;
+    RemotingCommand getMaxOffsetInQueue(String enodeName,
+        RemotingCommand request) throws InterruptedException, RemotingTimeoutException,
+        RemotingSendRequestException, RemotingConnectException, RemotingCommandException;
 
-    void notifyConsumerIdsChanged(final Channel channel, final String consumerGroup);
+    RemotingCommand getMinOffsetInQueue(String enodeName, String topic,
+        int queueId) throws InterruptedException, RemotingTimeoutException,
+        RemotingSendRequestException, RemotingConnectException, RemotingCommandException;
 
-    RemotingCommand creatTopic(TopicConfig topicConfig);
+    RemotingCommand getOffsetByTimestamp(String enodeName,
+        RemotingCommand request) throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException;
 }
diff --git a/rocketmq-snode/src/main/java/org/apache/rocketmq/snode/service/NnodeService.java b/rocketmq-snode/src/main/java/org/apache/rocketmq/snode/service/NnodeService.java
new file mode 100644
index 0000000..21bc6ed
--- /dev/null
+++ b/rocketmq-snode/src/main/java/org/apache/rocketmq/snode/service/NnodeService.java
@@ -0,0 +1,47 @@
+package org.apache.rocketmq.snode.service;/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.util.Set;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.common.protocol.body.ClusterInfo;
+import org.apache.rocketmq.common.protocol.route.TopicRouteData;
+import org.apache.rocketmq.remoting.exception.RemotingConnectException;
+import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
+import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
+import org.apache.rocketmq.snode.config.SnodeConfig;
+
+public interface NnodeService {
+    void registerSnode(SnodeConfig snodeConfig);
+
+    void updateNnodeAddressList(final String addrs);
+
+    String fetchNnodeAdress();
+
+    void updateTopicRouteDataByTopic();
+
+    Set<String> getEnodeClusterInfo(String clusterName);
+
+    ClusterInfo updateEnodeClusterInfo() throws InterruptedException, RemotingTimeoutException,
+        RemotingSendRequestException, RemotingConnectException;
+
+    String getAddressByEnodeName(String brokerName,
+        boolean isUseSlave) throws InterruptedException, RemotingTimeoutException,
+        RemotingSendRequestException, RemotingConnectException;
+
+    TopicRouteData getTopicRouteDataByTopic(String topic,
+        boolean allowTopicNotExist) throws MQClientException, InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException;
+}
diff --git a/rocketmq-snode/src/main/java/org/apache/rocketmq/snode/service/impl/EnodeServiceImpl.java b/rocketmq-snode/src/main/java/org/apache/rocketmq/snode/service/impl/EnodeServiceImpl.java
new file mode 100644
index 0000000..d647f47
--- /dev/null
+++ b/rocketmq-snode/src/main/java/org/apache/rocketmq/snode/service/impl/EnodeServiceImpl.java
@@ -0,0 +1,295 @@
+package org.apache.rocketmq.snode.service.impl;/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelHandlerContext;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import org.apache.rocketmq.client.exception.MQBrokerException;
+import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.common.TopicConfig;
+import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.common.protocol.RequestCode;
+import org.apache.rocketmq.common.protocol.ResponseCode;
+import org.apache.rocketmq.common.protocol.body.ClusterInfo;
+import org.apache.rocketmq.common.protocol.header.ConsumerSendMsgBackRequestHeader;
+import org.apache.rocketmq.common.protocol.header.CreateTopicRequestHeader;
+import org.apache.rocketmq.common.protocol.header.GetMinOffsetRequestHeader;
+import org.apache.rocketmq.common.protocol.header.NotifyConsumerIdsChangedRequestHeader;
+import org.apache.rocketmq.common.protocol.header.PullMessageRequestHeader;
+import org.apache.rocketmq.common.protocol.header.QueryConsumerOffsetRequestHeader;
+import org.apache.rocketmq.common.protocol.header.SendMessageRequestHeaderV2;
+import org.apache.rocketmq.common.protocol.header.UpdateConsumerOffsetRequestHeader;
+import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig;
+import org.apache.rocketmq.logging.InternalLogger;
+import org.apache.rocketmq.logging.InternalLoggerFactory;
+import org.apache.rocketmq.remoting.InvokeCallback;
+import org.apache.rocketmq.remoting.exception.RemotingCommandException;
+import org.apache.rocketmq.remoting.exception.RemotingConnectException;
+import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
+import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
+import org.apache.rocketmq.remoting.netty.ResponseFuture;
+import org.apache.rocketmq.remoting.protocol.RemotingCommand;
+import org.apache.rocketmq.remoting.serialize.RemotingSerializable;
+import org.apache.rocketmq.snode.SnodeController;
+import org.apache.rocketmq.snode.constant.SnodeConstant;
+import org.apache.rocketmq.snode.service.EnodeService;
+
+public class EnodeServiceImpl implements EnodeService {
+    private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.SNODE_LOGGER_NAME);
+
+    private SnodeController snodeController;
+
+    private final ConcurrentMap<String/* Broker Name */, HashMap<Long/* brokerId */, String/* address */>> enodeTable =
+        new ConcurrentHashMap<>();
+
+    public EnodeServiceImpl(SnodeController snodeController) {
+        this.snodeController = snodeController;
+    }
+
+    @Override
+    public void sendHearbeat(RemotingCommand remotingCommand) {
+        for (Map.Entry<String, HashMap<Long, String>> entry : enodeTable.entrySet()) {
+            String enodeAddr = entry.getValue().get(MixAll.MASTER_ID);
+            if (enodeAddr != null) {
+                try {
+                    this.snodeController.getRemotingClient().invokeSync(enodeAddr, remotingCommand, SnodeConstant.defaultTimeoutMills);
+                } catch (Exception ex) {
+                    log.warn("Send heart beat faild:{} ,ex:{}", enodeAddr, ex);
+                }
+            }
+        }
+    }
+
+    @Override
+    public CompletableFuture<RemotingCommand> pullMessage(final ChannelHandlerContext context,
+        RemotingCommand request) {
+
+        CompletableFuture<RemotingCommand> future = new CompletableFuture<>();
+        try {
+            final PullMessageRequestHeader requestHeader =
+                (PullMessageRequestHeader) request.decodeCommandCustomHeader(PullMessageRequestHeader.class);
+            this.snodeController.getRemotingClient().invokeAsync(requestHeader.getEnodeAddr(), request, SnodeConstant.CONSUMER_TIMEOUT_MILLIS_WHEN_SUSPEND, new InvokeCallback() {
+                @Override
+                public void operationComplete(ResponseFuture responseFuture) {
+                    RemotingCommand response = responseFuture.getResponseCommand();
+                    if (response != null) {
+                        future.complete(response);
+                    } else {
+                        if (!responseFuture.isSendRequestOK()) {
+                            log.error("Pull message error in async callback: {}", responseFuture.getCause());
+                        } else if (responseFuture.isTimeout()) {
+                            log.warn("Pull message timeout!");
+                        } else {
+                            log.error("Unknown pull message error occurred: {}", responseFuture.getCause());
+                        }
+                    }
+                }
+            });
+        } catch (Exception ex) {
+            log.error("pull message async error:", ex);
+            future.completeExceptionally(ex);
+        }
+        return future;
+    }
+
+    @Override
+    public CompletableFuture<RemotingCommand> sendMessage(RemotingCommand request) {
+        CompletableFuture<RemotingCommand> future = new CompletableFuture<>();
+        try {
+            String enodeName = null;
+            if (request.getCode() == RequestCode.SEND_MESSAGE_V2) {
+                SendMessageRequestHeaderV2 sendMessageRequestHeaderV2 = (SendMessageRequestHeaderV2) request.decodeCommandCustomHeader(SendMessageRequestHeaderV2.class);
+                enodeName = sendMessageRequestHeaderV2.getN();
+            } else {
+                ConsumerSendMsgBackRequestHeader consumerSendMsgBackRequestHeader = (ConsumerSendMsgBackRequestHeader) request.decodeCommandCustomHeader(ConsumerSendMsgBackRequestHeader.class);
+                enodeName = consumerSendMsgBackRequestHeader.getEnodeName();
+            }
+            String enodeAddress = this.snodeController.getNnodeService().getAddressByEnodeName(enodeName, false);
+            log.info("Receive request: {}", request);
+            this.snodeController.getRemotingClient().invokeAsync(enodeAddress, request, SnodeConstant.defaultTimeoutMills, (responseFuture) -> {
+                future.complete(responseFuture.getResponseCommand());
+            });
+        } catch (Exception ex) {
+            log.error("Send message async error:{}", ex);
+            future.completeExceptionally(ex);
+        }
+        return future;
+    }
+
+    @Override
+    public void notifyConsumerIdsChanged(
+        final Channel channel,
+        final String consumerGroup) {
+        if (null == consumerGroup) {
+            log.error("NotifyConsumerIdsChanged consumerGroup is null");
+            return;
+        }
+
+        NotifyConsumerIdsChangedRequestHeader requestHeader = new NotifyConsumerIdsChangedRequestHeader();
+        requestHeader.setConsumerGroup(consumerGroup);
+        RemotingCommand request =
+            RemotingCommand.createRequestCommand(RequestCode.NOTIFY_CONSUMER_IDS_CHANGED, requestHeader);
+
+        try {
+            this.snodeController.getSnodeServer().invokeOneway(channel, request, SnodeConstant.oneWaytimeout);
+        } catch (Exception e) {
+            log.error("NotifyConsumerIdsChanged exception, " + consumerGroup, e.getMessage());
+        }
+    }
+
+    private ClusterInfo getBrokerClusterInfo(
+        final long timeoutMillis) throws InterruptedException, RemotingTimeoutException,
+        RemotingSendRequestException, RemotingConnectException, MQBrokerException {
+        RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_BROKER_CLUSTER_INFO, null);
+        RemotingCommand response = this.snodeController.getRemotingClient().invokeSync(null, request, timeoutMillis);
+        switch (response.getCode()) {
+            case ResponseCode.SUCCESS: {
+                return ClusterInfo.decode(response.getBody(), ClusterInfo.class);
+            }
+            default:
+                break;
+        }
+        throw new MQBrokerException(response.getCode(), response.getRemark());
+    }
+
+    @Override
+    public void updateEnodeAddr(String clusterName) throws InterruptedException, RemotingTimeoutException,
+        RemotingSendRequestException, RemotingConnectException, MQBrokerException {
+        synchronized (this) {
+            ClusterInfo clusterInfo = getBrokerClusterInfo(SnodeConstant.defaultTimeoutMills);
+            if (clusterInfo != null) {
+                HashMap<String, Set<String>> enodeAddress = clusterInfo.getClusterAddrTable();
+                for (Map.Entry<String, Set<String>> entry : enodeAddress.entrySet()) {
+                    Set<String> enodeNames = entry.getValue();
+                    if (enodeNames != null) {
+                        for (String enodeName : enodeNames) {
+                            enodeTable.put(enodeName, clusterInfo.getBrokerAddrTable().get(enodeName).getBrokerAddrs());
+                        }
+                    }
+                }
+            }
+        }
+    }
+
+    @Override
+    public boolean persistSubscriptionGroupConfig(SubscriptionGroupConfig subscriptionGroupConfig) {
+        RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.UPDATE_AND_CREATE_SUBSCRIPTIONGROUP, null);
+        boolean persist = false;
+        for (Map.Entry<String, HashMap<Long, String>> entry : enodeTable.entrySet()) {
+            byte[] body = RemotingSerializable.encode(subscriptionGroupConfig);
+            request.setBody(body);
+            String enodeAddress = entry.getValue().get(MixAll.MASTER_ID);
+            try {
+                RemotingCommand response = this.snodeController.getRemotingClient().invokeSync(enodeAddress,
+                    request, SnodeConstant.defaultTimeoutMills);
+                if (response != null && response.getCode() == ResponseCode.SUCCESS) {
+                    persist = true;
+                } else {
+                    persist = false;
+                }
+                log.info("Persist to broker address: {} result: {}", enodeAddress, persist);
+            } catch (Exception ex) {
+                log.warn("Persist Subscription to Enode {} error", enodeAddress);
+                persist = false;
+            }
+        }
+        return persist;
+    }
+
+    @Override
+    public void persistOffsetToEnode(String enodeName, String groupName, String topic, int queueId, long offset) {
+        try {
+            String address = this.snodeController.getNnodeService().getAddressByEnodeName(enodeName, false);
+            UpdateConsumerOffsetRequestHeader requestHeader = new UpdateConsumerOffsetRequestHeader();
+            requestHeader.setTopic(topic);
+            requestHeader.setConsumerGroup(groupName);
+            requestHeader.setQueueId(queueId);
+            requestHeader.setCommitOffset(offset);
+            requestHeader.setEnodeName(enodeName);
+            RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.UPDATE_CONSUMER_OFFSET, requestHeader);
+            this.snodeController.getRemotingClient().invokeOneway(address, request, SnodeConstant.defaultTimeoutMills);
+        } catch (Exception ex) {
+            log.error("Persist offset to Enode error!");
+        }
+    }
+
+    @Override
+    public RemotingCommand getMinOffsetInQueue(String enodeName, String topic,
+        int queueId) throws InterruptedException, RemotingTimeoutException,
+        RemotingSendRequestException, RemotingConnectException {
+        GetMinOffsetRequestHeader requestHeader = new GetMinOffsetRequestHeader();
+        requestHeader.setTopic(topic);
+        requestHeader.setQueueId(queueId);
+        RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_MIN_OFFSET, requestHeader);
+        String addr = this.snodeController.getNnodeService().getAddressByEnodeName(enodeName, false);
+        return this.snodeController.getRemotingClient().invokeSync(MixAll.brokerVIPChannel(snodeController.getSnodeConfig().isVipChannelEnabled(), addr),
+            request, SnodeConstant.defaultTimeoutMills);
+    }
+
+    @Override
+    public RemotingCommand loadOffset(String enodeName, String consumerGroup, String topic,
+        int queueId) throws InterruptedException, RemotingTimeoutException,
+        RemotingSendRequestException, RemotingConnectException {
+        QueryConsumerOffsetRequestHeader requestHeader = new QueryConsumerOffsetRequestHeader();
+        requestHeader.setTopic(topic);
+        requestHeader.setConsumerGroup(consumerGroup);
+        requestHeader.setQueueId(queueId);
+
+        RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.QUERY_CONSUMER_OFFSET, requestHeader);
+        String addr = this.snodeController.getNnodeService().getAddressByEnodeName(enodeName, false);
+        return this.snodeController.getRemotingClient().invokeSync(MixAll.brokerVIPChannel(this.snodeController.getSnodeConfig().isVipChannelEnabled(), addr),
+            request, SnodeConstant.defaultTimeoutMills);
+    }
+
+    @Override
+    public RemotingCommand getMaxOffsetInQueue(String enodeName,
+        RemotingCommand request) throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException, RemotingCommandException {
+        String addr = this.snodeController.getNnodeService().getAddressByEnodeName(enodeName, false);
+        return this.snodeController.getRemotingClient().invokeSync(MixAll.brokerVIPChannel(snodeController.getSnodeConfig().isVipChannelEnabled(), addr),
+            request, SnodeConstant.defaultTimeoutMills);
+    }
+
+    @Override
+    public RemotingCommand getOffsetByTimestamp(String enodeName,
+        RemotingCommand request) throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException {
+        String addr = this.snodeController.getNnodeService().getAddressByEnodeName(enodeName, false);
+        return this.snodeController.getRemotingClient().invokeSync(MixAll.brokerVIPChannel(snodeController.getSnodeConfig().isVipChannelEnabled(), addr),
+            request, SnodeConstant.defaultTimeoutMills);
+    }
+
+    @Override
+    public RemotingCommand creatTopic(String enodeName,
+        TopicConfig topicConfig) throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException {
+        CreateTopicRequestHeader requestHeader = new CreateTopicRequestHeader();
+        requestHeader.setTopic(topicConfig.getTopicName());
+        requestHeader.setReadQueueNums(topicConfig.getReadQueueNums());
+        requestHeader.setWriteQueueNums(topicConfig.getWriteQueueNums());
+        requestHeader.setPerm(topicConfig.getPerm());
+        requestHeader.setTopicFilterType(topicConfig.getTopicFilterType().name());
+        requestHeader.setTopicSysFlag(topicConfig.getTopicSysFlag());
+        requestHeader.setOrder(topicConfig.isOrder());
+        RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.UPDATE_AND_CREATE_TOPIC, requestHeader);
+        String address = this.snodeController.getNnodeService().getAddressByEnodeName(enodeName, false);
+        return this.snodeController.getRemotingClient().invokeSync(address,
+            request, SnodeConstant.defaultTimeoutMills);
+    }
+}
diff --git a/rocketmq-snode/src/main/java/org/apache/rocketmq/snode/service/impl/NnodeServiceImpl.java b/rocketmq-snode/src/main/java/org/apache/rocketmq/snode/service/impl/NnodeServiceImpl.java
new file mode 100644
index 0000000..fe28571
--- /dev/null
+++ b/rocketmq-snode/src/main/java/org/apache/rocketmq/snode/service/impl/NnodeServiceImpl.java
@@ -0,0 +1,208 @@
+package org.apache.rocketmq.snode.service.impl;/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.rocketmq.client.ClientConfig;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.common.namesrv.TopAddressing;
+import org.apache.rocketmq.common.protocol.RequestCode;
+import org.apache.rocketmq.common.protocol.ResponseCode;
+import org.apache.rocketmq.common.protocol.body.ClusterInfo;
+import org.apache.rocketmq.common.protocol.header.namesrv.GetRouteInfoRequestHeader;
+import org.apache.rocketmq.common.protocol.header.namesrv.RegisterSnodeRequestHeader;
+import org.apache.rocketmq.common.protocol.route.TopicRouteData;
+import org.apache.rocketmq.logging.InternalLogger;
+import org.apache.rocketmq.logging.InternalLoggerFactory;
+import org.apache.rocketmq.remoting.exception.RemotingConnectException;
+import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
+import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
+import org.apache.rocketmq.remoting.protocol.RemotingCommand;
+import org.apache.rocketmq.snode.SnodeController;
+import org.apache.rocketmq.snode.config.SnodeConfig;
+import org.apache.rocketmq.snode.constant.SnodeConstant;
+import org.apache.rocketmq.snode.exception.SnodeException;
+import org.apache.rocketmq.snode.service.NnodeService;
+
+public class NnodeServiceImpl implements NnodeService {
+    private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.SNODE_LOGGER_NAME);
+    private final TopAddressing topAddressing = new TopAddressing(MixAll.getWSAddr());
+    private String nameSrvAddr = null;
+    private SnodeController snodeController;
+    private ConcurrentHashMap<String /*Topic*/, TopicRouteData> topicRouteDataMap = new ConcurrentHashMap<>(1000);
+    private ClusterInfo clusterInfo;
+
+    public NnodeServiceImpl(SnodeController snodeController) {
+        this.snodeController = snodeController;
+    }
+
+    @Override
+    public void registerSnode(SnodeConfig snodeConfig) {
+        List<String> nnodeAddressList = this.snodeController.getRemotingClient().getNameServerAddressList();
+        RemotingCommand remotingCommand = new RemotingCommand();
+        RegisterSnodeRequestHeader requestHeader = new RegisterSnodeRequestHeader();
+        requestHeader.setSnodeAddr(snodeConfig.getSnodeAddr());
+        requestHeader.setSnodeName(snodeConfig.getSnodeName());
+        requestHeader.setClusterName(snodeConfig.getClusterName());
+        remotingCommand.setCustomHeader(requestHeader);
+        remotingCommand.setCode(RequestCode.REGISTER_SNODE);
+        if (nnodeAddressList != null && nnodeAddressList.size() > 0) {
+            for (String nodeAddress : nnodeAddressList) {
+                try {
+                    this.snodeController.getRemotingClient().invokeSync(nodeAddress, remotingCommand, SnodeConstant.heartbeatTimeout);
+                } catch (Exception ex) {
+                    log.warn("Register Snode to Nnode addr: {} error, ex:{} ", nodeAddress, ex);
+                }
+            }
+        }
+    }
+
+    @Override
+    public void updateTopicRouteDataByTopic() {
+        Set<String> topSet = topicRouteDataMap.keySet();
+        for (String topic : topSet) {
+            try {
+                TopicRouteData topicRouteData = getTopicRouteDataByTopic(topic, false);
+                topicRouteDataMap.put(topic, topicRouteData);
+            } catch (Exception ex) {
+                log.error("Update topic {} error: {}", topic, ex);
+            }
+        }
+    }
+
+    private TopicRouteData getTopicRouteDataByTopicFromNnode(String topic,
+        boolean allowTopicNotExist) throws MQClientException, InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException {
+        GetRouteInfoRequestHeader requestHeader = new GetRouteInfoRequestHeader();
+        requestHeader.setTopic(topic);
+
+        RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_ROUTEINTO_BY_TOPIC, requestHeader);
+        RemotingCommand response = this.snodeController.getRemotingClient().invokeSync(null, request, SnodeConstant.defaultTimeoutMills);
+        log.info("getTopicRouteInfoFromNameServer response: " + response);
+        assert response != null;
+        switch (response.getCode()) {
+            case ResponseCode.TOPIC_NOT_EXIST: {
+                if (allowTopicNotExist && !topic.equals(MixAll.AUTO_CREATE_TOPIC_KEY_TOPIC)) {
+                    log.warn("Topic [{}] RouteInfo is not exist value", topic);
+                }
+                break;
+            }
+            case ResponseCode.SUCCESS: {
+                byte[] body = response.getBody();
+                if (body != null) {
+                    return TopicRouteData.decode(body, TopicRouteData.class);
+                }
+            }
+            default:
+                break;
+        }
+
+        throw new MQClientException(response.getCode(), response.getRemark());
+    }
+
+    @Override
+    public TopicRouteData getTopicRouteDataByTopic(
+        String topic,
+        boolean allowTopicNotExist) throws MQClientException, InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException {
+        if (topic == null || "".equals(topic)) {
+            return null;
+        }
+
+        TopicRouteData topicRouteData = topicRouteDataMap.get(topic);
+        if (topicRouteData == null) {
+            topicRouteData = getTopicRouteDataByTopicFromNnode(topic, allowTopicNotExist);
+            if (topicRouteData != null) {
+                topicRouteDataMap.put(topic, topicRouteData);
+            }
+        }
+        return topicRouteData;
+    }
+
+    @Override
+    public void updateNnodeAddressList(final String addrs) {
+        List<String> list = new ArrayList<String>();
+        String[] addrArray = addrs.split(";");
+        for (String addr : addrArray) {
+            list.add(addr);
+        }
+        this.snodeController.getRemotingClient().updateNameServerAddressList(list);
+    }
+
+    @Override
+    public String fetchNnodeAdress() {
+        try {
+            String addrs = this.topAddressing.fetchNSAddr();
+            if (addrs != null) {
+                if (!addrs.equals(this.nameSrvAddr)) {
+                    log.info("Nnode server address changed, old: {} new: {}", this.nameSrvAddr, addrs);
+                    this.updateNnodeAddressList(addrs);
+                    this.nameSrvAddr = addrs;
+                    return nameSrvAddr;
+                }
+            }
+        } catch (Exception e) {
+            log.error("FetchNnodeServerAddr Exception", e);
+        }
+        return nameSrvAddr;
+    }
+
+    @Override
+    public ClusterInfo updateEnodeClusterInfo() throws InterruptedException, RemotingTimeoutException,
+        RemotingSendRequestException, RemotingConnectException {
+        RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_BROKER_CLUSTER_INFO, null);
+
+        RemotingCommand response = this.snodeController.getRemotingClient().invokeSync(null, request, SnodeConstant.defaultTimeoutMills);
+        switch (response.getCode()) {
+            case ResponseCode.SUCCESS: {
+                ClusterInfo clusterInfo = ClusterInfo.decode(response.getBody(), ClusterInfo.class);
+                this.clusterInfo = clusterInfo;
+                return clusterInfo;
+            }
+            default:
+                break;
+        }
+        log.error("Update Cluster info error: {}", response);
+        return clusterInfo;
+    }
+
+    public Set<String> getEnodeClusterInfo(String clusterName) {
+        if (this.clusterInfo == null) {
+            try {
+                updateEnodeClusterInfo();
+            } catch (Exception ex) {
+                log.error("Update Cluster info error:{}", ex);
+            }
+        }
+        return this.clusterInfo.getClusterAddrTable().get(clusterName);
+    }
+
+    @Override
+    public String getAddressByEnodeName(String enodeName,
+        boolean isUseSlave) throws InterruptedException, RemotingTimeoutException,
+        RemotingSendRequestException, RemotingConnectException {
+        if (this.clusterInfo == null) {
+            clusterInfo = this.updateEnodeClusterInfo();
+        }
+        if (this.clusterInfo != null) {
+            return this.clusterInfo.getBrokerAddrTable().get(enodeName).getBrokerAddrs().get(MixAll.MASTER_ID);
+        }
+        return null;
+    }
+}
diff --git a/rocketmq-snode/src/main/java/org/apache/rocketmq/snode/service/impl/ScheduledServiceImpl.java b/rocketmq-snode/src/main/java/org/apache/rocketmq/snode/service/impl/ScheduledServiceImpl.java
index 23d8867..384a0c2 100644
--- a/rocketmq-snode/src/main/java/org/apache/rocketmq/snode/service/impl/ScheduledServiceImpl.java
+++ b/rocketmq-snode/src/main/java/org/apache/rocketmq/snode/service/impl/ScheduledServiceImpl.java
@@ -26,21 +26,22 @@ import org.apache.rocketmq.common.protocol.heartbeat.SnodeData;
 import org.apache.rocketmq.logging.InternalLogger;
 import org.apache.rocketmq.logging.InternalLoggerFactory;
 import org.apache.rocketmq.remoting.protocol.RemotingCommand;
+import org.apache.rocketmq.snode.SnodeController;
 import org.apache.rocketmq.snode.config.SnodeConfig;
 import org.apache.rocketmq.snode.service.ScheduledService;
-import org.apache.rocketmq.snode.service.SnodeOuterService;
 
 public class ScheduledServiceImpl implements ScheduledService {
     private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.SNODE_LOGGER_NAME);
 
-    private SnodeOuterService snodeOuterService;
+    private SnodeController snodeController;
+
     private SnodeConfig snodeConfig;
 
     private final RemotingCommand enodeHeartbeat;
 
-    public ScheduledServiceImpl(SnodeOuterService snodeOuterService, SnodeConfig snodeConfig) {
-        this.snodeOuterService = snodeOuterService;
-        this.snodeConfig = snodeConfig;
+    public ScheduledServiceImpl(SnodeController snodeController) {
+        this.snodeController = snodeController;
+        this.snodeConfig = snodeController.getSnodeConfig();
         enodeHeartbeat = RemotingCommand.createRequestCommand(RequestCode.HEART_BEAT, null);
         HeartbeatData heartbeatData = new HeartbeatData();
         heartbeatData.setClientID(snodeConfig.getSnodeName());
@@ -64,7 +65,7 @@ public class ScheduledServiceImpl implements ScheduledService {
             @Override
             public void run() {
                 try {
-                    snodeOuterService.sendHearbeat(enodeHeartbeat);
+                    snodeController.getEnodeService().sendHearbeat(enodeHeartbeat);
                 } catch (Exception e) {
                     log.error("ScheduledTask updateTopicRouteInfoFromNameServer exception", e);
                 }
@@ -76,7 +77,7 @@ public class ScheduledServiceImpl implements ScheduledService {
                 @Override
                 public void run() {
                     try {
-                        snodeOuterService.fetchNameServerAddr();
+                        snodeController.getNnodeService().fetchNnodeAdress();
                     } catch (Throwable e) {
                         log.error("ScheduledTask fetchNameServerAddr exception", e);
                     }
@@ -87,7 +88,18 @@ public class ScheduledServiceImpl implements ScheduledService {
         this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
             @Override
             public void run() {
-                snodeOuterService.registerSnode(snodeConfig);
+                snodeController.getNnodeService().registerSnode(snodeConfig);
+            }
+        }, 1000 * 10, Math.max(10000, Math.min(snodeConfig.getRegisterNameServerPeriod(), 60000)), TimeUnit.MILLISECONDS);
+
+        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
+            @Override
+            public void run() {
+                try {
+                    snodeController.getEnodeService().updateEnodeAddr(snodeConfig.getClusterName());
+                } catch (Exception ex) {
+                    log.warn("Update broker addr error:{}", ex);
+                }
             }
         }, 1000 * 10, Math.max(10000, Math.min(snodeConfig.getRegisterNameServerPeriod(), 60000)), TimeUnit.MILLISECONDS);
 
@@ -95,13 +107,34 @@ public class ScheduledServiceImpl implements ScheduledService {
             @Override
             public void run() {
                 try {
-                    snodeOuterService.updateEnodeAddr(snodeConfig.getClusterName());
+                    snodeController.getNnodeService().updateTopicRouteDataByTopic();
                 } catch (Exception ex) {
                     log.warn("Update broker addr error:{}", ex);
                 }
             }
         }, 1000 * 10, Math.max(10000, Math.min(snodeConfig.getRegisterNameServerPeriod(), 60000)), TimeUnit.MILLISECONDS);
 
+        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
+            @Override
+            public void run() {
+                try {
+                    snodeController.getNnodeService().updateEnodeClusterInfo();
+                } catch (Exception ex) {
+                    log.warn("Update broker addr error:{}", ex);
+                }
+            }
+        }, 1000 * 10, Math.max(10000, Math.min(snodeConfig.getRegisterNameServerPeriod(), 60000)), TimeUnit.MILLISECONDS);
+
+        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
+            @Override
+            public void run() {
+                try {
+                    snodeController.getConsumerOffsetManager().persist();
+                } catch (Throwable e) {
+                    log.error("ScheduledTask fetchNameServerAddr exception", e);
+                }
+            }
+        }, 1000 * 10, 1000 * 60 * 2, TimeUnit.MILLISECONDS);
     }
 
     @Override
diff --git a/rocketmq-snode/src/main/java/org/apache/rocketmq/snode/service/impl/SnodeOuterServiceImpl.java b/rocketmq-snode/src/main/java/org/apache/rocketmq/snode/service/impl/SnodeOuterServiceImpl.java
deleted file mode 100644
index 1863f6b..0000000
--- a/rocketmq-snode/src/main/java/org/apache/rocketmq/snode/service/impl/SnodeOuterServiceImpl.java
+++ /dev/null
@@ -1,280 +0,0 @@
-package org.apache.rocketmq.snode.service.impl;/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-import io.netty.channel.Channel;
-import io.netty.channel.ChannelHandlerContext;
-import io.netty.util.concurrent.CompleteFuture;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import org.apache.rocketmq.client.exception.MQBrokerException;
-import org.apache.rocketmq.common.MixAll;
-import org.apache.rocketmq.common.TopicConfig;
-import org.apache.rocketmq.common.constant.LoggerName;
-import org.apache.rocketmq.common.namesrv.TopAddressing;
-import org.apache.rocketmq.common.protocol.RequestCode;
-import org.apache.rocketmq.common.protocol.ResponseCode;
-import org.apache.rocketmq.common.protocol.body.ClusterInfo;
-import org.apache.rocketmq.common.protocol.header.NotifyConsumerIdsChangedRequestHeader;
-import org.apache.rocketmq.common.protocol.header.PullMessageRequestHeader;
-import org.apache.rocketmq.common.protocol.header.SendMessageRequestHeaderV2;
-import org.apache.rocketmq.common.protocol.header.namesrv.RegisterSnodeRequestHeader;
-import org.apache.rocketmq.logging.InternalLogger;
-import org.apache.rocketmq.logging.InternalLoggerFactory;
-import org.apache.rocketmq.remoting.InvokeCallback;
-import org.apache.rocketmq.remoting.RemotingClient;
-import org.apache.rocketmq.remoting.RemotingClientFactory;
-import org.apache.rocketmq.remoting.exception.RemotingConnectException;
-import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
-import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
-import org.apache.rocketmq.remoting.netty.ResponseFuture;
-import org.apache.rocketmq.remoting.protocol.RemotingCommand;
-import org.apache.rocketmq.snode.SnodeController;
-import org.apache.rocketmq.snode.config.SnodeConfig;
-import org.apache.rocketmq.snode.constant.SnodeConstant;
-import org.apache.rocketmq.snode.service.SnodeOuterService;
-
-public class SnodeOuterServiceImpl implements SnodeOuterService {
-    private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.SNODE_LOGGER_NAME);
-    private final TopAddressing topAddressing = new TopAddressing(MixAll.getWSAddr());
-    private String nameSrvAddr = null;
-    private RemotingClient client;
-    private SnodeController snodeController;
-    private static SnodeOuterServiceImpl snodeOuterService;
-    private final ConcurrentMap<String/* Broker Name */, HashMap<Long/* brokerId */, String/* address */>> enodeTable =
-        new ConcurrentHashMap<>();
-
-    private SnodeOuterServiceImpl() {
-
-    }
-
-    public static SnodeOuterServiceImpl getInstance(SnodeController snodeController) {
-        if (snodeOuterService == null) {
-            synchronized (SnodeOuterServiceImpl.class) {
-                if (snodeOuterService == null) {
-                    snodeOuterService = new SnodeOuterServiceImpl(snodeController);
-                    return snodeOuterService;
-                }
-            }
-        }
-        return snodeOuterService;
-    }
-
-    private SnodeOuterServiceImpl(SnodeController snodeController) {
-        this.snodeController = snodeController;
-        this.client = RemotingClientFactory.createInstance().init(snodeController.getNettyClientConfig(), null);
-    }
-
-    @Override
-    public void start() {
-        this.client.start();
-    }
-
-    @Override
-    public void shutdown() {
-        this.client.shutdown();
-    }
-
-    @Override
-    public void sendHearbeat(RemotingCommand remotingCommand) {
-        for (Map.Entry<String, HashMap<Long, String>> entry : enodeTable.entrySet()) {
-            String enodeAddr = entry.getValue().get(MixAll.MASTER_ID);
-            if (enodeAddr != null) {
-                try {
-                    RemotingCommand response = this.client.invokeSync(enodeAddr, remotingCommand, SnodeConstant.defaultTimeoutMills);
-                } catch (Exception ex) {
-                    log.warn("Send heart beat faild:{} ,ex:{}", enodeAddr, ex);
-                }
-            }
-        }
-    }
-
-    @Override
-    public CompletableFuture<RemotingCommand> pullMessage(final ChannelHandlerContext context,
-        RemotingCommand request) {
-        try {
-            final PullMessageRequestHeader requestHeader =
-                (PullMessageRequestHeader) request.decodeCommandCustomHeader(PullMessageRequestHeader.class);
-            this.client.invokeAsync(requestHeader.getEnodeAddr(), request, SnodeConstant.defaultTimeoutMills, new InvokeCallback() {
-                @Override
-                public void operationComplete(ResponseFuture responseFuture) {
-                    RemotingCommand response = responseFuture.getResponseCommand();
-                    snodeController.getSnodeServer().sendResponse(context, response);
-                }
-            });
-            return null;
-        } catch (Exception ex) {
-            log.error("pull message async error:", ex);
-        }
-        return null;
-    }
-
-    @Override
-    public void saveSubscriptionData(RemotingCommand remotingCommand) {
-
-    }
-
-    @Override
-    public String fetchNameServerAddr() {
-        try {
-            String addrs = this.topAddressing.fetchNSAddr();
-            if (addrs != null) {
-                if (!addrs.equals(this.nameSrvAddr)) {
-                    log.info("name server address changed, old: {} new: {}", this.nameSrvAddr, addrs);
-                    this.updateNameServerAddressList(addrs);
-                    this.nameSrvAddr = addrs;
-                    return nameSrvAddr;
-                }
-            }
-        } catch (Exception e) {
-            log.error("fetchNameServerAddr Exception", e);
-        }
-        return nameSrvAddr;
-    }
-
-    private ClusterInfo getBrokerClusterInfo(
-        final long timeoutMillis) throws InterruptedException, RemotingTimeoutException,
-        RemotingSendRequestException, RemotingConnectException, MQBrokerException {
-        RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_BROKER_CLUSTER_INFO, null);
-        RemotingCommand response = this.client.invokeSync(null, request, timeoutMillis);
-        assert response != null;
-        switch (response.getCode()) {
-            case ResponseCode.SUCCESS: {
-                return ClusterInfo.decode(response.getBody(), ClusterInfo.class);
-            }
-            default:
-                break;
-        }
-        throw new MQBrokerException(response.getCode(), response.getRemark());
-    }
-
-    @Override
-    public void updateEnodeAddr(String clusterName) throws InterruptedException, RemotingTimeoutException,
-        RemotingSendRequestException, RemotingConnectException, MQBrokerException {
-        synchronized (this) {
-            ClusterInfo clusterInfo = getBrokerClusterInfo(SnodeConstant.defaultTimeoutMills);
-            if (clusterInfo != null) {
-                HashMap<String, Set<String>> brokerAddrs = clusterInfo.getClusterAddrTable();
-                for (Map.Entry<String, Set<String>> entry : brokerAddrs.entrySet()) {
-                    Set<String> brokerNames = entry.getValue();
-                    if (brokerNames != null) {
-                        for (String brokerName : brokerNames) {
-                            enodeTable.put(brokerName, clusterInfo.getBrokerAddrTable().get(brokerName).getBrokerAddrs());
-                        }
-                    }
-                }
-            }
-        }
-    }
-
-    public void updateNameServerAddressList(final String addrs) {
-        List<String> list = new ArrayList<String>();
-        String[] addrArray = addrs.split(";");
-        for (String addr : addrArray) {
-            list.add(addr);
-        }
-        this.client.updateNameServerAddressList(list);
-    }
-
-    public void registerSnode(SnodeConfig snodeConfig) {
-        List<String> nameServerAddressList = this.client.getNameServerAddressList();
-        RemotingCommand remotingCommand = new RemotingCommand();
-        RegisterSnodeRequestHeader requestHeader = new RegisterSnodeRequestHeader();
-        requestHeader.setSnodeAddr(snodeConfig.getSnodeAddr());
-        requestHeader.setSnodeName(snodeConfig.getSnodeName());
-        requestHeader.setClusterName(snodeConfig.getClusterName());
-        remotingCommand.setCustomHeader(requestHeader);
-        remotingCommand.setCode(RequestCode.REGISTER_SNODE);
-        if (nameServerAddressList != null && nameServerAddressList.size() > 0) {
-            for (String nameServer : nameServerAddressList) {
-                try {
-                    this.client.invokeSync(nameSrvAddr, remotingCommand, SnodeConstant.heartbeatTimeout);
-                } catch (Exception ex) {
-                    log.warn("Register Snode to Nameserver addr: {} error, ex:{} ", nameServer, ex);
-                }
-            }
-        }
-    }
-
-    @Override
-    public CompletableFuture<RemotingCommand> sendMessage(RemotingCommand request) {
-        CompletableFuture<RemotingCommand> future = new CompletableFuture<>();
-        try {
-            SendMessageRequestHeaderV2 sendMessageRequestHeaderV2 = (SendMessageRequestHeaderV2) request.decodeCommandCustomHeader(SendMessageRequestHeaderV2.class);
-            this.client.invokeAsync(sendMessageRequestHeaderV2.getN(), request, SnodeConstant.defaultTimeoutMills, (responseFuture) -> {
-                future.complete(responseFuture.getResponseCommand());
-            });
-        } catch (Exception ex) {
-            log.error("Send message async error:{}", ex);
-            future.completeExceptionally(ex);
-        }
-        return future;
-    }
-
-    @Override
-    public void notifyConsumerIdsChanged(
-        final Channel channel,
-        final String consumerGroup) {
-        if (null == consumerGroup) {
-            log.error("notifyConsumerIdsChanged consumerGroup is null");
-            return;
-        }
-
-        NotifyConsumerIdsChangedRequestHeader requestHeader = new NotifyConsumerIdsChangedRequestHeader();
-        requestHeader.setConsumerGroup(consumerGroup);
-        RemotingCommand request =
-            RemotingCommand.createRequestCommand(RequestCode.NOTIFY_CONSUMER_IDS_CHANGED, requestHeader);
-
-        try {
-            this.snodeController.getSnodeServer().invokeOneway(channel, request, SnodeConstant.oneWaytimeout);
-        } catch (Exception e) {
-            log.error("notifyConsumerIdsChanged exception, " + consumerGroup, e.getMessage());
-        }
-    }
-
-    @Override
-    public RemotingCommand creatTopic(TopicConfig topicConfig) {
-//        CreateTopicRequestHeader requestHeader = new CreateTopicRequestHeader();
-//        requestHeader.setTopic(topicConfig.getTopicName());
-//        requestHeader.setDefaultTopic(defaultTopic);
-//        requestHeader.setReadQueueNums(topicConfig.getReadQueueNums());
-//        requestHeader.setWriteQueueNums(topicConfig.getWriteQueueNums());
-//        requestHeader.setPerm(topicConfig.getPerm());
-//        requestHeader.setTopicFilterType(topicConfig.getTopicFilterType().name());
-//        requestHeader.setTopicSysFlag(topicConfig.getTopicSysFlag());
-//        requestHeader.setOrder(topicConfig.isOrder());
-//
-//        RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.UPDATE_AND_CREATE_TOPIC, requestHeader);
-//
-//        RemotingCommand response = this.client.invokeSync(,
-//            request, defaultTimeoutMills);
-//        assert response != null;
-//        switch (response.getCode()) {
-//            case ResponseCode.SUCCESS: {
-//                return;
-//            }
-//            default:
-//                break;
-//        }
-        return null;
-    }
-}