You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by di...@apache.org on 2019/08/20 08:38:25 UTC

[rocketmq-client-go] branch native updated: [ISSUE #147] Support Namespace and domain of namesrv (#161)

This is an automated email from the ASF dual-hosted git repository.

dinglei pushed a commit to branch native
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-go.git


The following commit(s) were added to refs/heads/native by this push:
     new 6303ecd  [ISSUE #147] Support Namespace and domain of namesrv (#161)
6303ecd is described below

commit 6303ecd0878adb88b6ffb78587584aa6dd51b480
Author: wenfeng <sx...@gmail.com>
AuthorDate: Tue Aug 20 16:38:21 2019 +0800

    [ISSUE #147] Support Namespace and domain of namesrv (#161)
    
    * support domain of namesrv
    
    * add namespace support
    
    * add getConsumerInfo func
---
 .gitignore                                   |  3 +-
 consumer/consumer.go                         | 11 +++----
 consumer/option.go                           |  7 ++++
 consumer/pull_consumer.go                    |  4 +--
 consumer/push_consumer.go                    | 14 ++++++--
 examples/consumer/acl/main.go                |  9 +++--
 examples/consumer/{acl => namespace}/main.go | 10 ++++--
 examples/producer/acl/main.go                | 10 ++++--
 examples/producer/{acl => namespace}/main.go | 11 +++++--
 internal/client.go                           | 14 +++++---
 internal/model.go                            |  2 +-
 internal/namesrv.go                          | 17 +++++++++-
 internal/remote/codec.go                     | 49 +++++++++++++++++++++++-----
 internal/remote/remote_client.go             |  2 ++
 internal/response.go                         |  1 +
 internal/route.go                            | 34 +++++++++++--------
 producer/option.go                           |  7 ++++
 producer/producer.go                         | 21 ++++++++++--
 18 files changed, 175 insertions(+), 51 deletions(-)

diff --git a/.gitignore b/.gitignore
index cb35ce2..a0c292d 100644
--- a/.gitignore
+++ b/.gitignore
@@ -2,4 +2,5 @@
 go.mod
 go.sum
 vendor/
-coverage.txt
\ No newline at end of file
+coverage.txt
+examples/test
\ No newline at end of file
diff --git a/consumer/consumer.go b/consumer/consumer.go
index b5bf22f..2c88e5b 100644
--- a/consumer/consumer.go
+++ b/consumer/consumer.go
@@ -227,7 +227,6 @@ func (pr *PullRequest) String() string {
 		pr.consumerGroup, pr.mq.Topic, pr.mq.QueueId)
 }
 
-// TODO hook
 type defaultConsumer struct {
 	/**
 	 * Consumers of the same role is required to have exactly same subscriptions and consumerGroup to correctly achieve
@@ -312,8 +311,8 @@ func (dc *defaultConsumer) persistConsumerOffset() error {
 	return nil
 }
 
-func (c *defaultConsumer) updateOffset(queue *primitive.MessageQueue, offset int64) error {
-	c.storage.update(queue, offset, false)
+func (dc *defaultConsumer) updateOffset(queue *primitive.MessageQueue, offset int64) error {
+	dc.storage.update(queue, offset, false)
 	return nil
 }
 
@@ -615,14 +614,14 @@ func (dc *defaultConsumer) updateProcessQueueTable(topic string, mqs []*primitiv
 					//delete(mqSet, mq)
 					dc.processQueueTable.Delete(key)
 					changed = true
-					rlog.Infof("do defaultConsumer, Group:%s, remove unnecessary mq: %s", dc.consumerGroup, mq.String())
+					rlog.Debugf("do defaultConsumer, Group:%s, remove unnecessary mq: %s", dc.consumerGroup, mq.String())
 				}
 			} else if pq.isPullExpired() && dc.cType == _PushConsume {
 				pq.dropped = true
 				if dc.removeUnnecessaryMessageQueue(&mq, pq) {
 					delete(mqSet, mq)
 					changed = true
-					rlog.Infof("do defaultConsumer, Group:%s, remove unnecessary mq: %s, "+
+					rlog.Debugf("do defaultConsumer, Group:%s, remove unnecessary mq: %s, "+
 						"because pull was paused, so try to fixed it", dc.consumerGroup, mq)
 				}
 			}
@@ -650,7 +649,7 @@ func (dc *defaultConsumer) updateProcessQueueTable(topic string, mqs []*primitiv
 				if exist {
 					rlog.Debugf("do defaultConsumer, Group: %s, mq already exist, %s", dc.consumerGroup, mq.String())
 				} else {
-					rlog.Infof("do defaultConsumer, Group: %s, add a new mq, %s", dc.consumerGroup, mq.String())
+					rlog.Debugf("do defaultConsumer, Group: %s, add a new mq, %s", dc.consumerGroup, mq.String())
 					pq := newProcessQueue(dc.consumeOrderly)
 					dc.processQueueTable.Store(mq, pq)
 					pr := PullRequest{
diff --git a/consumer/option.go b/consumer/option.go
index 558fd90..f92c118 100644
--- a/consumer/option.go
+++ b/consumer/option.go
@@ -175,6 +175,13 @@ func WithNameServer(nameServers []string) Option {
 	}
 }
 
+// WithNamespace set the namespace of consumer
+func WithNamespace(namespace string) Option {
+	return func(opts *consumerOptions) {
+		opts.Namespace = namespace
+	}
+}
+
 func WithVIPChannel(enable bool) Option {
 	return func(opts *consumerOptions) {
 		opts.VIPChannelEnabled = enable
diff --git a/consumer/pull_consumer.go b/consumer/pull_consumer.go
index 742cf16..dad27a8 100644
--- a/consumer/pull_consumer.go
+++ b/consumer/pull_consumer.go
@@ -160,8 +160,8 @@ func (c *defaultPullConsumer) ACK(msg *primitive.Message, result ConsumeResult)
 
 }
 
-func (c *defaultConsumer) checkPull(ctx context.Context, mq *primitive.MessageQueue, offset int64, numbers int) error {
-	err := c.makeSureStateOK()
+func (dc *defaultConsumer) checkPull(ctx context.Context, mq *primitive.MessageQueue, offset int64, numbers int) error {
+	err := dc.makeSureStateOK()
 	if err != nil {
 		return err
 	}
diff --git a/consumer/push_consumer.go b/consumer/push_consumer.go
index 3b9747c..6303ded 100644
--- a/consumer/push_consumer.go
+++ b/consumer/push_consumer.go
@@ -64,8 +64,13 @@ func NewPushConsumer(opts ...Option) (*pushConsumer, error) {
 	if err != nil {
 		return nil, errors.Wrap(err, "new Namesrv failed.")
 	}
+	if !defaultOpts.Credentials.IsEmpty() {
+		srvs.SetCredentials(defaultOpts.Credentials)
+	}
 	internal.RegisterNamsrv(srvs)
-
+	if defaultOpts.Namespace != "" {
+		defaultOpts.GroupName = defaultOpts.Namespace + "%" + defaultOpts.GroupName
+	}
 	dc := &defaultConsumer{
 		client:         internal.GetOrNewRocketMQClient(defaultOpts.ClientOptions, nil),
 		consumerGroup:  defaultOpts.GroupName,
@@ -96,7 +101,7 @@ func NewPushConsumer(opts ...Option) (*pushConsumer, error) {
 	return p, nil
 }
 
-// TODO: add shutdown on pushConsumr.
+// TODO: add shutdown on pushConsumer.
 func (pc *pushConsumer) Start() error {
 	var err error
 	pc.once.Do(func() {
@@ -163,6 +168,9 @@ func (pc *pushConsumer) Subscribe(topic string, selector MessageSelector,
 	if pc.state != internal.StateCreateJust {
 		return errors.New("subscribe topic only started before")
 	}
+	if pc.option.Namespace != "" {
+		topic = pc.option.Namespace + "%" + topic
+	}
 	data := buildSubscriptionData(topic, selector)
 	pc.subscriptionDataTable.Store(topic, data)
 	pc.subscribedTopic[topic] = ""
@@ -285,7 +293,7 @@ func (pc *pushConsumer) validate() {
 }
 
 func (pc *pushConsumer) pullMessage(request *PullRequest) {
-	rlog.Infof("start a nwe Pull Message task %s for [%s]", request.String(), pc.consumerGroup)
+	rlog.Debugf("start a new Pull Message task %s for [%s]", request.String(), pc.consumerGroup)
 	var sleepTime time.Duration
 	pq := request.pq
 	go func() {
diff --git a/examples/consumer/acl/main.go b/examples/consumer/acl/main.go
index 4582cde..8f944e7 100644
--- a/examples/consumer/acl/main.go
+++ b/examples/consumer/acl/main.go
@@ -29,7 +29,7 @@ import (
 )
 
 func main() {
-	c, _ := rocketmq.NewPushConsumer(
+	c, err := rocketmq.NewPushConsumer(
 		consumer.WithGroupName("testGroup"),
 		consumer.WithNameServer([]string{"127.0.0.1:9876"}),
 		consumer.WithCredentials(primitive.Credentials{
@@ -37,7 +37,12 @@ func main() {
 			SecretKey: "12345678",
 		}),
 	)
-	err := c.Subscribe("test", consumer.MessageSelector{}, func(ctx context.Context,
+	if err != nil {
+		fmt.Println("init consumer error: " + err.Error())
+		os.Exit(0)
+	}
+
+	err = c.Subscribe("test", consumer.MessageSelector{}, func(ctx context.Context,
 		msgs ...*primitive.MessageExt) (consumer.ConsumeResult, error) {
 		fmt.Printf("subscribe callback: %v \n", msgs)
 		return consumer.ConsumeSuccess, nil
diff --git a/examples/consumer/acl/main.go b/examples/consumer/namespace/main.go
similarity index 86%
copy from examples/consumer/acl/main.go
copy to examples/consumer/namespace/main.go
index 4582cde..a0e8708 100644
--- a/examples/consumer/acl/main.go
+++ b/examples/consumer/namespace/main.go
@@ -29,15 +29,21 @@ import (
 )
 
 func main() {
-	c, _ := rocketmq.NewPushConsumer(
+	c, err := rocketmq.NewPushConsumer(
 		consumer.WithGroupName("testGroup"),
 		consumer.WithNameServer([]string{"127.0.0.1:9876"}),
 		consumer.WithCredentials(primitive.Credentials{
 			AccessKey: "RocketMQ",
 			SecretKey: "12345678",
 		}),
+		consumer.WithNamespace("namespace"),
 	)
-	err := c.Subscribe("test", consumer.MessageSelector{}, func(ctx context.Context,
+	if err != nil {
+		fmt.Println("init consumer error: " + err.Error())
+		os.Exit(0)
+	}
+
+	err = c.Subscribe("test", consumer.MessageSelector{}, func(ctx context.Context,
 		msgs ...*primitive.MessageExt) (consumer.ConsumeResult, error) {
 		fmt.Printf("subscribe callback: %v \n", msgs)
 		return consumer.ConsumeSuccess, nil
diff --git a/examples/producer/acl/main.go b/examples/producer/acl/main.go
index cee23db..a98500c 100644
--- a/examples/producer/acl/main.go
+++ b/examples/producer/acl/main.go
@@ -30,7 +30,7 @@ import (
 )
 
 func main() {
-	p, _ := rocketmq.NewProducer(
+	p, err := rocketmq.NewProducer(
 		producer.WithNameServer([]string{"127.0.0.1:9876"}),
 		producer.WithRetry(2),
 		producer.WithCredentials(primitive.Credentials{
@@ -38,7 +38,13 @@ func main() {
 			SecretKey: "12345678",
 		}),
 	)
-	err := p.Start()
+
+	if err != nil {
+		fmt.Println("init producer error: " + err.Error())
+		os.Exit(0)
+	}
+
+	err = p.Start()
 	if err != nil {
 		fmt.Printf("start producer error: %s", err.Error())
 		os.Exit(1)
diff --git a/examples/producer/acl/main.go b/examples/producer/namespace/main.go
similarity index 91%
copy from examples/producer/acl/main.go
copy to examples/producer/namespace/main.go
index cee23db..002868a 100644
--- a/examples/producer/acl/main.go
+++ b/examples/producer/namespace/main.go
@@ -30,15 +30,22 @@ import (
 )
 
 func main() {
-	p, _ := rocketmq.NewProducer(
+	p, err := rocketmq.NewProducer(
 		producer.WithNameServer([]string{"127.0.0.1:9876"}),
 		producer.WithRetry(2),
 		producer.WithCredentials(primitive.Credentials{
 			AccessKey: "RocketMQ",
 			SecretKey: "12345678",
 		}),
+		producer.WithNamespace("namespace"),
 	)
-	err := p.Start()
+
+	if err != nil {
+		fmt.Println("init producer error: " + err.Error())
+		os.Exit(0)
+	}
+
+	err = p.Start()
 	if err != nil {
 		fmt.Printf("start producer error: %s", err.Error())
 		os.Exit(1)
diff --git a/internal/client.go b/internal/client.go
index c6b3246..11eca1e 100644
--- a/internal/client.go
+++ b/internal/client.go
@@ -102,10 +102,10 @@ type ClientOptions struct {
 	UnitMode          bool
 	UnitName          string
 	VIPChannelEnabled bool
-	ACLEnabled        bool
 	RetryTimes        int
 	Interceptors      []primitive.Interceptor
 	Credentials       primitive.Credentials
+	Namespace         string
 }
 
 func (opt *ClientOptions) ChangeInstanceNameToPID() {
@@ -116,8 +116,8 @@ func (opt *ClientOptions) ChangeInstanceNameToPID() {
 
 func (opt *ClientOptions) String() string {
 	return fmt.Sprintf("ClientOption [ClientIP=%s, InstanceName=%s, "+
-		"UnitMode=%v, UnitName=%s, VIPChannelEnabled=%v, ACLEnabled=%v]", opt.ClientIP,
-		opt.InstanceName, opt.UnitMode, opt.UnitName, opt.VIPChannelEnabled, opt.ACLEnabled)
+		"UnitMode=%v, UnitName=%s, VIPChannelEnabled=%v]", opt.ClientIP,
+		opt.InstanceName, opt.UnitMode, opt.UnitName, opt.VIPChannelEnabled)
 }
 
 //go:generate mockgen -source client.go -destination mock_client.go -self_package github.com/apache/rocketmq-client-go/internal  --package internal RMQClient
@@ -210,6 +210,12 @@ func GetOrNewRocketMQClient(option ClientOptions, callbackCh chan interface{}) *
 			return nil
 		})
 
+		client.remoteClient.RegisterRequestFunc(ReqGetConsumerRunningInfo, func(req *remote.RemotingCommand, addr net.Addr) *remote.RemotingCommand {
+			rlog.Info("receive get consumer running info request...")
+			res := remote.NewRemotingCommand(ResError, nil, nil)
+			res.Remark = "the go client has not supported consumer running info"
+			return res
+		})
 	}
 	return actual.(*rmqClient)
 }
@@ -363,7 +369,7 @@ func (c *rmqClient) SendHeartbeatToAllBrokerWithLock() {
 					brokerVersionMap.Store(brokerName, m)
 				}
 				m[brokerName] = int32(response.Version)
-				rlog.Infof("send heart beat to broker[%s %d %s] success", brokerName, id, addr)
+				rlog.Debugf("send heart beat to broker[%s %d %s] success", brokerName, id, addr)
 			}
 		}
 		return true
diff --git a/internal/model.go b/internal/model.go
index f534234..86b286e 100644
--- a/internal/model.go
+++ b/internal/model.go
@@ -95,6 +95,6 @@ func (data *heartbeatData) encode() []byte {
 		rlog.Errorf("marshal heartbeatData error: %s", err.Error())
 		return nil
 	}
-	rlog.Info("heartbeat: " + string(d))
+	rlog.Debugf("heartbeat: " + string(d))
 	return d
 }
diff --git a/internal/namesrv.go b/internal/namesrv.go
index 66b3e22..d8e1a40 100644
--- a/internal/namesrv.go
+++ b/internal/namesrv.go
@@ -22,6 +22,8 @@ import (
 	"regexp"
 	"strings"
 	"sync"
+
+	"github.com/apache/rocketmq-client-go/primitive"
 )
 
 var (
@@ -42,6 +44,9 @@ type Namesrvs struct {
 
 	// index indicate the next position for getNamesrv
 	index int
+
+	// credentials for query topic route
+	credentials primitive.Credentials
 }
 
 // NewNamesrv init Namesrv from namesrv addr string.
@@ -80,7 +85,7 @@ func (s *Namesrvs) GetNamesrv() string {
 	}
 	index %= len(s.srvs)
 	s.index = index
-	return addr
+	return strings.TrimLeft(addr, "http(s)://")
 }
 
 func (s *Namesrvs) Size() int {
@@ -90,8 +95,18 @@ func (s *Namesrvs) Size() int {
 func (s *Namesrvs) String() string {
 	return strings.Join(s.srvs, ";")
 }
+func (s *Namesrvs) SetCredentials(credentials primitive.Credentials) {
+	s.credentials = credentials
+}
+
+var (
+	httpPrefixRegex, _ = regexp.Compile("^(http|https)://")
+)
 
 func verifyIP(ip string) error {
+	if httpPrefixRegex.MatchString(ip) {
+		return nil
+	}
 	if strings.Contains(ip, ";") {
 		return ErrMultiIP
 	}
diff --git a/internal/remote/codec.go b/internal/remote/codec.go
index 3093514..4c354cb 100644
--- a/internal/remote/codec.go
+++ b/internal/remote/codec.go
@@ -29,21 +29,52 @@ var opaque int32
 const (
 	// 0, REQUEST_COMMAND
 	RPCType = 0
-
 	// 1, RPC
 	RPCOneWay = 1
-
 	//ResponseType for response
 	ResponseType = 1
+	_Flag        = 0
+	_Version     = 317
+)
+
+type LanguageCode byte
 
-	_Flag         = 0
-	_LanguageCode = byte(9)
-	_Version      = 137
+const (
+	_Java    = LanguageCode(0)
+	_Go      = LanguageCode(9)
+	_Unknown = LanguageCode(127)
 )
 
+func (lc LanguageCode) MarshalJSON() ([]byte, error) {
+	return []byte(`"GO"`), nil
+}
+
+func (lc *LanguageCode) UnmarshalJSON(b []byte) error {
+	switch string(b) {
+	case "JAVA":
+		*lc = _Java
+	case "GO":
+		*lc = _Go
+	default:
+		*lc = _Unknown
+	}
+	return nil
+}
+
+func (lc LanguageCode) String() string {
+	switch lc {
+	case _Java:
+		return "JAVA"
+	case _Go:
+		return "GO"
+	default:
+		return "unknown"
+	}
+}
+
 type RemotingCommand struct {
 	Code      int16             `json:"code"`
-	Language  byte              `json:"-"`
+	Language  LanguageCode      `json:"language,string"`
 	Version   int16             `json:"version"`
 	Opaque    int32             `json:"opaque"`
 	Flag      int32             `json:"flag"`
@@ -62,7 +93,7 @@ func NewRemotingCommand(code int16, header CustomHeader, body []byte) *RemotingC
 		Version:   _Version,
 		Opaque:    atomic.AddInt32(&opaque, 1),
 		Body:      body,
-		Language:  _LanguageCode,
+		Language:  _Go,
 		ExtFields: make(map[string]string),
 	}
 
@@ -264,7 +295,7 @@ func (c *rmqCodec) encodeHeader(command *RemotingCommand) ([]byte, error) {
 	}
 
 	// language flag, length is 1 byte
-	err = binary.Write(buf, binary.BigEndian, _LanguageCode)
+	err = binary.Write(buf, binary.BigEndian, _Go)
 	if err != nil {
 		return nil, err
 	}
@@ -364,7 +395,7 @@ func (c *rmqCodec) decodeHeader(data []byte) (*RemotingCommand, error) {
 	if err != nil {
 		return nil, err
 	}
-	command.Language = languageCode
+	command.Language = LanguageCode(languageCode)
 
 	var version int16
 	err = binary.Read(buf, binary.BigEndian, &version)
diff --git a/internal/remote/remote_client.go b/internal/remote/remote_client.go
index e8f10c2..f9b77d5 100644
--- a/internal/remote/remote_client.go
+++ b/internal/remote/remote_client.go
@@ -150,6 +150,8 @@ func (c *RemotingClient) receiveResponse(r net.Conn) {
 				go func() { // 单个goroutine会造成死锁
 					res := f(cmd, r.RemoteAddr())
 					if res != nil {
+						res.Opaque = cmd.Opaque
+						res.Flag |= 1 << 0
 						err := c.sendRequest(r, res)
 						if err != nil {
 							rlog.Warnf("send response to broker error: %s, type is: %d", err, res.Code)
diff --git a/internal/response.go b/internal/response.go
index 0937db7..ae75b9c 100644
--- a/internal/response.go
+++ b/internal/response.go
@@ -19,6 +19,7 @@ package internal
 
 const (
 	ResSuccess              = int16(0)
+	ResError                = int16(1)
 	ResFlushDiskTimeout     = int16(10)
 	ResSlaveNotAvailable    = int16(11)
 	ResFlushSlaveTimeout    = int16(12)
diff --git a/internal/route.go b/internal/route.go
index 9c6caa9..4f70eb0 100644
--- a/internal/route.go
+++ b/internal/route.go
@@ -52,6 +52,9 @@ var (
 )
 
 func RegisterNamsrv(s *Namesrvs) {
+	if !s.credentials.IsEmpty() {
+		nameSrvClient.RegisterInterceptor(remote.ACLInterceptor(s.credentials))
+	}
 	nameSrvs = s
 }
 
@@ -202,8 +205,8 @@ func FindBrokerAddrByName(brokerName string) string {
 func FindBrokerAddressInSubscribe(brokerName string, brokerId int64, onlyThisBroker bool) *FindBrokerResult {
 	var (
 		brokerAddr = ""
-		slave      = false
-		found      = false
+		//slave      = false
+		//found      = false
 	)
 
 	v, exist := brokerAddressesMap.Load(brokerName)
@@ -212,22 +215,27 @@ func FindBrokerAddressInSubscribe(brokerName string, brokerId int64, onlyThisBro
 		return nil
 	}
 	data := v.(*BrokerData)
-	for k, v := range data.BrokerAddresses {
-		if v != "" {
-			found = true
-			if k != MasterId {
-				slave = true
-			}
-			brokerAddr = v
-			break
-		}
+	if len(data.BrokerAddresses) == 0 {
+		return nil
 	}
 
+	brokerAddr = data.BrokerAddresses[brokerId]
+	//for k, v := range data.BrokerAddresses {
+	//	if v != "" {
+	//		found = true
+	//		if k != MasterId {
+	//			slave = true
+	//		}
+	//		brokerAddr = v
+	//		break
+	//	}
+	//}
+
 	var result *FindBrokerResult
-	if found {
+	if brokerAddr != "" {
 		result = &FindBrokerResult{
 			BrokerAddr:    brokerAddr,
-			Slave:         slave,
+			Slave:         brokerId != 0,
 			BrokerVersion: findBrokerVersion(brokerName, brokerAddr),
 		}
 	}
diff --git a/producer/option.go b/producer/option.go
index 5ec003c..ac88120 100644
--- a/producer/option.go
+++ b/producer/option.go
@@ -61,6 +61,13 @@ func WithNameServer(nameServers []string) Option {
 	}
 }
 
+// WithNamespace set the namespace of producer
+func WithNamespace(namespace string) Option {
+	return func(opts *producerOptions) {
+		opts.Namespace = namespace
+	}
+}
+
 func WithSendMsgTimeout(duration time.Duration) Option {
 	return func(opts *producerOptions) {
 		opts.SendMsgTimeout = duration
diff --git a/producer/producer.go b/producer/producer.go
index 1d082e9..12ac962 100644
--- a/producer/producer.go
+++ b/producer/producer.go
@@ -59,6 +59,9 @@ func NewDefaultProducer(opts ...Option) (*defaultProducer, error) {
 	if err != nil {
 		return nil, errors.Wrap(err, "new Namesrv failed.")
 	}
+	if !defaultOpts.Credentials.IsEmpty() {
+		srvs.SetCredentials(defaultOpts.Credentials)
+	}
 	internal.RegisterNamsrv(srvs)
 
 	producer := &defaultProducer{
@@ -140,6 +143,10 @@ func (p *defaultProducer) sendSync(ctx context.Context, msg *primitive.Message,
 		err error
 	)
 
+	if p.options.Namespace != "" {
+		msg.Topic = p.options.Namespace + "%" + msg.Topic
+	}
+
 	var producerCtx *primitive.ProducerCtx
 	for retryCount := 0; retryCount < retryTime; retryCount++ {
 		mq := p.selectMessageQueue(msg)
@@ -185,6 +192,9 @@ func (p *defaultProducer) SendAsync(ctx context.Context, f func(context.Context,
 }
 
 func (p *defaultProducer) sendAsync(ctx context.Context, msg *primitive.Message, h func(context.Context, *primitive.SendResult, error)) error {
+	if p.options.Namespace != "" {
+		msg.Topic = p.options.Namespace + "%" + msg.Topic
+	}
 	mq := p.selectMessageQueue(msg)
 	if mq == nil {
 		return errors.Errorf("the topic=%s route info not found", msg.Topic)
@@ -224,9 +234,11 @@ func (p *defaultProducer) SendOneWay(ctx context.Context, msg *primitive.Message
 func (p *defaultProducer) sendOneWay(ctx context.Context, msg *primitive.Message) error {
 	retryTime := 1 + p.options.RetryTimes
 
-	var (
-		err error
-	)
+	if p.options.Namespace != "" {
+		msg.Topic = p.options.Namespace + "%" + msg.Topic
+	}
+
+	var err error
 	for retryCount := 0; retryCount < retryTime; retryCount++ {
 		mq := p.selectMessageQueue(msg)
 		if mq == nil {
@@ -251,6 +263,9 @@ func (p *defaultProducer) sendOneWay(ctx context.Context, msg *primitive.Message
 
 func (p *defaultProducer) buildSendRequest(mq *primitive.MessageQueue,
 	msg *primitive.Message) *remote.RemotingCommand {
+	if msg.Properties == nil {
+		msg.Properties = make(map[string]string, 0)
+	}
 	if !msg.Batch && msg.Properties[primitive.PropertyUniqueClientMessageIdKeyIndex] == "" {
 		msg.Properties[primitive.PropertyUniqueClientMessageIdKeyIndex] = primitive.CreateUniqID()
 	}