You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eventmesh.apache.org by mi...@apache.org on 2022/10/08 06:59:23 UTC
[incubator-eventmesh] branch master updated: modify workflow engine logic
This is an automated email from the ASF dual-hosted git repository.
mikexue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-eventmesh.git
The following commit(s) were added to refs/heads/master by this push:
new c7561250 modify workflow engine logic
new 685377a0 Merge pull request #1475 from walterlife/modify-eventmesh-workflow-flow
c7561250 is described below
commit c75612500c8f8b0a7602b0a8fd0a54d1fd981f63
Author: walterlife <wa...@gmail.com>
AuthorDate: Sat Oct 1 21:55:05 2022 +0800
modify workflow engine logic
---
eventmesh-workflow-go/flow/engine.go | 24 +++++++++++-------------
eventmesh-workflow-go/flow/model.go | 14 +++++++++++---
2 files changed, 22 insertions(+), 16 deletions(-)
diff --git a/eventmesh-workflow-go/flow/engine.go b/eventmesh-workflow-go/flow/engine.go
index 6191df55..713186ef 100644
--- a/eventmesh-workflow-go/flow/engine.go
+++ b/eventmesh-workflow-go/flow/engine.go
@@ -42,7 +42,7 @@ func (e *Engine) Validate(ctx context.Context, instanceID string) error {
return nil
}
-// Start ...
+// Start starts a workflow instance by queuing its start task and returns the workflow instance ID.
func (e *Engine) Start(ctx context.Context, param *WorkflowParam) (string, error) {
r, err := e.workflowDAL.SelectStartTask(ctx, model.WorkflowTask{WorkflowID: param.ID})
if err != nil {
@@ -59,24 +59,22 @@ func (e *Engine) Start(ctx context.Context, param *WorkflowParam) (string, error
return "", err
}
var w = model.WorkflowTaskInstance{WorkflowInstanceID: workflowInstanceID, WorkflowID: param.ID,
- TaskID: r.TaskID, TaskInstanceId: uuid.New().String(), Status: constants.TaskInstanceWaitStatus,
- Input: param.Input}
+ TaskID: r.TaskID, TaskInstanceID: uuid.New().String(), Status: constants.TaskInstanceWaitStatus,
+ Input: param.Input, IsStart: true}
return workflowInstanceID, e.queue.Publish([]*model.WorkflowTaskInstance{&w})
}
-// Transition ...
+// Transition queues the next task instance of the workflow, if one is found.
func (e *Engine) Transition(ctx context.Context, param *WorkflowParam) error {
- r, err := e.workflowDAL.SelectTransitionTask(ctx, model.WorkflowTask{WorkflowID: param.ID,
- WorkflowInstanceID: param.InstanceID})
+ r, err := e.workflowDAL.SelectTransitionTask(ctx, model.WorkflowTaskInstance{WorkflowID: param.ID,
+ WorkflowInstanceID: param.InstanceID, TaskInstanceID: param.TaskInstanceID,
+ Status: constants.TaskInstanceSleepStatus})
if err != nil {
return err
}
- var taskInstances []*model.WorkflowTaskInstance
- for _, task := range r {
- var taskInstance = model.WorkflowTaskInstance{WorkflowInstanceID: param.InstanceID, WorkflowID: param.ID,
- TaskID: task.TaskID, TaskInstanceId: uuid.New().String(), Status: constants.TaskInstanceWaitStatus,
- Input: param.Input}
- taskInstances = append(taskInstances, &taskInstance)
+ if r == nil {
+ return nil
}
- return e.queue.Publish(taskInstances)
+ r.Status = constants.TaskInstanceWaitStatus
+ return e.queue.Publish([]*model.WorkflowTaskInstance{r})
}
diff --git a/eventmesh-workflow-go/flow/model.go b/eventmesh-workflow-go/flow/model.go
index dab09245..afb740f9 100644
--- a/eventmesh-workflow-go/flow/model.go
+++ b/eventmesh-workflow-go/flow/model.go
@@ -15,8 +15,16 @@
package flow
+// WorkflowParam holds the runtime parameters of a workflow.
type WorkflowParam struct {
- ID string `json:"id"`
- InstanceID string `json:"instance_id"`
- Input string `json:"input"`
+ ID string `json:"id"`
+ InstanceID string `json:"instance_id"`
+ TaskInstanceID string `json:"task_instance_id"`
+ Input string `json:"input"`
+}
+
+type WorkflowEventCatalog struct {
+ OperationID string `json:"operation_id"`
+ Topic string `json:"topic"`
+ Schema string `json:"schema"`
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@eventmesh.apache.org
For additional commands, e-mail: commits-help@eventmesh.apache.org