You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eventmesh.apache.org by mi...@apache.org on 2022/12/31 04:14:39 UTC
[incubator-eventmesh] branch master updated: fix issue2722
This is an automated email from the ASF dual-hosted git repository.
mikexue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-eventmesh.git
The following commit(s) were added to refs/heads/master by this push:
new 875247817 fix issue2722
new a47c1f712 Merge pull request #2746 from jonyangx/issue2722
875247817 is described below
commit 8752478173a1c0e9ffca972e85bb83b2a6753658
Author: jonyangx <ya...@gmail.com>
AuthorDate: Sat Dec 31 09:56:20 2022 +0800
fix issue2722
---
.../protocol/tcp/client/task/AbstractTask.java | 2 +-
.../protocol/tcp/client/task/SubscribeTask.java | 36 +++++++++++-----------
2 files changed, 19 insertions(+), 19 deletions(-)
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/task/AbstractTask.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/task/AbstractTask.java
index c2cb04d7a..fe98f303e 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/task/AbstractTask.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/task/AbstractTask.java
@@ -34,7 +34,7 @@ public abstract class AbstractTask implements Runnable {
protected long startTime;
protected EventMeshTCPServer eventMeshTCPServer;
- public AbstractTask(Package pkg, ChannelHandlerContext ctx, long startTime, EventMeshTCPServer eventMeshTCPServer) {
+ public AbstractTask(final Package pkg, final ChannelHandlerContext ctx, long startTime, final EventMeshTCPServer eventMeshTCPServer) {
this.eventMeshTCPServer = eventMeshTCPServer;
this.pkg = pkg;
this.ctx = ctx;
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/task/SubscribeTask.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/task/SubscribeTask.java
index 575b8395e..e3fbc927b 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/task/SubscribeTask.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/task/SubscribeTask.java
@@ -30,6 +30,7 @@ import org.apache.eventmesh.runtime.util.Utils;
import java.util.ArrayList;
import java.util.List;
+import java.util.Objects;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -40,43 +41,42 @@ public class SubscribeTask extends AbstractTask {
private static final Logger LOGGER = LoggerFactory.getLogger(SubscribeTask.class);
- private static final Logger MESSAGE_LOGGER = LoggerFactory.getLogger("message");
-
- public SubscribeTask(Package pkg, ChannelHandlerContext ctx, long startTime, EventMeshTCPServer eventMeshTCPServer) {
+ public SubscribeTask(final Package pkg, final ChannelHandlerContext ctx, long startTime, final EventMeshTCPServer eventMeshTCPServer) {
super(pkg, ctx, startTime, eventMeshTCPServer);
}
@Override
public void run() {
- long taskExecuteTime = System.currentTimeMillis();
- Package msg = new Package();
- try {
- Subscription subscriptionInfo = (Subscription) pkg.getBody();
- if (subscriptionInfo == null) {
- throw new Exception("subscriptionInfo is null");
- }
+ final long taskExecuteTime = System.currentTimeMillis();
- List<SubscriptionItem> subscriptionItems = new ArrayList<>();
- for (int i = 0; i < subscriptionInfo.getTopicList().size(); i++) {
- SubscriptionItem item = subscriptionInfo.getTopicList().get(i);
+ final Package msg = new Package();
+ try {
+ final Subscription subscriptionInfo = (Subscription) pkg.getBody();
+ Objects.requireNonNull(subscriptionInfo, "subscriptionInfo can not be null");
+ final List<SubscriptionItem> subscriptionItems = new ArrayList<>();
+ final boolean eventMeshServerSecurityEnable = eventMeshTCPServer.getEventMeshTCPConfiguration().isEventMeshServerSecurityEnable();
+ final String remoteAddr = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
+ subscriptionInfo.getTopicList().forEach(item -> {
//do acl check for receive msg
- if (eventMeshTCPServer.getEventMeshTCPConfiguration().isEventMeshServerSecurityEnable()) {
- String remoteAddr = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
+ if (eventMeshServerSecurityEnable) {
Acl.doAclCheckInTcpReceive(remoteAddr, session.getClient(), item.getTopic(),
Command.SUBSCRIBE_REQUEST.getValue());
}
subscriptionItems.add(item);
- }
+ });
+
synchronized (session) {
session.subscribe(subscriptionItems);
- MESSAGE_LOGGER.info("SubscribeTask succeed|user={}|topics={}", session.getClient(), subscriptionItems);
+ if (LOGGER.isInfoEnabled()) {
+ LOGGER.info("SubscribeTask succeed|user={}|topics={}", session.getClient(), subscriptionItems);
+ }
}
msg.setHeader(new Header(Command.SUBSCRIBE_RESPONSE, OPStatus.SUCCESS.getCode(), OPStatus.SUCCESS.getDesc(),
pkg.getHeader().getSeq()));
} catch (Exception e) {
- MESSAGE_LOGGER.error("SubscribeTask failed|user={}|errMsg={}", session.getClient(), e);
+ LOGGER.error("SubscribeTask failed|user={}|errMsg={}", session.getClient(), e);
msg.setHeader(new Header(Command.SUBSCRIBE_RESPONSE, OPStatus.FAIL.getCode(), e.toString(), pkg.getHeader()
.getSeq()));
} finally {
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@eventmesh.apache.org
For additional commands, e-mail: commits-help@eventmesh.apache.org