You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by we...@apache.org on 2021/07/01 13:14:46 UTC

[rocketmq-client-go] branch master updated: [ISSUE #631] Support Consuming from Slave

This is an automated email from the ASF dual-hosted git repository.

wenfeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new 76dd050  [ISSUE #631] Support Consuming from Slave
76dd050 is described below

commit 76dd050fbbd06d59c5fb3e8fe5ab187521167436
Author: 张旭 <ma...@gmail.com>
AuthorDate: Thu Jul 1 21:14:40 2021 +0800

    [ISSUE #631] Support Consuming from Slave
    
    Co-authored-by: zhangxu16 <zh...@xiaomi.com>
---
 internal/route.go      | 48 ++++++++++++++++++++++++++++++++-------------
 internal/route_test.go | 53 ++++++++++++++++++++++++++++++++++++++++++++++++++
 2 files changed, 87 insertions(+), 14 deletions(-)

diff --git a/internal/route.go b/internal/route.go
index 09b6e53..7b27c9d 100644
--- a/internal/route.go
+++ b/internal/route.go
@@ -219,10 +219,12 @@ func (s *namesrvs) FindBrokerAddrByName(brokerName string) string {
 func (s *namesrvs) FindBrokerAddressInSubscribe(brokerName string, brokerId int64, onlyThisBroker bool) *FindBrokerResult {
 	var (
 		brokerAddr = ""
-		//slave      = false
-		//found      = false
+		slave      = false
+		found      = false
 	)
 
+	rlog.Debug("broker id "+strconv.FormatInt(brokerId, 10), nil)
+
 	v, exist := s.brokerAddressesMap.Load(brokerName)
 
 	if !exist {
@@ -234,22 +236,40 @@ func (s *namesrvs) FindBrokerAddressInSubscribe(brokerName string, brokerId int6
 	}
 
 	brokerAddr = data.BrokerAddresses[brokerId]
-	//for k, v := range data.BrokerAddresses {
-	//	if v != "" {
-	//		found = true
-	//		if k != MasterId {
-	//			slave = true
-	//		}
-	//		brokerAddr = v
-	//		break
-	//	}
-	//}
+	slave = brokerId != MasterId
+	if brokerAddr != "" {
+		found = true
+	}
+
+	// not found && read from slave, try again use next brokerId
+	if !found && slave {
+		rlog.Debug("Not found broker addr and slave "+strconv.FormatBool(slave), nil)
+		brokerAddr = data.BrokerAddresses[brokerId+1]
+		found = brokerAddr != ""
+	}
+
+	// still not found && cloud use other broker addr, find anyone in BrokerAddresses
+	if !found && !onlyThisBroker {
+		rlog.Debug("STILL Not found broker addr", nil)
+		for k, v := range data.BrokerAddresses {
+			if v != "" {
+				brokerAddr = v
+				found = true
+				slave = k != MasterId
+				break
+			}
+		}
+	}
+
+	if found {
+		rlog.Debug("Find broker addr "+brokerAddr, nil)
+	}
 
 	var result *FindBrokerResult
-	if brokerAddr != "" {
+	if found {
 		result = &FindBrokerResult{
 			BrokerAddr:    brokerAddr,
-			Slave:         brokerId != 0,
+			Slave:         slave,
 			BrokerVersion: s.findBrokerVersion(brokerName, brokerAddr),
 		}
 	}
diff --git a/internal/route_test.go b/internal/route_test.go
index c9b65f0..ded7780 100644
--- a/internal/route_test.go
+++ b/internal/route_test.go
@@ -82,3 +82,56 @@ func TestAddBrokerVersion(t *testing.T) {
 	v = s.findBrokerVersion("b1", "addr2")
 	assert.Equal(t, v, int32(0))
 }
+
+func TestFindBrokerAddressInSubscribe(t *testing.T) {
+	s := &namesrvs{}
+	s.brokerVersionMap = make(map[string]map[string]int32, 0)
+	s.brokerLock = new(sync.RWMutex)
+
+	brokerDataRaft1 := &BrokerData{
+		Cluster:    "cluster",
+		BrokerName: "raft01",
+		BrokerAddresses: map[int64]string{
+			0: "127.0.0.1:10911",
+			1: "127.0.0.1:10912",
+			2: "127.0.0.1:10913",
+		},
+	}
+	s.brokerAddressesMap.Store(brokerDataRaft1.BrokerName, brokerDataRaft1)
+	brokerDataRaft2 := &BrokerData{
+		Cluster:    "cluster",
+		BrokerName: "raft02",
+		BrokerAddresses: map[int64]string{
+			0: "127.0.0.1:10911",
+			2: "127.0.0.1:10912",
+			3: "127.0.0.1:10913",
+		},
+	}
+	s.brokerAddressesMap.Store(brokerDataRaft2.BrokerName, brokerDataRaft2)
+
+	Convey("Request master broker", t, func() {
+		result := s.FindBrokerAddressInSubscribe(brokerDataRaft1.BrokerName, 0, false)
+		assert.NotNil(t, result)
+		assert.Equal(t, result.BrokerAddr, brokerDataRaft1.BrokerAddresses[0])
+		assert.Equal(t, result.Slave, false)
+	})
+
+	Convey("Request slave broker from normal broker group", t, func() {
+		result := s.FindBrokerAddressInSubscribe(brokerDataRaft1.BrokerName, 1, false)
+		assert.NotNil(t, result)
+		assert.Equal(t, result.BrokerAddr, brokerDataRaft1.BrokerAddresses[1])
+		assert.Equal(t, result.Slave, true)
+	})
+
+	Convey("Request slave broker from non normal broker group", t, func() {
+		result := s.FindBrokerAddressInSubscribe(brokerDataRaft2.BrokerName, 1, false)
+		assert.NotNil(t, result)
+		assert.Equal(t, result.BrokerAddr, brokerDataRaft2.BrokerAddresses[2])
+		assert.Equal(t, result.Slave, true)
+	})
+
+	Convey("Request not exist broker", t, func() {
+		result := s.FindBrokerAddressInSubscribe(brokerDataRaft1.BrokerName, 4, false)
+		assert.NotNil(t, result)
+	})
+}