You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eventmesh.apache.org by ch...@apache.org on 2022/04/01 02:23:48 UTC

[incubator-eventmesh] branch go-sdk updated: Support golang sdk (#762)

This is an automated email from the ASF dual-hosted git repository.

chenguangsheng pushed a commit to branch go-sdk
in repository https://gitbox.apache.org/repos/asf/incubator-eventmesh.git


The following commit(s) were added to refs/heads/go-sdk by this push:
     new dc6bd50  Support golang sdk (#762)
     new c9f408f  Merge pull request #803 from xiehongfeng100/go-sdk
dc6bd50 is described below

commit dc6bd50954685cfdc3e414b55092d51a0e65f363
Author: xiehongfeng100 <xi...@hotmail.com>
AuthorDate: Sun Mar 6 23:40:43 2022 +0800

    Support golang sdk (#762)
---
 eventmesh-sdk-go/common/constants.go               |  33 ++++
 eventmesh-sdk-go/common/protocol/http/body/body.go |   9 +
 .../http/body/client/heartbeat_request_body.go     |  57 ++++++
 .../http/body/client/subscribe_request_body.go     |  23 +++
 .../common/protocol/http/common/client_type.go     |  20 ++
 .../protocol/http/common/eventmesh_ret_code.go     |  12 ++
 .../common/protocol/http/common/protocol_key.go    |  70 +++++++
 .../protocol/http/common/protocol_version.go       |  25 +++
 .../common/protocol/http/common/request_code.go    |  80 ++++++++
 .../http/message/send_message_request_body.go      |  96 ++++++++++
 eventmesh-sdk-go/common/protocol/message_type.go   |  13 ++
 .../common/protocol/subscription_item.go           |   7 +
 .../common/protocol/subscription_mode.go           |  11 ++
 .../common/protocol/subscription_type.go           |  11 ++
 .../common/protocol/tcp/codec/codec.go             | 168 +++++++++++++++++
 eventmesh-sdk-go/common/protocol/tcp/command.go    | 202 +++++++++++++++++++++
 eventmesh-sdk-go/common/protocol/tcp/header.go     |  80 ++++++++
 eventmesh-sdk-go/common/protocol/tcp/package.go    |  10 +
 eventmesh-sdk-go/common/protocol/tcp/user_agent.go |  23 +++
 eventmesh-sdk-go/common/utils/json_utils.go        |  29 +++
 .../examples/http/async_pub_cloudevents.go         |  52 ++++++
 eventmesh-sdk-go/examples/http/sub_cloudevents.go  |  99 ++++++++++
 .../examples/tcp/async_pub_cloudevents.go          |  45 +++++
 eventmesh-sdk-go/go.mod                            |   8 +
 eventmesh-sdk-go/go.sum                            |  39 ++++
 eventmesh-sdk-go/http/abstract_http_client.go      |  43 +++++
 .../http/conf/eventmesh_http_client_config.go      | 144 +++++++++++++++
 .../http/consumer/eventmesh_http_consumer.go       |  95 ++++++++++
 eventmesh-sdk-go/http/eventmesh_ret_obj.go         |   7 +
 eventmesh-sdk-go/http/model/request_param.go       |  53 ++++++
 .../http/producer/cloudevent_producer.go           |  85 +++++++++
 .../http/producer/eventmesh_http_producer.go       |  27 +++
 .../http/producer/eventmesh_protocol_producer.go   |   5 +
 eventmesh-sdk-go/http/utils/http_utils.go          |  41 +++++
 eventmesh-sdk-go/main.go                           |  12 ++
 eventmesh-sdk-go/tcp/cloudevent_tcp_client.go      |  35 ++++
 eventmesh-sdk-go/tcp/cloudevent_tcp_pub_client.go  |  31 ++++
 eventmesh-sdk-go/tcp/cloudevent_tcp_sub_client.go  |  26 +++
 eventmesh-sdk-go/tcp/common/eventmesh_common.go    |  22 +++
 eventmesh-sdk-go/tcp/common/request_context.go     |  60 ++++++
 .../tcp/conf/eventmesh_tcp_client_config.go        |  37 ++++
 eventmesh-sdk-go/tcp/eventmesh_tcp_client.go       |  10 +
 .../tcp/eventmesh_tcp_client_factory.go            |  15 ++
 eventmesh-sdk-go/tcp/eventmesh_tcp_pub_client.go   |   9 +
 eventmesh-sdk-go/tcp/eventmesh_tcp_sub_client.go   |   9 +
 eventmesh-sdk-go/tcp/tcp_client.go                 | 129 +++++++++++++
 eventmesh-sdk-go/tcp/utils/message_utils.go        |  45 +++++
 47 files changed, 2162 insertions(+)

diff --git a/eventmesh-sdk-go/common/constants.go b/eventmesh-sdk-go/common/constants.go
new file mode 100644
index 0000000..bdc0859
--- /dev/null
+++ b/eventmesh-sdk-go/common/constants.go
@@ -0,0 +1,33 @@
+package common
+
+var Constants = struct {
+	LANGUAGE_GO                 string
+	HTTP_PROTOCOL_PREFIX        string
+	HTTPS_PROTOCOL_PREFIX       string
+	PROTOCOL_TYPE               string
+	PROTOCOL_VERSION            string
+	PROTOCOL_DESC               string
+	DEFAULT_HTTP_TIME_OUT       int64
+	EVENTMESH_MESSAGE_CONST_TTL string
+
+	// Client heartbeat interval
+	HEARTBEAT int64
+
+	// Protocol type
+	CLOUD_EVENTS_PROTOCOL_NAME string
+	EM_MESSAGE_PROTOCOL_NAME   string
+	OPEN_MESSAGE_PROTOCOL_NAME string
+}{
+	LANGUAGE_GO:                 "GO",
+	HTTP_PROTOCOL_PREFIX:        "http://",
+	HTTPS_PROTOCOL_PREFIX:       "https://",
+	PROTOCOL_TYPE:               "protocoltype",
+	PROTOCOL_VERSION:            "protocolversion",
+	PROTOCOL_DESC:               "protocoldesc",
+	DEFAULT_HTTP_TIME_OUT:       15000,
+	EVENTMESH_MESSAGE_CONST_TTL: "ttl",
+	HEARTBEAT:                   30 * 1000,
+	CLOUD_EVENTS_PROTOCOL_NAME:  "cloudevents",
+	EM_MESSAGE_PROTOCOL_NAME:    "eventmeshmessage",
+	OPEN_MESSAGE_PROTOCOL_NAME:  "openmessage",
+}
diff --git a/eventmesh-sdk-go/common/protocol/http/body/body.go b/eventmesh-sdk-go/common/protocol/http/body/body.go
new file mode 100644
index 0000000..b050820
--- /dev/null
+++ b/eventmesh-sdk-go/common/protocol/http/body/body.go
@@ -0,0 +1,9 @@
+package body
+
+type Body struct {
+	ToMap map[string]interface{}
+}
+
+func (b *Body) BuildBody(requestCode string, originalMap map[string]interface{}) *Body {
+	return nil
+}
diff --git a/eventmesh-sdk-go/common/protocol/http/body/client/heartbeat_request_body.go b/eventmesh-sdk-go/common/protocol/http/body/client/heartbeat_request_body.go
new file mode 100644
index 0000000..2ae5ab3
--- /dev/null
+++ b/eventmesh-sdk-go/common/protocol/http/body/client/heartbeat_request_body.go
@@ -0,0 +1,57 @@
+package client
+
+import (
+	"eventmesh/common/protocol/http/body"
+)
+
+var HeartbeatRequestBodyKey = struct {
+	CLIENTTYPE        string
+	CONSUMERGROUP     string
+	HEARTBEATENTITIES string
+}{
+	CLIENTTYPE:        "clientType",
+	HEARTBEATENTITIES: "heartbeatEntities",
+	CONSUMERGROUP:     "consumerGroup",
+}
+
+type HeartbeatEntity struct {
+	Topic      string `json:"topic"`
+	Url        string `json:"url"`
+	ServiceId  string `json:"serviceId"`
+	InstanceId string `json:"instanceId"`
+}
+
+type HeartbeatRequestBody struct {
+	body.Body
+	consumerGroup     string
+	clientType        string
+	heartbeatEntities string
+}
+
+func (h *HeartbeatRequestBody) ConsumerGroup() string {
+	return h.consumerGroup
+}
+
+func (h *HeartbeatRequestBody) SetConsumerGroup(consumerGroup string) {
+	h.consumerGroup = consumerGroup
+}
+
+func (h *HeartbeatRequestBody) ClientType() string {
+	return h.clientType
+}
+
+func (h *HeartbeatRequestBody) SetClientType(clientType string) {
+	h.clientType = clientType
+}
+
+func (h *HeartbeatRequestBody) HeartbeatEntities() string {
+	return h.heartbeatEntities
+}
+
+func (h *HeartbeatRequestBody) SetHeartbeatEntities(heartbeatEntities string) {
+	h.heartbeatEntities = heartbeatEntities
+}
+
+func (h *HeartbeatRequestBody) BuildBody(bodyParam map[string]interface{}) *HeartbeatRequestBody {
+	return nil
+}
diff --git a/eventmesh-sdk-go/common/protocol/http/body/client/subscribe_request_body.go b/eventmesh-sdk-go/common/protocol/http/body/client/subscribe_request_body.go
new file mode 100644
index 0000000..07d797d
--- /dev/null
+++ b/eventmesh-sdk-go/common/protocol/http/body/client/subscribe_request_body.go
@@ -0,0 +1,23 @@
+package client
+
+import (
+	"eventmesh/common/protocol"
+	"eventmesh/common/protocol/http/body"
+)
+
+var SubscribeRequestBodyKey = struct {
+	TOPIC         string
+	URL           string
+	CONSUMERGROUP string
+}{
+	TOPIC:         "topic",
+	URL:           "url",
+	CONSUMERGROUP: "consumerGroup",
+}
+
+type SubscribeRequestBody struct {
+	body.Body
+	topics        []protocol.SubscriptionItem
+	url           string
+	consumerGroup string
+}
diff --git a/eventmesh-sdk-go/common/protocol/http/common/client_type.go b/eventmesh-sdk-go/common/protocol/http/common/client_type.go
new file mode 100644
index 0000000..b72ba80
--- /dev/null
+++ b/eventmesh-sdk-go/common/protocol/http/common/client_type.go
@@ -0,0 +1,20 @@
+package common
+
+type ClientType struct {
+	Type int    `json:"type"`
+	Desc string `json:"desc"`
+}
+
+var DefaultClientType = struct {
+	PUB ClientType
+	SUB ClientType
+}{
+	PUB: ClientType{
+		Type: 1,
+		Desc: "Client for publishing",
+	},
+	SUB: ClientType{
+		Type: 2,
+		Desc: "Client for subscribing",
+	},
+}
diff --git a/eventmesh-sdk-go/common/protocol/http/common/eventmesh_ret_code.go b/eventmesh-sdk-go/common/protocol/http/common/eventmesh_ret_code.go
new file mode 100644
index 0000000..3f26f82
--- /dev/null
+++ b/eventmesh-sdk-go/common/protocol/http/common/eventmesh_ret_code.go
@@ -0,0 +1,12 @@
+package common
+
+type EventMeshRetCode struct {
+	RetCode int    `json:"retCode"`
+	ErrMsg  string `json:"errMsg"`
+}
+
+var DefaultEventMeshRetCode = struct {
+	SUCCESS EventMeshRetCode
+}{
+	SUCCESS: EventMeshRetCode{RetCode: 0, ErrMsg: "success"},
+}
diff --git a/eventmesh-sdk-go/common/protocol/http/common/protocol_key.go b/eventmesh-sdk-go/common/protocol/http/common/protocol_key.go
new file mode 100644
index 0000000..6d67de8
--- /dev/null
+++ b/eventmesh-sdk-go/common/protocol/http/common/protocol_key.go
@@ -0,0 +1,70 @@
+package common
+
+type ClientInstanceKey struct {
+	//Protocol layer requester description
+	ENV      string
+	IDC      string
+	SYS      string
+	PID      string
+	IP       string
+	USERNAME string
+	PASSWORD string
+	BIZSEQNO string
+	UNIQUEID string
+}
+
+type EventMeshInstanceKey struct {
+	//Protocol layer EventMesh description
+	EVENTMESHCLUSTER string
+	EVENTMESHIP      string
+	EVENTMESHENV     string
+	EVENTMESHIDC     string
+}
+
+var ProtocolKey = struct {
+	REQUEST_CODE     string
+	LANGUAGE         string
+	VERSION          string
+	PROTOCOL_TYPE    string
+	PROTOCOL_VERSION string
+	PROTOCOL_DESC    string
+
+	ClientInstanceKey ClientInstanceKey
+
+	EventMeshInstanceKey EventMeshInstanceKey
+
+	//return of CLIENT <-> EventMesh
+	RETCODE string
+	RETMSG  string
+	RESTIME string
+}{
+	REQUEST_CODE:     "code",
+	LANGUAGE:         "language",
+	VERSION:          "version",
+	PROTOCOL_TYPE:    "protocoltype",
+	PROTOCOL_VERSION: "protocolversion",
+	PROTOCOL_DESC:    "protocoldesc",
+
+	ClientInstanceKey: ClientInstanceKey{
+		ENV:      "env",
+		IDC:      "idc",
+		SYS:      "sys",
+		PID:      "pid",
+		IP:       "ip",
+		USERNAME: "username",
+		PASSWORD: "passwd",
+		BIZSEQNO: "bizseqno",
+		UNIQUEID: "uniqueid",
+	},
+
+	EventMeshInstanceKey: EventMeshInstanceKey{
+		EVENTMESHCLUSTER: "eventmeshcluster",
+		EVENTMESHIP:      "eventmeship",
+		EVENTMESHENV:     "eventmeshenv",
+		EVENTMESHIDC:     "eventmeshidc",
+	},
+
+	RETCODE: "retCode",
+	RETMSG:  "retMsg",
+	RESTIME: "resTime",
+}
diff --git a/eventmesh-sdk-go/common/protocol/http/common/protocol_version.go b/eventmesh-sdk-go/common/protocol/http/common/protocol_version.go
new file mode 100644
index 0000000..f7fe6a5
--- /dev/null
+++ b/eventmesh-sdk-go/common/protocol/http/common/protocol_version.go
@@ -0,0 +1,25 @@
+package common
+
+type ProtocolVersion struct {
+	version string
+}
+
+func (p *ProtocolVersion) Version() string {
+	return p.version
+}
+
+func (p *ProtocolVersion) SetVersion(version string) {
+	p.version = version
+}
+
+var DefaultProtocolVersion = struct {
+	V1 ProtocolVersion
+	V2 ProtocolVersion
+}{
+	V1: ProtocolVersion{
+		version: "1.0",
+	},
+	V2: ProtocolVersion{
+		version: "2.0",
+	},
+}
diff --git a/eventmesh-sdk-go/common/protocol/http/common/request_code.go b/eventmesh-sdk-go/common/protocol/http/common/request_code.go
new file mode 100644
index 0000000..6ea3a66
--- /dev/null
+++ b/eventmesh-sdk-go/common/protocol/http/common/request_code.go
@@ -0,0 +1,80 @@
+package common
+
+type RequestCode struct {
+	RequestCode int    `json:"requestCode"`
+	Desc        string `json:"desc"`
+}
+
+var DefaultRequestCode = struct {
+	MSG_BATCH_SEND         RequestCode
+	MSG_BATCH_SEND_V2      RequestCode
+	MSG_SEND_SYNC          RequestCode
+	MSG_SEND_ASYNC         RequestCode
+	HTTP_PUSH_CLIENT_ASYNC RequestCode
+	HTTP_PUSH_CLIENT_SYNC  RequestCode
+	REGISTER               RequestCode
+	UNREGISTER             RequestCode
+	HEARTBEAT              RequestCode
+	SUBSCRIBE              RequestCode
+	UNSUBSCRIBE            RequestCode
+	REPLY_MESSAGE          RequestCode
+	ADMIN_METRICS          RequestCode
+	ADMIN_SHUTDOWN         RequestCode
+}{
+	MSG_BATCH_SEND: RequestCode{
+		RequestCode: 102,
+		Desc:        "SEND BATCH MSG",
+	},
+	MSG_BATCH_SEND_V2: RequestCode{
+		RequestCode: 107,
+		Desc:        "SEND BATCH MSG V2",
+	},
+	MSG_SEND_SYNC: RequestCode{
+		RequestCode: 101,
+		Desc:        "SEND SINGLE MSG SYNC",
+	},
+	MSG_SEND_ASYNC: RequestCode{
+		RequestCode: 104,
+		Desc:        "SEND SINGLE MSG ASYNC",
+	},
+	HTTP_PUSH_CLIENT_ASYNC: RequestCode{
+		RequestCode: 105,
+		Desc:        "PUSH CLIENT BY HTTP POST",
+	},
+	HTTP_PUSH_CLIENT_SYNC: RequestCode{
+		RequestCode: 106,
+		Desc:        "PUSH CLIENT BY HTTP POST",
+	},
+	REGISTER: RequestCode{
+		RequestCode: 201,
+		Desc:        "REGISTER",
+	},
+	UNREGISTER: RequestCode{
+		RequestCode: 202,
+		Desc:        "UNREGISTER",
+	},
+	HEARTBEAT: RequestCode{
+		RequestCode: 203,
+		Desc:        "HEARTBEAT",
+	},
+	SUBSCRIBE: RequestCode{
+		RequestCode: 206,
+		Desc:        "SUBSCRIBE",
+	},
+	UNSUBSCRIBE: RequestCode{
+		RequestCode: 207,
+		Desc:        "UNSUBSCRIBE",
+	},
+	REPLY_MESSAGE: RequestCode{
+		RequestCode: 301,
+		Desc:        "REPLY MESSAGE",
+	},
+	ADMIN_METRICS: RequestCode{
+		RequestCode: 603,
+		Desc:        "ADMIN METRICS",
+	},
+	ADMIN_SHUTDOWN: RequestCode{
+		RequestCode: 601,
+		Desc:        "ADMIN SHUTDOWN",
+	},
+}
diff --git a/eventmesh-sdk-go/common/protocol/http/message/send_message_request_body.go b/eventmesh-sdk-go/common/protocol/http/message/send_message_request_body.go
new file mode 100644
index 0000000..0fc0744
--- /dev/null
+++ b/eventmesh-sdk-go/common/protocol/http/message/send_message_request_body.go
@@ -0,0 +1,96 @@
+package message
+
+type SendMessageRequestBody struct {
+	topic         string
+	bizSeqNo      string
+	uniqueId      string
+	ttl           string
+	content       string
+	tag           string
+	extFields     map[string]string
+	producerGroup string
+}
+
+func (s *SendMessageRequestBody) Topic() string {
+	return s.topic
+}
+
+func (s *SendMessageRequestBody) SetTopic(topic string) {
+	s.topic = topic
+}
+
+func (s *SendMessageRequestBody) BizSeqNo() string {
+	return s.bizSeqNo
+}
+
+func (s *SendMessageRequestBody) SetBizSeqNo(bizSeqNo string) {
+	s.bizSeqNo = bizSeqNo
+}
+
+func (s *SendMessageRequestBody) UniqueId() string {
+	return s.uniqueId
+}
+
+func (s *SendMessageRequestBody) SetUniqueId(uniqueId string) {
+	s.uniqueId = uniqueId
+}
+
+func (s *SendMessageRequestBody) Ttl() string {
+	return s.ttl
+}
+
+func (s *SendMessageRequestBody) SetTtl(ttl string) {
+	s.ttl = ttl
+}
+
+func (s *SendMessageRequestBody) Content() string {
+	return s.content
+}
+
+func (s *SendMessageRequestBody) SetContent(content string) {
+	s.content = content
+}
+
+func (s *SendMessageRequestBody) Tag() string {
+	return s.tag
+}
+
+func (s *SendMessageRequestBody) SetTag(tag string) {
+	s.tag = tag
+}
+
+func (s *SendMessageRequestBody) ExtFields() map[string]string {
+	return s.extFields
+}
+
+func (s *SendMessageRequestBody) SetExtFields(extFields map[string]string) {
+	s.extFields = extFields
+}
+
+func (s *SendMessageRequestBody) ProducerGroup() string {
+	return s.producerGroup
+}
+
+func (s *SendMessageRequestBody) SetProducerGroup(producerGroup string) {
+	s.producerGroup = producerGroup
+}
+
+var SendMessageRequestBodyKey = struct {
+	TOPIC         string
+	BIZSEQNO      string
+	UNIQUEID      string
+	CONTENT       string
+	TTL           string
+	TAG           string
+	EXTFIELDS     string
+	PRODUCERGROUP string
+}{
+	TOPIC:         "topic",
+	BIZSEQNO:      "bizseqno",
+	UNIQUEID:      "uniqueid",
+	CONTENT:       "content",
+	TTL:           "ttl",
+	TAG:           "tag",
+	EXTFIELDS:     "extFields",
+	PRODUCERGROUP: "producergroup",
+}
diff --git a/eventmesh-sdk-go/common/protocol/message_type.go b/eventmesh-sdk-go/common/protocol/message_type.go
new file mode 100644
index 0000000..36f87e8
--- /dev/null
+++ b/eventmesh-sdk-go/common/protocol/message_type.go
@@ -0,0 +1,13 @@
+package protocol
+
+type MessageType string
+
+var DefaultMessageType = struct {
+	CloudEvent       MessageType
+	OpenMessage      MessageType
+	EventMeshMessage MessageType
+}{
+	CloudEvent:       "CloudEvent",
+	OpenMessage:      "OpenMessage",
+	EventMeshMessage: "EventMeshMessage",
+}
diff --git a/eventmesh-sdk-go/common/protocol/subscription_item.go b/eventmesh-sdk-go/common/protocol/subscription_item.go
new file mode 100644
index 0000000..3cfa3c0
--- /dev/null
+++ b/eventmesh-sdk-go/common/protocol/subscription_item.go
@@ -0,0 +1,7 @@
+package protocol
+
+type SubscriptionItem struct {
+	Topic string           `json:"topic"`
+	Mode  SubscriptionMode `json:"mode"`
+	Type  SubscriptionType `json:"type"`
+}
diff --git a/eventmesh-sdk-go/common/protocol/subscription_mode.go b/eventmesh-sdk-go/common/protocol/subscription_mode.go
new file mode 100644
index 0000000..1cf6961
--- /dev/null
+++ b/eventmesh-sdk-go/common/protocol/subscription_mode.go
@@ -0,0 +1,11 @@
+package protocol
+
+type SubscriptionMode string
+
+var DefaultSubscriptionMode = struct {
+	BROADCASTING SubscriptionMode
+	CLUSTERING   SubscriptionMode
+}{
+	BROADCASTING: "BROADCASTING",
+	CLUSTERING:   "CLUSTERING",
+}
diff --git a/eventmesh-sdk-go/common/protocol/subscription_type.go b/eventmesh-sdk-go/common/protocol/subscription_type.go
new file mode 100644
index 0000000..3d09844
--- /dev/null
+++ b/eventmesh-sdk-go/common/protocol/subscription_type.go
@@ -0,0 +1,11 @@
+package protocol
+
+type SubscriptionType string
+
+var DefaultSubscriptionType = struct {
+	SYNC  SubscriptionType
+	ASYNC SubscriptionType
+}{
+	SYNC:  "SYNC",
+	ASYNC: "ASYNC",
+}
diff --git a/eventmesh-sdk-go/common/protocol/tcp/codec/codec.go b/eventmesh-sdk-go/common/protocol/tcp/codec/codec.go
new file mode 100644
index 0000000..396ef7a
--- /dev/null
+++ b/eventmesh-sdk-go/common/protocol/tcp/codec/codec.go
@@ -0,0 +1,168 @@
+package codec
+
+import (
+	"bytes"
+	"encoding/binary"
+	gcommon "eventmesh/common"
+	"eventmesh/common/protocol/tcp"
+	gutils "eventmesh/common/utils"
+	"eventmesh/tcp/common"
+	"log"
+)
+
+const (
+	MAGIC       = "EventMesh"
+	VERSION     = "0000"
+	LENGTH_SIZE = 4
+)
+
+// EncodePackage serializes message as MAGIC, VERSION, total length, header length, header JSON and body;
+// both lengths are big-endian uint32 and the total covers the two length fields, the header and the body.
+func EncodePackage(message tcp.Package) *bytes.Buffer {
+	header := message.Header
+	headerData := header.Marshal()
+
+	var bodyData []byte
+	if header.GetProperty(gcommon.Constants.PROTOCOL_TYPE) != common.EventMeshCommon.CLOUD_EVENTS_PROTOCOL_NAME {
+		bodyData = gutils.MarshalJsonBytes(message.Body)
+	} else {
+		// A bare .([]byte) assertion panics when Body is nil or holds anything other than []byte.
+		switch raw := message.Body.(type) {
+		case nil:
+			// No body: encode zero body bytes.
+		case []byte:
+			bodyData = raw
+		case string:
+			bodyData = []byte(raw)
+		default:
+			bodyData = gutils.MarshalJsonBytes(raw)
+		}
+	}
+
+	prefix := make([]byte, 2*LENGTH_SIZE)
+	binary.BigEndian.PutUint32(prefix[:LENGTH_SIZE], uint32(2*LENGTH_SIZE+len(headerData)+len(bodyData)))
+	binary.BigEndian.PutUint32(prefix[LENGTH_SIZE:], uint32(len(headerData)))
+	var out bytes.Buffer
+	out.WriteString(MAGIC)
+	out.WriteString(VERSION)
+	out.Write(prefix)
+	out.Write(headerData)
+	out.Write(bodyData)
+	return &out
+}
+
+// DecodePackage reads one frame from in: magic, version, total length, header length, header JSON, body.
+func DecodePackage(in *bytes.Buffer) tcp.Package {
+	flagBytes := parseFlag(in)
+	versionBytes := parseVersion(in)
+	validateFlag(flagBytes, versionBytes)
+	length := parseLength(in)
+	headerLen := parseLength(in)
+	// Signed arithmetic: as uint32 a too-small length field wraps to ~4 GiB instead of going negative.
+	bodyLen := int64(length) - int64(headerLen) - 2*LENGTH_SIZE
+	header := parseHeader(in, int(headerLen))
+	return tcp.Package{Header: header, Body: parseBody(in, header, int(bodyLen))}
+}
+
+// parseFlag reads the len(MAGIC)-byte magic flag from in; it returns nil on a failed or short read.
+func parseFlag(in *bytes.Buffer) []byte {
+	flagBytes := make([]byte, len(MAGIC))
+	n, err := in.Read(flagBytes)
+	if err != nil || n < len(flagBytes) {
+		return nil
+	}
+	log.Printf("read %d bytes (flag) \n", n)
+	return flagBytes
+}
+
+// parseVersion reads the len(VERSION)-byte version field from in; it returns nil on a failed or short read.
+func parseVersion(in *bytes.Buffer) []byte {
+	verBytes := make([]byte, len(VERSION))
+	n, err := in.Read(verBytes)
+	if err != nil || n < len(verBytes) {
+		return nil
+	}
+	log.Printf("read %d bytes (version) \n", n)
+	return verBytes
+}
+
+// parseLength reads a LENGTH_SIZE-byte big-endian uint32; it exits via log.Fatal if fewer bytes remain.
+func parseLength(in *bytes.Buffer) uint32 {
+	lenBytes := in.Next(LENGTH_SIZE)
+	if len(lenBytes) < LENGTH_SIZE {
+		log.Fatal("Failed to parse length")
+	}
+	log.Printf("read %d bytes (length) \n", len(lenBytes))
+	return binary.BigEndian.Uint32(lenBytes)
+}
+
+// parseHeader takes headerLen bytes from in (no allocation of the wire-supplied size) and decodes them.
+func parseHeader(in *bytes.Buffer, headerLen int) tcp.Header {
+	headerBytes := in.Next(headerLen)
+	if len(headerBytes) < headerLen {
+		log.Fatal("Failed to parse header")
+	}
+	log.Printf("read %d bytes (header) \n", len(headerBytes))
+
+	var header tcp.Header
+	return header.Unmarshal(headerBytes)
+}
+
+// parseBody takes bodyLen bytes from in and deserializes them per header.Cmd; nil when bodyLen <= 0.
+func parseBody(in *bytes.Buffer, header tcp.Header, bodyLen int) interface{} {
+	if bodyLen <= 0 {
+		return nil
+	}
+
+	bodyBytes := in.Next(bodyLen)
+	if len(bodyBytes) < bodyLen {
+		log.Fatal("Failed to parse body")
+	}
+	log.Printf("read %d bytes (body) \n", len(bodyBytes))
+
+	// string(...) copies the bytes, so the result does not alias the buffer's storage.
+	return deserializeBody(string(bodyBytes), header)
+}
+
+// deserializeBody converts the raw body string into the value matching header.Cmd.
+//
+// Go switch cases do not fall through, so commands sharing a result are listed in one case clause.
+func deserializeBody(bodyStr string, header tcp.Header) interface{} {
+	switch command := header.Cmd; command {
+	case tcp.DefaultCommand.HELLO_REQUEST,
+		tcp.DefaultCommand.RECOMMEND_REQUEST:
+		var userAgent tcp.UserAgent
+		gutils.UnMarshalJsonString(bodyStr, &userAgent)
+		return userAgent
+	case tcp.DefaultCommand.SUBSCRIBE_REQUEST,
+		tcp.DefaultCommand.UNSUBSCRIBE_REQUEST:
+		//return OBJECT_MAPPER.readValue(bodyJsonString, Subscription.class);
+		return nil
+	case tcp.DefaultCommand.REQUEST_TO_SERVER,
+		tcp.DefaultCommand.RESPONSE_TO_SERVER,
+		tcp.DefaultCommand.ASYNC_MESSAGE_TO_SERVER,
+		tcp.DefaultCommand.BROADCAST_MESSAGE_TO_SERVER,
+		tcp.DefaultCommand.REQUEST_TO_CLIENT,
+		tcp.DefaultCommand.RESPONSE_TO_CLIENT,
+		tcp.DefaultCommand.ASYNC_MESSAGE_TO_CLIENT,
+		tcp.DefaultCommand.BROADCAST_MESSAGE_TO_CLIENT,
+		tcp.DefaultCommand.REQUEST_TO_CLIENT_ACK,
+		tcp.DefaultCommand.RESPONSE_TO_CLIENT_ACK,
+		tcp.DefaultCommand.ASYNC_MESSAGE_TO_CLIENT_ACK,
+		tcp.DefaultCommand.BROADCAST_MESSAGE_TO_CLIENT_ACK:
+		// The message string will be deserialized by protocol plugin, if the event is cloudevents, the body is
+		// just a string.
+		return bodyStr
+	case tcp.DefaultCommand.REDIRECT_TO_CLIENT:
+		//return OBJECT_MAPPER.readValue(bodyJsonString, RedirectInfo.class);
+		return nil
+	default:
+		// FIXME improve codes
+		log.Printf("Invalidate TCP command: %s\n", command)
+		return nil
+	}
+}
+
+func validateFlag(flagBytes, versionBytes []byte) {
+	// TODO add check
+}
diff --git a/eventmesh-sdk-go/common/protocol/tcp/command.go b/eventmesh-sdk-go/common/protocol/tcp/command.go
new file mode 100644
index 0000000..5d0c668
--- /dev/null
+++ b/eventmesh-sdk-go/common/protocol/tcp/command.go
@@ -0,0 +1,202 @@
+package tcp
+
+type Command string
+
+var DefaultCommand = struct {
+	//heartbeat
+	HEARTBEAT_REQUEST  Command //client send heartbeat packet to server
+	HEARTBEAT_RESPONSE Command //server response heartbeat packet of client
+
+	//handshake
+	HELLO_REQUEST  Command //client send handshake request to server
+	HELLO_RESPONSE Command //server response handshake request of client
+
+	//disconnection
+	CLIENT_GOODBYE_REQUEST  Command //Notify server when client actively disconnects
+	CLIENT_GOODBYE_RESPONSE Command //Server replies to client's active disconnection notification
+	SERVER_GOODBYE_REQUEST  Command //Notify client when server actively disconnects
+	SERVER_GOODBYE_RESPONSE Command //Client replies to server's active disconnection notification
+
+	//subscription management
+	SUBSCRIBE_REQUEST    Command //Subscription request sent by client to server
+	SUBSCRIBE_RESPONSE   Command //Server replies to client's subscription request
+	UNSUBSCRIBE_REQUEST  Command //Unsubscribe request from client to server
+	UNSUBSCRIBE_RESPONSE Command //Server replies to client's unsubscribe request
+
+	//monitor
+	LISTEN_REQUEST  Command //Request from client to server to start topic listening
+	LISTEN_RESPONSE Command //The server replies to the client's listening request
+
+	//RR
+	REQUEST_TO_SERVER      Command //The client sends the RR request to the server
+	REQUEST_TO_CLIENT      Command //The server pushes the RR request to the client
+	REQUEST_TO_CLIENT_ACK  Command //After receiving RR request, the client sends ACK to the server
+	RESPONSE_TO_SERVER     Command //The client sends the RR packet back to the server
+	RESPONSE_TO_CLIENT     Command //The server pushes the RR packet back to the client
+	RESPONSE_TO_CLIENT_ACK Command //After receiving the return packet, the client sends ACK to the server
+
+	//Asynchronous events
+	ASYNC_MESSAGE_TO_SERVER     Command //The client sends asynchronous events to the server
+	ASYNC_MESSAGE_TO_SERVER_ACK Command //After receiving the asynchronous event, the server sends ack to the client
+	ASYNC_MESSAGE_TO_CLIENT     Command //The server pushes asynchronous events to the client
+	ASYNC_MESSAGE_TO_CLIENT_ACK Command //After the client receives the asynchronous event, the ACK is sent to the server
+
+	//radio broadcast
+	BROADCAST_MESSAGE_TO_SERVER     Command //The client sends the broadcast message to the server
+	BROADCAST_MESSAGE_TO_SERVER_ACK Command //After receiving the broadcast message, the server sends ACK to the client
+	BROADCAST_MESSAGE_TO_CLIENT     Command //The server pushes the broadcast message to the client
+	BROADCAST_MESSAGE_TO_CLIENT_ACK Command //After the client receives the broadcast message, the ACK is sent to the server
+
+	//Log reporting
+	SYS_LOG_TO_LOGSERVER Command //Business log reporting
+
+	//RMB tracking log reporting
+	TRACE_LOG_TO_LOGSERVER Command //RMB tracking log reporting
+
+	//Redirecting instruction
+	REDIRECT_TO_CLIENT Command //The server pushes the redirection instruction to the client
+
+	//service register
+	REGISTER_REQUEST  Command //Client sends registration request to server
+	REGISTER_RESPONSE Command //The server sends the registration result to the client
+
+	//service unregister
+	UNREGISTER_REQUEST  Command //The client sends a de registration request to the server
+	UNREGISTER_RESPONSE Command //The server will register the result to the client
+
+	//The client asks which EventMesh to recommend
+	RECOMMEND_REQUEST  Command //Client sends recommendation request to server
+	RECOMMEND_RESPONSE Command //The server will recommend the results to the client
+}{
+	//heartbeat
+	HEARTBEAT_REQUEST:  "HEARTBEAT_REQUEST",
+	HEARTBEAT_RESPONSE: "HEARTBEAT_RESPONSE",
+
+	//handshake
+	HELLO_REQUEST:  "HELLO_REQUEST",
+	HELLO_RESPONSE: "HELLO_RESPONSE",
+
+	//disconnection
+	CLIENT_GOODBYE_REQUEST:  "CLIENT_GOODBYE_REQUEST",
+	CLIENT_GOODBYE_RESPONSE: "CLIENT_GOODBYE_RESPONSE",
+	SERVER_GOODBYE_REQUEST:  "SERVER_GOODBYE_REQUEST",
+	SERVER_GOODBYE_RESPONSE: "SERVER_GOODBYE_RESPONSE",
+
+	//subscription management
+	SUBSCRIBE_REQUEST:    "SUBSCRIBE_REQUEST",
+	SUBSCRIBE_RESPONSE:   "SUBSCRIBE_RESPONSE",
+	UNSUBSCRIBE_REQUEST:  "UNSUBSCRIBE_REQUEST",
+	UNSUBSCRIBE_RESPONSE: "UNSUBSCRIBE_RESPONSE",
+
+	//monitor
+	LISTEN_REQUEST:  "LISTEN_REQUEST",
+	LISTEN_RESPONSE: "LISTEN_RESPONSE",
+
+	//RR
+	REQUEST_TO_SERVER:      "REQUEST_TO_SERVER",
+	REQUEST_TO_CLIENT:      "REQUEST_TO_CLIENT",
+	REQUEST_TO_CLIENT_ACK:  "REQUEST_TO_CLIENT_ACK",
+	RESPONSE_TO_SERVER:     "RESPONSE_TO_SERVER",
+	RESPONSE_TO_CLIENT:     "RESPONSE_TO_CLIENT",
+	RESPONSE_TO_CLIENT_ACK: "RESPONSE_TO_CLIENT_ACK",
+
+	//Asynchronous events
+	ASYNC_MESSAGE_TO_SERVER:     "ASYNC_MESSAGE_TO_SERVER",
+	ASYNC_MESSAGE_TO_SERVER_ACK: "ASYNC_MESSAGE_TO_SERVER_ACK",
+	ASYNC_MESSAGE_TO_CLIENT:     "ASYNC_MESSAGE_TO_CLIENT",
+	ASYNC_MESSAGE_TO_CLIENT_ACK: "ASYNC_MESSAGE_TO_CLIENT_ACK",
+
+	//radio broadcast
+	BROADCAST_MESSAGE_TO_SERVER:     "BROADCAST_MESSAGE_TO_SERVER",
+	BROADCAST_MESSAGE_TO_SERVER_ACK: "BROADCAST_MESSAGE_TO_SERVER_ACK",
+	BROADCAST_MESSAGE_TO_CLIENT:     "BROADCAST_MESSAGE_TO_CLIENT",
+	BROADCAST_MESSAGE_TO_CLIENT_ACK: "BROADCAST_MESSAGE_TO_CLIENT_ACK",
+
+	//Log reporting
+	SYS_LOG_TO_LOGSERVER: "SYS_LOG_TO_LOGSERVER",
+
+	//RMB tracking log reporting
+	TRACE_LOG_TO_LOGSERVER: "TRACE_LOG_TO_LOGSERVER",
+
+	//Redirecting instruction
+	REDIRECT_TO_CLIENT: "REDIRECT_TO_CLIENT",
+
+	//service register
+	REGISTER_REQUEST:  "REGISTER_REQUEST",
+	REGISTER_RESPONSE: "REGISTER_RESPONSE",
+
+	//service unregister
+	UNREGISTER_REQUEST:  "UNREGISTER_REQUEST",
+	UNREGISTER_RESPONSE: "UNREGISTER_RESPONSE",
+
+	//The client asks which EventMesh to recommend
+	RECOMMEND_REQUEST:  "RECOMMEND_REQUEST",
+	RECOMMEND_RESPONSE: "RECOMMEND_RESPONSE",
+}
+
+//{
+//	//heartbeat
+//	HEARTBEAT_REQUEST:  0,
+//	HEARTBEAT_RESPONSE: 1,
+//
+//	//handshake
+//	HELLO_REQUEST:  2,
+//	HELLO_RESPONSE: 3,
+//
+//	//disconnection
+//	CLIENT_GOODBYE_REQUEST:  4,
+//	CLIENT_GOODBYE_RESPONSE: 5,
+//	SERVER_GOODBYE_REQUEST:  6,
+//	SERVER_GOODBYE_RESPONSE: 7,
+//
+//	//subscription management
+//	SUBSCRIBE_REQUEST:    8,
+//	SUBSCRIBE_RESPONSE:   9,
+//	UNSUBSCRIBE_REQUEST:  10,
+//	UNSUBSCRIBE_RESPONSE: 11,
+//
+//	//monitor
+//	LISTEN_REQUEST:  12,
+//	LISTEN_RESPONSE: 13,
+//
+//	//RR
+//	REQUEST_TO_SERVER:      14,
+//	REQUEST_TO_CLIENT:      15,
+//	REQUEST_TO_CLIENT_ACK:  16,
+//	RESPONSE_TO_SERVER:     17,
+//	RESPONSE_TO_CLIENT:     18,
+//	RESPONSE_TO_CLIENT_ACK: 19,
+//
+//	//Asynchronous events
+//	ASYNC_MESSAGE_TO_SERVER:     20,
+//	ASYNC_MESSAGE_TO_SERVER_ACK: 21,
+//	ASYNC_MESSAGE_TO_CLIENT:     22,
+//	ASYNC_MESSAGE_TO_CLIENT_ACK: 23,
+//
+//	//radio broadcast
+//	BROADCAST_MESSAGE_TO_SERVER:     24,
+//	BROADCAST_MESSAGE_TO_SERVER_ACK: 25,
+//	BROADCAST_MESSAGE_TO_CLIENT:     26,
+//	BROADCAST_MESSAGE_TO_CLIENT_ACK: 27,
+//
+//	//Log reporting
+//	SYS_LOG_TO_LOGSERVER: 28,
+//
+//	//RMB tracking log reporting
+//	TRACE_LOG_TO_LOGSERVER: 29,
+//
+//	//Redirecting instruction
+//	REDIRECT_TO_CLIENT: 30,
+//
+//	//service register
+//	REGISTER_REQUEST:  31,
+//	REGISTER_RESPONSE: 32,
+//
+//	//service unregister
+//	UNREGISTER_REQUEST:  33,
+//	UNREGISTER_RESPONSE: 34,
+//
+//	//The client asks which EventMesh to recommend
+//	RECOMMEND_REQUEST:  35,
+//	RECOMMEND_RESPONSE: 36,
+//}
diff --git a/eventmesh-sdk-go/common/protocol/tcp/header.go b/eventmesh-sdk-go/common/protocol/tcp/header.go
new file mode 100644
index 0000000..56dc817
--- /dev/null
+++ b/eventmesh-sdk-go/common/protocol/tcp/header.go
@@ -0,0 +1,80 @@
+package tcp
+
+import (
+	"eventmesh/common/utils"
+)
+
+type Header struct {
+	Cmd        Command                `json:"cmd"`
+	Code       int                    `json:"code"`
+	Desc       string                 `json:"desc"`
+	Seq        string                 `json:"seq"`
+	Properties map[string]interface{} `json:"properties"`
+}
+
+func (h Header) PutProperty(name string, value interface{}) {
+	h.Properties[name] = value
+}
+
+func (h Header) GetProperty(name string) interface{} {
+	if h.Properties == nil {
+		return nil
+	}
+
+	if val, ok := h.Properties[name]; ok {
+		return val
+	}
+
+	return nil
+}
+
+func (h Header) Marshal() []byte {
+	newHeader := make(map[string]interface{})
+	newHeader["cmd"] = h.Cmd
+	// Compatible with Java Enum serialization
+	newHeader["command"] = h.Cmd
+	newHeader["code"] = h.Code
+	newHeader["desc"] = h.Desc
+	newHeader["seq"] = h.Seq
+	newHeader["properties"] = h.Properties
+	return utils.MarshalJsonBytes(newHeader)
+}
+
+func (h Header) getVal(key string, headerDict map[string]interface{}) interface{} {
+	if val, ok := headerDict[key]; ok {
+		return val
+	}
+	return nil
+}
+
+// Unmarshal decodes the JSON object in header and returns a copy of h with every
+// recognised key ("cmd", "code", "desc", "seq", "properties") applied.
+//
+// Keys that are missing, null or of an unexpected JSON type are skipped, so a malformed
+// frame leaves the corresponding field at its previous value instead of panicking.
+func (h Header) Unmarshal(header []byte) Header {
+	var headerDict map[string]interface{}
+	utils.UnMarshalJsonBytes(header, &headerDict)
+
+	if cmd, ok := headerDict["cmd"].(string); ok {
+		h.Cmd = Command(cmd)
+	}
+	if code, ok := headerDict["code"].(float64); ok {
+		h.Code = int(code)
+	}
+	if desc, ok := headerDict["desc"].(string); ok {
+		h.Desc = desc
+	}
+	if seq, ok := headerDict["seq"].(string); ok {
+		h.Seq = seq
+	}
+	if props, ok := headerDict["properties"].(map[string]interface{}); ok {
+		h.Properties = props
+	}
+
+	return h
+}
+
+func NewHeader(cmd Command, code int, desc string, seq string) Header {
+	return Header{Cmd: cmd, Code: code, Desc: desc, Seq: seq, Properties: map[string]interface{}{}}
+}
diff --git a/eventmesh-sdk-go/common/protocol/tcp/package.go b/eventmesh-sdk-go/common/protocol/tcp/package.go
new file mode 100644
index 0000000..e4e05a7
--- /dev/null
+++ b/eventmesh-sdk-go/common/protocol/tcp/package.go
@@ -0,0 +1,10 @@
+package tcp
+
+// Package pairs a Header with a body payload. Both fields are exposed to
+// JSON under the lower-case names "header" and "body".
+type Package struct {
+	Header Header      `json:"header"`
+	Body   interface{} `json:"body"`
+}
+
+// NewPackage returns a Package carrying header and a nil Body.
+func NewPackage(header Header) Package {
+	var pkg Package
+	pkg.Header = header
+	return pkg
+}
diff --git a/eventmesh-sdk-go/common/protocol/tcp/user_agent.go b/eventmesh-sdk-go/common/protocol/tcp/user_agent.go
new file mode 100644
index 0000000..b2da802
--- /dev/null
+++ b/eventmesh-sdk-go/common/protocol/tcp/user_agent.go
@@ -0,0 +1,23 @@
+package tcp
+
+// UserAgent describes the identity a client presents. Its fields are
+// serialized to JSON under the lower-case names given in the struct tags.
+type UserAgent struct {
+	Env       string `json:"env"`
+	Subsystem string `json:"subsystem"`
+	Path      string `json:"path"`
+	Pid       int    `json:"pid"`
+	Host      string `json:"host"`
+	Port      int    `json:"port"`
+	Version   string `json:"version"`
+	Username  string `json:"username"`
+	Password  string `json:"password"`
+	Idc       string `json:"idc"`
+	Group     string `json:"group"`
+	Purpose   string `json:"purpose"`
+	Unack     int    `json:"unack"`
+}
+
+// NewUserAgent returns a UserAgent populated from the arguments. Purpose and
+// Unack are left at their zero values.
+//
+// UserAgent has a single Group field: producerGroup takes precedence, and
+// consumerGroup is used only when producerGroup is empty. (Previously the
+// consumerGroup argument was silently dropped.)
+func NewUserAgent(env string, subsystem string, path string, pid int, host string, port int, version string,
+	username string, password string, idc string, producerGroup string, consumerGroup string) *UserAgent {
+	group := producerGroup
+	if group == "" {
+		group = consumerGroup
+	}
+	return &UserAgent{Env: env, Subsystem: subsystem, Path: path, Pid: pid, Host: host, Port: port, Version: version,
+		Username: username, Password: password, Idc: idc, Group: group}
+}
diff --git a/eventmesh-sdk-go/common/utils/json_utils.go b/eventmesh-sdk-go/common/utils/json_utils.go
new file mode 100644
index 0000000..8d1a093
--- /dev/null
+++ b/eventmesh-sdk-go/common/utils/json_utils.go
@@ -0,0 +1,29 @@
+package utils
+
+import (
+	"encoding/json"
+	"log"
+)
+
+// MarshalJsonBytes encodes obj as JSON and returns the encoded bytes.
+//
+// It does not return an error: when encoding fails, the underlying error is
+// logged and the process is terminated through log.Fatalf.
+func MarshalJsonBytes(obj interface{}) []byte {
+	ret, err := json.Marshal(obj)
+	if err != nil {
+		// Include the cause; the bare message gave no hint of what went wrong.
+		log.Fatalf("Failed to marshal json, error: %v", err)
+	}
+	return ret
+}
+
+// MarshalJsonString encodes obj as JSON via MarshalJsonBytes and returns the
+// result as a string.
+func MarshalJsonString(obj interface{}) string {
+	encoded := MarshalJsonBytes(obj)
+	return string(encoded)
+}
+
+// UnMarshalJsonBytes decodes the JSON in data into obj, which must be a
+// pointer as required by json.Unmarshal.
+//
+// It does not return an error: when decoding fails, the underlying error is
+// logged and the process is terminated through log.Fatalf.
+func UnMarshalJsonBytes(data []byte, obj interface{}) {
+	if err := json.Unmarshal(data, obj); err != nil {
+		// Include the cause; the bare message gave no hint of what went wrong.
+		log.Fatalf("Failed to unmarshal json, error: %v", err)
+	}
+}
+
+// UnMarshalJsonString decodes the JSON text in data into obj by delegating to
+// UnMarshalJsonBytes.
+func UnMarshalJsonString(data string, obj interface{}) {
+	raw := []byte(data)
+	UnMarshalJsonBytes(raw, obj)
+}
diff --git a/eventmesh-sdk-go/examples/http/async_pub_cloudevents.go b/eventmesh-sdk-go/examples/http/async_pub_cloudevents.go
new file mode 100644
index 0000000..c146642
--- /dev/null
+++ b/eventmesh-sdk-go/examples/http/async_pub_cloudevents.go
@@ -0,0 +1,52 @@
+package http
+
+import (
+	"eventmesh/common"
+	"eventmesh/common/utils"
+	"eventmesh/http/conf"
+	"eventmesh/http/producer"
+	cloudevents "github.com/cloudevents/sdk-go/v2"
+	"github.com/google/uuid"
+	"log"
+	"os"
+	"strconv"
+)
+
+// AsyncPubCloudEvents builds a CloudEvent and publishes it to a local
+// EventMesh HTTP endpoint through an EventMeshHttpProducer.
+func AsyncPubCloudEvents() {
+	const (
+		eventMeshIPPort = "127.0.0.1" + ":" + "10105"
+		producerGroup   = "EventMeshTest-producerGroup"
+		topic           = "TEST-TOPIC-HTTP-ASYNC"
+		env             = "P"
+		idc             = "FT"
+		subSys          = "1234"
+	)
+	// FIXME Get ip dynamically
+	localIp := "127.0.0.1"
+
+	// The default config is a plain struct value, so this assignment copies
+	// it and the setters below leave the package-level default untouched.
+	cfg := conf.DefaultEventMeshHttpClientConfig
+	cfg.SetLiteEventMeshAddr(eventMeshIPPort)
+	cfg.SetProducerGroup(producerGroup)
+	cfg.SetEnv(env)
+	cfg.SetIdc(idc)
+	cfg.SetSys(subSys)
+	cfg.SetIp(localIp)
+	cfg.SetPid(strconv.Itoa(os.Getpid()))
+
+	// Make event to send
+	event := cloudevents.NewEvent()
+	event.SetID(uuid.New().String())
+	event.SetSubject(topic)
+	event.SetSource("example/uri")
+	event.SetType(common.Constants.CLOUD_EVENTS_PROTOCOL_NAME)
+	event.SetExtension(common.Constants.EVENTMESH_MESSAGE_CONST_TTL, strconv.Itoa(4*1000))
+	event.SetDataContentType(cloudevents.ApplicationCloudEventsJSON)
+	payload := utils.MarshalJsonBytes(map[string]string{"hello": "EventMesh"})
+	if err := event.SetData(cloudevents.ApplicationCloudEventsJSON, payload); err != nil {
+		log.Fatalf("Failed to set cloud event data, error: %v", err)
+	}
+
+	// Publish event
+	producer.NewEventMeshHttpProducer(cfg).Publish(event)
+}
diff --git a/eventmesh-sdk-go/examples/http/sub_cloudevents.go b/eventmesh-sdk-go/examples/http/sub_cloudevents.go
new file mode 100644
index 0000000..35fd3e1
--- /dev/null
+++ b/eventmesh-sdk-go/examples/http/sub_cloudevents.go
@@ -0,0 +1,99 @@
+package http
+
+import (
+	"eventmesh/common/protocol"
+	"eventmesh/common/utils"
+	"eventmesh/http/conf"
+	"eventmesh/http/consumer"
+	cloudevents "github.com/cloudevents/sdk-go/v2"
+	"log"
+	"net/http"
+	"os"
+	"strconv"
+	"strings"
+)
+
+// SubCloudEvents starts a local callback HTTP server, subscribes it to a
+// topic on a local EventMesh HTTP endpoint and then sends heartbeats.
+//
+// HeartBeat is called synchronously, so the final receive on exit is only
+// reached once HeartBeat returns.
+func SubCloudEvents() {
+	const (
+		eventMeshIPPort = "127.0.0.1" + ":" + "10105"
+		consumerGroup   = "EventMeshTest-consumerGroup"
+		topic           = "TEST-TOPIC-HTTP-ASYNC"
+		env             = "P"
+		idc             = "FT"
+		subSys          = "1234"
+	)
+	// FIXME Get ip dynamically
+	localIp := "127.0.0.1"
+	localPort := 8090
+
+	// URL that EventMesh calls back with delivered events.
+	subscribeUrl := "http://" + localIp + ":" + strconv.Itoa(localPort) + "/hello"
+	subscription := protocol.SubscriptionItem{
+		Topic: topic,
+		Mode:  protocol.DefaultSubscriptionMode.CLUSTERING,
+		Type:  protocol.DefaultSubscriptionType.ASYNC,
+	}
+	topicList := []protocol.SubscriptionItem{subscription}
+
+	// Callback handle
+	exit := make(chan bool)
+	go httpServer(localIp, localPort, exit)
+
+	// The default config is a plain struct value, so this assignment copies
+	// it and the setters below leave the package-level default untouched.
+	cfg := conf.DefaultEventMeshHttpClientConfig
+	cfg.SetLiteEventMeshAddr(eventMeshIPPort)
+	cfg.SetConsumerGroup(consumerGroup)
+	cfg.SetEnv(env)
+	cfg.SetIdc(idc)
+	cfg.SetSys(subSys)
+	cfg.SetIp(localIp)
+	cfg.SetPid(strconv.Itoa(os.Getpid()))
+
+	// Subscribe
+	httpConsumer := consumer.NewEventMeshHttpConsumer(cfg)
+	httpConsumer.Subscribe(topicList, subscribeUrl)
+	httpConsumer.HeartBeat(topicList, subscribeUrl)
+
+	// FIXME Add unsubscribe
+
+	// Wait for exit
+	<-exit
+}
+
+// httpServer registers hello for "/hello" on the default mux and serves on
+// ip:port. A serve error terminates the process through log.Fatalf;
+// otherwise true is sent on exit after ListenAndServe returns.
+func httpServer(ip string, port int, exit chan<- bool) {
+	http.HandleFunc("/hello", hello)
+
+	addr := ip + ":" + strconv.Itoa(port)
+	if err := http.ListenAndServe(addr, nil); err != nil {
+		log.Fatalf("Failed to launch a callback http server, error: %v", err)
+	}
+
+	exit <- true
+}
+
+// hello is the callback handler for events pushed by the EventMesh server.
+//
+// It answers 404 for any path other than "/hello", 501 for any method other
+// than POST, 415 for a POST whose Content-Type is not a URL-encoded form and
+// 400 when the form cannot be parsed. For a valid form it decodes the
+// "content" field into a CloudEvent and logs the event data.
+//
+// Note that utils.UnMarshalJsonString terminates the process when "content"
+// is not valid JSON for a CloudEvent.
+func hello(w http.ResponseWriter, r *http.Request) {
+	if r.URL.Path != "/hello" {
+		http.NotFound(w, r)
+		return
+	}
+
+	if r.Method != http.MethodPost {
+		w.WriteHeader(http.StatusNotImplemented)
+		w.Write([]byte(http.StatusText(http.StatusNotImplemented)))
+		return
+	}
+
+	// FIXME Now we only support post form
+	contentType := r.Header.Get("Content-Type")
+	if !strings.Contains(contentType, "application/x-www-form-urlencoded") {
+		w.WriteHeader(http.StatusUnsupportedMediaType)
+		return
+	}
+
+	if err := r.ParseForm(); err != nil {
+		// Reject the request instead of carrying on with an unparsed form,
+		// which would feed empty content to the fatal JSON decoder below.
+		log.Printf("Failed to parse post form parameter, error: %v", err)
+		w.WriteHeader(http.StatusBadRequest)
+		return
+	}
+
+	content := r.FormValue("content")
+	event := cloudevents.NewEvent()
+	utils.UnMarshalJsonString(content, &event)
+	log.Printf("Received data from eventmesh server: %v", string(event.Data()))
+}
diff --git a/eventmesh-sdk-go/examples/tcp/async_pub_cloudevents.go b/eventmesh-sdk-go/examples/tcp/async_pub_cloudevents.go
new file mode 100644
index 0000000..62fd2c3
--- /dev/null
+++ b/eventmesh-sdk-go/examples/tcp/async_pub_cloudevents.go
@@ -0,0 +1,45 @@
+package tcp
+
+import (
+	"eventmesh/common"
+	"eventmesh/common/protocol"
+	gtcp "eventmesh/common/protocol/tcp"
+	"eventmesh/common/utils"
+	"eventmesh/tcp"
+	"time"
+
+	//"eventmesh/tcp/common"
+	"eventmesh/tcp/conf"
+	cloudevents "github.com/cloudevents/sdk-go/v2"
+	"github.com/google/uuid"
+	"strconv"
+)
+
+// AsyncPubCloudEvents connects a TCP client to a local EventMesh server,
+// builds a CloudEvent and publishes it.
+func AsyncPubCloudEvents() {
+	eventMeshIp := "127.0.0.1"
+	eventMeshTcpPort := 10000
+	topic := "TEST-TOPIC-TCP-ASYNC"
+
+	// Init client
+	userAgent := gtcp.UserAgent{Env: "test", Subsystem: "5023", Path: "/data/app/umg_proxy", Pid: 32893,
+		Host: "127.0.0.1", Port: 8362, Version: "2.0.11", Username: "PU4283", Password: "PUPASS", Idc: "FT",
+		Group: "EventmeshTestGroup", Purpose: "pub"}
+	config := conf.NewEventMeshTCPClientConfig(eventMeshIp, eventMeshTcpPort, userAgent)
+	client := tcp.CreateEventMeshTCPClient(*config, protocol.DefaultMessageType.CloudEvent)
+	client.Init()
+
+	// Make event to send
+	event := cloudevents.NewEvent()
+	event.SetID(uuid.New().String())
+	event.SetSubject(topic)
+	event.SetSource("example/uri")
+	event.SetType(common.Constants.CLOUD_EVENTS_PROTOCOL_NAME)
+	event.SetExtension(common.Constants.EVENTMESH_MESSAGE_CONST_TTL, strconv.Itoa(4*1000))
+	event.SetDataContentType(cloudevents.ApplicationCloudEventsJSON)
+	data := map[string]string{"hello": "EventMesh"}
+	if err := event.SetData(cloudevents.ApplicationCloudEventsJSON, utils.MarshalJsonBytes(data)); err != nil {
+		// Do not publish an event without its payload. This file does not
+		// import "log", so the failure is reported with panic.
+		panic("Failed to set cloud event data, error: " + err.Error())
+	}
+
+	// Publish event
+	client.Publish(event, 10000)
+	// NOTE(review): presumably keeps the process alive long enough for the
+	// asynchronous publish to complete - confirm.
+	time.Sleep(10 * time.Second)
+}
diff --git a/eventmesh-sdk-go/go.mod b/eventmesh-sdk-go/go.mod
new file mode 100644
index 0000000..8a12b03
--- /dev/null
+++ b/eventmesh-sdk-go/go.mod
@@ -0,0 +1,8 @@
+module eventmesh
+
+go 1.16
+
+require (
+	github.com/cloudevents/sdk-go/v2 v2.6.0
+	github.com/google/uuid v1.3.0
+)
diff --git a/eventmesh-sdk-go/go.sum b/eventmesh-sdk-go/go.sum
new file mode 100644
index 0000000..f490f71
--- /dev/null
+++ b/eventmesh-sdk-go/go.sum
@@ -0,0 +1,39 @@
+github.com/cloudevents/sdk-go/v2 v2.6.0 h1:yp6zLEvhXSi6P25zzfgORgFI0quG2/NXoH9QoHzvKn8=
+github.com/cloudevents/sdk-go/v2 v2.6.0/go.mod h1:nlXhgFkf0uTopxmRXalyMwS2LG70cRGPrxzmjJgSG0U=
+github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E=
+github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
+github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
+github.com/google/go-cmp v0.5.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
+github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
+github.com/google/uuid v1.1.1 h1:Gkbcsh/GbpXz7lPftLA3P6TYMwjCLYm83jiFQZF/3gY=
+github.com/google/uuid v1.1.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
+github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I=
+github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
+github.com/json-iterator/go v1.1.10 h1:Kz6Cvnvv2wGdaG/V8yMvfkmNiXq9Ya2KUv4rouJJr68=
+github.com/json-iterator/go v1.1.10/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4=
+github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
+github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
+github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
+github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421 h1:ZqeYNhU3OHLH3mGKHDcjJRFFRrJa6eAM5H+CtDdOsPc=
+github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
+github.com/modern-go/reflect2 v0.0.0-20180701023420-4b7aa43c6742 h1:Esafd1046DLDQ0W1YjYsBW+p8U2u7vzgW2SQVmlNazg=
+github.com/modern-go/reflect2 v0.0.0-20180701023420-4b7aa43c6742/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0=
+github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno=
+github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
+github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
+github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
+github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
+github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA=
+github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyCJ6HpOuEn7z0Csc=
+go.uber.org/atomic v1.4.0 h1:cxzIVoETapQEqDhQu3QfnvXAV4AlzcvUCxkVUFw3+EU=
+go.uber.org/atomic v1.4.0/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE=
+go.uber.org/multierr v1.1.0 h1:HoEmRHQPVSqub6w2z2d2EOVs2fjyFRGyofhKuyDq0QI=
+go.uber.org/multierr v1.1.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/0=
+go.uber.org/zap v1.10.0 h1:ORx85nbTijNz8ljznvCMR1ZBIPKFn3jQrag10X2AsuM=
+go.uber.org/zap v1.10.0/go.mod h1:vwi/ZaCAaUcBkycHslxD9B2zi4UTXhF60s6SWpuDF0Q=
+golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
+golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
+gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
+gopkg.in/check.v1 v1.0.0-20200227125254-8fa46927fb4f/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
+gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
+gopkg.in/yaml.v2 v2.3.0/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
diff --git a/eventmesh-sdk-go/http/abstract_http_client.go b/eventmesh-sdk-go/http/abstract_http_client.go
new file mode 100644
index 0000000..d31b09d
--- /dev/null
+++ b/eventmesh-sdk-go/http/abstract_http_client.go
@@ -0,0 +1,43 @@
+package http
+
+import (
+	gcommon "eventmesh/common"
+	"eventmesh/http/conf"
+	nethttp "net/http"
+	"time"
+)
+
+// AbstractHttpClient bundles the EventMesh HTTP client configuration with the
+// net/http client used to issue requests.
+type AbstractHttpClient struct {
+	// EventMeshHttpClientConfig is the configuration passed to NewAbstractHttpClient.
+	EventMeshHttpClientConfig conf.EventMeshHttpClientConfig
+	// HttpClient is the client produced by SetHttpClient at construction time.
+	HttpClient *nethttp.Client
+}
+
+// NewAbstractHttpClient returns an AbstractHttpClient holding a copy of the
+// given configuration and an HTTP client obtained from SetHttpClient.
+func NewAbstractHttpClient(eventMeshHttpClientConfig conf.EventMeshHttpClientConfig) *AbstractHttpClient {
+	client := new(AbstractHttpClient)
+	client.EventMeshHttpClientConfig = eventMeshHttpClientConfig
+	client.HttpClient = client.SetHttpClient()
+	return client
+}
+
+// Close is a no-op.
+func (c *AbstractHttpClient) Close() {
+	//	Http Client does not need to close explicitly
+}
+
+// SetHttpClient builds and returns a new HTTP client with a 100-second
+// timeout. Despite its name it does not assign c.HttpClient; the caller does.
+func (c *AbstractHttpClient) SetHttpClient() *nethttp.Client {
+	const requestTimeout = 100 * time.Second
+
+	// The TLS and non-TLS cases currently build the same client, so no
+	// branch on UseTls is needed until TLS-specific settings are added.
+	return &nethttp.Client{Timeout: requestTimeout}
+}
+
+// SelectEventMesh returns the configured EventMesh address prefixed with the
+// HTTPS protocol prefix when TLS is enabled and the HTTP prefix otherwise.
+func (c *AbstractHttpClient) SelectEventMesh() string {
+	prefix := gcommon.Constants.HTTP_PROTOCOL_PREFIX
+	if c.EventMeshHttpClientConfig.UseTls() {
+		prefix = gcommon.Constants.HTTPS_PROTOCOL_PREFIX
+	}
+
+	// FIXME Add load balance support
+	return prefix + c.EventMeshHttpClientConfig.LiteEventMeshAddr()
+}
diff --git a/eventmesh-sdk-go/http/conf/eventmesh_http_client_config.go b/eventmesh-sdk-go/http/conf/eventmesh_http_client_config.go
new file mode 100644
index 0000000..27ba603
--- /dev/null
+++ b/eventmesh-sdk-go/http/conf/eventmesh_http_client_config.go
@@ -0,0 +1,144 @@
+package conf
+
+// EventMeshHttpClientConfig holds the settings of an EventMesh HTTP client.
+// All fields are unexported and reached through the getter/setter pairs below.
+type EventMeshHttpClientConfig struct {
+	// liteEventMeshAddr is the event server address list.
+	//	For a cluster, separate the addresses with ';'. The address format
+	//	depends on the load balance type, e.g.
+	//	Random strategy: 127.0.0.1:10105;127.0.0.2:10105
+	//	Weighted round robin or weighted random strategy: 127.0.0.1:10105:1;127.0.0.2:10105:2
+	liteEventMeshAddr string
+	// TODO support load balance
+	//loadBalanceType string
+	consumeThreadCore int
+	consumeThreadMax  int
+	env               string
+	consumerGroup     string
+	producerGroup     string
+	idc               string
+	ip                string
+	pid               string
+	sys               string
+	userName          string
+	password          string
+	useTls            bool
+}
+
+// LiteEventMeshAddr returns the event server address list.
+func (c *EventMeshHttpClientConfig) LiteEventMeshAddr() string {
+	return c.liteEventMeshAddr
+}
+
+// SetLiteEventMeshAddr replaces the event server address list.
+func (c *EventMeshHttpClientConfig) SetLiteEventMeshAddr(liteEventMeshAddr string) {
+	c.liteEventMeshAddr = liteEventMeshAddr
+}
+
+// ConsumeThreadCore returns the consumeThreadCore setting.
+func (c *EventMeshHttpClientConfig) ConsumeThreadCore() int {
+	return c.consumeThreadCore
+}
+
+// SetConsumeThreadCore replaces the consumeThreadCore setting.
+func (c *EventMeshHttpClientConfig) SetConsumeThreadCore(consumeThreadCore int) {
+	c.consumeThreadCore = consumeThreadCore
+}
+
+// ConsumeThreadMax returns the consumeThreadMax setting.
+func (c *EventMeshHttpClientConfig) ConsumeThreadMax() int {
+	return c.consumeThreadMax
+}
+
+// SetConsumeThreadMax replaces the consumeThreadMax setting.
+func (c *EventMeshHttpClientConfig) SetConsumeThreadMax(consumeThreadMax int) {
+	c.consumeThreadMax = consumeThreadMax
+}
+
+// Env returns the configured environment name.
+func (c *EventMeshHttpClientConfig) Env() string {
+	return c.env
+}
+
+// SetEnv replaces the environment name.
+func (c *EventMeshHttpClientConfig) SetEnv(env string) {
+	c.env = env
+}
+
+// ConsumerGroup returns the consumer group name.
+func (c *EventMeshHttpClientConfig) ConsumerGroup() string {
+	return c.consumerGroup
+}
+
+// SetConsumerGroup replaces the consumer group name.
+func (c *EventMeshHttpClientConfig) SetConsumerGroup(consumerGroup string) {
+	c.consumerGroup = consumerGroup
+}
+
+// ProducerGroup returns the producer group name.
+func (c *EventMeshHttpClientConfig) ProducerGroup() string {
+	return c.producerGroup
+}
+
+// SetProducerGroup replaces the producer group name.
+func (c *EventMeshHttpClientConfig) SetProducerGroup(producerGroup string) {
+	c.producerGroup = producerGroup
+}
+
+// Idc returns the configured idc value.
+func (c *EventMeshHttpClientConfig) Idc() string {
+	return c.idc
+}
+
+// SetIdc replaces the idc value.
+func (c *EventMeshHttpClientConfig) SetIdc(idc string) {
+	c.idc = idc
+}
+
+// Ip returns the configured client IP.
+func (c *EventMeshHttpClientConfig) Ip() string {
+	return c.ip
+}
+
+// SetIp replaces the client IP.
+func (c *EventMeshHttpClientConfig) SetIp(ip string) {
+	c.ip = ip
+}
+
+// Pid returns the configured process id, kept as a string.
+func (c *EventMeshHttpClientConfig) Pid() string {
+	return c.pid
+}
+
+// SetPid replaces the process id string.
+func (c *EventMeshHttpClientConfig) SetPid(pid string) {
+	c.pid = pid
+}
+
+// Sys returns the configured sys value.
+func (c *EventMeshHttpClientConfig) Sys() string {
+	return c.sys
+}
+
+// SetSys replaces the sys value.
+func (c *EventMeshHttpClientConfig) SetSys(sys string) {
+	c.sys = sys
+}
+
+// UserName returns the configured user name.
+func (c *EventMeshHttpClientConfig) UserName() string {
+	return c.userName
+}
+
+// SetUserName replaces the user name.
+func (c *EventMeshHttpClientConfig) SetUserName(userName string) {
+	c.userName = userName
+}
+
+// Password returns the configured password.
+func (c *EventMeshHttpClientConfig) Password() string {
+	return c.password
+}
+
+// SetPassword replaces the password.
+func (c *EventMeshHttpClientConfig) SetPassword(password string) {
+	c.password = password
+}
+
+// UseTls reports whether TLS is enabled.
+func (c *EventMeshHttpClientConfig) UseTls() bool {
+	return c.useTls
+}
+
+// SetUseTls enables or disables TLS.
+func (c *EventMeshHttpClientConfig) SetUseTls(useTls bool) {
+	c.useTls = useTls
+}
+
+// DefaultEventMeshHttpClientConfig is the baseline client configuration: a
+// local EventMesh address, thread settings of 2 and 5, and default consumer
+// and producer group names. Every field not listed keeps its zero value
+// (empty strings, TLS disabled).
+var DefaultEventMeshHttpClientConfig = EventMeshHttpClientConfig{
+	liteEventMeshAddr: "127.0.0.1:10105",
+	consumeThreadCore: 2,
+	consumeThreadMax:  5,
+	consumerGroup:     "DefaultConsumerGroup",
+	producerGroup:     "DefaultProducerGroup",
+}
diff --git a/eventmesh-sdk-go/http/consumer/eventmesh_http_consumer.go b/eventmesh-sdk-go/http/consumer/eventmesh_http_consumer.go
new file mode 100644
index 0000000..62ba2a6
--- /dev/null
+++ b/eventmesh-sdk-go/http/consumer/eventmesh_http_consumer.go
@@ -0,0 +1,95 @@
+package consumer
+
+import (
+	gcommon "eventmesh/common"
+	"eventmesh/common/protocol"
+	"eventmesh/common/protocol/http/body/client"
+	"eventmesh/common/protocol/http/common"
+	gutils "eventmesh/common/utils"
+	"eventmesh/http"
+	"eventmesh/http/conf"
+	"eventmesh/http/model"
+	"eventmesh/http/utils"
+	"log"
+	nethttp "net/http"
+	"strconv"
+	"time"
+)
+
+// EventMeshHttpConsumer subscribes to topics and sends heartbeats over HTTP.
+// It embeds AbstractHttpClient for the configuration and the HTTP client.
+type EventMeshHttpConsumer struct {
+	*http.AbstractHttpClient
+	// subscriptions accumulates the items passed to successful Subscribe calls.
+	subscriptions []protocol.SubscriptionItem
+}
+
+// NewEventMeshHttpConsumer returns a consumer backed by a new
+// AbstractHttpClient built from the given configuration, with an empty
+// subscription list.
+func NewEventMeshHttpConsumer(eventMeshHttpClientConfig conf.EventMeshHttpClientConfig) *EventMeshHttpConsumer {
+	c := &EventMeshHttpConsumer{AbstractHttpClient: http.NewAbstractHttpClient(eventMeshHttpClientConfig)}
+	// Length 0 with capacity 1000: make(..., 1000) would create 1000
+	// zero-valued subscriptions ahead of the ones appended by Subscribe.
+	c.subscriptions = make([]protocol.SubscriptionItem, 0, 1000)
+	return c
+}
+
+// HeartBeat sends a heartbeat request for every topic in topicList once per
+// heartbeat interval (gcommon.Constants.HEARTBEAT milliseconds). The first
+// request goes out after the first interval has elapsed.
+//
+// The loop has no exit, so the call blocks. A response whose return code is
+// not SUCCESS terminates the process through log.Fatalf.
+func (e *EventMeshHttpConsumer) HeartBeat(topicList []protocol.SubscriptionItem, subscribeUrl string) {
+
+	// FIXME check topicList, subscribeUrl is not blank
+
+	interval := time.Duration(gcommon.Constants.HEARTBEAT) * time.Millisecond
+	ticks := time.Tick(interval)
+	for {
+		<-ticks
+
+		// Rebuilt on every tick so each request reflects the current
+		// contents of topicList.
+		var entities []client.HeartbeatEntity
+		for i := range topicList {
+			entities = append(entities, client.HeartbeatEntity{Topic: topicList[i].Topic, Url: subscribeUrl})
+		}
+
+		param := e.buildCommonRequestParam()
+		param.AddHeader(common.ProtocolKey.REQUEST_CODE, strconv.Itoa(common.DefaultRequestCode.HEARTBEAT.RequestCode))
+		// FIXME Java is name of SUB name
+		//param.AddBody(client.HeartbeatRequestBodyKey.CLIENTTYPE, common.DefaultClientType.SUB.name())
+		param.AddBody(client.HeartbeatRequestBodyKey.CLIENTTYPE, "SUB")
+		param.AddBody(client.HeartbeatRequestBodyKey.HEARTBEATENTITIES, gutils.MarshalJsonString(entities))
+
+		endpoint := e.SelectEventMesh()
+		response := utils.HttpPost(e.HttpClient, endpoint, param)
+
+		var result http.EventMeshRetObj
+		gutils.UnMarshalJsonString(response, &result)
+		if result.RetCode == common.DefaultEventMeshRetCode.SUCCESS.RetCode {
+			continue
+		}
+		log.Fatalf("Request failed, error code: %d", result.RetCode)
+	}
+
+}
+
+// Subscribe registers topicList with the EventMesh server, naming
+// subscribeUrl as the callback URL, and records the items in e.subscriptions
+// on success. A response whose return code is not SUCCESS terminates the
+// process through log.Fatalf.
+func (e *EventMeshHttpConsumer) Subscribe(topicList []protocol.SubscriptionItem, subscribeUrl string) {
+
+	// FIXME check topicList, subscribeUrl is not blank
+
+	param := e.buildCommonRequestParam()
+	param.AddHeader(common.ProtocolKey.REQUEST_CODE, strconv.Itoa(common.DefaultRequestCode.SUBSCRIBE.RequestCode))
+	param.AddBody(client.SubscribeRequestBodyKey.TOPIC, gutils.MarshalJsonString(topicList))
+	param.AddBody(client.SubscribeRequestBodyKey.URL, subscribeUrl)
+	param.AddBody(client.SubscribeRequestBodyKey.CONSUMERGROUP, e.EventMeshHttpClientConfig.ConsumerGroup())
+
+	endpoint := e.SelectEventMesh()
+	response := utils.HttpPost(e.HttpClient, endpoint, param)
+
+	var result http.EventMeshRetObj
+	gutils.UnMarshalJsonString(response, &result)
+	if code := result.RetCode; code != common.DefaultEventMeshRetCode.SUCCESS.RetCode {
+		log.Fatalf("Request failed, error code: %d", code)
+	}
+
+	e.subscriptions = append(e.subscriptions, topicList...)
+}
+
+// buildCommonRequestParam returns a POST RequestParam pre-filled with the
+// client identity headers, protocol version, language, default timeout and
+// the consumer group body entry shared by the consumer's requests.
+func (e *EventMeshHttpConsumer) buildCommonRequestParam() *model.RequestParam {
+	cfg := &e.EventMeshHttpClientConfig
+	instanceKey := common.ProtocolKey.ClientInstanceKey
+
+	request := model.NewRequestParam(nethttp.MethodPost)
+	request.AddHeader(instanceKey.ENV, cfg.Env())
+	request.AddHeader(instanceKey.IDC, cfg.Idc())
+	request.AddHeader(instanceKey.IP, cfg.Ip())
+	request.AddHeader(instanceKey.PID, cfg.Pid())
+	request.AddHeader(instanceKey.SYS, cfg.Sys())
+	request.AddHeader(instanceKey.USERNAME, cfg.UserName())
+	request.AddHeader(instanceKey.PASSWORD, cfg.Password())
+	request.AddHeader(common.ProtocolKey.VERSION, common.DefaultProtocolVersion.V1.Version())
+	request.AddHeader(common.ProtocolKey.LANGUAGE, gcommon.Constants.LANGUAGE_GO)
+	request.SetTimeout(gcommon.Constants.DEFAULT_HTTP_TIME_OUT)
+	request.AddBody(client.HeartbeatRequestBodyKey.CONSUMERGROUP, cfg.ConsumerGroup())
+	return request
+}
diff --git a/eventmesh-sdk-go/http/eventmesh_ret_obj.go b/eventmesh-sdk-go/http/eventmesh_ret_obj.go
new file mode 100644
index 0000000..9cab9fe
--- /dev/null
+++ b/eventmesh-sdk-go/http/eventmesh_ret_obj.go
@@ -0,0 +1,7 @@
+package http
+
+// EventMeshRetObj is the JSON shape into which EventMesh HTTP responses are
+// decoded, using the keys "resTime", "retCode" and "retMsg".
+type EventMeshRetObj struct {
+	ResTime int64  `json:"resTime"`
+	RetCode int    `json:"retCode"`
+	RetMsg  string `json:"retMsg"`
+}
diff --git a/eventmesh-sdk-go/http/model/request_param.go b/eventmesh-sdk-go/http/model/request_param.go
new file mode 100644
index 0000000..75b9d8a
--- /dev/null
+++ b/eventmesh-sdk-go/http/model/request_param.go
@@ -0,0 +1,53 @@
+package model
+
+// HttpMethod is the HTTP method name of a request, e.g. "POST".
+type HttpMethod string
+
+// RequestParam collects the pieces of an outgoing HTTP request: query
+// parameters, method, form body entries, headers and a timeout.
+type RequestParam struct {
+	queryParams map[string][]string
+	httpMethod  HttpMethod
+	body        map[string]string
+	headers     map[string]string
+	timeout     int64
+}
+
+// NewRequestParam returns a RequestParam for the given method. The body and
+// header maps are allocated lazily by AddBody and AddHeader.
+func NewRequestParam(httpMethod HttpMethod) *RequestParam {
+	return &RequestParam{httpMethod: httpMethod}
+}
+
+// QueryParams returns the query parameters, or nil if none were set.
+func (r *RequestParam) QueryParams() map[string][]string {
+	return r.queryParams
+}
+
+// SetQueryParams replaces the query parameters.
+func (r *RequestParam) SetQueryParams(queryParams map[string][]string) {
+	r.queryParams = queryParams
+}
+
+// Body returns the body entries, or nil if AddBody was never called.
+func (r *RequestParam) Body() map[string]string {
+	return r.body
+}
+
+// AddBody sets the body entry key to value, replacing any previous value.
+func (r *RequestParam) AddBody(key, value string) {
+	if r.body == nil {
+		r.body = make(map[string]string)
+	}
+	r.body[key] = value
+}
+
+// Headers returns the headers, or nil if AddHeader was never called.
+func (r *RequestParam) Headers() map[string]string {
+	return r.headers
+}
+
+// AddHeader sets the header key to the string form of object, replacing any
+// previous value. object may be a string or any value with a String() string
+// method; any other type (including nil) panics, as it did before.
+func (r *RequestParam) AddHeader(key string, object interface{}) {
+	if r.headers == nil {
+		r.headers = make(map[string]string)
+	}
+	switch v := object.(type) {
+	case string:
+		r.headers[key] = v
+	case interface{ String() string }:
+		r.headers[key] = v.String()
+	default:
+		// Keep the original contract for unsupported types: fail loudly
+		// through the type assertion.
+		r.headers[key] = object.(string)
+	}
+}
+
+// Timeout returns the timeout set by SetTimeout (zero by default).
+func (r *RequestParam) Timeout() int64 {
+	return r.timeout
+}
+
+// SetTimeout replaces the timeout.
+func (r *RequestParam) SetTimeout(timeout int64) {
+	r.timeout = timeout
+}
diff --git a/eventmesh-sdk-go/http/producer/cloudevent_producer.go b/eventmesh-sdk-go/http/producer/cloudevent_producer.go
new file mode 100644
index 0000000..17a178c
--- /dev/null
+++ b/eventmesh-sdk-go/http/producer/cloudevent_producer.go
@@ -0,0 +1,85 @@
+package producer
+
+import (
+	gcommon "eventmesh/common"
+	"eventmesh/common/protocol/http/common"
+	"eventmesh/common/protocol/http/message"
+	gutils "eventmesh/common/utils"
+	"eventmesh/http"
+	"eventmesh/http/conf"
+	"eventmesh/http/model"
+	"eventmesh/http/utils"
+	cloudevents "github.com/cloudevents/sdk-go/v2"
+	"log"
+	nethttp "net/http"
+	"strconv"
+)
+
+// CloudEventProducer publishes CloudEvents over HTTP. It embeds
+// AbstractHttpClient for the configuration and the HTTP client.
+type CloudEventProducer struct {
+	*http.AbstractHttpClient
+}
+
+// NewCloudEventProducer returns a CloudEventProducer backed by a new
+// AbstractHttpClient built from the given configuration.
+func NewCloudEventProducer(eventMeshHttpClientConfig conf.EventMeshHttpClientConfig) *CloudEventProducer {
+	base := http.NewAbstractHttpClient(eventMeshHttpClientConfig)
+	return &CloudEventProducer{AbstractHttpClient: base}
+}
+
+// Publish enriches event with the client's extensions and posts it to the
+// selected EventMesh endpoint as an asynchronous send request. A response
+// whose return code is not SUCCESS terminates the process through log.Fatalf.
+func (c *CloudEventProducer) Publish(event cloudevents.Event) {
+	param := c.buildCommonPostParam(c.enhanceCloudEvent(event))
+	param.AddHeader(common.ProtocolKey.REQUEST_CODE, strconv.Itoa(common.DefaultRequestCode.MSG_SEND_ASYNC.RequestCode))
+
+	endpoint := c.SelectEventMesh()
+	response := utils.HttpPost(c.HttpClient, endpoint, param)
+
+	var result http.EventMeshRetObj
+	gutils.UnMarshalJsonString(response, &result)
+	if result.RetCode == common.DefaultEventMeshRetCode.SUCCESS.RetCode {
+		return
+	}
+	log.Fatalf("Request failed, error code: %d", result.RetCode)
+}
+
+// buildCommonPostParam serializes event to JSON and returns a POST
+// RequestParam carrying the client identity headers, the protocol headers,
+// the producer group and the serialized event as the "content" body entry.
+// A serialization failure terminates the process through log.Fatalf.
+func (c *CloudEventProducer) buildCommonPostParam(event cloudevents.Event) *model.RequestParam {
+
+	eventBytes, err := event.MarshalJSON()
+	if err != nil {
+		// Report the cause; the bare message gave no hint of what went wrong.
+		log.Fatalf("Failed to marshal cloudevent, error: %v", err)
+	}
+	content := string(eventBytes)
+
+	requestParam := model.NewRequestParam(nethttp.MethodPost)
+	requestParam.AddHeader(common.ProtocolKey.ClientInstanceKey.ENV, c.EventMeshHttpClientConfig.Env())
+	requestParam.AddHeader(common.ProtocolKey.ClientInstanceKey.IDC, c.EventMeshHttpClientConfig.Idc())
+	requestParam.AddHeader(common.ProtocolKey.ClientInstanceKey.IP, c.EventMeshHttpClientConfig.Ip())
+	requestParam.AddHeader(common.ProtocolKey.ClientInstanceKey.PID, c.EventMeshHttpClientConfig.Pid())
+	requestParam.AddHeader(common.ProtocolKey.ClientInstanceKey.SYS, c.EventMeshHttpClientConfig.Sys())
+	requestParam.AddHeader(common.ProtocolKey.ClientInstanceKey.USERNAME, c.EventMeshHttpClientConfig.UserName())
+	requestParam.AddHeader(common.ProtocolKey.ClientInstanceKey.PASSWORD, c.EventMeshHttpClientConfig.Password())
+	requestParam.AddHeader(common.ProtocolKey.LANGUAGE, gcommon.Constants.LANGUAGE_GO)
+	// FIXME Improve constants
+	requestParam.AddHeader(common.ProtocolKey.PROTOCOL_TYPE, "cloudevents")
+	requestParam.AddHeader(common.ProtocolKey.PROTOCOL_DESC, "http")
+	requestParam.AddHeader(common.ProtocolKey.PROTOCOL_VERSION, event.SpecVersion())
+
+	// TODO: move producerGroup to header
+	requestParam.AddBody(message.SendMessageRequestBodyKey.PRODUCERGROUP, c.EventMeshHttpClientConfig.ProducerGroup())
+	requestParam.AddBody(message.SendMessageRequestBodyKey.CONTENT, content)
+
+	return requestParam
+}
+
+// enhanceCloudEvent sets the client identity, sequence/unique id, language
+// and protocol version extensions on event and returns it.
+func (c *CloudEventProducer) enhanceCloudEvent(event cloudevents.Event) cloudevents.Event {
+	cfg := &c.EventMeshHttpClientConfig
+	instanceKey := common.ProtocolKey.ClientInstanceKey
+
+	// Applied in order, mirroring one SetExtension call per entry.
+	extensions := []struct {
+		name  string
+		value interface{}
+	}{
+		{instanceKey.ENV, cfg.Env()},
+		{instanceKey.IDC, cfg.Idc()},
+		{instanceKey.IP, cfg.Ip()},
+		{instanceKey.PID, cfg.Pid()},
+		{instanceKey.SYS, cfg.Sys()},
+		// FIXME Random string
+		{instanceKey.BIZSEQNO, "333333"},
+		{instanceKey.UNIQUEID, "444444"},
+		{common.ProtocolKey.LANGUAGE, gcommon.Constants.LANGUAGE_GO},
+		// FIXME Java is name of spec version name
+		//{common.ProtocolKey.PROTOCOL_DESC, event.SpecVersion()},
+		{common.ProtocolKey.PROTOCOL_VERSION, event.SpecVersion()},
+	}
+	for _, ext := range extensions {
+		event.SetExtension(ext.name, ext.value)
+	}
+
+	return event
+}
diff --git a/eventmesh-sdk-go/http/producer/eventmesh_http_producer.go b/eventmesh-sdk-go/http/producer/eventmesh_http_producer.go
new file mode 100644
index 0000000..dbab724
--- /dev/null
+++ b/eventmesh-sdk-go/http/producer/eventmesh_http_producer.go
@@ -0,0 +1,27 @@
+package producer
+
+import (
+	"eventmesh/http/conf"
+	cloudevents "github.com/cloudevents/sdk-go/v2"
+)
+
+// EventMeshHttpProducer is the HTTP producer entry point; it forwards
+// CloudEvent messages to its CloudEventProducer.
+type EventMeshHttpProducer struct {
+	cloudEventProducer *CloudEventProducer
+}
+
+// NewEventMeshHttpProducer returns a producer whose CloudEventProducer is
+// built from the given configuration.
+func NewEventMeshHttpProducer(eventMeshHttpClientConfig conf.EventMeshHttpClientConfig) *EventMeshHttpProducer {
+	p := new(EventMeshHttpProducer)
+	p.cloudEventProducer = NewCloudEventProducer(eventMeshHttpClientConfig)
+	return p
+}
+
+// Publish sends eventMeshMessage when it is a cloudevents.Event value.
+// Messages of any other type (including nil) are ignored.
+func (e *EventMeshHttpProducer) Publish(eventMeshMessage interface{}) {
+
+	// FIXME Check eventMeshMessage is not nil
+
+	switch msg := eventMeshMessage.(type) {
+	case cloudevents.Event:
+		// CloudEvent
+		e.cloudEventProducer.Publish(msg)
+	}
+}
diff --git a/eventmesh-sdk-go/http/producer/eventmesh_protocol_producer.go b/eventmesh-sdk-go/http/producer/eventmesh_protocol_producer.go
new file mode 100644
index 0000000..ab70cb1
--- /dev/null
+++ b/eventmesh-sdk-go/http/producer/eventmesh_protocol_producer.go
@@ -0,0 +1,5 @@
+package producer
+
+// EventMeshProtocolProducer is implemented by producers that can publish a
+// message passed as an untyped value.
+type EventMeshProtocolProducer interface {
+	// Publish sends eventMeshMessage; it returns nothing to the caller.
+	Publish(eventMeshMessage interface{})
+}
diff --git a/eventmesh-sdk-go/http/utils/http_utils.go b/eventmesh-sdk-go/http/utils/http_utils.go
new file mode 100644
index 0000000..4333e84
--- /dev/null
+++ b/eventmesh-sdk-go/http/utils/http_utils.go
@@ -0,0 +1,41 @@
+package utils
+
+import (
+	"eventmesh/http/model"
+	"io/ioutil"
+	"log"
+	nethttp "net/http"
+	"net/url"
+	"strings"
+)
+
+// HttpPost sends the body entries of requestParam as a URL-encoded form POST
+// to uri, with the headers of requestParam applied, and returns the response
+// body as a string.
+//
+// If the request cannot be built, sent, or its response read, the error is
+// logged and an empty string is returned. (Previously these errors were
+// ignored, which led to a nil-pointer dereference on the failed request or
+// response.)
+func HttpPost(client *nethttp.Client, uri string, requestParam *model.RequestParam) string {
+
+	data := url.Values{}
+	for key, value := range requestParam.Body() {
+		data.Set(key, value)
+	}
+
+	req, err := nethttp.NewRequest(nethttp.MethodPost, uri, strings.NewReader(data.Encode()))
+	if err != nil {
+		log.Printf("Failed to build http request to %s, error: %v", uri, err)
+		return ""
+	}
+
+	req.Header.Set("Content-Type", "application/x-www-form-urlencoded; charset=UTF-8")
+
+	// Assign into the map directly rather than through Header.Set so the
+	// header names are kept exactly as given instead of being canonicalized.
+	for name, value := range requestParam.Headers() {
+		req.Header[name] = []string{value}
+	}
+
+	resp, err := client.Do(req)
+	if err != nil {
+		log.Printf("Failed to send http request to %s, error: %v", uri, err)
+		return ""
+	}
+	defer resp.Body.Close()
+
+	respBody, err := ioutil.ReadAll(resp.Body)
+	if err != nil {
+		log.Printf("Failed to read http response from %s, error: %v", uri, err)
+		return ""
+	}
+
+	return string(respBody)
+}
diff --git a/eventmesh-sdk-go/main.go b/eventmesh-sdk-go/main.go
new file mode 100644
index 0000000..1e04a65
--- /dev/null
+++ b/eventmesh-sdk-go/main.go
@@ -0,0 +1,12 @@
+package main
+
+import "eventmesh/examples/tcp"
+
+// main runs the TCP publish example. The HTTP examples are left commented
+// out below.
+func main() {
+	// HTTP Test
+	//http.AsyncPubCloudEvents()
+	//http.SubCloudEvents()
+
+	// TCP Test
+	tcp.AsyncPubCloudEvents()
+}
diff --git a/eventmesh-sdk-go/tcp/cloudevent_tcp_client.go b/eventmesh-sdk-go/tcp/cloudevent_tcp_client.go
new file mode 100644
index 0000000..4563997
--- /dev/null
+++ b/eventmesh-sdk-go/tcp/cloudevent_tcp_client.go
@@ -0,0 +1,35 @@
+package tcp
+
+import (
+	gtcp "eventmesh/common/protocol/tcp"
+	"eventmesh/tcp/conf"
+)
+
+// CloudEventTCPClient bundles a publishing client and a subscribing client
+// for CloudEvents messages over TCP.
+type CloudEventTCPClient struct {
+	cloudEventTCPPubClient *CloudEventTCPPubClient
+	cloudEventTCPSubClient *CloudEventTCPSubClient
+}
+
+// NewCloudEventTCPClient builds a client whose pub and sub halves are both
+// created from the same configuration.
+func NewCloudEventTCPClient(eventMeshTcpClientConfig conf.EventMeshTCPClientConfig) *CloudEventTCPClient {
+	client := new(CloudEventTCPClient)
+	client.cloudEventTCPPubClient = NewCloudEventTCPPubClient(eventMeshTcpClientConfig)
+	client.cloudEventTCPSubClient = NewCloudEventTCPSubClient(eventMeshTcpClientConfig)
+	return client
+}
+
+// Init initializes the publishing client first and the subscribing client
+// second.
+func (c *CloudEventTCPClient) Init() {
+	pub, sub := c.cloudEventTCPPubClient, c.cloudEventTCPSubClient
+	pub.init()
+	sub.init()
+}
+
+// Publish forwards message and timeout to the publishing client and returns
+// the package it hands back.
+func (c *CloudEventTCPClient) Publish(message interface{}, timeout int64) gtcp.Package {
+	response := c.cloudEventTCPPubClient.publish(message, timeout)
+	return response
+}
+
+// GetPubClient exposes the publishing half of the client.
+func (c *CloudEventTCPClient) GetPubClient() EventMeshTCPPubClient {
+	pub := c.cloudEventTCPPubClient
+	return pub
+}
+
+// GetSubClient exposes the subscribing half of the client.
+func (c *CloudEventTCPClient) GetSubClient() EventMeshTCPSubClient {
+	sub := c.cloudEventTCPSubClient
+	return sub
+}
diff --git a/eventmesh-sdk-go/tcp/cloudevent_tcp_pub_client.go b/eventmesh-sdk-go/tcp/cloudevent_tcp_pub_client.go
new file mode 100644
index 0000000..ea08329
--- /dev/null
+++ b/eventmesh-sdk-go/tcp/cloudevent_tcp_pub_client.go
@@ -0,0 +1,31 @@
+package tcp
+
+import (
+	"eventmesh/common/protocol/tcp"
+	"eventmesh/tcp/conf"
+	"eventmesh/tcp/utils"
+)
+
+// CloudEventTCPPubClient is the publishing side of the CloudEvents TCP
+// client. All connection handling comes from the embedded BaseTCPClient.
+type CloudEventTCPPubClient struct {
+	*BaseTCPClient
+}
+
+// NewCloudEventTCPPubClient creates a publishing client backed by a new
+// BaseTCPClient built from eventMeshTcpClientConfig.
+func NewCloudEventTCPPubClient(eventMeshTcpClientConfig conf.EventMeshTCPClientConfig) *CloudEventTCPPubClient {
+	base := NewBaseTCPClient(eventMeshTcpClientConfig)
+	return &CloudEventTCPPubClient{BaseTCPClient: base}
+}
+
+// init opens the connection, then sends the hello package followed by a
+// heartbeat.
+func (c CloudEventTCPPubClient) init() {
+	base := c.BaseTCPClient
+	base.Open()
+	base.Hello()
+	base.Heartbeat()
+}
+
+// reconnect asks the base client to reconnect and then sends a heartbeat.
+func (c CloudEventTCPPubClient) reconnect() {
+	base := c.BaseTCPClient
+	base.Reconnect()
+	base.Heartbeat()
+}
+
+// publish wraps message in an ASYNC_MESSAGE_TO_SERVER package and passes it
+// to the base client's IO together with timeout.
+func (c CloudEventTCPPubClient) publish(message interface{}, timeout int64) tcp.Package {
+	return c.IO(utils.BuildPackage(message, tcp.DefaultCommand.ASYNC_MESSAGE_TO_SERVER), timeout)
+}
diff --git a/eventmesh-sdk-go/tcp/cloudevent_tcp_sub_client.go b/eventmesh-sdk-go/tcp/cloudevent_tcp_sub_client.go
new file mode 100644
index 0000000..c524f19
--- /dev/null
+++ b/eventmesh-sdk-go/tcp/cloudevent_tcp_sub_client.go
@@ -0,0 +1,26 @@
+package tcp
+
+import (
+	"eventmesh/common/protocol"
+	"eventmesh/tcp/conf"
+)
+
+// CloudEventTCPSubClient is the subscribing side of the CloudEvents TCP
+// client. It embeds BaseTCPClient; its subscribe and unsubscribe operations
+// are not implemented yet and panic when called.
+type CloudEventTCPSubClient struct {
+	*BaseTCPClient
+}
+
+// NewCloudEventTCPSubClient creates a subscribing client backed by a new
+// BaseTCPClient built from eventMeshTcpClientConfig.
+func NewCloudEventTCPSubClient(eventMeshTcpClientConfig conf.EventMeshTCPClientConfig) *CloudEventTCPSubClient {
+	return &CloudEventTCPSubClient{BaseTCPClient: NewBaseTCPClient(eventMeshTcpClientConfig)}
+}
+
+// init is currently a no-op: the panic placeholder is commented out, so
+// calling it does nothing (in particular, no connection is opened here).
+func (c CloudEventTCPSubClient) init() {
+	//panic("implement me")
+}
+
+// subscribe is not implemented and always panics with "implement me".
+func (c CloudEventTCPSubClient) subscribe(topic string, subscriptionMode protocol.SubscriptionMode, subscriptionType protocol.SubscriptionType) {
+	panic("implement me")
+}
+
+// unsubscribe is not implemented and always panics with "implement me".
+func (c CloudEventTCPSubClient) unsubscribe() {
+	panic("implement me")
+}
diff --git a/eventmesh-sdk-go/tcp/common/eventmesh_common.go b/eventmesh-sdk-go/tcp/common/eventmesh_common.go
new file mode 100644
index 0000000..262f29e
--- /dev/null
+++ b/eventmesh-sdk-go/tcp/common/eventmesh_common.go
@@ -0,0 +1,22 @@
+package common
+
+// EventMeshCommon groups the shared values used by the TCP client: the
+// default timeout, the user-agent purpose strings and the protocol names.
+// It is a package-level variable of an anonymous struct type, so the fields
+// are technically mutable even though they are used like constants.
+var EventMeshCommon = struct {
+	// Timeout time shared by the server
+	DEFAULT_TIME_OUT_MILLS int
+
+	// User agent
+	USER_AGENT_PURPOSE_PUB string
+	USER_AGENT_PURPOSE_SUB string
+
+	// Protocol type
+	CLOUD_EVENTS_PROTOCOL_NAME string
+	EM_MESSAGE_PROTOCOL_NAME   string
+	OPEN_MESSAGE_PROTOCOL_NAME string
+}{
+	// 20 * 1000 = 20000; per the field name the unit is milliseconds.
+	DEFAULT_TIME_OUT_MILLS:     20 * 1000,
+	USER_AGENT_PURPOSE_PUB:     "pub",
+	USER_AGENT_PURPOSE_SUB:     "sub",
+	CLOUD_EVENTS_PROTOCOL_NAME: "cloudevents",
+	EM_MESSAGE_PROTOCOL_NAME:   "eventmeshmessage",
+	OPEN_MESSAGE_PROTOCOL_NAME: "openmessage",
+}
diff --git a/eventmesh-sdk-go/tcp/common/request_context.go b/eventmesh-sdk-go/tcp/common/request_context.go
new file mode 100644
index 0000000..4c7b681
--- /dev/null
+++ b/eventmesh-sdk-go/tcp/common/request_context.go
@@ -0,0 +1,60 @@
+package common
+
+import (
+	"eventmesh/common/protocol/tcp"
+	"sync"
+)
+
+// RequestContext pairs an outgoing request package with its response and
+// the key under which the pair is identified.
+type RequestContext struct {
+	key      interface{}
+	request  tcp.Package
+	response tcp.Package
+	// wg is not touched by any method other than Wg and SetWg; the Add and
+	// Done calls elsewhere in this file are commented out.
+	wg sync.WaitGroup
+}
+
+// Key returns the key identifying this context.
+func (r *RequestContext) Key() interface{} {
+	return r.key
+}
+
+// SetKey replaces the key identifying this context.
+func (r *RequestContext) SetKey(key interface{}) {
+	r.key = key
+}
+
+// Request returns the request package stored in this context.
+func (r *RequestContext) Request() tcp.Package {
+	return r.request
+}
+
+// SetRequest replaces the request package stored in this context.
+func (r *RequestContext) SetRequest(request tcp.Package) {
+	r.request = request
+}
+
+// Response returns the response package stored in this context. Until
+// SetResponse or Finish is called this is the zero value.
+func (r *RequestContext) Response() tcp.Package {
+	return r.response
+}
+
+// SetResponse replaces the response package stored in this context.
+func (r *RequestContext) SetResponse(response tcp.Package) {
+	r.response = response
+}
+
+// Wg returns the WaitGroup by value, i.e. a copy. Operations on the returned
+// value do not affect the WaitGroup held by this context.
+// NOTE(review): copying a sync.WaitGroup is flagged by go vet (copylocks).
+func (r *RequestContext) Wg() sync.WaitGroup {
+	return r.wg
+}
+
+// SetWg overwrites the context's WaitGroup with a copy of wg.
+// NOTE(review): same copy concern as Wg.
+func (r *RequestContext) SetWg(wg sync.WaitGroup) {
+	r.wg = wg
+}
+
+// Finish records message as the response. The WaitGroup is not signalled
+// because the Done call is commented out.
+func (r *RequestContext) Finish(message tcp.Package) {
+	r.response = message
+	//r.wg.Done()
+}
+
+func GetRequestContextKey(request tcp.Package) interface{} {
+	return request.Header.Seq
+}
+
+// NewRequestContext creates a context holding key and request; the response
+// starts out as the zero value. latch is accepted but currently unused
+// because the WaitGroup setup is disabled.
+func NewRequestContext(key interface{}, request tcp.Package, latch int) *RequestContext {
+	created := new(RequestContext)
+	created.key = key
+	created.request = request
+	//created.Wg().Add(latch)
+	return created
+}
diff --git a/eventmesh-sdk-go/tcp/conf/eventmesh_tcp_client_config.go b/eventmesh-sdk-go/tcp/conf/eventmesh_tcp_client_config.go
new file mode 100644
index 0000000..b353aa3
--- /dev/null
+++ b/eventmesh-sdk-go/tcp/conf/eventmesh_tcp_client_config.go
@@ -0,0 +1,37 @@
+package conf
+
+import "eventmesh/common/protocol/tcp"
+
+// EventMeshTCPClientConfig holds the settings a TCP client is created from:
+// the server host and port plus the user agent.
+type EventMeshTCPClientConfig struct {
+	host      string
+	port      int
+	userAgent tcp.UserAgent
+}
+
+// NewEventMeshTCPClientConfig returns a pointer to a configuration
+// populated with host, port and userAgent.
+func NewEventMeshTCPClientConfig(host string, port int, userAgent tcp.UserAgent) *EventMeshTCPClientConfig {
+	return &EventMeshTCPClientConfig{host: host, port: port, userAgent: userAgent}
+}
+
+// Host returns the configured host.
+func (e *EventMeshTCPClientConfig) Host() string {
+	return e.host
+}
+
+// SetHost replaces the configured host.
+func (e *EventMeshTCPClientConfig) SetHost(host string) {
+	e.host = host
+}
+
+// Port returns the configured port.
+func (e *EventMeshTCPClientConfig) Port() int {
+	return e.port
+}
+
+// SetPort replaces the configured port.
+func (e *EventMeshTCPClientConfig) SetPort(port int) {
+	e.port = port
+}
+
+// UserAgent returns the configured user agent.
+func (e *EventMeshTCPClientConfig) UserAgent() tcp.UserAgent {
+	return e.userAgent
+}
+
+// SetUserAgent replaces the configured user agent.
+func (e *EventMeshTCPClientConfig) SetUserAgent(userAgent tcp.UserAgent) {
+	e.userAgent = userAgent
+}
diff --git a/eventmesh-sdk-go/tcp/eventmesh_tcp_client.go b/eventmesh-sdk-go/tcp/eventmesh_tcp_client.go
new file mode 100644
index 0000000..ffa3414
--- /dev/null
+++ b/eventmesh-sdk-go/tcp/eventmesh_tcp_client.go
@@ -0,0 +1,10 @@
+package tcp
+
+import gtcp "eventmesh/common/protocol/tcp"
+
+// EventMeshTCPClient is the client facade returned by
+// CreateEventMeshTCPClient.
+type EventMeshTCPClient interface {
+	// Init prepares the client for use.
+	Init()
+	// Publish sends msg and returns a package; timeout has type int64
+	// (unit not visible here; presumably milliseconds, confirm).
+	Publish(msg interface{}, timeout int64) gtcp.Package
+	// GetPubClient returns the publishing half of the client.
+	GetPubClient() EventMeshTCPPubClient
+	// GetSubClient returns the subscribing half of the client.
+	GetSubClient() EventMeshTCPSubClient
+}
diff --git a/eventmesh-sdk-go/tcp/eventmesh_tcp_client_factory.go b/eventmesh-sdk-go/tcp/eventmesh_tcp_client_factory.go
new file mode 100644
index 0000000..a8c8fc9
--- /dev/null
+++ b/eventmesh-sdk-go/tcp/eventmesh_tcp_client_factory.go
@@ -0,0 +1,15 @@
+package tcp
+
+import (
+	"eventmesh/common/protocol"
+	"eventmesh/tcp/conf"
+)
+
+// CreateEventMeshTCPClient returns a TCP client for messageType. Only
+// protocol.DefaultMessageType.CloudEvent is supported; for every other
+// message type the result is nil, so callers must check it before use.
+func CreateEventMeshTCPClient(eventMeshTcpClientConfig conf.EventMeshTCPClientConfig, messageType protocol.MessageType) EventMeshTCPClient {
+	if messageType != protocol.DefaultMessageType.CloudEvent {
+		// No client exists for this message type.
+		return nil
+	}
+
+	return NewCloudEventTCPClient(eventMeshTcpClientConfig)
+}
diff --git a/eventmesh-sdk-go/tcp/eventmesh_tcp_pub_client.go b/eventmesh-sdk-go/tcp/eventmesh_tcp_pub_client.go
new file mode 100644
index 0000000..9bcabf4
--- /dev/null
+++ b/eventmesh-sdk-go/tcp/eventmesh_tcp_pub_client.go
@@ -0,0 +1,9 @@
+package tcp
+
+import gtcp "eventmesh/common/protocol/tcp"
+
+// EventMeshTCPPubClient is the publishing half of a TCP client. All methods
+// are unexported, so it can only be implemented and called from within this
+// package.
+type EventMeshTCPPubClient interface {
+	// init prepares the publishing client for use.
+	init()
+	// reconnect re-establishes the publishing client's connection.
+	reconnect()
+	// publish sends message and returns a package.
+	publish(message interface{}, timeout int64) gtcp.Package
+}
diff --git a/eventmesh-sdk-go/tcp/eventmesh_tcp_sub_client.go b/eventmesh-sdk-go/tcp/eventmesh_tcp_sub_client.go
new file mode 100644
index 0000000..a496e49
--- /dev/null
+++ b/eventmesh-sdk-go/tcp/eventmesh_tcp_sub_client.go
@@ -0,0 +1,9 @@
+package tcp
+
+import "eventmesh/common/protocol"
+
+// EventMeshTCPSubClient is the subscribing half of a TCP client. All methods
+// are unexported, so it can only be implemented and called from within this
+// package.
+type EventMeshTCPSubClient interface {
+	// init prepares the subscribing client for use.
+	init()
+	// subscribe registers interest in topic with the given mode and type.
+	subscribe(topic string, subscriptionMode protocol.SubscriptionMode, subscriptionType protocol.SubscriptionType)
+	// unsubscribe cancels subscriptions; it takes no arguments.
+	unsubscribe()
+}
diff --git a/eventmesh-sdk-go/tcp/tcp_client.go b/eventmesh-sdk-go/tcp/tcp_client.go
new file mode 100644
index 0000000..79bf276
--- /dev/null
+++ b/eventmesh-sdk-go/tcp/tcp_client.go
@@ -0,0 +1,129 @@
+package tcp
+
+import (
+	"bufio"
+	"bytes"
+	"eventmesh/common/protocol/tcp"
+	"eventmesh/common/protocol/tcp/codec"
+	"eventmesh/tcp/common"
+	"eventmesh/tcp/conf"
+	"eventmesh/tcp/utils"
+	"io"
+	"log"
+	"math/rand"
+	"net"
+	"strconv"
+)
+
+// BaseTCPClient holds the connection state shared by the TCP pub and sub
+// clients.
+type BaseTCPClient struct {
+	// clientNo is a random number in [0, 10000) assigned at construction.
+	clientNo int
+	// host and port identify the server dialed by Open.
+	host     string
+	port     int
+	// useAgent (sic) is the user agent sent as the body of the hello package.
+	useAgent tcp.UserAgent
+	// conn is nil until Open succeeds.
+	conn     net.Conn
+}
+
+// NewBaseTCPClient copies host, port and user agent out of the given
+// configuration and assigns a random client number in [0, 10000). The
+// connection is left unset.
+func NewBaseTCPClient(eventMeshTcpClientConfig conf.EventMeshTCPClientConfig) *BaseTCPClient {
+	client := new(BaseTCPClient)
+	client.clientNo = rand.Intn(10000)
+	client.host = eventMeshTcpClientConfig.Host()
+	client.port = eventMeshTcpClientConfig.Port()
+	client.useAgent = eventMeshTcpClientConfig.UserAgent()
+	return client
+}
+
+// Open dials the configured EventMesh server over TCP, stores the
+// connection and starts the background goroutine that reads from it.
+//
+// The address is assembled with net.JoinHostPort so that IPv6 literals are
+// bracketed correctly; a host that is already bracketed is accepted as well.
+// A dial failure terminates the process via log.Fatalf, reporting the
+// address and the underlying error.
+func (c *BaseTCPClient) Open() {
+	host := c.host
+	if n := len(host); n >= 2 && host[0] == '[' && host[n-1] == ']' {
+		// Strip existing brackets so JoinHostPort does not add a second pair.
+		host = host[1 : n-1]
+	}
+	eventMeshIpAndPort := net.JoinHostPort(host, strconv.Itoa(c.port))
+
+	conn, err := net.Dial("tcp", eventMeshIpAndPort)
+	if err != nil {
+		log.Fatalf("Failed to dial %s: %v", eventMeshIpAndPort, err)
+	}
+	c.conn = conn
+
+	go c.read()
+}
+
+// Close says goodbye to the server and then closes the connection. It does
+// nothing when no connection has been opened.
+//
+// Goodbye runs before the socket is closed so that it still has a usable
+// connection to send on. A failure to close terminates the process via
+// log.Fatalf, reporting the underlying error.
+func (c *BaseTCPClient) Close() {
+	if c.conn == nil {
+		return
+	}
+
+	c.Goodbye()
+
+	if err := c.conn.Close(); err != nil {
+		log.Fatalf("Failed to close connection: %v", err)
+	}
+}
+
+// Heartbeat sends a heartbeat package through IO with a timeout argument of
+// 1000; the package returned by IO is discarded.
+func (c *BaseTCPClient) Heartbeat() {
+	c.IO(utils.BuildHeartBeatPackage(), 1000)
+}
+
+// Hello sends a hello package carrying the client's user agent through IO
+// with a timeout argument of 1000; the package returned by IO is discarded.
+func (c *BaseTCPClient) Hello() {
+	c.IO(utils.BuildHelloPackage(c.useAgent), 1000)
+}
+
+// Reconnect is a placeholder with an empty body; it currently does nothing.
+func (c *BaseTCPClient) Reconnect() {
+
+}
+
+// Goodbye is a placeholder with an empty body; it currently sends nothing.
+func (c *BaseTCPClient) Goodbye() {
+
+}
+
+// IsActive is a placeholder with an empty body and no return value, so it
+// cannot yet report connection state.
+func (c *BaseTCPClient) IsActive() {
+
+}
+
+// read consumes the connection line by line until it ends. Each complete
+// line is handed to handleRead on its own goroutine.
+//
+// It returns nil when the peer closes the connection (io.EOF) and the
+// underlying error for any other read failure.
+func (c *BaseTCPClient) read() error {
+	// One reader for the lifetime of the connection: bufio reads ahead, so a
+	// fresh reader per call would throw away bytes already taken off the
+	// socket.
+	reader := bufio.NewReader(c.conn)
+
+	for {
+		var buf bytes.Buffer
+		for {
+			// ReadLine returns long lines in fragments; isPrefix is true
+			// while more fragments of the same line follow.
+			msg, isPrefix, err := reader.ReadLine()
+			if err != nil {
+				if err != io.EOF {
+					return err
+				}
+				// The stream has ended: deliver any partial line collected
+				// so far and stop instead of spinning on EOF.
+				if buf.Len() > 0 {
+					c.handleRead(&buf)
+				}
+				return nil
+			}
+
+			// msg is only valid until the next read, so copy it now.
+			buf.Write(msg)
+			if !isPrefix {
+				break
+			}
+		}
+
+		go c.handleRead(&buf)
+	}
+}
+
+// handleRead decodes the buffered bytes into a package and logs it.
+// TODO Handle according to the command
+func (c *BaseTCPClient) handleRead(in *bytes.Buffer) {
+	log.Printf("Read from server: %v\n", codec.DecodePackage(in))
+}
+
+// write pushes message to the connection through a buffered writer and
+// flushes it. It returns the byte count reported by the buffered Write and
+// the first error from either the Write or the Flush.
+func (c *BaseTCPClient) write(message []byte) (int, error) {
+	w := bufio.NewWriter(c.conn)
+	written, err := w.Write(message)
+	if err != nil {
+		return written, err
+	}
+	return written, w.Flush()
+}
+
+// Send encodes message and writes the resulting bytes to the connection.
+// A write failure terminates the process via log.Fatal.
+func (c *BaseTCPClient) Send(message tcp.Package) {
+	encoded := codec.EncodePackage(message)
+	if _, err := c.write(encoded.Bytes()); err != nil {
+		log.Fatal("Failed to write to peer")
+	}
+}
+
+// IO creates a request context keyed by the message's context key, sends
+// message, and returns the context's response.
+//
+// Nothing in this method waits for a server reply, and timeout is not used:
+// the value returned is whatever the freshly created context holds right
+// after the send.
+func (c *BaseTCPClient) IO(message tcp.Package, timeout int64) tcp.Package {
+	ctx := common.NewRequestContext(common.GetRequestContextKey(message), message, 1)
+	c.Send(message)
+	return ctx.Response()
+}
diff --git a/eventmesh-sdk-go/tcp/utils/message_utils.go b/eventmesh-sdk-go/tcp/utils/message_utils.go
new file mode 100644
index 0000000..1b4e6fb
--- /dev/null
+++ b/eventmesh-sdk-go/tcp/utils/message_utils.go
@@ -0,0 +1,45 @@
+package utils
+
+import (
+	gcommon "eventmesh/common"
+	"eventmesh/common/protocol/tcp"
+	"eventmesh/tcp/common"
+	cloudevents "github.com/cloudevents/sdk-go/v2"
+	"log"
+)
+
+func BuildPackage(message interface{}, command tcp.Command) tcp.Package {
+	// FIXME Support random sequence
+	header := tcp.NewHeader(command, 0, "", "22222")
+	pkg := tcp.NewPackage(header)
+
+	if _, ok := message.(cloudevents.Event); ok {
+		event := message.(cloudevents.Event)
+		eventBytes, err := event.MarshalJSON()
+		if err != nil {
+			log.Fatal("Failed to marshal cloud event")
+		}
+
+		pkg.Header.PutProperty(gcommon.Constants.PROTOCOL_TYPE, common.EventMeshCommon.CLOUD_EVENTS_PROTOCOL_NAME)
+		pkg.Header.PutProperty(gcommon.Constants.PROTOCOL_VERSION, event.SpecVersion())
+		pkg.Header.PutProperty(gcommon.Constants.PROTOCOL_DESC, "tcp")
+		pkg.Body = eventBytes
+	}
+
+	return pkg
+}
+
+// BuildHelloPackage creates a HELLO_REQUEST package whose body is agent.
+func BuildHelloPackage(agent tcp.UserAgent) tcp.Package {
+	// FIXME Support random sequence
+	pkg := tcp.NewPackage(tcp.NewHeader(tcp.DefaultCommand.HELLO_REQUEST, 0, "", "22222"))
+	pkg.Body = agent
+	return pkg
+}
+
+// BuildHeartBeatPackage creates a HEARTBEAT_REQUEST package with no body.
+func BuildHeartBeatPackage() tcp.Package {
+	// FIXME Support random sequence
+	return tcp.NewPackage(tcp.NewHeader(tcp.DefaultCommand.HEARTBEAT_REQUEST, 0, "", "22222"))
+}

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@eventmesh.apache.org
For additional commands, e-mail: commits-help@eventmesh.apache.org