You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by du...@apache.org on 2019/01/02 16:45:37 UTC
[rocketmq] 13/14: Add snode interceptor
This is an automated email from the ASF dual-hosted git repository.
duhengforever pushed a commit to branch snode
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
commit 6a49f0cbdfd5e598149f517681eca14d75c02064
Author: duhenglucky <du...@gmail.com>
AuthorDate: Wed Jan 2 23:39:05 2019 +0800
Add snode interceptor
---
.../rocketmq/broker/mqtrace/SendMessageHook.java | 6 +--
.../apache/rocketmq/remoting/RemotingClient.java | 2 -
.../remoting/transport/http2/Http2ClientImpl.java | 9 ----
.../transport/rocketmq/NettyRemotingClient.java | 9 ----
.../rocketmq/remoting/util/ServiceProvider.java | 37 ++++++++++++-
.../org/apache/rocketmq/snode/SnodeController.java | 60 +++++++++++-----------
.../snode/interceptor/ExceptionContext.java | 47 +++++++++++++++++
.../rocketmq/snode/interceptor/Interceptor.java | 11 ++--
.../snode/interceptor/InterceptorFactory.java | 46 +++++++++++++++++
.../snode/interceptor/InterceptorGroup.java | 47 +++++++++++++++++
.../rocketmq/snode/interceptor/RequestContext.java | 45 ++++++++++++++++
.../snode/interceptor/ResponseContext.java | 23 ++++++---
.../snode/offset/ConsumerOffsetManager.java | 3 +-
.../snode/processor/PullMessageProcessor.java | 31 +++++++++--
.../snode/processor/SendMessageProcessor.java | 15 ++++++
.../snode/service/impl/EnodeServiceImpl.java | 3 --
.../snode/service/impl/NnodeServiceImpl.java | 2 +-
...tmq.snode.interceptor.ConsumeMessageInterceptor | 0
...cketmq.snode.interceptor.SendMessageInterceptor | 0
19 files changed, 320 insertions(+), 76 deletions(-)
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/mqtrace/SendMessageHook.java b/broker/src/main/java/org/apache/rocketmq/broker/mqtrace/SendMessageHook.java
index a74b6d6..a89bace 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/mqtrace/SendMessageHook.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/mqtrace/SendMessageHook.java
@@ -17,9 +17,9 @@
package org.apache.rocketmq.broker.mqtrace;
public interface SendMessageHook {
- public String hookName();
+ String hookName();
- public void sendMessageBefore(final SendMessageContext context);
+ void sendMessageBefore(final SendMessageContext context);
- public void sendMessageAfter(final SendMessageContext context);
+ void sendMessageAfter(final SendMessageContext context);
}
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/RemotingClient.java b/remoting/src/main/java/org/apache/rocketmq/remoting/RemotingClient.java
index c706eed..84d4241 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/RemotingClient.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/RemotingClient.java
@@ -49,7 +49,5 @@ public interface RemotingClient extends RemotingService {
ExecutorService getCallbackExecutor();
- boolean isChannelWritable(final String addr);
-
RemotingClient init(ClientConfig nettyClientConfig, ChannelEventListener channelEventListener);
}
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/transport/http2/Http2ClientImpl.java b/remoting/src/main/java/org/apache/rocketmq/remoting/transport/http2/Http2ClientImpl.java
index 71bee1e..fb9e3f8 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/transport/http2/Http2ClientImpl.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/transport/http2/Http2ClientImpl.java
@@ -267,15 +267,6 @@ public class Http2ClientImpl extends NettyRemotingClientAbstract implements Remo
}
@Override
- public boolean isChannelWritable(String addr) {
- ChannelWrapper cw = this.channelTables.get(addr);
- if (cw != null && cw.isOK()) {
- return cw.isWritable();
- }
- return true;
- }
-
- @Override
public List<String> getNameServerAddressList() {
return this.namesrvAddrList.get();
}
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/transport/rocketmq/NettyRemotingClient.java b/remoting/src/main/java/org/apache/rocketmq/remoting/transport/rocketmq/NettyRemotingClient.java
index 686bb01..a5790df 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/transport/rocketmq/NettyRemotingClient.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/transport/rocketmq/NettyRemotingClient.java
@@ -273,15 +273,6 @@ public class NettyRemotingClient extends NettyRemotingClientAbstract implements
}
@Override
- public boolean isChannelWritable(String addr) {
- ChannelWrapper cw = this.channelTables.get(addr);
- if (cw != null && cw.isOK()) {
- return cw.isWritable();
- }
- return true;
- }
-
- @Override
public List<String> getNameServerAddressList() {
return this.namesrvAddrList.get();
}
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/util/ServiceProvider.java b/remoting/src/main/java/org/apache/rocketmq/remoting/util/ServiceProvider.java
index 33c8312..586a846 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/util/ServiceProvider.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/util/ServiceProvider.java
@@ -15,7 +15,9 @@ package org.apache.rocketmq.remoting.util;
import java.io.BufferedReader;
import java.io.InputStream;
import java.io.InputStreamReader;
+import java.util.ArrayList;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.rocketmq.logging.InternalLogger;
@@ -92,6 +94,39 @@ public class ServiceProvider {
}
}
+ public static <T> List<T> loadServiceList(String name, Class<?> clazz) {
+ LOG.info("Looking for a resource file of name [{}] ...", name);
+ List<T> services = new ArrayList<T>();
+ try {
+ ArrayList<String> names = new ArrayList<String>();
+ final InputStream is = getResourceAsStream(getContextClassLoader(), name);
+ if (is != null) {
+ BufferedReader reader;
+ try {
+ reader = new BufferedReader(new InputStreamReader(is, "UTF-8"));
+ } catch (java.io.UnsupportedEncodingException e) {
+ reader = new BufferedReader(new InputStreamReader(is));
+ }
+ String serviceName = reader.readLine();
+ while (serviceName != null && !"".equals(serviceName)) {
+ if (!names.contains(serviceName)) {
+ T instance = createInstance(serviceName, clazz);
+ services.add(instance);
+ }
+ names.add(serviceName);
+ serviceName = reader.readLine();
+ }
+ reader.close();
+ } else {
+ // is == null
+ LOG.warn("No resource file with name [{}] found.", name);
+ }
+ } catch (Exception e) {
+ LOG.error("Error occured when looking for resource file " + name, e);
+ }
+ return services;
+ }
+
public static Map<String, String> loadPath(String path) {
LOG.info("Load path looking for a resource file of name [{}] ...", path);
Map<String, String> pathMap = new HashMap<String, String>();
@@ -121,7 +156,7 @@ public class ServiceProvider {
reader.close();
}
} catch (Exception ex) {
- LOG.error("Error occured when looking for resource file " + path, ex);
+ LOG.error("Error occurred when looking for resource file " + path, ex);
}
return pathMap;
}
diff --git a/snode/src/main/java/org/apache/rocketmq/snode/SnodeController.java b/snode/src/main/java/org/apache/rocketmq/snode/SnodeController.java
index c1ccb4a..66d792d 100644
--- a/snode/src/main/java/org/apache/rocketmq/snode/SnodeController.java
+++ b/snode/src/main/java/org/apache/rocketmq/snode/SnodeController.java
@@ -15,6 +15,7 @@ package org.apache.rocketmq.snode;/*
* limitations under the License.
*/
+import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@@ -39,6 +40,9 @@ import org.apache.rocketmq.snode.client.DefaultConsumerIdsChangeListener;
import org.apache.rocketmq.snode.client.ProducerManager;
import org.apache.rocketmq.snode.client.SubscriptionGroupManager;
import org.apache.rocketmq.snode.config.SnodeConfig;
+import org.apache.rocketmq.snode.interceptor.InterceptorFactory;
+import org.apache.rocketmq.snode.interceptor.InterceptorGroup;
+import org.apache.rocketmq.snode.interceptor.Interceptor;
import org.apache.rocketmq.snode.offset.ConsumerOffsetManager;
import org.apache.rocketmq.snode.processor.ConsumerManageProcessor;
import org.apache.rocketmq.snode.processor.HearbeatProcessor;
@@ -76,6 +80,8 @@ public class SnodeController {
private SendMessageProcessor sendMessageProcessor;
private PullMessageProcessor pullMessageProcessor;
private HearbeatProcessor hearbeatProcessor;
+ private InterceptorGroup consumeMessageInterceptorGroup;
+ private InterceptorGroup sendMessageInterceptorGroup;
private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl(
"SnodeControllerScheduledThread"));
@@ -161,9 +167,27 @@ public class SnodeController {
public boolean initialize() {
this.snodeServer = RemotingServerFactory.createInstance().init(this.nettyServerConfig, this.clientHousekeepingService);
this.registerProcessor();
+ initInterceptorGroup();
return true;
}
+    /**
+     * Builds the consume-message and send-message interceptor groups from the interceptors
+     * returned by {@link InterceptorFactory}. A group stays null when its loader returns null.
+     */
+    private void initInterceptorGroup() {
+        InterceptorFactory factory = InterceptorFactory.getInstance();
+        List<Interceptor> consumeMessageInterceptors = factory.loadConsumeMessageInterceptors();
+        this.consumeMessageInterceptorGroup = buildInterceptorGroup(consumeMessageInterceptors);
+        List<Interceptor> sendMessageInterceptors = factory.loadSendMessageInterceptors();
+        this.sendMessageInterceptorGroup = buildInterceptorGroup(sendMessageInterceptors);
+    }
+
+    /**
+     * Registers every given interceptor in a new group.
+     *
+     * @param interceptors interceptors to register, may be null
+     * @return the populated group, or null when {@code interceptors} is null
+     */
+    private static InterceptorGroup buildInterceptorGroup(List<Interceptor> interceptors) {
+        if (interceptors == null) {
+            return null;
+        }
+        InterceptorGroup group = new InterceptorGroup();
+        for (Interceptor interceptor : interceptors) {
+            group.registerInterceptor(interceptor);
+        }
+        return group;
+    }
+
public void registerProcessor() {
this.snodeServer.registerProcessor(RequestCode.SEND_MESSAGE_V2, sendMessageProcessor, this.sendMessageExecutor);
this.snodeServer.registerProcessor(RequestCode.CONSUMER_SEND_MSG_BACK, sendMessageProcessor, this.sendMessageExecutor);
@@ -201,34 +225,18 @@ public class SnodeController {
return producerManager;
}
- public void setProducerManager(ProducerManager producerManager) {
- this.producerManager = producerManager;
- }
-
public RemotingServer getSnodeServer() {
return snodeServer;
}
- public void setSnodeServer(RemotingServer snodeServer) {
- this.snodeServer = snodeServer;
- }
-
public ConsumerManager getConsumerManager() {
return consumerManager;
}
- public void setConsumerManager(ConsumerManager consumerManager) {
- this.consumerManager = consumerManager;
- }
-
public SubscriptionGroupManager getSubscriptionGroupManager() {
return subscriptionGroupManager;
}
- public void setSubscriptionGroupManager(SubscriptionGroupManager subscriptionGroupManager) {
- this.subscriptionGroupManager = subscriptionGroupManager;
- }
-
public ClientConfig getNettyClientConfig() {
return nettyClientConfig;
}
@@ -237,31 +245,23 @@ public class SnodeController {
return enodeService;
}
- public void setEnodeService(EnodeService enodeService) {
- this.enodeService = enodeService;
- }
-
public NnodeService getNnodeService() {
return nnodeService;
}
- public void setNnodeService(NnodeService nnodeService) {
- this.nnodeService = nnodeService;
- }
-
public RemotingClient getRemotingClient() {
return remotingClient;
}
- public void setRemotingClient(RemotingClient remotingClient) {
- this.remotingClient = remotingClient;
- }
-
public ConsumerOffsetManager getConsumerOffsetManager() {
return consumerOffsetManager;
}
- public void setConsumerOffsetManager(ConsumerOffsetManager consumerOffsetManager) {
- this.consumerOffsetManager = consumerOffsetManager;
+ public InterceptorGroup getConsumeMessageInterceptorGroup() {
+ return consumeMessageInterceptorGroup;
+ }
+
+ public InterceptorGroup getSendMessageInterceptorGroup() {
+ return sendMessageInterceptorGroup;
}
}
diff --git a/snode/src/main/java/org/apache/rocketmq/snode/interceptor/ExceptionContext.java b/snode/src/main/java/org/apache/rocketmq/snode/interceptor/ExceptionContext.java
new file mode 100644
index 0000000..569633d
--- /dev/null
+++ b/snode/src/main/java/org/apache/rocketmq/snode/interceptor/ExceptionContext.java
@@ -0,0 +1,47 @@
+package org.apache.rocketmq.snode.interceptor;/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import org.apache.rocketmq.remoting.RemotingChannel;
+import org.apache.rocketmq.remoting.protocol.RemotingCommand;
+
+/**
+ * Request context that additionally carries the {@link Throwable} raised while the request
+ * was being handled, plus a free-form remark.
+ */
+public class ExceptionContext extends RequestContext {
+    private String remark;
+    private Throwable throwable;
+
+    /**
+     * @param request         the request being processed
+     * @param remotingChannel the channel the request arrived on
+     * @param throwable       the failure to report to interceptors
+     * @param remark          additional description of the failure
+     */
+    public ExceptionContext(RemotingCommand request, RemotingChannel remotingChannel, Throwable throwable,
+        String remark) {
+        super(request, remotingChannel);
+        this.remark = remark;
+        this.throwable = throwable;
+    }
+
+    public String getRemark() {
+        return this.remark;
+    }
+
+    public void setRemark(String remark) {
+        this.remark = remark;
+    }
+
+    public Throwable getThrowable() {
+        return this.throwable;
+    }
+
+    public void setThrowable(Throwable throwable) {
+        this.throwable = throwable;
+    }
+}
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/mqtrace/SendMessageHook.java b/snode/src/main/java/org/apache/rocketmq/snode/interceptor/Interceptor.java
similarity index 76%
copy from broker/src/main/java/org/apache/rocketmq/broker/mqtrace/SendMessageHook.java
copy to snode/src/main/java/org/apache/rocketmq/snode/interceptor/Interceptor.java
index a74b6d6..18cf6b3 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/mqtrace/SendMessageHook.java
+++ b/snode/src/main/java/org/apache/rocketmq/snode/interceptor/Interceptor.java
@@ -14,12 +14,11 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.rocketmq.broker.mqtrace;
+package org.apache.rocketmq.snode.interceptor;
+public interface Interceptor {
+ void beforeSendMessage(RequestContext requestContext);
-public interface SendMessageHook {
- public String hookName();
+ void afterSendMessage(ResponseContext responseContext);
- public void sendMessageBefore(final SendMessageContext context);
-
- public void sendMessageAfter(final SendMessageContext context);
+ void onException(ExceptionContext exceptionContext);
}
diff --git a/snode/src/main/java/org/apache/rocketmq/snode/interceptor/InterceptorFactory.java b/snode/src/main/java/org/apache/rocketmq/snode/interceptor/InterceptorFactory.java
new file mode 100644
index 0000000..e2b4332
--- /dev/null
+++ b/snode/src/main/java/org/apache/rocketmq/snode/interceptor/InterceptorFactory.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.snode.interceptor;
+
+import java.util.List;
+import org.apache.rocketmq.remoting.util.ServiceProvider;
+
+/**
+ * Singleton that loads {@link Interceptor} implementations through
+ * {@link ServiceProvider#loadServiceList(String, Class)} from the resource files named by the
+ * constants below.
+ */
+public class InterceptorFactory {
+    private static final InterceptorFactory INSTANCE = new InterceptorFactory();
+
+    /** Resource file listing the send-message interceptor class names. */
+    private static final String SEND_MESSAGE_INTERCEPTOR = "META-INF/service/org.apache.rocketmq.snode.interceptor.SendMessageInterceptor";
+
+    /** Resource file listing the consume-message interceptor class names. */
+    private static final String CONSUME_MESSAGE_INTERCEPTOR = "META-INF/service/org.apache.rocketmq.snode.interceptor.ConsumeMessageInterceptor";
+
+    private InterceptorFactory() {
+    }
+
+    /**
+     * @return the shared factory instance
+     */
+    public static InterceptorFactory getInstance() {
+        return INSTANCE;
+    }
+
+    /**
+     * @return the interceptors listed in the consume-message resource file
+     */
+    public List<Interceptor> loadConsumeMessageInterceptors() {
+        return ServiceProvider.<Interceptor>loadServiceList(CONSUME_MESSAGE_INTERCEPTOR, Interceptor.class);
+    }
+
+    /**
+     * @return the interceptors listed in the send-message resource file
+     */
+    public List<Interceptor> loadSendMessageInterceptors() {
+        return ServiceProvider.<Interceptor>loadServiceList(SEND_MESSAGE_INTERCEPTOR, Interceptor.class);
+    }
+
+}
diff --git a/snode/src/main/java/org/apache/rocketmq/snode/interceptor/InterceptorGroup.java b/snode/src/main/java/org/apache/rocketmq/snode/interceptor/InterceptorGroup.java
new file mode 100644
index 0000000..4582894
--- /dev/null
+++ b/snode/src/main/java/org/apache/rocketmq/snode/interceptor/InterceptorGroup.java
@@ -0,0 +1,47 @@
+package org.apache.rocketmq.snode.interceptor;/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Ordered collection of {@link Interceptor}s; each callback is forwarded to every registered
+ * interceptor in registration order.
+ */
+public class InterceptorGroup {
+    private List<Interceptor> interceptors = new ArrayList<>();
+
+    /**
+     * Adds an interceptor to the group; a null argument is ignored.
+     */
+    public void registerInterceptor(Interceptor sendMessageInterceptor) {
+        if (sendMessageInterceptor == null) {
+            return;
+        }
+        this.interceptors.add(sendMessageInterceptor);
+    }
+
+    /**
+     * Forwards the request context to {@code beforeSendMessage} of every interceptor.
+     */
+    public void beforeRequest(RequestContext requestContext) {
+        for (Interceptor each : this.interceptors) {
+            each.beforeSendMessage(requestContext);
+        }
+    }
+
+    /**
+     * Forwards the response context to {@code afterSendMessage} of every interceptor.
+     */
+    public void afterRequest(ResponseContext responseContext) {
+        for (Interceptor each : this.interceptors) {
+            each.afterSendMessage(responseContext);
+        }
+    }
+
+    /**
+     * Forwards the exception context to {@code onException} of every interceptor.
+     */
+    public void onException(ExceptionContext exceptionContext) {
+        for (Interceptor each : this.interceptors) {
+            each.onException(exceptionContext);
+        }
+    }
+}
diff --git a/snode/src/main/java/org/apache/rocketmq/snode/interceptor/RequestContext.java b/snode/src/main/java/org/apache/rocketmq/snode/interceptor/RequestContext.java
new file mode 100644
index 0000000..796358b
--- /dev/null
+++ b/snode/src/main/java/org/apache/rocketmq/snode/interceptor/RequestContext.java
@@ -0,0 +1,45 @@
+package org.apache.rocketmq.snode.interceptor;/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import org.apache.rocketmq.remoting.RemotingChannel;
+import org.apache.rocketmq.remoting.protocol.RemotingCommand;
+
+/**
+ * Holds the request command and the channel it arrived on, as handed to interceptors.
+ */
+public class RequestContext {
+    protected RemotingCommand request;
+    protected RemotingChannel remotingChannel;
+
+    /**
+     * @param request         the request being processed
+     * @param remotingChannel the channel the request arrived on
+     */
+    public RequestContext(RemotingCommand request, RemotingChannel remotingChannel) {
+        this.request = request;
+        this.remotingChannel = remotingChannel;
+    }
+
+    public RemotingChannel getRemotingChannel() {
+        return this.remotingChannel;
+    }
+
+    public void setRemotingChannel(RemotingChannel remotingChannel) {
+        this.remotingChannel = remotingChannel;
+    }
+
+    public RemotingCommand getRequest() {
+        return this.request;
+    }
+
+    public void setRequest(RemotingCommand request) {
+        this.request = request;
+    }
+}
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/mqtrace/SendMessageHook.java b/snode/src/main/java/org/apache/rocketmq/snode/interceptor/ResponseContext.java
similarity index 56%
copy from broker/src/main/java/org/apache/rocketmq/broker/mqtrace/SendMessageHook.java
copy to snode/src/main/java/org/apache/rocketmq/snode/interceptor/ResponseContext.java
index a74b6d6..2634426 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/mqtrace/SendMessageHook.java
+++ b/snode/src/main/java/org/apache/rocketmq/snode/interceptor/ResponseContext.java
@@ -1,4 +1,4 @@
-/*
+package org.apache.rocketmq.snode.interceptor;/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
@@ -14,12 +14,23 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.rocketmq.broker.mqtrace;
-public interface SendMessageHook {
- public String hookName();
+import org.apache.rocketmq.remoting.RemotingChannel;
+import org.apache.rocketmq.remoting.protocol.RemotingCommand;
- public void sendMessageBefore(final SendMessageContext context);
+public class ResponseContext extends RequestContext {
+ private RemotingCommand response;
- public void sendMessageAfter(final SendMessageContext context);
+ public ResponseContext(RemotingCommand request, RemotingChannel remotingChannel, RemotingCommand response) {
+ super(request, remotingChannel);
+ this.response = response;
+ }
+
+ public RemotingCommand getResponse() {
+ return response;
+ }
+
+ public void setResponse(RemotingCommand response) {
+ this.response = response;
+ }
}
diff --git a/snode/src/main/java/org/apache/rocketmq/snode/offset/ConsumerOffsetManager.java b/snode/src/main/java/org/apache/rocketmq/snode/offset/ConsumerOffsetManager.java
index c177ccf..065c017 100644
--- a/snode/src/main/java/org/apache/rocketmq/snode/offset/ConsumerOffsetManager.java
+++ b/snode/src/main/java/org/apache/rocketmq/snode/offset/ConsumerOffsetManager.java
@@ -42,7 +42,7 @@ public class ConsumerOffsetManager {
private static final String TOPIC_GROUP_SEPARATOR = "@";
private ConcurrentMap<String/* topic@group */, ConcurrentMap<Integer, Long>> offsetTable =
- new ConcurrentHashMap<String, ConcurrentMap<Integer, Long>>(512);
+ new ConcurrentHashMap<>(512);
private transient SnodeController snodeController;
@@ -207,7 +207,6 @@ public class ConsumerOffsetManager {
this.offsetTable = offsetTable;
}
-
public Map<Integer, Long> queryOffset(final String enodeName, final String group, final String topic) {
// topic@group
String key = buildKey(enodeName, topic, group);
diff --git a/snode/src/main/java/org/apache/rocketmq/snode/processor/PullMessageProcessor.java b/snode/src/main/java/org/apache/rocketmq/snode/processor/PullMessageProcessor.java
index 645d7fc..f5d080f 100644
--- a/snode/src/main/java/org/apache/rocketmq/snode/processor/PullMessageProcessor.java
+++ b/snode/src/main/java/org/apache/rocketmq/snode/processor/PullMessageProcessor.java
@@ -32,6 +32,9 @@ import org.apache.rocketmq.remoting.exception.RemotingCommandException;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.snode.SnodeController;
import org.apache.rocketmq.snode.client.ConsumerGroupInfo;
+import org.apache.rocketmq.snode.interceptor.ExceptionContext;
+import org.apache.rocketmq.snode.interceptor.RequestContext;
+import org.apache.rocketmq.snode.interceptor.ResponseContext;
public class PullMessageProcessor implements RequestProcessor {
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.SNODE_LOGGER_NAME);
@@ -45,8 +48,21 @@ public class PullMessageProcessor implements RequestProcessor {
@Override
public RemotingCommand processRequest(RemotingChannel remotingChannel,
RemotingCommand request) throws RemotingCommandException {
- RemotingCommand response = RemotingCommand.createResponseCommand(PullMessageResponseHeader.class);
+ if (this.snodeController.getConsumeMessageInterceptorGroup() != null) {
+ RequestContext requestContext = new RequestContext(request, remotingChannel);
+ this.snodeController.getConsumeMessageInterceptorGroup().beforeRequest(requestContext);
+ }
+ RemotingCommand response = pullMessage(remotingChannel, request);
+ if (this.snodeController.getConsumeMessageInterceptorGroup() != null && response != null) {
+ ResponseContext responseContext = new ResponseContext(request, remotingChannel, response);
+ this.snodeController.getSendMessageInterceptorGroup().afterRequest(responseContext);
+ }
+ return response;
+ }
+ private RemotingCommand pullMessage(RemotingChannel remotingChannel,
+ RemotingCommand request) throws RemotingCommandException {
+ RemotingCommand response = RemotingCommand.createResponseCommand(PullMessageResponseHeader.class);
final PullMessageRequestHeader requestHeader =
(PullMessageRequestHeader) request.decodeCommandCustomHeader(PullMessageRequestHeader.class);
@@ -72,14 +88,13 @@ public class PullMessageProcessor implements RequestProcessor {
response.setRemark("The consumer group[" + requestHeader.getConsumerGroup() + "] can not consume by broadcast way");
return response;
}
-
- SubscriptionData subscriptionData = consumerGroupInfo.findSubscriptionData(requestHeader.getTopic());
- if (null == subscriptionData) {
+ if ((consumerGroupInfo == null) || (consumerGroupInfo.findSubscriptionData(requestHeader.getTopic()) == null)) {
log.warn("The consumer's subscription not exist, group: {}, topic:{}", requestHeader.getConsumerGroup(), requestHeader.getTopic());
response.setCode(ResponseCode.SUBSCRIPTION_NOT_EXIST);
response.setRemark("The consumer's subscription not exist" + FAQUrl.suggestTodo(FAQUrl.SAME_GROUP_DIFFERENT_TOPIC));
return response;
}
+ SubscriptionData subscriptionData = consumerGroupInfo.findSubscriptionData(requestHeader.getTopic());
if (subscriptionData.getSubVersion() < requestHeader.getSubVersion()) {
log.warn("The broker's subscription is not latest, group: {} {}", requestHeader.getConsumerGroup(),
@@ -92,8 +107,16 @@ public class PullMessageProcessor implements RequestProcessor {
CompletableFuture<RemotingCommand> responseFuture = snodeController.getEnodeService().pullMessage(request);
+        // On completion: notify the consume-message interceptors, then reply on success or log on failure.
responseFuture.whenComplete((data, ex) -> {
if (ex == null) {
+                if (this.snodeController.getConsumeMessageInterceptorGroup() != null) {
+                    ResponseContext responseContext = new ResponseContext(request, remotingChannel, data);
+                    // Call the group that was null-checked (consume-message), matching the failure branch below.
+                    this.snodeController.getConsumeMessageInterceptorGroup().afterRequest(responseContext);
+                }
remotingChannel.reply(data);
} else {
+                if (this.snodeController.getConsumeMessageInterceptorGroup() != null) {
+                    ExceptionContext exceptionContext = new ExceptionContext(request, remotingChannel, ex, null);
+                    this.snodeController.getConsumeMessageInterceptorGroup().onException(exceptionContext);
+                }
log.error("Pull message error: {}", ex);
}
});
diff --git a/snode/src/main/java/org/apache/rocketmq/snode/processor/SendMessageProcessor.java b/snode/src/main/java/org/apache/rocketmq/snode/processor/SendMessageProcessor.java
index bd18339..2c45bb7 100644
--- a/snode/src/main/java/org/apache/rocketmq/snode/processor/SendMessageProcessor.java
+++ b/snode/src/main/java/org/apache/rocketmq/snode/processor/SendMessageProcessor.java
@@ -23,6 +23,9 @@ import org.apache.rocketmq.remoting.RemotingChannel;
import org.apache.rocketmq.remoting.RequestProcessor;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.snode.SnodeController;
+import org.apache.rocketmq.snode.interceptor.ExceptionContext;
+import org.apache.rocketmq.snode.interceptor.RequestContext;
+import org.apache.rocketmq.snode.interceptor.ResponseContext;
public class SendMessageProcessor implements RequestProcessor {
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.SNODE_LOGGER_NAME);
@@ -35,11 +38,23 @@ public class SendMessageProcessor implements RequestProcessor {
@Override
public RemotingCommand processRequest(RemotingChannel remotingChannel, RemotingCommand request) {
+ if (this.snodeController.getSendMessageInterceptorGroup() != null) {
+ RequestContext requestContext = new RequestContext(request, remotingChannel);
+ this.snodeController.getSendMessageInterceptorGroup().beforeRequest(requestContext);
+ }
CompletableFuture<RemotingCommand> responseFuture = snodeController.getEnodeService().sendMessage(request);
responseFuture.whenComplete((data, ex) -> {
if (ex == null) {
+ if (this.snodeController.getSendMessageInterceptorGroup() != null) {
+ ResponseContext responseContext = new ResponseContext(request, remotingChannel, data);
+ this.snodeController.getSendMessageInterceptorGroup().afterRequest(responseContext);
+ }
remotingChannel.reply(data);
} else {
+ if (this.snodeController.getSendMessageInterceptorGroup() != null) {
+ ExceptionContext exceptionContext = new ExceptionContext(request, remotingChannel, ex, null);
+ this.snodeController.getSendMessageInterceptorGroup().onException(exceptionContext);
+ }
log.error("Send Message error: {}", ex);
}
});
diff --git a/snode/src/main/java/org/apache/rocketmq/snode/service/impl/EnodeServiceImpl.java b/snode/src/main/java/org/apache/rocketmq/snode/service/impl/EnodeServiceImpl.java
index 20a4d2f..5e28ebe 100644
--- a/snode/src/main/java/org/apache/rocketmq/snode/service/impl/EnodeServiceImpl.java
+++ b/snode/src/main/java/org/apache/rocketmq/snode/service/impl/EnodeServiceImpl.java
@@ -15,8 +15,6 @@ package org.apache.rocketmq.snode.service.impl;/*
* limitations under the License.
*/
-import io.netty.channel.Channel;
-import io.netty.channel.ChannelHandlerContext;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
@@ -47,7 +45,6 @@ import org.apache.rocketmq.remoting.exception.RemotingCommandException;
import org.apache.rocketmq.remoting.exception.RemotingConnectException;
import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
-import org.apache.rocketmq.remoting.exception.RemotingTooMuchRequestException;
import org.apache.rocketmq.remoting.netty.ResponseFuture;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.remoting.serialize.RemotingSerializable;
diff --git a/snode/src/main/java/org/apache/rocketmq/snode/service/impl/NnodeServiceImpl.java b/snode/src/main/java/org/apache/rocketmq/snode/service/impl/NnodeServiceImpl.java
index b272c31..78fd624 100644
--- a/snode/src/main/java/org/apache/rocketmq/snode/service/impl/NnodeServiceImpl.java
+++ b/snode/src/main/java/org/apache/rocketmq/snode/service/impl/NnodeServiceImpl.java
@@ -93,7 +93,7 @@ public class NnodeServiceImpl implements NnodeService {
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_ROUTEINTO_BY_TOPIC, requestHeader);
RemotingCommand response = this.snodeController.getRemotingClient().invokeSync(null, request, SnodeConstant.defaultTimeoutMills);
- log.info("getTopicRouteInfoFromNameServer response: " + response);
+ log.info("GetTopicRouteInfoFromNameServer response: " + response);
assert response != null;
switch (response.getCode()) {
case ResponseCode.TOPIC_NOT_EXIST: {
diff --git a/snode/src/main/resources/META-INF/service/org.apache.rocketmq.snode.interceptor.ConsumeMessageInterceptor b/snode/src/main/resources/META-INF/service/org.apache.rocketmq.snode.interceptor.ConsumeMessageInterceptor
new file mode 100644
index 0000000..e69de29
diff --git a/snode/src/main/resources/META-INF/service/org.apache.rocketmq.snode.interceptor.SendMessageInterceptor b/snode/src/main/resources/META-INF/service/org.apache.rocketmq.snode.interceptor.SendMessageInterceptor
new file mode 100644
index 0000000..e69de29