You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by mi...@apache.org on 2023/03/01 07:11:59 UTC

[shardingsphere-on-cloud] branch main updated: feat(pitr):agent backup api (#232)

This is an automated email from the ASF dual-hosted git repository.

miaoliyao pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/shardingsphere-on-cloud.git


The following commit(s) were added to refs/heads/main by this push:
     new 87e7363  feat(pitr):agent backup api (#232)
87e7363 is described below

commit 87e7363daa0802e4274174307cf6d13f2d3b3353
Author: lltgo <ll...@foxmail.com>
AuthorDate: Wed Mar 1 15:11:53 2023 +0800

    feat(pitr):agent backup api (#232)
---
 pitr/agent/internal/cons/error.go          |  1 +
 pitr/agent/internal/handler/backup.go      | 19 +++++++++--
 pitr/agent/internal/handler/view/backup.go |  9 +++---
 pitr/agent/internal/pkg/opengauss.go       | 51 ++++++++++++++++++------------
 pitr/agent/internal/pkg/opengauss_test.go  | 25 ++++++++-------
 pitr/agent/internal/pkg/pkg.go             |  4 +--
 pitr/agent/main.go                         |  2 +-
 7 files changed, 70 insertions(+), 41 deletions(-)

diff --git a/pitr/agent/internal/cons/error.go b/pitr/agent/internal/cons/error.go
index f5ba399..a86c0be 100644
--- a/pitr/agent/internal/cons/error.go
+++ b/pitr/agent/internal/cons/error.go
@@ -45,4 +45,5 @@ var (
 	BodyParseFailed        = xerror.New(10020, "Invalid http request body.")
 	MissingDbName          = xerror.New(10021, "Missing db name.")
 	DbConnectionFailed     = xerror.New(10022, "Database connection failed.")
+	UnmatchBackupID        = xerror.New(10023, "Unmatch any backup id.")
 )
diff --git a/pitr/agent/internal/handler/backup.go b/pitr/agent/internal/handler/backup.go
index 10d3006..2d9cd2b 100644
--- a/pitr/agent/internal/handler/backup.go
+++ b/pitr/agent/internal/handler/backup.go
@@ -19,6 +19,8 @@ package handler
 
 import (
 	"fmt"
+	"github.com/apache/shardingsphere-on-cloud/pitr/agent/internal/pkg"
+	"github.com/apache/shardingsphere-on-cloud/pitr/agent/pkg/responder"
 
 	"github.com/apache/shardingsphere-on-cloud/pitr/agent/internal/handler/view"
 
@@ -35,8 +37,21 @@ func Backup(ctx *fiber.Ctx) error {
 	}
 
 	if err := in.Validate(); err != nil {
-		return err
+		return fmt.Errorf("invalid parameter,err=%w", err)
 	}
 
-	return ctx.JSON(in)
+	if err := pkg.OG.Auth(in.Username, in.Password, in.DbName, in.DbPort); err != nil {
+		efmt := "pkg.OG.Auth failure[un=%s,pw.len=%d,db=%s],err=%w"
+		return fmt.Errorf(efmt, in.Username, len(in.Password), in.DbName, err)
+	}
+
+	backupID, err := pkg.OG.AsyncBackup(in.DnBackupPath, in.Instance, in.DnBackupMode, 1)
+	if err != nil {
+		efmt := "pkg.OG.AsyncBackup[path=%s,instance=%s,mode=%s] failure,err=%w"
+		return fmt.Errorf(efmt, in.DnBackupPath, in.Instance, in.DnBackupMode, err)
+	}
+
+	return responder.Success(ctx, view.BackupOut{
+		ID: backupID,
+	})
 }
diff --git a/pitr/agent/internal/handler/view/backup.go b/pitr/agent/internal/handler/view/backup.go
index 9ab7e21..19a29aa 100644
--- a/pitr/agent/internal/handler/view/backup.go
+++ b/pitr/agent/internal/handler/view/backup.go
@@ -21,10 +21,11 @@ import "github.com/apache/shardingsphere-on-cloud/pitr/agent/internal/cons"
 
 type (
 	BackupIn struct {
-		DbPort       uint16 `json:"db_port"`
-		DbName       string `json:"db_name"`
-		Username     string `json:"username"`
-		Password     string `json:"password"`
+		DbPort   uint16 `json:"db_port"`
+		DbName   string `json:"db_name"`
+		Username string `json:"username"`
+		Password string `json:"password"`
+
 		DnBackupPath string `json:"dn_backup_path"`
 		DnThreadsNum uint8  `json:"dn_threads_num"`
 		DnBackupMode string `json:"dn_backup_mode"`
diff --git a/pitr/agent/internal/pkg/opengauss.go b/pitr/agent/internal/pkg/opengauss.go
index c992e88..da44642 100644
--- a/pitr/agent/internal/pkg/opengauss.go
+++ b/pitr/agent/internal/pkg/opengauss.go
@@ -33,29 +33,33 @@ import (
 
 type (
 	openGauss struct {
-		shell string
+		shell  string
+		pgData string
 	}
 
 	IOpenGauss interface {
-		AsyncBackup(backupPath, instanceName, backupMode, pgData string) (string, error)
+		AsyncBackup(backupPath, instanceName, backupMode string, threadsNum uint8) (string, error)
 		ShowBackup(backupPath, instanceName, backupID string) (*model.Backup, error)
 		Init(backupPath string) error
-		AddInstance(backupPath, instancee, pgData string) error
-		DelInstance(backupPath, instancee string) error
-		Start(pgData string) error
-		Stop(pgData string) error
-		Restore(backupPath, instance, backupID, pgData string) error
+		AddInstance(backupPath, instance string) error
+		DelInstance(backupPath, instance string) error
+		Start() error
+		Stop() error
+		Restore(backupPath, instance, backupID string) error
 		ShowBackupList(backupPath, instanceName string) ([]model.Backup, error)
 		Auth(user, password, dbName string, dbPort uint16) error
 	}
 )
 
-func NewOpenGauss(shell string) IOpenGauss {
-	return &openGauss{shell: shell}
+func NewOpenGauss(shell, pgData string) IOpenGauss {
+	return &openGauss{
+		shell:  shell,
+		pgData: pgData,
+	}
 }
 
 const (
-	_backupFmt    = "gs_probackup backup --backup-path=%s --instance=%s --backup-mode=%s --pgdata=%s 2>&1"
+	_backupFmt    = "gs_probackup backup --backup-path=%s --instance=%s --backup-mode=%s --pgdata=%s --threads=%d 2>&1"
 	_showFmt      = "gs_probackup show --instance=%s --backup-path=%s --backup-id=%s --format=json 2>&1"
 	_delBackupFmt = "gs_probackup delete --backup-path=%s --instance=%s --backup-id=%s 2>&1"
 	_restoreFmt   = "gs_probackup restore --backup-path=%s --instance=%s --backup-id=%s --pgdata=%s 2>&1"
@@ -72,8 +76,8 @@ const (
 	_showListFmt = "gs_probackup show --instance=%s --backup-path=%s --format=json 2>&1"
 )
 
-func (og *openGauss) AsyncBackup(backupPath, instanceName, backupMode, pgData string) (string, error) {
-	cmd := fmt.Sprintf(_backupFmt, backupPath, instanceName, backupMode, pgData)
+func (og *openGauss) AsyncBackup(backupPath, instanceName, backupMode string, threadsNum uint8) (string, error) {
+	cmd := fmt.Sprintf(_backupFmt, backupPath, instanceName, backupMode, og.pgData, threadsNum)
 	outputs, err := cmds.AsyncExec(og.shell, cmd)
 	if err != nil {
 		return "", fmt.Errorf("cmds.AsyncExec[shell=%s,cmd=%s] return err=%w", og.shell, cmd, err)
@@ -155,8 +159,8 @@ func (og *openGauss) deinit(backupPath string) error {
 	return nil
 }
 
-func (og *openGauss) AddInstance(backupPath, instancee, pgData string) error {
-	cmd := fmt.Sprintf(_addInstanceFmt, backupPath, instancee, pgData)
+func (og *openGauss) AddInstance(backupPath, instance string) error {
+	cmd := fmt.Sprintf(_addInstanceFmt, backupPath, instance, og.pgData)
 	_, err := cmds.Exec(og.shell, cmd)
 	// already exist and it's not empty
 	if errors.Is(err, cons.CmdOperateFailed) {
@@ -181,8 +185,8 @@ func (og *openGauss) DelInstance(backupPath, instancee string) error {
 	return nil
 }
 
-func (og *openGauss) Start(pgData string) error {
-	cmd := fmt.Sprintf(_startOpenGaussFmt, pgData)
+func (og *openGauss) Start() error {
+	cmd := fmt.Sprintf(_startOpenGaussFmt, og.pgData)
 	_, err := cmds.Exec(og.shell, cmd)
 	// already exist and it's not empty
 	if errors.Is(err, cons.CmdOperateFailed) {
@@ -194,8 +198,8 @@ func (og *openGauss) Start(pgData string) error {
 	return nil
 }
 
-func (og *openGauss) Stop(pgData string) error {
-	cmd := fmt.Sprintf(_stopOpenGaussFmt, pgData)
+func (og *openGauss) Stop() error {
+	cmd := fmt.Sprintf(_stopOpenGaussFmt, og.pgData)
 	_, err := cmds.Exec(og.shell, cmd)
 	// already exist and it's not empty
 	if errors.Is(err, cons.CmdOperateFailed) {
@@ -208,8 +212,8 @@ func (og *openGauss) Stop(pgData string) error {
 }
 
 // Restore TODO:Dependent environments require integration testing
-func (og *openGauss) Restore(backupPath, instance, backupID, pgData string) error {
-	cmd := fmt.Sprintf(_restoreFmt, backupPath, instance, backupID, pgData)
+func (og *openGauss) Restore(backupPath, instance, backupID string) error {
+	cmd := fmt.Sprintf(_restoreFmt, backupPath, instance, backupID, og.pgData)
 	outputs, err := cmds.AsyncExec(og.shell, cmd)
 
 	for output := range outputs {
@@ -260,8 +264,15 @@ func (og *openGauss) ignore(outputs chan *cmds.Output) {
 }
 
 func (og *openGauss) getBackupID(msg string) (string, error) {
+	fmt.Println(msg)
 	re := regexp2.MustCompile("(?<=backup ID:\\s+)\\w+(?=,)", 0)
 	match, err := re.FindStringMatch(msg)
+	if err != nil {
+		return "", fmt.Errorf("unmatch any backup id[msg=%s],err=%s", msg, err)
+	}
+	if match.Length == 0 {
+		return "", fmt.Errorf("unmatch any backup id,match.lenght is 0,err=%w", cons.UnmatchBackupID)
+	}
 	return match.String(), err
 }
 
diff --git a/pitr/agent/internal/pkg/opengauss_test.go b/pitr/agent/internal/pkg/opengauss_test.go
index 7bdb724..9656f20 100644
--- a/pitr/agent/internal/pkg/opengauss_test.go
+++ b/pitr/agent/internal/pkg/opengauss_test.go
@@ -34,7 +34,8 @@ var _ = Describe("OpenGauss,requires opengauss environment", func() {
 	Context("AsyncBackup & ShowBackupDetail ", func() {
 		It("backup, show and delete", func() {
 			og := &openGauss{
-				shell: "/bin/sh",
+				shell:  "/bin/sh",
+				pgData: "/data/opengauss/3.1.1/data/single_node/",
 			}
 
 			var (
@@ -46,7 +47,7 @@ var _ = Describe("OpenGauss,requires opengauss environment", func() {
 				data,
 				instance,
 				"full",
-				"/data/opengauss/3.1.1/data/single_node/",
+				1,
 			)
 
 			Expect(err).To(BeNil())
@@ -55,7 +56,7 @@ var _ = Describe("OpenGauss,requires opengauss environment", func() {
 
 			// timeout 60s
 			for i := 0; i < 60; i++ {
-				backup, err := og.ShowBackupDetail(
+				backup, err := og.ShowBackup(
 					data,
 					instance,
 					backupID,
@@ -121,18 +122,18 @@ var _ = Describe("OpenGauss,requires opengauss environment", func() {
 	Context("AddInstance and DelInstance", func() {
 		It("instance:add and delete", func() {
 			og := &openGauss{
-				shell: "/bin/sh",
+				shell:  "/bin/sh",
+				pgData: "/data/opengauss/3.1.1/data/single_node/",
 			}
 
 			var (
 				backupPath = "/home/omm/data"
 				instance   = "ins-test-1"
-				pgData     = "/data/opengauss/3.1.1/data/single_node/"
 			)
-			err := og.AddInstance(backupPath, instance, pgData)
+			err := og.AddInstance(backupPath, instance)
 			Expect(err).To(BeNil())
 
-			err = og.AddInstance(backupPath, instance, pgData)
+			err = og.AddInstance(backupPath, instance)
 			Expect(errors.Is(err, cons.InstanceAlreadyExist)).To(BeTrue())
 
 			err = og.DelInstance(backupPath, instance)
@@ -146,17 +147,17 @@ var _ = Describe("OpenGauss,requires opengauss environment", func() {
 	Context("Start and Stop", func() {
 		It("start and stop:may fail if no instance exists", func() {
 			og := &openGauss{
-				shell: "/bin/sh",
+				shell:  "/bin/sh",
+				pgData: "/data/opengauss/3.1.1/data/single_node/",
 			}
 
-			var pgData = "/data/opengauss/3.1.1/data/single_node/"
-			err := og.Stop(pgData)
+			err := og.Stop()
 			Expect(err).To(BeNil())
 
-			err = og.Stop(pgData)
+			err = og.Stop()
 			Expect(errors.Is(err, cons.StopOpenGaussFailed)).To(BeTrue())
 
-			err = og.Start(pgData)
+			err = og.Start()
 			Expect(err).To(BeNil())
 		})
 	})
diff --git a/pitr/agent/internal/pkg/pkg.go b/pitr/agent/internal/pkg/pkg.go
index 59e81e2..5e8f356 100644
--- a/pitr/agent/internal/pkg/pkg.go
+++ b/pitr/agent/internal/pkg/pkg.go
@@ -21,6 +21,6 @@ var (
 	OG IOpenGauss
 )
 
-func Init(shell string) {
-	OG = NewOpenGauss(shell)
+func Init(shell, pgData string) {
+	OG = NewOpenGauss(shell, pgData)
 }
diff --git a/pitr/agent/main.go b/pitr/agent/main.go
index 81dd37a..ff143df 100644
--- a/pitr/agent/main.go
+++ b/pitr/agent/main.go
@@ -69,7 +69,6 @@ func main() {
 	if shell == "" {
 		panic(fmt.Errorf("shell does not exist"))
 	}
-	pkg.Init(shell)
 
 	if pgData == "" {
 		pgData = os.Getenv("PGDATA")
@@ -77,6 +76,7 @@ func main() {
 			panic(fmt.Errorf("PGDATA:no database directory specified and environment variable PGDATA unset"))
 		}
 	}
+	pkg.Init(shell, pgData)
 
 	if strings.Trim(tlsCrt, " ") == "" || strings.Trim(tlsKey, " ") == "" {
 		panic(fmt.Errorf("lack of HTTPs certificate"))