You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pegasus.apache.org by ji...@apache.org on 2022/06/16 07:33:58 UTC

[incubator-pegasus] 03/25: update

This is an automated email from the ASF dual-hosted git repository.

jiashuo pushed a commit to branch add-table-migrator
in repository https://gitbox.apache.org/repos/asf/incubator-pegasus.git

commit f76837d713eb11e40512b380e0fa45b3e3cd734f
Author: jiashuo <js...@live.com>
AuthorDate: Tue Jun 14 15:10:57 2022 +0800

    update
---
 admin-cli/cmd/table_env.go                         |  1 +
 admin-cli/cmd/table_migrator.go                    | 28 +++++++++
 admin-cli/executor/data_version.go                 |  5 +-
 admin-cli/executor/server_config.go                |  2 +-
 .../executor/toolkits/tablemigrator/migrator.go    | 29 +++++----
 .../meta_proxy.go => tablemigrator/switcher.go}    | 68 +++-------------------
 admin-cli/util/http_client.go                      |  7 ++-
 7 files changed, 57 insertions(+), 83 deletions(-)

diff --git a/admin-cli/cmd/table_env.go b/admin-cli/cmd/table_env.go
index ba9bcf94..cd10eef1 100644
--- a/admin-cli/cmd/table_env.go
+++ b/admin-cli/cmd/table_env.go
@@ -31,6 +31,7 @@ import (
 var predefinedAppEnvKeys = []string{
 	"rocksdb.usage_scenario",
 	"replica.deny_client_write",
+	"replica.deny_client_request",
 	"replica.write_throttling",
 	"replica.write_throttling_by_size",
 	"default_ttl",
diff --git a/admin-cli/cmd/table_migrator.go b/admin-cli/cmd/table_migrator.go
new file mode 100644
index 00000000..0b4d13cf
--- /dev/null
+++ b/admin-cli/cmd/table_migrator.go
@@ -0,0 +1,28 @@
+package cmd
+
+import (
+	"github.com/apache/incubator-pegasus/admin-cli/executor/toolkits/tablemigrator"
+	"github.com/apache/incubator-pegasus/admin-cli/shell"
+	"github.com/desertbit/grumble"
+)
+
+// init registers the `table-migrator` shell command and its flags; the flag values are passed to tablemigrator.MigrateTable.
+func init() {
+	shell.AddCommand(&grumble.Command{
+		Name: "table-migrator",
+		Help: "migrate table from current cluster to another via table duplication and metaproxy",
+		Run: func(c *grumble.Context) error {
+			return tablemigrator.MigrateTable(pegasusClient, c.Flags.String("table"), c.Flags.String("node"),
+				c.Flags.String("root"), c.Flags.String("cluster"), c.Flags.String("meta"))
+		},
+		Flags: func(f *grumble.Flags) {
+			f.String("t", "table", "", "table name")
+			f.String("n", "node", "", "zk node: addrs:port, default equal with pegasus "+
+				"cluster zk addrs, you can use `cluster-info` to show it")
+			f.String("r", "root", "", "zk root path. the tool will update table addrs in "+
+				"the path of metaproxy, if you don't specify it, that means the user needs to manually switch the table addrs")
+			f.String("c", "cluster", "", "target cluster name")
+			f.String("m", "meta", "", "target meta list")
+		},
+	})
+}
diff --git a/admin-cli/executor/data_version.go b/admin-cli/executor/data_version.go
index 97c9d8c9..22138093 100644
--- a/admin-cli/executor/data_version.go
+++ b/admin-cli/executor/data_version.go
@@ -3,6 +3,7 @@ package executor
 import (
 	"encoding/json"
 	"fmt"
+
 	"github.com/apache/incubator-pegasus/admin-cli/util"
 	"github.com/apache/incubator-pegasus/go-client/session"
 )
@@ -53,7 +54,7 @@ func QueryReplicaDataVersion(client *Client, table string) (*TableDataVersion, e
 			if version.DataVersion == finalVersion {
 				continue
 			} else {
-				return nil, fmt.Errorf("replica versions are not consistent!")
+				return nil, fmt.Errorf("replica versions are not consistent")
 			}
 		}
 	}
@@ -61,6 +62,6 @@ func QueryReplicaDataVersion(client *Client, table string) (*TableDataVersion, e
 }
 
 func getTableDataVersion(addr string, args util.Arguments) (string, error) {
-	url := fmt.Sprintf("http://%s/replica/data_version?%s=%s", args.Name, args.Value)
+	url := fmt.Sprintf("http://%s/replica/data_version?%s=%s", addr, args.Name, args.Value)
 	return util.CallHTTPGet(url)
 }
diff --git a/admin-cli/executor/server_config.go b/admin-cli/executor/server_config.go
index e857d702..062d53d9 100644
--- a/admin-cli/executor/server_config.go
+++ b/admin-cli/executor/server_config.go
@@ -33,7 +33,7 @@ import (
 type printResponse func(nodeType session.NodeType, sortedNodeList []string, resp map[string]*util.Result)
 
 type action struct {
-	request util.HttpRequest
+	request util.HTTPRequestFunc
 	print   printResponse
 }
 
diff --git a/admin-cli/executor/toolkits/tablemigrator/migrator.go b/admin-cli/executor/toolkits/tablemigrator/migrator.go
index e6b0f7a3..8a3d2a42 100644
--- a/admin-cli/executor/toolkits/tablemigrator/migrator.go
+++ b/admin-cli/executor/toolkits/tablemigrator/migrator.go
@@ -2,25 +2,16 @@ package tablemigrator
 
 import (
 	"fmt"
+	"time"
+
 	"github.com/apache/incubator-pegasus/admin-cli/executor"
 	"github.com/apache/incubator-pegasus/admin-cli/executor/toolkits"
-	"github.com/apache/incubator-pegasus/admin-cli/executor/toolkits/metaproxy"
 	"github.com/apache/incubator-pegasus/admin-cli/util"
 	"github.com/apache/incubator-pegasus/go-client/session"
 	"github.com/pegasus-kv/collector/aggregate"
-	"time"
 )
 
-/**
-1. check data version
-2. create table duplication
-3. check confirm decree if < 5k
-4. set env config deny write request
-5. check duplicate qps decree if == 0
-6. switch table env addrs
-*/
-
-func MigrateTable(client *executor.Client, table string, toCluster string) error {
+func MigrateTable(client *executor.Client, table string, metaProxyZkAddrs string, metaProxyZkRoot string, targetCluster string, targetAddrs string) error {
 	//1. check data version
 	version, err := executor.QueryReplicaDataVersion(client, table)
 	if err != nil {
@@ -31,7 +22,7 @@ func MigrateTable(client *executor.Client, table string, toCluster string) error
 	}
 
 	//2. create data version
-	err = executor.AddDuplication(client, table, toCluster, false)
+	err = executor.AddDuplication(client, table, targetCluster, false)
 	if err != nil {
 		return err
 	}
@@ -59,12 +50,16 @@ func MigrateTable(client *executor.Client, table string, toCluster string) error
 	if err != nil {
 		return err
 	}
-	err = checkDuplicateQPS(perfSessions, resp.AppID)
+	err = checkDuplicatingQPS(perfSessions, resp.AppID)
 	if err != nil {
 		return err
 	}
 	//6. switch table addrs in metaproxy
-	err = metaproxy.SwitchMetaAddrs(client, "", "", "", "")
+	if metaProxyZkRoot == "" {
+		toolkits.LogWarn("you don't specify enough meta proxy info, please manual-switch the table cluster!")
+		return nil
+	}
+	err = SwitchMetaAddrs(client, metaProxyZkAddrs, metaProxyZkRoot, table, targetAddrs)
 	if err != nil {
 		return err
 	}
@@ -92,10 +87,11 @@ func checkUnConfirmedDecree(perfSessions []*aggregate.PerfSession, threshold flo
 			}
 		}
 	}
+	toolkits.LogDebug(fmt.Sprintf("all the node pending_mutations_count has less %f", threshold))
 	return nil
 }
 
-func checkDuplicateQPS(perfSessions []*aggregate.PerfSession, tableID int32) error {
+func checkDuplicatingQPS(perfSessions []*aggregate.PerfSession, tableID int32) error {
 	completed := false
 	counter := fmt.Sprintf("duplicate_qps@%d", tableID)
 	for !completed {
@@ -112,5 +108,6 @@ func checkDuplicateQPS(perfSessions []*aggregate.PerfSession, tableID int32) err
 			}
 		}
 	}
+	toolkits.LogDebug("all the node has stop duplicate the pending wal")
 	return nil
 }
