You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pegasus.apache.org by ji...@apache.org on 2022/06/16 07:33:56 UTC

[incubator-pegasus] 01/25: init

This is an automated email from the ASF dual-hosted git repository.

jiashuo pushed a commit to branch add-table-migrator
in repository https://gitbox.apache.org/repos/asf/incubator-pegasus.git

commit 0ea696591c1ab6cae8af3954f9b8c58229fb01ff
Author: jiashuo <js...@live.com>
AuthorDate: Mon Jun 13 15:19:22 2022 +0800

    init
---
 admin-cli/executor/data_version.go                 |  66 +++++++++++++
 admin-cli/executor/server_config.go                | 109 ++++++---------------
 .../executor/toolkits/tablemigrator/migrator.go    |  11 +++
 admin-cli/util/http_client.go                      |  58 +++++++++++
 4 files changed, 163 insertions(+), 81 deletions(-)

diff --git a/admin-cli/executor/data_version.go b/admin-cli/executor/data_version.go
new file mode 100644
index 00000000..97c9d8c9
--- /dev/null
+++ b/admin-cli/executor/data_version.go
@@ -0,0 +1,66 @@
+package executor
+
+import (
+	"encoding/json"
+	"fmt"
+	"github.com/apache/incubator-pegasus/admin-cli/util"
+	"github.com/apache/incubator-pegasus/go-client/session"
+)
+
+type TableDataVersion struct {
+	DataVersion string `json:"data_version"` // decoded from the "data_version" key of a replica's reply
+}
+
+// QueryTableVersion writes the replica data version of table to client as indented JSON.
+func QueryTableVersion(client *Client, table string) error {
+	version, err := QueryReplicaDataVersion(client, table)
+	if err != nil {
+		return err // surface the failure; returning nil here reported success while printing nothing
+	}
+
+	outputBytes, _ := json.MarshalIndent(version, "", "  ") // a struct holding one string cannot fail to marshal
+	fmt.Fprintln(client, string(outputBytes))
+	return nil
+}
+
+// QueryReplicaDataVersion asks every replica node for the data version of
+// table and returns it only when all nodes report the same value. It fails on
+// a meta query error, on any node's request or decode error, or on a mismatch.
+func QueryReplicaDataVersion(client *Client, table string) (*TableDataVersion, error) {
+	nodes := client.Nodes.GetAllNodes(session.NodeTypeReplica)
+	resp, err := client.Meta.QueryConfig(table)
+	if err != nil {
+		return nil, err
+	}
+
+	// fmt.Sprint renders the id as decimal text; string(<integer>) would yield a rune instead.
+	args := util.Arguments{
+		Name:  "app_id",
+		Value: fmt.Sprint(resp.AppID),
+	}
+	results := util.BatchCallHTTP(nodes, getTableDataVersion, args)
+
+	var finalVersion string
+	for _, result := range results {
+		if result.Err != nil {
+			return nil, result.Err
+		}
+		// Decode into a fresh value so a reply lacking the field cannot
+		// inherit the previous node's version and mask an inconsistency.
+		var version TableDataVersion
+		if err := json.Unmarshal([]byte(result.Resp), &version); err != nil {
+			return nil, err
+		}
+		if finalVersion == "" {
+			finalVersion = version.DataVersion
+		} else if version.DataVersion != finalVersion {
+			return nil, fmt.Errorf("replica versions are not consistent!")
+		}
+	}
+	return &TableDataVersion{DataVersion: finalVersion}, nil
+}
+
+// getTableDataVersion GETs /replica/data_version from the node at addr, sending args as the single query pair.
+func getTableDataVersion(addr string, args util.Arguments) (string, error) {
+	return util.CallHTTPGet(fmt.Sprintf("http://%s/replica/data_version?%s=%s", addr, args.Name, args.Value))
+}
diff --git a/admin-cli/executor/server_config.go b/admin-cli/executor/server_config.go
index f17cf27c..e857d702 100644
--- a/admin-cli/executor/server_config.go
+++ b/admin-cli/executor/server_config.go
@@ -20,26 +20,20 @@
 package executor
 
 import (
-	"context"
 	"encoding/json"
 	"fmt"
 	"sort"
 	"strings"
-	"sync"
-	"time"
 
 	"github.com/apache/incubator-pegasus/admin-cli/util"
 	"github.com/apache/incubator-pegasus/go-client/session"
-	"github.com/go-resty/resty/v2"
 )
 
-type httpRequest func(addr string, cmd command) (string, error)
-
-// map[*util.PegasusNode]*cmdResult is not sorted, pass nodes is for print sorted result
-type printResponse func(nodeType session.NodeType, sortedNodeList []string, resp map[string]*cmdResult)
+// map[*util.PegasusNode]*util.Result is not sorted, pass nodes is for print sorted result
+type printResponse func(nodeType session.NodeType, sortedNodeList []string, resp map[string]*util.Result)
 
 type action struct {
-	request httpRequest
+	request util.HttpRequest
 	print   printResponse
 }
 
