Posted to commits@rocketmq.apache.org by di...@apache.org on 2019/02/26 03:49:01 UTC

[rocketmq-client-go] branch native updated: remote codec (#33)

This is an automated email from the ASF dual-hosted git repository.

dinglei pushed a commit to branch native
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-go.git


The following commit(s) were added to refs/heads/native by this push:
     new 4a93af0  remote codec (#33)
4a93af0 is described below

commit 4a93af0a18903a48657f95b294f402e647588eb8
Author: 高峰 <ga...@foxmail.com>
AuthorDate: Tue Feb 26 11:48:57 2019 +0800

    remote codec (#33)
    
    * protocol doc fix typo and markdown linter error
    
    * protocol doc update and bug fix
    
    * bug fix for optimize
    
    * refactor remote/codecs.go and remote/codecs_test.go
    * use fixed-size types
    * add unit tests
    * update protocol documents and fix typo
    
    * add unit tests and benchmarks with codec
    
    * fix RemotingCommand's decode benchmark test
---
 docs/zh/rocketmq-protocol_zh.md | 129 +++++++++---------
 remote/client.go                |  14 +-
 remote/codec.go                 | 115 +++++++++-------
 remote/codec_test.go            | 292 +++++++++++++++++++++++++++++++++++++++-
 4 files changed, 428 insertions(+), 122 deletions(-)

diff --git a/docs/zh/rocketmq-protocol_zh.md b/docs/zh/rocketmq-protocol_zh.md
index 70cd7a5..818d2e1 100644
--- a/docs/zh/rocketmq-protocol_zh.md
+++ b/docs/zh/rocketmq-protocol_zh.md
@@ -1,34 +1,37 @@
 # RocketMQ Communication Protocol
 
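-In RocketMQ, `RemotingCommand` is the basic unit of communication; every Request/Response is ultimately wrapped in a `RemotingCommand`. A `RemotingCommand`
-has the following format once serialized:
-```go
+In RocketMQ, `RemotingCommand` is the basic unit of communication; every Request/Response is ultimately wrapped in a `RemotingCommand`. Once serialized, a `RemotingCommand` has the following format: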
+
+```
 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
 + frame_size | header_length |         header_body        |     body     +
 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
 +   4bytes   |     4bytes    | (21 + r_len + e_len) bytes | remain bytes +
 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
 ```
-|item|type|description|
-|:-:|:-:|:-:|
-|frame_size|`int32`|size of one `RemotingCommand` packet|
-|header_length|`in32`|the high 8 bits carry the serialization type; the remaining bits carry the actual header length|
-|header_body|`[]byte`|playload of the header; its length depends on the attached `remark` and `properties`|
-|body|`[]byte`|playload of the specific Request/Response|
+
+| item | type | description |
+| :-: | :-: | :-: |
+| frame_size | `int32` | size of one `RemotingCommand` packet |
+| header_length | `int32` | the high 8 bits carry the serialization type; the remaining bits carry the actual header length |
+| header_body | `[]byte` | payload of the header; its length depends on the attached `remark` and `properties`|
+| body | `[]byte` | payload of the specific Request/Response |
 
 ## Header
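-RocketMQ serializes the Header in one of two ways: JSON or RocketMQ's private serialization format. JSON serialization is not covered here; see the Java `RemotingCommand` class for details.
-This section focuses on RocketMQ's private serialization format.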
 
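-During serialization, the serialization type must be recorded in the packet, that is, encoded into `header_length`
+RocketMQ serializes the Header in one of two ways: JSON or RocketMQ's private serialization format. JSON serialization is not covered here; see the Java `RemotingCommand` class for details.
+
+This section focuses on RocketMQ's private serialization format.
+
+During serialization, the serialization type must be recorded in the packet, that is, encoded into `header_length`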
 
 ```go
 // Encoding algorithm
 
-// header_length after encoding
+// header_length after encoding
 var header_length int32
 
-// actual header length
+// actual header length
 var headerDataLen int32
 
 // serialization type
@@ -41,7 +44,6 @@ result[2]|byte((headerDataLen >> 8) & 0xFF)
 result[3]|byte(headerDataLen & 0xFF)
 binary.Read(result, binary.BigEndian, &header_length)
 
-
 // Decoding algorithm
 headerDataLen := header_length & 0xFFFFFF
 SerializedType := byte((header_length >> 24) & 0xFF)
@@ -57,56 +59,59 @@ SerializedType := byte((header_length >> 24) & 0xFF)
 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
 ```
 
-|item|type|description|
-|:-:|:-:|:-:|
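-|request_code|`short`|which Request or ResponseCode this is; the category is determined by request_flag|
-|l_flag|`byte`|language flag, identifies the programming language of the Request's sender|
-|v_flag|`int16`|version flag|
-|request_flag|`int32`|Header flag, marks the type and request mode of this `RemotingCommand`|
-|opaque|`int32`|the RequestID of the Request/Response; the Broker's Response is matched one-to-one with the Request cached by the Client through this value|
-|r_len|`int32`| length of remark; remark is descriptive information attached to a Request/Response, typically used in a Response to explain the specific cause of an error|
-|r_body|`[]byte`| playload of remark |
-|e_len|`int32`| length of extended fields, i.e. properties; non-standard fields are stored here and are widely used across RocketMQ features|
-|e_body|`int32`| playload of extended fields |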
+| item | type | description |
+| :-: | :-: | :-: |
+| request_code | `int16` | which Request or ResponseCode this is; the category is determined by request_flag |
+| l_flag | `byte` | language flag, identifies the programming language of the Request's sender |
+| v_flag | `int16` | version flag |
+| request_flag |`int32`| Header flag, marks the type and request mode of this `RemotingCommand` |
+| opaque | `int32` | the RequestID of the Request/Response; the Broker's Response is matched one-to-one with the Request cached by the Client through this value |
+| r_len | `int32` | length of remark; remark is descriptive information attached to a Request/Response, typically used in a Response to explain the specific cause of an error |
+| r_body | `[]byte` | payload of remark |
+| e_len | `int32` | length of extended fields, i.e. properties; non-standard fields are stored here and are widely used across RocketMQ features |
+| e_body | `[]byte` | payload of extended fields |
 
 ## Body
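-`body` holds the data of the specific Request/Response. RocketMQ has many kinds of Request/Response, and each class has its own serialization and deserialization; because there are so many,
-they are not covered here. See how `CommandCustomHeader` is used in the Java code for details. Below are some of the Requests and Responses used by the Client
+
+`body` holds the data of the specific Request/Response. RocketMQ has many kinds of Request/Response, and each class has its own serialization and deserialization; because there are so many,
+they are not covered here. See how `CommandCustomHeader` is used in the Java code for details. Below are some of the Requests and Responses used by the Client.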
 
 ### RequestCode
-|item|type|description|
-|:-:|:-:|:-:|
-|SEND_MESSAGE|10| send a message to the broker|
-|PULL_MESSAGE|11| pull messages from the broker; the client's push mode is also implemented via long polling on pull|
-|TODO...|||
+
+| item | type | description |
+| :-: | :-: | :-: |
+| SEND_MESSAGE | 10 | send a message to the broker |
+| PULL_MESSAGE | 11 | pull messages from the broker; the client's push mode is also implemented via long polling on pull |
+| TODO... | | |
 
 ### ResponseCode
-|item|type|description|
-|:-:|:-:|:-:|
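-|FLUSH_DISK_TIMEOUT|10|the broker storage layer timed out flushing to disk|
-|SLAVE_NOT_AVAILABLE|11|the slave node cannot serve requests|
-|FLUSH_SLAVE_TIMEOUT|12|timed out syncing data to the slave|
-|MESSAGE_ILLEGAL|13|the message format is invalid|
-|SERVICE_NOT_AVAILABLE|14|the broker is temporarily unavailable|
-|VERSION_NOT_SUPPORTED|15|unsupported request; no usage seen so far|
-|NO_PERMISSION|16|no permission to access the broker, topic, or subscription|
-|TOPIC_EXIST_ALREADY|18|the topic already exists; no usage seen so far|
-|PULL_NOT_FOUND|19|no message was pulled, mostly due to a wrong offset|
-|PULL_RETRY_IMMEDIATELY|20|the client is advised to pull again immediately|
-|PULL_OFFSET_MOVED|21|the offset is too small or too large|
-|QUERY_NOT_FOUND|22|management-plane Response, TODO|
-|SUBSCRIPTION_PARSE_FAILED|23|failed to parse subscription data|
-|SUBSCRIPTION_NOT_EXIST|24|the subscription does not exist|
-|SUBSCRIPTION_NOT_LATEST|25|the subscription data version does not match the request data version|
-|SUBSCRIPTION_GROUP_NOT_EXIST|26|the subscription group does not exist|
-|FILTER_DATA_NOT_EXIST|27|the filter data does not exist|
-|FILTER_DATA_NOT_LATEST|28|the filter data version does not match the request data version|
-|TRANSACTION_SHOULD_COMMIT|200|transaction Response, TODO|
-|TRANSACTION_SHOULD_ROLLBACK|201|transaction Response, TODO|
-|TRANSACTION_STATE_UNKNOW|202|transaction Response, TODO||
-|TRANSACTION_STATE_GROUP_WRONG|203|transaction Response, TODO|
-|NO_BUYER_ID|204|purpose unknown; not seen in use on the broker side|
-|NOT_IN_CURRENT_UNIT|205|purpose unknown; not seen in use on the broker side|
-|CONSUMER_NOT_ONLINE|206|the consumer is offline; control-plane response|
-|CONSUME_MSG_TIMEOUT|207|the client request timed out waiting for the broker's response|
-|NO_MESSAGE|208|control-plane response, set by the client itself; exact purpose unclear|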
\ No newline at end of file
+
+| item | type | description |
+| :-: | :-: | :-: |
+| FLUSH_DISK_TIMEOUT | 10 | the broker storage layer timed out flushing to disk |
+| SLAVE_NOT_AVAILABLE | 11 | the slave node cannot serve requests |
+| FLUSH_SLAVE_TIMEOUT | 12 | timed out syncing data to the slave |
+| MESSAGE_ILLEGAL | 13 | the message format is invalid |
+| SERVICE_NOT_AVAILABLE | 14 | the broker is temporarily unavailable |
+| VERSION_NOT_SUPPORTED | 15 | unsupported request; no usage seen so far |
+| NO_PERMISSION | 16 | no permission to access the broker, topic, or subscription |
+| TOPIC_EXIST_ALREADY | 18 | the topic already exists; no usage seen so far |
+| PULL_NOT_FOUND | 19 | no message was pulled, mostly due to a wrong offset |
+| PULL_RETRY_IMMEDIATELY | 20 | the client is advised to pull again immediately |
+| PULL_OFFSET_MOVED | 21 | the offset is too small or too large |
+| QUERY_NOT_FOUND | 22 | management-plane Response, TODO |
+| SUBSCRIPTION_PARSE_FAILED | 23 | failed to parse subscription data |
+| SUBSCRIPTION_NOT_EXIST | 24 | the subscription does not exist |
+| SUBSCRIPTION_NOT_LATEST | 25 | the subscription data version does not match the request data version |
+| SUBSCRIPTION_GROUP_NOT_EXIST | 26 | the subscription group does not exist |
+| FILTER_DATA_NOT_EXIST | 27 | the filter data does not exist |
+| FILTER_DATA_NOT_LATEST | 28 | the filter data version does not match the request data version |
+| TRANSACTION_SHOULD_COMMIT | 200 | transaction Response, TODO |
+| TRANSACTION_SHOULD_ROLLBACK | 201 | transaction Response, TODO |
+| TRANSACTION_STATE_UNKNOW | 202 | transaction Response, TODO | |
+| TRANSACTION_STATE_GROUP_WRONG | 203 | transaction Response, TODO |
+| NO_BUYER_ID | 204 | purpose unknown; not seen in use on the broker side |
+| NOT_IN_CURRENT_UNIT | 205 | purpose unknown; not seen in use on the broker side |
+| CONSUMER_NOT_ONLINE | 206 | the consumer is offline; control-plane response |
+| CONSUME_MSG_TIMEOUT | 207 | the client request timed out waiting for the broker's response |
+| NO_MESSAGE | 208 | control-plane response, set by the client itself; exact purpose unclear |
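
Reviewer note on the protocol doc above: the `header_length` field packs the serialization type into the high 8 bits and the real header length into the low 24 bits. The doc's own snippet is pseudocode, so here is a minimal runnable sketch of the same packing. The names `packHeaderLength`, `unpackHeaderLength`, `describe`, and the lowercase codec constants are hypothetical and are not part of this commit; the 0/1 codec values mirror the constants in remote/codec.go.

```go
package main

import "fmt"

// Codec identifiers; the values mirror JsonCodecs and RocketMQCodecs in
// remote/codec.go, the names here are illustrative only.
const (
	jsonCodec     = byte(0)
	rocketMQCodec = byte(1)
)

// maxHeaderLen is the largest length that fits in the low 24 bits.
const maxHeaderLen = 0xFFFFFF

// packHeaderLength stores the codec type in the high 8 bits and the real
// header length in the low 24 bits of a single int32.
func packHeaderLength(codec byte, headerLen int) (int32, error) {
	if headerLen < 0 || headerLen > maxHeaderLen {
		return 0, fmt.Errorf("header length %d does not fit in 24 bits", headerLen)
	}
	return int32(uint32(codec)<<24 | uint32(headerLen)), nil
}

// unpackHeaderLength reverses packHeaderLength, using the same masks as the
// decoding algorithm in the doc.
func unpackHeaderLength(v int32) (codec byte, headerLen int) {
	return byte((v >> 24) & 0xFF), int(v & 0xFFFFFF)
}

// describe renders a packed value for display.
func describe(v int32) string {
	codec, n := unpackHeaderLength(v)
	return fmt.Sprintf("codec=%d len=%d", codec, n)
}

func main() {
	packed, err := packHeaderLength(rocketMQCodec, 300)
	if err != nil {
		panic(err)
	}
	fmt.Println(describe(packed))
}
```

Rejecting lengths above 24 bits is a choice made for this sketch; the doc itself does not say how an oversized header should be handled.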
diff --git a/remote/client.go b/remote/client.go
index 2342905..2c6805c 100644
--- a/remote/client.go
+++ b/remote/client.go
@@ -19,11 +19,12 @@ package remote
 import (
 	"encoding/binary"
 	"errors"
-	"github.com/apache/rocketmq-client-go/utils"
-	log "github.com/sirupsen/logrus"
 	"net"
 	"sync"
 	"time"
+
+	"github.com/apache/rocketmq-client-go/utils"
+	log "github.com/sirupsen/logrus"
 )
 
 var (
@@ -157,11 +158,12 @@ func (client *defaultClient) invokeOneWay(request *RemotingCommand) error {
 }
 
 func (client *defaultClient) doRequest(header, body []byte) error {
-	var requestBytes []byte
-	requestBytes = append(requestBytes, header...)
-	if body != nil && len(body) > 0 {
-		requestBytes = append(requestBytes, body...)
+	var requestBytes = make([]byte, len(header)+len(body))
+	copy(requestBytes, header)
+	if len(body) > 0 {
+		copy(requestBytes[len(header):], body)
 	}
+
 	_, err := client.conn.Write(requestBytes)
 	return err
 }
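
Reviewer note on `doRequest` above: the change replaces two `append` calls on a nil slice with one exact-size allocation followed by `copy`. A minimal standalone sketch of that pattern, assuming nothing about the surrounding client; `concat` and `render` are hypothetical helper names, not functions from this repository.

```go
package main

import "fmt"

// concat joins header and body into one freshly allocated slice.
// copy treats a nil or empty body as a no-op, so no length guard is needed.
func concat(header, body []byte) []byte {
	out := make([]byte, len(header)+len(body))
	n := copy(out, header)
	copy(out[n:], body)
	return out
}

// render formats bytes as space-separated hex for display.
func render(b []byte) string {
	return fmt.Sprintf("% x", b)
}

func main() {
	fmt.Println(render(concat([]byte{0x01, 0x02}, []byte{0xAA})))
	fmt.Println(render(concat([]byte{0x01, 0x02}, nil)))
}
```

Sizing the slice up front means a single allocation regardless of body size, which is presumably the optimization the commit message refers to.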
diff --git a/remote/codec.go b/remote/codec.go
index b923f01..ca50fa6 100644
--- a/remote/codec.go
+++ b/remote/codec.go
@@ -20,51 +20,48 @@ import (
 	"bytes"
 	"encoding/binary"
 	"encoding/json"
+	"fmt"
+	"sync/atomic"
 )
 
 var opaque int32
 
 const (
 	// 0, REQUEST_COMMAND
-	RpcType = 0
+	RPCType = 0
 
 	// 1, RPC
-	RpcOneWay = 1
+	RPCOneWay = 1
 
+	// ResponseType for response
 	ResponseType = 1
 
-	RemotingCommandFlag    = 0
-	languageFlag           = "golang"
-	languageCode           = byte(9)
-	RemotingCommandVersion = 137
+	_Flag         = 0
+	_LanguageFlag = "golang"
+	_LanguageCode = byte(9)
+	_Version      = 137
 )
 
 type RemotingCommand struct {
 	Code      int16             `json:"code"`
-	Language  string            `json:"language"`
+	Language  byte              `json:"language"`
 	Version   int16             `json:"version"`
 	Opaque    int32             `json:"opaque"`
-	Flag      int               `json:"flag"`
+	Flag      int32             `json:"flag"`
 	Remark    string            `json:"remark"`
 	ExtFields map[string]string `json:"extFields"`
-	Body      []byte            `json:"body,omitempty"`
+	Body      []byte            `json:"-"`
 }
 
-type CustomHeader interface {
-	Header() map[string]string
-}
-
-func NewRemotingCommand(code int16, header CustomHeader) *RemotingCommand {
-	rc := &RemotingCommand{
+func NewRemotingCommand(code int16, properties map[string]string, body []byte) *RemotingCommand {
+	return &RemotingCommand{
 		Code:      code,
-		Language:  languageFlag,
-		Version:   RemotingCommandVersion,
-	}
-
-	if header != nil {
-		rc.ExtFields = header.Header()
+		Language:  _LanguageCode,
+		Version:   _Version,
+		Opaque:    atomic.AddInt32(&opaque, 1),
+		ExtFields: properties,
+		Body:      body,
 	}
-	return rc
 }
 
 func (command *RemotingCommand) isResponseType() bool {
@@ -96,9 +93,9 @@ func encode(command *RemotingCommand) ([]byte, error) {
 	)
 
 	switch codecType {
-	case Json:
+	case JsonCodecs:
 		header, err = jsonSerializer.encodeHeader(command)
-	case RocketMQ:
+	case RocketMQCodecs:
 		header, err = rocketMqSerializer.encodeHeader(command)
 	}
 
@@ -108,6 +105,7 @@ func encode(command *RemotingCommand) ([]byte, error) {
 
 	frameSize := 8 + len(header) + len(command.Body)
 	buf := bytes.NewBuffer(make([]byte, frameSize))
+	buf.Reset()
 
 	err = binary.Write(buf, binary.BigEndian, int32(frameSize))
 	if err != nil {
@@ -119,7 +117,12 @@ func encode(command *RemotingCommand) ([]byte, error) {
 		return nil, err
 	}
 
-	err = binary.Write(buf, binary.BigEndian, int32(len(command.Body)))
+	err = binary.Write(buf, binary.BigEndian, header)
+	if err != nil {
+		return nil, err
+	}
+
+	err = binary.Write(buf, binary.BigEndian, command.Body)
 	if err != nil {
 		return nil, err
 	}
@@ -129,39 +132,45 @@ func encode(command *RemotingCommand) ([]byte, error) {
 
 func decode(data []byte) (*RemotingCommand, error) {
 	buf := bytes.NewBuffer(data)
-
-	var oriHeaderLen, headerLength int32
-	err := binary.Read(buf, binary.BigEndian, &oriHeaderLen)
+	var length int32
+	err := binary.Read(buf, binary.BigEndian, &length)
 	if err != nil {
 		return nil, err
 	}
-	headerLength = oriHeaderLen & 0xFFFFFF
+	var oriHeaderLen int32
+	err = binary.Read(buf, binary.BigEndian, &oriHeaderLen)
+	if err != nil {
+		return nil, err
+	}
+	headerLength := oriHeaderLen & 0xFFFFFF
 
 	headerData := make([]byte, headerLength)
-	err = binary.Read(buf, binary.BigEndian, &headerLength)
+	err = binary.Read(buf, binary.BigEndian, &headerData)
 	if err != nil {
 		return nil, err
 	}
 
 	var command *RemotingCommand
 
-	switch byte((oriHeaderLen >> 24) & 0xFF) {
-	case Json:
+	switch codeType := byte((oriHeaderLen >> 24) & 0xFF); codeType {
+	case JsonCodecs:
 		command, err = jsonSerializer.decodeHeader(headerData)
-	case RocketMQ:
+	case RocketMQCodecs:
 		command, err = rocketMqSerializer.decodeHeader(headerData)
+	default:
+		err = fmt.Errorf("unknown codec type: %d", codeType)
 	}
 	if err != nil {
 		return nil, err
 	}
 
-	bodyLength := int32(len(data)) - 4 - headerLength
-	command.Body = make([]byte, bodyLength)
-	err = binary.Read(buf, binary.BigEndian, &command.Body)
+	bodyLength := length - 8 - headerLength
+	bodyData := make([]byte, bodyLength)
+	err = binary.Read(buf, binary.BigEndian, &bodyData)
 	if err != nil {
 		return nil, err
 	}
-
+	command.Body = bodyData
 	return command, nil
 }
 
@@ -175,8 +184,8 @@ func markProtocolType(source int32) []byte {
 }
 
 const (
-	Json     = byte(0)
-	RocketMQ = byte(1)
+	JsonCodecs     = byte(0)
+	RocketMQCodecs = byte(1)
 )
 
 type serializer interface {
@@ -198,6 +207,7 @@ func (c *jsonCodec) encodeHeader(command *RemotingCommand) ([]byte, error) {
 func (c *jsonCodec) decodeHeader(header []byte) (*RemotingCommand, error) {
 	command := &RemotingCommand{}
 	command.ExtFields = make(map[string]string)
+	command.Body = make([]byte, 0)
 	err := json.Unmarshal(header, command)
 	if err != nil {
 		return nil, err
@@ -205,8 +215,8 @@ func (c *jsonCodec) decodeHeader(header []byte) (*RemotingCommand, error) {
 	return command, nil
 }
 
-// rmqCodec implementation of RocketMQ private protocol, please refer to remoting/protocol/RocketMQSerializable
-// RocketMQ Private Protocol Header format:
+// rmqCodec implementation of RocketMQCodecs private protocol, please refer to remoting/protocol/RocketMQSerializable
+// RocketMQCodecs Private Protocol Header format:
 //
 // v_flag: version flag
 // r_len: length of remark body
@@ -234,21 +244,22 @@ func (c *rmqCodec) encodeHeader(command *RemotingCommand) ([]byte, error) {
 	}
 
 	buf := bytes.NewBuffer(make([]byte, headerFixedLength+len(command.Remark)+len(extBytes)))
+	buf.Reset()
 
 	// request code, length is 2 bytes
-	err = binary.Write(buf, binary.BigEndian, command.Code)
+	err = binary.Write(buf, binary.BigEndian, int16(command.Code))
 	if err != nil {
 		return nil, err
 	}
 
 	// language flag, length is 1 byte
-	err = binary.Write(buf, binary.BigEndian, languageCode)
+	err = binary.Write(buf, binary.BigEndian, _LanguageCode)
 	if err != nil {
 		return nil, err
 	}
 
 	// version flag, length is 2 bytes
-	err = binary.Write(buf, binary.BigEndian, command.Version)
+	err = binary.Write(buf, binary.BigEndian, int16(command.Version))
 	if err != nil {
 		return nil, err
 	}
@@ -273,7 +284,7 @@ func (c *rmqCodec) encodeHeader(command *RemotingCommand) ([]byte, error) {
 
 	// write remark, len(command.Remark) bytes
 	if len(command.Remark) > 0 {
-		err = binary.Write(buf, binary.BigEndian, command.Remark)
+		err = binary.Write(buf, binary.BigEndian, []byte(command.Remark))
 		if err != nil {
 			return nil, err
 		}
@@ -326,13 +337,12 @@ func (c *rmqCodec) decodeHeader(data []byte) (*RemotingCommand, error) {
 	var err error
 	command := &RemotingCommand{}
 	buf := bytes.NewBuffer(data)
-	// int code(~32767)
-	err = binary.Read(buf, binary.BigEndian, &command.Code)
+	var code int16
+	err = binary.Read(buf, binary.BigEndian, &code)
 	if err != nil {
 		return nil, err
 	}
-
-	// LanguageCode language
+	command.Code = code
 
 	var (
 		languageCode byte
@@ -343,13 +353,14 @@ func (c *rmqCodec) decodeHeader(data []byte) (*RemotingCommand, error) {
 	if err != nil {
 		return nil, err
 	}
-	//command.Language = languageFlag
+	command.Language = languageCode
 
-	// int version(~32767)
-	err = binary.Read(buf, binary.BigEndian, &command.Version)
+	var version int16
+	err = binary.Read(buf, binary.BigEndian, &version)
 	if err != nil {
 		return nil, err
 	}
+	command.Version = version
 
 	// int opaque
 	err = binary.Read(buf, binary.BigEndian, &command.Opaque)
diff --git a/remote/codec_test.go b/remote/codec_test.go
index aaf065b..62dd528 100644
--- a/remote/codec_test.go
+++ b/remote/codec_test.go
@@ -17,11 +17,299 @@
 package remote
 
 import (
+	"math/rand"
+	"reflect"
 	"testing"
 )
 
-func TestEncode(t *testing.T) {
+func randomBytes(length int) []byte {
+	bs := make([]byte, length)
+	if _, err := rand.Read(bs); err != nil {
+		panic("read random bytes fail")
+	}
+	return bs
 }
 
-func TestDecode(t *testing.T) {
+func randomString(length int) string {
+	bs := make([]byte, length)
+	for i := 0; i < len(bs); i++ {
+		bs[i] = byte(97 + rand.Intn(26))
+	}
+	return string(bs)
+}
+
+func randomNewRemotingCommand() *RemotingCommand {
+	properties := make(map[string]string)
+	for i := 0; i < 10; i++ {
+		properties[randomString(rand.Intn(20))] = randomString(rand.Intn(20))
+	}
+	body := randomBytes(rand.Intn(100))
+	return NewRemotingCommand(int16(rand.Intn(1000)), properties, body)
+}
+
+func Test_encode(t *testing.T) {
+	for i := 0; i < 1000; i++ {
+		rc := randomNewRemotingCommand()
+		if _, err := encode(rc); err != nil {
+			t.Fatalf("encode RemotingCommand to bytes fail: %v", err)
+		}
+	}
+}
+
+func Benchmark_encode(b *testing.B) {
+	rc := randomNewRemotingCommand()
+	b.ResetTimer()
+
+	for i := 0; i < b.N; i++ {
+		if _, err := encode(rc); err != nil {
+			b.Fatalf("encode RemotingCommand to bytes fail: %v", err)
+		}
+	}
+}
+
+func Test_decode(t *testing.T) {
+	for i := 0; i < 1000; i++ {
+		rc := randomNewRemotingCommand()
+
+		bs, err := encode(rc)
+		if err != nil {
+			t.Fatalf("encode RemotingCommand to bytes fail: %v", err)
+		}
+
+		decodedRc, err := decode(bs)
+		if err != nil {
+			t.Fatalf("decode bytes to RemotingCommand fail: %v", err)
+		}
+
+		if !reflect.DeepEqual(*rc, *decodedRc) {
+			t.Fatal("decoded RemotingCommand not equal to the original one")
+		}
+	}
+}
+
+func Benchmark_decode(b *testing.B) {
+	rc := randomNewRemotingCommand()
+	bs, err := encode(rc)
+	if err != nil {
+		b.Fatalf("encode RemotingCommand to bytes fail: %v", err)
+	}
+	b.ResetTimer()
+
+	for i := 0; i < b.N; i++ {
+		if _, err := decode(bs); err != nil {
+			b.Fatalf("decode bytes to RemotingCommand fail: %v", err)
+		}
+	}
+}
+
+func Test_jsonCodec_encodeHeader(t *testing.T) {
+	for i := 0; i < 1000; i++ {
+		rc := randomNewRemotingCommand()
+
+		if _, err := jsonSerializer.encodeHeader(rc); err != nil {
+			t.Fatalf("encode header with jsonCodec fail: %v", err)
+		}
+	}
+}
+
+func Benchmark_jsonCodec_encodeHeader(b *testing.B) {
+	rc := randomNewRemotingCommand()
+	b.ResetTimer()
+
+	for i := 0; i < b.N; i++ {
+		if _, err := jsonSerializer.encodeHeader(rc); err != nil {
+			b.Fatalf("encode header with jsonCodec fail: %v", err)
+		}
+	}
+}
+
+func Test_jsonCodec_decodeHeader(t *testing.T) {
+	for i := 0; i < 1; i++ {
+		rc := randomNewRemotingCommand()
+
+		headers, err := jsonSerializer.encodeHeader(rc)
+		if err != nil {
+			t.Fatalf("encode header with jsonCodec fail: %v", err)
+		}
+
+		decodedRc, err := jsonSerializer.decodeHeader(headers)
+		if err != nil {
+			t.Fatalf("decode header with jsonCodec fail: %v", err)
+		}
+
+		if rc.Code != decodedRc.Code ||
+			rc.Language != decodedRc.Language ||
+			rc.Version != decodedRc.Version ||
+			rc.Opaque != decodedRc.Opaque ||
+			rc.Flag != decodedRc.Flag ||
+			rc.Remark != decodedRc.Remark ||
+			!reflect.DeepEqual(rc.ExtFields, decodedRc.ExtFields) {
+			t.Fatal("decoded RemotingCommand not equal to the original one")
+		}
+	}
+}
+
+func Benchmark_jsonCodec_decodeHeader(b *testing.B) {
+	rc := randomNewRemotingCommand()
+	headers, err := jsonSerializer.encodeHeader(rc)
+	if err != nil {
+		b.Fatalf("encode header with jsonCodec fail: %v", err)
+	}
+	b.ResetTimer()
+
+	for i := 0; i < b.N; i++ {
+		if _, err := jsonSerializer.decodeHeader(headers); err != nil {
+			b.Fatalf("decode header with jsonCodec fail: %v", err)
+		}
+	}
+}
+
+func Test_rmqCodec_encodeHeader(t *testing.T) {
+	for i := 0; i < 1000; i++ {
+		rc := randomNewRemotingCommand()
+
+		if _, err := rocketMqSerializer.encodeHeader(rc); err != nil {
+			t.Fatalf("encode header with rmqCodec fail: %v", err)
+		}
+	}
+}
+
+func Benchmark_rmqCodec_encodeHeader(b *testing.B) {
+	rc := randomNewRemotingCommand()
+	b.ResetTimer()
+
+	for i := 0; i < b.N; i++ {
+		if _, err := rocketMqSerializer.encodeHeader(rc); err != nil {
+			b.Fatalf("encode header with rmqCodec fail: %v", err)
+		}
+	}
+}
+
+func Test_rmqCodec_decodeHeader(t *testing.T) {
+	for i := 0; i < 1; i++ {
+		rc := randomNewRemotingCommand()
+
+		headers, err := rocketMqSerializer.encodeHeader(rc)
+		if err != nil {
+			t.Fatalf("encode header with rmqCodec fail: %v", err)
+		}
+
+		decodedRc, err := rocketMqSerializer.decodeHeader(headers)
+		if err != nil {
+			t.Fatalf("decode header with rmqCodec fail: %v", err)
+		}
+
+		if rc.Code != decodedRc.Code ||
+			rc.Language != decodedRc.Language ||
+			rc.Version != decodedRc.Version ||
+			rc.Opaque != decodedRc.Opaque ||
+			rc.Flag != decodedRc.Flag ||
+			rc.Remark != decodedRc.Remark ||
+			!reflect.DeepEqual(rc.ExtFields, decodedRc.ExtFields) {
+			t.Fatal("decoded RemotingCommand not equal to the original one")
+		}
+	}
+}
+
+func Benchmark_rmqCodec_decodeHeader(b *testing.B) {
+	rc := randomNewRemotingCommand()
+	headers, err := rocketMqSerializer.encodeHeader(rc)
+	if err != nil {
+		b.Fatalf("encode header with rmqCodec fail: %v", err)
+	}
+	b.ResetTimer()
+
+	for i := 0; i < b.N; i++ {
+		if _, err := rocketMqSerializer.decodeHeader(headers); err != nil {
+			b.Fatalf("decode header with rmqCodec fail: %v", err)
+		}
+	}
+}
+
+func TestCommandJsonEncodeDecode(t *testing.T) {
+	cmd := NewRemotingCommand(192, map[string]string{"brokers": "127.0.0.1"}, []byte("Hello RocketMQCodecs"))
+	codecType = JsonCodecs
+	cmdData, err := encode(cmd)
+	if err != nil {
+		t.Errorf("failed to encode remotingCommand in JSON, %s", err)
+	} else {
+		if len(cmdData) == 0 {
+			t.Errorf("failed to encode remotingCommand, result is empty.")
+		}
+	}
+	newCmd, err := decode(cmdData)
+	if err != nil {
+		t.Fatalf("failed to decode remotingCommand in JSON. %s", err)
+	}
+	if newCmd.Code != cmd.Code {
+		t.Errorf("wrong command code. want=%d, got=%d", cmd.Code, newCmd.Code)
+	}
+	if newCmd.Language != cmd.Language {
+		t.Errorf("wrong command language. want=%d, got=%d", cmd.Language, newCmd.Language)
+	}
+	if newCmd.Version != cmd.Version {
+		t.Errorf("wrong command version. want=%d, got=%d", cmd.Version, newCmd.Version)
+	}
+	if newCmd.Opaque != cmd.Opaque {
+		t.Errorf("wrong command opaque. want=%d, got=%d", cmd.Opaque, newCmd.Opaque)
+	}
+	if newCmd.Flag != cmd.Flag {
+		t.Errorf("wrong command flag. want=%d, got=%d", cmd.Flag, newCmd.Flag)
+	}
+	if newCmd.Remark != cmd.Remark {
+		t.Errorf("wrong command remark. want=%s, got=%s", cmd.Remark, newCmd.Remark)
+	}
+	for k, v := range cmd.ExtFields {
+		if vv, ok := newCmd.ExtFields[k]; !ok {
+			t.Errorf("key: %s not exists in newCommand.", k)
+		} else {
+			if v != vv {
+				t.Errorf("wrong value. want=%s, got=%s", v, vv)
+			}
+		}
+	}
+}
+
+func TestCommandRocketMQEncodeDecode(t *testing.T) {
+	cmd := NewRemotingCommand(192, map[string]string{"brokers": "127.0.0.1"}, []byte("Hello RocketMQCodecs"))
+	codecType = RocketMQCodecs
+	cmdData, err := encode(cmd)
+	if err != nil {
+		t.Errorf("failed to encode remotingCommand with the RocketMQ codec, %s", err)
+	} else {
+		if len(cmdData) == 0 {
+			t.Errorf("failed to encode remotingCommand, result is empty.")
+		}
+	}
+	newCmd, err := decode(cmdData)
+	if err != nil {
+		t.Fatalf("failed to decode remotingCommand with the RocketMQ codec. %s", err)
+	}
+	if newCmd.Code != cmd.Code {
+		t.Errorf("wrong command code. want=%d, got=%d", cmd.Code, newCmd.Code)
+	}
+	if newCmd.Language != cmd.Language {
+		t.Errorf("wrong command language. want=%d, got=%d", cmd.Language, newCmd.Language)
+	}
+	if newCmd.Version != cmd.Version {
+		t.Errorf("wrong command version. want=%d, got=%d", cmd.Version, newCmd.Version)
+	}
+	if newCmd.Opaque != cmd.Opaque {
+		t.Errorf("wrong command opaque. want=%d, got=%d", cmd.Opaque, newCmd.Opaque)
+	}
+	if newCmd.Flag != cmd.Flag {
+		t.Errorf("wrong command flag. want=%d, got=%d", cmd.Flag, newCmd.Flag)
+	}
+	if newCmd.Remark != cmd.Remark {
+		t.Errorf("wrong command remark. want=%s, got=%s", cmd.Remark, newCmd.Remark)
+	}
+	for k, v := range cmd.ExtFields {
+		if vv, ok := newCmd.ExtFields[k]; !ok {
+			t.Errorf("key: %s not exists in newCommand.", k)
+		} else {
+			if v != vv {
+				t.Errorf("wrong value. want=%s, got=%s", v, vv)
+			}
+		}
+	}
 }
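
Reviewer note on the round-trip tests above: they exercise the frame layout `frame_size | header_length | header | body` end to end. The following is a minimal standalone sketch of that layout which treats the header as opaque bytes. It follows this commit's `encode` in counting all bytes, including both 4-byte prefixes, in `frame_size`. `encodeFrame` and `decodeFrame` are hypothetical names, and the length checks in `decodeFrame` are an addition of this sketch rather than something the commit's `decode` performs.

```go
package main

import (
	"encoding/binary"
	"fmt"
)

// encodeFrame lays out frame_size | header_length | header | body.
// frame_size counts the whole frame, including both 4-byte prefixes.
func encodeFrame(codec byte, header, body []byte) []byte {
	frameSize := 8 + len(header) + len(body)
	out := make([]byte, 8, frameSize)
	binary.BigEndian.PutUint32(out[0:4], uint32(frameSize))
	binary.BigEndian.PutUint32(out[4:8], uint32(codec)<<24|uint32(len(header)))
	out = append(out, header...)
	return append(out, body...)
}

// decodeFrame splits a complete frame back into codec, header, and body.
// The returned slices alias data; copy them if data will be reused.
func decodeFrame(data []byte) (codec byte, header, body []byte, err error) {
	if len(data) < 8 {
		return 0, nil, nil, fmt.Errorf("frame too short: %d bytes", len(data))
	}
	frameSize := int(binary.BigEndian.Uint32(data[0:4]))
	packed := binary.BigEndian.Uint32(data[4:8])
	headerLen := int(packed & 0xFFFFFF)
	if frameSize != len(data) || 8+headerLen > frameSize {
		return 0, nil, nil, fmt.Errorf("inconsistent lengths: frame=%d header=%d data=%d",
			frameSize, headerLen, len(data))
	}
	return byte(packed >> 24), data[8 : 8+headerLen], data[8+headerLen:], nil
}

func main() {
	frame := encodeFrame(1, []byte("hdr"), []byte("body"))
	codec, header, body, err := decodeFrame(frame)
	if err != nil {
		panic(err)
	}
	fmt.Printf("codec=%d header=%q body=%q\n", codec, header, body)
}
```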