You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@devlake.apache.org by kl...@apache.org on 2022/09/15 10:14:10 UTC

[incubator-devlake] branch main updated: [fix-2940][framework]: Add lazy caching of http codes for error handling and fix any broken Http error messages (#2984)

This is an automated email from the ASF dual-hosted git repository.

klesh pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-devlake.git


The following commit(s) were added to refs/heads/main by this push:
     new 4302140d [fix-2940][framework]: Add lazy caching of http codes for error handling and fix any broken Http error messages (#2984)
4302140d is described below

commit 4302140db231daa5e9671fad2985179c9fa58423
Author: Keon Amini <ke...@merico.dev>
AuthorDate: Thu Sep 15 05:14:07 2022 -0500

    [fix-2940][framework]: Add lazy caching of http codes for error handling and fix any broken Http error messages (#2984)
    
    * fix: add lazy caching of http codes for error handling
    
    * fix: uplift github test action go version to get build running (also see #2908)
    
    * fix: github workflow adjustments
    
    * refactor: reworked parts of the error interface to handle combining errors better + addressed the issue in #3031
    
    * fix: fixed some error messages
    
    * fix: restored error-400 default error behavior in plugin router
    
    * fix: removed redundant function call
    
    * fix: fixed some post-merge buggy behavior
    
    * refactor: removed pre-merge user-message based implementation
    
    * refactor: minor naming changes
---
 api/router.go                          |   2 +-
 api/shared/api_output.go               |  11 ++--
 errors/errors.go                       |   4 +-
 errors/errors_test.go                  |  17 +++---
 errors/impl.go                         |  69 +++++++++++++---------
 errors/{errors.go => map.go}           |  46 +++++++--------
 errors/message.go                      | 104 +++++++++++++++++++++++++++++++++
 errors/types.go                        |  42 +++++--------
 plugins/gitextractor/gitextractor.go   |   2 +-
 plugins/github/api/connection.go       |  18 +++---
 plugins/helper/graphql_async_client.go |   2 +-
 plugins/helper/worker_scheduler.go     |   2 +-
 runner/run_task.go                     |   4 +-
 services/pipeline_runner.go            |  10 +++-
 services/task.go                       |  14 +++--
 15 files changed, 231 insertions(+), 116 deletions(-)

diff --git a/api/router.go b/api/router.go
index 1a86f767..97fd3d96 100644
--- a/api/router.go
+++ b/api/router.go
@@ -103,7 +103,7 @@ func handlePluginCall(pluginName string, handler core.ApiResourceHandler) func(c
 		}
 		output, err := handler(input)
 		if err != nil {
-			shared.ApiOutputError(c, errors.Default.Wrap(err, fmt.Sprintf("error executing the requested resource for plugin %s", pluginName)))
+			shared.ApiOutputError(c, errors.BadInput.Wrap(err, fmt.Sprintf("error executing the requested resource for plugin %s", pluginName)))
 		} else if output != nil {
 			status := output.Status
 			if status < http.StatusContinue {
diff --git a/api/shared/api_output.go b/api/shared/api_output.go
index 69be3ef1..cd0ffc6d 100644
--- a/api/shared/api_output.go
+++ b/api/shared/api_output.go
@@ -29,8 +29,9 @@ import (
 const BadRequestBody = "bad request body format"
 
 type ApiBody struct {
-	Success bool   `json:"success"`
-	Message string `json:"message"`
+	Success bool     `json:"success"`
+	Message string   `json:"message"`
+	Causes  []string `json:"causes"`
 }
 
 type ResponsePipelines struct {
@@ -42,9 +43,11 @@ type ResponsePipelines struct {
 func ApiOutputError(c *gin.Context, err error) {
 	if e, ok := err.(errors.Error); ok {
 		logger.Global.Error(err, "HTTP %d error", e.GetType().GetHttpCode())
+		messages := e.Messages()
 		c.JSON(e.GetType().GetHttpCode(), &ApiBody{
 			Success: false,
-			Message: e.Message(),
+			Message: messages.Get(),
+			Causes:  messages.Causes(),
 		})
 	} else {
 		logger.Global.Error(err, "HTTP %d error (native)", http.StatusInternalServerError)
@@ -71,7 +74,7 @@ func ApiOutputSuccess(c *gin.Context, body interface{}, status int) {
 func ApiOutputAbort(c *gin.Context, err error) {
 	if e, ok := err.(errors.Error); ok {
 		logger.Global.Error(err, "HTTP %d abort-error", e.GetType().GetHttpCode())
-		_ = c.AbortWithError(e.GetType().GetHttpCode(), fmt.Errorf(e.Message()))
+		_ = c.AbortWithError(e.GetType().GetHttpCode(), fmt.Errorf(e.Messages().Format()))
 	} else {
 		logger.Global.Error(err, "HTTP %d abort-error (native)", http.StatusInternalServerError)
 		_ = c.AbortWithError(http.StatusInternalServerError, err)
diff --git a/errors/errors.go b/errors/errors.go
index 5df4909f..ff839670 100644
--- a/errors/errors.go
+++ b/errors/errors.go
@@ -25,8 +25,8 @@ type (
 	// Error The interface that all internally managed errors should adhere to.
 	Error interface {
 		requiredSupertype
-		// Message the message associated with this Error.
-		Message() string
+		// Messages returns the messages associated with this Error and the errors it wraps, outermost first.
+		Messages() Messages
 		// GetType gets the Type of this error
 		GetType() *Type
 		// As Attempts to cast this Error to the requested Type, and returns nil if it can't.
diff --git a/errors/errors_test.go b/errors/errors_test.go
index d241f93e..0d59c95e 100644
--- a/errors/errors_test.go
+++ b/errors/errors_test.go
@@ -35,7 +35,7 @@ func TestCrdbErrorImpl(t *testing.T) {
 		require.Equal(t, err.Error(), lakeErr.Error())
 	})
 	t.Run("raw_message", func(t *testing.T) {
-		msg := lakeErr.Message()
+		msg := lakeErr.Messages().Format()
 		require.NotEqual(t, err.Error(), msg)
 		fmt.Printf("======================Raw Message=======================: \n%s\n\n\n", msg)
 		msgParts := strings.Split(msg, "\ncaused by: ")
@@ -59,7 +59,7 @@ func TestCrdbErrorImpl(t *testing.T) {
 		require.True(t, errors.Is(lakeErr, os.ErrNotExist))
 	})
 	t.Run("combine_errors_type", func(t *testing.T) {
-		err = Unauthorized.Combine([]error{err, err}, "combined")
+		err = Unauthorized.Combine([]error{err, err})
 		lakeErr = AsLakeErrorType(err)
 		require.NotNil(t, lakeErr)
 		e := lakeErr.As(Unauthorized)
@@ -78,31 +78,32 @@ func TestCrdbErrorImpl(t *testing.T) {
 		baseErr := BadInput.Wrap(rawErr, "wrapped")
 		err2 := Convert(baseErr)
 		require.Same(t, baseErr, err2)
-		require.Equal(t, "wrapped (400)", err2.Message())
+		require.Equal(t, "wrapped (400)", err2.Messages().Get())
 		require.Same(t, rawErr, err2.Unwrap())
 		err3 := Default.WrapRaw(baseErr)
 		require.NotSame(t, baseErr, err3)
-		require.Equal(t, "wrapped (400)", err3.Message())
+		require.Equal(t, "wrapped (400)", err3.Messages().Get())
+		require.Equal(t, "wrapped (400)", err3.Messages().Get())
 		require.Same(t, baseErr, err3.Unwrap())
 	})
 }
 
-func f1() error {
+func f1() Error {
 	err := f2()
 	return Default.Wrap(err, "f1 error")
 }
 
-func f2() error {
+func f2() Error {
 	err := f3()
 	return NotFound.Wrap(err, "f2 error")
 }
 
-func f3() error {
+func f3() Error {
 	err := f4()
 	return Default.Wrap(err, "f3 error")
 }
 
-func f4() error {
+func f4() Error {
 	err := f5()
 	return BadInput.WrapRaw(err)
 }
diff --git a/errors/impl.go b/errors/impl.go
index 2c55f9ce..83d51d2f 100644
--- a/errors/impl.go
+++ b/errors/impl.go
@@ -30,7 +30,7 @@ type (
 	crdbErrorImpl struct {
 		wrappedRaw error
 		wrapped    *crdbErrorImpl
-		msg        string
+		msg        *errMessage
 		data       interface{}
 		t          *Type
 	}
@@ -56,17 +56,10 @@ func (e *crdbErrorImpl) Error() string {
 	return parts[1]
 }
 
-func (e *crdbErrorImpl) Message() string {
-	return strings.Join(e.getMessages(func(err *crdbErrorImpl) string {
-		if err.msg == "" {
-			return ""
-		}
-		code := ""
-		if err.t.httpCode != 0 {
-			code = fmt.Sprintf("(%d)", err.t.httpCode)
-		}
-		return err.msg + " " + code
-	}), "\ncaused by: ")
+func (e *crdbErrorImpl) Messages() Messages {
+	return e.getMessages(func(err *crdbErrorImpl) *errMessage {
+		return err.msg
+	})
 }
 
 func (e *crdbErrorImpl) Unwrap() error {
@@ -98,13 +91,13 @@ func (e *crdbErrorImpl) As(t *Type) Error {
 	}
 }
 
-func (e *crdbErrorImpl) getMessages(getMessage func(*crdbErrorImpl) string) []string {
-	msgs := []string{}
+func (e *crdbErrorImpl) getMessages(getMessage func(*crdbErrorImpl) *errMessage) []*errMessage {
+	msgs := []*errMessage{}
 	err := e
 	ok := false
 	for {
 		msg := getMessage(err)
-		if msg != "" {
+		if len(msg.msgs) > 0 {
 			msgs = append(msgs, msg)
 		}
 		unwrapped := err.Unwrap()
@@ -120,41 +113,63 @@ func (e *crdbErrorImpl) getMessages(getMessage func(*crdbErrorImpl) string) []st
 	return msgs
 }
 
-func newCrdbError(t *Type, err error, message string, opts ...Option) *crdbErrorImpl {
+func newSingleCrdbError(t *Type, err error, message string, opts ...Option) Error {
 	cfg := &options{}
 	for _, opt := range opts {
 		opt(cfg)
 	}
+	cfg.stackOffset += 1
+	msg := &errMessage{}
+	if cast, ok := err.(*crdbErrorImpl); ok {
+		if t == Default { // inherit wrapped error's type
+			t = cast.GetType()
+		}
+	}
+	msg.addMessage(t, message)
+	return newCrdbError(t, err, msg, cfg)
+}
+
+func newCombinedCrdbError(t *Type, errs []error) Error {
+	msg := &errMessage{}
+	for _, e := range errs {
+		if le, ok := e.(*crdbErrorImpl); ok {
+			msg.appendMessage(le.msg.getMessage())
+		} else {
+			msg.appendMessage(e.Error())
+		}
+	}
+	opts := &options{}
+	opts.stackOffset += 1
+	return newCrdbError(t, nil, msg, opts)
+}
+
+func newCrdbError(t *Type, err error, msg *errMessage, opts *options) *crdbErrorImpl {
 	errType := t
 	var wrappedErr *crdbErrorImpl
 	var wrappedRaw error
-	rawMessage := message
-	cfg.stackOffset += 2
+	opts.stackOffset += 2
 	if err == nil {
 		if enableStacktraces {
-			wrappedRaw = cerror.NewWithDepth(int(cfg.stackOffset), rawMessage)
+			wrappedRaw = cerror.NewWithDepth(int(opts.stackOffset), msg.getPrettifiedMessage())
 		} else {
-			wrappedRaw = errors.New(message)
+			wrappedRaw = errors.New(msg.getPrettifiedMessage())
 		}
 	} else {
 		if cast, ok := err.(*crdbErrorImpl); ok {
 			err = cast.wrappedRaw
 			wrappedErr = cast
-			if t == Default { // inherit wrapped error's type
-				errType = cast.GetType()
-			}
 		}
 		if enableStacktraces {
-			wrappedRaw = cerror.WrapWithDepth(int(cfg.stackOffset), err, rawMessage)
+			wrappedRaw = cerror.WrapWithDepth(int(opts.stackOffset), err, msg.getPrettifiedMessage())
 		} else {
-			wrappedRaw = cerror.WithDetail(err, rawMessage)
+			wrappedRaw = cerror.WithDetail(err, msg.getPrettifiedMessage())
 		}
 	}
 	impl := &crdbErrorImpl{
 		wrappedRaw: wrappedRaw,
 		wrapped:    wrappedErr,
-		msg:        rawMessage,
-		data:       cfg.data,
+		msg:        msg,
+		data:       opts.data,
 		t:          errType,
 	}
 	return impl
diff --git a/errors/errors.go b/errors/map.go
similarity index 51%
copy from errors/errors.go
copy to errors/map.go
index 5df4909f..0628ab96 100644
--- a/errors/errors.go
+++ b/errors/map.go
@@ -17,32 +17,28 @@ limitations under the License.
 
 package errors
 
-type (
-	requiredSupertype interface {
-		error
-		Unwrap() error
-	}
-	// Error The interface that all internally managed errors should adhere to.
-	Error interface {
-		requiredSupertype
-		// Message the message associated with this Error.
-		Message() string
-		// GetType gets the Type of this error
-		GetType() *Type
-		// As Attempts to cast this Error to the requested Type, and returns nil if it can't.
-		As(*Type) Error
-		// GetData returns the data associated with this Error (may be nil)
-		GetData() interface{}
-	}
-)
+import "sync"
+
+// syncMap wraps the native sync.Map in generic form. Consider moving this to another package for broader use later.
+type syncMap[K any, V any] struct {
+	m *sync.Map
+}
 
-// AsLakeErrorType attempts to cast err to Error, otherwise returns nil
-func AsLakeErrorType(err error) Error {
-	if cast, ok := err.(Error); ok {
-		return cast
+func newSyncMap[K any, V any]() *syncMap[K, V] {
+	return &syncMap[K, V]{
+		m: new(sync.Map),
 	}
-	return nil
 }
 
-var _ error = (Error)(nil)
-var _ requiredSupertype = (Error)(nil)
+func (sm *syncMap[K, V]) Store(key K, val V) {
+	sm.m.Store(key, val)
+}
+
+func (sm *syncMap[K, V]) Load(key K) (V, bool) {
+	var v V
+	val, ok := sm.m.Load(key)
+	if ok {
+		v = val.(V)
+	}
+	return v, ok
+}
diff --git a/errors/message.go b/errors/message.go
new file mode 100644
index 00000000..1a645b85
--- /dev/null
+++ b/errors/message.go
@@ -0,0 +1,104 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package errors
+
+import (
+	"fmt"
+	"strings"
+)
+
+type (
+	// Messages is a slice type holding the messages of an Error
+	Messages []*errMessage
+	// MessageType the type of message for an Error
+	MessageType int
+
+	// errMessage an abstraction around a given Error's message
+	errMessage struct {
+		// all the messages associated with an Error. The size can be > 1 only if the Error is created using Type.Combine
+		msgs []string
+	}
+)
+
+func (m *errMessage) addMessage(t *Type, msg string) {
+	if msg == "" {
+		return
+	}
+	if t.httpCode != 0 {
+		msg = fmt.Sprintf("%s (%d)", msg, t.httpCode)
+	}
+	m.appendMessage(msg)
+}
+
+func (m *errMessage) appendMessage(msg string) {
+	m.msgs = append(m.msgs, msg)
+}
+
+func (m *errMessage) getMessage() string {
+	if len(m.msgs) == 0 {
+		return ""
+	}
+	return strings.Join(m.msgs, ",")
+}
+
+func (m *errMessage) getPrettifiedMessage() string {
+	if len(m.msgs) == 0 {
+		return ""
+	}
+	if len(m.msgs) == 1 {
+		return m.msgs[0]
+	}
+	effectiveMsg := strings.Join(m.msgs, "\n=====================\n")
+	effectiveMsg = "\t" + strings.ReplaceAll(effectiveMsg, "\n", "\n\t")
+	return fmt.Sprintf("\ncombined messages: \n{\n%s\n}", effectiveMsg)
+}
+
+// Format formats the messages into a single string, joining the non-empty ones with "\ncaused by: "
+func (m Messages) Format() string {
+	msgs := []string{}
+	for _, m := range m {
+		if msg := m.getMessage(); msg != "" {
+			msgs = append(msgs, msg)
+		}
+	}
+	return strings.Join(msgs, "\ncaused by: ")
+}
+
+// Get returns the main message: the top-level message, or the first non-empty message if the top-level one is empty ("" if none exists)
+func (m Messages) Get() string {
+	for _, m := range m {
+		if msg := m.getMessage(); msg != "" {
+			return msg
+		}
+	}
+	return ""
+}
+
+// Causes returns the non-empty messages after the first one, in causal sequence (nil if there are fewer than two messages)
+func (m Messages) Causes() []string {
+	if len(m) < 2 {
+		return nil
+	}
+	causes := []string{}
+	for _, m := range m[1:] {
+		if msg := m.getMessage(); msg != "" {
+			causes = append(causes, msg)
+		}
+	}
+	return causes
+}
diff --git a/errors/types.go b/errors/types.go
index 981ff64a..06bd82dd 100644
--- a/errors/types.go
+++ b/errors/types.go
@@ -20,7 +20,6 @@ package errors
 import (
 	"fmt"
 	"net/http"
-	"strings"
 )
 
 // Supported error types
@@ -37,7 +36,7 @@ var (
 	Timeout      = register(&Type{httpCode: http.StatusGatewayTimeout, meta: "timeout"})
 
 	//cached values
-	typesByHttpCode = map[int]*Type{}
+	typesByHttpCode = newSyncMap[int, *Type]()
 )
 
 type (
@@ -58,24 +57,22 @@ type (
 )
 
 func HttpStatus(code int) *Type {
-	t, ok := typesByHttpCode[code]
-	if !ok {
-		t = Internal
+	t, ok := typesByHttpCode.Load(code)
+	if !ok { // lazily cache any missing codes
+		t = &Type{httpCode: code, meta: fmt.Sprintf("type_http_%d", code)}
+		typesByHttpCode.Store(code, t)
 	}
 	return t
 }
 
 // New constructs a new Error instance with this message
 func (t *Type) New(message string, opts ...Option) Error {
-	return newCrdbError(t, nil, message, opts...)
+	return newSingleCrdbError(t, nil, message, opts...)
 }
 
 // Wrap constructs a new Error instance with this message and wraps the passed in error. A nil 'err' will return a nil Error.
 func (t *Type) Wrap(err error, message string, opts ...Option) Error {
-	if err == nil {
-		return nil
-	}
-	return newCrdbError(t, err, message, opts...)
+	return newSingleCrdbError(t, err, message, opts...)
 }
 
 // WrapRaw constructs a new Error instance that directly wraps this error with no additional context. A nil 'err' will return a nil Error.
@@ -103,24 +100,13 @@ func (t *Type) wrapRaw(err error, forceWrap bool, opts ...Option) Error {
 	} else {
 		msg = err.Error()
 	}
-	return newCrdbError(t, err, msg, opts...)
+	return newSingleCrdbError(t, err, msg, opts...)
 }
 
-// Combine constructs a new Error from combining multiple errors. Stacktrace info for each of the errors will not be present in the result.
-func (t *Type) Combine(errs []error, msg string, opts ...Option) Error {
-	msgs := []string{}
-	for _, e := range errs {
-		if le := AsLakeErrorType(e); le != nil {
-			if msg0 := le.Message(); msg0 != "" {
-				msgs = append(msgs, le.Message())
-			}
-		} else {
-			msgs = append(msgs, e.Error())
-		}
-	}
-	effectiveMsg := strings.Join(msgs, "\n=====================\n")
-	effectiveMsg = "\t" + strings.ReplaceAll(effectiveMsg, "\n", "\n\t")
-	return newCrdbError(t, nil, fmt.Sprintf("%s\ncombined messages: \n{\n%s\n}", msg, effectiveMsg), opts...)
+// Combine constructs a new Error by combining multiple errors. Stacktrace info for each of the errors will not be present in the result, so it's
+// best to log the errors before combining them.
+func (t *Type) Combine(errs []error) Error {
+	return newCombinedCrdbError(t, errs)
 }
 
 // GetHttpCode gets the associated Http code with this Type, if explicitly set, otherwise http.StatusInternalServerError
@@ -149,9 +135,9 @@ func withStackOffset(offset uint) Option {
 func register(t *Type) *Type {
 	if t == nil {
 		t = &Type{meta: "default"}
-		typesByHttpCode[t.httpCode] = t
+		typesByHttpCode.Store(t.httpCode, t)
 	} else if t.httpCode != 0 {
-		typesByHttpCode[t.httpCode] = t
+		typesByHttpCode.Store(t.httpCode, t)
 	}
 	return t
 }
diff --git a/plugins/gitextractor/gitextractor.go b/plugins/gitextractor/gitextractor.go
index 0e9793f4..a0ac527a 100644
--- a/plugins/gitextractor/gitextractor.go
+++ b/plugins/gitextractor/gitextractor.go
@@ -54,7 +54,7 @@ func (plugin GitExtractor) SubTaskMetas() []core.SubTaskMeta {
 // based on task context and user input options, return data that shared among all subtasks
 func (plugin GitExtractor) PrepareTaskData(taskCtx core.TaskContext, options map[string]interface{}) (interface{}, errors.Error) {
 	var op tasks.GitExtractorOptions
-	if err := helper.Decode(options, op, nil); err != nil {
+	if err := helper.Decode(options, &op, nil); err != nil {
 		return nil, err
 	}
 	if err := op.Valid(); err != nil {
diff --git a/plugins/github/api/connection.go b/plugins/github/api/connection.go
index 046a4bfb..973530a1 100644
--- a/plugins/github/api/connection.go
+++ b/plugins/github/api/connection.go
@@ -57,7 +57,7 @@ func TestConnection(input *core.ApiResourceInput) (*core.ApiResourceOutput, erro
 
 	// verify multiple token in parallel
 	type VerifyResult struct {
-		err   error
+		err   errors.Error
 		login string
 	}
 	results := make(chan VerifyResult)
@@ -76,21 +76,21 @@ func TestConnection(input *core.ApiResourceInput) (*core.ApiResourceOutput, erro
 				basicRes,
 			)
 			if err != nil {
-				results <- VerifyResult{err: errors.Default.Wrap(err, fmt.Sprintf("verify token failed for #%d %s", j, token))}
+				results <- VerifyResult{err: errors.BadInput.Wrap(err, fmt.Sprintf("verify token failed for #%d %s", j, token))}
 				return
 			}
 			res, err := apiClient.Get("user", nil, nil)
 			if err != nil {
-				results <- VerifyResult{err: errors.Default.Wrap(err, fmt.Sprintf("verify token failed for #%d %s", j, token))}
+				results <- VerifyResult{err: errors.HttpStatus(res.StatusCode).Wrap(err, fmt.Sprintf("verify token failed for #%d %s", j, token))}
 				return
 			}
 			githubUserOfToken := &models.GithubUserOfToken{}
 			err = helper.UnmarshalResponse(res, githubUserOfToken)
 			if err != nil {
-				results <- VerifyResult{err: errors.Default.Wrap(err, fmt.Sprintf("verify token failed for #%v %s", j, token))}
+				results <- VerifyResult{err: errors.BadInput.Wrap(err, fmt.Sprintf("verify token failed for #%v %s", j, token))}
 				return
 			} else if githubUserOfToken.Login == "" {
-				results <- VerifyResult{err: errors.Default.Wrap(err, fmt.Sprintf("invalid token for #%v %s", j, token))}
+				results <- VerifyResult{err: errors.BadInput.Wrap(err, fmt.Sprintf("invalid token for #%v %s", j, token))}
 				return
 			}
 			results <- VerifyResult{login: githubUserOfToken.Login}
@@ -99,11 +99,11 @@ func TestConnection(input *core.ApiResourceInput) (*core.ApiResourceOutput, erro
 
 	// collect verification results
 	logins := make([]string, 0)
-	msgs := make([]string, 0)
+	allErrors := make([]error, 0)
 	i := 0
 	for result := range results {
 		if result.err != nil {
-			msgs = append(msgs, result.err.Error())
+			allErrors = append(allErrors, result.err)
 		}
 		logins = append(logins, result.login)
 		i++
@@ -111,8 +111,8 @@ func TestConnection(input *core.ApiResourceInput) (*core.ApiResourceOutput, erro
 			close(results)
 		}
 	}
-	if len(msgs) > 0 {
-		return nil, errors.Default.New(strings.Join(msgs, "\n"))
+	if len(allErrors) > 0 {
+		return nil, errors.Default.Combine(allErrors)
 	}
 
 	githubApiResponse := GithubTestConnResponse{}
diff --git a/plugins/helper/graphql_async_client.go b/plugins/helper/graphql_async_client.go
index 43068b80..91be1728 100644
--- a/plugins/helper/graphql_async_client.go
+++ b/plugins/helper/graphql_async_client.go
@@ -147,7 +147,7 @@ func (apiClient *GraphqlAsyncClient) NextTick(task func() errors.Error) {
 func (apiClient *GraphqlAsyncClient) Wait() errors.Error {
 	apiClient.waitGroup.Wait()
 	if len(apiClient.workerErrors) > 0 {
-		return errors.Default.Combine(apiClient.workerErrors, "graphql workers encountered error(s)")
+		return errors.Default.Combine(apiClient.workerErrors)
 	}
 	return nil
 }
diff --git a/plugins/helper/worker_scheduler.go b/plugins/helper/worker_scheduler.go
index 8a181bfb..e74f228f 100644
--- a/plugins/helper/worker_scheduler.go
+++ b/plugins/helper/worker_scheduler.go
@@ -152,7 +152,7 @@ func (s *WorkerScheduler) NextTick(task func() errors.Error) {
 func (s *WorkerScheduler) Wait() errors.Error {
 	s.waitGroup.Wait()
 	if len(s.workerErrors) > 0 {
-		return errors.Default.Combine(s.workerErrors, "worker scheduler captured these errors")
+		return errors.Default.Combine(s.workerErrors)
 	}
 	return nil
 }
diff --git a/runner/run_task.go b/runner/run_task.go
index 116e6169..28124877 100644
--- a/runner/run_task.go
+++ b/runner/run_task.go
@@ -70,10 +70,12 @@ func RunTask(
 				if meta, ok := lakeErr.GetData().(*core.SubTaskMeta); ok {
 					subTaskName = meta.Name
 				}
+			} else {
+				lakeErr = errors.Convert(err)
 			}
 			dbe := db.Model(task).Updates(map[string]interface{}{
 				"status":          models.TASK_FAILED,
-				"message":         err.Error(),
+				"message":         lakeErr.Messages().Format(),
 				"finished_at":     finishedAt,
 				"spent_seconds":   spentSeconds,
 				"failed_sub_task": subTaskName,
diff --git a/services/pipeline_runner.go b/services/pipeline_runner.go
index f40746e0..5ab5c4c6 100644
--- a/services/pipeline_runner.go
+++ b/services/pipeline_runner.go
@@ -113,11 +113,11 @@ func runPipeline(pipelineId uint64) errors.Error {
 		err = pipelineRun.runPipelineStandalone()
 	}
 	if err != nil {
-		err = errors.Default.Wrap(err, fmt.Sprintf("error running pipeline %d", pipelineId))
+		err = errors.Default.Wrap(err, fmt.Sprintf("Error running pipeline %d.", pipelineId))
 	}
 	pipeline, e := GetPipeline(pipelineId)
 	if e != nil {
-		return errors.Default.Wrap(err, fmt.Sprintf("unable to get pipeline %d", pipelineId))
+		return errors.Default.Wrap(err, fmt.Sprintf("Unable to get pipeline %d.", pipelineId))
 	}
 	// finished, update database
 	finishedAt := time.Now()
@@ -125,7 +125,11 @@ func runPipeline(pipelineId uint64) errors.Error {
 	pipeline.SpentSeconds = int(finishedAt.Unix() - pipeline.BeganAt.Unix())
 	if err != nil {
 		pipeline.Status = models.TASK_FAILED
-		pipeline.Message = err.Error()
+		if lakeErr := errors.AsLakeErrorType(err); lakeErr != nil {
+			pipeline.Message = lakeErr.Messages().Format()
+		} else {
+			pipeline.Message = err.Error()
+		}
 	} else {
 		pipeline.Status = models.TASK_COMPLETED
 		pipeline.Message = ""
diff --git a/services/task.go b/services/task.go
index fd5f7a6a..2268956d 100644
--- a/services/task.go
+++ b/services/task.go
@@ -24,7 +24,6 @@ import (
 	"github.com/apache/incubator-devlake/errors"
 	"regexp"
 	"strconv"
-	"strings"
 	"sync"
 
 	"github.com/apache/incubator-devlake/logger"
@@ -225,16 +224,21 @@ func runTasksStandalone(parentLogger core.Logger, taskIds []uint64) errors.Error
 		taskId := taskId
 		go func() {
 			taskLog.Info("run task in background ", taskId)
-			results <- runTaskStandalone(parentLogger, taskId)
+			var err errors.Error
+			taskErr := runTaskStandalone(parentLogger, taskId)
+			if taskErr != nil {
+				err = errors.Default.Wrap(taskErr, fmt.Sprintf("Error running task %d.", taskId))
+			}
+			results <- err
 		}()
 	}
-	errs := make([]string, 0)
+	errs := make([]error, 0)
 	var err error
 	finished := 0
 	for err = range results {
 		if err != nil {
 			taskLog.Error(err, "task failed")
-			errs = append(errs, err.Error())
+			errs = append(errs, err)
 		}
 		finished++
 		if finished == len(taskIds) {
@@ -242,7 +246,7 @@ func runTasksStandalone(parentLogger core.Logger, taskIds []uint64) errors.Error
 		}
 	}
 	if len(errs) > 0 {
-		err = errors.Default.New(strings.Join(errs, "\n"))
+		err = errors.Default.Combine(errs)
 	}
 	return errors.Convert(err)
 }