You are viewing a plain text version of this content. The canonical link (a hyperlink in the original page) is not preserved in this plain text copy.
Posted to commits@eventmesh.apache.org by mi...@apache.org on 2022/10/27 02:32:44 UTC
[incubator-eventmesh] branch master updated: fix task publish em message error
This is an automated email from the ASF dual-hosted git repository.
mikexue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-eventmesh.git
The following commit(s) were added to refs/heads/master by this push:
new 74f8bf94 fix task publish em message error
new fcd99788 Merge pull request #1936 from walterlife/optimize-task-logic
74f8bf94 is described below
commit 74f8bf94d659a7a059b181bbef1cc768e90852e3
Author: yuweizhu <yu...@tencent.com>
AuthorDate: Wed Oct 26 23:34:35 2022 +0800
fix task publish em message error
---
.../internal/constants/constants.go | 4 +--
.../internal/protocol/meshmessage.go | 6 +++-
.../internal/task/operation_task.go | 16 ++++++++++-
eventmesh-workflow-go/internal/task/switch_task.go | 32 ++++++++++++++++------
eventmesh-workflow-go/internal/task/task.go | 16 ++++++++---
5 files changed, 57 insertions(+), 17 deletions(-)
diff --git a/eventmesh-workflow-go/internal/constants/constants.go b/eventmesh-workflow-go/internal/constants/constants.go
index 3c014e31..a6d22135 100644
--- a/eventmesh-workflow-go/internal/constants/constants.go
+++ b/eventmesh-workflow-go/internal/constants/constants.go
@@ -63,6 +63,6 @@ const (
const (
EventTypePublish = "publish"
- EventPropsWorkflowInstanceID = "workflow_instance_id"
- EventPropsWorkflowTaskInstanceID = "workflow_task_instance_id"
+ EventPropsWorkflowInstanceID = "workflowinstanceid"
+ EventPropsWorkflowTaskInstanceID = "workflowtaskinstanceid"
)
diff --git a/eventmesh-workflow-go/internal/protocol/meshmessage.go b/eventmesh-workflow-go/internal/protocol/meshmessage.go
index 90f95652..c8af9d52 100644
--- a/eventmesh-workflow-go/internal/protocol/meshmessage.go
+++ b/eventmesh-workflow-go/internal/protocol/meshmessage.go
@@ -17,6 +17,7 @@ package protocol
import (
"context"
+ "fmt"
pgrpc "github.com/apache/incubator-eventmesh/eventmesh-sdk-go/grpc"
"github.com/apache/incubator-eventmesh/eventmesh-sdk-go/grpc/conf"
eventmesh "github.com/apache/incubator-eventmesh/eventmesh-sdk-go/grpc/proto"
@@ -65,10 +66,13 @@ func (m *MeshMessage) Publish(ctx context.Context, topic string, content string,
SeqNum: uuid.New().String(),
Properties: properties,
}
- resp, err := client.Publish(ctx, message)
+ resp, err := client.Publish(context.Background(), message)
if err != nil {
return err
}
log.Get(constants.LogSchedule).Debugf("publish event result: %v", resp.String())
+ if resp.RespCode != "0" {
+ return fmt.Errorf("eventmesh publish message error: [code]%v[msg]%v", resp.RespCode, resp.RespMsg)
+ }
return nil
}
diff --git a/eventmesh-workflow-go/internal/task/operation_task.go b/eventmesh-workflow-go/internal/task/operation_task.go
index b658d307..1e58921c 100644
--- a/eventmesh-workflow-go/internal/task/operation_task.go
+++ b/eventmesh-workflow-go/internal/task/operation_task.go
@@ -16,8 +16,10 @@
package task
import (
+ "context"
"github.com/apache/incubator-eventmesh/eventmesh-server-go/config"
"github.com/apache/incubator-eventmesh/eventmesh-workflow-go/internal/constants"
+ "github.com/apache/incubator-eventmesh/eventmesh-workflow-go/internal/dal"
"github.com/apache/incubator-eventmesh/eventmesh-workflow-go/internal/dal/model"
"github.com/apache/incubator-eventmesh/eventmesh-workflow-go/internal/queue"
"github.com/google/uuid"
@@ -39,6 +41,7 @@ func NewOperationTask(instance *model.WorkflowTaskInstance) Task {
t.action = instance.Task.Actions[0]
t.transition = instance.Task.ChildTasks[0]
t.baseTask.queue = queue.GetQueue(config.GlobalConfig().Flow.Queue.Store)
+ t.workflowDAL = dal.NewWorkflowDAL()
return &t
}
@@ -46,6 +49,17 @@ func (t *operationTask) Run() error {
if t.action == nil {
return nil
}
+ // match end
+ if t.transition.ToTaskID == constants.TaskEndID {
+ if t.action != nil {
+ if err := publishEvent(t.workflowInstanceID, uuid.New().String(), t.action.OperationName, t.input); err != nil {
+ return err
+ }
+ }
+ return t.workflowDAL.UpdateInstance(context.Background(),
+ &model.WorkflowInstance{WorkflowInstanceID: t.workflowInstanceID,
+ WorkflowStatus: constants.WorkflowInstanceSuccessStatus})
+ }
var taskInstanceID = uuid.New().String()
var taskInstance = model.WorkflowTaskInstance{WorkflowInstanceID: t.workflowInstanceID, WorkflowID: t.workflowID,
TaskID: t.transition.ToTaskID, TaskInstanceID: taskInstanceID, Status: constants.TaskInstanceSleepStatus,
@@ -53,5 +67,5 @@ func (t *operationTask) Run() error {
if err := t.baseTask.queue.Publish([]*model.WorkflowTaskInstance{&taskInstance}); err != nil {
return err
}
- return publishEvent(t.workflowInstanceID, taskInstanceID, t.input, t.action.OperationName)
+ return publishEvent(t.workflowInstanceID, taskInstanceID, t.action.OperationName, t.input)
}
diff --git a/eventmesh-workflow-go/internal/task/switch_task.go b/eventmesh-workflow-go/internal/task/switch_task.go
index b39aeb24..cc39d113 100644
--- a/eventmesh-workflow-go/internal/task/switch_task.go
+++ b/eventmesh-workflow-go/internal/task/switch_task.go
@@ -16,12 +16,17 @@
package task
import (
+ "context"
+ "encoding/json"
"github.com/apache/incubator-eventmesh/eventmesh-server-go/config"
"github.com/apache/incubator-eventmesh/eventmesh-workflow-go/internal/constants"
+ "github.com/apache/incubator-eventmesh/eventmesh-workflow-go/internal/dal"
"github.com/apache/incubator-eventmesh/eventmesh-workflow-go/internal/dal/model"
"github.com/apache/incubator-eventmesh/eventmesh-workflow-go/internal/queue"
"github.com/apache/incubator-eventmesh/eventmesh-workflow-go/third_party/jqer"
"github.com/gogf/gf/util/gconv"
+ "github.com/google/uuid"
+ "strconv"
)
type switchTask struct {
@@ -35,11 +40,12 @@ func NewSwitchTask(instance *model.WorkflowTaskInstance) Task {
if instance == nil || instance.Task == nil {
return nil
}
- t.baseTask = baseTask{taskID: instance.TaskID, taskInstanceID: instance.TaskInstanceID, input: instance.Input,
+ t.baseTask = baseTask{taskID: instance.TaskID, input: instance.Input,
workflowID: instance.WorkflowID, workflowInstanceID: instance.WorkflowInstanceID,
taskType: instance.Task.TaskType}
t.transitions = instance.Task.ChildTasks
t.baseTask.queue = queue.GetQueue(config.GlobalConfig().Flow.Queue.Store)
+ t.workflowDAL = dal.NewWorkflowDAL()
t.jq = jqer.NewJQ()
return &t
}
@@ -48,22 +54,30 @@ func (t *switchTask) Run() error {
if len(t.transitions) == 0 {
return nil
}
- var tasks []*model.WorkflowTaskInstance
for _, transition := range t.transitions {
- res, err := t.jq.One(t.input, transition.Condition)
+ if transition.ToTaskID == constants.TaskEndID {
+ continue
+ }
+ var jqData interface{}
+ err := json.Unmarshal([]byte(t.input), &jqData)
if err != nil {
return err
}
- if !gconv.Bool(res) {
+ res, err := t.jq.One(jqData, transition.Condition)
+ if err != nil {
continue
}
- if transition.ToTaskID == constants.TaskEndID {
- break
+ boolValue, err := strconv.ParseBool(gconv.String(res))
+ if err != nil || !boolValue {
+ continue
}
var taskInstance = model.WorkflowTaskInstance{WorkflowInstanceID: t.workflowInstanceID,
- WorkflowID: t.workflowID, TaskID: transition.ToTaskID, TaskInstanceID: t.taskInstanceID,
+ WorkflowID: t.workflowID, TaskID: transition.ToTaskID, TaskInstanceID: uuid.New().String(),
Status: constants.TaskInstanceWaitStatus, Input: t.baseTask.input}
- tasks = append(tasks, &taskInstance)
+ return t.baseTask.queue.Publish([]*model.WorkflowTaskInstance{&taskInstance})
}
- return t.baseTask.queue.Publish(tasks)
+ // not match
+ return t.workflowDAL.UpdateInstance(context.Background(),
+ &model.WorkflowInstance{WorkflowInstanceID: t.workflowInstanceID,
+ WorkflowStatus: constants.WorkflowInstanceSuccessStatus})
}
diff --git a/eventmesh-workflow-go/internal/task/task.go b/eventmesh-workflow-go/internal/task/task.go
index c31a8aec..5735d695 100644
--- a/eventmesh-workflow-go/internal/task/task.go
+++ b/eventmesh-workflow-go/internal/task/task.go
@@ -19,11 +19,13 @@ import (
"context"
"fmt"
catalog "github.com/apache/incubator-eventmesh/eventmesh-catalog-go/api/proto"
+ pconfig "github.com/apache/incubator-eventmesh/eventmesh-server-go/config"
"github.com/apache/incubator-eventmesh/eventmesh-server-go/log"
"github.com/apache/incubator-eventmesh/eventmesh-server-go/pkg/naming/selector"
"github.com/apache/incubator-eventmesh/eventmesh-workflow-go/config"
"github.com/apache/incubator-eventmesh/eventmesh-workflow-go/flow"
"github.com/apache/incubator-eventmesh/eventmesh-workflow-go/internal/constants"
+ "github.com/apache/incubator-eventmesh/eventmesh-workflow-go/internal/dal"
"github.com/apache/incubator-eventmesh/eventmesh-workflow-go/internal/dal/model"
"github.com/apache/incubator-eventmesh/eventmesh-workflow-go/internal/protocol"
"github.com/apache/incubator-eventmesh/eventmesh-workflow-go/internal/queue"
@@ -44,6 +46,7 @@ type baseTask struct {
input string
taskType string
queue queue.ObserveQueue
+ workflowDAL dal.WorkflowDAL
}
func New(instance *model.WorkflowTaskInstance) Task {
@@ -80,20 +83,25 @@ func queryPublishCatalog(operationID string) (*flow.WorkflowEventCatalog, error)
defer closeGRPCConn(grpcConn)
catalogClient := catalog.NewCatalogClient(grpcConn)
- rsp, err := catalogClient.Query(context.Background(), &catalog.QueryRequest{
+ rsp, err := catalogClient.QueryOperations(context.Background(), &catalog.QueryOperationsRequest{
OperationId: operationID,
})
if err != nil {
return nil, err
}
- if rsp.Type != constants.EventTypePublish {
+ if len(rsp.Operations) == 0 {
return nil, fmt.Errorf("operationID %s invalid, please check it", operationID)
}
- return &flow.WorkflowEventCatalog{Topic: rsp.ChannelName, Schema: rsp.Schema, OperationID: operationID}, nil
+ operation := rsp.Operations[0]
+ if operation.Type != constants.EventTypePublish {
+ return nil, fmt.Errorf("operationID %s invalid, please check it", operationID)
+ }
+ return &flow.WorkflowEventCatalog{Topic: operation.ChannelName, Schema: operation.Schema,
+ OperationID: operationID}, nil
}
func getGRPCConn() (*grpc.ClientConn, error) {
- namingClient := selector.Get(config.Get().Flow.Selector)
+ namingClient := selector.Get(pconfig.GlobalConfig().Server.Name)
if namingClient == nil {
return nil, fmt.Errorf("naming client is not registered.please register it")
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@eventmesh.apache.org
For additional commands, e-mail: commits-help@eventmesh.apache.org