You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by du...@apache.org on 2019/01/02 16:45:29 UTC

[rocketmq] 05/14: Add Snode server

This is an automated email from the ASF dual-hosted git repository.

duhengforever pushed a commit to branch snode
in repository https://gitbox.apache.org/repos/asf/rocketmq.git

commit 9f5383e58a6886b253395d5b6f2852e1b13552d2
Author: duhenglucky <du...@gmail.com>
AuthorDate: Fri Dec 21 02:43:48 2018 +0800

    Add Snode server
---
 .../apache/rocketmq/broker/BrokerController.java   |  11 +-
 .../org/apache/rocketmq/broker/BrokerStartup.java  |   7 +-
 .../broker/client/ClientHousekeepingService.java   |   1 -
 .../broker/longpolling/PullRequestHoldService.java |  18 +-
 .../broker/processor/ClientManageProcessor.java    |   2 +
 .../processor/SnodePullMessageProcessor.java       | 449 +++++++++++++++++++++
 .../rocketmq/client/impl/MQClientAPIImpl.java      |  17 +-
 .../client/impl/factory/MQClientInstance.java      |   4 +-
 .../impl/producer/DefaultMQProducerImpl.java       |  37 +-
 .../rocketmq/common/constant/LoggerName.java       |   1 +
 .../common/namesrv/RegisterSnodeResult.java        |  20 +
 .../rocketmq/common/protocol/RequestCode.java      |   5 +
 .../rocketmq/common/protocol/body/ClusterInfo.java |  22 +
 .../protocol/header/PullMessageRequestHeader.java  |  10 +
 .../protocol/header/SendMessageRequestHeader.java  |  10 +
 .../header/SendMessageRequestHeaderV2.java         |  12 +
 .../header/namesrv/RegisterSnodeRequestHeader.java |  61 +++
 .../namesrv/RegisterSnodeResponseHeader.java       |  20 +
 .../common/protocol/heartbeat/HeartbeatData.java   |  20 +-
 .../common/protocol/heartbeat/SnodeData.java       |  34 ++
 .../rocketmq/common/protocol/route/SnodeData.java  |  57 +++
 .../common/utils/PositiveAtomicCounter.java        |  40 ++
 distribution/conf/logback_snode.xml                |  92 +++++
 .../rocketmq/example/quickstart/Producer.java      |   4 +-
 .../apache/rocketmq/example/simple/Producer.java   |   1 -
 .../namesrv/processor/DefaultRequestProcessor.java |  18 +-
 .../namesrv/routeinfo/RouteInfoManager.java        |  31 ++
 pom.xml                                            |   1 +
 rocketmq-snode/pom.xml                             |  93 +++++
 .../org/apache/rocketmq/snode/SnodeController.java | 205 ++++++++++
 .../org/apache/rocketmq/snode/SnodeStartup.java    | 144 +++++++
 .../rocketmq/snode/client/ClientChannelInfo.java   | 100 +++++
 .../snode}/client/ClientHousekeepingService.java   |  36 +-
 .../rocketmq/snode/client/ConsumerGroupEvent.java  |  33 ++
 .../rocketmq/snode/client/ConsumerGroupInfo.java   | 248 ++++++++++++
 .../snode/client/ConsumerIdsChangeListener.java    |  22 +
 .../rocketmq/snode/client/ConsumerManager.java     | 189 +++++++++
 .../client/DefaultConsumerIdsChangeListener.java   |  64 +++
 .../rocketmq/snode/client/ProducerManager.java     | 224 ++++++++++
 .../snode/client/SubscriptionGroupManager.java     | 200 +++++++++
 .../apache/rocketmq/snode/config/SnodeConfig.java  | 223 ++++++++++
 .../snode/processor/ConsumerManageProcessor.java   |  96 +++++
 .../snode/processor/HearbeatProcessor.java         |  92 +++++
 .../snode/processor/PullMessageProcessor.java      |  40 ++
 .../snode/processor/SendMessageProcessor.java      |  44 ++
 .../rocketmq/snode/service/ScheduledService.java   |  21 +
 .../snode/service/SendTransferService.java         |  26 ++
 .../rocketmq/snode/service/SnodeOuterService.java  |  52 +++
 .../snode/service/impl/ScheduledServiceImpl.java   | 113 ++++++
 .../service/impl/SendTransferServiceImpl.java      |  45 +++
 .../snode/service/impl/SnodeOuterServiceImpl.java  | 270 +++++++++++++
 .../rocketmq/snode/topic/TopicConfigManager.java   |  19 +
 52 files changed, 3542 insertions(+), 62 deletions(-)

diff --git a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
index 1cbd39c..9639f65 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
@@ -58,6 +58,7 @@ import org.apache.rocketmq.broker.processor.EndTransactionProcessor;
 import org.apache.rocketmq.broker.processor.PullMessageProcessor;
 import org.apache.rocketmq.broker.processor.QueryMessageProcessor;
 import org.apache.rocketmq.broker.processor.SendMessageProcessor;
+import org.apache.rocketmq.broker.processor.SnodePullMessageProcessor;
 import org.apache.rocketmq.broker.slave.SlaveSynchronize;
 import org.apache.rocketmq.broker.subscription.SubscriptionGroupManager;
 import org.apache.rocketmq.broker.topic.TopicConfigManager;
@@ -115,6 +116,7 @@ public class BrokerController {
     private final ProducerManager producerManager;
     private final ClientHousekeepingService clientHousekeepingService;
     private final PullMessageProcessor pullMessageProcessor;
+    private final SnodePullMessageProcessor snodePullMessageProcessor;
     private final PullRequestHoldService pullRequestHoldService;
     private final MessageArrivingListener messageArrivingListener;
     private final Broker2Client broker2Client;
@@ -184,7 +186,7 @@ public class BrokerController {
         this.filterServerManager = new FilterServerManager(this);
 
         this.slaveSynchronize = new SlaveSynchronize(this);
-
+        this.snodePullMessageProcessor = new SnodePullMessageProcessor(this);
         this.sendThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getSendThreadPoolQueueCapacity());
         this.pullThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getPullThreadPoolQueueCapacity());
         this.queryThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getQueryThreadPoolQueueCapacity());
