You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by du...@apache.org on 2019/01/02 16:45:36 UTC

[rocketmq] 12/14: Polish snode related RemotingChannel implementation

This is an automated email from the ASF dual-hosted git repository.

duhengforever pushed a commit to branch snode
in repository https://gitbox.apache.org/repos/asf/rocketmq.git

commit db97b4063888e85a9d2f39246dc9f71ebde75ad0
Author: duhenglucky <du...@gmail.com>
AuthorDate: Wed Jan 2 14:51:10 2019 +0800

    Polish snode related RemotingChannel implementation
---
 .../rocketmq/remoting/RemotingClientFactory.java   | 16 ++++++++
 .../apache/rocketmq/remoting/RemotingServer.java   |  4 +-
 .../rocketmq/remoting/RemotingServerFactory.java   | 16 ++++++++
 .../netty/NettyChannelHandlerContextImpl.java      | 43 ++++++++++++++--------
 .../rocketmq/remoting/netty/NettyChannelImpl.java  | 20 +++++++++-
 .../remoting/serialize/MsgPackSerializable.java    | 21 +++++++++--
 .../remoting/serialize/SerializerFactory.java      | 16 ++++++++
 .../transport/NettyRemotingServerAbstract.java     | 16 ++++++++
 .../remoting/transport/http2/Http2ClientImpl.java  | 16 ++++++++
 .../remoting/transport/http2/Http2ServerImpl.java  | 20 ++++++++--
 .../transport/rocketmq/NettyRemotingServer.java    | 23 ------------
 .../org/apache/rocketmq/snode/SnodeStartup.java    |  6 ++-
 .../snode/processor/ConsumerManageProcessor.java   | 39 +++++++++-----------
 .../snode/processor/HearbeatProcessor.java         | 22 ++++-------
 .../snode/processor/PullMessageProcessor.java      | 12 +-----
 .../snode/processor/SendMessageProcessor.java      |  3 +-
 .../rocketmq/snode/service/EnodeService.java       |  4 +-
 .../snode/service/impl/EnodeServiceImpl.java       |  6 +--
 .../snode/service/impl/NnodeServiceImpl.java       |  2 -
 19 files changed, 197 insertions(+), 108 deletions(-)

diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/RemotingClientFactory.java b/remoting/src/main/java/org/apache/rocketmq/remoting/RemotingClientFactory.java
index 825d96b..4df2a70 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/RemotingClientFactory.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/RemotingClientFactory.java
@@ -1,3 +1,19 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.apache.rocketmq.remoting;
 
 import java.util.Map;
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/RemotingServer.java b/remoting/src/main/java/org/apache/rocketmq/remoting/RemotingServer.java
index c773e5a..a37b538 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/RemotingServer.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/RemotingServer.java
@@ -48,7 +48,5 @@ public interface RemotingServer extends RemotingService {
         throws InterruptedException, RemotingTooMuchRequestException, RemotingTimeoutException,
         RemotingSendRequestException;
 
-    RemotingServer init(ServerConfig nettyServerConfig, ChannelEventListener channelEventListener);
-
-    void sendResponse(final RemotingChannel remotingChannel, RemotingCommand remotingCommand);
+    RemotingServer init(ServerConfig serverConfig, ChannelEventListener channelEventListener);
 }
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/RemotingServerFactory.java b/remoting/src/main/java/org/apache/rocketmq/remoting/RemotingServerFactory.java
index a2cdf2b..6dbf22a 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/RemotingServerFactory.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/RemotingServerFactory.java
@@ -1,3 +1,19 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.apache.rocketmq.remoting;
 
 import java.util.Map;
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyChannelHandlerContextImpl.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyChannelHandlerContextImpl.java
index 673c12a..a00ef05 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyChannelHandlerContextImpl.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyChannelHandlerContextImpl.java
@@ -1,19 +1,16 @@
-package org.apache.rocketmq.remoting.netty;/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements.  See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
  */
+package org.apache.rocketmq.remoting.netty;
 
 import io.netty.channel.ChannelFuture;
 import io.netty.channel.ChannelFutureListener;
