You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by ka...@apache.org on 2023/02/03 09:09:39 UTC
[rocketmq-clients] branch master updated: fix telemeter logic (#348)
This is an automated email from the ASF dual-hosted git repository.
kaili pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git
The following commit(s) were added to refs/heads/master by this push:
new 70d43828 fix telemeter logic (#348)
70d43828 is described below
commit 70d438283aa7d87cc38fdb1de64842761d700858
Author: guyinyou <36...@users.noreply.github.com>
AuthorDate: Fri Feb 3 17:09:33 2023 +0800
fix telemeter logic (#348)
Co-authored-by: guyinyou <gu...@alibaba-inc.com>
---
golang/client.go | 58 +++++++++++++++++++++-----------------------------------
1 file changed, 22 insertions(+), 36 deletions(-)
diff --git a/golang/client.go b/golang/client.go
index 41a95796..2b95f2bc 100644
--- a/golang/client.go
+++ b/golang/client.go
@@ -264,6 +264,22 @@ func (cli *defaultClient) getMessageQueues(ctx context.Context, topic string) ([
if err != nil {
return nil, err
}
+
+ // sync client settings (via telemetry) to each distinct broker endpoint found in the route
+ endpointsSet := make(map[string]bool)
+ for _, messageQueue := range route {
+ for _, address := range messageQueue.GetBroker().GetEndpoints().GetAddresses() {
+ target := utils.ParseAddress(address)
+ if _, ok := endpointsSet[target]; ok {
+ continue
+ }
+ endpointsSet[target] = true
+ if err = cli.mustSyncSettingsToTargert(target); err != nil {
+ return nil, err
+ }
+ }
+ }
+
cli.router.Store(topic, route)
return route, nil
}
@@ -299,14 +315,7 @@ func (cli *defaultClient) getQueryRouteRequest(topic string) *v2.QueryRouteReque
func (cli *defaultClient) getTotalTargets() []string {
endpoints := make([]string, 0)
endpointsSet := make(map[string]bool)
- for _, address := range cli.accessPoint.GetAddresses() {
- target := utils.ParseAddress(address)
- if _, ok := endpointsSet[target]; ok {
- continue
- }
- endpointsSet[target] = true
- endpoints = append(endpoints, target)
- }
+
cli.router.Range(func(_, v interface{}) bool {
messageQueues := v.([]*v2.MessageQueue)
for _, messageQueue := range messageQueues {
@@ -388,16 +397,9 @@ func (cli *defaultClient) trySyncSettings() {
}
}
-func (cli *defaultClient) mustSyncSettings() error {
- cli.log.Info("start mustSyncSettings")
+func (cli *defaultClient) mustSyncSettingsToTargert(target string) error {
command := cli.getSettingsCommand()
- targets := cli.getTotalTargets()
- for _, target := range targets {
- if err := cli.telemeter(target, command); err != nil {
- return err
- }
- }
- return nil
+ return cli.telemeter(target, command)
}
func (cli *defaultClient) telemeter(target string, command *v2.TelemetryCommand) error {
@@ -423,27 +425,11 @@ func (cli *defaultClient) startUp() error {
cm.RegisterClient(cli)
cli.clientManager = cm
for _, topic := range cli.initTopics {
- maxAttempts := int(cli.settings.GetRetryPolicy().GetMaxAttempts())
- for i := 0; i < maxAttempts; i++ {
- _, err := cli.getMessageQueues(context.Background(), topic)
- if err != nil {
- if i == maxAttempts-1 {
- return fmt.Errorf("failed to get topic route data result from remote during client startup, clientId=%s, topics=%v, err=%v", cli.clientID, cli.initTopics, err)
- } else {
- cli.log.Errorf("failed to get topic route data result from remote during client startup, topics=%v, err=%v. retry attempt=%d", cli.initTopics, err, i)
- time.Sleep(time.Second * 3)
- }
- } else {
- if i > 0 {
- cli.log.Infof("retry to get topic route data success, attempts=%d\n", i)
- }
- break
- }
+ _, err := cli.getMessageQueues(context.Background(), topic)
+ if err != nil {
+ return fmt.Errorf("failed to get topic route data result from remote during client startup, clientId=%s, topics=%v, err=%v", cli.clientID, cli.initTopics, err)
}
}
- if err := cli.mustSyncSettings(); err != nil {
- return err
- }
f := func() {
cli.router.Range(func(k, v interface{}) bool {
topic := k.(string)