You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@devlake.apache.org by wa...@apache.org on 2023/01/11 03:39:04 UTC
[incubator-devlake] branch main updated: fix: fix graphql cannot cancel (#4190)
This is an automated email from the ASF dual-hosted git repository.
warren pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-devlake.git
The following commit(s) were added to refs/heads/main by this push:
new 6ebc789dd fix: fix graphql cannot cancel (#4190)
6ebc789dd is described below
commit 6ebc789dd432703bc57175e590d09426ff82a715
Author: Likyh <ya...@meri.co>
AuthorDate: Wed Jan 11 11:39:00 2023 +0800
fix: fix graphql cannot cancel (#4190)
* fix: fix graphql cannot cancel
* fix: fix for review
* fix: for review
---
plugins/helper/graphql_async_client.go | 10 +++++++---
plugins/helper/graphql_collector.go | 10 ++++++++--
2 files changed, 15 insertions(+), 5 deletions(-)
diff --git a/plugins/helper/graphql_async_client.go b/plugins/helper/graphql_async_client.go
index 765bf7d36..8c9ca44e6 100644
--- a/plugins/helper/graphql_async_client.go
+++ b/plugins/helper/graphql_async_client.go
@@ -120,6 +120,7 @@ func (apiClient *GraphqlAsyncClient) updateRateRemaining(rateRemaining int, rese
}
select {
case <-apiClient.ctx.Done():
+ // finish go routine when context done
return
case <-time.After(nextDuring):
newRateRemaining, newResetAt, err := apiClient.getRateRemaining(apiClient.ctx, apiClient.client, apiClient.logger)
@@ -140,7 +141,7 @@ func (apiClient *GraphqlAsyncClient) SetGetRateCost(getRateCost func(q interface
// Query send a graphql request when get lock
// []graphql.DataError are the errors returned in response body
// errors.Error is other error
-func (apiClient *GraphqlAsyncClient) Query(q interface{}, variables map[string]interface{}) ([]graphql.DataError, errors.Error) {
+func (apiClient *GraphqlAsyncClient) Query(q interface{}, variables map[string]interface{}) ([]graphql.DataError, error) {
apiClient.waitGroup.Add(1)
defer apiClient.waitGroup.Done()
apiClient.mu.Lock()
@@ -163,6 +164,9 @@ func (apiClient *GraphqlAsyncClient) Query(q interface{}, variables map[string]i
default:
var dataErrors []graphql.DataError
dataErrors, err := apiClient.client.Query(apiClient.ctx, q, variables)
+ if err == context.Canceled {
+ return nil, err
+ }
if err != nil {
apiClient.logger.Warn(err, "retry #%d graphql calling after %ds", retryTime, apiClient.waitBeforeRetry/time.Second)
retryTime++
@@ -185,7 +189,7 @@ func (apiClient *GraphqlAsyncClient) Query(q interface{}, variables map[string]i
}
// NextTick to return the NextTick of scheduler
-func (apiClient *GraphqlAsyncClient) NextTick(task func() errors.Error, taskErrorChecker func(err errors.Error)) {
+func (apiClient *GraphqlAsyncClient) NextTick(task func() errors.Error, taskErrorChecker func(err error)) {
// to make sure task will be enqueued
apiClient.waitGroup.Add(1)
go func() {
@@ -194,7 +198,7 @@ func (apiClient *GraphqlAsyncClient) NextTick(task func() errors.Error, taskErro
return
default:
go func() {
- // if set waitGroup done here, a serial of goruntine will block until son goruntine finish.
+ // if set waitGroup done here, a serial of goroutine will block until sub-goroutine finish.
// But if done out of this go func, so task will run after waitGroup finish
// I have no idea about this now...
defer apiClient.waitGroup.Done()
diff --git a/plugins/helper/graphql_collector.go b/plugins/helper/graphql_collector.go
index 277fa3f79..3e59ec476 100644
--- a/plugins/helper/graphql_collector.go
+++ b/plugins/helper/graphql_collector.go
@@ -18,6 +18,7 @@ limitations under the License.
package helper
import (
+ "context"
"encoding/json"
"github.com/apache/incubator-devlake/errors"
"github.com/apache/incubator-devlake/models/common"
@@ -256,7 +257,12 @@ func (collector *GraphqlCollector) fetchAsync(divider *BatchSaveDivider, reqData
logger := collector.args.Ctx.GetLogger()
dataErrors, err := collector.args.GraphqlClient.Query(query, variables)
if err != nil {
- collector.checkError(errors.Default.Wrap(err, `graphql query failed`))
+ if err == context.Canceled {
+ // direct error message for error combine
+ collector.checkError(err)
+ } else {
+ collector.checkError(errors.Default.Wrap(err, `graphql query failed`))
+ }
return
}
if len(dataErrors) > 0 {
@@ -349,7 +355,7 @@ func (collector *GraphqlCollector) fetchAsync(divider *BatchSaveDivider, reqData
}
}
-func (collector *GraphqlCollector) checkError(err errors.Error) {
+func (collector *GraphqlCollector) checkError(err error) {
if err == nil {
return
}