You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eventmesh.apache.org by mi...@apache.org on 2022/12/04 13:37:15 UTC

[incubator-eventmesh] branch master updated: fix issue2369

This is an automated email from the ASF dual-hosted git repository.

mikexue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-eventmesh.git


The following commit(s) were added to refs/heads/master by this push:
     new 560f156f9 fix issue2369
     new f8f8f5aac Merge pull request #2450 from jonyangx/issue2369
560f156f9 is described below

commit 560f156f9750d78e34f229c0743d83ced4416bba
Author: jonyangx <ya...@gmail.com>
AuthorDate: Sun Dec 4 09:58:18 2022 +0800

    fix issue2369
---
 .../tcp/client/EventMeshTcpMessageDispatcher.java  | 69 +++++++++++++---------
 1 file changed, 40 insertions(+), 29 deletions(-)

diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/EventMeshTcpMessageDispatcher.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/EventMeshTcpMessageDispatcher.java
index 0623d282c..006739b0c 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/EventMeshTcpMessageDispatcher.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/EventMeshTcpMessageDispatcher.java
@@ -50,8 +50,8 @@ import io.opentelemetry.api.trace.Span;
 
 public class EventMeshTcpMessageDispatcher extends SimpleChannelInboundHandler<Package> {
 
-    private final Logger logger = LoggerFactory.getLogger(this.getClass());
-    private final Logger messageLogger = LoggerFactory.getLogger("message");
+    private static final Logger LOGGER = LoggerFactory.getLogger(EventMeshTcpMessageDispatcher.class);
+    private static final Logger MESSAGE_LOGGER = LoggerFactory.getLogger("message");
     private EventMeshTCPServer eventMeshTCPServer;
 
     public EventMeshTcpMessageDispatcher(EventMeshTCPServer eventMeshTCPServer) {
@@ -64,7 +64,7 @@ public class EventMeshTcpMessageDispatcher extends SimpleChannelInboundHandler<P
         validateMsg(pkg);
 
         eventMeshTCPServer.getEventMeshTcpMonitor().getTcpSummaryMetrics()
-            .getClient2eventMeshMsgNum().incrementAndGet();
+                .getClient2eventMeshMsgNum().incrementAndGet();
 
         Command cmd = pkg.getHeader().getCmd();
         try {
@@ -72,9 +72,9 @@ public class EventMeshTcpMessageDispatcher extends SimpleChannelInboundHandler<P
 
             if (isNeedTrace(cmd)) {
                 pkg.getHeader().getProperties()
-                    .put(EventMeshConstants.REQ_C2EVENTMESH_TIMESTAMP, startTime);
+                        .put(EventMeshConstants.REQ_C2EVENTMESH_TIMESTAMP, startTime);
                 pkg.getHeader().getProperties().put(EventMeshConstants.REQ_SEND_EVENTMESH_IP,
-                    eventMeshTCPServer.getEventMeshTCPConfiguration().eventMeshServerIp);
+                        eventMeshTCPServer.getEventMeshTCPConfiguration().eventMeshServerIp);
                 Session session = eventMeshTCPServer.getClientSessionGroupMapping().getSession(ctx);
 
                 pkg.getHeader().getProperties().put(EventMeshConstants.REQ_SYS, session.getClient().getSubsystem());
@@ -83,42 +83,49 @@ public class EventMeshTcpMessageDispatcher extends SimpleChannelInboundHandler<P
                 pkg.getHeader().getProperties().put(EventMeshConstants.REQ_GROUP, session.getClient().getGroup());
             }
 
-            if (cmd.equals(Command.RECOMMEND_REQUEST)) {
-                messageLogger.info("pkg|c2eventMesh|cmd={}|pkg={}", cmd, pkg);
+            if (Command.RECOMMEND_REQUEST == cmd) {
+                if (MESSAGE_LOGGER.isInfoEnabled()) {
+                    MESSAGE_LOGGER.info("pkg|c2eventMesh|cmd={}|pkg={}", cmd, pkg);
+                }
                 task = new RecommendTask(pkg, ctx, startTime, eventMeshTCPServer);
                 eventMeshTCPServer.getTaskHandleExecutorService().submit(task);
                 return;
             }
-            if (cmd.equals(Command.HELLO_REQUEST)) {
-                messageLogger.info("pkg|c2eventMesh|cmd={}|pkg={}", cmd, pkg);
+            
+            if (Command.HELLO_REQUEST == cmd) {
+                if (MESSAGE_LOGGER.isInfoEnabled()) {
+                    MESSAGE_LOGGER.info("pkg|c2eventMesh|cmd={}|pkg={}", cmd, pkg);
+                }
                 task = new HelloTask(pkg, ctx, startTime, eventMeshTCPServer);
                 eventMeshTCPServer.getTaskHandleExecutorService().submit(task);
                 return;
             }
 
             if (eventMeshTCPServer.getClientSessionGroupMapping().getSession(ctx) == null) {
-                messageLogger.info("pkg|c2eventMesh|cmd={}|pkg={},no session is found", cmd, pkg);
+                if (MESSAGE_LOGGER.isInfoEnabled()) {
+                    MESSAGE_LOGGER.info("pkg|c2eventMesh|cmd={}|pkg={},no session is found", cmd, pkg);
+                }
                 throw new Exception("no session is found");
             }
 
             logMessageFlow(ctx, pkg, cmd);
 
             if (eventMeshTCPServer.getClientSessionGroupMapping().getSession(ctx)
-                .getSessionState() == SessionState.CLOSED) {
+                    .getSessionState() == SessionState.CLOSED) {
                 throw new Exception(
-                    "this eventMesh tcp session will be closed, may be reboot or version change!");
+                        "this eventMesh tcp session will be closed, may be reboot or version change!");
             }
 
             dispatch(ctx, pkg, startTime, cmd);
         } catch (Exception e) {
-            logger.error("exception occurred while pkg|cmd={}|pkg={}", cmd, pkg, e);
+            LOGGER.error("exception occurred while pkg|cmd={}|pkg={}", cmd, pkg, e);
 
             if (isNeedTrace(cmd)) {
                 Span span = TraceUtils.prepareServerSpan(pkg.getHeader().getProperties(),
-                    EventMeshTraceConstants.TRACE_UPSTREAM_EVENTMESH_SERVER_SPAN, startTime,
-                    TimeUnit.MILLISECONDS, false);
+                        EventMeshTraceConstants.TRACE_UPSTREAM_EVENTMESH_SERVER_SPAN, startTime,
+                        TimeUnit.MILLISECONDS, false);
                 TraceUtils.finishSpanWithException(span, pkg.getHeader().getProperties(),
-                    "exception occurred while dispatch pkg", e);
+                        "exception occurred while dispatch pkg", e);
             }
 
             writeToClient(cmd, pkg, ctx, e);
@@ -127,9 +134,9 @@ public class EventMeshTcpMessageDispatcher extends SimpleChannelInboundHandler<P
 
     private boolean isNeedTrace(Command cmd) {
         if (eventMeshTCPServer.getEventMeshTCPConfiguration().eventMeshServerTraceEnable
-            && cmd != null && (Command.REQUEST_TO_SERVER == cmd
-            || Command.ASYNC_MESSAGE_TO_SERVER == cmd
-            || Command.BROADCAST_MESSAGE_TO_SERVER == cmd)) {
+                && cmd != null && (Command.REQUEST_TO_SERVER == cmd
+                || Command.ASYNC_MESSAGE_TO_SERVER == cmd
+                || Command.BROADCAST_MESSAGE_TO_SERVER == cmd)) {
             return true;
         }
         return false;
@@ -139,10 +146,10 @@ public class EventMeshTcpMessageDispatcher extends SimpleChannelInboundHandler<P
         try {
             Package res = new Package();
             res.setHeader(new Header(getReplyCommand(cmd), OPStatus.FAIL.getCode(), e.toString(),
-                pkg.getHeader().getSeq()));
+                    pkg.getHeader().getSeq()));
             ctx.writeAndFlush(res);
         } catch (Exception ex) {
-            logger.warn("writeToClient failed", ex);
+            LOGGER.warn("writeToClient failed", ex);
         }
     }
 
