You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eventmesh.apache.org by mi...@apache.org on 2022/10/08 07:57:48 UTC

[incubator-eventmesh] branch master updated: add protocol package: support publish meshmessage

This is an automated email from the ASF dual-hosted git repository.

mikexue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-eventmesh.git


The following commit(s) were added to refs/heads/master by this push:
     new f0533bef add protocol package: support publish meshmessage
     new d3aad381 Merge pull request #1483 from walterlife/add-eventmesh-workflow-protocol
f0533bef is described below

commit f0533bef8989744c863bfe8e1e7637ea6d34ce30
Author: walterlife <wa...@gmail.com>
AuthorDate: Sat Oct 1 23:47:49 2022 +0800

    add protocol package:
    support publish meshmessage
---
 .../internal/protocol/meshmessage.go               | 59 ++++++++++++++++++++++
 eventmesh-workflow-go/internal/protocol/message.go | 27 ++++++++++
 2 files changed, 86 insertions(+)

diff --git a/eventmesh-workflow-go/internal/protocol/meshmessage.go b/eventmesh-workflow-go/internal/protocol/meshmessage.go
new file mode 100644
index 00000000..4c650c50
--- /dev/null
+++ b/eventmesh-workflow-go/internal/protocol/meshmessage.go
@@ -0,0 +1,59 @@
+package protocol
+
+import (
+	"context"
+	pgrpc "github.com/apache/incubator-eventmesh/eventmesh-sdk-go/grpc"
+	"github.com/apache/incubator-eventmesh/eventmesh-sdk-go/grpc/conf"
+	eventmesh "github.com/apache/incubator-eventmesh/eventmesh-sdk-go/grpc/proto"
+	"github.com/apache/incubator-eventmesh/eventmesh-server-go/log"
+	"github.com/apache/incubator-eventmesh/eventmesh-workflow-go/config"
+	"github.com/apache/incubator-eventmesh/eventmesh-workflow-go/internal/constants"
+	"github.com/gogf/gf/util/gconv"
+	"github.com/google/uuid"
+)
+
+func init() {
+	messageBuilder["meshmessage"] = &MeshMessage{}
+}
+
+// MeshMessage eventmesh message definition
+type MeshMessage struct {
+}
+
+func (m *MeshMessage) Publish(ctx context.Context, topic string, content string, properties map[string]string) error {
+	eventmeshCfg := config.Get()
+	cfg := &conf.GRPCConfig{
+		Host:         eventmeshCfg.EventMesh.Host,
+		Port:         eventmeshCfg.EventMesh.GRPC.Port,
+		ENV:          eventmeshCfg.EventMesh.Env,
+		IDC:          eventmeshCfg.EventMesh.IDC,
+		SYS:          eventmeshCfg.EventMesh.Sys,
+		Username:     eventmeshCfg.EventMesh.UserName,
+		Password:     eventmeshCfg.EventMesh.Password,
+		ProtocolType: pgrpc.EventmeshMessage,
+		ProducerConfig: conf.ProducerConfig{
+			ProducerGroup: eventmeshCfg.EventMesh.ProducerGroup,
+		},
+	}
+	client, err := pgrpc.New(cfg)
+	if err != nil {
+		return err
+	}
+	defer closeEventMeshClient(client)
+	message := &eventmesh.SimpleMessage{
+		Header:        pgrpc.CreateHeader(cfg),
+		ProducerGroup: eventmeshCfg.EventMesh.ProducerGroup,
+		Topic:         topic,
+		Content:       content,
+		Ttl:           gconv.String(eventmeshCfg.EventMesh.TTL),
+		UniqueId:      uuid.New().String(),
+		SeqNum:        uuid.New().String(),
+		Properties:    properties,
+	}
+	resp, err := client.Publish(ctx, message)
+	if err != nil {
+		return err
+	}
+	log.Get(constants.LogSchedule).Debugf("publish event result: %v", resp.String())
+	return nil
+}
diff --git a/eventmesh-workflow-go/internal/protocol/message.go b/eventmesh-workflow-go/internal/protocol/message.go
new file mode 100644
index 00000000..94e0a0f3
--- /dev/null
+++ b/eventmesh-workflow-go/internal/protocol/message.go
@@ -0,0 +1,27 @@
+package protocol
+
+import (
+	"context"
+	pgrpc "github.com/apache/incubator-eventmesh/eventmesh-sdk-go/grpc"
+	"github.com/apache/incubator-eventmesh/eventmesh-server-go/log"
+	"github.com/apache/incubator-eventmesh/eventmesh-workflow-go/internal/constants"
+)
+
+var messageBuilder map[string]Message
+
+// Message workflow message definition
+type Message interface {
+	Publish(ctx context.Context, string, content string, properties map[string]string) error
+}
+
+func closeEventMeshClient(client pgrpc.Interface) {
+	if client != nil {
+		if err := client.Close(); err != nil {
+			log.Get(constants.LogSchedule).Errorf("close eventmesh client error:%v", err)
+		}
+	}
+}
+
+func Builder(protocol string) Message {
+	return messageBuilder[protocol]
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@eventmesh.apache.org
For additional commands, e-mail: commits-help@eventmesh.apache.org