You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eventmesh.apache.org by mi...@apache.org on 2022/12/06 02:11:24 UTC

[incubator-eventmesh] branch master updated: fix issue2366

This is an automated email from the ASF dual-hosted git repository.

mikexue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-eventmesh.git


The following commit(s) were added to refs/heads/master by this push:
     new 39227c6cd fix issue2366
     new 0a3d4b116 Merge pull request #2465 from jonyangx/issue2366
39227c6cd is described below

commit 39227c6cdfee6af70eeb8d36c9465e638fd1045f
Author: jonyangx <ya...@gmail.com>
AuthorDate: Mon Dec 5 21:14:47 2022 +0800

    fix issue2366
---
 .../core/protocol/tcp/client/session/Session.java  | 90 +++++++++++-----------
 1 file changed, 47 insertions(+), 43 deletions(-)

diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/session/Session.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/session/Session.java
index f9a065e40..52b8d751b 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/session/Session.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/session/Session.java
@@ -53,11 +53,11 @@ import io.netty.channel.ChannelHandlerContext;
 
 public class Session {
 
-    protected final Logger messageLogger = LoggerFactory.getLogger("message");
+    protected static final Logger MESSAGE_LOGGER = LoggerFactory.getLogger("message");
 
-    private final Logger subscribeLogger = LoggerFactory.getLogger("subscribeLogger");
+    private static final Logger SUBSCRIB_LOGGER = LoggerFactory.getLogger("subscribeLogger");
 
-    private final Logger logger = LoggerFactory.getLogger(this.getClass());
+    private static final Logger LOGGER = LoggerFactory.getLogger(Session.class);
 
     private UserAgent client;
 
@@ -170,10 +170,11 @@ public class Session {
             sessionContext.subscribeTopics.putIfAbsent(item.getTopic(), item);
             Objects.requireNonNull(clientGroupWrapper.get()).subscribe(item);
 
-            Objects.requireNonNull(clientGroupWrapper.get()).getMqProducerWrapper().getMeshMQProducer().checkTopicExist(item.getTopic());
+            Objects.requireNonNull(clientGroupWrapper.get()).getMqProducerWrapper().getMeshMQProducer()
+                    .checkTopicExist(item.getTopic());
 
             Objects.requireNonNull(clientGroupWrapper.get()).addSubscription(item, this);
-            subscribeLogger.info("subscribe|succeed|topic={}|user={}", item.getTopic(), client);
+            SUBSCRIB_LOGGER.info("subscribe|succeed|topic={}|user={}", item.getTopic(), client);
         }
     }
 
@@ -184,12 +185,13 @@ public class Session {
 
             if (!Objects.requireNonNull(clientGroupWrapper.get()).hasSubscription(item.getTopic())) {
                 Objects.requireNonNull(clientGroupWrapper.get()).unsubscribe(item);
-                subscribeLogger.info("unSubscribe|succeed|topic={}|lastUser={}", item.getTopic(), client);
+                SUBSCRIB_LOGGER.info("unSubscribe|succeed|topic={}|lastUser={}", item.getTopic(), client);
             }
         }
     }
 
