You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eventmesh.apache.org by mi...@apache.org on 2022/10/08 07:08:36 UTC

[incubator-eventmesh] branch master updated: modify dal logic 1. add new constants 2. modify query task instance logic

This is an automated email from the ASF dual-hosted git repository.

mikexue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-eventmesh.git


The following commit(s) were added to refs/heads/master by this push:
     new abbe8079 modify dal logic 1. add new constants 2. modify query task instance logic
     new 5586b564 Merge pull request #1481 from walterlife/modify-eventmesh-workflow-dal
abbe8079 is described below

commit abbe807990e5699b92080b6e9d6c9a65fd92b1ac
Author: walterlife <wa...@gmail.com>
AuthorDate: Sat Oct 1 22:06:24 2022 +0800

    modify dal logic
    1. add new constants
    2. modify query task instance logic
---
 .../internal/constants/constants.go                |  18 +++-
 .../internal/dal/model/workflow_task.go            |  13 +--
 eventmesh-workflow-go/internal/dal/workflow.go     | 107 +++++++++------------
 3 files changed, 67 insertions(+), 71 deletions(-)

diff --git a/eventmesh-workflow-go/internal/constants/constants.go b/eventmesh-workflow-go/internal/constants/constants.go
index 61bb474b..3c014e31 100644
--- a/eventmesh-workflow-go/internal/constants/constants.go
+++ b/eventmesh-workflow-go/internal/constants/constants.go
@@ -31,10 +31,11 @@ const (
 
 // task instance status
 const (
-	TaskInstanceWaitStatus    = 1
-	TaskInstanceProcessStatus = 2
-	TaskInstanceSuccessStatus = 3
-	TaskInstanceFailStatus    = 4
+	TaskInstanceSleepStatus   = 1
+	TaskInstanceWaitStatus    = 2
+	TaskInstanceProcessStatus = 3
+	TaskInstanceSuccessStatus = 4
+	TaskInstanceFailStatus    = 5
 )
 
 // workflow instance status
@@ -56,5 +57,12 @@ const (
 )
 
 const (
-	OrderDesc = "DESC"
+	// RetryAttempts fail retry max times
+	RetryAttempts = 5
+)
+
+const (
+	EventTypePublish                 = "publish"
+	EventPropsWorkflowInstanceID     = "workflow_instance_id"
+	EventPropsWorkflowTaskInstanceID = "workflow_task_instance_id"
 )
diff --git a/eventmesh-workflow-go/internal/dal/model/workflow_task.go b/eventmesh-workflow-go/internal/dal/model/workflow_task.go
index 2114398e..bc77d177 100644
--- a/eventmesh-workflow-go/internal/dal/model/workflow_task.go
+++ b/eventmesh-workflow-go/internal/dal/model/workflow_task.go
@@ -27,9 +27,9 @@ type WorkflowTask struct {
 	CreateTime         time.Time               `json:"create_time"`
 	UpdateTime         time.Time               `json:"update_time"`
 	Actions            []*WorkflowTaskAction   `json:"-" gorm:"-"`
-	Relations          []*WorkflowTaskRelation `json:"-" gorm:"-"`
 	TaskIDs            []string                `json:"-" gorm:"-"`
 	WorkflowInstanceID string                  `json:"-" gorm:"-"`
+	ChildTasks         []*WorkflowTaskRelation `json:"-" gorm:"-"`
 }
 
 func (w WorkflowTask) TableName() string {
@@ -72,14 +72,15 @@ type WorkflowTaskInstance struct {
 	WorkflowInstanceID string        `json:"workflow_instance_id" gorm:"column:workflow_instance_id;type:varchar;size:1024"`
 	WorkflowID         string        `json:"workflow_id" gorm:"column:workflow_id;type:varchar;size:1024"`
 	TaskID             string        `json:"task_id" gorm:"column:task_id;type:varchar;size:1024"`
-	TaskInstanceId     string        `json:"task_instance_id" gorm:"column:task_instance_id;type:varchar;size:1024"`
+	TaskInstanceID     string        `json:"task_instance_id" gorm:"column:task_instance_id;type:varchar;size:1024"`
 	Status             int           `json:"status" gorm:"column:status;type:int"`
 	Input              string        `json:"input" gorm:"column:input;type:text;"`
 	RetryTimes         int           `json:"retry_times" gorm:"column:retry_times;type:int"`
-	CreateTime         time.Time     `json:"create_time"`
-	UpdateTime         time.Time     `json:"update_time"`
-	Task               *WorkflowTask `gorm:"-"`
-	Order              string        `gorm:"-"`
+	CreateTime         time.Time     `json:"create_time" gorm:"column:create_time"`
+	UpdateTime         time.Time     `json:"update_time" gorm:"column:update_time"`
+	Task               *WorkflowTask `json:"task" gorm:"-"`
+	Order              string        `json:"order" gorm:"-"`
+	IsStart            bool          `json:"is_start" gorm:"-"`
 }
 
 func (w WorkflowTaskInstance) TableName() string {
diff --git a/eventmesh-workflow-go/internal/dal/workflow.go b/eventmesh-workflow-go/internal/dal/workflow.go
index 2efb9fab..1622973b 100644
--- a/eventmesh-workflow-go/internal/dal/workflow.go
+++ b/eventmesh-workflow-go/internal/dal/workflow.go
@@ -33,13 +33,12 @@ import (
 type WorkflowDAL interface {
 	Select(ctx context.Context, workflowID string) (*model.Workflow, error)
 	SelectStartTask(ctx context.Context, condition model.WorkflowTask) (*model.WorkflowTask, error)
-	SelectTransitionTask(ctx context.Context, condition model.WorkflowTask) ([]*model.WorkflowTask, error)
-	SelectChildTask(ctx context.Context, condition model.WorkflowTask) ([]*model.WorkflowTask, error)
+	SelectTransitionTask(ctx context.Context, condition model.WorkflowTaskInstance) (*model.WorkflowTaskInstance, error)
 	SelectTaskInstance(ctx context.Context, condition model.WorkflowTaskInstance) (*model.WorkflowTaskInstance, error)
 	Insert(ctx context.Context, record *model.Workflow) error
 	InsertInstance(ctx context.Context, record *model.WorkflowInstance) error
 	InsertTaskInstance(ctx context.Context, record *model.WorkflowTaskInstance) error
-	UpdateTaskInstance(ctx context.Context, record *model.WorkflowTaskInstance) error
+	UpdateTaskInstance(tx *gorm.DB, record *model.WorkflowTaskInstance) error
 }
 
 func NewWorkflowDAL() WorkflowDAL {
@@ -77,71 +76,52 @@ func (w *workflowDALImpl) SelectStartTask(ctx context.Context, condition model.W
 	return &model.WorkflowTask{TaskID: r.ToTaskID, WorkflowID: condition.WorkflowID}, nil
 }
 
-func (w *workflowDALImpl) SelectTransitionTask(ctx context.Context, condition model.WorkflowTask) (
-	[]*model.WorkflowTask, error) {
-	var c = model.WorkflowTaskInstance{WorkflowInstanceID: condition.WorkflowInstanceID,
-		Status: constants.TaskInstanceSuccessStatus}
+func (w *workflowDALImpl) SelectTransitionTask(ctx context.Context, condition model.WorkflowTaskInstance) (
+	*model.WorkflowTaskInstance, error) {
 	var r model.WorkflowTaskInstance
-	if err := workflowDB.WithContext(ctx).Where(&c).Order("update_time DESC").
-		First(&r).Error; err != nil {
+	if err := workflowDB.WithContext(ctx).Where(&condition).First(&r).Error; err != nil {
 		if err == gorm.ErrRecordNotFound {
 			return nil, nil
 		}
 		return nil, err
 	}
-	return w.SelectChildTask(ctx, model.WorkflowTask{WorkflowID: condition.WorkflowID, TaskID: r.TaskID})
+	return &r, nil
 }
 
-func (w *workflowDALImpl) SelectChildTask(ctx context.Context, condition model.WorkflowTask) (
-	[]*model.WorkflowTask, error) {
-	var c = model.WorkflowTaskRelation{FromTaskID: condition.TaskID, WorkflowID: condition.WorkflowID,
-		Status: constants.TaskNormalStatus}
-	var rel []*model.WorkflowTaskRelation
-	if err := workflowDB.WithContext(ctx).Where(&c).Find(&rel).Error; err != nil {
+func (w *workflowDALImpl) SelectTaskInstance(ctx context.Context, condition model.WorkflowTaskInstance) (*model.
+	WorkflowTaskInstance, error) {
+	var r model.WorkflowTaskInstance
+	if err := workflowDB.WithContext(ctx).Where(&condition).Order("create_time desc").
+		First(&r).Error; err != nil {
+		if err == gorm.ErrRecordNotFound {
+			return nil, nil
+		}
 		return nil, err
 	}
-	if len(rel) == 0 {
-		return nil, nil
-	}
-	var taskIDs []string
-	for _, r := range rel {
-		taskIDs = append(taskIDs, r.ToTaskID)
-	}
-
 	var handlers []func() error
 	var err error
 	var tasks []*model.WorkflowTask
+	var childTasks []*model.WorkflowTaskRelation
 	var taskActions []*model.WorkflowTaskAction
 	handlers = append(handlers, func() error {
-		tasks, err = w.selectTask(context.Background(), condition.WorkflowID, taskIDs)
-		if err != nil {
-			return err
-		}
-		return nil
+		tasks, err = w.selectTask(context.Background(), condition.WorkflowID, []string{r.TaskID})
+		return err
 	})
 	handlers = append(handlers, func() error {
-		taskActions, err = w.selectTaskAction(context.Background(), condition.WorkflowID, taskIDs)
+		childTasks, err = w.selectTaskRelation(context.Background(), condition.WorkflowID, condition.TaskID)
+		return err
+	})
+	handlers = append(handlers, func() error {
+		taskActions, err = w.selectTaskAction(context.Background(), condition.WorkflowID, []string{r.TaskID})
 		if err != nil {
 			return err
 		}
 		return nil
 	})
-	if err := util.GoAndWait(handlers...); err != nil {
-		return nil, err
-	}
-	return w.completeTask(tasks, taskActions), nil
-}
-
-func (w *workflowDALImpl) SelectTaskInstance(ctx context.Context, condition model.WorkflowTaskInstance) (*model.
-	WorkflowTaskInstance, error) {
-	var r model.WorkflowTaskInstance
-	if err := workflowDB.WithContext(ctx).Where(&condition).First(&r).Error; err != nil {
-		if err == gorm.ErrRecordNotFound {
-			return nil, nil
-		}
+	if err = util.GoAndWait(handlers...); err != nil {
 		return nil, err
 	}
-	return &r, nil
+	return w.completeTaskInstance(r, tasks, childTasks, taskActions)
 }
 
 func (w *workflowDALImpl) Insert(ctx context.Context, record *model.Workflow) error {
@@ -191,9 +171,9 @@ func (w *workflowDALImpl) InsertTaskInstance(ctx context.Context,
 	return workflowDB.WithContext(ctx).Create(&record).Error
 }
 
-func (w *workflowDALImpl) UpdateTaskInstance(ctx context.Context, record *model.WorkflowTaskInstance) error {
+func (w *workflowDALImpl) UpdateTaskInstance(tx *gorm.DB, record *model.WorkflowTaskInstance) error {
 	var condition = model.WorkflowInstance{ID: record.ID}
-	return workflowDB.WithContext(ctx).Where(&condition).Updates(&record).Error
+	return tx.Where(&condition).Updates(&record).Error
 }
 
 func (w *workflowDALImpl) buildTask(workflow *pmodel.Workflow) []*model.WorkflowTask {
@@ -404,20 +384,27 @@ func (w *workflowDALImpl) selectTaskAction(ctx context.Context,
 	return r, nil
 }
 
-func (w *workflowDALImpl) completeTask(tasks []*model.WorkflowTask,
-	taskActions []*model.WorkflowTaskAction) []*model.WorkflowTask {
-	var r []*model.WorkflowTask
-	var t = make(map[string][]*model.WorkflowTaskAction)
-	for _, action := range taskActions {
-		t[action.TaskID] = append(t[action.TaskID], action)
+func (w *workflowDALImpl) selectTaskRelation(ctx context.Context, workflowID string, taskID string) (
+	[]*model.WorkflowTaskRelation, error) {
+	var relations []*model.WorkflowTaskRelation
+	var c = model.WorkflowTaskRelation{FromTaskID: taskID, WorkflowID: workflowID, Status: constants.TaskNormalStatus}
+	if err := workflowDB.WithContext(ctx).Where(&c).Find(&relations).Error; err != nil {
+		return nil, err
 	}
-	for _, task := range tasks {
-		var wt model.WorkflowTask
-		if err := gconv.Struct(task, &wt); err != nil {
-			continue
-		}
-		wt.Actions = t[task.TaskID]
-		r = append(r, &wt)
+	return relations, nil
+}
+
+func (w *workflowDALImpl) completeTaskInstance(instance model.WorkflowTaskInstance, tasks []*model.WorkflowTask,
+	childTasks []*model.WorkflowTaskRelation, taskActions []*model.WorkflowTaskAction) (*model.WorkflowTaskInstance, error) {
+	if len(tasks) == 0 {
+		return nil, nil
 	}
-	return r
+	var r model.WorkflowTaskInstance
+	if err := gconv.Struct(instance, &r); err != nil {
+		return nil, err
+	}
+	r.Task = tasks[0]
+	r.Task.ChildTasks = childTasks
+	r.Task.Actions = taskActions
+	return &r, nil
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@eventmesh.apache.org
For additional commands, e-mail: commits-help@eventmesh.apache.org