You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by mi...@apache.org on 2023/03/01 07:11:59 UTC
[shardingsphere-on-cloud] branch main updated: feat(pitr):agent backup api (#232)
This is an automated email from the ASF dual-hosted git repository.
miaoliyao pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/shardingsphere-on-cloud.git
The following commit(s) were added to refs/heads/main by this push:
new 87e7363 feat(pitr):agent backup api (#232)
87e7363 is described below
commit 87e7363daa0802e4274174307cf6d13f2d3b3353
Author: lltgo <ll...@foxmail.com>
AuthorDate: Wed Mar 1 15:11:53 2023 +0800
feat(pitr):agent backup api (#232)
---
pitr/agent/internal/cons/error.go | 1 +
pitr/agent/internal/handler/backup.go | 19 +++++++++--
pitr/agent/internal/handler/view/backup.go | 9 +++---
pitr/agent/internal/pkg/opengauss.go | 51 ++++++++++++++++++------------
pitr/agent/internal/pkg/opengauss_test.go | 25 ++++++++-------
pitr/agent/internal/pkg/pkg.go | 4 +--
pitr/agent/main.go | 2 +-
7 files changed, 70 insertions(+), 41 deletions(-)
diff --git a/pitr/agent/internal/cons/error.go b/pitr/agent/internal/cons/error.go
index f5ba399..a86c0be 100644
--- a/pitr/agent/internal/cons/error.go
+++ b/pitr/agent/internal/cons/error.go
@@ -45,4 +45,5 @@ var (
BodyParseFailed = xerror.New(10020, "Invalid http request body.")
MissingDbName = xerror.New(10021, "Missing db name.")
DbConnectionFailed = xerror.New(10022, "Database connection failed.")
+ UnmatchBackupID = xerror.New(10023, "Unmatch any backup id.")
)
diff --git a/pitr/agent/internal/handler/backup.go b/pitr/agent/internal/handler/backup.go
index 10d3006..2d9cd2b 100644
--- a/pitr/agent/internal/handler/backup.go
+++ b/pitr/agent/internal/handler/backup.go
@@ -19,6 +19,8 @@ package handler
import (
"fmt"
+ "github.com/apache/shardingsphere-on-cloud/pitr/agent/internal/pkg"
+ "github.com/apache/shardingsphere-on-cloud/pitr/agent/pkg/responder"
"github.com/apache/shardingsphere-on-cloud/pitr/agent/internal/handler/view"
@@ -35,8 +37,21 @@ func Backup(ctx *fiber.Ctx) error {
}
if err := in.Validate(); err != nil {
- return err
+ return fmt.Errorf("invalid parameter,err=%w", err)
}
- return ctx.JSON(in)
+ if err := pkg.OG.Auth(in.Username, in.Password, in.DbName, in.DbPort); err != nil {
+ efmt := "pkg.OG.Auth failure[un=%s,pw.len=%d,db=%s],err=%w"
+ return fmt.Errorf(efmt, in.Username, len(in.Password), in.DbName, err)
+ }
+
+ backupID, err := pkg.OG.AsyncBackup(in.DnBackupPath, in.Instance, in.DnBackupMode, 1)
+ if err != nil {
+ efmt := "pkg.OG.AsyncBackup[path=%s,instance=%s,mode=%s] failure,err=%w"
+ return fmt.Errorf(efmt, in.DnBackupPath, in.Instance, in.DnBackupMode, err)
+ }
+
+ return responder.Success(ctx, view.BackupOut{
+ ID: backupID,
+ })
}
diff --git a/pitr/agent/internal/handler/view/backup.go b/pitr/agent/internal/handler/view/backup.go
index 9ab7e21..19a29aa 100644
--- a/pitr/agent/internal/handler/view/backup.go
+++ b/pitr/agent/internal/handler/view/backup.go
@@ -21,10 +21,11 @@ import "github.com/apache/shardingsphere-on-cloud/pitr/agent/internal/cons"
type (
BackupIn struct {
- DbPort uint16 `json:"db_port"`
- DbName string `json:"db_name"`
- Username string `json:"username"`
- Password string `json:"password"`
+ DbPort uint16 `json:"db_port"`
+ DbName string `json:"db_name"`
+ Username string `json:"username"`
+ Password string `json:"password"`
+
DnBackupPath string `json:"dn_backup_path"`
DnThreadsNum uint8 `json:"dn_threads_num"`
DnBackupMode string `json:"dn_backup_mode"`
diff --git a/pitr/agent/internal/pkg/opengauss.go b/pitr/agent/internal/pkg/opengauss.go
index c992e88..da44642 100644
--- a/pitr/agent/internal/pkg/opengauss.go
+++ b/pitr/agent/internal/pkg/opengauss.go
@@ -33,29 +33,33 @@ import (
type (
openGauss struct {
- shell string
+ shell string
+ pgData string
}
IOpenGauss interface {
- AsyncBackup(backupPath, instanceName, backupMode, pgData string) (string, error)
+ AsyncBackup(backupPath, instanceName, backupMode string, threadsNum uint8) (string, error)
ShowBackup(backupPath, instanceName, backupID string) (*model.Backup, error)
Init(backupPath string) error
- AddInstance(backupPath, instancee, pgData string) error
- DelInstance(backupPath, instancee string) error
- Start(pgData string) error
- Stop(pgData string) error
- Restore(backupPath, instance, backupID, pgData string) error
+ AddInstance(backupPath, instance string) error
+ DelInstance(backupPath, instance string) error
+ Start() error
+ Stop() error
+ Restore(backupPath, instance, backupID string) error
ShowBackupList(backupPath, instanceName string) ([]model.Backup, error)
Auth(user, password, dbName string, dbPort uint16) error
}
)
-func NewOpenGauss(shell string) IOpenGauss {
- return &openGauss{shell: shell}
+func NewOpenGauss(shell, pgData string) IOpenGauss {
+ return &openGauss{
+ shell: shell,
+ pgData: pgData,
+ }
}
const (
- _backupFmt = "gs_probackup backup --backup-path=%s --instance=%s --backup-mode=%s --pgdata=%s 2>&1"
+ _backupFmt = "gs_probackup backup --backup-path=%s --instance=%s --backup-mode=%s --pgdata=%s --threads=%d 2>&1"
_showFmt = "gs_probackup show --instance=%s --backup-path=%s --backup-id=%s --format=json 2>&1"
_delBackupFmt = "gs_probackup delete --backup-path=%s --instance=%s --backup-id=%s 2>&1"
_restoreFmt = "gs_probackup restore --backup-path=%s --instance=%s --backup-id=%s --pgdata=%s 2>&1"
@@ -72,8 +76,8 @@ const (
_showListFmt = "gs_probackup show --instance=%s --backup-path=%s --format=json 2>&1"
)
-func (og *openGauss) AsyncBackup(backupPath, instanceName, backupMode, pgData string) (string, error) {
- cmd := fmt.Sprintf(_backupFmt, backupPath, instanceName, backupMode, pgData)
+func (og *openGauss) AsyncBackup(backupPath, instanceName, backupMode string, threadsNum uint8) (string, error) {
+ cmd := fmt.Sprintf(_backupFmt, backupPath, instanceName, backupMode, og.pgData, threadsNum)
outputs, err := cmds.AsyncExec(og.shell, cmd)
if err != nil {
return "", fmt.Errorf("cmds.AsyncExec[shell=%s,cmd=%s] return err=%w", og.shell, cmd, err)
@@ -155,8 +159,8 @@ func (og *openGauss) deinit(backupPath string) error {
return nil
}
-func (og *openGauss) AddInstance(backupPath, instancee, pgData string) error {
- cmd := fmt.Sprintf(_addInstanceFmt, backupPath, instancee, pgData)
+func (og *openGauss) AddInstance(backupPath, instance string) error {
+ cmd := fmt.Sprintf(_addInstanceFmt, backupPath, instance, og.pgData)
_, err := cmds.Exec(og.shell, cmd)
// already exist and it's not empty
if errors.Is(err, cons.CmdOperateFailed) {
@@ -181,8 +185,8 @@ func (og *openGauss) DelInstance(backupPath, instancee string) error {
return nil
}
-func (og *openGauss) Start(pgData string) error {
- cmd := fmt.Sprintf(_startOpenGaussFmt, pgData)
+func (og *openGauss) Start() error {
+ cmd := fmt.Sprintf(_startOpenGaussFmt, og.pgData)
_, err := cmds.Exec(og.shell, cmd)
// already exist and it's not empty
if errors.Is(err, cons.CmdOperateFailed) {
@@ -194,8 +198,8 @@ func (og *openGauss) Start(pgData string) error {
return nil
}
-func (og *openGauss) Stop(pgData string) error {
- cmd := fmt.Sprintf(_stopOpenGaussFmt, pgData)
+func (og *openGauss) Stop() error {
+ cmd := fmt.Sprintf(_stopOpenGaussFmt, og.pgData)
_, err := cmds.Exec(og.shell, cmd)
// already exist and it's not empty
if errors.Is(err, cons.CmdOperateFailed) {
@@ -208,8 +212,8 @@ func (og *openGauss) Stop(pgData string) error {
}
// Restore TODO:Dependent environments require integration testing
-func (og *openGauss) Restore(backupPath, instance, backupID, pgData string) error {
- cmd := fmt.Sprintf(_restoreFmt, backupPath, instance, backupID, pgData)
+func (og *openGauss) Restore(backupPath, instance, backupID string) error {
+ cmd := fmt.Sprintf(_restoreFmt, backupPath, instance, backupID, og.pgData)
outputs, err := cmds.AsyncExec(og.shell, cmd)
for output := range outputs {
@@ -260,8 +264,15 @@ func (og *openGauss) ignore(outputs chan *cmds.Output) {
}
func (og *openGauss) getBackupID(msg string) (string, error) {
+ fmt.Println(msg)
re := regexp2.MustCompile("(?<=backup ID:\\s+)\\w+(?=,)", 0)
match, err := re.FindStringMatch(msg)
+ if err != nil {
+ return "", fmt.Errorf("unmatch any backup id[msg=%s],err=%s", msg, err)
+ }
+ if match.Length == 0 {
+ return "", fmt.Errorf("unmatch any backup id,match.lenght is 0,err=%w", cons.UnmatchBackupID)
+ }
return match.String(), err
}
diff --git a/pitr/agent/internal/pkg/opengauss_test.go b/pitr/agent/internal/pkg/opengauss_test.go
index 7bdb724..9656f20 100644
--- a/pitr/agent/internal/pkg/opengauss_test.go
+++ b/pitr/agent/internal/pkg/opengauss_test.go
@@ -34,7 +34,8 @@ var _ = Describe("OpenGauss,requires opengauss environment", func() {
Context("AsyncBackup & ShowBackupDetail ", func() {
It("backup, show and delete", func() {
og := &openGauss{
- shell: "/bin/sh",
+ shell: "/bin/sh",
+ pgData: "/data/opengauss/3.1.1/data/single_node/",
}
var (
@@ -46,7 +47,7 @@ var _ = Describe("OpenGauss,requires opengauss environment", func() {
data,
instance,
"full",
- "/data/opengauss/3.1.1/data/single_node/",
+ 1,
)
Expect(err).To(BeNil())
@@ -55,7 +56,7 @@ var _ = Describe("OpenGauss,requires opengauss environment", func() {
// timeout 60s
for i := 0; i < 60; i++ {
- backup, err := og.ShowBackupDetail(
+ backup, err := og.ShowBackup(
data,
instance,
backupID,
@@ -121,18 +122,18 @@ var _ = Describe("OpenGauss,requires opengauss environment", func() {
Context("AddInstance and DelInstance", func() {
It("instance:add and delete", func() {
og := &openGauss{
- shell: "/bin/sh",
+ shell: "/bin/sh",
+ pgData: "/data/opengauss/3.1.1/data/single_node/",
}
var (
backupPath = "/home/omm/data"
instance = "ins-test-1"
- pgData = "/data/opengauss/3.1.1/data/single_node/"
)
- err := og.AddInstance(backupPath, instance, pgData)
+ err := og.AddInstance(backupPath, instance)
Expect(err).To(BeNil())
- err = og.AddInstance(backupPath, instance, pgData)
+ err = og.AddInstance(backupPath, instance)
Expect(errors.Is(err, cons.InstanceAlreadyExist)).To(BeTrue())
err = og.DelInstance(backupPath, instance)
@@ -146,17 +147,17 @@ var _ = Describe("OpenGauss,requires opengauss environment", func() {
Context("Start and Stop", func() {
It("start and stop:may fail if no instance exists", func() {
og := &openGauss{
- shell: "/bin/sh",
+ shell: "/bin/sh",
+ pgData: "/data/opengauss/3.1.1/data/single_node/",
}
- var pgData = "/data/opengauss/3.1.1/data/single_node/"
- err := og.Stop(pgData)
+ err := og.Stop()
Expect(err).To(BeNil())
- err = og.Stop(pgData)
+ err = og.Stop()
Expect(errors.Is(err, cons.StopOpenGaussFailed)).To(BeTrue())
- err = og.Start(pgData)
+ err = og.Start()
Expect(err).To(BeNil())
})
})
diff --git a/pitr/agent/internal/pkg/pkg.go b/pitr/agent/internal/pkg/pkg.go
index 59e81e2..5e8f356 100644
--- a/pitr/agent/internal/pkg/pkg.go
+++ b/pitr/agent/internal/pkg/pkg.go
@@ -21,6 +21,6 @@ var (
OG IOpenGauss
)
-func Init(shell string) {
- OG = NewOpenGauss(shell)
+func Init(shell, pgData string) {
+ OG = NewOpenGauss(shell, pgData)
}
diff --git a/pitr/agent/main.go b/pitr/agent/main.go
index 81dd37a..ff143df 100644
--- a/pitr/agent/main.go
+++ b/pitr/agent/main.go
@@ -69,7 +69,6 @@ func main() {
if shell == "" {
panic(fmt.Errorf("shell does not exist"))
}
- pkg.Init(shell)
if pgData == "" {
pgData = os.Getenv("PGDATA")
@@ -77,6 +76,7 @@ func main() {
panic(fmt.Errorf("PGDATA:no database directory specified and environment variable PGDATA unset"))
}
}
+ pkg.Init(shell, pgData)
if strings.Trim(tlsCrt, " ") == "" || strings.Trim(tlsKey, " ") == "" {
panic(fmt.Errorf("lack of HTTPs certificate"))