Posted to commits@rocketmq.apache.org by di...@apache.org on 2019/02/26 03:49:01 UTC
[rocketmq-client-go] branch native updated: remote codec (#33)
This is an automated email from the ASF dual-hosted git repository.
dinglei pushed a commit to branch native
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-go.git
The following commit(s) were added to refs/heads/native by this push:
new 4a93af0 remote codec (#33)
4a93af0 is described below
commit 4a93af0a18903a48657f95b294f402e647588eb8
Author: 高峰 <ga...@foxmail.com>
AuthorDate: Tue Feb 26 11:48:57 2019 +0800
remote codec (#33)
* protocol doc fix typo and markdown linter error
* protocol doc update and bug fix
* bug fix for optimize
* refactor remote/codecs.go and remote/codecs_test.go
* using fix-size type
* add unit tests
* update protocol documents and fix typo
* add unit tests and benchmarks with codec
* fix RemotingCommand's decode benchmark test
---
docs/zh/rocketmq-protocol_zh.md | 129 +++++++++---------
remote/client.go | 14 +-
remote/codec.go | 115 +++++++++-------
remote/codec_test.go | 292 +++++++++++++++++++++++++++++++++++++++-
4 files changed, 428 insertions(+), 122 deletions(-)
diff --git a/docs/zh/rocketmq-protocol_zh.md b/docs/zh/rocketmq-protocol_zh.md
index 70cd7a5..818d2e1 100644
--- a/docs/zh/rocketmq-protocol_zh.md
+++ b/docs/zh/rocketmq-protocol_zh.md
@@ -1,34 +1,37 @@
# RocketMQ 通信协议
-In RocketMQ, `RemotingCommand` is the basic unit of communication: every Request/Response is ultimately wrapped in a `RemotingCommand`. A serialized `RemotingCommand`
-has the following format:
-```go
+In RocketMQ, `RemotingCommand` is the basic unit of communication: every Request/Response is ultimately wrapped in a `RemotingCommand`. A serialized `RemotingCommand` has the following format:
+
+```
++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
+ frame_size | header_length | header_body | body +
++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
+ 4bytes | 4bytes | (21 + r_len + e_len) bytes | remain bytes +
++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
```
-|item|type|description|
-|:-:|:-:|:-:|
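-|frame_size|`int32`|size of one `RemotingCommand` packet|
-|header_length|`in32`|the high 8 bits give the serialization type; the remaining bits give the actual header length|
-|header_body|`[]byte`|header playload; its length depends on the attached `remark` and `properties`|
-|body|`[]byte`|playload of the specific Request/Response|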
+
+| item | type | description |
+| :-: | :-: | :-: |
+| frame_size | `int32` | size of one `RemotingCommand` packet |
+| header_length | `int32` | the high 8 bits give the serialization type; the remaining bits give the actual header length |
+| header_body | `[]byte` | header payload; its length depends on the attached `remark` and `properties` |
+| body | `[]byte` | payload of the specific Request/Response |
## Header
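-RocketMQ serializes the Header in one of two ways: JSON or RocketMQ's private format. The JSON format is not covered here; see the Java `RemotingCommand` class for details.
-This section focuses on RocketMQ's private serialization format.
-When serializing, the serialization type must be recorded in the packet, that is, encoded into `header_length`
+RocketMQ serializes the Header in one of two ways: JSON or RocketMQ's private format. The JSON format is not covered here; see the Java `RemotingCommand` class for details.
+
+This section focuses on RocketMQ's private serialization format.
+
+When serializing, the serialization type must be recorded in the packet, that is, encoded into `header_length`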
```go
// encoding algorithm
-// encoded header_length
+// encoded header_length
var header_length int32
-// actual header length
+// actual header length
var headerDataLen int32
// serialization type
@@ -41,7 +44,6 @@ result[2]|byte((headerDataLen >> 8) & 0xFF)
result[3]|byte(headerDataLen & 0xFF)
binary.Read(result, binary.BigEndian, &header_length)
-
// decoding algorithm
headerDataLen := header_length & 0xFFFFFF
SerializedType := byte((header_length >> 24) & 0xFF)
@@ -57,56 +59,59 @@ SerializedType := byte((header_length >> 24) & 0xFF)
+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
```
-|item|type|description|
-|:-:|:-:|:-:|
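-|request_code|`short`|which Request or ResponseCode this is; the category is determined by request_flag|
-|l_flag|`byte`|language flag, identifying the programming language of the Request sender|
-|v_flag|`int16`|version flag|
-|request_flag|`int32`|Header flag, marking the type and invocation mode of this `RemotingCommand`|
-|opaque|`int32`|RequestID of the Request/Response; the Response returned by the Broker is matched one-to-one with the Request cached by the Client through this value|
-|r_len|`int32`| length of remark; remark carries extra information about the Request/Response and is typically used in a Response to state the specific error reason|
-|r_body|`[]byte`| playload of remark |
-|e_len|`int32`| length of extended fields, i.e. properties; non-standard fields are stored here and are widely used by RocketMQ features|
-|e_body|`int32`| playload of extended fields |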
+| item | type | description |
+| :-: | :-: | :-: |
+| request_code | `int16` | which Request or ResponseCode this is; the category is determined by request_flag |
+| l_flag | `byte` | language flag, identifying the programming language of the Request sender |
+| v_flag | `int16` | version flag |
+| request_flag |`int32`| Header flag, marking the type and invocation mode of this `RemotingCommand` |
+| opaque | `int32` | RequestID of the Request/Response; the Response returned by the Broker is matched one-to-one with the Request cached by the Client through this value |
+| r_len | `int32` | length of remark; remark carries extra information about the Request/Response and is typically used in a Response to state the specific error reason |
+| r_body | `[]byte` | payload of remark |
+| e_len | `int32` | length of extended fields, i.e. properties; non-standard fields are stored here and are widely used by RocketMQ features |
+| e_body | `[]byte` | payload of extended fields |
## Body
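-`body` holds the data of the specific Request/Response. RocketMQ has many kinds of Request/Response, and each class has its own serialization and deserialization; there are too many
-to cover here. See how `CommandCustomHeader` is used in the Java code for details. Below are some of the Requests and Responses used by the Client
+
+`body` holds the data of the specific Request/Response. RocketMQ has many kinds of Request/Response, and each class has its own serialization and deserialization; there are too many
+to cover here. See how `CommandCustomHeader` is used in the Java code for details. Below are some of the Requests and Responses used by the Client.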
### RequestCode
-|item|type|description|
-|:-:|:-:|:-:|
-|SEND_MESSAGE|10| send a message to the broker|
-|PULL_MESSAGE|11| pull messages from the broker; the client's push mode is also implemented with long polling over pull|
-|TODO...|||
+
+| item | type | description |
+| :-: | :-: | :-: |
+| SEND_MESSAGE | 10 | send a message to the broker |
+| PULL_MESSAGE | 11 | pull messages from the broker; the client's push mode is also implemented with long polling over pull |
+| TODO... | | |
### ResponseCode
-|item|type|description|
-|:-:|:-:|:-:|
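-|FLUSH_DISK_TIMEOUT|10|broker storage layer timed out flushing to disk|
-|SLAVE_NOT_AVAILABLE|11|slave node cannot serve requests|
-|FLUSH_SLAVE_TIMEOUT|12|data sync to the slave timed out|
-|MESSAGE_ILLEGAL|13|message format is invalid|
-|SERVICE_NOT_AVAILABLE|14|broker is temporarily unavailable|
-|VERSION_NOT_SUPPORTED|15|unsupported request; no usage seen so far|
-|NO_PERMISSION|16|no access permission for the broker, topic, or subscription|
-|TOPIC_EXIST_ALREADY|18|topic already exists; no usage seen so far|
-|PULL_NOT_FOUND|19|no message pulled, mostly because of a wrong offset|
-|PULL_RETRY_IMMEDIATELY|20|the client is advised to pull again immediately|
-|PULL_OFFSET_MOVED|21|offset is too small or too large|
-|QUERY_NOT_FOUND|22|admin-plane Response, TODO|
-|SUBSCRIPTION_PARSE_FAILED|23|failed to parse subscription data|
-|SUBSCRIPTION_NOT_EXIST|24|subscription does not exist|
-|SUBSCRIPTION_NOT_LATEST|25|subscription data version does not match the request data version|
-|SUBSCRIPTION_GROUP_NOT_EXIST|26|subscription group does not exist|
-|FILTER_DATA_NOT_EXIST|27|filter data does not exist|
-|FILTER_DATA_NOT_LATEST|28|filter data version does not match the request data version|
-|TRANSACTION_SHOULD_COMMIT|200|transaction Response, TODO|
-|TRANSACTION_SHOULD_ROLLBACK|201|transaction Response, TODO|
-|TRANSACTION_STATE_UNKNOW|202|transaction Response, TODO||
-|TRANSACTION_STATE_GROUP_WRONG|203|transaction Response, TODO|
-|NO_BUYER_ID|204|purpose unknown; not seen in use on the broker side|
-|NOT_IN_CURRENT_UNIT|205|purpose unknown; not seen in use on the broker side|
-|CONSUMER_NOT_ONLINE|206|consumer is offline; control-plane response|
-|CONSUME_MSG_TIMEOUT|207|client request timed out waiting for the broker's response|
-|NO_MESSAGE|208|control-plane response, set by the client itself; exact purpose unclear|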
\ No newline at end of file
+
+| item | type | description |
+| :-: | :-: | :-: |
+| FLUSH_DISK_TIMEOUT | 10 | broker storage layer timed out flushing to disk |
+| SLAVE_NOT_AVAILABLE | 11 | slave node cannot serve requests |
+| FLUSH_SLAVE_TIMEOUT | 12 | data sync to the slave timed out |
+| MESSAGE_ILLEGAL | 13 | message format is invalid |
+| SERVICE_NOT_AVAILABLE | 14 | broker is temporarily unavailable |
+| VERSION_NOT_SUPPORTED | 15 | unsupported request; no usage seen so far |
+| NO_PERMISSION | 16 | no access permission for the broker, topic, or subscription |
+| TOPIC_EXIST_ALREADY | 18 | topic already exists; no usage seen so far |
+| PULL_NOT_FOUND | 19 | no message pulled, mostly because of a wrong offset |
+| PULL_RETRY_IMMEDIATELY | 20 | the client is advised to pull again immediately |
+| PULL_OFFSET_MOVED | 21 | offset is too small or too large |
+| QUERY_NOT_FOUND | 22 | admin-plane Response, TODO |
+| SUBSCRIPTION_PARSE_FAILED | 23 | failed to parse subscription data |
+| SUBSCRIPTION_NOT_EXIST | 24 | subscription does not exist |
+| SUBSCRIPTION_NOT_LATEST | 25 | subscription data version does not match the request data version |
+| SUBSCRIPTION_GROUP_NOT_EXIST | 26 | subscription group does not exist |
+| FILTER_DATA_NOT_EXIST | 27 | filter data does not exist |
+| FILTER_DATA_NOT_LATEST | 28 | filter data version does not match the request data version |
+| TRANSACTION_SHOULD_COMMIT | 200 | transaction Response, TODO |
+| TRANSACTION_SHOULD_ROLLBACK | 201 | transaction Response, TODO |
+| TRANSACTION_STATE_UNKNOW | 202 | transaction Response, TODO |
+| TRANSACTION_STATE_GROUP_WRONG | 203 | transaction Response, TODO |
+| NO_BUYER_ID | 204 | purpose unknown; not seen in use on the broker side |
+| NOT_IN_CURRENT_UNIT | 205 | purpose unknown; not seen in use on the broker side |
+| CONSUMER_NOT_ONLINE | 206 | consumer is offline; control-plane response |
+| CONSUME_MSG_TIMEOUT | 207 | client request timed out waiting for the broker's response |
+| NO_MESSAGE | 208 | control-plane response, set by the client itself; exact purpose unclear |
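
The `header_length` rule documented above (serialization type in the high 8 bits, real header length in the low 24 bits) can be sketched with plain integer arithmetic. This is a minimal illustration, not the code from this commit: the names `packHeaderLength`, `unpackHeaderLength`, `jsonType`, and `rocketMQType` are hypothetical, and the type values 0 and 1 are taken from the `JsonCodecs` and `RocketMQCodecs` constants in `remote/codec.go`.

```go
package main

// Hypothetical names; the values mirror JsonCodecs and RocketMQCodecs.
const (
	jsonType     = byte(0)
	rocketMQType = byte(1)
)

// packHeaderLength stores the serialization type in the high 8 bits and
// the real header length in the low 24 bits of a single int32.
func packHeaderLength(serializedType byte, headerLen int32) int32 {
	return (int32(serializedType) << 24) | (headerLen & 0xFFFFFF)
}

// unpackHeaderLength reverses packHeaderLength, using the same masks as
// the decoding algorithm in the protocol document.
func unpackHeaderLength(packed int32) (serializedType byte, headerLen int32) {
	return byte((packed >> 24) & 0xFF), packed & 0xFFFFFF
}
```

Calling `unpackHeaderLength(packHeaderLength(rocketMQType, 300))` should give back the type 1 and the length 300. Header lengths above `0xFFFFFF` cannot be represented and are silently truncated by this sketch.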
diff --git a/remote/client.go b/remote/client.go
index 2342905..2c6805c 100644
--- a/remote/client.go
+++ b/remote/client.go
@@ -19,11 +19,12 @@ package remote
import (
"encoding/binary"
"errors"
- "github.com/apache/rocketmq-client-go/utils"
- log "github.com/sirupsen/logrus"
"net"
"sync"
"time"
+
+ "github.com/apache/rocketmq-client-go/utils"
+ log "github.com/sirupsen/logrus"
)
var (
@@ -157,11 +158,12 @@ func (client *defaultClient) invokeOneWay(request *RemotingCommand) error {
}
func (client *defaultClient) doRequest(header, body []byte) error {
- var requestBytes []byte
- requestBytes = append(requestBytes, header...)
- if body != nil && len(body) > 0 {
- requestBytes = append(requestBytes, body...)
+ var requestBytes = make([]byte, len(header)+len(body))
+ copy(requestBytes, header)
+ if len(body) > 0 {
+ copy(requestBytes[len(header):], body)
}
+
_, err := client.conn.Write(requestBytes)
return err
}
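
The `doRequest` change above replaces repeated `append` calls with one exactly sized allocation followed by `copy`. A standalone sketch of that pattern follows; `joinFrame` is a hypothetical helper name used only for illustration and does not exist in the repository.

```go
package main

// joinFrame copies header and body into one buffer that is allocated
// once with the exact final size, so no reallocation happens.
func joinFrame(header, body []byte) []byte {
	out := make([]byte, len(header)+len(body))
	n := copy(out, header) // copy returns the number of bytes written
	copy(out[n:], body)    // a nil or empty body copies nothing
	return out
}
```

Because `copy` is a no-op for an empty source, the explicit `len(body) > 0` check in the diff is optional in this form.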
diff --git a/remote/codec.go b/remote/codec.go
index b923f01..ca50fa6 100644
--- a/remote/codec.go
+++ b/remote/codec.go
@@ -20,51 +20,48 @@ import (
"bytes"
"encoding/binary"
"encoding/json"
+ "fmt"
+ "sync/atomic"
)
var opaque int32
const (
// 0, REQUEST_COMMAND
- RpcType = 0
+ RPCType = 0
// 1, RPC
- RpcOneWay = 1
+ RPCOneWay = 1
+ // ResponseType for response
ResponseType = 1
- RemotingCommandFlag = 0
- languageFlag = "golang"
- languageCode = byte(9)
- RemotingCommandVersion = 137
+ _Flag = 0
+ _LanguageFlag = "golang"
+ _LanguageCode = byte(9)
+ _Version = 137
)
type RemotingCommand struct {
Code int16 `json:"code"`
- Language string `json:"language"`
+ Language byte `json:"language"`
Version int16 `json:"version"`
Opaque int32 `json:"opaque"`
- Flag int `json:"flag"`
+ Flag int32 `json:"flag"`
Remark string `json:"remark"`
ExtFields map[string]string `json:"extFields"`
- Body []byte `json:"body,omitempty"`
+ Body []byte `json:"-"`
}
-type CustomHeader interface {
- Header() map[string]string
-}
-
-func NewRemotingCommand(code int16, header CustomHeader) *RemotingCommand {
- rc := &RemotingCommand{
+func NewRemotingCommand(code int16, properties map[string]string, body []byte) *RemotingCommand {
+ return &RemotingCommand{
Code: code,
- Language: languageFlag,
- Version: RemotingCommandVersion,
- }
-
- if header != nil {
- rc.ExtFields = header.Header()
+ Language: _LanguageCode,
+ Version: _Version,
+ Opaque: atomic.AddInt32(&opaque, 1),
+ ExtFields: properties,
+ Body: body,
}
- return rc
}
func (command *RemotingCommand) isResponseType() bool {
@@ -96,9 +93,9 @@ func encode(command *RemotingCommand) ([]byte, error) {
)
switch codecType {
- case Json:
+ case JsonCodecs:
header, err = jsonSerializer.encodeHeader(command)
- case RocketMQ:
+ case RocketMQCodecs:
header, err = rocketMqSerializer.encodeHeader(command)
}
@@ -108,6 +105,7 @@ func encode(command *RemotingCommand) ([]byte, error) {
frameSize := 8 + len(header) + len(command.Body)
buf := bytes.NewBuffer(make([]byte, frameSize))
+ buf.Reset()
err = binary.Write(buf, binary.BigEndian, int32(frameSize))
if err != nil {
@@ -119,7 +117,12 @@ func encode(command *RemotingCommand) ([]byte, error) {
return nil, err
}
- err = binary.Write(buf, binary.BigEndian, int32(len(command.Body)))
+ err = binary.Write(buf, binary.BigEndian, header)
+ if err != nil {
+ return nil, err
+ }
+
+ err = binary.Write(buf, binary.BigEndian, command.Body)
if err != nil {
return nil, err
}
@@ -129,39 +132,45 @@ func encode(command *RemotingCommand) ([]byte, error) {
func decode(data []byte) (*RemotingCommand, error) {
buf := bytes.NewBuffer(data)
-
- var oriHeaderLen, headerLength int32
- err := binary.Read(buf, binary.BigEndian, &oriHeaderLen)
+ var length int32
+ err := binary.Read(buf, binary.BigEndian, &length)
if err != nil {
return nil, err
}
- headerLength = oriHeaderLen & 0xFFFFFF
+ var oriHeaderLen int32
+ err = binary.Read(buf, binary.BigEndian, &oriHeaderLen)
+ if err != nil {
+ return nil, err
+ }
+ headerLength := oriHeaderLen & 0xFFFFFF
headerData := make([]byte, headerLength)
- err = binary.Read(buf, binary.BigEndian, &headerLength)
+ err = binary.Read(buf, binary.BigEndian, &headerData)
if err != nil {
return nil, err
}
var command *RemotingCommand
- switch byte((oriHeaderLen >> 24) & 0xFF) {
- case Json:
+ switch codeType := byte((oriHeaderLen >> 24) & 0xFF); codeType {
+ case JsonCodecs:
command, err = jsonSerializer.decodeHeader(headerData)
- case RocketMQ:
+ case RocketMQCodecs:
command, err = rocketMqSerializer.decodeHeader(headerData)
+ default:
+ err = fmt.Errorf("unknown codec type: %d", codeType)
}
if err != nil {
return nil, err
}
- bodyLength := int32(len(data)) - 4 - headerLength
- command.Body = make([]byte, bodyLength)
- err = binary.Read(buf, binary.BigEndian, &command.Body)
+ bodyLength := length - 8 - headerLength
+ bodyData := make([]byte, bodyLength)
+ err = binary.Read(buf, binary.BigEndian, &bodyData)
if err != nil {
return nil, err
}
-
+ command.Body = bodyData
return command, nil
}
@@ -175,8 +184,8 @@ func markProtocolType(source int32) []byte {
}
const (
- Json = byte(0)
- RocketMQ = byte(1)
+ JsonCodecs = byte(0)
+ RocketMQCodecs = byte(1)
)
type serializer interface {
@@ -198,6 +207,7 @@ func (c *jsonCodec) encodeHeader(command *RemotingCommand) ([]byte, error) {
func (c *jsonCodec) decodeHeader(header []byte) (*RemotingCommand, error) {
command := &RemotingCommand{}
command.ExtFields = make(map[string]string)
+ command.Body = make([]byte, 0)
err := json.Unmarshal(header, command)
if err != nil {
return nil, err
@@ -205,8 +215,8 @@ func (c *jsonCodec) decodeHeader(header []byte) (*RemotingCommand, error) {
return command, nil
}
-// rmqCodec implementation of RocketMQ private protocol, please refer to remoting/protocol/RocketMQSerializable
-// RocketMQ Private Protocol Header format:
+// rmqCodec implementation of RocketMQCodecs private protocol, please refer to remoting/protocol/RocketMQSerializable
+// RocketMQCodecs Private Protocol Header format:
//
// v_flag: version flag
// r_len: length of remark body
@@ -234,21 +244,22 @@ func (c *rmqCodec) encodeHeader(command *RemotingCommand) ([]byte, error) {
}
buf := bytes.NewBuffer(make([]byte, headerFixedLength+len(command.Remark)+len(extBytes)))
+ buf.Reset()
// request code, length is 2 bytes
- err = binary.Write(buf, binary.BigEndian, command.Code)
+ err = binary.Write(buf, binary.BigEndian, int16(command.Code))
if err != nil {
return nil, err
}
// language flag, length is 1 byte
- err = binary.Write(buf, binary.BigEndian, languageCode)
+ err = binary.Write(buf, binary.BigEndian, _LanguageCode)
if err != nil {
return nil, err
}
// version flag, length is 2 bytes
- err = binary.Write(buf, binary.BigEndian, command.Version)
+ err = binary.Write(buf, binary.BigEndian, int16(command.Version))
if err != nil {
return nil, err
}
@@ -273,7 +284,7 @@ func (c *rmqCodec) encodeHeader(command *RemotingCommand) ([]byte, error) {
// write remark, len(command.Remark) bytes
if len(command.Remark) > 0 {
- err = binary.Write(buf, binary.BigEndian, command.Remark)
+ err = binary.Write(buf, binary.BigEndian, []byte(command.Remark))
if err != nil {
return nil, err
}
@@ -326,13 +337,12 @@ func (c *rmqCodec) decodeHeader(data []byte) (*RemotingCommand, error) {
var err error
command := &RemotingCommand{}
buf := bytes.NewBuffer(data)
- // int code(~32767)
- err = binary.Read(buf, binary.BigEndian, &command.Code)
+ var code int16
+ err = binary.Read(buf, binary.BigEndian, &code)
if err != nil {
return nil, err
}
-
- // LanguageCode language
+ command.Code = code
var (
languageCode byte
@@ -343,13 +353,14 @@ func (c *rmqCodec) decodeHeader(data []byte) (*RemotingCommand, error) {
if err != nil {
return nil, err
}
- //command.Language = languageFlag
+ command.Language = languageCode
- // int version(~32767)
- err = binary.Read(buf, binary.BigEndian, &command.Version)
+ var version int16
+ err = binary.Read(buf, binary.BigEndian, &version)
if err != nil {
return nil, err
}
+ command.Version = version
// int opaque
err = binary.Read(buf, binary.BigEndian, &command.Opaque)
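
The frame layout that `encode` and `decode` handle in this diff (`frame_size | header_length | header | body`) can be sketched without `bytes.Buffer`. This is an illustrative sketch under stated assumptions, not the committed implementation: `encodeFrame` and `decodeFrame` are hypothetical names, the header is treated as opaque bytes, and `frame_size` follows this commit's convention of counting both 4-byte length fields (`8 + len(header) + len(body)`).

```go
package main

import (
	"encoding/binary"
	"errors"
)

// encodeFrame lays out frame_size | header_length | header | body in
// big-endian order. frame_size includes the two 4-byte length fields,
// matching the arithmetic used by encode in this commit.
func encodeFrame(serializedType byte, header, body []byte) []byte {
	frameSize := 8 + len(header) + len(body)
	out := make([]byte, frameSize)
	binary.BigEndian.PutUint32(out[0:4], uint32(frameSize))
	packed := (uint32(serializedType) << 24) | (uint32(len(header)) & 0xFFFFFF)
	binary.BigEndian.PutUint32(out[4:8], packed)
	copy(out[8:], header)
	copy(out[8+len(header):], body)
	return out
}

// decodeFrame splits a frame back into its parts and rejects frames
// whose length fields are inconsistent with the data provided.
func decodeFrame(data []byte) (serializedType byte, header, body []byte, err error) {
	if len(data) < 8 {
		return 0, nil, nil, errors.New("frame shorter than 8 bytes")
	}
	frameSize := int(binary.BigEndian.Uint32(data[0:4]))
	packed := binary.BigEndian.Uint32(data[4:8])
	headerLen := int(packed & 0xFFFFFF)
	if frameSize < 8+headerLen || frameSize > len(data) {
		return 0, nil, nil, errors.New("inconsistent frame lengths")
	}
	return byte(packed >> 24), data[8 : 8+headerLen], data[8+headerLen : frameSize], nil
}
```

Unlike the `decode` function in the diff, this sketch validates the lengths before slicing, which avoids allocating a buffer from an untrusted length value.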
diff --git a/remote/codec_test.go b/remote/codec_test.go
index aaf065b..62dd528 100644
--- a/remote/codec_test.go
+++ b/remote/codec_test.go
@@ -17,11 +17,299 @@
package remote
import (
+ "math/rand"
+ "reflect"
"testing"
)
-func TestEncode(t *testing.T) {
+func randomBytes(length int) []byte {
+ bs := make([]byte, length)
+ if _, err := rand.Read(bs); err != nil {
+ panic("read random bytes fail")
+ }
+ return bs
}
-func TestDecode(t *testing.T) {
+func randomString(length int) string {
+ bs := make([]byte, length)
+ for i := 0; i < len(bs); i++ {
+ bs[i] = byte(97 + rand.Intn(26))
+ }
+ return string(bs)
+}
+
+func randomNewRemotingCommand() *RemotingCommand {
+ properties := make(map[string]string)
+ for i := 0; i < 10; i++ {
+ properties[randomString(rand.Intn(20))] = randomString(rand.Intn(20))
+ }
+ body := randomBytes(rand.Intn(100))
+ return NewRemotingCommand(int16(rand.Intn(1000)), properties, body)
+}
+
+func Test_encode(t *testing.T) {
+ for i := 0; i < 1000; i++ {
+ rc := randomNewRemotingCommand()
+ if _, err := encode(rc); err != nil {
+ t.Fatalf("encode RemotingCommand to bytes fail: %v", err)
+ }
+ }
+}
+
+func Benchmark_encode(b *testing.B) {
+ rc := randomNewRemotingCommand()
+ b.ResetTimer()
+
+ for i := 0; i < b.N; i++ {
+ if _, err := encode(rc); err != nil {
+ b.Fatalf("encode RemotingCommand to bytes fail: %v", err)
+ }
+ }
+}
+
+func Test_decode(t *testing.T) {
+ for i := 0; i < 1000; i++ {
+ rc := randomNewRemotingCommand()
+
+ bs, err := encode(rc)
+ if err != nil {
+ t.Fatalf("encode RemotingCommand to bytes fail: %v", err)
+ }
+
+ decodedRc, err := decode(bs)
+ if err != nil {
+ t.Fatalf("decode bytes to RemotingCommand fail: %v", err)
+ }
+
+ if !reflect.DeepEqual(*rc, *decodedRc) {
+ t.Fatal("decoded RemotingCommand not equal to the original one")
+ }
+ }
+}
+
+func Benchmark_decode(b *testing.B) {
+ rc := randomNewRemotingCommand()
+ bs, err := encode(rc)
+ if err != nil {
+ b.Fatalf("encode RemotingCommand to bytes fail: %v", err)
+ }
+ b.ResetTimer()
+
+ for i := 0; i < b.N; i++ {
+ if _, err := decode(bs); err != nil {
+ b.Fatalf("decode bytes to RemotingCommand fail: %v", err)
+ }
+ }
+}
+
+func Test_jsonCodec_encodeHeader(t *testing.T) {
+ for i := 0; i < 1000; i++ {
+ rc := randomNewRemotingCommand()
+
+ if _, err := jsonSerializer.encodeHeader(rc); err != nil {
+ t.Fatalf("encode header with jsonCodec fail: %v", err)
+ }
+ }
+}
+
+func Benchmark_jsonCodec_encodeHeader(b *testing.B) {
+ rc := randomNewRemotingCommand()
+ b.ResetTimer()
+
+ for i := 0; i < b.N; i++ {
+ if _, err := jsonSerializer.encodeHeader(rc); err != nil {
+ b.Fatalf("encode header with jsonCodec fail: %v", err)
+ }
+ }
+}
+
+func Test_jsonCodec_decodeHeader(t *testing.T) {
+ for i := 0; i < 1; i++ {
+ rc := randomNewRemotingCommand()
+
+ headers, err := jsonSerializer.encodeHeader(rc)
+ if err != nil {
+ t.Fatalf("encode header with jsonCodec fail: %v", err)
+ }
+
+ decodedRc, err := jsonSerializer.decodeHeader(headers)
+ if err != nil {
+ t.Fatalf("decode header with jsonCodec fail: %v", err)
+ }
+
+ if rc.Code != decodedRc.Code ||
+ rc.Language != decodedRc.Language ||
+ rc.Version != decodedRc.Version ||
+ rc.Opaque != decodedRc.Opaque ||
+ rc.Flag != decodedRc.Flag ||
+ rc.Remark != decodedRc.Remark ||
+ !reflect.DeepEqual(rc.ExtFields, decodedRc.ExtFields) {
+ t.Fatal("decoded RemotingCommand not equal to the original one")
+ }
+ }
+}
+
+func Benchmark_jsonCodec_decodeHeader(b *testing.B) {
+ rc := randomNewRemotingCommand()
+ headers, err := jsonSerializer.encodeHeader(rc)
+ if err != nil {
+ b.Fatalf("encode header with jsonCodec fail: %v", err)
+ }
+ b.ResetTimer()
+
+ for i := 0; i < b.N; i++ {
+ if _, err := jsonSerializer.decodeHeader(headers); err != nil {
+ b.Fatalf("decode header with jsonCodec fail: %v", err)
+ }
+ }
+}
+
+func Test_rmqCodec_encodeHeader(t *testing.T) {
+ for i := 0; i < 1000; i++ {
+ rc := randomNewRemotingCommand()
+
+ if _, err := rocketMqSerializer.encodeHeader(rc); err != nil {
+ t.Fatalf("encode header with rmqCodec fail: %v", err)
+ }
+ }
+}
+
+func Benchmark_rmqCodec_encodeHeader(b *testing.B) {
+ rc := randomNewRemotingCommand()
+ b.ResetTimer()
+
+ for i := 0; i < b.N; i++ {
+ if _, err := rocketMqSerializer.encodeHeader(rc); err != nil {
+ b.Fatalf("encode header with rmqCodec fail: %v", err)
+ }
+ }
+}
+
+func Test_rmqCodec_decodeHeader(t *testing.T) {
+ for i := 0; i < 1; i++ {
+ rc := randomNewRemotingCommand()
+
+ headers, err := rocketMqSerializer.encodeHeader(rc)
+ if err != nil {
+ t.Fatalf("encode header with rmqCodec fail: %v", err)
+ }
+
+ decodedRc, err := rocketMqSerializer.decodeHeader(headers)
+ if err != nil {
+ t.Fatalf("decode header with rmqCodec fail: %v", err)
+ }
+
+ if rc.Code != decodedRc.Code ||
+ rc.Language != decodedRc.Language ||
+ rc.Version != decodedRc.Version ||
+ rc.Opaque != decodedRc.Opaque ||
+ rc.Flag != decodedRc.Flag ||
+ rc.Remark != decodedRc.Remark ||
+ !reflect.DeepEqual(rc.ExtFields, decodedRc.ExtFields) {
+ t.Fatal("decoded RemotingCommand not equal to the original one")
+ }
+ }
+}
+
+func Benchmark_rmqCodec_decodeHeader(b *testing.B) {
+ rc := randomNewRemotingCommand()
+ headers, err := rocketMqSerializer.encodeHeader(rc)
+ if err != nil {
+ b.Fatalf("encode header with rmqCodec fail: %v", err)
+ }
+ b.ResetTimer()
+
+ for i := 0; i < b.N; i++ {
+ if _, err := rocketMqSerializer.decodeHeader(headers); err != nil {
+ b.Fatalf("decode header with rmqCodec fail: %v", err)
+ }
+ }
+}
+
+func TestCommandJsonEncodeDecode(t *testing.T) {
+ cmd := NewRemotingCommand(192, map[string]string{"brokers": "127.0.0.1"}, []byte("Hello RocketMQCodecs"))
+ codecType = JsonCodecs
+ cmdData, err := encode(cmd)
+ if err != nil {
+ t.Errorf("failed to encode remotingCommand in JSON, %s", err)
+ } else {
+ if len(cmdData) == 0 {
+ t.Errorf("failed to encode remotingCommand, result is empty.")
+ }
+ }
+ newCmd, err := decode(cmdData)
+ if err != nil {
+ t.Fatalf("failed to decode remotingCommand in JSON. %s", err)
+ }
+ if newCmd.Code != cmd.Code {
+ t.Errorf("wrong command code. want=%d, got=%d", cmd.Code, newCmd.Code)
+ }
+ if newCmd.Language != cmd.Language {
+ t.Errorf("wrong command language. want=%d, got=%d", cmd.Language, newCmd.Language)
+ }
+ if newCmd.Version != cmd.Version {
+ t.Errorf("wrong command version. want=%d, got=%d", cmd.Version, newCmd.Version)
+ }
+ if newCmd.Opaque != cmd.Opaque {
+ t.Errorf("wrong command opaque. want=%d, got=%d", cmd.Opaque, newCmd.Opaque)
+ }
+ if newCmd.Flag != cmd.Flag {
+ t.Errorf("wrong command flag. want=%d, got=%d", cmd.Flag, newCmd.Flag)
+ }
+ if newCmd.Remark != cmd.Remark {
+ t.Errorf("wrong command remark. want=%s, got=%s", cmd.Remark, newCmd.Remark)
+ }
+ for k, v := range cmd.ExtFields {
+ if vv, ok := newCmd.ExtFields[k]; !ok {
+ t.Errorf("key: %s not exists in newCommand.", k)
+ } else {
+ if v != vv {
+ t.Errorf("wrong value. want=%s, got=%s", v, vv)
+ }
+ }
+ }
+}
+
+func TestCommandRocketMQEncodeDecode(t *testing.T) {
+ cmd := NewRemotingCommand(192, map[string]string{"brokers": "127.0.0.1"}, []byte("Hello RocketMQCodecs"))
+ codecType = RocketMQCodecs
+ cmdData, err := encode(cmd)
+ if err != nil {
+ t.Errorf("failed to encode remotingCommand with the RocketMQ codec, %s", err)
+ } else {
+ if len(cmdData) == 0 {
+ t.Errorf("failed to encode remotingCommand, result is empty.")
+ }
+ }
+ newCmd, err := decode(cmdData)
+ if err != nil {
+ t.Fatalf("failed to decode remotingCommand with the RocketMQ codec. %s", err)
+ }
+ if newCmd.Code != cmd.Code {
+ t.Errorf("wrong command code. want=%d, got=%d", cmd.Code, newCmd.Code)
+ }
+ if newCmd.Language != cmd.Language {
+ t.Errorf("wrong command language. want=%d, got=%d", cmd.Language, newCmd.Language)
+ }
+ if newCmd.Version != cmd.Version {
+ t.Errorf("wrong command version. want=%d, got=%d", cmd.Version, newCmd.Version)
+ }
+ if newCmd.Opaque != cmd.Opaque {
+ t.Errorf("wrong command opaque. want=%d, got=%d", cmd.Opaque, newCmd.Opaque)
+ }
+ if newCmd.Flag != cmd.Flag {
+ t.Errorf("wrong command flag. want=%d, got=%d", cmd.Flag, newCmd.Flag)
+ }
+ if newCmd.Remark != cmd.Remark {
+ t.Errorf("wrong command remark. want=%s, got=%s", cmd.Remark, newCmd.Remark)
+ }
+ for k, v := range cmd.ExtFields {
+ if vv, ok := newCmd.ExtFields[k]; !ok {
+ t.Errorf("key: %s not exists in newCommand.", k)
+ } else {
+ if v != vv {
+ t.Errorf("wrong value. want=%s, got=%s", v, vv)
+ }
+ }
+ }
}
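
The field-by-field comparison repeated in the two round-trip tests above could be factored into one helper, which makes it harder to compare a value with itself by accident. The sketch below is one possible shape, not part of the commit: `command` is a hypothetical stand-in for the header fields of `RemotingCommand`, and `diffHeaders` is an invented name.

```go
package main

import (
	"fmt"
	"reflect"
)

// command is a hypothetical stand-in for the header fields of
// RemotingCommand as defined in remote/codec.go after this commit.
type command struct {
	Code      int16
	Language  byte
	Version   int16
	Opaque    int32
	Flag      int32
	Remark    string
	ExtFields map[string]string
}

// diffHeaders returns one message per header field that differs
// between want and got; an empty result means the headers match.
func diffHeaders(want, got command) []string {
	var diffs []string
	check := func(name string, w, g interface{}) {
		if !reflect.DeepEqual(w, g) {
			diffs = append(diffs, fmt.Sprintf("%s: want=%v, got=%v", name, w, g))
		}
	}
	check("code", want.Code, got.Code)
	check("language", want.Language, got.Language)
	check("version", want.Version, got.Version)
	check("opaque", want.Opaque, got.Opaque)
	check("flag", want.Flag, got.Flag)
	check("remark", want.Remark, got.Remark)
	check("extFields", want.ExtFields, got.ExtFields)
	return diffs
}
```

In a test, each returned message could be passed to `t.Error`. Note that `reflect.DeepEqual` treats a nil map and an empty map as different, so a decoder that always allocates `ExtFields` would be reported as a mismatch against a command built with nil properties.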