You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eventmesh.apache.org by wa...@apache.org on 2022/10/24 10:13:21 UTC
[incubator-eventmesh] branch eventmesh-server-go updated: add prototol plugin
This is an automated email from the ASF dual-hosted git repository.
walleliu pushed a commit to branch eventmesh-server-go
in repository https://gitbox.apache.org/repos/asf/incubator-eventmesh.git
The following commit(s) were added to refs/heads/eventmesh-server-go by this push:
new 71795d76 add prototol plugin
new 5e45fd69 Merge pull request #1813 from walleliu1016/eventmesh-server-go
71795d76 is described below
commit 71795d765360d61811af0dc6977b01b266b847a2
Author: walleliu <li...@163.com>
AuthorDate: Mon Oct 24 18:11:22 2022 +0800
add prototol plugin
---
.../plugin/protocol/cloudevents/Readme.md | 1 +
.../plugin/protocol/cloudevents/cloudevents.go | 215 +++++++++++++++++++++
eventmesh-server-go/plugin/protocol/protocol.go | 4 +
3 files changed, 220 insertions(+)
diff --git a/eventmesh-server-go/plugin/protocol/cloudevents/Readme.md b/eventmesh-server-go/plugin/protocol/cloudevents/Readme.md
new file mode 100644
index 00000000..6f2089ae
--- /dev/null
+++ b/eventmesh-server-go/plugin/protocol/cloudevents/Readme.md
@@ -0,0 +1 @@
+Transfers messages using the CloudEvents protocol.
\ No newline at end of file
diff --git a/eventmesh-server-go/plugin/protocol/cloudevents/cloudevents.go b/eventmesh-server-go/plugin/protocol/cloudevents/cloudevents.go
new file mode 100644
index 00000000..bd7ebe7f
--- /dev/null
+++ b/eventmesh-server-go/plugin/protocol/cloudevents/cloudevents.go
@@ -0,0 +1,215 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package cloudevents
+
+import (
+ "context"
+ "fmt"
+ "github.com/apache/incubator-eventmesh/eventmesh-server-go/pkg/common/protocol/grpc"
+ "github.com/apache/incubator-eventmesh/eventmesh-server-go/pkg/common/protocol/tcp"
+ "github.com/apache/incubator-eventmesh/eventmesh-server-go/plugin"
+ "github.com/apache/incubator-eventmesh/eventmesh-server-go/plugin/protocol"
+ "github.com/apache/incubator-eventmesh/eventmesh-server-go/runtime/consts"
+ "github.com/apache/incubator-eventmesh/eventmesh-server-go/runtime/proto/pb"
+ "github.com/cloudevents/sdk-go/v2"
+ "github.com/cloudevents/sdk-go/v2/event"
+ "github.com/cloudevents/sdk-go/v2/event/datacodec"
+)
+
+// init hands a CloudeventsPlugin instance to plugin.Register under the
+// plugin.Protocol category when the package is loaded.
+func init() {
+	adaptor := &CloudeventsPlugin{}
+	plugin.Register(plugin.Protocol, adaptor)
+}
+
+// CloudeventsPlugin CloudEvents protocol adaptor
+// used to transform CloudEvents message to CloudEvents message.
+type CloudeventsPlugin struct {
+}
+
+// Type reports the protocol name of this adaptor, taken from
+// protocol.CloudEvents.
+func (c *CloudeventsPlugin) Type() string {
+	name := protocol.CloudEvents
+	return name
+}
+
+// Setup accepts the plugin name and configuration decoder but performs no
+// work: both arguments are ignored and nil is always returned.
+func (c *CloudeventsPlugin) Setup(name string, dec plugin.Decoder) error {
+	_, _ = name, dec // nothing to configure
+	return nil
+}
+
+// ToCloudEvent converts a single transport message into a CloudEvent.
+//
+// A *tcp.Package is routed to deserializeTcpProtocol and a
+// *grpc.SimpleMessageWrapper to deserializeGrpcProtocol. Any other input
+// (including nil, or a wrapper without a SimpleMessage) produces an error
+// rather than a panic, so a single malformed call cannot crash the caller.
+func (c *CloudeventsPlugin) ToCloudEvent(msg interface{}) (*v2.Event, error) {
+	// Bind the asserted value in the switch instead of asserting a second time.
+	switch m := msg.(type) {
+	case *tcp.Package:
+		return c.deserializeTcpProtocol(m)
+	case *grpc.SimpleMessageWrapper:
+		if m == nil || m.SimpleMessage == nil {
+			return nil, fmt.Errorf("cloudevents: empty grpc simple message")
+		}
+		return deserializeGrpcProtocol(m.SimpleMessage)
+	default:
+		return nil, fmt.Errorf("cloudevents: unsupported message type %T", msg)
+	}
+}
+
+// ToCloudEvents converts a batch transport message into a slice of
+// CloudEvents, one per batch item.
+//
+// msg must be a non-nil *grpc.BatchMessageWrapper carrying a BatchMessage;
+// anything else is reported as an error instead of panicking on the type
+// assertion.
+func (c *CloudeventsPlugin) ToCloudEvents(msg interface{}) ([]*v2.Event, error) {
+	bmw, ok := msg.(*grpc.BatchMessageWrapper)
+	if !ok || bmw == nil || bmw.BatchMessage == nil {
+		return nil, fmt.Errorf("cloudevents: expected non-empty *grpc.BatchMessageWrapper, got %T", msg)
+	}
+	return buildBatchMessage(bmw.BatchMessage)
+}
+
+// FromCloudEvent converts a CloudEvent back into a transport message.
+//
+// Only events whose grpc.PROTOCOL_DESC extension equals "grpc" are supported;
+// they are turned into a *grpc.SimpleMessageWrapper. A nil event, or an event
+// whose extension is missing, not a string, or has another value, yields an
+// error.
+func (c *CloudeventsPlugin) FromCloudEvent(event *v2.Event) (interface{}, error) {
+	if event == nil {
+		return nil, fmt.Errorf("cloudevents: nil event")
+	}
+	// Comma-ok form: a missing or non-string extension must not panic.
+	desc, _ := event.Extensions()[grpc.PROTOCOL_DESC].(string)
+	if desc != "grpc" {
+		return nil, fmt.Errorf("only grpc supported now")
+	}
+	msg, err := buildSimpleMessage(event)
+	if err != nil {
+		// Return a literal nil so the interface value is truly nil on failure
+		// rather than wrapping a typed nil pointer.
+		return nil, err
+	}
+	return msg, nil
+}
+
+// ProtocolType returns the protocol name handled by this adaptor. It reports
+// protocol.CloudEvents, the same value as Type, instead of panicking.
+func (c *CloudeventsPlugin) ProtocolType() string {
+	return protocol.CloudEvents
+}
+
+// deserializeTcpProtocol is the placeholder for converting a TCP package into
+// a CloudEvent. TCP is not supported yet, so it always returns an error; the
+// error return lets callers handle the unsupported case without a panic.
+func (c *CloudeventsPlugin) deserializeTcpProtocol(pck *tcp.Package) (*v2.Event, error) {
+	// TODO: implement once the TCP protocol is supported.
+	return nil, fmt.Errorf("cloudevents: tcp protocol is not supported yet")
+}
+
+func defaultIfEmpty(in string, def interface{}) string {
+ if in == "" {
+ return def.(string)
+ }
+ return in
+}
+
+func defaultIfNil(def string, in interface{}) string {
+ if in == nil {
+ return def
+ }
+ return in.(string)
+}
+
+// grpcEventSource bundles the per-message values that differ between a
+// SimpleMessage and a BatchMessage item so both share one conversion path.
+type grpcEventSource struct {
+	content       []byte
+	contentType   string
+	topic         string
+	uniqueID      string
+	seqNum        string
+	ttl           string
+	producerGroup string
+}
+
+// grpcContentType returns the content type stored under grpc.CONTENT_TYPE in
+// props, or consts.CONTENT_TYPE_CLOUDEVENTS_JSON when the key is absent.
+func grpcContentType(props map[string]string) string {
+	if ct, ok := props[grpc.CONTENT_TYPE]; ok {
+		return ct
+	}
+	return consts.CONTENT_TYPE_CLOUDEVENTS_JSON
+}
+
+// newGrpcEvent decodes src.content and builds the resulting CloudEvent.
+//
+// Each extension is taken from the request header / message field when that
+// value is non-empty and otherwise from the same-named extension of the
+// decoded content. The spec version is 1.0 when the resolved protocol version
+// equals event.CloudEventsVersionV1 and 0.3 otherwise; the subject is the
+// topic, falling back to the decoded event's subject.
+//
+// It returns an error when hdr is nil or the content cannot be decoded.
+func newGrpcEvent(hdr *pb.RequestHeader, src grpcEventSource) (*v2.Event, error) {
+	if hdr == nil {
+		return nil, fmt.Errorf("cloudevents: grpc message has no header")
+	}
+
+	// NOTE(review): datacodec only handles the media types registered with it;
+	// confirm the default content type is one of them.
+	decoded := v2.NewEvent()
+	if err := datacodec.Decode(context.TODO(), src.contentType, src.content, &decoded); err != nil {
+		return nil, err
+	}
+	// Look the extensions up once; a missing key simply yields nil.
+	decodedExt := decoded.Extensions()
+
+	// pick prefers the explicit value and falls back to the decoded extension,
+	// tolerating absent or non-string extension values.
+	pick := func(primary, key string) string {
+		if primary != "" {
+			return primary
+		}
+		switch v := decodedExt[key].(type) {
+		case nil:
+			return ""
+		case string:
+			return v
+		default:
+			return fmt.Sprint(v)
+		}
+	}
+
+	// NOTE(review): only the subject and the extensions below are carried
+	// over; id, source, type and data of the decoded event are not copied.
+	// Confirm this is intended.
+	result := v2.NewEvent()
+	if pick(hdr.ProtocolVersion, grpc.PROTOCOL_VERSION) == event.CloudEventsVersionV1 {
+		result.SetSpecVersion(event.CloudEventsVersionV1)
+	} else {
+		result.SetSpecVersion(event.CloudEventsVersionV03)
+	}
+
+	topic := src.topic
+	if topic == "" {
+		topic = decoded.Subject()
+	}
+	result.SetSubject(topic)
+
+	extensions := []struct{ key, value string }{
+		{grpc.ENV, hdr.Env},
+		{grpc.IDC, hdr.Idc},
+		{grpc.IP, hdr.Ip},
+		{grpc.PID, hdr.Pid},
+		{grpc.SYS, hdr.Sys},
+		{grpc.LANGUAGE, hdr.Language},
+		{grpc.PROTOCOL_TYPE, hdr.ProtocolType},
+		{grpc.PROTOCOL_DESC, hdr.ProtocolDesc},
+		{grpc.PROTOCOL_VERSION, hdr.ProtocolVersion},
+		{grpc.UNIQUE_ID, src.uniqueID},
+		{grpc.SEQ_NUM, src.seqNum},
+		{grpc.USERNAME, hdr.Username},
+		{grpc.PASSWD, hdr.Password},
+		{grpc.TTL, src.ttl},
+		{grpc.PRODUCERGROUP, src.producerGroup},
+	}
+	for _, ext := range extensions {
+		// Extensions() hands back a map that is not the event's own storage
+		// (and may be nil), so writes must go through SetExtension.
+		result.SetExtension(ext.key, pick(ext.value, ext.key))
+	}
+
+	return &result, nil
+}
+
+// deserializeGrpcProtocol converts a gRPC SimpleMessage into a CloudEvent.
+// It returns an error when sm is nil, has no header, or its content cannot be
+// decoded with the message's content type.
+func deserializeGrpcProtocol(sm *pb.SimpleMessage) (*v2.Event, error) {
+	if sm == nil {
+		return nil, fmt.Errorf("cloudevents: nil simple message")
+	}
+	return newGrpcEvent(sm.Header, grpcEventSource{
+		content:       []byte(sm.Content),
+		contentType:   grpcContentType(sm.Properties),
+		topic:         sm.Topic,
+		uniqueID:      sm.UniqueId,
+		seqNum:        sm.SeqNum,
+		ttl:           sm.Ttl,
+		producerGroup: sm.ProducerGroup,
+	})
+}
+
+// buildBatchMessage converts every item of a gRPC BatchMessage into a
+// CloudEvent. Header, topic and producer group are shared by all items;
+// content, properties, unique id, sequence number and TTL come from each item.
+// The first item that fails to convert aborts the whole batch with an error.
+func buildBatchMessage(bm *pb.BatchMessage) ([]*v2.Event, error) {
+	if bm == nil {
+		return nil, fmt.Errorf("cloudevents: nil batch message")
+	}
+	msgs := make([]*v2.Event, 0, len(bm.MessageItem))
+	for i, item := range bm.MessageItem {
+		evt, err := newGrpcEvent(bm.Header, grpcEventSource{
+			content:       []byte(item.Content),
+			contentType:   grpcContentType(item.Properties),
+			topic:         bm.Topic,
+			uniqueID:      item.UniqueId,
+			seqNum:        item.SeqNum,
+			ttl:           item.Ttl,
+			producerGroup: bm.ProducerGroup,
+		})
+		if err != nil {
+			return nil, fmt.Errorf("cloudevents: batch message item %d: %w", i, err)
+		}
+		msgs = append(msgs, evt)
+	}
+	return msgs, nil
+}
+
+// buildSimpleMessage turns a CloudEvent into a gRPC SimpleMessageWrapper.
+//
+// The event is encoded with datacodec using its data content type and stored
+// as the message content. Header and message fields are read from the event's
+// extensions, each with a fixed fallback used when the extension is absent.
+// The subject becomes the topic, and every extension is also copied into the
+// message properties next to the content type. An encoding failure is
+// returned as the error.
+func buildSimpleMessage(evt *v2.Event) (*grpc.SimpleMessageWrapper, error) {
+	contentType := evt.DataContentType()
+	payload, err := datacodec.Encode(context.TODO(), contentType, evt)
+	if err != nil {
+		return nil, err
+	}
+
+	// Fetch the extensions once and resolve each field through one lookup helper.
+	ext := evt.Extensions()
+	lookup := func(key, fallback string) string {
+		return defaultIfNil(fallback, ext[key])
+	}
+
+	header := &pb.RequestHeader{
+		Env:             lookup(grpc.ENV, "env"),
+		Idc:             lookup(grpc.IDC, "idc"),
+		Ip:              lookup(grpc.IP, "127.0.0.1"),
+		Pid:             lookup(grpc.PID, "123"),
+		Sys:             lookup(grpc.SYS, "sys123"),
+		Username:        lookup(grpc.USERNAME, "user"),
+		Password:        lookup(grpc.PASSWD, "pass"),
+		Language:        lookup(grpc.LANGUAGE, "JAVA"),
+		ProtocolType:    lookup(grpc.PROTOCOL_TYPE, "protocol"),
+		ProtocolDesc:    lookup(grpc.PROTOCOL_DESC, "protocolDesc"),
+		ProtocolVersion: lookup(grpc.PROTOCOL_VERSION, "1.0"),
+	}
+
+	simple := &pb.SimpleMessage{
+		Header:        header,
+		Content:       string(payload),
+		ProducerGroup: lookup(grpc.PRODUCERGROUP, "producerGroup"),
+		SeqNum:        lookup(grpc.SEQ_NUM, ""),
+		UniqueId:      lookup(grpc.UNIQUE_ID, ""),
+		Topic:         evt.Subject(),
+		Ttl:           lookup(grpc.TTL, "3000"),
+		Properties:    map[string]string{grpc.CONTENT_TYPE: contentType},
+	}
+	// Extensions are written after the content type, so an extension with the
+	// same key takes precedence.
+	for key := range ext {
+		simple.Properties[key] = lookup(key, "")
+	}
+
+	wrapper := &grpc.SimpleMessageWrapper{SimpleMessage: simple}
+	return wrapper, nil
+}
diff --git a/eventmesh-server-go/plugin/protocol/protocol.go b/eventmesh-server-go/plugin/protocol/protocol.go
index 64023d13..e06b35c3 100644
--- a/eventmesh-server-go/plugin/protocol/protocol.go
+++ b/eventmesh-server-go/plugin/protocol/protocol.go
@@ -19,6 +19,10 @@ import (
cloudv2 "github.com/cloudevents/sdk-go/v2"
)
+var (
+ CloudEvents = "cloudevents"
+)
+
// Adapter protocol adapter
// transfer the message with given protocol and events message
type Adapter interface {
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@eventmesh.apache.org
For additional commands, e-mail: commits-help@eventmesh.apache.org