You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eventmesh.apache.org by mi...@apache.org on 2022/12/31 10:23:56 UTC

[incubator-eventmesh] branch master updated: fix issue2707

This is an automated email from the ASF dual-hosted git repository.

mikexue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-eventmesh.git


The following commit(s) were added to refs/heads/master by this push:
     new 556a56631 fix issue2707
     new d4e863e99 Merge pull request #2752 from jonyangx/issue2707
556a56631 is described below

commit 556a56631932731dd8bdf0481cd0c152baac89a0
Author: jonyangx <ya...@gmail.com>
AuthorDate: Sat Dec 31 14:32:10 2022 +0800

    fix issue2707
---
 .../eventmesh/runtime/boot/AbstractHTTPServer.java |  97 +++++-----
 .../runtime/boot/AbstractRemotingServer.java       | 104 ++++++----
 .../eventmesh/runtime/boot/EventMeshTCPServer.java | 211 +++++++++++----------
 3 files changed, 233 insertions(+), 179 deletions(-)

diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/AbstractHTTPServer.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/AbstractHTTPServer.java
index 1001cfbc3..d24afbcb1 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/AbstractHTTPServer.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/AbstractHTTPServer.java
@@ -125,20 +125,20 @@ public abstract class AbstractHTTPServer extends AbstractRemotingServer {
     private final EventMeshHTTPConfiguration eventMeshHttpConfiguration;
 
     public ThreadPoolExecutor asyncContextCompleteHandler =
-        ThreadPoolFactory.createThreadPoolExecutor(10, 10, "EventMesh-http-asyncContext-");
+            ThreadPoolFactory.createThreadPoolExecutor(10, 10, "EventMesh-http-asyncContext-");
 
     static {
         DiskAttribute.deleteOnExitTemporaryFile = false;
     }
 
     protected final Map<String/* request code */, Pair<HttpRequestProcessor, ThreadPoolExecutor>>
-        processorTable = new HashMap<>(64);
+            processorTable = new HashMap<>(64);
 
     protected final Map<String/* request uri */, Pair<EventProcessor, ThreadPoolExecutor>>
-        eventProcessorTable = new HashMap<>(64);
+            eventProcessorTable = new HashMap<>(64);
 
     public AbstractHTTPServer(int port, boolean useTLS, EventMeshHTTPConfiguration eventMeshHttpConfiguration) {
-        this.port = port;
+        this.setPort(port);
         this.useTLS = useTLS;
         this.eventMeshHttpConfiguration = eventMeshHttpConfiguration;
     }
@@ -147,7 +147,7 @@ public abstract class AbstractHTTPServer extends AbstractRemotingServer {
         FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, status);
         HttpHeaders responseHeaders = response.headers();
         responseHeaders.add(
-            HttpHeaderNames.CONTENT_TYPE, String.format("text/plain; charset=%s", EventMeshConstants.DEFAULT_CHARSET)
+                HttpHeaderNames.CONTENT_TYPE, String.format("text/plain; charset=%s", EventMeshConstants.DEFAULT_CHARSET)
         );
         responseHeaders.add(HttpHeaderNames.CONTENT_LENGTH, response.content().readableBytes());
         responseHeaders.add(HttpHeaderNames.CONNECTION, HttpHeaderValues.KEEP_ALIVE);
@@ -159,7 +159,7 @@ public abstract class AbstractHTTPServer extends AbstractRemotingServer {
         ctx.writeAndFlush(response).addListener((ChannelFutureListener) f -> {
             if (!f.isSuccess()) {
                 httpLogger.warn("send response to [{}] fail, will close this channel",
-                    RemotingHelper.parseChannelRemoteAddr(f.channel()));
+                        RemotingHelper.parseChannelRemoteAddr(f.channel()));
                 f.channel().close();
             }
         });
@@ -167,17 +167,16 @@ public abstract class AbstractHTTPServer extends AbstractRemotingServer {
 
     @Override
     public void start() throws Exception {
-        super.start();
         Runnable r = () -> {
             ServerBootstrap b = new ServerBootstrap();
             SSLContext sslContext = useTLS ? SSLContextFactory.getSslContext(eventMeshHttpConfiguration) : null;
-            b.group(this.bossGroup, this.workerGroup)
-                .channel(NioServerSocketChannel.class)
-                .childHandler(new HttpsServerInitializer(sslContext))
-                .childOption(ChannelOption.SO_KEEPALIVE, Boolean.TRUE);
+            b.group(this.getBossGroup(), this.getWorkerGroup())
+                    .channel(NioServerSocketChannel.class)
+                    .childHandler(new HttpsServerInitializer(sslContext))
+                    .childOption(ChannelOption.SO_KEEPALIVE, Boolean.TRUE);
             try {
-                httpServerLogger.info("HTTPServer[port={}] started......", this.port);
-                ChannelFuture future = b.bind(this.port).sync();
+                httpServerLogger.info("HTTPServer[port={}] started......", this.getPort());
+                ChannelFuture future = b.bind(this.getPort()).sync();
                 future.channel().closeFuture().sync();
             } catch (Exception e) {
                 httpServerLogger.error("HTTPServer start Err!", e);
@@ -254,7 +253,7 @@ public abstract class AbstractHTTPServer extends AbstractRemotingServer {
         long startTime = System.currentTimeMillis();
         HttpHeaders requestHeaders = httpRequest.headers();
         requestHeaders.set(ProtocolKey.ClientInstanceKey.IP,
-            RemotingHelper.parseChannelRemoteAddr(ctx.channel()));
+                RemotingHelper.parseChannelRemoteAddr(ctx.channel()));
 
         String protocolVersion = httpRequest.headers().get(ProtocolKey.VERSION);
         if (StringUtils.isBlank(protocolVersion)) {
@@ -279,7 +278,7 @@ public abstract class AbstractHTTPServer extends AbstractRemotingServer {
             getDecoder.parameters().forEach((key, value) -> httpRequestBody.put(key, value.get(0)));
         } else if (HttpMethod.POST.equals(httpRequest.method())) {
             HttpPostRequestDecoder decoder =
-                new HttpPostRequestDecoder(defaultHttpDataFactory, httpRequest);
+                    new HttpPostRequestDecoder(defaultHttpDataFactory, httpRequest);
             for (InterfaceHttpData parm : decoder.getBodyHttpDatas()) {
                 if (parm.getHttpDataType() == InterfaceHttpData.HttpDataType.Attribute) {
                     Attribute data = (Attribute) parm;
@@ -318,7 +317,7 @@ public abstract class AbstractHTTPServer extends AbstractRemotingServer {
                     sendError(ctx, errorStatus);
 
                     span = TraceUtils.prepareServerSpan(headerMap,
-                        EventMeshTraceConstants.TRACE_UPSTREAM_EVENTMESH_SERVER_SPAN, false);
+                            EventMeshTraceConstants.TRACE_UPSTREAM_EVENTMESH_SERVER_SPAN, false);
                     TraceUtils.finishSpanWithException(span, headerMap, errorStatus.reasonPhrase(), null);
                     return;
                 }
@@ -340,7 +339,7 @@ public abstract class AbstractHTTPServer extends AbstractRemotingServer {
                     HttpEventWrapper httpEventWrapper = parseHttpRequest(httpRequest);
 
                     AsyncContext<HttpEventWrapper> asyncContext =
-                        new AsyncContext<>(httpEventWrapper, null, asyncContextCompleteHandler);
+                            new AsyncContext<>(httpEventWrapper, null, asyncContextCompleteHandler);
                     processHttpRequest(ctx, asyncContext);
 
                 } else {
@@ -349,9 +348,9 @@ public abstract class AbstractHTTPServer extends AbstractRemotingServer {
                     final Map<String, Object> bodyMap = parseHttpRequestBody(httpRequest);
 
                     String requestCode =
-                        (httpRequest.method() == HttpMethod.POST)
-                            ? httpRequest.headers().get(ProtocolKey.REQUEST_CODE)
-                            : MapUtils.getString(bodyMap, StringUtils.lowerCase(ProtocolKey.REQUEST_CODE), "");
+                            (httpRequest.method() == HttpMethod.POST)
+                                    ? httpRequest.headers().get(ProtocolKey.REQUEST_CODE)
+                                    : MapUtils.getString(bodyMap, StringUtils.lowerCase(ProtocolKey.REQUEST_CODE), "");
 
                     requestCommand.setHttpMethod(httpRequest.method().name());
                     requestCommand.setHttpVersion(httpRequest.protocolVersion().protocolName());
@@ -360,16 +359,16 @@ public abstract class AbstractHTTPServer extends AbstractRemotingServer {
                     HttpCommand responseCommand = null;
 
                     if (StringUtils.isBlank(requestCode)
-                        || !processorTable.containsKey(requestCode)
-                        || !RequestCode.contains(Integer.valueOf(requestCode))) {
+                            || !processorTable.containsKey(requestCode)
+                            || !RequestCode.contains(Integer.valueOf(requestCode))) {
                         responseCommand =
-                            requestCommand.createHttpCommandResponse(EventMeshRetCode.EVENTMESH_REQUESTCODE_INVALID);
+                                requestCommand.createHttpCommandResponse(EventMeshRetCode.EVENTMESH_REQUESTCODE_INVALID);
                         sendResponse(ctx, responseCommand.httpResponse());
 
                         span = TraceUtils.prepareServerSpan(headerMap,
-                            EventMeshTraceConstants.TRACE_UPSTREAM_EVENTMESH_SERVER_SPAN, false);
+                                EventMeshTraceConstants.TRACE_UPSTREAM_EVENTMESH_SERVER_SPAN, false);
                         TraceUtils.finishSpanWithException(span, headerMap,
-                            EventMeshRetCode.EVENTMESH_REQUESTCODE_INVALID.getErrMsg(), null);
+                                EventMeshRetCode.EVENTMESH_REQUESTCODE_INVALID.getErrMsg(), null);
                         return;
                     }
 
@@ -381,9 +380,9 @@ public abstract class AbstractHTTPServer extends AbstractRemotingServer {
                         sendResponse(ctx, responseCommand.httpResponse());
 
                         span = TraceUtils.prepareServerSpan(headerMap,
-                            EventMeshTraceConstants.TRACE_UPSTREAM_EVENTMESH_SERVER_SPAN, false);
+                                EventMeshTraceConstants.TRACE_UPSTREAM_EVENTMESH_SERVER_SPAN, false);
                         TraceUtils.finishSpanWithException(span, headerMap,
-                            EventMeshRetCode.EVENTMESH_RUNTIME_ERR.getErrMsg(), e);
+                                EventMeshRetCode.EVENTMESH_RUNTIME_ERR.getErrMsg(), e);
                         return;
                     }
 
@@ -392,7 +391,7 @@ public abstract class AbstractHTTPServer extends AbstractRemotingServer {
                     }
 
                     AsyncContext<HttpCommand> asyncContext =
-                        new AsyncContext<>(requestCommand, responseCommand, asyncContextCompleteHandler);
+                            new AsyncContext<>(requestCommand, responseCommand, asyncContextCompleteHandler);
                     processEventMeshRequest(ctx, asyncContext);
                 }
 
@@ -422,7 +421,7 @@ public abstract class AbstractHTTPServer extends AbstractRemotingServer {
                         if (processor.rejectRequest()) {
 
                             HttpEventWrapper responseWrapper =
-                                requestWrapper.createHttpResponse(EventMeshRetCode.EVENTMESH_REJECT_BY_PROCESSOR_ERROR);
+                                    requestWrapper.createHttpResponse(EventMeshRetCode.EVENTMESH_REJECT_BY_PROCESSOR_ERROR);
 
                             asyncContext.onComplete(responseWrapper);
                             if (asyncContext.isComplete()) {
@@ -440,7 +439,7 @@ public abstract class AbstractHTTPServer extends AbstractRemotingServer {
                         }
 
                         metrics.getSummaryMetrics()
-                            .recordHTTPReqResTimeCost(System.currentTimeMillis() - requestWrapper.getReqTime());
+                                .recordHTTPReqResTimeCost(System.currentTimeMillis() - requestWrapper.getReqTime());
 
                         if (httpLogger.isDebugEnabled()) {
                             httpLogger.debug("{}", asyncContext.getResponse());
@@ -456,7 +455,7 @@ public abstract class AbstractHTTPServer extends AbstractRemotingServer {
                 asyncContext.onComplete(responseWrapper);
                 metrics.getSummaryMetrics().recordHTTPDiscard();
                 metrics.getSummaryMetrics().recordHTTPReqResTimeCost(
-                    System.currentTimeMillis() - requestWrapper.getReqTime());
+                        System.currentTimeMillis() - requestWrapper.getReqTime());
                 try {
                     sendResponse(ctx, asyncContext.getResponse().httpResponse());
                 } catch (Exception e) {
@@ -476,7 +475,7 @@ public abstract class AbstractHTTPServer extends AbstractRemotingServer {
                         HttpRequestProcessor processor = choosed.getObject1();
                         if (processor.rejectRequest()) {
                             HttpCommand responseCommand =
-                                request.createHttpCommandResponse(EventMeshRetCode.EVENTMESH_REJECT_BY_PROCESSOR_ERROR);
+                                    request.createHttpCommandResponse(EventMeshRetCode.EVENTMESH_REJECT_BY_PROCESSOR_ERROR);
                             asyncContext.onComplete(responseCommand);
                             if (asyncContext.isComplete()) {
                                 if (httpLogger.isDebugEnabled()) {
@@ -486,10 +485,10 @@ public abstract class AbstractHTTPServer extends AbstractRemotingServer {
 
                                 Map<String, Object> traceMap = asyncContext.getRequest().getHeader().toMap();
                                 Span span = TraceUtils.prepareServerSpan(traceMap,
-                                    EventMeshTraceConstants.TRACE_UPSTREAM_EVENTMESH_SERVER_SPAN,
-                                    false);
+                                        EventMeshTraceConstants.TRACE_UPSTREAM_EVENTMESH_SERVER_SPAN,
+                                        false);
                                 TraceUtils.finishSpanWithException(span, traceMap,
-                                    EventMeshRetCode.EVENTMESH_REJECT_BY_PROCESSOR_ERROR.getErrMsg(), null);
+                                        EventMeshRetCode.EVENTMESH_REJECT_BY_PROCESSOR_ERROR.getErrMsg(), null);
                             }
                             return;
                         }
@@ -500,7 +499,7 @@ public abstract class AbstractHTTPServer extends AbstractRemotingServer {
                         }
 
                         metrics.getSummaryMetrics()
-                            .recordHTTPReqResTimeCost(System.currentTimeMillis() - request.getReqTime());
+                                .recordHTTPReqResTimeCost(System.currentTimeMillis() - request.getReqTime());
 
                         if (httpLogger.isDebugEnabled()) {
                             httpLogger.debug("{}", asyncContext.getResponse());
@@ -522,9 +521,9 @@ public abstract class AbstractHTTPServer extends AbstractRemotingServer {
 
                     Map<String, Object> traceMap = asyncContext.getRequest().getHeader().toMap();
                     Span span = TraceUtils.prepareServerSpan(traceMap,
-                        EventMeshTraceConstants.TRACE_UPSTREAM_EVENTMESH_SERVER_SPAN, false);
+                            EventMeshTraceConstants.TRACE_UPSTREAM_EVENTMESH_SERVER_SPAN, false);
                     TraceUtils.finishSpanWithException(span, traceMap,
-                        EventMeshRetCode.EVENTMESH_RUNTIME_ERR.getErrMsg(), re);
+                            EventMeshRetCode.EVENTMESH_RUNTIME_ERR.getErrMsg(), re);
                 } catch (Exception e) {
                     httpServerLogger.error("processEventMeshRequest fail", re);
                 }
@@ -573,18 +572,18 @@ public abstract class AbstractHTTPServer extends AbstractRemotingServer {
         } else if (HttpMethod.POST == fullHttpRequest.method()) {
 
             if (StringUtils.contains(httpRequest.headers().get("Content-Type"),
-                ContentType.APPLICATION_JSON.getMimeType())) {
+                    ContentType.APPLICATION_JSON.getMimeType())) {
                 int length = fullHttpRequest.content().readableBytes();
                 if (length > 0) {
                     byte[] body = new byte[length];
                     fullHttpRequest.content().readBytes(body);
                     bodyMap.putAll(Objects.requireNonNull(JsonUtils.deserialize(new String(body, Constants.DEFAULT_CHARSET),
-                        new TypeReference<Map<String, Object>>() {
-                        })));
+                            new TypeReference<Map<String, Object>>() {
+                            })));
                 }
             } else {
                 HttpPostRequestDecoder decoder =
-                    new HttpPostRequestDecoder(defaultHttpDataFactory, httpRequest);
+                        new HttpPostRequestDecoder(defaultHttpDataFactory, httpRequest);
                 for (InterfaceHttpData parm : decoder.getBodyHttpDatas()) {
                     if (parm.getHttpDataType() == InterfaceHttpData.HttpDataType.Attribute) {
                         Attribute data = (Attribute) parm;
@@ -625,8 +624,8 @@ public abstract class AbstractHTTPServer extends AbstractRemotingServer {
             int c = connections.incrementAndGet();
             if (c > 20000) {
                 httpServerLogger
-                    .warn("client|http|channelActive|remoteAddress={}|msg={}", remoteAddress,
-                        "too many client(20000) connect this eventMesh server");
+                        .warn("client|http|channelActive|remoteAddress={}|msg={}", remoteAddress,
+                                "too many client(20000) connect this eventMesh server");
                 ctx.close();
                 return;
             }
@@ -647,7 +646,7 @@ public abstract class AbstractHTTPServer extends AbstractRemotingServer {
                 if (event.state().equals(IdleState.ALL_IDLE)) {
                     final String remoteAddress = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
                     httpServerLogger.info("client|http|userEventTriggered|remoteAddress={}|msg={}",
-                        remoteAddress, evt.getClass().getName());
+                            remoteAddress, evt.getClass().getName());
                     ctx.close();
                 }
             }
@@ -675,10 +674,10 @@ public abstract class AbstractHTTPServer extends AbstractRemotingServer {
                 pipeline.addFirst("ssl", new SslHandler(sslEngine));
             }
             pipeline.addLast(new HttpRequestDecoder(),
-                new HttpResponseEncoder(),
-                new HttpConnectionHandler(),
-                new HttpObjectAggregator(Integer.MAX_VALUE),
-                new HTTPHandler());
+                    new HttpResponseEncoder(),
+                    new HttpConnectionHandler(),
+                    new HttpObjectAggregator(Integer.MAX_VALUE),
+                    new HTTPHandler());
         }
     }
 
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/AbstractRemotingServer.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/AbstractRemotingServer.java
index f97f7b202..71742cfa9 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/AbstractRemotingServer.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/AbstractRemotingServer.java
@@ -22,29 +22,66 @@ import org.apache.eventmesh.common.utils.ThreadUtils;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import io.netty.channel.EventLoopGroup;
 import io.netty.channel.nio.NioEventLoopGroup;
 
-import lombok.extern.slf4j.Slf4j;
-
-@Slf4j
 public abstract class AbstractRemotingServer {
-    
-    public EventLoopGroup bossGroup;
+    private static final Logger LOGGER = LoggerFactory.getLogger(AbstractRemotingServer.class);
+
+    private static final int DEFAULT_SLEEP_SECONDS = 30;
+
+    private EventLoopGroup bossGroup;
+
+    private EventLoopGroup ioGroup;
+
+    private EventLoopGroup workerGroup;
+
+    private int port;
+
+    private static final int MAX_THREADS = Runtime.getRuntime().availableProcessors();
+
+    public EventLoopGroup getBossGroup() {
+        return bossGroup;
+    }
+
+    public EventLoopGroup getIoGroup() {
+        return ioGroup;
+    }
+
+    public EventLoopGroup getWorkerGroup() {
+        return workerGroup;
+    }
+
+    public int getPort() {
+        return port;
+    }
 
-    public EventLoopGroup ioGroup;
+    public void setBossGroup(final EventLoopGroup bossGroup) {
+        this.bossGroup = bossGroup;
+    }
 
-    public EventLoopGroup workerGroup;
+    public void setIoGroup(final EventLoopGroup ioGroup) {
+        this.ioGroup = ioGroup;
+    }
 
-    public int port;
+    public void setWorkerGroup(final EventLoopGroup workerGroup) {
+        this.workerGroup = workerGroup;
+    }
 
-    private EventLoopGroup initBossGroup(String threadPrefix) {
+    public void setPort(final int port) {
+        this.port = port;
+    }
+
+    private EventLoopGroup initBossGroup(final String threadPrefix) {
         bossGroup = new NioEventLoopGroup(1, new ThreadFactory() {
-            final AtomicInteger count = new AtomicInteger(0);
+            private final AtomicInteger count = new AtomicInteger(0);
 
             @Override
-            public Thread newThread(Runnable r) {
-                Thread t = new Thread(r, threadPrefix + "-boss-" + count.incrementAndGet());
+            public Thread newThread(final Runnable r) {
+                final Thread t = new Thread(r, threadPrefix + "-boss-" + count.incrementAndGet());
                 t.setDaemon(true);
                 return t;
             }
@@ -53,57 +90,60 @@ public abstract class AbstractRemotingServer {
         return bossGroup;
     }
 
-    private EventLoopGroup initIOGroup(String threadPrefix) {
-        ioGroup = new NioEventLoopGroup(Runtime.getRuntime().availableProcessors(), new ThreadFactory() {
-            AtomicInteger count = new AtomicInteger(0);
+    private EventLoopGroup initIOGroup(final String threadPrefix, final int threadNum) {
+        ioGroup = new NioEventLoopGroup(threadNum, new ThreadFactory() {
+            private final AtomicInteger count = new AtomicInteger(0);
 
             @Override
-            public Thread newThread(Runnable r) {
+            public Thread newThread(final Runnable r) {
                 return new Thread(r, threadPrefix + "-io-" + count.incrementAndGet());
             }
         });
         return ioGroup;
     }
 
-    private EventLoopGroup initWorkerGroup(String threadPrefix) {
-        workerGroup = new NioEventLoopGroup(Runtime.getRuntime().availableProcessors(), new ThreadFactory() {
-            AtomicInteger count = new AtomicInteger(0);
+    private EventLoopGroup initWorkerGroup(final String threadPrefix, final int threadNum) {
+        workerGroup = new NioEventLoopGroup(threadNum, new ThreadFactory() {
+            private final AtomicInteger count = new AtomicInteger(0);
 
             @Override
-            public Thread newThread(Runnable r) {
-                Thread t = new Thread(r, threadPrefix + "-worker-" + count.incrementAndGet());
-                return t;
+            public Thread newThread(final Runnable r) {
+                return new Thread(r, threadPrefix + "-worker-" + count.incrementAndGet());
             }
         });
         return workerGroup;
     }
 
-    public void init(String threadPrefix) throws Exception {
+    public void init(final String threadPrefix) throws Exception {
         initBossGroup(threadPrefix);
-        initIOGroup(threadPrefix);
-        initWorkerGroup(threadPrefix);
+        initIOGroup(threadPrefix, MAX_THREADS);
+        initWorkerGroup(threadPrefix, MAX_THREADS);
     }
 
     public void shutdown() throws Exception {
         if (bossGroup != null) {
             bossGroup.shutdownGracefully();
-            log.info("shutdown bossGroup");
+            if (LOGGER.isInfoEnabled()) {
+                LOGGER.info("shutdown bossGroup");
+            }
         }
 
-        ThreadUtils.randomSleep(30);
+        ThreadUtils.randomSleep(DEFAULT_SLEEP_SECONDS);
 
         if (ioGroup != null) {
             ioGroup.shutdownGracefully();
-            log.info("shutdown ioGroup");
+            if (LOGGER.isInfoEnabled()) {
+                LOGGER.info("shutdown ioGroup");
+            }
         }
 
         if (workerGroup != null) {
             workerGroup.shutdownGracefully();
-            log.info("shutdown workerGroup");
+            if (LOGGER.isInfoEnabled()) {
+                LOGGER.info("shutdown workerGroup");
+            }
         }
     }
 
-    public void start() throws Exception {
-
-    }
+    public abstract void start() throws Exception;
 }
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshTCPServer.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshTCPServer.java
index fcc8e7485..01c64f579 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshTCPServer.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshTCPServer.java
@@ -48,6 +48,8 @@ import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ScheduledExecutorService;
 
 import org.assertj.core.util.Lists;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import io.netty.bootstrap.ServerBootstrap;
 import io.netty.buffer.PooledByteBufAllocator;
@@ -56,6 +58,7 @@ import io.netty.channel.Channel;
 import io.netty.channel.ChannelFuture;
 import io.netty.channel.ChannelInitializer;
 import io.netty.channel.ChannelOption;
+import io.netty.channel.EventLoopGroup;
 import io.netty.channel.socket.nio.NioServerSocketChannel;
 import io.netty.handler.timeout.IdleStateHandler;
 import io.netty.handler.traffic.ChannelTrafficShapingHandler;
@@ -63,36 +66,36 @@ import io.netty.handler.traffic.GlobalTrafficShapingHandler;
 
 import com.google.common.util.concurrent.RateLimiter;
 
-import lombok.extern.slf4j.Slf4j;
 
-@Slf4j
 public class EventMeshTCPServer extends AbstractRemotingServer {
+    private static final Logger LOGGER = LoggerFactory.getLogger(EventMeshTCPServer.class);
 
     private ClientSessionGroupMapping clientSessionGroupMapping;
 
-    private EventMeshTcpRetryer eventMeshTcpRetryer;
+    private transient EventMeshTcpRetryer eventMeshTcpRetryer;
 
-    private EventMeshTcpMonitor eventMeshTcpMonitor;
+    private transient EventMeshTcpMonitor eventMeshTcpMonitor;
 
-    private ClientManageController clientManageController;
+    private transient ClientManageController clientManageController;
 
-    private EventMeshServer eventMeshServer;
+    private final transient EventMeshServer eventMeshServer;
 
-    private EventMeshTCPConfiguration eventMeshTCPConfiguration;
+    private final transient EventMeshTCPConfiguration eventMeshTCPConfiguration;
 
-    private GlobalTrafficShapingHandler globalTrafficShapingHandler;
+    private transient GlobalTrafficShapingHandler globalTrafficShapingHandler;
 
-    private ScheduledExecutorService scheduler;
+    private transient ScheduledExecutorService scheduler;
 
-    private ExecutorService taskHandleExecutorService;
+    private transient ExecutorService taskHandleExecutorService;
 
-    private ExecutorService broadcastMsgDownstreamExecutorService;
+    private transient ExecutorService broadcastMsgDownstreamExecutorService;
 
-    private Registry registry;
+    private final transient Registry registry;
 
-    private EventMeshRebalanceService eventMeshRebalanceService;
+    private transient EventMeshRebalanceService eventMeshRebalanceService;
+    private transient RateLimiter rateLimiter;
 
-    public void setClientSessionGroupMapping(ClientSessionGroupMapping clientSessionGroupMapping) {
+    public void setClientSessionGroupMapping(final ClientSessionGroupMapping clientSessionGroupMapping) {
         this.clientSessionGroupMapping = clientSessionGroupMapping;
     }
 
@@ -100,7 +103,7 @@ public class EventMeshTCPServer extends AbstractRemotingServer {
         return clientManageController;
     }
 
-    public void setClientManageController(ClientManageController clientManageController) {
+    public void setClientManageController(final ClientManageController clientManageController) {
         this.clientManageController = clientManageController;
     }
 
@@ -108,7 +111,7 @@ public class EventMeshTCPServer extends AbstractRemotingServer {
         return scheduler;
     }
 
-    public void setScheduler(ScheduledExecutorService scheduler) {
+    public void setScheduler(final ScheduledExecutorService scheduler) {
         this.scheduler = scheduler;
     }
 
@@ -120,7 +123,7 @@ public class EventMeshTCPServer extends AbstractRemotingServer {
         return broadcastMsgDownstreamExecutorService;
     }
 
-    public void setTaskHandleExecutorService(ExecutorService taskHandleExecutorService) {
+    public void setTaskHandleExecutorService(final ExecutorService taskHandleExecutorService) {
         this.taskHandleExecutorService = taskHandleExecutorService;
     }
 
@@ -128,71 +131,75 @@ public class EventMeshTCPServer extends AbstractRemotingServer {
         return rateLimiter;
     }
 
-    public void setRateLimiter(RateLimiter rateLimiter) {
+    public void setRateLimiter(final RateLimiter rateLimiter) {
         this.rateLimiter = rateLimiter;
     }
 
-    private RateLimiter rateLimiter;
 
-    public EventMeshTCPServer(EventMeshServer eventMeshServer,
-                              EventMeshTCPConfiguration eventMeshTCPConfiguration, Registry registry) {
+    public EventMeshTCPServer(final EventMeshServer eventMeshServer,
+                              final EventMeshTCPConfiguration eventMeshTCPConfiguration, final Registry registry) {
         super();
         this.eventMeshServer = eventMeshServer;
         this.eventMeshTCPConfiguration = eventMeshTCPConfiguration;
         this.registry = registry;
     }
 
+    @Override
+    public EventLoopGroup getWorkerGroup() {
+        return super.getWorkerGroup(); // delegate to the parent: this.getWorkerGroup() would call itself until StackOverflowError
+    }
+
     private void startServer() {
         Runnable r = () -> {
             ServerBootstrap bootstrap = new ServerBootstrap();
             ChannelInitializer channelInitializer = new ChannelInitializer() {
                 @Override
-                public void initChannel(Channel ch) throws Exception {
+                public void initChannel(final Channel ch) throws Exception {
                     ch.pipeline()
-                        .addLast(new Codec.Encoder())
-                        .addLast(new Codec.Decoder())
-                        .addLast("global-traffic-shaping", globalTrafficShapingHandler)
-                        .addLast("channel-traffic-shaping", newCTSHandler())
-                        .addLast(new EventMeshTcpConnectionHandler(EventMeshTCPServer.this))
-                        .addLast(
-                            workerGroup,
-                            new IdleStateHandler(
-                                eventMeshTCPConfiguration.eventMeshTcpIdleReadSeconds,
-                                eventMeshTCPConfiguration.eventMeshTcpIdleWriteSeconds,
-                                eventMeshTCPConfiguration.eventMeshTcpIdleAllSeconds),
-                            new EventMeshTcpMessageDispatcher(EventMeshTCPServer.this),
-                            new EventMeshTcpExceptionHandler(EventMeshTCPServer.this)
-                        );
+                            .addLast(new Codec.Encoder())
+                            .addLast(new Codec.Decoder())
+                            .addLast("global-traffic-shaping", globalTrafficShapingHandler)
+                            .addLast("channel-traffic-shaping", newCTSHandler(eventMeshTCPConfiguration.getCtc().getReadLimit()))
+                            .addLast(new EventMeshTcpConnectionHandler(EventMeshTCPServer.this))
+                            .addLast(
+                                    getWorkerGroup(),
+                                    new IdleStateHandler(
+                                            eventMeshTCPConfiguration.eventMeshTcpIdleReadSeconds,
+                                            eventMeshTCPConfiguration.eventMeshTcpIdleWriteSeconds,
+                                            eventMeshTCPConfiguration.eventMeshTcpIdleAllSeconds),
+                                    new EventMeshTcpMessageDispatcher(EventMeshTCPServer.this),
+                                    new EventMeshTcpExceptionHandler(EventMeshTCPServer.this)
+                            );
                 }
             };
 
-            bootstrap.group(bossGroup, ioGroup)
-                .channel(NioServerSocketChannel.class)
-                .option(ChannelOption.SO_BACKLOG, 128)
-                .option(ChannelOption.SO_REUSEADDR, true)
-                .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 10000)
-                .childOption(ChannelOption.SO_KEEPALIVE, false)
-                .childOption(ChannelOption.SO_LINGER, 0)
-                .childOption(ChannelOption.SO_TIMEOUT, 600000)
-                .childOption(ChannelOption.TCP_NODELAY, true)
-                .childOption(ChannelOption.SO_SNDBUF, 65535 * 4)
-                .childOption(ChannelOption.SO_RCVBUF, 65535 * 4)
-                .option(ChannelOption.RCVBUF_ALLOCATOR, new AdaptiveRecvByteBufAllocator(2048, 4096, 65536))
-                .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
-                .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
-                .childHandler(channelInitializer);
+            bootstrap.group(this.getBossGroup(), this.getIoGroup())
+                    .channel(NioServerSocketChannel.class)
+                    .option(ChannelOption.SO_BACKLOG, 128)
+                    .option(ChannelOption.SO_REUSEADDR, true)
+                    .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 10_000)
+                    .childOption(ChannelOption.SO_KEEPALIVE, false)
+                    .childOption(ChannelOption.SO_LINGER, 0)
+                    .childOption(ChannelOption.SO_TIMEOUT, 600_000)
+                    .childOption(ChannelOption.TCP_NODELAY, true)
+                    .childOption(ChannelOption.SO_SNDBUF, 65_535 * 4)
+                    .childOption(ChannelOption.SO_RCVBUF, 65_535 * 4)
+                    .option(ChannelOption.RCVBUF_ALLOCATOR, new AdaptiveRecvByteBufAllocator(2_048, 4_096, 65_536))
+                    .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
+                    .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
+                    .childHandler(channelInitializer);
 
             try {
                 int port = eventMeshTCPConfiguration.eventMeshTcpServerPort;
                 ChannelFuture f = bootstrap.bind(port).sync();
-                log.info("EventMeshTCPServer[port={}] started.....", port);
+                LOGGER.info("EventMeshTCPServer[port={}] started.....", port);
                 f.channel().closeFuture().sync();
             } catch (Exception e) {
-                log.error("EventMeshTCPServer RemotingServer Start Err!", e);
+                LOGGER.error("EventMeshTCPServer RemotingServer Start Err!", e);
                 try {
                     shutdown();
-                } catch (Exception e1) {
-                    log.error("EventMeshTCPServer RemotingServer shutdown Err!", e);
+                } catch (Exception ex) {
+                    LOGGER.error("EventMeshTCPServer RemotingServer shutdown Err!", ex);
                 }
             }
         };
@@ -202,18 +209,20 @@ public class EventMeshTCPServer extends AbstractRemotingServer {
     }
 
     public void init() throws Exception {
-        log.info("==================EventMeshTCPServer Initialing==================");
+        if (LOGGER.isInfoEnabled()) {
+            LOGGER.info("==================EventMeshTCPServer Initialing==================");
+        }
         initThreadPool();
 
         rateLimiter = RateLimiter.create(eventMeshTCPConfiguration.eventMeshTcpMsgReqnumPerSecond);
 
-        globalTrafficShapingHandler = newGTSHandler();
+        globalTrafficShapingHandler = newGTSHandler(scheduler, eventMeshTCPConfiguration.getGtc().getReadLimit());
+
 
-        
         AdminWebHookConfigOperationManage adminWebHookConfigOperationManage = new AdminWebHookConfigOperationManage();
         adminWebHookConfigOperationManage.setConfigurationWrapper(eventMeshTCPConfiguration.getConfigurationWrapper());
         adminWebHookConfigOperationManage.init();
-        
+
         clientManageController = new ClientManageController(this);
         clientManageController.setAdminWebHookConfigOperationManage(adminWebHookConfigOperationManage);
 
@@ -226,18 +235,21 @@ public class EventMeshTCPServer extends AbstractRemotingServer {
         // The MetricsRegistry is singleton, so we can use factory method to get.
         final List<MetricsRegistry> metricsRegistries = Lists.newArrayList();
         Optional.ofNullable(eventMeshTCPConfiguration.getEventMeshMetricsPluginType())
-            .ifPresent(
-                metricsPlugins -> metricsPlugins.forEach(
-                    pluginType -> metricsRegistries.add(MetricsPluginFactory.getMetricsRegistry(pluginType))));
+                .ifPresent(
+                        metricsPlugins -> metricsPlugins.forEach(
+                                pluginType -> metricsRegistries.add(MetricsPluginFactory.getMetricsRegistry(pluginType))));
         eventMeshTcpMonitor = new EventMeshTcpMonitor(this, metricsRegistries);
         eventMeshTcpMonitor.init();
 
         if (eventMeshTCPConfiguration.isEventMeshServerRegistryEnable()) {
             eventMeshRebalanceService = new EventMeshRebalanceService(this,
-                new EventmeshRebalanceImpl(this));
+                    new EventmeshRebalanceImpl(this));
             eventMeshRebalanceService.init();
         }
-        log.info("--------------------------EventMeshTCPServer Inited");
+
+        if (LOGGER.isInfoEnabled()) {
+            LOGGER.info("--------------------------EventMeshTCPServer Inited");
+        }
     }
 
     @Override
@@ -257,14 +269,16 @@ public class EventMeshTCPServer extends AbstractRemotingServer {
             eventMeshRebalanceService.start();
         }
 
-        log.info("--------------------------EventMeshTCPServer Started");
+        if (LOGGER.isInfoEnabled()) {
+            LOGGER.info("--------------------------EventMeshTCPServer Started");
+        }
     }
 
     @Override
     public void shutdown() throws Exception {
-        if (bossGroup != null) {
-            bossGroup.shutdownGracefully();
-            log.info("shutdown bossGroup, no client is allowed to connect access server");
+        if (this.getBossGroup() != null) {
+            this.getBossGroup().shutdownGracefully();
+            LOGGER.info("shutdown bossGroup, no client is allowed to connect access server");
         }
 
         if (eventMeshTCPConfiguration.isEventMeshServerRegistryEnable()) {
@@ -277,18 +291,18 @@ public class EventMeshTCPServer extends AbstractRemotingServer {
         try {
             Thread.sleep(40 * 1000);
         } catch (InterruptedException e) {
-            log.error("interruptedException occurred while sleeping", e);
+            LOGGER.error("interruptedException occurred while sleeping", e);
         }
 
         globalTrafficShapingHandler.release();
 
-        if (ioGroup != null) {
-            ioGroup.shutdownGracefully();
-            log.info("shutdown ioGroup");
+        if (this.getIoGroup() != null) {
+            this.getIoGroup().shutdownGracefully();
+            LOGGER.info("shutdown ioGroup");
         }
-        if (workerGroup != null) {
-            workerGroup.shutdownGracefully();
-            log.info("shutdown workerGroup");
+        if (this.getWorkerGroup() != null) {
+            this.getWorkerGroup().shutdownGracefully();
+            LOGGER.info("shutdown workerGroup");
         }
 
         eventMeshTcpRetryer.shutdown();
@@ -296,14 +310,16 @@ public class EventMeshTCPServer extends AbstractRemotingServer {
         eventMeshTcpMonitor.shutdown();
 
         shutdownThreadPool();
-        log.info("--------------------------EventMeshTCPServer Shutdown");
+        if (LOGGER.isInfoEnabled()) {
+            LOGGER.info("--------------------------EventMeshTCPServer Shutdown");
+        }
     }
 
     public boolean register() {
         boolean registerResult = false;
         try {
             String endPoints = IPUtils.getLocalAddress()
-                + EventMeshConstants.IP_PORT_SEPARATOR + eventMeshTCPConfiguration.eventMeshTcpServerPort;
+                    + EventMeshConstants.IP_PORT_SEPARATOR + eventMeshTCPConfiguration.eventMeshTcpServerPort;
             EventMeshRegisterInfo eventMeshRegisterInfo = new EventMeshRegisterInfo();
             eventMeshRegisterInfo.setEventMeshClusterName(eventMeshTCPConfiguration.getEventMeshCluster());
             eventMeshRegisterInfo.setEventMeshName(eventMeshTCPConfiguration.getEventMeshName() + "-"
@@ -313,7 +329,7 @@ public class EventMeshTCPServer extends AbstractRemotingServer {
             eventMeshRegisterInfo.setProtocolType(ConfigurationContextUtil.TCP);
             registerResult = registry.register(eventMeshRegisterInfo);
         } catch (Exception e) {
-            log.warn("eventMesh register to registry failed", e);
+            LOGGER.error("eventMesh register to registry failed", e);
         }
 
         return registerResult;
@@ -321,7 +337,7 @@ public class EventMeshTCPServer extends AbstractRemotingServer {
 
     private void unRegister() throws Exception {
         String endPoints = IPUtils.getLocalAddress()
-            + EventMeshConstants.IP_PORT_SEPARATOR + eventMeshTCPConfiguration.eventMeshTcpServerPort;
+                + EventMeshConstants.IP_PORT_SEPARATOR + eventMeshTCPConfiguration.eventMeshTcpServerPort;
         EventMeshUnRegisterInfo eventMeshUnRegisterInfo = new EventMeshUnRegisterInfo();
         eventMeshUnRegisterInfo.setEventMeshClusterName(eventMeshTCPConfiguration.getEventMeshCluster());
         eventMeshUnRegisterInfo.setEventMeshName(eventMeshTCPConfiguration.getEventMeshName());
@@ -337,19 +353,19 @@ public class EventMeshTCPServer extends AbstractRemotingServer {
         super.init("eventMesh-tcp");
 
         scheduler = ThreadPoolFactory.createScheduledExecutor(eventMeshTCPConfiguration.eventMeshTcpGlobalScheduler,
-            new EventMeshThreadFactoryImpl("eventMesh-tcp-scheduler", true));
+                new EventMeshThreadFactoryImpl("eventMesh-tcp-scheduler", true));
 
         taskHandleExecutorService = ThreadPoolFactory.createThreadPoolExecutor(
-            eventMeshTCPConfiguration.eventMeshTcpTaskHandleExecutorPoolSize,
-            eventMeshTCPConfiguration.eventMeshTcpTaskHandleExecutorPoolSize,
-            new LinkedBlockingQueue<>(10000),
-            new EventMeshThreadFactoryImpl("eventMesh-tcp-task-handle", true));
+                eventMeshTCPConfiguration.eventMeshTcpTaskHandleExecutorPoolSize,
+                eventMeshTCPConfiguration.eventMeshTcpTaskHandleExecutorPoolSize,
+                new LinkedBlockingQueue<>(10_000),
+                new EventMeshThreadFactoryImpl("eventMesh-tcp-task-handle", true));
 
         broadcastMsgDownstreamExecutorService = ThreadPoolFactory.createThreadPoolExecutor(
-            eventMeshTCPConfiguration.eventMeshTcpMsgDownStreamExecutorPoolSize,
-            eventMeshTCPConfiguration.eventMeshTcpMsgDownStreamExecutorPoolSize,
-            new LinkedBlockingQueue<>(10000),
-            new EventMeshThreadFactoryImpl("eventMesh-tcp-msg-downstream", true));
+                eventMeshTCPConfiguration.eventMeshTcpMsgDownStreamExecutorPoolSize,
+                eventMeshTCPConfiguration.eventMeshTcpMsgDownStreamExecutorPoolSize,
+                new LinkedBlockingQueue<>(10_000),
+                new EventMeshThreadFactoryImpl("eventMesh-tcp-msg-downstream", true));
     }
 
     private void shutdownThreadPool() {
@@ -357,27 +373,26 @@ public class EventMeshTCPServer extends AbstractRemotingServer {
         taskHandleExecutorService.shutdown();
     }
 
-    private GlobalTrafficShapingHandler newGTSHandler() {
-        GlobalTrafficShapingHandler handler = new GlobalTrafficShapingHandler(scheduler, 0,
-            eventMeshTCPConfiguration.getGtc().getReadLimit()) {
+    private GlobalTrafficShapingHandler newGTSHandler(final ScheduledExecutorService executor, final long readLimit) {
+        GlobalTrafficShapingHandler handler = new GlobalTrafficShapingHandler(executor, 0,
+                readLimit) {
             @Override
-            protected long calculateSize(Object msg) {
+            protected long calculateSize(final Object msg) {
                 return 1;
             }
         };
-        handler.setMaxTimeWait(1000);
+        handler.setMaxTimeWait(1_000);
         return handler;
     }
 
-    private ChannelTrafficShapingHandler newCTSHandler() {
-        ChannelTrafficShapingHandler handler = new ChannelTrafficShapingHandler(0,
-            eventMeshTCPConfiguration.getCtc().getReadLimit()) {
+    private ChannelTrafficShapingHandler newCTSHandler(final long readLimit) {
+        ChannelTrafficShapingHandler handler = new ChannelTrafficShapingHandler(0, readLimit) {
             @Override
-            protected long calculateSize(Object msg) {
+            protected long calculateSize(final Object msg) {
                 return 1;
             }
         };
-        handler.setMaxTimeWait(3000);
+        handler.setMaxTimeWait(3_000);
         return handler;
     }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@eventmesh.apache.org
For additional commands, e-mail: commits-help@eventmesh.apache.org