You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by di...@apache.org on 2022/03/15 07:28:47 UTC

[rocketmq-client-go] branch master updated: [ISSUE #787]Refactor the client instance struct,converge the namesrv module (#788)

This is an automated email from the ASF dual-hosted git repository.

dinglei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new 6c97b55  [ISSUE #787]Refactor the client instance struct,converge the namesrv module (#788)
6c97b55 is described below

commit 6c97b55c8ca21a0dc5e1f25a8ce949cb99c213f3
Author: guyinyou <36...@users.noreply.github.com>
AuthorDate: Tue Mar 15 15:28:39 2022 +0800

    [ISSUE #787]Refactor the client instance struct,converge the namesrv module (#788)
---
 admin/admin.go            | 30 ++++++++++++---------
 consumer/consumer.go      | 36 ++++++++++++--------------
 consumer/consumer_test.go |  5 ++--
 consumer/interceptor.go   |  8 +++++-
 consumer/pull_consumer.go | 10 ++++---
 consumer/push_consumer.go | 18 +++++--------
 internal/client.go        | 66 ++++++++++++++++++++++++++++++++++-------------
 internal/mock_client.go   | 15 +++++++++--
 internal/mock_namesrv.go  |  4 +++
 internal/namesrv.go       |  2 ++
 internal/route.go         |  2 +-
 internal/trace.go         |  4 +++
 producer/interceptor.go   |  8 +++++-
 producer/option.go        |  2 +-
 producer/producer.go      | 30 ++++++++++++---------
 producer/producer_test.go | 24 ++++++++++++-----
 rlog/log.go               |  1 +
 17 files changed, 174 insertions(+), 91 deletions(-)

diff --git a/admin/admin.go b/admin/admin.go
index 1957a06..06908f4 100644
--- a/admin/admin.go
+++ b/admin/admin.go
@@ -19,6 +19,7 @@ package admin
 
 import (
 	"context"
+	"fmt"
 	"sync"
 	"time"
 
@@ -61,8 +62,7 @@ func WithResolver(resolver primitive.NsResolver) AdminOption {
 }
 
 type admin struct {
-	cli     internal.RMQClient
-	namesrv internal.Namesrvs
+	cli internal.RMQClient
 
 	opts *adminOptions
 
@@ -75,17 +75,21 @@ func NewAdmin(opts ...AdminOption) (Admin, error) {
 	for _, opt := range opts {
 		opt(defaultOpts)
 	}
-
-	cli := internal.GetOrNewRocketMQClient(defaultOpts.ClientOptions, nil)
 	namesrv, err := internal.NewNamesrv(defaultOpts.Resolver)
+	defaultOpts.Namesrv = namesrv
 	if err != nil {
 		return nil, err
 	}
+
+	cli := internal.GetOrNewRocketMQClient(defaultOpts.ClientOptions, nil)
+	if cli == nil {
+		return nil, fmt.Errorf("GetOrNewRocketMQClient faild")
+	}
+	defaultOpts.Namesrv = cli.GetNameSrv()
 	//log.Printf("Client: %#v", namesrv.srvs)
 	return &admin{
-		cli:     cli,
-		namesrv: namesrv,
-		opts:    defaultOpts,
+		cli:  cli,
+		opts: defaultOpts,
 	}, nil
 }
 
@@ -153,8 +157,8 @@ func (a *admin) DeleteTopic(ctx context.Context, opts ...OptionDelete) error {
 	}
 	//delete topic in broker
 	if cfg.BrokerAddr == "" {
-		a.namesrv.UpdateTopicRouteInfo(cfg.Topic)
-		cfg.BrokerAddr = a.namesrv.FindBrokerAddrByTopic(cfg.Topic)
+		a.cli.GetNameSrv().UpdateTopicRouteInfo(cfg.Topic)
+		cfg.BrokerAddr = a.cli.GetNameSrv().FindBrokerAddrByTopic(cfg.Topic)
 	}
 
 	if _, err := a.deleteTopicInBroker(ctx, cfg.Topic, cfg.BrokerAddr); err != nil {
@@ -168,14 +172,16 @@ func (a *admin) DeleteTopic(ctx context.Context, opts ...OptionDelete) error {
 
 	//delete topic in nameserver
 	if len(cfg.NameSrvAddr) == 0 {
-		_, _, err := a.namesrv.UpdateTopicRouteInfo(cfg.Topic)
+		a.cli.GetNameSrv().UpdateTopicRouteInfo(cfg.Topic)
+		cfg.NameSrvAddr = a.cli.GetNameSrv().AddrList()
+		_, _, err := a.cli.GetNameSrv().UpdateTopicRouteInfo(cfg.Topic)
 		if err != nil {
 			rlog.Error("delete topic in nameserver error", map[string]interface{}{
-				rlog.LogKeyTopic: cfg.Topic,
+				rlog.LogKeyTopic:         cfg.Topic,
 				rlog.LogKeyUnderlayError: err,
 			})
 		}
-		cfg.NameSrvAddr = a.namesrv.AddrList()
+		cfg.NameSrvAddr = a.cli.GetNameSrv().AddrList()
 	}
 
 	for _, nameSrvAddr := range cfg.NameSrvAddr {
diff --git a/consumer/consumer.go b/consumer/consumer.go
index ad77bc1..8056b22 100644
--- a/consumer/consumer.go
+++ b/consumer/consumer.go
@@ -263,8 +263,6 @@ type defaultConsumer struct {
 	// chan for push consumer
 	prCh chan PullRequest
 
-	namesrv internal.Namesrvs
-
 	pullFromWhichNodeTable sync.Map
 
 	stat *StatsManager
@@ -280,7 +278,7 @@ func (dc *defaultConsumer) start() error {
 
 	if dc.model == Clustering {
 		dc.option.ChangeInstanceNameToPID()
-		dc.storage = NewRemoteOffsetStore(dc.consumerGroup, dc.client, dc.namesrv)
+		dc.storage = NewRemoteOffsetStore(dc.consumerGroup, dc.client, dc.client.GetNameSrv())
 	} else {
 		dc.storage = NewLocalFileOffsetStore(dc.consumerGroup, dc.client.ClientID())
 	}
@@ -448,7 +446,7 @@ type lockBatchRequestBody struct {
 }
 
 func (dc *defaultConsumer) lock(mq *primitive.MessageQueue) bool {
-	brokerResult := dc.namesrv.FindBrokerAddressInSubscribe(mq.BrokerName, internal.MasterId, true)
+	brokerResult := dc.client.GetNameSrv().FindBrokerAddressInSubscribe(mq.BrokerName, internal.MasterId, true)
 
 	if brokerResult == nil {
 		return false
@@ -488,7 +486,7 @@ func (dc *defaultConsumer) lock(mq *primitive.MessageQueue) bool {
 }
 
 func (dc *defaultConsumer) unlock(mq *primitive.MessageQueue, oneway bool) {
-	brokerResult := dc.namesrv.FindBrokerAddressInSubscribe(mq.BrokerName, internal.MasterId, true)
+	brokerResult := dc.client.GetNameSrv().FindBrokerAddressInSubscribe(mq.BrokerName, internal.MasterId, true)
 
 	if brokerResult == nil {
 		return
@@ -513,7 +511,7 @@ func (dc *defaultConsumer) lockAll() {
 		if len(mqs) == 0 {
 			continue
 		}
-		brokerResult := dc.namesrv.FindBrokerAddressInSubscribe(broker, internal.MasterId, true)
+		brokerResult := dc.client.GetNameSrv().FindBrokerAddressInSubscribe(broker, internal.MasterId, true)
 		if brokerResult == nil {
 			continue
 		}
@@ -559,7 +557,7 @@ func (dc *defaultConsumer) unlockAll(oneway bool) {
 		if len(mqs) == 0 {
 			continue
 		}
-		brokerResult := dc.namesrv.FindBrokerAddressInSubscribe(broker, internal.MasterId, true)
+		brokerResult := dc.client.GetNameSrv().FindBrokerAddressInSubscribe(broker, internal.MasterId, true)
 		if brokerResult == nil {
 			continue
 		}
@@ -892,10 +890,10 @@ func (dc *defaultConsumer) processPullResult(mq *primitive.MessageQueue, result
 }
 
 func (dc *defaultConsumer) findConsumerList(topic string) []string {
-	brokerAddr := dc.namesrv.FindBrokerAddrByTopic(topic)
+	brokerAddr := dc.client.GetNameSrv().FindBrokerAddrByTopic(topic)
 	if brokerAddr == "" {
-		dc.namesrv.UpdateTopicRouteInfo(topic)
-		brokerAddr = dc.namesrv.FindBrokerAddrByTopic(topic)
+		dc.client.GetNameSrv().UpdateTopicRouteInfo(topic)
+		brokerAddr = dc.client.GetNameSrv().FindBrokerAddrByTopic(topic)
 	}
 
 	if brokerAddr != "" {
@@ -929,10 +927,10 @@ func (dc *defaultConsumer) sendBack(msg *primitive.MessageExt, level int) error
 
 // QueryMaxOffset with specific queueId and topic
 func (dc *defaultConsumer) queryMaxOffset(mq *primitive.MessageQueue) (int64, error) {
-	brokerAddr := dc.namesrv.FindBrokerAddrByName(mq.BrokerName)
+	brokerAddr := dc.client.GetNameSrv().FindBrokerAddrByName(mq.BrokerName)
 	if brokerAddr == "" {
-		dc.namesrv.UpdateTopicRouteInfo(mq.Topic)
-		brokerAddr = dc.namesrv.FindBrokerAddrByName(mq.BrokerName)
+		dc.client.GetNameSrv().UpdateTopicRouteInfo(mq.Topic)
+		brokerAddr = dc.client.GetNameSrv().FindBrokerAddrByName(mq.BrokerName)
 	}
 	if brokerAddr == "" {
 		return -1, fmt.Errorf("the broker [%s] does not exist", mq.BrokerName)
@@ -958,10 +956,10 @@ func (dc *defaultConsumer) queryOffset(mq *primitive.MessageQueue) int64 {
 
 // SearchOffsetByTimestamp with specific queueId and topic
 func (dc *defaultConsumer) searchOffsetByTimestamp(mq *primitive.MessageQueue, timestamp int64) (int64, error) {
-	brokerAddr := dc.namesrv.FindBrokerAddrByName(mq.BrokerName)
+	brokerAddr := dc.client.GetNameSrv().FindBrokerAddrByName(mq.BrokerName)
 	if brokerAddr == "" {
-		dc.namesrv.UpdateTopicRouteInfo(mq.Topic)
-		brokerAddr = dc.namesrv.FindBrokerAddrByName(mq.BrokerName)
+		dc.client.GetNameSrv().UpdateTopicRouteInfo(mq.Topic)
+		brokerAddr = dc.client.GetNameSrv().FindBrokerAddrByName(mq.BrokerName)
 	}
 	if brokerAddr == "" {
 		return -1, fmt.Errorf("the broker [%s] does not exist", mq.BrokerName)
@@ -1044,12 +1042,12 @@ func clearCommitOffsetFlag(sysFlag int32) int32 {
 }
 
 func (dc *defaultConsumer) tryFindBroker(mq *primitive.MessageQueue) *internal.FindBrokerResult {
-	result := dc.namesrv.FindBrokerAddressInSubscribe(mq.BrokerName, dc.recalculatePullFromWhichNode(mq), false)
+	result := dc.client.GetNameSrv().FindBrokerAddressInSubscribe(mq.BrokerName, dc.recalculatePullFromWhichNode(mq), false)
 	if result != nil {
 		return result
 	}
-	dc.namesrv.UpdateTopicRouteInfo(mq.Topic)
-	return dc.namesrv.FindBrokerAddressInSubscribe(mq.BrokerName, dc.recalculatePullFromWhichNode(mq), false)
+	dc.client.GetNameSrv().UpdateTopicRouteInfo(mq.Topic)
+	return dc.client.GetNameSrv().FindBrokerAddressInSubscribe(mq.BrokerName, dc.recalculatePullFromWhichNode(mq), false)
 }
 
 func (dc *defaultConsumer) updatePullFromWhichNode(mq *primitive.MessageQueue, brokerId int64) {
diff --git a/consumer/consumer_test.go b/consumer/consumer_test.go
index 8b99767..12ccd18 100644
--- a/consumer/consumer_test.go
+++ b/consumer/consumer_test.go
@@ -67,7 +67,6 @@ func TestDoRebalance(t *testing.T) {
 		defer ctrl.Finish()
 		namesrvCli := internal.NewMockNamesrvs(ctrl)
 		namesrvCli.EXPECT().FindBrokerAddrByTopic(gomock.Any()).Return(broker)
-		dc.namesrv = namesrvCli
 
 		rmqCli := internal.NewMockRMQClient(ctrl)
 		rmqCli.EXPECT().InvokeSync(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).
@@ -75,6 +74,8 @@ func TestDoRebalance(t *testing.T) {
 				Body: []byte("{\"consumerIdList\": [\"a1\", \"a2\", \"a3\"] }"),
 			}, nil)
 		rmqCli.EXPECT().ClientID().Return(clientID)
+		rmqCli.SetNameSrv(namesrvCli)
+
 		dc.client = rmqCli
 
 		var wg sync.WaitGroup
@@ -109,10 +110,10 @@ func TestComputePullFromWhere(t *testing.T) {
 		}
 
 		namesrvCli := internal.NewMockNamesrvs(ctrl)
-		dc.namesrv = namesrvCli
 
 		rmqCli := internal.NewMockRMQClient(ctrl)
 		dc.client = rmqCli
+		rmqCli.SetNameSrv(namesrvCli)
 
 		Convey("get effective offset", func() {
 			offsetStore.EXPECT().read(gomock.Any(), gomock.Any()).Return(int64(10))
diff --git a/consumer/interceptor.go b/consumer/interceptor.go
index aababfe..05ff94a 100644
--- a/consumer/interceptor.go
+++ b/consumer/interceptor.go
@@ -19,6 +19,7 @@ package consumer
 
 import (
 	"context"
+	"fmt"
 	"time"
 
 	"github.com/apache/rocketmq-client-go/v2/internal"
@@ -39,9 +40,14 @@ func WithTrace(traceCfg *primitive.TraceConfig) Option {
 
 func newTraceInterceptor(traceCfg *primitive.TraceConfig) primitive.Interceptor {
 	dispatcher := internal.NewTraceDispatcher(traceCfg)
-	dispatcher.Start()
+	if dispatcher != nil {
+		dispatcher.Start()
+	}
 
 	return func(ctx context.Context, req, reply interface{}, next primitive.Invoker) error {
+		if dispatcher == nil {
+			return fmt.Errorf("GetOrNewRocketMQClient faild")
+		}
 		consumerCtx, exist := primitive.GetConsumerCtx(ctx)
 		if !exist || len(consumerCtx.Msgs) == 0 {
 			return next(ctx, req, reply)
diff --git a/consumer/pull_consumer.go b/consumer/pull_consumer.go
index 81a3ec5..874973b 100644
--- a/consumer/pull_consumer.go
+++ b/consumer/pull_consumer.go
@@ -90,10 +90,12 @@ func NewPullConsumer(options ...Option) (*defaultPullConsumer, error) {
 		prCh:          make(chan PullRequest, 4),
 		model:         defaultOpts.ConsumerModel,
 		option:        defaultOpts,
-
-		namesrv: srvs,
 	}
-	dc.option.ClientOptions.Namesrv, err = internal.GetNamesrv(dc.client.ClientID())
+	if dc.client == nil {
+		return nil, fmt.Errorf("GetOrNewRocketMQClient faild")
+	}
+	defaultOpts.Namesrv = dc.client.GetNameSrv()
+
 	c := &defaultPullConsumer{
 		defaultConsumer: dc,
 	}
@@ -132,7 +134,7 @@ func (c *defaultPullConsumer) Pull(ctx context.Context, topic string, selector M
 }
 
 func (c *defaultPullConsumer) getNextQueueOf(topic string) *primitive.MessageQueue {
-	queues, err := c.defaultConsumer.namesrv.FetchSubscribeMessageQueues(topic)
+	queues, err := c.defaultConsumer.client.GetNameSrv().FetchSubscribeMessageQueues(topic)
 	if err != nil && len(queues) > 0 {
 		rlog.Error("get next mq error", map[string]interface{}{
 			rlog.LogKeyTopic:         topic,
diff --git a/consumer/push_consumer.go b/consumer/push_consumer.go
index 3e4377b..8642aa4 100644
--- a/consumer/push_consumer.go
+++ b/consumer/push_consumer.go
@@ -100,14 +100,13 @@ func NewPushConsumer(opts ...Option) (*pushConsumer, error) {
 		consumeOrderly: defaultOpts.ConsumeOrderly,
 		fromWhere:      defaultOpts.FromWhere,
 		allocate:       defaultOpts.Strategy,
-		namesrv:        srvs,
 		option:         defaultOpts,
 	}
-	dc.option.ClientOptions.Namesrv, err = internal.GetNamesrv(dc.client.ClientID())
-	if err != nil {
-		return nil, err
+	if dc.client == nil {
+		return nil, fmt.Errorf("GetOrNewRocketMQClient faild")
 	}
-	dc.namesrv = dc.option.ClientOptions.Namesrv
+	defaultOpts.Namesrv = dc.client.GetNameSrv()
+
 	p := &pushConsumer{
 		defaultConsumer: dc,
 		subscribedTopic: make(map[string]string, 0),
@@ -124,11 +123,6 @@ func NewPushConsumer(opts ...Option) (*pushConsumer, error) {
 
 	p.interceptor = primitive.ChainInterceptors(p.option.Interceptors...)
 
-	if p.model == Clustering {
-		retryTopic := internal.GetRetryTopic(p.consumerGroup)
-		sub := buildSubscriptionData(retryTopic, MessageSelector{TAG, _SubAll})
-		p.subscriptionDataTable.Store(retryTopic, sub)
-	}
 	return p, nil
 }
 
@@ -386,7 +380,7 @@ func (pc *pushConsumer) GetConsumerRunningInfo() *internal.ConsumerRunningInfo {
 	})
 
 	nsAddr := ""
-	for _, value := range pc.namesrv.AddrList() {
+	for _, value := range pc.client.GetNameSrv().AddrList() {
 		nsAddr += fmt.Sprintf("%s;", value)
 	}
 	info.Properties[internal.PropNameServerAddr] = nsAddr
@@ -795,7 +789,7 @@ func (pc *pushConsumer) correctTagsOffset(pr *PullRequest) {
 func (pc *pushConsumer) sendMessageBack(brokerName string, msg *primitive.MessageExt, delayLevel int) bool {
 	var brokerAddr string
 	if len(brokerName) != 0 {
-		brokerAddr = pc.defaultConsumer.namesrv.FindBrokerAddrByName(brokerName)
+		brokerAddr = pc.defaultConsumer.client.GetNameSrv().FindBrokerAddrByName(brokerName)
 	} else {
 		brokerAddr = msg.StoreHost
 	}
diff --git a/internal/client.go b/internal/client.go
index 4d7769e..c7f3e58 100644
--- a/internal/client.go
+++ b/internal/client.go
@@ -24,6 +24,7 @@ import (
 	errors2 "github.com/apache/rocketmq-client-go/v2/errors"
 	"net"
 	"os"
+	"sort"
 	"strconv"
 	"strings"
 	"sync"
@@ -104,7 +105,7 @@ func DefaultClientOptions() ClientOptions {
 type ClientOptions struct {
 	GroupName         string
 	NameServerAddrs   primitive.NamesrvAddr
-	Namesrv           *namesrvs
+	Namesrv           Namesrvs
 	ClientIP          string
 	InstanceName      string
 	UnitMode          bool
@@ -136,7 +137,7 @@ type RMQClient interface {
 
 	ClientID() string
 
-	RegisterProducer(group string, producer InnerProducer)
+	RegisterProducer(group string, producer InnerProducer) error
 	UnregisterProducer(group string)
 	InvokeSync(ctx context.Context, addr string, request *remote.RemotingCommand,
 		timeoutMillis time.Duration) (*remote.RemotingCommand, error)
@@ -155,6 +156,8 @@ type RMQClient interface {
 	PullMessage(ctx context.Context, brokerAddrs string, request *PullMessageRequestHeader) (*primitive.PullResult, error)
 	RebalanceImmediately()
 	UpdatePublishInfo(topic string, data *TopicRouteData, changed bool)
+
+	GetNameSrv() Namesrvs
 }
 
 var _ RMQClient = new(rmqClient)
@@ -172,25 +175,48 @@ type rmqClient struct {
 	hbMutex      sync.Mutex
 	close        bool
 	rbMutex      sync.Mutex
-	namesrvs     *namesrvs
 	done         chan struct{}
 	shutdownOnce sync.Once
 }
 
+func (c *rmqClient) GetNameSrv() Namesrvs {
+	return c.option.Namesrv
+}
+
 var clientMap sync.Map
 
 func GetOrNewRocketMQClient(option ClientOptions, callbackCh chan interface{}) RMQClient {
 	client := &rmqClient{
 		option:       option,
 		remoteClient: remote.NewRemotingClient(),
-		namesrvs:     option.Namesrv,
 		done:         make(chan struct{}),
 	}
 	actual, loaded := clientMap.LoadOrStore(client.ClientID(), client)
-	client.namesrvs = GetOrSetNamesrv(client.ClientID(), client.namesrvs)
-	client.namesrvs.bundleClient = actual.(*rmqClient)
-	client.option.Namesrv = client.namesrvs
-	if !loaded {
+
+	if loaded {
+		// compare namesrv address
+		client = actual.(*rmqClient)
+		now := option.Namesrv.(*namesrvs).resolver.Resolve()
+		old := client.GetNameSrv().(*namesrvs).resolver.Resolve()
+		if len(now) != len(old) {
+			rlog.Error("different namesrv option in the same instance", map[string]interface{}{
+				"NewNameSrv":    now,
+				"BeforeNameSrv": old,
+			})
+			return nil
+		}
+		sort.Strings(now)
+		sort.Strings(old)
+		for i := 0; i < len(now); i++ {
+			if now[i] != old[i] {
+				rlog.Error("different namesrv option in the same instance", map[string]interface{}{
+					"NewNameSrv":    now,
+					"BeforeNameSrv": old,
+				})
+				return nil
+			}
+		}
+	} else {
 		client.remoteClient.RegisterRequestFunc(ReqNotifyConsumerIdsChanged, func(req *remote.RemotingCommand, addr net.Addr) *remote.RemotingCommand {
 			rlog.Info("receive broker's notification to consumer group", map[string]interface{}{
 				rlog.LogKeyConsumerGroup: req.ExtFields["consumerGroup"],
@@ -306,7 +332,7 @@ func GetOrNewRocketMQClient(option ClientOptions, callbackCh chan interface{}) R
 			return nil
 		})
 	}
-	return actual.(*rmqClient)
+	return client
 }
 
 func (c *rmqClient) Start() {
@@ -318,7 +344,7 @@ func (c *rmqClient) Start() {
 		}
 		go primitive.WithRecover(func() {
 			op := func() {
-				c.namesrvs.UpdateNameServerAddress()
+				c.GetNameSrv().UpdateNameServerAddress()
 			}
 			time.Sleep(10 * time.Second)
 			op()
@@ -364,7 +390,7 @@ func (c *rmqClient) Start() {
 
 		go primitive.WithRecover(func() {
 			op := func() {
-				c.namesrvs.cleanOfflineBroker()
+				c.GetNameSrv().cleanOfflineBroker()
 				c.SendHeartbeatToAllBrokerWithLock()
 			}
 
@@ -529,7 +555,7 @@ func (c *rmqClient) SendHeartbeatToAllBrokerWithLock() {
 		rlog.Info("sending heartbeat, but no producer and no consumer", nil)
 		return
 	}
-	c.namesrvs.brokerAddressesMap.Range(func(key, value interface{}) bool {
+	c.GetNameSrv().(*namesrvs).brokerAddressesMap.Range(func(key, value interface{}) bool {
 		brokerName := key.(string)
 		data := value.(*BrokerData)
 		for id, addr := range data.BrokerAddresses {
@@ -559,7 +585,7 @@ func (c *rmqClient) SendHeartbeatToAllBrokerWithLock() {
 			}
 			cancel()
 			if response.Code == ResSuccess {
-				c.namesrvs.AddBrokerVersion(brokerName, addr, int32(response.Version))
+				c.GetNameSrv().(*namesrvs).AddBrokerVersion(brokerName, addr, int32(response.Version))
 				rlog.Debug("send heart beat to broker success", map[string]interface{}{
 					"brokerName": brokerName,
 					"brokerId":   id,
@@ -589,7 +615,7 @@ func (c *rmqClient) UpdateTopicRouteInfo() {
 		return true
 	})
 	for topic := range publishTopicSet {
-		data, changed, _ := c.namesrvs.UpdateTopicRouteInfo(topic)
+		data, changed, _ := c.GetNameSrv().UpdateTopicRouteInfo(topic)
 		c.UpdatePublishInfo(topic, data, changed)
 	}
 
@@ -604,7 +630,7 @@ func (c *rmqClient) UpdateTopicRouteInfo() {
 	})
 
 	for topic := range subscribedTopicSet {
-		data, changed, _ := c.namesrvs.UpdateTopicRouteInfo(topic)
+		data, changed, _ := c.GetNameSrv().UpdateTopicRouteInfo(topic)
 		c.updateSubscribeInfo(topic, data, changed)
 	}
 }
@@ -730,8 +756,12 @@ func (c *rmqClient) UnregisterConsumer(group string) {
 	c.consumerMap.Delete(group)
 }
 
-func (c *rmqClient) RegisterProducer(group string, producer InnerProducer) {
-	c.producerMap.Store(group, producer)
+func (c *rmqClient) RegisterProducer(group string, producer InnerProducer) error {
+	_, loaded := c.producerMap.LoadOrStore(group, producer)
+	if loaded {
+		return fmt.Errorf("the producer group \"%s\" has been created, specify another one", c.option.GroupName)
+	}
+	return nil
 }
 
 func (c *rmqClient) UnregisterProducer(group string) {
@@ -760,7 +790,7 @@ func (c *rmqClient) UpdatePublishInfo(topic string, data *TopicRouteData, change
 			updated = p.IsPublishTopicNeedUpdate(topic)
 		}
 		if updated {
-			publishInfo := c.namesrvs.routeData2PublishInfo(topic, data)
+			publishInfo := c.GetNameSrv().(*namesrvs).routeData2PublishInfo(topic, data)
 			publishInfo.HaveTopicRouterInfo = true
 			p.UpdateTopicPublishInfo(topic, publishInfo)
 		}
diff --git a/internal/mock_client.go b/internal/mock_client.go
index ab34ac1..c975038 100644
--- a/internal/mock_client.go
+++ b/internal/mock_client.go
@@ -208,6 +208,15 @@ func (mr *MockInnerConsumerMockRecorder) GetConsumerRunningInfo() *gomock.Call {
 type MockRMQClient struct {
 	ctrl     *gomock.Controller
 	recorder *MockRMQClientMockRecorder
+	Namesrv  *MockNamesrvs
+}
+
+func (m *MockRMQClient) GetNameSrv() Namesrvs {
+	return m.Namesrv
+}
+
+func (m *MockRMQClient) SetNameSrv(mockNamesrvs *MockNamesrvs) {
+	m.Namesrv = mockNamesrvs
 }
 
 // MockRMQClientMockRecorder is the mock recorder for MockRMQClient
@@ -260,8 +269,10 @@ func (mr *MockRMQClientMockRecorder) ClientID() *gomock.Call {
 }
 
 // RegisterProducer mocks base method
-func (m *MockRMQClient) RegisterProducer(group string, producer InnerProducer) {
-	m.ctrl.Call(m, "RegisterProducer", group, producer)
+func (m *MockRMQClient) RegisterProducer(group string, producer InnerProducer) error {
+	ret := m.ctrl.Call(m, "RegisterProducer", group, producer)
+	ret0, _ := ret[0].(error)
+	return ret0
 }
 
 // RegisterProducer indicates an expected call of RegisterProducer
diff --git a/internal/mock_namesrv.go b/internal/mock_namesrv.go
index f87d174..7ce6f97 100644
--- a/internal/mock_namesrv.go
+++ b/internal/mock_namesrv.go
@@ -33,6 +33,10 @@ type MockNamesrvs struct {
 	recorder *MockNamesrvsMockRecorder
 }
 
+func (m *MockNamesrvs) UpdateTopicRouteInfoWithDefault(topic string, defaultTopic string, defaultQueueNum int) (*TopicRouteData, bool, error) {
+	return m.UpdateTopicRouteInfo(topic)
+}
+
 // MockNamesrvsMockRecorder is the mock recorder for MockNamesrvs
 type MockNamesrvsMockRecorder struct {
 	mock *MockNamesrvs
diff --git a/internal/namesrv.go b/internal/namesrv.go
index 7776651..96e708a 100644
--- a/internal/namesrv.go
+++ b/internal/namesrv.go
@@ -50,6 +50,8 @@ type Namesrvs interface {
 
 	UpdateTopicRouteInfo(topic string) (routeData *TopicRouteData, changed bool, err error)
 
+	UpdateTopicRouteInfoWithDefault(topic string, defaultTopic string, defaultQueueNum int) (*TopicRouteData, bool, error)
+
 	FetchPublishMessageQueues(topic string) ([]*primitive.MessageQueue, error)
 
 	FindBrokerAddrByTopic(topic string) string
diff --git a/internal/route.go b/internal/route.go
index 9cfa398..54dbbea 100644
--- a/internal/route.go
+++ b/internal/route.go
@@ -165,7 +165,7 @@ func (s *namesrvs) UpdateTopicRouteInfoWithDefault(topic string, defaultTopic st
 					updated = p.IsPublishTopicNeedUpdate(topic)
 				}
 				if updated {
-					publishInfo := s.bundleClient.namesrvs.routeData2PublishInfo(topic, routeData)
+					publishInfo := s.bundleClient.GetNameSrv().(*namesrvs).routeData2PublishInfo(topic, routeData)
 					publishInfo.HaveTopicRouterInfo = true
 					p.UpdateTopicPublishInfo(topic, publishInfo)
 				}
diff --git a/internal/trace.go b/internal/trace.go
index cef1634..753a4d1 100644
--- a/internal/trace.go
+++ b/internal/trace.go
@@ -276,6 +276,10 @@ func NewTraceDispatcher(traceCfg *primitive.TraceConfig) *traceDispatcher {
 	cliOp.Namesrv = srvs
 	cliOp.Credentials = traceCfg.Credentials
 	cli := GetOrNewRocketMQClient(cliOp, nil)
+	if cli == nil {
+		return nil
+	}
+	cliOp.Namesrv = cli.GetNameSrv()
 	return &traceDispatcher{
 		ctx:    ctx,
 		cancel: cancel,
diff --git a/producer/interceptor.go b/producer/interceptor.go
index 160deac..71eb8e7 100644
--- a/producer/interceptor.go
+++ b/producer/interceptor.go
@@ -22,6 +22,7 @@ package producer
 
 import (
 	"context"
+	"fmt"
 	"time"
 
 	"github.com/apache/rocketmq-client-go/v2/internal"
@@ -42,9 +43,14 @@ func WithTrace(traceCfg *primitive.TraceConfig) Option {
 
 func newTraceInterceptor(traceCfg *primitive.TraceConfig) primitive.Interceptor {
 	dispatcher := internal.NewTraceDispatcher(traceCfg)
-	dispatcher.Start()
+	if dispatcher != nil {
+		dispatcher.Start()
+	}
 
 	return func(ctx context.Context, req, reply interface{}, next primitive.Invoker) error {
+		if dispatcher == nil {
+			return fmt.Errorf("GetOrNewRocketMQClient faild")
+		}
 		beginT := time.Now()
 		err := next(ctx, req, reply)
 
diff --git a/producer/option.go b/producer/option.go
index 5839402..ae76511 100644
--- a/producer/option.go
+++ b/producer/option.go
@@ -35,7 +35,7 @@ func defaultProducerOptions() producerOptions {
 		CompressMsgBodyOverHowmuch: 4096,
 		CompressLevel:              5,
 	}
-	opts.ClientOptions.GroupName = "DEFAULT_CONSUMER"
+	opts.ClientOptions.GroupName = "DEFAULT_PRODUCER"
 	return opts
 }
 
diff --git a/producer/producer.go b/producer/producer.go
index 226eedb..3c875c6 100644
--- a/producer/producer.go
+++ b/producer/producer.go
@@ -67,19 +67,25 @@ func NewDefaultProducer(opts ...Option) (*defaultProducer, error) {
 		options:    defaultOpts,
 	}
 	producer.client = internal.GetOrNewRocketMQClient(defaultOpts.ClientOptions, producer.callbackCh)
-	producer.options.ClientOptions.Namesrv, err = internal.GetNamesrv(producer.client.ClientID())
-	if err != nil {
-		return nil, err
+	if producer.client == nil {
+		return nil, fmt.Errorf("GetOrNewRocketMQClient faild")
 	}
+	defaultOpts.Namesrv = producer.client.GetNameSrv()
+
 	producer.interceptor = primitive.ChainInterceptors(producer.options.Interceptors...)
 
 	return producer, nil
 }
 
 func (p *defaultProducer) Start() error {
+	if p == nil || p.client == nil {
+		return fmt.Errorf("client instance is nil, can not start producer")
+	}
 	atomic.StoreInt32(&p.state, int32(internal.StateRunning))
-
-	p.client.RegisterProducer(p.group, p)
+	err := p.client.RegisterProducer(p.group, p)
+	if err != nil {
+		return err
+	}
 	p.client.Start()
 	return nil
 }
@@ -195,7 +201,7 @@ func (p *defaultProducer) sendSync(ctx context.Context, msg *primitive.Message,
 			continue
 		}
 
-		addr := p.options.Namesrv.FindBrokerAddrByName(mq.BrokerName)
+		addr := p.client.GetNameSrv().FindBrokerAddrByName(mq.BrokerName)
 		if addr == "" {
 			return fmt.Errorf("topic=%s route info not found", mq.Topic)
 		}
@@ -242,7 +248,7 @@ func (p *defaultProducer) sendAsync(ctx context.Context, msg *primitive.Message,
 		return errors.Errorf("the topic=%s route info not found", msg.Topic)
 	}
 
-	addr := p.options.Namesrv.FindBrokerAddrByName(mq.BrokerName)
+	addr := p.client.GetNameSrv().FindBrokerAddrByName(mq.BrokerName)
 	if addr == "" {
 		return errors.Errorf("topic=%s route info not found", mq.Topic)
 	}
@@ -289,7 +295,7 @@ func (p *defaultProducer) sendOneWay(ctx context.Context, msg *primitive.Message
 			continue
 		}
 
-		addr := p.options.Namesrv.FindBrokerAddrByName(mq.BrokerName)
+		addr := p.client.GetNameSrv().FindBrokerAddrByName(mq.BrokerName)
 		if addr == "" {
 			return fmt.Errorf("topic=%s route info not found", mq.Topic)
 		}
@@ -378,7 +384,7 @@ func (p *defaultProducer) selectMessageQueue(msg *primitive.Message) *primitive.
 
 	v, exist := p.publishInfo.Load(topic)
 	if !exist {
-		data, changed, err := p.options.Namesrv.UpdateTopicRouteInfo(topic)
+		data, changed, err := p.client.GetNameSrv().UpdateTopicRouteInfo(topic)
 		if err != nil && primitive.IsRemotingErr(err) {
 			return nil
 		}
@@ -387,7 +393,7 @@ func (p *defaultProducer) selectMessageQueue(msg *primitive.Message) *primitive.
 	}
 
 	if !exist {
-		data, changed, _ := p.options.Namesrv.UpdateTopicRouteInfoWithDefault(topic, p.options.CreateTopicKey, p.options.DefaultTopicQueueNums)
+		data, changed, _ := p.client.GetNameSrv().UpdateTopicRouteInfoWithDefault(topic, p.options.CreateTopicKey, p.options.DefaultTopicQueueNums)
 		p.client.UpdatePublishInfo(topic, data, changed)
 		v, exist = p.publishInfo.Load(topic)
 	}
@@ -558,8 +564,8 @@ func (tp *transactionProducer) endTransaction(result primitive.SendResult, err e
 	} else {
 		msgID, _ = primitive.UnmarshalMsgID([]byte(result.MsgID))
 	}
-	
-	brokerAddr := tp.producer.options.Namesrv.FindBrokerAddrByName(result.MessageQueue.BrokerName)
+	// 估计没有反序列化回来
+	brokerAddr := tp.producer.client.GetNameSrv().FindBrokerAddrByName(result.MessageQueue.BrokerName)
 	requestHeader := &internal.EndTransactionRequestHeader{
 		TransactionId:        result.TransactionID,
 		CommitLogOffset:      msgID.Offset,
diff --git a/producer/producer_test.go b/producer/producer_test.go
index a7c15c1..e1d72dd 100644
--- a/producer/producer_test.go
+++ b/producer/producer_test.go
@@ -47,7 +47,7 @@ func TestShutdown(t *testing.T) {
 	client := internal.NewMockRMQClient(ctrl)
 	p.client = client
 
-	client.EXPECT().RegisterProducer(gomock.Any(), gomock.Any()).Return()
+	client.EXPECT().RegisterProducer(gomock.Any(), gomock.Any()).Return(nil)
 	client.EXPECT().Start().Return()
 	err := p.Start()
 	assert.Nil(t, err)
@@ -117,10 +117,13 @@ func TestSync(t *testing.T) {
 
 	ctrl := gomock.NewController(t)
 	defer ctrl.Finish()
+	namesrvCli := internal.NewMockNamesrvs(ctrl)
 	client := internal.NewMockRMQClient(ctrl)
 	p.client = client
+	client.SetNameSrv(namesrvCli)
+	namesrvCli.EXPECT().FindBrokerAddrByName(gomock.Any()).Return("a")
 
-	client.EXPECT().RegisterProducer(gomock.Any(), gomock.Any()).Return()
+	client.EXPECT().RegisterProducer(gomock.Any(), gomock.Any()).Return(nil)
 	client.EXPECT().Start().Return()
 	err := p.Start()
 	assert.Nil(t, err)
@@ -168,10 +171,13 @@ func TestASync(t *testing.T) {
 
 	ctrl := gomock.NewController(t)
 	defer ctrl.Finish()
+	namesrvCli := internal.NewMockNamesrvs(ctrl)
 	client := internal.NewMockRMQClient(ctrl)
 	p.client = client
+	client.SetNameSrv(namesrvCli)
+	namesrvCli.EXPECT().FindBrokerAddrByName(gomock.Any()).Return("a")
 
-	client.EXPECT().RegisterProducer(gomock.Any(), gomock.Any()).Return()
+	client.EXPECT().RegisterProducer(gomock.Any(), gomock.Any()).Return(nil)
 	client.EXPECT().Start().Return()
 	err := p.Start()
 	assert.Nil(t, err)
@@ -230,10 +236,13 @@ func TestOneway(t *testing.T) {
 
 	ctrl := gomock.NewController(t)
 	defer ctrl.Finish()
+	namesrvCli := internal.NewMockNamesrvs(ctrl)
 	client := internal.NewMockRMQClient(ctrl)
 	p.client = client
+	client.SetNameSrv(namesrvCli)
+	namesrvCli.EXPECT().FindBrokerAddrByName(gomock.Any()).Return("a")
 
-	client.EXPECT().RegisterProducer(gomock.Any(), gomock.Any()).Return()
+	client.EXPECT().RegisterProducer(gomock.Any(), gomock.Any()).Return(nil)
 	client.EXPECT().Start().Return()
 	err := p.Start()
 	assert.Nil(t, err)
@@ -268,10 +277,13 @@ func TestSyncWithNamespace(t *testing.T) {
 
 	ctrl := gomock.NewController(t)
 	defer ctrl.Finish()
+	namesrvCli := internal.NewMockNamesrvs(ctrl)
 	client := internal.NewMockRMQClient(ctrl)
 	p.client = client
+	client.SetNameSrv(namesrvCli)
+	namesrvCli.EXPECT().FindBrokerAddrByName(gomock.Any()).Return("a")
 
-	client.EXPECT().RegisterProducer(gomock.Any(), gomock.Any()).Return()
+	client.EXPECT().RegisterProducer(gomock.Any(), gomock.Any()).Return(nil)
 	client.EXPECT().Start().Return()
 	err := p.Start()
 	assert.Nil(t, err)
@@ -323,7 +335,7 @@ func TestBatchSendDifferentTopics(t *testing.T) {
 	client := internal.NewMockRMQClient(ctrl)
 	p.client = client
 
-	client.EXPECT().RegisterProducer(gomock.Any(), gomock.Any()).Return()
+	client.EXPECT().RegisterProducer(gomock.Any(), gomock.Any()).Return(nil)
 	client.EXPECT().Start().Return()
 	err := p.Start()
 	assert.Nil(t, err)
diff --git a/rlog/log.go b/rlog/log.go
index 037cfcf..a179a40 100644
--- a/rlog/log.go
+++ b/rlog/log.go
@@ -25,6 +25,7 @@ import (
 )
 
 const (
+	LogKeyProducerGroup    = "producerGroup"
 	LogKeyConsumerGroup    = "consumerGroup"
 	LogKeyTopic            = "topic"
 	LogKeyMessageQueue     = "MessageQueue"