You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eventmesh.apache.org by mi...@apache.org on 2022/11/15 02:51:47 UTC

[incubator-eventmesh] branch master updated: add workflow eventmesh queue type

This is an automated email from the ASF dual-hosted git repository.

mikexue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-eventmesh.git


The following commit(s) were added to refs/heads/master by this push:
     new a9af1a39 add workflow eventmesh queue type
     new 476ba57a Merge pull request #2217 from horoc/add-workflow-eventmesh-task-queue
a9af1a39 is described below

commit a9af1a3996489285f57f63413c49c8c0b86bdfda
Author: horoc <ho...@gmail.com>
AuthorDate: Thu Nov 10 00:57:21 2022 +0800

    add workflow eventmesh queue type
---
 eventmesh-workflow-go/config/config.go             |   2 +
 .../internal/constants/constants.go                |   2 +
 .../internal/queue/eventmesh_queue.go              | 174 +++++++++++++++++++++
 3 files changed, 178 insertions(+)

diff --git a/eventmesh-workflow-go/config/config.go b/eventmesh-workflow-go/config/config.go
index 7e8bdee5..3d68aa08 100644
--- a/eventmesh-workflow-go/config/config.go
+++ b/eventmesh-workflow-go/config/config.go
@@ -28,6 +28,7 @@ type Config struct {
 	Flow struct {
 		Queue struct {
 			Store string `yaml:"store"`
+			Topic string `yaml:"topic"`
 		} `yaml:"queue"`
 		Scheduler struct {
 			Type     string `yaml:"type"`
@@ -50,6 +51,7 @@ type Config struct {
 		UserName      string `yaml:"username"`
 		Password      string `yaml:"password"`
 		ProducerGroup string `yaml:"producer_group"`
+		ConsumerGroup string `yaml:"consumer_group"`
 		TTL           int    `yaml:"ttl"`
 	} `yaml:"eventmesh"`
 }
diff --git a/eventmesh-workflow-go/internal/constants/constants.go b/eventmesh-workflow-go/internal/constants/constants.go
index 2abe5c83..73587976 100644
--- a/eventmesh-workflow-go/internal/constants/constants.go
+++ b/eventmesh-workflow-go/internal/constants/constants.go
@@ -21,6 +21,8 @@ type QueueType int
 const (
 	// QueueTypeInMemory in memory queue type
 	QueueTypeInMemory = "in-memory"
+	// QueueTypeEventMesh EventMesh queue
+	QueueTypeEventMesh = "eventmesh"
 )
 
 const (
diff --git a/eventmesh-workflow-go/internal/queue/eventmesh_queue.go b/eventmesh-workflow-go/internal/queue/eventmesh_queue.go
new file mode 100644
index 00000000..4440fdf4
--- /dev/null
+++ b/eventmesh-workflow-go/internal/queue/eventmesh_queue.go
@@ -0,0 +1,174 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package queue
+
+import (
+	"context"
+	"encoding/json"
+	sdk "github.com/apache/incubator-eventmesh/eventmesh-sdk-go/grpc"
+	sdk_conf "github.com/apache/incubator-eventmesh/eventmesh-sdk-go/grpc/conf"
+	sdk_pb "github.com/apache/incubator-eventmesh/eventmesh-sdk-go/grpc/proto"
+	"github.com/apache/incubator-eventmesh/eventmesh-server-go/log"
+	"github.com/gogf/gf/util/gconv"
+	"github.com/google/uuid"
+
+	conf "github.com/apache/incubator-eventmesh/eventmesh-workflow-go/config"
+
+	"github.com/apache/incubator-eventmesh/eventmesh-workflow-go/internal/constants"
+	"github.com/apache/incubator-eventmesh/eventmesh-workflow-go/internal/dal"
+	"github.com/apache/incubator-eventmesh/eventmesh-workflow-go/internal/dal/model"
+)
+
+// init registers the EventMesh task queue when the package is loaded.
+// Registration is skipped unless a configuration is available and its
+// flow queue store is set to the EventMesh queue type.
+func init() {
+	cfg := conf.Get()
+	if cfg == nil {
+		return
+	}
+	if cfg.Flow.Queue.Store != constants.QueueTypeEventMesh {
+		return
+	}
+	RegisterQueue(newEventMeshQueue(cfg))
+}
+
+// eventMeshQueue is the task queue implementation that uses the EventMesh Go
+// SDK gRPC client to publish tasks to a topic and to subscribe to that topic.
+type eventMeshQueue struct {
+	// EventMesh go sdk grpc client
+	grpcClient sdk.Interface
+	// EventMesh go sdk grpc config
+	grpcConfig *sdk_conf.GRPCConfig
+
+	// workflow configuration; read for the producer group and TTL of outgoing messages
+	workflowConfig *conf.Config
+	// data access layer used by the subscription handler to update or insert task instances
+	workflowDAL    dal.WorkflowDAL
+	// topic that tasks are published to and that Observe subscribes to
+	observeTopic   string
+}
+
+// newEventMeshQueue builds an ObserveQueue backed by an EventMesh gRPC client.
+// The client is configured from the EventMesh section of workflowConfig with
+// the consumer enabled, and the observed topic is taken from the flow queue
+// configuration. It panics if the gRPC client cannot be created.
+func newEventMeshQueue(workflowConfig *conf.Config) ObserveQueue {
+	em := workflowConfig.EventMesh
+	consumerCfg := sdk_conf.ConsumerConfig{
+		Enabled:       true,
+		ConsumerGroup: em.ConsumerGroup,
+	}
+	grpcCfg := &sdk_conf.GRPCConfig{
+		Host:           em.Host,
+		Port:           em.GRPC.Port,
+		ENV:            em.Env,
+		IDC:            em.IDC,
+		SYS:            em.Sys,
+		Username:       em.UserName,
+		Password:       em.Password,
+		ProtocolType:   sdk.EventmeshMessage,
+		ConsumerConfig: consumerCfg,
+	}
+
+	grpcClient, initErr := sdk.New(grpcCfg)
+	if initErr != nil {
+		log.Get(constants.LogQueue).Errorf("EventMesh task queue, fail to init EventMesh client , error=%v", initErr)
+		panic(initErr)
+	}
+
+	queue := &eventMeshQueue{
+		grpcClient:     grpcClient,
+		grpcConfig:     grpcCfg,
+		workflowConfig: workflowConfig,
+		observeTopic:   workflowConfig.Flow.Queue.Topic,
+		workflowDAL:    dal.NewWorkflowDAL(),
+	}
+	return queue
+}
+
+// Name returns the queue type identifier, constants.QueueTypeEventMesh.
+func (q *eventMeshQueue) Name() string {
+	return constants.QueueTypeEventMesh
+}
+
+// Publish sends each non-nil task to the EventMesh queue, one message per
+// task, with the task serialized as JSON in the message content. It stops at
+// the first conversion or publish failure, logs it, and returns that error.
+// An empty or nil slice publishes nothing and returns nil.
+func (q *eventMeshQueue) Publish(tasks []*model.WorkflowTaskInstance) error {
+	for i := range tasks {
+		current := tasks[i]
+		if current == nil {
+			// nil entries are skipped rather than treated as an error
+			continue
+		}
+
+		msg, convertErr := q.toEventMeshMessage(current)
+		if convertErr != nil {
+			log.Get(constants.LogQueue).Errorf("EventMesh task queue, fail to publish task, error=%v", convertErr)
+			return convertErr
+		}
+
+		if _, publishErr := q.grpcClient.Publish(context.Background(), msg); publishErr != nil {
+			log.Get(constants.LogQueue).Errorf("EventMesh task queue, fail to publish task, error=%v", publishErr)
+			return publishErr
+		}
+	}
+	return nil
+}
+
+// Ack is a no-op for the EventMesh queue: it ignores the given task and
+// always returns nil.
+func (q *eventMeshQueue) Ack(tasks *model.WorkflowTaskInstance) error {
+	return nil
+}
+
+// Observe subscribes to the configured topic through the EventMesh stream
+// subscription API, using clustering mode and the sync subscribe type, and
+// routes incoming messages to q.handler. It panics if subscribing fails.
+func (q *eventMeshQueue) Observe() {
+	item := sdk_conf.SubscribeItem{
+		Topic:         q.observeTopic,
+		SubscribeMode: sdk_conf.CLUSTERING,
+		SubscribeType: sdk_conf.SYNC,
+	}
+	if subscribeErr := q.grpcClient.SubscribeStream(item, q.handler); subscribeErr != nil {
+		log.Get(constants.LogQueue).Errorf("EventMesh task queue observe error=%v", subscribeErr)
+		panic(subscribeErr)
+	}
+}
+
+// handler is the subscription callback for messages received from the
+// EventMesh topic. It decodes the message content into a workflow task and
+// persists it: a task with a non-zero ID is updated, any other task is
+// inserted as a new instance.
+//
+// It returns the decode error or the update error when one occurs, and nil
+// otherwise. Insert failures are only logged and nil is still returned.
+func (q *eventMeshQueue) handler(message *sdk_pb.SimpleMessage) interface{} {
+	workflowTask, err := q.toWorkflowTask(message)
+	if err != nil {
+		return err
+	}
+	log.Get(constants.LogQueue).Infof("receive task from EventMesh queue, task=%s", gconv.String(workflowTask))
+	if workflowTask.ID != 0 {
+		// Assign to the outer err rather than declaring a new one with := in
+		// the if statement; the shadowed variable previously caused the
+		// update failure to be dropped and nil to be returned instead.
+		err = q.workflowDAL.UpdateTaskInstance(dal.GetDalClient(), workflowTask)
+		if err != nil {
+			log.Get(constants.LogQueue).Errorf("EventMesh task queue observe UpdateTaskInstance error=%v", err)
+			return err
+		}
+		// Return a literal nil on success so the caller never sees a typed value.
+		return nil
+	}
+	// new task
+	if err := q.workflowDAL.InsertTaskInstance(context.Background(), workflowTask); err != nil {
+		log.Get(constants.LogQueue).Errorf("EventMesh task queue observe InsertTaskInstance error=%v", err)
+	}
+	return nil
+}
+
+// toEventMeshMessage wraps a workflow task in an EventMesh SimpleMessage.
+// The task is JSON-encoded into the message content; the topic is the
+// queue's observed topic, the producer group and TTL come from the workflow
+// configuration, and UniqueId and SeqNum are each a freshly generated UUID.
+// It returns the marshalling error if the task cannot be encoded.
+func (q *eventMeshQueue) toEventMeshMessage(task *model.WorkflowTaskInstance) (*sdk_pb.SimpleMessage, error) {
+	payload, marshalErr := json.Marshal(task)
+	if marshalErr != nil {
+		return nil, marshalErr
+	}
+
+	header := sdk.CreateHeader(q.grpcConfig)
+	ttl := gconv.String(q.workflowConfig.EventMesh.TTL)
+	uniqueID := uuid.New().String()
+	seqNum := uuid.New().String()
+
+	return &sdk_pb.SimpleMessage{
+		Header:        header,
+		ProducerGroup: q.workflowConfig.EventMesh.ProducerGroup,
+		Topic:         q.observeTopic,
+		Content:       string(payload),
+		Ttl:           ttl,
+		UniqueId:      uniqueID,
+		SeqNum:        seqNum,
+	}, nil
+}
+
+func (q *eventMeshQueue) toWorkflowTask(message *sdk_pb.SimpleMessage) (*model.WorkflowTaskInstance, error) {
+	taskJsonBytes := []byte(message.Content)
+	task := &model.WorkflowTaskInstance{}
+	err := json.Unmarshal(taskJsonBytes, task)
+	if err != nil {
+		return nil, err
+	}
+	return task, nil
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@eventmesh.apache.org
For additional commands, e-mail: commits-help@eventmesh.apache.org