You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by di...@apache.org on 2020/02/24 02:51:38 UTC
[rocketmq-client-go] branch native updated: feat(producer): auto
create topic for producer (#416)
This is an automated email from the ASF dual-hosted git repository.
dinglei pushed a commit to branch native
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-go.git
The following commit(s) were added to refs/heads/native by this push:
new 1ade23a feat(producer): auto create topic for producer (#416)
1ade23a is described below
commit 1ade23a0854642a561d224dfd4ed9f4b8beeb6c0
Author: xujianhai666 <52...@users.noreply.github.com>
AuthorDate: Mon Feb 24 10:51:28 2020 +0800
feat(producer): auto create topic for producer (#416)
Closes #415
---
examples/producer/simple/main.go | 12 +++++++++---
internal/client.go | 4 ++--
internal/mock_namesrv.go | 32 ++++++++++++++++++++++++++------
internal/namesrv.go | 2 +-
internal/route.go | 38 ++++++++++++++++++++++++++++++--------
primitive/errors.go | 30 ++++++++++++++++++++++++++++++
producer/option.go | 27 ++++++++++++++++++++++-----
producer/producer.go | 11 ++++++++++-
8 files changed, 130 insertions(+), 26 deletions(-)
diff --git a/examples/producer/simple/main.go b/examples/producer/simple/main.go
index 63bc5a1..08b7682 100644
--- a/examples/producer/simple/main.go
+++ b/examples/producer/simple/main.go
@@ -21,6 +21,7 @@ import (
"context"
"fmt"
"os"
+ "strconv"
"github.com/apache/rocketmq-client-go/v2"
"github.com/apache/rocketmq-client-go/v2/primitive"
@@ -38,9 +39,14 @@ func main() {
fmt.Printf("start producer error: %s", err.Error())
os.Exit(1)
}
- for i := 0; i < 1000; i++ {
- res, err := p.SendSync(context.Background(), primitive.NewMessage("test",
- []byte("Hello RocketMQ Go Client!")))
+ topic := "test"
+
+ for i := 0; i < 10; i++ {
+ msg := &primitive.Message{
+ Topic: topic,
+ Body: []byte("Hello RocketMQ Go Client! " + strconv.Itoa(i)),
+ }
+ res, err := p.SendSync(context.Background(), msg)
if err != nil {
fmt.Printf("send message error: %s\n", err)
diff --git a/internal/client.go b/internal/client.go
index f810b3a..72debb0 100644
--- a/internal/client.go
+++ b/internal/client.go
@@ -499,7 +499,7 @@ func (c *rmqClient) UpdateTopicRouteInfo() {
return true
})
for topic := range publishTopicSet {
- data, changed := c.namesrvs.UpdateTopicRouteInfo(topic)
+ data, changed, _ := c.namesrvs.UpdateTopicRouteInfo(topic)
c.UpdatePublishInfo(topic, data, changed)
}
@@ -514,7 +514,7 @@ func (c *rmqClient) UpdateTopicRouteInfo() {
})
for topic := range subscribedTopicSet {
- data, changed := c.namesrvs.UpdateTopicRouteInfo(topic)
+ data, changed, _ := c.namesrvs.UpdateTopicRouteInfo(topic)
c.updateSubscribeInfo(topic, data, changed)
}
}
diff --git a/internal/mock_namesrv.go b/internal/mock_namesrv.go
index a7a0842..7f9bfd6 100644
--- a/internal/mock_namesrv.go
+++ b/internal/mock_namesrv.go
@@ -21,11 +21,10 @@ limitations under the License.
package internal
import (
- "reflect"
+ reflect "reflect"
- "github.com/golang/mock/gomock"
-
- "github.com/apache/rocketmq-client-go/v2/primitive"
+ primitive "github.com/apache/rocketmq-client-go/v2/primitive"
+ gomock "github.com/golang/mock/gomock"
)
// MockNamesrvs is a mock of Namesrvs interface
@@ -53,49 +52,59 @@ func (m *MockNamesrvs) EXPECT() *MockNamesrvsMockRecorder {
// UpdateNameServerAddress mocks base method
func (m *MockNamesrvs) UpdateNameServerAddress(nameServerDomain, instanceName string) {
+ m.ctrl.T.Helper()
m.ctrl.Call(m, "UpdateNameServerAddress", nameServerDomain, instanceName)
}
// UpdateNameServerAddress indicates an expected call of UpdateNameServerAddress
func (mr *MockNamesrvsMockRecorder) UpdateNameServerAddress(nameServerDomain, instanceName interface{}) *gomock.Call {
+ mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "UpdateNameServerAddress", reflect.TypeOf((*MockNamesrvs)(nil).UpdateNameServerAddress), nameServerDomain, instanceName)
}
// AddBroker mocks base method
func (m *MockNamesrvs) AddBroker(routeData *TopicRouteData) {
+ m.ctrl.T.Helper()
m.ctrl.Call(m, "AddBroker", routeData)
}
// AddBroker indicates an expected call of AddBroker
func (mr *MockNamesrvsMockRecorder) AddBroker(routeData interface{}) *gomock.Call {
+ mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AddBroker", reflect.TypeOf((*MockNamesrvs)(nil).AddBroker), routeData)
}
// cleanOfflineBroker mocks base method
func (m *MockNamesrvs) cleanOfflineBroker() {
+ m.ctrl.T.Helper()
m.ctrl.Call(m, "cleanOfflineBroker")
}
// cleanOfflineBroker indicates an expected call of cleanOfflineBroker
func (mr *MockNamesrvsMockRecorder) cleanOfflineBroker() *gomock.Call {
+ mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "cleanOfflineBroker", reflect.TypeOf((*MockNamesrvs)(nil).cleanOfflineBroker))
}
// UpdateTopicRouteInfo mocks base method
-func (m *MockNamesrvs) UpdateTopicRouteInfo(topic string) (*TopicRouteData, bool) {
+func (m *MockNamesrvs) UpdateTopicRouteInfo(topic string) (*TopicRouteData, bool, error) {
+ m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "UpdateTopicRouteInfo", topic)
ret0, _ := ret[0].(*TopicRouteData)
ret1, _ := ret[1].(bool)
- return ret0, ret1
+ ret2, _ := ret[2].(error)
+ return ret0, ret1, ret2
}
// UpdateTopicRouteInfo indicates an expected call of UpdateTopicRouteInfo
func (mr *MockNamesrvsMockRecorder) UpdateTopicRouteInfo(topic interface{}) *gomock.Call {
+ mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "UpdateTopicRouteInfo", reflect.TypeOf((*MockNamesrvs)(nil).UpdateTopicRouteInfo), topic)
}
// FetchPublishMessageQueues mocks base method
func (m *MockNamesrvs) FetchPublishMessageQueues(topic string) ([]*primitive.MessageQueue, error) {
+ m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "FetchPublishMessageQueues", topic)
ret0, _ := ret[0].([]*primitive.MessageQueue)
ret1, _ := ret[1].(error)
@@ -104,11 +113,13 @@ func (m *MockNamesrvs) FetchPublishMessageQueues(topic string) ([]*primitive.Mes
// FetchPublishMessageQueues indicates an expected call of FetchPublishMessageQueues
func (mr *MockNamesrvsMockRecorder) FetchPublishMessageQueues(topic interface{}) *gomock.Call {
+ mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "FetchPublishMessageQueues", reflect.TypeOf((*MockNamesrvs)(nil).FetchPublishMessageQueues), topic)
}
// FindBrokerAddrByTopic mocks base method
func (m *MockNamesrvs) FindBrokerAddrByTopic(topic string) string {
+ m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "FindBrokerAddrByTopic", topic)
ret0, _ := ret[0].(string)
return ret0
@@ -116,11 +127,13 @@ func (m *MockNamesrvs) FindBrokerAddrByTopic(topic string) string {
// FindBrokerAddrByTopic indicates an expected call of FindBrokerAddrByTopic
func (mr *MockNamesrvsMockRecorder) FindBrokerAddrByTopic(topic interface{}) *gomock.Call {
+ mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "FindBrokerAddrByTopic", reflect.TypeOf((*MockNamesrvs)(nil).FindBrokerAddrByTopic), topic)
}
// FindBrokerAddrByName mocks base method
func (m *MockNamesrvs) FindBrokerAddrByName(brokerName string) string {
+ m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "FindBrokerAddrByName", brokerName)
ret0, _ := ret[0].(string)
return ret0
@@ -128,11 +141,13 @@ func (m *MockNamesrvs) FindBrokerAddrByName(brokerName string) string {
// FindBrokerAddrByName indicates an expected call of FindBrokerAddrByName
func (mr *MockNamesrvsMockRecorder) FindBrokerAddrByName(brokerName interface{}) *gomock.Call {
+ mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "FindBrokerAddrByName", reflect.TypeOf((*MockNamesrvs)(nil).FindBrokerAddrByName), brokerName)
}
// FindBrokerAddressInSubscribe mocks base method
func (m *MockNamesrvs) FindBrokerAddressInSubscribe(brokerName string, brokerId int64, onlyThisBroker bool) *FindBrokerResult {
+ m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "FindBrokerAddressInSubscribe", brokerName, brokerId, onlyThisBroker)
ret0, _ := ret[0].(*FindBrokerResult)
return ret0
@@ -140,11 +155,13 @@ func (m *MockNamesrvs) FindBrokerAddressInSubscribe(brokerName string, brokerId
// FindBrokerAddressInSubscribe indicates an expected call of FindBrokerAddressInSubscribe
func (mr *MockNamesrvsMockRecorder) FindBrokerAddressInSubscribe(brokerName, brokerId, onlyThisBroker interface{}) *gomock.Call {
+ mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "FindBrokerAddressInSubscribe", reflect.TypeOf((*MockNamesrvs)(nil).FindBrokerAddressInSubscribe), brokerName, brokerId, onlyThisBroker)
}
// FetchSubscribeMessageQueues mocks base method
func (m *MockNamesrvs) FetchSubscribeMessageQueues(topic string) ([]*primitive.MessageQueue, error) {
+ m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "FetchSubscribeMessageQueues", topic)
ret0, _ := ret[0].([]*primitive.MessageQueue)
ret1, _ := ret[1].(error)
@@ -153,11 +170,13 @@ func (m *MockNamesrvs) FetchSubscribeMessageQueues(topic string) ([]*primitive.M
// FetchSubscribeMessageQueues indicates an expected call of FetchSubscribeMessageQueues
func (mr *MockNamesrvsMockRecorder) FetchSubscribeMessageQueues(topic interface{}) *gomock.Call {
+ mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "FetchSubscribeMessageQueues", reflect.TypeOf((*MockNamesrvs)(nil).FetchSubscribeMessageQueues), topic)
}
// AddrList mocks base method
func (m *MockNamesrvs) AddrList() []string {
+ m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "AddrList")
ret0, _ := ret[0].([]string)
return ret0
@@ -165,5 +184,6 @@ func (m *MockNamesrvs) AddrList() []string {
// AddrList indicates an expected call of AddrList
func (mr *MockNamesrvsMockRecorder) AddrList() *gomock.Call {
+ mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AddrList", reflect.TypeOf((*MockNamesrvs)(nil).AddrList))
}
diff --git a/internal/namesrv.go b/internal/namesrv.go
index 9bbfc9c..a06a09e 100644
--- a/internal/namesrv.go
+++ b/internal/namesrv.go
@@ -54,7 +54,7 @@ type Namesrvs interface {
cleanOfflineBroker()
- UpdateTopicRouteInfo(topic string) (routeData *TopicRouteData, changed bool)
+ UpdateTopicRouteInfo(topic string) (routeData *TopicRouteData, changed bool, err error)
FetchPublishMessageQueues(topic string) ([]*primitive.MessageQueue, error)
diff --git a/internal/route.go b/internal/route.go
index d3a5f01..8270a8c 100644
--- a/internal/route.go
+++ b/internal/route.go
@@ -112,23 +112,45 @@ func (info *TopicPublishInfo) fetchQueueIndex() int {
return int(qIndex) % length
}
-func (s *namesrvs) UpdateTopicRouteInfo(topic string) (*TopicRouteData, bool) {
+func (s *namesrvs) UpdateTopicRouteInfo(topic string) (*TopicRouteData, bool, error) {
+ return s.UpdateTopicRouteInfoWithDefault(topic, "", 0)
+}
+
+func (s *namesrvs) UpdateTopicRouteInfoWithDefault(topic string, defaultTopic string, defaultQueueNum int) (*TopicRouteData, bool, error) {
s.lockNamesrv.Lock()
defer s.lockNamesrv.Unlock()
- routeData, err := s.queryTopicRouteInfoFromServer(topic)
+ var (
+ routeData *TopicRouteData
+ err error
+ )
+
+ t := topic
+ if len(defaultTopic) > 0 {
+ t = defaultTopic
+ }
+ routeData, err = s.queryTopicRouteInfoFromServer(t)
+
if err != nil {
rlog.Warning("query topic route from server error", map[string]interface{}{
rlog.LogKeyUnderlayError: err,
})
- return nil, false
}
if routeData == nil {
rlog.Warning("queryTopicRouteInfoFromServer return nil", map[string]interface{}{
rlog.LogKeyTopic: topic,
})
- return nil, false
+ return nil, false, err
+ }
+
+ if len(defaultTopic) > 0 {
+ for _, q := range routeData.QueueDataList {
+ if q.ReadQueueNums > defaultQueueNum {
+ q.ReadQueueNums = defaultQueueNum
+ q.WriteQueueNums = defaultQueueNum
+ }
+ }
}
oldRouteData, exist := s.routeDataMap.Load(topic)
@@ -150,7 +172,7 @@ func (s *namesrvs) UpdateTopicRouteInfo(topic string) (*TopicRouteData, bool) {
}
}
- return routeData.clone(), changed
+ return routeData.clone(), changed, nil
}
func (s *namesrvs) AddBroker(routeData *TopicRouteData) {
@@ -330,13 +352,13 @@ func (s *namesrvs) queryTopicRouteInfoFromServer(topic string) (*TopicRouteData,
rlog.Error("connect to namesrv failed.", map[string]interface{}{
"namesrv": s,
})
- return nil, err
+ return nil, primitive.NewRemotingErr(err.Error())
}
switch response.Code {
case ResSuccess:
if response.Body == nil {
- return nil, errors.New(response.Remark)
+ return nil, primitive.NewMQClientErr(response.Code, response.Remark)
}
routeData := &TopicRouteData{}
@@ -351,7 +373,7 @@ func (s *namesrvs) queryTopicRouteInfoFromServer(topic string) (*TopicRouteData,
case ResTopicNotExist:
return nil, ErrTopicNotExist
default:
- return nil, errors.New(response.Remark)
+ return nil, primitive.NewMQClientErr(response.Code, response.Remark)
}
}
diff --git a/primitive/errors.go b/primitive/errors.go
index 5a53a9f..9b1a88e 100644
--- a/primitive/errors.go
+++ b/primitive/errors.go
@@ -36,3 +36,33 @@ type MQBrokerErr struct {
func (e MQBrokerErr) Error() string {
return "CODE: " + strconv.Itoa(int(e.ResponseCode)) + " DESC: " + e.ErrorMessage
}
+
+func NewRemotingErr(s string) error {
+ return &RemotingErr{s: s}
+}
+
+type RemotingErr struct {
+ s string
+}
+
+func (e *RemotingErr) Error() string {
+ return e.s
+}
+
+func NewMQClientErr(code int16, msg string) error {
+ return &MQClientErr{code: code, msg: msg}
+}
+
+type MQClientErr struct {
+ code int16
+ msg string
+}
+
+func (e MQClientErr) Error() string {
+ return "CODE: " + strconv.Itoa(int(e.code)) + " DESC: " + e.msg
+}
+
+func IsRemotingErr(err error) bool {
+ _, ok := err.(*RemotingErr)
+ return ok
+}
diff --git a/producer/option.go b/producer/option.go
index 6fc4d25..ae5d51f 100644
--- a/producer/option.go
+++ b/producer/option.go
@@ -26,9 +26,11 @@ import (
func defaultProducerOptions() producerOptions {
opts := producerOptions{
- ClientOptions: internal.DefaultClientOptions(),
- Selector: NewHashQueueSelector(),
- SendMsgTimeout: 3 * time.Second,
+ ClientOptions: internal.DefaultClientOptions(),
+ Selector: NewRoundRobinQueueSelector(),
+ SendMsgTimeout: 3 * time.Second,
+ DefaultTopicQueueNums: 4,
+ CreateTopicKey: "TBW102",
}
opts.ClientOptions.GroupName = "DEFAULT_CONSUMER"
return opts
@@ -36,8 +38,11 @@ func defaultProducerOptions() producerOptions {
type producerOptions struct {
internal.ClientOptions
- Selector QueueSelector
- SendMsgTimeout time.Duration
+ Selector QueueSelector
+ SendMsgTimeout time.Duration
+ DefaultTopicQueueNums int
+ CreateTopicKey string // Template topic key, "TBW102" by default; the broker creates this topic itself when isAutoCreateTopicEnable is on. When a message's topic has not been created yet
+ // and the broker has isAutoCreateTopicEnable on, the new topic is created using the "TBW102" topic's config.
}
type Option func(*producerOptions)
@@ -109,3 +114,15 @@ func WithCredentials(c primitive.Credentials) Option {
options.ClientOptions.Credentials = c
}
}
+
+func WithDefaultTopicQueueNums(queueNum int) Option {
+ return func(options *producerOptions) {
+ options.DefaultTopicQueueNums = queueNum
+ }
+}
+
+func WithCreateTopicKey(topic string) Option {
+ return func(options *producerOptions) {
+ options.CreateTopicKey = topic
+ }
+}
diff --git a/producer/producer.go b/producer/producer.go
index 5503abf..edddf90 100644
--- a/producer/producer.go
+++ b/producer/producer.go
@@ -342,7 +342,16 @@ func (p *defaultProducer) selectMessageQueue(msg *primitive.Message) *primitive.
v, exist := p.publishInfo.Load(topic)
if !exist {
- data, changed := p.options.Namesrv.UpdateTopicRouteInfo(topic)
+ data, changed, err := p.options.Namesrv.UpdateTopicRouteInfo(topic)
+ if err != nil && primitive.IsRemotingErr(err) {
+ return nil
+ }
+ p.client.UpdatePublishInfo(topic, data, changed)
+ v, exist = p.publishInfo.Load(topic)
+ }
+
+ if !exist {
+ data, changed, _ := p.options.Namesrv.UpdateTopicRouteInfoWithDefault(topic, p.options.CreateTopicKey, p.options.DefaultTopicQueueNums)
p.client.UpdatePublishInfo(topic, data, changed)
v, exist = p.publishInfo.Load(topic)
}