You are viewing a plain text version of this content. The canonical link to the original message was not preserved in this plain text copy.
Posted to commits@eventmesh.apache.org by mi...@apache.org on 2022/12/31 04:14:39 UTC

[incubator-eventmesh] branch master updated: fix issue2722

This is an automated email from the ASF dual-hosted git repository.

mikexue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-eventmesh.git


The following commit(s) were added to refs/heads/master by this push:
     new 875247817 fix issue2722
     new a47c1f712 Merge pull request #2746 from jonyangx/issue2722
875247817 is described below

commit 8752478173a1c0e9ffca972e85bb83b2a6753658
Author: jonyangx <ya...@gmail.com>
AuthorDate: Sat Dec 31 09:56:20 2022 +0800

    fix issue2722
---
 .../protocol/tcp/client/task/AbstractTask.java     |  2 +-
 .../protocol/tcp/client/task/SubscribeTask.java    | 36 +++++++++++-----------
 2 files changed, 19 insertions(+), 19 deletions(-)

diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/task/AbstractTask.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/task/AbstractTask.java
index c2cb04d7a..fe98f303e 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/task/AbstractTask.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/task/AbstractTask.java
@@ -34,7 +34,7 @@ public abstract class AbstractTask implements Runnable {
     protected long startTime;
     protected EventMeshTCPServer eventMeshTCPServer;
 
-    public AbstractTask(Package pkg, ChannelHandlerContext ctx, long startTime, EventMeshTCPServer eventMeshTCPServer) {
+    public AbstractTask(final Package pkg, final ChannelHandlerContext ctx, long startTime, final EventMeshTCPServer eventMeshTCPServer) {
         this.eventMeshTCPServer = eventMeshTCPServer;
         this.pkg = pkg;
         this.ctx = ctx;
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/task/SubscribeTask.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/task/SubscribeTask.java
index 575b8395e..e3fbc927b 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/task/SubscribeTask.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/task/SubscribeTask.java
@@ -30,6 +30,7 @@ import org.apache.eventmesh.runtime.util.Utils;
 
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Objects;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -40,43 +41,42 @@ public class SubscribeTask extends AbstractTask {
 
     private static final Logger LOGGER = LoggerFactory.getLogger(SubscribeTask.class);
 
-    private static final Logger MESSAGE_LOGGER = LoggerFactory.getLogger("message");
-
-    public SubscribeTask(Package pkg, ChannelHandlerContext ctx, long startTime, EventMeshTCPServer eventMeshTCPServer) {
+    public SubscribeTask(final Package pkg, final ChannelHandlerContext ctx, long startTime, final EventMeshTCPServer eventMeshTCPServer) {
         super(pkg, ctx, startTime, eventMeshTCPServer);
     }
 
     @Override
     public void run() {
-        long taskExecuteTime = System.currentTimeMillis();
-        Package msg = new Package();
-        try {
-            Subscription subscriptionInfo = (Subscription) pkg.getBody();
-            if (subscriptionInfo == null) {
-                throw new Exception("subscriptionInfo is null");
-            }
+        final long taskExecuteTime = System.currentTimeMillis();
 
-            List<SubscriptionItem> subscriptionItems = new ArrayList<>();
-            for (int i = 0; i < subscriptionInfo.getTopicList().size(); i++) {
-                SubscriptionItem item = subscriptionInfo.getTopicList().get(i);
+        final Package msg = new Package();
+        try {
+            final Subscription subscriptionInfo = (Subscription) pkg.getBody();
+            Objects.requireNonNull(subscriptionInfo, "subscriptionInfo can not be null");
 
+            final List<SubscriptionItem> subscriptionItems = new ArrayList<>();
+            final boolean eventMeshServerSecurityEnable = eventMeshTCPServer.getEventMeshTCPConfiguration().isEventMeshServerSecurityEnable();
+            final String remoteAddr = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
+            subscriptionInfo.getTopicList().forEach(item -> {
                 //do acl check for receive msg
-                if (eventMeshTCPServer.getEventMeshTCPConfiguration().isEventMeshServerSecurityEnable()) {
-                    String remoteAddr = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
+                if (eventMeshServerSecurityEnable) {
                     Acl.doAclCheckInTcpReceive(remoteAddr, session.getClient(), item.getTopic(),
                             Command.SUBSCRIBE_REQUEST.getValue());
                 }
 
                 subscriptionItems.add(item);
-            }
+            });
+
             synchronized (session) {
                 session.subscribe(subscriptionItems);
-                MESSAGE_LOGGER.info("SubscribeTask succeed|user={}|topics={}", session.getClient(), subscriptionItems);
+                if (LOGGER.isInfoEnabled()) {
+                    LOGGER.info("SubscribeTask succeed|user={}|topics={}", session.getClient(), subscriptionItems);
+                }
             }
             msg.setHeader(new Header(Command.SUBSCRIBE_RESPONSE, OPStatus.SUCCESS.getCode(), OPStatus.SUCCESS.getDesc(),
                     pkg.getHeader().getSeq()));
         } catch (Exception e) {
-            MESSAGE_LOGGER.error("SubscribeTask failed|user={}|errMsg={}", session.getClient(), e);
+            LOGGER.error("SubscribeTask failed|user={}|errMsg={}", session.getClient(), e);
             msg.setHeader(new Header(Command.SUBSCRIBE_RESPONSE, OPStatus.FAIL.getCode(), e.toString(), pkg.getHeader()
                     .getSeq()));
         } finally {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@eventmesh.apache.org
For additional commands, e-mail: commits-help@eventmesh.apache.org