You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by du...@apache.org on 2019/01/08 02:00:31 UTC

[rocketmq] branch snode updated: Add async push logic after send message succeed

This is an automated email from the ASF dual-hosted git repository.

duhengforever pushed a commit to branch snode
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/snode by this push:
     new 41366dd  Add async push logic after send message succeed
41366dd is described below

commit 41366dda32f72bc0be5e3d33ef064e0ce9e8902a
Author: duhenglucky <du...@gmail.com>
AuthorDate: Tue Jan 8 09:59:54 2019 +0800

    Add async push logic after send message succeed
---
 .../apache/rocketmq/broker/BrokerController.java   |  6 +-
 .../apache/rocketmq/broker/out/BrokerOuterAPI.java |  2 +-
 .../broker/processor/PullMessageProcessor.java     |  2 +-
 .../rocketmq/client/impl/MQClientAPIImpl.java      |  1 -
 .../client/impl/consumer/RebalancePushImpl.java    |  6 ++
 .../rocketmq/common/protocol/RequestCode.java      |  2 +
 .../protocol/header/PullMessageRequestHeader.java  | 10 +--
 .../common/protocol/heartbeat/ConsumerData.java    | 21 ++++++-
 .../subscription/SubscriptionGroupConfig.java      | 34 +++-------
 .../apache/rocketmq/namesrv/NamesrvController.java |  3 +-
 .../apache/rocketmq/remoting/RemotingClient.java   |  8 +--
 .../rocketmq/remoting/RemotingClientFactory.java   | 10 ++-
 .../rocketmq/remoting/RemotingServerFactory.java   |  9 ++-
 .../remoting}/interceptor/ExceptionContext.java    |  2 +-
 .../remoting}/interceptor/Interceptor.java         |  6 +-
 .../remoting}/interceptor/InterceptorFactory.java  | 17 ++---
 .../remoting}/interceptor/InterceptorGroup.java    | 14 ++---
 .../remoting}/interceptor/RequestContext.java      |  2 +-
 .../remoting}/interceptor/ResponseContext.java     |  2 +-
 .../remoting/netty/NettyRemotingAbstract.java      |  2 +-
 .../transport/NettyRemotingClientAbstract.java     | 12 +---
 .../remoting/transport/rocketmq/NettyDecoder.java  |  5 +-
 .../transport/rocketmq/NettyRemotingClient.java    |  1 -
 .../transport/rocketmq/NettyRemotingServer.java    |  6 +-
 .../org/apache/rocketmq/snode/SnodeController.java | 37 +++++------
 .../rocketmq/snode/client/ConsumerGroupInfo.java   |  1 -
 .../rocketmq/snode/client/ConsumerManager.java     |  5 +-
 .../rocketmq/snode/client/ProducerManager.java     |  4 --
 .../rocketmq/snode/client/PushSessionManager.java  | 56 -----------------
 .../snode/client/SubscriptionGroupManager.java     |  1 -
 .../apache/rocketmq/snode/config/SnodeConfig.java  | 12 ++++
 .../snode/processor/HeartbeatProcessor.java        |  7 ++-
 .../snode/processor/PullMessageProcessor.java      | 72 ++++++++++++++--------
 .../snode/processor/SendMessageProcessor.java      | 36 +++++++++--
 .../rocketmq/snode/service/EnodeService.java       |  8 ++-
 .../apache/rocketmq/snode/service/PushService.java |  6 +-
 .../snode/service/impl/EnodeServiceImpl.java       | 20 ++----
 .../snode/service/impl/PushServiceImpl.java        | 57 +++++++++--------
 .../snode/service/impl/ScheduledServiceImpl.java   |  2 +-
 39 files changed, 244 insertions(+), 263 deletions(-)

