You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@devlake.apache.org by kl...@apache.org on 2023/02/23 09:54:12 UTC

[incubator-devlake] branch main updated: feat: add increment support for graphql collector (#4491)

This is an automated email from the ASF dual-hosted git repository.

klesh pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-devlake.git


The following commit(s) were added to refs/heads/main by this push:
     new dc50e92ba feat: add increment support for graphql collector (#4491)
dc50e92ba is described below

commit dc50e92bac0a4ec84ec50df3f64283f30a0700f9
Author: Likyh <ya...@meri.co>
AuthorDate: Thu Feb 23 17:54:07 2023 +0800

    feat: add increment support for graphql collector (#4491)
    
    * feat: add increment support for graphql collector
    
    * refactor: change collector name from CollectCheckRun to CollectGraphqlJobs
    
    * fix: address review feedback
    
    * fix: use filterBy.since to support incremental collection for issues; add some comments for PRs
---
 .../helpers/pluginhelper/api/graphql_collector.go  | 153 ++++++++++++++++-----
 backend/plugins/github_graphql/impl/impl.go        |   2 +-
 .../github_graphql/tasks/issue_collector.go        |  39 ++++--
 .../{check_run_collector.go => job_collector.go}   |  26 ++--
 .../plugins/github_graphql/tasks/pr_collector.go   |  43 +++---
 5 files changed, 186 insertions(+), 77 deletions(-)

diff --git a/backend/helpers/pluginhelper/api/graphql_collector.go b/backend/helpers/pluginhelper/api/graphql_collector.go
index e1753c90f..de364cfd6 100644
--- a/backend/helpers/pluginhelper/api/graphql_collector.go
+++ b/backend/helpers/pluginhelper/api/graphql_collector.go
@@ -27,6 +27,7 @@ import (
 	"github.com/merico-dev/graphql"
 	"net/http"
 	"reflect"
+	"time"
 )
 
 // CursorPager contains pagination information for a graphql request
@@ -49,6 +50,10 @@ type GraphqlQueryPageInfo struct {
 	HasNextPage bool   `json:"hasNextPage"`
 }
 
+// DateTime is the GraphQL DateTime scalar type.
+// The graphql library uses the Go type name as the GraphQL variable type, so this type must be named DateTime.
+type DateTime struct{ time.Time }
+
 // GraphqlAsyncResponseHandler callback function to handle the Response asynchronously
 type GraphqlAsyncResponseHandler func(res *http.Response) error
 
@@ -203,6 +208,11 @@ func (collector *GraphqlCollector) exec(divider *BatchSaveDivider, input interfa
 		SkipCursor: nil,
 		Size:       collector.args.PageSize,
 	}
+	err = collector.ExtractExistRawData(divider, reqData)
+	if err != nil {
+		collector.checkError(err)
+		return
+	}
 	if collector.args.GetPageInfo != nil {
 		collector.fetchOneByOne(divider, reqData)
 	} else {
@@ -210,7 +220,7 @@ func (collector *GraphqlCollector) exec(divider *BatchSaveDivider, input interfa
 	}
 }
 
-// fetchPagesDetermined fetches data of all pages for APIs that return paging information
+// fetchOneByOne fetches data of all pages for APIs that return paging information
 func (collector *GraphqlCollector) fetchOneByOne(divider *BatchSaveDivider, reqData *GraphqlRequestData) {
 	// fetch first page
 	var fetchNextPage func(query interface{}) errors.Error
@@ -241,6 +251,110 @@ func (collector *GraphqlCollector) fetchOneByOne(divider *BatchSaveDivider, reqD
 	collector.fetchAsync(divider, reqData, fetchNextPage)
 }
 
+// BatchSaveWithOrigin saves the results, filling in the raw data origin of each result whose RawDataOrigin field is unset
+func (collector *GraphqlCollector) BatchSaveWithOrigin(divider *BatchSaveDivider, results []interface{}, row *RawData) errors.Error {
+	// batch save divider
+	RAW_DATA_ORIGIN := "RawDataOrigin"
+	for _, result := range results {
+		// get the batch operator for the specific type
+		batch, err := divider.ForType(reflect.TypeOf(result))
+		if err != nil {
+			return err
+		}
+		// set raw data origin field
+		origin := reflect.ValueOf(result).Elem().FieldByName(RAW_DATA_ORIGIN)
+		if origin.IsValid() && origin.IsZero() {
+			origin.Set(reflect.ValueOf(common.RawDataOrigin{
+				RawDataTable:  collector.table,
+				RawDataId:     row.ID,
+				RawDataParams: row.Params,
+			}))
+		}
+		// records are saved into the db once the batch slots are full
+		err = batch.Add(result)
+		if err != nil {
+			return errors.Default.Wrap(err, "error adding result to batch")
+		}
+	}
+	return nil
+}
+
+// ExtractExistRawData re-parses the existing raw-table rows that match the collector's params and batch-saves the results
+func (collector *GraphqlCollector) ExtractExistRawData(divider *BatchSaveDivider, reqData *GraphqlRequestData) errors.Error {
+	// load data from database
+	db := collector.args.Ctx.GetDal()
+	logger := collector.args.Ctx.GetLogger()
+
+	clauses := []dal.Clause{
+		dal.From(collector.table),
+		dal.Where("params = ?", collector.params),
+		dal.Orderby("id ASC"),
+	}
+
+	count, err := db.Count(clauses...)
+	if err != nil {
+		return errors.Default.Wrap(err, "error getting count of clauses")
+	}
+	cursor, err := db.Cursor(clauses...)
+	if err != nil {
+		return errors.Default.Wrap(err, "error running DB query")
+	}
+	logger.Info("get data from %s where params=%s and got %d", collector.table, collector.params, count)
+	defer cursor.Close()
+	row := &RawData{}
+
+	// get the type of query and variables
+	query, variables, _ := collector.args.BuildQuery(reqData)
+
+	// progress
+	collector.args.Ctx.SetProgress(0, -1)
+	ctx := collector.args.Ctx.GetContext()
+	// iterate all rows
+	for cursor.Next() {
+		select {
+		case <-ctx.Done():
+			return errors.Convert(ctx.Err())
+		default:
+		}
+		err = db.Fetch(cursor, row)
+		if err != nil {
+			return errors.Default.Wrap(err, "error fetching row")
+		}
+
+		err = errors.Convert(json.Unmarshal(row.Data, &query))
+		if err != nil {
+			return errors.Default.Wrap(err, `graphql collector unmarshal query failed`)
+		}
+		err = errors.Convert(json.Unmarshal(row.Input, &variables))
+		if err != nil {
+			return errors.Default.Wrap(err, `variables in graphql query can not unmarshal from json`)
+		}
+
+		var results []interface{}
+		if collector.args.ResponseParserWithDataErrors != nil {
+			results, err = errors.Convert01(collector.args.ResponseParserWithDataErrors(query, variables, nil))
+		} else {
+			results, err = errors.Convert01(collector.args.ResponseParser(query, variables))
+		}
+		if err != nil {
+			if errors.Is(err, ErrFinishCollect) {
+				logger.Info("existing data parser return ErrFinishCollect, but skip. rawId: #%d", row.ID)
+			} else {
+				return errors.Default.Wrap(err, "error calling plugin Extract implementation")
+			}
+		}
+		err = collector.BatchSaveWithOrigin(divider, results, row)
+		if err != nil {
+			return err
+		}
+
+		collector.args.Ctx.IncProgress(1)
+	}
+
+	// save the last batches
+	return divider.Close()
+}
+
 func (collector *GraphqlCollector) fetchAsync(divider *BatchSaveDivider, reqData *GraphqlRequestData, handler func(query interface{}) errors.Error) {
 	if reqData.Pager == nil {
 		reqData.Pager = &CursorPager{
@@ -300,10 +414,8 @@ func (collector *GraphqlCollector) fetchAsync(divider *BatchSaveDivider, reqData
 		return
 	}
 
-	var (
-		results []interface{}
-	)
-	if len(dataErrors) > 0 || collector.args.ResponseParser == nil {
+	var results []interface{}
+	if collector.args.ResponseParserWithDataErrors != nil {
 		results, err = collector.args.ResponseParserWithDataErrors(query, variables, dataErrors)
 	} else {
 		results, err = collector.args.ResponseParser(query, variables)
@@ -317,33 +429,12 @@ func (collector *GraphqlCollector) fetchAsync(divider *BatchSaveDivider, reqData
 			return
 		}
 	}
-
-	RAW_DATA_ORIGIN := "RawDataOrigin"
-	// batch save divider
-	for _, result := range results {
-		// get the batch operator for the specific type
-		batch, err := divider.ForType(reflect.TypeOf(result))
-		if err != nil {
-			collector.checkError(err)
-			return
-		}
-		// set raw data origin field
-		origin := reflect.ValueOf(result).Elem().FieldByName(RAW_DATA_ORIGIN)
-		if origin.IsValid() {
-			origin.Set(reflect.ValueOf(common.RawDataOrigin{
-				RawDataTable:  collector.table,
-				RawDataId:     row.ID,
-				RawDataParams: row.Params,
-			}))
-		}
-		// records get saved into db when slots were max outed
-		err = batch.Add(result)
-		if err != nil {
-			collector.checkError(err)
-			return
-		}
-		collector.args.Ctx.IncProgress(1)
+	err = collector.BatchSaveWithOrigin(divider, results, row)
+	if err != nil {
+		collector.checkError(err)
+		return
 	}
+
 	collector.args.Ctx.IncProgress(1)
 	if handler != nil {
 		// trigger next fetch, but return if ErrFinishCollect got from ResponseParser
diff --git a/backend/plugins/github_graphql/impl/impl.go b/backend/plugins/github_graphql/impl/impl.go
index abc675989..1602a0514 100644
--- a/backend/plugins/github_graphql/impl/impl.go
+++ b/backend/plugins/github_graphql/impl/impl.go
@@ -83,7 +83,7 @@ func (p GithubGraphql) SubTaskMetas() []plugin.SubTaskMeta {
 		// collect workflow run & job
 		githubTasks.CollectRunsMeta,
 		githubTasks.ExtractRunsMeta,
-		tasks.CollectCheckRunMeta,
+		tasks.CollectGraphqlJobsMeta,
 
 		// collect others
 		githubTasks.CollectApiCommentsMeta,
diff --git a/backend/plugins/github_graphql/tasks/issue_collector.go b/backend/plugins/github_graphql/tasks/issue_collector.go
index 7fac6690e..5cddf35d3 100644
--- a/backend/plugins/github_graphql/tasks/issue_collector.go
+++ b/backend/plugins/github_graphql/tasks/issue_collector.go
@@ -42,7 +42,7 @@ type GraphqlQueryIssueWrapper struct {
 			TotalCount graphql.Int
 			Issues     []GraphqlQueryIssue `graphql:"nodes"`
 			PageInfo   *helper.GraphqlQueryPageInfo
-		} `graphql:"issues(first: $pageSize, after: $skipCursor, orderBy: {field: CREATED_AT, direction: DESC})"`
+		} `graphql:"issues(first: $pageSize, after: $skipCursor, orderBy: {field: CREATED_AT, direction: DESC}, filterBy: {since: $since})"`
 	} `graphql:"repository(owner: $owner, name: $name)"`
 }
 
@@ -97,21 +97,35 @@ func CollectIssue(taskCtx plugin.SubTaskContext) errors.Error {
 		return nil
 	}
 
-	collector, err := helper.NewGraphqlCollector(helper.GraphqlCollectorArgs{
-		RawDataSubTaskArgs: helper.RawDataSubTaskArgs{
-			Ctx: taskCtx,
-			Params: githubTasks.GithubApiParams{
-				ConnectionId: data.Options.ConnectionId,
-				Name:         data.Options.Name,
-			},
-			Table: RAW_ISSUES_TABLE,
+	collectorWithState, err := helper.NewStatefulApiCollector(helper.RawDataSubTaskArgs{
+		Ctx: taskCtx,
+		Params: githubTasks.GithubApiParams{
+			ConnectionId: data.Options.ConnectionId,
+			Name:         data.Options.Name,
 		},
+		Table: RAW_ISSUES_TABLE,
+	}, data.TimeAfter)
+	if err != nil {
+		return err
+	}
+
+	incremental := collectorWithState.IsIncremental()
+
+	err = collectorWithState.InitGraphQLCollector(helper.GraphqlCollectorArgs{
 		GraphqlClient: data.GraphqlClient,
 		PageSize:      100,
+		Incremental:   incremental,
 		BuildQuery: func(reqData *helper.GraphqlRequestData) (interface{}, map[string]interface{}, error) {
+			since := helper.DateTime{}
+			if incremental {
+				since = helper.DateTime{Time: *collectorWithState.LatestState.LatestSuccessStart}
+			} else if collectorWithState.TimeAfter != nil {
+				since = helper.DateTime{Time: *collectorWithState.TimeAfter}
+			}
 			query := &GraphqlQueryIssueWrapper{}
 			ownerName := strings.Split(data.Options.Name, "/")
 			variables := map[string]interface{}{
+				"since":      since,
 				"pageSize":   graphql.Int(reqData.Pager.Size),
 				"skipCursor": (*graphql.String)(reqData.Pager.SkipCursor),
 				"owner":      graphql.String(ownerName[0]),
@@ -130,10 +144,6 @@ func CollectIssue(taskCtx plugin.SubTaskContext) errors.Error {
 			results := make([]interface{}, 0, 1)
 			isFinish := false
 			for _, issue := range issues {
-				if data.TimeAfter != nil && !data.TimeAfter.Before(issue.CreatedAt) {
-					isFinish = true
-					break
-				}
 				githubIssue, err := convertGithubIssue(milestoneMap, issue, data.Options.ConnectionId, data.Options.GithubId)
 				if err != nil {
 					return nil, err
@@ -166,12 +176,11 @@ func CollectIssue(taskCtx plugin.SubTaskContext) errors.Error {
 			}
 		},
 	})
-
 	if err != nil {
 		return err
 	}
 
-	return collector.Execute()
+	return collectorWithState.Execute()
 }
 
 // create a milestone map for numberId to databaseId
diff --git a/backend/plugins/github_graphql/tasks/check_run_collector.go b/backend/plugins/github_graphql/tasks/job_collector.go
similarity index 90%
rename from backend/plugins/github_graphql/tasks/check_run_collector.go
rename to backend/plugins/github_graphql/tasks/job_collector.go
index 65a4b4c12..9374a5595 100644
--- a/backend/plugins/github_graphql/tasks/check_run_collector.go
+++ b/backend/plugins/github_graphql/tasks/job_collector.go
@@ -32,7 +32,7 @@ import (
 	"github.com/merico-dev/graphql"
 )
 
-const RAW_CHECK_RUNS_TABLE = "github_graphql_check_runs"
+const RAW_GRAPHQL_JOBS_TABLE = "github_graphql_jobs"
 
 type GraphqlQueryCheckRunWrapper struct {
 	RateLimit struct {
@@ -42,12 +42,14 @@ type GraphqlQueryCheckRunWrapper struct {
 }
 
 type GraphqlQueryCheckSuite struct {
-	Id         string
-	Typename   string `graphql:"__typename"`
+	Id       string
+	Typename string `graphql:"__typename"`
+	// equivalent to a workflow Run in the REST API
 	CheckSuite struct {
 		WorkflowRun struct {
 			DatabaseId int
 		}
+		// equivalent to a Job in the REST API
 		CheckRuns struct {
 			TotalCount int
 			Nodes      []struct {
@@ -86,28 +88,28 @@ type SimpleWorkflowRun struct {
 	CheckSuiteNodeID string
 }
 
-var CollectCheckRunMeta = plugin.SubTaskMeta{
-	Name:             "CollectCheckRun",
-	EntryPoint:       CollectCheckRun,
+var CollectGraphqlJobsMeta = plugin.SubTaskMeta{
+	Name:             "CollectGraphqlJobs",
+	EntryPoint:       CollectGraphqlJobs,
 	EnabledByDefault: true,
-	Description:      "Collect CheckRun data from GithubGraphql api",
+	Description:      "Collect Jobs(CheckRun) data from GithubGraphql api",
 	DomainTypes:      []string{plugin.DOMAIN_TYPE_CICD},
 }
 
 var _ plugin.SubTaskEntryPoint = CollectAccount
 
-func CollectCheckRun(taskCtx plugin.SubTaskContext) errors.Error {
+func CollectGraphqlJobs(taskCtx plugin.SubTaskContext) errors.Error {
 	logger := taskCtx.GetLogger()
 	db := taskCtx.GetDal()
 	data := taskCtx.GetData().(*githubTasks.GithubTaskData)
 
-	collectorWithState, err := helper.NewApiCollectorWithState(helper.RawDataSubTaskArgs{
+	collectorWithState, err := helper.NewStatefulApiCollector(helper.RawDataSubTaskArgs{
 		Ctx: taskCtx,
 		Params: githubTasks.GithubApiParams{
 			ConnectionId: data.Options.ConnectionId,
 			Name:         data.Options.Name,
 		},
-		Table: RAW_CHECK_RUNS_TABLE,
+		Table: RAW_GRAPHQL_JOBS_TABLE,
 	}, data.TimeAfter)
 	if err != nil {
 		return err
@@ -121,9 +123,6 @@ func CollectCheckRun(taskCtx plugin.SubTaskContext) errors.Error {
 		dal.Where("repo_id = ? and connection_id=?", data.Options.GithubId, data.Options.ConnectionId),
 		dal.Orderby("github_updated_at DESC"),
 	}
-	if collectorWithState.TimeAfter != nil {
-		clauses = append(clauses, dal.Where("github_created_at > ?", *collectorWithState.TimeAfter))
-	}
 	if incremental {
 		clauses = append(clauses, dal.Where("github_updated_at > ?", *collectorWithState.LatestState.LatestSuccessStart))
 	}
@@ -203,7 +202,6 @@ func CollectCheckRun(taskCtx plugin.SubTaskContext) errors.Error {
 			return results, nil
 		},
 	})
-
 	if err != nil {
 		return err
 	}
diff --git a/backend/plugins/github_graphql/tasks/pr_collector.go b/backend/plugins/github_graphql/tasks/pr_collector.go
index b4b67d81d..a71ea1c2e 100644
--- a/backend/plugins/github_graphql/tasks/pr_collector.go
+++ b/backend/plugins/github_graphql/tasks/pr_collector.go
@@ -36,12 +36,16 @@ type GraphqlQueryPrWrapper struct {
 	RateLimit struct {
 		Cost int
 	}
+	// Now it orders by UPDATED_AT and uses cursor pagination.
+	// It may miss some PRs that are updated during collection; those PRs
+	// will be collected on the next run, but that is not sufficient.
+	// In the next milestone (0.17) we should switch to filtering by CREATED_AT and collecting details.
 	Repository struct {
 		PullRequests struct {
 			PageInfo   *api.GraphqlQueryPageInfo
 			Prs        []GraphqlQueryPr `graphql:"nodes"`
 			TotalCount graphql.Int
-		} `graphql:"pullRequests(first: $pageSize, after: $skipCursor, orderBy: {field: CREATED_AT, direction: DESC})"`
+		} `graphql:"pullRequests(first: $pageSize, after: $skipCursor, orderBy: {field: UPDATED_AT, direction: DESC})"`
 	} `graphql:"repository(owner: $owner, name: $name)"`
 }
 
@@ -130,31 +134,38 @@ func CollectPr(taskCtx plugin.SubTaskContext) errors.Error {
 	config := data.Options.GithubTransformationRule
 	var labelTypeRegex *regexp.Regexp
 	var labelComponentRegex *regexp.Regexp
-	var err error
+	var err errors.Error
 	if config != nil && len(config.PrType) > 0 {
-		labelTypeRegex, err = regexp.Compile(config.PrType)
+		labelTypeRegex, err = errors.Convert01(regexp.Compile(config.PrType))
 		if err != nil {
 			return errors.Default.Wrap(err, "regexp Compile prType failed")
 		}
 	}
 	if config != nil && len(config.PrComponent) > 0 {
-		labelComponentRegex, err = regexp.Compile(config.PrComponent)
+		labelComponentRegex, err = errors.Convert01(regexp.Compile(config.PrComponent))
 		if err != nil {
 			return errors.Default.Wrap(err, "regexp Compile prComponent failed")
 		}
 	}
 
-	collector, err := api.NewGraphqlCollector(api.GraphqlCollectorArgs{
-		RawDataSubTaskArgs: api.RawDataSubTaskArgs{
-			Ctx: taskCtx,
-			Params: tasks.GithubApiParams{
-				ConnectionId: data.Options.ConnectionId,
-				Name:         data.Options.Name,
-			},
-			Table: RAW_PRS_TABLE,
+	collectorWithState, err := api.NewStatefulApiCollector(api.RawDataSubTaskArgs{
+		Ctx: taskCtx,
+		Params: tasks.GithubApiParams{
+			ConnectionId: data.Options.ConnectionId,
+			Name:         data.Options.Name,
 		},
+		Table: RAW_PRS_TABLE,
+	}, data.TimeAfter)
+	if err != nil {
+		return err
+	}
+
+	incremental := collectorWithState.IsIncremental()
+
+	err = collectorWithState.InitGraphQLCollector(api.GraphqlCollectorArgs{
 		GraphqlClient: data.GraphqlClient,
 		PageSize:      30,
+		Incremental:   incremental,
 		/*
 			(Optional) Return query string for request, or you can plug them into UrlTemplate directly
 		*/
@@ -180,7 +191,8 @@ func CollectPr(taskCtx plugin.SubTaskContext) errors.Error {
 			results := make([]interface{}, 0, 1)
 			isFinish := false
 			for _, rawL := range prs {
-				if data.TimeAfter != nil && !data.TimeAfter.Before(rawL.CreatedAt) {
+				// collect all data (bounded only by TimeAfter) even in incremental mode, because existing raw data is re-extracted
+				if collectorWithState.TimeAfter != nil && !collectorWithState.TimeAfter.Before(rawL.UpdatedAt) {
 					isFinish = true
 					break
 				}
@@ -291,12 +303,11 @@ func CollectPr(taskCtx plugin.SubTaskContext) errors.Error {
 			}
 		},
 	})
-
 	if err != nil {
-		return errors.Convert(err)
+		return err
 	}
 
-	return collector.Execute()
+	return collectorWithState.Execute()
 }
 
 func convertGithubPullRequest(pull GraphqlQueryPr, connId uint64, repoId int) (*models.GithubPullRequest, errors.Error) {