You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@devlake.apache.org by kl...@apache.org on 2023/02/03 11:54:40 UTC

[incubator-devlake] branch main updated: feat: add GetNextPageCustomData to support request pages one by one (#4320)

This is an automated email from the ASF dual-hosted git repository.

klesh pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-devlake.git


The following commit(s) were added to refs/heads/main by this push:
     new 7f46bc17f feat: add GetNextPageCustomData to support request pages one by one (#4320)
7f46bc17f is described below

commit 7f46bc17f187b489b19764d2ecefd375542cf5db
Author: Likyh <ya...@meri.co>
AuthorDate: Fri Feb 3 19:54:35 2023 +0800

    feat: add GetNextPageCustomData to support request pages one by one (#4320)
    
    * fix: delete debug code
    
    * fix: replace PageInOrder with GetNextPageCustomData
    
    * fix: fix for review
---
 backend/helpers/pluginhelper/api/api_collector.go | 42 ++++++++++++++++++++---
 1 file changed, 37 insertions(+), 5 deletions(-)

diff --git a/backend/helpers/pluginhelper/api/api_collector.go b/backend/helpers/pluginhelper/api/api_collector.go
index 0a0bd020e..98f25ce59 100644
--- a/backend/helpers/pluginhelper/api/api_collector.go
+++ b/backend/helpers/pluginhelper/api/api_collector.go
@@ -44,6 +44,8 @@ type RequestData struct {
 	Params    interface{}
 	Input     interface{}
 	InputJSON []byte
+	// CustomData holds the value returned by GetNextPageCustomData for the previous page; only set when PageSize > 0 and this is not the first request
+	CustomData interface{}
 }
 
 // AsyncResponseHandler FIXME ...
@@ -57,21 +59,23 @@ type ApiCollectorArgs struct {
 	// For detail of what variables can be used, please check `RequestData`
 	UrlTemplate string `comment:"GoTemplate for API url"`
 	// Query would be sent out as part of the request URL
-	Query func(reqData *RequestData) (url.Values, errors.Error) ``
+	Query func(reqData *RequestData) (url.Values, errors.Error)
 	// Header would be sent out along with request
 	Header func(reqData *RequestData) (http.Header, errors.Error)
+	// GetTotalPages is to tell `ApiCollector` total number of pages based on response of the first page.
+	// so `ApiCollector` could collect those pages in parallel for us
+	GetTotalPages func(res *http.Response, args *ApiCollectorArgs) (int, errors.Error)
 	// PageSize tells ApiCollector the page size
 	PageSize int
-	// Incremental indicate if this is a incremental collection, the existing data won't get deleted if it was true
+	// GetNextPageCustomData, when set, makes the collector request pages one by one; its return value becomes RequestData.CustomData for the next request
+	GetNextPageCustomData func(prevReqData *RequestData, prevPageResponse *http.Response) (interface{}, errors.Error)
+	// Incremental indicate if this is an incremental collection, the existing data won't get deleted if it was true
 	Incremental bool `comment:"indicate if this collection is incremental update"`
 	// ApiClient is a asynchronize api request client with qps
 	ApiClient RateLimitedApiClient
 	// Input helps us collect data based on previous collected data, like collecting changelogs based on jira
 	// issue ids
 	Input Iterator
-	// GetTotalPages is to tell `ApiCollector` total number of pages based on response of the first page.
-	// so `ApiCollector` could collect those pages in parallel for us
-	GetTotalPages func(res *http.Response, args *ApiCollectorArgs) (int, errors.Error)
 	// Concurrency specify qps for api that doesn't return total number of pages/records
 	// NORMALLY, DO NOT SPECIFY THIS PARAMETER, unless you know what it means
 	Concurrency    int
@@ -208,6 +212,8 @@ func (collector *ApiCollector) exec(input interface{}) {
 	}
 	if collector.args.PageSize <= 0 {
 		collector.fetchAsync(reqData, nil)
+	} else if collector.args.GetNextPageCustomData != nil {
+		collector.fetchPagesSequentially(reqData)
 	} else if collector.args.GetTotalPages != nil {
 		collector.fetchPagesDetermined(reqData)
 	} else {
@@ -215,6 +221,32 @@ func (collector *ApiCollector) exec(input interface{}) {
 	}
 }
 
+// fetchPagesSequentially fetches pages one at a time, building each next request from the previous page's response
+func (collector *ApiCollector) fetchPagesSequentially(reqData *RequestData) {
+	var collect func() errors.Error // declared before assignment so the closure can call itself
+	collect = func() errors.Error {
+		collector.fetchAsync(reqData, func(count int, body []byte, res *http.Response) errors.Error {
+			if count < collector.args.PageSize { // short page: stop without requesting another one
+				return nil
+			}
+			customData, err := collector.args.GetNextPageCustomData(reqData, res)
+			if err != nil {
+				if errors.Is(err, ErrFinishCollect) { // callback signalled the end of collection; not a failure
+					return nil
+				} else {
+					panic(err) // NOTE(review): any other error panics instead of being returned to fetchAsync — confirm this is intended
+				}
+			}
+			reqData.CustomData = customData // made available to the next request through the shared reqData
+			reqData.Pager.Skip += collector.args.PageSize
+			reqData.Pager.Page += 1
+			return collect() // request the next page; collect itself always returns nil
+		})
+		return nil
+	}
+	collector.args.ApiClient.NextTick(collect) // presumably defers the first fetch to the client's scheduler — verify against NextTick
+}
+
 // fetchPagesDetermined fetches data of all pages for APIs that return paging information
 func (collector *ApiCollector) fetchPagesDetermined(reqData *RequestData) {
 	// fetch first page