You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by mi...@apache.org on 2023/03/02 10:58:36 UTC
[shardingsphere-on-cloud] branch main updated: feat(pitr):agent restore api and backup pgdata (#239)
This is an automated email from the ASF dual-hosted git repository.
miaoliyao pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/shardingsphere-on-cloud.git
The following commit(s) were added to refs/heads/main by this push:
new 1df9b5b feat(pitr):agent restore api and backup pgdata (#239)
1df9b5b is described below
commit 1df9b5ba39221cfd921b69dbe4f87538c64d1d3a
Author: lltgo <ll...@foxmail.com>
AuthorDate: Thu Mar 2 18:58:30 2023 +0800
feat(pitr):agent restore api and backup pgdata (#239)
* feat(pitr):agent restore api and backup pgdata
* chore:comments
---
pitr/agent/internal/cons/error.go | 3 ++
pitr/agent/internal/handler/restore.go | 62 ++++++++++++++++++++++++-------
pitr/agent/internal/pkg/opengauss.go | 68 ++++++++++++++++++++++++++--------
pitr/agent/main.go | 7 ++++
4 files changed, 110 insertions(+), 30 deletions(-)
diff --git a/pitr/agent/internal/cons/error.go b/pitr/agent/internal/cons/error.go
index 4596a0c..d9b2b2a 100644
--- a/pitr/agent/internal/cons/error.go
+++ b/pitr/agent/internal/cons/error.go
@@ -48,4 +48,7 @@ var (
UnmatchBackupID = xerror.New(10023, "Unmatch any backup id.")
InvalidPgDataDir = xerror.New(10024, "Invalid PGDATA dir.")
UnknownOgStatus = xerror.New(10025, "Unknown openGauss status.")
+ MvPgDataToTempFailed = xerror.New(10026, "Move pgdata dir to temp failed.")
+ MvTempToPgDataFailed = xerror.New(10027, "Move temp dir to pgdata failed.")
+ CleanPgDataTempFailed = xerror.New(10028, "Clean pgdata temp dir failed.")
)
diff --git a/pitr/agent/internal/handler/restore.go b/pitr/agent/internal/handler/restore.go
index be4c2f6..df7ce62 100644
--- a/pitr/agent/internal/handler/restore.go
+++ b/pitr/agent/internal/handler/restore.go
@@ -20,6 +20,7 @@ package handler
import (
"fmt"
"github.com/apache/shardingsphere-on-cloud/pitr/agent/internal/pkg"
+ "github.com/apache/shardingsphere-on-cloud/pitr/agent/pkg/responder"
"github.com/gofiber/fiber/v2"
@@ -27,34 +28,67 @@ import (
"github.com/apache/shardingsphere-on-cloud/pitr/agent/internal/handler/view"
)
-func Restore(ctx *fiber.Ctx) error {
+func Restore(ctx *fiber.Ctx) (err error) {
in := &view.RestoreIn{}
- if err := ctx.BodyParser(in); err != nil {
- return fmt.Errorf("body parse err=%s,wrap=%w", err, cons.BodyParseFailed)
+ if err = ctx.BodyParser(in); err != nil {
+ err = fmt.Errorf("body parse err=%s,wrap=%w", err, cons.BodyParseFailed)
+ return
}
- if err := in.Validate(); err != nil {
- return fmt.Errorf("invalid parameter,err=%w", err)
+ if err = in.Validate(); err != nil {
+ err = fmt.Errorf("invalid parameter,err=%w", err)
+ return
}
- if err := pkg.OG.Auth(in.Username, in.Password, in.DbName, in.DbPort); err != nil {
+ if err = pkg.OG.Auth(in.Username, in.Password, in.DbName, in.DbPort); err != nil {
efmt := "pkg.OG.Auth failure[un=%s,pw.len=%d,db=%s],err=%w"
- return fmt.Errorf(efmt, in.Username, len(in.Password), in.DbName, err)
+ err = fmt.Errorf(efmt, in.Username, len(in.Password), in.DbName, err)
+ return
}
- if err := pkg.OG.Stop(); err != nil {
- return fmt.Errorf("stop openGauss failure,err=%w", err)
+ if err = pkg.OG.Stop(); err != nil {
+ err = fmt.Errorf("stop openGauss failure,err=%w", err)
+ return
}
+ defer func() {
+ if err != nil {
+ err2 := pkg.OG.Start()
+ if err2 != nil {
+ err = fmt.Errorf("pkg.OG.Start() return err=%s,wrap=%w", err2, err)
+ return
+ }
+ }
+ }()
- if err := pkg.OG.Restore(in.DnBackupPath, in.Instance, in.DnBackupId); err != nil {
+ if err = pkg.OG.MvPgDataToTemp(); err != nil {
+ err = fmt.Errorf("pkg.OG.MvPgDataToTemp return err=%w", err)
+ return
+ }
+
+ if err = pkg.OG.Restore(in.DnBackupPath, in.Instance, in.DnBackupId); err != nil {
efmt := "pkg.OG.Restore failure[path=%s,instance=%s,backupID=%s],err=%w"
- return fmt.Errorf(efmt, in.DnBackupPath, in.Instance, in.DnBackupId, err)
+ err = fmt.Errorf(efmt, in.DnBackupPath, in.Instance, in.DnBackupId, err)
+
+ err2 := pkg.OG.MvTempToPgData()
+ err = fmt.Errorf("resotre failre[err=%s],pkg.OG.MvTempToPgData return err=%w", err, err2)
+
+ return
}
- if err := pkg.OG.Start(); err != nil {
- return fmt.Errorf("stop openGauss failure,err=%w", err)
+ if err = pkg.OG.CleanPgDataTemp(); err != nil {
+ err = fmt.Errorf("pkg.OG.CleanPgDataTemp return err=%w", err)
+ return
}
- return ctx.JSON(in)
+ if err = pkg.OG.Start(); err != nil {
+ err = fmt.Errorf("pkg.OG.Start return err=%w", err)
+ return
+ }
+
+ if err = responder.Success(ctx, nil); err != nil {
+ err = fmt.Errorf("responder failure,err=%s,wrap=%w", err, cons.Internal)
+ return nil
+ }
+ return
}
diff --git a/pitr/agent/internal/pkg/opengauss.go b/pitr/agent/internal/pkg/opengauss.go
index 3d9ee28..1c20800 100644
--- a/pitr/agent/internal/pkg/opengauss.go
+++ b/pitr/agent/internal/pkg/opengauss.go
@@ -33,8 +33,9 @@ import (
type (
openGauss struct {
- shell string
- pgData string
+ shell string
+ pgData string
+ pgDataTemp string
}
IOpenGauss interface {
@@ -49,13 +50,20 @@ type (
Restore(backupPath, instance, backupID string) error
ShowBackupList(backupPath, instanceName string) ([]model.Backup, error)
Auth(user, password, dbName string, dbPort uint16) error
+ MvTempToPgData() error
+ MvPgDataToTemp() error
+ CleanPgDataTemp() error
}
)
func NewOpenGauss(shell, pgData string) IOpenGauss {
+ dirs := strings.Split(pgData, "/")
+ dirs = append(dirs[0:len(dirs)-1], "temp")
+
return &openGauss{
- shell: shell,
- pgData: pgData,
+ shell: shell,
+ pgData: pgData,
+ pgDataTemp: strings.Join(dirs, "/"),
}
}
@@ -76,6 +84,8 @@ const (
_statusGaussFmt = "gs_ctl status --pgdata=%s"
_showListFmt = "gs_probackup show --instance=%s --backup-path=%s --format=json 2>&1"
+
+ _mvFmt = "mv %s %s"
)
func (og *openGauss) AsyncBackup(backupPath, instanceName, backupMode string, threadsNum uint8) (string, error) {
@@ -190,7 +200,7 @@ func (og *openGauss) Start() error {
cmd := fmt.Sprintf(_startOpenGaussFmt, og.pgData)
_, err := cmds.Exec(og.shell, cmd)
if errors.Is(err, cons.CmdOperateFailed) {
- return fmt.Errorf("starat openGauss failure,err=%s,wrap=%w", err, cons.StartOpenGaussFailed)
+ return fmt.Errorf("start openGauss failure,err=%s,wrap=%w", err, cons.StartOpenGaussFailed)
}
if err != nil {
return fmt.Errorf("cmds.Exec[shell=%s,cmd=%s] return err=%w", og.shell, cmd, err)
@@ -240,20 +250,10 @@ func (og *openGauss) Status() (string, error) {
// Restore TODO:Dependent environments require integration testing
func (og *openGauss) Restore(backupPath, instance, backupID string) error {
- if len(og.pgData) < 2 && strings.HasPrefix(og.pgData, "/") {
- return fmt.Errorf("invalid pg data dir[path=%s],err=%w", og.pgData, cons.InvalidPgDataDir)
- }
-
- if _, err := cmds.Exec(og.shell, fmt.Sprintf(_rmDirFmt, og.pgData)); err != nil {
- return fmt.Errorf("rm PGDATA dir failure,err=%s,wrap=%w", err, cons.RestoreFailed)
- }
-
cmd := fmt.Sprintf(_restoreFmt, backupPath, instance, backupID, og.pgData)
outputs, err := cmds.AsyncExec(og.shell, cmd)
for output := range outputs {
- // TODO just for dev,rm in next commit
- fmt.Println(output.Message)
if errors.Is(err, cons.CmdOperateFailed) {
return fmt.Errorf("outputs get err=%s,wrap=%w", output.Error, cons.RestoreFailed)
}
@@ -301,7 +301,6 @@ func (og *openGauss) ignore(outputs chan *cmds.Output) {
}
func (og *openGauss) getBackupID(msg string) (string, error) {
- fmt.Println(msg)
re := regexp2.MustCompile("(?<=backup ID:\\s+)\\w+(?=,)", 0)
match, err := re.FindStringMatch(msg)
if err != nil {
@@ -331,3 +330,40 @@ func (og *openGauss) Auth(user, password, dbName string, dbPort uint16) error {
}
return nil
}
+
+func (og *openGauss) MvPgDataToTemp() error {
+ cmd := fmt.Sprintf(_mvFmt, og.pgData, og.pgDataTemp)
+ _, err := cmds.Exec(og.shell, cmd)
+ if errors.Is(err, cons.CmdOperateFailed) {
+ return fmt.Errorf("mv pgdata to temp dir failure,err=%s,wrap=%w", err, cons.MvPgDataToTempFailed)
+ }
+ if err != nil {
+ return fmt.Errorf("cmds.Exec[shell=%s,cmd=%s] return err=%w", og.shell, cmd, err)
+ }
+
+ return nil
+}
+
+func (og *openGauss) MvTempToPgData() error {
+ cmd := fmt.Sprintf(_mvFmt, og.pgDataTemp, og.pgData)
+ _, err := cmds.Exec(og.shell, cmd)
+ if errors.Is(err, cons.CmdOperateFailed) {
+ return fmt.Errorf("mv temp to pgdata dir failure,err=%s,wrap=%w", err, cons.MvTempToPgDataFailed)
+ }
+ if err != nil {
+ return fmt.Errorf("cmds.Exec[shell=%s,cmd=%s] return err=%w", og.shell, cmd, err)
+ }
+ return nil
+}
+
+func (og *openGauss) CleanPgDataTemp() error {
+ cmd := fmt.Sprintf(_rmDirFmt, og.pgDataTemp)
+ _, err := cmds.Exec(og.shell, cmd)
+ if errors.Is(err, cons.CmdOperateFailed) {
+ return fmt.Errorf("clean pgdata temp dir failure,err=%s,wrap=%w", err, cons.CleanPgDataTempFailed)
+ }
+ if err != nil {
+ return fmt.Errorf("cmds.Exec[shell=%s,cmd=%s] return err=%w", og.shell, cmd, err)
+ }
+ return nil
+}
diff --git a/pitr/agent/main.go b/pitr/agent/main.go
index ff143df..011b6ef 100644
--- a/pitr/agent/main.go
+++ b/pitr/agent/main.go
@@ -76,6 +76,13 @@ func main() {
panic(fmt.Errorf("PGDATA:no database directory specified and environment variable PGDATA unset"))
}
}
+
+ pgData := strings.Trim(pgData, " ")
+ if strings.HasSuffix(pgData, "/") {
+ dirs := strings.Split(pgData, "/")
+ dirs = dirs[0 : len(dirs)-1]
+ pgData = strings.Join(dirs, "/")
+ }
pkg.Init(shell, pgData)
if strings.Trim(tlsCrt, " ") == "" || strings.Trim(tlsKey, " ") == "" {