You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by di...@apache.org on 2021/08/18 02:58:47 UTC
[rocketmq-client-go] branch master updated: fix the bug caused in HA cluster when master broker node is down (#714)
This is an automated email from the ASF dual-hosted git repository.
dinglei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-go.git
The following commit(s) were added to refs/heads/master by this push:
new 181eb30 fix the bug caused in HA cluster when master broker node is down (#714)
181eb30 is described below
commit 181eb30879c7c037b515d4b6c7a5e7a9b4eaefef
Author: guyinyou <36...@users.noreply.github.com>
AuthorDate: Wed Aug 18 10:58:40 2021 +0800
fix the bug caused in HA cluster when master broker node is down (#714)
---
consumer/push_consumer.go | 6 +++++-
internal/client.go | 3 +++
internal/namesrv.go | 15 +++++++++++++++
internal/route.go | 28 ++++++++++++++++++++++++++++
producer/producer.go | 5 ++++-
5 files changed, 55 insertions(+), 2 deletions(-)
diff --git a/consumer/push_consumer.go b/consumer/push_consumer.go
index c84ce84..e3e0d32 100644
--- a/consumer/push_consumer.go
+++ b/consumer/push_consumer.go
@@ -99,8 +99,12 @@ func NewPushConsumer(opts ...Option) (*pushConsumer, error) {
consumeOrderly: defaultOpts.ConsumeOrderly,
fromWhere: defaultOpts.FromWhere,
allocate: defaultOpts.Strategy,
- option: defaultOpts,
namesrv: srvs,
+ option: defaultOpts,
+ }
+ dc.option.ClientOptions.Namesrv, err = internal.GetNamesrv(dc.client.ClientID())
+ if err != nil {
+ return nil, err
}
p := &pushConsumer{
diff --git a/internal/client.go b/internal/client.go
index 3a09ea8..4de3b93 100644
--- a/internal/client.go
+++ b/internal/client.go
@@ -186,6 +186,9 @@ func GetOrNewRocketMQClient(option ClientOptions, callbackCh chan interface{}) R
done: make(chan struct{}),
}
actual, loaded := clientMap.LoadOrStore(client.ClientID(), client)
+ client.namesrvs = GetOrSetNamesrv(client.ClientID(), client.namesrvs)
+ client.namesrvs.bundleClient = actual.(*rmqClient)
+ client.option.Namesrv = client.namesrvs
if !loaded {
client.remoteClient.RegisterRequestFunc(ReqNotifyConsumerIdsChanged, func(req *remote.RemotingCommand, addr net.Addr) *remote.RemotingCommand {
rlog.Info("receive broker's notification to consumer group", map[string]interface{}{
diff --git a/internal/namesrv.go b/internal/namesrv.go
index a47bbc1..7776651 100644
--- a/internal/namesrv.go
+++ b/internal/namesrv.go
@@ -19,6 +19,7 @@ package internal
import (
"errors"
+ "fmt"
"regexp"
"strings"
"sync"
@@ -76,6 +77,8 @@ type namesrvs struct {
// brokerName -> *BrokerData
brokerAddressesMap sync.Map
+ bundleClient *rmqClient
+
// brokerName -> map[string]int32: brokerAddr -> version
brokerVersionMap map[string]map[string]int32
// lock for broker version read/write
@@ -92,9 +95,21 @@ type namesrvs struct {
}
var _ Namesrvs = (*namesrvs)(nil)
+var namesrvMap sync.Map
// NewNamesrv init Namesrv from namesrv addr string.
// addr primitive.NamesrvAddr
+func GetOrSetNamesrv(clientId string, namesrv *namesrvs) *namesrvs {
+ actual, _ := namesrvMap.LoadOrStore(clientId, namesrv)
+ return actual.(*namesrvs)
+}
+func GetNamesrv(clientId string) (*namesrvs, error) {
+ actual, ok := namesrvMap.Load(clientId)
+ if !ok {
+ return nil, fmt.Errorf("the namesrv in instanceName [%s] not found", clientId)
+ }
+ return actual.(*namesrvs), nil
+}
func NewNamesrv(resolver primitive.NsResolver) (*namesrvs, error) {
addr := resolver.Resolve()
if len(addr) == 0 {
diff --git a/internal/route.go b/internal/route.go
index 66be96d..3676cb4 100644
--- a/internal/route.go
+++ b/internal/route.go
@@ -161,6 +161,34 @@ func (s *namesrvs) UpdateTopicRouteInfoWithDefault(topic string, defaultTopic st
}
if changed {
+ if s.bundleClient != nil {
+ s.bundleClient.producerMap.Range(func(key, value interface{}) bool {
+ p := value.(InnerProducer)
+ updated := changed
+ if !updated {
+ updated = p.IsPublishTopicNeedUpdate(topic)
+ }
+ if updated {
+ publishInfo := s.bundleClient.namesrvs.routeData2PublishInfo(topic, routeData)
+ publishInfo.HaveTopicRouterInfo = true
+ p.UpdateTopicPublishInfo(topic, publishInfo)
+ }
+ return true
+ })
+ s.bundleClient.consumerMap.Range(func(key, value interface{}) bool {
+ consumer := value.(InnerConsumer)
+ updated := changed
+ if !updated {
+ updated = consumer.IsSubscribeTopicNeedUpdate(topic)
+ }
+ if updated {
+ consumer.UpdateTopicSubscribeInfo(topic, routeData2SubscribeInfo(topic, routeData))
+ }
+
+ return true
+ })
+ }
+
s.routeDataMap.Store(topic, routeData)
rlog.Info("the topic route info changed", map[string]interface{}{
rlog.LogKeyTopic: topic,
diff --git a/producer/producer.go b/producer/producer.go
index 8ebb660..ef4ea25 100644
--- a/producer/producer.go
+++ b/producer/producer.go
@@ -72,7 +72,10 @@ func NewDefaultProducer(opts ...Option) (*defaultProducer, error) {
options: defaultOpts,
}
producer.client = internal.GetOrNewRocketMQClient(defaultOpts.ClientOptions, producer.callbackCh)
-
+ producer.options.ClientOptions.Namesrv, err = internal.GetNamesrv(producer.client.ClientID())
+ if err != nil {
+ return nil, err
+ }
producer.interceptor = primitive.ChainInterceptors(producer.options.Interceptors...)
return producer, nil