You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by we...@apache.org on 2021/07/01 13:14:46 UTC
[rocketmq-client-go] branch master updated: [ISSUE #631] Support
Consuming from Slave
This is an automated email from the ASF dual-hosted git repository.
wenfeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-go.git
The following commit(s) were added to refs/heads/master by this push:
new 76dd050 [ISSUE #631] Support Consuming from Slave
76dd050 is described below
commit 76dd050fbbd06d59c5fb3e8fe5ab187521167436
Author: 张旭 <ma...@gmail.com>
AuthorDate: Thu Jul 1 21:14:40 2021 +0800
[ISSUE #631] Support Consuming from Slave
Co-authored-by: zhangxu16 <zh...@xiaomi.com>
---
internal/route.go | 48 ++++++++++++++++++++++++++++++++-------------
internal/route_test.go | 53 ++++++++++++++++++++++++++++++++++++++++++++++++++
2 files changed, 87 insertions(+), 14 deletions(-)
diff --git a/internal/route.go b/internal/route.go
index 09b6e53..7b27c9d 100644
--- a/internal/route.go
+++ b/internal/route.go
@@ -219,10 +219,12 @@ func (s *namesrvs) FindBrokerAddrByName(brokerName string) string {
func (s *namesrvs) FindBrokerAddressInSubscribe(brokerName string, brokerId int64, onlyThisBroker bool) *FindBrokerResult {
var (
brokerAddr = ""
- //slave = false
- //found = false
+ slave = false
+ found = false
)
+ rlog.Debug("broker id "+strconv.FormatInt(brokerId, 10), nil)
+
v, exist := s.brokerAddressesMap.Load(brokerName)
if !exist {
@@ -234,22 +236,40 @@ func (s *namesrvs) FindBrokerAddressInSubscribe(brokerName string, brokerId int6
}
brokerAddr = data.BrokerAddresses[brokerId]
- //for k, v := range data.BrokerAddresses {
- // if v != "" {
- // found = true
- // if k != MasterId {
- // slave = true
- // }
- // brokerAddr = v
- // break
- // }
- //}
+ slave = brokerId != MasterId
+ if brokerAddr != "" {
+ found = true
+ }
+
+ // not found && read from slave, try again use next brokerId
+ if !found && slave {
+ rlog.Debug("Not found broker addr and slave "+strconv.FormatBool(slave), nil)
+ brokerAddr = data.BrokerAddresses[brokerId+1]
+ found = brokerAddr != ""
+ }
+
+ // still not found && cloud use other broker addr, find anyone in BrokerAddresses
+ if !found && !onlyThisBroker {
+ rlog.Debug("STILL Not found broker addr", nil)
+ for k, v := range data.BrokerAddresses {
+ if v != "" {
+ brokerAddr = v
+ found = true
+ slave = k != MasterId
+ break
+ }
+ }
+ }
+
+ if found {
+ rlog.Debug("Find broker addr "+brokerAddr, nil)
+ }
var result *FindBrokerResult
- if brokerAddr != "" {
+ if found {
result = &FindBrokerResult{
BrokerAddr: brokerAddr,
- Slave: brokerId != 0,
+ Slave: slave,
BrokerVersion: s.findBrokerVersion(brokerName, brokerAddr),
}
}
diff --git a/internal/route_test.go b/internal/route_test.go
index c9b65f0..ded7780 100644
--- a/internal/route_test.go
+++ b/internal/route_test.go
@@ -82,3 +82,56 @@ func TestAddBrokerVersion(t *testing.T) {
v = s.findBrokerVersion("b1", "addr2")
assert.Equal(t, v, int32(0))
}
+
+func TestFindBrokerAddressInSubscribe(t *testing.T) {
+ s := &namesrvs{}
+ s.brokerVersionMap = make(map[string]map[string]int32, 0)
+ s.brokerLock = new(sync.RWMutex)
+
+ brokerDataRaft1 := &BrokerData{
+ Cluster: "cluster",
+ BrokerName: "raft01",
+ BrokerAddresses: map[int64]string{
+ 0: "127.0.0.1:10911",
+ 1: "127.0.0.1:10912",
+ 2: "127.0.0.1:10913",
+ },
+ }
+ s.brokerAddressesMap.Store(brokerDataRaft1.BrokerName, brokerDataRaft1)
+ brokerDataRaft2 := &BrokerData{
+ Cluster: "cluster",
+ BrokerName: "raft02",
+ BrokerAddresses: map[int64]string{
+ 0: "127.0.0.1:10911",
+ 2: "127.0.0.1:10912",
+ 3: "127.0.0.1:10913",
+ },
+ }
+ s.brokerAddressesMap.Store(brokerDataRaft2.BrokerName, brokerDataRaft2)
+
+ Convey("Request master broker", t, func() {
+ result := s.FindBrokerAddressInSubscribe(brokerDataRaft1.BrokerName, 0, false)
+ assert.NotNil(t, result)
+ assert.Equal(t, result.BrokerAddr, brokerDataRaft1.BrokerAddresses[0])
+ assert.Equal(t, result.Slave, false)
+ })
+
+ Convey("Request slave broker from normal broker group", t, func() {
+ result := s.FindBrokerAddressInSubscribe(brokerDataRaft1.BrokerName, 1, false)
+ assert.NotNil(t, result)
+ assert.Equal(t, result.BrokerAddr, brokerDataRaft1.BrokerAddresses[1])
+ assert.Equal(t, result.Slave, true)
+ })
+
+ Convey("Request slave broker from non normal broker group", t, func() {
+ result := s.FindBrokerAddressInSubscribe(brokerDataRaft2.BrokerName, 1, false)
+ assert.NotNil(t, result)
+ assert.Equal(t, result.BrokerAddr, brokerDataRaft2.BrokerAddresses[2])
+ assert.Equal(t, result.Slave, true)
+ })
+
+ Convey("Request not exist broker", t, func() {
+ result := s.FindBrokerAddressInSubscribe(brokerDataRaft1.BrokerName, 4, false)
+ assert.NotNil(t, result)
+ })
+}