You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by ha...@apache.org on 2021/08/10 01:57:33 UTC
[skywalking-banyandb] branch storage-table created (now 2424a82)
This is an automated email from the ASF dual-hosted git repository.
hanahmily pushed a change to branch storage-table
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git.
at 2424a82 Introduce index store
This branch includes the following new commits:
new a4ce4df Update badger, using new APIs
new 302935e Merge remote-tracking branch 'origin/main' into storage-table
new 2424a82 Introduce index store
The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails. The revisions
listed as "add" were already present in the repository and have only
been added to this reference.
[skywalking-banyandb] 02/03: Merge remote-tracking branch
'origin/main' into storage-table
Posted by ha...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
hanahmily pushed a commit to branch storage-table
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
commit 302935e3a49b761bc29eb764891834321e5d1ba5
Merge: a4ce4df b7845b0
Author: Gao Hongtao <ha...@gmail.com>
AuthorDate: Tue Aug 10 09:48:15 2021 +0800
Merge remote-tracking branch 'origin/main' into storage-table
api/proto/banyandb/v1/schema.pb.go | 364 ++++++++++++------------
api/proto/banyandb/v1/schema.proto | 14 +-
banyand/series/schema/sw/trace_series.textproto | 2 +-
3 files changed, 191 insertions(+), 189 deletions(-)
[skywalking-banyandb] 01/03: Update badger, using new APIs
Posted by ha...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
hanahmily pushed a commit to branch storage-table
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
commit a4ce4df9b4bc77c3c78619c309fbb8e4d216f899
Author: Gao Hongtao <ha...@gmail.com>
AuthorDate: Sun Aug 8 14:56:31 2021 +0800
Update badger, using new APIs
Signed-off-by: Gao Hongtao <ha...@gmail.com>
---
banyand/kv/badger.go | 71 ++++++++++++++++++++--------------------------------
banyand/kv/kv.go | 12 +++++----
go.mod | 2 +-
go.sum | 8 +++---
4 files changed, 39 insertions(+), 54 deletions(-)
diff --git a/banyand/kv/badger.go b/banyand/kv/badger.go
index d118a58..05b4e51 100644
--- a/banyand/kv/badger.go
+++ b/banyand/kv/badger.go
@@ -19,8 +19,10 @@ package kv
import (
"log"
+ "math"
"github.com/dgraph-io/badger/v3"
+ "github.com/dgraph-io/badger/v3/y"
"github.com/apache/skywalking-banyandb/pkg/logger"
)
@@ -52,36 +54,26 @@ type badgerDB struct {
}
func (b *badgerDB) Scan(key []byte, opt ScanOpts, f ScanFunc) error {
- err := b.db.View(func(txn *badger.Txn) error {
- opts := badger.DefaultIteratorOptions
- opts.PrefetchSize = opt.PrefetchSize
- opts.PrefetchValues = opt.PrefetchValues
- it := txn.NewIterator(opts)
- defer it.Close()
- for it.Seek(key); it.Valid(); it.Next() {
- item := it.Item()
- k := item.Key()
- err := f(b.shardID, k, func() ([]byte, error) {
- var val []byte
- err := item.Value(func(v []byte) error {
- val = v
- return nil
- })
- if err != nil {
- return nil, err
- }
- return val, nil
- })
- if err == ErrStopScan {
- break
- }
- if err != nil {
- return err
- }
+ opts := badger.DefaultIteratorOptions
+ opts.PrefetchSize = opt.PrefetchSize
+ opts.PrefetchValues = opt.PrefetchValues
+ it := b.db.NewIterator(opts)
+ defer func() {
+ _ = it.Close()
+ }()
+ for it.Seek(y.KeyWithTs(key, math.MaxInt64)); it.Valid(); it.Next() {
+ k := y.ParseKey(it.Key())
+ err := f(b.shardID, k, func() ([]byte, error) {
+ return y.Copy(it.Value().Value), nil
+ })
+ if err == ErrStopScan {
+ break
+ }
+ if err != nil {
+ return err
}
- return nil
- })
- return err
+ }
+ return nil
}
func (b *badgerDB) Close() error {
@@ -92,24 +84,15 @@ func (b *badgerDB) Close() error {
}
func (b *badgerDB) Put(key, val []byte) error {
- return b.db.Update(func(txn *badger.Txn) error {
- return txn.Set(key, val)
- })
+ return b.db.Put(y.KeyWithTs(key, math.MaxInt64), val)
}
func (b *badgerDB) Get(key []byte) ([]byte, error) {
- var bb []byte
- err := b.db.View(func(txn *badger.Txn) error {
- item, err := txn.Get(key)
- if err != nil {
- return err
- }
- return item.Value(func(val []byte) error {
- bb = val
- return nil
- })
- })
- return bb, err
+ v, err := b.db.Get(y.KeyWithTs(key, math.MaxInt64))
+ if err != nil {
+ return nil, err
+ }
+ return v.Value, nil
}
// badgerLog delegates the zap log to the badger logger
diff --git a/banyand/kv/kv.go b/banyand/kv/kv.go
index 34a5188..f3a2d55 100644
--- a/banyand/kv/kv.go
+++ b/banyand/kv/kv.go
@@ -27,11 +27,13 @@ import (
"github.com/apache/skywalking-banyandb/pkg/logger"
)
-var ErrStopScan = errors.New("stop scanning")
-var DefaultScanOpts = ScanOpts{
- PrefetchSize: 100,
- PrefetchValues: true,
-}
+var (
+ ErrStopScan = errors.New("stop scanning")
+ DefaultScanOpts = ScanOpts{
+ PrefetchSize: 100,
+ PrefetchValues: true,
+ }
+)
type Writer interface {
// Put a value
diff --git a/go.mod b/go.mod
index 5ad72a0..864feed 100644
--- a/go.mod
+++ b/go.mod
@@ -27,4 +27,4 @@ require (
google.golang.org/protobuf v1.27.1
)
-replace github.com/dgraph-io/badger/v3 v3.2011.1 => github.com/SkyAPM/badger/v3 v3.0.0-20210624023741-bd2dcfcaaa74
+replace github.com/dgraph-io/badger/v3 v3.2011.1 => github.com/SkyAPM/badger/v3 v3.0.0-20210808063906-49c6d778ad7d
diff --git a/go.sum b/go.sum
index ce8fd5c..90495e8 100644
--- a/go.sum
+++ b/go.sum
@@ -45,8 +45,8 @@ github.com/RoaringBitmap/gocroaring v0.4.0/go.mod h1:NieMwz7ZqwU2DD73/vvYwv7r4eW
github.com/RoaringBitmap/real-roaring-datasets v0.0.0-20190726190000-eb7c87156f76/go.mod h1:oM0MHmQ3nDsq609SS36p+oYbRi16+oVvU2Bw4Ipv0SE=
github.com/RoaringBitmap/roaring v0.9.1 h1:5PRizBmoN/PfV17nPNQou4dHQ7NcJi8FO/bihdYyCEM=
github.com/RoaringBitmap/roaring v0.9.1/go.mod h1:h1B7iIUOmnAeb5ytYMvnHJwxMc6LUrwBnzXWRuqTQUc=
-github.com/SkyAPM/badger/v3 v3.0.0-20210624023741-bd2dcfcaaa74 h1:BFHSkDBSYCtPxMgxGz07DfNRYS76KFVDlocQ2U9rY7E=
-github.com/SkyAPM/badger/v3 v3.0.0-20210624023741-bd2dcfcaaa74/go.mod h1:XieWaNygSGj5ZzSsZO4tQe/2wwLjCvESus4twFqxOKc=
+github.com/SkyAPM/badger/v3 v3.0.0-20210808063906-49c6d778ad7d h1:6B7YHxp79aOfRuMXfWdSyNSZEDqJqSzQWWkQcmqyI9s=
+github.com/SkyAPM/badger/v3 v3.0.0-20210808063906-49c6d778ad7d/go.mod h1:dULbq6ehJ5K0cGW/1TQ9iSfUk0gbSiToDWmWmTsJ53E=
github.com/antihax/optional v1.0.0/go.mod h1:uupD/76wgC+ih3iEmQUL+0Ugr19nfwCT1kdvxnR2qWY=
github.com/armon/circbuf v0.0.0-20150827004946-bbbad097214e/go.mod h1:3U/XgcO3hCbHZ8TKRvWD2dDTCfh9M9ya+I9JpbB7O8o=
github.com/armon/consul-api v0.0.0-20180202201655-eb2c6b5be1b6/go.mod h1:grANhF5doyWs3UAsr3K4I6qtAmlQcZDesFNEHPZAzj8=
@@ -79,8 +79,8 @@ github.com/cpuguy83/go-md2man/v2 v2.0.0/go.mod h1:maD7wRr/U5Z6m/iR4s+kqSMx2CaBsr
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
-github.com/dgraph-io/ristretto v0.0.4-0.20210504190834-0bf2acd73aa3 h1:jU/wpYsEL+8JPLf/QcjkQKI5g0dOjSuwcMjkThxt5x0=
-github.com/dgraph-io/ristretto v0.0.4-0.20210504190834-0bf2acd73aa3/go.mod h1:fux0lOrBhrVCJd3lcTHsIJhq1T2rokOu6v9Vcb3Q9ug=
+github.com/dgraph-io/ristretto v0.1.0 h1:Jv3CGQHp9OjuMBSne1485aDpUkTKEcUqF+jm/LuerPI=
+github.com/dgraph-io/ristretto v0.1.0/go.mod h1:fux0lOrBhrVCJd3lcTHsIJhq1T2rokOu6v9Vcb3Q9ug=
github.com/dgryski/go-farm v0.0.0-20190423205320-6a90982ecee2 h1:tdlZCpZ/P9DhczCTSixgIKmwPv6+wP5DGjqLYw5SUiA=
github.com/dgryski/go-farm v0.0.0-20190423205320-6a90982ecee2/go.mod h1:SqUrOPUnsFjfmXRMNPybcSiG0BgUW2AuFH8PAnS2iTw=
github.com/dustin/go-humanize v1.0.0 h1:VSnTsYCnlFHaM2/igO1h6X3HA71jcobQuxemgkq4zYo=
[skywalking-banyandb] 03/03: Introduce index store
Posted by ha...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
hanahmily pushed a commit to branch storage-table
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
commit 2424a8236c656da3395f6c59b8f0a5d20e5c181a
Author: Gao Hongtao <ha...@gmail.com>
AuthorDate: Tue Aug 10 09:56:28 2021 +0800
Introduce index store
Signed-off-by: Gao Hongtao <ha...@gmail.com>
---
banyand/kv/badger.go | 103 +++++++++++++++++++++++++--
banyand/kv/kv.go | 78 +++++++++++++++++++--
banyand/storage/block.go | 36 +++++++---
banyand/storage/database.go | 9 +++
banyand/storage/database_test.go | 145 ++++++++++++++++++++++++++++++++++++---
banyand/storage/storage.go | 5 ++
go.mod | 2 +-
go.sum | 4 +-
pkg/posting/posting.go | 4 ++
pkg/posting/roaring/roaring.go | 8 +++
10 files changed, 365 insertions(+), 29 deletions(-)
diff --git a/banyand/kv/badger.go b/banyand/kv/badger.go
index 05b4e51..64c9f29 100644
--- a/banyand/kv/badger.go
+++ b/banyand/kv/badger.go
@@ -18,16 +18,27 @@
package kv
import (
+ "bytes"
"log"
"math"
+ "time"
"github.com/dgraph-io/badger/v3"
"github.com/dgraph-io/badger/v3/y"
+ "go.uber.org/multierr"
"github.com/apache/skywalking-banyandb/pkg/logger"
+ "github.com/apache/skywalking-banyandb/pkg/posting"
+ "github.com/apache/skywalking-banyandb/pkg/posting/roaring"
)
-var _ TimeSeriesStore = (*badgerTSS)(nil)
+var (
+ _ Store = (*badgerDB)(nil)
+ _ IndexStore = (*badgerDB)(nil)
+ _ y.Iterator = (*mergedIter)(nil)
+ _ TimeSeriesStore = (*badgerTSS)(nil)
+ bitMergeEntry byte = 1 << 3
+)
type badgerTSS struct {
shardID int
@@ -43,14 +54,98 @@ func (b *badgerTSS) Close() error {
return nil
}
-var _ Store = (*badgerDB)(nil)
+type mergedIter struct {
+ delegated Iterator
+ valid bool
+ data []byte
+}
+
+func (i *mergedIter) Next() {
+ i.delegated.Next()
+ i.parseData()
+}
+
+func (i *mergedIter) Rewind() {
+ i.delegated.Rewind()
+ i.parseData()
+}
+
+func (i *mergedIter) Seek(key []byte) {
+ i.delegated.Seek(y.KeyWithTs(key, math.MaxInt64))
+}
+
+func (i *mergedIter) Key() []byte {
+ return y.KeyWithTs(i.delegated.Key(), uint64(time.Now().UnixNano()))
+}
+
+func (i *mergedIter) Valid() bool {
+ return i.valid
+}
+
+func (i *mergedIter) parseData() {
+ i.data = nil
+ i.valid = i.delegated.Valid()
+ if !i.valid {
+ return
+ }
+ data, err := i.delegated.Val().Marshall()
+ if err != nil {
+ i.valid = false
+ return
+ }
+ i.data = data
+}
+
+func (i *mergedIter) Close() error {
+ i.data = nil
+ i.valid = false
+ return i.delegated.Close()
+}
+
+func (i mergedIter) Value() y.ValueStruct {
+ return y.ValueStruct{
+ Value: i.data,
+ Meta: bitMergeEntry,
+ }
+}
type badgerDB struct {
shardID int
dbOpts badger.Options
db *badger.DB
- seqKey string
- seq *badger.Sequence
+}
+
+func (b *badgerDB) Handover(iterator Iterator) error {
+ return b.db.HandoverIterator(&mergedIter{
+ delegated: iterator,
+ })
+}
+
+func (b *badgerDB) Seek(key []byte, limit int) (posting.List, error) {
+ opts := badger.DefaultIteratorOptions
+ it := b.db.NewIterator(opts)
+ defer func() {
+ _ = it.Close()
+ }()
+ result := roaring.NewPostingList()
+ var errMerged error
+ for it.Seek(y.KeyWithTs(key, math.MaxInt64)); it.Valid(); it.Next() {
+ k := y.ParseKey(it.Key())
+ if !bytes.Equal(key, k) {
+ break
+ }
+ list := roaring.NewPostingList()
+ err := list.Unmarshall(it.Value().Value)
+ if err != nil {
+ errMerged = multierr.Append(errMerged, err)
+ continue
+ }
+ _ = result.Union(list)
+ if result.Len() > limit {
+ break
+ }
+ }
+ return result, errMerged
}
func (b *badgerDB) Scan(key []byte, opt ScanOpts, f ScanFunc) error {
diff --git a/banyand/kv/kv.go b/banyand/kv/kv.go
index f3a2d55..67d6938 100644
--- a/banyand/kv/kv.go
+++ b/banyand/kv/kv.go
@@ -25,6 +25,7 @@ import (
"github.com/pkg/errors"
"github.com/apache/skywalking-banyandb/pkg/logger"
+ "github.com/apache/skywalking-banyandb/pkg/posting"
)
var (
@@ -95,6 +96,24 @@ func TSSWithLogger(l *logger.Logger) TimeSeriesOptions {
}
}
+type Iterator interface {
+ Next()
+ Rewind()
+ Seek(key []byte)
+ Key() []byte
+ Val() posting.List
+ Valid() bool
+ Close() error
+}
+
+type HandoverCallback func()
+
+type IndexStore interface {
+ Handover(iterator Iterator) error
+ Seek(key []byte, limit int) (posting.List, error)
+ Close() error
+}
+
// OpenTimeSeriesStore creates a new TimeSeriesStore
func OpenTimeSeriesStore(shardID int, path string, compressLevel int, valueSize int, options ...TimeSeriesOptions) (TimeSeriesStore, error) {
btss := new(badgerTSS)
@@ -104,6 +123,8 @@ func OpenTimeSeriesStore(shardID int, path string, compressLevel int, valueSize
opt(btss)
}
btss.dbOpts = btss.dbOpts.WithMaxLevels(1)
+ // Put all values into LSM
+ btss.dbOpts = btss.dbOpts.WithVLogPercentile(1.0)
var err error
btss.db, err = badger.Open(btss.dbOpts)
if err != nil {
@@ -126,6 +147,26 @@ func StoreWithLogger(l *logger.Logger) StoreOptions {
}
}
+// StoreWithBufferSize sets the memtable size of the underlying Store
+func StoreWithBufferSize(size int64) StoreOptions {
+ return func(store Store) {
+ if bdb, ok := store.(*badgerDB); ok {
+ bdb.dbOpts = bdb.dbOpts.WithMemTableSize(size)
+ }
+ }
+}
+
+type FlushCallback func()
+
+// StoreWithFlushCallback sets a callback function
+func StoreWithFlushCallback(callback FlushCallback) StoreOptions {
+ return func(store Store) {
+ if bdb, ok := store.(*badgerDB); ok {
+ bdb.dbOpts.FlushCallBack = callback
+ }
+ }
+}
+
// OpenStore creates a new Store
func OpenStore(shardID int, path string, options ...StoreOptions) (Store, error) {
bdb := new(badgerDB)
@@ -139,13 +180,40 @@ func OpenStore(shardID int, path string, options ...StoreOptions) (Store, error)
var err error
bdb.db, err = badger.Open(bdb.dbOpts)
if err != nil {
- return nil, fmt.Errorf("failed to open time series store: %v", err)
+ return nil, fmt.Errorf("failed to open normal store: %v", err)
}
- if bdb.seqKey != "" {
- bdb.seq, err = bdb.db.GetSequence([]byte(bdb.seqKey), 100)
- if err != nil {
- return nil, fmt.Errorf("failed to get sequence: %v", err)
+ return bdb, nil
+}
+
+type IndexOptions func(store IndexStore)
+
+// IndexWithLogger sets an external logger into the underlying IndexStore
+func IndexWithLogger(l *logger.Logger) IndexOptions {
+ return func(store IndexStore) {
+ if bdb, ok := store.(*badgerDB); ok {
+ bdb.dbOpts = bdb.dbOpts.WithLogger(&badgerLog{
+ delegated: l.Named("index-kv"),
+ })
}
}
+}
+
+// OpenIndexStore creates a new IndexStore
+func OpenIndexStore(shardID int, path string, options ...IndexOptions) (IndexStore, error) {
+ bdb := new(badgerDB)
+ bdb.shardID = shardID
+ bdb.dbOpts = badger.DefaultOptions(path)
+ for _, opt := range options {
+ opt(bdb)
+ }
+ bdb.dbOpts = bdb.dbOpts.WithMaxLevels(1)
+ // Put all values into LSM
+ bdb.dbOpts = bdb.dbOpts.WithVLogPercentile(1.0)
+
+ var err error
+ bdb.db, err = badger.Open(bdb.dbOpts)
+ if err != nil {
+ return nil, fmt.Errorf("failed to index store: %v", err)
+ }
return bdb, nil
}
diff --git a/banyand/storage/block.go b/banyand/storage/block.go
index 8bbf50f..76a8b1f 100644
--- a/banyand/storage/block.go
+++ b/banyand/storage/block.go
@@ -30,8 +30,9 @@ type block struct {
l *logger.Logger
- stores map[string]kv.Store
- tsStores map[string]kv.TimeSeriesStore
+ stores map[string]kv.Store
+ tsStores map[string]kv.TimeSeriesStore
+ indexStores map[string]kv.IndexStore
shardID int
}
@@ -39,12 +40,13 @@ type block struct {
func newBlock(shardID int, path string, plugin Plugin) (*block, error) {
l := logger.GetLogger("block")
return &block{
- shardID: shardID,
- path: path,
- plugin: plugin,
- l: l,
- stores: make(map[string]kv.Store),
- tsStores: make(map[string]kv.TimeSeriesStore),
+ shardID: shardID,
+ path: path,
+ plugin: plugin,
+ l: l,
+ stores: make(map[string]kv.Store),
+ tsStores: make(map[string]kv.TimeSeriesStore),
+ indexStores: make(map[string]kv.IndexStore),
}, nil
}
@@ -69,6 +71,12 @@ func (b *block) createKV(defines []KVSpec) (err error) {
case KVTypeNormal:
var s kv.Store
opts := make([]kv.StoreOptions, 0)
+ if define.BufferSize > 0 {
+ opts = append(opts, kv.StoreWithBufferSize(define.BufferSize))
+ }
+ if define.FlushCallback != nil {
+ opts = append(opts, kv.StoreWithFlushCallback(define.FlushCallback))
+ }
opts = append(opts, kv.StoreWithLogger(b.l))
if s, err = kv.OpenStore(b.shardID, path, opts...); err != nil {
return fmt.Errorf("failed to open normal store: %w", err)
@@ -76,10 +84,17 @@ func (b *block) createKV(defines []KVSpec) (err error) {
b.stores[storeID] = s
case KVTypeTimeSeries:
var s kv.TimeSeriesStore
- if s, err = kv.OpenTimeSeriesStore(b.shardID, path, define.CompressLevel, define.ValueSize, kv.TSSWithLogger(b.l)); err != nil {
+ if s, err = kv.OpenTimeSeriesStore(b.shardID, path, define.CompressLevel, define.ValueSize,
+ kv.TSSWithLogger(b.l)); err != nil {
return fmt.Errorf("failed to open time series store: %w", err)
}
b.tsStores[storeID] = s
+ case KVTypeIndex:
+ var s kv.IndexStore
+ if s, err = kv.OpenIndexStore(b.shardID, path, kv.IndexWithLogger(b.l)); err != nil {
+ return fmt.Errorf("failed to open time series store: %w", err)
+ }
+ b.indexStores[storeID] = s
}
}
return nil
@@ -92,4 +107,7 @@ func (b *block) close() {
for _, store := range b.tsStores {
_ = store.Close()
}
+ for _, store := range b.indexStores {
+ _ = store.Close()
+ }
}
diff --git a/banyand/storage/database.go b/banyand/storage/database.go
index f69af0f..9302ab0 100644
--- a/banyand/storage/database.go
+++ b/banyand/storage/database.go
@@ -127,6 +127,15 @@ func (s *series) TimeSeriesReader(shard uint, name string, start, end uint64) kv
return b.tsStores[name]
}
+func (s *series) Index(shard uint, name string) kv.IndexStore {
+ //TODO: find targets in all blocks
+ b, ok := s.sLst[shard].activeBlock.Load().(*block)
+ if !ok {
+ return nil
+ }
+ return b.indexStores[name]
+}
+
func (s *series) load(meta PluginMeta) error {
//TODO: to implement load instead of removing old contents
return os.RemoveAll(s.location)
diff --git a/banyand/storage/database_test.go b/banyand/storage/database_test.go
index 754c25b..89c910e 100644
--- a/banyand/storage/database_test.go
+++ b/banyand/storage/database_test.go
@@ -29,7 +29,12 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
+ "github.com/apache/skywalking-banyandb/api/common"
+ "github.com/apache/skywalking-banyandb/banyand/kv"
+ "github.com/apache/skywalking-banyandb/pkg/convert"
"github.com/apache/skywalking-banyandb/pkg/logger"
+ "github.com/apache/skywalking-banyandb/pkg/posting"
+ "github.com/apache/skywalking-banyandb/pkg/posting/roaring"
)
func TestDB_Create_Directory(t *testing.T) {
@@ -59,6 +64,7 @@ func TestDB_Create_Directory(t *testing.T) {
}
func TestDB_Store(t *testing.T) {
+ is := require.New(t)
ctrl := gomock.NewController(t)
defer ctrl.Finish()
now := uint64(time.Now().UnixNano())
@@ -75,18 +81,137 @@ func TestDB_Store(t *testing.T) {
removeDir(tempDir)
}()
- assert.NoError(t, ap.Writer(0, "normal").Put([]byte("key1"), []byte{12}))
+ is.NoError(ap.Writer(0, "normal").Put([]byte("key1"), []byte{12}))
val, err := repo.Reader(0, "normal", now, now).Get([]byte("key1"))
- assert.NoError(t, err)
- assert.Equal(t, []byte{12}, val)
+ is.NoError(err)
+ is.Equal([]byte{12}, val)
- assert.NoError(t, ap.TimeSeriesWriter(1, "time-series").Put([]byte("key11"), []byte{33}, 1))
+ is.NoError(ap.TimeSeriesWriter(1, "time-series").Put([]byte("key11"), []byte{33}, 1))
val, err = repo.TimeSeriesReader(1, "time-series", now, now).Get([]byte("key11"), 1)
- assert.NoError(t, err)
- assert.Equal(t, []byte{33}, val)
+ is.NoError(err)
+ is.Equal([]byte{33}, val)
vals, allErr := repo.TimeSeriesReader(1, "time-series", now, now).GetAll([]byte("key11"))
- assert.NoError(t, allErr)
- assert.Equal(t, [][]byte{{33}}, vals)
+ is.NoError(allErr)
+ is.Equal([][]byte{{33}}, vals)
+
+ index := repo.Index(1, "index")
+ is.NoError(index.Handover(mockMemtable([]uint64{1, 2}, []uint64{3, 6})))
+ list, err := index.Seek(convert.Int64ToBytes(0), 2)
+ is.NoError(err)
+ is.Equal(2, list.Len())
+ is.True(list.Contains(common.ChunkID(1)))
+ is.True(list.Contains(common.ChunkID(2)))
+ list, err = index.Seek(convert.Int64ToBytes(1), 2)
+ is.NoError(err)
+ is.Equal(2, list.Len())
+ is.True(list.Contains(common.ChunkID(3)))
+ is.True(list.Contains(common.ChunkID(6)))
+
+ is.NoError(index.Handover(mockMemtable([]uint64{11, 14})))
+ list, err = index.Seek(convert.Int64ToBytes(0), 2)
+ is.NoError(err)
+ is.Equal(4, list.Len())
+ is.True(list.Contains(common.ChunkID(1)))
+ is.True(list.Contains(common.ChunkID(2)))
+ is.True(list.Contains(common.ChunkID(11)))
+ is.True(list.Contains(common.ChunkID(14)))
+}
+
+func TestDB_FlushCallback(t *testing.T) {
+ is := require.New(t)
+ ctrl := gomock.NewController(t)
+ defer ctrl.Finish()
+ now := uint64(time.Now().UnixNano())
+ var ap WritePoint
+ //var repo StoreRepo
+ p := NewMockPlugin(ctrl)
+ latch := make(chan struct{})
+ closed := false
+ p.EXPECT().Meta().Return(PluginMeta{
+ ID: "sw",
+ Group: "default",
+ ShardNumber: 2,
+ KVSpecs: []KVSpec{
+ {
+ Name: "normal",
+ Type: KVTypeNormal,
+ BufferSize: 10 << 20,
+ FlushCallback: func() {
+ if closed {
+ return
+ }
+ close(latch)
+ closed = true
+ },
+ },
+ },
+ }).AnyTimes()
+ p.EXPECT().Init(gomock.Any(), gomock.Any()).Do(func(r StoreRepo, wp GetWritePoint) {
+ ap = wp(now)
+ }).AnyTimes()
+
+ tempDir, db := setUp(t, p)
+ defer func() {
+ db.GracefulStop()
+ removeDir(tempDir)
+ }()
+ for i := 0; i < 5000; i++ {
+ key := make([]byte, i)
+ _ = ap.Writer(0, "normal").Put(key, []byte{1})
+ }
+ ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
+ defer cancel()
+ select {
+ case <-latch:
+ case <-ctx.Done():
+ is.Fail("timeout")
+ }
+}
+
+var _ kv.Iterator = (*iter)(nil)
+
+type iter struct {
+ data map[int]posting.List
+ p int
+}
+
+func (i *iter) Next() {
+ i.p++
+}
+
+func (i *iter) Rewind() {
+ i.p = 0
+}
+
+func (i *iter) Seek(key []byte) {
+ panic("implement me")
+}
+
+func (i *iter) Key() []byte {
+ return convert.Int64ToBytes(int64(i.p))
+}
+
+func (i *iter) Val() posting.List {
+ return i.data[i.p]
+}
+
+func (i *iter) Valid() bool {
+ _, ok := i.data[i.p]
+ return ok
+}
+
+func (i *iter) Close() error {
+ return nil
+}
+
+func mockMemtable(data ...[]uint64) kv.Iterator {
+ it := &iter{
+ data: make(map[int]posting.List),
+ }
+ for i, d := range data {
+ it.data[i] = roaring.NewPostingListWithInitialData(d...)
+ }
+ return it
}
func mockPlugin(ctrl *gomock.Controller, f func(repo StoreRepo, get GetWritePoint)) Plugin {
@@ -105,6 +230,10 @@ func mockPlugin(ctrl *gomock.Controller, f func(repo StoreRepo, get GetWritePoin
Type: KVTypeTimeSeries,
CompressLevel: 3,
},
+ {
+ Name: "index",
+ Type: KVTypeIndex,
+ },
},
}).AnyTimes()
p.EXPECT().Init(gomock.Any(), gomock.Any()).Do(func(r StoreRepo, wp GetWritePoint) {
diff --git a/banyand/storage/storage.go b/banyand/storage/storage.go
index 8ac58ec..6219842 100644
--- a/banyand/storage/storage.go
+++ b/banyand/storage/storage.go
@@ -35,6 +35,8 @@ const (
KVTypeNormal KVType = 0
// KVTypeTimeSeries is a time-series KV storage
KVTypeTimeSeries KVType = 1
+ // KVTypeIndex is an index KV storage
+ KVTypeIndex KVType = 2
)
// Database is the storage manager which implements the physical data model
@@ -82,13 +84,16 @@ type CompressSpec struct {
type KVSpec struct {
Name string
Type KVType
+ BufferSize int64
CompressLevel int
ValueSize int
+ FlushCallback kv.FlushCallback
}
type StoreRepo interface {
Reader(shard uint, name string, start, end uint64) kv.Reader
TimeSeriesReader(shard uint, name string, start, end uint64) kv.TimeSeriesReader
+ Index(shard uint, name string) kv.IndexStore
}
// WritePoint is a reference to a underlying area.
diff --git a/go.mod b/go.mod
index 864feed..65674a5 100644
--- a/go.mod
+++ b/go.mod
@@ -27,4 +27,4 @@ require (
google.golang.org/protobuf v1.27.1
)
-replace github.com/dgraph-io/badger/v3 v3.2011.1 => github.com/SkyAPM/badger/v3 v3.0.0-20210808063906-49c6d778ad7d
+replace github.com/dgraph-io/badger/v3 v3.2011.1 => github.com/SkyAPM/badger/v3 v3.0.0-20210809093509-ff1b2dd81165
diff --git a/go.sum b/go.sum
index 90495e8..14301da 100644
--- a/go.sum
+++ b/go.sum
@@ -45,8 +45,8 @@ github.com/RoaringBitmap/gocroaring v0.4.0/go.mod h1:NieMwz7ZqwU2DD73/vvYwv7r4eW
github.com/RoaringBitmap/real-roaring-datasets v0.0.0-20190726190000-eb7c87156f76/go.mod h1:oM0MHmQ3nDsq609SS36p+oYbRi16+oVvU2Bw4Ipv0SE=
github.com/RoaringBitmap/roaring v0.9.1 h1:5PRizBmoN/PfV17nPNQou4dHQ7NcJi8FO/bihdYyCEM=
github.com/RoaringBitmap/roaring v0.9.1/go.mod h1:h1B7iIUOmnAeb5ytYMvnHJwxMc6LUrwBnzXWRuqTQUc=
-github.com/SkyAPM/badger/v3 v3.0.0-20210808063906-49c6d778ad7d h1:6B7YHxp79aOfRuMXfWdSyNSZEDqJqSzQWWkQcmqyI9s=
-github.com/SkyAPM/badger/v3 v3.0.0-20210808063906-49c6d778ad7d/go.mod h1:dULbq6ehJ5K0cGW/1TQ9iSfUk0gbSiToDWmWmTsJ53E=
+github.com/SkyAPM/badger/v3 v3.0.0-20210809093509-ff1b2dd81165 h1:csoTNiGUMtp4H1AchgaZWJ4WY4uJQ6s+pz3sXS93jAA=
+github.com/SkyAPM/badger/v3 v3.0.0-20210809093509-ff1b2dd81165/go.mod h1:dULbq6ehJ5K0cGW/1TQ9iSfUk0gbSiToDWmWmTsJ53E=
github.com/antihax/optional v1.0.0/go.mod h1:uupD/76wgC+ih3iEmQUL+0Ugr19nfwCT1kdvxnR2qWY=
github.com/armon/circbuf v0.0.0-20150827004946-bbbad097214e/go.mod h1:3U/XgcO3hCbHZ8TKRvWD2dDTCfh9M9ya+I9JpbB7O8o=
github.com/armon/consul-api v0.0.0-20180202201655-eb2c6b5be1b6/go.mod h1:grANhF5doyWs3UAsr3K4I6qtAmlQcZDesFNEHPZAzj8=
diff --git a/pkg/posting/posting.go b/pkg/posting/posting.go
index 2d7ad16..d997ab0 100644
--- a/pkg/posting/posting.go
+++ b/pkg/posting/posting.go
@@ -60,6 +60,10 @@ type List interface {
Reset()
ToSlice() []common.ChunkID
+
+ Marshall() ([]byte, error)
+
+ Unmarshall(data []byte) error
}
type Iterator interface {
diff --git a/pkg/posting/roaring/roaring.go b/pkg/posting/roaring/roaring.go
index 0b1c9b6..63eb2bc 100644
--- a/pkg/posting/roaring/roaring.go
+++ b/pkg/posting/roaring/roaring.go
@@ -40,6 +40,14 @@ type postingsList struct {
bitmap *roaring64.Bitmap
}
+func (p *postingsList) Marshall() ([]byte, error) {
+ return p.bitmap.MarshalBinary()
+}
+
+func (p *postingsList) Unmarshall(data []byte) error {
+ return p.bitmap.UnmarshalBinary(data)
+}
+
func NewPostingList() posting.List {
return &postingsList{
bitmap: roaring64.New(),