You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by ha...@apache.org on 2021/08/10 01:57:36 UTC
[skywalking-banyandb] 03/03: Introduce index store
This is an automated email from the ASF dual-hosted git repository.
hanahmily pushed a commit to branch storage-table
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
commit 2424a8236c656da3395f6c59b8f0a5d20e5c181a
Author: Gao Hongtao <ha...@gmail.com>
AuthorDate: Tue Aug 10 09:56:28 2021 +0800
Introduce index store
Signed-off-by: Gao Hongtao <ha...@gmail.com>
---
banyand/kv/badger.go | 103 +++++++++++++++++++++++++--
banyand/kv/kv.go | 78 +++++++++++++++++++--
banyand/storage/block.go | 36 +++++++---
banyand/storage/database.go | 9 +++
banyand/storage/database_test.go | 145 ++++++++++++++++++++++++++++++++++++---
banyand/storage/storage.go | 5 ++
go.mod | 2 +-
go.sum | 4 +-
pkg/posting/posting.go | 4 ++
pkg/posting/roaring/roaring.go | 8 +++
10 files changed, 365 insertions(+), 29 deletions(-)
diff --git a/banyand/kv/badger.go b/banyand/kv/badger.go
index 05b4e51..64c9f29 100644
--- a/banyand/kv/badger.go
+++ b/banyand/kv/badger.go
@@ -18,16 +18,27 @@
package kv
import (
+ "bytes"
"log"
"math"
+ "time"
"github.com/dgraph-io/badger/v3"
"github.com/dgraph-io/badger/v3/y"
+ "go.uber.org/multierr"
"github.com/apache/skywalking-banyandb/pkg/logger"
+ "github.com/apache/skywalking-banyandb/pkg/posting"
+ "github.com/apache/skywalking-banyandb/pkg/posting/roaring"
)
-var _ TimeSeriesStore = (*badgerTSS)(nil)
+var (
+ _ Store = (*badgerDB)(nil)
+ _ IndexStore = (*badgerDB)(nil)
+ _ y.Iterator = (*mergedIter)(nil)
+ _ TimeSeriesStore = (*badgerTSS)(nil)
+ bitMergeEntry byte = 1 << 3
+)
type badgerTSS struct {
shardID int
@@ -43,14 +54,98 @@ func (b *badgerTSS) Close() error {
return nil
}
-var _ Store = (*badgerDB)(nil)
+type mergedIter struct {
+ delegated Iterator
+ valid bool
+ data []byte
+}
+
+func (i *mergedIter) Next() {
+ i.delegated.Next()
+ i.parseData()
+}
+
+func (i *mergedIter) Rewind() {
+ i.delegated.Rewind()
+ i.parseData()
+}
+
+func (i *mergedIter) Seek(key []byte) {
+ i.delegated.Seek(y.KeyWithTs(key, math.MaxInt64))
+}
+
+func (i *mergedIter) Key() []byte {
+ return y.KeyWithTs(i.delegated.Key(), uint64(time.Now().UnixNano()))
+}
+
+func (i *mergedIter) Valid() bool {
+ return i.valid
+}
+
+func (i *mergedIter) parseData() {
+ i.data = nil
+ i.valid = i.delegated.Valid()
+ if !i.valid {
+ return
+ }
+ data, err := i.delegated.Val().Marshall()
+ if err != nil {
+ i.valid = false
+ return
+ }
+ i.data = data
+}
+
+func (i *mergedIter) Close() error {
+ i.data = nil
+ i.valid = false
+ return i.delegated.Close()
+}
+
+func (i mergedIter) Value() y.ValueStruct {
+ return y.ValueStruct{
+ Value: i.data,
+ Meta: bitMergeEntry,
+ }
+}
type badgerDB struct {
shardID int
dbOpts badger.Options
db *badger.DB
- seqKey string
- seq *badger.Sequence
+}
+
+func (b *badgerDB) Handover(iterator Iterator) error {
+ return b.db.HandoverIterator(&mergedIter{
+ delegated: iterator,
+ })
+}
+
+func (b *badgerDB) Seek(key []byte, limit int) (posting.List, error) {
+ opts := badger.DefaultIteratorOptions
+ it := b.db.NewIterator(opts)
+ defer func() {
+ _ = it.Close()
+ }()
+ result := roaring.NewPostingList()
+ var errMerged error
+ for it.Seek(y.KeyWithTs(key, math.MaxInt64)); it.Valid(); it.Next() {
+ k := y.ParseKey(it.Key())
+ if !bytes.Equal(key, k) {
+ break
+ }
+ list := roaring.NewPostingList()
+ err := list.Unmarshall(it.Value().Value)
+ if err != nil {
+ errMerged = multierr.Append(errMerged, err)
+ continue
+ }
+ _ = result.Union(list)
+ if result.Len() > limit {
+ break
+ }
+ }
+ return result, errMerged
}
func (b *badgerDB) Scan(key []byte, opt ScanOpts, f ScanFunc) error {
diff --git a/banyand/kv/kv.go b/banyand/kv/kv.go
index f3a2d55..67d6938 100644
--- a/banyand/kv/kv.go
+++ b/banyand/kv/kv.go
@@ -25,6 +25,7 @@ import (
"github.com/pkg/errors"
"github.com/apache/skywalking-banyandb/pkg/logger"
+ "github.com/apache/skywalking-banyandb/pkg/posting"
)
var (
@@ -95,6 +96,24 @@ func TSSWithLogger(l *logger.Logger) TimeSeriesOptions {
}
}
+type Iterator interface {
+ Next()
+ Rewind()
+ Seek(key []byte)
+ Key() []byte
+ Val() posting.List
+ Valid() bool
+ Close() error
+}
+
+type HandoverCallback func()
+
+type IndexStore interface {
+ Handover(iterator Iterator) error
+ Seek(key []byte, limit int) (posting.List, error)
+ Close() error
+}
+
// OpenTimeSeriesStore creates a new TimeSeriesStore
func OpenTimeSeriesStore(shardID int, path string, compressLevel int, valueSize int, options ...TimeSeriesOptions) (TimeSeriesStore, error) {
btss := new(badgerTSS)
@@ -104,6 +123,8 @@ func OpenTimeSeriesStore(shardID int, path string, compressLevel int, valueSize
opt(btss)
}
btss.dbOpts = btss.dbOpts.WithMaxLevels(1)
+ // Put all values into LSM
+ btss.dbOpts = btss.dbOpts.WithVLogPercentile(1.0)
var err error
btss.db, err = badger.Open(btss.dbOpts)
if err != nil {
@@ -126,6 +147,26 @@ func StoreWithLogger(l *logger.Logger) StoreOptions {
}
}
+// StoreWithBufferSize sets the memory table (write buffer) size of the underlying Store
+func StoreWithBufferSize(size int64) StoreOptions {
+ return func(store Store) {
+ if bdb, ok := store.(*badgerDB); ok {
+ bdb.dbOpts = bdb.dbOpts.WithMemTableSize(size)
+ }
+ }
+}
+
+type FlushCallback func()
+
+// StoreWithFlushCallback sets a callback that is invoked when the underlying Store flushes its memtable
+func StoreWithFlushCallback(callback FlushCallback) StoreOptions {
+ return func(store Store) {
+ if bdb, ok := store.(*badgerDB); ok {
+ bdb.dbOpts.FlushCallBack = callback
+ }
+ }
+}
+
// OpenStore creates a new Store
func OpenStore(shardID int, path string, options ...StoreOptions) (Store, error) {
bdb := new(badgerDB)
@@ -139,13 +180,40 @@ func OpenStore(shardID int, path string, options ...StoreOptions) (Store, error)
var err error
bdb.db, err = badger.Open(bdb.dbOpts)
if err != nil {
- return nil, fmt.Errorf("failed to open time series store: %v", err)
+ return nil, fmt.Errorf("failed to open normal store: %v", err)
}
- if bdb.seqKey != "" {
- bdb.seq, err = bdb.db.GetSequence([]byte(bdb.seqKey), 100)
- if err != nil {
- return nil, fmt.Errorf("failed to get sequence: %v", err)
+ return bdb, nil
+}
+
+type IndexOptions func(store IndexStore)
+
+// IndexWithLogger sets an external logger into the underlying IndexStore
+func IndexWithLogger(l *logger.Logger) IndexOptions {
+ return func(store IndexStore) {
+ if bdb, ok := store.(*badgerDB); ok {
+ bdb.dbOpts = bdb.dbOpts.WithLogger(&badgerLog{
+ delegated: l.Named("index-kv"),
+ })
}
}
+}
+
+// OpenIndexStore creates a new IndexStore
+func OpenIndexStore(shardID int, path string, options ...IndexOptions) (IndexStore, error) {
+ bdb := new(badgerDB)
+ bdb.shardID = shardID
+ bdb.dbOpts = badger.DefaultOptions(path)
+ for _, opt := range options {
+ opt(bdb)
+ }
+ bdb.dbOpts = bdb.dbOpts.WithMaxLevels(1)
+ // Put all values into LSM
+ bdb.dbOpts = bdb.dbOpts.WithVLogPercentile(1.0)
+
+ var err error
+ bdb.db, err = badger.Open(bdb.dbOpts)
+ if err != nil {
+ return nil, fmt.Errorf("failed to open index store: %v", err)
+ }
return bdb, nil
}
diff --git a/banyand/storage/block.go b/banyand/storage/block.go
index 8bbf50f..76a8b1f 100644
--- a/banyand/storage/block.go
+++ b/banyand/storage/block.go
@@ -30,8 +30,9 @@ type block struct {
l *logger.Logger
- stores map[string]kv.Store
- tsStores map[string]kv.TimeSeriesStore
+ stores map[string]kv.Store
+ tsStores map[string]kv.TimeSeriesStore
+ indexStores map[string]kv.IndexStore
shardID int
}
@@ -39,12 +40,13 @@ type block struct {
func newBlock(shardID int, path string, plugin Plugin) (*block, error) {
l := logger.GetLogger("block")
return &block{
- shardID: shardID,
- path: path,
- plugin: plugin,
- l: l,
- stores: make(map[string]kv.Store),
- tsStores: make(map[string]kv.TimeSeriesStore),
+ shardID: shardID,
+ path: path,
+ plugin: plugin,
+ l: l,
+ stores: make(map[string]kv.Store),
+ tsStores: make(map[string]kv.TimeSeriesStore),
+ indexStores: make(map[string]kv.IndexStore),
}, nil
}
@@ -69,6 +71,12 @@ func (b *block) createKV(defines []KVSpec) (err error) {
case KVTypeNormal:
var s kv.Store
opts := make([]kv.StoreOptions, 0)
+ if define.BufferSize > 0 {
+ opts = append(opts, kv.StoreWithBufferSize(define.BufferSize))
+ }
+ if define.FlushCallback != nil {
+ opts = append(opts, kv.StoreWithFlushCallback(define.FlushCallback))
+ }
opts = append(opts, kv.StoreWithLogger(b.l))
if s, err = kv.OpenStore(b.shardID, path, opts...); err != nil {
return fmt.Errorf("failed to open normal store: %w", err)
@@ -76,10 +84,17 @@ func (b *block) createKV(defines []KVSpec) (err error) {
b.stores[storeID] = s
case KVTypeTimeSeries:
var s kv.TimeSeriesStore
- if s, err = kv.OpenTimeSeriesStore(b.shardID, path, define.CompressLevel, define.ValueSize, kv.TSSWithLogger(b.l)); err != nil {
+ if s, err = kv.OpenTimeSeriesStore(b.shardID, path, define.CompressLevel, define.ValueSize,
+ kv.TSSWithLogger(b.l)); err != nil {
return fmt.Errorf("failed to open time series store: %w", err)
}
b.tsStores[storeID] = s
+ case KVTypeIndex:
+ var s kv.IndexStore
+ if s, err = kv.OpenIndexStore(b.shardID, path, kv.IndexWithLogger(b.l)); err != nil {
+ return fmt.Errorf("failed to open index store: %w", err)
+ }
+ b.indexStores[storeID] = s
}
}
return nil
@@ -92,4 +107,7 @@ func (b *block) close() {
for _, store := range b.tsStores {
_ = store.Close()
}
+ for _, store := range b.indexStores {
+ _ = store.Close()
+ }
}
diff --git a/banyand/storage/database.go b/banyand/storage/database.go
index f69af0f..9302ab0 100644
--- a/banyand/storage/database.go
+++ b/banyand/storage/database.go
@@ -127,6 +127,15 @@ func (s *series) TimeSeriesReader(shard uint, name string, start, end uint64) kv
return b.tsStores[name]
}
+func (s *series) Index(shard uint, name string) kv.IndexStore {
+ //TODO: find targets in all blocks
+ b, ok := s.sLst[shard].activeBlock.Load().(*block)
+ if !ok {
+ return nil
+ }
+ return b.indexStores[name]
+}
+
func (s *series) load(meta PluginMeta) error {
//TODO: to implement load instead of removing old contents
return os.RemoveAll(s.location)
diff --git a/banyand/storage/database_test.go b/banyand/storage/database_test.go
index 754c25b..89c910e 100644
--- a/banyand/storage/database_test.go
+++ b/banyand/storage/database_test.go
@@ -29,7 +29,12 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
+ "github.com/apache/skywalking-banyandb/api/common"
+ "github.com/apache/skywalking-banyandb/banyand/kv"
+ "github.com/apache/skywalking-banyandb/pkg/convert"
"github.com/apache/skywalking-banyandb/pkg/logger"
+ "github.com/apache/skywalking-banyandb/pkg/posting"
+ "github.com/apache/skywalking-banyandb/pkg/posting/roaring"
)
func TestDB_Create_Directory(t *testing.T) {
@@ -59,6 +64,7 @@ func TestDB_Create_Directory(t *testing.T) {
}
func TestDB_Store(t *testing.T) {
+ is := require.New(t)
ctrl := gomock.NewController(t)
defer ctrl.Finish()
now := uint64(time.Now().UnixNano())
@@ -75,18 +81,137 @@ func TestDB_Store(t *testing.T) {
removeDir(tempDir)
}()
- assert.NoError(t, ap.Writer(0, "normal").Put([]byte("key1"), []byte{12}))
+ is.NoError(ap.Writer(0, "normal").Put([]byte("key1"), []byte{12}))
val, err := repo.Reader(0, "normal", now, now).Get([]byte("key1"))
- assert.NoError(t, err)
- assert.Equal(t, []byte{12}, val)
+ is.NoError(err)
+ is.Equal([]byte{12}, val)
- assert.NoError(t, ap.TimeSeriesWriter(1, "time-series").Put([]byte("key11"), []byte{33}, 1))
+ is.NoError(ap.TimeSeriesWriter(1, "time-series").Put([]byte("key11"), []byte{33}, 1))
val, err = repo.TimeSeriesReader(1, "time-series", now, now).Get([]byte("key11"), 1)
- assert.NoError(t, err)
- assert.Equal(t, []byte{33}, val)
+ is.NoError(err)
+ is.Equal([]byte{33}, val)
vals, allErr := repo.TimeSeriesReader(1, "time-series", now, now).GetAll([]byte("key11"))
- assert.NoError(t, allErr)
- assert.Equal(t, [][]byte{{33}}, vals)
+ is.NoError(allErr)
+ is.Equal([][]byte{{33}}, vals)
+
+ index := repo.Index(1, "index")
+ is.NoError(index.Handover(mockMemtable([]uint64{1, 2}, []uint64{3, 6})))
+ list, err := index.Seek(convert.Int64ToBytes(0), 2)
+ is.NoError(err)
+ is.Equal(2, list.Len())
+ is.True(list.Contains(common.ChunkID(1)))
+ is.True(list.Contains(common.ChunkID(2)))
+ list, err = index.Seek(convert.Int64ToBytes(1), 2)
+ is.NoError(err)
+ is.Equal(2, list.Len())
+ is.True(list.Contains(common.ChunkID(3)))
+ is.True(list.Contains(common.ChunkID(6)))
+
+ is.NoError(index.Handover(mockMemtable([]uint64{11, 14})))
+ list, err = index.Seek(convert.Int64ToBytes(0), 2)
+ is.NoError(err)
+ is.Equal(4, list.Len())
+ is.True(list.Contains(common.ChunkID(1)))
+ is.True(list.Contains(common.ChunkID(2)))
+ is.True(list.Contains(common.ChunkID(11)))
+ is.True(list.Contains(common.ChunkID(14)))
+}
+
+func TestDB_FlushCallback(t *testing.T) {
+ is := require.New(t)
+ ctrl := gomock.NewController(t)
+ defer ctrl.Finish()
+ now := uint64(time.Now().UnixNano())
+ var ap WritePoint
+ //var repo StoreRepo
+ p := NewMockPlugin(ctrl)
+ latch := make(chan struct{})
+ closed := false
+ p.EXPECT().Meta().Return(PluginMeta{
+ ID: "sw",
+ Group: "default",
+ ShardNumber: 2,
+ KVSpecs: []KVSpec{
+ {
+ Name: "normal",
+ Type: KVTypeNormal,
+ BufferSize: 10 << 20,
+ FlushCallback: func() {
+ if closed {
+ return
+ }
+ close(latch)
+ closed = true
+ },
+ },
+ },
+ }).AnyTimes()
+ p.EXPECT().Init(gomock.Any(), gomock.Any()).Do(func(r StoreRepo, wp GetWritePoint) {
+ ap = wp(now)
+ }).AnyTimes()
+
+ tempDir, db := setUp(t, p)
+ defer func() {
+ db.GracefulStop()
+ removeDir(tempDir)
+ }()
+ for i := 0; i < 5000; i++ {
+ key := make([]byte, i)
+ _ = ap.Writer(0, "normal").Put(key, []byte{1})
+ }
+ ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
+ defer cancel()
+ select {
+ case <-latch:
+ case <-ctx.Done():
+ is.Fail("timeout")
+ }
+}
+
+var _ kv.Iterator = (*iter)(nil)
+
+type iter struct {
+ data map[int]posting.List
+ p int
+}
+
+func (i *iter) Next() {
+ i.p++
+}
+
+func (i *iter) Rewind() {
+ i.p = 0
+}
+
+func (i *iter) Seek(key []byte) {
+ panic("implement me")
+}
+
+func (i *iter) Key() []byte {
+ return convert.Int64ToBytes(int64(i.p))
+}
+
+func (i *iter) Val() posting.List {
+ return i.data[i.p]
+}
+
+func (i *iter) Valid() bool {
+ _, ok := i.data[i.p]
+ return ok
+}
+
+func (i *iter) Close() error {
+ return nil
+}
+
+func mockMemtable(data ...[]uint64) kv.Iterator {
+ it := &iter{
+ data: make(map[int]posting.List),
+ }
+ for i, d := range data {
+ it.data[i] = roaring.NewPostingListWithInitialData(d...)
+ }
+ return it
}
func mockPlugin(ctrl *gomock.Controller, f func(repo StoreRepo, get GetWritePoint)) Plugin {
@@ -105,6 +230,10 @@ func mockPlugin(ctrl *gomock.Controller, f func(repo StoreRepo, get GetWritePoin
Type: KVTypeTimeSeries,
CompressLevel: 3,
},
+ {
+ Name: "index",
+ Type: KVTypeIndex,
+ },
},
}).AnyTimes()
p.EXPECT().Init(gomock.Any(), gomock.Any()).Do(func(r StoreRepo, wp GetWritePoint) {
diff --git a/banyand/storage/storage.go b/banyand/storage/storage.go
index 8ac58ec..6219842 100644
--- a/banyand/storage/storage.go
+++ b/banyand/storage/storage.go
@@ -35,6 +35,8 @@ const (
KVTypeNormal KVType = 0
// KVTypeTimeSeries is a time-series KV storage
KVTypeTimeSeries KVType = 1
+ // KVTypeIndex is an index KV storage
+ KVTypeIndex KVType = 2
)
// Database is the storage manager which implements the physical data model
@@ -82,13 +84,16 @@ type CompressSpec struct {
type KVSpec struct {
Name string
Type KVType
+ BufferSize int64
CompressLevel int
ValueSize int
+ FlushCallback kv.FlushCallback
}
type StoreRepo interface {
Reader(shard uint, name string, start, end uint64) kv.Reader
TimeSeriesReader(shard uint, name string, start, end uint64) kv.TimeSeriesReader
+ Index(shard uint, name string) kv.IndexStore
}
// WritePoint is a reference to a underlying area.
diff --git a/go.mod b/go.mod
index 864feed..65674a5 100644
--- a/go.mod
+++ b/go.mod
@@ -27,4 +27,4 @@ require (
google.golang.org/protobuf v1.27.1
)
-replace github.com/dgraph-io/badger/v3 v3.2011.1 => github.com/SkyAPM/badger/v3 v3.0.0-20210808063906-49c6d778ad7d
+replace github.com/dgraph-io/badger/v3 v3.2011.1 => github.com/SkyAPM/badger/v3 v3.0.0-20210809093509-ff1b2dd81165
diff --git a/go.sum b/go.sum
index 90495e8..14301da 100644
--- a/go.sum
+++ b/go.sum
@@ -45,8 +45,8 @@ github.com/RoaringBitmap/gocroaring v0.4.0/go.mod h1:NieMwz7ZqwU2DD73/vvYwv7r4eW
github.com/RoaringBitmap/real-roaring-datasets v0.0.0-20190726190000-eb7c87156f76/go.mod h1:oM0MHmQ3nDsq609SS36p+oYbRi16+oVvU2Bw4Ipv0SE=
github.com/RoaringBitmap/roaring v0.9.1 h1:5PRizBmoN/PfV17nPNQou4dHQ7NcJi8FO/bihdYyCEM=
github.com/RoaringBitmap/roaring v0.9.1/go.mod h1:h1B7iIUOmnAeb5ytYMvnHJwxMc6LUrwBnzXWRuqTQUc=
-github.com/SkyAPM/badger/v3 v3.0.0-20210808063906-49c6d778ad7d h1:6B7YHxp79aOfRuMXfWdSyNSZEDqJqSzQWWkQcmqyI9s=
-github.com/SkyAPM/badger/v3 v3.0.0-20210808063906-49c6d778ad7d/go.mod h1:dULbq6ehJ5K0cGW/1TQ9iSfUk0gbSiToDWmWmTsJ53E=
+github.com/SkyAPM/badger/v3 v3.0.0-20210809093509-ff1b2dd81165 h1:csoTNiGUMtp4H1AchgaZWJ4WY4uJQ6s+pz3sXS93jAA=
+github.com/SkyAPM/badger/v3 v3.0.0-20210809093509-ff1b2dd81165/go.mod h1:dULbq6ehJ5K0cGW/1TQ9iSfUk0gbSiToDWmWmTsJ53E=
github.com/antihax/optional v1.0.0/go.mod h1:uupD/76wgC+ih3iEmQUL+0Ugr19nfwCT1kdvxnR2qWY=
github.com/armon/circbuf v0.0.0-20150827004946-bbbad097214e/go.mod h1:3U/XgcO3hCbHZ8TKRvWD2dDTCfh9M9ya+I9JpbB7O8o=
github.com/armon/consul-api v0.0.0-20180202201655-eb2c6b5be1b6/go.mod h1:grANhF5doyWs3UAsr3K4I6qtAmlQcZDesFNEHPZAzj8=
diff --git a/pkg/posting/posting.go b/pkg/posting/posting.go
index 2d7ad16..d997ab0 100644
--- a/pkg/posting/posting.go
+++ b/pkg/posting/posting.go
@@ -60,6 +60,10 @@ type List interface {
Reset()
ToSlice() []common.ChunkID
+
+ Marshall() ([]byte, error)
+
+ Unmarshall(data []byte) error
}
type Iterator interface {
diff --git a/pkg/posting/roaring/roaring.go b/pkg/posting/roaring/roaring.go
index 0b1c9b6..63eb2bc 100644
--- a/pkg/posting/roaring/roaring.go
+++ b/pkg/posting/roaring/roaring.go
@@ -40,6 +40,14 @@ type postingsList struct {
bitmap *roaring64.Bitmap
}
+func (p *postingsList) Marshall() ([]byte, error) {
+ return p.bitmap.MarshalBinary()
+}
+
+func (p *postingsList) Unmarshall(data []byte) error {
+ return p.bitmap.UnmarshalBinary(data)
+}
+
func NewPostingList() posting.List {
return &postingsList{
bitmap: roaring64.New(),