You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eventmesh.apache.org by mi...@apache.org on 2022/11/15 02:51:47 UTC
[incubator-eventmesh] branch master updated: add workflow eventmest queue type
This is an automated email from the ASF dual-hosted git repository.
mikexue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-eventmesh.git
The following commit(s) were added to refs/heads/master by this push:
new a9af1a39 add workflow eventmest queue type
new 476ba57a Merge pull request #2217 from horoc/add-workflow-eventmesh-task-queue
a9af1a39 is described below
commit a9af1a3996489285f57f63413c49c8c0b86bdfda
Author: horoc <ho...@gmail.com>
AuthorDate: Thu Nov 10 00:57:21 2022 +0800
add workflow eventmest queue type
---
eventmesh-workflow-go/config/config.go | 2 +
.../internal/constants/constants.go | 2 +
.../internal/queue/eventmesh_queue.go | 174 +++++++++++++++++++++
3 files changed, 178 insertions(+)
diff --git a/eventmesh-workflow-go/config/config.go b/eventmesh-workflow-go/config/config.go
index 7e8bdee5..3d68aa08 100644
--- a/eventmesh-workflow-go/config/config.go
+++ b/eventmesh-workflow-go/config/config.go
@@ -28,6 +28,7 @@ type Config struct {
Flow struct {
Queue struct {
Store string `yaml:"store"`
+ Topic string `yaml:"topic"`
} `yaml:"queue"`
Scheduler struct {
Type string `yaml:"type"`
@@ -50,6 +51,7 @@ type Config struct {
UserName string `yaml:"username"`
Password string `yaml:"password"`
ProducerGroup string `yaml:"producer_group"`
+ ConsumerGroup string `yaml:"consumer_group"`
TTL int `yaml:"ttl"`
} `yaml:"eventmesh"`
}
diff --git a/eventmesh-workflow-go/internal/constants/constants.go b/eventmesh-workflow-go/internal/constants/constants.go
index 2abe5c83..73587976 100644
--- a/eventmesh-workflow-go/internal/constants/constants.go
+++ b/eventmesh-workflow-go/internal/constants/constants.go
@@ -21,6 +21,8 @@ type QueueType int
const (
// QueueTypeInMemory in memory queue type
QueueTypeInMemory = "in-memory"
+ // QueueTypeEventMesh EventMesh queue
+ QueueTypeEventMesh = "eventmesh"
)
const (
diff --git a/eventmesh-workflow-go/internal/queue/eventmesh_queue.go b/eventmesh-workflow-go/internal/queue/eventmesh_queue.go
new file mode 100644
index 00000000..4440fdf4
--- /dev/null
+++ b/eventmesh-workflow-go/internal/queue/eventmesh_queue.go
@@ -0,0 +1,174 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package queue
+
+import (
+ "context"
+ "encoding/json"
+ sdk "github.com/apache/incubator-eventmesh/eventmesh-sdk-go/grpc"
+ sdk_conf "github.com/apache/incubator-eventmesh/eventmesh-sdk-go/grpc/conf"
+ sdk_pb "github.com/apache/incubator-eventmesh/eventmesh-sdk-go/grpc/proto"
+ "github.com/apache/incubator-eventmesh/eventmesh-server-go/log"
+ "github.com/gogf/gf/util/gconv"
+ "github.com/google/uuid"
+
+ conf "github.com/apache/incubator-eventmesh/eventmesh-workflow-go/config"
+
+ "github.com/apache/incubator-eventmesh/eventmesh-workflow-go/internal/constants"
+ "github.com/apache/incubator-eventmesh/eventmesh-workflow-go/internal/dal"
+ "github.com/apache/incubator-eventmesh/eventmesh-workflow-go/internal/dal/model"
+)
+
+func init() {
+ cfg := conf.Get()
+ // init and register EventMesh task queue only when config corresponding queue type
+ if cfg != nil && cfg.Flow.Queue.Store == constants.QueueTypeEventMesh {
+ RegisterQueue(newEventMeshQueue(cfg))
+ }
+}
+
+type eventMeshQueue struct {
+ // EventMesh go sdk grpc client
+ grpcClient sdk.Interface
+ // EventMesh go sdk grpc config
+ grpcConfig *sdk_conf.GRPCConfig
+
+ workflowConfig *conf.Config
+ workflowDAL dal.WorkflowDAL
+ observeTopic string
+}
+
+func newEventMeshQueue(workflowConfig *conf.Config) ObserveQueue {
+ eventMeshConfig := workflowConfig.EventMesh
+ grpcConfig := &sdk_conf.GRPCConfig{
+ Host: eventMeshConfig.Host,
+ Port: eventMeshConfig.GRPC.Port,
+ ENV: eventMeshConfig.Env,
+ IDC: eventMeshConfig.IDC,
+ SYS: eventMeshConfig.Sys,
+ Username: eventMeshConfig.UserName,
+ Password: eventMeshConfig.Password,
+ ProtocolType: sdk.EventmeshMessage,
+ ConsumerConfig: sdk_conf.ConsumerConfig{
+ Enabled: true,
+ ConsumerGroup: eventMeshConfig.ConsumerGroup,
+ },
+ }
+ client, err := sdk.New(grpcConfig)
+ if err != nil {
+ log.Get(constants.LogQueue).Errorf("EventMesh task queue, fail to init EventMesh client , error=%v", err)
+ panic(err)
+ }
+ return &eventMeshQueue{
+ grpcClient: client,
+ grpcConfig: grpcConfig,
+ workflowConfig: workflowConfig,
+ observeTopic: workflowConfig.Flow.Queue.Topic,
+ workflowDAL: dal.NewWorkflowDAL(),
+ }
+}
+
+func (q *eventMeshQueue) Name() string {
+ return constants.QueueTypeEventMesh
+}
+
+// Publish send task to EventMesh queue, store task info in message content with json structure
+func (q *eventMeshQueue) Publish(tasks []*model.WorkflowTaskInstance) error {
+ if len(tasks) == 0 {
+ return nil
+ }
+ for _, task := range tasks {
+ if task == nil {
+ continue
+ }
+ message, err := q.toEventMeshMessage(task)
+ if err != nil {
+ log.Get(constants.LogQueue).Errorf("EventMesh task queue, fail to publish task, error=%v", err)
+ return err
+ }
+ _, err = q.grpcClient.Publish(context.Background(), message)
+ if err != nil {
+ log.Get(constants.LogQueue).Errorf("EventMesh task queue, fail to publish task, error=%v", err)
+ return err
+ }
+ }
+ return nil
+}
+
+// Ack do nothing
+func (q *eventMeshQueue) Ack(tasks *model.WorkflowTaskInstance) error {
+ return nil
+}
+
+// Observe consume task by EventMesh subscription api
+func (q *eventMeshQueue) Observe() {
+ err := q.grpcClient.SubscribeStream(sdk_conf.SubscribeItem{
+ Topic: q.observeTopic,
+ SubscribeMode: sdk_conf.CLUSTERING,
+ SubscribeType: sdk_conf.SYNC,
+ }, q.handler)
+ if err != nil {
+ log.Get(constants.LogQueue).Errorf("EventMesh task queue observe error=%v", err)
+ panic(err)
+ }
+}
+
+func (q *eventMeshQueue) handler(message *sdk_pb.SimpleMessage) interface{} {
+ workflowTask, err := q.toWorkflowTask(message)
+ if err != nil {
+ return err
+ }
+ log.Get(constants.LogQueue).Infof("receive task from EventMesh queue, task=%s", gconv.String(workflowTask))
+ if workflowTask.ID != 0 {
+ if err := q.workflowDAL.UpdateTaskInstance(dal.GetDalClient(), workflowTask); err != nil {
+ log.Get(constants.LogQueue).Errorf("EventMesh task queue observe UpdateTaskInstance error=%v", err)
+ }
+ return err
+ }
+ // new task
+ if err := q.workflowDAL.InsertTaskInstance(context.Background(), workflowTask); err != nil {
+ log.Get(constants.LogQueue).Errorf("EventMesh task queue observe InsertTaskInstance error=%v", err)
+ }
+ return nil
+}
+
+func (q *eventMeshQueue) toEventMeshMessage(task *model.WorkflowTaskInstance) (*sdk_pb.SimpleMessage, error) {
+ taskJsonBytes, err := json.Marshal(task)
+ if err != nil {
+ return nil, err
+ }
+
+ message := &sdk_pb.SimpleMessage{
+ Header: sdk.CreateHeader(q.grpcConfig),
+ ProducerGroup: q.workflowConfig.EventMesh.ProducerGroup,
+ Topic: q.observeTopic,
+ Content: string(taskJsonBytes),
+ Ttl: gconv.String(q.workflowConfig.EventMesh.TTL),
+ UniqueId: uuid.New().String(),
+ SeqNum: uuid.New().String(),
+ }
+ return message, nil
+}
+
+func (q *eventMeshQueue) toWorkflowTask(message *sdk_pb.SimpleMessage) (*model.WorkflowTaskInstance, error) {
+ taskJsonBytes := []byte(message.Content)
+ task := &model.WorkflowTaskInstance{}
+ err := json.Unmarshal(taskJsonBytes, task)
+ if err != nil {
+ return nil, err
+ }
+ return task, nil
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@eventmesh.apache.org
For additional commands, e-mail: commits-help@eventmesh.apache.org