You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eventmesh.apache.org by mi...@apache.org on 2022/12/05 08:02:32 UTC
[incubator-eventmesh] branch master updated: fix issue2367
This is an automated email from the ASF dual-hosted git repository.
mikexue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-eventmesh.git
The following commit(s) were added to refs/heads/master by this push:
new e31b561e1 fix issue2367
new ae1b5623d Merge pull request #2458 from jonyangx/issue2367
e31b561e1 is described below
commit e31b561e12758399df9f5ae994efc98b48de7ee7
Author: jonyangx <jo...@gmail.com>
AuthorDate: Mon Dec 5 09:26:17 2022 +0800
fix issue2367
---
.../client/group/ClientSessionGroupMapping.java | 74 +++++++++++-----------
1 file changed, 37 insertions(+), 37 deletions(-)
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/group/ClientSessionGroupMapping.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/group/ClientSessionGroupMapping.java
index 9ca761657..a26de8fd3 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/group/ClientSessionGroupMapping.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/group/ClientSessionGroupMapping.java
@@ -52,8 +52,8 @@ import io.netty.channel.ChannelHandlerContext;
public class ClientSessionGroupMapping {
- private final Logger logger = LoggerFactory.getLogger(this.getClass());
- private final Logger sessionLogger = LoggerFactory.getLogger("sessionLogger");
+ private static final Logger LOGGER = LoggerFactory.getLogger(ClientSessionGroupMapping.class);
+ private static final Logger SESSION_LOGGER = LoggerFactory.getLogger("sessionLogger");
private ConcurrentHashMap<InetSocketAddress, Session> sessionTable = new ConcurrentHashMap<>();
@@ -96,14 +96,14 @@ public class ClientSessionGroupMapping {
user.setPort(addr.getPort());
Session session = null;
if (!sessionTable.containsKey(addr)) {
- logger.info("createSession client[{}]", RemotingHelper.parseChannelRemoteAddr(ctx.channel()));
+ LOGGER.info("createSession client[{}]", RemotingHelper.parseChannelRemoteAddr(ctx.channel()));
session = new Session(user, ctx, eventMeshTCPServer.getEventMeshTCPConfiguration());
initClientGroupWrapper(user, session);
sessionTable.put(addr, session);
- sessionLogger.info("session|open|succeed|user={}", user);
+ SESSION_LOGGER.info("session|open|succeed|user={}", user);
} else {
session = sessionTable.get(addr);
- sessionLogger.error("session|open|failed|user={}|msg={}", user, "session has been created!");
+ SESSION_LOGGER.error("session|open|failed|user={}|msg={}", user, "session has been created!");
}
return session;
}
@@ -121,15 +121,15 @@ public class ClientSessionGroupMapping {
Session session = MapUtils.getObject(sessionTable, addr, null);
if (session == null) {
final String remoteAddress = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
- logger.info("begin to close channel to remote address[{}]", remoteAddress);
+ LOGGER.info("begin to close channel to remote address[{}]", remoteAddress);
ctx.channel().close().addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
- logger.info("close the connection to remote address[{}] result: {}", remoteAddress,
+ LOGGER.info("close the connection to remote address[{}] result: {}", remoteAddress,
future.isSuccess());
}
});
- sessionLogger.info("session|close|succeed|address={}|msg={}", addr, "no session was found");
+ SESSION_LOGGER.info("session|close|succeed|address={}|msg={}", addr, "no session was found");
return;
}
@@ -138,13 +138,13 @@ public class ClientSessionGroupMapping {
//remove session from sessionTable
sessionTable.remove(addr);
- sessionLogger.info("session|close|succeed|user={}", session.getClient());
+ SESSION_LOGGER.info("session|close|succeed|user={}", session.getClient());
}
private void closeSession(Session session) throws Exception {
final String remoteAddress = RemotingHelper.parseChannelRemoteAddr(session.getContext().channel());
if (SessionState.CLOSED == session.getSessionState()) {
- logger.info("session has been closed, addr:{}", remoteAddress);
+ LOGGER.info("session has been closed, addr:{}", remoteAddress);
return;
}
@@ -152,7 +152,7 @@ public class ClientSessionGroupMapping {
synchronized (session) {
if (SessionState.CLOSED == session.getSessionState()) {
- logger.info("session has been closed in sync, addr:{}", remoteAddress);
+ LOGGER.info("session has been closed in sync, addr:{}", remoteAddress);
return;
}
@@ -163,15 +163,15 @@ public class ClientSessionGroupMapping {
} else if (EventMeshConstants.PURPOSE_PUB.equals(session.getClient().getPurpose())) {
cleanClientGroupWrapperByClosePub(session);
} else {
- logger.error("client purpose config is error:{}", session.getClient().getPurpose());
+ LOGGER.error("client purpose config is error:{}", session.getClient().getPurpose());
}
if (session.getContext() != null) {
- logger.info("begin to close channel to remote address[{}]", remoteAddress);
+ LOGGER.info("begin to close channel to remote address[{}]", remoteAddress);
session.getContext().channel().close().addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
- logger.info("close the connection to remote address[{}] result: {}", remoteAddress,
+ LOGGER.info("close the connection to remote address[{}] result: {}", remoteAddress,
future.isSuccess());
}
});
@@ -190,7 +190,7 @@ public class ClientSessionGroupMapping {
if (!lockMap.containsKey(user.getGroup())) {
Object obj = lockMap.putIfAbsent(user.getGroup(), new Object());
if (obj == null) {
- logger.info("add lock to map for group:{}", user.getGroup());
+ LOGGER.info("add lock to map for group:{}", user.getGroup());
}
}
synchronized (lockMap.get(user.getGroup())) {
@@ -198,7 +198,7 @@ public class ClientSessionGroupMapping {
ClientGroupWrapper cgw = constructClientGroupWrapper(user.getSubsystem(), user.getGroup(),
eventMeshTCPServer, new FreePriorityDispatchStrategy());
clientGroupMap.put(user.getGroup(), cgw);
- logger.info("create new ClientGroupWrapper, group:{}", user.getGroup());
+ LOGGER.info("create new ClientGroupWrapper, group:{}", user.getGroup());
}
ClientGroupWrapper cgw = clientGroupMap.get(user.getGroup());
@@ -208,7 +208,7 @@ public class ClientSessionGroupMapping {
} else if (EventMeshConstants.PURPOSE_SUB.equals(user.getPurpose())) {
initClientGroupConsumser(cgw);
} else {
- logger.error("unknown client purpose:{}", user.getPurpose());
+ LOGGER.error("unknown client purpose:{}", user.getPurpose());
throw new Exception("client purpose config is error");
}
@@ -246,7 +246,7 @@ public class ClientSessionGroupMapping {
lockMap.putIfAbsent(session.getClient().getSubsystem(), new Object());
}
synchronized (lockMap.get(session.getClient().getSubsystem())) {
- logger.info("readySession session[{}]", session);
+ LOGGER.info("readySession session[{}]", session);
ClientGroupWrapper cgw = session.getClientGroupWrapper().get();
boolean flag = cgw != null && cgw.addGroupConsumerSession(session);
@@ -300,8 +300,8 @@ public class ClientSessionGroupMapping {
if (unAckMsg.size() > 0 && Objects.requireNonNull(session.getClientGroupWrapper().get()).getGroupConsumerSessions().size() > 0) {
for (Map.Entry<String, DownStreamMsgContext> entry : unAckMsg.entrySet()) {
DownStreamMsgContext downStreamMsgContext = entry.getValue();
- if (SubscriptionMode.BROADCASTING.equals(downStreamMsgContext.subscriptionItem.getMode())) {
- logger.warn("exist broadcast msg unack when closeSession,seq:{},bizSeq:{},client:{}",
+ if (SubscriptionMode.BROADCASTING == downStreamMsgContext.subscriptionItem.getMode()) {
+ LOGGER.warn("exist broadcast msg unack when closeSession,seq:{},bizSeq:{},client:{}",
downStreamMsgContext.seq, EventMeshUtil.getMessageBizSeq(downStreamMsgContext.event),
session.getClient());
continue;
@@ -314,10 +314,10 @@ public class ClientSessionGroupMapping {
downStreamMsgContext.session = reChooseSession;
reChooseSession.getPusher().unAckMsg(downStreamMsgContext.seq, downStreamMsgContext);
reChooseSession.downstreamMsg(downStreamMsgContext);
- logger.info("rePush msg form unAckMsgs,seq:{},rePushClient:{}", entry.getKey(),
+ LOGGER.info("rePush msg form unAckMsgs,seq:{},rePushClient:{}", entry.getKey(),
downStreamMsgContext.session.getClient());
} else {
- logger.warn("select session fail in handleUnackMsgsInSession,seq:{},topic:{}", entry.getKey(),
+ LOGGER.warn("select session fail in handleUnackMsgsInSession,seq:{},topic:{}", entry.getKey(),
downStreamMsgContext.event.getSubject());
}
}
@@ -325,13 +325,13 @@ public class ClientSessionGroupMapping {
}
private void cleanClientGroupWrapperCommon(Session session) throws Exception {
- logger.info("GroupConsumerSessions size:{}",
+ LOGGER.info("GroupConsumerSessions size:{}",
Objects.requireNonNull(session.getClientGroupWrapper().get()).getGroupConsumerSessions().size());
if (Objects.requireNonNull(session.getClientGroupWrapper().get()).getGroupConsumerSessions().size() == 0) {
shutdownClientGroupConsumer(session);
}
- logger.info("GroupProducerSessions size:{}",
+ LOGGER.info("GroupProducerSessions size:{}",
Objects.requireNonNull(session.getClientGroupWrapper().get()).getGroupProducerSessions().size());
if ((Objects.requireNonNull(session.getClientGroupWrapper().get()).getGroupConsumerSessions().size() == 0)
&& (Objects.requireNonNull(session.getClientGroupWrapper().get()).getGroupProducerSessions().size() == 0)) {
@@ -339,7 +339,7 @@ public class ClientSessionGroupMapping {
clientGroupMap.remove(Objects.requireNonNull(session.getClientGroupWrapper().get()).getGroup());
lockMap.remove(Objects.requireNonNull(session.getClientGroupWrapper().get()).getGroup());
- logger.info("remove clientGroupWrapper group[{}]", Objects.requireNonNull(session.getClientGroupWrapper().get()).getGroup());
+ LOGGER.info("remove clientGroupWrapper group[{}]", Objects.requireNonNull(session.getClientGroupWrapper().get()).getGroup());
}
}
@@ -371,10 +371,10 @@ public class ClientSessionGroupMapping {
if (System.currentTimeMillis() - tmp.getLastHeartbeatTime()
> eventMeshTCPServer.getEventMeshTCPConfiguration().eventMeshTcpSessionExpiredInMills) {
try {
- logger.warn("clean expired session,client:{}", tmp.getClient());
+ LOGGER.warn("clean expired session,client:{}", tmp.getClient());
closeSession(tmp.getContext());
} catch (Exception e) {
- logger.error("say goodbye to session error! {}", tmp, e);
+ LOGGER.error("say goodbye to session error! {}", tmp, e);
}
}
}
@@ -401,7 +401,7 @@ public class ClientSessionGroupMapping {
}
downStreamMsgContext.ackMsg();
tmp.getPusher().getUnAckMsg().remove(seqKey);
- logger.warn("remove expire downStreamMsgContext, session:{}, topic:{}, seq:{}", tmp,
+ LOGGER.warn("remove expire downStreamMsgContext, session:{}, topic:{}, seq:{}", tmp,
downStreamMsgContext.event.getSubject(), seqKey);
}
}
@@ -413,21 +413,21 @@ public class ClientSessionGroupMapping {
public void init() throws Exception {
initSessionCleaner();
initDownStreamMsgContextCleaner();
- logger.info("ClientSessionGroupMapping inited......");
+ LOGGER.info("ClientSessionGroupMapping inited......");
}
public void start() throws Exception {
- logger.info("ClientSessionGroupMapping started......");
+ LOGGER.info("ClientSessionGroupMapping started......");
}
public void shutdown() throws Exception {
- logger.info("begin to close sessions gracefully");
+ LOGGER.info("begin to close sessions gracefully");
for (ClientGroupWrapper clientGroupWrapper : clientGroupMap.values()) {
for (Session subSession : clientGroupWrapper.getGroupConsumerSessions()) {
try {
EventMeshTcp2Client.serverGoodby2Client(eventMeshTCPServer, subSession, this);
} catch (Exception e) {
- logger.error("say goodbye to subSession error! {}", subSession, e);
+ LOGGER.error("say goodbye to subSession error! {}", subSession, e);
}
}
@@ -435,31 +435,31 @@ public class ClientSessionGroupMapping {
try {
EventMeshTcp2Client.serverGoodby2Client(eventMeshTCPServer, pubSession, this);
} catch (Exception e) {
- logger.error("say goodbye to pubSession error! {}", pubSession, e);
+ LOGGER.error("say goodbye to pubSession error! {}", pubSession, e);
}
}
try {
Thread.sleep(eventMeshTCPServer.getEventMeshTCPConfiguration().gracefulShutdownSleepIntervalInMills);
} catch (InterruptedException e) {
- logger.warn("Thread.sleep occur InterruptedException", e);
+ LOGGER.warn("Thread.sleep occur InterruptedException", e);
}
}
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
- logger.warn("Thread.sleep occur InterruptedException", e);
+ LOGGER.warn("Thread.sleep occur InterruptedException", e);
}
sessionTable.values().parallelStream().forEach(itr -> {
try {
EventMeshTcp2Client.serverGoodby2Client(this.eventMeshTCPServer, itr, this);
} catch (Exception e) {
- logger.error("say goodbye to session error! {}", itr, e);
+ LOGGER.error("say goodbye to session error! {}", itr, e);
}
});
ThreadUtils.randomSleep(50);
- logger.info("ClientSessionGroupMapping shutdown......");
+ LOGGER.info("ClientSessionGroupMapping shutdown......");
}
public ConcurrentHashMap<InetSocketAddress, Session> getSessionMap() {
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@eventmesh.apache.org
For additional commands, e-mail: commits-help@eventmesh.apache.org