You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@devlake.apache.org by li...@apache.org on 2023/02/20 07:47:29 UTC

[incubator-devlake] branch main updated: feat: github runs collector supports timeFilter/diffSync (#4442)

This is an automated email from the ASF dual-hosted git repository.

likyh pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-devlake.git


The following commit(s) were added to refs/heads/main by this push:
     new a6ed2ee25 feat: github runs collector supports timeFilter/diffSync (#4442)
a6ed2ee25 is described below

commit a6ed2ee25b1e579f2a004eafa1276053ff2118d8
Author: Klesh Wong <zh...@merico.dev>
AuthorDate: Mon Feb 20 15:47:25 2023 +0800

    feat: github runs collector supports timeFilter/diffSync (#4442)
---
 backend/plugins/github/tasks/cicd_run_collector.go | 131 ++++++++++++++++++---
 1 file changed, 112 insertions(+), 19 deletions(-)

diff --git a/backend/plugins/github/tasks/cicd_run_collector.go b/backend/plugins/github/tasks/cicd_run_collector.go
index 5990f5231..19d6b9d02 100644
--- a/backend/plugins/github/tasks/cicd_run_collector.go
+++ b/backend/plugins/github/tasks/cicd_run_collector.go
@@ -20,16 +20,26 @@ package tasks
 import (
 	"encoding/json"
 	"fmt"
+	"io"
 	"net/http"
 	"net/url"
+	"reflect"
+	"time"
 
+	"github.com/apache/incubator-devlake/core/dal"
 	"github.com/apache/incubator-devlake/core/errors"
 	"github.com/apache/incubator-devlake/core/plugin"
 	helper "github.com/apache/incubator-devlake/helpers/pluginhelper/api"
+	"github.com/apache/incubator-devlake/plugins/github/models"
 )
 
 const RAW_RUN_TABLE = "github_api_runs"
 
+// Although the API accepts a maximum of 100 entries per page, sometimes
+// the response body is too large which would lead to request failures
+// https://github.com/apache/incubator-devlake/issues/3199
+const PAGE_SIZE = 30
+
 var CollectRunsMeta = plugin.SubTaskMeta{
 	Name:             "collectRuns",
 	EntryPoint:       CollectRuns,
@@ -40,7 +50,7 @@ var CollectRunsMeta = plugin.SubTaskMeta{
 
 func CollectRuns(taskCtx plugin.SubTaskContext) errors.Error {
 	data := taskCtx.GetData().(*GithubTaskData)
-	collectorWithState, err := helper.NewApiCollectorWithState(helper.RawDataSubTaskArgs{
+	collectorWithState, err := helper.NewStatefulApiCollector(helper.RawDataSubTaskArgs{
 		Ctx: taskCtx,
 		Params: GithubApiParams{
 			ConnectionId: data.Options.ConnectionId,
@@ -52,37 +62,58 @@ func CollectRuns(taskCtx plugin.SubTaskContext) errors.Error {
 		return err
 	}
 
-	//incremental := collectorWithState.IsIncremental()
+	incremental := collectorWithState.IsIncremental()
+
+	// step 1: fetch records created after createdAfter
+	var createdAfter *time.Time
+	if incremental {
+		createdAfter = collectorWithState.LatestState.LatestSuccessStart
+	} else {
+		createdAfter = data.TimeAfter
+	}
+
 	err = collectorWithState.InitCollector(helper.ApiCollectorArgs{
-		ApiClient: data.ApiClient,
-		PageSize:  30,
-		//Incremental: incremental,
+		ApiClient:   data.ApiClient,
+		PageSize:    PAGE_SIZE,
+		Incremental: incremental,
 		UrlTemplate: "repos/{{ .Params.Name }}/actions/runs",
 		Query: func(reqData *helper.RequestData) (url.Values, errors.Error) {
 			query := url.Values{}
-			// if data.CreatedDateAfter != nil, we set since once
-			// There is a bug for github rest api, so temporarily commented the following code
-			//if data.CreatedDateAfter != nil {
-			//	startDate := data.CreatedDateAfter.Format("2006-01-02")
-			//	query.Set("created", fmt.Sprintf("%s..*", startDate))
-			//}
-			//// if incremental == true, we overwrite it
-			//if incremental {
-			//	startDate := collectorWithState.LatestState.LatestSuccessStart.Format("2006-01-02")
-			//	query.Set("created", fmt.Sprintf("%s..*", startDate))
-			//}
 			query.Set("page", fmt.Sprintf("%v", reqData.Pager.Page))
 			query.Set("per_page", fmt.Sprintf("%v", reqData.Pager.Size))
 			return query, nil
 		},
-		GetTotalPages: GetTotalPagesFromResponse,
+		// use Undetermined strategy so we can stop fetching further pages by using
+		// ErrFinishCollect
+		Concurrency: 10,
 		ResponseParser: func(res *http.Response) ([]json.RawMessage, errors.Error) {
 			body := &GithubRawRunsResult{}
 			err := helper.UnmarshalResponse(res, body)
 			if err != nil {
 				return nil, err
 			}
-			return body.GithubWorkflowRuns, nil
+
+			// time filter or diff sync
+			if createdAfter != nil {
+				// if the first record of the page was created before minCreated, return empty set and stop
+				firstRun := &models.GithubRun{}
+				if e := json.Unmarshal(body.GithubWorkflowRuns[0], firstRun); e != nil {
+					return nil, errors.Default.Wrap(e, "failed to unmarshal first run")
+				}
+				if firstRun.GithubCreatedAt.Before(*createdAfter) {
+					return nil, helper.ErrFinishCollect
+				}
+				// if the last record was created before minCreated, return records and stop
+				lastRun := &models.GithubRun{}
+				if e := json.Unmarshal(body.GithubWorkflowRuns[len(body.GithubWorkflowRuns)-1], lastRun); e != nil {
+					return nil, errors.Default.Wrap(e, "failed to unmarshal last run")
+				}
+				if lastRun.GithubCreatedAt.Before(*createdAfter) {
+					err = helper.ErrFinishCollect
+				}
+			}
+
+			return body.GithubWorkflowRuns, err
 		},
 	})
 
@@ -90,10 +121,72 @@ func CollectRuns(taskCtx plugin.SubTaskContext) errors.Error {
 		return err
 	}
 
-	return collectorWithState.Execute()
+	err = collectorWithState.Execute()
+	if err != nil {
+		return err
+	}
+
+	// step 2: for incremental collection, we have to update previously collected data whose status is unfinished
+	if incremental {
+		// update existing data by collecting unfinished runs prior to LatestState.LatestSuccessStart
+		return collectUnfinishedRuns(taskCtx)
+	}
+	return nil
 }
 
 type GithubRawRunsResult struct {
 	TotalCount         int64             `json:"total_count"`
 	GithubWorkflowRuns []json.RawMessage `json:"workflow_runs"`
 }
+
+func collectUnfinishedRuns(taskCtx plugin.SubTaskContext) errors.Error {
+	data := taskCtx.GetData().(*GithubTaskData)
+	db := taskCtx.GetDal()
+
+	// load unfinished runs from the database
+	cursor, err := db.Cursor(
+		dal.Select("id"),
+		dal.From(&models.GithubRun{}),
+		dal.Where(
+			"repo_id = ? AND connection_id = ? AND status IN ('ACTION_REQUIRED', 'STALE', 'IN_PROGRESS', 'QUEUED', 'REQUESTED', 'WAITING', 'PENDING')",
+			data.Options.GithubId, data.Options.ConnectionId,
+		),
+	)
+	if err != nil {
+		return err
+	}
+	iterator, err := helper.NewDalCursorIterator(db, cursor, reflect.TypeOf(SimpleGithubRun{}))
+	if err != nil {
+		return err
+	}
+
+	// collect details from api
+	collector, err := helper.NewApiCollector(helper.ApiCollectorArgs{
+		RawDataSubTaskArgs: helper.RawDataSubTaskArgs{
+			Ctx: taskCtx,
+			Params: GithubApiParams{
+				ConnectionId: data.Options.ConnectionId,
+				Name:         data.Options.Name,
+			},
+			Table: RAW_RUN_TABLE,
+		},
+		ApiClient:   data.ApiClient,
+		Input:       iterator,
+		Incremental: true,
+		UrlTemplate: "repos/{{ .Params.Name }}/actions/runs/{{ .Input.ID }}",
+		ResponseParser: func(res *http.Response) ([]json.RawMessage, errors.Error) {
+			body, err := io.ReadAll(res.Body)
+			if err != nil {
+				return nil, errors.Convert(err)
+			}
+			res.Body.Close()
+			return []json.RawMessage{body}, nil
+		},
+		AfterResponse: ignoreHTTPStatus404,
+	})
+
+	if err != nil {
+		return err
+	}
+	return collector.Execute()
+}