You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by di...@apache.org on 2019/02/18 02:09:06 UTC

[rocketmq-client-go] branch native updated: Adding partial implementation of remote in native and docs (#29)

This is an automated email from the ASF dual-hosted git repository.

dinglei pushed a commit to branch native
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-go.git


The following commit(s) were added to refs/heads/native by this push:
     new 26011d0  Adding partial implementation of remote in native and docs  (#29)
26011d0 is described below

commit 26011d04ffdef49c5aa98e0094ecd9406023dcf1
Author: wenfeng <sx...@gmail.com>
AuthorDate: Mon Feb 18 10:09:02 2019 +0800

    Adding partial implementation of remote in native and docs  (#29)
    
    * Add implementation of RocketMQ protocol in pure go
    
    * Add design & protocol docs
---
 common/manager.go               |   1 +
 common/route.go                 |   1 +
 core/producer.go                |   6 +-
 core/push_consumer.go           |   2 +-
 {doc => docs}/Introduction.md   |   0
 docs/client-design.gliffy       |   1 +
 docs/client-design.png          | Bin 0 -> 30513 bytes
 docs/feature.md                 |  93 +++++++++
 docs/zh/native-design_zh.md     | 102 ++++++++++
 docs/zh/rocketmq-protocol_zh.md | 112 +++++++++++
 remote/client.go                | 252 +++++++++++++++++++++++
 remote/client_test.go           |  17 ++
 remote/codec.go                 | 431 ++++++++++++++++++++++++++++++++++++++++
 remote/codec_test.go            |  27 +++
 remote/processor.go             |  26 +++
 remote/request.go               | 111 +++++++++++
 remote/response.go              |  66 ++++++
 remote/rpchook.go               |  23 +++
 utils/ring_buffer.go            |  64 ++++++
 19 files changed, 1331 insertions(+), 4 deletions(-)

diff --git a/common/manager.go b/common/manager.go
new file mode 100644
index 0000000..805d0c7
--- /dev/null
+++ b/common/manager.go
@@ -0,0 +1 @@
+package common
diff --git a/common/route.go b/common/route.go
new file mode 100644
index 0000000..805d0c7
--- /dev/null
+++ b/common/route.go
@@ -0,0 +1 @@
+package common
diff --git a/core/producer.go b/core/producer.go
index fe2d978..7df57e6 100644
--- a/core/producer.go
+++ b/core/producer.go
@@ -235,12 +235,12 @@ func (p *defaultProducer) SendMessageOrderly(msg *Message, selector MessageQueue
 		unsafe.Pointer(&key),
 		C.int(autoRetryTimes),
 		&sr))
-	
+
 	if err != NIL {
 		log.Warnf("send message orderly error, error is: %s", err.Error())
 		return nil, err
 	}
-	
+
 	return &SendResult{
 		Status: SendStatus(sr.sendStatus),
 		MsgId:  C.GoString(&sr.msgId[0]),
@@ -257,7 +257,7 @@ func (p *defaultProducer) SendMessageOneway(msg *Message) error {
 		log.Warnf("send message with oneway error, error is: %s", err.Error())
 		return err
 	}
-	
+
 	log.Debugf("Send Message: %s with oneway success.", msg.String())
 	return nil
 }
diff --git a/core/push_consumer.go b/core/push_consumer.go
index 90e1454..c222587 100644
--- a/core/push_consumer.go
+++ b/core/push_consumer.go
@@ -32,11 +32,11 @@ int callback_cgo(CPushConsumer *consumer, CMessageExt *msg) {
 import "C"
 
 import (
+	"errors"
 	"fmt"
 	log "github.com/sirupsen/logrus"
 	"sync"
 	"unsafe"
-	"errors"
 )
 
 type ConsumeStatus int
diff --git a/doc/Introduction.md b/docs/Introduction.md
similarity index 100%
rename from doc/Introduction.md
rename to docs/Introduction.md
diff --git a/docs/client-design.gliffy b/docs/client-design.gliffy
new file mode 100644
index 0000000..418f4d8
--- /dev/null
+++ b/docs/client-design.gliffy
@@ -0,0 +1 @@
+{"contentType":"application/gliffy+json","version":"1.1","metadata":{"title":"untitled","revision":0,"exportBorder":false},"embeddedResources":{"index":0,"resources":[]},"stage":{"objects":[{"x":31,"y":44,"rotation":0,"id":57,"uid":"com.gliffy.shape.basic.basic_v1.default.text","width":150,"height":27,"lockAspectRatio":false,"lockShape":false,"order":57,"graphic":{"type":"Text","Text":{"tid":null,"valign":"middle","overflow":"none","vposition":"none","hposition":"none","html":"<p style=\ [...]
\ No newline at end of file
diff --git a/docs/client-design.png b/docs/client-design.png
new file mode 100644
index 0000000..97b86ef
Binary files /dev/null and b/docs/client-design.png differ
diff --git a/docs/feature.md b/docs/feature.md
new file mode 100644
index 0000000..bc23be2
--- /dev/null
+++ b/docs/feature.md
@@ -0,0 +1,93 @@
+# Feature
+
+## Producer
+- [ ] ProducerType
+    - [ ] DefaultProducer
+    - [ ] TransactionProducer
+- [ ] API
+    - [ ] Send
+        - [ ] Sync
+        - [ ] Async
+        - [ ] OneWay
+- [ ] Other
+    - [ ] DelayMessage
+    - [ ] Config
+    - [ ] MessageId Generate
+    - [ ] CompressMsg
+    - [ ] TimeOut
+    - [ ] LoadBalance
+    - [ ] DefaultTopic
+    - [ ] VipChannel
+    - [ ] Retry
+    - [ ] SendMessageHook
+    - [ ] CheckRequestQueue
+    - [ ] CheckForbiddenHookList
+    - [ ] MQFaultStrategy
+
+## Consumer
+- [ ] ConsumerType
+    - [ ] PushConsumer
+    - [ ] PullConsumer
+- [ ] MessageListener
+    - [ ] Concurrently
+    - [ ] Orderly
+- [ ] MessageModel
+    - [ ] CLUSTERING
+    - [ ] BROADCASTING
+- [ ] OffsetStore
+    - [ ] RemoteBrokerOffsetStore
+        - [ ] many actions
+    - [ ] LocalFileOffsetStore
+- [ ] RebalanceService
+- [ ] PullMessageService
+- [ ] ConsumeMessageService
+- [ ] AllocateMessageQueueStrategy
+    - [ ] AllocateMessageQueueAveragely
+    - [ ] AllocateMessageQueueAveragelyByCircle
+    - [ ] AllocateMessageQueueByConfig
+    - [ ] AllocateMessageQueueByMachineRoom
+- [ ] Other
+    - [ ] Config
+    - [ ] ZIP
+    - [ ] AllocateMessageQueueStrategy
+    - [ ] ConsumeFromWhere
+        - [ ] CONSUME_FROM_LAST_OFFSET
+        - [ ] CONSUME_FROM_FIRST_OFFSET
+        - [ ] CONSUME_FROM_TIMESTAMP
+    - [ ] Retry(sendMessageBack)
+    - [ ] TimeOut(clearExpiredMessage)
+    - [ ] ACK(partSuccess)
+    - [ ] FlowControl(messageCanNotConsume)
+    - [ ] ConsumeMessageHook
+    - [ ] filterMessageHookList
+
+## Manager
+- [ ] Multiple Request API Wrapper
+    - many functions...
+- [ ] Task
+    - [ ] PollNameServer
+    - [ ] Heartbeat
+    - [ ] UpdateTopicRouteInfoFromNameServer
+    - [ ] CleanOfflineBroker
+    - [ ] PersistAllConsumerOffset
+    - [ ] ClearExpiredMessage(form consumer consumeMessageService)
+- [ ] Processor
+    - [ ] CHECK_TRANSACTION_STATE
+    - [ ] NOTIFY_CONSUMER_IDS_CHANGED
+    - [ ] RESET_CONSUMER_CLIENT_OFFSET
+    - [ ] GET_CONSUMER_STATUS_FROM_CLIENT
+    - [ ] GET_CONSUMER_RUNNING_INFO
+    - [ ] CONSUME_MESSAGE_DIRECTLY
+    
+## Remoting
+- [ ] API
+    - [ ] InvokeSync
+    - [ ] InvokeAsync
+    - [ ] InvokeOneWay
+- [ ] Serialize
+    - [ ] JSON
+    - [ ] ROCKETMQ
+- [ ] Other
+    - [ ] VIPChannel
+    - [ ] RPCHook
+    
\ No newline at end of file
diff --git a/docs/zh/native-design_zh.md b/docs/zh/native-design_zh.md
new file mode 100644
index 0000000..c1745e0
--- /dev/null
+++ b/docs/zh/native-design_zh.md
@@ -0,0 +1,102 @@
+# RocketMQ Go Client Design Draft
+
+## Architecture
+
+### Overview
+![client-design](client-design.png)
+
+### Description
+在RocketMQ Java Client的实现里面,代码耦合了大量的admin方面的功能, 其为了尽可能的提高代码复用率,代码的依赖关系较为复杂、接口的设计比
+较重、语义的界限不够清晰。因此,为了避免简单的对Java代码进行翻译,故Go客户端进行了重新的设计,剥离了admin相关的逻辑。整体如上面的图所示,在逻辑层次上,按照请求
+顺序,从下到上总共分为三层:
+- remote层:client网络通信和私有协议层,将到每个到服务端(NameServer或Broker)的连接实体抽象为一个client结构,并在这里实现了网络数据的
+序列/反序列化。
+- 公共层:由于remote层对上层只暴露了`Sync/Async/Oneway`三个接口,所以对于特定的Request/Response、连接的管理、路由等信息的处理和维护、
+其它`producer/consumer`共用的逻辑,均在这里实现。
+- 业务逻辑层:在这里实现各种producer、consumer的语义。所有producer、consumer专有的逻辑,均放到这里实现,如`queue selector`,
+`consume balance`, `offset storage`等。除了基础的数据结构外,producer和consumer之间内部的代码不能进行复用。
+
+
+## 设计目标
+0. 兼容cgo版本已经暴露出去的API
+1. 实现语义清晰、轻量的API接口
+2. 依赖关系清晰简单,设计和实现要正交
+
+## 目录结构
+### 源码
+- producer:producer相关逻辑
+- consumer:consumer相关逻辑
+- common(可改名):连接管理和路由管理相关的通用逻辑
+- remote:网络通信和序列化
+
+### 其它
+- benchmark:压力测试相关代码
+- core:1.2版本cgo的代码库,该目录下的代码将会被移除,只保留API进行兼容,并会被标记为`Deprecated`
+- docs:文档,包括面向用户和开发者
+- examples:示例代码
+- test:集成测试代码
+
+## API
+
+### remote
+```go
+// send a request to servers and return until response received.
+InvokeSync(request *remotingCommand) (*remotingCommand, error)
+
+// send a request to servers, process response with async
+InvokeAsync(request *remotingCommand, f func(*remotingCommand)) error
+
+// send request only, no response will be returned by servers.
+InvokeOneWay(request *remotingCommand) error
+```
+
+### common
+```go
+// for producer
+SendMessageSync(route *TopicRouteInfo, msg *Message) error
+SendMessageAsync(route *TopicRouteInfo, msg *Message, f func(result *SendResult)) error
+
+SendMessageSyncBatch(route *TopicRouteInfo, msg []*Message) error
+SendMessageAsyncBatch(route *TopicRouteInfo, msg []*Message, f func(result *SendResult)) error
+
+// for consumer
+PullMessageSync(route *TopicRouteInfo, request *PullMessageRequest) error
+PullMessageAsync(route *TopicRouteInfo, request *PullMessageRequest, f func(result *PullResult)) error
+
+// for offset
+TODO
+```
+
+## Road map
+for more details about features: [feature-list](feature.md)
+
+### Milestone1(due: 2019.3.10)
+
+#### producer
+- [ ] normal message
+- [ ] order message
+
+#### consumer
+- [ ] normal message with pull/push
+- [ ] order message with pull/push
+- [ ] rebalance
+- [ ] offset manager
+
+#### common
+- [ ] API wrapper
+- [ ] connections manager
+- [ ] route
+
+#### remote
+- [ ] serializer
+- [ ] communication
+- [ ] processor
+- [ ] RPC
+
+### Milestone2 (2019.4.12)
+- Transaction Message
+- ACL
+- Message Tracing
+
+## sub project
+- RocketMQ Administration tools: JVM too heavy for command line tools
\ No newline at end of file
diff --git a/docs/zh/rocketmq-protocol_zh.md b/docs/zh/rocketmq-protocol_zh.md
new file mode 100644
index 0000000..9d741ce
--- /dev/null
+++ b/docs/zh/rocketmq-protocol_zh.md
@@ -0,0 +1,112 @@
+# RocketMQ 通信协议
+
+在RocketMQ中,`RemotingCommand`是RocketMQ通信的基本对象,Request/Response最后均被包装成`RemotingCommand`。一个`RemotingCommand`
+在被序列化后的格式如下:
+```go
+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
++ frame_size | header_length |         header_body        |     body     +
+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
++   4bytes   |     4bytes    | (19 + r_len + e_len) bytes | remain bytes +
+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
+```
+|item|type|description|
+|:-:|:-:|:-:|
+|frame_size|`int32`|一个`RemotingCommand`数据包大小|
+|header_length|`in32`|高8位表示数据的序列化方式,余下的表示真实header长度|
+|header_body|`[]byte`|header的playload,长度由附带的`remark`和`properties`决定|
+|body|`[]byte`|具体Request/Response的playload|
+
+## Header
+RocketMQ的Header序列化方式有两种:JSON和RocketMQ私有的序列化方式。JSON序列化方式不再赘述。具体可以参考Java`RemotingCommand`类。
+主要介绍RocketMQ的私有序列化方式。
+
+在序列化的时候,需要将序列化方式记录进数据包里面,即对`header_length`进行编码
+
+```go
+// 编码算法
+
+// 编码后的header_length
+var header_length int32
+
+// 实际的header长度
+var headerDataLen int32
+
+// 序列化方式
+var SerializedType byte
+
+result := make([]byte, 4)
+result[0]|SerializedType
+result[1]|byte((headerDataLen >> 16) & 0xFF)
+result[2]|byte((headerDataLen >> 8) & 0xFF)
+result[3]|byte(headerDataLen & 0xFF)
+binary.Read(result, binary.BigEndian, &header_length)
+
+
+// 解码算法
+headerDataLen := header_length & 0xFFFFFF
+SerializedType := byte((header_length >> 24) & 0xFF)
+```
+
+### Header Frame
+
+```
++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
++  request_code | l_flag | v_flag | opaque | request_flag |  r_len  |   r_body    |  e_len  |    e_body   +
++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
++     2bytes    |  1byte | 2bytes | 4bytes |    4 bytes   | 4 bytes | r_len bytes | 4 bytes | e_len bytes +
++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
+```
+
+|item|type|description|
+|:-:|:-:|:-:|
+|request_code|`short`|哪一种Request或ResponseCode,具体类别由request_flag决定|
+|l_flag|`byte`|language位,用来标识Request来源方的开发语言|
+|v_flag|`int16`|版本标记位|
+|request_flag|`int32`|Header标记位,用来标记该`RemotingCommand`的类型和请求方式|
+|opaque|`int32`|标识Request/Response的RequestID,Broker返回的Response通过该值和Client缓存的Request一一对应|
+|r_len|`int32`| length of remark, remark是Request/Response的附带说明信息,一般在Response中用来说明具体的错误原因|
+|r_body|`[]byte`| playload of remark |
+|e_len|`int32`| length of extended fields,即properties,一些非标准字段会存储在这里,在RocketMQ的各种feature中均有广泛应用|
+|e_body|`int32`| playload of extended fields |
+
+## Body
+`body`是具体的Request/Response的数据,在RocketMQ中,有许多种Request/Response。每个类有自己的序列化和反序列方式,由于种类过多,
+这里就不再展开。可以具体参考Java代码中对`CommandCustomHeader`的使用。下面列一些Client使用到的Request和Response
+
+### RequestCode
+|item|type|description|
+|:-:|:-:|:-:|
+|SEND_MESSAGE|10| 向broker发送消息|
+|PULL_MESSAGE|11| 从broker拉取消息,client的push模式也是通过pull的长轮询来实现的|
+|TODO...|||
+
+### ResponseCode
+|item|type|description|
+|:-:|:-:|:-:|
+|FLUSH_DISK_TIMEOUT|10|broker存储层刷盘超时|
+|SLAVE_NOT_AVAILABLE|11|slave节点无法服务|
+|FLUSH_SLAVE_TIMEOUT|12|数据同步到slave超时|
+|MESSAGE_ILLEGAL|13|消息格式不合格|
+|SERVICE_NOT_AVAILABLE|14|broker暂时不可用|
+|VERSION_NOT_SUPPORTED|15|不支持的请求,目前没有看到使用|
+|NO_PERMISSION|16|对broker、topic或subscription无访问权限|
+|TOPIC_EXIST_ALREADY|18|topic已存在,目前没看到使用|
+|PULL_NOT_FOUND|19|没拉到消息,大多为offset错误|
+|PULL_RETRY_IMMEDIATELY|20|建议client立即重新拉取消息|
+|PULL_OFFSET_MOVED|21|offset太小或太大|
+|QUERY_NOT_FOUND|22|管理面Response,TODO|
+|SUBSCRIPTION_PARSE_FAILED|23|订阅数据解析失败|
+|SUBSCRIPTION_NOT_EXIST|24|订阅不存在|
+|SUBSCRIPTION_NOT_LATEST|25|订阅数据版本和request数据版本不匹配|
+|SUBSCRIPTION_GROUP_NOT_EXIST|26|订阅组不存在|
+|FILTER_DATA_NOT_EXIST|27|filter数据不存在|
+|FILTER_DATA_NOT_LATEST|28|filter数据版本和request数据版本不匹配|
+|TRANSACTION_SHOULD_COMMIT|200|事务Response,TODO|
+|TRANSACTION_SHOULD_ROLLBACK|201|事务Response,TODO|
+|TRANSACTION_STATE_UNKNOW|202|事务Response,TODO||
+|TRANSACTION_STATE_GROUP_WRONG|203|事务Response,TODO|
+|NO_BUYER_ID|204|不知道是什么,没看到broker端在使用|
+|NOT_IN_CURRENT_UNIT|205|不知道是什么,没看到broker端在使用|
+|CONSUMER_NOT_ONLINE|206|consumer不在线,控制面response|
+|CONSUME_MSG_TIMEOUT|207|client request等待broker相应超时|
+|NO_MESSAGE|208|控制面response,由client自己设置,不清楚具体用途|
\ No newline at end of file
diff --git a/remote/client.go b/remote/client.go
new file mode 100644
index 0000000..058ccdd
--- /dev/null
+++ b/remote/client.go
@@ -0,0 +1,252 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package remote
+
+import (
+	"encoding/binary"
+	"errors"
+	"github.com/apache/rocketmq-client-go/utils"
+	log "github.com/sirupsen/logrus"
+	"net"
+	"sync"
+	"time"
+)
+
+var (
+	ErrRequestTimeout = errors.New("request timeout")
+)
+
+type RemotingClient interface {
+	InvokeSync(request *remotingCommand) (*remotingCommand, error)
+	InvokeAsync(request *remotingCommand, f func(*remotingCommand)) error
+	InvokeOneWay(request *remotingCommand) error
+}
+
+// ClientConfig common config
+type ClientConfig struct {
+	// NameServer or Broker address
+	RemotingAddress string
+
+	ClientIP     string
+	InstanceName string
+
+	// Heartbeat interval in microseconds with message broker, default is 30
+	HeartbeatBrokerInterval time.Duration
+
+	// request timeout time
+	RequestTimeout time.Duration
+	CType byte
+
+	UnitMode          bool
+	UnitName          string
+	VipChannelEnabled bool
+}
+
+type defaultClient struct {
+	//clientId     string
+	config ClientConfig
+	conn net.Conn
+	// requestId
+	opaque int32
+
+	// int32 -> ResponseFuture
+	responseTable sync.Map
+	codec         serializer
+	exitCh chan interface{}
+}
+
+func NewRemotingClient(config ClientConfig) (RemotingClient, error) {
+	client := &defaultClient{
+		config: config,
+	}
+
+	switch config.CType {
+	case Json:
+		client.codec = &jsonCodec{}
+	case RocketMQ:
+		client.codec = &rmqCodec{}
+	default:
+		return nil, errors.New("unknow codec")
+	}
+
+	conn, err := net.Dial("tcp", config.RemotingAddress)
+	if err != nil {
+		log.Error(err)
+		return nil, err
+	}
+	client.conn = conn
+	go client.listen()
+	go client.clearExpiredRequest()
+	return client, nil
+}
+
+func (client *defaultClient) InvokeSync(request *remotingCommand) (*remotingCommand, error) {
+
+	response := &ResponseFuture{
+		SendRequestOK:  false,
+		Opaque:         request.Opaque,
+		TimeoutMillis:  client.config.RequestTimeout,
+		BeginTimestamp: time.Now().Unix(),
+		Done:           make(chan bool),
+	}
+	header, err := encode(request)
+	body := request.Body
+	client.responseTable.Store(request.Opaque, response)
+	err = client.doRequest(header, body)
+
+	if err != nil {
+		log.Error(err)
+		return nil, err
+	}
+	select {
+	case <-response.Done:
+		rmd := response.ResponseCommand
+		return rmd, nil
+	case <-time.After(client.config.RequestTimeout):
+		return nil, ErrRequestTimeout
+	}
+}
+
+func (client *defaultClient) InvokeAsync(request *remotingCommand, f func(*remotingCommand)) error {
+
+	response := &ResponseFuture{
+		SendRequestOK:  false,
+		Opaque:         request.Opaque,
+		TimeoutMillis:  client.config.RequestTimeout,
+		BeginTimestamp: time.Now().Unix(),
+		callback:       f,
+	}
+	client.responseTable.Store(request.Opaque, response)
+	header, err := encode(request)
+	if err != nil {
+		return err
+	}
+
+	body := request.Body
+	return client.doRequest(header, body)
+}
+
+func (client *defaultClient) InvokeOneWay(request *remotingCommand) error {
+	header, err := encode(request)
+	if err != nil {
+		return err
+	}
+
+	body := request.Body
+	return client.doRequest(header, body)
+}
+
+func (client *defaultClient) doRequest(header, body []byte) error {
+	var requestBytes []byte
+	requestBytes = append(requestBytes, header...)
+	if body != nil && len(body) > 0 {
+		requestBytes = append(requestBytes, body...)
+	}
+	_, err := client.conn.Write(requestBytes)
+	return err
+}
+
+func (client *defaultClient) close() {
+	// TODO process response
+	client.conn.Close()
+}
+
+func (client *defaultClient) listen() {
+	rb := utils.NewRingBuffer(4096)
+
+	var frameSize int32
+	go func() {
+		for {
+			err := binary.Read(rb, binary.BigEndian, &frameSize)
+			if err != nil {
+				 // TODO
+			}
+			data := make([]byte, frameSize)
+
+			_, err = rb.Read(data)
+
+			if err != nil {
+				// TODO
+			}
+
+			cmd, err := decode(data)
+			if cmd.isResponseType() {
+				client.handleResponse(cmd)
+			}  else {
+				client.handleRequestFromServer(cmd)
+			}
+		}
+	}()
+
+	buf := make([]byte, 4096)
+	for {
+		n, err := client.conn.Read(buf)
+		if err != nil {
+			log.Errorf("read data from connection errors: %v", err)
+			return
+		}
+		err = rb.Write(buf[:n])
+		if err != nil {
+			// just log
+			log.Errorf("write data to buffer errors: %v", err)
+		}
+
+	}
+}
+
+func (client *defaultClient) handleRequestFromServer(cmd *remotingCommand) {
+	//responseCommand := client.clientRequestProcessor(cmd)
+	//if responseCommand == nil {
+	//	return
+	//}
+	//responseCommand.Opaque = cmd.Opaque
+	//responseCommand.markResponseType()
+	//header, err := encode(responseCommand)
+	//body := responseCommand.Body
+	//err = client.doRequest(header, body)
+	//if err != nil {
+	//	log.Error(err)
+	//}
+}
+
+func (client *defaultClient) handleResponse(cmd *remotingCommand) error {
+	//response, err := client.getResponse(cmd.Opaque)
+	////client.removeResponse(cmd.Opaque)
+	//if err != nil {
+	//	return err
+	//}
+	//
+	//response.ResponseCommand = cmd
+	//response.callback(cmd)
+	//
+	//if response.Done != nil {
+	//	response.Done <- true
+	//}
+	return nil
+}
+
+func (client *defaultClient) clearExpiredRequest() {
+	//for seq, responseObj := range client.responseTable.Items() {
+	//	response := responseObj.(*ResponseFuture)
+	//	if (response.BeginTimestamp + 30) <= time.Now().Unix() {
+	//		//30 minutes expired
+	//		client.responseTable.Remove(seq)
+	//		response.callback(nil)
+	//		log.Warningf("remove time out request %v", response)
+	//	}
+	//}
+}
diff --git a/remote/client_test.go b/remote/client_test.go
new file mode 100644
index 0000000..7909612
--- /dev/null
+++ b/remote/client_test.go
@@ -0,0 +1,17 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package remote
diff --git a/remote/codec.go b/remote/codec.go
new file mode 100644
index 0000000..0a62642
--- /dev/null
+++ b/remote/codec.go
@@ -0,0 +1,431 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package remote
+
+import (
+	"bytes"
+	"encoding/binary"
+	"encoding/json"
+	"sync/atomic"
+)
+
+var opaque int32
+
+const (
+	// 0, REQUEST_COMMAND
+	RpcType = 0
+
+	// 1, RPC
+	RpcOneWay = 1
+
+	ResponseType = 1
+
+	RemotingCommandFlag    = 0
+	languageFlag           = "golang"
+	languageCode = byte(9)
+	remotingCommandVersion = 137
+)
+
+type remotingCommand struct {
+	Code      int16             `json:"code"`
+	Language  string            `json:"language"`
+	Version   int16             `json:"version"`
+	Opaque    int32             `json:"opaque"`
+	Flag      int               `json:"flag"`
+	Remark    string            `json:"remark"`
+	ExtFields map[string]string `json:"extFields"`
+	Body      []byte            `json:"body,omitempty"`
+}
+
+func newRemotingCommand(code int16, properties map[string]string, body []byte) *remotingCommand {
+	remotingCommand := &remotingCommand{
+		Code:      code,
+		Language:  languageFlag,
+		Version:   remotingCommandVersion,
+		Opaque:    atomic.AddInt32(&opaque, 1),
+		ExtFields: properties,
+		Body:      body,
+	}
+
+	return remotingCommand
+}
+
+func (command *remotingCommand) isResponseType() bool {
+	return command.Flag&(ResponseType) == ResponseType
+}
+
+func (command *remotingCommand) markResponseType() {
+	command.Flag = command.Flag | ResponseType
+}
+
+var (
+	jsonSerializer = &jsonCodec{}
+	rocketMqSerializer = &rmqCodec{}
+	codecType byte
+)
+
+// encode remotingCommand
+//
+// Frame format:
+// +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
+// + item | frame_size | header_length |         header_body        |     body     +
+// +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
+// + len  |   4bytes   |     4bytes    | (19 + r_len + e_len) bytes | remain bytes +
+// +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
+func encode(command *remotingCommand) ([]byte, error) {
+	var (
+		header []byte
+		err error
+	)
+
+	switch codecType {
+	case Json:
+		header, err  = jsonSerializer.encodeHeader(command)
+	case RocketMQ:
+		header, err  = rocketMqSerializer.encodeHeader(command)
+	}
+
+	if err != nil {
+		return nil, err
+	}
+
+	frameSize := 8 + len(header) + len(command.Body)
+	buf := bytes.NewBuffer(make([]byte, frameSize))
+
+	err = binary.Write(buf, binary.BigEndian, int32(frameSize))
+	if err != nil {
+		return nil, err
+	}
+
+	err = binary.Write(buf, binary.BigEndian, markProtocolType(int32(len(header))))
+	if err != nil {
+		return nil, err
+	}
+
+	err = binary.Write(buf, binary.BigEndian, int32(len(command.Body)))
+	if err != nil {
+		return nil, err
+	}
+
+	return buf.Bytes(), nil
+}
+
+func decode(data []byte) (*remotingCommand, error) {
+	buf := bytes.NewBuffer(data)
+
+	var oriHeaderLen, headerLength int32
+	err := binary.Read(buf, binary.BigEndian, &oriHeaderLen)
+	if err != nil {
+		return nil, err
+	}
+	headerLength = oriHeaderLen & 0xFFFFFF
+
+	headerData := make([]byte, headerLength)
+	err = binary.Read(buf, binary.BigEndian, &headerLength)
+	if err != nil {
+		return nil, err
+	}
+
+	var command *remotingCommand
+
+	switch byte((oriHeaderLen >> 24) & 0xFF) {
+	case Json:
+		command, err = jsonSerializer.decodeHeader(headerData)
+	case RocketMQ:
+		command, err = rocketMqSerializer.decodeHeader(headerData)
+	}
+	if err != nil {
+		return nil, err
+	}
+
+	bodyLength := int32(len(data)) - 4 - headerLength
+	command.Body = make([]byte, bodyLength)
+	err = binary.Read(buf, binary.BigEndian, &command.Body)
+	if err != nil {
+		return nil, err
+	}
+
+	return command, nil
+}
+
+func markProtocolType(source int32) []byte {
+	result := make([]byte, 4)
+	result[0] = codecType
+	result[1] = byte((source >> 16) & 0xFF)
+	result[2] = byte((source >> 8) & 0xFF)
+	result[3] = byte(source & 0xFF)
+	return result
+}
+
+const (
+	Json  = byte(0)
+	RocketMQ  = byte(1)
+)
+
+type serializer interface {
+	encodeHeader(command *remotingCommand) ([]byte, error)
+	decodeHeader(data []byte) (*remotingCommand, error)
+}
+
+// jsonCodec please refer to remoting/protocol/RemotingSerializable
+type jsonCodec struct {}
+
+func (c *jsonCodec) encodeHeader(command *remotingCommand) ([]byte, error) {
+	buf, err := json.Marshal(command)
+	if err != nil {
+		return nil, err
+	}
+	return buf, nil
+}
+
+func (c *jsonCodec) decodeHeader(header []byte) (*remotingCommand, error) {
+	command := &remotingCommand{}
+	command.ExtFields = make(map[string]string)
+	err := json.Unmarshal(header, command)
+	if err != nil {
+		return nil, err
+	}
+	return command, nil
+}
+
+// rmqCodec implementation of RocketMQ private protocol, please refer to remoting/protocol/RocketMQSerializable
+// RocketMQ Private Protocol Header format:
+//
+// v_flag: version flag
+// r_len: length of remark body
+// r_body: data of remark body
+// e_len: length of extends fields body
+// e_body: data of extends fields
+//
+// +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
+// + item | request_code | l_flag | v_flag | opaque | request_flag |  r_len  |   r_body    |  e_len  |    e_body   +
+// +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
+// + len  |    2bytes    |  1byte | 2bytes | 4bytes |    4 bytes   | 4 bytes | r_len bytes | 4 bytes | e_len bytes +
+// +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
+const (
+	// header + body length
+	headerFixedLength = 21
+)
+
+type rmqCodec struct {}
+
+// encodeHeader
+func (c *rmqCodec) encodeHeader(command *remotingCommand)  ([]byte, error) {
+	extBytes, err := c.encodeMaps(command.ExtFields)
+	if err != nil {
+		return nil, err
+	}
+
+	buf := bytes.NewBuffer(make([]byte, headerFixedLength + len(command.Remark) + len(extBytes)))
+
+	// request code, length is 2 bytes
+	err = binary.Write(buf, binary.BigEndian, command.Code)
+	if err != nil {
+		return nil, err
+	}
+
+	// language flag, length is 1 byte
+	err = binary.Write(buf, binary.BigEndian, languageCode)
+	if err != nil {
+		return nil, err
+	}
+
+	// version flag, length is 2 bytes
+	err = binary.Write(buf, binary.BigEndian, command.Version)
+	if err != nil {
+		return nil, err
+	}
+
+	// opaque flag, opaque is request identifier, length is 4 bytes
+	err = binary.Write(buf, binary.BigEndian, command.Opaque)
+	if err != nil {
+		return nil, err
+	}
+
+	// request flag, length is 4 bytes
+	err = binary.Write(buf, binary.BigEndian, command.Flag)
+	if err != nil {
+		return nil, err
+	}
+
+	// remark length flag, length is 4 bytes
+	err = binary.Write(buf, binary.BigEndian, int32(len(command.Remark)))
+	if err != nil {
+		return nil, err
+	}
+
+	// write remark, len(command.Remark) bytes
+	if len(command.Remark) > 0 {
+		err = binary.Write(buf, binary.BigEndian, command.Remark)
+		if err != nil {
+			return nil, err
+		}
+	}
+
+	err = binary.Write(buf, binary.BigEndian, int32(len(extBytes)))
+	if err != nil {
+		return nil, err
+	}
+
+	if len(extBytes) > 0 {
+		err = binary.Write(buf, binary.BigEndian, extBytes)
+		if err != nil {
+			return nil, err
+		}
+	}
+
+	return buf.Bytes(), nil
+}
+
+func (c *rmqCodec) encodeMaps(maps map[string]string) ([]byte, error) {
+	if maps == nil || len(maps) == 0 {
+		return []byte{}, nil
+	}
+	extFieldsBuf := bytes.NewBuffer([]byte{})
+	var err error
+	for key, value := range maps {
+		err = binary.Write(extFieldsBuf, binary.BigEndian, int16(len(key)))
+		if err != nil {
+			return nil, err
+		}
+		err = binary.Write(extFieldsBuf, binary.BigEndian, []byte(key))
+		if err != nil {
+			return nil, err
+		}
+
+		err = binary.Write(extFieldsBuf, binary.BigEndian, int32(len(value)))
+		if err != nil {
+			return nil, err
+		}
+		err = binary.Write(extFieldsBuf, binary.BigEndian, []byte(value))
+		if err != nil {
+			return nil, err
+		}
+	}
+	return extFieldsBuf.Bytes(), nil
+}
+
+func (c *rmqCodec) decodeHeader(data []byte) (*remotingCommand, error) {
+	var err error
+	command := &remotingCommand{}
+	buf := bytes.NewBuffer(data)
+	// int code(~32767)
+	err = binary.Read(buf, binary.BigEndian, &command.Code)
+	if err != nil {
+		return nil, err
+	}
+
+	// LanguageCode language
+
+	var (
+		languageCode byte
+		remarkLen int32
+		extFieldsLen int32
+	)
+	err = binary.Read(buf, binary.BigEndian, &languageCode)
+	if err != nil {
+		return nil, err
+	}
+	//command.Language = languageFlag
+
+	// int version(~32767)
+	err = binary.Read(buf, binary.BigEndian, &command.Version)
+	if err != nil {
+		return nil, err
+	}
+
+	// int opaque
+	err = binary.Read(buf, binary.BigEndian, &command.Opaque)
+	if err != nil {
+		return nil, err
+	}
+
+	// int flag
+	err = binary.Read(buf, binary.BigEndian, &command.Flag)
+	if err != nil {
+		return nil, err
+	}
+
+	// String remark
+	err = binary.Read(buf, binary.BigEndian, &remarkLen)
+	if err != nil {
+		return nil, err
+	}
+
+	if remarkLen > 0 {
+		var remarkData = make([]byte, remarkLen)
+		err = binary.Read(buf, binary.BigEndian, &remarkData)
+		if err != nil {
+			return nil, err
+		}
+		command.Remark = string(remarkData)
+	}
+
+	err = binary.Read(buf, binary.BigEndian, &extFieldsLen)
+	if err != nil {
+		return nil, err
+	}
+
+	if extFieldsLen > 0 {
+		extFieldsData := make([]byte, extFieldsLen)
+		err = binary.Read(buf, binary.BigEndian, &extFieldsData)
+		if err != nil {
+			return nil, err
+		}
+
+		command.ExtFields = make(map[string]string)
+		buf := bytes.NewBuffer(extFieldsData)
+		var (
+			kLen int16
+			vLen int32
+		)
+		for buf.Len() > 0 {
+			err = binary.Read(buf, binary.BigEndian, &kLen)
+			if err != nil {
+				return nil, err
+			}
+
+			key, err := getExtFieldsData(buf, int32(kLen))
+			if err != nil {
+				return nil, err
+			}
+
+			err = binary.Read(buf, binary.BigEndian, &vLen)
+			if err != nil {
+				return nil, err
+			}
+
+			value, err := getExtFieldsData(buf, vLen)
+			if err != nil {
+				return nil, err
+			}
+			command.ExtFields[key] = value
+		}
+	}
+
+	return command, nil
+}
+
+func getExtFieldsData(buff *bytes.Buffer, length int32) (string, error) {
+	var data = make([]byte, length)
+	err := binary.Read(buff, binary.BigEndian, &data)
+	if err != nil {
+		return "", err
+	}
+
+	return string(data), nil
+}
diff --git a/remote/codec_test.go b/remote/codec_test.go
new file mode 100644
index 0000000..35cf702
--- /dev/null
+++ b/remote/codec_test.go
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package remote
+
+import (
+	"testing"
+)
+
+func TestEncode(t *testing.T) {
+}
+
+func TestDecode(t *testing.T) {
+}
\ No newline at end of file
diff --git a/remote/processor.go b/remote/processor.go
new file mode 100644
index 0000000..2d0218e
--- /dev/null
+++ b/remote/processor.go
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package remote
+
+type ClientRequestProcessor func(remotingCommand *remotingCommand) (responseCommand *remotingCommand)
+
+//CHECK_TRANSACTION_STATE
+//NOTIFY_CONSUMER_IDS_CHANGED
+//RESET_CONSUMER_CLIENT_OFFSET
+//GET_CONSUMER_STATUS_FROM_CLIENT
+//GET_CONSUMER_RUNNING_INFO
+//CONSUME_MESSAGE_DIRECTLY
diff --git a/remote/request.go b/remote/request.go
new file mode 100644
index 0000000..c37eac5
--- /dev/null
+++ b/remote/request.go
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package remote
+
+const (
+	SEND_MESSAGE                        = 10
+	PULL_MESSAGE                        = 11
+	QUERY_MESSAGE                       = 12
+	QUERY_BROKER_OFFSET                 = 13
+	QUERY_CONSUMER_OFFSET               = 14
+	UPDATE_CONSUMER_OFFSET              = 15
+	UPDATE_AND_CREATE_TOPIC             = 17
+	GET_ALL_TOPIC_CONFIG                = 21
+	GET_TOPIC_CONFIG_LIST               = 22
+	GET_TOPIC_NAME_LIST                 = 23
+	UPDATE_BROKER_CONFIG                = 25
+	GET_BROKER_CONFIG                   = 26
+	TRIGGER_DELETE_FILES                = 27
+	GET_BROKER_RUNTIME_INFO             = 28
+	SEARCH_OFFSET_BY_TIMESTAMP          = 29
+	GET_MAX_OFFSET                      = 30
+	GET_MIN_OFFSET                      = 31
+	GET_EARLIEST_MSG_STORETIME          = 32
+	VIEW_MESSAGE_BY_ID                  = 33
+	HEART_BEAT                          = 34
+	UNREGISTER_CLIENT                   = 35
+	CONSUMER_SEND_MSG_BACK              = 36
+	END_TRANSACTION                     = 37
+	GET_CONSUMER_LIST_BY_GROUP          = 38
+	CHECK_TRANSACTION_STATE             = 39
+	NOTIFY_CONSUMER_IDS_CHANGED         = 40
+	LOCK_BATCH_MQ                       = 41
+	UNLOCK_BATCH_MQ                     = 42
+	GET_ALL_CONSUMER_OFFSET             = 43
+	GET_ALL_DELAY_OFFSET                = 45
+	PUT_KV_CONFIG                       = 100
+	GET_KV_CONFIG                       = 101
+	DELETE_KV_CONFIG                    = 102
+	REGISTER_BROKER                     = 103
+	UNREGISTER_BROKER                   = 104
+	GET_ROUTEINTO_BY_TOPIC              = 105
+	GET_BROKER_CLUSTER_INFO             = 106
+	UPDATE_AND_CREATE_SUBSCRIPTIONGROUP = 200
+	GET_ALL_SUBSCRIPTIONGROUP_CONFIG    = 201
+	GET_TOPIC_STATS_INFO                = 202
+	GET_CONSUMER_CONNECTION_LIST        = 203
+	GET_PRODUCER_CONNECTION_LIST        = 204
+	WIPE_WRITE_PERM_OF_BROKER           = 205
+
+	GET_ALL_TOPIC_LIST_FROM_NAMESERVER = 206
+	DELETE_SUBSCRIPTIONGROUP           = 207
+	GET_CONSUME_STATS                  = 208
+	SUSPEND_CONSUMER                   = 209
+	RESUME_CONSUMER                    = 210
+	RESET_CONSUMER_OFFSET_IN_CONSUMER  = 211
+	RESET_CONSUMER_OFFSET_IN_BROKER    = 212
+	ADJUST_CONSUMER_THREAD_POOL        = 213
+	WHO_CONSUME_THE_MESSAGE            = 214
+
+	DELETE_TOPIC_IN_BROKER    = 215
+	DELETE_TOPIC_IN_NAMESRV   = 216
+	GET_KV_CONFIG_BY_VALUE    = 217
+	DELETE_KV_CONFIG_BY_VALUE = 218
+	GET_KVLIST_BY_NAMESPACE   = 219
+
+	RESET_CONSUMER_CLIENT_OFFSET         = 220
+	GET_CONSUMER_STATUS_FROM_CLIENT      = 221
+	INVOKE_BROKER_TO_RESET_OFFSET        = 222
+	INVOKE_BROKER_TO_GET_CONSUMER_STATUS = 223
+
+	QUERY_TOPIC_CONSUME_BY_WHO = 300
+
+	GET_TOPICS_BY_CLUSTER = 224
+
+	REGISTER_FILTER_SERVER            = 301
+	REGISTER_MESSAGE_FILTER_CLASS     = 302
+	QUERY_CONSUME_TIME_SPAN           = 303
+	GET_SYSTEM_TOPIC_LIST_FROM_NS     = 304
+	GET_SYSTEM_TOPIC_LIST_FROM_BROKER = 305
+
+	CLEAN_EXPIRED_CONSUMEQUEUE = 306
+
+	GET_CONSUMER_RUNNING_INFO = 307
+
+	QUERY_CORRECTION_OFFSET = 308
+
+	CONSUME_MESSAGE_DIRECTLY = 309
+
+	SEND_MESSAGE_V2 = 310
+
+	GET_UNIT_TOPIC_LIST                = 311
+	GET_HAS_UNIT_SUB_TOPIC_LIST        = 312
+	GET_HAS_UNIT_SUB_UNUNIT_TOPIC_LIST = 313
+	CLONE_GROUP_OFFSET                 = 314
+
+	VIEW_BROKER_STATS_DATA = 315
+)
diff --git a/remote/response.go b/remote/response.go
new file mode 100644
index 0000000..7053e27
--- /dev/null
+++ b/remote/response.go
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package remote
+
+import "time"
+
+const (
+	SUCCESS                       = 0
+	SYSTEM_ERROR                  = 1
+	SYSTEM_BUSY                   = 2
+	REQUEST_CODE_NOT_SUPPORTED    = 3
+	TRANSACTION_FAILED            = 4
+	FLUSH_DISK_TIMEOUT            = 10
+	SLAVE_NOT_AVAILABLE           = 11
+	FLUSH_SLAVE_TIMEOUT           = 12
+	MESSAGE_ILLEGAL               = 13
+	SERVICE_NOT_AVAILABLE         = 14
+	VERSION_NOT_SUPPORTED         = 15
+	NO_PERMISSION                 = 16
+	TOPIC_NOT_EXIST               = 17
+	TOPIC_EXIST_ALREADY           = 18
+	PULL_NOT_FOUND                = 19
+	PULL_RETRY_IMMEDIATELY        = 20
+	PULL_OFFSET_MOVED             = 21
+	QUERY_NOT_FOUND               = 22
+	SUBSCRIPTION_PARSE_FAILED     = 23
+	SUBSCRIPTION_NOT_EXIST        = 24
+	SUBSCRIPTION_NOT_LATEST       = 25
+	SUBSCRIPTION_GROUP_NOT_EXIST  = 26
+	TRANSACTION_SHOULD_COMMIT     = 200
+	TRANSACTION_SHOULD_ROLLBACK   = 201
+	TRANSACTION_STATE_UNKNOW      = 202
+	TRANSACTION_STATE_GROUP_WRONG = 203
+	NO_BUYER_ID                   = 204
+
+	NOT_IN_CURRENT_UNIT = 205
+
+	CONSUMER_NOT_ONLINE = 206
+
+	CONSUME_MSG_TIMEOUT = 207
+)
+
+type ResponseFuture struct {
+	ResponseCommand *remotingCommand
+	SendRequestOK   bool
+	Rrr             error
+	Opaque          int32
+	TimeoutMillis   time.Duration
+	callback        func(*remotingCommand)
+	BeginTimestamp  int64
+	Done            chan bool
+}
diff --git a/remote/rpchook.go b/remote/rpchook.go
new file mode 100644
index 0000000..134e4be
--- /dev/null
+++ b/remote/rpchook.go
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package remote
+
+type RPCHook interface {
+	DoBeforeRequest(string, *remotingCommand)
+	DoAfterResponse(string, *remotingCommand)
+}
diff --git a/utils/ring_buffer.go b/utils/ring_buffer.go
new file mode 100644
index 0000000..ee60bcd
--- /dev/null
+++ b/utils/ring_buffer.go
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+ 
+package utils
+
+import "sync"
+
+type RingBuffer struct {
+	buf         []byte
+	writePos int
+	readPos int
+	cap int
+	rwMutex sync.RWMutex
+	exitCh chan interface{}
+}
+
+func NewRingBuffer(cap int) *RingBuffer {
+	rb := &RingBuffer{buf: make([]byte, cap), cap:cap}
+	go rb.resize()
+	return rb
+}
+
+func (r *RingBuffer) Write(b []byte) error {
+	// TODO
+	return nil
+}
+
+func (r *RingBuffer) Read(p []byte) (n int, err error) {
+
+	if r.Size() >= len(p) {
+		copy(p, r.buf[r.readPos: r.readPos + len(p)])
+		r.readPos += len(p)
+
+	}
+
+	// TODO waiting data...
+	return 0, err
+}
+
+func (r *RingBuffer) Size() int {
+	return r.writePos - r.readPos
+}
+
+func (r *RingBuffer) Destroy() {
+
+}
+
+func (r *RingBuffer) resize() {
+	// TODO
+}
\ No newline at end of file