You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by ha...@apache.org on 2021/08/10 01:57:33 UTC

[skywalking-banyandb] branch storage-table created (now 2424a82)

This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a change to branch storage-table
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git.


      at 2424a82  Introduce index store

This branch includes the following new commits:

     new a4ce4df  Update badger, using new APIs
     new 302935e  Merge remote-tracking branch 'origin/main' into storage-table
     new 2424a82  Introduce index store

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


[skywalking-banyandb] 02/03: Merge remote-tracking branch 'origin/main' into storage-table

Posted by ha...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch storage-table
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git

commit 302935e3a49b761bc29eb764891834321e5d1ba5
Merge: a4ce4df b7845b0
Author: Gao Hongtao <ha...@gmail.com>
AuthorDate: Tue Aug 10 09:48:15 2021 +0800

    Merge remote-tracking branch 'origin/main' into storage-table

 api/proto/banyandb/v1/schema.pb.go              | 364 ++++++++++++------------
 api/proto/banyandb/v1/schema.proto              |  14 +-
 banyand/series/schema/sw/trace_series.textproto |   2 +-
 3 files changed, 191 insertions(+), 189 deletions(-)

[skywalking-banyandb] 01/03: Update badger, using new APIs

Posted by ha...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch storage-table
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git

commit a4ce4df9b4bc77c3c78619c309fbb8e4d216f899
Author: Gao Hongtao <ha...@gmail.com>
AuthorDate: Sun Aug 8 14:56:31 2021 +0800

    Update badger, using new APIs
    
    Signed-off-by: Gao Hongtao <ha...@gmail.com>
---
 banyand/kv/badger.go | 71 ++++++++++++++++++++--------------------------------
 banyand/kv/kv.go     | 12 +++++----
 go.mod               |  2 +-
 go.sum               |  8 +++---
 4 files changed, 39 insertions(+), 54 deletions(-)

diff --git a/banyand/kv/badger.go b/banyand/kv/badger.go
index d118a58..05b4e51 100644
--- a/banyand/kv/badger.go
+++ b/banyand/kv/badger.go
@@ -19,8 +19,10 @@ package kv
 
 import (
 	"log"
+	"math"
 
 	"github.com/dgraph-io/badger/v3"
+	"github.com/dgraph-io/badger/v3/y"
 
 	"github.com/apache/skywalking-banyandb/pkg/logger"
 )
