You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eventmesh.apache.org by mi...@apache.org on 2022/10/08 07:08:36 UTC
[incubator-eventmesh] branch master updated: modify dal logic 1. add new constants 2. modify query task instance logic
This is an automated email from the ASF dual-hosted git repository.
mikexue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-eventmesh.git
The following commit(s) were added to refs/heads/master by this push:
new abbe8079 modify dal logic 1. add new constants 2. modify query task instance logic
new 5586b564 Merge pull request #1481 from walterlife/modify-eventmesh-workflow-dal
abbe8079 is described below
commit abbe807990e5699b92080b6e9d6c9a65fd92b1ac
Author: walterlife <wa...@gmail.com>
AuthorDate: Sat Oct 1 22:06:24 2022 +0800
modify dal logic
1. add new constants
2. modify query task instance logic
---
.../internal/constants/constants.go | 18 +++-
.../internal/dal/model/workflow_task.go | 13 +--
eventmesh-workflow-go/internal/dal/workflow.go | 107 +++++++++------------
3 files changed, 67 insertions(+), 71 deletions(-)
diff --git a/eventmesh-workflow-go/internal/constants/constants.go b/eventmesh-workflow-go/internal/constants/constants.go
index 61bb474b..3c014e31 100644
--- a/eventmesh-workflow-go/internal/constants/constants.go
+++ b/eventmesh-workflow-go/internal/constants/constants.go
@@ -31,10 +31,11 @@ const (
// task instance status
const (
- TaskInstanceWaitStatus = 1
- TaskInstanceProcessStatus = 2
- TaskInstanceSuccessStatus = 3
- TaskInstanceFailStatus = 4
+ TaskInstanceSleepStatus = 1
+ TaskInstanceWaitStatus = 2
+ TaskInstanceProcessStatus = 3
+ TaskInstanceSuccessStatus = 4
+ TaskInstanceFailStatus = 5
)
// workflow instance status
@@ -56,5 +57,12 @@ const (
)
const (
- OrderDesc = "DESC"
+ // RetryAttempts fail retry max times
+ RetryAttempts = 5
+)
+
+const (
+ EventTypePublish = "publish"
+ EventPropsWorkflowInstanceID = "workflow_instance_id"
+ EventPropsWorkflowTaskInstanceID = "workflow_task_instance_id"
)
diff --git a/eventmesh-workflow-go/internal/dal/model/workflow_task.go b/eventmesh-workflow-go/internal/dal/model/workflow_task.go
index 2114398e..bc77d177 100644
--- a/eventmesh-workflow-go/internal/dal/model/workflow_task.go
+++ b/eventmesh-workflow-go/internal/dal/model/workflow_task.go
@@ -27,9 +27,9 @@ type WorkflowTask struct {
CreateTime time.Time `json:"create_time"`
UpdateTime time.Time `json:"update_time"`
Actions []*WorkflowTaskAction `json:"-" gorm:"-"`
- Relations []*WorkflowTaskRelation `json:"-" gorm:"-"`
TaskIDs []string `json:"-" gorm:"-"`
WorkflowInstanceID string `json:"-" gorm:"-"`
+ ChildTasks []*WorkflowTaskRelation `json:"-" gorm:"-"`
}
func (w WorkflowTask) TableName() string {
@@ -72,14 +72,15 @@ type WorkflowTaskInstance struct {
WorkflowInstanceID string `json:"workflow_instance_id" gorm:"column:workflow_instance_id;type:varchar;size:1024"`
WorkflowID string `json:"workflow_id" gorm:"column:workflow_id;type:varchar;size:1024"`
TaskID string `json:"task_id" gorm:"column:task_id;type:varchar;size:1024"`
- TaskInstanceId string `json:"task_instance_id" gorm:"column:task_instance_id;type:varchar;size:1024"`
+ TaskInstanceID string `json:"task_instance_id" gorm:"column:task_instance_id;type:varchar;size:1024"`
Status int `json:"status" gorm:"column:status;type:int"`
Input string `json:"input" gorm:"column:input;type:text;"`
RetryTimes int `json:"retry_times" gorm:"column:retry_times;type:int"`
- CreateTime time.Time `json:"create_time"`
- UpdateTime time.Time `json:"update_time"`
- Task *WorkflowTask `gorm:"-"`
- Order string `gorm:"-"`
+ CreateTime time.Time `json:"create_time" gorm:"column:create_time"`
+ UpdateTime time.Time `json:"update_time" gorm:"column:update_time"`
+ Task *WorkflowTask `json:"task" gorm:"-"`
+ Order string `json:"order" gorm:"-"`
+ IsStart bool `json:"is_start" gorm:"-"`
}
func (w WorkflowTaskInstance) TableName() string {
diff --git a/eventmesh-workflow-go/internal/dal/workflow.go b/eventmesh-workflow-go/internal/dal/workflow.go
index 2efb9fab..1622973b 100644
--- a/eventmesh-workflow-go/internal/dal/workflow.go
+++ b/eventmesh-workflow-go/internal/dal/workflow.go
@@ -33,13 +33,12 @@ import (
type WorkflowDAL interface {
Select(ctx context.Context, workflowID string) (*model.Workflow, error)
SelectStartTask(ctx context.Context, condition model.WorkflowTask) (*model.WorkflowTask, error)
- SelectTransitionTask(ctx context.Context, condition model.WorkflowTask) ([]*model.WorkflowTask, error)
- SelectChildTask(ctx context.Context, condition model.WorkflowTask) ([]*model.WorkflowTask, error)
+ SelectTransitionTask(ctx context.Context, condition model.WorkflowTaskInstance) (*model.WorkflowTaskInstance, error)
SelectTaskInstance(ctx context.Context, condition model.WorkflowTaskInstance) (*model.WorkflowTaskInstance, error)
Insert(ctx context.Context, record *model.Workflow) error
InsertInstance(ctx context.Context, record *model.WorkflowInstance) error
InsertTaskInstance(ctx context.Context, record *model.WorkflowTaskInstance) error
- UpdateTaskInstance(ctx context.Context, record *model.WorkflowTaskInstance) error
+ UpdateTaskInstance(tx *gorm.DB, record *model.WorkflowTaskInstance) error
}
func NewWorkflowDAL() WorkflowDAL {
@@ -77,71 +76,52 @@ func (w *workflowDALImpl) SelectStartTask(ctx context.Context, condition model.W
return &model.WorkflowTask{TaskID: r.ToTaskID, WorkflowID: condition.WorkflowID}, nil
}
-func (w *workflowDALImpl) SelectTransitionTask(ctx context.Context, condition model.WorkflowTask) (
- []*model.WorkflowTask, error) {
- var c = model.WorkflowTaskInstance{WorkflowInstanceID: condition.WorkflowInstanceID,
- Status: constants.TaskInstanceSuccessStatus}
+func (w *workflowDALImpl) SelectTransitionTask(ctx context.Context, condition model.WorkflowTaskInstance) (
+ *model.WorkflowTaskInstance, error) {
var r model.WorkflowTaskInstance
- if err := workflowDB.WithContext(ctx).Where(&c).Order("update_time DESC").
- First(&r).Error; err != nil {
+ if err := workflowDB.WithContext(ctx).Where(&condition).First(&r).Error; err != nil {
if err == gorm.ErrRecordNotFound {
return nil, nil
}
return nil, err
}
- return w.SelectChildTask(ctx, model.WorkflowTask{WorkflowID: condition.WorkflowID, TaskID: r.TaskID})
+ return &r, nil
}
-func (w *workflowDALImpl) SelectChildTask(ctx context.Context, condition model.WorkflowTask) (
- []*model.WorkflowTask, error) {
- var c = model.WorkflowTaskRelation{FromTaskID: condition.TaskID, WorkflowID: condition.WorkflowID,
- Status: constants.TaskNormalStatus}
- var rel []*model.WorkflowTaskRelation
- if err := workflowDB.WithContext(ctx).Where(&c).Find(&rel).Error; err != nil {
+func (w *workflowDALImpl) SelectTaskInstance(ctx context.Context, condition model.WorkflowTaskInstance) (*model.
+ WorkflowTaskInstance, error) {
+ var r model.WorkflowTaskInstance
+ if err := workflowDB.WithContext(ctx).Where(&condition).Order("create_time desc").
+ First(&r).Error; err != nil {
+ if err == gorm.ErrRecordNotFound {
+ return nil, nil
+ }
return nil, err
}
- if len(rel) == 0 {
- return nil, nil
- }
- var taskIDs []string
- for _, r := range rel {
- taskIDs = append(taskIDs, r.ToTaskID)
- }
-
var handlers []func() error
var err error
var tasks []*model.WorkflowTask
+ var childTasks []*model.WorkflowTaskRelation
var taskActions []*model.WorkflowTaskAction
handlers = append(handlers, func() error {
- tasks, err = w.selectTask(context.Background(), condition.WorkflowID, taskIDs)
- if err != nil {
- return err
- }
- return nil
+ tasks, err = w.selectTask(context.Background(), condition.WorkflowID, []string{r.TaskID})
+ return err
})
handlers = append(handlers, func() error {
- taskActions, err = w.selectTaskAction(context.Background(), condition.WorkflowID, taskIDs)
+ childTasks, err = w.selectTaskRelation(context.Background(), condition.WorkflowID, condition.TaskID)
+ return err
+ })
+ handlers = append(handlers, func() error {
+ taskActions, err = w.selectTaskAction(context.Background(), condition.WorkflowID, []string{r.TaskID})
if err != nil {
return err
}
return nil
})
- if err := util.GoAndWait(handlers...); err != nil {
- return nil, err
- }
- return w.completeTask(tasks, taskActions), nil
-}
-
-func (w *workflowDALImpl) SelectTaskInstance(ctx context.Context, condition model.WorkflowTaskInstance) (*model.
- WorkflowTaskInstance, error) {
- var r model.WorkflowTaskInstance
- if err := workflowDB.WithContext(ctx).Where(&condition).First(&r).Error; err != nil {
- if err == gorm.ErrRecordNotFound {
- return nil, nil
- }
+ if err = util.GoAndWait(handlers...); err != nil {
return nil, err
}
- return &r, nil
+ return w.completeTaskInstance(r, tasks, childTasks, taskActions)
}
func (w *workflowDALImpl) Insert(ctx context.Context, record *model.Workflow) error {
@@ -191,9 +171,9 @@ func (w *workflowDALImpl) InsertTaskInstance(ctx context.Context,
return workflowDB.WithContext(ctx).Create(&record).Error
}
-func (w *workflowDALImpl) UpdateTaskInstance(ctx context.Context, record *model.WorkflowTaskInstance) error {
+func (w *workflowDALImpl) UpdateTaskInstance(tx *gorm.DB, record *model.WorkflowTaskInstance) error {
var condition = model.WorkflowInstance{ID: record.ID}
- return workflowDB.WithContext(ctx).Where(&condition).Updates(&record).Error
+ return tx.Where(&condition).Updates(&record).Error
}
func (w *workflowDALImpl) buildTask(workflow *pmodel.Workflow) []*model.WorkflowTask {
@@ -404,20 +384,27 @@ func (w *workflowDALImpl) selectTaskAction(ctx context.Context,
return r, nil
}
-func (w *workflowDALImpl) completeTask(tasks []*model.WorkflowTask,
- taskActions []*model.WorkflowTaskAction) []*model.WorkflowTask {
- var r []*model.WorkflowTask
- var t = make(map[string][]*model.WorkflowTaskAction)
- for _, action := range taskActions {
- t[action.TaskID] = append(t[action.TaskID], action)
+func (w *workflowDALImpl) selectTaskRelation(ctx context.Context, workflowID string, taskID string) (
+ []*model.WorkflowTaskRelation, error) {
+ var relations []*model.WorkflowTaskRelation
+ var c = model.WorkflowTaskRelation{FromTaskID: taskID, WorkflowID: workflowID, Status: constants.TaskNormalStatus}
+ if err := workflowDB.WithContext(ctx).Where(&c).Find(&relations).Error; err != nil {
+ return nil, err
}
- for _, task := range tasks {
- var wt model.WorkflowTask
- if err := gconv.Struct(task, &wt); err != nil {
- continue
- }
- wt.Actions = t[task.TaskID]
- r = append(r, &wt)
+ return relations, nil
+}
+
+func (w *workflowDALImpl) completeTaskInstance(instance model.WorkflowTaskInstance, tasks []*model.WorkflowTask,
+ childTasks []*model.WorkflowTaskRelation, taskActions []*model.WorkflowTaskAction) (*model.WorkflowTaskInstance, error) {
+ if len(tasks) == 0 {
+ return nil, nil
}
- return r
+ var r model.WorkflowTaskInstance
+ if err := gconv.Struct(instance, &r); err != nil {
+ return nil, err
+ }
+ r.Task = tasks[0]
+ r.Task.ChildTasks = childTasks
+ r.Task.Actions = taskActions
+ return &r, nil
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@eventmesh.apache.org
For additional commands, e-mail: commits-help@eventmesh.apache.org