You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by du...@apache.org on 2019/01/02 16:45:37 UTC

[rocketmq] 13/14: Add snode interceptor

This is an automated email from the ASF dual-hosted git repository.

duhengforever pushed a commit to branch snode
in repository https://gitbox.apache.org/repos/asf/rocketmq.git

commit 6a49f0cbdfd5e598149f517681eca14d75c02064
Author: duhenglucky <du...@gmail.com>
AuthorDate: Wed Jan 2 23:39:05 2019 +0800

    Add snode interceptor
---
 .../rocketmq/broker/mqtrace/SendMessageHook.java   |  6 +--
 .../apache/rocketmq/remoting/RemotingClient.java   |  2 -
 .../remoting/transport/http2/Http2ClientImpl.java  |  9 ----
 .../transport/rocketmq/NettyRemotingClient.java    |  9 ----
 .../rocketmq/remoting/util/ServiceProvider.java    | 37 ++++++++++++-
 .../org/apache/rocketmq/snode/SnodeController.java | 60 +++++++++++-----------
 .../snode/interceptor/ExceptionContext.java        | 47 +++++++++++++++++
 .../rocketmq/snode/interceptor/Interceptor.java    | 11 ++--
 .../snode/interceptor/InterceptorFactory.java      | 46 +++++++++++++++++
 .../snode/interceptor/InterceptorGroup.java        | 47 +++++++++++++++++
 .../rocketmq/snode/interceptor/RequestContext.java | 45 ++++++++++++++++
 .../snode/interceptor/ResponseContext.java         | 23 ++++++---
 .../snode/offset/ConsumerOffsetManager.java        |  3 +-
 .../snode/processor/PullMessageProcessor.java      | 31 +++++++++--
 .../snode/processor/SendMessageProcessor.java      | 15 ++++++
 .../snode/service/impl/EnodeServiceImpl.java       |  3 --
 .../snode/service/impl/NnodeServiceImpl.java       |  2 +-
 ...tmq.snode.interceptor.ConsumeMessageInterceptor |  0
 ...cketmq.snode.interceptor.SendMessageInterceptor |  0
 19 files changed, 320 insertions(+), 76 deletions(-)

diff --git a/broker/src/main/java/org/apache/rocketmq/broker/mqtrace/SendMessageHook.java b/broker/src/main/java/org/apache/rocketmq/broker/mqtrace/SendMessageHook.java
index a74b6d6..a89bace 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/mqtrace/SendMessageHook.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/mqtrace/SendMessageHook.java
@@ -17,9 +17,9 @@
 package org.apache.rocketmq.broker.mqtrace;
 
 public interface SendMessageHook {
-    public String hookName();
+    String hookName();
 
-    public void sendMessageBefore(final SendMessageContext context);
+    void sendMessageBefore(final SendMessageContext context);
 
-    public void sendMessageAfter(final SendMessageContext context);
+    void sendMessageAfter(final SendMessageContext context);
 }
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/RemotingClient.java b/remoting/src/main/java/org/apache/rocketmq/remoting/RemotingClient.java
index c706eed..84d4241 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/RemotingClient.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/RemotingClient.java
@@ -49,7 +49,5 @@ public interface RemotingClient extends RemotingService {
 
     ExecutorService getCallbackExecutor();
 
-    boolean isChannelWritable(final String addr);
-
     RemotingClient init(ClientConfig nettyClientConfig, ChannelEventListener channelEventListener);
 }
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/transport/http2/Http2ClientImpl.java b/remoting/src/main/java/org/apache/rocketmq/remoting/transport/http2/Http2ClientImpl.java
index 71bee1e..fb9e3f8 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/transport/http2/Http2ClientImpl.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/transport/http2/Http2ClientImpl.java
@@ -267,15 +267,6 @@ public class Http2ClientImpl extends NettyRemotingClientAbstract implements Remo
     }
 
     @Override
