You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by ha...@apache.org on 2022/01/17 17:05:16 UTC

[skywalking-banyandb] 01/01: Fix issues of loading a database

This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch liaison-test
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git

commit 67dd662abd8da20fad6a0bcf84c9d85e48752453
Author: Gao Hongtao <ha...@gmail.com>
AuthorDate: Mon Jan 17 16:58:34 2022 +0000

    Fix issues of loading a database
    
    Signed-off-by: Gao Hongtao <ha...@gmail.com>
---
 banyand/liaison/grpc/grpc_suite_test.go |  9 ++++++
 banyand/liaison/grpc/stream_test.go     | 57 +++++++++++++++++++++++----------
 banyand/query/processor.go              |  2 ++
 banyand/stream/stream_query.go          |  2 +-
 banyand/tsdb/segment.go                 |  1 +
 banyand/tsdb/shard.go                   | 14 +++++++-
 banyand/tsdb/tsdb.go                    |  8 +++--
 go.mod                                  |  2 +-
 go.sum                                  |  4 +--
 pkg/encoding/plain.go                   |  4 +--
 pkg/index/inverted/inverted.go          |  2 +-
 pkg/index/lsm/lsm.go                    |  4 ++-
 pkg/index/metadata/metadata.go          |  7 ++++
 pkg/run/test.go                         |  2 +-
 14 files changed, 88 insertions(+), 30 deletions(-)

diff --git a/banyand/liaison/grpc/grpc_suite_test.go b/banyand/liaison/grpc/grpc_suite_test.go
index ae8f957..ef85011 100644
--- a/banyand/liaison/grpc/grpc_suite_test.go
+++ b/banyand/liaison/grpc/grpc_suite_test.go
@@ -22,9 +22,18 @@ import (
 
 	. "github.com/onsi/ginkgo/v2"
 	. "github.com/onsi/gomega"
+
+	"github.com/apache/skywalking-banyandb/pkg/logger"
 )
 
 func TestGrpc(t *testing.T) {
 	RegisterFailHandler(Fail)
 	RunSpecs(t, "Grpc Suite")
 }
