You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@devlake.apache.org by ab...@apache.org on 2022/11/01 07:35:22 UTC

[incubator-devlake] branch release-v0.14 updated: cherry-pick some github & graphql fix to 0.14 (#3629)

This is an automated email from the ASF dual-hosted git repository.

abeizn pushed a commit to branch release-v0.14
in repository https://gitbox.apache.org/repos/asf/incubator-devlake.git


The following commit(s) were added to refs/heads/release-v0.14 by this push:
     new 46fed529 cherry-pick some github & graphql fix to 0.14 (#3629)
46fed529 is described below

commit 46fed5293b0fc058e52e3cfa3903dcbfab3cd93d
Author: Likyh <l...@likyh.com>
AuthorDate: Tue Nov 1 15:35:18 2022 +0800

    cherry-pick some github & graphql fix to 0.14 (#3629)
    
    * fix: change the dealing type for error
    
    * feat: add close method to finish rateLimit timer
    
    * feat: only sync updated github user to domain layer
    
    * feat: support 2 plugin in make dev
    
    * fix: replace \0 with '<0x00>' in github issue body
    
    * feat: add retry logic for graphql
    
    * feat: deal data errors in github graphql users collector
    
    * fix: fix the bug where the first user in each page was dropped
    
    Co-authored-by: linyh <ya...@meri.co>
    
    * fix: fix e2e and linter (#3627)
    
    Co-authored-by: linyh <ya...@meri.co>
    
    Co-authored-by: linyh <ya...@meri.co>
---
 go.mod                                             |   2 +-
 go.sum                                             |   2 +
 plugins/github/e2e/account_test.go                 |   3 +
 .../snapshot_tables/_tool_github_repo_accounts.csv |  16 +++
 plugins/github/e2e/snapshot_tables/account.csv     |   8 --
 plugins/github/tasks/account_convertor.go          |  14 +-
 plugins/github_graphql/plugin_main.go              |  21 ++-
 plugins/github_graphql/tasks/account_collector.go  |  13 +-
 plugins/github_graphql/tasks/issue_collector.go    |   3 +-
 plugins/helper/graphql_async_client.go             | 149 ++++++++++++++-------
 plugins/helper/graphql_collector.go                | 115 +++++++++++-----
 scripts/compile-plugins.sh                         |   7 +
 12 files changed, 247 insertions(+), 106 deletions(-)

diff --git a/go.mod b/go.mod
index 0fc69e61..96192cd9 100644
--- a/go.mod
+++ b/go.mod
@@ -15,7 +15,7 @@ require (
 	github.com/libgit2/git2go/v33 v33.0.6
 	github.com/magiconair/properties v1.8.5
 	github.com/manifoldco/promptui v0.9.0
-	github.com/merico-dev/graphql v0.0.0-20220804061427-a2245fa66df2
+	github.com/merico-dev/graphql v0.0.0-20221027131946-77460a1fd4cd
 	github.com/mitchellh/mapstructure v1.4.1
 	github.com/panjf2000/ants/v2 v2.4.6
 	github.com/robfig/cron/v3 v3.0.0
diff --git a/go.sum b/go.sum
index 1d105828..8f17cd22 100644
--- a/go.sum
+++ b/go.sum
@@ -542,6 +542,8 @@ github.com/mediocregopher/radix/v3 v3.3.0/go.mod h1:EmfVyvspXz1uZEyPBMyGK+kjWiKQ
 github.com/mediocregopher/radix/v3 v3.4.2/go.mod h1:8FL3F6UQRXHXIBSPUs5h0RybMF8i4n7wVopoX3x7Bv8=
 github.com/merico-dev/graphql v0.0.0-20220804061427-a2245fa66df2 h1:sOXuZIg3OwBnvJFfIuO8wegiLpeDCOSVvk2dsbjurd8=
 github.com/merico-dev/graphql v0.0.0-20220804061427-a2245fa66df2/go.mod h1:dcDqG8HXVtfEhTCipFMa0Q+RTKTtDKIO2vJt+JVzHEQ=
+github.com/merico-dev/graphql v0.0.0-20221027131946-77460a1fd4cd h1:hGQXd4a72JSFIZE+ZVkH5ivE925PGogjob6stgc2too=
+github.com/merico-dev/graphql v0.0.0-20221027131946-77460a1fd4cd/go.mod h1:dcDqG8HXVtfEhTCipFMa0Q+RTKTtDKIO2vJt+JVzHEQ=
 github.com/mgutz/ansi v0.0.0-20200706080929-d51e80ef957d h1:5PJl274Y63IEHC+7izoQE9x6ikvDFZS2mDVS3drnohI=
 github.com/mgutz/ansi v0.0.0-20200706080929-d51e80ef957d/go.mod h1:01TrycV0kFyexm33Z7vhZRXopbI8J3TDReVlkTgMUxE=
 github.com/microcosm-cc/bluemonday v1.0.2/go.mod h1:iVP4YcDBq+n/5fb23BhYFvIMq/leAFZyRl6bYmGDlGc=
diff --git a/plugins/github/e2e/account_test.go b/plugins/github/e2e/account_test.go
index 5619de28..10e8405a 100644
--- a/plugins/github/e2e/account_test.go
+++ b/plugins/github/e2e/account_test.go
@@ -65,6 +65,9 @@ func TestAccountDataFlow(t *testing.T) {
 		IgnoreTypes: []interface{}{common.NoPKModel{}},
 	})
 
+	// ConvertAccountsMeta only convert the account in this repo
+	dataflowTester.ImportCsvIntoTabler("./snapshot_tables/_tool_github_repo_accounts.csv", &models.GithubRepoAccount{})
+
 	// verify converter
 	dataflowTester.FlushTabler(&crossdomain.Account{})
 	dataflowTester.Subtask(tasks.ConvertAccountsMeta, taskData)
diff --git a/plugins/github/e2e/snapshot_tables/_tool_github_repo_accounts.csv b/plugins/github/e2e/snapshot_tables/_tool_github_repo_accounts.csv
new file mode 100644
index 00000000..8b866644
--- /dev/null
+++ b/plugins/github/e2e/snapshot_tables/_tool_github_repo_accounts.csv
@@ -0,0 +1,16 @@
+connection_id,account_id,repo_github_id,login
+1,21979,134018330,appleboy
+1,964542,134018330,sarathsp06
+1,1052632,134018330,runner-mei
+1,3794113,134018330,shanhuhai5739
+1,3971390,134018330,ppmoon
+1,7496278,134018330,panjf2000
+1,8518239,134018330,gitter-badger
+1,11763614,2,Moonlight-Zhao
+1,12420699,2,shanghai-Jerry
+1,14950473,2,zqkgo
+1,22429695,2,codecov[bot]
+1,24841832,2,rikewang
+1,31087327,2,chensanle
+1,32893410,2,zhangyuanxue
+1,38849208,2,king526
\ No newline at end of file
diff --git a/plugins/github/e2e/snapshot_tables/account.csv b/plugins/github/e2e/snapshot_tables/account.csv
index adf6a806..474f1575 100644
--- a/plugins/github/e2e/snapshot_tables/account.csv
+++ b/plugins/github/e2e/snapshot_tables/account.csv
@@ -1,15 +1,7 @@
 id,email,full_name,user_name,avatar_url,organization,created_date,status
 github:GithubAccount:1:1052632,runner.mei@,runner,runner-mei,https://avatars.githubusercontent.com/u/1052632?v=4,,,0
-github:GithubAccount:1:11763614,zhaozh90@163.com,Jerry,Moonlight-Zhao,https://avatars.githubusercontent.com/u/11763614?v=4,,,0
-github:GithubAccount:1:12420699,,Jerry You,shanghai-Jerry,https://avatars.githubusercontent.com/u/12420699?v=4,,,0
-github:GithubAccount:1:14950473,,Z.Q.K,zqkgo,https://avatars.githubusercontent.com/u/14950473?v=4,ankiband,,0
 github:GithubAccount:1:21979,appleboy.tw@gmail.com,Bo-Yi Wu,appleboy,https://avatars.githubusercontent.com/u/21979?v=4,"COSCUP,nodejs-tw,moztw,h5bp,CodeIgniter-TW,drone,Getmore,golangtw,laravel-taiwan,go-xorm,gin-gonic,PHPConf-TW,Mediatek-Cloud,SJFinder,go-gitea,laradock,gin-contrib,tagfans,maintainers,go-training,go-ggz,the-benchmarker,golang-queue",,0
-github:GithubAccount:1:22429695,,,codecov[bot],https://avatars.githubusercontent.com/in/254?v=4,,,0
-github:GithubAccount:1:24841832,,,rikewang,https://avatars.githubusercontent.com/u/24841832?v=4,,,0
-github:GithubAccount:1:31087327,,sanle,chensanle,https://avatars.githubusercontent.com/u/31087327?v=4,,,0
-github:GithubAccount:1:32893410,,zyx,zhangyuanxue,https://avatars.githubusercontent.com/u/32893410?v=4,,,0
 github:GithubAccount:1:3794113,shanhu5739@gmail.com,Derek,shanhuhai5739,https://avatars.githubusercontent.com/u/3794113?v=4,,,0
-github:GithubAccount:1:38849208,,,king526,https://avatars.githubusercontent.com/u/38849208?v=4,,,0
 github:GithubAccount:1:3971390,cnliuyunpeng@gmail.com,ppmoon,ppmoon,https://avatars.githubusercontent.com/u/3971390?v=4,,,0
 github:GithubAccount:1:7496278,i@andypan.me,Andy Pan,panjf2000,https://avatars.githubusercontent.com/u/7496278?v=4,,,0
 github:GithubAccount:1:8518239,badger@gitter.im,The Gitter Badger,gitter-badger,https://avatars.githubusercontent.com/u/8518239?v=4,,,0
diff --git a/plugins/github/tasks/account_convertor.go b/plugins/github/tasks/account_convertor.go
index e8643226..328de4a0 100644
--- a/plugins/github/tasks/account_convertor.go
+++ b/plugins/github/tasks/account_convertor.go
@@ -52,7 +52,19 @@ func ConvertAccounts(taskCtx core.SubTaskContext) errors.Error {
 	db := taskCtx.GetDal()
 	data := taskCtx.GetData().(*GithubTaskData)
 
-	cursor, err := db.Cursor(dal.From(&githubModels.GithubAccount{}), dal.Where("connection_id = ?", data.Options.ConnectionId))
+	cursor, err := db.Cursor(
+		dal.Select("_tool_github_accounts.*"),
+		dal.From(&githubModels.GithubAccount{}),
+		dal.Where(
+			"repo_github_id = ? and _tool_github_accounts.connection_id=?",
+			data.Repo.GithubId,
+			data.Options.ConnectionId,
+		),
+		dal.Join(`left join _tool_github_repo_accounts gra on (
+			_tool_github_accounts.connection_id = gra.connection_id
+			AND _tool_github_accounts.id = gra.account_id
+		)`),
+	)
 	if err != nil {
 		return err
 	}
diff --git a/plugins/github_graphql/plugin_main.go b/plugins/github_graphql/plugin_main.go
index 493c1f59..a2a8b84a 100644
--- a/plugins/github_graphql/plugin_main.go
+++ b/plugins/github_graphql/plugin_main.go
@@ -19,6 +19,7 @@ package main
 
 import (
 	"context"
+	"fmt"
 	"github.com/apache/incubator-devlake/errors"
 	"github.com/apache/incubator-devlake/plugins/core"
 	"github.com/apache/incubator-devlake/plugins/github/models"
@@ -128,17 +129,23 @@ func (plugin GithubGraphql) PrepareTaskData(taskCtx core.TaskContext, options ma
 	)
 	httpClient := oauth2.NewClient(taskCtx.GetContext(), src)
 	client := graphql.NewClient(connection.Endpoint+`graphql`, httpClient)
-	graphqlClient := helper.CreateAsyncGraphqlClient(taskCtx.GetContext(), client, taskCtx.GetLogger(),
+	graphqlClient, err := helper.CreateAsyncGraphqlClient(taskCtx, client, taskCtx.GetLogger(),
 		func(ctx context.Context, client *graphql.Client, logger core.Logger) (rateRemaining int, resetAt *time.Time, err errors.Error) {
 			var query GraphQueryRateLimit
-			err = errors.Convert(client.Query(taskCtx.GetContext(), &query, nil))
+			dataErrors, err := errors.Convert01(client.Query(taskCtx.GetContext(), &query, nil))
 			if err != nil {
 				return 0, nil, err
 			}
+			if len(dataErrors) > 0 {
+				return 0, nil, errors.Default.Wrap(dataErrors[0], `query rate limit fail`)
+			}
 			logger.Info(`github graphql init success with remaining %d/%d and will reset at %s`,
 				query.RateLimit.Remaining, query.RateLimit.Limit, query.RateLimit.ResetAt)
 			return int(query.RateLimit.Remaining), &query.RateLimit.ResetAt, nil
 		})
+	if err != nil {
+		return nil, err
+	}
 
 	graphqlClient.SetGetRateCost(func(q interface{}) int {
 		v := reflect.ValueOf(q)
@@ -166,6 +173,16 @@ func (plugin GithubGraphql) ApiResources() map[string]map[string]core.ApiResourc
 	return nil
 }
 
+func (plugin GithubGraphql) Close(taskCtx core.TaskContext) errors.Error {
+	data, ok := taskCtx.GetData().(*githubTasks.GithubTaskData)
+	if !ok {
+		return errors.Default.New(fmt.Sprintf("GetData failed when try to close %+v", taskCtx))
+	}
+	data.ApiClient.Release()
+	data.GraphqlClient.Release()
+	return nil
+}
+
 // standalone mode for debugging
 func main() {
 	cmd := &cobra.Command{Use: "githubGraphql"}
diff --git a/plugins/github_graphql/tasks/account_collector.go b/plugins/github_graphql/tasks/account_collector.go
index d095cdd7..89e05df8 100644
--- a/plugins/github_graphql/tasks/account_collector.go
+++ b/plugins/github_graphql/tasks/account_collector.go
@@ -96,10 +96,9 @@ func CollectAccount(taskCtx core.SubTaskContext) errors.Error {
 			},
 			Table: RAW_ACCOUNTS_TABLE,
 		},
-		IgnoreQueryErr: true,
-		Input:          iterator,
-		InputStep:      100,
-		GraphqlClient:  data.GraphqlClient,
+		Input:         iterator,
+		InputStep:     100,
+		GraphqlClient: data.GraphqlClient,
 		BuildQuery: func(reqData *helper.GraphqlRequestData) (interface{}, map[string]interface{}, error) {
 			accounts := reqData.Input.([]interface{})
 			query := &GraphqlQueryAccountWrapper{}
@@ -115,7 +114,11 @@ func CollectAccount(taskCtx core.SubTaskContext) errors.Error {
 			}
 			return query, variables, nil
 		},
-		ResponseParser: func(iQuery interface{}, variables map[string]interface{}) ([]interface{}, error) {
+		ResponseParserWithDataErrors: func(iQuery interface{}, variables map[string]interface{}, dataErrors []graphql.DataError) ([]interface{}, error) {
+			for _, dataError := range dataErrors {
+				// log and ignore
+				taskCtx.GetLogger().Warn(dataError, `query user get error but ignore`)
+			}
 			query := iQuery.(*GraphqlQueryAccountWrapper)
 			accounts := query.Users
 
diff --git a/plugins/github_graphql/tasks/issue_collector.go b/plugins/github_graphql/tasks/issue_collector.go
index f54e4cee..9ceb85a3 100644
--- a/plugins/github_graphql/tasks/issue_collector.go
+++ b/plugins/github_graphql/tasks/issue_collector.go
@@ -25,6 +25,7 @@ import (
 	githubTasks "github.com/apache/incubator-devlake/plugins/github/tasks"
 	"github.com/apache/incubator-devlake/plugins/helper"
 	"github.com/merico-dev/graphql"
+	"strings"
 	"time"
 )
 
@@ -170,7 +171,7 @@ func convertGithubIssue(issue GraphqlQueryIssue, connectionId uint64, repository
 		Number:          issue.Number,
 		State:           issue.State,
 		Title:           issue.Title,
-		Body:            issue.Body,
+		Body:            strings.ReplaceAll(issue.Body, "\x00", `<0x00>`),
 		Url:             issue.Url,
 		ClosedAt:        issue.ClosedAt,
 		GithubCreatedAt: issue.CreatedAt,
diff --git a/plugins/helper/graphql_async_client.go b/plugins/helper/graphql_async_client.go
index fcd46aef..765bf7d3 100644
--- a/plugins/helper/graphql_async_client.go
+++ b/plugins/helper/graphql_async_client.go
@@ -19,23 +19,27 @@ package helper
 
 import (
 	"context"
+	"fmt"
+	"sync"
+	"time"
+
 	"github.com/apache/incubator-devlake/errors"
 	"github.com/apache/incubator-devlake/plugins/core"
+	"github.com/apache/incubator-devlake/utils"
 	"github.com/merico-dev/graphql"
-	"sync"
-	"time"
 )
 
 // GraphqlAsyncClient send graphql one by one
 type GraphqlAsyncClient struct {
-	ctx          context.Context
-	cancel       context.CancelFunc
-	client       *graphql.Client
-	logger       core.Logger
-	mu           sync.Mutex
-	waitGroup    sync.WaitGroup
-	workerErrors []error
+	ctx       context.Context
+	cancel    context.CancelFunc
+	client    *graphql.Client
+	logger    core.Logger
+	mu        sync.Mutex
+	waitGroup sync.WaitGroup
 
+	maxRetry         int
+	waitBeforeRetry  time.Duration
 	rateExhaustCond  *sync.Cond
 	rateRemaining    int
 	getRateRemaining func(context.Context, *graphql.Client, core.Logger) (rateRemaining int, resetAt *time.Time, err errors.Error)
@@ -44,12 +48,12 @@ type GraphqlAsyncClient struct {
 
 // CreateAsyncGraphqlClient creates a new GraphqlAsyncClient
 func CreateAsyncGraphqlClient(
-	ctx context.Context,
+	taskCtx core.TaskContext,
 	graphqlClient *graphql.Client,
 	logger core.Logger,
 	getRateRemaining func(context.Context, *graphql.Client, core.Logger) (rateRemaining int, resetAt *time.Time, err errors.Error),
-) *GraphqlAsyncClient {
-	ctxWithCancel, cancel := context.WithCancel(ctx)
+) (*GraphqlAsyncClient, errors.Error) {
+	ctxWithCancel, cancel := context.WithCancel(taskCtx.GetContext())
 	graphqlAsyncClient := &GraphqlAsyncClient{
 		ctx:              ctxWithCancel,
 		cancel:           cancel,
@@ -59,14 +63,48 @@ func CreateAsyncGraphqlClient(
 		rateRemaining:    0,
 		getRateRemaining: getRateRemaining,
 	}
+
 	if getRateRemaining != nil {
-		rateRemaining, resetAt, err := getRateRemaining(ctx, graphqlClient, logger)
+		rateRemaining, resetAt, err := getRateRemaining(taskCtx.GetContext(), graphqlClient, logger)
 		if err != nil {
 			panic(err)
 		}
 		graphqlAsyncClient.updateRateRemaining(rateRemaining, resetAt)
 	}
-	return graphqlAsyncClient
+
+	// load retry/timeout from configuration
+	// use API_RETRY as max retry time
+	// use API_TIMEOUT as the wait time before each retry, so the previous request has time to finish
+	timeout := 30 * time.Second
+	retry, err := utils.StrToIntOr(taskCtx.GetConfig("API_RETRY"), 3)
+	if err != nil {
+		return nil, errors.BadInput.Wrap(err, "failed to parse API_RETRY")
+	}
+	timeoutConf := taskCtx.GetConfig("API_TIMEOUT")
+	if timeoutConf != "" {
+		// override timeout value if API_TIMEOUT is provided
+		timeout, err = errors.Convert01(time.ParseDuration(timeoutConf))
+		if err != nil {
+			return nil, errors.BadInput.Wrap(err, "failed to parse API_TIMEOUT")
+		}
+	}
+	graphqlAsyncClient.SetMaxRetry(retry, timeout)
+
+	return graphqlAsyncClient, nil
+}
+
+// GetMaxRetry returns the maximum retry attempts for a request
+func (apiClient *GraphqlAsyncClient) GetMaxRetry() (int, time.Duration) {
+	return apiClient.maxRetry, apiClient.waitBeforeRetry
+}
+
+// SetMaxRetry sets the maximum retry attempts for a request
+func (apiClient *GraphqlAsyncClient) SetMaxRetry(
+	maxRetry int,
+	waitBeforeRetry time.Duration,
+) {
+	apiClient.maxRetry = maxRetry
+	apiClient.waitBeforeRetry = waitBeforeRetry
 }
 
 // updateRateRemaining call getRateRemaining to update rateRemaining periodically
@@ -80,12 +118,16 @@ func (apiClient *GraphqlAsyncClient) updateRateRemaining(rateRemaining int, rese
 		if resetAt != nil && resetAt.After(time.Now()) {
 			nextDuring = time.Until(*resetAt)
 		}
-		<-time.After(nextDuring)
-		newRateRemaining, newResetAt, err := apiClient.getRateRemaining(apiClient.ctx, apiClient.client, apiClient.logger)
-		if err != nil {
-			panic(err)
+		select {
+		case <-apiClient.ctx.Done():
+			return
+		case <-time.After(nextDuring):
+			newRateRemaining, newResetAt, err := apiClient.getRateRemaining(apiClient.ctx, apiClient.client, apiClient.logger)
+			if err != nil {
+				panic(err)
+			}
+			apiClient.updateRateRemaining(newRateRemaining, newResetAt)
 		}
-		apiClient.updateRateRemaining(newRateRemaining, newResetAt)
 	}()
 }
 
@@ -96,7 +138,9 @@ func (apiClient *GraphqlAsyncClient) SetGetRateCost(getRateCost func(q interface
 }
 
 // Query send a graphql request when get lock
-func (apiClient *GraphqlAsyncClient) Query(q interface{}, variables map[string]interface{}) errors.Error {
+// []graphql.DataError are the errors returned in response body
+// errors.Error is other error
+func (apiClient *GraphqlAsyncClient) Query(q interface{}, variables map[string]interface{}) ([]graphql.DataError, errors.Error) {
 	apiClient.waitGroup.Add(1)
 	defer apiClient.waitGroup.Done()
 	apiClient.mu.Lock()
@@ -108,26 +152,40 @@ func (apiClient *GraphqlAsyncClient) Query(q interface{}, variables map[string]i
 		apiClient.logger.Info(`rate limit remaining exhausted, waiting for next period.`)
 		apiClient.rateExhaustCond.Wait()
 	}
-	select {
-	case <-apiClient.ctx.Done():
-		return nil
-	default:
-		err := apiClient.client.Query(apiClient.ctx, q, variables)
-		if err != nil {
-			return errors.Default.Wrap(err, "error making GraphQL call")
-		}
-		cost := 1
-		if apiClient.getRateCost != nil {
-			cost = apiClient.getRateCost(q)
+
+	retryTime := 0
+	var err error
+	//  if it needs retry, check and retry
+	for retryTime < apiClient.maxRetry {
+		select {
+		case <-apiClient.ctx.Done():
+			return nil, nil
+		default:
+			var dataErrors []graphql.DataError
+			dataErrors, err := apiClient.client.Query(apiClient.ctx, q, variables)
+			if err != nil {
+				apiClient.logger.Warn(err, "retry #%d graphql calling after %ds", retryTime, apiClient.waitBeforeRetry/time.Second)
+				retryTime++
+				<-time.After(apiClient.waitBeforeRetry)
+				continue
+			}
+			if dataErrors != nil {
+				return dataErrors, nil
+			}
+			cost := 1
+			if apiClient.getRateCost != nil {
+				cost = apiClient.getRateCost(q)
+			}
+			apiClient.rateRemaining -= cost
+			apiClient.logger.Debug(`query cost %d in %v`, cost, variables)
+			return nil, nil
 		}
-		apiClient.rateRemaining -= cost
-		apiClient.logger.Debug(`query cost %d in %v`, cost, variables)
-		return nil
 	}
+	return nil, errors.Default.Wrap(err, fmt.Sprintf("got error when querying GraphQL (from the %dth retry)", retryTime))
 }
 
 // NextTick to return the NextTick of scheduler
-func (apiClient *GraphqlAsyncClient) NextTick(task func() errors.Error) {
+func (apiClient *GraphqlAsyncClient) NextTick(task func() errors.Error, taskErrorChecker func(err errors.Error)) {
 	// to make sure task will be enqueued
 	apiClient.waitGroup.Add(1)
 	go func() {
@@ -140,29 +198,18 @@ func (apiClient *GraphqlAsyncClient) NextTick(task func() errors.Error) {
 				// But if done out of this go func, so task will run after waitGroup finish
 				// I have no idea about this now...
 				defer apiClient.waitGroup.Done()
-				apiClient.checkError(task())
+				taskErrorChecker(task())
 			}()
 		}
 	}()
 }
 
 // Wait blocks until all async requests were done
-func (apiClient *GraphqlAsyncClient) Wait() errors.Error {
+func (apiClient *GraphqlAsyncClient) Wait() {
 	apiClient.waitGroup.Wait()
-	if len(apiClient.workerErrors) > 0 {
-		return errors.Default.Combine(apiClient.workerErrors)
-	}
-	return nil
-}
-
-func (apiClient *GraphqlAsyncClient) checkError(err errors.Error) {
-	if err == nil {
-		return
-	}
-	apiClient.workerErrors = append(apiClient.workerErrors, err)
 }
 
-// HasError return if any error occurred
-func (apiClient *GraphqlAsyncClient) HasError() bool {
-	return len(apiClient.workerErrors) > 0
+// Release cancels the GraphqlAsyncClient's context, which stops the periodic rate-limit refresh
+func (apiClient *GraphqlAsyncClient) Release() {
+	apiClient.cancel()
 }
diff --git a/plugins/helper/graphql_collector.go b/plugins/helper/graphql_collector.go
index 07999394..1091e199 100644
--- a/plugins/helper/graphql_collector.go
+++ b/plugins/helper/graphql_collector.go
@@ -65,18 +65,20 @@ type GraphqlCollectorArgs struct {
 	Input Iterator
 	// how many times fetched from input, default 1 means only fetch once
 	// NOTICE: InputStep=1 will fill value as item and InputStep>1 will fill value as []item
-	InputStep      int
-	IgnoreQueryErr bool
+	InputStep int
 	// GetPageInfo is to tell `GraphqlCollector` is page information
-	GetPageInfo    func(query interface{}, args *GraphqlCollectorArgs) (*GraphqlQueryPageInfo, error)
-	BatchSize      int
-	ResponseParser func(query interface{}, variables map[string]interface{}) ([]interface{}, error)
+	GetPageInfo func(query interface{}, args *GraphqlCollectorArgs) (*GraphqlQueryPageInfo, error)
+	BatchSize   int
+	// one of ResponseParser and ResponseParserWithDataErrors is required to parse response
+	ResponseParser               func(query interface{}, variables map[string]interface{}) ([]interface{}, error)
+	ResponseParserWithDataErrors func(query interface{}, variables map[string]interface{}, dataErrors []graphql.DataError) ([]interface{}, error)
 }
 
 // GraphqlCollector help you collect data from Graphql services
 type GraphqlCollector struct {
 	*RawDataSubTask
-	args *GraphqlCollectorArgs
+	args         *GraphqlCollectorArgs
+	workerErrors []error
 }
 
 // NewGraphqlCollector allocates a new GraphqlCollector with the given args.
@@ -94,8 +96,8 @@ func NewGraphqlCollector(args GraphqlCollectorArgs) (*GraphqlCollector, errors.E
 	if args.GraphqlClient == nil {
 		return nil, errors.Default.New("ApiClient is required")
 	}
-	if args.ResponseParser == nil {
-		return nil, errors.Default.New("ResponseParser is required")
+	if args.ResponseParser == nil && args.ResponseParserWithDataErrors == nil {
+		return nil, errors.Default.New("one of ResponseParser and ResponseParserWithDataErrors is required")
 	}
 	apicllector := &GraphqlCollector{
 		RawDataSubTask: rawDataSubTask,
@@ -143,23 +145,30 @@ func (collector *GraphqlCollector) Execute() errors.Error {
 	if collector.args.Input != nil {
 		iterator := collector.args.Input
 		defer iterator.Close()
-		apiClient := collector.args.GraphqlClient
-		for iterator.HasNext() && !apiClient.HasError() {
-			if collector.args.InputStep == 1 {
+		// the comment about difference is written at GraphqlCollectorArgs.InputStep
+		if collector.args.InputStep == 1 {
+			for iterator.HasNext() && !collector.HasError() {
 				input, err := iterator.Fetch()
 				if err != nil {
+					collector.checkError(err)
 					break
 				}
 				collector.exec(divider, input)
-			} else {
+			}
+		} else {
+			for !collector.HasError() {
 				var inputs []interface{}
 				for i := 0; i < collector.args.InputStep && iterator.HasNext(); i++ {
 					input, err := iterator.Fetch()
 					if err != nil {
+						collector.checkError(err)
 						break
 					}
 					inputs = append(inputs, input)
 				}
+				if inputs == nil {
+					break
+				}
 				collector.exec(divider, inputs)
 			}
 		}
@@ -169,23 +178,25 @@ func (collector *GraphqlCollector) Execute() errors.Error {
 	}
 
 	logger.Debug("wait for all async api to finished")
+	collector.args.GraphqlClient.Wait()
 
-	err = collector.args.GraphqlClient.Wait()
-	if err != nil {
-		err = errors.Default.Wrap(err, "ended API collector execution with error")
-		logger.Error(err, "")
+	if collector.HasError() {
+		err = errors.Default.Combine(collector.workerErrors)
+		logger.Error(err, "ended Graphql collector execution with error")
+		logger.Error(collector.workerErrors[0], "the first error of them")
+		return err
 	} else {
 		logger.Info("ended api collection without error")
 	}
-	err = divider.Close()
 
+	err = divider.Close()
 	return err
 }
 
 func (collector *GraphqlCollector) exec(divider *BatchSaveDivider, input interface{}) {
 	inputJson, err := json.Marshal(input)
 	if err != nil {
-		panic(err)
+		collector.checkError(errors.Default.Wrap(err, `input can not be marshal to json`))
 	}
 	reqData := new(GraphqlRequestData)
 	reqData.Input = input
@@ -210,6 +221,9 @@ func (collector *GraphqlCollector) fetchOneByOne(divider *BatchSaveDivider, reqD
 		if err != nil {
 			return errors.Default.Wrap(err, "fetchPagesDetermined get totalPages failed")
 		}
+		if pageInfo == nil {
+			return errors.Default.New("fetchPagesDetermined got pageInfo is nil")
+		}
 		if pageInfo.HasNextPage {
 			collector.args.GraphqlClient.NextTick(func() errors.Error {
 				reqDataTemp := &GraphqlRequestData{
@@ -222,7 +236,7 @@ func (collector *GraphqlCollector) fetchOneByOne(divider *BatchSaveDivider, reqD
 				}
 				collector.fetchAsync(divider, reqDataTemp, fetchNextPage)
 				return nil
-			})
+			}, collector.checkError)
 		}
 		return nil
 	}
@@ -238,30 +252,36 @@ func (collector *GraphqlCollector) fetchAsync(divider *BatchSaveDivider, reqData
 	}
 	query, variables, err := collector.args.BuildQuery(reqData)
 	if err != nil {
-		panic(err)
+		collector.checkError(errors.Default.Wrap(err, `graphql collector BuildQuery failed`))
+		return
 	}
 
 	logger := collector.args.Ctx.GetLogger()
-	err = collector.args.GraphqlClient.Query(query, variables)
+	dataErrors, err := collector.args.GraphqlClient.Query(query, variables)
 	if err != nil {
-		if collector.args.IgnoreQueryErr {
-			logger.Error(err, "fetchAsync failed")
+		collector.checkError(errors.Default.Wrap(err, `graphql query failed`))
+		return
+	}
+	if len(dataErrors) > 0 {
+		if collector.args.ResponseParserWithDataErrors == nil {
+			collector.checkError(errors.Default.Wrap(err, `graphql query got error`))
 			return
-		} else {
-			panic(err)
 		}
+		// else: error will deal by ResponseParserWithDataErrors
 	}
 	defer logger.Debug("fetchAsync >>> done for %v %v", query, variables)
 
 	paramsBytes, err := json.Marshal(query)
 	if err != nil {
-		panic(err)
+		collector.checkError(errors.Default.Wrap(err, `graphql collector marshal query failed`))
+		return
 	}
 	db := collector.args.Ctx.GetDal()
 	queryStr, _ := graphql.ConstructQuery(query, variables)
 	variablesJson, err := json.Marshal(variables)
 	if err != nil {
-		panic(err)
+		collector.checkError(errors.Default.Wrap(err, `variables in graphql query can not marshal to json`))
+		return
 	}
 	row := &RawData{
 		Params: collector.params,
@@ -271,12 +291,21 @@ func (collector *GraphqlCollector) fetchAsync(divider *BatchSaveDivider, reqData
 	}
 	err = db.Create(row, dal.From(collector.table))
 	if err != nil {
-		panic(err)
+		collector.checkError(errors.Default.Wrap(err, `not created row table in graphql collector`))
+		return
 	}
 
-	results, err := collector.args.ResponseParser(query, variables)
+	var (
+		results []interface{}
+	)
+	if len(dataErrors) > 0 || collector.args.ResponseParser == nil {
+		results, err = collector.args.ResponseParserWithDataErrors(query, variables, dataErrors)
+	} else {
+		results, err = collector.args.ResponseParser(query, variables)
+	}
 	if err != nil {
-		panic(err)
+		collector.checkError(errors.Default.Wrap(err, `not parsed response in graphql collector`))
+		return
 	}
 
 	RAW_DATA_ORIGIN := "RawDataOrigin"
@@ -285,7 +314,8 @@ func (collector *GraphqlCollector) fetchAsync(divider *BatchSaveDivider, reqData
 		// get the batch operator for the specific type
 		batch, err := divider.ForType(reflect.TypeOf(result))
 		if err != nil {
-			panic(err)
+			collector.checkError(err)
+			return
 		}
 		// set raw data origin field
 		origin := reflect.ValueOf(result).Elem().FieldByName(RAW_DATA_ORIGIN)
@@ -299,20 +329,31 @@ func (collector *GraphqlCollector) fetchAsync(divider *BatchSaveDivider, reqData
 		// records get saved into db when slots were max outed
 		err = batch.Add(result)
 		if err != nil {
-			panic(err)
+			collector.checkError(err)
+			return
 		}
 		collector.args.Ctx.IncProgress(1)
 	}
-	if err != nil {
-		panic(err)
-	}
 	collector.args.Ctx.IncProgress(1)
 	if handler != nil {
 		err = handler(query)
 		if err != nil {
-			panic(err)
+			collector.checkError(errors.Default.Wrap(err, `handle failed in graphql collector`))
+			return
 		}
 	}
 }
 
-var _ core.SubTask = (*ApiCollector)(nil)
+func (collector *GraphqlCollector) checkError(err errors.Error) {
+	if err == nil {
+		return
+	}
+	collector.workerErrors = append(collector.workerErrors, err)
+}
+
+// HasError return if any error occurred
+func (collector *GraphqlCollector) HasError() bool {
+	return len(collector.workerErrors) > 0
+}
+
+var _ core.SubTask = (*GraphqlCollector)(nil)
diff --git a/scripts/compile-plugins.sh b/scripts/compile-plugins.sh
index 36fdd7c7..f14eb74e 100755
--- a/scripts/compile-plugins.sh
+++ b/scripts/compile-plugins.sh
@@ -21,18 +21,21 @@
 #
 # compile specific plugin and fire up api server:
 #   PLUGIN=<PLUGIN_NAME> make dev
+#   PLUGIN=<PLUGIN_NAME> PLUGIN2=<PLUGIN_NAME2> make dev
 #
 # compile all plugins and fire up api server in DEBUG MODE with `delve`:
 #   make debug
 #
 # compile specific plugin and fire up api server in DEBUG MODE with `delve`:
 #   PLUGIN=<PLUGIN_NAME> make dev
+#   PLUGIN=<PLUGIN_NAME> PLUGIN2=<PLUGIN_NAME> make dev
 
 set -e
 
 echo "Usage: "
 echo "  build all plugins:              $0 [golang build flags...]"
 echo "  build and keep one plugin only: PLUGIN=jira $0 [golang build flags...]"
+echo "  build and keep two plugin only: PLUGIN=jira PLUGIN2=github $0 [golang build flags...]"
 
 SCRIPT_DIR="$( cd "$( dirname "$0" )" && pwd )"
 PLUGIN_SRC_DIR=$SCRIPT_DIR/../plugins
@@ -44,6 +47,10 @@ else
     PLUGINS=$PLUGIN_SRC_DIR/$PLUGIN
 fi
 
+if [ $PLUGIN ] && [ $PLUGIN2 ]; then
+    PLUGINS="$PLUGINS $PLUGIN_SRC_DIR/$PLUGIN2"
+fi
+
 rm -rf $PLUGIN_OUTPUT_DIR/*
 
 PIDS=""