You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@devlake.apache.org by zh...@apache.org on 2023/01/09 14:43:09 UTC
[incubator-devlake] branch main updated: feat: add TASK_PARTIAL for skip-on-fail pipelines (#4158)
This is an automated email from the ASF dual-hosted git repository.
zhangliang2022 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-devlake.git
The following commit(s) were added to refs/heads/main by this push:
new f3b08f6f6 feat: add TASK_PARTIAL for skip-on-fail pipelines (#4158)
f3b08f6f6 is described below
commit f3b08f6f6acaec3a82e7a7ab6f1e1c124802b9a3
Author: Klesh Wong <zh...@merico.dev>
AuthorDate: Mon Jan 9 22:43:04 2023 +0800
feat: add TASK_PARTIAL for skip-on-fail pipelines (#4158)
* feat: add TASK_PARTIAL for skip-on-fail pipelines
* fix: handle failure when SkipOnFail=false
* fix: github action test cases
* fix: e2e test
* fix: e2e on github
* fix: migration fail
---
Makefile | 5 +-
models/task.go | 1 +
services/init.go | 28 ++++-
services/pipeline_runner.go | 81 +++++++++++++-
services/task.go | 1 +
test/api/task/task_test.go | 23 +---
test/example/greetings.go | 27 -----
test/{example/greetings_test.go => init.go} | 33 +++---
test/services/pipeline_runner_e2e_test.go | 166 ++++++++++++++++++++++++++++
9 files changed, 291 insertions(+), 74 deletions(-)
diff --git a/Makefile b/Makefile
index 0b825b2e1..0e1068d40 100644
--- a/Makefile
+++ b/Makefile
@@ -111,14 +111,11 @@ unit-test: mock build
set -e; for m in $$(go list ./... | egrep -v 'test|models|e2e'); do echo $$m; go test -timeout 60s -v $$m; done
e2e-test: build
- PLUGIN_DIR=$(shell readlink -f bin/plugins) go test -timeout 300s -v ./test/...
+ PLUGIN_DIR=$(shell readlink -f bin/plugins) go test -timeout 300s -p 1 -v ./test/...
e2e-plugins:
export ENV_PATH=$(shell readlink -f .env); set -e; for m in $$(go list ./plugins/... | egrep 'e2e'); do echo $$m; go test -timeout 300s -gcflags=all=-l -v $$m; done
-real-e2e-test:
- PLUGIN_DIR=$(shell readlink -f bin/plugins) go test -v ./e2e/...
-
lint:
golangci-lint run
diff --git a/models/task.go b/models/task.go
index f1933a35c..b0d1a8221 100644
--- a/models/task.go
+++ b/models/task.go
@@ -34,6 +34,7 @@ const (
TASK_COMPLETED = "TASK_COMPLETED"
TASK_FAILED = "TASK_FAILED"
TASK_CANCELLED = "TASK_CANCELLED"
+ TASK_PARTIAL = "TASK_PARTIAL"
)
var PendingTaskStatus = []string{TASK_CREATED, TASK_RERUN, TASK_RUNNING}
diff --git a/services/init.go b/services/init.go
index c51808cce..84cd639a7 100644
--- a/services/init.go
+++ b/services/init.go
@@ -45,8 +45,8 @@ var vld *validator.Validate
const failToCreateCronJob = "created cron job failed"
-// Init the services module
-func Init() {
+// InitResources creates resources needed by services module
+func InitResources() {
var err error
// basic resources initialization
@@ -56,9 +56,6 @@ func Init() {
log = basicRes.GetLogger()
db = basicRes.GetDal()
- // lock the database to avoid multiple devlake instances from sharing the same one
- lockDb()
-
// initialize db migrator
migrator, err = runner.InitMigrator(basicRes)
if err != nil {
@@ -66,11 +63,30 @@ func Init() {
}
log.Info("migration initialized")
migrator.Register(migrationscripts.All(), "Framework")
+}
+
+// GetBasicRes returns the core.BasicRes instance used by services module
+func GetBasicRes() core.BasicRes {
+ return basicRes
+}
+
+// GetMigrator returns the core.Migrator instance used by services module
+func GetMigrator() core.Migrator {
+ return migrator
+}
+// Init the services module
+func Init() {
+ InitResources()
+
+ // lock the database to avoid multiple devlake instances from sharing the same one
+ lockDb()
+
+ var err error
// now, load the plugins
err = runner.LoadPlugins(basicRes)
if err != nil {
- panic(err)
+ log.Error(err, "failed to load plugins")
}
// pull migration scripts from plugins to migrator
diff --git a/services/pipeline_runner.go b/services/pipeline_runner.go
index 441dd47ab..a84bc816e 100644
--- a/services/pipeline_runner.go
+++ b/services/pipeline_runner.go
@@ -27,6 +27,7 @@ import (
"github.com/apache/incubator-devlake/logger"
"github.com/apache/incubator-devlake/models"
"github.com/apache/incubator-devlake/plugins/core"
+ "github.com/apache/incubator-devlake/plugins/core/dal"
"github.com/apache/incubator-devlake/runner"
"github.com/apache/incubator-devlake/worker/app"
"go.temporal.io/sdk/client"
@@ -126,18 +127,88 @@ func runPipeline(pipelineId uint64) errors.Error {
dbPipeline.SpentSeconds = int(finishedAt.Unix() - dbPipeline.BeganAt.Unix())
}
if err != nil {
- dbPipeline.Status = models.TASK_FAILED
dbPipeline.Message = err.Error()
dbPipeline.ErrorName = err.Messages().Format()
- } else {
- dbPipeline.Status = models.TASK_COMPLETED
- dbPipeline.Message = ""
+ }
+ dbPipeline.Status, err = ComputePipelineStatus(dbPipeline)
+ if err != nil {
+ globalPipelineLog.Error(err, "compute pipeline status failed")
+ return err
}
err = db.Update(dbPipeline)
if err != nil {
globalPipelineLog.Error(err, "update pipeline state failed")
- return errors.Convert(err)
+ return err
}
// notify external webhook
return NotifyExternal(pipelineId)
}
+
+// ComputePipelineStatus determines the pipeline status from the statuses of its latest tasks (reruns included)
+// 1. TASK_COMPLETED: all tasks were executed successfully
+// 2. TASK_FAILED: SkipOnFail=false with failed task(s), or SkipOnFail=true with no succeeded task
+// 3. TASK_PARTIAL: SkipOnFail=true with both succeeded and failed task(s)
+func ComputePipelineStatus(pipeline *models.DbPipeline) (string, errors.Error) {
+ tasks, err := GetLatestTasksOfPipeline(pipeline)
+ if err != nil {
+ return "", err
+ }
+
+ succeeded, failed, pending, running := 0, 0, 0, 0
+
+ for _, task := range tasks {
+ if task.Status == models.TASK_COMPLETED {
+ succeeded += 1
+ } else if task.Status == models.TASK_FAILED || task.Status == models.TASK_CANCELLED {
+ failed += 1
+ } else if task.Status == models.TASK_RUNNING {
+ running += 1
+ } else {
+ pending += 1
+ }
+ }
+
+ if running > 0 || (pipeline.SkipOnFail && pending > 0) {
+ return "", errors.Default.New("unexpected status, did you call computePipelineStatus at a wrong timing?")
+ }
+
+ if failed == 0 {
+ return models.TASK_COMPLETED, nil
+ }
+ if pipeline.SkipOnFail && succeeded > 0 {
+ return models.TASK_PARTIAL, nil
+ }
+ return models.TASK_FAILED, nil
+}
+
+// GetLatestTasksOfPipeline returns the latest tasks of the specified pipeline (tasks superseded by a rerun are excluded)
+func GetLatestTasksOfPipeline(pipeline *models.DbPipeline) ([]*models.Task, errors.Error) {
+ task := &models.Task{}
+ cursor, err := db.Cursor(
+ dal.From(task),
+ dal.Where("pipeline_id = ?", pipeline.ID),
+ dal.Orderby("id DESC"), // sort it by id so we can hit the latest task first for the RERUNed row/col
+ )
+ if err != nil {
+ return nil, err
+ }
+ defer cursor.Close()
+ tasks := make([]*models.Task, 0, pipeline.TotalTasks)
+ // define a struct for composite key to dedupe RERUNed tasks
+ type rowcol struct{ row, col int }
+ memorized := make(map[rowcol]bool)
+ for cursor.Next() {
+ if e := db.Fetch(cursor, task); e != nil {
+ return nil, errors.Convert(e)
+ }
+ // dedupe reran tasks
+ key := rowcol{task.PipelineRow, task.PipelineCol}
+ if memorized[key] {
+ continue
+ }
+ memorized[key] = true
+ tasks = append(tasks, task)
+ task = &models.Task{}
+ }
+ return tasks, nil
+}
diff --git a/services/task.go b/services/task.go
index 20d3af508..99c734db1 100644
--- a/services/task.go
+++ b/services/task.go
@@ -122,6 +122,7 @@ func GetTasks(query *TaskQuery) ([]*models.Task, int64, errors.Error) {
}
// GetTasksWithLastStatus returns task list of the pipeline, only the most recently tasks would be returned
+// TODO: adopt GetLatestTasksOfPipeline
func GetTasksWithLastStatus(pipelineId uint64) ([]*models.Task, errors.Error) {
var tasks []*models.Task
err := db.All(&tasks, dal.Where("pipeline_id = ?", pipelineId), dal.Orderby("id DESC"))
diff --git a/test/api/task/task_test.go b/test/api/task/task_test.go
index 14e397ff6..ff74e4aa4 100644
--- a/test/api/task/task_test.go
+++ b/test/api/task/task_test.go
@@ -19,35 +19,20 @@ package task
import (
"encoding/json"
- "github.com/apache/incubator-devlake/errors"
"net/http"
"net/http/httptest"
"strings"
"testing"
+ "github.com/apache/incubator-devlake/errors"
+
"github.com/apache/incubator-devlake/api"
- "github.com/apache/incubator-devlake/config"
"github.com/apache/incubator-devlake/models"
- "github.com/apache/incubator-devlake/plugins/core"
- "github.com/apache/incubator-devlake/services"
"github.com/gin-gonic/gin"
"github.com/magiconair/properties/assert"
-)
-func init() {
- v := config.GetConfig()
- encKey := v.GetString(core.EncodeKeyEnvStr)
- if encKey == "" {
- // Randomly generate a bunch of encryption keys and set them to config
- encKey = core.RandomEncKey()
- v.Set(core.EncodeKeyEnvStr, encKey)
- err := config.WriteConfig(v)
- if err != nil {
- panic(err)
- }
- }
- services.Init()
-}
+ _ "github.com/apache/incubator-devlake/test"
+)
func TestNewTask(t *testing.T) {
r := gin.Default()
diff --git a/test/example/greetings.go b/test/example/greetings.go
deleted file mode 100644
index 3d4e41453..000000000
--- a/test/example/greetings.go
+++ /dev/null
@@ -1,27 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one or more
-contributor license agreements. See the NOTICE file distributed with
-this work for additional information regarding copyright ownership.
-The ASF licenses this file to You under the Apache License, Version 2.0
-(the "License"); you may not use this file except in compliance with
-the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing, software
-distributed under the License is distributed on an "AS IS" BASIS,
-WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-See the License for the specific language governing permissions and
-limitations under the License.
-*/
-
-package greetings
-
-import "fmt"
-
-// Hello returns a greeting for the named person.
-func Hello(name string) string {
- // Return a greeting that embeds the name in a message.
- message := fmt.Sprintf("Hi, %v. Welcome!", name)
- return message
-}
diff --git a/test/example/greetings_test.go b/test/init.go
similarity index 57%
rename from test/example/greetings_test.go
rename to test/init.go
index e6ad59a52..c114fc1fa 100644
--- a/test/example/greetings_test.go
+++ b/test/init.go
@@ -15,22 +15,29 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
-// https://golang.org/doc/tutorial/add-a-test
-
-package greetings
+package test
import (
- "regexp"
- "testing"
+ "github.com/apache/incubator-devlake/config"
+ "github.com/apache/incubator-devlake/plugins/core"
+ "github.com/apache/incubator-devlake/services"
)
-// TestHelloName calls greetings.Hello with a name, checking
-// for a valid return value.
-func TestHelloName(t *testing.T) {
- name := "Gladys"
- want := regexp.MustCompile(`\b` + name + `\b`)
- msg := Hello("Gladys")
- if !want.MatchString(msg) {
- t.Fatalf(`Hello("Gladys") = %q, %v`, msg, want)
+func init() {
+ v := config.GetConfig()
+ encKey := v.GetString(core.EncodeKeyEnvStr)
+ if encKey == "" {
+ // Randomly generate a bunch of encryption keys and set them to config
+ encKey = core.RandomEncKey()
+ v.Set(core.EncodeKeyEnvStr, encKey)
+ err := config.WriteConfig(v)
+ if err != nil {
+ panic(err)
+ }
+ }
+ services.InitResources()
+ err := services.GetMigrator().Execute()
+ if err != nil {
+ panic(err)
}
}
diff --git a/test/services/pipeline_runner_e2e_test.go b/test/services/pipeline_runner_e2e_test.go
new file mode 100644
index 000000000..eff85467a
--- /dev/null
+++ b/test/services/pipeline_runner_e2e_test.go
@@ -0,0 +1,166 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements. See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package services
+
+import (
+ "testing"
+
+ "github.com/apache/incubator-devlake/models"
+ "github.com/apache/incubator-devlake/services"
+ _ "github.com/apache/incubator-devlake/test"
+ "github.com/stretchr/testify/assert"
+)
+
+func TestComputePipelineStatus(t *testing.T) {
+ db := services.GetBasicRes().GetDal()
+ // insert fake tasks into the database
+ pipeline := &models.DbPipeline{
+ TotalTasks: 3,
+ }
+ err := db.Create(pipeline)
+ assert.Nil(t, err)
+ assert.NotZero(t, pipeline.ID)
+
+ task_row1_col1 := &models.Task{
+ PipelineId: pipeline.ID,
+ PipelineRow: 1,
+ PipelineCol: 1,
+ Plugin: "github",
+ Status: models.TASK_COMPLETED,
+ }
+ err = db.Create(task_row1_col1)
+ assert.Nil(t, err)
+ assert.NotZero(t, task_row1_col1.ID)
+
+ task_row1_col2 := &models.Task{
+ PipelineId: pipeline.ID,
+ PipelineRow: 1,
+ PipelineCol: 2,
+ Plugin: "gitext",
+ Status: models.TASK_FAILED,
+ }
+ err = db.Create(task_row1_col2)
+ assert.Nil(t, err)
+ assert.NotZero(t, task_row1_col2.ID)
+
+ task_row2_col1 := &models.Task{
+ PipelineId: pipeline.ID,
+ PipelineRow: 2,
+ PipelineCol: 1,
+ Plugin: "refdiff",
+ Status: models.TASK_CREATED,
+ }
+ err = db.Create(task_row2_col1)
+ assert.Nil(t, err)
+ assert.NotZero(t, task_row2_col1.ID)
+
+ // pipeline.status == "failed" if SkipOnFail=false and any task failed
+ status, err := services.ComputePipelineStatus(pipeline)
+ if !assert.Nil(t, err) {
+ println(err.Messages().Format())
+ }
+ assert.Equal(t, models.TASK_FAILED, status)
+
+ // pipeline.status == "completed" if all latest tasks succeeded
+ task_row1_col2.Status = models.TASK_COMPLETED
+ err = db.Update(task_row1_col2)
+ assert.Nil(t, err)
+ task_row2_col1.Status = models.TASK_COMPLETED
+ err = db.Update(task_row2_col1)
+ assert.Nil(t, err)
+ status, err = services.ComputePipelineStatus(pipeline)
+ if !assert.Nil(t, err) {
+ println(err.Messages().Format())
+ }
+ assert.Equal(t, models.TASK_COMPLETED, status)
+
+ pipeline.SkipOnFail = true
+ err = db.Update(pipeline)
+ assert.Nil(t, err)
+ status, err = services.ComputePipelineStatus(pipeline)
+ assert.Nil(t, err)
+ assert.Equal(t, models.TASK_COMPLETED, status)
+
+ // pipeline.status == "partial" if SkipOnFail=true and some tasks succeeded while others did not
+ task_row1_col1.Status = models.TASK_FAILED
+ err = db.Update(task_row1_col1)
+ assert.Nil(t, err)
+ status, err = services.ComputePipelineStatus(pipeline)
+ assert.Nil(t, err)
+ assert.Equal(t, models.TASK_PARTIAL, status)
+
+ // pipeline.status == "failed" if SkipOnFail=true and all tasks failed
+ task_row1_col1.Status = models.TASK_FAILED
+ err = db.Update(task_row1_col1)
+ assert.Nil(t, err)
+ task_row1_col2.Status = models.TASK_FAILED
+ err = db.Update(task_row1_col2)
+ assert.Nil(t, err)
+ task_row2_col1.Status = models.TASK_FAILED
+ err = db.Update(task_row2_col1)
+ assert.Nil(t, err)
+ status, err = services.ComputePipelineStatus(pipeline)
+ assert.Nil(t, err)
+ assert.Equal(t, models.TASK_FAILED, status)
+
+ // pipeline.status == "completed" if all failed tasks were rerun successfully
+ task_row1_col1_rerun := &models.Task{
+ PipelineId: pipeline.ID,
+ PipelineRow: 1,
+ PipelineCol: 1,
+ Plugin: "github",
+ Status: models.TASK_COMPLETED,
+ }
+ err = db.Create(task_row1_col1_rerun)
+ assert.Nil(t, err)
+ assert.NotZero(t, task_row1_col1_rerun.ID)
+
+ task_row1_col2_rerun := &models.Task{
+ PipelineId: pipeline.ID,
+ PipelineRow: 1,
+ PipelineCol: 2,
+ Plugin: "gitext",
+ Status: models.TASK_COMPLETED,
+ }
+ err = db.Create(task_row1_col2_rerun)
+ assert.Nil(t, err)
+ assert.NotZero(t, task_row1_col2_rerun.ID)
+
+ task_row2_col1_rerun := &models.Task{
+ PipelineId: pipeline.ID,
+ PipelineRow: 2,
+ PipelineCol: 1,
+ Plugin: "refdiff",
+ Status: models.TASK_COMPLETED,
+ }
+ err = db.Create(task_row2_col1_rerun)
+ assert.Nil(t, err)
+ assert.NotZero(t, task_row2_col1_rerun.ID)
+
+ status, err = services.ComputePipelineStatus(pipeline)
+ assert.Nil(t, err)
+ assert.Equal(t, models.TASK_COMPLETED, status)
+
+ // pipeline.status == "partial" if any rerun task failed or was cancelled
+ task_row1_col1_rerun.Status = models.TASK_CANCELLED
+ err = db.Update(task_row1_col1_rerun)
+ assert.Nil(t, err)
+ status, err = services.ComputePipelineStatus(pipeline)
+ assert.Nil(t, err)
+ assert.Equal(t, models.TASK_PARTIAL, status)
+}