-    public boolean isChannelWritable(String addr) {
-        ChannelWrapper cw = this.channelTables.get(addr);
-        if (cw != null && cw.isOK()) {
-            return cw.isWritable();
-        }
-        return true;
-    }
-
-    @Override
     public List<String> getNameServerAddressList() {
         return this.namesrvAddrList.get();
     }
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/transport/rocketmq/NettyRemotingClient.java b/remoting/src/main/java/org/apache/rocketmq/remoting/transport/rocketmq/NettyRemotingClient.java
index 686bb01..a5790df 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/transport/rocketmq/NettyRemotingClient.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/transport/rocketmq/NettyRemotingClient.java
@@ -273,15 +273,6 @@ public class NettyRemotingClient extends NettyRemotingClientAbstract implements
     }
 
     @Override
-    public boolean isChannelWritable(String addr) {
-        ChannelWrapper cw = this.channelTables.get(addr);
-        if (cw != null && cw.isOK()) {
-            return cw.isWritable();
-        }
-        return true;
-    }
-
-    @Override
     public List<String> getNameServerAddressList() {
         return this.namesrvAddrList.get();
     }
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/util/ServiceProvider.java b/remoting/src/main/java/org/apache/rocketmq/remoting/util/ServiceProvider.java
index 33c8312..586a846 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/util/ServiceProvider.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/util/ServiceProvider.java
@@ -15,7 +15,9 @@ package org.apache.rocketmq.remoting.util;
 import java.io.BufferedReader;
 import java.io.InputStream;
 import java.io.InputStreamReader;
+import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import org.apache.rocketmq.logging.InternalLogger;
@@ -92,6 +94,39 @@ public class ServiceProvider {
         }
     }
 
+    public static <T> List<T> loadServiceList(String name, Class<?> clazz) {
+        LOG.info("Looking for a resource file of name [{}] ...", name);
+        List<T> services = new ArrayList<T>(); // returned as-is (possibly empty, never null) when loading fails
+        try {
+            ArrayList<String> names = new ArrayList<String>(); // class names that have already been instantiated
+            final InputStream is = getResourceAsStream(getContextClassLoader(), name);
+            if (is != null) {
+                BufferedReader reader;
+                try {
+                    reader = new BufferedReader(new InputStreamReader(is, "UTF-8"));
+                } catch (java.io.UnsupportedEncodingException e) {
+                    reader = new BufferedReader(new InputStreamReader(is));
+                }
+                String serviceName = reader.readLine();
+                while (serviceName != null && !"".equals(serviceName)) { // stops at EOF or at the first empty line
+                    if (!names.contains(serviceName)) {
+                        names.add(serviceName); // record each name once so repeated lines are skipped
+                        T instance = createInstance(serviceName, clazz);
+                        services.add(instance);
+                    }
+                    serviceName = reader.readLine();
+                }
+                reader.close();
+            } else {
+                // is == null
+                LOG.warn("No resource file with name [{}] found.", name);
+            }
+        } catch (Exception e) {
+            LOG.error("Error occurred when looking for resource file " + name, e);
+        }
+        return services;
+    }
+
     public static Map<String, String> loadPath(String path) {
         LOG.info("Load path looking for a resource file of name [{}] ...", path);
         Map<String, String> pathMap = new HashMap<String, String>();
@@ -121,7 +156,7 @@ public class ServiceProvider {
                 reader.close();
             }
         } catch (Exception ex) {
-            LOG.error("Error occured when looking for resource file " + path, ex);
+            LOG.error("Error occurred when looking for resource file " + path, ex);
         }
         return pathMap;
     }
diff --git a/snode/src/main/java/org/apache/rocketmq/snode/SnodeController.java b/snode/src/main/java/org/apache/rocketmq/snode/SnodeController.java
index c1ccb4a..66d792d 100644
--- a/snode/src/main/java/org/apache/rocketmq/snode/SnodeController.java
+++ b/snode/src/main/java/org/apache/rocketmq/snode/SnodeController.java
@@ -15,6 +15,7 @@ package org.apache.rocketmq.snode;/*
  * limitations under the License.
  */
 
+import java.util.List;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
@@ -39,6 +40,9 @@ import org.apache.rocketmq.snode.client.DefaultConsumerIdsChangeListener;
 import org.apache.rocketmq.snode.client.ProducerManager;
 import org.apache.rocketmq.snode.client.SubscriptionGroupManager;
 import org.apache.rocketmq.snode.config.SnodeConfig;
