You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pegasus.apache.org by ji...@apache.org on 2022/06/16 07:33:59 UTC
[incubator-pegasus] 04/25: update
This is an automated email from the ASF dual-hosted git repository.
jiashuo pushed a commit to branch add-table-migrator
in repository https://gitbox.apache.org/repos/asf/incubator-pegasus.git
commit 466d46541fb0b20f16e1fa7acc771b317fed3ed9
Author: jiashuo <js...@live.com>
AuthorDate: Tue Jun 14 16:02:12 2022 +0800
update
---
admin-cli/cmd/table_migrator.go | 12 +++++------
admin-cli/executor/data_version.go | 25 ++++++++++++----------
.../executor/toolkits/tablemigrator/migrator.go | 8 +++----
3 files changed, 24 insertions(+), 21 deletions(-)
diff --git a/admin-cli/cmd/table_migrator.go b/admin-cli/cmd/table_migrator.go
index 0b4d13cf..a32910f3 100644
--- a/admin-cli/cmd/table_migrator.go
+++ b/admin-cli/cmd/table_migrator.go
@@ -10,19 +10,19 @@ func init() {
shell.AddCommand(&grumble.Command{
Name: "table-migrator",
Help: "migrate table from current cluster to another via table duplication and metaproxy",
- Run: func(c *grumble.Context) error {
- return tablemigrator.MigrateTable(pegasusClient, c.Flags.String("table"),
- c.Flags.String("node"), c.Flags.String("root"),
- c.Flags.String("cluster"), c.Flags.String("meta"))
- },
Flags: func(f *grumble.Flags) {
f.String("t", "table", "", "table name")
- f.String("n", "node", "", "zk node: addrs:port, default equal with peagsus "+
+ f.String("n", "node", "", "zk node, addrs:port, default equal with peagsus "+
"cluster zk addrs, you can use `cluster-info` to show it")
f.String("r", "root", "", "zk root path. the tool will update table addrs in "+
"the path of meatproxy, if you don't specify it, that is means user need manual-switch the table addrs")
f.String("c", "cluster", "", "target cluster name")
f.String("m", "meta", "", "target meta list")
},
+ Run: func(c *grumble.Context) error {
+ return tablemigrator.MigrateTable(pegasusClient, c.Flags.String("table"),
+ c.Flags.String("node"), c.Flags.String("root"),
+ c.Flags.String("cluster"), c.Flags.String("meta"))
+ },
})
}
diff --git a/admin-cli/executor/data_version.go b/admin-cli/executor/data_version.go
index 22138093..bce23beb 100644
--- a/admin-cli/executor/data_version.go
+++ b/admin-cli/executor/data_version.go
@@ -3,6 +3,7 @@ package executor
import (
"encoding/json"
"fmt"
+ "strconv"
"github.com/apache/incubator-pegasus/admin-cli/util"
"github.com/apache/incubator-pegasus/go-client/session"
@@ -33,32 +34,34 @@ func QueryReplicaDataVersion(client *Client, table string) (*TableDataVersion, e
args := util.Arguments{
Name: "app_id",
- Value: string(resp.AppID),
+ Value: strconv.Itoa(int(resp.AppID)),
}
results := util.BatchCallHTTP(nodes, getTableDataVersion, args)
- var finalVersion string
- var version TableDataVersion
+ var finalVersion TableDataVersion
+ versions := make(map[string]TableDataVersion)
for _, result := range results {
if result.Err != nil {
return nil, result.Err
}
- err := json.Unmarshal([]byte(result.Resp), &version)
+ err := json.Unmarshal([]byte(result.Resp), &versions)
if err != nil {
return nil, err
}
- if finalVersion == "" {
- finalVersion = version.DataVersion
- } else {
- if version.DataVersion == finalVersion {
- continue
+ for _, version := range versions {
+ if finalVersion.DataVersion == "" {
+ finalVersion = version
} else {
- return nil, fmt.Errorf("replica versions are not consistent")
+ if version.DataVersion == finalVersion.DataVersion {
+ continue
+ } else {
+ return nil, fmt.Errorf("replica versions are not consistent")
+ }
}
}
}
- return &version, nil
+ return &finalVersion, nil
}
func getTableDataVersion(addr string, args util.Arguments) (string, error) {
diff --git a/admin-cli/executor/toolkits/tablemigrator/migrator.go b/admin-cli/executor/toolkits/tablemigrator/migrator.go
index 8a3d2a42..33e39fbe 100644
--- a/admin-cli/executor/toolkits/tablemigrator/migrator.go
+++ b/admin-cli/executor/toolkits/tablemigrator/migrator.go
@@ -15,19 +15,19 @@ func MigrateTable(client *executor.Client, table string, metaProxyZkAddrs string
//1. check data version
version, err := executor.QueryReplicaDataVersion(client, table)
if err != nil {
- return nil
+ return err
}
if version.DataVersion != "1" {
- return fmt.Errorf("not support data version = 0 to migrate by duplication")
+ return fmt.Errorf("not support migrate table with data_version = %s by duplication", version.DataVersion)
}
- //2. create data version
+ //2. create table duplication
err = executor.AddDuplication(client, table, targetCluster, false)
if err != nil {
return err
}
- //3. check un-confirm decree is less 5k
+ //3. check un-confirm decree if less 5k
nodes := client.Nodes.GetAllNodes(session.NodeTypeReplica)
var perfSessions []*aggregate.PerfSession
for _, n := range nodes {
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pegasus.apache.org
For additional commands, e-mail: commits-help@pegasus.apache.org