You are viewing a plain-text version of this content. The link to the canonical version was not preserved in this plain-text copy.
Posted to notifications@shardingsphere.apache.org by mi...@apache.org on 2023/04/25 02:19:20 UTC
[shardingsphere-on-cloud] branch main updated: chore(pitr): pitr backup support display backup progress
This is an automated email from the ASF dual-hosted git repository.
miaoliyao pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/shardingsphere-on-cloud.git
The following commit(s) were added to refs/heads/main by this push:
new 157509f chore(pitr): pitr backup support display backup progress
new 72d7b5d Merge pull request #321 from Xu-Wentao/pitr
157509f is described below
commit 157509f6e878d184f21d6884b725db966715b28e
Author: xuwentao <cu...@yahoo.com>
AuthorDate: Sun Apr 23 20:17:04 2023 +0800
chore(pitr): pitr backup support display backup progress
---
pitr/agent/Makefile | 2 +-
pitr/cli/Makefile | 2 +-
pitr/cli/internal/cmd/backup.go | 147 ++++++++++++++++++++-----------
pitr/cli/internal/cmd/backup_test.go | 129 ++++++++++++++++++++-------
pitr/cli/internal/pkg/model/as_backup.go | 6 ++
pitr/cli/pkg/prettyoutput/progress.go | 35 ++++++++
6 files changed, 233 insertions(+), 88 deletions(-)
diff --git a/pitr/agent/Makefile b/pitr/agent/Makefile
index 7fd3b96..d1c0a32 100644
--- a/pitr/agent/Makefile
+++ b/pitr/agent/Makefile
@@ -8,7 +8,7 @@ openssl-local:
openssl req -new -SHA256 -newkey rsa:2048 -nodes -keyout tls.key -out tls.csr -subj "/C=CN/ST=beijing/L=beijing/O=/OU=/" && \
openssl x509 -req -sha256 -days 365 -in tls.csr -signkey tls.key -out tls.crt
test:
- go test ./... -coverprofile cover.out
+ go test -gcflags=-l -v ./... -coverprofile cover.out
build:
GOOS=$(GOOS) go build -o pitr-agent
diff --git a/pitr/cli/Makefile b/pitr/cli/Makefile
index d6b27df..b933165 100644
--- a/pitr/cli/Makefile
+++ b/pitr/cli/Makefile
@@ -5,7 +5,7 @@ GOOS := $(shell go env GOOS)
build:
GOOS=$(GOOS) go build -o gs_pitr main.go
test:
- go test -v ./... -cover -coverprofile cover.out
+ go test -gcflags=-l -v ./... -cover -coverprofile cover.out
cover:
go tool cover -html cover.out
lint:
diff --git a/pitr/cli/internal/cmd/backup.go b/pitr/cli/internal/cmd/backup.go
index 25956fb..ced03a6 100644
--- a/pitr/cli/internal/cmd/backup.go
+++ b/pitr/cli/internal/cmd/backup.go
@@ -20,14 +20,15 @@ package cmd
import (
"fmt"
"os"
- "sync"
"time"
"github.com/apache/shardingsphere-on-cloud/pitr/cli/internal/pkg"
"github.com/apache/shardingsphere-on-cloud/pitr/cli/internal/pkg/model"
"github.com/apache/shardingsphere-on-cloud/pitr/cli/internal/pkg/xerr"
"github.com/apache/shardingsphere-on-cloud/pitr/cli/pkg/logging"
+ "github.com/apache/shardingsphere-on-cloud/pitr/cli/pkg/prettyoutput"
"github.com/google/uuid"
+ "github.com/jedib0t/go-pretty/v6/progress"
"github.com/jedib0t/go-pretty/v6/table"
"github.com/spf13/cobra"
"github.com/spf13/pflag"
@@ -232,14 +233,10 @@ func execBackup(lsBackup *model.LsBackup) error {
logging.Info("Starting send backup command to agent server...")
for _, node := range sNodes {
- node := node
- agentHost := node.IP
- if agentHost == "127.0.0.1" {
- agentHost = Host
- }
- as := pkg.NewAgentServer(fmt.Sprintf("%s:%d", agentHost, AgentPort))
+ sn := node
+ as := pkg.NewAgentServer(fmt.Sprintf("%s:%d", convertLocalhost(sn.IP), AgentPort))
g.Go(func() error {
- return _execBackup(as, node, dnCh)
+ return _execBackup(as, sn, dnCh)
})
}
@@ -269,19 +266,16 @@ func checkAgentServerStatus(lsBackup *model.LsBackup) bool {
available := true
for _, node := range lsBackup.SsBackup.StorageNodes {
- n := node
- agentHost := n.IP
- if agentHost == "127.0.0.1" {
- agentHost = Host
- }
- as := pkg.NewAgentServer(fmt.Sprintf("%s:%d", agentHost, AgentPort))
+ sn := node
+ as := pkg.NewAgentServer(fmt.Sprintf("%s:%d", convertLocalhost(sn.IP), AgentPort))
if err := as.CheckStatus(); err != nil {
- statusList = append(statusList, &model.AgentServerStatus{IP: n.IP, Status: "Unavailable"})
+ statusList = append(statusList, &model.AgentServerStatus{IP: sn.IP, Status: "Unavailable"})
available = false
} else {
- statusList = append(statusList, &model.AgentServerStatus{IP: n.IP, Status: "Available"})
+ statusList = append(statusList, &model.AgentServerStatus{IP: sn.IP, Status: "Available"})
}
}
+
t := table.NewWriter()
t.SetOutputMirror(os.Stdout)
t.SetTitle("Agent Server Status")
@@ -328,64 +322,100 @@ func _execBackup(as pkg.IAgentServer, node *model.StorageNode, dnCh chan *model.
func checkBackupStatus(lsBackup *model.LsBackup) model.BackupStatus {
var (
- wg sync.WaitGroup
dataNodeMap = make(map[string]*model.DataNode)
+ dnCh = make(chan *model.DataNode, len(lsBackup.DnList))
backupFinalStatus = model.SsBackupStatusCompleted
- statusCh = make(chan *model.DataNode, len(lsBackup.DnList))
+ totalNum = len(lsBackup.SsBackup.StorageNodes)
+ dnResult = make([]*model.DataNode, 0)
)
- // DataNode.IP -> DataNode
for _, dn := range lsBackup.DnList {
dataNodeMap[dn.IP] = dn
}
- for _, sn := range lsBackup.SsBackup.StorageNodes {
- wg.Add(1)
- go func(wg *sync.WaitGroup, sn *model.StorageNode) {
- defer wg.Done()
- agentHost := sn.IP
- if agentHost == "127.0.0.1" {
- agentHost = Host
- }
- as := pkg.NewAgentServer(fmt.Sprintf("%s:%d", agentHost, AgentPort))
- dn := dataNodeMap[sn.IP]
-
- // check backup status
- status := checkStatus(as, sn, dn.BackupID, model.BackupStatus(""), defaultShowDetailRetryTimes)
+ pw := prettyoutput.NewPW(totalNum)
+ go pw.Render()
+ for idx := 0; idx < totalNum; idx++ {
+ sn := lsBackup.SsBackup.StorageNodes[idx]
+ as := pkg.NewAgentServer(fmt.Sprintf("%s:%d", convertLocalhost(sn.IP), AgentPort))
+ dn := dataNodeMap[sn.IP]
+ go checkStatus(as, sn, dn, dnCh, pw)
+ }
- // update DataNode status
- dn.Status = status
- dn.EndTime = time.Now().Unix()
- statusCh <- dn
- }(&wg, sn)
+ // wait for all data node backup finished
+ time.Sleep(time.Millisecond * 100)
+ for pw.IsRenderInProgress() {
+ if pw.LengthActive() == 0 {
+ pw.Stop()
+ }
+ time.Sleep(time.Millisecond * 100)
}
- wg.Wait()
- close(statusCh)
+ close(dnCh)
- for dn := range statusCh {
- logging.Info(fmt.Sprintf("data node backup final status: [IP:%s, backupID:%s] ==> %s", dn.IP, dn.BackupID, dn.Status))
+ for dn := range dnCh {
+ dnResult = append(dnResult, dn)
if dn.Status != model.SsBackupStatusCompleted {
backupFinalStatus = model.SsBackupStatusFailed
}
}
+ // print backup result formatted
+ t := table.NewWriter()
+ t.SetOutputMirror(os.Stdout)
+ t.SetTitle("Backup Task Result: %s", backupFinalStatus)
+ t.AppendHeader(table.Row{"#", "Data Node IP", "Data Node Port", "Result"})
+
+ for i, dn := range dnResult {
+ t.AppendRow([]interface{}{i + 1, dn.IP, dn.Port, dn.Status})
+ t.AppendSeparator()
+ }
+
+ t.Render()
+
+ lsBackup.DnList = dnResult
lsBackup.SsBackup.Status = backupFinalStatus
lsBackup.Info.EndTime = time.Now().Unix()
return backupFinalStatus
}
-func checkStatus(as pkg.IAgentServer, sn *model.StorageNode, backupID string, status model.BackupStatus, retryTimes uint8) model.BackupStatus {
- if retryTimes+1 == 0 {
- return status
- }
- if status == model.SsBackupStatusCompleted || status == model.SsBackupStatusFailed {
- return status
- }
+func checkStatus(as pkg.IAgentServer, sn *model.StorageNode, dn *model.DataNode, dnCh chan *model.DataNode, pw progress.Writer) {
+ var (
+ // mark check status is done, time ticker should break.
+ done = make(chan bool)
+ // time ticker, try to doCheck request every 2 seconds.
+ ticker = time.Tick(time.Second * 2)
+ // progress bar.
+ tracker = progress.Tracker{Message: fmt.Sprintf("Checking backup status # %s:%d", sn.IP, AgentPort), Total: 0, Units: progress.UnitsDefault}
+ )
- // todo: how often to check backup status
- time.Sleep(time.Second * 2)
+ pw.AppendTracker(&tracker)
+
+ for !tracker.IsDone() {
+ select {
+ case <-done:
+ return
+ case <-ticker:
+ status, err := doCheck(as, sn, dn.BackupID, defaultShowDetailRetryTimes)
+ if err != nil {
+ tracker.MarkAsErrored()
+ dn.Status = status
+ dn.EndTime = time.Now().Unix()
+ dnCh <- dn
+ done <- true
+ }
+ if status == model.SsBackupStatusCompleted || status == model.SsBackupStatusFailed {
+ tracker.MarkAsDone()
+ dn.Status = status
+ dn.EndTime = time.Now().Unix()
+ dnCh <- dn
+ done <- true
+ }
+ }
+ }
+}
+func doCheck(as pkg.IAgentServer, sn *model.StorageNode, backupID string, retries int) (status model.BackupStatus, err error) {
in := &model.ShowDetailIn{
DBPort: sn.Port,
DBName: sn.Database,
@@ -397,8 +427,19 @@ func checkStatus(as pkg.IAgentServer, sn *model.StorageNode, backupID string, st
}
backupInfo, err := as.ShowDetail(in)
if err != nil {
- logging.Error(fmt.Sprintf("get storage node [IP:%s] backup detail from agent server failed, will retry %d times.\n%s", sn.IP, retryTimes, err.Error()))
- return checkStatus(as, sn, backupID, model.SsBackupStatusCheckError, retryTimes-1)
+ if retries == 0 {
+ return model.SsBackupStatusCheckError, err
+ }
+ time.Sleep(time.Second * 1)
+ return doCheck(as, sn, backupID, retries-1)
+ }
+
+ return backupInfo.Status, nil
+}
+
+func convertLocalhost(ip string) string {
+ if ip == "127.0.0.1" {
+ return Host
}
- return checkStatus(as, sn, backupID, backupInfo.Status, retryTimes)
+ return ip
}
diff --git a/pitr/cli/internal/cmd/backup_test.go b/pitr/cli/internal/cmd/backup_test.go
index 1f56485..ada9434 100644
--- a/pitr/cli/internal/cmd/backup_test.go
+++ b/pitr/cli/internal/cmd/backup_test.go
@@ -36,7 +36,7 @@ import (
var ctrl *gomock.Controller
var _ = Describe("Backup", func() {
- Context("check status", func() {
+ Context("do check", func() {
var (
as *mock_pkg.MockIAgentServer
sn = &model.StorageNode{
@@ -53,23 +53,31 @@ var _ = Describe("Backup", func() {
It("agent server return err", func() {
as.EXPECT().ShowDetail(&model.ShowDetailIn{Instance: defaultInstance}).Return(nil, errors.New("timeout"))
- Expect(checkStatus(as, sn, "", "", 0)).To(Equal(model.SsBackupStatusCheckError))
+ status, err := doCheck(as, sn, "", 0)
+ Expect(err).To(HaveOccurred())
+ Expect(status).To(Equal(model.SsBackupStatusCheckError))
})
It("mock agent server and return failed status", func() {
as.EXPECT().ShowDetail(&model.ShowDetailIn{Instance: defaultInstance}).Return(&model.BackupInfo{Status: model.SsBackupStatusFailed}, nil)
- Expect(checkStatus(as, sn, "", "", 0)).To(Equal(model.SsBackupStatusFailed))
+ status, err := doCheck(as, sn, "", 0)
+ Expect(err).ToNot(HaveOccurred())
+ Expect(status).To(Equal(model.SsBackupStatusFailed))
})
It("mock agent server and return completed status", func() {
as.EXPECT().ShowDetail(&model.ShowDetailIn{Instance: defaultInstance}).Return(&model.BackupInfo{Status: model.SsBackupStatusCompleted}, nil)
- Expect(checkStatus(as, sn, "", "", 0)).To(Equal(model.SsBackupStatusCompleted))
+ status, err := doCheck(as, sn, "", 0)
+ Expect(err).ToNot(HaveOccurred())
+ Expect(status).To(Equal(model.SsBackupStatusCompleted))
})
- It("mock agent server and return timeout error first time and then retry 1 time return completed status", func() {
- as.EXPECT().ShowDetail(&model.ShowDetailIn{Instance: defaultInstance}).Times(1).Return(nil, errors.New("timeout"))
+ It("mock agent server and return check err first time and then success", func() {
+ as.EXPECT().ShowDetail(&model.ShowDetailIn{Instance: defaultInstance}).Return(nil, errors.New("timeout"))
as.EXPECT().ShowDetail(&model.ShowDetailIn{Instance: defaultInstance}).Return(&model.BackupInfo{Status: model.SsBackupStatusCompleted}, nil)
- Expect(checkStatus(as, sn, "", "", 1)).To(Equal(model.SsBackupStatusCompleted))
+ status, err := doCheck(as, sn, "", 1)
+ Expect(err).ToNot(HaveOccurred())
+ Expect(status).To(Equal(model.SsBackupStatusCompleted))
})
})
@@ -78,17 +86,15 @@ var _ = Describe("Backup", func() {
proxy *mock_pkg.MockIShardingSphereProxy
ls *mock_pkg.MockILocalStorage
)
-
BeforeEach(func() {
ctrl = gomock.NewController(GinkgoT())
proxy = mock_pkg.NewMockIShardingSphereProxy(ctrl)
ls = mock_pkg.NewMockILocalStorage(ctrl)
- })
+ })
AfterEach(func() {
ctrl.Finish()
})
-
It("export data", func() {
// mock proxy export metadata
proxy.EXPECT().ExportMetaData().Return(&model.ClusterInfo{}, nil)
@@ -104,16 +110,11 @@ var _ = Describe("Backup", func() {
Expect(bk.Info.CSN).To(Equal(""))
})
})
- Context("exec backup", func() {
- var as *mock_pkg.MockIAgentServer
- bak := &model.LsBackup{
- DnList: nil,
- SsBackup: &model.SsBackup{
- Status: "Running",
- StorageNodes: []*model.StorageNode{},
- },
- }
+ Context("exec backup", func() {
+ var (
+ as *mock_pkg.MockIAgentServer
+ )
BeforeEach(func() {
ctrl = gomock.NewController(GinkgoT())
as = mock_pkg.NewMockIAgentServer(ctrl)
@@ -121,6 +122,14 @@ var _ = Describe("Backup", func() {
AfterEach(func() {
ctrl.Finish()
})
+ bak := &model.LsBackup{
+ DnList: nil,
+ SsBackup: &model.SsBackup{
+ Status: "Running",
+ StorageNodes: []*model.StorageNode{},
+ },
+ }
+
It("exec backup empty storage nodes", func() {
Expect(execBackup(bak)).To(BeNil())
})
@@ -155,22 +164,76 @@ var _ = Describe("Backup", func() {
})
})
- Context("exec backup", func() {
- It("exec backup", func() {
- var (
- as *mock_pkg.MockIAgentServer
- node = &model.StorageNode{}
- failSnCh = make(chan *model.StorageNode, 10)
- dnCh = make(chan *model.DataNode, 10)
- )
+ Context("check backup status", func() {
+ var (
+ as *mock_pkg.MockIAgentServer
+ lsbackup *model.LsBackup
+ )
+ BeforeEach(func() {
+ lsbackup = &model.LsBackup{
+ DnList: []*model.DataNode{
+ {
+ IP: "127.0.0.1",
+ Port: 3306,
+ },
+ {
+ IP: "127.0.0.2",
+ Port: 3307,
+ },
+ },
+ SsBackup: &model.SsBackup{
+ Status: "Running",
+ StorageNodes: []*model.StorageNode{
+ {
+ IP: "127.0.0.1",
+ Port: 3306,
+ },
+ {
+ IP: "127.0.0.2",
+ Port: 3307,
+ },
+ },
+ },
+ Info: &model.BackupMetaInfo{},
+ }
+
+ ctrl = gomock.NewController(GinkgoT())
as = mock_pkg.NewMockIAgentServer(ctrl)
- defer close(dnCh)
- defer ctrl.Finish()
- as.EXPECT().Backup(gomock.Any()).Return("backup-id", nil)
- Expect(_execBackup(as, node, dnCh)).To(BeNil())
- Expect(len(failSnCh)).To(Equal(0))
- Expect(len(dnCh)).To(Equal(1))
+ monkey.Patch(pkg.NewAgentServer, func(_ string) pkg.IAgentServer {
+ return as
+ })
+ })
+ AfterEach(func() {
+ ctrl.Finish()
+ monkey.UnpatchAll()
+ })
+
+ It("check error", func() {
+ as.EXPECT().ShowDetail(gomock.Any()).Return(nil, errors.New("timeout")).AnyTimes()
+ Expect(checkBackupStatus(lsbackup)).To(Equal(model.SsBackupStatusFailed))
+ })
+
+ It("check error 2", func() {
+ as.EXPECT().ShowDetail(gomock.Any()).Return(nil, errors.New("timeout")).Times(1)
+ as.EXPECT().ShowDetail(gomock.Any()).Return(&model.BackupInfo{Status: model.SsBackupStatusFailed}, nil).AnyTimes()
+ Expect(checkBackupStatus(lsbackup)).To(Equal(model.SsBackupStatusFailed))
+ })
+
+ It("check error 3", func() {
+ as.EXPECT().ShowDetail(gomock.Any()).Return(nil, errors.New("timeout")).Times(2)
+ as.EXPECT().ShowDetail(gomock.Any()).Return(&model.BackupInfo{Status: model.SsBackupStatusCompleted}, nil).AnyTimes()
+ Expect(checkBackupStatus(lsbackup)).To(Equal(model.SsBackupStatusCompleted))
+ })
+
+ It("check failed", func() {
+ as.EXPECT().ShowDetail(gomock.Any()).Return(&model.BackupInfo{Status: model.SsBackupStatusFailed}, nil).AnyTimes()
+ Expect(checkBackupStatus(lsbackup)).To(Equal(model.SsBackupStatusFailed))
+ })
+
+ It("check success", func() {
+ as.EXPECT().ShowDetail(gomock.Any()).Return(&model.BackupInfo{Status: model.SsBackupStatusCompleted}, nil).AnyTimes()
+ Expect(checkBackupStatus(lsbackup)).To(Equal(model.SsBackupStatusCompleted))
})
})
})
diff --git a/pitr/cli/internal/pkg/model/as_backup.go b/pitr/cli/internal/pkg/model/as_backup.go
index bd18148..ee3078f 100644
--- a/pitr/cli/internal/pkg/model/as_backup.go
+++ b/pitr/cli/internal/pkg/model/as_backup.go
@@ -45,3 +45,9 @@ type AgentServerStatus struct {
IP string `json:"ip"`
Status string `json:"status"`
}
+
+type BackupResult struct {
+ IP string `json:"ip"`
+ Port uint16 `json:"port"`
+ Status BackupStatus `json:"status"`
+}
diff --git a/pitr/cli/pkg/prettyoutput/progress.go b/pitr/cli/pkg/prettyoutput/progress.go
new file mode 100644
index 0000000..e70552a
--- /dev/null
+++ b/pitr/cli/pkg/prettyoutput/progress.go
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package prettyoutput
+
+import (
+ "github.com/jedib0t/go-pretty/v6/progress"
+)
+
+func NewPW(totalNum int) progress.Writer {
+ pw := progress.NewWriter()
+ pw.SetTrackerLength(25)
+ pw.SetAutoStop(true)
+ pw.SetNumTrackersExpected(totalNum)
+ pw.SetSortBy(progress.SortByPercentDsc)
+ style := progress.StyleDefault
+ style.Options.PercentIndeterminate = "running"
+ pw.SetStyle(style)
+ pw.SetTrackerPosition(progress.PositionRight)
+ return pw
+}