You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by di...@apache.org on 2020/02/24 02:51:38 UTC

[rocketmq-client-go] branch native updated: feat(producer): auto create topic for producer (#416)

This is an automated email from the ASF dual-hosted git repository.

dinglei pushed a commit to branch native
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-go.git


The following commit(s) were added to refs/heads/native by this push:
     new 1ade23a  feat(producer): auto create topic for producer (#416)
1ade23a is described below

commit 1ade23a0854642a561d224dfd4ed9f4b8beeb6c0
Author: xujianhai666 <52...@users.noreply.github.com>
AuthorDate: Mon Feb 24 10:51:28 2020 +0800

    feat(producer): auto create topic for producer (#416)
    
    Closes #415
---
 examples/producer/simple/main.go | 12 +++++++++---
 internal/client.go               |  4 ++--
 internal/mock_namesrv.go         | 32 ++++++++++++++++++++++++++------
 internal/namesrv.go              |  2 +-
 internal/route.go                | 38 ++++++++++++++++++++++++++++++--------
 primitive/errors.go              | 30 ++++++++++++++++++++++++++++++
 producer/option.go               | 27 ++++++++++++++++++++++-----
 producer/producer.go             | 11 ++++++++++-
 8 files changed, 130 insertions(+), 26 deletions(-)

diff --git a/examples/producer/simple/main.go b/examples/producer/simple/main.go
index 63bc5a1..08b7682 100644
--- a/examples/producer/simple/main.go
+++ b/examples/producer/simple/main.go
@@ -21,6 +21,7 @@ import (
 	"context"
 	"fmt"
 	"os"
+	"strconv"
 
 	"github.com/apache/rocketmq-client-go/v2"
 	"github.com/apache/rocketmq-client-go/v2/primitive"
@@ -38,9 +39,14 @@ func main() {
 		fmt.Printf("start producer error: %s", err.Error())
 		os.Exit(1)
 	}
-	for i := 0; i < 1000; i++ {
-		res, err := p.SendSync(context.Background(), primitive.NewMessage("test",
-			[]byte("Hello RocketMQ Go Client!")))
+	topic := "test"
+
+	for i := 0; i < 10; i++ {
+		msg := &primitive.Message{
+			Topic: topic,
+			Body:  []byte("Hello RocketMQ Go Client! " + strconv.Itoa(i)),
+		}
+		res, err := p.SendSync(context.Background(), msg)
 
 		if err != nil {
 			fmt.Printf("send message error: %s\n", err)
diff --git a/internal/client.go b/internal/client.go
index f810b3a..72debb0 100644
--- a/internal/client.go
+++ b/internal/client.go
@@ -499,7 +499,7 @@ func (c *rmqClient) UpdateTopicRouteInfo() {
 		return true
 	})
 	for topic := range publishTopicSet {
-		data, changed := c.namesrvs.UpdateTopicRouteInfo(topic)
+		data, changed, _ := c.namesrvs.UpdateTopicRouteInfo(topic)
 		c.UpdatePublishInfo(topic, data, changed)
 	}
 
@@ -514,7 +514,7 @@ func (c *rmqClient) UpdateTopicRouteInfo() {
 	})
 
 	for topic := range subscribedTopicSet {
-		data, changed := c.namesrvs.UpdateTopicRouteInfo(topic)
+		data, changed, _ := c.namesrvs.UpdateTopicRouteInfo(topic)
 		c.updateSubscribeInfo(topic, data, changed)
 	}
 }
diff --git a/internal/mock_namesrv.go b/internal/mock_namesrv.go
index a7a0842..7f9bfd6 100644
--- a/internal/mock_namesrv.go
+++ b/internal/mock_namesrv.go
@@ -21,11 +21,10 @@ limitations under the License.
 package internal
 
 import (
-	"reflect"
+	reflect "reflect"
 
-	"github.com/golang/mock/gomock"
-
-	"github.com/apache/rocketmq-client-go/v2/primitive"
+	primitive "github.com/apache/rocketmq-client-go/v2/primitive"
+	gomock "github.com/golang/mock/gomock"
 )
 
 // MockNamesrvs is a mock of Namesrvs interface
@@ -53,49 +52,59 @@ func (m *MockNamesrvs) EXPECT() *MockNamesrvsMockRecorder {
 
 // UpdateNameServerAddress mocks base method
 func (m *MockNamesrvs) UpdateNameServerAddress(nameServerDomain, instanceName string) {
+	m.ctrl.T.Helper()
 	m.ctrl.Call(m, "UpdateNameServerAddress", nameServerDomain, instanceName)
 }
 
 // UpdateNameServerAddress indicates an expected call of UpdateNameServerAddress
 func (mr *MockNamesrvsMockRecorder) UpdateNameServerAddress(nameServerDomain, instanceName interface{}) *gomock.Call {
+	mr.mock.ctrl.T.Helper()
 	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "UpdateNameServerAddress", reflect.TypeOf((*MockNamesrvs)(nil).UpdateNameServerAddress), nameServerDomain, instanceName)
 }
 
 // AddBroker mocks base method
 func (m *MockNamesrvs) AddBroker(routeData *TopicRouteData) {
+	m.ctrl.T.Helper()
 	m.ctrl.Call(m, "AddBroker", routeData)
 }
 
 // AddBroker indicates an expected call of AddBroker
 func (mr *MockNamesrvsMockRecorder) AddBroker(routeData interface{}) *gomock.Call {
+	mr.mock.ctrl.T.Helper()
 	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AddBroker", reflect.TypeOf((*MockNamesrvs)(nil).AddBroker), routeData)
 }
 
 // cleanOfflineBroker mocks base method
 func (m *MockNamesrvs) cleanOfflineBroker() {
+	m.ctrl.T.Helper()
 	m.ctrl.Call(m, "cleanOfflineBroker")
 }
 
 // cleanOfflineBroker indicates an expected call of cleanOfflineBroker
 func (mr *MockNamesrvsMockRecorder) cleanOfflineBroker() *gomock.Call {
+	mr.mock.ctrl.T.Helper()
 	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "cleanOfflineBroker", reflect.TypeOf((*MockNamesrvs)(nil).cleanOfflineBroker))
 }
 
 // UpdateTopicRouteInfo mocks base method
-func (m *MockNamesrvs) UpdateTopicRouteInfo(topic string) (*TopicRouteData, bool) {
+func (m *MockNamesrvs) UpdateTopicRouteInfo(topic string) (*TopicRouteData, bool, error) {
+	m.ctrl.T.Helper()
 	ret := m.ctrl.Call(m, "UpdateTopicRouteInfo", topic)
 	ret0, _ := ret[0].(*TopicRouteData)
 	ret1, _ := ret[1].(bool)
-	return ret0, ret1
+	ret2, _ := ret[2].(error)
+	return ret0, ret1, ret2
 }
 
 // UpdateTopicRouteInfo indicates an expected call of UpdateTopicRouteInfo
 func (mr *MockNamesrvsMockRecorder) UpdateTopicRouteInfo(topic interface{}) *gomock.Call {
+	mr.mock.ctrl.T.Helper()
 	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "UpdateTopicRouteInfo", reflect.TypeOf((*MockNamesrvs)(nil).UpdateTopicRouteInfo), topic)
 }
 
 // FetchPublishMessageQueues mocks base method
 func (m *MockNamesrvs) FetchPublishMessageQueues(topic string) ([]*primitive.MessageQueue, error) {
+	m.ctrl.T.Helper()
 	ret := m.ctrl.Call(m, "FetchPublishMessageQueues", topic)
 	ret0, _ := ret[0].([]*primitive.MessageQueue)
 	ret1, _ := ret[1].(error)
@@ -104,11 +113,13 @@ func (m *MockNamesrvs) FetchPublishMessageQueues(topic string) ([]*primitive.Mes
 
 // FetchPublishMessageQueues indicates an expected call of FetchPublishMessageQueues
 func (mr *MockNamesrvsMockRecorder) FetchPublishMessageQueues(topic interface{}) *gomock.Call {
+	mr.mock.ctrl.T.Helper()
 	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "FetchPublishMessageQueues", reflect.TypeOf((*MockNamesrvs)(nil).FetchPublishMessageQueues), topic)
 }
 
 // FindBrokerAddrByTopic mocks base method
 func (m *MockNamesrvs) FindBrokerAddrByTopic(topic string) string {
+	m.ctrl.T.Helper()
 	ret := m.ctrl.Call(m, "FindBrokerAddrByTopic", topic)
 	ret0, _ := ret[0].(string)
 	return ret0
@@ -116,11 +127,13 @@ func (m *MockNamesrvs) FindBrokerAddrByTopic(topic string) string {
 
 // FindBrokerAddrByTopic indicates an expected call of FindBrokerAddrByTopic
 func (mr *MockNamesrvsMockRecorder) FindBrokerAddrByTopic(topic interface{}) *gomock.Call {
+	mr.mock.ctrl.T.Helper()
 	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "FindBrokerAddrByTopic", reflect.TypeOf((*MockNamesrvs)(nil).FindBrokerAddrByTopic), topic)
 }
 
 // FindBrokerAddrByName mocks base method
 func (m *MockNamesrvs) FindBrokerAddrByName(brokerName string) string {
+	m.ctrl.T.Helper()
 	ret := m.ctrl.Call(m, "FindBrokerAddrByName", brokerName)
 	ret0, _ := ret[0].(string)
 	return ret0
@@ -128,11 +141,13 @@ func (m *MockNamesrvs) FindBrokerAddrByName(brokerName string) string {
 
 // FindBrokerAddrByName indicates an expected call of FindBrokerAddrByName
 func (mr *MockNamesrvsMockRecorder) FindBrokerAddrByName(brokerName interface{}) *gomock.Call {
+	mr.mock.ctrl.T.Helper()
 	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "FindBrokerAddrByName", reflect.TypeOf((*MockNamesrvs)(nil).FindBrokerAddrByName), brokerName)
 }
 
 // FindBrokerAddressInSubscribe mocks base method
 func (m *MockNamesrvs) FindBrokerAddressInSubscribe(brokerName string, brokerId int64, onlyThisBroker bool) *FindBrokerResult {
+	m.ctrl.T.Helper()
 	ret := m.ctrl.Call(m, "FindBrokerAddressInSubscribe", brokerName, brokerId, onlyThisBroker)
 	ret0, _ := ret[0].(*FindBrokerResult)
 	return ret0
@@ -140,11 +155,13 @@ func (m *MockNamesrvs) FindBrokerAddressInSubscribe(brokerName string, brokerId
 
 // FindBrokerAddressInSubscribe indicates an expected call of FindBrokerAddressInSubscribe
 func (mr *MockNamesrvsMockRecorder) FindBrokerAddressInSubscribe(brokerName, brokerId, onlyThisBroker interface{}) *gomock.Call {
+	mr.mock.ctrl.T.Helper()
 	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "FindBrokerAddressInSubscribe", reflect.TypeOf((*MockNamesrvs)(nil).FindBrokerAddressInSubscribe), brokerName, brokerId, onlyThisBroker)
 }
 
 // FetchSubscribeMessageQueues mocks base method
 func (m *MockNamesrvs) FetchSubscribeMessageQueues(topic string) ([]*primitive.MessageQueue, error) {
+	m.ctrl.T.Helper()
 	ret := m.ctrl.Call(m, "FetchSubscribeMessageQueues", topic)
 	ret0, _ := ret[0].([]*primitive.MessageQueue)
 	ret1, _ := ret[1].(error)
@@ -153,11 +170,13 @@ func (m *MockNamesrvs) FetchSubscribeMessageQueues(topic string) ([]*primitive.M
 
 // FetchSubscribeMessageQueues indicates an expected call of FetchSubscribeMessageQueues
 func (mr *MockNamesrvsMockRecorder) FetchSubscribeMessageQueues(topic interface{}) *gomock.Call {
+	mr.mock.ctrl.T.Helper()
 	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "FetchSubscribeMessageQueues", reflect.TypeOf((*MockNamesrvs)(nil).FetchSubscribeMessageQueues), topic)
 }
 
 // AddrList mocks base method
 func (m *MockNamesrvs) AddrList() []string {
+	m.ctrl.T.Helper()
 	ret := m.ctrl.Call(m, "AddrList")
 	ret0, _ := ret[0].([]string)
 	return ret0
@@ -165,5 +184,6 @@ func (m *MockNamesrvs) AddrList() []string {
 
 // AddrList indicates an expected call of AddrList
 func (mr *MockNamesrvsMockRecorder) AddrList() *gomock.Call {
+	mr.mock.ctrl.T.Helper()
 	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AddrList", reflect.TypeOf((*MockNamesrvs)(nil).AddrList))
 }
diff --git a/internal/namesrv.go b/internal/namesrv.go
index 9bbfc9c..a06a09e 100644
--- a/internal/namesrv.go
+++ b/internal/namesrv.go
@@ -54,7 +54,7 @@ type Namesrvs interface {
 
 	cleanOfflineBroker()
 
-	UpdateTopicRouteInfo(topic string) (routeData *TopicRouteData, changed bool)
+	UpdateTopicRouteInfo(topic string) (routeData *TopicRouteData, changed bool, err error)
 
 	FetchPublishMessageQueues(topic string) ([]*primitive.MessageQueue, error)
 
diff --git a/internal/route.go b/internal/route.go
index d3a5f01..8270a8c 100644
--- a/internal/route.go
+++ b/internal/route.go
@@ -112,23 +112,45 @@ func (info *TopicPublishInfo) fetchQueueIndex() int {
 	return int(qIndex) % length
 }
 
-func (s *namesrvs) UpdateTopicRouteInfo(topic string) (*TopicRouteData, bool) {
+func (s *namesrvs) UpdateTopicRouteInfo(topic string) (*TopicRouteData, bool, error) {
+	return s.UpdateTopicRouteInfoWithDefault(topic, "", 0)
+}
+
+func (s *namesrvs) UpdateTopicRouteInfoWithDefault(topic string, defaultTopic string, defaultQueueNum int) (*TopicRouteData, bool, error) {
 	s.lockNamesrv.Lock()
 	defer s.lockNamesrv.Unlock()
 
-	routeData, err := s.queryTopicRouteInfoFromServer(topic)
+	var (
+		routeData *TopicRouteData
+		err       error
+	)
+
+	t := topic
+	if len(defaultTopic) > 0 {
+		t = defaultTopic
+	}
+	routeData, err = s.queryTopicRouteInfoFromServer(t)
+
 	if err != nil {
 		rlog.Warning("query topic route from server error", map[string]interface{}{
 			rlog.LogKeyUnderlayError: err,
 		})
-		return nil, false
 	}
 
 	if routeData == nil {
 		rlog.Warning("queryTopicRouteInfoFromServer return nil", map[string]interface{}{
 			rlog.LogKeyTopic: topic,
 		})
-		return nil, false
+		return nil, false, err
+	}
+
+	if len(defaultTopic) > 0 {
+		for _, q := range routeData.QueueDataList {
+			if q.ReadQueueNums > defaultQueueNum {
+				q.ReadQueueNums = defaultQueueNum
+				q.WriteQueueNums = defaultQueueNum
+			}
+		}
 	}
 
 	oldRouteData, exist := s.routeDataMap.Load(topic)
@@ -150,7 +172,7 @@ func (s *namesrvs) UpdateTopicRouteInfo(topic string) (*TopicRouteData, bool) {
 		}
 	}
 
-	return routeData.clone(), changed
+	return routeData.clone(), changed, nil
 }
 
 func (s *namesrvs) AddBroker(routeData *TopicRouteData) {
@@ -330,13 +352,13 @@ func (s *namesrvs) queryTopicRouteInfoFromServer(topic string) (*TopicRouteData,
 		rlog.Error("connect to namesrv failed.", map[string]interface{}{
 			"namesrv": s,
 		})
-		return nil, err
+		return nil, primitive.NewRemotingErr(err.Error())
 	}
 
 	switch response.Code {
 	case ResSuccess:
 		if response.Body == nil {
-			return nil, errors.New(response.Remark)
+			return nil, primitive.NewMQClientErr(response.Code, response.Remark)
 		}
 		routeData := &TopicRouteData{}
 
@@ -351,7 +373,7 @@ func (s *namesrvs) queryTopicRouteInfoFromServer(topic string) (*TopicRouteData,
 	case ResTopicNotExist:
 		return nil, ErrTopicNotExist
 	default:
-		return nil, errors.New(response.Remark)
+		return nil, primitive.NewMQClientErr(response.Code, response.Remark)
 	}
 }
 
diff --git a/primitive/errors.go b/primitive/errors.go
index 5a53a9f..9b1a88e 100644
--- a/primitive/errors.go
+++ b/primitive/errors.go
@@ -36,3 +36,33 @@ type MQBrokerErr struct {
 func (e MQBrokerErr) Error() string {
 	return "CODE: " + strconv.Itoa(int(e.ResponseCode)) + "  DESC: " + e.ErrorMessage
 }
+
+func NewRemotingErr(s string) error {
+	return &RemotingErr{s: s}
+}
+
+type RemotingErr struct {
+	s string
+}
+
+func (e *RemotingErr) Error() string {
+	return e.s
+}
+
+func NewMQClientErr(code int16, msg string) error {
+	return &MQClientErr{code: code, msg: msg}
+}
+
+type MQClientErr struct {
+	code int16
+	msg  string
+}
+
+func (e MQClientErr) Error() string {
+	return "CODE: " + strconv.Itoa(int(e.code)) + "  DESC: " + e.msg
+}
+
+func IsRemotingErr(err error) bool {
+	_, ok := err.(*RemotingErr)
+	return ok
+}
diff --git a/producer/option.go b/producer/option.go
index 6fc4d25..ae5d51f 100644
--- a/producer/option.go
+++ b/producer/option.go
@@ -26,9 +26,11 @@ import (
 
 func defaultProducerOptions() producerOptions {
 	opts := producerOptions{
-		ClientOptions:  internal.DefaultClientOptions(),
-		Selector:       NewHashQueueSelector(),
-		SendMsgTimeout: 3 * time.Second,
+		ClientOptions:         internal.DefaultClientOptions(),
+		Selector:              NewRoundRobinQueueSelector(),
+		SendMsgTimeout:        3 * time.Second,
+		DefaultTopicQueueNums: 4,
+		CreateTopicKey:        "TBW102",
 	}
 	opts.ClientOptions.GroupName = "DEFAULT_CONSUMER"
 	return opts
@@ -36,8 +38,11 @@ func defaultProducerOptions() producerOptions {
 
 type producerOptions struct {
 	internal.ClientOptions
-	Selector       QueueSelector
-	SendMsgTimeout time.Duration
+	Selector              QueueSelector
+	SendMsgTimeout        time.Duration
+	DefaultTopicQueueNums int
+	CreateTopicKey        string // "TBW102" is created at the broker when isAutoCreateTopicEnable is set. When a topic has not been created
+	// and the broker has isAutoCreateTopicEnable turned on, the topic is created using the "TBW102" config.
 }
 
 type Option func(*producerOptions)
@@ -109,3 +114,15 @@ func WithCredentials(c primitive.Credentials) Option {
 		options.ClientOptions.Credentials = c
 	}
 }
+
+func WithDefaultTopicQueueNums(queueNum int) Option {
+	return func(options *producerOptions) {
+		options.DefaultTopicQueueNums = queueNum
+	}
+}
+
+func WithCreateTopicKey(topic string) Option {
+	return func(options *producerOptions) {
+		options.CreateTopicKey = topic
+	}
+}
diff --git a/producer/producer.go b/producer/producer.go
index 5503abf..edddf90 100644
--- a/producer/producer.go
+++ b/producer/producer.go
@@ -342,7 +342,16 @@ func (p *defaultProducer) selectMessageQueue(msg *primitive.Message) *primitive.
 
 	v, exist := p.publishInfo.Load(topic)
 	if !exist {
-		data, changed := p.options.Namesrv.UpdateTopicRouteInfo(topic)
+		data, changed, err := p.options.Namesrv.UpdateTopicRouteInfo(topic)
+		if err != nil && primitive.IsRemotingErr(err) {
+			return nil
+		}
+		p.client.UpdatePublishInfo(topic, data, changed)
+		v, exist = p.publishInfo.Load(topic)
+	}
+
+	if !exist {
+		data, changed, _ := p.options.Namesrv.UpdateTopicRouteInfoWithDefault(topic, p.options.CreateTopicKey, p.options.DefaultTopicQueueNums)
 		p.client.UpdatePublishInfo(topic, data, changed)
 		v, exist = p.publishInfo.Load(topic)
 	}