You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eventmesh.apache.org by my...@apache.org on 2023/02/24 03:06:40 UTC
[incubator-eventmesh] branch master updated: [ISSUE #3201]Optimize remote service related code
This is an automated email from the ASF dual-hosted git repository.
mytang0 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-eventmesh.git
The following commit(s) were added to refs/heads/master by this push:
new aab44051f [ISSUE #3201]Optimize remote service related code
new 55c884d25 Merge pull request #3207 from mxsm/eventmesh-3201
aab44051f is described below
commit aab44051f73e01c378e9924fb9a4bac44e8107cb
Author: mxsm <lj...@gmail.com>
AuthorDate: Fri Feb 24 00:27:40 2023 +0800
[ISSUE #3201]Optimize remote service related code
---
.../apache/eventmesh/common/utils/SystemUtils.java | 76 +++++++++++++
.../apache/eventmesh/common/utils/ThreadUtils.java | 4 +-
.../eventmesh/common/utils/SystemUtilsTest.java | 45 ++++++++
.../eventmesh/runtime/boot/AbstractHTTPServer.java | 67 +++++++----
.../runtime/boot/AbstractRemotingServer.java | 42 ++++---
.../runtime/boot/EventMeshHTTPServer.java | 124 +++++++++++----------
.../runtime/boot/EventMeshHttpBootstrap.java | 5 +-
.../eventmesh/runtime/boot/EventMeshServer.java | 4 +-
.../eventmesh/runtime/boot/EventMeshTCPServer.java | 28 +++--
.../eventmesh/runtime/common/ServiceState.java | 4 +-
.../runtime/core/consumer/SubscriptionManager.java | 6 +-
.../protocol/grpc/consumer/EventMeshConsumer.java | 2 +-
.../protocol/grpc/producer/EventMeshProducer.java | 2 +-
.../tcp/client/EventMeshTcpConnectionHandler.java | 9 +-
14 files changed, 293 insertions(+), 125 deletions(-)
diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/utils/SystemUtils.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/utils/SystemUtils.java
new file mode 100644
index 000000000..0cfe6f821
--- /dev/null
+++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/utils/SystemUtils.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.common.utils;
+
+import java.io.File;
+import java.io.IOException;
+import java.lang.reflect.Method;
+
/**
 * Static helpers describing the host operating system and the current JVM process.
 *
 * <p>The platform flags are derived once, during class initialisation, from the
 * {@code os.name} system property. The class cannot be instantiated.
 */
public abstract class SystemUtils {

    /** Raw value of the {@code os.name} system property; {@code null} when the property is unset. */
    public static final String OS_NAME = System.getProperty("os.name");

    /** Sentinel returned by {@link #getProcessId()} when no numeric process id can be determined. */
    private static final String UNKNOWN_PROCESS_ID = "-1";

    // Both flags read OS_NAME, so they must stay declared after it.
    private static final boolean LINUX_PLATFORM = osNameContains("linux");

    private static final boolean WINDOWS_PLATFORM = osNameContains("windows");

    private SystemUtils() {
        // static utility holder
    }

    /**
     * Tells whether the JVM runs on Linux.
     *
     * @return {@code true} when {@code os.name} contains "linux" (case-insensitive)
     */
    public static boolean isLinuxPlatform() {
        return LINUX_PLATFORM;
    }

    /**
     * Tells whether the JVM runs on Windows.
     *
     * @return {@code true} when {@code os.name} contains "windows" (case-insensitive)
     */
    public static boolean isWindowsPlatform() {
        return WINDOWS_PLATFORM;
    }

    /**
     * Resolves the id of the current JVM process.
     *
     * <p>The id is first taken from the part of the runtime MXBean name that precedes
     * {@code '@'}; if that is unavailable or not numeric, the canonical name of
     * {@code /proc/self} is tried. A candidate is accepted only when it consists solely of
     * decimal digits, so callers may safely hand the result to {@link Long#parseLong(String)}.
     *
     * @return the process id as a decimal string, or {@code "-1"} when it cannot be determined
     */
    public static String getProcessId() {
        String processId = processIdFromRuntimeMXBean();
        if (processId == null) {
            processId = processIdFromProcFs();
        }
        return processId == null ? UNKNOWN_PROCESS_ID : processId;
    }

    /** Checks {@link #OS_NAME} for the given lower-case token; {@code false} when the name is unknown. */
    private static boolean osNameContains(final String token) {
        // Locale.ROOT keeps the lower-casing independent of the JVM default locale
        // (under a Turkish locale 'I' lower-cases to a dotless 'ı', so "LINUX" would not match).
        return OS_NAME != null && OS_NAME.toLowerCase(java.util.Locale.ROOT).contains(token);
    }

    /** Reads the process id from the runtime MXBean name; {@code null} when unavailable or not numeric. */
    private static String processIdFromRuntimeMXBean() {
        try {
            // The management classes are looked up reflectively rather than linked statically;
            // presumably so this class still loads on runtimes without java.lang.management.
            final Class<?> managementFactoryClass = Class.forName("java.lang.management.ManagementFactory");
            final Method getRuntimeMXBean = managementFactoryClass.getDeclaredMethod("getRuntimeMXBean");
            final Class<?> runtimeMXBeanClass = Class.forName("java.lang.management.RuntimeMXBean");
            final Method getName = runtimeMXBeanClass.getDeclaredMethod("getName");

            final Object runtimeMXBean = getRuntimeMXBean.invoke(null);
            final String name = (String) getName.invoke(runtimeMXBean);

            return name == null ? null : digitsOrNull(name.split("@")[0]);
        } catch (final ReflectiveOperationException | RuntimeException ignored) {
            // Best effort: the caller falls back to the next strategy.
            return null;
        }
    }

    /** Reads the process id from the {@code /proc/self} link; {@code null} when unavailable or not numeric. */
    private static String processIdFromProcFs() {
        try {
            // Without procfs the canonical name is simply "self", which digitsOrNull rejects.
            return digitsOrNull(new File("/proc/self").getCanonicalFile().getName());
        } catch (final IOException | SecurityException ignored) {
            // Best effort: the caller falls back to the default value.
            return null;
        }
    }

    /** Returns {@code candidate} when it is a non-empty run of decimal digits, otherwise {@code null}. */
    private static String digitsOrNull(final String candidate) {
        if (candidate == null || candidate.isEmpty()) {
            return null;
        }
        for (int i = 0; i < candidate.length(); i++) {
            final char c = candidate.charAt(i);
            if (c < '0' || c > '9') {
                return null;
            }
        }
        return candidate;
    }
}
diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/utils/ThreadUtils.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/utils/ThreadUtils.java
index b68050e17..03f070ceb 100644
--- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/utils/ThreadUtils.java
+++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/utils/ThreadUtils.java
@@ -17,8 +17,6 @@
package org.apache.eventmesh.common.utils;
-import org.apache.logging.log4j.util.ProcessIdUtil;
-
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
@@ -73,7 +71,7 @@ public class ThreadUtils {
if (currentPID == -1) {
synchronized (ThreadUtils.class) {
if (currentPID == -1) {
- currentPID = Long.parseLong(ProcessIdUtil.getProcessId());
+ currentPID = Long.parseLong(SystemUtils.getProcessId());
}
}
}
diff --git a/eventmesh-common/src/test/java/org/apache/eventmesh/common/utils/SystemUtilsTest.java b/eventmesh-common/src/test/java/org/apache/eventmesh/common/utils/SystemUtilsTest.java
new file mode 100644
index 000000000..a14b1ee18
--- /dev/null
+++ b/eventmesh-common/src/test/java/org/apache/eventmesh/common/utils/SystemUtilsTest.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.common.utils;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+public class SystemUtilsTest {
+
+ @Test
+ public void isLinuxPlatform() {
+ if (null != SystemUtils.OS_NAME && SystemUtils.OS_NAME.toLowerCase().contains("linux")) {
+ Assert.assertTrue(SystemUtils.isLinuxPlatform());
+ Assert.assertFalse(SystemUtils.isWindowsPlatform());
+ }
+ }
+
+ @Test
+ public void isWindowsPlatform() {
+ if (null != SystemUtils.OS_NAME && SystemUtils.OS_NAME.toLowerCase().contains("windows")) {
+ Assert.assertFalse(SystemUtils.isLinuxPlatform());
+ Assert.assertTrue(SystemUtils.isWindowsPlatform());
+ }
+ }
+
+ @Test
+ public void getProcessId() {
+ Assert.assertNotEquals("-1", SystemUtils.getProcessId());
+ }
+}
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/AbstractHTTPServer.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/AbstractHTTPServer.java
index 1e1dc8060..8a1179ed1 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/AbstractHTTPServer.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/AbstractHTTPServer.java
@@ -64,11 +64,13 @@ import javax.net.ssl.SSLEngine;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelFutureListener;
+import io.netty.channel.ChannelHandler.Sharable;
import io.netty.channel.ChannelHandlerContext;
-import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
+import io.netty.channel.SimpleChannelInboundHandler;
+import io.netty.channel.epoll.EpollServerSocketChannel;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.http.DefaultFullHttpResponse;
@@ -135,6 +137,10 @@ public abstract class AbstractHTTPServer extends AbstractRemotingServer {
protected final transient Map<String/* request uri */, Pair<EventProcessor, ThreadPoolExecutor>>
eventProcessorTable = new ConcurrentHashMap<>(64);
+ private HttpConnectionHandler httpConnectionHandler;
+
+ private HTTPHandler httpHandler;
+
public AbstractHTTPServer(final int port, final boolean useTLS,
final EventMeshHTTPConfiguration eventMeshHttpConfiguration) {
super();
@@ -195,20 +201,22 @@ public abstract class AbstractHTTPServer extends AbstractRemotingServer {
@Override
public void start() throws Exception {
- final Runnable r = () -> {
- final ServerBootstrap b = new ServerBootstrap();
+
+ initSharableHandlers();
+
+ final Runnable runnable = () -> {
+ final ServerBootstrap bootstrap = new ServerBootstrap();
try {
- b.group(this.getBossGroup(), this.getWorkerGroup())
- .channel(NioServerSocketChannel.class)
- .childHandler(new HttpsServerInitializer(
- useTLS ? SSLContextFactory.getSslContext(eventMeshHttpConfiguration) : null))
+ bootstrap.group(this.getBossGroup(), this.getIoGroup())
+ .channel(useEpoll() ? EpollServerSocketChannel.class : NioServerSocketChannel.class)
+ .childHandler(new HttpsServerInitializer(useTLS ? SSLContextFactory.getSslContext(eventMeshHttpConfiguration) : null))
.childOption(ChannelOption.SO_KEEPALIVE, Boolean.TRUE);
if (log.isInfoEnabled()) {
log.info("HTTPServer[port={}] started.", this.getPort());
}
- b.bind(this.getPort())
+ bootstrap.bind(this.getPort())
.channel()
.closeFuture()
.sync();
@@ -222,9 +230,9 @@ public abstract class AbstractHTTPServer extends AbstractRemotingServer {
}
};
- final Thread t = new Thread(r, "EventMesh-http-server");
- t.setDaemon(true);
- t.start();
+ final Thread thread = new Thread(runnable, "EventMesh-http-server");
+ thread.setDaemon(true);
+ thread.start();
started.compareAndSet(false, true);
}
@@ -310,8 +318,7 @@ public abstract class AbstractHTTPServer extends AbstractRemotingServer {
.parameters()
.forEach((key, value) -> httpRequestBody.put(key, value.get(0)));
} else if (HttpMethod.POST.equals(httpRequest.method())) {
- final HttpPostRequestDecoder decoder =
- new HttpPostRequestDecoder(DEFAULT_HTTP_DATA_FACTORY, httpRequest);
+ final HttpPostRequestDecoder decoder = new HttpPostRequestDecoder(DEFAULT_HTTP_DATA_FACTORY, httpRequest);
for (final InterfaceHttpData parm : decoder.getBodyHttpDatas()) {
if (InterfaceHttpData.HttpDataType.Attribute == parm.getHttpDataType()) {
final Attribute data = (Attribute) parm;
@@ -324,12 +331,18 @@ public abstract class AbstractHTTPServer extends AbstractRemotingServer {
return httpRequestBody;
}
- private class HTTPHandler extends ChannelInboundHandlerAdapter {
+ @Sharable
+ private class HTTPHandler extends SimpleChannelInboundHandler<HttpRequest> {
+ /**
+ * Is called for each message of type {@link HttpRequest}.
+ *
+ * @param ctx the {@link ChannelHandlerContext} which this {@link SimpleChannelInboundHandler} belongs to
+ * @param httpRequest the message to handle
+ * @throws Exception is thrown if an error occurred
+ */
@Override
- public void channelRead(final ChannelHandlerContext ctx, final Object message) {
- final HttpRequest httpRequest = (HttpRequest) message;
-
+ protected void channelRead0(ChannelHandlerContext ctx, HttpRequest httpRequest) throws Exception {
if (httpRequest == null) {
return;
}
@@ -433,7 +446,7 @@ public abstract class AbstractHTTPServer extends AbstractRemotingServer {
} catch (Exception ex) {
log.error("AbrstractHTTPServer.HTTPHandler.channelRead error", ex);
} finally {
- ReferenceCountUtil.release(message);
+ ReferenceCountUtil.release(httpRequest);
}
}
@@ -502,7 +515,7 @@ public abstract class AbstractHTTPServer extends AbstractRemotingServer {
}
public void processEventMeshRequest(final ChannelHandlerContext ctx,
- final AsyncContext<HttpCommand> asyncContext) {
+ final AsyncContext<HttpCommand> asyncContext) {
final HttpCommand request = asyncContext.getRequest();
final Pair<HttpRequestProcessor, ThreadPoolExecutor> choosed = processorTable.get(request.getRequestCode());
try {
@@ -644,6 +657,12 @@ public abstract class AbstractHTTPServer extends AbstractRemotingServer {
return httpEventWrapper;
}
+ private void initSharableHandlers() {
+ httpConnectionHandler = new HttpConnectionHandler();
+ httpHandler = new HTTPHandler();
+ }
+
+ @Sharable
private class HttpConnectionHandler extends ChannelDuplexHandler {
public final transient AtomicInteger connections = new AtomicInteger(0);
@@ -658,7 +677,6 @@ public abstract class AbstractHTTPServer extends AbstractRemotingServer {
ctx.close();
return;
}
-
super.channelActive(ctx);
}
@@ -701,14 +719,15 @@ public abstract class AbstractHTTPServer extends AbstractRemotingServer {
if (sslContext != null && useTLS) {
final SSLEngine sslEngine = sslContext.createSSLEngine();
sslEngine.setUseClientMode(false);
- pipeline.addFirst("ssl", new SslHandler(sslEngine));
+ pipeline.addFirst(getWorkerGroup(), "ssl", new SslHandler(sslEngine));
}
- pipeline.addLast(new HttpRequestDecoder(),
+ pipeline.addLast(getWorkerGroup(),
+ new HttpRequestDecoder(),
new HttpResponseEncoder(),
- new HttpConnectionHandler(),
+ httpConnectionHandler,
new HttpObjectAggregator(Integer.MAX_VALUE),
- new HTTPHandler());
+ httpHandler);
}
}
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/AbstractRemotingServer.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/AbstractRemotingServer.java
index 1d84fcb9c..858ed6993 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/AbstractRemotingServer.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/AbstractRemotingServer.java
@@ -18,12 +18,16 @@
package org.apache.eventmesh.runtime.boot;
import org.apache.eventmesh.common.EventMeshThreadFactory;
+import org.apache.eventmesh.common.utils.SystemUtils;
import org.apache.eventmesh.common.utils.ThreadUtils;
import java.util.concurrent.TimeUnit;
import io.netty.channel.EventLoopGroup;
+import io.netty.channel.epoll.Epoll;
+import io.netty.channel.epoll.EpollEventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.util.concurrent.EventExecutorGroup;
import lombok.extern.slf4j.Slf4j;
@@ -37,7 +41,7 @@ public abstract class AbstractRemotingServer {
private EventLoopGroup ioGroup;
- private EventLoopGroup workerGroup;
+ private EventExecutorGroup workerGroup;
private int port;
@@ -51,7 +55,7 @@ public abstract class AbstractRemotingServer {
return ioGroup;
}
- public EventLoopGroup getWorkerGroup() {
+ public EventExecutorGroup getWorkerGroup() {
return workerGroup;
}
@@ -67,7 +71,7 @@ public abstract class AbstractRemotingServer {
this.ioGroup = ioGroup;
}
- public void setWorkerGroup(final EventLoopGroup workerGroup) {
+ public void setWorkerGroup(final EventExecutorGroup workerGroup) {
this.workerGroup = workerGroup;
}
@@ -75,25 +79,31 @@ public abstract class AbstractRemotingServer {
this.port = port;
}
- private EventLoopGroup initBossGroup(final String threadPrefix) {
- bossGroup = new NioEventLoopGroup(1, new EventMeshThreadFactory(threadPrefix + "-boss", true));
- return bossGroup;
+ private void buildBossGroup(final String threadPrefix) {
+ if (useEpoll()) {
+ bossGroup = new EpollEventLoopGroup(1, new EventMeshThreadFactory(threadPrefix + "NettyEpoll-Boss", true));
+ } else {
+ bossGroup = new NioEventLoopGroup(1, new EventMeshThreadFactory(threadPrefix + "NettyNio-Boss", true));
+ }
+
}
- private EventLoopGroup initIOGroup(final String threadPrefix, final int threadNum) {
- ioGroup = new NioEventLoopGroup(threadNum, new EventMeshThreadFactory(threadPrefix + "-io"));
- return ioGroup;
+ private void buildIOGroup(final String threadPrefix, final int threadNum) {
+ if (useEpoll()) {
+ ioGroup = new EpollEventLoopGroup(threadNum, new EventMeshThreadFactory(threadPrefix + "-NettyEpoll-IO"));
+ } else {
+ ioGroup = new NioEventLoopGroup(threadNum, new EventMeshThreadFactory(threadPrefix + "-NettyNio-IO"));
+ }
}
- private EventLoopGroup initWorkerGroup(final String threadPrefix, final int threadNum) {
+ private void buildWorkerGroup(final String threadPrefix, final int threadNum) {
workerGroup = new NioEventLoopGroup(threadNum, new EventMeshThreadFactory(threadPrefix + "-worker"));
- return workerGroup;
}
public void init(final String threadPrefix) throws Exception {
- initBossGroup(threadPrefix);
- initIOGroup(threadPrefix, MAX_THREADS);
- initWorkerGroup(threadPrefix, MAX_THREADS);
+ buildBossGroup(threadPrefix);
+ buildIOGroup(threadPrefix, MAX_THREADS);
+ buildWorkerGroup(threadPrefix, MAX_THREADS);
}
public void shutdown() throws Exception {
@@ -121,5 +131,9 @@ public abstract class AbstractRemotingServer {
}
}
+ protected boolean useEpoll() {
+ return SystemUtils.isLinuxPlatform() && Epoll.isAvailable();
+ }
+
public abstract void start() throws Exception;
}
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshHTTPServer.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshHTTPServer.java
index 338f2d758..11d3d61ea 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshHTTPServer.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshHTTPServer.java
@@ -116,10 +116,9 @@ public class EventMeshHTTPServer extends AbstractHTTPServer {
public transient HTTPClientPool httpClientPool = new HTTPClientPool(10);
- public EventMeshHTTPServer(final EventMeshServer eventMeshServer,
- final EventMeshHTTPConfiguration eventMeshHttpConfiguration) {
- super(eventMeshHttpConfiguration.getHttpServerPort(),
- eventMeshHttpConfiguration.isEventMeshServerUseTls(), eventMeshHttpConfiguration);
+ public EventMeshHTTPServer(final EventMeshServer eventMeshServer, final EventMeshHTTPConfiguration eventMeshHttpConfiguration) {
+
+ super(eventMeshHttpConfiguration.getHttpServerPort(), eventMeshHttpConfiguration.isEventMeshServerUseTls(), eventMeshHttpConfiguration);
this.eventMeshServer = eventMeshServer;
this.eventMeshHttpConfiguration = eventMeshHttpConfiguration;
this.registry = eventMeshServer.getRegistry();
@@ -127,21 +126,6 @@ public class EventMeshHTTPServer extends AbstractHTTPServer {
}
- public EventMeshServer getEventMeshServer() {
- return eventMeshServer;
- }
-
-
- public void shutdownThreadPool() {
- batchMsgExecutor.shutdown();
- adminExecutor.shutdown();
- clientManageExecutor.shutdown();
- sendMsgExecutor.shutdown();
- remoteMsgExecutor.shutdown();
- pushMsgExecutor.shutdown();
- replyMsgExecutor.shutdown();
- }
-
private void initThreadPool() {
batchMsgExecutor = ThreadPoolFactory.createThreadPoolExecutor(
@@ -187,40 +171,28 @@ public class EventMeshHTTPServer extends AbstractHTTPServer {
"eventMesh-replyMsg", true);
}
- public ThreadPoolExecutor getBatchMsgExecutor() {
- return batchMsgExecutor;
- }
-
- public ThreadPoolExecutor getSendMsgExecutor() {
- return sendMsgExecutor;
- }
-
- public ThreadPoolExecutor getReplyMsgExecutor() {
- return replyMsgExecutor;
- }
-
- public ThreadPoolExecutor getPushMsgExecutor() {
- return pushMsgExecutor;
- }
-
- public ThreadPoolExecutor getClientManageExecutor() {
- return clientManageExecutor;
- }
-
- public ThreadPoolExecutor getAdminExecutor() {
- return adminExecutor;
- }
-
- public RateLimiter getMsgRateLimiter() {
- return msgRateLimiter;
- }
-
- public RateLimiter getBatchRateLimiter() {
- return batchRateLimiter;
- }
-
- public Registry getRegistry() {
- return registry;
+ public void shutdownThreadPool() {
+ if (batchMsgExecutor != null) {
+ batchMsgExecutor.shutdown();
+ }
+ if (adminExecutor != null) {
+ adminExecutor.shutdown();
+ }
+ if (clientManageExecutor != null) {
+ clientManageExecutor.shutdown();
+ }
+ if (sendMsgExecutor != null) {
+ sendMsgExecutor.shutdown();
+ }
+ if (remoteMsgExecutor != null) {
+ remoteMsgExecutor.shutdown();
+ }
+ if (pushMsgExecutor != null) {
+ pushMsgExecutor.shutdown();
+ }
+ if (replyMsgExecutor != null) {
+ replyMsgExecutor.shutdown();
+ }
}
private void init() throws Exception {
@@ -269,7 +241,7 @@ public class EventMeshHTTPServer extends AbstractHTTPServer {
registerHTTPRequestProcessor();
this.initWebhook();
if (log.isInfoEnabled()) {
- log.info("--------EventMeshHTTPServer inited------------------");
+ log.info("==================EventMeshHTTPServer initialized==================");
}
}
@@ -285,7 +257,7 @@ public class EventMeshHTTPServer extends AbstractHTTPServer {
this.register();
}
if (log.isInfoEnabled()) {
- log.info("--------EventMeshHTTPServer started------------------");
+ log.info("==================EventMeshHTTPServer started==================");
}
}
@@ -310,7 +282,7 @@ public class EventMeshHTTPServer extends AbstractHTTPServer {
this.unRegister();
}
if (log.isInfoEnabled()) {
- log.info("-------------EventMeshHTTPServer shutdown-------------");
+ log.info("==================EventMeshHTTPServer shutdown==================");
}
}
@@ -441,4 +413,44 @@ public class EventMeshHTTPServer extends AbstractHTTPServer {
public Acl getAcl() {
return acl;
}
+
+ public EventMeshServer getEventMeshServer() {
+ return eventMeshServer;
+ }
+
+ public ThreadPoolExecutor getBatchMsgExecutor() {
+ return batchMsgExecutor;
+ }
+
+ public ThreadPoolExecutor getSendMsgExecutor() {
+ return sendMsgExecutor;
+ }
+
+ public ThreadPoolExecutor getReplyMsgExecutor() {
+ return replyMsgExecutor;
+ }
+
+ public ThreadPoolExecutor getPushMsgExecutor() {
+ return pushMsgExecutor;
+ }
+
+ public ThreadPoolExecutor getClientManageExecutor() {
+ return clientManageExecutor;
+ }
+
+ public ThreadPoolExecutor getAdminExecutor() {
+ return adminExecutor;
+ }
+
+ public RateLimiter getMsgRateLimiter() {
+ return msgRateLimiter;
+ }
+
+ public RateLimiter getBatchRateLimiter() {
+ return batchRateLimiter;
+ }
+
+ public Registry getRegistry() {
+ return registry;
+ }
}
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshHttpBootstrap.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshHttpBootstrap.java
index f4550fe71..95512b7e5 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshHttpBootstrap.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshHttpBootstrap.java
@@ -49,14 +49,15 @@ public class EventMeshHttpBootstrap implements EventMeshBootstrap {
@Override
public void start() throws Exception {
// server start
- if (eventMeshHttpConfiguration != null) {
+ if (eventMeshHttpServer != null) {
eventMeshHttpServer.start();
}
}
@Override
public void shutdown() throws Exception {
- if (eventMeshHttpConfiguration != null) {
+ //server shutdown
+ if (eventMeshHttpServer != null) {
eventMeshHttpServer.shutdown();
}
}
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshServer.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshServer.java
index 7e8273545..b9e885106 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshServer.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshServer.java
@@ -160,7 +160,7 @@ public class EventMeshServer {
}
public void shutdown() throws Exception {
- serviceState = ServiceState.STOPING;
+ serviceState = ServiceState.STOPPING;
if (log.isInfoEnabled()) {
log.info(SERVER_STATE_MSG, serviceState);
}
@@ -184,7 +184,7 @@ public class EventMeshServer {
}
ConfigurationContextUtil.clear();
- serviceState = ServiceState.STOPED;
+ serviceState = ServiceState.STOPPED;
if (log.isInfoEnabled()) {
log.info(SERVER_STATE_MSG, serviceState);
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshTCPServer.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshTCPServer.java
index f9bb8bffb..d7c09d6dd 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshTCPServer.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshTCPServer.java
@@ -58,11 +58,13 @@ import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
+import io.netty.channel.epoll.EpollServerSocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.timeout.IdleStateHandler;
import io.netty.handler.traffic.ChannelTrafficShapingHandler;
import io.netty.handler.traffic.GlobalTrafficShapingHandler;
+
import com.google.common.util.concurrent.RateLimiter;
import lombok.extern.slf4j.Slf4j;
@@ -82,6 +84,8 @@ public class EventMeshTCPServer extends AbstractRemotingServer {
private transient GlobalTrafficShapingHandler globalTrafficShapingHandler;
+ private EventMeshTcpConnectionHandler eventMeshTcpConnectionHandler;
+
private transient ScheduledExecutorService scheduler;
private transient ExecutorService taskHandleExecutorService;
@@ -98,6 +102,7 @@ public class EventMeshTCPServer extends AbstractRemotingServer {
private transient RateLimiter rateLimiter;
+
public void setClientSessionGroupMapping(final ClientSessionGroupMapping clientSessionGroupMapping) {
this.clientSessionGroupMapping = clientSessionGroupMapping;
}
@@ -140,19 +145,18 @@ public class EventMeshTCPServer extends AbstractRemotingServer {
}
private void startServer() {
- Runnable r = () -> {
+ Runnable runnable = () -> {
ServerBootstrap bootstrap = new ServerBootstrap();
ChannelInitializer channelInitializer = new ChannelInitializer() {
@Override
public void initChannel(final Channel ch) throws Exception {
ch.pipeline()
- .addLast(new Codec.Encoder())
- .addLast(new Codec.Decoder())
- .addLast("global-traffic-shaping", globalTrafficShapingHandler)
- .addLast("channel-traffic-shaping", newCTSHandler(eventMeshTCPConfiguration.getCtc().getReadLimit()))
- .addLast(new EventMeshTcpConnectionHandler(EventMeshTCPServer.this))
- .addLast(
- getWorkerGroup(),
+ .addLast(getWorkerGroup(), new Codec.Encoder())
+ .addLast(getWorkerGroup(), new Codec.Decoder())
+ .addLast(getWorkerGroup(), "global-traffic-shaping", globalTrafficShapingHandler)
+ .addLast(getWorkerGroup(), "channel-traffic-shaping", newCTSHandler(eventMeshTCPConfiguration.getCtc().getReadLimit()))
+ .addLast(getWorkerGroup(), eventMeshTcpConnectionHandler)
+ .addLast(getWorkerGroup(),
new IdleStateHandler(
eventMeshTCPConfiguration.getEventMeshTcpIdleReadSeconds(),
eventMeshTCPConfiguration.getEventMeshTcpIdleWriteSeconds(),
@@ -164,7 +168,7 @@ public class EventMeshTCPServer extends AbstractRemotingServer {
};
bootstrap.group(this.getBossGroup(), this.getIoGroup())
- .channel(NioServerSocketChannel.class)
+ .channel(useEpoll() ? EpollServerSocketChannel.class : NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 128)
.option(ChannelOption.SO_REUSEADDR, true)
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 10_000)
@@ -194,8 +198,8 @@ public class EventMeshTCPServer extends AbstractRemotingServer {
}
};
- Thread t = new Thread(r, "eventMesh-tcp-server");
- t.start();
+ Thread thread = new Thread(runnable, "eventMesh-tcp-server");
+ thread.start();
}
public void init() throws Exception {
@@ -208,6 +212,8 @@ public class EventMeshTCPServer extends AbstractRemotingServer {
globalTrafficShapingHandler = newGTSHandler(scheduler, eventMeshTCPConfiguration.getGtc().getReadLimit());
+ eventMeshTcpConnectionHandler = new EventMeshTcpConnectionHandler(this);
+
adminWebHookConfigOperationManage = new AdminWebHookConfigOperationManager();
adminWebHookConfigOperationManage.init();
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/common/ServiceState.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/common/ServiceState.java
index 6848b2c2b..841450d5b 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/common/ServiceState.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/common/ServiceState.java
@@ -23,7 +23,7 @@ public enum ServiceState {
RUNNING,
- STOPING,
+ STOPPING,
- STOPED;
+ STOPPED;
}
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/consumer/SubscriptionManager.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/consumer/SubscriptionManager.java
index 0d1ae29e6..37ac2719e 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/consumer/SubscriptionManager.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/consumer/SubscriptionManager.java
@@ -36,11 +36,9 @@ import lombok.extern.slf4j.Slf4j;
@Slf4j
public class SubscriptionManager {
- private final ConcurrentHashMap<String /**group*/, ConsumerGroupConf> localConsumerGroupMapping =
- new ConcurrentHashMap<>();
+ private final ConcurrentHashMap<String /**group*/, ConsumerGroupConf> localConsumerGroupMapping = new ConcurrentHashMap<>(64);
- private final ConcurrentHashMap<String /**group@topic*/, List<Client>> localClientInfoMapping =
- new ConcurrentHashMap<>();
+ private final ConcurrentHashMap<String /**group@topic*/, List<Client>> localClientInfoMapping = new ConcurrentHashMap<>(64);
public ConcurrentHashMap<String, ConsumerGroupConf> getLocalConsumerGroupMapping() {
return localConsumerGroupMapping;
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/grpc/consumer/EventMeshConsumer.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/grpc/consumer/EventMeshConsumer.java
index 14a92ebbc..1a1f81e7e 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/grpc/consumer/EventMeshConsumer.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/grpc/consumer/EventMeshConsumer.java
@@ -191,7 +191,7 @@ public class EventMeshConsumer {
persistentMqConsumer.shutdown();
broadcastMqConsumer.shutdown();
- serviceState = ServiceState.STOPED;
+ serviceState = ServiceState.STOPPED;
if (log.isInfoEnabled()) {
log.info("EventMeshConsumer [{}] shutdown.........", consumerGroup);
}
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/grpc/producer/EventMeshProducer.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/grpc/producer/EventMeshProducer.java
index 69cc46af6..12bc3844d 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/grpc/producer/EventMeshProducer.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/grpc/producer/EventMeshProducer.java
@@ -87,7 +87,7 @@ public class EventMeshProducer {
}
mqProducerWrapper.shutdown();
- serviceState = ServiceState.STOPED;
+ serviceState = ServiceState.STOPPED;
log.info("EventMeshProducer [{}] shutdown.........", producerGroupConfig.getGroupName());
}
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/EventMeshTcpConnectionHandler.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/EventMeshTcpConnectionHandler.java
index 82718e182..9f8a92e3f 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/EventMeshTcpConnectionHandler.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/EventMeshTcpConnectionHandler.java
@@ -23,6 +23,7 @@ import org.apache.eventmesh.runtime.util.RemotingHelper;
import java.util.concurrent.atomic.AtomicInteger;
import io.netty.channel.ChannelDuplexHandler;
+import io.netty.channel.ChannelHandler.Sharable;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
@@ -31,6 +32,7 @@ import io.netty.handler.timeout.IdleStateEvent;
import lombok.extern.slf4j.Slf4j;
@Slf4j
+@Sharable
public class EventMeshTcpConnectionHandler extends ChannelDuplexHandler {
public static final AtomicInteger connections = new AtomicInteger(0);
@@ -59,11 +61,8 @@ public class EventMeshTcpConnectionHandler extends ChannelDuplexHandler {
final String remoteAddress = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
log.info("client|tcp|channelActive|remoteAddress={}|msg={}", remoteAddress, "");
- int c = connections.incrementAndGet();
- if (c > eventMeshTCPServer.getEventMeshTCPConfiguration().getEventMeshTcpClientMaxNum()) {
- log.warn("client|tcp|channelActive|remoteAddress={}|msg={}", remoteAddress, "too many client connect "
- +
- "this eventMesh server");
+ if (connections.incrementAndGet() > eventMeshTCPServer.getEventMeshTCPConfiguration().getEventMeshTcpClientMaxNum()) {
+ log.warn("client|tcp|channelActive|remoteAddress={}|msg={}", remoteAddress, "too many client connect this eventMesh server");
ctx.close();
return;
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@eventmesh.apache.org
For additional commands, e-mail: commits-help@eventmesh.apache.org