You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@devlake.apache.org by ma...@apache.org on 2023/03/24 12:57:46 UTC

[incubator-devlake] branch main updated: fix: add log for cron (#4766)

This is an automated email from the ASF dual-hosted git repository.

mappjzc pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-devlake.git


The following commit(s) were added to refs/heads/main by this push:
     new 1e38ef107 fix: add log for cron (#4766)
1e38ef107 is described below

commit 1e38ef107f1fad2f742b336b892f8bd0fcf62ce5
Author: mappjzc <zh...@merico.dev>
AuthorDate: Fri Mar 24 20:57:42 2023 +0800

    fix: add log for cron (#4766)
    
    Add log to blueprint cron.
    Add log if the pipelines padding.
    
    Nddtfjiang <zh...@merico.dev>
---
 backend/server/api/blueprints/blueprints.go |  9 +++------
 backend/server/services/blueprint.go        |  7 ++++---
 backend/server/services/pipeline_helper.go  | 29 +++++++++++++++++++++++------
 3 files changed, 30 insertions(+), 15 deletions(-)

diff --git a/backend/server/api/blueprints/blueprints.go b/backend/server/api/blueprints/blueprints.go
index e25b2cf0e..9810ba585 100644
--- a/backend/server/api/blueprints/blueprints.go
+++ b/backend/server/api/blueprints/blueprints.go
@@ -18,12 +18,13 @@ limitations under the License.
 package blueprints
 
 import (
+	"net/http"
+	"strconv"
+
 	"github.com/apache/incubator-devlake/core/errors"
 	"github.com/apache/incubator-devlake/core/models"
 	"github.com/apache/incubator-devlake/server/api/shared"
 	"github.com/apache/incubator-devlake/server/services"
-	"net/http"
-	"strconv"
 
 	"github.com/gin-gonic/gin"
 )
@@ -179,10 +180,6 @@ func Trigger(c *gin.Context) {
 		return
 	}
 	pipeline, err := services.TriggerBlueprint(id)
-	if errors.Is(err, services.ErrBlueprintRunning) {
-		shared.ApiOutputError(c, errors.BadInput.Wrap(err, "the blueprint is running"))
-		return
-	}
 	if err != nil {
 		shared.ApiOutputError(c, errors.Default.Wrap(err, "error triggering blueprint"))
 		return
diff --git a/backend/server/services/blueprint.go b/backend/server/services/blueprint.go
index 504853b33..17819f133 100644
--- a/backend/server/services/blueprint.go
+++ b/backend/server/services/blueprint.go
@@ -223,9 +223,9 @@ func ReloadBlueprints(c *cron.Cron) errors.Error {
 		if _, err := c.AddFunc(blueprint.CronConfig, func() {
 			pipeline, err := createPipelineByBlueprint(blueprint)
 			if err != nil {
-				blueprintLog.Error(err, "run cron job failed")
+				blueprintLog.Error(err, fmt.Sprintf("run cron job failed on blueprint:[%d][%s]", blueprint.ID, blueprint.Name))
 			} else {
-				blueprintLog.Info("Run new cron job successfully, pipeline id: %d", pipeline.ID)
+				blueprintLog.Info("Run new cron job successfully,pipeline id:[%d] pipeline id:[%d]", blueprint.ID, pipeline.ID)
 			}
 		}); err != nil {
 			blueprintLog.Error(err, failToCreateCronJob)
@@ -248,6 +248,7 @@ func createPipelineByBlueprint(blueprint *models.Blueprint) (*models.Pipeline, e
 		plan, err = blueprint.UnmarshalPlan()
 	}
 	if err != nil {
+		blueprintLog.Error(err, fmt.Sprintf("failed to MakePlanForBlueprint on blueprint:[%d][%s]", blueprint.ID, blueprint.Name))
 		return nil, err
 	}
 	newPipeline := models.NewPipeline{}
@@ -259,7 +260,7 @@ func createPipelineByBlueprint(blueprint *models.Blueprint) (*models.Pipeline, e
 	pipeline, err := CreatePipeline(&newPipeline)
 	// Return all created tasks to the User
 	if err != nil {
-		blueprintLog.Error(err, failToCreateCronJob)
+		blueprintLog.Error(err, fmt.Sprintf("%s on blueprint:[%d][%s]", failToCreateCronJob, blueprint.ID, blueprint.Name))
 		return nil, errors.Convert(err)
 	}
 	return pipeline, nil
diff --git a/backend/server/services/pipeline_helper.go b/backend/server/services/pipeline_helper.go
index 03f3e322b..c0dd11cb6 100644
--- a/backend/server/services/pipeline_helper.go
+++ b/backend/server/services/pipeline_helper.go
@@ -20,28 +20,45 @@ package services
 import (
 	"encoding/json"
 	"fmt"
+
 	"github.com/apache/incubator-devlake/core/dal"
 	"github.com/apache/incubator-devlake/core/errors"
 	"github.com/apache/incubator-devlake/core/models"
 )
 
-// ErrBlueprintRunning indicates there is a running pipeline with the specified blueprint_id
-var ErrBlueprintRunning = errors.Default.New("the blueprint is running")
-
 // CreateDbPipeline returns a NewPipeline
 func CreateDbPipeline(newPipeline *models.NewPipeline) (*models.Pipeline, errors.Error) {
 	cronLocker.Lock()
 	defer cronLocker.Unlock()
 	if newPipeline.BlueprintId > 0 {
-		count, err := db.Count(
+		clauses := []dal.Clause{
 			dal.From(&models.Pipeline{}),
 			dal.Where("blueprint_id = ? AND status IN ?", newPipeline.BlueprintId, models.PendingTaskStatus),
-		)
+		}
+		count, err := db.Count(clauses...)
 		if err != nil {
 			return nil, errors.Default.Wrap(err, "query pipelines error")
 		}
+		// some pipeline is ruunning , get the detail and output them.
 		if count > 0 {
-			return nil, ErrBlueprintRunning
+			cursor, err := db.Cursor(clauses...)
+			if err != nil {
+				return nil, errors.Default.Wrap(err, fmt.Sprintf("query pipelines error but count it success. count:%d", count))
+			}
+			defer cursor.Close()
+			fetched := 0
+			errstr := ""
+			for cursor.Next() {
+				pipeline := &models.Pipeline{}
+				err = db.Fetch(cursor, pipeline)
+				if err != nil {
+					return nil, errors.Default.Wrap(err, fmt.Sprintf("failed to Fetch pipelines fetched:[%d],count:[%d]", fetched, count))
+				}
+				fetched++
+
+				errstr += fmt.Sprintf("pipeline:[%d] on state:[%s] Pending it\r\n", pipeline.ID, pipeline.Status)
+			}
+			return nil, errors.Default.New(fmt.Sprintf("the blueprint is running fetched:[%d],count:[%d]:\r\n%s", fetched, count, errstr))
 		}
 	}
 	planByte, err := errors.Convert01(json.Marshal(newPipeline.Plan))