You are viewing a plain-text version of this content. The canonical link for it was not preserved in this plain-text version.
Posted to commits@devlake.apache.org by wa...@apache.org on 2022/09/29 16:53:38 UTC

[incubator-devlake] branch main updated: fix: pipeline showed succeeded when tasks actually failed

This is an automated email from the ASF dual-hosted git repository.

warren pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-devlake.git


The following commit(s) were added to refs/heads/main by this push:
     new a7acfdd4 fix: pipeline showed succeeded when tasks actually failed
a7acfdd4 is described below

commit a7acfdd402b148cffc579f25e0fbebdef5ebe5c1
Author: Klesh Wong <zh...@merico.dev>
AuthorDate: Fri Sep 30 00:32:26 2022 +0800

    fix: pipeline showed succeeded when tasks actually failed
---
 runner/run_pipeline.go      | 7 ++++++-
 services/init.go            | 5 ++++-
 services/pipeline.go        | 2 ++
 services/pipeline_helper.go | 2 ++
 4 files changed, 14 insertions(+), 2 deletions(-)

diff --git a/runner/run_pipeline.go b/runner/run_pipeline.go
index 856709d5..1cd5e408 100644
--- a/runner/run_pipeline.go
+++ b/runner/run_pipeline.go
@@ -18,9 +18,11 @@ limitations under the License.
 package runner
 
 import (
-	"github.com/apache/incubator-devlake/errors"
+	"fmt"
 	"time"
 
+	"github.com/apache/incubator-devlake/errors"
+
 	"github.com/apache/incubator-devlake/models"
 	"github.com/apache/incubator-devlake/plugins/core"
 	"github.com/spf13/viper"
@@ -48,6 +50,9 @@ func RunPipeline(
 	if err != nil {
 		return errors.Convert(err)
 	}
+	if len(tasks) != dbPipeline.TotalTasks {
+		return errors.Internal.New(fmt.Sprintf("expected total tasks to be %v, got %v", dbPipeline.TotalTasks, len(tasks)))
+	}
 	// convert to 2d array
 	taskIds := make([][]uint64, 0)
 	for _, task := range tasks {
diff --git a/services/init.go b/services/init.go
index 7b3b8940..cb1a4319 100644
--- a/services/init.go
+++ b/services/init.go
@@ -19,9 +19,11 @@ package services
 
 import (
 	"context"
-	"github.com/apache/incubator-devlake/errors"
+	"sync"
 	"time"
 
+	"github.com/apache/incubator-devlake/errors"
+
 	"github.com/apache/incubator-devlake/config"
 	"github.com/apache/incubator-devlake/logger"
 	"github.com/apache/incubator-devlake/migration"
@@ -38,6 +40,7 @@ var db *gorm.DB
 var cronManager *cron.Cron
 var log core.Logger
 var migrationRequireConfirmation bool
+var cronLocker sync.Mutex
 
 const failToCreateCronJob = "created cron job failed"
 
diff --git a/services/pipeline.go b/services/pipeline.go
index a4bced5b..17c1a65b 100644
--- a/services/pipeline.go
+++ b/services/pipeline.go
@@ -167,9 +167,11 @@ func RunPipelineInQueue(pipelineMaxParallel int64) {
 		globalPipelineLog.Info("get lock and wait pipeline")
 		dbPipeline := &models.DbPipeline{}
 		for {
+			cronLocker.Lock()
 			db.Where("status = ?", models.TASK_CREATED).
 				Not(startedPipelineIds).
 				Order("id ASC").Limit(1).Find(dbPipeline)
+			cronLocker.Unlock()
 			if dbPipeline.ID != 0 {
 				break
 			}
diff --git a/services/pipeline_helper.go b/services/pipeline_helper.go
index 9eff3fd8..203f475b 100644
--- a/services/pipeline_helper.go
+++ b/services/pipeline_helper.go
@@ -29,6 +29,8 @@ import (
 
 // CreateDbPipeline returns a NewPipeline
 func CreateDbPipeline(newPipeline *models.NewPipeline) (*models.DbPipeline, errors.Error) {
+	cronLocker.Lock()
+	defer cronLocker.Unlock()
 	planByte, err := errors.Convert01(json.Marshal(newPipeline.Plan))
 	if err != nil {
 		return nil, err