You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@devlake.apache.org by zk...@apache.org on 2022/08/19 01:46:17 UTC

[incubator-devlake] branch main updated: [feat-2578]: Download pipeline logging archives API (#2656)

This is an automated email from the ASF dual-hosted git repository.

zky pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-devlake.git


The following commit(s) were added to refs/heads/main by this push:
     new 62512126 [feat-2578]: Download pipeline logging archives API (#2656)
62512126 is described below

commit 625121267de32f60720531430e902c6f84a03cef
Author: Keon Amini <ke...@merico.dev>
AuthorDate: Thu Aug 18 20:46:12 2022 -0500

    [feat-2578]: Download pipeline logging archives API (#2656)
    
    * feat: add download pipeline logs API and functionality (#2578)
    
    * refactor: re-implemented archive creation differently
    
    * refactor: archive creation re-implemented more robustly
    
    * refactor: using viant/afs library for archiving logic to simplify code
    
    * refactor: reworked most things based on feedback + swagger docs and http headers fixed.
    
    * refactor: considerably simplified the logic to setup logger instances
    
    * refactor: some refactor in response to PR feedback
    
    * fix: some fixes
    
    * fix: fixed pipeline run prematuring exiting upon error
---
 .env.example                                       |   1 +
 api/pipelines/pipelines.go                         |  41 ++++++
 api/router.go                                      |   2 +
 errors/errors.go                                   |   8 ++
 go.mod                                             |   8 +-
 go.sum                                             |  16 +--
 logger/init.go                                     |  35 ++++-
 logger/logger.go                                   |  87 +++++++-----
 errors/errors.go => logger/stream.go               |  41 +++---
 worker/app/task_activity.go => logger/utils.go     |  52 +++----
 ...20729_rename_columns_of_pull_request_issues.go} |   0
 plugins/core/logger.go                             |  26 +++-
 runner/run_pipeline.go                             |   2 +-
 runner/run_task.go                                 |  28 +++-
 services/pipeline.go                               | 150 ++++++++-------------
 services/pipeline_runner.go                        | 139 +++++++++++++++++++
 services/task.go                                   |  10 +-
 utils/io.go                                        |  80 +++++++++++
 worker/app/pipeline_workflow.go                    |  66 +++++----
 worker/app/shared.go                               |  21 ++-
 worker/app/task_activity.go                        |   4 +-
 21 files changed, 576 insertions(+), 241 deletions(-)

diff --git a/.env.example b/.env.example
index 778e06c8..2214b354 100644
--- a/.env.example
+++ b/.env.example
@@ -27,6 +27,7 @@ TEMPORAL_URL=
 TEMPORAL_TASK_QUEUE=
 # Debug Info Warn Error
 LOGGING_LEVEL=
+LOGGING_DIR=./logs
 
 ##########################
 # Sensitive information encryption key
diff --git a/api/pipelines/pipelines.go b/api/pipelines/pipelines.go
index c5cc259c..d6cbb5e4 100644
--- a/api/pipelines/pipelines.go
+++ b/api/pipelines/pipelines.go
@@ -18,7 +18,10 @@ limitations under the License.
 package pipelines
 
 import (
+	"github.com/apache/incubator-devlake/errors"
 	"net/http"
+	"os"
+	"path/filepath"
 	"strconv"
 
 	"github.com/apache/incubator-devlake/api/shared"
@@ -167,3 +170,41 @@ func Delete(c *gin.Context) {
 	}
 	shared.ApiOutputSuccess(c, nil, http.StatusOK)
 }
+
+/*
+Get download logs of a pipeline
+GET /pipelines/:pipelineId/logging.tar.gz
+*/
+// download logs of a pipeline
+// @Description GET /pipelines/:pipelineId/logging.tar.gz
+// @Tags framework/pipelines
+// @Param pipelineId path int true "query"
+// @Success 200  "The archive file"
+// @Failure 400  {string} errcode.Error "Bad Request"
+// @Failure 404  {string} errcode.Error "Pipeline not found"
+// @Failure 500  {string} errcode.Error "Internel Error"
+// @Router /pipelines/{pipelineId}/logging.tar.gz [get]
+func DownloadLogs(c *gin.Context) {
+	pipelineId := c.Param("pipelineId")
+	id, err := strconv.ParseUint(pipelineId, 10, 64)
+	if err != nil {
+		shared.ApiOutputError(c, err, http.StatusBadRequest)
+		return
+	}
+	pipeline, err := services.GetPipeline(id)
+	if err != nil {
+		if errors.IsNotFound(err) {
+			shared.ApiOutputError(c, err, http.StatusNotFound)
+		} else {
+			shared.ApiOutputError(c, err, http.StatusInternalServerError)
+		}
+		return
+	}
+	archive, err := services.GetPipelineLogsArchivePath(pipeline)
+	if err != nil {
+		shared.ApiOutputError(c, err, http.StatusInternalServerError)
+		return
+	}
+	defer os.Remove(archive)
+	c.FileAttachment(archive, filepath.Base(archive))
+}
diff --git a/api/router.go b/api/router.go
index 84767c1e..0d4eb08b 100644
--- a/api/router.go
+++ b/api/router.go
@@ -50,6 +50,8 @@ func RegisterRouter(r *gin.Engine) {
 	r.DELETE("/pipelines/:pipelineId", pipelines.Delete)
 	r.GET("/pipelines/:pipelineId/tasks", task.Index)
 
+	r.GET("/pipelines/:pipelineId/logging.tar.gz", pipelines.DownloadLogs)
+
 	r.GET("/ping", ping.Get)
 	r.GET("/version", version.Get)
 	r.POST("/push/:tableName", push.Post)
diff --git a/errors/errors.go b/errors/errors.go
index d908c5a3..b0a842b9 100644
--- a/errors/errors.go
+++ b/errors/errors.go
@@ -45,4 +45,12 @@ func NewNotFound(message string) *Error {
 	return NewError(http.StatusNotFound, message)
 }
 
+func IsNotFound(err error) bool {
+	errCast, ok := err.(*Error)
+	if !ok {
+		return false
+	}
+	return errCast.Status == http.StatusNotFound
+}
+
 var InternalError = NewError(http.StatusInternalServerError, "Server Internal Error")
diff --git a/go.mod b/go.mod
index 7e042d66..e2481d9a 100644
--- a/go.mod
+++ b/go.mod
@@ -8,9 +8,11 @@ require (
 	github.com/go-git/go-git/v5 v5.4.2
 	github.com/go-playground/validator/v10 v10.9.0
 	github.com/gocarina/gocsv v0.0.0-20220707092902-b9da1f06c77e
+	github.com/google/uuid v1.3.0
 	github.com/libgit2/git2go/v33 v33.0.6
 	github.com/magiconair/properties v1.8.5
 	github.com/manifoldco/promptui v0.9.0
+	github.com/merico-dev/graphql v0.0.0-20220804061427-a2245fa66df2
 	github.com/mitchellh/mapstructure v1.4.1
 	github.com/panjf2000/ants/v2 v2.4.6
 	github.com/robfig/cron/v3 v3.0.0
@@ -22,6 +24,7 @@ require (
 	github.com/stretchr/testify v1.7.0
 	github.com/swaggo/gin-swagger v1.4.3
 	github.com/swaggo/swag v1.8.3
+	github.com/viant/afs v1.16.0
 	github.com/x-cray/logrus-prefixed-formatter v0.5.2
 	go.temporal.io/api v1.7.1-0.20220223032354-6e6fe738916a
 	go.temporal.io/sdk v1.14.0
@@ -47,6 +50,7 @@ require (
 	github.com/facebookgo/clock v0.0.0-20150410010913-600d898af40a // indirect
 	github.com/fsnotify/fsnotify v1.5.1 // indirect
 	github.com/gin-contrib/sse v0.1.0 // indirect
+	github.com/go-errors/errors v1.4.2 // indirect
 	github.com/go-git/gcfg v1.5.0 // indirect
 	github.com/go-git/go-billy/v5 v5.3.1 // indirect
 	github.com/go-openapi/jsonpointer v0.19.5 // indirect
@@ -61,7 +65,6 @@ require (
 	github.com/gogo/status v1.1.0 // indirect
 	github.com/golang/mock v1.6.0 // indirect
 	github.com/golang/protobuf v1.5.2 // indirect
-	github.com/google/uuid v1.3.0 // indirect
 	github.com/gopherjs/gopherjs v0.0.0-20190430165422-3e4dfb77656c // indirect
 	github.com/grpc-ecosystem/go-grpc-middleware v1.3.0 // indirect
 	github.com/hashicorp/hcl v1.0.0 // indirect
@@ -86,7 +89,6 @@ require (
 	github.com/mattn/go-colorable v0.1.6 // indirect
 	github.com/mattn/go-isatty v0.0.13 // indirect
 	github.com/mattn/go-sqlite3 v1.14.6 // indirect
-	github.com/merico-dev/graphql v0.0.0-20220804061427-a2245fa66df2 // indirect
 	github.com/mgutz/ansi v0.0.0-20200706080929-d51e80ef957d // indirect
 	github.com/mitchellh/go-homedir v1.1.0 // indirect
 	github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
@@ -95,11 +97,11 @@ require (
 	github.com/onsi/gomega v1.10.3 // indirect
 	github.com/pborman/uuid v1.2.1 // indirect
 	github.com/pelletier/go-toml v1.9.3 // indirect
+	github.com/pkg/errors v0.9.1 // indirect
 	github.com/pmezard/go-difflib v1.0.0 // indirect
 	github.com/robfig/cron v1.2.0 // indirect
 	github.com/russross/blackfriday/v2 v2.1.0 // indirect
 	github.com/sergi/go-diff v1.1.0 // indirect
-	github.com/shurcooL/graphql v0.0.0-20220606043923-3cf50f8a0a29 // indirect
 	github.com/spf13/cast v1.4.1 // indirect
 	github.com/spf13/jwalterweatherman v1.1.0 // indirect
 	github.com/spf13/pflag v1.0.6-0.20200504143853-81378bbcd8a1 // indirect
diff --git a/go.sum b/go.sum
index e0931d46..6dc810da 100644
--- a/go.sum
+++ b/go.sum
@@ -128,6 +128,8 @@ github.com/gin-gonic/gin v1.7.7 h1:3DoBmSbJbZAWqXJC3SLjAPfutPJJRN1U5pALB7EeTTs=
 github.com/gin-gonic/gin v1.7.7/go.mod h1:axIBovoeJpVj8S3BwE0uPMTeReE4+AfFtqpqaZ1qq1U=
 github.com/gliderlabs/ssh v0.2.2 h1:6zsha5zo/TWhRhwqCD3+EarCAgZ2yN28ipRnGPnwkI0=
 github.com/gliderlabs/ssh v0.2.2/go.mod h1:U7qILu1NlMHj9FlMhZLlkCdDnU1DBEAqr0aevW3Awn0=
+github.com/go-errors/errors v1.4.2 h1:J6MZopCL4uSllY1OfXM374weqZFFItUbrImctkmUxIA=
+github.com/go-errors/errors v1.4.2/go.mod h1:sIVyrIiJhuEF+Pj9Ebtd6P/rEYROXFi3BopGUQ5a5Og=
 github.com/go-git/gcfg v1.5.0 h1:Q5ViNfGF8zFgyJWPqYwA7qGFoMTEiBmdlkcfRmpIMa4=
 github.com/go-git/gcfg v1.5.0/go.mod h1:5m20vg6GwYabIxaOonVkTdrILxQMpEShl1xiMF4ua+E=
 github.com/go-git/go-billy/v5 v5.2.0/go.mod h1:pmpqyWchKfYfrkb/UVH4otLvyi/5gJlGI4Hb3ZqZ3W0=
@@ -432,10 +434,6 @@ github.com/mattn/go-isatty v0.0.13/go.mod h1:cbi8OIDigv2wuxKPP5vlRcQ1OAZbq2CE4Ky
 github.com/mattn/go-sqlite3 v1.14.5/go.mod h1:WVKg1VTActs4Qso6iwGbiFih2UIHo0ENGwNd0Lj+XmI=
 github.com/mattn/go-sqlite3 v1.14.6 h1:dNPt6NO46WmLVt2DLNpwczCmdV5boIZ6g/tlDrlRUbg=
 github.com/mattn/go-sqlite3 v1.14.6/go.mod h1:NyWgC/yNuGj7Q9rpYnZvas74GogHl5/Z4A/KQRfk6bU=
-github.com/merico-dev/graphql v0.0.0-20220606043923-3cf50f8a0a29 h1:jEarKDWDyd59SKG2AoH/3JYqRFfl7RE2uEzGJRpCl3Q=
-github.com/merico-dev/graphql v0.0.0-20220606043923-3cf50f8a0a29/go.mod h1:q+XAHrrzQfwrEaGgnl/VduY0Gd9FqSnMQEpKq101TxE=
-github.com/merico-dev/graphql v0.0.0-20220803162350-42fdc19ba54c h1:7l5zeXBLMWafzHD0ElpaNVTVhNzcQz7Urkw9WebNt5o=
-github.com/merico-dev/graphql v0.0.0-20220803162350-42fdc19ba54c/go.mod h1:dcDqG8HXVtfEhTCipFMa0Q+RTKTtDKIO2vJt+JVzHEQ=
 github.com/merico-dev/graphql v0.0.0-20220804061427-a2245fa66df2 h1:sOXuZIg3OwBnvJFfIuO8wegiLpeDCOSVvk2dsbjurd8=
 github.com/merico-dev/graphql v0.0.0-20220804061427-a2245fa66df2/go.mod h1:dcDqG8HXVtfEhTCipFMa0Q+RTKTtDKIO2vJt+JVzHEQ=
 github.com/mgutz/ansi v0.0.0-20200706080929-d51e80ef957d h1:5PJl274Y63IEHC+7izoQE9x6ikvDFZS2mDVS3drnohI=
@@ -516,8 +514,6 @@ github.com/shopspring/decimal v0.0.0-20180709203117-cd690d0c9e24/go.mod h1:M+9Nz
 github.com/shopspring/decimal v0.0.0-20200227202807-02e2044944cc/go.mod h1:DKyhrW/HYNuLGql+MJL6WCR6knT2jwCFRcu2hWCYk4o=
 github.com/shopspring/decimal v1.2.0 h1:abSATXmQEYyShuxI4/vyW3tV1MrKAJzCZ/0zLUXYbsQ=
 github.com/shopspring/decimal v1.2.0/go.mod h1:DKyhrW/HYNuLGql+MJL6WCR6knT2jwCFRcu2hWCYk4o=
-github.com/shurcooL/graphql v0.0.0-20220606043923-3cf50f8a0a29 h1:B1PEwpArrNp4dkQrfxh/abbBAOZBVp0ds+fBEOUOqOc=
-github.com/shurcooL/graphql v0.0.0-20220606043923-3cf50f8a0a29/go.mod h1:AuYgA5Kyo4c7HfUmvRGs/6rGlMMV/6B1bVnB9JxJEEg=
 github.com/shurcooL/sanitized_anchor_name v1.0.0/go.mod h1:1NzhyTcUVG4SuEtjjoZeVRXNmyL/1OwPU0+IJeTBvfc=
 github.com/sirupsen/logrus v1.4.1/go.mod h1:ni0Sbl8bgC9z8RoU9G6nDWqqs/fq4eDPysMBDgk/93Q=
 github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE=
@@ -570,6 +566,8 @@ github.com/ugorji/go/codec v1.1.7/go.mod h1:Ax+UKWsSmolVDwsd+7N3ZtXu+yMGCf907BLY
 github.com/ugorji/go/codec v1.2.6 h1:7kbGefxLoDBuYXOms4yD7223OpNMMPNPZxXk5TvFcyQ=
 github.com/ugorji/go/codec v1.2.6/go.mod h1:V6TCNZ4PHqoHGFZuSG1W8nrCzzdgA2DozYxWFFpvxTw=
 github.com/urfave/cli/v2 v2.3.0/go.mod h1:LJmUH05zAU44vOAcrfzZQKsZbVcdbOG8rtL3/XcUArI=
+github.com/viant/afs v1.16.0 h1:yb9TQ1gjVVLji9lcXLWaarklqmGWeXTZOwc2fwJevCI=
+github.com/viant/afs v1.16.0/go.mod h1:wdiEDffZKJwj1ZSFasy7hHoxLQdSpFZkd3XOWNt1aN0=
 github.com/x-cray/logrus-prefixed-formatter v0.5.2 h1:00txxvfBM9muc0jiLIEAkAcIMJzfthRT6usrui8uGmg=
 github.com/x-cray/logrus-prefixed-formatter v0.5.2/go.mod h1:2duySbKsL6M18s5GU7VPsoEPHyzalCE06qoARUCeBBE=
 github.com/xanzy/ssh-agent v0.3.0 h1:wUMzuKtKilRgBAD1sUb8gOwwRr2FGoBVumcjoOACClI=
@@ -712,12 +710,8 @@ golang.org/x/net v0.0.0-20210421230115-4e50805a0758/go.mod h1:72T/g9IO56b78aLF+1
 golang.org/x/net v0.0.0-20210805182204-aaa1db679c0d/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
 golang.org/x/net v0.0.0-20220127200216-cd36cc0744dd/go.mod h1:CfG3xpIq0wQ8r1q4Su4UZFWDARRcnwPjda9FqA0JpMk=
 golang.org/x/net v0.0.0-20220425223048-2871e0cb64e4/go.mod h1:CfG3xpIq0wQ8r1q4Su4UZFWDARRcnwPjda9FqA0JpMk=
-golang.org/x/net v0.0.0-20220708220712-1185a9018129 h1:vucSRfWwTsoXro7P+3Cjlr6flUMtzCwzlvkxEQtHHB0=
-golang.org/x/net v0.0.0-20220708220712-1185a9018129/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c=
 golang.org/x/net v0.0.0-20220728211354-c7608f3a8462 h1:UreQrH7DbFXSi9ZFox6FNT3WBooWmdANpU+IfkT1T4I=
 golang.org/x/net v0.0.0-20220728211354-c7608f3a8462/go.mod h1:YDH+HFinaLZZlnHAfSS6ZXJJ9M9t4Dl22yv3iI2vPwk=
-golang.org/x/net v0.0.0-20220802222814-0bcc04d9c69b h1:3ogNYyK4oIQdIKzTu68hQrr4iuVxF3AxKl9Aj/eDrw0=
-golang.org/x/net v0.0.0-20220802222814-0bcc04d9c69b/go.mod h1:YDH+HFinaLZZlnHAfSS6ZXJJ9M9t4Dl22yv3iI2vPwk=
 golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
 golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
 golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
@@ -810,8 +804,6 @@ golang.org/x/sys v0.0.0-20210806184541-e5e7981a1069/go.mod h1:oPkhp1MJrh7nUepCBc
 golang.org/x/sys v0.0.0-20210809222454-d867a43fc93e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
 golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
 golang.org/x/sys v0.0.0-20220222200937-f2425489ef4c/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
-golang.org/x/sys v0.0.0-20220712014510-0a85c31ab51e h1:NHvCuwuS43lGnYhten69ZWqi2QOj/CiDNcKbVqwVoew=
-golang.org/x/sys v0.0.0-20220712014510-0a85c31ab51e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
 golang.org/x/sys v0.0.0-20220728004956-3c1f35247d10 h1:WIoqL4EROvwiPdUtaip4VcDdpZ4kha7wBWZrbVKCIZg=
 golang.org/x/sys v0.0.0-20220728004956-3c1f35247d10/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
 golang.org/x/term v0.0.0-20201117132131-f5c789dd3221/go.mod h1:Nr5EML6q2oocZ2LXRh80K7BxOlk5/8JxuGnuhpl+muw=
diff --git a/logger/init.go b/logger/init.go
index a5ff6616..3f5b2c70 100644
--- a/logger/init.go
+++ b/logger/init.go
@@ -18,12 +18,12 @@ limitations under the License.
 package logger
 
 import (
-	"fmt"
 	"github.com/apache/incubator-devlake/config"
 	"github.com/apache/incubator-devlake/plugins/core"
 	"github.com/sirupsen/logrus"
 	prefixed "github.com/x-cray/logrus-prefixed-formatter"
-	"os"
+	"io"
+	"path/filepath"
 	"strings"
 )
 
@@ -33,7 +33,8 @@ var Global core.Logger
 func init() {
 	inner = logrus.New()
 	logLevel := logrus.InfoLevel
-	switch strings.ToLower(config.GetConfig().GetString("LOGGING_LEVEL")) {
+	cfg := config.GetConfig()
+	switch strings.ToLower(cfg.GetString("LOGGING_LEVEL")) {
 	case "debug":
 		logLevel = logrus.DebugLevel
 	case "info":
@@ -48,10 +49,30 @@ func init() {
 		TimestampFormat: "2006-01-02 15:04:05",
 		FullTimestamp:   true,
 	})
+	basePath := cfg.GetString("LOGGING_DIR")
+	if basePath == "" {
+		inner.Error("LOGGING_DIR is not set. Log files will not be generated.")
+	} else {
+		basePath = filepath.Join(basePath, "devlake.log")
+	}
+	var err error
+	Global, err = NewDefaultLogger(inner)
+	Global.SetStream(&core.LoggerStreamConfig{
+		Path:   basePath,
+		Writer: createLogStream(basePath),
+	})
+	if err != nil {
+		panic(err)
+	}
+}
 
-	if err := os.Mkdir("logs", 0777); err != nil {
-		inner.Info(fmt.Sprintf("failed to create dir logs: %s", err))
+func createLogStream(path string) io.Writer {
+	if path == "" {
+		return nil
+	}
+	stream, err := GetFileStream(path)
+	if err != nil {
+		panic(err)
 	}
-	loggerPool := make(map[string]*logrus.Logger)
-	Global = NewDefaultLogger(inner, "", loggerPool)
+	return stream
 }
diff --git a/logger/logger.go b/logger/logger.go
index 110affe1..c4a52aac 100644
--- a/logger/logger.go
+++ b/logger/logger.go
@@ -21,21 +21,23 @@ import (
 	"fmt"
 	"github.com/apache/incubator-devlake/plugins/core"
 	"github.com/sirupsen/logrus"
-	"io"
-	"os"
 	"regexp"
+	"strings"
 )
 
+var alreadyInBracketsRegex = regexp.MustCompile(`\[.*?]+`)
+
 type DefaultLogger struct {
-	prefix     string
-	log        *logrus.Logger
-	loggerPool map[string]*logrus.Logger
+	log    *logrus.Logger
+	config *core.LoggerConfig
 }
 
-func NewDefaultLogger(log *logrus.Logger, prefix string, loggerPool map[string]*logrus.Logger) *DefaultLogger {
-	newDefaultLogger := &DefaultLogger{prefix: prefix, log: log}
-	newDefaultLogger.loggerPool = loggerPool
-	return newDefaultLogger
+func NewDefaultLogger(log *logrus.Logger) (core.Logger, error) {
+	defaultLogger := &DefaultLogger{
+		log:    log,
+		config: &core.LoggerConfig{},
+	}
+	return defaultLogger, nil
 }
 
 func (l *DefaultLogger) IsLevelEnabled(level core.LogLevel) bool {
@@ -48,8 +50,8 @@ func (l *DefaultLogger) IsLevelEnabled(level core.LogLevel) bool {
 func (l *DefaultLogger) Log(level core.LogLevel, format string, a ...interface{}) {
 	if l.IsLevelEnabled(level) {
 		msg := fmt.Sprintf(format, a...)
-		if l.prefix != "" {
-			msg = fmt.Sprintf("%s %s", l.prefix, msg)
+		if l.config.Prefix != "" {
+			msg = fmt.Sprintf("%s %s", l.config.Prefix, msg)
 		}
 		l.log.Log(logrus.Level(level), msg)
 	}
@@ -75,32 +77,57 @@ func (l *DefaultLogger) Error(format string, a ...interface{}) {
 	l.Log(core.LOG_ERROR, format, a...)
 }
 
-// bind two writer to logger
-func (l *DefaultLogger) Nested(name string) core.Logger {
-	writerStd := os.Stdout
-	fileName := ""
-	loggerPrefixRegex := regexp.MustCompile(`(\[task #\d+]\s\[\w+])`)
-	groups := loggerPrefixRegex.FindStringSubmatch(fmt.Sprintf("%s [%s]", l.prefix, name))
-	if len(groups) > 1 {
-		fileName = groups[1]
+func (l *DefaultLogger) SetStream(config *core.LoggerStreamConfig) {
+	if config.Path != "" {
+		l.config.Path = config.Path
+	}
+	if config.Writer != nil {
+		l.log.SetOutput(config.Writer)
+	}
+}
+
+func (l *DefaultLogger) GetConfig() *core.LoggerConfig {
+	return &core.LoggerConfig{
+		Path:   l.config.Path,
+		Prefix: l.config.Prefix,
 	}
+}
 
-	if fileName == "" {
-		fileName = "devlake"
+func (l *DefaultLogger) Nested(newPrefix string) core.Logger {
+	newTotalPrefix := newPrefix
+	if newPrefix != "" {
+		newTotalPrefix = l.createPrefix(newPrefix)
+	}
+	newLogger, err := l.getLogger(newTotalPrefix)
+	if err != nil {
+		l.Error("error getting a new logger: %v", newLogger)
+		return l
 	}
+	return newLogger
+}
 
-	if l.loggerPool[fileName] != nil {
-		return NewDefaultLogger(l.loggerPool[fileName], fmt.Sprintf("%s [%s]", l.prefix, name), l.loggerPool)
+func (l *DefaultLogger) getLogger(prefix string) (core.Logger, error) {
+	newLogrus := logrus.New()
+	newLogrus.SetLevel(l.log.Level)
+	newLogrus.SetFormatter(l.log.Formatter)
+	newLogrus.SetOutput(l.log.Out)
+	newLogger := &DefaultLogger{
+		log: newLogrus,
+		config: &core.LoggerConfig{
+			Path:   l.config.Path,
+			Prefix: prefix,
+		},
 	}
-	newLog := logrus.New()
-	newLog.SetLevel(l.log.Level)
-	newLog.SetFormatter(l.log.Formatter)
+	return newLogger, nil
+}
 
-	if file, err := os.OpenFile(fmt.Sprintf("logs/%s.log", fileName), os.O_APPEND|os.O_WRONLY|os.O_CREATE, 0666); err == nil {
-		newLog.SetOutput(io.MultiWriter(writerStd, file))
+func (l *DefaultLogger) createPrefix(newPrefix string) string {
+	newPrefix = strings.TrimSpace(newPrefix)
+	alreadyInBrackets := alreadyInBracketsRegex.MatchString(newPrefix)
+	if alreadyInBrackets {
+		return fmt.Sprintf("%s %s", l.config.Prefix, newPrefix)
 	}
-	l.loggerPool[fileName] = newLog
-	return NewDefaultLogger(newLog, fmt.Sprintf("%s [%s]", l.prefix, name), l.loggerPool)
+	return fmt.Sprintf("%s [%s]", l.config.Prefix, newPrefix)
 }
 
 var _ core.Logger = (*DefaultLogger)(nil)
diff --git a/errors/errors.go b/logger/stream.go
similarity index 63%
copy from errors/errors.go
copy to logger/stream.go
index d908c5a3..0161bb89 100644
--- a/errors/errors.go
+++ b/logger/stream.go
@@ -15,34 +15,25 @@ See the License for the specific language governing permissions and
 limitations under the License.
 */
 
-package errors
+package logger
 
 import (
-	"net/http"
+	"io"
+	"os"
+	"path/filepath"
 )
 
-type Error struct {
-	Status  int
-	Message string
-}
-
-func (e *Error) Code() int {
-	return e.Status
-}
-
-func (e *Error) Error() string {
-	return e.Message
-}
-
-func NewError(status int, message string) *Error {
-	return &Error{
-		status,
-		message,
+func GetFileStream(path string) (io.Writer, error) {
+	if path == "" {
+		return os.Stdout, nil
 	}
+	err := os.MkdirAll(filepath.Dir(path), os.ModePerm)
+	if err != nil {
+		return nil, err
+	}
+	file, err := os.OpenFile(path, os.O_RDWR|os.O_CREATE|os.O_APPEND, 0777)
+	if err != nil {
+		return nil, err
+	}
+	return io.MultiWriter(os.Stdout, file), nil
 }
-
-func NewNotFound(message string) *Error {
-	return NewError(http.StatusNotFound, message)
-}
-
-var InternalError = NewError(http.StatusInternalServerError, "Server Internal Error")
diff --git a/worker/app/task_activity.go b/logger/utils.go
similarity index 52%
copy from worker/app/task_activity.go
copy to logger/utils.go
index 9fd0ad16..a4a2ced6 100644
--- a/worker/app/task_activity.go
+++ b/logger/utils.go
@@ -15,37 +15,43 @@ See the License for the specific language governing permissions and
 limitations under the License.
 */
 
-package app
+package logger
 
 import (
-	"context"
-
+	"fmt"
 	"github.com/apache/incubator-devlake/models"
 	"github.com/apache/incubator-devlake/plugins/core"
-	"github.com/apache/incubator-devlake/runner"
-	"go.temporal.io/sdk/activity"
+	"os"
+	"path/filepath"
 )
 
-// DevLakeTaskActivity FIXME ...
-func DevLakeTaskActivity(ctx context.Context, configJson []byte, taskId uint64) error {
-	cfg, log, db, err := loadResources(configJson)
+func GetTaskLoggerPath(config *core.LoggerConfig, t *models.Task) string {
+	if config.Path == "" {
+		return ""
+	}
+	info, err := os.Stat(config.Path)
 	if err != nil {
-		return err
+		panic(err)
 	}
-	log.Info("received task #%d", taskId)
-	progressDetail := &models.TaskProgressDetail{}
-	progChan := make(chan core.RunningProgress)
-	defer close(progChan)
-	go func() {
-		for p := range progChan {
-			runner.UpdateProgressDetail(db, log, taskId, progressDetail, &p)
-			activity.RecordHeartbeat(ctx, progressDetail)
-		}
-	}()
-	err = runner.RunTask(ctx, cfg, log, db, progChan, taskId)
+	basePath := config.Path
+	if !info.IsDir() {
+		basePath = filepath.Dir(config.Path)
+	}
+	return filepath.Join(basePath, fmt.Sprintf("task-%d-%d-%d-%s.log", t.ID, t.PipelineRow, t.PipelineCol, t.Plugin))
+}
+
+func GetPipelineLoggerPath(config *core.LoggerConfig, p *models.Pipeline) string {
+	if config.Path == "" {
+		return ""
+	}
+	info, err := os.Stat(config.Path)
 	if err != nil {
-		log.Error("failed to execute task #%d: %w", taskId, err)
+		panic(err)
+	}
+	basePath := config.Path
+	if !info.IsDir() {
+		basePath = filepath.Dir(config.Path)
 	}
-	log.Info("finished task #%d", taskId)
-	return err
+	formattedCreationTime := p.CreatedAt.UTC().Format("20060102-1504")
+	return filepath.Join(basePath, fmt.Sprintf("pipeline-%d-%s", p.ID, formattedCreationTime), "pipeline.log")
 }
diff --git a/models/migrationscripts/202220729_rename_columns_of_pull_request_issues.go b/models/migrationscripts/20220729_rename_columns_of_pull_request_issues.go
similarity index 100%
rename from models/migrationscripts/202220729_rename_columns_of_pull_request_issues.go
rename to models/migrationscripts/20220729_rename_columns_of_pull_request_issues.go
diff --git a/plugins/core/logger.go b/plugins/core/logger.go
index 0b345426..0c71ab2f 100644
--- a/plugins/core/logger.go
+++ b/plugins/core/logger.go
@@ -17,7 +17,10 @@ limitations under the License.
 
 package core
 
-import "github.com/sirupsen/logrus"
+import (
+	"github.com/sirupsen/logrus"
+	"io"
+)
 
 type LogLevel logrus.Level
 
@@ -28,7 +31,7 @@ const (
 	LOG_ERROR LogLevel = LogLevel(logrus.ErrorLevel)
 )
 
-// General logger interface, can be used any where
+// Logger General logger interface, can be used anywhere
 type Logger interface {
 	IsLevelEnabled(level LogLevel) bool
 	Printf(format string, a ...interface{})
@@ -37,10 +40,27 @@ type Logger interface {
 	Info(format string, a ...interface{})
 	Warn(format string, a ...interface{})
 	Error(format string, a ...interface{})
-	// return a new logger which output nested log
+	// Nested return a new logger instance. `name` is the extra prefix to be prepended to each message. Leaving it blank
+	// will add no additional prefix. The new Logger will inherit the properties of the original.
 	Nested(name string) Logger
+	// GetConfig Returns a copy of the LoggerConfig associated with this Logger. This is meant to be used by the framework.
+	GetConfig() *LoggerConfig
+	// SetStream sets the output of this Logger. This is meant to be used by the framework.
+	SetStream(config *LoggerStreamConfig)
+}
+
+// LoggerStreamConfig stream related config to set on a Logger
+type LoggerStreamConfig struct {
+	Path   string
+	Writer io.Writer
 }
 
 type InjectLogger interface {
 	SetLogger(logger Logger)
 }
+
+// LoggerConfig config related to the Logger. This needs to be serializable, so it can be passed around over the wire.
+type LoggerConfig struct {
+	Path   string
+	Prefix string
+}
diff --git a/runner/run_pipeline.go b/runner/run_pipeline.go
index 8ed39d37..5bf37d5c 100644
--- a/runner/run_pipeline.go
+++ b/runner/run_pipeline.go
@@ -28,7 +28,7 @@ import (
 
 // RunPipeline FIXME ...
 func RunPipeline(
-	cfg *viper.Viper,
+	_ *viper.Viper,
 	log core.Logger,
 	db *gorm.DB,
 	pipelineId uint64,
diff --git a/runner/run_task.go b/runner/run_task.go
index 55ec737c..1f5c2828 100644
--- a/runner/run_task.go
+++ b/runner/run_task.go
@@ -21,6 +21,7 @@ import (
 	"context"
 	"encoding/json"
 	"fmt"
+	"github.com/apache/incubator-devlake/logger"
 	"time"
 
 	"github.com/apache/incubator-devlake/config"
@@ -39,7 +40,7 @@ import (
 func RunTask(
 	ctx context.Context,
 	_ *viper.Viper,
-	logger core.Logger,
+	parentLogger core.Logger,
 	db *gorm.DB,
 	progress chan core.RunningProgress,
 	taskId uint64,
@@ -52,6 +53,10 @@ func RunTask(
 	if task.Status == models.TASK_COMPLETED {
 		return fmt.Errorf("invalid task status")
 	}
+	log, err := getTaskLogger(parentLogger, task)
+	if err != nil {
+		return err
+	}
 	beganAt := time.Now()
 	// make sure task status always correct even if it panicked
 	defer func() {
@@ -73,7 +78,7 @@ func RunTask(
 				"failed_sub_task": subTaskName,
 			}).Error
 			if dbe != nil {
-				logger.Error("failed to finalize task status into db: %w", err)
+				log.Error("failed to finalize task status into db: %w", err)
 			}
 		} else {
 			err = db.Model(task).Updates(map[string]interface{}{
@@ -86,7 +91,7 @@ func RunTask(
 	}()
 
 	// start execution
-	logger.Info("start executing task: %d", task.ID)
+	log.Info("start executing task: %d", task.ID)
 	err = db.Model(task).Updates(map[string]interface{}{
 		"status":   models.TASK_RUNNING,
 		"message":  "",
@@ -110,7 +115,7 @@ func RunTask(
 	err = RunPluginTask(
 		ctx,
 		config.GetConfig(),
-		logger.Nested(task.Plugin),
+		log.Nested(task.Plugin),
 		db,
 		task.ID,
 		task.Plugin,
@@ -141,7 +146,6 @@ func RunPluginTask(
 	if !ok {
 		return fmt.Errorf("plugin %s doesn't support PluginTask interface", name)
 	}
-
 	return RunPluginSubTasks(
 		ctx,
 		cfg,
@@ -326,3 +330,17 @@ func recordSubtask(logger core.Logger, db *gorm.DB, subtask *models.Subtask) {
 		logger.Error("error writing subtask %d status to DB: %v", subtask.ID, err)
 	}
 }
+
+func getTaskLogger(parentLogger core.Logger, task *models.Task) (core.Logger, error) {
+	log := parentLogger.Nested(fmt.Sprintf("task #%d", task.ID))
+	loggingPath := logger.GetTaskLoggerPath(log.GetConfig(), task)
+	stream, err := logger.GetFileStream(loggingPath)
+	if err != nil {
+		return nil, err
+	}
+	log.SetStream(&core.LoggerStreamConfig{
+		Path:   loggingPath,
+		Writer: stream,
+	})
+	return log, nil
+}
diff --git a/services/pipeline.go b/services/pipeline.go
index 38c96e07..be6774a5 100644
--- a/services/pipeline.go
+++ b/services/pipeline.go
@@ -21,14 +21,16 @@ import (
 	"context"
 	"encoding/json"
 	"fmt"
+	"github.com/apache/incubator-devlake/utils"
+	"github.com/google/uuid"
+	"os"
+	"path/filepath"
 	"strings"
 	"time"
 
 	"github.com/apache/incubator-devlake/errors"
 	"github.com/apache/incubator-devlake/logger"
 	"github.com/apache/incubator-devlake/models"
-	"github.com/apache/incubator-devlake/runner"
-	"github.com/apache/incubator-devlake/worker/app"
 	v11 "go.temporal.io/api/enums/v1"
 	"go.temporal.io/sdk/client"
 	"go.temporal.io/sdk/converter"
@@ -38,7 +40,7 @@ import (
 
 var notificationService *NotificationService
 var temporalClient client.Client
-var pipelineLog = logger.Global.Nested("pipeline service")
+var globalPipelineLog = logger.Global.Nested("pipeline service")
 
 // PipelineQuery FIXME ...
 type PipelineQuery struct {
@@ -85,7 +87,7 @@ func pipelineServiceInit() {
 		panic(fmt.Errorf(`PIPELINE_MAX_PARALLEL should be a positive integer`))
 	}
 	if pipelineMaxParallel == 0 {
-		pipelineLog.Warn(`pipelineMaxParallel=0 means pipeline will be run No Limit`)
+		globalPipelineLog.Warn(`pipelineMaxParallel=0 means pipeline will be run No Limit`)
 		pipelineMaxParallel = 10000
 	}
 	// run pipeline with independent goroutine
@@ -109,7 +111,7 @@ func CreatePipeline(newPipeline *models.NewPipeline) (*models.Pipeline, error) {
 	// save pipeline to database
 	err := db.Create(&pipeline).Error
 	if err != nil {
-		pipelineLog.Error("create pipline failed: %w", err)
+		globalPipelineLog.Error("create pipline failed: %w", err)
 		return nil, errors.InternalError
 	}
 
@@ -125,7 +127,7 @@ func CreatePipeline(newPipeline *models.NewPipeline) (*models.Pipeline, error) {
 			}
 			_, err := CreateTask(newTask)
 			if err != nil {
-				pipelineLog.Error("create task for pipeline failed: %w", err)
+				globalPipelineLog.Error("create task for pipeline failed: %w", err)
 				return nil, err
 			}
 			// sync task state back to pipeline
@@ -133,7 +135,7 @@ func CreatePipeline(newPipeline *models.NewPipeline) (*models.Pipeline, error) {
 		}
 	}
 	if err != nil {
-		pipelineLog.Error("save tasks for pipeline failed: %w", err)
+		globalPipelineLog.Error("save tasks for pipeline failed: %w", err)
 		return nil, errors.InternalError
 	}
 	if pipeline.TotalTasks == 0 {
@@ -150,7 +152,7 @@ func CreatePipeline(newPipeline *models.NewPipeline) (*models.Pipeline, error) {
 		"plan":        pipeline.Plan,
 	}).Error
 	if err != nil {
-		pipelineLog.Error("update pipline state failed: %w", err)
+		globalPipelineLog.Error("update pipline state failed: %w", err)
 		return nil, errors.InternalError
 	}
 
@@ -199,21 +201,34 @@ func GetPipeline(pipelineId uint64) (*models.Pipeline, error) {
 	return pipeline, nil
 }
 
+// GetPipelineLogsArchivePath creates an archive for the logs of this pipeline and returns its file path
+func GetPipelineLogsArchivePath(pipeline *models.Pipeline) (string, error) {
+	logPath, err := getPipelineLogsPath(pipeline)
+	if err != nil {
+		return "", err
+	}
+	archive := fmt.Sprintf("%s/%s/logging.tar.gz", os.TempDir(), uuid.New())
+	if err = utils.CreateArchive(archive, true, logPath); err != nil {
+		return "", err
+	}
+	return archive, err
+}
+
 // RunPipelineInQueue query pipeline from db and run it in a queue
 func RunPipelineInQueue(pipelineMaxParallel int64) {
 	sema := semaphore.NewWeighted(pipelineMaxParallel)
 	startedPipelineIds := []uint64{}
-	for true {
-		pipelineLog.Info("wait for new pipeline")
+	for {
+		globalPipelineLog.Info("wait for new pipeline")
 		// start goroutine when sema lock ready and pipeline exist.
 		// to avoid read old pipeline, acquire lock before read exist pipeline
 		err := sema.Acquire(context.TODO(), 1)
 		if err != nil {
 			panic(err)
 		}
-		pipelineLog.Info("get lock and wait pipeline")
+		globalPipelineLog.Info("get lock and wait pipeline")
 		pipeline := &models.Pipeline{}
-		for true {
+		for {
 			db.Where("status = ?", models.TASK_CREATED).
 				Not(startedPipelineIds).
 				Order("id ASC").Limit(1).Find(pipeline)
@@ -225,80 +240,15 @@ func RunPipelineInQueue(pipelineMaxParallel int64) {
 		startedPipelineIds = append(startedPipelineIds, pipeline.ID)
 		go func() {
 			defer sema.Release(1)
-			pipelineLog.Info("run pipeline, %d", pipeline.ID)
-			_ = runPipeline(pipeline.ID)
+			globalPipelineLog.Info("run pipeline, %d", pipeline.ID)
+			err = runPipeline(pipeline.ID)
+			if err != nil {
+				globalPipelineLog.Error("failed to run pipeline, %d: %v", pipeline.ID, err)
+			}
 		}()
 	}
 }
 
-// runPipeline start a pipeline actually
-func runPipeline(pipelineId uint64) error {
-	var err error
-	// run
-	if temporalClient != nil {
-		err = runPipelineViaTemporal(pipelineId)
-	} else {
-		err = runPipelineStandalone(pipelineId)
-	}
-	// load
-	pipeline, e := GetPipeline(pipelineId)
-	if e != nil {
-		return err
-	}
-	// finished, update database
-	finishedAt := time.Now()
-	pipeline.FinishedAt = &finishedAt
-	pipeline.SpentSeconds = int(finishedAt.Unix() - pipeline.BeganAt.Unix())
-	if err != nil {
-		pipeline.Status = models.TASK_FAILED
-		pipeline.Message = err.Error()
-	} else {
-		pipeline.Status = models.TASK_COMPLETED
-		pipeline.Message = ""
-	}
-	dbe := db.Model(pipeline).Select("finished_at", "spent_seconds", "status", "message").Updates(pipeline).Error
-	if dbe != nil {
-		pipelineLog.Error("update pipeline state failed: %w", dbe)
-		return dbe
-	}
-	// notify external webhook
-	return NotifyExternal(pipelineId)
-}
-
-func getTemporalWorkflowId(pipelineId uint64) string {
-	return fmt.Sprintf("pipeline #%d", pipelineId)
-}
-
-func runPipelineViaTemporal(pipelineId uint64) error {
-	workflowOpts := client.StartWorkflowOptions{
-		ID:        getTemporalWorkflowId(pipelineId),
-		TaskQueue: cfg.GetString("TEMPORAL_TASK_QUEUE"),
-	}
-	// send only the very basis data
-	configJson, err := json.Marshal(cfg.AllSettings())
-	if err != nil {
-		return err
-	}
-	pipelineLog.Info("enqueue pipeline #%d into temporal task queue", pipelineId)
-	workflow, err := temporalClient.ExecuteWorkflow(
-		context.Background(),
-		workflowOpts,
-		app.DevLakePipelineWorkflow,
-		configJson,
-		pipelineId,
-	)
-	if err != nil {
-		pipelineLog.Error("failed to enqueue pipeline #%d into temporal", pipelineId)
-		return err
-	}
-	err = workflow.Get(context.Background(), nil)
-	if err != nil {
-		pipelineLog.Info("failed to execute pipeline #%d via temporal: %w", pipelineId, err)
-	}
-	pipelineLog.Info("pipeline #%d finished by temporal", pipelineId)
-	return err
-}
-
 func watchTemporalPipelines() {
 	ticker := time.NewTicker(3 * time.Second)
 	dc := converter.GetDefaultDataConverter()
@@ -321,7 +271,7 @@ func watchTemporalPipelines() {
 					"",
 				)
 				if err != nil {
-					pipelineLog.Error("failed to query workflow execution: %w", err)
+					globalPipelineLog.Error("failed to query workflow execution: %w", err)
 					continue
 				}
 				// workflow is terminated by outsider
@@ -341,7 +291,7 @@ func watchTemporalPipelines() {
 						for hisIter.HasNext() {
 							his, err := hisIter.Next()
 							if err != nil {
-								pipelineLog.Error("failed to get next from workflow history iterator: %w", err)
+								globalPipelineLog.Error("failed to get next from workflow history iterator: %w", err)
 								continue
 							}
 							rp.Message = fmt.Sprintf("temporal event type: %v", his.GetEventType())
@@ -354,7 +304,7 @@ func watchTemporalPipelines() {
 						"finished_at": rp.FinishedAt,
 					}).Error
 					if err != nil {
-						pipelineLog.Error("failed to update db: %w", err)
+						globalPipelineLog.Error("failed to update db: %w", err)
 					}
 					continue
 				}
@@ -363,7 +313,7 @@ func watchTemporalPipelines() {
 				for _, activity := range desc.PendingActivities {
 					taskId, err := getTaskIdFromActivityId(activity.ActivityId)
 					if err != nil {
-						pipelineLog.Error("unable to extract task id from activity id `%s`", activity.ActivityId)
+						globalPipelineLog.Error("unable to extract task id from activity id `%s`", activity.ActivityId)
 						continue
 					}
 					progressDetail := &models.TaskProgressDetail{}
@@ -379,7 +329,7 @@ func watchTemporalPipelines() {
 					lastPayload := payloads[len(payloads)-1]
 					err = dc.FromPayload(lastPayload, progressDetail)
 					if err != nil {
-						pipelineLog.Error("failed to unmarshal heartbeat payload: %w", err)
+						globalPipelineLog.Error("failed to unmarshal heartbeat payload: %w", err)
 						continue
 					}
 				}
@@ -389,14 +339,8 @@ func watchTemporalPipelines() {
 	}()
 }
 
-func runPipelineStandalone(pipelineId uint64) error {
-	return runner.RunPipeline(
-		cfg,
-		pipelineLog.Nested(fmt.Sprintf("pipeline #%d", pipelineId)),
-		db,
-		pipelineId,
-		runTasksStandalone,
-	)
+func getTemporalWorkflowId(pipelineId uint64) string {
+	return fmt.Sprintf("pipeline #%d", pipelineId)
 }
 
 // NotifyExternal FIXME ...
@@ -418,7 +362,7 @@ func NotifyExternal(pipelineId uint64) error {
 		Status:     pipeline.Status,
 	})
 	if err != nil {
-		pipelineLog.Error("failed to send notification: %w", err)
+		globalPipelineLog.Error("failed to send notification: %w", err)
 		return err
 	}
 	return nil
@@ -441,3 +385,17 @@ func CancelPipeline(pipelineId uint64) error {
 	}
 	return err
 }
+
+// getPipelineLogsPath gets the logs directory of this pipeline
+func getPipelineLogsPath(pipeline *models.Pipeline) (string, error) {
+	pipelineLog := getPipelineLogger(pipeline)
+	path := filepath.Dir(pipelineLog.GetConfig().Path)
+	_, err := os.Stat(path)
+	if err == nil {
+		return path, nil
+	}
+	if os.IsNotExist(err) {
+		return "", fmt.Errorf("logs for pipeline #%d not found: %v", pipeline.ID, err)
+	}
+	return "", fmt.Errorf("err validating logs path for pipeline #%d: %v", pipeline.ID, err)
+}
diff --git a/services/pipeline_runner.go b/services/pipeline_runner.go
new file mode 100644
index 00000000..e88ce554
--- /dev/null
+++ b/services/pipeline_runner.go
@@ -0,0 +1,139 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package services
+
+import (
+	"context"
+	"encoding/json"
+	"fmt"
+	"github.com/apache/incubator-devlake/logger"
+	"github.com/apache/incubator-devlake/models"
+	"github.com/apache/incubator-devlake/plugins/core"
+	"github.com/apache/incubator-devlake/runner"
+	"github.com/apache/incubator-devlake/worker/app"
+	"go.temporal.io/sdk/client"
+	"time"
+)
+
+type pipelineRunner struct {
+	logger   core.Logger
+	pipeline *models.Pipeline
+}
+
+func (p *pipelineRunner) runPipelineStandalone() error {
+	return runner.RunPipeline(
+		cfg,
+		p.logger,
+		db,
+		p.pipeline.ID,
+		func(taskIds []uint64) error {
+			return runTasksStandalone(p.logger, taskIds)
+		},
+	)
+}
+
+func (p *pipelineRunner) runPipelineViaTemporal() error {
+	workflowOpts := client.StartWorkflowOptions{
+		ID:        getTemporalWorkflowId(p.pipeline.ID),
+		TaskQueue: cfg.GetString("TEMPORAL_TASK_QUEUE"),
+	}
+	// send only the very basis data
+	configJson, err := json.Marshal(cfg.AllSettings())
+	if err != nil {
+		return err
+	}
+	p.logger.Info("enqueue pipeline #%d into temporal task queue", p.pipeline.ID)
+	workflow, err := temporalClient.ExecuteWorkflow(
+		context.Background(),
+		workflowOpts,
+		app.DevLakePipelineWorkflow,
+		configJson,
+		p.pipeline.ID,
+		p.logger.GetConfig(),
+	)
+	if err != nil {
+		p.logger.Error("failed to enqueue pipeline #%d into temporal", p.pipeline.ID)
+		return err
+	}
+	err = workflow.Get(context.Background(), nil)
+	if err != nil {
+		p.logger.Info("failed to execute pipeline #%d via temporal: %w", p.pipeline.ID, err)
+	}
+	p.logger.Info("pipeline #%d finished by temporal", p.pipeline.ID)
+	return err
+}
+
+func getPipelineLogger(pipeline *models.Pipeline) core.Logger {
+	pipelineLogger := globalPipelineLog.Nested(
+		fmt.Sprintf("pipeline #%d", pipeline.ID),
+	)
+	loggingPath := logger.GetPipelineLoggerPath(pipelineLogger.GetConfig(), pipeline)
+	stream, err := logger.GetFileStream(loggingPath)
+	if err != nil {
+		globalPipelineLog.Error("unable to set stream for logging pipeline %d", pipeline.ID)
+	} else {
+		pipelineLogger.SetStream(&core.LoggerStreamConfig{
+			Path:   loggingPath,
+			Writer: stream,
+		})
+	}
+	return pipelineLogger
+}
+
+// runPipeline start a pipeline actually
+func runPipeline(pipelineId uint64) error {
+	pipeline, err := GetPipeline(pipelineId)
+	if err != nil {
+		return err
+	}
+	pipelineRun := pipelineRunner{
+		logger:   getPipelineLogger(pipeline),
+		pipeline: pipeline,
+	}
+	// run
+	if temporalClient != nil {
+		err = pipelineRun.runPipelineViaTemporal()
+	} else {
+		err = pipelineRun.runPipelineStandalone()
+	}
+	if err != nil {
+		err = fmt.Errorf("error running pipeline %d: %v", pipelineId, err)
+	}
+	pipeline, e := GetPipeline(pipelineId)
+	if e != nil {
+		return fmt.Errorf("unable to get pipeline %d: %v", pipelineId, err)
+	}
+	// finished, update database
+	finishedAt := time.Now()
+	pipeline.FinishedAt = &finishedAt
+	pipeline.SpentSeconds = int(finishedAt.Unix() - pipeline.BeganAt.Unix())
+	if err != nil {
+		pipeline.Status = models.TASK_FAILED
+		pipeline.Message = err.Error()
+	} else {
+		pipeline.Status = models.TASK_COMPLETED
+		pipeline.Message = ""
+	}
+	dbe := db.Model(pipeline).Select("finished_at", "spent_seconds", "status", "message").Updates(pipeline).Error
+	if dbe != nil {
+		globalPipelineLog.Error("update pipeline state failed: %w", dbe)
+		return dbe
+	}
+	// notify external webhook
+	return NotifyExternal(pipelineId)
+}
diff --git a/services/task.go b/services/task.go
index 067ff5dc..83944e78 100644
--- a/services/task.go
+++ b/services/task.go
@@ -219,13 +219,13 @@ func CancelTask(taskId uint64) error {
 	return nil
 }
 
-func runTasksStandalone(taskIds []uint64) error {
+func runTasksStandalone(parentLogger core.Logger, taskIds []uint64) error {
 	results := make(chan error)
 	for _, taskId := range taskIds {
 		taskId := taskId
 		go func() {
 			taskLog.Info("run task in background ", taskId)
-			results <- runTaskStandalone(taskId)
+			results <- runTaskStandalone(parentLogger, taskId)
 		}()
 	}
 	errs := make([]string, 0)
@@ -247,8 +247,8 @@ func runTasksStandalone(taskIds []uint64) error {
 	return err
 }
 
-func runTaskStandalone(taskId uint64) error {
-	// deferng cleaning up
+func runTaskStandalone(parentLog core.Logger, taskId uint64) error {
+	// deferring cleaning up
 	defer func() {
 		_, _ = runningTasks.Remove(taskId)
 	}()
@@ -264,7 +264,7 @@ func runTaskStandalone(taskId uint64) error {
 	err = runner.RunTask(
 		ctx,
 		cfg,
-		logger.Global.Nested(fmt.Sprintf("task #%d", taskId)),
+		parentLog,
 		db,
 		progress,
 		taskId,
diff --git a/utils/io.go b/utils/io.go
new file mode 100644
index 00000000..d0c43365
--- /dev/null
+++ b/utils/io.go
@@ -0,0 +1,80 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package utils
+
+import (
+	"context"
+	"fmt"
+	"github.com/viant/afs"
+	"os"
+	"path/filepath"
+)
+
+// fs abstract filesystem interface singleton instance
+var fs = afs.New()
+
+// CreateArchive creates a tar archive and writes the files/directories associated with the `targetPaths` to it.
+// 'relativeCopy' = true will copy the contents inside each sourcePath (directory) over. If the sourcePath is a file, it is directly copied over.
+func CreateArchive(archivePath string, relativeCopy bool, sourcePaths ...string) error {
+	for _, sourcePath := range sourcePaths {
+		srcPathAbs, err := filepath.Abs(sourcePath)
+		if err != nil {
+			return fmt.Errorf("error getting absolute path of %s: %v", sourcePaths, err)
+		}
+		archivePathAbs, err := filepath.Abs(archivePath)
+		if err != nil {
+			return fmt.Errorf("error getting absolute path of %s: %v", archivePath, err)
+		}
+		srcInfo, err := os.Stat(srcPathAbs)
+		if err != nil {
+			return fmt.Errorf("error getting stats of path %s: %v", srcPathAbs, err)
+		}
+		if relativeCopy {
+			if srcInfo.IsDir() {
+				err = copyContentsToArchive(srcPathAbs, archivePathAbs)
+			} else {
+				err = copyToArchive(srcPathAbs, archivePathAbs, srcInfo.Name())
+			}
+		} else {
+			err = copyToArchive(srcPathAbs, archivePathAbs, sourcePath)
+		}
+		if err != nil {
+			return fmt.Errorf("error trying to copy data to archive: %v", err)
+		}
+	}
+	return nil
+}
+
+func copyContentsToArchive(absSourcePath string, absArchivePath string) error {
+	var files []os.DirEntry
+	files, err := os.ReadDir(absSourcePath)
+	if err != nil {
+		return err
+	}
+	for _, file := range files {
+		err = fs.Copy(context.Background(), fmt.Sprintf("file://%s/%s", absSourcePath, file.Name()), fmt.Sprintf("file:%s/tar:///%s", absArchivePath, file.Name()))
+		if err != nil {
+			return err
+		}
+	}
+	return nil
+}
+
+func copyToArchive(absSourcePath string, absArchivePath string, filename string) error {
+	return fs.Copy(context.Background(), fmt.Sprintf("file://%s", absSourcePath), fmt.Sprintf("file:%s/tar:///%s", absArchivePath, filename))
+}
diff --git a/worker/app/pipeline_workflow.go b/worker/app/pipeline_workflow.go
index 90fb40bd..ad80a984 100644
--- a/worker/app/pipeline_workflow.go
+++ b/worker/app/pipeline_workflow.go
@@ -19,6 +19,7 @@ package app
 
 import (
 	"fmt"
+	"github.com/apache/incubator-devlake/plugins/core"
 	"strings"
 	"time"
 
@@ -27,44 +28,55 @@ import (
 )
 
 // DevLakePipelineWorkflow FIXME ...
-func DevLakePipelineWorkflow(ctx workflow.Context, configJson []byte, pipelineId uint64) error {
-	cfg, logger, db, err := loadResources(configJson)
+func DevLakePipelineWorkflow(ctx workflow.Context, configJson []byte, pipelineId uint64, loggerConfig *core.LoggerConfig) error {
+	cfg, log, db, err := loadResources(configJson, loggerConfig)
 	if err != nil {
 		return err
 	}
-	logger.Info("received pipeline #%d", pipelineId)
+	log.Info("received pipeline #%d", pipelineId)
 	err = runner.RunPipeline(
 		cfg,
-		logger,
+		log,
 		db,
 		pipelineId,
 		func(taskIds []uint64) error {
-			futures := make([]workflow.Future, len(taskIds))
-			for i, taskId := range taskIds {
-				activityOpts := workflow.ActivityOptions{
-					ActivityID:          fmt.Sprintf("task #%d", taskId),
-					StartToCloseTimeout: 24 * time.Hour,
-					WaitForCancellation: true,
-				}
-				activityCtx := workflow.WithActivityOptions(ctx, activityOpts)
-				futures[i] = workflow.ExecuteActivity(activityCtx, DevLakeTaskActivity, configJson, taskId)
-			}
-			errs := make([]string, 0)
-			for _, future := range futures {
-				err := future.Get(ctx, nil)
-				if err != nil {
-					errs = append(errs, err.Error())
-				}
-			}
-			if len(errs) > 0 {
-				return fmt.Errorf(strings.Join(errs, "\n"))
-			}
-			return nil
+			return runTasks(ctx, configJson, taskIds, log)
 		},
 	)
 	if err != nil {
-		logger.Error("failed to execute pipeline #%d: %w", pipelineId, err)
+		log.Error("failed to execute pipeline #%d: %w", pipelineId, err)
 	}
-	logger.Info("finished pipeline #%d", pipelineId)
+	log.Info("finished pipeline #%d", pipelineId)
 	return err
 }
+
+func runTasks(ctx workflow.Context, configJson []byte, taskIds []uint64, logger core.Logger) error {
+	cleanExit := false
+	defer func() {
+		if !cleanExit {
+			logger.Error("fatal error while executing task Ids: %v", taskIds)
+		}
+	}()
+	futures := make([]workflow.Future, len(taskIds))
+	for i, taskId := range taskIds {
+		activityOpts := workflow.ActivityOptions{
+			ActivityID:          fmt.Sprintf("task #%d", taskId),
+			StartToCloseTimeout: 24 * time.Hour,
+			WaitForCancellation: true,
+		}
+		activityCtx := workflow.WithActivityOptions(ctx, activityOpts)
+		futures[i] = workflow.ExecuteActivity(activityCtx, DevLakeTaskActivity, configJson, taskId, logger)
+	}
+	errs := make([]string, 0)
+	for _, future := range futures {
+		err := future.Get(ctx, nil)
+		if err != nil {
+			errs = append(errs, err.Error())
+		}
+	}
+	cleanExit = true
+	if len(errs) > 0 {
+		return fmt.Errorf(strings.Join(errs, "\n"))
+	}
+	return nil
+}
diff --git a/worker/app/shared.go b/worker/app/shared.go
index 7d0fd246..14c24df3 100644
--- a/worker/app/shared.go
+++ b/worker/app/shared.go
@@ -27,7 +27,7 @@ import (
 	"gorm.io/gorm"
 )
 
-func loadResources(configJson []byte) (*viper.Viper, core.Logger, *gorm.DB, error) {
+func loadResources(configJson []byte, loggerConfig *core.LoggerConfig) (*viper.Viper, core.Logger, *gorm.DB, error) {
 	// prepare
 	cfg := viper.New()
 	cfg.SetConfigType("json")
@@ -41,5 +41,22 @@ func loadResources(configJson []byte) (*viper.Viper, core.Logger, *gorm.DB, erro
 	if err != nil {
 		return nil, nil, nil, err
 	}
-	return cfg, globalLogger, db, err
+	log, err := getWorkerLogger(globalLogger, loggerConfig)
+	if err != nil {
+		return nil, nil, nil, err
+	}
+	return cfg, log, db, err
+}
+
+func getWorkerLogger(log core.Logger, logConfig *core.LoggerConfig) (core.Logger, error) {
+	newLog := log.Nested(logConfig.Prefix)
+	stream, err := logger.GetFileStream(logConfig.Path)
+	if err != nil {
+		return nil, err
+	}
+	newLog.SetStream(&core.LoggerStreamConfig{
+		Path:   logConfig.Path,
+		Writer: stream,
+	})
+	return newLog, nil
 }
diff --git a/worker/app/task_activity.go b/worker/app/task_activity.go
index 9fd0ad16..a23dd2b5 100644
--- a/worker/app/task_activity.go
+++ b/worker/app/task_activity.go
@@ -27,8 +27,8 @@ import (
 )
 
 // DevLakeTaskActivity FIXME ...
-func DevLakeTaskActivity(ctx context.Context, configJson []byte, taskId uint64) error {
-	cfg, log, db, err := loadResources(configJson)
+func DevLakeTaskActivity(ctx context.Context, configJson []byte, taskId uint64, loggerConfig *core.LoggerConfig) error {
+	cfg, log, db, err := loadResources(configJson, loggerConfig)
 	if err != nil {
 		return err
 	}