You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pegasus.apache.org by ji...@apache.org on 2022/06/16 07:34:12 UTC

[incubator-pegasus] 17/25: update position

This is an automated email from the ASF dual-hosted git repository.

jiashuo pushed a commit to branch add-table-migrator
in repository https://gitbox.apache.org/repos/asf/incubator-pegasus.git

commit bacbd7468073e293655105934ac78d10179d392e
Author: jiashuo <js...@live.com>
AuthorDate: Thu Jun 16 11:41:26 2022 +0800

    update position
---
 admin-cli/executor/toolkits/tablemigrator/migrator.go | 16 ++++++++--------
 1 file changed, 8 insertions(+), 8 deletions(-)

diff --git a/admin-cli/executor/toolkits/tablemigrator/migrator.go b/admin-cli/executor/toolkits/tablemigrator/migrator.go
index 2954ba4e..da4fe650 100644
--- a/admin-cli/executor/toolkits/tablemigrator/migrator.go
+++ b/admin-cli/executor/toolkits/tablemigrator/migrator.go
@@ -32,7 +32,7 @@ func MigrateTable(client *executor.Client, table string, metaProxyZkAddrs string
 	//3. check un-confirm decree if less 5k
 	toolkits.LogInfo("check un-confirm decree if less 5k")
 	nodes := client.Nodes.GetAllNodes(session.NodeTypeReplica)
-	var perfSessions []*aggregate.PerfSession
+	perfSessions := make(map[string]*aggregate.PerfSession)
 	for _, n := range nodes {
 		if n.Session() == nil {
 			return fmt.Errorf("init node failed = %s", n.TCPAddr())
@@ -44,7 +44,7 @@ func MigrateTable(client *executor.Client, table string, metaProxyZkAddrs string
 		if perf.NodeSession == nil {
 			return fmt.Errorf("session err, node=%s", n.TCPAddr())
 		}
-		perfSessions = append(perfSessions, perf)
+		perfSessions[n.CombinedAddr()] = perf
 	}
 	err = checkUnConfirmedDecree(perfSessions, 5000)
 	if err != nil {
@@ -82,12 +82,12 @@ func MigrateTable(client *executor.Client, table string, metaProxyZkAddrs string
 	return nil
 }
 
-func checkUnConfirmedDecree(perfSessions []*aggregate.PerfSession, threshold float64) error {
+func checkUnConfirmedDecree(perfSessions map[string]*aggregate.PerfSession, threshold float64) error {
 	completed := false
 	for !completed {
 		completed = true
 		time.Sleep(10 * time.Second)
-		for _, perf := range perfSessions {
+		for addr, perf := range perfSessions {
 			stats, err := perf.GetPerfCounters("pending_mutations_count")
 			if err != nil {
 				return err
@@ -98,7 +98,7 @@ func checkUnConfirmedDecree(perfSessions []*aggregate.PerfSession, threshold flo
 
 			if stats[0].Value > threshold {
 				completed = false
-				toolkits.LogInfo(fmt.Sprintf("%s has pending_mutations_count %f", perf.Address, stats[0].Value))
+				toolkits.LogInfo(fmt.Sprintf("%s has pending_mutations_count %f", addr, stats[0].Value))
 				break
 			}
 		}
@@ -108,18 +108,18 @@ func checkUnConfirmedDecree(perfSessions []*aggregate.PerfSession, threshold flo
 	return nil
 }
 
-func checkDuplicatingQPS(perfSessions []*aggregate.PerfSession, tableID int32) error {
+func checkDuplicatingQPS(perfSessions map[string]*aggregate.PerfSession, tableID int32) error {
 	completed := false
 	counter := fmt.Sprintf("duplicate_qps@%d", tableID)
 	for !completed {
 		completed = true
 		time.Sleep(10 * time.Second)
-		for _, perf := range perfSessions {
+		for addr, perf := range perfSessions {
 			stats := util.GetPartitionStat(perf, counter)
 			for gpid, qps := range stats {
 				if qps > 0 {
 					completed = false
-					toolkits.LogInfo(fmt.Sprintf("%s[%s] still sending pending mutation %f", perf.Address, gpid, qps))
+					toolkits.LogInfo(fmt.Sprintf("%s[%s] still sending pending mutation %f", addr, gpid, qps))
 					break
 				}
 			}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pegasus.apache.org
For additional commands, e-mail: commits-help@pegasus.apache.org