You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by di...@apache.org on 2019/05/07 03:35:18 UTC
[rocketmq-client-go] branch native updated: Add ClientProcessor and optimize (#52)
This is an automated email from the ASF dual-hosted git repository.
dinglei pushed a commit to branch native
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-go.git
The following commit(s) were added to refs/heads/native by this push:
new 529c872 Add ClientProcessor and optimize (#52)
529c872 is described below
commit 529c872f2473247f8735d43a2dbc3c9c72c7d93e
Author: wenfeng <sx...@gmail.com>
AuthorDate: Tue May 7 11:35:14 2019 +0800
Add ClientProcessor and optimize (#52)
* replace list with TreeMap in process_queue
* refactor for processor
* update feature list
* add cleanOfflineBroker
---
benchmark/producer.go | 8 +-
benchmark/stable.go | 6 +-
consumer/consumer.go | 100 ++++++++++++----
consumer/offset_store.go | 99 ++++++++-------
consumer/process_queue.go | 205 ++++++++++++++++++++++++++------
consumer/push_consumer.go | 119 ++++++++++++++----
core/api.go | 4 +-
core/error.go | 2 +-
core/producer.go | 4 +-
docs/feature.md | 142 ++++++++++------------
examples/{producer => consumer}/main.go | 7 +-
go.mod | 3 -
go.sum | 6 -
kernel/client.go | 113 +++++++-----------
kernel/model.go | 2 +-
kernel/request.go | 29 +++--
kernel/route.go | 99 ++++++++++++++-
remote/processor.go | 26 ----
remote/remote_client.go | 118 +++++++++++-------
rlog/log.go | 12 +-
utils/helper.go | 25 ----
utils/helper_test.go | 9 --
utils/net.go | 30 +++++
utils/net_test.go | 7 ++
24 files changed, 749 insertions(+), 426 deletions(-)
diff --git a/benchmark/producer.go b/benchmark/producer.go
index e183269..4551615 100644
--- a/benchmark/producer.go
+++ b/benchmark/producer.go
@@ -108,7 +108,7 @@ type producer struct {
func init() {
p := &producer{}
- flags := flag.NewFlagSet("producer", flag.ExitOnError)
+ flags := flag.NewFlagSet("consumer", flag.ExitOnError)
p.flags = flags
flags.StringVar(&p.topic, "t", "", "topic name")
@@ -118,7 +118,7 @@ func init() {
flags.IntVar(&p.testMinutes, "m", 10, "test minutes")
flags.IntVar(&p.bodySize, "s", 32, "body size")
- registerCommand("producer", p)
+ registerCommand("consumer", p)
}
func (bp *producer) produceMsg(stati *statiBenchmarkProducerSnapshot, exit chan struct{}) {
@@ -126,14 +126,14 @@ func (bp *producer) produceMsg(stati *statiBenchmarkProducerSnapshot, exit chan
ClientConfig: rocketmq.ClientConfig{GroupID: bp.groupID, NameServer: bp.nameSrv},
})
if err != nil {
- fmt.Printf("new producer error:%s\n", err)
+ fmt.Printf("new consumer error:%s\n", err)
return
}
p.Start()
defer p.Shutdown()
- topic, tag := bp.topic, "benchmark-producer"
+ topic, tag := bp.topic, "benchmark-consumer"
AGAIN:
select {
diff --git a/benchmark/stable.go b/benchmark/stable.go
index 6c7a12c..46c7b5c 100644
--- a/benchmark/stable.go
+++ b/benchmark/stable.go
@@ -145,13 +145,13 @@ func (stp *stableTestProducer) run(args []string) {
ClientConfig: rocketmq.ClientConfig{GroupID: stp.groupID, NameServer: stp.nameSrv},
})
if err != nil {
- fmt.Printf("new producer error:%s\n", err)
+ fmt.Printf("new consumer error:%s\n", err)
return
}
err = p.Start()
if err != nil {
- fmt.Printf("start producer error:%s\n", err)
+ fmt.Printf("start consumer error:%s\n", err)
return
}
defer p.Shutdown()
@@ -256,7 +256,7 @@ func (stc *stableTestConsumer) pullMessage() {
}
func init() {
- // producer
+ // consumer
name := "stableTestProducer"
p := &stableTestProducer{stableTest: &stableTest{}}
p.buildFlags(name)
diff --git a/consumer/consumer.go b/consumer/consumer.go
index f128f66..a6d9500 100644
--- a/consumer/consumer.go
+++ b/consumer/consumer.go
@@ -26,6 +26,7 @@ import (
"github.com/apache/rocketmq-client-go/utils"
"github.com/tidwall/gjson"
"sort"
+ "strconv"
"strings"
"sync"
"sync/atomic"
@@ -181,7 +182,7 @@ const (
type PullRequest struct {
consumerGroup string
mq *kernel.MessageQueue
- pq *ProcessQueue
+ pq *processQueue
nextOffset int64
lockedFirst bool
}
@@ -209,7 +210,7 @@ type ConsumerOption struct {
// Flow control threshold on queue level, each message queue will cache at most 1000 messages by default,
// Consider the {PullBatchSize}, the instantaneous value may exceed the limit
- PullThresholdForQueue int
+ PullThresholdForQueue int64
// Limit the cached message size on queue level, each message queue will cache at most 100 MiB messages by default,
// Consider the {@code pullBatchSize}, the instantaneous value may exceed the limit
@@ -408,8 +409,10 @@ func (dc *defaultConsumer) SubscriptionDataList() []*kernel.SubscriptionData {
}
func (dc *defaultConsumer) makeSureStateOK() error {
- // TODO log
- return nil //dc.state == StateRunning
+ if dc.state != kernel.StateRunning {
+ return fmt.Errorf("state not running, actually: %v", dc.state)
+ }
+ return nil
}
type lockBatchRequestBody struct {
@@ -436,7 +439,7 @@ func (dc *defaultConsumer) lock(mq *kernel.MessageQueue) bool {
_mq := lockedMQ[idx]
v, exist := dc.processQueueTable.Load(_mq)
if exist {
- pq := v.(*ProcessQueue)
+ pq := v.(*processQueue)
pq.locked = true
pq.lastConsumeTime = time.Now()
}
@@ -486,7 +489,7 @@ func (dc *defaultConsumer) lockAll(mq kernel.MessageQueue) {
_mq := lockedMQ[idx]
v, exist := dc.processQueueTable.Load(_mq)
if exist {
- pq := v.(*ProcessQueue)
+ pq := v.(*processQueue)
pq.locked = true
pq.lastConsumeTime = time.Now()
}
@@ -497,7 +500,7 @@ func (dc *defaultConsumer) lockAll(mq kernel.MessageQueue) {
if !set[_mq.HashCode()] {
v, exist := dc.processQueueTable.Load(_mq)
if exist {
- pq := v.(*ProcessQueue)
+ pq := v.(*processQueue)
pq.locked = true
pq.lastLockTime = time.Now()
rlog.Warnf("the message queue: %s locked Failed, Group: %s", mq.String(), dc.consumerGroup)
@@ -527,7 +530,7 @@ func (dc *defaultConsumer) unlockAll(oneway bool) {
_mq := mqs[idx]
v, exist := dc.processQueueTable.Load(_mq)
if exist {
- v.(*ProcessQueue).locked = false
+ v.(*processQueue).locked = false
rlog.Warnf("the message queue: %s locked Failed, Group: %s", _mq.String(), dc.consumerGroup)
}
}
@@ -537,7 +540,7 @@ func (dc *defaultConsumer) unlockAll(oneway bool) {
func (dc *defaultConsumer) doLock(addr string, body *lockBatchRequestBody) []kernel.MessageQueue {
data, _ := json.Marshal(body)
request := remote.NewRemotingCommand(kernel.ReqLockBatchMQ, nil, data)
- response, err := remote.InvokeSync(addr, request, 1*time.Second)
+ response, err := dc.client.InvokeSync(addr, request, 1*time.Second)
if err != nil {
rlog.Errorf("lock mq to broker: %s error %s", addr, err.Error())
return nil
@@ -557,12 +560,12 @@ func (dc *defaultConsumer) doUnlock(addr string, body *lockBatchRequestBody, one
data, _ := json.Marshal(body)
request := remote.NewRemotingCommand(kernel.ReqUnlockBatchMQ, nil, data)
if oneway {
- err := remote.InvokeOneWay(addr, request, 3*time.Second)
+ err := dc.client.InvokeOneWay(addr, request, 3*time.Second)
if err != nil {
rlog.Errorf("lock mq to broker with oneway: %s error %s", addr, err.Error())
}
} else {
- response, err := remote.InvokeSync(addr, request, 1*time.Second)
+ response, err := dc.client.InvokeSync(addr, request, 1*time.Second)
if err != nil {
rlog.Errorf("lock mq to broker: %s error %s", addr, err.Error())
}
@@ -599,12 +602,13 @@ func (dc *defaultConsumer) updateProcessQueueTable(topic string, mqs []*kernel.M
// TODO
dc.processQueueTable.Range(func(key, value interface{}) bool {
mq := key.(*kernel.MessageQueue)
- pq := value.(*ProcessQueue)
+ pq := value.(*processQueue)
if mq.Topic == topic {
if !mqSet[mq] {
pq.dropped = true
if dc.removeUnnecessaryMessageQueue(mq, pq) {
- delete(mqSet, mq)
+ //delete(mqSet, mq)
+ dc.processQueueTable.Delete(key)
changed = true
rlog.Infof("do defaultConsumer, Group:%s, remove unnecessary mq: %s", dc.consumerGroup, mq.String())
}
@@ -640,7 +644,7 @@ func (dc *defaultConsumer) updateProcessQueueTable(topic string, mqs []*kernel.M
rlog.Debugf("do defaultConsumer, Group: %s, mq already exist, %s", dc.consumerGroup, mq.String())
} else {
rlog.Infof("do defaultConsumer, Group: %s, add a new mq, %s", dc.consumerGroup, mq.String())
- pq := &ProcessQueue{}
+ pq := newProcessQueue()
dc.processQueueTable.Store(mq, pq)
pr := PullRequest{
consumerGroup: dc.consumerGroup,
@@ -660,12 +664,9 @@ func (dc *defaultConsumer) updateProcessQueueTable(topic string, mqs []*kernel.M
return changed
}
-func (dc *defaultConsumer) removeUnnecessaryMessageQueue(mq *kernel.MessageQueue, pq *ProcessQueue) bool {
+func (dc *defaultConsumer) removeUnnecessaryMessageQueue(mq *kernel.MessageQueue, pq *processQueue) bool {
dc.storage.persist([]*kernel.MessageQueue{mq})
dc.storage.remove(mq)
- if dc.cType == _PushConsume && dc.consumeOrderly && Clustering == dc.model {
- // TODO
- }
return true
}
@@ -684,7 +685,7 @@ func (dc *defaultConsumer) computePullFromWhere(mq *kernel.MessageQueue) int64 {
if strings.HasPrefix(mq.Topic, kernel.RetryGroupTopicPrefix) {
lastOffset = 0
} else {
- lastOffset, err := kernel.QueryMaxOffset(mq)
+ lastOffset, err := dc.queryMaxOffset(mq)
if err == nil {
result = lastOffset
} else {
@@ -701,7 +702,7 @@ func (dc *defaultConsumer) computePullFromWhere(mq *kernel.MessageQueue) int64 {
case ConsumeFromTimestamp:
if lastOffset == -1 {
if strings.HasPrefix(mq.Topic, kernel.RetryGroupTopicPrefix) {
- lastOffset, err := kernel.QueryMaxOffset(mq)
+ lastOffset, err := dc.queryMaxOffset(mq)
if err == nil {
result = lastOffset
} else {
@@ -713,7 +714,7 @@ func (dc *defaultConsumer) computePullFromWhere(mq *kernel.MessageQueue) int64 {
if err != nil {
result = -1
} else {
- lastOffset, err := kernel.SearchOffsetByTimestamp(mq, t.Unix())
+ lastOffset, err := dc.searchOffsetByTimestamp(mq, t.Unix())
if err != nil {
result = -1
} else {
@@ -741,7 +742,7 @@ func (dc *defaultConsumer) findConsumerList(topic string) []string {
ConsumerGroup: dc.consumerGroup,
}
cmd := remote.NewRemotingCommand(kernel.ReqGetConsumerListByGroup, req, nil)
- res, err := remote.InvokeSync(brokerAddr, cmd, 3*time.Second) // TODO 超时机制有问题
+ res, err := dc.client.InvokeSync(brokerAddr, cmd, 3*time.Second) // TODO 超时机制有问题
if err != nil {
rlog.Errorf("get consumer list of [%s] from %s error: %s", dc.consumerGroup, brokerAddr, err.Error())
return nil
@@ -757,6 +758,61 @@ func (dc *defaultConsumer) findConsumerList(topic string) []string {
return nil
}
+func (dc *defaultConsumer) sendBack(msg *kernel.MessageExt, level int) error {
+ return nil
+}
+
+// QueryMaxOffset with specific queueId and topic
+func (dc *defaultConsumer) queryMaxOffset(mq *kernel.MessageQueue) (int64, error) {
+ brokerAddr := kernel.FindBrokerAddrByName(mq.BrokerName)
+ if brokerAddr == "" {
+ kernel.UpdateTopicRouteInfo(mq.Topic)
+ brokerAddr = kernel.FindBrokerAddrByName(mq.Topic)
+ }
+ if brokerAddr == "" {
+ return -1, fmt.Errorf("the broker [%s] does not exist", mq.BrokerName)
+ }
+
+ request := &kernel.GetMaxOffsetRequest{
+ Topic: mq.Topic,
+ QueueId: mq.QueueId,
+ }
+
+ cmd := remote.NewRemotingCommand(kernel.ReqGetMaxOffset, request, nil)
+ response, err := dc.client.InvokeSync(brokerAddr, cmd, 3*time.Second)
+ if err != nil {
+ return -1, err
+ }
+
+ return strconv.ParseInt(response.ExtFields["offset"], 10, 64)
+}
+
+// SearchOffsetByTimestamp with specific queueId and topic
+func (dc *defaultConsumer) searchOffsetByTimestamp(mq *kernel.MessageQueue, timestamp int64) (int64, error) {
+ brokerAddr := kernel.FindBrokerAddrByName(mq.BrokerName)
+ if brokerAddr == "" {
+ kernel.UpdateTopicRouteInfo(mq.Topic)
+ brokerAddr = kernel.FindBrokerAddrByName(mq.Topic)
+ }
+ if brokerAddr == "" {
+ return -1, fmt.Errorf("the broker [%s] does not exist", mq.BrokerName)
+ }
+
+ request := &kernel.SearchOffsetRequest{
+ Topic: mq.Topic,
+ QueueId: mq.QueueId,
+ Timestamp: timestamp,
+ }
+
+ cmd := remote.NewRemotingCommand(kernel.ReqSearchOffsetByTimestamp, request, nil)
+ response, err := dc.client.InvokeSync(brokerAddr, cmd, 3*time.Second)
+ if err != nil {
+ return -1, err
+ }
+
+ return strconv.ParseInt(response.ExtFields["offset"], 10, 64)
+}
+
func buildSubscriptionData(topic string, selector MessageSelector) *kernel.SubscriptionData {
subData := &kernel.SubscriptionData{
Topic: topic,
diff --git a/consumer/offset_store.go b/consumer/offset_store.go
index 6b6d719..e368945 100644
--- a/consumer/offset_store.go
+++ b/consumer/offset_store.go
@@ -50,7 +50,6 @@ func init() {
}
type OffsetStore interface {
- load()
persist(mqs []*kernel.MessageQueue)
remove(mq *kernel.MessageQueue)
read(mq *kernel.MessageQueue, t readType) int64
@@ -110,7 +109,7 @@ func (local *localFileOffsetStore) read(mq *kernel.MessageQueue, t readType) int
}
func (local *localFileOffsetStore) update(mq *kernel.MessageQueue, offset int64, increaseOnly bool) {
- rlog.Infof("update offset: %s to %d", mq, offset)
+ rlog.Debugf("update offset: %s to %d", mq, offset)
localOffset, exist := local.OffsetTable[mq.Topic]
if !exist {
localOffset = make(map[int]*queueOffset)
@@ -163,35 +162,33 @@ func (local *localFileOffsetStore) persist(mqs []*kernel.MessageQueue) {
}
func (local *localFileOffsetStore) remove(mq *kernel.MessageQueue) {
- // unsupported
+ // nothing to do
}
type remoteBrokerOffsetStore struct {
group string
OffsetTable map[string]map[int]*queueOffset `json:"OffsetTable"`
+ client *kernel.RMQClient
mutex sync.RWMutex
}
-func NewRemoteOffsetStore(group string) OffsetStore {
+func NewRemoteOffsetStore(group string, client *kernel.RMQClient) OffsetStore {
return &remoteBrokerOffsetStore{
group: group,
+ client: client,
OffsetTable: make(map[string]map[int]*queueOffset),
}
}
-func (remote *remoteBrokerOffsetStore) load() {
- // unsupported
-}
-
-func (remote *remoteBrokerOffsetStore) persist(mqs []*kernel.MessageQueue) {
- remote.mutex.Lock()
- defer remote.mutex.Unlock()
+func (r *remoteBrokerOffsetStore) persist(mqs []*kernel.MessageQueue) {
+ r.mutex.Lock()
+ defer r.mutex.Unlock()
if len(mqs) == 0 {
return
}
for idx := range mqs {
mq := mqs[idx]
- offsets, exist := remote.OffsetTable[mq.Topic]
+ offsets, exist := r.OffsetTable[mq.Topic]
if !exist {
continue
}
@@ -200,23 +197,23 @@ func (remote *remoteBrokerOffsetStore) persist(mqs []*kernel.MessageQueue) {
continue
}
- err := updateConsumeOffsetToBroker(remote.group, mq.Topic, off)
+ err := r.updateConsumeOffsetToBroker(r.group, mq.Topic, off)
if err != nil {
rlog.Warnf("update offset to broker error: %s, group: %s, queue: %s, offset: %d",
- err.Error(), remote.group, mq.String(), off.Offset)
+ err.Error(), r.group, mq.String(), off.Offset)
} else {
- rlog.Infof("update offset to broker success, group: %s, topic: %s, queue: %v", remote.group, mq.Topic, off)
+ rlog.Debugf("update offset to broker success, group: %s, topic: %s, queue: %v", r.group, mq.Topic, off)
}
}
}
-func (remote *remoteBrokerOffsetStore) remove(mq *kernel.MessageQueue) {
- remote.mutex.Lock()
- defer remote.mutex.Unlock()
+func (r *remoteBrokerOffsetStore) remove(mq *kernel.MessageQueue) {
+ r.mutex.Lock()
+ defer r.mutex.Unlock()
if mq == nil {
return
}
- offset, exist := remote.OffsetTable[mq.Topic]
+ offset, exist := r.OffsetTable[mq.Topic]
if !exist {
return
}
@@ -224,38 +221,38 @@ func (remote *remoteBrokerOffsetStore) remove(mq *kernel.MessageQueue) {
delete(offset, mq.QueueId)
}
-func (remote *remoteBrokerOffsetStore) read(mq *kernel.MessageQueue, t readType) int64 {
- remote.mutex.RLock()
+func (r *remoteBrokerOffsetStore) read(mq *kernel.MessageQueue, t readType) int64 {
+ r.mutex.RLock()
if t == _ReadFromMemory || t == _ReadMemoryThenStore {
- off := readFromMemory(remote.OffsetTable, mq)
+ off := readFromMemory(r.OffsetTable, mq)
if off >= 0 || (off == -1 && t == _ReadFromMemory) {
- remote.mutex.RUnlock()
+ r.mutex.RUnlock()
return off
}
}
- off, err := fetchConsumeOffsetFromBroker(remote.group, mq)
+ off, err := r.fetchConsumeOffsetFromBroker(r.group, mq)
if err != nil {
rlog.Errorf("fetch offset of %s error: %s", mq.String(), err.Error())
- remote.mutex.RUnlock()
+ r.mutex.RUnlock()
return -1
}
- remote.mutex.RUnlock()
- remote.update(mq, off, true)
+ r.mutex.RUnlock()
+ r.update(mq, off, true)
return off
}
-func (remote *remoteBrokerOffsetStore) update(mq *kernel.MessageQueue, offset int64, increaseOnly bool) {
- rlog.Infof("update offset: %s to %d", mq, offset)
- remote.mutex.Lock()
- defer remote.mutex.Unlock()
- localOffset, exist := remote.OffsetTable[mq.Topic]
+func (r *remoteBrokerOffsetStore) update(mq *kernel.MessageQueue, offset int64, increaseOnly bool) {
+ rlog.Debugf("update offset: %s to %d", mq, offset)
+ r.mutex.Lock()
+ defer r.mutex.Unlock()
+ localOffset, exist := r.OffsetTable[mq.Topic]
if !exist {
localOffset = make(map[int]*queueOffset)
- remote.OffsetTable[mq.Topic] = localOffset
+ r.OffsetTable[mq.Topic] = localOffset
}
q, exist := localOffset[mq.QueueId]
if !exist {
- rlog.Infof("new queueOffset: %d, off: %d", mq.QueueId, offset)
+ rlog.Infof("add a new queue: %s, off: %d", mq.String(), offset)
q = &queueOffset{
QueueID: mq.QueueId,
Broker: mq.BrokerName,
@@ -271,20 +268,7 @@ func (remote *remoteBrokerOffsetStore) update(mq *kernel.MessageQueue, offset in
}
}
-func readFromMemory(table map[string]map[int]*queueOffset, mq *kernel.MessageQueue) int64 {
- localOffset, exist := table[mq.Topic]
- if !exist {
- return -1
- }
- off, exist := localOffset[mq.QueueId]
- if !exist {
- return -1
- }
-
- return off.Offset
-}
-
-func fetchConsumeOffsetFromBroker(group string, mq *kernel.MessageQueue) (int64, error) {
+func (r *remoteBrokerOffsetStore) fetchConsumeOffsetFromBroker(group string, mq *kernel.MessageQueue) (int64, error) {
broker := kernel.FindBrokerAddrByName(mq.BrokerName)
if broker == "" {
kernel.UpdateTopicRouteInfo(mq.Topic)
@@ -299,7 +283,7 @@ func fetchConsumeOffsetFromBroker(group string, mq *kernel.MessageQueue) (int64,
QueueId: mq.QueueId,
}
cmd := remote.NewRemotingCommand(kernel.ReqQueryConsumerOffset, queryOffsetRequest, nil)
- res, err := remote.InvokeSync(broker, cmd, 3*time.Second)
+ res, err := r.client.InvokeSync(broker, cmd, 3*time.Second)
if err != nil {
return -1, err
}
@@ -316,7 +300,7 @@ func fetchConsumeOffsetFromBroker(group string, mq *kernel.MessageQueue) (int64,
return off, nil
}
-func updateConsumeOffsetToBroker(group, topic string, queue *queueOffset) error {
+func (r *remoteBrokerOffsetStore) updateConsumeOffsetToBroker(group, topic string, queue *queueOffset) error {
broker := kernel.FindBrokerAddrByName(queue.Broker)
if broker == "" {
kernel.UpdateTopicRouteInfo(topic)
@@ -333,5 +317,18 @@ func updateConsumeOffsetToBroker(group, topic string, queue *queueOffset) error
CommitOffset: queue.Offset,
}
cmd := remote.NewRemotingCommand(kernel.ReqUpdateConsumerOffset, updateOffsetRequest, nil)
- return remote.InvokeOneWay(broker, cmd, 5*time.Second)
+ return r.client.InvokeOneWay(broker, cmd, 5*time.Second)
+}
+
+func readFromMemory(table map[string]map[int]*queueOffset, mq *kernel.MessageQueue) int64 {
+ localOffset, exist := table[mq.Topic]
+ if !exist {
+ return -1
+ }
+ off, exist := localOffset[mq.QueueId]
+ if !exist {
+ return -1
+ }
+
+ return off.Offset
}
diff --git a/consumer/process_queue.go b/consumer/process_queue.go
index f4367f9..77e84fa 100644
--- a/consumer/process_queue.go
+++ b/consumer/process_queue.go
@@ -18,9 +18,13 @@ limitations under the License.
package consumer
import (
- "container/list"
"github.com/apache/rocketmq-client-go/kernel"
+ "github.com/apache/rocketmq-client-go/rlog"
+ "github.com/emirpasic/gods/maps/treemap"
+ "github.com/emirpasic/gods/utils"
+ "strconv"
"sync"
+ "sync/atomic"
"time"
)
@@ -30,10 +34,10 @@ const (
_PullMaxIdleTime = 120 * time.Second
)
-type ProcessQueue struct {
+type processQueue struct {
+ msgCache *treemap.Map
mutex sync.RWMutex
- msgCache list.List // sorted
- cachedMsgCount int
+ cachedMsgCount int64
cachedMsgSize int64
consumeLock sync.Mutex
consumingMsgOrderlyTreeMap sync.Map
@@ -46,61 +50,192 @@ type ProcessQueue struct {
lastLockTime time.Time
consuming bool
msgAccCnt int64
- once sync.Once
+ lockConsume sync.Mutex
+ msgCh chan []*kernel.MessageExt
}
-func (pq *ProcessQueue) isPullExpired() bool {
- return false
-}
-
-func (pq *ProcessQueue) getMaxSpan() int {
- return pq.msgCache.Len()
+func newProcessQueue() *processQueue {
+ pq := &processQueue{
+ msgCache: treemap.NewWith(utils.Int64Comparator),
+ lastPullTime: time.Now(),
+ lastConsumeTime: time.Now(),
+ lastLockTime: time.Now(),
+ msgCh: make(chan []*kernel.MessageExt, 32),
+ }
+ return pq
}
-func (pq *ProcessQueue) putMessage(messages []*kernel.MessageExt) {
- pq.once.Do(func() {
- pq.msgCache.Init()
- })
- localList := list.New()
- for idx := range messages {
- localList.PushBack(messages[idx])
- pq.queueOffsetMax = messages[idx].QueueOffset
+func (pq *processQueue) putMessage(messages ...*kernel.MessageExt) {
+ if messages == nil || len(messages) == 0 {
+ return
}
pq.mutex.Lock()
- pq.msgCache.PushBackList(localList)
+ pq.msgCh <- messages // 放锁外面会挂
+ validMessageCount := 0
+ for idx := range messages {
+ msg := messages[idx]
+ _, found := pq.msgCache.Get(msg.QueueOffset)
+ if found {
+ continue
+ }
+ pq.msgCache.Put(msg.QueueOffset, msg)
+ validMessageCount++
+ pq.queueOffsetMax = msg.QueueOffset
+ atomic.AddInt64(&pq.cachedMsgSize, int64(len(msg.Body)))
+ }
pq.mutex.Unlock()
+
+ atomic.AddInt64(&pq.cachedMsgCount, int64(validMessageCount))
+
+ if pq.msgCache.Size() > 0 && !pq.consuming {
+ pq.consuming = true
+ }
+
+ msg := messages[len(messages)-1]
+ maxOffset, err := strconv.ParseInt(msg.Properties[kernel.PropertyMaxOffset], 10, 64)
+ if err != nil {
+ acc := maxOffset - msg.QueueOffset
+ if acc > 0 {
+ pq.msgAccCnt = acc
+ }
+ }
}
-func (pq *ProcessQueue) removeMessage(number int) int64 {
- result := pq.queueOffsetMax + 1
+func (pq *processQueue) removeMessage(messages ...*kernel.MessageExt) int64 {
+ result := int64(-1)
pq.mutex.Lock()
- for i := 0; i < number && pq.msgCache.Len() > 0; i++ {
- head := pq.msgCache.Front()
- pq.msgCache.Remove(head)
- result = head.Value.(*kernel.MessageExt).QueueOffset
+ pq.lastConsumeTime = time.Now()
+ if !pq.msgCache.Empty() {
+ result = pq.queueOffsetMax + 1
+ removedCount := 0
+ for idx := range messages {
+ msg := messages[idx]
+ _, found := pq.msgCache.Get(msg.QueueOffset)
+ if !found {
+ continue
+ }
+ pq.msgCache.Remove(msg.QueueOffset)
+ removedCount++
+ atomic.AddInt64(&pq.cachedMsgSize, int64(-len(msg.Body)))
+ }
+ atomic.AddInt64(&pq.cachedMsgCount, int64(-removedCount))
}
- pq.mutex.Unlock()
- if pq.msgCache.Len() > 0 {
- result = pq.msgCache.Front().Value.(*kernel.MessageExt).QueueOffset
+ if !pq.msgCache.Empty() {
+ first, _ := pq.msgCache.Min()
+ result = first.(int64)
}
+ pq.mutex.Unlock()
return result
}
-func (pq *ProcessQueue) takeMessages(number int) []*kernel.MessageExt {
- for pq.msgCache.Len() == 0 {
+func (pq *processQueue) isLockExpired() bool {
+ return time.Now().Sub(pq.lastLockTime) > _RebalanceLockMaxTime
+}
+
+func (pq *processQueue) isPullExpired() bool {
+ return time.Now().Sub(pq.lastPullTime) > _PullMaxIdleTime
+}
+
+func (pq *processQueue) cleanExpiredMsg(consumer defaultConsumer) {
+ if consumer.option.ConsumeOrderly {
+ return
+ }
+ var loop = 16
+ if pq.msgCache.Size() < 16 {
+ loop = pq.msgCache.Size()
+ }
+
+ for i := 0; i < loop; i++ {
+ pq.mutex.RLock()
+ if pq.msgCache.Empty() {
+ pq.mutex.RLock()
+ return
+ }
+ _, firstValue := pq.msgCache.Min()
+ msg := firstValue.(*kernel.MessageExt)
+ startTime := msg.Properties[kernel.PropertyConsumeStartTime]
+ if startTime != "" {
+ st, err := strconv.ParseInt(startTime, 10, 64)
+ if err != nil {
+ rlog.Warnf("parse message start consume time error: %s, origin str is: %s", startTime)
+ continue
+ }
+ if time.Now().Unix()-st <= int64(consumer.option.ConsumeTimeout) {
+ pq.mutex.RLock()
+ return
+ }
+ }
+ pq.mutex.RLock()
+
+ err := consumer.sendBack(msg, 3)
+ if err != nil {
+ rlog.Errorf("send message back to broker error: %s when clean expired messages", err.Error())
+ continue
+ }
+ pq.removeMessage(msg)
+ }
+}
+
+func (pq *processQueue) getMaxSpan() int {
+ pq.mutex.RLock()
+ defer pq.mutex.RUnlock()
+ if pq.msgCache.Size() == 0 {
+ return 0
+ }
+ firstKey, _ := pq.msgCache.Min()
+ lastKey, _ := pq.msgCache.Max()
+ return int(lastKey.(int64) - firstKey.(int64))
+}
+
+func (pq *processQueue) getMessages() []*kernel.MessageExt {
+ return <-pq.msgCh
+}
+
+func (pq *processQueue) takeMessages(number int) []*kernel.MessageExt {
+ for pq.msgCache.Empty() {
time.Sleep(10 * time.Millisecond)
}
result := make([]*kernel.MessageExt, number)
i := 0
pq.mutex.Lock()
for ; i < number; i++ {
- e := pq.msgCache.Front()
- if e == nil {
+ k, v := pq.msgCache.Min()
+ if v == nil {
break
}
- result[i] = e.Value.(*kernel.MessageExt)
- pq.msgCache.Remove(e)
+ result[i] = v.(*kernel.MessageExt)
+ pq.msgCache.Remove(k)
}
pq.mutex.Unlock()
return result[:i]
}
+
+func (pq *processQueue) Min() int64 {
+ if pq.msgCache.Empty() {
+ return -1
+ }
+ k, _ := pq.msgCache.Min()
+ if k != nil {
+ return k.(int64)
+ }
+ return -1
+}
+
+func (pq *processQueue) Max() int64 {
+ if pq.msgCache.Empty() {
+ return -1
+ }
+ k, _ := pq.msgCache.Max()
+ if k != nil {
+ return k.(int64)
+ }
+ return -1
+}
+
+func (pq *processQueue) clear() {
+ pq.mutex.Lock()
+ pq.msgCache.Clear()
+ pq.cachedMsgCount = 0
+ pq.cachedMsgSize = 0
+ pq.queueOffsetMax = 0
+}
diff --git a/consumer/push_consumer.go b/consumer/push_consumer.go
index fc78e3e..2039be1 100644
--- a/consumer/push_consumer.go
+++ b/consumer/push_consumer.go
@@ -22,6 +22,7 @@ import (
"errors"
"github.com/apache/rocketmq-client-go/kernel"
"github.com/apache/rocketmq-client-go/rlog"
+ "github.com/apache/rocketmq-client-go/utils"
"math"
"strconv"
"time"
@@ -55,11 +56,13 @@ type pushConsumer struct {
queueFlowControlTimes int
queueMaxSpanFlowControlTimes int
consume func(*ConsumeMessageContext, []*kernel.MessageExt) (ConsumeResult, error)
- submitToConsume func(*ProcessQueue, *kernel.MessageQueue)
+ submitToConsume func(*processQueue, *kernel.MessageQueue)
subscribedTopic map[string]string
}
func NewPushConsumer(consumerGroup string, opt ConsumerOption) PushConsumer {
+ opt.InstanceName = "DEFAULT"
+ opt.ClientIP = utils.LocalIP()
dc := &defaultConsumer{
consumerGroup: consumerGroup,
cType: _PushConsume,
@@ -119,11 +122,10 @@ func (pc *pushConsumer) Start() error {
pc.client = kernel.GetOrNewRocketMQClient(pc.option.ClientOption)
if pc.model == Clustering {
pc.option.ChangeInstanceNameToPID()
- pc.storage = NewRemoteOffsetStore(pc.consumerGroup)
+ pc.storage = NewRemoteOffsetStore(pc.consumerGroup, pc.client)
} else {
pc.storage = NewLocalFileOffsetStore(pc.consumerGroup, pc.client.ClientID())
}
- pc.storage.load()
go func() {
// todo start clean msg expired
// TODO quit
@@ -263,7 +265,7 @@ func (pc *pushConsumer) validate() {
if pc.option.PullBatchSize < 1 || pc.option.PullBatchSize > 1024 {
if pc.option.PullBatchSize == 0 {
- pc.option.PullBatchSize = 1
+ pc.option.PullBatchSize = 32
} else {
rlog.Fatal("option.PullBatchSize out of range [1, 1024]")
}
@@ -293,7 +295,6 @@ func (pc *pushConsumer) pullMessage(request *PullRequest) {
sleepTime = pc.option.PullInterval
pq.lastPullTime = time.Now()
err := pc.makeSureStateOK()
- rlog.Debugf("pull MessageQueue: %d", request.mq.QueueId)
if err != nil {
rlog.Warnf("consumer state error: %s", err.Error())
sleepTime = _PullDelayTimeWhenError
@@ -312,8 +313,7 @@ func (pc *pushConsumer) pullMessage(request *PullRequest) {
if pc.queueFlowControlTimes%1000 == 0 {
rlog.Warnf("the cached message count exceeds the threshold %d, so do flow control, "+
"minOffset=%d, maxOffset=%d, count=%d, size=%d MiB, pullRequest=%s, flowControlTimes=%d",
- pc.option.PullThresholdForQueue, 0, pq.msgCache.Front().Value.(int64),
- pq.msgCache.Back().Value.(int64),
+ pc.option.PullThresholdForQueue, 0, pq.Min(), pq.Max(),
pq.msgCache, cachedMessageSizeInMiB, request.String(), pc.queueFlowControlTimes)
}
pc.queueFlowControlTimes++
@@ -325,8 +325,7 @@ func (pc *pushConsumer) pullMessage(request *PullRequest) {
if pc.queueFlowControlTimes%1000 == 0 {
rlog.Warnf("the cached message size exceeds the threshold %d MiB, so do flow control, "+
"minOffset=%d, maxOffset=%d, count=%d, size=%d MiB, pullRequest=%s, flowControlTimes=%d",
- pc.option.PullThresholdSizeForQueue, 0, //processQueue.getMsgTreeMap().firstKey(),
- 0, // TODO processQueue.getMsgTreeMap().lastKey(),
+ pc.option.PullThresholdSizeForQueue, pq.Min(), pq.Max(),
pq.msgCache, cachedMessageSizeInMiB, request.String(), pc.queueFlowControlTimes)
}
pc.queueFlowControlTimes++
@@ -338,12 +337,9 @@ func (pc *pushConsumer) pullMessage(request *PullRequest) {
if pq.getMaxSpan() > pc.option.ConsumeConcurrentlyMaxSpan {
if pc.queueMaxSpanFlowControlTimes%1000 == 0 {
- rlog.Warnf("the queue's messages, span too long, so do flow control, minOffset=%d, "+
- "maxOffset=%d, maxSpan=%d, pullRequest=%s, flowControlTimes=%d",
- 0, //processQueue.getMsgTreeMap().firstKey(),
- 0, // processQueue.getMsgTreeMap().lastKey(),
- pq.getMaxSpan(),
- request.String(), pc.queueMaxSpanFlowControlTimes)
+ rlog.Warnf("the queue's messages, span too long, limit=%d, so do flow control, minOffset=%d, "+
+ "maxOffset=%d, maxSpan=%d, pullRequest=%s, flowControlTimes=%d", pc.option.ConsumeConcurrentlyMaxSpan,
+ pq.Min(), pq.Max(), pq.getMaxSpan(), request.String(), pc.queueMaxSpanFlowControlTimes)
}
sleepTime = _PullDelayTimeWhenFlowControl
goto NEXT
@@ -452,14 +448,14 @@ func (pc *pushConsumer) pullMessage(request *PullRequest) {
if msgFounded != nil && len(msgFounded) != 0 {
firstMsgOffset = msgFounded[0].QueueOffset
increasePullTPS(pc.consumerGroup, request.mq.Topic, len(msgFounded))
- pq.putMessage(msgFounded)
+ pq.putMessage(msgFounded...)
}
if result.NextBeginOffset < prevRequestOffset || firstMsgOffset < prevRequestOffset {
rlog.Warnf("[BUG] pull message result maybe data wrong, [nextBeginOffset=%s, "+
"firstMsgOffset=%d, prevRequestOffset=%d]", result.NextBeginOffset, firstMsgOffset, prevRequestOffset)
}
case kernel.PullNoNewMsg:
- rlog.Infof("Topic: %s, QueueId: %d, no more msg", request.mq.Topic, request.mq.QueueId)
+ rlog.Debugf("Topic: %s, QueueId: %d no more msg, next offset: %d", request.mq.Topic, request.mq.QueueId, result.NextBeginOffset)
case kernel.PullNoMsgMatched:
request.nextOffset = result.NextBeginOffset
pc.correctTagsOffset(request)
@@ -489,6 +485,87 @@ func (pc *pushConsumer) sendMessageBack(ctx *ConsumeMessageContext, msg *kernel.
return true
}
+func (pc *pushConsumer) suspend() {
+ pc.pause = true
+ rlog.Infof("suspend consumer: %s", pc.consumerGroup)
+}
+
+func (pc *pushConsumer) resume() {
+ pc.pause = false
+ pc.doBalance()
+ rlog.Infof("resume consumer: %s", pc.consumerGroup)
+}
+
+func (pc *pushConsumer) resetOffset(topic string, table map[kernel.MessageQueue]int64) {
+ //topic := cmd.ExtFields["topic"]
+ //group := cmd.ExtFields["group"]
+ //if topic == "" || group == "" {
+ // rlog.Warnf("received reset offset command from: %s, but missing params.", from)
+ // return
+ //}
+ //t, err := strconv.ParseInt(cmd.ExtFields["timestamp"], 10, 64)
+ //if err != nil {
+ // rlog.Warnf("received reset offset command from: %s, but parse time error: %s", err.Error())
+ // return
+ //}
+ //rlog.Infof("invoke reset offset operation from broker. brokerAddr=%s, topic=%s, group=%s, timestamp=%v",
+ // from, topic, group, t)
+ //
+ //offsetTable := make(map[kernel.MessageQueue]int64, 0)
+ //err = json.Unmarshal(cmd.Body, &offsetTable)
+ //if err != nil {
+ // rlog.Warnf("received reset offset command from: %s, but parse offset table: %s", err.Error())
+ // return
+ //}
+ //v, exist := c.consumerMap.Load(group)
+ //if !exist {
+ // rlog.Infof("[reset-offset] consumer dose not exist. group=%s", group)
+ // return
+ //}
+
+ set := make(map[int]*kernel.MessageQueue, 0)
+ for k := range table {
+ set[k.HashCode()] = &k
+ }
+ pc.processQueueTable.Range(func(key, value interface{}) bool {
+ mqHash := value.(int)
+ pq := value.(*processQueue)
+ if set[mqHash] != nil {
+ pq.dropped = true
+ pq.clear()
+ }
+ return true
+ })
+ time.Sleep(10 * time.Second)
+ v, exist := pc.topicSubscribeInfoTable.Load(topic)
+ if !exist {
+ return
+ }
+ queuesOfTopic := v.(map[int]*kernel.MessageQueue)
+ for k := range queuesOfTopic {
+ q := set[k]
+ if q != nil {
+ pc.storage.update(q, table[*q], false)
+ v, exist := pc.processQueueTable.Load(k)
+ if !exist {
+ continue
+ }
+ pq := v.(*processQueue)
+ pc.removeUnnecessaryMessageQueue(q, pq)
+ delete(queuesOfTopic, k)
+ }
+ }
+}
+
+func (pc *pushConsumer) removeUnnecessaryMessageQueue(mq *kernel.MessageQueue, pq *processQueue) bool {
+ pc.defaultConsumer.removeUnnecessaryMessageQueue(mq, pq)
+ if !pc.consumeOrderly || Clustering != pc.model {
+ return true
+ }
+ // TODO orderly
+ return true
+}
+
type ConsumeMessageContext struct {
consumerGroup string
msgs []*kernel.MessageExt
@@ -499,8 +576,8 @@ type ConsumeMessageContext struct {
properties map[string]string
}
-func (pc *pushConsumer) consumeMessageCurrently(pq *ProcessQueue, mq *kernel.MessageQueue) {
- msgs := pq.takeMessages(32)
+func (pc *pushConsumer) consumeMessageCurrently(pq *processQueue, mq *kernel.MessageQueue) {
+ msgs := pq.getMessages()
if msgs == nil {
return
}
@@ -573,7 +650,7 @@ func (pc *pushConsumer) consumeMessageCurrently(pq *ProcessQueue, mq *kernel.Mes
}
}
- offset := pq.removeMessage(len(subMsgs))
+ offset := pq.removeMessage(subMsgs...)
if offset >= 0 && !pq.dropped {
pc.storage.update(mq, int64(offset), true)
@@ -591,5 +668,5 @@ func (pc *pushConsumer) consumeMessageCurrently(pq *ProcessQueue, mq *kernel.Mes
}
}
-func (pc *pushConsumer) consumeMessageOrderly(pq *ProcessQueue, mq *kernel.MessageQueue) {
+func (pc *pushConsumer) consumeMessageOrderly(pq *processQueue, mq *kernel.MessageQueue) {
}
diff --git a/core/api.go b/core/api.go
index 8a6fdd3..6969d1f 100644
--- a/core/api.go
+++ b/core/api.go
@@ -48,12 +48,12 @@ func (config *ClientConfig) String() string {
return str
}
-// NewProducer create a new producer with config
+// NewProducer creates a new producer with config
func NewProducer(config *ProducerConfig) (Producer, error) {
return newDefaultProducer(config)
}
-// ProducerConfig define a producer
+// ProducerConfig defines a producer
type ProducerConfig struct {
ClientConfig
SendMsgTimeout int
diff --git a/core/error.go b/core/error.go
index 6be7883..5f708ee 100644
--- a/core/error.go
+++ b/core/error.go
@@ -46,7 +46,7 @@ func (e rmqError) Error() string {
case ErrMallocFailed:
return "malloc memory failed"
case ErrProducerStartFailed:
- return "start producer failed"
+ return "start consumer failed"
case ErrSendSyncFailed:
return "send message with sync failed"
case ErrSendOrderlyFailed:
diff --git a/core/producer.go b/core/producer.go
index 22b49ed..e75e0ff 100644
--- a/core/producer.go
+++ b/core/producer.go
@@ -178,7 +178,7 @@ func (p *defaultProducer) String() string {
return p.config.String()
}
-// Start the producer.
+// Start starts the producer.
func (p *defaultProducer) Start() error {
err := rmqError(C.StartProducer(p.cproduer))
if err != NIL {
@@ -187,7 +187,7 @@ func (p *defaultProducer) Start() error {
return nil
}
-// Shutdown the producer.
+// Shutdown shuts down the producer.
func (p *defaultProducer) Shutdown() error {
err := rmqError(C.ShutdownProducer(p.cproduer))
diff --git a/docs/feature.md b/docs/feature.md
index bc23be2..9caa789 100644
--- a/docs/feature.md
+++ b/docs/feature.md
@@ -1,92 +1,72 @@
# Feature
## Producer
-- [ ] ProducerType
- - [ ] DefaultProducer
- - [ ] TransactionProducer
-- [ ] API
- - [ ] Send
- - [ ] Sync
- - [ ] Async
- - [ ] OneWay
-- [ ] Other
- - [ ] DelayMessage
- - [ ] Config
- - [ ] MessageId Generate
- - [ ] CompressMsg
- - [ ] TimeOut
- - [ ] LoadBalance
- - [ ] DefaultTopic
- - [ ] VipChannel
- - [ ] Retry
- - [ ] SendMessageHook
- - [ ] CheckRequestQueue
- - [ ] CheckForbiddenHookList
- - [ ] MQFaultStrategy
+
+### MessageType
+- [x] NormalMessage
+- [ ] TransactionMessage
+- [ ] DelayMessage
+
+### SendWith
+- [x] Sync
+- [ ] Async
+- [x] OneWay
+
+### Other
+- [ ] Config
+- [ ] MessageId Generate
+- [ ] CompressMsg
+- [ ] LoadBalance
+- [ ] DefaultTopic
+- [ ] VipChannel
+- [ ] Retry
+- [ ] Hook
+- [ ] CheckRequestQueue
+- [ ] MQFaultStrategy
## Consumer
-- [ ] ConsumerType
- - [ ] PushConsumer
- - [ ] PullConsumer
-- [ ] MessageListener
- - [ ] Concurrently
- - [ ] Orderly
-- [ ] MessageModel
- - [ ] CLUSTERING
- - [ ] BROADCASTING
-- [ ] OffsetStore
- - [ ] RemoteBrokerOffsetStore
- - [ ] many actions
- - [ ] LocalFileOffsetStore
-- [ ] RebalanceService
-- [ ] PullMessageService
-- [ ] ConsumeMessageService
-- [ ] AllocateMessageQueueStrategy
- - [ ] AllocateMessageQueueAveragely
- - [ ] AllocateMessageQueueAveragelyByCircle
- - [ ] AllocateMessageQueueByConfig
- - [ ] AllocateMessageQueueByMachineRoom
-- [ ] Other
- - [ ] Config
- - [ ] ZIP
- - [ ] AllocateMessageQueueStrategy
- - [ ] ConsumeFromWhere
- - [ ] CONSUME_FROM_LAST_OFFSET
- - [ ] CONSUME_FROM_FIRST_OFFSET
- - [ ] CONSUME_FROM_TIMESTAMP
- - [ ] Retry(sendMessageBack)
- - [ ] TimeOut(clearExpiredMessage)
- - [ ] ACK(partSuccess)
- - [ ] FlowControl(messageCanNotConsume)
- - [ ] ConsumeMessageHook
- - [ ] filterMessageHookList
-## Manager
-- [ ] Multiple Request API Wrapper
- - many functions...
-- [ ] Task
- - [ ] PollNameServer
- - [ ] Heartbeat
- - [ ] UpdateTopicRouteInfoFromNameServer
- - [ ] CleanOfflineBroker
- - [ ] PersistAllConsumerOffset
- - [ ] ClearExpiredMessage(form consumer consumeMessageService)
-- [ ] Processor
- - [ ] CHECK_TRANSACTION_STATE
- - [ ] NOTIFY_CONSUMER_IDS_CHANGED
- - [ ] RESET_CONSUMER_CLIENT_OFFSET
- - [ ] GET_CONSUMER_STATUS_FROM_CLIENT
- - [ ] GET_CONSUMER_RUNNING_INFO
- - [ ] CONSUME_MESSAGE_DIRECTLY
+### ReceiveType
+- [x] Push
+- [ ] Pull
+
+### ConsumingType
+- [x] Concurrently
+- [ ] Orderly
+
+### MessageModel
+- [x] CLUSTERING
+- [x] BROADCASTING
+
+### AllocateMessageQueueStrategy
+- [x] AllocateMessageQueueAveragely
+- [ ] AllocateMessageQueueAveragelyByCircle
+- [ ] AllocateMessageQueueByConfig
+- [ ] AllocateMessageQueueByMachineRoom
+
+### Other
+- [x] Rebalance
+- [x] Flow Control
+- [ ] compress
+- [x] ConsumeFromWhere
+- [ ] Retry(sendMessageBack)
+- [ ] Hook
+
+## Common
+- [ ] PollNameServer
+- [x] Heartbeat
+- [x] UpdateTopicRouteInfoFromNameServer
+- [ ] CleanOfflineBroker
+- [ ] ClearExpiredMessage(from consumer consumeMessageService)
## Remoting
-- [ ] API
- - [ ] InvokeSync
- - [ ] InvokeAsync
- - [ ] InvokeOneWay
-- [ ] Serialize
- - [ ] JSON
- - [ ] ROCKETMQ
+- [x] API
+ - [x] InvokeSync
+ - [x] InvokeAsync
+ - [x] InvokeOneWay
+- [x] Serialize
+ - [x] JSON
+ - [x] ROCKETMQ
- [ ] Other
- [ ] VIPChannel
- [ ] RPCHook
diff --git a/examples/producer/main.go b/examples/consumer/main.go
similarity index 92%
rename from examples/producer/main.go
rename to examples/consumer/main.go
index e2d0126..2b563ff 100644
--- a/examples/producer/main.go
+++ b/examples/consumer/main.go
@@ -22,6 +22,7 @@ import (
"github.com/apache/rocketmq-client-go/consumer"
"github.com/apache/rocketmq-client-go/kernel"
"os"
+ "sync/atomic"
"time"
)
@@ -30,9 +31,13 @@ func main() {
ConsumerModel: consumer.Clustering,
FromWhere: consumer.ConsumeFromFirstOffset,
})
+ var count int64
err := c.Subscribe("test", consumer.MessageSelector{}, func(ctx *consumer.ConsumeMessageContext,
msgs []*kernel.MessageExt) (consumer.ConsumeResult, error) {
- fmt.Println(msgs)
+ c := atomic.AddInt64(&count, int64(len(msgs)))
+ if c%1000 == 0 {
+ fmt.Println(c)
+ }
return consumer.ConsumeSuccess, nil
})
if err != nil {
diff --git a/go.mod b/go.mod
index b2ca040..419d8ba 100644
--- a/go.mod
+++ b/go.mod
@@ -3,15 +3,12 @@ module github.com/apache/rocketmq-client-go
go 1.11
require (
- github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc // indirect
- github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf // indirect
github.com/emirpasic/gods v1.12.0
github.com/sirupsen/logrus v1.3.0
github.com/stretchr/testify v1.3.0
github.com/tidwall/gjson v1.2.1
github.com/tidwall/match v1.0.1 // indirect
github.com/tidwall/pretty v0.0.0-20180105212114-65a9db5fad51 // indirect
- gopkg.in/alecthomas/kingpin.v2 v2.2.6
)
replace (
diff --git a/go.sum b/go.sum
index da32c5e..61c5e19 100644
--- a/go.sum
+++ b/go.sum
@@ -1,7 +1,3 @@
-github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc h1:cAKDfWh5VpdgMhJosfJnn5/FoN2SRZ4p7fJNX58YPaU=
-github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
-github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf h1:qet1QNfXsQxTZqLG4oE62mJzwPIB8+Tee4RNCL9ulrY=
-github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
@@ -30,5 +26,3 @@ golang.org/x/crypto v0.0.0-20180904163835-0709b304e793 h1:u+LnwYTOOW7Ukr/fppxEb1
golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33 h1:I6FyU15t786LL7oL/hn43zqTuEGr4PN7F4XJ1p4E3Y8=
golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
-gopkg.in/alecthomas/kingpin.v2 v2.2.6 h1:jMFz6MfLP0/4fUyZle81rXUoxOBFi19VUFKVDOQfozc=
-gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw=
diff --git a/kernel/client.go b/kernel/client.go
index e388411..ae258e4 100644
--- a/kernel/client.go
+++ b/kernel/client.go
@@ -47,7 +47,7 @@ const (
_PersistOffset = 5 * time.Second
// Rebalance interval
- _RebalanceInterval = 20 * time.Millisecond
+ _RebalanceInterval = 100 * time.Millisecond
)
var (
@@ -82,7 +82,6 @@ type InnerProducer interface {
IsPublishTopicNeedUpdate(topic string) bool
GetCheckListener() func(msg *MessageExt)
GetTransactionListener() TransactionListener
- //CheckTransactionState()
isUnitMode() bool
}
@@ -103,13 +102,25 @@ type RMQClient struct {
// group -> InnerConsumer
consumerMap sync.Map
once sync.Once
+
+ remoteClient *remote.RemotingClient
}
var clientMap sync.Map
func GetOrNewRocketMQClient(option ClientOption) *RMQClient {
- client := &RMQClient{option: option}
- actual, _ := clientMap.LoadOrStore(client.ClientID(), client)
+ client := &RMQClient{
+ option: option,
+ remoteClient: remote.NewRemotingClient(),
+ }
+ actual, loaded := clientMap.LoadOrStore(client.ClientID(), client)
+ if !loaded {
+ client.remoteClient.RegisterRequestFunc(ReqNotifyConsumerIdsChanged, func(req *remote.RemotingCommand) *remote.RemotingCommand {
+ rlog.Infof("receive broker's notification, the consumer group: %s", req.ExtFields["consumerGroup"])
+ client.RebalanceImmediately()
+ return nil
+ })
+ }
return actual.(*RMQClient)
}
@@ -129,7 +140,13 @@ func (c *RMQClient) Start() {
}()
// TODO cleanOfflineBroker & sendHeartbeatToAllBrokerWithLock
- go func() {}()
+ go func() {
+ for {
+ cleanOfflineBroker()
+ c.SendHeartbeatToAllBrokerWithLock()
+ time.Sleep(_HeartbeatBrokerInterval)
+ }
+ }()
// schedule persist offset
go func() {
@@ -147,7 +164,7 @@ func (c *RMQClient) Start() {
go func() {
for {
c.RebalanceImmediately()
- time.Sleep(time.Second)
+ time.Sleep(_RebalanceInterval)
}
}()
})
@@ -161,9 +178,20 @@ func (c *RMQClient) ClientID() string {
return id
}
+// InvokeSync forwards addr, request and timeoutMillis to the client's
+// RemotingClient.InvokeSync and returns its result unchanged.
+// NOTE(review): despite the name, timeoutMillis is a time.Duration rather
+// than a bare millisecond count.
+func (c *RMQClient) InvokeSync(addr string, request *remote.RemotingCommand,
+ timeoutMillis time.Duration) (*remote.RemotingCommand, error) {
+ return c.remoteClient.InvokeSync(addr, request, timeoutMillis)
+}
+
+// InvokeOneWay forwards addr, request and timeoutMillis to the client's
+// RemotingClient.InvokeOneWay and returns its error unchanged.
+// NOTE(review): despite the name, timeoutMillis is a time.Duration rather
+// than a bare millisecond count.
+func (c *RMQClient) InvokeOneWay(addr string, request *remote.RemotingCommand,
+ timeoutMillis time.Duration) error {
+ return c.remoteClient.InvokeOneWay(addr, request, timeoutMillis)
+}
+
func (c *RMQClient) CheckClientInBroker() {
}
+// TODO
func (c *RMQClient) SendHeartbeatToAllBrokerWithLock() {
hbData := &heartbeatData{
ClientId: c.ClientID(),
@@ -190,7 +218,7 @@ func (c *RMQClient) SendHeartbeatToAllBrokerWithLock() {
hbData.ProducerDatas = pData
hbData.ConsumerDatas = cData
if len(pData) == 0 && len(cData) == 0 {
- rlog.Warn("sending heartbeat, but no consumer and no producer")
+		rlog.Info("sending heartbeat, but no consumer and no producer")
return
}
brokerAddressesMap.Range(func(key, value interface{}) bool {
@@ -198,7 +226,7 @@ func (c *RMQClient) SendHeartbeatToAllBrokerWithLock() {
data := value.(*BrokerData)
for id, addr := range data.BrokerAddresses {
cmd := remote.NewRemotingCommand(ReqHeartBeat, nil, hbData.encode())
- response, err := remote.InvokeSync(addr, cmd, 3*time.Second)
+ response, err := c.remoteClient.InvokeSync(addr, cmd, 3*time.Second)
if err != nil {
rlog.Warnf("send heart beat to broker error: %s", err.Error())
return true
@@ -213,7 +241,7 @@ func (c *RMQClient) SendHeartbeatToAllBrokerWithLock() {
brokerVersionMap.Store(brokerName, m)
}
m[brokerName] = int32(response.Version)
- rlog.Infof("send heart beat to broker[%s %s %s] success", brokerName, id, addr)
+ rlog.Infof("send heart beat to broker[%s %d %s] success", brokerName, id, addr)
}
}
return true
@@ -255,7 +283,7 @@ func (c *RMQClient) UpdateTopicRouteInfo() {
func (c *RMQClient) SendMessageSync(ctx context.Context, brokerAddrs, brokerName string, request *SendMessageRequest,
msgs []*Message) (*SendResult, error) {
cmd := getRemotingCommand(request, msgs)
- response, err := remote.InvokeSync(brokerAddrs, cmd, 3*time.Second)
+ response, err := c.remoteClient.InvokeSync(brokerAddrs, cmd, 3*time.Second)
if err != nil {
rlog.Warnf("send messages with sync error: %v", err)
return nil, err
@@ -281,7 +309,7 @@ func (c *RMQClient) SendMessageAsync(ctx context.Context, brokerAddrs, brokerNam
func (c *RMQClient) SendMessageOneWay(ctx context.Context, brokerAddrs string, request *SendMessageRequest,
msgs []*Message) (*SendResult, error) {
cmd := remote.NewRemotingCommand(ReqSendBatchMessage, request, encodeMessages(msgs))
- err := remote.InvokeOneWay(brokerAddrs, cmd, 3*time.Second)
+ err := c.remoteClient.InvokeOneWay(brokerAddrs, cmd, 3*time.Second)
if err != nil {
rlog.Warnf("send messages with oneway error: %v", err)
}
@@ -337,7 +365,7 @@ func (c *RMQClient) processSendResponse(brokerName string, msgs []*Message, cmd
// PullMessage with sync
func (c *RMQClient) PullMessage(ctx context.Context, brokerAddrs string, request *PullMessageRequest) (*PullResult, error) {
cmd := remote.NewRemotingCommand(ReqPullMessage, request, nil)
- res, err := remote.InvokeSync(brokerAddrs, cmd, 3*time.Second)
+ res, err := c.remoteClient.InvokeSync(brokerAddrs, cmd, 3*time.Second)
if err != nil {
return nil, err
}
@@ -390,67 +418,6 @@ func (c *RMQClient) PullMessageAsync(ctx context.Context, brokerAddrs string, re
return nil
}
-// QueryMaxOffset with specific queueId and topic
-func QueryMaxOffset(mq *MessageQueue) (int64, error) {
- brokerAddr := FindBrokerAddrByName(mq.BrokerName)
- if brokerAddr == "" {
- UpdateTopicRouteInfo(mq.Topic)
- brokerAddr = FindBrokerAddrByName(mq.Topic)
- }
- if brokerAddr == "" {
- return -1, fmt.Errorf("the broker [%s] does not exist", mq.BrokerName)
- }
-
- request := &GetMaxOffsetRequest{
- Topic: mq.Topic,
- QueueId: mq.QueueId,
- }
-
- cmd := remote.NewRemotingCommand(ReqGetMaxOffset, request, nil)
- response, err := remote.InvokeSync(brokerAddr, cmd, 3*time.Second)
- if err != nil {
- return -1, err
- }
-
- return strconv.ParseInt(response.ExtFields["offset"], 10, 64)
-}
-
-// QueryConsumerOffset with specific queueId and topic of consumerGroup
-func (c *RMQClient) QueryConsumerOffset(consumerGroup, mq *MessageQueue) (int64, error) {
- return 0, nil
-}
-
-// SearchOffsetByTimestamp with specific queueId and topic
-func SearchOffsetByTimestamp(mq *MessageQueue, timestamp int64) (int64, error) {
- brokerAddr := FindBrokerAddrByName(mq.BrokerName)
- if brokerAddr == "" {
- UpdateTopicRouteInfo(mq.Topic)
- brokerAddr = FindBrokerAddrByName(mq.Topic)
- }
- if brokerAddr == "" {
- return -1, fmt.Errorf("the broker [%s] does not exist", mq.BrokerName)
- }
-
- request := &SearchOffsetRequest{
- Topic: mq.Topic,
- QueueId: mq.QueueId,
- Timestamp: timestamp,
- }
-
- cmd := remote.NewRemotingCommand(ReqSearchOffsetByTimestamp, request, nil)
- response, err := remote.InvokeSync(brokerAddr, cmd, 3*time.Second)
- if err != nil {
- return -1, err
- }
-
- return strconv.ParseInt(response.ExtFields["offset"], 10, 64)
-}
-
-// UpdateConsumerOffset with specific queueId and topic
-func (c *RMQClient) UpdateConsumerOffset(consumerGroup, topic string, queue int, offset int64) error {
- return nil
-}
-
func (c *RMQClient) RegisterConsumer(group string, consumer InnerConsumer) error {
c.consumerMap.Store(group, consumer)
return nil
diff --git a/kernel/model.go b/kernel/model.go
index d9f4d00..0ddc4c3 100644
--- a/kernel/model.go
+++ b/kernel/model.go
@@ -242,7 +242,7 @@ type FindBrokerResult struct {
}
type (
- // groupName of producer
+ // groupName of the producer
producerData string
consumeType string
diff --git a/kernel/request.go b/kernel/request.go
index 2904242..9f16e44 100644
--- a/kernel/request.go
+++ b/kernel/request.go
@@ -24,18 +24,23 @@ import (
)
const (
- ReqSendMessage = int16(10)
- ReqPullMessage = int16(11)
- ReqQueryConsumerOffset = int16(14)
- ReqUpdateConsumerOffset = int16(15)
- ReqSearchOffsetByTimestamp = int16(30)
- ReqGetMaxOffset = int16(30)
- ReqHeartBeat = int16(34)
- ReqGetConsumerListByGroup = int16(38)
- ReqLockBatchMQ = int16(41)
- ReqUnlockBatchMQ = int16(42)
- ReqGetRouteInfoByTopic = int16(105)
- ReqSendBatchMessage = int16(320)
+	ReqSendMessage              = int16(10)
+	ReqPullMessage              = int16(11)
+	ReqQueryConsumerOffset      = int16(14)
+	ReqUpdateConsumerOffset     = int16(15)
+	ReqSearchOffsetByTimestamp  = int16(29) // was 30, which collided with ReqGetMaxOffset
+	ReqGetMaxOffset             = int16(30)
+	ReqHeartBeat                = int16(34)
+	ReqGetConsumerListByGroup   = int16(38)
+	ReqLockBatchMQ              = int16(41)
+	ReqUnlockBatchMQ            = int16(42)
+	ReqGetRouteInfoByTopic      = int16(105)
+	ReqSendBatchMessage         = int16(320)
+	ReqCheckTransactionState    = int16(39)
+	ReqNotifyConsumerIdsChanged = int16(40)
+	ReqResetConsuemrOffset      = int16(220) // sic: misspelled name kept so existing references still compile
+	ReqGetConsumerRunningInfo   = int16(307)
+	ReqConsumeMessageDirectly   = int16(309)
)
type SendMessageRequest struct {
diff --git a/kernel/route.go b/kernel/route.go
index 6a232cf..a1bfe3a 100644
--- a/kernel/route.go
+++ b/kernel/route.go
@@ -42,6 +42,7 @@ const (
var (
ErrTopicNotExist = errors.New("topic not exist")
+ nameSrvClient = remote.NewRemotingClient()
)
var (
@@ -57,6 +58,40 @@ var (
lockNamesrv sync.Mutex
)
+// cleanOfflineBroker prunes brokerAddressesMap of addresses that no cached
+// topic route refers to any more.
+//
+// For each address of each broker it scans every entry of routeDataMap. An
+// address that appears in no TopicRouteData's BrokerDataList is deleted from
+// its BrokerData, and a broker left without any address is removed from
+// brokerAddressesMap. The whole pass runs while holding lockNamesrv.
+func cleanOfflineBroker() {
+	// TODO optimize
+	lockNamesrv.Lock()
+	// Deferred so the lock is released even if a type assertion below panics.
+	defer lockNamesrv.Unlock()
+	brokerAddressesMap.Range(func(key, value interface{}) bool {
+		brokerName := key.(string)
+		bd := value.(*BrokerData)
+		for id, addr := range bd.BrokerAddresses {
+			inTopicRoute := false
+			routeDataMap.Range(func(_, route interface{}) bool {
+				trd := route.(*TopicRouteData)
+				for idx := range trd.BrokerDataList {
+					for _, routeAddr := range trd.BrokerDataList[idx].BrokerAddresses {
+						if routeAddr == addr {
+							inTopicRoute = true
+							// One hit is enough; stop scanning the routes.
+							return false
+						}
+					}
+				}
+				return true
+			})
+			if inTopicRoute {
+				continue
+			}
+			delete(bd.BrokerAddresses, id)
+			rlog.Infof("the broker: [name=%s, ID=%d, addr=%s,] is offline, remove it", brokerName, id, addr)
+		}
+		if len(bd.BrokerAddresses) == 0 {
+			brokerAddressesMap.Delete(brokerName)
+			rlog.Infof("the broker [name=%s] name's host is offline, remove it", brokerName)
+		}
+		return true
+	})
+}
+
// key is topic, value is TopicPublishInfo
type TopicPublishInfo struct {
OrderTopic bool
@@ -233,7 +268,7 @@ func queryTopicRouteInfoFromServer(topic string) (*TopicRouteData, error) {
Topic: topic,
}
rc := remote.NewRemotingCommand(ReqGetRouteInfoByTopic, request, nil)
- response, err := remote.InvokeSync(getNameServerAddress(), rc, requestTimeout)
+ response, err := nameSrvClient.InvokeSync(getNameServerAddress(), rc, requestTimeout)
if err != nil {
return nil, err
@@ -405,7 +440,25 @@ func (routeData *TopicRouteData) clone() *TopicRouteData {
}
func (routeData *TopicRouteData) equals(data *TopicRouteData) bool {
- return false
+ if len(routeData.BrokerDataList) != len(data.BrokerDataList) {
+ return false
+ }
+ if len(routeData.QueueDataList) != len(data.QueueDataList) {
+ return false
+ }
+
+ for idx := range routeData.BrokerDataList {
+ if !routeData.BrokerDataList[idx].Equals(data.BrokerDataList[idx]) {
+ return false
+ }
+ }
+
+ for idx := range routeData.QueueDataList {
+ if !routeData.QueueDataList[idx].Equals(data.QueueDataList[idx]) {
+ return false
+ }
+ }
+ return true
}
func (routeData *TopicRouteData) String() string {
@@ -422,6 +475,30 @@ type QueueData struct {
TopicSynFlag int `json:"topicSynFlag"`
}
+// Equals reports whether q and qd carry the same BrokerName, ReadQueueNums,
+// WriteQueueNums, Perm and TopicSynFlag.
+func (q *QueueData) Equals(qd *QueueData) bool {
+	return q.BrokerName == qd.BrokerName &&
+		q.ReadQueueNums == qd.ReadQueueNums &&
+		q.WriteQueueNums == qd.WriteQueueNums &&
+		q.Perm == qd.Perm &&
+		q.TopicSynFlag == qd.TopicSynFlag
+}
+
// BrokerData BrokerData
type BrokerData struct {
Cluster string `json:"cluster"`
@@ -429,3 +506,21 @@ type BrokerData struct {
BrokerAddresses map[int64]string `json:"brokerAddrs"`
brokerAddressesLock sync.RWMutex
}
+
+
+// Equals reports whether b and bd have the same Cluster, the same BrokerName
+// and exactly the same set of broker id -> address pairs.
+func (b *BrokerData) Equals(bd *BrokerData) bool {
+	if b.Cluster != bd.Cluster {
+		return false
+	}
+
+	if b.BrokerName != bd.BrokerName {
+		return false
+	}
+
+	// Without the length check, addresses present only in bd would go
+	// unnoticed and a broker that gained an address would compare equal.
+	if len(b.BrokerAddresses) != len(bd.BrokerAddresses) {
+		return false
+	}
+
+	for id, addr := range b.BrokerAddresses {
+		// The presence check distinguishes a missing id from an id mapped to
+		// the empty string.
+		if other, ok := bd.BrokerAddresses[id]; !ok || other != addr {
+			return false
+		}
+	}
+
+	return true
+}
diff --git a/remote/processor.go b/remote/processor.go
deleted file mode 100644
index 1e5d23e..0000000
--- a/remote/processor.go
+++ /dev/null
@@ -1,26 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package remote
-
-type ClientRequestProcessor func(remotingCommand *RemotingCommand) (responseCommand *RemotingCommand)
-
-//CHECK_TRANSACTION_STATE
-//NOTIFY_CONSUMER_IDS_CHANGED
-//RESET_CONSUMER_CLIENT_OFFSET
-//GET_CONSUMER_STATUS_FROM_CLIENT
-//GET_CONSUMER_RUNNING_INFO
-//CONSUME_MESSAGE_DIRECTLY
diff --git a/remote/remote_client.go b/remote/remote_client.go
index bfe483c..93f81ce 100644
--- a/remote/remote_client.go
+++ b/remote/remote_client.go
@@ -85,14 +85,37 @@ func (r *ResponseFuture) waitResponse() (*RemotingCommand, error) {
}
}
-func InvokeSync(addr string, request *RemotingCommand, timeoutMillis time.Duration) (*RemotingCommand, error) {
- conn, err := connect(addr)
+// ClientRequestFunc handles a command that a peer sent to this client as a
+// request. RemotingClient.receiveResponse invokes the function registered for
+// the command's Code; a non-nil return value makes it write a command back on
+// the same connection, while nil means nothing is sent.
+type ClientRequestFunc func(remotingCommand *RemotingCommand) (responseCommand *RemotingCommand)
+
+// TcpOption is an empty placeholder; it has no fields yet.
+// NOTE(review): presumably intended to carry TCP connection settings -- confirm.
+type TcpOption struct {
+ // TODO
+}
+
+// RemotingClient bundles the per-client remoting state that used to live in
+// package-level variables: pending responses, open connections and the
+// handlers for requests initiated by the peer.
+type RemotingClient struct {
+ // responseTable maps a request's Opaque to its pending *ResponseFuture.
+ responseTable sync.Map
+ // connectionTable maps a peer address to its established net.Conn.
+ connectionTable sync.Map
+ option TcpOption
+ // processors maps a request code to the handler registered for it.
+ processors map[int16]ClientRequestFunc
+}
+
+// NewRemotingClient returns a RemotingClient with an empty handler table; all
+// other fields are left at their zero values.
+func NewRemotingClient() *RemotingClient {
+	client := new(RemotingClient)
+	client.processors = map[int16]ClientRequestFunc{}
+	return client
+}
+
+// RegisterRequestFunc installs f as the handler for peer requests carrying
+// the given code, replacing any handler previously registered for that code.
+// NOTE(review): processors is a plain map that receiveResponse goroutines
+// read without a lock, so handlers should presumably be registered before
+// any connection is opened -- confirm.
+func (c *RemotingClient) RegisterRequestFunc(code int16, f ClientRequestFunc) {
+ c.processors[code] = f
+}
+
+func (c *RemotingClient) InvokeSync(addr string, request *RemotingCommand, timeoutMillis time.Duration) (*RemotingCommand, error) {
+ conn, err := c.connect(addr)
if err != nil {
return nil, err
}
resp := NewResponseFuture(request.Opaque, timeoutMillis, nil)
- responseTable.Store(resp.Opaque, resp)
- err = sendRequest(conn, request)
+ c.responseTable.Store(resp.Opaque, resp)
+ err = c.sendRequest(conn, request)
if err != nil {
return nil, err
}
@@ -100,14 +123,14 @@ func InvokeSync(addr string, request *RemotingCommand, timeoutMillis time.Durati
return resp.waitResponse()
}
-func InvokeAsync(addr string, request *RemotingCommand, timeoutMillis time.Duration, callback func(*ResponseFuture)) error {
- conn, err := connect(addr)
+func (c *RemotingClient) InvokeAsync(addr string, request *RemotingCommand, timeoutMillis time.Duration, callback func(*ResponseFuture)) error {
+ conn, err := c.connect(addr)
if err != nil {
return err
}
resp := NewResponseFuture(request.Opaque, timeoutMillis, callback)
- responseTable.Store(resp.Opaque, resp)
- err = sendRequest(conn, request)
+ c.responseTable.Store(resp.Opaque, resp)
+ err = c.sendRequest(conn, request)
if err != nil {
return err
}
@@ -116,26 +139,21 @@ func InvokeAsync(addr string, request *RemotingCommand, timeoutMillis time.Durat
}
-func InvokeOneWay(addr string, request *RemotingCommand, timeout time.Duration) error {
- conn, err := connect(addr)
+func (c *RemotingClient) InvokeOneWay(addr string, request *RemotingCommand, timeout time.Duration) error {
+ conn, err := c.connect(addr)
if err != nil {
return err
}
- return sendRequest(conn, request)
+ return c.sendRequest(conn, request)
}
-var (
- responseTable sync.Map
- connectionTable sync.Map
-)
-
-func ScanResponseTable() {
+func (c *RemotingClient) ScanResponseTable() {
rfs := make([]*ResponseFuture, 0)
- responseTable.Range(func(key, value interface{}) bool {
+ c.responseTable.Range(func(key, value interface{}) bool {
if resp, ok := value.(*ResponseFuture); ok {
if (resp.BeginTimestamp + int64(resp.TimeoutMillis) + 1000) <= time.Now().Unix()*1000 {
rfs = append(rfs, resp)
- responseTable.Delete(key)
+ c.responseTable.Delete(key)
}
}
return true
@@ -146,11 +164,11 @@ func ScanResponseTable() {
}
}
-func connect(addr string) (net.Conn, error) {
+func (c *RemotingClient) connect(addr string) (net.Conn, error) {
//it needs additional locker.
connectionLocker.Lock()
defer connectionLocker.Unlock()
- conn, ok := connectionTable.Load(addr)
+ conn, ok := c.connectionTable.Load(addr)
if ok {
return conn.(net.Conn), nil
}
@@ -158,28 +176,27 @@ func connect(addr string) (net.Conn, error) {
if err != nil {
return nil, err
}
- connectionTable.Store(addr, tcpConn)
- go receiveResponse(tcpConn)
+ c.connectionTable.Store(addr, tcpConn)
+ go c.receiveResponse(tcpConn)
return tcpConn, nil
}
-func receiveResponse(r net.Conn) {
- scanner := createScanner(r)
- for {
- scanner.Scan()
- receivedRemotingCommand, err := decode(scanner.Bytes())
+func (c *RemotingClient) receiveResponse(r net.Conn) {
+ scanner := c.createScanner(r)
+ for scanner.Scan() {
+ cmd, err := decode(scanner.Bytes())
if err != nil {
- closeConnection(r)
- rlog.Error(err.Error())
+ c.closeConnection(r)
+ rlog.Errorf("decode RemotingCommand error: %s", err.Error())
break
}
- if receivedRemotingCommand.isResponseType() {
- resp, exist := responseTable.Load(receivedRemotingCommand.Opaque)
+ if cmd.isResponseType() {
+ resp, exist := c.responseTable.Load(cmd.Opaque)
if exist {
- responseTable.Delete(receivedRemotingCommand.Opaque)
+ c.responseTable.Delete(cmd.Opaque)
responseFuture := resp.(*ResponseFuture)
go func() {
- responseFuture.ResponseCommand = receivedRemotingCommand
+ responseFuture.ResponseCommand = cmd
responseFuture.executeInvokeCallback()
if responseFuture.Done != nil {
responseFuture.Done <- true
@@ -187,12 +204,30 @@ func receiveResponse(r net.Conn) {
}()
}
} else {
- // todo handler request from peer
+			f := c.processors[cmd.Code]
+			if f != nil {
+				// Handle on a separate goroutine: a single goroutine would cause a deadlock.
+				go func() {
+					res := f(cmd)
+					if res != nil {
+						// Reply with the handler's result, not the request that was received.
+						if err := c.sendRequest(r, res); err != nil {
+							rlog.Warnf("send response to broker error: %s, type is: %d", err, res.Code)
+						}
+					}
+				}()
+			} else {
+				rlog.Warnf("receive broker's requests, but no func to handle, code is: %d", cmd.Code)
+			}
}
}
+ if scanner.Err() != nil {
+ rlog.Errorf("net: %s scanner exit, err: %s.", r.RemoteAddr().String(), scanner.Err())
+ } else {
+ rlog.Infof("net: %s scanner exit.", r.RemoteAddr().String())
+ }
}
-func createScanner(r io.Reader) *bufio.Scanner {
+func (c *RemotingClient) createScanner(r io.Reader) *bufio.Scanner {
scanner := bufio.NewScanner(r)
scanner.Split(func(data []byte, atEOF bool) (int, []byte, error) {
defer func() {
@@ -219,27 +254,26 @@ func createScanner(r io.Reader) *bufio.Scanner {
return scanner
}
-func sendRequest(conn net.Conn, request *RemotingCommand) error {
+func (c *RemotingClient) sendRequest(conn net.Conn, request *RemotingCommand) error {
content, err := encode(request)
if err != nil {
return err
}
_, err = conn.Write(content)
if err != nil {
- closeConnection(conn)
+ c.closeConnection(conn)
return err
}
return nil
}
-func closeConnection(toCloseConn net.Conn) {
- connectionTable.Range(func(key, value interface{}) bool {
+func (c *RemotingClient) closeConnection(toCloseConn net.Conn) {
+ c.connectionTable.Range(func(key, value interface{}) bool {
if value == toCloseConn {
- connectionTable.Delete(key)
+ c.connectionTable.Delete(key)
return false
} else {
return true
}
-
})
}
diff --git a/rlog/log.go b/rlog/log.go
index 4f5adaf..d9a1928 100644
--- a/rlog/log.go
+++ b/rlog/log.go
@@ -17,9 +17,7 @@
package rlog
-import (
- "github.com/sirupsen/logrus"
-)
+import "github.com/sirupsen/logrus"
type Logger interface {
Debug(i ...interface{})
@@ -34,7 +32,13 @@ type Logger interface {
Fatalf(format string, args ...interface{})
}
-var rLog Logger = logrus.New()
+// rLog is the package's active logger. init points it at a fresh logrus
+// logger; SetLogger replaces it.
+var rLog Logger
+
+// init installs a new logrus logger as the default rLog.
+func init() {
+ r := logrus.New()
+ // Uncomment to switch the default logger to debug level.
+ //r.SetLevel(logrus.DebugLevel)
+ rLog = r
+}
func SetLogger(log Logger) {
rLog = log
diff --git a/utils/helper.go b/utils/helper.go
index 1ad0441..c5495b0 100644
--- a/utils/helper.go
+++ b/utils/helper.go
@@ -20,9 +20,7 @@ package utils
import (
"bytes"
"encoding/binary"
- "errors"
"fmt"
- "net"
"os"
"sync"
"time"
@@ -63,29 +61,6 @@ func updateTimestamp() {
nextTimestamp = int64(time.Date(year, month, 1, 0, 0, 0, 0, time.Local).AddDate(0, 1, 0).Unix())
}
-func LocalIP() []byte {
- ip, err := clientIP4()
- if err != nil {
- return []byte{0, 0, 0, 0}
- }
- return ip
-}
-
-func clientIP4() ([]byte, error) {
- addrs, err := net.InterfaceAddrs()
- if err != nil {
- return nil, errors.New("unexpected IP address")
- }
- for _, addr := range addrs {
- if ipnet, ok := addr.(*net.IPNet); ok && !ipnet.IP.IsLoopback() {
- if ip4 := ipnet.IP.To4(); ip4 != nil {
- return ip4, nil
- }
- }
- }
- return nil, errors.New("unknown IP address")
-}
-
func GetAddressByBytes(data []byte) string {
return "127.0.0.1"
}
diff --git a/utils/helper_test.go b/utils/helper_test.go
index 967b0ae..ee55cd2 100644
--- a/utils/helper_test.go
+++ b/utils/helper_test.go
@@ -26,15 +26,6 @@ func TestClassLoaderID(t *testing.T) {
}
}
-func TestLocalIP(t *testing.T) {
- ip := LocalIP()
- if ip[0] == 0 && ip[1] == 0 && ip[2] == 0 && ip[3] == 0 {
- t.Errorf("failed to get host public ip4 address")
- } else {
- t.Logf("ip4 address: %v", ip)
- }
-}
-
func BenchmarkMessageClientID(b *testing.B) {
for i := 0; i < b.N; i++ {
MessageClientID()
diff --git a/utils/net.go b/utils/net.go
new file mode 100644
index 0000000..5da1edc
--- /dev/null
+++ b/utils/net.go
@@ -0,0 +1,30 @@
+package utils
+
+import (
+ "errors"
+ "fmt"
+ "net"
+)
+
+// LocalIP returns the address found by clientIP4 in dotted-decimal form, or
+// the empty string when clientIP4 reports an error.
+func LocalIP() string {
+	addr, err := clientIP4()
+	if err == nil {
+		return fmt.Sprintf("%d.%d.%d.%d", addr[0], addr[1], addr[2], addr[3])
+	}
+	return ""
+}
+
+// clientIP4 returns the first non-loopback IPv4 address among the addresses
+// reported by net.InterfaceAddrs, as 4 bytes. It returns an error when the
+// address list cannot be obtained or contains no such address.
+func clientIP4() ([]byte, error) {
+	ifaceAddrs, err := net.InterfaceAddrs()
+	if err != nil {
+		return nil, errors.New("unexpected IP address")
+	}
+	for i := range ifaceAddrs {
+		ipNet, isIPNet := ifaceAddrs[i].(*net.IPNet)
+		if !isIPNet || ipNet.IP.IsLoopback() {
+			continue
+		}
+		// To4 yields nil for addresses that are not IPv4.
+		if v4 := ipNet.IP.To4(); v4 != nil {
+			return v4, nil
+		}
+	}
+	return nil, errors.New("unknown IP address")
+}
diff --git a/utils/net_test.go b/utils/net_test.go
new file mode 100644
index 0000000..9f76062
--- /dev/null
+++ b/utils/net_test.go
@@ -0,0 +1,7 @@
+package utils
+
+import "testing"
+
+// TestLocalIP2 logs the result of LocalIP. It makes no assertion, so it only
+// fails if LocalIP panics.
+func TestLocalIP2(t *testing.T) {
+ t.Log(LocalIP())
+}