You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eventmesh.apache.org by mi...@apache.org on 2022/12/06 02:11:24 UTC
[incubator-eventmesh] branch master updated: fix issue2366
This is an automated email from the ASF dual-hosted git repository.
mikexue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-eventmesh.git
The following commit(s) were added to refs/heads/master by this push:
new 39227c6cd fix issue2366
new 0a3d4b116 Merge pull request #2465 from jonyangx/issue2366
39227c6cd is described below
commit 39227c6cdfee6af70eeb8d36c9465e638fd1045f
Author: jonyangx <ya...@gmail.com>
AuthorDate: Mon Dec 5 21:14:47 2022 +0800
fix issue2366
---
.../core/protocol/tcp/client/session/Session.java | 90 +++++++++++-----------
1 file changed, 47 insertions(+), 43 deletions(-)
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/session/Session.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/session/Session.java
index f9a065e40..52b8d751b 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/session/Session.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/session/Session.java
@@ -53,11 +53,11 @@ import io.netty.channel.ChannelHandlerContext;
public class Session {
- protected final Logger messageLogger = LoggerFactory.getLogger("message");
+ protected static final Logger MESSAGE_LOGGER = LoggerFactory.getLogger("message");
- private final Logger subscribeLogger = LoggerFactory.getLogger("subscribeLogger");
+ private static final Logger SUBSCRIB_LOGGER = LoggerFactory.getLogger("subscribeLogger");
- private final Logger logger = LoggerFactory.getLogger(this.getClass());
+ private static final Logger LOGGER = LoggerFactory.getLogger(Session.class);
private UserAgent client;
@@ -170,10 +170,11 @@ public class Session {
sessionContext.subscribeTopics.putIfAbsent(item.getTopic(), item);
Objects.requireNonNull(clientGroupWrapper.get()).subscribe(item);
- Objects.requireNonNull(clientGroupWrapper.get()).getMqProducerWrapper().getMeshMQProducer().checkTopicExist(item.getTopic());
+ Objects.requireNonNull(clientGroupWrapper.get()).getMqProducerWrapper().getMeshMQProducer()
+ .checkTopicExist(item.getTopic());
Objects.requireNonNull(clientGroupWrapper.get()).addSubscription(item, this);
- subscribeLogger.info("subscribe|succeed|topic={}|user={}", item.getTopic(), client);
+ SUBSCRIB_LOGGER.info("subscribe|succeed|topic={}|user={}", item.getTopic(), client);
}
}
@@ -184,12 +185,13 @@ public class Session {
if (!Objects.requireNonNull(clientGroupWrapper.get()).hasSubscription(item.getTopic())) {
Objects.requireNonNull(clientGroupWrapper.get()).unsubscribe(item);
- subscribeLogger.info("unSubscribe|succeed|topic={}|lastUser={}", item.getTopic(), client);
+ SUBSCRIB_LOGGER.info("unSubscribe|succeed|topic={}|lastUser={}", item.getTopic(), client);
}
}
}
- public EventMeshTcpSendResult upstreamMsg(Header header, CloudEvent event, SendCallback sendCallback, long startTime, long taskExecuteTime) {
+ public EventMeshTcpSendResult upstreamMsg(Header header, CloudEvent event, SendCallback sendCallback,
+ long startTime, long taskExecuteTime) {
String topic = event.getSubject();
sessionContext.sendTopics.putIfAbsent(topic, topic);
return sender.send(header, event, sendCallback, startTime, taskExecuteTime);
@@ -197,7 +199,8 @@ public class Session {
public void downstreamMsg(DownStreamMsgContext downStreamMsgContext) {
long currTime = System.currentTimeMillis();
- trySendListenResponse(new Header(LISTEN_RESPONSE, OPStatus.SUCCESS.getCode(), "succeed", getListenRequestSeq()), currTime, currTime);
+ trySendListenResponse(new Header(LISTEN_RESPONSE, OPStatus.SUCCESS.getCode(), "succeed",
+ getListenRequestSeq()), currTime, currTime);
pusher.push(downStreamMsgContext);
}
@@ -209,51 +212,52 @@ public class Session {
public void write2Client(final Package pkg) {
try {
- if (SessionState.CLOSED.equals(sessionState)) {
+ if (SessionState.CLOSED == sessionState) {
return;
}
+
context.writeAndFlush(pkg).addListener(
- new ChannelFutureListener() {
- @Override
- public void operationComplete(ChannelFuture future) throws Exception {
- if (!future.isSuccess()) {
- messageLogger.error("write2Client fail, pkg[{}] session[{}]", pkg, this);
- } else {
- Objects.requireNonNull(clientGroupWrapper.get())
- .getEventMeshTcpMonitor()
- .getTcpSummaryMetrics()
- .getEventMesh2clientMsgNum()
- .incrementAndGet();
+ new ChannelFutureListener() {
+ @Override
+ public void operationComplete(ChannelFuture future) throws Exception {
+ if (!future.isSuccess()) {
+ MESSAGE_LOGGER.error("write2Client fail, pkg[{}] session[{}]", pkg, this);
+ } else {
+ Objects.requireNonNull(clientGroupWrapper.get())
+ .getEventMeshTcpMonitor()
+ .getTcpSummaryMetrics()
+ .getEventMesh2clientMsgNum()
+ .incrementAndGet();
+ }
}
}
- }
);
} catch (Exception e) {
- logger.error("exception while write2Client", e);
+ LOGGER.error("exception while write2Client", e);
}
}
@Override
public String toString() {
return "Session{"
- +
- "sysId=" + Objects.requireNonNull(clientGroupWrapper.get()).getSysId()
- +
- ",remoteAddr=" + RemotingHelper.parseSocketAddressAddr(remoteAddress)
- +
- ",client=" + client
- +
- ",sessionState=" + sessionState
- +
- ",sessionContext=" + sessionContext
- +
- ",pusher=" + pusher
- +
- ",sender=" + sender
- +
- ",createTime=" + DateFormatUtils.format(createTime, EventMeshConstants.DATE_FORMAT)
- +
- ",lastHeartbeatTime=" + DateFormatUtils.format(lastHeartbeatTime, EventMeshConstants.DATE_FORMAT) + '}';
+ +
+ "sysId=" + Objects.requireNonNull(clientGroupWrapper.get()).getSysId()
+ +
+ ",remoteAddr=" + RemotingHelper.parseSocketAddressAddr(remoteAddress)
+ +
+ ",client=" + client
+ +
+ ",sessionState=" + sessionState
+ +
+ ",sessionContext=" + sessionContext
+ +
+ ",pusher=" + pusher
+ +
+ ",sender=" + sender
+ +
+ ",createTime=" + DateFormatUtils.format(createTime, EventMeshConstants.DATE_FORMAT)
+ +
+ ",lastHeartbeatTime=" + DateFormatUtils.format(lastHeartbeatTime, EventMeshConstants.DATE_FORMAT) + '}';
}
@Override
@@ -331,12 +335,12 @@ public class Session {
public boolean isAvailable(String topic) {
if (SessionState.CLOSED == sessionState) {
- logger.warn("session is not available because session has been closed,topic:{},client:{}", topic, client);
+ LOGGER.warn("session is not available because session has been closed,topic:{},client:{}", topic, client);
return false;
}
if (!sessionContext.subscribeTopics.containsKey(topic)) {
- logger.warn("session is not available because session has not subscribe topic:{},client:{}", topic, client);
+ LOGGER.warn("session is not available because session has not subscribe topic:{},client:{}", topic, client);
return false;
}
@@ -345,7 +349,7 @@ public class Session {
public boolean isRunning() {
if (SessionState.RUNNING != sessionState) {
- logger.warn("session is not running, state:{} client:{}", sessionState, client);
+ LOGGER.warn("session is not running, state:{} client:{}", sessionState, client);
return false;
}
return true;
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@eventmesh.apache.org
For additional commands, e-mail: commits-help@eventmesh.apache.org