You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eventmesh.apache.org by ch...@apache.org on 2021/08/06 02:16:19 UTC

[incubator-eventmesh] branch develop updated: [ISSUE #484] Support access control for EventMesh (#485)

This is an automated email from the ASF dual-hosted git repository.

chenguangsheng pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/incubator-eventmesh.git


The following commit(s) were added to refs/heads/develop by this push:
     new d5ac681  [ISSUE #484] Support access control for EventMesh (#485)
d5ac681 is described below

commit d5ac681323078d86d73d7a2ecfcf9dfb561b438f
Author: lrhkobe <34...@users.noreply.github.com>
AuthorDate: Fri Aug 6 10:16:13 2021 +0800

    [ISSUE #484] Support access control for EventMesh (#485)
    
    * modify:optimize flow control in downstreaming msg
    
    * modify:optimize strategy of selecting session in downstream msg
    
    * modify:optimize msg downstream,msg store in session
    
    * modify:fix bug:not a @Sharable handler
    
    * modify:downstream broadcast msg asynchronously
    
    * modify:remove unnecessary interface in eventmesh-connector-api
    
    * modify:fix conflict
    
    * modify:add license in EventMeshAction
    
    * modify:fix ack problem
    
    * modify:fix exception handling when exception occurred in EventMeshTcpMessageDispatcher
    
    * modify:fix log print
    
    * modify:add acl module in eventmesh
    
    * modify:add eventmesh-acl-impl dependency in eventmesh-runtime
    
    * modify:refactor security module name
    
    * modify:refactor field name and config, fix unit test problem
---
 .../common/config/CommonConfiguration.java         |  26 ++---
 .../protocol/http/common/EventMeshRetCode.java     |   3 +-
 .../src/test/resources/configuration.properties    |   1 +
 eventmesh-runtime/build.gradle                     |   4 +
 eventmesh-runtime/conf/eventmesh.properties        |   6 +-
 eventmesh-runtime/conf/log4j2.xml                  |   4 +
 .../java/org/apache/eventmesh/runtime/acl/Acl.java | 115 +++++++++++++++++++++
 .../eventmesh/runtime/boot/EventMeshServer.java    |  16 +++
 .../http/processor/BatchSendMessageProcessor.java  |  27 ++++-
 .../processor/BatchSendMessageV2Processor.java     |  26 +++++
 .../http/processor/HeartBeatProcessor.java         |  24 +++++
 .../http/processor/SendAsyncMessageProcessor.java  |  25 +++++
 .../http/processor/SendSyncMessageProcessor.java   |  25 +++++
 .../http/processor/SubscribeProcessor.java         |  27 +++++
 .../protocol/tcp/client/task/HeartBeatTask.java    |   9 ++
 .../core/protocol/tcp/client/task/HelloTask.java   |   9 ++
 .../tcp/client/task/MessageTransferTask.java       |   8 ++
 .../protocol/tcp/client/task/SubscribeTask.java    |   9 ++
 .../build.gradle                                   |  25 ++++-
 .../eventmesh-security-acl}/build.gradle           |   4 +-
 .../apache/eventmesh/acl/impl/AclServiceImpl.java  |  60 +++++++++++
 .../org.apache.eventmesh.api.acl.AclService        |  10 +-
 .../eventmesh-security-api}/build.gradle           |   4 +-
 .../apache/eventmesh/api/acl/AclPropertyKeys.java  |  11 +-
 .../org/apache/eventmesh/api/acl/AclService.java   |  24 ++++-
 .../eventmesh/api/exception/AclException.java      |  12 ++-
 settings.gradle                                    |   2 +
 27 files changed, 474 insertions(+), 42 deletions(-)

diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/CommonConfiguration.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/CommonConfiguration.java
index 321ebe0..9af0024 100644
--- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/CommonConfiguration.java
+++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/CommonConfiguration.java
@@ -29,23 +29,13 @@ public class CommonConfiguration {
     public String eventMeshName = "";
     public String sysID = "5477";
     public String eventMeshConnectorPluginType = "rocketmq";
+    public String eventMeshSecurityPluginType = "security";
 
     public String namesrvAddr = "";
-    public String clientUserName = "username";
-    public String clientPass = "user@123";
-    public Integer consumeThreadMin = 2;
-    public Integer consumeThreadMax = 2;
-    public Integer consumeQueueSize = 10000;
-    public Integer pullBatchSize = 32;
-    public Integer ackWindow = 1000;
-    public Integer pubWindow = 100;
-    public long consumeTimeout = 0L;
-    public Integer pollNameServerInteval = 10 * 1000;
-    public Integer heartbeatBrokerInterval = 30 * 1000;
-    public Integer rebalanceInterval = 20 * 1000;
     public Integer eventMeshRegisterIntervalInMills = 10 * 1000;
     public Integer eventMeshFetchRegistryAddrInterval = 10 * 1000;
     public String eventMeshServerIp = null;
+    public boolean eventMeshServerSecurityEnable = false;
     protected ConfigurationWrapper configurationWrapper;
 
     public CommonConfiguration(ConfigurationWrapper configurationWrapper) {
@@ -82,6 +72,14 @@ public class CommonConfiguration {
 
             eventMeshConnectorPluginType = configurationWrapper.getProp(ConfKeys.KEYS_ENENTMESH_CONNECTOR_PLUGIN_TYPE);
             Preconditions.checkState(StringUtils.isNotEmpty(eventMeshConnectorPluginType), String.format("%s error", ConfKeys.KEYS_ENENTMESH_CONNECTOR_PLUGIN_TYPE));
+
+            String eventMeshServerAclEnableStr = configurationWrapper.getProp(ConfKeys.KEYS_EVENTMESH_SECURITY_ENABLED);
+            if (StringUtils.isNotBlank(eventMeshServerAclEnableStr)) {
+                eventMeshServerSecurityEnable = Boolean.valueOf(StringUtils.deleteWhitespace(eventMeshServerAclEnableStr));
+            }
+
+            eventMeshSecurityPluginType = configurationWrapper.getProp(ConfKeys.KEYS_ENENTMESH_SECURITY_PLUGIN_TYPE);
+            Preconditions.checkState(StringUtils.isNotEmpty(eventMeshSecurityPluginType), String.format("%s error", ConfKeys.KEYS_ENENTMESH_SECURITY_PLUGIN_TYPE));
         }
     }
 
@@ -103,5 +101,9 @@ public class CommonConfiguration {
         public static String KEYS_EVENTMESH_SERVER_FETCH_REGISTRY_ADDR_INTERVAL = "eventMesh.server.registry.fetchRegistryAddrIntervalInMills";
 
         public static String KEYS_ENENTMESH_CONNECTOR_PLUGIN_TYPE = "eventMesh.connector.plugin.type";
+
+        public static String KEYS_EVENTMESH_SECURITY_ENABLED = "eventMesh.server.security.enabled";
+
+        public static String KEYS_ENENTMESH_SECURITY_PLUGIN_TYPE = "eventMesh.security.plugin.type";
     }
 }
\ No newline at end of file
diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/http/common/EventMeshRetCode.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/http/common/EventMeshRetCode.java
index 3593e0e..7795507 100644
--- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/http/common/EventMeshRetCode.java
+++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/http/common/EventMeshRetCode.java
@@ -38,7 +38,8 @@ public enum EventMeshRetCode {
     EVENTMESH_RUNTIME_ERR(16, "eventMesh runtime err, "),
     EVENTMESH_SUBSCRIBE_ERR(17, "eventMesh subscribe err"),
     EVENTMESH_UNSUBSCRIBE_ERR(18, "eventMesh unsubscribe err"),
-    EVENTMESH_HEARTBEAT_ERR(19, "eventMesh heartbeat err");
+    EVENTMESH_HEARTBEAT_ERR(19, "eventMesh heartbeat err"),
+    EVENTMESH_ACL_ERR(20, "eventMesh acl err");
 
     private Integer retCode;
 
diff --git a/eventmesh-common/src/test/resources/configuration.properties b/eventmesh-common/src/test/resources/configuration.properties
index 76f29f2..c9425d7 100644
--- a/eventmesh-common/src/test/resources/configuration.properties
+++ b/eventmesh-common/src/test/resources/configuration.properties
@@ -22,3 +22,4 @@ eventMesh.server.cluster=value4
 eventMesh.server.name=value5
 eventMesh.server.hostIp=value6
 eventMesh.connector.plugin.type=rocketmq
+eventMesh.security.plugin.type=security
diff --git a/eventmesh-runtime/build.gradle b/eventmesh-runtime/build.gradle
index c5c141a..6a82a55 100644
--- a/eventmesh-runtime/build.gradle
+++ b/eventmesh-runtime/build.gradle
@@ -17,6 +17,10 @@
 
 dependencies {
     implementation project(":eventmesh-connector-plugin:eventmesh-connector-api")
+    implementation project(":eventmesh-security-plugin:eventmesh-security-api")
+    implementation project(":eventmesh-security-plugin:eventmesh-security-acl")
 
     testImplementation project(":eventmesh-connector-plugin:eventmesh-connector-api")
+    testImplementation project(":eventmesh-security-plugin:eventmesh-security-api")
+    testImplementation project(":eventmesh-security-plugin:eventmesh-security-acl")
 }
diff --git a/eventmesh-runtime/conf/eventmesh.properties b/eventmesh-runtime/conf/eventmesh.properties
index 927fc31..a9001be 100644
--- a/eventmesh-runtime/conf/eventmesh.properties
+++ b/eventmesh-runtime/conf/eventmesh.properties
@@ -56,4 +56,8 @@ eventMesh.server.registry.fetchRegistryAddrIntervalInMills=20000
 #eventMesh.server.defibus.client.comsumeTimeoutInMin=5
 
 #connector plugin
-eventMesh.connector.plugin.type=rocketmq
\ No newline at end of file
+eventMesh.connector.plugin.type=rocketmq
+
+#security plugin
+eventMesh.server.security.enabled=false
+eventMesh.security.plugin.type=security
\ No newline at end of file
diff --git a/eventmesh-runtime/conf/log4j2.xml b/eventmesh-runtime/conf/log4j2.xml
index 8495181..756abea 100644
--- a/eventmesh-runtime/conf/log4j2.xml
+++ b/eventmesh-runtime/conf/log4j2.xml
@@ -64,6 +64,10 @@
             <AppenderRef ref="console"/>
         </AsyncLogger>
 
+        <AsyncLogger name="acl" level="debug" additivity="false" includeLocation="true">
+            <AppenderRef ref="console"/>
+        </AsyncLogger>
+
         <AsyncLogger name="org.apache.eventmesh.runtime" level="debug" additivity="false" includeLocation="true">
             <AppenderRef ref="console"/>
         </AsyncLogger>
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/acl/Acl.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/acl/Acl.java
new file mode 100644
index 0000000..cb773f4
--- /dev/null
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/acl/Acl.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eventmesh.runtime.acl;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.eventmesh.api.acl.AclPropertyKeys;
+import org.apache.eventmesh.api.acl.AclService;
+import org.apache.eventmesh.api.exception.AclException;
+import org.apache.eventmesh.api.producer.MeshMQProducer;
+import org.apache.eventmesh.common.protocol.tcp.UserAgent;
+import org.apache.eventmesh.spi.EventMeshExtensionFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Properties;
+import java.util.ServiceLoader;
+
+public class Acl {
+    private static final Logger logger = LoggerFactory.getLogger(Acl.class);
+    private static AclService aclService;
+
+    public void init(String aclPluginType) throws AclException{
+//        aclService = getSpiAclService();
+        aclService = EventMeshExtensionFactory.getExtension(AclService.class, aclPluginType);
+        if (aclService == null) {
+            logger.error("can't load the aclService plugin, please check.");
+            throw new RuntimeException("doesn't load the aclService plugin, please check.");
+        }
+        aclService.init();
+    }
+
+    public void start() throws AclException{
+        aclService.start();
+    }
+
+    public void shutdown() throws AclException{
+        aclService.shutdown();
+    }
+
+    public static void doAclCheckInTcpConnect(String remoteAddr, UserAgent userAgent, int requestCode) throws AclException{
+        aclService.doAclCheckInConnect(buildTcpAclProperties(remoteAddr, userAgent, null, requestCode));
+    }
+
+    public static void doAclCheckInTcpHeartbeat(String remoteAddr, UserAgent userAgent, int requestCode) throws AclException{
+        aclService.doAclCheckInHeartbeat(buildTcpAclProperties(remoteAddr, userAgent, null, requestCode));
+    }
+
+    public static void doAclCheckInTcpSend(String remoteAddr, UserAgent userAgent, String topic, int requestCode) throws AclException{
+        aclService.doAclCheckInSend(buildTcpAclProperties(remoteAddr, userAgent, topic, requestCode));
+    }
+
+    public static void doAclCheckInTcpReceive(String remoteAddr, UserAgent userAgent, String topic, int requestCode) throws AclException{
+        aclService.doAclCheckInReceive(buildTcpAclProperties(remoteAddr, userAgent, topic, requestCode));
+    }
+
+    private static Properties buildTcpAclProperties(String remoteAddr, UserAgent userAgent, String topic, int requestCode){
+        Properties aclProperties = new Properties();
+        aclProperties.put(AclPropertyKeys.CLIENT_IP, remoteAddr);
+        aclProperties.put(AclPropertyKeys.USER, userAgent.getUsername());
+        aclProperties.put(AclPropertyKeys.PASSWORD, userAgent.getPassword());
+        aclProperties.put(AclPropertyKeys.SUBSYSTEM, userAgent.getSubsystem());
+        aclProperties.put(AclPropertyKeys.REQUEST_CODE, requestCode);
+        if(StringUtils.isNotBlank(topic)) {
+            aclProperties.put(AclPropertyKeys.TOPIC, topic);
+        }
+        return aclProperties;
+    }
+
+    public static void doAclCheckInHttpSend(String remoteAddr, String user, String pass,String subsystem, String topic, int requestCode) throws AclException{
+        aclService.doAclCheckInSend(buildHttpAclProperties(remoteAddr, user, pass, subsystem, topic, requestCode));
+    }
+
+    public static void doAclCheckInHttpReceive(String remoteAddr, String user, String pass,String subsystem, String topic, int requestCode) throws AclException{
+        aclService.doAclCheckInReceive(buildHttpAclProperties(remoteAddr, user, pass, subsystem, topic, requestCode));
+    }
+
+    public static void doAclCheckInHttpHeartbeat(String remoteAddr, String user, String pass,String subsystem, String topic, int requestCode) throws AclException{
+        aclService.doAclCheckInHeartbeat(buildHttpAclProperties(remoteAddr, user, pass, subsystem, topic, requestCode));
+    }
+
+    private static Properties buildHttpAclProperties(String remoteAddr, String user, String pass,String subsystem, String topic, int requestCode){
+        Properties aclProperties = new Properties();
+        aclProperties.put(AclPropertyKeys.CLIENT_IP, remoteAddr);
+        aclProperties.put(AclPropertyKeys.USER, user);
+        aclProperties.put(AclPropertyKeys.PASSWORD, pass);
+        aclProperties.put(AclPropertyKeys.SUBSYSTEM, subsystem);
+        aclProperties.put(AclPropertyKeys.REQUEST_CODE, requestCode);
+        if(StringUtils.isNotBlank(topic)) {
+            aclProperties.put(AclPropertyKeys.TOPIC, topic);
+        }
+        return aclProperties;
+    }
+
+    private AclService getSpiAclService() {
+        ServiceLoader<AclService> serviceLoader = ServiceLoader.load(AclService.class);
+        if (serviceLoader.iterator().hasNext()) {
+            return serviceLoader.iterator().next();
+        }
+        return null;
+    }
+}
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshServer.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshServer.java
index 92684d0..615731d 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshServer.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshServer.java
@@ -17,6 +17,7 @@
 
 package org.apache.eventmesh.runtime.boot;
 
+import org.apache.eventmesh.runtime.acl.Acl;
 import org.apache.eventmesh.runtime.common.ServiceState;
 import org.apache.eventmesh.runtime.configuration.EventMeshHTTPConfiguration;
 import org.apache.eventmesh.runtime.configuration.EventMeshTCPConfiguration;
@@ -36,15 +37,22 @@ public class EventMeshServer {
 
     private EventMeshTCPConfiguration eventMeshTCPConfiguration;
 
+    private Acl acl;
+
     private ServiceState serviceState;
 
     public EventMeshServer(EventMeshHTTPConfiguration eventMeshHttpConfiguration,
                            EventMeshTCPConfiguration eventMeshTCPConfiguration) {
         this.eventMeshHttpConfiguration = eventMeshHttpConfiguration;
         this.eventMeshTCPConfiguration = eventMeshTCPConfiguration;
+        this.acl = new Acl();
     }
 
     public void init() throws Exception {
+        if(eventMeshHttpConfiguration != null && eventMeshHttpConfiguration.eventMeshServerSecurityEnable){
+            acl.init(eventMeshHttpConfiguration.eventMeshSecurityPluginType);
+        }
+
         eventMeshHTTPServer = new EventMeshHTTPServer(this, eventMeshHttpConfiguration);
         eventMeshHTTPServer.init();
         eventMeshTCPServer = new EventMeshTCPServer(this, eventMeshTCPConfiguration);
@@ -60,6 +68,10 @@ public class EventMeshServer {
     }
 
     public void start() throws Exception {
+        if(eventMeshHttpConfiguration != null && eventMeshHttpConfiguration.eventMeshServerSecurityEnable){
+            acl.start();
+        }
+
         eventMeshHTTPServer.start();
         if (eventMeshTCPConfiguration != null && eventMeshTCPConfiguration.eventMeshTcpServerEnabled) {
             eventMeshTCPServer.start();
@@ -75,6 +87,10 @@ public class EventMeshServer {
         if (eventMeshTCPConfiguration != null && eventMeshTCPConfiguration.eventMeshTcpServerEnabled) {
             eventMeshTCPServer.shutdown();
         }
+
+        if(eventMeshHttpConfiguration != null && eventMeshHttpConfiguration.eventMeshServerSecurityEnable){
+            acl.shutdown();
+        }
         serviceState = ServiceState.STOPED;
         logger.info("server state:{}", serviceState);
     }
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/BatchSendMessageProcessor.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/BatchSendMessageProcessor.java
index dacb22f..cc7ea48 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/BatchSendMessageProcessor.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/BatchSendMessageProcessor.java
@@ -36,17 +36,18 @@ import org.apache.eventmesh.common.IPUtil;
 import org.apache.eventmesh.common.command.HttpCommand;
 import org.apache.eventmesh.common.protocol.http.body.message.SendMessageBatchRequestBody;
 import org.apache.eventmesh.common.protocol.http.body.message.SendMessageBatchResponseBody;
+import org.apache.eventmesh.common.protocol.http.body.message.SendMessageResponseBody;
 import org.apache.eventmesh.common.protocol.http.common.EventMeshRetCode;
 import org.apache.eventmesh.common.protocol.http.common.RequestCode;
 import org.apache.eventmesh.common.protocol.http.header.message.SendMessageBatchRequestHeader;
 import org.apache.eventmesh.common.protocol.http.header.message.SendMessageBatchResponseHeader;
+import org.apache.eventmesh.runtime.acl.Acl;
 import org.apache.eventmesh.runtime.boot.EventMeshHTTPServer;
 import org.apache.eventmesh.runtime.constants.EventMeshConstants;
 import org.apache.eventmesh.runtime.core.protocol.http.async.AsyncContext;
 import org.apache.eventmesh.runtime.core.protocol.http.processor.inf.HttpRequestProcessor;
 import org.apache.eventmesh.runtime.core.protocol.http.producer.EventMeshProducer;
 import org.apache.eventmesh.runtime.core.protocol.http.producer.SendMessageContext;
-import org.apache.eventmesh.runtime.util.EventMeshUtil;
 import org.apache.eventmesh.runtime.util.RemotingHelper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -55,6 +56,8 @@ public class BatchSendMessageProcessor implements HttpRequestProcessor {
 
     public Logger cmdLogger = LoggerFactory.getLogger("cmd");
 
+    public Logger aclLogger = LoggerFactory.getLogger("acl");
+
     private EventMeshHTTPServer eventMeshHTTPServer;
 
     public BatchSendMessageProcessor(EventMeshHTTPServer eventMeshHTTPServer) {
@@ -128,6 +131,12 @@ public class BatchSendMessageProcessor implements HttpRequestProcessor {
 
         long batchStartTime = System.currentTimeMillis();
 
+        String remoteAddr = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
+        String user = sendMessageBatchRequestHeader.getUsername();
+        String pass = sendMessageBatchRequestHeader.getPasswd();
+        String subsystem = sendMessageBatchRequestHeader.getSys();
+        int requestCode = Integer.valueOf(sendMessageBatchRequestHeader.getCode());
+
         List<Message> msgList = new ArrayList<>();
         Map<String, List<Message>> topicBatchMessageMappings = new ConcurrentHashMap<String, List<Message>>();
         for (SendMessageBatchRequestBody.BatchMessageEntity msg : sendMessageBatchRequestBody.getContents()) {
@@ -136,6 +145,22 @@ public class BatchSendMessageProcessor implements HttpRequestProcessor {
                 continue;
             }
 
+            //do acl check
+            if(eventMeshHTTPServer.getEventMeshHttpConfiguration().eventMeshServerSecurityEnable) {
+                try {
+                    Acl.doAclCheckInHttpSend(remoteAddr, user, pass, subsystem, msg.topic, requestCode);
+                }catch (Exception e){
+                    //String errorMsg = String.format("CLIENT HAS NO PERMISSION,send failed, topic:%s, subsys:%s, realIp:%s", topic, subsys, realIp);
+
+                    responseEventMeshCommand = asyncContext.getRequest().createHttpCommandResponse(
+                            sendMessageBatchResponseHeader,
+                            SendMessageResponseBody.buildBody(EventMeshRetCode.EVENTMESH_ACL_ERR.getRetCode(), e.getMessage()));
+                    asyncContext.onComplete(responseEventMeshCommand);
+                    aclLogger.warn("CLIENT HAS NO PERMISSION,BatchSendMessageProcessor send failed", e);
+                    return;
+                }
+            }
+
             if (StringUtils.isBlank(msg.ttl) || !StringUtils.isNumeric(msg.ttl)) {
                 msg.ttl = String.valueOf(EventMeshConstants.DEFAULT_MSG_TTL_MILLS);
             }
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/BatchSendMessageV2Processor.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/BatchSendMessageV2Processor.java
index 8d5f7b5..07a2dbe 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/BatchSendMessageV2Processor.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/BatchSendMessageV2Processor.java
@@ -31,10 +31,12 @@ import org.apache.eventmesh.common.IPUtil;
 import org.apache.eventmesh.common.command.HttpCommand;
 import org.apache.eventmesh.common.protocol.http.body.message.SendMessageBatchV2RequestBody;
 import org.apache.eventmesh.common.protocol.http.body.message.SendMessageBatchV2ResponseBody;
+import org.apache.eventmesh.common.protocol.http.body.message.SendMessageResponseBody;
 import org.apache.eventmesh.common.protocol.http.common.EventMeshRetCode;
 import org.apache.eventmesh.common.protocol.http.common.RequestCode;
 import org.apache.eventmesh.common.protocol.http.header.message.SendMessageBatchV2RequestHeader;
 import org.apache.eventmesh.common.protocol.http.header.message.SendMessageBatchV2ResponseHeader;
+import org.apache.eventmesh.runtime.acl.Acl;
 import org.apache.eventmesh.runtime.boot.EventMeshHTTPServer;
 import org.apache.eventmesh.runtime.constants.EventMeshConstants;
 import org.apache.eventmesh.runtime.core.protocol.http.async.AsyncContext;
@@ -50,6 +52,8 @@ public class BatchSendMessageV2Processor implements HttpRequestProcessor {
 
     public Logger cmdLogger = LoggerFactory.getLogger("cmd");
 
+    public Logger aclLogger = LoggerFactory.getLogger("acl");
+
     private EventMeshHTTPServer eventMeshHTTPServer;
 
     public BatchSendMessageV2Processor(EventMeshHTTPServer eventMeshHTTPServer) {
@@ -96,6 +100,28 @@ public class BatchSendMessageV2Processor implements HttpRequestProcessor {
             return;
         }
 
+        //do acl check
+        if(eventMeshHTTPServer.getEventMeshHttpConfiguration().eventMeshServerSecurityEnable) {
+            String remoteAddr = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
+            String user = sendMessageBatchV2RequestHeader.getUsername();
+            String pass = sendMessageBatchV2RequestHeader.getPasswd();
+            String subsystem = sendMessageBatchV2RequestHeader.getSys();
+            int requestCode = Integer.valueOf(sendMessageBatchV2RequestHeader.getCode());
+            String topic = sendMessageBatchV2RequestBody.getTopic();
+            try {
+                Acl.doAclCheckInHttpSend(remoteAddr, user, pass, subsystem, topic, requestCode);
+            }catch (Exception e){
+                //String errorMsg = String.format("CLIENT HAS NO PERMISSION,send failed, topic:%s, subsys:%s, realIp:%s", topic, subsys, realIp);
+
+                responseEventMeshCommand = asyncContext.getRequest().createHttpCommandResponse(
+                        sendMessageBatchV2ResponseHeader,
+                        SendMessageResponseBody.buildBody(EventMeshRetCode.EVENTMESH_ACL_ERR.getRetCode(), e.getMessage()));
+                asyncContext.onComplete(responseEventMeshCommand);
+                aclLogger.warn("CLIENT HAS NO PERMISSION,BatchSendMessageV2Processor send failed", e);
+                return;
+            }
+        }
+
         if (!eventMeshHTTPServer.getEventMeshHttpConfiguration().eventMeshServerBatchMsgNumLimiter
                 .tryAcquire(EventMeshConstants.DEFAULT_FASTFAIL_TIMEOUT_IN_MILLISECONDS, TimeUnit.MILLISECONDS)) {
             responseEventMeshCommand = asyncContext.getRequest().createHttpCommandResponse(
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/HeartBeatProcessor.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/HeartBeatProcessor.java
index 9e4c41e..e0341cf 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/HeartBeatProcessor.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/HeartBeatProcessor.java
@@ -31,10 +31,12 @@ import org.apache.eventmesh.common.IPUtil;
 import org.apache.eventmesh.common.command.HttpCommand;
 import org.apache.eventmesh.common.protocol.http.body.client.HeartbeatRequestBody;
 import org.apache.eventmesh.common.protocol.http.body.client.HeartbeatResponseBody;
+import org.apache.eventmesh.common.protocol.http.body.message.SendMessageResponseBody;
 import org.apache.eventmesh.common.protocol.http.common.EventMeshRetCode;
 import org.apache.eventmesh.common.protocol.http.common.RequestCode;
 import org.apache.eventmesh.common.protocol.http.header.client.HeartbeatRequestHeader;
 import org.apache.eventmesh.common.protocol.http.header.client.HeartbeatResponseHeader;
+import org.apache.eventmesh.runtime.acl.Acl;
 import org.apache.eventmesh.runtime.boot.EventMeshHTTPServer;
 import org.apache.eventmesh.runtime.constants.EventMeshConstants;
 import org.apache.eventmesh.runtime.core.protocol.http.async.AsyncContext;
@@ -50,6 +52,8 @@ public class HeartBeatProcessor implements HttpRequestProcessor {
 
     public Logger httpLogger = LoggerFactory.getLogger("http");
 
+    public Logger aclLogger = LoggerFactory.getLogger("acl");
+
     private EventMeshHTTPServer eventMeshHTTPServer;
 
     public HeartBeatProcessor(EventMeshHTTPServer eventMeshHTTPServer) {
@@ -121,6 +125,26 @@ public class HeartBeatProcessor implements HttpRequestProcessor {
                 continue;
             }
 
+            //do acl check
+            if(eventMeshHTTPServer.getEventMeshHttpConfiguration().eventMeshServerSecurityEnable) {
+                String remoteAddr = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
+                String user = heartbeatRequestHeader.getUsername();
+                String pass = heartbeatRequestHeader.getPasswd();
+                int requestCode = Integer.valueOf(heartbeatRequestHeader.getCode());
+                try {
+                    Acl.doAclCheckInHttpHeartbeat(remoteAddr, user, pass, sys, topic, requestCode);
+                } catch (Exception e) {
+                    //String errorMsg = String.format("CLIENT HAS NO PERMISSION,send failed, topic:%s, subsys:%s, realIp:%s", topic, subsys, realIp);
+
+                    responseEventMeshCommand = asyncContext.getRequest().createHttpCommandResponse(
+                            heartbeatResponseHeader,
+                            SendMessageResponseBody.buildBody(EventMeshRetCode.EVENTMESH_ACL_ERR.getRetCode(), e.getMessage()));
+                    asyncContext.onComplete(responseEventMeshCommand);
+                    aclLogger.warn("CLIENT HAS NO PERMISSION,HeartBeatProcessor subscribe failed", e);
+                    return;
+                }
+            }
+
             if (StringUtils.isBlank(client.url)) {
                 continue;
             }
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SendAsyncMessageProcessor.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SendAsyncMessageProcessor.java
index cd3d4be..b79b587 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SendAsyncMessageProcessor.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SendAsyncMessageProcessor.java
@@ -34,6 +34,7 @@ import org.apache.eventmesh.common.protocol.http.common.EventMeshRetCode;
 import org.apache.eventmesh.common.protocol.http.common.RequestCode;
 import org.apache.eventmesh.common.protocol.http.header.message.SendMessageRequestHeader;
 import org.apache.eventmesh.common.protocol.http.header.message.SendMessageResponseHeader;
+import org.apache.eventmesh.runtime.acl.Acl;
 import org.apache.eventmesh.runtime.boot.EventMeshHTTPServer;
 import org.apache.eventmesh.runtime.constants.EventMeshConstants;
 import org.apache.eventmesh.runtime.core.protocol.http.async.AsyncContext;
@@ -54,6 +55,8 @@ public class SendAsyncMessageProcessor implements HttpRequestProcessor {
 
     public Logger cmdLogger = LoggerFactory.getLogger("cmd");
 
+    public Logger aclLogger = LoggerFactory.getLogger("acl");
+
     private EventMeshHTTPServer eventMeshHTTPServer;
 
     public SendAsyncMessageProcessor(EventMeshHTTPServer eventMeshHTTPServer) {
@@ -104,6 +107,28 @@ public class SendAsyncMessageProcessor implements HttpRequestProcessor {
             return;
         }
 
+        //do acl check
+        if(eventMeshHTTPServer.getEventMeshHttpConfiguration().eventMeshServerSecurityEnable) {
+            String remoteAddr = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
+            String user = sendMessageRequestHeader.getUsername();
+            String pass = sendMessageRequestHeader.getPasswd();
+            String subsystem = sendMessageRequestHeader.getSys();
+            int requestCode = Integer.valueOf(sendMessageRequestHeader.getCode());
+            String topic = sendMessageRequestBody.getTopic();
+            try {
+                Acl.doAclCheckInHttpSend(remoteAddr, user, pass, subsystem, topic, requestCode);
+            }catch (Exception e){
+                //String errorMsg = String.format("CLIENT HAS NO PERMISSION,send failed, topic:%s, subsys:%s, realIp:%s", topic, subsys, realIp);
+
+                responseEventMeshCommand = asyncContext.getRequest().createHttpCommandResponse(
+                        sendMessageResponseHeader,
+                        SendMessageResponseBody.buildBody(EventMeshRetCode.EVENTMESH_ACL_ERR.getRetCode(), e.getMessage()));
+                asyncContext.onComplete(responseEventMeshCommand);
+                aclLogger.warn("CLIENT HAS NO PERMISSION,SendAsyncMessageProcessor send failed", e);
+                return;
+            }
+        }
+
         String producerGroup = sendMessageRequestBody.getProducerGroup();
         EventMeshProducer eventMeshProducer = eventMeshHTTPServer.getProducerManager().getEventMeshProducer(producerGroup);
 
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SendSyncMessageProcessor.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SendSyncMessageProcessor.java
index 5511e38..136961a 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SendSyncMessageProcessor.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SendSyncMessageProcessor.java
@@ -37,6 +37,7 @@ import org.apache.eventmesh.common.protocol.http.common.EventMeshRetCode;
 import org.apache.eventmesh.common.protocol.http.common.RequestCode;
 import org.apache.eventmesh.common.protocol.http.header.message.SendMessageRequestHeader;
 import org.apache.eventmesh.common.protocol.http.header.message.SendMessageResponseHeader;
+import org.apache.eventmesh.runtime.acl.Acl;
 import org.apache.eventmesh.runtime.boot.EventMeshHTTPServer;
 import org.apache.eventmesh.runtime.constants.EventMeshConstants;
 import org.apache.eventmesh.runtime.core.protocol.http.async.AsyncContext;
@@ -58,6 +59,8 @@ public class SendSyncMessageProcessor implements HttpRequestProcessor {
 
     public Logger httpLogger = LoggerFactory.getLogger("http");
 
+    public Logger aclLogger = LoggerFactory.getLogger("acl");
+
     private EventMeshHTTPServer eventMeshHTTPServer;
 
     public SendSyncMessageProcessor(EventMeshHTTPServer eventMeshHTTPServer) {
@@ -105,6 +108,28 @@ public class SendSyncMessageProcessor implements HttpRequestProcessor {
             return;
         }
 
+        //do acl check
+        if(eventMeshHTTPServer.getEventMeshHttpConfiguration().eventMeshServerSecurityEnable) {
+            String remoteAddr = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
+            String user = sendMessageRequestHeader.getUsername();
+            String pass = sendMessageRequestHeader.getPasswd();
+            String subsystem = sendMessageRequestHeader.getSys();
+            int requestCode = Integer.valueOf(sendMessageRequestHeader.getCode());
+            String topic = sendMessageRequestBody.getTopic();
+            try {
+                Acl.doAclCheckInHttpSend(remoteAddr, user, pass, subsystem, topic, requestCode);
+            }catch (Exception e){
+                //String errorMsg = String.format("CLIENT HAS NO PERMISSION,send failed, topic:%s, subsys:%s, realIp:%s", topic, subsys, realIp);
+
+                responseEventMeshCommand = asyncContext.getRequest().createHttpCommandResponse(
+                        sendMessageResponseHeader,
+                        SendMessageResponseBody.buildBody(EventMeshRetCode.EVENTMESH_ACL_ERR.getRetCode(), e.getMessage()));
+                asyncContext.onComplete(responseEventMeshCommand);
+                aclLogger.warn("CLIENT HAS NO PERMISSION,SendSyncMessageProcessor send failed", e);
+                return;
+            }
+        }
+
         String producerGroup = sendMessageRequestBody.getProducerGroup();
         EventMeshProducer eventMeshProducer = eventMeshHTTPServer.getProducerManager().getEventMeshProducer(producerGroup);
 
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SubscribeProcessor.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SubscribeProcessor.java
index 1e8daf0..14e185e 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SubscribeProcessor.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SubscribeProcessor.java
@@ -33,10 +33,12 @@ import org.apache.eventmesh.common.command.HttpCommand;
 import org.apache.eventmesh.common.protocol.SubscriptionItem;
 import org.apache.eventmesh.common.protocol.http.body.client.SubscribeRequestBody;
 import org.apache.eventmesh.common.protocol.http.body.client.SubscribeResponseBody;
+import org.apache.eventmesh.common.protocol.http.body.message.SendMessageResponseBody;
 import org.apache.eventmesh.common.protocol.http.common.EventMeshRetCode;
 import org.apache.eventmesh.common.protocol.http.common.RequestCode;
 import org.apache.eventmesh.common.protocol.http.header.client.SubscribeRequestHeader;
 import org.apache.eventmesh.common.protocol.http.header.client.SubscribeResponseHeader;
+import org.apache.eventmesh.runtime.acl.Acl;
 import org.apache.eventmesh.runtime.boot.EventMeshHTTPServer;
 import org.apache.eventmesh.runtime.constants.EventMeshConstants;
 import org.apache.eventmesh.runtime.core.consumergroup.ConsumerGroupConf;
@@ -54,6 +56,8 @@ public class SubscribeProcessor implements HttpRequestProcessor {
 
     public Logger httpLogger = LoggerFactory.getLogger("http");
 
+    public Logger aclLogger = LoggerFactory.getLogger("acl");
+
     private EventMeshHTTPServer eventMeshHTTPServer;
 
     public SubscribeProcessor(EventMeshHTTPServer eventMeshHTTPServer) {
@@ -100,6 +104,29 @@ public class SubscribeProcessor implements HttpRequestProcessor {
         }
         List<SubscriptionItem> subTopicList = subscribeRequestBody.getTopics();
 
+        //do acl check
+        if(eventMeshHTTPServer.getEventMeshHttpConfiguration().eventMeshServerSecurityEnable) {
+            String remoteAddr = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
+            String user = subscribeRequestHeader.getUsername();
+            String pass = subscribeRequestHeader.getPasswd();
+            String subsystem = subscribeRequestHeader.getSys();
+            int requestCode = Integer.valueOf(subscribeRequestHeader.getCode());
+            for(SubscriptionItem item : subTopicList) {
+                try {
+                    Acl.doAclCheckInHttpReceive(remoteAddr, user, pass, subsystem, item.getTopic(), requestCode);
+                } catch (Exception e) {
+                    //String errorMsg = String.format("CLIENT HAS NO PERMISSION,send failed, topic:%s, subsys:%s, realIp:%s", topic, subsys, realIp);
+
+                    responseEventMeshCommand = asyncContext.getRequest().createHttpCommandResponse(
+                            subscribeResponseHeader,
+                            SendMessageResponseBody.buildBody(EventMeshRetCode.EVENTMESH_ACL_ERR.getRetCode(), e.getMessage()));
+                    asyncContext.onComplete(responseEventMeshCommand);
+                    aclLogger.warn("CLIENT HAS NO PERMISSION,SubscribeProcessor subscribe failed", e);
+                    return;
+                }
+            }
+        }
+
         String url = subscribeRequestBody.getUrl();
         String consumerGroup = subscribeRequestBody.getConsumerGroup();
 
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/task/HeartBeatTask.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/task/HeartBeatTask.java
index eac1354..9e16ef2 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/task/HeartBeatTask.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/task/HeartBeatTask.java
@@ -17,6 +17,7 @@
 
 package org.apache.eventmesh.runtime.core.protocol.tcp.client.task;
 
+import static org.apache.eventmesh.common.protocol.tcp.Command.HEARTBEAT_REQUEST;
 import static org.apache.eventmesh.common.protocol.tcp.Command.HEARTBEAT_RESPONSE;
 
 import io.netty.channel.ChannelHandlerContext;
@@ -24,7 +25,9 @@ import io.netty.channel.ChannelHandlerContext;
 import org.apache.eventmesh.common.protocol.tcp.Header;
 import org.apache.eventmesh.common.protocol.tcp.OPStatus;
 import org.apache.eventmesh.common.protocol.tcp.Package;
+import org.apache.eventmesh.runtime.acl.Acl;
 import org.apache.eventmesh.runtime.boot.EventMeshTCPServer;
+import org.apache.eventmesh.runtime.util.RemotingHelper;
 import org.apache.eventmesh.runtime.util.Utils;
 
 public class HeartBeatTask extends AbstractTask {
@@ -38,6 +41,12 @@ public class HeartBeatTask extends AbstractTask {
         long taskExecuteTime = System.currentTimeMillis();
         Package res = new Package();
         try {
+            //do acl check in heartbeat
+            if(eventMeshTCPServer.getEventMeshTCPConfiguration().eventMeshServerSecurityEnable){
+                String remoteAddr = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
+                Acl.doAclCheckInTcpHeartbeat(remoteAddr, session.getClient(), HEARTBEAT_REQUEST.value());
+            }
+
             if (session != null) {
                 session.notifyHeartbeat(startTime);
             }
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/task/HelloTask.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/task/HelloTask.java
index 161b1d7..df4a606 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/task/HelloTask.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/task/HelloTask.java
@@ -17,6 +17,7 @@
 
 package org.apache.eventmesh.runtime.core.protocol.tcp.client.task;
 
+import static org.apache.eventmesh.common.protocol.tcp.Command.HELLO_REQUEST;
 import static org.apache.eventmesh.common.protocol.tcp.Command.HELLO_RESPONSE;
 
 import io.netty.channel.ChannelFuture;
@@ -28,10 +29,12 @@ import org.apache.eventmesh.common.protocol.tcp.Header;
 import org.apache.eventmesh.common.protocol.tcp.OPStatus;
 import org.apache.eventmesh.common.protocol.tcp.Package;
 import org.apache.eventmesh.common.protocol.tcp.UserAgent;
+import org.apache.eventmesh.runtime.acl.Acl;
 import org.apache.eventmesh.runtime.boot.EventMeshTCPServer;
 import org.apache.eventmesh.runtime.common.ServiceState;
 import org.apache.eventmesh.runtime.constants.EventMeshConstants;
 import org.apache.eventmesh.runtime.core.protocol.tcp.client.session.Session;
+import org.apache.eventmesh.runtime.util.RemotingHelper;
 import org.apache.eventmesh.runtime.util.Utils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -51,6 +54,12 @@ public class HelloTask extends AbstractTask {
         Session session = null;
         UserAgent user = (UserAgent) pkg.getBody();
         try {
+            //do acl check in connect
+            if(eventMeshTCPServer.getEventMeshTCPConfiguration().eventMeshServerSecurityEnable){
+                String remoteAddr = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
+                Acl.doAclCheckInTcpConnect(remoteAddr, user, HELLO_REQUEST.value());
+            }
+
             if (eventMeshTCPServer.getEventMeshServer().getServiceState() != ServiceState.RUNNING) {
                 logger.error("server state is not running:{}", eventMeshTCPServer.getEventMeshServer().getServiceState());
                 throw new Exception("server state is not running, maybe deploying...");
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/task/MessageTransferTask.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/task/MessageTransferTask.java
index 7ef1b8f..87eeed0 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/task/MessageTransferTask.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/task/MessageTransferTask.java
@@ -34,11 +34,13 @@ import org.apache.eventmesh.common.protocol.tcp.EventMeshMessage;
 import org.apache.eventmesh.common.protocol.tcp.Header;
 import org.apache.eventmesh.common.protocol.tcp.OPStatus;
 import org.apache.eventmesh.common.protocol.tcp.Package;
+import org.apache.eventmesh.runtime.acl.Acl;
 import org.apache.eventmesh.runtime.boot.EventMeshTCPServer;
 import org.apache.eventmesh.runtime.constants.EventMeshConstants;
 import org.apache.eventmesh.runtime.core.protocol.tcp.client.session.send.EventMeshTcpSendResult;
 import org.apache.eventmesh.runtime.core.protocol.tcp.client.session.send.EventMeshTcpSendStatus;
 import org.apache.eventmesh.runtime.util.EventMeshUtil;
+import org.apache.eventmesh.runtime.util.RemotingHelper;
 import org.apache.eventmesh.runtime.util.Utils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -67,6 +69,12 @@ public class MessageTransferTask extends AbstractTask {
                 throw new Exception("eventMeshMessage is null");
             }
 
+            //do acl check in sending msg
+            if(eventMeshTCPServer.getEventMeshTCPConfiguration().eventMeshServerSecurityEnable){
+                String remoteAddr = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
+                Acl.doAclCheckInTcpSend(remoteAddr, session.getClient(), eventMeshMessage.getTopic(), cmd.value());
+            }
+
             if (eventMeshTCPServer.getRateLimiter().tryAcquire(TRY_PERMIT_TIME_OUT, TimeUnit.MILLISECONDS)) {
                 synchronized (session) {
                     long sendTime = System.currentTimeMillis();
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/task/SubscribeTask.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/task/SubscribeTask.java
index ccacf56..c48a881 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/task/SubscribeTask.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/task/SubscribeTask.java
@@ -26,7 +26,9 @@ import org.apache.eventmesh.common.protocol.tcp.Subscription;
 import org.apache.eventmesh.common.protocol.SubscriptionItem;
 import org.apache.eventmesh.common.protocol.tcp.*;
 import org.apache.eventmesh.common.protocol.tcp.Package;
+import org.apache.eventmesh.runtime.acl.Acl;
 import org.apache.eventmesh.runtime.boot.EventMeshTCPServer;
+import org.apache.eventmesh.runtime.util.RemotingHelper;
 import org.apache.eventmesh.runtime.util.Utils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -52,6 +54,13 @@ public class SubscribeTask extends AbstractTask {
             List<SubscriptionItem> subscriptionItems = new ArrayList<>();
             for (int i = 0; i < subscriptionInfo.getTopicList().size(); i++) {
                 SubscriptionItem item = subscriptionInfo.getTopicList().get(i);
+
+                //do acl check for receive msg
+                if(eventMeshTCPServer.getEventMeshTCPConfiguration().eventMeshServerSecurityEnable){
+                    String remoteAddr = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
+                    Acl.doAclCheckInTcpReceive(remoteAddr, session.getClient(), item.getTopic(), Command.SUBSCRIBE_REQUEST.value());
+                }
+
                 subscriptionItems.add(item);
             }
             synchronized (session) {
diff --git a/eventmesh-runtime/build.gradle b/eventmesh-security-plugin/build.gradle
similarity index 54%
copy from eventmesh-runtime/build.gradle
copy to eventmesh-security-plugin/build.gradle
index c5c141a..dd414e3 100644
--- a/eventmesh-runtime/build.gradle
+++ b/eventmesh-security-plugin/build.gradle
@@ -15,8 +15,23 @@
  * limitations under the License.
  */
 
-dependencies {
-    implementation project(":eventmesh-connector-plugin:eventmesh-connector-api")
-
-    testImplementation project(":eventmesh-connector-plugin:eventmesh-connector-api")
-}
+// Stages this project's jar under dist/apps, then publishes the ACL plugin jar to ../dist/plugin/security.
+task copyAclPlugin(dependsOn: ['jar']) {
+    doFirst {
+        // mkdirs(), not mkdir(): 'dist' may not exist yet and mkdir() would then silently fail
+        new File(projectDir, '../eventmesh-security-plugin/dist/apps').mkdirs()
+        new File(projectDir, '../dist/plugin/security').mkdirs()
+    }
+    doLast {
+        copy {
+            into('../eventmesh-security-plugin/dist/apps/')
+            from project.jar.getArchivePath()
+            // Name patterns, not a closure: exclude { ... } is a per-file predicate, and a non-empty String result is truthy
+            exclude "eventmesh-security-plugin-${version}.jar", "eventmesh-security-api-${version}.jar"
+        }
+        copy {
+            into '../dist/plugin/security'
+            from "../eventmesh-security-plugin/dist/apps/eventmesh-security-acl-${version}.jar"
+        }
+    }
+}
\ No newline at end of file
diff --git a/eventmesh-runtime/build.gradle b/eventmesh-security-plugin/eventmesh-security-acl/build.gradle
similarity index 83%
copy from eventmesh-runtime/build.gradle
copy to eventmesh-security-plugin/eventmesh-security-acl/build.gradle
index c5c141a..1a18796 100644
--- a/eventmesh-runtime/build.gradle
+++ b/eventmesh-security-plugin/eventmesh-security-acl/build.gradle
@@ -16,7 +16,7 @@
  */
 
 dependencies {
-    implementation project(":eventmesh-connector-plugin:eventmesh-connector-api")
+    api project(":eventmesh-security-plugin:eventmesh-security-api")
 
-    testImplementation project(":eventmesh-connector-plugin:eventmesh-connector-api")
+    testImplementation project(":eventmesh-security-plugin:eventmesh-security-api")
 }
diff --git a/eventmesh-security-plugin/eventmesh-security-acl/src/main/java/org/apache/eventmesh/acl/impl/AclServiceImpl.java b/eventmesh-security-plugin/eventmesh-security-acl/src/main/java/org/apache/eventmesh/acl/impl/AclServiceImpl.java
new file mode 100644
index 0000000..8d73bc7
--- /dev/null
+++ b/eventmesh-security-plugin/eventmesh-security-acl/src/main/java/org/apache/eventmesh/acl/impl/AclServiceImpl.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.acl.impl;
+
+import org.apache.eventmesh.api.acl.AclService;
+import org.apache.eventmesh.api.exception.AclException;
+
+import java.util.Properties;
+
+public class AclServiceImpl implements AclService { // placeholder implementation: every method body is still empty
+    @Override
+    public void init() throws AclException {
+        //TODO: not implemented yet; currently a no-op
+    }
+
+    @Override
+    public void start() throws AclException {
+        //TODO: not implemented yet; currently a no-op
+    }
+
+    @Override
+    public void shutdown() throws AclException {
+        //TODO: not implemented yet; currently a no-op
+    }
+
+    @Override
+    public void doAclCheckInConnect(Properties aclProperties) throws AclException {
+        //TODO: not implemented yet; never throws AclException, so nothing is rejected here
+    }
+
+    @Override
+    public void doAclCheckInHeartbeat(Properties aclProperties) throws AclException {
+        //TODO: not implemented yet; never throws AclException, so nothing is rejected here
+    }
+
+    @Override
+    public void doAclCheckInSend(Properties aclProperties) throws AclException {
+        //TODO: not implemented yet; never throws AclException, so nothing is rejected here
+    }
+
+    @Override
+    public void doAclCheckInReceive(Properties aclProperties) throws AclException {
+        //TODO: not implemented yet; never throws AclException, so nothing is rejected here
+    }
+}
diff --git a/eventmesh-common/src/test/resources/configuration.properties b/eventmesh-security-plugin/eventmesh-security-acl/src/main/resources/META-INF/eventmesh/org.apache.eventmesh.api.acl.AclService
similarity index 78%
copy from eventmesh-common/src/test/resources/configuration.properties
copy to eventmesh-security-plugin/eventmesh-security-acl/src/main/resources/META-INF/eventmesh/org.apache.eventmesh.api.acl.AclService
index 76f29f2..b883bd8 100644
--- a/eventmesh-common/src/test/resources/configuration.properties
+++ b/eventmesh-security-plugin/eventmesh-security-acl/src/main/resources/META-INF/eventmesh/org.apache.eventmesh.api.acl.AclService
@@ -1,4 +1,3 @@
-#
 # Licensed to the Apache Software Foundation (ASF) under one or more
 # contributor license agreements.  See the NOTICE file distributed with
 # this work for additional information regarding copyright ownership.
@@ -13,12 +12,5 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-#
 
-eventMesh.server.env=value1
-eventMesh.server.idc=value2
-eventMesh.sysid=3
-eventMesh.server.cluster=value4
-eventMesh.server.name=value5
-eventMesh.server.hostIp=value6
-eventMesh.connector.plugin.type=rocketmq
+acl=org.apache.eventmesh.acl.impl.AclServiceImpl
\ No newline at end of file
diff --git a/eventmesh-runtime/build.gradle b/eventmesh-security-plugin/eventmesh-security-api/build.gradle
similarity index 83%
copy from eventmesh-runtime/build.gradle
copy to eventmesh-security-plugin/eventmesh-security-api/build.gradle
index c5c141a..0d41042 100644
--- a/eventmesh-runtime/build.gradle
+++ b/eventmesh-security-plugin/eventmesh-security-api/build.gradle
@@ -16,7 +16,7 @@
  */
 
 dependencies {
-    implementation project(":eventmesh-connector-plugin:eventmesh-connector-api")
+    api project(":eventmesh-spi")
 
-    testImplementation project(":eventmesh-connector-plugin:eventmesh-connector-api")
+    testImplementation project(":eventmesh-spi")
 }
diff --git a/eventmesh-runtime/build.gradle b/eventmesh-security-plugin/eventmesh-security-api/src/main/java/org/apache/eventmesh/api/acl/AclPropertyKeys.java
similarity index 67%
copy from eventmesh-runtime/build.gradle
copy to eventmesh-security-plugin/eventmesh-security-api/src/main/java/org/apache/eventmesh/api/acl/AclPropertyKeys.java
index c5c141a..eb5514e 100644
--- a/eventmesh-runtime/build.gradle
+++ b/eventmesh-security-plugin/eventmesh-security-api/src/main/java/org/apache/eventmesh/api/acl/AclPropertyKeys.java
@@ -15,8 +15,13 @@
  * limitations under the License.
  */
 
-dependencies {
-    implementation project(":eventmesh-connector-plugin:eventmesh-connector-api")
+package org.apache.eventmesh.api.acl;
 
-    testImplementation project(":eventmesh-connector-plugin:eventmesh-connector-api")
+public class AclPropertyKeys { // String key constants; NOTE(review): presumably the keys of the Properties given to AclService checks - confirm against callers
+    public static final String CLIENT_IP = "clientIp";
+    public static final String USER = "user";
+    public static final String PASSWORD = "pwd";
+    public static final String SUBSYSTEM = "subsystem";
+    public static final String TOPIC = "topic";
+    public static final String REQUEST_CODE = "requestCode";
 }
diff --git a/eventmesh-runtime/build.gradle b/eventmesh-security-plugin/eventmesh-security-api/src/main/java/org/apache/eventmesh/api/acl/AclService.java
similarity index 54%
copy from eventmesh-runtime/build.gradle
copy to eventmesh-security-plugin/eventmesh-security-api/src/main/java/org/apache/eventmesh/api/acl/AclService.java
index c5c141a..3502e90 100644
--- a/eventmesh-runtime/build.gradle
+++ b/eventmesh-security-plugin/eventmesh-security-api/src/main/java/org/apache/eventmesh/api/acl/AclService.java
@@ -14,9 +14,27 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+package org.apache.eventmesh.api.acl;
 
-dependencies {
-    implementation project(":eventmesh-connector-plugin:eventmesh-connector-api")
+import org.apache.eventmesh.api.exception.AclException;
+import org.apache.eventmesh.spi.EventMeshSPI;
+
+import java.util.Properties;
+
+@EventMeshSPI(isSingleton = true)
+public interface AclService { // SPI contract for access-control plugins; AclException is unchecked, so the throws clauses are documentation only
+    void init() throws AclException; // lifecycle hooks: init, start, shutdown
+
+    void start() throws AclException;
+
+    void shutdown() throws AclException;
+
+    void doAclCheckInConnect(Properties aclProperties) throws AclException; // NOTE(review): a failed check is presumably signalled by throwing AclException - confirm in the runtime Acl helper
+
+    void doAclCheckInHeartbeat(Properties aclProperties) throws AclException;
+
+    void doAclCheckInSend(Properties aclProperties) throws AclException;
+
+    void doAclCheckInReceive(Properties aclProperties) throws AclException;
 
-    testImplementation project(":eventmesh-connector-plugin:eventmesh-connector-api")
 }
diff --git a/eventmesh-runtime/build.gradle b/eventmesh-security-plugin/eventmesh-security-api/src/main/java/org/apache/eventmesh/api/exception/AclException.java
similarity index 75%
copy from eventmesh-runtime/build.gradle
copy to eventmesh-security-plugin/eventmesh-security-api/src/main/java/org/apache/eventmesh/api/exception/AclException.java
index c5c141a..389bf8a 100644
--- a/eventmesh-runtime/build.gradle
+++ b/eventmesh-security-plugin/eventmesh-security-api/src/main/java/org/apache/eventmesh/api/exception/AclException.java
@@ -14,9 +14,15 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+package org.apache.eventmesh.api.exception;
 
-dependencies {
-    implementation project(":eventmesh-connector-plugin:eventmesh-connector-api")
+public class AclException extends RuntimeException { // unchecked exception type declared by the AclService methods
 
-    testImplementation project(":eventmesh-connector-plugin:eventmesh-connector-api")
+    public AclException(String message) { // message only, no underlying cause
+        super(message);
+    }
+
+    public AclException(String message, Throwable cause) { // keeps the underlying cause attached
+        super(message, cause);
+    }
 }
diff --git a/settings.gradle b/settings.gradle
index 7c0e70a..145df7e 100644
--- a/settings.gradle
+++ b/settings.gradle
@@ -25,4 +25,6 @@ include 'eventmesh-test'
 include 'eventmesh-spi'
 include 'eventmesh-connector-plugin:eventmesh-connector-api'
 include 'eventmesh-connector-plugin:eventmesh-connector-rocketmq'
+include 'eventmesh-security-plugin:eventmesh-security-api'
+include 'eventmesh-security-plugin:eventmesh-security-acl'
 

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@eventmesh.apache.org
For additional commands, e-mail: commits-help@eventmesh.apache.org