You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by mi...@apache.org on 2023/03/02 10:58:36 UTC

[shardingsphere-on-cloud] branch main updated: feat(pitr):agent restore api and backup pgdata (#239)

This is an automated email from the ASF dual-hosted git repository.

miaoliyao pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/shardingsphere-on-cloud.git


The following commit(s) were added to refs/heads/main by this push:
     new 1df9b5b  feat(pitr):agent restore api and backup pgdata (#239)
1df9b5b is described below

commit 1df9b5ba39221cfd921b69dbe4f87538c64d1d3a
Author: lltgo <ll...@foxmail.com>
AuthorDate: Thu Mar 2 18:58:30 2023 +0800

    feat(pitr):agent restore api and backup pgdata (#239)
    
    * feat(pitr):agent restore api and backup pgdata
    
    * chore:comments
---
 pitr/agent/internal/cons/error.go      |  3 ++
 pitr/agent/internal/handler/restore.go | 62 ++++++++++++++++++++++++-------
 pitr/agent/internal/pkg/opengauss.go   | 68 ++++++++++++++++++++++++++--------
 pitr/agent/main.go                     |  7 ++++
 4 files changed, 110 insertions(+), 30 deletions(-)

diff --git a/pitr/agent/internal/cons/error.go b/pitr/agent/internal/cons/error.go
index 4596a0c..d9b2b2a 100644
--- a/pitr/agent/internal/cons/error.go
+++ b/pitr/agent/internal/cons/error.go
@@ -48,4 +48,7 @@ var (
 	UnmatchBackupID        = xerror.New(10023, "Unmatch any backup id.")
 	InvalidPgDataDir       = xerror.New(10024, "Invalid PGDATA dir.")
 	UnknownOgStatus        = xerror.New(10025, "Unknown openGauss status.")
+	MvPgDataToTempFailed   = xerror.New(10026, "Move pgdata dir to temp failed.")
+	MvTempToPgDataFailed   = xerror.New(10027, "Move temp dir to pgdata failed.")
+	CleanPgDataTempFailed  = xerror.New(10028, "Clean pgdata temp dir failed.")
 )
diff --git a/pitr/agent/internal/handler/restore.go b/pitr/agent/internal/handler/restore.go
index be4c2f6..df7ce62 100644
--- a/pitr/agent/internal/handler/restore.go
+++ b/pitr/agent/internal/handler/restore.go
@@ -20,6 +20,7 @@ package handler
 import (
 	"fmt"
 	"github.com/apache/shardingsphere-on-cloud/pitr/agent/internal/pkg"
+	"github.com/apache/shardingsphere-on-cloud/pitr/agent/pkg/responder"
 
 	"github.com/gofiber/fiber/v2"
 
@@ -27,34 +28,67 @@ import (
 	"github.com/apache/shardingsphere-on-cloud/pitr/agent/internal/handler/view"
 )
 
-func Restore(ctx *fiber.Ctx) error {
+func Restore(ctx *fiber.Ctx) (err error) {
 	in := &view.RestoreIn{}
 
-	if err := ctx.BodyParser(in); err != nil {
-		return fmt.Errorf("body parse err=%s,wrap=%w", err, cons.BodyParseFailed)
+	if err = ctx.BodyParser(in); err != nil {
+		err = fmt.Errorf("body parse err=%s,wrap=%w", err, cons.BodyParseFailed)
+		return
 	}
 
-	if err := in.Validate(); err != nil {
-		return fmt.Errorf("invalid parameter,err=%w", err)
+	if err = in.Validate(); err != nil {
+		err = fmt.Errorf("invalid parameter,err=%w", err)
+		return
 	}
 
-	if err := pkg.OG.Auth(in.Username, in.Password, in.DbName, in.DbPort); err != nil {
+	if err = pkg.OG.Auth(in.Username, in.Password, in.DbName, in.DbPort); err != nil {
 		efmt := "pkg.OG.Auth failure[un=%s,pw.len=%d,db=%s],err=%w"
-		return fmt.Errorf(efmt, in.Username, len(in.Password), in.DbName, err)
+		err = fmt.Errorf(efmt, in.Username, len(in.Password), in.DbName, err)
+		return
 	}
 
-	if err := pkg.OG.Stop(); err != nil {
-		return fmt.Errorf("stop openGauss failure,err=%w", err)
+	if err = pkg.OG.Stop(); err != nil {
+		err = fmt.Errorf("stop openGauss failure,err=%w", err)
+		return
 	}
+	defer func() {
+		if err != nil {
+			err2 := pkg.OG.Start()
+			if err2 != nil {
+				err = fmt.Errorf("pkg.OG.Start() return err=%s,wrap=%w", err2, err)
+				return
+			}
+		}
+	}()
 
-	if err := pkg.OG.Restore(in.DnBackupPath, in.Instance, in.DnBackupId); err != nil {
+	if err = pkg.OG.MvPgDataToTemp(); err != nil {
+		err = fmt.Errorf("pkg.OG.MvPgDataToTemp return err=%w", err)
+		return
+	}
+
+	if err = pkg.OG.Restore(in.DnBackupPath, in.Instance, in.DnBackupId); err != nil {
 		efmt := "pkg.OG.Restore failure[path=%s,instance=%s,backupID=%s],err=%w"
-		return fmt.Errorf(efmt, in.DnBackupPath, in.Instance, in.DnBackupId, err)
+		err = fmt.Errorf(efmt, in.DnBackupPath, in.Instance, in.DnBackupId, err)
+
+		err2 := pkg.OG.MvTempToPgData()
+		err = fmt.Errorf("resotre failre[err=%s],pkg.OG.MvTempToPgData return err=%w", err, err2)
+
+		return
 	}
 
-	if err := pkg.OG.Start(); err != nil {
-		return fmt.Errorf("stop openGauss failure,err=%w", err)
+	if err = pkg.OG.CleanPgDataTemp(); err != nil {
+		err = fmt.Errorf("pkg.OG.CleanPgDataTemp return err=%w", err)
+		return
 	}
 
-	return ctx.JSON(in)
+	if err = pkg.OG.Start(); err != nil {
+		err = fmt.Errorf("pkg.OG.Start return err=%w", err)
+		return
+	}
+
+	if err = responder.Success(ctx, nil); err != nil {
+		err = fmt.Errorf("responder failure,err=%s,wrap=%w", err, cons.Internal)
+		return nil
+	}
+	return
 }
diff --git a/pitr/agent/internal/pkg/opengauss.go b/pitr/agent/internal/pkg/opengauss.go
index 3d9ee28..1c20800 100644
--- a/pitr/agent/internal/pkg/opengauss.go
+++ b/pitr/agent/internal/pkg/opengauss.go
@@ -33,8 +33,9 @@ import (
 
 type (
 	openGauss struct {
-		shell  string
-		pgData string
+		shell      string
+		pgData     string
+		pgDataTemp string
 	}
 
 	IOpenGauss interface {
@@ -49,13 +50,20 @@ type (
 		Restore(backupPath, instance, backupID string) error
 		ShowBackupList(backupPath, instanceName string) ([]model.Backup, error)
 		Auth(user, password, dbName string, dbPort uint16) error
+		MvTempToPgData() error
+		MvPgDataToTemp() error
+		CleanPgDataTemp() error
 	}
 )
 
 func NewOpenGauss(shell, pgData string) IOpenGauss {
+	dirs := strings.Split(pgData, "/")
+	dirs = append(dirs[0:len(dirs)-1], "temp")
+
 	return &openGauss{
-		shell:  shell,
-		pgData: pgData,
+		shell:      shell,
+		pgData:     pgData,
+		pgDataTemp: strings.Join(dirs, "/"),
 	}
 }
 
@@ -76,6 +84,8 @@ const (
 	_statusGaussFmt    = "gs_ctl status --pgdata=%s"
 
 	_showListFmt = "gs_probackup show --instance=%s --backup-path=%s --format=json 2>&1"
+
+	_mvFmt = "mv %s %s"
 )
 
 func (og *openGauss) AsyncBackup(backupPath, instanceName, backupMode string, threadsNum uint8) (string, error) {
@@ -190,7 +200,7 @@ func (og *openGauss) Start() error {
 	cmd := fmt.Sprintf(_startOpenGaussFmt, og.pgData)
 	_, err := cmds.Exec(og.shell, cmd)
 	if errors.Is(err, cons.CmdOperateFailed) {
-		return fmt.Errorf("starat openGauss failure,err=%s,wrap=%w", err, cons.StartOpenGaussFailed)
+		return fmt.Errorf("start openGauss failure,err=%s,wrap=%w", err, cons.StartOpenGaussFailed)
 	}
 	if err != nil {
 		return fmt.Errorf("cmds.Exec[shell=%s,cmd=%s] return err=%w", og.shell, cmd, err)
@@ -240,20 +250,10 @@ func (og *openGauss) Status() (string, error) {
 
 // Restore TODO:Dependent environments require integration testing
 func (og *openGauss) Restore(backupPath, instance, backupID string) error {
-	if len(og.pgData) < 2 && strings.HasPrefix(og.pgData, "/") {
-		return fmt.Errorf("invalid pg data dir[path=%s],err=%w", og.pgData, cons.InvalidPgDataDir)
-	}
-
-	if _, err := cmds.Exec(og.shell, fmt.Sprintf(_rmDirFmt, og.pgData)); err != nil {
-		return fmt.Errorf("rm PGDATA dir failure,err=%s,wrap=%w", err, cons.RestoreFailed)
-	}
-
 	cmd := fmt.Sprintf(_restoreFmt, backupPath, instance, backupID, og.pgData)
 	outputs, err := cmds.AsyncExec(og.shell, cmd)
 
 	for output := range outputs {
-		// TODO just for dev,rm in next commit
-		fmt.Println(output.Message)
 		if errors.Is(err, cons.CmdOperateFailed) {
 			return fmt.Errorf("outputs get err=%s,wrap=%w", output.Error, cons.RestoreFailed)
 		}
@@ -301,7 +301,6 @@ func (og *openGauss) ignore(outputs chan *cmds.Output) {
 }
 
 func (og *openGauss) getBackupID(msg string) (string, error) {
-	fmt.Println(msg)
 	re := regexp2.MustCompile("(?<=backup ID:\\s+)\\w+(?=,)", 0)
 	match, err := re.FindStringMatch(msg)
 	if err != nil {
@@ -331,3 +330,40 @@ func (og *openGauss) Auth(user, password, dbName string, dbPort uint16) error {
 	}
 	return nil
 }
+
+func (og *openGauss) MvPgDataToTemp() error {
+	cmd := fmt.Sprintf(_mvFmt, og.pgData, og.pgDataTemp)
+	_, err := cmds.Exec(og.shell, cmd)
+	if errors.Is(err, cons.CmdOperateFailed) {
+		return fmt.Errorf("mv pgdata to temp dir failure,err=%s,wrap=%w", err, cons.MvPgDataToTempFailed)
+	}
+	if err != nil {
+		return fmt.Errorf("cmds.Exec[shell=%s,cmd=%s] return err=%w", og.shell, cmd, err)
+	}
+
+	return nil
+}
+
+func (og *openGauss) MvTempToPgData() error {
+	cmd := fmt.Sprintf(_mvFmt, og.pgDataTemp, og.pgData)
+	_, err := cmds.Exec(og.shell, cmd)
+	if errors.Is(err, cons.CmdOperateFailed) {
+		return fmt.Errorf("mv temp to pgdata dir failure,err=%s,wrap=%w", err, cons.MvTempToPgDataFailed)
+	}
+	if err != nil {
+		return fmt.Errorf("cmds.Exec[shell=%s,cmd=%s] return err=%w", og.shell, cmd, err)
+	}
+	return nil
+}
+
+func (og *openGauss) CleanPgDataTemp() error {
+	cmd := fmt.Sprintf(_rmDirFmt, og.pgDataTemp)
+	_, err := cmds.Exec(og.shell, cmd)
+	if errors.Is(err, cons.CmdOperateFailed) {
+		return fmt.Errorf("clean pgdata temp dir failure,err=%s,wrap=%w", err, cons.CleanPgDataTempFailed)
+	}
+	if err != nil {
+		return fmt.Errorf("cmds.Exec[shell=%s,cmd=%s] return err=%w", og.shell, cmd, err)
+	}
+	return nil
+}
diff --git a/pitr/agent/main.go b/pitr/agent/main.go
index ff143df..011b6ef 100644
--- a/pitr/agent/main.go
+++ b/pitr/agent/main.go
@@ -76,6 +76,13 @@ func main() {
 			panic(fmt.Errorf("PGDATA:no database directory specified and environment variable PGDATA unset"))
 		}
 	}
+
+	pgData := strings.Trim(pgData, " ")
+	if strings.HasSuffix(pgData, "/") {
+		dirs := strings.Split(pgData, "/")
+		dirs = dirs[0 : len(dirs)-1]
+		pgData = strings.Join(dirs, "/")
+	}
 	pkg.Init(shell, pgData)
 
 	if strings.Trim(tlsCrt, " ") == "" || strings.Trim(tlsKey, " ") == "" {