You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eventmesh.apache.org by mi...@apache.org on 2022/12/04 13:37:15 UTC
[incubator-eventmesh] branch master updated: fix issue2369
This is an automated email from the ASF dual-hosted git repository.
mikexue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-eventmesh.git
The following commit(s) were added to refs/heads/master by this push:
new 560f156f9 fix issue2369
new f8f8f5aac Merge pull request #2450 from jonyangx/issue2369
560f156f9 is described below
commit 560f156f9750d78e34f229c0743d83ced4416bba
Author: jonyangx <ya...@gmail.com>
AuthorDate: Sun Dec 4 09:58:18 2022 +0800
fix issue2369
---
.../tcp/client/EventMeshTcpMessageDispatcher.java | 69 +++++++++++++---------
1 file changed, 40 insertions(+), 29 deletions(-)
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/EventMeshTcpMessageDispatcher.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/EventMeshTcpMessageDispatcher.java
index 0623d282c..006739b0c 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/EventMeshTcpMessageDispatcher.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/EventMeshTcpMessageDispatcher.java
@@ -50,8 +50,8 @@ import io.opentelemetry.api.trace.Span;
public class EventMeshTcpMessageDispatcher extends SimpleChannelInboundHandler<Package> {
- private final Logger logger = LoggerFactory.getLogger(this.getClass());
- private final Logger messageLogger = LoggerFactory.getLogger("message");
+ private static final Logger LOGGER = LoggerFactory.getLogger(EventMeshTcpMessageDispatcher.class);
+ private static final Logger MESSAGE_LOGGER = LoggerFactory.getLogger("message");
private EventMeshTCPServer eventMeshTCPServer;
public EventMeshTcpMessageDispatcher(EventMeshTCPServer eventMeshTCPServer) {
@@ -64,7 +64,7 @@ public class EventMeshTcpMessageDispatcher extends SimpleChannelInboundHandler<P
validateMsg(pkg);
eventMeshTCPServer.getEventMeshTcpMonitor().getTcpSummaryMetrics()
- .getClient2eventMeshMsgNum().incrementAndGet();
+ .getClient2eventMeshMsgNum().incrementAndGet();
Command cmd = pkg.getHeader().getCmd();
try {
@@ -72,9 +72,9 @@ public class EventMeshTcpMessageDispatcher extends SimpleChannelInboundHandler<P
if (isNeedTrace(cmd)) {
pkg.getHeader().getProperties()
- .put(EventMeshConstants.REQ_C2EVENTMESH_TIMESTAMP, startTime);
+ .put(EventMeshConstants.REQ_C2EVENTMESH_TIMESTAMP, startTime);
pkg.getHeader().getProperties().put(EventMeshConstants.REQ_SEND_EVENTMESH_IP,
- eventMeshTCPServer.getEventMeshTCPConfiguration().eventMeshServerIp);
+ eventMeshTCPServer.getEventMeshTCPConfiguration().eventMeshServerIp);
Session session = eventMeshTCPServer.getClientSessionGroupMapping().getSession(ctx);
pkg.getHeader().getProperties().put(EventMeshConstants.REQ_SYS, session.getClient().getSubsystem());
@@ -83,42 +83,49 @@ public class EventMeshTcpMessageDispatcher extends SimpleChannelInboundHandler<P
pkg.getHeader().getProperties().put(EventMeshConstants.REQ_GROUP, session.getClient().getGroup());
}
- if (cmd.equals(Command.RECOMMEND_REQUEST)) {
- messageLogger.info("pkg|c2eventMesh|cmd={}|pkg={}", cmd, pkg);
+ if (Command.RECOMMEND_REQUEST == cmd) {
+ if (MESSAGE_LOGGER.isInfoEnabled()) {
+ MESSAGE_LOGGER.info("pkg|c2eventMesh|cmd={}|pkg={}", cmd, pkg);
+ }
task = new RecommendTask(pkg, ctx, startTime, eventMeshTCPServer);
eventMeshTCPServer.getTaskHandleExecutorService().submit(task);
return;
}
- if (cmd.equals(Command.HELLO_REQUEST)) {
- messageLogger.info("pkg|c2eventMesh|cmd={}|pkg={}", cmd, pkg);
+
+ if (Command.HELLO_REQUEST == cmd) {
+ if (MESSAGE_LOGGER.isInfoEnabled()) {
+ MESSAGE_LOGGER.info("pkg|c2eventMesh|cmd={}|pkg={}", cmd, pkg);
+ }
task = new HelloTask(pkg, ctx, startTime, eventMeshTCPServer);
eventMeshTCPServer.getTaskHandleExecutorService().submit(task);
return;
}
if (eventMeshTCPServer.getClientSessionGroupMapping().getSession(ctx) == null) {
- messageLogger.info("pkg|c2eventMesh|cmd={}|pkg={},no session is found", cmd, pkg);
+ if (MESSAGE_LOGGER.isInfoEnabled()) {
+ MESSAGE_LOGGER.info("pkg|c2eventMesh|cmd={}|pkg={},no session is found", cmd, pkg);
+ }
throw new Exception("no session is found");
}
logMessageFlow(ctx, pkg, cmd);
if (eventMeshTCPServer.getClientSessionGroupMapping().getSession(ctx)
- .getSessionState() == SessionState.CLOSED) {
+ .getSessionState() == SessionState.CLOSED) {
throw new Exception(
- "this eventMesh tcp session will be closed, may be reboot or version change!");
+ "this eventMesh tcp session will be closed, may be reboot or version change!");
}
dispatch(ctx, pkg, startTime, cmd);
} catch (Exception e) {
- logger.error("exception occurred while pkg|cmd={}|pkg={}", cmd, pkg, e);
+ LOGGER.error("exception occurred while pkg|cmd={}|pkg={}", cmd, pkg, e);
if (isNeedTrace(cmd)) {
Span span = TraceUtils.prepareServerSpan(pkg.getHeader().getProperties(),
- EventMeshTraceConstants.TRACE_UPSTREAM_EVENTMESH_SERVER_SPAN, startTime,
- TimeUnit.MILLISECONDS, false);
+ EventMeshTraceConstants.TRACE_UPSTREAM_EVENTMESH_SERVER_SPAN, startTime,
+ TimeUnit.MILLISECONDS, false);
TraceUtils.finishSpanWithException(span, pkg.getHeader().getProperties(),
- "exception occurred while dispatch pkg", e);
+ "exception occurred while dispatch pkg", e);
}
writeToClient(cmd, pkg, ctx, e);
@@ -127,9 +134,9 @@ public class EventMeshTcpMessageDispatcher extends SimpleChannelInboundHandler<P
private boolean isNeedTrace(Command cmd) {
if (eventMeshTCPServer.getEventMeshTCPConfiguration().eventMeshServerTraceEnable
- && cmd != null && (Command.REQUEST_TO_SERVER == cmd
- || Command.ASYNC_MESSAGE_TO_SERVER == cmd
- || Command.BROADCAST_MESSAGE_TO_SERVER == cmd)) {
+ && cmd != null && (Command.REQUEST_TO_SERVER == cmd
+ || Command.ASYNC_MESSAGE_TO_SERVER == cmd
+ || Command.BROADCAST_MESSAGE_TO_SERVER == cmd)) {
return true;
}
return false;
@@ -139,10 +146,10 @@ public class EventMeshTcpMessageDispatcher extends SimpleChannelInboundHandler<P
try {
Package res = new Package();
res.setHeader(new Header(getReplyCommand(cmd), OPStatus.FAIL.getCode(), e.toString(),
- pkg.getHeader().getSeq()));
+ pkg.getHeader().getSeq()));
ctx.writeAndFlush(res);
} catch (Exception ex) {
- logger.warn("writeToClient failed", ex);
+ LOGGER.warn("writeToClient failed", ex);
}
}
@@ -175,12 +182,16 @@ public class EventMeshTcpMessageDispatcher extends SimpleChannelInboundHandler<P
private void logMessageFlow(ChannelHandlerContext ctx, Package pkg, Command cmd) {
if (pkg.getBody() instanceof EventMeshMessage) {
- messageLogger.info("pkg|c2eventMesh|cmd={}|Msg={}|user={}", cmd,
- EventMeshUtil.printMqMessage((EventMeshMessage) pkg.getBody()),
- eventMeshTCPServer.getClientSessionGroupMapping().getSession(ctx).getClient());
+ if (MESSAGE_LOGGER.isInfoEnabled()) {
+ MESSAGE_LOGGER.info("pkg|c2eventMesh|cmd={}|Msg={}|user={}", cmd,
+ EventMeshUtil.printMqMessage((EventMeshMessage) pkg.getBody()),
+ eventMeshTCPServer.getClientSessionGroupMapping().getSession(ctx).getClient());
+ }
} else {
- messageLogger.info("pkg|c2eventMesh|cmd={}|pkg={}|user={}", cmd, pkg,
- eventMeshTCPServer.getClientSessionGroupMapping().getSession(ctx).getClient());
+ if (MESSAGE_LOGGER.isInfoEnabled()) {
+ MESSAGE_LOGGER.info("pkg|c2eventMesh|cmd={}|pkg={}|user={}", cmd, pkg,
+ eventMeshTCPServer.getClientSessionGroupMapping().getSession(ctx).getClient());
+ }
}
}
@@ -189,17 +200,17 @@ public class EventMeshTcpMessageDispatcher extends SimpleChannelInboundHandler<P
throw new Exception("the incoming message is empty.");
}
if (pkg.getHeader() == null) {
- logger.error("the incoming message does not have a header|pkg={}", pkg);
+ LOGGER.error("the incoming message does not have a header|pkg={}", pkg);
throw new Exception("the incoming message does not have a header.");
}
if (pkg.getHeader().getCmd() == null) {
- logger.error("the incoming message does not have a command type|pkg={}", pkg);
+ LOGGER.error("the incoming message does not have a command type|pkg={}", pkg);
throw new Exception("the incoming message does not have a command type.");
}
}
private void dispatch(ChannelHandlerContext ctx, Package pkg, long startTime, Command cmd)
- throws Exception {
+ throws Exception {
Runnable task;
switch (cmd) {
case HEARTBEAT_REQUEST:
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@eventmesh.apache.org
For additional commands, e-mail: commits-help@eventmesh.apache.org