You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@devlake.apache.org by kl...@apache.org on 2022/09/15 10:14:10 UTC
[incubator-devlake] branch main updated: [fix-2940][framework]: Add lazy caching of http codes for error handling and fix any broken Http error messages (#2984)
This is an automated email from the ASF dual-hosted git repository.
klesh pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-devlake.git
The following commit(s) were added to refs/heads/main by this push:
new 4302140d [fix-2940][framework]: Add lazy caching of http codes for error handling and fix any broken Http error messages (#2984)
4302140d is described below
commit 4302140db231daa5e9671fad2985179c9fa58423
Author: Keon Amini <ke...@merico.dev>
AuthorDate: Thu Sep 15 05:14:07 2022 -0500
[fix-2940][framework]: Add lazy caching of http codes for error handling and fix any broken Http error messages (#2984)
* fix: add lazy caching of http codes for error handling
* fix: uplift github test action go version to get build running (also see #2908)
* fix: github workflow adjustments
* refactor: reworked parts of the error interface to handle combining errors better + addressed the issue in #3031
* fix: fixed some error messages
* fix: restored error-400 default error behavior in plugin router
* fix: removed redundant function call
* fix: fixed some post-merge buggy behavior
* refactor: removed pre-merge user-message based implementation
* refactor: minor naming changes
---
api/router.go | 2 +-
api/shared/api_output.go | 11 ++--
errors/errors.go | 4 +-
errors/errors_test.go | 17 +++---
errors/impl.go | 69 +++++++++++++---------
errors/{errors.go => map.go} | 46 +++++++--------
errors/message.go | 104 +++++++++++++++++++++++++++++++++
errors/types.go | 42 +++++--------
plugins/gitextractor/gitextractor.go | 2 +-
plugins/github/api/connection.go | 18 +++---
plugins/helper/graphql_async_client.go | 2 +-
plugins/helper/worker_scheduler.go | 2 +-
runner/run_task.go | 4 +-
services/pipeline_runner.go | 10 +++-
services/task.go | 14 +++--
15 files changed, 231 insertions(+), 116 deletions(-)
diff --git a/api/router.go b/api/router.go
index 1a86f767..97fd3d96 100644
--- a/api/router.go
+++ b/api/router.go
@@ -103,7 +103,7 @@ func handlePluginCall(pluginName string, handler core.ApiResourceHandler) func(c
}
output, err := handler(input)
if err != nil {
- shared.ApiOutputError(c, errors.Default.Wrap(err, fmt.Sprintf("error executing the requested resource for plugin %s", pluginName)))
+ shared.ApiOutputError(c, errors.BadInput.Wrap(err, fmt.Sprintf("error executing the requested resource for plugin %s", pluginName)))
} else if output != nil {
status := output.Status
if status < http.StatusContinue {
diff --git a/api/shared/api_output.go b/api/shared/api_output.go
index 69be3ef1..cd0ffc6d 100644
--- a/api/shared/api_output.go
+++ b/api/shared/api_output.go
@@ -29,8 +29,9 @@ import (
const BadRequestBody = "bad request body format"
type ApiBody struct {
- Success bool `json:"success"`
- Message string `json:"message"`
+ Success bool `json:"success"`
+ Message string `json:"message"`
+ Causes []string `json:"causes"`
}
type ResponsePipelines struct {
@@ -42,9 +43,11 @@ type ResponsePipelines struct {
func ApiOutputError(c *gin.Context, err error) {
if e, ok := err.(errors.Error); ok {
logger.Global.Error(err, "HTTP %d error", e.GetType().GetHttpCode())
+ messages := e.Messages()
c.JSON(e.GetType().GetHttpCode(), &ApiBody{
Success: false,
- Message: e.Message(),
+ Message: messages.Get(),
+ Causes: messages.Causes(),
})
} else {
logger.Global.Error(err, "HTTP %d error (native)", http.StatusInternalServerError)
@@ -71,7 +74,7 @@ func ApiOutputSuccess(c *gin.Context, body interface{}, status int) {
func ApiOutputAbort(c *gin.Context, err error) {
if e, ok := err.(errors.Error); ok {
logger.Global.Error(err, "HTTP %d abort-error", e.GetType().GetHttpCode())
- _ = c.AbortWithError(e.GetType().GetHttpCode(), fmt.Errorf(e.Message()))
+ _ = c.AbortWithError(e.GetType().GetHttpCode(), fmt.Errorf(e.Messages().Format()))
} else {
logger.Global.Error(err, "HTTP %d abort-error (native)", http.StatusInternalServerError)
_ = c.AbortWithError(http.StatusInternalServerError, err)
diff --git a/errors/errors.go b/errors/errors.go
index 5df4909f..ff839670 100644
--- a/errors/errors.go
+++ b/errors/errors.go
@@ -25,8 +25,8 @@ type (
// Error The interface that all internally managed errors should adhere to.
Error interface {
requiredSupertype
- // Message the message associated with this Error.
- Message() string
+ // Messages the message associated with this Error.
+ Messages() Messages
// GetType gets the Type of this error
GetType() *Type
// As Attempts to cast this Error to the requested Type, and returns nil if it can't.
diff --git a/errors/errors_test.go b/errors/errors_test.go
index d241f93e..0d59c95e 100644
--- a/errors/errors_test.go
+++ b/errors/errors_test.go
@@ -35,7 +35,7 @@ func TestCrdbErrorImpl(t *testing.T) {
require.Equal(t, err.Error(), lakeErr.Error())
})
t.Run("raw_message", func(t *testing.T) {
- msg := lakeErr.Message()
+ msg := lakeErr.Messages().Format()
require.NotEqual(t, err.Error(), msg)
fmt.Printf("======================Raw Message=======================: \n%s\n\n\n", msg)
msgParts := strings.Split(msg, "\ncaused by: ")
@@ -59,7 +59,7 @@ func TestCrdbErrorImpl(t *testing.T) {
require.True(t, errors.Is(lakeErr, os.ErrNotExist))
})
t.Run("combine_errors_type", func(t *testing.T) {
- err = Unauthorized.Combine([]error{err, err}, "combined")
+ err = Unauthorized.Combine([]error{err, err})
lakeErr = AsLakeErrorType(err)
require.NotNil(t, lakeErr)
e := lakeErr.As(Unauthorized)
@@ -78,31 +78,32 @@ func TestCrdbErrorImpl(t *testing.T) {
baseErr := BadInput.Wrap(rawErr, "wrapped")
err2 := Convert(baseErr)
require.Same(t, baseErr, err2)
- require.Equal(t, "wrapped (400)", err2.Message())
+ require.Equal(t, "wrapped (400)", err2.Messages().Get())
require.Same(t, rawErr, err2.Unwrap())
err3 := Default.WrapRaw(baseErr)
require.NotSame(t, baseErr, err3)
- require.Equal(t, "wrapped (400)", err3.Message())
+ require.Equal(t, "wrapped (400)", err3.Messages().Get())
+ require.Equal(t, "wrapped (400)", err3.Messages().Get())
require.Same(t, baseErr, err3.Unwrap())
})
}
-func f1() error {
+func f1() Error {
err := f2()
return Default.Wrap(err, "f1 error")
}
-func f2() error {
+func f2() Error {
err := f3()
return NotFound.Wrap(err, "f2 error")
}
-func f3() error {
+func f3() Error {
err := f4()
return Default.Wrap(err, "f3 error")
}
-func f4() error {
+func f4() Error {
err := f5()
return BadInput.WrapRaw(err)
}
diff --git a/errors/impl.go b/errors/impl.go
index 2c55f9ce..83d51d2f 100644
--- a/errors/impl.go
+++ b/errors/impl.go
@@ -30,7 +30,7 @@ type (
crdbErrorImpl struct {
wrappedRaw error
wrapped *crdbErrorImpl
- msg string
+ msg *errMessage
data interface{}
t *Type
}
@@ -56,17 +56,10 @@ func (e *crdbErrorImpl) Error() string {
return parts[1]
}
-func (e *crdbErrorImpl) Message() string {
- return strings.Join(e.getMessages(func(err *crdbErrorImpl) string {
- if err.msg == "" {
- return ""
- }
- code := ""
- if err.t.httpCode != 0 {
- code = fmt.Sprintf("(%d)", err.t.httpCode)
- }
- return err.msg + " " + code
- }), "\ncaused by: ")
+func (e *crdbErrorImpl) Messages() Messages {
+ return e.getMessages(func(err *crdbErrorImpl) *errMessage {
+ return err.msg
+ })
}
func (e *crdbErrorImpl) Unwrap() error {
@@ -98,13 +91,13 @@ func (e *crdbErrorImpl) As(t *Type) Error {
}
}
-func (e *crdbErrorImpl) getMessages(getMessage func(*crdbErrorImpl) string) []string {
- msgs := []string{}
+func (e *crdbErrorImpl) getMessages(getMessage func(*crdbErrorImpl) *errMessage) []*errMessage {
+ msgs := []*errMessage{}
err := e
ok := false
for {
msg := getMessage(err)
- if msg != "" {
+ if len(msg.msgs) > 0 {
msgs = append(msgs, msg)
}
unwrapped := err.Unwrap()
@@ -120,41 +113,63 @@ func (e *crdbErrorImpl) getMessages(getMessage func(*crdbErrorImpl) string) []st
return msgs
}
-func newCrdbError(t *Type, err error, message string, opts ...Option) *crdbErrorImpl {
+func newSingleCrdbError(t *Type, err error, message string, opts ...Option) Error {
cfg := &options{}
for _, opt := range opts {
opt(cfg)
}
+ cfg.stackOffset += 1
+ msg := &errMessage{}
+ if cast, ok := err.(*crdbErrorImpl); ok {
+ if t == Default { // inherit wrapped error's type
+ t = cast.GetType()
+ }
+ }
+ msg.addMessage(t, message)
+ return newCrdbError(t, err, msg, cfg)
+}
+
+func newCombinedCrdbError(t *Type, errs []error) Error {
+ msg := &errMessage{}
+ for _, e := range errs {
+ if le, ok := e.(*crdbErrorImpl); ok {
+ msg.appendMessage(le.msg.getMessage())
+ } else {
+ msg.appendMessage(e.Error())
+ }
+ }
+ opts := &options{}
+ opts.stackOffset += 1
+ return newCrdbError(t, nil, msg, opts)
+}
+
+func newCrdbError(t *Type, err error, msg *errMessage, opts *options) *crdbErrorImpl {
errType := t
var wrappedErr *crdbErrorImpl
var wrappedRaw error
- rawMessage := message
- cfg.stackOffset += 2
+ opts.stackOffset += 2
if err == nil {
if enableStacktraces {
- wrappedRaw = cerror.NewWithDepth(int(cfg.stackOffset), rawMessage)
+ wrappedRaw = cerror.NewWithDepth(int(opts.stackOffset), msg.getPrettifiedMessage())
} else {
- wrappedRaw = errors.New(message)
+ wrappedRaw = errors.New(msg.getPrettifiedMessage())
}
} else {
if cast, ok := err.(*crdbErrorImpl); ok {
err = cast.wrappedRaw
wrappedErr = cast
- if t == Default { // inherit wrapped error's type
- errType = cast.GetType()
- }
}
if enableStacktraces {
- wrappedRaw = cerror.WrapWithDepth(int(cfg.stackOffset), err, rawMessage)
+ wrappedRaw = cerror.WrapWithDepth(int(opts.stackOffset), err, msg.getPrettifiedMessage())
} else {
- wrappedRaw = cerror.WithDetail(err, rawMessage)
+ wrappedRaw = cerror.WithDetail(err, msg.getPrettifiedMessage())
}
}
impl := &crdbErrorImpl{
wrappedRaw: wrappedRaw,
wrapped: wrappedErr,
- msg: rawMessage,
- data: cfg.data,
+ msg: msg,
+ data: opts.data,
t: errType,
}
return impl
diff --git a/errors/errors.go b/errors/map.go
similarity index 51%
copy from errors/errors.go
copy to errors/map.go
index 5df4909f..0628ab96 100644
--- a/errors/errors.go
+++ b/errors/map.go
@@ -17,32 +17,28 @@ limitations under the License.
package errors
-type (
- requiredSupertype interface {
- error
- Unwrap() error
- }
- // Error The interface that all internally managed errors should adhere to.
- Error interface {
- requiredSupertype
- // Message the message associated with this Error.
- Message() string
- // GetType gets the Type of this error
- GetType() *Type
- // As Attempts to cast this Error to the requested Type, and returns nil if it can't.
- As(*Type) Error
- // GetData returns the data associated with this Error (may be nil)
- GetData() interface{}
- }
-)
+import "sync"
+
+// Wraps the native sync map in generic form. Consider moving this to another package for broader use later
+type syncMap[K any, V any] struct {
+ m *sync.Map
+}
-// AsLakeErrorType attempts to cast err to Error, otherwise returns nil
-func AsLakeErrorType(err error) Error {
- if cast, ok := err.(Error); ok {
- return cast
+func newSyncMap[K any, V any]() *syncMap[K, V] {
+ return &syncMap[K, V]{
+ m: new(sync.Map),
}
- return nil
}
-var _ error = (Error)(nil)
-var _ requiredSupertype = (Error)(nil)
+func (sm *syncMap[K, V]) Store(key K, val V) {
+ sm.m.Store(key, val)
+}
+
+func (sm *syncMap[K, V]) Load(key K) (V, bool) {
+ var v V
+ val, ok := sm.m.Load(key)
+ if ok {
+ v = val.(V)
+ }
+ return v, ok
+}
diff --git a/errors/message.go b/errors/message.go
new file mode 100644
index 00000000..1a645b85
--- /dev/null
+++ b/errors/message.go
@@ -0,0 +1,104 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements. See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package errors
+
+import (
+ "fmt"
+ "strings"
+)
+
+type (
+ // Messages alias for messages of an Error
+ Messages []*errMessage
+ // MessageType the type of message for an Error
+ MessageType int
+
+ // errMessage an abstraction around a given Error's message
+ errMessage struct {
+ // all the messages associated with an Error. The size will be > 1 if the Error is created using Type.Combine
+ msgs []string
+ }
+)
+
+func (m *errMessage) addMessage(t *Type, msg string) {
+ if msg == "" {
+ return
+ }
+ if t.httpCode != 0 {
+ msg = fmt.Sprintf("%s (%d)", msg, t.httpCode)
+ }
+ m.appendMessage(msg)
+}
+
+func (m *errMessage) appendMessage(msg string) {
+ m.msgs = append(m.msgs, msg)
+}
+
+func (m *errMessage) getMessage() string {
+ if len(m.msgs) == 0 {
+ return ""
+ }
+ return strings.Join(m.msgs, ",")
+}
+
+func (m *errMessage) getPrettifiedMessage() string {
+ if len(m.msgs) == 0 {
+ return ""
+ }
+ if len(m.msgs) == 1 {
+ return m.msgs[0]
+ }
+ effectiveMsg := strings.Join(m.msgs, "\n=====================\n")
+ effectiveMsg = "\t" + strings.ReplaceAll(effectiveMsg, "\n", "\n\t")
+ return fmt.Sprintf("\ncombined messages: \n{\n%s\n}", effectiveMsg)
+}
+
+// Format formats the messages into a single string
+func (m Messages) Format() string {
+ msgs := []string{}
+ for _, m := range m {
+ if msg := m.getMessage(); msg != "" {
+ msgs = append(msgs, msg)
+ }
+ }
+ return strings.Join(msgs, "\ncaused by: ")
+}
+
+// Get gets the main (top-level) (or first non-empty message if exists) message of the Messages
+func (m Messages) Get() string {
+ for _, m := range m {
+ if msg := m.getMessage(); msg != "" {
+ return msg
+ }
+ }
+ return ""
+}
+
+// Causes gets the non-main messages of the Messages in causal sequence
+func (m Messages) Causes() []string {
+ if len(m) < 2 {
+ return nil
+ }
+ causes := []string{}
+ for _, m := range m[1:] {
+ if msg := m.getMessage(); msg != "" {
+ causes = append(causes, msg)
+ }
+ }
+ return causes
+}
diff --git a/errors/types.go b/errors/types.go
index 981ff64a..06bd82dd 100644
--- a/errors/types.go
+++ b/errors/types.go
@@ -20,7 +20,6 @@ package errors
import (
"fmt"
"net/http"
- "strings"
)
// Supported error types
@@ -37,7 +36,7 @@ var (
Timeout = register(&Type{httpCode: http.StatusGatewayTimeout, meta: "timeout"})
//cached values
- typesByHttpCode = map[int]*Type{}
+ typesByHttpCode = newSyncMap[int, *Type]()
)
type (
@@ -58,24 +57,22 @@ type (
)
func HttpStatus(code int) *Type {
- t, ok := typesByHttpCode[code]
- if !ok {
- t = Internal
+ t, ok := typesByHttpCode.Load(code)
+ if !ok { // lazily cache any missing codes
+ t = &Type{httpCode: code, meta: fmt.Sprintf("type_http_%d", code)}
+ typesByHttpCode.Store(code, t)
}
return t
}
// New constructs a new Error instance with this message
func (t *Type) New(message string, opts ...Option) Error {
- return newCrdbError(t, nil, message, opts...)
+ return newSingleCrdbError(t, nil, message, opts...)
}
// Wrap constructs a new Error instance with this message and wraps the passed in error. A nil 'err' will return a nil Error.
func (t *Type) Wrap(err error, message string, opts ...Option) Error {
- if err == nil {
- return nil
- }
- return newCrdbError(t, err, message, opts...)
+ return newSingleCrdbError(t, err, message, opts...)
}
// WrapRaw constructs a new Error instance that directly wraps this error with no additional context. A nil 'err' will return a nil Error.
@@ -103,24 +100,13 @@ func (t *Type) wrapRaw(err error, forceWrap bool, opts ...Option) Error {
} else {
msg = err.Error()
}
- return newCrdbError(t, err, msg, opts...)
+ return newSingleCrdbError(t, err, msg, opts...)
}
-// Combine constructs a new Error from combining multiple errors. Stacktrace info for each of the errors will not be present in the result.
-func (t *Type) Combine(errs []error, msg string, opts ...Option) Error {
- msgs := []string{}
- for _, e := range errs {
- if le := AsLakeErrorType(e); le != nil {
- if msg0 := le.Message(); msg0 != "" {
- msgs = append(msgs, le.Message())
- }
- } else {
- msgs = append(msgs, e.Error())
- }
- }
- effectiveMsg := strings.Join(msgs, "\n=====================\n")
- effectiveMsg = "\t" + strings.ReplaceAll(effectiveMsg, "\n", "\n\t")
- return newCrdbError(t, nil, fmt.Sprintf("%s\ncombined messages: \n{\n%s\n}", msg, effectiveMsg), opts...)
+// Combine constructs a new Error from combining multiple errors. Stacktrace info for each of the errors will not be present in the result, so it's
+// best to log the errors before combining them.
+func (t *Type) Combine(errs []error) Error {
+ return newCombinedCrdbError(t, errs)
}
// GetHttpCode gets the associated Http code with this Type, if explicitly set, otherwise http.StatusInternalServerError
@@ -149,9 +135,9 @@ func withStackOffset(offset uint) Option {
func register(t *Type) *Type {
if t == nil {
t = &Type{meta: "default"}
- typesByHttpCode[t.httpCode] = t
+ typesByHttpCode.Store(t.httpCode, t)
} else if t.httpCode != 0 {
- typesByHttpCode[t.httpCode] = t
+ typesByHttpCode.Store(t.httpCode, t)
}
return t
}
diff --git a/plugins/gitextractor/gitextractor.go b/plugins/gitextractor/gitextractor.go
index 0e9793f4..a0ac527a 100644
--- a/plugins/gitextractor/gitextractor.go
+++ b/plugins/gitextractor/gitextractor.go
@@ -54,7 +54,7 @@ func (plugin GitExtractor) SubTaskMetas() []core.SubTaskMeta {
// based on task context and user input options, return data that shared among all subtasks
func (plugin GitExtractor) PrepareTaskData(taskCtx core.TaskContext, options map[string]interface{}) (interface{}, errors.Error) {
var op tasks.GitExtractorOptions
- if err := helper.Decode(options, op, nil); err != nil {
+ if err := helper.Decode(options, &op, nil); err != nil {
return nil, err
}
if err := op.Valid(); err != nil {
diff --git a/plugins/github/api/connection.go b/plugins/github/api/connection.go
index 046a4bfb..973530a1 100644
--- a/plugins/github/api/connection.go
+++ b/plugins/github/api/connection.go
@@ -57,7 +57,7 @@ func TestConnection(input *core.ApiResourceInput) (*core.ApiResourceOutput, erro
// verify multiple token in parallel
type VerifyResult struct {
- err error
+ err errors.Error
login string
}
results := make(chan VerifyResult)
@@ -76,21 +76,21 @@ func TestConnection(input *core.ApiResourceInput) (*core.ApiResourceOutput, erro
basicRes,
)
if err != nil {
- results <- VerifyResult{err: errors.Default.Wrap(err, fmt.Sprintf("verify token failed for #%d %s", j, token))}
+ results <- VerifyResult{err: errors.BadInput.Wrap(err, fmt.Sprintf("verify token failed for #%d %s", j, token))}
return
}
res, err := apiClient.Get("user", nil, nil)
if err != nil {
- results <- VerifyResult{err: errors.Default.Wrap(err, fmt.Sprintf("verify token failed for #%d %s", j, token))}
+ results <- VerifyResult{err: errors.HttpStatus(res.StatusCode).Wrap(err, fmt.Sprintf("verify token failed for #%d %s", j, token))}
return
}
githubUserOfToken := &models.GithubUserOfToken{}
err = helper.UnmarshalResponse(res, githubUserOfToken)
if err != nil {
- results <- VerifyResult{err: errors.Default.Wrap(err, fmt.Sprintf("verify token failed for #%v %s", j, token))}
+ results <- VerifyResult{err: errors.BadInput.Wrap(err, fmt.Sprintf("verify token failed for #%v %s", j, token))}
return
} else if githubUserOfToken.Login == "" {
- results <- VerifyResult{err: errors.Default.Wrap(err, fmt.Sprintf("invalid token for #%v %s", j, token))}
+ results <- VerifyResult{err: errors.BadInput.Wrap(err, fmt.Sprintf("invalid token for #%v %s", j, token))}
return
}
results <- VerifyResult{login: githubUserOfToken.Login}
@@ -99,11 +99,11 @@ func TestConnection(input *core.ApiResourceInput) (*core.ApiResourceOutput, erro
// collect verification results
logins := make([]string, 0)
- msgs := make([]string, 0)
+ allErrors := make([]error, 0)
i := 0
for result := range results {
if result.err != nil {
- msgs = append(msgs, result.err.Error())
+ allErrors = append(allErrors, result.err)
}
logins = append(logins, result.login)
i++
@@ -111,8 +111,8 @@ func TestConnection(input *core.ApiResourceInput) (*core.ApiResourceOutput, erro
close(results)
}
}
- if len(msgs) > 0 {
- return nil, errors.Default.New(strings.Join(msgs, "\n"))
+ if len(allErrors) > 0 {
+ return nil, errors.Default.Combine(allErrors)
}
githubApiResponse := GithubTestConnResponse{}
diff --git a/plugins/helper/graphql_async_client.go b/plugins/helper/graphql_async_client.go
index 43068b80..91be1728 100644
--- a/plugins/helper/graphql_async_client.go
+++ b/plugins/helper/graphql_async_client.go
@@ -147,7 +147,7 @@ func (apiClient *GraphqlAsyncClient) NextTick(task func() errors.Error) {
func (apiClient *GraphqlAsyncClient) Wait() errors.Error {
apiClient.waitGroup.Wait()
if len(apiClient.workerErrors) > 0 {
- return errors.Default.Combine(apiClient.workerErrors, "graphql workers encountered error(s)")
+ return errors.Default.Combine(apiClient.workerErrors)
}
return nil
}
diff --git a/plugins/helper/worker_scheduler.go b/plugins/helper/worker_scheduler.go
index 8a181bfb..e74f228f 100644
--- a/plugins/helper/worker_scheduler.go
+++ b/plugins/helper/worker_scheduler.go
@@ -152,7 +152,7 @@ func (s *WorkerScheduler) NextTick(task func() errors.Error) {
func (s *WorkerScheduler) Wait() errors.Error {
s.waitGroup.Wait()
if len(s.workerErrors) > 0 {
- return errors.Default.Combine(s.workerErrors, "worker scheduler captured these errors")
+ return errors.Default.Combine(s.workerErrors)
}
return nil
}
diff --git a/runner/run_task.go b/runner/run_task.go
index 116e6169..28124877 100644
--- a/runner/run_task.go
+++ b/runner/run_task.go
@@ -70,10 +70,12 @@ func RunTask(
if meta, ok := lakeErr.GetData().(*core.SubTaskMeta); ok {
subTaskName = meta.Name
}
+ } else {
+ lakeErr = errors.Convert(err)
}
dbe := db.Model(task).Updates(map[string]interface{}{
"status": models.TASK_FAILED,
- "message": err.Error(),
+ "message": lakeErr.Messages().Format(),
"finished_at": finishedAt,
"spent_seconds": spentSeconds,
"failed_sub_task": subTaskName,
diff --git a/services/pipeline_runner.go b/services/pipeline_runner.go
index f40746e0..5ab5c4c6 100644
--- a/services/pipeline_runner.go
+++ b/services/pipeline_runner.go
@@ -113,11 +113,11 @@ func runPipeline(pipelineId uint64) errors.Error {
err = pipelineRun.runPipelineStandalone()
}
if err != nil {
- err = errors.Default.Wrap(err, fmt.Sprintf("error running pipeline %d", pipelineId))
+ err = errors.Default.Wrap(err, fmt.Sprintf("Error running pipeline %d.", pipelineId))
}
pipeline, e := GetPipeline(pipelineId)
if e != nil {
- return errors.Default.Wrap(err, fmt.Sprintf("unable to get pipeline %d", pipelineId))
+ return errors.Default.Wrap(err, fmt.Sprintf("Unable to get pipeline %d.", pipelineId))
}
// finished, update database
finishedAt := time.Now()
@@ -125,7 +125,11 @@ func runPipeline(pipelineId uint64) errors.Error {
pipeline.SpentSeconds = int(finishedAt.Unix() - pipeline.BeganAt.Unix())
if err != nil {
pipeline.Status = models.TASK_FAILED
- pipeline.Message = err.Error()
+ if lakeErr := errors.AsLakeErrorType(err); lakeErr != nil {
+ pipeline.Message = lakeErr.Messages().Format()
+ } else {
+ pipeline.Message = err.Error()
+ }
} else {
pipeline.Status = models.TASK_COMPLETED
pipeline.Message = ""
diff --git a/services/task.go b/services/task.go
index fd5f7a6a..2268956d 100644
--- a/services/task.go
+++ b/services/task.go
@@ -24,7 +24,6 @@ import (
"github.com/apache/incubator-devlake/errors"
"regexp"
"strconv"
- "strings"
"sync"
"github.com/apache/incubator-devlake/logger"
@@ -225,16 +224,21 @@ func runTasksStandalone(parentLogger core.Logger, taskIds []uint64) errors.Error
taskId := taskId
go func() {
taskLog.Info("run task in background ", taskId)
- results <- runTaskStandalone(parentLogger, taskId)
+ var err errors.Error
+ taskErr := runTaskStandalone(parentLogger, taskId)
+ if taskErr != nil {
+ err = errors.Default.Wrap(taskErr, fmt.Sprintf("Error running task %d.", taskId))
+ }
+ results <- err
}()
}
- errs := make([]string, 0)
+ errs := make([]error, 0)
var err error
finished := 0
for err = range results {
if err != nil {
taskLog.Error(err, "task failed")
- errs = append(errs, err.Error())
+ errs = append(errs, err)
}
finished++
if finished == len(taskIds) {
@@ -242,7 +246,7 @@ func runTasksStandalone(parentLogger core.Logger, taskIds []uint64) errors.Error
}
}
if len(errs) > 0 {
- err = errors.Default.New(strings.Join(errs, "\n"))
+ err = errors.Default.Combine(errs)
}
return errors.Convert(err)
}