You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@devlake.apache.org by li...@apache.org on 2022/11/15 07:53:42 UTC

[incubator-devlake] branch main updated: feature: skip & rerun failed tasks (#3680)

This is an automated email from the ASF dual-hosted git repository.

likyh pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-devlake.git


The following commit(s) were added to refs/heads/main by this push:
     new e3f18800b feature: skip & rerun failed tasks (#3680)
e3f18800b is described below

commit e3f18800b061287ef90f16ab8ffe4fcc9511498f
Author: mindlesscloud <li...@merico.dev>
AuthorDate: Tue Nov 15 15:53:39 2022 +0800

    feature: skip & rerun failed tasks (#3680)
    
    * feat: skip/rerun failed tasks
    
    * fix: fix lint error
    
    * fix: remove redundant code and fix comment
---
 api/blueprints/blueprints.go                       |  53 ++------
 api/router.go                                      |   3 +-
 api/task/task.go                                   | 143 +++++++++++++++------
 models/blueprint.go                                |   3 +
 .../migrationscripts/20221107_add_skip_on_fail.go  |  60 +++++++++
 models/migrationscripts/register.go                |   1 +
 models/task.go                                     |   2 +
 plugins/core/plugin_blueprint.go                   |   8 +-
 runner/run_pipeline.go                             |  45 +++----
 runner/run_task.go                                 |   7 +-
 services/blueprint_helper.go                       |   2 +
 services/blueprint_makeplan_v100.go                |   1 +
 services/pipeline.go                               |  22 ++--
 services/pipeline_helper.go                        |   9 ++
 services/pipeline_runner.go                        |  14 +-
 services/task.go                                   |  83 ++++++++++--
 16 files changed, 324 insertions(+), 132 deletions(-)

diff --git a/api/blueprints/blueprints.go b/api/blueprints/blueprints.go
index dd8188158..c91b234f3 100644
--- a/api/blueprints/blueprints.go
+++ b/api/blueprints/blueprints.go
@@ -34,8 +34,8 @@ import (
 // @Accept application/json
 // @Param blueprint body models.Blueprint true "json"
 // @Success 200  {object} models.Blueprint
-// @Failure 400  {string} errcode.Error "Bad Request"
-// @Failure 500  {string} errcode.Error "Internel Error"
+// @Failure 400  {object} shared.ApiBody "Bad Request"
+// @Failure 500  {object} shared.ApiBody "Internal Error"
 // @Router /blueprints [post]
 func Post(c *gin.Context) {
 	blueprint := &models.Blueprint{}
@@ -60,8 +60,8 @@ func Post(c *gin.Context) {
 // @Tags framework/blueprints
 // @Accept application/json
 // @Success 200  {object} gin.H
-// @Failure 400  {string} errcode.Error "Bad Request"
-// @Failure 500  {string} errcode.Error "Internel Error"
+// @Failure 400  {object} shared.ApiBody "Bad Request"
+// @Failure 500  {object} shared.ApiBody "Internal Error"
 // @Router /blueprints [get]
 func Index(c *gin.Context) {
 	var query services.BlueprintQuery
@@ -84,8 +84,8 @@ func Index(c *gin.Context) {
 // @Accept application/json
 // @Param blueprintId path int true "blueprint id"
 // @Success 200  {object} models.Blueprint
-// @Failure 400  {string} errcode.Error "Bad Request"
-// @Failure 500  {string} errcode.Error "Internel Error"
+// @Failure 400  {object} shared.ApiBody "Bad Request"
+// @Failure 500  {object} shared.ApiBody "Internal Error"
 // @Router /blueprints/{blueprintId} [get]
 func Get(c *gin.Context) {
 	blueprintId := c.Param("blueprintId")
@@ -107,8 +107,8 @@ func Get(c *gin.Context) {
 // @Tags framework/blueprints
 // @Param blueprintId path string true "blueprintId"
 // @Success 200
-// @Failure 400  {string} errcode.Error "Bad Request"
-// @Failure 500  {string} errcode.Error "Internel Error"
+// @Failure 400  {object} shared.ApiBody "Bad Request"
+// @Failure 500  {object} shared.ApiBody "Internal Error"
 // @Router /blueprints/{blueprintId} [delete]
 func Delete(c *gin.Context) {
 	pipelineId := c.Param("blueprintId")
@@ -123,39 +123,14 @@ func Delete(c *gin.Context) {
 	}
 }
 
-/*
-func Put(c *gin.Context) {
-	blueprintId := c.Param("blueprintId")
-	id, err := strconv.ParseUint(blueprintId, 10, 64)
-	if err != nil {
-		shared.ApiOutputError(c, err, http.StatusBadRequest)
-		return
-	}
-	editBlueprint := &models.EditBlueprint{}
-	err = c.MustBindWith(editBlueprint, binding.JSON)
-	if err != nil {
-		shared.ApiOutputError(c, err, http.StatusBadRequest)
-		fmt.Println(err)
-		return
-	}
-	editBlueprint.BlueprintId = id
-	blueprint, err := services.ModifyBlueprint(editBlueprint)
-	if err != nil {
-		shared.ApiOutputError(c, err, http.StatusBadRequest)
-		return
-	}
-	shared.ApiOutputSuccess(c, blueprint, http.StatusOK)
-}
-*/
-
 // @Summary patch blueprints
 // @Description patch blueprints
 // @Tags framework/blueprints
 // @Accept application/json
 // @Param blueprintId path string true "blueprintId"
 // @Success 200  {object} models.Blueprint
-// @Failure 400  {string} errcode.Error "Bad Request"
-// @Failure 500  {string} errcode.Error "Internel Error"
+// @Failure 400  {object} shared.ApiBody "Bad Request"
+// @Failure 500  {object} shared.ApiBody "Internal Error"
 // @Router /blueprints/{blueprintId} [Patch]
 func Patch(c *gin.Context) {
 	blueprintId := c.Param("blueprintId")
@@ -184,8 +159,8 @@ func Patch(c *gin.Context) {
 // @Accept application/json
 // @Param blueprintId path string true "blueprintId"
 // @Success 200  {object} models.Pipeline
-// @Failure 400  {string} errcode.Error "Bad Request"
-// @Failure 500  {string} errcode.Error "Internel Error"
+// @Failure 400  {object} shared.ApiBody "Bad Request"
+// @Failure 500  {object} shared.ApiBody "Internal Error"
 // @Router /blueprints/{blueprintId}/trigger [Post]
 func Trigger(c *gin.Context) {
 	blueprintId := c.Param("blueprintId")
@@ -208,8 +183,8 @@ func Trigger(c *gin.Context) {
 // @Accept application/json
 // @Param blueprintId path int true "blueprint id"
 // @Success 200  {object} shared.ResponsePipelines
-// @Failure 400  {string} errcode.Error "Bad Request"
-// @Failure 500  {string} errcode.Error "Internel Error"
+// @Failure 400  {object} shared.ApiBody "Bad Request"
+// @Failure 500  {object} shared.ApiBody "Internal Error"
 // @Router /blueprints/{blueprintId}/pipelines [get]
 func GetBlueprintPipelines(c *gin.Context) {
 	var query services.PipelineQuery
diff --git a/api/router.go b/api/router.go
index 662778429..0122a185a 100644
--- a/api/router.go
+++ b/api/router.go
@@ -49,7 +49,8 @@ func RegisterRouter(r *gin.Engine) {
 	r.GET("/blueprints/:blueprintId", blueprints.Get)
 	r.GET("/blueprints/:blueprintId/pipelines", blueprints.GetBlueprintPipelines)
 	r.DELETE("/pipelines/:pipelineId", pipelines.Delete)
-	r.GET("/pipelines/:pipelineId/tasks", task.Index)
+	r.GET("/pipelines/:pipelineId/tasks", task.GetTaskByPipeline)
+	r.POST("/pipelines/:pipelineId/tasks", task.RerunTask)
 
 	r.GET("/pipelines/:pipelineId/logging.tar.gz", pipelines.DownloadLogs)
 
diff --git a/api/task/task.go b/api/task/task.go
index eca459eba..17bee977a 100644
--- a/api/task/task.go
+++ b/api/task/task.go
@@ -23,66 +23,135 @@ import (
 
 	"github.com/apache/incubator-devlake/api/shared"
 	"github.com/apache/incubator-devlake/errors"
+	"github.com/apache/incubator-devlake/models"
 	"github.com/apache/incubator-devlake/services"
 	"github.com/gin-gonic/gin"
 )
 
-/*
-Get list of pipelines
-GET /pipelines/pipeline:id/tasks?status=TASK_RUNNING&pending=1&page=1&=pagesize=10
-{
-	"tasks": [
-		{"id": 1, "plugin": "", ...}
-	],
-	"count": 5
+func Delete(c *gin.Context) {
+	taskId := c.Param("taskId")
+	id, err := strconv.ParseUint(taskId, 10, 64)
+	if err != nil {
+		shared.ApiOutputError(c, errors.BadInput.Wrap(err, "invalid task ID format"))
+		return
+	}
+	err = services.CancelTask(id)
+	if err != nil {
+		shared.ApiOutputError(c, errors.Default.Wrap(err, "error cancelling task"))
+		return
+	}
+	shared.ApiOutputSuccess(c, nil, http.StatusOK)
 }
-*/
-// @Summary Get task
-// @Description get task
-// @Description SAMPLE
-// @Description {
-// @Description 	"tasks": [
-// @Description 		{"id": 1, "plugin": "", ...}
-// @Description 	],
-// @Description 	"count": 5
-// @Description }
+
+type getTaskResponse struct {
+	Tasks []models.Task `json:"tasks"`
+	Count int           `json:"count"`
+}
+
+// GetTaskByPipeline returns the most recent tasks of a pipeline
+// @Summary Get tasks, only the most recent tasks will be returned
 // @Tags framework/task
 // @Accept application/json
 // @Param pipelineId path int true "pipelineId"
-// @Success 200  {string} gin.H "{"tasks": tasks, "count": count}"
-// @Failure 400  {string} errcode.Error "Bad Request"
-// @Failure 500  {string} errcode.Error "Internel Error"
+// @Success 200  {object} getTaskResponse
+// @Failure 400  {object} shared.ApiBody "Bad Request"
+// @Failure 500  {object} shared.ApiBody "Internal Error"
 // @Router /pipelines/{pipelineId}/tasks [get]
-func Index(c *gin.Context) {
-	var query services.TaskQuery
-	err := c.ShouldBindQuery(&query)
+func GetTaskByPipeline(c *gin.Context) {
+	pipelineId, err := strconv.ParseUint(c.Param("pipelineId"), 10, 64)
 	if err != nil {
-		shared.ApiOutputError(c, errors.BadInput.Wrap(err, shared.BadRequestBody))
+		shared.ApiOutputError(c, errors.BadInput.Wrap(err, "invalid pipeline ID format"))
 		return
 	}
-	err = c.ShouldBindUri(&query)
-	if err != nil {
-		shared.ApiOutputError(c, errors.BadInput.Wrap(err, "bad request URI format"))
-		return
-	}
-	tasks, count, err := services.GetTasks(&query)
+	tasks, err := services.GetTasksWithLastStatus(pipelineId)
 	if err != nil {
 		shared.ApiOutputError(c, errors.Default.Wrap(err, "error getting tasks"))
 		return
 	}
-	shared.ApiOutputSuccess(c, gin.H{"tasks": tasks, "count": count}, http.StatusOK)
+	shared.ApiOutputSuccess(c, getTaskResponse{Tasks: tasks, Count: len(tasks)}, http.StatusOK)
 }
 
-func Delete(c *gin.Context) {
-	taskId := c.Param("taskId")
-	id, err := strconv.ParseUint(taskId, 10, 64)
+type rerunRequest struct {
+	TaskId uint64 `json:"taskId"`
+}
+
+// RerunTask reruns the specified task. If taskId is 0, all failed tasks of this pipeline will be rerun
+// @Summary rerun tasks
+// @Tags framework/task
+// @Accept application/json
+// @Param pipelineId path int true "pipelineId"
+// @Param request body rerunRequest false "specify the task to rerun. If it's 0, all failed tasks of this pipeline will rerun"
+// @Success 200  {object} shared.ApiBody
+// @Failure 400  {object} shared.ApiBody "Bad Request"
+// @Failure 500  {object} shared.ApiBody "Internal Error"
+// @Router /pipelines/{pipelineId}/tasks [post]
+func RerunTask(c *gin.Context) {
+	var request rerunRequest
+	err := c.BindJSON(&request)
 	if err != nil {
 		shared.ApiOutputError(c, errors.BadInput.Wrap(err, "invalid task ID format"))
 		return
 	}
-	err = services.CancelTask(id)
+	pipelineId, err := strconv.ParseUint(c.Param("pipelineId"), 10, 64)
 	if err != nil {
-		shared.ApiOutputError(c, errors.Default.Wrap(err, "error cancelling task"))
+		shared.ApiOutputError(c, errors.BadInput.Wrap(err, "invalid pipeline ID format"))
+		return
+	}
+	pipeline, err := services.GetPipeline(pipelineId)
+	if err != nil {
+		shared.ApiOutputError(c, errors.Default.Wrap(err, "error get pipeline"))
+		return
+	}
+	if pipeline.Status == models.TASK_RUNNING {
+		shared.ApiOutputError(c, errors.BadInput.New("pipeline is running"))
+		return
+	}
+	if pipeline.Status == models.TASK_CREATED || pipeline.Status == models.TASK_RERUN {
+		shared.ApiOutputError(c, errors.BadInput.New("pipeline is waiting to run"))
+		return
+	}
+
+	var failedTasks []models.Task
+	if request.TaskId > 0 {
+		failedTask, err := services.GetTask(request.TaskId)
+		if err != nil || failedTask == nil {
+			shared.ApiOutputError(c, errors.Default.Wrap(err, "error getting failed task"))
+			return
+		}
+		if failedTask.PipelineId != pipelineId {
+			shared.ApiOutputError(c, errors.BadInput.New("the task ID and pipeline ID doesn't match"))
+			return
+		}
+		failedTasks = append(failedTasks, *failedTask)
+	} else {
+		tasks, err := services.GetTasksWithLastStatus(pipelineId)
+		if err != nil {
+			shared.ApiOutputError(c, errors.Default.Wrap(err, "error getting tasks"))
+			return
+		}
+		for _, task := range tasks {
+			if task.Status == models.TASK_FAILED {
+				failedTasks = append(failedTasks, task)
+			}
+		}
+	}
+	if len(failedTasks) == 0 {
+		shared.ApiOutputSuccess(c, nil, http.StatusOK)
+		return
+	}
+	err = services.DeleteCreatedTasks(pipelineId)
+	if err != nil {
+		shared.ApiOutputError(c, errors.Default.Wrap(err, "error delete tasks"))
+		return
+	}
+	_, err = services.SpawnTasks(failedTasks)
+	if err != nil {
+		shared.ApiOutputError(c, errors.Default.Wrap(err, "error create tasks"))
+		return
+	}
+	err = services.UpdateDbPipelineStatus(pipelineId, models.TASK_RERUN)
+	if err != nil {
+		shared.ApiOutputError(c, errors.Default.Wrap(err, "error create tasks"))
 		return
 	}
 	shared.ApiOutputSuccess(c, nil, http.StatusOK)
diff --git a/models/blueprint.go b/models/blueprint.go
index 12a93e995..18abab73e 100644
--- a/models/blueprint.go
+++ b/models/blueprint.go
@@ -39,12 +39,14 @@ type Blueprint struct {
 	//please check this https://crontab.guru/ for detail
 	CronConfig   string          `json:"cronConfig" format:"* * * * *" example:"0 0 * * 1"`
 	IsManual     bool            `json:"isManual"`
+	SkipOnFail   bool            `json:"skipOnFail"`
 	Settings     json.RawMessage `json:"settings" swaggertype:"array,string" example:"please check api: /blueprints/<PLUGIN_NAME>/blueprint-setting"`
 	common.Model `swaggerignore:"true"`
 }
 
 type BlueprintSettings struct {
 	Version     string          `json:"version" validate:"required,semver,oneof=1.0.0"`
+	SkipOnFail  bool            `json:"skipOnFail"`
 	Connections json.RawMessage `json:"connections" validate:"required"`
 	BeforePlan  json.RawMessage `json:"before_plan"`
 	AfterPlan   json.RawMessage `json:"after_plan"`
@@ -69,6 +71,7 @@ type DbBlueprint struct {
 	//please check this https://crontab.guru/ for detail
 	CronConfig   string `json:"cronConfig" format:"* * * * *" example:"0 0 * * 1"`
 	IsManual     bool   `json:"isManual"`
+	SkipOnFail   bool   `json:"skipOnFail"`
 	Settings     string `json:"settings" encrypt:"yes" swaggertype:"array,string" example:"please check api: /blueprints/<PLUGIN_NAME>/blueprint-setting"`
 	common.Model `swaggerignore:"true"`
 }
diff --git a/models/migrationscripts/20221107_add_skip_on_fail.go b/models/migrationscripts/20221107_add_skip_on_fail.go
new file mode 100644
index 000000000..71942ea55
--- /dev/null
+++ b/models/migrationscripts/20221107_add_skip_on_fail.go
@@ -0,0 +1,60 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package migrationscripts
+
+import (
+	"github.com/apache/incubator-devlake/errors"
+	"github.com/apache/incubator-devlake/helpers/migrationhelper"
+	"github.com/apache/incubator-devlake/plugins/core"
+)
+
+var _ core.MigrationScript = (*addSkipOnFail)(nil)
+
+type blueprint20221107 struct {
+	SkipOnFail bool
+}
+
+func (p blueprint20221107) TableName() string {
+	return "_devlake_blueprints"
+}
+
+type task20221107 struct {
+	SkipOnFail bool
+}
+
+func (t task20221107) TableName() string {
+	return "_devlake_tasks"
+}
+
+type addSkipOnFail struct{}
+
+func (*addSkipOnFail) Up(basicRes core.BasicRes) errors.Error {
+	return migrationhelper.AutoMigrateTables(
+		basicRes,
+		&blueprint20221107{},
+		&task20221107{},
+	)
+}
+
+func (*addSkipOnFail) Version() uint64 {
+	return 20221107095744
+}
+
+func (*addSkipOnFail) Name() string {
+	return "add skip_on_fail to _devlake_pipelines and _devlake_tasks"
+}
diff --git a/models/migrationscripts/register.go b/models/migrationscripts/register.go
index 4144b2d3d..7acf5e669 100644
--- a/models/migrationscripts/register.go
+++ b/models/migrationscripts/register.go
@@ -54,6 +54,7 @@ func All() []core.MigrationScript {
 		new(createCollectorState),
 		new(removeCicdPipelineRelation),
 		new(addCicdScope),
+		new(addSkipOnFail),
 		new(modifyCommitsDiffs),
 	}
 }
diff --git a/models/task.go b/models/task.go
index 120057633..2f4a85b1e 100644
--- a/models/task.go
+++ b/models/task.go
@@ -27,6 +27,7 @@ import (
 
 const (
 	TASK_CREATED   = "TASK_CREATED"
+	TASK_RERUN     = "TASK_RERUN"
 	TASK_RUNNING   = "TASK_RUNNING"
 	TASK_COMPLETED = "TASK_COMPLETED"
 	TASK_FAILED    = "TASK_FAILED"
@@ -58,6 +59,7 @@ type Task struct {
 	BeganAt       *time.Time `json:"beganAt"`
 	FinishedAt    *time.Time `json:"finishedAt" gorm:"index"`
 	SpentSeconds  int        `json:"spentSeconds"`
+	SkipOnFail    bool       `json:"-"`
 }
 
 type NewTask struct {
diff --git a/plugins/core/plugin_blueprint.go b/plugins/core/plugin_blueprint.go
index cb1d8f7ee..3d8ca0583 100644
--- a/plugins/core/plugin_blueprint.go
+++ b/plugins/core/plugin_blueprint.go
@@ -25,9 +25,10 @@ import (
 // PipelineTask represents a smallest unit of execution inside a PipelinePlan
 type PipelineTask struct {
 	// Plugin name
-	Plugin   string                 `json:"plugin" binding:"required"`
-	Subtasks []string               `json:"subtasks"`
-	Options  map[string]interface{} `json:"options"`
+	Plugin     string                 `json:"plugin" binding:"required"`
+	SkipOnFail bool                   `json:"skipOnFail"`
+	Subtasks   []string               `json:"subtasks"`
+	Options    map[string]interface{} `json:"options"`
 }
 
 // PipelineStage consist of multiple PipelineTasks, they will be executed in parallel
@@ -51,6 +52,7 @@ type PluginBlueprintV100 interface {
 type BlueprintConnectionV100 struct {
 	Plugin       string                `json:"plugin" validate:"required"`
 	ConnectionId uint64                `json:"connectionId" validate:"required"`
+	SkipOnFail   bool                  `json:"skipOnFail"`
 	Scope        []*BlueprintScopeV100 `json:"scope" validate:"required"`
 }
 
diff --git a/runner/run_pipeline.go b/runner/run_pipeline.go
index cbaa64ff8..32b85d69a 100644
--- a/runner/run_pipeline.go
+++ b/runner/run_pipeline.go
@@ -18,11 +18,9 @@ limitations under the License.
 package runner
 
 import (
-	"fmt"
 	"time"
 
 	"github.com/apache/incubator-devlake/errors"
-
 	"github.com/apache/incubator-devlake/models"
 	"github.com/apache/incubator-devlake/plugins/core"
 	"github.com/spf13/viper"
@@ -37,23 +35,12 @@ func RunPipeline(
 	pipelineId uint64,
 	runTasks func([]uint64) errors.Error,
 ) errors.Error {
-	startTime := time.Now()
-	// load pipeline from db
-	dbPipeline := &models.DbPipeline{}
-	err := db.Find(dbPipeline, pipelineId).Error
-	if err != nil {
-		return errors.Convert(err)
-	}
 	// load tasks for pipeline
-	var tasks []*models.Task
-	err = db.Where("pipeline_id = ?", dbPipeline.ID).Order("pipeline_row, pipeline_col").Find(&tasks).Error
+	var tasks []models.Task
+	err := db.Where("pipeline_id = ? AND status = ?", pipelineId, models.TASK_CREATED).Order("pipeline_row, pipeline_col").Find(&tasks).Error
 	if err != nil {
 		return errors.Convert(err)
 	}
-	if len(tasks) != dbPipeline.TotalTasks {
-		return errors.Internal.New(fmt.Sprintf("expected total tasks to be %v, got %v", dbPipeline.TotalTasks, len(tasks)))
-	}
-	// convert to 2d array
 	taskIds := make([][]uint64, 0)
 	for _, task := range tasks {
 		for len(taskIds) < task.PipelineRow {
@@ -61,19 +48,25 @@ func RunPipeline(
 		}
 		taskIds[task.PipelineRow-1] = append(taskIds[task.PipelineRow-1], task.ID)
 	}
+	return runPipelineTasks(log, db, pipelineId, taskIds, runTasks)
+}
 
-	beganAt := time.Now()
-	err = db.Model(dbPipeline).Updates(map[string]interface{}{
-		"status":   models.TASK_RUNNING,
-		"message":  "",
-		"began_at": beganAt,
-	}).Error
+func runPipelineTasks(
+	log core.Logger,
+	db *gorm.DB,
+	pipelineId uint64,
+	taskIds [][]uint64,
+	runTasks func([]uint64) errors.Error,
+) errors.Error {
+	// load pipeline from db
+	dbPipeline := &models.DbPipeline{}
+	err := db.Find(dbPipeline, pipelineId).Error
 	if err != nil {
 		return errors.Convert(err)
 	}
+
 	// This double for loop executes each set of tasks sequentially while
 	// executing the set of tasks concurrently.
-	finishedTasks := 0
 	for i, row := range taskIds {
 		// update stage
 		err = db.Model(dbPipeline).Updates(map[string]interface{}{
@@ -90,18 +83,16 @@ func RunPipeline(
 			log.Error(err, "run tasks failed")
 			return errors.Convert(err)
 		}
-		// Deprecated
+
 		// update finishedTasks
-		finishedTasks += len(row)
 		err = db.Model(dbPipeline).Updates(map[string]interface{}{
-			"finished_tasks": finishedTasks,
+			"finished_tasks": gorm.Expr("finished_tasks + ?", len(row)),
 		}).Error
 		if err != nil {
 			log.Error(err, "update pipeline state failed")
 			return errors.Convert(err)
 		}
 	}
-	endTime := time.Now()
-	log.Info("pipeline finished in %d ms: %v", endTime.UnixMilli()-startTime.UnixMilli(), err)
+	log.Info("pipeline finished in %d ms: %v", time.Now().UnixMilli()-dbPipeline.BeganAt.UnixMilli(), err)
 	return errors.Convert(err)
 }
diff --git a/runner/run_task.go b/runner/run_task.go
index b1568f7a5..8e6bedc58 100644
--- a/runner/run_task.go
+++ b/runner/run_task.go
@@ -66,7 +66,9 @@ func RunTask(
 			default:
 				e = fmt.Errorf("%v", et)
 			}
-			err = errors.Default.Wrap(e, fmt.Sprintf("run task failed with panic (%s)", utils.GatherCallFrames(0)))
+			if !task.SkipOnFail {
+				err = errors.Default.Wrap(e, fmt.Sprintf("run task failed with panic (%s)", utils.GatherCallFrames(0)))
+			}
 		}
 		finishedAt := time.Now()
 		spentSeconds := finishedAt.Unix() - beganAt.Unix()
@@ -135,6 +137,9 @@ func RunTask(
 		options,
 		progress,
 	)
+	if err != nil && task.SkipOnFail {
+		return nil
+	}
 	return err
 }
 
diff --git a/services/blueprint_helper.go b/services/blueprint_helper.go
index 393735ffb..8a56bb710 100644
--- a/services/blueprint_helper.go
+++ b/services/blueprint_helper.go
@@ -90,6 +90,7 @@ func parseBlueprint(DbBlueprint *models.DbBlueprint) *models.Blueprint {
 		Enable:     DbBlueprint.Enable,
 		CronConfig: DbBlueprint.CronConfig,
 		IsManual:   DbBlueprint.IsManual,
+		SkipOnFail: DbBlueprint.SkipOnFail,
 		Settings:   []byte(DbBlueprint.Settings),
 		Model:      DbBlueprint.Model,
 	}
@@ -105,6 +106,7 @@ func parseDbBlueprint(blueprint *models.Blueprint) *models.DbBlueprint {
 		Enable:     blueprint.Enable,
 		CronConfig: blueprint.CronConfig,
 		IsManual:   blueprint.IsManual,
+		SkipOnFail: blueprint.SkipOnFail,
 		Settings:   string(blueprint.Settings),
 		Model:      blueprint.Model,
 	}
diff --git a/services/blueprint_makeplan_v100.go b/services/blueprint_makeplan_v100.go
index f3bd47d64..4c8be942e 100644
--- a/services/blueprint_makeplan_v100.go
+++ b/services/blueprint_makeplan_v100.go
@@ -54,6 +54,7 @@ func GeneratePlanJsonV100(settings *models.BlueprintSettings) (core.PipelinePlan
 		}
 		for _, stage := range plans[i] {
 			for _, task := range stage {
+				task.SkipOnFail = settings.SkipOnFail
 				if task.Plugin == "dora" {
 					hasDoraEnrich = true
 					for k, v := range task.Options {
diff --git a/services/pipeline.go b/services/pipeline.go
index 615218b37..5bcbf3fd3 100644
--- a/services/pipeline.go
+++ b/services/pipeline.go
@@ -155,7 +155,6 @@ func GetPipelineLogsArchivePath(pipeline *models.Pipeline) (string, errors.Error
 // RunPipelineInQueue query pipeline from db and run it in a queue
 func RunPipelineInQueue(pipelineMaxParallel int64) {
 	sema := semaphore.NewWeighted(pipelineMaxParallel)
-	startedPipelineIds := []uint64{}
 	for {
 		globalPipelineLog.Info("acquire lock")
 		// start goroutine when sema lock ready and pipeline exist.
@@ -168,24 +167,27 @@ func RunPipelineInQueue(pipelineMaxParallel int64) {
 		dbPipeline := &models.DbPipeline{}
 		for {
 			cronLocker.Lock()
-			db.Where("status = ?", models.TASK_CREATED).
-				Not(startedPipelineIds).
+			db.Where("status IN ?", []string{models.TASK_CREATED, models.TASK_RERUN}).
 				Order("id ASC").Limit(1).Find(dbPipeline)
 			cronLocker.Unlock()
 			if dbPipeline.ID != 0 {
+				db.Model(&models.DbPipeline{}).Where("id = ?", dbPipeline.ID).Updates(map[string]interface{}{
+					"status":   models.TASK_RUNNING,
+					"message":  "",
+					"began_at": time.Now(),
+				})
 				break
 			}
 			time.Sleep(time.Second)
 		}
-		startedPipelineIds = append(startedPipelineIds, dbPipeline.ID)
-		go func() {
+		go func(pipelineId uint64) {
 			defer sema.Release(1)
-			globalPipelineLog.Info("run pipeline, %d", dbPipeline.ID)
-			err = runPipeline(dbPipeline.ID)
+			globalPipelineLog.Info("run pipeline, %d", pipelineId)
+			err = runPipeline(pipelineId)
 			if err != nil {
-				globalPipelineLog.Error(err, "failed to run pipeline %d", dbPipeline.ID)
+				globalPipelineLog.Error(err, "failed to run pipeline %d", pipelineId)
 			}
-		}()
+		}(dbPipeline.ID)
 	}
 }
 
@@ -329,7 +331,7 @@ func CancelPipeline(pipelineId uint64) errors.Error {
 
 // getPipelineLogsPath gets the logs directory of this pipeline
 func getPipelineLogsPath(pipeline *models.Pipeline) (string, errors.Error) {
-	pipelineLog := getPipelineLogger(pipeline)
+	pipelineLog := GetPipelineLogger(pipeline)
 	path := filepath.Dir(pipelineLog.GetConfig().Path)
 	_, err := os.Stat(path)
 	if err == nil {
diff --git a/services/pipeline_helper.go b/services/pipeline_helper.go
index 26661d460..a8e38dc18 100644
--- a/services/pipeline_helper.go
+++ b/services/pipeline_helper.go
@@ -196,3 +196,12 @@ func decryptDbPipeline(dbPipeline *models.DbPipeline) (*models.DbPipeline, error
 	dbPipeline.Plan = plan
 	return dbPipeline, nil
 }
+
+// UpdateDbPipelineStatus updates the status of the pipeline
+func UpdateDbPipelineStatus(pipelineId uint64, status string) errors.Error {
+	err := db.Model(&models.DbPipeline{}).Where("id = ?", pipelineId).Update("status", status).Error
+	if err != nil {
+		return errors.Convert(err)
+	}
+	return nil
+}
diff --git a/services/pipeline_runner.go b/services/pipeline_runner.go
index 0b7a8b27e..0ec968704 100644
--- a/services/pipeline_runner.go
+++ b/services/pipeline_runner.go
@@ -21,6 +21,8 @@ import (
 	"context"
 	"encoding/json"
 	"fmt"
+	"time"
+
 	"github.com/apache/incubator-devlake/errors"
 	"github.com/apache/incubator-devlake/logger"
 	"github.com/apache/incubator-devlake/models"
@@ -28,7 +30,6 @@ import (
 	"github.com/apache/incubator-devlake/runner"
 	"github.com/apache/incubator-devlake/worker/app"
 	"go.temporal.io/sdk/client"
-	"time"
 )
 
 type pipelineRunner struct {
@@ -43,7 +44,7 @@ func (p *pipelineRunner) runPipelineStandalone() errors.Error {
 		db,
 		p.pipeline.ID,
 		func(taskIds []uint64) errors.Error {
-			return runTasksStandalone(p.logger, taskIds)
+			return RunTasksStandalone(p.logger, taskIds)
 		},
 	)
 }
@@ -79,7 +80,8 @@ func (p *pipelineRunner) runPipelineViaTemporal() errors.Error {
 	return errors.Convert(err)
 }
 
-func getPipelineLogger(pipeline *models.Pipeline) core.Logger {
+// GetPipelineLogger returns logger for the pipeline
+func GetPipelineLogger(pipeline *models.Pipeline) core.Logger {
 	pipelineLogger := globalPipelineLog.Nested(
 		fmt.Sprintf("pipeline #%d", pipeline.ID),
 	)
@@ -98,13 +100,13 @@ func getPipelineLogger(pipeline *models.Pipeline) core.Logger {
 
 // runPipeline start a pipeline actually
 func runPipeline(pipelineId uint64) errors.Error {
-	pipeline, err := GetPipeline(pipelineId)
+	ppl, err := GetPipeline(pipelineId)
 	if err != nil {
 		return err
 	}
 	pipelineRun := pipelineRunner{
-		logger:   getPipelineLogger(pipeline),
-		pipeline: pipeline,
+		logger:   GetPipelineLogger(ppl),
+		pipeline: ppl,
 	}
 	// run
 	if temporalClient != nil {
diff --git a/services/task.go b/services/task.go
index c21acdbc8..022450488 100644
--- a/services/task.go
+++ b/services/task.go
@@ -27,9 +27,9 @@ import (
 	"sync"
 
 	"github.com/apache/incubator-devlake/errors"
-
 	"github.com/apache/incubator-devlake/logger"
 	"github.com/apache/incubator-devlake/models"
+	"github.com/apache/incubator-devlake/models/common"
 	"github.com/apache/incubator-devlake/plugins/core"
 	"github.com/apache/incubator-devlake/runner"
 	"gorm.io/gorm"
@@ -153,6 +153,7 @@ func CreateTask(newTask *models.NewTask) (*models.Task, errors.Error) {
 		PipelineId:  newTask.PipelineId,
 		PipelineRow: newTask.PipelineRow,
 		PipelineCol: newTask.PipelineCol,
+		SkipOnFail:  newTask.SkipOnFail,
 	}
 	err = db.Save(&task).Error
 	if err != nil {
@@ -194,6 +195,60 @@ func GetTasks(query *TaskQuery) ([]models.Task, int64, errors.Error) {
 	return tasks, count, nil
 }
 
+// GetTasksWithLastStatus returns the task list of the pipeline; only the most recent task for each (row, col) position is returned
+func GetTasksWithLastStatus(pipelineId uint64) ([]models.Task, errors.Error) {
+	var tasks []models.Task
+	dbInner := db.Model(&models.Task{}).Order("id DESC").Where("pipeline_id = ?", pipelineId)
+	err := dbInner.Find(&tasks).Error
+	if err != nil {
+		return nil, errors.Convert(err)
+	}
+	taskIds := make(map[int64]struct{})
+	var result []models.Task
+	var maxRow, maxCol int
+	for _, task := range tasks {
+		if task.PipelineRow > maxRow {
+			maxRow = task.PipelineRow
+		}
+		if task.PipelineCol > maxCol {
+			maxCol = task.PipelineCol
+		}
+	}
+	for _, task := range tasks {
+		index := int64(task.PipelineRow)*int64(maxCol) + int64(task.PipelineCol)
+		if _, ok := taskIds[index]; !ok {
+			taskIds[index] = struct{}{}
+			result = append(result, task)
+		}
+	}
+	runningTasks.FillProgressDetailToTasks(result)
+	return result, nil
+}
+
+// SpawnTasks creates new tasks from the failed ones
+func SpawnTasks(input []models.Task) ([]models.Task, errors.Error) {
+	var result []models.Task
+	for _, task := range input {
+		task.Model = common.Model{}
+		task.Status = models.TASK_CREATED
+		task.Message = ""
+		task.Progress = 0
+		task.ProgressDetail = nil
+		task.FailedSubTask = ""
+		task.BeganAt = nil
+		task.FinishedAt = nil
+		task.SpentSeconds = 0
+		task.SkipOnFail = true
+		result = append(result, task)
+	}
+	err := db.Save(&result).Error
+	if err != nil {
+		taskLog.Error(err, "save task failed")
+		return nil, errors.Internal.Wrap(err, "save task failed")
+	}
+	return result, nil
+}
+
 // GetTask FIXME ...
 func GetTask(taskId uint64) (*models.Task, errors.Error) {
 	task := &models.Task{}
@@ -217,19 +272,22 @@ func CancelTask(taskId uint64) errors.Error {
 	return nil
 }
 
-func runTasksStandalone(parentLogger core.Logger, taskIds []uint64) errors.Error {
+// RunTasksStandalone runs tasks in parallel
+func RunTasksStandalone(parentLogger core.Logger, taskIds []uint64) errors.Error {
+	if len(taskIds) == 0 {
+		return nil
+	}
 	results := make(chan error)
 	for _, taskId := range taskIds {
-		taskId := taskId
-		go func() {
-			taskLog.Info("run task #%d in background ", taskId)
+		go func(id uint64) {
+			taskLog.Info("run task #%d in background ", id)
 			var err errors.Error
-			taskErr := runTaskStandalone(parentLogger, taskId)
+			taskErr := runTaskStandalone(parentLogger, id)
 			if taskErr != nil {
-				err = errors.Default.Wrap(taskErr, fmt.Sprintf("Error running task %d.", taskId))
+				err = errors.Default.Wrap(taskErr, fmt.Sprintf("Error running task %d.", id))
 			}
 			results <- err
-		}()
+		}(taskId)
 	}
 	errs := make([]error, 0)
 	var err error
@@ -303,3 +361,12 @@ func getTaskIdFromActivityId(activityId string) (uint64, errors.Error) {
 	}
 	return errors.Convert01(strconv.ParseUint(submatches[1], 10, 64))
 }
+
+// DeleteCreatedTasks deletes tasks with status `TASK_CREATED`
+func DeleteCreatedTasks(pipelineId uint64) errors.Error {
+	err := db.Where("pipeline_id = ? AND status = ?", pipelineId, models.TASK_CREATED).Delete(&models.Task{}).Error
+	if err != nil {
+		return errors.Convert(err)
+	}
+	return nil
+}