You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pegasus.apache.org by ji...@apache.org on 2022/06/16 07:34:13 UTC

[incubator-pegasus] 18/25: update position

This is an automated email from the ASF dual-hosted git repository.

jiashuo pushed a commit to branch add-table-migrator
in repository https://gitbox.apache.org/repos/asf/incubator-pegasus.git

commit 20a34edcae0d19d987f35009a5d0a331ae5c79b4
Author: jiashuo <js...@live.com>
AuthorDate: Thu Jun 16 12:06:23 2022 +0800

    update position
---
 admin-cli/cmd/table_migrator.go                       |  3 ++-
 admin-cli/executor/toolkits/tablemigrator/migrator.go | 15 ++++++++++-----
 2 files changed, 12 insertions(+), 6 deletions(-)

diff --git a/admin-cli/cmd/table_migrator.go b/admin-cli/cmd/table_migrator.go
index a32910f3..2821cc9e 100644
--- a/admin-cli/cmd/table_migrator.go
+++ b/admin-cli/cmd/table_migrator.go
@@ -18,11 +18,12 @@ func init() {
 				"the path of meatproxy, if you don't specify it, that is means user need manual-switch the table addrs")
 			f.String("c", "cluster", "", "target cluster name")
 			f.String("m", "meta", "", "target meta list")
+			f.Float64("p", "threshold", 100000, "pending mutation throshold when server will reject all write request")
 		},
 		Run: func(c *grumble.Context) error {
 			return tablemigrator.MigrateTable(pegasusClient, c.Flags.String("table"),
 				c.Flags.String("node"), c.Flags.String("root"),
-				c.Flags.String("cluster"), c.Flags.String("meta"))
+				c.Flags.String("cluster"), c.Flags.String("meta"), c.Flags.Float64("threshold"))
 		},
 	})
 }
diff --git a/admin-cli/executor/toolkits/tablemigrator/migrator.go b/admin-cli/executor/toolkits/tablemigrator/migrator.go
index da4fe650..b76df808 100644
--- a/admin-cli/executor/toolkits/tablemigrator/migrator.go
+++ b/admin-cli/executor/toolkits/tablemigrator/migrator.go
@@ -11,7 +11,12 @@ import (
 	"github.com/pegasus-kv/collector/aggregate"
 )
 
-func MigrateTable(client *executor.Client, table string, metaProxyZkAddrs string, metaProxyZkRoot string, targetCluster string, targetAddrs string) error {
+var pendingMutationThreshold = 100000.0
+
+func MigrateTable(client *executor.Client, table string, metaProxyZkAddrs string, metaProxyZkRoot string, targetCluster string, targetAddrs string, threshold float64) error {
+	pendingMutationThreshold = threshold
+	toolkits.LogInfo(fmt.Sprintf("set pendingMutationThreshold = %f means if the pending less the value will "+
+		"reject all write and ready to switch cluster", pendingMutationThreshold))
 	//1. check data version
 	toolkits.LogInfo("check data version")
 	version, err := executor.QueryReplicaDataVersion(client, table)
@@ -46,7 +51,7 @@ func MigrateTable(client *executor.Client, table string, metaProxyZkAddrs string
 		}
 		perfSessions[n.CombinedAddr()] = perf
 	}
-	err = checkUnConfirmedDecree(perfSessions, 5000)
+	err = checkUnConfirmedDecree(perfSessions)
 	if err != nil {
 		return err
 	}
@@ -82,7 +87,7 @@ func MigrateTable(client *executor.Client, table string, metaProxyZkAddrs string
 	return nil
 }
 
-func checkUnConfirmedDecree(perfSessions map[string]*aggregate.PerfSession, threshold float64) error {
+func checkUnConfirmedDecree(perfSessions map[string]*aggregate.PerfSession) error {
 	completed := false
 	for !completed {
 		completed = true
@@ -96,14 +101,14 @@ func checkUnConfirmedDecree(perfSessions map[string]*aggregate.PerfSession, thre
 				return fmt.Errorf("get pending_mutations_count perfcounter size must be 1, but now is %d", len(stats))
 			}
 
-			if stats[0].Value > threshold {
+			if stats[0].Value > pendingMutationThreshold {
 				completed = false
 				toolkits.LogInfo(fmt.Sprintf("%s has pending_mutations_count %f", addr, stats[0].Value))
 				break
 			}
 		}
 	}
-	toolkits.LogInfo(fmt.Sprintf("all the node pending_mutations_count has less %f", threshold))
+	toolkits.LogInfo(fmt.Sprintf("all the node pending_mutations_count has less %f", pendingMutationThreshold))
 	time.Sleep(10 * time.Second)
 	return nil
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pegasus.apache.org
For additional commands, e-mail: commits-help@pegasus.apache.org