You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@eventmesh.apache.org by GitBox <gi...@apache.org> on 2022/06/19 18:44:36 UTC

[GitHub] [incubator-eventmesh] githublaohu opened a new pull request, #937: [ISSUE #865] Complete the joint commissioning of webhook and httpservice

githublaohu opened a new pull request, #937:
URL: https://github.com/apache/incubator-eventmesh/pull/937

   <!--
   ### Contribution Checklist
   
     - Name the pull request in the form "[ISSUE #XXXX] Title of the pull request", 
       where *XXXX* should be replaced by the actual issue number.
       Skip *[ISSUE #XXXX]* if there is no associated github issue for this pull request.
   
     - Fill out the template below to describe the changes contributed by the pull request. 
       That will give reviewers the context they need to do the review.
     
     - Each pull request should address only one issue. 
       Please do not mix up code from multiple issues.
     
     - Each commit in the pull request should have a meaningful commit message.
   
     - Once all items of the checklist are addressed, remove the above text and this checklist, 
       leaving only the filled out template below.
   
   (The sections below can be removed for hotfixes of typos)
   -->
   
   <!--
   (If this PR fixes a GitHub issue, please add `Fixes ISSUE #<XXX>`.)
   -->
   
   Fixes ISSUE #<XXXX>.
   
   ### Motivation
   
   *Explain the content here.*
   *Explain why you want to make the changes and what problem you're trying to solve.*
   
   
   
   ### Modifications
   
   *Describe the modifications you've done.*
   
   
   
   ### Documentation
   
   - Does this pull request introduce a new feature? (yes / no)
   - If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented)
   - If a feature is not applicable for documentation, explain why?
   - If a feature is not documented yet in this PR, please create a followup issue for adding the documentation
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@eventmesh.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@eventmesh.apache.org
For additional commands, e-mail: dev-help@eventmesh.apache.org


[GitHub] [incubator-eventmesh] xwm1992 commented on a diff in pull request #937: [ISSUE #865] Complete the joint commissioning of webhook and httpservice

Posted by GitBox <gi...@apache.org>.
xwm1992 commented on code in PR #937:
URL: https://github.com/apache/incubator-eventmesh/pull/937#discussion_r910561784


##########
eventmesh-runtime/conf/eventmesh.properties:
##########
@@ -89,4 +89,17 @@ eventMesh.metrics.plugin=prometheus
 
 # trace plugin
 eventMesh.server.trace.enabled=false
-eventMesh.trace.plugin=zipkin
\ No newline at end of file
+eventMesh.trace.plugin=zipkin
+
+# webhook
+# 是否启动webhook admin服务

Review Comment:
   please change the Chinese comments to English in the project.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@eventmesh.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@eventmesh.apache.org
For additional commands, e-mail: dev-help@eventmesh.apache.org


[GitHub] [incubator-eventmesh] githublaohu commented on a diff in pull request #937: [ISSUE #865] Complete the joint commissioning of webhook and httpservice

Posted by GitBox <gi...@apache.org>.
githublaohu commented on code in PR #937:
URL: https://github.com/apache/incubator-eventmesh/pull/937#discussion_r920326682


