You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eventmesh.apache.org by ch...@apache.org on 2022/04/01 02:23:48 UTC
[incubator-eventmesh] branch go-sdk updated: Support golang sdk (#762)
This is an automated email from the ASF dual-hosted git repository.
chenguangsheng pushed a commit to branch go-sdk
in repository https://gitbox.apache.org/repos/asf/incubator-eventmesh.git
The following commit(s) were added to refs/heads/go-sdk by this push:
new dc6bd50 Support golang sdk (#762)
new c9f408f Merge pull request #803 from xiehongfeng100/go-sdk
dc6bd50 is described below
commit dc6bd50954685cfdc3e414b55092d51a0e65f363
Author: xiehongfeng100 <xi...@hotmail.com>
AuthorDate: Sun Mar 6 23:40:43 2022 +0800
Support golang sdk (#762)
---
eventmesh-sdk-go/common/constants.go | 33 ++++
eventmesh-sdk-go/common/protocol/http/body/body.go | 9 +
.../http/body/client/heartbeat_request_body.go | 57 ++++++
.../http/body/client/subscribe_request_body.go | 23 +++
.../common/protocol/http/common/client_type.go | 20 ++
.../protocol/http/common/eventmesh_ret_code.go | 12 ++
.../common/protocol/http/common/protocol_key.go | 70 +++++++
.../protocol/http/common/protocol_version.go | 25 +++
.../common/protocol/http/common/request_code.go | 80 ++++++++
.../http/message/send_message_request_body.go | 96 ++++++++++
eventmesh-sdk-go/common/protocol/message_type.go | 13 ++
.../common/protocol/subscription_item.go | 7 +
.../common/protocol/subscription_mode.go | 11 ++
.../common/protocol/subscription_type.go | 11 ++
.../common/protocol/tcp/codec/codec.go | 168 +++++++++++++++++
eventmesh-sdk-go/common/protocol/tcp/command.go | 202 +++++++++++++++++++++
eventmesh-sdk-go/common/protocol/tcp/header.go | 80 ++++++++
eventmesh-sdk-go/common/protocol/tcp/package.go | 10 +
eventmesh-sdk-go/common/protocol/tcp/user_agent.go | 23 +++
eventmesh-sdk-go/common/utils/json_utils.go | 29 +++
.../examples/http/async_pub_cloudevents.go | 52 ++++++
eventmesh-sdk-go/examples/http/sub_cloudevents.go | 99 ++++++++++
.../examples/tcp/async_pub_cloudevents.go | 45 +++++
eventmesh-sdk-go/go.mod | 8 +
eventmesh-sdk-go/go.sum | 39 ++++
eventmesh-sdk-go/http/abstract_http_client.go | 43 +++++
.../http/conf/eventmesh_http_client_config.go | 144 +++++++++++++++
.../http/consumer/eventmesh_http_consumer.go | 95 ++++++++++
eventmesh-sdk-go/http/eventmesh_ret_obj.go | 7 +
eventmesh-sdk-go/http/model/request_param.go | 53 ++++++
.../http/producer/cloudevent_producer.go | 85 +++++++++
.../http/producer/eventmesh_http_producer.go | 27 +++
.../http/producer/eventmesh_protocol_producer.go | 5 +
eventmesh-sdk-go/http/utils/http_utils.go | 41 +++++
eventmesh-sdk-go/main.go | 12 ++
eventmesh-sdk-go/tcp/cloudevent_tcp_client.go | 35 ++++
eventmesh-sdk-go/tcp/cloudevent_tcp_pub_client.go | 31 ++++
eventmesh-sdk-go/tcp/cloudevent_tcp_sub_client.go | 26 +++
eventmesh-sdk-go/tcp/common/eventmesh_common.go | 22 +++
eventmesh-sdk-go/tcp/common/request_context.go | 60 ++++++
.../tcp/conf/eventmesh_tcp_client_config.go | 37 ++++
eventmesh-sdk-go/tcp/eventmesh_tcp_client.go | 10 +
.../tcp/eventmesh_tcp_client_factory.go | 15 ++
eventmesh-sdk-go/tcp/eventmesh_tcp_pub_client.go | 9 +
eventmesh-sdk-go/tcp/eventmesh_tcp_sub_client.go | 9 +
eventmesh-sdk-go/tcp/tcp_client.go | 129 +++++++++++++
eventmesh-sdk-go/tcp/utils/message_utils.go | 45 +++++
47 files changed, 2162 insertions(+)
diff --git a/eventmesh-sdk-go/common/constants.go b/eventmesh-sdk-go/common/constants.go
new file mode 100644
index 0000000..bdc0859
--- /dev/null
+++ b/eventmesh-sdk-go/common/constants.go
@@ -0,0 +1,33 @@
+package common
+
+// Constants groups protocol-level string keys and numeric defaults used by
+// the SDK. It is a package-level variable of an anonymous struct type, so the
+// values are addressed as Constants.<FIELD>.
+var Constants = struct {
+ LANGUAGE_GO string
+ HTTP_PROTOCOL_PREFIX string
+ HTTPS_PROTOCOL_PREFIX string
+ PROTOCOL_TYPE string
+ PROTOCOL_VERSION string
+ PROTOCOL_DESC string
+ DEFAULT_HTTP_TIME_OUT int64
+ EVENTMESH_MESSAGE_CONST_TTL string
+
+ // Client heartbeat interval
+ HEARTBEAT int64
+
+ // Protocol type
+ CLOUD_EVENTS_PROTOCOL_NAME string
+ EM_MESSAGE_PROTOCOL_NAME string
+ OPEN_MESSAGE_PROTOCOL_NAME string
+}{
+ LANGUAGE_GO: "GO",
+ HTTP_PROTOCOL_PREFIX: "http://",
+ HTTPS_PROTOCOL_PREFIX: "https://",
+ PROTOCOL_TYPE: "protocoltype",
+ PROTOCOL_VERSION: "protocolversion",
+ PROTOCOL_DESC: "protocoldesc",
+ DEFAULT_HTTP_TIME_OUT: 15000, // presumably milliseconds — TODO confirm against the HTTP client
+ EVENTMESH_MESSAGE_CONST_TTL: "ttl",
+ HEARTBEAT: 30 * 1000, // presumably milliseconds (30s) — TODO confirm
+ CLOUD_EVENTS_PROTOCOL_NAME: "cloudevents",
+ EM_MESSAGE_PROTOCOL_NAME: "eventmeshmessage",
+ OPEN_MESSAGE_PROTOCOL_NAME: "openmessage",
+}
diff --git a/eventmesh-sdk-go/common/protocol/http/body/body.go b/eventmesh-sdk-go/common/protocol/http/body/body.go
new file mode 100644
index 0000000..b050820
--- /dev/null
+++ b/eventmesh-sdk-go/common/protocol/http/body/body.go
@@ -0,0 +1,9 @@
+package body
+
+// Body is the base type embedded by the concrete HTTP request bodies.
+type Body struct {
+ // ToMap is a generic key/value view of the body.
+ // NOTE(review): nothing visible here populates it — confirm intended use.
+ ToMap map[string]interface{}
+}
+
+// BuildBody is a placeholder: it ignores both arguments and always returns
+// nil. Callers must not dereference the result.
+func (b *Body) BuildBody(requestCode string, originalMap map[string]interface{}) *Body {
+ return nil
+}
diff --git a/eventmesh-sdk-go/common/protocol/http/body/client/heartbeat_request_body.go b/eventmesh-sdk-go/common/protocol/http/body/client/heartbeat_request_body.go
new file mode 100644
index 0000000..2ae5ab3
--- /dev/null
+++ b/eventmesh-sdk-go/common/protocol/http/body/client/heartbeat_request_body.go
@@ -0,0 +1,57 @@
+package client
+
+import (
+ "eventmesh/common/protocol/http/body"
+)
+
+// HeartbeatRequestBodyKey holds the field names used as keys of a heartbeat
+// request body.
+var HeartbeatRequestBodyKey = struct {
+ CLIENTTYPE string
+ CONSUMERGROUP string
+ HEARTBEATENTITIES string
+}{
+ CLIENTTYPE: "clientType",
+ HEARTBEATENTITIES: "heartbeatEntities",
+ CONSUMERGROUP: "consumerGroup",
+}
+
+// HeartbeatEntity is one JSON-serializable entry of a heartbeat: a topic, a
+// URL and the service/instance identifiers.
+// NOTE(review): Url is presumably the callback URL for the topic — confirm.
+type HeartbeatEntity struct {
+ Topic string `json:"topic"`
+ Url string `json:"url"`
+ ServiceId string `json:"serviceId"`
+ InstanceId string `json:"instanceId"`
+}
+
+// HeartbeatRequestBody is the heartbeat request payload. It embeds body.Body
+// and keeps its own fields unexported, exposing them through accessors.
+type HeartbeatRequestBody struct {
+	body.Body
+	consumerGroup     string
+	clientType        string
+	heartbeatEntities string
+}
+
+// ConsumerGroup returns the stored consumer group.
+func (hb *HeartbeatRequestBody) ConsumerGroup() string { return hb.consumerGroup }
+
+// SetConsumerGroup replaces the stored consumer group.
+func (hb *HeartbeatRequestBody) SetConsumerGroup(consumerGroup string) {
+	hb.consumerGroup = consumerGroup
+}
+
+// ClientType returns the stored client type.
+func (hb *HeartbeatRequestBody) ClientType() string { return hb.clientType }
+
+// SetClientType replaces the stored client type.
+func (hb *HeartbeatRequestBody) SetClientType(clientType string) {
+	hb.clientType = clientType
+}
+
+// HeartbeatEntities returns the stored heartbeat entities string.
+func (hb *HeartbeatRequestBody) HeartbeatEntities() string { return hb.heartbeatEntities }
+
+// SetHeartbeatEntities replaces the stored heartbeat entities string.
+func (hb *HeartbeatRequestBody) SetHeartbeatEntities(heartbeatEntities string) {
+	hb.heartbeatEntities = heartbeatEntities
+}
+
+// BuildBody is a placeholder that ignores bodyParam and always returns nil.
+// It shadows the BuildBody method promoted from the embedded body.Body.
+func (hb *HeartbeatRequestBody) BuildBody(bodyParam map[string]interface{}) *HeartbeatRequestBody {
+	return nil
+}
diff --git a/eventmesh-sdk-go/common/protocol/http/body/client/subscribe_request_body.go b/eventmesh-sdk-go/common/protocol/http/body/client/subscribe_request_body.go
new file mode 100644
index 0000000..07d797d
--- /dev/null
+++ b/eventmesh-sdk-go/common/protocol/http/body/client/subscribe_request_body.go
@@ -0,0 +1,23 @@
+package client
+
+import (
+ "eventmesh/common/protocol"
+ "eventmesh/common/protocol/http/body"
+)
+
+// SubscribeRequestBodyKey holds the field names used as keys of a subscribe
+// request body.
+var SubscribeRequestBodyKey = struct {
+ TOPIC string
+ URL string
+ CONSUMERGROUP string
+}{
+ TOPIC: "topic",
+ URL: "url",
+ CONSUMERGROUP: "consumerGroup",
+}
+
+// SubscribeRequestBody is the subscribe request payload: the subscription
+// items, a URL and the consumer group, on top of the embedded body.Body.
+// NOTE(review): all fields are unexported and this file defines no accessors,
+// so the fields cannot be set from outside the package.
+type SubscribeRequestBody struct {
+ body.Body
+ topics []protocol.SubscriptionItem
+ url string
+ consumerGroup string
+}
diff --git a/eventmesh-sdk-go/common/protocol/http/common/client_type.go b/eventmesh-sdk-go/common/protocol/http/common/client_type.go
new file mode 100644
index 0000000..b72ba80
--- /dev/null
+++ b/eventmesh-sdk-go/common/protocol/http/common/client_type.go
@@ -0,0 +1,20 @@
+package common
+
+// ClientType pairs a numeric client type code with a human-readable
+// description.
+type ClientType struct {
+ Type int `json:"type"`
+ Desc string `json:"desc"`
+}
+
+// DefaultClientType enumerates the known client types: PUB (1) for
+// publishing clients and SUB (2) for subscribing clients.
+var DefaultClientType = struct {
+ PUB ClientType
+ SUB ClientType
+}{
+ PUB: ClientType{
+ Type: 1,
+ Desc: "Client for publishing",
+ },
+ SUB: ClientType{
+ Type: 2,
+ Desc: "Client for subscribing",
+ },
+}
diff --git a/eventmesh-sdk-go/common/protocol/http/common/eventmesh_ret_code.go b/eventmesh-sdk-go/common/protocol/http/common/eventmesh_ret_code.go
new file mode 100644
index 0000000..3f26f82
--- /dev/null
+++ b/eventmesh-sdk-go/common/protocol/http/common/eventmesh_ret_code.go
@@ -0,0 +1,12 @@
+package common
+
+// EventMeshRetCode pairs a numeric return code with its message.
+type EventMeshRetCode struct {
+ RetCode int `json:"retCode"`
+ ErrMsg string `json:"errMsg"`
+}
+
+// DefaultEventMeshRetCode enumerates the known return codes; only SUCCESS
+// (code 0) is defined so far.
+var DefaultEventMeshRetCode = struct {
+ SUCCESS EventMeshRetCode
+}{
+ SUCCESS: EventMeshRetCode{RetCode: 0, ErrMsg: "success"},
+}
diff --git a/eventmesh-sdk-go/common/protocol/http/common/protocol_key.go b/eventmesh-sdk-go/common/protocol/http/common/protocol_key.go
new file mode 100644
index 0000000..6d67de8
--- /dev/null
+++ b/eventmesh-sdk-go/common/protocol/http/common/protocol_key.go
@@ -0,0 +1,70 @@
+package common
+
+// ClientInstanceKey holds the key names that describe the requesting client.
+type ClientInstanceKey struct {
+ //Protocol layer requester description
+ ENV string
+ IDC string
+ SYS string
+ PID string
+ IP string
+ USERNAME string
+ PASSWORD string
+ BIZSEQNO string
+ UNIQUEID string
+}
+
+// EventMeshInstanceKey holds the key names that describe the EventMesh
+// instance.
+type EventMeshInstanceKey struct {
+ //Protocol layer EventMesh description
+ EVENTMESHCLUSTER string
+ EVENTMESHIP string
+ EVENTMESHENV string
+ EVENTMESHIDC string
+}
+
+// ProtocolKey collects the key names of the HTTP protocol: request metadata,
+// client and EventMesh instance descriptions, and response fields.
+var ProtocolKey = struct {
+ REQUEST_CODE string
+ LANGUAGE string
+ VERSION string
+ PROTOCOL_TYPE string
+ PROTOCOL_VERSION string
+ PROTOCOL_DESC string
+
+ ClientInstanceKey ClientInstanceKey
+
+ EventMeshInstanceKey EventMeshInstanceKey
+
+ //return of CLIENT <-> EventMesh
+ RETCODE string
+ RETMSG string
+ RESTIME string
+}{
+ REQUEST_CODE: "code",
+ LANGUAGE: "language",
+ VERSION: "version",
+ PROTOCOL_TYPE: "protocoltype",
+ PROTOCOL_VERSION: "protocolversion",
+ PROTOCOL_DESC: "protocoldesc",
+
+ ClientInstanceKey: ClientInstanceKey{
+ ENV: "env",
+ IDC: "idc",
+ SYS: "sys",
+ PID: "pid",
+ IP: "ip",
+ USERNAME: "username",
+ PASSWORD: "passwd", // note: the key is "passwd", not "password"
+ BIZSEQNO: "bizseqno",
+ UNIQUEID: "uniqueid",
+ },
+
+ EventMeshInstanceKey: EventMeshInstanceKey{
+ EVENTMESHCLUSTER: "eventmeshcluster",
+ EVENTMESHIP: "eventmeship",
+ EVENTMESHENV: "eventmeshenv",
+ EVENTMESHIDC: "eventmeshidc",
+ },
+
+ RETCODE: "retCode",
+ RETMSG: "retMsg",
+ RESTIME: "resTime",
+}
diff --git a/eventmesh-sdk-go/common/protocol/http/common/protocol_version.go b/eventmesh-sdk-go/common/protocol/http/common/protocol_version.go
new file mode 100644
index 0000000..f7fe6a5
--- /dev/null
+++ b/eventmesh-sdk-go/common/protocol/http/common/protocol_version.go
@@ -0,0 +1,25 @@
+package common
+
+// ProtocolVersion wraps a protocol version string such as "1.0".
+type ProtocolVersion struct {
+	version string
+}
+
+// Version returns the wrapped version string.
+func (pv *ProtocolVersion) Version() string { return pv.version }
+
+// SetVersion replaces the wrapped version string.
+func (pv *ProtocolVersion) SetVersion(version string) { pv.version = version }
+
+// DefaultProtocolVersion enumerates the known protocol versions: V1 ("1.0")
+// and V2 ("2.0").
+var DefaultProtocolVersion = struct {
+ V1 ProtocolVersion
+ V2 ProtocolVersion
+}{
+ V1: ProtocolVersion{
+ version: "1.0",
+ },
+ V2: ProtocolVersion{
+ version: "2.0",
+ },
+}
diff --git a/eventmesh-sdk-go/common/protocol/http/common/request_code.go b/eventmesh-sdk-go/common/protocol/http/common/request_code.go
new file mode 100644
index 0000000..6ea3a66
--- /dev/null
+++ b/eventmesh-sdk-go/common/protocol/http/common/request_code.go
@@ -0,0 +1,80 @@
+package common
+
+// RequestCode pairs a numeric HTTP request code with its description.
+type RequestCode struct {
+ RequestCode int `json:"requestCode"`
+ Desc string `json:"desc"`
+}
+
+// DefaultRequestCode enumerates the request codes known to the SDK.
+// The codes fall into ranges: 1xx message sending/pushing, 2xx client
+// lifecycle and subscription, 3xx reply, 6xx admin.
+var DefaultRequestCode = struct {
+ MSG_BATCH_SEND RequestCode
+ MSG_BATCH_SEND_V2 RequestCode
+ MSG_SEND_SYNC RequestCode
+ MSG_SEND_ASYNC RequestCode
+ HTTP_PUSH_CLIENT_ASYNC RequestCode
+ HTTP_PUSH_CLIENT_SYNC RequestCode
+ REGISTER RequestCode
+ UNREGISTER RequestCode
+ HEARTBEAT RequestCode
+ SUBSCRIBE RequestCode
+ UNSUBSCRIBE RequestCode
+ REPLY_MESSAGE RequestCode
+ ADMIN_METRICS RequestCode
+ ADMIN_SHUTDOWN RequestCode
+}{
+ MSG_BATCH_SEND: RequestCode{
+ RequestCode: 102,
+ Desc: "SEND BATCH MSG",
+ },
+ MSG_BATCH_SEND_V2: RequestCode{
+ RequestCode: 107,
+ Desc: "SEND BATCH MSG V2",
+ },
+ MSG_SEND_SYNC: RequestCode{
+ RequestCode: 101,
+ Desc: "SEND SINGLE MSG SYNC",
+ },
+ MSG_SEND_ASYNC: RequestCode{
+ RequestCode: 104,
+ Desc: "SEND SINGLE MSG ASYNC",
+ },
+ // The async and sync push codes below share the same description text.
+ HTTP_PUSH_CLIENT_ASYNC: RequestCode{
+ RequestCode: 105,
+ Desc: "PUSH CLIENT BY HTTP POST",
+ },
+ HTTP_PUSH_CLIENT_SYNC: RequestCode{
+ RequestCode: 106,
+ Desc: "PUSH CLIENT BY HTTP POST",
+ },
+ REGISTER: RequestCode{
+ RequestCode: 201,
+ Desc: "REGISTER",
+ },
+ UNREGISTER: RequestCode{
+ RequestCode: 202,
+ Desc: "UNREGISTER",
+ },
+ HEARTBEAT: RequestCode{
+ RequestCode: 203,
+ Desc: "HEARTBEAT",
+ },
+ SUBSCRIBE: RequestCode{
+ RequestCode: 206,
+ Desc: "SUBSCRIBE",
+ },
+ UNSUBSCRIBE: RequestCode{
+ RequestCode: 207,
+ Desc: "UNSUBSCRIBE",
+ },
+ REPLY_MESSAGE: RequestCode{
+ RequestCode: 301,
+ Desc: "REPLY MESSAGE",
+ },
+ ADMIN_METRICS: RequestCode{
+ RequestCode: 603,
+ Desc: "ADMIN METRICS",
+ },
+ ADMIN_SHUTDOWN: RequestCode{
+ RequestCode: 601,
+ Desc: "ADMIN SHUTDOWN",
+ },
+}
diff --git a/eventmesh-sdk-go/common/protocol/http/message/send_message_request_body.go b/eventmesh-sdk-go/common/protocol/http/message/send_message_request_body.go
new file mode 100644
index 0000000..0fc0744
--- /dev/null
+++ b/eventmesh-sdk-go/common/protocol/http/message/send_message_request_body.go
@@ -0,0 +1,96 @@
+package message
+
+// SendMessageRequestBody is the payload of a send-message request. Fields are
+// unexported; each has a getter and a setter below.
+type SendMessageRequestBody struct {
+	topic         string
+	bizSeqNo      string
+	uniqueId      string
+	ttl           string
+	content       string
+	tag           string
+	extFields     map[string]string
+	producerGroup string
+}
+
+// Topic returns the message topic.
+func (body *SendMessageRequestBody) Topic() string { return body.topic }
+
+// SetTopic replaces the message topic.
+func (body *SendMessageRequestBody) SetTopic(topic string) { body.topic = topic }
+
+// BizSeqNo returns the business sequence number.
+func (body *SendMessageRequestBody) BizSeqNo() string { return body.bizSeqNo }
+
+// SetBizSeqNo replaces the business sequence number.
+func (body *SendMessageRequestBody) SetBizSeqNo(bizSeqNo string) { body.bizSeqNo = bizSeqNo }
+
+// UniqueId returns the unique message id.
+func (body *SendMessageRequestBody) UniqueId() string { return body.uniqueId }
+
+// SetUniqueId replaces the unique message id.
+func (body *SendMessageRequestBody) SetUniqueId(uniqueId string) { body.uniqueId = uniqueId }
+
+// Ttl returns the time-to-live value as stored (a string).
+func (body *SendMessageRequestBody) Ttl() string { return body.ttl }
+
+// SetTtl replaces the time-to-live value.
+func (body *SendMessageRequestBody) SetTtl(ttl string) { body.ttl = ttl }
+
+// Content returns the message content.
+func (body *SendMessageRequestBody) Content() string { return body.content }
+
+// SetContent replaces the message content.
+func (body *SendMessageRequestBody) SetContent(content string) { body.content = content }
+
+// Tag returns the message tag.
+func (body *SendMessageRequestBody) Tag() string { return body.tag }
+
+// SetTag replaces the message tag.
+func (body *SendMessageRequestBody) SetTag(tag string) { body.tag = tag }
+
+// ExtFields returns the extension fields map itself (not a copy).
+func (body *SendMessageRequestBody) ExtFields() map[string]string { return body.extFields }
+
+// SetExtFields stores the given extension fields map without copying it.
+func (body *SendMessageRequestBody) SetExtFields(extFields map[string]string) {
+	body.extFields = extFields
+}
+
+// ProducerGroup returns the producer group.
+func (body *SendMessageRequestBody) ProducerGroup() string { return body.producerGroup }
+
+// SetProducerGroup replaces the producer group.
+func (body *SendMessageRequestBody) SetProducerGroup(producerGroup string) {
+	body.producerGroup = producerGroup
+}
+
+// SendMessageRequestBodyKey holds the field names used as keys of a
+// send-message request body. Note the mixed casing: most keys are lower-case
+// while EXTFIELDS is camel-case ("extFields").
+var SendMessageRequestBodyKey = struct {
+ TOPIC string
+ BIZSEQNO string
+ UNIQUEID string
+ CONTENT string
+ TTL string
+ TAG string
+ EXTFIELDS string
+ PRODUCERGROUP string
+}{
+ TOPIC: "topic",
+ BIZSEQNO: "bizseqno",
+ UNIQUEID: "uniqueid",
+ CONTENT: "content",
+ TTL: "ttl",
+ TAG: "tag",
+ EXTFIELDS: "extFields",
+ PRODUCERGROUP: "producergroup",
+}
diff --git a/eventmesh-sdk-go/common/protocol/message_type.go b/eventmesh-sdk-go/common/protocol/message_type.go
new file mode 100644
index 0000000..36f87e8
--- /dev/null
+++ b/eventmesh-sdk-go/common/protocol/message_type.go
@@ -0,0 +1,13 @@
+package protocol
+
+// MessageType names a message format as a string.
+type MessageType string
+
+// DefaultMessageType enumerates the known message types.
+var DefaultMessageType = struct {
+ CloudEvent MessageType
+ OpenMessage MessageType
+ EventMeshMessage MessageType
+}{
+ CloudEvent: "CloudEvent",
+ OpenMessage: "OpenMessage",
+ EventMeshMessage: "EventMeshMessage",
+}
diff --git a/eventmesh-sdk-go/common/protocol/subscription_item.go b/eventmesh-sdk-go/common/protocol/subscription_item.go
new file mode 100644
index 0000000..3cfa3c0
--- /dev/null
+++ b/eventmesh-sdk-go/common/protocol/subscription_item.go
@@ -0,0 +1,7 @@
+package protocol
+
+// SubscriptionItem describes one subscription: the topic together with its
+// subscription mode and type. It is JSON-serializable.
+type SubscriptionItem struct {
+ Topic string `json:"topic"`
+ Mode SubscriptionMode `json:"mode"`
+ Type SubscriptionType `json:"type"`
+}
diff --git a/eventmesh-sdk-go/common/protocol/subscription_mode.go b/eventmesh-sdk-go/common/protocol/subscription_mode.go
new file mode 100644
index 0000000..1cf6961
--- /dev/null
+++ b/eventmesh-sdk-go/common/protocol/subscription_mode.go
@@ -0,0 +1,11 @@
+package protocol
+
+// SubscriptionMode names a subscription mode as a string.
+type SubscriptionMode string
+
+// DefaultSubscriptionMode enumerates the known subscription modes.
+var DefaultSubscriptionMode = struct {
+ BROADCASTING SubscriptionMode
+ CLUSTERING SubscriptionMode
+}{
+ BROADCASTING: "BROADCASTING",
+ CLUSTERING: "CLUSTERING",
+}
diff --git a/eventmesh-sdk-go/common/protocol/subscription_type.go b/eventmesh-sdk-go/common/protocol/subscription_type.go
new file mode 100644
index 0000000..3d09844
--- /dev/null
+++ b/eventmesh-sdk-go/common/protocol/subscription_type.go
@@ -0,0 +1,11 @@
+package protocol
+
+// SubscriptionType names a subscription type as a string.
+type SubscriptionType string
+
+// DefaultSubscriptionType enumerates the known subscription types.
+var DefaultSubscriptionType = struct {
+ SYNC SubscriptionType
+ ASYNC SubscriptionType
+}{
+ SYNC: "SYNC",
+ ASYNC: "ASYNC",
+}
diff --git a/eventmesh-sdk-go/common/protocol/tcp/codec/codec.go b/eventmesh-sdk-go/common/protocol/tcp/codec/codec.go
new file mode 100644
index 0000000..396ef7a
--- /dev/null
+++ b/eventmesh-sdk-go/common/protocol/tcp/codec/codec.go
@@ -0,0 +1,168 @@
+package codec
+
+import (
+ "bytes"
+ "encoding/binary"
+ gcommon "eventmesh/common"
+ "eventmesh/common/protocol/tcp"
+ gutils "eventmesh/common/utils"
+ "eventmesh/tcp/common"
+ "log"
+)
+
+// Wire-format constants of the TCP frame.
+const (
+ MAGIC = "EventMesh" // marker written at the start of every frame
+ VERSION = "0000" // version string written right after MAGIC
+ LENGTH_SIZE = 4 // size in bytes of each big-endian length field
+)
+
+// EncodePackage serializes message into one TCP frame laid out as
+//
+//	MAGIC | VERSION | length (uint32 BE) | header length (uint32 BE) | header | body
+//
+// where length counts the two length fields plus the header and body bytes.
+// The header is always JSON. The body is written verbatim when the header's
+// protocol type is CloudEvents and is JSON-encoded otherwise.
+//
+// A CloudEvents body may be a []byte or a string; nil yields an empty body.
+// Any other type is JSON-encoded instead of panicking on a failed type
+// assertion, which is what the previous unchecked `.([]byte)` did.
+func EncodePackage(message tcp.Package) *bytes.Buffer {
+	header := message.Header
+	headerData := header.Marshal()
+
+	var bodyData []byte
+	if header.GetProperty(gcommon.Constants.PROTOCOL_TYPE) != common.EventMeshCommon.CLOUD_EVENTS_PROTOCOL_NAME {
+		bodyData = gutils.MarshalJsonBytes(message.Body)
+	} else {
+		switch body := message.Body.(type) {
+		case []byte:
+			bodyData = body
+		case string:
+			bodyData = []byte(body)
+		case nil:
+			// No body: leave bodyData empty.
+		default:
+			bodyData = gutils.MarshalJsonBytes(body)
+		}
+	}
+
+	headerLen := len(headerData)
+	bodyLen := len(bodyData)
+
+	length := LENGTH_SIZE + LENGTH_SIZE + headerLen + bodyLen
+
+	var out bytes.Buffer
+	out.WriteString(MAGIC)
+	out.WriteString(VERSION)
+
+	lengthBytes := make([]byte, LENGTH_SIZE)
+	binary.BigEndian.PutUint32(lengthBytes, uint32(length))
+
+	headerLenBytes := make([]byte, LENGTH_SIZE)
+	binary.BigEndian.PutUint32(headerLenBytes, uint32(headerLen))
+
+	out.Write(lengthBytes)
+	out.Write(headerLenBytes)
+	out.Write(headerData)
+	out.Write(bodyData)
+
+	return &out
+}
+
+// DecodePackage reads one frame from in, in the layout produced by
+// EncodePackage, and returns the decoded package.
+//
+// The body length is derived from the two length fields using signed 64-bit
+// arithmetic. In uint32 a malformed frame with length < headerLen+8 would wrap
+// around to a value near 4 GiB and make the body parser allocate that much.
+// Such a frame is now logged and decoded with an empty body.
+func DecodePackage(in *bytes.Buffer) tcp.Package {
+	flagBytes := parseFlag(in)
+	versionBytes := parseVersion(in)
+	validateFlag(flagBytes, versionBytes)
+
+	length := parseLength(in)
+	headerLen := parseLength(in)
+
+	bodyLen := int64(length) - int64(headerLen) - 2*LENGTH_SIZE
+	if bodyLen < 0 {
+		log.Printf("invalid frame: length %d is too small for header length %d\n", length, headerLen)
+		bodyLen = 0
+	}
+
+	header := parseHeader(in, int(headerLen))
+	body := parseBody(in, header, int(bodyLen))
+	return tcp.Package{Header: header, Body: body}
+}
+
+// parseFlag reads len(MAGIC) bytes from in and returns them, or nil when the
+// read reports an error. The bytes are returned unvalidated.
+func parseFlag(in *bytes.Buffer) []byte {
+	flagBytes := make([]byte, len(MAGIC))
+	if n, err := in.Read(flagBytes); err == nil {
+		log.Printf("read %d bytes (flag) \n", n)
+		return flagBytes
+	}
+	return nil
+}
+
+// parseVersion reads len(VERSION) bytes from in and returns them, or nil when
+// the read reports an error. The bytes are returned unvalidated.
+func parseVersion(in *bytes.Buffer) []byte {
+	verBytes := make([]byte, len(VERSION))
+	if n, err := in.Read(verBytes); err == nil {
+		log.Printf("read %d bytes (version) \n", n)
+		return verBytes
+	}
+	return nil
+}
+
+func parseLength(in *bytes.Buffer) uint32 {
+ lenBytes := make([]byte, 4)
+ n, err := in.Read(lenBytes)
+ if err != nil {
+ log.Fatal("Failed to parse length")
+ }
+ log.Printf("read %d bytes (length) \n", n)
+ return binary.BigEndian.Uint32(lenBytes)
+}
+
+// parseHeader consumes headerLen bytes from in and decodes them as a JSON
+// header.
+//
+// headerLen comes from the wire and is checked against the unread bytes
+// before use. This detects truncated frames, which bytes.Buffer.Read would
+// silently short-read, and avoids allocating a buffer sized by an untrusted
+// length. A bad length ends in log.Fatal like the existing error path.
+func parseHeader(in *bytes.Buffer, headerLen int) tcp.Header {
+	if headerLen < 0 || headerLen > in.Len() {
+		log.Fatal("Failed to parse header")
+	}
+
+	// The slice aliases the buffer and is only used for the decode just below.
+	headerBytes := in.Next(headerLen)
+	log.Printf("read %d bytes (header) \n", len(headerBytes))
+
+	var header tcp.Header
+	return header.Unmarshal(headerBytes)
+}
+
+// parseBody consumes bodyLen bytes from in and hands them, as a string, to
+// deserializeBody together with the already decoded header. It returns nil
+// when bodyLen is not positive.
+//
+// bodyLen comes from the wire and is checked against the unread bytes before
+// use. This detects truncated frames, which bytes.Buffer.Read would silently
+// short-read, and avoids allocating a buffer sized by an untrusted length.
+// A bad length ends in log.Fatal like the existing error path.
+func parseBody(in *bytes.Buffer, header tcp.Header, bodyLen int) interface{} {
+	if bodyLen <= 0 {
+		return nil
+	}
+	if bodyLen > in.Len() {
+		log.Fatal("Failed to parse body")
+	}
+
+	bodyBytes := in.Next(bodyLen)
+	log.Printf("read %d bytes (body) \n", len(bodyBytes))
+
+	// string(...) copies, so the result does not alias the buffer.
+	return deserializeBody(string(bodyBytes), header)
+}
+
+// deserializeBody converts the raw body string into the value appropriate for
+// the header's command:
+//
+//   - HELLO_REQUEST, RECOMMEND_REQUEST: the JSON body decoded into a
+//     tcp.UserAgent;
+//   - message and ACK commands: the body string unchanged (it is decoded later
+//     by the protocol layer);
+//   - SUBSCRIBE_REQUEST, UNSUBSCRIBE_REQUEST, REDIRECT_TO_CLIENT: nil, because
+//     their body types are not implemented yet;
+//   - anything else: nil, after logging the unknown command.
+//
+// Go switch cases do not fall through. The previous version listed each
+// command as its own empty `case`, Java style, so only the last command of
+// every group was handled and all the others returned nil. Commands sharing a
+// result are now listed in a single case.
+func deserializeBody(bodyStr string, header tcp.Header) interface{} {
+	switch command := header.Cmd; command {
+	case tcp.DefaultCommand.HELLO_REQUEST,
+		tcp.DefaultCommand.RECOMMEND_REQUEST:
+		var useAgent tcp.UserAgent
+		gutils.UnMarshalJsonString(bodyStr, &useAgent)
+		return useAgent
+
+	case tcp.DefaultCommand.SUBSCRIBE_REQUEST,
+		tcp.DefaultCommand.UNSUBSCRIBE_REQUEST:
+		// TODO: decode into a subscription type once one exists in this SDK.
+		return nil
+
+	case tcp.DefaultCommand.REQUEST_TO_SERVER,
+		tcp.DefaultCommand.RESPONSE_TO_SERVER,
+		tcp.DefaultCommand.ASYNC_MESSAGE_TO_SERVER,
+		tcp.DefaultCommand.BROADCAST_MESSAGE_TO_SERVER,
+		tcp.DefaultCommand.REQUEST_TO_CLIENT,
+		tcp.DefaultCommand.RESPONSE_TO_CLIENT,
+		tcp.DefaultCommand.ASYNC_MESSAGE_TO_CLIENT,
+		tcp.DefaultCommand.BROADCAST_MESSAGE_TO_CLIENT,
+		tcp.DefaultCommand.REQUEST_TO_CLIENT_ACK,
+		tcp.DefaultCommand.RESPONSE_TO_CLIENT_ACK,
+		tcp.DefaultCommand.ASYNC_MESSAGE_TO_CLIENT_ACK,
+		tcp.DefaultCommand.BROADCAST_MESSAGE_TO_CLIENT_ACK:
+		// The message string will be deserialized by protocol plugin, if the
+		// event is cloudevents, the body is just a string.
+		return bodyStr
+
+	case tcp.DefaultCommand.REDIRECT_TO_CLIENT:
+		// TODO: decode into a redirect-info type once one exists in this SDK.
+		return nil
+
+	default:
+		// FIXME improve codes
+		log.Printf("Invalidate TCP command: %s\n", command)
+		return nil
+	}
+}
+
+// validateFlag is a placeholder: it currently accepts any flag and version
+// bytes without checking them against MAGIC and VERSION.
+func validateFlag(flagBytes, versionBytes []byte) {
+ // TODO add check
+}
diff --git a/eventmesh-sdk-go/common/protocol/tcp/command.go b/eventmesh-sdk-go/common/protocol/tcp/command.go
new file mode 100644
index 0000000..5d0c668
--- /dev/null
+++ b/eventmesh-sdk-go/common/protocol/tcp/command.go
@@ -0,0 +1,202 @@
+package tcp
+
+// Command is the name of a TCP protocol command as it appears in a header.
+type Command string
+
+// DefaultCommand enumerates the TCP commands. Each value is the command's own
+// name as a string.
+var DefaultCommand = struct {
+ //heartbeat
+ HEARTBEAT_REQUEST Command //client send heartbeat packet to server
+ HEARTBEAT_RESPONSE Command //server response heartbeat packet of client
+
+ //handshake
+ HELLO_REQUEST Command //client send handshake request to server
+ HELLO_RESPONSE Command //server response handshake request of client
+
+ //disconnection
+ CLIENT_GOODBYE_REQUEST Command //Notify server when client actively disconnects
+ CLIENT_GOODBYE_RESPONSE Command //Server replies to client's active disconnection notification
+ SERVER_GOODBYE_REQUEST Command //Notify client when server actively disconnects
+ SERVER_GOODBYE_RESPONSE Command //Client replies to server's active disconnection notification
+
+ //subscription management
+ SUBSCRIBE_REQUEST Command //Subscription request sent by client to server
+ SUBSCRIBE_RESPONSE Command //Server replies to client's subscription request
+ UNSUBSCRIBE_REQUEST Command //Unsubscribe request from client to server
+ UNSUBSCRIBE_RESPONSE Command //Server replies to client's unsubscribe request
+
+ //listening
+ LISTEN_REQUEST Command //Request from client to server to start topic listening
+ LISTEN_RESPONSE Command //The server replies to the client's listening request
+
+ //RR (request/response)
+ REQUEST_TO_SERVER Command //The client sends the RR request to the server
+ REQUEST_TO_CLIENT Command //The server pushes the RR request to the client
+ REQUEST_TO_CLIENT_ACK Command //After receiving RR request, the client sends ACK to the server
+ RESPONSE_TO_SERVER Command //The client sends the RR packet back to the server
+ RESPONSE_TO_CLIENT Command //The server pushes the RR packet back to the client
+ RESPONSE_TO_CLIENT_ACK Command //After receiving the return packet, the client sends ACK to the server
+
+ //Asynchronous events
+ ASYNC_MESSAGE_TO_SERVER Command //The client sends asynchronous events to the server
+ ASYNC_MESSAGE_TO_SERVER_ACK Command //After receiving the asynchronous event, the server sends ack to the client
+ ASYNC_MESSAGE_TO_CLIENT Command //The server pushes asynchronous events to the client
+ ASYNC_MESSAGE_TO_CLIENT_ACK Command //After the client receives the asynchronous event, the ACK is sent to the server
+
+ //broadcast
+ BROADCAST_MESSAGE_TO_SERVER Command //The client sends the broadcast message to the server
+ BROADCAST_MESSAGE_TO_SERVER_ACK Command //After receiving the broadcast message, the server sends ACK to the client
+ BROADCAST_MESSAGE_TO_CLIENT Command //The server pushes the broadcast message to the client
+ BROADCAST_MESSAGE_TO_CLIENT_ACK Command //After the client receives the broadcast message, the ACK is sent to the server
+
+ //Log reporting
+ SYS_LOG_TO_LOGSERVER Command //Business log reporting
+
+ //RMB tracking log reporting
+ TRACE_LOG_TO_LOGSERVER Command //RMB tracking log reporting
+
+ //Redirecting instruction
+ REDIRECT_TO_CLIENT Command //The server pushes the redirection instruction to the client
+
+ //service register
+ REGISTER_REQUEST Command //Client sends registration request to server
+ REGISTER_RESPONSE Command //The server sends the registration result to the client
+
+ //service unregister
+ UNREGISTER_REQUEST Command //The client sends a deregistration request to the server
+ UNREGISTER_RESPONSE Command //The server sends the deregistration result to the client
+
+ //The client asks which EventMesh to recommend
+ RECOMMEND_REQUEST Command //Client sends recommendation request to server
+ RECOMMEND_RESPONSE Command //The server sends the recommendation result to the client
+}{
+ //heartbeat
+ HEARTBEAT_REQUEST: "HEARTBEAT_REQUEST",
+ HEARTBEAT_RESPONSE: "HEARTBEAT_RESPONSE",
+
+ //handshake
+ HELLO_REQUEST: "HELLO_REQUEST",
+ HELLO_RESPONSE: "HELLO_RESPONSE",
+
+ //disconnection
+ CLIENT_GOODBYE_REQUEST: "CLIENT_GOODBYE_REQUEST",
+ CLIENT_GOODBYE_RESPONSE: "CLIENT_GOODBYE_RESPONSE",
+ SERVER_GOODBYE_REQUEST: "SERVER_GOODBYE_REQUEST",
+ SERVER_GOODBYE_RESPONSE: "SERVER_GOODBYE_RESPONSE",
+
+ //subscription management
+ SUBSCRIBE_REQUEST: "SUBSCRIBE_REQUEST",
+ SUBSCRIBE_RESPONSE: "SUBSCRIBE_RESPONSE",
+ UNSUBSCRIBE_REQUEST: "UNSUBSCRIBE_REQUEST",
+ UNSUBSCRIBE_RESPONSE: "UNSUBSCRIBE_RESPONSE",
+
+ //listening
+ LISTEN_REQUEST: "LISTEN_REQUEST",
+ LISTEN_RESPONSE: "LISTEN_RESPONSE",
+
+ //RR (request/response)
+ REQUEST_TO_SERVER: "REQUEST_TO_SERVER",
+ REQUEST_TO_CLIENT: "REQUEST_TO_CLIENT",
+ REQUEST_TO_CLIENT_ACK: "REQUEST_TO_CLIENT_ACK",
+ RESPONSE_TO_SERVER: "RESPONSE_TO_SERVER",
+ RESPONSE_TO_CLIENT: "RESPONSE_TO_CLIENT",
+ RESPONSE_TO_CLIENT_ACK: "RESPONSE_TO_CLIENT_ACK",
+
+ //Asynchronous events
+ ASYNC_MESSAGE_TO_SERVER: "ASYNC_MESSAGE_TO_SERVER",
+ ASYNC_MESSAGE_TO_SERVER_ACK: "ASYNC_MESSAGE_TO_SERVER_ACK",
+ ASYNC_MESSAGE_TO_CLIENT: "ASYNC_MESSAGE_TO_CLIENT",
+ ASYNC_MESSAGE_TO_CLIENT_ACK: "ASYNC_MESSAGE_TO_CLIENT_ACK",
+
+ //broadcast
+ BROADCAST_MESSAGE_TO_SERVER: "BROADCAST_MESSAGE_TO_SERVER",
+ BROADCAST_MESSAGE_TO_SERVER_ACK: "BROADCAST_MESSAGE_TO_SERVER_ACK",
+ BROADCAST_MESSAGE_TO_CLIENT: "BROADCAST_MESSAGE_TO_CLIENT",
+ BROADCAST_MESSAGE_TO_CLIENT_ACK: "BROADCAST_MESSAGE_TO_CLIENT_ACK",
+
+ //Log reporting
+ SYS_LOG_TO_LOGSERVER: "SYS_LOG_TO_LOGSERVER",
+
+ //RMB tracking log reporting
+ TRACE_LOG_TO_LOGSERVER: "TRACE_LOG_TO_LOGSERVER",
+
+ //Redirecting instruction
+ REDIRECT_TO_CLIENT: "REDIRECT_TO_CLIENT",
+
+ //service register
+ REGISTER_REQUEST: "REGISTER_REQUEST",
+ REGISTER_RESPONSE: "REGISTER_RESPONSE",
+
+ //service unregister
+ UNREGISTER_REQUEST: "UNREGISTER_REQUEST",
+ UNREGISTER_RESPONSE: "UNREGISTER_RESPONSE",
+
+ //The client asks which EventMesh to recommend
+ RECOMMEND_REQUEST: "RECOMMEND_REQUEST",
+ RECOMMEND_RESPONSE: "RECOMMEND_RESPONSE",
+}
+
+//{
+// //heartbeat
+// HEARTBEAT_REQUEST: 0,
+// HEARTBEAT_RESPONSE: 1,
+//
+// //handshake
+// HELLO_REQUEST: 2,
+// HELLO_RESPONSE: 3,
+//
+// //disconnection
+// CLIENT_GOODBYE_REQUEST: 4,
+// CLIENT_GOODBYE_RESPONSE: 5,
+// SERVER_GOODBYE_REQUEST: 6,
+// SERVER_GOODBYE_RESPONSE: 7,
+//
+// //subscription management
+// SUBSCRIBE_REQUEST: 8,
+// SUBSCRIBE_RESPONSE: 9,
+// UNSUBSCRIBE_REQUEST: 10,
+// UNSUBSCRIBE_RESPONSE: 11,
+//
+// //monitor
+// LISTEN_REQUEST: 12,
+// LISTEN_RESPONSE: 13,
+//
+// //RR
+// REQUEST_TO_SERVER: 14,
+// REQUEST_TO_CLIENT: 15,
+// REQUEST_TO_CLIENT_ACK: 16,
+// RESPONSE_TO_SERVER: 17,
+// RESPONSE_TO_CLIENT: 18,
+// RESPONSE_TO_CLIENT_ACK: 19,
+//
+// //Asynchronous events
+// ASYNC_MESSAGE_TO_SERVER: 20,
+// ASYNC_MESSAGE_TO_SERVER_ACK: 21,
+// ASYNC_MESSAGE_TO_CLIENT: 22,
+// ASYNC_MESSAGE_TO_CLIENT_ACK: 23,
+//
+// //radio broadcast
+// BROADCAST_MESSAGE_TO_SERVER: 24,
+// BROADCAST_MESSAGE_TO_SERVER_ACK: 25,
+// BROADCAST_MESSAGE_TO_CLIENT: 26,
+// BROADCAST_MESSAGE_TO_CLIENT_ACK: 27,
+//
+// //Log reporting
+// SYS_LOG_TO_LOGSERVER: 28,
+//
+// //RMB tracking log reporting
+// TRACE_LOG_TO_LOGSERVER: 29,
+//
+// //Redirecting instruction
+// REDIRECT_TO_CLIENT: 30,
+//
+// //service register
+// REGISTER_REQUEST: 31,
+// REGISTER_RESPONSE: 32,
+//
+// //service unregister
+// UNREGISTER_REQUEST: 33,
+// UNREGISTER_RESPONSE: 34,
+//
+// //The client asks which EventMesh to recommend
+// RECOMMEND_REQUEST: 35,
+// RECOMMEND_RESPONSE: 36,
+//}
diff --git a/eventmesh-sdk-go/common/protocol/tcp/header.go b/eventmesh-sdk-go/common/protocol/tcp/header.go
new file mode 100644
index 0000000..56dc817
--- /dev/null
+++ b/eventmesh-sdk-go/common/protocol/tcp/header.go
@@ -0,0 +1,80 @@
+package tcp
+
+import (
+ "eventmesh/common/utils"
+)
+
+// Header is the JSON header of a TCP package: the command, a result code and
+// description, a sequence string and free-form properties.
+type Header struct {
+ Cmd Command `json:"cmd"`
+ Code int `json:"code"`
+ Desc string `json:"desc"`
+ Seq string `json:"seq"`
+ Properties map[string]interface{} `json:"properties"`
+}
+
+// PutProperty stores value under name in the header's Properties map.
+// Although the receiver is a value, the write is visible to the caller
+// because the map is shared. It panics when Properties is nil (for example on
+// a zero-value Header); NewHeader creates the map up front.
+func (h Header) PutProperty(name string, value interface{}) {
+ h.Properties[name] = value
+}
+
+// GetProperty returns the property stored under name, or nil when the header
+// has no properties or the name is absent.
+func (h Header) GetProperty(name string) interface{} {
+	// Indexing a nil map is safe in Go and yields the zero value, which for
+	// interface{} is nil — the same result as for a missing key.
+	return h.Properties[name]
+}
+
+// Marshal encodes the header as JSON. The command is written twice, under
+// "cmd" and under "command", the latter for compatibility with the Java
+// enum serialization.
+func (h Header) Marshal() []byte {
+	return utils.MarshalJsonBytes(map[string]interface{}{
+		"cmd":        h.Cmd,
+		"command":    h.Cmd, // Compatible with Java Enum serialization
+		"code":       h.Code,
+		"desc":       h.Desc,
+		"seq":        h.Seq,
+		"properties": h.Properties,
+	})
+}
+
+// getVal returns headerDict[key], or nil when the key is absent.
+func (h Header) getVal(key string, headerDict map[string]interface{}) interface{} {
+	// A missing key yields the zero value of interface{}, i.e. nil.
+	return headerDict[key]
+}
+
+// Unmarshal decodes the JSON bytes in header and returns a copy of h with
+// every field present in the JSON overwritten. Fields that are absent or null
+// keep the receiver's value.
+//
+// Each value is type-checked before use. A field of an unexpected JSON type
+// (for example a numeric "seq") is skipped instead of panicking on a failed
+// type assertion, since the bytes come from the network.
+func (h Header) Unmarshal(header []byte) Header {
+	var headerDict map[string]interface{}
+	utils.UnMarshalJsonBytes(header, &headerDict)
+
+	if cmd, ok := h.getVal("cmd", headerDict).(string); ok {
+		h.Cmd = Command(cmd)
+	}
+
+	// encoding/json decodes every JSON number into float64 when the target is
+	// an interface{}.
+	if code, ok := h.getVal("code", headerDict).(float64); ok {
+		h.Code = int(code)
+	}
+
+	if desc, ok := h.getVal("desc", headerDict).(string); ok {
+		h.Desc = desc
+	}
+
+	if seq, ok := h.getVal("seq", headerDict).(string); ok {
+		h.Seq = seq
+	}
+
+	if props, ok := h.getVal("properties", headerDict).(map[string]interface{}); ok {
+		h.Properties = props
+	}
+
+	return h
+}
+
+// NewHeader builds a Header from the given fields with an empty, non-nil
+// Properties map, so PutProperty can be used right away.
+func NewHeader(cmd Command, code int, desc string, seq string) Header {
+	return Header{
+		Cmd:        cmd,
+		Code:       code,
+		Desc:       desc,
+		Seq:        seq,
+		Properties: make(map[string]interface{}),
+	}
+}
diff --git a/eventmesh-sdk-go/common/protocol/tcp/package.go b/eventmesh-sdk-go/common/protocol/tcp/package.go
new file mode 100644
index 0000000..e4e05a7
--- /dev/null
+++ b/eventmesh-sdk-go/common/protocol/tcp/package.go
@@ -0,0 +1,10 @@
+package tcp
+
+// Package is one TCP protocol unit: a header plus an optional body of any
+// type.
+type Package struct {
+ Header Header `json:"header"`
+ Body interface{} `json:"body"`
+}
+
+// NewPackage returns a Package carrying header and a nil Body.
+func NewPackage(header Header) Package {
+ return Package{Header: header}
+}
diff --git a/eventmesh-sdk-go/common/protocol/tcp/user_agent.go b/eventmesh-sdk-go/common/protocol/tcp/user_agent.go
new file mode 100644
index 0000000..b2da802
--- /dev/null
+++ b/eventmesh-sdk-go/common/protocol/tcp/user_agent.go
@@ -0,0 +1,23 @@
+package tcp
+
+// UserAgent holds the client identification fields exchanged with the server.
+// Each field is serialized to JSON under the lower-case key given in its tag.
+type UserAgent struct {
+	Env       string `json:"env"`
+	Subsystem string `json:"subsystem"`
+	Path      string `json:"path"`
+	Pid       int    `json:"pid"`
+	Host      string `json:"host"`
+	Port      int    `json:"port"`
+	Version   string `json:"version"`
+	Username  string `json:"username"`
+	Password  string `json:"password"`
+	Idc       string `json:"idc"`
+	Group     string `json:"group"`
+	Purpose   string `json:"purpose"`
+	Unack     int    `json:"unack"`
+}
+
+// NewUserAgent returns a UserAgent populated from the arguments. Purpose and
+// Unack are left at their zero values.
+//
+// Group is set to producerGroup; when producerGroup is empty, consumerGroup is
+// used instead, so a consumer-only caller no longer ends up with an empty
+// group (consumerGroup used to be ignored entirely).
+func NewUserAgent(env string, subsystem string, path string, pid int, host string, port int, version string,
+	username string, password string, idc string, producerGroup string, consumerGroup string) *UserAgent {
+	group := producerGroup
+	if group == "" {
+		group = consumerGroup
+	}
+	return &UserAgent{Env: env, Subsystem: subsystem, Path: path, Pid: pid, Host: host, Port: port, Version: version,
+		Username: username, Password: password, Idc: idc, Group: group}
+}
diff --git a/eventmesh-sdk-go/common/utils/json_utils.go b/eventmesh-sdk-go/common/utils/json_utils.go
new file mode 100644
index 0000000..8d1a093
--- /dev/null
+++ b/eventmesh-sdk-go/common/utils/json_utils.go
@@ -0,0 +1,29 @@
+package utils
+
+import (
+ "encoding/json"
+ "log"
+)
+
+// MarshalJsonBytes encodes obj as JSON and returns the bytes.
+//
+// The function has no error return: on an encoding failure it logs the
+// underlying error and terminates the process via log.Fatalf.
+func MarshalJsonBytes(obj interface{}) []byte {
+	ret, err := json.Marshal(obj)
+	if err != nil {
+		// Include the cause; the bare message alone made failures undiagnosable.
+		log.Fatalf("Failed to marshal json: %v", err)
+	}
+	return ret
+}
+
+// MarshalJsonString encodes obj as JSON and returns the result as a string.
+// It delegates to MarshalJsonBytes and shares its failure behaviour.
+func MarshalJsonString(obj interface{}) string {
+	encoded := MarshalJsonBytes(obj)
+	return string(encoded)
+}
+
+// UnMarshalJsonBytes decodes the JSON in data into obj, which must be a
+// pointer as required by json.Unmarshal.
+//
+// The function has no error return: on a decoding failure it logs the
+// underlying error and terminates the process via log.Fatalf.
+func UnMarshalJsonBytes(data []byte, obj interface{}) {
+	if err := json.Unmarshal(data, obj); err != nil {
+		// Include the cause; the bare message alone made failures undiagnosable.
+		log.Fatalf("Failed to unmarshal json: %v", err)
+	}
+}
+
+// UnMarshalJsonString decodes the JSON text in data into obj. It delegates to
+// UnMarshalJsonBytes and shares its failure behaviour.
+func UnMarshalJsonString(data string, obj interface{}) {
+	raw := []byte(data)
+	UnMarshalJsonBytes(raw, obj)
+}
diff --git a/eventmesh-sdk-go/examples/http/async_pub_cloudevents.go b/eventmesh-sdk-go/examples/http/async_pub_cloudevents.go
new file mode 100644
index 0000000..c146642
--- /dev/null
+++ b/eventmesh-sdk-go/examples/http/async_pub_cloudevents.go
@@ -0,0 +1,52 @@
+package http
+
+import (
+ "eventmesh/common"
+ "eventmesh/common/utils"
+ "eventmesh/http/conf"
+ "eventmesh/http/producer"
+ cloudevents "github.com/cloudevents/sdk-go/v2"
+ "github.com/google/uuid"
+ "log"
+ "os"
+ "strconv"
+)
+
+func AsyncPubCloudEvents() {
+ eventMeshIPPort := "127.0.0.1" + ":" + "10105"
+ producerGroup := "EventMeshTest-producerGroup"
+ topic := "TEST-TOPIC-HTTP-ASYNC"
+ env := "P"
+ idc := "FT"
+ subSys := "1234"
+ // FIXME Get ip dynamically
+ localIp := "127.0.0.1"
+
+ // (Deep) Copy of default config
+ eventMeshClientConfig := conf.DefaultEventMeshHttpClientConfig
+ eventMeshClientConfig.SetLiteEventMeshAddr(eventMeshIPPort)
+ eventMeshClientConfig.SetProducerGroup(producerGroup)
+ eventMeshClientConfig.SetEnv(env)
+ eventMeshClientConfig.SetIdc(idc)
+ eventMeshClientConfig.SetSys(subSys)
+ eventMeshClientConfig.SetIp(localIp)
+ eventMeshClientConfig.SetPid(strconv.Itoa(os.Getpid()))
+
+ // Make event to send
+ event := cloudevents.NewEvent()
+ event.SetID(uuid.New().String())
+ event.SetSubject(topic)
+ event.SetSource("example/uri")
+ event.SetType(common.Constants.CLOUD_EVENTS_PROTOCOL_NAME)
+ event.SetExtension(common.Constants.EVENTMESH_MESSAGE_CONST_TTL, strconv.Itoa(4*1000))
+ event.SetDataContentType(cloudevents.ApplicationCloudEventsJSON)
+ data := map[string]string{"hello": "EventMesh"}
+ err := event.SetData(cloudevents.ApplicationCloudEventsJSON, utils.MarshalJsonBytes(data))
+ if err != nil {
+ log.Fatalf("Failed to set cloud event data, error: %v", err)
+ }
+
+ // Publish event
+ httpProducer := producer.NewEventMeshHttpProducer(eventMeshClientConfig)
+ httpProducer.Publish(event)
+}
diff --git a/eventmesh-sdk-go/examples/http/sub_cloudevents.go b/eventmesh-sdk-go/examples/http/sub_cloudevents.go
new file mode 100644
index 0000000..35fd3e1
--- /dev/null
+++ b/eventmesh-sdk-go/examples/http/sub_cloudevents.go
@@ -0,0 +1,99 @@
+package http
+
+import (
+ "eventmesh/common/protocol"
+ "eventmesh/common/utils"
+ "eventmesh/http/conf"
+ "eventmesh/http/consumer"
+ cloudevents "github.com/cloudevents/sdk-go/v2"
+ "log"
+ "net/http"
+ "os"
+ "strconv"
+ "strings"
+)
+
+func SubCloudEvents() {
+ eventMeshIPPort := "127.0.0.1" + ":" + "10105"
+ consumerGroup := "EventMeshTest-consumerGroup"
+ topic := "TEST-TOPIC-HTTP-ASYNC"
+ env := "P"
+ idc := "FT"
+ subSys := "1234"
+ // FIXME Get ip dynamically
+ localIp := "127.0.0.1"
+ localPort := 8090
+
+ subscribeUrl := "http://" + localIp + ":" + strconv.Itoa(localPort) + "/hello"
+ topicList := []protocol.SubscriptionItem{
+ {
+ Topic: topic,
+ Mode: protocol.DefaultSubscriptionMode.CLUSTERING,
+ Type: protocol.DefaultSubscriptionType.ASYNC,
+ },
+ }
+
+ // Callback handle
+ exit := make(chan bool)
+ go httpServer(localIp, localPort, exit)
+
+ // (Deep) Copy of default config
+ eventMeshClientConfig := conf.DefaultEventMeshHttpClientConfig
+ eventMeshClientConfig.SetLiteEventMeshAddr(eventMeshIPPort)
+ eventMeshClientConfig.SetConsumerGroup(consumerGroup)
+ eventMeshClientConfig.SetEnv(env)
+ eventMeshClientConfig.SetIdc(idc)
+ eventMeshClientConfig.SetSys(subSys)
+ eventMeshClientConfig.SetIp(localIp)
+ eventMeshClientConfig.SetPid(strconv.Itoa(os.Getpid()))
+
+ // Subscribe
+ eventMeshHttpConsumer := consumer.NewEventMeshHttpConsumer(eventMeshClientConfig)
+ eventMeshHttpConsumer.Subscribe(topicList, subscribeUrl)
+ eventMeshHttpConsumer.HeartBeat(topicList, subscribeUrl)
+
+ // FIXME Add unsubscribe
+
+ // Wait for exit
+ <-exit
+}
+
+// httpServer registers the /hello callback on the default mux and serves it on
+// ip:port. A listen/serve error terminates the process; exit is signalled only
+// if serving returns without an error.
+func httpServer(ip string, port int, exit chan<- bool) {
+	http.HandleFunc("/hello", hello)
+
+	addr := ip + ":" + strconv.Itoa(port)
+	if err := http.ListenAndServe(addr, nil); err != nil {
+		log.Fatalf("Failed to launch a callback http server, error: %v", err)
+	}
+
+	exit <- true
+}
+
+// hello is the callback handler for pushed events. It accepts only POST
+// requests to /hello with a form-encoded body, decodes the "content" form
+// field as a CloudEvent and logs the event data.
+//
+// Responses: 404 for other paths, 501 for other methods, 415 for other content
+// types, 400 when the form cannot be parsed, and an implicit 200 otherwise.
+func hello(w http.ResponseWriter, r *http.Request) {
+	if r.URL.Path != "/hello" {
+		http.NotFound(w, r)
+		return
+	}
+
+	if r.Method != http.MethodPost {
+		w.WriteHeader(http.StatusNotImplemented)
+		w.Write([]byte(http.StatusText(http.StatusNotImplemented)))
+		return
+	}
+
+	// FIXME Now we only support post form
+	contentType := r.Header.Get("Content-Type")
+	if !strings.Contains(contentType, "application/x-www-form-urlencoded") {
+		w.WriteHeader(http.StatusUnsupportedMediaType)
+		return
+	}
+
+	if err := r.ParseForm(); err != nil {
+		// Previously the error was only logged and decoding went on with an
+		// unparsed form; reject the request instead.
+		log.Printf("Failed to parse post form parameter, error: %v", err)
+		http.Error(w, http.StatusText(http.StatusBadRequest), http.StatusBadRequest)
+		return
+	}
+
+	content := r.FormValue("content")
+	event := cloudevents.NewEvent()
+	utils.UnMarshalJsonString(content, &event)
+	log.Printf("Received data from eventmesh server: %v", string(event.Data()))
+}
diff --git a/eventmesh-sdk-go/examples/tcp/async_pub_cloudevents.go b/eventmesh-sdk-go/examples/tcp/async_pub_cloudevents.go
new file mode 100644
index 0000000..62fd2c3
--- /dev/null
+++ b/eventmesh-sdk-go/examples/tcp/async_pub_cloudevents.go
@@ -0,0 +1,45 @@
+package tcp
+
+import (
+ "eventmesh/common"
+ "eventmesh/common/protocol"
+ gtcp "eventmesh/common/protocol/tcp"
+ "eventmesh/common/utils"
+ "eventmesh/tcp"
+ "time"
+
+ //"eventmesh/tcp/common"
+ "eventmesh/tcp/conf"
+ cloudevents "github.com/cloudevents/sdk-go/v2"
+ "github.com/google/uuid"
+ "strconv"
+)
+
+// AsyncPubCloudEvents is an example that opens a TCP client, publishes one
+// CloudEvent and then waits ten seconds before returning, using hard-coded
+// connection settings.
+func AsyncPubCloudEvents() {
+	eventMeshIp := "127.0.0.1"
+	eventMeshTcpPort := 10000
+	topic := "TEST-TOPIC-TCP-ASYNC"
+
+	// Init client
+	userAgent := gtcp.UserAgent{Env: "test", Subsystem: "5023", Path: "/data/app/umg_proxy", Pid: 32893,
+		Host: "127.0.0.1", Port: 8362, Version: "2.0.11", Username: "PU4283", Password: "PUPASS", Idc: "FT",
+		Group: "EventmeshTestGroup", Purpose: "pub"}
+	config := conf.NewEventMeshTCPClientConfig(eventMeshIp, eventMeshTcpPort, userAgent)
+	client := tcp.CreateEventMeshTCPClient(*config, protocol.DefaultMessageType.CloudEvent)
+	client.Init()
+
+	// Make event to send
+	event := cloudevents.NewEvent()
+	event.SetID(uuid.New().String())
+	event.SetSubject(topic)
+	event.SetSource("example/uri")
+	event.SetType(common.Constants.CLOUD_EVENTS_PROTOCOL_NAME)
+	event.SetExtension(common.Constants.EVENTMESH_MESSAGE_CONST_TTL, strconv.Itoa(4*1000))
+	event.SetDataContentType(cloudevents.ApplicationCloudEventsJSON)
+	data := map[string]string{"hello": "EventMesh"}
+	// The SetData error used to be dropped, which would publish an event
+	// without its payload. This file does not import a logger, so the example
+	// aborts with a panic carrying the cause.
+	if err := event.SetData(cloudevents.ApplicationCloudEventsJSON, utils.MarshalJsonBytes(data)); err != nil {
+		panic("Failed to set cloud event data, error: " + err.Error())
+	}
+
+	// Publish event
+	client.Publish(event, 10000)
+	time.Sleep(10 * time.Second)
+}
diff --git a/eventmesh-sdk-go/go.mod b/eventmesh-sdk-go/go.mod
new file mode 100644
index 0000000..8a12b03
--- /dev/null
+++ b/eventmesh-sdk-go/go.mod
@@ -0,0 +1,8 @@
+module eventmesh
+
+go 1.16
+
+require (
+ github.com/cloudevents/sdk-go/v2 v2.6.0
+ github.com/google/uuid v1.3.0
+)
diff --git a/eventmesh-sdk-go/go.sum b/eventmesh-sdk-go/go.sum
new file mode 100644
index 0000000..f490f71
--- /dev/null
+++ b/eventmesh-sdk-go/go.sum
@@ -0,0 +1,39 @@
+github.com/cloudevents/sdk-go/v2 v2.6.0 h1:yp6zLEvhXSi6P25zzfgORgFI0quG2/NXoH9QoHzvKn8=
+github.com/cloudevents/sdk-go/v2 v2.6.0/go.mod h1:nlXhgFkf0uTopxmRXalyMwS2LG70cRGPrxzmjJgSG0U=
+github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E=
+github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
+github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
+github.com/google/go-cmp v0.5.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
+github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
+github.com/google/uuid v1.1.1 h1:Gkbcsh/GbpXz7lPftLA3P6TYMwjCLYm83jiFQZF/3gY=
+github.com/google/uuid v1.1.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
+github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I=
+github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
+github.com/json-iterator/go v1.1.10 h1:Kz6Cvnvv2wGdaG/V8yMvfkmNiXq9Ya2KUv4rouJJr68=
+github.com/json-iterator/go v1.1.10/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4=
+github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
+github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
+github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
+github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421 h1:ZqeYNhU3OHLH3mGKHDcjJRFFRrJa6eAM5H+CtDdOsPc=
+github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
+github.com/modern-go/reflect2 v0.0.0-20180701023420-4b7aa43c6742 h1:Esafd1046DLDQ0W1YjYsBW+p8U2u7vzgW2SQVmlNazg=
+github.com/modern-go/reflect2 v0.0.0-20180701023420-4b7aa43c6742/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0=
+github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno=
+github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
+github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
+github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
+github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
+github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA=
+github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyCJ6HpOuEn7z0Csc=
+go.uber.org/atomic v1.4.0 h1:cxzIVoETapQEqDhQu3QfnvXAV4AlzcvUCxkVUFw3+EU=
+go.uber.org/atomic v1.4.0/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE=
+go.uber.org/multierr v1.1.0 h1:HoEmRHQPVSqub6w2z2d2EOVs2fjyFRGyofhKuyDq0QI=
+go.uber.org/multierr v1.1.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/0=
+go.uber.org/zap v1.10.0 h1:ORx85nbTijNz8ljznvCMR1ZBIPKFn3jQrag10X2AsuM=
+go.uber.org/zap v1.10.0/go.mod h1:vwi/ZaCAaUcBkycHslxD9B2zi4UTXhF60s6SWpuDF0Q=
+golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
+golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
+gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
+gopkg.in/check.v1 v1.0.0-20200227125254-8fa46927fb4f/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
+gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
+gopkg.in/yaml.v2 v2.3.0/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
diff --git a/eventmesh-sdk-go/http/abstract_http_client.go b/eventmesh-sdk-go/http/abstract_http_client.go
new file mode 100644
index 0000000..d31b09d
--- /dev/null
+++ b/eventmesh-sdk-go/http/abstract_http_client.go
@@ -0,0 +1,43 @@
+package http
+
+import (
+ gcommon "eventmesh/common"
+ "eventmesh/http/conf"
+ nethttp "net/http"
+ "time"
+)
+
+// AbstractHttpClient bundles the client configuration with the net/http client
+// used to talk to EventMesh. The HTTP consumer and producer embed it.
+type AbstractHttpClient struct {
+	EventMeshHttpClientConfig conf.EventMeshHttpClientConfig
+	HttpClient                *nethttp.Client
+}
+
+// NewAbstractHttpClient stores the given configuration and creates the
+// underlying HTTP client from it.
+func NewAbstractHttpClient(eventMeshHttpClientConfig conf.EventMeshHttpClientConfig) *AbstractHttpClient {
+	client := &AbstractHttpClient{EventMeshHttpClientConfig: eventMeshHttpClientConfig}
+	client.HttpClient = client.SetHttpClient()
+	return client
+}
+
+// Close is a no-op.
+func (c *AbstractHttpClient) Close() {
+	// Http Client does not need to close explicitly
+}
+
+// SetHttpClient returns a new HTTP client with a 100 second timeout. Despite
+// its name it does not assign c.HttpClient; the caller does that. The TLS and
+// non-TLS branches currently build identical clients.
+func (c *AbstractHttpClient) SetHttpClient() *nethttp.Client {
+	const requestTimeout = 100 * time.Second
+
+	if c.EventMeshHttpClientConfig.UseTls() {
+		// Use TLS
+		return &nethttp.Client{Timeout: requestTimeout}
+	}
+
+	return &nethttp.Client{Timeout: requestTimeout}
+}
+
+// SelectEventMesh returns the configured EventMesh address prefixed with the
+// HTTPS scheme when TLS is enabled and with the HTTP scheme otherwise.
+func (c *AbstractHttpClient) SelectEventMesh() string {
+	// FIXME Add load balance support
+	prefix := gcommon.Constants.HTTP_PROTOCOL_PREFIX
+	if c.EventMeshHttpClientConfig.UseTls() {
+		prefix = gcommon.Constants.HTTPS_PROTOCOL_PREFIX
+	}
+
+	return prefix + c.EventMeshHttpClientConfig.LiteEventMeshAddr()
+}
diff --git a/eventmesh-sdk-go/http/conf/eventmesh_http_client_config.go b/eventmesh-sdk-go/http/conf/eventmesh_http_client_config.go
new file mode 100644
index 0000000..27ba603
--- /dev/null
+++ b/eventmesh-sdk-go/http/conf/eventmesh_http_client_config.go
@@ -0,0 +1,144 @@
+package conf
+
+// EventMeshHttpClientConfig holds the settings of an EventMesh HTTP client.
+// All fields are unexported; use the accessor methods to read and write them.
+type EventMeshHttpClientConfig struct {
+	// The event server address list.
+	// For a cluster, separate the addresses with ";". The address format
+	// depends on loadBalanceType, e.g.
+	//   random strategy: 127.0.0.1:10105;127.0.0.2:10105
+	//   weighted round robin or weighted random strategy:
+	//     127.0.0.1:10105:1;127.0.0.2:10105:2
+	liteEventMeshAddr string
+	// TODO support load balance
+	//loadBalanceType string
+	consumeThreadCore int
+	consumeThreadMax  int
+	env               string
+	consumerGroup     string
+	producerGroup     string
+	idc               string
+	ip                string
+	pid               string
+	sys               string
+	userName          string
+	password          string
+	useTls            bool
+}
+
+// LiteEventMeshAddr returns the configured EventMesh server address (list).
+func (c *EventMeshHttpClientConfig) LiteEventMeshAddr() string { return c.liteEventMeshAddr }
+
+// SetLiteEventMeshAddr sets the EventMesh server address (list).
+func (c *EventMeshHttpClientConfig) SetLiteEventMeshAddr(liteEventMeshAddr string) {
+	c.liteEventMeshAddr = liteEventMeshAddr
+}
+
+// ConsumeThreadCore returns the consumeThreadCore setting.
+func (c *EventMeshHttpClientConfig) ConsumeThreadCore() int { return c.consumeThreadCore }
+
+// SetConsumeThreadCore sets the consumeThreadCore setting.
+func (c *EventMeshHttpClientConfig) SetConsumeThreadCore(consumeThreadCore int) {
+	c.consumeThreadCore = consumeThreadCore
+}
+
+// ConsumeThreadMax returns the consumeThreadMax setting.
+func (c *EventMeshHttpClientConfig) ConsumeThreadMax() int { return c.consumeThreadMax }
+
+// SetConsumeThreadMax sets the consumeThreadMax setting.
+func (c *EventMeshHttpClientConfig) SetConsumeThreadMax(consumeThreadMax int) {
+	c.consumeThreadMax = consumeThreadMax
+}
+
+// Env returns the env setting.
+func (c *EventMeshHttpClientConfig) Env() string { return c.env }
+
+// SetEnv sets the env setting.
+func (c *EventMeshHttpClientConfig) SetEnv(env string) { c.env = env }
+
+// ConsumerGroup returns the consumer group name.
+func (c *EventMeshHttpClientConfig) ConsumerGroup() string { return c.consumerGroup }
+
+// SetConsumerGroup sets the consumer group name.
+func (c *EventMeshHttpClientConfig) SetConsumerGroup(consumerGroup string) {
+	c.consumerGroup = consumerGroup
+}
+
+// ProducerGroup returns the producer group name.
+func (c *EventMeshHttpClientConfig) ProducerGroup() string { return c.producerGroup }
+
+// SetProducerGroup sets the producer group name.
+func (c *EventMeshHttpClientConfig) SetProducerGroup(producerGroup string) {
+	c.producerGroup = producerGroup
+}
+
+// Idc returns the idc setting.
+func (c *EventMeshHttpClientConfig) Idc() string { return c.idc }
+
+// SetIdc sets the idc setting.
+func (c *EventMeshHttpClientConfig) SetIdc(idc string) { c.idc = idc }
+
+// Ip returns the ip setting.
+func (c *EventMeshHttpClientConfig) Ip() string { return c.ip }
+
+// SetIp sets the ip setting.
+func (c *EventMeshHttpClientConfig) SetIp(ip string) { c.ip = ip }
+
+// Pid returns the pid setting.
+func (c *EventMeshHttpClientConfig) Pid() string { return c.pid }
+
+// SetPid sets the pid setting.
+func (c *EventMeshHttpClientConfig) SetPid(pid string) { c.pid = pid }
+
+// Sys returns the sys setting.
+func (c *EventMeshHttpClientConfig) Sys() string { return c.sys }
+
+// SetSys sets the sys setting.
+func (c *EventMeshHttpClientConfig) SetSys(sys string) { c.sys = sys }
+
+// UserName returns the user name.
+func (c *EventMeshHttpClientConfig) UserName() string { return c.userName }
+
+// SetUserName sets the user name.
+func (c *EventMeshHttpClientConfig) SetUserName(userName string) { c.userName = userName }
+
+// Password returns the password.
+func (c *EventMeshHttpClientConfig) Password() string { return c.password }
+
+// SetPassword sets the password.
+func (c *EventMeshHttpClientConfig) SetPassword(password string) { c.password = password }
+
+// UseTls reports whether TLS is enabled.
+func (c *EventMeshHttpClientConfig) UseTls() bool { return c.useTls }
+
+// SetUseTls enables or disables TLS.
+func (c *EventMeshHttpClientConfig) SetUseTls(useTls bool) { c.useTls = useTls }
+
+// DefaultEventMeshHttpClientConfig is the baseline configuration. Fields not
+// listed here (env, idc, ip, pid, sys, userName, password, useTls) keep their
+// zero values: empty strings and false.
+var DefaultEventMeshHttpClientConfig = EventMeshHttpClientConfig{
+	liteEventMeshAddr: "127.0.0.1:10105",
+	consumeThreadCore: 2,
+	consumeThreadMax:  5,
+	consumerGroup:     "DefaultConsumerGroup",
+	producerGroup:     "DefaultProducerGroup",
+}
diff --git a/eventmesh-sdk-go/http/consumer/eventmesh_http_consumer.go b/eventmesh-sdk-go/http/consumer/eventmesh_http_consumer.go
new file mode 100644
index 0000000..62ba2a6
--- /dev/null
+++ b/eventmesh-sdk-go/http/consumer/eventmesh_http_consumer.go
@@ -0,0 +1,95 @@
+package consumer
+
+import (
+ gcommon "eventmesh/common"
+ "eventmesh/common/protocol"
+ "eventmesh/common/protocol/http/body/client"
+ "eventmesh/common/protocol/http/common"
+ gutils "eventmesh/common/utils"
+ "eventmesh/http"
+ "eventmesh/http/conf"
+ "eventmesh/http/model"
+ "eventmesh/http/utils"
+ "log"
+ nethttp "net/http"
+ "strconv"
+ "time"
+)
+
+// EventMeshHttpConsumer is an HTTP client that registers subscriptions with
+// EventMesh and records the items it has subscribed to.
+type EventMeshHttpConsumer struct {
+	*http.AbstractHttpClient
+	subscriptions []protocol.SubscriptionItem
+}
+
+// NewEventMeshHttpConsumer creates a consumer that uses the given client
+// configuration and starts with no recorded subscriptions.
+func NewEventMeshHttpConsumer(eventMeshHttpClientConfig conf.EventMeshHttpClientConfig) *EventMeshHttpConsumer {
+	c := &EventMeshHttpConsumer{AbstractHttpClient: http.NewAbstractHttpClient(eventMeshHttpClientConfig)}
+	// Length 0 with capacity 1000. make(..., 1000) would instead create 1000
+	// zero-valued items that Subscribe then appends after.
+	c.subscriptions = make([]protocol.SubscriptionItem, 0, 1000)
+	return c
+}
+
+// HeartBeat sends a heartbeat request for every topic in topicList, with
+// subscribeUrl as the callback address, once per heartbeat interval. It never
+// returns on its own: the loop runs until a non-success return code
+// terminates the process.
+func (e *EventMeshHttpConsumer) HeartBeat(topicList []protocol.SubscriptionItem, subscribeUrl string) {
+
+	// FIXME check topicList, subscribeUrl is not blank
+
+	interval := time.Duration(gcommon.Constants.HEARTBEAT) * time.Millisecond
+	ticks := time.Tick(interval)
+	for {
+		<-ticks
+
+		// One entity per subscribed topic, all pointing at the same callback URL.
+		var entities []client.HeartbeatEntity
+		for i := range topicList {
+			entities = append(entities, client.HeartbeatEntity{Topic: topicList[i].Topic, Url: subscribeUrl})
+		}
+
+		param := e.buildCommonRequestParam()
+		param.AddHeader(common.ProtocolKey.REQUEST_CODE, strconv.Itoa(common.DefaultRequestCode.HEARTBEAT.RequestCode))
+		// FIXME Java is name of SUB name
+		//param.AddBody(client.HeartbeatRequestBodyKey.CLIENTTYPE, common.DefaultClientType.SUB.name())
+		param.AddBody(client.HeartbeatRequestBodyKey.CLIENTTYPE, "SUB")
+		param.AddBody(client.HeartbeatRequestBodyKey.HEARTBEATENTITIES, gutils.MarshalJsonString(entities))
+
+		var result http.EventMeshRetObj
+		gutils.UnMarshalJsonString(utils.HttpPost(e.HttpClient, e.SelectEventMesh(), param), &result)
+		if result.RetCode != common.DefaultEventMeshRetCode.SUCCESS.RetCode {
+			log.Fatalf("Request failed, error code: %d", result.RetCode)
+		}
+	}
+}
+
+// Subscribe registers topicList with EventMesh, using subscribeUrl as the
+// callback address, and records the items on success. A non-success return
+// code terminates the process.
+func (e *EventMeshHttpConsumer) Subscribe(topicList []protocol.SubscriptionItem, subscribeUrl string) {
+
+	// FIXME check topicList, subscribeUrl is not blank
+
+	param := e.buildCommonRequestParam()
+	param.AddHeader(common.ProtocolKey.REQUEST_CODE, strconv.Itoa(common.DefaultRequestCode.SUBSCRIBE.RequestCode))
+	param.AddBody(client.SubscribeRequestBodyKey.TOPIC, gutils.MarshalJsonString(topicList))
+	param.AddBody(client.SubscribeRequestBodyKey.URL, subscribeUrl)
+	param.AddBody(client.SubscribeRequestBodyKey.CONSUMERGROUP, e.EventMeshHttpClientConfig.ConsumerGroup())
+
+	var result http.EventMeshRetObj
+	gutils.UnMarshalJsonString(utils.HttpPost(e.HttpClient, e.SelectEventMesh(), param), &result)
+	if result.RetCode != common.DefaultEventMeshRetCode.SUCCESS.RetCode {
+		log.Fatalf("Request failed, error code: %d", result.RetCode)
+	}
+
+	e.subscriptions = append(e.subscriptions, topicList...)
+}
+
+// buildCommonRequestParam returns a POST request parameter set pre-filled with
+// the headers shared by all consumer requests (client identity, credentials,
+// protocol version, language), the default timeout and the consumer group
+// body field.
+func (e *EventMeshHttpConsumer) buildCommonRequestParam() *model.RequestParam {
+	cfg := &e.EventMeshHttpClientConfig
+
+	p := model.NewRequestParam(nethttp.MethodPost)
+	p.AddHeader(common.ProtocolKey.ClientInstanceKey.ENV, cfg.Env())
+	p.AddHeader(common.ProtocolKey.ClientInstanceKey.IDC, cfg.Idc())
+	p.AddHeader(common.ProtocolKey.ClientInstanceKey.IP, cfg.Ip())
+	p.AddHeader(common.ProtocolKey.ClientInstanceKey.PID, cfg.Pid())
+	p.AddHeader(common.ProtocolKey.ClientInstanceKey.SYS, cfg.Sys())
+	p.AddHeader(common.ProtocolKey.ClientInstanceKey.USERNAME, cfg.UserName())
+	p.AddHeader(common.ProtocolKey.ClientInstanceKey.PASSWORD, cfg.Password())
+	p.AddHeader(common.ProtocolKey.VERSION, common.DefaultProtocolVersion.V1.Version())
+	p.AddHeader(common.ProtocolKey.LANGUAGE, gcommon.Constants.LANGUAGE_GO)
+	p.SetTimeout(gcommon.Constants.DEFAULT_HTTP_TIME_OUT)
+	p.AddBody(client.HeartbeatRequestBodyKey.CONSUMERGROUP, cfg.ConsumerGroup())
+	return p
+}
diff --git a/eventmesh-sdk-go/http/eventmesh_ret_obj.go b/eventmesh-sdk-go/http/eventmesh_ret_obj.go
new file mode 100644
index 0000000..9cab9fe
--- /dev/null
+++ b/eventmesh-sdk-go/http/eventmesh_ret_obj.go
@@ -0,0 +1,7 @@
+package http
+
+type EventMeshRetObj struct { // JSON reply decoded from the body of an EventMesh HTTP response
+ ResTime int64 `json:"resTime"` // value of the "resTime" field; presumably a response timestamp - TODO confirm unit
+ RetCode int `json:"retCode"` // return code; callers compare it against the SUCCESS return code
+ RetMsg string `json:"retMsg"` // value of the "retMsg" field; presumably a human-readable message
+}
diff --git a/eventmesh-sdk-go/http/model/request_param.go b/eventmesh-sdk-go/http/model/request_param.go
new file mode 100644
index 0000000..75b9d8a
--- /dev/null
+++ b/eventmesh-sdk-go/http/model/request_param.go
@@ -0,0 +1,53 @@
+package model
+
+// HttpMethod is the HTTP method name of a request, e.g. "POST".
+type HttpMethod string
+
+// RequestParam collects everything needed to issue one HTTP request: query
+// parameters, method, form body fields, headers and a timeout. The body and
+// header maps are created lazily on first insertion.
+type RequestParam struct {
+	queryParams map[string][]string
+	httpMethod  HttpMethod
+	body        map[string]string
+	headers     map[string]string
+	timeout     int64
+}
+
+// NewRequestParam returns an empty RequestParam for the given method.
+func NewRequestParam(httpMethod HttpMethod) *RequestParam {
+	p := new(RequestParam)
+	p.httpMethod = httpMethod
+	return p
+}
+
+// QueryParams returns the query parameters; nil if none were set.
+func (p *RequestParam) QueryParams() map[string][]string { return p.queryParams }
+
+// SetQueryParams replaces the query parameters.
+func (p *RequestParam) SetQueryParams(queryParams map[string][]string) {
+	p.queryParams = queryParams
+}
+
+// Body returns the body fields; nil if none were added.
+func (p *RequestParam) Body() map[string]string { return p.body }
+
+// AddBody sets body field key to value, replacing any previous value.
+func (p *RequestParam) AddBody(key, value string) {
+	if p.body == nil {
+		p.body = map[string]string{}
+	}
+	p.body[key] = value
+}
+
+// Headers returns the headers; nil if none were added.
+func (p *RequestParam) Headers() map[string]string { return p.headers }
+
+// AddHeader sets header key to object, replacing any previous value. object
+// must hold a string; any other dynamic type makes the type assertion panic.
+func (p *RequestParam) AddHeader(key string, object interface{}) {
+	if p.headers == nil {
+		p.headers = map[string]string{}
+	}
+	value := object.(string)
+	p.headers[key] = value
+}
+
+// Timeout returns the timeout value as stored by SetTimeout.
+func (p *RequestParam) Timeout() int64 { return p.timeout }
+
+// SetTimeout stores the timeout value.
+func (p *RequestParam) SetTimeout(timeout int64) { p.timeout = timeout }
diff --git a/eventmesh-sdk-go/http/producer/cloudevent_producer.go b/eventmesh-sdk-go/http/producer/cloudevent_producer.go
new file mode 100644
index 0000000..17a178c
--- /dev/null
+++ b/eventmesh-sdk-go/http/producer/cloudevent_producer.go
@@ -0,0 +1,85 @@
+package producer
+
+import (
+ gcommon "eventmesh/common"
+ "eventmesh/common/protocol/http/common"
+ "eventmesh/common/protocol/http/message"
+ gutils "eventmesh/common/utils"
+ "eventmesh/http"
+ "eventmesh/http/conf"
+ "eventmesh/http/model"
+ "eventmesh/http/utils"
+ cloudevents "github.com/cloudevents/sdk-go/v2"
+ "log"
+ nethttp "net/http"
+ "strconv"
+)
+
+// CloudEventProducer publishes CloudEvents to EventMesh over HTTP.
+type CloudEventProducer struct {
+	*http.AbstractHttpClient
+}
+
+// NewCloudEventProducer creates a producer that uses the given client
+// configuration.
+func NewCloudEventProducer(eventMeshHttpClientConfig conf.EventMeshHttpClientConfig) *CloudEventProducer {
+	return &CloudEventProducer{
+		AbstractHttpClient: http.NewAbstractHttpClient(eventMeshHttpClientConfig),
+	}
+}
+
+// Publish adds the client extensions to event and posts it to EventMesh with
+// the async-send request code. A non-success return code terminates the
+// process.
+func (c *CloudEventProducer) Publish(event cloudevents.Event) {
+	param := c.buildCommonPostParam(c.enhanceCloudEvent(event))
+	param.AddHeader(common.ProtocolKey.REQUEST_CODE, strconv.Itoa(common.DefaultRequestCode.MSG_SEND_ASYNC.RequestCode))
+
+	var result http.EventMeshRetObj
+	gutils.UnMarshalJsonString(utils.HttpPost(c.HttpClient, c.SelectEventMesh(), param), &result)
+	if result.RetCode != common.DefaultEventMeshRetCode.SUCCESS.RetCode {
+		log.Fatalf("Request failed, error code: %d", result.RetCode)
+	}
+}
+
+// buildCommonPostParam serializes event to JSON and returns a POST request
+// parameter set carrying the client identity and protocol headers plus the
+// producer group and the serialized event as body fields.
+//
+// If the event cannot be serialized, the error is logged and the process is
+// terminated.
+func (c *CloudEventProducer) buildCommonPostParam(event cloudevents.Event) *model.RequestParam {
+
+	eventBytes, err := event.MarshalJSON()
+	if err != nil {
+		// Include the cause; the bare message alone made failures undiagnosable.
+		log.Fatalf("Failed to marshal cloudevent: %v", err)
+	}
+	content := string(eventBytes)
+
+	requestParam := model.NewRequestParam(nethttp.MethodPost)
+	requestParam.AddHeader(common.ProtocolKey.ClientInstanceKey.ENV, c.EventMeshHttpClientConfig.Env())
+	requestParam.AddHeader(common.ProtocolKey.ClientInstanceKey.IDC, c.EventMeshHttpClientConfig.Idc())
+	requestParam.AddHeader(common.ProtocolKey.ClientInstanceKey.IP, c.EventMeshHttpClientConfig.Ip())
+	requestParam.AddHeader(common.ProtocolKey.ClientInstanceKey.PID, c.EventMeshHttpClientConfig.Pid())
+	requestParam.AddHeader(common.ProtocolKey.ClientInstanceKey.SYS, c.EventMeshHttpClientConfig.Sys())
+	requestParam.AddHeader(common.ProtocolKey.ClientInstanceKey.USERNAME, c.EventMeshHttpClientConfig.UserName())
+	requestParam.AddHeader(common.ProtocolKey.ClientInstanceKey.PASSWORD, c.EventMeshHttpClientConfig.Password())
+	requestParam.AddHeader(common.ProtocolKey.LANGUAGE, gcommon.Constants.LANGUAGE_GO)
+	// FIXME Improve constants
+	requestParam.AddHeader(common.ProtocolKey.PROTOCOL_TYPE, "cloudevents")
+	requestParam.AddHeader(common.ProtocolKey.PROTOCOL_DESC, "http")
+	requestParam.AddHeader(common.ProtocolKey.PROTOCOL_VERSION, event.SpecVersion())
+
+	// TODO: move producerGroup to header
+	requestParam.AddBody(message.SendMessageRequestBodyKey.PRODUCERGROUP, c.EventMeshHttpClientConfig.ProducerGroup())
+	requestParam.AddBody(message.SendMessageRequestBodyKey.CONTENT, content)
+
+	return requestParam
+}
+
+// enhanceCloudEvent sets the client identity, sequence/unique id, language and
+// protocol version extensions on event and returns it.
+func (c *CloudEventProducer) enhanceCloudEvent(event cloudevents.Event) cloudevents.Event {
+	cfg := &c.EventMeshHttpClientConfig
+
+	// Applied in order; every value is a string.
+	extensions := []struct {
+		name  string
+		value string
+	}{
+		{common.ProtocolKey.ClientInstanceKey.ENV, cfg.Env()},
+		{common.ProtocolKey.ClientInstanceKey.IDC, cfg.Idc()},
+		{common.ProtocolKey.ClientInstanceKey.IP, cfg.Ip()},
+		{common.ProtocolKey.ClientInstanceKey.PID, cfg.Pid()},
+		{common.ProtocolKey.ClientInstanceKey.SYS, cfg.Sys()},
+		// FIXME Random string
+		{common.ProtocolKey.ClientInstanceKey.BIZSEQNO, "333333"},
+		{common.ProtocolKey.ClientInstanceKey.UNIQUEID, "444444"},
+		{common.ProtocolKey.LANGUAGE, gcommon.Constants.LANGUAGE_GO},
+		// FIXME Java is name of spec version name
+		//{common.ProtocolKey.PROTOCOL_DESC, event.SpecVersion()},
+		{common.ProtocolKey.PROTOCOL_VERSION, event.SpecVersion()},
+	}
+	for _, ext := range extensions {
+		event.SetExtension(ext.name, ext.value)
+	}
+
+	return event
+}
diff --git a/eventmesh-sdk-go/http/producer/eventmesh_http_producer.go b/eventmesh-sdk-go/http/producer/eventmesh_http_producer.go
new file mode 100644
index 0000000..dbab724
--- /dev/null
+++ b/eventmesh-sdk-go/http/producer/eventmesh_http_producer.go
@@ -0,0 +1,27 @@
+package producer
+
+import (
+ "eventmesh/http/conf"
+ cloudevents "github.com/cloudevents/sdk-go/v2"
+)
+
+// EventMeshHttpProducer is the HTTP publishing entry point. It forwards
+// messages to the producer that matches their concrete type.
+type EventMeshHttpProducer struct {
+	cloudEventProducer *CloudEventProducer
+}
+
+// NewEventMeshHttpProducer creates a producer that uses the given client
+// configuration.
+func NewEventMeshHttpProducer(eventMeshHttpClientConfig conf.EventMeshHttpClientConfig) *EventMeshHttpProducer {
+	p := new(EventMeshHttpProducer)
+	p.cloudEventProducer = NewCloudEventProducer(eventMeshHttpClientConfig)
+	return p
+}
+
+// Publish sends eventMeshMessage when it holds a cloudevents.Event value.
+// Messages of any other type, including nil, are ignored.
+func (e *EventMeshHttpProducer) Publish(eventMeshMessage interface{}) {
+
+	// FIXME Check eventMeshMessage is not nil
+
+	// CloudEvent
+	if event, ok := eventMeshMessage.(cloudevents.Event); ok {
+		e.cloudEventProducer.Publish(event)
+	}
+}
diff --git a/eventmesh-sdk-go/http/producer/eventmesh_protocol_producer.go b/eventmesh-sdk-go/http/producer/eventmesh_protocol_producer.go
new file mode 100644
index 0000000..ab70cb1
--- /dev/null
+++ b/eventmesh-sdk-go/http/producer/eventmesh_protocol_producer.go
@@ -0,0 +1,5 @@
+package producer
+
+type EventMeshProtocolProducer interface { // publishing contract for a message producer
+ Publish(eventMeshMessage interface{}) // publishes one message; which concrete types are accepted is up to the implementation
+}
diff --git a/eventmesh-sdk-go/http/utils/http_utils.go b/eventmesh-sdk-go/http/utils/http_utils.go
new file mode 100644
index 0000000..4333e84
--- /dev/null
+++ b/eventmesh-sdk-go/http/utils/http_utils.go
@@ -0,0 +1,41 @@
+package utils
+
+import (
+	"io/ioutil"
+	"log"
+	nethttp "net/http"
+	"net/url"
+	"strings"
+
+	"eventmesh/http/model"
+)
+
+// HttpPost sends the body fields of requestParam as a form-encoded POST to
+// uri, with the headers of requestParam attached, and returns the response
+// body as a string.
+//
+// The function has no error return. If the request cannot be built, sent, or
+// its response read, the error is logged and the empty string is returned.
+// Previously these errors were ignored, and a failed request led to a nil
+// pointer dereference.
+func HttpPost(client *nethttp.Client, uri string, requestParam *model.RequestParam) string {
+
+	form := url.Values{}
+	for key, value := range requestParam.Body() {
+		form.Set(key, value)
+	}
+
+	req, err := nethttp.NewRequest(nethttp.MethodPost, uri, strings.NewReader(form.Encode()))
+	if err != nil {
+		log.Printf("Failed to build http request to %s, error: %v", uri, err)
+		return ""
+	}
+
+	req.Header.Set("Content-Type", "application/x-www-form-urlencoded; charset=UTF-8")
+
+	// Headers are assigned to the map directly rather than through Header.Set,
+	// so the keys are stored exactly as given instead of being canonicalized.
+	// NOTE(review): presumably the server expects the lower-case keys - confirm.
+	for name, value := range requestParam.Headers() {
+		req.Header[name] = []string{value}
+	}
+
+	resp, err := client.Do(req)
+	if err != nil {
+		log.Printf("Failed to send http request to %s, error: %v", uri, err)
+		return ""
+	}
+	defer resp.Body.Close()
+
+	respBody, err := ioutil.ReadAll(resp.Body)
+	if err != nil {
+		log.Printf("Failed to read http response from %s, error: %v", uri, err)
+		return ""
+	}
+
+	return string(respBody)
+}
diff --git a/eventmesh-sdk-go/main.go b/eventmesh-sdk-go/main.go
new file mode 100644
index 0000000..1e04a65
--- /dev/null
+++ b/eventmesh-sdk-go/main.go
@@ -0,0 +1,12 @@
+package main
+
+import "eventmesh/examples/tcp"
+
+func main() {
+ // HTTP examples, disabled. Uncommenting them also requires importing eventmesh/examples/http.
+ //http.AsyncPubCloudEvents()
+ //http.SubCloudEvents()
+
+ // TCP example: the only one that currently runs.
+ tcp.AsyncPubCloudEvents()
+}
diff --git a/eventmesh-sdk-go/tcp/cloudevent_tcp_client.go b/eventmesh-sdk-go/tcp/cloudevent_tcp_client.go
new file mode 100644
index 0000000..4563997
--- /dev/null
+++ b/eventmesh-sdk-go/tcp/cloudevent_tcp_client.go
@@ -0,0 +1,35 @@
+package tcp
+
+import (
+ gtcp "eventmesh/common/protocol/tcp"
+ "eventmesh/tcp/conf"
+)
+
+// CloudEventTCPClient combines a publishing client and a subscribing client
+// that are created from the same configuration.
+type CloudEventTCPClient struct {
+	cloudEventTCPPubClient *CloudEventTCPPubClient
+	cloudEventTCPSubClient *CloudEventTCPSubClient
+}
+
+// NewCloudEventTCPClient creates the pub and sub clients from the given
+// configuration. Nothing is initialised until Init is called.
+func NewCloudEventTCPClient(eventMeshTcpClientConfig conf.EventMeshTCPClientConfig) *CloudEventTCPClient {
+	client := new(CloudEventTCPClient)
+	client.cloudEventTCPPubClient = NewCloudEventTCPPubClient(eventMeshTcpClientConfig)
+	client.cloudEventTCPSubClient = NewCloudEventTCPSubClient(eventMeshTcpClientConfig)
+	return client
+}
+
+// Init initialises the pub client first and then the sub client.
+func (c *CloudEventTCPClient) Init() {
+	pub, sub := c.cloudEventTCPPubClient, c.cloudEventTCPSubClient
+	pub.init()
+	sub.init()
+}
+
+// Publish sends message through the pub client and returns the package
+// produced by that call.
+func (c *CloudEventTCPClient) Publish(message interface{}, timeout int64) gtcp.Package {
+	reply := c.cloudEventTCPPubClient.publish(message, timeout)
+	return reply
+}
+
+// GetPubClient returns the underlying publishing client.
+func (c *CloudEventTCPClient) GetPubClient() EventMeshTCPPubClient {
+	var pub EventMeshTCPPubClient = c.cloudEventTCPPubClient
+	return pub
+}
+
+// GetSubClient returns the underlying subscribing client.
+func (c *CloudEventTCPClient) GetSubClient() EventMeshTCPSubClient {
+	var sub EventMeshTCPSubClient = c.cloudEventTCPSubClient
+	return sub
+}
diff --git a/eventmesh-sdk-go/tcp/cloudevent_tcp_pub_client.go b/eventmesh-sdk-go/tcp/cloudevent_tcp_pub_client.go
new file mode 100644
index 0000000..ea08329
--- /dev/null
+++ b/eventmesh-sdk-go/tcp/cloudevent_tcp_pub_client.go
@@ -0,0 +1,31 @@
+package tcp
+
+import (
+ "eventmesh/common/protocol/tcp"
+ "eventmesh/tcp/conf"
+ "eventmesh/tcp/utils"
+)
+
+// CloudEventTCPPubClient publishes CloudEvents over the connection managed by
+// the embedded BaseTCPClient.
+type CloudEventTCPPubClient struct {
+	*BaseTCPClient
+}
+
+// NewCloudEventTCPPubClient wraps a freshly built BaseTCPClient.
+func NewCloudEventTCPPubClient(eventMeshTcpClientConfig conf.EventMeshTCPClientConfig) *CloudEventTCPPubClient {
+	base := NewBaseTCPClient(eventMeshTcpClientConfig)
+	return &CloudEventTCPPubClient{BaseTCPClient: base}
+}
+
+// init opens the connection, then sends the hello and heartbeat packages in
+// that order.
+func (p CloudEventTCPPubClient) init() {
+	p.BaseTCPClient.Open()
+	p.BaseTCPClient.Hello()
+	p.BaseTCPClient.Heartbeat()
+}
+
+// reconnect delegates to the base client's Reconnect and follows it with a
+// heartbeat.
+func (p CloudEventTCPPubClient) reconnect() {
+	p.BaseTCPClient.Reconnect()
+	p.BaseTCPClient.Heartbeat()
+}
+
+// publish wraps message in an ASYNC_MESSAGE_TO_SERVER package and hands it to
+// the base client's IO, returning IO's result.
+func (p CloudEventTCPPubClient) publish(message interface{}, timeout int64) tcp.Package {
+	return p.BaseTCPClient.IO(utils.BuildPackage(message, tcp.DefaultCommand.ASYNC_MESSAGE_TO_SERVER), timeout)
+}
diff --git a/eventmesh-sdk-go/tcp/cloudevent_tcp_sub_client.go b/eventmesh-sdk-go/tcp/cloudevent_tcp_sub_client.go
new file mode 100644
index 0000000..c524f19
--- /dev/null
+++ b/eventmesh-sdk-go/tcp/cloudevent_tcp_sub_client.go
@@ -0,0 +1,26 @@
+package tcp
+
+import (
+ "eventmesh/common/protocol"
+ "eventmesh/tcp/conf"
+)
+
+// CloudEventTCPSubClient is the subscribing counterpart of the pub client.
+// Subscription handling is not implemented yet: subscribe and unsubscribe
+// panic, and init does nothing.
+type CloudEventTCPSubClient struct {
+	*BaseTCPClient
+}
+
+// NewCloudEventTCPSubClient wraps a freshly built BaseTCPClient.
+func NewCloudEventTCPSubClient(eventMeshTcpClientConfig conf.EventMeshTCPClientConfig) *CloudEventTCPSubClient {
+	base := NewBaseTCPClient(eventMeshTcpClientConfig)
+	return &CloudEventTCPSubClient{BaseTCPClient: base}
+}
+
+// init is intentionally a no-op for now (the placeholder panic is disabled so
+// that callers initialising both clients do not crash).
+func (s CloudEventTCPSubClient) init() {
+	//panic("implement me")
+}
+
+// subscribe is a placeholder and always panics.
+func (s CloudEventTCPSubClient) subscribe(topic string, subscriptionMode protocol.SubscriptionMode, subscriptionType protocol.SubscriptionType) {
+	panic("implement me")
+}
+
+// unsubscribe is a placeholder and always panics.
+func (s CloudEventTCPSubClient) unsubscribe() {
+	panic("implement me")
+}
diff --git a/eventmesh-sdk-go/tcp/common/eventmesh_common.go b/eventmesh-sdk-go/tcp/common/eventmesh_common.go
new file mode 100644
index 0000000..262f29e
--- /dev/null
+++ b/eventmesh-sdk-go/tcp/common/eventmesh_common.go
@@ -0,0 +1,22 @@
+package common
+
+// EventMeshCommon groups constant-like values for the TCP client: a default
+// timeout, the user-agent purposes and the protocol type names.
+var EventMeshCommon = struct {
+ // Default timeout shared with the server; milliseconds per the field name
+ // (20 * 1000, i.e. 20 seconds).
+ DEFAULT_TIME_OUT_MILLS int
+
+ // User agent purpose: "pub" for publishers, "sub" for subscribers
+ USER_AGENT_PURPOSE_PUB string
+ USER_AGENT_PURPOSE_SUB string
+
+ // Protocol type names
+ CLOUD_EVENTS_PROTOCOL_NAME string
+ EM_MESSAGE_PROTOCOL_NAME string
+ OPEN_MESSAGE_PROTOCOL_NAME string
+}{
+ DEFAULT_TIME_OUT_MILLS: 20 * 1000,
+ USER_AGENT_PURPOSE_PUB: "pub",
+ USER_AGENT_PURPOSE_SUB: "sub",
+ CLOUD_EVENTS_PROTOCOL_NAME: "cloudevents",
+ EM_MESSAGE_PROTOCOL_NAME: "eventmeshmessage",
+ OPEN_MESSAGE_PROTOCOL_NAME: "openmessage",
+}
diff --git a/eventmesh-sdk-go/tcp/common/request_context.go b/eventmesh-sdk-go/tcp/common/request_context.go
new file mode 100644
index 0000000..4c7b681
--- /dev/null
+++ b/eventmesh-sdk-go/tcp/common/request_context.go
@@ -0,0 +1,60 @@
+package common
+
+import (
+ "eventmesh/common/protocol/tcp"
+ "sync"
+)
+
+// RequestContext ties a request package to its (eventual) response package
+// under a lookup key.
+type RequestContext struct {
+	key      interface{}
+	request  tcp.Package
+	response tcp.Package
+	wg       sync.WaitGroup
+}
+
+// Key returns the lookup key of this context.
+func (rc *RequestContext) Key() interface{} {
+	return rc.key
+}
+
+// SetKey replaces the lookup key.
+func (rc *RequestContext) SetKey(key interface{}) {
+	rc.key = key
+}
+
+// Request returns the stored request package.
+func (rc *RequestContext) Request() tcp.Package {
+	return rc.request
+}
+
+// SetRequest replaces the stored request package.
+func (rc *RequestContext) SetRequest(request tcp.Package) {
+	rc.request = request
+}
+
+// Response returns the stored response package; it is the zero value until
+// SetResponse or Finish has been called.
+func (rc *RequestContext) Response() tcp.Package {
+	return rc.response
+}
+
+// SetResponse replaces the stored response package.
+func (rc *RequestContext) SetResponse(response tcp.Package) {
+	rc.response = response
+}
+
+// Wg returns the WaitGroup by value, i.e. a copy: operations on the result do
+// not affect the WaitGroup held by the context.
+// NOTE(review): copying a sync.WaitGroup is flagged by go vet (copylocks).
+func (rc *RequestContext) Wg() sync.WaitGroup {
+	return rc.wg
+}
+
+// SetWg overwrites the context's WaitGroup with a copy of wg.
+func (rc *RequestContext) SetWg(wg sync.WaitGroup) {
+	rc.wg = wg
+}
+
+// Finish records message as the response. Signalling the WaitGroup is
+// currently disabled.
+func (rc *RequestContext) Finish(message tcp.Package) {
+	rc.response = message
+	//rc.wg.Done()
+}
+
+// GetRequestContextKey returns the key under which a request's context is
+// stored: the sequence value (Header.Seq) of the request package.
+func GetRequestContextKey(request tcp.Package) interface{} {
+ return request.Header.Seq
+}
+
+// NewRequestContext returns a context holding key and request; the response
+// is left at its zero value. latch is currently unused because the WaitGroup
+// registration is disabled.
+func NewRequestContext(key interface{}, request tcp.Package, latch int) *RequestContext {
+	rc := new(RequestContext)
+	rc.key = key
+	rc.request = request
+	//rc.Wg().Add(latch)
+	return rc
+}
diff --git a/eventmesh-sdk-go/tcp/conf/eventmesh_tcp_client_config.go b/eventmesh-sdk-go/tcp/conf/eventmesh_tcp_client_config.go
new file mode 100644
index 0000000..b353aa3
--- /dev/null
+++ b/eventmesh-sdk-go/tcp/conf/eventmesh_tcp_client_config.go
@@ -0,0 +1,37 @@
+package conf
+
+import "eventmesh/common/protocol/tcp"
+
+// EventMeshTCPClientConfig carries the settings a TCP client needs: the
+// server host and port, and the user agent the client presents.
+type EventMeshTCPClientConfig struct {
+	host      string
+	port      int
+	userAgent tcp.UserAgent
+}
+
+// NewEventMeshTCPClientConfig returns a configuration populated with the
+// given host, port and user agent.
+func NewEventMeshTCPClientConfig(host string, port int, userAgent tcp.UserAgent) *EventMeshTCPClientConfig {
+	cfg := new(EventMeshTCPClientConfig)
+	cfg.host = host
+	cfg.port = port
+	cfg.userAgent = userAgent
+	return cfg
+}
+
+// Host returns the configured host.
+func (cfg *EventMeshTCPClientConfig) Host() string {
+	return cfg.host
+}
+
+// SetHost replaces the configured host.
+func (cfg *EventMeshTCPClientConfig) SetHost(host string) {
+	cfg.host = host
+}
+
+// Port returns the configured port.
+func (cfg *EventMeshTCPClientConfig) Port() int {
+	return cfg.port
+}
+
+// SetPort replaces the configured port.
+func (cfg *EventMeshTCPClientConfig) SetPort(port int) {
+	cfg.port = port
+}
+
+// UserAgent returns the configured user agent.
+func (cfg *EventMeshTCPClientConfig) UserAgent() tcp.UserAgent {
+	return cfg.userAgent
+}
+
+// SetUserAgent replaces the configured user agent.
+func (cfg *EventMeshTCPClientConfig) SetUserAgent(userAgent tcp.UserAgent) {
+	cfg.userAgent = userAgent
+}
diff --git a/eventmesh-sdk-go/tcp/eventmesh_tcp_client.go b/eventmesh-sdk-go/tcp/eventmesh_tcp_client.go
new file mode 100644
index 0000000..ffa3414
--- /dev/null
+++ b/eventmesh-sdk-go/tcp/eventmesh_tcp_client.go
@@ -0,0 +1,10 @@
+package tcp
+
+import gtcp "eventmesh/common/protocol/tcp"
+
+// EventMeshTCPClient is the interface returned by CreateEventMeshTCPClient.
+type EventMeshTCPClient interface {
+ // Init prepares the client for use.
+ Init()
+ // Publish sends msg and returns a package; timeout is an int64 whose unit
+ // is not defined here (presumably milliseconds - confirm).
+ Publish(msg interface{}, timeout int64) gtcp.Package
+ // GetPubClient returns the publishing half of the client.
+ GetPubClient() EventMeshTCPPubClient
+ // GetSubClient returns the subscribing half of the client.
+ GetSubClient() EventMeshTCPSubClient
+}
diff --git a/eventmesh-sdk-go/tcp/eventmesh_tcp_client_factory.go b/eventmesh-sdk-go/tcp/eventmesh_tcp_client_factory.go
new file mode 100644
index 0000000..a8c8fc9
--- /dev/null
+++ b/eventmesh-sdk-go/tcp/eventmesh_tcp_client_factory.go
@@ -0,0 +1,15 @@
+package tcp
+
+import (
+ "eventmesh/common/protocol"
+ "eventmesh/tcp/conf"
+)
+
+// CreateEventMeshTCPClient returns a client for the given message type. Only
+// the CloudEvent message type is supported; any other type yields nil.
+func CreateEventMeshTCPClient(eventMeshTcpClientConfig conf.EventMeshTCPClientConfig, messageType protocol.MessageType) EventMeshTCPClient {
+	switch messageType {
+	case protocol.DefaultMessageType.CloudEvent:
+		return NewCloudEventTCPClient(eventMeshTcpClientConfig)
+	default:
+		return nil
+	}
+}
diff --git a/eventmesh-sdk-go/tcp/eventmesh_tcp_pub_client.go b/eventmesh-sdk-go/tcp/eventmesh_tcp_pub_client.go
new file mode 100644
index 0000000..9bcabf4
--- /dev/null
+++ b/eventmesh-sdk-go/tcp/eventmesh_tcp_pub_client.go
@@ -0,0 +1,9 @@
+package tcp
+
+import gtcp "eventmesh/common/protocol/tcp"
+
+// EventMeshTCPPubClient describes a publishing client. All methods are
+// unexported, so they can only be called from inside this package.
+type EventMeshTCPPubClient interface {
+ init()
+ reconnect()
+ publish(message interface{}, timeout int64) gtcp.Package
+}
diff --git a/eventmesh-sdk-go/tcp/eventmesh_tcp_sub_client.go b/eventmesh-sdk-go/tcp/eventmesh_tcp_sub_client.go
new file mode 100644
index 0000000..a496e49
--- /dev/null
+++ b/eventmesh-sdk-go/tcp/eventmesh_tcp_sub_client.go
@@ -0,0 +1,9 @@
+package tcp
+
+import "eventmesh/common/protocol"
+
+// EventMeshTCPSubClient describes a subscribing client. All methods are
+// unexported, so they can only be called from inside this package.
+type EventMeshTCPSubClient interface {
+ init()
+ subscribe(topic string, subscriptionMode protocol.SubscriptionMode, subscriptionType protocol.SubscriptionType)
+ unsubscribe()
+}
diff --git a/eventmesh-sdk-go/tcp/tcp_client.go b/eventmesh-sdk-go/tcp/tcp_client.go
new file mode 100644
index 0000000..79bf276
--- /dev/null
+++ b/eventmesh-sdk-go/tcp/tcp_client.go
@@ -0,0 +1,129 @@
+package tcp
+
+import (
+ "bufio"
+ "bytes"
+ "eventmesh/common/protocol/tcp"
+ "eventmesh/common/protocol/tcp/codec"
+ "eventmesh/tcp/common"
+ "eventmesh/tcp/conf"
+ "eventmesh/tcp/utils"
+ "io"
+ "log"
+ "math/rand"
+ "net"
+ "strconv"
+)
+
+// BaseTCPClient holds the connection state embedded by the pub and sub
+// clients.
+type BaseTCPClient struct {
+ clientNo int // random number in [0, 10000) assigned by NewBaseTCPClient
+ host string // server host, taken from the client configuration
+ port int // server port, taken from the client configuration
+ useAgent tcp.UserAgent // user agent sent as the body of the hello package
+ conn net.Conn // set by Open; nil until then
+}
+
+// NewBaseTCPClient copies host, port and user agent out of the configuration
+// and assigns a random client number in [0, 10000). No connection is opened
+// here.
+func NewBaseTCPClient(eventMeshTcpClientConfig conf.EventMeshTCPClientConfig) *BaseTCPClient {
+	client := new(BaseTCPClient)
+	client.clientNo = rand.Intn(10000)
+	client.host = eventMeshTcpClientConfig.Host()
+	client.port = eventMeshTcpClientConfig.Port()
+	client.useAgent = eventMeshTcpClientConfig.UserAgent()
+	return client
+}
+
+// Open dials the configured host and port over TCP, stores the connection and
+// starts the background read loop.
+//
+// A dial failure terminates the process via log.Fatalf; the message includes
+// the target address and the underlying error so the cause can be diagnosed.
+func (c *BaseTCPClient) Open() {
+	eventMeshIpAndPort := c.host + ":" + strconv.Itoa(c.port)
+	conn, err := net.Dial("tcp", eventMeshIpAndPort)
+	if err != nil {
+		log.Fatalf("Failed to dial %s: %v", eventMeshIpAndPort, err)
+	}
+	c.conn = conn
+
+	// The read loop runs until the connection fails or is closed. Its error
+	// is logged instead of being silently dropped.
+	go func() {
+		if err := c.read(); err != nil {
+			log.Printf("Read loop for %s stopped: %v", eventMeshIpAndPort, err)
+		}
+	}()
+}
+
+// Close says goodbye and then closes the connection. It does nothing when no
+// connection has been opened.
+//
+// Goodbye is invoked before the socket is closed so that a farewell message,
+// once implemented, can still be sent over the connection. A close failure
+// terminates the process via log.Fatalf, with the underlying error included.
+func (c *BaseTCPClient) Close() {
+	if c.conn == nil {
+		return
+	}
+
+	c.Goodbye()
+	if err := c.conn.Close(); err != nil {
+		log.Fatalf("Failed to close connection: %v", err)
+	}
+}
+
+// Heartbeat builds a heartbeat package and passes it to IO with a timeout
+// argument of 1000. IO's result is discarded.
+func (c *BaseTCPClient) Heartbeat() {
+	c.IO(utils.BuildHeartBeatPackage(), 1000)
+}
+
+// Hello builds a hello package carrying the client's user agent and passes it
+// to IO with a timeout argument of 1000. IO's result is discarded.
+func (c *BaseTCPClient) Hello() {
+	c.IO(utils.BuildHelloPackage(c.useAgent), 1000)
+}
+
+// Reconnect is a placeholder: it currently does nothing.
+func (c *BaseTCPClient) Reconnect() {
+
+}
+
+// Goodbye is a placeholder: it currently does nothing.
+func (c *BaseTCPClient) Goodbye() {
+
+}
+
+// IsActive is a placeholder: it currently does nothing and returns no value.
+func (c *BaseTCPClient) IsActive() {
+
+}
+
+// read is the connection's receive loop. It reads line-delimited messages and
+// hands each complete message to handleRead on its own goroutine.
+//
+// It returns nil when the peer closes the connection (io.EOF) and the
+// underlying error for any other read failure.
+func (c *BaseTCPClient) read() error {
+	// One reader for the lifetime of the connection: a bufio.Reader may pull
+	// more bytes from the socket than the current line needs, so creating a
+	// new reader per read would throw away already-buffered data.
+	reader := bufio.NewReader(c.conn)
+
+	for {
+		var buf bytes.Buffer
+		for {
+			msg, isPrefix, err := reader.ReadLine()
+			if err != nil {
+				if err == io.EOF {
+					// Stop at end of stream instead of looping forever on a
+					// dead connection; deliver any fragments gathered so far.
+					if buf.Len() > 0 {
+						c.handleRead(&buf)
+					}
+					return nil
+				}
+				return err
+			}
+
+			buf.Write(msg)
+			// isPrefix reports that the line did not fit in the reader's
+			// buffer and continues in the next ReadLine call.
+			if !isPrefix {
+				break
+			}
+		}
+
+		go c.handleRead(&buf)
+	}
+}
+
+// handleRead decodes one received message and logs the decoded package.
+func (c *BaseTCPClient) handleRead(in *bytes.Buffer) {
+	// TODO Handle according to the command
+	log.Printf("Read from server: %v\n", codec.DecodePackage(in))
+}
+
+// write sends message over the connection and returns the number of bytes
+// written together with any write error.
+//
+// The bytes are written straight to the connection: wrapping it in a new
+// bufio.Writer for a single write followed by an immediate Flush only added
+// an allocation and a copy.
+func (c *BaseTCPClient) write(message []byte) (int, error) {
+	return c.conn.Write(message)
+}
+
+// Send encodes message and writes it to the connection.
+//
+// A write failure terminates the process via log.Fatalf; the message includes
+// the underlying error.
+func (c *BaseTCPClient) Send(message tcp.Package) {
+	out := codec.EncodePackage(message)
+	if _, err := c.write(out.Bytes()); err != nil {
+		log.Fatalf("Failed to write to peer: %v", err)
+	}
+}
+
+// IO creates a request context for message, sends the message and returns the
+// context's response.
+//
+// NOTE(review): timeout is not used, and nothing in this function stores a
+// response in the context, so the returned package is the zero value of a
+// freshly created context.
+func (c *BaseTCPClient) IO(message tcp.Package, timeout int64) tcp.Package {
+	reqCtx := common.NewRequestContext(common.GetRequestContextKey(message), message, 1)
+	c.Send(message)
+	return reqCtx.Response()
+}
diff --git a/eventmesh-sdk-go/tcp/utils/message_utils.go b/eventmesh-sdk-go/tcp/utils/message_utils.go
new file mode 100644
index 0000000..1b4e6fb
--- /dev/null
+++ b/eventmesh-sdk-go/tcp/utils/message_utils.go
@@ -0,0 +1,45 @@
+package utils
+
+import (
+ gcommon "eventmesh/common"
+ "eventmesh/common/protocol/tcp"
+ "eventmesh/tcp/common"
+ cloudevents "github.com/cloudevents/sdk-go/v2"
+ "log"
+)
+
+func BuildPackage(message interface{}, command tcp.Command) tcp.Package {
+ // FIXME Support random sequence
+ header := tcp.NewHeader(command, 0, "", "22222")
+ pkg := tcp.NewPackage(header)
+
+ if _, ok := message.(cloudevents.Event); ok {
+ event := message.(cloudevents.Event)
+ eventBytes, err := event.MarshalJSON()
+ if err != nil {
+ log.Fatal("Failed to marshal cloud event")
+ }
+
+ pkg.Header.PutProperty(gcommon.Constants.PROTOCOL_TYPE, common.EventMeshCommon.CLOUD_EVENTS_PROTOCOL_NAME)
+ pkg.Header.PutProperty(gcommon.Constants.PROTOCOL_VERSION, event.SpecVersion())
+ pkg.Header.PutProperty(gcommon.Constants.PROTOCOL_DESC, "tcp")
+ pkg.Body = eventBytes
+ }
+
+ return pkg
+}
+
+// BuildHelloPackage creates a HELLO_REQUEST package whose body is the given
+// user agent.
+func BuildHelloPackage(agent tcp.UserAgent) tcp.Package {
+	// FIXME Support random sequence
+	pkg := tcp.NewPackage(tcp.NewHeader(tcp.DefaultCommand.HELLO_REQUEST, 0, "", "22222"))
+	pkg.Body = agent
+	return pkg
+}
+
+// BuildHeartBeatPackage creates a HEARTBEAT_REQUEST package; no body is set.
+func BuildHeartBeatPackage() tcp.Package {
+	// FIXME Support random sequence
+	return tcp.NewPackage(tcp.NewHeader(tcp.DefaultCommand.HEARTBEAT_REQUEST, 0, "", "22222"))
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@eventmesh.apache.org
For additional commands, e-mail: commits-help@eventmesh.apache.org