You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@devlake.apache.org by ma...@apache.org on 2023/03/24 12:57:46 UTC
[incubator-devlake] branch main updated: fix: add log for cron (#4766)
This is an automated email from the ASF dual-hosted git repository.
mappjzc pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-devlake.git
The following commit(s) were added to refs/heads/main by this push:
new 1e38ef107 fix: add log for cron (#4766)
1e38ef107 is described below
commit 1e38ef107f1fad2f742b336b892f8bd0fcf62ce5
Author: mappjzc <zh...@merico.dev>
AuthorDate: Fri Mar 24 20:57:42 2023 +0800
fix: add log for cron (#4766)
Add log to blueprint cron.
Add log if the pipelines padding.
Nddtfjiang <zh...@merico.dev>
---
backend/server/api/blueprints/blueprints.go | 9 +++------
backend/server/services/blueprint.go | 7 ++++---
backend/server/services/pipeline_helper.go | 29 +++++++++++++++++++++++------
3 files changed, 30 insertions(+), 15 deletions(-)
diff --git a/backend/server/api/blueprints/blueprints.go b/backend/server/api/blueprints/blueprints.go
index e25b2cf0e..9810ba585 100644
--- a/backend/server/api/blueprints/blueprints.go
+++ b/backend/server/api/blueprints/blueprints.go
@@ -18,12 +18,13 @@ limitations under the License.
package blueprints
import (
+ "net/http"
+ "strconv"
+
"github.com/apache/incubator-devlake/core/errors"
"github.com/apache/incubator-devlake/core/models"
"github.com/apache/incubator-devlake/server/api/shared"
"github.com/apache/incubator-devlake/server/services"
- "net/http"
- "strconv"
"github.com/gin-gonic/gin"
)
@@ -179,10 +180,6 @@ func Trigger(c *gin.Context) {
return
}
pipeline, err := services.TriggerBlueprint(id)
- if errors.Is(err, services.ErrBlueprintRunning) {
- shared.ApiOutputError(c, errors.BadInput.Wrap(err, "the blueprint is running"))
- return
- }
if err != nil {
shared.ApiOutputError(c, errors.Default.Wrap(err, "error triggering blueprint"))
return
diff --git a/backend/server/services/blueprint.go b/backend/server/services/blueprint.go
index 504853b33..17819f133 100644
--- a/backend/server/services/blueprint.go
+++ b/backend/server/services/blueprint.go
@@ -223,9 +223,9 @@ func ReloadBlueprints(c *cron.Cron) errors.Error {
if _, err := c.AddFunc(blueprint.CronConfig, func() {
pipeline, err := createPipelineByBlueprint(blueprint)
if err != nil {
- blueprintLog.Error(err, "run cron job failed")
+ blueprintLog.Error(err, fmt.Sprintf("run cron job failed on blueprint:[%d][%s]", blueprint.ID, blueprint.Name))
} else {
- blueprintLog.Info("Run new cron job successfully, pipeline id: %d", pipeline.ID)
+ blueprintLog.Info("Run new cron job successfully,pipeline id:[%d] pipeline id:[%d]", blueprint.ID, pipeline.ID)
}
}); err != nil {
blueprintLog.Error(err, failToCreateCronJob)
@@ -248,6 +248,7 @@ func createPipelineByBlueprint(blueprint *models.Blueprint) (*models.Pipeline, e
plan, err = blueprint.UnmarshalPlan()
}
if err != nil {
+ blueprintLog.Error(err, fmt.Sprintf("failed to MakePlanForBlueprint on blueprint:[%d][%s]", blueprint.ID, blueprint.Name))
return nil, err
}
newPipeline := models.NewPipeline{}
@@ -259,7 +260,7 @@ func createPipelineByBlueprint(blueprint *models.Blueprint) (*models.Pipeline, e
pipeline, err := CreatePipeline(&newPipeline)
// Return all created tasks to the User
if err != nil {
- blueprintLog.Error(err, failToCreateCronJob)
+ blueprintLog.Error(err, fmt.Sprintf("%s on blueprint:[%d][%s]", failToCreateCronJob, blueprint.ID, blueprint.Name))
return nil, errors.Convert(err)
}
return pipeline, nil
diff --git a/backend/server/services/pipeline_helper.go b/backend/server/services/pipeline_helper.go
index 03f3e322b..c0dd11cb6 100644
--- a/backend/server/services/pipeline_helper.go
+++ b/backend/server/services/pipeline_helper.go
@@ -20,28 +20,45 @@ package services
import (
"encoding/json"
"fmt"
+
"github.com/apache/incubator-devlake/core/dal"
"github.com/apache/incubator-devlake/core/errors"
"github.com/apache/incubator-devlake/core/models"
)
-// ErrBlueprintRunning indicates there is a running pipeline with the specified blueprint_id
-var ErrBlueprintRunning = errors.Default.New("the blueprint is running")
-
// CreateDbPipeline returns a NewPipeline
func CreateDbPipeline(newPipeline *models.NewPipeline) (*models.Pipeline, errors.Error) {
cronLocker.Lock()
defer cronLocker.Unlock()
if newPipeline.BlueprintId > 0 {
- count, err := db.Count(
+ clauses := []dal.Clause{
dal.From(&models.Pipeline{}),
dal.Where("blueprint_id = ? AND status IN ?", newPipeline.BlueprintId, models.PendingTaskStatus),
- )
+ }
+ count, err := db.Count(clauses...)
if err != nil {
return nil, errors.Default.Wrap(err, "query pipelines error")
}
+ // some pipeline is ruunning , get the detail and output them.
if count > 0 {
- return nil, ErrBlueprintRunning
+ cursor, err := db.Cursor(clauses...)
+ if err != nil {
+ return nil, errors.Default.Wrap(err, fmt.Sprintf("query pipelines error but count it success. count:%d", count))
+ }
+ defer cursor.Close()
+ fetched := 0
+ errstr := ""
+ for cursor.Next() {
+ pipeline := &models.Pipeline{}
+ err = db.Fetch(cursor, pipeline)
+ if err != nil {
+ return nil, errors.Default.Wrap(err, fmt.Sprintf("failed to Fetch pipelines fetched:[%d],count:[%d]", fetched, count))
+ }
+ fetched++
+
+ errstr += fmt.Sprintf("pipeline:[%d] on state:[%s] Pending it\r\n", pipeline.ID, pipeline.Status)
+ }
+ return nil, errors.Default.New(fmt.Sprintf("the blueprint is running fetched:[%d],count:[%d]:\r\n%s", fetched, count, errstr))
}
}
planByte, err := errors.Convert01(json.Marshal(newPipeline.Plan))