You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eventmesh.apache.org by mi...@apache.org on 2022/10/08 07:57:48 UTC
[incubator-eventmesh] branch master updated: add protocol package: support publish meshmessage
This is an automated email from the ASF dual-hosted git repository.
mikexue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-eventmesh.git
The following commit(s) were added to refs/heads/master by this push:
new f0533bef add protocol package: support publish meshmessage
new d3aad381 Merge pull request #1483 from walterlife/add-eventmesh-workflow-protocol
f0533bef is described below
commit f0533bef8989744c863bfe8e1e7637ea6d34ce30
Author: walterlife <wa...@gmail.com>
AuthorDate: Sat Oct 1 23:47:49 2022 +0800
add protocol package:
support publish meshmessage
---
.../internal/protocol/meshmessage.go | 59 ++++++++++++++++++++++
eventmesh-workflow-go/internal/protocol/message.go | 27 ++++++++++
2 files changed, 86 insertions(+)
diff --git a/eventmesh-workflow-go/internal/protocol/meshmessage.go b/eventmesh-workflow-go/internal/protocol/meshmessage.go
new file mode 100644
index 00000000..4c650c50
--- /dev/null
+++ b/eventmesh-workflow-go/internal/protocol/meshmessage.go
@@ -0,0 +1,59 @@
+package protocol
+
+import (
+ "context"
+ pgrpc "github.com/apache/incubator-eventmesh/eventmesh-sdk-go/grpc"
+ "github.com/apache/incubator-eventmesh/eventmesh-sdk-go/grpc/conf"
+ eventmesh "github.com/apache/incubator-eventmesh/eventmesh-sdk-go/grpc/proto"
+ "github.com/apache/incubator-eventmesh/eventmesh-server-go/log"
+ "github.com/apache/incubator-eventmesh/eventmesh-workflow-go/config"
+ "github.com/apache/incubator-eventmesh/eventmesh-workflow-go/internal/constants"
+ "github.com/gogf/gf/util/gconv"
+ "github.com/google/uuid"
+)
+
+func init() {
+ messageBuilder["meshmessage"] = &MeshMessage{}
+}
+
+// MeshMessage eventmesh message definition
+type MeshMessage struct {
+}
+
+func (m *MeshMessage) Publish(ctx context.Context, topic string, content string, properties map[string]string) error {
+ eventmeshCfg := config.Get()
+ cfg := &conf.GRPCConfig{
+ Host: eventmeshCfg.EventMesh.Host,
+ Port: eventmeshCfg.EventMesh.GRPC.Port,
+ ENV: eventmeshCfg.EventMesh.Env,
+ IDC: eventmeshCfg.EventMesh.IDC,
+ SYS: eventmeshCfg.EventMesh.Sys,
+ Username: eventmeshCfg.EventMesh.UserName,
+ Password: eventmeshCfg.EventMesh.Password,
+ ProtocolType: pgrpc.EventmeshMessage,
+ ProducerConfig: conf.ProducerConfig{
+ ProducerGroup: eventmeshCfg.EventMesh.ProducerGroup,
+ },
+ }
+ client, err := pgrpc.New(cfg)
+ if err != nil {
+ return err
+ }
+ defer closeEventMeshClient(client)
+ message := &eventmesh.SimpleMessage{
+ Header: pgrpc.CreateHeader(cfg),
+ ProducerGroup: eventmeshCfg.EventMesh.ProducerGroup,
+ Topic: topic,
+ Content: content,
+ Ttl: gconv.String(eventmeshCfg.EventMesh.TTL),
+ UniqueId: uuid.New().String(),
+ SeqNum: uuid.New().String(),
+ Properties: properties,
+ }
+ resp, err := client.Publish(ctx, message)
+ if err != nil {
+ return err
+ }
+ log.Get(constants.LogSchedule).Debugf("publish event result: %v", resp.String())
+ return nil
+}
diff --git a/eventmesh-workflow-go/internal/protocol/message.go b/eventmesh-workflow-go/internal/protocol/message.go
new file mode 100644
index 00000000..94e0a0f3
--- /dev/null
+++ b/eventmesh-workflow-go/internal/protocol/message.go
@@ -0,0 +1,27 @@
+package protocol
+
+import (
+ "context"
+ pgrpc "github.com/apache/incubator-eventmesh/eventmesh-sdk-go/grpc"
+ "github.com/apache/incubator-eventmesh/eventmesh-server-go/log"
+ "github.com/apache/incubator-eventmesh/eventmesh-workflow-go/internal/constants"
+)
+
+var messageBuilder map[string]Message
+
+// Message workflow message definition
+type Message interface {
+ Publish(ctx context.Context, string, content string, properties map[string]string) error
+}
+
+func closeEventMeshClient(client pgrpc.Interface) {
+ if client != nil {
+ if err := client.Close(); err != nil {
+ log.Get(constants.LogSchedule).Errorf("close eventmesh client error:%v", err)
+ }
+ }
+}
+
+func Builder(protocol string) Message {
+ return messageBuilder[protocol]
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@eventmesh.apache.org
For additional commands, e-mail: commits-help@eventmesh.apache.org