You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@devlake.apache.org by wa...@apache.org on 2022/06/13 06:25:47 UTC
[incubator-devlake] 03/04: fix: worker errors was not caught
This is an automated email from the ASF dual-hosted git repository.
warren pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-devlake.git
commit a677c3f2e6da4e72b7cce36f4d657e156fad1b3f
Author: Klesh Wong <zh...@merico.dev>
AuthorDate: Sat Jun 11 17:03:50 2022 +0800
fix: worker errors was not caught
---
plugins/helper/worker_scheduler.go | 40 +++++++++++++++++++-------------------
1 file changed, 20 insertions(+), 20 deletions(-)
diff --git a/plugins/helper/worker_scheduler.go b/plugins/helper/worker_scheduler.go
index 71d7574e..04a0be47 100644
--- a/plugins/helper/worker_scheduler.go
+++ b/plugins/helper/worker_scheduler.go
@@ -27,7 +27,7 @@ import (
"github.com/apache/incubator-devlake/plugins/core"
"github.com/apache/incubator-devlake/utils"
- "github.com/panjf2000/ants/v2"
+ ants "github.com/panjf2000/ants/v2"
)
// WorkerScheduler runs asynchronous tasks in parallel with throttling support
@@ -105,18 +105,29 @@ func (s *WorkerScheduler) SubmitBlocking(task func() error) {
}
// wait for rate limit throttling
+ // try recover
+ defer func() {
+ r := recover()
+ if r != nil {
+ s.appendError(fmt.Errorf("%s\n%s", r, cf))
+ }
+ }()
+
+ // normal error
var err error
- defer s.gatherError(err, cf)
select {
case <-s.ctx.Done():
err = s.ctx.Err()
case <-s.ticker.C:
- err = task() // nolint
+ err = task()
+ }
+ if err != nil {
+ s.appendError(err)
}
})
// failed to submit, note that this is not task erro
if err != nil {
- s.gatherError(err, cf)
+ s.appendError(fmt.Errorf("%s\n%s", err, cf))
}
}
@@ -128,32 +139,21 @@ func (s *WorkerScheduler) gatherCallFrames() string {
return cf
}
-func (s *WorkerScheduler) gatherError(err error, callframs string) {
- if err == nil {
- r := recover()
- if r != nil {
- err = fmt.Errorf("%s\n%s", r, callframs)
- }
- }
- if err != nil {
- s.mu.Lock()
- defer s.mu.Unlock()
- s.workerErrors = append(s.workerErrors, err)
- }
+func (s *WorkerScheduler) appendError(err error) {
+ s.mu.Lock()
+ defer s.mu.Unlock()
+ s.workerErrors = append(s.workerErrors, err)
}
// NextTick enqueues task in a NonBlocking manner, you should only call this method within task submitted by
// SubmitBlocking method
// IMPORTANT: do NOT call this method with a huge number of tasks, it is likely to eat up all available memory
func (s *WorkerScheduler) NextTick(task func() error) {
- cf := s.gatherCallFrames()
// to make sure task will be enqueued
s.waitGroup.Add(1)
go func() {
- var err error
defer s.waitGroup.Done()
- defer s.gatherError(err, cf)
- err = task() // nolint
+ task() // nolint
}()
}