You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eventmesh.apache.org by mi...@apache.org on 2022/12/31 10:23:56 UTC
[incubator-eventmesh] branch master updated: fix issue2707
This is an automated email from the ASF dual-hosted git repository.
mikexue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-eventmesh.git
The following commit(s) were added to refs/heads/master by this push:
new 556a56631 fix issue2707
new d4e863e99 Merge pull request #2752 from jonyangx/issue2707
556a56631 is described below
commit 556a56631932731dd8bdf0481cd0c152baac89a0
Author: jonyangx <ya...@gmail.com>
AuthorDate: Sat Dec 31 14:32:10 2022 +0800
fix issue2707
---
.../eventmesh/runtime/boot/AbstractHTTPServer.java | 97 +++++-----
.../runtime/boot/AbstractRemotingServer.java | 104 ++++++----
.../eventmesh/runtime/boot/EventMeshTCPServer.java | 211 +++++++++++----------
3 files changed, 233 insertions(+), 179 deletions(-)
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/AbstractHTTPServer.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/AbstractHTTPServer.java
index 1001cfbc3..d24afbcb1 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/AbstractHTTPServer.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/AbstractHTTPServer.java
@@ -125,20 +125,20 @@ public abstract class AbstractHTTPServer extends AbstractRemotingServer {
private final EventMeshHTTPConfiguration eventMeshHttpConfiguration;
public ThreadPoolExecutor asyncContextCompleteHandler =
- ThreadPoolFactory.createThreadPoolExecutor(10, 10, "EventMesh-http-asyncContext-");
+ ThreadPoolFactory.createThreadPoolExecutor(10, 10, "EventMesh-http-asyncContext-");
static {
DiskAttribute.deleteOnExitTemporaryFile = false;
}
protected final Map<String/* request code */, Pair<HttpRequestProcessor, ThreadPoolExecutor>>
- processorTable = new HashMap<>(64);
+ processorTable = new HashMap<>(64);
protected final Map<String/* request uri */, Pair<EventProcessor, ThreadPoolExecutor>>
- eventProcessorTable = new HashMap<>(64);
+ eventProcessorTable = new HashMap<>(64);
public AbstractHTTPServer(int port, boolean useTLS, EventMeshHTTPConfiguration eventMeshHttpConfiguration) {
- this.port = port;
+ this.setPort(port);
this.useTLS = useTLS;
this.eventMeshHttpConfiguration = eventMeshHttpConfiguration;
}
@@ -147,7 +147,7 @@ public abstract class AbstractHTTPServer extends AbstractRemotingServer {
FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, status);
HttpHeaders responseHeaders = response.headers();
responseHeaders.add(
- HttpHeaderNames.CONTENT_TYPE, String.format("text/plain; charset=%s", EventMeshConstants.DEFAULT_CHARSET)
+ HttpHeaderNames.CONTENT_TYPE, String.format("text/plain; charset=%s", EventMeshConstants.DEFAULT_CHARSET)
);
responseHeaders.add(HttpHeaderNames.CONTENT_LENGTH, response.content().readableBytes());
responseHeaders.add(HttpHeaderNames.CONNECTION, HttpHeaderValues.KEEP_ALIVE);
@@ -159,7 +159,7 @@ public abstract class AbstractHTTPServer extends AbstractRemotingServer {
ctx.writeAndFlush(response).addListener((ChannelFutureListener) f -> {
if (!f.isSuccess()) {
httpLogger.warn("send response to [{}] fail, will close this channel",
- RemotingHelper.parseChannelRemoteAddr(f.channel()));
+ RemotingHelper.parseChannelRemoteAddr(f.channel()));
f.channel().close();
}
});
@@ -167,17 +167,16 @@ public abstract class AbstractHTTPServer extends AbstractRemotingServer {
@Override
public void start() throws Exception {
- super.start();
Runnable r = () -> {
ServerBootstrap b = new ServerBootstrap();
SSLContext sslContext = useTLS ? SSLContextFactory.getSslContext(eventMeshHttpConfiguration) : null;
- b.group(this.bossGroup, this.workerGroup)
- .channel(NioServerSocketChannel.class)
- .childHandler(new HttpsServerInitializer(sslContext))
- .childOption(ChannelOption.SO_KEEPALIVE, Boolean.TRUE);
+ b.group(this.getBossGroup(), this.getWorkerGroup())
+ .channel(NioServerSocketChannel.class)
+ .childHandler(new HttpsServerInitializer(sslContext))
+ .childOption(ChannelOption.SO_KEEPALIVE, Boolean.TRUE);
try {
- httpServerLogger.info("HTTPServer[port={}] started......", this.port);
- ChannelFuture future = b.bind(this.port).sync();
+ httpServerLogger.info("HTTPServer[port={}] started......", this.getPort());
+ ChannelFuture future = b.bind(this.getPort()).sync();
future.channel().closeFuture().sync();
} catch (Exception e) {
httpServerLogger.error("HTTPServer start Err!", e);
@@ -254,7 +253,7 @@ public abstract class AbstractHTTPServer extends AbstractRemotingServer {
long startTime = System.currentTimeMillis();
HttpHeaders requestHeaders = httpRequest.headers();
requestHeaders.set(ProtocolKey.ClientInstanceKey.IP,
- RemotingHelper.parseChannelRemoteAddr(ctx.channel()));
+ RemotingHelper.parseChannelRemoteAddr(ctx.channel()));
String protocolVersion = httpRequest.headers().get(ProtocolKey.VERSION);
if (StringUtils.isBlank(protocolVersion)) {
@@ -279,7 +278,7 @@ public abstract class AbstractHTTPServer extends AbstractRemotingServer {
getDecoder.parameters().forEach((key, value) -> httpRequestBody.put(key, value.get(0)));
} else if (HttpMethod.POST.equals(httpRequest.method())) {
HttpPostRequestDecoder decoder =
- new HttpPostRequestDecoder(defaultHttpDataFactory, httpRequest);
+ new HttpPostRequestDecoder(defaultHttpDataFactory, httpRequest);
for (InterfaceHttpData parm : decoder.getBodyHttpDatas()) {
if (parm.getHttpDataType() == InterfaceHttpData.HttpDataType.Attribute) {
Attribute data = (Attribute) parm;
@@ -318,7 +317,7 @@ public abstract class AbstractHTTPServer extends AbstractRemotingServer {
sendError(ctx, errorStatus);
span = TraceUtils.prepareServerSpan(headerMap,
- EventMeshTraceConstants.TRACE_UPSTREAM_EVENTMESH_SERVER_SPAN, false);
+ EventMeshTraceConstants.TRACE_UPSTREAM_EVENTMESH_SERVER_SPAN, false);
TraceUtils.finishSpanWithException(span, headerMap, errorStatus.reasonPhrase(), null);
return;
}
@@ -340,7 +339,7 @@ public abstract class AbstractHTTPServer extends AbstractRemotingServer {
HttpEventWrapper httpEventWrapper = parseHttpRequest(httpRequest);
AsyncContext<HttpEventWrapper> asyncContext =
- new AsyncContext<>(httpEventWrapper, null, asyncContextCompleteHandler);
+ new AsyncContext<>(httpEventWrapper, null, asyncContextCompleteHandler);
processHttpRequest(ctx, asyncContext);
} else {
@@ -349,9 +348,9 @@ public abstract class AbstractHTTPServer extends AbstractRemotingServer {
final Map<String, Object> bodyMap = parseHttpRequestBody(httpRequest);
String requestCode =
- (httpRequest.method() == HttpMethod.POST)
- ? httpRequest.headers().get(ProtocolKey.REQUEST_CODE)
- : MapUtils.getString(bodyMap, StringUtils.lowerCase(ProtocolKey.REQUEST_CODE), "");
+ (httpRequest.method() == HttpMethod.POST)
+ ? httpRequest.headers().get(ProtocolKey.REQUEST_CODE)
+ : MapUtils.getString(bodyMap, StringUtils.lowerCase(ProtocolKey.REQUEST_CODE), "");
requestCommand.setHttpMethod(httpRequest.method().name());
requestCommand.setHttpVersion(httpRequest.protocolVersion().protocolName());
@@ -360,16 +359,16 @@ public abstract class AbstractHTTPServer extends AbstractRemotingServer {
HttpCommand responseCommand = null;
if (StringUtils.isBlank(requestCode)
- || !processorTable.containsKey(requestCode)
- || !RequestCode.contains(Integer.valueOf(requestCode))) {
+ || !processorTable.containsKey(requestCode)
+ || !RequestCode.contains(Integer.valueOf(requestCode))) {
responseCommand =
- requestCommand.createHttpCommandResponse(EventMeshRetCode.EVENTMESH_REQUESTCODE_INVALID);
+ requestCommand.createHttpCommandResponse(EventMeshRetCode.EVENTMESH_REQUESTCODE_INVALID);
sendResponse(ctx, responseCommand.httpResponse());
span = TraceUtils.prepareServerSpan(headerMap,
- EventMeshTraceConstants.TRACE_UPSTREAM_EVENTMESH_SERVER_SPAN, false);
+ EventMeshTraceConstants.TRACE_UPSTREAM_EVENTMESH_SERVER_SPAN, false);
TraceUtils.finishSpanWithException(span, headerMap,
- EventMeshRetCode.EVENTMESH_REQUESTCODE_INVALID.getErrMsg(), null);
+ EventMeshRetCode.EVENTMESH_REQUESTCODE_INVALID.getErrMsg(), null);
return;
}
@@ -381,9 +380,9 @@ public abstract class AbstractHTTPServer extends AbstractRemotingServer {
sendResponse(ctx, responseCommand.httpResponse());
span = TraceUtils.prepareServerSpan(headerMap,
- EventMeshTraceConstants.TRACE_UPSTREAM_EVENTMESH_SERVER_SPAN, false);
+ EventMeshTraceConstants.TRACE_UPSTREAM_EVENTMESH_SERVER_SPAN, false);
TraceUtils.finishSpanWithException(span, headerMap,
- EventMeshRetCode.EVENTMESH_RUNTIME_ERR.getErrMsg(), e);
+ EventMeshRetCode.EVENTMESH_RUNTIME_ERR.getErrMsg(), e);
return;
}
@@ -392,7 +391,7 @@ public abstract class AbstractHTTPServer extends AbstractRemotingServer {
}
AsyncContext<HttpCommand> asyncContext =
- new AsyncContext<>(requestCommand, responseCommand, asyncContextCompleteHandler);
+ new AsyncContext<>(requestCommand, responseCommand, asyncContextCompleteHandler);
processEventMeshRequest(ctx, asyncContext);
}
@@ -422,7 +421,7 @@ public abstract class AbstractHTTPServer extends AbstractRemotingServer {
if (processor.rejectRequest()) {
HttpEventWrapper responseWrapper =
- requestWrapper.createHttpResponse(EventMeshRetCode.EVENTMESH_REJECT_BY_PROCESSOR_ERROR);
+ requestWrapper.createHttpResponse(EventMeshRetCode.EVENTMESH_REJECT_BY_PROCESSOR_ERROR);
asyncContext.onComplete(responseWrapper);
if (asyncContext.isComplete()) {
@@ -440,7 +439,7 @@ public abstract class AbstractHTTPServer extends AbstractRemotingServer {
}
metrics.getSummaryMetrics()
- .recordHTTPReqResTimeCost(System.currentTimeMillis() - requestWrapper.getReqTime());
+ .recordHTTPReqResTimeCost(System.currentTimeMillis() - requestWrapper.getReqTime());
if (httpLogger.isDebugEnabled()) {
httpLogger.debug("{}", asyncContext.getResponse());
@@ -456,7 +455,7 @@ public abstract class AbstractHTTPServer extends AbstractRemotingServer {
asyncContext.onComplete(responseWrapper);
metrics.getSummaryMetrics().recordHTTPDiscard();
metrics.getSummaryMetrics().recordHTTPReqResTimeCost(
- System.currentTimeMillis() - requestWrapper.getReqTime());
+ System.currentTimeMillis() - requestWrapper.getReqTime());
try {
sendResponse(ctx, asyncContext.getResponse().httpResponse());
} catch (Exception e) {
@@ -476,7 +475,7 @@ public abstract class AbstractHTTPServer extends AbstractRemotingServer {
HttpRequestProcessor processor = choosed.getObject1();
if (processor.rejectRequest()) {
HttpCommand responseCommand =
- request.createHttpCommandResponse(EventMeshRetCode.EVENTMESH_REJECT_BY_PROCESSOR_ERROR);
+ request.createHttpCommandResponse(EventMeshRetCode.EVENTMESH_REJECT_BY_PROCESSOR_ERROR);
asyncContext.onComplete(responseCommand);
if (asyncContext.isComplete()) {
if (httpLogger.isDebugEnabled()) {
@@ -486,10 +485,10 @@ public abstract class AbstractHTTPServer extends AbstractRemotingServer {
Map<String, Object> traceMap = asyncContext.getRequest().getHeader().toMap();
Span span = TraceUtils.prepareServerSpan(traceMap,
- EventMeshTraceConstants.TRACE_UPSTREAM_EVENTMESH_SERVER_SPAN,
- false);
+ EventMeshTraceConstants.TRACE_UPSTREAM_EVENTMESH_SERVER_SPAN,
+ false);
TraceUtils.finishSpanWithException(span, traceMap,
- EventMeshRetCode.EVENTMESH_REJECT_BY_PROCESSOR_ERROR.getErrMsg(), null);
+ EventMeshRetCode.EVENTMESH_REJECT_BY_PROCESSOR_ERROR.getErrMsg(), null);
}
return;
}
@@ -500,7 +499,7 @@ public abstract class AbstractHTTPServer extends AbstractRemotingServer {
}
metrics.getSummaryMetrics()
- .recordHTTPReqResTimeCost(System.currentTimeMillis() - request.getReqTime());
+ .recordHTTPReqResTimeCost(System.currentTimeMillis() - request.getReqTime());
if (httpLogger.isDebugEnabled()) {
httpLogger.debug("{}", asyncContext.getResponse());
@@ -522,9 +521,9 @@ public abstract class AbstractHTTPServer extends AbstractRemotingServer {
Map<String, Object> traceMap = asyncContext.getRequest().getHeader().toMap();
Span span = TraceUtils.prepareServerSpan(traceMap,
- EventMeshTraceConstants.TRACE_UPSTREAM_EVENTMESH_SERVER_SPAN, false);
+ EventMeshTraceConstants.TRACE_UPSTREAM_EVENTMESH_SERVER_SPAN, false);
TraceUtils.finishSpanWithException(span, traceMap,
- EventMeshRetCode.EVENTMESH_RUNTIME_ERR.getErrMsg(), re);
+ EventMeshRetCode.EVENTMESH_RUNTIME_ERR.getErrMsg(), re);
} catch (Exception e) {
httpServerLogger.error("processEventMeshRequest fail", re);
}
@@ -573,18 +572,18 @@ public abstract class AbstractHTTPServer extends AbstractRemotingServer {
} else if (HttpMethod.POST == fullHttpRequest.method()) {
if (StringUtils.contains(httpRequest.headers().get("Content-Type"),
- ContentType.APPLICATION_JSON.getMimeType())) {
+ ContentType.APPLICATION_JSON.getMimeType())) {
int length = fullHttpRequest.content().readableBytes();
if (length > 0) {
byte[] body = new byte[length];
fullHttpRequest.content().readBytes(body);
bodyMap.putAll(Objects.requireNonNull(JsonUtils.deserialize(new String(body, Constants.DEFAULT_CHARSET),
- new TypeReference<Map<String, Object>>() {
- })));
+ new TypeReference<Map<String, Object>>() {
+ })));
}
} else {
HttpPostRequestDecoder decoder =
- new HttpPostRequestDecoder(defaultHttpDataFactory, httpRequest);
+ new HttpPostRequestDecoder(defaultHttpDataFactory, httpRequest);
for (InterfaceHttpData parm : decoder.getBodyHttpDatas()) {
if (parm.getHttpDataType() == InterfaceHttpData.HttpDataType.Attribute) {
Attribute data = (Attribute) parm;
@@ -625,8 +624,8 @@ public abstract class AbstractHTTPServer extends AbstractRemotingServer {
int c = connections.incrementAndGet();
if (c > 20000) {
httpServerLogger
- .warn("client|http|channelActive|remoteAddress={}|msg={}", remoteAddress,
- "too many client(20000) connect this eventMesh server");
+ .warn("client|http|channelActive|remoteAddress={}|msg={}", remoteAddress,
+ "too many client(20000) connect this eventMesh server");
ctx.close();
return;
}
@@ -647,7 +646,7 @@ public abstract class AbstractHTTPServer extends AbstractRemotingServer {
if (event.state().equals(IdleState.ALL_IDLE)) {
final String remoteAddress = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
httpServerLogger.info("client|http|userEventTriggered|remoteAddress={}|msg={}",
- remoteAddress, evt.getClass().getName());
+ remoteAddress, evt.getClass().getName());
ctx.close();
}
}
@@ -675,10 +674,10 @@ public abstract class AbstractHTTPServer extends AbstractRemotingServer {
pipeline.addFirst("ssl", new SslHandler(sslEngine));
}
pipeline.addLast(new HttpRequestDecoder(),
- new HttpResponseEncoder(),
- new HttpConnectionHandler(),
- new HttpObjectAggregator(Integer.MAX_VALUE),
- new HTTPHandler());
+ new HttpResponseEncoder(),
+ new HttpConnectionHandler(),
+ new HttpObjectAggregator(Integer.MAX_VALUE),
+ new HTTPHandler());
}
}
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/AbstractRemotingServer.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/AbstractRemotingServer.java
index f97f7b202..71742cfa9 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/AbstractRemotingServer.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/AbstractRemotingServer.java
@@ -22,29 +22,66 @@ import org.apache.eventmesh.common.utils.ThreadUtils;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
-import lombok.extern.slf4j.Slf4j;
-
-@Slf4j
public abstract class AbstractRemotingServer {
-
- public EventLoopGroup bossGroup;
+ private static final Logger LOGGER = LoggerFactory.getLogger(AbstractRemotingServer.class);
+
+ private static final int DEFAULT_SLEEP_SECONDS = 30;
+
+ private EventLoopGroup bossGroup;
+
+ private EventLoopGroup ioGroup;
+
+ private EventLoopGroup workerGroup;
+
+ private int port;
+
+ private static final int MAX_THREADS = Runtime.getRuntime().availableProcessors();
+
+ public EventLoopGroup getBossGroup() {
+ return bossGroup;
+ }
+
+ public EventLoopGroup getIoGroup() {
+ return ioGroup;
+ }
+
+ public EventLoopGroup getWorkerGroup() {
+ return workerGroup;
+ }
+
+ public int getPort() {
+ return port;
+ }
- public EventLoopGroup ioGroup;
+ public void setBossGroup(final EventLoopGroup bossGroup) {
+ this.bossGroup = bossGroup;
+ }
- public EventLoopGroup workerGroup;
+ public void setIoGroup(final EventLoopGroup ioGroup) {
+ this.ioGroup = ioGroup;
+ }
- public int port;
+ public void setWorkerGroup(final EventLoopGroup workerGroup) {
+ this.workerGroup = workerGroup;
+ }
- private EventLoopGroup initBossGroup(String threadPrefix) {
+ public void setPort(final int port) {
+ this.port = port;
+ }
+
+ private EventLoopGroup initBossGroup(final String threadPrefix) {
bossGroup = new NioEventLoopGroup(1, new ThreadFactory() {
- final AtomicInteger count = new AtomicInteger(0);
+ private final AtomicInteger count = new AtomicInteger(0);
@Override
- public Thread newThread(Runnable r) {
- Thread t = new Thread(r, threadPrefix + "-boss-" + count.incrementAndGet());
+ public Thread newThread(final Runnable r) {
+ final Thread t = new Thread(r, threadPrefix + "-boss-" + count.incrementAndGet());
t.setDaemon(true);
return t;
}
@@ -53,57 +90,60 @@ public abstract class AbstractRemotingServer {
return bossGroup;
}
- private EventLoopGroup initIOGroup(String threadPrefix) {
- ioGroup = new NioEventLoopGroup(Runtime.getRuntime().availableProcessors(), new ThreadFactory() {
- AtomicInteger count = new AtomicInteger(0);
+ private EventLoopGroup initIOGroup(final String threadPrefix, final int threadNum) {
+ ioGroup = new NioEventLoopGroup(threadNum, new ThreadFactory() {
+ private final AtomicInteger count = new AtomicInteger(0);
@Override
- public Thread newThread(Runnable r) {
+ public Thread newThread(final Runnable r) {
return new Thread(r, threadPrefix + "-io-" + count.incrementAndGet());
}
});
return ioGroup;
}
- private EventLoopGroup initWorkerGroup(String threadPrefix) {
- workerGroup = new NioEventLoopGroup(Runtime.getRuntime().availableProcessors(), new ThreadFactory() {
- AtomicInteger count = new AtomicInteger(0);
+ private EventLoopGroup initWorkerGroup(final String threadPrefix, final int threadNum) {
+ workerGroup = new NioEventLoopGroup(threadNum, new ThreadFactory() {
+ private final AtomicInteger count = new AtomicInteger(0);
@Override
- public Thread newThread(Runnable r) {
- Thread t = new Thread(r, threadPrefix + "-worker-" + count.incrementAndGet());
- return t;
+ public Thread newThread(final Runnable r) {
+ return new Thread(r, threadPrefix + "-worker-" + count.incrementAndGet());
}
});
return workerGroup;
}
- public void init(String threadPrefix) throws Exception {
+ public void init(final String threadPrefix) throws Exception {
initBossGroup(threadPrefix);
- initIOGroup(threadPrefix);
- initWorkerGroup(threadPrefix);
+ initIOGroup(threadPrefix, MAX_THREADS);
+ initWorkerGroup(threadPrefix, MAX_THREADS);
}
public void shutdown() throws Exception {
if (bossGroup != null) {
bossGroup.shutdownGracefully();
- log.info("shutdown bossGroup");
+ if (LOGGER.isInfoEnabled()) {
+ LOGGER.info("shutdown bossGroup");
+ }
}
- ThreadUtils.randomSleep(30);
+ ThreadUtils.randomSleep(DEFAULT_SLEEP_SECONDS);
if (ioGroup != null) {
ioGroup.shutdownGracefully();
- log.info("shutdown ioGroup");
+ if (LOGGER.isInfoEnabled()) {
+ LOGGER.info("shutdown ioGroup");
+ }
}
if (workerGroup != null) {
workerGroup.shutdownGracefully();
- log.info("shutdown workerGroup");
+ if (LOGGER.isInfoEnabled()) {
+ LOGGER.info("shutdown workerGroup");
+ }
}
}
- public void start() throws Exception {
-
- }
+ public abstract void start() throws Exception;
}
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshTCPServer.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshTCPServer.java
index fcc8e7485..01c64f579 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshTCPServer.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshTCPServer.java
@@ -48,6 +48,8 @@ import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import org.assertj.core.util.Lists;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.PooledByteBufAllocator;
@@ -56,6 +58,7 @@ import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
+import io.netty.channel.EventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.timeout.IdleStateHandler;
import io.netty.handler.traffic.ChannelTrafficShapingHandler;
@@ -63,36 +66,36 @@ import io.netty.handler.traffic.GlobalTrafficShapingHandler;
import com.google.common.util.concurrent.RateLimiter;
-import lombok.extern.slf4j.Slf4j;
-@Slf4j
public class EventMeshTCPServer extends AbstractRemotingServer {
+ private static final Logger LOGGER = LoggerFactory.getLogger(EventMeshTCPServer.class);
private ClientSessionGroupMapping clientSessionGroupMapping;
- private EventMeshTcpRetryer eventMeshTcpRetryer;
+ private transient EventMeshTcpRetryer eventMeshTcpRetryer;
- private EventMeshTcpMonitor eventMeshTcpMonitor;
+ private transient EventMeshTcpMonitor eventMeshTcpMonitor;
- private ClientManageController clientManageController;
+ private transient ClientManageController clientManageController;
- private EventMeshServer eventMeshServer;
+ private final transient EventMeshServer eventMeshServer;
- private EventMeshTCPConfiguration eventMeshTCPConfiguration;
+ private final transient EventMeshTCPConfiguration eventMeshTCPConfiguration;
- private GlobalTrafficShapingHandler globalTrafficShapingHandler;
+ private transient GlobalTrafficShapingHandler globalTrafficShapingHandler;
- private ScheduledExecutorService scheduler;
+ private transient ScheduledExecutorService scheduler;
- private ExecutorService taskHandleExecutorService;
+ private transient ExecutorService taskHandleExecutorService;
- private ExecutorService broadcastMsgDownstreamExecutorService;
+ private transient ExecutorService broadcastMsgDownstreamExecutorService;
- private Registry registry;
+ private final transient Registry registry;
- private EventMeshRebalanceService eventMeshRebalanceService;
+ private transient EventMeshRebalanceService eventMeshRebalanceService;
+ private transient RateLimiter rateLimiter;
- public void setClientSessionGroupMapping(ClientSessionGroupMapping clientSessionGroupMapping) {
+ public void setClientSessionGroupMapping(final ClientSessionGroupMapping clientSessionGroupMapping) {
this.clientSessionGroupMapping = clientSessionGroupMapping;
}
@@ -100,7 +103,7 @@ public class EventMeshTCPServer extends AbstractRemotingServer {
return clientManageController;
}
- public void setClientManageController(ClientManageController clientManageController) {
+ public void setClientManageController(final ClientManageController clientManageController) {
this.clientManageController = clientManageController;
}
@@ -108,7 +111,7 @@ public class EventMeshTCPServer extends AbstractRemotingServer {
return scheduler;
}
- public void setScheduler(ScheduledExecutorService scheduler) {
+ public void setScheduler(final ScheduledExecutorService scheduler) {
this.scheduler = scheduler;
}
@@ -120,7 +123,7 @@ public class EventMeshTCPServer extends AbstractRemotingServer {
return broadcastMsgDownstreamExecutorService;
}
- public void setTaskHandleExecutorService(ExecutorService taskHandleExecutorService) {
+ public void setTaskHandleExecutorService(final ExecutorService taskHandleExecutorService) {
this.taskHandleExecutorService = taskHandleExecutorService;
}
@@ -128,71 +131,75 @@ public class EventMeshTCPServer extends AbstractRemotingServer {
return rateLimiter;
}
- public void setRateLimiter(RateLimiter rateLimiter) {
+ public void setRateLimiter(final RateLimiter rateLimiter) {
this.rateLimiter = rateLimiter;
}
- private RateLimiter rateLimiter;
- public EventMeshTCPServer(EventMeshServer eventMeshServer,
- EventMeshTCPConfiguration eventMeshTCPConfiguration, Registry registry) {
+ public EventMeshTCPServer(final EventMeshServer eventMeshServer,
+ final EventMeshTCPConfiguration eventMeshTCPConfiguration, final Registry registry) {
super();
this.eventMeshServer = eventMeshServer;
this.eventMeshTCPConfiguration = eventMeshTCPConfiguration;
this.registry = registry;
}
+ @Override
+ public EventLoopGroup getWorkerGroup() {
+ return super.getWorkerGroup(); // delegate to AbstractRemotingServer; calling this.getWorkerGroup() here would recurse until StackOverflowError
+ }
+
private void startServer() {
Runnable r = () -> {
ServerBootstrap bootstrap = new ServerBootstrap();
ChannelInitializer channelInitializer = new ChannelInitializer() {
@Override
- public void initChannel(Channel ch) throws Exception {
+ public void initChannel(final Channel ch) throws Exception {
ch.pipeline()
- .addLast(new Codec.Encoder())
- .addLast(new Codec.Decoder())
- .addLast("global-traffic-shaping", globalTrafficShapingHandler)
- .addLast("channel-traffic-shaping", newCTSHandler())
- .addLast(new EventMeshTcpConnectionHandler(EventMeshTCPServer.this))
- .addLast(
- workerGroup,
- new IdleStateHandler(
- eventMeshTCPConfiguration.eventMeshTcpIdleReadSeconds,
- eventMeshTCPConfiguration.eventMeshTcpIdleWriteSeconds,
- eventMeshTCPConfiguration.eventMeshTcpIdleAllSeconds),
- new EventMeshTcpMessageDispatcher(EventMeshTCPServer.this),
- new EventMeshTcpExceptionHandler(EventMeshTCPServer.this)
- );
+ .addLast(new Codec.Encoder())
+ .addLast(new Codec.Decoder())
+ .addLast("global-traffic-shaping", globalTrafficShapingHandler)
+ .addLast("channel-traffic-shaping", newCTSHandler(eventMeshTCPConfiguration.getCtc().getReadLimit()))
+ .addLast(new EventMeshTcpConnectionHandler(EventMeshTCPServer.this))
+ .addLast(
+ getWorkerGroup(),
+ new IdleStateHandler(
+ eventMeshTCPConfiguration.eventMeshTcpIdleReadSeconds,
+ eventMeshTCPConfiguration.eventMeshTcpIdleWriteSeconds,
+ eventMeshTCPConfiguration.eventMeshTcpIdleAllSeconds),
+ new EventMeshTcpMessageDispatcher(EventMeshTCPServer.this),
+ new EventMeshTcpExceptionHandler(EventMeshTCPServer.this)
+ );
}
};
- bootstrap.group(bossGroup, ioGroup)
- .channel(NioServerSocketChannel.class)
- .option(ChannelOption.SO_BACKLOG, 128)
- .option(ChannelOption.SO_REUSEADDR, true)
- .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 10000)
- .childOption(ChannelOption.SO_KEEPALIVE, false)
- .childOption(ChannelOption.SO_LINGER, 0)
- .childOption(ChannelOption.SO_TIMEOUT, 600000)
- .childOption(ChannelOption.TCP_NODELAY, true)
- .childOption(ChannelOption.SO_SNDBUF, 65535 * 4)
- .childOption(ChannelOption.SO_RCVBUF, 65535 * 4)
- .option(ChannelOption.RCVBUF_ALLOCATOR, new AdaptiveRecvByteBufAllocator(2048, 4096, 65536))
- .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
- .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
- .childHandler(channelInitializer);
+ bootstrap.group(this.getBossGroup(), this.getIoGroup())
+ .channel(NioServerSocketChannel.class)
+ .option(ChannelOption.SO_BACKLOG, 128)
+ .option(ChannelOption.SO_REUSEADDR, true)
+ .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 10_000)
+ .childOption(ChannelOption.SO_KEEPALIVE, false)
+ .childOption(ChannelOption.SO_LINGER, 0)
+ .childOption(ChannelOption.SO_TIMEOUT, 600_000)
+ .childOption(ChannelOption.TCP_NODELAY, true)
+ .childOption(ChannelOption.SO_SNDBUF, 65_535 * 4)
+ .childOption(ChannelOption.SO_RCVBUF, 65_535 * 4)
+ .option(ChannelOption.RCVBUF_ALLOCATOR, new AdaptiveRecvByteBufAllocator(2_048, 4_096, 65_536))
+ .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
+ .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
+ .childHandler(channelInitializer);
try {
int port = eventMeshTCPConfiguration.eventMeshTcpServerPort;
ChannelFuture f = bootstrap.bind(port).sync();
- log.info("EventMeshTCPServer[port={}] started.....", port);
+ LOGGER.info("EventMeshTCPServer[port={}] started.....", port);
f.channel().closeFuture().sync();
} catch (Exception e) {
- log.error("EventMeshTCPServer RemotingServer Start Err!", e);
+ LOGGER.error("EventMeshTCPServer RemotingServer Start Err!", e);
try {
shutdown();
- } catch (Exception e1) {
- log.error("EventMeshTCPServer RemotingServer shutdown Err!", e);
+ } catch (Exception ex) {
+ LOGGER.error("EventMeshTCPServer RemotingServer shutdown Err!", ex);
}
}
};
@@ -202,18 +209,20 @@ public class EventMeshTCPServer extends AbstractRemotingServer {
}
public void init() throws Exception {
- log.info("==================EventMeshTCPServer Initialing==================");
+ if (LOGGER.isInfoEnabled()) {
+ LOGGER.info("==================EventMeshTCPServer Initialing==================");
+ }
initThreadPool();
rateLimiter = RateLimiter.create(eventMeshTCPConfiguration.eventMeshTcpMsgReqnumPerSecond);
- globalTrafficShapingHandler = newGTSHandler();
+ globalTrafficShapingHandler = newGTSHandler(scheduler, eventMeshTCPConfiguration.getGtc().getReadLimit());
+
-
AdminWebHookConfigOperationManage adminWebHookConfigOperationManage = new AdminWebHookConfigOperationManage();
adminWebHookConfigOperationManage.setConfigurationWrapper(eventMeshTCPConfiguration.getConfigurationWrapper());
adminWebHookConfigOperationManage.init();
-
+
clientManageController = new ClientManageController(this);
clientManageController.setAdminWebHookConfigOperationManage(adminWebHookConfigOperationManage);
@@ -226,18 +235,21 @@ public class EventMeshTCPServer extends AbstractRemotingServer {
// The MetricsRegistry is singleton, so we can use factory method to get.
final List<MetricsRegistry> metricsRegistries = Lists.newArrayList();
Optional.ofNullable(eventMeshTCPConfiguration.getEventMeshMetricsPluginType())
- .ifPresent(
- metricsPlugins -> metricsPlugins.forEach(
- pluginType -> metricsRegistries.add(MetricsPluginFactory.getMetricsRegistry(pluginType))));
+ .ifPresent(
+ metricsPlugins -> metricsPlugins.forEach(
+ pluginType -> metricsRegistries.add(MetricsPluginFactory.getMetricsRegistry(pluginType))));
eventMeshTcpMonitor = new EventMeshTcpMonitor(this, metricsRegistries);
eventMeshTcpMonitor.init();
if (eventMeshTCPConfiguration.isEventMeshServerRegistryEnable()) {
eventMeshRebalanceService = new EventMeshRebalanceService(this,
- new EventmeshRebalanceImpl(this));
+ new EventmeshRebalanceImpl(this));
eventMeshRebalanceService.init();
}
- log.info("--------------------------EventMeshTCPServer Inited");
+
+ if (LOGGER.isInfoEnabled()) {
+ LOGGER.info("--------------------------EventMeshTCPServer Inited");
+ }
}
@Override
@@ -257,14 +269,16 @@ public class EventMeshTCPServer extends AbstractRemotingServer {
eventMeshRebalanceService.start();
}
- log.info("--------------------------EventMeshTCPServer Started");
+ if (LOGGER.isInfoEnabled()) {
+ LOGGER.info("--------------------------EventMeshTCPServer Started");
+ }
}
@Override
public void shutdown() throws Exception {
- if (bossGroup != null) {
- bossGroup.shutdownGracefully();
- log.info("shutdown bossGroup, no client is allowed to connect access server");
+ if (this.getBossGroup() != null) {
+ this.getBossGroup().shutdownGracefully();
+ LOGGER.info("shutdown bossGroup, no client is allowed to connect access server");
}
if (eventMeshTCPConfiguration.isEventMeshServerRegistryEnable()) {
@@ -277,18 +291,18 @@ public class EventMeshTCPServer extends AbstractRemotingServer {
try {
Thread.sleep(40 * 1000);
} catch (InterruptedException e) {
- log.error("interruptedException occurred while sleeping", e);
+ LOGGER.error("interruptedException occurred while sleeping", e);
}
globalTrafficShapingHandler.release();
- if (ioGroup != null) {
- ioGroup.shutdownGracefully();
- log.info("shutdown ioGroup");
+ if (this.getIoGroup() != null) {
+ this.getIoGroup().shutdownGracefully();
+ LOGGER.info("shutdown ioGroup");
}
- if (workerGroup != null) {
- workerGroup.shutdownGracefully();
- log.info("shutdown workerGroup");
+ if (this.getWorkerGroup() != null) {
+ this.getWorkerGroup().shutdownGracefully();
+ LOGGER.info("shutdown workerGroup");
}
eventMeshTcpRetryer.shutdown();
@@ -296,14 +310,16 @@ public class EventMeshTCPServer extends AbstractRemotingServer {
eventMeshTcpMonitor.shutdown();
shutdownThreadPool();
- log.info("--------------------------EventMeshTCPServer Shutdown");
+ if (LOGGER.isInfoEnabled()) {
+ LOGGER.info("--------------------------EventMeshTCPServer Shutdown");
+ }
}
public boolean register() {
boolean registerResult = false;
try {
String endPoints = IPUtils.getLocalAddress()
- + EventMeshConstants.IP_PORT_SEPARATOR + eventMeshTCPConfiguration.eventMeshTcpServerPort;
+ + EventMeshConstants.IP_PORT_SEPARATOR + eventMeshTCPConfiguration.eventMeshTcpServerPort;
EventMeshRegisterInfo eventMeshRegisterInfo = new EventMeshRegisterInfo();
eventMeshRegisterInfo.setEventMeshClusterName(eventMeshTCPConfiguration.getEventMeshCluster());
eventMeshRegisterInfo.setEventMeshName(eventMeshTCPConfiguration.getEventMeshName() + "-"
@@ -313,7 +329,7 @@ public class EventMeshTCPServer extends AbstractRemotingServer {
eventMeshRegisterInfo.setProtocolType(ConfigurationContextUtil.TCP);
registerResult = registry.register(eventMeshRegisterInfo);
} catch (Exception e) {
- log.warn("eventMesh register to registry failed", e);
+ LOGGER.error("eventMesh register to registry failed", e);
}
return registerResult;
@@ -321,7 +337,7 @@ public class EventMeshTCPServer extends AbstractRemotingServer {
private void unRegister() throws Exception {
String endPoints = IPUtils.getLocalAddress()
- + EventMeshConstants.IP_PORT_SEPARATOR + eventMeshTCPConfiguration.eventMeshTcpServerPort;
+ + EventMeshConstants.IP_PORT_SEPARATOR + eventMeshTCPConfiguration.eventMeshTcpServerPort;
EventMeshUnRegisterInfo eventMeshUnRegisterInfo = new EventMeshUnRegisterInfo();
eventMeshUnRegisterInfo.setEventMeshClusterName(eventMeshTCPConfiguration.getEventMeshCluster());
eventMeshUnRegisterInfo.setEventMeshName(eventMeshTCPConfiguration.getEventMeshName());
@@ -337,19 +353,19 @@ public class EventMeshTCPServer extends AbstractRemotingServer {
super.init("eventMesh-tcp");
scheduler = ThreadPoolFactory.createScheduledExecutor(eventMeshTCPConfiguration.eventMeshTcpGlobalScheduler,
- new EventMeshThreadFactoryImpl("eventMesh-tcp-scheduler", true));
+ new EventMeshThreadFactoryImpl("eventMesh-tcp-scheduler", true));
taskHandleExecutorService = ThreadPoolFactory.createThreadPoolExecutor(
- eventMeshTCPConfiguration.eventMeshTcpTaskHandleExecutorPoolSize,
- eventMeshTCPConfiguration.eventMeshTcpTaskHandleExecutorPoolSize,
- new LinkedBlockingQueue<>(10000),
- new EventMeshThreadFactoryImpl("eventMesh-tcp-task-handle", true));
+ eventMeshTCPConfiguration.eventMeshTcpTaskHandleExecutorPoolSize,
+ eventMeshTCPConfiguration.eventMeshTcpTaskHandleExecutorPoolSize,
+ new LinkedBlockingQueue<>(10_000),
+ new EventMeshThreadFactoryImpl("eventMesh-tcp-task-handle", true));
broadcastMsgDownstreamExecutorService = ThreadPoolFactory.createThreadPoolExecutor(
- eventMeshTCPConfiguration.eventMeshTcpMsgDownStreamExecutorPoolSize,
- eventMeshTCPConfiguration.eventMeshTcpMsgDownStreamExecutorPoolSize,
- new LinkedBlockingQueue<>(10000),
- new EventMeshThreadFactoryImpl("eventMesh-tcp-msg-downstream", true));
+ eventMeshTCPConfiguration.eventMeshTcpMsgDownStreamExecutorPoolSize,
+ eventMeshTCPConfiguration.eventMeshTcpMsgDownStreamExecutorPoolSize,
+ new LinkedBlockingQueue<>(10_000),
+ new EventMeshThreadFactoryImpl("eventMesh-tcp-msg-downstream", true));
}
private void shutdownThreadPool() {
@@ -357,27 +373,26 @@ public class EventMeshTCPServer extends AbstractRemotingServer {
taskHandleExecutorService.shutdown();
}
- private GlobalTrafficShapingHandler newGTSHandler() {
- GlobalTrafficShapingHandler handler = new GlobalTrafficShapingHandler(scheduler, 0,
- eventMeshTCPConfiguration.getGtc().getReadLimit()) {
+ private GlobalTrafficShapingHandler newGTSHandler(final ScheduledExecutorService executor, final long readLimit) {
+ GlobalTrafficShapingHandler handler = new GlobalTrafficShapingHandler(executor, 0,
+ readLimit) {
@Override
- protected long calculateSize(Object msg) {
+ protected long calculateSize(final Object msg) {
return 1;
}
};
- handler.setMaxTimeWait(1000);
+ handler.setMaxTimeWait(1_000);
return handler;
}
- private ChannelTrafficShapingHandler newCTSHandler() {
- ChannelTrafficShapingHandler handler = new ChannelTrafficShapingHandler(0,
- eventMeshTCPConfiguration.getCtc().getReadLimit()) {
+ private ChannelTrafficShapingHandler newCTSHandler(final long readLimit) {
+ ChannelTrafficShapingHandler handler = new ChannelTrafficShapingHandler(0, readLimit) {
@Override
- protected long calculateSize(Object msg) {
+ protected long calculateSize(final Object msg) {
return 1;
}
};
- handler.setMaxTimeWait(3000);
+ handler.setMaxTimeWait(3_000);
return handler;
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@eventmesh.apache.org
For additional commands, e-mail: commits-help@eventmesh.apache.org