You are viewing a plain-text version of this content. (The canonical link to the original did not survive the conversion to plain text.)
Posted to commits@eventmesh.apache.org by mi...@apache.org on 2022/10/27 02:32:44 UTC

[incubator-eventmesh] branch master updated: fix task publish em message error

This is an automated email from the ASF dual-hosted git repository.

mikexue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-eventmesh.git


The following commit(s) were added to refs/heads/master by this push:
     new 74f8bf94 fix task publish em message error
     new fcd99788 Merge pull request #1936 from walterlife/optimize-task-logic
74f8bf94 is described below

commit 74f8bf94d659a7a059b181bbef1cc768e90852e3
Author: yuweizhu <yu...@tencent.com>
AuthorDate: Wed Oct 26 23:34:35 2022 +0800

    fix task publish em message error
---
 .../internal/constants/constants.go                |  4 +--
 .../internal/protocol/meshmessage.go               |  6 +++-
 .../internal/task/operation_task.go                | 16 ++++++++++-
 eventmesh-workflow-go/internal/task/switch_task.go | 32 ++++++++++++++++------
 eventmesh-workflow-go/internal/task/task.go        | 16 ++++++++---
 5 files changed, 57 insertions(+), 17 deletions(-)

diff --git a/eventmesh-workflow-go/internal/constants/constants.go b/eventmesh-workflow-go/internal/constants/constants.go
index 3c014e31..a6d22135 100644
--- a/eventmesh-workflow-go/internal/constants/constants.go
+++ b/eventmesh-workflow-go/internal/constants/constants.go
@@ -63,6 +63,6 @@ const (
 
 const (
 	EventTypePublish                 = "publish"
-	EventPropsWorkflowInstanceID     = "workflow_instance_id"
-	EventPropsWorkflowTaskInstanceID = "workflow_task_instance_id"
+	EventPropsWorkflowInstanceID     = "workflowinstanceid"
+	EventPropsWorkflowTaskInstanceID = "workflowtaskinstanceid"
 )
diff --git a/eventmesh-workflow-go/internal/protocol/meshmessage.go b/eventmesh-workflow-go/internal/protocol/meshmessage.go
index 90f95652..c8af9d52 100644
--- a/eventmesh-workflow-go/internal/protocol/meshmessage.go
+++ b/eventmesh-workflow-go/internal/protocol/meshmessage.go
@@ -17,6 +17,7 @@ package protocol
 
 import (
 	"context"
+	"fmt"
 	pgrpc "github.com/apache/incubator-eventmesh/eventmesh-sdk-go/grpc"
 	"github.com/apache/incubator-eventmesh/eventmesh-sdk-go/grpc/conf"
 	eventmesh "github.com/apache/incubator-eventmesh/eventmesh-sdk-go/grpc/proto"
@@ -65,10 +66,13 @@ func (m *MeshMessage) Publish(ctx context.Context, topic string, content string,
 		SeqNum:        uuid.New().String(),
 		Properties:    properties,
 	}
-	resp, err := client.Publish(ctx, message)
+	resp, err := client.Publish(context.Background(), message)
 	if err != nil {
 		return err
 	}
 	log.Get(constants.LogSchedule).Debugf("publish event result: %v", resp.String())
+	if resp.RespCode != "0" {
+		return fmt.Errorf("eventmesh publish message error: [code]%v[msg]%v", resp.RespCode, resp.RespMsg)
+	}
 	return nil
 }
diff --git a/eventmesh-workflow-go/internal/task/operation_task.go b/eventmesh-workflow-go/internal/task/operation_task.go
index b658d307..1e58921c 100644
--- a/eventmesh-workflow-go/internal/task/operation_task.go
+++ b/eventmesh-workflow-go/internal/task/operation_task.go
@@ -16,8 +16,10 @@
 package task
 
 import (
+	"context"
 	"github.com/apache/incubator-eventmesh/eventmesh-server-go/config"
 	"github.com/apache/incubator-eventmesh/eventmesh-workflow-go/internal/constants"
+	"github.com/apache/incubator-eventmesh/eventmesh-workflow-go/internal/dal"
 	"github.com/apache/incubator-eventmesh/eventmesh-workflow-go/internal/dal/model"
 	"github.com/apache/incubator-eventmesh/eventmesh-workflow-go/internal/queue"
 	"github.com/google/uuid"
@@ -39,6 +41,7 @@ func NewOperationTask(instance *model.WorkflowTaskInstance) Task {
 	t.action = instance.Task.Actions[0]
 	t.transition = instance.Task.ChildTasks[0]
 	t.baseTask.queue = queue.GetQueue(config.GlobalConfig().Flow.Queue.Store)
+	t.workflowDAL = dal.NewWorkflowDAL()
 	return &t
 }
 
@@ -46,6 +49,17 @@ func (t *operationTask) Run() error {
 	if t.action == nil {
 		return nil
 	}
+	// match end
+	if t.transition.ToTaskID == constants.TaskEndID {
+		if t.action != nil {
+			if err := publishEvent(t.workflowInstanceID, uuid.New().String(), t.action.OperationName, t.input); err != nil {
+				return err
+			}
+		}
+		return t.workflowDAL.UpdateInstance(context.Background(),
+			&model.WorkflowInstance{WorkflowInstanceID: t.workflowInstanceID,
+				WorkflowStatus: constants.WorkflowInstanceSuccessStatus})
+	}
 	var taskInstanceID = uuid.New().String()
 	var taskInstance = model.WorkflowTaskInstance{WorkflowInstanceID: t.workflowInstanceID, WorkflowID: t.workflowID,
 		TaskID: t.transition.ToTaskID, TaskInstanceID: taskInstanceID, Status: constants.TaskInstanceSleepStatus,
@@ -53,5 +67,5 @@ func (t *operationTask) Run() error {
 	if err := t.baseTask.queue.Publish([]*model.WorkflowTaskInstance{&taskInstance}); err != nil {
 		return err
 	}
-	return publishEvent(t.workflowInstanceID, taskInstanceID, t.input, t.action.OperationName)
+	return publishEvent(t.workflowInstanceID, taskInstanceID, t.action.OperationName, t.input)
 }
diff --git a/eventmesh-workflow-go/internal/task/switch_task.go b/eventmesh-workflow-go/internal/task/switch_task.go
index b39aeb24..cc39d113 100644
--- a/eventmesh-workflow-go/internal/task/switch_task.go
+++ b/eventmesh-workflow-go/internal/task/switch_task.go
@@ -16,12 +16,17 @@
 package task
 
 import (
+	"context"
+	"encoding/json"
 	"github.com/apache/incubator-eventmesh/eventmesh-server-go/config"
 	"github.com/apache/incubator-eventmesh/eventmesh-workflow-go/internal/constants"
+	"github.com/apache/incubator-eventmesh/eventmesh-workflow-go/internal/dal"
 	"github.com/apache/incubator-eventmesh/eventmesh-workflow-go/internal/dal/model"
 	"github.com/apache/incubator-eventmesh/eventmesh-workflow-go/internal/queue"
 	"github.com/apache/incubator-eventmesh/eventmesh-workflow-go/third_party/jqer"
 	"github.com/gogf/gf/util/gconv"
+	"github.com/google/uuid"
+	"strconv"
 )
 
 type switchTask struct {
@@ -35,11 +40,12 @@ func NewSwitchTask(instance *model.WorkflowTaskInstance) Task {
 	if instance == nil || instance.Task == nil {
 		return nil
 	}
-	t.baseTask = baseTask{taskID: instance.TaskID, taskInstanceID: instance.TaskInstanceID, input: instance.Input,
+	t.baseTask = baseTask{taskID: instance.TaskID, input: instance.Input,
 		workflowID: instance.WorkflowID, workflowInstanceID: instance.WorkflowInstanceID,
 		taskType: instance.Task.TaskType}
 	t.transitions = instance.Task.ChildTasks
 	t.baseTask.queue = queue.GetQueue(config.GlobalConfig().Flow.Queue.Store)
+	t.workflowDAL = dal.NewWorkflowDAL()
 	t.jq = jqer.NewJQ()
 	return &t
 }
@@ -48,22 +54,30 @@ func (t *switchTask) Run() error {
 	if len(t.transitions) == 0 {
 		return nil
 	}
-	var tasks []*model.WorkflowTaskInstance
 	for _, transition := range t.transitions {
-		res, err := t.jq.One(t.input, transition.Condition)
+		if transition.ToTaskID == constants.TaskEndID {
+			continue
+		}
+		var jqData interface{}
+		err := json.Unmarshal([]byte(t.input), &jqData)
 		if err != nil {
 			return err
 		}
-		if !gconv.Bool(res) {
+		res, err := t.jq.One(jqData, transition.Condition)
+		if err != nil {
 			continue
 		}
-		if transition.ToTaskID == constants.TaskEndID {
-			break
+		boolValue, err := strconv.ParseBool(gconv.String(res))
+		if err != nil || !boolValue {
+			continue
 		}
 		var taskInstance = model.WorkflowTaskInstance{WorkflowInstanceID: t.workflowInstanceID,
-			WorkflowID: t.workflowID, TaskID: transition.ToTaskID, TaskInstanceID: t.taskInstanceID,
+			WorkflowID: t.workflowID, TaskID: transition.ToTaskID, TaskInstanceID: uuid.New().String(),
 			Status: constants.TaskInstanceWaitStatus, Input: t.baseTask.input}
-		tasks = append(tasks, &taskInstance)
+		return t.baseTask.queue.Publish([]*model.WorkflowTaskInstance{&taskInstance})
 	}
-	return t.baseTask.queue.Publish(tasks)
+	// not match
+	return t.workflowDAL.UpdateInstance(context.Background(),
+		&model.WorkflowInstance{WorkflowInstanceID: t.workflowInstanceID,
+			WorkflowStatus: constants.WorkflowInstanceSuccessStatus})
 }
diff --git a/eventmesh-workflow-go/internal/task/task.go b/eventmesh-workflow-go/internal/task/task.go
index c31a8aec..5735d695 100644
--- a/eventmesh-workflow-go/internal/task/task.go
+++ b/eventmesh-workflow-go/internal/task/task.go
@@ -19,11 +19,13 @@ import (
 	"context"
 	"fmt"
 	catalog "github.com/apache/incubator-eventmesh/eventmesh-catalog-go/api/proto"
+	pconfig "github.com/apache/incubator-eventmesh/eventmesh-server-go/config"
 	"github.com/apache/incubator-eventmesh/eventmesh-server-go/log"
 	"github.com/apache/incubator-eventmesh/eventmesh-server-go/pkg/naming/selector"
 	"github.com/apache/incubator-eventmesh/eventmesh-workflow-go/config"
 	"github.com/apache/incubator-eventmesh/eventmesh-workflow-go/flow"
 	"github.com/apache/incubator-eventmesh/eventmesh-workflow-go/internal/constants"
+	"github.com/apache/incubator-eventmesh/eventmesh-workflow-go/internal/dal"
 	"github.com/apache/incubator-eventmesh/eventmesh-workflow-go/internal/dal/model"
 	"github.com/apache/incubator-eventmesh/eventmesh-workflow-go/internal/protocol"
 	"github.com/apache/incubator-eventmesh/eventmesh-workflow-go/internal/queue"
@@ -44,6 +46,7 @@ type baseTask struct {
 	input              string
 	taskType           string
 	queue              queue.ObserveQueue
+	workflowDAL        dal.WorkflowDAL
 }
 
 func New(instance *model.WorkflowTaskInstance) Task {
@@ -80,20 +83,25 @@ func queryPublishCatalog(operationID string) (*flow.WorkflowEventCatalog, error)
 	defer closeGRPCConn(grpcConn)
 
 	catalogClient := catalog.NewCatalogClient(grpcConn)
-	rsp, err := catalogClient.Query(context.Background(), &catalog.QueryRequest{
+	rsp, err := catalogClient.QueryOperations(context.Background(), &catalog.QueryOperationsRequest{
 		OperationId: operationID,
 	})
 	if err != nil {
 		return nil, err
 	}
-	if rsp.Type != constants.EventTypePublish {
+	if len(rsp.Operations) == 0 {
 		return nil, fmt.Errorf("operationID %s invalid, please check it", operationID)
 	}
-	return &flow.WorkflowEventCatalog{Topic: rsp.ChannelName, Schema: rsp.Schema, OperationID: operationID}, nil
+	operation := rsp.Operations[0]
+	if operation.Type != constants.EventTypePublish {
+		return nil, fmt.Errorf("operationID %s invalid, please check it", operationID)
+	}
+	return &flow.WorkflowEventCatalog{Topic: operation.ChannelName, Schema: operation.Schema,
+		OperationID: operationID}, nil
 }
 
 func getGRPCConn() (*grpc.ClientConn, error) {
-	namingClient := selector.Get(config.Get().Flow.Selector)
+	namingClient := selector.Get(pconfig.GlobalConfig().Server.Name)
 	if namingClient == nil {
 		return nil, fmt.Errorf("naming client is not registered.please register it")
 	}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@eventmesh.apache.org
For additional commands, e-mail: commits-help@eventmesh.apache.org