You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by hz...@apache.org on 2022/11/25 12:35:45 UTC
[rocketmq] branch develop updated: Enable AbortProcessException to interrupt RPCHook (#5594)
This is an automated email from the ASF dual-hosted git repository.
hzh0425 pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new 6f49a1034 Enable AbortProcessException to interrupt RPCHook (#5594)
6f49a1034 is described below
commit 6f49a1034d0698bea84ab0f358c5d184ab64efe8
Author: rongtong <ji...@163.com>
AuthorDate: Fri Nov 25 20:35:20 2022 +0800
Enable AbortProcessException to interrupt RPCHook (#5594)
---
.../processor/AbstractSendMessageProcessor.java | 2 +-
.../broker/processor/PullMessageProcessor.java | 2 +-
.../broker/processor/SendMessageProcessor.java | 2 +-
.../broker/processor/SendMessageProcessorTest.java | 2 +-
.../rocketmq/common}/AbortProcessException.java | 5 ++--
.../remoting/netty/NettyRemotingAbstract.java | 34 +++++++++++++++-------
6 files changed, 29 insertions(+), 18 deletions(-)
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AbstractSendMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AbstractSendMessageProcessor.java
index d87b765b6..9022f66ec 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AbstractSendMessageProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AbstractSendMessageProcessor.java
@@ -23,7 +23,7 @@ import java.util.Map;
import java.util.Random;
import java.util.concurrent.ThreadLocalRandom;
import org.apache.rocketmq.broker.BrokerController;
-import org.apache.rocketmq.broker.mqtrace.AbortProcessException;
+import org.apache.rocketmq.common.AbortProcessException;
import org.apache.rocketmq.broker.mqtrace.ConsumeMessageContext;
import org.apache.rocketmq.broker.mqtrace.ConsumeMessageHook;
import org.apache.rocketmq.broker.mqtrace.SendMessageContext;
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java
index c3b610b7b..e1294c129 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java
@@ -27,7 +27,7 @@ import org.apache.rocketmq.broker.filter.ConsumerFilterData;
import org.apache.rocketmq.broker.filter.ConsumerFilterManager;
import org.apache.rocketmq.broker.filter.ExpressionForRetryMessageFilter;
import org.apache.rocketmq.broker.filter.ExpressionMessageFilter;
-import org.apache.rocketmq.broker.mqtrace.AbortProcessException;
+import org.apache.rocketmq.common.AbortProcessException;
import org.apache.rocketmq.broker.mqtrace.ConsumeMessageContext;
import org.apache.rocketmq.broker.mqtrace.ConsumeMessageHook;
import org.apache.rocketmq.broker.plugin.PullMessageResultHandler;
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java
index 14095f9ec..6a2af0ddb 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java
@@ -26,7 +26,7 @@ import java.util.concurrent.CompletableFuture;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.broker.metrics.BrokerMetricsManager;
-import org.apache.rocketmq.broker.mqtrace.AbortProcessException;
+import org.apache.rocketmq.common.AbortProcessException;
import org.apache.rocketmq.broker.mqtrace.SendMessageContext;
import org.apache.rocketmq.common.MQVersion;
import org.apache.rocketmq.common.MixAll;
diff --git a/broker/src/test/java/org/apache/rocketmq/broker/processor/SendMessageProcessorTest.java b/broker/src/test/java/org/apache/rocketmq/broker/processor/SendMessageProcessorTest.java
index 646db4b22..e046c8884 100644
--- a/broker/src/test/java/org/apache/rocketmq/broker/processor/SendMessageProcessorTest.java
+++ b/broker/src/test/java/org/apache/rocketmq/broker/processor/SendMessageProcessorTest.java
@@ -26,7 +26,7 @@ import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import org.apache.rocketmq.broker.BrokerController;
-import org.apache.rocketmq.broker.mqtrace.AbortProcessException;
+import org.apache.rocketmq.common.AbortProcessException;
import org.apache.rocketmq.broker.mqtrace.ConsumeMessageContext;
import org.apache.rocketmq.broker.mqtrace.ConsumeMessageHook;
import org.apache.rocketmq.broker.mqtrace.SendMessageContext;
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/mqtrace/AbortProcessException.java b/common/src/main/java/org/apache/rocketmq/common/AbortProcessException.java
similarity index 95%
rename from broker/src/main/java/org/apache/rocketmq/broker/mqtrace/AbortProcessException.java
rename to common/src/main/java/org/apache/rocketmq/common/AbortProcessException.java
index c81a29a88..562fdaac6 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/mqtrace/AbortProcessException.java
+++ b/common/src/main/java/org/apache/rocketmq/common/AbortProcessException.java
@@ -14,14 +14,13 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.rocketmq.broker.mqtrace;
+package org.apache.rocketmq.common;
-import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.help.FAQUrl;
/**
*
- * This exception is used for broker hooks only : SendMessageHook, ConsumeMessageHook, pullMessageHook
+ * This exception is used for broker hooks only : SendMessageHook, ConsumeMessageHook, RPCHook
* This exception is not ignored while executing hooks and it means that
* certain processor should return an immediate error response to the client. The
* error response code is included in AbortProcessException. it's naming might
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java
index e45e634ac..0b8ef7086 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java
@@ -39,6 +39,7 @@ import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import javax.annotation.Nullable;
+import org.apache.rocketmq.common.AbortProcessException;
import org.apache.rocketmq.common.Pair;
import org.apache.rocketmq.common.ServiceThread;
import org.apache.rocketmq.common.UtilAll;
@@ -85,14 +86,14 @@ public abstract class NettyRemotingAbstract {
* This map caches all on-going requests.
*/
protected final ConcurrentMap<Integer /* opaque */, ResponseFuture> responseTable =
- new ConcurrentHashMap<>(256);
+ new ConcurrentHashMap<>(256);
/**
* This container holds all processors per request code, aka, for each incoming request, we may look up the
* responding processor in this map to handle the request.
*/
protected final HashMap<Integer/* request code */, Pair<NettyRequestProcessor, ExecutorService>> processorTable =
- new HashMap<>(64);
+ new HashMap<>(64);
/**
* Executor to feed netty events to user defined {@link ChannelEventListener}.
@@ -100,7 +101,8 @@ public abstract class NettyRemotingAbstract {
protected final NettyEventExecutor nettyEventExecutor = new NettyEventExecutor();
/**
- * The default request processor to use in case there is no exact match in {@link #processorTable} per request code.
+ * The default request processor to use in case there is no exact match in {@link #processorTable} per request
+ * code.
*/
protected Pair<NettyRequestProcessor, ExecutorService> defaultRequestProcessorPair;
@@ -122,7 +124,7 @@ public abstract class NettyRemotingAbstract {
* Constructor, specifying capacity of one-way and asynchronous semaphores.
*
* @param permitsOneway Number of permits for one-way requests.
- * @param permitsAsync Number of permits for asynchronous requests.
+ * @param permitsAsync Number of permits for asynchronous requests.
*/
public NettyRemotingAbstract(final int permitsOneway, final int permitsAsync) {
this.semaphoreOneway = new Semaphore(permitsOneway, true);
@@ -195,7 +197,8 @@ public abstract class NettyRemotingAbstract {
writeResponse(channel, request, response, null);
}
- public static void writeResponse(Channel channel, RemotingCommand request, @Nullable RemotingCommand response, Consumer<Future<?>> callback) {
+ public static void writeResponse(Channel channel, RemotingCommand request, @Nullable RemotingCommand response,
+ Consumer<Future<?>> callback) {
if (response == null) {
return;
}
@@ -259,7 +262,7 @@ public abstract class NettyRemotingAbstract {
if (pair.getObject1().rejectRequest()) {
final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_BUSY,
- "[REJECTREQUEST]system busy, start flow control for a while");
+ "[REJECTREQUEST]system busy, start flow control for a while");
response.setOpaque(opaque);
writeResponse(ctx.channel(), cmd, response);
return;
@@ -272,9 +275,9 @@ public abstract class NettyRemotingAbstract {
} catch (RejectedExecutionException e) {
if ((System.currentTimeMillis() % 10000) == 0) {
log.warn(RemotingHelper.parseChannelRemoteAddr(ctx.channel())
- + ", too many requests and system thread pool busy, RejectedExecutionException "
- + pair.getObject2().toString()
- + " request code: " + cmd.getCode());
+ + ", too many requests and system thread pool busy, RejectedExecutionException "
+ + pair.getObject2().toString()
+ + " request code: " + cmd.getCode());
}
final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_BUSY,
@@ -289,7 +292,8 @@ public abstract class NettyRemotingAbstract {
}
}
- private Runnable buildProcessRequestHandler(ChannelHandlerContext ctx, RemotingCommand cmd, Pair<NettyRequestProcessor, ExecutorService> pair, int opaque) {
+ private Runnable buildProcessRequestHandler(ChannelHandlerContext ctx, RemotingCommand cmd,
+ Pair<NettyRequestProcessor, ExecutorService> pair, int opaque) {
return () -> {
Exception exception = null;
RemotingCommand response;
@@ -298,6 +302,8 @@ public abstract class NettyRemotingAbstract {
String remoteAddr = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
try {
doBeforeRpcHooks(remoteAddr, cmd);
+ } catch (AbortProcessException e) {
+ throw e;
} catch (Exception e) {
exception = e;
}
@@ -310,6 +316,8 @@ public abstract class NettyRemotingAbstract {
try {
doAfterRpcHooks(remoteAddr, cmd, response);
+ } catch (AbortProcessException e) {
+ throw e;
} catch (Exception e) {
exception = e;
}
@@ -318,6 +326,10 @@ public abstract class NettyRemotingAbstract {
throw exception;
}
+ writeResponse(ctx.channel(), cmd, response);
+ } catch (AbortProcessException e) {
+ response = RemotingCommand.createResponseCommand(e.getResponseCode(), e.getErrorMessage());
+ response.setOpaque(opaque);
writeResponse(ctx.channel(), cmd, response);
} catch (Throwable e) {
log.error("process request exception", e);
@@ -325,7 +337,7 @@ public abstract class NettyRemotingAbstract {
if (!cmd.isOnewayRPC()) {
response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_ERROR,
- UtilAll.exceptionSimpleDesc(e));
+ UtilAll.exceptionSimpleDesc(e));
response.setOpaque(opaque);
writeResponse(ctx.channel(), cmd, response);
}