You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@devlake.apache.org by wa...@apache.org on 2022/06/13 06:25:47 UTC

[incubator-devlake] 03/04: fix: worker errors were not caught

This is an automated email from the ASF dual-hosted git repository.

warren pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-devlake.git

commit a677c3f2e6da4e72b7cce36f4d657e156fad1b3f
Author: Klesh Wong <zh...@merico.dev>
AuthorDate: Sat Jun 11 17:03:50 2022 +0800

    fix: worker errors were not caught
---
 plugins/helper/worker_scheduler.go | 40 +++++++++++++++++++-------------------
 1 file changed, 20 insertions(+), 20 deletions(-)

diff --git a/plugins/helper/worker_scheduler.go b/plugins/helper/worker_scheduler.go
index 71d7574e..04a0be47 100644
--- a/plugins/helper/worker_scheduler.go
+++ b/plugins/helper/worker_scheduler.go
@@ -27,7 +27,7 @@ import (
 
 	"github.com/apache/incubator-devlake/plugins/core"
 	"github.com/apache/incubator-devlake/utils"
-	"github.com/panjf2000/ants/v2"
+	ants "github.com/panjf2000/ants/v2"
 )
 
 // WorkerScheduler runs asynchronous tasks in parallel with throttling support
@@ -105,18 +105,29 @@ func (s *WorkerScheduler) SubmitBlocking(task func() error) {
 		}
 		// wait for rate limit throttling
 
+		// try recover
+		defer func() {
+			r := recover()
+			if r != nil {
+				s.appendError(fmt.Errorf("%s\n%s", r, cf))
+			}
+		}()
+
+		// normal error
 		var err error
-		defer s.gatherError(err, cf)
 		select {
 		case <-s.ctx.Done():
 			err = s.ctx.Err()
 		case <-s.ticker.C:
-			err = task() // nolint
+			err = task()
+		}
+		if err != nil {
+			s.appendError(err)
 		}
 	})
 	// failed to submit, note that this is not task erro
 	if err != nil {
-		s.gatherError(err, cf)
+		s.appendError(fmt.Errorf("%s\n%s", err, cf))
 	}
 }
 
@@ -128,32 +139,21 @@ func (s *WorkerScheduler) gatherCallFrames() string {
 	return cf
 }
 
-func (s *WorkerScheduler) gatherError(err error, callframs string) {
-	if err == nil {
-		r := recover()
-		if r != nil {
-			err = fmt.Errorf("%s\n%s", r, callframs)
-		}
-	}
-	if err != nil {
-		s.mu.Lock()
-		defer s.mu.Unlock()
-		s.workerErrors = append(s.workerErrors, err)
-	}
+func (s *WorkerScheduler) appendError(err error) {
+	s.mu.Lock()
+	defer s.mu.Unlock()
+	s.workerErrors = append(s.workerErrors, err)
 }
 
 // NextTick enqueues task in a NonBlocking manner, you should only call this method within task submitted by
 // SubmitBlocking method
 // IMPORTANT: do NOT call this method with a huge number of tasks, it is likely to eat up all available memory
 func (s *WorkerScheduler) NextTick(task func() error) {
-	cf := s.gatherCallFrames()
 	// to make sure task will be enqueued
 	s.waitGroup.Add(1)
 	go func() {
-		var err error
 		defer s.waitGroup.Done()
-		defer s.gatherError(err, cf)
-		err = task() // nolint
+		task() // nolint
 	}()
 }