You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@devlake.apache.org by ab...@apache.org on 2022/11/01 07:35:22 UTC
[incubator-devlake] branch release-v0.14 updated: cherry-pick some github & graphql fix to 0.14 (#3629)
This is an automated email from the ASF dual-hosted git repository.
abeizn pushed a commit to branch release-v0.14
in repository https://gitbox.apache.org/repos/asf/incubator-devlake.git
The following commit(s) were added to refs/heads/release-v0.14 by this push:
new 46fed529 cherry-pick some github & graphql fix to 0.14 (#3629)
46fed529 is described below
commit 46fed5293b0fc058e52e3cfa3903dcbfab3cd93d
Author: Likyh <l...@likyh.com>
AuthorDate: Tue Nov 1 15:35:18 2022 +0800
cherry-pick some github & graphql fix to 0.14 (#3629)
* fix: change how errors are handled
* feat: add a Close method to stop the rate-limit timer
* feat: only sync updated GitHub users to the domain layer
* feat: support building two plugins with `make dev`
* fix: replace \0 (NUL) characters with '<0x00>' in GitHub issue bodies
* feat: add retry logic for graphql
* feat: handle data errors in the GitHub GraphQL users collector
* fix: fix the bug where the first user on each page was dropped
Co-authored-by: linyh <ya...@meri.co>
* fix: fix e2e and linter (#3627)
Co-authored-by: linyh <ya...@meri.co>
Co-authored-by: linyh <ya...@meri.co>
---
go.mod | 2 +-
go.sum | 2 +
plugins/github/e2e/account_test.go | 3 +
.../snapshot_tables/_tool_github_repo_accounts.csv | 16 +++
plugins/github/e2e/snapshot_tables/account.csv | 8 --
plugins/github/tasks/account_convertor.go | 14 +-
plugins/github_graphql/plugin_main.go | 21 ++-
plugins/github_graphql/tasks/account_collector.go | 13 +-
plugins/github_graphql/tasks/issue_collector.go | 3 +-
plugins/helper/graphql_async_client.go | 149 ++++++++++++++-------
plugins/helper/graphql_collector.go | 115 +++++++++++-----
scripts/compile-plugins.sh | 7 +
12 files changed, 247 insertions(+), 106 deletions(-)
diff --git a/go.mod b/go.mod
index 0fc69e61..96192cd9 100644
--- a/go.mod
+++ b/go.mod
@@ -15,7 +15,7 @@ require (
github.com/libgit2/git2go/v33 v33.0.6
github.com/magiconair/properties v1.8.5
github.com/manifoldco/promptui v0.9.0
- github.com/merico-dev/graphql v0.0.0-20220804061427-a2245fa66df2
+ github.com/merico-dev/graphql v0.0.0-20221027131946-77460a1fd4cd
github.com/mitchellh/mapstructure v1.4.1
github.com/panjf2000/ants/v2 v2.4.6
github.com/robfig/cron/v3 v3.0.0
diff --git a/go.sum b/go.sum
index 1d105828..8f17cd22 100644
--- a/go.sum
+++ b/go.sum
@@ -542,6 +542,8 @@ github.com/mediocregopher/radix/v3 v3.3.0/go.mod h1:EmfVyvspXz1uZEyPBMyGK+kjWiKQ
github.com/mediocregopher/radix/v3 v3.4.2/go.mod h1:8FL3F6UQRXHXIBSPUs5h0RybMF8i4n7wVopoX3x7Bv8=
github.com/merico-dev/graphql v0.0.0-20220804061427-a2245fa66df2 h1:sOXuZIg3OwBnvJFfIuO8wegiLpeDCOSVvk2dsbjurd8=
github.com/merico-dev/graphql v0.0.0-20220804061427-a2245fa66df2/go.mod h1:dcDqG8HXVtfEhTCipFMa0Q+RTKTtDKIO2vJt+JVzHEQ=
+github.com/merico-dev/graphql v0.0.0-20221027131946-77460a1fd4cd h1:hGQXd4a72JSFIZE+ZVkH5ivE925PGogjob6stgc2too=
+github.com/merico-dev/graphql v0.0.0-20221027131946-77460a1fd4cd/go.mod h1:dcDqG8HXVtfEhTCipFMa0Q+RTKTtDKIO2vJt+JVzHEQ=
github.com/mgutz/ansi v0.0.0-20200706080929-d51e80ef957d h1:5PJl274Y63IEHC+7izoQE9x6ikvDFZS2mDVS3drnohI=
github.com/mgutz/ansi v0.0.0-20200706080929-d51e80ef957d/go.mod h1:01TrycV0kFyexm33Z7vhZRXopbI8J3TDReVlkTgMUxE=
github.com/microcosm-cc/bluemonday v1.0.2/go.mod h1:iVP4YcDBq+n/5fb23BhYFvIMq/leAFZyRl6bYmGDlGc=
diff --git a/plugins/github/e2e/account_test.go b/plugins/github/e2e/account_test.go
index 5619de28..10e8405a 100644
--- a/plugins/github/e2e/account_test.go
+++ b/plugins/github/e2e/account_test.go
@@ -65,6 +65,9 @@ func TestAccountDataFlow(t *testing.T) {
IgnoreTypes: []interface{}{common.NoPKModel{}},
})
+ // ConvertAccountsMeta only convert the account in this repo
+ dataflowTester.ImportCsvIntoTabler("./snapshot_tables/_tool_github_repo_accounts.csv", &models.GithubRepoAccount{})
+
// verify converter
dataflowTester.FlushTabler(&crossdomain.Account{})
dataflowTester.Subtask(tasks.ConvertAccountsMeta, taskData)
diff --git a/plugins/github/e2e/snapshot_tables/_tool_github_repo_accounts.csv b/plugins/github/e2e/snapshot_tables/_tool_github_repo_accounts.csv
new file mode 100644
index 00000000..8b866644
--- /dev/null
+++ b/plugins/github/e2e/snapshot_tables/_tool_github_repo_accounts.csv
@@ -0,0 +1,16 @@
+connection_id,account_id,repo_github_id,login
+1,21979,134018330,appleboy
+1,964542,134018330,sarathsp06
+1,1052632,134018330,runner-mei
+1,3794113,134018330,shanhuhai5739
+1,3971390,134018330,ppmoon
+1,7496278,134018330,panjf2000
+1,8518239,134018330,gitter-badger
+1,11763614,2,Moonlight-Zhao
+1,12420699,2,shanghai-Jerry
+1,14950473,2,zqkgo
+1,22429695,2,codecov[bot]
+1,24841832,2,rikewang
+1,31087327,2,chensanle
+1,32893410,2,zhangyuanxue
+1,38849208,2,king526
\ No newline at end of file
diff --git a/plugins/github/e2e/snapshot_tables/account.csv b/plugins/github/e2e/snapshot_tables/account.csv
index adf6a806..474f1575 100644
--- a/plugins/github/e2e/snapshot_tables/account.csv
+++ b/plugins/github/e2e/snapshot_tables/account.csv
@@ -1,15 +1,7 @@
id,email,full_name,user_name,avatar_url,organization,created_date,status
github:GithubAccount:1:1052632,runner.mei@,runner,runner-mei,https://avatars.githubusercontent.com/u/1052632?v=4,,,0
-github:GithubAccount:1:11763614,zhaozh90@163.com,Jerry,Moonlight-Zhao,https://avatars.githubusercontent.com/u/11763614?v=4,,,0
-github:GithubAccount:1:12420699,,Jerry You,shanghai-Jerry,https://avatars.githubusercontent.com/u/12420699?v=4,,,0
-github:GithubAccount:1:14950473,,Z.Q.K,zqkgo,https://avatars.githubusercontent.com/u/14950473?v=4,ankiband,,0
github:GithubAccount:1:21979,appleboy.tw@gmail.com,Bo-Yi Wu,appleboy,https://avatars.githubusercontent.com/u/21979?v=4,"COSCUP,nodejs-tw,moztw,h5bp,CodeIgniter-TW,drone,Getmore,golangtw,laravel-taiwan,go-xorm,gin-gonic,PHPConf-TW,Mediatek-Cloud,SJFinder,go-gitea,laradock,gin-contrib,tagfans,maintainers,go-training,go-ggz,the-benchmarker,golang-queue",,0
-github:GithubAccount:1:22429695,,,codecov[bot],https://avatars.githubusercontent.com/in/254?v=4,,,0
-github:GithubAccount:1:24841832,,,rikewang,https://avatars.githubusercontent.com/u/24841832?v=4,,,0
-github:GithubAccount:1:31087327,,sanle,chensanle,https://avatars.githubusercontent.com/u/31087327?v=4,,,0
-github:GithubAccount:1:32893410,,zyx,zhangyuanxue,https://avatars.githubusercontent.com/u/32893410?v=4,,,0
github:GithubAccount:1:3794113,shanhu5739@gmail.com,Derek,shanhuhai5739,https://avatars.githubusercontent.com/u/3794113?v=4,,,0
-github:GithubAccount:1:38849208,,,king526,https://avatars.githubusercontent.com/u/38849208?v=4,,,0
github:GithubAccount:1:3971390,cnliuyunpeng@gmail.com,ppmoon,ppmoon,https://avatars.githubusercontent.com/u/3971390?v=4,,,0
github:GithubAccount:1:7496278,i@andypan.me,Andy Pan,panjf2000,https://avatars.githubusercontent.com/u/7496278?v=4,,,0
github:GithubAccount:1:8518239,badger@gitter.im,The Gitter Badger,gitter-badger,https://avatars.githubusercontent.com/u/8518239?v=4,,,0
diff --git a/plugins/github/tasks/account_convertor.go b/plugins/github/tasks/account_convertor.go
index e8643226..328de4a0 100644
--- a/plugins/github/tasks/account_convertor.go
+++ b/plugins/github/tasks/account_convertor.go
@@ -52,7 +52,19 @@ func ConvertAccounts(taskCtx core.SubTaskContext) errors.Error {
db := taskCtx.GetDal()
data := taskCtx.GetData().(*GithubTaskData)
- cursor, err := db.Cursor(dal.From(&githubModels.GithubAccount{}), dal.Where("connection_id = ?", data.Options.ConnectionId))
+ cursor, err := db.Cursor(
+ dal.Select("_tool_github_accounts.*"),
+ dal.From(&githubModels.GithubAccount{}),
+ dal.Where(
+ "repo_github_id = ? and _tool_github_accounts.connection_id=?",
+ data.Repo.GithubId,
+ data.Options.ConnectionId,
+ ),
+ dal.Join(`left join _tool_github_repo_accounts gra on (
+ _tool_github_accounts.connection_id = gra.connection_id
+ AND _tool_github_accounts.id = gra.account_id
+ )`),
+ )
if err != nil {
return err
}
diff --git a/plugins/github_graphql/plugin_main.go b/plugins/github_graphql/plugin_main.go
index 493c1f59..a2a8b84a 100644
--- a/plugins/github_graphql/plugin_main.go
+++ b/plugins/github_graphql/plugin_main.go
@@ -19,6 +19,7 @@ package main
import (
"context"
+ "fmt"
"github.com/apache/incubator-devlake/errors"
"github.com/apache/incubator-devlake/plugins/core"
"github.com/apache/incubator-devlake/plugins/github/models"
@@ -128,17 +129,23 @@ func (plugin GithubGraphql) PrepareTaskData(taskCtx core.TaskContext, options ma
)
httpClient := oauth2.NewClient(taskCtx.GetContext(), src)
client := graphql.NewClient(connection.Endpoint+`graphql`, httpClient)
- graphqlClient := helper.CreateAsyncGraphqlClient(taskCtx.GetContext(), client, taskCtx.GetLogger(),
+ graphqlClient, err := helper.CreateAsyncGraphqlClient(taskCtx, client, taskCtx.GetLogger(),
func(ctx context.Context, client *graphql.Client, logger core.Logger) (rateRemaining int, resetAt *time.Time, err errors.Error) {
var query GraphQueryRateLimit
- err = errors.Convert(client.Query(taskCtx.GetContext(), &query, nil))
+ dataErrors, err := errors.Convert01(client.Query(taskCtx.GetContext(), &query, nil))
if err != nil {
return 0, nil, err
}
+ if len(dataErrors) > 0 {
+ return 0, nil, errors.Default.Wrap(dataErrors[0], `query rate limit fail`)
+ }
logger.Info(`github graphql init success with remaining %d/%d and will reset at %s`,
query.RateLimit.Remaining, query.RateLimit.Limit, query.RateLimit.ResetAt)
return int(query.RateLimit.Remaining), &query.RateLimit.ResetAt, nil
})
+ if err != nil {
+ return nil, err
+ }
graphqlClient.SetGetRateCost(func(q interface{}) int {
v := reflect.ValueOf(q)
@@ -166,6 +173,16 @@ func (plugin GithubGraphql) ApiResources() map[string]map[string]core.ApiResourc
return nil
}
+func (plugin GithubGraphql) Close(taskCtx core.TaskContext) errors.Error {
+ data, ok := taskCtx.GetData().(*githubTasks.GithubTaskData)
+ if !ok {
+ return errors.Default.New(fmt.Sprintf("GetData failed when try to close %+v", taskCtx))
+ }
+ data.ApiClient.Release()
+ data.GraphqlClient.Release()
+ return nil
+}
+
// standalone mode for debugging
func main() {
cmd := &cobra.Command{Use: "githubGraphql"}
diff --git a/plugins/github_graphql/tasks/account_collector.go b/plugins/github_graphql/tasks/account_collector.go
index d095cdd7..89e05df8 100644
--- a/plugins/github_graphql/tasks/account_collector.go
+++ b/plugins/github_graphql/tasks/account_collector.go
@@ -96,10 +96,9 @@ func CollectAccount(taskCtx core.SubTaskContext) errors.Error {
},
Table: RAW_ACCOUNTS_TABLE,
},
- IgnoreQueryErr: true,
- Input: iterator,
- InputStep: 100,
- GraphqlClient: data.GraphqlClient,
+ Input: iterator,
+ InputStep: 100,
+ GraphqlClient: data.GraphqlClient,
BuildQuery: func(reqData *helper.GraphqlRequestData) (interface{}, map[string]interface{}, error) {
accounts := reqData.Input.([]interface{})
query := &GraphqlQueryAccountWrapper{}
@@ -115,7 +114,11 @@ func CollectAccount(taskCtx core.SubTaskContext) errors.Error {
}
return query, variables, nil
},
- ResponseParser: func(iQuery interface{}, variables map[string]interface{}) ([]interface{}, error) {
+ ResponseParserWithDataErrors: func(iQuery interface{}, variables map[string]interface{}, dataErrors []graphql.DataError) ([]interface{}, error) {
+ for _, dataError := range dataErrors {
+ // log and ignore
+ taskCtx.GetLogger().Warn(dataError, `query user get error but ignore`)
+ }
query := iQuery.(*GraphqlQueryAccountWrapper)
accounts := query.Users
diff --git a/plugins/github_graphql/tasks/issue_collector.go b/plugins/github_graphql/tasks/issue_collector.go
index f54e4cee..9ceb85a3 100644
--- a/plugins/github_graphql/tasks/issue_collector.go
+++ b/plugins/github_graphql/tasks/issue_collector.go
@@ -25,6 +25,7 @@ import (
githubTasks "github.com/apache/incubator-devlake/plugins/github/tasks"
"github.com/apache/incubator-devlake/plugins/helper"
"github.com/merico-dev/graphql"
+ "strings"
"time"
)
@@ -170,7 +171,7 @@ func convertGithubIssue(issue GraphqlQueryIssue, connectionId uint64, repository
Number: issue.Number,
State: issue.State,
Title: issue.Title,
- Body: issue.Body,
+ Body: strings.ReplaceAll(issue.Body, "\x00", `<0x00>`),
Url: issue.Url,
ClosedAt: issue.ClosedAt,
GithubCreatedAt: issue.CreatedAt,
diff --git a/plugins/helper/graphql_async_client.go b/plugins/helper/graphql_async_client.go
index fcd46aef..765bf7d3 100644
--- a/plugins/helper/graphql_async_client.go
+++ b/plugins/helper/graphql_async_client.go
@@ -19,23 +19,27 @@ package helper
import (
"context"
+ "fmt"
+ "sync"
+ "time"
+
"github.com/apache/incubator-devlake/errors"
"github.com/apache/incubator-devlake/plugins/core"
+ "github.com/apache/incubator-devlake/utils"
"github.com/merico-dev/graphql"
- "sync"
- "time"
)
// GraphqlAsyncClient send graphql one by one
type GraphqlAsyncClient struct {
- ctx context.Context
- cancel context.CancelFunc
- client *graphql.Client
- logger core.Logger
- mu sync.Mutex
- waitGroup sync.WaitGroup
- workerErrors []error
+ ctx context.Context
+ cancel context.CancelFunc
+ client *graphql.Client
+ logger core.Logger
+ mu sync.Mutex
+ waitGroup sync.WaitGroup
+ maxRetry int
+ waitBeforeRetry time.Duration
rateExhaustCond *sync.Cond
rateRemaining int
getRateRemaining func(context.Context, *graphql.Client, core.Logger) (rateRemaining int, resetAt *time.Time, err errors.Error)
@@ -44,12 +48,12 @@ type GraphqlAsyncClient struct {
// CreateAsyncGraphqlClient creates a new GraphqlAsyncClient
func CreateAsyncGraphqlClient(
- ctx context.Context,
+ taskCtx core.TaskContext,
graphqlClient *graphql.Client,
logger core.Logger,
getRateRemaining func(context.Context, *graphql.Client, core.Logger) (rateRemaining int, resetAt *time.Time, err errors.Error),
-) *GraphqlAsyncClient {
- ctxWithCancel, cancel := context.WithCancel(ctx)
+) (*GraphqlAsyncClient, errors.Error) {
+ ctxWithCancel, cancel := context.WithCancel(taskCtx.GetContext())
graphqlAsyncClient := &GraphqlAsyncClient{
ctx: ctxWithCancel,
cancel: cancel,
@@ -59,14 +63,48 @@ func CreateAsyncGraphqlClient(
rateRemaining: 0,
getRateRemaining: getRateRemaining,
}
+
if getRateRemaining != nil {
- rateRemaining, resetAt, err := getRateRemaining(ctx, graphqlClient, logger)
+ rateRemaining, resetAt, err := getRateRemaining(taskCtx.GetContext(), graphqlClient, logger)
if err != nil {
panic(err)
}
graphqlAsyncClient.updateRateRemaining(rateRemaining, resetAt)
}
- return graphqlAsyncClient
+
+ // load retry/timeout from configuration
+ // use API_RETRY as max retry time
+ // use API_TIMEOUT as retry before wait seconds to confirm the prev request finish
+ timeout := 30 * time.Second
+ retry, err := utils.StrToIntOr(taskCtx.GetConfig("API_RETRY"), 3)
+ if err != nil {
+ return nil, errors.BadInput.Wrap(err, "failed to parse API_RETRY")
+ }
+ timeoutConf := taskCtx.GetConfig("API_TIMEOUT")
+ if timeoutConf != "" {
+ // override timeout value if API_TIMEOUT is provided
+ timeout, err = errors.Convert01(time.ParseDuration(timeoutConf))
+ if err != nil {
+ return nil, errors.BadInput.Wrap(err, "failed to parse API_TIMEOUT")
+ }
+ }
+ graphqlAsyncClient.SetMaxRetry(retry, timeout)
+
+ return graphqlAsyncClient, nil
+}
+
+// GetMaxRetry returns the maximum retry attempts for a request
+func (apiClient *GraphqlAsyncClient) GetMaxRetry() (int, time.Duration) {
+ return apiClient.maxRetry, apiClient.waitBeforeRetry
+}
+
+// SetMaxRetry sets the maximum retry attempts for a request
+func (apiClient *GraphqlAsyncClient) SetMaxRetry(
+ maxRetry int,
+ waitBeforeRetry time.Duration,
+) {
+ apiClient.maxRetry = maxRetry
+ apiClient.waitBeforeRetry = waitBeforeRetry
}
// updateRateRemaining call getRateRemaining to update rateRemaining periodically
@@ -80,12 +118,16 @@ func (apiClient *GraphqlAsyncClient) updateRateRemaining(rateRemaining int, rese
if resetAt != nil && resetAt.After(time.Now()) {
nextDuring = time.Until(*resetAt)
}
- <-time.After(nextDuring)
- newRateRemaining, newResetAt, err := apiClient.getRateRemaining(apiClient.ctx, apiClient.client, apiClient.logger)
- if err != nil {
- panic(err)
+ select {
+ case <-apiClient.ctx.Done():
+ return
+ case <-time.After(nextDuring):
+ newRateRemaining, newResetAt, err := apiClient.getRateRemaining(apiClient.ctx, apiClient.client, apiClient.logger)
+ if err != nil {
+ panic(err)
+ }
+ apiClient.updateRateRemaining(newRateRemaining, newResetAt)
}
- apiClient.updateRateRemaining(newRateRemaining, newResetAt)
}()
}
@@ -96,7 +138,9 @@ func (apiClient *GraphqlAsyncClient) SetGetRateCost(getRateCost func(q interface
}
// Query send a graphql request when get lock
-func (apiClient *GraphqlAsyncClient) Query(q interface{}, variables map[string]interface{}) errors.Error {
+// []graphql.DataError are the errors returned in response body
+// errors.Error is other error
+func (apiClient *GraphqlAsyncClient) Query(q interface{}, variables map[string]interface{}) ([]graphql.DataError, errors.Error) {
apiClient.waitGroup.Add(1)
defer apiClient.waitGroup.Done()
apiClient.mu.Lock()
@@ -108,26 +152,40 @@ func (apiClient *GraphqlAsyncClient) Query(q interface{}, variables map[string]i
apiClient.logger.Info(`rate limit remaining exhausted, waiting for next period.`)
apiClient.rateExhaustCond.Wait()
}
- select {
- case <-apiClient.ctx.Done():
- return nil
- default:
- err := apiClient.client.Query(apiClient.ctx, q, variables)
- if err != nil {
- return errors.Default.Wrap(err, "error making GraphQL call")
- }
- cost := 1
- if apiClient.getRateCost != nil {
- cost = apiClient.getRateCost(q)
+
+ retryTime := 0
+ var err error
+ // if it needs retry, check and retry
+ for retryTime < apiClient.maxRetry {
+ select {
+ case <-apiClient.ctx.Done():
+ return nil, nil
+ default:
+ var dataErrors []graphql.DataError
+ dataErrors, err := apiClient.client.Query(apiClient.ctx, q, variables)
+ if err != nil {
+ apiClient.logger.Warn(err, "retry #%d graphql calling after %ds", retryTime, apiClient.waitBeforeRetry/time.Second)
+ retryTime++
+ <-time.After(apiClient.waitBeforeRetry)
+ continue
+ }
+ if dataErrors != nil {
+ return dataErrors, nil
+ }
+ cost := 1
+ if apiClient.getRateCost != nil {
+ cost = apiClient.getRateCost(q)
+ }
+ apiClient.rateRemaining -= cost
+ apiClient.logger.Debug(`query cost %d in %v`, cost, variables)
+ return nil, nil
}
- apiClient.rateRemaining -= cost
- apiClient.logger.Debug(`query cost %d in %v`, cost, variables)
- return nil
}
+ return nil, errors.Default.Wrap(err, fmt.Sprintf("got error when querying GraphQL (from the %dth retry)", retryTime))
}
// NextTick to return the NextTick of scheduler
-func (apiClient *GraphqlAsyncClient) NextTick(task func() errors.Error) {
+func (apiClient *GraphqlAsyncClient) NextTick(task func() errors.Error, taskErrorChecker func(err errors.Error)) {
// to make sure task will be enqueued
apiClient.waitGroup.Add(1)
go func() {
@@ -140,29 +198,18 @@ func (apiClient *GraphqlAsyncClient) NextTick(task func() errors.Error) {
// But if done out of this go func, so task will run after waitGroup finish
// I have no idea about this now...
defer apiClient.waitGroup.Done()
- apiClient.checkError(task())
+ taskErrorChecker(task())
}()
}
}()
}
// Wait blocks until all async requests were done
-func (apiClient *GraphqlAsyncClient) Wait() errors.Error {
+func (apiClient *GraphqlAsyncClient) Wait() {
apiClient.waitGroup.Wait()
- if len(apiClient.workerErrors) > 0 {
- return errors.Default.Combine(apiClient.workerErrors)
- }
- return nil
-}
-
-func (apiClient *GraphqlAsyncClient) checkError(err errors.Error) {
- if err == nil {
- return
- }
- apiClient.workerErrors = append(apiClient.workerErrors, err)
}
-// HasError return if any error occurred
-func (apiClient *GraphqlAsyncClient) HasError() bool {
- return len(apiClient.workerErrors) > 0
+// Release will release the ApiAsyncClient with scheduler
+func (apiClient *GraphqlAsyncClient) Release() {
+ apiClient.cancel()
}
diff --git a/plugins/helper/graphql_collector.go b/plugins/helper/graphql_collector.go
index 07999394..1091e199 100644
--- a/plugins/helper/graphql_collector.go
+++ b/plugins/helper/graphql_collector.go
@@ -65,18 +65,20 @@ type GraphqlCollectorArgs struct {
Input Iterator
// how many times fetched from input, default 1 means only fetch once
// NOTICE: InputStep=1 will fill value as item and InputStep>1 will fill value as []item
- InputStep int
- IgnoreQueryErr bool
+ InputStep int
// GetPageInfo is to tell `GraphqlCollector` is page information
- GetPageInfo func(query interface{}, args *GraphqlCollectorArgs) (*GraphqlQueryPageInfo, error)
- BatchSize int
- ResponseParser func(query interface{}, variables map[string]interface{}) ([]interface{}, error)
+ GetPageInfo func(query interface{}, args *GraphqlCollectorArgs) (*GraphqlQueryPageInfo, error)
+ BatchSize int
+ // one of ResponseParser and ResponseParserEvenWhenDataErrors is required to parse response
+ ResponseParser func(query interface{}, variables map[string]interface{}) ([]interface{}, error)
+ ResponseParserWithDataErrors func(query interface{}, variables map[string]interface{}, dataErrors []graphql.DataError) ([]interface{}, error)
}
// GraphqlCollector help you collect data from Graphql services
type GraphqlCollector struct {
*RawDataSubTask
- args *GraphqlCollectorArgs
+ args *GraphqlCollectorArgs
+ workerErrors []error
}
// NewGraphqlCollector allocates a new GraphqlCollector with the given args.
@@ -94,8 +96,8 @@ func NewGraphqlCollector(args GraphqlCollectorArgs) (*GraphqlCollector, errors.E
if args.GraphqlClient == nil {
return nil, errors.Default.New("ApiClient is required")
}
- if args.ResponseParser == nil {
- return nil, errors.Default.New("ResponseParser is required")
+ if args.ResponseParser == nil && args.ResponseParserWithDataErrors == nil {
+ return nil, errors.Default.New("one of ResponseParser and ResponseParserWithDataErrors is required")
}
apicllector := &GraphqlCollector{
RawDataSubTask: rawDataSubTask,
@@ -143,23 +145,30 @@ func (collector *GraphqlCollector) Execute() errors.Error {
if collector.args.Input != nil {
iterator := collector.args.Input
defer iterator.Close()
- apiClient := collector.args.GraphqlClient
- for iterator.HasNext() && !apiClient.HasError() {
- if collector.args.InputStep == 1 {
+ // the comment about difference is written at GraphqlCollectorArgs.InputStep
+ if collector.args.InputStep == 1 {
+ for iterator.HasNext() && !collector.HasError() {
input, err := iterator.Fetch()
if err != nil {
+ collector.checkError(err)
break
}
collector.exec(divider, input)
- } else {
+ }
+ } else {
+ for !collector.HasError() {
var inputs []interface{}
for i := 0; i < collector.args.InputStep && iterator.HasNext(); i++ {
input, err := iterator.Fetch()
if err != nil {
+ collector.checkError(err)
break
}
inputs = append(inputs, input)
}
+ if inputs == nil {
+ break
+ }
collector.exec(divider, inputs)
}
}
@@ -169,23 +178,25 @@ func (collector *GraphqlCollector) Execute() errors.Error {
}
logger.Debug("wait for all async api to finished")
+ collector.args.GraphqlClient.Wait()
- err = collector.args.GraphqlClient.Wait()
- if err != nil {
- err = errors.Default.Wrap(err, "ended API collector execution with error")
- logger.Error(err, "")
+ if collector.HasError() {
+ err = errors.Default.Combine(collector.workerErrors)
+ logger.Error(err, "ended Graphql collector execution with error")
+ logger.Error(collector.workerErrors[0], "the first error of them")
+ return err
} else {
logger.Info("ended api collection without error")
}
- err = divider.Close()
+ err = divider.Close()
return err
}
func (collector *GraphqlCollector) exec(divider *BatchSaveDivider, input interface{}) {
inputJson, err := json.Marshal(input)
if err != nil {
- panic(err)
+ collector.checkError(errors.Default.Wrap(err, `input can not be marshal to json`))
}
reqData := new(GraphqlRequestData)
reqData.Input = input
@@ -210,6 +221,9 @@ func (collector *GraphqlCollector) fetchOneByOne(divider *BatchSaveDivider, reqD
if err != nil {
return errors.Default.Wrap(err, "fetchPagesDetermined get totalPages failed")
}
+ if pageInfo == nil {
+ return errors.Default.New("fetchPagesDetermined got pageInfo is nil")
+ }
if pageInfo.HasNextPage {
collector.args.GraphqlClient.NextTick(func() errors.Error {
reqDataTemp := &GraphqlRequestData{
@@ -222,7 +236,7 @@ func (collector *GraphqlCollector) fetchOneByOne(divider *BatchSaveDivider, reqD
}
collector.fetchAsync(divider, reqDataTemp, fetchNextPage)
return nil
- })
+ }, collector.checkError)
}
return nil
}
@@ -238,30 +252,36 @@ func (collector *GraphqlCollector) fetchAsync(divider *BatchSaveDivider, reqData
}
query, variables, err := collector.args.BuildQuery(reqData)
if err != nil {
- panic(err)
+ collector.checkError(errors.Default.Wrap(err, `graphql collector BuildQuery failed`))
+ return
}
logger := collector.args.Ctx.GetLogger()
- err = collector.args.GraphqlClient.Query(query, variables)
+ dataErrors, err := collector.args.GraphqlClient.Query(query, variables)
if err != nil {
- if collector.args.IgnoreQueryErr {
- logger.Error(err, "fetchAsync failed")
+ collector.checkError(errors.Default.Wrap(err, `graphql query failed`))
+ return
+ }
+ if len(dataErrors) > 0 {
+ if collector.args.ResponseParserWithDataErrors == nil {
+ collector.checkError(errors.Default.Wrap(err, `graphql query got error`))
return
- } else {
- panic(err)
}
+ // else: error will deal by ResponseParserWithDataErrors
}
defer logger.Debug("fetchAsync >>> done for %v %v", query, variables)
paramsBytes, err := json.Marshal(query)
if err != nil {
- panic(err)
+ collector.checkError(errors.Default.Wrap(err, `graphql collector marshal query failed`))
+ return
}
db := collector.args.Ctx.GetDal()
queryStr, _ := graphql.ConstructQuery(query, variables)
variablesJson, err := json.Marshal(variables)
if err != nil {
- panic(err)
+ collector.checkError(errors.Default.Wrap(err, `variables in graphql query can not marshal to json`))
+ return
}
row := &RawData{
Params: collector.params,
@@ -271,12 +291,21 @@ func (collector *GraphqlCollector) fetchAsync(divider *BatchSaveDivider, reqData
}
err = db.Create(row, dal.From(collector.table))
if err != nil {
- panic(err)
+ collector.checkError(errors.Default.Wrap(err, `not created row table in graphql collector`))
+ return
}
- results, err := collector.args.ResponseParser(query, variables)
+ var (
+ results []interface{}
+ )
+ if len(dataErrors) > 0 || collector.args.ResponseParser == nil {
+ results, err = collector.args.ResponseParserWithDataErrors(query, variables, dataErrors)
+ } else {
+ results, err = collector.args.ResponseParser(query, variables)
+ }
if err != nil {
- panic(err)
+ collector.checkError(errors.Default.Wrap(err, `not parsed response in graphql collector`))
+ return
}
RAW_DATA_ORIGIN := "RawDataOrigin"
@@ -285,7 +314,8 @@ func (collector *GraphqlCollector) fetchAsync(divider *BatchSaveDivider, reqData
// get the batch operator for the specific type
batch, err := divider.ForType(reflect.TypeOf(result))
if err != nil {
- panic(err)
+ collector.checkError(err)
+ return
}
// set raw data origin field
origin := reflect.ValueOf(result).Elem().FieldByName(RAW_DATA_ORIGIN)
@@ -299,20 +329,31 @@ func (collector *GraphqlCollector) fetchAsync(divider *BatchSaveDivider, reqData
// records get saved into db when slots were max outed
err = batch.Add(result)
if err != nil {
- panic(err)
+ collector.checkError(err)
+ return
}
collector.args.Ctx.IncProgress(1)
}
- if err != nil {
- panic(err)
- }
collector.args.Ctx.IncProgress(1)
if handler != nil {
err = handler(query)
if err != nil {
- panic(err)
+ collector.checkError(errors.Default.Wrap(err, `handle failed in graphql collector`))
+ return
}
}
}
-var _ core.SubTask = (*ApiCollector)(nil)
+func (collector *GraphqlCollector) checkError(err errors.Error) {
+ if err == nil {
+ return
+ }
+ collector.workerErrors = append(collector.workerErrors, err)
+}
+
+// HasError return if any error occurred
+func (collector *GraphqlCollector) HasError() bool {
+ return len(collector.workerErrors) > 0
+}
+
+var _ core.SubTask = (*GraphqlCollector)(nil)
diff --git a/scripts/compile-plugins.sh b/scripts/compile-plugins.sh
index 36fdd7c7..f14eb74e 100755
--- a/scripts/compile-plugins.sh
+++ b/scripts/compile-plugins.sh
@@ -21,18 +21,21 @@
#
# compile specific plugin and fire up api server:
# PLUGIN=<PLUGIN_NAME> make dev
+# PLUGIN=<PLUGIN_NAME> PLUGIN2=<PLUGIN_NAME2> make dev
#
# compile all plugins and fire up api server in DEBUG MODE with `delve`:
# make debug
#
# compile specific plugin and fire up api server in DEBUG MODE with `delve`:
# PLUGIN=<PLUGIN_NAME> make dev
+# PLUGIN=<PLUGIN_NAME> PLUGIN2=<PLUGIN_NAME> make dev
set -e
echo "Usage: "
echo " build all plugins: $0 [golang build flags...]"
echo " build and keep one plugin only: PLUGIN=jira $0 [golang build flags...]"
+echo " build and keep two plugin only: PLUGIN=jira PLUGIN2=github $0 [golang build flags...]"
SCRIPT_DIR="$( cd "$( dirname "$0" )" && pwd )"
PLUGIN_SRC_DIR=$SCRIPT_DIR/../plugins
@@ -44,6 +47,10 @@ else
PLUGINS=$PLUGIN_SRC_DIR/$PLUGIN
fi
+if [ $PLUGIN ] && [ $PLUGIN2 ]; then
+ PLUGINS="$PLUGINS $PLUGIN_SRC_DIR/$PLUGIN2"
+fi
+
rm -rf $PLUGIN_OUTPUT_DIR/*
PIDS=""