You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@eventmesh.apache.org by GitBox <gi...@apache.org> on 2022/07/21 12:07:32 UTC

[GitHub] [incubator-eventmesh] mytang0 opened a new pull request, #1037: [ISSUE #1035 #1036]Fix patch tcp

mytang0 opened a new pull request, #1037:
URL: https://github.com/apache/incubator-eventmesh/pull/1037

   Fixes ISSUE #1035 #1036 .


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@eventmesh.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@eventmesh.apache.org
For additional commands, e-mail: dev-help@eventmesh.apache.org


[GitHub] [incubator-eventmesh] codecov[bot] commented on pull request #1037: [ISSUE #1035 #1036]Fix patch tcp

Posted by GitBox <gi...@apache.org>.
codecov[bot] commented on PR #1037:
URL: https://github.com/apache/incubator-eventmesh/pull/1037#issuecomment-1191431038

   # [Codecov](https://codecov.io/gh/apache/incubator-eventmesh/pull/1037?src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) Report
   > Merging [#1037](https://codecov.io/gh/apache/incubator-eventmesh/pull/1037?src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (66db0ca) into [master](https://codecov.io/gh/apache/incubator-eventmesh/commit/de9092a7813730b08a1cdaf2de2ece35c92daa00?el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (de9092a) will **decrease** coverage by `0.10%`.
   > The diff coverage is `0.00%`.
   
   > :exclamation: Current head 66db0ca differs from pull request most recent head 5ce5d55. Consider uploading reports for the commit 5ce5d55 to get more accurate results
   
   ```diff
   @@             Coverage Diff             @@
   ##             master   #1037      +/-   ##
   ===========================================
   - Coverage      9.72%   9.61%   -0.11%     
   + Complexity      614     607       -7     
   ===========================================
     Files           363     363              
     Lines         23126   23118       -8     
     Branches       2548    2541       -7     
   ===========================================
   - Hits           2249    2223      -26     
   - Misses        20673   20689      +16     
   - Partials        204     206       +2     
   ```
   
   
   | [Impacted Files](https://codecov.io/gh/apache/incubator-eventmesh/pull/1037?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | Coverage Δ | |
   |---|---|---|
   | [...ol/tcp/client/group/ClientSessionGroupMapping.java](https://codecov.io/gh/apache/incubator-eventmesh/pull/1037/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-ZXZlbnRtZXNoLXJ1bnRpbWUvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2V2ZW50bWVzaC9ydW50aW1lL2NvcmUvcHJvdG9jb2wvdGNwL2NsaWVudC9ncm91cC9DbGllbnRTZXNzaW9uR3JvdXBNYXBwaW5nLmphdmE=) | `0.00% <0.00%> (ø)` | |
   | [...ol/tcp/client/session/send/UpStreamMsgContext.java](https://codecov.io/gh/apache/incubator-eventmesh/pull/1037/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-ZXZlbnRtZXNoLXJ1bnRpbWUvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2V2ZW50bWVzaC9ydW50aW1lL2NvcmUvcHJvdG9jb2wvdGNwL2NsaWVudC9zZXNzaW9uL3NlbmQvVXBTdHJlYW1Nc2dDb250ZXh0LmphdmE=) | `0.00% <0.00%> (ø)` | |
   | [...tandalone/broker/task/HistoryMessageClearTask.java](https://codecov.io/gh/apache/incubator-eventmesh/pull/1037/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-ZXZlbnRtZXNoLWNvbm5lY3Rvci1wbHVnaW4vZXZlbnRtZXNoLWNvbm5lY3Rvci1zdGFuZGFsb25lL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9ldmVudG1lc2gvY29ubmVjdG9yL3N0YW5kYWxvbmUvYnJva2VyL3Rhc2svSGlzdG9yeU1lc3NhZ2VDbGVhclRhc2suamF2YQ==) | `29.41% <0.00%> (-17.65%)` | :arrow_down: |
   | [...mesh/connector/standalone/broker/MessageQueue.java](https://codecov.io/gh/apache/incubator-eventmesh/pull/1037/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-ZXZlbnRtZXNoLWNvbm5lY3Rvci1wbHVnaW4vZXZlbnRtZXNoLWNvbm5lY3Rvci1zdGFuZGFsb25lL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9ldmVudG1lc2gvY29ubmVjdG9yL3N0YW5kYWxvbmUvYnJva2VyL01lc3NhZ2VRdWV1ZS5qYXZh) | `32.46% <0.00%> (-7.80%)` | :arrow_down: |
   | [...ava/org/apache/eventmesh/common/utils/IPUtils.java](https://codecov.io/gh/apache/incubator-eventmesh/pull/1037/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-ZXZlbnRtZXNoLWNvbW1vbi9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvZXZlbnRtZXNoL2NvbW1vbi91dGlscy9JUFV0aWxzLmphdmE=) | `33.33% <0.00%> (-6.49%)` | :arrow_down: |
   | [...g/apache/eventmesh/runtime/util/EventMeshUtil.java](https://codecov.io/gh/apache/incubator-eventmesh/pull/1037/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-ZXZlbnRtZXNoLXJ1bnRpbWUvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2V2ZW50bWVzaC9ydW50aW1lL3V0aWwvRXZlbnRNZXNoVXRpbC5qYXZh) | `58.08% <0.00%> (-5.89%)` | :arrow_down: |
   | [...sh/client/grpc/consumer/EventMeshGrpcConsumer.java](https://codecov.io/gh/apache/incubator-eventmesh/pull/1037/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-ZXZlbnRtZXNoLXNkay1qYXZhL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9ldmVudG1lc2gvY2xpZW50L2dycGMvY29uc3VtZXIvRXZlbnRNZXNoR3JwY0NvbnN1bWVyLmphdmE=) | `75.80% <0.00%> (-1.62%)` | :arrow_down: |
   | [...rg/apache/eventmesh/runtime/trace/LogExporter.java](https://codecov.io/gh/apache/incubator-eventmesh/pull/1037/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-ZXZlbnRtZXNoLXJ1bnRpbWUvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2V2ZW50bWVzaC9ydW50aW1lL3RyYWNlL0xvZ0V4cG9ydGVyLmphdmE=) | `0.00% <0.00%> (ø)` | |
   | [...core/protocol/http/consumer/EventMeshConsumer.java](https://codecov.io/gh/apache/incubator-eventmesh/pull/1037/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-ZXZlbnRtZXNoLXJ1bnRpbWUvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2V2ZW50bWVzaC9ydW50aW1lL2NvcmUvcHJvdG9jb2wvaHR0cC9jb25zdW1lci9FdmVudE1lc2hDb25zdW1lci5qYXZh) | `0.00% <0.00%> (ø)` | |
   | [.../protocol/tcp/client/group/ClientGroupWrapper.java](https://codecov.io/gh/apache/incubator-eventmesh/pull/1037/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-ZXZlbnRtZXNoLXJ1bnRpbWUvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2V2ZW50bWVzaC9ydW50aW1lL2NvcmUvcHJvdG9jb2wvdGNwL2NsaWVudC9ncm91cC9DbGllbnRHcm91cFdyYXBwZXIuamF2YQ==) | `0.00% <0.00%> (ø)` | |
   | ... and [2 more](https://codecov.io/gh/apache/incubator-eventmesh/pull/1037/diff?src=pr&el=tree-more&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | |
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@eventmesh.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@eventmesh.apache.org
For additional commands, e-mail: dev-help@eventmesh.apache.org


[GitHub] [incubator-eventmesh] xwm1992 merged pull request #1037: [ISSUE #1035 #1036] Fix patch TCP

Posted by GitBox <gi...@apache.org>.
xwm1992 merged PR #1037:
URL: https://github.com/apache/incubator-eventmesh/pull/1037


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@eventmesh.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@eventmesh.apache.org
For additional commands, e-mail: dev-help@eventmesh.apache.org


[GitHub] [incubator-eventmesh] mytang0 commented on a diff in pull request #1037: [ISSUE #1035 #1036] Fix patch TCP

Posted by GitBox <gi...@apache.org>.
mytang0 commented on code in PR #1037:
URL: https://github.com/apache/incubator-eventmesh/pull/1037#discussion_r931934453


##########
eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/session/send/UpStreamMsgContext.java:
##########
@@ -92,21 +92,26 @@ public void retry() {
             Command replyCmd = getReplyCmd(header.getCmd());
             long sendTime = System.currentTimeMillis();
 
-            EventMeshTcpSendResult sendStatus = session.upstreamMsg(header, event,
-                    createSendCallback(replyCmd, taskExecuteTime, event), startTime, taskExecuteTime);
+            retryTimes++;
 
-            if (StringUtils.equals(EventMeshTcpSendStatus.SUCCESS.name(), sendStatus.getSendStatus().name())) {
-                logger.info("pkg|eventMesh2mq|cmd={}|event={}|user={}|wait={}ms|cost={}ms", header.getCmd(), event,
+            // check session availability
+            if (session.isAvailable(event.getSubject())) {

Review Comment:
   > session.isAvailable() is not suitable for use here,this method was originally used to determine whether a message can be sent to a consumer session. The method contains logic as follows:
   > 
   > ```
   >       if (!sessionContext.subscribeTopics.containsKey(topic)) {
   >           logger.warn("session is not available because session has not subscribe topic:{},client:{}", topic, client);
   >           return false;
   >       }
   > ```
   
   If so, then isAvailable needs to be more refined. This modification is because the session has been closed, the retry logic is still calling session.upstreamMsg, an unnecessary exception occurs, and even an infinite loop.



##########
eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/session/send/UpStreamMsgContext.java:
##########
@@ -92,21 +92,26 @@ public void retry() {
             Command replyCmd = getReplyCmd(header.getCmd());
             long sendTime = System.currentTimeMillis();
 
-            EventMeshTcpSendResult sendStatus = session.upstreamMsg(header, event,
-                    createSendCallback(replyCmd, taskExecuteTime, event), startTime, taskExecuteTime);
+            retryTimes++;
 
-            if (StringUtils.equals(EventMeshTcpSendStatus.SUCCESS.name(), sendStatus.getSendStatus().name())) {
-                logger.info("pkg|eventMesh2mq|cmd={}|event={}|user={}|wait={}ms|cost={}ms", header.getCmd(), event,
+            // check session availability
+            if (session.isAvailable(event.getSubject())) {

Review Comment:
   > If so, then isAvailable needs to be more refined. This modification is because the session has been closed, the retry logic is still calling session.upstreamMsg, an unnecessary exception occurs, and even an infinite loop.
   
   If so, then isAvailable needs to be more refined. This modification is because the session has been closed, the retry logic is still calling session.upstreamMsg, an unnecessary exception occurs, and even an infinite loop.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@eventmesh.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@eventmesh.apache.org
For additional commands, e-mail: dev-help@eventmesh.apache.org


[GitHub] [incubator-eventmesh] lrhkobe commented on a diff in pull request #1037: [ISSUE #1035 #1036] Fix patch TCP

Posted by GitBox <gi...@apache.org>.
lrhkobe commented on code in PR #1037:
URL: https://github.com/apache/incubator-eventmesh/pull/1037#discussion_r930699445


##########
eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/session/send/UpStreamMsgContext.java:
##########
@@ -92,21 +92,26 @@ public void retry() {
             Command replyCmd = getReplyCmd(header.getCmd());
             long sendTime = System.currentTimeMillis();
 
-            EventMeshTcpSendResult sendStatus = session.upstreamMsg(header, event,
-                    createSendCallback(replyCmd, taskExecuteTime, event), startTime, taskExecuteTime);
+            retryTimes++;
 
-            if (StringUtils.equals(EventMeshTcpSendStatus.SUCCESS.name(), sendStatus.getSendStatus().name())) {
-                logger.info("pkg|eventMesh2mq|cmd={}|event={}|user={}|wait={}ms|cost={}ms", header.getCmd(), event,
+            // check session availability
+            if (session.isAvailable(event.getSubject())) {

Review Comment:
   session.isAvailable() is not suitable for use here,this method was originally used to determine whether a message can be sent to a consumer session. The method contains logic as follows:
   ```
         if (!sessionContext.subscribeTopics.containsKey(topic)) {
             logger.warn("session is not available because session has not subscribe topic:{},client:{}", topic, client);
             return false;
         }
   ```



##########
eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/group/ClientSessionGroupMapping.java:
##########
@@ -133,11 +133,11 @@ public void operationComplete(ChannelFuture future) throws Exception {
             return;
         }
 
-        closeSession(session);
-
         //remove session from sessionTable
         sessionTable.remove(addr);
 
+        closeSession(session);
+

Review Comment:
   Can you describe the specific problem, I don't understand what the original way would cause.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@eventmesh.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@eventmesh.apache.org
For additional commands, e-mail: dev-help@eventmesh.apache.org


[GitHub] [incubator-eventmesh] lrhkobe commented on a diff in pull request #1037: [ISSUE #1035 #1036] Fix patch TCP

Posted by GitBox <gi...@apache.org>.
lrhkobe commented on code in PR #1037:
URL: https://github.com/apache/incubator-eventmesh/pull/1037#discussion_r931840761


##########
eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/group/ClientSessionGroupMapping.java:
##########
@@ -133,11 +133,11 @@ public void operationComplete(ChannelFuture future) throws Exception {
             return;
         }
 
-        closeSession(session);
-
         //remove session from sessionTable
         sessionTable.remove(addr);
 
+        closeSession(session);
+

Review Comment:
   
   When the client reconnects, the port will change, and the session is identified by ip and port. EventMesh acquires session as follows:
   ```
   public Session getSession(ChannelHandlerContext ctx) {
           Session session = getSession((InetSocketAddress) ctx.channel().remoteAddress());
           return session;
       }
   
       public Session getSession(InetSocketAddress address) {
           return sessionTable.get(address);
       }
   ```
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@eventmesh.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@eventmesh.apache.org
For additional commands, e-mail: dev-help@eventmesh.apache.org


[GitHub] [incubator-eventmesh] mytang0 commented on a diff in pull request #1037: [ISSUE #1035 #1036] Fix patch TCP

Posted by GitBox <gi...@apache.org>.
mytang0 commented on code in PR #1037:
URL: https://github.com/apache/incubator-eventmesh/pull/1037#discussion_r931833366


##########
eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/session/send/UpStreamMsgContext.java:
##########
@@ -92,21 +92,26 @@ public void retry() {
             Command replyCmd = getReplyCmd(header.getCmd());
             long sendTime = System.currentTimeMillis();
 
-            EventMeshTcpSendResult sendStatus = session.upstreamMsg(header, event,
-                    createSendCallback(replyCmd, taskExecuteTime, event), startTime, taskExecuteTime);
+            retryTimes++;
 
-            if (StringUtils.equals(EventMeshTcpSendStatus.SUCCESS.name(), sendStatus.getSendStatus().name())) {
-                logger.info("pkg|eventMesh2mq|cmd={}|event={}|user={}|wait={}ms|cost={}ms", header.getCmd(), event,
+            // check session availability
+            if (session.isAvailable(event.getSubject())) {

Review Comment:
   If so, then isAvailable needs to be more refined. This modification is because the session has been closed, the retry logic is still calling session.upstreamMsg, an unnecessary exception occurs, and even an infinite loop.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@eventmesh.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@eventmesh.apache.org
For additional commands, e-mail: dev-help@eventmesh.apache.org


[GitHub] [incubator-eventmesh] mytang0 commented on a diff in pull request #1037: [ISSUE #1035 #1036] Fix patch TCP

Posted by GitBox <gi...@apache.org>.
mytang0 commented on code in PR #1037:
URL: https://github.com/apache/incubator-eventmesh/pull/1037#discussion_r934169111


##########
eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/session/send/UpStreamMsgContext.java:
##########
@@ -92,21 +92,26 @@ public void retry() {
             Command replyCmd = getReplyCmd(header.getCmd());
             long sendTime = System.currentTimeMillis();
 
-            EventMeshTcpSendResult sendStatus = session.upstreamMsg(header, event,
-                    createSendCallback(replyCmd, taskExecuteTime, event), startTime, taskExecuteTime);
+            retryTimes++;
 
-            if (StringUtils.equals(EventMeshTcpSendStatus.SUCCESS.name(), sendStatus.getSendStatus().name())) {
-                logger.info("pkg|eventMesh2mq|cmd={}|event={}|user={}|wait={}ms|cost={}ms", header.getCmd(), event,
+            // check session availability
+            if (session.isAvailable(event.getSubject())) {

Review Comment:
   > Yes, that's right. You can supply another method for replacing`isAvailable`.
   
   Ok, it's done.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@eventmesh.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@eventmesh.apache.org
For additional commands, e-mail: dev-help@eventmesh.apache.org


[GitHub] [incubator-eventmesh] mytang0 commented on a diff in pull request #1037: [ISSUE #1035 #1036] Fix patch TCP

Posted by GitBox <gi...@apache.org>.
mytang0 commented on code in PR #1037:
URL: https://github.com/apache/incubator-eventmesh/pull/1037#discussion_r931828266


##########
eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/group/ClientSessionGroupMapping.java:
##########
@@ -133,11 +133,11 @@ public void operationComplete(ChannelFuture future) throws Exception {
             return;
         }
 
-        closeSession(session);
-
         //remove session from sessionTable
         sessionTable.remove(addr);
 
+        closeSession(session);
+

Review Comment:
   > Can you describe the specific problem, I don't understand what the original way would cause.
   
   When the client channel is inactive and the session is closed, the client reconnects and the closed session is acquired and used. So first sessionTable.remove, then closeSession.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@eventmesh.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@eventmesh.apache.org
For additional commands, e-mail: dev-help@eventmesh.apache.org


[GitHub] [incubator-eventmesh] lrhkobe commented on a diff in pull request #1037: [ISSUE #1035 #1036] Fix patch TCP

Posted by GitBox <gi...@apache.org>.
lrhkobe commented on code in PR #1037:
URL: https://github.com/apache/incubator-eventmesh/pull/1037#discussion_r934087188


##########
eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/session/send/UpStreamMsgContext.java:
##########
@@ -92,21 +92,26 @@ public void retry() {
             Command replyCmd = getReplyCmd(header.getCmd());
             long sendTime = System.currentTimeMillis();
 
-            EventMeshTcpSendResult sendStatus = session.upstreamMsg(header, event,
-                    createSendCallback(replyCmd, taskExecuteTime, event), startTime, taskExecuteTime);
+            retryTimes++;
 
-            if (StringUtils.equals(EventMeshTcpSendStatus.SUCCESS.name(), sendStatus.getSendStatus().name())) {
-                logger.info("pkg|eventMesh2mq|cmd={}|event={}|user={}|wait={}ms|cost={}ms", header.getCmd(), event,
+            // check session availability
+            if (session.isAvailable(event.getSubject())) {

Review Comment:
   Yes, that's right. You can supply another method for replacing`isAvailable`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@eventmesh.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@eventmesh.apache.org
For additional commands, e-mail: dev-help@eventmesh.apache.org


[GitHub] [incubator-eventmesh] mytang0 commented on a diff in pull request #1037: [ISSUE #1035 #1036] Fix patch TCP

Posted by GitBox <gi...@apache.org>.
mytang0 commented on code in PR #1037:
URL: https://github.com/apache/incubator-eventmesh/pull/1037#discussion_r931933145


##########
eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/group/ClientSessionGroupMapping.java:
##########
@@ -133,11 +133,11 @@ public void operationComplete(ChannelFuture future) throws Exception {
             return;
         }
 
-        closeSession(session);
-
         //remove session from sessionTable
         sessionTable.remove(addr);
 
+        closeSession(session);
+

Review Comment:
   > When the client reconnects, the port will change, and the session is identified by ip and port. EventMesh acquires session as follows:
   > 
   > ```
   > public Session getSession(ChannelHandlerContext ctx) {
   >         Session session = getSession((InetSocketAddress) ctx.channel().remoteAddress());
   >         return session;
   >     }
   > 
   >     public Session getSession(InetSocketAddress address) {
   >         return sessionTable.get(address);
   >     }
   > ```
   
   Yes. If there is a problem of port reuse, is it better to adjust this way?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@eventmesh.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@eventmesh.apache.org
For additional commands, e-mail: dev-help@eventmesh.apache.org


[GitHub] [incubator-eventmesh] mytang0 commented on a diff in pull request #1037: [ISSUE #1035 #1036] Fix patch TCP

Posted by GitBox <gi...@apache.org>.
mytang0 commented on code in PR #1037:
URL: https://github.com/apache/incubator-eventmesh/pull/1037#discussion_r931833513


##########
eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/session/send/UpStreamMsgContext.java:
##########
@@ -92,21 +92,26 @@ public void retry() {
             Command replyCmd = getReplyCmd(header.getCmd());
             long sendTime = System.currentTimeMillis();
 
-            EventMeshTcpSendResult sendStatus = session.upstreamMsg(header, event,
-                    createSendCallback(replyCmd, taskExecuteTime, event), startTime, taskExecuteTime);
+            retryTimes++;
 
-            if (StringUtils.equals(EventMeshTcpSendStatus.SUCCESS.name(), sendStatus.getSendStatus().name())) {
-                logger.info("pkg|eventMesh2mq|cmd={}|event={}|user={}|wait={}ms|cost={}ms", header.getCmd(), event,
+            // check session availability
+            if (session.isAvailable(event.getSubject())) {

Review Comment:
   > If so, then isAvailable needs to be more refined. This modification is because the session has been closed, the retry logic is still calling session.upstreamMsg, an unnecessary exception occurs, and even an infinite loop.
   
   If so, then isAvailable needs to be more refined. This modification is because the session has been closed, the retry logic is still calling session.upstreamMsg, an unnecessary exception occurs, and even an infinite loop.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@eventmesh.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@eventmesh.apache.org
For additional commands, e-mail: dev-help@eventmesh.apache.org


[GitHub] [incubator-eventmesh] mytang0 commented on a diff in pull request #1037: [ISSUE #1035 #1036] Fix patch TCP

Posted by GitBox <gi...@apache.org>.
mytang0 commented on code in PR #1037:
URL: https://github.com/apache/incubator-eventmesh/pull/1037#discussion_r931827963


##########
eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/group/ClientSessionGroupMapping.java:
##########
@@ -133,11 +133,11 @@ public void operationComplete(ChannelFuture future) throws Exception {
             return;
         }
 
-        closeSession(session);
-
         //remove session from sessionTable
         sessionTable.remove(addr);
 
+        closeSession(session);
+

Review Comment:
   When the client channel is inactive and the session is closed, the client reconnects and the closed session is acquired and used. So first sessionTable.remove, then closeSession.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@eventmesh.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@eventmesh.apache.org
For additional commands, e-mail: dev-help@eventmesh.apache.org