You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pegasus.apache.org by ji...@apache.org on 2022/06/28 02:19:58 UTC

[incubator-pegasus] branch master updated: feat(admin-cli): support online-migrate table to another cluster using table-duplication (#1006)

This is an automated email from the ASF dual-hosted git repository.

jiashuo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pegasus.git


The following commit(s) were added to refs/heads/master by this push:
     new 347f7b907 feat(admin-cli): support online-migrate table to another cluster using table-duplication (#1006)
347f7b907 is described below

commit 347f7b9071c142257ab22be1c5d8e339b53cd27a
Author: Jiashuo <js...@live.com>
AuthorDate: Tue Jun 28 10:19:53 2022 +0800

    feat(admin-cli): support online-migrate table to another cluster using table-duplication (#1006)
---
 admin-cli/cmd/table_env.go                         |   1 +
 admin-cli/cmd/table_migrator.go                    |  48 +++++++
 admin-cli/cmd/table_version.go                     |  36 +++++
 admin-cli/executor/server_config.go                | 109 ++++-----------
 admin-cli/executor/table_version.go                |  89 +++++++++++++
 .../executor/toolkits/tablemigrator/README.md      |  52 ++++++++
 .../executor/toolkits/tablemigrator/migrator.go    | 146 +++++++++++++++++++++
 .../executor/toolkits/tablemigrator/switcher.go    | 130 ++++++++++++++++++
 admin-cli/go.mod                                   |   1 +
 admin-cli/go.sum                                   |   2 +
 admin-cli/util/http_client.go                      |  78 +++++++++++
 11 files changed, 611 insertions(+), 81 deletions(-)

diff --git a/admin-cli/cmd/table_env.go b/admin-cli/cmd/table_env.go
index ba9bcf942..cd10eef18 100644
--- a/admin-cli/cmd/table_env.go
+++ b/admin-cli/cmd/table_env.go
@@ -31,6 +31,7 @@ import (
 var predefinedAppEnvKeys = []string{
 	"rocksdb.usage_scenario",
 	"replica.deny_client_write",
+	"replica.deny_client_request",
 	"replica.write_throttling",
 	"replica.write_throttling_by_size",
 	"default_ttl",
diff --git a/admin-cli/cmd/table_migrator.go b/admin-cli/cmd/table_migrator.go
new file mode 100644
index 000000000..4672e6389
--- /dev/null
+++ b/admin-cli/cmd/table_migrator.go
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package cmd
+
+import (
+	"github.com/apache/incubator-pegasus/admin-cli/executor/toolkits/tablemigrator"
+	"github.com/apache/incubator-pegasus/admin-cli/shell"
+	"github.com/desertbit/grumble"
+)
+
+// init registers the `table-migrator` shell command. Its Run handler forwards
+// the parsed flags, unchanged, to tablemigrator.MigrateTable.
+func init() {
+	shell.AddCommand(&grumble.Command{
+		Name: "table-migrator",
+		Help: "migrate table from current cluster to another via table duplication and metaproxy",
+		Flags: func(f *grumble.Flags) {
+			f.String("t", "table", "", "table name")
+			f.String("n", "node", "", "zk node, addrs:port, default equal with peagsus "+
+				"cluster zk addrs, you can use `cluster-info` to show it")
+			f.String("r", "root", "", "zk root path. the tool will update table addrs in "+
+				"the path of meatproxy, if you don't specify it, that is means user need manual-switch the table addrs")
+			f.String("c", "cluster", "", "target cluster name")
+			f.String("m", "meta", "", "target meta list")
+			// the default (100000) is handed to MigrateTable as its `threshold` argument
+			f.Float64("p", "threshold", 100000, "pending mutation threshold when server will reject all write request")
+		},
+		Run: func(c *grumble.Context) error {
+			// argument order: table, zk addrs ("node"), zk root, target cluster name, target meta list, threshold
+			return tablemigrator.MigrateTable(pegasusClient, c.Flags.String("table"),
+				c.Flags.String("node"), c.Flags.String("root"),
+				c.Flags.String("cluster"), c.Flags.String("meta"), c.Flags.Float64("threshold"))
+		},
+	})
+}
diff --git a/admin-cli/cmd/table_version.go b/admin-cli/cmd/table_version.go
new file mode 100644
index 000000000..069649e6c
--- /dev/null
+++ b/admin-cli/cmd/table_version.go
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package cmd
+
+import (
+	"github.com/apache/incubator-pegasus/admin-cli/executor"
+	"github.com/apache/incubator-pegasus/admin-cli/shell"
+	"github.com/desertbit/grumble"
+)
+
+// init registers the `data-version` shell command, which prints the data
+// version of the table currently selected in the shell.
+func init() {
+	shell.AddCommand(&grumble.Command{
+		Name: "data-version",
+		Help: "query data version",
+		// RequireUseTable wraps the handler; the handler reads the table name from c.UseTable
+		Run: shell.RequireUseTable(func(c *shell.Context) error {
+			return executor.QueryTableVersion(pegasusClient, c.UseTable)
+		}),
+	})
+}
diff --git a/admin-cli/executor/server_config.go b/admin-cli/executor/server_config.go
index f17cf27cc..062d53d9e 100644
--- a/admin-cli/executor/server_config.go
+++ b/admin-cli/executor/server_config.go
@@ -20,26 +20,20 @@
 package executor
 
 import (
-	"context"
 	"encoding/json"
 	"fmt"
 	"sort"
 	"strings"
-	"sync"
-	"time"
 
 	"github.com/apache/incubator-pegasus/admin-cli/util"
 	"github.com/apache/incubator-pegasus/go-client/session"
-	"github.com/go-resty/resty/v2"
 )
 
-type httpRequest func(addr string, cmd command) (string, error)
-
-// map[*util.PegasusNode]*cmdResult is not sorted, pass nodes is for print sorted result
-type printResponse func(nodeType session.NodeType, sortedNodeList []string, resp map[string]*cmdResult)
+// map[*util.PegasusNode]*util.Result is not sorted, pass nodes is for print sorted result
+type printResponse func(nodeType session.NodeType, sortedNodeList []string, resp map[string]*util.Result)
 
 type action struct {
-	request httpRequest
+	request util.HTTPRequestFunc
 	print   printResponse
 }
 
@@ -55,11 +49,6 @@ var sectionsMap = map[session.NodeType]string{
 	// TODO(jiashuo1) support collector
 }
 
-type command struct {
-	name  string
-	value string
-}
-
 type response struct {
 	Name    string
 	Section string
@@ -67,11 +56,6 @@ type response struct {
 	Value   string
 }
 
-type cmdResult struct {
-	resp string
-	err  error
-}
-
 //TODO(jiashuo1) not support update collector config
 func ConfigCommand(client *Client, nodeType session.NodeType, nodeAddr string, name string, actionType string, value string) error {
 	var nodes []*util.PegasusNode
@@ -87,11 +71,11 @@ func ConfigCommand(client *Client, nodeType session.NodeType, nodeAddr string, n
 	}
 
 	if ac, ok := actionsMap[actionType]; ok {
-		cmd := command{
-			name:  name,
-			value: value,
+		cmd := util.Arguments{
+			Name:  name,
+			Value: value,
 		}
-		results := batchCallHTTP(nodes, ac.request, cmd)
+		results := util.BatchCallHTTP(nodes, ac.request, cmd)
 
 		var sortedNodeList []string
 		for _, n := range nodes {
@@ -106,59 +90,22 @@ func ConfigCommand(client *Client, nodeType session.NodeType, nodeAddr string, n
 	return nil
 }
 
-func batchCallHTTP(nodes []*util.PegasusNode, request httpRequest, cmd command) map[string]*cmdResult {
-	results := make(map[string]*cmdResult)
-
-	var mu sync.Mutex
-	var wg sync.WaitGroup
-	wg.Add(len(nodes))
-	for _, n := range nodes {
-		go func(node *util.PegasusNode) {
-			_, cancel := context.WithTimeout(context.Background(), time.Second*10)
-			defer cancel()
-			result, err := request(node.TCPAddr(), cmd)
-			mu.Lock()
-			if err != nil {
-				results[node.CombinedAddr()] = &cmdResult{err: err}
-			} else {
-				results[node.CombinedAddr()] = &cmdResult{resp: result}
-			}
-			mu.Unlock()
-			wg.Done()
-		}(n)
-	}
-	wg.Wait()
-
-	return results
-}
-
-func callHTTP(url string) (string, error) {
-	resp, err := resty.New().SetTimeout(time.Second * 10).R().Get(url)
-	if err != nil {
-		return "", fmt.Errorf("failed to call \"%s\": %s", url, err)
-	}
-	if resp.StatusCode() != 200 {
-		return "", fmt.Errorf("failed to call \"%s\": code=%d", url, resp.StatusCode())
-	}
-	return string(resp.Body()), nil
-}
-
-func listConfig(addr string, cmd command) (string, error) {
+func listConfig(addr string, cmd util.Arguments) (string, error) {
 	url := fmt.Sprintf("http://%s/configs", addr)
-	return callHTTP(url)
+	return util.CallHTTPGet(url)
 }
 
-func printConfigList(nodeType session.NodeType, sortedNodeList []string, results map[string]*cmdResult) {
+func printConfigList(nodeType session.NodeType, sortedNodeList []string, results map[string]*util.Result) {
 	fmt.Printf("CMD: list \n")
 	for _, node := range sortedNodeList {
 		cmdRes := results[node]
-		if cmdRes.err != nil {
-			fmt.Printf("[%s] %s\n", node, cmdRes.err)
+		if cmdRes.Err != nil {
+			fmt.Printf("[%s] %s\n", node, cmdRes.Err)
 			continue
 		}
 
 		var respMap map[string]response
-		err := json.Unmarshal([]byte(cmdRes.resp), &respMap)
+		err := json.Unmarshal([]byte(cmdRes.Resp), &respMap)
 		if err != nil {
 			fmt.Printf("[%s] %s\n", node, err)
 			continue
@@ -178,24 +125,24 @@ func printConfigList(nodeType session.NodeType, sortedNodeList []string, results
 	}
 }
 
-func getConfig(addr string, cmd command) (string, error) {
-	url := fmt.Sprintf("http://%s/config?name=%s", addr, cmd.name)
-	return callHTTP(url)
+func getConfig(addr string, cmd util.Arguments) (string, error) {
+	url := fmt.Sprintf("http://%s/config?name=%s", addr, cmd.Name)
+	return util.CallHTTPGet(url)
 }
 
-func printConfigValue(nodeType session.NodeType, sortedNodeList []string, results map[string]*cmdResult) {
+func printConfigValue(nodeType session.NodeType, sortedNodeList []string, results map[string]*util.Result) {
 	fmt.Printf("CMD: get \n")
 	for _, node := range sortedNodeList {
 		cmdRes := results[node]
-		if cmdRes.err != nil {
-			fmt.Printf("[%s] %s\n", node, cmdRes.err)
+		if cmdRes.Err != nil {
+			fmt.Printf("[%s] %s\n", node, cmdRes.Err)
 			continue
 		}
 
 		var resp response
-		err := json.Unmarshal([]byte(cmdRes.resp), &resp)
+		err := json.Unmarshal([]byte(cmdRes.Resp), &resp)
 		if err != nil {
-			fmt.Printf("[%s] %s\n", node, cmdRes.resp)
+			fmt.Printf("[%s] %s\n", node, cmdRes.Resp)
 			continue
 		}
 
@@ -208,22 +155,22 @@ func printConfigValue(nodeType session.NodeType, sortedNodeList []string, result
 	}
 }
 
-func updateConfig(addr string, cmd command) (string, error) {
-	url := fmt.Sprintf("http://%s/updateConfig?%s=%s", addr, cmd.name, cmd.value)
-	return callHTTP(url)
+func updateConfig(addr string, cmd util.Arguments) (string, error) {
+	url := fmt.Sprintf("http://%s/updateConfig?%s=%s", addr, cmd.Name, cmd.Value)
+	return util.CallHTTPGet(url)
 }
 
-func printConfigUpdate(nodeType session.NodeType, sortedNodeList []string, results map[string]*cmdResult) {
+func printConfigUpdate(nodeType session.NodeType, sortedNodeList []string, results map[string]*util.Result) {
 	fmt.Printf("CMD: set \n")
 	for _, node := range sortedNodeList {
 		cmdRes := results[node]
-		if cmdRes.err != nil {
-			fmt.Printf("[%s] %s\n", node, cmdRes.err)
+		if cmdRes.Err != nil {
+			fmt.Printf("[%s] %s\n", node, cmdRes.Err)
 			continue
 		}
 
 		var resMap map[string]string
-		err := json.Unmarshal([]byte(cmdRes.resp), &resMap)
+		err := json.Unmarshal([]byte(cmdRes.Resp), &resMap)
 		if err != nil {
 			fmt.Printf("[%s] %s\n", node, err)
 			continue
diff --git a/admin-cli/executor/table_version.go b/admin-cli/executor/table_version.go
new file mode 100644
index 000000000..7ae742c2b
--- /dev/null
+++ b/admin-cli/executor/table_version.go
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package executor
+
+import (
+	"encoding/json"
+	"fmt"
+	"strconv"
+
+	"github.com/apache/incubator-pegasus/admin-cli/util"
+	"github.com/apache/incubator-pegasus/go-client/session"
+)
+
+// TableDataVersion carries a single `data_version` value. It is the element
+// type decoded from a replica's /replica/data_version response and the value
+// printed by QueryTableVersion.
+type TableDataVersion struct {
+	DataVersion string `json:"data_version"`
+}
+
+// QueryTableVersion looks up the data version of `table` via
+// QueryReplicaDataVersion and writes it to `client` as indented JSON.
+//
+// Any failure of the lookup or of the JSON encoding is returned to the caller
+// instead of being dropped, so the shell command reports it rather than
+// silently printing nothing.
+func QueryTableVersion(client *Client, table string) error {
+	version, err := QueryReplicaDataVersion(client, table)
+	if err != nil {
+		return err
+	}
+
+	// formats into JSON
+	outputBytes, err := json.MarshalIndent(version, "", "  ")
+	if err != nil {
+		return err
+	}
+	fmt.Fprintln(client, string(outputBytes))
+	return nil
+}
+
+// QueryReplicaDataVersion calls getTableDataVersion on every replica node for
+// the app id of `table`, decodes each response as a map of TableDataVersion
+// entries, and returns the one data version shared by all entries.
+//
+// It returns an error when the table config cannot be queried, when any node's
+// HTTP call or JSON decoding fails, or when two entries carry different
+// versions. Node-level errors are wrapped with the node address (the key that
+// util.BatchCallHTTP stores the result under) so the failing replica can be
+// identified.
+func QueryReplicaDataVersion(client *Client, table string) (*TableDataVersion, error) {
+	nodes := client.Nodes.GetAllNodes(session.NodeTypeReplica)
+	resp, err := client.Meta.QueryConfig(table)
+	if err != nil {
+		return nil, err
+	}
+
+	args := util.Arguments{
+		Name:  "app_id",
+		Value: strconv.Itoa(int(resp.AppID)),
+	}
+	results := util.BatchCallHTTP(nodes, getTableDataVersion, args)
+
+	var finalVersion TableDataVersion
+	for node, result := range results {
+		if result.Err != nil {
+			return nil, fmt.Errorf("query data version from %s: %w", node, result.Err)
+		}
+
+		// decode into a fresh map per node so entries of other nodes are not carried over
+		versions := make(map[string]TableDataVersion)
+		if err := json.Unmarshal([]byte(result.Resp), &versions); err != nil {
+			return nil, fmt.Errorf("decode data version from %s: %w", node, err)
+		}
+
+		for _, version := range versions {
+			// the first version seen becomes the reference every other entry must match
+			if finalVersion.DataVersion == "" {
+				finalVersion = version
+				continue
+			}
+			if version.DataVersion != finalVersion.DataVersion {
+				return nil, fmt.Errorf("replica versions are not consistent: %s reports %q, others report %q",
+					node, version.DataVersion, finalVersion.DataVersion)
+			}
+		}
+	}
+	return &finalVersion, nil
+}
+
+// getTableDataVersion issues an HTTP GET to
+// http://<addr>/replica/data_version?<args.Name>=<args.Value> and returns the
+// response body. Its signature matches util.HTTPRequestFunc so it can be
+// passed to util.BatchCallHTTP.
+func getTableDataVersion(addr string, args util.Arguments) (string, error) {
+	url := fmt.Sprintf("http://%s/replica/data_version?%s=%s", addr, args.Name, args.Value)
+	return util.CallHTTPGet(url)
+}
diff --git a/admin-cli/executor/toolkits/tablemigrator/README.md b/admin-cli/executor/toolkits/tablemigrator/README.md
new file mode 100644
index 000000000..7d11cb710
--- /dev/null
+++ b/admin-cli/executor/toolkits/tablemigrator/README.md
@@ -0,0 +1,52 @@
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+# Table-Migrator
+
+Table-Migrator helps users migrate a table online from one cluster to another, easily and automatically. The client side does not even need to restart:
+the server side completes the migration of the entire table and continues to serve the client through the new cluster. There are a few points to note:
+
+
+- Table Migrator depends on the [duplication](https://pegasus.apache.org/administration/duplication) feature of the server, so please upgrade to pegasus-server 2.4
+- Since `duplication` only supports Pegasus data with version v1, only tables with data version v1 can be migrated. Otherwise, a `not support` error will be returned
+- There will be a short write reject time (in minutes level) when migrating data. Please evaluate whether this restrictive measure is tolerated
+- **After the table migration is completed, the tool supports triggering the client to automatically switch to a new cluster. However, this function requires that the client must 
+   access the cluster through [metaproxy](https://github.com/pegasus-kv/meta-proxy). Of course, this function is optional. Users can also manually change the client configuration and restart the client after migration**
+
+The entire table migration process includes the following steps:
+- Check the data version  of the current table. Only v1 supports table data migration. Otherwise, an error is returned.
+- Create a `duplication` task for the target cluster. The task will first migrate the checkpoint data, and then start to migrate the incremental data via plog
+- Block waiting until the unsynchronized incremental data drops to a lower level, and then prohibit the write request of the source cluster to fully synchronize all the remaining incremental data
+- Continue to block and wait until the value of synchronization requests decreases to 0, indicating that all incremental data has been synchronized
+- **If you configure metaproxy, the tool will automatically switch the target cluster of the client to a new cluster. Otherwise, it will end directly**
+
+
+# Usage
+
+`-t | --table`: name of the table to be migrated
+
+`-n | --node`: the zookeeper address configured by the metaproxy. If it is not specified, the zookeeper address configured by the current cluster will be used by default. Please check the metaproxy service to confirm the correct address
+
+`-r | --root`: the zookeeper root path of the metaproxy configuration. If it is not specified, it means that you are not going to use metaproxy to complete the automatic switching of the client cluster
+
+`-c | --cluster`: name of the target cluster
+
+`-m | --meta`: meta address of the target cluster
+
+`-p | --threshold`: the threshold value of the number of remaining incremental data pieces. When the threshold value is reached, write prohibition will be enabled for the source cluster. The default value is 100000 (100K)
diff --git a/admin-cli/executor/toolkits/tablemigrator/migrator.go b/admin-cli/executor/toolkits/tablemigrator/migrator.go
new file mode 100644
index 000000000..0b062ccae
--- /dev/null
+++ b/admin-cli/executor/toolkits/tablemigrator/migrator.go
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package tablemigrator
+
+import (
+	"fmt"
+	"time"
+
+	"github.com/apache/incubator-pegasus/admin-cli/executor"
+	"github.com/apache/incubator-pegasus/admin-cli/executor/toolkits"
+	"github.com/apache/incubator-pegasus/admin-cli/util"
+	"github.com/apache/incubator-pegasus/go-client/session"
+	"github.com/pegasus-kv/collector/aggregate"
+)
+
+// pendingMutationThreshold is the pending_mutations_count limit that
+// checkPendingMutationCount compares every node against. MigrateTable
+// overwrites it with its `threshold` argument on each call.
+var pendingMutationThreshold = 100000.0
+
+// MigrateTable migrates `table` from the cluster behind `client` to
+// `targetCluster` using table duplication, then optionally switches the
+// metaproxy entry of the table to `targetAddrs`.
+//
+// Steps, in order:
+//  1. require the table's replica data version to be "1";
+//  2. add a duplication of the table to targetCluster;
+//  3. block until no replica node reports pending_mutations_count above `threshold`;
+//  4. set the table env replica.deny_client_request=timeout*write;
+//  5. block until no partition reports a positive dup_shipped_ops value;
+//  6. if metaProxyZkRoot is non-empty, call SwitchMetaAddrs; otherwise log a
+//     warning and return nil.
+//
+// The first error met in any step is returned as is.
+func MigrateTable(client *executor.Client, table string, metaProxyZkAddrs string, metaProxyZkRoot string, targetCluster string, targetAddrs string, threshold float64) error {
+	// the threshold is stored in a package-level variable that checkPendingMutationCount reads
+	pendingMutationThreshold = threshold
+	toolkits.LogInfo(fmt.Sprintf("set pendingMutationThreshold = %d means that server will reject all write "+
+		"and ready to switch cluster if the pending less the value", int64(pendingMutationThreshold)))
+	//1. check data version
+	toolkits.LogInfo("check the table data version")
+	version, err := executor.QueryReplicaDataVersion(client, table)
+	if err != nil {
+		return err
+	}
+	if version.DataVersion != "1" {
+		return fmt.Errorf("not support migrate table with data_version = %s by duplication", version.DataVersion)
+	}
+
+	//2. create table duplication
+	toolkits.LogInfo(fmt.Sprintf("create the table duplication to %s", targetCluster))
+	err = executor.AddDuplication(client, table, targetCluster, true)
+	if err != nil {
+		return err
+	}
+
+	//3. check pending mutation count if less `pendingMutationThreshold`
+	toolkits.LogInfo(fmt.Sprintf("check pending mutation count if less %d", int64(pendingMutationThreshold)))
+	nodes := client.Nodes.GetAllNodes(session.NodeTypeReplica)
+	// one perf session per replica node, keyed by the node's combined address; reused by step 5
+	perfSessions := make(map[string]*aggregate.PerfSession)
+	for _, n := range nodes {
+		perf := client.Nodes.GetPerfSession(n.TCPAddr(), session.NodeTypeReplica)
+		perfSessions[n.CombinedAddr()] = perf
+	}
+	err = checkPendingMutationCount(perfSessions)
+	if err != nil {
+		return err
+	}
+	//4. set env config deny write request
+	// NOTE(review): nothing below removes this env again, so it stays set when step 5 or 6 returns an error
+	toolkits.LogInfo("set env config deny write request")
+	var envs = map[string]string{
+		"replica.deny_client_request": "timeout*write",
+	}
+	err = client.Meta.UpdateAppEnvs(table, envs)
+	if err != nil {
+		return err
+	}
+	//5. check duplicate qps if equal 0
+	toolkits.LogInfo("check duplicate qps if equal 0")
+	resp, err := client.Meta.QueryConfig(table)
+	if err != nil {
+		return err
+	}
+	err = checkDuplicationCompleted(perfSessions, resp.AppID)
+	if err != nil {
+		return err
+	}
+	//6. switch table addrs in metaproxy
+	toolkits.LogInfo("switch table addrs in metaproxy")
+	// without a zk root the switch is skipped and left to the operator
+	if metaProxyZkRoot == "" {
+		toolkits.LogWarn("can't switch cluster via metaproxy for you don't specify enough meta proxy info, please manual-switch the table cluster!")
+		return nil
+	}
+	err = SwitchMetaAddrs(client, metaProxyZkAddrs, metaProxyZkRoot, table, targetAddrs)
+	if err != nil {
+		return err
+	}
+	return nil
+}
+
+// checkPendingMutationCount polls the `pending_mutations_count` perf counter of
+// every session in `perfSessions` once per 10-second round and returns nil as
+// soon as a whole round finds no node above pendingMutationThreshold. A round
+// stops at the first node that is still above the threshold.
+//
+// It returns an error if a counter query fails or does not yield exactly one
+// value. After the final round it sleeps another 10 seconds before returning.
+func checkPendingMutationCount(perfSessions map[string]*aggregate.PerfSession) error {
+	for {
+		time.Sleep(10 * time.Second)
+
+		allBelow := true
+		for nodeAddr, ps := range perfSessions {
+			counters, err := ps.GetPerfCounters("pending_mutations_count")
+			if err != nil {
+				return err
+			}
+			if got := len(counters); got != 1 {
+				return fmt.Errorf("get pending_mutations_count perfcounter size must be 1, but now is %d", got)
+			}
+
+			if pending := counters[0].Value; pending > pendingMutationThreshold {
+				allBelow = false
+				toolkits.LogWarn(fmt.Sprintf("%s has pending_mutations_count %d", nodeAddr, int64(pending)))
+				break
+			}
+		}
+
+		if allBelow {
+			break
+		}
+	}
+
+	toolkits.LogInfo(fmt.Sprintf("all the node pending_mutations_count has less %d", int64(pendingMutationThreshold)))
+	time.Sleep(10 * time.Second)
+	return nil
+}
+
+// checkDuplicationCompleted polls the `dup_shipped_ops@<tableID>` partition
+// stats of every session in `perfSessions` once per 10-second round and
+// returns once a whole round sees no value greater than 0. It then sleeps 30
+// more seconds before returning. It always returns nil.
+func checkDuplicationCompleted(perfSessions map[string]*aggregate.PerfSession, tableID int32) error {
+	completed := false
+	counter := fmt.Sprintf("dup_shipped_ops@%d", tableID)
+	for !completed {
+		// assume done until some partition proves otherwise in this round
+		completed = true
+		time.Sleep(10 * time.Second)
+		for addr, perf := range perfSessions {
+			stats := util.GetPartitionStat(perf, counter)
+			for gpid, qps := range stats {
+				if qps > 0 {
+					completed = false
+					toolkits.LogWarn(fmt.Sprintf("%s[%s] still sending pending mutation %d", addr, gpid, int64(qps)))
+					// leaves only the partition loop; the remaining nodes are still queried in this round
+					break
+				}
+			}
+		}
+	}
+	toolkits.LogInfo("all the node has stop duplicate the pending wal and wait 30s to switch cluster")
+	time.Sleep(30 * time.Second)
+	return nil
+}
diff --git a/admin-cli/executor/toolkits/tablemigrator/switcher.go b/admin-cli/executor/toolkits/tablemigrator/switcher.go
new file mode 100644
index 000000000..76c223a98
--- /dev/null
+++ b/admin-cli/executor/toolkits/tablemigrator/switcher.go
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package tablemigrator
+
+import (
+	"encoding/json"
+	"fmt"
+	"os"
+	"strings"
+	"time"
+
+	"github.com/apache/incubator-pegasus/admin-cli/executor"
+	"github.com/apache/incubator-pegasus/admin-cli/executor/toolkits"
+	"github.com/go-zookeeper/zk"
+)
+
+// SwitchMetaAddrs rewrites the metaproxy ZooKeeper entry `<zkRoot>/<tableName>`
+// so that it points at the target cluster, then sets the table env
+// replica.deny_client_request=reconfig*all on the origin cluster.
+//
+// zkAddr is a ZooKeeper address or a comma-separated list of them; when empty,
+// the `zookeeper_hosts` value of the current cluster info is used. targetAddrs
+// is the comma-separated meta list of the target cluster.
+//
+// It returns an error if the cluster info cannot be queried, if ZooKeeper
+// cannot be reached or read, if the existing entry does not name the current
+// cluster, or if writing the entry or updating the env fails.
+func SwitchMetaAddrs(client *executor.Client, zkAddr string, zkRoot string, tableName string, targetAddrs string) error {
+	cluster, err := client.Meta.QueryClusterInfo()
+	if err != nil {
+		return err
+	}
+
+	if zkAddr == "" {
+		zkAddr = cluster["zookeeper_hosts"]
+	}
+	// zk.Connect takes one server address per slice element, so a comma-separated
+	// host list has to be split instead of being passed as a single element
+	zkConn, _, err := zk.Connect(strings.Split(zkAddr, ","), time.Second)
+	if err != nil {
+		return err
+	}
+	defer zkConn.Close()
+
+	currentRemoteZKInfo, err := ReadZkData(zkConn, zkRoot, tableName)
+	if err != nil {
+		return err
+	}
+
+	// refuse to switch a table whose metaproxy entry already names another cluster
+	currentLocalCluster := cluster["cluster_name"]
+	if currentRemoteZKInfo.ClusterName != currentLocalCluster {
+		return fmt.Errorf("current remote table is not `current local cluster`, remote vs expect= %s : %s",
+			currentRemoteZKInfo.ClusterName, currentLocalCluster)
+	}
+
+	originMeta := client.Meta
+	targetAddrList := strings.Split(targetAddrs, ",")
+	targetMeta := executor.NewClient(os.Stdout, targetAddrList).Meta
+	env := map[string]string{
+		"replica.deny_client_request": "reconfig*all",
+	}
+
+	// the cluster name written to ZooKeeper is the one the target meta reports
+	targetCluster, err := targetMeta.QueryClusterInfo()
+	if err != nil {
+		return err
+	}
+	_, updatedZkInfo, err := WriteZkData(zkConn, zkRoot, tableName, targetCluster["cluster_name"], targetAddrs)
+	if err != nil {
+		return err
+	}
+
+	// the env is updated only after the metaproxy entry has been switched
+	err = originMeta.UpdateAppEnvs(tableName, env)
+	if err != nil {
+		return err
+	}
+	toolkits.LogInfo(fmt.Sprintf("%s has updated metaproxy addr from %v to %v, current table env is %v\n", tableName, currentRemoteZKInfo, updatedZkInfo, env))
+	return nil
+}
+
+// MetaProxyTable is the JSON document ReadZkData decodes from a table's
+// metaproxy ZooKeeper node: the cluster name and its meta address list.
+type MetaProxyTable struct {
+	ClusterName string `json:"cluster_name"`
+	MetaAddrs   string `json:"meta_addrs"`
+}
+
+// ReadZkData reads the ZooKeeper node `<root>/<table>` and decodes its JSON
+// content into a MetaProxyTable.
+//
+// It returns an error if the existence check or the read fails, if the node
+// does not exist, or if the content is not valid JSON for MetaProxyTable.
+func ReadZkData(zkConn *zk.Conn, root string, table string) (*MetaProxyTable, error) {
+	tablePath := fmt.Sprintf("%s/%s", root, table)
+	// report a failed check as such, not as a missing path
+	exist, _, err := zkConn.Exists(tablePath)
+	if err != nil {
+		return nil, err
+	}
+	if !exist {
+		return nil, fmt.Errorf("can't find the zk path: %s", tablePath)
+	}
+
+	data, _, err := zkConn.Get(tablePath)
+	if err != nil {
+		return nil, err
+	}
+
+	metaProxyTable := MetaProxyTable{}
+	err = json.Unmarshal(data, &metaProxyTable)
+	if err != nil {
+		return nil, err
+	}
+	return &metaProxyTable, nil
+}
+
+// WriteZkData stores the encoded (cluster, addrs) pair in the ZooKeeper node
+// `<root>/<table>`, creating the node when it does not exist yet and
+// overwriting it at its current version otherwise.
+//
+// It returns the node path and the written data as a string, or the error of
+// the existence check, the create, or the set.
+func WriteZkData(zkConn *zk.Conn, root string, table string, cluster string, addrs string) (string, string, error) {
+	zkData := encodeToZkNodeData(cluster, addrs)
+	tablePath := fmt.Sprintf("%s/%s", root, table)
+	// a failed check must not be mistaken for "node is missing"
+	exist, stat, err := zkConn.Exists(tablePath)
+	if err != nil {
+		return "", "", err
+	}
+
+	if !exist {
+		// the node is created with its final content, so no follow-up Set is needed
+		if _, err := zkConn.Create(tablePath, zkData, 0, zk.WorldACL(zk.PermAll)); err != nil {
+			return "", "", err
+		}
+		return tablePath, string(zkData), nil
+	}
+
+	// passing the version read above makes the write fail if the node changed in between
+	if _, err := zkConn.Set(tablePath, zkData, stat.Version); err != nil {
+		return "", "", err
+	}
+
+	return tablePath, string(zkData), nil
+}
+
+// encodeToZkNodeData renders the content of a metaproxy table node: a JSON
+// object with the keys `cluster_name` and `meta_addrs`. The values are
+// inserted verbatim, without JSON escaping.
+func encodeToZkNodeData(cluster string, addr string) []byte {
+	const layout = `{"cluster_name": "%s", "meta_addrs": "%s"}`
+	return []byte(fmt.Sprintf(layout, cluster, addr))
+}
diff --git a/admin-cli/go.mod b/admin-cli/go.mod
index ca023b24a..c630d0c53 100644
--- a/admin-cli/go.mod
+++ b/admin-cli/go.mod
@@ -26,6 +26,7 @@ require (
 	github.com/dustin/go-humanize v1.0.0
 	github.com/fsnotify/fsnotify v1.4.9 // indirect
 	github.com/go-resty/resty/v2 v2.6.0
+	github.com/go-zookeeper/zk v1.0.2
 	github.com/hashicorp/go-multierror v1.1.1 // indirect
 	github.com/kr/pretty v0.2.1 // indirect
 	github.com/magiconair/properties v1.8.4 // indirect
diff --git a/admin-cli/go.sum b/admin-cli/go.sum
index 3feeea67a..ecfba901e 100644
--- a/admin-cli/go.sum
+++ b/admin-cli/go.sum
@@ -161,6 +161,8 @@ github.com/go-resty/resty/v2 v2.6.0 h1:joIR5PNLM2EFqqESUjCMGXrWmXNHEU9CEiK813oKY
 github.com/go-resty/resty/v2 v2.6.0/go.mod h1:PwvJS6hvaPkjtjNg9ph+VrSD92bi5Zq73w/BIH7cC3Q=
 github.com/go-sql-driver/mysql v1.4.0/go.mod h1:zAC/RDZ24gD3HViQzih4MyKcchzm+sOG5ZlKdlhCg5w=
 github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY=
+github.com/go-zookeeper/zk v1.0.2 h1:4mx0EYENAdX/B/rbunjlt5+4RTA/a9SMHBRuSKdGxPM=
+github.com/go-zookeeper/zk v1.0.2/go.mod h1:nOB03cncLtlp4t+UAkGSV+9beXP/akpekBwL+UX1Qcw=
 github.com/gobwas/httphead v0.0.0-20180130184737-2c6c146eadee/go.mod h1:L0fX3K22YWvt/FAX9NnzrNzcI4wNYi9Yku4O0LKYflo=
 github.com/gobwas/pool v0.2.0/go.mod h1:q8bcK0KcYlCgd9e7WYLm9LpyS+YeLd8JVDW6WezmKEw=
 github.com/gobwas/ws v1.0.2/go.mod h1:szmBTxLgaFppYjEmNtny/v3w89xOydFnnZMcgRRu/EM=
diff --git a/admin-cli/util/http_client.go b/admin-cli/util/http_client.go
new file mode 100644
index 000000000..a631e0153
--- /dev/null
+++ b/admin-cli/util/http_client.go
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package util
+
+import (
+	"context"
+	"fmt"
+	"sync"
+	"time"
+
+	"github.com/go-resty/resty/v2"
+)
+
+// Arguments is the single name/value pair that BatchCallHTTP hands to every
+// HTTPRequestFunc call.
+type Arguments struct {
+	Name  string
+	Value string
+}
+
+// Result is the outcome of one node's request in BatchCallHTTP: Err is set
+// when the request failed, otherwise Resp holds the returned string.
+type Result struct {
+	Resp string
+	Err  error
+}
+
+// HTTPRequestFunc performs one request against the node at addr using args and
+// returns the response text. BatchCallHTTP calls it with node.TCPAddr().
+type HTTPRequestFunc func(addr string, args Arguments) (string, error)
+
+// BatchCallHTTP runs `request` against every node concurrently, one goroutine
+// per node, and waits for all of them to finish. The returned map is keyed by
+// node.CombinedAddr(); each entry holds either the error or the response of
+// that node.
+func BatchCallHTTP(nodes []*PegasusNode, request HTTPRequestFunc, args Arguments) map[string]*Result {
+	var (
+		lock    sync.Mutex
+		pending sync.WaitGroup
+	)
+	collected := make(map[string]*Result)
+
+	pending.Add(len(nodes))
+	for _, n := range nodes {
+		go func(node *PegasusNode) {
+			defer pending.Done()
+			// the derived context is discarded; only its cancel func is kept and released
+			_, cancel := context.WithTimeout(context.Background(), time.Second*10)
+			defer cancel()
+
+			body, err := request(node.TCPAddr(), args)
+			entry := &Result{Resp: body}
+			if err != nil {
+				// on failure only the error is recorded
+				entry = &Result{Err: err}
+			}
+
+			// the map is shared by all goroutines, so writes are serialized
+			lock.Lock()
+			collected[node.CombinedAddr()] = entry
+			lock.Unlock()
+		}(n)
+	}
+	pending.Wait()
+
+	return collected
+}
+
+// CallHTTPGet sends a GET request to url with a 10-second timeout and returns
+// the response body as a string. A transport error or any status code other
+// than 200 is returned as an error that names the url.
+func CallHTTPGet(url string) (string, error) {
+	httpClient := resty.New().SetTimeout(time.Second * 10)
+	resp, err := httpClient.R().Get(url)
+	switch {
+	case err != nil:
+		return "", fmt.Errorf("failed to call \"%s\": %s", url, err)
+	case resp.StatusCode() != 200:
+		return "", fmt.Errorf("failed to call \"%s\": code=%d", url, resp.StatusCode())
+	}
+	return string(resp.Body()), nil
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pegasus.apache.org
For additional commands, e-mail: commits-help@pegasus.apache.org