You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pegasus.apache.org by ji...@apache.org on 2022/06/16 07:34:13 UTC
[incubator-pegasus] 18/25: update position
This is an automated email from the ASF dual-hosted git repository.
jiashuo pushed a commit to branch add-table-migrator
in repository https://gitbox.apache.org/repos/asf/incubator-pegasus.git
commit 20a34edcae0d19d987f35009a5d0a331ae5c79b4
Author: jiashuo <js...@live.com>
AuthorDate: Thu Jun 16 12:06:23 2022 +0800
update position
---
admin-cli/cmd/table_migrator.go | 3 ++-
admin-cli/executor/toolkits/tablemigrator/migrator.go | 15 ++++++++++-----
2 files changed, 12 insertions(+), 6 deletions(-)
diff --git a/admin-cli/cmd/table_migrator.go b/admin-cli/cmd/table_migrator.go
index a32910f3..2821cc9e 100644
--- a/admin-cli/cmd/table_migrator.go
+++ b/admin-cli/cmd/table_migrator.go
@@ -18,11 +18,12 @@ func init() {
"the path of meatproxy, if you don't specify it, that is means user need manual-switch the table addrs")
f.String("c", "cluster", "", "target cluster name")
f.String("m", "meta", "", "target meta list")
+ f.Float64("p", "threshold", 100000, "pending mutation throshold when server will reject all write request")
},
Run: func(c *grumble.Context) error {
return tablemigrator.MigrateTable(pegasusClient, c.Flags.String("table"),
c.Flags.String("node"), c.Flags.String("root"),
- c.Flags.String("cluster"), c.Flags.String("meta"))
+ c.Flags.String("cluster"), c.Flags.String("meta"), c.Flags.Float64("threshold"))
},
})
}
diff --git a/admin-cli/executor/toolkits/tablemigrator/migrator.go b/admin-cli/executor/toolkits/tablemigrator/migrator.go
index da4fe650..b76df808 100644
--- a/admin-cli/executor/toolkits/tablemigrator/migrator.go
+++ b/admin-cli/executor/toolkits/tablemigrator/migrator.go
@@ -11,7 +11,12 @@ import (
"github.com/pegasus-kv/collector/aggregate"
)
-func MigrateTable(client *executor.Client, table string, metaProxyZkAddrs string, metaProxyZkRoot string, targetCluster string, targetAddrs string) error {
+var pendingMutationThreshold = 100000.0
+
+func MigrateTable(client *executor.Client, table string, metaProxyZkAddrs string, metaProxyZkRoot string, targetCluster string, targetAddrs string, threshold float64) error {
+ pendingMutationThreshold = threshold
+ toolkits.LogInfo(fmt.Sprintf("set pendingMutationThreshold = %f means if the pending less the value will "+
+ "reject all write and ready to switch cluster", pendingMutationThreshold))
//1. check data version
toolkits.LogInfo("check data version")
version, err := executor.QueryReplicaDataVersion(client, table)
@@ -46,7 +51,7 @@ func MigrateTable(client *executor.Client, table string, metaProxyZkAddrs string
}
perfSessions[n.CombinedAddr()] = perf
}
- err = checkUnConfirmedDecree(perfSessions, 5000)
+ err = checkUnConfirmedDecree(perfSessions)
if err != nil {
return err
}
@@ -82,7 +87,7 @@ func MigrateTable(client *executor.Client, table string, metaProxyZkAddrs string
return nil
}
-func checkUnConfirmedDecree(perfSessions map[string]*aggregate.PerfSession, threshold float64) error {
+func checkUnConfirmedDecree(perfSessions map[string]*aggregate.PerfSession) error {
completed := false
for !completed {
completed = true
@@ -96,14 +101,14 @@ func checkUnConfirmedDecree(perfSessions map[string]*aggregate.PerfSession, thre
return fmt.Errorf("get pending_mutations_count perfcounter size must be 1, but now is %d", len(stats))
}
- if stats[0].Value > threshold {
+ if stats[0].Value > pendingMutationThreshold {
completed = false
toolkits.LogInfo(fmt.Sprintf("%s has pending_mutations_count %f", addr, stats[0].Value))
break
}
}
}
- toolkits.LogInfo(fmt.Sprintf("all the node pending_mutations_count has less %f", threshold))
+ toolkits.LogInfo(fmt.Sprintf("all the node pending_mutations_count has less %f", pendingMutationThreshold))
time.Sleep(10 * time.Second)
return nil
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pegasus.apache.org
For additional commands, e-mail: commits-help@pegasus.apache.org