You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eventmesh.apache.org by my...@apache.org on 2023/02/24 03:06:40 UTC

[incubator-eventmesh] branch master updated: [ISSUE #3201]Optimize remote service related code

This is an automated email from the ASF dual-hosted git repository.

mytang0 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-eventmesh.git


The following commit(s) were added to refs/heads/master by this push:
     new aab44051f [ISSUE #3201]Optimize remote service related code
     new 55c884d25 Merge pull request #3207 from mxsm/eventmesh-3201
aab44051f is described below

commit aab44051f73e01c378e9924fb9a4bac44e8107cb
Author: mxsm <lj...@gmail.com>
AuthorDate: Fri Feb 24 00:27:40 2023 +0800

    [ISSUE #3201]Optimize remote service related code
---
 .../apache/eventmesh/common/utils/SystemUtils.java |  76 +++++++++++++
 .../apache/eventmesh/common/utils/ThreadUtils.java |   4 +-
 .../eventmesh/common/utils/SystemUtilsTest.java    |  45 ++++++++
 .../eventmesh/runtime/boot/AbstractHTTPServer.java |  67 +++++++----
 .../runtime/boot/AbstractRemotingServer.java       |  42 ++++---
 .../runtime/boot/EventMeshHTTPServer.java          | 124 +++++++++++----------
 .../runtime/boot/EventMeshHttpBootstrap.java       |   5 +-
 .../eventmesh/runtime/boot/EventMeshServer.java    |   4 +-
 .../eventmesh/runtime/boot/EventMeshTCPServer.java |  28 +++--
 .../eventmesh/runtime/common/ServiceState.java     |   4 +-
 .../runtime/core/consumer/SubscriptionManager.java |   6 +-
 .../protocol/grpc/consumer/EventMeshConsumer.java  |   2 +-
 .../protocol/grpc/producer/EventMeshProducer.java  |   2 +-
 .../tcp/client/EventMeshTcpConnectionHandler.java  |   9 +-
 14 files changed, 293 insertions(+), 125 deletions(-)

diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/utils/SystemUtils.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/utils/SystemUtils.java
new file mode 100644
index 000000000..0cfe6f821
--- /dev/null
+++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/utils/SystemUtils.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.common.utils;
+
+import java.io.File;
+import java.io.IOException;
+import java.lang.reflect.Method;
+
+public abstract class SystemUtils {
+
+    public static final String OS_NAME = System.getProperty("os.name");
+
+    private static boolean isLinuxPlatform = false;
+
+    private static boolean isWindowsPlatform = false;
+
+    static {
+        if (OS_NAME != null && OS_NAME.toLowerCase().contains("linux")) {
+            isLinuxPlatform = true;
+        }
+
+        if (OS_NAME != null && OS_NAME.toLowerCase().contains("windows")) {
+            isWindowsPlatform = true;
+        }
+    }
+
+    private SystemUtils() {
+
+    }
+
+    public static boolean isLinuxPlatform() {
+        return isLinuxPlatform;
+    }
+
+    public static boolean isWindowsPlatform() {
+        return isWindowsPlatform;
+    }
+
+    public static String getProcessId() {
+        try {
+            //likely works on most platforms
+            final Class<?> managementFactoryClass = Class.forName("java.lang.management.ManagementFactory");
+            final Method getRuntimeMXBean = managementFactoryClass.getDeclaredMethod("getRuntimeMXBean");
+            final Class<?> runtimeMXBeanClass = Class.forName("java.lang.management.RuntimeMXBean");
+            final Method getName = runtimeMXBeanClass.getDeclaredMethod("getName");
+
+            final Object runtimeMXBean = getRuntimeMXBean.invoke(null);
+            final String name = (String) getName.invoke(runtimeMXBean);
+
+            return name.split("@")[0];
+        } catch (final Exception ex) {
+            try {
+                // try a Linux-specific way
+                return new File("/proc/self").getCanonicalFile().getName();
+            } catch (final IOException ignoredUseDefault) {
+                // Ignore exception.
+            }
+        }
+        return "-1";
+    }
+}
diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/utils/ThreadUtils.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/utils/ThreadUtils.java
index b68050e17..03f070ceb 100644
--- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/utils/ThreadUtils.java
+++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/utils/ThreadUtils.java
@@ -17,8 +17,6 @@
 
 package org.apache.eventmesh.common.utils;
 
-import org.apache.logging.log4j.util.ProcessIdUtil;
-
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.TimeUnit;
 
@@ -73,7 +71,7 @@ public class ThreadUtils {
         if (currentPID == -1) {
             synchronized (ThreadUtils.class) {
                 if (currentPID == -1) {
-                    currentPID = Long.parseLong(ProcessIdUtil.getProcessId());
+                    currentPID = Long.parseLong(SystemUtils.getProcessId());
                 }
             }
         }
diff --git a/eventmesh-common/src/test/java/org/apache/eventmesh/common/utils/SystemUtilsTest.java b/eventmesh-common/src/test/java/org/apache/eventmesh/common/utils/SystemUtilsTest.java
new file mode 100644
index 000000000..a14b1ee18
--- /dev/null
+++ b/eventmesh-common/src/test/java/org/apache/eventmesh/common/utils/SystemUtilsTest.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.common.utils;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+public class SystemUtilsTest {
+
+    @Test
+    public void isLinuxPlatform() {
+        if (null != SystemUtils.OS_NAME && SystemUtils.OS_NAME.toLowerCase().contains("linux")) {
+            Assert.assertTrue(SystemUtils.isLinuxPlatform());
+            Assert.assertFalse(SystemUtils.isWindowsPlatform());
+        }
+    }
+
+    @Test
+    public void isWindowsPlatform() {
+        if (null != SystemUtils.OS_NAME && SystemUtils.OS_NAME.toLowerCase().contains("windows")) {
+            Assert.assertFalse(SystemUtils.isLinuxPlatform());
+            Assert.assertTrue(SystemUtils.isWindowsPlatform());
+        }
+    }
+
+    @Test
+    public void getProcessId() {
+        Assert.assertNotEquals("-1", SystemUtils.getProcessId());
+    }
+}
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/AbstractHTTPServer.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/AbstractHTTPServer.java
index 1e1dc8060..8a1179ed1 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/AbstractHTTPServer.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/AbstractHTTPServer.java
@@ -64,11 +64,13 @@ import javax.net.ssl.SSLEngine;
 import io.netty.bootstrap.ServerBootstrap;
 import io.netty.channel.ChannelDuplexHandler;
 import io.netty.channel.ChannelFutureListener;
+import io.netty.channel.ChannelHandler.Sharable;
 import io.netty.channel.ChannelHandlerContext;
-import io.netty.channel.ChannelInboundHandlerAdapter;
 import io.netty.channel.ChannelInitializer;
 import io.netty.channel.ChannelOption;
 import io.netty.channel.ChannelPipeline;
+import io.netty.channel.SimpleChannelInboundHandler;
+import io.netty.channel.epoll.EpollServerSocketChannel;
 import io.netty.channel.socket.SocketChannel;
 import io.netty.channel.socket.nio.NioServerSocketChannel;
 import io.netty.handler.codec.http.DefaultFullHttpResponse;
@@ -135,6 +137,10 @@ public abstract class AbstractHTTPServer extends AbstractRemotingServer {
     protected final transient Map<String/* request uri */, Pair<EventProcessor, ThreadPoolExecutor>>
         eventProcessorTable = new ConcurrentHashMap<>(64);
 
+    private HttpConnectionHandler httpConnectionHandler;
+
+    private HTTPHandler httpHandler;
+
     public AbstractHTTPServer(final int port, final boolean useTLS,
         final EventMeshHTTPConfiguration eventMeshHttpConfiguration) {
         super();
@@ -195,20 +201,22 @@ public abstract class AbstractHTTPServer extends AbstractRemotingServer {
 
     @Override
     public void start() throws Exception {
-        final Runnable r = () -> {
-            final ServerBootstrap b = new ServerBootstrap();
+
+        initSharableHandlers();
+
+        final Runnable runnable = () -> {
+            final ServerBootstrap bootstrap = new ServerBootstrap();
             try {
-                b.group(this.getBossGroup(), this.getWorkerGroup())
-                    .channel(NioServerSocketChannel.class)
-                    .childHandler(new HttpsServerInitializer(
-                        useTLS ? SSLContextFactory.getSslContext(eventMeshHttpConfiguration) : null))
+                bootstrap.group(this.getBossGroup(), this.getIoGroup())
+                    .channel(useEpoll() ? EpollServerSocketChannel.class : NioServerSocketChannel.class)
+                    .childHandler(new HttpsServerInitializer(useTLS ? SSLContextFactory.getSslContext(eventMeshHttpConfiguration) : null))
                     .childOption(ChannelOption.SO_KEEPALIVE, Boolean.TRUE);
 
                 if (log.isInfoEnabled()) {
                     log.info("HTTPServer[port={}] started.", this.getPort());
                 }
 
-                b.bind(this.getPort())
+                bootstrap.bind(this.getPort())
                     .channel()
                     .closeFuture()
                     .sync();
@@ -222,9 +230,9 @@ public abstract class AbstractHTTPServer extends AbstractRemotingServer {
             }
         };
 
-        final Thread t = new Thread(r, "EventMesh-http-server");
-        t.setDaemon(true);
-        t.start();
+        final Thread thread = new Thread(runnable, "EventMesh-http-server");
+        thread.setDaemon(true);
+        thread.start();
         started.compareAndSet(false, true);
     }
 
@@ -310,8 +318,7 @@ public abstract class AbstractHTTPServer extends AbstractRemotingServer {
                 .parameters()
                 .forEach((key, value) -> httpRequestBody.put(key, value.get(0)));
         } else if (HttpMethod.POST.equals(httpRequest.method())) {
-            final HttpPostRequestDecoder decoder =
-                new HttpPostRequestDecoder(DEFAULT_HTTP_DATA_FACTORY, httpRequest);
+            final HttpPostRequestDecoder decoder = new HttpPostRequestDecoder(DEFAULT_HTTP_DATA_FACTORY, httpRequest);
             for (final InterfaceHttpData parm : decoder.getBodyHttpDatas()) {
                 if (InterfaceHttpData.HttpDataType.Attribute == parm.getHttpDataType()) {
                     final Attribute data = (Attribute) parm;
@@ -324,12 +331,18 @@ public abstract class AbstractHTTPServer extends AbstractRemotingServer {
         return httpRequestBody;
     }
 
-    private class HTTPHandler extends ChannelInboundHandlerAdapter {
+    @Sharable
+    private class HTTPHandler extends SimpleChannelInboundHandler<HttpRequest> {
 
+        /**
+         * Is called for each message of type {@link HttpRequest}.
+         *
+         * @param ctx         the {@link ChannelHandlerContext} which this {@link SimpleChannelInboundHandler} belongs to
+         * @param httpRequest the message to handle
+         * @throws Exception is thrown if an error occurred
+         */
         @Override
-        public void channelRead(final ChannelHandlerContext ctx, final Object message) {
-            final HttpRequest httpRequest = (HttpRequest) message;
-
+        protected void channelRead0(ChannelHandlerContext ctx, HttpRequest httpRequest) throws Exception {
             if (httpRequest == null) {
                 return;
             }
@@ -433,7 +446,7 @@ public abstract class AbstractHTTPServer extends AbstractRemotingServer {
             } catch (Exception ex) {
                 log.error("AbrstractHTTPServer.HTTPHandler.channelRead error", ex);
             } finally {
-                ReferenceCountUtil.release(message);
+                ReferenceCountUtil.release(httpRequest);
             }
         }
 
@@ -502,7 +515,7 @@ public abstract class AbstractHTTPServer extends AbstractRemotingServer {
         }
 
         public void processEventMeshRequest(final ChannelHandlerContext ctx,
-            final AsyncContext<HttpCommand> asyncContext) {
+                                            final AsyncContext<HttpCommand> asyncContext) {
             final HttpCommand request = asyncContext.getRequest();
             final Pair<HttpRequestProcessor, ThreadPoolExecutor> choosed = processorTable.get(request.getRequestCode());
             try {
@@ -644,6 +657,12 @@ public abstract class AbstractHTTPServer extends AbstractRemotingServer {
         return httpEventWrapper;
     }
 
+    private void initSharableHandlers() {
+        httpConnectionHandler = new HttpConnectionHandler();
+        httpHandler = new HTTPHandler();
+    }
+
+    @Sharable
     private class HttpConnectionHandler extends ChannelDuplexHandler {
 
         public final transient AtomicInteger connections = new AtomicInteger(0);
@@ -658,7 +677,6 @@ public abstract class AbstractHTTPServer extends AbstractRemotingServer {
                 ctx.close();
                 return;
             }
-
             super.channelActive(ctx);
         }
 
@@ -701,14 +719,15 @@ public abstract class AbstractHTTPServer extends AbstractRemotingServer {
             if (sslContext != null && useTLS) {
                 final SSLEngine sslEngine = sslContext.createSSLEngine();
                 sslEngine.setUseClientMode(false);
-                pipeline.addFirst("ssl", new SslHandler(sslEngine));
+                pipeline.addFirst(getWorkerGroup(), "ssl", new SslHandler(sslEngine));
             }
 
-            pipeline.addLast(new HttpRequestDecoder(),
+            pipeline.addLast(getWorkerGroup(),
+                new HttpRequestDecoder(),
                 new HttpResponseEncoder(),
-                new HttpConnectionHandler(),
+                httpConnectionHandler,
                 new HttpObjectAggregator(Integer.MAX_VALUE),
-                new HTTPHandler());
+                httpHandler);
         }
     }
 
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/AbstractRemotingServer.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/AbstractRemotingServer.java
index 1d84fcb9c..858ed6993 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/AbstractRemotingServer.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/AbstractRemotingServer.java
@@ -18,12 +18,16 @@
 package org.apache.eventmesh.runtime.boot;
 
 import org.apache.eventmesh.common.EventMeshThreadFactory;
+import org.apache.eventmesh.common.utils.SystemUtils;
 import org.apache.eventmesh.common.utils.ThreadUtils;
 
 import java.util.concurrent.TimeUnit;
 
 import io.netty.channel.EventLoopGroup;
+import io.netty.channel.epoll.Epoll;
+import io.netty.channel.epoll.EpollEventLoopGroup;
 import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.util.concurrent.EventExecutorGroup;
 
 
 import lombok.extern.slf4j.Slf4j;
@@ -37,7 +41,7 @@ public abstract class AbstractRemotingServer {
 
     private EventLoopGroup ioGroup;
 
-    private EventLoopGroup workerGroup;
+    private EventExecutorGroup workerGroup;
 
     private int port;
 
@@ -51,7 +55,7 @@ public abstract class AbstractRemotingServer {
         return ioGroup;
     }
 
-    public EventLoopGroup getWorkerGroup() {
+    public EventExecutorGroup getWorkerGroup() {
         return workerGroup;
     }
 
@@ -67,7 +71,7 @@ public abstract class AbstractRemotingServer {
         this.ioGroup = ioGroup;
     }
 
-    public void setWorkerGroup(final EventLoopGroup workerGroup) {
+    public void setWorkerGroup(final EventExecutorGroup workerGroup) {
         this.workerGroup = workerGroup;
     }
 
@@ -75,25 +79,31 @@ public abstract class AbstractRemotingServer {
         this.port = port;
     }
 
-    private EventLoopGroup initBossGroup(final String threadPrefix) {
-        bossGroup = new NioEventLoopGroup(1, new EventMeshThreadFactory(threadPrefix + "-boss", true));
-        return bossGroup;
+    private void buildBossGroup(final String threadPrefix) {
+        if (useEpoll()) {
+            bossGroup = new EpollEventLoopGroup(1, new EventMeshThreadFactory(threadPrefix + "NettyEpoll-Boss", true));
+        } else {
+            bossGroup = new NioEventLoopGroup(1, new EventMeshThreadFactory(threadPrefix + "NettyNio-Boss", true));
+        }
+
     }
 
-    private EventLoopGroup initIOGroup(final String threadPrefix, final int threadNum) {
-        ioGroup = new NioEventLoopGroup(threadNum, new EventMeshThreadFactory(threadPrefix + "-io"));
-        return ioGroup;
+    private void buildIOGroup(final String threadPrefix, final int threadNum) {
+        if (useEpoll()) {
+            ioGroup = new EpollEventLoopGroup(threadNum, new EventMeshThreadFactory(threadPrefix + "-NettyEpoll-IO"));
+        } else {
+            ioGroup = new NioEventLoopGroup(threadNum, new EventMeshThreadFactory(threadPrefix + "-NettyNio-IO"));
+        }
     }
 
-    private EventLoopGroup initWorkerGroup(final String threadPrefix, final int threadNum) {
+    private void buildWorkerGroup(final String threadPrefix, final int threadNum) {
         workerGroup = new NioEventLoopGroup(threadNum, new EventMeshThreadFactory(threadPrefix + "-worker"));
-        return workerGroup;
     }
 
     public void init(final String threadPrefix) throws Exception {
-        initBossGroup(threadPrefix);
-        initIOGroup(threadPrefix, MAX_THREADS);
-        initWorkerGroup(threadPrefix, MAX_THREADS);
+        buildBossGroup(threadPrefix);
+        buildIOGroup(threadPrefix, MAX_THREADS);
+        buildWorkerGroup(threadPrefix, MAX_THREADS);
     }
 
     public void shutdown() throws Exception {
@@ -121,5 +131,9 @@ public abstract class AbstractRemotingServer {
         }
     }
 
+    protected boolean useEpoll() {
+        return SystemUtils.isLinuxPlatform() && Epoll.isAvailable();
+    }
+
     public abstract void start() throws Exception;
 }
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshHTTPServer.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshHTTPServer.java
index 338f2d758..11d3d61ea 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshHTTPServer.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshHTTPServer.java
@@ -116,10 +116,9 @@ public class EventMeshHTTPServer extends AbstractHTTPServer {
 
     public transient HTTPClientPool httpClientPool = new HTTPClientPool(10);
 
-    public EventMeshHTTPServer(final EventMeshServer eventMeshServer,
-        final EventMeshHTTPConfiguration eventMeshHttpConfiguration) {
-        super(eventMeshHttpConfiguration.getHttpServerPort(),
-            eventMeshHttpConfiguration.isEventMeshServerUseTls(), eventMeshHttpConfiguration);
+    public EventMeshHTTPServer(final EventMeshServer eventMeshServer, final EventMeshHTTPConfiguration eventMeshHttpConfiguration) {
+
+        super(eventMeshHttpConfiguration.getHttpServerPort(), eventMeshHttpConfiguration.isEventMeshServerUseTls(), eventMeshHttpConfiguration);
         this.eventMeshServer = eventMeshServer;
         this.eventMeshHttpConfiguration = eventMeshHttpConfiguration;
         this.registry = eventMeshServer.getRegistry();
@@ -127,21 +126,6 @@ public class EventMeshHTTPServer extends AbstractHTTPServer {
 
     }
 
-    public EventMeshServer getEventMeshServer() {
-        return eventMeshServer;
-    }
-
-
-    public void shutdownThreadPool() {
-        batchMsgExecutor.shutdown();
-        adminExecutor.shutdown();
-        clientManageExecutor.shutdown();
-        sendMsgExecutor.shutdown();
-        remoteMsgExecutor.shutdown();
-        pushMsgExecutor.shutdown();
-        replyMsgExecutor.shutdown();
-    }
-
     private void initThreadPool() {
 
         batchMsgExecutor = ThreadPoolFactory.createThreadPoolExecutor(
@@ -187,40 +171,28 @@ public class EventMeshHTTPServer extends AbstractHTTPServer {
             "eventMesh-replyMsg", true);
     }
 
-    public ThreadPoolExecutor getBatchMsgExecutor() {
-        return batchMsgExecutor;
-    }
-
-    public ThreadPoolExecutor getSendMsgExecutor() {
-        return sendMsgExecutor;
-    }
-
-    public ThreadPoolExecutor getReplyMsgExecutor() {
-        return replyMsgExecutor;
-    }
-
-    public ThreadPoolExecutor getPushMsgExecutor() {
-        return pushMsgExecutor;
-    }
-
-    public ThreadPoolExecutor getClientManageExecutor() {
-        return clientManageExecutor;
-    }
-
-    public ThreadPoolExecutor getAdminExecutor() {
-        return adminExecutor;
-    }
-
-    public RateLimiter getMsgRateLimiter() {
-        return msgRateLimiter;
-    }
-
-    public RateLimiter getBatchRateLimiter() {
-        return batchRateLimiter;
-    }
-
-    public Registry getRegistry() {
-        return registry;
+    public void shutdownThreadPool() {
+        if (batchMsgExecutor != null) {
+            batchMsgExecutor.shutdown();
+        }
+        if (adminExecutor != null) {
+            adminExecutor.shutdown();
+        }
+        if (clientManageExecutor != null) {
+            clientManageExecutor.shutdown();
+        }
+        if (sendMsgExecutor != null) {
+            sendMsgExecutor.shutdown();
+        }
+        if (remoteMsgExecutor != null) {
+            remoteMsgExecutor.shutdown();
+        }
+        if (pushMsgExecutor != null) {
+            pushMsgExecutor.shutdown();
+        }
+        if (replyMsgExecutor != null) {
+            replyMsgExecutor.shutdown();
+        }
     }
 
     private void init() throws Exception {
@@ -269,7 +241,7 @@ public class EventMeshHTTPServer extends AbstractHTTPServer {
         registerHTTPRequestProcessor();
         this.initWebhook();
         if (log.isInfoEnabled()) {
-            log.info("--------EventMeshHTTPServer inited------------------");
+            log.info("==================EventMeshHTTPServer initialized==================");
         }
     }
 
@@ -285,7 +257,7 @@ public class EventMeshHTTPServer extends AbstractHTTPServer {
             this.register();
         }
         if (log.isInfoEnabled()) {
-            log.info("--------EventMeshHTTPServer started------------------");
+            log.info("==================EventMeshHTTPServer started==================");
         }
     }
 
@@ -310,7 +282,7 @@ public class EventMeshHTTPServer extends AbstractHTTPServer {
             this.unRegister();
         }
         if (log.isInfoEnabled()) {
-            log.info("-------------EventMeshHTTPServer shutdown-------------");
+            log.info("==================EventMeshHTTPServer shutdown==================");
         }
     }
 
@@ -441,4 +413,44 @@ public class EventMeshHTTPServer extends AbstractHTTPServer {
     public Acl getAcl() {
         return acl;
     }
+
+    public EventMeshServer getEventMeshServer() {
+        return eventMeshServer;
+    }
+
+    public ThreadPoolExecutor getBatchMsgExecutor() {
+        return batchMsgExecutor;
+    }
+
+    public ThreadPoolExecutor getSendMsgExecutor() {
+        return sendMsgExecutor;
+    }
+
+    public ThreadPoolExecutor getReplyMsgExecutor() {
+        return replyMsgExecutor;
+    }
+
+    public ThreadPoolExecutor getPushMsgExecutor() {
+        return pushMsgExecutor;
+    }
+
+    public ThreadPoolExecutor getClientManageExecutor() {
+        return clientManageExecutor;
+    }
+
+    public ThreadPoolExecutor getAdminExecutor() {
+        return adminExecutor;
+    }
+
+    public RateLimiter getMsgRateLimiter() {
+        return msgRateLimiter;
+    }
+
+    public RateLimiter getBatchRateLimiter() {
+        return batchRateLimiter;
+    }
+
+    public Registry getRegistry() {
+        return registry;
+    }
 }
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshHttpBootstrap.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshHttpBootstrap.java
index f4550fe71..95512b7e5 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshHttpBootstrap.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshHttpBootstrap.java
@@ -49,14 +49,15 @@ public class EventMeshHttpBootstrap implements EventMeshBootstrap {
     @Override
     public void start() throws Exception {
         // server start
-        if (eventMeshHttpConfiguration != null) {
+        if (eventMeshHttpServer != null) {
             eventMeshHttpServer.start();
         }
     }
 
     @Override
     public void shutdown() throws Exception {
-        if (eventMeshHttpConfiguration != null) {
+        //server shutdown
+        if (eventMeshHttpServer != null) {
             eventMeshHttpServer.shutdown();
         }
     }
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshServer.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshServer.java
index 7e8273545..b9e885106 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshServer.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshServer.java
@@ -160,7 +160,7 @@ public class EventMeshServer {
     }
 
     public void shutdown() throws Exception {
-        serviceState = ServiceState.STOPING;
+        serviceState = ServiceState.STOPPING;
         if (log.isInfoEnabled()) {
             log.info(SERVER_STATE_MSG, serviceState);
         }
@@ -184,7 +184,7 @@ public class EventMeshServer {
         }
 
         ConfigurationContextUtil.clear();
-        serviceState = ServiceState.STOPED;
+        serviceState = ServiceState.STOPPED;
 
         if (log.isInfoEnabled()) {
             log.info(SERVER_STATE_MSG, serviceState);
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshTCPServer.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshTCPServer.java
index f9bb8bffb..d7c09d6dd 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshTCPServer.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshTCPServer.java
@@ -58,11 +58,13 @@ import io.netty.channel.Channel;
 import io.netty.channel.ChannelFuture;
 import io.netty.channel.ChannelInitializer;
 import io.netty.channel.ChannelOption;
+import io.netty.channel.epoll.EpollServerSocketChannel;
 import io.netty.channel.socket.nio.NioServerSocketChannel;
 import io.netty.handler.timeout.IdleStateHandler;
 import io.netty.handler.traffic.ChannelTrafficShapingHandler;
 import io.netty.handler.traffic.GlobalTrafficShapingHandler;
 
+
 import com.google.common.util.concurrent.RateLimiter;
 
 import lombok.extern.slf4j.Slf4j;
@@ -82,6 +84,8 @@ public class EventMeshTCPServer extends AbstractRemotingServer {
 
     private transient GlobalTrafficShapingHandler globalTrafficShapingHandler;
 
+    private EventMeshTcpConnectionHandler eventMeshTcpConnectionHandler;
+
     private transient ScheduledExecutorService scheduler;
 
     private transient ExecutorService taskHandleExecutorService;
@@ -98,6 +102,7 @@ public class EventMeshTCPServer extends AbstractRemotingServer {
 
     private transient RateLimiter rateLimiter;
 
+
     public void setClientSessionGroupMapping(final ClientSessionGroupMapping clientSessionGroupMapping) {
         this.clientSessionGroupMapping = clientSessionGroupMapping;
     }
@@ -140,19 +145,18 @@ public class EventMeshTCPServer extends AbstractRemotingServer {
     }
 
     private void startServer() {
-        Runnable r = () -> {
+        Runnable runnable = () -> {
             ServerBootstrap bootstrap = new ServerBootstrap();
             ChannelInitializer channelInitializer = new ChannelInitializer() {
                 @Override
                 public void initChannel(final Channel ch) throws Exception {
                     ch.pipeline()
-                        .addLast(new Codec.Encoder())
-                        .addLast(new Codec.Decoder())
-                        .addLast("global-traffic-shaping", globalTrafficShapingHandler)
-                        .addLast("channel-traffic-shaping", newCTSHandler(eventMeshTCPConfiguration.getCtc().getReadLimit()))
-                        .addLast(new EventMeshTcpConnectionHandler(EventMeshTCPServer.this))
-                        .addLast(
-                            getWorkerGroup(),
+                        .addLast(getWorkerGroup(), new Codec.Encoder())
+                        .addLast(getWorkerGroup(), new Codec.Decoder())
+                        .addLast(getWorkerGroup(), "global-traffic-shaping", globalTrafficShapingHandler)
+                        .addLast(getWorkerGroup(), "channel-traffic-shaping", newCTSHandler(eventMeshTCPConfiguration.getCtc().getReadLimit()))
+                        .addLast(getWorkerGroup(), eventMeshTcpConnectionHandler)
+                        .addLast(getWorkerGroup(),
                             new IdleStateHandler(
                                 eventMeshTCPConfiguration.getEventMeshTcpIdleReadSeconds(),
                                 eventMeshTCPConfiguration.getEventMeshTcpIdleWriteSeconds(),
@@ -164,7 +168,7 @@ public class EventMeshTCPServer extends AbstractRemotingServer {
             };
 
             bootstrap.group(this.getBossGroup(), this.getIoGroup())
-                .channel(NioServerSocketChannel.class)
+                .channel(useEpoll() ? EpollServerSocketChannel.class : NioServerSocketChannel.class)
                 .option(ChannelOption.SO_BACKLOG, 128)
                 .option(ChannelOption.SO_REUSEADDR, true)
                 .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 10_000)
@@ -194,8 +198,8 @@ public class EventMeshTCPServer extends AbstractRemotingServer {
             }
         };
 
-        Thread t = new Thread(r, "eventMesh-tcp-server");
-        t.start();
+        Thread thread = new Thread(runnable, "eventMesh-tcp-server");
+        thread.start();
     }
 
     public void init() throws Exception {
@@ -208,6 +212,8 @@ public class EventMeshTCPServer extends AbstractRemotingServer {
 
         globalTrafficShapingHandler = newGTSHandler(scheduler, eventMeshTCPConfiguration.getGtc().getReadLimit());
 
+        eventMeshTcpConnectionHandler = new EventMeshTcpConnectionHandler(this);
+
         adminWebHookConfigOperationManage = new AdminWebHookConfigOperationManager();
         adminWebHookConfigOperationManage.init();
 
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/common/ServiceState.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/common/ServiceState.java
index 6848b2c2b..841450d5b 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/common/ServiceState.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/common/ServiceState.java
@@ -23,7 +23,7 @@ public enum ServiceState {
 
     RUNNING,
 
-    STOPING,
+    STOPPING,
 
-    STOPED;
+    STOPPED;
 }
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/consumer/SubscriptionManager.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/consumer/SubscriptionManager.java
index 0d1ae29e6..37ac2719e 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/consumer/SubscriptionManager.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/consumer/SubscriptionManager.java
@@ -36,11 +36,9 @@ import lombok.extern.slf4j.Slf4j;
 @Slf4j
 public class SubscriptionManager {
 
-    private final ConcurrentHashMap<String /**group*/, ConsumerGroupConf> localConsumerGroupMapping =
-        new ConcurrentHashMap<>();
+    private final ConcurrentHashMap<String /**group*/, ConsumerGroupConf> localConsumerGroupMapping = new ConcurrentHashMap<>(64);
 
-    private final ConcurrentHashMap<String /**group@topic*/, List<Client>> localClientInfoMapping =
-        new ConcurrentHashMap<>();
+    private final ConcurrentHashMap<String /**group@topic*/, List<Client>> localClientInfoMapping = new ConcurrentHashMap<>(64);
 
     public ConcurrentHashMap<String, ConsumerGroupConf> getLocalConsumerGroupMapping() {
         return localConsumerGroupMapping;
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/grpc/consumer/EventMeshConsumer.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/grpc/consumer/EventMeshConsumer.java
index 14a92ebbc..1a1f81e7e 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/grpc/consumer/EventMeshConsumer.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/grpc/consumer/EventMeshConsumer.java
@@ -191,7 +191,7 @@ public class EventMeshConsumer {
         persistentMqConsumer.shutdown();
         broadcastMqConsumer.shutdown();
 
-        serviceState = ServiceState.STOPED;
+        serviceState = ServiceState.STOPPED;
         if (log.isInfoEnabled()) {
             log.info("EventMeshConsumer [{}] shutdown.........", consumerGroup);
         }
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/grpc/producer/EventMeshProducer.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/grpc/producer/EventMeshProducer.java
index 69cc46af6..12bc3844d 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/grpc/producer/EventMeshProducer.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/grpc/producer/EventMeshProducer.java
@@ -87,7 +87,7 @@ public class EventMeshProducer {
         }
 
         mqProducerWrapper.shutdown();
-        serviceState = ServiceState.STOPED;
+        serviceState = ServiceState.STOPPED;
         log.info("EventMeshProducer [{}] shutdown.........", producerGroupConfig.getGroupName());
     }
 
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/EventMeshTcpConnectionHandler.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/EventMeshTcpConnectionHandler.java
index 82718e182..9f8a92e3f 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/EventMeshTcpConnectionHandler.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/EventMeshTcpConnectionHandler.java
@@ -23,6 +23,7 @@ import org.apache.eventmesh.runtime.util.RemotingHelper;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import io.netty.channel.ChannelDuplexHandler;
+import io.netty.channel.ChannelHandler.Sharable;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.handler.timeout.IdleState;
 import io.netty.handler.timeout.IdleStateEvent;
@@ -31,6 +32,7 @@ import io.netty.handler.timeout.IdleStateEvent;
 import lombok.extern.slf4j.Slf4j;
 
 @Slf4j
+@Sharable
 public class EventMeshTcpConnectionHandler extends ChannelDuplexHandler {
 
     public static final AtomicInteger connections = new AtomicInteger(0);
@@ -59,11 +61,8 @@ public class EventMeshTcpConnectionHandler extends ChannelDuplexHandler {
         final String remoteAddress = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
         log.info("client|tcp|channelActive|remoteAddress={}|msg={}", remoteAddress, "");
 
-        int c = connections.incrementAndGet();
-        if (c > eventMeshTCPServer.getEventMeshTCPConfiguration().getEventMeshTcpClientMaxNum()) {
-            log.warn("client|tcp|channelActive|remoteAddress={}|msg={}", remoteAddress, "too many client connect "
-                +
-                "this eventMesh server");
+        if (connections.incrementAndGet() > eventMeshTCPServer.getEventMeshTCPConfiguration().getEventMeshTcpClientMaxNum()) {
+            log.warn("client|tcp|channelActive|remoteAddress={}|msg={}", remoteAddress, "too many client connect this eventMesh server");
             ctx.close();
             return;
         }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@eventmesh.apache.org
For additional commands, e-mail: commits-help@eventmesh.apache.org