-    public EventMeshTcpSendResult upstreamMsg(Header header, CloudEvent event, SendCallback sendCallback, long startTime, long taskExecuteTime) {
+    public EventMeshTcpSendResult upstreamMsg(Header header, CloudEvent event, SendCallback sendCallback,
+                                              long startTime, long taskExecuteTime) {
         String topic = event.getSubject();
         sessionContext.sendTopics.putIfAbsent(topic, topic);
         return sender.send(header, event, sendCallback, startTime, taskExecuteTime);
@@ -197,7 +199,8 @@ public class Session {
 
     public void downstreamMsg(DownStreamMsgContext downStreamMsgContext) {
         long currTime = System.currentTimeMillis();
-        trySendListenResponse(new Header(LISTEN_RESPONSE, OPStatus.SUCCESS.getCode(), "succeed", getListenRequestSeq()), currTime, currTime);
+        trySendListenResponse(new Header(LISTEN_RESPONSE, OPStatus.SUCCESS.getCode(), "succeed",
+                getListenRequestSeq()), currTime, currTime);
 
         pusher.push(downStreamMsgContext);
     }
@@ -209,51 +212,52 @@ public class Session {
     public void write2Client(final Package pkg) {
 
         try {
-            if (SessionState.CLOSED.equals(sessionState)) {
+            if (SessionState.CLOSED == sessionState) {
                 return;
             }
+
             context.writeAndFlush(pkg).addListener(
-                new ChannelFutureListener() {
-                    @Override
-                    public void operationComplete(ChannelFuture future) throws Exception {
-                        if (!future.isSuccess()) {
-                            messageLogger.error("write2Client fail, pkg[{}] session[{}]", pkg, this);
-                        } else {
-                            Objects.requireNonNull(clientGroupWrapper.get())
-                                .getEventMeshTcpMonitor()
-                                .getTcpSummaryMetrics()
-                                .getEventMesh2clientMsgNum()
-                                .incrementAndGet();
+                    new ChannelFutureListener() {
+                        @Override
+                        public void operationComplete(ChannelFuture future) throws Exception {
+                            if (!future.isSuccess()) {
+                                MESSAGE_LOGGER.error("write2Client fail, pkg[{}] session[{}]", pkg, this);
+                            } else {
+                                Objects.requireNonNull(clientGroupWrapper.get())
+                                        .getEventMeshTcpMonitor()
+                                        .getTcpSummaryMetrics()
+                                        .getEventMesh2clientMsgNum()
+                                        .incrementAndGet();
+                            }
                         }
                     }
-                }
             );
         } catch (Exception e) {
-            logger.error("exception while write2Client", e);
+            LOGGER.error("exception while write2Client", e);
         }
     }
 
     @Override
     public String toString() {
         return "Session{"
-            +
-            "sysId=" + Objects.requireNonNull(clientGroupWrapper.get()).getSysId()
-            +
-            ",remoteAddr=" + RemotingHelper.parseSocketAddressAddr(remoteAddress)
-            +
-            ",client=" + client
-            +
-            ",sessionState=" + sessionState
-            +
-            ",sessionContext=" + sessionContext
-            +
-            ",pusher=" + pusher
-            +
-            ",sender=" + sender
-            +
-            ",createTime=" + DateFormatUtils.format(createTime, EventMeshConstants.DATE_FORMAT)
-            +
-            ",lastHeartbeatTime=" + DateFormatUtils.format(lastHeartbeatTime, EventMeshConstants.DATE_FORMAT) + '}';
+                +
+                "sysId=" + Objects.requireNonNull(clientGroupWrapper.get()).getSysId()
+                +
+                ",remoteAddr=" + RemotingHelper.parseSocketAddressAddr(remoteAddress)
+                +
+                ",client=" + client
+                +
+                ",sessionState=" + sessionState
+                +
+                ",sessionContext=" + sessionContext
+                +
+                ",pusher=" + pusher
+                +
+                ",sender=" + sender
+                +
+                ",createTime=" + DateFormatUtils.format(createTime, EventMeshConstants.DATE_FORMAT)
+                +
+                ",lastHeartbeatTime=" + DateFormatUtils.format(lastHeartbeatTime, EventMeshConstants.DATE_FORMAT) + '}';
     }
 
     @Override
@@ -331,12 +335,12 @@ public class Session {
 
     public boolean isAvailable(String topic) {
         if (SessionState.CLOSED == sessionState) {
-            logger.warn("session is not available because session has been closed,topic:{},client:{}", topic, client);
+            LOGGER.warn("session is not available because session has been closed,topic:{},client:{}", topic, client);
             return false;
         }
 
         if (!sessionContext.subscribeTopics.containsKey(topic)) {
-            logger.warn("session is not available because session has not subscribe topic:{},client:{}", topic, client);
+            LOGGER.warn("session is not available because session has not subscribe topic:{},client:{}", topic, client);
             return false;
         }
 
@@ -345,7 +349,7 @@ public class Session {
 
     public boolean isRunning() {
         if (SessionState.RUNNING != sessionState) {
-            logger.warn("session is not running, state:{} client:{}", sessionState, client);
+            LOGGER.warn("session is not running, state:{} client:{}", sessionState, client);
             return false;
         }
         return true;


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@eventmesh.apache.org
For additional commands, e-mail: commits-help@eventmesh.apache.org