You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@devlake.apache.org by zk...@apache.org on 2022/08/19 01:46:17 UTC
[incubator-devlake] branch main updated: [feat-2578]: Download pipeline logging archives API (#2656)
This is an automated email from the ASF dual-hosted git repository.
zky pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-devlake.git
The following commit(s) were added to refs/heads/main by this push:
new 62512126 [feat-2578]: Download pipeline logging archives API (#2656)
62512126 is described below
commit 625121267de32f60720531430e902c6f84a03cef
Author: Keon Amini <ke...@merico.dev>
AuthorDate: Thu Aug 18 20:46:12 2022 -0500
[feat-2578]: Download pipeline logging archives API (#2656)
* feat: add download pipeline logs API and functionality (#2578)
* refactor: re-implemented archive creation differently
* refactor: archive creation re-implemented more robustly
* refactor: using viant/afs library for archiving logic to simplify code
* refactor: reworked most things based on feedback + swagger docs and http headers fixed.
* refactor: considerably simplified the logic to setup logger instances
* refactor: some refactor in response to PR feedback
* fix: some fixes
* fix: fixed pipeline run prematurely exiting upon error
---
.env.example | 1 +
api/pipelines/pipelines.go | 41 ++++++
api/router.go | 2 +
errors/errors.go | 8 ++
go.mod | 8 +-
go.sum | 16 +--
logger/init.go | 35 ++++-
logger/logger.go | 87 +++++++-----
errors/errors.go => logger/stream.go | 41 +++---
worker/app/task_activity.go => logger/utils.go | 52 +++----
...20729_rename_columns_of_pull_request_issues.go} | 0
plugins/core/logger.go | 26 +++-
runner/run_pipeline.go | 2 +-
runner/run_task.go | 28 +++-
services/pipeline.go | 150 ++++++++-------------
services/pipeline_runner.go | 139 +++++++++++++++++++
services/task.go | 10 +-
utils/io.go | 80 +++++++++++
worker/app/pipeline_workflow.go | 66 +++++----
worker/app/shared.go | 21 ++-
worker/app/task_activity.go | 4 +-
21 files changed, 576 insertions(+), 241 deletions(-)
diff --git a/.env.example b/.env.example
index 778e06c8..2214b354 100644
--- a/.env.example
+++ b/.env.example
@@ -27,6 +27,7 @@ TEMPORAL_URL=
TEMPORAL_TASK_QUEUE=
# Debug Info Warn Error
LOGGING_LEVEL=
+LOGGING_DIR=./logs
##########################
# Sensitive information encryption key
diff --git a/api/pipelines/pipelines.go b/api/pipelines/pipelines.go
index c5cc259c..d6cbb5e4 100644
--- a/api/pipelines/pipelines.go
+++ b/api/pipelines/pipelines.go
@@ -18,7 +18,10 @@ limitations under the License.
package pipelines
import (
+ "github.com/apache/incubator-devlake/errors"
"net/http"
+ "os"
+ "path/filepath"
"strconv"
"github.com/apache/incubator-devlake/api/shared"
@@ -167,3 +170,41 @@ func Delete(c *gin.Context) {
}
shared.ApiOutputSuccess(c, nil, http.StatusOK)
}
+
+/*
+Get download logs of a pipeline
+GET /pipelines/:pipelineId/logging.tar.gz
+*/
+// DownloadLogs streams a tar.gz archive of a pipeline's logs to the client
+// @Description GET /pipelines/:pipelineId/logging.tar.gz
+// @Tags framework/pipelines
+// @Param pipelineId path int true "query"
+// @Success 200 "The archive file"
+// @Failure 400 {string} errcode.Error "Bad Request"
+// @Failure 404 {string} errcode.Error "Pipeline not found"
+// @Failure 500 {string} errcode.Error "Internal Error"
+// @Router /pipelines/{pipelineId}/logging.tar.gz [get]
+func DownloadLogs(c *gin.Context) {
+ pipelineId := c.Param("pipelineId")
+ id, err := strconv.ParseUint(pipelineId, 10, 64)
+ if err != nil {
+ shared.ApiOutputError(c, err, http.StatusBadRequest)
+ return
+ }
+ pipeline, err := services.GetPipeline(id)
+ if err != nil {
+ if errors.IsNotFound(err) {
+ shared.ApiOutputError(c, err, http.StatusNotFound)
+ } else {
+ shared.ApiOutputError(c, err, http.StatusInternalServerError)
+ }
+ return
+ }
+ archive, err := services.GetPipelineLogsArchivePath(pipeline)
+ if err != nil {
+ shared.ApiOutputError(c, err, http.StatusInternalServerError)
+ return
+ }
+ defer os.Remove(archive)
+ c.FileAttachment(archive, filepath.Base(archive))
+}
diff --git a/api/router.go b/api/router.go
index 84767c1e..0d4eb08b 100644
--- a/api/router.go
+++ b/api/router.go
@@ -50,6 +50,8 @@ func RegisterRouter(r *gin.Engine) {
r.DELETE("/pipelines/:pipelineId", pipelines.Delete)
r.GET("/pipelines/:pipelineId/tasks", task.Index)
+ r.GET("/pipelines/:pipelineId/logging.tar.gz", pipelines.DownloadLogs)
+
r.GET("/ping", ping.Get)
r.GET("/version", version.Get)
r.POST("/push/:tableName", push.Post)
diff --git a/errors/errors.go b/errors/errors.go
index d908c5a3..b0a842b9 100644
--- a/errors/errors.go
+++ b/errors/errors.go
@@ -45,4 +45,12 @@ func NewNotFound(message string) *Error {
return NewError(http.StatusNotFound, message)
}
+func IsNotFound(err error) bool {
+ errCast, ok := err.(*Error)
+ if !ok {
+ return false
+ }
+ return errCast.Status == http.StatusNotFound
+}
+
var InternalError = NewError(http.StatusInternalServerError, "Server Internal Error")
diff --git a/go.mod b/go.mod
index 7e042d66..e2481d9a 100644
--- a/go.mod
+++ b/go.mod
@@ -8,9 +8,11 @@ require (
github.com/go-git/go-git/v5 v5.4.2
github.com/go-playground/validator/v10 v10.9.0
github.com/gocarina/gocsv v0.0.0-20220707092902-b9da1f06c77e
+ github.com/google/uuid v1.3.0
github.com/libgit2/git2go/v33 v33.0.6
github.com/magiconair/properties v1.8.5
github.com/manifoldco/promptui v0.9.0
+ github.com/merico-dev/graphql v0.0.0-20220804061427-a2245fa66df2
github.com/mitchellh/mapstructure v1.4.1
github.com/panjf2000/ants/v2 v2.4.6
github.com/robfig/cron/v3 v3.0.0
@@ -22,6 +24,7 @@ require (
github.com/stretchr/testify v1.7.0
github.com/swaggo/gin-swagger v1.4.3
github.com/swaggo/swag v1.8.3
+ github.com/viant/afs v1.16.0
github.com/x-cray/logrus-prefixed-formatter v0.5.2
go.temporal.io/api v1.7.1-0.20220223032354-6e6fe738916a
go.temporal.io/sdk v1.14.0
@@ -47,6 +50,7 @@ require (
github.com/facebookgo/clock v0.0.0-20150410010913-600d898af40a // indirect
github.com/fsnotify/fsnotify v1.5.1 // indirect
github.com/gin-contrib/sse v0.1.0 // indirect
+ github.com/go-errors/errors v1.4.2 // indirect
github.com/go-git/gcfg v1.5.0 // indirect
github.com/go-git/go-billy/v5 v5.3.1 // indirect
github.com/go-openapi/jsonpointer v0.19.5 // indirect
@@ -61,7 +65,6 @@ require (
github.com/gogo/status v1.1.0 // indirect
github.com/golang/mock v1.6.0 // indirect
github.com/golang/protobuf v1.5.2 // indirect
- github.com/google/uuid v1.3.0 // indirect
github.com/gopherjs/gopherjs v0.0.0-20190430165422-3e4dfb77656c // indirect
github.com/grpc-ecosystem/go-grpc-middleware v1.3.0 // indirect
github.com/hashicorp/hcl v1.0.0 // indirect
@@ -86,7 +89,6 @@ require (
github.com/mattn/go-colorable v0.1.6 // indirect
github.com/mattn/go-isatty v0.0.13 // indirect
github.com/mattn/go-sqlite3 v1.14.6 // indirect
- github.com/merico-dev/graphql v0.0.0-20220804061427-a2245fa66df2 // indirect
github.com/mgutz/ansi v0.0.0-20200706080929-d51e80ef957d // indirect
github.com/mitchellh/go-homedir v1.1.0 // indirect
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
@@ -95,11 +97,11 @@ require (
github.com/onsi/gomega v1.10.3 // indirect
github.com/pborman/uuid v1.2.1 // indirect
github.com/pelletier/go-toml v1.9.3 // indirect
+ github.com/pkg/errors v0.9.1 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/robfig/cron v1.2.0 // indirect
github.com/russross/blackfriday/v2 v2.1.0 // indirect
github.com/sergi/go-diff v1.1.0 // indirect
- github.com/shurcooL/graphql v0.0.0-20220606043923-3cf50f8a0a29 // indirect
github.com/spf13/cast v1.4.1 // indirect
github.com/spf13/jwalterweatherman v1.1.0 // indirect
github.com/spf13/pflag v1.0.6-0.20200504143853-81378bbcd8a1 // indirect
diff --git a/go.sum b/go.sum
index e0931d46..6dc810da 100644
--- a/go.sum
+++ b/go.sum
@@ -128,6 +128,8 @@ github.com/gin-gonic/gin v1.7.7 h1:3DoBmSbJbZAWqXJC3SLjAPfutPJJRN1U5pALB7EeTTs=
github.com/gin-gonic/gin v1.7.7/go.mod h1:axIBovoeJpVj8S3BwE0uPMTeReE4+AfFtqpqaZ1qq1U=
github.com/gliderlabs/ssh v0.2.2 h1:6zsha5zo/TWhRhwqCD3+EarCAgZ2yN28ipRnGPnwkI0=
github.com/gliderlabs/ssh v0.2.2/go.mod h1:U7qILu1NlMHj9FlMhZLlkCdDnU1DBEAqr0aevW3Awn0=
+github.com/go-errors/errors v1.4.2 h1:J6MZopCL4uSllY1OfXM374weqZFFItUbrImctkmUxIA=
+github.com/go-errors/errors v1.4.2/go.mod h1:sIVyrIiJhuEF+Pj9Ebtd6P/rEYROXFi3BopGUQ5a5Og=
github.com/go-git/gcfg v1.5.0 h1:Q5ViNfGF8zFgyJWPqYwA7qGFoMTEiBmdlkcfRmpIMa4=
github.com/go-git/gcfg v1.5.0/go.mod h1:5m20vg6GwYabIxaOonVkTdrILxQMpEShl1xiMF4ua+E=
github.com/go-git/go-billy/v5 v5.2.0/go.mod h1:pmpqyWchKfYfrkb/UVH4otLvyi/5gJlGI4Hb3ZqZ3W0=
@@ -432,10 +434,6 @@ github.com/mattn/go-isatty v0.0.13/go.mod h1:cbi8OIDigv2wuxKPP5vlRcQ1OAZbq2CE4Ky
github.com/mattn/go-sqlite3 v1.14.5/go.mod h1:WVKg1VTActs4Qso6iwGbiFih2UIHo0ENGwNd0Lj+XmI=
github.com/mattn/go-sqlite3 v1.14.6 h1:dNPt6NO46WmLVt2DLNpwczCmdV5boIZ6g/tlDrlRUbg=
github.com/mattn/go-sqlite3 v1.14.6/go.mod h1:NyWgC/yNuGj7Q9rpYnZvas74GogHl5/Z4A/KQRfk6bU=
-github.com/merico-dev/graphql v0.0.0-20220606043923-3cf50f8a0a29 h1:jEarKDWDyd59SKG2AoH/3JYqRFfl7RE2uEzGJRpCl3Q=
-github.com/merico-dev/graphql v0.0.0-20220606043923-3cf50f8a0a29/go.mod h1:q+XAHrrzQfwrEaGgnl/VduY0Gd9FqSnMQEpKq101TxE=
-github.com/merico-dev/graphql v0.0.0-20220803162350-42fdc19ba54c h1:7l5zeXBLMWafzHD0ElpaNVTVhNzcQz7Urkw9WebNt5o=
-github.com/merico-dev/graphql v0.0.0-20220803162350-42fdc19ba54c/go.mod h1:dcDqG8HXVtfEhTCipFMa0Q+RTKTtDKIO2vJt+JVzHEQ=
github.com/merico-dev/graphql v0.0.0-20220804061427-a2245fa66df2 h1:sOXuZIg3OwBnvJFfIuO8wegiLpeDCOSVvk2dsbjurd8=
github.com/merico-dev/graphql v0.0.0-20220804061427-a2245fa66df2/go.mod h1:dcDqG8HXVtfEhTCipFMa0Q+RTKTtDKIO2vJt+JVzHEQ=
github.com/mgutz/ansi v0.0.0-20200706080929-d51e80ef957d h1:5PJl274Y63IEHC+7izoQE9x6ikvDFZS2mDVS3drnohI=
@@ -516,8 +514,6 @@ github.com/shopspring/decimal v0.0.0-20180709203117-cd690d0c9e24/go.mod h1:M+9Nz
github.com/shopspring/decimal v0.0.0-20200227202807-02e2044944cc/go.mod h1:DKyhrW/HYNuLGql+MJL6WCR6knT2jwCFRcu2hWCYk4o=
github.com/shopspring/decimal v1.2.0 h1:abSATXmQEYyShuxI4/vyW3tV1MrKAJzCZ/0zLUXYbsQ=
github.com/shopspring/decimal v1.2.0/go.mod h1:DKyhrW/HYNuLGql+MJL6WCR6knT2jwCFRcu2hWCYk4o=
-github.com/shurcooL/graphql v0.0.0-20220606043923-3cf50f8a0a29 h1:B1PEwpArrNp4dkQrfxh/abbBAOZBVp0ds+fBEOUOqOc=
-github.com/shurcooL/graphql v0.0.0-20220606043923-3cf50f8a0a29/go.mod h1:AuYgA5Kyo4c7HfUmvRGs/6rGlMMV/6B1bVnB9JxJEEg=
github.com/shurcooL/sanitized_anchor_name v1.0.0/go.mod h1:1NzhyTcUVG4SuEtjjoZeVRXNmyL/1OwPU0+IJeTBvfc=
github.com/sirupsen/logrus v1.4.1/go.mod h1:ni0Sbl8bgC9z8RoU9G6nDWqqs/fq4eDPysMBDgk/93Q=
github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE=
@@ -570,6 +566,8 @@ github.com/ugorji/go/codec v1.1.7/go.mod h1:Ax+UKWsSmolVDwsd+7N3ZtXu+yMGCf907BLY
github.com/ugorji/go/codec v1.2.6 h1:7kbGefxLoDBuYXOms4yD7223OpNMMPNPZxXk5TvFcyQ=
github.com/ugorji/go/codec v1.2.6/go.mod h1:V6TCNZ4PHqoHGFZuSG1W8nrCzzdgA2DozYxWFFpvxTw=
github.com/urfave/cli/v2 v2.3.0/go.mod h1:LJmUH05zAU44vOAcrfzZQKsZbVcdbOG8rtL3/XcUArI=
+github.com/viant/afs v1.16.0 h1:yb9TQ1gjVVLji9lcXLWaarklqmGWeXTZOwc2fwJevCI=
+github.com/viant/afs v1.16.0/go.mod h1:wdiEDffZKJwj1ZSFasy7hHoxLQdSpFZkd3XOWNt1aN0=
github.com/x-cray/logrus-prefixed-formatter v0.5.2 h1:00txxvfBM9muc0jiLIEAkAcIMJzfthRT6usrui8uGmg=
github.com/x-cray/logrus-prefixed-formatter v0.5.2/go.mod h1:2duySbKsL6M18s5GU7VPsoEPHyzalCE06qoARUCeBBE=
github.com/xanzy/ssh-agent v0.3.0 h1:wUMzuKtKilRgBAD1sUb8gOwwRr2FGoBVumcjoOACClI=
@@ -712,12 +710,8 @@ golang.org/x/net v0.0.0-20210421230115-4e50805a0758/go.mod h1:72T/g9IO56b78aLF+1
golang.org/x/net v0.0.0-20210805182204-aaa1db679c0d/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
golang.org/x/net v0.0.0-20220127200216-cd36cc0744dd/go.mod h1:CfG3xpIq0wQ8r1q4Su4UZFWDARRcnwPjda9FqA0JpMk=
golang.org/x/net v0.0.0-20220425223048-2871e0cb64e4/go.mod h1:CfG3xpIq0wQ8r1q4Su4UZFWDARRcnwPjda9FqA0JpMk=
-golang.org/x/net v0.0.0-20220708220712-1185a9018129 h1:vucSRfWwTsoXro7P+3Cjlr6flUMtzCwzlvkxEQtHHB0=
-golang.org/x/net v0.0.0-20220708220712-1185a9018129/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c=
golang.org/x/net v0.0.0-20220728211354-c7608f3a8462 h1:UreQrH7DbFXSi9ZFox6FNT3WBooWmdANpU+IfkT1T4I=
golang.org/x/net v0.0.0-20220728211354-c7608f3a8462/go.mod h1:YDH+HFinaLZZlnHAfSS6ZXJJ9M9t4Dl22yv3iI2vPwk=
-golang.org/x/net v0.0.0-20220802222814-0bcc04d9c69b h1:3ogNYyK4oIQdIKzTu68hQrr4iuVxF3AxKl9Aj/eDrw0=
-golang.org/x/net v0.0.0-20220802222814-0bcc04d9c69b/go.mod h1:YDH+HFinaLZZlnHAfSS6ZXJJ9M9t4Dl22yv3iI2vPwk=
golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
@@ -810,8 +804,6 @@ golang.org/x/sys v0.0.0-20210806184541-e5e7981a1069/go.mod h1:oPkhp1MJrh7nUepCBc
golang.org/x/sys v0.0.0-20210809222454-d867a43fc93e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220222200937-f2425489ef4c/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
-golang.org/x/sys v0.0.0-20220712014510-0a85c31ab51e h1:NHvCuwuS43lGnYhten69ZWqi2QOj/CiDNcKbVqwVoew=
-golang.org/x/sys v0.0.0-20220712014510-0a85c31ab51e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220728004956-3c1f35247d10 h1:WIoqL4EROvwiPdUtaip4VcDdpZ4kha7wBWZrbVKCIZg=
golang.org/x/sys v0.0.0-20220728004956-3c1f35247d10/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/term v0.0.0-20201117132131-f5c789dd3221/go.mod h1:Nr5EML6q2oocZ2LXRh80K7BxOlk5/8JxuGnuhpl+muw=
diff --git a/logger/init.go b/logger/init.go
index a5ff6616..3f5b2c70 100644
--- a/logger/init.go
+++ b/logger/init.go
@@ -18,12 +18,12 @@ limitations under the License.
package logger
import (
- "fmt"
"github.com/apache/incubator-devlake/config"
"github.com/apache/incubator-devlake/plugins/core"
"github.com/sirupsen/logrus"
prefixed "github.com/x-cray/logrus-prefixed-formatter"
- "os"
+ "io"
+ "path/filepath"
"strings"
)
@@ -33,7 +33,8 @@ var Global core.Logger
func init() {
inner = logrus.New()
logLevel := logrus.InfoLevel
- switch strings.ToLower(config.GetConfig().GetString("LOGGING_LEVEL")) {
+ cfg := config.GetConfig()
+ switch strings.ToLower(cfg.GetString("LOGGING_LEVEL")) {
case "debug":
logLevel = logrus.DebugLevel
case "info":
@@ -48,10 +49,30 @@ func init() {
TimestampFormat: "2006-01-02 15:04:05",
FullTimestamp: true,
})
+ basePath := cfg.GetString("LOGGING_DIR")
+ if basePath == "" {
+ inner.Error("LOGGING_DIR is not set. Log files will not be generated.")
+ } else {
+ basePath = filepath.Join(basePath, "devlake.log")
+ }
+ var err error
+ Global, err = NewDefaultLogger(inner)
+ if err != nil {
+ panic(err)
+ }
+ Global.SetStream(&core.LoggerStreamConfig{
+ Path: basePath,
+ Writer: createLogStream(basePath),
+ })
+}
- if err := os.Mkdir("logs", 0777); err != nil {
- inner.Info(fmt.Sprintf("failed to create dir logs: %s", err))
+func createLogStream(path string) io.Writer {
+ if path == "" {
+ return nil
+ }
+ stream, err := GetFileStream(path)
+ if err != nil {
+ panic(err)
}
- loggerPool := make(map[string]*logrus.Logger)
- Global = NewDefaultLogger(inner, "", loggerPool)
+ return stream
}
diff --git a/logger/logger.go b/logger/logger.go
index 110affe1..c4a52aac 100644
--- a/logger/logger.go
+++ b/logger/logger.go
@@ -21,21 +21,23 @@ import (
"fmt"
"github.com/apache/incubator-devlake/plugins/core"
"github.com/sirupsen/logrus"
- "io"
- "os"
"regexp"
+ "strings"
)
+var alreadyInBracketsRegex = regexp.MustCompile(`\[.*?]+`)
+
type DefaultLogger struct {
- prefix string
- log *logrus.Logger
- loggerPool map[string]*logrus.Logger
+ log *logrus.Logger
+ config *core.LoggerConfig
}
-func NewDefaultLogger(log *logrus.Logger, prefix string, loggerPool map[string]*logrus.Logger) *DefaultLogger {
- newDefaultLogger := &DefaultLogger{prefix: prefix, log: log}
- newDefaultLogger.loggerPool = loggerPool
- return newDefaultLogger
+func NewDefaultLogger(log *logrus.Logger) (core.Logger, error) {
+ defaultLogger := &DefaultLogger{
+ log: log,
+ config: &core.LoggerConfig{},
+ }
+ return defaultLogger, nil
}
func (l *DefaultLogger) IsLevelEnabled(level core.LogLevel) bool {
@@ -48,8 +50,8 @@ func (l *DefaultLogger) IsLevelEnabled(level core.LogLevel) bool {
func (l *DefaultLogger) Log(level core.LogLevel, format string, a ...interface{}) {
if l.IsLevelEnabled(level) {
msg := fmt.Sprintf(format, a...)
- if l.prefix != "" {
- msg = fmt.Sprintf("%s %s", l.prefix, msg)
+ if l.config.Prefix != "" {
+ msg = fmt.Sprintf("%s %s", l.config.Prefix, msg)
}
l.log.Log(logrus.Level(level), msg)
}
@@ -75,32 +77,57 @@ func (l *DefaultLogger) Error(format string, a ...interface{}) {
l.Log(core.LOG_ERROR, format, a...)
}
-// bind two writer to logger
-func (l *DefaultLogger) Nested(name string) core.Logger {
- writerStd := os.Stdout
- fileName := ""
- loggerPrefixRegex := regexp.MustCompile(`(\[task #\d+]\s\[\w+])`)
- groups := loggerPrefixRegex.FindStringSubmatch(fmt.Sprintf("%s [%s]", l.prefix, name))
- if len(groups) > 1 {
- fileName = groups[1]
+func (l *DefaultLogger) SetStream(config *core.LoggerStreamConfig) {
+ if config.Path != "" {
+ l.config.Path = config.Path
+ }
+ if config.Writer != nil {
+ l.log.SetOutput(config.Writer)
+ }
+}
+
+func (l *DefaultLogger) GetConfig() *core.LoggerConfig {
+ return &core.LoggerConfig{
+ Path: l.config.Path,
+ Prefix: l.config.Prefix,
}
+}
- if fileName == "" {
- fileName = "devlake"
+func (l *DefaultLogger) Nested(newPrefix string) core.Logger {
+ newTotalPrefix := newPrefix
+ if newPrefix != "" {
+ newTotalPrefix = l.createPrefix(newPrefix)
+ }
+ newLogger, err := l.getLogger(newTotalPrefix)
+ if err != nil {
+ l.Error("error getting a new logger: %v", err)
+ return l
}
+ return newLogger
+}
- if l.loggerPool[fileName] != nil {
- return NewDefaultLogger(l.loggerPool[fileName], fmt.Sprintf("%s [%s]", l.prefix, name), l.loggerPool)
+func (l *DefaultLogger) getLogger(prefix string) (core.Logger, error) {
+ newLogrus := logrus.New()
+ newLogrus.SetLevel(l.log.Level)
+ newLogrus.SetFormatter(l.log.Formatter)
+ newLogrus.SetOutput(l.log.Out)
+ newLogger := &DefaultLogger{
+ log: newLogrus,
+ config: &core.LoggerConfig{
+ Path: l.config.Path,
+ Prefix: prefix,
+ },
}
- newLog := logrus.New()
- newLog.SetLevel(l.log.Level)
- newLog.SetFormatter(l.log.Formatter)
+ return newLogger, nil
+}
- if file, err := os.OpenFile(fmt.Sprintf("logs/%s.log", fileName), os.O_APPEND|os.O_WRONLY|os.O_CREATE, 0666); err == nil {
- newLog.SetOutput(io.MultiWriter(writerStd, file))
+func (l *DefaultLogger) createPrefix(newPrefix string) string {
+ newPrefix = strings.TrimSpace(newPrefix)
+ alreadyInBrackets := alreadyInBracketsRegex.MatchString(newPrefix)
+ if alreadyInBrackets {
+ return fmt.Sprintf("%s %s", l.config.Prefix, newPrefix)
}
- l.loggerPool[fileName] = newLog
- return NewDefaultLogger(newLog, fmt.Sprintf("%s [%s]", l.prefix, name), l.loggerPool)
+ return fmt.Sprintf("%s [%s]", l.config.Prefix, newPrefix)
}
var _ core.Logger = (*DefaultLogger)(nil)
diff --git a/errors/errors.go b/logger/stream.go
similarity index 63%
copy from errors/errors.go
copy to logger/stream.go
index d908c5a3..0161bb89 100644
--- a/errors/errors.go
+++ b/logger/stream.go
@@ -15,34 +15,25 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
-package errors
+package logger
import (
- "net/http"
+ "io"
+ "os"
+ "path/filepath"
)
-type Error struct {
- Status int
- Message string
-}
-
-func (e *Error) Code() int {
- return e.Status
-}
-
-func (e *Error) Error() string {
- return e.Message
-}
-
-func NewError(status int, message string) *Error {
- return &Error{
- status,
- message,
+func GetFileStream(path string) (io.Writer, error) {
+ if path == "" {
+ return os.Stdout, nil
}
+ err := os.MkdirAll(filepath.Dir(path), os.ModePerm)
+ if err != nil {
+ return nil, err
+ }
+ file, err := os.OpenFile(path, os.O_WRONLY|os.O_CREATE|os.O_APPEND, 0666)
+ if err != nil {
+ return nil, err
+ }
+ return io.MultiWriter(os.Stdout, file), nil
}
-
-func NewNotFound(message string) *Error {
- return NewError(http.StatusNotFound, message)
-}
-
-var InternalError = NewError(http.StatusInternalServerError, "Server Internal Error")
diff --git a/worker/app/task_activity.go b/logger/utils.go
similarity index 52%
copy from worker/app/task_activity.go
copy to logger/utils.go
index 9fd0ad16..a4a2ced6 100644
--- a/worker/app/task_activity.go
+++ b/logger/utils.go
@@ -15,37 +15,43 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
-package app
+package logger
import (
- "context"
-
+ "fmt"
"github.com/apache/incubator-devlake/models"
"github.com/apache/incubator-devlake/plugins/core"
- "github.com/apache/incubator-devlake/runner"
- "go.temporal.io/sdk/activity"
+ "os"
+ "path/filepath"
)
-// DevLakeTaskActivity FIXME ...
-func DevLakeTaskActivity(ctx context.Context, configJson []byte, taskId uint64) error {
- cfg, log, db, err := loadResources(configJson)
+func GetTaskLoggerPath(config *core.LoggerConfig, t *models.Task) string {
+ if config.Path == "" {
+ return ""
+ }
+ info, err := os.Stat(config.Path)
if err != nil {
- return err
+ panic(err)
}
- log.Info("received task #%d", taskId)
- progressDetail := &models.TaskProgressDetail{}
- progChan := make(chan core.RunningProgress)
- defer close(progChan)
- go func() {
- for p := range progChan {
- runner.UpdateProgressDetail(db, log, taskId, progressDetail, &p)
- activity.RecordHeartbeat(ctx, progressDetail)
- }
- }()
- err = runner.RunTask(ctx, cfg, log, db, progChan, taskId)
+ basePath := config.Path
+ if !info.IsDir() {
+ basePath = filepath.Dir(config.Path)
+ }
+ return filepath.Join(basePath, fmt.Sprintf("task-%d-%d-%d-%s.log", t.ID, t.PipelineRow, t.PipelineCol, t.Plugin))
+}
+
+func GetPipelineLoggerPath(config *core.LoggerConfig, p *models.Pipeline) string {
+ if config.Path == "" {
+ return ""
+ }
+ info, err := os.Stat(config.Path)
if err != nil {
- log.Error("failed to execute task #%d: %w", taskId, err)
+ panic(err)
+ }
+ basePath := config.Path
+ if !info.IsDir() {
+ basePath = filepath.Dir(config.Path)
}
- log.Info("finished task #%d", taskId)
- return err
+ formattedCreationTime := p.CreatedAt.UTC().Format("20060102-1504")
+ return filepath.Join(basePath, fmt.Sprintf("pipeline-%d-%s", p.ID, formattedCreationTime), "pipeline.log")
}
diff --git a/models/migrationscripts/202220729_rename_columns_of_pull_request_issues.go b/models/migrationscripts/20220729_rename_columns_of_pull_request_issues.go
similarity index 100%
rename from models/migrationscripts/202220729_rename_columns_of_pull_request_issues.go
rename to models/migrationscripts/20220729_rename_columns_of_pull_request_issues.go
diff --git a/plugins/core/logger.go b/plugins/core/logger.go
index 0b345426..0c71ab2f 100644
--- a/plugins/core/logger.go
+++ b/plugins/core/logger.go
@@ -17,7 +17,10 @@ limitations under the License.
package core
-import "github.com/sirupsen/logrus"
+import (
+ "github.com/sirupsen/logrus"
+ "io"
+)
type LogLevel logrus.Level
@@ -28,7 +31,7 @@ const (
LOG_ERROR LogLevel = LogLevel(logrus.ErrorLevel)
)
-// General logger interface, can be used any where
+// Logger General logger interface, can be used anywhere
type Logger interface {
IsLevelEnabled(level LogLevel) bool
Printf(format string, a ...interface{})
@@ -37,10 +40,27 @@ type Logger interface {
Info(format string, a ...interface{})
Warn(format string, a ...interface{})
Error(format string, a ...interface{})
- // return a new logger which output nested log
+ // Nested return a new logger instance. `name` is the extra prefix to be prepended to each message. Leaving it blank
+ // will add no additional prefix. The new Logger will inherit the properties of the original.
Nested(name string) Logger
+ // GetConfig Returns a copy of the LoggerConfig associated with this Logger. This is meant to be used by the framework.
+ GetConfig() *LoggerConfig
+ // SetStream sets the output of this Logger. This is meant to be used by the framework.
+ SetStream(config *LoggerStreamConfig)
+}
+
+// LoggerStreamConfig stream related config to set on a Logger
+type LoggerStreamConfig struct {
+ Path string
+ Writer io.Writer
}
type InjectLogger interface {
SetLogger(logger Logger)
}
+
+// LoggerConfig config related to the Logger. This needs to be serializable, so it can be passed around over the wire.
+type LoggerConfig struct {
+ Path string
+ Prefix string
+}
diff --git a/runner/run_pipeline.go b/runner/run_pipeline.go
index 8ed39d37..5bf37d5c 100644
--- a/runner/run_pipeline.go
+++ b/runner/run_pipeline.go
@@ -28,7 +28,7 @@ import (
// RunPipeline FIXME ...
func RunPipeline(
- cfg *viper.Viper,
+ _ *viper.Viper,
log core.Logger,
db *gorm.DB,
pipelineId uint64,
diff --git a/runner/run_task.go b/runner/run_task.go
index 55ec737c..1f5c2828 100644
--- a/runner/run_task.go
+++ b/runner/run_task.go
@@ -21,6 +21,7 @@ import (
"context"
"encoding/json"
"fmt"
+ "github.com/apache/incubator-devlake/logger"
"time"
"github.com/apache/incubator-devlake/config"
@@ -39,7 +40,7 @@ import (
func RunTask(
ctx context.Context,
_ *viper.Viper,
- logger core.Logger,
+ parentLogger core.Logger,
db *gorm.DB,
progress chan core.RunningProgress,
taskId uint64,
@@ -52,6 +53,10 @@ func RunTask(
if task.Status == models.TASK_COMPLETED {
return fmt.Errorf("invalid task status")
}
+ log, err := getTaskLogger(parentLogger, task)
+ if err != nil {
+ return err
+ }
beganAt := time.Now()
// make sure task status always correct even if it panicked
defer func() {
@@ -73,7 +78,7 @@ func RunTask(
"failed_sub_task": subTaskName,
}).Error
if dbe != nil {
- logger.Error("failed to finalize task status into db: %w", err)
+ log.Error("failed to finalize task status into db: %w", err)
}
} else {
err = db.Model(task).Updates(map[string]interface{}{
@@ -86,7 +91,7 @@ func RunTask(
}()
// start execution
- logger.Info("start executing task: %d", task.ID)
+ log.Info("start executing task: %d", task.ID)
err = db.Model(task).Updates(map[string]interface{}{
"status": models.TASK_RUNNING,
"message": "",
@@ -110,7 +115,7 @@ func RunTask(
err = RunPluginTask(
ctx,
config.GetConfig(),
- logger.Nested(task.Plugin),
+ log.Nested(task.Plugin),
db,
task.ID,
task.Plugin,
@@ -141,7 +146,6 @@ func RunPluginTask(
if !ok {
return fmt.Errorf("plugin %s doesn't support PluginTask interface", name)
}
-
return RunPluginSubTasks(
ctx,
cfg,
@@ -326,3 +330,17 @@ func recordSubtask(logger core.Logger, db *gorm.DB, subtask *models.Subtask) {
logger.Error("error writing subtask %d status to DB: %v", subtask.ID, err)
}
}
+
+func getTaskLogger(parentLogger core.Logger, task *models.Task) (core.Logger, error) {
+ log := parentLogger.Nested(fmt.Sprintf("task #%d", task.ID))
+ loggingPath := logger.GetTaskLoggerPath(log.GetConfig(), task)
+ stream, err := logger.GetFileStream(loggingPath)
+ if err != nil {
+ return nil, err
+ }
+ log.SetStream(&core.LoggerStreamConfig{
+ Path: loggingPath,
+ Writer: stream,
+ })
+ return log, nil
+}
diff --git a/services/pipeline.go b/services/pipeline.go
index 38c96e07..be6774a5 100644
--- a/services/pipeline.go
+++ b/services/pipeline.go
@@ -21,14 +21,16 @@ import (
"context"
"encoding/json"
"fmt"
+ "github.com/apache/incubator-devlake/utils"
+ "github.com/google/uuid"
+ "os"
+ "path/filepath"
"strings"
"time"
"github.com/apache/incubator-devlake/errors"
"github.com/apache/incubator-devlake/logger"
"github.com/apache/incubator-devlake/models"
- "github.com/apache/incubator-devlake/runner"
- "github.com/apache/incubator-devlake/worker/app"
v11 "go.temporal.io/api/enums/v1"
"go.temporal.io/sdk/client"
"go.temporal.io/sdk/converter"
@@ -38,7 +40,7 @@ import (
var notificationService *NotificationService
var temporalClient client.Client
-var pipelineLog = logger.Global.Nested("pipeline service")
+var globalPipelineLog = logger.Global.Nested("pipeline service")
// PipelineQuery FIXME ...
type PipelineQuery struct {
@@ -85,7 +87,7 @@ func pipelineServiceInit() {
panic(fmt.Errorf(`PIPELINE_MAX_PARALLEL should be a positive integer`))
}
if pipelineMaxParallel == 0 {
- pipelineLog.Warn(`pipelineMaxParallel=0 means pipeline will be run No Limit`)
+ globalPipelineLog.Warn(`pipelineMaxParallel=0 means pipeline will be run No Limit`)
pipelineMaxParallel = 10000
}
// run pipeline with independent goroutine
@@ -109,7 +111,7 @@ func CreatePipeline(newPipeline *models.NewPipeline) (*models.Pipeline, error) {
// save pipeline to database
err := db.Create(&pipeline).Error
if err != nil {
- pipelineLog.Error("create pipline failed: %w", err)
+ globalPipelineLog.Error("create pipline failed: %w", err)
return nil, errors.InternalError
}
@@ -125,7 +127,7 @@ func CreatePipeline(newPipeline *models.NewPipeline) (*models.Pipeline, error) {
}
_, err := CreateTask(newTask)
if err != nil {
- pipelineLog.Error("create task for pipeline failed: %w", err)
+ globalPipelineLog.Error("create task for pipeline failed: %w", err)
return nil, err
}
// sync task state back to pipeline
@@ -133,7 +135,7 @@ func CreatePipeline(newPipeline *models.NewPipeline) (*models.Pipeline, error) {
}
}
if err != nil {
- pipelineLog.Error("save tasks for pipeline failed: %w", err)
+ globalPipelineLog.Error("save tasks for pipeline failed: %w", err)
return nil, errors.InternalError
}
if pipeline.TotalTasks == 0 {
@@ -150,7 +152,7 @@ func CreatePipeline(newPipeline *models.NewPipeline) (*models.Pipeline, error) {
"plan": pipeline.Plan,
}).Error
if err != nil {
- pipelineLog.Error("update pipline state failed: %w", err)
+ globalPipelineLog.Error("update pipline state failed: %w", err)
return nil, errors.InternalError
}
@@ -199,21 +201,34 @@ func GetPipeline(pipelineId uint64) (*models.Pipeline, error) {
return pipeline, nil
}
+// GetPipelineLogsArchivePath archives this pipeline's logs into a unique temp sub-directory and returns the archive path; the caller must remove it
+func GetPipelineLogsArchivePath(pipeline *models.Pipeline) (string, error) {
+ logPath, err := getPipelineLogsPath(pipeline)
+ if err != nil {
+ return "", err
+ }
+ archive := filepath.Join(os.TempDir(), uuid.New().String(), "logging.tar.gz")
+ if err = utils.CreateArchive(archive, true, logPath); err != nil {
+ return "", err
+ }
+ return archive, nil
+}
+
// RunPipelineInQueue query pipeline from db and run it in a queue
func RunPipelineInQueue(pipelineMaxParallel int64) {
sema := semaphore.NewWeighted(pipelineMaxParallel)
startedPipelineIds := []uint64{}
- for true {
- pipelineLog.Info("wait for new pipeline")
+ for {
+ globalPipelineLog.Info("wait for new pipeline")
// start goroutine when sema lock ready and pipeline exist.
// to avoid read old pipeline, acquire lock before read exist pipeline
err := sema.Acquire(context.TODO(), 1)
if err != nil {
panic(err)
}
- pipelineLog.Info("get lock and wait pipeline")
+ globalPipelineLog.Info("get lock and wait pipeline")
pipeline := &models.Pipeline{}
- for true {
+ for {
db.Where("status = ?", models.TASK_CREATED).
Not(startedPipelineIds).
Order("id ASC").Limit(1).Find(pipeline)
@@ -225,80 +240,16 @@ func RunPipelineInQueue(pipelineMaxParallel int64) {
startedPipelineIds = append(startedPipelineIds, pipeline.ID)
go func() {
defer sema.Release(1)
- pipelineLog.Info("run pipeline, %d", pipeline.ID)
- _ = runPipeline(pipeline.ID)
+ globalPipelineLog.Info("run pipeline, %d", pipeline.ID)
+ // goroutine-local err: the enclosing loop's err belongs to the dispatcher loop, not to this goroutine
+ err := runPipeline(pipeline.ID)
+ if err != nil {
+ globalPipelineLog.Error("failed to run pipeline, %d: %v", pipeline.ID, err)
+ }
}()
}
}
-// runPipeline start a pipeline actually
-func runPipeline(pipelineId uint64) error {
- var err error
- // run
- if temporalClient != nil {
- err = runPipelineViaTemporal(pipelineId)
- } else {
- err = runPipelineStandalone(pipelineId)
- }
- // load
- pipeline, e := GetPipeline(pipelineId)
- if e != nil {
- return err
- }
- // finished, update database
- finishedAt := time.Now()
- pipeline.FinishedAt = &finishedAt
- pipeline.SpentSeconds = int(finishedAt.Unix() - pipeline.BeganAt.Unix())
- if err != nil {
- pipeline.Status = models.TASK_FAILED
- pipeline.Message = err.Error()
- } else {
- pipeline.Status = models.TASK_COMPLETED
- pipeline.Message = ""
- }
- dbe := db.Model(pipeline).Select("finished_at", "spent_seconds", "status", "message").Updates(pipeline).Error
- if dbe != nil {
- pipelineLog.Error("update pipeline state failed: %w", dbe)
- return dbe
- }
- // notify external webhook
- return NotifyExternal(pipelineId)
-}
-
-func getTemporalWorkflowId(pipelineId uint64) string {
- return fmt.Sprintf("pipeline #%d", pipelineId)
-}
-
-func runPipelineViaTemporal(pipelineId uint64) error {
- workflowOpts := client.StartWorkflowOptions{
- ID: getTemporalWorkflowId(pipelineId),
- TaskQueue: cfg.GetString("TEMPORAL_TASK_QUEUE"),
- }
- // send only the very basis data
- configJson, err := json.Marshal(cfg.AllSettings())
- if err != nil {
- return err
- }
- pipelineLog.Info("enqueue pipeline #%d into temporal task queue", pipelineId)
- workflow, err := temporalClient.ExecuteWorkflow(
- context.Background(),
- workflowOpts,
- app.DevLakePipelineWorkflow,
- configJson,
- pipelineId,
- )
- if err != nil {
- pipelineLog.Error("failed to enqueue pipeline #%d into temporal", pipelineId)
- return err
- }
- err = workflow.Get(context.Background(), nil)
- if err != nil {
- pipelineLog.Info("failed to execute pipeline #%d via temporal: %w", pipelineId, err)
- }
- pipelineLog.Info("pipeline #%d finished by temporal", pipelineId)
- return err
-}
-
func watchTemporalPipelines() {
ticker := time.NewTicker(3 * time.Second)
dc := converter.GetDefaultDataConverter()
@@ -321,7 +271,7 @@ func watchTemporalPipelines() {
"",
)
if err != nil {
- pipelineLog.Error("failed to query workflow execution: %w", err)
+ globalPipelineLog.Error("failed to query workflow execution: %v", err)
continue
}
// workflow is terminated by outsider
@@ -341,7 +291,7 @@ func watchTemporalPipelines() {
for hisIter.HasNext() {
his, err := hisIter.Next()
if err != nil {
- pipelineLog.Error("failed to get next from workflow history iterator: %w", err)
+ globalPipelineLog.Error("failed to get next from workflow history iterator: %v", err)
continue
}
rp.Message = fmt.Sprintf("temporal event type: %v", his.GetEventType())
@@ -354,7 +304,7 @@ func watchTemporalPipelines() {
"finished_at": rp.FinishedAt,
}).Error
if err != nil {
- pipelineLog.Error("failed to update db: %w", err)
+ globalPipelineLog.Error("failed to update db: %v", err)
}
continue
}
@@ -363,7 +313,7 @@ func watchTemporalPipelines() {
for _, activity := range desc.PendingActivities {
taskId, err := getTaskIdFromActivityId(activity.ActivityId)
if err != nil {
- pipelineLog.Error("unable to extract task id from activity id `%s`", activity.ActivityId)
+ globalPipelineLog.Error("unable to extract task id from activity id `%s`: %v", activity.ActivityId, err)
continue
}
progressDetail := &models.TaskProgressDetail{}
@@ -379,7 +329,7 @@ func watchTemporalPipelines() {
lastPayload := payloads[len(payloads)-1]
err = dc.FromPayload(lastPayload, progressDetail)
if err != nil {
- pipelineLog.Error("failed to unmarshal heartbeat payload: %w", err)
+ globalPipelineLog.Error("failed to unmarshal heartbeat payload: %v", err)
continue
}
}
@@ -389,14 +339,8 @@ func watchTemporalPipelines() {
}()
}
-func runPipelineStandalone(pipelineId uint64) error {
- return runner.RunPipeline(
- cfg,
- pipelineLog.Nested(fmt.Sprintf("pipeline #%d", pipelineId)),
- db,
- pipelineId,
- runTasksStandalone,
- )
+func getTemporalWorkflowId(pipelineId uint64) string {
+ return fmt.Sprintf("pipeline #%d", pipelineId)
}
// NotifyExternal FIXME ...
@@ -418,7 +362,7 @@ func NotifyExternal(pipelineId uint64) error {
Status: pipeline.Status,
})
if err != nil {
- pipelineLog.Error("failed to send notification: %w", err)
+ globalPipelineLog.Error("failed to send notification: %v", err)
return err
}
return nil
@@ -441,3 +385,20 @@ func CancelPipeline(pipelineId uint64) error {
}
return err
}
+
+// getPipelineLogsPath returns the directory holding this pipeline's log file, or an error wrapping the os.Stat failure if it cannot be accessed
+func getPipelineLogsPath(pipeline *models.Pipeline) (string, error) {
+ // NOTE(review): getPipelineLogger also opens the log file via logger.GetFileStream and that stream is never closed here;
+ // if opening creates the file, the not-found case below can never occur - confirm and derive the path without opening a stream
+ pipelineLog := getPipelineLogger(pipeline)
+ path := filepath.Dir(pipelineLog.GetConfig().Path)
+ _, err := os.Stat(path)
+ switch {
+ case err == nil:
+ return path, nil
+ case os.IsNotExist(err):
+ return "", fmt.Errorf("logs for pipeline #%d not found: %w", pipeline.ID, err)
+ default:
+ return "", fmt.Errorf("err validating logs path for pipeline #%d: %w", pipeline.ID, err)
+ }
+}
diff --git a/services/pipeline_runner.go b/services/pipeline_runner.go
new file mode 100644
index 00000000..e88ce554
--- /dev/null
+++ b/services/pipeline_runner.go
@@ -0,0 +1,149 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements. See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package services
+
+import (
+ "context"
+ "encoding/json"
+ "fmt"
+ "github.com/apache/incubator-devlake/logger"
+ "github.com/apache/incubator-devlake/models"
+ "github.com/apache/incubator-devlake/plugins/core"
+ "github.com/apache/incubator-devlake/runner"
+ "github.com/apache/incubator-devlake/worker/app"
+ "go.temporal.io/sdk/client"
+ "time"
+)
+
+// pipelineRunner executes one pipeline, logging through a logger dedicated to that pipeline
+type pipelineRunner struct {
+ logger core.Logger
+ pipeline *models.Pipeline
+}
+
+// runPipelineStandalone runs the pipeline in this process; each batch of tasks runs via runTasksStandalone with the pipeline's logger
+func (p *pipelineRunner) runPipelineStandalone() error {
+ return runner.RunPipeline(
+ cfg,
+ p.logger,
+ db,
+ p.pipeline.ID,
+ func(taskIds []uint64) error {
+ return runTasksStandalone(p.logger, taskIds)
+ },
+ )
+}
+
+// runPipelineViaTemporal starts a DevLakePipelineWorkflow for the pipeline on the configured Temporal task queue,
+// passing the logger config so the worker can open the same log file, and blocks until the workflow finishes
+func (p *pipelineRunner) runPipelineViaTemporal() error {
+ workflowOpts := client.StartWorkflowOptions{
+ ID: getTemporalWorkflowId(p.pipeline.ID),
+ TaskQueue: cfg.GetString("TEMPORAL_TASK_QUEUE"),
+ }
+ // ship all settings; the worker rebuilds its config and db connection from them (see loadResources)
+ configJson, err := json.Marshal(cfg.AllSettings())
+ if err != nil {
+ return err
+ }
+ p.logger.Info("enqueue pipeline #%d into temporal task queue", p.pipeline.ID)
+ workflow, err := temporalClient.ExecuteWorkflow(
+ context.Background(),
+ workflowOpts,
+ app.DevLakePipelineWorkflow,
+ configJson,
+ p.pipeline.ID,
+ p.logger.GetConfig(),
+ )
+ if err != nil {
+ p.logger.Error("failed to enqueue pipeline #%d into temporal: %v", p.pipeline.ID, err)
+ return err
+ }
+ err = workflow.Get(context.Background(), nil)
+ if err != nil {
+ p.logger.Error("failed to execute pipeline #%d via temporal: %v", p.pipeline.ID, err)
+ }
+ p.logger.Info("pipeline #%d finished by temporal", p.pipeline.ID)
+ return err
+}
+
+// getPipelineLogger returns a logger nested under globalPipelineLog ("pipeline #<id>") whose stream is set to the
+// per-pipeline file from logger.GetPipelineLoggerPath; if that file cannot be opened the error is logged and the
+// nested logger is returned without a file stream
+func getPipelineLogger(pipeline *models.Pipeline) core.Logger {
+ pipelineLogger := globalPipelineLog.Nested(
+ fmt.Sprintf("pipeline #%d", pipeline.ID),
+ )
+ loggingPath := logger.GetPipelineLoggerPath(pipelineLogger.GetConfig(), pipeline)
+ stream, err := logger.GetFileStream(loggingPath)
+ if err != nil {
+ globalPipelineLog.Error("unable to set stream for logging pipeline %d: %v", pipeline.ID, err)
+ } else {
+ pipelineLogger.SetStream(&core.LoggerStreamConfig{
+ Path: loggingPath,
+ Writer: stream,
+ })
+ }
+ return pipelineLogger
+}
+
+// runPipeline runs the pipeline (via Temporal when a client is configured, otherwise in-process), stores its final
+// status, duration and error message in the db, then notifies the external webhook
+func runPipeline(pipelineId uint64) error {
+ pipeline, err := GetPipeline(pipelineId)
+ if err != nil {
+ return err
+ }
+ pipelineRun := pipelineRunner{
+ logger: getPipelineLogger(pipeline),
+ pipeline: pipeline,
+ }
+ // run
+ if temporalClient != nil {
+ err = pipelineRun.runPipelineViaTemporal()
+ } else {
+ err = pipelineRun.runPipelineStandalone()
+ }
+ if err != nil {
+ err = fmt.Errorf("error running pipeline %d: %w", pipelineId, err)
+ }
+ // reload the row, presumably updated (e.g. began_at) by the runner while it executed
+ pipeline, e := GetPipeline(pipelineId)
+ if e != nil {
+ return fmt.Errorf("unable to get pipeline %d: %w", pipelineId, e)
+ }
+ // finished, update database
+ // NOTE(review): if BeganAt is a pointer it may still be nil when the run failed before starting - confirm
+ finishedAt := time.Now()
+ pipeline.FinishedAt = &finishedAt
+ pipeline.SpentSeconds = int(finishedAt.Unix() - pipeline.BeganAt.Unix())
+ if err != nil {
+ pipeline.Status = models.TASK_FAILED
+ pipeline.Message = err.Error()
+ } else {
+ pipeline.Status = models.TASK_COMPLETED
+ pipeline.Message = ""
+ }
+ dbe := db.Model(pipeline).Select("finished_at", "spent_seconds", "status", "message").Updates(pipeline).Error
+ if dbe != nil {
+ globalPipelineLog.Error("update pipeline state failed: %v", dbe)
+ return dbe
+ }
+ // notify external webhook
+ return NotifyExternal(pipelineId)
+}
diff --git a/services/task.go b/services/task.go
index 067ff5dc..83944e78 100644
--- a/services/task.go
+++ b/services/task.go
@@ -219,13 +219,13 @@ func CancelTask(taskId uint64) error {
return nil
}
-func runTasksStandalone(taskIds []uint64) error {
+func runTasksStandalone(parentLogger core.Logger, taskIds []uint64) error {
results := make(chan error)
for _, taskId := range taskIds {
taskId := taskId
go func() {
taskLog.Info("run task in background ", taskId)
- results <- runTaskStandalone(taskId)
+ results <- runTaskStandalone(parentLogger, taskId)
}()
}
errs := make([]string, 0)
@@ -247,8 +247,8 @@ func runTasksStandalone(taskIds []uint64) error {
return err
}
-func runTaskStandalone(taskId uint64) error {
- // deferng cleaning up
+func runTaskStandalone(parentLog core.Logger, taskId uint64) error {
+ // deferred cleanup: drop this task from runningTasks however the function returns
defer func() {
_, _ = runningTasks.Remove(taskId)
}()
@@ -264,7 +264,7 @@ func runTaskStandalone(taskId uint64) error {
err = runner.RunTask(
ctx,
cfg,
- logger.Global.Nested(fmt.Sprintf("task #%d", taskId)),
+ parentLog,
db,
progress,
taskId,
diff --git a/utils/io.go b/utils/io.go
new file mode 100644
index 00000000..d0c43365
--- /dev/null
+++ b/utils/io.go
@@ -0,0 +1,80 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements. See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package utils
+
+import (
+ "context"
+ "fmt"
+ "github.com/viant/afs"
+ "os"
+ "path/filepath"
+)
+
+// fs abstract filesystem interface singleton instance
+var fs = afs.New()
+
+// CreateArchive creates a tar archive and writes the files/directories associated with the `targetPaths` to it.
+// 'relativeCopy' = true will copy the contents inside each sourcePath (directory) over. If the sourcePath is a file, it is directly copied over.
+func CreateArchive(archivePath string, relativeCopy bool, sourcePaths ...string) error {
+ for _, sourcePath := range sourcePaths {
+ srcPathAbs, err := filepath.Abs(sourcePath)
+ if err != nil {
+ return fmt.Errorf("error getting absolute path of %s: %v", sourcePaths, err)
+ }
+ archivePathAbs, err := filepath.Abs(archivePath)
+ if err != nil {
+ return fmt.Errorf("error getting absolute path of %s: %v", archivePath, err)
+ }
+ srcInfo, err := os.Stat(srcPathAbs)
+ if err != nil {
+ return fmt.Errorf("error getting stats of path %s: %v", srcPathAbs, err)
+ }
+ if relativeCopy {
+ if srcInfo.IsDir() {
+ err = copyContentsToArchive(srcPathAbs, archivePathAbs)
+ } else {
+ err = copyToArchive(srcPathAbs, archivePathAbs, srcInfo.Name())
+ }
+ } else {
+ err = copyToArchive(srcPathAbs, archivePathAbs, sourcePath)
+ }
+ if err != nil {
+ return fmt.Errorf("error trying to copy data to archive: %v", err)
+ }
+ }
+ return nil
+}
+
+func copyContentsToArchive(absSourcePath string, absArchivePath string) error {
+ var files []os.DirEntry
+ files, err := os.ReadDir(absSourcePath)
+ if err != nil {
+ return err
+ }
+ for _, file := range files {
+ err = fs.Copy(context.Background(), fmt.Sprintf("file://%s/%s", absSourcePath, file.Name()), fmt.Sprintf("file:%s/tar:///%s", absArchivePath, file.Name()))
+ if err != nil {
+ return err
+ }
+ }
+ return nil
+}
+
+func copyToArchive(absSourcePath string, absArchivePath string, filename string) error {
+ return fs.Copy(context.Background(), fmt.Sprintf("file://%s", absSourcePath), fmt.Sprintf("file:%s/tar:///%s", absArchivePath, filename))
+}
diff --git a/worker/app/pipeline_workflow.go b/worker/app/pipeline_workflow.go
index 90fb40bd..ad80a984 100644
--- a/worker/app/pipeline_workflow.go
+++ b/worker/app/pipeline_workflow.go
@@ -19,6 +19,7 @@ package app
import (
"fmt"
+ "github.com/apache/incubator-devlake/plugins/core"
"strings"
"time"
@@ -27,44 +28,60 @@ import (
)
// DevLakePipelineWorkflow FIXME ...
-func DevLakePipelineWorkflow(ctx workflow.Context, configJson []byte, pipelineId uint64) error {
- cfg, logger, db, err := loadResources(configJson)
+func DevLakePipelineWorkflow(ctx workflow.Context, configJson []byte, pipelineId uint64, loggerConfig *core.LoggerConfig) error {
+ cfg, log, db, err := loadResources(configJson, loggerConfig)
if err != nil {
return err
}
- logger.Info("received pipeline #%d", pipelineId)
+ log.Info("received pipeline #%d", pipelineId)
err = runner.RunPipeline(
cfg,
- logger,
+ log,
db,
pipelineId,
func(taskIds []uint64) error {
- futures := make([]workflow.Future, len(taskIds))
- for i, taskId := range taskIds {
- activityOpts := workflow.ActivityOptions{
- ActivityID: fmt.Sprintf("task #%d", taskId),
- StartToCloseTimeout: 24 * time.Hour,
- WaitForCancellation: true,
- }
- activityCtx := workflow.WithActivityOptions(ctx, activityOpts)
- futures[i] = workflow.ExecuteActivity(activityCtx, DevLakeTaskActivity, configJson, taskId)
- }
- errs := make([]string, 0)
- for _, future := range futures {
- err := future.Get(ctx, nil)
- if err != nil {
- errs = append(errs, err.Error())
- }
- }
- if len(errs) > 0 {
- return fmt.Errorf(strings.Join(errs, "\n"))
- }
- return nil
+ return runTasks(ctx, configJson, taskIds, log)
},
)
if err != nil {
- logger.Error("failed to execute pipeline #%d: %w", pipelineId, err)
+ log.Error("failed to execute pipeline #%d: %v", pipelineId, err)
}
- logger.Info("finished pipeline #%d", pipelineId)
+ log.Info("finished pipeline #%d", pipelineId)
return err
}
+
+// runTasks schedules one DevLakeTaskActivity per task id without waiting, then awaits every activity and returns
+// their error messages joined by newlines (nil if all of them succeeded)
+func runTasks(ctx workflow.Context, configJson []byte, taskIds []uint64, logger core.Logger) error {
+ // cleanExit is set only after all futures were awaited, so the deferred log fires only on an abnormal exit (e.g. a panic)
+ cleanExit := false
+ defer func() {
+ if !cleanExit {
+ logger.Error("fatal error while executing task Ids: %v", taskIds)
+ }
+ }()
+ futures := make([]workflow.Future, len(taskIds))
+ for i, taskId := range taskIds {
+ activityOpts := workflow.ActivityOptions{
+ ActivityID: fmt.Sprintf("task #%d", taskId),
+ StartToCloseTimeout: 24 * time.Hour,
+ WaitForCancellation: true,
+ }
+ activityCtx := workflow.WithActivityOptions(ctx, activityOpts)
+ // DevLakeTaskActivity takes the serializable *core.LoggerConfig, not the logger itself
+ futures[i] = workflow.ExecuteActivity(activityCtx, DevLakeTaskActivity, configJson, taskId, logger.GetConfig())
+ }
+ errs := make([]string, 0)
+ for _, future := range futures {
+ err := future.Get(ctx, nil)
+ if err != nil {
+ errs = append(errs, err.Error())
+ }
+ }
+ cleanExit = true
+ if len(errs) > 0 {
+ // constant format string: task error messages may themselves contain '%'
+ return fmt.Errorf("%s", strings.Join(errs, "\n"))
+ }
+ return nil
+}
diff --git a/worker/app/shared.go b/worker/app/shared.go
index 7d0fd246..14c24df3 100644
--- a/worker/app/shared.go
+++ b/worker/app/shared.go
@@ -27,7 +27,7 @@ import (
"gorm.io/gorm"
)
-func loadResources(configJson []byte) (*viper.Viper, core.Logger, *gorm.DB, error) {
+func loadResources(configJson []byte, loggerConfig *core.LoggerConfig) (*viper.Viper, core.Logger, *gorm.DB, error) {
// prepare
cfg := viper.New()
cfg.SetConfigType("json")
@@ -41,5 +41,27 @@ func loadResources(configJson []byte) (*viper.Viper, core.Logger, *gorm.DB, erro
if err != nil {
return nil, nil, nil, err
}
- return cfg, globalLogger, db, err
+ log, err := getWorkerLogger(globalLogger, loggerConfig)
+ if err != nil {
+ return nil, nil, nil, err
+ }
+ return cfg, log, db, nil
+}
+
+// getWorkerLogger returns a logger nested under log with logConfig.Prefix whose stream writes to the file at logConfig.Path.
+// A nil logConfig (e.g. a workflow/activity scheduled without one) yields log itself instead of a nil-pointer panic.
+func getWorkerLogger(log core.Logger, logConfig *core.LoggerConfig) (core.Logger, error) {
+ if logConfig == nil {
+ return log, nil
+ }
+ newLog := log.Nested(logConfig.Prefix)
+ stream, err := logger.GetFileStream(logConfig.Path)
+ if err != nil {
+ return nil, err
+ }
+ newLog.SetStream(&core.LoggerStreamConfig{
+ Path: logConfig.Path,
+ Writer: stream,
+ })
+ return newLog, nil
}
diff --git a/worker/app/task_activity.go b/worker/app/task_activity.go
index 9fd0ad16..a23dd2b5 100644
--- a/worker/app/task_activity.go
+++ b/worker/app/task_activity.go
@@ -27,8 +27,8 @@ import (
)
// DevLakeTaskActivity FIXME ...
-func DevLakeTaskActivity(ctx context.Context, configJson []byte, taskId uint64) error {
- cfg, log, db, err := loadResources(configJson)
+func DevLakeTaskActivity(ctx context.Context, configJson []byte, taskId uint64, loggerConfig *core.LoggerConfig) error {
+ cfg, log, db, err := loadResources(configJson, loggerConfig)
if err != nil {
return err
}