You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by di...@apache.org on 2019/02/18 02:09:06 UTC
[rocketmq-client-go] branch native updated: Adding partial
implementation of remote in native and docs (#29)
This is an automated email from the ASF dual-hosted git repository.
dinglei pushed a commit to branch native
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-go.git
The following commit(s) were added to refs/heads/native by this push:
new 26011d0 Adding partial implementation of remote in native and docs (#29)
26011d0 is described below
commit 26011d04ffdef49c5aa98e0094ecd9406023dcf1
Author: wenfeng <sx...@gmail.com>
AuthorDate: Mon Feb 18 10:09:02 2019 +0800
Adding partial implementation of remote in native and docs (#29)
* Add implementation of RocketMQ protocol in pure go
* Add design & protocol docs
---
common/manager.go | 1 +
common/route.go | 1 +
core/producer.go | 6 +-
core/push_consumer.go | 2 +-
{doc => docs}/Introduction.md | 0
docs/client-design.gliffy | 1 +
docs/client-design.png | Bin 0 -> 30513 bytes
docs/feature.md | 93 +++++++++
docs/zh/native-design_zh.md | 102 ++++++++++
docs/zh/rocketmq-protocol_zh.md | 112 +++++++++++
remote/client.go | 252 +++++++++++++++++++++++
remote/client_test.go | 17 ++
remote/codec.go | 431 ++++++++++++++++++++++++++++++++++++++++
remote/codec_test.go | 27 +++
remote/processor.go | 26 +++
remote/request.go | 111 +++++++++++
remote/response.go | 66 ++++++
remote/rpchook.go | 23 +++
utils/ring_buffer.go | 64 ++++++
19 files changed, 1331 insertions(+), 4 deletions(-)
diff --git a/common/manager.go b/common/manager.go
new file mode 100644
index 0000000..805d0c7
--- /dev/null
+++ b/common/manager.go
@@ -0,0 +1 @@
+package common
diff --git a/common/route.go b/common/route.go
new file mode 100644
index 0000000..805d0c7
--- /dev/null
+++ b/common/route.go
@@ -0,0 +1 @@
+package common
diff --git a/core/producer.go b/core/producer.go
index fe2d978..7df57e6 100644
--- a/core/producer.go
+++ b/core/producer.go
@@ -235,12 +235,12 @@ func (p *defaultProducer) SendMessageOrderly(msg *Message, selector MessageQueue
unsafe.Pointer(&key),
C.int(autoRetryTimes),
&sr))
-
+
if err != NIL {
log.Warnf("send message orderly error, error is: %s", err.Error())
return nil, err
}
-
+
return &SendResult{
Status: SendStatus(sr.sendStatus),
MsgId: C.GoString(&sr.msgId[0]),
@@ -257,7 +257,7 @@ func (p *defaultProducer) SendMessageOneway(msg *Message) error {
log.Warnf("send message with oneway error, error is: %s", err.Error())
return err
}
-
+
log.Debugf("Send Message: %s with oneway success.", msg.String())
return nil
}
diff --git a/core/push_consumer.go b/core/push_consumer.go
index 90e1454..c222587 100644
--- a/core/push_consumer.go
+++ b/core/push_consumer.go
@@ -32,11 +32,11 @@ int callback_cgo(CPushConsumer *consumer, CMessageExt *msg) {
import "C"
import (
+ "errors"
"fmt"
log "github.com/sirupsen/logrus"
"sync"
"unsafe"
- "errors"
)
type ConsumeStatus int
diff --git a/doc/Introduction.md b/docs/Introduction.md
similarity index 100%
rename from doc/Introduction.md
rename to docs/Introduction.md
diff --git a/docs/client-design.gliffy b/docs/client-design.gliffy
new file mode 100644
index 0000000..418f4d8
--- /dev/null
+++ b/docs/client-design.gliffy
@@ -0,0 +1 @@
+{"contentType":"application/gliffy+json","version":"1.1","metadata":{"title":"untitled","revision":0,"exportBorder":false},"embeddedResources":{"index":0,"resources":[]},"stage":{"objects":[{"x":31,"y":44,"rotation":0,"id":57,"uid":"com.gliffy.shape.basic.basic_v1.default.text","width":150,"height":27,"lockAspectRatio":false,"lockShape":false,"order":57,"graphic":{"type":"Text","Text":{"tid":null,"valign":"middle","overflow":"none","vposition":"none","hposition":"none","html":"<p style=\ [...]
\ No newline at end of file
diff --git a/docs/client-design.png b/docs/client-design.png
new file mode 100644
index 0000000..97b86ef
Binary files /dev/null and b/docs/client-design.png differ
diff --git a/docs/feature.md b/docs/feature.md
new file mode 100644
index 0000000..bc23be2
--- /dev/null
+++ b/docs/feature.md
@@ -0,0 +1,93 @@
+# Feature
+
+## Producer
+- [ ] ProducerType
+ - [ ] DefaultProducer
+ - [ ] TransactionProducer
+- [ ] API
+ - [ ] Send
+ - [ ] Sync
+ - [ ] Async
+ - [ ] OneWay
+- [ ] Other
+ - [ ] DelayMessage
+ - [ ] Config
+ - [ ] MessageId Generate
+ - [ ] CompressMsg
+ - [ ] TimeOut
+ - [ ] LoadBalance
+ - [ ] DefaultTopic
+ - [ ] VipChannel
+ - [ ] Retry
+ - [ ] SendMessageHook
+ - [ ] CheckRequestQueue
+ - [ ] CheckForbiddenHookList
+ - [ ] MQFaultStrategy
+
+## Consumer
+- [ ] ConsumerType
+ - [ ] PushConsumer
+ - [ ] PullConsumer
+- [ ] MessageListener
+ - [ ] Concurrently
+ - [ ] Orderly
+- [ ] MessageModel
+ - [ ] CLUSTERING
+ - [ ] BROADCASTING
+- [ ] OffsetStore
+ - [ ] RemoteBrokerOffsetStore
+ - [ ] many actions
+ - [ ] LocalFileOffsetStore
+- [ ] RebalanceService
+- [ ] PullMessageService
+- [ ] ConsumeMessageService
+- [ ] AllocateMessageQueueStrategy
+ - [ ] AllocateMessageQueueAveragely
+ - [ ] AllocateMessageQueueAveragelyByCircle
+ - [ ] AllocateMessageQueueByConfig
+ - [ ] AllocateMessageQueueByMachineRoom
+- [ ] Other
+ - [ ] Config
+ - [ ] ZIP
+ - [ ] AllocateMessageQueueStrategy
+ - [ ] ConsumeFromWhere
+ - [ ] CONSUME_FROM_LAST_OFFSET
+ - [ ] CONSUME_FROM_FIRST_OFFSET
+ - [ ] CONSUME_FROM_TIMESTAMP
+ - [ ] Retry(sendMessageBack)
+ - [ ] TimeOut(clearExpiredMessage)
+ - [ ] ACK(partSuccess)
+ - [ ] FlowControl(messageCanNotConsume)
+ - [ ] ConsumeMessageHook
+ - [ ] filterMessageHookList
+
+## Manager
+- [ ] Multiple Request API Wrapper
+ - many functions...
+- [ ] Task
+ - [ ] PollNameServer
+ - [ ] Heartbeat
+ - [ ] UpdateTopicRouteInfoFromNameServer
+ - [ ] CleanOfflineBroker
+ - [ ] PersistAllConsumerOffset
+ - [ ] ClearExpiredMessage(from consumer consumeMessageService)
+- [ ] Processor
+ - [ ] CHECK_TRANSACTION_STATE
+ - [ ] NOTIFY_CONSUMER_IDS_CHANGED
+ - [ ] RESET_CONSUMER_CLIENT_OFFSET
+ - [ ] GET_CONSUMER_STATUS_FROM_CLIENT
+ - [ ] GET_CONSUMER_RUNNING_INFO
+ - [ ] CONSUME_MESSAGE_DIRECTLY
+
+## Remoting
+- [ ] API
+ - [ ] InvokeSync
+ - [ ] InvokeAsync
+ - [ ] InvokeOneWay
+- [ ] Serialize
+ - [ ] JSON
+ - [ ] ROCKETMQ
+- [ ] Other
+ - [ ] VIPChannel
+ - [ ] RPCHook
+
\ No newline at end of file
diff --git a/docs/zh/native-design_zh.md b/docs/zh/native-design_zh.md
new file mode 100644
index 0000000..c1745e0
--- /dev/null
+++ b/docs/zh/native-design_zh.md
@@ -0,0 +1,102 @@
+# RocketMQ Go Client Design Draft
+
+## Architecture
+
+### Overview
+![client-design](../client-design.png)
+
+### Description
+在RocketMQ Java Client的实现里面,代码耦合了大量的admin方面的功能, 其为了尽可能的提高代码复用率,代码的依赖关系较为复杂、接口的设计比
+较重、语义的界限不够清晰。因此,为了避免简单的对Java代码进行翻译,故Go客户端进行了重新的设计,剥离了admin相关的逻辑。整体如上面的图所示,在逻辑层次上,按照请求
+顺序,从下到上总共分为三层:
+- remote层:client网络通信和私有协议层,将每个到服务端(NameServer或Broker)的连接实体抽象为一个client结构,并在这里实现了网络数据的
+序列/反序列化。
+- 公共层:由于remote层对上层只暴露了`Sync/Async/Oneway`三个接口,所以对于特定的Request/Response、连接的管理、路由等信息的处理和维护、
+其它`producer/consumer`共用的逻辑,均在这里实现。
+- 业务逻辑层:在这里实现各种producer、consumer的语义。所有producer、consumer专有的逻辑,均放到这里实现,如`queue selector`,
+`consume balance`, `offset storage`等。除了基础的数据结构外,producer和consumer之间内部的代码不能进行复用。
+
+
+## 设计目标
+0. 兼容cgo版本已经暴露出去的API
+1. 实现语义清晰、轻量的API接口
+2. 依赖关系清晰简单,设计和实现要正交
+
+## 目录结构
+### 源码
+- producer:producer相关逻辑
+- consumer:consumer相关逻辑
+- common(可改名):连接管理和路由管理相关的通用逻辑
+- remote:网络通信和序列化
+
+### 其它
+- benchmark:压力测试相关代码
+- core:1.2版本cgo的代码库,该目录下的代码将会被移除,只保留API进行兼容,并会被标记为`Deprecated`
+- docs:文档,包括面向用户和开发者
+- examples:示例代码
+- test:集成测试代码
+
+## API
+
+### remote
+```go
+// send a request to servers and return until response received.
+InvokeSync(request *remotingCommand) (*remotingCommand, error)
+
+// send a request to servers, process response with async
+InvokeAsync(request *remotingCommand, f func(*remotingCommand)) error
+
+// send request only, no response will be returned by servers.
+InvokeOneWay(request *remotingCommand) error
+```
+
+### common
+```go
+// for producer
+SendMessageSync(route *TopicRouteInfo, msg *Message) error
+SendMessageAsync(route *TopicRouteInfo, msg *Message, f func(result *SendResult)) error
+
+SendMessageSyncBatch(route *TopicRouteInfo, msg []*Message) error
+SendMessageAsyncBatch(route *TopicRouteInfo, msg []*Message, f func(result *SendResult)) error
+
+// for consumer
+PullMessageSync(route *TopicRouteInfo, request *PullMessageRequest) error
+PullMessageAsync(route *TopicRouteInfo, request *PullMessageRequest, f func(result *PullResult)) error
+
+// for offset
+TODO
+```
+
+## Road map
+For more details about features, see the [feature-list](../feature.md).
+
+### Milestone1(due: 2019.3.10)
+
+#### producer
+- [ ] normal message
+- [ ] order message
+
+#### consumer
+- [ ] normal message with pull/push
+- [ ] order message with pull/push
+- [ ] rebalance
+- [ ] offset manager
+
+#### common
+- [ ] API wrapper
+- [ ] connections manager
+- [ ] route
+
+#### remote
+- [ ] serializer
+- [ ] communication
+- [ ] processor
+- [ ] RPC
+
+### Milestone2 (2019.4.12)
+- Transaction Message
+- ACL
+- Message Tracing
+
+## sub project
+- RocketMQ Administration tools: JVM too heavy for command line tools
\ No newline at end of file
diff --git a/docs/zh/rocketmq-protocol_zh.md b/docs/zh/rocketmq-protocol_zh.md
new file mode 100644
index 0000000..9d741ce
--- /dev/null
+++ b/docs/zh/rocketmq-protocol_zh.md
@@ -0,0 +1,112 @@
+# RocketMQ 通信协议
+
+在RocketMQ中,`RemotingCommand`是RocketMQ通信的基本对象,Request/Response最后均被包装成`RemotingCommand`。一个`RemotingCommand`
+在被序列化后的格式如下:
+```go
+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
++ frame_size | header_length | header_body | body +
+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
++ 4bytes | 4bytes | (21 + r_len + e_len) bytes | remain bytes +
+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
+```
+|item|type|description|
+|:-:|:-:|:-:|
+|frame_size|`int32`|一个`RemotingCommand`数据包大小|
+|header_length|`int32`|高8位表示数据的序列化方式,余下的表示真实header长度|
+|header_body|`[]byte`|header的payload,长度由附带的`remark`和`properties`决定|
+|body|`[]byte`|具体Request/Response的payload|
+
+## Header
+RocketMQ的Header序列化方式有两种:JSON和RocketMQ私有的序列化方式。JSON序列化方式不再赘述。具体可以参考Java`RemotingCommand`类。
+主要介绍RocketMQ的私有序列化方式。
+
+在序列化的时候,需要将序列化方式记录进数据包里面,即对`header_length`进行编码
+
+```go
+// 编码算法
+
+// 编码后的header_length
+var header_length int32
+
+// 实际的header长度
+var headerDataLen int32
+
+// 序列化方式
+var SerializedType byte
+
+result := make([]byte, 4)
+result[0] = SerializedType
+result[1] = byte((headerDataLen >> 16) & 0xFF)
+result[2] = byte((headerDataLen >> 8) & 0xFF)
+result[3] = byte(headerDataLen & 0xFF)
+binary.Read(bytes.NewReader(result), binary.BigEndian, &header_length)
+
+
+// 解码算法
+headerDataLen := header_length & 0xFFFFFF
+SerializedType := byte((header_length >> 24) & 0xFF)
+```
+
+### Header Frame
+
+```
++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
++ request_code | l_flag | v_flag | opaque | request_flag | r_len | r_body | e_len | e_body +
++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
++ 2bytes | 1byte | 2bytes | 4bytes | 4 bytes | 4 bytes | r_len bytes | 4 bytes | e_len bytes +
++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
+```
+
+|item|type|description|
+|:-:|:-:|:-:|
+|request_code|`short`|哪一种Request或ResponseCode,具体类别由request_flag决定|
+|l_flag|`byte`|language位,用来标识Request来源方的开发语言|
+|v_flag|`int16`|版本标记位|
+|request_flag|`int32`|Header标记位,用来标记该`RemotingCommand`的类型和请求方式|
+|opaque|`int32`|标识Request/Response的RequestID,Broker返回的Response通过该值和Client缓存的Request一一对应|
+|r_len|`int32`| length of remark, remark是Request/Response的附带说明信息,一般在Response中用来说明具体的错误原因|
+|r_body|`[]byte`| payload of remark |
+|e_len|`int32`| length of extended fields,即properties,一些非标准字段会存储在这里,在RocketMQ的各种feature中均有广泛应用|
+|e_body|`[]byte`| payload of extended fields |
+
+## Body
+`body`是具体的Request/Response的数据,在RocketMQ中,有许多种Request/Response。每个类有自己的序列化和反序列方式,由于种类过多,
+这里就不再展开。可以具体参考Java代码中对`CommandCustomHeader`的使用。下面列一些Client使用到的Request和Response
+
+### RequestCode
+|item|type|description|
+|:-:|:-:|:-:|
+|SEND_MESSAGE|10| 向broker发送消息|
+|PULL_MESSAGE|11| 从broker拉取消息,client的push模式也是通过pull的长轮询来实现的|
+|TODO...|||
+
+### ResponseCode
+|item|type|description|
+|:-:|:-:|:-:|
+|FLUSH_DISK_TIMEOUT|10|broker存储层刷盘超时|
+|SLAVE_NOT_AVAILABLE|11|slave节点无法服务|
+|FLUSH_SLAVE_TIMEOUT|12|数据同步到slave超时|
+|MESSAGE_ILLEGAL|13|消息格式不合格|
+|SERVICE_NOT_AVAILABLE|14|broker暂时不可用|
+|VERSION_NOT_SUPPORTED|15|不支持的请求,目前没有看到使用|
+|NO_PERMISSION|16|对broker、topic或subscription无访问权限|
+|TOPIC_EXIST_ALREADY|18|topic已存在,目前没看到使用|
+|PULL_NOT_FOUND|19|没拉到消息,大多为offset错误|
+|PULL_RETRY_IMMEDIATELY|20|建议client立即重新拉取消息|
+|PULL_OFFSET_MOVED|21|offset太小或太大|
+|QUERY_NOT_FOUND|22|管理面Response,TODO|
+|SUBSCRIPTION_PARSE_FAILED|23|订阅数据解析失败|
+|SUBSCRIPTION_NOT_EXIST|24|订阅不存在|
+|SUBSCRIPTION_NOT_LATEST|25|订阅数据版本和request数据版本不匹配|
+|SUBSCRIPTION_GROUP_NOT_EXIST|26|订阅组不存在|
+|FILTER_DATA_NOT_EXIST|27|filter数据不存在|
+|FILTER_DATA_NOT_LATEST|28|filter数据版本和request数据版本不匹配|
+|TRANSACTION_SHOULD_COMMIT|200|事务Response,TODO|
+|TRANSACTION_SHOULD_ROLLBACK|201|事务Response,TODO|
+|TRANSACTION_STATE_UNKNOW|202|事务Response,TODO|
+|TRANSACTION_STATE_GROUP_WRONG|203|事务Response,TODO|
+|NO_BUYER_ID|204|不知道是什么,没看到broker端在使用|
+|NOT_IN_CURRENT_UNIT|205|不知道是什么,没看到broker端在使用|
+|CONSUMER_NOT_ONLINE|206|consumer不在线,控制面response|
+|CONSUME_MSG_TIMEOUT|207|client request等待broker响应超时|
+|NO_MESSAGE|208|控制面response,由client自己设置,不清楚具体用途|
\ No newline at end of file
diff --git a/remote/client.go b/remote/client.go
new file mode 100644
index 0000000..058ccdd
--- /dev/null
+++ b/remote/client.go
@@ -0,0 +1,252 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package remote
+
+import (
+ "encoding/binary"
+ "errors"
+ "github.com/apache/rocketmq-client-go/utils"
+ log "github.com/sirupsen/logrus"
+ "net"
+ "sync"
+ "time"
+)
+
+var (
+ // ErrRequestTimeout is returned by InvokeSync when no response is
+ // signalled within ClientConfig.RequestTimeout.
+ ErrRequestTimeout = errors.New("request timeout")
+)
+
+// RemotingClient is the request API exposed by the remote layer.
+type RemotingClient interface {
+ // InvokeSync sends request and returns the matching response, or an error.
+ InvokeSync(request *remotingCommand) (*remotingCommand, error)
+ // InvokeAsync sends request and registers f as the callback for its response.
+ InvokeAsync(request *remotingCommand, f func(*remotingCommand)) error
+ // InvokeOneWay sends request without registering anything for a response.
+ InvokeOneWay(request *remotingCommand) error
+}
+
+// ClientConfig holds the settings passed to NewRemotingClient.
+type ClientConfig struct {
+ // NameServer or Broker address; NewRemotingClient dials it over TCP.
+ RemotingAddress string
+
+ ClientIP string
+ InstanceName string
+
+ // HeartbeatBrokerInterval is the heartbeat interval with the message broker.
+ // NOTE(review): the previous comment said "in microseconds ... default is 30",
+ // but the field is a time.Duration and nothing visible here applies a
+ // default — confirm the intended unit and default.
+ HeartbeatBrokerInterval time.Duration
+
+ // RequestTimeout bounds how long InvokeSync waits for a response.
+ RequestTimeout time.Duration
+ // CType selects the header codec (Json or RocketMQ); NewRemotingClient
+ // rejects any other value.
+ CType byte
+
+ UnitMode bool
+ UnitName string
+ VipChannelEnabled bool
+}
+
+// defaultClient is the RemotingClient returned by NewRemotingClient; it owns
+// one TCP connection to the configured address.
+type defaultClient struct {
+ //clientId string
+ config ClientConfig
+ // conn is the connection dialed by NewRemotingClient.
+ conn net.Conn
+ // requestId
+ // NOTE(review): not read or written by any method visible here; request
+ // ids come from the package-level counter used by newRemotingCommand.
+ opaque int32
+
+ // int32 -> ResponseFuture
+ // Keyed by request.Opaque; entries are stored by InvokeSync and InvokeAsync.
+ responseTable sync.Map
+ // codec is chosen from config.CType by NewRemotingClient.
+ codec serializer
+ // NOTE(review): exitCh is never initialized or used in the visible code.
+ exitCh chan interface{}
+}
+
+func NewRemotingClient(config ClientConfig) (RemotingClient, error) {
+ client := &defaultClient{
+ config: config,
+ }
+
+ switch config.CType {
+ case Json:
+ client.codec = &jsonCodec{}
+ case RocketMQ:
+ client.codec = &rmqCodec{}
+ default:
+ return nil, errors.New("unknow codec")
+ }
+
+ conn, err := net.Dial("tcp", config.RemotingAddress)
+ if err != nil {
+ log.Error(err)
+ return nil, err
+ }
+ client.conn = conn
+ go client.listen()
+ go client.clearExpiredRequest()
+ return client, nil
+}
+
+// InvokeSync sends request and blocks until the matching response is
+// signalled or client.config.RequestTimeout elapses.
+//
+// It returns the encode error or the (logged) write error when the request
+// cannot be sent, and ErrRequestTimeout when no response is signalled in
+// time. The pending ResponseFuture is removed from responseTable before
+// InvokeSync returns, whatever the outcome.
+func (client *defaultClient) InvokeSync(request *remotingCommand) (*remotingCommand, error) {
+	// Encode before registering anything: a request that cannot be encoded
+	// must be neither tracked nor written to the connection.
+	header, err := encode(request)
+	if err != nil {
+		return nil, err
+	}
+
+	response := &ResponseFuture{
+		SendRequestOK:  false,
+		Opaque:         request.Opaque,
+		TimeoutMillis:  client.config.RequestTimeout,
+		BeginTimestamp: time.Now().Unix(),
+		// Buffered so that a response delivered after the timeout has fired
+		// cannot block its sender forever.
+		Done: make(chan bool, 1),
+	}
+	client.responseTable.Store(request.Opaque, response)
+	defer client.responseTable.Delete(request.Opaque)
+
+	if err = client.doRequest(header, request.Body); err != nil {
+		log.Error(err)
+		return nil, err
+	}
+
+	// An explicit timer is stopped on return instead of lingering until it fires.
+	timer := time.NewTimer(client.config.RequestTimeout)
+	defer timer.Stop()
+
+	select {
+	case <-response.Done:
+		return response.ResponseCommand, nil
+	case <-timer.C:
+		return nil, ErrRequestTimeout
+	}
+}
+
+// InvokeAsync sends request and registers f as the callback of the
+// ResponseFuture stored in responseTable under request.Opaque.
+//
+// It returns the encode or write error when the request cannot be sent; in
+// both cases no future is left behind in responseTable.
+func (client *defaultClient) InvokeAsync(request *remotingCommand, f func(*remotingCommand)) error {
+	// Encode first so that a request that cannot be encoded is never registered.
+	header, err := encode(request)
+	if err != nil {
+		return err
+	}
+
+	response := &ResponseFuture{
+		SendRequestOK:  false,
+		Opaque:         request.Opaque,
+		TimeoutMillis:  client.config.RequestTimeout,
+		BeginTimestamp: time.Now().Unix(),
+		callback:       f,
+	}
+	// Register before writing so a fast response can already find its future.
+	client.responseTable.Store(request.Opaque, response)
+
+	if err = client.doRequest(header, request.Body); err != nil {
+		// Nothing was sent, so no response will ever arrive for this entry.
+		client.responseTable.Delete(request.Opaque)
+		return err
+	}
+	return nil
+}
+
+// InvokeOneWay encodes request and writes it to the connection without
+// registering a ResponseFuture. It returns the encode or write error.
+func (client *defaultClient) InvokeOneWay(request *remotingCommand) error {
+	encoded, encodeErr := encode(request)
+	if encodeErr != nil {
+		return encodeErr
+	}
+	return client.doRequest(encoded, request.Body)
+}
+
+func (client *defaultClient) doRequest(header, body []byte) error {
+ var requestBytes []byte
+ requestBytes = append(requestBytes, header...)
+ if body != nil && len(body) > 0 {
+ requestBytes = append(requestBytes, body...)
+ }
+ _, err := client.conn.Write(requestBytes)
+ return err
+}
+
+// close closes the underlying connection; the error returned by Close is
+// discarded.
+func (client *defaultClient) close() {
+	// TODO process response
+	_ = client.conn.Close()
+}
+
+// listen copies bytes read from the connection into a ring buffer while a
+// second goroutine reads length-prefixed frames back out of that buffer,
+// decodes them and dispatches them to handleResponse or
+// handleRequestFromServer. listen returns when reading from the connection
+// fails, and signals the decoding goroutine to stop at its next iteration.
+//
+// Frames that cannot be read completely or decoded are logged and skipped
+// instead of being dispatched.
+//
+// NOTE(review): whether utils.RingBuffer blocks while it is empty is not
+// visible here, so a failed frame-size read is retried silently rather than
+// logged (it may only mean "no data yet") — confirm against ring_buffer.go.
+func (client *defaultClient) listen() {
+	rb := utils.NewRingBuffer(4096)
+
+	// done is closed when the connection reader below returns.
+	done := make(chan struct{})
+	defer close(done)
+
+	go func() {
+		for {
+			select {
+			case <-done:
+				return
+			default:
+			}
+
+			// Local to the iteration so a failed read can never reuse the
+			// size of the previous frame.
+			var frameSize int32
+			if err := binary.Read(rb, binary.BigEndian, &frameSize); err != nil {
+				continue
+			}
+			if frameSize <= 0 {
+				log.Errorf("invalid frame size: %d", frameSize)
+				continue
+			}
+
+			data := make([]byte, frameSize)
+			n, err := rb.Read(data)
+			if err != nil {
+				log.Errorf("read frame from buffer errors: %v", err)
+				continue
+			}
+			if n != len(data) {
+				log.Errorf("short frame read: got %d of %d bytes", n, len(data))
+				continue
+			}
+
+			cmd, err := decode(data)
+			if err != nil || cmd == nil {
+				log.Errorf("decode frame errors: %v", err)
+				continue
+			}
+
+			if cmd.isResponseType() {
+				if err := client.handleResponse(cmd); err != nil {
+					log.Errorf("handle response errors: %v", err)
+				}
+			} else {
+				client.handleRequestFromServer(cmd)
+			}
+		}
+	}()
+
+	buf := make([]byte, 4096)
+	for {
+		n, err := client.conn.Read(buf)
+		if err != nil {
+			log.Errorf("read data from connection errors: %v", err)
+			return
+		}
+		if err = rb.Write(buf[:n]); err != nil {
+			// just log
+			log.Errorf("write data to buffer errors: %v", err)
+		}
+	}
+}
+
+// handleRequestFromServer is called by listen for every decoded command that
+// is not flagged as a response. It is currently a no-op: the intended
+// implementation below is commented out.
+func (client *defaultClient) handleRequestFromServer(cmd *remotingCommand) {
+ //responseCommand := client.clientRequestProcessor(cmd)
+ //if responseCommand == nil {
+ // return
+ //}
+ //responseCommand.Opaque = cmd.Opaque
+ //responseCommand.markResponseType()
+ //header, err := encode(responseCommand)
+ //body := responseCommand.Body
+ //err = client.doRequest(header, body)
+ //if err != nil {
+ // log.Error(err)
+ //}
+}
+
+// handleResponse is called by listen for every decoded command flagged as a
+// response. It currently does nothing and always returns nil: the intended
+// implementation below is commented out, so no ResponseFuture is completed
+// and no callback is invoked.
+func (client *defaultClient) handleResponse(cmd *remotingCommand) error {
+ //response, err := client.getResponse(cmd.Opaque)
+ ////client.removeResponse(cmd.Opaque)
+ //if err != nil {
+ // return err
+ //}
+ //
+ //response.ResponseCommand = cmd
+ //response.callback(cmd)
+ //
+ //if response.Done != nil {
+ // response.Done <- true
+ //}
+ return nil
+}
+
+// clearExpiredRequest is started as a goroutine by NewRemotingClient. Its
+// body is commented out, so it returns immediately and nothing is evicted
+// from responseTable.
+//
+// NOTE(review): the disabled code adds 30 to BeginTimestamp, which is set
+// from time.Now().Unix() (seconds), while its comment says "30 minutes" —
+// confirm the intended expiry before enabling it.
+func (client *defaultClient) clearExpiredRequest() {
+ //for seq, responseObj := range client.responseTable.Items() {
+ // response := responseObj.(*ResponseFuture)
+ // if (response.BeginTimestamp + 30) <= time.Now().Unix() {
+ // //30 minutes expired
+ // client.responseTable.Remove(seq)
+ // response.callback(nil)
+ // log.Warningf("remove time out request %v", response)
+ // }
+ //}
+}
diff --git a/remote/client_test.go b/remote/client_test.go
new file mode 100644
index 0000000..7909612
--- /dev/null
+++ b/remote/client_test.go
@@ -0,0 +1,17 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package remote
diff --git a/remote/codec.go b/remote/codec.go
new file mode 100644
index 0000000..0a62642
--- /dev/null
+++ b/remote/codec.go
@@ -0,0 +1,431 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package remote
+
+import (
+	"bytes"
+	"encoding/binary"
+	"encoding/json"
+	"errors"
+	"sync/atomic"
+)
+
+// opaque is the package-wide request-id counter; newRemotingCommand
+// increments it atomically for every command it creates.
+var opaque int32
+
+const (
+ // 0, REQUEST_COMMAND
+ RpcType = 0
+
+ // 1, RPC
+ RpcOneWay = 1
+
+ // ResponseType is the Flag bit tested by isResponseType and set by
+ // markResponseType.
+ ResponseType = 1
+
+ RemotingCommandFlag = 0
+ // languageFlag is the Language value newRemotingCommand puts on every command.
+ languageFlag = "golang"
+ // languageCode is the one-byte language flag written by rmqCodec.encodeHeader.
+ languageCode = byte(9)
+ // remotingCommandVersion is the Version value newRemotingCommand puts on
+ // every command.
+ remotingCommandVersion = 137
+)
+
+// remotingCommand is the unit exchanged with the server: header fields plus
+// an opaque Body payload.
+type remotingCommand struct {
+ // Code is the request or response code.
+ Code int16 `json:"code"`
+ Language string `json:"language"`
+ Version int16 `json:"version"`
+ // Opaque is the request identifier; responseTable is keyed by it.
+ Opaque int32 `json:"opaque"`
+ // Flag carries the ResponseType bit (see isResponseType/markResponseType).
+ // It is a plain int, which encoding/binary cannot read or write directly.
+ Flag int `json:"flag"`
+ Remark string `json:"remark"`
+ ExtFields map[string]string `json:"extFields"`
+ // Body is the payload; decode fills it from the bytes that follow the header.
+ Body []byte `json:"body,omitempty"`
+}
+
+// newRemotingCommand builds a command for code carrying properties as
+// ExtFields and body as Body. Language and Version are set from the package
+// constants, and Opaque receives the next value of the package-level counter.
+func newRemotingCommand(code int16, properties map[string]string, body []byte) *remotingCommand {
+	cmd := new(remotingCommand)
+	cmd.Code = code
+	cmd.Language = languageFlag
+	cmd.Version = remotingCommandVersion
+	cmd.Opaque = atomic.AddInt32(&opaque, 1)
+	cmd.ExtFields = properties
+	cmd.Body = body
+	return cmd
+}
+
+// isResponseType reports whether the ResponseType bit is set in Flag.
+func (command *remotingCommand) isResponseType() bool {
+	return command.Flag&ResponseType != 0
+}
+
+// markResponseType sets the ResponseType bit in Flag, leaving other bits untouched.
+func (command *remotingCommand) markResponseType() {
+	command.Flag |= ResponseType
+}
+
+var (
+ // jsonSerializer and rocketMqSerializer are the shared codec instances
+ // used by encode and decode.
+ jsonSerializer = &jsonCodec{}
+ rocketMqSerializer = &rmqCodec{}
+ // codecType selects the codec used by encode and is the type byte written
+ // by markProtocolType. Nothing visible here assigns it, so it keeps its
+ // zero value (Json).
+ codecType byte
+)
+
+// encode serializes the frame prefix and header of command using the codec
+// selected by the package-level codecType.
+//
+// Frame format:
+// +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
+// + item | frame_size | header_length | header_body                  | body         +
+// +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
+// + len  | 4bytes     | 4bytes        | (21 + r_len + e_len) bytes   | remain bytes +
+// +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
+//
+// The returned slice holds frame_size, header_length and header_body only:
+// callers pass command.Body separately to doRequest, which appends it.
+// frame_size counts every byte that follows it (header_length word, header
+// and body), which is what listen and decode expect. The high byte of
+// header_length carries the codec type (see markProtocolType).
+//
+// It returns an error when codecType is unknown, when the header cannot be
+// serialized, or when the header does not fit the 24-bit length field.
+func encode(command *remotingCommand) ([]byte, error) {
+	var (
+		header []byte
+		err    error
+	)
+
+	switch codecType {
+	case Json:
+		header, err = jsonSerializer.encodeHeader(command)
+	case RocketMQ:
+		header, err = rocketMqSerializer.encodeHeader(command)
+	default:
+		return nil, errors.New("unknown codec type")
+	}
+	if err != nil {
+		return nil, err
+	}
+
+	// Only the low three bytes of header_length hold the length.
+	if len(header) > 0xFFFFFF {
+		return nil, errors.New("header too large")
+	}
+
+	frameSize := 4 + len(header) + len(command.Body)
+
+	// Start with the two fixed 4-byte words and reserve room for the header;
+	// the buffer must not be pre-filled beyond those 8 bytes.
+	buf := make([]byte, 8, 8+len(header))
+	binary.BigEndian.PutUint32(buf[0:4], uint32(frameSize))
+	copy(buf[4:8], markProtocolType(int32(len(header))))
+
+	return append(buf, header...), nil
+}
+
+// decode parses one frame, without its leading frame_size word, into a
+// remotingCommand.
+//
+// data must start with the 4-byte header_length word: its high byte selects
+// the header codec (Json or RocketMQ) and its low three bytes give the header
+// length. The header follows, and every remaining byte becomes the command's
+// Body (copied, so the command does not alias data).
+//
+// It returns an error when data is shorter than the lengths it declares, when
+// the codec byte is unknown, or when the header itself cannot be decoded.
+func decode(data []byte) (*remotingCommand, error) {
+	if len(data) < 4 {
+		return nil, errors.New("frame too short for header length")
+	}
+
+	oriHeaderLen := int32(binary.BigEndian.Uint32(data[:4]))
+	headerLength := int(oriHeaderLen & 0xFFFFFF)
+	if headerLength > len(data)-4 {
+		return nil, errors.New("header length exceeds frame size")
+	}
+	headerData := data[4 : 4+headerLength]
+
+	var (
+		command *remotingCommand
+		err     error
+	)
+	switch byte((oriHeaderLen >> 24) & 0xFF) {
+	case Json:
+		command, err = jsonSerializer.decodeHeader(headerData)
+	case RocketMQ:
+		command, err = rocketMqSerializer.decodeHeader(headerData)
+	default:
+		// Without this branch command would stay nil and be dereferenced below.
+		return nil, errors.New("unknown serialization type")
+	}
+	if err != nil {
+		return nil, err
+	}
+
+	body := data[4+headerLength:]
+	command.Body = make([]byte, len(body))
+	copy(command.Body, body)
+
+	return command, nil
+}
+
+// markProtocolType builds the 4-byte header_length word: the current
+// codecType in the first byte, followed by the low 24 bits of source in
+// big-endian order.
+func markProtocolType(source int32) []byte {
+	return []byte{
+		codecType,
+		byte(source >> 16),
+		byte(source >> 8),
+		byte(source),
+	}
+}
+
+// Codec type bytes, stored in the high byte of a frame's header_length word.
+const (
+ // Json selects jsonCodec.
+ Json = byte(0)
+ // RocketMQ selects rmqCodec.
+ RocketMQ = byte(1)
+)
+
+// serializer converts between a remotingCommand and its serialized header.
+type serializer interface {
+ // encodeHeader returns the serialized header of command.
+ encodeHeader(command *remotingCommand) ([]byte, error)
+ // decodeHeader builds a command from serialized header bytes.
+ decodeHeader(data []byte) (*remotingCommand, error)
+}
+
+// jsonCodec serializes command headers as JSON; please refer to
+// remoting/protocol/RemotingSerializable.
+type jsonCodec struct{}
+
+// encodeHeader returns the JSON form of command's header fields.
+//
+// Body is left out: it travels as raw bytes after the header, and
+// marshalling it here would embed a second, base64-encoded copy of the
+// payload in every header.
+func (c *jsonCodec) encodeHeader(command *remotingCommand) ([]byte, error) {
+	if command == nil {
+		return json.Marshal(command)
+	}
+
+	// Shallow copy so the caller's command keeps its Body.
+	header := *command
+	header.Body = nil
+	return json.Marshal(&header)
+}
+
+// decodeHeader parses a JSON header into a new command. ExtFields starts out
+// as an empty map so it is usable even when the header omits it.
+func (c *jsonCodec) decodeHeader(header []byte) (*remotingCommand, error) {
+	command := &remotingCommand{
+		ExtFields: make(map[string]string),
+	}
+	if err := json.Unmarshal(header, command); err != nil {
+		return nil, err
+	}
+	return command, nil
+}
+
+// rmqCodec implementation of RocketMQ private protocol, please refer to remoting/protocol/RocketMQSerializable
+// RocketMQ Private Protocol Header format:
+//
+// v_flag: version flag
+// r_len: length of remark body
+// r_body: data of remark body
+// e_len: length of extends fields body
+// e_body: data of extends fields
+//
+// +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
+// + item | request_code | l_flag | v_flag | opaque | request_flag | r_len | r_body | e_len | e_body +
+// +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
+// + len | 2bytes | 1byte | 2bytes | 4bytes | 4 bytes | 4 bytes | r_len bytes | 4 bytes | e_len bytes +
+// +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
+const (
+ // headerFixedLength is the total size in bytes of the fixed-width fields of
+ // the private-protocol header: 2 (code) + 1 (language) + 2 (version) +
+ // 4 (opaque) + 4 (flag) + 4 (remark length) + 4 (ext fields length).
+ headerFixedLength = 21
+)
+
+// rmqCodec is the serializer for the RocketMQ private binary header format.
+type rmqCodec struct {}
+
+// encodeHeader serializes command's header in the RocketMQ private format,
+// big-endian throughout:
+//
+//	code (2) | language (1) | version (2) | opaque (4) | flag (4) |
+//	remark length (4) | remark | ext fields length (4) | ext fields
+//
+// Fields are written straight into an exactly-sized slice. Flag is narrowed
+// to 32 bits and Remark is copied as raw bytes, because encoding/binary
+// cannot write an int or a string.
+func (c *rmqCodec) encodeHeader(command *remotingCommand) ([]byte, error) {
+	extBytes, err := c.encodeMaps(command.ExtFields)
+	if err != nil {
+		return nil, err
+	}
+
+	remark := command.Remark
+	buf := make([]byte, headerFixedLength+len(remark)+len(extBytes))
+	pos := 0
+
+	// request code, length is 2 bytes
+	binary.BigEndian.PutUint16(buf[pos:], uint16(command.Code))
+	pos += 2
+
+	// language flag, length is 1 byte
+	buf[pos] = languageCode
+	pos++
+
+	// version flag, length is 2 bytes
+	binary.BigEndian.PutUint16(buf[pos:], uint16(command.Version))
+	pos += 2
+
+	// opaque is the request identifier, length is 4 bytes
+	binary.BigEndian.PutUint32(buf[pos:], uint32(command.Opaque))
+	pos += 4
+
+	// request flag, length is 4 bytes
+	binary.BigEndian.PutUint32(buf[pos:], uint32(command.Flag))
+	pos += 4
+
+	// remark length (4 bytes) followed by the remark itself
+	binary.BigEndian.PutUint32(buf[pos:], uint32(len(remark)))
+	pos += 4
+	pos += copy(buf[pos:], remark)
+
+	// ext fields length (4 bytes) followed by the encoded ext fields
+	binary.BigEndian.PutUint32(buf[pos:], uint32(len(extBytes)))
+	pos += 4
+	copy(buf[pos:], extBytes)
+
+	return buf, nil
+}
+
+// encodeMaps serializes maps as a sequence of entries, each laid out as
+// key length (2 bytes) | key | value length (4 bytes) | value, big-endian.
+// Entries follow Go's map iteration order. An empty or nil map yields an
+// empty, non-nil slice.
+func (c *rmqCodec) encodeMaps(maps map[string]string) ([]byte, error) {
+	if len(maps) == 0 {
+		return []byte{}, nil
+	}
+
+	var (
+		out    bytes.Buffer
+		keyLen [2]byte
+		valLen [4]byte
+	)
+	for k, v := range maps {
+		binary.BigEndian.PutUint16(keyLen[:], uint16(int16(len(k))))
+		out.Write(keyLen[:])
+		out.WriteString(k)
+
+		binary.BigEndian.PutUint32(valLen[:], uint32(int32(len(v))))
+		out.Write(valLen[:])
+		out.WriteString(v)
+	}
+	return out.Bytes(), nil
+}
+
+// decodeHeader parses a header in the RocketMQ private format (see
+// encodeHeader for the layout) into a new command.
+//
+// The language byte is read and discarded. A non-positive remark or ext
+// fields length means the field is absent. Every declared length is checked
+// against the bytes actually available before anything is allocated, and a
+// negative key or value length is reported as an error.
+func (c *rmqCodec) decodeHeader(data []byte) (*remotingCommand, error) {
+	// Fixed-width leading fields in wire order; binary.Read fills exported
+	// struct fields without padding. Flag is read as int32 because
+	// encoding/binary cannot read into an int.
+	var fixed struct {
+		Code      int16
+		Language  byte
+		Version   int16
+		Opaque    int32
+		Flag      int32
+		RemarkLen int32
+	}
+
+	buf := bytes.NewBuffer(data)
+	if err := binary.Read(buf, binary.BigEndian, &fixed); err != nil {
+		return nil, err
+	}
+
+	command := &remotingCommand{
+		Code:    fixed.Code,
+		Version: fixed.Version,
+		Opaque:  fixed.Opaque,
+		Flag:    int(fixed.Flag),
+	}
+
+	// String remark
+	if fixed.RemarkLen > 0 {
+		if int(fixed.RemarkLen) > buf.Len() {
+			return nil, errors.New("remark length exceeds header size")
+		}
+		command.Remark = string(buf.Next(int(fixed.RemarkLen)))
+	}
+
+	var extFieldsLen int32
+	if err := binary.Read(buf, binary.BigEndian, &extFieldsLen); err != nil {
+		return nil, err
+	}
+	if extFieldsLen <= 0 {
+		return command, nil
+	}
+	if int(extFieldsLen) > buf.Len() {
+		return nil, errors.New("ext fields length exceeds header size")
+	}
+
+	command.ExtFields = make(map[string]string)
+	ext := bytes.NewBuffer(buf.Next(int(extFieldsLen)))
+	for ext.Len() > 0 {
+		var kLen int16
+		if err := binary.Read(ext, binary.BigEndian, &kLen); err != nil {
+			return nil, err
+		}
+		if kLen < 0 {
+			return nil, errors.New("negative ext field key length")
+		}
+		key, err := getExtFieldsData(ext, int32(kLen))
+		if err != nil {
+			return nil, err
+		}
+
+		var vLen int32
+		if err := binary.Read(ext, binary.BigEndian, &vLen); err != nil {
+			return nil, err
+		}
+		if vLen < 0 {
+			return nil, errors.New("negative ext field value length")
+		}
+		value, err := getExtFieldsData(ext, vLen)
+		if err != nil {
+			return nil, err
+		}
+
+		command.ExtFields[key] = value
+	}
+
+	return command, nil
+}
+
+func getExtFieldsData(buff *bytes.Buffer, length int32) (string, error) {
+ var data = make([]byte, length)
+ err := binary.Read(buff, binary.BigEndian, &data)
+ if err != nil {
+ return "", err
+ }
+
+ return string(data), nil
+}
diff --git a/remote/codec_test.go b/remote/codec_test.go
new file mode 100644
index 0000000..35cf702
--- /dev/null
+++ b/remote/codec_test.go
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package remote
+
+import (
+ "testing"
+)
+
+// TestEncode is a placeholder: it has no body yet and therefore always passes.
+func TestEncode(t *testing.T) {
+}
+
+// TestDecode is a placeholder: it has no body yet and therefore always passes.
+func TestDecode(t *testing.T) {
+}
\ No newline at end of file
diff --git a/remote/processor.go b/remote/processor.go
new file mode 100644
index 0000000..2d0218e
--- /dev/null
+++ b/remote/processor.go
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package remote
+
+// ClientRequestProcessor is a function type that receives a remotingCommand
+// and returns a remotingCommand as the response.
+type ClientRequestProcessor func(remotingCommand *remotingCommand) (responseCommand *remotingCommand)
+
+// NOTE(review): the names listed below match request codes declared in
+// remote/request.go. Presumably they are the request codes a
+// ClientRequestProcessor is expected to handle; confirm before relying on it.
+//CHECK_TRANSACTION_STATE
+//NOTIFY_CONSUMER_IDS_CHANGED
+//RESET_CONSUMER_CLIENT_OFFSET
+//GET_CONSUMER_STATUS_FROM_CLIENT
+//GET_CONSUMER_RUNNING_INFO
+//CONSUME_MESSAGE_DIRECTLY
diff --git a/remote/request.go b/remote/request.go
new file mode 100644
index 0000000..c37eac5
--- /dev/null
+++ b/remote/request.go
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package remote
+
+// Request code constants of the remote package.
+//
+// NOTE(review): the names and values appear to mirror the request codes of the
+// Java RocketMQ client; confirm before changing any value or spelling.
+const (
+ SEND_MESSAGE = 10
+ PULL_MESSAGE = 11
+ QUERY_MESSAGE = 12
+ QUERY_BROKER_OFFSET = 13
+ QUERY_CONSUMER_OFFSET = 14
+ UPDATE_CONSUMER_OFFSET = 15
+ UPDATE_AND_CREATE_TOPIC = 17
+ GET_ALL_TOPIC_CONFIG = 21
+ GET_TOPIC_CONFIG_LIST = 22
+ GET_TOPIC_NAME_LIST = 23
+ UPDATE_BROKER_CONFIG = 25
+ GET_BROKER_CONFIG = 26
+ TRIGGER_DELETE_FILES = 27
+ GET_BROKER_RUNTIME_INFO = 28
+ SEARCH_OFFSET_BY_TIMESTAMP = 29
+ GET_MAX_OFFSET = 30
+ GET_MIN_OFFSET = 31
+ GET_EARLIEST_MSG_STORETIME = 32
+ VIEW_MESSAGE_BY_ID = 33
+ HEART_BEAT = 34
+ UNREGISTER_CLIENT = 35
+ CONSUMER_SEND_MSG_BACK = 36
+ END_TRANSACTION = 37
+ GET_CONSUMER_LIST_BY_GROUP = 38
+ CHECK_TRANSACTION_STATE = 39
+ NOTIFY_CONSUMER_IDS_CHANGED = 40
+ LOCK_BATCH_MQ = 41
+ UNLOCK_BATCH_MQ = 42
+ GET_ALL_CONSUMER_OFFSET = 43
+ GET_ALL_DELAY_OFFSET = 45
+ PUT_KV_CONFIG = 100
+ GET_KV_CONFIG = 101
+ DELETE_KV_CONFIG = 102
+ REGISTER_BROKER = 103
+ UNREGISTER_BROKER = 104
+ // NOTE(review): "ROUTEINTO" looks like a misspelling of "ROUTEINFO". The
+ // identifier is exported, so renaming it needs a coordinated change.
+ GET_ROUTEINTO_BY_TOPIC = 105
+ GET_BROKER_CLUSTER_INFO = 106
+ UPDATE_AND_CREATE_SUBSCRIPTIONGROUP = 200
+ GET_ALL_SUBSCRIPTIONGROUP_CONFIG = 201
+ GET_TOPIC_STATS_INFO = 202
+ GET_CONSUMER_CONNECTION_LIST = 203
+ GET_PRODUCER_CONNECTION_LIST = 204
+ WIPE_WRITE_PERM_OF_BROKER = 205
+
+ GET_ALL_TOPIC_LIST_FROM_NAMESERVER = 206
+ DELETE_SUBSCRIPTIONGROUP = 207
+ GET_CONSUME_STATS = 208
+ SUSPEND_CONSUMER = 209
+ RESUME_CONSUMER = 210
+ RESET_CONSUMER_OFFSET_IN_CONSUMER = 211
+ RESET_CONSUMER_OFFSET_IN_BROKER = 212
+ ADJUST_CONSUMER_THREAD_POOL = 213
+ WHO_CONSUME_THE_MESSAGE = 214
+
+ DELETE_TOPIC_IN_BROKER = 215
+ DELETE_TOPIC_IN_NAMESRV = 216
+ GET_KV_CONFIG_BY_VALUE = 217
+ DELETE_KV_CONFIG_BY_VALUE = 218
+ GET_KVLIST_BY_NAMESPACE = 219
+
+ RESET_CONSUMER_CLIENT_OFFSET = 220
+ GET_CONSUMER_STATUS_FROM_CLIENT = 221
+ INVOKE_BROKER_TO_RESET_OFFSET = 222
+ INVOKE_BROKER_TO_GET_CONSUMER_STATUS = 223
+
+ QUERY_TOPIC_CONSUME_BY_WHO = 300
+
+ // Listed out of numeric order: 224 follows 300 here.
+ GET_TOPICS_BY_CLUSTER = 224
+
+ REGISTER_FILTER_SERVER = 301
+ REGISTER_MESSAGE_FILTER_CLASS = 302
+ QUERY_CONSUME_TIME_SPAN = 303
+ GET_SYSTEM_TOPIC_LIST_FROM_NS = 304
+ GET_SYSTEM_TOPIC_LIST_FROM_BROKER = 305
+
+ CLEAN_EXPIRED_CONSUMEQUEUE = 306
+
+ GET_CONSUMER_RUNNING_INFO = 307
+
+ QUERY_CORRECTION_OFFSET = 308
+
+ CONSUME_MESSAGE_DIRECTLY = 309
+
+ SEND_MESSAGE_V2 = 310
+
+ GET_UNIT_TOPIC_LIST = 311
+ GET_HAS_UNIT_SUB_TOPIC_LIST = 312
+ GET_HAS_UNIT_SUB_UNUNIT_TOPIC_LIST = 313
+ CLONE_GROUP_OFFSET = 314
+
+ VIEW_BROKER_STATS_DATA = 315
+)
diff --git a/remote/response.go b/remote/response.go
new file mode 100644
index 0000000..7053e27
--- /dev/null
+++ b/remote/response.go
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package remote
+
+import "time"
+
+// Response code constants of the remote package.
+//
+// NOTE(review): the names and values appear to mirror the response codes of
+// the Java RocketMQ client (including the "UNKNOW" spelling); confirm before
+// changing any value or spelling.
+const (
+ SUCCESS = 0
+ SYSTEM_ERROR = 1
+ SYSTEM_BUSY = 2
+ REQUEST_CODE_NOT_SUPPORTED = 3
+ TRANSACTION_FAILED = 4
+ FLUSH_DISK_TIMEOUT = 10
+ SLAVE_NOT_AVAILABLE = 11
+ FLUSH_SLAVE_TIMEOUT = 12
+ MESSAGE_ILLEGAL = 13
+ SERVICE_NOT_AVAILABLE = 14
+ VERSION_NOT_SUPPORTED = 15
+ NO_PERMISSION = 16
+ TOPIC_NOT_EXIST = 17
+ TOPIC_EXIST_ALREADY = 18
+ PULL_NOT_FOUND = 19
+ PULL_RETRY_IMMEDIATELY = 20
+ PULL_OFFSET_MOVED = 21
+ QUERY_NOT_FOUND = 22
+ SUBSCRIPTION_PARSE_FAILED = 23
+ SUBSCRIPTION_NOT_EXIST = 24
+ SUBSCRIPTION_NOT_LATEST = 25
+ SUBSCRIPTION_GROUP_NOT_EXIST = 26
+ TRANSACTION_SHOULD_COMMIT = 200
+ TRANSACTION_SHOULD_ROLLBACK = 201
+ TRANSACTION_STATE_UNKNOW = 202
+ TRANSACTION_STATE_GROUP_WRONG = 203
+ NO_BUYER_ID = 204
+
+ NOT_IN_CURRENT_UNIT = 205
+
+ CONSUMER_NOT_ONLINE = 206
+
+ CONSUME_MSG_TIMEOUT = 207
+)
+
+// ResponseFuture groups the bookkeeping fields for a single request/response
+// exchange.
+//
+// Nothing in this file reads or writes these fields; the per-field notes are
+// inferred from names and types only. Confirm them against the code that uses
+// ResponseFuture.
+type ResponseFuture struct {
+ // ResponseCommand presumably holds the response once it has arrived.
+ ResponseCommand *remotingCommand
+ // SendRequestOK presumably records whether the request was written successfully.
+ SendRequestOK bool
+ // NOTE(review): "Rrr" looks like a typo for "Err". The field is exported,
+ // so renaming it needs a coordinated change.
+ Rrr error
+ // Opaque presumably matches the Opaque value of the originating command.
+ Opaque int32
+ // NOTE(review): typed as time.Duration although the name suggests a
+ // millisecond count; confirm which unit callers store here.
+ TimeoutMillis time.Duration
+ // callback is unexported, so it can only be set from inside this package.
+ callback func(*remotingCommand)
+ // BeginTimestamp is an int64; its unit is not visible here (TODO confirm).
+ BeginTimestamp int64
+ // Done presumably signals completion of the exchange (TODO confirm who sends).
+ Done chan bool
+}
diff --git a/remote/rpchook.go b/remote/rpchook.go
new file mode 100644
index 0000000..134e4be
--- /dev/null
+++ b/remote/rpchook.go
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package remote
+
+// RPCHook is implemented by types that want to be handed remoting commands at
+// two points of an exchange.
+//
+// NOTE(review): presumably DoBeforeRequest runs before a request is sent and
+// DoAfterResponse after its response has been received, with the string
+// argument identifying the remote address; confirm against the call sites.
+type RPCHook interface {
+ DoBeforeRequest(string, *remotingCommand)
+ DoAfterResponse(string, *remotingCommand)
+}
diff --git a/utils/ring_buffer.go b/utils/ring_buffer.go
new file mode 100644
index 0000000..ee60bcd
--- /dev/null
+++ b/utils/ring_buffer.go
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package utils
+
+import "sync"
+
+// RingBuffer is a byte buffer with separate read and write positions.
+//
+// The type is unfinished: in this file Write, Destroy and resize are stubs,
+// and no method takes rwMutex.
+type RingBuffer struct {
+ // buf is the backing storage, allocated by NewRingBuffer.
+ buf []byte
+ // writePos and readPos are offsets into buf; Size reports their difference.
+ writePos int
+ readPos int
+ // cap is the capacity requested from NewRingBuffer.
+ cap int
+ // rwMutex is declared but not yet used by any method in this file.
+ rwMutex sync.RWMutex
+ // exitCh is not initialised by NewRingBuffer and is not used in this file.
+ exitCh chan interface{}
+}
+
+// NewRingBuffer returns a RingBuffer backed by a freshly allocated slice of
+// cap bytes and launches its resize method on a separate goroutine.
+func NewRingBuffer(cap int) *RingBuffer {
+	ring := new(RingBuffer)
+	ring.buf = make([]byte, cap)
+	ring.cap = cap
+
+	go ring.resize()
+
+	return ring
+}
+
+// Write is not implemented yet: it ignores b, stores nothing and always
+// returns nil.
+func (r *RingBuffer) Write(b []byte) error {
+ // TODO
+ return nil
+}
+
+// Read fills p from the buffer when at least len(p) unread bytes are
+// available and returns the number of bytes copied.
+//
+// When fewer than len(p) bytes are buffered nothing is copied and Read
+// returns 0 with a nil error; blocking until data arrives is still to be
+// implemented. Read does not take rwMutex.
+//
+// Earlier versions always returned n == 0, even after copying data and
+// advancing the read position, so callers could not tell that p had been
+// filled.
+func (r *RingBuffer) Read(p []byte) (n int, err error) {
+	if r.Size() < len(p) {
+		// TODO waiting data...
+		return 0, nil
+	}
+
+	n = copy(p, r.buf[r.readPos:r.readPos+len(p)])
+	r.readPos += n
+	return n, nil
+}
+
+// Size reports the distance between the write position and the read position.
+func (r *RingBuffer) Size() int {
+	unread := r.writePos - r.readPos
+	return unread
+}
+
+// Destroy is not implemented yet; it currently does nothing.
+func (r *RingBuffer) Destroy() {
+
+}
+
+// resize is not implemented yet. NewRingBuffer starts it on its own goroutine,
+// where it returns immediately.
+func (r *RingBuffer) resize() {
+ // TODO
+}
\ No newline at end of file