You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eventmesh.apache.org by mi...@apache.org on 2022/02/25 07:39:24 UTC

[incubator-eventmesh] branch master updated: [Issue #658] Eventmesh Http Support CloudEvents Webhook spec (#772)

This is an automated email from the ASF dual-hosted git repository.

mikexue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-eventmesh.git


The following commit(s) were added to refs/heads/master by this push:
     new a52bf29  [Issue #658] Eventmesh Http Support CloudEvents Webhook spec (#772)
a52bf29 is described below

commit a52bf296a2a70128d91a77ca75c682aaf41f2289
Author: jinrongluo <ka...@gmail.com>
AuthorDate: Fri Feb 25 02:39:17 2022 -0500

    [Issue #658] Eventmesh Http Support CloudEvents Webhook spec (#772)
    
    * [Issue #658] support CloudEvents Webhook spec
    
    * [Issue #658] create auth-http-basic security module
    
    * [Issue #658] create auth-http-basic security module
    
    * [Issue #658] code refactor for CloudEvents Webhook
    
    * [Issue #658] fix build checkstyles
    
    * [Issue #658] fix javadoc build issue
    
    * [Issue #658] fix javadoc build issue in eventmesh-security-auth-http-basic
    
    * [Issue #658] adding more log to the WebhookUtil
    
    * [Issue #658] address PR review comments.
    
    * [Issue #658] fixed checkstyles issue
    
    Co-authored-by: mike_xwm <mi...@126.com>
---
 .../org/apache/eventmesh/common/Constants.java     |   2 +-
 .../common/config/CommonConfiguration.java         |   2 +
 .../demo/pub/cloudevents/AsyncPublishInstance.java |   2 +-
 eventmesh-runtime/build.gradle                     |   1 +
 .../runtime/boot/EventMeshHTTPServer.java          |   5 +-
 .../core/consumergroup/ConsumerGroupTopicConf.java |   9 ++
 .../http/processor/SubscribeProcessor.java         |  17 +++
 .../http/push/AbstractHTTPPushRequest.java         |  12 +-
 .../protocol/http/push/AsyncHTTPPushRequest.java   | 152 +++++++++++++--------
 .../core/protocol/http/push/HTTPClientPool.java    |  68 ++++++++-
 .../apache/eventmesh/runtime/util/WebhookUtil.java | 109 +++++++++++++++
 .../eventmesh-security-api/build.gradle            |   2 +
 .../apache/eventmesh/api/auth/AuthService.java}    |  23 +++-
 .../eventmesh/api/common/ConfigurationWrapper.java |  58 ++++++++
 .../eventmesh/api/exception/AuthException.java}    |  13 +-
 .../build.gradle                                   |   6 +-
 .../gradle.properties                              |  18 +++
 .../auth/http/basic/config/AuthConfigs.java}       |  25 +++-
 .../auth/http/basic/impl/AuthHttpBasicService.java |  61 +++++++++
 .../org.apache.eventmesh.api.auth.AuthService      |  16 +++
 .../src/main/resources/auth-http-basic.properties  |  18 +++
 settings.gradle                                    |   4 +-
 22 files changed, 544 insertions(+), 79 deletions(-)

diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/Constants.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/Constants.java
index dadd908..ed96085 100644
--- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/Constants.java
+++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/Constants.java
@@ -35,7 +35,7 @@ public class Constants {
 
     public static final String PROTOCOL_DESC = "protocoldesc";
 
-    public static final int DEFAULT_HTTP_TIME_OUT = 3000;
+    public static final int DEFAULT_HTTP_TIME_OUT = 15000;
 
     public static final String EVENTMESH_MESSAGE_CONST_TTL = "ttl";
 
diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/CommonConfiguration.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/CommonConfiguration.java
index 8d3e3eb..2552098 100644
--- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/CommonConfiguration.java
+++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/CommonConfiguration.java
@@ -48,6 +48,8 @@ public class CommonConfiguration {
     public    boolean              eventMeshServerRegistryEnable      = false;
     protected ConfigurationWrapper configurationWrapper;
 
+    public String eventMeshWebhookOrigin = "eventmesh." + eventMeshIDC;
+
     public CommonConfiguration(ConfigurationWrapper configurationWrapper) {
         this.configurationWrapper = configurationWrapper;
     }
diff --git a/eventmesh-examples/src/main/java/org/apache/eventmesh/http/demo/pub/cloudevents/AsyncPublishInstance.java b/eventmesh-examples/src/main/java/org/apache/eventmesh/http/demo/pub/cloudevents/AsyncPublishInstance.java
index fd3b1f4..e4cd10f 100644
--- a/eventmesh-examples/src/main/java/org/apache/eventmesh/http/demo/pub/cloudevents/AsyncPublishInstance.java
+++ b/eventmesh-examples/src/main/java/org/apache/eventmesh/http/demo/pub/cloudevents/AsyncPublishInstance.java
@@ -44,7 +44,7 @@ import lombok.extern.slf4j.Slf4j;
 public class AsyncPublishInstance {
     
     // This messageSize is also used in SubService.java (Subscriber)
-    public static final int MESSAGE_SIZE = 5;
+    public static final int MESSAGE_SIZE = 1;
     
     public static final String DEFAULT_IP_PORT = "127.0.0.1:10105";
     
diff --git a/eventmesh-runtime/build.gradle b/eventmesh-runtime/build.gradle
index 54e763d..a3b9016 100644
--- a/eventmesh-runtime/build.gradle
+++ b/eventmesh-runtime/build.gradle
@@ -36,6 +36,7 @@ dependencies {
     implementation project(":eventmesh-connector-plugin:eventmesh-connector-standalone")
     implementation project(":eventmesh-security-plugin:eventmesh-security-api")
     implementation project(":eventmesh-security-plugin:eventmesh-security-acl")
+    implementation project(":eventmesh-security-plugin:eventmesh-security-auth-http-basic")
     implementation project(":eventmesh-registry-plugin:eventmesh-registry-api")
     implementation project(":eventmesh-admin:eventmesh-admin-rocketmq")
 
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshHTTPServer.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshHTTPServer.java
index dfa6e6b..07ec0a4 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshHTTPServer.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshHTTPServer.java
@@ -37,6 +37,7 @@ import org.apache.eventmesh.runtime.core.protocol.http.processor.UnSubscribeProc
 import org.apache.eventmesh.runtime.core.protocol.http.processor.inf.Client;
 import org.apache.eventmesh.runtime.core.protocol.http.producer.ProducerManager;
 import org.apache.eventmesh.runtime.core.protocol.http.push.AbstractHTTPPushRequest;
+import org.apache.eventmesh.runtime.core.protocol.http.push.HTTPClientPool;
 import org.apache.eventmesh.runtime.core.protocol.http.retry.HttpRetryer;
 import org.apache.eventmesh.runtime.metrics.http.HTTPMetricsServer;
 import org.apache.eventmesh.trace.api.TracePluginFactory;
@@ -107,6 +108,8 @@ public class EventMeshHTTPServer extends AbstractHTTPServer {
 
     private RateLimiter batchRateLimiter;
 
+    public HTTPClientPool httpClientPool = new HTTPClientPool(10);
+
     public void shutdownThreadPool() throws Exception {
         batchMsgExecutor.shutdown();
         adminExecutor.shutdown();
@@ -260,7 +263,7 @@ public class EventMeshHTTPServer extends AbstractHTTPServer {
 
         shutdownThreadPool();
 
-        AbstractHTTPPushRequest.httpClientPool.shutdown();
+        httpClientPool.shutdown();
 
         producerManager.shutdown();
 
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/consumergroup/ConsumerGroupTopicConf.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/consumergroup/ConsumerGroupTopicConf.java
index f97a51d..1bc73e1 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/consumergroup/ConsumerGroupTopicConf.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/consumergroup/ConsumerGroupTopicConf.java
@@ -55,6 +55,11 @@ public class ConsumerGroupTopicConf implements Serializable {
      */
     private Set<String> urls = Sets.newConcurrentHashSet();
 
+    /**
+     * url auth type
+     */
+    private Map<String, String> httpAuthTypeMap = Maps.newConcurrentMap();
+
     @Override
     public boolean equals(Object o) {
         if (this == o) {
@@ -127,4 +132,8 @@ public class ConsumerGroupTopicConf implements Serializable {
     public void setUrls(Set<String> urls) {
         this.urls = urls;
     }
+
+    public Map<String, String> getHttpAuthTypeMap() {
+        return httpAuthTypeMap;
+    }
 }
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SubscribeProcessor.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SubscribeProcessor.java
index 01c156b..fa521f8 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SubscribeProcessor.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SubscribeProcessor.java
@@ -39,6 +39,7 @@ import org.apache.eventmesh.runtime.core.protocol.http.processor.inf.Client;
 import org.apache.eventmesh.runtime.core.protocol.http.processor.inf.HttpRequestProcessor;
 import org.apache.eventmesh.runtime.util.EventMeshUtil;
 import org.apache.eventmesh.runtime.util.RemotingHelper;
+import org.apache.eventmesh.runtime.util.WebhookUtil;
 
 import org.apache.commons.collections4.CollectionUtils;
 import org.apache.commons.lang3.StringUtils;
@@ -172,6 +173,21 @@ public class SubscribeProcessor implements HttpRequestProcessor {
             return;
         }
 
+        // obtain webhook delivery agreement for Abuse Protection
+        boolean isWebhookAllowed = WebhookUtil.obtainDeliveryAgreement(eventMeshHTTPServer.httpClientPool.getClient(),
+            url, eventMeshHTTPServer.getEventMeshHttpConfiguration().eventMeshWebhookOrigin);
+
+        if (!isWebhookAllowed) {
+            httpLogger.error("subscriber url {} is not allowed by the target system", url);
+            responseEventMeshCommand = request.createHttpCommandResponse(
+                subscribeResponseHeader,
+                SubscribeResponseBody
+                    .buildBody(EventMeshRetCode.EVENTMESH_PROTOCOL_BODY_ERR.getRetCode(),
+                        EventMeshRetCode.EVENTMESH_PROTOCOL_BODY_ERR.getErrMsg() + " unauthorized webhook URL: " + url));
+            asyncContext.onComplete(responseEventMeshCommand);
+            return;
+        }
+
         synchronized (eventMeshHTTPServer.localClientInfoMapping) {
 
             registerClient(subscribeRequestHeader, consumerGroup, subTopicList, url);
@@ -329,4 +345,5 @@ public class SubscribeProcessor implements HttpRequestProcessor {
             }
         }
     }
+
 }
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/push/AbstractHTTPPushRequest.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/push/AbstractHTTPPushRequest.java
index dd60a62..1f7c3a5 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/push/AbstractHTTPPushRequest.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/push/AbstractHTTPPushRequest.java
@@ -57,8 +57,6 @@ public abstract class AbstractHTTPPushRequest extends RetryContext {
 
     public HandleMsgContext handleMsgContext;
 
-    public static HTTPClientPool httpClientPool = new HTTPClientPool(10);
-
     private AtomicBoolean complete = new AtomicBoolean(Boolean.FALSE);
 
     public AbstractHTTPPushRequest(HandleMsgContext handleMsgContext) {
@@ -75,6 +73,16 @@ public abstract class AbstractHTTPPushRequest extends RetryContext {
     public void tryHTTPRequest() {
     }
 
+    public void delayRetry(long delayTime) {
+        if (retryTimes < EventMeshConstants.DEFAULT_PUSH_RETRY_TIMES && delayTime > 0) {
+            retryTimes++;
+            delay(delayTime);
+            retryer.pushRetry(this);
+        } else {
+            complete.compareAndSet(Boolean.FALSE, Boolean.TRUE);
+        }
+    }
+
     public void delayRetry() {
         if (retryTimes < EventMeshConstants.DEFAULT_PUSH_RETRY_TIMES) {
             retryTimes++;
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/push/AsyncHTTPPushRequest.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/push/AsyncHTTPPushRequest.java
index 89c59c9..d874c2b 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/push/AsyncHTTPPushRequest.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/push/AsyncHTTPPushRequest.java
@@ -35,9 +35,12 @@ import org.apache.eventmesh.protocol.api.ProtocolPluginFactory;
 import org.apache.eventmesh.runtime.constants.EventMeshConstants;
 import org.apache.eventmesh.runtime.core.protocol.http.consumer.HandleMsgContext;
 import org.apache.eventmesh.runtime.util.EventMeshUtil;
+import org.apache.eventmesh.runtime.util.WebhookUtil;
 
 import org.apache.commons.lang3.StringUtils;
 import org.apache.commons.lang3.time.DateFormatUtils;
+import org.apache.http.Header;
+import org.apache.http.HttpEntity;
 import org.apache.http.HttpResponse;
 import org.apache.http.HttpStatus;
 import org.apache.http.NameValuePair;
@@ -51,9 +54,11 @@ import java.io.IOException;
 import java.nio.charset.Charset;
 import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.Optional;
 import java.util.Set;
 
 import org.slf4j.Logger;
@@ -72,10 +77,8 @@ public class AsyncHTTPPushRequest extends AbstractHTTPPushRequest {
     public Logger cmdLogger = LoggerFactory.getLogger("cmd");
 
     public Logger logger = LoggerFactory.getLogger(this.getClass());
-
-    private Map<String, Set<AbstractHTTPPushRequest>> waitingRequests;
-
     public String currPushUrl;
+    private Map<String, Set<AbstractHTTPPushRequest>> waitingRequests;
 
     public AsyncHTTPPushRequest(HandleMsgContext handleMsgContext,
                                 Map<String, Set<AbstractHTTPPushRequest>> waitingRequests) {
@@ -106,18 +109,18 @@ public class AsyncHTTPPushRequest extends AbstractHTTPPushRequest {
         builder.addHeader(ProtocolKey.LANGUAGE, Constants.LANGUAGE_JAVA);
         builder.addHeader(ProtocolKey.VERSION, ProtocolVersion.V1.getVersion());
         builder.addHeader(ProtocolKey.EventMeshInstanceKey.EVENTMESHCLUSTER,
-                handleMsgContext.getEventMeshHTTPServer()
-                        .getEventMeshHttpConfiguration().eventMeshCluster);
+            handleMsgContext.getEventMeshHTTPServer()
+                .getEventMeshHttpConfiguration().eventMeshCluster);
         builder.addHeader(ProtocolKey.EventMeshInstanceKey.EVENTMESHIP, IPUtils.getLocalAddress());
         builder.addHeader(ProtocolKey.EventMeshInstanceKey.EVENTMESHENV,
-                handleMsgContext.getEventMeshHTTPServer().getEventMeshHttpConfiguration().eventMeshEnv);
+            handleMsgContext.getEventMeshHTTPServer().getEventMeshHttpConfiguration().eventMeshEnv);
         builder.addHeader(ProtocolKey.EventMeshInstanceKey.EVENTMESHIDC,
-                handleMsgContext.getEventMeshHTTPServer().getEventMeshHttpConfiguration().eventMeshIDC);
+            handleMsgContext.getEventMeshHTTPServer().getEventMeshHttpConfiguration().eventMeshIDC);
 
         CloudEvent event = CloudEventBuilder.from(handleMsgContext.getEvent())
-                .withExtension(EventMeshConstants.REQ_EVENTMESH2C_TIMESTAMP,
-                        String.valueOf(System.currentTimeMillis()))
-                .build();
+            .withExtension(EventMeshConstants.REQ_EVENTMESH2C_TIMESTAMP,
+                String.valueOf(System.currentTimeMillis()))
+            .build();
         handleMsgContext.setEvent(event);
 
         String content = "";
@@ -127,7 +130,7 @@ public class AsyncHTTPPushRequest extends AbstractHTTPPushRequest {
             ProtocolAdaptor<ProtocolTransportObject> protocolAdaptor = ProtocolPluginFactory.getProtocolAdaptor(protocolType);
 
             ProtocolTransportObject protocolTransportObject =
-                    protocolAdaptor.fromCloudEvent(handleMsgContext.getEvent());
+                protocolAdaptor.fromCloudEvent(handleMsgContext.getEvent());
             content = ((HttpCommand) protocolTransportObject).getBody().toMap().get("content").toString();
         } catch (Exception ex) {
             return;
@@ -137,27 +140,36 @@ public class AsyncHTTPPushRequest extends AbstractHTTPPushRequest {
         body.add(new BasicNameValuePair(PushMessageRequestBody.CONTENT, content));
         if (StringUtils.isBlank(handleMsgContext.getBizSeqNo())) {
             body.add(new BasicNameValuePair(PushMessageRequestBody.BIZSEQNO,
-                    RandomStringUtils.generateNum(20)));
+                RandomStringUtils.generateNum(20)));
         } else {
             body.add(new BasicNameValuePair(PushMessageRequestBody.BIZSEQNO,
-                    handleMsgContext.getBizSeqNo()));
+                handleMsgContext.getBizSeqNo()));
         }
         if (StringUtils.isBlank(handleMsgContext.getUniqueId())) {
             body.add(new BasicNameValuePair(PushMessageRequestBody.UNIQUEID,
-                    RandomStringUtils.generateNum(20)));
+                RandomStringUtils.generateNum(20)));
         } else {
             body.add(new BasicNameValuePair(PushMessageRequestBody.UNIQUEID,
-                    handleMsgContext.getUniqueId()));
+                handleMsgContext.getUniqueId()));
         }
 
         body.add(new BasicNameValuePair(PushMessageRequestBody.RANDOMNO,
-                handleMsgContext.getMsgRandomNo()));
+            handleMsgContext.getMsgRandomNo()));
         body.add(new BasicNameValuePair(PushMessageRequestBody.TOPIC, handleMsgContext.getTopic()));
 
         body.add(new BasicNameValuePair(PushMessageRequestBody.EXTFIELDS,
-                JsonUtils.serialize(EventMeshUtil.getEventProp(handleMsgContext.getEvent()))));
+            JsonUtils.serialize(EventMeshUtil.getEventProp(handleMsgContext.getEvent()))));
+
+        HttpEntity httpEntity = new UrlEncodedFormEntity(body, StandardCharsets.UTF_8);
+
+        builder.setEntity(httpEntity);
+
+        // for CloudEvents Webhook spec
+        String urlAuthType = handleMsgContext.getConsumerGroupConfig().getConsumerGroupTopicConf()
+            .get(handleMsgContext.getTopic()).getHttpAuthTypeMap().get(currPushUrl);
 
-        builder.setEntity(new UrlEncodedFormEntity(body, StandardCharsets.UTF_8));
+        WebhookUtil.setWebhookHeaders(builder, httpEntity.getContentType().getValue(), eventMeshHttpConfiguration.eventMeshWebhookOrigin,
+            urlAuthType);
 
         eventMeshHTTPServer.metrics.getSummaryMetrics().recordPushMsg();
 
@@ -166,41 +178,32 @@ public class AsyncHTTPPushRequest extends AbstractHTTPPushRequest {
         addToWaitingMap(this);
 
         cmdLogger.info("cmd={}|eventMesh2client|from={}|to={}", requestCode,
-                IPUtils.getLocalAddress(), currPushUrl);
+            IPUtils.getLocalAddress(), currPushUrl);
 
         try {
-            httpClientPool.getClient().execute(builder, new ResponseHandler<Object>() {
+            eventMeshHTTPServer.httpClientPool.getClient().execute(builder, new ResponseHandler<Object>() {
                 @Override
                 public Object handleResponse(HttpResponse response) {
                     removeWaitingMap(AsyncHTTPPushRequest.this);
                     long cost = System.currentTimeMillis() - lastPushTime;
                     eventMeshHTTPServer.metrics.getSummaryMetrics().recordHTTPPushTimeCost(cost);
-                    if (response.getStatusLine().getStatusCode() != HttpStatus.SC_OK) {
-                        eventMeshHTTPServer.metrics.getSummaryMetrics().recordHttpPushMsgFailed();
-                        messageLogger.info(
-                                "message|eventMesh2client|exception|url={}|topic={}|bizSeqNo={}"
-                                        + "|uniqueId={}|cost={}", currPushUrl, handleMsgContext.getTopic(),
-                                handleMsgContext.getBizSeqNo(), handleMsgContext.getUniqueId(), cost);
 
-                        delayRetry();
-                        if (isComplete()) {
-                            handleMsgContext.finish();
-                        }
-                    } else {
+                    if (processResponseStatus(response.getStatusLine().getStatusCode(), response)) {
+                        // this is successful response, process response payload
                         String res = "";
                         try {
                             res = EntityUtils.toString(response.getEntity(),
-                                    Charset.forName(EventMeshConstants.DEFAULT_CHARSET));
+                                Charset.forName(EventMeshConstants.DEFAULT_CHARSET));
                         } catch (IOException e) {
                             handleMsgContext.finish();
                             return new Object();
                         }
                         ClientRetCode result = processResponseContent(res);
                         messageLogger.info(
-                                "message|eventMesh2client|{}|url={}|topic={}|bizSeqNo={}"
-                                        + "|uniqueId={}|cost={}",
-                                result, currPushUrl, handleMsgContext.getTopic(),
-                                handleMsgContext.getBizSeqNo(), handleMsgContext.getUniqueId(), cost);
+                            "message|eventMesh2client|{}|url={}|topic={}|bizSeqNo={}"
+                                + "|uniqueId={}|cost={}",
+                            result, currPushUrl, handleMsgContext.getTopic(),
+                            handleMsgContext.getBizSeqNo(), handleMsgContext.getUniqueId(), cost);
                         if (result == ClientRetCode.OK) {
                             complete();
                             if (isComplete()) {
@@ -222,6 +225,16 @@ public class AsyncHTTPPushRequest extends AbstractHTTPPushRequest {
                                 handleMsgContext.finish();
                             }
                         }
+                    } else {
+                        eventMeshHTTPServer.metrics.getSummaryMetrics().recordHttpPushMsgFailed();
+                        messageLogger.info(
+                            "message|eventMesh2client|exception|url={}|topic={}|bizSeqNo={}"
+                                + "|uniqueId={}|cost={}", currPushUrl, handleMsgContext.getTopic(),
+                            handleMsgContext.getBizSeqNo(), handleMsgContext.getUniqueId(), cost);
+
+                        if (isComplete()) {
+                            handleMsgContext.finish();
+                        }
                     }
                     return new Object();
                 }
@@ -229,13 +242,13 @@ public class AsyncHTTPPushRequest extends AbstractHTTPPushRequest {
 
             if (messageLogger.isDebugEnabled()) {
                 messageLogger.debug("message|eventMesh2client|url={}|topic={}|event={}", currPushUrl,
-                        handleMsgContext.getTopic(),
-                        handleMsgContext.getEvent());
+                    handleMsgContext.getTopic(),
+                    handleMsgContext.getEvent());
             } else {
                 messageLogger
-                        .info("message|eventMesh2client|url={}|topic={}|bizSeqNo={}|uniqueId={}",
-                                currPushUrl, handleMsgContext.getTopic(),
-                                handleMsgContext.getBizSeqNo(), handleMsgContext.getUniqueId());
+                    .info("message|eventMesh2client|url={}|topic={}|bizSeqNo={}|uniqueId={}",
+                        currPushUrl, handleMsgContext.getTopic(),
+                        handleMsgContext.getBizSeqNo(), handleMsgContext.getUniqueId());
             }
         } catch (IOException e) {
             messageLogger.error("push2client err", e);
@@ -251,19 +264,44 @@ public class AsyncHTTPPushRequest extends AbstractHTTPPushRequest {
     public String toString() {
         StringBuilder sb = new StringBuilder();
         sb.append("asyncPushRequest={")
-                .append("bizSeqNo=").append(handleMsgContext.getBizSeqNo())
-                .append(",startIdx=").append(startIdx)
-                .append(",retryTimes=").append(retryTimes)
-                .append(",uniqueId=").append(handleMsgContext.getUniqueId())
-                .append(",executeTime=")
-                .append(DateFormatUtils.format(executeTime, Constants.DATE_FORMAT))
-                .append(",lastPushTime=")
-                .append(DateFormatUtils.format(lastPushTime, Constants.DATE_FORMAT))
-                .append(",createTime=")
-                .append(DateFormatUtils.format(createTime, Constants.DATE_FORMAT)).append("}");
+            .append("bizSeqNo=").append(handleMsgContext.getBizSeqNo())
+            .append(",startIdx=").append(startIdx)
+            .append(",retryTimes=").append(retryTimes)
+            .append(",uniqueId=").append(handleMsgContext.getUniqueId())
+            .append(",executeTime=")
+            .append(DateFormatUtils.format(executeTime, Constants.DATE_FORMAT))
+            .append(",lastPushTime=")
+            .append(DateFormatUtils.format(lastPushTime, Constants.DATE_FORMAT))
+            .append(",createTime=")
+            .append(DateFormatUtils.format(createTime, Constants.DATE_FORMAT)).append("}");
         return sb.toString();
     }
 
+    boolean processResponseStatus(int httpStatus, HttpResponse httpResponse) {
+        if (httpStatus == HttpStatus.SC_OK || httpStatus == HttpStatus.SC_CREATED
+            || httpStatus == HttpStatus.SC_NO_CONTENT || httpStatus == HttpStatus.SC_ACCEPTED) {
+            // success http response
+            return true;
+        } else if (httpStatus == 429) {
+            // failed with customer retry interval
+
+            // Response Status code is 429 Too Many Requests
+            // retry after the time specified by the header
+            Optional<Header> optHeader = Arrays.stream(httpResponse.getHeaders("Retry-After")).findAny();
+            if (optHeader.isPresent() && StringUtils.isNumeric(optHeader.get().getValue())) {
+                delayRetry(Long.parseLong(optHeader.get().getValue()));
+            }
+            return false;
+        } else if (httpStatus == HttpStatus.SC_GONE || httpStatus == HttpStatus.SC_UNSUPPORTED_MEDIA_TYPE) {
+            // failed with no retry
+            return false;
+        }
+
+        // failed with default retry
+        delayRetry();
+        return false;
+    }
+
     ClientRetCode processResponseContent(String content) {
         if (StringUtils.isBlank(content)) {
             return ClientRetCode.FAIL;
@@ -271,8 +309,8 @@ public class AsyncHTTPPushRequest extends AbstractHTTPPushRequest {
 
         try {
             Map<String, Object> ret =
-                    JsonUtils.deserialize(content, new TypeReference<Map<String, Object>>() {
-                    });
+                JsonUtils.deserialize(content, new TypeReference<Map<String, Object>>() {
+                });
             Integer retCode = (Integer) ret.get("retCode");
             if (retCode != null && ClientRetCode.contains(retCode)) {
                 return ClientRetCode.get(retCode);
@@ -281,15 +319,15 @@ public class AsyncHTTPPushRequest extends AbstractHTTPPushRequest {
             return ClientRetCode.FAIL;
         } catch (NumberFormatException e) {
             messageLogger.warn("url:{}, bizSeqno:{}, uniqueId:{}, httpResponse:{}", currPushUrl,
-                    handleMsgContext.getBizSeqNo(), handleMsgContext.getUniqueId(), content);
+                handleMsgContext.getBizSeqNo(), handleMsgContext.getUniqueId(), content);
             return ClientRetCode.FAIL;
         } catch (JsonException e) {
             messageLogger.warn("url:{}, bizSeqno:{}, uniqueId:{},  httpResponse:{}", currPushUrl,
-                    handleMsgContext.getBizSeqNo(), handleMsgContext.getUniqueId(), content);
+                handleMsgContext.getBizSeqNo(), handleMsgContext.getUniqueId(), content);
             return ClientRetCode.FAIL;
         } catch (Throwable t) {
             messageLogger.warn("url:{}, bizSeqno:{}, uniqueId:{},  httpResponse:{}", currPushUrl,
-                    handleMsgContext.getBizSeqNo(), handleMsgContext.getUniqueId(), content);
+                handleMsgContext.getBizSeqNo(), handleMsgContext.getUniqueId(), content);
             return ClientRetCode.FAIL;
         }
     }
@@ -300,7 +338,7 @@ public class AsyncHTTPPushRequest extends AbstractHTTPPushRequest {
             return;
         }
         waitingRequests
-                .put(request.handleMsgContext.getConsumerGroup(), Sets.newConcurrentHashSet());
+            .put(request.handleMsgContext.getConsumerGroup(), Sets.newConcurrentHashSet());
         waitingRequests.get(request.handleMsgContext.getConsumerGroup()).add(request);
     }
 
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/push/HTTPClientPool.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/push/HTTPClientPool.java
index 28eec0a..8a96937 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/push/HTTPClientPool.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/push/HTTPClientPool.java
@@ -19,17 +19,41 @@ package org.apache.eventmesh.runtime.core.protocol.http.push;
 
 import org.apache.commons.collections4.CollectionUtils;
 import org.apache.commons.lang3.RandomUtils;
+import org.apache.http.config.Registry;
+import org.apache.http.config.RegistryBuilder;
+import org.apache.http.conn.socket.ConnectionSocketFactory;
+import org.apache.http.conn.socket.PlainConnectionSocketFactory;
+import org.apache.http.conn.ssl.SSLConnectionSocketFactory;
+import org.apache.http.impl.DefaultConnectionReuseStrategy;
 import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.DefaultConnectionKeepAliveStrategy;
+import org.apache.http.impl.client.DefaultHttpRequestRetryHandler;
 import org.apache.http.impl.client.HttpClients;
+import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
+import org.apache.http.ssl.SSLContexts;
+import org.apache.http.ssl.TrustStrategy;
 
+import java.security.KeyManagementException;
+import java.security.KeyStoreException;
+import java.security.NoSuchAlgorithmException;
+import java.security.cert.X509Certificate;
+import java.util.ArrayList;
+import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
+import java.util.concurrent.TimeUnit;
 
-import com.google.common.collect.Lists;
+import javax.net.ssl.HostnameVerifier;
+import javax.net.ssl.SSLContext;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class HTTPClientPool {
 
-    private List<CloseableHttpClient> clients = Lists.newArrayList();
+    public Logger logger = LoggerFactory.getLogger(this.getClass());
+
+    private List<CloseableHttpClient> clients = Collections.synchronizedList(new ArrayList<>());
 
     private int core = 1;
 
@@ -39,14 +63,13 @@ public class HTTPClientPool {
 
     public CloseableHttpClient getClient() {
         if (CollectionUtils.size(clients) < core) {
-            CloseableHttpClient client = HttpClients.createDefault();
+            CloseableHttpClient client = getHttpClient(200, 30, null);
             clients.add(client);
             return client;
         }
         return clients.get(RandomUtils.nextInt(core, 2 * core) % core);
     }
 
-
     public void shutdown() throws Exception {
         Iterator<CloseableHttpClient> itr = clients.iterator();
         while (itr.hasNext()) {
@@ -55,4 +78,41 @@ public class HTTPClientPool {
             itr.remove();
         }
     }
+
+    public CloseableHttpClient getHttpClient(int maxTotal, int idleTimeInSeconds, SSLContext sslContext) {
+        try {
+            if (sslContext == null) {
+                sslContext = SSLContexts.custom().loadTrustMaterial(new TheTrustStrategy()).build();
+            }
+        } catch (KeyManagementException | NoSuchAlgorithmException | KeyStoreException e) {
+            logger.error("Get sslContext error: {}", e.getMessage());
+            return HttpClients.createDefault();
+        }
+
+        HostnameVerifier hostnameVerifier = SSLConnectionSocketFactory.ALLOW_ALL_HOSTNAME_VERIFIER;
+
+        SSLConnectionSocketFactory sslsf = new SSLConnectionSocketFactory(sslContext, hostnameVerifier);
+        Registry<ConnectionSocketFactory> socketFactoryRegistry
+            = RegistryBuilder.<ConnectionSocketFactory>create()
+            .register("http", PlainConnectionSocketFactory.getSocketFactory())
+            .register("https", sslsf).build();
+        PoolingHttpClientConnectionManager connectionManager = new PoolingHttpClientConnectionManager(socketFactoryRegistry);
+
+        connectionManager.setDefaultMaxPerRoute(maxTotal);
+        connectionManager.setMaxTotal(maxTotal);
+        return HttpClients.custom()
+            .setConnectionManager(connectionManager)
+            .setKeepAliveStrategy(new DefaultConnectionKeepAliveStrategy())
+            .evictIdleConnections(idleTimeInSeconds, TimeUnit.SECONDS)
+            .setConnectionReuseStrategy(new DefaultConnectionReuseStrategy())
+            .setRetryHandler(new DefaultHttpRequestRetryHandler())
+            .build();
+    }
+
+    public static class TheTrustStrategy implements TrustStrategy {
+        @Override
+        public boolean isTrusted(X509Certificate[] arg0, String arg1) {
+            return true;
+        }
+    }
 }
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/util/WebhookUtil.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/util/WebhookUtil.java
new file mode 100644
index 0000000..6cc515a
--- /dev/null
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/util/WebhookUtil.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.runtime.util;
+
+import org.apache.eventmesh.api.auth.AuthService;
+import org.apache.eventmesh.spi.EventMeshExtensionFactory;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpOptions;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.message.BasicHeader;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Utility class for implementing CloudEvents Http Webhook spec
+ *
+ * @see <a href="https://github.com/cloudevents/spec/blob/v1.0.2/cloudevents/http-webhook.md">CloudEvents Http Webhook</a>
+ */
+public class WebhookUtil {
+
+    private static final Logger logger = LoggerFactory.getLogger(WebhookUtil.class.getName());
+
+    private static final String CONTENT_TYPE_HEADER = "Content-Type";
+    private static final String REQUEST_ORIGIN_HEADER = "WebHook-Request-Origin";
+    private static final String ALLOWED_ORIGIN_HEADER = "WebHook-Allowed-Origin";
+
+    private static final Map<String, AuthService> authServices = new ConcurrentHashMap<>();
+
+    public static boolean obtainDeliveryAgreement(CloseableHttpClient httpClient, String targetUrl, String requestOrigin) {
+        logger.info("obtain webhook delivery agreement for url: {}", targetUrl);
+        HttpOptions builder = new HttpOptions(targetUrl);
+        builder.addHeader(REQUEST_ORIGIN_HEADER, requestOrigin);
+
+        try (CloseableHttpResponse response = httpClient.execute(builder)) {
+            String allowedOrigin = response.getLastHeader(ALLOWED_ORIGIN_HEADER).getValue();
+            return StringUtils.isEmpty(allowedOrigin)
+                || allowedOrigin.equals("*") || allowedOrigin.equalsIgnoreCase(requestOrigin);
+        } catch (Exception e) {
+            logger.warn("HTTP Options Method is not supported at the Delivery Target: {},"
+                + " unable to obtain the webhook delivery agreement.", targetUrl);
+        }
+        return true;
+    }
+
+    public static void setWebhookHeaders(HttpPost builder, String contentType, String requestOrigin, String urlAuthType) {
+        builder.setHeader(CONTENT_TYPE_HEADER, contentType);
+        builder.setHeader(REQUEST_ORIGIN_HEADER, requestOrigin);
+
+        Map<String, String> authParam = getHttpAuthParam(urlAuthType);
+        if (authParam != null) {
+            authParam.forEach((k, v) -> builder.addHeader(new BasicHeader(k, v)));
+        }
+    }
+
+    private static Map<String, String> getHttpAuthParam(String authType) {
+        if (StringUtils.isEmpty(authType)) {
+            return null;
+        }
+        AuthService authService = getHttpAuthPlugin(authType);
+        if (authService != null) {
+            return authService.getAuthParams();
+        } else {
+            return null;
+        }
+    }
+
+    private static AuthService getHttpAuthPlugin(String pluginType) {
+        if (authServices.containsKey(pluginType)) {
+            return authServices.get(pluginType);
+        }
+
+        AuthService authService = EventMeshExtensionFactory.getExtension(AuthService.class, pluginType);
+
+        if (authService == null) {
+            logger.error("can't load the authService plugin, please check.");
+            throw new RuntimeException("doesn't load the authService plugin, please check.");
+        }
+        try {
+            authService.init();
+            authServices.put(pluginType, authService);
+            return authService;
+        } catch (Exception e) {
+            logger.error("Error in initializing authService", e);
+        }
+        return null;
+    }
+}
diff --git a/eventmesh-security-plugin/eventmesh-security-api/build.gradle b/eventmesh-security-plugin/eventmesh-security-api/build.gradle
index 0d41042..482f87d 100644
--- a/eventmesh-security-plugin/eventmesh-security-api/build.gradle
+++ b/eventmesh-security-plugin/eventmesh-security-api/build.gradle
@@ -16,6 +16,8 @@
  */
 
 dependencies {
+    implementation 'org.slf4j:slf4j-api'
+
     api project(":eventmesh-spi")
 
     testImplementation project(":eventmesh-spi")
diff --git a/eventmesh-security-plugin/eventmesh-security-api/build.gradle b/eventmesh-security-plugin/eventmesh-security-api/src/main/java/org/apache/eventmesh/api/auth/AuthService.java
similarity index 60%
copy from eventmesh-security-plugin/eventmesh-security-api/build.gradle
copy to eventmesh-security-plugin/eventmesh-security-api/src/main/java/org/apache/eventmesh/api/auth/AuthService.java
index 0d41042..d957b50 100644
--- a/eventmesh-security-plugin/eventmesh-security-api/build.gradle
+++ b/eventmesh-security-plugin/eventmesh-security-api/src/main/java/org/apache/eventmesh/api/auth/AuthService.java
@@ -15,8 +15,25 @@
  * limitations under the License.
  */
 
-dependencies {
-    api project(":eventmesh-spi")
+package org.apache.eventmesh.api.auth;
 
-    testImplementation project(":eventmesh-spi")
+import org.apache.eventmesh.api.exception.AuthException;
+import org.apache.eventmesh.spi.EventMeshExtensionType;
+import org.apache.eventmesh.spi.EventMeshSPI;
+
+import java.util.Map;
+
+/**
+ * AuthService
+ */
+@EventMeshSPI(isSingleton = true, eventMeshExtensionType = EventMeshExtensionType.SECURITY)
+public interface AuthService {
+
+    void init() throws AuthException;
+
+    void start() throws AuthException;
+
+    void shutdown() throws AuthException;
+
+    Map getAuthParams() throws AuthException;
 }
diff --git a/eventmesh-security-plugin/eventmesh-security-api/src/main/java/org/apache/eventmesh/api/common/ConfigurationWrapper.java b/eventmesh-security-plugin/eventmesh-security-api/src/main/java/org/apache/eventmesh/api/common/ConfigurationWrapper.java
new file mode 100644
index 0000000..dc359c1
--- /dev/null
+++ b/eventmesh-security-plugin/eventmesh-security-api/src/main/java/org/apache/eventmesh/api/common/ConfigurationWrapper.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.api.common;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileReader;
+import java.io.IOException;
+import java.net.URL;
+import java.util.Properties;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ConfigurationWrapper {
+
+    private static Logger logger = LoggerFactory.getLogger("ConfigurationWrapper");
+
+    private static final String EVENTMESH_CONFIG_HOME = System.getProperty("confPath", System.getenv("confPath"));
+
+    public static Properties getConfig(String configFile) {
+        String configFilePath;
+
+        // get from classpath
+        URL resource = ConfigurationWrapper.class.getClassLoader().getResource(configFile);
+        if (resource != null && new File(resource.getPath()).exists()) {
+            configFilePath = resource.getPath();
+        } else {
+            // get from config home
+            configFilePath = EVENTMESH_CONFIG_HOME + File.separator + configFile;
+        }
+
+        logger.info("loading auth config: {}", configFilePath);
+        Properties properties = new Properties();
+        try {
+            properties.load(new BufferedReader(new FileReader(configFilePath)));
+        } catch (IOException e) {
+            throw new IllegalArgumentException(
+                    String.format("Cannot load RocketMQ configuration file from :%s", configFilePath));
+        }
+        return properties;
+    }
+}
diff --git a/eventmesh-security-plugin/eventmesh-security-api/build.gradle b/eventmesh-security-plugin/eventmesh-security-api/src/main/java/org/apache/eventmesh/api/exception/AuthException.java
similarity index 74%
copy from eventmesh-security-plugin/eventmesh-security-api/build.gradle
copy to eventmesh-security-plugin/eventmesh-security-api/src/main/java/org/apache/eventmesh/api/exception/AuthException.java
index 0d41042..67bef31 100644
--- a/eventmesh-security-plugin/eventmesh-security-api/build.gradle
+++ b/eventmesh-security-plugin/eventmesh-security-api/src/main/java/org/apache/eventmesh/api/exception/AuthException.java
@@ -15,8 +15,15 @@
  * limitations under the License.
  */
 
-dependencies {
-    api project(":eventmesh-spi")
+package org.apache.eventmesh.api.exception;
 
-    testImplementation project(":eventmesh-spi")
+public class AuthException extends RuntimeException {
+
+    public AuthException(String message) {
+        super(message);
+    }
+
+    public AuthException(String message, Throwable cause) {
+        super(message, cause);
+    }
 }
diff --git a/eventmesh-security-plugin/eventmesh-security-api/build.gradle b/eventmesh-security-plugin/eventmesh-security-auth-http-basic/build.gradle
similarity index 83%
copy from eventmesh-security-plugin/eventmesh-security-api/build.gradle
copy to eventmesh-security-plugin/eventmesh-security-auth-http-basic/build.gradle
index 0d41042..bb0f4d6 100644
--- a/eventmesh-security-plugin/eventmesh-security-api/build.gradle
+++ b/eventmesh-security-plugin/eventmesh-security-auth-http-basic/build.gradle
@@ -16,7 +16,7 @@
  */
 
 dependencies {
-    api project(":eventmesh-spi")
+    implementation project(":eventmesh-security-plugin:eventmesh-security-api")
 
-    testImplementation project(":eventmesh-spi")
-}
+    testImplementation project(":eventmesh-security-plugin:eventmesh-security-api")
+}
\ No newline at end of file
diff --git a/eventmesh-security-plugin/eventmesh-security-auth-http-basic/gradle.properties b/eventmesh-security-plugin/eventmesh-security-auth-http-basic/gradle.properties
new file mode 100644
index 0000000..83021bb
--- /dev/null
+++ b/eventmesh-security-plugin/eventmesh-security-auth-http-basic/gradle.properties
@@ -0,0 +1,18 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+pluginType=security
+pluginName=auth-http-basic
\ No newline at end of file
diff --git a/eventmesh-security-plugin/eventmesh-security-api/build.gradle b/eventmesh-security-plugin/eventmesh-security-auth-http-basic/src/main/java/org/apache/eventmesh/auth/http/basic/config/AuthConfigs.java
similarity index 54%
copy from eventmesh-security-plugin/eventmesh-security-api/build.gradle
copy to eventmesh-security-plugin/eventmesh-security-auth-http-basic/src/main/java/org/apache/eventmesh/auth/http/basic/config/AuthConfigs.java
index 0d41042..c077bf4 100644
--- a/eventmesh-security-plugin/eventmesh-security-api/build.gradle
+++ b/eventmesh-security-plugin/eventmesh-security-auth-http-basic/src/main/java/org/apache/eventmesh/auth/http/basic/config/AuthConfigs.java
@@ -15,8 +15,27 @@
  * limitations under the License.
  */
 
-dependencies {
-    api project(":eventmesh-spi")
+package org.apache.eventmesh.auth.http.basic.config;
 
-    testImplementation project(":eventmesh-spi")
+import org.apache.eventmesh.api.common.ConfigurationWrapper;
+
+import java.util.Properties;
+
+public class AuthConfigs {
+
+    public String username;
+
+    public String password;
+
+    private static AuthConfigs instance;
+
+    public static synchronized AuthConfigs getConfigs() {
+        if (instance == null) {
+            Properties props = ConfigurationWrapper.getConfig("auth-http-basic.properties");
+            instance = new AuthConfigs();
+            instance.username = props.getProperty("auth.username");
+            instance.password = props.getProperty("auth.password");
+        }
+        return instance;
+    }
 }
diff --git a/eventmesh-security-plugin/eventmesh-security-auth-http-basic/src/main/java/org/apache/eventmesh/auth/http/basic/impl/AuthHttpBasicService.java b/eventmesh-security-plugin/eventmesh-security-auth-http-basic/src/main/java/org/apache/eventmesh/auth/http/basic/impl/AuthHttpBasicService.java
new file mode 100644
index 0000000..c32c988
--- /dev/null
+++ b/eventmesh-security-plugin/eventmesh-security-auth-http-basic/src/main/java/org/apache/eventmesh/auth/http/basic/impl/AuthHttpBasicService.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.auth.http.basic.impl;
+
+import org.apache.eventmesh.api.auth.AuthService;
+import org.apache.eventmesh.api.exception.AuthException;
+import org.apache.eventmesh.auth.http.basic.config.AuthConfigs;
+
+import java.nio.charset.StandardCharsets;
+import java.util.Base64;
+import java.util.HashMap;
+import java.util.Map;
+
+public class AuthHttpBasicService implements AuthService {
+
+    private AuthConfigs authConfigs;
+
+    @Override
+    public void init() throws AuthException {
+        authConfigs = AuthConfigs.getConfigs();
+    }
+
+    @Override
+    public void start() throws AuthException {
+
+    }
+
+    @Override
+    public void shutdown() throws AuthException {
+
+    }
+
+    @Override
+    public Map getAuthParams() throws AuthException {
+        if (authConfigs == null) {
+            init();
+        }
+
+        String token = Base64.getEncoder().encodeToString((authConfigs.username + authConfigs.password)
+            .getBytes(StandardCharsets.UTF_8));
+
+        Map authParams = new HashMap();
+        authParams.put("Authorization", "Basic " + token);
+        return authParams;
+    }
+}
diff --git a/eventmesh-security-plugin/eventmesh-security-auth-http-basic/src/main/resources/META-INF/eventmesh/org.apache.eventmesh.api.auth.AuthService b/eventmesh-security-plugin/eventmesh-security-auth-http-basic/src/main/resources/META-INF/eventmesh/org.apache.eventmesh.api.auth.AuthService
new file mode 100644
index 0000000..2dde936
--- /dev/null
+++ b/eventmesh-security-plugin/eventmesh-security-auth-http-basic/src/main/resources/META-INF/eventmesh/org.apache.eventmesh.api.auth.AuthService
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+auth-http-basic=org.apache.eventmesh.auth.http.basic.impl.AuthHttpBasicService
\ No newline at end of file
diff --git a/eventmesh-security-plugin/eventmesh-security-auth-http-basic/src/main/resources/auth-http-basic.properties b/eventmesh-security-plugin/eventmesh-security-auth-http-basic/src/main/resources/auth-http-basic.properties
new file mode 100644
index 0000000..f06e6c1
--- /dev/null
+++ b/eventmesh-security-plugin/eventmesh-security-auth-http-basic/src/main/resources/auth-http-basic.properties
@@ -0,0 +1,18 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+auth.username = ****
+auth.password = ****
\ No newline at end of file
diff --git a/settings.gradle b/settings.gradle
index 6481fdd..08c0816 100644
--- a/settings.gradle
+++ b/settings.gradle
@@ -42,7 +42,9 @@ findProject(':eventmesh-protocol-plugin:eventmesh-protocol-grpcmessage')?.name =
 include 'eventmesh-metrics-plugin'
 include 'eventmesh-metrics-plugin:eventmesh-metrics-api'
 include 'eventmesh-metrics-plugin:eventmesh-metrics-opentelemetry'
+
+include 'eventmesh-security-plugin:eventmesh-security-auth-http-basic'
+
 include 'eventmesh-trace-plugin'
 include 'eventmesh-trace-plugin:eventmesh-trace-api'
 include 'eventmesh-trace-plugin:eventmesh-trace-zipkin'
-

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@eventmesh.apache.org
For additional commands, e-mail: commits-help@eventmesh.apache.org