@@ -513,7 +515,8 @@ public class BrokerController {
          */
         this.remotingServer.registerProcessor(RequestCode.PULL_MESSAGE, this.pullMessageProcessor, this.pullMessageExecutor);
         this.pullMessageProcessor.registerConsumeMessageHook(consumeMessageHookList);
-
+        this.remotingServer.registerProcessor(RequestCode.SNODE_PULL_MESSAGE, this.snodePullMessageProcessor,pullMessageExecutor);
+        this.snodePullMessageProcessor.registerConsumeMessageHook(consumeMessageHookList);
         /**
          * QueryMessageProcessor
          */
@@ -671,6 +674,10 @@ public class BrokerController {
         return pullMessageProcessor;
     }
 
+    public SnodePullMessageProcessor getSnodePullMessageProcessor() {
+        return snodePullMessageProcessor;
+    }
+
     public PullRequestHoldService getPullRequestHoldService() {
         return pullRequestHoldService;
     }
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/BrokerStartup.java b/broker/src/main/java/org/apache/rocketmq/broker/BrokerStartup.java
index 4b986c0..c623d52 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerStartup.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerStartup.java
@@ -60,9 +60,7 @@ public class BrokerStartup {
 
     public static BrokerController start(BrokerController controller) {
         try {
-
             controller.start();
-
             String tip = "The broker[" + controller.getBrokerConfig().getBrokerName() + ", "
                 + controller.getBrokerAddr() + "] boot success. serializeType=" + RemotingCommand.getSerializeTypeConfigInThisServer();
 
@@ -100,8 +98,9 @@ public class BrokerStartup {
 
         try {
             //PackageConflictDetect.detectFastjson();
+
             Options options = ServerUtil.buildCommandlineOptions(new Options());
-            commandLine = ServerUtil.parseCmdLine("mqbroker", args, buildCommandlineOptions(options),
+            commandLine = ServerUtil.parseCmdLine("broker", args, buildCommandlineOptions(options),
                 new PosixParser());
             if (null == commandLine) {
                 System.exit(-1);
@@ -260,6 +259,8 @@ public class BrokerStartup {
     }
 
     private static Options buildCommandlineOptions(final Options options) {
+
+
         Option opt = new Option("c", "configFile", true, "Broker config properties file");
         opt.setRequired(false);
         options.addOption(opt);
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/client/ClientHousekeepingService.java b/broker/src/main/java/org/apache/rocketmq/broker/client/ClientHousekeepingService.java
index d536db5..7e023dd 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/client/ClientHousekeepingService.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/client/ClientHousekeepingService.java
@@ -55,7 +55,6 @@ public class ClientHousekeepingService implements ChannelEventListener {
     private void scanExceptionChannel() {
         this.brokerController.getProducerManager().scanNotActiveChannel();
         this.brokerController.getConsumerManager().scanNotActiveChannel();
-        this.brokerController.getFilterServerManager().scanNotActiveChannel();
     }
 
     public void shutdown() {
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/longpolling/PullRequestHoldService.java b/broker/src/main/java/org/apache/rocketmq/broker/longpolling/PullRequestHoldService.java
index 417ec0d..af6addc 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/longpolling/PullRequestHoldService.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/longpolling/PullRequestHoldService.java
@@ -138,8 +138,13 @@ public class PullRequestHoldService extends ServiceThread {
 
                         if (match) {
                             try {
-                                this.brokerController.getPullMessageProcessor().executeRequestWhenWakeup(request.getClientChannel(),
-                                    request.getRequestCommand());
+                                if (request.getMessageFilter() == null && request.getSubscriptionData() == null) {
+                                    this.brokerController.getSnodePullMessageProcessor().executeRequestWhenWakeup(request.getClientChannel(),
+                                        request.getRequestCommand());
+                                } else {
+                                    this.brokerController.getPullMessageProcessor().executeRequestWhenWakeup(request.getClientChannel(),
+                                        request.getRequestCommand());
+                                }
                             } catch (Throwable e) {
                                 log.error("execute request when wakeup failed.", e);
                             }
@@ -149,8 +154,13 @@ public class PullRequestHoldService extends ServiceThread {
 
                     if (System.currentTimeMillis() >= (request.getSuspendTimestamp() + request.getTimeoutMillis())) {
                         try {
-                            this.brokerController.getPullMessageProcessor().executeRequestWhenWakeup(request.getClientChannel(),
-                                request.getRequestCommand());
+                            if (request.getMessageFilter() == null && request.getSubscriptionData() == null) {
+                                this.brokerController.getSnodePullMessageProcessor().executeRequestWhenWakeup(request.getClientChannel(),
+                                    request.getRequestCommand());
+                            } else {
+                                this.brokerController.getPullMessageProcessor().executeRequestWhenWakeup(request.getClientChannel(),
+                                    request.getRequestCommand());
+                            }
                         } catch (Throwable e) {
                             log.error("execute request when wakeup failed.", e);
                         }
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/ClientManageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/ClientManageProcessor.java
index b5e6085..f0d155f 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/ClientManageProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/ClientManageProcessor.java
@@ -74,6 +74,7 @@ public class ClientManageProcessor implements NettyRequestProcessor {
     public RemotingCommand heartBeat(ChannelHandlerContext ctx, RemotingCommand request) {
         RemotingCommand response = RemotingCommand.createResponseCommand(null);
         HeartbeatData heartbeatData = HeartbeatData.decode(request.getBody(), HeartbeatData.class);
+        log.info("heart beat request:{}", heartbeatData);
         ClientChannelInfo clientChannelInfo = new ClientChannelInfo(
             ctx.channel(),
             heartbeatData.getClientID(),
@@ -121,6 +122,7 @@ public class ClientManageProcessor implements NettyRequestProcessor {
             this.brokerController.getProducerManager().registerProducer(data.getGroupName(),
                 clientChannelInfo);
         }
+
         response.setCode(ResponseCode.SUCCESS);
         response.setRemark(null);
         return response;
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/SnodePullMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/SnodePullMessageProcessor.java
new file mode 100644
index 0000000..8beb6fa
--- /dev/null
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/SnodePullMessageProcessor.java
@@ -0,0 +1,449 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.broker.processor;
+
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelFutureListener;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.FileRegion;
+import java.nio.ByteBuffer;
+import java.util.List;
+import org.apache.rocketmq.broker.BrokerController;
+import org.apache.rocketmq.broker.client.ConsumerGroupInfo;
+import org.apache.rocketmq.broker.filter.ConsumerFilterData;
+import org.apache.rocketmq.broker.filter.ConsumerFilterManager;
+import org.apache.rocketmq.broker.filter.ExpressionForRetryMessageFilter;
+import org.apache.rocketmq.broker.filter.ExpressionMessageFilter;
+import org.apache.rocketmq.broker.longpolling.PullRequest;
+import org.apache.rocketmq.broker.mqtrace.ConsumeMessageContext;
+import org.apache.rocketmq.broker.mqtrace.ConsumeMessageHook;
+import org.apache.rocketmq.broker.pagecache.ManyMessageTransfer;
+import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.common.TopicConfig;
+import org.apache.rocketmq.common.TopicFilterType;
+import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.common.constant.PermName;
+import org.apache.rocketmq.common.filter.ExpressionType;
+import org.apache.rocketmq.common.filter.FilterAPI;
+import org.apache.rocketmq.common.help.FAQUrl;
+import org.apache.rocketmq.common.message.MessageDecoder;
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.common.protocol.ResponseCode;
+import org.apache.rocketmq.common.protocol.header.PullMessageRequestHeader;
+import org.apache.rocketmq.common.protocol.header.PullMessageResponseHeader;
+import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
+import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
+import org.apache.rocketmq.common.protocol.topic.OffsetMovedEvent;
+import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig;
+import org.apache.rocketmq.common.sysflag.PullSysFlag;
+import org.apache.rocketmq.logging.InternalLogger;
+import org.apache.rocketmq.logging.InternalLoggerFactory;
+import org.apache.rocketmq.remoting.common.RemotingHelper;
+import org.apache.rocketmq.remoting.common.RemotingUtil;
+import org.apache.rocketmq.remoting.exception.RemotingCommandException;
+import org.apache.rocketmq.remoting.netty.NettyRequestProcessor;
+import org.apache.rocketmq.remoting.netty.RequestTask;
+import org.apache.rocketmq.remoting.protocol.RemotingCommand;
+import org.apache.rocketmq.store.GetMessageResult;
+import org.apache.rocketmq.store.MessageExtBrokerInner;
+import org.apache.rocketmq.store.MessageFilter;
+import org.apache.rocketmq.store.PutMessageResult;
+import org.apache.rocketmq.store.config.BrokerRole;
+import org.apache.rocketmq.store.stats.BrokerStatsManager;
+
+public class SnodePullMessageProcessor implements NettyRequestProcessor {
+    private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
+    private final BrokerController brokerController;
+    private List<ConsumeMessageHook> consumeMessageHookList;
+
+    public SnodePullMessageProcessor(final BrokerController brokerController) {
+        this.brokerController = brokerController;
+    }
+
+    @Override
+    public RemotingCommand processRequest(final ChannelHandlerContext ctx,
+        RemotingCommand request) throws RemotingCommandException {
+        return this.processRequest(ctx.channel(), request, true);
+    }
+
+    @Override
+    public boolean rejectRequest() {
+        return false;
+    }
+
+    private RemotingCommand processRequest(final Channel channel, RemotingCommand request, boolean brokerAllowSuspend)
+        throws RemotingCommandException {
+        RemotingCommand response = RemotingCommand.createResponseCommand(PullMessageResponseHeader.class);
+        final PullMessageResponseHeader responseHeader = (PullMessageResponseHeader) response.readCustomHeader();
+        final PullMessageRequestHeader requestHeader =
+            (PullMessageRequestHeader) request.decodeCommandCustomHeader(PullMessageRequestHeader.class);
+
+        response.setOpaque(request.getOpaque());
+
+        log.info("receive PullMessage request command, {}", request);
+        final boolean hasSuspendFlag = PullSysFlag.hasSuspendFlag(requestHeader.getSysFlag());
+        final boolean hasCommitOffsetFlag = PullSysFlag.hasCommitOffsetFlag(requestHeader.getSysFlag());
+        final boolean hasSubscriptionFlag = PullSysFlag.hasSubscriptionFlag(requestHeader.getSysFlag());
+
+        final long suspendTimeoutMillisLong = hasSuspendFlag ? requestHeader.getSuspendTimeoutMillis() : 0;
+        if (!PermName.isReadable(this.brokerController.getBrokerConfig().getBrokerPermission())) {
+            response.setCode(ResponseCode.NO_PERMISSION);
+            response.setRemark(String.format("the broker[%s] pulling message is forbidden", this.brokerController.getBrokerConfig().getBrokerIP1()));
+            return response;
+        }
+
+        final GetMessageResult getMessageResult =
+            this.brokerController.getMessageStore().getMessage(requestHeader.getConsumerGroup(), requestHeader.getTopic(),
+                requestHeader.getQueueId(), requestHeader.getQueueOffset(), requestHeader.getMaxMsgNums(), null);
+
+        log.info("Get message response:{}",getMessageResult);
+        if (getMessageResult != null) {
+            response.setRemark(getMessageResult.getStatus().name());
+            responseHeader.setNextBeginOffset(getMessageResult.getNextBeginOffset());
+            responseHeader.setMinOffset(getMessageResult.getMinOffset());
+            responseHeader.setMaxOffset(getMessageResult.getMaxOffset());
+
+            if (getMessageResult.isSuggestPullingFromSlave()) {
+                responseHeader.setSuggestWhichBrokerId(0L);
+            } else {
+                responseHeader.setSuggestWhichBrokerId(MixAll.MASTER_ID);
+            }
+
+            switch (this.brokerController.getMessageStoreConfig().getBrokerRole()) {
+                case ASYNC_MASTER:
+                case SYNC_MASTER:
+                    break;
+                case SLAVE:
+                    if (!this.brokerController.getBrokerConfig().isSlaveReadEnable()) {
+                        response.setCode(ResponseCode.PULL_RETRY_IMMEDIATELY);
+                        responseHeader.setSuggestWhichBrokerId(MixAll.MASTER_ID);
+                    }
+                    break;
+            }
+
+//            if (this.brokerController.getBrokerConfig().isSlaveReadEnable()) {
+//                // consume too slow ,redirect to another machine
+//                if (getMessageResult.isSuggestPullingFromSlave()) {
+//                    responseHeader.setSuggestWhichBrokerId(subscriptionGroupConfig.getWhichBrokerWhenConsumeSlowly());
+//                }
+//                // consume ok
+//                else {
+//                    responseHeader.setSuggestWhichBrokerId(subscriptionGroupConfig.getBrokerId());
+//                }
+//            } else {
+//                responseHeader.setSuggestWhichBrokerId(MixAll.MASTER_ID);
+//            }
+
+            switch (getMessageResult.getStatus()) {
+                case FOUND:
+                    response.setCode(ResponseCode.SUCCESS);
+                    break;
+                case MESSAGE_WAS_REMOVING:
+                    response.setCode(ResponseCode.PULL_RETRY_IMMEDIATELY);
+                    break;
+                case NO_MATCHED_LOGIC_QUEUE:
+                case NO_MESSAGE_IN_QUEUE:
+                    if (0 != requestHeader.getQueueOffset()) {
+                        response.setCode(ResponseCode.PULL_OFFSET_MOVED);
+
+                        // XXX: warn and notify me
+                        log.info("the broker store no queue data, fix the request offset {} to {}, Topic: {} QueueId: {} Consumer Group: {}",
+                            requestHeader.getQueueOffset(),
+                            getMessageResult.getNextBeginOffset(),
+                            requestHeader.getTopic(),
+                            requestHeader.getQueueId(),
+                            requestHeader.getConsumerGroup()
+                        );
+                    } else {
+                        response.setCode(ResponseCode.PULL_NOT_FOUND);
+                    }
+                    break;
+                case NO_MATCHED_MESSAGE:
+                    response.setCode(ResponseCode.PULL_RETRY_IMMEDIATELY);
+                    break;
+                case OFFSET_FOUND_NULL:
+                    response.setCode(ResponseCode.PULL_NOT_FOUND);
+                    break;
+                case OFFSET_OVERFLOW_BADLY:
+                    response.setCode(ResponseCode.PULL_OFFSET_MOVED);
+                    // XXX: warn and notify me
+                    log.info("the request offset: {} over flow badly, broker max offset: {}, consumer: {}",
+                        requestHeader.getQueueOffset(), getMessageResult.getMaxOffset(), channel.remoteAddress());
+                    break;
+                case OFFSET_OVERFLOW_ONE:
+                    response.setCode(ResponseCode.PULL_NOT_FOUND);
+                    break;
+                case OFFSET_TOO_SMALL:
+                    response.setCode(ResponseCode.PULL_OFFSET_MOVED);
+                    log.info("the request offset too small. group={}, topic={}, requestOffset={}, brokerMinOffset={}, clientIp={}",
+                        requestHeader.getConsumerGroup(), requestHeader.getTopic(), requestHeader.getQueueOffset(),
+                        getMessageResult.getMinOffset(), channel.remoteAddress());
+                    break;
+                default:
+                    assert false;
+                    break;
+            }
+
+            if (this.hasConsumeMessageHook()) {
+                ConsumeMessageContext context = new ConsumeMessageContext();
+                context.setConsumerGroup(requestHeader.getConsumerGroup());
+                context.setTopic(requestHeader.getTopic());
+                context.setQueueId(requestHeader.getQueueId());
+
+                String owner = request.getExtFields().get(BrokerStatsManager.COMMERCIAL_OWNER);
+
+                switch (response.getCode()) {
+                    case ResponseCode.SUCCESS:
+                        int commercialBaseCount = brokerController.getBrokerConfig().getCommercialBaseCount();
+                        int incValue = getMessageResult.getMsgCount4Commercial() * commercialBaseCount;
+
+                        context.setCommercialRcvStats(BrokerStatsManager.StatsType.RCV_SUCCESS);
+                        context.setCommercialRcvTimes(incValue);
+                        context.setCommercialRcvSize(getMessageResult.getBufferTotalSize());
+                        context.setCommercialOwner(owner);
+
+                        break;
+                    case ResponseCode.PULL_NOT_FOUND:
+                        if (!brokerAllowSuspend) {
+
+                            context.setCommercialRcvStats(BrokerStatsManager.StatsType.RCV_EPOLLS);
+                            context.setCommercialRcvTimes(1);
+                            context.setCommercialOwner(owner);
+
+                        }
+                        break;
+                    case ResponseCode.PULL_RETRY_IMMEDIATELY:
+                    case ResponseCode.PULL_OFFSET_MOVED:
+                        context.setCommercialRcvStats(BrokerStatsManager.StatsType.RCV_EPOLLS);
+                        context.setCommercialRcvTimes(1);
+                        context.setCommercialOwner(owner);
+                        break;
+                    default:
+                        assert false;
+                        break;
+                }
+
+                this.executeConsumeMessageHookBefore(context);
+            }
+
+            switch (response.getCode()) {
+                case ResponseCode.SUCCESS:
+
+                    this.brokerController.getBrokerStatsManager().incGroupGetNums(requestHeader.getConsumerGroup(), requestHeader.getTopic(),
+                        getMessageResult.getMessageCount());
+
+                    this.brokerController.getBrokerStatsManager().incGroupGetSize(requestHeader.getConsumerGroup(), requestHeader.getTopic(),
+                        getMessageResult.getBufferTotalSize());
+
+                    this.brokerController.getBrokerStatsManager().incBrokerGetNums(getMessageResult.getMessageCount());
+                    if (this.brokerController.getBrokerConfig().isTransferMsgByHeap()) {
+                        final long beginTimeMills = this.brokerController.getMessageStore().now();
+                        final byte[] r = this.readGetMessageResult(getMessageResult, requestHeader.getConsumerGroup(), requestHeader.getTopic(), requestHeader.getQueueId());
+                        this.brokerController.getBrokerStatsManager().incGroupGetLatency(requestHeader.getConsumerGroup(),
+                            requestHeader.getTopic(), requestHeader.getQueueId(),
+                            (int) (this.brokerController.getMessageStore().now() - beginTimeMills));
+                        response.setBody(r);
+                    } else {
+                        try {
+                            FileRegion fileRegion =
+                                new ManyMessageTransfer(response.encodeHeader(getMessageResult.getBufferTotalSize()), getMessageResult);
+                            channel.writeAndFlush(fileRegion).addListener(new ChannelFutureListener() {
+                                @Override
+                                public void operationComplete(ChannelFuture future) throws Exception {
+                                    getMessageResult.release();
+                                    if (!future.isSuccess()) {
+                                        log.error("transfer many message by pagecache failed, {}", channel.remoteAddress(), future.cause());
+                                    }
+                                }
+                            });
+                        } catch (Throwable e) {
+                            log.error("transfer many message by pagecache exception", e);
+                            getMessageResult.release();
+                        }
+
+                        response = null;
+                    }
+                    break;
+                case ResponseCode.PULL_NOT_FOUND:
+
+                    if (brokerAllowSuspend && hasSuspendFlag) {
+                        long pollingTimeMills = suspendTimeoutMillisLong;
+                        if (!this.brokerController.getBrokerConfig().isLongPollingEnable()) {
+                            pollingTimeMills = this.brokerController.getBrokerConfig().getShortPollingTimeMills();
+                        }
+
+                        String topic = requestHeader.getTopic();
+                        long offset = requestHeader.getQueueOffset();
+                        int queueId = requestHeader.getQueueId();
+                        PullRequest pullRequest = new PullRequest(request, channel, pollingTimeMills,
+                            this.brokerController.getMessageStore().now(), offset, null, null);
+                        this.brokerController.getPullRequestHoldService().suspendPullRequest(topic, queueId, pullRequest);
+                        response = null;
+                        break;
+                    }
+
+                case ResponseCode.PULL_RETRY_IMMEDIATELY:
+                    break;
+                case ResponseCode.PULL_OFFSET_MOVED:
+                    if (this.brokerController.getMessageStoreConfig().getBrokerRole() != BrokerRole.SLAVE
+                        || this.brokerController.getMessageStoreConfig().isOffsetCheckInSlave()) {
+                        MessageQueue mq = new MessageQueue();
+                        mq.setTopic(requestHeader.getTopic());
+                        mq.setQueueId(requestHeader.getQueueId());
+                        mq.setBrokerName(this.brokerController.getBrokerConfig().getBrokerName());
+
+                        OffsetMovedEvent event = new OffsetMovedEvent();
+                        event.setConsumerGroup(requestHeader.getConsumerGroup());
+                        event.setMessageQueue(mq);
+                        event.setOffsetRequest(requestHeader.getQueueOffset());
+                        event.setOffsetNew(getMessageResult.getNextBeginOffset());
+                        this.generateOffsetMovedEvent(event);
+                        log.warn(
+                            "PULL_OFFSET_MOVED:correction offset. topic={}, groupId={}, requestOffset={}, newOffset={}, suggestBrokerId={}",
+                            requestHeader.getTopic(), requestHeader.getConsumerGroup(), event.getOffsetRequest(), event.getOffsetNew(),
+                            responseHeader.getSuggestWhichBrokerId());
+                    } else {
+//                        responseHeader.setSuggestWhichBrokerId(subscriptionGroupConfig.getBrokerId());
+                        response.setCode(ResponseCode.PULL_RETRY_IMMEDIATELY);
+                        log.warn("PULL_OFFSET_MOVED:none correction. topic={}, groupId={}, requestOffset={}, suggestBrokerId={}",
+                            requestHeader.getTopic(), requestHeader.getConsumerGroup(), requestHeader.getQueueOffset(),
+                            responseHeader.getSuggestWhichBrokerId());
+                    }
+
+                    break;
+                default:
+                    assert false;
+            }
+        } else {
+            response.setCode(ResponseCode.SYSTEM_ERROR);
+            response.setRemark("store getMessage return null");
+        }
+
+        boolean storeOffsetEnable = brokerAllowSuspend;
+        storeOffsetEnable = storeOffsetEnable && hasCommitOffsetFlag;
+        storeOffsetEnable = storeOffsetEnable
+            && this.brokerController.getMessageStoreConfig().getBrokerRole() != BrokerRole.SLAVE;
+        if (storeOffsetEnable) {
+            this.brokerController.getConsumerOffsetManager().commitOffset(RemotingHelper.parseChannelRemoteAddr(channel),
+                requestHeader.getConsumerGroup(), requestHeader.getTopic(), requestHeader.getQueueId(), requestHeader.getCommitOffset());
+        }
+        return response;
+    }
+
+    public boolean hasConsumeMessageHook() {
+        return consumeMessageHookList != null && !this.consumeMessageHookList.isEmpty();
+    }
+
+    public void executeConsumeMessageHookBefore(final ConsumeMessageContext context) {
+        if (hasConsumeMessageHook()) {
+            for (ConsumeMessageHook hook : this.consumeMessageHookList) {
+                try {
+                    hook.consumeMessageBefore(context);
+                } catch (Throwable e) {
+                }
+            }
+        }
+    }
+
+    private byte[] readGetMessageResult(final GetMessageResult getMessageResult, final String group, final String topic,
+        final int queueId) {
+        final ByteBuffer byteBuffer = ByteBuffer.allocate(getMessageResult.getBufferTotalSize());
+
+        long storeTimestamp = 0;
+        try {
+            List<ByteBuffer> messageBufferList = getMessageResult.getMessageBufferList();
+            for (ByteBuffer bb : messageBufferList) {
+
+                byteBuffer.put(bb);
+                storeTimestamp = bb.getLong(MessageDecoder.MESSAGE_STORE_TIMESTAMP_POSTION);
+            }
+        } finally {
+            getMessageResult.release();
+        }
+
+        this.brokerController.getBrokerStatsManager().recordDiskFallBehindTime(group, topic, queueId, this.brokerController.getMessageStore().now() - storeTimestamp);
+        return byteBuffer.array();
+    }
+
+    private void generateOffsetMovedEvent(final OffsetMovedEvent event) {
+        try {
+            MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
+            msgInner.setTopic(MixAll.OFFSET_MOVED_EVENT);
+            msgInner.setTags(event.getConsumerGroup());
+            msgInner.setDelayTimeLevel(0);
+            msgInner.setKeys(event.getConsumerGroup());
+            msgInner.setBody(event.encode());
+            msgInner.setFlag(0);
+            msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties()));
+            msgInner.setTagsCode(MessageExtBrokerInner.tagsString2tagsCode(TopicFilterType.SINGLE_TAG, msgInner.getTags()));
+
+            msgInner.setQueueId(0);
+            msgInner.setSysFlag(0);
+            msgInner.setBornTimestamp(System.currentTimeMillis());
+            msgInner.setBornHost(RemotingUtil.string2SocketAddress(this.brokerController.getBrokerAddr()));
+            msgInner.setStoreHost(msgInner.getBornHost());
+
+            msgInner.setReconsumeTimes(0);
+
+            PutMessageResult putMessageResult = this.brokerController.getMessageStore().putMessage(msgInner);
+        } catch (Exception e) {
+            log.warn(String.format("generateOffsetMovedEvent Exception, %s", event.toString()), e);
+        }
+    }
+
+    public void executeRequestWhenWakeup(final Channel channel,
+        final RemotingCommand request) throws RemotingCommandException {
+        Runnable run = new Runnable() {
+            @Override
+            public void run() {
+                try {
+                    final RemotingCommand response = SnodePullMessageProcessor.this.processRequest(channel, request, false);
+
+                    if (response != null) {
+                        response.setOpaque(request.getOpaque());
+                        response.markResponseType();
+                        try {
+                            channel.writeAndFlush(response).addListener(new ChannelFutureListener() {
+                                @Override
+                                public void operationComplete(ChannelFuture future) throws Exception {
+                                    if (!future.isSuccess()) {
+                                        log.error("processRequestWrapper response to {} failed",
+                                            future.channel().remoteAddress(), future.cause());
+                                        log.error(request.toString());
+                                        log.error(response.toString());
+                                    }
+                                }
+                            });
+                        } catch (Throwable e) {
+                            log.error("processRequestWrapper process request over, but response failed", e);
+                            log.error(request.toString());
+                            log.error(response.toString());
+                        }
+                    }
+                } catch (RemotingCommandException e1) {
+                    log.error("excuteRequestWhenWakeup run", e1);
+                }
+            }
+        };
+        this.brokerController.getPullMessageExecutor().submit(new RequestTask(run, channel, request));
+    }
+
+    public void registerConsumeMessageHook(List<ConsumeMessageHook> sendMessageHookList) {
+        this.consumeMessageHookList = sendMessageHookList;
+    }
+}
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
index 0fa1ae7..6302cd0 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
@@ -353,7 +353,8 @@ public class MQClientAPIImpl {
         final long timeoutMillis,
         final RemotingCommand request
     ) throws RemotingException, MQBrokerException, InterruptedException {
-        RemotingCommand response = this.remotingClient.invokeSync(addr, request, timeoutMillis);
+        String addrS = "localhost:11911";//TODO FIXME
+        RemotingCommand response = this.remotingClient.invokeSync(addrS, request, timeoutMillis);
         assert response != null;
         return this.processSendResponse(brokerName, msg, response);
     }
@@ -557,14 +558,15 @@ public class MQClientAPIImpl {
     }
 
     public PullResult pullMessage(
-        final String addr,
+         String addr,
         final PullMessageRequestHeader requestHeader,
         final long timeoutMillis,
         final CommunicationMode communicationMode,
         final PullCallback pullCallback
     ) throws RemotingException, MQBrokerException, InterruptedException {
-        RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.PULL_MESSAGE, requestHeader);
-
+        requestHeader.setEnodeAddr(addr);
+        RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.SNODE_PULL_MESSAGE, requestHeader);
+         addr = "localhost:11911"; //TODO FIXME
         switch (communicationMode) {
             case ONEWAY:
                 assert false;
@@ -647,7 +649,7 @@ public class MQClientAPIImpl {
 
         PullMessageResponseHeader responseHeader =
             (PullMessageResponseHeader) response.decodeCommandCustomHeader(PullMessageResponseHeader.class);
-
+        log.info("response header: {}", responseHeader.getSuggestWhichBrokerId());
         return new PullResultExt(pullStatus, responseHeader.getNextBeginOffset(), responseHeader.getMinOffset(),
             responseHeader.getMaxOffset(), null, responseHeader.getSuggestWhichBrokerId(), response.getBody());
     }
@@ -728,11 +730,12 @@ public class MQClientAPIImpl {
         final String consumerGroup,
         final long timeoutMillis) throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException,
         MQBrokerException, InterruptedException {
+
         GetConsumerListByGroupRequestHeader requestHeader = new GetConsumerListByGroupRequestHeader();
         requestHeader.setConsumerGroup(consumerGroup);
         RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_CONSUMER_LIST_BY_GROUP, requestHeader);
-
-        RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr),
+        String addrS = "localhost:11911";//TODO FIXME
+        RemotingCommand response = this.remotingClient.invokeSync(addrS,
             request, timeoutMillis);
         assert response != null;
         switch (response.getCode()) {
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java b/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java
index 9ffaed0..984e2cc 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java
@@ -537,7 +537,8 @@ public class MQClientInstance {
                             }
 
                             try {
-                                int version = this.mQClientAPIImpl.sendHearbeat(addr, heartbeatData, 3000);
+                                String addrS = "localhost:11911"; //TODO FIXME
+                                int version = this.mQClientAPIImpl.sendHearbeat(addrS, heartbeatData, 3000);
                                 if (!this.brokerVersionTable.containsKey(brokerName)) {
                                     this.brokerVersionTable.put(brokerName, new HashMap<String, Integer>(4));
                                 }
@@ -547,6 +548,7 @@ public class MQClientInstance {
                                     log.info(heartbeatData.toString());
                                 }
                             } catch (Exception e) {
+                                log.error("send heart beat error:{}",e);
                                 if (this.isBrokerInNameServer(addr)) {
                                     log.info("send heart beat to broker[{} {} {}] failed", brokerName, id, addr);
                                 } else {
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
index 7ace9d5..9ada834 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
@@ -247,6 +247,7 @@ public class DefaultMQProducerImpl implements MQProducerInner {
 
     /**
      * This method will be removed in the version 5.0.0 and <code>getCheckListener</code> is recommended.
+     *
      * @return
      */
     @Override
@@ -440,13 +441,14 @@ public class DefaultMQProducerImpl implements MQProducerInner {
      * DEFAULT ASYNC -------------------------------------------------------
      */
     public void send(Message msg,
-                     SendCallback sendCallback) throws MQClientException, RemotingException, InterruptedException {
+        SendCallback sendCallback) throws MQClientException, RemotingException, InterruptedException {
         send(msg, sendCallback, this.defaultMQProducer.getSendMsgTimeout());
     }
 
     /**
-     * It will be removed at 4.4.0 cause for exception handling and the wrong Semantics of timeout.
-     * A new one will be provided in next version
+     * It will be removed at 4.4.0 cause for exception handling and the wrong Semantics of timeout. A new one will be
+     * provided in next version
+     *
      * @param msg
      * @param sendCallback
      * @param timeout the <code>sendCallback</code> will be invoked at most time
@@ -481,7 +483,6 @@ public class DefaultMQProducerImpl implements MQProducerInner {
 
     }
 
-
     public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {
         return this.mqFaultStrategy.selectOneMessageQueue(tpInfo, lastBrokerName);
     }
@@ -653,18 +654,17 @@ public class DefaultMQProducerImpl implements MQProducerInner {
     }
 
     private SendResult sendKernelImpl(final Message msg,
-                                      final MessageQueue mq,
-                                      final CommunicationMode communicationMode,
-                                      final SendCallback sendCallback,
-                                      final TopicPublishInfo topicPublishInfo,
-                                      final long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
+        final MessageQueue mq,
+        final CommunicationMode communicationMode,
+        final SendCallback sendCallback,
+        final TopicPublishInfo topicPublishInfo,
+        final long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
         long beginStartTime = System.currentTimeMillis();
         String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
         if (null == brokerAddr) {
             tryToFindTopicPublishInfo(mq.getTopic());
             brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
         }
-
         SendMessageContext context = null;
         if (brokerAddr != null) {
             brokerAddr = MixAll.brokerVIPChannel(this.defaultMQProducer.isSendMessageWithVIPChannel(), brokerAddr);
@@ -733,6 +733,7 @@ public class DefaultMQProducerImpl implements MQProducerInner {
                 requestHeader.setReconsumeTimes(0);
                 requestHeader.setUnitMode(this.isUnitMode());
                 requestHeader.setBatch(msg instanceof MessageBatch);
+                requestHeader.setEnodeAddr(brokerAddr);
                 if (requestHeader.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
                     String reconsumeTimes = MessageAccessor.getReconsumeTime(msg);
                     if (reconsumeTimes != null) {
@@ -943,8 +944,9 @@ public class DefaultMQProducerImpl implements MQProducerInner {
     }
 
     /**
-     * It will be removed at 4.4.0 cause for exception handling and the wrong Semantics of timeout.
-     * A new one will be provided in next version
+     * It will be removed at 4.4.0 cause for exception handling and the wrong Semantics of timeout. A new one will be
+     * provided in next version
+     *
      * @param msg
      * @param mq
      * @param sendCallback
@@ -1064,8 +1066,9 @@ public class DefaultMQProducerImpl implements MQProducerInner {
     }
 
     /**
-     * It will be removed at 4.4.0 cause for exception handling and the wrong Semantics of timeout.
-     * A new one will be provided in next version
+     * It will be removed at 4.4.0 cause for exception handling and the wrong Semantics of timeout. A new one will be
+     * provided in next version
+     *
      * @param msg
      * @param selector
      * @param arg
@@ -1076,7 +1079,8 @@ public class DefaultMQProducerImpl implements MQProducerInner {
      * @throws InterruptedException
      */
     @Deprecated
-    public void send(final Message msg, final MessageQueueSelector selector, final Object arg, final SendCallback sendCallback, final long timeout)
+    public void send(final Message msg, final MessageQueueSelector selector, final Object arg,
+        final SendCallback sendCallback, final long timeout)
         throws MQClientException, RemotingException, InterruptedException {
         final long beginStartTime = System.currentTimeMillis();
         ExecutorService executor = this.getCallbackExecutor();
@@ -1120,7 +1124,7 @@ public class DefaultMQProducerImpl implements MQProducerInner {
     }
 
     public TransactionSendResult sendMessageInTransaction(final Message msg,
-                                                          final LocalTransactionExecuter localTransactionExecuter, final Object arg)
+        final LocalTransactionExecuter localTransactionExecuter, final Object arg)
         throws MQClientException {
         TransactionListener transactionListener = getCheckListener();
         if (null == localTransactionExecuter && null == transactionListener) {
@@ -1243,6 +1247,7 @@ public class DefaultMQProducerImpl implements MQProducerInner {
     public void setCallbackExecutor(final ExecutorService callbackExecutor) {
         this.mQClientFactory.getMQClientAPIImpl().getRemotingClient().setCallbackExecutor(callbackExecutor);
     }
+
     public ExecutorService getCallbackExecutor() {
         return this.mQClientFactory.getMQClientAPIImpl().getRemotingClient().getCallbackExecutor();
 
diff --git a/common/src/main/java/org/apache/rocketmq/common/constant/LoggerName.java b/common/src/main/java/org/apache/rocketmq/common/constant/LoggerName.java
index fe0ae9f..48295a3 100644
--- a/common/src/main/java/org/apache/rocketmq/common/constant/LoggerName.java
+++ b/common/src/main/java/org/apache/rocketmq/common/constant/LoggerName.java
@@ -37,4 +37,5 @@ public class LoggerName {
     public static final String PROTECTION_LOGGER_NAME = "RocketmqProtection";
     public static final String WATER_MARK_LOGGER_NAME = "RocketmqWaterMark";
     public static final String FILTER_LOGGER_NAME = "RocketmqFilter";
+    public static final String SNODE_LOGGER_NAME = "RocketmqSnode";
 }
diff --git a/common/src/main/java/org/apache/rocketmq/common/namesrv/RegisterSnodeResult.java b/common/src/main/java/org/apache/rocketmq/common/namesrv/RegisterSnodeResult.java
new file mode 100644
index 0000000..6527a55
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/common/namesrv/RegisterSnodeResult.java
@@ -0,0 +1,20 @@
+package org.apache.rocketmq.common.namesrv;/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+public class RegisterSnodeResult {
+
+}
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/RequestCode.java b/common/src/main/java/org/apache/rocketmq/common/protocol/RequestCode.java
index 8cf2d46..b36bdb2 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/RequestCode.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/RequestCode.java
@@ -167,4 +167,9 @@ public class RequestCode {
     public static final int QUERY_CONSUME_QUEUE = 321;
 
     public static final int QUERY_DATA_VERSION = 322;
+
+    public static final int REGISTER_SNODE = 350;
+
+    public static final int SNODE_PULL_MESSAGE = 351;
+
 }
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/body/ClusterInfo.java b/common/src/main/java/org/apache/rocketmq/common/protocol/body/ClusterInfo.java
index 566bf93..9c4d913 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/body/ClusterInfo.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/body/ClusterInfo.java
@@ -20,7 +20,9 @@ package org.apache.rocketmq.common.protocol.body;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
+import org.apache.rocketmq.common.MixAll;
 import org.apache.rocketmq.common.protocol.route.BrokerData;
 import org.apache.rocketmq.remoting.serialize.RemotingSerializable;
 
@@ -59,6 +61,26 @@ public class ClusterInfo extends RemotingSerializable {
         return addrs.toArray(new String[] {});
     }
 
+    public String[] retrieveAllMasterAddrByCluster(String cluster) {
+        List<String> addrs = new ArrayList<String>();
+        if (clusterAddrTable.containsKey(cluster)) {
+            Set<String> brokerNames = clusterAddrTable.get(cluster);
+            for (String brokerName : brokerNames) {
+                BrokerData brokerData = brokerAddrTable.get(brokerName);
+                if (null != brokerData) {
+                    HashMap<Long, String> brokerAddrs = brokerData.getBrokerAddrs();
+                    for (Map.Entry<Long, String> entry : brokerAddrs.entrySet()) {
+                        if (MixAll.MASTER_ID == entry.getKey()) {
+                            addrs.add(entry.getValue());
+                        }
+                    }
+
+                }
+            }
+        }
+        return addrs.toArray(new String[] {});
+    }
+
     public String[] retrieveAllClusterNames() {
         return clusterAddrTable.keySet().toArray(new String[] {});
     }
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/PullMessageRequestHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/PullMessageRequestHeader.java
index 106e89e..8332307 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/header/PullMessageRequestHeader.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/PullMessageRequestHeader.java
@@ -48,6 +48,16 @@ public class PullMessageRequestHeader implements CommandCustomHeader {
     private Long subVersion;
     private String expressionType;
 
+    private String enodeAddr;
+
+    public String getEnodeAddr() {
+        return enodeAddr;
+    }
+
+    public void setEnodeAddr(String enodeAddr) {
+        this.enodeAddr = enodeAddr;
+    }
+
     @Override
     public void checkFields() throws RemotingCommandException {
     }
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/SendMessageRequestHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/SendMessageRequestHeader.java
index 2df31e6..81e0cff 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/header/SendMessageRequestHeader.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/SendMessageRequestHeader.java
@@ -52,6 +52,8 @@ public class SendMessageRequestHeader implements CommandCustomHeader {
     private boolean batch = false;
     private Integer maxReconsumeTimes;
 
+    private String enodeAddr;
+
     @Override
     public void checkFields() throws RemotingCommandException {
     }
@@ -159,4 +161,12 @@ public class SendMessageRequestHeader implements CommandCustomHeader {
     public void setBatch(boolean batch) {
         this.batch = batch;
     }
+
+    public String getEnodeAddr() {
+        return enodeAddr;
+    }
+
+    public void setEnodeAddr(String enodeAddr) {
+        this.enodeAddr = enodeAddr;
+    }
 }
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/SendMessageRequestHeaderV2.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/SendMessageRequestHeaderV2.java
index 4e0098b..9602805 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/header/SendMessageRequestHeaderV2.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/SendMessageRequestHeaderV2.java
@@ -54,6 +54,8 @@ public class SendMessageRequestHeaderV2 implements CommandCustomHeader {
     @CFNullable
     private boolean m; //batch
 
+    private String n; //enode addr
+
     public static SendMessageRequestHeader createSendMessageRequestHeaderV1(final SendMessageRequestHeaderV2 v2) {
         SendMessageRequestHeader v1 = new SendMessageRequestHeader();
         v1.setProducerGroup(v2.a);
@@ -69,6 +71,7 @@ public class SendMessageRequestHeaderV2 implements CommandCustomHeader {
         v1.setUnitMode(v2.k);
         v1.setMaxReconsumeTimes(v2.l);
         v1.setBatch(v2.m);
+        v1.setEnodeAddr(v2.n);
         return v1;
     }
 
@@ -87,6 +90,7 @@ public class SendMessageRequestHeaderV2 implements CommandCustomHeader {
         v2.k = v1.isUnitMode();
         v2.l = v1.getMaxReconsumeTimes();
         v2.m = v1.isBatch();
+        v2.n = v1.getEnodeAddr();
         return v2;
     }
 
@@ -197,4 +201,12 @@ public class SendMessageRequestHeaderV2 implements CommandCustomHeader {
     public void setM(boolean m) {
         this.m = m;
     }
+
+    public String getN() {
+        return n;
+    }
+
+    public void setN(String n) {
+        this.n = n;
+    }
 }
\ No newline at end of file
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/RegisterSnodeRequestHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/RegisterSnodeRequestHeader.java
new file mode 100644
index 0000000..7752cd7
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/RegisterSnodeRequestHeader.java
@@ -0,0 +1,61 @@
+package org.apache.rocketmq.common.protocol.header.namesrv;/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import org.apache.rocketmq.remoting.CommandCustomHeader;
+import org.apache.rocketmq.remoting.annotation.CFNotNull;
+import org.apache.rocketmq.remoting.exception.RemotingCommandException;
+
+public class RegisterSnodeRequestHeader implements CommandCustomHeader {
+
+    @CFNotNull
+    private String snodeName;
+
+    @CFNotNull
+    private String snodeAddr;
+
+    @CFNotNull
+    private String clusterName;
+
+    @Override
+    public void checkFields() throws RemotingCommandException {
+
+    }
+
+    public String getClusterName() {
+        return clusterName;
+    }
+
+    public void setClusterName(String clusterName) {
+        this.clusterName = clusterName;
+    }
+
+    public String getSnodeName() {
+        return snodeName;
+    }
+
+    public void setSnodeName(String snodeName) {
+        this.snodeName = snodeName;
+    }
+
+    public String getSnodeAddr() {
+        return snodeAddr;
+    }
+
+    public void setSnodeAddr(String snodeAddr) {
+        this.snodeAddr = snodeAddr;
+    }
+}
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/RegisterSnodeResponseHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/RegisterSnodeResponseHeader.java
new file mode 100644
index 0000000..4d4f15f
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/RegisterSnodeResponseHeader.java
@@ -0,0 +1,20 @@
+package org.apache.rocketmq.common.protocol.header.namesrv;/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+public class RegisterSnodeResponseHeader {
+
+}
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/heartbeat/HeartbeatData.java b/common/src/main/java/org/apache/rocketmq/common/protocol/heartbeat/HeartbeatData.java
index 03151f5..eb32749 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/heartbeat/HeartbeatData.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/heartbeat/HeartbeatData.java
@@ -28,6 +28,15 @@ public class HeartbeatData extends RemotingSerializable {
     private String clientID;
     private Set<ProducerData> producerDataSet = new HashSet<ProducerData>();
     private Set<ConsumerData> consumerDataSet = new HashSet<ConsumerData>();
+    private SnodeData snodeData;
+
+    public SnodeData getSnodeData() {
+        return snodeData;
+    }
+
+    public void setSnodeData(SnodeData snodeData) {
+        this.snodeData = snodeData;
+    }
 
     public String getClientID() {
         return clientID;
@@ -53,9 +62,12 @@ public class HeartbeatData extends RemotingSerializable {
         this.consumerDataSet = consumerDataSet;
     }
 
-    @Override
-    public String toString() {
-        return "HeartbeatData [clientID=" + clientID + ", producerDataSet=" + producerDataSet
-            + ", consumerDataSet=" + consumerDataSet + "]";
+    @Override public String toString() {
+        return "HeartbeatData{" +
+            "clientID='" + clientID + '\'' +
+            ", producerDataSet=" + producerDataSet +
+            ", consumerDataSet=" + consumerDataSet +
+            ", snodeData='" + snodeData + '\'' +
+            '}';
     }
 }
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/heartbeat/SnodeData.java b/common/src/main/java/org/apache/rocketmq/common/protocol/heartbeat/SnodeData.java
new file mode 100644
index 0000000..e4cb9b1
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/heartbeat/SnodeData.java
@@ -0,0 +1,34 @@
+package org.apache.rocketmq.common.protocol.heartbeat;/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+public class SnodeData {
+    private String snodeName;
+
+    public String getSnodeName() {
+        return snodeName;
+    }
+
+    public void setSnodeName(String snodeName) {
+        this.snodeName = snodeName;
+    }
+
+    @Override public String toString() {
+        return "SnodeData{" +
+            "snodeName='" + snodeName + '\'' +
+            '}';
+    }
+}
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/route/SnodeData.java b/common/src/main/java/org/apache/rocketmq/common/protocol/route/SnodeData.java
new file mode 100644
index 0000000..db76ad2
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/route/SnodeData.java
@@ -0,0 +1,57 @@
+package org.apache.rocketmq.common.protocol.route;/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+public class SnodeData {
+    private String snodeName;
+
+    private String addr;
+
+    private String clusterName;
+
+    public String getClusterName() {
+        return clusterName;
+    }
+
+    public void setClusterName(String clusterName) {
+        this.clusterName = clusterName;
+    }
+
+    public String getSnodeName() {
+        return snodeName;
+    }
+
+    public void setSnodeName(String snodeName) {
+        this.snodeName = snodeName;
+    }
+
+    public String getAddr() {
+        return addr;
+    }
+
+    public void setAddr(String addr) {
+        this.addr = addr;
+    }
+
+    @Override
+    public String toString() {
+        return "SnodeData{" +
+            "snodeName='" + snodeName + '\'' +
+            ", addr='" + addr + '\'' +
+            ", clusterName='" + clusterName + '\'' +
+            '}';
+    }
+}
diff --git a/common/src/main/java/org/apache/rocketmq/common/utils/PositiveAtomicCounter.java b/common/src/main/java/org/apache/rocketmq/common/utils/PositiveAtomicCounter.java
new file mode 100644
index 0000000..105b88c
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/common/utils/PositiveAtomicCounter.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.common.utils;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+public class PositiveAtomicCounter {
+    private static final int MASK = 0x7FFFFFFF;
+    private final AtomicInteger atom;
+
+
+    public PositiveAtomicCounter() {
+        atom = new AtomicInteger(0);
+    }
+
+
+    public final int incrementAndGet() {
+        final int rt = atom.incrementAndGet();
+        return rt & MASK;
+    }
+
+
+    public int intValue() {
+        return atom.intValue();
+    }
+}
diff --git a/distribution/conf/logback_snode.xml b/distribution/conf/logback_snode.xml
new file mode 100644
index 0000000..554f589
--- /dev/null
+++ b/distribution/conf/logback_snode.xml
@@ -0,0 +1,92 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+  -->
+
+<configuration>
+    <appender name="DefaultAppender"
+              class="ch.qos.logback.core.rolling.RollingFileAppender">
+        <file>${user.home}/logs/rocketmqlogs/snode_default.log</file>
+        <append>true</append>
+        <rollingPolicy class="ch.qos.logback.core.rolling.FixedWindowRollingPolicy">
+            <fileNamePattern>${user.home}/logs/rocketmqlogs/otherdays/snode_default.%i.log.gz</fileNamePattern>
+            <minIndex>1</minIndex>
+            <maxIndex>5</maxIndex>
+        </rollingPolicy>
+        <triggeringPolicy
+            class="ch.qos.logback.core.rolling.SizeBasedTriggeringPolicy">
+            <maxFileSize>100MB</maxFileSize>
+        </triggeringPolicy>
+        <encoder>
+            <pattern>%d{yyy-MM-dd HH:mm:ss,GMT+8} %p %t - %m%n</pattern>
+            <charset class="java.nio.charset.Charset">UTF-8</charset>
+        </encoder>
+    </appender>
+
+    <appender name="RocketmqSnodeAppender_inner"
+              class="ch.qos.logback.core.rolling.RollingFileAppender">
+        <file>${user.home}/logs/rocketmqlogs/snode.log</file>
+        <append>true</append>
+        <rollingPolicy class="ch.qos.logback.core.rolling.FixedWindowRollingPolicy">
+            <fileNamePattern>${user.home}/logs/rocketmqlogs/otherdays/snode.%i.log.gz</fileNamePattern>
+            <minIndex>1</minIndex>
+            <maxIndex>5</maxIndex>
+        </rollingPolicy>
+        <triggeringPolicy
+            class="ch.qos.logback.core.rolling.SizeBasedTriggeringPolicy">
+            <maxFileSize>100MB</maxFileSize>
+        </triggeringPolicy>
+        <encoder>
+            <pattern>%d{yyy-MM-dd HH:mm:ss,GMT+8} %p %t - %m%n</pattern>
+            <charset class="java.nio.charset.Charset">UTF-8</charset>
+        </encoder>
+    </appender>
+    <appender name="RocketmqSnodeAppender" class="ch.qos.logback.classic.AsyncAppender">
+        <appender-ref ref="RocketmqSnodeAppender_inner"/>
+        <discardingThreshold>0</discardingThreshold>
+    </appender>
+
+    <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
+        <append>true</append>
+        <encoder>
+            <pattern>%d{yyy-MM-dd HH\:mm\:ss,SSS} %p %t - %m%n</pattern>
+            <charset class="java.nio.charset.Charset">UTF-8</charset>
+        </encoder>
+    </appender>
+
+    <logger name="RocketmqSnode" additivity="false">
+        <level value="INFO"/>
+        <appender-ref ref="RocketmqSnodeAppender"/>
+        <appender-ref ref="STDOUT"/>
+    </logger>
+
+    <logger name="RocketmqCommon" additivity="false">
+        <level value="INFO"/>
+        <appender-ref ref="RocketmqSnodeAppender"/>
+    </logger>
+
+    <logger name="RocketmqRemoting" additivity="false">
+        <level value="INFO"/>
+        <appender-ref ref="RocketmqSnodeAppender"/>
+        <appender-ref ref="STDOUT"/>
+    </logger>
+
+    <root>
+        <level value="debug"/>
+        <appender-ref ref="STDOUT"/>
+        <appender-ref ref="DefaultAppender"/>
+    </root>
+</configuration>
diff --git a/example/src/main/java/org/apache/rocketmq/example/quickstart/Producer.java b/example/src/main/java/org/apache/rocketmq/example/quickstart/Producer.java
index 53a1d4d..b2609c8 100644
--- a/example/src/main/java/org/apache/rocketmq/example/quickstart/Producer.java
+++ b/example/src/main/java/org/apache/rocketmq/example/quickstart/Producer.java
@@ -20,6 +20,7 @@ import org.apache.rocketmq.client.exception.MQClientException;
 import org.apache.rocketmq.client.producer.DefaultMQProducer;
 import org.apache.rocketmq.client.producer.SendResult;
 import org.apache.rocketmq.common.message.Message;
+import org.apache.rocketmq.common.utils.ThreadUtils;
 import org.apache.rocketmq.remoting.common.RemotingHelper;
 
 /**
@@ -50,7 +51,7 @@ public class Producer {
          */
         producer.start();
 
-        for (int i = 0; i < 1000; i++) {
+        for (int i = 0; i < 10; i++) {
             try {
 
                 /*
@@ -76,6 +77,7 @@ public class Producer {
         /*
          * Shut down once the producer instance is not longer in use.
          */
+        Thread.sleep(100000000000L);
         producer.shutdown();
     }
 }
diff --git a/example/src/main/java/org/apache/rocketmq/example/simple/Producer.java b/example/src/main/java/org/apache/rocketmq/example/simple/Producer.java
index 7b504dd..84b7872 100644
--- a/example/src/main/java/org/apache/rocketmq/example/simple/Producer.java
+++ b/example/src/main/java/org/apache/rocketmq/example/simple/Producer.java
@@ -43,7 +43,6 @@ public class Producer {
             } catch (Exception e) {
                 e.printStackTrace();
             }
-
         producer.shutdown();
     }
 }
diff --git a/namesrv/src/main/java/org/apache/rocketmq/namesrv/processor/DefaultRequestProcessor.java b/namesrv/src/main/java/org/apache/rocketmq/namesrv/processor/DefaultRequestProcessor.java
index dc32445..3b12d49 100644
--- a/namesrv/src/main/java/org/apache/rocketmq/namesrv/processor/DefaultRequestProcessor.java
+++ b/namesrv/src/main/java/org/apache/rocketmq/namesrv/processor/DefaultRequestProcessor.java
@@ -27,6 +27,7 @@ import org.apache.rocketmq.common.MixAll;
 import org.apache.rocketmq.common.UtilAll;
 import org.apache.rocketmq.common.constant.LoggerName;
 import org.apache.rocketmq.common.help.FAQUrl;
+import org.apache.rocketmq.common.protocol.header.namesrv.RegisterSnodeRequestHeader;
 import org.apache.rocketmq.logging.InternalLogger;
 import org.apache.rocketmq.logging.InternalLoggerFactory;
 import org.apache.rocketmq.common.namesrv.NamesrvUtil;
@@ -77,7 +78,6 @@ public class DefaultRequestProcessor implements NettyRequestProcessor {
                 request);
         }
 
-
         switch (request.getCode()) {
             case RequestCode.PUT_KV_CONFIG:
                 return this.putKVConfig(ctx, request);
@@ -122,12 +122,26 @@ public class DefaultRequestProcessor implements NettyRequestProcessor {
                 return this.updateConfig(ctx, request);
             case RequestCode.GET_NAMESRV_CONFIG:
                 return this.getConfig(ctx, request);
+            case RequestCode.REGISTER_SNODE:
+                return this.registerSnode(ctx, request);
             default:
                 break;
         }
         return null;
     }
 
+    public RemotingCommand registerSnode(ChannelHandlerContext ctx,
+        RemotingCommand request) throws RemotingCommandException {
+        final RemotingCommand response = RemotingCommand.createResponseCommand(null);
+        final RegisterSnodeRequestHeader requestHeader =
+            (RegisterSnodeRequestHeader) request.decodeCommandCustomHeader(RegisterSnodeRequestHeader.class);
+        this.namesrvController.getRouteInfoManager().registerSnode(
+            requestHeader.getClusterName(),
+            requestHeader.getSnodeName(),
+            requestHeader.getSnodeAddr());
+        return response;
+    }
+
     @Override
     public boolean rejectRequest() {
         return false;
@@ -280,7 +294,7 @@ public class DefaultRequestProcessor implements NettyRequestProcessor {
         final RegisterBrokerResponseHeader responseHeader = (RegisterBrokerResponseHeader) response.readCustomHeader();
         final RegisterBrokerRequestHeader requestHeader =
             (RegisterBrokerRequestHeader) request.decodeCommandCustomHeader(RegisterBrokerRequestHeader.class);
-        log.info("requestHeader:  " + requestHeader );
+        log.info("requestHeader:  " + requestHeader);
         if (!checksum(ctx, request, requestHeader)) {
             response.setCode(ResponseCode.SYSTEM_ERROR);
             response.setRemark("crc32 not match");
diff --git a/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager.java b/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager.java
index 00962ef..beef528 100644
--- a/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager.java
+++ b/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager.java
@@ -33,6 +33,7 @@ import org.apache.rocketmq.common.MixAll;
 import org.apache.rocketmq.common.TopicConfig;
 import org.apache.rocketmq.common.constant.LoggerName;
 import org.apache.rocketmq.common.constant.PermName;
+import org.apache.rocketmq.common.protocol.route.SnodeData;
 import org.apache.rocketmq.logging.InternalLogger;
 import org.apache.rocketmq.logging.InternalLoggerFactory;
 import org.apache.rocketmq.common.namesrv.RegisterBrokerResult;
@@ -54,6 +55,8 @@ public class RouteInfoManager {
     private final HashMap<String/* clusterName */, Set<String/* brokerName */>> clusterAddrTable;
     private final HashMap<String/* brokerAddr */, BrokerLiveInfo> brokerLiveTable;
     private final HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;
+    private final HashMap<String/* snodeName*/, SnodeData> snodeTable;
+    private final HashMap<String/* clusterName*/, Set<String/*snodeName*/>> snodeCluster;
 
     public RouteInfoManager() {
         this.topicQueueTable = new HashMap<String, List<QueueData>>(1024);
@@ -61,6 +64,8 @@ public class RouteInfoManager {
         this.clusterAddrTable = new HashMap<String, Set<String>>(32);
         this.brokerLiveTable = new HashMap<String, BrokerLiveInfo>(256);
         this.filterServerTable = new HashMap<String, List<String>>(256);
+        this.snodeTable = new HashMap<>(256);
+        this.snodeCluster = new HashMap<>(256);
     }
 
     public byte[] getAllClusterInfo() {
@@ -99,6 +104,32 @@ public class RouteInfoManager {
         return topicList.encode();
     }
 
+    public void registerSnode(
+        final String clusterName,
+        final String snodeName,
+        final String snodeAddr) {
+        try {
+            this.lock.writeLock().lockInterruptibly();
+            Set<String> snodeSet = this.snodeCluster.get(clusterName);
+            if (snodeSet == null) {
+                snodeSet = new HashSet<>();
+                snodeSet.add(snodeName);
+                this.snodeCluster.put(clusterName, snodeSet);
+            } else {
+                snodeSet.add(snodeName);
+            }
+            SnodeData snodeData = new SnodeData();
+            snodeData.setAddr(snodeAddr);
+            snodeData.setSnodeName(snodeName);
+            snodeData.setClusterName(clusterName);
+            snodeTable.put(snodeName, snodeData);
+        } catch (Exception ex) {
+            log.error("Register snode error", ex);
+        } finally {
+            this.lock.writeLock().unlock();
+        }
+    }
+
     public RegisterBrokerResult registerBroker(
         final String clusterName,
         final String brokerAddr,
diff --git a/pom.xml b/pom.xml
index 0a8a41e..47a7b68 100644
--- a/pom.xml
+++ b/pom.xml
@@ -125,6 +125,7 @@
         <module>distribution</module>
         <module>openmessaging</module>
         <module>logging</module>
+        <module>rocketmq-snode</module>
     </modules>
 
     <build>
diff --git a/rocketmq-snode/pom.xml b/rocketmq-snode/pom.xml
new file mode 100644
index 0000000..d3df574
--- /dev/null
+++ b/rocketmq-snode/pom.xml
@@ -0,0 +1,93 @@
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+  -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <groupId>org.apache.rocketmq</groupId>
+        <artifactId>rocketmq-all</artifactId>
+        <version>4.4.0-SNAPSHOT</version>
+    </parent>
+
+    <modelVersion>4.0.0</modelVersion>
+    <packaging>jar</packaging>
+    <artifactId>rocketmq-snode</artifactId>
+    <name>rocketmq-snode ${project.version}</name>
+
+    <dependencies>
+        <dependency>
+            <groupId>${project.groupId}</groupId>
+            <artifactId>rocketmq-common</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>${project.groupId}</groupId>
+            <artifactId>rocketmq-store</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>${project.groupId}</groupId>
+            <artifactId>rocketmq-remoting</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>${project.groupId}</groupId>
+            <artifactId>rocketmq-client</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>${project.groupId}</groupId>
+            <artifactId>rocketmq-srvutil</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>${project.groupId}</groupId>
+            <artifactId>rocketmq-filter</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>ch.qos.logback</groupId>
+            <artifactId>logback-classic</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>ch.qos.logback</groupId>
+            <artifactId>logback-core</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>com.alibaba</groupId>
+            <artifactId>fastjson</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.javassist</groupId>
+            <artifactId>javassist</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-api</artifactId>
+        </dependency>
+        <!--<dependency>-->
+            <!--<groupId>org.apache.rocketmq</groupId>-->
+            <!--<artifactId>rocketmq-broker</artifactId>-->
+        <!--</dependency>-->
+    </dependencies>
+
+    <build>
+        <plugins>
+            <plugin>
+                <artifactId>maven-surefire-plugin</artifactId>
+                <version>2.19.1</version>
+                <configuration>
+                    <forkCount>1</forkCount>
+                    <reuseForks>false</reuseForks>
+                </configuration>
+            </plugin>
+        </plugins>
+    </build>
+</project>
diff --git a/rocketmq-snode/src/main/java/org/apache/rocketmq/snode/SnodeController.java b/rocketmq-snode/src/main/java/org/apache/rocketmq/snode/SnodeController.java
new file mode 100644
index 0000000..cb19bc9
--- /dev/null
+++ b/rocketmq-snode/src/main/java/org/apache/rocketmq/snode/SnodeController.java
@@ -0,0 +1,205 @@
+package org.apache.rocketmq.snode;/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import org.apache.rocketmq.common.ThreadFactoryImpl;
+import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.common.protocol.RequestCode;
+import org.apache.rocketmq.common.utils.ThreadUtils;
+import org.apache.rocketmq.logging.InternalLogger;
+import org.apache.rocketmq.logging.InternalLoggerFactory;
+import org.apache.rocketmq.remoting.RemotingServer;
+import org.apache.rocketmq.remoting.RemotingServerFactory;
+import org.apache.rocketmq.remoting.netty.NettyClientConfig;
+import org.apache.rocketmq.remoting.netty.NettyServerConfig;
+import org.apache.rocketmq.snode.client.ClientHousekeepingService;
+import org.apache.rocketmq.snode.client.ConsumerIdsChangeListener;
+import org.apache.rocketmq.snode.client.ConsumerManager;
+import org.apache.rocketmq.snode.client.DefaultConsumerIdsChangeListener;
+import org.apache.rocketmq.snode.client.ProducerManager;
+import org.apache.rocketmq.snode.client.SubscriptionGroupManager;
+import org.apache.rocketmq.snode.config.SnodeConfig;
+import org.apache.rocketmq.snode.processor.ConsumerManageProcessor;
+import org.apache.rocketmq.snode.processor.HearbeatProcessor;
+import org.apache.rocketmq.snode.processor.PullMessageProcessor;
+import org.apache.rocketmq.snode.processor.SendMessageProcessor;
+import org.apache.rocketmq.snode.service.ScheduledService;
+import org.apache.rocketmq.snode.service.SnodeOuterService;
+import org.apache.rocketmq.snode.service.impl.ScheduledServiceImpl;
+import org.apache.rocketmq.snode.service.impl.SnodeOuterServiceImpl;
+
+public class SnodeController {
+    private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.SNODE_LOGGER_NAME);
+
+    private final SnodeConfig snodeConfig;
+    private final NettyServerConfig nettyServerConfig;
+    private final NettyClientConfig nettyClientConfig;
+    private RemotingServer snodeServer;
+    private ExecutorService sendMessageExcutor;
+    private ExecutorService heartbeatExecutor;
+    private ExecutorService pullMessageExcutor;
+    private SnodeOuterService snodeOuterService;
+    private ExecutorService consumerManagerExcutor;
+    private ScheduledService scheduledService;
+    private ProducerManager producerManager;
+    private ConsumerManager consumerManager;
+    private ClientHousekeepingService clientHousekeepingService;
+    private SubscriptionGroupManager subscriptionGroupManager;
+
+    private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl(
+        "SnodeControllerScheduledThread"));
+
+    public SnodeController(NettyServerConfig nettyServerConfig,
+        NettyClientConfig nettyClientConfig,
+        SnodeConfig snodeConfig) {
+        this.nettyClientConfig = nettyClientConfig;
+        this.nettyServerConfig = nettyServerConfig;
+        this.snodeConfig = snodeConfig;
+        this.snodeOuterService = SnodeOuterServiceImpl.getInstance(this);
+        this.scheduledService = new ScheduledServiceImpl(this.snodeOuterService, this.snodeConfig);
+        this.sendMessageExcutor = ThreadUtils.newThreadPoolExecutor(
+            snodeConfig.getSnodeSendMessageMinPoolSize(),
+            snodeConfig.getSnodeSendMessageMaxPoolSize(),
+            3000,
+            TimeUnit.MILLISECONDS,
+            new ArrayBlockingQueue<Runnable>(snodeConfig.getSnodeSendThreadPoolQueueCapacity()),
+            "SnodeSendMessageThread",
+            false);
+
+        this.pullMessageExcutor = ThreadUtils.newThreadPoolExecutor(
+            snodeConfig.getSnodeSendMessageMinPoolSize(),
+            snodeConfig.getSnodeSendMessageMaxPoolSize(),
+            3000,
+            TimeUnit.MILLISECONDS,
+            new ArrayBlockingQueue<Runnable>(snodeConfig.getSnodeSendThreadPoolQueueCapacity()),
+            "SnodePullMessageThread",
+            false);
+
+        this.heartbeatExecutor = ThreadUtils.newThreadPoolExecutor(
+            snodeConfig.getSnodeHeartBeatCorePoolSize(),
+            snodeConfig.getSnodeHeartBeatMaxPoolSize(),
+            1000 * 60,
+            TimeUnit.MILLISECONDS,
+            new ArrayBlockingQueue<Runnable>(snodeConfig.getSnodeHeartBeatThreadPoolQueueCapacity()),
+            "SnodeHeartbeatThread",
+            true);
+
+        this.consumerManagerExcutor = ThreadUtils.newThreadPoolExecutor(
+            snodeConfig.getSnodeSendMessageMinPoolSize(),
+            snodeConfig.getSnodeSendMessageMaxPoolSize(),
+            3000,
+            TimeUnit.MILLISECONDS,
+            new ArrayBlockingQueue<Runnable>(snodeConfig.getSnodeSendThreadPoolQueueCapacity()),
+            "SnodePullMessageThread",
+            false);
+
+        if (this.snodeConfig.getNamesrvAddr() != null) {
+            this.snodeOuterService.updateNameServerAddressList(this.snodeConfig.getNamesrvAddr());
+            log.info("Set user specified name server address: {}", this.snodeConfig.getNamesrvAddr());
+        }
+
+        this.producerManager = new ProducerManager();
+
+        ConsumerIdsChangeListener consumerIdsChangeListener = new DefaultConsumerIdsChangeListener(this);
+        this.consumerManager = new ConsumerManager(consumerIdsChangeListener);
+        this.subscriptionGroupManager = new SubscriptionGroupManager(this);
+        this.clientHousekeepingService = new ClientHousekeepingService(this.producerManager, this.consumerManager);
+    }
+
+    public SnodeConfig getSnodeConfig() {
+        return snodeConfig;
+    }
+
+    public boolean initialize() {
+        this.snodeServer = RemotingServerFactory.createInstance().init(this.nettyServerConfig, null);
+        this.registerProcessor();
+        return true;
+    }
+
+    public void registerProcessor() {
+        snodeServer.registerProcessor(RequestCode.SEND_MESSAGE_V2, new SendMessageProcessor(this.snodeOuterService), sendMessageExcutor);
+        snodeServer.registerProcessor(RequestCode.HEART_BEAT, new HearbeatProcessor(this), heartbeatExecutor);
+        snodeServer.registerProcessor(RequestCode.SNODE_PULL_MESSAGE, new PullMessageProcessor(this.snodeOuterService), pullMessageExcutor);
+        snodeServer.registerProcessor(RequestCode.GET_CONSUMER_LIST_BY_GROUP, new ConsumerManageProcessor(this), consumerManagerExcutor);
+    }
+
+    public void start() {
+        initialize();
+        this.snodeServer.start();
+        this.snodeOuterService.start();
+        this.scheduledService.startScheduleTask();
+        this.clientHousekeepingService.start(this.snodeConfig.getHouseKeepingInterval());
+    }
+
+    public void shutdown() {
+        this.sendMessageExcutor.shutdown();
+        this.pullMessageExcutor.shutdown();
+        this.heartbeatExecutor.shutdown();
+        this.scheduledExecutorService.shutdown();
+        this.snodeOuterService.shutdown();
+        this.scheduledService.shutdown();
+        this.clientHousekeepingService.shutdown();
+    }
+
+    public ProducerManager getProducerManager() {
+        return producerManager;
+    }
+
+    public void setProducerManager(ProducerManager producerManager) {
+        this.producerManager = producerManager;
+    }
+
+    public RemotingServer getSnodeServer() {
+        return snodeServer;
+    }
+
+    public void setSnodeServer(RemotingServer snodeServer) {
+        this.snodeServer = snodeServer;
+    }
+
+    public ConsumerManager getConsumerManager() {
+        return consumerManager;
+    }
+
+    public void setConsumerManager(ConsumerManager consumerManager) {
+        this.consumerManager = consumerManager;
+    }
+
+    public SubscriptionGroupManager getSubscriptionGroupManager() {
+        return subscriptionGroupManager;
+    }
+
+    public void setSubscriptionGroupManager(SubscriptionGroupManager subscriptionGroupManager) {
+        this.subscriptionGroupManager = subscriptionGroupManager;
+    }
+
+    public NettyClientConfig getNettyClientConfig() {
+        return nettyClientConfig;
+    }
+
+    public SnodeOuterService getSnodeOuterService() {
+        return snodeOuterService;
+    }
+
+    public void setSnodeOuterService(SnodeOuterService snodeOuterService) {
+        this.snodeOuterService = snodeOuterService;
+    }
+}
diff --git a/rocketmq-snode/src/main/java/org/apache/rocketmq/snode/SnodeStartup.java b/rocketmq-snode/src/main/java/org/apache/rocketmq/snode/SnodeStartup.java
new file mode 100644
index 0000000..ddbeef8
--- /dev/null
+++ b/rocketmq-snode/src/main/java/org/apache/rocketmq/snode/SnodeStartup.java
@@ -0,0 +1,144 @@
+package org.apache.rocketmq.snode;/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import ch.qos.logback.classic.LoggerContext;
+import ch.qos.logback.classic.joran.JoranConfigurator;
+import ch.qos.logback.core.joran.spi.JoranException;
+import java.io.BufferedInputStream;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Properties;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.PosixParser;
+import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.logging.InternalLogger;
+import org.apache.rocketmq.logging.InternalLoggerFactory;
+import org.apache.rocketmq.remoting.common.TlsMode;
+import org.apache.rocketmq.remoting.netty.NettyClientConfig;
+import org.apache.rocketmq.remoting.netty.NettyServerConfig;
+import org.apache.rocketmq.remoting.netty.TlsSystemConfig;
+import org.apache.rocketmq.remoting.protocol.RemotingCommand;
+import org.apache.rocketmq.snode.config.SnodeConfig;
+import org.apache.rocketmq.srvutil.ServerUtil;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.rocketmq.remoting.netty.TlsSystemConfig.TLS_ENABLE;
+
+public class SnodeStartup {
+    private static InternalLogger log;
+    public static Properties properties = null;
+    public static CommandLine commandLine = null;
+    public static String configFile = null;
+
+    public static void main(String[] args) throws IOException, JoranException {
+        startup(createSnodeController(args));
+    }
+
+    public static SnodeController startup(SnodeController controller) {
+        try {
+            controller.start();
+
+            String tip = "The snode[" + controller.getSnodeConfig().getSnodeName() + ", "
+                + controller.getSnodeConfig().getSnodeAddr() + "] boot success. serializeType=" + RemotingCommand.getSerializeTypeConfigInThisServer();
+
+            if (null != controller.getSnodeConfig().getNamesrvAddr()) {
+                tip += " and name server is " + controller.getSnodeConfig().getNamesrvAddr();
+            }
+            log.info(tip);
+            System.out.printf("%s%n", tip);
+            return controller;
+        } catch (Throwable e) {
+            e.printStackTrace();
+            System.exit(-1);
+        }
+        return null;
+    }
+
+    public static SnodeController createSnodeController(String[] args) throws IOException, JoranException {
+        Options options = ServerUtil.buildCommandlineOptions(new Options());
+        commandLine = ServerUtil.parseCmdLine("snode", args, buildCommandlineOptions(options),
+            new PosixParser());
+        if (null == commandLine) {
+            System.exit(-1);
+        }
+
+        final SnodeConfig snodeConfig = new SnodeConfig();
+        final NettyServerConfig nettyServerConfig = new NettyServerConfig();
+        final NettyClientConfig nettyClientConfig = new NettyClientConfig();
+
+        nettyServerConfig.setListenPort(snodeConfig.getListenPort());
+
+        nettyClientConfig.setUseTLS(Boolean.parseBoolean(System.getProperty(TLS_ENABLE,
+            String.valueOf(TlsSystemConfig.tlsMode == TlsMode.ENFORCING))));
+
+        if (commandLine.hasOption('c')) {
+            String file = commandLine.getOptionValue('c');
+            if (file != null) {
+                configFile = file;
+                InputStream in = new BufferedInputStream(new FileInputStream(file));
+                properties = new Properties();
+                properties.load(in);
+                MixAll.properties2Object(properties, snodeConfig);
+                MixAll.properties2Object(properties, nettyServerConfig);
+                MixAll.properties2Object(properties, nettyClientConfig);
+                in.close();
+            }
+        }
+
+        LoggerContext lc = (LoggerContext) LoggerFactory.getILoggerFactory();
+        JoranConfigurator configurator = new JoranConfigurator();
+        configurator.setContext(lc);
+        lc.reset();
+        configurator.doConfigure(snodeConfig.getRocketmqHome() + "/conf/logback_snode.xml");
+        log = InternalLoggerFactory.getLogger(LoggerName.SNODE_LOGGER_NAME);
+
+        MixAll.printObjectProperties(log, snodeConfig);
+        MixAll.printObjectProperties(log, nettyClientConfig);
+        MixAll.printObjectProperties(log, nettyServerConfig);
+        final SnodeController snodeController = new SnodeController(
+            nettyServerConfig,
+            nettyClientConfig,
+            snodeConfig);
+
+        Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
+            private volatile boolean hasShutdown = false;
+
+            @Override
+            public void run() {
+                synchronized (this) {
+                    if (!this.hasShutdown) {
+                        this.hasShutdown = true;
+                        snodeController.shutdown();
+                    }
+                }
+            }
+        }));
+        return snodeController;
+    }
+
+    private static Options buildCommandlineOptions(final Options options) {
+        Option opt = new Option("c", "configFile", true, "Broker config properties file");
+        opt.setRequired(false);
+        options.addOption(opt);
+        return options;
+    }
+}
+
diff --git a/rocketmq-snode/src/main/java/org/apache/rocketmq/snode/client/ClientChannelInfo.java b/rocketmq-snode/src/main/java/org/apache/rocketmq/snode/client/ClientChannelInfo.java
new file mode 100644
index 0000000..16d8cde
--- /dev/null
+++ b/rocketmq-snode/src/main/java/org/apache/rocketmq/snode/client/ClientChannelInfo.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.snode.client;
+
+import io.netty.channel.Channel;
+import org.apache.rocketmq.remoting.serialize.LanguageCode;
+
+public class ClientChannelInfo {
+    private final Channel channel;
+    private final String clientId;
+    private final LanguageCode language;
+    private final int version;
+    private volatile long lastUpdateTimestamp = System.currentTimeMillis();
+
+    public ClientChannelInfo(Channel channel) {
+        this(channel, null, null, 0);
+    }
+
+    public ClientChannelInfo(Channel channel, String clientId, LanguageCode language, int version) {
+        this.channel = channel;
+        this.clientId = clientId;
+        this.language = language;
+        this.version = version;
+    }
+
+    public Channel getChannel() {
+        return channel;
+    }
+
+    public String getClientId() {
+        return clientId;
+    }
+
+    public LanguageCode getLanguage() {
+        return language;
+    }
+
+    public int getVersion() {
+        return version;
+    }
+
+    public long getLastUpdateTimestamp() {
+        return lastUpdateTimestamp;
+    }
+
+    public void setLastUpdateTimestamp(long lastUpdateTimestamp) {
+        this.lastUpdateTimestamp = lastUpdateTimestamp;
+    }
+
+    @Override
+    public int hashCode() {
+        final int prime = 31;
+        int result = 1;
+        result = prime * result + ((channel == null) ? 0 : channel.hashCode());
+        result = prime * result + ((clientId == null) ? 0 : clientId.hashCode());
+        result = prime * result + ((language == null) ? 0 : language.hashCode());
+        result = prime * result + (int) (lastUpdateTimestamp ^ (lastUpdateTimestamp >>> 32));
+        result = prime * result + version;
+        return result;
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+        if (this == obj)
+            return true;
+        if (obj == null)
+            return false;
+        if (getClass() != obj.getClass())
+            return false;
+        ClientChannelInfo other = (ClientChannelInfo) obj;
+        if (channel == null) {
+            if (other.channel != null)
+                return false;
+        } else if (this.channel != other.channel) {
+            return false;
+        }
+
+        return true;
+    }
+
+    @Override
+    public String toString() {
+        return "ClientChannelInfo [channel=" + channel + ", clientId=" + clientId + ", language=" + language
+            + ", version=" + version + ", lastUpdateTimestamp=" + lastUpdateTimestamp + "]";
+    }
+}
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/client/ClientHousekeepingService.java b/rocketmq-snode/src/main/java/org/apache/rocketmq/snode/client/ClientHousekeepingService.java
similarity index 63%
copy from broker/src/main/java/org/apache/rocketmq/broker/client/ClientHousekeepingService.java
copy to rocketmq-snode/src/main/java/org/apache/rocketmq/snode/client/ClientHousekeepingService.java
index d536db5..e71ea0a 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/client/ClientHousekeepingService.java
+++ b/rocketmq-snode/src/main/java/org/apache/rocketmq/snode/client/ClientHousekeepingService.java
@@ -14,13 +14,12 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.rocketmq.broker.client;
+package org.apache.rocketmq.snode.client;
 
 import io.netty.channel.Channel;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
-import org.apache.rocketmq.broker.BrokerController;
 import org.apache.rocketmq.common.ThreadFactoryImpl;
 import org.apache.rocketmq.common.constant.LoggerName;
 import org.apache.rocketmq.logging.InternalLogger;
@@ -29,17 +28,18 @@ import org.apache.rocketmq.remoting.ChannelEventListener;
 
 public class ClientHousekeepingService implements ChannelEventListener {
     private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
-    private final BrokerController brokerController;
+    private final ProducerManager producerManager;
+    private final ConsumerManager consumerManager;
 
     private ScheduledExecutorService scheduledExecutorService = Executors
         .newSingleThreadScheduledExecutor(new ThreadFactoryImpl("ClientHousekeepingScheduledThread"));
 
-    public ClientHousekeepingService(final BrokerController brokerController) {
-        this.brokerController = brokerController;
+    public ClientHousekeepingService(final ProducerManager producerManager, final ConsumerManager consumerManager) {
+        this.producerManager = producerManager;
+        this.consumerManager = consumerManager;
     }
 
-    public void start() {
-
+    public void start(long interval) {
         this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
             @Override
             public void run() {
@@ -49,13 +49,12 @@ public class ClientHousekeepingService implements ChannelEventListener {
                     log.error("Error occurred when scan not active client channels.", e);
                 }
             }
-        }, 1000 * 10, 1000 * 10, TimeUnit.MILLISECONDS);
+        }, 1000 * 10, interval, TimeUnit.MILLISECONDS);
     }
 
     private void scanExceptionChannel() {
-        this.brokerController.getProducerManager().scanNotActiveChannel();
-        this.brokerController.getConsumerManager().scanNotActiveChannel();
-        this.brokerController.getFilterServerManager().scanNotActiveChannel();
+        this.producerManager.scanNotActiveChannel();
+        //this.consumerManager.scanNotActiveChannel();
     }
 
     public void shutdown() {
@@ -69,22 +68,19 @@ public class ClientHousekeepingService implements ChannelEventListener {
 
     @Override
     public void onChannelClose(String remoteAddr, Channel channel) {
-        this.brokerController.getProducerManager().doChannelCloseEvent(remoteAddr, channel);
-        this.brokerController.getConsumerManager().doChannelCloseEvent(remoteAddr, channel);
-        this.brokerController.getFilterServerManager().doChannelCloseEvent(remoteAddr, channel);
+        this.producerManager.doChannelCloseEvent(remoteAddr, channel);
+        this.producerManager.doChannelCloseEvent(remoteAddr, channel);
     }
 
     @Override
     public void onChannelException(String remoteAddr, Channel channel) {
-        this.brokerController.getProducerManager().doChannelCloseEvent(remoteAddr, channel);
-        this.brokerController.getConsumerManager().doChannelCloseEvent(remoteAddr, channel);
-        this.brokerController.getFilterServerManager().doChannelCloseEvent(remoteAddr, channel);
+        this.producerManager.doChannelCloseEvent(remoteAddr, channel);
+        this.consumerManager.doChannelCloseEvent(remoteAddr, channel);
     }
 
     @Override
     public void onChannelIdle(String remoteAddr, Channel channel) {
-        this.brokerController.getProducerManager().doChannelCloseEvent(remoteAddr, channel);
-        this.brokerController.getConsumerManager().doChannelCloseEvent(remoteAddr, channel);
-        this.brokerController.getFilterServerManager().doChannelCloseEvent(remoteAddr, channel);
+        this.producerManager.doChannelCloseEvent(remoteAddr, channel);
+        this.consumerManager.doChannelCloseEvent(remoteAddr, channel);
     }
 }
diff --git a/rocketmq-snode/src/main/java/org/apache/rocketmq/snode/client/ConsumerGroupEvent.java b/rocketmq-snode/src/main/java/org/apache/rocketmq/snode/client/ConsumerGroupEvent.java
new file mode 100644
index 0000000..0ebcf17
--- /dev/null
+++ b/rocketmq-snode/src/main/java/org/apache/rocketmq/snode/client/ConsumerGroupEvent.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.snode.client;
+
+public enum ConsumerGroupEvent {
+
+    /**
+     * Some consumers in the group are changed.
+     */
+    CHANGE,
+    /**
+     * The group of consumer is unregistered.
+     */
+    UNREGISTER,
+    /**
+     * The group of consumer is registered.
+     */
+    REGISTER
+}
diff --git a/rocketmq-snode/src/main/java/org/apache/rocketmq/snode/client/ConsumerGroupInfo.java b/rocketmq-snode/src/main/java/org/apache/rocketmq/snode/client/ConsumerGroupInfo.java
new file mode 100644
index 0000000..9b366a5
--- /dev/null
+++ b/rocketmq-snode/src/main/java/org/apache/rocketmq/snode/client/ConsumerGroupInfo.java
@@ -0,0 +1,248 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.snode.client;
+
+import io.netty.channel.Channel;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
+import org.apache.rocketmq.common.protocol.heartbeat.ConsumeType;
+import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
+import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
+import org.apache.rocketmq.logging.InternalLogger;
+import org.apache.rocketmq.logging.InternalLoggerFactory;
+
+public class ConsumerGroupInfo {
+    private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
+    private final String groupName;
+    private final ConcurrentMap<String/* Topic */, SubscriptionData> subscriptionTable =
+        new ConcurrentHashMap<String, SubscriptionData>();
+    private final ConcurrentMap<Channel, ClientChannelInfo> channelInfoTable =
+        new ConcurrentHashMap<Channel, ClientChannelInfo>(16);
+    private volatile ConsumeType consumeType;
+    private volatile MessageModel messageModel;
+    private volatile ConsumeFromWhere consumeFromWhere;
+    private volatile long lastUpdateTimestamp = System.currentTimeMillis();
+
+    public ConsumerGroupInfo(String groupName, ConsumeType consumeType, MessageModel messageModel,
+        ConsumeFromWhere consumeFromWhere) {
+        this.groupName = groupName;
+        this.consumeType = consumeType;
+        this.messageModel = messageModel;
+        this.consumeFromWhere = consumeFromWhere;
+    }
+
+    public ClientChannelInfo findChannel(final String clientId) {
+        Iterator<Entry<Channel, ClientChannelInfo>> it = this.channelInfoTable.entrySet().iterator();
+        while (it.hasNext()) {
+            Entry<Channel, ClientChannelInfo> next = it.next();
+            if (next.getValue().getClientId().equals(clientId)) {
+                return next.getValue();
+            }
+        }
+
+        return null;
+    }
+
+    public ConcurrentMap<String, SubscriptionData> getSubscriptionTable() {
+        return subscriptionTable;
+    }
+
+    public ConcurrentMap<Channel, ClientChannelInfo> getChannelInfoTable() {
+        return channelInfoTable;
+    }
+
+    public List<Channel> getAllChannel() {
+        List<Channel> result = new ArrayList<>();
+
+        result.addAll(this.channelInfoTable.keySet());
+
+        return result;
+    }
+
+    public List<String> getAllClientId() {
+        List<String> result = new ArrayList<>();
+
+        Iterator<Entry<Channel, ClientChannelInfo>> it = this.channelInfoTable.entrySet().iterator();
+
+        while (it.hasNext()) {
+            Entry<Channel, ClientChannelInfo> entry = it.next();
+            ClientChannelInfo clientChannelInfo = entry.getValue();
+            result.add(clientChannelInfo.getClientId());
+        }
+
+        return result;
+    }
+
+    public void unregisterChannel(final ClientChannelInfo clientChannelInfo) {
+        ClientChannelInfo old = this.channelInfoTable.remove(clientChannelInfo.getChannel());
+        if (old != null) {
+            log.info("unregister a consumer[{}] from consumerGroupInfo {}", this.groupName, old.toString());
+        }
+    }
+
+    public boolean doChannelCloseEvent(final String remoteAddr, final Channel channel) {
+        final ClientChannelInfo info = this.channelInfoTable.remove(channel);
+        if (info != null) {
+            log.warn(
+                "NETTY EVENT: remove not active channel[{}] from ConsumerGroupInfo groupChannelTable, consumer group: {}",
+                info.toString(), groupName);
+            return true;
+        }
+
+        return false;
+    }
+
+    public boolean updateChannel(final ClientChannelInfo infoNew, ConsumeType consumeType,
+        MessageModel messageModel, ConsumeFromWhere consumeFromWhere) {
+        boolean updated = false;
+        this.consumeType = consumeType;
+        this.messageModel = messageModel;
+        this.consumeFromWhere = consumeFromWhere;
+
+        ClientChannelInfo infoOld = this.channelInfoTable.get(infoNew.getChannel());
+        if (null == infoOld) {
+            ClientChannelInfo prev = this.channelInfoTable.put(infoNew.getChannel(), infoNew);
+            if (null == prev) {
+                log.info("new consumer connected, group: {} {} {} channel: {}", this.groupName, consumeType,
+                    messageModel, infoNew.toString());
+                updated = true;
+            }
+
+            infoOld = infoNew;
+        } else {
+            if (!infoOld.getClientId().equals(infoNew.getClientId())) {
+                log.error("[BUG] consumer channel exist in broker, but clientId not equal. GROUP: {} OLD: {} NEW: {} ",
+                    this.groupName,
+                    infoOld.toString(),
+                    infoNew.toString());
+                this.channelInfoTable.put(infoNew.getChannel(), infoNew);
+            }
+        }
+
+        this.lastUpdateTimestamp = System.currentTimeMillis();
+        infoOld.setLastUpdateTimestamp(this.lastUpdateTimestamp);
+
+        return updated;
+    }
+
+    public boolean updateSubscription(final Set<SubscriptionData> subList) {
+        boolean updated = false;
+
+        for (SubscriptionData sub : subList) {
+            SubscriptionData old = this.subscriptionTable.get(sub.getTopic());
+            if (old == null) {
+                SubscriptionData prev = this.subscriptionTable.putIfAbsent(sub.getTopic(), sub);
+                if (null == prev) {
+                    updated = true;
+                    log.info("subscription changed, add new topic, group: {} {}",
+                        this.groupName,
+                        sub.toString());
+                }
+            } else if (sub.getSubVersion() > old.getSubVersion()) {
+                if (this.consumeType == ConsumeType.CONSUME_PASSIVELY) {
+                    log.info("subscription changed, group: {} OLD: {} NEW: {}",
+                        this.groupName,
+                        old.toString(),
+                        sub.toString()
+                    );
+                }
+
+                this.subscriptionTable.put(sub.getTopic(), sub);
+            }
+        }
+
+        Iterator<Entry<String, SubscriptionData>> it = this.subscriptionTable.entrySet().iterator();
+        while (it.hasNext()) {
+            Entry<String, SubscriptionData> next = it.next();
+            String oldTopic = next.getKey();
+
+            boolean exist = false;
+            for (SubscriptionData sub : subList) {
+                if (sub.getTopic().equals(oldTopic)) {
+                    exist = true;
+                    break;
+                }
+            }
+
+            if (!exist) {
+                log.warn("subscription changed, group: {} remove topic {} {}",
+                    this.groupName,
+                    oldTopic,
+                    next.getValue().toString()
+                );
+
+                it.remove();
+                updated = true;
+            }
+        }
+
+        this.lastUpdateTimestamp = System.currentTimeMillis();
+
+        return updated;
+    }
+
+    public Set<String> getSubscribeTopics() {
+        return subscriptionTable.keySet();
+    }
+
+    public SubscriptionData findSubscriptionData(final String topic) {
+        return this.subscriptionTable.get(topic);
+    }
+
+    public ConsumeType getConsumeType() {
+        return consumeType;
+    }
+
+    public void setConsumeType(ConsumeType consumeType) {
+        this.consumeType = consumeType;
+    }
+
+    public MessageModel getMessageModel() {
+        return messageModel;
+    }
+
+    public void setMessageModel(MessageModel messageModel) {
+        this.messageModel = messageModel;
+    }
+
+    public String getGroupName() {
+        return groupName;
+    }
+
+    public long getLastUpdateTimestamp() {
+        return lastUpdateTimestamp;
+    }
+
+    public void setLastUpdateTimestamp(long lastUpdateTimestamp) {
+        this.lastUpdateTimestamp = lastUpdateTimestamp;
+    }
+
+    public ConsumeFromWhere getConsumeFromWhere() {
+        return consumeFromWhere;
+    }
+
+    public void setConsumeFromWhere(ConsumeFromWhere consumeFromWhere) {
+        this.consumeFromWhere = consumeFromWhere;
+    }
+}
diff --git a/rocketmq-snode/src/main/java/org/apache/rocketmq/snode/client/ConsumerIdsChangeListener.java b/rocketmq-snode/src/main/java/org/apache/rocketmq/snode/client/ConsumerIdsChangeListener.java
new file mode 100644
index 0000000..ba950f1
--- /dev/null
+++ b/rocketmq-snode/src/main/java/org/apache/rocketmq/snode/client/ConsumerIdsChangeListener.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.snode.client;
+
+public interface ConsumerIdsChangeListener {
+
+    void handle(ConsumerGroupEvent event, String group, Object... args);
+}
diff --git a/rocketmq-snode/src/main/java/org/apache/rocketmq/snode/client/ConsumerManager.java b/rocketmq-snode/src/main/java/org/apache/rocketmq/snode/client/ConsumerManager.java
new file mode 100644
index 0000000..8d3b665
--- /dev/null
+++ b/rocketmq-snode/src/main/java/org/apache/rocketmq/snode/client/ConsumerManager.java
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.snode.client;
+
+import io.netty.channel.Channel;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
+import org.apache.rocketmq.common.protocol.heartbeat.ConsumeType;
+import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
+import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
+import org.apache.rocketmq.logging.InternalLogger;
+import org.apache.rocketmq.logging.InternalLoggerFactory;
+import org.apache.rocketmq.remoting.common.RemotingHelper;
+import org.apache.rocketmq.remoting.common.RemotingUtil;
+
+public class ConsumerManager {
+    private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
+    private static final long CHANNEL_EXPIRED_TIMEOUT = 1000 * 120;
+    private final ConcurrentMap<String/* Group */, ConsumerGroupInfo> consumerTable =
+        new ConcurrentHashMap<String, ConsumerGroupInfo>(1024);
+    private final ConsumerIdsChangeListener consumerIdsChangeListener;
+
+    public ConsumerManager(final ConsumerIdsChangeListener consumerIdsChangeListener) {
+        this.consumerIdsChangeListener = consumerIdsChangeListener;
+    }
+
+    public ClientChannelInfo findChannel(final String group, final String clientId) {
+        ConsumerGroupInfo consumerGroupInfo = this.consumerTable.get(group);
+        if (consumerGroupInfo != null) {
+            return consumerGroupInfo.findChannel(clientId);
+        }
+        return null;
+    }
+
+    public SubscriptionData findSubscriptionData(final String group, final String topic) {
+        ConsumerGroupInfo consumerGroupInfo = this.getConsumerGroupInfo(group);
+        if (consumerGroupInfo != null) {
+            return consumerGroupInfo.findSubscriptionData(topic);
+        }
+
+        return null;
+    }
+
+    public ConsumerGroupInfo getConsumerGroupInfo(final String group) {
+        return this.consumerTable.get(group);
+    }
+
+    public int findSubscriptionDataCount(final String group) {
+        ConsumerGroupInfo consumerGroupInfo = this.getConsumerGroupInfo(group);
+        if (consumerGroupInfo != null) {
+            return consumerGroupInfo.getSubscriptionTable().size();
+        }
+
+        return 0;
+    }
+
+    public void doChannelCloseEvent(final String remoteAddr, final Channel channel) {
+        Iterator<Entry<String, ConsumerGroupInfo>> it = this.consumerTable.entrySet().iterator();
+        while (it.hasNext()) {
+            Entry<String, ConsumerGroupInfo> next = it.next();
+            ConsumerGroupInfo info = next.getValue();
+            boolean removed = info.doChannelCloseEvent(remoteAddr, channel);
+            if (removed) {
+                if (info.getChannelInfoTable().isEmpty()) {
+                    ConsumerGroupInfo remove = this.consumerTable.remove(next.getKey());
+                    if (remove != null) {
+                        log.info("unregister consumer ok, no any connection, and remove consumer group, {}",
+                            next.getKey());
+                        this.consumerIdsChangeListener.handle(ConsumerGroupEvent.UNREGISTER, next.getKey());
+                    }
+                }
+
+                this.consumerIdsChangeListener.handle(ConsumerGroupEvent.CHANGE, next.getKey(), info.getAllChannel());
+            }
+        }
+    }
+
+    public boolean registerConsumer(final String group, final ClientChannelInfo clientChannelInfo,
+        ConsumeType consumeType, MessageModel messageModel, ConsumeFromWhere consumeFromWhere,
+        final Set<SubscriptionData> subList, boolean isNotifyConsumerIdsChangedEnable) {
+
+        ConsumerGroupInfo consumerGroupInfo = this.consumerTable.get(group);
+        if (null == consumerGroupInfo) {
+            ConsumerGroupInfo tmp = new ConsumerGroupInfo(group, consumeType, messageModel, consumeFromWhere);
+            ConsumerGroupInfo prev = this.consumerTable.putIfAbsent(group, tmp);
+            consumerGroupInfo = prev != null ? prev : tmp;
+        }
+
+        boolean r1 =
+            consumerGroupInfo.updateChannel(clientChannelInfo, consumeType, messageModel,
+                consumeFromWhere);
+        boolean r2 = consumerGroupInfo.updateSubscription(subList);
+
+        if (r1 || r2) {
+            if (isNotifyConsumerIdsChangedEnable) {
+                this.consumerIdsChangeListener.handle(ConsumerGroupEvent.CHANGE, group, consumerGroupInfo.getAllChannel());
+            }
+        }
+
+        this.consumerIdsChangeListener.handle(ConsumerGroupEvent.REGISTER, group, subList);
+
+        return r1 || r2;
+    }
+
+    public void unregisterConsumer(final String group, final ClientChannelInfo clientChannelInfo,
+        boolean isNotifyConsumerIdsChangedEnable) {
+        ConsumerGroupInfo consumerGroupInfo = this.consumerTable.get(group);
+        if (null != consumerGroupInfo) {
+            consumerGroupInfo.unregisterChannel(clientChannelInfo);
+            if (consumerGroupInfo.getChannelInfoTable().isEmpty()) {
+                ConsumerGroupInfo remove = this.consumerTable.remove(group);
+                if (remove != null) {
+                    log.info("unregister consumer ok, no any connection, and remove consumer group, {}", group);
+
+                    this.consumerIdsChangeListener.handle(ConsumerGroupEvent.UNREGISTER, group);
+                }
+            }
+            if (isNotifyConsumerIdsChangedEnable) {
+                this.consumerIdsChangeListener.handle(ConsumerGroupEvent.CHANGE, group, consumerGroupInfo.getAllChannel());
+            }
+        }
+    }
+
+    public void scanNotActiveChannel() {
+        Iterator<Entry<String, ConsumerGroupInfo>> it = this.consumerTable.entrySet().iterator();
+        while (it.hasNext()) {
+            Entry<String, ConsumerGroupInfo> next = it.next();
+            String group = next.getKey();
+            ConsumerGroupInfo consumerGroupInfo = next.getValue();
+            ConcurrentMap<Channel, ClientChannelInfo> channelInfoTable =
+                consumerGroupInfo.getChannelInfoTable();
+
+            Iterator<Entry<Channel, ClientChannelInfo>> itChannel = channelInfoTable.entrySet().iterator();
+            while (itChannel.hasNext()) {
+                Entry<Channel, ClientChannelInfo> nextChannel = itChannel.next();
+                ClientChannelInfo clientChannelInfo = nextChannel.getValue();
+                long diff = System.currentTimeMillis() - clientChannelInfo.getLastUpdateTimestamp();
+                if (diff > CHANNEL_EXPIRED_TIMEOUT) {
+                    log.warn(
+                        "SCAN: remove expired channel from ConsumerManager consumerTable. channel={}, consumerGroup={}",
+                        RemotingHelper.parseChannelRemoteAddr(clientChannelInfo.getChannel()), group);
+                    RemotingUtil.closeChannel(clientChannelInfo.getChannel());
+                    itChannel.remove();
+                }
+            }
+
+            if (channelInfoTable.isEmpty()) {
+                log.warn(
+                    "SCAN: remove expired channel from ConsumerManager consumerTable, all clear, consumerGroup={}",
+                    group);
+                it.remove();
+            }
+        }
+    }
+
+    public HashSet<String> queryTopicConsumeByWho(final String topic) {
+        HashSet<String> groups = new HashSet<>();
+        Iterator<Entry<String, ConsumerGroupInfo>> it = this.consumerTable.entrySet().iterator();
+        while (it.hasNext()) {
+            Entry<String, ConsumerGroupInfo> entry = it.next();
+            ConcurrentMap<String, SubscriptionData> subscriptionTable =
+                entry.getValue().getSubscriptionTable();
+            if (subscriptionTable.containsKey(topic)) {
+                groups.add(entry.getKey());
+            }
+        }
+        return groups;
+    }
+}
diff --git a/rocketmq-snode/src/main/java/org/apache/rocketmq/snode/client/DefaultConsumerIdsChangeListener.java b/rocketmq-snode/src/main/java/org/apache/rocketmq/snode/client/DefaultConsumerIdsChangeListener.java
new file mode 100644
index 0000000..cb7c164
--- /dev/null
+++ b/rocketmq-snode/src/main/java/org/apache/rocketmq/snode/client/DefaultConsumerIdsChangeListener.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.snode.client;
+
+import io.netty.channel.Channel;
+import java.util.Collection;
+import java.util.List;
+import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
+import org.apache.rocketmq.snode.SnodeController;
+
+//TODO Filter implementation
+public class DefaultConsumerIdsChangeListener implements ConsumerIdsChangeListener {
+    private final SnodeController snodeController;
+
+    public DefaultConsumerIdsChangeListener(SnodeController snodeController) {
+        this.snodeController = snodeController;
+    }
+
+    @Override
+    public void handle(ConsumerGroupEvent event, String group, Object... args) {
+        if (event == null) {
+            return;
+        }
+        switch (event) {
+            case CHANGE:
+                if (args == null || args.length < 1) {
+                    return;
+                }
+                List<Channel> channels = (List<Channel>) args[0];
+                if (channels != null && snodeController.getSnodeConfig().isNotifyConsumerIdsChangedEnable()) {
+                    for (Channel chl : channels) {
+                        this.snodeController.getSnodeOuterService().notifyConsumerIdsChanged(chl, group);
+                    }
+                }
+                break;
+            case UNREGISTER:
+//                this.snodeController.getConsumerFilterManager().unRegister(group);
+                break;
+            case REGISTER:
+                if (args == null || args.length < 1) {
+                    return;
+                }
+//                Collection<SubscriptionData> subscriptionDataList = (Collection<SubscriptionData>) args[0];
+//                this.snodeController.getConsumerFilterManager().register(group, subscriptionDataList);
+                break;
+            default:
+                throw new RuntimeException("Unknown event " + event);
+        }
+    }
+}
diff --git a/rocketmq-snode/src/main/java/org/apache/rocketmq/snode/client/ProducerManager.java b/rocketmq-snode/src/main/java/org/apache/rocketmq/snode/client/ProducerManager.java
new file mode 100644
index 0000000..b80c027
--- /dev/null
+++ b/rocketmq-snode/src/main/java/org/apache/rocketmq/snode/client/ProducerManager.java
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.snode.client;
+
+import io.netty.channel.Channel;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map.Entry;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.common.utils.PositiveAtomicCounter;
+import org.apache.rocketmq.logging.InternalLogger;
+import org.apache.rocketmq.logging.InternalLoggerFactory;
+import org.apache.rocketmq.remoting.common.RemotingHelper;
+import org.apache.rocketmq.remoting.common.RemotingUtil;
+
+public class ProducerManager {
+    private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
+    private static final long LOCK_TIMEOUT_MILLIS = 3000;
+    private static final long CHANNEL_EXPIRED_TIMEOUT = 1000 * 120;
+    private static final int GET_AVALIABLE_CHANNEL_RETRY_COUNT = 3;
+    private final Lock groupChannelLock = new ReentrantLock();
+    private final HashMap<String /* group name */, HashMap<Channel, ClientChannelInfo>> groupChannelTable =
+        new HashMap<String, HashMap<Channel, ClientChannelInfo>>();
+    private PositiveAtomicCounter positiveAtomicCounter = new PositiveAtomicCounter();
+    public ProducerManager() {
+
+    }
+
+    public HashMap<String, HashMap<Channel, ClientChannelInfo>> getGroupChannelTable() {
+        HashMap<String /* group name */, HashMap<Channel, ClientChannelInfo>> newGroupChannelTable =
+            new HashMap<String, HashMap<Channel, ClientChannelInfo>>();
+        try {
+            if (this.groupChannelLock.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
+                try {
+                    newGroupChannelTable.putAll(groupChannelTable);
+                } finally {
+                    groupChannelLock.unlock();
+                }
+            }
+        } catch (InterruptedException e) {
+            log.error("", e);
+        }
+        return newGroupChannelTable;
+    }
+
+    public void scanNotActiveChannel() {
+        try {
+            if (this.groupChannelLock.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
+                try {
+                    for (final Entry<String, HashMap<Channel, ClientChannelInfo>> entry : this.groupChannelTable
+                        .entrySet()) {
+                        final String group = entry.getKey();
+                        final HashMap<Channel, ClientChannelInfo> chlMap = entry.getValue();
+
+                        Iterator<Entry<Channel, ClientChannelInfo>> it = chlMap.entrySet().iterator();
+                        while (it.hasNext()) {
+                            Entry<Channel, ClientChannelInfo> item = it.next();
+                            // final Integer id = item.getKey();
+                            final ClientChannelInfo info = item.getValue();
+
+                            long diff = System.currentTimeMillis() - info.getLastUpdateTimestamp();
+                            if (diff > CHANNEL_EXPIRED_TIMEOUT) {
+                                it.remove();
+                                log.warn(
+                                    "SCAN: remove expired channel[{}] from ProducerManager groupChannelTable, producer group name: {}",
+                                    RemotingHelper.parseChannelRemoteAddr(info.getChannel()), group);
+                                RemotingUtil.closeChannel(info.getChannel());
+                            }
+                        }
+                    }
+                } finally {
+                    this.groupChannelLock.unlock();
+                }
+            } else {
+                log.warn("ProducerManager scanNotActiveChannel lock timeout");
+            }
+        } catch (InterruptedException e) {
+            log.error("", e);
+        }
+    }
+
+    public void doChannelCloseEvent(final String remoteAddr, final Channel channel) {
+        if (channel != null) {
+            try {
+                if (this.groupChannelLock.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
+                    try {
+                        for (final Entry<String, HashMap<Channel, ClientChannelInfo>> entry : this.groupChannelTable
+                            .entrySet()) {
+                            final String group = entry.getKey();
+                            final HashMap<Channel, ClientChannelInfo> clientChannelInfoTable =
+                                entry.getValue();
+                            final ClientChannelInfo clientChannelInfo =
+                                clientChannelInfoTable.remove(channel);
+                            if (clientChannelInfo != null) {
+                                log.info(
+                                    "NETTY EVENT: remove channel[{}][{}] from ProducerManager groupChannelTable, producer group: {}",
+                                    clientChannelInfo.toString(), remoteAddr, group);
+                            }
+
+                        }
+                    } finally {
+                        this.groupChannelLock.unlock();
+                    }
+                } else {
+                    log.warn("ProducerManager doChannelCloseEvent lock timeout");
+                }
+            } catch (InterruptedException e) {
+                log.error("", e);
+            }
+        }
+    }
+
+    public void registerProducer(final String group, final ClientChannelInfo clientChannelInfo) {
+        try {
+            ClientChannelInfo clientChannelInfoFound = null;
+
+            if (this.groupChannelLock.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
+                try {
+                    HashMap<Channel, ClientChannelInfo> channelTable = this.groupChannelTable.get(group);
+                    if (null == channelTable) {
+                        channelTable = new HashMap<>();
+                        this.groupChannelTable.put(group, channelTable);
+                    }
+
+                    clientChannelInfoFound = channelTable.get(clientChannelInfo.getChannel());
+                    if (null == clientChannelInfoFound) {
+                        channelTable.put(clientChannelInfo.getChannel(), clientChannelInfo);
+                        log.info("new producer connected, group: {} channel: {}", group,
+                            clientChannelInfo.toString());
+                    }
+                } finally {
+                    this.groupChannelLock.unlock();
+                }
+
+                if (clientChannelInfoFound != null) {
+                    clientChannelInfoFound.setLastUpdateTimestamp(System.currentTimeMillis());
+                }
+            } else {
+                log.warn("ProducerManager registerProducer lock timeout");
+            }
+        } catch (InterruptedException e) {
+            log.error("", e);
+        }
+    }
+
+    public void unregisterProducer(final String group, final ClientChannelInfo clientChannelInfo) {
+        try {
+            if (this.groupChannelLock.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
+                try {
+                    HashMap<Channel, ClientChannelInfo> channelTable = this.groupChannelTable.get(group);
+                    if (null != channelTable && !channelTable.isEmpty()) {
+                        ClientChannelInfo old = channelTable.remove(clientChannelInfo.getChannel());
+                        if (old != null) {
+                            log.info("unregister a producer[{}] from groupChannelTable {}", group,
+                                clientChannelInfo.toString());
+                        }
+
+                        if (channelTable.isEmpty()) {
+                            this.groupChannelTable.remove(group);
+                            log.info("unregister a producer group[{}] from groupChannelTable", group);
+                        }
+                    }
+                } finally {
+                    this.groupChannelLock.unlock();
+                }
+            } else {
+                log.warn("ProducerManager unregisterProducer lock timeout");
+            }
+        } catch (InterruptedException e) {
+            log.error("", e);
+        }
+    }
+
+    public Channel getAvaliableChannel(String groupId) {
+        HashMap<Channel, ClientChannelInfo> channelClientChannelInfoHashMap = groupChannelTable.get(groupId);
+        List<Channel> channelList = new ArrayList<Channel>();
+        if (channelClientChannelInfoHashMap != null) {
+            for (Channel channel : channelClientChannelInfoHashMap.keySet()) {
+                channelList.add(channel);
+            }
+            int size = channelList.size();
+            if (0 == size) {
+                log.warn("Channel list is empty. groupId={}", groupId);
+                return null;
+            }
+
+            int index = positiveAtomicCounter.incrementAndGet() % size;
+            Channel channel = channelList.get(index);
+            int count = 0;
+            boolean isOk = channel.isActive() && channel.isWritable();
+            while (count++ < GET_AVALIABLE_CHANNEL_RETRY_COUNT) {
+                if (isOk) {
+                    return channel;
+                }
+                index = (++index) % size;
+                channel = channelList.get(index);
+                isOk = channel.isActive() && channel.isWritable();
+            }
+        } else {
+            log.warn("Check transaction failed, channel table is empty. groupId={}", groupId);
+            return null;
+        }
+        return null;
+    }
+}
diff --git a/rocketmq-snode/src/main/java/org/apache/rocketmq/snode/client/SubscriptionGroupManager.java b/rocketmq-snode/src/main/java/org/apache/rocketmq/snode/client/SubscriptionGroupManager.java
new file mode 100644
index 0000000..3c6799e
--- /dev/null
+++ b/rocketmq-snode/src/main/java/org/apache/rocketmq/snode/client/SubscriptionGroupManager.java
@@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.snode.client;
+
+import java.util.Iterator;
+import java.util.Map.Entry;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import org.apache.rocketmq.common.ConfigManager;
+import org.apache.rocketmq.common.DataVersion;
+import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig;
+import org.apache.rocketmq.logging.InternalLogger;
+import org.apache.rocketmq.logging.InternalLoggerFactory;
+import org.apache.rocketmq.remoting.serialize.RemotingSerializable;
+import org.apache.rocketmq.snode.SnodeController;
+
+public class SubscriptionGroupManager extends ConfigManager {
+    private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
+
+    private final ConcurrentMap<String, SubscriptionGroupConfig> subscriptionGroupTable =
+        new ConcurrentHashMap<String, SubscriptionGroupConfig>(1024);
+    private final DataVersion dataVersion = new DataVersion();
+    private transient SnodeController snodeController;
+
+    public enum SUBSCRIPTION_EVENT {
+        CREATE,
+        UPDATE,
+        DELETE
+    }
+
+    public SubscriptionGroupManager() {
+        this.init();
+    }
+
+    public SubscriptionGroupManager(SnodeController snodeController) {
+        this.snodeController = snodeController;
+        this.init();
+    }
+
+    private void init() {
+        {
+            SubscriptionGroupConfig subscriptionGroupConfig = new SubscriptionGroupConfig();
+            subscriptionGroupConfig.setGroupName(MixAll.TOOLS_CONSUMER_GROUP);
+            this.subscriptionGroupTable.put(MixAll.TOOLS_CONSUMER_GROUP, subscriptionGroupConfig);
+        }
+
+        {
+            SubscriptionGroupConfig subscriptionGroupConfig = new SubscriptionGroupConfig();
+            subscriptionGroupConfig.setGroupName(MixAll.FILTERSRV_CONSUMER_GROUP);
+            this.subscriptionGroupTable.put(MixAll.FILTERSRV_CONSUMER_GROUP, subscriptionGroupConfig);
+        }
+
+        {
+            SubscriptionGroupConfig subscriptionGroupConfig = new SubscriptionGroupConfig();
+            subscriptionGroupConfig.setGroupName(MixAll.SELF_TEST_CONSUMER_GROUP);
+            this.subscriptionGroupTable.put(MixAll.SELF_TEST_CONSUMER_GROUP, subscriptionGroupConfig);
+        }
+
+        {
+            SubscriptionGroupConfig subscriptionGroupConfig = new SubscriptionGroupConfig();
+            subscriptionGroupConfig.setGroupName(MixAll.ONS_HTTP_PROXY_GROUP);
+            subscriptionGroupConfig.setConsumeBroadcastEnable(true);
+            this.subscriptionGroupTable.put(MixAll.ONS_HTTP_PROXY_GROUP, subscriptionGroupConfig);
+        }
+
+        {
+            SubscriptionGroupConfig subscriptionGroupConfig = new SubscriptionGroupConfig();
+            subscriptionGroupConfig.setGroupName(MixAll.CID_ONSAPI_PULL_GROUP);
+            subscriptionGroupConfig.setConsumeBroadcastEnable(true);
+            this.subscriptionGroupTable.put(MixAll.CID_ONSAPI_PULL_GROUP, subscriptionGroupConfig);
+        }
+
+        {
+            SubscriptionGroupConfig subscriptionGroupConfig = new SubscriptionGroupConfig();
+            subscriptionGroupConfig.setGroupName(MixAll.CID_ONSAPI_PERMISSION_GROUP);
+            subscriptionGroupConfig.setConsumeBroadcastEnable(true);
+            this.subscriptionGroupTable.put(MixAll.CID_ONSAPI_PERMISSION_GROUP, subscriptionGroupConfig);
+        }
+
+        {
+            SubscriptionGroupConfig subscriptionGroupConfig = new SubscriptionGroupConfig();
+            subscriptionGroupConfig.setGroupName(MixAll.CID_ONSAPI_OWNER_GROUP);
+            subscriptionGroupConfig.setConsumeBroadcastEnable(true);
+            this.subscriptionGroupTable.put(MixAll.CID_ONSAPI_OWNER_GROUP, subscriptionGroupConfig);
+        }
+    }
+
+    public void updateSubscriptionGroupConfig(final SubscriptionGroupConfig config) {
+        SubscriptionGroupConfig old = this.subscriptionGroupTable.put(config.getGroupName(), config);
+        if (old != null) {
+            log.info("update subscription group config, old: {} new: {}", old, config);
+        } else {
+            log.info("create new subscription group, {}", config);
+        }
+
+        this.dataVersion.nextVersion();
+
+        this.persistToEnode(SUBSCRIPTION_EVENT.UPDATE, config);
+    }
+
+    public void disableConsume(final String groupName) {
+        SubscriptionGroupConfig old = this.subscriptionGroupTable.get(groupName);
+        if (old != null) {
+            old.setConsumeEnable(false);
+            this.dataVersion.nextVersion();
+        }
+    }
+
+    public SubscriptionGroupConfig findSubscriptionGroupConfig(final String group) {
+        SubscriptionGroupConfig subscriptionGroupConfig = this.subscriptionGroupTable.get(group);
+        if (null == subscriptionGroupConfig) {
+            if (snodeController.getSnodeConfig().isAutoCreateSubscriptionGroup() || MixAll.isSysConsumerGroup(group)) {
+                subscriptionGroupConfig = new SubscriptionGroupConfig();
+                subscriptionGroupConfig.setGroupName(group);
+                SubscriptionGroupConfig preConfig = this.subscriptionGroupTable.putIfAbsent(group, subscriptionGroupConfig);
+                if (null == preConfig) {
+                    log.info("auto create a subscription group, {}", subscriptionGroupConfig.toString());
+                }
+                this.dataVersion.nextVersion();
+                this.persistToEnode(SUBSCRIPTION_EVENT.CREATE, subscriptionGroupConfig);
+            }
+        }
+
+        return subscriptionGroupConfig;
+    }
+
+    @Override
+    public String encode() {
+        return this.encode(false);
+    }
+
+    @Override
+    public String configFilePath() {
+        //TODO  get subscription persist request code
+        return null;
+    }
+
+    @Override
+    public void decode(String jsonString) {
+        if (jsonString != null) {
+            SubscriptionGroupManager obj = RemotingSerializable.fromJson(jsonString, SubscriptionGroupManager.class);
+            if (obj != null) {
+                this.subscriptionGroupTable.putAll(obj.subscriptionGroupTable);
+                this.dataVersion.assignNewOne(obj.dataVersion);
+                this.printLoadDataWhenFirstBoot(obj);
+            }
+        }
+    }
+
+    public String encode(final boolean prettyFormat) {
+        return RemotingSerializable.toJson(this, prettyFormat);
+    }
+
+    private void printLoadDataWhenFirstBoot(final SubscriptionGroupManager sgm) {
+        Iterator<Entry<String, SubscriptionGroupConfig>> it = sgm.getSubscriptionGroupTable().entrySet().iterator();
+        while (it.hasNext()) {
+            Entry<String, SubscriptionGroupConfig> next = it.next();
+            log.info("load exist subscription group, {}", next.getValue().toString());
+        }
+    }
+
+    public ConcurrentMap<String, SubscriptionGroupConfig> getSubscriptionGroupTable() {
+        return subscriptionGroupTable;
+    }
+
+    public DataVersion getDataVersion() {
+        return dataVersion;
+    }
+
+    public void deleteSubscriptionGroupConfig(final String groupName) {
+        SubscriptionGroupConfig old = this.subscriptionGroupTable.remove(groupName);
+        if (old != null) {
+            log.info("delete subscription group OK, subscription group:{}", old);
+            this.dataVersion.nextVersion();
+            this.persistToEnode(SUBSCRIPTION_EVENT.DELETE, old);
+        } else {
+            log.warn("delete subscription group failed, subscription groupName: {} not exist", groupName);
+        }
+    }
+
+    void persistToEnode(SUBSCRIPTION_EVENT event, SubscriptionGroupConfig config) {
+
+    }
+}
diff --git a/rocketmq-snode/src/main/java/org/apache/rocketmq/snode/config/SnodeConfig.java b/rocketmq-snode/src/main/java/org/apache/rocketmq/snode/config/SnodeConfig.java
new file mode 100644
index 0000000..725cf6a
--- /dev/null
+++ b/rocketmq-snode/src/main/java/org/apache/rocketmq/snode/config/SnodeConfig.java
@@ -0,0 +1,223 @@
+package org.apache.rocketmq.snode.config;/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.common.annotation.ImportantField;
+import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.logging.InternalLogger;
+import org.apache.rocketmq.logging.InternalLoggerFactory;
+
+public class SnodeConfig {
+
+    private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.COMMON_LOGGER_NAME);
+
+    private String rocketmqHome = System.getProperty(MixAll.ROCKETMQ_HOME_PROPERTY, System.getenv(MixAll.ROCKETMQ_HOME_ENV));
+
+    @ImportantField
+    private String namesrvAddr = System.getProperty(MixAll.NAMESRV_ADDR_PROPERTY, System.getenv(MixAll.NAMESRV_ADDR_ENV));
+
+    private String snodeName = "defaultNode";
+
+    private String snodeAddr = "127.0.0.1:11911";
+
+    private String clusterName = "defaultCluster";
+
+    private int snodeSendThreadPoolQueueCapacity = 10000;
+
+    private int snodeSendMessageMinPoolSize = 10;
+
+    private int snodeSendMessageMaxPoolSize = 20;
+
+    private int snodeHeartBeatCorePoolSize = 1;
+
+    private int snodeHeartBeatMaxPoolSize = 2;
+
+    private int snodeHeartBeatThreadPoolQueueCapacity = 1000;
+
+    private long snodeHeartBeatInterval = 30 * 1000;
+
+    private boolean fetechNameserver = false;
+
+    private long houseKeepingInterval = 10 * 1000;
+
+    private boolean notifyConsumerIdsChangedEnable = true;
+
+    private boolean autoCreateSubscriptionGroup = true;
+
+    private int listenPort = 11911;
+
+    public void setSnodeHeartBeatInterval(long snodeHeartBeatInterval) {
+        this.snodeHeartBeatInterval = snodeHeartBeatInterval;
+    }
+
+    public long getHouseKeepingInterval() {
+        return houseKeepingInterval;
+    }
+
+    public void setHouseKeepingInterval(long houseKeepingInterval) {
+        this.houseKeepingInterval = houseKeepingInterval;
+    }
+
+    public boolean isFetechNameserver() {
+        return fetechNameserver;
+    }
+
+    public void setFetechNameserver(boolean fetechNameserver) {
+        this.fetechNameserver = fetechNameserver;
+    }
+
+    public long getSnodeHeartBeatInterval() {
+        return snodeHeartBeatInterval;
+    }
+
+    public String getClusterName() {
+        return clusterName;
+    }
+
+    public void setClusterName(String clusterName) {
+        this.clusterName = clusterName;
+    }
+
+    /**
+     * This configurable item defines interval of topics registration of broker to name server. Allowing values are
+     * between 10, 000 and 60, 000 milliseconds.
+     */
+    private int registerNameServerPeriod = 1000 * 30;
+
+    public int getRegisterNameServerPeriod() {
+        return registerNameServerPeriod;
+    }
+
+    public void setRegisterNameServerPeriod(int registerNameServerPeriod) {
+        this.registerNameServerPeriod = registerNameServerPeriod;
+    }
+
+    @ImportantField
+    private boolean fetchNamesrvAddrByAddressServer = false;
+
+    public boolean isFetchNamesrvAddrByAddressServer() {
+        return fetchNamesrvAddrByAddressServer;
+    }
+
+    public void setFetchNamesrvAddrByAddressServer(boolean fetchNamesrvAddrByAddressServer) {
+        this.fetchNamesrvAddrByAddressServer = fetchNamesrvAddrByAddressServer;
+    }
+
+    public int getSnodeHeartBeatThreadPoolQueueCapacity() {
+        return snodeHeartBeatThreadPoolQueueCapacity;
+    }
+
+    public void setSnodeHeartBeatThreadPoolQueueCapacity(int snodeHeartBeatThreadPoolQueueCapacity) {
+        this.snodeHeartBeatThreadPoolQueueCapacity = snodeHeartBeatThreadPoolQueueCapacity;
+    }
+
+    public int getSnodeHeartBeatCorePoolSize() {
+        return snodeHeartBeatCorePoolSize;
+    }
+
+    public void setSnodeHeartBeatCorePoolSize(int snodeHeartBeatCorePoolSize) {
+        this.snodeHeartBeatCorePoolSize = snodeHeartBeatCorePoolSize;
+    }
+
+    public int getSnodeHeartBeatMaxPoolSize() {
+        return snodeHeartBeatMaxPoolSize;
+    }
+
+    public void setSnodeHeartBeatMaxPoolSize(int snodeHeartBeatMaxPoolSize) {
+        this.snodeHeartBeatMaxPoolSize = snodeHeartBeatMaxPoolSize;
+    }
+
+    public int getListenPort() {
+        return listenPort;
+    }
+
+    public String getRocketmqHome() {
+        return rocketmqHome;
+    }
+
+    public void setRocketmqHome(String rocketmqHome) {
+        this.rocketmqHome = rocketmqHome;
+    }
+
+    public void setListenPort(int listenPort) {
+        this.listenPort = listenPort;
+    }
+
+    public int getSnodeSendThreadPoolQueueCapacity() {
+        return snodeSendThreadPoolQueueCapacity;
+    }
+
+    public void setSnodeSendThreadPoolQueueCapacity(int snodeSendThreadPoolQueueCapacity) {
+        this.snodeSendThreadPoolQueueCapacity = snodeSendThreadPoolQueueCapacity;
+    }
+
+    public int getSnodeSendMessageMinPoolSize() {
+        return snodeSendMessageMinPoolSize;
+    }
+
+    public void setSnodeSendMessageMinPoolSize(int snodeSendMessageMinPoolSize) {
+        this.snodeSendMessageMinPoolSize = snodeSendMessageMinPoolSize;
+    }
+
+    public int getSnodeSendMessageMaxPoolSize() {
+        return snodeSendMessageMaxPoolSize;
+    }
+
+    public void setSnodeSendMessageMaxPoolSize(int snodeSendMessageMaxPoolSize) {
+        this.snodeSendMessageMaxPoolSize = snodeSendMessageMaxPoolSize;
+    }
+
+    public String getSnodeAddr() {
+        return snodeAddr;
+    }
+
+    public void setSnodeAddr(String snodeAddr) {
+        this.snodeAddr = snodeAddr;
+    }
+
+    public String getSnodeName() {
+        return snodeName;
+    }
+
+    public void setSnodeName(String snodeName) {
+        this.snodeName = snodeName;
+    }
+
+    public String getNamesrvAddr() {
+        return namesrvAddr;
+    }
+
+    public void setNamesrvAddr(String namesrvAddr) {
+        this.namesrvAddr = namesrvAddr;
+    }
+
+    public boolean isNotifyConsumerIdsChangedEnable() {
+        return notifyConsumerIdsChangedEnable;
+    }
+
+    public void setNotifyConsumerIdsChangedEnable(boolean notifyConsumerIdsChangedEnable) {
+        this.notifyConsumerIdsChangedEnable = notifyConsumerIdsChangedEnable;
+    }
+
+    public boolean isAutoCreateSubscriptionGroup() {
+        return autoCreateSubscriptionGroup;
+    }
+
+    public void setAutoCreateSubscriptionGroup(boolean autoCreateSubscriptionGroup) {
+        this.autoCreateSubscriptionGroup = autoCreateSubscriptionGroup;
+    }
+}
diff --git a/rocketmq-snode/src/main/java/org/apache/rocketmq/snode/processor/ConsumerManageProcessor.java b/rocketmq-snode/src/main/java/org/apache/rocketmq/snode/processor/ConsumerManageProcessor.java
new file mode 100644
index 0000000..8509546
--- /dev/null
+++ b/rocketmq-snode/src/main/java/org/apache/rocketmq/snode/processor/ConsumerManageProcessor.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.snode.processor;
+
+import io.netty.channel.ChannelHandlerContext;
+import java.util.List;
+import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.common.protocol.RequestCode;
+import org.apache.rocketmq.common.protocol.ResponseCode;
+import org.apache.rocketmq.common.protocol.header.GetConsumerListByGroupRequestHeader;
+import org.apache.rocketmq.common.protocol.header.GetConsumerListByGroupResponseBody;
+import org.apache.rocketmq.common.protocol.header.GetConsumerListByGroupResponseHeader;
+import org.apache.rocketmq.logging.InternalLogger;
+import org.apache.rocketmq.logging.InternalLoggerFactory;
+import org.apache.rocketmq.remoting.common.RemotingHelper;
+import org.apache.rocketmq.remoting.exception.RemotingCommandException;
+import org.apache.rocketmq.remoting.netty.NettyRequestProcessor;
+import org.apache.rocketmq.remoting.protocol.RemotingCommand;
+import org.apache.rocketmq.snode.SnodeController;
+import org.apache.rocketmq.snode.client.ConsumerGroupInfo;
+
+public class ConsumerManageProcessor implements NettyRequestProcessor {
+    private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
+
+    private final SnodeController snodeController;
+
+    public ConsumerManageProcessor(final SnodeController snodeController) {
+        this.snodeController = snodeController;
+    }
+
+    @Override
+    public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request)
+        throws RemotingCommandException {
+        switch (request.getCode()) {
+            case RequestCode.GET_CONSUMER_LIST_BY_GROUP:
+                return this.getConsumerListByGroup(ctx, request);
+            default:
+                break;
+        }
+        return null;
+    }
+
+    @Override
+    public boolean rejectRequest() {
+        return false;
+    }
+
+    public RemotingCommand getConsumerListByGroup(ChannelHandlerContext ctx, RemotingCommand request)
+        throws RemotingCommandException {
+        final RemotingCommand response =
+            RemotingCommand.createResponseCommand(GetConsumerListByGroupResponseHeader.class);
+        final GetConsumerListByGroupRequestHeader requestHeader =
+            (GetConsumerListByGroupRequestHeader) request
+                .decodeCommandCustomHeader(GetConsumerListByGroupRequestHeader.class);
+
+        ConsumerGroupInfo consumerGroupInfo =
+            this.snodeController.getConsumerManager().getConsumerGroupInfo(
+                requestHeader.getConsumerGroup());
+        if (consumerGroupInfo != null) {
+            List<String> clientIds = consumerGroupInfo.getAllClientId();
+            if (!clientIds.isEmpty()) {
+                GetConsumerListByGroupResponseBody body = new GetConsumerListByGroupResponseBody();
+                body.setConsumerIdList(clientIds);
+                response.setBody(body.encode());
+                response.setCode(ResponseCode.SUCCESS);
+                response.setRemark(null);
+                return response;
+            } else {
+                log.warn("getAllClientId failed, {} {}", requestHeader.getConsumerGroup(),
+                    RemotingHelper.parseChannelRemoteAddr(ctx.channel()));
+            }
+        } else {
+            log.warn("getConsumerGroupInfo failed, {} {}", requestHeader.getConsumerGroup(),
+                RemotingHelper.parseChannelRemoteAddr(ctx.channel()));
+        }
+
+        response.setCode(ResponseCode.SYSTEM_ERROR);
+        response.setRemark("no consumer for this group, " + requestHeader.getConsumerGroup());
+        return response;
+    }
+
+}
diff --git a/rocketmq-snode/src/main/java/org/apache/rocketmq/snode/processor/HearbeatProcessor.java b/rocketmq-snode/src/main/java/org/apache/rocketmq/snode/processor/HearbeatProcessor.java
new file mode 100644
index 0000000..f06af79
--- /dev/null
+++ b/rocketmq-snode/src/main/java/org/apache/rocketmq/snode/processor/HearbeatProcessor.java
@@ -0,0 +1,92 @@
+package org.apache.rocketmq.snode.processor;/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import io.netty.channel.ChannelHandlerContext;
+import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.common.constant.PermName;
+import org.apache.rocketmq.common.protocol.heartbeat.ConsumerData;
+import org.apache.rocketmq.common.protocol.heartbeat.HeartbeatData;
+import org.apache.rocketmq.common.protocol.heartbeat.ProducerData;
+import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig;
+import org.apache.rocketmq.common.sysflag.TopicSysFlag;
+import org.apache.rocketmq.logging.InternalLogger;
+import org.apache.rocketmq.logging.InternalLoggerFactory;
+import org.apache.rocketmq.remoting.common.RemotingHelper;
+import org.apache.rocketmq.remoting.netty.NettyRequestProcessor;
+import org.apache.rocketmq.remoting.protocol.RemotingCommand;
+import org.apache.rocketmq.snode.SnodeController;
+import org.apache.rocketmq.snode.client.ClientChannelInfo;
+
+public class HearbeatProcessor implements NettyRequestProcessor {
+    private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.SNODE_LOGGER_NAME);
+    private final SnodeController snodeController;
+
+    public HearbeatProcessor(SnodeController snodeController) {
+        this.snodeController = snodeController;
+    }
+
+    @Override
+    public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) {
+        HeartbeatData heartbeatData = HeartbeatData.decode(request.getBody(), HeartbeatData.class);
+        ClientChannelInfo clientChannelInfo = new ClientChannelInfo(
+            ctx.channel(),
+            heartbeatData.getClientID(),
+            request.getLanguage(),
+            request.getVersion()
+        );
+
+        if (heartbeatData.getProducerDataSet() != null) {
+            for (ProducerData producerData : heartbeatData.getProducerDataSet()) {
+                this.snodeController.getProducerManager().registerProducer(producerData.getGroupName(),
+                    clientChannelInfo);
+            }
+        }
+
+        if (heartbeatData.getConsumerDataSet() != null) {
+            for (ConsumerData data : heartbeatData.getConsumerDataSet()) {
+                SubscriptionGroupConfig subscriptionGroupConfig =
+                    this.snodeController.getSubscriptionGroupManager().findSubscriptionGroupConfig(
+                        data.getGroupName());
+                boolean isNotifyConsumerIdsChangedEnable = subscriptionGroupConfig.isNotifyConsumerIdsChangedEnable();
+                boolean changed = this.snodeController.getConsumerManager().registerConsumer(
+                    data.getGroupName(),
+                    clientChannelInfo,
+                    data.getConsumeType(),
+                    data.getMessageModel(),
+                    data.getConsumeFromWhere(),
+                    data.getSubscriptionDataSet(),
+                    isNotifyConsumerIdsChangedEnable
+                );
+
+                if (changed) {
+                    log.info("registerConsumer info changed {} {}",
+                        data.toString(),
+                        RemotingHelper.parseChannelRemoteAddr(ctx.channel())
+                    );
+                }
+            }
+        }
+        RemotingCommand response = RemotingCommand.createResponseCommand(null);
+        return response;
+    }
+
+    @Override
+    public boolean rejectRequest() {
+        return false;
+    }
+}
diff --git a/rocketmq-snode/src/main/java/org/apache/rocketmq/snode/processor/PullMessageProcessor.java b/rocketmq-snode/src/main/java/org/apache/rocketmq/snode/processor/PullMessageProcessor.java
new file mode 100644
index 0000000..a165fd4
--- /dev/null
+++ b/rocketmq-snode/src/main/java/org/apache/rocketmq/snode/processor/PullMessageProcessor.java
@@ -0,0 +1,40 @@
+package org.apache.rocketmq.snode.processor;/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import io.netty.channel.ChannelHandlerContext;
+import org.apache.rocketmq.remoting.netty.NettyRequestProcessor;
+import org.apache.rocketmq.remoting.protocol.RemotingCommand;
+import org.apache.rocketmq.snode.service.SnodeOuterService;
+
+public class PullMessageProcessor implements NettyRequestProcessor {
+
+    private final SnodeOuterService snodeOuterService;
+
+    public PullMessageProcessor(SnodeOuterService snodeOuterService){
+        this.snodeOuterService = snodeOuterService;
+    }
+
+    @Override
+    public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) {
+        return snodeOuterService.pullMessage(request);
+    }
+
+    @Override
+    public boolean rejectRequest() {
+        return false;
+    }
+}
diff --git a/rocketmq-snode/src/main/java/org/apache/rocketmq/snode/processor/SendMessageProcessor.java b/rocketmq-snode/src/main/java/org/apache/rocketmq/snode/processor/SendMessageProcessor.java
new file mode 100644
index 0000000..e419475
--- /dev/null
+++ b/rocketmq-snode/src/main/java/org/apache/rocketmq/snode/processor/SendMessageProcessor.java
@@ -0,0 +1,44 @@
+package org.apache.rocketmq.snode.processor;/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import io.netty.channel.ChannelHandlerContext;
+import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.logging.InternalLogger;
+import org.apache.rocketmq.logging.InternalLoggerFactory;
+import org.apache.rocketmq.remoting.netty.NettyRequestProcessor;
+import org.apache.rocketmq.remoting.protocol.RemotingCommand;
+import org.apache.rocketmq.snode.service.SnodeOuterService;
+
+public class SendMessageProcessor implements NettyRequestProcessor {
+    private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.SNODE_LOGGER_NAME);
+
+    private final SnodeOuterService snodeOuterService;
+
+    public SendMessageProcessor(final SnodeOuterService snodeOuterService) {
+        this.snodeOuterService = snodeOuterService;
+    }
+
+    @Override
+    public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) {
+        return snodeOuterService.sendMessage(request);
+    }
+
+    @Override
+    public boolean rejectRequest() {
+        return false;
+    }
+}
diff --git a/rocketmq-snode/src/main/java/org/apache/rocketmq/snode/service/ScheduledService.java b/rocketmq-snode/src/main/java/org/apache/rocketmq/snode/service/ScheduledService.java
new file mode 100644
index 0000000..dc86688
--- /dev/null
+++ b/rocketmq-snode/src/main/java/org/apache/rocketmq/snode/service/ScheduledService.java
@@ -0,0 +1,21 @@
+package org.apache.rocketmq.snode.service;/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+public interface ScheduledService {
+    void startScheduleTask();
+    void shutdown();
+}
diff --git a/rocketmq-snode/src/main/java/org/apache/rocketmq/snode/service/SendTransferService.java b/rocketmq-snode/src/main/java/org/apache/rocketmq/snode/service/SendTransferService.java
new file mode 100644
index 0000000..6dad57c
--- /dev/null
+++ b/rocketmq-snode/src/main/java/org/apache/rocketmq/snode/service/SendTransferService.java
@@ -0,0 +1,26 @@
+package org.apache.rocketmq.snode.service;/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import org.apache.rocketmq.remoting.protocol.RemotingCommand;
+
+public interface SendTransferService {
+    RemotingCommand sendMessage(RemotingCommand request);
+
+    boolean start();
+
+    void shutdown();
+}
diff --git a/rocketmq-snode/src/main/java/org/apache/rocketmq/snode/service/SnodeOuterService.java b/rocketmq-snode/src/main/java/org/apache/rocketmq/snode/service/SnodeOuterService.java
new file mode 100644
index 0000000..8764228
--- /dev/null
+++ b/rocketmq-snode/src/main/java/org/apache/rocketmq/snode/service/SnodeOuterService.java
@@ -0,0 +1,52 @@
+package org.apache.rocketmq.snode.service;/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import io.netty.channel.Channel;
+import org.apache.rocketmq.client.exception.MQBrokerException;
+import org.apache.rocketmq.common.TopicConfig;
+import org.apache.rocketmq.remoting.exception.RemotingConnectException;
+import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
+import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
+import org.apache.rocketmq.remoting.protocol.RemotingCommand;
+import org.apache.rocketmq.snode.config.SnodeConfig;
+
+public interface SnodeOuterService {
+    void sendHearbeat(RemotingCommand remotingCommand);
+
+    RemotingCommand sendMessage(RemotingCommand remotingCommand);
+
+    RemotingCommand pullMessage(RemotingCommand remotingCommand);
+
+    void saveSubscriptionData(RemotingCommand remotingCommand);
+
+    void start();
+
+    void shutdown();
+
+    void registerSnode(SnodeConfig snodeConfig);
+
+    void updateNameServerAddressList(final String addrs);
+
+    String fetchNameServerAddr();
+
+    void updateEnodeAddr(String clusterName) throws InterruptedException, RemotingTimeoutException,
+        RemotingSendRequestException, RemotingConnectException, MQBrokerException;
+
+    void notifyConsumerIdsChanged(final Channel channel, final String consumerGroup);
+
+    RemotingCommand creatTopic(TopicConfig topicConfig);
+}
diff --git a/rocketmq-snode/src/main/java/org/apache/rocketmq/snode/service/impl/ScheduledServiceImpl.java b/rocketmq-snode/src/main/java/org/apache/rocketmq/snode/service/impl/ScheduledServiceImpl.java
new file mode 100644
index 0000000..23d8867
--- /dev/null
+++ b/rocketmq-snode/src/main/java/org/apache/rocketmq/snode/service/impl/ScheduledServiceImpl.java
@@ -0,0 +1,113 @@
+package org.apache.rocketmq.snode.service.impl;/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.common.protocol.RequestCode;
+import org.apache.rocketmq.common.protocol.heartbeat.HeartbeatData;
+import org.apache.rocketmq.common.protocol.heartbeat.SnodeData;
+import org.apache.rocketmq.logging.InternalLogger;
+import org.apache.rocketmq.logging.InternalLoggerFactory;
+import org.apache.rocketmq.remoting.protocol.RemotingCommand;
+import org.apache.rocketmq.snode.config.SnodeConfig;
+import org.apache.rocketmq.snode.service.ScheduledService;
+import org.apache.rocketmq.snode.service.SnodeOuterService;
+
+public class ScheduledServiceImpl implements ScheduledService {
+    private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.SNODE_LOGGER_NAME);
+
+    private SnodeOuterService snodeOuterService;
+    private SnodeConfig snodeConfig;
+
+    private final RemotingCommand enodeHeartbeat;
+
+    public ScheduledServiceImpl(SnodeOuterService snodeOuterService, SnodeConfig snodeConfig) {
+        this.snodeOuterService = snodeOuterService;
+        this.snodeConfig = snodeConfig;
+        enodeHeartbeat = RemotingCommand.createRequestCommand(RequestCode.HEART_BEAT, null);
+        HeartbeatData heartbeatData = new HeartbeatData();
+        heartbeatData.setClientID(snodeConfig.getSnodeName());
+        SnodeData snodeData = new SnodeData();
+        snodeData.setSnodeName(snodeConfig.getSnodeName());
+        heartbeatData.setSnodeData(snodeData);
+        enodeHeartbeat.setBody(heartbeatData.encode());
+    }
+
+    private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() {
+        @Override
+        public Thread newThread(Runnable r) {
+            return new Thread(r, "SNodeScheduledThread");
+        }
+    });
+
+    @Override
+    public void startScheduleTask() {
+
+        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
+            @Override
+            public void run() {
+                try {
+                    snodeOuterService.sendHearbeat(enodeHeartbeat);
+                } catch (Exception e) {
+                    log.error("ScheduledTask updateTopicRouteInfoFromNameServer exception", e);
+                }
+            }
+        }, 1000 * 10, this.snodeConfig.getSnodeHeartBeatInterval(), TimeUnit.MILLISECONDS);
+
+        if (snodeConfig.isFetchNamesrvAddrByAddressServer()) {
+            this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
+                @Override
+                public void run() {
+                    try {
+                        snodeOuterService.fetchNameServerAddr();
+                    } catch (Throwable e) {
+                        log.error("ScheduledTask fetchNameServerAddr exception", e);
+                    }
+                }
+            }, 1000 * 10, 1000 * 60 * 2, TimeUnit.MILLISECONDS);
+        }
+
+        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
+            @Override
+            public void run() {
+                snodeOuterService.registerSnode(snodeConfig);
+            }
+        }, 1000 * 10, Math.max(10000, Math.min(snodeConfig.getRegisterNameServerPeriod(), 60000)), TimeUnit.MILLISECONDS);
+
+        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
+            @Override
+            public void run() {
+                try {
+                    snodeOuterService.updateEnodeAddr(snodeConfig.getClusterName());
+                } catch (Exception ex) {
+                    log.warn("Update broker addr error:{}", ex);
+                }
+            }
+        }, 1000 * 10, Math.max(10000, Math.min(snodeConfig.getRegisterNameServerPeriod(), 60000)), TimeUnit.MILLISECONDS);
+
+    }
+
+    @Override
+    public void shutdown() {
+        if (this.scheduledExecutorService != null) {
+            this.scheduledExecutorService.shutdown();
+        }
+    }
+}
diff --git a/rocketmq-snode/src/main/java/org/apache/rocketmq/snode/service/impl/SendTransferServiceImpl.java b/rocketmq-snode/src/main/java/org/apache/rocketmq/snode/service/impl/SendTransferServiceImpl.java
new file mode 100644
index 0000000..df589da
--- /dev/null
+++ b/rocketmq-snode/src/main/java/org/apache/rocketmq/snode/service/impl/SendTransferServiceImpl.java
@@ -0,0 +1,45 @@
+package org.apache.rocketmq.snode.service.impl;/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import org.apache.rocketmq.common.ServiceState;
+import org.apache.rocketmq.remoting.protocol.RemotingCommand;
+import org.apache.rocketmq.snode.service.SendTransferService;
+import org.apache.rocketmq.snode.service.SnodeOuterService;
+
+public class SendTransferServiceImpl implements SendTransferService {
+    private ServiceState serviceState = ServiceState.CREATE_JUST;
+    private SnodeOuterService snodeOuterService;
+
+    public SendTransferServiceImpl(SnodeOuterService snodeOuterService) {
+        snodeOuterService = snodeOuterService;
+    }
+
+    @Override
+    public RemotingCommand sendMessage(RemotingCommand request) {
+        return snodeOuterService.sendMessage(request);
+    }
+
+    @Override
+    public boolean start() {
+        return false;
+    }
+
+    @Override
+    public void shutdown() {
+
+    }
+}
diff --git a/rocketmq-snode/src/main/java/org/apache/rocketmq/snode/service/impl/SnodeOuterServiceImpl.java b/rocketmq-snode/src/main/java/org/apache/rocketmq/snode/service/impl/SnodeOuterServiceImpl.java
new file mode 100644
index 0000000..14e577e
--- /dev/null
+++ b/rocketmq-snode/src/main/java/org/apache/rocketmq/snode/service/impl/SnodeOuterServiceImpl.java
@@ -0,0 +1,270 @@
+package org.apache.rocketmq.snode.service.impl;/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import io.netty.channel.Channel;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import org.apache.rocketmq.client.exception.MQBrokerException;
+import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.common.TopicConfig;
+import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.common.namesrv.TopAddressing;
+import org.apache.rocketmq.common.protocol.RequestCode;
+import org.apache.rocketmq.common.protocol.ResponseCode;
+import org.apache.rocketmq.common.protocol.body.ClusterInfo;
+import org.apache.rocketmq.common.protocol.header.CreateTopicRequestHeader;
+import org.apache.rocketmq.common.protocol.header.NotifyConsumerIdsChangedRequestHeader;
+import org.apache.rocketmq.common.protocol.header.PullMessageRequestHeader;
+import org.apache.rocketmq.common.protocol.header.PullMessageResponseHeader;
+import org.apache.rocketmq.common.protocol.header.SendMessageRequestHeaderV2;
+import org.apache.rocketmq.common.protocol.header.namesrv.RegisterSnodeRequestHeader;
+import org.apache.rocketmq.logging.InternalLogger;
+import org.apache.rocketmq.logging.InternalLoggerFactory;
+import org.apache.rocketmq.remoting.RemotingClient;
+import org.apache.rocketmq.remoting.RemotingClientFactory;
+import org.apache.rocketmq.remoting.exception.RemotingConnectException;
+import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
+import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
+import org.apache.rocketmq.remoting.protocol.RemotingCommand;
+import org.apache.rocketmq.snode.SnodeController;
+import org.apache.rocketmq.snode.config.SnodeConfig;
+import org.apache.rocketmq.snode.service.SnodeOuterService;
+
+public class SnodeOuterServiceImpl implements SnodeOuterService {
+    private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.SNODE_LOGGER_NAME);
+    private final TopAddressing topAddressing = new TopAddressing(MixAll.getWSAddr());
+    private String nameSrvAddr = null;
+    private RemotingClient client;
+    private SnodeController snodeController;
+    private static SnodeOuterServiceImpl snodeOuterService;
+    private final ConcurrentMap<String/* Broker Name */, HashMap<Long/* brokerId */, String/* address */>> enodeTable =
+        new ConcurrentHashMap<>();
+    private final long defaultTimeoutMills = 3000L;
+
+    private SnodeOuterServiceImpl() {
+
+    }
+
+    public static SnodeOuterServiceImpl getInstance(SnodeController snodeController) {
+        if (snodeOuterService == null) {
+            synchronized (SnodeOuterServiceImpl.class) {
+                if (snodeOuterService == null) {
+                    snodeOuterService = new SnodeOuterServiceImpl(snodeController);
+                    return snodeOuterService;
+                }
+            }
+        }
+        return snodeOuterService;
+    }
+
+    private SnodeOuterServiceImpl(SnodeController snodeController) {
+        this.snodeController = snodeController;
+        this.client = RemotingClientFactory.createInstance().init(snodeController.getNettyClientConfig(), null);
+    }
+
+    @Override
+    public void start() {
+        this.client.start();
+    }
+
+    @Override
+    public void shutdown() {
+        this.client.shutdown();
+    }
+
+    @Override
+    public void sendHearbeat(RemotingCommand remotingCommand) {
+        for (Map.Entry<String, HashMap<Long, String>> entry : enodeTable.entrySet()) {
+            String enodeAddr = entry.getValue().get(MixAll.MASTER_ID);
+            if (enodeAddr != null) {
+                try {
+                    RemotingCommand response = this.client.invokeSync(enodeAddr, remotingCommand, defaultTimeoutMills);
+                } catch (Exception ex) {
+                    log.warn("Send heart beat faild:{} ,ex:{}", enodeAddr, ex);
+                }
+            }
+        }
+    }
+
+    @Override
+    public RemotingCommand sendMessage(RemotingCommand request) {
+        try {
+            SendMessageRequestHeaderV2 sendMessageRequestHeaderV2 = (SendMessageRequestHeaderV2) request.decodeCommandCustomHeader(SendMessageRequestHeaderV2.class);
+            RemotingCommand response =
+                this.client.invokeSync(sendMessageRequestHeaderV2.getN(), request, defaultTimeoutMills);
+            return response;
+        } catch (Exception ex) {
+            log.error("Send message async error:", ex);
+        }
+        return null;
+    }
+
+    @Override
+    public RemotingCommand pullMessage(RemotingCommand request) {
+        try {
+            final PullMessageRequestHeader requestHeader =
+                (PullMessageRequestHeader) request.decodeCommandCustomHeader(PullMessageRequestHeader.class);
+            RemotingCommand remotingCommand =  this.client.invokeSync(requestHeader.getEnodeAddr(), request, 20 * defaultTimeoutMills);
+            log.info("Pull message response:{}", remotingCommand);
+            log.info("Pull message response:{}", remotingCommand.getBody().length);
+            return remotingCommand;
+        } catch (Exception ex) {
+            log.error("pull message async error:", ex);
+        }
+        return null;
+    }
+
+    @Override
+    public void saveSubscriptionData(RemotingCommand remotingCommand) {
+
+    }
+
+    @Override
+    public String fetchNameServerAddr() {
+        try {
+            String addrs = this.topAddressing.fetchNSAddr();
+            if (addrs != null) {
+                if (!addrs.equals(this.nameSrvAddr)) {
+                    log.info("name server address changed, old: {} new: {}", this.nameSrvAddr, addrs);
+                    this.updateNameServerAddressList(addrs);
+                    this.nameSrvAddr = addrs;
+                    return nameSrvAddr;
+                }
+            }
+        } catch (Exception e) {
+            log.error("fetchNameServerAddr Exception", e);
+        }
+        return nameSrvAddr;
+    }
+
+    private ClusterInfo getBrokerClusterInfo(
+        final long timeoutMillis) throws InterruptedException, RemotingTimeoutException,
+        RemotingSendRequestException, RemotingConnectException, MQBrokerException {
+        RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_BROKER_CLUSTER_INFO, null);
+        RemotingCommand response = this.client.invokeSync(null, request, timeoutMillis);
+        assert response != null;
+        switch (response.getCode()) {
+            case ResponseCode.SUCCESS: {
+                return ClusterInfo.decode(response.getBody(), ClusterInfo.class);
+            }
+            default:
+                break;
+        }
+        throw new MQBrokerException(response.getCode(), response.getRemark());
+    }
+
+    @Override
+    public void updateEnodeAddr(String clusterName) throws InterruptedException, RemotingTimeoutException,
+        RemotingSendRequestException, RemotingConnectException, MQBrokerException {
+        synchronized (this) {
+            ClusterInfo clusterInfo = getBrokerClusterInfo(defaultTimeoutMills);
+            if (clusterInfo != null) {
+                HashMap<String, Set<String>> brokerAddrs = clusterInfo.getClusterAddrTable();
+                for (Map.Entry<String, Set<String>> entry : brokerAddrs.entrySet()) {
+                    Set<String> brokerNames = entry.getValue();
+                    if (brokerNames != null) {
+                        for (String brokerName : brokerNames) {
+                            enodeTable.put(brokerName, clusterInfo.getBrokerAddrTable().get(brokerName).getBrokerAddrs());
+                        }
+                    }
+                }
+            }
+        }
+    }
+
+    public void updateNameServerAddressList(final String addrs) {
+        List<String> list = new ArrayList<String>();
+        String[] addrArray = addrs.split(";");
+        for (String addr : addrArray) {
+            list.add(addr);
+        }
+        this.client.updateNameServerAddressList(list);
+    }
+
+    public void registerSnode(SnodeConfig snodeConfig) {
+        List<String> nameServerAddressList = this.client.getNameServerAddressList();
+        RemotingCommand remotingCommand = new RemotingCommand();
+        RegisterSnodeRequestHeader requestHeader = new RegisterSnodeRequestHeader();
+        requestHeader.setSnodeAddr(snodeConfig.getSnodeAddr());
+        requestHeader.setSnodeName(snodeConfig.getSnodeName());
+        requestHeader.setClusterName(snodeConfig.getClusterName());
+        remotingCommand.setCustomHeader(requestHeader);
+        remotingCommand.setCode(RequestCode.REGISTER_SNODE);
+        if (nameServerAddressList != null && nameServerAddressList.size() > 0) {
+            for (String nameServer : nameServerAddressList) {
+                try {
+                    this.client.invokeSync(nameSrvAddr, remotingCommand, 3000L);
+                } catch (Exception ex) {
+                    log.warn("Register Snode to Nameserver addr: {} error, ex:{} ", nameServer, ex);
+                }
+            }
+        }
+    }
+
+    @Override
+    public void notifyConsumerIdsChanged(
+        final Channel channel,
+        final String consumerGroup) {
+        if (null == consumerGroup) {
+            log.error("notifyConsumerIdsChanged consumerGroup is null");
+            return;
+        }
+
+        NotifyConsumerIdsChangedRequestHeader requestHeader = new NotifyConsumerIdsChangedRequestHeader();
+        requestHeader.setConsumerGroup(consumerGroup);
+        RemotingCommand request =
+            RemotingCommand.createRequestCommand(RequestCode.NOTIFY_CONSUMER_IDS_CHANGED, requestHeader);
+
+        try {
+            this.snodeController.getSnodeServer().invokeOneway(channel, request, 10);
+        } catch (Exception e) {
+            log.error("notifyConsumerIdsChanged exception, " + consumerGroup, e.getMessage());
+        }
+    }
+
+    @Override
+    public RemotingCommand creatTopic(TopicConfig topicConfig) {
+//        CreateTopicRequestHeader requestHeader = new CreateTopicRequestHeader();
+//        requestHeader.setTopic(topicConfig.getTopicName());
+//        requestHeader.setDefaultTopic(defaultTopic);
+//        requestHeader.setReadQueueNums(topicConfig.getReadQueueNums());
+//        requestHeader.setWriteQueueNums(topicConfig.getWriteQueueNums());
+//        requestHeader.setPerm(topicConfig.getPerm());
+//        requestHeader.setTopicFilterType(topicConfig.getTopicFilterType().name());
+//        requestHeader.setTopicSysFlag(topicConfig.getTopicSysFlag());
+//        requestHeader.setOrder(topicConfig.isOrder());
+//
+//        RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.UPDATE_AND_CREATE_TOPIC, requestHeader);
+//
+//        RemotingCommand response = this.client.invokeSync(,
+//            request, defaultTimeoutMills);
+//        assert response != null;
+//        switch (response.getCode()) {
+//            case ResponseCode.SUCCESS: {
+//                return;
+//            }
+//            default:
+//                break;
+//        }
+        return null;
+    }
+}
diff --git a/rocketmq-snode/src/main/java/org/apache/rocketmq/snode/topic/TopicConfigManager.java b/rocketmq-snode/src/main/java/org/apache/rocketmq/snode/topic/TopicConfigManager.java
new file mode 100644
index 0000000..b8b6eb7
--- /dev/null
+++ b/rocketmq-snode/src/main/java/org/apache/rocketmq/snode/topic/TopicConfigManager.java
@@ -0,0 +1,19 @@
+package org.apache.rocketmq.snode.topic;/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+public class TopicConfigManager {
+}