You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eventmesh.apache.org by mi...@apache.org on 2022/12/05 08:02:32 UTC

[incubator-eventmesh] branch master updated: fix issue2367

This is an automated email from the ASF dual-hosted git repository.

mikexue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-eventmesh.git


The following commit(s) were added to refs/heads/master by this push:
     new e31b561e1 fix issue2367
     new ae1b5623d Merge pull request #2458 from jonyangx/issue2367
e31b561e1 is described below

commit e31b561e12758399df9f5ae994efc98b48de7ee7
Author: jonyangx <jo...@gmail.com>
AuthorDate: Mon Dec 5 09:26:17 2022 +0800

    fix issue2367
---
 .../client/group/ClientSessionGroupMapping.java    | 74 +++++++++++-----------
 1 file changed, 37 insertions(+), 37 deletions(-)

diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/group/ClientSessionGroupMapping.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/group/ClientSessionGroupMapping.java
index 9ca761657..a26de8fd3 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/group/ClientSessionGroupMapping.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/group/ClientSessionGroupMapping.java
@@ -52,8 +52,8 @@ import io.netty.channel.ChannelHandlerContext;
 
 public class ClientSessionGroupMapping {
 
-    private final Logger logger = LoggerFactory.getLogger(this.getClass());
-    private final Logger sessionLogger = LoggerFactory.getLogger("sessionLogger");
+    private static final Logger LOGGER = LoggerFactory.getLogger(ClientSessionGroupMapping.class);
+    private static final Logger SESSION_LOGGER = LoggerFactory.getLogger("sessionLogger");
 
     private ConcurrentHashMap<InetSocketAddress, Session> sessionTable = new ConcurrentHashMap<>();
 
@@ -96,14 +96,14 @@ public class ClientSessionGroupMapping {
         user.setPort(addr.getPort());
         Session session = null;
         if (!sessionTable.containsKey(addr)) {
-            logger.info("createSession client[{}]", RemotingHelper.parseChannelRemoteAddr(ctx.channel()));
+            LOGGER.info("createSession client[{}]", RemotingHelper.parseChannelRemoteAddr(ctx.channel()));
             session = new Session(user, ctx, eventMeshTCPServer.getEventMeshTCPConfiguration());
             initClientGroupWrapper(user, session);
             sessionTable.put(addr, session);
-            sessionLogger.info("session|open|succeed|user={}", user);
+            SESSION_LOGGER.info("session|open|succeed|user={}", user);
         } else {
             session = sessionTable.get(addr);
-            sessionLogger.error("session|open|failed|user={}|msg={}", user, "session has been created!");
+            SESSION_LOGGER.error("session|open|failed|user={}|msg={}", user, "session has been created!");
         }
         return session;
     }
@@ -121,15 +121,15 @@ public class ClientSessionGroupMapping {
         Session session = MapUtils.getObject(sessionTable, addr, null);
         if (session == null) {
             final String remoteAddress = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
-            logger.info("begin to close channel to remote address[{}]", remoteAddress);
+            LOGGER.info("begin to close channel to remote address[{}]", remoteAddress);
             ctx.channel().close().addListener(new ChannelFutureListener() {
                 @Override
                 public void operationComplete(ChannelFuture future) throws Exception {
-                    logger.info("close the connection to remote address[{}] result: {}", remoteAddress,
+                    LOGGER.info("close the connection to remote address[{}] result: {}", remoteAddress,
                             future.isSuccess());
                 }
             });
-            sessionLogger.info("session|close|succeed|address={}|msg={}", addr, "no session was found");
+            SESSION_LOGGER.info("session|close|succeed|address={}|msg={}", addr, "no session was found");
             return;
         }
 
@@ -138,13 +138,13 @@ public class ClientSessionGroupMapping {
         //remove session from sessionTable
         sessionTable.remove(addr);
 
-        sessionLogger.info("session|close|succeed|user={}", session.getClient());
+        SESSION_LOGGER.info("session|close|succeed|user={}", session.getClient());
     }
 
     private void closeSession(Session session) throws Exception {
         final String remoteAddress = RemotingHelper.parseChannelRemoteAddr(session.getContext().channel());
         if (SessionState.CLOSED == session.getSessionState()) {
-            logger.info("session has been closed, addr:{}", remoteAddress);
+            LOGGER.info("session has been closed, addr:{}", remoteAddress);
             return;
         }
 
@@ -152,7 +152,7 @@ public class ClientSessionGroupMapping {
         synchronized (session) {
 
             if (SessionState.CLOSED == session.getSessionState()) {
-                logger.info("session has been closed in sync, addr:{}", remoteAddress);
+                LOGGER.info("session has been closed in sync, addr:{}", remoteAddress);
                 return;
             }
 
@@ -163,15 +163,15 @@ public class ClientSessionGroupMapping {
             } else if (EventMeshConstants.PURPOSE_PUB.equals(session.getClient().getPurpose())) {
                 cleanClientGroupWrapperByClosePub(session);
             } else {
-                logger.error("client purpose config is error:{}", session.getClient().getPurpose());
+                LOGGER.error("client purpose config is error:{}", session.getClient().getPurpose());
             }
 
             if (session.getContext() != null) {
-                logger.info("begin to close channel to remote address[{}]", remoteAddress);
+                LOGGER.info("begin to close channel to remote address[{}]", remoteAddress);
                 session.getContext().channel().close().addListener(new ChannelFutureListener() {
                     @Override
                     public void operationComplete(ChannelFuture future) throws Exception {
-                        logger.info("close the connection to remote address[{}] result: {}", remoteAddress,
+                        LOGGER.info("close the connection to remote address[{}] result: {}", remoteAddress,
                                 future.isSuccess());
                     }
                 });
@@ -190,7 +190,7 @@ public class ClientSessionGroupMapping {
         if (!lockMap.containsKey(user.getGroup())) {
             Object obj = lockMap.putIfAbsent(user.getGroup(), new Object());
             if (obj == null) {
-                logger.info("add lock to map for group:{}", user.getGroup());
+                LOGGER.info("add lock to map for group:{}", user.getGroup());
             }
         }
         synchronized (lockMap.get(user.getGroup())) {
@@ -198,7 +198,7 @@ public class ClientSessionGroupMapping {
                 ClientGroupWrapper cgw = constructClientGroupWrapper(user.getSubsystem(), user.getGroup(),
                         eventMeshTCPServer, new FreePriorityDispatchStrategy());
                 clientGroupMap.put(user.getGroup(), cgw);
-                logger.info("create new ClientGroupWrapper, group:{}", user.getGroup());
+                LOGGER.info("create new ClientGroupWrapper, group:{}", user.getGroup());
             }
 
             ClientGroupWrapper cgw = clientGroupMap.get(user.getGroup());
@@ -208,7 +208,7 @@ public class ClientSessionGroupMapping {
             } else if (EventMeshConstants.PURPOSE_SUB.equals(user.getPurpose())) {
                 initClientGroupConsumser(cgw);
             } else {
-                logger.error("unknown client purpose:{}", user.getPurpose());
+                LOGGER.error("unknown client purpose:{}", user.getPurpose());
                 throw new Exception("client purpose config is error");
             }
 
@@ -246,7 +246,7 @@ public class ClientSessionGroupMapping {
             lockMap.putIfAbsent(session.getClient().getSubsystem(), new Object());
         }
         synchronized (lockMap.get(session.getClient().getSubsystem())) {
-            logger.info("readySession session[{}]", session);
+            LOGGER.info("readySession session[{}]", session);
             ClientGroupWrapper cgw = session.getClientGroupWrapper().get();
 
             boolean flag = cgw != null && cgw.addGroupConsumerSession(session);
@@ -300,8 +300,8 @@ public class ClientSessionGroupMapping {
         if (unAckMsg.size() > 0 && Objects.requireNonNull(session.getClientGroupWrapper().get()).getGroupConsumerSessions().size() > 0) {
             for (Map.Entry<String, DownStreamMsgContext> entry : unAckMsg.entrySet()) {
                 DownStreamMsgContext downStreamMsgContext = entry.getValue();
-                if (SubscriptionMode.BROADCASTING.equals(downStreamMsgContext.subscriptionItem.getMode())) {
-                    logger.warn("exist broadcast msg unack when closeSession,seq:{},bizSeq:{},client:{}",
+                if (SubscriptionMode.BROADCASTING == downStreamMsgContext.subscriptionItem.getMode()) {
+                    LOGGER.warn("exist broadcast msg unack when closeSession,seq:{},bizSeq:{},client:{}",
                             downStreamMsgContext.seq, EventMeshUtil.getMessageBizSeq(downStreamMsgContext.event),
                             session.getClient());
                     continue;
@@ -314,10 +314,10 @@ public class ClientSessionGroupMapping {
                     downStreamMsgContext.session = reChooseSession;
                     reChooseSession.getPusher().unAckMsg(downStreamMsgContext.seq, downStreamMsgContext);
                     reChooseSession.downstreamMsg(downStreamMsgContext);
-                    logger.info("rePush msg form unAckMsgs,seq:{},rePushClient:{}", entry.getKey(),
+                    LOGGER.info("rePush msg form unAckMsgs,seq:{},rePushClient:{}", entry.getKey(),
                             downStreamMsgContext.session.getClient());
                 } else {
-                    logger.warn("select session fail in handleUnackMsgsInSession,seq:{},topic:{}", entry.getKey(),
+                    LOGGER.warn("select session fail in handleUnackMsgsInSession,seq:{},topic:{}", entry.getKey(),
                             downStreamMsgContext.event.getSubject());
                 }
             }
@@ -325,13 +325,13 @@ public class ClientSessionGroupMapping {
     }
 
     private void cleanClientGroupWrapperCommon(Session session) throws Exception {
-        logger.info("GroupConsumerSessions size:{}",
+        LOGGER.info("GroupConsumerSessions size:{}",
                 Objects.requireNonNull(session.getClientGroupWrapper().get()).getGroupConsumerSessions().size());
         if (Objects.requireNonNull(session.getClientGroupWrapper().get()).getGroupConsumerSessions().size() == 0) {
             shutdownClientGroupConsumer(session);
         }
 
-        logger.info("GroupProducerSessions size:{}",
+        LOGGER.info("GroupProducerSessions size:{}",
                 Objects.requireNonNull(session.getClientGroupWrapper().get()).getGroupProducerSessions().size());
         if ((Objects.requireNonNull(session.getClientGroupWrapper().get()).getGroupConsumerSessions().size() == 0)
                 && (Objects.requireNonNull(session.getClientGroupWrapper().get()).getGroupProducerSessions().size() == 0)) {
@@ -339,7 +339,7 @@ public class ClientSessionGroupMapping {
 
             clientGroupMap.remove(Objects.requireNonNull(session.getClientGroupWrapper().get()).getGroup());
             lockMap.remove(Objects.requireNonNull(session.getClientGroupWrapper().get()).getGroup());
-            logger.info("remove clientGroupWrapper group[{}]", Objects.requireNonNull(session.getClientGroupWrapper().get()).getGroup());
+            LOGGER.info("remove clientGroupWrapper group[{}]", Objects.requireNonNull(session.getClientGroupWrapper().get()).getGroup());
         }
     }
 
@@ -371,10 +371,10 @@ public class ClientSessionGroupMapping {
                             if (System.currentTimeMillis() - tmp.getLastHeartbeatTime()
                                     > eventMeshTCPServer.getEventMeshTCPConfiguration().eventMeshTcpSessionExpiredInMills) {
                                 try {
-                                    logger.warn("clean expired session,client:{}", tmp.getClient());
+                                    LOGGER.warn("clean expired session,client:{}", tmp.getClient());
                                     closeSession(tmp.getContext());
                                 } catch (Exception e) {
-                                    logger.error("say goodbye to session error! {}", tmp, e);
+                                    LOGGER.error("say goodbye to session error! {}", tmp, e);
                                 }
                             }
                         }
@@ -401,7 +401,7 @@ public class ClientSessionGroupMapping {
                                 }
                                 downStreamMsgContext.ackMsg();
                                 tmp.getPusher().getUnAckMsg().remove(seqKey);
-                                logger.warn("remove expire downStreamMsgContext, session:{}, topic:{}, seq:{}", tmp,
+                                LOGGER.warn("remove expire downStreamMsgContext, session:{}, topic:{}, seq:{}", tmp,
                                         downStreamMsgContext.event.getSubject(), seqKey);
                             }
                         }
@@ -413,21 +413,21 @@ public class ClientSessionGroupMapping {
     public void init() throws Exception {
         initSessionCleaner();
         initDownStreamMsgContextCleaner();
-        logger.info("ClientSessionGroupMapping inited......");
+        LOGGER.info("ClientSessionGroupMapping inited......");
     }
 
     public void start() throws Exception {
-        logger.info("ClientSessionGroupMapping started......");
+        LOGGER.info("ClientSessionGroupMapping started......");
     }
 
     public void shutdown() throws Exception {
-        logger.info("begin to close sessions gracefully");
+        LOGGER.info("begin to close sessions gracefully");
         for (ClientGroupWrapper clientGroupWrapper : clientGroupMap.values()) {
             for (Session subSession : clientGroupWrapper.getGroupConsumerSessions()) {
                 try {
                     EventMeshTcp2Client.serverGoodby2Client(eventMeshTCPServer, subSession, this);
                 } catch (Exception e) {
-                    logger.error("say goodbye to subSession error! {}", subSession, e);
+                    LOGGER.error("say goodbye to subSession error! {}", subSession, e);
                 }
             }
 
@@ -435,31 +435,31 @@ public class ClientSessionGroupMapping {
                 try {
                     EventMeshTcp2Client.serverGoodby2Client(eventMeshTCPServer, pubSession, this);
                 } catch (Exception e) {
-                    logger.error("say goodbye to pubSession error! {}", pubSession, e);
+                    LOGGER.error("say goodbye to pubSession error! {}", pubSession, e);
                 }
             }
             try {
                 Thread.sleep(eventMeshTCPServer.getEventMeshTCPConfiguration().gracefulShutdownSleepIntervalInMills);
             } catch (InterruptedException e) {
-                logger.warn("Thread.sleep occur InterruptedException", e);
+                LOGGER.warn("Thread.sleep occur InterruptedException", e);
             }
         }
 
         try {
             Thread.sleep(1000);
         } catch (InterruptedException e) {
-            logger.warn("Thread.sleep occur InterruptedException", e);
+            LOGGER.warn("Thread.sleep occur InterruptedException", e);
         }
 
         sessionTable.values().parallelStream().forEach(itr -> {
             try {
                 EventMeshTcp2Client.serverGoodby2Client(this.eventMeshTCPServer, itr, this);
             } catch (Exception e) {
-                logger.error("say goodbye to session error! {}", itr, e);
+                LOGGER.error("say goodbye to session error! {}", itr, e);
             }
         });
         ThreadUtils.randomSleep(50);
-        logger.info("ClientSessionGroupMapping shutdown......");
+        LOGGER.info("ClientSessionGroupMapping shutdown......");
     }
 
     public ConcurrentHashMap<InetSocketAddress, Session> getSessionMap() {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@eventmesh.apache.org
For additional commands, e-mail: commits-help@eventmesh.apache.org