You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eventmesh.apache.org by mi...@apache.org on 2022/02/25 07:39:24 UTC
[incubator-eventmesh] branch master updated: [Issue #658] Eventmesh Http Support CloudEvents Webhook spec (#772)
This is an automated email from the ASF dual-hosted git repository.
mikexue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-eventmesh.git
The following commit(s) were added to refs/heads/master by this push:
new a52bf29 [Issue #658] Eventmesh Http Support CloudEvents Webhook spec (#772)
a52bf29 is described below
commit a52bf296a2a70128d91a77ca75c682aaf41f2289
Author: jinrongluo <ka...@gmail.com>
AuthorDate: Fri Feb 25 02:39:17 2022 -0500
[Issue #658] Eventmesh Http Support CloudEvents Webhook spec (#772)
* [Issue #658] support CloudEvents Webhook spec
* [Issue #658] create auth-http-basic security module
* [Issue #658] create auth-http-basic security module
* [Issue #658] code refactor for CloudEvents Webhook
* [Issue #658] fix build checkstyles
* [Issue #658] fix javadoc build issue
* [Issue #658] fix javadoc build issue in eventmesh-security-auth-http-basic
* [Issue #658] adding more log to the WebhookUtil
* [Issue #658] address PR review comments.
* [Issue #658] fixed checkstyles issue
Co-authored-by: mike_xwm <mi...@126.com>
---
.../org/apache/eventmesh/common/Constants.java | 2 +-
.../common/config/CommonConfiguration.java | 2 +
.../demo/pub/cloudevents/AsyncPublishInstance.java | 2 +-
eventmesh-runtime/build.gradle | 1 +
.../runtime/boot/EventMeshHTTPServer.java | 5 +-
.../core/consumergroup/ConsumerGroupTopicConf.java | 9 ++
.../http/processor/SubscribeProcessor.java | 17 +++
.../http/push/AbstractHTTPPushRequest.java | 12 +-
.../protocol/http/push/AsyncHTTPPushRequest.java | 152 +++++++++++++--------
.../core/protocol/http/push/HTTPClientPool.java | 68 ++++++++-
.../apache/eventmesh/runtime/util/WebhookUtil.java | 109 +++++++++++++++
.../eventmesh-security-api/build.gradle | 2 +
.../apache/eventmesh/api/auth/AuthService.java} | 23 +++-
.../eventmesh/api/common/ConfigurationWrapper.java | 58 ++++++++
.../eventmesh/api/exception/AuthException.java} | 13 +-
.../build.gradle | 6 +-
.../gradle.properties | 18 +++
.../auth/http/basic/config/AuthConfigs.java} | 25 +++-
.../auth/http/basic/impl/AuthHttpBasicService.java | 61 +++++++++
.../org.apache.eventmesh.api.auth.AuthService | 16 +++
.../src/main/resources/auth-http-basic.properties | 18 +++
settings.gradle | 4 +-
22 files changed, 544 insertions(+), 79 deletions(-)
diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/Constants.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/Constants.java
index dadd908..ed96085 100644
--- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/Constants.java
+++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/Constants.java
@@ -35,7 +35,7 @@ public class Constants {
public static final String PROTOCOL_DESC = "protocoldesc";
- public static final int DEFAULT_HTTP_TIME_OUT = 3000;
+ public static final int DEFAULT_HTTP_TIME_OUT = 15000;
public static final String EVENTMESH_MESSAGE_CONST_TTL = "ttl";
diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/CommonConfiguration.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/CommonConfiguration.java
index 8d3e3eb..2552098 100644
--- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/CommonConfiguration.java
+++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/CommonConfiguration.java
@@ -48,6 +48,8 @@ public class CommonConfiguration {
public boolean eventMeshServerRegistryEnable = false;
protected ConfigurationWrapper configurationWrapper;
+ public String eventMeshWebhookOrigin = "eventmesh." + eventMeshIDC;
+
public CommonConfiguration(ConfigurationWrapper configurationWrapper) {
this.configurationWrapper = configurationWrapper;
}
diff --git a/eventmesh-examples/src/main/java/org/apache/eventmesh/http/demo/pub/cloudevents/AsyncPublishInstance.java b/eventmesh-examples/src/main/java/org/apache/eventmesh/http/demo/pub/cloudevents/AsyncPublishInstance.java
index fd3b1f4..e4cd10f 100644
--- a/eventmesh-examples/src/main/java/org/apache/eventmesh/http/demo/pub/cloudevents/AsyncPublishInstance.java
+++ b/eventmesh-examples/src/main/java/org/apache/eventmesh/http/demo/pub/cloudevents/AsyncPublishInstance.java
@@ -44,7 +44,7 @@ import lombok.extern.slf4j.Slf4j;
public class AsyncPublishInstance {
// This messageSize is also used in SubService.java (Subscriber)
- public static final int MESSAGE_SIZE = 5;
+ public static final int MESSAGE_SIZE = 1;
public static final String DEFAULT_IP_PORT = "127.0.0.1:10105";
diff --git a/eventmesh-runtime/build.gradle b/eventmesh-runtime/build.gradle
index 54e763d..a3b9016 100644
--- a/eventmesh-runtime/build.gradle
+++ b/eventmesh-runtime/build.gradle
@@ -36,6 +36,7 @@ dependencies {
implementation project(":eventmesh-connector-plugin:eventmesh-connector-standalone")
implementation project(":eventmesh-security-plugin:eventmesh-security-api")
implementation project(":eventmesh-security-plugin:eventmesh-security-acl")
+ implementation project(":eventmesh-security-plugin:eventmesh-security-auth-http-basic")
implementation project(":eventmesh-registry-plugin:eventmesh-registry-api")
implementation project(":eventmesh-admin:eventmesh-admin-rocketmq")
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshHTTPServer.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshHTTPServer.java
index dfa6e6b..07ec0a4 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshHTTPServer.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshHTTPServer.java
@@ -37,6 +37,7 @@ import org.apache.eventmesh.runtime.core.protocol.http.processor.UnSubscribeProc
import org.apache.eventmesh.runtime.core.protocol.http.processor.inf.Client;
import org.apache.eventmesh.runtime.core.protocol.http.producer.ProducerManager;
import org.apache.eventmesh.runtime.core.protocol.http.push.AbstractHTTPPushRequest;
+import org.apache.eventmesh.runtime.core.protocol.http.push.HTTPClientPool;
import org.apache.eventmesh.runtime.core.protocol.http.retry.HttpRetryer;
import org.apache.eventmesh.runtime.metrics.http.HTTPMetricsServer;
import org.apache.eventmesh.trace.api.TracePluginFactory;
@@ -107,6 +108,8 @@ public class EventMeshHTTPServer extends AbstractHTTPServer {
private RateLimiter batchRateLimiter;
+ public HTTPClientPool httpClientPool = new HTTPClientPool(10);
+
public void shutdownThreadPool() throws Exception {
batchMsgExecutor.shutdown();
adminExecutor.shutdown();
@@ -260,7 +263,7 @@ public class EventMeshHTTPServer extends AbstractHTTPServer {
shutdownThreadPool();
- AbstractHTTPPushRequest.httpClientPool.shutdown();
+ httpClientPool.shutdown();
producerManager.shutdown();
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/consumergroup/ConsumerGroupTopicConf.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/consumergroup/ConsumerGroupTopicConf.java
index f97a51d..1bc73e1 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/consumergroup/ConsumerGroupTopicConf.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/consumergroup/ConsumerGroupTopicConf.java
@@ -55,6 +55,11 @@ public class ConsumerGroupTopicConf implements Serializable {
*/
private Set<String> urls = Sets.newConcurrentHashSet();
+ /**
+ * url auth type
+ */
+ private Map<String, String> httpAuthTypeMap = Maps.newConcurrentMap();
+
@Override
public boolean equals(Object o) {
if (this == o) {
@@ -127,4 +132,8 @@ public class ConsumerGroupTopicConf implements Serializable {
public void setUrls(Set<String> urls) {
this.urls = urls;
}
+
+ public Map<String, String> getHttpAuthTypeMap() {
+ return httpAuthTypeMap;
+ }
}
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SubscribeProcessor.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SubscribeProcessor.java
index 01c156b..fa521f8 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SubscribeProcessor.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SubscribeProcessor.java
@@ -39,6 +39,7 @@ import org.apache.eventmesh.runtime.core.protocol.http.processor.inf.Client;
import org.apache.eventmesh.runtime.core.protocol.http.processor.inf.HttpRequestProcessor;
import org.apache.eventmesh.runtime.util.EventMeshUtil;
import org.apache.eventmesh.runtime.util.RemotingHelper;
+import org.apache.eventmesh.runtime.util.WebhookUtil;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
@@ -172,6 +173,21 @@ public class SubscribeProcessor implements HttpRequestProcessor {
return;
}
+ // obtain webhook delivery agreement for Abuse Protection
+ boolean isWebhookAllowed = WebhookUtil.obtainDeliveryAgreement(eventMeshHTTPServer.httpClientPool.getClient(),
+ url, eventMeshHTTPServer.getEventMeshHttpConfiguration().eventMeshWebhookOrigin);
+
+ if (!isWebhookAllowed) {
+ httpLogger.error("subscriber url {} is not allowed by the target system", url);
+ responseEventMeshCommand = request.createHttpCommandResponse(
+ subscribeResponseHeader,
+ SubscribeResponseBody
+ .buildBody(EventMeshRetCode.EVENTMESH_PROTOCOL_BODY_ERR.getRetCode(),
+ EventMeshRetCode.EVENTMESH_PROTOCOL_BODY_ERR.getErrMsg() + " unauthorized webhook URL: " + url));
+ asyncContext.onComplete(responseEventMeshCommand);
+ return;
+ }
+
synchronized (eventMeshHTTPServer.localClientInfoMapping) {
registerClient(subscribeRequestHeader, consumerGroup, subTopicList, url);
@@ -329,4 +345,5 @@ public class SubscribeProcessor implements HttpRequestProcessor {
}
}
}
+
}
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/push/AbstractHTTPPushRequest.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/push/AbstractHTTPPushRequest.java
index dd60a62..1f7c3a5 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/push/AbstractHTTPPushRequest.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/push/AbstractHTTPPushRequest.java
@@ -57,8 +57,6 @@ public abstract class AbstractHTTPPushRequest extends RetryContext {
public HandleMsgContext handleMsgContext;
- public static HTTPClientPool httpClientPool = new HTTPClientPool(10);
-
private AtomicBoolean complete = new AtomicBoolean(Boolean.FALSE);
public AbstractHTTPPushRequest(HandleMsgContext handleMsgContext) {
@@ -75,6 +73,16 @@ public abstract class AbstractHTTPPushRequest extends RetryContext {
public void tryHTTPRequest() {
}
+ public void delayRetry(long delayTime) {
+ if (retryTimes < EventMeshConstants.DEFAULT_PUSH_RETRY_TIMES && delayTime > 0) {
+ retryTimes++;
+ delay(delayTime);
+ retryer.pushRetry(this);
+ } else {
+ complete.compareAndSet(Boolean.FALSE, Boolean.TRUE);
+ }
+ }
+
public void delayRetry() {
if (retryTimes < EventMeshConstants.DEFAULT_PUSH_RETRY_TIMES) {
retryTimes++;
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/push/AsyncHTTPPushRequest.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/push/AsyncHTTPPushRequest.java
index 89c59c9..d874c2b 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/push/AsyncHTTPPushRequest.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/push/AsyncHTTPPushRequest.java
@@ -35,9 +35,12 @@ import org.apache.eventmesh.protocol.api.ProtocolPluginFactory;
import org.apache.eventmesh.runtime.constants.EventMeshConstants;
import org.apache.eventmesh.runtime.core.protocol.http.consumer.HandleMsgContext;
import org.apache.eventmesh.runtime.util.EventMeshUtil;
+import org.apache.eventmesh.runtime.util.WebhookUtil;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.time.DateFormatUtils;
+import org.apache.http.Header;
+import org.apache.http.HttpEntity;
import org.apache.http.HttpResponse;
import org.apache.http.HttpStatus;
import org.apache.http.NameValuePair;
@@ -51,9 +54,11 @@ import java.io.IOException;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Objects;
+import java.util.Optional;
import java.util.Set;
import org.slf4j.Logger;
@@ -72,10 +77,8 @@ public class AsyncHTTPPushRequest extends AbstractHTTPPushRequest {
public Logger cmdLogger = LoggerFactory.getLogger("cmd");
public Logger logger = LoggerFactory.getLogger(this.getClass());
-
- private Map<String, Set<AbstractHTTPPushRequest>> waitingRequests;
-
public String currPushUrl;
+ private Map<String, Set<AbstractHTTPPushRequest>> waitingRequests;
public AsyncHTTPPushRequest(HandleMsgContext handleMsgContext,
Map<String, Set<AbstractHTTPPushRequest>> waitingRequests) {
@@ -106,18 +109,18 @@ public class AsyncHTTPPushRequest extends AbstractHTTPPushRequest {
builder.addHeader(ProtocolKey.LANGUAGE, Constants.LANGUAGE_JAVA);
builder.addHeader(ProtocolKey.VERSION, ProtocolVersion.V1.getVersion());
builder.addHeader(ProtocolKey.EventMeshInstanceKey.EVENTMESHCLUSTER,
- handleMsgContext.getEventMeshHTTPServer()
- .getEventMeshHttpConfiguration().eventMeshCluster);
+ handleMsgContext.getEventMeshHTTPServer()
+ .getEventMeshHttpConfiguration().eventMeshCluster);
builder.addHeader(ProtocolKey.EventMeshInstanceKey.EVENTMESHIP, IPUtils.getLocalAddress());
builder.addHeader(ProtocolKey.EventMeshInstanceKey.EVENTMESHENV,
- handleMsgContext.getEventMeshHTTPServer().getEventMeshHttpConfiguration().eventMeshEnv);
+ handleMsgContext.getEventMeshHTTPServer().getEventMeshHttpConfiguration().eventMeshEnv);
builder.addHeader(ProtocolKey.EventMeshInstanceKey.EVENTMESHIDC,
- handleMsgContext.getEventMeshHTTPServer().getEventMeshHttpConfiguration().eventMeshIDC);
+ handleMsgContext.getEventMeshHTTPServer().getEventMeshHttpConfiguration().eventMeshIDC);
CloudEvent event = CloudEventBuilder.from(handleMsgContext.getEvent())
- .withExtension(EventMeshConstants.REQ_EVENTMESH2C_TIMESTAMP,
- String.valueOf(System.currentTimeMillis()))
- .build();
+ .withExtension(EventMeshConstants.REQ_EVENTMESH2C_TIMESTAMP,
+ String.valueOf(System.currentTimeMillis()))
+ .build();
handleMsgContext.setEvent(event);
String content = "";
@@ -127,7 +130,7 @@ public class AsyncHTTPPushRequest extends AbstractHTTPPushRequest {
ProtocolAdaptor<ProtocolTransportObject> protocolAdaptor = ProtocolPluginFactory.getProtocolAdaptor(protocolType);
ProtocolTransportObject protocolTransportObject =
- protocolAdaptor.fromCloudEvent(handleMsgContext.getEvent());
+ protocolAdaptor.fromCloudEvent(handleMsgContext.getEvent());
content = ((HttpCommand) protocolTransportObject).getBody().toMap().get("content").toString();
} catch (Exception ex) {
return;
@@ -137,27 +140,36 @@ public class AsyncHTTPPushRequest extends AbstractHTTPPushRequest {
body.add(new BasicNameValuePair(PushMessageRequestBody.CONTENT, content));
if (StringUtils.isBlank(handleMsgContext.getBizSeqNo())) {
body.add(new BasicNameValuePair(PushMessageRequestBody.BIZSEQNO,
- RandomStringUtils.generateNum(20)));
+ RandomStringUtils.generateNum(20)));
} else {
body.add(new BasicNameValuePair(PushMessageRequestBody.BIZSEQNO,
- handleMsgContext.getBizSeqNo()));
+ handleMsgContext.getBizSeqNo()));
}
if (StringUtils.isBlank(handleMsgContext.getUniqueId())) {
body.add(new BasicNameValuePair(PushMessageRequestBody.UNIQUEID,
- RandomStringUtils.generateNum(20)));
+ RandomStringUtils.generateNum(20)));
} else {
body.add(new BasicNameValuePair(PushMessageRequestBody.UNIQUEID,
- handleMsgContext.getUniqueId()));
+ handleMsgContext.getUniqueId()));
}
body.add(new BasicNameValuePair(PushMessageRequestBody.RANDOMNO,
- handleMsgContext.getMsgRandomNo()));
+ handleMsgContext.getMsgRandomNo()));
body.add(new BasicNameValuePair(PushMessageRequestBody.TOPIC, handleMsgContext.getTopic()));
body.add(new BasicNameValuePair(PushMessageRequestBody.EXTFIELDS,
- JsonUtils.serialize(EventMeshUtil.getEventProp(handleMsgContext.getEvent()))));
+ JsonUtils.serialize(EventMeshUtil.getEventProp(handleMsgContext.getEvent()))));
+
+ HttpEntity httpEntity = new UrlEncodedFormEntity(body, StandardCharsets.UTF_8);
+
+ builder.setEntity(httpEntity);
+
+ // for CloudEvents Webhook spec
+ String urlAuthType = handleMsgContext.getConsumerGroupConfig().getConsumerGroupTopicConf()
+ .get(handleMsgContext.getTopic()).getHttpAuthTypeMap().get(currPushUrl);
- builder.setEntity(new UrlEncodedFormEntity(body, StandardCharsets.UTF_8));
+ WebhookUtil.setWebhookHeaders(builder, httpEntity.getContentType().getValue(), eventMeshHttpConfiguration.eventMeshWebhookOrigin,
+ urlAuthType);
eventMeshHTTPServer.metrics.getSummaryMetrics().recordPushMsg();
@@ -166,41 +178,32 @@ public class AsyncHTTPPushRequest extends AbstractHTTPPushRequest {
addToWaitingMap(this);
cmdLogger.info("cmd={}|eventMesh2client|from={}|to={}", requestCode,
- IPUtils.getLocalAddress(), currPushUrl);
+ IPUtils.getLocalAddress(), currPushUrl);
try {
- httpClientPool.getClient().execute(builder, new ResponseHandler<Object>() {
+ eventMeshHTTPServer.httpClientPool.getClient().execute(builder, new ResponseHandler<Object>() {
@Override
public Object handleResponse(HttpResponse response) {
removeWaitingMap(AsyncHTTPPushRequest.this);
long cost = System.currentTimeMillis() - lastPushTime;
eventMeshHTTPServer.metrics.getSummaryMetrics().recordHTTPPushTimeCost(cost);
- if (response.getStatusLine().getStatusCode() != HttpStatus.SC_OK) {
- eventMeshHTTPServer.metrics.getSummaryMetrics().recordHttpPushMsgFailed();
- messageLogger.info(
- "message|eventMesh2client|exception|url={}|topic={}|bizSeqNo={}"
- + "|uniqueId={}|cost={}", currPushUrl, handleMsgContext.getTopic(),
- handleMsgContext.getBizSeqNo(), handleMsgContext.getUniqueId(), cost);
- delayRetry();
- if (isComplete()) {
- handleMsgContext.finish();
- }
- } else {
+ if (processResponseStatus(response.getStatusLine().getStatusCode(), response)) {
+ // this is successful response, process response payload
String res = "";
try {
res = EntityUtils.toString(response.getEntity(),
- Charset.forName(EventMeshConstants.DEFAULT_CHARSET));
+ Charset.forName(EventMeshConstants.DEFAULT_CHARSET));
} catch (IOException e) {
handleMsgContext.finish();
return new Object();
}
ClientRetCode result = processResponseContent(res);
messageLogger.info(
- "message|eventMesh2client|{}|url={}|topic={}|bizSeqNo={}"
- + "|uniqueId={}|cost={}",
- result, currPushUrl, handleMsgContext.getTopic(),
- handleMsgContext.getBizSeqNo(), handleMsgContext.getUniqueId(), cost);
+ "message|eventMesh2client|{}|url={}|topic={}|bizSeqNo={}"
+ + "|uniqueId={}|cost={}",
+ result, currPushUrl, handleMsgContext.getTopic(),
+ handleMsgContext.getBizSeqNo(), handleMsgContext.getUniqueId(), cost);
if (result == ClientRetCode.OK) {
complete();
if (isComplete()) {
@@ -222,6 +225,16 @@ public class AsyncHTTPPushRequest extends AbstractHTTPPushRequest {
handleMsgContext.finish();
}
}
+ } else {
+ eventMeshHTTPServer.metrics.getSummaryMetrics().recordHttpPushMsgFailed();
+ messageLogger.info(
+ "message|eventMesh2client|exception|url={}|topic={}|bizSeqNo={}"
+ + "|uniqueId={}|cost={}", currPushUrl, handleMsgContext.getTopic(),
+ handleMsgContext.getBizSeqNo(), handleMsgContext.getUniqueId(), cost);
+
+ if (isComplete()) {
+ handleMsgContext.finish();
+ }
}
return new Object();
}
@@ -229,13 +242,13 @@ public class AsyncHTTPPushRequest extends AbstractHTTPPushRequest {
if (messageLogger.isDebugEnabled()) {
messageLogger.debug("message|eventMesh2client|url={}|topic={}|event={}", currPushUrl,
- handleMsgContext.getTopic(),
- handleMsgContext.getEvent());
+ handleMsgContext.getTopic(),
+ handleMsgContext.getEvent());
} else {
messageLogger
- .info("message|eventMesh2client|url={}|topic={}|bizSeqNo={}|uniqueId={}",
- currPushUrl, handleMsgContext.getTopic(),
- handleMsgContext.getBizSeqNo(), handleMsgContext.getUniqueId());
+ .info("message|eventMesh2client|url={}|topic={}|bizSeqNo={}|uniqueId={}",
+ currPushUrl, handleMsgContext.getTopic(),
+ handleMsgContext.getBizSeqNo(), handleMsgContext.getUniqueId());
}
} catch (IOException e) {
messageLogger.error("push2client err", e);
@@ -251,19 +264,44 @@ public class AsyncHTTPPushRequest extends AbstractHTTPPushRequest {
public String toString() {
StringBuilder sb = new StringBuilder();
sb.append("asyncPushRequest={")
- .append("bizSeqNo=").append(handleMsgContext.getBizSeqNo())
- .append(",startIdx=").append(startIdx)
- .append(",retryTimes=").append(retryTimes)
- .append(",uniqueId=").append(handleMsgContext.getUniqueId())
- .append(",executeTime=")
- .append(DateFormatUtils.format(executeTime, Constants.DATE_FORMAT))
- .append(",lastPushTime=")
- .append(DateFormatUtils.format(lastPushTime, Constants.DATE_FORMAT))
- .append(",createTime=")
- .append(DateFormatUtils.format(createTime, Constants.DATE_FORMAT)).append("}");
+ .append("bizSeqNo=").append(handleMsgContext.getBizSeqNo())
+ .append(",startIdx=").append(startIdx)
+ .append(",retryTimes=").append(retryTimes)
+ .append(",uniqueId=").append(handleMsgContext.getUniqueId())
+ .append(",executeTime=")
+ .append(DateFormatUtils.format(executeTime, Constants.DATE_FORMAT))
+ .append(",lastPushTime=")
+ .append(DateFormatUtils.format(lastPushTime, Constants.DATE_FORMAT))
+ .append(",createTime=")
+ .append(DateFormatUtils.format(createTime, Constants.DATE_FORMAT)).append("}");
return sb.toString();
}
+ boolean processResponseStatus(int httpStatus, HttpResponse httpResponse) {
+ if (httpStatus == HttpStatus.SC_OK || httpStatus == HttpStatus.SC_CREATED
+ || httpStatus == HttpStatus.SC_NO_CONTENT || httpStatus == HttpStatus.SC_ACCEPTED) {
+ // success http response
+ return true;
+ } else if (httpStatus == 429) {
+ // failed with a custom retry interval (taken from the Retry-After header)
+
+ // Response Status code is 429 Too Many Requests
+ // retry after the time specified by the header
+ Optional<Header> optHeader = Arrays.stream(httpResponse.getHeaders("Retry-After")).findAny();
+ if (optHeader.isPresent() && StringUtils.isNumeric(optHeader.get().getValue())) {
+ delayRetry(Long.parseLong(optHeader.get().getValue()));
+ }
+ return false;
+ } else if (httpStatus == HttpStatus.SC_GONE || httpStatus == HttpStatus.SC_UNSUPPORTED_MEDIA_TYPE) {
+ // failed with no retry
+ return false;
+ }
+
+ // failed with default retry
+ delayRetry();
+ return false;
+ }
+
ClientRetCode processResponseContent(String content) {
if (StringUtils.isBlank(content)) {
return ClientRetCode.FAIL;
@@ -271,8 +309,8 @@ public class AsyncHTTPPushRequest extends AbstractHTTPPushRequest {
try {
Map<String, Object> ret =
- JsonUtils.deserialize(content, new TypeReference<Map<String, Object>>() {
- });
+ JsonUtils.deserialize(content, new TypeReference<Map<String, Object>>() {
+ });
Integer retCode = (Integer) ret.get("retCode");
if (retCode != null && ClientRetCode.contains(retCode)) {
return ClientRetCode.get(retCode);
@@ -281,15 +319,15 @@ public class AsyncHTTPPushRequest extends AbstractHTTPPushRequest {
return ClientRetCode.FAIL;
} catch (NumberFormatException e) {
messageLogger.warn("url:{}, bizSeqno:{}, uniqueId:{}, httpResponse:{}", currPushUrl,
- handleMsgContext.getBizSeqNo(), handleMsgContext.getUniqueId(), content);
+ handleMsgContext.getBizSeqNo(), handleMsgContext.getUniqueId(), content);
return ClientRetCode.FAIL;
} catch (JsonException e) {
messageLogger.warn("url:{}, bizSeqno:{}, uniqueId:{}, httpResponse:{}", currPushUrl,
- handleMsgContext.getBizSeqNo(), handleMsgContext.getUniqueId(), content);
+ handleMsgContext.getBizSeqNo(), handleMsgContext.getUniqueId(), content);
return ClientRetCode.FAIL;
} catch (Throwable t) {
messageLogger.warn("url:{}, bizSeqno:{}, uniqueId:{}, httpResponse:{}", currPushUrl,
- handleMsgContext.getBizSeqNo(), handleMsgContext.getUniqueId(), content);
+ handleMsgContext.getBizSeqNo(), handleMsgContext.getUniqueId(), content);
return ClientRetCode.FAIL;
}
}
@@ -300,7 +338,7 @@ public class AsyncHTTPPushRequest extends AbstractHTTPPushRequest {
return;
}
waitingRequests
- .put(request.handleMsgContext.getConsumerGroup(), Sets.newConcurrentHashSet());
+ .put(request.handleMsgContext.getConsumerGroup(), Sets.newConcurrentHashSet());
waitingRequests.get(request.handleMsgContext.getConsumerGroup()).add(request);
}
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/push/HTTPClientPool.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/push/HTTPClientPool.java
index 28eec0a..8a96937 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/push/HTTPClientPool.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/push/HTTPClientPool.java
@@ -19,17 +19,41 @@ package org.apache.eventmesh.runtime.core.protocol.http.push;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.RandomUtils;
+import org.apache.http.config.Registry;
+import org.apache.http.config.RegistryBuilder;
+import org.apache.http.conn.socket.ConnectionSocketFactory;
+import org.apache.http.conn.socket.PlainConnectionSocketFactory;
+import org.apache.http.conn.ssl.SSLConnectionSocketFactory;
+import org.apache.http.impl.DefaultConnectionReuseStrategy;
import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.DefaultConnectionKeepAliveStrategy;
+import org.apache.http.impl.client.DefaultHttpRequestRetryHandler;
import org.apache.http.impl.client.HttpClients;
+import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
+import org.apache.http.ssl.SSLContexts;
+import org.apache.http.ssl.TrustStrategy;
+import java.security.KeyManagementException;
+import java.security.KeyStoreException;
+import java.security.NoSuchAlgorithmException;
+import java.security.cert.X509Certificate;
+import java.util.ArrayList;
+import java.util.Collections;
import java.util.Iterator;
import java.util.List;
+import java.util.concurrent.TimeUnit;
-import com.google.common.collect.Lists;
+import javax.net.ssl.HostnameVerifier;
+import javax.net.ssl.SSLContext;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class HTTPClientPool {
- private List<CloseableHttpClient> clients = Lists.newArrayList();
+ public Logger logger = LoggerFactory.getLogger(this.getClass());
+
+ private List<CloseableHttpClient> clients = Collections.synchronizedList(new ArrayList<>());
private int core = 1;
@@ -39,14 +63,13 @@ public class HTTPClientPool {
public CloseableHttpClient getClient() {
if (CollectionUtils.size(clients) < core) {
- CloseableHttpClient client = HttpClients.createDefault();
+ CloseableHttpClient client = getHttpClient(200, 30, null);
clients.add(client);
return client;
}
return clients.get(RandomUtils.nextInt(core, 2 * core) % core);
}
-
public void shutdown() throws Exception {
Iterator<CloseableHttpClient> itr = clients.iterator();
while (itr.hasNext()) {
@@ -55,4 +78,41 @@ public class HTTPClientPool {
itr.remove();
}
}
+
+ public CloseableHttpClient getHttpClient(int maxTotal, int idleTimeInSeconds, SSLContext sslContext) {
+ try {
+ if (sslContext == null) {
+ sslContext = SSLContexts.custom().loadTrustMaterial(new TheTrustStrategy()).build();
+ }
+ } catch (KeyManagementException | NoSuchAlgorithmException | KeyStoreException e) {
+ logger.error("Get sslContext error: {}", e.getMessage());
+ return HttpClients.createDefault();
+ }
+
+ HostnameVerifier hostnameVerifier = SSLConnectionSocketFactory.ALLOW_ALL_HOSTNAME_VERIFIER;
+
+ SSLConnectionSocketFactory sslsf = new SSLConnectionSocketFactory(sslContext, hostnameVerifier);
+ Registry<ConnectionSocketFactory> socketFactoryRegistry
+ = RegistryBuilder.<ConnectionSocketFactory>create()
+ .register("http", PlainConnectionSocketFactory.getSocketFactory())
+ .register("https", sslsf).build();
+ PoolingHttpClientConnectionManager connectionManager = new PoolingHttpClientConnectionManager(socketFactoryRegistry);
+
+ connectionManager.setDefaultMaxPerRoute(maxTotal);
+ connectionManager.setMaxTotal(maxTotal);
+ return HttpClients.custom()
+ .setConnectionManager(connectionManager)
+ .setKeepAliveStrategy(new DefaultConnectionKeepAliveStrategy())
+ .evictIdleConnections(idleTimeInSeconds, TimeUnit.SECONDS)
+ .setConnectionReuseStrategy(new DefaultConnectionReuseStrategy())
+ .setRetryHandler(new DefaultHttpRequestRetryHandler())
+ .build();
+ }
+
+ public static class TheTrustStrategy implements TrustStrategy {
+ @Override
+ public boolean isTrusted(X509Certificate[] arg0, String arg1) {
+ return true;
+ }
+ }
}
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/util/WebhookUtil.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/util/WebhookUtil.java
new file mode 100644
index 0000000..6cc515a
--- /dev/null
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/util/WebhookUtil.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.runtime.util;
+
+import org.apache.eventmesh.api.auth.AuthService;
+import org.apache.eventmesh.spi.EventMeshExtensionFactory;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpOptions;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.message.BasicHeader;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Utility class for implementing CloudEvents Http Webhook spec
+ *
+ * @see <a href="https://github.com/cloudevents/spec/blob/v1.0.2/cloudevents/http-webhook.md">CloudEvents Http Webhook</a>
+ */
+public class WebhookUtil {
+
+ private static final Logger logger = LoggerFactory.getLogger(WebhookUtil.class.getName());
+
+ private static final String CONTENT_TYPE_HEADER = "Content-Type";
+ private static final String REQUEST_ORIGIN_HEADER = "WebHook-Request-Origin";
+ private static final String ALLOWED_ORIGIN_HEADER = "WebHook-Allowed-Origin";
+
+ private static final Map<String, AuthService> authServices = new ConcurrentHashMap<>();
+
+ public static boolean obtainDeliveryAgreement(CloseableHttpClient httpClient, String targetUrl, String requestOrigin) {
+ logger.info("obtain webhook delivery agreement for url: {}", targetUrl);
+ HttpOptions builder = new HttpOptions(targetUrl);
+ builder.addHeader(REQUEST_ORIGIN_HEADER, requestOrigin);
+
+ try (CloseableHttpResponse response = httpClient.execute(builder)) {
+ String allowedOrigin = response.getLastHeader(ALLOWED_ORIGIN_HEADER).getValue();
+ return StringUtils.isEmpty(allowedOrigin)
+ || allowedOrigin.equals("*") || allowedOrigin.equalsIgnoreCase(requestOrigin);
+ } catch (Exception e) {
+ logger.warn("HTTP Options Method is not supported at the Delivery Target: {},"
+ + " unable to obtain the webhook delivery agreement.", targetUrl);
+ }
+ return true;
+ }
+
+ public static void setWebhookHeaders(HttpPost builder, String contentType, String requestOrigin, String urlAuthType) {
+ builder.setHeader(CONTENT_TYPE_HEADER, contentType);
+ builder.setHeader(REQUEST_ORIGIN_HEADER, requestOrigin);
+
+ Map<String, String> authParam = getHttpAuthParam(urlAuthType);
+ if (authParam != null) {
+ authParam.forEach((k, v) -> builder.addHeader(new BasicHeader(k, v)));
+ }
+ }
+
+ private static Map<String, String> getHttpAuthParam(String authType) {
+ if (StringUtils.isEmpty(authType)) {
+ return null;
+ }
+ AuthService authService = getHttpAuthPlugin(authType);
+ if (authService != null) {
+ return authService.getAuthParams();
+ } else {
+ return null;
+ }
+ }
+
+ private static AuthService getHttpAuthPlugin(String pluginType) {
+ if (authServices.containsKey(pluginType)) {
+ return authServices.get(pluginType);
+ }
+
+ AuthService authService = EventMeshExtensionFactory.getExtension(AuthService.class, pluginType);
+
+ if (authService == null) {
+ logger.error("can't load the authService plugin, please check.");
+ throw new RuntimeException("doesn't load the authService plugin, please check.");
+ }
+ try {
+ authService.init();
+ authServices.put(pluginType, authService);
+ return authService;
+ } catch (Exception e) {
+ logger.error("Error in initializing authService", e);
+ }
+ return null;
+ }
+}
diff --git a/eventmesh-security-plugin/eventmesh-security-api/build.gradle b/eventmesh-security-plugin/eventmesh-security-api/build.gradle
index 0d41042..482f87d 100644
--- a/eventmesh-security-plugin/eventmesh-security-api/build.gradle
+++ b/eventmesh-security-plugin/eventmesh-security-api/build.gradle
@@ -16,6 +16,8 @@
*/
dependencies {
+ implementation 'org.slf4j:slf4j-api'
+
api project(":eventmesh-spi")
testImplementation project(":eventmesh-spi")
diff --git a/eventmesh-security-plugin/eventmesh-security-api/build.gradle b/eventmesh-security-plugin/eventmesh-security-api/src/main/java/org/apache/eventmesh/api/auth/AuthService.java
similarity index 60%
copy from eventmesh-security-plugin/eventmesh-security-api/build.gradle
copy to eventmesh-security-plugin/eventmesh-security-api/src/main/java/org/apache/eventmesh/api/auth/AuthService.java
index 0d41042..d957b50 100644
--- a/eventmesh-security-plugin/eventmesh-security-api/build.gradle
+++ b/eventmesh-security-plugin/eventmesh-security-api/src/main/java/org/apache/eventmesh/api/auth/AuthService.java
@@ -15,8 +15,25 @@
* limitations under the License.
*/
-dependencies {
- api project(":eventmesh-spi")
+package org.apache.eventmesh.api.auth;
- testImplementation project(":eventmesh-spi")
+import org.apache.eventmesh.api.exception.AuthException;
+import org.apache.eventmesh.spi.EventMeshExtensionType;
+import org.apache.eventmesh.spi.EventMeshSPI;
+
+import java.util.Map;
+
+/**
+ * Extension point for authentication plugins.
+ *
+ * <p>Registered as a singleton {@link EventMeshSPI} extension of type
+ * {@link EventMeshExtensionType#SECURITY}.
+ */
+@EventMeshSPI(isSingleton = true, eventMeshExtensionType = EventMeshExtensionType.SECURITY)
+public interface AuthService {
+
+    /**
+     * Prepares the plugin for use.
+     *
+     * @throws AuthException if the plugin cannot be initialized
+     */
+    void init() throws AuthException;
+
+    /**
+     * Starts the plugin.
+     *
+     * @throws AuthException if the plugin cannot be started
+     */
+    void start() throws AuthException;
+
+    /**
+     * Shuts the plugin down.
+     *
+     * @throws AuthException if the shutdown fails
+     */
+    void shutdown() throws AuthException;
+
+    /**
+     * Returns the authentication parameters produced by this plugin, keyed by parameter name.
+     * The webhook push path adds each entry to the outgoing request as an HTTP header
+     * (key = header name, value = header value).
+     *
+     * @return the authentication parameters
+     * @throws AuthException if the parameters cannot be produced
+     */
+    Map<String, String> getAuthParams() throws AuthException;
}
diff --git a/eventmesh-security-plugin/eventmesh-security-api/src/main/java/org/apache/eventmesh/api/common/ConfigurationWrapper.java b/eventmesh-security-plugin/eventmesh-security-api/src/main/java/org/apache/eventmesh/api/common/ConfigurationWrapper.java
new file mode 100644
index 0000000..dc359c1
--- /dev/null
+++ b/eventmesh-security-plugin/eventmesh-security-api/src/main/java/org/apache/eventmesh/api/common/ConfigurationWrapper.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.api.common;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileReader;
+import java.io.IOException;
+import java.net.URL;
+import java.util.Properties;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Loads configuration files for the security plugins as {@link Properties}.
+ */
+public class ConfigurationWrapper {
+
+    private static final Logger logger = LoggerFactory.getLogger("ConfigurationWrapper");
+
+    // Fallback directory: the "confPath" system property, or the "confPath" environment
+    // variable when the property is not set.
+    private static final String EVENTMESH_CONFIG_HOME = System.getProperty("confPath", System.getenv("confPath"));
+
+    /**
+     * Reads the named properties file.
+     *
+     * <p>The file is taken from the classpath when the resource exists as a regular file on
+     * disk; otherwise it is read from the {@code confPath} directory.
+     *
+     * @param configFile file name of the configuration file, e.g. {@code auth-http-basic.properties}
+     * @return the properties read from the file
+     * @throws IllegalArgumentException if the file cannot be read; the underlying
+     *                                  {@link IOException} is attached as the cause
+     */
+    public static Properties getConfig(String configFile) {
+        String configFilePath;
+
+        // get from classpath
+        URL resource = ConfigurationWrapper.class.getClassLoader().getResource(configFile);
+        if (resource != null && new File(resource.getPath()).exists()) {
+            configFilePath = resource.getPath();
+        } else {
+            // get from config home
+            configFilePath = EVENTMESH_CONFIG_HOME + File.separator + configFile;
+        }
+
+        logger.info("loading auth config: {}", configFilePath);
+        Properties properties = new Properties();
+        // try-with-resources: the reader (and the underlying file handle) is closed on every path
+        try (BufferedReader reader = new BufferedReader(new FileReader(configFilePath))) {
+            properties.load(reader);
+        } catch (IOException e) {
+            // keep the cause so the real I/O failure is visible in the stack trace
+            throw new IllegalArgumentException(
+                String.format("Cannot load auth configuration file from :%s", configFilePath), e);
+        }
+        return properties;
+    }
+}
diff --git a/eventmesh-security-plugin/eventmesh-security-api/build.gradle b/eventmesh-security-plugin/eventmesh-security-api/src/main/java/org/apache/eventmesh/api/exception/AuthException.java
similarity index 74%
copy from eventmesh-security-plugin/eventmesh-security-api/build.gradle
copy to eventmesh-security-plugin/eventmesh-security-api/src/main/java/org/apache/eventmesh/api/exception/AuthException.java
index 0d41042..67bef31 100644
--- a/eventmesh-security-plugin/eventmesh-security-api/build.gradle
+++ b/eventmesh-security-plugin/eventmesh-security-api/src/main/java/org/apache/eventmesh/api/exception/AuthException.java
@@ -15,8 +15,15 @@
* limitations under the License.
*/
-dependencies {
- api project(":eventmesh-spi")
+package org.apache.eventmesh.api.exception;
- testImplementation project(":eventmesh-spi")
+/**
+ * Unchecked exception raised by the authentication plugin API.
+ */
+public class AuthException extends RuntimeException {
+
+    /**
+     * Creates an exception that wraps an underlying failure.
+     *
+     * @param message description of the failure
+     * @param cause   the original error
+     */
+    public AuthException(String message, Throwable cause) {
+        super(message, cause);
+    }
+
+    /**
+     * Creates an exception without a cause.
+     *
+     * @param message description of the failure
+     */
+    public AuthException(String message) {
+        super(message);
+    }
}
diff --git a/eventmesh-security-plugin/eventmesh-security-api/build.gradle b/eventmesh-security-plugin/eventmesh-security-auth-http-basic/build.gradle
similarity index 83%
copy from eventmesh-security-plugin/eventmesh-security-api/build.gradle
copy to eventmesh-security-plugin/eventmesh-security-auth-http-basic/build.gradle
index 0d41042..bb0f4d6 100644
--- a/eventmesh-security-plugin/eventmesh-security-api/build.gradle
+++ b/eventmesh-security-plugin/eventmesh-security-auth-http-basic/build.gradle
@@ -16,7 +16,7 @@
*/
dependencies {
- api project(":eventmesh-spi")
+ implementation project(":eventmesh-security-plugin:eventmesh-security-api")
- testImplementation project(":eventmesh-spi")
-}
+ testImplementation project(":eventmesh-security-plugin:eventmesh-security-api")
+}
\ No newline at end of file
diff --git a/eventmesh-security-plugin/eventmesh-security-auth-http-basic/gradle.properties b/eventmesh-security-plugin/eventmesh-security-auth-http-basic/gradle.properties
new file mode 100644
index 0000000..83021bb
--- /dev/null
+++ b/eventmesh-security-plugin/eventmesh-security-auth-http-basic/gradle.properties
@@ -0,0 +1,18 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+pluginType=security
+pluginName=auth-http-basic
\ No newline at end of file
diff --git a/eventmesh-security-plugin/eventmesh-security-api/build.gradle b/eventmesh-security-plugin/eventmesh-security-auth-http-basic/src/main/java/org/apache/eventmesh/auth/http/basic/config/AuthConfigs.java
similarity index 54%
copy from eventmesh-security-plugin/eventmesh-security-api/build.gradle
copy to eventmesh-security-plugin/eventmesh-security-auth-http-basic/src/main/java/org/apache/eventmesh/auth/http/basic/config/AuthConfigs.java
index 0d41042..c077bf4 100644
--- a/eventmesh-security-plugin/eventmesh-security-api/build.gradle
+++ b/eventmesh-security-plugin/eventmesh-security-auth-http-basic/src/main/java/org/apache/eventmesh/auth/http/basic/config/AuthConfigs.java
@@ -15,8 +15,27 @@
* limitations under the License.
*/
-dependencies {
- api project(":eventmesh-spi")
+package org.apache.eventmesh.auth.http.basic.config;
- testImplementation project(":eventmesh-spi")
+import org.apache.eventmesh.api.common.ConfigurationWrapper;
+
+import java.util.Properties;
+
+/**
+ * Credentials for the HTTP basic auth plugin, read once from
+ * {@code auth-http-basic.properties} and shared as a single instance.
+ */
+public class AuthConfigs {
+
+    // value of the "auth.username" property
+    public String username;
+
+    // value of the "auth.password" property
+    public String password;
+
+    private static AuthConfigs instance;
+
+    /**
+     * Returns the shared configuration, loading it on the first call.
+     *
+     * @return the shared {@code AuthConfigs} instance
+     */
+    public static synchronized AuthConfigs getConfigs() {
+        if (instance != null) {
+            return instance;
+        }
+        final Properties loaded = ConfigurationWrapper.getConfig("auth-http-basic.properties");
+        final AuthConfigs configs = new AuthConfigs();
+        configs.username = loaded.getProperty("auth.username");
+        configs.password = loaded.getProperty("auth.password");
+        instance = configs;
+        return instance;
+    }
}
diff --git a/eventmesh-security-plugin/eventmesh-security-auth-http-basic/src/main/java/org/apache/eventmesh/auth/http/basic/impl/AuthHttpBasicService.java b/eventmesh-security-plugin/eventmesh-security-auth-http-basic/src/main/java/org/apache/eventmesh/auth/http/basic/impl/AuthHttpBasicService.java
new file mode 100644
index 0000000..c32c988
--- /dev/null
+++ b/eventmesh-security-plugin/eventmesh-security-auth-http-basic/src/main/java/org/apache/eventmesh/auth/http/basic/impl/AuthHttpBasicService.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.auth.http.basic.impl;
+
+import org.apache.eventmesh.api.auth.AuthService;
+import org.apache.eventmesh.api.exception.AuthException;
+import org.apache.eventmesh.auth.http.basic.config.AuthConfigs;
+
+import java.nio.charset.StandardCharsets;
+import java.util.Base64;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * {@link AuthService} implementation that builds an HTTP Basic {@code Authorization} header
+ * from the username and password held by {@link AuthConfigs}.
+ */
+public class AuthHttpBasicService implements AuthService {
+
+    private AuthConfigs authConfigs;
+
+    /**
+     * Loads the shared {@link AuthConfigs} instance.
+     */
+    @Override
+    public void init() throws AuthException {
+        authConfigs = AuthConfigs.getConfigs();
+    }
+
+    @Override
+    public void start() throws AuthException {
+        // nothing to start for this plugin
+    }
+
+    @Override
+    public void shutdown() throws AuthException {
+        // nothing to release for this plugin
+    }
+
+    /**
+     * Builds the {@code Authorization} header for HTTP Basic authentication.
+     *
+     * @return a map with the single entry {@code Authorization -> "Basic <token>"}, where the
+     *         token is the Base64 encoding of the UTF-8 bytes of {@code username:password}
+     */
+    @Override
+    public Map<String, String> getAuthParams() throws AuthException {
+        if (authConfigs == null) {
+            // tolerate callers that skipped init()
+            init();
+        }
+
+        // RFC 7617: the credentials are user-id ":" password; without the colon the
+        // receiving server cannot split the decoded token back into its two parts.
+        String credentials = authConfigs.username + ":" + authConfigs.password;
+        String token = Base64.getEncoder().encodeToString(credentials.getBytes(StandardCharsets.UTF_8));
+
+        Map<String, String> authParams = new HashMap<>();
+        authParams.put("Authorization", "Basic " + token);
+        return authParams;
+    }
+}
diff --git a/eventmesh-security-plugin/eventmesh-security-auth-http-basic/src/main/resources/META-INF/eventmesh/org.apache.eventmesh.api.auth.AuthService b/eventmesh-security-plugin/eventmesh-security-auth-http-basic/src/main/resources/META-INF/eventmesh/org.apache.eventmesh.api.auth.AuthService
new file mode 100644
index 0000000..2dde936
--- /dev/null
+++ b/eventmesh-security-plugin/eventmesh-security-auth-http-basic/src/main/resources/META-INF/eventmesh/org.apache.eventmesh.api.auth.AuthService
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+auth-http-basic=org.apache.eventmesh.auth.http.basic.impl.AuthHttpBasicService
\ No newline at end of file
diff --git a/eventmesh-security-plugin/eventmesh-security-auth-http-basic/src/main/resources/auth-http-basic.properties b/eventmesh-security-plugin/eventmesh-security-auth-http-basic/src/main/resources/auth-http-basic.properties
new file mode 100644
index 0000000..f06e6c1
--- /dev/null
+++ b/eventmesh-security-plugin/eventmesh-security-auth-http-basic/src/main/resources/auth-http-basic.properties
@@ -0,0 +1,18 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+auth.username = ****
+auth.password = ****
\ No newline at end of file
diff --git a/settings.gradle b/settings.gradle
index 6481fdd..08c0816 100644
--- a/settings.gradle
+++ b/settings.gradle
@@ -42,7 +42,9 @@ findProject(':eventmesh-protocol-plugin:eventmesh-protocol-grpcmessage')?.name =
include 'eventmesh-metrics-plugin'
include 'eventmesh-metrics-plugin:eventmesh-metrics-api'
include 'eventmesh-metrics-plugin:eventmesh-metrics-opentelemetry'
+
+include 'eventmesh-security-plugin:eventmesh-security-auth-http-basic'
+
include 'eventmesh-trace-plugin'
include 'eventmesh-trace-plugin:eventmesh-trace-api'
include 'eventmesh-trace-plugin:eventmesh-trace-zipkin'
-
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@eventmesh.apache.org
For additional commands, e-mail: commits-help@eventmesh.apache.org