You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by ha...@apache.org on 2022/01/17 17:05:16 UTC
[skywalking-banyandb] 01/01: Fix issues of loading a database
This is an automated email from the ASF dual-hosted git repository.
hanahmily pushed a commit to branch liaison-test
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
commit 67dd662abd8da20fad6a0bcf84c9d85e48752453
Author: Gao Hongtao <ha...@gmail.com>
AuthorDate: Mon Jan 17 16:58:34 2022 +0000
Fix issues of loading a database
Signed-off-by: Gao Hongtao <ha...@gmail.com>
---
banyand/liaison/grpc/grpc_suite_test.go | 9 ++++++
banyand/liaison/grpc/stream_test.go | 57 +++++++++++++++++++++++----------
banyand/query/processor.go | 2 ++
banyand/stream/stream_query.go | 2 +-
banyand/tsdb/segment.go | 1 +
banyand/tsdb/shard.go | 14 +++++++-
banyand/tsdb/tsdb.go | 8 +++--
go.mod | 2 +-
go.sum | 4 +--
pkg/encoding/plain.go | 4 +--
pkg/index/inverted/inverted.go | 2 +-
pkg/index/lsm/lsm.go | 4 ++-
pkg/index/metadata/metadata.go | 7 ++++
pkg/run/test.go | 2 +-
14 files changed, 88 insertions(+), 30 deletions(-)
diff --git a/banyand/liaison/grpc/grpc_suite_test.go b/banyand/liaison/grpc/grpc_suite_test.go
index ae8f957..ef85011 100644
--- a/banyand/liaison/grpc/grpc_suite_test.go
+++ b/banyand/liaison/grpc/grpc_suite_test.go
@@ -22,9 +22,18 @@ import (
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
+
+ "github.com/apache/skywalking-banyandb/pkg/logger"
)
func TestGrpc(t *testing.T) {
RegisterFailHandler(Fail)
RunSpecs(t, "Grpc Suite")
}
+
+var _ = BeforeSuite(func() {
+ Expect(logger.Init(logger.Logging{
+ Env: "dev",
+ Level: "warn",
+ })).Should(Succeed())
+})
diff --git a/banyand/liaison/grpc/stream_test.go b/banyand/liaison/grpc/stream_test.go
index 596f4ed..ea28868 100644
--- a/banyand/liaison/grpc/stream_test.go
+++ b/banyand/liaison/grpc/stream_test.go
@@ -38,7 +38,6 @@ import (
"github.com/apache/skywalking-banyandb/banyand/query"
"github.com/apache/skywalking-banyandb/banyand/queue"
"github.com/apache/skywalking-banyandb/banyand/stream"
- "github.com/apache/skywalking-banyandb/pkg/logger"
pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
"github.com/apache/skywalking-banyandb/pkg/run"
"github.com/apache/skywalking-banyandb/pkg/test"
@@ -46,10 +45,20 @@ import (
)
var _ = Describe("Stream", func() {
- var gracefulStop func()
+ var rootPath, metadataPath string
+ var gracefulStop, deferRootFunc, deferMetadataFunc func()
var conn *grpclib.ClientConn
+ BeforeEach(func() {
+ var err error
+ rootPath, deferRootFunc, err = test.NewSpace()
+ Expect(err).NotTo(HaveOccurred())
+ metadataPath, deferMetadataFunc, err = test.NewSpace()
+ Expect(err).NotTo(HaveOccurred())
+ })
It("is a plain server", func() {
- gracefulStop = setup(nil)
+ By("Verifying an empty server")
+ flags := []string{"--root-path=" + rootPath, "--metadata-root-path=" + metadataPath}
+ gracefulStop = setup(flags)
var err error
conn, err = grpclib.Dial("localhost:17912", grpclib.WithInsecure())
Expect(err).NotTo(HaveOccurred())
@@ -57,9 +66,23 @@ var _ = Describe("Stream", func() {
Eventually(func() (int, error) {
return streamQuery(conn)
}).Should(Equal(1))
+ _ = conn.Close()
+ gracefulStop()
+ By("Verifying an existing server")
+ gracefulStop = setup(flags)
+ conn, err = grpclib.Dial("localhost:17912", grpclib.WithInsecure())
+ Expect(err).NotTo(HaveOccurred())
+ Eventually(func() int {
+ num, err := streamQuery(conn)
+ if err != nil {
+ GinkgoWriter.Printf("stream query err: %v \n", err)
+ return 0
+ }
+ return num
+ }).Should(Equal(1))
})
It("is a TLS server", func() {
- flags := []string{"--tls=true"}
+ flags := []string{"--tls=true", "--root-path=" + rootPath, "--metadata-root-path=" + metadataPath}
_, currentFile, _, _ := runtime.Caller(0)
basePath := filepath.Dir(currentFile)
certFile := filepath.Join(basePath, "testdata/server_cert.pem")
@@ -81,14 +104,13 @@ var _ = Describe("Stream", func() {
AfterEach(func() {
_ = conn.Close()
gracefulStop()
+ deferMetadataFunc()
+ deferRootFunc()
})
})
func setup(flags []string) func() {
- Expect(logger.Init(logger.Logging{
- Env: "dev",
- Level: "warn",
- })).Should(Succeed())
+
g := run.Group{Name: "standalone"}
// Init `Discovery` module
repo, err := discovery.NewServiceRepo(context.Background())
@@ -123,10 +145,6 @@ func setup(flags []string) func() {
tcp,
startListener,
)
- // Create a random directory
- rootPath, deferFunc, err := test.NewSpace()
- Expect(err).NotTo(HaveOccurred())
- flags = append(flags, "--root-path="+rootPath, "--metadata-root-path="+teststream.RandomTempDir())
err = g.RegisterFlags().Parse(flags)
Expect(err).NotTo(HaveOccurred())
@@ -140,9 +158,8 @@ func setup(flags []string) func() {
errRun := g.Run()
if errRun != nil {
startListener.GracefulStop()
- Expect(errRun).Should(Succeed())
+ Expect(errRun).NotTo(HaveOccurred())
}
- deferFunc()
}()
Expect(startListener.WaitUntilStarted()).Should(Succeed())
return func() {
@@ -195,9 +212,15 @@ func queryCriteria(baseTs time.Time) *streamv1.QueryRequest {
func streamWrite(conn *grpclib.ClientConn) {
client := streamv1.NewStreamServiceClient(conn)
ctx := context.Background()
- writeClient, errorWrite := client.Write(ctx)
- Expect(errorWrite).Should(Succeed())
- Expect(writeClient.Send(writeData())).Should(Succeed())
+ var writeClient streamv1.StreamService_WriteClient
+ Eventually(func(g Gomega) {
+ var err error
+ writeClient, err = client.Write(ctx)
+ g.Expect(err).NotTo(HaveOccurred())
+ }).Should(Succeed())
+ Eventually(func() error {
+ return writeClient.Send(writeData())
+ }).ShouldNot(HaveOccurred())
Expect(writeClient.CloseSend()).Should(Succeed())
Eventually(func() error {
_, err := writeClient.Recv()
diff --git a/banyand/query/processor.go b/banyand/query/processor.go
index bfc9a10..507fb5f 100644
--- a/banyand/query/processor.go
+++ b/banyand/query/processor.go
@@ -83,6 +83,8 @@ func (q *queryProcessor) Rev(message bus.Message) (resp bus.Message) {
return
}
+ q.log.Debug().Str("plan", p.String()).Msg("query plan")
+
entities, err := p.Execute(ec)
if err != nil {
q.log.Error().Err(err).Msg("fail to execute the query plan")
diff --git a/banyand/stream/stream_query.go b/banyand/stream/stream_query.go
index 0c75b14..dfca4f4 100644
--- a/banyand/stream/stream_query.go
+++ b/banyand/stream/stream_query.go
@@ -78,7 +78,7 @@ func (s *stream) Shard(id common.ShardID) (tsdb.Shard, error) {
func (s *stream) ParseTagFamily(family string, item tsdb.Item) (*modelv1.TagFamily, error) {
familyRawBytes, err := item.Family(family)
if err != nil {
- return nil, err
+ return nil, errors.Wrapf(err, "parse family %s", family)
}
tagFamily := &modelv1.TagFamilyForWrite{}
err = proto.Unmarshal(familyRawBytes, tagFamily)
diff --git a/banyand/tsdb/segment.go b/banyand/tsdb/segment.go
index a2450a4..6a8305e 100644
--- a/banyand/tsdb/segment.go
+++ b/banyand/tsdb/segment.go
@@ -102,4 +102,5 @@ func (s *segment) close() {
for _, b := range s.lst {
b.close()
}
+ s.globalIndex.Close()
}
diff --git a/banyand/tsdb/shard.go b/banyand/tsdb/shard.go
index 09b6ded..7b1b7b9 100644
--- a/banyand/tsdb/shard.go
+++ b/banyand/tsdb/shard.go
@@ -19,17 +19,20 @@ package tsdb
import (
"context"
+ "strconv"
"sync"
"time"
"github.com/apache/skywalking-banyandb/api/common"
+ "github.com/apache/skywalking-banyandb/pkg/logger"
)
var _ Shard = (*shard)(nil)
type shard struct {
sync.Mutex
- id common.ShardID
+ id common.ShardID
+ logger *logger.Logger
location string
seriesDatabase SeriesDatabase
@@ -54,6 +57,12 @@ func openShard(ctx context.Context, id common.ShardID, location string) (*shard,
id: id,
location: location,
}
+ parentLogger := ctx.Value(logger.ContextKey)
+ if parentLogger != nil {
+ if pl, ok := parentLogger.(*logger.Logger); ok {
+ s.logger = pl.Named("shard" + strconv.Itoa(int(id)))
+ }
+ }
loadSeg := func(path string) error {
seg, err := newSegment(ctx, path)
if err != nil {
@@ -64,9 +73,11 @@ func openShard(ctx context.Context, id common.ShardID, location string) (*shard,
defer s.Unlock()
s.lst = append(s.lst, seg)
}
+ s.logger.Info().Int("size", len(s.lst)).Msg("seg size")
return nil
}
err := walkDir(location, segPathPrefix, func(_, absolutePath string) error {
+ s.logger.Info().Str("path", absolutePath).Msg("loading a segment")
return loadSeg(absolutePath)
})
if err != nil {
@@ -78,6 +89,7 @@ func openShard(ctx context.Context, id common.ShardID, location string) (*shard,
if err != nil {
return nil, err
}
+ s.logger.Info().Str("path", segPath).Msg("creating a new segment")
err = loadSeg(segPath)
if err != nil {
return nil, err
diff --git a/banyand/tsdb/tsdb.go b/banyand/tsdb/tsdb.go
index fbb710a..e746890 100644
--- a/banyand/tsdb/tsdb.go
+++ b/banyand/tsdb/tsdb.go
@@ -186,7 +186,8 @@ func loadDatabase(ctx context.Context, db *database) (Database, error) {
if shardID >= int(db.shardNum) {
return nil
}
- so, errOpenShard := openShard(ctx, common.ShardID(shardID), absolutePath)
+ db.logger.Info().Int("shard_id", shardID).Str("path", absolutePath).Msg("opening a shard")
+ so, errOpenShard := openShard(context.WithValue(ctx, logger.ContextKey, db.logger), common.ShardID(shardID), absolutePath)
if errOpenShard != nil {
return errOpenShard
}
@@ -210,8 +211,9 @@ func walkDir(root, prefix string, walkFn walkFn) error {
if !f.IsDir() || !strings.HasPrefix(f.Name(), prefix) {
continue
}
- if walkFn(f.Name(), fmt.Sprintf(rootPrefix, root)+f.Name()) != nil {
- return errors.WithMessagef(err, "failed to load: %s", f.Name())
+ errWalk := walkFn(f.Name(), fmt.Sprintf(rootPrefix, root)+f.Name())
+ if errWalk != nil {
+ return errors.WithMessagef(errWalk, "failed to load: %s", f.Name())
}
}
return nil
diff --git a/go.mod b/go.mod
index 38d87ab..ed53ef6 100644
--- a/go.mod
+++ b/go.mod
@@ -106,4 +106,4 @@ require (
sigs.k8s.io/yaml v1.2.0 // indirect
)
-replace github.com/dgraph-io/badger/v3 v3.2011.1 => github.com/SkyAPM/badger/v3 v3.0.0-20211119041803-47ac7c51ca6a
+replace github.com/dgraph-io/badger/v3 v3.2011.1 => github.com/SkyAPM/badger/v3 v3.0.0-20220117144524-89781ee8a386
diff --git a/go.sum b/go.sum
index fb13844..2716245 100644
--- a/go.sum
+++ b/go.sum
@@ -47,8 +47,8 @@ github.com/RoaringBitmap/gocroaring v0.4.0/go.mod h1:NieMwz7ZqwU2DD73/vvYwv7r4eW
github.com/RoaringBitmap/real-roaring-datasets v0.0.0-20190726190000-eb7c87156f76/go.mod h1:oM0MHmQ3nDsq609SS36p+oYbRi16+oVvU2Bw4Ipv0SE=
github.com/RoaringBitmap/roaring v0.9.1 h1:5PRizBmoN/PfV17nPNQou4dHQ7NcJi8FO/bihdYyCEM=
github.com/RoaringBitmap/roaring v0.9.1/go.mod h1:h1B7iIUOmnAeb5ytYMvnHJwxMc6LUrwBnzXWRuqTQUc=
-github.com/SkyAPM/badger/v3 v3.0.0-20211119041803-47ac7c51ca6a h1:kcUQmdVI0E0J8bfwJpbQhWOOxijKNeoEfLsiIkayf1E=
-github.com/SkyAPM/badger/v3 v3.0.0-20211119041803-47ac7c51ca6a/go.mod h1:RHo4/GmYcKKh5Lxu63wLEMHJ70Pac2JqZRYGhlyAo2M=
+github.com/SkyAPM/badger/v3 v3.0.0-20220117144524-89781ee8a386 h1:2VmCNyGlF/yY+Ev9bxCiPcKGaWSEQDiRCRiCJYWPh7o=
+github.com/SkyAPM/badger/v3 v3.0.0-20220117144524-89781ee8a386/go.mod h1:RHo4/GmYcKKh5Lxu63wLEMHJ70Pac2JqZRYGhlyAo2M=
github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0=
diff --git a/pkg/encoding/plain.go b/pkg/encoding/plain.go
index 0b3fb8c..e511441 100644
--- a/pkg/encoding/plain.go
+++ b/pkg/encoding/plain.go
@@ -149,7 +149,7 @@ func (t *plainEncoder) Encode() ([]byte, error) {
l := len(data)
dst := make([]byte, 0, compressBound(l))
dst = zstdEncoder.EncodeAll(data, dst)
- result := buffer.NewBufferWriter(bytes.NewBuffer(make([]byte, len(dst)+2)))
+ result := buffer.NewBufferWriter(bytes.NewBuffer(make([]byte, 0, len(dst)+2)))
result.Write(dst)
result.PutUint16(uint16(l))
return result.Bytes(), nil
@@ -187,7 +187,7 @@ func (t *plainDecoder) Decode(_, rawData []byte) (err error) {
var data []byte
size := binary.LittleEndian.Uint16(rawData[len(rawData)-2:])
if data, err = zstdDecoder.DecodeAll(rawData[:len(rawData)-2], make([]byte, 0, size)); err != nil {
- return err
+ return errors.Wrap(err, "plain decoder fails to decode")
}
l := uint32(len(data))
if l <= 8 {
diff --git a/pkg/index/inverted/inverted.go b/pkg/index/inverted/inverted.go
index cd56d18..7c19920 100644
--- a/pkg/index/inverted/inverted.go
+++ b/pkg/index/inverted/inverted.go
@@ -69,7 +69,7 @@ func NewStore(opts StoreOpts) (index.Store, error) {
}
func (s *store) Close() error {
- return s.diskTable.Close()
+ return multierr.Combine(s.diskTable.Close(), s.termMetadata.Close())
}
func (s *store) Write(field index.Field, chunkID common.ItemID) error {
diff --git a/pkg/index/lsm/lsm.go b/pkg/index/lsm/lsm.go
index 288d16c..3a7cddd 100644
--- a/pkg/index/lsm/lsm.go
+++ b/pkg/index/lsm/lsm.go
@@ -18,6 +18,8 @@
package lsm
import (
+ "go.uber.org/multierr"
+
"github.com/apache/skywalking-banyandb/api/common"
"github.com/apache/skywalking-banyandb/banyand/kv"
"github.com/apache/skywalking-banyandb/pkg/convert"
@@ -34,7 +36,7 @@ type store struct {
}
func (s *store) Close() error {
- return s.lsm.Close()
+ return multierr.Combine(s.lsm.Close(), s.termMetadata.Close())
}
func (s *store) Write(field index.Field, itemID common.ItemID) error {
diff --git a/pkg/index/metadata/metadata.go b/pkg/index/metadata/metadata.go
index 750e3db..fd7d5d0 100644
--- a/pkg/index/metadata/metadata.go
+++ b/pkg/index/metadata/metadata.go
@@ -18,6 +18,8 @@
package metadata
import (
+ "io"
+
"github.com/pkg/errors"
"github.com/apache/skywalking-banyandb/banyand/kv"
@@ -28,6 +30,7 @@ import (
type Term interface {
ID(term []byte) (id []byte, err error)
Literal(id []byte) (term []byte, err error)
+ io.Closer
}
var _ Term = (*term)(nil)
@@ -64,3 +67,7 @@ func (t *term) ID(term []byte) (id []byte, err error) {
func (t *term) Literal(id []byte) (term []byte, err error) {
return t.store.Get(id)
}
+
+func (t *term) Close() error {
+ return t.store.Close()
+}
diff --git a/pkg/run/test.go b/pkg/run/test.go
index 98e1514..17b22cb 100644
--- a/pkg/run/test.go
+++ b/pkg/run/test.go
@@ -43,7 +43,7 @@ func NewTester(ID string) *Tester {
func (t *Tester) WaitUntilStarted() error {
select {
case err := <-t.stopCh:
- return fmt.Errorf("stoped: %v", err)
+ return fmt.Errorf("stopped: %v", err)
case <-t.startedNotifier:
return nil
}