+
+var _ = BeforeSuite(func() {
+	Expect(logger.Init(logger.Logging{
+		Env:   "dev",
+		Level: "warn",
+	})).Should(Succeed())
+})
diff --git a/banyand/liaison/grpc/stream_test.go b/banyand/liaison/grpc/stream_test.go
index 596f4ed..ea28868 100644
--- a/banyand/liaison/grpc/stream_test.go
+++ b/banyand/liaison/grpc/stream_test.go
@@ -38,7 +38,6 @@ import (
 	"github.com/apache/skywalking-banyandb/banyand/query"
 	"github.com/apache/skywalking-banyandb/banyand/queue"
 	"github.com/apache/skywalking-banyandb/banyand/stream"
-	"github.com/apache/skywalking-banyandb/pkg/logger"
 	pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
 	"github.com/apache/skywalking-banyandb/pkg/run"
 	"github.com/apache/skywalking-banyandb/pkg/test"
@@ -46,10 +45,20 @@ import (
 )
 
 var _ = Describe("Stream", func() {
-	var gracefulStop func()
+	var rootPath, metadataPath string
+	var gracefulStop, deferRootFunc, deferMetadataFunc func()
 	var conn *grpclib.ClientConn
+	BeforeEach(func() {
+		var err error
+		rootPath, deferRootFunc, err = test.NewSpace()
+		Expect(err).NotTo(HaveOccurred())
+		metadataPath, deferMetadataFunc, err = test.NewSpace()
+		Expect(err).NotTo(HaveOccurred())
+	})
 	It("is a plain server", func() {
-		gracefulStop = setup(nil)
+		By("Verifying an empty server")
+		flags := []string{"--root-path=" + rootPath, "--metadata-root-path=" + metadataPath}
+		gracefulStop = setup(flags)
 		var err error
 		conn, err = grpclib.Dial("localhost:17912", grpclib.WithInsecure())
 		Expect(err).NotTo(HaveOccurred())
@@ -57,9 +66,23 @@ var _ = Describe("Stream", func() {
 		Eventually(func() (int, error) {
 			return streamQuery(conn)
 		}).Should(Equal(1))
+		_ = conn.Close()
+		gracefulStop()
+		By("Verifying an existing server")
+		gracefulStop = setup(flags)
+		conn, err = grpclib.Dial("localhost:17912", grpclib.WithInsecure())
+		Expect(err).NotTo(HaveOccurred())
+		Eventually(func() int {
+			num, err := streamQuery(conn)
+			if err != nil {
+				GinkgoWriter.Printf("stream query err: %v \n", err)
+				return 0
+			}
+			return num
+		}).Should(Equal(1))
 	})
 	It("is a TLS server", func() {
-		flags := []string{"--tls=true"}
+		flags := []string{"--tls=true", "--root-path=" + rootPath, "--metadata-root-path=" + metadataPath}
 		_, currentFile, _, _ := runtime.Caller(0)
 		basePath := filepath.Dir(currentFile)
 		certFile := filepath.Join(basePath, "testdata/server_cert.pem")
@@ -81,14 +104,13 @@ var _ = Describe("Stream", func() {
 	AfterEach(func() {
 		_ = conn.Close()
 		gracefulStop()
+		deferMetadataFunc()
+		deferRootFunc()
 	})
 })
 
 func setup(flags []string) func() {
-	Expect(logger.Init(logger.Logging{
-		Env:   "dev",
-		Level: "warn",
-	})).Should(Succeed())
+
 	g := run.Group{Name: "standalone"}
 	// Init `Discovery` module
 	repo, err := discovery.NewServiceRepo(context.Background())
@@ -123,10 +145,6 @@ func setup(flags []string) func() {
 		tcp,
 		startListener,
 	)
-	// Create a random directory
-	rootPath, deferFunc, err := test.NewSpace()
-	Expect(err).NotTo(HaveOccurred())
-	flags = append(flags, "--root-path="+rootPath, "--metadata-root-path="+teststream.RandomTempDir())
 
 	err = g.RegisterFlags().Parse(flags)
 	Expect(err).NotTo(HaveOccurred())
@@ -140,9 +158,8 @@ func setup(flags []string) func() {
 		errRun := g.Run()
 		if errRun != nil {
 			startListener.GracefulStop()
-			Expect(errRun).Should(Succeed())
+			Expect(errRun).NotTo(HaveOccurred())
 		}
-		deferFunc()
 	}()
 	Expect(startListener.WaitUntilStarted()).Should(Succeed())
 	return func() {
@@ -195,9 +212,15 @@ func queryCriteria(baseTs time.Time) *streamv1.QueryRequest {
 func streamWrite(conn *grpclib.ClientConn) {
 	client := streamv1.NewStreamServiceClient(conn)
 	ctx := context.Background()
-	writeClient, errorWrite := client.Write(ctx)
-	Expect(errorWrite).Should(Succeed())
-	Expect(writeClient.Send(writeData())).Should(Succeed())
+	var writeClient streamv1.StreamService_WriteClient
+	Eventually(func(g Gomega) {
+		var err error
+		writeClient, err = client.Write(ctx)
+		g.Expect(err).NotTo(HaveOccurred())
+	}).Should(Succeed())
+	Eventually(func() error {
+		return writeClient.Send(writeData())
+	}).ShouldNot(HaveOccurred())
 	Expect(writeClient.CloseSend()).Should(Succeed())
 	Eventually(func() error {
 		_, err := writeClient.Recv()
diff --git a/banyand/query/processor.go b/banyand/query/processor.go
index bfc9a10..507fb5f 100644
--- a/banyand/query/processor.go
+++ b/banyand/query/processor.go
@@ -83,6 +83,8 @@ func (q *queryProcessor) Rev(message bus.Message) (resp bus.Message) {
 		return
 	}
 
+	q.log.Debug().Str("plan", p.String()).Msg("query plan")
+
 	entities, err := p.Execute(ec)
 	if err != nil {
 		q.log.Error().Err(err).Msg("fail to execute the query plan")
diff --git a/banyand/stream/stream_query.go b/banyand/stream/stream_query.go
index 0c75b14..dfca4f4 100644
--- a/banyand/stream/stream_query.go
+++ b/banyand/stream/stream_query.go
@@ -78,7 +78,7 @@ func (s *stream) Shard(id common.ShardID) (tsdb.Shard, error) {
 func (s *stream) ParseTagFamily(family string, item tsdb.Item) (*modelv1.TagFamily, error) {
 	familyRawBytes, err := item.Family(family)
 	if err != nil {
-		return nil, err
+		return nil, errors.Wrapf(err, "parse family %s", family)
 	}
 	tagFamily := &modelv1.TagFamilyForWrite{}
 	err = proto.Unmarshal(familyRawBytes, tagFamily)
diff --git a/banyand/tsdb/segment.go b/banyand/tsdb/segment.go
index a2450a4..6a8305e 100644
--- a/banyand/tsdb/segment.go
+++ b/banyand/tsdb/segment.go
@@ -102,4 +102,5 @@ func (s *segment) close() {
 	for _, b := range s.lst {
 		b.close()
 	}
+	s.globalIndex.Close()
 }
diff --git a/banyand/tsdb/shard.go b/banyand/tsdb/shard.go
index 09b6ded..7b1b7b9 100644
--- a/banyand/tsdb/shard.go
+++ b/banyand/tsdb/shard.go
@@ -19,17 +19,20 @@ package tsdb
 
 import (
 	"context"
+	"strconv"
 	"sync"
 	"time"
 
 	"github.com/apache/skywalking-banyandb/api/common"
+	"github.com/apache/skywalking-banyandb/pkg/logger"
 )
 
 var _ Shard = (*shard)(nil)
 
 type shard struct {
 	sync.Mutex
-	id common.ShardID
+	id     common.ShardID
+	logger *logger.Logger
 
 	location       string
 	seriesDatabase SeriesDatabase
@@ -54,6 +57,12 @@ func openShard(ctx context.Context, id common.ShardID, location string) (*shard,
 		id:       id,
 		location: location,
 	}
+	parentLogger := ctx.Value(logger.ContextKey)
+	if parentLogger != nil {
+		if pl, ok := parentLogger.(*logger.Logger); ok {
+			s.logger = pl.Named("shard" + strconv.Itoa(int(id)))
+		}
+	}
 	loadSeg := func(path string) error {
 		seg, err := newSegment(ctx, path)
 		if err != nil {
@@ -64,9 +73,11 @@ func openShard(ctx context.Context, id common.ShardID, location string) (*shard,
 			defer s.Unlock()
 			s.lst = append(s.lst, seg)
 		}
+		s.logger.Info().Int("size", len(s.lst)).Msg("seg size")
 		return nil
 	}
 	err := walkDir(location, segPathPrefix, func(_, absolutePath string) error {
+		s.logger.Info().Str("path", absolutePath).Msg("loading a segment")
 		return loadSeg(absolutePath)
 	})
 	if err != nil {
@@ -78,6 +89,7 @@ func openShard(ctx context.Context, id common.ShardID, location string) (*shard,
 		if err != nil {
 			return nil, err
 		}
+		s.logger.Info().Str("path", segPath).Msg("creating a new segment")
 		err = loadSeg(segPath)
 		if err != nil {
 			return nil, err
diff --git a/banyand/tsdb/tsdb.go b/banyand/tsdb/tsdb.go
index fbb710a..e746890 100644
--- a/banyand/tsdb/tsdb.go
+++ b/banyand/tsdb/tsdb.go
@@ -186,7 +186,8 @@ func loadDatabase(ctx context.Context, db *database) (Database, error) {
 		if shardID >= int(db.shardNum) {
 			return nil
 		}
-		so, errOpenShard := openShard(ctx, common.ShardID(shardID), absolutePath)
+		db.logger.Info().Int("shard_id", shardID).Str("path", absolutePath).Msg("opening a shard")
+		so, errOpenShard := openShard(context.WithValue(ctx, logger.ContextKey, db.logger), common.ShardID(shardID), absolutePath)
 		if errOpenShard != nil {
 			return errOpenShard
 		}
@@ -210,8 +211,9 @@ func walkDir(root, prefix string, walkFn walkFn) error {
 		if !f.IsDir() || !strings.HasPrefix(f.Name(), prefix) {
 			continue
 		}
-		if walkFn(f.Name(), fmt.Sprintf(rootPrefix, root)+f.Name()) != nil {
-			return errors.WithMessagef(err, "failed to load: %s", f.Name())
+		errWalk := walkFn(f.Name(), fmt.Sprintf(rootPrefix, root)+f.Name())
+		if errWalk != nil {
+			return errors.WithMessagef(errWalk, "failed to load: %s", f.Name())
 		}
 	}
 	return nil
diff --git a/go.mod b/go.mod
index 38d87ab..ed53ef6 100644
--- a/go.mod
+++ b/go.mod
@@ -106,4 +106,4 @@ require (
 	sigs.k8s.io/yaml v1.2.0 // indirect
 )
 
-replace github.com/dgraph-io/badger/v3 v3.2011.1 => github.com/SkyAPM/badger/v3 v3.0.0-20211119041803-47ac7c51ca6a
+replace github.com/dgraph-io/badger/v3 v3.2011.1 => github.com/SkyAPM/badger/v3 v3.0.0-20220117144524-89781ee8a386
diff --git a/go.sum b/go.sum
index fb13844..2716245 100644
--- a/go.sum
+++ b/go.sum
@@ -47,8 +47,8 @@ github.com/RoaringBitmap/gocroaring v0.4.0/go.mod h1:NieMwz7ZqwU2DD73/vvYwv7r4eW
 github.com/RoaringBitmap/real-roaring-datasets v0.0.0-20190726190000-eb7c87156f76/go.mod h1:oM0MHmQ3nDsq609SS36p+oYbRi16+oVvU2Bw4Ipv0SE=
 github.com/RoaringBitmap/roaring v0.9.1 h1:5PRizBmoN/PfV17nPNQou4dHQ7NcJi8FO/bihdYyCEM=
 github.com/RoaringBitmap/roaring v0.9.1/go.mod h1:h1B7iIUOmnAeb5ytYMvnHJwxMc6LUrwBnzXWRuqTQUc=
-github.com/SkyAPM/badger/v3 v3.0.0-20211119041803-47ac7c51ca6a h1:kcUQmdVI0E0J8bfwJpbQhWOOxijKNeoEfLsiIkayf1E=
-github.com/SkyAPM/badger/v3 v3.0.0-20211119041803-47ac7c51ca6a/go.mod h1:RHo4/GmYcKKh5Lxu63wLEMHJ70Pac2JqZRYGhlyAo2M=
+github.com/SkyAPM/badger/v3 v3.0.0-20220117144524-89781ee8a386 h1:2VmCNyGlF/yY+Ev9bxCiPcKGaWSEQDiRCRiCJYWPh7o=
+github.com/SkyAPM/badger/v3 v3.0.0-20220117144524-89781ee8a386/go.mod h1:RHo4/GmYcKKh5Lxu63wLEMHJ70Pac2JqZRYGhlyAo2M=
 github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
 github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
 github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0=
diff --git a/pkg/encoding/plain.go b/pkg/encoding/plain.go
index 0b3fb8c..e511441 100644
--- a/pkg/encoding/plain.go
+++ b/pkg/encoding/plain.go
@@ -149,7 +149,7 @@ func (t *plainEncoder) Encode() ([]byte, error) {
 	l := len(data)
 	dst := make([]byte, 0, compressBound(l))
 	dst = zstdEncoder.EncodeAll(data, dst)
-	result := buffer.NewBufferWriter(bytes.NewBuffer(make([]byte, len(dst)+2)))
+	result := buffer.NewBufferWriter(bytes.NewBuffer(make([]byte, 0, len(dst)+2)))
 	result.Write(dst)
 	result.PutUint16(uint16(l))
 	return result.Bytes(), nil
@@ -187,7 +187,7 @@ func (t *plainDecoder) Decode(_, rawData []byte) (err error) {
 	var data []byte
 	size := binary.LittleEndian.Uint16(rawData[len(rawData)-2:])
 	if data, err = zstdDecoder.DecodeAll(rawData[:len(rawData)-2], make([]byte, 0, size)); err != nil {
-		return err
+		return errors.Wrap(err, "plain decoder fails to decode")
 	}
 	l := uint32(len(data))
 	if l <= 8 {
diff --git a/pkg/index/inverted/inverted.go b/pkg/index/inverted/inverted.go
index cd56d18..7c19920 100644
--- a/pkg/index/inverted/inverted.go
+++ b/pkg/index/inverted/inverted.go
@@ -69,7 +69,7 @@ func NewStore(opts StoreOpts) (index.Store, error) {
 }
 
 func (s *store) Close() error {
-	return s.diskTable.Close()
+	return multierr.Combine(s.diskTable.Close(), s.termMetadata.Close())
 }
 
 func (s *store) Write(field index.Field, chunkID common.ItemID) error {
diff --git a/pkg/index/lsm/lsm.go b/pkg/index/lsm/lsm.go
index 288d16c..3a7cddd 100644
--- a/pkg/index/lsm/lsm.go
+++ b/pkg/index/lsm/lsm.go
@@ -18,6 +18,8 @@
 package lsm
 
 import (
+	"go.uber.org/multierr"
+
 	"github.com/apache/skywalking-banyandb/api/common"
 	"github.com/apache/skywalking-banyandb/banyand/kv"
 	"github.com/apache/skywalking-banyandb/pkg/convert"
@@ -34,7 +36,7 @@ type store struct {
 }
 
 func (s *store) Close() error {
-	return s.lsm.Close()
+	return multierr.Combine(s.lsm.Close(), s.termMetadata.Close())
 }
 
 func (s *store) Write(field index.Field, itemID common.ItemID) error {
diff --git a/pkg/index/metadata/metadata.go b/pkg/index/metadata/metadata.go
index 750e3db..fd7d5d0 100644
--- a/pkg/index/metadata/metadata.go
+++ b/pkg/index/metadata/metadata.go
@@ -18,6 +18,8 @@
 package metadata
 
 import (
+	"io"
+
 	"github.com/pkg/errors"
 
 	"github.com/apache/skywalking-banyandb/banyand/kv"
@@ -28,6 +30,7 @@ import (
 type Term interface {
 	ID(term []byte) (id []byte, err error)
 	Literal(id []byte) (term []byte, err error)
+	io.Closer
 }
 
 var _ Term = (*term)(nil)
@@ -64,3 +67,7 @@ func (t *term) ID(term []byte) (id []byte, err error) {
 func (t *term) Literal(id []byte) (term []byte, err error) {
 	return t.store.Get(id)
 }
+
+func (t *term) Close() error {
+	return t.store.Close()
+}
diff --git a/pkg/run/test.go b/pkg/run/test.go
index 98e1514..17b22cb 100644
--- a/pkg/run/test.go
+++ b/pkg/run/test.go
@@ -43,7 +43,7 @@ func NewTester(ID string) *Tester {
 func (t *Tester) WaitUntilStarted() error {
 	select {
 	case err := <-t.stopCh:
-		return fmt.Errorf("stoped: %v", err)
+		return fmt.Errorf("stopped: %v", err)
 	case <-t.startedNotifier:
 		return nil
 	}