Posted to notifications@shardingsphere.apache.org by mi...@apache.org on 2023/04/25 02:19:20 UTC

[shardingsphere-on-cloud] branch main updated: chore(pitr): pitr backup support display backup progress

This is an automated email from the ASF dual-hosted git repository.

miaoliyao pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/shardingsphere-on-cloud.git


The following commit(s) were added to refs/heads/main by this push:
     new 157509f  chore(pitr): pitr backup support display backup progress
     new 72d7b5d  Merge pull request #321 from Xu-Wentao/pitr
157509f is described below

commit 157509f6e878d184f21d6884b725db966715b28e
Author: xuwentao <cu...@yahoo.com>
AuthorDate: Sun Apr 23 20:17:04 2023 +0800

    chore(pitr): pitr backup support display backup progress
---
 pitr/agent/Makefile                      |   2 +-
 pitr/cli/Makefile                        |   2 +-
 pitr/cli/internal/cmd/backup.go          | 147 ++++++++++++++++++++-----------
 pitr/cli/internal/cmd/backup_test.go     | 129 ++++++++++++++++++++-------
 pitr/cli/internal/pkg/model/as_backup.go |   6 ++
 pitr/cli/pkg/prettyoutput/progress.go    |  35 ++++++++
 6 files changed, 233 insertions(+), 88 deletions(-)

diff --git a/pitr/agent/Makefile b/pitr/agent/Makefile
index 7fd3b96..d1c0a32 100644
--- a/pitr/agent/Makefile
+++ b/pitr/agent/Makefile
@@ -8,7 +8,7 @@ openssl-local:
 	openssl req -new -SHA256 -newkey rsa:2048 -nodes -keyout tls.key -out tls.csr -subj "/C=CN/ST=beijing/L=beijing/O=/OU=/" && \
 	openssl x509 -req -sha256 -days 365 -in tls.csr -signkey tls.key -out tls.crt
 test:
-	go test ./... -coverprofile cover.out
+	go test -gcflags=-l -v ./... -coverprofile cover.out
 
 build:
 	GOOS=$(GOOS) go build -o pitr-agent
diff --git a/pitr/cli/Makefile b/pitr/cli/Makefile
index d6b27df..b933165 100644
--- a/pitr/cli/Makefile
+++ b/pitr/cli/Makefile
@@ -5,7 +5,7 @@ GOOS := $(shell go env GOOS)
 build:
 	GOOS=$(GOOS) go build -o gs_pitr main.go
 test:
-	go test -v ./... -cover -coverprofile cover.out
+	go test -gcflags=-l -v ./... -cover -coverprofile cover.out
 cover:
 	go tool cover -html cover.out
 lint:
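
Note: both Makefiles now pass -gcflags=-l to go test. That flag disables compiler inlining, which the monkey runtime patching used in the test suite (see backup_test.go below) depends on: monkey.Patch rewrites a function's entry point in memory, and an inlined call site would never reach the rewritten code. A minimal sketch of the pattern, assuming the bou.ke/monkey import path and a throwaway target function:

    // run with: go test -gcflags=-l ./...
    package example

    import (
    	"testing"

    	"bou.ke/monkey"
    )

    func answer() int { return 1 }

    func TestPatch(t *testing.T) {
    	// Swap the implementation of answer at runtime; this only
    	// takes effect when the call is not inlined away.
    	monkey.Patch(answer, func() int { return 42 })
    	defer monkey.UnpatchAll()

    	if got := answer(); got != 42 {
    		t.Fatalf("expected patched value 42, got %d", got)
    	}
    }
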
diff --git a/pitr/cli/internal/cmd/backup.go b/pitr/cli/internal/cmd/backup.go
index 25956fb..ced03a6 100644
--- a/pitr/cli/internal/cmd/backup.go
+++ b/pitr/cli/internal/cmd/backup.go
@@ -20,14 +20,15 @@ package cmd
 import (
 	"fmt"
 	"os"
-	"sync"
 	"time"
 
 	"github.com/apache/shardingsphere-on-cloud/pitr/cli/internal/pkg"
 	"github.com/apache/shardingsphere-on-cloud/pitr/cli/internal/pkg/model"
 	"github.com/apache/shardingsphere-on-cloud/pitr/cli/internal/pkg/xerr"
 	"github.com/apache/shardingsphere-on-cloud/pitr/cli/pkg/logging"
+	"github.com/apache/shardingsphere-on-cloud/pitr/cli/pkg/prettyoutput"
 	"github.com/google/uuid"
+	"github.com/jedib0t/go-pretty/v6/progress"
 	"github.com/jedib0t/go-pretty/v6/table"
 	"github.com/spf13/cobra"
 	"github.com/spf13/pflag"
@@ -232,14 +233,10 @@ func execBackup(lsBackup *model.LsBackup) error {
 	logging.Info("Starting send backup command to agent server...")
 
 	for _, node := range sNodes {
-		node := node
-		agentHost := node.IP
-		if agentHost == "127.0.0.1" {
-			agentHost = Host
-		}
-		as := pkg.NewAgentServer(fmt.Sprintf("%s:%d", agentHost, AgentPort))
+		sn := node
+		as := pkg.NewAgentServer(fmt.Sprintf("%s:%d", convertLocalhost(sn.IP), AgentPort))
 		g.Go(func() error {
-			return _execBackup(as, node, dnCh)
+			return _execBackup(as, sn, dnCh)
 		})
 	}
 
@@ -269,19 +266,16 @@ func checkAgentServerStatus(lsBackup *model.LsBackup) bool {
 	available := true
 
 	for _, node := range lsBackup.SsBackup.StorageNodes {
-		n := node
-		agentHost := n.IP
-		if agentHost == "127.0.0.1" {
-			agentHost = Host
-		}
-		as := pkg.NewAgentServer(fmt.Sprintf("%s:%d", agentHost, AgentPort))
+		sn := node
+		as := pkg.NewAgentServer(fmt.Sprintf("%s:%d", convertLocalhost(sn.IP), AgentPort))
 		if err := as.CheckStatus(); err != nil {
-			statusList = append(statusList, &model.AgentServerStatus{IP: n.IP, Status: "Unavailable"})
+			statusList = append(statusList, &model.AgentServerStatus{IP: sn.IP, Status: "Unavailable"})
 			available = false
 		} else {
-			statusList = append(statusList, &model.AgentServerStatus{IP: n.IP, Status: "Available"})
+			statusList = append(statusList, &model.AgentServerStatus{IP: sn.IP, Status: "Available"})
 		}
 	}
+
 	t := table.NewWriter()
 	t.SetOutputMirror(os.Stdout)
 	t.SetTitle("Agent Server Status")
@@ -328,64 +322,100 @@ func _execBackup(as pkg.IAgentServer, node *model.StorageNode, dnCh chan *model.
 
 func checkBackupStatus(lsBackup *model.LsBackup) model.BackupStatus {
 	var (
-		wg                sync.WaitGroup
 		dataNodeMap       = make(map[string]*model.DataNode)
+		dnCh              = make(chan *model.DataNode, len(lsBackup.DnList))
 		backupFinalStatus = model.SsBackupStatusCompleted
-		statusCh          = make(chan *model.DataNode, len(lsBackup.DnList))
+		totalNum          = len(lsBackup.SsBackup.StorageNodes)
+		dnResult          = make([]*model.DataNode, 0)
 	)
 
-	// DataNode.IP -> DataNode
 	for _, dn := range lsBackup.DnList {
 		dataNodeMap[dn.IP] = dn
 	}
 
-	for _, sn := range lsBackup.SsBackup.StorageNodes {
-		wg.Add(1)
-		go func(wg *sync.WaitGroup, sn *model.StorageNode) {
-			defer wg.Done()
-			agentHost := sn.IP
-			if agentHost == "127.0.0.1" {
-				agentHost = Host
-			}
-			as := pkg.NewAgentServer(fmt.Sprintf("%s:%d", agentHost, AgentPort))
-			dn := dataNodeMap[sn.IP]
-
-			// check backup status
-			status := checkStatus(as, sn, dn.BackupID, model.BackupStatus(""), defaultShowDetailRetryTimes)
+	pw := prettyoutput.NewPW(totalNum)
+	go pw.Render()
+	for idx := 0; idx < totalNum; idx++ {
+		sn := lsBackup.SsBackup.StorageNodes[idx]
+		as := pkg.NewAgentServer(fmt.Sprintf("%s:%d", convertLocalhost(sn.IP), AgentPort))
+		dn := dataNodeMap[sn.IP]
+		go checkStatus(as, sn, dn, dnCh, pw)
+	}
 
-			// update DataNode status
-			dn.Status = status
-			dn.EndTime = time.Now().Unix()
-			statusCh <- dn
-		}(&wg, sn)
+	// wait for all data node backups to finish
+	time.Sleep(time.Millisecond * 100)
+	for pw.IsRenderInProgress() {
+		if pw.LengthActive() == 0 {
+			pw.Stop()
+		}
+		time.Sleep(time.Millisecond * 100)
 	}
 
-	wg.Wait()
-	close(statusCh)
+	close(dnCh)
 
-	for dn := range statusCh {
-		logging.Info(fmt.Sprintf("data node backup final status: [IP:%s, backupID:%s] ==> %s", dn.IP, dn.BackupID, dn.Status))
+	for dn := range dnCh {
+		dnResult = append(dnResult, dn)
 		if dn.Status != model.SsBackupStatusCompleted {
 			backupFinalStatus = model.SsBackupStatusFailed
 		}
 	}
 
+	// print the backup results as a formatted table
+	t := table.NewWriter()
+	t.SetOutputMirror(os.Stdout)
+	t.SetTitle("Backup Task Result: %s", backupFinalStatus)
+	t.AppendHeader(table.Row{"#", "Data Node IP", "Data Node Port", "Result"})
+
+	for i, dn := range dnResult {
+		t.AppendRow([]interface{}{i + 1, dn.IP, dn.Port, dn.Status})
+		t.AppendSeparator()
+	}
+
+	t.Render()
+
+	lsBackup.DnList = dnResult
 	lsBackup.SsBackup.Status = backupFinalStatus
 	lsBackup.Info.EndTime = time.Now().Unix()
 	return backupFinalStatus
 }
 
-func checkStatus(as pkg.IAgentServer, sn *model.StorageNode, backupID string, status model.BackupStatus, retryTimes uint8) model.BackupStatus {
-	if retryTimes+1 == 0 {
-		return status
-	}
-	if status == model.SsBackupStatusCompleted || status == model.SsBackupStatusFailed {
-		return status
-	}
+func checkStatus(as pkg.IAgentServer, sn *model.StorageNode, dn *model.DataNode, dnCh chan *model.DataNode, pw progress.Writer) {
+	var (
+		// done signals that this node's status check has finished and the ticker loop should exit.
+		done = make(chan bool)
+		// ticker fires every two seconds to drive a doCheck request.
+		ticker = time.Tick(time.Second * 2)
+		// progress bar.
+		tracker = progress.Tracker{Message: fmt.Sprintf("Checking backup status  # %s:%d", sn.IP, AgentPort), Total: 0, Units: progress.UnitsDefault}
+	)
 
-	// todo: how often to check backup status
-	time.Sleep(time.Second * 2)
+	pw.AppendTracker(&tracker)
+
+	for !tracker.IsDone() {
+		select {
+		case <-done:
+			return
+		case <-ticker:
+			status, err := doCheck(as, sn, dn.BackupID, defaultShowDetailRetryTimes)
+			if err != nil {
+				tracker.MarkAsErrored()
+				dn.Status = status
+				dn.EndTime = time.Now().Unix()
+				dnCh <- dn
+				done <- true
+			}
+			if status == model.SsBackupStatusCompleted || status == model.SsBackupStatusFailed {
+				tracker.MarkAsDone()
+				dn.Status = status
+				dn.EndTime = time.Now().Unix()
+				dnCh <- dn
+				done <- true
+			}
+		}
+	}
+}
 
+func doCheck(as pkg.IAgentServer, sn *model.StorageNode, backupID string, retries int) (status model.BackupStatus, err error) {
 	in := &model.ShowDetailIn{
 		DBPort:       sn.Port,
 		DBName:       sn.Database,
@@ -397,8 +427,19 @@ func checkStatus(as pkg.IAgentServer, sn *model.StorageNode, backupID string, st
 	}
 	backupInfo, err := as.ShowDetail(in)
 	if err != nil {
-		logging.Error(fmt.Sprintf("get storage node [IP:%s] backup detail from agent server failed, will retry %d times.\n%s", sn.IP, retryTimes, err.Error()))
-		return checkStatus(as, sn, backupID, model.SsBackupStatusCheckError, retryTimes-1)
+		if retries == 0 {
+			return model.SsBackupStatusCheckError, err
+		}
+		time.Sleep(time.Second * 1)
+		return doCheck(as, sn, backupID, retries-1)
+	}
+
+	return backupInfo.Status, nil
+}
+
+func convertLocalhost(ip string) string {
+	if ip == "127.0.0.1" {
+		return Host
 	}
-	return checkStatus(as, sn, backupID, backupInfo.Status, retryTimes)
+	return ip
 }
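
Note: the rewritten checkStatus above replaces the old recursive retry with a ticker-driven loop: each storage node gets its own goroutine that polls doCheck every two seconds and reports progress through a go-pretty tracker until the status is terminal. A condensed, self-contained sketch of that shape — pollStatus and the status strings stand in for doCheck and the model constants, and the sketch simply returns once the tracker is resolved rather than signalling a done channel:

    package main

    import (
    	"time"

    	"github.com/jedib0t/go-pretty/v6/progress"
    )

    // watchNode polls until a terminal status, mirroring checkStatus.
    func watchNode(pw progress.Writer, results chan<- string) {
    	tracker := progress.Tracker{Message: "Checking backup status", Total: 0}
    	pw.AppendTracker(&tracker)

    	ticker := time.NewTicker(2 * time.Second)
    	defer ticker.Stop()

    	for range ticker.C {
    		status, err := pollStatus()
    		if err != nil {
    			tracker.MarkAsErrored()
    			results <- "CheckError"
    			return
    		}
    		if status == "Completed" || status == "Failed" {
    			tracker.MarkAsDone()
    			results <- status
    			return
    		}
    	}
    }

    var polls int

    // pollStatus simulates an agent that completes on the third poll.
    func pollStatus() (string, error) {
    	polls++
    	if polls >= 3 {
    		return "Completed", nil
    	}
    	return "Running", nil
    }

    func main() {
    	pw := progress.NewWriter()
    	pw.SetAutoStop(true)
    	pw.SetNumTrackersExpected(1)
    	go pw.Render()

    	results := make(chan string, 1)
    	go watchNode(pw, results)
    	<-results

    	// Let the render loop drain, as checkBackupStatus does.
    	for pw.IsRenderInProgress() {
    		time.Sleep(100 * time.Millisecond)
    	}
    }
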
diff --git a/pitr/cli/internal/cmd/backup_test.go b/pitr/cli/internal/cmd/backup_test.go
index 1f56485..ada9434 100644
--- a/pitr/cli/internal/cmd/backup_test.go
+++ b/pitr/cli/internal/cmd/backup_test.go
@@ -36,7 +36,7 @@ import (
 var ctrl *gomock.Controller
 
 var _ = Describe("Backup", func() {
-	Context("check status", func() {
+	Context("do check", func() {
 		var (
 			as *mock_pkg.MockIAgentServer
 			sn = &model.StorageNode{
@@ -53,23 +53,31 @@ var _ = Describe("Backup", func() {
 
 		It("agent server return err", func() {
 			as.EXPECT().ShowDetail(&model.ShowDetailIn{Instance: defaultInstance}).Return(nil, errors.New("timeout"))
-			Expect(checkStatus(as, sn, "", "", 0)).To(Equal(model.SsBackupStatusCheckError))
+			status, err := doCheck(as, sn, "", 0)
+			Expect(err).To(HaveOccurred())
+			Expect(status).To(Equal(model.SsBackupStatusCheckError))
 		})
 
 		It("mock agent server and return failed status", func() {
 			as.EXPECT().ShowDetail(&model.ShowDetailIn{Instance: defaultInstance}).Return(&model.BackupInfo{Status: model.SsBackupStatusFailed}, nil)
-			Expect(checkStatus(as, sn, "", "", 0)).To(Equal(model.SsBackupStatusFailed))
+			status, err := doCheck(as, sn, "", 0)
+			Expect(err).ToNot(HaveOccurred())
+			Expect(status).To(Equal(model.SsBackupStatusFailed))
 		})
 
 		It("mock agent server and return completed status", func() {
 			as.EXPECT().ShowDetail(&model.ShowDetailIn{Instance: defaultInstance}).Return(&model.BackupInfo{Status: model.SsBackupStatusCompleted}, nil)
-			Expect(checkStatus(as, sn, "", "", 0)).To(Equal(model.SsBackupStatusCompleted))
+			status, err := doCheck(as, sn, "", 0)
+			Expect(err).ToNot(HaveOccurred())
+			Expect(status).To(Equal(model.SsBackupStatusCompleted))
 		})
 
-		It("mock agent server and return timeout error first time and then retry 1 time return completed status", func() {
-			as.EXPECT().ShowDetail(&model.ShowDetailIn{Instance: defaultInstance}).Times(1).Return(nil, errors.New("timeout"))
+		It("mock agent server and return check err first time and then success", func() {
+			as.EXPECT().ShowDetail(&model.ShowDetailIn{Instance: defaultInstance}).Return(nil, errors.New("timeout"))
 			as.EXPECT().ShowDetail(&model.ShowDetailIn{Instance: defaultInstance}).Return(&model.BackupInfo{Status: model.SsBackupStatusCompleted}, nil)
-			Expect(checkStatus(as, sn, "", "", 1)).To(Equal(model.SsBackupStatusCompleted))
+			status, err := doCheck(as, sn, "", 1)
+			Expect(err).ToNot(HaveOccurred())
+			Expect(status).To(Equal(model.SsBackupStatusCompleted))
 		})
 	})
 
@@ -78,17 +86,15 @@ var _ = Describe("Backup", func() {
 			proxy *mock_pkg.MockIShardingSphereProxy
 			ls    *mock_pkg.MockILocalStorage
 		)
-
 		BeforeEach(func() {
 			ctrl = gomock.NewController(GinkgoT())
 			proxy = mock_pkg.NewMockIShardingSphereProxy(ctrl)
 			ls = mock_pkg.NewMockILocalStorage(ctrl)
-		})
 
+		})
 		AfterEach(func() {
 			ctrl.Finish()
 		})
-
 		It("export data", func() {
 			// mock proxy export metadata
 			proxy.EXPECT().ExportMetaData().Return(&model.ClusterInfo{}, nil)
@@ -104,16 +110,11 @@ var _ = Describe("Backup", func() {
 			Expect(bk.Info.CSN).To(Equal(""))
 		})
 	})
-	Context("exec backup", func() {
 
-		var as *mock_pkg.MockIAgentServer
-		bak := &model.LsBackup{
-			DnList: nil,
-			SsBackup: &model.SsBackup{
-				Status:       "Running",
-				StorageNodes: []*model.StorageNode{},
-			},
-		}
+	Context("exec backup", func() {
+		var (
+			as *mock_pkg.MockIAgentServer
+		)
 		BeforeEach(func() {
 			ctrl = gomock.NewController(GinkgoT())
 			as = mock_pkg.NewMockIAgentServer(ctrl)
@@ -121,6 +122,14 @@ var _ = Describe("Backup", func() {
 		AfterEach(func() {
 			ctrl.Finish()
 		})
+		bak := &model.LsBackup{
+			DnList: nil,
+			SsBackup: &model.SsBackup{
+				Status:       "Running",
+				StorageNodes: []*model.StorageNode{},
+			},
+		}
+
 		It("exec backup empty storage nodes", func() {
 			Expect(execBackup(bak)).To(BeNil())
 		})
@@ -155,22 +164,76 @@ var _ = Describe("Backup", func() {
 		})
 	})
 
-	Context("exec backup", func() {
-		It("exec backup", func() {
-			var (
-				as       *mock_pkg.MockIAgentServer
-				node     = &model.StorageNode{}
-				failSnCh = make(chan *model.StorageNode, 10)
-				dnCh     = make(chan *model.DataNode, 10)
-			)
+	Context("check backup status", func() {
+		var (
+			as       *mock_pkg.MockIAgentServer
+			lsbackup *model.LsBackup
+		)
+		BeforeEach(func() {
+			lsbackup = &model.LsBackup{
+				DnList: []*model.DataNode{
+					{
+						IP:   "127.0.0.1",
+						Port: 3306,
+					},
+					{
+						IP:   "127.0.0.2",
+						Port: 3307,
+					},
+				},
+				SsBackup: &model.SsBackup{
+					Status: "Running",
+					StorageNodes: []*model.StorageNode{
+						{
+							IP:   "127.0.0.1",
+							Port: 3306,
+						},
+						{
+							IP:   "127.0.0.2",
+							Port: 3307,
+						},
+					},
+				},
+				Info: &model.BackupMetaInfo{},
+			}
+
+			ctrl = gomock.NewController(GinkgoT())
 			as = mock_pkg.NewMockIAgentServer(ctrl)
 
-			defer close(dnCh)
-			defer ctrl.Finish()
-			as.EXPECT().Backup(gomock.Any()).Return("backup-id", nil)
-			Expect(_execBackup(as, node, dnCh)).To(BeNil())
-			Expect(len(failSnCh)).To(Equal(0))
-			Expect(len(dnCh)).To(Equal(1))
+			monkey.Patch(pkg.NewAgentServer, func(_ string) pkg.IAgentServer {
+				return as
+			})
+		})
+		AfterEach(func() {
+			ctrl.Finish()
+			monkey.UnpatchAll()
+		})
+
+		It("check error", func() {
+			as.EXPECT().ShowDetail(gomock.Any()).Return(nil, errors.New("timeout")).AnyTimes()
+			Expect(checkBackupStatus(lsbackup)).To(Equal(model.SsBackupStatusFailed))
+		})
+
+		It("check error 2", func() {
+			as.EXPECT().ShowDetail(gomock.Any()).Return(nil, errors.New("timeout")).Times(1)
+			as.EXPECT().ShowDetail(gomock.Any()).Return(&model.BackupInfo{Status: model.SsBackupStatusFailed}, nil).AnyTimes()
+			Expect(checkBackupStatus(lsbackup)).To(Equal(model.SsBackupStatusFailed))
+		})
+
+		It("check error 3", func() {
+			as.EXPECT().ShowDetail(gomock.Any()).Return(nil, errors.New("timeout")).Times(2)
+			as.EXPECT().ShowDetail(gomock.Any()).Return(&model.BackupInfo{Status: model.SsBackupStatusCompleted}, nil).AnyTimes()
+			Expect(checkBackupStatus(lsbackup)).To(Equal(model.SsBackupStatusCompleted))
+		})
+
+		It("check failed", func() {
+			as.EXPECT().ShowDetail(gomock.Any()).Return(&model.BackupInfo{Status: model.SsBackupStatusFailed}, nil).AnyTimes()
+			Expect(checkBackupStatus(lsbackup)).To(Equal(model.SsBackupStatusFailed))
+		})
+
+		It("check success", func() {
+			as.EXPECT().ShowDetail(gomock.Any()).Return(&model.BackupInfo{Status: model.SsBackupStatusCompleted}, nil).AnyTimes()
+			Expect(checkBackupStatus(lsbackup)).To(Equal(model.SsBackupStatusCompleted))
 		})
 	})
 })
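
Note: these tests reach for monkey.Patch because checkBackupStatus constructs its agent client internally via pkg.NewAgentServer. A common alternative, sketched below with invented names rather than the repo's actual types, is to make the constructor a package-level variable so a test can swap it without runtime patching:

    package main

    import "fmt"

    // AgentServer is a stand-in for the pkg.IAgentServer interface.
    type AgentServer interface {
    	ShowDetail() (string, error)
    }

    // newAgentServer is a seam: production code assigns the real
    // constructor, tests overwrite it with one returning a fake.
    var newAgentServer = func(addr string) AgentServer { return realAgent{addr} }

    type realAgent struct{ addr string }

    func (r realAgent) ShowDetail() (string, error) { return "Completed", nil }

    func check(addr string) string {
    	as := newAgentServer(addr)
    	status, err := as.ShowDetail()
    	if err != nil {
    		return "Failed"
    	}
    	return status
    }

    func main() {
    	// In a test: newAgentServer = func(string) AgentServer { return fake }
    	fmt.Println(check("127.0.0.1:443")) // Completed
    }
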
diff --git a/pitr/cli/internal/pkg/model/as_backup.go b/pitr/cli/internal/pkg/model/as_backup.go
index bd18148..ee3078f 100644
--- a/pitr/cli/internal/pkg/model/as_backup.go
+++ b/pitr/cli/internal/pkg/model/as_backup.go
@@ -45,3 +45,9 @@ type AgentServerStatus struct {
 	IP     string `json:"ip"`
 	Status string `json:"status"`
 }
+
+type BackupResult struct {
+	IP     string       `json:"ip"`
+	Port   uint16       `json:"port"`
+	Status BackupStatus `json:"status"`
+}
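
Note: BackupStatus is a string-backed type (see the model.BackupStatus("") conversion removed above), so the new BackupResult serializes to flat lowercase keys. A quick standalone check of the wire shape, with the struct copied here and Status narrowed to plain string for the example:

    package main

    import (
    	"encoding/json"
    	"fmt"
    )

    type BackupResult struct {
    	IP     string `json:"ip"`
    	Port   uint16 `json:"port"`
    	Status string `json:"status"` // BackupStatus in the model package
    }

    func main() {
    	b, _ := json.Marshal(BackupResult{IP: "127.0.0.1", Port: 3306, Status: "Completed"})
    	fmt.Println(string(b)) // {"ip":"127.0.0.1","port":3306,"status":"Completed"}
    }
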
diff --git a/pitr/cli/pkg/prettyoutput/progress.go b/pitr/cli/pkg/prettyoutput/progress.go
new file mode 100644
index 0000000..e70552a
--- /dev/null
+++ b/pitr/cli/pkg/prettyoutput/progress.go
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package prettyoutput
+
+import (
+	"github.com/jedib0t/go-pretty/v6/progress"
+)
+
+func NewPW(totalNum int) progress.Writer {
+	pw := progress.NewWriter()
+	pw.SetTrackerLength(25)
+	pw.SetAutoStop(true)
+	pw.SetNumTrackersExpected(totalNum)
+	pw.SetSortBy(progress.SortByPercentDsc)
+	style := progress.StyleDefault
+	style.Options.PercentIndeterminate = "running"
+	pw.SetStyle(style)
+	pw.SetTrackerPosition(progress.PositionRight)
+	return pw
+}
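
Note: NewPW centralizes the writer configuration used by checkBackupStatus: trackers appended with Total 0 render as indeterminate, showing the "running" marker configured via PercentIndeterminate, and SetAutoStop(true) ends rendering once every tracker is marked done or errored. A minimal usage sketch with two simulated nodes:

    package main

    import (
    	"time"

    	"github.com/apache/shardingsphere-on-cloud/pitr/cli/pkg/prettyoutput"
    	"github.com/jedib0t/go-pretty/v6/progress"
    )

    func main() {
    	pw := prettyoutput.NewPW(2) // expecting two trackers
    	go pw.Render()

    	for i := 0; i < 2; i++ {
    		tracker := &progress.Tracker{Message: "backing up node", Total: 0}
    		pw.AppendTracker(tracker)
    		go func(t *progress.Tracker) {
    			time.Sleep(time.Second) // simulated backup work
    			t.MarkAsDone()
    		}(tracker)
    	}

    	// Give Render a moment to start, then wait for it to stop.
    	time.Sleep(100 * time.Millisecond)
    	for pw.IsRenderInProgress() {
    		time.Sleep(100 * time.Millisecond)
    	}
    }
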