You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@devlake.apache.org by wa...@apache.org on 2022/06/13 06:25:48 UTC

[incubator-devlake] 04/04: fix: worker_scheduler wouldn't halt on error

This is an automated email from the ASF dual-hosted git repository.

warren pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-devlake.git

commit d5140494aa3888c0e166e0aee1ff40f939839386
Author: Klesh Wong <zh...@merico.dev>
AuthorDate: Sat Jun 11 19:23:33 2022 +0800

    fix: worker_scheduler wouldn't halt on error
---
 plugins/helper/api_async_client.go |  6 ++--
 plugins/helper/worker_scheduler.go | 61 ++++++++++++++++----------------------
 2 files changed, 28 insertions(+), 39 deletions(-)

diff --git a/plugins/helper/api_async_client.go b/plugins/helper/api_async_client.go
index fb706512..f29a7c50 100644
--- a/plugins/helper/api_async_client.go
+++ b/plugins/helper/api_async_client.go
@@ -41,7 +41,6 @@ type ApiAsyncClient struct {
 	*ApiClient
 	maxRetry     int
 	scheduler    *WorkerScheduler
-	hasError     bool
 	numOfWorkers int
 }
 
@@ -111,7 +110,6 @@ func CreateAsyncApiClient(
 		apiClient,
 		retry,
 		scheduler,
-		false,
 		numOfWorkers,
 	}, nil
 }
@@ -178,7 +176,7 @@ func (apiClient *ApiAsyncClient) DoAsync(
 		}
 
 		if err != nil {
-			apiClient.hasError = true
+			apiClient.logger.Error("retry exceeded times: %d, err: %s", retry, err.Error())
 			return err
 		}
 
@@ -205,7 +203,7 @@ func (apiClient *ApiAsyncClient) WaitAsync() error {
 }
 
 func (apiClient *ApiAsyncClient) HasError() bool {
-	return apiClient.hasError
+	return apiClient.scheduler.HasError()
 }
 
 func (apiClient *ApiAsyncClient) NextTick(task func() error) {
diff --git a/plugins/helper/worker_scheduler.go b/plugins/helper/worker_scheduler.go
index 04a0be47..2c7c4134 100644
--- a/plugins/helper/worker_scheduler.go
+++ b/plugins/helper/worker_scheduler.go
@@ -20,13 +20,11 @@ package helper
 import (
 	"context"
 	"fmt"
-	"os"
 	"sync"
 	"sync/atomic"
 	"time"
 
 	"github.com/apache/incubator-devlake/plugins/core"
-	"github.com/apache/incubator-devlake/utils"
 	ants "github.com/panjf2000/ants/v2"
 )
 
@@ -42,7 +40,7 @@ type WorkerScheduler struct {
 	logger       core.Logger
 }
 
-var callframeEnabled = os.Getenv("ASYNC_CF") == "true"
+//var callframeEnabled = os.Getenv("ASYNC_CF") == "true"
 
 // NewWorkerScheduler creates a WorkerScheduler
 func NewWorkerScheduler(
@@ -65,9 +63,7 @@ func NewWorkerScheduler(
 		logger: logger,
 	}
 	pool, err := ants.NewPool(workerNum, ants.WithPanicHandler(func(i interface{}) {
-		s.mu.Lock()
-		defer s.mu.Unlock()
-		s.workerErrors = append(s.workerErrors, i.(error))
+		s.checkError(i)
 	}))
 	if err != nil {
 		return nil, err
@@ -84,53 +80,35 @@ func NewWorkerScheduler(
 // IMPORTANT: do NOT call SubmitBlocking inside the async task, it is likely to cause a deadlock, call
 // SubmitNonBlocking instead when number of tasks is relatively small.
 func (s *WorkerScheduler) SubmitBlocking(task func() error) {
-	// this is expensive, enable by EnvVar
-	cf := s.gatherCallFrames()
-	// to make sure task is done
-	if len(s.workerErrors) > 0 {
-		// not point to continue
+	if s.HasError() {
 		return
 	}
 	s.waitGroup.Add(1)
-	err := s.pool.Submit(func() {
+	s.checkError(s.pool.Submit(func() {
 		defer s.waitGroup.Done()
 
 		id := atomic.AddInt32(&s.counter, 1)
 		s.logger.Debug("schedulerJob >>> %d started", id)
 		defer s.logger.Debug("schedulerJob <<< %d ended", id)
 
-		if len(s.workerErrors) > 0 {
-			// not point to continue
+		if s.HasError() {
 			return
 		}
-		// wait for rate limit throttling
-
-		// try recover
-		defer func() {
-			r := recover()
-			if r != nil {
-				s.appendError(fmt.Errorf("%s\n%s", r, cf))
-			}
-		}()
 
 		// normal error
-		var err error
 		select {
 		case <-s.ctx.Done():
-			err = s.ctx.Err()
+			panic(s.ctx.Err())
 		case <-s.ticker.C:
-			err = task()
-		}
-		if err != nil {
-			s.appendError(err)
+			err := task()
+			if err != nil {
+				panic(err)
+			}
 		}
-	})
-	// failed to submit, note that this is not task erro
-	if err != nil {
-		s.appendError(fmt.Errorf("%s\n%s", err, cf))
-	}
+	}))
 }
 
+/*
 func (s *WorkerScheduler) gatherCallFrames() string {
 	cf := "set Environment Varaible ASYNC_CF=true to enable callframes capturing"
 	if callframeEnabled {
@@ -138,6 +116,7 @@ func (s *WorkerScheduler) gatherCallFrames() string {
 	}
 	return cf
 }
+*/
 
 func (s *WorkerScheduler) appendError(err error) {
 	s.mu.Lock()
@@ -145,6 +124,18 @@ func (s *WorkerScheduler) appendError(err error) {
 	s.workerErrors = append(s.workerErrors, err)
 }
 
+func (s *WorkerScheduler) checkError(err interface{}) {
+	if err == nil {
+		return
+	}
+	s.appendError(err.(error))
+}
+
+// HasError reports whether any error has occurred
+func (s *WorkerScheduler) HasError() bool {
+	return len(s.workerErrors) > 0
+}
+
 // NextTick enqueues task in a NonBlocking manner, you should only call this method within task submitted by
 // SubmitBlocking method
 // IMPORTANT: do NOT call this method with a huge number of tasks, it is likely to eat up all available memory
@@ -153,7 +144,7 @@ func (s *WorkerScheduler) NextTick(task func() error) {
 	s.waitGroup.Add(1)
 	go func() {
 		defer s.waitGroup.Done()
-		task() // nolint
+		s.checkError(task())
 	}()
 }