You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by ka...@apache.org on 2023/02/03 09:09:39 UTC

[rocketmq-clients] branch master updated: fix telemeter logic (#348)

This is an automated email from the ASF dual-hosted git repository.

kaili pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git


The following commit(s) were added to refs/heads/master by this push:
     new 70d43828 fix telemeter logic (#348)
70d43828 is described below

commit 70d438283aa7d87cc38fdb1de64842761d700858
Author: guyinyou <36...@users.noreply.github.com>
AuthorDate: Fri Feb 3 17:09:33 2023 +0800

    fix telemeter logic (#348)
    
    Co-authored-by: guyinyou <gu...@alibaba-inc.com>
---
 golang/client.go | 58 +++++++++++++++++++++-----------------------------------
 1 file changed, 22 insertions(+), 36 deletions(-)

diff --git a/golang/client.go b/golang/client.go
index 41a95796..2b95f2bc 100644
--- a/golang/client.go
+++ b/golang/client.go
@@ -264,6 +264,22 @@ func (cli *defaultClient) getMessageQueues(ctx context.Context, topic string) ([
 	if err != nil {
 		return nil, err
 	}
+
+	// telemeter to all messageQueues
+	endpointsSet := make(map[string]bool)
+	for _, messageQueue := range route {
+		for _, address := range messageQueue.GetBroker().GetEndpoints().GetAddresses() {
+			target := utils.ParseAddress(address)
+			if _, ok := endpointsSet[target]; ok {
+				continue
+			}
+			endpointsSet[target] = true
+			if err = cli.mustSyncSettingsToTargert(target); err != nil {
+				return nil, err
+			}
+		}
+	}
+
 	cli.router.Store(topic, route)
 	return route, nil
 }
@@ -299,14 +315,7 @@ func (cli *defaultClient) getQueryRouteRequest(topic string) *v2.QueryRouteReque
 func (cli *defaultClient) getTotalTargets() []string {
 	endpoints := make([]string, 0)
 	endpointsSet := make(map[string]bool)
-	for _, address := range cli.accessPoint.GetAddresses() {
-		target := utils.ParseAddress(address)
-		if _, ok := endpointsSet[target]; ok {
-			continue
-		}
-		endpointsSet[target] = true
-		endpoints = append(endpoints, target)
-	}
+
 	cli.router.Range(func(_, v interface{}) bool {
 		messageQueues := v.([]*v2.MessageQueue)
 		for _, messageQueue := range messageQueues {
@@ -388,16 +397,9 @@ func (cli *defaultClient) trySyncSettings() {
 	}
 }
 
-func (cli *defaultClient) mustSyncSettings() error {
-	cli.log.Info("start mustSyncSettings")
+func (cli *defaultClient) mustSyncSettingsToTargert(target string) error {
 	command := cli.getSettingsCommand()
-	targets := cli.getTotalTargets()
-	for _, target := range targets {
-		if err := cli.telemeter(target, command); err != nil {
-			return err
-		}
-	}
-	return nil
+	return cli.telemeter(target, command)
 }
 
 func (cli *defaultClient) telemeter(target string, command *v2.TelemetryCommand) error {
@@ -423,27 +425,11 @@ func (cli *defaultClient) startUp() error {
 	cm.RegisterClient(cli)
 	cli.clientManager = cm
 	for _, topic := range cli.initTopics {
-		maxAttempts := int(cli.settings.GetRetryPolicy().GetMaxAttempts())
-		for i := 0; i < maxAttempts; i++ {
-			_, err := cli.getMessageQueues(context.Background(), topic)
-			if err != nil {
-				if i == maxAttempts-1 {
-					return fmt.Errorf("failed to get topic route data result from remote during client startup, clientId=%s, topics=%v, err=%v", cli.clientID, cli.initTopics, err)
-				} else {
-					cli.log.Errorf("failed to get topic route data result from remote during client startup, topics=%v, err=%v. retry attempt=%d", cli.initTopics, err, i)
-					time.Sleep(time.Second * 3)
-				}
-			} else {
-				if i > 0 {
-					cli.log.Infof("retry to get topic route data success, attempts=%d\n", i)
-				}
-				break
-			}
+		_, err := cli.getMessageQueues(context.Background(), topic)
+		if err != nil {
+			return fmt.Errorf("failed to get topic route data result from remote during client startup, clientId=%s, topics=%v, err=%v", cli.clientID, cli.initTopics, err)
 		}
 	}
-	if err := cli.mustSyncSettings(); err != nil {
-		return err
-	}
 	f := func() {
 		cli.router.Range(func(k, v interface{}) bool {
 			topic := k.(string)