You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by hz...@apache.org on 2022/11/25 12:35:45 UTC

[rocketmq] branch develop updated: Enable AbortProcessException to interrupt RPCHook (#5594)

This is an automated email from the ASF dual-hosted git repository.

hzh0425 pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new 6f49a1034 Enable AbortProcessException to interrupt RPCHook (#5594)
6f49a1034 is described below

commit 6f49a1034d0698bea84ab0f358c5d184ab64efe8
Author: rongtong <ji...@163.com>
AuthorDate: Fri Nov 25 20:35:20 2022 +0800

    Enable AbortProcessException to interrupt RPCHook (#5594)
---
 .../processor/AbstractSendMessageProcessor.java    |  2 +-
 .../broker/processor/PullMessageProcessor.java     |  2 +-
 .../broker/processor/SendMessageProcessor.java     |  2 +-
 .../broker/processor/SendMessageProcessorTest.java |  2 +-
 .../rocketmq/common}/AbortProcessException.java    |  5 ++--
 .../remoting/netty/NettyRemotingAbstract.java      | 34 +++++++++++++++-------
 6 files changed, 29 insertions(+), 18 deletions(-)

diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AbstractSendMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AbstractSendMessageProcessor.java
index d87b765b6..9022f66ec 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AbstractSendMessageProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AbstractSendMessageProcessor.java
@@ -23,7 +23,7 @@ import java.util.Map;
 import java.util.Random;
 import java.util.concurrent.ThreadLocalRandom;
 import org.apache.rocketmq.broker.BrokerController;
-import org.apache.rocketmq.broker.mqtrace.AbortProcessException;
+import org.apache.rocketmq.common.AbortProcessException;
 import org.apache.rocketmq.broker.mqtrace.ConsumeMessageContext;
 import org.apache.rocketmq.broker.mqtrace.ConsumeMessageHook;
 import org.apache.rocketmq.broker.mqtrace.SendMessageContext;
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java
index c3b610b7b..e1294c129 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java
@@ -27,7 +27,7 @@ import org.apache.rocketmq.broker.filter.ConsumerFilterData;
 import org.apache.rocketmq.broker.filter.ConsumerFilterManager;
 import org.apache.rocketmq.broker.filter.ExpressionForRetryMessageFilter;
 import org.apache.rocketmq.broker.filter.ExpressionMessageFilter;
-import org.apache.rocketmq.broker.mqtrace.AbortProcessException;
+import org.apache.rocketmq.common.AbortProcessException;
 import org.apache.rocketmq.broker.mqtrace.ConsumeMessageContext;
 import org.apache.rocketmq.broker.mqtrace.ConsumeMessageHook;
 import org.apache.rocketmq.broker.plugin.PullMessageResultHandler;
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java
index 14095f9ec..6a2af0ddb 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java
@@ -26,7 +26,7 @@ import java.util.concurrent.CompletableFuture;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.rocketmq.broker.BrokerController;
 import org.apache.rocketmq.broker.metrics.BrokerMetricsManager;
-import org.apache.rocketmq.broker.mqtrace.AbortProcessException;
+import org.apache.rocketmq.common.AbortProcessException;
 import org.apache.rocketmq.broker.mqtrace.SendMessageContext;
 import org.apache.rocketmq.common.MQVersion;
 import org.apache.rocketmq.common.MixAll;
diff --git a/broker/src/test/java/org/apache/rocketmq/broker/processor/SendMessageProcessorTest.java b/broker/src/test/java/org/apache/rocketmq/broker/processor/SendMessageProcessorTest.java
index 646db4b22..e046c8884 100644
--- a/broker/src/test/java/org/apache/rocketmq/broker/processor/SendMessageProcessorTest.java
+++ b/broker/src/test/java/org/apache/rocketmq/broker/processor/SendMessageProcessorTest.java
@@ -26,7 +26,7 @@ import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Executors;
 import org.apache.rocketmq.broker.BrokerController;
-import org.apache.rocketmq.broker.mqtrace.AbortProcessException;
+import org.apache.rocketmq.common.AbortProcessException;
 import org.apache.rocketmq.broker.mqtrace.ConsumeMessageContext;
 import org.apache.rocketmq.broker.mqtrace.ConsumeMessageHook;
 import org.apache.rocketmq.broker.mqtrace.SendMessageContext;
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/mqtrace/AbortProcessException.java b/common/src/main/java/org/apache/rocketmq/common/AbortProcessException.java
similarity index 95%
rename from broker/src/main/java/org/apache/rocketmq/broker/mqtrace/AbortProcessException.java
rename to common/src/main/java/org/apache/rocketmq/common/AbortProcessException.java
index c81a29a88..562fdaac6 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/mqtrace/AbortProcessException.java
+++ b/common/src/main/java/org/apache/rocketmq/common/AbortProcessException.java
@@ -14,14 +14,13 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.rocketmq.broker.mqtrace;
+package org.apache.rocketmq.common;
 
-import org.apache.rocketmq.common.UtilAll;
 import org.apache.rocketmq.common.help.FAQUrl;
 
 /**
  *
- * This exception is used for broker hooks only : SendMessageHook, ConsumeMessageHook, pullMessageHook
+ * This exception is used for broker hooks only : SendMessageHook, ConsumeMessageHook, RPCHook
  * This exception is not ignored while executing hooks and it means that
  * certain processor should return an immediate error response to the client. The
  * error response code is included in AbortProcessException. Its naming might
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java
index e45e634ac..0b8ef7086 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java
@@ -39,6 +39,7 @@ import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Consumer;
 import javax.annotation.Nullable;
+import org.apache.rocketmq.common.AbortProcessException;
 import org.apache.rocketmq.common.Pair;
 import org.apache.rocketmq.common.ServiceThread;
 import org.apache.rocketmq.common.UtilAll;
@@ -85,14 +86,14 @@ public abstract class NettyRemotingAbstract {
      * This map caches all on-going requests.
      */
     protected final ConcurrentMap<Integer /* opaque */, ResponseFuture> responseTable =
-            new ConcurrentHashMap<>(256);
+        new ConcurrentHashMap<>(256);
 
     /**
      * This container holds all processors per request code, i.e., for each incoming request, we may look up the
      * corresponding processor in this map to handle the request.
      */
     protected final HashMap<Integer/* request code */, Pair<NettyRequestProcessor, ExecutorService>> processorTable =
-            new HashMap<>(64);
+        new HashMap<>(64);
 
     /**
      * Executor to feed netty events to user defined {@link ChannelEventListener}.
@@ -100,7 +101,8 @@ public abstract class NettyRemotingAbstract {
     protected final NettyEventExecutor nettyEventExecutor = new NettyEventExecutor();
 
     /**
-     * The default request processor to use in case there is no exact match in {@link #processorTable} per request code.
+     * The default request processor to use in case there is no exact match in {@link #processorTable} per request
+     * code.
      */
     protected Pair<NettyRequestProcessor, ExecutorService> defaultRequestProcessorPair;
 
@@ -122,7 +124,7 @@ public abstract class NettyRemotingAbstract {
      * Constructor, specifying capacity of one-way and asynchronous semaphores.
      *
      * @param permitsOneway Number of permits for one-way requests.
-     * @param permitsAsync  Number of permits for asynchronous requests.
+     * @param permitsAsync Number of permits for asynchronous requests.
      */
     public NettyRemotingAbstract(final int permitsOneway, final int permitsAsync) {
         this.semaphoreOneway = new Semaphore(permitsOneway, true);
@@ -195,7 +197,8 @@ public abstract class NettyRemotingAbstract {
         writeResponse(channel, request, response, null);
     }
 
-    public static void writeResponse(Channel channel, RemotingCommand request, @Nullable RemotingCommand response, Consumer<Future<?>> callback) {
+    public static void writeResponse(Channel channel, RemotingCommand request, @Nullable RemotingCommand response,
+        Consumer<Future<?>> callback) {
         if (response == null) {
             return;
         }
@@ -259,7 +262,7 @@ public abstract class NettyRemotingAbstract {
 
         if (pair.getObject1().rejectRequest()) {
             final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_BUSY,
-                    "[REJECTREQUEST]system busy, start flow control for a while");
+                "[REJECTREQUEST]system busy, start flow control for a while");
             response.setOpaque(opaque);
             writeResponse(ctx.channel(), cmd, response);
             return;
@@ -272,9 +275,9 @@ public abstract class NettyRemotingAbstract {
         } catch (RejectedExecutionException e) {
             if ((System.currentTimeMillis() % 10000) == 0) {
                 log.warn(RemotingHelper.parseChannelRemoteAddr(ctx.channel())
-                        + ", too many requests and system thread pool busy, RejectedExecutionException "
-                        + pair.getObject2().toString()
-                        + " request code: " + cmd.getCode());
+                    + ", too many requests and system thread pool busy, RejectedExecutionException "
+                    + pair.getObject2().toString()
+                    + " request code: " + cmd.getCode());
             }
 
             final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_BUSY,
@@ -289,7 +292,8 @@ public abstract class NettyRemotingAbstract {
         }
     }
 
-    private Runnable buildProcessRequestHandler(ChannelHandlerContext ctx, RemotingCommand cmd, Pair<NettyRequestProcessor, ExecutorService> pair, int opaque) {
+    private Runnable buildProcessRequestHandler(ChannelHandlerContext ctx, RemotingCommand cmd,
+        Pair<NettyRequestProcessor, ExecutorService> pair, int opaque) {
         return () -> {
             Exception exception = null;
             RemotingCommand response;
@@ -298,6 +302,8 @@ public abstract class NettyRemotingAbstract {
                 String remoteAddr = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
                 try {
                     doBeforeRpcHooks(remoteAddr, cmd);
+                } catch (AbortProcessException e) {
+                    throw e;
                 } catch (Exception e) {
                     exception = e;
                 }
@@ -310,6 +316,8 @@ public abstract class NettyRemotingAbstract {
 
                 try {
                     doAfterRpcHooks(remoteAddr, cmd, response);
+                } catch (AbortProcessException e) {
+                    throw e;
                 } catch (Exception e) {
                     exception = e;
                 }
@@ -318,6 +326,10 @@ public abstract class NettyRemotingAbstract {
                     throw exception;
                 }
 
+                writeResponse(ctx.channel(), cmd, response);
+            } catch (AbortProcessException e) {
+                response = RemotingCommand.createResponseCommand(e.getResponseCode(), e.getErrorMessage());
+                response.setOpaque(opaque);
                 writeResponse(ctx.channel(), cmd, response);
             } catch (Throwable e) {
                 log.error("process request exception", e);
@@ -325,7 +337,7 @@ public abstract class NettyRemotingAbstract {
 
                 if (!cmd.isOnewayRPC()) {
                     response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_ERROR,
-                            UtilAll.exceptionSimpleDesc(e));
+                        UtilAll.exceptionSimpleDesc(e));
                     response.setOpaque(opaque);
                     writeResponse(ctx.channel(), cmd, response);
                 }