You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@devlake.apache.org by zh...@apache.org on 2023/02/22 07:59:10 UTC
[incubator-devlake] branch main updated: feat: finalizable collector helper / github pr support timeFilter/dif… (#4478)
This is an automated email from the ASF dual-hosted git repository.
zhangliang2022 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-devlake.git
The following commit(s) were added to refs/heads/main by this push:
new 63d0062ad feat: finalizable collector helper / github pr support timeFilter/dif… (#4478)
63d0062ad is described below
commit 63d0062ad60c1960fdfb333fa970664772fde2af
Author: Klesh Wong <zh...@merico.dev>
AuthorDate: Wed Feb 22 15:59:05 2023 +0800
feat: finalizable collector helper / github pr support timeFilter/dif… (#4478)
* feat: finalizable collector helper / github pr support timeFilter/diffSync
* fix: GetCreated should be optional for APIs that support filtering by createdAt
* fix: typo and alias
---
.../pluginhelper/api/api_collector_with_state.go | 212 ++++++++++++++++++---
backend/plugins/github/tasks/pr_collector.go | 121 ++++++++----
.../github_graphql/tasks/check_run_collector.go | 2 +-
3 files changed, 274 insertions(+), 61 deletions(-)
diff --git a/backend/helpers/pluginhelper/api/api_collector_with_state.go b/backend/helpers/pluginhelper/api/api_collector_with_state.go
index 3d8b11eda..c506468cb 100644
--- a/backend/helpers/pluginhelper/api/api_collector_with_state.go
+++ b/backend/helpers/pluginhelper/api/api_collector_with_state.go
@@ -18,18 +18,24 @@ limitations under the License.
package api
import (
+ "encoding/json"
+ "net/http"
+ "net/url"
"time"
"github.com/apache/incubator-devlake/core/dal"
"github.com/apache/incubator-devlake/core/errors"
"github.com/apache/incubator-devlake/core/models"
+ "github.com/apache/incubator-devlake/core/plugin"
+ "github.com/apache/incubator-devlake/helpers/pluginhelper/common"
)
// ApiCollectorStateManager save collector state in framework table
type ApiCollectorStateManager struct {
RawDataSubTaskArgs
- *ApiCollector
- *GraphqlCollector
+ // *ApiCollector
+ // *GraphqlCollector
+ subtasks []plugin.SubTask
LatestState models.CollectorLatestState
// Deprecating(timeAfter): to be deleted
CreatedDateAfter *time.Time
@@ -94,40 +100,36 @@ func (m *ApiCollectorStateManager) IsIncremental() bool {
}
// InitCollector init the embedded collector
-func (m *ApiCollectorStateManager) InitCollector(args ApiCollectorArgs) (err errors.Error) {
+func (m *ApiCollectorStateManager) InitCollector(args ApiCollectorArgs) errors.Error {
args.RawDataSubTaskArgs = m.RawDataSubTaskArgs
- m.ApiCollector, err = NewApiCollector(args)
- return err
+ apiCollector, err := NewApiCollector(args)
+ if err != nil {
+ return err
+ }
+ m.subtasks = append(m.subtasks, apiCollector)
+ return nil
}
// InitGraphQLCollector init the embedded collector
-func (m *ApiCollectorStateManager) InitGraphQLCollector(args GraphqlCollectorArgs) (err errors.Error) {
+func (m *ApiCollectorStateManager) InitGraphQLCollector(args GraphqlCollectorArgs) errors.Error {
args.RawDataSubTaskArgs = m.RawDataSubTaskArgs
- m.GraphqlCollector, err = NewGraphqlCollector(args)
- return err
-}
-
-// Execute the embedded collector and record execute state
-func (m ApiCollectorStateManager) Execute() errors.Error {
- err := m.ApiCollector.Execute()
+ graphqlCollector, err := NewGraphqlCollector(args)
if err != nil {
return err
}
-
- return m.updateState()
+ m.subtasks = append(m.subtasks, graphqlCollector)
+ return nil
}
-// ExecuteGraphQL the embedded collector and record execute state
-func (m ApiCollectorStateManager) ExecuteGraphQL() errors.Error {
- err := m.GraphqlCollector.Execute()
- if err != nil {
- return err
+// Execute the embedded collector and record execute state
+func (m *ApiCollectorStateManager) Execute() errors.Error {
+ for _, subtask := range m.subtasks {
+ err := subtask.Execute()
+ if err != nil {
+ return err
+ }
}
- return m.updateState()
-}
-
-func (m ApiCollectorStateManager) updateState() errors.Error {
db := m.Ctx.GetDal()
m.LatestState.LatestSuccessStart = &m.ExecuteStart
// Deprecating(timeAfter): to be deleted
@@ -135,3 +137,165 @@ func (m ApiCollectorStateManager) updateState() errors.Error {
m.LatestState.TimeAfter = m.TimeAfter
return db.CreateOrUpdate(&m.LatestState)
}
+
+// NewStatefulApiCollectorForFinalizableEntity aims to add timeFilter/diffSync support for
+// APIs that do NOT support filtering data by updated date. However, it comes with the
+// following constraints:
+// 1. The entity is a short-lived object or is likely to become irrelevant over time
+// a. ci/cd pipelines are short-lived objects
+// b. a pull request might take a year to be closed (or never be closed), but by then it is likely irrelevant
+// 2. The entity must be Finalizable: once it is finalized, it is never modified again
+// 3. The API must fit one of the following traits:
+// a. it supports filtering by Created Date; in this case, leave `GetCreated` nil and apply the filter
+// in `Query` (note: `GetTotalPages` / Determined Strategy is not supported yet).
+// b. or it supports sorting by Created Date in descending order; in this case, you must set `GetCreated`
+// and use `Concurrency` or `GetNextPageCustomData` (Undetermined Strategy) instead of `GetTotalPages`,
+// since the collection has to stop in the middle.
+func NewStatefulApiCollectorForFinalizableEntity(args FinalizableApiCollectorArgs) (plugin.SubTask, errors.Error) {
+ // create a manager which can execute multiple collectors but acts as a single subtask to callers
+ manager, err := NewStatefulApiCollector(RawDataSubTaskArgs{
+ Ctx: args.Ctx,
+ Params: args.Params,
+ Table: args.Table,
+ }, args.TimeAfter)
+ if err != nil {
+ return nil, err
+ }
+
+ // prepare the basic variables
+ var isIncremental = manager.IsIncremental()
+ var createdAfter *time.Time
+ if isIncremental {
+ createdAfter = manager.LatestState.LatestSuccessStart
+ } else {
+ createdAfter = manager.TimeAfter
+ }
+
+ // step 1: create a collector to collect newly added records
+ err = manager.InitCollector(ApiCollectorArgs{
+ ApiClient: args.ApiClient,
+ // common
+ Incremental: isIncremental,
+ UrlTemplate: args.CollectNewRecordsByList.UrlTemplate,
+ Query: func(reqData *RequestData) (url.Values, errors.Error) {
+ if args.CollectNewRecordsByList.Query != nil {
+ return args.CollectNewRecordsByList.Query(reqData, createdAfter)
+ }
+ return nil, nil
+ },
+ Header: func(reqData *RequestData) (http.Header, errors.Error) {
+ if args.CollectNewRecordsByList.Header != nil {
+ return args.CollectNewRecordsByList.Header(reqData, createdAfter)
+ }
+ return nil, nil
+ },
+ MinTickInterval: args.CollectNewRecordsByList.MinTickInterval,
+ ResponseParser: func(res *http.Response) ([]json.RawMessage, errors.Error) {
+ items, err := args.CollectNewRecordsByList.ResponseParser(res)
+ if err != nil {
+ return nil, err
+ }
+ if len(items) == 0 {
+ return nil, nil
+ }
+
+ // time filter or diff sync
+ if createdAfter != nil && args.CollectNewRecordsByList.GetCreated != nil {
+ // if the first record of the page was created before createdAfter, return an empty set and stop
+ firstCreated, err := args.CollectNewRecordsByList.GetCreated(items[0])
+ if err != nil {
+ return nil, err
+ }
+ if firstCreated.Before(*createdAfter) {
+ return nil, ErrFinishCollect
+ }
+ // if the last record was created before createdAfter, return records and stop
+ lastCreated, err := args.CollectNewRecordsByList.GetCreated(items[len(items)-1])
+ if err != nil {
+ return nil, err
+ }
+ if lastCreated.Before(*createdAfter) {
+ return items, ErrFinishCollect
+ }
+ }
+ return items, err
+ },
+ AfterResponse: args.CollectNewRecordsByList.AfterResponse,
+ RequestBody: args.CollectNewRecordsByList.RequestBody,
+ Method: args.CollectNewRecordsByList.Method,
+ // pagination
+ PageSize: args.CollectNewRecordsByList.PageSize,
+ Concurrency: args.CollectNewRecordsByList.Concurrency,
+ GetNextPageCustomData: args.CollectNewRecordsByList.GetNextPageCustomData,
+ // GetTotalPages: args.CollectNewRecordsByList.GetTotalPages,
+ })
+
+ if err != nil {
+ return nil, err
+ }
+
+ // step 2: create another collector to collect updated records
+ // TODO: this creates the cursor before the previous step gets executed, which is too early; to be optimized
+ input, err := args.CollectUnfinishedDetails.BuildInputIterator()
+ if err != nil {
+ return nil, err
+ }
+ err = manager.InitCollector(ApiCollectorArgs{
+ ApiClient: args.ApiClient,
+ // common
+ Incremental: true,
+ Input: input,
+ UrlTemplate: args.CollectUnfinishedDetails.UrlTemplate,
+ Query: func(reqData *RequestData) (url.Values, errors.Error) {
+ if args.CollectUnfinishedDetails.Query != nil {
+ return args.CollectUnfinishedDetails.Query(reqData, createdAfter)
+ }
+ return nil, nil
+ },
+ Header: func(reqData *RequestData) (http.Header, errors.Error) {
+ if args.CollectUnfinishedDetails.Header != nil {
+ return args.CollectUnfinishedDetails.Header(reqData, createdAfter)
+ }
+ return nil, nil
+ },
+ MinTickInterval: args.CollectUnfinishedDetails.MinTickInterval,
+ ResponseParser: args.CollectUnfinishedDetails.ResponseParser,
+ AfterResponse: args.CollectUnfinishedDetails.AfterResponse,
+ RequestBody: args.CollectUnfinishedDetails.RequestBody,
+ Method: args.CollectUnfinishedDetails.Method,
+ })
+ return manager, err
+}
+
+type FinalizableApiCollectorArgs struct {
+ RawDataSubTaskArgs
+ ApiClient RateLimitedApiClient
+ TimeAfter *time.Time // leave it be nil to disable time filter
+ CollectNewRecordsByList FinalizableApiCollectorListArgs
+ CollectUnfinishedDetails FinalizableApiCollectorDetailArgs
+}
+
+type FinalizableApiCollectorCommonArgs struct {
+ UrlTemplate string `comment:"GoTemplate for API url"`
+ Query func(reqData *RequestData, createdAfter *time.Time) (url.Values, errors.Error)
+ Header func(reqData *RequestData, createdAfter *time.Time) (http.Header, errors.Error)
+ MinTickInterval *time.Duration
+ ResponseParser func(res *http.Response) ([]json.RawMessage, errors.Error)
+ AfterResponse common.ApiClientAfterResponse
+ RequestBody func(reqData *RequestData) map[string]interface{}
+ Method string
+}
+type FinalizableApiCollectorListArgs struct {
+ // optional: leave it `nil` if the API supports filtering by created date (don't forget to set `Query`)
+ GetCreated func(item json.RawMessage) (time.Time, errors.Error)
+ FinalizableApiCollectorCommonArgs
+ Concurrency int
+ PageSize int
+ GetNextPageCustomData func(prevReqData *RequestData, prevPageResponse *http.Response) (interface{}, errors.Error)
+ // disabled for now: the missing-data problem must be handled first (what if new data gets created during collection?)
+ // GetTotalPages func(res *http.Response, args *ApiCollectorArgs) (int, errors.Error)
+}
+type FinalizableApiCollectorDetailArgs struct {
+ FinalizableApiCollectorCommonArgs
+ BuildInputIterator func() (Iterator, errors.Error)
+}
diff --git a/backend/plugins/github/tasks/pr_collector.go b/backend/plugins/github/tasks/pr_collector.go
index 50cde330c..4456a9d3b 100644
--- a/backend/plugins/github/tasks/pr_collector.go
+++ b/backend/plugins/github/tasks/pr_collector.go
@@ -20,12 +20,17 @@ package tasks
import (
"encoding/json"
"fmt"
+ "io"
"net/http"
"net/url"
+ "reflect"
+ "time"
+ "github.com/apache/incubator-devlake/core/dal"
"github.com/apache/incubator-devlake/core/errors"
"github.com/apache/incubator-devlake/core/plugin"
helper "github.com/apache/incubator-devlake/helpers/pluginhelper/api"
+ "github.com/apache/incubator-devlake/plugins/github/models"
)
const RAW_PULL_REQUEST_TABLE = "github_api_pull_requests"
@@ -38,50 +43,94 @@ var CollectApiPullRequestsMeta = plugin.SubTaskMeta{
DomainTypes: []string{plugin.DOMAIN_TYPE_CROSS, plugin.DOMAIN_TYPE_CODE_REVIEW},
}
-func CollectApiPullRequests(taskCtx plugin.SubTaskContext) errors.Error {
- data := taskCtx.GetData().(*GithubTaskData)
- collectorWithState, err := helper.NewApiCollectorWithState(helper.RawDataSubTaskArgs{
- Ctx: taskCtx,
- Params: GithubApiParams{
- ConnectionId: data.Options.ConnectionId,
- Name: data.Options.Name,
- },
- Table: RAW_PULL_REQUEST_TABLE,
- }, data.TimeAfter)
- if err != nil {
- return err
- }
-
- err = collectorWithState.InitCollector(helper.ApiCollectorArgs{
- ApiClient: data.ApiClient,
- PageSize: 100,
- Incremental: false,
+type SimpleGithubPr struct {
+ GithubId int64
+}
- UrlTemplate: "repos/{{ .Params.Name }}/pulls",
+type SimpleGithubApiPr struct {
+ CreatedAt time.Time `json:"created_at"`
+}
- Query: func(reqData *helper.RequestData) (url.Values, errors.Error) {
- query := url.Values{}
- query.Set("state", "all")
- query.Set("page", fmt.Sprintf("%v", reqData.Pager.Page))
- query.Set("direction", "asc")
- query.Set("per_page", fmt.Sprintf("%v", reqData.Pager.Size))
+func CollectApiPullRequests(taskCtx plugin.SubTaskContext) errors.Error {
+ // pull requests are treated as Finalizable: assumed not to change once closed (NOTE(review): GitHub allows reopening unmerged PRs — confirm)
+ data := taskCtx.GetData().(*GithubTaskData)
+ db := taskCtx.GetDal()
- return query, nil
+ collector, err := helper.NewStatefulApiCollectorForFinalizableEntity(helper.FinalizableApiCollectorArgs{
+ RawDataSubTaskArgs: helper.RawDataSubTaskArgs{
+ Ctx: taskCtx,
+ Params: GithubApiParams{
+ ConnectionId: data.Options.ConnectionId,
+ Name: data.Options.Name,
+ },
+ Table: RAW_PULL_REQUEST_TABLE,
},
-
- GetTotalPages: GetTotalPagesFromResponse,
- ResponseParser: func(res *http.Response) ([]json.RawMessage, errors.Error) {
- var items []json.RawMessage
- err := helper.UnmarshalResponse(res, &items)
- if err != nil {
- return nil, err
- }
- return items, nil
+ ApiClient: data.ApiClient,
+ TimeAfter: data.TimeAfter, // set to nil to disable timeFilter
+ CollectNewRecordsByList: helper.FinalizableApiCollectorListArgs{
+ PageSize: 100,
+ Concurrency: 10,
+ FinalizableApiCollectorCommonArgs: helper.FinalizableApiCollectorCommonArgs{
+ UrlTemplate: "repos/{{ .Params.Name }}/pulls",
+ Query: func(reqData *helper.RequestData, createdAfter *time.Time) (url.Values, errors.Error) {
+ query := url.Values{}
+ query.Set("state", "all")
+ query.Set("direction", "desc")
+ query.Set("page", fmt.Sprintf("%v", reqData.Pager.Page))
+ query.Set("per_page", fmt.Sprintf("%v", reqData.Pager.Size))
+ return query, nil
+ },
+ ResponseParser: func(res *http.Response) ([]json.RawMessage, errors.Error) {
+ var items []json.RawMessage
+ err := helper.UnmarshalResponse(res, &items)
+ if err != nil {
+ return nil, err
+ }
+ return items, nil
+ },
+ },
+ GetCreated: func(item json.RawMessage) (time.Time, errors.Error) {
+ pr := &SimpleGithubApiPr{}
+ err := json.Unmarshal(item, pr)
+ if err != nil {
+ return time.Time{}, errors.BadInput.Wrap(err, "failed to unmarshal github pull request")
+ }
+ return pr.CreatedAt, nil
+ },
+ },
+ CollectUnfinishedDetails: helper.FinalizableApiCollectorDetailArgs{
+ BuildInputIterator: func() (helper.Iterator, errors.Error) {
+ // select the github_id of every non-closed pull request of this repo from the database
+ cursor, err := db.Cursor(
+ dal.Select("github_id"),
+ dal.From(&models.GithubPullRequest{}),
+ dal.Where(
+ "repo_id = ? AND connection_id = ? AND state != 'closed'",
+ data.Options.GithubId, data.Options.ConnectionId,
+ ),
+ )
+ if err != nil {
+ return nil, err
+ }
+ return helper.NewDalCursorIterator(db, cursor, reflect.TypeOf(SimpleGithubPr{}))
+ },
+ FinalizableApiCollectorCommonArgs: helper.FinalizableApiCollectorCommonArgs{
+ UrlTemplate: "repos/{{ .Params.Name }}/pulls/{{ .Input.GithubId }}",
+ ResponseParser: func(res *http.Response) ([]json.RawMessage, errors.Error) {
+ body, err := io.ReadAll(res.Body)
+ if err != nil {
+ return nil, errors.Convert(err)
+ }
+ res.Body.Close()
+ return []json.RawMessage{body}, nil
+ },
+ },
},
})
+
if err != nil {
return err
}
- return collectorWithState.Execute()
+ return collector.Execute()
}
diff --git a/backend/plugins/github_graphql/tasks/check_run_collector.go b/backend/plugins/github_graphql/tasks/check_run_collector.go
index e21db4d63..65a4b4c12 100644
--- a/backend/plugins/github_graphql/tasks/check_run_collector.go
+++ b/backend/plugins/github_graphql/tasks/check_run_collector.go
@@ -208,5 +208,5 @@ func CollectCheckRun(taskCtx plugin.SubTaskContext) errors.Error {
return err
}
- return collectorWithState.ExecuteGraphQL()
+ return collectorWithState.Execute()
}