@@ -52,36 +54,26 @@ type badgerDB struct {
 }
 
 func (b *badgerDB) Scan(key []byte, opt ScanOpts, f ScanFunc) error {
-	err := b.db.View(func(txn *badger.Txn) error {
-		opts := badger.DefaultIteratorOptions
-		opts.PrefetchSize = opt.PrefetchSize
-		opts.PrefetchValues = opt.PrefetchValues
-		it := txn.NewIterator(opts)
-		defer it.Close()
-		for it.Seek(key); it.Valid(); it.Next() {
-			item := it.Item()
-			k := item.Key()
-			err := f(b.shardID, k, func() ([]byte, error) {
-				var val []byte
-				err := item.Value(func(v []byte) error {
-					val = v
-					return nil
-				})
-				if err != nil {
-					return nil, err
-				}
-				return val, nil
-			})
-			if err == ErrStopScan {
-				break
-			}
-			if err != nil {
-				return err
-			}
+	opts := badger.DefaultIteratorOptions
+	opts.PrefetchSize = opt.PrefetchSize
+	opts.PrefetchValues = opt.PrefetchValues
+	it := b.db.NewIterator(opts)
+	defer func() {
+		_ = it.Close()
+	}()
+	for it.Seek(y.KeyWithTs(key, math.MaxInt64)); it.Valid(); it.Next() { // keys carry a MaxInt64 version suffix (see Put), so seek with the same suffix
+		k := y.ParseKey(it.Key())
+		err := f(b.shardID, k, func() ([]byte, error) {
+			return y.Copy(it.Value().Value), nil // copy so the caller never holds a slice owned by the iterator
+		})
+		if err == ErrStopScan { // the callback asked to stop early; this is not reported as an error
+			break
+		}
+		if err != nil {
+			return err
 		}
-		return nil
-	})
-	return err
+	}
+	return nil
 }
 
 func (b *badgerDB) Close() error {
@@ -92,24 +84,15 @@ func (b *badgerDB) Close() error {
 }
 
 func (b *badgerDB) Put(key, val []byte) error {
-	return b.db.Update(func(txn *badger.Txn) error {
-		return txn.Set(key, val)
-	})
+	return b.db.Put(y.KeyWithTs(key, math.MaxInt64), val) // one fixed version (MaxInt64) per key, matching the suffix used by Get and Scan
 }
 
 func (b *badgerDB) Get(key []byte) ([]byte, error) {
-	var bb []byte
-	err := b.db.View(func(txn *badger.Txn) error {
-		item, err := txn.Get(key)
-		if err != nil {
-			return err
-		}
-		return item.Value(func(val []byte) error {
-			bb = val
-			return nil
-		})
-	})
-	return bb, err
+	v, err := b.db.Get(y.KeyWithTs(key, math.MaxInt64))
+	if err != nil {
+		return nil, err
+	}
+	return y.Copy(v.Value), nil // copy, like Scan: the raw value may point into engine-owned memory released after a flush
 }
 
 // badgerLog delegates the zap log to the badger logger
diff --git a/banyand/kv/kv.go b/banyand/kv/kv.go
index 34a5188..f3a2d55 100644
--- a/banyand/kv/kv.go
+++ b/banyand/kv/kv.go
@@ -27,11 +27,13 @@ import (
 	"github.com/apache/skywalking-banyandb/pkg/logger"
 )
 
-var ErrStopScan = errors.New("stop scanning")
-var DefaultScanOpts = ScanOpts{
-	PrefetchSize:   100,
-	PrefetchValues: true,
-}
+var (
+	ErrStopScan     = errors.New("stop scanning") // returned by a ScanFunc to end Scan early; Scan then returns nil
+	DefaultScanOpts = ScanOpts{
+		PrefetchSize:   100,
+		PrefetchValues: true,
+	}
+)
 
 type Writer interface {
 	// Put a value
diff --git a/go.mod b/go.mod
index 5ad72a0..864feed 100644
--- a/go.mod
+++ b/go.mod
@@ -27,4 +27,4 @@ require (
 	google.golang.org/protobuf v1.27.1
 )
 
-replace github.com/dgraph-io/badger/v3 v3.2011.1 => github.com/SkyAPM/badger/v3 v3.0.0-20210624023741-bd2dcfcaaa74
+replace github.com/dgraph-io/badger/v3 v3.2011.1 => github.com/SkyAPM/badger/v3 v3.0.0-20210808063906-49c6d778ad7d
diff --git a/go.sum b/go.sum
index ce8fd5c..90495e8 100644
--- a/go.sum
+++ b/go.sum
@@ -45,8 +45,8 @@ github.com/RoaringBitmap/gocroaring v0.4.0/go.mod h1:NieMwz7ZqwU2DD73/vvYwv7r4eW
 github.com/RoaringBitmap/real-roaring-datasets v0.0.0-20190726190000-eb7c87156f76/go.mod h1:oM0MHmQ3nDsq609SS36p+oYbRi16+oVvU2Bw4Ipv0SE=
 github.com/RoaringBitmap/roaring v0.9.1 h1:5PRizBmoN/PfV17nPNQou4dHQ7NcJi8FO/bihdYyCEM=
 github.com/RoaringBitmap/roaring v0.9.1/go.mod h1:h1B7iIUOmnAeb5ytYMvnHJwxMc6LUrwBnzXWRuqTQUc=
-github.com/SkyAPM/badger/v3 v3.0.0-20210624023741-bd2dcfcaaa74 h1:BFHSkDBSYCtPxMgxGz07DfNRYS76KFVDlocQ2U9rY7E=
-github.com/SkyAPM/badger/v3 v3.0.0-20210624023741-bd2dcfcaaa74/go.mod h1:XieWaNygSGj5ZzSsZO4tQe/2wwLjCvESus4twFqxOKc=
+github.com/SkyAPM/badger/v3 v3.0.0-20210808063906-49c6d778ad7d h1:6B7YHxp79aOfRuMXfWdSyNSZEDqJqSzQWWkQcmqyI9s=
+github.com/SkyAPM/badger/v3 v3.0.0-20210808063906-49c6d778ad7d/go.mod h1:dULbq6ehJ5K0cGW/1TQ9iSfUk0gbSiToDWmWmTsJ53E=
 github.com/antihax/optional v1.0.0/go.mod h1:uupD/76wgC+ih3iEmQUL+0Ugr19nfwCT1kdvxnR2qWY=
 github.com/armon/circbuf v0.0.0-20150827004946-bbbad097214e/go.mod h1:3U/XgcO3hCbHZ8TKRvWD2dDTCfh9M9ya+I9JpbB7O8o=
 github.com/armon/consul-api v0.0.0-20180202201655-eb2c6b5be1b6/go.mod h1:grANhF5doyWs3UAsr3K4I6qtAmlQcZDesFNEHPZAzj8=
@@ -79,8 +79,8 @@ github.com/cpuguy83/go-md2man/v2 v2.0.0/go.mod h1:maD7wRr/U5Z6m/iR4s+kqSMx2CaBsr
 github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
 github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
 github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
-github.com/dgraph-io/ristretto v0.0.4-0.20210504190834-0bf2acd73aa3 h1:jU/wpYsEL+8JPLf/QcjkQKI5g0dOjSuwcMjkThxt5x0=
-github.com/dgraph-io/ristretto v0.0.4-0.20210504190834-0bf2acd73aa3/go.mod h1:fux0lOrBhrVCJd3lcTHsIJhq1T2rokOu6v9Vcb3Q9ug=
+github.com/dgraph-io/ristretto v0.1.0 h1:Jv3CGQHp9OjuMBSne1485aDpUkTKEcUqF+jm/LuerPI=
+github.com/dgraph-io/ristretto v0.1.0/go.mod h1:fux0lOrBhrVCJd3lcTHsIJhq1T2rokOu6v9Vcb3Q9ug=
 github.com/dgryski/go-farm v0.0.0-20190423205320-6a90982ecee2 h1:tdlZCpZ/P9DhczCTSixgIKmwPv6+wP5DGjqLYw5SUiA=
 github.com/dgryski/go-farm v0.0.0-20190423205320-6a90982ecee2/go.mod h1:SqUrOPUnsFjfmXRMNPybcSiG0BgUW2AuFH8PAnS2iTw=
 github.com/dustin/go-humanize v1.0.0 h1:VSnTsYCnlFHaM2/igO1h6X3HA71jcobQuxemgkq4zYo=

[skywalking-banyandb] 03/03: Introduce index store

Posted by ha...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch storage-table
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git

commit 2424a8236c656da3395f6c59b8f0a5d20e5c181a
Author: Gao Hongtao <ha...@gmail.com>
AuthorDate: Tue Aug 10 09:56:28 2021 +0800

    Introduce index store
    
    Signed-off-by: Gao Hongtao <ha...@gmail.com>
---
 banyand/kv/badger.go             | 103 +++++++++++++++++++++++++--
 banyand/kv/kv.go                 |  78 +++++++++++++++++++--
 banyand/storage/block.go         |  36 +++++++---
 banyand/storage/database.go      |   9 +++
 banyand/storage/database_test.go | 145 ++++++++++++++++++++++++++++++++++++---
 banyand/storage/storage.go       |   5 ++
 go.mod                           |   2 +-
 go.sum                           |   4 +-
 pkg/posting/posting.go           |   4 ++
 pkg/posting/roaring/roaring.go   |   8 +++
 10 files changed, 365 insertions(+), 29 deletions(-)

diff --git a/banyand/kv/badger.go b/banyand/kv/badger.go
index 05b4e51..64c9f29 100644
--- a/banyand/kv/badger.go
+++ b/banyand/kv/badger.go
@@ -18,16 +18,27 @@
 package kv
 
 import (
+	"bytes"
 	"log"
 	"math"
+	"time"
 
 	"github.com/dgraph-io/badger/v3"
 	"github.com/dgraph-io/badger/v3/y"
+	"go.uber.org/multierr"
 
 	"github.com/apache/skywalking-banyandb/pkg/logger"
+	"github.com/apache/skywalking-banyandb/pkg/posting"
+	"github.com/apache/skywalking-banyandb/pkg/posting/roaring"
 )
 
-var _ TimeSeriesStore = (*badgerTSS)(nil)
+var (
+	_             Store           = (*badgerDB)(nil)
+	_             IndexStore      = (*badgerDB)(nil)
+	_             y.Iterator      = (*mergedIter)(nil)
+	_             TimeSeriesStore = (*badgerTSS)(nil)
+	bitMergeEntry byte            = 1 << 3 // meta bit on every value produced by mergedIter; NOTE(review): must match the merge flag the SkyAPM badger fork expects
+)
 
 type badgerTSS struct {
 	shardID int
@@ -43,14 +54,98 @@ func (b *badgerTSS) Close() error {
 	return nil
 }
 
-var _ Store = (*badgerDB)(nil)
+type mergedIter struct {
+	delegated Iterator // source of keys and posting lists
+	valid     bool     // delegated.Valid() and the current list marshalled without error
+	data      []byte   // marshalled posting list of the current entry
+}
+
+func (i *mergedIter) Next() {
+	i.delegated.Next()
+	i.parseData()
+}
+
+func (i *mergedIter) Rewind() {
+	i.delegated.Rewind()
+	i.parseData()
+}
+
+func (i *mergedIter) Seek(key []byte) {
+	i.delegated.Seek(y.KeyWithTs(key, math.MaxInt64))
+	i.parseData() // refresh valid/data like Next and Rewind; otherwise Valid/Value report the pre-seek entry
+}
+
+func (i *mergedIter) Key() []byte {
+	return y.KeyWithTs(i.delegated.Key(), uint64(time.Now().UnixNano())) // NOTE(review): wall-clock version, so a later handover of the same key becomes a newer version
+}
+
+func (i *mergedIter) Valid() bool {
+	return i.valid
+}
+
+func (i *mergedIter) parseData() { // caches the current entry's marshalled list; a marshal error ends iteration
+	i.data = nil
+	i.valid = i.delegated.Valid()
+	if !i.valid {
+		return
+	}
+	data, err := i.delegated.Val().Marshall()
+	if err != nil {
+		i.valid = false
+		return
+	}
+	i.data = data
+}
+
+func (i *mergedIter) Close() error {
+	i.data, i.valid = nil, false
+	return i.delegated.Close()
+}
+
+func (i *mergedIter) Value() y.ValueStruct {
+	return y.ValueStruct{
+		Value: i.data,
+		Meta:  bitMergeEntry,
+	}
+}
 
 type badgerDB struct {
 	shardID int
 	dbOpts  badger.Options
 	db      *badger.DB
-	seqKey  string
-	seq     *badger.Sequence
+}
+
+func (b *badgerDB) Handover(iterator Iterator) error { // passes iterator, wrapped as a y.Iterator, to the fork's HandoverIterator
+	return b.db.HandoverIterator(&mergedIter{
+		delegated: iterator,
+	})
+}
+
+func (b *badgerDB) Seek(key []byte, limit int) (posting.List, error) { // unions every version stored under key; stops once more than limit IDs are collected
+	opts := badger.DefaultIteratorOptions
+	it := b.db.NewIterator(opts)
+	defer func() {
+		_ = it.Close()
+	}()
+	result := roaring.NewPostingList()
+	var errMerged error
+	for it.Seek(y.KeyWithTs(key, math.MaxInt64)); it.Valid(); it.Next() {
+		k := y.ParseKey(it.Key())
+		if !bytes.Equal(key, k) {
+			break
+		}
+		list := roaring.NewPostingList()
+		err := list.Unmarshall(it.Value().Value)
+		if err != nil {
+			errMerged = multierr.Append(errMerged, err)
+			continue
+		}
+		errMerged = multierr.Append(errMerged, result.Union(list)) // a nil Union error leaves errMerged unchanged
+		if result.Len() > limit {
+			break
+		}
+	}
+	return result, errMerged
 }
 
 func (b *badgerDB) Scan(key []byte, opt ScanOpts, f ScanFunc) error {
diff --git a/banyand/kv/kv.go b/banyand/kv/kv.go
index f3a2d55..67d6938 100644
--- a/banyand/kv/kv.go
+++ b/banyand/kv/kv.go
@@ -25,6 +25,7 @@ import (
 	"github.com/pkg/errors"
 
 	"github.com/apache/skywalking-banyandb/pkg/logger"
+	"github.com/apache/skywalking-banyandb/pkg/posting"
 )
 
 var (
@@ -95,6 +96,24 @@ func TSSWithLogger(l *logger.Logger) TimeSeriesOptions {
 	}
 }
 
+type Iterator interface {
+	Next()
+	Rewind()
+	Seek(key []byte)
+	Key() []byte
+	Val() posting.List // posting list stored under Key()
+	Valid() bool
+	Close() error
+}
+
+type HandoverCallback func() // NOTE(review): not referenced anywhere in this change
+
+type IndexStore interface {
+	Handover(iterator Iterator) error
+	Seek(key []byte, limit int) (posting.List, error) // union of the lists stored under key; may stop early once more than limit IDs are found
+	Close() error
+}
+
 // OpenTimeSeriesStore creates a new TimeSeriesStore
 func OpenTimeSeriesStore(shardID int, path string, compressLevel int, valueSize int, options ...TimeSeriesOptions) (TimeSeriesStore, error) {
 	btss := new(badgerTSS)
@@ -104,6 +123,8 @@ func OpenTimeSeriesStore(shardID int, path string, compressLevel int, valueSize
 		opt(btss)
 	}
 	btss.dbOpts = btss.dbOpts.WithMaxLevels(1)
+	// Put all values into LSM
+	btss.dbOpts = btss.dbOpts.WithVLogPercentile(1.0)
 	var err error
 	btss.db, err = badger.Open(btss.dbOpts)
 	if err != nil {
@@ -126,6 +147,26 @@ func StoreWithLogger(l *logger.Logger) StoreOptions {
 	}
 }
 
+// StoreWithBufferSize sets the memtable size of the underlying Store
+func StoreWithBufferSize(size int64) StoreOptions {
+	return func(store Store) {
+		if bdb, ok := store.(*badgerDB); ok {
+			bdb.dbOpts = bdb.dbOpts.WithMemTableSize(size)
+		}
+	}
+}
+
+type FlushCallback func() // NOTE(review): presumably invoked by the badger fork after each memtable flush
+
+// StoreWithFlushCallback registers callback as the FlushCallBack of the underlying Store
+func StoreWithFlushCallback(callback FlushCallback) StoreOptions {
+	return func(store Store) {
+		if bdb, ok := store.(*badgerDB); ok {
+			bdb.dbOpts.FlushCallBack = callback
+		}
+	}
+}
+
 // OpenStore creates a new Store
 func OpenStore(shardID int, path string, options ...StoreOptions) (Store, error) {
 	bdb := new(badgerDB)
@@ -139,13 +180,40 @@ func OpenStore(shardID int, path string, options ...StoreOptions) (Store, error)
 	var err error
 	bdb.db, err = badger.Open(bdb.dbOpts)
 	if err != nil {
-		return nil, fmt.Errorf("failed to open time series store: %v", err)
+		return nil, fmt.Errorf("failed to open normal store: %w", err)
 	}
-	if bdb.seqKey != "" {
-		bdb.seq, err = bdb.db.GetSequence([]byte(bdb.seqKey), 100)
-		if err != nil {
-			return nil, fmt.Errorf("failed to get sequence: %v", err)
+	return bdb, nil
+}
+
+type IndexOptions func(store IndexStore) // configures an IndexStore before OpenIndexStore opens it
+
+// IndexWithLogger sets an external logger into the underlying IndexStore
+func IndexWithLogger(l *logger.Logger) IndexOptions {
+	return func(store IndexStore) {
+		if bdb, ok := store.(*badgerDB); ok {
+			bdb.dbOpts = bdb.dbOpts.WithLogger(&badgerLog{
+				delegated: l.Named("index-kv"),
+			})
 		}
 	}
+}
+
+// OpenIndexStore creates a new IndexStore
+func OpenIndexStore(shardID int, path string, options ...IndexOptions) (IndexStore, error) {
+	bdb := new(badgerDB)
+	bdb.shardID = shardID
+	bdb.dbOpts = badger.DefaultOptions(path)
+	for _, opt := range options {
+		opt(bdb)
+	}
+	bdb.dbOpts = bdb.dbOpts.WithMaxLevels(1)
+	// Put all values into LSM
+	bdb.dbOpts = bdb.dbOpts.WithVLogPercentile(1.0)
+
+	var err error
+	bdb.db, err = badger.Open(bdb.dbOpts)
+	if err != nil {
+		return nil, fmt.Errorf("failed to open index store: %w", err)
+	}
 	return bdb, nil
 }
diff --git a/banyand/storage/block.go b/banyand/storage/block.go
index 8bbf50f..76a8b1f 100644
--- a/banyand/storage/block.go
+++ b/banyand/storage/block.go
@@ -30,8 +30,9 @@ type block struct {
 
 	l *logger.Logger
 
-	stores   map[string]kv.Store
-	tsStores map[string]kv.TimeSeriesStore
+	stores      map[string]kv.Store
+	tsStores    map[string]kv.TimeSeriesStore
+	indexStores map[string]kv.IndexStore // index stores opened by createKV, keyed by storeID like stores and tsStores
 
 	shardID int
 }
@@ -39,12 +40,13 @@ type block struct {
 func newBlock(shardID int, path string, plugin Plugin) (*block, error) {
 	l := logger.GetLogger("block")
 	return &block{
-		shardID:  shardID,
-		path:     path,
-		plugin:   plugin,
-		l:        l,
-		stores:   make(map[string]kv.Store),
-		tsStores: make(map[string]kv.TimeSeriesStore),
+		shardID:     shardID,
+		path:        path,
+		plugin:      plugin,
+		l:           l,
+		stores:      make(map[string]kv.Store),
+		tsStores:    make(map[string]kv.TimeSeriesStore),
+		indexStores: make(map[string]kv.IndexStore), // populated by createKV for KVTypeIndex specs
 	}, nil
 }
 
@@ -69,6 +71,12 @@ func (b *block) createKV(defines []KVSpec) (err error) {
 		case KVTypeNormal:
 			var s kv.Store
 			opts := make([]kv.StoreOptions, 0)
+			if define.BufferSize > 0 {
+				opts = append(opts, kv.StoreWithBufferSize(define.BufferSize))
+			}
+			if define.FlushCallback != nil {
+				opts = append(opts, kv.StoreWithFlushCallback(define.FlushCallback))
+			}
 			opts = append(opts, kv.StoreWithLogger(b.l))
 			if s, err = kv.OpenStore(b.shardID, path, opts...); err != nil {
 				return fmt.Errorf("failed to open normal store: %w", err)
@@ -76,10 +84,17 @@ func (b *block) createKV(defines []KVSpec) (err error) {
 			b.stores[storeID] = s
 		case KVTypeTimeSeries:
 			var s kv.TimeSeriesStore
-			if s, err = kv.OpenTimeSeriesStore(b.shardID, path, define.CompressLevel, define.ValueSize, kv.TSSWithLogger(b.l)); err != nil {
+			if s, err = kv.OpenTimeSeriesStore(b.shardID, path, define.CompressLevel, define.ValueSize,
+				kv.TSSWithLogger(b.l)); err != nil {
 				return fmt.Errorf("failed to open time series store: %w", err)
 			}
 			b.tsStores[storeID] = s
+		case KVTypeIndex:
+			var s kv.IndexStore
+			if s, err = kv.OpenIndexStore(b.shardID, path, kv.IndexWithLogger(b.l)); err != nil {
+				return fmt.Errorf("failed to open index store: %w", err)
+			}
+			b.indexStores[storeID] = s
 		}
 	}
 	return nil
@@ -92,4 +107,7 @@ func (b *block) close() {
 	for _, store := range b.tsStores {
 		_ = store.Close()
 	}
+	for _, store := range b.indexStores {
+		_ = store.Close() // close errors are ignored, as for the other store kinds
+	}
 }
diff --git a/banyand/storage/database.go b/banyand/storage/database.go
index f69af0f..9302ab0 100644
--- a/banyand/storage/database.go
+++ b/banyand/storage/database.go
@@ -127,6 +127,15 @@ func (s *series) TimeSeriesReader(shard uint, name string, start, end uint64) kv
 	return b.tsStores[name]
 }
 
+func (s *series) Index(shard uint, name string) kv.IndexStore {
+	// TODO: find targets in all blocks; for now only the shard's active block is searched
+	b, ok := s.sLst[shard].activeBlock.Load().(*block)
+	if !ok {
+		return nil
+	}
+	return b.indexStores[name] // nil when no index store is registered under name
+}
+
 func (s *series) load(meta PluginMeta) error {
 	//TODO: to implement load instead of removing old contents
 	return os.RemoveAll(s.location)
diff --git a/banyand/storage/database_test.go b/banyand/storage/database_test.go
index 754c25b..89c910e 100644
--- a/banyand/storage/database_test.go
+++ b/banyand/storage/database_test.go
@@ -29,7 +29,12 @@ import (
 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
 
+	"github.com/apache/skywalking-banyandb/api/common"
+	"github.com/apache/skywalking-banyandb/banyand/kv"
+	"github.com/apache/skywalking-banyandb/pkg/convert"
 	"github.com/apache/skywalking-banyandb/pkg/logger"
+	"github.com/apache/skywalking-banyandb/pkg/posting"
+	"github.com/apache/skywalking-banyandb/pkg/posting/roaring"
 )
 
 func TestDB_Create_Directory(t *testing.T) {
@@ -59,6 +64,7 @@ func TestDB_Create_Directory(t *testing.T) {
 }
 
 func TestDB_Store(t *testing.T) {
+	is := require.New(t)
 	ctrl := gomock.NewController(t)
 	defer ctrl.Finish()
 	now := uint64(time.Now().UnixNano())
@@ -75,18 +81,137 @@ func TestDB_Store(t *testing.T) {
 		removeDir(tempDir)
 	}()
 
-	assert.NoError(t, ap.Writer(0, "normal").Put([]byte("key1"), []byte{12}))
+	is.NoError(ap.Writer(0, "normal").Put([]byte("key1"), []byte{12}))
 	val, err := repo.Reader(0, "normal", now, now).Get([]byte("key1"))
-	assert.NoError(t, err)
-	assert.Equal(t, []byte{12}, val)
+	is.NoError(err)
+	is.Equal([]byte{12}, val)
 
-	assert.NoError(t, ap.TimeSeriesWriter(1, "time-series").Put([]byte("key11"), []byte{33}, 1))
+	is.NoError(ap.TimeSeriesWriter(1, "time-series").Put([]byte("key11"), []byte{33}, 1))
 	val, err = repo.TimeSeriesReader(1, "time-series", now, now).Get([]byte("key11"), 1)
-	assert.NoError(t, err)
-	assert.Equal(t, []byte{33}, val)
+	is.NoError(err)
+	is.Equal([]byte{33}, val)
 	vals, allErr := repo.TimeSeriesReader(1, "time-series", now, now).GetAll([]byte("key11"))
-	assert.NoError(t, allErr)
-	assert.Equal(t, [][]byte{{33}}, vals)
+	is.NoError(allErr)
+	is.Equal([][]byte{{33}}, vals)
+
+	index := repo.Index(1, "index")
+	is.NoError(index.Handover(mockMemtable([]uint64{1, 2}, []uint64{3, 6}))) // key 0 -> {1, 2}, key 1 -> {3, 6}
+	list, err := index.Seek(convert.Int64ToBytes(0), 2)
+	is.NoError(err)
+	is.Equal(2, list.Len())
+	is.True(list.Contains(common.ChunkID(1)))
+	is.True(list.Contains(common.ChunkID(2)))
+	list, err = index.Seek(convert.Int64ToBytes(1), 2)
+	is.NoError(err)
+	is.Equal(2, list.Len())
+	is.True(list.Contains(common.ChunkID(3)))
+	is.True(list.Contains(common.ChunkID(6)))
+
+	is.NoError(index.Handover(mockMemtable([]uint64{11, 14}))) // a second version of key 0; Seek must union it with the first
+	list, err = index.Seek(convert.Int64ToBytes(0), 2)
+	is.NoError(err)
+	is.Equal(4, list.Len())
+	is.True(list.Contains(common.ChunkID(1)))
+	is.True(list.Contains(common.ChunkID(2)))
+	is.True(list.Contains(common.ChunkID(11)))
+	is.True(list.Contains(common.ChunkID(14)))
+}
+
+func TestDB_FlushCallback(t *testing.T) {
+	is := require.New(t)
+	ctrl := gomock.NewController(t)
+	defer ctrl.Finish()
+	now := uint64(time.Now().UnixNano())
+	var ap WritePoint
+	// no StoreRepo is needed: this test only writes through ap
+	p := NewMockPlugin(ctrl)
+	latch := make(chan struct{})
+	closed := false // keeps latch from being closed twice if several flushes run
+	p.EXPECT().Meta().Return(PluginMeta{
+		ID:          "sw",
+		Group:       "default",
+		ShardNumber: 2,
+		KVSpecs: []KVSpec{
+			{
+				Name:       "normal",
+				Type:       KVTypeNormal,
+				BufferSize: 10 << 20,
+				FlushCallback: func() {
+					if closed {
+						return
+					}
+					close(latch)
+					closed = true
+				},
+			},
+		},
+	}).AnyTimes()
+	p.EXPECT().Init(gomock.Any(), gomock.Any()).Do(func(r StoreRepo, wp GetWritePoint) {
+		ap = wp(now)
+	}).AnyTimes()
+
+	tempDir, db := setUp(t, p)
+	defer func() {
+		db.GracefulStop()
+		removeDir(tempDir)
+	}()
+	for i := 0; i < 5000; i++ { // keys of 0..4999 bytes, ~12 MB in total, exceed the 10 MiB BufferSize so a flush is expected
+		key := make([]byte, i)
+		_ = ap.Writer(0, "normal").Put(key, []byte{1})
+	}
+	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
+	defer cancel()
+	select {
+	case <-latch:
+	case <-ctx.Done():
+		is.Fail("timeout")
+	}
+}
+
+var _ kv.Iterator = (*iter)(nil)
+
+type iter struct {
+	data map[int]posting.List // entry i is served under key convert.Int64ToBytes(i)
+	p    int                  // current position
+}
+
+func (i *iter) Next() {
+	i.p++
+}
+
+func (i *iter) Rewind() {
+	i.p = 0
+}
+
+func (i *iter) Seek(key []byte) {
+	panic("implement me") // NOTE(review): unimplemented; assumes Handover never seeks the source iterator
+}
+
+func (i *iter) Key() []byte {
+	return convert.Int64ToBytes(int64(i.p))
+}
+
+func (i *iter) Val() posting.List {
+	return i.data[i.p]
+}
+
+func (i *iter) Valid() bool {
+	_, ok := i.data[i.p]
+	return ok
+}
+
+func (i *iter) Close() error {
+	return nil
+}
+
+func mockMemtable(data ...[]uint64) kv.Iterator { // the i-th argument becomes the posting list under key convert.Int64ToBytes(i)
+	it := &iter{
+		data: make(map[int]posting.List),
+	}
+	for i, d := range data {
+		it.data[i] = roaring.NewPostingListWithInitialData(d...)
+	}
+	return it
 }
 
 func mockPlugin(ctrl *gomock.Controller, f func(repo StoreRepo, get GetWritePoint)) Plugin {
@@ -105,6 +230,10 @@ func mockPlugin(ctrl *gomock.Controller, f func(repo StoreRepo, get GetWritePoin
 				Type:          KVTypeTimeSeries,
 				CompressLevel: 3,
 			},
+			{
+				Name: "index",
+				Type: KVTypeIndex,
+			},
 		},
 	}).AnyTimes()
 	p.EXPECT().Init(gomock.Any(), gomock.Any()).Do(func(r StoreRepo, wp GetWritePoint) {
diff --git a/banyand/storage/storage.go b/banyand/storage/storage.go
index 8ac58ec..6219842 100644
--- a/banyand/storage/storage.go
+++ b/banyand/storage/storage.go
@@ -35,6 +35,8 @@ const (
 	KVTypeNormal KVType = 0
 	// KVTypeTimeSeries is a time-series KV storage
 	KVTypeTimeSeries KVType = 1
+	// KVTypeIndex is an index KV storage
+	KVTypeIndex KVType = 2
 )
 
 // Database is the storage manager which implements the physical data model
@@ -82,13 +84,16 @@ type CompressSpec struct {
 type KVSpec struct {
 	Name          string
 	Type          KVType
+	BufferSize    int64 // memtable size for KVTypeNormal stores (see StoreWithBufferSize); values <= 0 keep the default
 	CompressLevel int
 	ValueSize     int
+	FlushCallback kv.FlushCallback // forwarded to KVTypeNormal stores via StoreWithFlushCallback; nil disables it
 }
 
 type StoreRepo interface {
 	Reader(shard uint, name string, start, end uint64) kv.Reader
 	TimeSeriesReader(shard uint, name string, start, end uint64) kv.TimeSeriesReader
+	Index(shard uint, name string) kv.IndexStore // index store named name in the shard's active block, or nil
 }
 
 // WritePoint is a reference to a underlying area.
diff --git a/go.mod b/go.mod
index 864feed..65674a5 100644
--- a/go.mod
+++ b/go.mod
@@ -27,4 +27,4 @@ require (
 	google.golang.org/protobuf v1.27.1
 )
 
-replace github.com/dgraph-io/badger/v3 v3.2011.1 => github.com/SkyAPM/badger/v3 v3.0.0-20210808063906-49c6d778ad7d
+replace github.com/dgraph-io/badger/v3 v3.2011.1 => github.com/SkyAPM/badger/v3 v3.0.0-20210809093509-ff1b2dd81165
diff --git a/go.sum b/go.sum
index 90495e8..14301da 100644
--- a/go.sum
+++ b/go.sum
@@ -45,8 +45,8 @@ github.com/RoaringBitmap/gocroaring v0.4.0/go.mod h1:NieMwz7ZqwU2DD73/vvYwv7r4eW
 github.com/RoaringBitmap/real-roaring-datasets v0.0.0-20190726190000-eb7c87156f76/go.mod h1:oM0MHmQ3nDsq609SS36p+oYbRi16+oVvU2Bw4Ipv0SE=
 github.com/RoaringBitmap/roaring v0.9.1 h1:5PRizBmoN/PfV17nPNQou4dHQ7NcJi8FO/bihdYyCEM=
 github.com/RoaringBitmap/roaring v0.9.1/go.mod h1:h1B7iIUOmnAeb5ytYMvnHJwxMc6LUrwBnzXWRuqTQUc=
-github.com/SkyAPM/badger/v3 v3.0.0-20210808063906-49c6d778ad7d h1:6B7YHxp79aOfRuMXfWdSyNSZEDqJqSzQWWkQcmqyI9s=
-github.com/SkyAPM/badger/v3 v3.0.0-20210808063906-49c6d778ad7d/go.mod h1:dULbq6ehJ5K0cGW/1TQ9iSfUk0gbSiToDWmWmTsJ53E=
+github.com/SkyAPM/badger/v3 v3.0.0-20210809093509-ff1b2dd81165 h1:csoTNiGUMtp4H1AchgaZWJ4WY4uJQ6s+pz3sXS93jAA=
+github.com/SkyAPM/badger/v3 v3.0.0-20210809093509-ff1b2dd81165/go.mod h1:dULbq6ehJ5K0cGW/1TQ9iSfUk0gbSiToDWmWmTsJ53E=
 github.com/antihax/optional v1.0.0/go.mod h1:uupD/76wgC+ih3iEmQUL+0Ugr19nfwCT1kdvxnR2qWY=
 github.com/armon/circbuf v0.0.0-20150827004946-bbbad097214e/go.mod h1:3U/XgcO3hCbHZ8TKRvWD2dDTCfh9M9ya+I9JpbB7O8o=
 github.com/armon/consul-api v0.0.0-20180202201655-eb2c6b5be1b6/go.mod h1:grANhF5doyWs3UAsr3K4I6qtAmlQcZDesFNEHPZAzj8=
diff --git a/pkg/posting/posting.go b/pkg/posting/posting.go
index 2d7ad16..d997ab0 100644
--- a/pkg/posting/posting.go
+++ b/pkg/posting/posting.go
@@ -60,6 +60,10 @@ type List interface {
 	Reset()
 
 	ToSlice() []common.ChunkID
+
+	Marshall() ([]byte, error) // encodes the list to bytes
+
+	Unmarshall(data []byte) error // decodes bytes produced by Marshall into the list
 }
 
 type Iterator interface {
diff --git a/pkg/posting/roaring/roaring.go b/pkg/posting/roaring/roaring.go
index 0b1c9b6..63eb2bc 100644
--- a/pkg/posting/roaring/roaring.go
+++ b/pkg/posting/roaring/roaring.go
@@ -40,6 +40,14 @@ type postingsList struct {
 	bitmap *roaring64.Bitmap
 }
 
+func (p *postingsList) Marshall() ([]byte, error) {
+	return p.bitmap.MarshalBinary() // roaring64 binary encoding; Unmarshall decodes it
+}
+
+func (p *postingsList) Unmarshall(data []byte) error {
+	return p.bitmap.UnmarshalBinary(data) // expects bytes produced by Marshall
+}
+
 func NewPostingList() posting.List {
 	return &postingsList{
 		bitmap: roaring64.New(),