@@ -69,8 +66,24 @@ public class NettyChannelHandlerContextImpl implements RemotingChannel {
     }
 
     @Override
-    public void reply(final RemotingCommand command) {
-        channelHandlerContext.writeAndFlush(command);
+    public void reply(final RemotingCommand response) {
+        if (response != null) {
+            response.markResponseType();
+            try {
+                this.channelHandlerContext.writeAndFlush(response).addListener(new ChannelFutureListener() {
+                    @Override
+                    public void operationComplete(ChannelFuture future) throws Exception {
+                        if (!future.isSuccess()) {
+                            log.error("processRequestWrapper response to {} failed",
+                                future.channel().remoteAddress(), future.cause());
+                        }
+                    }
+                });
+            } catch (Throwable e) {
+                log.error("processRequestWrapper process request over, but response failed", e);
+                log.error(response.toString());
+            }
+        }
     }
 
     public ChannelHandlerContext getChannelHandlerContext() {
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyChannelImpl.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyChannelImpl.java
index 86436c1..fe33a52 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyChannelImpl.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyChannelImpl.java
@@ -71,8 +71,24 @@ public class NettyChannelImpl implements RemotingChannel {
     }
 
     @Override
-    public void reply(final RemotingCommand command) {
-        channel.writeAndFlush(command);
+    public void reply(final RemotingCommand response) {
+        if (response != null) {
+            response.markResponseType();
+            try {
+                channel.writeAndFlush(response).addListener(new ChannelFutureListener() {
+                    @Override
+                    public void operationComplete(ChannelFuture future) {
+                        if (!future.isSuccess()) {
+                            log.error("ProcessRequestWrapper response to {} failed",
+                                future.channel().remoteAddress(), future.cause());
+                        }
+                    }
+                });
+            } catch (Throwable e) {
+                log.error("ProcessRequestWrapper process request over, but response failed", e);
+                log.error(response.toString());
+            }
+        }
     }
 
     public io.netty.channel.Channel getChannel() {
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/serialize/MsgPackSerializable.java b/remoting/src/main/java/org/apache/rocketmq/remoting/serialize/MsgPackSerializable.java
index 6c7ba78..4bf1896 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/serialize/MsgPackSerializable.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/serialize/MsgPackSerializable.java
@@ -1,3 +1,19 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.apache.rocketmq.remoting.serialize;
 
 import org.apache.rocketmq.remoting.protocol.RemotingCommand;
@@ -6,10 +22,7 @@ import org.msgpack.MessagePack;
 public class MsgPackSerializable implements Serializer {
     private final MessagePack messagePack = new MessagePack();
 
-//    public MsgPackSerializable(){
-//        messagePack.register(LanguageCode.class);
-//        messagePack.register(SerializeType.class);
-//    }
+
     @Override
     public SerializeType type() {
         return SerializeType.MSGPACK;
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/serialize/SerializerFactory.java b/remoting/src/main/java/org/apache/rocketmq/remoting/serialize/SerializerFactory.java
index e015478..1330b39 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/serialize/SerializerFactory.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/serialize/SerializerFactory.java
@@ -1,3 +1,19 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.apache.rocketmq.remoting.serialize;
 
 import java.util.Map;
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/transport/NettyRemotingServerAbstract.java b/remoting/src/main/java/org/apache/rocketmq/remoting/transport/NettyRemotingServerAbstract.java
index cec0086..5ef9783 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/transport/NettyRemotingServerAbstract.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/transport/NettyRemotingServerAbstract.java
@@ -1,3 +1,19 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.apache.rocketmq.remoting.transport;
 
 import io.netty.channel.Channel;
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/transport/http2/Http2ClientImpl.java b/remoting/src/main/java/org/apache/rocketmq/remoting/transport/http2/Http2ClientImpl.java
index e37f354..71bee1e 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/transport/http2/Http2ClientImpl.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/transport/http2/Http2ClientImpl.java
@@ -1,3 +1,19 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.apache.rocketmq.remoting.transport.http2;
 
 import io.netty.bootstrap.Bootstrap;
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/transport/http2/Http2ServerImpl.java b/remoting/src/main/java/org/apache/rocketmq/remoting/transport/http2/Http2ServerImpl.java
index 8e5e4c7..649055b 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/transport/http2/Http2ServerImpl.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/transport/http2/Http2ServerImpl.java
@@ -1,3 +1,19 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.apache.rocketmq.remoting.transport.http2;
 
 import io.netty.bootstrap.ServerBootstrap;
@@ -244,8 +260,4 @@ public class Http2ServerImpl extends NettyRemotingServerAbstract implements Remo
         return this.channelEventListener;
     }
 
-    @Override
-    public void sendResponse(RemotingChannel channel, RemotingCommand remotingCommand) {
-
-    }
 }
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/transport/rocketmq/NettyRemotingServer.java b/remoting/src/main/java/org/apache/rocketmq/remoting/transport/rocketmq/NettyRemotingServer.java
index 2bd397c..89b9820 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/transport/rocketmq/NettyRemotingServer.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/transport/rocketmq/NettyRemotingServer.java
@@ -350,27 +350,4 @@ public class NettyRemotingServer extends NettyRemotingServerAbstract implements
     public void push(String addr, String sessionId, RemotingCommand remotingCommand) {
 
     }
-
-    @Override
-    public void sendResponse(RemotingChannel remotingChannel, RemotingCommand response) {
-        NettyChannelHandlerContextImpl channelHandlerContext = (NettyChannelHandlerContextImpl) remotingChannel;
-        ChannelHandlerContext ctx = channelHandlerContext.getChannelHandlerContext();
-        if (response != null) {
-            response.markResponseType();
-            try {
-                ctx.writeAndFlush(response).addListener(new ChannelFutureListener() {
-                    @Override
-                    public void operationComplete(ChannelFuture future) throws Exception {
-                        if (!future.isSuccess()) {
-                            log.error("processRequestWrapper response to {} failed",
-                                future.channel().remoteAddress(), future.cause());
-                        }
-                    }
-                });
-            } catch (Throwable e) {
-                log.error("processRequestWrapper process request over, but response failed", e);
-                log.error(response.toString());
-            }
-        }
-    }
 }
diff --git a/snode/src/main/java/org/apache/rocketmq/snode/SnodeStartup.java b/snode/src/main/java/org/apache/rocketmq/snode/SnodeStartup.java
index 8ccb34e..f745ede 100644
--- a/snode/src/main/java/org/apache/rocketmq/snode/SnodeStartup.java
+++ b/snode/src/main/java/org/apache/rocketmq/snode/SnodeStartup.java
@@ -102,6 +102,10 @@ public class SnodeStartup {
                 in.close();
             }
         }
+        if (null == snodeConfig.getRocketmqHome()) {
+            System.out.printf("Please set the %s variable in your environment to match the location of the RocketMQ installation", MixAll.ROCKETMQ_HOME_ENV);
+            System.exit(-2);
+        }
 
         LoggerContext lc = (LoggerContext) LoggerFactory.getILoggerFactory();
         JoranConfigurator configurator = new JoranConfigurator();
@@ -135,7 +139,7 @@ public class SnodeStartup {
     }
 
     private static Options buildCommandlineOptions(final Options options) {
-        Option opt = new Option("c", "configFile", true, "Broker config properties file");
+        Option opt = new Option("c", "configFile", true, "SNode config properties file");
         opt.setRequired(false);
         options.addOption(opt);
         return options;
diff --git a/snode/src/main/java/org/apache/rocketmq/snode/processor/ConsumerManageProcessor.java b/snode/src/main/java/org/apache/rocketmq/snode/processor/ConsumerManageProcessor.java
index bc58a5c..edb4b58 100644
--- a/snode/src/main/java/org/apache/rocketmq/snode/processor/ConsumerManageProcessor.java
+++ b/snode/src/main/java/org/apache/rocketmq/snode/processor/ConsumerManageProcessor.java
@@ -16,7 +16,6 @@
  */
 package org.apache.rocketmq.snode.processor;
 
-import io.netty.channel.ChannelHandlerContext;
 import java.util.List;
 import org.apache.rocketmq.common.constant.LoggerName;
 import org.apache.rocketmq.common.protocol.RequestCode;
@@ -34,13 +33,12 @@ import org.apache.rocketmq.common.protocol.header.UpdateConsumerOffsetResponseHe
 import org.apache.rocketmq.logging.InternalLogger;
 import org.apache.rocketmq.logging.InternalLoggerFactory;
 import org.apache.rocketmq.remoting.RemotingChannel;
+import org.apache.rocketmq.remoting.RequestProcessor;
 import org.apache.rocketmq.remoting.common.RemotingHelper;
 import org.apache.rocketmq.remoting.exception.RemotingCommandException;
 import org.apache.rocketmq.remoting.exception.RemotingConnectException;
 import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
 import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
-import org.apache.rocketmq.remoting.RequestProcessor;
-import org.apache.rocketmq.remoting.netty.NettyChannelHandlerContextImpl;
 import org.apache.rocketmq.remoting.protocol.RemotingCommand;
 import org.apache.rocketmq.snode.SnodeController;
 import org.apache.rocketmq.snode.client.ConsumerGroupInfo;
@@ -57,23 +55,20 @@ public class ConsumerManageProcessor implements RequestProcessor {
     @Override
     public RemotingCommand processRequest(RemotingChannel remotingChannel,
         RemotingCommand request) throws InterruptedException, RemotingTimeoutException,
-        RemotingSendRequestException, RemotingConnectException, RemotingCommandException  {
-        NettyChannelHandlerContextImpl nettyChannelHandlerContext = (NettyChannelHandlerContextImpl)remotingChannel;
-        ChannelHandlerContext ctx = nettyChannelHandlerContext.getChannelHandlerContext();
-
+        RemotingSendRequestException, RemotingConnectException, RemotingCommandException {
         switch (request.getCode()) {
             case RequestCode.GET_CONSUMER_LIST_BY_GROUP:
-                return this.getConsumerListByGroup(ctx, request);
+                return this.getConsumerListByGroup(remotingChannel, request);
             case RequestCode.UPDATE_CONSUMER_OFFSET:
-                return this.updateConsumerOffset(ctx, request);
+                return this.updateConsumerOffset(remotingChannel, request);
             case RequestCode.QUERY_CONSUMER_OFFSET:
-                return this.queryConsumerOffset(ctx, request);
+                return this.queryConsumerOffset(remotingChannel, request);
             case RequestCode.SEARCH_OFFSET_BY_TIMESTAMP:
-                return searchOffsetByTimestamp(ctx, request);
+                return searchOffsetByTimestamp(remotingChannel, request);
             case RequestCode.GET_MAX_OFFSET:
-                return getMaxOffset(ctx, request);
+                return getMaxOffset(remotingChannel, request);
             case RequestCode.GET_MIN_OFFSET:
-                return getMinOffset(ctx, request);
+                return getMinOffset(remotingChannel, request);
             default:
                 break;
         }
@@ -85,7 +80,7 @@ public class ConsumerManageProcessor implements RequestProcessor {
         return false;
     }
 
-    public RemotingCommand searchOffsetByTimestamp(ChannelHandlerContext ctx,
+    public RemotingCommand searchOffsetByTimestamp(RemotingChannel remotingChannel,
         RemotingCommand request) throws RemotingCommandException {
         final SearchOffsetRequestHeader requestHeader =
             (SearchOffsetRequestHeader) request
@@ -98,7 +93,7 @@ public class ConsumerManageProcessor implements RequestProcessor {
         return null;
     }
 
-    public RemotingCommand getMinOffset(ChannelHandlerContext ctx,
+    public RemotingCommand getMinOffset(RemotingChannel remotingChannel,
         RemotingCommand request) throws RemotingCommandException {
         final GetMinOffsetRequestHeader requestHeader =
             (GetMinOffsetRequestHeader) request
@@ -111,7 +106,7 @@ public class ConsumerManageProcessor implements RequestProcessor {
         return null;
     }
 
-    public RemotingCommand getMaxOffset(ChannelHandlerContext ctx,
+    public RemotingCommand getMaxOffset(RemotingChannel remotingChannel,
         RemotingCommand request) throws RemotingCommandException {
         final GetMaxOffsetRequestHeader requestHeader =
             (GetMaxOffsetRequestHeader) request
@@ -124,7 +119,7 @@ public class ConsumerManageProcessor implements RequestProcessor {
         return null;
     }
 
-    public RemotingCommand getConsumerListByGroup(ChannelHandlerContext ctx, RemotingCommand request)
+    public RemotingCommand getConsumerListByGroup(RemotingChannel remotingChannel, RemotingCommand request)
         throws RemotingCommandException {
         final RemotingCommand response =
             RemotingCommand.createResponseCommand(GetConsumerListByGroupResponseHeader.class);
@@ -146,11 +141,11 @@ public class ConsumerManageProcessor implements RequestProcessor {
                 return response;
             } else {
                 log.warn("GetAllClientId failed, {} {}", requestHeader.getConsumerGroup(),
-                    RemotingHelper.parseChannelRemoteAddr(ctx.channel()));
+                    RemotingHelper.parseChannelRemoteAddr(remotingChannel.remoteAddress()));
             }
         } else {
             log.warn("GetConsumerGroupInfo failed, {} {}", requestHeader.getConsumerGroup(),
-                RemotingHelper.parseChannelRemoteAddr(ctx.channel()));
+                RemotingHelper.parseChannelRemoteAddr(remotingChannel.remoteAddress()));
         }
 
         response.setCode(ResponseCode.SYSTEM_ERROR);
@@ -158,21 +153,21 @@ public class ConsumerManageProcessor implements RequestProcessor {
         return response;
     }
 
-    private RemotingCommand updateConsumerOffset(ChannelHandlerContext ctx, RemotingCommand request)
+    private RemotingCommand updateConsumerOffset(RemotingChannel remotingChannel, RemotingCommand request)
         throws RemotingCommandException {
         final RemotingCommand response =
             RemotingCommand.createResponseCommand(UpdateConsumerOffsetResponseHeader.class);
         final UpdateConsumerOffsetRequestHeader requestHeader =
             (UpdateConsumerOffsetRequestHeader) request
                 .decodeCommandCustomHeader(UpdateConsumerOffsetRequestHeader.class);
-        this.snodeController.getConsumerOffsetManager().commitOffset(requestHeader.getEnodeName(), RemotingHelper.parseChannelRemoteAddr(ctx.channel()), requestHeader.getConsumerGroup(),
+        this.snodeController.getConsumerOffsetManager().commitOffset(requestHeader.getEnodeName(), RemotingHelper.parseChannelRemoteAddr(remotingChannel.remoteAddress()), requestHeader.getConsumerGroup(),
             requestHeader.getTopic(), requestHeader.getQueueId(), requestHeader.getCommitOffset());
         response.setCode(ResponseCode.SUCCESS);
         response.setRemark(null);
         return response;
     }
 
-    private RemotingCommand queryConsumerOffset(ChannelHandlerContext ctx, RemotingCommand request)
+    private RemotingCommand queryConsumerOffset(RemotingChannel remotingChannel, RemotingCommand request)
         throws InterruptedException, RemotingTimeoutException,
         RemotingSendRequestException, RemotingConnectException, RemotingCommandException {
         final RemotingCommand response =
diff --git a/snode/src/main/java/org/apache/rocketmq/snode/processor/HearbeatProcessor.java b/snode/src/main/java/org/apache/rocketmq/snode/processor/HearbeatProcessor.java
index c69ba5d..829ad0e 100644
--- a/snode/src/main/java/org/apache/rocketmq/snode/processor/HearbeatProcessor.java
+++ b/snode/src/main/java/org/apache/rocketmq/snode/processor/HearbeatProcessor.java
@@ -16,7 +16,6 @@
  */
 package org.apache.rocketmq.snode.processor;
 
-import io.netty.channel.ChannelHandlerContext;
 import org.apache.rocketmq.common.constant.LoggerName;
 import org.apache.rocketmq.common.protocol.RequestCode;
 import org.apache.rocketmq.common.protocol.ResponseCode;
@@ -29,11 +28,8 @@ import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig;
 import org.apache.rocketmq.logging.InternalLogger;
 import org.apache.rocketmq.logging.InternalLoggerFactory;
 import org.apache.rocketmq.remoting.RemotingChannel;
-import org.apache.rocketmq.remoting.common.RemotingHelper;
-import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
-import org.apache.rocketmq.remoting.netty.NettyChannelHandlerContextImpl;
-import org.apache.rocketmq.remoting.netty.NettyChannelImpl;
 import org.apache.rocketmq.remoting.RequestProcessor;
+import org.apache.rocketmq.remoting.common.RemotingHelper;
 import org.apache.rocketmq.remoting.protocol.RemotingCommand;
 import org.apache.rocketmq.snode.SnodeController;
 import org.apache.rocketmq.snode.client.ClientChannelInfo;
@@ -49,23 +45,21 @@ public class HearbeatProcessor implements RequestProcessor {
     @Override
     public RemotingCommand processRequest(RemotingChannel remotingChannel,
         RemotingCommand request) throws Exception {
-        NettyChannelHandlerContextImpl nettyChannelHandlerContext = (NettyChannelHandlerContextImpl)remotingChannel;
-        ChannelHandlerContext ctx = nettyChannelHandlerContext.getChannelHandlerContext();
         switch (request.getCode()) {
             case RequestCode.HEART_BEAT:
-                return heartbeat(ctx, request);
+                return heartbeat(remotingChannel, request);
             case RequestCode.UNREGISTER_CLIENT:
-                return unregister(ctx, request);
+                return unregister(remotingChannel, request);
             default:
                 break;
         }
         return null;
     }
 
-    private RemotingCommand heartbeat(ChannelHandlerContext ctx, RemotingCommand request) {
+    private RemotingCommand heartbeat(RemotingChannel remotingChannel, RemotingCommand request) {
         HeartbeatData heartbeatData = HeartbeatData.decode(request.getBody(), HeartbeatData.class);
         ClientChannelInfo clientChannelInfo = new ClientChannelInfo(
-            new NettyChannelImpl(ctx.channel()),
+            remotingChannel,
             heartbeatData.getClientID(),
             request.getLanguage(),
             request.getVersion()
@@ -100,7 +94,7 @@ public class HearbeatProcessor implements RequestProcessor {
                 if (changed) {
                     log.info("registerConsumer info changed {} {}",
                         data.toString(),
-                        RemotingHelper.parseChannelRemoteAddr(ctx.channel())
+                        RemotingHelper.parseChannelRemoteAddr(remotingChannel.remoteAddress())
                     );
                 }
             }
@@ -111,14 +105,14 @@ public class HearbeatProcessor implements RequestProcessor {
         return response;
     }
 
-    private RemotingCommand unregister(ChannelHandlerContext ctx, RemotingCommand request) throws Exception {
+    private RemotingCommand unregister(RemotingChannel remotingChannel, RemotingCommand request) throws Exception {
         final RemotingCommand response =
             RemotingCommand.createResponseCommand(UnregisterClientResponseHeader.class);
         final UnregisterClientRequestHeader requestHeader =
             (UnregisterClientRequestHeader) request.decodeCommandCustomHeader(UnregisterClientRequestHeader.class);
 
         ClientChannelInfo clientChannelInfo = new ClientChannelInfo(
-            new NettyChannelImpl(ctx.channel()),
+            remotingChannel,
             requestHeader.getClientID(),
             request.getLanguage(),
             request.getVersion());
diff --git a/snode/src/main/java/org/apache/rocketmq/snode/processor/PullMessageProcessor.java b/snode/src/main/java/org/apache/rocketmq/snode/processor/PullMessageProcessor.java
index 951d175..645d7fc 100644
--- a/snode/src/main/java/org/apache/rocketmq/snode/processor/PullMessageProcessor.java
+++ b/snode/src/main/java/org/apache/rocketmq/snode/processor/PullMessageProcessor.java
@@ -15,7 +15,6 @@ package org.apache.rocketmq.snode.processor;/*
  * limitations under the License.
  */
 
-import io.netty.channel.ChannelHandlerContext;
 import java.util.concurrent.CompletableFuture;
 import org.apache.rocketmq.common.constant.LoggerName;
 import org.apache.rocketmq.common.help.FAQUrl;
@@ -30,10 +29,6 @@ import org.apache.rocketmq.logging.InternalLoggerFactory;
 import org.apache.rocketmq.remoting.RemotingChannel;
 import org.apache.rocketmq.remoting.RequestProcessor;
 import org.apache.rocketmq.remoting.exception.RemotingCommandException;
-import org.apache.rocketmq.remoting.exception.RemotingConnectException;
-import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
-import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
-import org.apache.rocketmq.remoting.netty.NettyChannelHandlerContextImpl;
 import org.apache.rocketmq.remoting.protocol.RemotingCommand;
 import org.apache.rocketmq.snode.SnodeController;
 import org.apache.rocketmq.snode.client.ConsumerGroupInfo;
@@ -50,9 +45,6 @@ public class PullMessageProcessor implements RequestProcessor {
     @Override
     public RemotingCommand processRequest(RemotingChannel remotingChannel,
         RemotingCommand request) throws RemotingCommandException {
-        NettyChannelHandlerContextImpl nettyChannelHandlerContext = (NettyChannelHandlerContextImpl) remotingChannel;
-        ChannelHandlerContext ctx = nettyChannelHandlerContext.getChannelHandlerContext();
-
         RemotingCommand response = RemotingCommand.createResponseCommand(PullMessageResponseHeader.class);
 
         final PullMessageRequestHeader requestHeader =
@@ -97,10 +89,10 @@ public class PullMessageProcessor implements RequestProcessor {
             return response;
         }
 
-        CompletableFuture<RemotingCommand> responseFuture = snodeController.getEnodeService().pullMessage(ctx, request);
+        CompletableFuture<RemotingCommand> responseFuture = snodeController.getEnodeService().pullMessage(request);
         responseFuture.whenComplete((data, ex) -> {
             if (ex == null) {
-                this.snodeController.getSnodeServer().sendResponse(remotingChannel, data);
+                remotingChannel.reply(data);
             } else {
                 log.error("Pull message error: {}", ex);
             }
diff --git a/snode/src/main/java/org/apache/rocketmq/snode/processor/SendMessageProcessor.java b/snode/src/main/java/org/apache/rocketmq/snode/processor/SendMessageProcessor.java
index 6641b2a..bd18339 100644
--- a/snode/src/main/java/org/apache/rocketmq/snode/processor/SendMessageProcessor.java
+++ b/snode/src/main/java/org/apache/rocketmq/snode/processor/SendMessageProcessor.java
@@ -15,7 +15,6 @@ package org.apache.rocketmq.snode.processor;/*
  * limitations under the License.
  */
 
-import io.netty.channel.ChannelHandlerContext;
 import java.util.concurrent.CompletableFuture;
 import org.apache.rocketmq.common.constant.LoggerName;
 import org.apache.rocketmq.logging.InternalLogger;
@@ -39,7 +38,7 @@ public class SendMessageProcessor implements RequestProcessor {
         CompletableFuture<RemotingCommand> responseFuture = snodeController.getEnodeService().sendMessage(request);
         responseFuture.whenComplete((data, ex) -> {
             if (ex == null) {
-                snodeController.getSnodeServer().sendResponse(remotingChannel, data);
+                remotingChannel.reply(data);
             } else {
                 log.error("Send Message error: {}", ex);
             }
diff --git a/snode/src/main/java/org/apache/rocketmq/snode/service/EnodeService.java b/snode/src/main/java/org/apache/rocketmq/snode/service/EnodeService.java
index add1cde..4c3f894 100644
--- a/snode/src/main/java/org/apache/rocketmq/snode/service/EnodeService.java
+++ b/snode/src/main/java/org/apache/rocketmq/snode/service/EnodeService.java
@@ -15,7 +15,6 @@ package org.apache.rocketmq.snode.service;/*
  * limitations under the License.
  */
 
-import io.netty.channel.ChannelHandlerContext;
 import java.util.concurrent.CompletableFuture;
 import org.apache.rocketmq.client.exception.MQBrokerException;
 import org.apache.rocketmq.common.TopicConfig;
@@ -32,8 +31,7 @@ public interface EnodeService {
 
     CompletableFuture<RemotingCommand> sendMessage(final RemotingCommand request);
 
-    CompletableFuture<RemotingCommand> pullMessage(final ChannelHandlerContext context,
-        final RemotingCommand remotingCommand);
+    CompletableFuture<RemotingCommand> pullMessage(final RemotingCommand request);
 
     void notifyConsumerIdsChanged(final RemotingChannel channel, final String consumerGroup);
 
diff --git a/snode/src/main/java/org/apache/rocketmq/snode/service/impl/EnodeServiceImpl.java b/snode/src/main/java/org/apache/rocketmq/snode/service/impl/EnodeServiceImpl.java
index cd4a519..20a4d2f 100644
--- a/snode/src/main/java/org/apache/rocketmq/snode/service/impl/EnodeServiceImpl.java
+++ b/snode/src/main/java/org/apache/rocketmq/snode/service/impl/EnodeServiceImpl.java
@@ -47,6 +47,7 @@ import org.apache.rocketmq.remoting.exception.RemotingCommandException;
 import org.apache.rocketmq.remoting.exception.RemotingConnectException;
 import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
 import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
+import org.apache.rocketmq.remoting.exception.RemotingTooMuchRequestException;
 import org.apache.rocketmq.remoting.netty.ResponseFuture;
 import org.apache.rocketmq.remoting.protocol.RemotingCommand;
 import org.apache.rocketmq.remoting.serialize.RemotingSerializable;
@@ -81,8 +82,7 @@ public class EnodeServiceImpl implements EnodeService {
     }
 
     @Override
-    public CompletableFuture<RemotingCommand> pullMessage(final ChannelHandlerContext context,
-        RemotingCommand request) {
+    public CompletableFuture<RemotingCommand> pullMessage(RemotingCommand request) {
 
         CompletableFuture<RemotingCommand> future = new CompletableFuture<>();
         try {
@@ -106,7 +106,7 @@ public class EnodeServiceImpl implements EnodeService {
                 }
             });
         } catch (Exception ex) {
-            log.error("pull message async error:", ex);
+            log.error("Pull message async error:", ex);
             future.completeExceptionally(ex);
         }
         return future;
diff --git a/snode/src/main/java/org/apache/rocketmq/snode/service/impl/NnodeServiceImpl.java b/snode/src/main/java/org/apache/rocketmq/snode/service/impl/NnodeServiceImpl.java
index fe28571..b272c31 100644
--- a/snode/src/main/java/org/apache/rocketmq/snode/service/impl/NnodeServiceImpl.java
+++ b/snode/src/main/java/org/apache/rocketmq/snode/service/impl/NnodeServiceImpl.java
@@ -19,7 +19,6 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
-import org.apache.rocketmq.client.ClientConfig;
 import org.apache.rocketmq.client.exception.MQClientException;
 import org.apache.rocketmq.common.MixAll;
 import org.apache.rocketmq.common.constant.LoggerName;
@@ -39,7 +38,6 @@ import org.apache.rocketmq.remoting.protocol.RemotingCommand;
 import org.apache.rocketmq.snode.SnodeController;
 import org.apache.rocketmq.snode.config.SnodeConfig;
 import org.apache.rocketmq.snode.constant.SnodeConstant;
-import org.apache.rocketmq.snode.exception.SnodeException;
 import org.apache.rocketmq.snode.service.NnodeService;
 
 public class NnodeServiceImpl implements NnodeService {