+import org.apache.rocketmq.snode.interceptor.InterceptorFactory;
+import org.apache.rocketmq.snode.interceptor.InterceptorGroup;
+import org.apache.rocketmq.snode.interceptor.Interceptor;
 import org.apache.rocketmq.snode.offset.ConsumerOffsetManager;
 import org.apache.rocketmq.snode.processor.ConsumerManageProcessor;
 import org.apache.rocketmq.snode.processor.HearbeatProcessor;
@@ -76,6 +80,8 @@ public class SnodeController {
     private SendMessageProcessor sendMessageProcessor;
     private PullMessageProcessor pullMessageProcessor;
     private HearbeatProcessor hearbeatProcessor;
+    private InterceptorGroup consumeMessageInterceptorGroup;
+    private InterceptorGroup sendMessageInterceptorGroup;
 
     private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl(
         "SnodeControllerScheduledThread"));
@@ -161,9 +167,27 @@ public class SnodeController {
     public boolean initialize() {
         this.snodeServer = RemotingServerFactory.createInstance().init(this.nettyServerConfig, this.clientHousekeepingService);
         this.registerProcessor();
+        initInterceptorGroup();
         return true;
     }
 
+    private void initInterceptorGroup() { // wraps each interceptor list obtained from InterceptorFactory in its own InterceptorGroup
+        List<Interceptor> consumeMessageInterceptors = InterceptorFactory.getInstance().loadConsumeMessageInterceptors();
+        if (consumeMessageInterceptors != null) {
+            this.consumeMessageInterceptorGroup = new InterceptorGroup();
+            for (Interceptor interceptor : consumeMessageInterceptors) {
+                this.consumeMessageInterceptorGroup.registerInterceptor(interceptor);
+            }
+        }
+        List<Interceptor> sendMessageInterceptors = InterceptorFactory.getInstance().loadSendMessageInterceptors();
+        if (sendMessageInterceptors != null) {
+            this.sendMessageInterceptorGroup = new InterceptorGroup();
+            for (Interceptor interceptor : sendMessageInterceptors) {
+                this.sendMessageInterceptorGroup.registerInterceptor(interceptor);
+            }
+        }
+    }
+
     public void registerProcessor() {
         this.snodeServer.registerProcessor(RequestCode.SEND_MESSAGE_V2, sendMessageProcessor, this.sendMessageExecutor);
         this.snodeServer.registerProcessor(RequestCode.CONSUMER_SEND_MSG_BACK, sendMessageProcessor, this.sendMessageExecutor);
@@ -201,34 +225,18 @@ public class SnodeController {
         return producerManager;
     }
 
-    public void setProducerManager(ProducerManager producerManager) {
-        this.producerManager = producerManager;
-    }
-
     public RemotingServer getSnodeServer() {
         return snodeServer;
     }
 
-    public void setSnodeServer(RemotingServer snodeServer) {
-        this.snodeServer = snodeServer;
-    }
-
     public ConsumerManager getConsumerManager() {
         return consumerManager;
     }
 
-    public void setConsumerManager(ConsumerManager consumerManager) {
-        this.consumerManager = consumerManager;
-    }
-
     public SubscriptionGroupManager getSubscriptionGroupManager() {
         return subscriptionGroupManager;
     }
 
-    public void setSubscriptionGroupManager(SubscriptionGroupManager subscriptionGroupManager) {
-        this.subscriptionGroupManager = subscriptionGroupManager;
-    }
-
     public ClientConfig getNettyClientConfig() {
         return nettyClientConfig;
     }
@@ -237,31 +245,23 @@ public class SnodeController {
         return enodeService;
     }
 
-    public void setEnodeService(EnodeService enodeService) {
-        this.enodeService = enodeService;
-    }
-
     public NnodeService getNnodeService() {
         return nnodeService;
     }
 
-    public void setNnodeService(NnodeService nnodeService) {
-        this.nnodeService = nnodeService;
-    }
-
     public RemotingClient getRemotingClient() {
         return remotingClient;
     }
 
-    public void setRemotingClient(RemotingClient remotingClient) {
-        this.remotingClient = remotingClient;
-    }
-
     public ConsumerOffsetManager getConsumerOffsetManager() {
         return consumerOffsetManager;
     }
 
-    public void setConsumerOffsetManager(ConsumerOffsetManager consumerOffsetManager) {
-        this.consumerOffsetManager = consumerOffsetManager;
+    public InterceptorGroup getConsumeMessageInterceptorGroup() {
+        return consumeMessageInterceptorGroup;
+    }
+
+    public InterceptorGroup getSendMessageInterceptorGroup() {
+        return sendMessageInterceptorGroup;
     }
 }
diff --git a/snode/src/main/java/org/apache/rocketmq/snode/interceptor/ExceptionContext.java b/snode/src/main/java/org/apache/rocketmq/snode/interceptor/ExceptionContext.java
new file mode 100644
index 0000000..569633d
--- /dev/null
+++ b/snode/src/main/java/org/apache/rocketmq/snode/interceptor/ExceptionContext.java
@@ -0,0 +1,47 @@
+package org.apache.rocketmq.snode.interceptor;/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import org.apache.rocketmq.remoting.RemotingChannel;
+import org.apache.rocketmq.remoting.protocol.RemotingCommand;
+
+public class ExceptionContext extends RequestContext { // request context plus the failure details handed to Interceptor.onException
+    private Throwable throwable; // the error supplied by whoever builds this context
+    private String remark; // free-text note; the processors in this commit pass null
+
+    public ExceptionContext(RemotingCommand request, RemotingChannel remotingChannel, Throwable throwable,
+        String remark) {
+        super(request, remotingChannel);
+        this.throwable = throwable;
+        this.remark = remark;
+    }
+
+    public Throwable getThrowable() {
+        return throwable;
+    }
+
+    public void setThrowable(Throwable throwable) {
+        this.throwable = throwable;
+    }
+
+    public String getRemark() {
+        return remark;
+    }
+
+    public void setRemark(String remark) {
+        this.remark = remark;
+    }
+}
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/mqtrace/SendMessageHook.java b/snode/src/main/java/org/apache/rocketmq/snode/interceptor/Interceptor.java
similarity index 76%
copy from broker/src/main/java/org/apache/rocketmq/broker/mqtrace/SendMessageHook.java
copy to snode/src/main/java/org/apache/rocketmq/snode/interceptor/Interceptor.java
index a74b6d6..18cf6b3 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/mqtrace/SendMessageHook.java
+++ b/snode/src/main/java/org/apache/rocketmq/snode/interceptor/Interceptor.java
@@ -14,12 +14,11 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.rocketmq.broker.mqtrace;
+package org.apache.rocketmq.snode.interceptor;
+public interface Interceptor {
+    void beforeSendMessage(RequestContext requestContext);
 
-public interface SendMessageHook {
-    public String hookName();
+    void afterSendMessage(ResponseContext responseContext);
 
-    public void sendMessageBefore(final SendMessageContext context);
-
-    public void sendMessageAfter(final SendMessageContext context);
+    void onException(ExceptionContext exceptionContext);
 }
diff --git a/snode/src/main/java/org/apache/rocketmq/snode/interceptor/InterceptorFactory.java b/snode/src/main/java/org/apache/rocketmq/snode/interceptor/InterceptorFactory.java
new file mode 100644
index 0000000..e2b4332
--- /dev/null
+++ b/snode/src/main/java/org/apache/rocketmq/snode/interceptor/InterceptorFactory.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.snode.interceptor;
+
+import java.util.List;
+import org.apache.rocketmq.remoting.util.ServiceProvider;
+
+public class InterceptorFactory { // singleton that loads Interceptor implementations through ServiceProvider.loadServiceList
+    private static final InterceptorFactory ourInstance = new InterceptorFactory();
+
+    public static InterceptorFactory getInstance() {
+        return ourInstance;
+    }
+
+    private static final String SEND_MESSAGE_INTERCEPTOR = "META-INF/service/org.apache.rocketmq.snode.interceptor.SendMessageInterceptor";
+
+    private static final String CONSUME_MESSAGE_INTERCEPTOR = "META-INF/service/org.apache.rocketmq.snode.interceptor.ConsumeMessageInterceptor";
+
+    private InterceptorFactory() {
+    }
+
+    public List<Interceptor> loadConsumeMessageInterceptors() { // typed return so callers need no unchecked conversion
+        List<Interceptor> consumeMessageInterceptors = ServiceProvider.loadServiceList(CONSUME_MESSAGE_INTERCEPTOR, Interceptor.class);
+        return consumeMessageInterceptors;
+    }
+
+    public List<Interceptor> loadSendMessageInterceptors() { // typed return so callers need no unchecked conversion
+        List<Interceptor> sendMessageInterceptors = ServiceProvider.loadServiceList(SEND_MESSAGE_INTERCEPTOR, Interceptor.class);
+        return sendMessageInterceptors;
+    }
+
+}
diff --git a/snode/src/main/java/org/apache/rocketmq/snode/interceptor/InterceptorGroup.java b/snode/src/main/java/org/apache/rocketmq/snode/interceptor/InterceptorGroup.java
new file mode 100644
index 0000000..4582894
--- /dev/null
+++ b/snode/src/main/java/org/apache/rocketmq/snode/interceptor/InterceptorGroup.java
@@ -0,0 +1,47 @@
+package org.apache.rocketmq.snode.interceptor;/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class InterceptorGroup { // holds interceptors and forwards each callback to every one of them, in registration order
+    private List<Interceptor> interceptors = new ArrayList<>();
+
+    public void registerInterceptor(Interceptor sendMessageInterceptor) { // a null argument is ignored
+        if (sendMessageInterceptor != null) {
+            interceptors.add(sendMessageInterceptor);
+        }
+    }
+
+    public void beforeRequest(RequestContext requestContext) { // calls beforeSendMessage on each interceptor; exceptions are not caught here
+        for (Interceptor interceptor : interceptors) {
+            interceptor.beforeSendMessage(requestContext);
+        }
+    }
+
+    public void afterRequest(ResponseContext responseContext) { // calls afterSendMessage on each interceptor; exceptions are not caught here
+        for (Interceptor interceptor : interceptors) {
+            interceptor.afterSendMessage(responseContext);
+        }
+    }
+
+    public void onException(ExceptionContext exceptionContext) { // calls onException on each interceptor; exceptions are not caught here
+        for (Interceptor interceptor : interceptors) {
+            interceptor.onException(exceptionContext);
+        }
+    }
+}
diff --git a/snode/src/main/java/org/apache/rocketmq/snode/interceptor/RequestContext.java b/snode/src/main/java/org/apache/rocketmq/snode/interceptor/RequestContext.java
new file mode 100644
index 0000000..796358b
--- /dev/null
+++ b/snode/src/main/java/org/apache/rocketmq/snode/interceptor/RequestContext.java
@@ -0,0 +1,45 @@
+package org.apache.rocketmq.snode.interceptor;/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import org.apache.rocketmq.remoting.RemotingChannel;
+import org.apache.rocketmq.remoting.protocol.RemotingCommand;
+
+public class RequestContext { // mutable holder for a request command and the channel it is associated with; base of the other contexts
+    protected RemotingCommand request; // the command being processed
+    protected RemotingChannel remotingChannel; // channel passed in alongside the request
+
+    public RequestContext(RemotingCommand request, RemotingChannel remotingChannel) {
+        this.remotingChannel = remotingChannel;
+        this.request = request;
+    }
+
+    public RemotingCommand getRequest() {
+        return request;
+    }
+
+    public void setRequest(RemotingCommand request) {
+        this.request = request;
+    }
+
+    public RemotingChannel getRemotingChannel() {
+        return remotingChannel;
+    }
+
+    public void setRemotingChannel(RemotingChannel remotingChannel) {
+        this.remotingChannel = remotingChannel;
+    }
+}
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/mqtrace/SendMessageHook.java b/snode/src/main/java/org/apache/rocketmq/snode/interceptor/ResponseContext.java
similarity index 56%
copy from broker/src/main/java/org/apache/rocketmq/broker/mqtrace/SendMessageHook.java
copy to snode/src/main/java/org/apache/rocketmq/snode/interceptor/ResponseContext.java
index a74b6d6..2634426 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/mqtrace/SendMessageHook.java
+++ b/snode/src/main/java/org/apache/rocketmq/snode/interceptor/ResponseContext.java
@@ -1,4 +1,4 @@
-/*
+package org.apache.rocketmq.snode.interceptor;/*
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
@@ -14,12 +14,23 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.rocketmq.broker.mqtrace;
 
-public interface SendMessageHook {
-    public String hookName();
+import org.apache.rocketmq.remoting.RemotingChannel;
+import org.apache.rocketmq.remoting.protocol.RemotingCommand;
 
-    public void sendMessageBefore(final SendMessageContext context);
+public class ResponseContext extends RequestContext {
+    private RemotingCommand response;
 
-    public void sendMessageAfter(final SendMessageContext context);
+    public ResponseContext(RemotingCommand request, RemotingChannel remotingChannel, RemotingCommand response) {
+        super(request, remotingChannel);
+        this.response = response;
+    }
+
+    public RemotingCommand getResponse() {
+        return response;
+    }
+
+    public void setResponse(RemotingCommand response) {
+        this.response = response;
+    }
 }
diff --git a/snode/src/main/java/org/apache/rocketmq/snode/offset/ConsumerOffsetManager.java b/snode/src/main/java/org/apache/rocketmq/snode/offset/ConsumerOffsetManager.java
index c177ccf..065c017 100644
--- a/snode/src/main/java/org/apache/rocketmq/snode/offset/ConsumerOffsetManager.java
+++ b/snode/src/main/java/org/apache/rocketmq/snode/offset/ConsumerOffsetManager.java
@@ -42,7 +42,7 @@ public class ConsumerOffsetManager {
     private static final String TOPIC_GROUP_SEPARATOR = "@";
 
     private ConcurrentMap<String/* topic@group */, ConcurrentMap<Integer, Long>> offsetTable =
-        new ConcurrentHashMap<String, ConcurrentMap<Integer, Long>>(512);
+        new ConcurrentHashMap<>(512);
 
     private transient SnodeController snodeController;
 
@@ -207,7 +207,6 @@ public class ConsumerOffsetManager {
         this.offsetTable = offsetTable;
     }
 
-
     public Map<Integer, Long> queryOffset(final String enodeName, final String group, final String topic) {
         // topic@group
         String key = buildKey(enodeName, topic, group);
diff --git a/snode/src/main/java/org/apache/rocketmq/snode/processor/PullMessageProcessor.java b/snode/src/main/java/org/apache/rocketmq/snode/processor/PullMessageProcessor.java
index 645d7fc..f5d080f 100644
--- a/snode/src/main/java/org/apache/rocketmq/snode/processor/PullMessageProcessor.java
+++ b/snode/src/main/java/org/apache/rocketmq/snode/processor/PullMessageProcessor.java
@@ -32,6 +32,9 @@ import org.apache.rocketmq.remoting.exception.RemotingCommandException;
 import org.apache.rocketmq.remoting.protocol.RemotingCommand;
 import org.apache.rocketmq.snode.SnodeController;
 import org.apache.rocketmq.snode.client.ConsumerGroupInfo;
+import org.apache.rocketmq.snode.interceptor.ExceptionContext;
+import org.apache.rocketmq.snode.interceptor.RequestContext;
+import org.apache.rocketmq.snode.interceptor.ResponseContext;
 
 public class PullMessageProcessor implements RequestProcessor {
     private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.SNODE_LOGGER_NAME);
@@ -45,8 +48,21 @@ public class PullMessageProcessor implements RequestProcessor {
     @Override
     public RemotingCommand processRequest(RemotingChannel remotingChannel,
         RemotingCommand request) throws RemotingCommandException {
-        RemotingCommand response = RemotingCommand.createResponseCommand(PullMessageResponseHeader.class);
+        if (this.snodeController.getConsumeMessageInterceptorGroup() != null) { // run consume interceptors before handling the pull
+            RequestContext requestContext = new RequestContext(request, remotingChannel);
+            this.snodeController.getConsumeMessageInterceptorGroup().beforeRequest(requestContext);
+        }
+        RemotingCommand response = pullMessage(remotingChannel, request);
+        if (this.snodeController.getConsumeMessageInterceptorGroup() != null && response != null) {
+            ResponseContext responseContext = new ResponseContext(request, remotingChannel, response);
+            this.snodeController.getConsumeMessageInterceptorGroup().afterRequest(responseContext); // same group that was null-checked, not the send group
+        }
+        return response;
+    }
 
+    private RemotingCommand pullMessage(RemotingChannel remotingChannel,
+        RemotingCommand request) throws RemotingCommandException {
+        RemotingCommand response = RemotingCommand.createResponseCommand(PullMessageResponseHeader.class);
         final PullMessageRequestHeader requestHeader =
             (PullMessageRequestHeader) request.decodeCommandCustomHeader(PullMessageRequestHeader.class);
 
@@ -72,14 +88,13 @@ public class PullMessageProcessor implements RequestProcessor {
             response.setRemark("The consumer group[" + requestHeader.getConsumerGroup() + "] can not consume by broadcast way");
             return response;
         }
-
-        SubscriptionData subscriptionData = consumerGroupInfo.findSubscriptionData(requestHeader.getTopic());
-        if (null == subscriptionData) {
+        if ((consumerGroupInfo == null) || (consumerGroupInfo.findSubscriptionData(requestHeader.getTopic()) == null)) {
             log.warn("The consumer's subscription not exist, group: {}, topic:{}", requestHeader.getConsumerGroup(), requestHeader.getTopic());
             response.setCode(ResponseCode.SUBSCRIPTION_NOT_EXIST);
             response.setRemark("The consumer's subscription not exist" + FAQUrl.suggestTodo(FAQUrl.SAME_GROUP_DIFFERENT_TOPIC));
             return response;
         }
+        SubscriptionData subscriptionData = consumerGroupInfo.findSubscriptionData(requestHeader.getTopic());
 
         if (subscriptionData.getSubVersion() < requestHeader.getSubVersion()) {
             log.warn("The broker's subscription is not latest, group: {} {}", requestHeader.getConsumerGroup(),
@@ -92,8 +107,16 @@ public class PullMessageProcessor implements RequestProcessor {
         CompletableFuture<RemotingCommand> responseFuture = snodeController.getEnodeService().pullMessage(request);
         responseFuture.whenComplete((data, ex) -> {
             if (ex == null) {
+                if (this.snodeController.getConsumeMessageInterceptorGroup() != null) {
+                    ResponseContext responseContext = new ResponseContext(request, remotingChannel, data);
+                    this.snodeController.getConsumeMessageInterceptorGroup().afterRequest(responseContext); // pull replies go to the consume group, not the send group
+                }
                 remotingChannel.reply(data);
             } else {
+                if (this.snodeController.getConsumeMessageInterceptorGroup() != null) {
+                    ExceptionContext exceptionContext = new ExceptionContext(request, remotingChannel, ex, null);
+                    this.snodeController.getConsumeMessageInterceptorGroup().onException(exceptionContext);
+                }
                 log.error("Pull message error: {}", ex);
             }
         });
diff --git a/snode/src/main/java/org/apache/rocketmq/snode/processor/SendMessageProcessor.java b/snode/src/main/java/org/apache/rocketmq/snode/processor/SendMessageProcessor.java
index bd18339..2c45bb7 100644
--- a/snode/src/main/java/org/apache/rocketmq/snode/processor/SendMessageProcessor.java
+++ b/snode/src/main/java/org/apache/rocketmq/snode/processor/SendMessageProcessor.java
@@ -23,6 +23,9 @@ import org.apache.rocketmq.remoting.RemotingChannel;
 import org.apache.rocketmq.remoting.RequestProcessor;
 import org.apache.rocketmq.remoting.protocol.RemotingCommand;
 import org.apache.rocketmq.snode.SnodeController;
+import org.apache.rocketmq.snode.interceptor.ExceptionContext;
+import org.apache.rocketmq.snode.interceptor.RequestContext;
+import org.apache.rocketmq.snode.interceptor.ResponseContext;
 
 public class SendMessageProcessor implements RequestProcessor {
     private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.SNODE_LOGGER_NAME);
@@ -35,11 +38,23 @@ public class SendMessageProcessor implements RequestProcessor {
 
     @Override
     public RemotingCommand processRequest(RemotingChannel remotingChannel, RemotingCommand request) {
+        if (this.snodeController.getSendMessageInterceptorGroup() != null) {
+            RequestContext requestContext = new RequestContext(request, remotingChannel);
+            this.snodeController.getSendMessageInterceptorGroup().beforeRequest(requestContext);
+        }
         CompletableFuture<RemotingCommand> responseFuture = snodeController.getEnodeService().sendMessage(request);
         responseFuture.whenComplete((data, ex) -> {
             if (ex == null) {
+                if (this.snodeController.getSendMessageInterceptorGroup() != null) {
+                    ResponseContext responseContext = new ResponseContext(request, remotingChannel, data);
+                    this.snodeController.getSendMessageInterceptorGroup().afterRequest(responseContext);
+                }
                 remotingChannel.reply(data);
             } else {
+                if (this.snodeController.getSendMessageInterceptorGroup() != null) {
+                    ExceptionContext exceptionContext = new ExceptionContext(request, remotingChannel, ex, null);
+                    this.snodeController.getSendMessageInterceptorGroup().onException(exceptionContext);
+                }
                 log.error("Send Message error: {}", ex);
             }
         });
diff --git a/snode/src/main/java/org/apache/rocketmq/snode/service/impl/EnodeServiceImpl.java b/snode/src/main/java/org/apache/rocketmq/snode/service/impl/EnodeServiceImpl.java
index 20a4d2f..5e28ebe 100644
--- a/snode/src/main/java/org/apache/rocketmq/snode/service/impl/EnodeServiceImpl.java
+++ b/snode/src/main/java/org/apache/rocketmq/snode/service/impl/EnodeServiceImpl.java
@@ -15,8 +15,6 @@ package org.apache.rocketmq.snode.service.impl;/*
  * limitations under the License.
  */
 
-import io.netty.channel.Channel;
-import io.netty.channel.ChannelHandlerContext;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Set;
@@ -47,7 +45,6 @@ import org.apache.rocketmq.remoting.exception.RemotingCommandException;
 import org.apache.rocketmq.remoting.exception.RemotingConnectException;
 import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
 import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
-import org.apache.rocketmq.remoting.exception.RemotingTooMuchRequestException;
 import org.apache.rocketmq.remoting.netty.ResponseFuture;
 import org.apache.rocketmq.remoting.protocol.RemotingCommand;
 import org.apache.rocketmq.remoting.serialize.RemotingSerializable;
diff --git a/snode/src/main/java/org/apache/rocketmq/snode/service/impl/NnodeServiceImpl.java b/snode/src/main/java/org/apache/rocketmq/snode/service/impl/NnodeServiceImpl.java
index b272c31..78fd624 100644
--- a/snode/src/main/java/org/apache/rocketmq/snode/service/impl/NnodeServiceImpl.java
+++ b/snode/src/main/java/org/apache/rocketmq/snode/service/impl/NnodeServiceImpl.java
@@ -93,7 +93,7 @@ public class NnodeServiceImpl implements NnodeService {
 
         RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_ROUTEINTO_BY_TOPIC, requestHeader);
         RemotingCommand response = this.snodeController.getRemotingClient().invokeSync(null, request, SnodeConstant.defaultTimeoutMills);
-        log.info("getTopicRouteInfoFromNameServer response: " + response);
+        log.info("GetTopicRouteInfoFromNameServer response: " + response);
         assert response != null;
         switch (response.getCode()) {
             case ResponseCode.TOPIC_NOT_EXIST: {
diff --git a/snode/src/main/resources/META-INF/service/org.apache.rocketmq.snode.interceptor.ConsumeMessageInterceptor b/snode/src/main/resources/META-INF/service/org.apache.rocketmq.snode.interceptor.ConsumeMessageInterceptor
new file mode 100644
index 0000000..e69de29
diff --git a/snode/src/main/resources/META-INF/service/org.apache.rocketmq.snode.interceptor.SendMessageInterceptor b/snode/src/main/resources/META-INF/service/org.apache.rocketmq.snode.interceptor.SendMessageInterceptor
new file mode 100644
index 0000000..e69de29