diff --git a/admin-cli/executor/toolkits/metaproxy/meta_proxy.go b/admin-cli/executor/toolkits/tablemigrator/switcher.go
similarity index 61%
rename from admin-cli/executor/toolkits/metaproxy/meta_proxy.go
rename to admin-cli/executor/toolkits/tablemigrator/switcher.go
index bc347807..c5dbf55c 100644
--- a/admin-cli/executor/toolkits/metaproxy/meta_proxy.go
+++ b/admin-cli/executor/toolkits/tablemigrator/switcher.go
@@ -1,72 +1,16 @@
-package metaproxy
+package tablemigrator
 
 import (
 	"encoding/json"
 	"fmt"
-	"github.com/apache/incubator-pegasus/admin-cli/executor"
 	"os"
+	"strings"
 	"time"
 
+	"github.com/apache/incubator-pegasus/admin-cli/executor"
 	"github.com/go-zookeeper/zk"
 )
 
-func getTableAddrInMetaProxy(client *executor.Client, zkAddr string, zkRoot string, tableName string) error {
-	cluster, err := client.Meta.QueryClusterInfo()
-	if err != nil {
-		return err
-	}
-
-	if zkAddr == "" {
-		zkAddr = cluster["zookeeper_hosts"]
-	}
-	zkConn, _, err := zk.Connect([]string{zkAddr}, time.Duration(1000*1000*1000))
-	if err != nil {
-		return err
-	}
-	defer zkConn.Close()
-
-	currentRemoteZKInfo, err := ReadZkData(zkConn, zkRoot, tableName)
-	if err != nil {
-		return err
-	}
-	// formats into JSON
-	outputBytes, _ := json.MarshalIndent(currentRemoteZKInfo, "", "  ")
-	fmt.Fprintln(client, string(outputBytes))
-	return nil
-}
-
-func addTableAddrInMetaProxy(client *executor.Client, zkAddr string, zkRoot string, tableName string) error {
-	cluster, err := client.Meta.QueryClusterInfo()
-	if err != nil {
-		return err
-	}
-
-	if zkAddr == "" {
-		zkAddr = cluster["zookeeper_hosts"]
-	}
-	zkConn, _, err := zk.Connect([]string{zkAddr}, time.Duration(1000*1000*1000))
-	if err != nil {
-		return err
-	}
-	defer zkConn.Close()
-
-	clusterName := cluster["cluster_name"]
-	clusterAddr := cluster["meta_servers"]
-	_, _, err = WriteZkData(zkConn, zkRoot, tableName, clusterName, clusterAddr)
-	if err != nil {
-		return err
-	}
-	// formats into JSON
-	tableInfo := MetaProxyTable{
-		ClusterName: clusterName,
-		MetaAddrs:   clusterAddr,
-	}
-
-	outputBytes, _ := json.MarshalIndent(tableInfo, "", "  ")
-	fmt.Fprintln(client, string(outputBytes))
-	return nil
-}
-
 func SwitchMetaAddrs(client *executor.Client, zkAddr string, zkRoot string, tableName string, targetAddrs string) error {
 	cluster, err := client.Meta.QueryClusterInfo()
 	if err != nil {
@@ -94,7 +38,8 @@ func SwitchMetaAddrs(client *executor.Client, zkAddr string, zkRoot string, tabl
 	}
 
 	originMeta := client.Meta
-	targetMeta := executor.NewClient(os.Stdout, []string{}).Meta
+	targetAddrList := strings.Split(targetAddrs, ",")
+	targetMeta := executor.NewClient(os.Stdout, targetAddrList).Meta
 	env := map[string]string{
 		"replica.deny_client_request": "reconfig*all",
 	}
@@ -103,7 +48,7 @@ func SwitchMetaAddrs(client *executor.Client, zkAddr string, zkRoot string, tabl
 	if err != nil {
 		return err
 	}
-	_, _, err = WriteZkData(zkConn, zkRoot, tableName, targetCluster["cluster_name"], targetAddrs)
+	_, updatedZkInfo, err := WriteZkData(zkConn, zkRoot, tableName, targetCluster["cluster_name"], targetAddrs)
 	if err != nil {
 		return err
 	}
@@ -112,6 +57,7 @@ func SwitchMetaAddrs(client *executor.Client, zkAddr string, zkRoot string, tabl
 	if err != nil {
 		return err
 	}
+	fmt.Printf("%s has updated metaproxy addr from %v to %v, current table env is %v", tableName, currentRemoteZKInfo, updatedZkInfo, env)
 	return nil
 }
 
diff --git a/admin-cli/util/http_client.go b/admin-cli/util/http_client.go
index 473c6fb7..3c456d53 100644
--- a/admin-cli/util/http_client.go
+++ b/admin-cli/util/http_client.go
@@ -3,9 +3,10 @@ package util
 import (
 	"context"
 	"fmt"
-	"github.com/go-resty/resty/v2"
 	"sync"
 	"time"
+
+	"github.com/go-resty/resty/v2"
 )
 
 type Arguments struct {
@@ -18,9 +19,9 @@ type Result struct {
 	Err  error
 }
 
-type HttpRequest func(addr string, args Arguments) (string, error)
+type HTTPRequestFunc func(addr string, args Arguments) (string, error)
 
-func BatchCallHTTP(nodes []*PegasusNode, request HttpRequest, args Arguments) map[string]*Result {
+func BatchCallHTTP(nodes []*PegasusNode, request HTTPRequestFunc, args Arguments) map[string]*Result {
 	results := make(map[string]*Result)
 
 	var mu sync.Mutex


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pegasus.apache.org
For additional commands, e-mail: commits-help@pegasus.apache.org