You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@devlake.apache.org by ab...@apache.org on 2022/09/28 14:51:19 UTC

[incubator-devlake] 01/02: feat(framework): add post api support

This is an automated email from the ASF dual-hosted git repository.

abeizn pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-devlake.git

commit ece5a603aa028dc271864925c7396a114d9a71c2
Author: Yingchu Chen <yi...@merico.dev>
AuthorDate: Wed Sep 28 15:11:42 2022 +0800

    feat(framework): add post api support
---
 plugins/helper/api_async_client.go | 12 ++++++++++++
 plugins/helper/api_collector.go    | 17 ++++++++++++-----
 2 files changed, 24 insertions(+), 5 deletions(-)

diff --git a/plugins/helper/api_async_client.go b/plugins/helper/api_async_client.go
index 979029e6..fe87876c 100644
--- a/plugins/helper/api_async_client.go
+++ b/plugins/helper/api_async_client.go
@@ -219,6 +219,17 @@ func (apiClient *ApiAsyncClient) DoGetAsync(
 	apiClient.DoAsync(http.MethodGet, path, query, nil, header, handler, 0)
 }
 
+// DoPostAsync Enqueue an api post request, the request may be sent sometime in future in parallel with other api requests
+func (apiClient *ApiAsyncClient) DoPostAsync(
+	path string,
+	query url.Values,
+	body interface{},
+	header http.Header,
+	handler common.ApiAsyncCallback,
+) {
+	apiClient.DoAsync(http.MethodPost, path, query, body, header, handler, 0)
+}
+
 // WaitAsync blocks until all async requests were done
 func (apiClient *ApiAsyncClient) WaitAsync() errors.Error {
 	return apiClient.scheduler.Wait()
@@ -247,6 +258,7 @@ func (apiClient *ApiAsyncClient) Release() {
 // RateLimitedApiClient FIXME ...
 type RateLimitedApiClient interface {
 	DoGetAsync(path string, query url.Values, header http.Header, handler common.ApiAsyncCallback)
+	DoPostAsync(path string, query url.Values, body interface{}, header http.Header, handler common.ApiAsyncCallback)
 	WaitAsync() errors.Error
 	HasError() bool
 	NextTick(task func() errors.Error)
diff --git a/plugins/helper/api_collector.go b/plugins/helper/api_collector.go
index 1f8919d4..057c081f 100644
--- a/plugins/helper/api_collector.go
+++ b/plugins/helper/api_collector.go
@@ -78,6 +78,8 @@ type ApiCollectorArgs struct {
 	Concurrency    int
 	ResponseParser func(res *http.Response) ([]json.RawMessage, errors.Error)
 	AfterResponse  common.ApiClientAfterResponse
+	RequestBody    interface{}
+	Method         string
 }
 
 // ApiCollector FIXME ...
@@ -345,8 +347,8 @@ func (collector *ApiCollector) fetchAsync(reqData *RequestData, handler func(int
 	}
 	logger := collector.args.Ctx.GetLogger()
 	logger.Debug("fetchAsync <<< enqueueing for %s %v", apiUrl, apiQuery)
-	collector.args.ApiClient.DoGetAsync(apiUrl, apiQuery, apiHeader, func(res *http.Response) errors.Error {
-		defer logger.Debug("fetchAsync >>> done for %s %v", apiUrl, apiQuery)
+	responseHandler := func(res *http.Response) errors.Error {
+		defer logger.Debug("fetchAsync >>> done for %s %v %v", apiUrl, apiQuery, collector.args.RequestBody)
 		logger := collector.args.Ctx.GetLogger()
 		// read body to buffer
 		body, err := io.ReadAll(res.Body)
@@ -367,13 +369,13 @@ func (collector *ApiCollector) fetchAsync(reqData *RequestData, handler func(int
 			return nil
 		}
 		db := collector.args.Ctx.GetDal()
-		url := res.Request.URL.String()
+		urlString := res.Request.URL.String()
 		rows := make([]*RawData, count)
 		for i, msg := range items {
 			rows[i] = &RawData{
 				Params: collector.params,
 				Data:   msg,
-				Url:    url,
+				Url:    urlString,
 				Input:  reqData.InputJSON,
 			}
 		}
@@ -389,7 +391,12 @@ func (collector *ApiCollector) fetchAsync(reqData *RequestData, handler func(int
 			return handler(count, body, res)
 		}
 		return nil
-	})
+	}
+	if collector.args.Method == http.MethodPost {
+		collector.args.ApiClient.DoPostAsync(apiUrl, apiQuery, collector.args.RequestBody, apiHeader, responseHandler)
+	} else {
+		collector.args.ApiClient.DoGetAsync(apiUrl, apiQuery, apiHeader, responseHandler)
+	}
 	logger.Debug("fetchAsync === enqueued for %s %v", apiUrl, apiQuery)
 }