You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by di...@apache.org on 2019/03/18 06:43:33 UTC
[rocketmq-client-go] branch native updated: Pull Consumer (#44)
This is an automated email from the ASF dual-hosted git repository.
dinglei pushed a commit to branch native
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-go.git
The following commit(s) were added to refs/heads/native by this push:
new 93d4753 Pull Consumer (#44)
93d4753 is described below
commit 93d47535c09e68908eb0123b003f41227a975c3b
Author: wenfeng <sx...@gmail.com>
AuthorDate: Mon Mar 18 14:43:29 2019 +0800
Pull Consumer (#44)
* fix bugs of remote & refactor logger
* consumer can pull message from broker
---
consumer.go | 41 ++++++++-
consumer_test.go | 18 ++++
core/producer.go | 10 +--
core/push_consumer.go | 4 +-
go.mod | 5 +-
go.sum | 6 ++
kernel/client.go | 70 ++++++++++++---
kernel/model.go | 119 +++++++++++++++++++++++--
kernel/request.go | 27 +++++-
kernel/response.go | 13 +--
kernel/route.go | 160 ++++++++++++++++++++--------------
kernel/{response.go => route_test.go} | 27 +-----
remote/client.go | 7 +-
remote/codec.go | 44 +++++-----
remote/codec_test.go | 1 -
{utils => rlog}/log.go | 71 ++++++++++++---
utils/helper.go | 12 ++-
utils/helper_test.go | 9 +-
utils/ring_buffer.go | 19 ++--
utils/ring_buffer_test.go | 33 +++----
20 files changed, 496 insertions(+), 200 deletions(-)
diff --git a/consumer.go b/consumer.go
index acb8f1d..5119f93 100644
--- a/consumer.go
+++ b/consumer.go
@@ -22,18 +22,25 @@ import (
"errors"
"fmt"
"github.com/apache/rocketmq-client-go/kernel"
+ "github.com/apache/rocketmq-client-go/rlog"
"strconv"
"sync"
+ "sync/atomic"
"time"
)
type Consumer interface {
+ Start()
Pull(topic, expression string, numbers int) (*kernel.PullResult, error)
SubscribeWithChan(topic, expression string) (chan *kernel.Message, error)
SubscribeWithFunc(topic, expression string, f func(msg *kernel.Message) ConsumeResult) error
ACK(msg *kernel.Message, result ConsumeResult)
}
+var (
+ queueCounterTable sync.Map
+)
+
type ConsumeResult int
type ConsumerType int
@@ -42,6 +49,8 @@ const (
Original ConsumerType = iota
Orderly
Transaction
+
+ SubAll = "*"
)
type ConsumerConfig struct {
@@ -65,9 +74,12 @@ type defaultConsumer struct {
config ConsumerConfig
}
+func (c *defaultConsumer) Start() {
+ c.state = kernel.Running
+}
+
func (c *defaultConsumer) Pull(topic, expression string, numbers int) (*kernel.PullResult, error) {
mq := getNextQueueOf(topic)
-
if mq == nil {
return nil, fmt.Errorf("prepard to pull topic: %s, but no queue is founded", topic)
}
@@ -210,11 +222,33 @@ func processPullResult(mq *kernel.MessageQueue, result *kernel.PullResult, data
}
func getSubscriptionData(mq *kernel.MessageQueue, exp string) *kernel.SubscriptionData {
- return nil
+ subData := &kernel.SubscriptionData{
+ Topic: mq.Topic,
+ }
+ if exp == "" || exp == SubAll {
+ subData.SubString = SubAll
+ } else {
+ // TODO
+ }
+ return subData
}
func getNextQueueOf(topic string) *kernel.MessageQueue {
- return nil
+ queues, err := kernel.FetchSubscribeMessageQueues(topic)
+ if err != nil && len(queues) > 0 {
+ rlog.Error(err.Error())
+ return nil
+ }
+ var index int64
+ v, exist := queueCounterTable.Load(topic)
+ if !exist {
+ index = -1
+ queueCounterTable.Store(topic, 0)
+ } else {
+ index = v.(int64)
+ }
+
+ return queues[int(atomic.AddInt64(&index, 1))%len(queues)]
}
func buildSysFlag(commitOffset, suspend, subscription, classFilter bool) int32 {
@@ -248,7 +282,6 @@ func tryFindBroker(mq *kernel.MessageQueue) *kernel.FindBrokerResult {
if result == nil {
kernel.UpdateTopicRouteInfo(mq.Topic)
}
-
return kernel.FindBrokerAddressInSubscribe(mq.BrokerName, recalculatePullFromWhichNode(mq), false)
}
diff --git a/consumer_test.go b/consumer_test.go
new file mode 100644
index 0000000..aaba3cb
--- /dev/null
+++ b/consumer_test.go
@@ -0,0 +1,18 @@
+package rocketmq
+
+import (
+ "fmt"
+ "testing"
+)
+
+func TestDefaultConsumer_Pull(t *testing.T) {
+ consumer := NewConsumer(ConsumerConfig{
+ GroupName: "testGroup",
+ })
+ consumer.Start()
+ result, err := consumer.Pull("test", "*", 32)
+ if err != nil {
+ t.Fatal(err.Error())
+ }
+ fmt.Println(len(result.GetMessageExts()))
+}
diff --git a/core/producer.go b/core/producer.go
index 7df57e6..22b49ed 100644
--- a/core/producer.go
+++ b/core/producer.go
@@ -33,7 +33,7 @@ int queueSelectorCallback_cgo(int size, CMessage *msg, void *selectorKey) {
import "C"
import (
"errors"
- log "github.com/sirupsen/logrus"
+ "github.com/apache/rocketmq-client-go/rlog"
"unsafe"
)
@@ -211,7 +211,7 @@ func (p *defaultProducer) SendMessageSync(msg *Message) (*SendResult, error) {
err := rmqError(C.SendMessageSync(p.cproduer, cmsg, &sr))
if err != NIL {
- log.Warnf("send message error, error is: %s", err.Error())
+ rlog.Warnf("send message error, error is: %s", err.Error())
return nil, err
}
@@ -237,7 +237,7 @@ func (p *defaultProducer) SendMessageOrderly(msg *Message, selector MessageQueue
&sr))
if err != NIL {
- log.Warnf("send message orderly error, error is: %s", err.Error())
+ rlog.Warnf("send message orderly error, error is: %s", err.Error())
return nil, err
}
@@ -254,10 +254,10 @@ func (p *defaultProducer) SendMessageOneway(msg *Message) error {
err := rmqError(C.SendMessageOneway(p.cproduer, cmsg))
if err != NIL {
- log.Warnf("send message with oneway error, error is: %s", err.Error())
+ rlog.Warnf("send message with oneway error, error is: %s", err.Error())
return err
}
- log.Debugf("Send Message: %s with oneway success.", msg.String())
+ rlog.Debugf("Send Message: %s with oneway success.", msg.String())
return nil
}
diff --git a/core/push_consumer.go b/core/push_consumer.go
index c222587..afb0d06 100644
--- a/core/push_consumer.go
+++ b/core/push_consumer.go
@@ -34,7 +34,7 @@ import "C"
import (
"errors"
"fmt"
- log "github.com/sirupsen/logrus"
+ "github.com/apache/rocketmq-client-go/rlog"
"sync"
"unsafe"
)
@@ -226,6 +226,6 @@ func (c *defaultPushConsumer) Subscribe(topic, expression string, consumeFunc fu
return err
}
c.funcsMap.Store(topic, consumeFunc)
- log.Infof("subscribe topic[%s] with expression[%s] successfully.", topic, expression)
+ rlog.Infof("subscribe topic[%s] with expression[%s] successfully.", topic, expression)
return nil
}
diff --git a/go.mod b/go.mod
index fa910f6..2701b7c 100644
--- a/go.mod
+++ b/go.mod
@@ -1,11 +1,14 @@
module github.com/apache/rocketmq-client-go
-go 1.12
+go 1.11
require (
github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc // indirect
github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf // indirect
github.com/sirupsen/logrus v1.3.0
github.com/stretchr/testify v1.3.0
+ github.com/tidwall/gjson v1.2.1
+ github.com/tidwall/match v1.0.1 // indirect
+ github.com/tidwall/pretty v0.0.0-20180105212114-65a9db5fad51 // indirect
gopkg.in/alecthomas/kingpin.v2 v2.2.6
)
diff --git a/go.sum b/go.sum
index 0b0e0c1..7a45ece 100644
--- a/go.sum
+++ b/go.sum
@@ -16,6 +16,12 @@ github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+
github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
github.com/stretchr/testify v1.3.0 h1:TivCn/peBQ7UY8ooIcPgZFpTNSz0Q2U6UrFlUfqbe0Q=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
+github.com/tidwall/gjson v1.2.1 h1:j0efZLrZUvNerEf6xqoi0NjWMK5YlLrR7Guo/dxY174=
+github.com/tidwall/gjson v1.2.1/go.mod h1:c/nTNbUr0E0OrXEhq1pwa8iEgc2DOt4ZZqAt1HtCkPA=
+github.com/tidwall/match v1.0.1 h1:PnKP62LPNxHKTwvHHZZzdOAOCtsJTjo6dZLCwpKm5xc=
+github.com/tidwall/match v1.0.1/go.mod h1:LujAq0jyVjBy028G1WhWfIzbpQfMO8bBZ6Tyb0+pL9E=
+github.com/tidwall/pretty v0.0.0-20180105212114-65a9db5fad51 h1:BP2bjP495BBPaBcS5rmqviTfrOkN5rO5ceKAMRZCRFc=
+github.com/tidwall/pretty v0.0.0-20180105212114-65a9db5fad51/go.mod h1:XNkn88O1ChpSDQmQeStsy+sBenx6DDtFZJxhVysOjyk=
golang.org/x/crypto v0.0.0-20180904163835-0709b304e793 h1:u+LnwYTOOW7Ukr/fppxEb1Nwz0AtPflrblfvUudpo+I=
golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33 h1:I6FyU15t786LL7oL/hn43zqTuEGr4PN7F4XJ1p4E3Y8=
diff --git a/kernel/client.go b/kernel/client.go
index 6e4808a..f9185ac 100644
--- a/kernel/client.go
+++ b/kernel/client.go
@@ -20,7 +20,9 @@ package kernel
import (
"context"
"errors"
+ "fmt"
"github.com/apache/rocketmq-client-go/remote"
+ "github.com/apache/rocketmq-client-go/rlog"
"github.com/apache/rocketmq-client-go/utils"
"os"
"strconv"
@@ -34,7 +36,6 @@ const (
)
var (
- log = utils.RLog
namesrvAddrs = os.Getenv("rocketmq.namesrv.addr")
clientIP = utils.LocalIP()
instanceName = os.Getenv("rocketmq.client.name")
@@ -71,10 +72,10 @@ type InnerConsumer interface {
// SendMessage with batch by sync
func SendMessageSync(ctx context.Context, brokerAddrs, brokerName string, request *SendMessageRequest,
msgs []*Message) (*SendResult, error) {
- cmd := remote.NewRemotingCommand(SendBatchMessage, request, encodeMessages(msgs))
- response, err := remote.InvokeSync(brokerAddrs, cmd, 3 * time.Second)
+ cmd := remote.NewRemotingCommand(ReqSendBatchMessage, request, encodeMessages(msgs))
+ response, err := remote.InvokeSync(brokerAddrs, cmd, 3*time.Second)
if err != nil {
- log.Warningf("send messages with sync error: %v", err)
+ rlog.Warnf("send messages with sync error: %v", err)
return nil, err
}
@@ -89,10 +90,10 @@ func SendMessageAsync(ctx context.Context, brokerAddrs, brokerName string, reque
func SendMessageOneWay(ctx context.Context, brokerAddrs string, request *SendMessageRequest,
msgs []*Message) (*SendResult, error) {
- cmd := remote.NewRemotingCommand(SendBatchMessage, request, encodeMessages(msgs))
+ cmd := remote.NewRemotingCommand(ReqSendBatchMessage, request, encodeMessages(msgs))
err := remote.InvokeOneWay(brokerAddrs, cmd)
if err != nil {
- log.Warningf("send messages with oneway error: %v", err)
+ rlog.Warnf("send messages with oneway error: %v", err)
}
return nil, err
}
@@ -100,13 +101,13 @@ func SendMessageOneWay(ctx context.Context, brokerAddrs string, request *SendMes
func processSendResponse(brokerName string, msgs []*Message, cmd *remote.RemotingCommand) *SendResult {
var status SendStatus
switch cmd.Code {
- case FlushDiskTimeout:
+ case ResFlushDiskTimeout:
status = SendFlushDiskTimeout
- case FlushSlaveTimeout:
+ case ResFlushSlaveTimeout:
status = SendFlushSlaveTimeout
- case SlaveNotAvailable:
+ case ResSlaveNotAvailable:
status = SendSlaveNotAvailable
- case Success:
+ case ResSuccess:
status = SendOK
default:
// TODO process unknown code
@@ -145,7 +146,54 @@ func processSendResponse(brokerName string, msgs []*Message, cmd *remote.Remotin
// PullMessage with sync
func PullMessage(ctx context.Context, brokerAddrs string, request *PullMessageRequest) (*PullResult, error) {
- return nil, nil
+ cmd := remote.NewRemotingCommand(ReqPullMessage, request, nil)
+
+ res, err := remote.InvokeSync(brokerAddrs, cmd, 3*time.Second)
+ if err != nil {
+ return nil, err
+ }
+
+ return processPullResponse(res)
+}
+
+func processPullResponse(response *remote.RemotingCommand) (*PullResult, error) {
+ pullResult := &PullResult{}
+ switch response.Code {
+ case ResSuccess:
+ pullResult.Status = PullFound
+ case ResPullNotFound:
+ pullResult.Status = PullNoNewMsg
+ case ResPullRetryImmediately:
+ pullResult.Status = PullNoMatchedMsg
+ case ResPullOffsetMoved:
+ pullResult.Status = PullOffsetIllegal
+ default:
+ return nil, fmt.Errorf("unknown Response Code: %d, remark: %s", response.Code, response.Remark)
+ }
+
+ v, exist := response.ExtFields["maxOffset"]
+ if exist {
+ pullResult.MaxOffset, _ = strconv.ParseInt(v, 10, 64)
+ }
+
+ v, exist = response.ExtFields["minOffset"]
+ if exist {
+ pullResult.MinOffset, _ = strconv.ParseInt(v, 10, 64)
+ }
+
+ v, exist = response.ExtFields["nextBeginOffset"]
+ if exist {
+ pullResult.NextBeginOffset, _ = strconv.ParseInt(v, 10, 64)
+ }
+
+ v, exist = response.ExtFields["suggestWhichBrokerId"]
+ if exist {
+ pullResult.SuggestWhichBrokerId, _ = strconv.ParseInt(v, 10, 64)
+ }
+
+ pullResult.messageExts = decodeMessage(response.Body)
+
+ return pullResult, nil
}
// PullMessageAsync pull message async
diff --git a/kernel/model.go b/kernel/model.go
index 9ee30cb..f9593ab 100644
--- a/kernel/model.go
+++ b/kernel/model.go
@@ -18,6 +18,8 @@ limitations under the License.
package kernel
import (
+ "bytes"
+ "encoding/binary"
"fmt"
"github.com/apache/rocketmq-client-go/utils"
)
@@ -30,6 +32,9 @@ const (
SendFlushDiskTimeout
SendFlushSlaveTimeout
SendSlaveNotAvailable
+
+ FlagCompressed = 0x1
+ MsgIdLength = 8 + 8
)
// SendResult RocketMQ send result
@@ -69,20 +74,14 @@ type PullResult struct {
MaxOffset int64
Status PullStatus
SuggestWhichBrokerId int64
- messageBinary []byte
messageExts []*MessageExt
}
func (result *PullResult) GetMessageExts() []*MessageExt {
- if result.messageExts != nil && len(result.messageExts) > 0 {
- return result.messageExts
- }
-
return result.messageExts
}
func (result *PullResult) SetMessageExts(msgExts []*MessageExt) {
- result.messageBinary = nil
result.messageExts = msgExts
}
@@ -93,6 +92,112 @@ func (result *PullResult) GetMessages() []*Message {
return toMessages(result.messageExts)
}
+func decodeMessage(data []byte) []*MessageExt {
+ msgs := make([]*MessageExt, 0)
+ buf := bytes.NewBuffer(data)
+ count := 0
+ for count < len(data) {
+ msg := &MessageExt{}
+
+ // 1. total size
+ binary.Read(buf, binary.BigEndian, &msg.StoreSize)
+ count += 4
+
+ // 2. magic code
+ buf.Next(4)
+ count += 4
+
+ // 3. body CRC32
+ binary.Read(buf, binary.BigEndian, &msg.BodyCRC)
+ count += 4
+
+ // 4. queueID
+ binary.Read(buf, binary.BigEndian, &msg.QueueId)
+ count += 4
+
+ // 5. Flag
+ binary.Read(buf, binary.BigEndian, &msg.Flag)
+ count += 4
+
+ // 6. QueueOffset
+ binary.Read(buf, binary.BigEndian, &msg.QueueOffset)
+ count += 8
+
+ // 7. physical offset
+ binary.Read(buf, binary.BigEndian, &msg.CommitLogOffset)
+ count += 8
+
+ // 8. SysFlag
+ binary.Read(buf, binary.BigEndian, &msg.SysFlag)
+ count += 4
+
+ // 9. BornTimestamp
+ binary.Read(buf, binary.BigEndian, &msg.BornTimestamp)
+ count += 8
+
+ // 10. born host
+ hostBytes := buf.Next(4)
+ var port int32
+ binary.Read(buf, binary.BigEndian, &port)
+ msg.BornHost = fmt.Sprintf("%s:%d", utils.GetAddressByBytes(hostBytes), port)
+ count += 8
+
+ // 11. store timestamp
+ binary.Read(buf, binary.BigEndian, &msg.StoreTimestamp)
+ count += 8
+
+ // 12. store host
+ hostBytes = buf.Next(4)
+ binary.Read(buf, binary.BigEndian, &port)
+ msg.StoreHost = fmt.Sprintf("%s:%d", utils.GetAddressByBytes(hostBytes), port)
+ count += 8
+
+ // 13. reconsume times
+ binary.Read(buf, binary.BigEndian, &msg.ReconsumeTimes)
+ count += 4
+
+ // 14. prepared transaction offset
+ binary.Read(buf, binary.BigEndian, &msg.PreparedTransactionOffset)
+ count += 8
+
+ // 15. body
+ var length int32
+ binary.Read(buf, binary.BigEndian, &length)
+ msg.Body = buf.Next(int(length))
+ if (msg.SysFlag & FlagCompressed) == FlagCompressed {
+ msg.Body = utils.UnCompress(msg.Body)
+ }
+ count += 4 + int(length)
+
+ // 16. topic
+ _byte, _ := buf.ReadByte()
+ msg.Topic = string(buf.Next(int(_byte)))
+ count += 1 + int(_byte)
+
+ var propertiesLength int16
+ binary.Read(buf, binary.BigEndian, &propertiesLength)
+ if propertiesLength > 0 {
+ msg.Properties = parseProperties(buf.Next(int(propertiesLength)))
+ }
+ count += 2 + int(propertiesLength)
+
+ msg.MsgId = createMessageId(hostBytes, msg.CommitLogOffset)
+ //count += 16
+
+ msgs = append(msgs, msg)
+ }
+
+ return msgs
+}
+
+func createMessageId(addr []byte, offset int64) string {
+ return "msgID" // TODO
+}
+
+func parseProperties(data []byte) map[string]string {
+ return make(map[string]string, 0)
+}
+
func toMessages(messageExts []*MessageExt) []*Message {
msgs := make([]*Message, 0)
@@ -133,7 +238,7 @@ type (
MessageModel int
ConsumeFromWhere int
- ServiceState int
+ ServiceState int
)
const (
diff --git a/kernel/request.go b/kernel/request.go
index 7967cca..c5e0fef 100644
--- a/kernel/request.go
+++ b/kernel/request.go
@@ -17,9 +17,12 @@ limitations under the License.
package kernel
+import "fmt"
+
const (
- GetRouteInfoByTopic = int16(105)
- SendBatchMessage = int16(320)
+ ReqPullMessage = int16(11)
+ ReqGetRouteInfoByTopic = int16(105)
+ ReqSendBatchMessage = int16(320)
)
type SendMessageRequest struct {
@@ -59,6 +62,22 @@ type PullMessageRequest struct {
ExpressionType string `json:"expressionType"`
}
+func (request *PullMessageRequest) Encode() map[string]string {
+ maps := make(map[string]string)
+ maps["consumerGroup"] = request.ConsumerGroup
+ maps["topic"] = request.Topic
+ maps["queueId"] = fmt.Sprintf("%d", request.QueueOffset)
+ maps["queueOffset"] = fmt.Sprintf("%d", request.QueueOffset)
+ maps["maxMsgNums"] = fmt.Sprintf("%d", request.MaxMsgNums)
+ maps["sysFlag"] = fmt.Sprintf("%d", request.SysFlag)
+ maps["commitOffset"] = fmt.Sprintf("%d", request.CommitOffset)
+ maps["suspendTimeoutMillis"] = fmt.Sprintf("%d", request.SuspendTimeoutMillis)
+ maps["subscription"] = request.SubExpression
+ maps["subVersion"] = fmt.Sprintf("%d", request.SubVersion)
+ maps["expressionType"] = request.ExpressionType
+ return maps
+}
+
type GetMaxOffsetRequest struct {
Topic string `json:"topic"`
QueueId int32 `json:"queueId"`
@@ -88,7 +107,9 @@ type GetRouteInfoRequest struct {
}
func (request *GetRouteInfoRequest) Encode() map[string]string {
- return nil
+ maps := make(map[string]string)
+ maps["topic"] = request.Topic
+ return maps
}
func (request *GetRouteInfoRequest) Decode(properties map[string]string) error {
diff --git a/kernel/response.go b/kernel/response.go
index df1cecc..c4824a0 100644
--- a/kernel/response.go
+++ b/kernel/response.go
@@ -18,11 +18,14 @@ limitations under the License.
package kernel
const (
- Success = int16(0)
- FlushDiskTimeout = int16(10)
- SlaveNotAvailable = int16(11)
- FlushSlaveTimeout = int16(12)
- TopicNotExist = int16(17)
+ ResSuccess = int16(0)
+ ResFlushDiskTimeout = int16(10)
+ ResSlaveNotAvailable = int16(11)
+ ResFlushSlaveTimeout = int16(12)
+ ResTopicNotExist = int16(17)
+ ResPullNotFound = int16(19)
+ ResPullRetryImmediately = int16(20)
+ ResPullOffsetMoved = int16(21)
)
type SendMessageResponse struct {
diff --git a/kernel/route.go b/kernel/route.go
index 38deb93..dc08edc 100644
--- a/kernel/route.go
+++ b/kernel/route.go
@@ -21,6 +21,8 @@ import (
"encoding/json"
"errors"
"github.com/apache/rocketmq-client-go/remote"
+ "github.com/apache/rocketmq-client-go/rlog"
+ "github.com/tidwall/gjson"
"sort"
"strconv"
"strings"
@@ -79,49 +81,49 @@ func UpdateTopicRouteInfo(topic string) {
lockNamesrv.Lock()
defer lockNamesrv.Unlock()
- RouteData, err := queryTopicRouteInfoFromServer(topic, requestTimeout)
+ routeData, err := queryTopicRouteInfoFromServer(topic, requestTimeout)
if err != nil {
- log.Warningf("query topic route from server error: %s", err)
+ rlog.Warnf("query topic route from server error: %s", err)
return
}
- if RouteData == nil {
- log.Warningf("queryTopicRouteInfoFromServer return nil, Topic: %s", topic)
+ if routeData == nil {
+ rlog.Warnf("queryTopicRouteInfoFromServer return nil, Topic: %s", topic)
return
}
var changed bool
oldRouteData, exist := routeDataMap.Load(topic)
- if !exist || RouteData == nil {
+ if !exist || routeData == nil {
changed = true
} else {
- changed = topicRouteDataIsChange(oldRouteData.(*topicRouteData), RouteData)
+ changed = topicRouteDataIsChange(oldRouteData.(*topicRouteData), routeData)
}
if !changed {
changed = isNeedUpdateTopicRouteInfo(topic)
} else {
- log.Infof("the topic[%s] route info changed, old[%s] ,new[%s]", topic, oldRouteData, RouteData)
+ rlog.Infof("the topic[%s] route info changed, old[%v] ,new[%s]", topic, oldRouteData, routeData)
}
if !changed {
return
}
- newTopicRouteData := RouteData.clone()
+ newTopicRouteData := routeData.clone()
- for _, brokerData := range newTopicRouteData.brokerDataList {
- brokerAddressesMap.Store(brokerData.brokerName, brokerData.brokerAddresses)
+ for _, brokerData := range newTopicRouteData.BrokerDataList {
+ brokerAddressesMap.Store(brokerData.BrokerName, brokerData.BrokerAddresses)
}
// update publish info
- publishInfo := RouteData2PublishInfo(topic, RouteData)
+ publishInfo := routeData2PublishInfo(topic, routeData)
publishInfo.HaveTopicRouterInfo = true
old, _ := publishInfoMap.Load(topic)
publishInfoMap.Store(topic, publishInfoMap)
if old != nil {
- log.Infof("Old TopicPublishInfo [%s] removed.", old)
+ rlog.Infof("Old TopicPublishInfo [%s] removed.", old)
}
}
@@ -132,7 +134,7 @@ func FindBrokerAddressInPublish(brokerName string) string {
return ""
}
- return bd.(*BrokerData).brokerAddresses[MasterId]
+ return bd.(*BrokerData).BrokerAddresses[MasterId]
}
func FindBrokerAddressInSubscribe(brokerName string, brokerId int64, onlyThisBroker bool) *FindBrokerResult {
@@ -142,15 +144,16 @@ func FindBrokerAddressInSubscribe(brokerName string, brokerId int64, onlyThisBro
found = false
)
- bd, exist := brokerAddressesMap.Load(brokerName)
+ addrs, exist := brokerAddressesMap.Load(brokerName)
if exist {
- for k, v := range bd.(*BrokerData).brokerAddresses {
+ for k, v := range addrs.(map[int64]string) {
if v != "" {
found = true
if k != MasterId {
slave = true
}
+ brokerAddr = v
break
}
}
@@ -159,7 +162,7 @@ func FindBrokerAddressInSubscribe(brokerName string, brokerId int64, onlyThisBro
var result *FindBrokerResult
if found {
result = &FindBrokerResult{
- BrokerAddr: brokerName,
+ BrokerAddr: brokerAddr,
Slave: slave,
BrokerVersion: findBrokerVersion(brokerName, brokerAddr),
}
@@ -177,14 +180,13 @@ func FetchSubscribeMessageQueues(topic string) ([]*MessageQueue, error) {
mqs := make([]*MessageQueue, 0)
- for _, qd := range routeData.queueDataList {
- if queueIsReadable(qd.perm) {
- for i := 0; i < qd.readQueueNums; i++ {
- mqs = append(mqs, &MessageQueue{Topic: topic, BrokerName: qd.brokerName, QueueId: i})
+ for _, qd := range routeData.QueueDataList {
+ if queueIsReadable(qd.Perm) {
+ for i := 0; i < qd.ReadQueueNums; i++ {
+ mqs = append(mqs, &MessageQueue{Topic: topic, BrokerName: qd.BrokerName, QueueId: i})
}
}
}
-
return mqs, nil
}
@@ -207,8 +209,7 @@ func queryTopicRouteInfoFromServer(topic string, timeout time.Duration) (*topicR
request := &GetRouteInfoRequest{
Topic: topic,
}
- rc := remote.NewRemotingCommand(GetRouteInfoByTopic, request, nil)
-
+ rc := remote.NewRemotingCommand(ReqGetRouteInfoByTopic, request, nil)
response, err := remote.InvokeSync(getNameServerAddress(), rc, timeout)
if err != nil {
@@ -216,18 +217,19 @@ func queryTopicRouteInfoFromServer(topic string, timeout time.Duration) (*topicR
}
switch response.Code {
- case Success:
+ case ResSuccess:
if response.Body == nil {
return nil, errors.New(response.Remark)
}
- RouteData := &topicRouteData{}
- err = json.Unmarshal(response.Body, RouteData)
+ routeData := &topicRouteData{}
+
+ err = routeData.decode(string(response.Body))
if err != nil {
- log.Warningf("unmarshal topicRouteData error: %s", err)
+ rlog.Warnf("decode topicRouteData error: %s", err)
return nil, err
}
- return RouteData, nil
- case TopicNotExist:
+ return routeData, nil
+ case ResTopicNotExist:
return nil, ErrTopicNotExist
default:
return nil, errors.New(response.Remark)
@@ -241,17 +243,17 @@ func topicRouteDataIsChange(oldData *topicRouteData, newData *topicRouteData) bo
oldDataCloned := oldData.clone()
newDataCloned := newData.clone()
- sort.Slice(oldDataCloned.queueDataList, func(i, j int) bool {
- return strings.Compare(oldDataCloned.queueDataList[i].brokerName, oldDataCloned.queueDataList[j].brokerName) > 0
+ sort.Slice(oldDataCloned.QueueDataList, func(i, j int) bool {
+ return strings.Compare(oldDataCloned.QueueDataList[i].BrokerName, oldDataCloned.QueueDataList[j].BrokerName) > 0
})
- sort.Slice(oldDataCloned.brokerDataList, func(i, j int) bool {
- return strings.Compare(oldDataCloned.brokerDataList[i].brokerName, oldDataCloned.brokerDataList[j].brokerName) > 0
+ sort.Slice(oldDataCloned.BrokerDataList, func(i, j int) bool {
+ return strings.Compare(oldDataCloned.BrokerDataList[i].BrokerName, oldDataCloned.BrokerDataList[j].BrokerName) > 0
})
- sort.Slice(newDataCloned.queueDataList, func(i, j int) bool {
- return strings.Compare(newDataCloned.queueDataList[i].brokerName, newDataCloned.queueDataList[j].brokerName) > 0
+ sort.Slice(newDataCloned.QueueDataList, func(i, j int) bool {
+ return strings.Compare(newDataCloned.QueueDataList[i].BrokerName, newDataCloned.QueueDataList[j].BrokerName) > 0
})
- sort.Slice(newDataCloned.brokerDataList, func(i, j int) bool {
- return strings.Compare(newDataCloned.brokerDataList[i].brokerName, newDataCloned.brokerDataList[j].brokerName) > 0
+ sort.Slice(newDataCloned.BrokerDataList, func(i, j int) bool {
+ return strings.Compare(newDataCloned.BrokerDataList[i].BrokerName, newDataCloned.BrokerDataList[j].BrokerName) > 0
})
return !oldDataCloned.equals(newDataCloned)
@@ -263,7 +265,7 @@ func isNeedUpdateTopicRouteInfo(topic string) bool {
return !exist || value.(*TopicPublishInfo).isOK()
}
-func RouteData2PublishInfo(topic string, data *topicRouteData) *TopicPublishInfo {
+func routeData2PublishInfo(topic string, data *topicRouteData) *TopicPublishInfo {
publishInfo := &TopicPublishInfo{
RouteData: data,
OrderTopic: false,
@@ -288,32 +290,32 @@ func RouteData2PublishInfo(topic string, data *topicRouteData) *TopicPublishInfo
return publishInfo
}
- qds := data.queueDataList
+ qds := data.QueueDataList
sort.Slice(qds, func(i, j int) bool {
return i-j >= 0
})
for _, qd := range qds {
- if !queueIsWriteable(qd.perm) {
+ if !queueIsWriteable(qd.Perm) {
continue
}
var bData *BrokerData
- for _, bd := range data.brokerDataList {
- if bd.brokerName == qd.brokerName {
+ for _, bd := range data.BrokerDataList {
+ if bd.BrokerName == qd.BrokerName {
bData = bd
break
}
}
- if bData == nil || bData.brokerAddresses[MasterId] == "" {
+ if bData == nil || bData.BrokerAddresses[MasterId] == "" {
continue
}
- for i := 0; i < qd.writeQueueNums; i++ {
+ for i := 0; i < qd.WriteQueueNums; i++ {
mq := &MessageQueue{
Topic: topic,
- BrokerName: qd.brokerName,
+ BrokerName: qd.BrokerName,
QueueId: i,
}
publishInfo.MqList = append(publishInfo.MqList, mq)
@@ -324,50 +326,80 @@ func RouteData2PublishInfo(topic string, data *topicRouteData) *TopicPublishInfo
}
func getNameServerAddress() string {
- return ""
+ return "127.0.0.1:9876"
}
// topicRouteData topicRouteData
type topicRouteData struct {
OrderTopicConf string
- queueDataList []*QueueData
- brokerDataList []*BrokerData
+ QueueDataList []*QueueData `json:"queueDatas"`
+ BrokerDataList []*BrokerData `json:"brokerDatas"`
+}
+
+func (routeData *topicRouteData) decode(data string) error {
+ res := gjson.Parse(data)
+ json.Unmarshal([]byte(res.Get("queueDatas").String()), &routeData.QueueDataList)
+
+ bds := res.Get("brokerDatas").Array()
+ routeData.BrokerDataList = make([]*BrokerData, len(bds))
+ for idx, v := range bds {
+ bd := &BrokerData{
+ BrokerName: v.Get("brokerName").String(),
+ Cluster: v.Get("cluster").String(),
+ BrokerAddresses: make(map[int64]string, 0),
+ }
+ addrs := v.Get("brokerAddrs").String()
+ strs := strings.Split(addrs[1:len(addrs)-1], ",")
+ if strs != nil {
+ for _, str := range strs {
+ i := strings.Index(str, ":")
+ if i < 0 {
+ continue
+ }
+ id, _ := strconv.ParseInt(str[0:i], 10, 64)
+ bd.BrokerAddresses[id] = strings.Replace(str[i+1:], "\"", "", -1)
+ }
+ }
+ routeData.BrokerDataList[idx] = bd
+ }
+ return nil
}
-func (RouteData *topicRouteData) clone() *topicRouteData {
+func (routeData *topicRouteData) clone() *topicRouteData {
cloned := &topicRouteData{
- OrderTopicConf: RouteData.OrderTopicConf,
- queueDataList: make([]*QueueData, len(RouteData.queueDataList)),
- brokerDataList: make([]*BrokerData, len(RouteData.brokerDataList)),
+ OrderTopicConf: routeData.OrderTopicConf,
+ QueueDataList: make([]*QueueData, len(routeData.QueueDataList)),
+ BrokerDataList: make([]*BrokerData, len(routeData.BrokerDataList)),
}
- for index, value := range RouteData.queueDataList {
- cloned.queueDataList[index] = value
+ for index, value := range routeData.QueueDataList {
+ cloned.QueueDataList[index] = value
}
- for index, value := range RouteData.brokerDataList {
- cloned.brokerDataList[index] = value
+ for index, value := range routeData.BrokerDataList {
+ cloned.BrokerDataList[index] = value
}
return cloned
}
-func (RouteData *topicRouteData) equals(data *topicRouteData) bool {
+func (routeData *topicRouteData) equals(data *topicRouteData) bool {
return false
}
// QueueData QueueData
type QueueData struct {
- brokerName string
- readQueueNums int
- writeQueueNums int
- perm int
- topicSynFlag int
+ BrokerName string `json:"brokerName"`
+ ReadQueueNums int `json:"readQueueNums"`
+ WriteQueueNums int `json:"writeQueueNums"`
+ Perm int `json:"perm"`
+ TopicSynFlag int `json:"topicSynFlag"`
}
// BrokerData BrokerData
type BrokerData struct {
- brokerName string
- brokerAddresses map[int64]string
+ Cluster string `json:"cluster"`
+ BrokerName string `json:"brokerName"`
+ BrokerAddresses map[int64]string `json:"brokerAddrs"`
brokerAddressesLock sync.RWMutex
}
diff --git a/kernel/response.go b/kernel/route_test.go
similarity index 59%
copy from kernel/response.go
copy to kernel/route_test.go
index df1cecc..eb9c312 100644
--- a/kernel/response.go
+++ b/kernel/route_test.go
@@ -17,29 +17,10 @@ limitations under the License.
package kernel
-const (
- Success = int16(0)
- FlushDiskTimeout = int16(10)
- SlaveNotAvailable = int16(11)
- FlushSlaveTimeout = int16(12)
- TopicNotExist = int16(17)
+import (
+ "testing"
)
-type SendMessageResponse struct {
- MsgId string
- QueueId int32
- QueueOffset int64
- TransactionId string
- MsgRegion string
-}
-
-func (response *SendMessageResponse) Decode(properties map[string]string) {
-
-}
-
-type PullMessageResponse struct {
- SuggestWhichBrokerId int64
- NextBeginOffset int64
- MinOffset int64
- MaxOffset int64
+func TestUpdateTopicRouteInfo(t *testing.T) {
+ UpdateTopicRouteInfo("test")
}
diff --git a/remote/client.go b/remote/client.go
index 2cdd6de..bfc4c36 100644
--- a/remote/client.go
+++ b/remote/client.go
@@ -30,7 +30,7 @@ import (
var (
//ErrRequestTimeout for request timeout error
ErrRequestTimeout = errors.New("request timeout")
- connectionLocker sync.Mutex
+ connectionLocker sync.Mutex
)
//ResponseFuture for
@@ -160,7 +160,6 @@ func connect(addr string) (net.Conn, error) {
connectionTable.Store(addr, tcpConn)
go receiveResponse(tcpConn)
return tcpConn, nil
-
}
func receiveResponse(r net.Conn) {
@@ -195,8 +194,8 @@ func createScanner(r io.Reader) *bufio.Scanner {
if len(data) >= 4 {
var length int32
binary.Read(bytes.NewReader(data[0:4]), binary.BigEndian, &length)
- if int(length) <= len(data) {
- return int(length), data[:length], nil
+ if int(length)+4 <= len(data) {
+ return int(length) + 4, data[4 : length+4], nil
}
}
}
diff --git a/remote/codec.go b/remote/codec.go
index d556cfb..8e499fe 100644
--- a/remote/codec.go
+++ b/remote/codec.go
@@ -37,14 +37,13 @@ const (
ResponseType = 1
_Flag = 0
- _LanguageFlag = "golang"
_LanguageCode = byte(9)
_Version = 137
)
type RemotingCommand struct {
Code int16 `json:"code"`
- Language byte `json:"language"`
+ Language byte `json:"-"`
Version int16 `json:"version"`
Opaque int32 `json:"opaque"`
Flag int32 `json:"flag"`
@@ -59,11 +58,10 @@ type CustomHeader interface {
func NewRemotingCommand(code int16, header CustomHeader, body []byte) *RemotingCommand {
cmd := &RemotingCommand{
- Code: code,
- Language: _LanguageCode,
- Version: _Version,
- Opaque: atomic.AddInt32(&opaque, 1),
- Body: body,
+ Code: code,
+ Version: _Version,
+ Opaque: atomic.AddInt32(&opaque, 1),
+ Body: body,
}
if header != nil {
@@ -73,6 +71,11 @@ func NewRemotingCommand(code int16, header CustomHeader, body []byte) *RemotingC
return cmd
}
+func (command *RemotingCommand) String() string {
+ return fmt.Sprintf("Code: %d, Opaque: %d, Remark: %s, ExtFields: %v",
+ command.Code, command.Opaque, command.Remark, command.ExtFields)
+}
+
func (command *RemotingCommand) isResponseType() bool {
return command.Flag&(ResponseType) == ResponseType
}
@@ -112,7 +115,7 @@ func encode(command *RemotingCommand) ([]byte, error) {
return nil, err
}
- frameSize := 8 + len(header) + len(command.Body)
+ frameSize := 4 + len(header) + len(command.Body)
buf := bytes.NewBuffer(make([]byte, frameSize))
buf.Reset()
@@ -141,18 +144,14 @@ func encode(command *RemotingCommand) ([]byte, error) {
func decode(data []byte) (*RemotingCommand, error) {
buf := bytes.NewBuffer(data)
- var length int32
- err := binary.Read(buf, binary.BigEndian, &length)
- if err != nil {
- return nil, err
- }
+ length := int32(len(data))
var oriHeaderLen int32
- err = binary.Read(buf, binary.BigEndian, &oriHeaderLen)
+ err := binary.Read(buf, binary.BigEndian, &oriHeaderLen)
if err != nil {
return nil, err
}
- headerLength := oriHeaderLen & 0xFFFFFF
+ headerLength := oriHeaderLen & 0xFFFFFF
headerData := make([]byte, headerLength)
err = binary.Read(buf, binary.BigEndian, &headerData)
if err != nil {
@@ -160,7 +159,6 @@ func decode(data []byte) (*RemotingCommand, error) {
}
var command *RemotingCommand
-
switch codeType := byte((oriHeaderLen >> 24) & 0xFF); codeType {
case JsonCodecs:
command, err = jsonSerializer.decodeHeader(headerData)
@@ -173,13 +171,15 @@ func decode(data []byte) (*RemotingCommand, error) {
return nil, err
}
- bodyLength := length - 8 - headerLength
- bodyData := make([]byte, bodyLength)
- err = binary.Read(buf, binary.BigEndian, &bodyData)
- if err != nil {
- return nil, err
+ bodyLength := length - 4 - headerLength
+ if bodyLength > 0 {
+ bodyData := make([]byte, bodyLength)
+ err = binary.Read(buf, binary.BigEndian, &bodyData)
+ if err != nil {
+ return nil, err
+ }
+ command.Body = bodyData
}
- command.Body = bodyData
return command, nil
}
diff --git a/remote/codec_test.go b/remote/codec_test.go
index ab1a61e..e48b0bb 100644
--- a/remote/codec_test.go
+++ b/remote/codec_test.go
@@ -23,7 +23,6 @@ import (
)
type testHeader struct {
-
}
func (t testHeader) Encode() map[string]string {
diff --git a/utils/log.go b/rlog/log.go
similarity index 54%
rename from utils/log.go
rename to rlog/log.go
index d684efd..c73c9ba 100644
--- a/utils/log.go
+++ b/rlog/log.go
@@ -15,28 +15,77 @@
* limitations under the License.
*/
-package utils
+package rlog
-import "io"
-
-var RLog Logger
+import (
+ "github.com/sirupsen/logrus"
+)
type Logger interface {
- Output() io.Writer
- SetOutput(w io.Writer)
- Prefix() string
- SetPrefix(p string)
- SetHeader(h string)
Print(i ...interface{})
Printf(format string, args ...interface{})
Debug(i ...interface{})
Debugf(format string, args ...interface{})
Info(i ...interface{})
Infof(format string, args ...interface{})
- Warning(i ...interface{})
- Warningf(format string, args ...interface{})
+ Warn(i ...interface{})
+ Warnf(format string, args ...interface{})
Error(i ...interface{})
Errorf(format string, args ...interface{})
Fatal(i ...interface{})
Fatalf(format string, args ...interface{})
}
+
+var rLog Logger = logrus.New()
+
+func SetLogger(log Logger) {
+ rLog = log
+}
+
+func Print(i ...interface{}) {
+ rLog.Print(i...)
+}
+
+func Printf(format string, args ...interface{}) {
+ rLog.Printf(format, args...)
+}
+
+func Debug(i ...interface{}) {
+ rLog.Debug(i...)
+}
+
+func Debugf(format string, args ...interface{}) {
+ rLog.Debugf(format, args...)
+}
+
+func Info(i ...interface{}) {
+ rLog.Info(i...)
+}
+
+func Infof(format string, args ...interface{}) {
+ rLog.Infof(format, args...)
+}
+
+func Warn(i ...interface{}) {
+ rLog.Warn(i...)
+}
+
+func Warnf(format string, args ...interface{}) {
+ rLog.Warnf(format, args...)
+}
+
+func Error(i ...interface{}) {
+ rLog.Error(i...)
+}
+
+func Errorf(format string, args ...interface{}) {
+ rLog.Errorf(format, args...)
+}
+
+func Fatal(i ...interface{}) {
+ rLog.Fatal(i...)
+}
+
+func Fatalf(format string, args ...interface{}) {
+ rLog.Fatalf(format, args...)
+}
diff --git a/utils/helper.go b/utils/helper.go
index bc18c26..485ae15 100644
--- a/utils/helper.go
+++ b/utils/helper.go
@@ -33,7 +33,7 @@ var (
counter int16 = 0
startTimestamp int64 = 0
nextTimestamp int64 = 0
- prefix string
+ prefix string
locker sync.Mutex
)
@@ -79,7 +79,7 @@ func clientIP4() ([]byte, error) {
}
for _, addr := range addrs {
if ipnet, ok := addr.(*net.IPNet); ok && !ipnet.IP.IsLoopback() {
- if ip4 := ipnet.IP.To4(); ip4!=nil{
+ if ip4 := ipnet.IP.To4(); ip4 != nil {
return ip4, nil
}
}
@@ -87,7 +87,9 @@ func clientIP4() ([]byte, error) {
return nil, errors.New("unknown IP address")
}
-
+func GetAddressByBytes(data []byte) string {
+ return "127.0.0.1"
+}
func Pid() int16 {
return int16(os.Getpid())
@@ -104,3 +106,7 @@ func HashString(s string) int {
}
return int(crc32.ChecksumIEEE([]byte(s)))
}
+
+func UnCompress(data []byte) []byte {
+ return data
+}
diff --git a/utils/helper_test.go b/utils/helper_test.go
index 113e92c..4e1877c 100644
--- a/utils/helper_test.go
+++ b/utils/helper_test.go
@@ -28,14 +28,13 @@ func TestLocalIP(t *testing.T) {
ip := LocalIP()
if ip[0] == 0 && ip[1] == 0 && ip[2] == 0 && ip[3] == 0 {
t.Errorf("failed to get host public ip4 address")
- }else{
+ } else {
t.Logf("ip4 address: %v", ip)
}
}
-
func BenchmarkMessageClientID(b *testing.B) {
- for i:= 0; i< b.N;i++{
- MessageClientID()
+ for i := 0; i < b.N; i++ {
+ MessageClientID()
}
-}
\ No newline at end of file
+}
diff --git a/utils/ring_buffer.go b/utils/ring_buffer.go
index 9400f53..cab385a 100644
--- a/utils/ring_buffer.go
+++ b/utils/ring_buffer.go
@@ -19,8 +19,8 @@ package utils
import (
"runtime"
- "time"
"sync/atomic"
+ "time"
)
// 1.需要能够动态扩容
@@ -30,14 +30,14 @@ import (
type RingNodesBuffer struct {
writePos uint64
readPos uint64
- mask uint64
+ mask uint64
nodes nodes
}
type node struct {
position uint64
- buf []byte
+ buf []byte
}
type nodes []*node
@@ -106,10 +106,10 @@ L:
// 直接返回数据
func (r *RingNodesBuffer) Read(timeout time.Duration) (data []byte, err error) {
var (
- node *node
+ node *node
pos = atomic.LoadUint64(&r.readPos)
start time.Time
- dif uint64
+ dif uint64
)
if timeout > 0 {
start = time.Now()
@@ -143,12 +143,12 @@ L:
}
// 知道大小,传进去解析
-func (r *RingNodesBuffer) ReadBySize(data []byte,timeout time.Duration) (n int, err error) {
+func (r *RingNodesBuffer) ReadBySize(data []byte, timeout time.Duration) (n int, err error) {
var (
- node *node
+ node *node
pos = atomic.LoadUint64(&r.readPos)
start time.Time
- dif uint64
+ dif uint64
)
i := 0
if timeout > 0 {
@@ -176,12 +176,11 @@ L:
i++
}
}
- n = copy(data,node.buf)
+ n = copy(data, node.buf)
atomic.StoreUint64(&node.position, pos+r.mask+1)
return
}
-
func (r *RingNodesBuffer) Size() uint64 {
return atomic.LoadUint64(&r.writePos) - atomic.LoadUint64(&r.readPos)
diff --git a/utils/ring_buffer_test.go b/utils/ring_buffer_test.go
index a119f21..2d54a85 100644
--- a/utils/ring_buffer_test.go
+++ b/utils/ring_buffer_test.go
@@ -18,15 +18,15 @@
package utils
import (
- "time"
- "testing"
+ "fmt"
"github.com/stretchr/testify/assert"
- "sync"
"strconv"
- "fmt"
+ "sync"
+ "testing"
+ "time"
)
-func TestRingRead(t *testing.T) {
+func TestRingRead(t *testing.T) {
rb := NewRingNodesBuffer(5)
assert.Equal(t, uint64(8), rb.Cap())
@@ -34,7 +34,7 @@ func TestRingRead(t *testing.T) {
if !assert.Nil(t, err) {
return
}
- data, err := rb.Read(1*time.Second)
+ data, err := rb.Read(1 * time.Second)
if !assert.Nil(t, err) {
return
}
@@ -42,8 +42,7 @@ func TestRingRead(t *testing.T) {
assert.Equal(t, "hello", string(data))
}
-
-func TestRingReadBySize(t *testing.T) {
+func TestRingReadBySize(t *testing.T) {
rb := NewRingNodesBuffer(5)
assert.Equal(t, uint64(8), rb.Cap())
@@ -52,7 +51,7 @@ func TestRingReadBySize(t *testing.T) {
return
}
sink := make([]byte, 5)
- n, err := rb.ReadBySize(sink,1*time.Second)
+ n, err := rb.ReadBySize(sink, 1*time.Second)
if !assert.Nil(t, err) {
return
}
@@ -68,7 +67,6 @@ func BenchmarkRingReadBufferMPMC(b *testing.B) {
b.ResetTimer()
b.ReportAllocs()
-
for i := 0; i < 100; i++ {
go func() {
for i := 0; i < b.N; i++ {
@@ -82,8 +80,8 @@ func BenchmarkRingReadBufferMPMC(b *testing.B) {
for i := 0; i < b.N; i++ {
_ = len(strconv.Itoa(i))
var p []byte
- p,_ = q.Read(1*time.Second)
- fmt.Sprintf("%v",p)
+ p, _ = q.Read(1 * time.Second)
+ fmt.Sprintf("%v", p)
}
wg.Done()
@@ -93,8 +91,6 @@ func BenchmarkRingReadBufferMPMC(b *testing.B) {
wg.Wait()
}
-
-
func BenchmarkRingBySizeBufferMPMC(b *testing.B) {
q := NewRingNodesBuffer(uint64(b.N * 100))
var wg sync.WaitGroup
@@ -102,7 +98,6 @@ func BenchmarkRingBySizeBufferMPMC(b *testing.B) {
b.ResetTimer()
b.ReportAllocs()
-
for i := 0; i < 100; i++ {
go func() {
for i := 0; i < b.N; i++ {
@@ -114,13 +109,13 @@ func BenchmarkRingBySizeBufferMPMC(b *testing.B) {
for i := 0; i < 100; i++ {
go func() {
for i := 0; i < b.N; i++ {
- p := make([]byte,len(strconv.Itoa(i)))
- q.ReadBySize(p,1*time.Second)
- fmt.Sprintf("%v",p)
+ p := make([]byte, len(strconv.Itoa(i)))
+ q.ReadBySize(p, 1*time.Second)
+ fmt.Sprintf("%v", p)
}
wg.Done()
}()
}
wg.Wait()
-}
\ No newline at end of file
+}