You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by di...@apache.org on 2019/05/07 03:35:18 UTC

[rocketmq-client-go] branch native updated: Add ClientProcessor and optimize (#52)

This is an automated email from the ASF dual-hosted git repository.

dinglei pushed a commit to branch native
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-go.git


The following commit(s) were added to refs/heads/native by this push:
     new 529c872  Add ClientProcessor and optimize (#52)
529c872 is described below

commit 529c872f2473247f8735d43a2dbc3c9c72c7d93e
Author: wenfeng <sx...@gmail.com>
AuthorDate: Tue May 7 11:35:14 2019 +0800

    Add ClientProcessor and optimize (#52)
    
    * replace list with TreeMap in process_queue
    
    * refactor for processor
    
    * update feature list
    
    * add cleanOfflineBroker
---
 benchmark/producer.go                   |   8 +-
 benchmark/stable.go                     |   6 +-
 consumer/consumer.go                    | 100 ++++++++++++----
 consumer/offset_store.go                |  99 ++++++++-------
 consumer/process_queue.go               | 205 ++++++++++++++++++++++++++------
 consumer/push_consumer.go               | 119 ++++++++++++++----
 core/api.go                             |   4 +-
 core/error.go                           |   2 +-
 core/producer.go                        |   4 +-
 docs/feature.md                         | 142 ++++++++++------------
 examples/{producer => consumer}/main.go |   7 +-
 go.mod                                  |   3 -
 go.sum                                  |   6 -
 kernel/client.go                        | 113 +++++++-----------
 kernel/model.go                         |   2 +-
 kernel/request.go                       |  29 +++--
 kernel/route.go                         |  99 ++++++++++++++-
 remote/processor.go                     |  26 ----
 remote/remote_client.go                 | 118 +++++++++++-------
 rlog/log.go                             |  12 +-
 utils/helper.go                         |  25 ----
 utils/helper_test.go                    |   9 --
 utils/net.go                            |  30 +++++
 utils/net_test.go                       |   7 ++
 24 files changed, 749 insertions(+), 426 deletions(-)

diff --git a/benchmark/producer.go b/benchmark/producer.go
index e183269..4551615 100644
--- a/benchmark/producer.go
+++ b/benchmark/producer.go
@@ -108,7 +108,7 @@ type producer struct {
 
 func init() {
 	p := &producer{}
-	flags := flag.NewFlagSet("producer", flag.ExitOnError)
+	flags := flag.NewFlagSet("consumer", flag.ExitOnError)
 	p.flags = flags
 
 	flags.StringVar(&p.topic, "t", "", "topic name")
@@ -118,7 +118,7 @@ func init() {
 	flags.IntVar(&p.testMinutes, "m", 10, "test minutes")
 	flags.IntVar(&p.bodySize, "s", 32, "body size")
 
-	registerCommand("producer", p)
+	registerCommand("consumer", p)
 }
 
 func (bp *producer) produceMsg(stati *statiBenchmarkProducerSnapshot, exit chan struct{}) {
@@ -126,14 +126,14 @@ func (bp *producer) produceMsg(stati *statiBenchmarkProducerSnapshot, exit chan
 		ClientConfig: rocketmq.ClientConfig{GroupID: bp.groupID, NameServer: bp.nameSrv},
 	})
 	if err != nil {
-		fmt.Printf("new producer error:%s\n", err)
+		fmt.Printf("new consumer error:%s\n", err)
 		return
 	}
 
 	p.Start()
 	defer p.Shutdown()
 
-	topic, tag := bp.topic, "benchmark-producer"
+	topic, tag := bp.topic, "benchmark-consumer"
 
 AGAIN:
 	select {
diff --git a/benchmark/stable.go b/benchmark/stable.go
index 6c7a12c..46c7b5c 100644
--- a/benchmark/stable.go
+++ b/benchmark/stable.go
@@ -145,13 +145,13 @@ func (stp *stableTestProducer) run(args []string) {
 		ClientConfig: rocketmq.ClientConfig{GroupID: stp.groupID, NameServer: stp.nameSrv},
 	})
 	if err != nil {
-		fmt.Printf("new producer error:%s\n", err)
+		fmt.Printf("new consumer error:%s\n", err)
 		return
 	}
 
 	err = p.Start()
 	if err != nil {
-		fmt.Printf("start producer error:%s\n", err)
+		fmt.Printf("start consumer error:%s\n", err)
 		return
 	}
 	defer p.Shutdown()
@@ -256,7 +256,7 @@ func (stc *stableTestConsumer) pullMessage() {
 }
 
 func init() {
-	// producer
+	// consumer
 	name := "stableTestProducer"
 	p := &stableTestProducer{stableTest: &stableTest{}}
 	p.buildFlags(name)
diff --git a/consumer/consumer.go b/consumer/consumer.go
index f128f66..a6d9500 100644
--- a/consumer/consumer.go
+++ b/consumer/consumer.go
@@ -26,6 +26,7 @@ import (
 	"github.com/apache/rocketmq-client-go/utils"
 	"github.com/tidwall/gjson"
 	"sort"
+	"strconv"
 	"strings"
 	"sync"
 	"sync/atomic"
@@ -181,7 +182,7 @@ const (
 type PullRequest struct {
 	consumerGroup string
 	mq            *kernel.MessageQueue
-	pq            *ProcessQueue
+	pq            *processQueue
 	nextOffset    int64
 	lockedFirst   bool
 }
@@ -209,7 +210,7 @@ type ConsumerOption struct {
 
 	// Flow control threshold on queue level, each message queue will cache at most 1000 messages by default,
 	// Consider the {PullBatchSize}, the instantaneous value may exceed the limit
-	PullThresholdForQueue int
+	PullThresholdForQueue int64
 
 	// Limit the cached message size on queue level, each message queue will cache at most 100 MiB messages by default,
 	// Consider the {@code pullBatchSize}, the instantaneous value may exceed the limit
@@ -408,8 +409,10 @@ func (dc *defaultConsumer) SubscriptionDataList() []*kernel.SubscriptionData {
 }
 
 func (dc *defaultConsumer) makeSureStateOK() error {
-	// TODO log
-	return nil //dc.state == StateRunning
+	if dc.state != kernel.StateRunning {
+		return fmt.Errorf("state not running, actually: %v", dc.state)
+	}
+	return nil
 }
 
 type lockBatchRequestBody struct {
@@ -436,7 +439,7 @@ func (dc *defaultConsumer) lock(mq *kernel.MessageQueue) bool {
 		_mq := lockedMQ[idx]
 		v, exist := dc.processQueueTable.Load(_mq)
 		if exist {
-			pq := v.(*ProcessQueue)
+			pq := v.(*processQueue)
 			pq.locked = true
 			pq.lastConsumeTime = time.Now()
 		}
@@ -486,7 +489,7 @@ func (dc *defaultConsumer) lockAll(mq kernel.MessageQueue) {
 			_mq := lockedMQ[idx]
 			v, exist := dc.processQueueTable.Load(_mq)
 			if exist {
-				pq := v.(*ProcessQueue)
+				pq := v.(*processQueue)
 				pq.locked = true
 				pq.lastConsumeTime = time.Now()
 			}
@@ -497,7 +500,7 @@ func (dc *defaultConsumer) lockAll(mq kernel.MessageQueue) {
 			if !set[_mq.HashCode()] {
 				v, exist := dc.processQueueTable.Load(_mq)
 				if exist {
-					pq := v.(*ProcessQueue)
+					pq := v.(*processQueue)
 					pq.locked = true
 					pq.lastLockTime = time.Now()
 					rlog.Warnf("the message queue: %s locked Failed, Group: %s", mq.String(), dc.consumerGroup)
@@ -527,7 +530,7 @@ func (dc *defaultConsumer) unlockAll(oneway bool) {
 			_mq := mqs[idx]
 			v, exist := dc.processQueueTable.Load(_mq)
 			if exist {
-				v.(*ProcessQueue).locked = false
+				v.(*processQueue).locked = false
 				rlog.Warnf("the message queue: %s locked Failed, Group: %s", _mq.String(), dc.consumerGroup)
 			}
 		}
@@ -537,7 +540,7 @@ func (dc *defaultConsumer) unlockAll(oneway bool) {
 func (dc *defaultConsumer) doLock(addr string, body *lockBatchRequestBody) []kernel.MessageQueue {
 	data, _ := json.Marshal(body)
 	request := remote.NewRemotingCommand(kernel.ReqLockBatchMQ, nil, data)
-	response, err := remote.InvokeSync(addr, request, 1*time.Second)
+	response, err := dc.client.InvokeSync(addr, request, 1*time.Second)
 	if err != nil {
 		rlog.Errorf("lock mq to broker: %s error %s", addr, err.Error())
 		return nil
@@ -557,12 +560,12 @@ func (dc *defaultConsumer) doUnlock(addr string, body *lockBatchRequestBody, one
 	data, _ := json.Marshal(body)
 	request := remote.NewRemotingCommand(kernel.ReqUnlockBatchMQ, nil, data)
 	if oneway {
-		err := remote.InvokeOneWay(addr, request, 3*time.Second)
+		err := dc.client.InvokeOneWay(addr, request, 3*time.Second)
 		if err != nil {
 			rlog.Errorf("lock mq to broker with oneway: %s error %s", addr, err.Error())
 		}
 	} else {
-		response, err := remote.InvokeSync(addr, request, 1*time.Second)
+		response, err := dc.client.InvokeSync(addr, request, 1*time.Second)
 		if err != nil {
 			rlog.Errorf("lock mq to broker: %s error %s", addr, err.Error())
 		}
@@ -599,12 +602,13 @@ func (dc *defaultConsumer) updateProcessQueueTable(topic string, mqs []*kernel.M
 	// TODO
 	dc.processQueueTable.Range(func(key, value interface{}) bool {
 		mq := key.(*kernel.MessageQueue)
-		pq := value.(*ProcessQueue)
+		pq := value.(*processQueue)
 		if mq.Topic == topic {
 			if !mqSet[mq] {
 				pq.dropped = true
 				if dc.removeUnnecessaryMessageQueue(mq, pq) {
-					delete(mqSet, mq)
+					//delete(mqSet, mq)
+					dc.processQueueTable.Delete(key)
 					changed = true
 					rlog.Infof("do defaultConsumer, Group:%s, remove unnecessary mq: %s", dc.consumerGroup, mq.String())
 				}
@@ -640,7 +644,7 @@ func (dc *defaultConsumer) updateProcessQueueTable(topic string, mqs []*kernel.M
 					rlog.Debugf("do defaultConsumer, Group: %s, mq already exist, %s", dc.consumerGroup, mq.String())
 				} else {
 					rlog.Infof("do defaultConsumer, Group: %s, add a new mq, %s", dc.consumerGroup, mq.String())
-					pq := &ProcessQueue{}
+					pq := newProcessQueue()
 					dc.processQueueTable.Store(mq, pq)
 					pr := PullRequest{
 						consumerGroup: dc.consumerGroup,
@@ -660,12 +664,9 @@ func (dc *defaultConsumer) updateProcessQueueTable(topic string, mqs []*kernel.M
 	return changed
 }
 
-func (dc *defaultConsumer) removeUnnecessaryMessageQueue(mq *kernel.MessageQueue, pq *ProcessQueue) bool {
+func (dc *defaultConsumer) removeUnnecessaryMessageQueue(mq *kernel.MessageQueue, pq *processQueue) bool {
 	dc.storage.persist([]*kernel.MessageQueue{mq})
 	dc.storage.remove(mq)
-	if dc.cType == _PushConsume && dc.consumeOrderly && Clustering == dc.model {
-		// TODO
-	}
 	return true
 }
 
@@ -684,7 +685,7 @@ func (dc *defaultConsumer) computePullFromWhere(mq *kernel.MessageQueue) int64 {
 				if strings.HasPrefix(mq.Topic, kernel.RetryGroupTopicPrefix) {
 					lastOffset = 0
 				} else {
-					lastOffset, err := kernel.QueryMaxOffset(mq)
+					lastOffset, err := dc.queryMaxOffset(mq)
 					if err == nil {
 						result = lastOffset
 					} else {
@@ -701,7 +702,7 @@ func (dc *defaultConsumer) computePullFromWhere(mq *kernel.MessageQueue) int64 {
 		case ConsumeFromTimestamp:
 			if lastOffset == -1 {
 				if strings.HasPrefix(mq.Topic, kernel.RetryGroupTopicPrefix) {
-					lastOffset, err := kernel.QueryMaxOffset(mq)
+					lastOffset, err := dc.queryMaxOffset(mq)
 					if err == nil {
 						result = lastOffset
 					} else {
@@ -713,7 +714,7 @@ func (dc *defaultConsumer) computePullFromWhere(mq *kernel.MessageQueue) int64 {
 					if err != nil {
 						result = -1
 					} else {
-						lastOffset, err := kernel.SearchOffsetByTimestamp(mq, t.Unix())
+						lastOffset, err := dc.searchOffsetByTimestamp(mq, t.Unix())
 						if err != nil {
 							result = -1
 						} else {
@@ -741,7 +742,7 @@ func (dc *defaultConsumer) findConsumerList(topic string) []string {
 			ConsumerGroup: dc.consumerGroup,
 		}
 		cmd := remote.NewRemotingCommand(kernel.ReqGetConsumerListByGroup, req, nil)
-		res, err := remote.InvokeSync(brokerAddr, cmd, 3*time.Second) // TODO 超时机制有问题
+		res, err := dc.client.InvokeSync(brokerAddr, cmd, 3*time.Second) // TODO 超时机制有问题
 		if err != nil {
 			rlog.Errorf("get consumer list of [%s] from %s error: %s", dc.consumerGroup, brokerAddr, err.Error())
 			return nil
@@ -757,6 +758,61 @@ func (dc *defaultConsumer) findConsumerList(topic string) []string {
 	return nil
 }
 
+func (dc *defaultConsumer) sendBack(msg *kernel.MessageExt, level int) error {
+	return nil
+}
+
+// QueryMaxOffset with specific queueId and topic
+func (dc *defaultConsumer) queryMaxOffset(mq *kernel.MessageQueue) (int64, error) {
+	brokerAddr := kernel.FindBrokerAddrByName(mq.BrokerName)
+	if brokerAddr == "" {
+		kernel.UpdateTopicRouteInfo(mq.Topic)
+		brokerAddr = kernel.FindBrokerAddrByName(mq.Topic)
+	}
+	if brokerAddr == "" {
+		return -1, fmt.Errorf("the broker [%s] does not exist", mq.BrokerName)
+	}
+
+	request := &kernel.GetMaxOffsetRequest{
+		Topic:   mq.Topic,
+		QueueId: mq.QueueId,
+	}
+
+	cmd := remote.NewRemotingCommand(kernel.ReqGetMaxOffset, request, nil)
+	response, err := dc.client.InvokeSync(brokerAddr, cmd, 3*time.Second)
+	if err != nil {
+		return -1, err
+	}
+
+	return strconv.ParseInt(response.ExtFields["offset"], 10, 64)
+}
+
+// SearchOffsetByTimestamp with specific queueId and topic
+func (dc *defaultConsumer) searchOffsetByTimestamp(mq *kernel.MessageQueue, timestamp int64) (int64, error) {
+	brokerAddr := kernel.FindBrokerAddrByName(mq.BrokerName)
+	if brokerAddr == "" {
+		kernel.UpdateTopicRouteInfo(mq.Topic)
+		brokerAddr = kernel.FindBrokerAddrByName(mq.Topic)
+	}
+	if brokerAddr == "" {
+		return -1, fmt.Errorf("the broker [%s] does not exist", mq.BrokerName)
+	}
+
+	request := &kernel.SearchOffsetRequest{
+		Topic:     mq.Topic,
+		QueueId:   mq.QueueId,
+		Timestamp: timestamp,
+	}
+
+	cmd := remote.NewRemotingCommand(kernel.ReqSearchOffsetByTimestamp, request, nil)
+	response, err := dc.client.InvokeSync(brokerAddr, cmd, 3*time.Second)
+	if err != nil {
+		return -1, err
+	}
+
+	return strconv.ParseInt(response.ExtFields["offset"], 10, 64)
+}
+
 func buildSubscriptionData(topic string, selector MessageSelector) *kernel.SubscriptionData {
 	subData := &kernel.SubscriptionData{
 		Topic:     topic,
diff --git a/consumer/offset_store.go b/consumer/offset_store.go
index 6b6d719..e368945 100644
--- a/consumer/offset_store.go
+++ b/consumer/offset_store.go
@@ -50,7 +50,6 @@ func init() {
 }
 
 type OffsetStore interface {
-	load()
 	persist(mqs []*kernel.MessageQueue)
 	remove(mq *kernel.MessageQueue)
 	read(mq *kernel.MessageQueue, t readType) int64
@@ -110,7 +109,7 @@ func (local *localFileOffsetStore) read(mq *kernel.MessageQueue, t readType) int
 }
 
 func (local *localFileOffsetStore) update(mq *kernel.MessageQueue, offset int64, increaseOnly bool) {
-	rlog.Infof("update offset: %s to %d", mq, offset)
+	rlog.Debugf("update offset: %s to %d", mq, offset)
 	localOffset, exist := local.OffsetTable[mq.Topic]
 	if !exist {
 		localOffset = make(map[int]*queueOffset)
@@ -163,35 +162,33 @@ func (local *localFileOffsetStore) persist(mqs []*kernel.MessageQueue) {
 }
 
 func (local *localFileOffsetStore) remove(mq *kernel.MessageQueue) {
-	// unsupported
+	// nothing to do
 }
 
 type remoteBrokerOffsetStore struct {
 	group       string
 	OffsetTable map[string]map[int]*queueOffset `json:"OffsetTable"`
+	client      *kernel.RMQClient
 	mutex       sync.RWMutex
 }
 
-func NewRemoteOffsetStore(group string) OffsetStore {
+func NewRemoteOffsetStore(group string, client *kernel.RMQClient) OffsetStore {
 	return &remoteBrokerOffsetStore{
 		group:       group,
+		client:      client,
 		OffsetTable: make(map[string]map[int]*queueOffset),
 	}
 }
 
-func (remote *remoteBrokerOffsetStore) load() {
-	// unsupported
-}
-
-func (remote *remoteBrokerOffsetStore) persist(mqs []*kernel.MessageQueue) {
-	remote.mutex.Lock()
-	defer remote.mutex.Unlock()
+func (r *remoteBrokerOffsetStore) persist(mqs []*kernel.MessageQueue) {
+	r.mutex.Lock()
+	defer r.mutex.Unlock()
 	if len(mqs) == 0 {
 		return
 	}
 	for idx := range mqs {
 		mq := mqs[idx]
-		offsets, exist := remote.OffsetTable[mq.Topic]
+		offsets, exist := r.OffsetTable[mq.Topic]
 		if !exist {
 			continue
 		}
@@ -200,23 +197,23 @@ func (remote *remoteBrokerOffsetStore) persist(mqs []*kernel.MessageQueue) {
 			continue
 		}
 
-		err := updateConsumeOffsetToBroker(remote.group, mq.Topic, off)
+		err := r.updateConsumeOffsetToBroker(r.group, mq.Topic, off)
 		if err != nil {
 			rlog.Warnf("update offset to broker error: %s, group: %s, queue: %s, offset: %d",
-				err.Error(), remote.group, mq.String(), off.Offset)
+				err.Error(), r.group, mq.String(), off.Offset)
 		} else {
-			rlog.Infof("update offset to broker success, group: %s, topic: %s, queue: %v", remote.group, mq.Topic, off)
+			rlog.Debugf("update offset to broker success, group: %s, topic: %s, queue: %v", r.group, mq.Topic, off)
 		}
 	}
 }
 
-func (remote *remoteBrokerOffsetStore) remove(mq *kernel.MessageQueue) {
-	remote.mutex.Lock()
-	defer remote.mutex.Unlock()
+func (r *remoteBrokerOffsetStore) remove(mq *kernel.MessageQueue) {
+	r.mutex.Lock()
+	defer r.mutex.Unlock()
 	if mq == nil {
 		return
 	}
-	offset, exist := remote.OffsetTable[mq.Topic]
+	offset, exist := r.OffsetTable[mq.Topic]
 	if !exist {
 		return
 	}
@@ -224,38 +221,38 @@ func (remote *remoteBrokerOffsetStore) remove(mq *kernel.MessageQueue) {
 	delete(offset, mq.QueueId)
 }
 
-func (remote *remoteBrokerOffsetStore) read(mq *kernel.MessageQueue, t readType) int64 {
-	remote.mutex.RLock()
+func (r *remoteBrokerOffsetStore) read(mq *kernel.MessageQueue, t readType) int64 {
+	r.mutex.RLock()
 	if t == _ReadFromMemory || t == _ReadMemoryThenStore {
-		off := readFromMemory(remote.OffsetTable, mq)
+		off := readFromMemory(r.OffsetTable, mq)
 		if off >= 0 || (off == -1 && t == _ReadFromMemory) {
-			remote.mutex.RUnlock()
+			r.mutex.RUnlock()
 			return off
 		}
 	}
-	off, err := fetchConsumeOffsetFromBroker(remote.group, mq)
+	off, err := r.fetchConsumeOffsetFromBroker(r.group, mq)
 	if err != nil {
 		rlog.Errorf("fetch offset of %s error: %s", mq.String(), err.Error())
-		remote.mutex.RUnlock()
+		r.mutex.RUnlock()
 		return -1
 	}
-	remote.mutex.RUnlock()
-	remote.update(mq, off, true)
+	r.mutex.RUnlock()
+	r.update(mq, off, true)
 	return off
 }
 
-func (remote *remoteBrokerOffsetStore) update(mq *kernel.MessageQueue, offset int64, increaseOnly bool) {
-	rlog.Infof("update offset: %s to %d", mq, offset)
-	remote.mutex.Lock()
-	defer remote.mutex.Unlock()
-	localOffset, exist := remote.OffsetTable[mq.Topic]
+func (r *remoteBrokerOffsetStore) update(mq *kernel.MessageQueue, offset int64, increaseOnly bool) {
+	rlog.Debugf("update offset: %s to %d", mq, offset)
+	r.mutex.Lock()
+	defer r.mutex.Unlock()
+	localOffset, exist := r.OffsetTable[mq.Topic]
 	if !exist {
 		localOffset = make(map[int]*queueOffset)
-		remote.OffsetTable[mq.Topic] = localOffset
+		r.OffsetTable[mq.Topic] = localOffset
 	}
 	q, exist := localOffset[mq.QueueId]
 	if !exist {
-		rlog.Infof("new queueOffset: %d, off: %d", mq.QueueId, offset)
+		rlog.Infof("add a new queue: %s, off: %d", mq.String(), offset)
 		q = &queueOffset{
 			QueueID: mq.QueueId,
 			Broker:  mq.BrokerName,
@@ -271,20 +268,7 @@ func (remote *remoteBrokerOffsetStore) update(mq *kernel.MessageQueue, offset in
 	}
 }
 
-func readFromMemory(table map[string]map[int]*queueOffset, mq *kernel.MessageQueue) int64 {
-	localOffset, exist := table[mq.Topic]
-	if !exist {
-		return -1
-	}
-	off, exist := localOffset[mq.QueueId]
-	if !exist {
-		return -1
-	}
-
-	return off.Offset
-}
-
-func fetchConsumeOffsetFromBroker(group string, mq *kernel.MessageQueue) (int64, error) {
+func (r *remoteBrokerOffsetStore) fetchConsumeOffsetFromBroker(group string, mq *kernel.MessageQueue) (int64, error) {
 	broker := kernel.FindBrokerAddrByName(mq.BrokerName)
 	if broker == "" {
 		kernel.UpdateTopicRouteInfo(mq.Topic)
@@ -299,7 +283,7 @@ func fetchConsumeOffsetFromBroker(group string, mq *kernel.MessageQueue) (int64,
 		QueueId:       mq.QueueId,
 	}
 	cmd := remote.NewRemotingCommand(kernel.ReqQueryConsumerOffset, queryOffsetRequest, nil)
-	res, err := remote.InvokeSync(broker, cmd, 3*time.Second)
+	res, err := r.client.InvokeSync(broker, cmd, 3*time.Second)
 	if err != nil {
 		return -1, err
 	}
@@ -316,7 +300,7 @@ func fetchConsumeOffsetFromBroker(group string, mq *kernel.MessageQueue) (int64,
 	return off, nil
 }
 
-func updateConsumeOffsetToBroker(group, topic string, queue *queueOffset) error {
+func (r *remoteBrokerOffsetStore) updateConsumeOffsetToBroker(group, topic string, queue *queueOffset) error {
 	broker := kernel.FindBrokerAddrByName(queue.Broker)
 	if broker == "" {
 		kernel.UpdateTopicRouteInfo(topic)
@@ -333,5 +317,18 @@ func updateConsumeOffsetToBroker(group, topic string, queue *queueOffset) error
 		CommitOffset:  queue.Offset,
 	}
 	cmd := remote.NewRemotingCommand(kernel.ReqUpdateConsumerOffset, updateOffsetRequest, nil)
-	return remote.InvokeOneWay(broker, cmd, 5*time.Second)
+	return r.client.InvokeOneWay(broker, cmd, 5*time.Second)
+}
+
+func readFromMemory(table map[string]map[int]*queueOffset, mq *kernel.MessageQueue) int64 {
+	localOffset, exist := table[mq.Topic]
+	if !exist {
+		return -1
+	}
+	off, exist := localOffset[mq.QueueId]
+	if !exist {
+		return -1
+	}
+
+	return off.Offset
 }
diff --git a/consumer/process_queue.go b/consumer/process_queue.go
index f4367f9..77e84fa 100644
--- a/consumer/process_queue.go
+++ b/consumer/process_queue.go
@@ -18,9 +18,13 @@ limitations under the License.
 package consumer
 
 import (
-	"container/list"
 	"github.com/apache/rocketmq-client-go/kernel"
+	"github.com/apache/rocketmq-client-go/rlog"
+	"github.com/emirpasic/gods/maps/treemap"
+	"github.com/emirpasic/gods/utils"
+	"strconv"
 	"sync"
+	"sync/atomic"
 	"time"
 )
 
@@ -30,10 +34,10 @@ const (
 	_PullMaxIdleTime      = 120 * time.Second
 )
 
-type ProcessQueue struct {
+type processQueue struct {
+	msgCache                   *treemap.Map
 	mutex                      sync.RWMutex
-	msgCache                   list.List // sorted
-	cachedMsgCount             int
+	cachedMsgCount             int64
 	cachedMsgSize              int64
 	consumeLock                sync.Mutex
 	consumingMsgOrderlyTreeMap sync.Map
@@ -46,61 +50,192 @@ type ProcessQueue struct {
 	lastLockTime               time.Time
 	consuming                  bool
 	msgAccCnt                  int64
-	once                       sync.Once
+	lockConsume                sync.Mutex
+	msgCh                      chan []*kernel.MessageExt
 }
 
-func (pq *ProcessQueue) isPullExpired() bool {
-	return false
-}
-
-func (pq *ProcessQueue) getMaxSpan() int {
-	return pq.msgCache.Len()
+func newProcessQueue() *processQueue {
+	pq := &processQueue{
+		msgCache:        treemap.NewWith(utils.Int64Comparator),
+		lastPullTime:    time.Now(),
+		lastConsumeTime: time.Now(),
+		lastLockTime:    time.Now(),
+		msgCh:           make(chan []*kernel.MessageExt, 32),
+	}
+	return pq
 }
 
-func (pq *ProcessQueue) putMessage(messages []*kernel.MessageExt) {
-	pq.once.Do(func() {
-		pq.msgCache.Init()
-	})
-	localList := list.New()
-	for idx := range messages {
-		localList.PushBack(messages[idx])
-		pq.queueOffsetMax = messages[idx].QueueOffset
+func (pq *processQueue) putMessage(messages ...*kernel.MessageExt) {
+	if messages == nil || len(messages) == 0 {
+		return
 	}
 	pq.mutex.Lock()
-	pq.msgCache.PushBackList(localList)
+	pq.msgCh <- messages // 放锁外面会挂
+	validMessageCount := 0
+	for idx := range messages {
+		msg := messages[idx]
+		_, found := pq.msgCache.Get(msg.QueueOffset)
+		if found {
+			continue
+		}
+		pq.msgCache.Put(msg.QueueOffset, msg)
+		validMessageCount++
+		pq.queueOffsetMax = msg.QueueOffset
+		atomic.AddInt64(&pq.cachedMsgSize, int64(len(msg.Body)))
+	}
 	pq.mutex.Unlock()
+
+	atomic.AddInt64(&pq.cachedMsgCount, int64(validMessageCount))
+
+	if pq.msgCache.Size() > 0 && !pq.consuming {
+		pq.consuming = true
+	}
+
+	msg := messages[len(messages)-1]
+	maxOffset, err := strconv.ParseInt(msg.Properties[kernel.PropertyMaxOffset], 10, 64)
+	if err != nil {
+		acc := maxOffset - msg.QueueOffset
+		if acc > 0 {
+			pq.msgAccCnt = acc
+		}
+	}
 }
 
-func (pq *ProcessQueue) removeMessage(number int) int64 {
-	result := pq.queueOffsetMax + 1
+func (pq *processQueue) removeMessage(messages ...*kernel.MessageExt) int64 {
+	result := int64(-1)
 	pq.mutex.Lock()
-	for i := 0; i < number && pq.msgCache.Len() > 0; i++ {
-		head := pq.msgCache.Front()
-		pq.msgCache.Remove(head)
-		result = head.Value.(*kernel.MessageExt).QueueOffset
+	pq.lastConsumeTime = time.Now()
+	if !pq.msgCache.Empty() {
+		result = pq.queueOffsetMax + 1
+		removedCount := 0
+		for idx := range messages {
+			msg := messages[idx]
+			_, found := pq.msgCache.Get(msg.QueueOffset)
+			if !found {
+				continue
+			}
+			pq.msgCache.Remove(msg.QueueOffset)
+			removedCount++
+			atomic.AddInt64(&pq.cachedMsgSize, int64(-len(msg.Body)))
+		}
+		atomic.AddInt64(&pq.cachedMsgCount, int64(-removedCount))
 	}
-	pq.mutex.Unlock()
-	if pq.msgCache.Len() > 0 {
-		result = pq.msgCache.Front().Value.(*kernel.MessageExt).QueueOffset
+	if !pq.msgCache.Empty() {
+		first, _ := pq.msgCache.Min()
+		result = first.(int64)
 	}
+	pq.mutex.Unlock()
 	return result
 }
 
-func (pq *ProcessQueue) takeMessages(number int) []*kernel.MessageExt {
-	for pq.msgCache.Len() == 0 {
+func (pq *processQueue) isLockExpired() bool {
+	return time.Now().Sub(pq.lastLockTime) > _RebalanceLockMaxTime
+}
+
+func (pq *processQueue) isPullExpired() bool {
+	return time.Now().Sub(pq.lastPullTime) > _PullMaxIdleTime
+}
+
+func (pq *processQueue) cleanExpiredMsg(consumer defaultConsumer) {
+	if consumer.option.ConsumeOrderly {
+		return
+	}
+	var loop = 16
+	if pq.msgCache.Size() < 16 {
+		loop = pq.msgCache.Size()
+	}
+
+	for i := 0; i < loop; i++ {
+		pq.mutex.RLock()
+		if pq.msgCache.Empty() {
+			pq.mutex.RLock()
+			return
+		}
+		_, firstValue := pq.msgCache.Min()
+		msg := firstValue.(*kernel.MessageExt)
+		startTime := msg.Properties[kernel.PropertyConsumeStartTime]
+		if startTime != "" {
+			st, err := strconv.ParseInt(startTime, 10, 64)
+			if err != nil {
+				rlog.Warnf("parse message start consume time error: %s, origin str is: %s", startTime)
+				continue
+			}
+			if time.Now().Unix()-st <= int64(consumer.option.ConsumeTimeout) {
+				pq.mutex.RLock()
+				return
+			}
+		}
+		pq.mutex.RLock()
+
+		err := consumer.sendBack(msg, 3)
+		if err != nil {
+			rlog.Errorf("send message back to broker error: %s when clean expired messages", err.Error())
+			continue
+		}
+		pq.removeMessage(msg)
+	}
+}
+
+func (pq *processQueue) getMaxSpan() int {
+	pq.mutex.RLock()
+	defer pq.mutex.RUnlock()
+	if pq.msgCache.Size() == 0 {
+		return 0
+	}
+	firstKey, _ := pq.msgCache.Min()
+	lastKey, _ := pq.msgCache.Max()
+	return int(lastKey.(int64) - firstKey.(int64))
+}
+
+func (pq *processQueue) getMessages() []*kernel.MessageExt {
+	return <-pq.msgCh
+}
+
+func (pq *processQueue) takeMessages(number int) []*kernel.MessageExt {
+	for pq.msgCache.Empty() {
 		time.Sleep(10 * time.Millisecond)
 	}
 	result := make([]*kernel.MessageExt, number)
 	i := 0
 	pq.mutex.Lock()
 	for ; i < number; i++ {
-		e := pq.msgCache.Front()
-		if e == nil {
+		k, v := pq.msgCache.Min()
+		if v == nil {
 			break
 		}
-		result[i] = e.Value.(*kernel.MessageExt)
-		pq.msgCache.Remove(e)
+		result[i] = v.(*kernel.MessageExt)
+		pq.msgCache.Remove(k)
 	}
 	pq.mutex.Unlock()
 	return result[:i]
 }
+
+func (pq *processQueue) Min() int64 {
+	if pq.msgCache.Empty() {
+		return -1
+	}
+	k, _ := pq.msgCache.Min()
+	if k != nil {
+		return k.(int64)
+	}
+	return -1
+}
+
+func (pq *processQueue) Max() int64 {
+	if pq.msgCache.Empty() {
+		return -1
+	}
+	k, _ := pq.msgCache.Max()
+	if k != nil {
+		return k.(int64)
+	}
+	return -1
+}
+
+func (pq *processQueue) clear() {
+	pq.mutex.Lock()
+	pq.msgCache.Clear()
+	pq.cachedMsgCount = 0
+	pq.cachedMsgSize = 0
+	pq.queueOffsetMax = 0
+}
diff --git a/consumer/push_consumer.go b/consumer/push_consumer.go
index fc78e3e..2039be1 100644
--- a/consumer/push_consumer.go
+++ b/consumer/push_consumer.go
@@ -22,6 +22,7 @@ import (
 	"errors"
 	"github.com/apache/rocketmq-client-go/kernel"
 	"github.com/apache/rocketmq-client-go/rlog"
+	"github.com/apache/rocketmq-client-go/utils"
 	"math"
 	"strconv"
 	"time"
@@ -55,11 +56,13 @@ type pushConsumer struct {
 	queueFlowControlTimes        int
 	queueMaxSpanFlowControlTimes int
 	consume                      func(*ConsumeMessageContext, []*kernel.MessageExt) (ConsumeResult, error)
-	submitToConsume              func(*ProcessQueue, *kernel.MessageQueue)
+	submitToConsume              func(*processQueue, *kernel.MessageQueue)
 	subscribedTopic              map[string]string
 }
 
 func NewPushConsumer(consumerGroup string, opt ConsumerOption) PushConsumer {
+	opt.InstanceName = "DEFAULT"
+	opt.ClientIP = utils.LocalIP()
 	dc := &defaultConsumer{
 		consumerGroup:  consumerGroup,
 		cType:          _PushConsume,
@@ -119,11 +122,10 @@ func (pc *pushConsumer) Start() error {
 		pc.client = kernel.GetOrNewRocketMQClient(pc.option.ClientOption)
 		if pc.model == Clustering {
 			pc.option.ChangeInstanceNameToPID()
-			pc.storage = NewRemoteOffsetStore(pc.consumerGroup)
+			pc.storage = NewRemoteOffsetStore(pc.consumerGroup, pc.client)
 		} else {
 			pc.storage = NewLocalFileOffsetStore(pc.consumerGroup, pc.client.ClientID())
 		}
-		pc.storage.load()
 		go func() {
 			// todo start clean msg expired
 			// TODO quit
@@ -263,7 +265,7 @@ func (pc *pushConsumer) validate() {
 
 	if pc.option.PullBatchSize < 1 || pc.option.PullBatchSize > 1024 {
 		if pc.option.PullBatchSize == 0 {
-			pc.option.PullBatchSize = 1
+			pc.option.PullBatchSize = 32
 		} else {
 			rlog.Fatal("option.PullBatchSize out of range [1, 1024]")
 		}
@@ -293,7 +295,6 @@ func (pc *pushConsumer) pullMessage(request *PullRequest) {
 		sleepTime = pc.option.PullInterval
 		pq.lastPullTime = time.Now()
 		err := pc.makeSureStateOK()
-		rlog.Debugf("pull MessageQueue: %d", request.mq.QueueId)
 		if err != nil {
 			rlog.Warnf("consumer state error: %s", err.Error())
 			sleepTime = _PullDelayTimeWhenError
@@ -312,8 +313,7 @@ func (pc *pushConsumer) pullMessage(request *PullRequest) {
 			if pc.queueFlowControlTimes%1000 == 0 {
 				rlog.Warnf("the cached message count exceeds the threshold %d, so do flow control, "+
 					"minOffset=%d, maxOffset=%d, count=%d, size=%d MiB, pullRequest=%s, flowControlTimes=%d",
-					pc.option.PullThresholdForQueue, 0, pq.msgCache.Front().Value.(int64),
-					pq.msgCache.Back().Value.(int64),
+					pc.option.PullThresholdForQueue, 0, pq.Min(), pq.Max(),
 					pq.msgCache, cachedMessageSizeInMiB, request.String(), pc.queueFlowControlTimes)
 			}
 			pc.queueFlowControlTimes++
@@ -325,8 +325,7 @@ func (pc *pushConsumer) pullMessage(request *PullRequest) {
 			if pc.queueFlowControlTimes%1000 == 0 {
 				rlog.Warnf("the cached message size exceeds the threshold %d MiB, so do flow control, "+
 					"minOffset=%d, maxOffset=%d, count=%d, size=%d MiB, pullRequest=%s, flowControlTimes=%d",
-					pc.option.PullThresholdSizeForQueue, 0, //processQueue.getMsgTreeMap().firstKey(),
-					0, // TODO processQueue.getMsgTreeMap().lastKey(),
+					pc.option.PullThresholdSizeForQueue, pq.Min(), pq.Max(),
 					pq.msgCache, cachedMessageSizeInMiB, request.String(), pc.queueFlowControlTimes)
 			}
 			pc.queueFlowControlTimes++
@@ -338,12 +337,9 @@ func (pc *pushConsumer) pullMessage(request *PullRequest) {
 			if pq.getMaxSpan() > pc.option.ConsumeConcurrentlyMaxSpan {
 
 				if pc.queueMaxSpanFlowControlTimes%1000 == 0 {
-					rlog.Warnf("the queue's messages, span too long, so do flow control, minOffset=%d, "+
-						"maxOffset=%d, maxSpan=%d, pullRequest=%s, flowControlTimes=%d",
-						0, //processQueue.getMsgTreeMap().firstKey(),
-						0, // processQueue.getMsgTreeMap().lastKey(),
-						pq.getMaxSpan(),
-						request.String(), pc.queueMaxSpanFlowControlTimes)
+					rlog.Warnf("the queue's messages, span too long, limit=%d, so do flow control, minOffset=%d, "+
+						"maxOffset=%d, maxSpan=%d, pullRequest=%s, flowControlTimes=%d", pc.option.ConsumeConcurrentlyMaxSpan,
+						pq.Min(), pq.Max(), pq.getMaxSpan(), request.String(), pc.queueMaxSpanFlowControlTimes)
 				}
 				sleepTime = _PullDelayTimeWhenFlowControl
 				goto NEXT
@@ -452,14 +448,14 @@ func (pc *pushConsumer) pullMessage(request *PullRequest) {
 			if msgFounded != nil && len(msgFounded) != 0 {
 				firstMsgOffset = msgFounded[0].QueueOffset
 				increasePullTPS(pc.consumerGroup, request.mq.Topic, len(msgFounded))
-				pq.putMessage(msgFounded)
+				pq.putMessage(msgFounded...)
 			}
 			if result.NextBeginOffset < prevRequestOffset || firstMsgOffset < prevRequestOffset {
 				rlog.Warnf("[BUG] pull message result maybe data wrong, [nextBeginOffset=%s, "+
 					"firstMsgOffset=%d, prevRequestOffset=%d]", result.NextBeginOffset, firstMsgOffset, prevRequestOffset)
 			}
 		case kernel.PullNoNewMsg:
-			rlog.Infof("Topic: %s, QueueId: %d, no more msg", request.mq.Topic, request.mq.QueueId)
+			rlog.Debugf("Topic: %s, QueueId: %d no more msg, next offset: %d", request.mq.Topic, request.mq.QueueId, result.NextBeginOffset)
 		case kernel.PullNoMsgMatched:
 			request.nextOffset = result.NextBeginOffset
 			pc.correctTagsOffset(request)
@@ -489,6 +485,87 @@ func (pc *pushConsumer) sendMessageBack(ctx *ConsumeMessageContext, msg *kernel.
 	return true
 }
 
+func (pc *pushConsumer) suspend() {
+	pc.pause = true
+	rlog.Infof("suspend consumer: %s", pc.consumerGroup)
+}
+
+func (pc *pushConsumer) resume() {
+	pc.pause = false
+	pc.doBalance()
+	rlog.Infof("resume consumer: %s", pc.consumerGroup)
+}
+
+func (pc *pushConsumer) resetOffset(topic string, table map[kernel.MessageQueue]int64) {
+	//topic := cmd.ExtFields["topic"]
+	//group := cmd.ExtFields["group"]
+	//if topic == "" || group == "" {
+	//	rlog.Warnf("received reset offset command from: %s, but missing params.", from)
+	//	return
+	//}
+	//t, err := strconv.ParseInt(cmd.ExtFields["timestamp"], 10, 64)
+	//if err != nil {
+	//	rlog.Warnf("received reset offset command from: %s, but parse time error: %s", err.Error())
+	//	return
+	//}
+	//rlog.Infof("invoke reset offset operation from broker. brokerAddr=%s, topic=%s, group=%s, timestamp=%v",
+	//	from, topic, group, t)
+	//
+	//offsetTable := make(map[kernel.MessageQueue]int64, 0)
+	//err = json.Unmarshal(cmd.Body, &offsetTable)
+	//if err != nil {
+	//	rlog.Warnf("received reset offset command from: %s, but parse offset table: %s", err.Error())
+	//	return
+	//}
+	//v, exist := c.consumerMap.Load(group)
+	//if !exist {
+	//	rlog.Infof("[reset-offset] consumer dose not exist. group=%s", group)
+	//	return
+	//}
+
+	set := make(map[int]*kernel.MessageQueue, 0)
+	for k := range table {
+		set[k.HashCode()] = &k
+	}
+	pc.processQueueTable.Range(func(key, value interface{}) bool {
+		mqHash := value.(int)
+		pq := value.(*processQueue)
+		if set[mqHash] != nil {
+			pq.dropped = true
+			pq.clear()
+		}
+		return true
+	})
+	time.Sleep(10 * time.Second)
+	v, exist := pc.topicSubscribeInfoTable.Load(topic)
+	if !exist {
+		return
+	}
+	queuesOfTopic := v.(map[int]*kernel.MessageQueue)
+	for k := range queuesOfTopic {
+		q := set[k]
+		if q != nil {
+			pc.storage.update(q, table[*q], false)
+			v, exist := pc.processQueueTable.Load(k)
+			if !exist {
+				continue
+			}
+			pq := v.(*processQueue)
+			pc.removeUnnecessaryMessageQueue(q, pq)
+			delete(queuesOfTopic, k)
+		}
+	}
+}
+
+func (pc *pushConsumer) removeUnnecessaryMessageQueue(mq *kernel.MessageQueue, pq *processQueue) bool {
+	pc.defaultConsumer.removeUnnecessaryMessageQueue(mq, pq)
+	if !pc.consumeOrderly || Clustering != pc.model {
+		return true
+	}
+	// TODO orderly
+	return true
+}
+
 type ConsumeMessageContext struct {
 	consumerGroup string
 	msgs          []*kernel.MessageExt
@@ -499,8 +576,8 @@ type ConsumeMessageContext struct {
 	properties map[string]string
 }
 
-func (pc *pushConsumer) consumeMessageCurrently(pq *ProcessQueue, mq *kernel.MessageQueue) {
-	msgs := pq.takeMessages(32)
+func (pc *pushConsumer) consumeMessageCurrently(pq *processQueue, mq *kernel.MessageQueue) {
+	msgs := pq.getMessages()
 	if msgs == nil {
 		return
 	}
@@ -573,7 +650,7 @@ func (pc *pushConsumer) consumeMessageCurrently(pq *ProcessQueue, mq *kernel.Mes
 					}
 				}
 
-				offset := pq.removeMessage(len(subMsgs))
+				offset := pq.removeMessage(subMsgs...)
 
 				if offset >= 0 && !pq.dropped {
 					pc.storage.update(mq, int64(offset), true)
@@ -591,5 +668,5 @@ func (pc *pushConsumer) consumeMessageCurrently(pq *ProcessQueue, mq *kernel.Mes
 	}
 }
 
-func (pc *pushConsumer) consumeMessageOrderly(pq *ProcessQueue, mq *kernel.MessageQueue) {
+func (pc *pushConsumer) consumeMessageOrderly(pq *processQueue, mq *kernel.MessageQueue) {
 }
diff --git a/core/api.go b/core/api.go
index 8a6fdd3..6969d1f 100644
--- a/core/api.go
+++ b/core/api.go
@@ -48,12 +48,12 @@ func (config *ClientConfig) String() string {
 	return str
 }
 
-// NewProducer create a new producer with config
+// NewProducer create a new consumer with config
 func NewProducer(config *ProducerConfig) (Producer, error) {
 	return newDefaultProducer(config)
 }
 
-// ProducerConfig define a producer
+// ProducerConfig define a consumer
 type ProducerConfig struct {
 	ClientConfig
 	SendMsgTimeout int
diff --git a/core/error.go b/core/error.go
index 6be7883..5f708ee 100644
--- a/core/error.go
+++ b/core/error.go
@@ -46,7 +46,7 @@ func (e rmqError) Error() string {
 	case ErrMallocFailed:
 		return "malloc memory failed"
 	case ErrProducerStartFailed:
-		return "start producer failed"
+		return "start consumer failed"
 	case ErrSendSyncFailed:
 		return "send message with sync failed"
 	case ErrSendOrderlyFailed:
diff --git a/core/producer.go b/core/producer.go
index 22b49ed..e75e0ff 100644
--- a/core/producer.go
+++ b/core/producer.go
@@ -178,7 +178,7 @@ func (p *defaultProducer) String() string {
 	return p.config.String()
 }
 
-// Start the producer.
+// Start the consumer.
 func (p *defaultProducer) Start() error {
 	err := rmqError(C.StartProducer(p.cproduer))
 	if err != NIL {
@@ -187,7 +187,7 @@ func (p *defaultProducer) Start() error {
 	return nil
 }
 
-// Shutdown the producer.
+// Shutdown the consumer.
 func (p *defaultProducer) Shutdown() error {
 	err := rmqError(C.ShutdownProducer(p.cproduer))
 
diff --git a/docs/feature.md b/docs/feature.md
index bc23be2..9caa789 100644
--- a/docs/feature.md
+++ b/docs/feature.md
@@ -1,92 +1,72 @@
 # Feature
 
 ## Producer
-- [ ] ProducerType
-    - [ ] DefaultProducer
-    - [ ] TransactionProducer
-- [ ] API
-    - [ ] Send
-        - [ ] Sync
-        - [ ] Async
-        - [ ] OneWay
-- [ ] Other
-    - [ ] DelayMessage
-    - [ ] Config
-    - [ ] MessageId Generate
-    - [ ] CompressMsg
-    - [ ] TimeOut
-    - [ ] LoadBalance
-    - [ ] DefaultTopic
-    - [ ] VipChannel
-    - [ ] Retry
-    - [ ] SendMessageHook
-    - [ ] CheckRequestQueue
-    - [ ] CheckForbiddenHookList
-    - [ ] MQFaultStrategy
+
+### MessageType
+- [x] NormalMessage
+- [ ] TransactionMessage
+- [ ] DelayMessage
+
+### SendWith    
+- [x] Sync
+- [ ] Async
+- [x] OneWay
+
+### Other    
+- [ ] Config
+- [ ] MessageId Generate
+- [ ] CompressMsg
+- [ ] LoadBalance
+- [ ] DefaultTopic
+- [ ] VipChannel
+- [ ] Retry
+- [ ] Hook
+- [ ] CheckRequestQueue
+- [ ] MQFaultStrategy
 
 ## Consumer
-- [ ] ConsumerType
-    - [ ] PushConsumer
-    - [ ] PullConsumer
-- [ ] MessageListener
-    - [ ] Concurrently
-    - [ ] Orderly
-- [ ] MessageModel
-    - [ ] CLUSTERING
-    - [ ] BROADCASTING
-- [ ] OffsetStore
-    - [ ] RemoteBrokerOffsetStore
-        - [ ] many actions
-    - [ ] LocalFileOffsetStore
-- [ ] RebalanceService
-- [ ] PullMessageService
-- [ ] ConsumeMessageService
-- [ ] AllocateMessageQueueStrategy
-    - [ ] AllocateMessageQueueAveragely
-    - [ ] AllocateMessageQueueAveragelyByCircle
-    - [ ] AllocateMessageQueueByConfig
-    - [ ] AllocateMessageQueueByMachineRoom
-- [ ] Other
-    - [ ] Config
-    - [ ] ZIP
-    - [ ] AllocateMessageQueueStrategy
-    - [ ] ConsumeFromWhere
-        - [ ] CONSUME_FROM_LAST_OFFSET
-        - [ ] CONSUME_FROM_FIRST_OFFSET
-        - [ ] CONSUME_FROM_TIMESTAMP
-    - [ ] Retry(sendMessageBack)
-    - [ ] TimeOut(clearExpiredMessage)
-    - [ ] ACK(partSuccess)
-    - [ ] FlowControl(messageCanNotConsume)
-    - [ ] ConsumeMessageHook
-    - [ ] filterMessageHookList
 
-## Manager
-- [ ] Multiple Request API Wrapper
-    - many functions...
-- [ ] Task
-    - [ ] PollNameServer
-    - [ ] Heartbeat
-    - [ ] UpdateTopicRouteInfoFromNameServer
-    - [ ] CleanOfflineBroker
-    - [ ] PersistAllConsumerOffset
-    - [ ] ClearExpiredMessage(form consumer consumeMessageService)
-- [ ] Processor
-    - [ ] CHECK_TRANSACTION_STATE
-    - [ ] NOTIFY_CONSUMER_IDS_CHANGED
-    - [ ] RESET_CONSUMER_CLIENT_OFFSET
-    - [ ] GET_CONSUMER_STATUS_FROM_CLIENT
-    - [ ] GET_CONSUMER_RUNNING_INFO
-    - [ ] CONSUME_MESSAGE_DIRECTLY
+### ReceiveType
+- [x] Push
+- [ ] Pull
+
+### ConsumingType
+- [x] Concurrently
+- [ ] Orderly
+
+### MessageModel
+- [x] CLUSTERING
+- [x] BROADCASTING
+    
+### AllocateMessageQueueStrategy
+- [x] AllocateMessageQueueAveragely
+- [ ] AllocateMessageQueueAveragelyByCircle
+- [ ] AllocateMessageQueueByConfig
+- [ ] AllocateMessageQueueByMachineRoom
+
+### Other
+- [x] Rebalance
+- [x] Flow Control
+- [ ] compress
+- [x] ConsumeFromWhere
+- [ ] Retry(sendMessageBack)
+- [ ] Hook
+
+## Common
+- [ ] PollNameServer
+- [x] Heartbeat
+- [x] UpdateTopicRouteInfoFromNameServer
+- [ ] CleanOfflineBroker
+- [ ] ClearExpiredMessage(form consumer consumeMessageService)
     
 ## Remoting
-- [ ] API
-    - [ ] InvokeSync
-    - [ ] InvokeAsync
-    - [ ] InvokeOneWay
-- [ ] Serialize
-    - [ ] JSON
-    - [ ] ROCKETMQ
+- [x] API
+    - [x] InvokeSync
+    - [x] InvokeAsync
+    - [x] InvokeOneWay
+- [x] Serialize
+    - [x] JSON
+    - [x] ROCKETMQ
 - [ ] Other
     - [ ] VIPChannel
     - [ ] RPCHook
diff --git a/examples/producer/main.go b/examples/consumer/main.go
similarity index 92%
rename from examples/producer/main.go
rename to examples/consumer/main.go
index e2d0126..2b563ff 100644
--- a/examples/producer/main.go
+++ b/examples/consumer/main.go
@@ -22,6 +22,7 @@ import (
 	"github.com/apache/rocketmq-client-go/consumer"
 	"github.com/apache/rocketmq-client-go/kernel"
 	"os"
+	"sync/atomic"
 	"time"
 )
 
@@ -30,9 +31,13 @@ func main() {
 		ConsumerModel: consumer.Clustering,
 		FromWhere:     consumer.ConsumeFromFirstOffset,
 	})
+	var count int64
 	err := c.Subscribe("test", consumer.MessageSelector{}, func(ctx *consumer.ConsumeMessageContext,
 		msgs []*kernel.MessageExt) (consumer.ConsumeResult, error) {
-		fmt.Println(msgs)
+		c := atomic.AddInt64(&count, int64(len(msgs)))
+		if c%1000 == 0 {
+			fmt.Println(c)
+		}
 		return consumer.ConsumeSuccess, nil
 	})
 	if err != nil {
diff --git a/go.mod b/go.mod
index b2ca040..419d8ba 100644
--- a/go.mod
+++ b/go.mod
@@ -3,15 +3,12 @@ module github.com/apache/rocketmq-client-go
 go 1.11
 
 require (
-	github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc // indirect
-	github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf // indirect
 	github.com/emirpasic/gods v1.12.0
 	github.com/sirupsen/logrus v1.3.0
 	github.com/stretchr/testify v1.3.0
 	github.com/tidwall/gjson v1.2.1
 	github.com/tidwall/match v1.0.1 // indirect
 	github.com/tidwall/pretty v0.0.0-20180105212114-65a9db5fad51 // indirect
-	gopkg.in/alecthomas/kingpin.v2 v2.2.6
 )
 
 replace (
diff --git a/go.sum b/go.sum
index da32c5e..61c5e19 100644
--- a/go.sum
+++ b/go.sum
@@ -1,7 +1,3 @@
-github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc h1:cAKDfWh5VpdgMhJosfJnn5/FoN2SRZ4p7fJNX58YPaU=
-github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
-github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf h1:qet1QNfXsQxTZqLG4oE62mJzwPIB8+Tee4RNCL9ulrY=
-github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0=
 github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
 github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
 github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
@@ -30,5 +26,3 @@ golang.org/x/crypto v0.0.0-20180904163835-0709b304e793 h1:u+LnwYTOOW7Ukr/fppxEb1
 golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
 golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33 h1:I6FyU15t786LL7oL/hn43zqTuEGr4PN7F4XJ1p4E3Y8=
 golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
-gopkg.in/alecthomas/kingpin.v2 v2.2.6 h1:jMFz6MfLP0/4fUyZle81rXUoxOBFi19VUFKVDOQfozc=
-gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw=
diff --git a/kernel/client.go b/kernel/client.go
index e388411..ae258e4 100644
--- a/kernel/client.go
+++ b/kernel/client.go
@@ -47,7 +47,7 @@ const (
 	_PersistOffset = 5 * time.Second
 
 	// Rebalance interval
-	_RebalanceInterval = 20 * time.Millisecond
+	_RebalanceInterval = 100 * time.Millisecond
 )
 
 var (
@@ -82,7 +82,6 @@ type InnerProducer interface {
 	IsPublishTopicNeedUpdate(topic string) bool
 	GetCheckListener() func(msg *MessageExt)
 	GetTransactionListener() TransactionListener
-	//CheckTransactionState()
 	isUnitMode() bool
 }
 
@@ -103,13 +102,25 @@ type RMQClient struct {
 	// group -> InnerConsumer
 	consumerMap sync.Map
 	once        sync.Once
+
+	remoteClient *remote.RemotingClient
 }
 
 var clientMap sync.Map
 
 func GetOrNewRocketMQClient(option ClientOption) *RMQClient {
-	client := &RMQClient{option: option}
-	actual, _ := clientMap.LoadOrStore(client.ClientID(), client)
+	client := &RMQClient{
+		option:       option,
+		remoteClient: remote.NewRemotingClient(),
+	}
+	actual, loaded := clientMap.LoadOrStore(client.ClientID(), client)
+	if !loaded {
+		client.remoteClient.RegisterRequestFunc(ReqNotifyConsumerIdsChanged, func(req *remote.RemotingCommand) *remote.RemotingCommand {
+			rlog.Infof("receive broker's notification, the consumer group: %s", req.ExtFields["consumerGroup"])
+			client.RebalanceImmediately()
+			return nil
+		})
+	}
 	return actual.(*RMQClient)
 }
 
@@ -129,7 +140,13 @@ func (c *RMQClient) Start() {
 		}()
 
 		// TODO cleanOfflineBroker & sendHeartbeatToAllBrokerWithLock
-		go func() {}()
+		go func() {
+			for {
+				cleanOfflineBroker()
+				c.SendHeartbeatToAllBrokerWithLock()
+				time.Sleep(_HeartbeatBrokerInterval)
+			}
+		}()
 
 		// schedule persist offset
 		go func() {
@@ -147,7 +164,7 @@ func (c *RMQClient) Start() {
 		go func() {
 			for {
 				c.RebalanceImmediately()
-				time.Sleep(time.Second)
+				time.Sleep(_RebalanceInterval)
 			}
 		}()
 	})
@@ -161,9 +178,20 @@ func (c *RMQClient) ClientID() string {
 	return id
 }
 
+func (c *RMQClient) InvokeSync(addr string, request *remote.RemotingCommand,
+	timeoutMillis time.Duration) (*remote.RemotingCommand, error) {
+	return c.remoteClient.InvokeSync(addr, request, timeoutMillis)
+}
+
+func (c *RMQClient) InvokeOneWay(addr string, request *remote.RemotingCommand,
+	timeoutMillis time.Duration) error {
+	return c.remoteClient.InvokeOneWay(addr, request, timeoutMillis)
+}
+
 func (c *RMQClient) CheckClientInBroker() {
 }
 
+// TODO
 func (c *RMQClient) SendHeartbeatToAllBrokerWithLock() {
 	hbData := &heartbeatData{
 		ClientId: c.ClientID(),
@@ -190,7 +218,7 @@ func (c *RMQClient) SendHeartbeatToAllBrokerWithLock() {
 	hbData.ProducerDatas = pData
 	hbData.ConsumerDatas = cData
 	if len(pData) == 0 && len(cData) == 0 {
-		rlog.Warn("sending heartbeat, but no consumer and no producer")
+		rlog.Info("sending heartbeat, but no consumer and no consumer")
 		return
 	}
 	brokerAddressesMap.Range(func(key, value interface{}) bool {
@@ -198,7 +226,7 @@ func (c *RMQClient) SendHeartbeatToAllBrokerWithLock() {
 		data := value.(*BrokerData)
 		for id, addr := range data.BrokerAddresses {
 			cmd := remote.NewRemotingCommand(ReqHeartBeat, nil, hbData.encode())
-			response, err := remote.InvokeSync(addr, cmd, 3*time.Second)
+			response, err := c.remoteClient.InvokeSync(addr, cmd, 3*time.Second)
 			if err != nil {
 				rlog.Warnf("send heart beat to broker error: %s", err.Error())
 				return true
@@ -213,7 +241,7 @@ func (c *RMQClient) SendHeartbeatToAllBrokerWithLock() {
 					brokerVersionMap.Store(brokerName, m)
 				}
 				m[brokerName] = int32(response.Version)
-				rlog.Infof("send heart beat to broker[%s %s %s] success", brokerName, id, addr)
+				rlog.Infof("send heart beat to broker[%s %d %s] success", brokerName, id, addr)
 			}
 		}
 		return true
@@ -255,7 +283,7 @@ func (c *RMQClient) UpdateTopicRouteInfo() {
 func (c *RMQClient) SendMessageSync(ctx context.Context, brokerAddrs, brokerName string, request *SendMessageRequest,
 	msgs []*Message) (*SendResult, error) {
 	cmd := getRemotingCommand(request, msgs)
-	response, err := remote.InvokeSync(brokerAddrs, cmd, 3*time.Second)
+	response, err := c.remoteClient.InvokeSync(brokerAddrs, cmd, 3*time.Second)
 	if err != nil {
 		rlog.Warnf("send messages with sync error: %v", err)
 		return nil, err
@@ -281,7 +309,7 @@ func (c *RMQClient) SendMessageAsync(ctx context.Context, brokerAddrs, brokerNam
 func (c *RMQClient) SendMessageOneWay(ctx context.Context, brokerAddrs string, request *SendMessageRequest,
 	msgs []*Message) (*SendResult, error) {
 	cmd := remote.NewRemotingCommand(ReqSendBatchMessage, request, encodeMessages(msgs))
-	err := remote.InvokeOneWay(brokerAddrs, cmd, 3*time.Second)
+	err := c.remoteClient.InvokeOneWay(brokerAddrs, cmd, 3*time.Second)
 	if err != nil {
 		rlog.Warnf("send messages with oneway error: %v", err)
 	}
@@ -337,7 +365,7 @@ func (c *RMQClient) processSendResponse(brokerName string, msgs []*Message, cmd
 // PullMessage with sync
 func (c *RMQClient) PullMessage(ctx context.Context, brokerAddrs string, request *PullMessageRequest) (*PullResult, error) {
 	cmd := remote.NewRemotingCommand(ReqPullMessage, request, nil)
-	res, err := remote.InvokeSync(brokerAddrs, cmd, 3*time.Second)
+	res, err := c.remoteClient.InvokeSync(brokerAddrs, cmd, 3*time.Second)
 	if err != nil {
 		return nil, err
 	}
@@ -390,67 +418,6 @@ func (c *RMQClient) PullMessageAsync(ctx context.Context, brokerAddrs string, re
 	return nil
 }
 
-// QueryMaxOffset with specific queueId and topic
-func QueryMaxOffset(mq *MessageQueue) (int64, error) {
-	brokerAddr := FindBrokerAddrByName(mq.BrokerName)
-	if brokerAddr == "" {
-		UpdateTopicRouteInfo(mq.Topic)
-		brokerAddr = FindBrokerAddrByName(mq.Topic)
-	}
-	if brokerAddr == "" {
-		return -1, fmt.Errorf("the broker [%s] does not exist", mq.BrokerName)
-	}
-
-	request := &GetMaxOffsetRequest{
-		Topic:   mq.Topic,
-		QueueId: mq.QueueId,
-	}
-
-	cmd := remote.NewRemotingCommand(ReqGetMaxOffset, request, nil)
-	response, err := remote.InvokeSync(brokerAddr, cmd, 3*time.Second)
-	if err != nil {
-		return -1, err
-	}
-
-	return strconv.ParseInt(response.ExtFields["offset"], 10, 64)
-}
-
-// QueryConsumerOffset with specific queueId and topic of consumerGroup
-func (c *RMQClient) QueryConsumerOffset(consumerGroup, mq *MessageQueue) (int64, error) {
-	return 0, nil
-}
-
-// SearchOffsetByTimestamp with specific queueId and topic
-func SearchOffsetByTimestamp(mq *MessageQueue, timestamp int64) (int64, error) {
-	brokerAddr := FindBrokerAddrByName(mq.BrokerName)
-	if brokerAddr == "" {
-		UpdateTopicRouteInfo(mq.Topic)
-		brokerAddr = FindBrokerAddrByName(mq.Topic)
-	}
-	if brokerAddr == "" {
-		return -1, fmt.Errorf("the broker [%s] does not exist", mq.BrokerName)
-	}
-
-	request := &SearchOffsetRequest{
-		Topic:     mq.Topic,
-		QueueId:   mq.QueueId,
-		Timestamp: timestamp,
-	}
-
-	cmd := remote.NewRemotingCommand(ReqSearchOffsetByTimestamp, request, nil)
-	response, err := remote.InvokeSync(brokerAddr, cmd, 3*time.Second)
-	if err != nil {
-		return -1, err
-	}
-
-	return strconv.ParseInt(response.ExtFields["offset"], 10, 64)
-}
-
-// UpdateConsumerOffset with specific queueId and topic
-func (c *RMQClient) UpdateConsumerOffset(consumerGroup, topic string, queue int, offset int64) error {
-	return nil
-}
-
 func (c *RMQClient) RegisterConsumer(group string, consumer InnerConsumer) error {
 	c.consumerMap.Store(group, consumer)
 	return nil
diff --git a/kernel/model.go b/kernel/model.go
index d9f4d00..0ddc4c3 100644
--- a/kernel/model.go
+++ b/kernel/model.go
@@ -242,7 +242,7 @@ type FindBrokerResult struct {
 }
 
 type (
-	// groupName of producer
+	// groupName of consumer
 	producerData string
 
 	consumeType string
diff --git a/kernel/request.go b/kernel/request.go
index 2904242..9f16e44 100644
--- a/kernel/request.go
+++ b/kernel/request.go
@@ -24,18 +24,23 @@ import (
 )
 
 const (
-	ReqSendMessage         = int16(10)
-	ReqPullMessage             = int16(11)
-	ReqQueryConsumerOffset     = int16(14)
-	ReqUpdateConsumerOffset    = int16(15)
-	ReqSearchOffsetByTimestamp = int16(30)
-	ReqGetMaxOffset            = int16(30)
-	ReqHeartBeat               = int16(34)
-	ReqGetConsumerListByGroup  = int16(38)
-	ReqLockBatchMQ             = int16(41)
-	ReqUnlockBatchMQ           = int16(42)
-	ReqGetRouteInfoByTopic     = int16(105)
-	ReqSendBatchMessage        = int16(320)
+	ReqSendMessage              = int16(10)
+	ReqPullMessage              = int16(11)
+	ReqQueryConsumerOffset      = int16(14)
+	ReqUpdateConsumerOffset     = int16(15)
+	ReqSearchOffsetByTimestamp  = int16(30)
+	ReqGetMaxOffset             = int16(30)
+	ReqHeartBeat                = int16(34)
+	ReqGetConsumerListByGroup   = int16(38)
+	ReqLockBatchMQ              = int16(41)
+	ReqUnlockBatchMQ            = int16(42)
+	ReqGetRouteInfoByTopic      = int16(105)
+	ReqSendBatchMessage         = int16(320)
+	ReqCheckTransactionState    = int16(39)
+	ReqNotifyConsumerIdsChanged = int16(40)
+	ReqResetConsuemrOffset      = int16(220)
+	ReqGetConsumerRunningInfo   = int16(307)
+	ReqConsumeMessageDirectly   = int16(309)
 )
 
 type SendMessageRequest struct {
diff --git a/kernel/route.go b/kernel/route.go
index 6a232cf..a1bfe3a 100644
--- a/kernel/route.go
+++ b/kernel/route.go
@@ -42,6 +42,7 @@ const (
 
 var (
 	ErrTopicNotExist = errors.New("topic not exist")
+	nameSrvClient    = remote.NewRemotingClient()
 )
 
 var (
@@ -57,6 +58,40 @@ var (
 	lockNamesrv  sync.Mutex
 )
 
+func cleanOfflineBroker() {
+	// TODO optimize
+	lockNamesrv.Lock()
+	brokerAddressesMap.Range(func(key, value interface{}) bool {
+		brokerName := key.(string)
+		bd := value.(*BrokerData)
+		for k, v := range bd.BrokerAddresses {
+			isBrokerAddrExistInTopicRoute := false
+			routeDataMap.Range(func(key, value interface{}) bool {
+				trd := value.(*TopicRouteData)
+				for idx := range trd.BrokerDataList {
+					for _, v1 := range trd.BrokerDataList[idx].BrokerAddresses {
+						if v1 == v {
+							isBrokerAddrExistInTopicRoute = true
+							return false
+						}
+					}
+				}
+				return true
+			})
+			if !isBrokerAddrExistInTopicRoute {
+				delete(bd.BrokerAddresses, k)
+				rlog.Infof("the broker: [name=%s, ID=%d, addr=%s,] is offline, remove it", brokerName, k, v)
+			}
+		}
+		if len(bd.BrokerAddresses) == 0 {
+			brokerAddressesMap.Delete(brokerName)
+			rlog.Infof("the broker [name=%s] name's host is offline, remove it", brokerName)
+		}
+		return true
+	})
+	lockNamesrv.Unlock()
+}
+
 // key is topic, value is TopicPublishInfo
 type TopicPublishInfo struct {
 	OrderTopic          bool
@@ -233,7 +268,7 @@ func queryTopicRouteInfoFromServer(topic string) (*TopicRouteData, error) {
 		Topic: topic,
 	}
 	rc := remote.NewRemotingCommand(ReqGetRouteInfoByTopic, request, nil)
-	response, err := remote.InvokeSync(getNameServerAddress(), rc, requestTimeout)
+	response, err := nameSrvClient.InvokeSync(getNameServerAddress(), rc, requestTimeout)
 
 	if err != nil {
 		return nil, err
@@ -405,7 +440,25 @@ func (routeData *TopicRouteData) clone() *TopicRouteData {
 }
 
 func (routeData *TopicRouteData) equals(data *TopicRouteData) bool {
-	return false
+	if len(routeData.BrokerDataList) != len(data.BrokerDataList) {
+		return false
+	}
+	if len(routeData.QueueDataList) != len(data.QueueDataList) {
+		return false
+	}
+
+	for idx := range routeData.BrokerDataList {
+		if !routeData.BrokerDataList[idx].Equals(data.BrokerDataList[idx]) {
+			return false
+		}
+	}
+
+	for idx := range routeData.QueueDataList {
+		if !routeData.QueueDataList[idx].Equals(data.QueueDataList[idx]) {
+			return false
+		}
+	}
+	return true
 }
 
 func (routeData *TopicRouteData) String() string {
@@ -422,6 +475,30 @@ type QueueData struct {
 	TopicSynFlag   int    `json:"topicSynFlag"`
 }
 
+func (q *QueueData) Equals(qd *QueueData) bool {
+	if q.BrokerName != qd.BrokerName {
+		return false
+	}
+
+	if q.ReadQueueNums != qd.ReadQueueNums {
+		return false
+	}
+
+	if q.WriteQueueNums != qd.WriteQueueNums {
+		return false
+	}
+
+	if q.Perm != qd.Perm {
+		return false
+	}
+
+	if q.TopicSynFlag != qd.TopicSynFlag {
+		return false
+	}
+
+	return true
+}
+
 // BrokerData BrokerData
 type BrokerData struct {
 	Cluster             string           `json:"cluster"`
@@ -429,3 +506,21 @@ type BrokerData struct {
 	BrokerAddresses     map[int64]string `json:"brokerAddrs"`
 	brokerAddressesLock sync.RWMutex
 }
+
+func (b *BrokerData) Equals(bd *BrokerData) bool {
+	if b.Cluster != bd.Cluster {
+		return false
+	}
+
+	if b.BrokerName != bd.BrokerName {
+		return false
+	}
+
+	for k, v := range b.BrokerAddresses {
+		if bd.BrokerAddresses[k] != v {
+			return false
+		}
+	}
+
+	return true
+}
diff --git a/remote/processor.go b/remote/processor.go
deleted file mode 100644
index 1e5d23e..0000000
--- a/remote/processor.go
+++ /dev/null
@@ -1,26 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-package remote
-
-type ClientRequestProcessor func(remotingCommand *RemotingCommand) (responseCommand *RemotingCommand)
-
-//CHECK_TRANSACTION_STATE
-//NOTIFY_CONSUMER_IDS_CHANGED
-//RESET_CONSUMER_CLIENT_OFFSET
-//GET_CONSUMER_STATUS_FROM_CLIENT
-//GET_CONSUMER_RUNNING_INFO
-//CONSUME_MESSAGE_DIRECTLY
diff --git a/remote/remote_client.go b/remote/remote_client.go
index bfe483c..93f81ce 100644
--- a/remote/remote_client.go
+++ b/remote/remote_client.go
@@ -85,14 +85,37 @@ func (r *ResponseFuture) waitResponse() (*RemotingCommand, error) {
 	}
 }
 
-func InvokeSync(addr string, request *RemotingCommand, timeoutMillis time.Duration) (*RemotingCommand, error) {
-	conn, err := connect(addr)
+type ClientRequestFunc func(remotingCommand *RemotingCommand) (responseCommand *RemotingCommand)
+
+type TcpOption struct {
+	// TODO
+}
+
+type RemotingClient struct {
+	responseTable   sync.Map
+	connectionTable sync.Map
+	option          TcpOption
+	processors      map[int16]ClientRequestFunc
+}
+
+func NewRemotingClient() *RemotingClient {
+	return &RemotingClient{
+		processors: make(map[int16]ClientRequestFunc),
+	}
+}
+
+func (c *RemotingClient) RegisterRequestFunc(code int16, f ClientRequestFunc) {
+	c.processors[code] = f
+}
+
+func (c *RemotingClient) InvokeSync(addr string, request *RemotingCommand, timeoutMillis time.Duration) (*RemotingCommand, error) {
+	conn, err := c.connect(addr)
 	if err != nil {
 		return nil, err
 	}
 	resp := NewResponseFuture(request.Opaque, timeoutMillis, nil)
-	responseTable.Store(resp.Opaque, resp)
-	err = sendRequest(conn, request)
+	c.responseTable.Store(resp.Opaque, resp)
+	err = c.sendRequest(conn, request)
 	if err != nil {
 		return nil, err
 	}
@@ -100,14 +123,14 @@ func InvokeSync(addr string, request *RemotingCommand, timeoutMillis time.Durati
 	return resp.waitResponse()
 }
 
-func InvokeAsync(addr string, request *RemotingCommand, timeoutMillis time.Duration, callback func(*ResponseFuture)) error {
-	conn, err := connect(addr)
+func (c *RemotingClient) InvokeAsync(addr string, request *RemotingCommand, timeoutMillis time.Duration, callback func(*ResponseFuture)) error {
+	conn, err := c.connect(addr)
 	if err != nil {
 		return err
 	}
 	resp := NewResponseFuture(request.Opaque, timeoutMillis, callback)
-	responseTable.Store(resp.Opaque, resp)
-	err = sendRequest(conn, request)
+	c.responseTable.Store(resp.Opaque, resp)
+	err = c.sendRequest(conn, request)
 	if err != nil {
 		return err
 	}
@@ -116,26 +139,21 @@ func InvokeAsync(addr string, request *RemotingCommand, timeoutMillis time.Durat
 
 }
 
-func InvokeOneWay(addr string, request *RemotingCommand, timeout time.Duration) error {
-	conn, err := connect(addr)
+func (c *RemotingClient) InvokeOneWay(addr string, request *RemotingCommand, timeout time.Duration) error {
+	conn, err := c.connect(addr)
 	if err != nil {
 		return err
 	}
-	return sendRequest(conn, request)
+	return c.sendRequest(conn, request)
 }
 
-var (
-	responseTable   sync.Map
-	connectionTable sync.Map
-)
-
-func ScanResponseTable() {
+func (c *RemotingClient) ScanResponseTable() {
 	rfs := make([]*ResponseFuture, 0)
-	responseTable.Range(func(key, value interface{}) bool {
+	c.responseTable.Range(func(key, value interface{}) bool {
 		if resp, ok := value.(*ResponseFuture); ok {
 			if (resp.BeginTimestamp + int64(resp.TimeoutMillis) + 1000) <= time.Now().Unix()*1000 {
 				rfs = append(rfs, resp)
-				responseTable.Delete(key)
+				c.responseTable.Delete(key)
 			}
 		}
 		return true
@@ -146,11 +164,11 @@ func ScanResponseTable() {
 	}
 }
 
-func connect(addr string) (net.Conn, error) {
+func (c *RemotingClient) connect(addr string) (net.Conn, error) {
 	//it needs additional locker.
 	connectionLocker.Lock()
 	defer connectionLocker.Unlock()
-	conn, ok := connectionTable.Load(addr)
+	conn, ok := c.connectionTable.Load(addr)
 	if ok {
 		return conn.(net.Conn), nil
 	}
@@ -158,28 +176,27 @@ func connect(addr string) (net.Conn, error) {
 	if err != nil {
 		return nil, err
 	}
-	connectionTable.Store(addr, tcpConn)
-	go receiveResponse(tcpConn)
+	c.connectionTable.Store(addr, tcpConn)
+	go c.receiveResponse(tcpConn)
 	return tcpConn, nil
 }
 
-func receiveResponse(r net.Conn) {
-	scanner := createScanner(r)
-	for {
-		scanner.Scan()
-		receivedRemotingCommand, err := decode(scanner.Bytes())
+func (c *RemotingClient) receiveResponse(r net.Conn) {
+	scanner := c.createScanner(r)
+	for scanner.Scan() {
+		cmd, err := decode(scanner.Bytes())
 		if err != nil {
-			closeConnection(r)
-			rlog.Error(err.Error())
+			c.closeConnection(r)
+			rlog.Errorf("decode RemotingCommand error: %s", err.Error())
 			break
 		}
-		if receivedRemotingCommand.isResponseType() {
-			resp, exist := responseTable.Load(receivedRemotingCommand.Opaque)
+		if cmd.isResponseType() {
+			resp, exist := c.responseTable.Load(cmd.Opaque)
 			if exist {
-				responseTable.Delete(receivedRemotingCommand.Opaque)
+				c.responseTable.Delete(cmd.Opaque)
 				responseFuture := resp.(*ResponseFuture)
 				go func() {
-					responseFuture.ResponseCommand = receivedRemotingCommand
+					responseFuture.ResponseCommand = cmd
 					responseFuture.executeInvokeCallback()
 					if responseFuture.Done != nil {
 						responseFuture.Done <- true
@@ -187,12 +204,30 @@ func receiveResponse(r net.Conn) {
 				}()
 			}
 		} else {
-			// todo handler request from peer
+			f := c.processors[cmd.Code]
+			if f != nil {
+				go func() { // 单个goroutine会造成死锁
+					res := f(cmd)
+					if res != nil {
+						err := c.sendRequest(r, cmd)
+						if err != nil {
+							rlog.Warnf("send response to broker error: %s, type is: %d", err, res.Code)
+						}
+					}
+				}()
+			} else {
+				rlog.Warnf("receive broker's requests, but no func to handle, code is: %d", cmd.Code)
+			}
 		}
 	}
+	if scanner.Err() != nil {
+		rlog.Errorf("net: %s scanner exit, err: %s.", r.RemoteAddr().String(), scanner.Err())
+	} else {
+		rlog.Infof("net: %s scanner exit.", r.RemoteAddr().String())
+	}
 }
 
-func createScanner(r io.Reader) *bufio.Scanner {
+func (c *RemotingClient) createScanner(r io.Reader) *bufio.Scanner {
 	scanner := bufio.NewScanner(r)
 	scanner.Split(func(data []byte, atEOF bool) (int, []byte, error) {
 		defer func() {
@@ -219,27 +254,26 @@ func createScanner(r io.Reader) *bufio.Scanner {
 	return scanner
 }
 
-func sendRequest(conn net.Conn, request *RemotingCommand) error {
+func (c *RemotingClient) sendRequest(conn net.Conn, request *RemotingCommand) error {
 	content, err := encode(request)
 	if err != nil {
 		return err
 	}
 	_, err = conn.Write(content)
 	if err != nil {
-		closeConnection(conn)
+		c.closeConnection(conn)
 		return err
 	}
 	return nil
 }
 
-func closeConnection(toCloseConn net.Conn) {
-	connectionTable.Range(func(key, value interface{}) bool {
+func (c *RemotingClient) closeConnection(toCloseConn net.Conn) {
+	c.connectionTable.Range(func(key, value interface{}) bool {
 		if value == toCloseConn {
-			connectionTable.Delete(key)
+			c.connectionTable.Delete(key)
 			return false
 		} else {
 			return true
 		}
-
 	})
 }
diff --git a/rlog/log.go b/rlog/log.go
index 4f5adaf..d9a1928 100644
--- a/rlog/log.go
+++ b/rlog/log.go
@@ -17,9 +17,7 @@
 
 package rlog
 
-import (
-	"github.com/sirupsen/logrus"
-)
+import "github.com/sirupsen/logrus"
 
 type Logger interface {
 	Debug(i ...interface{})
@@ -34,7 +32,13 @@ type Logger interface {
 	Fatalf(format string, args ...interface{})
 }
 
-var rLog Logger = logrus.New()
+var rLog Logger
+
+func init() {
+	r := logrus.New()
+	//r.SetLevel(logrus.DebugLevel)
+	rLog = r
+}
 
 func SetLogger(log Logger) {
 	rLog = log
diff --git a/utils/helper.go b/utils/helper.go
index 1ad0441..c5495b0 100644
--- a/utils/helper.go
+++ b/utils/helper.go
@@ -20,9 +20,7 @@ package utils
 import (
 	"bytes"
 	"encoding/binary"
-	"errors"
 	"fmt"
-	"net"
 	"os"
 	"sync"
 	"time"
@@ -63,29 +61,6 @@ func updateTimestamp() {
 	nextTimestamp = int64(time.Date(year, month, 1, 0, 0, 0, 0, time.Local).AddDate(0, 1, 0).Unix())
 }
 
-func LocalIP() []byte {
-	ip, err := clientIP4()
-	if err != nil {
-		return []byte{0, 0, 0, 0}
-	}
-	return ip
-}
-
-func clientIP4() ([]byte, error) {
-	addrs, err := net.InterfaceAddrs()
-	if err != nil {
-		return nil, errors.New("unexpected IP address")
-	}
-	for _, addr := range addrs {
-		if ipnet, ok := addr.(*net.IPNet); ok && !ipnet.IP.IsLoopback() {
-			if ip4 := ipnet.IP.To4(); ip4 != nil {
-				return ip4, nil
-			}
-		}
-	}
-	return nil, errors.New("unknown IP address")
-}
-
 func GetAddressByBytes(data []byte) string {
 	return "127.0.0.1"
 }
diff --git a/utils/helper_test.go b/utils/helper_test.go
index 967b0ae..ee55cd2 100644
--- a/utils/helper_test.go
+++ b/utils/helper_test.go
@@ -26,15 +26,6 @@ func TestClassLoaderID(t *testing.T) {
 	}
 }
 
-func TestLocalIP(t *testing.T) {
-	ip := LocalIP()
-	if ip[0] == 0 && ip[1] == 0 && ip[2] == 0 && ip[3] == 0 {
-		t.Errorf("failed to get host public ip4 address")
-	} else {
-		t.Logf("ip4 address: %v", ip)
-	}
-}
-
 func BenchmarkMessageClientID(b *testing.B) {
 	for i := 0; i < b.N; i++ {
 		MessageClientID()
diff --git a/utils/net.go b/utils/net.go
new file mode 100644
index 0000000..5da1edc
--- /dev/null
+++ b/utils/net.go
@@ -0,0 +1,30 @@
+package utils
+
+import (
+	"errors"
+	"fmt"
+	"net"
+)
+
+func LocalIP() string {
+	ip, err := clientIP4()
+	if err != nil {
+		return ""
+	}
+	return fmt.Sprintf("%d.%d.%d.%d", ip[0], ip[1], ip[2], ip[3])
+}
+
+func clientIP4() ([]byte, error) {
+	addrs, err := net.InterfaceAddrs()
+	if err != nil {
+		return nil, errors.New("unexpected IP address")
+	}
+	for _, addr := range addrs {
+		if ipnet, ok := addr.(*net.IPNet); ok && !ipnet.IP.IsLoopback() {
+			if ip4 := ipnet.IP.To4(); ip4 != nil {
+				return ip4, nil
+			}
+		}
+	}
+	return nil, errors.New("unknown IP address")
+}
diff --git a/utils/net_test.go b/utils/net_test.go
new file mode 100644
index 0000000..9f76062
--- /dev/null
+++ b/utils/net_test.go
@@ -0,0 +1,7 @@
+package utils
+
+import "testing"
+
+func TestLocalIP2(t *testing.T) {
+	t.Log(LocalIP())
+}