You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by ha...@apache.org on 2022/01/06 03:43:32 UTC

[skywalking-banyandb] branch tsdb-load created (now 24eb7f0)

This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a change to branch tsdb-load
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git.


      at 24eb7f0  Open the tsdb on a existing path

This branch includes the following new commits:

     new 24eb7f0  Open the tsdb on a existing path

The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


[skywalking-banyandb] 01/01: Open the tsdb on a existing path

Posted by ha...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch tsdb-load
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git

commit 24eb7f01417ace87ad6f5d2fefcc25594024fc31
Author: Gao Hongtao <ha...@gmail.com>
AuthorDate: Thu Jan 6 03:43:07 2022 +0000

    Open the tsdb on a existing path
---
 banyand/tsdb/segment.go   | 37 +++++++++++++++++++---------
 banyand/tsdb/shard.go     | 33 ++++++++++++++++++-------
 banyand/tsdb/tsdb.go      | 61 +++++++++++++++++++++++++++++++++++++++++------
 banyand/tsdb/tsdb_test.go | 28 ++++++++++++++++++----
 4 files changed, 127 insertions(+), 32 deletions(-)

diff --git a/banyand/tsdb/segment.go b/banyand/tsdb/segment.go
index b2dad50..a2450a4 100644
--- a/banyand/tsdb/segment.go
+++ b/banyand/tsdb/segment.go
@@ -63,20 +63,35 @@ func newSegment(ctx context.Context, path string) (s *segment, err error) {
 	if s.globalIndex, err = kv.OpenStore(0, indexPath, kv.StoreWithLogger(s.l)); err != nil {
 		return nil, err
 	}
-	blockPath, err := mkdir(blockTemplate, path, time.Now().Format(blockFormat))
-	if err != nil {
-		return nil, err
+	loadBlock := func(path string) error {
+		var b *block
+		if b, err = newBlock(context.WithValue(ctx, logger.ContextKey, s.l), blockOpts{
+			path: path,
+		}); err != nil {
+			return err
+		}
+		{
+			s.Lock()
+			defer s.Unlock()
+			s.lst = append(s.lst, b)
+		}
+		return nil
 	}
-	var b *block
-	if b, err = newBlock(context.WithValue(ctx, logger.ContextKey, s.l), blockOpts{
-		path: blockPath,
-	}); err != nil {
+	err = walkDir(path, blockPathPrefix, func(name, absolutePath string) error {
+		return loadBlock(absolutePath)
+	})
+	if err != nil {
 		return nil, err
 	}
-	{
-		s.Lock()
-		defer s.Unlock()
-		s.lst = append(s.lst, b)
+	if len(s.lst) < 1 {
+		blockPath, err := mkdir(blockTemplate, path, time.Now().Format(blockFormat))
+		if err != nil {
+			return nil, err
+		}
+		err = loadBlock(blockPath)
+		if err != nil {
+			return nil, err
+		}
 	}
 	return s, nil
 }
diff --git a/banyand/tsdb/shard.go b/banyand/tsdb/shard.go
index 4bcc40e..451b530 100644
--- a/banyand/tsdb/shard.go
+++ b/banyand/tsdb/shard.go
@@ -49,23 +49,38 @@ func (s *shard) Index() IndexDatabase {
 	return s.indexDatabase
 }
 
-func newShard(ctx context.Context, id common.ShardID, location string) (*shard, error) {
+func openShard(ctx context.Context, id common.ShardID, location string) (*shard, error) {
 	s := &shard{
 		id:       id,
 		location: location,
 	}
-	segPath, err := mkdir(segTemplate, location, time.Now().Format(segFormat))
-	if err != nil {
-		return nil, err
+	loadSeg := func(path string) error {
+		seg, err := newSegment(ctx, path)
+		if err != nil {
+			return err
+		}
+		{
+			s.Lock()
+			defer s.Unlock()
+			s.lst = append(s.lst, seg)
+		}
+		return nil
 	}
-	seg, err := newSegment(ctx, segPath)
+	err := walkDir(location, segPathPrefix, func(_, absolutePath string) error {
+		return loadSeg(absolutePath)
+	})
 	if err != nil {
 		return nil, err
 	}
-	{
-		s.Lock()
-		defer s.Unlock()
-		s.lst = append(s.lst, seg)
+	if len(s.lst) < 1 {
+		segPath, err := mkdir(segTemplate, location, time.Now().Format(segFormat))
+		if err != nil {
+			return nil, err
+		}
+		err = loadSeg(segPath)
+		if err != nil {
+			return nil, err
+		}
 	}
 	seriesPath, err := mkdir(seriesTemplate, s.location)
 	if err != nil {
diff --git a/banyand/tsdb/tsdb.go b/banyand/tsdb/tsdb.go
index e9ed19a..fbb710a 100644
--- a/banyand/tsdb/tsdb.go
+++ b/banyand/tsdb/tsdb.go
@@ -24,6 +24,8 @@ import (
 	"io/fs"
 	"io/ioutil"
 	"os"
+	"strconv"
+	"strings"
 	"sync"
 
 	"github.com/pkg/errors"
@@ -36,11 +38,16 @@ import (
 )
 
 const (
-	shardTemplate       = "%s/shard-%d"
-	seriesTemplate      = "%s/series"
-	segTemplate         = "%s/seg-%s"
-	blockTemplate       = "%s/block-%s"
-	globalIndexTemplate = "%s/index"
+	shardPathPrefix     = "shard"
+	pathSeparator       = string(os.PathSeparator)
+	rootPrefix          = "%s" + pathSeparator
+	shardTemplate       = rootPrefix + shardPathPrefix + "-%d"
+	seriesTemplate      = rootPrefix + "series"
+	segPathPrefix       = "seg"
+	segTemplate         = rootPrefix + segPathPrefix + "-%s"
+	blockPathPrefix     = "block"
+	blockTemplate       = rootPrefix + blockPathPrefix + "-%s"
+	globalIndexTemplate = rootPrefix + "index"
 
 	segFormat   = "20060102"
 	blockFormat = "1504"
@@ -155,7 +162,7 @@ func createDatabase(ctx context.Context, db *database) (Database, error) {
 			err = multierr.Append(err, errInternal)
 			continue
 		}
-		so, errNewShard := newShard(ctx, common.ShardID(i), shardLocation)
+		so, errNewShard := openShard(ctx, common.ShardID(i), shardLocation)
 		if errNewShard != nil {
 			err = multierr.Append(err, errNewShard)
 			continue
@@ -166,10 +173,50 @@ func createDatabase(ctx context.Context, db *database) (Database, error) {
 }
 
 func loadDatabase(ctx context.Context, db *database) (Database, error) {
-	//TODO: load the existing database
+	//TODO: open the lock file
+	//TODO: open the manifest file
+	db.Lock()
+	defer db.Unlock()
+	err := walkDir(db.location, shardPathPrefix, func(name, absolutePath string) error {
+		shardSegs := strings.Split(name, "-")
+		shardID, err := strconv.Atoi(shardSegs[1])
+		if err != nil {
+			return err
+		}
+		if shardID >= int(db.shardNum) {
+			return nil
+		}
+		so, errOpenShard := openShard(ctx, common.ShardID(shardID), absolutePath)
+		if errOpenShard != nil {
+			return errOpenShard
+		}
+		db.sLst = append(db.sLst, so)
+		return nil
+	})
+	if err != nil {
+		return nil, errors.WithMessage(err, "load the database failed")
+	}
 	return db, nil
 }
 
+type walkFn func(name, absolutePath string) error
+
+func walkDir(root, prefix string, walkFn walkFn) error {
+	files, err := ioutil.ReadDir(root)
+	if err != nil {
+		return errors.Wrapf(err, "failed to walk the database path: %s", root)
+	}
+	for _, f := range files {
+		if !f.IsDir() || !strings.HasPrefix(f.Name(), prefix) {
+			continue
+		}
+		if walkFn(f.Name(), fmt.Sprintf(rootPrefix, root)+f.Name()) != nil {
+			return errors.WithMessagef(err, "failed to load: %s", f.Name())
+		}
+	}
+	return nil
+}
+
 func mkdir(format string, a ...interface{}) (path string, err error) {
 	path = fmt.Sprintf(format, a...)
 	if err = os.MkdirAll(path, dirPerm); err != nil {
diff --git a/banyand/tsdb/tsdb_test.go b/banyand/tsdb/tsdb_test.go
index c570fdf..bfb9957 100644
--- a/banyand/tsdb/tsdb_test.go
+++ b/banyand/tsdb/tsdb_test.go
@@ -34,8 +34,27 @@ import (
 
 func TestOpenDatabase(t *testing.T) {
 	tester := assert.New(t)
-	tempDir, deferFunc, _ := setUp(require.New(t))
+	req := require.New(t)
+	tempDir, deferFunc := test.Space(req)
+	openDatabase(req, tempDir)
 	defer deferFunc()
+	verifyDatabaseStructure(tester, tempDir)
+}
+
+func TestReOpenDatabase(t *testing.T) {
+	tester := assert.New(t)
+	req := require.New(t)
+	tempDir, deferFunc := test.Space(req)
+	defer deferFunc()
+	db := openDatabase(req, tempDir)
+	req.NoError(db.Close())
+	verifyDatabaseStructure(tester, tempDir)
+	db = openDatabase(req, tempDir)
+	req.NoError(db.Close())
+	verifyDatabaseStructure(tester, tempDir)
+}
+
+func verifyDatabaseStructure(tester *assert.Assertions, tempDir string) {
 	shardPath := fmt.Sprintf(shardTemplate, tempDir, 0)
 	validateDirectory(tester, shardPath)
 	seriesPath := fmt.Sprintf(seriesTemplate, shardPath)
@@ -46,16 +65,15 @@ func TestOpenDatabase(t *testing.T) {
 	validateDirectory(tester, fmt.Sprintf(blockTemplate, segPath, now.Format(blockFormat)))
 }
 
-func setUp(t *require.Assertions) (tempDir string, deferFunc func(), db Database) {
+func openDatabase(t *require.Assertions, path string) (db Database) {
 	t.NoError(logger.Init(logger.Logging{
 		Env:   "dev",
 		Level: "warn",
 	}))
-	tempDir, deferFunc = test.Space(t)
 	db, err := OpenDatabase(
 		context.WithValue(context.Background(), logger.ContextKey, logger.GetLogger("test")),
 		DatabaseOpts{
-			Location: tempDir,
+			Location: path,
 			ShardNum: 1,
 			EncodingMethod: EncodingMethod{
 				EncoderPool: encoding.NewPlainEncoderPool(0),
@@ -64,7 +82,7 @@ func setUp(t *require.Assertions) (tempDir string, deferFunc func(), db Database
 		})
 	t.NoError(err)
 	t.NotNil(db)
-	return tempDir, deferFunc, db
+	return db
 }
 
 func validateDirectory(t *assert.Assertions, dir string) {