You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by mi...@apache.org on 2023/03/21 10:29:47 UTC
[shardingsphere-on-cloud] branch main updated: feat(pitr): implement func backup (#268)
This is an automated email from the ASF dual-hosted git repository.
miaoliyao pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/shardingsphere-on-cloud.git
The following commit(s) were added to refs/heads/main by this push:
new 8dd34dd feat(pitr): implement func backup (#268)
8dd34dd is described below
commit 8dd34dd0794339cfc261c8f0e79f9d15dd8bd595
Author: Xu-Wentao <cu...@yahoo.com>
AuthorDate: Tue Mar 21 18:29:42 2023 +0800
feat(pitr): implement func backup (#268)
---
.gitignore | 2 +
pitr/agent/Makefile | 9 +-
.../as_backup.go => agent/internal/cons/backup.go} | 38 ++-
pitr/agent/internal/handler/backup.go | 8 +-
pitr/agent/internal/handler/view/backup.go | 2 +-
pitr/agent/internal/handler/view/show.go | 14 +-
pitr/agent/internal/pkg/opengauss.go | 12 +-
pitr/agent/main.go | 11 +
pitr/agent/pkg/cmds/cmd.go | 4 +-
pitr/agent/pkg/gsutil/conn.go | 6 +-
pitr/cli/Makefile | 8 +-
pitr/cli/go.mod | 13 +-
pitr/cli/go.sum | 37 ++-
pitr/cli/internal/cmd/backup.go | 317 ++++++++++++++++++++-
pitr/cli/internal/cmd/backup_test.go | 224 +++++++++++++++
pitr/cli/internal/cmd/cmd_suite_test.go | 55 ++++
pitr/cli/internal/pkg/agent-server.go | 21 +-
pitr/cli/internal/pkg/local-storage.go | 9 +-
pitr/cli/internal/pkg/local-storage_test.go | 8 +-
pitr/cli/internal/pkg/mocks/agent-server.go | 111 ++++++++
pitr/cli/internal/pkg/mocks/local-storage.go | 126 ++++++++
.../cli/internal/pkg/mocks/shardingsphere-proxy.go | 138 +++++++++
pitr/cli/internal/pkg/model/as_backup.go | 8 +-
pitr/cli/internal/pkg/model/as_show.go | 14 +-
.../internal/pkg/model/{as_backup.go => const.go} | 30 +-
pitr/cli/internal/pkg/model/ls_backup.go | 30 +-
pitr/cli/internal/pkg/shardingsphere-proxy.go | 17 +-
pitr/cli/internal/pkg/shardingsphere-proxy_test.go | 88 ++++++
pitr/cli/pkg/httputils/req.go | 10 +-
29 files changed, 1230 insertions(+), 140 deletions(-)
diff --git a/.gitignore b/.gitignore
index 9903380..db4a733 100644
--- a/.gitignore
+++ b/.gitignore
@@ -36,3 +36,5 @@ test
certs
shardingsphere-operator/vendor/
pitr/agent/agent
+pitr/agent/vendor/
+pitr/cli/vendor/
diff --git a/pitr/agent/Makefile b/pitr/agent/Makefile
index b31ca7f..6f3a0ae 100644
--- a/pitr/agent/Makefile
+++ b/pitr/agent/Makefile
@@ -1,7 +1,14 @@
-.PHONY:openssl-local
+.PHONY:openssl-local test build
+
+GOOS := $(shell go env GOOS)
+
openssl-local:
mkdir -p certs && \
cd certs && \
openssl req -new -SHA256 -newkey rsa:2048 -nodes -keyout tls.key -out tls.csr -subj "/C=CN/ST=beijing/L=beijing/O=/OU=/" && \
openssl x509 -req -sha256 -days 365 -in tls.csr -signkey tls.key -out tls.crt
+test:
+ go test ./... -coverprofile cover.out
+build:
+ GOOS=$(GOOS) go build -o pitr-agent
diff --git a/pitr/cli/internal/pkg/model/as_backup.go b/pitr/agent/internal/cons/backup.go
similarity index 53%
copy from pitr/cli/internal/pkg/model/as_backup.go
copy to pitr/agent/internal/cons/backup.go
index e0cc1cf..a8d4dd2 100644
--- a/pitr/cli/internal/pkg/model/as_backup.go
+++ b/pitr/agent/internal/cons/backup.go
@@ -15,28 +15,24 @@
* limitations under the License.
*/
-package model
+package cons
-type (
- BackupIn struct {
- DbPort uint16 `json:"db_port"`
- DbName string `json:"db_name"`
- Username string `json:"username"`
- Password string `json:"password"`
+const (
+ // DBBackModeFull is full backup mode
+ DBBackModeFull = "FULL"
+ // DBBackModePTrack is ptrack backup mode
+ DBBackModePTrack = "PTRACK"
- DnBackupPath string `json:"dn_backup_path"`
- DnThreadsNum uint8 `json:"dn_threads_num"`
- DnBackupMode string `json:"dn_backup_mode"`
- Instance string `json:"instance"`
- }
+ // opengauss backup status
+ OGBackupStatusRunning = "RUNNING"
+ OGBackupStatusOk = "OK"
+ OGBackupStatusError = "ERROR"
- BackupOut struct {
- ID string `json:"backup_id"`
- }
-
- BackupOutResp struct {
- Code int `json:"code" validate:"required"`
- Msg string `json:"msg" validate:"required"`
- Data BackupOut `json:"data"`
- }
+ // agent backup status
+ DBBackupStatusRunning = "Running"
+ DBBackupStatusCompleted = "Completed"
+ DBBackupStatusFailed = "Failed"
+ // DBBackupStatusOther is used to indicate that the backup status is not in the above three states. and we will not handle it.
+ // the `Other` status may be some intermediate status, such as `Waiting`, `CheckError`, `Ok` etc.
+ DBBackupStatusOther = "Other"
)
diff --git a/pitr/agent/internal/handler/backup.go b/pitr/agent/internal/handler/backup.go
index 2d9cd2b..b69e16d 100644
--- a/pitr/agent/internal/handler/backup.go
+++ b/pitr/agent/internal/handler/backup.go
@@ -18,6 +18,7 @@
package handler
import (
+ "errors"
"fmt"
"github.com/apache/shardingsphere-on-cloud/pitr/agent/internal/pkg"
"github.com/apache/shardingsphere-on-cloud/pitr/agent/pkg/responder"
@@ -45,7 +46,12 @@ func Backup(ctx *fiber.Ctx) error {
return fmt.Errorf(efmt, in.Username, len(in.Password), in.DbName, err)
}
- backupID, err := pkg.OG.AsyncBackup(in.DnBackupPath, in.Instance, in.DnBackupMode, 1)
+ // try to add backup instance
+ if err := pkg.OG.AddInstance(in.DnBackupPath, in.Instance); err != nil && !errors.Is(err, cons.InstanceAlreadyExist) {
+ return fmt.Errorf("add instance failed, err=%w", err)
+ }
+
+ backupID, err := pkg.OG.AsyncBackup(in.DnBackupPath, in.Instance, in.DnBackupMode, 1, in.DbPort)
if err != nil {
efmt := "pkg.OG.AsyncBackup[path=%s,instance=%s,mode=%s] failure,err=%w"
return fmt.Errorf(efmt, in.DnBackupPath, in.Instance, in.DnBackupMode, err)
diff --git a/pitr/agent/internal/handler/view/backup.go b/pitr/agent/internal/handler/view/backup.go
index 19a29aa..b08952d 100644
--- a/pitr/agent/internal/handler/view/backup.go
+++ b/pitr/agent/internal/handler/view/backup.go
@@ -70,7 +70,7 @@ func (in *BackupIn) Validate() error {
return cons.MissingDnBackupMode
}
- if in.DnBackupMode != "FULL" && in.DnBackupMode != "PTRACK" {
+ if in.DnBackupMode != cons.DBBackModeFull && in.DnBackupMode != cons.DBBackModePTrack {
return cons.InvalidDnBackupMode
}
diff --git a/pitr/agent/internal/handler/view/show.go b/pitr/agent/internal/handler/view/show.go
index 6ebcfb2..022180c 100644
--- a/pitr/agent/internal/handler/view/show.go
+++ b/pitr/agent/internal/handler/view/show.go
@@ -124,14 +124,14 @@ func NewBackupInfoList(list []model.Backup, path, instance string) []BackupInfo
func statusTrans(status string) string {
switch status {
- case "OK":
- return "Completed"
- case "ERROR":
- return "Failed"
- case "RUNNING":
- return "Running"
+ case cons.OGBackupStatusOk:
+ return cons.DBBackupStatusCompleted
+ case cons.OGBackupStatusError:
+ return cons.DBBackupStatusFailed
+ case cons.OGBackupStatusRunning:
+ return cons.DBBackupStatusRunning
default:
- return "Other"
+ return cons.DBBackupStatusOther
}
}
diff --git a/pitr/agent/internal/pkg/opengauss.go b/pitr/agent/internal/pkg/opengauss.go
index 8f6c0b2..32b8970 100644
--- a/pitr/agent/internal/pkg/opengauss.go
+++ b/pitr/agent/internal/pkg/opengauss.go
@@ -41,7 +41,7 @@ type (
}
IOpenGauss interface {
- AsyncBackup(backupPath, instanceName, backupMode string, threadsNum uint8) (string, error)
+ AsyncBackup(backupPath, instanceName, backupMode string, threadsNum uint8, dbPort uint16) (string, error)
ShowBackup(backupPath, instanceName, backupID string) (*model.Backup, error)
Init(backupPath string) error
AddInstance(backupPath, instance string) error
@@ -71,7 +71,7 @@ func NewOpenGauss(shell, pgData string, log logging.ILog) IOpenGauss {
}
const (
- _backupFmt = "gs_probackup backup --backup-path=%s --instance=%s --backup-mode=%s --pgdata=%s --threads=%d 2>&1"
+ _backupFmt = "gs_probackup backup --backup-path=%s --instance=%s --backup-mode=%s --pgdata=%s --threads=%d --pgport %d 2>&1"
_showFmt = "gs_probackup show --instance=%s --backup-path=%s --backup-id=%s --format=json 2>&1"
_delBackupFmt = "gs_probackup delete --backup-path=%s --instance=%s --backup-id=%s 2>&1"
_restoreFmt = "gs_probackup restore --backup-path=%s --instance=%s --backup-id=%s --pgdata=%s 2>&1"
@@ -91,8 +91,8 @@ const (
_mvFmt = "mv %s %s"
)
-func (og *openGauss) AsyncBackup(backupPath, instanceName, backupMode string, threadsNum uint8) (string, error) {
- cmd := fmt.Sprintf(_backupFmt, backupPath, instanceName, backupMode, og.pgData, threadsNum)
+func (og *openGauss) AsyncBackup(backupPath, instanceName, backupMode string, threadsNum uint8, dbPort uint16) (string, error) {
+ cmd := fmt.Sprintf(_backupFmt, backupPath, instanceName, backupMode, og.pgData, threadsNum, dbPort)
outputs, err := cmds.AsyncExec(og.shell, cmd)
if err != nil {
return "", fmt.Errorf("cmds.AsyncExec[shell=%s,cmd=%s] return err=%w", og.shell, cmd, err)
@@ -198,8 +198,8 @@ func (og *openGauss) AddInstance(backupPath, instance string) error {
return nil
}
-func (og *openGauss) DelInstance(backupPath, instancee string) error {
- cmd := fmt.Sprintf(_delInstanceFmt, backupPath, instancee)
+func (og *openGauss) DelInstance(backupPath, instance string) error {
+ cmd := fmt.Sprintf(_delInstanceFmt, backupPath, instance)
output, err := cmds.Exec(og.shell, cmd)
og.log.Debug(fmt.Sprintf("DelInstance[output=%s,err=%v]", output, err))
diff --git a/pitr/agent/main.go b/pitr/agent/main.go
index 2730fc4..56f0e02 100644
--- a/pitr/agent/main.go
+++ b/pitr/agent/main.go
@@ -77,6 +77,10 @@ func main() {
}
}
+ if _, err := os.Stat(pgData); os.IsNotExist(err) {
+ panic(fmt.Errorf("PGDATA:%s the database directory does not exist", pgData))
+ }
+
pgData := strings.Trim(pgData, " ")
if strings.HasSuffix(pgData, "/") {
dirs := strings.Split(pgData, "/")
@@ -88,6 +92,13 @@ func main() {
panic(fmt.Errorf("lack of HTTPs certificate"))
}
+ if _, err := os.Stat(tlsCrt); os.IsNotExist(err) {
+ panic(fmt.Errorf("TLS certificate file does not exist"))
+ }
+ if _, err := os.Stat(tlsKey); os.IsNotExist(err) {
+ panic(fmt.Errorf("TLS key file does not exist"))
+ }
+
var level = zapcore.InfoLevel
if logLevel == debugLogLevel {
level = zapcore.DebugLevel
diff --git a/pitr/agent/pkg/cmds/cmd.go b/pitr/agent/pkg/cmds/cmd.go
index 21b1be6..dc248d2 100644
--- a/pitr/agent/pkg/cmds/cmd.go
+++ b/pitr/agent/pkg/cmds/cmd.go
@@ -20,6 +20,7 @@ package cmds
import (
"bufio"
"fmt"
+ "github.com/apache/shardingsphere-on-cloud/pitr/agent/pkg/logging"
"io"
"os/exec"
@@ -39,7 +40,7 @@ func AsyncExec(name string, args ...string) (chan *Output, error) {
args = append([]string{c}, args...)
cmd := exec.Command(name, args...)
-
+ logging.Debug(cmd.String())
stdout, err := cmd.StdoutPipe()
if err != nil {
return nil, fmt.Errorf("can not obtain stdout pipe for command[args=%+v]:%s", args, err)
@@ -101,6 +102,7 @@ func Exec(name string, args ...string) (string, error) {
args = append([]string{c}, args...)
cmd := exec.Command(name, args...)
+ logging.Debug(cmd.String())
stdout, err := cmd.StdoutPipe()
if err != nil {
diff --git a/pitr/agent/pkg/gsutil/conn.go b/pitr/agent/pkg/gsutil/conn.go
index 6b4f565..4dcbdf8 100644
--- a/pitr/agent/pkg/gsutil/conn.go
+++ b/pitr/agent/pkg/gsutil/conn.go
@@ -25,6 +25,8 @@ import (
"strings"
)
+const defaultOGHost = "127.0.0.1"
+
type OpenGauss struct {
db *sql.DB
user string
@@ -43,8 +45,8 @@ func Open(user, password, dbName string, dbPort uint16) (*OpenGauss, error) {
return nil, fmt.Errorf("db name is empty")
}
- connStr := "port=%d user=%s password=%s dbname=%s sslmode=disable"
- db, err := sql.Open("opengauss", fmt.Sprintf(connStr, dbPort, user, password, dbName))
+ connStr := "host=%s port=%d user=%s password=%s dbname=%s sslmode=disable"
+ db, err := sql.Open("opengauss", fmt.Sprintf(connStr, defaultOGHost, dbPort, user, password, dbName))
if err != nil {
efmt := "sql:open fail[user=%s,pwLen=%d,dbName=%s],err=%s,wrap=%w"
return nil, fmt.Errorf(efmt, user, len(password), dbName, err, cons.DbConnectionFailed)
diff --git a/pitr/cli/Makefile b/pitr/cli/Makefile
index db92ecc..2e49a59 100644
--- a/pitr/cli/Makefile
+++ b/pitr/cli/Makefile
@@ -1,4 +1,8 @@
-.PHONY: build
+.PHONY: build test
+
+GOOS := $(shell go env GOOS)
build:
- go build -o gs_pitr main.go
\ No newline at end of file
+ GOOS=$(GOOS) go build -o gs_pitr main.go
+test:
+ go test ./... -coverprofile cover.out
diff --git a/pitr/cli/go.mod b/pitr/cli/go.mod
index 03be626..fe896f4 100644
--- a/pitr/cli/go.mod
+++ b/pitr/cli/go.mod
@@ -4,9 +4,10 @@ go 1.20
require (
gitee.com/opengauss/openGauss-connector-go-pq v1.0.4
+ github.com/golang/mock v1.6.0
github.com/google/uuid v1.3.0
- github.com/onsi/ginkgo/v2 v2.8.3
- github.com/onsi/gomega v1.27.1
+ github.com/onsi/ginkgo/v2 v2.9.1
+ github.com/onsi/gomega v1.27.3
github.com/spf13/cobra v1.6.1
go.uber.org/zap v1.24.0
)
@@ -26,10 +27,10 @@ require (
go.uber.org/atomic v1.9.0 // indirect
go.uber.org/multierr v1.8.0 // indirect
golang.org/x/crypto v0.0.0-20220214200702-86341886e292 // indirect
- golang.org/x/net v0.7.0 // indirect
- golang.org/x/sys v0.5.0 // indirect
- golang.org/x/text v0.7.0 // indirect
- golang.org/x/tools v0.6.0 // indirect
+ golang.org/x/net v0.8.0 // indirect
+ golang.org/x/sys v0.6.0 // indirect
+ golang.org/x/text v0.8.0 // indirect
+ golang.org/x/tools v0.7.0 // indirect
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 // indirect
google.golang.org/protobuf v1.28.1 // indirect
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c // indirect
diff --git a/pitr/cli/go.sum b/pitr/cli/go.sum
index 8ebc765..5aa05a8 100644
--- a/pitr/cli/go.sum
+++ b/pitr/cli/go.sum
@@ -23,6 +23,8 @@ github.com/go-task/slim-sprig v0.0.0-20210107165309-348f09dbbbc0 h1:p104kn46Q8Wd
github.com/go-task/slim-sprig v0.0.0-20210107165309-348f09dbbbc0/go.mod h1:fyg7847qk6SyHyPtNmDHnmrv/HOrqktSC+C9fM+CJOE=
github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q=
github.com/golang/mock v1.1.1/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A=
+github.com/golang/mock v1.6.0 h1:ErTB+efbowRARo13NNdxyJji2egdxLGQhRaY+DUumQc=
+github.com/golang/mock v1.6.0/go.mod h1:p6yTPP+5HYm5mzsMV8JkE6ZKdX+/wYM6Hr+LicevLPs=
github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
github.com/golang/protobuf v1.3.2/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
github.com/golang/protobuf v1.3.3/go.mod h1:vzj43D7+SQXF/4pzW/hwtAqwc6iTitCiVSaWz5lYuqw=
@@ -34,6 +36,7 @@ github.com/golang/protobuf v1.4.0/go.mod h1:jodUvKwWbYaEsadDk5Fwe5c77LiNKVO9IDvq
github.com/golang/protobuf v1.4.2/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI=
github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk=
github.com/golang/protobuf v1.5.2 h1:ROPKBNFfQgOUMifHyP+KYbvpjbdoFNs+aK7DXlji0Tw=
+github.com/golang/protobuf v1.5.3 h1:KhyjKVUg7Usr/dYsdSqoFveMYd5ko72D+zANwlG1mmg=
github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M=
github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
@@ -56,10 +59,14 @@ github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
-github.com/onsi/ginkgo/v2 v2.8.3 h1:RpbK1G8nWPNaCVFBWsOGnEQQGgASi6b8fxcWBvDYjxQ=
-github.com/onsi/ginkgo/v2 v2.8.3/go.mod h1:6OaUA8BCi0aZfmzYT/q9AacwTzDpNbxILUT+TlBq6MY=
-github.com/onsi/gomega v1.27.1 h1:rfztXRbg6nv/5f+Raen9RcGoSecHIFgBBLQK3Wdj754=
-github.com/onsi/gomega v1.27.1/go.mod h1:aHX5xOykVYzWOV4WqQy0sy8BQptgukenXpCXfadcIAw=
+github.com/onsi/ginkgo/v2 v2.3.1 h1:8SbseP7qM32WcvE6VaN6vfXxv698izmsJ1UQX9ve7T8=
+github.com/onsi/ginkgo/v2 v2.3.1/go.mod h1:Sv4yQXwG5VmF7tm3Q5Z+RWUpPo24LF1mpnz2crUb8Ys=
+github.com/onsi/ginkgo/v2 v2.9.1 h1:zie5Ly042PD3bsCvsSOPvRnFwyo3rKe64TJlD6nu0mk=
+github.com/onsi/ginkgo/v2 v2.9.1/go.mod h1:FEcmzVcCHl+4o9bQZVab+4dC9+j+91t2FHSzmGAPfuo=
+github.com/onsi/gomega v1.22.1 h1:pY8O4lBfsHKZHM/6nrxkhVPUznOlIu3quZcKP/M20KI=
+github.com/onsi/gomega v1.22.1/go.mod h1:x6n7VNe4hw0vkyYUM4mjIXx3JbLiPaBPNgB7PRQ1tuM=
+github.com/onsi/gomega v1.27.3 h1:5VwIwnBY3vbBDOJrNtA4rVdiTZCsq9B5F12pvy1Drmk=
+github.com/onsi/gomega v1.27.3/go.mod h1:5vG284IBtfDAmDyrK+eGyZmUgUlmi+Wngqo557cZ6Gw=
github.com/pkg/diff v0.0.0-20210226163009-20ebb0f2a09e/go.mod h1:pJLUxLENpZxwdsKMEsNbx1VGcRFpLqf3715MtcvvzbA=
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
@@ -86,6 +93,7 @@ github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKs
github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
github.com/tjfoc/gmsm v1.4.1 h1:aMe1GlZb+0bLjn+cKTPEvvn9oUEBlJitaZiiBwsbgho=
github.com/tjfoc/gmsm v1.4.1/go.mod h1:j4INPkHWMrhJb38G+J6W4Tw0AbuN8Thu3PbdVYhVcTE=
+github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k=
go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc=
go.uber.org/atomic v1.9.0 h1:ECmE8Bn/WFTYwEW/bpKD3M8VtR/zQVbavAoalC1PYyE=
go.uber.org/atomic v1.9.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc=
@@ -95,6 +103,7 @@ go.uber.org/multierr v1.8.0/go.mod h1:7EAYxJLBy9rStEaz58O2t4Uvip6FSURkq8/ppBp95a
go.uber.org/zap v1.24.0 h1:FiJd5l1UOLj0wCgbSE0rwwXHzEdAZS6hiiSnxJN/D60=
go.uber.org/zap v1.24.0/go.mod h1:2kMP+WWQ8aoFoedH3T2sq6iJ2yDWpHbP0f6MQbS9Gkg=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
+golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/crypto v0.0.0-20201012173705-84dcc777aaee/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/crypto v0.0.0-20210711020723-a769d52b0f97/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc=
@@ -104,40 +113,56 @@ golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL
golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE=
golang.org/x/lint v0.0.0-20190227174305-5b3e6a55c961/go.mod h1:wehouNa3lNwaWXcvxsM5YxQ5yQlVC4a0KAMCusXpPoU=
golang.org/x/lint v0.0.0-20190313153728-d0100b6bd8b3/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc=
+golang.org/x/mod v0.4.2/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20190213061140-3a22650c66bd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
+golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20201010224723-4f7140c49acb/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU=
golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg=
+golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4/go.mod h1:p54w0d4576C0XHj96bSt6lcn1PtDYWL6XObtHCRCNQM=
golang.org/x/net v0.7.0 h1:rJrUqqhjsgNp7KqAIc25s9pZnjU7TUcSY7HcVZjdn1g=
golang.org/x/net v0.7.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs=
+golang.org/x/net v0.8.0 h1:Zrh2ngAOFYneWTAIAPethzeaQLuHwhuBkuV6ZiRnUaQ=
+golang.org/x/net v0.8.0/go.mod h1:QVkue5JL9kW//ek3r6jTKnTFis1tRmNAW2P1shuFdJc=
golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
+golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20191204072324-ce4227a45e2e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
+golang.org/x/sys v0.0.0-20210330210617-4fbd30eecc44/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
+golang.org/x/sys v0.0.0-20210510120138-977fb7262007/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.5.0 h1:MUK/U/4lj1t1oPg0HfuXDN/Z1wv31ZJ/YcPiGccS4DU=
golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
+golang.org/x/sys v0.6.0 h1:MVltZSvRTcU2ljQOhs94SXPftV6DCNnZViHeQps87pQ=
+golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.7.0 h1:4BRB4x83lYWy72KwLD/qYDuTu7q9PjSagHvijDw7cLo=
golang.org/x/text v0.7.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8=
+golang.org/x/text v0.8.0 h1:57P1ETyNKtuIjB4SRd15iJxuhj8Gc416Y78H3qgMh68=
+golang.org/x/text v0.8.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20190226205152-f727befe758c/go.mod h1:9Yl7xja0Znq3iFh3HoIrodX9oNMXvdceNzlUR8zjMvY=
golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs=
golang.org/x/tools v0.0.0-20190524140312-2c0ae7006135/go.mod h1:RgjU9mgBXZiqYHBnxXauZ1Gv1EHHAz9KjViQ78xBX0Q=
-golang.org/x/tools v0.6.0 h1:BOw41kyTf3PuCW1pVQf8+Cyg8pMlkYB1oo9iJ6D/lKM=
-golang.org/x/tools v0.6.0/go.mod h1:Xwgl3UAJ/d3gWutnCtw505GrjyAbvKui8lOU390QaIU=
+golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
+golang.org/x/tools v0.1.1/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk=
+golang.org/x/tools v0.7.0 h1:W4OVu8VVOaIO0yzWMNdepAulS7YfoS3Zabrm8DOXXU4=
+golang.org/x/tools v0.7.0/go.mod h1:4pg6aUX35JBAogB10C9AtvVL+qowtN4pT3CGSQex14s=
+golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
+golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 h1:go1bK/D/BFZV2I8cIQd1NKEZ+0owSTG1fDTci4IqFcE=
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
diff --git a/pitr/cli/internal/cmd/backup.go b/pitr/cli/internal/cmd/backup.go
index 946f7a8..b5669c7 100644
--- a/pitr/cli/internal/cmd/backup.go
+++ b/pitr/cli/internal/cmd/backup.go
@@ -19,6 +19,13 @@ package cmd
import (
"fmt"
+ "github.com/apache/shardingsphere-on-cloud/pitr/cli/internal/pkg"
+ "github.com/apache/shardingsphere-on-cloud/pitr/cli/internal/pkg/model"
+ "github.com/apache/shardingsphere-on-cloud/pitr/cli/internal/pkg/xerr"
+ "github.com/google/uuid"
+ "os"
+ "sync"
+ "time"
"github.com/spf13/cobra"
@@ -28,58 +35,338 @@ import (
const (
dnBackupPath = "dn-backup-path"
dnThreadsNum = "dn-threads-num"
+ // defaultInstance is used to set backup instance name in openGauss, we can modify it in the future.
+ defaultInstance = "ins-default-ss"
+ // defaultShowDetailRetryTimes retry times of check backup detail from agent server
+ defaultShowDetailRetryTimes = 3
+)
+
+var (
+ // Host ss-proxy host
+ Host string
+ // Port ss-proxy port
+ Port uint16
+ // Username ss-proxy username
+ Username string
+ // Password ss-proxy password
+ Password string
+ // AgentPort agent-server port
+ AgentPort uint16
+ // BackupPath openGauss data backup path
+ BackupPath string
+ // ThreadsNum openGauss data backup task thread num
+ ThreadsNum uint8
+
+ filename string
)
var Backup = &cobra.Command{
Use: "backup",
Short: "Backup a database cluster",
Run: func(cmd *cobra.Command, args []string) {
+ var err error
- host, err := cmd.Flags().GetString(host)
+ Host, err = cmd.Flags().GetString(host)
if err != nil {
logging.Error(err.Error())
}
- logging.Info(fmt.Sprintf("flags:host:%s", host))
+ logging.Info(fmt.Sprintf("flags:host:%s", Host))
- port, err := cmd.Flags().GetUint16(port)
+ Port, err = cmd.Flags().GetUint16(port)
if err != nil {
logging.Error(err.Error())
}
- logging.Info(fmt.Sprintf("flags:port:%d", port))
+ logging.Info(fmt.Sprintf("flags:port:%d", Port))
- un, err := cmd.Flags().GetString(username)
+ Username, err = cmd.Flags().GetString(username)
if err != nil {
logging.Error(err.Error())
}
- logging.Info(fmt.Sprintf("flags:username:%s", un))
+ logging.Info(fmt.Sprintf("flags:username:%s", Username))
- pw, err := cmd.Flags().GetString(password)
+ Password, err = cmd.Flags().GetString(password)
if err != nil {
logging.Error(err.Error())
}
- logging.Info(fmt.Sprintf("flags:password:%s", pw))
+ logging.Info(fmt.Sprintf("flags:password:%s", Password))
- agentPort, err := cmd.Flags().GetUint16(agentPort)
+ AgentPort, err = cmd.Flags().GetUint16(agentPort)
if err != nil {
logging.Error(err.Error())
}
- logging.Info(fmt.Sprintf("flags:agentPort:%d", agentPort))
+ logging.Info(fmt.Sprintf("flags:agentPort:%d", AgentPort))
- backupPath, err := cmd.Flags().GetString(dnBackupPath)
+ BackupPath, err = cmd.Flags().GetString(dnBackupPath)
if err != nil {
logging.Error(err.Error())
}
- logging.Info(fmt.Sprintf("flags:backupPath:%s", backupPath))
+ logging.Info(fmt.Sprintf("flags:backupPath:%s", BackupPath))
- threadsNum, err := cmd.Flags().GetUint16(dnThreadsNum)
+ ThreadsNum, err = cmd.Flags().GetUint8(dnThreadsNum)
if err != nil {
logging.Error(err.Error())
}
- logging.Info(fmt.Sprintf("flags:threadsNum:%d", threadsNum))
+ logging.Info(fmt.Sprintf("flags:threadsNum:%d", ThreadsNum))
+
+ logging.Info(fmt.Sprintf("Default backup path: %s/%s\n", os.Getenv("HOME"), ".gs_pitr/backup/"))
+
+ // Start backup
+ if err := backup(); err != nil {
+ logging.Error(err.Error())
+ }
},
}
+// Steps of backup:
+// 1. lock cluster
+// 2. Get cluster info and save local backup info
+// 3. Operate backup by agent-server
+// 4. unlock cluster
+// 5. Waiting for backups finished
+// 6. Update local backup info
+// 7. Double check backups all finished
+func backup() error {
+ proxy, err := pkg.NewShardingSphereProxy(Username, Password, pkg.DefaultDbName, Host, Port)
+ if err != nil {
+ return xerr.NewCliErr("create ss-proxy connect failed")
+ }
+
+ root := fmt.Sprintf("%s/%s", os.Getenv("HOME"), ".gs_pitr")
+ ls, err := pkg.NewLocalStorage(root)
+ if err != nil {
+ return xerr.NewCliErr("create local storage failed")
+ }
+
+ // Step1. lock cluster
+ if err := proxy.LockForBackup(); err != nil {
+ return xerr.NewCliErr("lock for backup failed")
+ }
+
+ // Step2. Get cluster info and save local backup info
+ lsBackup, err := exportData(proxy, ls)
+ if err != nil {
+ return xerr.NewCliErr(fmt.Sprintf("export backup data failed, err:%s", err.Error()))
+ }
+
+ // Step3. send backup command to agent-server.
+ if err := execBackup(lsBackup); err != nil {
+ // if backup failed, still need to unlock cluster.
+ if err := proxy.Unlock(); err != nil {
+ logging.Error(fmt.Sprintf("coz exec backup failed, try to unlock cluster, but still failed, err:%s", err.Error()))
+ }
+ return xerr.NewCliErr(fmt.Sprintf("exec backup failed, err:%s", err.Error()))
+ }
+
+ // Step4. unlock cluster
+ if err := proxy.Unlock(); err != nil {
+ return xerr.NewCliErr(fmt.Sprintf("unlock cluster failed, err:%s", err.Error()))
+ }
+
+ // Step5. update backup file
+ if err := ls.WriteByJSON(filename, lsBackup); err != nil {
+ return xerr.NewCliErr(fmt.Sprintf("update backup file failed, err:%s", err.Error()))
+ }
+
+ // Step6. check agent server backup status
+ status := checkBackupStatus(lsBackup)
+ logging.Info(fmt.Sprintf("backup result:%s", status))
+
+ // Step7. finished backup and update backup file
+ if err := ls.WriteByJSON(filename, lsBackup); err != nil {
+ return xerr.NewCliErr(fmt.Sprintf("update backup file failed, err: %s", err.Error()))
+ }
+
+ logging.Info("backup finished")
+ return nil
+
+}
+
+func exportData(proxy pkg.IShardingSphereProxy, ls pkg.ILocalStorage) (lsBackup *model.LsBackup, err error) {
+ // Step1. export cluster metadata from ss-proxy
+ cluster, err := proxy.ExportMetaData()
+ if err != nil {
+ return nil, xerr.NewCliErr("export meta data failed")
+ }
+
+ // Step2. export storage nodes from ss-proxy
+ nodes, err := proxy.ExportStorageNodes()
+ if err != nil {
+ return nil, xerr.NewCliErr("export storage nodes failed")
+ }
+
+ // Step3. combine the backup contents
+ filename = ls.GenFilename(pkg.ExtnJSON)
+ contents := &model.LsBackup{
+ Info: &model.BackupMetaInfo{
+ ID: uuid.New().String(), // generate uuid for this backup
+ CSN: cluster.SnapshotInfo.Csn,
+ StartTime: time.Now().Unix(),
+ EndTime: 0,
+ },
+ SsBackup: &model.SsBackup{
+ Status: model.SsBackupStatusWaiting, // default status of backup is model.SsBackupStatusWaiting
+ ClusterInfo: cluster,
+ StorageNodes: nodes,
+ },
+ }
+
+ // Step4. finally, save data with json to local
+ if err := ls.WriteByJSON(filename, contents); err != nil {
+ return nil, xerr.NewCliErr("write backup info by json failed")
+ }
+
+ return contents, nil
+}
+
+func execBackup(lsBackup *model.LsBackup) error {
+ var (
+ wg sync.WaitGroup
+ sNodes = lsBackup.SsBackup.StorageNodes
+ dnCh = make(chan *model.DataNode, len(sNodes))
+ failSnCh = make(chan *model.StorageNode, len(sNodes))
+ success = true
+ )
+ logging.Info("Starting send backup command to agent server...")
+
+ for _, node := range sNodes {
+ wg.Add(1)
+ go func(wg *sync.WaitGroup, node *model.StorageNode) {
+ defer wg.Done()
+ as := pkg.NewAgentServer(fmt.Sprintf("%s:%d", node.IP, AgentPort))
+ _execBackup(as, node, failSnCh, dnCh)
+ }(&wg, node)
+ }
+
+ wg.Wait()
+ close(dnCh)
+ close(failSnCh)
+
+ // TODO format print data like a table
+ for errN := range failSnCh {
+ success = false
+ fmt.Printf("failed node detail: [IP:%s, PORT:%d]\n", errN.IP, errN.Port)
+ }
+
+ if !success {
+ lsBackup.SsBackup.Status = model.SsBackupStatusFailed
+ return xerr.NewCliErr("backup failed")
+ }
+
+ // save data node list to lsBackup
+ for dn := range dnCh {
+ lsBackup.DnList = append(lsBackup.DnList, dn)
+ }
+
+ lsBackup.SsBackup.Status = model.SsBackupStatusRunning
+ return nil
+}
+
+func _execBackup(as pkg.IAgentServer, node *model.StorageNode, failSnCh chan *model.StorageNode, dnCh chan *model.DataNode) {
+ in := &model.BackupIn{
+ DbPort: node.Port,
+ DbName: node.Database,
+ Username: node.Username,
+ Password: node.Password,
+ DnBackupPath: BackupPath,
+ DnThreadsNum: ThreadsNum,
+ DnBackupMode: model.BDBackModeFull,
+ Instance: defaultInstance,
+ }
+ backupID, err := as.Backup(in)
+ if err != nil {
+ logging.Error(fmt.Sprintf("backup failed, %s\n", err.Error()))
+ failSnCh <- node
+ return
+ }
+
+ // update DnList of lsBackup
+ dn := &model.DataNode{
+ IP: node.IP,
+ Port: node.Port,
+ Status: model.SsBackupStatusRunning,
+ BackupID: backupID,
+ StartTime: time.Now().Unix(),
+ EndTime: 0,
+ }
+ dnCh <- dn
+}
+
+func checkBackupStatus(lsBackup *model.LsBackup) model.BackupStatus {
+ var (
+ wg sync.WaitGroup
+ dataNodeMap = make(map[string]*model.DataNode)
+ backupFinalStatus = model.SsBackupStatusCompleted
+ statusCh = make(chan *model.DataNode, len(lsBackup.DnList))
+ )
+
+ // DataNode.IP -> DataNode
+ for _, dn := range lsBackup.DnList {
+ dataNodeMap[dn.IP] = dn
+ }
+
+ for _, sn := range lsBackup.SsBackup.StorageNodes {
+ wg.Add(1)
+ go func(wg *sync.WaitGroup, sn *model.StorageNode) {
+ defer wg.Done()
+ as := pkg.NewAgentServer(fmt.Sprintf("%s:%d", sn.IP, AgentPort))
+ dn := dataNodeMap[sn.IP]
+
+ // check backup status
+ status := checkStatus(as, sn, dn.BackupID, model.BackupStatus(""), defaultShowDetailRetryTimes)
+
+ // update DataNode status
+ dn.Status = status
+ dn.EndTime = time.Now().Unix()
+ statusCh <- dn
+ }(&wg, sn)
+ }
+
+ wg.Wait()
+ close(statusCh)
+
+ for dn := range statusCh {
+ fmt.Printf("data node backup final status: [IP:%s] ==> %s", dn.IP, dn.Status)
+ }
+
+ for _, dn := range lsBackup.DnList {
+ if dn.Status == model.SsBackupStatusFailed {
+ backupFinalStatus = model.SsBackupStatusFailed
+ }
+ }
+
+ lsBackup.SsBackup.Status = backupFinalStatus
+ return backupFinalStatus
+}
+
+func checkStatus(as pkg.IAgentServer, sn *model.StorageNode, backupId string, status model.BackupStatus, retryTimes uint8) model.BackupStatus {
+ if retryTimes+1 == 0 {
+ return status
+ }
+ if status == model.SsBackupStatusCompleted || status == model.SsBackupStatusFailed {
+ return status
+ }
+
+ // todo: how often to check backup status
+ time.Sleep(time.Second * 2)
+
+ in := &model.ShowDetailIn{
+ DbPort: sn.Port,
+ DbName: sn.Database,
+ Username: sn.Username,
+ Password: sn.Password,
+ DnBackupId: backupId,
+ DnBackupPath: BackupPath,
+ Instance: defaultInstance,
+ }
+ backupInfo, err := as.ShowDetail(in)
+ if err != nil {
+ logging.Error(fmt.Sprintf("get storage node [IP:%s] backup detail from agent server failed, will retry %d times.", sn.IP, retryTimes))
+ return checkStatus(as, sn, backupId, model.SsBackupStatusCheckError, retryTimes-1)
+ }
+ return checkStatus(as, sn, backupId, backupInfo.Status, retryTimes)
+}
+
func init() {
Backup.PersistentFlags().StringP(dnBackupPath, "B", "", "DataNode backup path")
- Backup.PersistentFlags().Uint16P(dnThreadsNum, "j", 1, "DataNode backup threads nums")
+ Backup.PersistentFlags().Uint8P(dnThreadsNum, "j", 1, "DataNode backup threads nums")
}
diff --git a/pitr/cli/internal/cmd/backup_test.go b/pitr/cli/internal/cmd/backup_test.go
new file mode 100644
index 0000000..e40e973
--- /dev/null
+++ b/pitr/cli/internal/cmd/backup_test.go
@@ -0,0 +1,224 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements. See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+ */
+
+package cmd
+
+import (
+ "errors"
+ "fmt"
+ "github.com/apache/shardingsphere-on-cloud/pitr/cli/internal/pkg"
+ mock_pkg "github.com/apache/shardingsphere-on-cloud/pitr/cli/internal/pkg/mocks"
+ "github.com/apache/shardingsphere-on-cloud/pitr/cli/internal/pkg/model"
+ "github.com/golang/mock/gomock"
+
+ . "github.com/onsi/ginkgo/v2"
+ . "github.com/onsi/gomega"
+)
+
+var ctrl *gomock.Controller
+
+var _ = Describe("Backup", func() {
+ Context("check status", func() {
+ var (
+ as *mock_pkg.MockIAgentServer
+ sn = &model.StorageNode{
+ IP: "127.0.0.1",
+ }
+ )
+ BeforeEach(func() {
+ ctrl = gomock.NewController(GinkgoT())
+ as = mock_pkg.NewMockIAgentServer(ctrl)
+ })
+ AfterEach(func() {
+ ctrl.Finish()
+ })
+
+ It("agent server return err", func() {
+ as.EXPECT().ShowDetail(&model.ShowDetailIn{Instance: defaultInstance}).Return(nil, errors.New("timeout"))
+ Expect(checkStatus(as, sn, "backup-id", "", 0)).To(Equal(model.SsBackupStatusCheckError))
+ })
+
+ It("mock agent server and return failed status", func() {
+ as.EXPECT().ShowDetail(&model.ShowDetailIn{Instance: defaultInstance}).Return(&model.BackupInfo{Status: model.SsBackupStatusFailed}, nil)
+ Expect(checkStatus(as, sn, "backup-id", "", 0)).To(Equal(model.SsBackupStatusFailed))
+ })
+
+ It("mock agent server and return completed status", func() {
+ as.EXPECT().ShowDetail(&model.ShowDetailIn{Instance: defaultInstance}).Return(&model.BackupInfo{Status: model.SsBackupStatusCompleted}, nil)
+ Expect(checkStatus(as, sn, "backup-id", "", 0)).To(Equal(model.SsBackupStatusCompleted))
+ })
+
+ It("mock agent server and return timeout error first time and then retry 1 time return completed status", func() {
+ as.EXPECT().ShowDetail(&model.ShowDetailIn{Instance: defaultInstance}).Times(1).Return(nil, errors.New("timeout"))
+ as.EXPECT().ShowDetail(&model.ShowDetailIn{Instance: defaultInstance}).Return(&model.BackupInfo{Status: model.SsBackupStatusCompleted}, nil)
+ Expect(checkStatus(as, sn, "backup-id", "", 1)).To(Equal(model.SsBackupStatusCompleted))
+ })
+ })
+
+ Context("export data", func() {
+ var (
+ proxy *mock_pkg.MockIShardingSphereProxy
+ ls *mock_pkg.MockILocalStorage
+ )
+ BeforeEach(func() {
+ proxy = mock_pkg.NewMockIShardingSphereProxy(ctrl)
+ ls = mock_pkg.NewMockILocalStorage(ctrl)
+ })
+ AfterEach(func() {
+ ctrl.Finish()
+ })
+
+ It("export data", func() {
+ proxy.EXPECT().LockForBackup().Return(nil)
+ // mock proxy export metadata
+ proxy.EXPECT().ExportMetaData().Return(&model.ClusterInfo{SnapshotInfo: model.SnapshotInfo{Csn: "mock-csn"}}, nil)
+ // mock proxy export node storage data
+ proxy.EXPECT().ExportStorageNodes().Return([]*model.StorageNode{}, nil)
+ // mock ls generate filename
+ ls.EXPECT().GenFilename(pkg.ExtnJSON).Return("mock.json")
+ // mock ls write by json
+ ls.EXPECT().WriteByJSON("mock.json", gomock.Any()).Return(nil)
+
+ bk, err := exportData(proxy, ls)
+ Expect(err).To(BeNil())
+ Expect(bk.Info.CSN).To(Equal("mock-csn"))
+ })
+ })
+
+ Context("exec backup", func() {
+ var as *mock_pkg.MockIAgentServer
+ bak := &model.LsBackup{
+ DnList: nil,
+ SsBackup: &model.SsBackup{
+ Status: "Running",
+ StorageNodes: []*model.StorageNode{},
+ },
+ }
+ BeforeEach(func() {
+ as = mock_pkg.NewMockIAgentServer(ctrl)
+ })
+ AfterEach(func() {
+ ctrl.Finish()
+ })
+ It("exec backup empty storage nodes", func() {
+ Expect(execBackup(bak)).To(BeNil())
+ })
+ It("exec backup 2 storage nodes", func() {
+ bak.SsBackup.StorageNodes = []*model.StorageNode{
+ {
+ IP: "127.0.0.1",
+ Port: 80,
+ Username: "",
+ Password: "",
+ Database: "",
+ Remark: "",
+ },
+ {
+ IP: "127.0.0.2",
+ Port: 443,
+ Username: "",
+ Password: "",
+ Database: "",
+ Remark: "",
+ },
+ }
+ as.EXPECT().Backup(gomock.Any()).Return("", nil)
+ Expect(execBackup(bak)).NotTo(BeNil())
+ Expect(execBackup(bak).Error()).To(Equal("backup failed"))
+ })
+ })
+
+ Context("exec backup", func() {
+ It("exec backup", func() {
+ var (
+ as *mock_pkg.MockIAgentServer
+ node = &model.StorageNode{}
+ failSnCh = make(chan *model.StorageNode, 10)
+ dnCh = make(chan *model.DataNode, 10)
+ )
+ as = mock_pkg.NewMockIAgentServer(ctrl)
+
+ defer close(failSnCh)
+ defer close(dnCh)
+ defer ctrl.Finish()
+ as.EXPECT().Backup(gomock.Any()).Return("backup-id", nil)
+ _execBackup(as, node, failSnCh, dnCh)
+ Expect(len(failSnCh)).To(Equal(0))
+ Expect(len(dnCh)).To(Equal(1))
+ })
+ })
+})
+
+var _ = Describe("test backup manually", func() {
+ var (
+ // implement with your own dev
+ u string = "username"
+ p string = "password"
+ db string = "database"
+ h string = "host-ip"
+ pt uint16 = 3307
+ )
+ Context("test manually", func() {})
+
+ It("unlock after lock", func() {
+ proxy, _ := pkg.NewShardingSphereProxy(u, p, db, h, pt)
+ Expect(proxy.LockForBackup()).To(BeNil())
+ Expect(proxy.Unlock()).To(BeNil())
+ })
+
+ It("export data in dev", func() {
+ proxy, _ := pkg.NewShardingSphereProxy(u, p, db, h, pt)
+ ls, _ := pkg.NewLocalStorage("./")
+
+ Expect(proxy.LockForBackup()).To(BeNil())
+ defer func() {
+ Expect(proxy.Unlock()).To(BeNil())
+ }()
+
+ bk, err := exportData(proxy, ls)
+
+ Expect(err).To(BeNil())
+ Expect(bk.Info).NotTo(BeNil())
+ })
+
+ It("test all", func() {
+ proxy, _ := pkg.NewShardingSphereProxy(u, p, db, h, pt)
+ ls, _ := pkg.NewLocalStorage("./")
+ bak, err := exportData(proxy, ls)
+ Expect(err).To(BeNil())
+ Expect(bak.Info).NotTo(BeNil())
+ fmt.Printf("cluster info:%+v\n ss backup storagde nodes nums:%+v\nfirst storage node info:%+v\n", bak.Info, len(bak.SsBackup.StorageNodes), bak.SsBackup.StorageNodes[0])
+
+ AgentPort = 18080
+ BackupPath = "/home/omm/data"
+ ThreadsNum = 1
+ bak.SsBackup.StorageNodes[0].IP = "https://" + h
+
+ err = execBackup(bak)
+ Expect(err).To(BeNil())
+ Expect(bak.SsBackup.Status).To(Equal(model.SsBackupStatusRunning))
+ fmt.Printf("data node list nums:%d\nfirst data node info:%+v", len(bak.DnList), bak.DnList[0])
+
+ err = ls.WriteByJSON(filename, bak)
+ Expect(err).To(BeNil())
+
+ Expect(checkBackupStatus(bak)).To(Equal(model.SsBackupStatusCompleted))
+
+ err = ls.WriteByJSON(filename, bak)
+ Expect(err).To(BeNil())
+ })
+})
diff --git a/pitr/cli/internal/cmd/cmd_suite_test.go b/pitr/cli/internal/cmd/cmd_suite_test.go
new file mode 100644
index 0000000..92c583c
--- /dev/null
+++ b/pitr/cli/internal/cmd/cmd_suite_test.go
@@ -0,0 +1,55 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements. See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+ */
+
+package cmd_test
+
+import (
+ "fmt"
+ "go.uber.org/zap"
+ "go.uber.org/zap/zapcore"
+ "testing"
+
+ "github.com/apache/shardingsphere-on-cloud/pitr/cli/pkg/logging"
+ . "github.com/onsi/ginkgo/v2"
+ . "github.com/onsi/gomega"
+)
+
+func initLog() {
+ prodConfig := zap.NewProductionConfig()
+ prodConfig.Encoding = "console"
+ prodConfig.DisableCaller = true
+ prodConfig.DisableStacktrace = true
+ prodConfig.EncoderConfig.EncodeTime = zapcore.RFC3339TimeEncoder
+ prodConfig.Level = zap.NewAtomicLevelAt(zapcore.InfoLevel)
+ prodConfig.EncoderConfig.ConsoleSeparator = " "
+
+ logger, err := prodConfig.Build(
+ zap.WithCaller(false),
+ zap.AddCallerSkip(1),
+ zap.AddStacktrace(zapcore.FatalLevel),
+ )
+ if err != nil {
+ panic(fmt.Errorf("an unknown error occured in the zap-log"))
+ }
+ logging.Init(logger)
+}
+
+func TestCmd(t *testing.T) {
+ initLog()
+ RegisterFailHandler(Fail)
+ RunSpecs(t, "Cmd Suite")
+}
diff --git a/pitr/cli/internal/pkg/agent-server.go b/pitr/cli/internal/pkg/agent-server.go
index 7af91b7..da18999 100644
--- a/pitr/cli/internal/pkg/agent-server.go
+++ b/pitr/cli/internal/pkg/agent-server.go
@@ -27,7 +27,7 @@ import (
"net/http"
)
-type agentServer struct {
+type AgentServer struct {
addr string
_apiBackup string
@@ -36,8 +36,15 @@ type agentServer struct {
_apiShowList string
}
-func NewAgentServer(addr string) *agentServer {
- return &agentServer{
+type IAgentServer interface {
+ Backup(in *model.BackupIn) (string, error)
+ Restore(in *model.RestoreIn) error
+ ShowDetail(in *model.ShowDetailIn) (*model.BackupInfo, error)
+ ShowList(in *model.ShowListIn) ([]model.BackupInfo, error)
+}
+
+func NewAgentServer(addr string) IAgentServer {
+ return &AgentServer{
addr: addr,
_apiBackup: "/api/backup",
@@ -47,7 +54,7 @@ func NewAgentServer(addr string) *agentServer {
}
}
-func (as *agentServer) Backup(in *model.BackupIn) (string, error) {
+func (as *AgentServer) Backup(in *model.BackupIn) (string, error) {
url := fmt.Sprintf("%s%s", as.addr, as._apiBackup)
out := &model.BackupOutResp{}
@@ -75,7 +82,7 @@ func (as *agentServer) Backup(in *model.BackupIn) (string, error) {
return out.Data.ID, nil
}
-func (as *agentServer) Restore(in *model.RestoreIn) error {
+func (as *AgentServer) Restore(in *model.RestoreIn) error {
url := fmt.Sprintf("%s%s", as.addr, as._apiRestore)
out := &model.RestoreResp{}
@@ -103,7 +110,7 @@ func (as *agentServer) Restore(in *model.RestoreIn) error {
return nil
}
-func (as *agentServer) ShowDetail(in *model.ShowDetailIn) (*model.BackupInfo, error) {
+func (as *AgentServer) ShowDetail(in *model.ShowDetailIn) (*model.BackupInfo, error) {
url := fmt.Sprintf("%s%s", as.addr, as._apiShowDetail)
out := &model.BackupDetailResp{}
@@ -131,7 +138,7 @@ func (as *agentServer) ShowDetail(in *model.ShowDetailIn) (*model.BackupInfo, er
return &out.Data, nil
}
-func (as *agentServer) ShowList(in *model.ShowListIn) ([]model.BackupInfo, error) {
+func (as *AgentServer) ShowList(in *model.ShowListIn) ([]model.BackupInfo, error) {
url := fmt.Sprintf("%s%s", as.addr, as._apiShowList)
out := &model.BackupListResp{}
diff --git a/pitr/cli/internal/pkg/local-storage.go b/pitr/cli/internal/pkg/local-storage.go
index 1d8a74a..832cffc 100644
--- a/pitr/cli/internal/pkg/local-storage.go
+++ b/pitr/cli/internal/pkg/local-storage.go
@@ -37,19 +37,18 @@ type (
}
ILocalStorage interface {
- init() error
WriteByJSON(name string, contents *model.LsBackup) error
- GenFilename(extn extension) string
+ GenFilename(extn Extension) string
ReadAll() ([]model.LsBackup, error)
ReadByID(id string) (*model.LsBackup, error)
ReadByCSN(csn string) (*model.LsBackup, error)
}
- extension string
+ Extension string
)
const (
- ExtnJSON extension = "JOSN"
+ ExtnJSON Extension = "JSON"
)
func NewLocalStorage(root string) (ILocalStorage, error) {
@@ -190,7 +189,7 @@ GenFilename gen a filename based on the file extension
if extn is empty,return a postfix-free filename
if extn=JSON,return the JSON filename like **.json
*/
-func (ls *localStorage) GenFilename(extn extension) string {
+func (ls *localStorage) GenFilename(extn Extension) string {
prefix := time.Now().UTC().Format("20060102150405")
suffix := strutil.Random(8)
diff --git a/pitr/cli/internal/pkg/local-storage_test.go b/pitr/cli/internal/pkg/local-storage_test.go
index 2781d71..d713c9b 100644
--- a/pitr/cli/internal/pkg/local-storage_test.go
+++ b/pitr/cli/internal/pkg/local-storage_test.go
@@ -98,10 +98,10 @@ var _ = Describe("ILocalStorage", func() {
StartTime: time.Now().Unix(),
EndTime: time.Now().Add(time.Minute).Unix(),
},
- DnList: []model.DataNode{
+ DnList: []*model.DataNode{
{
IP: "1.1.1.1",
- Port: "5432",
+ Port: 5432,
Status: "Completed",
BackupID: "SK08DAK1",
StartTime: time.Now().Unix(),
@@ -109,7 +109,7 @@ var _ = Describe("ILocalStorage", func() {
},
{
IP: "1.1.1.2",
- Port: "5432",
+ Port: 5432,
Status: "Completed",
BackupID: "SK08DAK2",
StartTime: time.Now().Unix(),
@@ -118,7 +118,7 @@ var _ = Describe("ILocalStorage", func() {
},
SsBackup: &model.SsBackup{
Status: "Completed",
- ClusterInfo: model.ClusterInfo{
+ ClusterInfo: &model.ClusterInfo{
MetaData: model.MetaData{
Databases: model.Databases{
ShardingDb: "ShardingDb",
diff --git a/pitr/cli/internal/pkg/mocks/agent-server.go b/pitr/cli/internal/pkg/mocks/agent-server.go
new file mode 100644
index 0000000..e5cbb95
--- /dev/null
+++ b/pitr/cli/internal/pkg/mocks/agent-server.go
@@ -0,0 +1,111 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements. See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+ */
+
+// Code generated by MockGen. DO NOT EDIT.
+// Source: internal/pkg/agent-server.go
+
+// Package mock_pkg is a generated GoMock package.
+package mock_pkg
+
+import (
+ reflect "reflect"
+
+ model "github.com/apache/shardingsphere-on-cloud/pitr/cli/internal/pkg/model"
+ gomock "github.com/golang/mock/gomock"
+)
+
+// MockIAgentServer is a mock of IAgentServer interface.
+type MockIAgentServer struct {
+ ctrl *gomock.Controller
+ recorder *MockIAgentServerMockRecorder
+}
+
+// MockIAgentServerMockRecorder is the mock recorder for MockIAgentServer.
+type MockIAgentServerMockRecorder struct {
+ mock *MockIAgentServer
+}
+
+// NewMockIAgentServer creates a new mock instance.
+func NewMockIAgentServer(ctrl *gomock.Controller) *MockIAgentServer {
+ mock := &MockIAgentServer{ctrl: ctrl}
+ mock.recorder = &MockIAgentServerMockRecorder{mock}
+ return mock
+}
+
+// EXPECT returns an object that allows the caller to indicate expected use.
+func (m *MockIAgentServer) EXPECT() *MockIAgentServerMockRecorder {
+ return m.recorder
+}
+
+// Backup mocks base method.
+func (m *MockIAgentServer) Backup(in *model.BackupIn) (string, error) {
+ m.ctrl.T.Helper()
+ ret := m.ctrl.Call(m, "Backup", in)
+ ret0, _ := ret[0].(string)
+ ret1, _ := ret[1].(error)
+ return ret0, ret1
+}
+
+// Backup indicates an expected call of Backup.
+func (mr *MockIAgentServerMockRecorder) Backup(in interface{}) *gomock.Call {
+ mr.mock.ctrl.T.Helper()
+ return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Backup", reflect.TypeOf((*MockIAgentServer)(nil).Backup), in)
+}
+
+// Restore mocks base method.
+func (m *MockIAgentServer) Restore(in *model.RestoreIn) error {
+ m.ctrl.T.Helper()
+ ret := m.ctrl.Call(m, "Restore", in)
+ ret0, _ := ret[0].(error)
+ return ret0
+}
+
+// Restore indicates an expected call of Restore.
+func (mr *MockIAgentServerMockRecorder) Restore(in interface{}) *gomock.Call {
+ mr.mock.ctrl.T.Helper()
+ return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Restore", reflect.TypeOf((*MockIAgentServer)(nil).Restore), in)
+}
+
+// ShowDetail mocks base method.
+func (m *MockIAgentServer) ShowDetail(in *model.ShowDetailIn) (*model.BackupInfo, error) {
+ m.ctrl.T.Helper()
+ ret := m.ctrl.Call(m, "ShowDetail", in)
+ ret0, _ := ret[0].(*model.BackupInfo)
+ ret1, _ := ret[1].(error)
+ return ret0, ret1
+}
+
+// ShowDetail indicates an expected call of ShowDetail.
+func (mr *MockIAgentServerMockRecorder) ShowDetail(in interface{}) *gomock.Call {
+ mr.mock.ctrl.T.Helper()
+ return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ShowDetail", reflect.TypeOf((*MockIAgentServer)(nil).ShowDetail), in)
+}
+
+// ShowList mocks base method.
+func (m *MockIAgentServer) ShowList(in *model.ShowListIn) ([]model.BackupInfo, error) {
+ m.ctrl.T.Helper()
+ ret := m.ctrl.Call(m, "ShowList", in)
+ ret0, _ := ret[0].([]model.BackupInfo)
+ ret1, _ := ret[1].(error)
+ return ret0, ret1
+}
+
+// ShowList indicates an expected call of ShowList.
+func (mr *MockIAgentServerMockRecorder) ShowList(in interface{}) *gomock.Call {
+ mr.mock.ctrl.T.Helper()
+ return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ShowList", reflect.TypeOf((*MockIAgentServer)(nil).ShowList), in)
+}
diff --git a/pitr/cli/internal/pkg/mocks/local-storage.go b/pitr/cli/internal/pkg/mocks/local-storage.go
new file mode 100644
index 0000000..7445096
--- /dev/null
+++ b/pitr/cli/internal/pkg/mocks/local-storage.go
@@ -0,0 +1,126 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements. See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+ */
+
+// Code generated by MockGen. DO NOT EDIT.
+// Source: internal/pkg/local-storage.go
+
+// Package mock_pkg is a generated GoMock package.
+package mock_pkg
+
+import (
+ reflect "reflect"
+
+ pkg "github.com/apache/shardingsphere-on-cloud/pitr/cli/internal/pkg"
+ model "github.com/apache/shardingsphere-on-cloud/pitr/cli/internal/pkg/model"
+ gomock "github.com/golang/mock/gomock"
+)
+
+// MockILocalStorage is a mock of ILocalStorage interface.
+type MockILocalStorage struct {
+ ctrl *gomock.Controller
+ recorder *MockILocalStorageMockRecorder
+}
+
+// MockILocalStorageMockRecorder is the mock recorder for MockILocalStorage.
+type MockILocalStorageMockRecorder struct {
+ mock *MockILocalStorage
+}
+
+// NewMockILocalStorage creates a new mock instance.
+func NewMockILocalStorage(ctrl *gomock.Controller) *MockILocalStorage {
+ mock := &MockILocalStorage{ctrl: ctrl}
+ mock.recorder = &MockILocalStorageMockRecorder{mock}
+ return mock
+}
+
+// EXPECT returns an object that allows the caller to indicate expected use.
+func (m *MockILocalStorage) EXPECT() *MockILocalStorageMockRecorder {
+ return m.recorder
+}
+
+// GenFilename mocks base method.
+func (m *MockILocalStorage) GenFilename(extn pkg.Extension) string {
+ m.ctrl.T.Helper()
+ ret := m.ctrl.Call(m, "GenFilename", extn)
+ ret0, _ := ret[0].(string)
+ return ret0
+}
+
+// GenFilename indicates an expected call of GenFilename.
+func (mr *MockILocalStorageMockRecorder) GenFilename(extn interface{}) *gomock.Call {
+ mr.mock.ctrl.T.Helper()
+ return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GenFilename", reflect.TypeOf((*MockILocalStorage)(nil).GenFilename), extn)
+}
+
+// ReadAll mocks base method.
+func (m *MockILocalStorage) ReadAll() ([]model.LsBackup, error) {
+ m.ctrl.T.Helper()
+ ret := m.ctrl.Call(m, "ReadAll")
+ ret0, _ := ret[0].([]model.LsBackup)
+ ret1, _ := ret[1].(error)
+ return ret0, ret1
+}
+
+// ReadAll indicates an expected call of ReadAll.
+func (mr *MockILocalStorageMockRecorder) ReadAll() *gomock.Call {
+ mr.mock.ctrl.T.Helper()
+ return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ReadAll", reflect.TypeOf((*MockILocalStorage)(nil).ReadAll))
+}
+
+// ReadByCSN mocks base method.
+func (m *MockILocalStorage) ReadByCSN(csn string) (*model.LsBackup, error) {
+ m.ctrl.T.Helper()
+ ret := m.ctrl.Call(m, "ReadByCSN", csn)
+ ret0, _ := ret[0].(*model.LsBackup)
+ ret1, _ := ret[1].(error)
+ return ret0, ret1
+}
+
+// ReadByCSN indicates an expected call of ReadByCSN.
+func (mr *MockILocalStorageMockRecorder) ReadByCSN(csn interface{}) *gomock.Call {
+ mr.mock.ctrl.T.Helper()
+ return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ReadByCSN", reflect.TypeOf((*MockILocalStorage)(nil).ReadByCSN), csn)
+}
+
+// ReadByID mocks base method.
+func (m *MockILocalStorage) ReadByID(id string) (*model.LsBackup, error) {
+ m.ctrl.T.Helper()
+ ret := m.ctrl.Call(m, "ReadByID", id)
+ ret0, _ := ret[0].(*model.LsBackup)
+ ret1, _ := ret[1].(error)
+ return ret0, ret1
+}
+
+// ReadByID indicates an expected call of ReadByID.
+func (mr *MockILocalStorageMockRecorder) ReadByID(id interface{}) *gomock.Call {
+ mr.mock.ctrl.T.Helper()
+ return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ReadByID", reflect.TypeOf((*MockILocalStorage)(nil).ReadByID), id)
+}
+
+// WriteByJSON mocks base method.
+func (m *MockILocalStorage) WriteByJSON(name string, contents *model.LsBackup) error {
+ m.ctrl.T.Helper()
+ ret := m.ctrl.Call(m, "WriteByJSON", name, contents)
+ ret0, _ := ret[0].(error)
+ return ret0
+}
+
+// WriteByJSON indicates an expected call of WriteByJSON.
+func (mr *MockILocalStorageMockRecorder) WriteByJSON(name, contents interface{}) *gomock.Call {
+ mr.mock.ctrl.T.Helper()
+ return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "WriteByJSON", reflect.TypeOf((*MockILocalStorage)(nil).WriteByJSON), name, contents)
+}
diff --git a/pitr/cli/internal/pkg/mocks/shardingsphere-proxy.go b/pitr/cli/internal/pkg/mocks/shardingsphere-proxy.go
new file mode 100644
index 0000000..ec3c8f3
--- /dev/null
+++ b/pitr/cli/internal/pkg/mocks/shardingsphere-proxy.go
@@ -0,0 +1,138 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements. See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+ */
+
+// Code generated by MockGen. DO NOT EDIT.
+// Source: internal/pkg/shardingsphere-proxy.go
+
+// Package mock_pkg is a generated GoMock package.
+package mock_pkg
+
+import (
+ reflect "reflect"
+
+ model "github.com/apache/shardingsphere-on-cloud/pitr/cli/internal/pkg/model"
+ gomock "github.com/golang/mock/gomock"
+)
+
+// MockIShardingSphereProxy is a mock of IShardingSphereProxy interface.
+type MockIShardingSphereProxy struct {
+ ctrl *gomock.Controller
+ recorder *MockIShardingSphereProxyMockRecorder
+}
+
+// MockIShardingSphereProxyMockRecorder is the mock recorder for MockIShardingSphereProxy.
+type MockIShardingSphereProxyMockRecorder struct {
+ mock *MockIShardingSphereProxy
+}
+
+// NewMockIShardingSphereProxy creates a new mock instance.
+func NewMockIShardingSphereProxy(ctrl *gomock.Controller) *MockIShardingSphereProxy {
+ mock := &MockIShardingSphereProxy{ctrl: ctrl}
+ mock.recorder = &MockIShardingSphereProxyMockRecorder{mock}
+ return mock
+}
+
+// EXPECT returns an object that allows the caller to indicate expected use.
+func (m *MockIShardingSphereProxy) EXPECT() *MockIShardingSphereProxyMockRecorder {
+ return m.recorder
+}
+
+// ExportMetaData mocks base method.
+func (m *MockIShardingSphereProxy) ExportMetaData() (*model.ClusterInfo, error) {
+ m.ctrl.T.Helper()
+ ret := m.ctrl.Call(m, "ExportMetaData")
+ ret0, _ := ret[0].(*model.ClusterInfo)
+ ret1, _ := ret[1].(error)
+ return ret0, ret1
+}
+
+// ExportMetaData indicates an expected call of ExportMetaData.
+func (mr *MockIShardingSphereProxyMockRecorder) ExportMetaData() *gomock.Call {
+ mr.mock.ctrl.T.Helper()
+ return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ExportMetaData", reflect.TypeOf((*MockIShardingSphereProxy)(nil).ExportMetaData))
+}
+
+// ExportStorageNodes mocks base method.
+func (m *MockIShardingSphereProxy) ExportStorageNodes() ([]*model.StorageNode, error) {
+ m.ctrl.T.Helper()
+ ret := m.ctrl.Call(m, "ExportStorageNodes")
+ ret0, _ := ret[0].([]*model.StorageNode)
+ ret1, _ := ret[1].(error)
+ return ret0, ret1
+}
+
+// ExportStorageNodes indicates an expected call of ExportStorageNodes.
+func (mr *MockIShardingSphereProxyMockRecorder) ExportStorageNodes() *gomock.Call {
+ mr.mock.ctrl.T.Helper()
+ return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ExportStorageNodes", reflect.TypeOf((*MockIShardingSphereProxy)(nil).ExportStorageNodes))
+}
+
+// ImportMetaData mocks base method.
+func (m *MockIShardingSphereProxy) ImportMetaData(in *model.ClusterInfo) error {
+ m.ctrl.T.Helper()
+ ret := m.ctrl.Call(m, "ImportMetaData", in)
+ ret0, _ := ret[0].(error)
+ return ret0
+}
+
+// ImportMetaData indicates an expected call of ImportMetaData.
+func (mr *MockIShardingSphereProxyMockRecorder) ImportMetaData(in interface{}) *gomock.Call {
+ mr.mock.ctrl.T.Helper()
+ return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ImportMetaData", reflect.TypeOf((*MockIShardingSphereProxy)(nil).ImportMetaData), in)
+}
+
+// LockForBackup mocks base method.
+func (m *MockIShardingSphereProxy) LockForBackup() error {
+ m.ctrl.T.Helper()
+ ret := m.ctrl.Call(m, "LockForBackup")
+ ret0, _ := ret[0].(error)
+ return ret0
+}
+
+// LockForBackup indicates an expected call of LockForBackup.
+func (mr *MockIShardingSphereProxyMockRecorder) LockForBackup() *gomock.Call {
+ mr.mock.ctrl.T.Helper()
+ return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "LockForBackup", reflect.TypeOf((*MockIShardingSphereProxy)(nil).LockForBackup))
+}
+
+// LockForRestore mocks base method.
+func (m *MockIShardingSphereProxy) LockForRestore() error {
+ m.ctrl.T.Helper()
+ ret := m.ctrl.Call(m, "LockForRestore")
+ ret0, _ := ret[0].(error)
+ return ret0
+}
+
+// LockForRestore indicates an expected call of LockForRestore.
+func (mr *MockIShardingSphereProxyMockRecorder) LockForRestore() *gomock.Call {
+ mr.mock.ctrl.T.Helper()
+ return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "LockForRestore", reflect.TypeOf((*MockIShardingSphereProxy)(nil).LockForRestore))
+}
+
+// Unlock mocks base method.
+func (m *MockIShardingSphereProxy) Unlock() error {
+ m.ctrl.T.Helper()
+ ret := m.ctrl.Call(m, "Unlock")
+ ret0, _ := ret[0].(error)
+ return ret0
+}
+
+// Unlock indicates an expected call of Unlock.
+func (mr *MockIShardingSphereProxyMockRecorder) Unlock() *gomock.Call {
+ mr.mock.ctrl.T.Helper()
+ return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Unlock", reflect.TypeOf((*MockIShardingSphereProxy)(nil).Unlock))
+}
diff --git a/pitr/cli/internal/pkg/model/as_backup.go b/pitr/cli/internal/pkg/model/as_backup.go
index e0cc1cf..1f5cc35 100644
--- a/pitr/cli/internal/pkg/model/as_backup.go
+++ b/pitr/cli/internal/pkg/model/as_backup.go
@@ -24,10 +24,10 @@ type (
Username string `json:"username"`
Password string `json:"password"`
- DnBackupPath string `json:"dn_backup_path"`
- DnThreadsNum uint8 `json:"dn_threads_num"`
- DnBackupMode string `json:"dn_backup_mode"`
- Instance string `json:"instance"`
+ DnBackupPath string `json:"dn_backup_path"`
+ DnThreadsNum uint8 `json:"dn_threads_num"`
+ DnBackupMode DBBackupMode `json:"dn_backup_mode"`
+ Instance string `json:"instance"`
}
BackupOut struct {
diff --git a/pitr/cli/internal/pkg/model/as_show.go b/pitr/cli/internal/pkg/model/as_show.go
index 231db16..8fefabf 100644
--- a/pitr/cli/internal/pkg/model/as_show.go
+++ b/pitr/cli/internal/pkg/model/as_show.go
@@ -38,13 +38,13 @@ type (
}
BackupInfo struct {
- Id string `json:"dn_backup_id"`
- Path string `json:"dn_backup_path"`
- Mode string `json:"db_backup_mode"`
- Instance string `json:"instance"`
- StartTime string `json:"start_time"`
- EndTime string `json:"end_time"`
- Status string `json:"status"`
+ Id string `json:"dn_backup_id"`
+ Path string `json:"dn_backup_path"`
+ Mode string `json:"db_backup_mode"`
+ Instance string `json:"instance"`
+ StartTime string `json:"start_time"`
+ EndTime string `json:"end_time"`
+ Status BackupStatus `json:"status"`
}
BackupDetailResp struct {
diff --git a/pitr/cli/internal/pkg/model/as_backup.go b/pitr/cli/internal/pkg/model/const.go
similarity index 58%
copy from pitr/cli/internal/pkg/model/as_backup.go
copy to pitr/cli/internal/pkg/model/const.go
index e0cc1cf..d745929 100644
--- a/pitr/cli/internal/pkg/model/as_backup.go
+++ b/pitr/cli/internal/pkg/model/const.go
@@ -17,26 +17,16 @@
package model
-type (
- BackupIn struct {
- DbPort uint16 `json:"db_port"`
- DbName string `json:"db_name"`
- Username string `json:"username"`
- Password string `json:"password"`
+type BackupStatus string
+type DBBackupMode string
- DnBackupPath string `json:"dn_backup_path"`
- DnThreadsNum uint8 `json:"dn_threads_num"`
- DnBackupMode string `json:"dn_backup_mode"`
- Instance string `json:"instance"`
- }
+const (
+ SsBackupStatusWaiting BackupStatus = "Waiting"
+ SsBackupStatusRunning BackupStatus = "Running"
+ SsBackupStatusCompleted BackupStatus = "Completed"
+ SsBackupStatusFailed BackupStatus = "Failed"
+ SsBackupStatusCheckError BackupStatus = "CheckError"
- BackupOut struct {
- ID string `json:"backup_id"`
- }
-
- BackupOutResp struct {
- Code int `json:"code" validate:"required"`
- Msg string `json:"msg" validate:"required"`
- Data BackupOut `json:"data"`
- }
+ BDBackModeFull DBBackupMode = "FULL"
+ DBBackModePTrack DBBackupMode = "PTRACK"
)
diff --git a/pitr/cli/internal/pkg/model/ls_backup.go b/pitr/cli/internal/pkg/model/ls_backup.go
index 48514bf..962007c 100644
--- a/pitr/cli/internal/pkg/model/ls_backup.go
+++ b/pitr/cli/internal/pkg/model/ls_backup.go
@@ -21,7 +21,7 @@ type (
// LsBackup LocalStorageBackup
LsBackup struct {
Info *BackupMetaInfo `json:"info"`
- DnList []DataNode `json:"dn_list"`
+ DnList []*DataNode `json:"dn_list"`
SsBackup *SsBackup `json:"ss_backup"`
}
@@ -33,32 +33,36 @@ type (
}
DataNode struct {
- IP string `json:"ip"`
- Port string `json:"port"`
- Status string `json:"status"`
- BackupID string `json:"backup_id"`
- StartTime int64 `json:"start_time"` // Unix time
- EndTime int64 `json:"end_time"` // Unix time
+ IP string `json:"ip"`
+ Port uint16 `json:"port"`
+ Status BackupStatus `json:"status"`
+ BackupID string `json:"backup_id"`
+ StartTime int64 `json:"start_time"` // Unix time
+ EndTime int64 `json:"end_time"` // Unix time
}
)
type (
SsBackup struct {
- Status string `json:"status"`
- ClusterInfo ClusterInfo `json:"cluster_info"`
- StorageNodes []StorageNode `json:"storage_nodes"`
+ Status BackupStatus `json:"status"`
+ ClusterInfo *ClusterInfo `json:"cluster_info"`
+ StorageNodes []*StorageNode `json:"storage_nodes"`
}
StorageNode struct {
IP string `json:"ip"`
- Port string `json:"port"`
+ Port uint16 `json:"port,string"`
Username string `json:"username"`
Password string `json:"password"`
Database string `json:"database"`
- Remark string `json:"remark"`
+ Remark string `json:"remark,omitempty"`
+ }
+
+ StorageNodesInfo struct {
+ StorageNodes *StorageNodes `json:"storage_nodes"`
}
StorageNodes struct {
- List []StorageNode `json:"storage_nodes"`
+ List []*StorageNode `json:"sharding_db"`
}
)
diff --git a/pitr/cli/internal/pkg/shardingsphere-proxy.go b/pitr/cli/internal/pkg/shardingsphere-proxy.go
index bd0be41..51de400 100644
--- a/pitr/cli/internal/pkg/shardingsphere-proxy.go
+++ b/pitr/cli/internal/pkg/shardingsphere-proxy.go
@@ -34,7 +34,7 @@ type (
IShardingSphereProxy interface {
ExportMetaData() (*model.ClusterInfo, error)
- ExportStorageNodes() ([]model.StorageNode, error)
+ ExportStorageNodes() ([]*model.StorageNode, error)
LockForRestore() error
LockForBackup() error
Unlock() error
@@ -111,17 +111,15 @@ func (ss *shardingSphereProxy) ExportMetaData() (*model.ClusterInfo, error) {
if err = query.Scan(&id, &createTime, &data); err != nil {
return nil, xerr.NewCliErr(fmt.Sprintf("query scan failure,err=%s", err))
}
-
if err = query.Close(); err != nil {
return nil, xerr.NewCliErr(fmt.Sprintf("query close failure,err=%s", err))
}
}
-
- out := model.SsBackupInfo{}
+ out := model.ClusterInfo{}
if err = json.Unmarshal([]byte(data), &out); err != nil {
return nil, fmt.Errorf("json unmarshal return err=%s", err)
}
- return nil, nil
+ return &out, nil
}
/*
@@ -130,10 +128,10 @@ ExportStorageNodes 导出存储节点数据
+-----------------------------+-------------------------+----------------------------------------+
| id | create_time | data |
+-------------------------------------------------------+----------------------------------------+
-| 734bb036-b15d-4af0-be87-237 | 2023-01-01 12:00:00 897 | {"storage_nodes":[]} |
+| 734bb036-b15d-4af0-be87-237 | 2023-01-01 12:00:00 897 | {"storage_nodes":{"sharding_db":[]}} |
+-------------------------------------------------------+----------------------------------------+
*/
-func (ss *shardingSphereProxy) ExportStorageNodes() ([]model.StorageNode, error) {
+func (ss *shardingSphereProxy) ExportStorageNodes() ([]*model.StorageNode, error) {
query, err := ss.db.Query(`EXPORT STORAGE NODES;`)
if err != nil {
return nil, xerr.NewCliErr(fmt.Sprintf("export storage nodes failure,err=%s", err))
@@ -152,12 +150,11 @@ func (ss *shardingSphereProxy) ExportStorageNodes() ([]model.StorageNode, error)
return nil, xerr.NewCliErr(fmt.Sprintf("query close failure,err=%s", err))
}
}
-
- out := model.StorageNodes{}
+ out := &model.StorageNodesInfo{}
if err = json.Unmarshal([]byte(data), &out); err != nil {
return nil, fmt.Errorf("json unmarshal return err=%s", err)
}
- return out.List, nil
+ return out.StorageNodes.List, nil
}
// ImportMetaData 备份数据恢复
diff --git a/pitr/cli/internal/pkg/shardingsphere-proxy_test.go b/pitr/cli/internal/pkg/shardingsphere-proxy_test.go
index d949161..7544d7a 100644
--- a/pitr/cli/internal/pkg/shardingsphere-proxy_test.go
+++ b/pitr/cli/internal/pkg/shardingsphere-proxy_test.go
@@ -19,6 +19,8 @@ package pkg
import (
"fmt"
+ "github.com/apache/shardingsphere-on-cloud/pitr/cli/internal/pkg/model"
+ "testing"
"time"
. "github.com/onsi/ginkgo/v2"
@@ -83,3 +85,89 @@ var _ = Describe("IShardingSphereProxy", func() {
})
})
+
+var (
+ // implement with your own env
+ u string
+ p string
+ h string
+ pt uint16
+ db string
+)
+
+func Test_shardingSphereProxy_Unlock(t *testing.T) {
+ tests := []struct {
+ name string
+
+ wantErr bool
+ }{
+ {
+ name: "test unlock after lock",
+ wantErr: false,
+ },
+ }
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ ss, _ := NewShardingSphereProxy(u, p, db, h, pt)
+ if err := ss.LockForBackup(); (err != nil) != tt.wantErr {
+ t.Errorf("Lock() error = %v, wantErr %v", err, tt.wantErr)
+ }
+ if err := ss.Unlock(); (err != nil) != tt.wantErr {
+ t.Errorf("Unlock() error = %v, wantErr %v", err, tt.wantErr)
+ }
+ })
+ }
+}
+
+func Test_shardingSphereProxy_ExportMetaData(t *testing.T) {
+ tests := []struct {
+ name string
+ want *model.ClusterInfo
+ wantErr bool
+ }{
+ {
+ name: "test export metadata",
+ wantErr: false,
+ want: &model.ClusterInfo{},
+ },
+ }
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ ss, _ := NewShardingSphereProxy(u, p, db, h, pt)
+ _, err := ss.ExportMetaData()
+ if (err != nil) != tt.wantErr {
+ t.Errorf("ExportMetaData() error = %v, wantErr %v", err, tt.wantErr)
+ return
+ }
+ //if !reflect.DeepEqual(got, tt.want) {
+ // t.Errorf("ExportMetaData() got = %v, want %v", got, tt.want)
+ //}
+ })
+ }
+}
+
+func Test_shardingSphereProxy_ExportStorageNodes(t *testing.T) {
+ tests := []struct {
+ name string
+ want []*model.StorageNode
+ wantErr bool
+ }{
+ {
+ name: "test export storage nodes",
+ want: []*model.StorageNode{},
+ wantErr: false,
+ }}
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ ss, _ := NewShardingSphereProxy(u, p, db, h, pt)
+ _, err := ss.ExportStorageNodes()
+ if (err != nil) != tt.wantErr {
+ t.Errorf("ExportStorageNodes() error = %v, wantErr %v", err, tt.wantErr)
+ return
+ }
+ //if !reflect.DeepEqual(got, tt.want) {
+ // t.Errorf("ExportStorageNodes() got = %v, want %v", got, tt.want)
+ //}
+ })
+ }
+}
diff --git a/pitr/cli/pkg/httputils/req.go b/pitr/cli/pkg/httputils/req.go
index fc93c6c..2a72018 100644
--- a/pitr/cli/pkg/httputils/req.go
+++ b/pitr/cli/pkg/httputils/req.go
@@ -20,10 +20,12 @@ package httputils
import (
"bytes"
"context"
+ "crypto/tls"
"encoding/json"
"fmt"
"io"
"net/http"
+ "strings"
)
type req struct {
@@ -36,6 +38,9 @@ type req struct {
}
func NewRequest(ctx context.Context, method, url string) *req {
+ if !strings.HasPrefix(url, "http") {
+ url = fmt.Sprintf("http://%s", url)
+ }
r := &req{
ctx: ctx,
method: method,
@@ -87,7 +92,10 @@ func (r *req) Send(body any) (int, error) {
_req.URL.RawQuery = values.Encode()
}
- c := &http.Client{}
+ tr := &http.Transport{
+ TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
+ }
+ c := &http.Client{Transport: tr}
resp, err := c.Do(_req)
if err != nil {
return -1, fmt.Errorf("http request err=%w", err)