You are viewing a plain-text version of this content. The canonical link for it is "here" (the hyperlink was not preserved in this plain-text copy).
Posted to commits@devlake.apache.org by li...@apache.org on 2022/11/15 07:53:42 UTC
[incubator-devlake] branch main updated: feature: skip & rerun failed tasks (#3680)
This is an automated email from the ASF dual-hosted git repository.
likyh pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-devlake.git
The following commit(s) were added to refs/heads/main by this push:
new e3f18800b feature: skip & rerun failed tasks (#3680)
e3f18800b is described below
commit e3f18800b061287ef90f16ab8ffe4fcc9511498f
Author: mindlesscloud <li...@merico.dev>
AuthorDate: Tue Nov 15 15:53:39 2022 +0800
feature: skip & rerun failed tasks (#3680)
* feat: skip/rerun failed tasks
* fix: fix lint error
* fix: remove redundant code and fix comment
---
api/blueprints/blueprints.go | 53 ++------
api/router.go | 3 +-
api/task/task.go | 143 +++++++++++++++------
models/blueprint.go | 3 +
.../migrationscripts/20221107_add_skip_on_fail.go | 60 +++++++++
models/migrationscripts/register.go | 1 +
models/task.go | 2 +
plugins/core/plugin_blueprint.go | 8 +-
runner/run_pipeline.go | 45 +++----
runner/run_task.go | 7 +-
services/blueprint_helper.go | 2 +
services/blueprint_makeplan_v100.go | 1 +
services/pipeline.go | 22 ++--
services/pipeline_helper.go | 9 ++
services/pipeline_runner.go | 14 +-
services/task.go | 83 ++++++++++--
16 files changed, 324 insertions(+), 132 deletions(-)
diff --git a/api/blueprints/blueprints.go b/api/blueprints/blueprints.go
index dd8188158..c91b234f3 100644
--- a/api/blueprints/blueprints.go
+++ b/api/blueprints/blueprints.go
@@ -34,8 +34,8 @@ import (
// @Accept application/json
// @Param blueprint body models.Blueprint true "json"
// @Success 200 {object} models.Blueprint
-// @Failure 400 {string} errcode.Error "Bad Request"
-// @Failure 500 {string} errcode.Error "Internel Error"
+// @Failure 400 {object} shared.ApiBody "Bad Request"
+// @Failure 500 {object} shared.ApiBody "Internal Error"
// @Router /blueprints [post]
func Post(c *gin.Context) {
blueprint := &models.Blueprint{}
@@ -60,8 +60,8 @@ func Post(c *gin.Context) {
// @Tags framework/blueprints
// @Accept application/json
// @Success 200 {object} gin.H
-// @Failure 400 {string} errcode.Error "Bad Request"
-// @Failure 500 {string} errcode.Error "Internel Error"
+// @Failure 400 {object} shared.ApiBody "Bad Request"
+// @Failure 500 {object} shared.ApiBody "Internal Error"
// @Router /blueprints [get]
func Index(c *gin.Context) {
var query services.BlueprintQuery
@@ -84,8 +84,8 @@ func Index(c *gin.Context) {
// @Accept application/json
// @Param blueprintId path int true "blueprint id"
// @Success 200 {object} models.Blueprint
-// @Failure 400 {string} errcode.Error "Bad Request"
-// @Failure 500 {string} errcode.Error "Internel Error"
+// @Failure 400 {object} shared.ApiBody "Bad Request"
+// @Failure 500 {object} shared.ApiBody "Internal Error"
// @Router /blueprints/{blueprintId} [get]
func Get(c *gin.Context) {
blueprintId := c.Param("blueprintId")
@@ -107,8 +107,8 @@ func Get(c *gin.Context) {
// @Tags framework/blueprints
// @Param blueprintId path string true "blueprintId"
// @Success 200
-// @Failure 400 {string} errcode.Error "Bad Request"
-// @Failure 500 {string} errcode.Error "Internel Error"
+// @Failure 400 {object} shared.ApiBody "Bad Request"
+// @Failure 500 {object} shared.ApiBody "Internal Error"
// @Router /blueprints/{blueprintId} [delete]
func Delete(c *gin.Context) {
pipelineId := c.Param("blueprintId")
@@ -123,39 +123,14 @@ func Delete(c *gin.Context) {
}
}
-/*
-func Put(c *gin.Context) {
- blueprintId := c.Param("blueprintId")
- id, err := strconv.ParseUint(blueprintId, 10, 64)
- if err != nil {
- shared.ApiOutputError(c, err, http.StatusBadRequest)
- return
- }
- editBlueprint := &models.EditBlueprint{}
- err = c.MustBindWith(editBlueprint, binding.JSON)
- if err != nil {
- shared.ApiOutputError(c, err, http.StatusBadRequest)
- fmt.Println(err)
- return
- }
- editBlueprint.BlueprintId = id
- blueprint, err := services.ModifyBlueprint(editBlueprint)
- if err != nil {
- shared.ApiOutputError(c, err, http.StatusBadRequest)
- return
- }
- shared.ApiOutputSuccess(c, blueprint, http.StatusOK)
-}
-*/
-
// @Summary patch blueprints
// @Description patch blueprints
// @Tags framework/blueprints
// @Accept application/json
// @Param blueprintId path string true "blueprintId"
// @Success 200 {object} models.Blueprint
-// @Failure 400 {string} errcode.Error "Bad Request"
-// @Failure 500 {string} errcode.Error "Internel Error"
+// @Failure 400 {object} shared.ApiBody "Bad Request"
+// @Failure 500 {object} shared.ApiBody "Internal Error"
// @Router /blueprints/{blueprintId} [Patch]
func Patch(c *gin.Context) {
blueprintId := c.Param("blueprintId")
@@ -184,8 +159,8 @@ func Patch(c *gin.Context) {
// @Accept application/json
// @Param blueprintId path string true "blueprintId"
// @Success 200 {object} models.Pipeline
-// @Failure 400 {string} errcode.Error "Bad Request"
-// @Failure 500 {string} errcode.Error "Internel Error"
+// @Failure 400 {object} shared.ApiBody "Bad Request"
+// @Failure 500 {object} shared.ApiBody "Internal Error"
// @Router /blueprints/{blueprintId}/trigger [Post]
func Trigger(c *gin.Context) {
blueprintId := c.Param("blueprintId")
@@ -208,8 +183,8 @@ func Trigger(c *gin.Context) {
// @Accept application/json
// @Param blueprintId path int true "blueprint id"
// @Success 200 {object} shared.ResponsePipelines
-// @Failure 400 {string} errcode.Error "Bad Request"
-// @Failure 500 {string} errcode.Error "Internel Error"
+// @Failure 400 {object} shared.ApiBody "Bad Request"
+// @Failure 500 {object} shared.ApiBody "Internal Error"
// @Router /blueprints/{blueprintId}/pipelines [get]
func GetBlueprintPipelines(c *gin.Context) {
var query services.PipelineQuery
diff --git a/api/router.go b/api/router.go
index 662778429..0122a185a 100644
--- a/api/router.go
+++ b/api/router.go
@@ -49,7 +49,8 @@ func RegisterRouter(r *gin.Engine) {
r.GET("/blueprints/:blueprintId", blueprints.Get)
r.GET("/blueprints/:blueprintId/pipelines", blueprints.GetBlueprintPipelines)
r.DELETE("/pipelines/:pipelineId", pipelines.Delete)
- r.GET("/pipelines/:pipelineId/tasks", task.Index)
+ r.GET("/pipelines/:pipelineId/tasks", task.GetTaskByPipeline)
+ r.POST("/pipelines/:pipelineId/tasks", task.RerunTask)
r.GET("/pipelines/:pipelineId/logging.tar.gz", pipelines.DownloadLogs)
diff --git a/api/task/task.go b/api/task/task.go
index eca459eba..17bee977a 100644
--- a/api/task/task.go
+++ b/api/task/task.go
@@ -23,66 +23,135 @@ import (
"github.com/apache/incubator-devlake/api/shared"
"github.com/apache/incubator-devlake/errors"
+ "github.com/apache/incubator-devlake/models"
"github.com/apache/incubator-devlake/services"
"github.com/gin-gonic/gin"
)
-/*
-Get list of pipelines
-GET /pipelines/pipeline:id/tasks?status=TASK_RUNNING&pending=1&page=1&=pagesize=10
-{
- "tasks": [
- {"id": 1, "plugin": "", ...}
- ],
- "count": 5
+func Delete(c *gin.Context) {
+ taskId := c.Param("taskId")
+ id, err := strconv.ParseUint(taskId, 10, 64)
+ if err != nil {
+ shared.ApiOutputError(c, errors.BadInput.Wrap(err, "invalid task ID format"))
+ return
+ }
+ err = services.CancelTask(id)
+ if err != nil {
+ shared.ApiOutputError(c, errors.Default.Wrap(err, "error cancelling task"))
+ return
+ }
+ shared.ApiOutputSuccess(c, nil, http.StatusOK)
}
-*/
-// @Summary Get task
-// @Description get task
-// @Description SAMPLE
-// @Description {
-// @Description "tasks": [
-// @Description {"id": 1, "plugin": "", ...}
-// @Description ],
-// @Description "count": 5
-// @Description }
+
+type getTaskResponse struct {
+ Tasks []models.Task `json:"tasks"`
+ Count int `json:"count"`
+}
+
+// GetTaskByPipeline return most recent tasks
+// @Summary Get tasks, only the most recent tasks will be returned
// @Tags framework/task
// @Accept application/json
// @Param pipelineId path int true "pipelineId"
-// @Success 200 {string} gin.H "{"tasks": tasks, "count": count}"
-// @Failure 400 {string} errcode.Error "Bad Request"
-// @Failure 500 {string} errcode.Error "Internel Error"
+// @Success 200 {object} getTaskResponse
+// @Failure 400 {object} shared.ApiBody "Bad Request"
+// @Failure 500 {object} shared.ApiBody "Internal Error"
// @Router /pipelines/{pipelineId}/tasks [get]
-func Index(c *gin.Context) {
- var query services.TaskQuery
- err := c.ShouldBindQuery(&query)
+func GetTaskByPipeline(c *gin.Context) {
+ pipelineId, err := strconv.ParseUint(c.Param("pipelineId"), 10, 64)
if err != nil {
- shared.ApiOutputError(c, errors.BadInput.Wrap(err, shared.BadRequestBody))
+ shared.ApiOutputError(c, errors.BadInput.Wrap(err, "invalid pipeline ID format"))
return
}
- err = c.ShouldBindUri(&query)
- if err != nil {
- shared.ApiOutputError(c, errors.BadInput.Wrap(err, "bad request URI format"))
- return
- }
- tasks, count, err := services.GetTasks(&query)
+ tasks, err := services.GetTasksWithLastStatus(pipelineId)
if err != nil {
shared.ApiOutputError(c, errors.Default.Wrap(err, "error getting tasks"))
return
}
- shared.ApiOutputSuccess(c, gin.H{"tasks": tasks, "count": count}, http.StatusOK)
+ shared.ApiOutputSuccess(c, getTaskResponse{Tasks: tasks, Count: len(tasks)}, http.StatusOK)
}
-func Delete(c *gin.Context) {
- taskId := c.Param("taskId")
- id, err := strconv.ParseUint(taskId, 10, 64)
+type rerunRequest struct {
+ TaskId uint64 `json:"taskId"`
+}
+
+// RerunTask rerun the specified the task. If taskId is 0, all failed tasks of this pipeline will rerun
+// @Summary rerun tasks
+// @Tags framework/task
+// @Accept application/json
+// @Param pipelineId path int true "pipelineId"
+// @Param request body rerunRequest false "specify the task to rerun. If it's 0, all failed tasks of this pipeline will rerun"
+// @Success 200 {object} shared.ApiBody
+// @Failure 400 {object} shared.ApiBody "Bad Request"
+// @Failure 500 {object} shared.ApiBody "Internal Error"
+// @Router /pipelines/{pipelineId}/tasks [post]
+func RerunTask(c *gin.Context) {
+ var request rerunRequest
+ err := c.BindJSON(&request)
if err != nil {
shared.ApiOutputError(c, errors.BadInput.Wrap(err, "invalid task ID format"))
return
}
- err = services.CancelTask(id)
+ pipelineId, err := strconv.ParseUint(c.Param("pipelineId"), 10, 64)
if err != nil {
- shared.ApiOutputError(c, errors.Default.Wrap(err, "error cancelling task"))
+ shared.ApiOutputError(c, errors.BadInput.Wrap(err, "invalid pipeline ID format"))
+ return
+ }
+ pipeline, err := services.GetPipeline(pipelineId)
+ if err != nil {
+ shared.ApiOutputError(c, errors.Default.Wrap(err, "error get pipeline"))
+ return
+ }
+ if pipeline.Status == models.TASK_RUNNING {
+ shared.ApiOutputError(c, errors.BadInput.New("pipeline is running"))
+ return
+ }
+ if pipeline.Status == models.TASK_CREATED || pipeline.Status == models.TASK_RERUN {
+ shared.ApiOutputError(c, errors.BadInput.New("pipeline is waiting to run"))
+ return
+ }
+
+ var failedTasks []models.Task
+ if request.TaskId > 0 {
+ failedTask, err := services.GetTask(request.TaskId)
+ if err != nil || failedTask == nil {
+ shared.ApiOutputError(c, errors.Default.Wrap(err, "error getting failed task"))
+ return
+ }
+ if failedTask.PipelineId != pipelineId {
+ shared.ApiOutputError(c, errors.BadInput.New("the task ID and pipeline ID doesn't match"))
+ return
+ }
+ failedTasks = append(failedTasks, *failedTask)
+ } else {
+ tasks, err := services.GetTasksWithLastStatus(pipelineId)
+ if err != nil {
+ shared.ApiOutputError(c, errors.Default.Wrap(err, "error getting tasks"))
+ return
+ }
+ for _, task := range tasks {
+ if task.Status == models.TASK_FAILED {
+ failedTasks = append(failedTasks, task)
+ }
+ }
+ }
+ if len(failedTasks) == 0 {
+ shared.ApiOutputSuccess(c, nil, http.StatusOK)
+ return
+ }
+ err = services.DeleteCreatedTasks(pipelineId)
+ if err != nil {
+ shared.ApiOutputError(c, errors.Default.Wrap(err, "error delete tasks"))
+ return
+ }
+ _, err = services.SpawnTasks(failedTasks)
+ if err != nil {
+ shared.ApiOutputError(c, errors.Default.Wrap(err, "error create tasks"))
+ return
+ }
+ err = services.UpdateDbPipelineStatus(pipelineId, models.TASK_RERUN)
+ if err != nil {
+ shared.ApiOutputError(c, errors.Default.Wrap(err, "error create tasks"))
return
}
shared.ApiOutputSuccess(c, nil, http.StatusOK)
diff --git a/models/blueprint.go b/models/blueprint.go
index 12a93e995..18abab73e 100644
--- a/models/blueprint.go
+++ b/models/blueprint.go
@@ -39,12 +39,14 @@ type Blueprint struct {
//please check this https://crontab.guru/ for detail
CronConfig string `json:"cronConfig" format:"* * * * *" example:"0 0 * * 1"`
IsManual bool `json:"isManual"`
+ SkipOnFail bool `json:"skipOnFail"`
Settings json.RawMessage `json:"settings" swaggertype:"array,string" example:"please check api: /blueprints/<PLUGIN_NAME>/blueprint-setting"`
common.Model `swaggerignore:"true"`
}
type BlueprintSettings struct {
Version string `json:"version" validate:"required,semver,oneof=1.0.0"`
+ SkipOnFail bool `json:"skipOnFail"`
Connections json.RawMessage `json:"connections" validate:"required"`
BeforePlan json.RawMessage `json:"before_plan"`
AfterPlan json.RawMessage `json:"after_plan"`
@@ -69,6 +71,7 @@ type DbBlueprint struct {
//please check this https://crontab.guru/ for detail
CronConfig string `json:"cronConfig" format:"* * * * *" example:"0 0 * * 1"`
IsManual bool `json:"isManual"`
+ SkipOnFail bool `json:"skipOnFail"`
Settings string `json:"settings" encrypt:"yes" swaggertype:"array,string" example:"please check api: /blueprints/<PLUGIN_NAME>/blueprint-setting"`
common.Model `swaggerignore:"true"`
}
diff --git a/models/migrationscripts/20221107_add_skip_on_fail.go b/models/migrationscripts/20221107_add_skip_on_fail.go
new file mode 100644
index 000000000..71942ea55
--- /dev/null
+++ b/models/migrationscripts/20221107_add_skip_on_fail.go
@@ -0,0 +1,60 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements. See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package migrationscripts
+
+import (
+ "github.com/apache/incubator-devlake/errors"
+ "github.com/apache/incubator-devlake/helpers/migrationhelper"
+ "github.com/apache/incubator-devlake/plugins/core"
+)
+
+var _ core.MigrationScript = (*addSkipOnFail)(nil)
+
+type blueprint20221107 struct {
+ SkipOnFail bool
+}
+
+func (p blueprint20221107) TableName() string {
+ return "_devlake_blueprints"
+}
+
+type task20221107 struct {
+ SkipOnFail bool
+}
+
+func (t task20221107) TableName() string {
+ return "_devlake_tasks"
+}
+
+type addSkipOnFail struct{}
+
+func (*addSkipOnFail) Up(basicRes core.BasicRes) errors.Error {
+ return migrationhelper.AutoMigrateTables(
+ basicRes,
+ &blueprint20221107{},
+ &task20221107{},
+ )
+}
+
+func (*addSkipOnFail) Version() uint64 {
+ return 20221107095744
+}
+
+func (*addSkipOnFail) Name() string {
+ return "add skip_on_fail to _devlake_pipelines and _devlake_tasks"
+}
diff --git a/models/migrationscripts/register.go b/models/migrationscripts/register.go
index 4144b2d3d..7acf5e669 100644
--- a/models/migrationscripts/register.go
+++ b/models/migrationscripts/register.go
@@ -54,6 +54,7 @@ func All() []core.MigrationScript {
new(createCollectorState),
new(removeCicdPipelineRelation),
new(addCicdScope),
+ new(addSkipOnFail),
new(modifyCommitsDiffs),
}
}
diff --git a/models/task.go b/models/task.go
index 120057633..2f4a85b1e 100644
--- a/models/task.go
+++ b/models/task.go
@@ -27,6 +27,7 @@ import (
const (
TASK_CREATED = "TASK_CREATED"
+ TASK_RERUN = "TASK_RERUN"
TASK_RUNNING = "TASK_RUNNING"
TASK_COMPLETED = "TASK_COMPLETED"
TASK_FAILED = "TASK_FAILED"
@@ -58,6 +59,7 @@ type Task struct {
BeganAt *time.Time `json:"beganAt"`
FinishedAt *time.Time `json:"finishedAt" gorm:"index"`
SpentSeconds int `json:"spentSeconds"`
+ SkipOnFail bool `json:"-"`
}
type NewTask struct {
diff --git a/plugins/core/plugin_blueprint.go b/plugins/core/plugin_blueprint.go
index cb1d8f7ee..3d8ca0583 100644
--- a/plugins/core/plugin_blueprint.go
+++ b/plugins/core/plugin_blueprint.go
@@ -25,9 +25,10 @@ import (
// PipelineTask represents a smallest unit of execution inside a PipelinePlan
type PipelineTask struct {
// Plugin name
- Plugin string `json:"plugin" binding:"required"`
- Subtasks []string `json:"subtasks"`
- Options map[string]interface{} `json:"options"`
+ Plugin string `json:"plugin" binding:"required"`
+ SkipOnFail bool `json:"skipOnFail"`
+ Subtasks []string `json:"subtasks"`
+ Options map[string]interface{} `json:"options"`
}
// PipelineStage consist of multiple PipelineTasks, they will be executed in parallel
@@ -51,6 +52,7 @@ type PluginBlueprintV100 interface {
type BlueprintConnectionV100 struct {
Plugin string `json:"plugin" validate:"required"`
ConnectionId uint64 `json:"connectionId" validate:"required"`
+ SkipOnFail bool `json:"skipOnFail"`
Scope []*BlueprintScopeV100 `json:"scope" validate:"required"`
}
diff --git a/runner/run_pipeline.go b/runner/run_pipeline.go
index cbaa64ff8..32b85d69a 100644
--- a/runner/run_pipeline.go
+++ b/runner/run_pipeline.go
@@ -18,11 +18,9 @@ limitations under the License.
package runner
import (
- "fmt"
"time"
"github.com/apache/incubator-devlake/errors"
-
"github.com/apache/incubator-devlake/models"
"github.com/apache/incubator-devlake/plugins/core"
"github.com/spf13/viper"
@@ -37,23 +35,12 @@ func RunPipeline(
pipelineId uint64,
runTasks func([]uint64) errors.Error,
) errors.Error {
- startTime := time.Now()
- // load pipeline from db
- dbPipeline := &models.DbPipeline{}
- err := db.Find(dbPipeline, pipelineId).Error
- if err != nil {
- return errors.Convert(err)
- }
// load tasks for pipeline
- var tasks []*models.Task
- err = db.Where("pipeline_id = ?", dbPipeline.ID).Order("pipeline_row, pipeline_col").Find(&tasks).Error
+ var tasks []models.Task
+ err := db.Where("pipeline_id = ? AND status = ?", pipelineId, models.TASK_CREATED).Order("pipeline_row, pipeline_col").Find(&tasks).Error
if err != nil {
return errors.Convert(err)
}
- if len(tasks) != dbPipeline.TotalTasks {
- return errors.Internal.New(fmt.Sprintf("expected total tasks to be %v, got %v", dbPipeline.TotalTasks, len(tasks)))
- }
- // convert to 2d array
taskIds := make([][]uint64, 0)
for _, task := range tasks {
for len(taskIds) < task.PipelineRow {
@@ -61,19 +48,25 @@ func RunPipeline(
}
taskIds[task.PipelineRow-1] = append(taskIds[task.PipelineRow-1], task.ID)
}
+ return runPipelineTasks(log, db, pipelineId, taskIds, runTasks)
+}
- beganAt := time.Now()
- err = db.Model(dbPipeline).Updates(map[string]interface{}{
- "status": models.TASK_RUNNING,
- "message": "",
- "began_at": beganAt,
- }).Error
+func runPipelineTasks(
+ log core.Logger,
+ db *gorm.DB,
+ pipelineId uint64,
+ taskIds [][]uint64,
+ runTasks func([]uint64) errors.Error,
+) errors.Error {
+ // load pipeline from db
+ dbPipeline := &models.DbPipeline{}
+ err := db.Find(dbPipeline, pipelineId).Error
if err != nil {
return errors.Convert(err)
}
+
// This double for loop executes each set of tasks sequentially while
// executing the set of tasks concurrently.
- finishedTasks := 0
for i, row := range taskIds {
// update stage
err = db.Model(dbPipeline).Updates(map[string]interface{}{
@@ -90,18 +83,16 @@ func RunPipeline(
log.Error(err, "run tasks failed")
return errors.Convert(err)
}
- // Deprecated
+
// update finishedTasks
- finishedTasks += len(row)
err = db.Model(dbPipeline).Updates(map[string]interface{}{
- "finished_tasks": finishedTasks,
+ "finished_tasks": gorm.Expr("finished_tasks + ?", len(row)),
}).Error
if err != nil {
log.Error(err, "update pipeline state failed")
return errors.Convert(err)
}
}
- endTime := time.Now()
- log.Info("pipeline finished in %d ms: %v", endTime.UnixMilli()-startTime.UnixMilli(), err)
+ log.Info("pipeline finished in %d ms: %v", time.Now().UnixMilli()-dbPipeline.BeganAt.UnixMilli(), err)
return errors.Convert(err)
}
diff --git a/runner/run_task.go b/runner/run_task.go
index b1568f7a5..8e6bedc58 100644
--- a/runner/run_task.go
+++ b/runner/run_task.go
@@ -66,7 +66,9 @@ func RunTask(
default:
e = fmt.Errorf("%v", et)
}
- err = errors.Default.Wrap(e, fmt.Sprintf("run task failed with panic (%s)", utils.GatherCallFrames(0)))
+ if !task.SkipOnFail {
+ err = errors.Default.Wrap(e, fmt.Sprintf("run task failed with panic (%s)", utils.GatherCallFrames(0)))
+ }
}
finishedAt := time.Now()
spentSeconds := finishedAt.Unix() - beganAt.Unix()
@@ -135,6 +137,9 @@ func RunTask(
options,
progress,
)
+ if err != nil && task.SkipOnFail {
+ return nil
+ }
return err
}
diff --git a/services/blueprint_helper.go b/services/blueprint_helper.go
index 393735ffb..8a56bb710 100644
--- a/services/blueprint_helper.go
+++ b/services/blueprint_helper.go
@@ -90,6 +90,7 @@ func parseBlueprint(DbBlueprint *models.DbBlueprint) *models.Blueprint {
Enable: DbBlueprint.Enable,
CronConfig: DbBlueprint.CronConfig,
IsManual: DbBlueprint.IsManual,
+ SkipOnFail: DbBlueprint.SkipOnFail,
Settings: []byte(DbBlueprint.Settings),
Model: DbBlueprint.Model,
}
@@ -105,6 +106,7 @@ func parseDbBlueprint(blueprint *models.Blueprint) *models.DbBlueprint {
Enable: blueprint.Enable,
CronConfig: blueprint.CronConfig,
IsManual: blueprint.IsManual,
+ SkipOnFail: blueprint.SkipOnFail,
Settings: string(blueprint.Settings),
Model: blueprint.Model,
}
diff --git a/services/blueprint_makeplan_v100.go b/services/blueprint_makeplan_v100.go
index f3bd47d64..4c8be942e 100644
--- a/services/blueprint_makeplan_v100.go
+++ b/services/blueprint_makeplan_v100.go
@@ -54,6 +54,7 @@ func GeneratePlanJsonV100(settings *models.BlueprintSettings) (core.PipelinePlan
}
for _, stage := range plans[i] {
for _, task := range stage {
+ task.SkipOnFail = settings.SkipOnFail
if task.Plugin == "dora" {
hasDoraEnrich = true
for k, v := range task.Options {
diff --git a/services/pipeline.go b/services/pipeline.go
index 615218b37..5bcbf3fd3 100644
--- a/services/pipeline.go
+++ b/services/pipeline.go
@@ -155,7 +155,6 @@ func GetPipelineLogsArchivePath(pipeline *models.Pipeline) (string, errors.Error
// RunPipelineInQueue query pipeline from db and run it in a queue
func RunPipelineInQueue(pipelineMaxParallel int64) {
sema := semaphore.NewWeighted(pipelineMaxParallel)
- startedPipelineIds := []uint64{}
for {
globalPipelineLog.Info("acquire lock")
// start goroutine when sema lock ready and pipeline exist.
@@ -168,24 +167,27 @@ func RunPipelineInQueue(pipelineMaxParallel int64) {
dbPipeline := &models.DbPipeline{}
for {
cronLocker.Lock()
- db.Where("status = ?", models.TASK_CREATED).
- Not(startedPipelineIds).
+ db.Where("status IN ?", []string{models.TASK_CREATED, models.TASK_RERUN}).
Order("id ASC").Limit(1).Find(dbPipeline)
cronLocker.Unlock()
if dbPipeline.ID != 0 {
+ db.Model(&models.DbPipeline{}).Where("id = ?", dbPipeline.ID).Updates(map[string]interface{}{
+ "status": models.TASK_RUNNING,
+ "message": "",
+ "began_at": time.Now(),
+ })
break
}
time.Sleep(time.Second)
}
- startedPipelineIds = append(startedPipelineIds, dbPipeline.ID)
- go func() {
+ go func(pipelineId uint64) {
defer sema.Release(1)
- globalPipelineLog.Info("run pipeline, %d", dbPipeline.ID)
- err = runPipeline(dbPipeline.ID)
+ globalPipelineLog.Info("run pipeline, %d", pipelineId)
+ err = runPipeline(pipelineId)
if err != nil {
- globalPipelineLog.Error(err, "failed to run pipeline %d", dbPipeline.ID)
+ globalPipelineLog.Error(err, "failed to run pipeline %d", pipelineId)
}
- }()
+ }(dbPipeline.ID)
}
}
@@ -329,7 +331,7 @@ func CancelPipeline(pipelineId uint64) errors.Error {
// getPipelineLogsPath gets the logs directory of this pipeline
func getPipelineLogsPath(pipeline *models.Pipeline) (string, errors.Error) {
- pipelineLog := getPipelineLogger(pipeline)
+ pipelineLog := GetPipelineLogger(pipeline)
path := filepath.Dir(pipelineLog.GetConfig().Path)
_, err := os.Stat(path)
if err == nil {
diff --git a/services/pipeline_helper.go b/services/pipeline_helper.go
index 26661d460..a8e38dc18 100644
--- a/services/pipeline_helper.go
+++ b/services/pipeline_helper.go
@@ -196,3 +196,12 @@ func decryptDbPipeline(dbPipeline *models.DbPipeline) (*models.DbPipeline, error
dbPipeline.Plan = plan
return dbPipeline, nil
}
+
+// UpdateDbPipelineStatus update the status of pipeline
+func UpdateDbPipelineStatus(pipelineId uint64, status string) errors.Error {
+ err := db.Model(&models.DbPipeline{}).Where("id = ?", pipelineId).Update("status", status).Error
+ if err != nil {
+ return errors.Convert(err)
+ }
+ return nil
+}
diff --git a/services/pipeline_runner.go b/services/pipeline_runner.go
index 0b7a8b27e..0ec968704 100644
--- a/services/pipeline_runner.go
+++ b/services/pipeline_runner.go
@@ -21,6 +21,8 @@ import (
"context"
"encoding/json"
"fmt"
+ "time"
+
"github.com/apache/incubator-devlake/errors"
"github.com/apache/incubator-devlake/logger"
"github.com/apache/incubator-devlake/models"
@@ -28,7 +30,6 @@ import (
"github.com/apache/incubator-devlake/runner"
"github.com/apache/incubator-devlake/worker/app"
"go.temporal.io/sdk/client"
- "time"
)
type pipelineRunner struct {
@@ -43,7 +44,7 @@ func (p *pipelineRunner) runPipelineStandalone() errors.Error {
db,
p.pipeline.ID,
func(taskIds []uint64) errors.Error {
- return runTasksStandalone(p.logger, taskIds)
+ return RunTasksStandalone(p.logger, taskIds)
},
)
}
@@ -79,7 +80,8 @@ func (p *pipelineRunner) runPipelineViaTemporal() errors.Error {
return errors.Convert(err)
}
-func getPipelineLogger(pipeline *models.Pipeline) core.Logger {
+// GetPipelineLogger returns logger for the pipeline
+func GetPipelineLogger(pipeline *models.Pipeline) core.Logger {
pipelineLogger := globalPipelineLog.Nested(
fmt.Sprintf("pipeline #%d", pipeline.ID),
)
@@ -98,13 +100,13 @@ func getPipelineLogger(pipeline *models.Pipeline) core.Logger {
// runPipeline start a pipeline actually
func runPipeline(pipelineId uint64) errors.Error {
- pipeline, err := GetPipeline(pipelineId)
+ ppl, err := GetPipeline(pipelineId)
if err != nil {
return err
}
pipelineRun := pipelineRunner{
- logger: getPipelineLogger(pipeline),
- pipeline: pipeline,
+ logger: GetPipelineLogger(ppl),
+ pipeline: ppl,
}
// run
if temporalClient != nil {
diff --git a/services/task.go b/services/task.go
index c21acdbc8..022450488 100644
--- a/services/task.go
+++ b/services/task.go
@@ -27,9 +27,9 @@ import (
"sync"
"github.com/apache/incubator-devlake/errors"
-
"github.com/apache/incubator-devlake/logger"
"github.com/apache/incubator-devlake/models"
+ "github.com/apache/incubator-devlake/models/common"
"github.com/apache/incubator-devlake/plugins/core"
"github.com/apache/incubator-devlake/runner"
"gorm.io/gorm"
@@ -153,6 +153,7 @@ func CreateTask(newTask *models.NewTask) (*models.Task, errors.Error) {
PipelineId: newTask.PipelineId,
PipelineRow: newTask.PipelineRow,
PipelineCol: newTask.PipelineCol,
+ SkipOnFail: newTask.SkipOnFail,
}
err = db.Save(&task).Error
if err != nil {
@@ -194,6 +195,60 @@ func GetTasks(query *TaskQuery) ([]models.Task, int64, errors.Error) {
return tasks, count, nil
}
+// GetTasksWithLastStatus returns task list of the pipeline, only the most recently tasks would be returned
+func GetTasksWithLastStatus(pipelineId uint64) ([]models.Task, errors.Error) {
+ var tasks []models.Task
+ dbInner := db.Model(&models.Task{}).Order("id DESC").Where("pipeline_id = ?", pipelineId)
+ err := dbInner.Find(&tasks).Error
+ if err != nil {
+ return nil, errors.Convert(err)
+ }
+ taskIds := make(map[int64]struct{})
+ var result []models.Task
+ var maxRow, maxCol int
+ for _, task := range tasks {
+ if task.PipelineRow > maxRow {
+ maxRow = task.PipelineRow
+ }
+ if task.PipelineCol > maxCol {
+ maxCol = task.PipelineCol
+ }
+ }
+ for _, task := range tasks {
+ index := int64(task.PipelineRow)*int64(maxCol) + int64(task.PipelineCol)
+ if _, ok := taskIds[index]; !ok {
+ taskIds[index] = struct{}{}
+ result = append(result, task)
+ }
+ }
+ runningTasks.FillProgressDetailToTasks(result)
+ return result, nil
+}
+
+// SpawnTasks create new tasks from the failed ones
+func SpawnTasks(input []models.Task) ([]models.Task, errors.Error) {
+ var result []models.Task
+ for _, task := range input {
+ task.Model = common.Model{}
+ task.Status = models.TASK_CREATED
+ task.Message = ""
+ task.Progress = 0
+ task.ProgressDetail = nil
+ task.FailedSubTask = ""
+ task.BeganAt = nil
+ task.FinishedAt = nil
+ task.SpentSeconds = 0
+ task.SkipOnFail = true
+ result = append(result, task)
+ }
+ err := db.Save(&result).Error
+ if err != nil {
+ taskLog.Error(err, "save task failed")
+ return nil, errors.Internal.Wrap(err, "save task failed")
+ }
+ return result, nil
+}
+
// GetTask FIXME ...
func GetTask(taskId uint64) (*models.Task, errors.Error) {
task := &models.Task{}
@@ -217,19 +272,22 @@ func CancelTask(taskId uint64) errors.Error {
return nil
}
-func runTasksStandalone(parentLogger core.Logger, taskIds []uint64) errors.Error {
+// RunTasksStandalone run tasks in parallel
+func RunTasksStandalone(parentLogger core.Logger, taskIds []uint64) errors.Error {
+ if len(taskIds) == 0 {
+ return nil
+ }
results := make(chan error)
for _, taskId := range taskIds {
- taskId := taskId
- go func() {
- taskLog.Info("run task #%d in background ", taskId)
+ go func(id uint64) {
+ taskLog.Info("run task #%d in background ", id)
var err errors.Error
- taskErr := runTaskStandalone(parentLogger, taskId)
+ taskErr := runTaskStandalone(parentLogger, id)
if taskErr != nil {
- err = errors.Default.Wrap(taskErr, fmt.Sprintf("Error running task %d.", taskId))
+ err = errors.Default.Wrap(taskErr, fmt.Sprintf("Error running task %d.", id))
}
results <- err
- }()
+ }(taskId)
}
errs := make([]error, 0)
var err error
@@ -303,3 +361,12 @@ func getTaskIdFromActivityId(activityId string) (uint64, errors.Error) {
}
return errors.Convert01(strconv.ParseUint(submatches[1], 10, 64))
}
+
+// DeleteCreatedTasks deletes tasks with status `TASK_CREATED`
+func DeleteCreatedTasks(pipelineId uint64) errors.Error {
+ err := db.Where("pipeline_id = ? AND status = ?", pipelineId, models.TASK_CREATED).Delete(&models.Task{}).Error
+ if err != nil {
+ return errors.Convert(err)
+ }
+ return nil
+}