@@ -55,11 +49,6 @@ var sectionsMap = map[session.NodeType]string{
 	// TODO(jiashuo1) support collector
 }
 
-type command struct {
-	name  string
-	value string
-}
-
 type response struct {
 	Name    string
 	Section string
@@ -67,11 +56,6 @@ type response struct {
 	Value   string
 }
 
-type cmdResult struct {
-	resp string
-	err  error
-}
-
 //TODO(jiashuo1) not support update collector config
 func ConfigCommand(client *Client, nodeType session.NodeType, nodeAddr string, name string, actionType string, value string) error {
 	var nodes []*util.PegasusNode
@@ -87,11 +71,11 @@ func ConfigCommand(client *Client, nodeType session.NodeType, nodeAddr string, n
 	}
 
 	if ac, ok := actionsMap[actionType]; ok {
-		cmd := command{
-			name:  name,
-			value: value,
+		cmd := util.Arguments{
+			Name:  name,
+			Value: value,
 		}
-		results := batchCallHTTP(nodes, ac.request, cmd)
+		results := util.BatchCallHTTP(nodes, ac.request, cmd)
 
 		var sortedNodeList []string
 		for _, n := range nodes {
@@ -106,59 +90,22 @@ func ConfigCommand(client *Client, nodeType session.NodeType, nodeAddr string, n
 	return nil
 }
 
-func batchCallHTTP(nodes []*util.PegasusNode, request httpRequest, cmd command) map[string]*cmdResult {
-	results := make(map[string]*cmdResult)
-
-	var mu sync.Mutex
-	var wg sync.WaitGroup
-	wg.Add(len(nodes))
-	for _, n := range nodes {
-		go func(node *util.PegasusNode) {
-			_, cancel := context.WithTimeout(context.Background(), time.Second*10)
-			defer cancel()
-			result, err := request(node.TCPAddr(), cmd)
-			mu.Lock()
-			if err != nil {
-				results[node.CombinedAddr()] = &cmdResult{err: err}
-			} else {
-				results[node.CombinedAddr()] = &cmdResult{resp: result}
-			}
-			mu.Unlock()
-			wg.Done()
-		}(n)
-	}
-	wg.Wait()
-
-	return results
-}
-
-func callHTTP(url string) (string, error) {
-	resp, err := resty.New().SetTimeout(time.Second * 10).R().Get(url)
-	if err != nil {
-		return "", fmt.Errorf("failed to call \"%s\": %s", url, err)
-	}
-	if resp.StatusCode() != 200 {
-		return "", fmt.Errorf("failed to call \"%s\": code=%d", url, resp.StatusCode())
-	}
-	return string(resp.Body()), nil
-}
-
-func listConfig(addr string, cmd command) (string, error) {
+func listConfig(addr string, cmd util.Arguments) (string, error) {
 	url := fmt.Sprintf("http://%s/configs", addr)
-	return callHTTP(url)
+	return util.CallHTTPGet(url)
 }
 
-func printConfigList(nodeType session.NodeType, sortedNodeList []string, results map[string]*cmdResult) {
+func printConfigList(nodeType session.NodeType, sortedNodeList []string, results map[string]*util.Result) {
 	fmt.Printf("CMD: list \n")
 	for _, node := range sortedNodeList {
 		cmdRes := results[node]
-		if cmdRes.err != nil {
-			fmt.Printf("[%s] %s\n", node, cmdRes.err)
+		if cmdRes.Err != nil {
+			fmt.Printf("[%s] %s\n", node, cmdRes.Err)
 			continue
 		}
 
 		var respMap map[string]response
-		err := json.Unmarshal([]byte(cmdRes.resp), &respMap)
+		err := json.Unmarshal([]byte(cmdRes.Resp), &respMap)
 		if err != nil {
 			fmt.Printf("[%s] %s\n", node, err)
 			continue
@@ -178,24 +125,24 @@ func printConfigList(nodeType session.NodeType, sortedNodeList []string, results
 	}
 }
 
-func getConfig(addr string, cmd command) (string, error) {
-	url := fmt.Sprintf("http://%s/config?name=%s", addr, cmd.name)
-	return callHTTP(url)
+func getConfig(addr string, cmd util.Arguments) (string, error) {
+	url := fmt.Sprintf("http://%s/config?name=%s", addr, cmd.Name)
+	return util.CallHTTPGet(url)
 }
 
-func printConfigValue(nodeType session.NodeType, sortedNodeList []string, results map[string]*cmdResult) {
+func printConfigValue(nodeType session.NodeType, sortedNodeList []string, results map[string]*util.Result) {
 	fmt.Printf("CMD: get \n")
 	for _, node := range sortedNodeList {
 		cmdRes := results[node]
-		if cmdRes.err != nil {
-			fmt.Printf("[%s] %s\n", node, cmdRes.err)
+		if cmdRes.Err != nil {
+			fmt.Printf("[%s] %s\n", node, cmdRes.Err)
 			continue
 		}
 
 		var resp response
-		err := json.Unmarshal([]byte(cmdRes.resp), &resp)
+		err := json.Unmarshal([]byte(cmdRes.Resp), &resp)
 		if err != nil {
-			fmt.Printf("[%s] %s\n", node, cmdRes.resp)
+			fmt.Printf("[%s] %s\n", node, cmdRes.Resp)
 			continue
 		}
 
@@ -208,22 +155,22 @@ func printConfigValue(nodeType session.NodeType, sortedNodeList []string, result
 	}
 }
 
-func updateConfig(addr string, cmd command) (string, error) {
-	url := fmt.Sprintf("http://%s/updateConfig?%s=%s", addr, cmd.name, cmd.value)
-	return callHTTP(url)
+func updateConfig(addr string, cmd util.Arguments) (string, error) {
+	url := fmt.Sprintf("http://%s/updateConfig?%s=%s", addr, cmd.Name, cmd.Value)
+	return util.CallHTTPGet(url)
 }
 
-func printConfigUpdate(nodeType session.NodeType, sortedNodeList []string, results map[string]*cmdResult) {
+func printConfigUpdate(nodeType session.NodeType, sortedNodeList []string, results map[string]*util.Result) {
 	fmt.Printf("CMD: set \n")
 	for _, node := range sortedNodeList {
 		cmdRes := results[node]
-		if cmdRes.err != nil {
-			fmt.Printf("[%s] %s\n", node, cmdRes.err)
+		if cmdRes.Err != nil {
+			fmt.Printf("[%s] %s\n", node, cmdRes.Err)
 			continue
 		}
 
 		var resMap map[string]string
-		err := json.Unmarshal([]byte(cmdRes.resp), &resMap)
+		err := json.Unmarshal([]byte(cmdRes.Resp), &resMap)
 		if err != nil {
 			fmt.Printf("[%s] %s\n", node, err)
 			continue
diff --git a/admin-cli/executor/toolkits/tablemigrator/migrator.go b/admin-cli/executor/toolkits/tablemigrator/migrator.go
new file mode 100644
index 00000000..e18c3744
--- /dev/null
+++ b/admin-cli/executor/toolkits/tablemigrator/migrator.go
@@ -0,0 +1,11 @@
+package tablemigrator
+
+/**
+1. check data version
+2. create table duplication
+3. check confirm decree if < 5k
+4. set env config deny write request
+5. check confirm decree if == 0
+6. switch table env addrs
+7. set env config deny and re-config
+*/
diff --git a/admin-cli/util/http_client.go b/admin-cli/util/http_client.go
new file mode 100644
index 00000000..473c6fb7
--- /dev/null
+++ b/admin-cli/util/http_client.go
@@ -0,0 +1,58 @@
+package util
+
+import (
+	"context"
+	"fmt"
+	"github.com/go-resty/resty/v2"
+	"sync"
+	"time"
+)
+
+type Arguments struct {
+	Name  string // parameter or config name the request function places in the URL
+	Value string // value paired with Name; request functions that need only Name ignore it
+}
+
+type Result struct {
+	Resp string // response text; BatchCallHTTP leaves it empty when Err is set
+	Err  error  // error from the request; BatchCallHTTP leaves it nil on success
+}
+
+type HttpRequest func(addr string, args Arguments) (string, error) // one call against one node; BatchCallHTTP passes node.TCPAddr() as addr
+
+// BatchCallHTTP runs request against every node concurrently and returns each outcome keyed by CombinedAddr().
+func BatchCallHTTP(nodes []*PegasusNode, request HttpRequest, args Arguments) map[string]*Result {
+	var guard sync.Mutex
+	var pending sync.WaitGroup
+	collected := make(map[string]*Result)
+	pending.Add(len(nodes))
+	for _, n := range nodes {
+		go func(node *PegasusNode) {
+			// NOTE(review): the derived context is discarded, so this timeout does not bound request.
+			_, cancel := context.WithTimeout(context.Background(), time.Second*10)
+			defer cancel()
+			body, err := request(node.TCPAddr(), args)
+			outcome := &Result{Resp: body}
+			if err != nil {
+				outcome = &Result{Err: err}
+			}
+			guard.Lock()
+			collected[node.CombinedAddr()] = outcome
+			guard.Unlock()
+			pending.Done()
+		}(n)
+	}
+	pending.Wait()
+	return collected
+}
+
+// CallHTTPGet sends a GET to url with a 10-second timeout and returns the body; a non-200 status is an error.
+func CallHTTPGet(url string) (string, error) {
+	reply, err := resty.New().SetTimeout(time.Second * 10).R().Get(url)
+	if err != nil {
+		return "", fmt.Errorf("failed to call \"%s\": %s", url, err)
+	} else if code := reply.StatusCode(); code != 200 {
+		return "", fmt.Errorf("failed to call \"%s\": code=%d", url, code)
+	}
+	return string(reply.Body()), nil
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pegasus.apache.org
For additional commands, e-mail: commits-help@pegasus.apache.org