You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@devlake.apache.org by zh...@apache.org on 2023/01/09 14:43:09 UTC

[incubator-devlake] branch main updated: feat: add TASK_PARTIAL for skip-on-fail pipelines (#4158)

This is an automated email from the ASF dual-hosted git repository.

zhangliang2022 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-devlake.git


The following commit(s) were added to refs/heads/main by this push:
     new f3b08f6f6 feat: add TASK_PARTIAL for skip-on-fail pipelines (#4158)
f3b08f6f6 is described below

commit f3b08f6f6acaec3a82e7a7ab6f1e1c124802b9a3
Author: Klesh Wong <zh...@merico.dev>
AuthorDate: Mon Jan 9 22:43:04 2023 +0800

    feat: add TASK_PARTIAL for skip-on-fail pipelines (#4158)
    
    * feat: add TASK_PARTIAL for skip-on-fail pipelines
    
    * fix: handle failure when SkipOnFail=false
    
    * fix: github action test cases
    
    * fix: e2e test
    
    * fix: e2e on github
    
    * fix: migration fail
---
 Makefile                                    |   5 +-
 models/task.go                              |   1 +
 services/init.go                            |  28 ++++-
 services/pipeline_runner.go                 |  81 +++++++++++++-
 services/task.go                            |   1 +
 test/api/task/task_test.go                  |  23 +---
 test/example/greetings.go                   |  27 -----
 test/{example/greetings_test.go => init.go} |  33 +++---
 test/services/pipeline_runner_e2e_test.go   | 166 ++++++++++++++++++++++++++++
 9 files changed, 291 insertions(+), 74 deletions(-)

diff --git a/Makefile b/Makefile
index 0b825b2e1..0e1068d40 100644
--- a/Makefile
+++ b/Makefile
@@ -111,14 +111,11 @@ unit-test: mock build
 	set -e; for m in $$(go list ./... | egrep -v 'test|models|e2e'); do echo $$m; go test -timeout 60s -v $$m; done
 
 e2e-test: build
-	PLUGIN_DIR=$(shell readlink -f bin/plugins) go test -timeout 300s -v ./test/...
+	PLUGIN_DIR=$(shell readlink -f bin/plugins) go test -timeout 300s -p 1 -v ./test/...
 
 e2e-plugins:
 	export ENV_PATH=$(shell readlink -f .env); set -e; for m in $$(go list ./plugins/... | egrep 'e2e'); do echo $$m; go test -timeout 300s -gcflags=all=-l -v $$m; done
 
-real-e2e-test:
-	PLUGIN_DIR=$(shell readlink -f bin/plugins) go test -v ./e2e/...
-
 lint:
 	golangci-lint run
 
diff --git a/models/task.go b/models/task.go
index f1933a35c..b0d1a8221 100644
--- a/models/task.go
+++ b/models/task.go
@@ -34,6 +34,7 @@ const (
 	TASK_COMPLETED = "TASK_COMPLETED"
 	TASK_FAILED    = "TASK_FAILED"
 	TASK_CANCELLED = "TASK_CANCELLED"
+	TASK_PARTIAL   = "TASK_PARTIAL"
 )
 
 var PendingTaskStatus = []string{TASK_CREATED, TASK_RERUN, TASK_RUNNING}
diff --git a/services/init.go b/services/init.go
index c51808cce..84cd639a7 100644
--- a/services/init.go
+++ b/services/init.go
@@ -45,8 +45,8 @@ var vld *validator.Validate
 
 const failToCreateCronJob = "created cron job failed"
 
-// Init the services module
-func Init() {
+// InitResources creates resources needed by services module
+func InitResources() {
 	var err error
 
 	// basic resources initialization
@@ -56,9 +56,6 @@ func Init() {
 	log = basicRes.GetLogger()
 	db = basicRes.GetDal()
 
-	// lock the database to avoid multiple devlake instances from sharing the same one
-	lockDb()
-
 	// initialize db migrator
 	migrator, err = runner.InitMigrator(basicRes)
 	if err != nil {
@@ -66,11 +63,30 @@ func Init() {
 	}
 	log.Info("migration initialized")
 	migrator.Register(migrationscripts.All(), "Framework")
+}
+
+// GetBasicRes returns the core.BasicRes instance used by services module
+func GetBasicRes() core.BasicRes {
+	return basicRes
+}
+
+// GetMigrator returns the core.Migrator instance used by services module
+func GetMigrator() core.Migrator {
+	return migrator
+}
 
+// Init the services module
+func Init() {
+	InitResources()
+
+	// lock the database to avoid multiple devlake instances from sharing the same one
+	lockDb()
+
+	var err error
 	// now, load the plugins
 	err = runner.LoadPlugins(basicRes)
 	if err != nil {
-		panic(err)
+		log.Error(err, "failed to load plugins")
 	}
 
 	// pull migration scripts from plugins to migrator
diff --git a/services/pipeline_runner.go b/services/pipeline_runner.go
index 441dd47ab..a84bc816e 100644
--- a/services/pipeline_runner.go
+++ b/services/pipeline_runner.go
@@ -27,6 +27,7 @@ import (
 	"github.com/apache/incubator-devlake/logger"
 	"github.com/apache/incubator-devlake/models"
 	"github.com/apache/incubator-devlake/plugins/core"
+	"github.com/apache/incubator-devlake/plugins/core/dal"
 	"github.com/apache/incubator-devlake/runner"
 	"github.com/apache/incubator-devlake/worker/app"
 	"go.temporal.io/sdk/client"
@@ -126,18 +127,88 @@ func runPipeline(pipelineId uint64) errors.Error {
 		dbPipeline.SpentSeconds = int(finishedAt.Unix() - dbPipeline.BeganAt.Unix())
 	}
 	if err != nil {
-		dbPipeline.Status = models.TASK_FAILED
 		dbPipeline.Message = err.Error()
 		dbPipeline.ErrorName = err.Messages().Format()
-	} else {
-		dbPipeline.Status = models.TASK_COMPLETED
-		dbPipeline.Message = ""
+	}
+	dbPipeline.Status, err = ComputePipelineStatus(dbPipeline)
+	if err != nil {
+		globalPipelineLog.Error(err, "compute pipeline status failed")
+		return err
 	}
 	err = db.Update(dbPipeline)
 	if err != nil {
 		globalPipelineLog.Error(err, "update pipeline state failed")
-		return errors.Convert(err)
+		return err
 	}
 	// notify external webhook
 	return NotifyExternal(pipelineId)
 }
+
+// ComputePipelineStatus determines the pipeline status from the statuses of its latest (rerun included) tasks
+// 1. TASK_COMPLETED: all tasks were executed successfully
+// 2. TASK_FAILED: SkipOnFail=false with failed task(s), or SkipOnFail=true with no succeeded task
+// 3. TASK_PARTIAL: SkipOnFail=true with both failed and succeeded task(s)
+func ComputePipelineStatus(pipeline *models.DbPipeline) (string, errors.Error) {
+	tasks, err := GetLatestTasksOfPipeline(pipeline)
+	if err != nil {
+		return "", err
+	}
+
+	succeeded, failed, pending, running := 0, 0, 0, 0
+
+	for _, task := range tasks {
+		if task.Status == models.TASK_COMPLETED {
+			succeeded += 1
+		} else if task.Status == models.TASK_FAILED || task.Status == models.TASK_CANCELLED {
+			failed += 1
+		} else if task.Status == models.TASK_RUNNING {
+			running += 1
+		} else {
+			pending += 1
+		}
+	}
+
+	if running > 0 || (pipeline.SkipOnFail && pending > 0) {
+		return "", errors.Default.New("unexpected status, did you call computePipelineStatus at a wrong timing?")
+	}
+
+	if failed == 0 {
+		return models.TASK_COMPLETED, nil
+	}
+	if pipeline.SkipOnFail && succeeded > 0 {
+		return models.TASK_PARTIAL, nil
+	}
+	return models.TASK_FAILED, nil
+}
+
+// GetLatestTasksOfPipeline returns the latest tasks of the specified pipeline (for rerun tasks, only the most recent run of each row/col is kept)
+func GetLatestTasksOfPipeline(pipeline *models.DbPipeline) ([]*models.Task, errors.Error) {
+	task := &models.Task{}
+	cursor, err := db.Cursor(
+		dal.From(task),
+		dal.Where("pipeline_id = ?", pipeline.ID),
+		dal.Orderby("id DESC"), // sort it by id so we can hit the latest task first for the RERUNed row/col
+	)
+	if err != nil {
+		return nil, err
+	}
+	defer cursor.Close()
+	tasks := make([]*models.Task, 0, pipeline.TotalTasks)
+	// define a struct for composite key to dedupe RERUNed tasks
+	type rowcol struct{ row, col int }
+	memorized := make(map[rowcol]bool)
+	for cursor.Next() {
+		if e := db.Fetch(cursor, task); e != nil {
+			return nil, errors.Convert(e)
+		}
+		// dedupe reran tasks
+		key := rowcol{task.PipelineRow, task.PipelineCol}
+		if memorized[key] {
+			continue
+		}
+		memorized[key] = true
+		tasks = append(tasks, task)
+		task = &models.Task{}
+	}
+	return tasks, nil
+}
diff --git a/services/task.go b/services/task.go
index 20d3af508..99c734db1 100644
--- a/services/task.go
+++ b/services/task.go
@@ -122,6 +122,7 @@ func GetTasks(query *TaskQuery) ([]*models.Task, int64, errors.Error) {
 }
 
 // GetTasksWithLastStatus returns task list of the pipeline, only the most recently tasks would be returned
+// TODO: adopt GetLatestTasksOfPipeline
 func GetTasksWithLastStatus(pipelineId uint64) ([]*models.Task, errors.Error) {
 	var tasks []*models.Task
 	err := db.All(&tasks, dal.Where("pipeline_id = ?", pipelineId), dal.Orderby("id DESC"))
diff --git a/test/api/task/task_test.go b/test/api/task/task_test.go
index 14e397ff6..ff74e4aa4 100644
--- a/test/api/task/task_test.go
+++ b/test/api/task/task_test.go
@@ -19,35 +19,20 @@ package task
 
 import (
 	"encoding/json"
-	"github.com/apache/incubator-devlake/errors"
 	"net/http"
 	"net/http/httptest"
 	"strings"
 	"testing"
 
+	"github.com/apache/incubator-devlake/errors"
+
 	"github.com/apache/incubator-devlake/api"
-	"github.com/apache/incubator-devlake/config"
 	"github.com/apache/incubator-devlake/models"
-	"github.com/apache/incubator-devlake/plugins/core"
-	"github.com/apache/incubator-devlake/services"
 	"github.com/gin-gonic/gin"
 	"github.com/magiconair/properties/assert"
-)
 
-func init() {
-	v := config.GetConfig()
-	encKey := v.GetString(core.EncodeKeyEnvStr)
-	if encKey == "" {
-		// Randomly generate a bunch of encryption keys and set them to config
-		encKey = core.RandomEncKey()
-		v.Set(core.EncodeKeyEnvStr, encKey)
-		err := config.WriteConfig(v)
-		if err != nil {
-			panic(err)
-		}
-	}
-	services.Init()
-}
+	_ "github.com/apache/incubator-devlake/test"
+)
 
 func TestNewTask(t *testing.T) {
 	r := gin.Default()
diff --git a/test/example/greetings.go b/test/example/greetings.go
deleted file mode 100644
index 3d4e41453..000000000
--- a/test/example/greetings.go
+++ /dev/null
@@ -1,27 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one or more
-contributor license agreements.  See the NOTICE file distributed with
-this work for additional information regarding copyright ownership.
-The ASF licenses this file to You under the Apache License, Version 2.0
-(the "License"); you may not use this file except in compliance with
-the License.  You may obtain a copy of the License at
-
-    http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing, software
-distributed under the License is distributed on an "AS IS" BASIS,
-WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-See the License for the specific language governing permissions and
-limitations under the License.
-*/
-
-package greetings
-
-import "fmt"
-
-// Hello returns a greeting for the named person.
-func Hello(name string) string {
-	// Return a greeting that embeds the name in a message.
-	message := fmt.Sprintf("Hi, %v. Welcome!", name)
-	return message
-}
diff --git a/test/example/greetings_test.go b/test/init.go
similarity index 57%
rename from test/example/greetings_test.go
rename to test/init.go
index e6ad59a52..c114fc1fa 100644
--- a/test/example/greetings_test.go
+++ b/test/init.go
@@ -15,22 +15,29 @@ See the License for the specific language governing permissions and
 limitations under the License.
 */
 
-// https://golang.org/doc/tutorial/add-a-test
-
-package greetings
+package test
 
 import (
-	"regexp"
-	"testing"
+	"github.com/apache/incubator-devlake/config"
+	"github.com/apache/incubator-devlake/plugins/core"
+	"github.com/apache/incubator-devlake/services"
 )
 
-// TestHelloName calls greetings.Hello with a name, checking
-// for a valid return value.
-func TestHelloName(t *testing.T) {
-	name := "Gladys"
-	want := regexp.MustCompile(`\b` + name + `\b`)
-	msg := Hello("Gladys")
-	if !want.MatchString(msg) {
-		t.Fatalf(`Hello("Gladys") = %q, %v`, msg, want)
+func init() {
+	v := config.GetConfig()
+	encKey := v.GetString(core.EncodeKeyEnvStr)
+	if encKey == "" {
+		// Randomly generate a bunch of encryption keys and set them to config
+		encKey = core.RandomEncKey()
+		v.Set(core.EncodeKeyEnvStr, encKey)
+		err := config.WriteConfig(v)
+		if err != nil {
+			panic(err)
+		}
+	}
+	services.InitResources()
+	err := services.GetMigrator().Execute()
+	if err != nil {
+		panic(err)
 	}
 }
diff --git a/test/services/pipeline_runner_e2e_test.go b/test/services/pipeline_runner_e2e_test.go
new file mode 100644
index 000000000..eff85467a
--- /dev/null
+++ b/test/services/pipeline_runner_e2e_test.go
@@ -0,0 +1,166 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package services
+
+import (
+	"testing"
+
+	"github.com/apache/incubator-devlake/models"
+	"github.com/apache/incubator-devlake/services"
+	_ "github.com/apache/incubator-devlake/test"
+	"github.com/stretchr/testify/assert"
+)
+
+func TestComputePipelineStatus(t *testing.T) {
+	db := services.GetBasicRes().GetDal()
+	// insert fake tasks into the database
+	pipeline := &models.DbPipeline{
+		TotalTasks: 3,
+	}
+	err := db.Create(pipeline)
+	assert.Nil(t, err)
+	assert.NotZero(t, pipeline.ID)
+
+	task_row1_col1 := &models.Task{
+		PipelineId:  pipeline.ID,
+		PipelineRow: 1,
+		PipelineCol: 1,
+		Plugin:      "github",
+		Status:      models.TASK_COMPLETED,
+	}
+	err = db.Create(task_row1_col1)
+	assert.Nil(t, err)
+	assert.NotZero(t, task_row1_col1.ID)
+
+	task_row1_col2 := &models.Task{
+		PipelineId:  pipeline.ID,
+		PipelineRow: 1,
+		PipelineCol: 2,
+		Plugin:      "gitext",
+		Status:      models.TASK_FAILED,
+	}
+	err = db.Create(task_row1_col2)
+	assert.Nil(t, err)
+	assert.NotZero(t, task_row1_col2.ID)
+
+	task_row2_col1 := &models.Task{
+		PipelineId:  pipeline.ID,
+		PipelineRow: 2,
+		PipelineCol: 1,
+		Plugin:      "refdiff",
+		Status:      models.TASK_CREATED,
+	}
+	err = db.Create(task_row2_col1)
+	assert.Nil(t, err)
+	assert.NotZero(t, task_row2_col1.ID)
+
+	// pipeline.status == "failed" if SkipOnFail=false and any task failed
+	status, err := services.ComputePipelineStatus(pipeline)
+	if !assert.Nil(t, err) {
+		println(err.Messages().Format())
+	}
+	assert.Equal(t, models.TASK_FAILED, status)
+
+	// pipeline.status == "completed" if all latest tasks were succeeded
+	task_row1_col2.Status = models.TASK_COMPLETED
+	err = db.Update(task_row1_col2)
+	assert.Nil(t, err)
+	task_row2_col1.Status = models.TASK_COMPLETED
+	err = db.Update(task_row2_col1)
+	assert.Nil(t, err)
+	status, err = services.ComputePipelineStatus(pipeline)
+	if !assert.Nil(t, err) {
+		println(err.Messages().Format())
+	}
+	assert.Equal(t, models.TASK_COMPLETED, status)
+
+	pipeline.SkipOnFail = true
+	err = db.Update(pipeline)
+	assert.Nil(t, err)
+	status, err = services.ComputePipelineStatus(pipeline)
+	assert.Nil(t, err)
+	assert.Equal(t, models.TASK_COMPLETED, status)
+
+	// pipeline.status == "partial" if SkipOnFail=true and some were succeeded while others not
+	task_row1_col1.Status = models.TASK_FAILED
+	err = db.Update(task_row1_col1)
+	assert.Nil(t, err)
+	status, err = services.ComputePipelineStatus(pipeline)
+	assert.Nil(t, err)
+	assert.Equal(t, models.TASK_PARTIAL, status)
+
+	// pipeline.status == "failed" if SkipOnFail=true and all tasks failed
+	task_row1_col1.Status = models.TASK_FAILED
+	err = db.Update(task_row1_col1)
+	assert.Nil(t, err)
+	task_row1_col2.Status = models.TASK_FAILED
+	err = db.Update(task_row1_col2)
+	assert.Nil(t, err)
+	task_row2_col1.Status = models.TASK_FAILED
+	err = db.Update(task_row2_col1)
+	assert.Nil(t, err)
+	status, err = services.ComputePipelineStatus(pipeline)
+	assert.Nil(t, err)
+	assert.Equal(t, models.TASK_FAILED, status)
+
+	// pipeline.status == "completed" if all failed tasks were reran successfully
+	task_row1_col1_rerun := &models.Task{
+		PipelineId:  pipeline.ID,
+		PipelineRow: 1,
+		PipelineCol: 1,
+		Plugin:      "github",
+		Status:      models.TASK_COMPLETED,
+	}
+	err = db.Create(task_row1_col1_rerun)
+	assert.Nil(t, err)
+	assert.NotZero(t, task_row1_col1_rerun.ID)
+
+	task_row1_col2_rerun := &models.Task{
+		PipelineId:  pipeline.ID,
+		PipelineRow: 1,
+		PipelineCol: 2,
+		Plugin:      "gitext",
+		Status:      models.TASK_COMPLETED,
+	}
+	err = db.Create(task_row1_col2_rerun)
+	assert.Nil(t, err)
+	assert.NotZero(t, task_row1_col2_rerun.ID)
+
+	task_row2_col1_rerun := &models.Task{
+		PipelineId:  pipeline.ID,
+		PipelineRow: 2,
+		PipelineCol: 1,
+		Plugin:      "refdiff",
+		Status:      models.TASK_COMPLETED,
+	}
+	err = db.Create(task_row2_col1_rerun)
+	assert.Nil(t, err)
+	assert.NotZero(t, task_row2_col1.ID)
+
+	status, err = services.ComputePipelineStatus(pipeline)
+	assert.Nil(t, err)
+	assert.Equal(t, models.TASK_COMPLETED, status)
+
+	// pipeline.status == "partial" if any of the rerun tasks failed
+	task_row1_col1_rerun.Status = models.TASK_CANCELLED
+	err = db.Update(task_row1_col1_rerun)
+	assert.Nil(t, err)
+	status, err = services.ComputePipelineStatus(pipeline)
+	assert.Nil(t, err)
+	assert.Equal(t, models.TASK_PARTIAL, status)
+}