You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@devlake.apache.org by kl...@apache.org on 2022/11/24 02:47:50 UTC
[incubator-devlake] branch main updated: Issues/3234 add parallelLabels for blueprint/pipeline (#3764)
This is an automated email from the ASF dual-hosted git repository.
klesh pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-devlake.git
The following commit(s) were added to refs/heads/main by this push:
new 3b61d4ab7 Issues/3234 add parallelLabels for blueprint/pipeline (#3764)
3b61d4ab7 is described below
commit 3b61d4ab76fe85f45348c35d7515372c1d4d0a3c
Author: Likyh <ya...@meri.co>
AuthorDate: Thu Nov 24 10:47:45 2022 +0800
Issues/3234 add parallelLabels for blueprint/pipeline (#3764)
* fix: delete a unuse var
* feat: add parallelLabels in pipeline
* feat: add parallel_label in pipeline api
* feat: add parallel labels for blueprint
* fix: fix for linter
* fix: fix some bug found by api docs
* feat: use common label to replace parallel
* fix: fix a bug
* fix: add lock for review
* fix: fix for review, add Labels in the model DbPipeline and DbBlueprint.
* fix: use tab instead of space in sql
---
api/blueprints/blueprints.go | 10 ++-
api/pipelines/pipelines.go | 12 +--
config-ui/src/hooks/usePipelineManager.jsx | 10 +--
.../src/pages/blueprints/blueprint-detail.jsx | 2 +-
.../src/pages/blueprints/create-blueprint.jsx | 2 +-
models/blueprint.go | 16 +++-
models/migrationscripts/20221115_add_labels.go | 61 ++++++++++++++
models/migrationscripts/register.go | 1 +
models/pipeline.go | 15 ++++
plugins/helper/api_async_client.go | 1 -
plugins/helper/worker_scheduler.go | 1 -
plugins/helper/worker_scheduler_test.go | 2 +-
services/blueprint.go | 58 +++++++-------
services/blueprint_helper.go | 93 +++++++++++++++++-----
services/pipeline.go | 53 +++++++++---
services/pipeline_helper.go | 71 ++++++++++++++---
services/pipeline_runner.go | 21 ++---
utils/slice.go | 37 +++++++++
utils/slice_test.go | 30 +++++++
19 files changed, 402 insertions(+), 94 deletions(-)
diff --git a/api/blueprints/blueprints.go b/api/blueprints/blueprints.go
index 517f6c30e..a28642672 100644
--- a/api/blueprints/blueprints.go
+++ b/api/blueprints/blueprints.go
@@ -29,8 +29,8 @@ import (
)
type PaginatedBlueprint struct {
- Blueprints []*models.Blueprint
- Count int64
+ Blueprints []*models.Blueprint `json:"blueprints"`
+ Count int64 `json:"count"`
}
// @Summary post blueprints
@@ -63,7 +63,11 @@ func Post(c *gin.Context) {
// @Summary get blueprints
// @Description get blueprints
// @Tags framework/blueprints
-// @Accept application/json
+// @Param enable query bool false "enable"
+// @Param is_manual query bool false "is_manual"
+// @Param page query int false "page"
+// @Param page_size query int false "page_size"
+// @Param label query string false "label"
// @Success 200 {object} PaginatedBlueprint
// @Failure 400 {object} shared.ApiBody "Bad Request"
// @Failure 500 {object} shared.ApiBody "Internal Error"
diff --git a/api/pipelines/pipelines.go b/api/pipelines/pipelines.go
index 4a144f124..c35ab5d19 100644
--- a/api/pipelines/pipelines.go
+++ b/api/pipelines/pipelines.go
@@ -81,12 +81,14 @@ GET /pipelines?status=TASK_RUNNING&pending=1&page=1&pagesize=10
*/
// @Summary Get list of pipelines
-// @Description GET /pipelines?status=TASK_RUNNING&pending=1&page=1&pagesize=10
+// @Description GET /pipelines?status=TASK_RUNNING&pending=1&label=search_text&page=1&pagesize=10
// @Tags framework/pipelines
-// @Param status query string true "query"
-// @Param pending query int true "query"
-// @Param page query int true "query"
-// @Param pagesize query int true "query"
+// @Param status query string false "status"
+// @Param pending query int false "pending"
+// @Param page query int false "page"
+// @Param pagesize query int false "pagesize"
+// @Param blueprint_id query int false "blueprint_id"
+// @Param label query string false "label"
// @Success 200 {object} shared.ResponsePipelines
// @Failure 400 {string} errcode.Error "Bad Request"
// @Failure 500 {string} errcode.Error "Internel Error"
diff --git a/config-ui/src/hooks/usePipelineManager.jsx b/config-ui/src/hooks/usePipelineManager.jsx
index c9e3ab17f..fdbd7647e 100644
--- a/config-ui/src/hooks/usePipelineManager.jsx
+++ b/config-ui/src/hooks/usePipelineManager.jsx
@@ -57,21 +57,19 @@ function usePipelineManager(
const [logfile, setLogfile] = useState('logging.tar.gz')
const runPipeline = useCallback(
- (runSettings = null) => {
- console.log('>> RUNNING PIPELINE....')
+ (blueprintId) => {
+ console.log('>> RUNNING PIPELINE....', blueprintId)
try {
setIsRunning(true)
setErrors([])
ToastNotification.clear()
- console.log('>> DISPATCHING PIPELINE REQUEST', runSettings || settings)
const run = async () => {
// @todo: remove "ID" fallback key when no longer needed
const p = await request.post(
- `${DEVLAKE_ENDPOINT}/pipelines`,
- runSettings || settings
+ `${DEVLAKE_ENDPOINT}/blueprints/${blueprintId}/trigger`
)
const t = await request.get(
- `${DEVLAKE_ENDPOINT}/pipelines/${p.data?.ID || p.data?.id}/tasks`
+ `${DEVLAKE_ENDPOINT}/pipelines/${p.data?.id}/tasks`
)
console.log('>> RAW PIPELINE DATA FROM API...', p.data)
setPipelineRun({
diff --git a/config-ui/src/pages/blueprints/blueprint-detail.jsx b/config-ui/src/pages/blueprints/blueprint-detail.jsx
index 5d77b5368..72929b968 100644
--- a/config-ui/src/pages/blueprints/blueprint-detail.jsx
+++ b/config-ui/src/pages/blueprints/blueprint-detail.jsx
@@ -134,7 +134,7 @@ const BlueprintDetail = (props) => {
const runBlueprint = useCallback(() => {
if (activeBlueprint !== null) {
- runPipeline()
+ runPipeline(activeBlueprint.id)
}
}, [activeBlueprint, runPipeline])
diff --git a/config-ui/src/pages/blueprints/create-blueprint.jsx b/config-ui/src/pages/blueprints/create-blueprint.jsx
index f8cdf75fd..11d1b3cef 100644
--- a/config-ui/src/pages/blueprints/create-blueprint.jsx
+++ b/config-ui/src/pages/blueprints/create-blueprint.jsx
@@ -812,7 +812,7 @@ const CreateBlueprint = (props) => {
blueprintId: saveBlueprintComplete?.id,
plan: saveBlueprintComplete?.plan
}
- runPipeline(newPipelineConfiguration)
+ runPipeline(saveBlueprintComplete?.id)
setRunNow(false)
history.push(`/blueprints/detail/${saveBlueprintComplete?.id}`)
} else if (newBlueprintId) {
diff --git a/models/blueprint.go b/models/blueprint.go
index 4d00a0931..f223330ff 100644
--- a/models/blueprint.go
+++ b/models/blueprint.go
@@ -19,9 +19,9 @@ package models
import (
"encoding/json"
+ "time"
"github.com/apache/incubator-devlake/errors"
-
"github.com/apache/incubator-devlake/models/common"
"github.com/apache/incubator-devlake/plugins/core"
)
@@ -42,6 +42,7 @@ type Blueprint struct {
CronConfig string `json:"cronConfig" format:"* * * * *" example:"0 0 * * 1"`
IsManual bool `json:"isManual"`
SkipOnFail bool `json:"skipOnFail"`
+ Labels []string `json:"labels"`
Settings json.RawMessage `json:"settings" swaggertype:"array,string" example:"please check api: /blueprints/<PLUGIN_NAME>/blueprint-setting"`
common.Model `swaggerignore:"true"`
}
@@ -77,8 +78,21 @@ type DbBlueprint struct {
SkipOnFail bool `json:"skipOnFail"`
Settings string `json:"settings" encrypt:"yes" swaggertype:"array,string" example:"please check api: /blueprints/<PLUGIN_NAME>/blueprint-setting"`
common.Model `swaggerignore:"true"`
+
+ Labels []DbBlueprintLabel `json:"-" gorm:"-"`
}
func (DbBlueprint) TableName() string {
return "_devlake_blueprints"
}
+
+type DbBlueprintLabel struct {
+ CreatedAt time.Time `json:"createdAt"`
+ UpdatedAt time.Time `json:"updatedAt"`
+ BlueprintId uint64 `json:"blueprint_id" gorm:"primaryKey"`
+ Name string `json:"name" gorm:"primaryKey;index"`
+}
+
+func (DbBlueprintLabel) TableName() string {
+ return "_devlake_blueprint_labels"
+}
diff --git a/models/migrationscripts/20221115_add_labels.go b/models/migrationscripts/20221115_add_labels.go
new file mode 100644
index 000000000..0c82066bf
--- /dev/null
+++ b/models/migrationscripts/20221115_add_labels.go
@@ -0,0 +1,61 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements. See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package migrationscripts
+
+import (
+ "github.com/apache/incubator-devlake/errors"
+ "github.com/apache/incubator-devlake/helpers/migrationhelper"
+ "github.com/apache/incubator-devlake/plugins/core"
+ "time"
+)
+
+type DbPipelineLabel20221115 struct {
+ CreatedAt time.Time `json:"createdAt"`
+ UpdatedAt time.Time `json:"updatedAt"`
+ PipelineId uint64 `json:"pipeline_id" gorm:"primaryKey"`
+ Name string `json:"name" gorm:"primaryKey;index"`
+}
+
+func (DbPipelineLabel20221115) TableName() string {
+ return "_devlake_pipeline_labels"
+}
+
+type DbBlueprintLabel20221115 struct {
+ CreatedAt time.Time `json:"createdAt"`
+ UpdatedAt time.Time `json:"updatedAt"`
+ BlueprintId uint64 `json:"blueprint_id" gorm:"primaryKey"`
+ Name string `json:"name" gorm:"primaryKey;index"`
+}
+
+func (DbBlueprintLabel20221115) TableName() string {
+ return "_devlake_blueprint_labels"
+}
+
+type addLabels struct{}
+
+func (*addLabels) Up(res core.BasicRes) errors.Error {
+ return migrationhelper.AutoMigrateTables(res, &DbPipelineLabel20221115{}, &DbBlueprintLabel20221115{})
+}
+
+func (*addLabels) Version() uint64 {
+ return 20221115000034
+}
+
+func (*addLabels) Name() string {
+ return "add labels' schema for blueprint and pipeline"
+}
diff --git a/models/migrationscripts/register.go b/models/migrationscripts/register.go
index 00478e238..943616df1 100644
--- a/models/migrationscripts/register.go
+++ b/models/migrationscripts/register.go
@@ -60,5 +60,6 @@ func All() []core.MigrationScript {
new(addProjectTables),
new(addProjectToBluePrint),
new(addProjectIssueMetric),
+ new(addLabels),
}
}
diff --git a/models/pipeline.go b/models/pipeline.go
index e3d56465e..7427c2338 100644
--- a/models/pipeline.go
+++ b/models/pipeline.go
@@ -39,6 +39,7 @@ type Pipeline struct {
Message string `json:"message"`
SpentSeconds int `json:"spentSeconds"`
Stage int `json:"stage"`
+ Labels []string `json:"labels"`
}
// We use a 2D array because the request body must be an array of a set of tasks
@@ -46,6 +47,7 @@ type Pipeline struct {
type NewPipeline struct {
Name string `json:"name"`
Plan core.PipelinePlan `json:"plan" swaggertype:"array,string" example:"please check api /pipelines/<PLUGIN_NAME>/pipeline-plan"`
+ Labels []string `json:"labels"`
BlueprintId uint64
}
@@ -62,8 +64,21 @@ type DbPipeline struct {
Message string `json:"message"`
SpentSeconds int `json:"spentSeconds"`
Stage int `json:"stage"`
+
+ Labels []DbPipelineLabel `json:"-" gorm:"-"`
}
func (DbPipeline) TableName() string {
return "_devlake_pipelines"
}
+
+type DbPipelineLabel struct {
+ CreatedAt time.Time `json:"createdAt"`
+ UpdatedAt time.Time `json:"updatedAt"`
+ PipelineId uint64 `json:"pipeline_id" gorm:"primaryKey"`
+ Name string `json:"name" gorm:"primaryKey;index"`
+}
+
+func (DbPipelineLabel) TableName() string {
+ return "_devlake_pipeline_labels"
+}
diff --git a/plugins/helper/api_async_client.go b/plugins/helper/api_async_client.go
index a7cc71222..1d54b8a98 100644
--- a/plugins/helper/api_async_client.go
+++ b/plugins/helper/api_async_client.go
@@ -111,7 +111,6 @@ func CreateAsyncApiClient(
numOfWorkers,
requests,
duration,
- retry,
logger,
)
if err != nil {
diff --git a/plugins/helper/worker_scheduler.go b/plugins/helper/worker_scheduler.go
index e74f228f6..45be63c3f 100644
--- a/plugins/helper/worker_scheduler.go
+++ b/plugins/helper/worker_scheduler.go
@@ -48,7 +48,6 @@ func NewWorkerScheduler(
workerNum int,
maxWork int,
maxWorkDuration time.Duration,
- maxRetry int,
logger core.Logger,
) (*WorkerScheduler, errors.Error) {
if maxWork <= 0 {
diff --git a/plugins/helper/worker_scheduler_test.go b/plugins/helper/worker_scheduler_test.go
index 22f055822..e95446596 100644
--- a/plugins/helper/worker_scheduler_test.go
+++ b/plugins/helper/worker_scheduler_test.go
@@ -31,7 +31,7 @@ func TestWorkerSchedulerQpsControl(t *testing.T) {
// assuming we want 2 requests per second
testChannel := make(chan int, 100)
ctx, cancel := context.WithCancel(context.Background())
- s, _ := NewWorkerScheduler(ctx, 5, 2, 1*time.Second, 0, unithelper.DummyLogger())
+ s, _ := NewWorkerScheduler(ctx, 5, 2, 1*time.Second, unithelper.DummyLogger())
defer s.Release()
for i := 1; i <= 5; i++ {
t := i
diff --git a/services/blueprint.go b/services/blueprint.go
index 6c080ebdb..2c0cc7669 100644
--- a/services/blueprint.go
+++ b/services/blueprint.go
@@ -34,11 +34,13 @@ import (
"gorm.io/gorm"
)
-// BlueprintQuery FIXME ...
+// BlueprintQuery is a query for GetBlueprints
type BlueprintQuery struct {
- Enable *bool `form:"enable,omitempty"`
- Page int `form:"page"`
- PageSize int `form:"pageSize"`
+ Enable *bool `form:"enable,omitempty"`
+ IsManual *bool `form:"is_manual"`
+ Page int `form:"page"`
+ PageSize int `form:"pageSize"`
+ Label string `form:"label"`
}
var (
@@ -52,11 +54,12 @@ func CreateBlueprint(blueprint *models.Blueprint) errors.Error {
if err != nil {
return err
}
- dbBlueprint, err := encryptDbBlueprint(parseDbBlueprint(blueprint))
+ dbBlueprint := parseDbBlueprint(blueprint)
+ dbBlueprint, err = encryptDbBlueprint(dbBlueprint)
if err != nil {
return err
}
- err = CreateDbBlueprint(dbBlueprint)
+ err = SaveDbBlueprint(dbBlueprint)
if err != nil {
return err
}
@@ -177,9 +180,14 @@ func PatchBlueprint(id uint64, body map[string]interface{}) (*models.Blueprint,
}
// save
- err = save(blueprint)
+ dbBlueprint := parseDbBlueprint(blueprint)
+ dbBlueprint, err = encryptDbBlueprint(dbBlueprint)
if err != nil {
- return nil, errors.Internal.Wrap(err, "error saving blueprint")
+ return nil, err
+ }
+ err = SaveDbBlueprint(dbBlueprint)
+ if err != nil {
+ return nil, err
}
// reload schedule
@@ -206,29 +214,29 @@ func DeleteBlueprint(id uint64) errors.Error {
// ReloadBlueprints FIXME ...
func ReloadBlueprints(c *cron.Cron) errors.Error {
- dbBlueprints := make([]*models.DbBlueprint, 0)
- if err := db.Model(&models.DbBlueprint{}).
- Where("enable = ? AND is_manual = ?", true, false).
- Find(&dbBlueprints).Error; err != nil {
- return errors.Internal.Wrap(err, "error finding blueprints while reloading")
+ enable := true
+ isManual := false
+ dbBlueprints, _, err := GetDbBlueprints(&BlueprintQuery{Enable: &enable, IsManual: &isManual})
+ if err != nil {
+ return err
}
for _, e := range c.Entries() {
c.Remove(e.ID)
}
c.Stop()
- for _, pp := range dbBlueprints {
- pp, err := decryptDbBlueprint(pp)
+ for _, dbBlueprint := range dbBlueprints {
+ dbBlueprint, err = decryptDbBlueprint(dbBlueprint)
if err != nil {
return err
}
- blueprint := parseBlueprint(pp)
+ blueprint := parseBlueprint(dbBlueprint)
plan, err := blueprint.UnmarshalPlan()
if err != nil {
blueprintLog.Error(err, failToCreateCronJob)
return err
}
if _, err := c.AddFunc(blueprint.CronConfig, func() {
- pipeline, err := createPipelineByBlueprint(blueprint.ID, blueprint.Name, plan)
+ pipeline, err := createPipelineByBlueprint(blueprint, blueprint.Name, plan)
if err != nil {
blueprintLog.Error(err, "run cron job failed")
} else {
@@ -246,11 +254,12 @@ func ReloadBlueprints(c *cron.Cron) errors.Error {
return nil
}
-func createPipelineByBlueprint(blueprintId uint64, name string, plan core.PipelinePlan) (*models.Pipeline, errors.Error) {
+func createPipelineByBlueprint(blueprint *models.Blueprint, name string, plan core.PipelinePlan) (*models.Pipeline, errors.Error) {
newPipeline := models.NewPipeline{}
newPipeline.Plan = plan
newPipeline.Name = name
- newPipeline.BlueprintId = blueprintId
+ newPipeline.BlueprintId = blueprint.ID
+ newPipeline.Labels = blueprint.Labels
pipeline, err := CreatePipeline(&newPipeline)
// Return all created tasks to the User
if err != nil {
@@ -342,15 +351,8 @@ func TriggerBlueprint(id uint64) (*models.Pipeline, errors.Error) {
if err != nil {
return nil, err
}
- pipeline, err := createPipelineByBlueprint(blueprint.ID, blueprint.Name, plan)
+
+ pipeline, err := createPipelineByBlueprint(blueprint, blueprint.Name, plan)
// done
return pipeline, err
}
-func save(blueprint *models.Blueprint) errors.Error {
- dbBlueprint := parseDbBlueprint(blueprint)
- dbBlueprint, err := encryptDbBlueprint(dbBlueprint)
- if err != nil {
- return err
- }
- return errors.Convert(db.Save(dbBlueprint).Error)
-}
diff --git a/services/blueprint_helper.go b/services/blueprint_helper.go
index 8a56bb710..e8fad2b81 100644
--- a/services/blueprint_helper.go
+++ b/services/blueprint_helper.go
@@ -26,36 +26,76 @@ import (
"gorm.io/gorm"
)
-// CreateDbBlueprint accepts a Blueprint instance and insert it to database
-func CreateDbBlueprint(dbBlueprint *models.DbBlueprint) errors.Error {
- err := db.Create(&dbBlueprint).Error
+// SaveDbBlueprint accepts a Blueprint instance and upsert it to database
+func SaveDbBlueprint(dbBlueprint *models.DbBlueprint) errors.Error {
+ var err error
+ if dbBlueprint.ID != 0 {
+ err = db.Save(&dbBlueprint).Error
+ } else {
+ err = db.Create(&dbBlueprint).Error
+ }
if err != nil {
return errors.Default.Wrap(err, "error creating DB blueprint")
}
+ err = db.Delete(&models.DbBlueprintLabel{}, `blueprint_id = ?`, dbBlueprint.ID).Error
+ if err != nil {
+ return errors.Default.Wrap(err, "error delete DB blueprint's old labelModels")
+ }
+ if len(dbBlueprint.Labels) > 0 {
+ for i := range dbBlueprint.Labels {
+ dbBlueprint.Labels[i].BlueprintId = dbBlueprint.ID
+ }
+ err = db.Create(&dbBlueprint.Labels).Error
+ if err != nil {
+ return errors.Default.Wrap(err, "error creating DB blueprint's labelModels")
+ }
+ }
return nil
}
// GetDbBlueprints returns a paginated list of Blueprints based on `query`
func GetDbBlueprints(query *BlueprintQuery) ([]*models.DbBlueprint, int64, errors.Error) {
dbBlueprints := make([]*models.DbBlueprint, 0)
- db := db.Model(dbBlueprints).Order("id DESC")
+ dbQuery := db.Model(dbBlueprints).Order("id DESC")
if query.Enable != nil {
- db = db.Where("enable = ?", *query.Enable)
+ dbQuery = dbQuery.Where("enable = ?", *query.Enable)
+ }
+ if query.IsManual != nil {
+ dbQuery = dbQuery.Where("is_manual = ?", *query.IsManual)
+ }
+ if query.Label != "" {
+ dbQuery = dbQuery.
+ Joins(`left join _devlake_blueprint_labels ON _devlake_blueprint_labels.blueprint_id = _devlake_blueprints.id`).
+ Where(`_devlake_blueprint_labels.name = ?`, query.Label)
}
var count int64
- err := db.Count(&count).Error
+ err := dbQuery.Count(&count).Error
if err != nil {
return nil, 0, errors.Default.Wrap(err, "error getting DB count of blueprints")
}
- db = processDbClausesWithPager(db, query.PageSize, query.Page)
+ dbQuery = processDbClausesWithPager(dbQuery, query.PageSize, query.Page)
- err = db.Find(&dbBlueprints).Error
+ err = dbQuery.Find(&dbBlueprints).Error
if err != nil {
return nil, 0, errors.Default.Wrap(err, "error finding DB blueprints")
}
+ var blueprintIds []uint64
+ for _, dbBlueprint := range dbBlueprints {
+ blueprintIds = append(blueprintIds, dbBlueprint.ID)
+ }
+ var dbLabels []models.DbBlueprintLabel
+ dbLabelsMap := map[uint64][]models.DbBlueprintLabel{}
+ db.Where(`blueprint_id in ?`, blueprintIds).Find(&dbLabels)
+ for _, dbLabel := range dbLabels {
+ dbLabelsMap[dbLabel.BlueprintId] = append(dbLabelsMap[dbLabel.BlueprintId], dbLabel)
+ }
+ for _, dbBlueprint := range dbBlueprints {
+ dbBlueprint.Labels = dbLabelsMap[dbBlueprint.ID]
+ }
+
return dbBlueprints, count, nil
}
@@ -69,6 +109,10 @@ func GetDbBlueprint(dbBlueprintId uint64) (*models.DbBlueprint, errors.Error) {
}
return nil, errors.Default.Wrap(err, "error getting blueprint from DB")
}
+ err = db.Find(&dbBlueprint.Labels, "blueprint_id = ?", dbBlueprint.ID).Error
+ if err != nil {
+ return nil, errors.Internal.Wrap(err, "error getting the blueprint from database")
+ }
return dbBlueprint, nil
}
@@ -82,17 +126,22 @@ func DeleteDbBlueprint(id uint64) errors.Error {
}
// parseBlueprint
-func parseBlueprint(DbBlueprint *models.DbBlueprint) *models.Blueprint {
+func parseBlueprint(dbBlueprint *models.DbBlueprint) *models.Blueprint {
+ labelList := []string{}
+ for _, labelModel := range dbBlueprint.Labels {
+ labelList = append(labelList, labelModel.Name)
+ }
blueprint := models.Blueprint{
- Name: DbBlueprint.Name,
- Mode: DbBlueprint.Mode,
- Plan: []byte(DbBlueprint.Plan),
- Enable: DbBlueprint.Enable,
- CronConfig: DbBlueprint.CronConfig,
- IsManual: DbBlueprint.IsManual,
- SkipOnFail: DbBlueprint.SkipOnFail,
- Settings: []byte(DbBlueprint.Settings),
- Model: DbBlueprint.Model,
+ Name: dbBlueprint.Name,
+ Mode: dbBlueprint.Mode,
+ Plan: []byte(dbBlueprint.Plan),
+ Enable: dbBlueprint.Enable,
+ CronConfig: dbBlueprint.CronConfig,
+ IsManual: dbBlueprint.IsManual,
+ SkipOnFail: dbBlueprint.SkipOnFail,
+ Settings: []byte(dbBlueprint.Settings),
+ Model: dbBlueprint.Model,
+ Labels: labelList,
}
return &blueprint
}
@@ -110,6 +159,14 @@ func parseDbBlueprint(blueprint *models.Blueprint) *models.DbBlueprint {
Settings: string(blueprint.Settings),
Model: blueprint.Model,
}
+ dbBlueprint.Labels = []models.DbBlueprintLabel{}
+ for _, label := range blueprint.Labels {
+ dbBlueprint.Labels = append(dbBlueprint.Labels, models.DbBlueprintLabel{
+ // NOTICE: BlueprintId may be nil
+ BlueprintId: blueprint.ID,
+ Name: label,
+ })
+ }
return &dbBlueprint
}
diff --git a/services/pipeline.go b/services/pipeline.go
index 5bcbf3fd3..52a05d6f7 100644
--- a/services/pipeline.go
+++ b/services/pipeline.go
@@ -23,6 +23,7 @@ import (
"os"
"path/filepath"
"strings"
+ "sync"
"time"
"github.com/apache/incubator-devlake/errors"
@@ -40,13 +41,14 @@ var notificationService *NotificationService
var temporalClient client.Client
var globalPipelineLog = logger.Global.Nested("pipeline service")
-// PipelineQuery FIXME ...
+// PipelineQuery is a query for GetPipelines
type PipelineQuery struct {
Status string `form:"status"`
Pending int `form:"pending"`
Page int `form:"page"`
PageSize int `form:"pageSize"`
BlueprintId uint64 `uri:"blueprintId" form:"blueprint_id"`
+ Label string `form:"label"`
}
func pipelineServiceInit() {
@@ -155,6 +157,8 @@ func GetPipelineLogsArchivePath(pipeline *models.Pipeline) (string, errors.Error
// RunPipelineInQueue query pipeline from db and run it in a queue
func RunPipelineInQueue(pipelineMaxParallel int64) {
sema := semaphore.NewWeighted(pipelineMaxParallel)
+ runningParallelLabels := []string{}
+ var runningParallelLabelLock sync.Mutex
for {
globalPipelineLog.Info("acquire lock")
// start goroutine when sema lock ready and pipeline exist.
@@ -167,27 +171,58 @@ func RunPipelineInQueue(pipelineMaxParallel int64) {
dbPipeline := &models.DbPipeline{}
for {
cronLocker.Lock()
+ // prepare query to find an appropriate pipeline to execute
db.Where("status IN ?", []string{models.TASK_CREATED, models.TASK_RERUN}).
+ Joins(`left join _devlake_pipeline_labels ON
+ _devlake_pipeline_labels.pipeline_id = _devlake_pipelines.id AND
+ _devlake_pipeline_labels.name LIKE 'parallel/%' AND
+ _devlake_pipeline_labels.name in ?`, runningParallelLabels).
+ Group(`id`).
+ Having(`count(_devlake_pipeline_labels.name)=0`).
+ Select("id").
Order("id ASC").Limit(1).Find(dbPipeline)
cronLocker.Unlock()
if dbPipeline.ID != 0 {
- db.Model(&models.DbPipeline{}).Where("id = ?", dbPipeline.ID).Updates(map[string]interface{}{
- "status": models.TASK_RUNNING,
- "message": "",
- "began_at": time.Now(),
- })
break
}
time.Sleep(time.Second)
}
- go func(pipelineId uint64) {
+
+ db.Model(&models.DbPipeline{}).Where("id = ?", dbPipeline.ID).Updates(map[string]interface{}{
+ "status": models.TASK_RUNNING,
+ "message": "",
+ "began_at": time.Now(),
+ })
+ dbPipeline, err = GetDbPipeline(dbPipeline.ID)
+ if err != nil {
+ panic(err)
+ }
+
+ // add pipelineParallelLabels to runningParallelLabels
+ var pipelineParallelLabels []string
+ for _, dbLabel := range dbPipeline.Labels {
+ if strings.HasPrefix(dbLabel.Name, `parallel/`) {
+ pipelineParallelLabels = append(pipelineParallelLabels, dbLabel.Name)
+ }
+ }
+ runningParallelLabelLock.Lock()
+ runningParallelLabels = append(runningParallelLabels, pipelineParallelLabels...)
+ runningParallelLabelLock.Unlock()
+
+ go func(pipelineId uint64, parallelLabels []string) {
defer sema.Release(1)
- globalPipelineLog.Info("run pipeline, %d", pipelineId)
+ defer func() {
+ runningParallelLabelLock.Lock()
+ runningParallelLabels = utils.SliceRemove(runningParallelLabels, parallelLabels...)
+ runningParallelLabelLock.Unlock()
+ globalPipelineLog.Info("finish pipeline #%d, now runningParallelLabels is %s", pipelineId, runningParallelLabels)
+ }()
+ globalPipelineLog.Info("run pipeline, %d, now running runningParallelLabels are %s", pipelineId, runningParallelLabels)
err = runPipeline(pipelineId)
if err != nil {
globalPipelineLog.Error(err, "failed to run pipeline %d", pipelineId)
}
- }(dbPipeline.ID)
+ }(dbPipeline.ID, pipelineParallelLabels)
}
}
diff --git a/services/pipeline_helper.go b/services/pipeline_helper.go
index a8e38dc18..2d59f802f 100644
--- a/services/pipeline_helper.go
+++ b/services/pipeline_helper.go
@@ -52,10 +52,25 @@ func CreateDbPipeline(newPipeline *models.NewPipeline) (*models.DbPipeline, erro
if err != nil {
return nil, err
}
+
// save pipeline to database
if err := db.Create(&dbPipeline).Error; err != nil {
- globalPipelineLog.Error(err, "create pipline failed: %v", err)
- return nil, errors.Internal.Wrap(err, "create pipline failed")
+ globalPipelineLog.Error(err, "create pipeline failed: %v", err)
+ return nil, errors.Internal.Wrap(err, "create pipeline failed")
+ }
+
+ dbPipeline.Labels = []models.DbPipelineLabel{}
+ for _, label := range newPipeline.Labels {
+ dbPipeline.Labels = append(dbPipeline.Labels, models.DbPipelineLabel{
+ PipelineId: dbPipeline.ID,
+ Name: label,
+ })
+ }
+ if len(dbPipeline.Labels) > 0 {
+ if err := db.Create(&dbPipeline.Labels).Error; err != nil {
+ globalPipelineLog.Error(err, "create pipeline's labelModels failed: %v", err)
+ return nil, errors.Internal.Wrap(err, "create pipeline's labelModels failed")
+ }
}
// create tasks accordingly
@@ -99,28 +114,48 @@ func CreateDbPipeline(newPipeline *models.NewPipeline) (*models.DbPipeline, erro
// GetDbPipelines by query
func GetDbPipelines(query *PipelineQuery) ([]*models.DbPipeline, int64, errors.Error) {
dbPipelines := make([]*models.DbPipeline, 0)
- db := db.Model(dbPipelines).Order("id DESC")
+ dbQuery := db.Model(dbPipelines).Order("id DESC")
if query.BlueprintId != 0 {
- db = db.Where("blueprint_id = ?", query.BlueprintId)
+ dbQuery = dbQuery.Where("blueprint_id = ?", query.BlueprintId)
}
if query.Status != "" {
- db = db.Where("status = ?", query.Status)
+ dbQuery = dbQuery.Where("status = ?", query.Status)
}
if query.Pending > 0 {
- db = db.Where("finished_at is null and status != ?", "TASK_FAILED")
+ dbQuery = dbQuery.Where("finished_at is null and status != ?", "TASK_FAILED")
+ }
+ if query.Label != "" {
+ dbQuery = dbQuery.
+ Joins(`left join _devlake_pipeline_labels ON _devlake_pipeline_labels.pipeline_id = _devlake_pipelines.id`).
+ Where(`_devlake_pipeline_labels.name = ?`, query.Label)
}
var count int64
- err := db.Count(&count).Error
+ err := dbQuery.Count(&count).Error
if err != nil {
return nil, 0, errors.Default.Wrap(err, "error getting DB pipelines count")
}
- db = processDbClausesWithPager(db, query.PageSize, query.Page)
+ dbQuery = processDbClausesWithPager(dbQuery, query.PageSize, query.Page)
- err = db.Find(&dbPipelines).Error
+ err = dbQuery.Find(&dbPipelines).Error
if err != nil {
return nil, count, errors.Default.Wrap(err, "error finding DB pipelines")
}
+
+ var pipelineIds []uint64
+ for _, dbPipeline := range dbPipelines {
+ pipelineIds = append(pipelineIds, dbPipeline.ID)
+ }
+ dbLabels := []models.DbPipelineLabel{}
+ db.Where(`pipeline_id in ?`, pipelineIds).Find(&dbLabels)
+ dbLabelsMap := map[uint64][]models.DbPipelineLabel{}
+ for _, dbLabel := range dbLabels {
+ dbLabelsMap[dbLabel.PipelineId] = append(dbLabelsMap[dbLabel.PipelineId], dbLabel)
+ }
+ for _, dbPipeline := range dbPipelines {
+ dbPipeline.Labels = dbLabelsMap[dbPipeline.ID]
+ }
+
return dbPipelines, count, nil
}
@@ -134,11 +169,19 @@ func GetDbPipeline(pipelineId uint64) (*models.DbPipeline, errors.Error) {
}
return nil, errors.Internal.Wrap(err, "error getting the pipeline from database")
}
+ err = db.Find(&dbPipeline.Labels, "pipeline_id = ?", pipelineId).Error
+ if err != nil {
+ return nil, errors.Internal.Wrap(err, "error getting the pipeline from database")
+ }
return dbPipeline, nil
}
// parsePipeline converts DbPipeline to Pipeline
func parsePipeline(dbPipeline *models.DbPipeline) *models.Pipeline {
+ labelList := []string{}
+ for _, labelModel := range dbPipeline.Labels {
+ labelList = append(labelList, labelModel.Name)
+ }
pipeline := models.Pipeline{
Model: dbPipeline.Model,
Name: dbPipeline.Name,
@@ -152,11 +195,13 @@ func parsePipeline(dbPipeline *models.DbPipeline) *models.Pipeline {
Message: dbPipeline.Message,
SpentSeconds: dbPipeline.SpentSeconds,
Stage: dbPipeline.Stage,
+ Labels: labelList,
}
return &pipeline
}
// parseDbPipeline converts Pipeline to DbPipeline
+// nolint:unused
func parseDbPipeline(pipeline *models.Pipeline) *models.DbPipeline {
dbPipeline := models.DbPipeline{
Model: pipeline.Model,
@@ -172,6 +217,14 @@ func parseDbPipeline(pipeline *models.Pipeline) *models.DbPipeline {
SpentSeconds: pipeline.SpentSeconds,
Stage: pipeline.Stage,
}
+ dbPipeline.Labels = []models.DbPipelineLabel{}
+ for _, label := range pipeline.Labels {
+ dbPipeline.Labels = append(dbPipeline.Labels, models.DbPipelineLabel{
+ // NOTICE: PipelineId may be nil
+ PipelineId: pipeline.ID,
+ Name: label,
+ })
+ }
return &dbPipeline
}
diff --git a/services/pipeline_runner.go b/services/pipeline_runner.go
index 0ec968704..8eea52ebb 100644
--- a/services/pipeline_runner.go
+++ b/services/pipeline_runner.go
@@ -117,27 +117,28 @@ func runPipeline(pipelineId uint64) errors.Error {
if err != nil {
err = errors.Default.Wrap(err, fmt.Sprintf("Error running pipeline %d.", pipelineId))
}
- pipeline, e := GetPipeline(pipelineId)
+ dbPipeline, e := GetDbPipeline(pipelineId)
if e != nil {
return errors.Default.Wrap(err, fmt.Sprintf("Unable to get pipeline %d.", pipelineId))
}
// finished, update database
finishedAt := time.Now()
- pipeline.FinishedAt = &finishedAt
- pipeline.SpentSeconds = int(finishedAt.Unix() - pipeline.BeganAt.Unix())
+ dbPipeline.FinishedAt = &finishedAt
+ if dbPipeline.BeganAt != nil {
+ dbPipeline.SpentSeconds = int(finishedAt.Unix() - dbPipeline.BeganAt.Unix())
+ }
if err != nil {
- pipeline.Status = models.TASK_FAILED
+ dbPipeline.Status = models.TASK_FAILED
if lakeErr := errors.AsLakeErrorType(err); lakeErr != nil {
- pipeline.Message = lakeErr.Messages().Format()
+ dbPipeline.Message = lakeErr.Messages().Format()
} else {
- pipeline.Message = err.Error()
+ dbPipeline.Message = err.Error()
}
} else {
- pipeline.Status = models.TASK_COMPLETED
- pipeline.Message = ""
+ dbPipeline.Status = models.TASK_COMPLETED
+ dbPipeline.Message = ""
}
- dbPipeline := parseDbPipeline(pipeline)
- dbe := db.Model(dbPipeline).Select("finished_at", "spent_seconds", "status", "message").Updates(dbPipeline).Error
+ dbe := db.Model(dbPipeline).Updates(dbPipeline).Error
if dbe != nil {
globalPipelineLog.Error(dbe, "update pipeline state failed")
return errors.Convert(dbe)
diff --git a/utils/slice.go b/utils/slice.go
new file mode 100644
index 000000000..659a21078
--- /dev/null
+++ b/utils/slice.go
@@ -0,0 +1,37 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements. See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package utils
+
+// SliceRemove remove some items in old slice
+func SliceRemove[T ~int | ~string](source []T, toRemoves ...T) []T {
+ j := 0
+ for _, v := range source {
+ needRemove := false
+ for _, toRemove := range toRemoves {
+ if v == toRemove {
+ needRemove = true
+ break
+ }
+ }
+ if !needRemove {
+ source[j] = v
+ j++
+ }
+ }
+ return source[:j]
+}
diff --git a/utils/slice_test.go b/utils/slice_test.go
new file mode 100644
index 000000000..d55f8ae6e
--- /dev/null
+++ b/utils/slice_test.go
@@ -0,0 +1,30 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements. See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package utils
+
+import (
+ "github.com/stretchr/testify/assert"
+ "testing"
+)
+
+// TestSliceRemove test the SliceRemove
+func TestSliceRemove(t *testing.T) {
+ assert.Equal(t, []int{3, 4, 5}, SliceRemove([]int{1, 2, 3, 4, 5}, 1, 2))
+ assert.Equal(t, []int{1, 2, 4, 5}, SliceRemove([]int{1, 2, 3, 4, 5}, 3, 3))
+ assert.Equal(t, []string{`1`, `2`, `4`, `5`}, SliceRemove([]string{`1`, `2`, `3`, `4`, `5`}, `3`, `3`))
+}