You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eventmesh.apache.org by ch...@apache.org on 2021/08/06 02:16:19 UTC
[incubator-eventmesh] branch develop updated: [ISSUE #484] Support
access control for EventMesh (#485)
This is an automated email from the ASF dual-hosted git repository.
chenguangsheng pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/incubator-eventmesh.git
The following commit(s) were added to refs/heads/develop by this push:
new d5ac681 [ISSUE #484] Support access control for EventMesh (#485)
d5ac681 is described below
commit d5ac681323078d86d73d7a2ecfcf9dfb561b438f
Author: lrhkobe <34...@users.noreply.github.com>
AuthorDate: Fri Aug 6 10:16:13 2021 +0800
[ISSUE #484] Support access control for EventMesh (#485)
* modify:optimize flow control in downstreaming msg
* modify:optimize stategy of selecting session in downstream msg
* modify:optimize msg downstream,msg store in session
* modify:fix bug:not a @Sharable handler
* modify:downstream broadcast msg asynchronously
* modify:remove unneccessary interface in eventmesh-connector-api
* modify:fix conflict
* modify:add license in EventMeshAction
* modify:fix ack problem
* modify:fix exception handle when exception occured in EventMeshTcpMessageDispatcher
* modify:fix log print
* modify:add acl module in eventmesh
* modify:add eventmesh-acl-impl denpendency in eventmesh-runtime
* modify:reactor security module name
* modify:reactor filed name and config,fix unit test problem
---
.../common/config/CommonConfiguration.java | 26 ++---
.../protocol/http/common/EventMeshRetCode.java | 3 +-
.../src/test/resources/configuration.properties | 1 +
eventmesh-runtime/build.gradle | 4 +
eventmesh-runtime/conf/eventmesh.properties | 6 +-
eventmesh-runtime/conf/log4j2.xml | 4 +
.../java/org/apache/eventmesh/runtime/acl/Acl.java | 115 +++++++++++++++++++++
.../eventmesh/runtime/boot/EventMeshServer.java | 16 +++
.../http/processor/BatchSendMessageProcessor.java | 27 ++++-
.../processor/BatchSendMessageV2Processor.java | 26 +++++
.../http/processor/HeartBeatProcessor.java | 24 +++++
.../http/processor/SendAsyncMessageProcessor.java | 25 +++++
.../http/processor/SendSyncMessageProcessor.java | 25 +++++
.../http/processor/SubscribeProcessor.java | 27 +++++
.../protocol/tcp/client/task/HeartBeatTask.java | 9 ++
.../core/protocol/tcp/client/task/HelloTask.java | 9 ++
.../tcp/client/task/MessageTransferTask.java | 8 ++
.../protocol/tcp/client/task/SubscribeTask.java | 9 ++
.../build.gradle | 25 ++++-
.../eventmesh-security-acl}/build.gradle | 4 +-
.../apache/eventmesh/acl/impl/AclServiceImpl.java | 60 +++++++++++
.../org.apache.eventmesh.api.acl.AclService | 10 +-
.../eventmesh-security-api}/build.gradle | 4 +-
.../apache/eventmesh/api/acl/AclPropertyKeys.java | 11 +-
.../org/apache/eventmesh/api/acl/AclService.java | 24 ++++-
.../eventmesh/api/exception/AclException.java | 12 ++-
settings.gradle | 2 +
27 files changed, 474 insertions(+), 42 deletions(-)
diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/CommonConfiguration.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/CommonConfiguration.java
index 321ebe0..9af0024 100644
--- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/CommonConfiguration.java
+++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/CommonConfiguration.java
@@ -29,23 +29,13 @@ public class CommonConfiguration {
public String eventMeshName = "";
public String sysID = "5477";
public String eventMeshConnectorPluginType = "rocketmq";
+ public String eventMeshSecurityPluginType = "security";
public String namesrvAddr = "";
- public String clientUserName = "username";
- public String clientPass = "user@123";
- public Integer consumeThreadMin = 2;
- public Integer consumeThreadMax = 2;
- public Integer consumeQueueSize = 10000;
- public Integer pullBatchSize = 32;
- public Integer ackWindow = 1000;
- public Integer pubWindow = 100;
- public long consumeTimeout = 0L;
- public Integer pollNameServerInteval = 10 * 1000;
- public Integer heartbeatBrokerInterval = 30 * 1000;
- public Integer rebalanceInterval = 20 * 1000;
public Integer eventMeshRegisterIntervalInMills = 10 * 1000;
public Integer eventMeshFetchRegistryAddrInterval = 10 * 1000;
public String eventMeshServerIp = null;
+ public boolean eventMeshServerSecurityEnable = false;
protected ConfigurationWrapper configurationWrapper;
public CommonConfiguration(ConfigurationWrapper configurationWrapper) {
@@ -82,6 +72,14 @@ public class CommonConfiguration {
eventMeshConnectorPluginType = configurationWrapper.getProp(ConfKeys.KEYS_ENENTMESH_CONNECTOR_PLUGIN_TYPE);
Preconditions.checkState(StringUtils.isNotEmpty(eventMeshConnectorPluginType), String.format("%s error", ConfKeys.KEYS_ENENTMESH_CONNECTOR_PLUGIN_TYPE));
+
+ String eventMeshServerAclEnableStr = configurationWrapper.getProp(ConfKeys.KEYS_EVENTMESH_SECURITY_ENABLED);
+ if (StringUtils.isNotBlank(eventMeshServerAclEnableStr)) {
+ eventMeshServerSecurityEnable = Boolean.valueOf(StringUtils.deleteWhitespace(eventMeshServerAclEnableStr));
+ }
+
+ eventMeshSecurityPluginType = configurationWrapper.getProp(ConfKeys.KEYS_ENENTMESH_SECURITY_PLUGIN_TYPE);
+ Preconditions.checkState(StringUtils.isNotEmpty(eventMeshSecurityPluginType), String.format("%s error", ConfKeys.KEYS_ENENTMESH_SECURITY_PLUGIN_TYPE));
}
}
@@ -103,5 +101,9 @@ public class CommonConfiguration {
public static String KEYS_EVENTMESH_SERVER_FETCH_REGISTRY_ADDR_INTERVAL = "eventMesh.server.registry.fetchRegistryAddrIntervalInMills";
public static String KEYS_ENENTMESH_CONNECTOR_PLUGIN_TYPE = "eventMesh.connector.plugin.type";
+
+ public static String KEYS_EVENTMESH_SECURITY_ENABLED = "eventMesh.server.security.enabled";
+
+ public static String KEYS_ENENTMESH_SECURITY_PLUGIN_TYPE = "eventMesh.security.plugin.type";
}
}
\ No newline at end of file
diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/http/common/EventMeshRetCode.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/http/common/EventMeshRetCode.java
index 3593e0e..7795507 100644
--- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/http/common/EventMeshRetCode.java
+++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/http/common/EventMeshRetCode.java
@@ -38,7 +38,8 @@ public enum EventMeshRetCode {
EVENTMESH_RUNTIME_ERR(16, "eventMesh runtime err, "),
EVENTMESH_SUBSCRIBE_ERR(17, "eventMesh subscribe err"),
EVENTMESH_UNSUBSCRIBE_ERR(18, "eventMesh unsubscribe err"),
- EVENTMESH_HEARTBEAT_ERR(19, "eventMesh heartbeat err");
+ EVENTMESH_HEARTBEAT_ERR(19, "eventMesh heartbeat err"),
+ EVENTMESH_ACL_ERR(20, "eventMesh acl err");
private Integer retCode;
diff --git a/eventmesh-common/src/test/resources/configuration.properties b/eventmesh-common/src/test/resources/configuration.properties
index 76f29f2..c9425d7 100644
--- a/eventmesh-common/src/test/resources/configuration.properties
+++ b/eventmesh-common/src/test/resources/configuration.properties
@@ -22,3 +22,4 @@ eventMesh.server.cluster=value4
eventMesh.server.name=value5
eventMesh.server.hostIp=value6
eventMesh.connector.plugin.type=rocketmq
+eventMesh.security.plugin.type=security
diff --git a/eventmesh-runtime/build.gradle b/eventmesh-runtime/build.gradle
index c5c141a..6a82a55 100644
--- a/eventmesh-runtime/build.gradle
+++ b/eventmesh-runtime/build.gradle
@@ -17,6 +17,10 @@
dependencies {
implementation project(":eventmesh-connector-plugin:eventmesh-connector-api")
+ implementation project(":eventmesh-security-plugin:eventmesh-security-api")
+ implementation project(":eventmesh-security-plugin:eventmesh-security-acl")
testImplementation project(":eventmesh-connector-plugin:eventmesh-connector-api")
+ testImplementation project(":eventmesh-security-plugin:eventmesh-security-api")
+ testImplementation project(":eventmesh-security-plugin:eventmesh-security-acl")
}
diff --git a/eventmesh-runtime/conf/eventmesh.properties b/eventmesh-runtime/conf/eventmesh.properties
index 927fc31..a9001be 100644
--- a/eventmesh-runtime/conf/eventmesh.properties
+++ b/eventmesh-runtime/conf/eventmesh.properties
@@ -56,4 +56,8 @@ eventMesh.server.registry.fetchRegistryAddrIntervalInMills=20000
#eventMesh.server.defibus.client.comsumeTimeoutInMin=5
#connector plugin
-eventMesh.connector.plugin.type=rocketmq
\ No newline at end of file
+eventMesh.connector.plugin.type=rocketmq
+
+#security plugin
+eventMesh.server.security.enabled=false
+eventMesh.security.plugin.type=security
\ No newline at end of file
diff --git a/eventmesh-runtime/conf/log4j2.xml b/eventmesh-runtime/conf/log4j2.xml
index 8495181..756abea 100644
--- a/eventmesh-runtime/conf/log4j2.xml
+++ b/eventmesh-runtime/conf/log4j2.xml
@@ -64,6 +64,10 @@
<AppenderRef ref="console"/>
</AsyncLogger>
+ <AsyncLogger name="acl" level="debug" additivity="false" includeLocation="true">
+ <AppenderRef ref="console"/>
+ </AsyncLogger>
+
<AsyncLogger name="org.apache.eventmesh.runtime" level="debug" additivity="false" includeLocation="true">
<AppenderRef ref="console"/>
</AsyncLogger>
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/acl/Acl.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/acl/Acl.java
new file mode 100644
index 0000000..cb773f4
--- /dev/null
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/acl/Acl.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eventmesh.runtime.acl;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.eventmesh.api.acl.AclPropertyKeys;
+import org.apache.eventmesh.api.acl.AclService;
+import org.apache.eventmesh.api.exception.AclException;
+import org.apache.eventmesh.api.producer.MeshMQProducer;
+import org.apache.eventmesh.common.protocol.tcp.UserAgent;
+import org.apache.eventmesh.spi.EventMeshExtensionFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Properties;
+import java.util.ServiceLoader;
+
+public class Acl {
+ private static final Logger logger = LoggerFactory.getLogger(Acl.class);
+ private static AclService aclService;
+
+ public void init(String aclPluginType) throws AclException{
+// aclService = getSpiAclService();
+ aclService = EventMeshExtensionFactory.getExtension(AclService.class, aclPluginType);
+ if (aclService == null) {
+ logger.error("can't load the aclService plugin, please check.");
+ throw new RuntimeException("doesn't load the aclService plugin, please check.");
+ }
+ aclService.init();
+ }
+
+ public void start() throws AclException{
+ aclService.start();
+ }
+
+ public void shutdown() throws AclException{
+ aclService.shutdown();
+ }
+
+ public static void doAclCheckInTcpConnect(String remoteAddr, UserAgent userAgent, int requestCode) throws AclException{
+ aclService.doAclCheckInConnect(buildTcpAclProperties(remoteAddr, userAgent, null, requestCode));
+ }
+
+ public static void doAclCheckInTcpHeartbeat(String remoteAddr, UserAgent userAgent, int requestCode) throws AclException{
+ aclService.doAclCheckInHeartbeat(buildTcpAclProperties(remoteAddr, userAgent, null, requestCode));
+ }
+
+ public static void doAclCheckInTcpSend(String remoteAddr, UserAgent userAgent, String topic, int requestCode) throws AclException{
+ aclService.doAclCheckInSend(buildTcpAclProperties(remoteAddr, userAgent, topic, requestCode));
+ }
+
+ public static void doAclCheckInTcpReceive(String remoteAddr, UserAgent userAgent, String topic, int requestCode) throws AclException{
+ aclService.doAclCheckInReceive(buildTcpAclProperties(remoteAddr, userAgent, topic, requestCode));
+ }
+
+ private static Properties buildTcpAclProperties(String remoteAddr, UserAgent userAgent, String topic, int requestCode){
+ Properties aclProperties = new Properties();
+ aclProperties.put(AclPropertyKeys.CLIENT_IP, remoteAddr);
+ aclProperties.put(AclPropertyKeys.USER, userAgent.getUsername());
+ aclProperties.put(AclPropertyKeys.PASSWORD, userAgent.getPassword());
+ aclProperties.put(AclPropertyKeys.SUBSYSTEM, userAgent.getSubsystem());
+ aclProperties.put(AclPropertyKeys.REQUEST_CODE, requestCode);
+ if(StringUtils.isNotBlank(topic)) {
+ aclProperties.put(AclPropertyKeys.TOPIC, topic);
+ }
+ return aclProperties;
+ }
+
+ public static void doAclCheckInHttpSend(String remoteAddr, String user, String pass,String subsystem, String topic, int requestCode) throws AclException{
+ aclService.doAclCheckInSend(buildHttpAclProperties(remoteAddr, user, pass, subsystem, topic, requestCode));
+ }
+
+ public static void doAclCheckInHttpReceive(String remoteAddr, String user, String pass,String subsystem, String topic, int requestCode) throws AclException{
+ aclService.doAclCheckInReceive(buildHttpAclProperties(remoteAddr, user, pass, subsystem, topic, requestCode));
+ }
+
+ public static void doAclCheckInHttpHeartbeat(String remoteAddr, String user, String pass,String subsystem, String topic, int requestCode) throws AclException{
+ aclService.doAclCheckInHeartbeat(buildHttpAclProperties(remoteAddr, user, pass, subsystem, topic, requestCode));
+ }
+
+ private static Properties buildHttpAclProperties(String remoteAddr, String user, String pass,String subsystem, String topic, int requestCode){
+ Properties aclProperties = new Properties();
+ aclProperties.put(AclPropertyKeys.CLIENT_IP, remoteAddr);
+ aclProperties.put(AclPropertyKeys.USER, user);
+ aclProperties.put(AclPropertyKeys.PASSWORD, pass);
+ aclProperties.put(AclPropertyKeys.SUBSYSTEM, subsystem);
+ aclProperties.put(AclPropertyKeys.REQUEST_CODE, requestCode);
+ if(StringUtils.isNotBlank(topic)) {
+ aclProperties.put(AclPropertyKeys.TOPIC, topic);
+ }
+ return aclProperties;
+ }
+
+ private AclService getSpiAclService() {
+ ServiceLoader<AclService> serviceLoader = ServiceLoader.load(AclService.class);
+ if (serviceLoader.iterator().hasNext()) {
+ return serviceLoader.iterator().next();
+ }
+ return null;
+ }
+}
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshServer.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshServer.java
index 92684d0..615731d 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshServer.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshServer.java
@@ -17,6 +17,7 @@
package org.apache.eventmesh.runtime.boot;
+import org.apache.eventmesh.runtime.acl.Acl;
import org.apache.eventmesh.runtime.common.ServiceState;
import org.apache.eventmesh.runtime.configuration.EventMeshHTTPConfiguration;
import org.apache.eventmesh.runtime.configuration.EventMeshTCPConfiguration;
@@ -36,15 +37,22 @@ public class EventMeshServer {
private EventMeshTCPConfiguration eventMeshTCPConfiguration;
+ private Acl acl;
+
private ServiceState serviceState;
public EventMeshServer(EventMeshHTTPConfiguration eventMeshHttpConfiguration,
EventMeshTCPConfiguration eventMeshTCPConfiguration) {
this.eventMeshHttpConfiguration = eventMeshHttpConfiguration;
this.eventMeshTCPConfiguration = eventMeshTCPConfiguration;
+ this.acl = new Acl();
}
public void init() throws Exception {
+ if(eventMeshHttpConfiguration != null && eventMeshHttpConfiguration.eventMeshServerSecurityEnable){
+ acl.init(eventMeshHttpConfiguration.eventMeshSecurityPluginType);
+ }
+
eventMeshHTTPServer = new EventMeshHTTPServer(this, eventMeshHttpConfiguration);
eventMeshHTTPServer.init();
eventMeshTCPServer = new EventMeshTCPServer(this, eventMeshTCPConfiguration);
@@ -60,6 +68,10 @@ public class EventMeshServer {
}
public void start() throws Exception {
+ if(eventMeshHttpConfiguration != null && eventMeshHttpConfiguration.eventMeshServerSecurityEnable){
+ acl.start();
+ }
+
eventMeshHTTPServer.start();
if (eventMeshTCPConfiguration != null && eventMeshTCPConfiguration.eventMeshTcpServerEnabled) {
eventMeshTCPServer.start();
@@ -75,6 +87,10 @@ public class EventMeshServer {
if (eventMeshTCPConfiguration != null && eventMeshTCPConfiguration.eventMeshTcpServerEnabled) {
eventMeshTCPServer.shutdown();
}
+
+ if(eventMeshHttpConfiguration != null && eventMeshHttpConfiguration.eventMeshServerSecurityEnable){
+ acl.shutdown();
+ }
serviceState = ServiceState.STOPED;
logger.info("server state:{}", serviceState);
}
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/BatchSendMessageProcessor.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/BatchSendMessageProcessor.java
index dacb22f..cc7ea48 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/BatchSendMessageProcessor.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/BatchSendMessageProcessor.java
@@ -36,17 +36,18 @@ import org.apache.eventmesh.common.IPUtil;
import org.apache.eventmesh.common.command.HttpCommand;
import org.apache.eventmesh.common.protocol.http.body.message.SendMessageBatchRequestBody;
import org.apache.eventmesh.common.protocol.http.body.message.SendMessageBatchResponseBody;
+import org.apache.eventmesh.common.protocol.http.body.message.SendMessageResponseBody;
import org.apache.eventmesh.common.protocol.http.common.EventMeshRetCode;
import org.apache.eventmesh.common.protocol.http.common.RequestCode;
import org.apache.eventmesh.common.protocol.http.header.message.SendMessageBatchRequestHeader;
import org.apache.eventmesh.common.protocol.http.header.message.SendMessageBatchResponseHeader;
+import org.apache.eventmesh.runtime.acl.Acl;
import org.apache.eventmesh.runtime.boot.EventMeshHTTPServer;
import org.apache.eventmesh.runtime.constants.EventMeshConstants;
import org.apache.eventmesh.runtime.core.protocol.http.async.AsyncContext;
import org.apache.eventmesh.runtime.core.protocol.http.processor.inf.HttpRequestProcessor;
import org.apache.eventmesh.runtime.core.protocol.http.producer.EventMeshProducer;
import org.apache.eventmesh.runtime.core.protocol.http.producer.SendMessageContext;
-import org.apache.eventmesh.runtime.util.EventMeshUtil;
import org.apache.eventmesh.runtime.util.RemotingHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -55,6 +56,8 @@ public class BatchSendMessageProcessor implements HttpRequestProcessor {
public Logger cmdLogger = LoggerFactory.getLogger("cmd");
+ public Logger aclLogger = LoggerFactory.getLogger("acl");
+
private EventMeshHTTPServer eventMeshHTTPServer;
public BatchSendMessageProcessor(EventMeshHTTPServer eventMeshHTTPServer) {
@@ -128,6 +131,12 @@ public class BatchSendMessageProcessor implements HttpRequestProcessor {
long batchStartTime = System.currentTimeMillis();
+ String remoteAddr = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
+ String user = sendMessageBatchRequestHeader.getUsername();
+ String pass = sendMessageBatchRequestHeader.getPasswd();
+ String subsystem = sendMessageBatchRequestHeader.getSys();
+ int requestCode = Integer.valueOf(sendMessageBatchRequestHeader.getCode());
+
List<Message> msgList = new ArrayList<>();
Map<String, List<Message>> topicBatchMessageMappings = new ConcurrentHashMap<String, List<Message>>();
for (SendMessageBatchRequestBody.BatchMessageEntity msg : sendMessageBatchRequestBody.getContents()) {
@@ -136,6 +145,22 @@ public class BatchSendMessageProcessor implements HttpRequestProcessor {
continue;
}
+ //do acl check
+ if(eventMeshHTTPServer.getEventMeshHttpConfiguration().eventMeshServerSecurityEnable) {
+ try {
+ Acl.doAclCheckInHttpSend(remoteAddr, user, pass, subsystem, msg.topic, requestCode);
+ }catch (Exception e){
+ //String errorMsg = String.format("CLIENT HAS NO PERMISSION,send failed, topic:%s, subsys:%s, realIp:%s", topic, subsys, realIp);
+
+ responseEventMeshCommand = asyncContext.getRequest().createHttpCommandResponse(
+ sendMessageBatchResponseHeader,
+ SendMessageResponseBody.buildBody(EventMeshRetCode.EVENTMESH_ACL_ERR.getRetCode(), e.getMessage()));
+ asyncContext.onComplete(responseEventMeshCommand);
+ aclLogger.warn("CLIENT HAS NO PERMISSION,BatchSendMessageProcessor send failed", e);
+ return;
+ }
+ }
+
if (StringUtils.isBlank(msg.ttl) || !StringUtils.isNumeric(msg.ttl)) {
msg.ttl = String.valueOf(EventMeshConstants.DEFAULT_MSG_TTL_MILLS);
}
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/BatchSendMessageV2Processor.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/BatchSendMessageV2Processor.java
index 8d5f7b5..07a2dbe 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/BatchSendMessageV2Processor.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/BatchSendMessageV2Processor.java
@@ -31,10 +31,12 @@ import org.apache.eventmesh.common.IPUtil;
import org.apache.eventmesh.common.command.HttpCommand;
import org.apache.eventmesh.common.protocol.http.body.message.SendMessageBatchV2RequestBody;
import org.apache.eventmesh.common.protocol.http.body.message.SendMessageBatchV2ResponseBody;
+import org.apache.eventmesh.common.protocol.http.body.message.SendMessageResponseBody;
import org.apache.eventmesh.common.protocol.http.common.EventMeshRetCode;
import org.apache.eventmesh.common.protocol.http.common.RequestCode;
import org.apache.eventmesh.common.protocol.http.header.message.SendMessageBatchV2RequestHeader;
import org.apache.eventmesh.common.protocol.http.header.message.SendMessageBatchV2ResponseHeader;
+import org.apache.eventmesh.runtime.acl.Acl;
import org.apache.eventmesh.runtime.boot.EventMeshHTTPServer;
import org.apache.eventmesh.runtime.constants.EventMeshConstants;
import org.apache.eventmesh.runtime.core.protocol.http.async.AsyncContext;
@@ -50,6 +52,8 @@ public class BatchSendMessageV2Processor implements HttpRequestProcessor {
public Logger cmdLogger = LoggerFactory.getLogger("cmd");
+ public Logger aclLogger = LoggerFactory.getLogger("acl");
+
private EventMeshHTTPServer eventMeshHTTPServer;
public BatchSendMessageV2Processor(EventMeshHTTPServer eventMeshHTTPServer) {
@@ -96,6 +100,28 @@ public class BatchSendMessageV2Processor implements HttpRequestProcessor {
return;
}
+ //do acl check
+ if(eventMeshHTTPServer.getEventMeshHttpConfiguration().eventMeshServerSecurityEnable) {
+ String remoteAddr = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
+ String user = sendMessageBatchV2RequestHeader.getUsername();
+ String pass = sendMessageBatchV2RequestHeader.getPasswd();
+ String subsystem = sendMessageBatchV2RequestHeader.getSys();
+ int requestCode = Integer.valueOf(sendMessageBatchV2RequestHeader.getCode());
+ String topic = sendMessageBatchV2RequestBody.getTopic();
+ try {
+ Acl.doAclCheckInHttpSend(remoteAddr, user, pass, subsystem, topic, requestCode);
+ }catch (Exception e){
+ //String errorMsg = String.format("CLIENT HAS NO PERMISSION,send failed, topic:%s, subsys:%s, realIp:%s", topic, subsys, realIp);
+
+ responseEventMeshCommand = asyncContext.getRequest().createHttpCommandResponse(
+ sendMessageBatchV2ResponseHeader,
+ SendMessageResponseBody.buildBody(EventMeshRetCode.EVENTMESH_ACL_ERR.getRetCode(), e.getMessage()));
+ asyncContext.onComplete(responseEventMeshCommand);
+ aclLogger.warn("CLIENT HAS NO PERMISSION,BatchSendMessageV2Processor send failed", e);
+ return;
+ }
+ }
+
if (!eventMeshHTTPServer.getEventMeshHttpConfiguration().eventMeshServerBatchMsgNumLimiter
.tryAcquire(EventMeshConstants.DEFAULT_FASTFAIL_TIMEOUT_IN_MILLISECONDS, TimeUnit.MILLISECONDS)) {
responseEventMeshCommand = asyncContext.getRequest().createHttpCommandResponse(
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/HeartBeatProcessor.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/HeartBeatProcessor.java
index 9e4c41e..e0341cf 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/HeartBeatProcessor.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/HeartBeatProcessor.java
@@ -31,10 +31,12 @@ import org.apache.eventmesh.common.IPUtil;
import org.apache.eventmesh.common.command.HttpCommand;
import org.apache.eventmesh.common.protocol.http.body.client.HeartbeatRequestBody;
import org.apache.eventmesh.common.protocol.http.body.client.HeartbeatResponseBody;
+import org.apache.eventmesh.common.protocol.http.body.message.SendMessageResponseBody;
import org.apache.eventmesh.common.protocol.http.common.EventMeshRetCode;
import org.apache.eventmesh.common.protocol.http.common.RequestCode;
import org.apache.eventmesh.common.protocol.http.header.client.HeartbeatRequestHeader;
import org.apache.eventmesh.common.protocol.http.header.client.HeartbeatResponseHeader;
+import org.apache.eventmesh.runtime.acl.Acl;
import org.apache.eventmesh.runtime.boot.EventMeshHTTPServer;
import org.apache.eventmesh.runtime.constants.EventMeshConstants;
import org.apache.eventmesh.runtime.core.protocol.http.async.AsyncContext;
@@ -50,6 +52,8 @@ public class HeartBeatProcessor implements HttpRequestProcessor {
public Logger httpLogger = LoggerFactory.getLogger("http");
+ public Logger aclLogger = LoggerFactory.getLogger("acl");
+
private EventMeshHTTPServer eventMeshHTTPServer;
public HeartBeatProcessor(EventMeshHTTPServer eventMeshHTTPServer) {
@@ -121,6 +125,26 @@ public class HeartBeatProcessor implements HttpRequestProcessor {
continue;
}
+ //do acl check
+ if(eventMeshHTTPServer.getEventMeshHttpConfiguration().eventMeshServerSecurityEnable) {
+ String remoteAddr = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
+ String user = heartbeatRequestHeader.getUsername();
+ String pass = heartbeatRequestHeader.getPasswd();
+ int requestCode = Integer.valueOf(heartbeatRequestHeader.getCode());
+ try {
+ Acl.doAclCheckInHttpHeartbeat(remoteAddr, user, pass, sys, topic, requestCode);
+ } catch (Exception e) {
+ //String errorMsg = String.format("CLIENT HAS NO PERMISSION,send failed, topic:%s, subsys:%s, realIp:%s", topic, subsys, realIp);
+
+ responseEventMeshCommand = asyncContext.getRequest().createHttpCommandResponse(
+ heartbeatResponseHeader,
+ SendMessageResponseBody.buildBody(EventMeshRetCode.EVENTMESH_ACL_ERR.getRetCode(), e.getMessage()));
+ asyncContext.onComplete(responseEventMeshCommand);
+ aclLogger.warn("CLIENT HAS NO PERMISSION,HeartBeatProcessor subscribe failed", e);
+ return;
+ }
+ }
+
if (StringUtils.isBlank(client.url)) {
continue;
}
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SendAsyncMessageProcessor.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SendAsyncMessageProcessor.java
index cd3d4be..b79b587 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SendAsyncMessageProcessor.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SendAsyncMessageProcessor.java
@@ -34,6 +34,7 @@ import org.apache.eventmesh.common.protocol.http.common.EventMeshRetCode;
import org.apache.eventmesh.common.protocol.http.common.RequestCode;
import org.apache.eventmesh.common.protocol.http.header.message.SendMessageRequestHeader;
import org.apache.eventmesh.common.protocol.http.header.message.SendMessageResponseHeader;
+import org.apache.eventmesh.runtime.acl.Acl;
import org.apache.eventmesh.runtime.boot.EventMeshHTTPServer;
import org.apache.eventmesh.runtime.constants.EventMeshConstants;
import org.apache.eventmesh.runtime.core.protocol.http.async.AsyncContext;
@@ -54,6 +55,8 @@ public class SendAsyncMessageProcessor implements HttpRequestProcessor {
public Logger cmdLogger = LoggerFactory.getLogger("cmd");
+ public Logger aclLogger = LoggerFactory.getLogger("acl");
+
private EventMeshHTTPServer eventMeshHTTPServer;
public SendAsyncMessageProcessor(EventMeshHTTPServer eventMeshHTTPServer) {
@@ -104,6 +107,28 @@ public class SendAsyncMessageProcessor implements HttpRequestProcessor {
return;
}
+ //do acl check
+ if(eventMeshHTTPServer.getEventMeshHttpConfiguration().eventMeshServerSecurityEnable) {
+ String remoteAddr = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
+ String user = sendMessageRequestHeader.getUsername();
+ String pass = sendMessageRequestHeader.getPasswd();
+ String subsystem = sendMessageRequestHeader.getSys();
+ int requestCode = Integer.valueOf(sendMessageRequestHeader.getCode());
+ String topic = sendMessageRequestBody.getTopic();
+ try {
+ Acl.doAclCheckInHttpSend(remoteAddr, user, pass, subsystem, topic, requestCode);
+ }catch (Exception e){
+ //String errorMsg = String.format("CLIENT HAS NO PERMISSION,send failed, topic:%s, subsys:%s, realIp:%s", topic, subsys, realIp);
+
+ responseEventMeshCommand = asyncContext.getRequest().createHttpCommandResponse(
+ sendMessageResponseHeader,
+ SendMessageResponseBody.buildBody(EventMeshRetCode.EVENTMESH_ACL_ERR.getRetCode(), e.getMessage()));
+ asyncContext.onComplete(responseEventMeshCommand);
+ aclLogger.warn("CLIENT HAS NO PERMISSION,SendAsyncMessageProcessor send failed", e);
+ return;
+ }
+ }
+
String producerGroup = sendMessageRequestBody.getProducerGroup();
EventMeshProducer eventMeshProducer = eventMeshHTTPServer.getProducerManager().getEventMeshProducer(producerGroup);
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SendSyncMessageProcessor.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SendSyncMessageProcessor.java
index 5511e38..136961a 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SendSyncMessageProcessor.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SendSyncMessageProcessor.java
@@ -37,6 +37,7 @@ import org.apache.eventmesh.common.protocol.http.common.EventMeshRetCode;
import org.apache.eventmesh.common.protocol.http.common.RequestCode;
import org.apache.eventmesh.common.protocol.http.header.message.SendMessageRequestHeader;
import org.apache.eventmesh.common.protocol.http.header.message.SendMessageResponseHeader;
+import org.apache.eventmesh.runtime.acl.Acl;
import org.apache.eventmesh.runtime.boot.EventMeshHTTPServer;
import org.apache.eventmesh.runtime.constants.EventMeshConstants;
import org.apache.eventmesh.runtime.core.protocol.http.async.AsyncContext;
@@ -58,6 +59,8 @@ public class SendSyncMessageProcessor implements HttpRequestProcessor {
public Logger httpLogger = LoggerFactory.getLogger("http");
+ public Logger aclLogger = LoggerFactory.getLogger("acl");
+
private EventMeshHTTPServer eventMeshHTTPServer;
public SendSyncMessageProcessor(EventMeshHTTPServer eventMeshHTTPServer) {
@@ -105,6 +108,28 @@ public class SendSyncMessageProcessor implements HttpRequestProcessor {
return;
}
+ //do acl check
+ if(eventMeshHTTPServer.getEventMeshHttpConfiguration().eventMeshServerSecurityEnable) {
+ String remoteAddr = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
+ String user = sendMessageRequestHeader.getUsername();
+ String pass = sendMessageRequestHeader.getPasswd();
+ String subsystem = sendMessageRequestHeader.getSys();
+ int requestCode = Integer.valueOf(sendMessageRequestHeader.getCode());
+ String topic = sendMessageRequestBody.getTopic();
+ try {
+ Acl.doAclCheckInHttpSend(remoteAddr, user, pass, subsystem, topic, requestCode);
+ }catch (Exception e){
+ //String errorMsg = String.format("CLIENT HAS NO PERMISSION,send failed, topic:%s, subsys:%s, realIp:%s", topic, subsys, realIp);
+
+ responseEventMeshCommand = asyncContext.getRequest().createHttpCommandResponse(
+ sendMessageResponseHeader,
+ SendMessageResponseBody.buildBody(EventMeshRetCode.EVENTMESH_ACL_ERR.getRetCode(), e.getMessage()));
+ asyncContext.onComplete(responseEventMeshCommand);
+ aclLogger.warn("CLIENT HAS NO PERMISSION,SendSyncMessageProcessor send failed", e);
+ return;
+ }
+ }
+
String producerGroup = sendMessageRequestBody.getProducerGroup();
EventMeshProducer eventMeshProducer = eventMeshHTTPServer.getProducerManager().getEventMeshProducer(producerGroup);
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SubscribeProcessor.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SubscribeProcessor.java
index 1e8daf0..14e185e 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SubscribeProcessor.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SubscribeProcessor.java
@@ -33,10 +33,12 @@ import org.apache.eventmesh.common.command.HttpCommand;
import org.apache.eventmesh.common.protocol.SubscriptionItem;
import org.apache.eventmesh.common.protocol.http.body.client.SubscribeRequestBody;
import org.apache.eventmesh.common.protocol.http.body.client.SubscribeResponseBody;
+import org.apache.eventmesh.common.protocol.http.body.message.SendMessageResponseBody;
import org.apache.eventmesh.common.protocol.http.common.EventMeshRetCode;
import org.apache.eventmesh.common.protocol.http.common.RequestCode;
import org.apache.eventmesh.common.protocol.http.header.client.SubscribeRequestHeader;
import org.apache.eventmesh.common.protocol.http.header.client.SubscribeResponseHeader;
+import org.apache.eventmesh.runtime.acl.Acl;
import org.apache.eventmesh.runtime.boot.EventMeshHTTPServer;
import org.apache.eventmesh.runtime.constants.EventMeshConstants;
import org.apache.eventmesh.runtime.core.consumergroup.ConsumerGroupConf;
@@ -54,6 +56,8 @@ public class SubscribeProcessor implements HttpRequestProcessor {
public Logger httpLogger = LoggerFactory.getLogger("http");
+ public Logger aclLogger = LoggerFactory.getLogger("acl");
+
private EventMeshHTTPServer eventMeshHTTPServer;
public SubscribeProcessor(EventMeshHTTPServer eventMeshHTTPServer) {
@@ -100,6 +104,29 @@ public class SubscribeProcessor implements HttpRequestProcessor {
}
List<SubscriptionItem> subTopicList = subscribeRequestBody.getTopics();
+ //do acl check
+ if(eventMeshHTTPServer.getEventMeshHttpConfiguration().eventMeshServerSecurityEnable) {
+ String remoteAddr = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
+ String user = subscribeRequestHeader.getUsername();
+ String pass = subscribeRequestHeader.getPasswd();
+ String subsystem = subscribeRequestHeader.getSys();
+ int requestCode = Integer.valueOf(subscribeRequestHeader.getCode());
+ for(SubscriptionItem item : subTopicList) {
+ try {
+ Acl.doAclCheckInHttpReceive(remoteAddr, user, pass, subsystem, item.getTopic(), requestCode);
+ } catch (Exception e) {
+ //String errorMsg = String.format("CLIENT HAS NO PERMISSION,send failed, topic:%s, subsys:%s, realIp:%s", topic, subsys, realIp);
+
+ responseEventMeshCommand = asyncContext.getRequest().createHttpCommandResponse(
+ subscribeResponseHeader,
+ SendMessageResponseBody.buildBody(EventMeshRetCode.EVENTMESH_ACL_ERR.getRetCode(), e.getMessage()));
+ asyncContext.onComplete(responseEventMeshCommand);
+ aclLogger.warn("CLIENT HAS NO PERMISSION,SubscribeProcessor subscribe failed", e);
+ return;
+ }
+ }
+ }
+
String url = subscribeRequestBody.getUrl();
String consumerGroup = subscribeRequestBody.getConsumerGroup();
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/task/HeartBeatTask.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/task/HeartBeatTask.java
index eac1354..9e16ef2 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/task/HeartBeatTask.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/task/HeartBeatTask.java
@@ -17,6 +17,7 @@
package org.apache.eventmesh.runtime.core.protocol.tcp.client.task;
+import static org.apache.eventmesh.common.protocol.tcp.Command.HEARTBEAT_REQUEST;
import static org.apache.eventmesh.common.protocol.tcp.Command.HEARTBEAT_RESPONSE;
import io.netty.channel.ChannelHandlerContext;
@@ -24,7 +25,9 @@ import io.netty.channel.ChannelHandlerContext;
import org.apache.eventmesh.common.protocol.tcp.Header;
import org.apache.eventmesh.common.protocol.tcp.OPStatus;
import org.apache.eventmesh.common.protocol.tcp.Package;
+import org.apache.eventmesh.runtime.acl.Acl;
import org.apache.eventmesh.runtime.boot.EventMeshTCPServer;
+import org.apache.eventmesh.runtime.util.RemotingHelper;
import org.apache.eventmesh.runtime.util.Utils;
public class HeartBeatTask extends AbstractTask {
@@ -38,6 +41,12 @@ public class HeartBeatTask extends AbstractTask {
long taskExecuteTime = System.currentTimeMillis();
Package res = new Package();
try {
+ //do acl check in heartbeat
+ if(eventMeshTCPServer.getEventMeshTCPConfiguration().eventMeshServerSecurityEnable){
+ String remoteAddr = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
+ Acl.doAclCheckInTcpHeartbeat(remoteAddr, session.getClient(), HEARTBEAT_REQUEST.value());
+ }
+
if (session != null) {
session.notifyHeartbeat(startTime);
}
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/task/HelloTask.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/task/HelloTask.java
index 161b1d7..df4a606 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/task/HelloTask.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/task/HelloTask.java
@@ -17,6 +17,7 @@
package org.apache.eventmesh.runtime.core.protocol.tcp.client.task;
+import static org.apache.eventmesh.common.protocol.tcp.Command.HELLO_REQUEST;
import static org.apache.eventmesh.common.protocol.tcp.Command.HELLO_RESPONSE;
import io.netty.channel.ChannelFuture;
@@ -28,10 +29,12 @@ import org.apache.eventmesh.common.protocol.tcp.Header;
import org.apache.eventmesh.common.protocol.tcp.OPStatus;
import org.apache.eventmesh.common.protocol.tcp.Package;
import org.apache.eventmesh.common.protocol.tcp.UserAgent;
+import org.apache.eventmesh.runtime.acl.Acl;
import org.apache.eventmesh.runtime.boot.EventMeshTCPServer;
import org.apache.eventmesh.runtime.common.ServiceState;
import org.apache.eventmesh.runtime.constants.EventMeshConstants;
import org.apache.eventmesh.runtime.core.protocol.tcp.client.session.Session;
+import org.apache.eventmesh.runtime.util.RemotingHelper;
import org.apache.eventmesh.runtime.util.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -51,6 +54,12 @@ public class HelloTask extends AbstractTask {
Session session = null;
UserAgent user = (UserAgent) pkg.getBody();
try {
+ //do acl check in connect
+ if(eventMeshTCPServer.getEventMeshTCPConfiguration().eventMeshServerSecurityEnable){
+ String remoteAddr = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
+ Acl.doAclCheckInTcpConnect(remoteAddr, user, HELLO_REQUEST.value());
+ }
+
if (eventMeshTCPServer.getEventMeshServer().getServiceState() != ServiceState.RUNNING) {
logger.error("server state is not running:{}", eventMeshTCPServer.getEventMeshServer().getServiceState());
throw new Exception("server state is not running, maybe deploying...");
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/task/MessageTransferTask.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/task/MessageTransferTask.java
index 7ef1b8f..87eeed0 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/task/MessageTransferTask.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/task/MessageTransferTask.java
@@ -34,11 +34,13 @@ import org.apache.eventmesh.common.protocol.tcp.EventMeshMessage;
import org.apache.eventmesh.common.protocol.tcp.Header;
import org.apache.eventmesh.common.protocol.tcp.OPStatus;
import org.apache.eventmesh.common.protocol.tcp.Package;
+import org.apache.eventmesh.runtime.acl.Acl;
import org.apache.eventmesh.runtime.boot.EventMeshTCPServer;
import org.apache.eventmesh.runtime.constants.EventMeshConstants;
import org.apache.eventmesh.runtime.core.protocol.tcp.client.session.send.EventMeshTcpSendResult;
import org.apache.eventmesh.runtime.core.protocol.tcp.client.session.send.EventMeshTcpSendStatus;
import org.apache.eventmesh.runtime.util.EventMeshUtil;
+import org.apache.eventmesh.runtime.util.RemotingHelper;
import org.apache.eventmesh.runtime.util.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -67,6 +69,12 @@ public class MessageTransferTask extends AbstractTask {
throw new Exception("eventMeshMessage is null");
}
+ //do acl check in sending msg
+ if(eventMeshTCPServer.getEventMeshTCPConfiguration().eventMeshServerSecurityEnable){
+ String remoteAddr = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
+ Acl.doAclCheckInTcpSend(remoteAddr, session.getClient(), eventMeshMessage.getTopic(), cmd.value());
+ }
+
if (eventMeshTCPServer.getRateLimiter().tryAcquire(TRY_PERMIT_TIME_OUT, TimeUnit.MILLISECONDS)) {
synchronized (session) {
long sendTime = System.currentTimeMillis();
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/task/SubscribeTask.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/task/SubscribeTask.java
index ccacf56..c48a881 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/task/SubscribeTask.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/task/SubscribeTask.java
@@ -26,7 +26,9 @@ import org.apache.eventmesh.common.protocol.tcp.Subscription;
import org.apache.eventmesh.common.protocol.SubscriptionItem;
import org.apache.eventmesh.common.protocol.tcp.*;
import org.apache.eventmesh.common.protocol.tcp.Package;
+import org.apache.eventmesh.runtime.acl.Acl;
import org.apache.eventmesh.runtime.boot.EventMeshTCPServer;
+import org.apache.eventmesh.runtime.util.RemotingHelper;
import org.apache.eventmesh.runtime.util.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -52,6 +54,13 @@ public class SubscribeTask extends AbstractTask {
List<SubscriptionItem> subscriptionItems = new ArrayList<>();
for (int i = 0; i < subscriptionInfo.getTopicList().size(); i++) {
SubscriptionItem item = subscriptionInfo.getTopicList().get(i);
+
+ //do acl check for receive msg
+ if(eventMeshTCPServer.getEventMeshTCPConfiguration().eventMeshServerSecurityEnable){
+ String remoteAddr = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
+ Acl.doAclCheckInTcpReceive(remoteAddr, session.getClient(), item.getTopic(), Command.SUBSCRIBE_REQUEST.value());
+ }
+
subscriptionItems.add(item);
}
synchronized (session) {
diff --git a/eventmesh-runtime/build.gradle b/eventmesh-security-plugin/build.gradle
similarity index 54%
copy from eventmesh-runtime/build.gradle
copy to eventmesh-security-plugin/build.gradle
index c5c141a..dd414e3 100644
--- a/eventmesh-runtime/build.gradle
+++ b/eventmesh-security-plugin/build.gradle
@@ -15,8 +15,23 @@
* limitations under the License.
*/
-dependencies {
- implementation project(":eventmesh-connector-plugin:eventmesh-connector-api")
-
- testImplementation project(":eventmesh-connector-plugin:eventmesh-connector-api")
-}
+task copyAclPlugin(dependsOn: ['jar']) {
+ doFirst {
+ new File(projectDir, '../eventmesh-security-plugin/dist/apps').mkdir()
+ new File(projectDir, '../dist/plugin/security').mkdirs()
+ }
+ doLast {
+ copy {
+ into('../eventmesh-security-plugin/dist/apps/')
+ from project.jar.getArchivePath()
+ exclude {
+ "eventmesh-security-plugin-${version}.jar"
+ "eventmesh-security-api-${version}.jar"
+ }
+ }
+ copy {
+ into '../dist/plugin/security'
+ from "../eventmesh-security-plugin/dist/apps/eventmesh-security-acl-${version}.jar"
+ }
+ }
+}
\ No newline at end of file
diff --git a/eventmesh-runtime/build.gradle b/eventmesh-security-plugin/eventmesh-security-acl/build.gradle
similarity index 83%
copy from eventmesh-runtime/build.gradle
copy to eventmesh-security-plugin/eventmesh-security-acl/build.gradle
index c5c141a..1a18796 100644
--- a/eventmesh-runtime/build.gradle
+++ b/eventmesh-security-plugin/eventmesh-security-acl/build.gradle
@@ -16,7 +16,7 @@
*/
dependencies {
- implementation project(":eventmesh-connector-plugin:eventmesh-connector-api")
+ api project(":eventmesh-security-plugin:eventmesh-security-api")
- testImplementation project(":eventmesh-connector-plugin:eventmesh-connector-api")
+ testImplementation project(":eventmesh-security-plugin:eventmesh-security-api")
}
diff --git a/eventmesh-security-plugin/eventmesh-security-acl/src/main/java/org/apache/eventmesh/acl/impl/AclServiceImpl.java b/eventmesh-security-plugin/eventmesh-security-acl/src/main/java/org/apache/eventmesh/acl/impl/AclServiceImpl.java
new file mode 100644
index 0000000..8d73bc7
--- /dev/null
+++ b/eventmesh-security-plugin/eventmesh-security-acl/src/main/java/org/apache/eventmesh/acl/impl/AclServiceImpl.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.acl.impl;
+
+import org.apache.eventmesh.api.acl.AclService;
+import org.apache.eventmesh.api.exception.AclException;
+
+import java.util.Properties;
+
+public class AclServiceImpl implements AclService {
+ @Override
+ public void init() throws AclException {
+ //TODO
+ }
+
+ @Override
+ public void start() throws AclException {
+ //TODO
+ }
+
+ @Override
+ public void shutdown() throws AclException {
+ //TODO
+ }
+
+ @Override
+ public void doAclCheckInConnect(Properties aclProperties) throws AclException {
+ //TODO
+ }
+
+ @Override
+ public void doAclCheckInHeartbeat(Properties aclProperties) throws AclException {
+ //TODO
+ }
+
+ @Override
+ public void doAclCheckInSend(Properties aclProperties) throws AclException {
+ //TODO
+ }
+
+ @Override
+ public void doAclCheckInReceive(Properties aclProperties) throws AclException {
+ //TODO
+ }
+}
diff --git a/eventmesh-common/src/test/resources/configuration.properties b/eventmesh-security-plugin/eventmesh-security-acl/src/main/resources/META-INF/eventmesh/org.apache.eventmesh.api.acl.AclService
similarity index 78%
copy from eventmesh-common/src/test/resources/configuration.properties
copy to eventmesh-security-plugin/eventmesh-security-acl/src/main/resources/META-INF/eventmesh/org.apache.eventmesh.api.acl.AclService
index 76f29f2..b883bd8 100644
--- a/eventmesh-common/src/test/resources/configuration.properties
+++ b/eventmesh-security-plugin/eventmesh-security-acl/src/main/resources/META-INF/eventmesh/org.apache.eventmesh.api.acl.AclService
@@ -1,4 +1,3 @@
-#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
@@ -13,12 +12,5 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
-#
-eventMesh.server.env=value1
-eventMesh.server.idc=value2
-eventMesh.sysid=3
-eventMesh.server.cluster=value4
-eventMesh.server.name=value5
-eventMesh.server.hostIp=value6
-eventMesh.connector.plugin.type=rocketmq
+acl=org.apache.eventmesh.acl.impl.AclServiceImpl
\ No newline at end of file
diff --git a/eventmesh-runtime/build.gradle b/eventmesh-security-plugin/eventmesh-security-api/build.gradle
similarity index 83%
copy from eventmesh-runtime/build.gradle
copy to eventmesh-security-plugin/eventmesh-security-api/build.gradle
index c5c141a..0d41042 100644
--- a/eventmesh-runtime/build.gradle
+++ b/eventmesh-security-plugin/eventmesh-security-api/build.gradle
@@ -16,7 +16,7 @@
*/
dependencies {
- implementation project(":eventmesh-connector-plugin:eventmesh-connector-api")
+ api project(":eventmesh-spi")
- testImplementation project(":eventmesh-connector-plugin:eventmesh-connector-api")
+ testImplementation project(":eventmesh-spi")
}
diff --git a/eventmesh-runtime/build.gradle b/eventmesh-security-plugin/eventmesh-security-api/src/main/java/org/apache/eventmesh/api/acl/AclPropertyKeys.java
similarity index 67%
copy from eventmesh-runtime/build.gradle
copy to eventmesh-security-plugin/eventmesh-security-api/src/main/java/org/apache/eventmesh/api/acl/AclPropertyKeys.java
index c5c141a..eb5514e 100644
--- a/eventmesh-runtime/build.gradle
+++ b/eventmesh-security-plugin/eventmesh-security-api/src/main/java/org/apache/eventmesh/api/acl/AclPropertyKeys.java
@@ -15,8 +15,13 @@
* limitations under the License.
*/
-dependencies {
- implementation project(":eventmesh-connector-plugin:eventmesh-connector-api")
+package org.apache.eventmesh.api.acl;
- testImplementation project(":eventmesh-connector-plugin:eventmesh-connector-api")
+public class AclPropertyKeys {
+ public static final String CLIENT_IP = "clientIp";
+ public static final String USER = "user";
+ public static final String PASSWORD = "pwd";
+ public static final String SUBSYSTEM = "subsystem";
+ public static final String TOPIC = "topic";
+ public static final String REQUEST_CODE = "requestCode";
}
diff --git a/eventmesh-runtime/build.gradle b/eventmesh-security-plugin/eventmesh-security-api/src/main/java/org/apache/eventmesh/api/acl/AclService.java
similarity index 54%
copy from eventmesh-runtime/build.gradle
copy to eventmesh-security-plugin/eventmesh-security-api/src/main/java/org/apache/eventmesh/api/acl/AclService.java
index c5c141a..3502e90 100644
--- a/eventmesh-runtime/build.gradle
+++ b/eventmesh-security-plugin/eventmesh-security-api/src/main/java/org/apache/eventmesh/api/acl/AclService.java
@@ -14,9 +14,27 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+package org.apache.eventmesh.api.acl;
-dependencies {
- implementation project(":eventmesh-connector-plugin:eventmesh-connector-api")
+import org.apache.eventmesh.api.exception.AclException;
+import org.apache.eventmesh.spi.EventMeshSPI;
+
+import java.util.Properties;
+
+@EventMeshSPI(isSingleton = true)
+public interface AclService {
+ void init() throws AclException;
+
+ void start() throws AclException;
+
+ void shutdown() throws AclException;
+
+ void doAclCheckInConnect(Properties aclProperties) throws AclException;
+
+ void doAclCheckInHeartbeat(Properties aclProperties) throws AclException;
+
+ void doAclCheckInSend(Properties aclProperties) throws AclException;
+
+ void doAclCheckInReceive(Properties aclProperties) throws AclException;
- testImplementation project(":eventmesh-connector-plugin:eventmesh-connector-api")
}
diff --git a/eventmesh-runtime/build.gradle b/eventmesh-security-plugin/eventmesh-security-api/src/main/java/org/apache/eventmesh/api/exception/AclException.java
similarity index 75%
copy from eventmesh-runtime/build.gradle
copy to eventmesh-security-plugin/eventmesh-security-api/src/main/java/org/apache/eventmesh/api/exception/AclException.java
index c5c141a..389bf8a 100644
--- a/eventmesh-runtime/build.gradle
+++ b/eventmesh-security-plugin/eventmesh-security-api/src/main/java/org/apache/eventmesh/api/exception/AclException.java
@@ -14,9 +14,15 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+package org.apache.eventmesh.api.exception;
-dependencies {
- implementation project(":eventmesh-connector-plugin:eventmesh-connector-api")
+public class AclException extends RuntimeException {
- testImplementation project(":eventmesh-connector-plugin:eventmesh-connector-api")
+ public AclException(String message) {
+ super(message);
+ }
+
+ public AclException(String message, Throwable cause) {
+ super(message, cause);
+ }
}
diff --git a/settings.gradle b/settings.gradle
index 7c0e70a..145df7e 100644
--- a/settings.gradle
+++ b/settings.gradle
@@ -25,4 +25,6 @@ include 'eventmesh-test'
include 'eventmesh-spi'
include 'eventmesh-connector-plugin:eventmesh-connector-api'
include 'eventmesh-connector-plugin:eventmesh-connector-rocketmq'
+include 'eventmesh-security-plugin:eventmesh-security-api'
+include 'eventmesh-security-plugin:eventmesh-security-acl'
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@eventmesh.apache.org
For additional commands, e-mail: commits-help@eventmesh.apache.org