You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@devlake.apache.org by kl...@apache.org on 2023/02/23 09:54:12 UTC
[incubator-devlake] branch main updated: feat: add increment support for graphql collector (#4491)
This is an automated email from the ASF dual-hosted git repository.
klesh pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-devlake.git
The following commit(s) were added to refs/heads/main by this push:
new dc50e92ba feat: add increment support for graphql collector (#4491)
dc50e92ba is described below
commit dc50e92bac0a4ec84ec50df3f64283f30a0700f9
Author: Likyh <ya...@meri.co>
AuthorDate: Thu Feb 23 17:54:07 2023 +0800
feat: add increment support for graphql collector (#4491)
* feat: add increment support for graphql collector
* refactor: change collector name from CollectCheckRun to CollectGraphqlJobs
* fix: fix for review
* fix: use filterBy.since to support incremental collection for issues; add some comments for PRs
---
.../helpers/pluginhelper/api/graphql_collector.go | 153 ++++++++++++++++-----
backend/plugins/github_graphql/impl/impl.go | 2 +-
.../github_graphql/tasks/issue_collector.go | 39 ++++--
.../{check_run_collector.go => job_collector.go} | 26 ++--
.../plugins/github_graphql/tasks/pr_collector.go | 43 +++---
5 files changed, 186 insertions(+), 77 deletions(-)
diff --git a/backend/helpers/pluginhelper/api/graphql_collector.go b/backend/helpers/pluginhelper/api/graphql_collector.go
index e1753c90f..de364cfd6 100644
--- a/backend/helpers/pluginhelper/api/graphql_collector.go
+++ b/backend/helpers/pluginhelper/api/graphql_collector.go
@@ -27,6 +27,7 @@ import (
"github.com/merico-dev/graphql"
"net/http"
"reflect"
+ "time"
)
// CursorPager contains pagination information for a graphql request
@@ -49,6 +50,10 @@ type GraphqlQueryPageInfo struct {
HasNextPage bool `json:"hasNextPage"`
}
+// DateTime wraps time.Time so it can be sent as a GraphQL `DateTime` variable (e.g. the issue collector's `$since` filter).
+// The graphql library appears to derive the GraphQL type from the Go type name, so this type must stay named exactly DateTime.
+type DateTime struct{ time.Time }
+
// GraphqlAsyncResponseHandler callback function to handle the Response asynchronously
type GraphqlAsyncResponseHandler func(res *http.Response) error
@@ -203,6 +208,11 @@ func (collector *GraphqlCollector) exec(divider *BatchSaveDivider, input interfa
SkipCursor: nil,
Size: collector.args.PageSize,
}
+ err = collector.ExtractExistRawData(divider, reqData)
+ if err != nil {
+ collector.checkError(err)
+ return
+ }
if collector.args.GetPageInfo != nil {
collector.fetchOneByOne(divider, reqData)
} else {
@@ -210,7 +220,7 @@ func (collector *GraphqlCollector) exec(divider *BatchSaveDivider, input interfa
}
}
-// fetchPagesDetermined fetches data of all pages for APIs that return paging information
+// fetchOneByOne fetches data of all pages for APIs that return paging information
func (collector *GraphqlCollector) fetchOneByOne(divider *BatchSaveDivider, reqData *GraphqlRequestData) {
// fetch first page
var fetchNextPage func(query interface{}) errors.Error
@@ -241,6 +251,110 @@ func (collector *GraphqlCollector) fetchOneByOne(divider *BatchSaveDivider, reqD
collector.fetchAsync(divider, reqData, fetchNextPage)
}
+// BatchSaveWithOrigin adds every parsed result to the batch saver for its type, stamping an unset RawDataOrigin field with the source raw row (table, id, params); it returns the first ForType/Add error encountered.
+func (collector *GraphqlCollector) BatchSaveWithOrigin(divider *BatchSaveDivider, results []interface{}, row *RawData) errors.Error {
+ // field name looked up via reflection on each result to record which raw row it came from
+ RAW_DATA_ORIGIN := "RawDataOrigin"
+ for _, result := range results {
+ // route the result to the batch saver for its concrete type; results must be pointers to structs because Elem() is called below
+ batch, err := divider.ForType(reflect.TypeOf(result))
+ if err != nil {
+ return err
+ }
+ // fill RawDataOrigin only if the result has that field and it is still zero, so an origin already set by the parser is kept
+ origin := reflect.ValueOf(result).Elem().FieldByName(RAW_DATA_ORIGIN)
+ if origin.IsValid() && origin.IsZero() {
+ origin.Set(reflect.ValueOf(common.RawDataOrigin{
+ RawDataTable: collector.table,
+ RawDataId: row.ID,
+ RawDataParams: row.Params,
+ }))
+ }
+ // Add buffers the record; records are written to the DB once the batch's slots are full
+ err = batch.Add(result)
+ if err != nil {
+ return errors.Default.Wrap(err, "error adding result to batch")
+ }
+ }
+ return nil
+}
+
+// ExtractExistRawData re-runs the response parser over every raw row already stored in the collector's table with matching params (ordered by id), batch-saves the results, and finally closes the divider. NOTE(review): exec calls this before every fetch, not only in incremental mode.
+func (collector *GraphqlCollector) ExtractExistRawData(divider *BatchSaveDivider, reqData *GraphqlRequestData) errors.Error {
+ // load the previously collected raw rows for the same params from the database
+ db := collector.args.Ctx.GetDal()
+ logger := collector.args.Ctx.GetLogger()
+
+ clauses := []dal.Clause{
+ dal.From(collector.table),
+ dal.Where("params = ?", collector.params),
+ dal.Orderby("id ASC"),
+ }
+
+ count, err := db.Count(clauses...)
+ if err != nil {
+ return errors.Default.Wrap(err, "error getting count of clauses")
+ }
+ cursor, err := db.Cursor(clauses...)
+ if err != nil {
+ return errors.Default.Wrap(err, "error running DB query")
+ }
+ logger.Info("get data from %s where params=%s and got %d", collector.table, collector.params, count)
+ defer cursor.Close()
+ row := &RawData{}
+
+ // build the query/variables template each raw row is decoded into; NOTE(review): BuildQuery's error is discarded, and the same template is reused for every row without being reset
+ query, variables, _ := collector.args.BuildQuery(reqData)
+
+ // reset progress; a total of -1 presumably means "unknown" (TODO confirm)
+ collector.args.Ctx.SetProgress(0, -1)
+ ctx := collector.args.Ctx.GetContext()
+ // iterate all rows, returning early if the task context is cancelled
+ for cursor.Next() {
+ select {
+ case <-ctx.Done():
+ return errors.Convert(ctx.Err())
+ default:
+ }
+ err = db.Fetch(cursor, row)
+ if err != nil {
+ return errors.Default.Wrap(err, "error fetching row")
+ }
+
+ err = errors.Convert(json.Unmarshal(row.Data, &query))
+ if err != nil {
+ return errors.Default.Wrap(err, `graphql collector unmarshal query failed`)
+ }
+ err = errors.Convert(json.Unmarshal(row.Input, &variables))
+ if err != nil {
+ return errors.Default.Wrap(err, `variables in graphql query can not unmarshal from json`)
+ }
+
+ var results []interface{}
+ if collector.args.ResponseParserWithDataErrors != nil {
+ results, err = errors.Convert01(collector.args.ResponseParserWithDataErrors(query, variables, nil)) // data errors are not reloaded from raw rows, so nil is passed
+ } else {
+ results, err = errors.Convert01(collector.args.ResponseParser(query, variables))
+ }
+ if err != nil {
+ if errors.Is(err, ErrFinishCollect) { // unlike live fetching, ErrFinishCollect is only logged here and the returned results are still saved below
+ logger.Info("existing data parser return ErrFinishCollect, but skip. rawId: #%d", row.ID)
+ } else {
+ return errors.Default.Wrap(err, "error calling plugin Extract implementation")
+ }
+ }
+ err = collector.BatchSaveWithOrigin(divider, results, row)
+ if err != nil {
+ return err
+ }
+
+ collector.args.Ctx.IncProgress(1)
+ }
+
+ // flush the remaining buffered records; NOTE(review): exec keeps fetching new pages with this same divider after it is closed here
+ return divider.Close()
+}
+
func (collector *GraphqlCollector) fetchAsync(divider *BatchSaveDivider, reqData *GraphqlRequestData, handler func(query interface{}) errors.Error) {
if reqData.Pager == nil {
reqData.Pager = &CursorPager{
@@ -300,10 +414,8 @@ func (collector *GraphqlCollector) fetchAsync(divider *BatchSaveDivider, reqData
return
}
- var (
- results []interface{}
- )
- if len(dataErrors) > 0 || collector.args.ResponseParser == nil {
+ var results []interface{}
+ if collector.args.ResponseParserWithDataErrors != nil {
results, err = collector.args.ResponseParserWithDataErrors(query, variables, dataErrors)
} else {
results, err = collector.args.ResponseParser(query, variables)
@@ -317,33 +429,12 @@ func (collector *GraphqlCollector) fetchAsync(divider *BatchSaveDivider, reqData
return
}
}
-
- RAW_DATA_ORIGIN := "RawDataOrigin"
- // batch save divider
- for _, result := range results {
- // get the batch operator for the specific type
- batch, err := divider.ForType(reflect.TypeOf(result))
- if err != nil {
- collector.checkError(err)
- return
- }
- // set raw data origin field
- origin := reflect.ValueOf(result).Elem().FieldByName(RAW_DATA_ORIGIN)
- if origin.IsValid() {
- origin.Set(reflect.ValueOf(common.RawDataOrigin{
- RawDataTable: collector.table,
- RawDataId: row.ID,
- RawDataParams: row.Params,
- }))
- }
- // records get saved into db when slots were max outed
- err = batch.Add(result)
- if err != nil {
- collector.checkError(err)
- return
- }
- collector.args.Ctx.IncProgress(1)
+ err = collector.BatchSaveWithOrigin(divider, results, row)
+ if err != nil {
+ collector.checkError(err)
+ return
}
+
collector.args.Ctx.IncProgress(1)
if handler != nil {
// trigger next fetch, but return if ErrFinishCollect got from ResponseParser
diff --git a/backend/plugins/github_graphql/impl/impl.go b/backend/plugins/github_graphql/impl/impl.go
index abc675989..1602a0514 100644
--- a/backend/plugins/github_graphql/impl/impl.go
+++ b/backend/plugins/github_graphql/impl/impl.go
@@ -83,7 +83,7 @@ func (p GithubGraphql) SubTaskMetas() []plugin.SubTaskMeta {
// collect workflow run & job
githubTasks.CollectRunsMeta,
githubTasks.ExtractRunsMeta,
- tasks.CollectCheckRunMeta,
+ tasks.CollectGraphqlJobsMeta,
// collect others
githubTasks.CollectApiCommentsMeta,
diff --git a/backend/plugins/github_graphql/tasks/issue_collector.go b/backend/plugins/github_graphql/tasks/issue_collector.go
index 7fac6690e..5cddf35d3 100644
--- a/backend/plugins/github_graphql/tasks/issue_collector.go
+++ b/backend/plugins/github_graphql/tasks/issue_collector.go
@@ -42,7 +42,7 @@ type GraphqlQueryIssueWrapper struct {
TotalCount graphql.Int
Issues []GraphqlQueryIssue `graphql:"nodes"`
PageInfo *helper.GraphqlQueryPageInfo
- } `graphql:"issues(first: $pageSize, after: $skipCursor, orderBy: {field: CREATED_AT, direction: DESC})"`
+ } `graphql:"issues(first: $pageSize, after: $skipCursor, orderBy: {field: CREATED_AT, direction: DESC}, filterBy: {since: $since})"`
} `graphql:"repository(owner: $owner, name: $name)"`
}
@@ -97,21 +97,35 @@ func CollectIssue(taskCtx plugin.SubTaskContext) errors.Error {
return nil
}
- collector, err := helper.NewGraphqlCollector(helper.GraphqlCollectorArgs{
- RawDataSubTaskArgs: helper.RawDataSubTaskArgs{
- Ctx: taskCtx,
- Params: githubTasks.GithubApiParams{
- ConnectionId: data.Options.ConnectionId,
- Name: data.Options.Name,
- },
- Table: RAW_ISSUES_TABLE,
+ collectorWithState, err := helper.NewStatefulApiCollector(helper.RawDataSubTaskArgs{
+ Ctx: taskCtx,
+ Params: githubTasks.GithubApiParams{
+ ConnectionId: data.Options.ConnectionId,
+ Name: data.Options.Name,
},
+ Table: RAW_ISSUES_TABLE,
+ }, data.TimeAfter)
+ if err != nil {
+ return err
+ }
+
+ incremental := collectorWithState.IsIncremental()
+
+ err = collectorWithState.InitGraphQLCollector(helper.GraphqlCollectorArgs{
GraphqlClient: data.GraphqlClient,
PageSize: 100,
+ Incremental: incremental,
BuildQuery: func(reqData *helper.GraphqlRequestData) (interface{}, map[string]interface{}, error) {
+ since := helper.DateTime{}
+ if incremental {
+ since = helper.DateTime{Time: *collectorWithState.LatestState.LatestSuccessStart}
+ } else if collectorWithState.TimeAfter != nil {
+ since = helper.DateTime{Time: *collectorWithState.TimeAfter}
+ }
query := &GraphqlQueryIssueWrapper{}
ownerName := strings.Split(data.Options.Name, "/")
variables := map[string]interface{}{
+ "since": since,
"pageSize": graphql.Int(reqData.Pager.Size),
"skipCursor": (*graphql.String)(reqData.Pager.SkipCursor),
"owner": graphql.String(ownerName[0]),
@@ -130,10 +144,6 @@ func CollectIssue(taskCtx plugin.SubTaskContext) errors.Error {
results := make([]interface{}, 0, 1)
isFinish := false
for _, issue := range issues {
- if data.TimeAfter != nil && !data.TimeAfter.Before(issue.CreatedAt) {
- isFinish = true
- break
- }
githubIssue, err := convertGithubIssue(milestoneMap, issue, data.Options.ConnectionId, data.Options.GithubId)
if err != nil {
return nil, err
@@ -166,12 +176,11 @@ func CollectIssue(taskCtx plugin.SubTaskContext) errors.Error {
}
},
})
-
if err != nil {
return err
}
- return collector.Execute()
+ return collectorWithState.Execute()
}
// create a milestone map for numberId to databaseId
diff --git a/backend/plugins/github_graphql/tasks/check_run_collector.go b/backend/plugins/github_graphql/tasks/job_collector.go
similarity index 90%
rename from backend/plugins/github_graphql/tasks/check_run_collector.go
rename to backend/plugins/github_graphql/tasks/job_collector.go
index 65a4b4c12..9374a5595 100644
--- a/backend/plugins/github_graphql/tasks/check_run_collector.go
+++ b/backend/plugins/github_graphql/tasks/job_collector.go
@@ -32,7 +32,7 @@ import (
"github.com/merico-dev/graphql"
)
-const RAW_CHECK_RUNS_TABLE = "github_graphql_check_runs"
+const RAW_GRAPHQL_JOBS_TABLE = "github_graphql_jobs"
type GraphqlQueryCheckRunWrapper struct {
RateLimit struct {
@@ -42,12 +42,14 @@ type GraphqlQueryCheckRunWrapper struct {
}
type GraphqlQueryCheckSuite struct {
- Id string
- Typename string `graphql:"__typename"`
+ Id string
+ Typename string `graphql:"__typename"`
+ // equal to Run in rest
CheckSuite struct {
WorkflowRun struct {
DatabaseId int
}
+ // equal to Job in rest
CheckRuns struct {
TotalCount int
Nodes []struct {
@@ -86,28 +88,28 @@ type SimpleWorkflowRun struct {
CheckSuiteNodeID string
}
-var CollectCheckRunMeta = plugin.SubTaskMeta{
- Name: "CollectCheckRun",
- EntryPoint: CollectCheckRun,
+var CollectGraphqlJobsMeta = plugin.SubTaskMeta{
+ Name: "CollectGraphqlJobs",
+ EntryPoint: CollectGraphqlJobs,
EnabledByDefault: true,
- Description: "Collect CheckRun data from GithubGraphql api",
+ Description: "Collect Jobs(CheckRun) data from GithubGraphql api",
DomainTypes: []string{plugin.DOMAIN_TYPE_CICD},
}
var _ plugin.SubTaskEntryPoint = CollectAccount
-func CollectCheckRun(taskCtx plugin.SubTaskContext) errors.Error {
+func CollectGraphqlJobs(taskCtx plugin.SubTaskContext) errors.Error {
logger := taskCtx.GetLogger()
db := taskCtx.GetDal()
data := taskCtx.GetData().(*githubTasks.GithubTaskData)
- collectorWithState, err := helper.NewApiCollectorWithState(helper.RawDataSubTaskArgs{
+ collectorWithState, err := helper.NewStatefulApiCollector(helper.RawDataSubTaskArgs{
Ctx: taskCtx,
Params: githubTasks.GithubApiParams{
ConnectionId: data.Options.ConnectionId,
Name: data.Options.Name,
},
- Table: RAW_CHECK_RUNS_TABLE,
+ Table: RAW_GRAPHQL_JOBS_TABLE,
}, data.TimeAfter)
if err != nil {
return err
@@ -121,9 +123,6 @@ func CollectCheckRun(taskCtx plugin.SubTaskContext) errors.Error {
dal.Where("repo_id = ? and connection_id=?", data.Options.GithubId, data.Options.ConnectionId),
dal.Orderby("github_updated_at DESC"),
}
- if collectorWithState.TimeAfter != nil {
- clauses = append(clauses, dal.Where("github_created_at > ?", *collectorWithState.TimeAfter))
- }
if incremental {
clauses = append(clauses, dal.Where("github_updated_at > ?", *collectorWithState.LatestState.LatestSuccessStart))
}
@@ -203,7 +202,6 @@ func CollectCheckRun(taskCtx plugin.SubTaskContext) errors.Error {
return results, nil
},
})
-
if err != nil {
return err
}
diff --git a/backend/plugins/github_graphql/tasks/pr_collector.go b/backend/plugins/github_graphql/tasks/pr_collector.go
index b4b67d81d..a71ea1c2e 100644
--- a/backend/plugins/github_graphql/tasks/pr_collector.go
+++ b/backend/plugins/github_graphql/tasks/pr_collector.go
@@ -36,12 +36,16 @@ type GraphqlQueryPrWrapper struct {
RateLimit struct {
Cost int
}
+ // Results are ordered by UPDATED_AT and paginated with a cursor.
+ // PRs updated while a collection is running may therefore be missed;
+ // they would be picked up by the next collection, but that is not sufficient,
+ // so in the next milestone (v0.17) we should filter by CREATED_AT and additionally collect PR details.
Repository struct {
PullRequests struct {
PageInfo *api.GraphqlQueryPageInfo
Prs []GraphqlQueryPr `graphql:"nodes"`
TotalCount graphql.Int
- } `graphql:"pullRequests(first: $pageSize, after: $skipCursor, orderBy: {field: CREATED_AT, direction: DESC})"`
+ } `graphql:"pullRequests(first: $pageSize, after: $skipCursor, orderBy: {field: UPDATED_AT, direction: DESC})"`
} `graphql:"repository(owner: $owner, name: $name)"`
}
@@ -130,31 +134,38 @@ func CollectPr(taskCtx plugin.SubTaskContext) errors.Error {
config := data.Options.GithubTransformationRule
var labelTypeRegex *regexp.Regexp
var labelComponentRegex *regexp.Regexp
- var err error
+ var err errors.Error
if config != nil && len(config.PrType) > 0 {
- labelTypeRegex, err = regexp.Compile(config.PrType)
+ labelTypeRegex, err = errors.Convert01(regexp.Compile(config.PrType))
if err != nil {
return errors.Default.Wrap(err, "regexp Compile prType failed")
}
}
if config != nil && len(config.PrComponent) > 0 {
- labelComponentRegex, err = regexp.Compile(config.PrComponent)
+ labelComponentRegex, err = errors.Convert01(regexp.Compile(config.PrComponent))
if err != nil {
return errors.Default.Wrap(err, "regexp Compile prComponent failed")
}
}
- collector, err := api.NewGraphqlCollector(api.GraphqlCollectorArgs{
- RawDataSubTaskArgs: api.RawDataSubTaskArgs{
- Ctx: taskCtx,
- Params: tasks.GithubApiParams{
- ConnectionId: data.Options.ConnectionId,
- Name: data.Options.Name,
- },
- Table: RAW_PRS_TABLE,
+ collectorWithState, err := api.NewStatefulApiCollector(api.RawDataSubTaskArgs{
+ Ctx: taskCtx,
+ Params: tasks.GithubApiParams{
+ ConnectionId: data.Options.ConnectionId,
+ Name: data.Options.Name,
},
+ Table: RAW_PRS_TABLE,
+ }, data.TimeAfter)
+ if err != nil {
+ return err
+ }
+
+ incremental := collectorWithState.IsIncremental()
+
+ err = collectorWithState.InitGraphQLCollector(api.GraphqlCollectorArgs{
GraphqlClient: data.GraphqlClient,
PageSize: 30,
+ Incremental: incremental,
/*
(Optional) Return query string for request, or you can plug them into UrlTemplate directly
*/
@@ -180,7 +191,8 @@ func CollectPr(taskCtx plugin.SubTaskContext) errors.Error {
results := make([]interface{}, 0, 1)
isFinish := false
for _, rawL := range prs {
- if data.TimeAfter != nil && !data.TimeAfter.Before(rawL.CreatedAt) {
+ // collect all data even though in increment mode because of existing data extracting
+ if collectorWithState.TimeAfter != nil && !collectorWithState.TimeAfter.Before(rawL.UpdatedAt) {
isFinish = true
break
}
@@ -291,12 +303,11 @@ func CollectPr(taskCtx plugin.SubTaskContext) errors.Error {
}
},
})
-
if err != nil {
- return errors.Convert(err)
+ return err
}
- return collector.Execute()
+ return collectorWithState.Execute()
}
func convertGithubPullRequest(pull GraphqlQueryPr, connId uint64, repoId int) (*models.GithubPullRequest, errors.Error) {