You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@devlake.apache.org by li...@apache.org on 2023/02/20 07:47:29 UTC
[incubator-devlake] branch main updated: feat: github runs collector supports timeFilter/diffSync (#4442)
This is an automated email from the ASF dual-hosted git repository.
likyh pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-devlake.git
The following commit(s) were added to refs/heads/main by this push:
new a6ed2ee25 feat: github runs collector supports timeFilter/diffSync (#4442)
a6ed2ee25 is described below
commit a6ed2ee25b1e579f2a004eafa1276053ff2118d8
Author: Klesh Wong <zh...@merico.dev>
AuthorDate: Mon Feb 20 15:47:25 2023 +0800
feat: github runs collector supports timeFilter/diffSync (#4442)
---
backend/plugins/github/tasks/cicd_run_collector.go | 131 ++++++++++++++++++---
1 file changed, 112 insertions(+), 19 deletions(-)
diff --git a/backend/plugins/github/tasks/cicd_run_collector.go b/backend/plugins/github/tasks/cicd_run_collector.go
index 5990f5231..19d6b9d02 100644
--- a/backend/plugins/github/tasks/cicd_run_collector.go
+++ b/backend/plugins/github/tasks/cicd_run_collector.go
@@ -20,16 +20,26 @@ package tasks
import (
"encoding/json"
"fmt"
+ "io"
"net/http"
"net/url"
+ "reflect"
+ "time"
+ "github.com/apache/incubator-devlake/core/dal"
"github.com/apache/incubator-devlake/core/errors"
"github.com/apache/incubator-devlake/core/plugin"
helper "github.com/apache/incubator-devlake/helpers/pluginhelper/api"
+ "github.com/apache/incubator-devlake/plugins/github/models"
)
const RAW_RUN_TABLE = "github_api_runs"
+// Although the API accepts a maximum of 100 entries per page, sometimes
+// the response body is too large which would lead to request failures
+// https://github.com/apache/incubator-devlake/issues/3199
+const PAGE_SIZE = 30
+
var CollectRunsMeta = plugin.SubTaskMeta{
Name: "collectRuns",
EntryPoint: CollectRuns,
@@ -40,7 +50,7 @@ var CollectRunsMeta = plugin.SubTaskMeta{
func CollectRuns(taskCtx plugin.SubTaskContext) errors.Error {
data := taskCtx.GetData().(*GithubTaskData)
- collectorWithState, err := helper.NewApiCollectorWithState(helper.RawDataSubTaskArgs{
+ collectorWithState, err := helper.NewStatefulApiCollector(helper.RawDataSubTaskArgs{
Ctx: taskCtx,
Params: GithubApiParams{
ConnectionId: data.Options.ConnectionId,
@@ -52,37 +62,58 @@ func CollectRuns(taskCtx plugin.SubTaskContext) errors.Error {
return err
}
- //incremental := collectorWithState.IsIncremental()
+ incremental := collectorWithState.IsIncremental()
+
+ // step 1: fetch records created after createdAfter
+ var createdAfter *time.Time
+ if incremental {
+ createdAfter = collectorWithState.LatestState.LatestSuccessStart
+ } else {
+ createdAfter = data.TimeAfter
+ }
+
err = collectorWithState.InitCollector(helper.ApiCollectorArgs{
- ApiClient: data.ApiClient,
- PageSize: 30,
- //Incremental: incremental,
+ ApiClient: data.ApiClient,
+ PageSize: PAGE_SIZE,
+ Incremental: incremental,
UrlTemplate: "repos/{{ .Params.Name }}/actions/runs",
Query: func(reqData *helper.RequestData) (url.Values, errors.Error) {
query := url.Values{}
- // if data.CreatedDateAfter != nil, we set since once
- // There is a bug for github rest api, so temporarily commented the following code
- //if data.CreatedDateAfter != nil {
- // startDate := data.CreatedDateAfter.Format("2006-01-02")
- // query.Set("created", fmt.Sprintf("%s..*", startDate))
- //}
- //// if incremental == true, we overwrite it
- //if incremental {
- // startDate := collectorWithState.LatestState.LatestSuccessStart.Format("2006-01-02")
- // query.Set("created", fmt.Sprintf("%s..*", startDate))
- //}
query.Set("page", fmt.Sprintf("%v", reqData.Pager.Page))
query.Set("per_page", fmt.Sprintf("%v", reqData.Pager.Size))
return query, nil
},
- GetTotalPages: GetTotalPagesFromResponse,
+ // use Undetermined strategy so we can stop fetching further pages by using
+ // ErrFinishCollect
+ Concurrency: 10,
ResponseParser: func(res *http.Response) ([]json.RawMessage, errors.Error) {
body := &GithubRawRunsResult{}
err := helper.UnmarshalResponse(res, body)
if err != nil {
return nil, err
}
- return body.GithubWorkflowRuns, nil
+
+ // time filter or diff sync
+ if createdAfter != nil {
+ // if the first record of the page was created before minCreated, return empty set and stop
+ firstRun := &models.GithubRun{}
+ if e := json.Unmarshal(body.GithubWorkflowRuns[0], firstRun); e != nil {
+ return nil, errors.Default.Wrap(e, "failed to unmarshal first run")
+ }
+ if firstRun.GithubCreatedAt.Before(*createdAfter) {
+ return nil, helper.ErrFinishCollect
+ }
+ // if the last record was created before minCreated, return records and stop
+ lastRun := &models.GithubRun{}
+ if e := json.Unmarshal(body.GithubWorkflowRuns[len(body.GithubWorkflowRuns)-1], lastRun); e != nil {
+ return nil, errors.Default.Wrap(e, "failed to unmarshal last run")
+ }
+ if lastRun.GithubCreatedAt.Before(*createdAfter) {
+ err = helper.ErrFinishCollect
+ }
+ }
+
+ return body.GithubWorkflowRuns, err
},
})
@@ -90,10 +121,72 @@ func CollectRuns(taskCtx plugin.SubTaskContext) errors.Error {
return err
}
- return collectorWithState.Execute()
+ err = collectorWithState.Execute()
+ if err != nil {
+ return err
+ }
+
+ // step 2: for incremental collection, we have to update previously collected data whose status is unfinished
+ if incremental {
+ // update existing data by collecting unfinished runs prior to LatestState.LatestSuccessStart
+ return collectUnfinishedRuns(taskCtx)
+ }
+ return nil
}
type GithubRawRunsResult struct {
TotalCount int64 `json:"total_count"`
GithubWorkflowRuns []json.RawMessage `json:"workflow_runs"`
}
+
+func collectUnfinishedRuns(taskCtx plugin.SubTaskContext) errors.Error {
+ data := taskCtx.GetData().(*GithubTaskData)
+ db := taskCtx.GetDal()
+
+ // load unfinished runs from the database
+ cursor, err := db.Cursor(
+ dal.Select("id"),
+ dal.From(&models.GithubRun{}),
+ dal.Where(
+ "repo_id = ? AND connection_id = ? AND status IN ('ACTION_REQUIRED', 'STALE', 'IN_PROGRESS', 'QUEUED', 'REQUESTED', 'WAITING', 'PENDING')",
+ data.Options.GithubId, data.Options.ConnectionId,
+ ),
+ )
+ if err != nil {
+ return err
+ }
+ iterator, err := helper.NewDalCursorIterator(db, cursor, reflect.TypeOf(SimpleGithubRun{}))
+ if err != nil {
+ return err
+ }
+
+ // collect details from api
+ collector, err := helper.NewApiCollector(helper.ApiCollectorArgs{
+ RawDataSubTaskArgs: helper.RawDataSubTaskArgs{
+ Ctx: taskCtx,
+ Params: GithubApiParams{
+ ConnectionId: data.Options.ConnectionId,
+ Name: data.Options.Name,
+ },
+ Table: RAW_RUN_TABLE,
+ },
+ ApiClient: data.ApiClient,
+ Input: iterator,
+ Incremental: true,
+ UrlTemplate: "repos/{{ .Params.Name }}/actions/runs/{{ .Input.ID }}",
+ ResponseParser: func(res *http.Response) ([]json.RawMessage, errors.Error) {
+ body, err := io.ReadAll(res.Body)
+ if err != nil {
+ return nil, errors.Convert(err)
+ }
+ res.Body.Close()
+ return []json.RawMessage{body}, nil
+ },
+ AfterResponse: ignoreHTTPStatus404,
+ })
+
+ if err != nil {
+ return err
+ }
+ return collector.Execute()
+}