You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@devlake.apache.org by wa...@apache.org on 2022/06/13 06:25:48 UTC
[incubator-devlake] 04/04: fix: worker_scheduler wouldn't halt on error
This is an automated email from the ASF dual-hosted git repository.
warren pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-devlake.git
commit d5140494aa3888c0e166e0aee1ff40f939839386
Author: Klesh Wong <zh...@merico.dev>
AuthorDate: Sat Jun 11 19:23:33 2022 +0800
fix: worker_scheduler wouldn't halt on error
---
plugins/helper/api_async_client.go | 6 ++--
plugins/helper/worker_scheduler.go | 61 ++++++++++++++++----------------------
2 files changed, 28 insertions(+), 39 deletions(-)
diff --git a/plugins/helper/api_async_client.go b/plugins/helper/api_async_client.go
index fb706512..f29a7c50 100644
--- a/plugins/helper/api_async_client.go
+++ b/plugins/helper/api_async_client.go
@@ -41,7 +41,6 @@ type ApiAsyncClient struct {
*ApiClient
maxRetry int
scheduler *WorkerScheduler
- hasError bool
numOfWorkers int
}
@@ -111,7 +110,6 @@ func CreateAsyncApiClient(
apiClient,
retry,
scheduler,
- false,
numOfWorkers,
}, nil
}
@@ -178,7 +176,7 @@ func (apiClient *ApiAsyncClient) DoAsync(
}
if err != nil {
- apiClient.hasError = true
+ apiClient.logger.Error("retry exceeded times: %d, err: %s", retry, err.Error())
return err
}
@@ -205,7 +203,7 @@ func (apiClient *ApiAsyncClient) WaitAsync() error {
}
func (apiClient *ApiAsyncClient) HasError() bool {
- return apiClient.hasError
+ return apiClient.scheduler.HasError()
}
func (apiClient *ApiAsyncClient) NextTick(task func() error) {
diff --git a/plugins/helper/worker_scheduler.go b/plugins/helper/worker_scheduler.go
index 04a0be47..2c7c4134 100644
--- a/plugins/helper/worker_scheduler.go
+++ b/plugins/helper/worker_scheduler.go
@@ -20,13 +20,11 @@ package helper
import (
"context"
"fmt"
- "os"
"sync"
"sync/atomic"
"time"
"github.com/apache/incubator-devlake/plugins/core"
- "github.com/apache/incubator-devlake/utils"
ants "github.com/panjf2000/ants/v2"
)
@@ -42,7 +40,7 @@ type WorkerScheduler struct {
logger core.Logger
}
-var callframeEnabled = os.Getenv("ASYNC_CF") == "true"
+//var callframeEnabled = os.Getenv("ASYNC_CF") == "true"
// NewWorkerScheduler creates a WorkerScheduler
func NewWorkerScheduler(
@@ -65,9 +63,7 @@ func NewWorkerScheduler(
logger: logger,
}
pool, err := ants.NewPool(workerNum, ants.WithPanicHandler(func(i interface{}) {
- s.mu.Lock()
- defer s.mu.Unlock()
- s.workerErrors = append(s.workerErrors, i.(error))
+ s.checkError(i)
}))
if err != nil {
return nil, err
@@ -84,53 +80,35 @@ func NewWorkerScheduler(
// IMPORTANT: do NOT call SubmitBlocking inside the async task, it is likely to cause a deadlock, call
// SubmitNonBlocking instead when number of tasks is relatively small.
func (s *WorkerScheduler) SubmitBlocking(task func() error) {
- // this is expensive, enable by EnvVar
- cf := s.gatherCallFrames()
- // to make sure task is done
- if len(s.workerErrors) > 0 {
- // not point to continue
+ if s.HasError() {
return
}
s.waitGroup.Add(1)
- err := s.pool.Submit(func() {
+ s.checkError(s.pool.Submit(func() {
defer s.waitGroup.Done()
id := atomic.AddInt32(&s.counter, 1)
s.logger.Debug("schedulerJob >>> %d started", id)
defer s.logger.Debug("schedulerJob <<< %d ended", id)
- if len(s.workerErrors) > 0 {
- // not point to continue
+ if s.HasError() {
return
}
- // wait for rate limit throttling
-
- // try recover
- defer func() {
- r := recover()
- if r != nil {
- s.appendError(fmt.Errorf("%s\n%s", r, cf))
- }
- }()
// normal error
- var err error
select {
case <-s.ctx.Done():
- err = s.ctx.Err()
+ panic(s.ctx.Err())
case <-s.ticker.C:
- err = task()
- }
- if err != nil {
- s.appendError(err)
+ err := task()
+ if err != nil {
+ panic(err)
+ }
}
- })
- // failed to submit, note that this is not task erro
- if err != nil {
- s.appendError(fmt.Errorf("%s\n%s", err, cf))
- }
+ }))
}
+/*
func (s *WorkerScheduler) gatherCallFrames() string {
cf := "set Environment Varaible ASYNC_CF=true to enable callframes capturing"
if callframeEnabled {
@@ -138,6 +116,7 @@ func (s *WorkerScheduler) gatherCallFrames() string {
}
return cf
}
+*/
func (s *WorkerScheduler) appendError(err error) {
s.mu.Lock()
@@ -145,6 +124,18 @@ func (s *WorkerScheduler) appendError(err error) {
s.workerErrors = append(s.workerErrors, err)
}
+func (s *WorkerScheduler) checkError(err interface{}) {
+ if err == nil {
+ return
+ }
+ s.appendError(err.(error))
+}
+
+// HasError reports whether any error has occurred
+func (s *WorkerScheduler) HasError() bool {
+ return len(s.workerErrors) > 0
+}
+
// NextTick enqueues task in a NonBlocking manner, you should only call this method within task submitted by
// SubmitBlocking method
// IMPORTANT: do NOT call this method with a huge number of tasks, it is likely to eat up all available memory
@@ -153,7 +144,7 @@ func (s *WorkerScheduler) NextTick(task func() error) {
s.waitGroup.Add(1)
go func() {
defer s.waitGroup.Done()
- task() // nolint
+ s.checkError(task())
}()
}