You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@devlake.apache.org by kl...@apache.org on 2022/11/24 02:47:50 UTC

[incubator-devlake] branch main updated: Issues/3234 add parallelLabels for blueprint/pipeline (#3764)

This is an automated email from the ASF dual-hosted git repository.

klesh pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-devlake.git


The following commit(s) were added to refs/heads/main by this push:
     new 3b61d4ab7 Issues/3234 add parallelLabels for blueprint/pipeline (#3764)
3b61d4ab7 is described below

commit 3b61d4ab76fe85f45348c35d7515372c1d4d0a3c
Author: Likyh <ya...@meri.co>
AuthorDate: Thu Nov 24 10:47:45 2022 +0800

    Issues/3234 add parallelLabels for blueprint/pipeline (#3764)
    
    * fix: delete an unused var
    
    * feat: add parallelLabels in pipeline
    
    * feat: add parallel_label in pipeline api
    
    * feat: add parallel labels for blueprint
    
    * fix: fix for linter
    
    * fix: fix some bugs found by api docs
    
    * feat: use common label to replace parallel
    
    * fix: fix a bug
    
    * fix: add lock for review
    
    * fix: fix for review, add Labels in the model DbPipeline and DbBlueprint.
    
    * fix: use tab instead of space in sql
---
 api/blueprints/blueprints.go                       | 10 ++-
 api/pipelines/pipelines.go                         | 12 +--
 config-ui/src/hooks/usePipelineManager.jsx         | 10 +--
 .../src/pages/blueprints/blueprint-detail.jsx      |  2 +-
 .../src/pages/blueprints/create-blueprint.jsx      |  2 +-
 models/blueprint.go                                | 16 +++-
 models/migrationscripts/20221115_add_labels.go     | 61 ++++++++++++++
 models/migrationscripts/register.go                |  1 +
 models/pipeline.go                                 | 15 ++++
 plugins/helper/api_async_client.go                 |  1 -
 plugins/helper/worker_scheduler.go                 |  1 -
 plugins/helper/worker_scheduler_test.go            |  2 +-
 services/blueprint.go                              | 58 +++++++-------
 services/blueprint_helper.go                       | 93 +++++++++++++++++-----
 services/pipeline.go                               | 53 +++++++++---
 services/pipeline_helper.go                        | 71 ++++++++++++++---
 services/pipeline_runner.go                        | 21 ++---
 utils/slice.go                                     | 37 +++++++++
 utils/slice_test.go                                | 30 +++++++
 19 files changed, 402 insertions(+), 94 deletions(-)

diff --git a/api/blueprints/blueprints.go b/api/blueprints/blueprints.go
index 517f6c30e..a28642672 100644
--- a/api/blueprints/blueprints.go
+++ b/api/blueprints/blueprints.go
@@ -29,8 +29,8 @@ import (
 )
 
 type PaginatedBlueprint struct {
-	Blueprints []*models.Blueprint
-	Count      int64
+	Blueprints []*models.Blueprint `json:"blueprints"`
+	Count      int64               `json:"count"`
 }
 
 // @Summary post blueprints
@@ -63,7 +63,11 @@ func Post(c *gin.Context) {
 // @Summary get blueprints
 // @Description get blueprints
 // @Tags framework/blueprints
-// @Accept application/json
+// @Param enable query bool false "enable"
+// @Param is_manual query bool false "is_manual"
+// @Param page query int false "page"
+// @Param page_size query int false "page_size"
+// @Param label query string false "label"
 // @Success 200  {object} PaginatedBlueprint
 // @Failure 400  {object} shared.ApiBody "Bad Request"
 // @Failure 500  {object} shared.ApiBody "Internal Error"
diff --git a/api/pipelines/pipelines.go b/api/pipelines/pipelines.go
index 4a144f124..c35ab5d19 100644
--- a/api/pipelines/pipelines.go
+++ b/api/pipelines/pipelines.go
@@ -81,12 +81,14 @@ GET /pipelines?status=TASK_RUNNING&pending=1&page=1&pagesize=10
 */
 
 // @Summary Get list of pipelines
-// @Description GET /pipelines?status=TASK_RUNNING&pending=1&page=1&pagesize=10
+// @Description GET /pipelines?status=TASK_RUNNING&pending=1&label=search_text&page=1&pagesize=10
 // @Tags framework/pipelines
-// @Param status query string true "query"
-// @Param pending query int true "query"
-// @Param page query int true "query"
-// @Param pagesize query int true "query"
+// @Param status query string false "status"
+// @Param pending query int false "pending"
+// @Param page query int false "page"
+// @Param pagesize query int false "pagesize"
+// @Param blueprint_id query int false "blueprint_id"
+// @Param label query string false "label"
 // @Success 200  {object} shared.ResponsePipelines
 // @Failure 400  {string} errcode.Error "Bad Request"
 // @Failure 500  {string} errcode.Error "Internel Error"
diff --git a/config-ui/src/hooks/usePipelineManager.jsx b/config-ui/src/hooks/usePipelineManager.jsx
index c9e3ab17f..fdbd7647e 100644
--- a/config-ui/src/hooks/usePipelineManager.jsx
+++ b/config-ui/src/hooks/usePipelineManager.jsx
@@ -57,21 +57,19 @@ function usePipelineManager(
   const [logfile, setLogfile] = useState('logging.tar.gz')
 
   const runPipeline = useCallback(
-    (runSettings = null) => {
-      console.log('>> RUNNING PIPELINE....')
+    (blueprintId) => {
+      console.log('>> RUNNING PIPELINE....', blueprintId)
       try {
         setIsRunning(true)
         setErrors([])
         ToastNotification.clear()
-        console.log('>> DISPATCHING PIPELINE REQUEST', runSettings || settings)
         const run = async () => {
           // @todo: remove "ID" fallback key when no longer needed
           const p = await request.post(
-            `${DEVLAKE_ENDPOINT}/pipelines`,
-            runSettings || settings
+            `${DEVLAKE_ENDPOINT}/blueprints/${blueprintId}/trigger`
           )
           const t = await request.get(
-            `${DEVLAKE_ENDPOINT}/pipelines/${p.data?.ID || p.data?.id}/tasks`
+            `${DEVLAKE_ENDPOINT}/pipelines/${p.data?.id}/tasks`
           )
           console.log('>> RAW PIPELINE DATA FROM API...', p.data)
           setPipelineRun({
diff --git a/config-ui/src/pages/blueprints/blueprint-detail.jsx b/config-ui/src/pages/blueprints/blueprint-detail.jsx
index 5d77b5368..72929b968 100644
--- a/config-ui/src/pages/blueprints/blueprint-detail.jsx
+++ b/config-ui/src/pages/blueprints/blueprint-detail.jsx
@@ -134,7 +134,7 @@ const BlueprintDetail = (props) => {
 
   const runBlueprint = useCallback(() => {
     if (activeBlueprint !== null) {
-      runPipeline()
+      runPipeline(activeBlueprint.id)
     }
   }, [activeBlueprint, runPipeline])
 
diff --git a/config-ui/src/pages/blueprints/create-blueprint.jsx b/config-ui/src/pages/blueprints/create-blueprint.jsx
index f8cdf75fd..11d1b3cef 100644
--- a/config-ui/src/pages/blueprints/create-blueprint.jsx
+++ b/config-ui/src/pages/blueprints/create-blueprint.jsx
@@ -812,7 +812,7 @@ const CreateBlueprint = (props) => {
         blueprintId: saveBlueprintComplete?.id,
         plan: saveBlueprintComplete?.plan
       }
-      runPipeline(newPipelineConfiguration)
+      runPipeline(saveBlueprintComplete?.id)
       setRunNow(false)
       history.push(`/blueprints/detail/${saveBlueprintComplete?.id}`)
     } else if (newBlueprintId) {
diff --git a/models/blueprint.go b/models/blueprint.go
index 4d00a0931..f223330ff 100644
--- a/models/blueprint.go
+++ b/models/blueprint.go
@@ -19,9 +19,9 @@ package models
 
 import (
 	"encoding/json"
+	"time"
 
 	"github.com/apache/incubator-devlake/errors"
-
 	"github.com/apache/incubator-devlake/models/common"
 	"github.com/apache/incubator-devlake/plugins/core"
 )
@@ -42,6 +42,7 @@ type Blueprint struct {
 	CronConfig   string          `json:"cronConfig" format:"* * * * *" example:"0 0 * * 1"`
 	IsManual     bool            `json:"isManual"`
 	SkipOnFail   bool            `json:"skipOnFail"`
+	Labels       []string        `json:"labels"`
 	Settings     json.RawMessage `json:"settings" swaggertype:"array,string" example:"please check api: /blueprints/<PLUGIN_NAME>/blueprint-setting"`
 	common.Model `swaggerignore:"true"`
 }
@@ -77,8 +78,21 @@ type DbBlueprint struct {
 	SkipOnFail   bool   `json:"skipOnFail"`
 	Settings     string `json:"settings" encrypt:"yes" swaggertype:"array,string" example:"please check api: /blueprints/<PLUGIN_NAME>/blueprint-setting"`
 	common.Model `swaggerignore:"true"`
+
+	Labels []DbBlueprintLabel `json:"-" gorm:"-"`
 }
 
 func (DbBlueprint) TableName() string {
 	return "_devlake_blueprints"
 }
+
+type DbBlueprintLabel struct {
+	CreatedAt   time.Time `json:"createdAt"`
+	UpdatedAt   time.Time `json:"updatedAt"`
+	BlueprintId uint64    `json:"blueprint_id" gorm:"primaryKey"`
+	Name        string    `json:"name" gorm:"primaryKey;index"`
+}
+
+func (DbBlueprintLabel) TableName() string {
+	return "_devlake_blueprint_labels"
+}
diff --git a/models/migrationscripts/20221115_add_labels.go b/models/migrationscripts/20221115_add_labels.go
new file mode 100644
index 000000000..0c82066bf
--- /dev/null
+++ b/models/migrationscripts/20221115_add_labels.go
@@ -0,0 +1,61 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package migrationscripts
+
+import (
+	"github.com/apache/incubator-devlake/errors"
+	"github.com/apache/incubator-devlake/helpers/migrationhelper"
+	"github.com/apache/incubator-devlake/plugins/core"
+	"time"
+)
+
+type DbPipelineLabel20221115 struct {
+	CreatedAt  time.Time `json:"createdAt"`
+	UpdatedAt  time.Time `json:"updatedAt"`
+	PipelineId uint64    `json:"pipeline_id" gorm:"primaryKey"`
+	Name       string    `json:"name" gorm:"primaryKey;index"`
+}
+
+func (DbPipelineLabel20221115) TableName() string {
+	return "_devlake_pipeline_labels"
+}
+
+type DbBlueprintLabel20221115 struct {
+	CreatedAt   time.Time `json:"createdAt"`
+	UpdatedAt   time.Time `json:"updatedAt"`
+	BlueprintId uint64    `json:"blueprint_id" gorm:"primaryKey"`
+	Name        string    `json:"name" gorm:"primaryKey;index"`
+}
+
+func (DbBlueprintLabel20221115) TableName() string {
+	return "_devlake_blueprint_labels"
+}
+
+type addLabels struct{}
+
+func (*addLabels) Up(res core.BasicRes) errors.Error {
+	return migrationhelper.AutoMigrateTables(res, &DbPipelineLabel20221115{}, &DbBlueprintLabel20221115{})
+}
+
+func (*addLabels) Version() uint64 {
+	return 20221115000034
+}
+
+func (*addLabels) Name() string {
+	return "add labels' schema for blueprint and pipeline"
+}
diff --git a/models/migrationscripts/register.go b/models/migrationscripts/register.go
index 00478e238..943616df1 100644
--- a/models/migrationscripts/register.go
+++ b/models/migrationscripts/register.go
@@ -60,5 +60,6 @@ func All() []core.MigrationScript {
 		new(addProjectTables),
 		new(addProjectToBluePrint),
 		new(addProjectIssueMetric),
+		new(addLabels),
 	}
 }
diff --git a/models/pipeline.go b/models/pipeline.go
index e3d56465e..7427c2338 100644
--- a/models/pipeline.go
+++ b/models/pipeline.go
@@ -39,6 +39,7 @@ type Pipeline struct {
 	Message       string         `json:"message"`
 	SpentSeconds  int            `json:"spentSeconds"`
 	Stage         int            `json:"stage"`
+	Labels        []string       `json:"labels"`
 }
 
 // We use a 2D array because the request body must be an array of a set of tasks
@@ -46,6 +47,7 @@ type Pipeline struct {
 type NewPipeline struct {
 	Name        string            `json:"name"`
 	Plan        core.PipelinePlan `json:"plan" swaggertype:"array,string" example:"please check api /pipelines/<PLUGIN_NAME>/pipeline-plan"`
+	Labels      []string          `json:"labels"`
 	BlueprintId uint64
 }
 
@@ -62,8 +64,21 @@ type DbPipeline struct {
 	Message       string     `json:"message"`
 	SpentSeconds  int        `json:"spentSeconds"`
 	Stage         int        `json:"stage"`
+
+	Labels []DbPipelineLabel `json:"-" gorm:"-"`
 }
 
 func (DbPipeline) TableName() string {
 	return "_devlake_pipelines"
 }
+
+type DbPipelineLabel struct {
+	CreatedAt  time.Time `json:"createdAt"`
+	UpdatedAt  time.Time `json:"updatedAt"`
+	PipelineId uint64    `json:"pipeline_id" gorm:"primaryKey"`
+	Name       string    `json:"name" gorm:"primaryKey;index"`
+}
+
+func (DbPipelineLabel) TableName() string {
+	return "_devlake_pipeline_labels"
+}
diff --git a/plugins/helper/api_async_client.go b/plugins/helper/api_async_client.go
index a7cc71222..1d54b8a98 100644
--- a/plugins/helper/api_async_client.go
+++ b/plugins/helper/api_async_client.go
@@ -111,7 +111,6 @@ func CreateAsyncApiClient(
 		numOfWorkers,
 		requests,
 		duration,
-		retry,
 		logger,
 	)
 	if err != nil {
diff --git a/plugins/helper/worker_scheduler.go b/plugins/helper/worker_scheduler.go
index e74f228f6..45be63c3f 100644
--- a/plugins/helper/worker_scheduler.go
+++ b/plugins/helper/worker_scheduler.go
@@ -48,7 +48,6 @@ func NewWorkerScheduler(
 	workerNum int,
 	maxWork int,
 	maxWorkDuration time.Duration,
-	maxRetry int,
 	logger core.Logger,
 ) (*WorkerScheduler, errors.Error) {
 	if maxWork <= 0 {
diff --git a/plugins/helper/worker_scheduler_test.go b/plugins/helper/worker_scheduler_test.go
index 22f055822..e95446596 100644
--- a/plugins/helper/worker_scheduler_test.go
+++ b/plugins/helper/worker_scheduler_test.go
@@ -31,7 +31,7 @@ func TestWorkerSchedulerQpsControl(t *testing.T) {
 	// assuming we want 2 requests per second
 	testChannel := make(chan int, 100)
 	ctx, cancel := context.WithCancel(context.Background())
-	s, _ := NewWorkerScheduler(ctx, 5, 2, 1*time.Second, 0, unithelper.DummyLogger())
+	s, _ := NewWorkerScheduler(ctx, 5, 2, 1*time.Second, unithelper.DummyLogger())
 	defer s.Release()
 	for i := 1; i <= 5; i++ {
 		t := i
diff --git a/services/blueprint.go b/services/blueprint.go
index 6c080ebdb..2c0cc7669 100644
--- a/services/blueprint.go
+++ b/services/blueprint.go
@@ -34,11 +34,13 @@ import (
 	"gorm.io/gorm"
 )
 
-// BlueprintQuery FIXME ...
+// BlueprintQuery is a query for GetBlueprints
 type BlueprintQuery struct {
-	Enable   *bool `form:"enable,omitempty"`
-	Page     int   `form:"page"`
-	PageSize int   `form:"pageSize"`
+	Enable   *bool  `form:"enable,omitempty"`
+	IsManual *bool  `form:"is_manual"`
+	Page     int    `form:"page"`
+	PageSize int    `form:"pageSize"`
+	Label    string `form:"label"`
 }
 
 var (
@@ -52,11 +54,12 @@ func CreateBlueprint(blueprint *models.Blueprint) errors.Error {
 	if err != nil {
 		return err
 	}
-	dbBlueprint, err := encryptDbBlueprint(parseDbBlueprint(blueprint))
+	dbBlueprint := parseDbBlueprint(blueprint)
+	dbBlueprint, err = encryptDbBlueprint(dbBlueprint)
 	if err != nil {
 		return err
 	}
-	err = CreateDbBlueprint(dbBlueprint)
+	err = SaveDbBlueprint(dbBlueprint)
 	if err != nil {
 		return err
 	}
@@ -177,9 +180,14 @@ func PatchBlueprint(id uint64, body map[string]interface{}) (*models.Blueprint,
 	}
 
 	// save
-	err = save(blueprint)
+	dbBlueprint := parseDbBlueprint(blueprint)
+	dbBlueprint, err = encryptDbBlueprint(dbBlueprint)
 	if err != nil {
-		return nil, errors.Internal.Wrap(err, "error saving blueprint")
+		return nil, err
+	}
+	err = SaveDbBlueprint(dbBlueprint)
+	if err != nil {
+		return nil, err
 	}
 
 	// reload schedule
@@ -206,29 +214,29 @@ func DeleteBlueprint(id uint64) errors.Error {
 
 // ReloadBlueprints FIXME ...
 func ReloadBlueprints(c *cron.Cron) errors.Error {
-	dbBlueprints := make([]*models.DbBlueprint, 0)
-	if err := db.Model(&models.DbBlueprint{}).
-		Where("enable = ? AND is_manual = ?", true, false).
-		Find(&dbBlueprints).Error; err != nil {
-		return errors.Internal.Wrap(err, "error finding blueprints while reloading")
+	enable := true
+	isManual := false
+	dbBlueprints, _, err := GetDbBlueprints(&BlueprintQuery{Enable: &enable, IsManual: &isManual})
+	if err != nil {
+		return err
 	}
 	for _, e := range c.Entries() {
 		c.Remove(e.ID)
 	}
 	c.Stop()
-	for _, pp := range dbBlueprints {
-		pp, err := decryptDbBlueprint(pp)
+	for _, dbBlueprint := range dbBlueprints {
+		dbBlueprint, err = decryptDbBlueprint(dbBlueprint)
 		if err != nil {
 			return err
 		}
-		blueprint := parseBlueprint(pp)
+		blueprint := parseBlueprint(dbBlueprint)
 		plan, err := blueprint.UnmarshalPlan()
 		if err != nil {
 			blueprintLog.Error(err, failToCreateCronJob)
 			return err
 		}
 		if _, err := c.AddFunc(blueprint.CronConfig, func() {
-			pipeline, err := createPipelineByBlueprint(blueprint.ID, blueprint.Name, plan)
+			pipeline, err := createPipelineByBlueprint(blueprint, blueprint.Name, plan)
 			if err != nil {
 				blueprintLog.Error(err, "run cron job failed")
 			} else {
@@ -246,11 +254,12 @@ func ReloadBlueprints(c *cron.Cron) errors.Error {
 	return nil
 }
 
-func createPipelineByBlueprint(blueprintId uint64, name string, plan core.PipelinePlan) (*models.Pipeline, errors.Error) {
+func createPipelineByBlueprint(blueprint *models.Blueprint, name string, plan core.PipelinePlan) (*models.Pipeline, errors.Error) {
 	newPipeline := models.NewPipeline{}
 	newPipeline.Plan = plan
 	newPipeline.Name = name
-	newPipeline.BlueprintId = blueprintId
+	newPipeline.BlueprintId = blueprint.ID
+	newPipeline.Labels = blueprint.Labels
 	pipeline, err := CreatePipeline(&newPipeline)
 	// Return all created tasks to the User
 	if err != nil {
@@ -342,15 +351,8 @@ func TriggerBlueprint(id uint64) (*models.Pipeline, errors.Error) {
 	if err != nil {
 		return nil, err
 	}
-	pipeline, err := createPipelineByBlueprint(blueprint.ID, blueprint.Name, plan)
+
+	pipeline, err := createPipelineByBlueprint(blueprint, blueprint.Name, plan)
 	// done
 	return pipeline, err
 }
-func save(blueprint *models.Blueprint) errors.Error {
-	dbBlueprint := parseDbBlueprint(blueprint)
-	dbBlueprint, err := encryptDbBlueprint(dbBlueprint)
-	if err != nil {
-		return err
-	}
-	return errors.Convert(db.Save(dbBlueprint).Error)
-}
diff --git a/services/blueprint_helper.go b/services/blueprint_helper.go
index 8a56bb710..e8fad2b81 100644
--- a/services/blueprint_helper.go
+++ b/services/blueprint_helper.go
@@ -26,36 +26,76 @@ import (
 	"gorm.io/gorm"
 )
 
-// CreateDbBlueprint accepts a Blueprint instance and insert it to database
-func CreateDbBlueprint(dbBlueprint *models.DbBlueprint) errors.Error {
-	err := db.Create(&dbBlueprint).Error
+// SaveDbBlueprint accepts a Blueprint instance and upsert it to database
+func SaveDbBlueprint(dbBlueprint *models.DbBlueprint) errors.Error {
+	var err error
+	if dbBlueprint.ID != 0 {
+		err = db.Save(&dbBlueprint).Error
+	} else {
+		err = db.Create(&dbBlueprint).Error
+	}
 	if err != nil {
 		return errors.Default.Wrap(err, "error creating DB blueprint")
 	}
+	err = db.Delete(&models.DbBlueprintLabel{}, `blueprint_id = ?`, dbBlueprint.ID).Error
+	if err != nil {
+		return errors.Default.Wrap(err, "error delete DB blueprint's old labelModels")
+	}
+	if len(dbBlueprint.Labels) > 0 {
+		for i := range dbBlueprint.Labels {
+			dbBlueprint.Labels[i].BlueprintId = dbBlueprint.ID
+		}
+		err = db.Create(&dbBlueprint.Labels).Error
+		if err != nil {
+			return errors.Default.Wrap(err, "error creating DB blueprint's labelModels")
+		}
+	}
 	return nil
 }
 
 // GetDbBlueprints returns a paginated list of Blueprints based on `query`
 func GetDbBlueprints(query *BlueprintQuery) ([]*models.DbBlueprint, int64, errors.Error) {
 	dbBlueprints := make([]*models.DbBlueprint, 0)
-	db := db.Model(dbBlueprints).Order("id DESC")
+	dbQuery := db.Model(dbBlueprints).Order("id DESC")
 	if query.Enable != nil {
-		db = db.Where("enable = ?", *query.Enable)
+		dbQuery = dbQuery.Where("enable = ?", *query.Enable)
+	}
+	if query.IsManual != nil {
+		dbQuery = dbQuery.Where("is_manual = ?", *query.IsManual)
+	}
+	if query.Label != "" {
+		dbQuery = dbQuery.
+			Joins(`left join _devlake_blueprint_labels ON _devlake_blueprint_labels.blueprint_id = _devlake_blueprints.id`).
+			Where(`_devlake_blueprint_labels.name = ?`, query.Label)
 	}
 
 	var count int64
-	err := db.Count(&count).Error
+	err := dbQuery.Count(&count).Error
 	if err != nil {
 		return nil, 0, errors.Default.Wrap(err, "error getting DB count of blueprints")
 	}
 
-	db = processDbClausesWithPager(db, query.PageSize, query.Page)
+	dbQuery = processDbClausesWithPager(dbQuery, query.PageSize, query.Page)
 
-	err = db.Find(&dbBlueprints).Error
+	err = dbQuery.Find(&dbBlueprints).Error
 	if err != nil {
 		return nil, 0, errors.Default.Wrap(err, "error finding DB blueprints")
 	}
 
+	var blueprintIds []uint64
+	for _, dbBlueprint := range dbBlueprints {
+		blueprintIds = append(blueprintIds, dbBlueprint.ID)
+	}
+	var dbLabels []models.DbBlueprintLabel
+	dbLabelsMap := map[uint64][]models.DbBlueprintLabel{}
+	db.Where(`blueprint_id in ?`, blueprintIds).Find(&dbLabels)
+	for _, dbLabel := range dbLabels {
+		dbLabelsMap[dbLabel.BlueprintId] = append(dbLabelsMap[dbLabel.BlueprintId], dbLabel)
+	}
+	for _, dbBlueprint := range dbBlueprints {
+		dbBlueprint.Labels = dbLabelsMap[dbBlueprint.ID]
+	}
+
 	return dbBlueprints, count, nil
 }
 
@@ -69,6 +109,10 @@ func GetDbBlueprint(dbBlueprintId uint64) (*models.DbBlueprint, errors.Error) {
 		}
 		return nil, errors.Default.Wrap(err, "error getting blueprint from DB")
 	}
+	err = db.Find(&dbBlueprint.Labels, "blueprint_id = ?", dbBlueprint.ID).Error
+	if err != nil {
+		return nil, errors.Internal.Wrap(err, "error getting the blueprint from database")
+	}
 	return dbBlueprint, nil
 }
 
@@ -82,17 +126,22 @@ func DeleteDbBlueprint(id uint64) errors.Error {
 }
 
 // parseBlueprint
-func parseBlueprint(DbBlueprint *models.DbBlueprint) *models.Blueprint {
+func parseBlueprint(dbBlueprint *models.DbBlueprint) *models.Blueprint {
+	labelList := []string{}
+	for _, labelModel := range dbBlueprint.Labels {
+		labelList = append(labelList, labelModel.Name)
+	}
 	blueprint := models.Blueprint{
-		Name:       DbBlueprint.Name,
-		Mode:       DbBlueprint.Mode,
-		Plan:       []byte(DbBlueprint.Plan),
-		Enable:     DbBlueprint.Enable,
-		CronConfig: DbBlueprint.CronConfig,
-		IsManual:   DbBlueprint.IsManual,
-		SkipOnFail: DbBlueprint.SkipOnFail,
-		Settings:   []byte(DbBlueprint.Settings),
-		Model:      DbBlueprint.Model,
+		Name:       dbBlueprint.Name,
+		Mode:       dbBlueprint.Mode,
+		Plan:       []byte(dbBlueprint.Plan),
+		Enable:     dbBlueprint.Enable,
+		CronConfig: dbBlueprint.CronConfig,
+		IsManual:   dbBlueprint.IsManual,
+		SkipOnFail: dbBlueprint.SkipOnFail,
+		Settings:   []byte(dbBlueprint.Settings),
+		Model:      dbBlueprint.Model,
+		Labels:     labelList,
 	}
 	return &blueprint
 }
@@ -110,6 +159,14 @@ func parseDbBlueprint(blueprint *models.Blueprint) *models.DbBlueprint {
 		Settings:   string(blueprint.Settings),
 		Model:      blueprint.Model,
 	}
+	dbBlueprint.Labels = []models.DbBlueprintLabel{}
+	for _, label := range blueprint.Labels {
+		dbBlueprint.Labels = append(dbBlueprint.Labels, models.DbBlueprintLabel{
+			// NOTICE: BlueprintId may be nil
+			BlueprintId: blueprint.ID,
+			Name:        label,
+		})
+	}
 	return &dbBlueprint
 }
 
diff --git a/services/pipeline.go b/services/pipeline.go
index 5bcbf3fd3..52a05d6f7 100644
--- a/services/pipeline.go
+++ b/services/pipeline.go
@@ -23,6 +23,7 @@ import (
 	"os"
 	"path/filepath"
 	"strings"
+	"sync"
 	"time"
 
 	"github.com/apache/incubator-devlake/errors"
@@ -40,13 +41,14 @@ var notificationService *NotificationService
 var temporalClient client.Client
 var globalPipelineLog = logger.Global.Nested("pipeline service")
 
-// PipelineQuery FIXME ...
+// PipelineQuery is a query for GetPipelines
 type PipelineQuery struct {
 	Status      string `form:"status"`
 	Pending     int    `form:"pending"`
 	Page        int    `form:"page"`
 	PageSize    int    `form:"pageSize"`
 	BlueprintId uint64 `uri:"blueprintId" form:"blueprint_id"`
+	Label       string `form:"label"`
 }
 
 func pipelineServiceInit() {
@@ -155,6 +157,8 @@ func GetPipelineLogsArchivePath(pipeline *models.Pipeline) (string, errors.Error
 // RunPipelineInQueue query pipeline from db and run it in a queue
 func RunPipelineInQueue(pipelineMaxParallel int64) {
 	sema := semaphore.NewWeighted(pipelineMaxParallel)
+	runningParallelLabels := []string{}
+	var runningParallelLabelLock sync.Mutex
 	for {
 		globalPipelineLog.Info("acquire lock")
 		// start goroutine when sema lock ready and pipeline exist.
@@ -167,27 +171,58 @@ func RunPipelineInQueue(pipelineMaxParallel int64) {
 		dbPipeline := &models.DbPipeline{}
 		for {
 			cronLocker.Lock()
+			// prepare query to find an appropriate pipeline to execute
 			db.Where("status IN ?", []string{models.TASK_CREATED, models.TASK_RERUN}).
+				Joins(`left join _devlake_pipeline_labels ON
+						_devlake_pipeline_labels.pipeline_id = _devlake_pipelines.id AND
+						_devlake_pipeline_labels.name LIKE 'parallel/%' AND
+						_devlake_pipeline_labels.name in ?`, runningParallelLabels).
+				Group(`id`).
+				Having(`count(_devlake_pipeline_labels.name)=0`).
+				Select("id").
 				Order("id ASC").Limit(1).Find(dbPipeline)
 			cronLocker.Unlock()
 			if dbPipeline.ID != 0 {
-				db.Model(&models.DbPipeline{}).Where("id = ?", dbPipeline.ID).Updates(map[string]interface{}{
-					"status":   models.TASK_RUNNING,
-					"message":  "",
-					"began_at": time.Now(),
-				})
 				break
 			}
 			time.Sleep(time.Second)
 		}
-		go func(pipelineId uint64) {
+
+		db.Model(&models.DbPipeline{}).Where("id = ?", dbPipeline.ID).Updates(map[string]interface{}{
+			"status":   models.TASK_RUNNING,
+			"message":  "",
+			"began_at": time.Now(),
+		})
+		dbPipeline, err = GetDbPipeline(dbPipeline.ID)
+		if err != nil {
+			panic(err)
+		}
+
+		// add pipelineParallelLabels to runningParallelLabels
+		var pipelineParallelLabels []string
+		for _, dbLabel := range dbPipeline.Labels {
+			if strings.HasPrefix(dbLabel.Name, `parallel/`) {
+				pipelineParallelLabels = append(pipelineParallelLabels, dbLabel.Name)
+			}
+		}
+		runningParallelLabelLock.Lock()
+		runningParallelLabels = append(runningParallelLabels, pipelineParallelLabels...)
+		runningParallelLabelLock.Unlock()
+
+		go func(pipelineId uint64, parallelLabels []string) {
 			defer sema.Release(1)
-			globalPipelineLog.Info("run pipeline, %d", pipelineId)
+			defer func() {
+				runningParallelLabelLock.Lock()
+				runningParallelLabels = utils.SliceRemove(runningParallelLabels, parallelLabels...)
+				runningParallelLabelLock.Unlock()
+				globalPipelineLog.Info("finish pipeline #%d, now runningParallelLabels is %s", pipelineId, runningParallelLabels)
+			}()
+			globalPipelineLog.Info("run pipeline, %d, now running runningParallelLabels are %s", pipelineId, runningParallelLabels)
 			err = runPipeline(pipelineId)
 			if err != nil {
 				globalPipelineLog.Error(err, "failed to run pipeline %d", pipelineId)
 			}
-		}(dbPipeline.ID)
+		}(dbPipeline.ID, pipelineParallelLabels)
 	}
 }
 
diff --git a/services/pipeline_helper.go b/services/pipeline_helper.go
index a8e38dc18..2d59f802f 100644
--- a/services/pipeline_helper.go
+++ b/services/pipeline_helper.go
@@ -52,10 +52,25 @@ func CreateDbPipeline(newPipeline *models.NewPipeline) (*models.DbPipeline, erro
 	if err != nil {
 		return nil, err
 	}
+
 	// save pipeline to database
 	if err := db.Create(&dbPipeline).Error; err != nil {
-		globalPipelineLog.Error(err, "create pipline failed: %v", err)
-		return nil, errors.Internal.Wrap(err, "create pipline failed")
+		globalPipelineLog.Error(err, "create pipeline failed: %v", err)
+		return nil, errors.Internal.Wrap(err, "create pipeline failed")
+	}
+
+	dbPipeline.Labels = []models.DbPipelineLabel{}
+	for _, label := range newPipeline.Labels {
+		dbPipeline.Labels = append(dbPipeline.Labels, models.DbPipelineLabel{
+			PipelineId: dbPipeline.ID,
+			Name:       label,
+		})
+	}
+	if len(dbPipeline.Labels) > 0 {
+		if err := db.Create(&dbPipeline.Labels).Error; err != nil {
+			globalPipelineLog.Error(err, "create pipeline's labelModels failed: %v", err)
+			return nil, errors.Internal.Wrap(err, "create pipeline's labelModels failed")
+		}
 	}
 
 	// create tasks accordingly
@@ -99,28 +114,48 @@ func CreateDbPipeline(newPipeline *models.NewPipeline) (*models.DbPipeline, erro
 // GetDbPipelines by query
 func GetDbPipelines(query *PipelineQuery) ([]*models.DbPipeline, int64, errors.Error) {
 	dbPipelines := make([]*models.DbPipeline, 0)
-	db := db.Model(dbPipelines).Order("id DESC")
+	dbQuery := db.Model(dbPipelines).Order("id DESC")
 	if query.BlueprintId != 0 {
-		db = db.Where("blueprint_id = ?", query.BlueprintId)
+		dbQuery = dbQuery.Where("blueprint_id = ?", query.BlueprintId)
 	}
 	if query.Status != "" {
-		db = db.Where("status = ?", query.Status)
+		dbQuery = dbQuery.Where("status = ?", query.Status)
 	}
 	if query.Pending > 0 {
-		db = db.Where("finished_at is null and status != ?", "TASK_FAILED")
+		dbQuery = dbQuery.Where("finished_at is null and status != ?", "TASK_FAILED")
+	}
+	if query.Label != "" {
+		dbQuery = dbQuery.
+			Joins(`left join _devlake_pipeline_labels ON _devlake_pipeline_labels.pipeline_id = _devlake_pipelines.id`).
+			Where(`_devlake_pipeline_labels.name = ?`, query.Label)
 	}
 	var count int64
-	err := db.Count(&count).Error
+	err := dbQuery.Count(&count).Error
 	if err != nil {
 		return nil, 0, errors.Default.Wrap(err, "error getting DB pipelines count")
 	}
 
-	db = processDbClausesWithPager(db, query.PageSize, query.Page)
+	dbQuery = processDbClausesWithPager(dbQuery, query.PageSize, query.Page)
 
-	err = db.Find(&dbPipelines).Error
+	err = dbQuery.Find(&dbPipelines).Error
 	if err != nil {
 		return nil, count, errors.Default.Wrap(err, "error finding DB pipelines")
 	}
+
+	var pipelineIds []uint64
+	for _, dbPipeline := range dbPipelines {
+		pipelineIds = append(pipelineIds, dbPipeline.ID)
+	}
+	dbLabels := []models.DbPipelineLabel{}
+	db.Where(`pipeline_id in ?`, pipelineIds).Find(&dbLabels)
+	dbLabelsMap := map[uint64][]models.DbPipelineLabel{}
+	for _, dbLabel := range dbLabels {
+		dbLabelsMap[dbLabel.PipelineId] = append(dbLabelsMap[dbLabel.PipelineId], dbLabel)
+	}
+	for _, dbPipeline := range dbPipelines {
+		dbPipeline.Labels = dbLabelsMap[dbPipeline.ID]
+	}
+
 	return dbPipelines, count, nil
 }
 
@@ -134,11 +169,19 @@ func GetDbPipeline(pipelineId uint64) (*models.DbPipeline, errors.Error) {
 		}
 		return nil, errors.Internal.Wrap(err, "error getting the pipeline from database")
 	}
+	err = db.Find(&dbPipeline.Labels, "pipeline_id = ?", pipelineId).Error
+	if err != nil {
+		return nil, errors.Internal.Wrap(err, "error getting the pipeline from database")
+	}
 	return dbPipeline, nil
 }
 
 // parsePipeline converts DbPipeline to Pipeline
 func parsePipeline(dbPipeline *models.DbPipeline) *models.Pipeline {
+	labelList := []string{}
+	for _, labelModel := range dbPipeline.Labels {
+		labelList = append(labelList, labelModel.Name)
+	}
 	pipeline := models.Pipeline{
 		Model:         dbPipeline.Model,
 		Name:          dbPipeline.Name,
@@ -152,11 +195,13 @@ func parsePipeline(dbPipeline *models.DbPipeline) *models.Pipeline {
 		Message:       dbPipeline.Message,
 		SpentSeconds:  dbPipeline.SpentSeconds,
 		Stage:         dbPipeline.Stage,
+		Labels:        labelList,
 	}
 	return &pipeline
 }
 
 // parseDbPipeline converts Pipeline to DbPipeline
+// nolint:unused
 func parseDbPipeline(pipeline *models.Pipeline) *models.DbPipeline {
 	dbPipeline := models.DbPipeline{
 		Model:         pipeline.Model,
@@ -172,6 +217,14 @@ func parseDbPipeline(pipeline *models.Pipeline) *models.DbPipeline {
 		SpentSeconds:  pipeline.SpentSeconds,
 		Stage:         pipeline.Stage,
 	}
+	dbPipeline.Labels = []models.DbPipelineLabel{}
+	for _, label := range pipeline.Labels {
+		dbPipeline.Labels = append(dbPipeline.Labels, models.DbPipelineLabel{
+			// NOTICE: PipelineId may be 0 if pipeline.ID has not been set
+			PipelineId: pipeline.ID,
+			Name:       label,
+		})
+	}
 	return &dbPipeline
 }
 
diff --git a/services/pipeline_runner.go b/services/pipeline_runner.go
index 0ec968704..8eea52ebb 100644
--- a/services/pipeline_runner.go
+++ b/services/pipeline_runner.go
@@ -117,27 +117,28 @@ func runPipeline(pipelineId uint64) errors.Error {
 	if err != nil {
 		err = errors.Default.Wrap(err, fmt.Sprintf("Error running pipeline %d.", pipelineId))
 	}
-	pipeline, e := GetPipeline(pipelineId)
+	dbPipeline, e := GetDbPipeline(pipelineId)
 	if e != nil {
 		return errors.Default.Wrap(err, fmt.Sprintf("Unable to get pipeline %d.", pipelineId))
 	}
 	// finished, update database
 	finishedAt := time.Now()
-	pipeline.FinishedAt = &finishedAt
-	pipeline.SpentSeconds = int(finishedAt.Unix() - pipeline.BeganAt.Unix())
+	dbPipeline.FinishedAt = &finishedAt
+	if dbPipeline.BeganAt != nil {
+		dbPipeline.SpentSeconds = int(finishedAt.Unix() - dbPipeline.BeganAt.Unix())
+	}
 	if err != nil {
-		pipeline.Status = models.TASK_FAILED
+		dbPipeline.Status = models.TASK_FAILED
 		if lakeErr := errors.AsLakeErrorType(err); lakeErr != nil {
-			pipeline.Message = lakeErr.Messages().Format()
+			dbPipeline.Message = lakeErr.Messages().Format()
 		} else {
-			pipeline.Message = err.Error()
+			dbPipeline.Message = err.Error()
 		}
 	} else {
-		pipeline.Status = models.TASK_COMPLETED
-		pipeline.Message = ""
+		dbPipeline.Status = models.TASK_COMPLETED
+		dbPipeline.Message = ""
 	}
-	dbPipeline := parseDbPipeline(pipeline)
-	dbe := db.Model(dbPipeline).Select("finished_at", "spent_seconds", "status", "message").Updates(dbPipeline).Error
+	dbe := db.Model(dbPipeline).Updates(dbPipeline).Error
 	if dbe != nil {
 		globalPipelineLog.Error(dbe, "update pipeline state failed")
 		return errors.Convert(dbe)
diff --git a/utils/slice.go b/utils/slice.go
new file mode 100644
index 000000000..659a21078
--- /dev/null
+++ b/utils/slice.go
@@ -0,0 +1,37 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package utils
+
+// SliceRemove filters source in place, dropping every element equal to any of toRemoves, and returns the shortened slice (which reuses source's backing array).
+func SliceRemove[T ~int | ~string](source []T, toRemoves ...T) []T {
+	j := 0 // next write position for a kept element
+	for _, v := range source {
+		needRemove := false
+		for _, toRemove := range toRemoves {
+			if v == toRemove {
+				needRemove = true
+				break
+			}
+		}
+		if !needRemove {
+			source[j] = v // compact kept elements toward the front, overwriting source
+			j++
+		}
+	}
+	return source[:j] // only the first j slots hold kept elements
+}
diff --git a/utils/slice_test.go b/utils/slice_test.go
new file mode 100644
index 000000000..d55f8ae6e
--- /dev/null
+++ b/utils/slice_test.go
@@ -0,0 +1,30 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package utils
+
+import (
+	"github.com/stretchr/testify/assert"
+	"testing"
+)
+
+// TestSliceRemove checks that SliceRemove drops the listed values from int and string slices, including when the same value to remove is passed twice.
+func TestSliceRemove(t *testing.T) {
+	assert.Equal(t, []int{3, 4, 5}, SliceRemove([]int{1, 2, 3, 4, 5}, 1, 2))
+	assert.Equal(t, []int{1, 2, 4, 5}, SliceRemove([]int{1, 2, 3, 4, 5}, 3, 3))
+	assert.Equal(t, []string{`1`, `2`, `4`, `5`}, SliceRemove([]string{`1`, `2`, `3`, `4`, `5`}, `3`, `3`))
+}