You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pegasus.apache.org by ji...@apache.org on 2022/06/16 07:33:56 UTC
[incubator-pegasus] 01/25: init
This is an automated email from the ASF dual-hosted git repository.
jiashuo pushed a commit to branch add-table-migrator
in repository https://gitbox.apache.org/repos/asf/incubator-pegasus.git
commit 0ea696591c1ab6cae8af3954f9b8c58229fb01ff
Author: jiashuo <js...@live.com>
AuthorDate: Mon Jun 13 15:19:22 2022 +0800
init
---
admin-cli/executor/data_version.go | 66 +++++++++++++
admin-cli/executor/server_config.go | 109 ++++++---------------
.../executor/toolkits/tablemigrator/migrator.go | 11 +++
admin-cli/util/http_client.go | 58 +++++++++++
4 files changed, 163 insertions(+), 81 deletions(-)
diff --git a/admin-cli/executor/data_version.go b/admin-cli/executor/data_version.go
new file mode 100644
index 00000000..97c9d8c9
--- /dev/null
+++ b/admin-cli/executor/data_version.go
@@ -0,0 +1,66 @@
+package executor
+
+import (
+ "encoding/json"
+ "fmt"
+ "github.com/apache/incubator-pegasus/admin-cli/util"
+ "github.com/apache/incubator-pegasus/go-client/session"
+)
+
+type TableDataVersion struct {
+ DataVersion string `json:"data_version"`
+}
+
+func QueryTableVersion(client *Client, table string) error {
+ version, err := QueryReplicaDataVersion(client, table)
+ if err != nil {
+ return nil
+ }
+
+ // formats into JSON
+ outputBytes, _ := json.MarshalIndent(version, "", " ")
+ fmt.Fprintln(client, string(outputBytes))
+ return nil
+}
+
+func QueryReplicaDataVersion(client *Client, table string) (*TableDataVersion, error) {
+ nodes := client.Nodes.GetAllNodes(session.NodeTypeReplica)
+ resp, err := client.Meta.QueryConfig(table)
+ if err != nil {
+ return nil, err
+ }
+
+ args := util.Arguments{
+ Name: "app_id",
+ Value: string(resp.AppID),
+ }
+ results := util.BatchCallHTTP(nodes, getTableDataVersion, args)
+
+ var finalVersion string
+ var version TableDataVersion
+ for _, result := range results {
+ if result.Err != nil {
+ return nil, result.Err
+ }
+ err := json.Unmarshal([]byte(result.Resp), &version)
+ if err != nil {
+ return nil, err
+ }
+
+ if finalVersion == "" {
+ finalVersion = version.DataVersion
+ } else {
+ if version.DataVersion == finalVersion {
+ continue
+ } else {
+ return nil, fmt.Errorf("replica versions are not consistent!")
+ }
+ }
+ }
+ return &version, nil
+}
+
+func getTableDataVersion(addr string, args util.Arguments) (string, error) {
+ url := fmt.Sprintf("http://%s/replica/data_version?%s=%s", args.Name, args.Value)
+ return util.CallHTTPGet(url)
+}
diff --git a/admin-cli/executor/server_config.go b/admin-cli/executor/server_config.go
index f17cf27c..e857d702 100644
--- a/admin-cli/executor/server_config.go
+++ b/admin-cli/executor/server_config.go
@@ -20,26 +20,20 @@
package executor
import (
- "context"
"encoding/json"
"fmt"
"sort"
"strings"
- "sync"
- "time"
"github.com/apache/incubator-pegasus/admin-cli/util"
"github.com/apache/incubator-pegasus/go-client/session"
- "github.com/go-resty/resty/v2"
)
-type httpRequest func(addr string, cmd command) (string, error)
-
-// map[*util.PegasusNode]*cmdResult is not sorted, pass nodes is for print sorted result
-type printResponse func(nodeType session.NodeType, sortedNodeList []string, resp map[string]*cmdResult)
+// map[*util.PegasusNode]*util.Result is not sorted, pass nodes is for print sorted result
+type printResponse func(nodeType session.NodeType, sortedNodeList []string, resp map[string]*util.Result)
type action struct {
- request httpRequest
+ request util.HttpRequest
print printResponse
}
@@ -55,11 +49,6 @@ var sectionsMap = map[session.NodeType]string{
// TODO(jiashuo1) support collector
}
-type command struct {
- name string
- value string
-}
-
type response struct {
Name string
Section string
@@ -67,11 +56,6 @@ type response struct {
Value string
}
-type cmdResult struct {
- resp string
- err error
-}
-
//TODO(jiashuo1) not support update collector config
func ConfigCommand(client *Client, nodeType session.NodeType, nodeAddr string, name string, actionType string, value string) error {
var nodes []*util.PegasusNode
@@ -87,11 +71,11 @@ func ConfigCommand(client *Client, nodeType session.NodeType, nodeAddr string, n
}
if ac, ok := actionsMap[actionType]; ok {
- cmd := command{
- name: name,
- value: value,
+ cmd := util.Arguments{
+ Name: name,
+ Value: value,
}
- results := batchCallHTTP(nodes, ac.request, cmd)
+ results := util.BatchCallHTTP(nodes, ac.request, cmd)
var sortedNodeList []string
for _, n := range nodes {
@@ -106,59 +90,22 @@ func ConfigCommand(client *Client, nodeType session.NodeType, nodeAddr string, n
return nil
}
-func batchCallHTTP(nodes []*util.PegasusNode, request httpRequest, cmd command) map[string]*cmdResult {
- results := make(map[string]*cmdResult)
-
- var mu sync.Mutex
- var wg sync.WaitGroup
- wg.Add(len(nodes))
- for _, n := range nodes {
- go func(node *util.PegasusNode) {
- _, cancel := context.WithTimeout(context.Background(), time.Second*10)
- defer cancel()
- result, err := request(node.TCPAddr(), cmd)
- mu.Lock()
- if err != nil {
- results[node.CombinedAddr()] = &cmdResult{err: err}
- } else {
- results[node.CombinedAddr()] = &cmdResult{resp: result}
- }
- mu.Unlock()
- wg.Done()
- }(n)
- }
- wg.Wait()
-
- return results
-}
-
-func callHTTP(url string) (string, error) {
- resp, err := resty.New().SetTimeout(time.Second * 10).R().Get(url)
- if err != nil {
- return "", fmt.Errorf("failed to call \"%s\": %s", url, err)
- }
- if resp.StatusCode() != 200 {
- return "", fmt.Errorf("failed to call \"%s\": code=%d", url, resp.StatusCode())
- }
- return string(resp.Body()), nil
-}
-
-func listConfig(addr string, cmd command) (string, error) {
+func listConfig(addr string, cmd util.Arguments) (string, error) {
url := fmt.Sprintf("http://%s/configs", addr)
- return callHTTP(url)
+ return util.CallHTTPGet(url)
}
-func printConfigList(nodeType session.NodeType, sortedNodeList []string, results map[string]*cmdResult) {
+func printConfigList(nodeType session.NodeType, sortedNodeList []string, results map[string]*util.Result) {
fmt.Printf("CMD: list \n")
for _, node := range sortedNodeList {
cmdRes := results[node]
- if cmdRes.err != nil {
- fmt.Printf("[%s] %s\n", node, cmdRes.err)
+ if cmdRes.Err != nil {
+ fmt.Printf("[%s] %s\n", node, cmdRes.Err)
continue
}
var respMap map[string]response
- err := json.Unmarshal([]byte(cmdRes.resp), &respMap)
+ err := json.Unmarshal([]byte(cmdRes.Resp), &respMap)
if err != nil {
fmt.Printf("[%s] %s\n", node, err)
continue
@@ -178,24 +125,24 @@ func printConfigList(nodeType session.NodeType, sortedNodeList []string, results
}
}
-func getConfig(addr string, cmd command) (string, error) {
- url := fmt.Sprintf("http://%s/config?name=%s", addr, cmd.name)
- return callHTTP(url)
+func getConfig(addr string, cmd util.Arguments) (string, error) {
+ url := fmt.Sprintf("http://%s/config?name=%s", addr, cmd.Name)
+ return util.CallHTTPGet(url)
}
-func printConfigValue(nodeType session.NodeType, sortedNodeList []string, results map[string]*cmdResult) {
+func printConfigValue(nodeType session.NodeType, sortedNodeList []string, results map[string]*util.Result) {
fmt.Printf("CMD: get \n")
for _, node := range sortedNodeList {
cmdRes := results[node]
- if cmdRes.err != nil {
- fmt.Printf("[%s] %s\n", node, cmdRes.err)
+ if cmdRes.Err != nil {
+ fmt.Printf("[%s] %s\n", node, cmdRes.Err)
continue
}
var resp response
- err := json.Unmarshal([]byte(cmdRes.resp), &resp)
+ err := json.Unmarshal([]byte(cmdRes.Resp), &resp)
if err != nil {
- fmt.Printf("[%s] %s\n", node, cmdRes.resp)
+ fmt.Printf("[%s] %s\n", node, cmdRes.Resp)
continue
}
@@ -208,22 +155,22 @@ func printConfigValue(nodeType session.NodeType, sortedNodeList []string, result
}
}
-func updateConfig(addr string, cmd command) (string, error) {
- url := fmt.Sprintf("http://%s/updateConfig?%s=%s", addr, cmd.name, cmd.value)
- return callHTTP(url)
+func updateConfig(addr string, cmd util.Arguments) (string, error) {
+ url := fmt.Sprintf("http://%s/updateConfig?%s=%s", addr, cmd.Name, cmd.Value)
+ return util.CallHTTPGet(url)
}
-func printConfigUpdate(nodeType session.NodeType, sortedNodeList []string, results map[string]*cmdResult) {
+func printConfigUpdate(nodeType session.NodeType, sortedNodeList []string, results map[string]*util.Result) {
fmt.Printf("CMD: set \n")
for _, node := range sortedNodeList {
cmdRes := results[node]
- if cmdRes.err != nil {
- fmt.Printf("[%s] %s\n", node, cmdRes.err)
+ if cmdRes.Err != nil {
+ fmt.Printf("[%s] %s\n", node, cmdRes.Err)
continue
}
var resMap map[string]string
- err := json.Unmarshal([]byte(cmdRes.resp), &resMap)
+ err := json.Unmarshal([]byte(cmdRes.Resp), &resMap)
if err != nil {
fmt.Printf("[%s] %s\n", node, err)
continue
diff --git a/admin-cli/executor/toolkits/tablemigrator/migrator.go b/admin-cli/executor/toolkits/tablemigrator/migrator.go
new file mode 100644
index 00000000..e18c3744
--- /dev/null
+++ b/admin-cli/executor/toolkits/tablemigrator/migrator.go
@@ -0,0 +1,11 @@
+package tablemigrator
+
+/**
+1. check data version
+2. create table duplication
+3. check confirm decree if < 5k
+4. set env config deny write request
+5. check confirm decree if == 0
+6. switch table env addrs
+7. set env config deny and re-config
+*/
diff --git a/admin-cli/util/http_client.go b/admin-cli/util/http_client.go
new file mode 100644
index 00000000..473c6fb7
--- /dev/null
+++ b/admin-cli/util/http_client.go
@@ -0,0 +1,58 @@
+package util
+
+import (
+ "context"
+ "fmt"
+ "github.com/go-resty/resty/v2"
+ "sync"
+ "time"
+)
+
+type Arguments struct {
+ Name string
+ Value string
+}
+
+type Result struct {
+ Resp string
+ Err error
+}
+
+type HttpRequest func(addr string, args Arguments) (string, error)
+
+func BatchCallHTTP(nodes []*PegasusNode, request HttpRequest, args Arguments) map[string]*Result {
+ results := make(map[string]*Result)
+
+ var mu sync.Mutex
+ var wg sync.WaitGroup
+ wg.Add(len(nodes))
+ for _, n := range nodes {
+ go func(node *PegasusNode) {
+ _, cancel := context.WithTimeout(context.Background(), time.Second*10)
+ defer cancel()
+ result, err := request(node.TCPAddr(), args)
+ mu.Lock()
+ if err != nil {
+ results[node.CombinedAddr()] = &Result{Err: err}
+ } else {
+ results[node.CombinedAddr()] = &Result{Resp: result}
+ }
+ mu.Unlock()
+ wg.Done()
+ }(n)
+ }
+ wg.Wait()
+
+ return results
+}
+
+func CallHTTPGet(url string) (string, error) {
+ resp, err := resty.New().SetTimeout(time.Second * 10).R().Get(url)
+ if err != nil {
+ return "", fmt.Errorf("failed to call \"%s\": %s", url, err)
+ }
+ if resp.StatusCode() != 200 {
+ return "", fmt.Errorf("failed to call \"%s\": code=%d", url, resp.StatusCode())
+ }
+ return string(resp.Body()), nil
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pegasus.apache.org
For additional commands, e-mail: commits-help@pegasus.apache.org