You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@devlake.apache.org by ab...@apache.org on 2022/09/28 14:51:19 UTC
[incubator-devlake] 01/02: feat(framework): add post api support
This is an automated email from the ASF dual-hosted git repository.
abeizn pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-devlake.git
commit ece5a603aa028dc271864925c7396a114d9a71c2
Author: Yingchu Chen <yi...@merico.dev>
AuthorDate: Wed Sep 28 15:11:42 2022 +0800
feat(framework): add post api support
---
plugins/helper/api_async_client.go | 12 ++++++++++++
plugins/helper/api_collector.go | 17 ++++++++++++-----
2 files changed, 24 insertions(+), 5 deletions(-)
diff --git a/plugins/helper/api_async_client.go b/plugins/helper/api_async_client.go
index 979029e6..fe87876c 100644
--- a/plugins/helper/api_async_client.go
+++ b/plugins/helper/api_async_client.go
@@ -219,6 +219,17 @@ func (apiClient *ApiAsyncClient) DoGetAsync(
apiClient.DoAsync(http.MethodGet, path, query, nil, header, handler, 0)
}
+// DoPostAsync Enqueue an api post request, the request may be sent sometime in future in parallel with other api requests
+func (apiClient *ApiAsyncClient) DoPostAsync(
+ path string,
+ query url.Values,
+ body interface{},
+ header http.Header,
+ handler common.ApiAsyncCallback,
+) {
+ apiClient.DoAsync(http.MethodPost, path, query, body, header, handler, 0)
+}
+
// WaitAsync blocks until all async requests were done
func (apiClient *ApiAsyncClient) WaitAsync() errors.Error {
return apiClient.scheduler.Wait()
@@ -247,6 +258,7 @@ func (apiClient *ApiAsyncClient) Release() {
// RateLimitedApiClient FIXME ...
type RateLimitedApiClient interface {
DoGetAsync(path string, query url.Values, header http.Header, handler common.ApiAsyncCallback)
+ DoPostAsync(path string, query url.Values, body interface{}, header http.Header, handler common.ApiAsyncCallback)
WaitAsync() errors.Error
HasError() bool
NextTick(task func() errors.Error)
diff --git a/plugins/helper/api_collector.go b/plugins/helper/api_collector.go
index 1f8919d4..057c081f 100644
--- a/plugins/helper/api_collector.go
+++ b/plugins/helper/api_collector.go
@@ -78,6 +78,8 @@ type ApiCollectorArgs struct {
Concurrency int
ResponseParser func(res *http.Response) ([]json.RawMessage, errors.Error)
AfterResponse common.ApiClientAfterResponse
+ RequestBody interface{}
+ Method string
}
// ApiCollector FIXME ...
@@ -345,8 +347,8 @@ func (collector *ApiCollector) fetchAsync(reqData *RequestData, handler func(int
}
logger := collector.args.Ctx.GetLogger()
logger.Debug("fetchAsync <<< enqueueing for %s %v", apiUrl, apiQuery)
- collector.args.ApiClient.DoGetAsync(apiUrl, apiQuery, apiHeader, func(res *http.Response) errors.Error {
- defer logger.Debug("fetchAsync >>> done for %s %v", apiUrl, apiQuery)
+ responseHandler := func(res *http.Response) errors.Error {
+ defer logger.Debug("fetchAsync >>> done for %s %v %v", apiUrl, apiQuery, collector.args.RequestBody)
logger := collector.args.Ctx.GetLogger()
// read body to buffer
body, err := io.ReadAll(res.Body)
@@ -367,13 +369,13 @@ func (collector *ApiCollector) fetchAsync(reqData *RequestData, handler func(int
return nil
}
db := collector.args.Ctx.GetDal()
- url := res.Request.URL.String()
+ urlString := res.Request.URL.String()
rows := make([]*RawData, count)
for i, msg := range items {
rows[i] = &RawData{
Params: collector.params,
Data: msg,
- Url: url,
+ Url: urlString,
Input: reqData.InputJSON,
}
}
@@ -389,7 +391,12 @@ func (collector *ApiCollector) fetchAsync(reqData *RequestData, handler func(int
return handler(count, body, res)
}
return nil
- })
+ }
+ if collector.args.Method == http.MethodPost {
+ collector.args.ApiClient.DoPostAsync(apiUrl, apiQuery, collector.args.RequestBody, apiHeader, responseHandler)
+ } else {
+ collector.args.ApiClient.DoGetAsync(apiUrl, apiQuery, apiHeader, responseHandler)
+ }
logger.Debug("fetchAsync === enqueued for %s %v", apiUrl, apiQuery)
}