You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by di...@apache.org on 2022/03/15 07:28:47 UTC
[rocketmq-client-go] branch master updated: [ISSUE #787]Refactor the client instance struct,converge the namesrv module (#788)
This is an automated email from the ASF dual-hosted git repository.
dinglei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-go.git
The following commit(s) were added to refs/heads/master by this push:
new 6c97b55 [ISSUE #787]Refactor the client instance struct,converge the namesrv module (#788)
6c97b55 is described below
commit 6c97b55c8ca21a0dc5e1f25a8ce949cb99c213f3
Author: guyinyou <36...@users.noreply.github.com>
AuthorDate: Tue Mar 15 15:28:39 2022 +0800
[ISSUE #787]Refactor the client instance struct,converge the namesrv module (#788)
---
admin/admin.go | 30 ++++++++++++---------
consumer/consumer.go | 36 ++++++++++++--------------
consumer/consumer_test.go | 5 ++--
consumer/interceptor.go | 8 +++++-
consumer/pull_consumer.go | 10 ++++---
consumer/push_consumer.go | 18 +++++--------
internal/client.go | 66 ++++++++++++++++++++++++++++++++++-------------
internal/mock_client.go | 15 +++++++++--
internal/mock_namesrv.go | 4 +++
internal/namesrv.go | 2 ++
internal/route.go | 2 +-
internal/trace.go | 4 +++
producer/interceptor.go | 8 +++++-
producer/option.go | 2 +-
producer/producer.go | 30 ++++++++++++---------
producer/producer_test.go | 24 ++++++++++++-----
rlog/log.go | 1 +
17 files changed, 174 insertions(+), 91 deletions(-)
diff --git a/admin/admin.go b/admin/admin.go
index 1957a06..06908f4 100644
--- a/admin/admin.go
+++ b/admin/admin.go
@@ -19,6 +19,7 @@ package admin
import (
"context"
+ "fmt"
"sync"
"time"
@@ -61,8 +62,7 @@ func WithResolver(resolver primitive.NsResolver) AdminOption {
}
type admin struct {
- cli internal.RMQClient
- namesrv internal.Namesrvs
+ cli internal.RMQClient
opts *adminOptions
@@ -75,17 +75,21 @@ func NewAdmin(opts ...AdminOption) (Admin, error) {
for _, opt := range opts {
opt(defaultOpts)
}
-
- cli := internal.GetOrNewRocketMQClient(defaultOpts.ClientOptions, nil)
namesrv, err := internal.NewNamesrv(defaultOpts.Resolver)
+ defaultOpts.Namesrv = namesrv
if err != nil {
return nil, err
}
+
+ cli := internal.GetOrNewRocketMQClient(defaultOpts.ClientOptions, nil)
+ if cli == nil {
+ return nil, fmt.Errorf("GetOrNewRocketMQClient faild")
+ }
+ defaultOpts.Namesrv = cli.GetNameSrv()
//log.Printf("Client: %#v", namesrv.srvs)
return &admin{
- cli: cli,
- namesrv: namesrv,
- opts: defaultOpts,
+ cli: cli,
+ opts: defaultOpts,
}, nil
}
@@ -153,8 +157,8 @@ func (a *admin) DeleteTopic(ctx context.Context, opts ...OptionDelete) error {
}
//delete topic in broker
if cfg.BrokerAddr == "" {
- a.namesrv.UpdateTopicRouteInfo(cfg.Topic)
- cfg.BrokerAddr = a.namesrv.FindBrokerAddrByTopic(cfg.Topic)
+ a.cli.GetNameSrv().UpdateTopicRouteInfo(cfg.Topic)
+ cfg.BrokerAddr = a.cli.GetNameSrv().FindBrokerAddrByTopic(cfg.Topic)
}
if _, err := a.deleteTopicInBroker(ctx, cfg.Topic, cfg.BrokerAddr); err != nil {
@@ -168,14 +172,16 @@ func (a *admin) DeleteTopic(ctx context.Context, opts ...OptionDelete) error {
//delete topic in nameserver
if len(cfg.NameSrvAddr) == 0 {
- _, _, err := a.namesrv.UpdateTopicRouteInfo(cfg.Topic)
+ a.cli.GetNameSrv().UpdateTopicRouteInfo(cfg.Topic)
+ cfg.NameSrvAddr = a.cli.GetNameSrv().AddrList()
+ _, _, err := a.cli.GetNameSrv().UpdateTopicRouteInfo(cfg.Topic)
if err != nil {
rlog.Error("delete topic in nameserver error", map[string]interface{}{
- rlog.LogKeyTopic: cfg.Topic,
+ rlog.LogKeyTopic: cfg.Topic,
rlog.LogKeyUnderlayError: err,
})
}
- cfg.NameSrvAddr = a.namesrv.AddrList()
+ cfg.NameSrvAddr = a.cli.GetNameSrv().AddrList()
}
for _, nameSrvAddr := range cfg.NameSrvAddr {
diff --git a/consumer/consumer.go b/consumer/consumer.go
index ad77bc1..8056b22 100644
--- a/consumer/consumer.go
+++ b/consumer/consumer.go
@@ -263,8 +263,6 @@ type defaultConsumer struct {
// chan for push consumer
prCh chan PullRequest
- namesrv internal.Namesrvs
-
pullFromWhichNodeTable sync.Map
stat *StatsManager
@@ -280,7 +278,7 @@ func (dc *defaultConsumer) start() error {
if dc.model == Clustering {
dc.option.ChangeInstanceNameToPID()
- dc.storage = NewRemoteOffsetStore(dc.consumerGroup, dc.client, dc.namesrv)
+ dc.storage = NewRemoteOffsetStore(dc.consumerGroup, dc.client, dc.client.GetNameSrv())
} else {
dc.storage = NewLocalFileOffsetStore(dc.consumerGroup, dc.client.ClientID())
}
@@ -448,7 +446,7 @@ type lockBatchRequestBody struct {
}
func (dc *defaultConsumer) lock(mq *primitive.MessageQueue) bool {
- brokerResult := dc.namesrv.FindBrokerAddressInSubscribe(mq.BrokerName, internal.MasterId, true)
+ brokerResult := dc.client.GetNameSrv().FindBrokerAddressInSubscribe(mq.BrokerName, internal.MasterId, true)
if brokerResult == nil {
return false
@@ -488,7 +486,7 @@ func (dc *defaultConsumer) lock(mq *primitive.MessageQueue) bool {
}
func (dc *defaultConsumer) unlock(mq *primitive.MessageQueue, oneway bool) {
- brokerResult := dc.namesrv.FindBrokerAddressInSubscribe(mq.BrokerName, internal.MasterId, true)
+ brokerResult := dc.client.GetNameSrv().FindBrokerAddressInSubscribe(mq.BrokerName, internal.MasterId, true)
if brokerResult == nil {
return
@@ -513,7 +511,7 @@ func (dc *defaultConsumer) lockAll() {
if len(mqs) == 0 {
continue
}
- brokerResult := dc.namesrv.FindBrokerAddressInSubscribe(broker, internal.MasterId, true)
+ brokerResult := dc.client.GetNameSrv().FindBrokerAddressInSubscribe(broker, internal.MasterId, true)
if brokerResult == nil {
continue
}
@@ -559,7 +557,7 @@ func (dc *defaultConsumer) unlockAll(oneway bool) {
if len(mqs) == 0 {
continue
}
- brokerResult := dc.namesrv.FindBrokerAddressInSubscribe(broker, internal.MasterId, true)
+ brokerResult := dc.client.GetNameSrv().FindBrokerAddressInSubscribe(broker, internal.MasterId, true)
if brokerResult == nil {
continue
}
@@ -892,10 +890,10 @@ func (dc *defaultConsumer) processPullResult(mq *primitive.MessageQueue, result
}
func (dc *defaultConsumer) findConsumerList(topic string) []string {
- brokerAddr := dc.namesrv.FindBrokerAddrByTopic(topic)
+ brokerAddr := dc.client.GetNameSrv().FindBrokerAddrByTopic(topic)
if brokerAddr == "" {
- dc.namesrv.UpdateTopicRouteInfo(topic)
- brokerAddr = dc.namesrv.FindBrokerAddrByTopic(topic)
+ dc.client.GetNameSrv().UpdateTopicRouteInfo(topic)
+ brokerAddr = dc.client.GetNameSrv().FindBrokerAddrByTopic(topic)
}
if brokerAddr != "" {
@@ -929,10 +927,10 @@ func (dc *defaultConsumer) sendBack(msg *primitive.MessageExt, level int) error
// QueryMaxOffset with specific queueId and topic
func (dc *defaultConsumer) queryMaxOffset(mq *primitive.MessageQueue) (int64, error) {
- brokerAddr := dc.namesrv.FindBrokerAddrByName(mq.BrokerName)
+ brokerAddr := dc.client.GetNameSrv().FindBrokerAddrByName(mq.BrokerName)
if brokerAddr == "" {
- dc.namesrv.UpdateTopicRouteInfo(mq.Topic)
- brokerAddr = dc.namesrv.FindBrokerAddrByName(mq.BrokerName)
+ dc.client.GetNameSrv().UpdateTopicRouteInfo(mq.Topic)
+ brokerAddr = dc.client.GetNameSrv().FindBrokerAddrByName(mq.BrokerName)
}
if brokerAddr == "" {
return -1, fmt.Errorf("the broker [%s] does not exist", mq.BrokerName)
@@ -958,10 +956,10 @@ func (dc *defaultConsumer) queryOffset(mq *primitive.MessageQueue) int64 {
// SearchOffsetByTimestamp with specific queueId and topic
func (dc *defaultConsumer) searchOffsetByTimestamp(mq *primitive.MessageQueue, timestamp int64) (int64, error) {
- brokerAddr := dc.namesrv.FindBrokerAddrByName(mq.BrokerName)
+ brokerAddr := dc.client.GetNameSrv().FindBrokerAddrByName(mq.BrokerName)
if brokerAddr == "" {
- dc.namesrv.UpdateTopicRouteInfo(mq.Topic)
- brokerAddr = dc.namesrv.FindBrokerAddrByName(mq.BrokerName)
+ dc.client.GetNameSrv().UpdateTopicRouteInfo(mq.Topic)
+ brokerAddr = dc.client.GetNameSrv().FindBrokerAddrByName(mq.BrokerName)
}
if brokerAddr == "" {
return -1, fmt.Errorf("the broker [%s] does not exist", mq.BrokerName)
@@ -1044,12 +1042,12 @@ func clearCommitOffsetFlag(sysFlag int32) int32 {
}
func (dc *defaultConsumer) tryFindBroker(mq *primitive.MessageQueue) *internal.FindBrokerResult {
- result := dc.namesrv.FindBrokerAddressInSubscribe(mq.BrokerName, dc.recalculatePullFromWhichNode(mq), false)
+ result := dc.client.GetNameSrv().FindBrokerAddressInSubscribe(mq.BrokerName, dc.recalculatePullFromWhichNode(mq), false)
if result != nil {
return result
}
- dc.namesrv.UpdateTopicRouteInfo(mq.Topic)
- return dc.namesrv.FindBrokerAddressInSubscribe(mq.BrokerName, dc.recalculatePullFromWhichNode(mq), false)
+ dc.client.GetNameSrv().UpdateTopicRouteInfo(mq.Topic)
+ return dc.client.GetNameSrv().FindBrokerAddressInSubscribe(mq.BrokerName, dc.recalculatePullFromWhichNode(mq), false)
}
func (dc *defaultConsumer) updatePullFromWhichNode(mq *primitive.MessageQueue, brokerId int64) {
diff --git a/consumer/consumer_test.go b/consumer/consumer_test.go
index 8b99767..12ccd18 100644
--- a/consumer/consumer_test.go
+++ b/consumer/consumer_test.go
@@ -67,7 +67,6 @@ func TestDoRebalance(t *testing.T) {
defer ctrl.Finish()
namesrvCli := internal.NewMockNamesrvs(ctrl)
namesrvCli.EXPECT().FindBrokerAddrByTopic(gomock.Any()).Return(broker)
- dc.namesrv = namesrvCli
rmqCli := internal.NewMockRMQClient(ctrl)
rmqCli.EXPECT().InvokeSync(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).
@@ -75,6 +74,8 @@ func TestDoRebalance(t *testing.T) {
Body: []byte("{\"consumerIdList\": [\"a1\", \"a2\", \"a3\"] }"),
}, nil)
rmqCli.EXPECT().ClientID().Return(clientID)
+ rmqCli.SetNameSrv(namesrvCli)
+
dc.client = rmqCli
var wg sync.WaitGroup
@@ -109,10 +110,10 @@ func TestComputePullFromWhere(t *testing.T) {
}
namesrvCli := internal.NewMockNamesrvs(ctrl)
- dc.namesrv = namesrvCli
rmqCli := internal.NewMockRMQClient(ctrl)
dc.client = rmqCli
+ rmqCli.SetNameSrv(namesrvCli)
Convey("get effective offset", func() {
offsetStore.EXPECT().read(gomock.Any(), gomock.Any()).Return(int64(10))
diff --git a/consumer/interceptor.go b/consumer/interceptor.go
index aababfe..05ff94a 100644
--- a/consumer/interceptor.go
+++ b/consumer/interceptor.go
@@ -19,6 +19,7 @@ package consumer
import (
"context"
+ "fmt"
"time"
"github.com/apache/rocketmq-client-go/v2/internal"
@@ -39,9 +40,14 @@ func WithTrace(traceCfg *primitive.TraceConfig) Option {
func newTraceInterceptor(traceCfg *primitive.TraceConfig) primitive.Interceptor {
dispatcher := internal.NewTraceDispatcher(traceCfg)
- dispatcher.Start()
+ if dispatcher != nil {
+ dispatcher.Start()
+ }
return func(ctx context.Context, req, reply interface{}, next primitive.Invoker) error {
+ if dispatcher == nil {
+ return fmt.Errorf("GetOrNewRocketMQClient faild")
+ }
consumerCtx, exist := primitive.GetConsumerCtx(ctx)
if !exist || len(consumerCtx.Msgs) == 0 {
return next(ctx, req, reply)
diff --git a/consumer/pull_consumer.go b/consumer/pull_consumer.go
index 81a3ec5..874973b 100644
--- a/consumer/pull_consumer.go
+++ b/consumer/pull_consumer.go
@@ -90,10 +90,12 @@ func NewPullConsumer(options ...Option) (*defaultPullConsumer, error) {
prCh: make(chan PullRequest, 4),
model: defaultOpts.ConsumerModel,
option: defaultOpts,
-
- namesrv: srvs,
}
- dc.option.ClientOptions.Namesrv, err = internal.GetNamesrv(dc.client.ClientID())
+ if dc.client == nil {
+ return nil, fmt.Errorf("GetOrNewRocketMQClient faild")
+ }
+ defaultOpts.Namesrv = dc.client.GetNameSrv()
+
c := &defaultPullConsumer{
defaultConsumer: dc,
}
@@ -132,7 +134,7 @@ func (c *defaultPullConsumer) Pull(ctx context.Context, topic string, selector M
}
func (c *defaultPullConsumer) getNextQueueOf(topic string) *primitive.MessageQueue {
- queues, err := c.defaultConsumer.namesrv.FetchSubscribeMessageQueues(topic)
+ queues, err := c.defaultConsumer.client.GetNameSrv().FetchSubscribeMessageQueues(topic)
if err != nil && len(queues) > 0 {
rlog.Error("get next mq error", map[string]interface{}{
rlog.LogKeyTopic: topic,
diff --git a/consumer/push_consumer.go b/consumer/push_consumer.go
index 3e4377b..8642aa4 100644
--- a/consumer/push_consumer.go
+++ b/consumer/push_consumer.go
@@ -100,14 +100,13 @@ func NewPushConsumer(opts ...Option) (*pushConsumer, error) {
consumeOrderly: defaultOpts.ConsumeOrderly,
fromWhere: defaultOpts.FromWhere,
allocate: defaultOpts.Strategy,
- namesrv: srvs,
option: defaultOpts,
}
- dc.option.ClientOptions.Namesrv, err = internal.GetNamesrv(dc.client.ClientID())
- if err != nil {
- return nil, err
+ if dc.client == nil {
+ return nil, fmt.Errorf("GetOrNewRocketMQClient faild")
}
- dc.namesrv = dc.option.ClientOptions.Namesrv
+ defaultOpts.Namesrv = dc.client.GetNameSrv()
+
p := &pushConsumer{
defaultConsumer: dc,
subscribedTopic: make(map[string]string, 0),
@@ -124,11 +123,6 @@ func NewPushConsumer(opts ...Option) (*pushConsumer, error) {
p.interceptor = primitive.ChainInterceptors(p.option.Interceptors...)
- if p.model == Clustering {
- retryTopic := internal.GetRetryTopic(p.consumerGroup)
- sub := buildSubscriptionData(retryTopic, MessageSelector{TAG, _SubAll})
- p.subscriptionDataTable.Store(retryTopic, sub)
- }
return p, nil
}
@@ -386,7 +380,7 @@ func (pc *pushConsumer) GetConsumerRunningInfo() *internal.ConsumerRunningInfo {
})
nsAddr := ""
- for _, value := range pc.namesrv.AddrList() {
+ for _, value := range pc.client.GetNameSrv().AddrList() {
nsAddr += fmt.Sprintf("%s;", value)
}
info.Properties[internal.PropNameServerAddr] = nsAddr
@@ -795,7 +789,7 @@ func (pc *pushConsumer) correctTagsOffset(pr *PullRequest) {
func (pc *pushConsumer) sendMessageBack(brokerName string, msg *primitive.MessageExt, delayLevel int) bool {
var brokerAddr string
if len(brokerName) != 0 {
- brokerAddr = pc.defaultConsumer.namesrv.FindBrokerAddrByName(brokerName)
+ brokerAddr = pc.defaultConsumer.client.GetNameSrv().FindBrokerAddrByName(brokerName)
} else {
brokerAddr = msg.StoreHost
}
diff --git a/internal/client.go b/internal/client.go
index 4d7769e..c7f3e58 100644
--- a/internal/client.go
+++ b/internal/client.go
@@ -24,6 +24,7 @@ import (
errors2 "github.com/apache/rocketmq-client-go/v2/errors"
"net"
"os"
+ "sort"
"strconv"
"strings"
"sync"
@@ -104,7 +105,7 @@ func DefaultClientOptions() ClientOptions {
type ClientOptions struct {
GroupName string
NameServerAddrs primitive.NamesrvAddr
- Namesrv *namesrvs
+ Namesrv Namesrvs
ClientIP string
InstanceName string
UnitMode bool
@@ -136,7 +137,7 @@ type RMQClient interface {
ClientID() string
- RegisterProducer(group string, producer InnerProducer)
+ RegisterProducer(group string, producer InnerProducer) error
UnregisterProducer(group string)
InvokeSync(ctx context.Context, addr string, request *remote.RemotingCommand,
timeoutMillis time.Duration) (*remote.RemotingCommand, error)
@@ -155,6 +156,8 @@ type RMQClient interface {
PullMessage(ctx context.Context, brokerAddrs string, request *PullMessageRequestHeader) (*primitive.PullResult, error)
RebalanceImmediately()
UpdatePublishInfo(topic string, data *TopicRouteData, changed bool)
+
+ GetNameSrv() Namesrvs
}
var _ RMQClient = new(rmqClient)
@@ -172,25 +175,48 @@ type rmqClient struct {
hbMutex sync.Mutex
close bool
rbMutex sync.Mutex
- namesrvs *namesrvs
done chan struct{}
shutdownOnce sync.Once
}
+func (c *rmqClient) GetNameSrv() Namesrvs {
+ return c.option.Namesrv
+}
+
var clientMap sync.Map
func GetOrNewRocketMQClient(option ClientOptions, callbackCh chan interface{}) RMQClient {
client := &rmqClient{
option: option,
remoteClient: remote.NewRemotingClient(),
- namesrvs: option.Namesrv,
done: make(chan struct{}),
}
actual, loaded := clientMap.LoadOrStore(client.ClientID(), client)
- client.namesrvs = GetOrSetNamesrv(client.ClientID(), client.namesrvs)
- client.namesrvs.bundleClient = actual.(*rmqClient)
- client.option.Namesrv = client.namesrvs
- if !loaded {
+
+ if loaded {
+ // compare namesrv address
+ client = actual.(*rmqClient)
+ now := option.Namesrv.(*namesrvs).resolver.Resolve()
+ old := client.GetNameSrv().(*namesrvs).resolver.Resolve()
+ if len(now) != len(old) {
+ rlog.Error("different namesrv option in the same instance", map[string]interface{}{
+ "NewNameSrv": now,
+ "BeforeNameSrv": old,
+ })
+ return nil
+ }
+ sort.Strings(now)
+ sort.Strings(old)
+ for i := 0; i < len(now); i++ {
+ if now[i] != old[i] {
+ rlog.Error("different namesrv option in the same instance", map[string]interface{}{
+ "NewNameSrv": now,
+ "BeforeNameSrv": old,
+ })
+ return nil
+ }
+ }
+ } else {
client.remoteClient.RegisterRequestFunc(ReqNotifyConsumerIdsChanged, func(req *remote.RemotingCommand, addr net.Addr) *remote.RemotingCommand {
rlog.Info("receive broker's notification to consumer group", map[string]interface{}{
rlog.LogKeyConsumerGroup: req.ExtFields["consumerGroup"],
@@ -306,7 +332,7 @@ func GetOrNewRocketMQClient(option ClientOptions, callbackCh chan interface{}) R
return nil
})
}
- return actual.(*rmqClient)
+ return client
}
func (c *rmqClient) Start() {
@@ -318,7 +344,7 @@ func (c *rmqClient) Start() {
}
go primitive.WithRecover(func() {
op := func() {
- c.namesrvs.UpdateNameServerAddress()
+ c.GetNameSrv().UpdateNameServerAddress()
}
time.Sleep(10 * time.Second)
op()
@@ -364,7 +390,7 @@ func (c *rmqClient) Start() {
go primitive.WithRecover(func() {
op := func() {
- c.namesrvs.cleanOfflineBroker()
+ c.GetNameSrv().cleanOfflineBroker()
c.SendHeartbeatToAllBrokerWithLock()
}
@@ -529,7 +555,7 @@ func (c *rmqClient) SendHeartbeatToAllBrokerWithLock() {
rlog.Info("sending heartbeat, but no producer and no consumer", nil)
return
}
- c.namesrvs.brokerAddressesMap.Range(func(key, value interface{}) bool {
+ c.GetNameSrv().(*namesrvs).brokerAddressesMap.Range(func(key, value interface{}) bool {
brokerName := key.(string)
data := value.(*BrokerData)
for id, addr := range data.BrokerAddresses {
@@ -559,7 +585,7 @@ func (c *rmqClient) SendHeartbeatToAllBrokerWithLock() {
}
cancel()
if response.Code == ResSuccess {
- c.namesrvs.AddBrokerVersion(brokerName, addr, int32(response.Version))
+ c.GetNameSrv().(*namesrvs).AddBrokerVersion(brokerName, addr, int32(response.Version))
rlog.Debug("send heart beat to broker success", map[string]interface{}{
"brokerName": brokerName,
"brokerId": id,
@@ -589,7 +615,7 @@ func (c *rmqClient) UpdateTopicRouteInfo() {
return true
})
for topic := range publishTopicSet {
- data, changed, _ := c.namesrvs.UpdateTopicRouteInfo(topic)
+ data, changed, _ := c.GetNameSrv().UpdateTopicRouteInfo(topic)
c.UpdatePublishInfo(topic, data, changed)
}
@@ -604,7 +630,7 @@ func (c *rmqClient) UpdateTopicRouteInfo() {
})
for topic := range subscribedTopicSet {
- data, changed, _ := c.namesrvs.UpdateTopicRouteInfo(topic)
+ data, changed, _ := c.GetNameSrv().UpdateTopicRouteInfo(topic)
c.updateSubscribeInfo(topic, data, changed)
}
}
@@ -730,8 +756,12 @@ func (c *rmqClient) UnregisterConsumer(group string) {
c.consumerMap.Delete(group)
}
-func (c *rmqClient) RegisterProducer(group string, producer InnerProducer) {
- c.producerMap.Store(group, producer)
+func (c *rmqClient) RegisterProducer(group string, producer InnerProducer) error {
+ _, loaded := c.producerMap.LoadOrStore(group, producer)
+ if loaded {
+ return fmt.Errorf("the producer group \"%s\" has been created, specify another one", c.option.GroupName)
+ }
+ return nil
}
func (c *rmqClient) UnregisterProducer(group string) {
@@ -760,7 +790,7 @@ func (c *rmqClient) UpdatePublishInfo(topic string, data *TopicRouteData, change
updated = p.IsPublishTopicNeedUpdate(topic)
}
if updated {
- publishInfo := c.namesrvs.routeData2PublishInfo(topic, data)
+ publishInfo := c.GetNameSrv().(*namesrvs).routeData2PublishInfo(topic, data)
publishInfo.HaveTopicRouterInfo = true
p.UpdateTopicPublishInfo(topic, publishInfo)
}
diff --git a/internal/mock_client.go b/internal/mock_client.go
index ab34ac1..c975038 100644
--- a/internal/mock_client.go
+++ b/internal/mock_client.go
@@ -208,6 +208,15 @@ func (mr *MockInnerConsumerMockRecorder) GetConsumerRunningInfo() *gomock.Call {
type MockRMQClient struct {
ctrl *gomock.Controller
recorder *MockRMQClientMockRecorder
+ Namesrv *MockNamesrvs
+}
+
+func (m *MockRMQClient) GetNameSrv() Namesrvs {
+ return m.Namesrv
+}
+
+func (m *MockRMQClient) SetNameSrv(mockNamesrvs *MockNamesrvs) {
+ m.Namesrv = mockNamesrvs
}
// MockRMQClientMockRecorder is the mock recorder for MockRMQClient
@@ -260,8 +269,10 @@ func (mr *MockRMQClientMockRecorder) ClientID() *gomock.Call {
}
// RegisterProducer mocks base method
-func (m *MockRMQClient) RegisterProducer(group string, producer InnerProducer) {
- m.ctrl.Call(m, "RegisterProducer", group, producer)
+func (m *MockRMQClient) RegisterProducer(group string, producer InnerProducer) error {
+ ret := m.ctrl.Call(m, "RegisterProducer", group, producer)
+ ret0, _ := ret[0].(error)
+ return ret0
}
// RegisterProducer indicates an expected call of RegisterProducer
diff --git a/internal/mock_namesrv.go b/internal/mock_namesrv.go
index f87d174..7ce6f97 100644
--- a/internal/mock_namesrv.go
+++ b/internal/mock_namesrv.go
@@ -33,6 +33,10 @@ type MockNamesrvs struct {
recorder *MockNamesrvsMockRecorder
}
+func (m *MockNamesrvs) UpdateTopicRouteInfoWithDefault(topic string, defaultTopic string, defaultQueueNum int) (*TopicRouteData, bool, error) {
+ return m.UpdateTopicRouteInfo(topic)
+}
+
// MockNamesrvsMockRecorder is the mock recorder for MockNamesrvs
type MockNamesrvsMockRecorder struct {
mock *MockNamesrvs
diff --git a/internal/namesrv.go b/internal/namesrv.go
index 7776651..96e708a 100644
--- a/internal/namesrv.go
+++ b/internal/namesrv.go
@@ -50,6 +50,8 @@ type Namesrvs interface {
UpdateTopicRouteInfo(topic string) (routeData *TopicRouteData, changed bool, err error)
+ UpdateTopicRouteInfoWithDefault(topic string, defaultTopic string, defaultQueueNum int) (*TopicRouteData, bool, error)
+
FetchPublishMessageQueues(topic string) ([]*primitive.MessageQueue, error)
FindBrokerAddrByTopic(topic string) string
diff --git a/internal/route.go b/internal/route.go
index 9cfa398..54dbbea 100644
--- a/internal/route.go
+++ b/internal/route.go
@@ -165,7 +165,7 @@ func (s *namesrvs) UpdateTopicRouteInfoWithDefault(topic string, defaultTopic st
updated = p.IsPublishTopicNeedUpdate(topic)
}
if updated {
- publishInfo := s.bundleClient.namesrvs.routeData2PublishInfo(topic, routeData)
+ publishInfo := s.bundleClient.GetNameSrv().(*namesrvs).routeData2PublishInfo(topic, routeData)
publishInfo.HaveTopicRouterInfo = true
p.UpdateTopicPublishInfo(topic, publishInfo)
}
diff --git a/internal/trace.go b/internal/trace.go
index cef1634..753a4d1 100644
--- a/internal/trace.go
+++ b/internal/trace.go
@@ -276,6 +276,10 @@ func NewTraceDispatcher(traceCfg *primitive.TraceConfig) *traceDispatcher {
cliOp.Namesrv = srvs
cliOp.Credentials = traceCfg.Credentials
cli := GetOrNewRocketMQClient(cliOp, nil)
+ if cli == nil {
+ return nil
+ }
+ cliOp.Namesrv = cli.GetNameSrv()
return &traceDispatcher{
ctx: ctx,
cancel: cancel,
diff --git a/producer/interceptor.go b/producer/interceptor.go
index 160deac..71eb8e7 100644
--- a/producer/interceptor.go
+++ b/producer/interceptor.go
@@ -22,6 +22,7 @@ package producer
import (
"context"
+ "fmt"
"time"
"github.com/apache/rocketmq-client-go/v2/internal"
@@ -42,9 +43,14 @@ func WithTrace(traceCfg *primitive.TraceConfig) Option {
func newTraceInterceptor(traceCfg *primitive.TraceConfig) primitive.Interceptor {
dispatcher := internal.NewTraceDispatcher(traceCfg)
- dispatcher.Start()
+ if dispatcher != nil {
+ dispatcher.Start()
+ }
return func(ctx context.Context, req, reply interface{}, next primitive.Invoker) error {
+ if dispatcher == nil {
+ return fmt.Errorf("GetOrNewRocketMQClient faild")
+ }
beginT := time.Now()
err := next(ctx, req, reply)
diff --git a/producer/option.go b/producer/option.go
index 5839402..ae76511 100644
--- a/producer/option.go
+++ b/producer/option.go
@@ -35,7 +35,7 @@ func defaultProducerOptions() producerOptions {
CompressMsgBodyOverHowmuch: 4096,
CompressLevel: 5,
}
- opts.ClientOptions.GroupName = "DEFAULT_CONSUMER"
+ opts.ClientOptions.GroupName = "DEFAULT_PRODUCER"
return opts
}
diff --git a/producer/producer.go b/producer/producer.go
index 226eedb..3c875c6 100644
--- a/producer/producer.go
+++ b/producer/producer.go
@@ -67,19 +67,25 @@ func NewDefaultProducer(opts ...Option) (*defaultProducer, error) {
options: defaultOpts,
}
producer.client = internal.GetOrNewRocketMQClient(defaultOpts.ClientOptions, producer.callbackCh)
- producer.options.ClientOptions.Namesrv, err = internal.GetNamesrv(producer.client.ClientID())
- if err != nil {
- return nil, err
+ if producer.client == nil {
+ return nil, fmt.Errorf("GetOrNewRocketMQClient faild")
}
+ defaultOpts.Namesrv = producer.client.GetNameSrv()
+
producer.interceptor = primitive.ChainInterceptors(producer.options.Interceptors...)
return producer, nil
}
func (p *defaultProducer) Start() error {
+ if p == nil || p.client == nil {
+ return fmt.Errorf("client instance is nil, can not start producer")
+ }
atomic.StoreInt32(&p.state, int32(internal.StateRunning))
-
- p.client.RegisterProducer(p.group, p)
+ err := p.client.RegisterProducer(p.group, p)
+ if err != nil {
+ return err
+ }
p.client.Start()
return nil
}
@@ -195,7 +201,7 @@ func (p *defaultProducer) sendSync(ctx context.Context, msg *primitive.Message,
continue
}
- addr := p.options.Namesrv.FindBrokerAddrByName(mq.BrokerName)
+ addr := p.client.GetNameSrv().FindBrokerAddrByName(mq.BrokerName)
if addr == "" {
return fmt.Errorf("topic=%s route info not found", mq.Topic)
}
@@ -242,7 +248,7 @@ func (p *defaultProducer) sendAsync(ctx context.Context, msg *primitive.Message,
return errors.Errorf("the topic=%s route info not found", msg.Topic)
}
- addr := p.options.Namesrv.FindBrokerAddrByName(mq.BrokerName)
+ addr := p.client.GetNameSrv().FindBrokerAddrByName(mq.BrokerName)
if addr == "" {
return errors.Errorf("topic=%s route info not found", mq.Topic)
}
@@ -289,7 +295,7 @@ func (p *defaultProducer) sendOneWay(ctx context.Context, msg *primitive.Message
continue
}
- addr := p.options.Namesrv.FindBrokerAddrByName(mq.BrokerName)
+ addr := p.client.GetNameSrv().FindBrokerAddrByName(mq.BrokerName)
if addr == "" {
return fmt.Errorf("topic=%s route info not found", mq.Topic)
}
@@ -378,7 +384,7 @@ func (p *defaultProducer) selectMessageQueue(msg *primitive.Message) *primitive.
v, exist := p.publishInfo.Load(topic)
if !exist {
- data, changed, err := p.options.Namesrv.UpdateTopicRouteInfo(topic)
+ data, changed, err := p.client.GetNameSrv().UpdateTopicRouteInfo(topic)
if err != nil && primitive.IsRemotingErr(err) {
return nil
}
@@ -387,7 +393,7 @@ func (p *defaultProducer) selectMessageQueue(msg *primitive.Message) *primitive.
}
if !exist {
- data, changed, _ := p.options.Namesrv.UpdateTopicRouteInfoWithDefault(topic, p.options.CreateTopicKey, p.options.DefaultTopicQueueNums)
+ data, changed, _ := p.client.GetNameSrv().UpdateTopicRouteInfoWithDefault(topic, p.options.CreateTopicKey, p.options.DefaultTopicQueueNums)
p.client.UpdatePublishInfo(topic, data, changed)
v, exist = p.publishInfo.Load(topic)
}
@@ -558,8 +564,8 @@ func (tp *transactionProducer) endTransaction(result primitive.SendResult, err e
} else {
msgID, _ = primitive.UnmarshalMsgID([]byte(result.MsgID))
}
-
- brokerAddr := tp.producer.options.Namesrv.FindBrokerAddrByName(result.MessageQueue.BrokerName)
+ // 估计没有反序列化回来
+ brokerAddr := tp.producer.client.GetNameSrv().FindBrokerAddrByName(result.MessageQueue.BrokerName)
requestHeader := &internal.EndTransactionRequestHeader{
TransactionId: result.TransactionID,
CommitLogOffset: msgID.Offset,
diff --git a/producer/producer_test.go b/producer/producer_test.go
index a7c15c1..e1d72dd 100644
--- a/producer/producer_test.go
+++ b/producer/producer_test.go
@@ -47,7 +47,7 @@ func TestShutdown(t *testing.T) {
client := internal.NewMockRMQClient(ctrl)
p.client = client
- client.EXPECT().RegisterProducer(gomock.Any(), gomock.Any()).Return()
+ client.EXPECT().RegisterProducer(gomock.Any(), gomock.Any()).Return(nil)
client.EXPECT().Start().Return()
err := p.Start()
assert.Nil(t, err)
@@ -117,10 +117,13 @@ func TestSync(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()
+ namesrvCli := internal.NewMockNamesrvs(ctrl)
client := internal.NewMockRMQClient(ctrl)
p.client = client
+ client.SetNameSrv(namesrvCli)
+ namesrvCli.EXPECT().FindBrokerAddrByName(gomock.Any()).Return("a")
- client.EXPECT().RegisterProducer(gomock.Any(), gomock.Any()).Return()
+ client.EXPECT().RegisterProducer(gomock.Any(), gomock.Any()).Return(nil)
client.EXPECT().Start().Return()
err := p.Start()
assert.Nil(t, err)
@@ -168,10 +171,13 @@ func TestASync(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()
+ namesrvCli := internal.NewMockNamesrvs(ctrl)
client := internal.NewMockRMQClient(ctrl)
p.client = client
+ client.SetNameSrv(namesrvCli)
+ namesrvCli.EXPECT().FindBrokerAddrByName(gomock.Any()).Return("a")
- client.EXPECT().RegisterProducer(gomock.Any(), gomock.Any()).Return()
+ client.EXPECT().RegisterProducer(gomock.Any(), gomock.Any()).Return(nil)
client.EXPECT().Start().Return()
err := p.Start()
assert.Nil(t, err)
@@ -230,10 +236,13 @@ func TestOneway(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()
+ namesrvCli := internal.NewMockNamesrvs(ctrl)
client := internal.NewMockRMQClient(ctrl)
p.client = client
+ client.SetNameSrv(namesrvCli)
+ namesrvCli.EXPECT().FindBrokerAddrByName(gomock.Any()).Return("a")
- client.EXPECT().RegisterProducer(gomock.Any(), gomock.Any()).Return()
+ client.EXPECT().RegisterProducer(gomock.Any(), gomock.Any()).Return(nil)
client.EXPECT().Start().Return()
err := p.Start()
assert.Nil(t, err)
@@ -268,10 +277,13 @@ func TestSyncWithNamespace(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()
+ namesrvCli := internal.NewMockNamesrvs(ctrl)
client := internal.NewMockRMQClient(ctrl)
p.client = client
+ client.SetNameSrv(namesrvCli)
+ namesrvCli.EXPECT().FindBrokerAddrByName(gomock.Any()).Return("a")
- client.EXPECT().RegisterProducer(gomock.Any(), gomock.Any()).Return()
+ client.EXPECT().RegisterProducer(gomock.Any(), gomock.Any()).Return(nil)
client.EXPECT().Start().Return()
err := p.Start()
assert.Nil(t, err)
@@ -323,7 +335,7 @@ func TestBatchSendDifferentTopics(t *testing.T) {
client := internal.NewMockRMQClient(ctrl)
p.client = client
- client.EXPECT().RegisterProducer(gomock.Any(), gomock.Any()).Return()
+ client.EXPECT().RegisterProducer(gomock.Any(), gomock.Any()).Return(nil)
client.EXPECT().Start().Return()
err := p.Start()
assert.Nil(t, err)
diff --git a/rlog/log.go b/rlog/log.go
index 037cfcf..a179a40 100644
--- a/rlog/log.go
+++ b/rlog/log.go
@@ -25,6 +25,7 @@ import (
)
const (
+ LogKeyProducerGroup = "producerGroup"
LogKeyConsumerGroup = "consumerGroup"
LogKeyTopic = "topic"
LogKeyMessageQueue = "MessageQueue"