diff --git a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
index 61a5008..c12ebe1 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
@@ -248,14 +248,12 @@ public class BrokerController {
         result = result && this.messageStore.load();
 
         if (result) {
-            this.remotingServer = RemotingServerFactory.createInstance();
-            this.remotingServer.init(this.nettyServerConfig, this.clientHousekeepingService);
+            this.remotingServer = RemotingServerFactory.getInstance().createRemotingServer().init(this.nettyServerConfig, this.clientHousekeepingService);
 //            this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.clientHousekeepingService);
             ServerConfig fastConfig = (ServerConfig) this.nettyServerConfig.clone();
             fastConfig.setListenPort(nettyServerConfig.getListenPort() - 2);
 //            this.fastRemotingServer = new NettyRemotingServer(fastConfig, this.clientHousekeepingService);
-            this.fastRemotingServer = RemotingServerFactory.createInstance();
-            this.fastRemotingServer.init(fastConfig, this.clientHousekeepingService);
+            this.fastRemotingServer = RemotingServerFactory.getInstance().createRemotingServer().init(fastConfig, this.clientHousekeepingService);
 
             this.sendMessageExecutor = new BrokerFixedThreadPoolExecutor(
                 this.brokerConfig.getSendMessageThreadPoolNums(),
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java b/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java
index 2c204ce..4be411a 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java
@@ -71,7 +71,7 @@ public class BrokerOuterAPI {
     }
 
     public BrokerOuterAPI(final ClientConfig nettyClientConfig, RPCHook rpcHook) {
-        this.remotingClient = RemotingClientFactory.createInstance().init(nettyClientConfig, null);
+        this.remotingClient = RemotingClientFactory.getInstance().createRemotingClient().init(nettyClientConfig, null);
         this.remotingClient.registerRPCHook(rpcHook);
     }
 
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java
index 391b599..60b9ad8 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java
@@ -100,7 +100,7 @@ public class PullMessageProcessor implements RequestProcessor {
 
         response.setOpaque(request.getOpaque());
 
-        log.debug("receive PullMessage request command, {}", request);
+        log.info("receive PullMessage request command, {}", request);
 
         if (!PermName.isReadable(this.brokerController.getBrokerConfig().getBrokerPermission())) {
             response.setCode(ResponseCode.NO_PERMISSION);
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
index 39f1b61..5e009fd 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
@@ -562,7 +562,6 @@ public class MQClientAPIImpl {
         final CommunicationMode communicationMode,
         final PullCallback pullCallback
     ) throws RemotingException, MQBrokerException, InterruptedException {
-        requestHeader.setEnodeAddr(addr);
         RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.SNODE_PULL_MESSAGE, requestHeader);
         switch (communicationMode) {
             case ONEWAY:
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalancePushImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalancePushImpl.java
index e5166f3..79e1a64 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalancePushImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalancePushImpl.java
@@ -16,6 +16,7 @@
  */
 package org.apache.rocketmq.client.impl.consumer;
 
+import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
@@ -58,6 +59,11 @@ public class RebalancePushImpl extends RebalanceImpl {
         log.info("{} Rebalance changed, also update version: {}, {}", topic, subscriptionData.getSubVersion(), newVersion);
         subscriptionData.setSubVersion(newVersion);
 
+        Set<Integer> queueIdSet = new HashSet<Integer>();
+        for (MessageQueue messageQueue : mqAll) {
+            queueIdSet.add(messageQueue.getQueueId());
+        }
+        subscriptionData.setQueueIdSet(queueIdSet);
         int currentQueueCount = this.processQueueTable.size();
         if (currentQueueCount != 0) {
             int pullThresholdForTopic = this.defaultMQPushConsumerImpl.getDefaultMQPushConsumer().getPullThresholdForTopic();
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/RequestCode.java b/common/src/main/java/org/apache/rocketmq/common/protocol/RequestCode.java
index b36bdb2..f9b097f 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/RequestCode.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/RequestCode.java
@@ -172,4 +172,6 @@ public class RequestCode {
 
     public static final int SNODE_PULL_MESSAGE = 351;
 
+    public static final int SNODE_PUSH_MESSAGE = 352;
+
 }
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/PullMessageRequestHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/PullMessageRequestHeader.java
index 8332307..53be8b2 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/header/PullMessageRequestHeader.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/PullMessageRequestHeader.java
@@ -48,14 +48,14 @@ public class PullMessageRequestHeader implements CommandCustomHeader {
     private Long subVersion;
     private String expressionType;
 
-    private String enodeAddr;
+    private String enodeName;
 
-    public String getEnodeAddr() {
-        return enodeAddr;
+    public String getEnodeName() {
+        return enodeName;
     }
 
-    public void setEnodeAddr(String enodeAddr) {
-        this.enodeAddr = enodeAddr;
+    public void setEnodeName(String enodeName) {
+        this.enodeName = enodeName;
     }
 
     @Override
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/heartbeat/ConsumerData.java b/common/src/main/java/org/apache/rocketmq/common/protocol/heartbeat/ConsumerData.java
index d4605d0..505eeed 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/heartbeat/ConsumerData.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/heartbeat/ConsumerData.java
@@ -31,6 +31,7 @@ public class ConsumerData {
     private ConsumeFromWhere consumeFromWhere;
     private Set<SubscriptionData> subscriptionDataSet = new HashSet<SubscriptionData>();
     private boolean unitMode;
+    private boolean realPushEnable = true;
 
     public String getGroupName() {
         return groupName;
@@ -80,10 +81,24 @@ public class ConsumerData {
         this.unitMode = isUnitMode;
     }
 
+    public boolean isRealPushEnable() {
+        return realPushEnable;
+    }
+
+    public void setRealPushEnable(boolean realPushEnable) {
+        this.realPushEnable = realPushEnable;
+    }
+
     @Override
     public String toString() {
-        return "ConsumerData [groupName=" + groupName + ", consumeType=" + consumeType + ", messageModel="
-            + messageModel + ", consumeFromWhere=" + consumeFromWhere + ", unitMode=" + unitMode
-            + ", subscriptionDataSet=" + subscriptionDataSet + "]";
+        return "ConsumerData{" +
+            "groupName='" + groupName + '\'' +
+            ", consumeType=" + consumeType +
+            ", messageModel=" + messageModel +
+            ", consumeFromWhere=" + consumeFromWhere +
+            ", subscriptionDataSet=" + subscriptionDataSet +
+            ", unitMode=" + unitMode +
+            ", realPushEnable=" + realPushEnable +
+            '}';
     }
 }
diff --git a/common/src/main/java/org/apache/rocketmq/common/subscription/SubscriptionGroupConfig.java b/common/src/main/java/org/apache/rocketmq/common/subscription/SubscriptionGroupConfig.java
index 158b025..8f4703f 100644
--- a/common/src/main/java/org/apache/rocketmq/common/subscription/SubscriptionGroupConfig.java
+++ b/common/src/main/java/org/apache/rocketmq/common/subscription/SubscriptionGroupConfig.java
@@ -38,8 +38,6 @@ public class SubscriptionGroupConfig {
 
     private boolean notifyConsumerIdsChangedEnable = true;
 
-    private boolean realPushEnable = false;
-
     public String getGroupName() {
         return groupName;
     }
@@ -112,14 +110,6 @@ public class SubscriptionGroupConfig {
         this.notifyConsumerIdsChangedEnable = notifyConsumerIdsChangedEnable;
     }
 
-    public boolean isRealPushEnable() {
-        return realPushEnable;
-    }
-
-    public void setRealPushEnable(boolean realPushEnable) {
-        this.realPushEnable = realPushEnable;
-    }
-
     @Override
     public int hashCode() {
         final int prime = 31;
@@ -128,7 +118,6 @@ public class SubscriptionGroupConfig {
         result = prime * result + (consumeBroadcastEnable ? 1231 : 1237);
         result = prime * result + (consumeEnable ? 1231 : 1237);
         result = prime * result + (consumeFromMinEnable ? 1231 : 1237);
-        result = prime * result + (realPushEnable ? 1231 : 1237);
         result = prime * result + (notifyConsumerIdsChangedEnable ? 1231 : 1237);
         result = prime * result + ((groupName == null) ? 0 : groupName.hashCode());
         result = prime * result + retryMaxTimes;
@@ -155,8 +144,6 @@ public class SubscriptionGroupConfig {
             return false;
         if (consumeFromMinEnable != other.consumeFromMinEnable)
             return false;
-        if (realPushEnable != other.realPushEnable)
-            return false;
         if (groupName == null) {
             if (other.groupName != null)
                 return false;
@@ -173,18 +160,13 @@ public class SubscriptionGroupConfig {
         return true;
     }
 
-    @Override public String toString() {
-        return "SubscriptionGroupConfig{" +
-            "groupName='" + groupName + '\'' +
-            ", consumeEnable=" + consumeEnable +
-            ", consumeFromMinEnable=" + consumeFromMinEnable +
-            ", consumeBroadcastEnable=" + consumeBroadcastEnable +
-            ", retryQueueNums=" + retryQueueNums +
-            ", retryMaxTimes=" + retryMaxTimes +
-            ", brokerId=" + brokerId +
-            ", whichBrokerWhenConsumeSlowly=" + whichBrokerWhenConsumeSlowly +
-            ", notifyConsumerIdsChangedEnable=" + notifyConsumerIdsChangedEnable +
-            ", realPushEnable=" + realPushEnable +
-            '}';
+    @Override
+    public String toString() {
+        return "SubscriptionGroupConfig [groupName=" + groupName + ", consumeEnable=" + consumeEnable
+            + ", consumeFromMinEnable=" + consumeFromMinEnable + ", consumeBroadcastEnable="
+            + consumeBroadcastEnable + ", retryQueueNums=" + retryQueueNums + ", retryMaxTimes="
+            + retryMaxTimes + ", brokerId=" + brokerId + ", whichBrokerWhenConsumeSlowly="
+            + whichBrokerWhenConsumeSlowly + ", notifyConsumerIdsChangedEnable="
+            + notifyConsumerIdsChangedEnable + "]";
     }
 }
diff --git a/namesrv/src/main/java/org/apache/rocketmq/namesrv/NamesrvController.java b/namesrv/src/main/java/org/apache/rocketmq/namesrv/NamesrvController.java
index e833ae6..740d502 100644
--- a/namesrv/src/main/java/org/apache/rocketmq/namesrv/NamesrvController.java
+++ b/namesrv/src/main/java/org/apache/rocketmq/namesrv/NamesrvController.java
@@ -77,9 +77,8 @@ public class NamesrvController {
 
         this.kvConfigManager.load();
 
-        this.remotingServer = RemotingServerFactory.createInstance();
+        this.remotingServer = RemotingServerFactory.getInstance().createRemotingServer();
         this.remotingServer.init(this.nettyServerConfig, this.brokerHousekeepingService);
-//        this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.brokerHousekeepingService);
 
         this.remotingExecutor =
             Executors.newFixedThreadPool(nettyServerConfig.getServerWorkerThreads(), new ThreadFactoryImpl("RemotingExecutorThread_"));
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/RemotingClient.java b/remoting/src/main/java/org/apache/rocketmq/remoting/RemotingClient.java
index 84d4241..bc9b61a 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/RemotingClient.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/RemotingClient.java
@@ -26,19 +26,19 @@ import org.apache.rocketmq.remoting.protocol.RemotingCommand;
 
 public interface RemotingClient extends RemotingService {
 
-    void updateNameServerAddressList(final List<String> addrs);
+    void updateNameServerAddressList(final List<String> addresses);
 
     List<String> getNameServerAddressList();
 
-    RemotingCommand invokeSync(final String addr, final RemotingCommand request,
+    RemotingCommand invokeSync(final String address, final RemotingCommand request,
         final long timeoutMillis) throws InterruptedException, RemotingConnectException,
         RemotingSendRequestException, RemotingTimeoutException;
 
-    void invokeAsync(final String addr, final RemotingCommand request, final long timeoutMillis,
+    void invokeAsync(final String address, final RemotingCommand request, final long timeoutMillis,
         final InvokeCallback invokeCallback) throws InterruptedException, RemotingConnectException,
         RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException;
 
-    void invokeOneway(final String addr, final RemotingCommand request, final long timeoutMillis)
+    void invokeOneway(final String address, final RemotingCommand request, final long timeoutMillis)
         throws InterruptedException, RemotingConnectException, RemotingTooMuchRequestException,
         RemotingTimeoutException, RemotingSendRequestException;
 
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/RemotingClientFactory.java b/remoting/src/main/java/org/apache/rocketmq/remoting/RemotingClientFactory.java
index 4df2a70..b5e5a91 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/RemotingClientFactory.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/RemotingClientFactory.java
@@ -24,7 +24,11 @@ import org.apache.rocketmq.remoting.common.RemotingUtil;
 import org.apache.rocketmq.remoting.util.ServiceProvider;
 
 public class RemotingClientFactory {
-    private static final InternalLogger log = InternalLoggerFactory.getLogger(RemotingHelper.ROCKETMQ_REMOTING);
+    private static RemotingClientFactory instance = new RemotingClientFactory();
+
+    public static RemotingClientFactory getInstance(){
+        return instance;
+    }
 
     private RemotingClientFactory() {
     }
@@ -37,11 +41,11 @@ public class RemotingClientFactory {
         paths = ServiceProvider.loadPath(CLIENT_LOCATION);
     }
 
-    public static RemotingClient createInstance(String protocol) {
+    public RemotingClient createRemotingClient(String protocol) {
         return ServiceProvider.createInstance(paths.get(protocol), RemotingClient.class);
     }
 
-    public static RemotingClient createInstance() {
+    public RemotingClient createRemotingClient() {
         return ServiceProvider.createInstance(paths.get(RemotingUtil.DEFAULT_PROTOCOL), RemotingClient.class);
     }
 }
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/RemotingServerFactory.java b/remoting/src/main/java/org/apache/rocketmq/remoting/RemotingServerFactory.java
index 6dbf22a..231e19a 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/RemotingServerFactory.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/RemotingServerFactory.java
@@ -24,8 +24,11 @@ import org.apache.rocketmq.remoting.common.RemotingUtil;
 import org.apache.rocketmq.remoting.util.ServiceProvider;
 
 public class RemotingServerFactory {
+    private static RemotingServerFactory instance = new RemotingServerFactory();
 
-    private static final InternalLogger log = InternalLoggerFactory.getLogger(RemotingHelper.ROCKETMQ_REMOTING);
+    public static RemotingServerFactory getInstance() {
+        return instance;
+    }
 
     private RemotingServerFactory() {
     }
@@ -38,11 +41,11 @@ public class RemotingServerFactory {
         protocolPathMap = ServiceProvider.loadPath(SERVER_LOCATION);
     }
 
-    public static RemotingServer createInstance(String protocol) {
+    public RemotingServer createRemotingServer(String protocol) {
         return ServiceProvider.createInstance(protocolPathMap.get(protocol), RemotingClient.class);
     }
 
-    public static RemotingServer createInstance() {
+    public RemotingServer createRemotingServer() {
         return ServiceProvider.createInstance(protocolPathMap.get(RemotingUtil.DEFAULT_PROTOCOL), RemotingServer.class);
     }
 }
diff --git a/snode/src/main/java/org/apache/rocketmq/snode/interceptor/ExceptionContext.java b/remoting/src/main/java/org/apache/rocketmq/remoting/interceptor/ExceptionContext.java
similarity index 96%
rename from snode/src/main/java/org/apache/rocketmq/snode/interceptor/ExceptionContext.java
rename to remoting/src/main/java/org/apache/rocketmq/remoting/interceptor/ExceptionContext.java
index 192f249..bfb9d23 100644
--- a/snode/src/main/java/org/apache/rocketmq/snode/interceptor/ExceptionContext.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/interceptor/ExceptionContext.java
@@ -14,7 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.rocketmq.snode.interceptor;
+package org.apache.rocketmq.remoting.interceptor;
 import org.apache.rocketmq.remoting.RemotingChannel;
 import org.apache.rocketmq.remoting.protocol.RemotingCommand;
 
diff --git a/snode/src/main/java/org/apache/rocketmq/snode/interceptor/Interceptor.java b/remoting/src/main/java/org/apache/rocketmq/remoting/interceptor/Interceptor.java
similarity index 84%
rename from snode/src/main/java/org/apache/rocketmq/snode/interceptor/Interceptor.java
rename to remoting/src/main/java/org/apache/rocketmq/remoting/interceptor/Interceptor.java
index 18cf6b3..5baee83 100644
--- a/snode/src/main/java/org/apache/rocketmq/snode/interceptor/Interceptor.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/interceptor/Interceptor.java
@@ -14,11 +14,11 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.rocketmq.snode.interceptor;
+package org.apache.rocketmq.remoting.interceptor;
 public interface Interceptor {
-    void beforeSendMessage(RequestContext requestContext);
+    void beforeRequest(RequestContext requestContext);
 
-    void afterSendMessage(ResponseContext responseContext);
+    void afterRequest(ResponseContext responseContext);
 
     void onException(ExceptionContext exceptionContext);
 }
diff --git a/snode/src/main/java/org/apache/rocketmq/snode/interceptor/InterceptorFactory.java b/remoting/src/main/java/org/apache/rocketmq/remoting/interceptor/InterceptorFactory.java
similarity index 59%
rename from snode/src/main/java/org/apache/rocketmq/snode/interceptor/InterceptorFactory.java
rename to remoting/src/main/java/org/apache/rocketmq/remoting/interceptor/InterceptorFactory.java
index e2b4332..16d5170 100644
--- a/snode/src/main/java/org/apache/rocketmq/snode/interceptor/InterceptorFactory.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/interceptor/InterceptorFactory.java
@@ -14,7 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.rocketmq.snode.interceptor;
+package org.apache.rocketmq.remoting.interceptor;
 
 import java.util.List;
 import org.apache.rocketmq.remoting.util.ServiceProvider;
@@ -26,21 +26,12 @@ public class InterceptorFactory {
         return ourInstance;
     }
 
-    private final String SEND_MESSAGE_INTERCEPTOR = "META-INF/service/org.apache.rocketmq.snode.interceptor.SendMessageInterceptor";
-
-    private final String CONSUME_MESSAGE_INTERCEPTOR = "META-INF/service/org.apache.rocketmq.snode.interceptor.ConsumeMessageInterceptor";
-
     private InterceptorFactory() {
     }
 
-    public List loadConsumeMessageInterceptors() {
-        List<Interceptor> consumeMessageInterceptors = ServiceProvider.loadServiceList(CONSUME_MESSAGE_INTERCEPTOR, Interceptor.class);
-        return consumeMessageInterceptors;
-    }
-
-    public List loadSendMessageInterceptors() {
-        List<Interceptor> sendMessageInterceptors = ServiceProvider.loadServiceList(SEND_MESSAGE_INTERCEPTOR, Interceptor.class);
-        return sendMessageInterceptors;
+    public List loadInterceptors(String servicePath) {
+        List<Interceptor> interceptors = ServiceProvider.loadServiceList(servicePath, Interceptor.class);
+        return interceptors;
     }
 
 }
diff --git a/snode/src/main/java/org/apache/rocketmq/snode/interceptor/InterceptorGroup.java b/remoting/src/main/java/org/apache/rocketmq/remoting/interceptor/InterceptorGroup.java
similarity index 77%
rename from snode/src/main/java/org/apache/rocketmq/snode/interceptor/InterceptorGroup.java
rename to remoting/src/main/java/org/apache/rocketmq/remoting/interceptor/InterceptorGroup.java
index fcee7a5..de80021 100644
--- a/snode/src/main/java/org/apache/rocketmq/snode/interceptor/InterceptorGroup.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/interceptor/InterceptorGroup.java
@@ -14,28 +14,28 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.rocketmq.snode.interceptor;
+package org.apache.rocketmq.remoting.interceptor;
 import java.util.ArrayList;
 import java.util.List;
 
 public class InterceptorGroup {
-    private List<Interceptor> interceptors = new ArrayList<>();
+    private List<Interceptor> interceptors = new ArrayList<Interceptor>();
 
-    public void registerInterceptor(Interceptor sendMessageInterceptor) {
-        if (sendMessageInterceptor != null) {
-            interceptors.add(sendMessageInterceptor);
+    public void registerInterceptor(Interceptor interceptor) {
+        if (interceptor != null) {
+            interceptors.add(interceptor);
         }
     }
 
     public void beforeRequest(RequestContext requestContext) {
         for (Interceptor interceptor : interceptors) {
-            interceptor.beforeSendMessage(requestContext);
+            interceptor.beforeRequest(requestContext);
         }
     }
 
     public void afterRequest(ResponseContext responseContext) {
         for (Interceptor interceptor : interceptors) {
-            interceptor.afterSendMessage(responseContext);
+            interceptor.afterRequest(responseContext);
         }
     }
 
diff --git a/snode/src/main/java/org/apache/rocketmq/snode/interceptor/RequestContext.java b/remoting/src/main/java/org/apache/rocketmq/remoting/interceptor/RequestContext.java
similarity index 96%
rename from snode/src/main/java/org/apache/rocketmq/snode/interceptor/RequestContext.java
rename to remoting/src/main/java/org/apache/rocketmq/remoting/interceptor/RequestContext.java
index 5f32e6b..a0fffbb 100644
--- a/snode/src/main/java/org/apache/rocketmq/snode/interceptor/RequestContext.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/interceptor/RequestContext.java
@@ -14,7 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.rocketmq.snode.interceptor;
+package org.apache.rocketmq.remoting.interceptor;
 import org.apache.rocketmq.remoting.RemotingChannel;
 import org.apache.rocketmq.remoting.protocol.RemotingCommand;
 
diff --git a/snode/src/main/java/org/apache/rocketmq/snode/interceptor/ResponseContext.java b/remoting/src/main/java/org/apache/rocketmq/remoting/interceptor/ResponseContext.java
similarity index 96%
rename from snode/src/main/java/org/apache/rocketmq/snode/interceptor/ResponseContext.java
rename to remoting/src/main/java/org/apache/rocketmq/remoting/interceptor/ResponseContext.java
index 9901981..a643b44 100644
--- a/snode/src/main/java/org/apache/rocketmq/snode/interceptor/ResponseContext.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/interceptor/ResponseContext.java
@@ -14,7 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.rocketmq.snode.interceptor;
+package org.apache.rocketmq.remoting.interceptor;
 import org.apache.rocketmq.remoting.RemotingChannel;
 import org.apache.rocketmq.remoting.protocol.RemotingCommand;
 
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java
index 1e0235d..1555959 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java
@@ -468,7 +468,7 @@ public abstract class NettyRemotingAbstract {
     public void invokeAsyncImpl(final String addr, final Channel currentChannel, final RemotingCommand request,
         final long timeoutMillis,
         final InvokeCallback invokeCallback)
-        throws InterruptedException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException {
+        throws InterruptedException, RemotingTooMuchRequestException, RemotingTimeoutException {
         final long beginStartTime = System.currentTimeMillis();
         boolean acquired = semaphoreAsync.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS);
         if (acquired) {
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/transport/NettyRemotingClientAbstract.java b/remoting/src/main/java/org/apache/rocketmq/remoting/transport/NettyRemotingClientAbstract.java
index 957b053..b61fc63 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/transport/NettyRemotingClientAbstract.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/transport/NettyRemotingClientAbstract.java
@@ -29,10 +29,8 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Random;
-import java.util.Timer;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
@@ -40,17 +38,11 @@ import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 import org.apache.rocketmq.logging.InternalLogger;
 import org.apache.rocketmq.logging.InternalLoggerFactory;
-import org.apache.rocketmq.remoting.InvokeCallback;
 import org.apache.rocketmq.remoting.common.RemotingHelper;
 import org.apache.rocketmq.remoting.common.RemotingUtil;
-import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
-import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
-import org.apache.rocketmq.remoting.exception.RemotingTooMuchRequestException;
 import org.apache.rocketmq.remoting.netty.NettyEvent;
 import org.apache.rocketmq.remoting.netty.NettyEventType;
 import org.apache.rocketmq.remoting.netty.NettyRemotingAbstract;
-import org.apache.rocketmq.remoting.protocol.RemotingCommand;
-import org.apache.rocketmq.remoting.util.ThreadUtils;
 
 public abstract class NettyRemotingClientAbstract extends NettyRemotingAbstract {
     private static final InternalLogger log = InternalLoggerFactory.getLogger(RemotingHelper.ROCKETMQ_REMOTING);
@@ -179,7 +171,7 @@ public abstract class NettyRemotingClientAbstract extends NettyRemotingAbstract
     @Override
     protected Channel getAndCreateChannel(final String addr, long timeout) throws InterruptedException {
         if (null == addr) {
-            return getAndCreateNameserverChannel(timeout);
+            return getAndCreateNameServerChannel(timeout);
         }
 
         ChannelWrapper cw = this.channelTables.get(addr);
@@ -190,7 +182,7 @@ public abstract class NettyRemotingClientAbstract extends NettyRemotingAbstract
         return this.createChannel(addr, timeout);
     }
 
-    private Channel getAndCreateNameserverChannel(long timeout) throws InterruptedException {
+    private Channel getAndCreateNameServerChannel(long timeout) throws InterruptedException {
         String addr = this.namesrvAddrChoosed.get();
         if (addr != null) {
             ChannelWrapper cw = this.channelTables.get(addr);
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/transport/rocketmq/NettyDecoder.java b/remoting/src/main/java/org/apache/rocketmq/remoting/transport/rocketmq/NettyDecoder.java
index 70858a9..9240630 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/transport/rocketmq/NettyDecoder.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/transport/rocketmq/NettyDecoder.java
@@ -20,12 +20,11 @@ import io.netty.buffer.ByteBuf;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
 import java.nio.ByteBuffer;
-import org.apache.rocketmq.remoting.common.RemotingHelper;
-import org.apache.rocketmq.remoting.common.RemotingUtil;
 import org.apache.rocketmq.logging.InternalLogger;
 import org.apache.rocketmq.logging.InternalLoggerFactory;
+import org.apache.rocketmq.remoting.common.RemotingHelper;
+import org.apache.rocketmq.remoting.common.RemotingUtil;
 import org.apache.rocketmq.remoting.netty.CodecHelper;
-import org.apache.rocketmq.remoting.protocol.RemotingCommand;
 
 public class NettyDecoder extends LengthFieldBasedFrameDecoder {
     private static final InternalLogger log = InternalLoggerFactory.getLogger(RemotingHelper.ROCKETMQ_REMOTING);
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/transport/rocketmq/NettyRemotingClient.java b/remoting/src/main/java/org/apache/rocketmq/remoting/transport/rocketmq/NettyRemotingClient.java
index a5790df..5e91b7c 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/transport/rocketmq/NettyRemotingClient.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/transport/rocketmq/NettyRemotingClient.java
@@ -61,7 +61,6 @@ public class NettyRemotingClient extends NettyRemotingClientAbstract implements
 
     private ExecutorService publicExecutor;
 
-    private ExecutorService asyncExecutor;
     /**
      * Invoke the callback methods in this executor when process response.
      */
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/transport/rocketmq/NettyRemotingServer.java b/remoting/src/main/java/org/apache/rocketmq/remoting/transport/rocketmq/NettyRemotingServer.java
index 8f81ac2..f7ff842 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/transport/rocketmq/NettyRemotingServer.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/transport/rocketmq/NettyRemotingServer.java
@@ -20,7 +20,6 @@ import io.netty.bootstrap.ServerBootstrap;
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.PooledByteBufAllocator;
 import io.netty.channel.ChannelFuture;
-import io.netty.channel.ChannelFutureListener;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.ChannelInitializer;
 import io.netty.channel.ChannelOption;
@@ -46,6 +45,8 @@ import org.apache.rocketmq.remoting.InvokeCallback;
 import org.apache.rocketmq.remoting.RPCHook;
 import org.apache.rocketmq.remoting.RemotingChannel;
 import org.apache.rocketmq.remoting.RemotingServer;
+import org.apache.rocketmq.remoting.RequestProcessor;
+import org.apache.rocketmq.remoting.ServerConfig;
 import org.apache.rocketmq.remoting.common.Pair;
 import org.apache.rocketmq.remoting.common.RemotingHelper;
 import org.apache.rocketmq.remoting.common.TlsMode;
@@ -53,10 +54,7 @@ import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
 import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
 import org.apache.rocketmq.remoting.exception.RemotingTooMuchRequestException;
 import org.apache.rocketmq.remoting.netty.FileRegionEncoder;
-import org.apache.rocketmq.remoting.netty.NettyChannelHandlerContextImpl;
 import org.apache.rocketmq.remoting.netty.NettyChannelImpl;
-import org.apache.rocketmq.remoting.RequestProcessor;
-import org.apache.rocketmq.remoting.ServerConfig;
 import org.apache.rocketmq.remoting.netty.TlsHelper;
 import org.apache.rocketmq.remoting.netty.TlsSystemConfig;
 import org.apache.rocketmq.remoting.protocol.RemotingCommand;
diff --git a/snode/src/main/java/org/apache/rocketmq/snode/SnodeController.java b/snode/src/main/java/org/apache/rocketmq/snode/SnodeController.java
index f61b662..944277e 100644
--- a/snode/src/main/java/org/apache/rocketmq/snode/SnodeController.java
+++ b/snode/src/main/java/org/apache/rocketmq/snode/SnodeController.java
@@ -28,23 +28,22 @@ import org.apache.rocketmq.common.protocol.RequestCode;
 import org.apache.rocketmq.common.utils.ThreadUtils;
 import org.apache.rocketmq.logging.InternalLogger;
 import org.apache.rocketmq.logging.InternalLoggerFactory;
+import org.apache.rocketmq.remoting.ClientConfig;
 import org.apache.rocketmq.remoting.RemotingClient;
 import org.apache.rocketmq.remoting.RemotingClientFactory;
 import org.apache.rocketmq.remoting.RemotingServer;
 import org.apache.rocketmq.remoting.RemotingServerFactory;
-import org.apache.rocketmq.remoting.ClientConfig;
 import org.apache.rocketmq.remoting.ServerConfig;
 import org.apache.rocketmq.snode.client.ClientHousekeepingService;
 import org.apache.rocketmq.snode.client.ConsumerIdsChangeListener;
 import org.apache.rocketmq.snode.client.ConsumerManager;
 import org.apache.rocketmq.snode.client.DefaultConsumerIdsChangeListener;
 import org.apache.rocketmq.snode.client.ProducerManager;
-import org.apache.rocketmq.snode.client.PushSessionManager;
 import org.apache.rocketmq.snode.client.SubscriptionGroupManager;
 import org.apache.rocketmq.snode.config.SnodeConfig;
-import org.apache.rocketmq.snode.interceptor.InterceptorFactory;
-import org.apache.rocketmq.snode.interceptor.InterceptorGroup;
-import org.apache.rocketmq.snode.interceptor.Interceptor;
+import org.apache.rocketmq.remoting.interceptor.Interceptor;
+import org.apache.rocketmq.remoting.interceptor.InterceptorFactory;
+import org.apache.rocketmq.remoting.interceptor.InterceptorGroup;
 import org.apache.rocketmq.snode.offset.ConsumerOffsetManager;
 import org.apache.rocketmq.snode.processor.ConsumerManageProcessor;
 import org.apache.rocketmq.snode.processor.HeartbeatProcessor;
@@ -86,7 +85,6 @@ public class SnodeController {
     private HeartbeatProcessor hearbeatProcessor;
     private InterceptorGroup consumeMessageInterceptorGroup;
     private InterceptorGroup sendMessageInterceptorGroup;
-    private PushSessionManager pushSessionManager;
     private PushService pushService;
 
     private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl(
@@ -101,14 +99,14 @@ public class SnodeController {
         this.enodeService = new EnodeServiceImpl(this);
         this.nnodeService = new NnodeServiceImpl(this);
         this.scheduledService = new ScheduledServiceImpl(this);
-        this.remotingClient = RemotingClientFactory.createInstance().init(this.getNettyClientConfig(), null);
+        this.remotingClient = RemotingClientFactory.getInstance().createRemotingClient().init(this.getNettyClientConfig(), null);
 
         this.sendMessageExecutor = ThreadUtils.newThreadPoolExecutor(
             snodeConfig.getSnodeSendMessageMinPoolSize(),
             snodeConfig.getSnodeSendMessageMaxPoolSize(),
             3000,
             TimeUnit.MILLISECONDS,
-            new ArrayBlockingQueue<Runnable>(snodeConfig.getSnodeSendThreadPoolQueueCapacity()),
+            new ArrayBlockingQueue<>(snodeConfig.getSnodeSendThreadPoolQueueCapacity()),
             "SnodeSendMessageThread",
             false);
 
@@ -117,7 +115,7 @@ public class SnodeController {
             snodeConfig.getSnodeSendMessageMaxPoolSize(),
             3000,
             TimeUnit.MILLISECONDS,
-            new ArrayBlockingQueue<Runnable>(snodeConfig.getSnodeSendThreadPoolQueueCapacity()),
+            new ArrayBlockingQueue<>(snodeConfig.getSnodeSendThreadPoolQueueCapacity()),
             "SnodePullMessageThread",
             false);
 
@@ -126,7 +124,7 @@ public class SnodeController {
             snodeConfig.getSnodeHeartBeatMaxPoolSize(),
             1000 * 60,
             TimeUnit.MILLISECONDS,
-            new ArrayBlockingQueue<Runnable>(snodeConfig.getSnodeHeartBeatThreadPoolQueueCapacity()),
+            new ArrayBlockingQueue<>(snodeConfig.getSnodeHeartBeatThreadPoolQueueCapacity()),
             "SnodeHeartbeatThread",
             true);
 
@@ -135,7 +133,7 @@ public class SnodeController {
             snodeConfig.getSnodeSendMessageMaxPoolSize(),
             3000,
             TimeUnit.MILLISECONDS,
-            new ArrayBlockingQueue<Runnable>(snodeConfig.getSnodeSendThreadPoolQueueCapacity()),
+            new ArrayBlockingQueue<>(snodeConfig.getSnodeSendThreadPoolQueueCapacity()),
             "SnodePullMessageThread",
             false);
 
@@ -144,7 +142,7 @@ public class SnodeController {
             snodeConfig.getSnodeSendMessageMaxPoolSize(),
             3000,
             TimeUnit.MILLISECONDS,
-            new ArrayBlockingQueue<Runnable>(snodeConfig.getSnodeSendThreadPoolQueueCapacity()),
+            new ArrayBlockingQueue<>(snodeConfig.getSnodeSendThreadPoolQueueCapacity()),
             "ConsumerManagerThread",
             false);
 
@@ -164,7 +162,6 @@ public class SnodeController {
         this.sendMessageProcessor = new SendMessageProcessor(this);
         this.hearbeatProcessor = new HeartbeatProcessor(this);
         this.pullMessageProcessor = new PullMessageProcessor(this);
-        this.pushSessionManager = new PushSessionManager();
         this.pushService = new PushServiceImpl(this);
 
     }
@@ -174,22 +171,22 @@ public class SnodeController {
     }
 
     public boolean initialize() {
-        this.snodeServer = RemotingServerFactory.createInstance().init(this.nettyServerConfig, this.clientHousekeepingService);
+        this.snodeServer = RemotingServerFactory.getInstance().createRemotingServer().init(this.nettyServerConfig, this.clientHousekeepingService);
         this.registerProcessor();
         initInterceptorGroup();
         return true;
     }
 
     private void initInterceptorGroup() {
-        List<Interceptor> consumeMessageInterceptors = InterceptorFactory.getInstance().loadConsumeMessageInterceptors();
-        if (consumeMessageInterceptors != null) {
+        List<Interceptor> consumeMessageInterceptors = InterceptorFactory.getInstance().loadInterceptors(this.snodeConfig.getConsumeMessageInterceptorPath());
+        if (consumeMessageInterceptors != null && consumeMessageInterceptors.size() > 0) {
             this.consumeMessageInterceptorGroup = new InterceptorGroup();
             for (Interceptor interceptor : consumeMessageInterceptors) {
                 this.consumeMessageInterceptorGroup.registerInterceptor(interceptor);
             }
         }
-        List<Interceptor> sendMessageInterceptors = InterceptorFactory.getInstance().loadSendMessageInterceptors();
-        if (sendMessageInterceptors != null) {
+        List<Interceptor> sendMessageInterceptors = InterceptorFactory.getInstance().loadInterceptors(this.snodeConfig.getSendMessageInterceptorPath());
+        if (sendMessageInterceptors != null && sendMessageInterceptors.size() > 0) {
             this.sendMessageInterceptorGroup = new InterceptorGroup();
             for (Interceptor interceptor : sendMessageInterceptors) {
                 this.sendMessageInterceptorGroup.registerInterceptor(interceptor);
@@ -275,10 +272,6 @@ public class SnodeController {
         return sendMessageInterceptorGroup;
     }
 
-    public PushSessionManager getPushSessionManager() {
-        return pushSessionManager;
-    }
-
     public PushService getPushService() {
         return pushService;
     }
diff --git a/snode/src/main/java/org/apache/rocketmq/snode/client/ConsumerGroupInfo.java b/snode/src/main/java/org/apache/rocketmq/snode/client/ConsumerGroupInfo.java
index c3cd21f..c8140c5 100644
--- a/snode/src/main/java/org/apache/rocketmq/snode/client/ConsumerGroupInfo.java
+++ b/snode/src/main/java/org/apache/rocketmq/snode/client/ConsumerGroupInfo.java
@@ -16,7 +16,6 @@
  */
 package org.apache.rocketmq.snode.client;
 
-import java.nio.channels.Channel;
 import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
diff --git a/snode/src/main/java/org/apache/rocketmq/snode/client/ConsumerManager.java b/snode/src/main/java/org/apache/rocketmq/snode/client/ConsumerManager.java
index f44da69..dc3bb02 100644
--- a/snode/src/main/java/org/apache/rocketmq/snode/client/ConsumerManager.java
+++ b/snode/src/main/java/org/apache/rocketmq/snode/client/ConsumerManager.java
@@ -85,8 +85,7 @@ public class ConsumerManager {
                 if (info.getChannelInfoTable().isEmpty()) {
                     ConsumerGroupInfo remove = this.consumerTable.remove(next.getKey());
                     if (remove != null) {
-                        log.info("Unregister consumer ok, no any connection, and remove consumer group, {}",
-                            next.getKey());
+                        log.info("Unregister consumer ok, no any connection, and remove consumer group, {}", next.getKey());
                         this.consumerIdsChangeListener.handle(ConsumerGroupEvent.UNREGISTER, next.getKey());
                     }
                 }
@@ -208,7 +207,7 @@ public class ConsumerManager {
         }
     }
 
-    public ClientChannelInfo getClientInfoTable(String topic, long queueId) {
+    public ClientChannelInfo getClientInfoTable(String topic, Integer queueId) {
         ConcurrentHashMap<Integer, ClientChannelInfo> clientChannelInfoMap = this.topicConsumerTable.get(topic);
         if (clientChannelInfoMap != null) {
             return clientChannelInfoMap.get(queueId);
diff --git a/snode/src/main/java/org/apache/rocketmq/snode/client/ProducerManager.java b/snode/src/main/java/org/apache/rocketmq/snode/client/ProducerManager.java
index b018dc3..dc2bb21 100644
--- a/snode/src/main/java/org/apache/rocketmq/snode/client/ProducerManager.java
+++ b/snode/src/main/java/org/apache/rocketmq/snode/client/ProducerManager.java
@@ -16,11 +16,8 @@
  */
 package org.apache.rocketmq.snode.client;
 
-import io.netty.channel.Channel;
-import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.Iterator;
-import java.util.List;
 import java.util.Map.Entry;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.Lock;
@@ -31,7 +28,6 @@ import org.apache.rocketmq.logging.InternalLogger;
 import org.apache.rocketmq.logging.InternalLoggerFactory;
 import org.apache.rocketmq.remoting.RemotingChannel;
 import org.apache.rocketmq.remoting.common.RemotingHelper;
-import org.apache.rocketmq.remoting.common.RemotingUtil;
 
 public class ProducerManager {
     private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
diff --git a/snode/src/main/java/org/apache/rocketmq/snode/client/PushSessionManager.java b/snode/src/main/java/org/apache/rocketmq/snode/client/PushSessionManager.java
deleted file mode 100644
index f4228c5..0000000
--- a/snode/src/main/java/org/apache/rocketmq/snode/client/PushSessionManager.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.rocketmq.snode.client;
-
-import java.util.concurrent.ConcurrentHashMap;
-
-public class PushSessionManager {
-    private final ConcurrentHashMap<String/*Topic*/, ConcurrentHashMap<Integer/*QueueId*/, ClientChannelInfo>> topicConsumerTable = new ConcurrentHashMap<>(2048);
-
-    public void updateTopicConsumerTable(String topic, int queueId, ClientChannelInfo clientChannelInfo) {
-        ConcurrentHashMap<Integer, ClientChannelInfo> clientChannelInfoMap = this.topicConsumerTable.get(topic);
-
-        if (clientChannelInfoMap == null) {
-            clientChannelInfoMap = new ConcurrentHashMap<>();
-            ConcurrentHashMap prev = this.topicConsumerTable.putIfAbsent(topic, clientChannelInfoMap);
-            if (prev != null) {
-                clientChannelInfoMap = prev;
-            }
-        }
-        clientChannelInfoMap.put(queueId, clientChannelInfo);
-    }
-
-    public ClientChannelInfo getClientInfoTable(String topic, long queueId) {
-        ConcurrentHashMap<Integer, ClientChannelInfo> clientChannelInfoMap = this.topicConsumerTable.get(topic);
-        if (clientChannelInfoMap != null) {
-            return clientChannelInfoMap.get(queueId);
-        }
-        return null;
-    }
-
-    public void removeConsumerTopicTable(String topic, Integer queueId, ClientChannelInfo clientChannelInfo) {
-        ConcurrentHashMap<Integer, ClientChannelInfo> clientChannelInfoMap = this.topicConsumerTable.get(topic);
-        if (clientChannelInfoMap != null) {
-            ClientChannelInfo old = clientChannelInfoMap.get(queueId);
-            //TODO Thread safe issue: wait for the next heartbeat
-            if (old == clientChannelInfo) {
-                clientChannelInfoMap.remove(queueId, clientChannelInfo);
-            }
-        }
-
-    }
-}
diff --git a/snode/src/main/java/org/apache/rocketmq/snode/client/SubscriptionGroupManager.java b/snode/src/main/java/org/apache/rocketmq/snode/client/SubscriptionGroupManager.java
index 8b977d9..85ed8b1 100644
--- a/snode/src/main/java/org/apache/rocketmq/snode/client/SubscriptionGroupManager.java
+++ b/snode/src/main/java/org/apache/rocketmq/snode/client/SubscriptionGroupManager.java
@@ -20,7 +20,6 @@ import java.util.Iterator;
 import java.util.Map.Entry;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
-import org.apache.rocketmq.common.ConfigManager;
 import org.apache.rocketmq.common.DataVersion;
 import org.apache.rocketmq.common.MixAll;
 import org.apache.rocketmq.common.constant.LoggerName;
diff --git a/snode/src/main/java/org/apache/rocketmq/snode/config/SnodeConfig.java b/snode/src/main/java/org/apache/rocketmq/snode/config/SnodeConfig.java
index af14f0e..cd5beab 100644
--- a/snode/src/main/java/org/apache/rocketmq/snode/config/SnodeConfig.java
+++ b/snode/src/main/java/org/apache/rocketmq/snode/config/SnodeConfig.java
@@ -67,6 +67,10 @@ public class SnodeConfig {
 
     private int snodePushMessageThreadPoolQueueCapacity = 10000;
 
+    private final String sendMessageInterceptorPath = "META-INF/service/org.apache.rocketmq.snode.interceptor.SendMessageInterceptor";
+
+    private final String consumeMessageInterceptorPath = "META-INF/service/org.apache.rocketmq.snode.interceptor.ConsumeMessageInterceptor";
+
 
     private int listenPort = 11911;
 
@@ -264,4 +268,12 @@ public class SnodeConfig {
     public void setSnodePushMessageThreadPoolQueueCapacity(int snodePushMessageThreadPoolQueueCapacity) {
         this.snodePushMessageThreadPoolQueueCapacity = snodePushMessageThreadPoolQueueCapacity;
     }
+
+    public String getSendMessageInterceptorPath() {
+        return sendMessageInterceptorPath;
+    }
+
+    public String getConsumeMessageInterceptorPath() {
+        return consumeMessageInterceptorPath;
+    }
 }
diff --git a/snode/src/main/java/org/apache/rocketmq/snode/processor/HeartbeatProcessor.java b/snode/src/main/java/org/apache/rocketmq/snode/processor/HeartbeatProcessor.java
index f8ba684..2ea53ea 100644
--- a/snode/src/main/java/org/apache/rocketmq/snode/processor/HeartbeatProcessor.java
+++ b/snode/src/main/java/org/apache/rocketmq/snode/processor/HeartbeatProcessor.java
@@ -16,7 +16,6 @@
  */
 package org.apache.rocketmq.snode.processor;
 
-import java.util.List;
 import org.apache.rocketmq.common.constant.LoggerName;
 import org.apache.rocketmq.common.protocol.RequestCode;
 import org.apache.rocketmq.common.protocol.ResponseCode;
@@ -31,6 +30,8 @@ import org.apache.rocketmq.logging.InternalLoggerFactory;
 import org.apache.rocketmq.remoting.RemotingChannel;
 import org.apache.rocketmq.remoting.RequestProcessor;
 import org.apache.rocketmq.remoting.common.RemotingHelper;
+import org.apache.rocketmq.remoting.netty.NettyChannelHandlerContextImpl;
+import org.apache.rocketmq.remoting.netty.NettyChannelImpl;
 import org.apache.rocketmq.remoting.protocol.RemotingCommand;
 import org.apache.rocketmq.snode.SnodeController;
 import org.apache.rocketmq.snode.client.ClientChannelInfo;
@@ -60,7 +61,7 @@ public class HeartbeatProcessor implements RequestProcessor {
     private RemotingCommand heartbeat(RemotingChannel remotingChannel, RemotingCommand request) {
         HeartbeatData heartbeatData = HeartbeatData.decode(request.getBody(), HeartbeatData.class);
         ClientChannelInfo clientChannelInfo = new ClientChannelInfo(
-            remotingChannel,
+            new NettyChannelImpl((((NettyChannelHandlerContextImpl) remotingChannel).getChannelHandlerContext().channel())),
             heartbeatData.getClientID(),
             request.getLanguage(),
             request.getVersion()
@@ -99,7 +100,7 @@ public class HeartbeatProcessor implements RequestProcessor {
                     );
                 }
 
-                if (subscriptionGroupConfig.isRealPushEnable()) {
+                if (data.isRealPushEnable()) {
                     this.snodeController.getConsumerManager().updateTopicConsumerTable(data.getSubscriptionDataSet(), clientChannelInfo);
                 }
             }
diff --git a/snode/src/main/java/org/apache/rocketmq/snode/processor/PullMessageProcessor.java b/snode/src/main/java/org/apache/rocketmq/snode/processor/PullMessageProcessor.java
index 2abc196..4be3450 100644
--- a/snode/src/main/java/org/apache/rocketmq/snode/processor/PullMessageProcessor.java
+++ b/snode/src/main/java/org/apache/rocketmq/snode/processor/PullMessageProcessor.java
@@ -15,6 +15,7 @@
  * limitations under the License.
  */
 package org.apache.rocketmq.snode.processor;
+
 import java.util.concurrent.CompletableFuture;
 import org.apache.rocketmq.common.constant.LoggerName;
 import org.apache.rocketmq.common.help.FAQUrl;
@@ -24,6 +25,7 @@ import org.apache.rocketmq.common.protocol.header.PullMessageResponseHeader;
 import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
 import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
 import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig;
+import org.apache.rocketmq.common.sysflag.PullSysFlag;
 import org.apache.rocketmq.logging.InternalLogger;
 import org.apache.rocketmq.logging.InternalLoggerFactory;
 import org.apache.rocketmq.remoting.RemotingChannel;
@@ -32,9 +34,9 @@ import org.apache.rocketmq.remoting.exception.RemotingCommandException;
 import org.apache.rocketmq.remoting.protocol.RemotingCommand;
 import org.apache.rocketmq.snode.SnodeController;
 import org.apache.rocketmq.snode.client.ConsumerGroupInfo;
-import org.apache.rocketmq.snode.interceptor.ExceptionContext;
-import org.apache.rocketmq.snode.interceptor.RequestContext;
-import org.apache.rocketmq.snode.interceptor.ResponseContext;
+import org.apache.rocketmq.remoting.interceptor.ExceptionContext;
+import org.apache.rocketmq.remoting.interceptor.RequestContext;
+import org.apache.rocketmq.remoting.interceptor.ResponseContext;
 
 public class PullMessageProcessor implements RequestProcessor {
     private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.SNODE_LOGGER_NAME);
@@ -63,11 +65,9 @@ public class PullMessageProcessor implements RequestProcessor {
     private RemotingCommand pullMessage(RemotingChannel remotingChannel,
         RemotingCommand request) throws RemotingCommandException {
         RemotingCommand response = RemotingCommand.createResponseCommand(PullMessageResponseHeader.class);
+        response.setOpaque(request.getOpaque());
         final PullMessageRequestHeader requestHeader =
             (PullMessageRequestHeader) request.decodeCommandCustomHeader(PullMessageRequestHeader.class);
-
-        ConsumerGroupInfo consumerGroupInfo = snodeController.getConsumerManager().getConsumerGroupInfo(requestHeader.getConsumerGroup());
-
         SubscriptionGroupConfig subscriptionGroupConfig =
             this.snodeController.getSubscriptionGroupManager().findSubscriptionGroupConfig(requestHeader.getConsumerGroup());
         if (null == subscriptionGroupConfig) {
@@ -82,29 +82,53 @@ public class PullMessageProcessor implements RequestProcessor {
             return response;
         }
 
-        if (!subscriptionGroupConfig.isConsumeBroadcastEnable()
-            && consumerGroupInfo.getMessageModel() == MessageModel.BROADCASTING) {
-            response.setCode(ResponseCode.NO_PERMISSION);
-            response.setRemark("The consumer group[" + requestHeader.getConsumerGroup() + "] can not consume by broadcast way");
-            return response;
-        }
-        if ((consumerGroupInfo == null) || (consumerGroupInfo.findSubscriptionData(requestHeader.getTopic()) == null)) {
-            log.warn("The consumer's subscription not exist, group: {}, topic:{}", requestHeader.getConsumerGroup(), requestHeader.getTopic());
-            response.setCode(ResponseCode.SUBSCRIPTION_NOT_EXIST);
-            response.setRemark("The consumer's subscription not exist" + FAQUrl.suggestTodo(FAQUrl.SAME_GROUP_DIFFERENT_TOPIC));
+        final boolean hasSubscriptionFlag = PullSysFlag.hasSubscriptionFlag(requestHeader.getSysFlag());
+
+        if (requestHeader.getQueueId() < 0) {
+            String errorInfo = String.format("QueueId[%d] is illegal, topic:[%s] consumer:[%s]",
+                requestHeader.getQueueId(), requestHeader.getTopic(), remotingChannel.remoteAddress());
+            log.warn(errorInfo);
+            response.setCode(ResponseCode.SYSTEM_ERROR);
+            response.setRemark(errorInfo);
             return response;
         }
-        SubscriptionData subscriptionData = consumerGroupInfo.findSubscriptionData(requestHeader.getTopic());
 
-        if (subscriptionData.getSubVersion() < requestHeader.getSubVersion()) {
-            log.warn("The broker's subscription is not latest, group: {} {}", requestHeader.getConsumerGroup(),
-                subscriptionData.getSubString());
-            response.setCode(ResponseCode.SUBSCRIPTION_NOT_LATEST);
-            response.setRemark("The consumer's subscription not latest");
-            return response;
+        SubscriptionData subscriptionData;
+        if (!hasSubscriptionFlag) {
+            ConsumerGroupInfo consumerGroupInfo =
+                this.snodeController.getConsumerManager().getConsumerGroupInfo(requestHeader.getConsumerGroup());
+            if (null == consumerGroupInfo) {
+                log.warn("The consumer's group info not exist, group: {}", requestHeader.getConsumerGroup());
+                response.setCode(ResponseCode.SUBSCRIPTION_NOT_EXIST);
+                response.setRemark("The consumer's group info not exist" + FAQUrl.suggestTodo(FAQUrl.SAME_GROUP_DIFFERENT_TOPIC));
+                return response;
+            }
+
+            if (!subscriptionGroupConfig.isConsumeBroadcastEnable()
+                && consumerGroupInfo.getMessageModel() == MessageModel.BROADCASTING) {
+                response.setCode(ResponseCode.NO_PERMISSION);
+                response.setRemark("The consumer group[" + requestHeader.getConsumerGroup() + "] can not consume by broadcast way");
+                return response;
+            }
+
+            subscriptionData = consumerGroupInfo.findSubscriptionData(requestHeader.getTopic());
+            if (null == subscriptionData) {
+                log.warn("The consumer's subscription not exist, group: {}, topic:{}", requestHeader.getConsumerGroup(), requestHeader.getTopic());
+                response.setCode(ResponseCode.SUBSCRIPTION_NOT_EXIST);
+                response.setRemark("The consumer's subscription not exist" + FAQUrl.suggestTodo(FAQUrl.SAME_GROUP_DIFFERENT_TOPIC));
+                return response;
+            }
+
+            if (subscriptionData.getSubVersion() < requestHeader.getSubVersion()) {
+                log.warn("The broker's subscription is not latest, group: {} {}", requestHeader.getConsumerGroup(),
+                    subscriptionData.getSubString());
+                response.setCode(ResponseCode.SUBSCRIPTION_NOT_LATEST);
+                response.setRemark("The consumer's subscription not latest");
+                return response;
+            }
         }
 
-        CompletableFuture<RemotingCommand> responseFuture = snodeController.getEnodeService().pullMessage(request);
+        CompletableFuture<RemotingCommand> responseFuture = snodeController.getEnodeService().pullMessage(requestHeader.getEnodeName(), request);
         responseFuture.whenComplete((data, ex) -> {
             if (ex == null) {
                 if (this.snodeController.getConsumeMessageInterceptorGroup() != null) {
diff --git a/snode/src/main/java/org/apache/rocketmq/snode/processor/SendMessageProcessor.java b/snode/src/main/java/org/apache/rocketmq/snode/processor/SendMessageProcessor.java
index 9477895..ecda813 100644
--- a/snode/src/main/java/org/apache/rocketmq/snode/processor/SendMessageProcessor.java
+++ b/snode/src/main/java/org/apache/rocketmq/snode/processor/SendMessageProcessor.java
@@ -15,17 +15,22 @@
  * limitations under the License.
  */
 package org.apache.rocketmq.snode.processor;
+
 import java.util.concurrent.CompletableFuture;
 import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.common.protocol.RequestCode;
+import org.apache.rocketmq.common.protocol.header.ConsumerSendMsgBackRequestHeader;
+import org.apache.rocketmq.common.protocol.header.SendMessageRequestHeaderV2;
 import org.apache.rocketmq.logging.InternalLogger;
 import org.apache.rocketmq.logging.InternalLoggerFactory;
 import org.apache.rocketmq.remoting.RemotingChannel;
 import org.apache.rocketmq.remoting.RequestProcessor;
+import org.apache.rocketmq.remoting.exception.RemotingCommandException;
 import org.apache.rocketmq.remoting.protocol.RemotingCommand;
 import org.apache.rocketmq.snode.SnodeController;
-import org.apache.rocketmq.snode.interceptor.ExceptionContext;
-import org.apache.rocketmq.snode.interceptor.RequestContext;
-import org.apache.rocketmq.snode.interceptor.ResponseContext;
+import org.apache.rocketmq.remoting.interceptor.ExceptionContext;
+import org.apache.rocketmq.remoting.interceptor.RequestContext;
+import org.apache.rocketmq.remoting.interceptor.ResponseContext;
 
 public class SendMessageProcessor implements RequestProcessor {
     private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.SNODE_LOGGER_NAME);
@@ -37,12 +42,30 @@ public class SendMessageProcessor implements RequestProcessor {
     }
 
     @Override
-    public RemotingCommand processRequest(RemotingChannel remotingChannel, RemotingCommand request) {
+    public RemotingCommand processRequest(RemotingChannel remotingChannel,
+        RemotingCommand request) throws RemotingCommandException {
         if (this.snodeController.getSendMessageInterceptorGroup() != null) {
             RequestContext requestContext = new RequestContext(request, remotingChannel);
             this.snodeController.getSendMessageInterceptorGroup().beforeRequest(requestContext);
         }
-        CompletableFuture<RemotingCommand> responseFuture = snodeController.getEnodeService().sendMessage(request);
+        String enodeName;
+        SendMessageRequestHeaderV2 sendMessageRequestHeaderV2 = null;
+        boolean isSendBack = false;
+        if (request.getCode() == RequestCode.SEND_MESSAGE_V2) {
+            sendMessageRequestHeaderV2 = (SendMessageRequestHeaderV2) request.decodeCommandCustomHeader(SendMessageRequestHeaderV2.class);
+            enodeName = sendMessageRequestHeaderV2.getN();
+        } else {
+            isSendBack = true;
+            ConsumerSendMsgBackRequestHeader consumerSendMsgBackRequestHeader = (ConsumerSendMsgBackRequestHeader) request.decodeCommandCustomHeader(ConsumerSendMsgBackRequestHeader.class);
+            enodeName = consumerSendMsgBackRequestHeader.getEnodeName();
+        }
+
+        CompletableFuture<RemotingCommand> responseFuture = snodeController.getEnodeService().sendMessage(enodeName, request);
+        // The V2 header is decoded only for SEND_MESSAGE_V2; on the other path it stays null, so do not dereference it.
+        final String topic = isSendBack ? null : sendMessageRequestHeaderV2.getB();
+        final Integer queueId = isSendBack ? null : sendMessageRequestHeaderV2.getE();
+        final byte[] message = request.getBody();
+        final boolean isNeedPush = !isSendBack;
         responseFuture.whenComplete((data, ex) -> {
             if (ex == null) {
                 if (this.snodeController.getSendMessageInterceptorGroup() != null) {
@@ -50,6 +73,9 @@ public class SendMessageProcessor implements RequestProcessor {
                     this.snodeController.getSendMessageInterceptorGroup().afterRequest(responseContext);
                 }
                 remotingChannel.reply(data);
+                if (isNeedPush) {
+                    this.snodeController.getPushService().pushMessage(topic, queueId, message, data);
+                }
             } else {
                 if (this.snodeController.getSendMessageInterceptorGroup() != null) {
                     ExceptionContext exceptionContext = new ExceptionContext(request, remotingChannel, ex, null);
diff --git a/snode/src/main/java/org/apache/rocketmq/snode/service/EnodeService.java b/snode/src/main/java/org/apache/rocketmq/snode/service/EnodeService.java
index 0105027..d6e307f 100644
--- a/snode/src/main/java/org/apache/rocketmq/snode/service/EnodeService.java
+++ b/snode/src/main/java/org/apache/rocketmq/snode/service/EnodeService.java
@@ -15,6 +15,7 @@
  * limitations under the License.
  */
 package org.apache.rocketmq.snode.service;
+
 import java.util.concurrent.CompletableFuture;
 import org.apache.rocketmq.client.exception.MQBrokerException;
 import org.apache.rocketmq.common.TopicConfig;
@@ -29,13 +30,14 @@ import org.apache.rocketmq.remoting.protocol.RemotingCommand;
 public interface EnodeService {
     void sendHearbeat(RemotingCommand remotingCommand);
 
-    CompletableFuture<RemotingCommand> sendMessage(final RemotingCommand request);
+    CompletableFuture<RemotingCommand> sendMessage(final String enodeName, final RemotingCommand request);
 
-    CompletableFuture<RemotingCommand> pullMessage(final RemotingCommand request);
+    CompletableFuture<RemotingCommand> pullMessage(final String enodeName, final RemotingCommand request);
 
     void notifyConsumerIdsChanged(final RemotingChannel channel, final String consumerGroup);
 
-    RemotingCommand creatTopic(String enodeName, TopicConfig topicConfig)throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException ;
+    RemotingCommand creatTopic(String enodeName,
+        TopicConfig topicConfig) throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException;
 
     void updateEnodeAddr(String clusterName) throws InterruptedException, RemotingTimeoutException,
         RemotingSendRequestException, RemotingConnectException, MQBrokerException;
diff --git a/snode/src/main/java/org/apache/rocketmq/snode/service/PushService.java b/snode/src/main/java/org/apache/rocketmq/snode/service/PushService.java
index 41fc4de..98bd41a 100644
--- a/snode/src/main/java/org/apache/rocketmq/snode/service/PushService.java
+++ b/snode/src/main/java/org/apache/rocketmq/snode/service/PushService.java
@@ -16,13 +16,15 @@
  */
 package org.apache.rocketmq.snode.service;
 
+import org.apache.rocketmq.remoting.protocol.RemotingCommand;
+
 public interface PushService {
     boolean registerPushSession(String consumerGroup);
 
     void unregisterPushSession(String consumerGroup);
 
-    void pushMessage(final String messageId, final byte[] message, final Integer queueId, final String topic,
-        final long queueOffset);
+    void pushMessage(final String topic, final Integer queueId, final byte[] message,
+        final RemotingCommand response);
 
     void start();
 
diff --git a/snode/src/main/java/org/apache/rocketmq/snode/service/impl/EnodeServiceImpl.java b/snode/src/main/java/org/apache/rocketmq/snode/service/impl/EnodeServiceImpl.java
index f6d157a..55866a2 100644
--- a/snode/src/main/java/org/apache/rocketmq/snode/service/impl/EnodeServiceImpl.java
+++ b/snode/src/main/java/org/apache/rocketmq/snode/service/impl/EnodeServiceImpl.java
@@ -15,6 +15,7 @@
  * limitations under the License.
  */
 package org.apache.rocketmq.snode.service.impl;
+
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Set;
@@ -79,13 +80,12 @@ public class EnodeServiceImpl implements EnodeService {
     }
 
     @Override
-    public CompletableFuture<RemotingCommand> pullMessage(RemotingCommand request) {
+    public CompletableFuture<RemotingCommand> pullMessage(final String enodeName, final RemotingCommand request) {
 
         CompletableFuture<RemotingCommand> future = new CompletableFuture<>();
         try {
-            final PullMessageRequestHeader requestHeader =
-                (PullMessageRequestHeader) request.decodeCommandCustomHeader(PullMessageRequestHeader.class);
-            this.snodeController.getRemotingClient().invokeAsync(requestHeader.getEnodeAddr(), request, SnodeConstant.CONSUMER_TIMEOUT_MILLIS_WHEN_SUSPEND, new InvokeCallback() {
+            String enodeAddress = this.snodeController.getNnodeService().getAddressByEnodeName(enodeName, false);
+            this.snodeController.getRemotingClient().invokeAsync(enodeAddress, request, SnodeConstant.CONSUMER_TIMEOUT_MILLIS_WHEN_SUSPEND, new InvokeCallback() {
                 @Override
                 public void operationComplete(ResponseFuture responseFuture) {
                     RemotingCommand response = responseFuture.getResponseCommand();
@@ -110,17 +110,9 @@ public class EnodeServiceImpl implements EnodeService {
     }
 
     @Override
-    public CompletableFuture<RemotingCommand> sendMessage(RemotingCommand request) {
+    public CompletableFuture<RemotingCommand> sendMessage(String enodeName, RemotingCommand request) {
         CompletableFuture<RemotingCommand> future = new CompletableFuture<>();
         try {
-            String enodeName;
-            if (request.getCode() == RequestCode.SEND_MESSAGE_V2) {
-                SendMessageRequestHeaderV2 sendMessageRequestHeaderV2 = (SendMessageRequestHeaderV2) request.decodeCommandCustomHeader(SendMessageRequestHeaderV2.class);
-                enodeName = sendMessageRequestHeaderV2.getN();
-            } else {
-                ConsumerSendMsgBackRequestHeader consumerSendMsgBackRequestHeader = (ConsumerSendMsgBackRequestHeader) request.decodeCommandCustomHeader(ConsumerSendMsgBackRequestHeader.class);
-                enodeName = consumerSendMsgBackRequestHeader.getEnodeName();
-            }
             String enodeAddress = this.snodeController.getNnodeService().getAddressByEnodeName(enodeName, false);
             this.snodeController.getRemotingClient().invokeAsync(enodeAddress, request, SnodeConstant.defaultTimeoutMills, (responseFuture) -> {
                 future.complete(responseFuture.getResponseCommand());
@@ -149,7 +141,7 @@ public class EnodeServiceImpl implements EnodeService {
         try {
             this.snodeController.getSnodeServer().invokeOneway(channel, request, SnodeConstant.oneWaytimeout);
         } catch (Exception e) {
-            log.error("NotifyConsumerIdsChanged exception, " + consumerGroup, e.getMessage());
+            log.error("NotifyConsumerIdsChanged consumer group: {} exception ", consumerGroup, e);
         }
     }
 
diff --git a/snode/src/main/java/org/apache/rocketmq/snode/service/impl/PushServiceImpl.java b/snode/src/main/java/org/apache/rocketmq/snode/service/impl/PushServiceImpl.java
index ffaa4df..0476a6b 100644
--- a/snode/src/main/java/org/apache/rocketmq/snode/service/impl/PushServiceImpl.java
+++ b/snode/src/main/java/org/apache/rocketmq/snode/service/impl/PushServiceImpl.java
@@ -21,7 +21,9 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.common.protocol.RequestCode;
 import org.apache.rocketmq.common.protocol.header.PushMessageHeader;
+import org.apache.rocketmq.common.protocol.header.SendMessageResponseHeader;
 import org.apache.rocketmq.common.utils.ThreadUtils;
 import org.apache.rocketmq.logging.InternalLogger;
 import org.apache.rocketmq.logging.InternalLoggerFactory;
@@ -45,57 +47,55 @@ public class PushServiceImpl implements PushService {
             this.snodeController.getSnodeConfig().getSnodePushMessageMaxPoolSize(),
             3000,
             TimeUnit.MILLISECONDS,
-            new ArrayBlockingQueue<Runnable>(this.snodeController.getSnodeConfig().getSnodeSendThreadPoolQueueCapacity()),
+            new ArrayBlockingQueue<>(this.snodeController.getSnodeConfig().getSnodeSendThreadPoolQueueCapacity()),
             "SnodePushMessageThread",
             false);
     }
 
     public class PushTask implements Runnable {
-        private AtomicBoolean canceled;
-        private final String messageId;
+        private AtomicBoolean canceled = new AtomicBoolean(false);
         private final byte[] message;
         private final Integer queueId;
         private final String topic;
-        private final long queueOffset;
+        private final RemotingCommand response;
 
-        public PushTask(final String messageId, final byte[] message, final Integer queueId, final String topic,
-            final long queueOffset) {
-            this.messageId = messageId;
+        public PushTask(final String topic, final Integer queueId, final byte[] message,
+            final RemotingCommand response) {
             this.message = message;
             this.queueId = queueId;
             this.topic = topic;
-            this.queueOffset = queueOffset;
+            this.response = response;
         }
 
         @Override
         public void run() {
             if (!canceled.get()) {
-                PushMessageHeader pushMessageHeader = new PushMessageHeader();
-                pushMessageHeader.setMessageId(this.messageId);
-                pushMessageHeader.setQueueOffset(queueOffset);
-                pushMessageHeader.setTopic(topic);
-                pushMessageHeader.setQueueId(queueId);
-                RemotingCommand pushMessage = RemotingCommand.createResponseCommand(PushMessageHeader.class);
-                pushMessage.setBody(message);
-                pushMessage.setCustomHeader(pushMessageHeader);
                 try {
-                    ClientChannelInfo clientChannelInfo = snodeController.getPushSessionManager().getClientInfoTable(topic, queueId);
+                    SendMessageResponseHeader sendMessageResponseHeader = (SendMessageResponseHeader) response.decodeCommandCustomHeader(SendMessageResponseHeader.class);
+                    PushMessageHeader pushMessageHeader = new PushMessageHeader();
+                    pushMessageHeader.setQueueOffset(sendMessageResponseHeader.getQueueOffset());
+                    pushMessageHeader.setTopic(topic);
+                    pushMessageHeader.setQueueId(queueId);
+                    RemotingCommand pushMessage = RemotingCommand.createResponseCommand(PushMessageHeader.class);
+                    pushMessage.setBody(message);
+                    pushMessage.setCustomHeader(pushMessageHeader);
+                    pushMessage.setCode(RequestCode.SNODE_PUSH_MESSAGE);
+                    ClientChannelInfo clientChannelInfo = snodeController.getConsumerManager().getClientInfoTable(topic, queueId);
                     if (clientChannelInfo != null) {
+                        log.debug("Push message to topic: {} queueId: {}, message:{}", topic, queueId, pushMessage);
                         RemotingChannel remotingChannel = clientChannelInfo.getChannel();
                         snodeController.getSnodeServer().push(remotingChannel, pushMessage, SnodeConstant.defaultTimeoutMills);
                     } else {
                         log.warn("Get client info to topic: {} queueId: {} is null", topic, queueId);
                     }
                 } catch (Exception ex) {
-                    log.warn("Push message to topic: {} queueId: {} ex:{}", topic, queueId, ex);
+                    log.warn("Push message to topic: {} queueId: {}", topic, queueId, ex);
                 }
+            } else {
+                log.info("Push message to topic: {} queueId: {} canceled!", topic, queueId);
             }
         }
 
-        public AtomicBoolean getCanceled() {
-            return canceled;
-        }
-
         public void setCanceled(AtomicBoolean canceled) {
             this.canceled = canceled;
         }
@@ -113,10 +113,15 @@ public class PushServiceImpl implements PushService {
     }
 
     @Override
-    public void pushMessage(final String messageId, final byte[] message, final Integer queueId, final String topic,
-        final long queueOffset) {
-        PushTask pushTask = new PushTask(messageId, message, queueId, topic, queueOffset);
-        pushMessageExecutorService.submit(pushTask);
+    public void pushMessage(final String topic, final Integer queueId, final byte[] message,
+        final RemotingCommand response) {
+        // Nothing is scheduled when the consumer manager has no client info for this topic/queue.
+        ClientChannelInfo target = this.snodeController.getConsumerManager().getClientInfoTable(topic, queueId);
+        if (target == null) {
+            log.info("Topic: {} QueueId: {} no need to push", topic, queueId);
+            return;
+        }
+        pushMessageExecutorService.submit(new PushTask(topic, queueId, message, response));
     }
 
     @Override
diff --git a/snode/src/main/java/org/apache/rocketmq/snode/service/impl/ScheduledServiceImpl.java b/snode/src/main/java/org/apache/rocketmq/snode/service/impl/ScheduledServiceImpl.java
index 4de080b..21e7fab 100644
--- a/snode/src/main/java/org/apache/rocketmq/snode/service/impl/ScheduledServiceImpl.java
+++ b/snode/src/main/java/org/apache/rocketmq/snode/service/impl/ScheduledServiceImpl.java
@@ -112,7 +112,7 @@ public class ScheduledServiceImpl implements ScheduledService {
                     log.warn("Update broker addr error:{}", ex);
                 }
             }
-        }, 1000 * 10, Math.max(10000, Math.min(snodeConfig.getRegisterNameServerPeriod(), 60000)), TimeUnit.MILLISECONDS);
+        }, 0, Math.max(10000, Math.min(snodeConfig.getRegisterNameServerPeriod(), 60000)), TimeUnit.MILLISECONDS);
 
         this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
             @Override