##########
eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/HandlerService.java:
##########
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.runtime.core.protocol.http.processor;
+
+import org.apache.eventmesh.runtime.boot.HTTPTrace;
+import org.apache.eventmesh.runtime.boot.HTTPTrace.TraceOperation;
+import org.apache.eventmesh.runtime.metrics.http.HTTPMetricsServer;
+import org.apache.eventmesh.runtime.util.HttpResponseUtils;
+import org.apache.eventmesh.runtime.util.RemotingHelper;
+
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Objects;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ThreadPoolExecutor;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import io.netty.channel.ChannelFutureListener;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.codec.http.HttpRequest;
+import io.netty.handler.codec.http.HttpResponse;
+
+import lombok.Setter;
+
+
+public class HandlerService {
+
+    private Logger httpServerLogger = LoggerFactory.getLogger(this.getClass());
+
+    private Logger httpLogger = LoggerFactory.getLogger("http");
+
+    private Map<String, ProcessorWrapper> httpProcessorMap = new ConcurrentHashMap<>();
+
+    @Setter
+    private HTTPMetricsServer metrics;
+
+    @Setter
+    private HTTPTrace httpTrace;
+
+
+    public void init() {
+        httpServerLogger.info("HandlerService start ");
+    }
+
+    public void register(HttpProcessor httpProcessor, ThreadPoolExecutor threadPoolExecutor) {
+        for (String path : httpProcessor.paths()) {
+            this.register(path, httpProcessor, threadPoolExecutor);
+        }
+    }
+
+    public void register(String path, HttpProcessor httpProcessor, ThreadPoolExecutor threadPoolExecutor) {
+
+        if (httpProcessorMap.containsKey(path)) {
+            throw new RuntimeException(String.format("HandlerService path %s repeat, repeat processor is %s ",
+                    path, httpProcessor.getClass().getSimpleName()));
+        }
+        ProcessorWrapper processorWrapper = new ProcessorWrapper();
+        processorWrapper.threadPoolExecutor = threadPoolExecutor;
+        processorWrapper.httpProcessor = httpProcessor;
+        if (httpProcessor instanceof AsynHttpProcessor) {
+            processorWrapper.asyn = (AsynHttpProcessor) httpProcessor;
+        } else {
+            processorWrapper.httpProcessor = httpProcessor;

Review Comment:
   ok



##########
eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/AsynHttpProcessor.java:
##########
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.runtime.core.protocol.http.processor;
+
+import org.apache.eventmesh.runtime.core.protocol.http.processor.HandlerService.HandlerSpecific;
+
+import io.netty.handler.codec.http.HttpRequest;
+import io.netty.handler.codec.http.HttpResponse;
+
+/**
+ * aysn http processor
+ */
+public interface AsynHttpProcessor extends HttpProcessor {

Review Comment:
   ok



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@eventmesh.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@eventmesh.apache.org
For additional commands, e-mail: dev-help@eventmesh.apache.org


[GitHub] [incubator-eventmesh] xwm1992 commented on a diff in pull request #937: [ISSUE #865] Complete the joint commissioning of webhook and httpservice

Posted by GitBox <gi...@apache.org>.
xwm1992 commented on code in PR #937:
URL: https://github.com/apache/incubator-eventmesh/pull/937#discussion_r916525171


##########
docs/cn/features/webhook.md:
##########
@@ -0,0 +1,158 @@
+
+
+## webhook使用流程
+#### 第一步:在eventmesh配置webhook相关信息并且启动
+
+##### 配置说明
+```
+# 是否启动webhook admin服务
+eventMesh.webHook.admin.start=true
+
+# webhook事件配置存储模式。目前只支持file与nacos
+eventMesh.webHook.operationMode=file
+# 文件存储模式的文件存放路径,如果写上#{eventMeshHome},在eventMesh根目录
+eventMesh.webHook.fileMode.filePath= #{eventMeshHome}/webhook
+
+# nacos存储模式,配置命名规则是eventMesh.webHook.nacosMode.{nacos 原生配置key} 具体的配置请看 [nacos github api](https://github.com/alibaba/nacos/blob/develop/api/src/main/java/com/alibaba/nacos/api/SystemPropertyKeyConst.java)
+## nacos的地址
+eventMesh.webHook.nacosMode.serverAddr=127.0.0.1:8848
+
+# webhook eventcloud 发送模式。与eventMesh.connector.plugin.type 配置一样
+eventMesh.webHook.producer.connector=standalone
+```
+
+#### 第二步:添加webhook配置信息
+配置信息说明
+```java
+	/**
+	 * 厂商调用的path。厂商事件调用地址、 [http or https ]://[域名 or IP 【厂商可以被调用】]:[端口]/webhook/[callbackPath]
+	 * 比如:http://127.0.0.1:10504/webhook/test/event 需要把全完url填入厂商调用输入中
+	 * callbackPath 是唯一
+	 * manufacturer callback path
+	 */
+    private String callbackPath;
+
+    /**
+     * 厂商的名字
+     * manufacturer name ,like github
+     */
+    private String manufacturerName;
+
+    /**
+     * 厂商的事件名
+     * webhook event name ,like rep-push
+     */
+    private String manufacturerEventName;
+
+    /**
+     * 
+     * http header content type
+     */
+    private String contentType = "application/json";
+
+    /**
+     * 说明
+     * description of this WebHookConfig
+     */
+    private String description;
+
+    /**
+     * 有一些厂商使用验签方式,
+     * secret key ,for authentication
+     */
+    private String secret;
+
+    /**
+     *  有一些厂商使用验签方式,使用账户密码方式
+     * userName ,for HTTP authentication
+     */
+    private String userName;
+
+    /**
+     *  有一些厂商使用验签方式,使用账户密码方式
+     * password ,for HTTP authentication
+     */
+    private String password;
+
+
+
+    /**
+     * 事件发送到那个topic
+     * roll out event name ,like topic to mq
+     */
+    private String cloudEventName;
+
+    /**
+     * roll out data format -> CloudEvent serialization mode
+     * If HTTP protocol is used, the request header contentType needs to be marked
+     */
+    private String dataContentType = "application/json";;
+
+    /**
+     * source of event
+     */
+    private String cloudEventSource;
+
+    /**
+     * cloudEvent事件对象唯一标识符识别方式,uuid或者manufacturerEventId(厂商id)
+     * id of cloudEvent ,like uuid/manufacturerEventId
+     */
+    private String cloudEventIdGenerateMode;
+
+```
+
+##### 添加接口
+路径: /webhook/insertWebHookConfig
+方法: POST
+contentType: application/json
+
+输入参数:
+| 字段 | 说明 | 类型 | 必须 | 默认值 |
+| -- | -- | -- | -- | -- |
+| callbackPath | 调用地址,唯一地址 | string | 是 | null |
+| manufacturerName | 厂商名 | string | 是 | null |
+| manufacturerEventName | 厂商事件名 | string | 是 | null |
+| contentType | http connettype | string | 否 | application/json |
+| description | 配置说明 | string | 否 | null |
+| secret | 验签密钥 | string | 否 | null |
+| userName | 用户名 | string | 否 | null |
+| password | 用户密码 | string | 否 | null |
+| cloudEventName | 事件名() | string | 是 | null |
+| cloudEventSource | 事件来源可以填写 | string | 是 | null |
+| cloudEventIdGenerateMode | cloudEvent事件对象唯一标识符识别方式,uuid或者manufacturerEventId(厂商id)  | string | 否 | manufacturerEventId |
+
+输出参数:1 成功,0失败
+
+
+
+
+#### 第三步:查看配置是否成功
+1. file存储模式。请到eventMesh.webHook.fileMode.filePath 目录下查看。文件名为callbackPath转移后的
+2. nacos存储模式。请到eventMesh.webHook.nacosMode.serverAddr 配置的nacos服务去看
+
+#### 第四步:配置cloudevent的消费者
+
+
+#### 第五步:在厂商配置webhook相关信息
+> 厂商操作请看【厂商webhook操作说明】
+
+
+## 厂商webhook操作说明
+### github 注册
+#### 第一步:进入对应的项目
+#### 第二步:点击setting
+![](../images/webhook/webhook-github-setting.png)

Review Comment:
   these images not display may be url is not correct



##########
eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/HandlerService.java:
##########
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.runtime.core.protocol.http.processor;
+
+import org.apache.eventmesh.runtime.boot.HTTPTrace;
+import org.apache.eventmesh.runtime.boot.HTTPTrace.TraceOperation;
+import org.apache.eventmesh.runtime.metrics.http.HTTPMetricsServer;
+import org.apache.eventmesh.runtime.util.HttpResponseUtils;
+import org.apache.eventmesh.runtime.util.RemotingHelper;
+
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Objects;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ThreadPoolExecutor;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import io.netty.channel.ChannelFutureListener;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.codec.http.HttpRequest;
+import io.netty.handler.codec.http.HttpResponse;
+
+import lombok.Setter;
+
+
+public class HandlerService {
+
+    private Logger httpServerLogger = LoggerFactory.getLogger(this.getClass());
+
+    private Logger httpLogger = LoggerFactory.getLogger("http");
+
+    private Map<String, ProcessorWrapper> httpProcessorMap = new ConcurrentHashMap<>();
+
+    @Setter
+    private HTTPMetricsServer metrics;
+
+    @Setter
+    private HTTPTrace httpTrace;
+
+
+    public void init() {
+        httpServerLogger.info("HandlerService start ");
+    }
+
+    public void register(HttpProcessor httpProcessor, ThreadPoolExecutor threadPoolExecutor) {
+        for (String path : httpProcessor.paths()) {
+            this.register(path, httpProcessor, threadPoolExecutor);
+        }
+    }
+
+    public void register(String path, HttpProcessor httpProcessor, ThreadPoolExecutor threadPoolExecutor) {
+
+        if (httpProcessorMap.containsKey(path)) {
+            throw new RuntimeException(String.format("HandlerService path %s repeat, repeat processor is %s ",
+                    path, httpProcessor.getClass().getSimpleName()));
+        }
+        ProcessorWrapper processorWrapper = new ProcessorWrapper();
+        processorWrapper.threadPoolExecutor = threadPoolExecutor;
+        processorWrapper.httpProcessor = httpProcessor;
+        if (httpProcessor instanceof AsynHttpProcessor) {
+            processorWrapper.asyn = (AsynHttpProcessor) httpProcessor;
+        } else {
+            processorWrapper.httpProcessor = httpProcessor;
+        }
+        httpProcessorMap.put(path, processorWrapper);
+        httpServerLogger.info("path is {}  proocessor name is", path, httpProcessor.getClass().getSimpleName());

Review Comment:
   change -> httpServerLogger.info("path is {} processor name is {}", path, httpProcessor.getClass().getSimpleName());
   



##########
eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/HandlerService.java:
##########
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.runtime.core.protocol.http.processor;
+
+import org.apache.eventmesh.runtime.boot.HTTPTrace;
+import org.apache.eventmesh.runtime.boot.HTTPTrace.TraceOperation;
+import org.apache.eventmesh.runtime.metrics.http.HTTPMetricsServer;
+import org.apache.eventmesh.runtime.util.HttpResponseUtils;
+import org.apache.eventmesh.runtime.util.RemotingHelper;
+
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Objects;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ThreadPoolExecutor;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import io.netty.channel.ChannelFutureListener;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.codec.http.HttpRequest;
+import io.netty.handler.codec.http.HttpResponse;
+
+import lombok.Setter;
+
+
+public class HandlerService {
+
+    private Logger httpServerLogger = LoggerFactory.getLogger(this.getClass());
+
+    private Logger httpLogger = LoggerFactory.getLogger("http");
+
+    private Map<String, ProcessorWrapper> httpProcessorMap = new ConcurrentHashMap<>();
+
+    @Setter
+    private HTTPMetricsServer metrics;
+
+    @Setter
+    private HTTPTrace httpTrace;
+
+
+    public void init() {
+        httpServerLogger.info("HandlerService start ");
+    }
+
+    public void register(HttpProcessor httpProcessor, ThreadPoolExecutor threadPoolExecutor) {
+        for (String path : httpProcessor.paths()) {
+            this.register(path, httpProcessor, threadPoolExecutor);
+        }
+    }
+
+    public void register(String path, HttpProcessor httpProcessor, ThreadPoolExecutor threadPoolExecutor) {
+
+        if (httpProcessorMap.containsKey(path)) {
+            throw new RuntimeException(String.format("HandlerService path %s repeat, repeat processor is %s ",
+                    path, httpProcessor.getClass().getSimpleName()));
+        }
+        ProcessorWrapper processorWrapper = new ProcessorWrapper();
+        processorWrapper.threadPoolExecutor = threadPoolExecutor;
+        processorWrapper.httpProcessor = httpProcessor;
+        if (httpProcessor instanceof AsynHttpProcessor) {
+            processorWrapper.asyn = (AsynHttpProcessor) httpProcessor;
+        } else {
+            processorWrapper.httpProcessor = httpProcessor;
+        }
+        httpProcessorMap.put(path, processorWrapper);
+        httpServerLogger.info("path is {}  proocessor name is", path, httpProcessor.getClass().getSimpleName());
+    }
+
+    public boolean isProcessorWrapper(HttpRequest httpRequest) {
+        return this.getProcessorWrapper(httpRequest) == null ? false : true;

Review Comment:
   change -> return this.getProcessorWrapper(httpRequest) != null;



##########
eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/HandlerService.java:
##########
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.runtime.core.protocol.http.processor;
+
+import org.apache.eventmesh.runtime.boot.HTTPTrace;
+import org.apache.eventmesh.runtime.boot.HTTPTrace.TraceOperation;
+import org.apache.eventmesh.runtime.metrics.http.HTTPMetricsServer;
+import org.apache.eventmesh.runtime.util.HttpResponseUtils;
+import org.apache.eventmesh.runtime.util.RemotingHelper;
+
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Objects;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ThreadPoolExecutor;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import io.netty.channel.ChannelFutureListener;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.codec.http.HttpRequest;
+import io.netty.handler.codec.http.HttpResponse;
+
+import lombok.Setter;
+
+
+public class HandlerService {
+
+    private Logger httpServerLogger = LoggerFactory.getLogger(this.getClass());
+
+    private Logger httpLogger = LoggerFactory.getLogger("http");
+
+    private Map<String, ProcessorWrapper> httpProcessorMap = new ConcurrentHashMap<>();
+
+    @Setter
+    private HTTPMetricsServer metrics;
+
+    @Setter
+    private HTTPTrace httpTrace;
+
+
+    public void init() {
+        httpServerLogger.info("HandlerService start ");
+    }
+
+    public void register(HttpProcessor httpProcessor, ThreadPoolExecutor threadPoolExecutor) {
+        for (String path : httpProcessor.paths()) {
+            this.register(path, httpProcessor, threadPoolExecutor);
+        }
+    }
+
+    public void register(String path, HttpProcessor httpProcessor, ThreadPoolExecutor threadPoolExecutor) {
+
+        if (httpProcessorMap.containsKey(path)) {
+            throw new RuntimeException(String.format("HandlerService path %s repeat, repeat processor is %s ",
+                    path, httpProcessor.getClass().getSimpleName()));
+        }
+        ProcessorWrapper processorWrapper = new ProcessorWrapper();
+        processorWrapper.threadPoolExecutor = threadPoolExecutor;
+        processorWrapper.httpProcessor = httpProcessor;
+        if (httpProcessor instanceof AsynHttpProcessor) {
+            processorWrapper.asyn = (AsynHttpProcessor) httpProcessor;
+        } else {
+            processorWrapper.httpProcessor = httpProcessor;
+        }
+        httpProcessorMap.put(path, processorWrapper);
+        httpServerLogger.info("path is {}  proocessor name is", path, httpProcessor.getClass().getSimpleName());
+    }
+
+    public boolean isProcessorWrapper(HttpRequest httpRequest) {
+        return this.getProcessorWrapper(httpRequest) == null ? false : true;
+    }
+
+    private ProcessorWrapper getProcessorWrapper(HttpRequest httpRequest) {
+        String uri = httpRequest.uri();
+        for (Entry<String, ProcessorWrapper> e : httpProcessorMap.entrySet()) {
+            if (e.getKey().startsWith(uri)) {
+                return e.getValue();
+            }
+        }
+        return null;
+    }
+
+    /**
+     * @param httpRequest
+     */
+    public void handler(ChannelHandlerContext ctx, HttpRequest httpRequest) {
+
+
+        TraceOperation traceOperation = httpTrace.getTraceOperation(httpRequest, ctx.channel());
+
+        ProcessorWrapper processorWrapper = getProcessorWrapper(httpRequest);
+        if (Objects.isNull(processorWrapper)) {
+            this.sendResponse(ctx, HttpResponseUtils.createNotFound());
+        }
+        try {
+            HandlerSpecific handlerSpecific = new HandlerSpecific();
+            handlerSpecific.httpRequest = httpRequest;
+            handlerSpecific.ctx = ctx;
+            handlerSpecific.traceOperation = traceOperation;
+            processorWrapper.threadPoolExecutor.execute(handlerSpecific);

Review Comment:
   `processorWrapper` may produce NullPointerException



##########
eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/HandlerService.java:
##########
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.runtime.core.protocol.http.processor;
+
+import org.apache.eventmesh.runtime.boot.HTTPTrace;
+import org.apache.eventmesh.runtime.boot.HTTPTrace.TraceOperation;
+import org.apache.eventmesh.runtime.metrics.http.HTTPMetricsServer;
+import org.apache.eventmesh.runtime.util.HttpResponseUtils;
+import org.apache.eventmesh.runtime.util.RemotingHelper;
+
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Objects;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ThreadPoolExecutor;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import io.netty.channel.ChannelFutureListener;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.codec.http.HttpRequest;
+import io.netty.handler.codec.http.HttpResponse;
+
+import lombok.Setter;
+
+
+public class HandlerService {
+
+    private Logger httpServerLogger = LoggerFactory.getLogger(this.getClass());
+
+    private Logger httpLogger = LoggerFactory.getLogger("http");
+
+    private Map<String, ProcessorWrapper> httpProcessorMap = new ConcurrentHashMap<>();
+
+    @Setter
+    private HTTPMetricsServer metrics;
+
+    @Setter
+    private HTTPTrace httpTrace;
+
+
+    public void init() {
+        httpServerLogger.info("HandlerService start ");
+    }
+
+    public void register(HttpProcessor httpProcessor, ThreadPoolExecutor threadPoolExecutor) {
+        for (String path : httpProcessor.paths()) {
+            this.register(path, httpProcessor, threadPoolExecutor);
+        }
+    }
+
+    public void register(String path, HttpProcessor httpProcessor, ThreadPoolExecutor threadPoolExecutor) {
+
+        if (httpProcessorMap.containsKey(path)) {
+            throw new RuntimeException(String.format("HandlerService path %s repeat, repeat processor is %s ",
+                    path, httpProcessor.getClass().getSimpleName()));
+        }
+        ProcessorWrapper processorWrapper = new ProcessorWrapper();
+        processorWrapper.threadPoolExecutor = threadPoolExecutor;
+        processorWrapper.httpProcessor = httpProcessor;
+        if (httpProcessor instanceof AsynHttpProcessor) {
+            processorWrapper.asyn = (AsynHttpProcessor) httpProcessor;
+        } else {
+            processorWrapper.httpProcessor = httpProcessor;
+        }
+        httpProcessorMap.put(path, processorWrapper);
+        httpServerLogger.info("path is {}  proocessor name is", path, httpProcessor.getClass().getSimpleName());
+    }
+
+    public boolean isProcessorWrapper(HttpRequest httpRequest) {
+        return this.getProcessorWrapper(httpRequest) == null ? false : true;
+    }
+
+    private ProcessorWrapper getProcessorWrapper(HttpRequest httpRequest) {
+        String uri = httpRequest.uri();
+        for (Entry<String, ProcessorWrapper> e : httpProcessorMap.entrySet()) {
+            if (e.getKey().startsWith(uri)) {
+                return e.getValue();
+            }
+        }
+        return null;
+    }
+
+    /**
+     * @param httpRequest
+     */
+    public void handler(ChannelHandlerContext ctx, HttpRequest httpRequest) {
+
+
+        TraceOperation traceOperation = httpTrace.getTraceOperation(httpRequest, ctx.channel());
+
+        ProcessorWrapper processorWrapper = getProcessorWrapper(httpRequest);
+        if (Objects.isNull(processorWrapper)) {
+            this.sendResponse(ctx, HttpResponseUtils.createNotFound());
+        }
+        try {
+            HandlerSpecific handlerSpecific = new HandlerSpecific();
+            handlerSpecific.httpRequest = httpRequest;
+            handlerSpecific.ctx = ctx;
+            handlerSpecific.traceOperation = traceOperation;
+            processorWrapper.threadPoolExecutor.execute(handlerSpecific);
+        } catch (Exception e) {
+            httpServerLogger.error(e.getMessage(), e);
+            this.sendResponse(ctx, HttpResponseUtils.createInternalServerError());
+        }
+    }
+
+    private void sendResponse(ChannelHandlerContext ctx, HttpResponse response) {
+        this.sendResponse(ctx, response, true);
+    }
+
+    private void sendResponse(ChannelHandlerContext ctx, HttpResponse response, boolean isClose) {
+        ctx.writeAndFlush(response).addListener((ChannelFutureListener) f -> {
+            if (!f.isSuccess()) {
+                httpLogger.warn("send response to [{}] fail, will close this channel",
+                        RemotingHelper.parseChannelRemoteAddr(f.channel()));
+                if (isClose) {
+                    f.channel().close();
+                }
+            }
+        });
+    }
+
+    class HandlerSpecific implements Runnable {
+
+        private TraceOperation traceOperation;
+
+        private ChannelHandlerContext ctx;
+
+        private HttpRequest httpRequest;
+
+        private HttpResponse response;
+
+        private Throwable exception;
+
+        long requestTime = System.currentTimeMillis();
+
+
+        public void run() {
+            ProcessorWrapper processorWrapper = HandlerService.this.httpProcessorMap.get(httpRequest.uri());
+            try {
+                this.postHandler();
+                if (Objects.isNull(processorWrapper.httpProcessor)) {
+                    processorWrapper.asyn.handler(this, httpRequest);
+                    return;
+                }
+                response = processorWrapper.httpProcessor.handler(httpRequest);
+
+                this.preHandler();
+            } catch (Throwable e) {
+                httpServerLogger.error("{},{}");

Review Comment:
   this line is no use.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@eventmesh.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@eventmesh.apache.org
For additional commands, e-mail: dev-help@eventmesh.apache.org


[GitHub] [incubator-eventmesh] qqeasonchen commented on pull request #937: [ISSUE #865] Complete the joint commissioning of webhook and httpservice

Posted by GitBox <gi...@apache.org>.
qqeasonchen commented on PR #937:
URL: https://github.com/apache/incubator-eventmesh/pull/937#issuecomment-1160129774

   @githublaohu please add an issue.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@eventmesh.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@eventmesh.apache.org
For additional commands, e-mail: dev-help@eventmesh.apache.org


[GitHub] [incubator-eventmesh] xwm1992 commented on a diff in pull request #937: [ISSUE #865] Complete the joint commissioning of webhook and httpservice

Posted by GitBox <gi...@apache.org>.
xwm1992 commented on code in PR #937:
URL: https://github.com/apache/incubator-eventmesh/pull/937#discussion_r916563511


##########
eventmesh-webhook/eventmesh-webhook-admin/src/main/java/org/apache/eventmesh/webhook/admin/FileWebHookConfigOperation.java:
##########
@@ -48,141 +45,144 @@ public class FileWebHookConfigOperation implements WebHookConfigOperation {
 
     private static final Logger logger = LoggerFactory.getLogger(FileWebHookConfigOperation.class);
 
-	private final String webHookFilePath;
-
-	private static final String FILE_PROPERTIES_PREFIX = "eventMesh.webHook.fileMode.";
-
-	public FileWebHookConfigOperation(Properties properties) throws FileNotFoundException {
-		String webHookFilePath = properties.getProperty(FILE_PROPERTIES_PREFIX + "filePath");
-		assert webHookFilePath != null;
-		File webHookFileDir = new File(webHookFilePath);
-		if (!webHookFileDir.isDirectory()) {
-			throw new FileNotFoundException("File path " + webHookFilePath + " is not directory");
-		}
-		if (!webHookFileDir.exists()) {
-			webHookFileDir.mkdirs();
-		}
-		this.webHookFilePath = webHookFilePath;
-	}
-
-	@Override
-	public Integer insertWebHookConfig(WebHookConfig webHookConfig) {
-		if (!webHookConfig.getCallbackPath().startsWith(WebHookOperationConstant.CALLBACK_PATH_PREFIX)) {
-			logger.error("webhookConfig callback path must start with {}", WebHookOperationConstant.CALLBACK_PATH_PREFIX);
-			return 0;
-		}
-		File manuDir = new File(getWebhookConfigManuDir(webHookConfig));
-		if (!manuDir.exists()) {
-			manuDir.mkdir();
-		}
-		File webhookConfigFile = getWebhookConfigFile(webHookConfig);
-		if (webhookConfigFile.exists()) {
-			logger.error("webhookConfig {} is existed", webHookConfig.getCallbackPath());
-			return 0;
-		}
-		return writeToFile(webhookConfigFile, webHookConfig) ? 1 : 0;
-	}
-
-	@Override
-	public Integer updateWebHookConfig(WebHookConfig webHookConfig) {
-		File webhookConfigFile = getWebhookConfigFile(webHookConfig);
-		if (!webhookConfigFile.exists()) {
-			logger.error("webhookConfig {} is not existed", webHookConfig.getCallbackPath());
-			return 0;
-		}
-		return writeToFile(webhookConfigFile, webHookConfig) ? 1 : 0;
-	}
-
-	@Override
-	public Integer deleteWebHookConfig(WebHookConfig webHookConfig) {
-		File webhookConfigFile = getWebhookConfigFile(webHookConfig);
-		if (!webhookConfigFile.exists()) {
-			logger.error("webhookConfig {} is not existed", webHookConfig.getCallbackPath());
-			return 0;
-		}
-		return webhookConfigFile.delete() ? 1 : 0;
-	}
-
-	@Override
-	public WebHookConfig queryWebHookConfigById(WebHookConfig webHookConfig) {
-		File webhookConfigFile = getWebhookConfigFile(webHookConfig);
-		if (!webhookConfigFile.exists()) {
-			logger.error("webhookConfig {} is not existed", webHookConfig.getCallbackPath());
-			return null;
-		}
-		return getWebHookConfigFromFile(webhookConfigFile);
-	}
-
-	@Override
-	public List<WebHookConfig> queryWebHookConfigByManufacturer(WebHookConfig webHookConfig, Integer pageNum,
-			Integer pageSize) {
-		String manuDirPath = getWebhookConfigManuDir(webHookConfig);
-		File manuDir = new File(manuDirPath);
-		if (!manuDir.exists()) {
-			logger.warn("webhookConfig dir {} is not existed", manuDirPath);
-			return new ArrayList<>();
-		}
-		File[] webhookFiles = manuDir.listFiles();
-		int startIndex = (pageNum-1)*pageSize, endIndex = pageNum*pageSize-1;
-		List<WebHookConfig> webHookConfigs = new ArrayList<>();
-		if (webhookFiles.length > startIndex) {
-			for (int i = startIndex; i < endIndex && i < webhookFiles.length; i++) {
-				webHookConfigs.add(getWebHookConfigFromFile(webhookFiles[i]));
-			}
-		}
-		return webHookConfigs;
-	}
-
-	private WebHookConfig getWebHookConfigFromFile(File webhookConfigFile) {
-		StringBuffer fileContent = new StringBuffer();
-		try (BufferedReader br = new BufferedReader(new InputStreamReader(new FileInputStream(webhookConfigFile)))) {
-			String line = null;
-			while ((line = br.readLine()) != null) {
-				fileContent.append(line);
-			}
-		} catch (IOException e) {
-			logger.error("get webhook from file {} error", webhookConfigFile.getPath(), e);
-			return null;
-		}
-		return JsonUtils.deserialize(fileContent.toString(), WebHookConfig.class);
-	}
-
-	private boolean writeToFile(File webhookConfigFile, WebHookConfig webHookConfig) {
-		FileLock lock = null;
-		try (FileOutputStream fos = new FileOutputStream(webhookConfigFile);BufferedWriter bw = new BufferedWriter(new OutputStreamWriter(fos))){
-			// lock this file
-			lock = fos.getChannel().lock();
-			bw.write(JsonUtils.serialize(webHookConfig));
-		} catch (IOException e) {
-			logger.error("write webhookConfig {} to file error", webHookConfig.getCallbackPath());
-			return false;
-		} finally {
-			try {
-				if (lock != null) {
-					lock.release();
-				}
-			} catch (IOException e) {
-				logger.warn("writeToFile finally caught an exception", e);
-			}
-		}
-		return true;
-	}
-
-	private String getWebhookConfigManuDir(WebHookConfig webHookConfig) {
-		return webHookFilePath + WebHookOperationConstant.FILE_SEPARATOR + webHookConfig.getManufacturerName();
-	}
-
-	private File getWebhookConfigFile(WebHookConfig webHookConfig) {
-		String webhookConfigFilePath = null;
-		try {
-			// use URLEncoder.encode before, because the path may contain some speacial char like '/', which is illegal as a file name.
-			webhookConfigFilePath = this.getWebhookConfigManuDir(webHookConfig) + WebHookOperationConstant.FILE_SEPARATOR + URLEncoder.encode(webHookConfig.getCallbackPath(), "UTF-8") + WebHookOperationConstant.FILE_EXTENSION;
-		} catch (UnsupportedEncodingException e) {
-			logger.error("get webhookConfig file path {} failed", webHookConfig.getCallbackPath(), e);
-		}
-		assert webhookConfigFilePath != null;
-		return new File(webhookConfigFilePath);
-	}
+    private final String webHookFilePath;
+
+
+    public FileWebHookConfigOperation(Properties properties) throws FileNotFoundException {
+        String webHookFilePath = WebHookOperationConstant.getFilePath(properties.getProperty("filePath"));
+
+        assert webHookFilePath != null;

Review Comment:
   Here is always true, need remove



##########
eventmesh-common/src/main/java/org/apache/eventmesh/common/config/ConfigurationWrapper.java:
##########
@@ -24,28 +24,31 @@
 import java.io.BufferedReader;
 import java.io.FileReader;
 import java.io.IOException;
+import java.util.Map.Entry;
 import java.util.Properties;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.base.Preconditions;
 
 public class ConfigurationWrapper {
 
     public Logger logger = LoggerFactory.getLogger(this.getClass());
-    
+
     private static final long TIME_INTERVAL = 30 * 1000L;
-    
+
     private String file;
 
     private Properties properties = new Properties();
 
     private boolean reload;
 
-    private ScheduledExecutorService configLoader = ThreadPoolFactory.createSingleScheduledExecutor("eventMesh-configLoader-");
+    private ScheduledExecutorService configLoader = ThreadPoolFactory
+            .createSingleScheduledExecutor("eventMesh-configLoader-");

Review Comment:
   These variables can be added final



##########
eventmesh-webhook/eventmesh-webhook-admin/src/main/java/org/apache/eventmesh/webhook/admin/NacosWebHookConfigOperation.java:
##########
@@ -16,182 +16,190 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-import com.alibaba.nacos.api.PropertyKeyConst;
-import com.alibaba.nacos.api.config.ConfigFactory;
-import com.alibaba.nacos.api.config.ConfigService;
-import com.alibaba.nacos.api.config.ConfigType;
-import com.alibaba.nacos.api.exception.NacosException;
-import com.alibaba.nacos.shaded.io.grpc.netty.shaded.io.netty.util.internal.StringUtil;
+
+import static org.apache.eventmesh.webhook.api.WebHookOperationConstant.DATA_ID_EXTENSION;
+import static org.apache.eventmesh.webhook.api.WebHookOperationConstant.GROUP_PREFIX;
+import static org.apache.eventmesh.webhook.api.WebHookOperationConstant.MANUFACTURERS_DATA_ID;
+import static org.apache.eventmesh.webhook.api.WebHookOperationConstant.TIMEOUT_MS;
+
+
+
 import org.apache.eventmesh.common.utils.JsonUtils;
 import org.apache.eventmesh.webhook.api.ManufacturerObject;
 import org.apache.eventmesh.webhook.api.WebHookConfig;
 import org.apache.eventmesh.webhook.api.WebHookConfigOperation;
+import org.apache.eventmesh.webhook.api.WebHookOperationConstant;
 
 import java.io.UnsupportedEncodingException;
 import java.net.URLEncoder;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+
 
-import org.apache.eventmesh.webhook.api.WebHookOperationConstant;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import static org.apache.eventmesh.webhook.api.WebHookOperationConstant.*;
-
-public class NacosWebHookConfigOperation implements WebHookConfigOperation{
-
-	private static final Logger logger = LoggerFactory.getLogger(NacosWebHookConfigOperation.class);
-
-	private ConfigService configService;
-
-	private static final String NACOS_PROPERTIES_PREFIX = "eventMesh.webHook.nacosMode.";
-
-	public NacosWebHookConfigOperation(Properties properties) throws NacosException {
-		Properties nacosProperties = new Properties();
-		nacosProperties.put(PropertyKeyConst.SERVER_ADDR, properties.getProperty(NACOS_PROPERTIES_PREFIX + PropertyKeyConst.SERVER_ADDR));
-		nacosProperties.put(PropertyKeyConst.NAMESPACE, properties.getProperty(NACOS_PROPERTIES_PREFIX + PropertyKeyConst.NAMESPACE));
-		nacosProperties.put(PropertyKeyConst.USERNAME, properties.getProperty(NACOS_PROPERTIES_PREFIX + PropertyKeyConst.USERNAME));
-		nacosProperties.put(PropertyKeyConst.PASSWORD, properties.getProperty(NACOS_PROPERTIES_PREFIX + PropertyKeyConst.PASSWORD));
-		configService = ConfigFactory.createConfigService(nacosProperties);
-
-		String manufacturers= configService.getConfig(MANUFACTURERS_DATA_ID, "webhook", TIMEOUT_MS);
-		if (manufacturers == null) {
-			configService.publishConfig(MANUFACTURERS_DATA_ID, "webhook", JsonUtils.serialize(new ManufacturerObject()), ConfigType.JSON.getType());
-		}
-
-	}
-
-	@Override
-	public Integer insertWebHookConfig(WebHookConfig webHookConfig) {
-		if (!webHookConfig.getCallbackPath().startsWith(WebHookOperationConstant.CALLBACK_PATH_PREFIX)) {
-			logger.error("webhookConfig callback path must start with {}", WebHookOperationConstant.CALLBACK_PATH_PREFIX);
-			return 0;
-		}
-		Boolean result;
-		String manufacturerName = webHookConfig.getManufacturerName();
-		try {
-			if (configService.getConfig(getWebHookConfigDataId(webHookConfig), getManuGroupId(webHookConfig), TIMEOUT_MS) != null) {
-				logger.error("insertWebHookConfig failed, config has existed");
-				return 0;
-			}
-			result = configService.publishConfig(getWebHookConfigDataId(webHookConfig), getManuGroupId(webHookConfig), JsonUtils.serialize(webHookConfig), ConfigType.JSON.getType());
-		} catch (NacosException e) {
-			logger.error("insertWebHookConfig failed", e);
-			return 0;
-		}
-		if (result) {
-			// update manufacturer config
-			try {
-				ManufacturerObject manufacturerObject = getManufacturersInfo();
-				manufacturerObject.addManufacturer(manufacturerName);
-				manufacturerObject.getManufacturerEvents(manufacturerName).add(getWebHookConfigDataId(webHookConfig));
-				configService.publishConfig(MANUFACTURERS_DATA_ID, "webhook", JsonUtils.serialize(manufacturerObject), ConfigType.JSON.getType());
-			} catch (NacosException e) {
-				logger.error("update manufacturersInfo error", e);
-				//rollback insert
-				try {
-					configService.removeConfig(getWebHookConfigDataId(webHookConfig), getManuGroupId(webHookConfig));
-				} catch (NacosException ex) {
-					logger.error("rollback insertWebHookConfig failed", e);
-				}
-			}
-		}
-		return result ? 1 : 0;
-	}
-
-	@Override
-	public Integer updateWebHookConfig(WebHookConfig webHookConfig) {
-		Boolean result = false;
-		try {
-			if (configService.getConfig(getWebHookConfigDataId(webHookConfig), getManuGroupId(webHookConfig), TIMEOUT_MS) == null) {
-				logger.error("updateWebHookConfig failed, config is not existed");
-				return 0;
-			}
-			result = configService.publishConfig(getWebHookConfigDataId(webHookConfig), getManuGroupId(webHookConfig), JsonUtils.serialize(webHookConfig), ConfigType.JSON.getType());
-		} catch (NacosException e) {
-			logger.error("updateWebHookConfig failed", e);
-		}
-		return result ? 1 : 0;
-	}
-
-	@Override
-	public Integer deleteWebHookConfig(WebHookConfig webHookConfig) {
-		Boolean result = false;
-		String manufacturerName = webHookConfig.getManufacturerName();
-		try {
-			result = configService.removeConfig(getWebHookConfigDataId(webHookConfig), getManuGroupId(webHookConfig));
-		} catch (NacosException e) {
-			logger.error("deleteWebHookConfig failed", e);
-		}
-		if (result) {
-			//更新集合
-			try {
-				ManufacturerObject manufacturerObject = getManufacturersInfo();
-				manufacturerObject.getManufacturerEvents(manufacturerName).remove(getWebHookConfigDataId(webHookConfig));
-				configService.publishConfig(MANUFACTURERS_DATA_ID, "webhook", JsonUtils.serialize(manufacturerObject), ConfigType.JSON.getType());
-			} catch (NacosException e) {
-				logger.error("update manufacturersInfo error", e);
-			}
-		}
-		return result ? 1 : 0;
-	}
-
-	@Override
-	public WebHookConfig queryWebHookConfigById(WebHookConfig webHookConfig) {
-		try {
-			String content = configService.getConfig(getWebHookConfigDataId(webHookConfig), getManuGroupId(webHookConfig), TIMEOUT_MS);
-			return JsonUtils.deserialize(content, WebHookConfig.class);
-		} catch (NacosException e) {
-			logger.error("queryWebHookConfigById failed", e);
-		}
-		return null;
-	}
-
-	@Override
-	public List<WebHookConfig> queryWebHookConfigByManufacturer(WebHookConfig webHookConfig, Integer pageNum,
-			Integer pageSize) {
-		List<WebHookConfig> webHookConfigs = new ArrayList<>();
-		String manufacturerName = webHookConfig.getManufacturerName();
-		// get manufacturer event list
-		try {
-			ManufacturerObject manufacturerObject = getManufacturersInfo();
-			List<String> manufacturerEvents = manufacturerObject.getManufacturerEvents(manufacturerName);
-			int startIndex = (pageNum-1)*pageSize, endIndex = pageNum*pageSize-1;
-			if (manufacturerEvents.size() > startIndex) {
-				// nacos API is not able to get all config, so use foreach
-				for (int i=startIndex; i<endIndex && i<manufacturerEvents.size(); i++) {
-					String content = configService.getConfig(manufacturerEvents.get(i)+ DATA_ID_EXTENSION, getManuGroupId(webHookConfig), TIMEOUT_MS);
-					webHookConfigs.add(JsonUtils.deserialize(content, WebHookConfig.class));
-				}
-			}
-		} catch (NacosException e) {
-			logger.error("queryWebHookConfigByManufacturer failed", e);
-		}
-		return webHookConfigs;
-	}
-
-	/**
-	 * @param webHookConfig
-	 * @return
-	 */
-	private String getWebHookConfigDataId(WebHookConfig webHookConfig) {
-		try {
-			// use URLEncoder.encode before, because the path may contain some speacial char like '/', which is illegal as a data id.
-			return URLEncoder.encode(webHookConfig.getCallbackPath(), "UTF-8") + DATA_ID_EXTENSION;
-		} catch (UnsupportedEncodingException e) {
-			logger.error("get webhookConfig dataId {} failed", webHookConfig.getCallbackPath(), e);
-		}
-		return webHookConfig.getCallbackPath() + DATA_ID_EXTENSION;
-	}
-
-	private String getManuGroupId(WebHookConfig webHookConfig) {
-		return GROUP_PREFIX + webHookConfig.getManufacturerName();
-	}
-
-	private ManufacturerObject getManufacturersInfo() throws NacosException {
-		String manufacturersContent = configService.getConfig(MANUFACTURERS_DATA_ID, "webhook", TIMEOUT_MS);
-		return StringUtil.isNullOrEmpty(manufacturersContent) ?
-				new ManufacturerObject():
-				JsonUtils.deserialize(manufacturersContent, ManufacturerObject.class);
-	}
+import com.alibaba.nacos.api.config.ConfigFactory;
+import com.alibaba.nacos.api.config.ConfigService;
+import com.alibaba.nacos.api.config.ConfigType;
+import com.alibaba.nacos.api.exception.NacosException;
+import com.alibaba.nacos.shaded.io.grpc.netty.shaded.io.netty.util.internal.StringUtil;
+
+
+public class NacosWebHookConfigOperation implements WebHookConfigOperation {
+
+    private static final Logger logger = LoggerFactory.getLogger(NacosWebHookConfigOperation.class);
+
+    private ConfigService configService;

Review Comment:
   Here can add final



##########
eventmesh-webhook/eventmesh-webhook-admin/src/main/java/org/apache/eventmesh/webhook/admin/FileWebHookConfigOperation.java:
##########
@@ -48,141 +45,144 @@ public class FileWebHookConfigOperation implements WebHookConfigOperation {
 
     private static final Logger logger = LoggerFactory.getLogger(FileWebHookConfigOperation.class);
 
-	private final String webHookFilePath;
-
-	private static final String FILE_PROPERTIES_PREFIX = "eventMesh.webHook.fileMode.";
-
-	public FileWebHookConfigOperation(Properties properties) throws FileNotFoundException {
-		String webHookFilePath = properties.getProperty(FILE_PROPERTIES_PREFIX + "filePath");
-		assert webHookFilePath != null;
-		File webHookFileDir = new File(webHookFilePath);
-		if (!webHookFileDir.isDirectory()) {
-			throw new FileNotFoundException("File path " + webHookFilePath + " is not directory");
-		}
-		if (!webHookFileDir.exists()) {
-			webHookFileDir.mkdirs();
-		}
-		this.webHookFilePath = webHookFilePath;
-	}
-
-	@Override
-	public Integer insertWebHookConfig(WebHookConfig webHookConfig) {
-		if (!webHookConfig.getCallbackPath().startsWith(WebHookOperationConstant.CALLBACK_PATH_PREFIX)) {
-			logger.error("webhookConfig callback path must start with {}", WebHookOperationConstant.CALLBACK_PATH_PREFIX);
-			return 0;
-		}
-		File manuDir = new File(getWebhookConfigManuDir(webHookConfig));
-		if (!manuDir.exists()) {
-			manuDir.mkdir();
-		}
-		File webhookConfigFile = getWebhookConfigFile(webHookConfig);
-		if (webhookConfigFile.exists()) {
-			logger.error("webhookConfig {} is existed", webHookConfig.getCallbackPath());
-			return 0;
-		}
-		return writeToFile(webhookConfigFile, webHookConfig) ? 1 : 0;
-	}
-
-	@Override
-	public Integer updateWebHookConfig(WebHookConfig webHookConfig) {
-		File webhookConfigFile = getWebhookConfigFile(webHookConfig);
-		if (!webhookConfigFile.exists()) {
-			logger.error("webhookConfig {} is not existed", webHookConfig.getCallbackPath());
-			return 0;
-		}
-		return writeToFile(webhookConfigFile, webHookConfig) ? 1 : 0;
-	}
-
-	@Override
-	public Integer deleteWebHookConfig(WebHookConfig webHookConfig) {
-		File webhookConfigFile = getWebhookConfigFile(webHookConfig);
-		if (!webhookConfigFile.exists()) {
-			logger.error("webhookConfig {} is not existed", webHookConfig.getCallbackPath());
-			return 0;
-		}
-		return webhookConfigFile.delete() ? 1 : 0;
-	}
-
-	@Override
-	public WebHookConfig queryWebHookConfigById(WebHookConfig webHookConfig) {
-		File webhookConfigFile = getWebhookConfigFile(webHookConfig);
-		if (!webhookConfigFile.exists()) {
-			logger.error("webhookConfig {} is not existed", webHookConfig.getCallbackPath());
-			return null;
-		}
-		return getWebHookConfigFromFile(webhookConfigFile);
-	}
-
-	@Override
-	public List<WebHookConfig> queryWebHookConfigByManufacturer(WebHookConfig webHookConfig, Integer pageNum,
-			Integer pageSize) {
-		String manuDirPath = getWebhookConfigManuDir(webHookConfig);
-		File manuDir = new File(manuDirPath);
-		if (!manuDir.exists()) {
-			logger.warn("webhookConfig dir {} is not existed", manuDirPath);
-			return new ArrayList<>();
-		}
-		File[] webhookFiles = manuDir.listFiles();
-		int startIndex = (pageNum-1)*pageSize, endIndex = pageNum*pageSize-1;
-		List<WebHookConfig> webHookConfigs = new ArrayList<>();
-		if (webhookFiles.length > startIndex) {
-			for (int i = startIndex; i < endIndex && i < webhookFiles.length; i++) {
-				webHookConfigs.add(getWebHookConfigFromFile(webhookFiles[i]));
-			}
-		}
-		return webHookConfigs;
-	}
-
-	private WebHookConfig getWebHookConfigFromFile(File webhookConfigFile) {
-		StringBuffer fileContent = new StringBuffer();
-		try (BufferedReader br = new BufferedReader(new InputStreamReader(new FileInputStream(webhookConfigFile)))) {
-			String line = null;
-			while ((line = br.readLine()) != null) {
-				fileContent.append(line);
-			}
-		} catch (IOException e) {
-			logger.error("get webhook from file {} error", webhookConfigFile.getPath(), e);
-			return null;
-		}
-		return JsonUtils.deserialize(fileContent.toString(), WebHookConfig.class);
-	}
-
-	private boolean writeToFile(File webhookConfigFile, WebHookConfig webHookConfig) {
-		FileLock lock = null;
-		try (FileOutputStream fos = new FileOutputStream(webhookConfigFile);BufferedWriter bw = new BufferedWriter(new OutputStreamWriter(fos))){
-			// lock this file
-			lock = fos.getChannel().lock();
-			bw.write(JsonUtils.serialize(webHookConfig));
-		} catch (IOException e) {
-			logger.error("write webhookConfig {} to file error", webHookConfig.getCallbackPath());
-			return false;
-		} finally {
-			try {
-				if (lock != null) {
-					lock.release();
-				}
-			} catch (IOException e) {
-				logger.warn("writeToFile finally caught an exception", e);
-			}
-		}
-		return true;
-	}
-
-	private String getWebhookConfigManuDir(WebHookConfig webHookConfig) {
-		return webHookFilePath + WebHookOperationConstant.FILE_SEPARATOR + webHookConfig.getManufacturerName();
-	}
-
-	private File getWebhookConfigFile(WebHookConfig webHookConfig) {
-		String webhookConfigFilePath = null;
-		try {
-			// use URLEncoder.encode before, because the path may contain some speacial char like '/', which is illegal as a file name.
-			webhookConfigFilePath = this.getWebhookConfigManuDir(webHookConfig) + WebHookOperationConstant.FILE_SEPARATOR + URLEncoder.encode(webHookConfig.getCallbackPath(), "UTF-8") + WebHookOperationConstant.FILE_EXTENSION;
-		} catch (UnsupportedEncodingException e) {
-			logger.error("get webhookConfig file path {} failed", webHookConfig.getCallbackPath(), e);
-		}
-		assert webhookConfigFilePath != null;
-		return new File(webhookConfigFilePath);
-	}
+    private final String webHookFilePath;
+
+
+    public FileWebHookConfigOperation(Properties properties) throws FileNotFoundException {
+        String webHookFilePath = WebHookOperationConstant.getFilePath(properties.getProperty("filePath"));
+
+        assert webHookFilePath != null;
+        File webHookFileDir = new File(webHookFilePath);
+        if (!webHookFileDir.isDirectory()) {
+            throw new FileNotFoundException("File path " + webHookFilePath + " is not directory");
+        }
+        if (!webHookFileDir.exists()) {
+            webHookFileDir.mkdirs();
+        }
+        this.webHookFilePath = webHookFilePath;
+    }
+
+    @Override
+    public Integer insertWebHookConfig(WebHookConfig webHookConfig) {
+        if (!webHookConfig.getCallbackPath().startsWith(WebHookOperationConstant.CALLBACK_PATH_PREFIX)) {
+            logger.error("webhookConfig callback path must start with {}", WebHookOperationConstant.CALLBACK_PATH_PREFIX);
+            return 0;
+        }
+        File manuDir = new File(getWebhookConfigManuDir(webHookConfig));
+        if (!manuDir.exists()) {
+            manuDir.mkdir();
+        }
+        File webhookConfigFile = getWebhookConfigFile(webHookConfig);
+        if (webhookConfigFile.exists()) {
+            logger.error("webhookConfig {} is existed", webHookConfig.getCallbackPath());
+            return 0;
+        }
+        return writeToFile(webhookConfigFile, webHookConfig) ? 1 : 0;
+    }
+
+    @Override
+    public Integer updateWebHookConfig(WebHookConfig webHookConfig) {
+        File webhookConfigFile = getWebhookConfigFile(webHookConfig);
+        if (!webhookConfigFile.exists()) {
+            logger.error("webhookConfig {} is not existed", webHookConfig.getCallbackPath());
+            return 0;
+        }
+        return writeToFile(webhookConfigFile, webHookConfig) ? 1 : 0;
+    }
+
+    @Override
+    public Integer deleteWebHookConfig(WebHookConfig webHookConfig) {
+        File webhookConfigFile = getWebhookConfigFile(webHookConfig);
+        if (!webhookConfigFile.exists()) {
+            logger.error("webhookConfig {} is not existed", webHookConfig.getCallbackPath());
+            return 0;
+        }
+        return webhookConfigFile.delete() ? 1 : 0;
+    }
+
+    @Override
+    public WebHookConfig queryWebHookConfigById(WebHookConfig webHookConfig) {
+        File webhookConfigFile = getWebhookConfigFile(webHookConfig);
+        if (!webhookConfigFile.exists()) {
+            logger.error("webhookConfig {} is not existed", webHookConfig.getCallbackPath());
+            return null;
+        }
+        return getWebHookConfigFromFile(webhookConfigFile);
+    }
+
+    @Override
+    public List<WebHookConfig> queryWebHookConfigByManufacturer(WebHookConfig webHookConfig, Integer pageNum,
+                                                                Integer pageSize) {
+        String manuDirPath = getWebhookConfigManuDir(webHookConfig);
+        File manuDir = new File(manuDirPath);
+        if (!manuDir.exists()) {
+            logger.warn("webhookConfig dir {} is not existed", manuDirPath);
+            return new ArrayList<>();
+        }
+        File[] webhookFiles = manuDir.listFiles();
+        int startIndex = (pageNum - 1) * pageSize;
+        int endIndex = pageNum * pageSize - 1;
+        List<WebHookConfig> webHookConfigs = new ArrayList<>();
+        if (webhookFiles.length > startIndex) {

Review Comment:
   Here webhookFiles may cause NullPointException



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@eventmesh.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@eventmesh.apache.org
For additional commands, e-mail: dev-help@eventmesh.apache.org


[GitHub] [incubator-eventmesh] githublaohu commented on a diff in pull request #937: [ISSUE #865] Complete the joint commissioning of webhook and httpservice

Posted by GitBox <gi...@apache.org>.
githublaohu commented on code in PR #937:
URL: https://github.com/apache/incubator-eventmesh/pull/937#discussion_r920326971


##########
eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/AsynHttpProcessor.java:
##########
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.runtime.core.protocol.http.processor;
+
+import org.apache.eventmesh.runtime.core.protocol.http.processor.HandlerService.HandlerSpecific;
+
+import io.netty.handler.codec.http.HttpRequest;
+import io.netty.handler.codec.http.HttpResponse;
+
+/**
+ * aysn http processor

Review Comment:
   ok



##########
eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/HandlerService.java:
##########
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.runtime.core.protocol.http.processor;
+
+import org.apache.eventmesh.runtime.boot.HTTPTrace;
+import org.apache.eventmesh.runtime.boot.HTTPTrace.TraceOperation;
+import org.apache.eventmesh.runtime.metrics.http.HTTPMetricsServer;
+import org.apache.eventmesh.runtime.util.HttpResponseUtils;
+import org.apache.eventmesh.runtime.util.RemotingHelper;
+
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Objects;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ThreadPoolExecutor;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import io.netty.channel.ChannelFutureListener;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.codec.http.HttpRequest;
+import io.netty.handler.codec.http.HttpResponse;
+
+import lombok.Setter;
+
+
+public class HandlerService {
+
+    private Logger httpServerLogger = LoggerFactory.getLogger(this.getClass());
+
+    private Logger httpLogger = LoggerFactory.getLogger("http");
+
+    private Map<String, ProcessorWrapper> httpProcessorMap = new ConcurrentHashMap<>();
+
+    @Setter
+    private HTTPMetricsServer metrics;
+
+    @Setter
+    private HTTPTrace httpTrace;
+
+
+    public void init() {
+        httpServerLogger.info("HandlerService start ");
+    }
+
+    public void register(HttpProcessor httpProcessor, ThreadPoolExecutor threadPoolExecutor) {
+        for (String path : httpProcessor.paths()) {
+            this.register(path, httpProcessor, threadPoolExecutor);
+        }
+    }
+
+    public void register(String path, HttpProcessor httpProcessor, ThreadPoolExecutor threadPoolExecutor) {
+
+        if (httpProcessorMap.containsKey(path)) {
+            throw new RuntimeException(String.format("HandlerService path %s repeat, repeat processor is %s ",
+                    path, httpProcessor.getClass().getSimpleName()));
+        }
+        ProcessorWrapper processorWrapper = new ProcessorWrapper();
+        processorWrapper.threadPoolExecutor = threadPoolExecutor;
+        processorWrapper.httpProcessor = httpProcessor;
+        if (httpProcessor instanceof AsynHttpProcessor) {
+            processorWrapper.asyn = (AsynHttpProcessor) httpProcessor;
+        } else {
+            processorWrapper.httpProcessor = httpProcessor;
+        }
+        httpProcessorMap.put(path, processorWrapper);
+        httpServerLogger.info("path is {}  proocessor name is {}", path, httpProcessor.getClass().getSimpleName());
+    }
+
+    public boolean isProcessorWrapper(HttpRequest httpRequest) {
+        return Objects.nonNull(this.getProcessorWrapper(httpRequest));
+    }
+
+    private ProcessorWrapper getProcessorWrapper(HttpRequest httpRequest) {
+        String uri = httpRequest.uri();
+        for (Entry<String, ProcessorWrapper> e : httpProcessorMap.entrySet()) {
+            if (e.getKey().startsWith(uri)) {
+                return e.getValue();
+            }
+        }
+        return null;
+    }
+
+    /**
+     * @param httpRequest
+     */
+    public void handler(ChannelHandlerContext ctx, HttpRequest httpRequest) {
+
+
+        TraceOperation traceOperation = httpTrace.getTraceOperation(httpRequest, ctx.channel());
+
+        ProcessorWrapper processorWrapper = getProcessorWrapper(httpRequest);
+        if (Objects.isNull(processorWrapper)) {
+            this.sendResponse(ctx, HttpResponseUtils.createNotFound());
+        }
+        try {
+            HandlerSpecific handlerSpecific = new HandlerSpecific();
+            handlerSpecific.httpRequest = httpRequest;
+            handlerSpecific.ctx = ctx;
+            handlerSpecific.traceOperation = traceOperation;
+            processorWrapper.threadPoolExecutor.execute(handlerSpecific);
+        } catch (Exception e) {
+            httpServerLogger.error(e.getMessage(), e);
+            this.sendResponse(ctx, HttpResponseUtils.createInternalServerError());
+        }
+    }
+
+    private void sendResponse(ChannelHandlerContext ctx, HttpResponse response) {
+        this.sendResponse(ctx, response, true);
+    }
+
+    private void sendResponse(ChannelHandlerContext ctx, HttpResponse response, boolean isClose) {
+        ctx.writeAndFlush(response).addListener((ChannelFutureListener) f -> {
+            if (!f.isSuccess()) {
+                httpLogger.warn("send response to [{}] fail, will close this channel",
+                        RemotingHelper.parseChannelRemoteAddr(f.channel()));
+                if (isClose) {
+                    f.channel().close();
+                }
+            }
+        });
+    }
+
+    class HandlerSpecific implements Runnable {
+
+        private TraceOperation traceOperation;
+
+        private ChannelHandlerContext ctx;
+
+        private HttpRequest httpRequest;
+
+        private HttpResponse response;
+
+        private Throwable exception;
+
+        long requestTime = System.currentTimeMillis();
+
+
+        public void run() {
+            ProcessorWrapper processorWrapper = HandlerService.this.httpProcessorMap.get(httpRequest.uri());
+            try {
+                this.postHandler();
+                if (Objects.isNull(processorWrapper.httpProcessor)) {
+                    processorWrapper.asyn.handler(this, httpRequest);
+                    return;
+                }
+                response = processorWrapper.httpProcessor.handler(httpRequest);
+
+                this.preHandler();
+            } catch (Throwable e) {
+                httpServerLogger.error(e.getMessage(), e);
+                exception = e;
+                this.errer();
+            }
+        }
+
+        private void preHandler() {
+            metrics.getSummaryMetrics().recordHTTPRequest();
+            if (httpLogger.isDebugEnabled()) {
+                httpLogger.debug("{}", httpRequest);
+            }
+            if (Objects.isNull(response)) {
+                this.response = HttpResponseUtils.createSuccess();
+            }
+            this.traceOperation.endTrace();
+            this.sendResponse(this.response);
+        }
+
+        private void postHandler() {
+            metrics.getSummaryMetrics().recordHTTPReqResTimeCost(System.currentTimeMillis() - requestTime);
+            if (httpLogger.isDebugEnabled()) {
+                httpLogger.debug("{}", response);
+            }
+        }
+
+        private void errer() {
+            this.traceOperation.exceptionTrace(this.exception);
+            metrics.getSummaryMetrics().recordHTTPDiscard();
+            metrics.getSummaryMetrics().recordHTTPReqResTimeCost(System.currentTimeMillis() - requestTime);
+            this.sendResponse(HttpResponseUtils.createInternalServerError());
+        }
+
+
+        public void setResponseJsonBody(String body) {
+            this.sendResponse(HttpResponseUtils.setResponseJsonBody(body, ctx));
+        }
+
+        public void setResponseTextBody(String body) {
+            this.sendResponse(HttpResponseUtils.setResponseTextBody(body, ctx));
+        }
+
+        public void sendResponse(HttpResponse response) {
+            this.response = response;
+            this.preHandler();
+        }
+
+        /**
+         * @param count
+         */
+        public void recordSendBatchMsgFailed(int count) {
+            metrics.getSummaryMetrics().recordSendBatchMsgFailed(1);
+        }
+
+    }
+
+
+    private static class ProcessorWrapper {
+
+        private ThreadPoolExecutor threadPoolExecutor;
+
+        private HttpProcessor httpProcessor;
+
+        private AsynHttpProcessor asyn;

Review Comment:
   ok



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@eventmesh.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@eventmesh.apache.org
For additional commands, e-mail: dev-help@eventmesh.apache.org


[GitHub] [incubator-eventmesh] githublaohu commented on a diff in pull request #937: [ISSUE #865] Complete the joint commissioning of webhook and httpservice

Posted by GitBox <gi...@apache.org>.
githublaohu commented on code in PR #937:
URL: https://github.com/apache/incubator-eventmesh/pull/937#discussion_r920328893


##########
eventmesh-webhook/eventmesh-webhook-receive/src/main/java/org/apache/eventmesh/webhook/receive/storage/HookConfigOperationManage.java:
##########
@@ -61,20 +63,19 @@ public HookConfigOperationManage() {
      * @param operationMode file/nacos...
      * @param config        Parameters required to initialize the behavior

Review Comment:
   OK



##########
eventmesh-webhook/eventmesh-webhook-receive/src/main/java/org/apache/eventmesh/webhook/receive/storage/WebhookFileListener.java:
##########
@@ -74,9 +75,6 @@ public WebhookFileListener(String filePath, Map<String, WebHookConfig> cacheWebH
      */
     public void filePatternInit() throws FileNotFoundException {

Review Comment:
   OK



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@eventmesh.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@eventmesh.apache.org
For additional commands, e-mail: dev-help@eventmesh.apache.org


[GitHub] [incubator-eventmesh] xwm1992 merged pull request #937: [ISSUE #865] Complete the joint commissioning of webhook and httpservice

Posted by GitBox <gi...@apache.org>.
xwm1992 merged PR #937:
URL: https://github.com/apache/incubator-eventmesh/pull/937


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@eventmesh.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@eventmesh.apache.org
For additional commands, e-mail: dev-help@eventmesh.apache.org


[GitHub] [incubator-eventmesh] xwm1992 commented on a diff in pull request #937: [ISSUE #865] Complete the joint commissioning of webhook and httpservice

Posted by GitBox <gi...@apache.org>.
xwm1992 commented on code in PR #937:
URL: https://github.com/apache/incubator-eventmesh/pull/937#discussion_r919731520


##########
eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/HandlerService.java:
##########
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.runtime.core.protocol.http.processor;
+
+import org.apache.eventmesh.runtime.boot.HTTPTrace;
+import org.apache.eventmesh.runtime.boot.HTTPTrace.TraceOperation;
+import org.apache.eventmesh.runtime.metrics.http.HTTPMetricsServer;
+import org.apache.eventmesh.runtime.util.HttpResponseUtils;
+import org.apache.eventmesh.runtime.util.RemotingHelper;
+
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Objects;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ThreadPoolExecutor;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import io.netty.channel.ChannelFutureListener;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.codec.http.HttpRequest;
+import io.netty.handler.codec.http.HttpResponse;
+
+import lombok.Setter;
+
+
+public class HandlerService {
+
+    private Logger httpServerLogger = LoggerFactory.getLogger(this.getClass());
+
+    private Logger httpLogger = LoggerFactory.getLogger("http");
+
+    private Map<String, ProcessorWrapper> httpProcessorMap = new ConcurrentHashMap<>();
+
+    @Setter
+    private HTTPMetricsServer metrics;
+
+    @Setter
+    private HTTPTrace httpTrace;
+
+
+    public void init() {
+        httpServerLogger.info("HandlerService start ");
+    }
+
+    public void register(HttpProcessor httpProcessor, ThreadPoolExecutor threadPoolExecutor) {
+        for (String path : httpProcessor.paths()) {
+            this.register(path, httpProcessor, threadPoolExecutor);
+        }
+    }
+
+    public void register(String path, HttpProcessor httpProcessor, ThreadPoolExecutor threadPoolExecutor) {
+
+        if (httpProcessorMap.containsKey(path)) {
+            throw new RuntimeException(String.format("HandlerService path %s repeat, repeat processor is %s ",
+                    path, httpProcessor.getClass().getSimpleName()));
+        }
+        ProcessorWrapper processorWrapper = new ProcessorWrapper();
+        processorWrapper.threadPoolExecutor = threadPoolExecutor;
+        processorWrapper.httpProcessor = httpProcessor;
+        if (httpProcessor instanceof AsynHttpProcessor) {
+            processorWrapper.asyn = (AsynHttpProcessor) httpProcessor;
+        } else {
+            processorWrapper.httpProcessor = httpProcessor;
+        }
+        httpProcessorMap.put(path, processorWrapper);
+        httpServerLogger.info("path is {}  proocessor name is {}", path, httpProcessor.getClass().getSimpleName());
+    }
+
+    public boolean isProcessorWrapper(HttpRequest httpRequest) {
+        return Objects.nonNull(this.getProcessorWrapper(httpRequest));
+    }
+
+    private ProcessorWrapper getProcessorWrapper(HttpRequest httpRequest) {
+        String uri = httpRequest.uri();
+        for (Entry<String, ProcessorWrapper> e : httpProcessorMap.entrySet()) {
+            if (e.getKey().startsWith(uri)) {
+                return e.getValue();
+            }
+        }
+        return null;
+    }
+
+    /**
+     * @param httpRequest
+     */
+    public void handler(ChannelHandlerContext ctx, HttpRequest httpRequest) {
+
+
+        TraceOperation traceOperation = httpTrace.getTraceOperation(httpRequest, ctx.channel());
+
+        ProcessorWrapper processorWrapper = getProcessorWrapper(httpRequest);
+        if (Objects.isNull(processorWrapper)) {
+            this.sendResponse(ctx, HttpResponseUtils.createNotFound());
+        }
+        try {
+            HandlerSpecific handlerSpecific = new HandlerSpecific();
+            handlerSpecific.httpRequest = httpRequest;
+            handlerSpecific.ctx = ctx;
+            handlerSpecific.traceOperation = traceOperation;
+            processorWrapper.threadPoolExecutor.execute(handlerSpecific);
+        } catch (Exception e) {
+            httpServerLogger.error(e.getMessage(), e);
+            this.sendResponse(ctx, HttpResponseUtils.createInternalServerError());
+        }
+    }
+
+    private void sendResponse(ChannelHandlerContext ctx, HttpResponse response) {
+        this.sendResponse(ctx, response, true);
+    }
+
+    private void sendResponse(ChannelHandlerContext ctx, HttpResponse response, boolean isClose) {
+        ctx.writeAndFlush(response).addListener((ChannelFutureListener) f -> {
+            if (!f.isSuccess()) {
+                httpLogger.warn("send response to [{}] fail, will close this channel",
+                        RemotingHelper.parseChannelRemoteAddr(f.channel()));
+                if (isClose) {
+                    f.channel().close();
+                }
+            }
+        });
+    }
+
+    class HandlerSpecific implements Runnable {
+
+        private TraceOperation traceOperation;
+
+        private ChannelHandlerContext ctx;
+
+        private HttpRequest httpRequest;
+
+        private HttpResponse response;
+
+        private Throwable exception;
+
+        long requestTime = System.currentTimeMillis();
+
+
+        public void run() {
+            ProcessorWrapper processorWrapper = HandlerService.this.httpProcessorMap.get(httpRequest.uri());
+            try {
+                this.postHandler();
+                if (Objects.isNull(processorWrapper.httpProcessor)) {
+                    processorWrapper.asyn.handler(this, httpRequest);
+                    return;
+                }
+                response = processorWrapper.httpProcessor.handler(httpRequest);
+
+                this.preHandler();
+            } catch (Throwable e) {
+                httpServerLogger.error(e.getMessage(), e);
+                exception = e;
+                this.errer();
+            }
+        }
+
+        private void preHandler() {
+            metrics.getSummaryMetrics().recordHTTPRequest();
+            if (httpLogger.isDebugEnabled()) {
+                httpLogger.debug("{}", httpRequest);
+            }
+            if (Objects.isNull(response)) {
+                this.response = HttpResponseUtils.createSuccess();
+            }
+            this.traceOperation.endTrace();
+            this.sendResponse(this.response);
+        }
+
+        private void postHandler() {
+            metrics.getSummaryMetrics().recordHTTPReqResTimeCost(System.currentTimeMillis() - requestTime);
+            if (httpLogger.isDebugEnabled()) {
+                httpLogger.debug("{}", response);
+            }
+        }
+
+        private void errer() {
+            this.traceOperation.exceptionTrace(this.exception);
+            metrics.getSummaryMetrics().recordHTTPDiscard();
+            metrics.getSummaryMetrics().recordHTTPReqResTimeCost(System.currentTimeMillis() - requestTime);
+            this.sendResponse(HttpResponseUtils.createInternalServerError());
+        }
+
+
+        public void setResponseJsonBody(String body) {
+            this.sendResponse(HttpResponseUtils.setResponseJsonBody(body, ctx));
+        }
+
+        public void setResponseTextBody(String body) {
+            this.sendResponse(HttpResponseUtils.setResponseTextBody(body, ctx));
+        }
+
+        public void sendResponse(HttpResponse response) {
+            this.response = response;
+            this.preHandler();
+        }
+
+        /**
+         * @param count
+         */
+        public void recordSendBatchMsgFailed(int count) {
+            metrics.getSummaryMetrics().recordSendBatchMsgFailed(1);
+        }
+
+    }
+
+
+    private static class ProcessorWrapper {
+
+        private ThreadPoolExecutor threadPoolExecutor;
+
+        private HttpProcessor httpProcessor;
+
+        private AsynHttpProcessor asyn;

Review Comment:
   asyn -> async



##########
eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/HandlerService.java:
##########
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.runtime.core.protocol.http.processor;
+
+import org.apache.eventmesh.runtime.boot.HTTPTrace;
+import org.apache.eventmesh.runtime.boot.HTTPTrace.TraceOperation;
+import org.apache.eventmesh.runtime.metrics.http.HTTPMetricsServer;
+import org.apache.eventmesh.runtime.util.HttpResponseUtils;
+import org.apache.eventmesh.runtime.util.RemotingHelper;
+
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Objects;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ThreadPoolExecutor;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import io.netty.channel.ChannelFutureListener;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.codec.http.HttpRequest;
+import io.netty.handler.codec.http.HttpResponse;
+
+import lombok.Setter;
+
+
+public class HandlerService {
+
+    private Logger httpServerLogger = LoggerFactory.getLogger(this.getClass());
+
+    private Logger httpLogger = LoggerFactory.getLogger("http");
+
+    private Map<String, ProcessorWrapper> httpProcessorMap = new ConcurrentHashMap<>();
+
+    @Setter
+    private HTTPMetricsServer metrics;
+
+    @Setter
+    private HTTPTrace httpTrace;
+
+
+    public void init() {
+        httpServerLogger.info("HandlerService start ");
+    }
+
+    public void register(HttpProcessor httpProcessor, ThreadPoolExecutor threadPoolExecutor) {
+        for (String path : httpProcessor.paths()) {
+            this.register(path, httpProcessor, threadPoolExecutor);
+        }
+    }
+
+    public void register(String path, HttpProcessor httpProcessor, ThreadPoolExecutor threadPoolExecutor) {
+
+        if (httpProcessorMap.containsKey(path)) {
+            throw new RuntimeException(String.format("HandlerService path %s repeat, repeat processor is %s ",
+                    path, httpProcessor.getClass().getSimpleName()));
+        }
+        ProcessorWrapper processorWrapper = new ProcessorWrapper();
+        processorWrapper.threadPoolExecutor = threadPoolExecutor;
+        processorWrapper.httpProcessor = httpProcessor;
+        if (httpProcessor instanceof AsynHttpProcessor) {
+            processorWrapper.asyn = (AsynHttpProcessor) httpProcessor;
+        } else {
+            processorWrapper.httpProcessor = httpProcessor;
+        }
+        httpProcessorMap.put(path, processorWrapper);
+        httpServerLogger.info("path is {}  proocessor name is {}", path, httpProcessor.getClass().getSimpleName());
+    }
+
+    public boolean isProcessorWrapper(HttpRequest httpRequest) {
+        return Objects.nonNull(this.getProcessorWrapper(httpRequest));
+    }
+
+    private ProcessorWrapper getProcessorWrapper(HttpRequest httpRequest) {
+        String uri = httpRequest.uri();
+        for (Entry<String, ProcessorWrapper> e : httpProcessorMap.entrySet()) {
+            if (e.getKey().startsWith(uri)) {
+                return e.getValue();
+            }
+        }
+        return null;
+    }
+
+    /**
+     * @param httpRequest
+     */
+    public void handler(ChannelHandlerContext ctx, HttpRequest httpRequest) {
+
+
+        TraceOperation traceOperation = httpTrace.getTraceOperation(httpRequest, ctx.channel());
+
+        ProcessorWrapper processorWrapper = getProcessorWrapper(httpRequest);
+        if (Objects.isNull(processorWrapper)) {
+            this.sendResponse(ctx, HttpResponseUtils.createNotFound());

Review Comment:
   after sendResponse here need to return ?



##########
eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/AsynHttpProcessor.java:
##########
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.runtime.core.protocol.http.processor;
+
+import org.apache.eventmesh.runtime.core.protocol.http.processor.HandlerService.HandlerSpecific;
+
+import io.netty.handler.codec.http.HttpRequest;
+import io.netty.handler.codec.http.HttpResponse;
+
+/**
+ * aysn http processor

Review Comment:
   aysn -> async



##########
eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/HandlerService.java:
##########
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.runtime.core.protocol.http.processor;
+
+import org.apache.eventmesh.runtime.boot.HTTPTrace;
+import org.apache.eventmesh.runtime.boot.HTTPTrace.TraceOperation;
+import org.apache.eventmesh.runtime.metrics.http.HTTPMetricsServer;
+import org.apache.eventmesh.runtime.util.HttpResponseUtils;
+import org.apache.eventmesh.runtime.util.RemotingHelper;
+
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Objects;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ThreadPoolExecutor;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import io.netty.channel.ChannelFutureListener;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.codec.http.HttpRequest;
+import io.netty.handler.codec.http.HttpResponse;
+
+import lombok.Setter;
+
+
+public class HandlerService {
+
+    private Logger httpServerLogger = LoggerFactory.getLogger(this.getClass());
+
+    private Logger httpLogger = LoggerFactory.getLogger("http");
+
+    private Map<String, ProcessorWrapper> httpProcessorMap = new ConcurrentHashMap<>();
+
+    @Setter
+    private HTTPMetricsServer metrics;
+
+    @Setter
+    private HTTPTrace httpTrace;
+
+
+    public void init() {
+        httpServerLogger.info("HandlerService start ");
+    }
+
+    public void register(HttpProcessor httpProcessor, ThreadPoolExecutor threadPoolExecutor) {
+        for (String path : httpProcessor.paths()) {
+            this.register(path, httpProcessor, threadPoolExecutor);
+        }
+    }
+
+    public void register(String path, HttpProcessor httpProcessor, ThreadPoolExecutor threadPoolExecutor) {
+
+        if (httpProcessorMap.containsKey(path)) {
+            throw new RuntimeException(String.format("HandlerService path %s repeat, repeat processor is %s ",
+                    path, httpProcessor.getClass().getSimpleName()));
+        }
+        ProcessorWrapper processorWrapper = new ProcessorWrapper();
+        processorWrapper.threadPoolExecutor = threadPoolExecutor;
+        processorWrapper.httpProcessor = httpProcessor;
+        if (httpProcessor instanceof AsynHttpProcessor) {
+            processorWrapper.asyn = (AsynHttpProcessor) httpProcessor;
+        } else {
+            processorWrapper.httpProcessor = httpProcessor;

Review Comment:
   `processorWrapper.httpProcessor` has already assigned value at line 76



##########
eventmesh-webhook/eventmesh-webhook-receive/src/main/java/org/apache/eventmesh/webhook/receive/storage/WebhookFileListener.java:
##########
@@ -74,9 +75,6 @@ public WebhookFileListener(String filePath, Map<String, WebHookConfig> cacheWebH
      */
     public void filePatternInit() throws FileNotFoundException {

Review Comment:
   After updating the code , this FileNotFoundException will never thrown, can be removed



##########
eventmesh-webhook/eventmesh-webhook-receive/src/main/java/org/apache/eventmesh/webhook/receive/storage/HookConfigOperationManage.java:
##########
@@ -61,20 +63,19 @@ public HookConfigOperationManage() {
      * @param operationMode file/nacos...
      * @param config        Parameters required to initialize the behavior

Review Comment:
   remove these comments



##########
eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/AsynHttpProcessor.java:
##########
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.runtime.core.protocol.http.processor;
+
+import org.apache.eventmesh.runtime.core.protocol.http.processor.HandlerService.HandlerSpecific;
+
+import io.netty.handler.codec.http.HttpRequest;
+import io.netty.handler.codec.http.HttpResponse;
+
+/**
+ * aysn http processor
+ */
+public interface AsynHttpProcessor extends HttpProcessor {

Review Comment:
   please rename to AsyncHttpProcessor



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@eventmesh.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@eventmesh.apache.org
For additional commands, e-mail: dev-help@eventmesh.apache.org