@@ -175,12 +182,16 @@ public class EventMeshTcpMessageDispatcher extends SimpleChannelInboundHandler<P
 
     private void logMessageFlow(ChannelHandlerContext ctx, Package pkg, Command cmd) {
         if (pkg.getBody() instanceof EventMeshMessage) {
-            messageLogger.info("pkg|c2eventMesh|cmd={}|Msg={}|user={}", cmd,
-                EventMeshUtil.printMqMessage((EventMeshMessage) pkg.getBody()),
-                eventMeshTCPServer.getClientSessionGroupMapping().getSession(ctx).getClient());
+            if (MESSAGE_LOGGER.isInfoEnabled()) {
+                MESSAGE_LOGGER.info("pkg|c2eventMesh|cmd={}|Msg={}|user={}", cmd,
+                        EventMeshUtil.printMqMessage((EventMeshMessage) pkg.getBody()),
+                        eventMeshTCPServer.getClientSessionGroupMapping().getSession(ctx).getClient());
+            }
         } else {
-            messageLogger.info("pkg|c2eventMesh|cmd={}|pkg={}|user={}", cmd, pkg,
-                eventMeshTCPServer.getClientSessionGroupMapping().getSession(ctx).getClient());
+            if (MESSAGE_LOGGER.isInfoEnabled()) {
+                MESSAGE_LOGGER.info("pkg|c2eventMesh|cmd={}|pkg={}|user={}", cmd, pkg,
+                        eventMeshTCPServer.getClientSessionGroupMapping().getSession(ctx).getClient());
+            }
         }
     }
 
@@ -189,17 +200,17 @@ public class EventMeshTcpMessageDispatcher extends SimpleChannelInboundHandler<P
             throw new Exception("the incoming message is empty.");
         }
         if (pkg.getHeader() == null) {
-            logger.error("the incoming message does not have a header|pkg={}", pkg);
+            LOGGER.error("the incoming message does not have a header|pkg={}", pkg);
             throw new Exception("the incoming message does not have a header.");
         }
         if (pkg.getHeader().getCmd() == null) {
-            logger.error("the incoming message does not have a command type|pkg={}", pkg);
+            LOGGER.error("the incoming message does not have a command type|pkg={}", pkg);
             throw new Exception("the incoming message does not have a command type.");
         }
     }
 
     private void dispatch(ChannelHandlerContext ctx, Package pkg, long startTime, Command cmd)
-        throws Exception {
+            throws Exception {
         Runnable task;
         switch (cmd) {
             case HEARTBEAT_REQUEST:


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@eventmesh.apache.org
For additional commands, e-mail: commits-help@eventmesh.apache.org