You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@devlake.apache.org by wa...@apache.org on 2023/01/11 03:39:04 UTC

[incubator-devlake] branch main updated: fix: fix graphql cannot cancel (#4190)

This is an automated email from the ASF dual-hosted git repository.

warren pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-devlake.git


The following commit(s) were added to refs/heads/main by this push:
     new 6ebc789dd fix: fix graphql cannot cancel (#4190)
6ebc789dd is described below

commit 6ebc789dd432703bc57175e590d09426ff82a715
Author: Likyh <ya...@meri.co>
AuthorDate: Wed Jan 11 11:39:00 2023 +0800

    fix: fix graphql cannot cancel (#4190)
    
    * fix: fix graphql cannot cancel
    
    * fix: fix for review
    
    * fix: for review
---
 plugins/helper/graphql_async_client.go | 10 +++++++---
 plugins/helper/graphql_collector.go    | 10 ++++++++--
 2 files changed, 15 insertions(+), 5 deletions(-)

diff --git a/plugins/helper/graphql_async_client.go b/plugins/helper/graphql_async_client.go
index 765bf7d36..8c9ca44e6 100644
--- a/plugins/helper/graphql_async_client.go
+++ b/plugins/helper/graphql_async_client.go
@@ -120,6 +120,7 @@ func (apiClient *GraphqlAsyncClient) updateRateRemaining(rateRemaining int, rese
 		}
 		select {
 		case <-apiClient.ctx.Done():
+			// stop the goroutine when the context is done
 			return
 		case <-time.After(nextDuring):
 			newRateRemaining, newResetAt, err := apiClient.getRateRemaining(apiClient.ctx, apiClient.client, apiClient.logger)
@@ -140,7 +141,7 @@ func (apiClient *GraphqlAsyncClient) SetGetRateCost(getRateCost func(q interface
 // Query sends a graphql request once the lock has been acquired.
 // []graphql.DataError holds the errors returned in the response body;
 // error is any other failure (for example, context cancellation).
-func (apiClient *GraphqlAsyncClient) Query(q interface{}, variables map[string]interface{}) ([]graphql.DataError, errors.Error) {
+func (apiClient *GraphqlAsyncClient) Query(q interface{}, variables map[string]interface{}) ([]graphql.DataError, error) {
 	apiClient.waitGroup.Add(1)
 	defer apiClient.waitGroup.Done()
 	apiClient.mu.Lock()
@@ -163,6 +164,9 @@ func (apiClient *GraphqlAsyncClient) Query(q interface{}, variables map[string]i
 		default:
 			var dataErrors []graphql.DataError
 			dataErrors, err := apiClient.client.Query(apiClient.ctx, q, variables)
+			if err == context.Canceled {
+				return nil, err
+			}
 			if err != nil {
 				apiClient.logger.Warn(err, "retry #%d graphql calling after %ds", retryTime, apiClient.waitBeforeRetry/time.Second)
 				retryTime++
@@ -185,7 +189,7 @@ func (apiClient *GraphqlAsyncClient) Query(q interface{}, variables map[string]i
 }
 
 // NextTick to return the NextTick of scheduler
-func (apiClient *GraphqlAsyncClient) NextTick(task func() errors.Error, taskErrorChecker func(err errors.Error)) {
+func (apiClient *GraphqlAsyncClient) NextTick(task func() errors.Error, taskErrorChecker func(err error)) {
 	// to make sure task will be enqueued
 	apiClient.waitGroup.Add(1)
 	go func() {
@@ -194,7 +198,7 @@ func (apiClient *GraphqlAsyncClient) NextTick(task func() errors.Error, taskErro
 			return
 		default:
 			go func() {
-				// if set waitGroup done here, a serial of goruntine will block until son goruntine finish.
+				// if set waitGroup done here, a series of goroutines will block until the sub-goroutine finishes.
 				// But if done out of this go func, so task will run after waitGroup finish
 				// I have no idea about this now...
 				defer apiClient.waitGroup.Done()
diff --git a/plugins/helper/graphql_collector.go b/plugins/helper/graphql_collector.go
index 277fa3f79..3e59ec476 100644
--- a/plugins/helper/graphql_collector.go
+++ b/plugins/helper/graphql_collector.go
@@ -18,6 +18,7 @@ limitations under the License.
 package helper
 
 import (
+	"context"
 	"encoding/json"
 	"github.com/apache/incubator-devlake/errors"
 	"github.com/apache/incubator-devlake/models/common"
@@ -256,7 +257,12 @@ func (collector *GraphqlCollector) fetchAsync(divider *BatchSaveDivider, reqData
 	logger := collector.args.Ctx.GetLogger()
 	dataErrors, err := collector.args.GraphqlClient.Query(query, variables)
 	if err != nil {
-		collector.checkError(errors.Default.Wrap(err, `graphql query failed`))
+		if err == context.Canceled {
+			// pass the cancellation error through unwrapped, so it can be combined with other errors
+			collector.checkError(err)
+		} else {
+			collector.checkError(errors.Default.Wrap(err, `graphql query failed`))
+		}
 		return
 	}
 	if len(dataErrors) > 0 {
@@ -349,7 +355,7 @@ func (collector *GraphqlCollector) fetchAsync(divider *BatchSaveDivider, reqData
 	}
 }
 
-func (collector *GraphqlCollector) checkError(err errors.Error) {
+func (collector *GraphqlCollector) checkError(err error) {
 	if err == nil {
 		return
 	}