You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@devlake.apache.org by zh...@apache.org on 2023/02/22 07:59:10 UTC

[incubator-devlake] branch main updated: feat: finalizable collector helper / github pr support timeFilter/dif… (#4478)

This is an automated email from the ASF dual-hosted git repository.

zhangliang2022 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-devlake.git


The following commit(s) were added to refs/heads/main by this push:
     new 63d0062ad feat: finalizable collector helper / github pr support timeFilter/dif… (#4478)
63d0062ad is described below

commit 63d0062ad60c1960fdfb333fa970664772fde2af
Author: Klesh Wong <zh...@merico.dev>
AuthorDate: Wed Feb 22 15:59:05 2023 +0800

    feat: finalizable collector helper / github pr support timeFilter/dif… (#4478)
    
    * feat: finalizable collector helper / github pr support timeFilter/diffSync
    
    * fix: GetCreated should be optional for APIs that support filtering by createdAt
    
    * fix: typo and alias
---
 .../pluginhelper/api/api_collector_with_state.go   | 212 ++++++++++++++++++---
 backend/plugins/github/tasks/pr_collector.go       | 121 ++++++++----
 .../github_graphql/tasks/check_run_collector.go    |   2 +-
 3 files changed, 274 insertions(+), 61 deletions(-)

diff --git a/backend/helpers/pluginhelper/api/api_collector_with_state.go b/backend/helpers/pluginhelper/api/api_collector_with_state.go
index 3d8b11eda..c506468cb 100644
--- a/backend/helpers/pluginhelper/api/api_collector_with_state.go
+++ b/backend/helpers/pluginhelper/api/api_collector_with_state.go
@@ -18,18 +18,24 @@ limitations under the License.
 package api
 
 import (
+	"encoding/json"
+	"net/http"
+	"net/url"
 	"time"
 
 	"github.com/apache/incubator-devlake/core/dal"
 	"github.com/apache/incubator-devlake/core/errors"
 	"github.com/apache/incubator-devlake/core/models"
+	"github.com/apache/incubator-devlake/core/plugin"
+	"github.com/apache/incubator-devlake/helpers/pluginhelper/common"
 )
 
 // ApiCollectorStateManager save collector state in framework table
 type ApiCollectorStateManager struct {
 	RawDataSubTaskArgs
-	*ApiCollector
-	*GraphqlCollector
+	// *ApiCollector
+	// *GraphqlCollector
+	subtasks    []plugin.SubTask
 	LatestState models.CollectorLatestState
 	// Deprecating(timeAfter): to be deleted
 	CreatedDateAfter *time.Time
@@ -94,40 +100,36 @@ func (m *ApiCollectorStateManager) IsIncremental() bool {
 }
 
 // InitCollector init the embedded collector
-func (m *ApiCollectorStateManager) InitCollector(args ApiCollectorArgs) (err errors.Error) {
+func (m *ApiCollectorStateManager) InitCollector(args ApiCollectorArgs) errors.Error {
 	args.RawDataSubTaskArgs = m.RawDataSubTaskArgs
-	m.ApiCollector, err = NewApiCollector(args)
-	return err
+	apiCollector, err := NewApiCollector(args)
+	if err != nil {
+		return err
+	}
+	m.subtasks = append(m.subtasks, apiCollector)
+	return nil
 }
 
 // InitGraphQLCollector init the embedded collector
-func (m *ApiCollectorStateManager) InitGraphQLCollector(args GraphqlCollectorArgs) (err errors.Error) {
+func (m *ApiCollectorStateManager) InitGraphQLCollector(args GraphqlCollectorArgs) errors.Error {
 	args.RawDataSubTaskArgs = m.RawDataSubTaskArgs
-	m.GraphqlCollector, err = NewGraphqlCollector(args)
-	return err
-}
-
-// Execute the embedded collector and record execute state
-func (m ApiCollectorStateManager) Execute() errors.Error {
-	err := m.ApiCollector.Execute()
+	graphqlCollector, err := NewGraphqlCollector(args)
 	if err != nil {
 		return err
 	}
-
-	return m.updateState()
+	m.subtasks = append(m.subtasks, graphqlCollector)
+	return nil
 }
 
-// ExecuteGraphQL the embedded collector and record execute state
-func (m ApiCollectorStateManager) ExecuteGraphQL() errors.Error {
-	err := m.GraphqlCollector.Execute()
-	if err != nil {
-		return err
+// Execute the embedded collector and record execute state
+func (m *ApiCollectorStateManager) Execute() errors.Error {
+	for _, subtask := range m.subtasks {
+		err := subtask.Execute()
+		if err != nil {
+			return err
+		}
 	}
 
-	return m.updateState()
-}
-
-func (m ApiCollectorStateManager) updateState() errors.Error {
 	db := m.Ctx.GetDal()
 	m.LatestState.LatestSuccessStart = &m.ExecuteStart
 	// Deprecating(timeAfter): to be deleted
@@ -135,3 +137,165 @@ func (m ApiCollectorStateManager) updateState() errors.Error {
 	m.LatestState.TimeAfter = m.TimeAfter
 	return db.CreateOrUpdate(&m.LatestState)
 }
+
+// NewStatefulApiCollectorForFinalizableEntity aims to add timeFilter/diffSync support for
+// APIs that do NOT support filtering data by updated date. However, it comes with the
+// following constraints:
+//  1. The entity is a short-lived object or it is likely to be irrelevant
+//     a. CI/CD pipelines are short-lived objects
+//     b. a pull request might take a year to be closed, or never be closed, but by then it is likely irrelevant
+//  2. The entity must be Finalizable: once it is finalized, it is never modified again
+//  3. The API must fit one of the following traits:
+//     a. it supports filtering by Created Date, in this case, you may specify the `GetTotalPages`
+//     option to fetch data with Determined Strategy if possible.
+//     b. or sorting by Created Date in Descending order, in this case, you must use `Concurrency`
+//     or `GetNextPageCustomData` instead of `GetTotalPages` for Undetermined Strategy since we have
+//     to stop the process in the middle.
+func NewStatefulApiCollectorForFinalizableEntity(args FinalizableApiCollectorArgs) (plugin.SubTask, errors.Error) {
+	// create a manager which can execute multiple collectors but acts as a single subtask to callers
+	manager, err := NewStatefulApiCollector(RawDataSubTaskArgs{
+		Ctx:    args.Ctx,
+		Params: args.Params,
+		Table:  args.Table,
+	}, args.TimeAfter)
+	if err != nil {
+		return nil, err
+	}
+
+	// prepare the basic variables
+	var isIncremental = manager.IsIncremental()
+	var createdAfter *time.Time
+	if isIncremental {
+		createdAfter = manager.LatestState.LatestSuccessStart
+	} else {
+		createdAfter = manager.TimeAfter
+	}
+
+	// step 1: create a collector to collect newly added records
+	err = manager.InitCollector(ApiCollectorArgs{
+		ApiClient: args.ApiClient,
+		// common
+		Incremental: isIncremental,
+		UrlTemplate: args.CollectNewRecordsByList.UrlTemplate,
+		Query: func(reqData *RequestData) (url.Values, errors.Error) {
+			if args.CollectNewRecordsByList.Query != nil {
+				return args.CollectNewRecordsByList.Query(reqData, createdAfter)
+			}
+			return nil, nil
+		},
+		Header: func(reqData *RequestData) (http.Header, errors.Error) {
+			if args.CollectNewRecordsByList.Header != nil {
+				return args.CollectNewRecordsByList.Header(reqData, createdAfter)
+			}
+			return nil, nil
+		},
+		MinTickInterval: args.CollectNewRecordsByList.MinTickInterval,
+		ResponseParser: func(res *http.Response) ([]json.RawMessage, errors.Error) {
+			items, err := args.CollectNewRecordsByList.ResponseParser(res)
+			if err != nil {
+				return nil, err
+			}
+			if len(items) == 0 {
+				return nil, nil
+			}
+
+			// time filter or diff sync
+			if createdAfter != nil && args.CollectNewRecordsByList.GetCreated != nil {
+				// if the first record of the page was created before createdAfter, return an empty set and stop
+				firstCreated, err := args.CollectNewRecordsByList.GetCreated(items[0])
+				if err != nil {
+					return nil, err
+				}
+				if firstCreated.Before(*createdAfter) {
+					return nil, ErrFinishCollect
+				}
+				// if the last record was created before createdAfter, return records and stop
+				lastCreated, err := args.CollectNewRecordsByList.GetCreated(items[len(items)-1])
+				if err != nil {
+					return nil, err
+				}
+				if lastCreated.Before(*createdAfter) {
+					return items, ErrFinishCollect
+				}
+			}
+			return items, err
+		},
+		AfterResponse: args.CollectNewRecordsByList.AfterResponse,
+		RequestBody:   args.CollectNewRecordsByList.RequestBody,
+		Method:        args.CollectNewRecordsByList.Method,
+		// pagination
+		PageSize:              args.CollectNewRecordsByList.PageSize,
+		Concurrency:           args.CollectNewRecordsByList.Concurrency,
+		GetNextPageCustomData: args.CollectNewRecordsByList.GetNextPageCustomData,
+		// GetTotalPages:         args.CollectNewRecordsByList.GetTotalPages,
+	})
+
+	if err != nil {
+		return nil, err
+	}
+
+	// step 2: create another collector to collect updated records
+	// TODO: this creates cursor before previous step gets executed, which is too early, to be optimized
+	input, err := args.CollectUnfinishedDetails.BuildInputIterator()
+	if err != nil {
+		return nil, err
+	}
+	err = manager.InitCollector(ApiCollectorArgs{
+		ApiClient: args.ApiClient,
+		// common
+		Incremental: true,
+		Input:       input,
+		UrlTemplate: args.CollectUnfinishedDetails.UrlTemplate,
+		Query: func(reqData *RequestData) (url.Values, errors.Error) {
+			if args.CollectUnfinishedDetails.Query != nil {
+				return args.CollectUnfinishedDetails.Query(reqData, createdAfter)
+			}
+			return nil, nil
+		},
+		Header: func(reqData *RequestData) (http.Header, errors.Error) {
+			if args.CollectUnfinishedDetails.Header != nil {
+				return args.CollectUnfinishedDetails.Header(reqData, createdAfter)
+			}
+			return nil, nil
+		},
+		MinTickInterval: args.CollectUnfinishedDetails.MinTickInterval,
+		ResponseParser:  args.CollectUnfinishedDetails.ResponseParser,
+		AfterResponse:   args.CollectUnfinishedDetails.AfterResponse,
+		RequestBody:     args.CollectUnfinishedDetails.RequestBody,
+		Method:          args.CollectUnfinishedDetails.Method,
+	})
+	return manager, err
+}
+
+type FinalizableApiCollectorArgs struct {
+	RawDataSubTaskArgs
+	ApiClient                RateLimitedApiClient
+	TimeAfter                *time.Time // leave it be nil to disable time filter
+	CollectNewRecordsByList  FinalizableApiCollectorListArgs
+	CollectUnfinishedDetails FinalizableApiCollectorDetailArgs
+}
+
+type FinalizableApiCollectorCommonArgs struct {
+	UrlTemplate     string `comment:"GoTemplate for API url"`
+	Query           func(reqData *RequestData, createdAfter *time.Time) (url.Values, errors.Error)
+	Header          func(reqData *RequestData, createdAfter *time.Time) (http.Header, errors.Error)
+	MinTickInterval *time.Duration
+	ResponseParser  func(res *http.Response) ([]json.RawMessage, errors.Error)
+	AfterResponse   common.ApiClientAfterResponse
+	RequestBody     func(reqData *RequestData) map[string]interface{}
+	Method          string
+}
+type FinalizableApiCollectorListArgs struct {
+	// optional, leave it `nil` if the API supports filtering by created date (don't forget to set the Query)
+	GetCreated func(item json.RawMessage) (time.Time, errors.Error)
+	FinalizableApiCollectorCommonArgs
+	Concurrency           int
+	PageSize              int
+	GetNextPageCustomData func(prevReqData *RequestData, prevPageResponse *http.Response) (interface{}, errors.Error)
+	// need to consider the data missing problem: what if new data gets created during collection?
+	// GetTotalPages         func(res *http.Response, args *ApiCollectorArgs) (int, errors.Error)
+}
+type FinalizableApiCollectorDetailArgs struct {
+	FinalizableApiCollectorCommonArgs
+	BuildInputIterator func() (Iterator, errors.Error)
+}
diff --git a/backend/plugins/github/tasks/pr_collector.go b/backend/plugins/github/tasks/pr_collector.go
index 50cde330c..4456a9d3b 100644
--- a/backend/plugins/github/tasks/pr_collector.go
+++ b/backend/plugins/github/tasks/pr_collector.go
@@ -20,12 +20,17 @@ package tasks
 import (
 	"encoding/json"
 	"fmt"
+	"io"
 	"net/http"
 	"net/url"
+	"reflect"
+	"time"
 
+	"github.com/apache/incubator-devlake/core/dal"
 	"github.com/apache/incubator-devlake/core/errors"
 	"github.com/apache/incubator-devlake/core/plugin"
 	helper "github.com/apache/incubator-devlake/helpers/pluginhelper/api"
+	"github.com/apache/incubator-devlake/plugins/github/models"
 )
 
 const RAW_PULL_REQUEST_TABLE = "github_api_pull_requests"
@@ -38,50 +43,94 @@ var CollectApiPullRequestsMeta = plugin.SubTaskMeta{
 	DomainTypes:      []string{plugin.DOMAIN_TYPE_CROSS, plugin.DOMAIN_TYPE_CODE_REVIEW},
 }
 
-func CollectApiPullRequests(taskCtx plugin.SubTaskContext) errors.Error {
-	data := taskCtx.GetData().(*GithubTaskData)
-	collectorWithState, err := helper.NewApiCollectorWithState(helper.RawDataSubTaskArgs{
-		Ctx: taskCtx,
-		Params: GithubApiParams{
-			ConnectionId: data.Options.ConnectionId,
-			Name:         data.Options.Name,
-		},
-		Table: RAW_PULL_REQUEST_TABLE,
-	}, data.TimeAfter)
-	if err != nil {
-		return err
-	}
-
-	err = collectorWithState.InitCollector(helper.ApiCollectorArgs{
-		ApiClient:   data.ApiClient,
-		PageSize:    100,
-		Incremental: false,
+type SimpleGithubPr struct {
+	GithubId int64
+}
 
-		UrlTemplate: "repos/{{ .Params.Name }}/pulls",
+type SimpleGithubApiPr struct {
+	CreatedAt time.Time `json:"created_at"`
+}
 
-		Query: func(reqData *helper.RequestData) (url.Values, errors.Error) {
-			query := url.Values{}
-			query.Set("state", "all")
-			query.Set("page", fmt.Sprintf("%v", reqData.Pager.Page))
-			query.Set("direction", "asc")
-			query.Set("per_page", fmt.Sprintf("%v", reqData.Pager.Size))
+func CollectApiPullRequests(taskCtx plugin.SubTaskContext) errors.Error {
+	// pull requests are Finalizable: they can't be re-opened once closed
+	data := taskCtx.GetData().(*GithubTaskData)
+	db := taskCtx.GetDal()
 
-			return query, nil
+	collector, err := helper.NewStatefulApiCollectorForFinalizableEntity(helper.FinalizableApiCollectorArgs{
+		RawDataSubTaskArgs: helper.RawDataSubTaskArgs{
+			Ctx: taskCtx,
+			Params: GithubApiParams{
+				ConnectionId: data.Options.ConnectionId,
+				Name:         data.Options.Name,
+			},
+			Table: RAW_PULL_REQUEST_TABLE,
 		},
-
-		GetTotalPages: GetTotalPagesFromResponse,
-		ResponseParser: func(res *http.Response) ([]json.RawMessage, errors.Error) {
-			var items []json.RawMessage
-			err := helper.UnmarshalResponse(res, &items)
-			if err != nil {
-				return nil, err
-			}
-			return items, nil
+		ApiClient: data.ApiClient,
+		TimeAfter: data.TimeAfter, // set to nil to disable timeFilter
+		CollectNewRecordsByList: helper.FinalizableApiCollectorListArgs{
+			PageSize:    100,
+			Concurrency: 10,
+			FinalizableApiCollectorCommonArgs: helper.FinalizableApiCollectorCommonArgs{
+				UrlTemplate: "repos/{{ .Params.Name }}/pulls",
+				Query: func(reqData *helper.RequestData, createdAfter *time.Time) (url.Values, errors.Error) {
+					query := url.Values{}
+					query.Set("state", "all")
+					query.Set("direction", "desc")
+					query.Set("page", fmt.Sprintf("%v", reqData.Pager.Page))
+					query.Set("per_page", fmt.Sprintf("%v", reqData.Pager.Size))
+					return query, nil
+				},
+				ResponseParser: func(res *http.Response) ([]json.RawMessage, errors.Error) {
+					var items []json.RawMessage
+					err := helper.UnmarshalResponse(res, &items)
+					if err != nil {
+						return nil, err
+					}
+					return items, nil
+				},
+			},
+			GetCreated: func(item json.RawMessage) (time.Time, errors.Error) {
+				pr := &SimpleGithubApiPr{}
+				err := json.Unmarshal(item, pr)
+				if err != nil {
+					return time.Time{}, errors.BadInput.Wrap(err, "failed to unmarshal github pull request")
+				}
+				return pr.CreatedAt, nil
+			},
+		},
+		CollectUnfinishedDetails: helper.FinalizableApiCollectorDetailArgs{
+			BuildInputIterator: func() (helper.Iterator, errors.Error) {
+				// select pull id from database
+				cursor, err := db.Cursor(
+					dal.Select("github_id"),
+					dal.From(&models.GithubPullRequest{}),
+					dal.Where(
+						"repo_id = ? AND connection_id = ? AND state != 'closed'",
+						data.Options.GithubId, data.Options.ConnectionId,
+					),
+				)
+				if err != nil {
+					return nil, err
+				}
+				return helper.NewDalCursorIterator(db, cursor, reflect.TypeOf(SimpleGithubPr{}))
+			},
+			FinalizableApiCollectorCommonArgs: helper.FinalizableApiCollectorCommonArgs{
+				UrlTemplate: "repos/{{ .Params.Name }}/pulls/{{ .Input.GithubId }}",
+				ResponseParser: func(res *http.Response) ([]json.RawMessage, errors.Error) {
+					body, err := io.ReadAll(res.Body)
+					if err != nil {
+						return nil, errors.Convert(err)
+					}
+					res.Body.Close()
+					return []json.RawMessage{body}, nil
+				},
+			},
 		},
 	})
+
 	if err != nil {
 		return err
 	}
 
-	return collectorWithState.Execute()
+	return collector.Execute()
 }
diff --git a/backend/plugins/github_graphql/tasks/check_run_collector.go b/backend/plugins/github_graphql/tasks/check_run_collector.go
index e21db4d63..65a4b4c12 100644
--- a/backend/plugins/github_graphql/tasks/check_run_collector.go
+++ b/backend/plugins/github_graphql/tasks/check_run_collector.go
@@ -208,5 +208,5 @@ func CollectCheckRun(taskCtx plugin.SubTaskContext) errors.Error {
 		return err
 	}
 
-	return collectorWithState.ExecuteGraphQL()
+	return collectorWithState.Execute()
 }