You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by ha...@apache.org on 2021/08/10 01:57:36 UTC

[skywalking-banyandb] 03/03: Introduce index store

This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch storage-table
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git

commit 2424a8236c656da3395f6c59b8f0a5d20e5c181a
Author: Gao Hongtao <ha...@gmail.com>
AuthorDate: Tue Aug 10 09:56:28 2021 +0800

    Introduce index store
    
    Signed-off-by: Gao Hongtao <ha...@gmail.com>
---
 banyand/kv/badger.go             | 103 +++++++++++++++++++++++++--
 banyand/kv/kv.go                 |  78 +++++++++++++++++++--
 banyand/storage/block.go         |  36 +++++++---
 banyand/storage/database.go      |   9 +++
 banyand/storage/database_test.go | 145 ++++++++++++++++++++++++++++++++++++---
 banyand/storage/storage.go       |   5 ++
 go.mod                           |   2 +-
 go.sum                           |   4 +-
 pkg/posting/posting.go           |   4 ++
 pkg/posting/roaring/roaring.go   |   8 +++
 10 files changed, 365 insertions(+), 29 deletions(-)

diff --git a/banyand/kv/badger.go b/banyand/kv/badger.go
index 05b4e51..64c9f29 100644
--- a/banyand/kv/badger.go
+++ b/banyand/kv/badger.go
@@ -18,16 +18,27 @@
 package kv
 
 import (
+	"bytes"
 	"log"
 	"math"
+	"time"
 
 	"github.com/dgraph-io/badger/v3"
 	"github.com/dgraph-io/badger/v3/y"
+	"go.uber.org/multierr"
 
 	"github.com/apache/skywalking-banyandb/pkg/logger"
+	"github.com/apache/skywalking-banyandb/pkg/posting"
+	"github.com/apache/skywalking-banyandb/pkg/posting/roaring"
 )
 
-var _ TimeSeriesStore = (*badgerTSS)(nil)
+var (
+	_             Store           = (*badgerDB)(nil)
+	_             IndexStore      = (*badgerDB)(nil)
+	_             y.Iterator      = (*mergedIter)(nil)
+	_             TimeSeriesStore = (*badgerTSS)(nil)
+	bitMergeEntry byte            = 1 << 3
+)
 
 type badgerTSS struct {
 	shardID int
@@ -43,14 +54,98 @@ func (b *badgerTSS) Close() error {
 	return nil
 }
 
-var _ Store = (*badgerDB)(nil)
+type mergedIter struct {
+	delegated Iterator
+	valid     bool
+	data      []byte
+}
+
+func (i *mergedIter) Next() {
+	i.delegated.Next()
+	i.parseData()
+}
+
+func (i *mergedIter) Rewind() {
+	i.delegated.Rewind()
+	i.parseData()
+}
+
+func (i *mergedIter) Seek(key []byte) {
+	i.delegated.Seek(y.KeyWithTs(key, math.MaxInt64))
+}
+
+func (i *mergedIter) Key() []byte {
+	return y.KeyWithTs(i.delegated.Key(), uint64(time.Now().UnixNano()))
+}
+
+func (i *mergedIter) Valid() bool {
+	return i.valid
+}
+
+func (i *mergedIter) parseData() {
+	i.data = nil
+	i.valid = i.delegated.Valid()
+	if !i.valid {
+		return
+	}
+	data, err := i.delegated.Val().Marshall()
+	if err != nil {
+		i.valid = false
+		return
+	}
+	i.data = data
+}
+
+func (i *mergedIter) Close() error {
+	i.data = nil
+	i.valid = false
+	return i.delegated.Close()
+}
+
+func (i mergedIter) Value() y.ValueStruct {
+	return y.ValueStruct{
+		Value: i.data,
+		Meta:  bitMergeEntry,
+	}
+}
 
 type badgerDB struct {
 	shardID int
 	dbOpts  badger.Options
 	db      *badger.DB
-	seqKey  string
-	seq     *badger.Sequence
+}
+
+func (b *badgerDB) Handover(iterator Iterator) error {
+	return b.db.HandoverIterator(&mergedIter{
+		delegated: iterator,
+	})
+}
+
+func (b *badgerDB) Seek(key []byte, limit int) (posting.List, error) {
+	opts := badger.DefaultIteratorOptions
+	it := b.db.NewIterator(opts)
+	defer func() {
+		_ = it.Close()
+	}()
+	result := roaring.NewPostingList()
+	var errMerged error
+	for it.Seek(y.KeyWithTs(key, math.MaxInt64)); it.Valid(); it.Next() {
+		k := y.ParseKey(it.Key())
+		if !bytes.Equal(key, k) {
+			break
+		}
+		list := roaring.NewPostingList()
+		err := list.Unmarshall(it.Value().Value)
+		if err != nil {
+			errMerged = multierr.Append(errMerged, err)
+			continue
+		}
+		_ = result.Union(list)
+		if result.Len() > limit {
+			break
+		}
+	}
+	return result, errMerged
 }
 
 func (b *badgerDB) Scan(key []byte, opt ScanOpts, f ScanFunc) error {
diff --git a/banyand/kv/kv.go b/banyand/kv/kv.go
index f3a2d55..67d6938 100644
--- a/banyand/kv/kv.go
+++ b/banyand/kv/kv.go
@@ -25,6 +25,7 @@ import (
 	"github.com/pkg/errors"
 
 	"github.com/apache/skywalking-banyandb/pkg/logger"
+	"github.com/apache/skywalking-banyandb/pkg/posting"
 )
 
 var (
@@ -95,6 +96,24 @@ func TSSWithLogger(l *logger.Logger) TimeSeriesOptions {
 	}
 }
 
+type Iterator interface {
+	Next()
+	Rewind()
+	Seek(key []byte)
+	Key() []byte
+	Val() posting.List
+	Valid() bool
+	Close() error
+}
+
+type HandoverCallback func()
+
+type IndexStore interface {
+	Handover(iterator Iterator) error
+	Seek(key []byte, limit int) (posting.List, error)
+	Close() error
+}
+
 // OpenTimeSeriesStore creates a new TimeSeriesStore
 func OpenTimeSeriesStore(shardID int, path string, compressLevel int, valueSize int, options ...TimeSeriesOptions) (TimeSeriesStore, error) {
 	btss := new(badgerTSS)
@@ -104,6 +123,8 @@ func OpenTimeSeriesStore(shardID int, path string, compressLevel int, valueSize
 		opt(btss)
 	}
 	btss.dbOpts = btss.dbOpts.WithMaxLevels(1)
+	// Put all values into LSM
+	btss.dbOpts = btss.dbOpts.WithVLogPercentile(1.0)
 	var err error
 	btss.db, err = badger.Open(btss.dbOpts)
 	if err != nil {
@@ -126,6 +147,26 @@ func StoreWithLogger(l *logger.Logger) StoreOptions {
 	}
 }
 
+// StoreWithBufferSize sets a external logger into underlying Store
+func StoreWithBufferSize(size int64) StoreOptions {
+	return func(store Store) {
+		if bdb, ok := store.(*badgerDB); ok {
+			bdb.dbOpts = bdb.dbOpts.WithMemTableSize(size)
+		}
+	}
+}
+
+type FlushCallback func()
+
+// StoreWithFlushCallback sets a callback function
+func StoreWithFlushCallback(callback FlushCallback) StoreOptions {
+	return func(store Store) {
+		if bdb, ok := store.(*badgerDB); ok {
+			bdb.dbOpts.FlushCallBack = callback
+		}
+	}
+}
+
 // OpenStore creates a new Store
 func OpenStore(shardID int, path string, options ...StoreOptions) (Store, error) {
 	bdb := new(badgerDB)
@@ -139,13 +180,40 @@ func OpenStore(shardID int, path string, options ...StoreOptions) (Store, error)
 	var err error
 	bdb.db, err = badger.Open(bdb.dbOpts)
 	if err != nil {
-		return nil, fmt.Errorf("failed to open time series store: %v", err)
+		return nil, fmt.Errorf("failed to open normal store: %v", err)
 	}
-	if bdb.seqKey != "" {
-		bdb.seq, err = bdb.db.GetSequence([]byte(bdb.seqKey), 100)
-		if err != nil {
-			return nil, fmt.Errorf("failed to get sequence: %v", err)
+	return bdb, nil
+}
+
+type IndexOptions func(store IndexStore)
+
+// IndexWithLogger sets a external logger into underlying IndexStore
+func IndexWithLogger(l *logger.Logger) IndexOptions {
+	return func(store IndexStore) {
+		if bdb, ok := store.(*badgerDB); ok {
+			bdb.dbOpts = bdb.dbOpts.WithLogger(&badgerLog{
+				delegated: l.Named("index-kv"),
+			})
 		}
 	}
+}
+
+// OpenIndexStore creates a new IndexStore
+func OpenIndexStore(shardID int, path string, options ...IndexOptions) (IndexStore, error) {
+	bdb := new(badgerDB)
+	bdb.shardID = shardID
+	bdb.dbOpts = badger.DefaultOptions(path)
+	for _, opt := range options {
+		opt(bdb)
+	}
+	bdb.dbOpts = bdb.dbOpts.WithMaxLevels(1)
+	// Put all values into LSM
+	bdb.dbOpts = bdb.dbOpts.WithVLogPercentile(1.0)
+
+	var err error
+	bdb.db, err = badger.Open(bdb.dbOpts)
+	if err != nil {
+		return nil, fmt.Errorf("failed to index store: %v", err)
+	}
 	return bdb, nil
 }
diff --git a/banyand/storage/block.go b/banyand/storage/block.go
index 8bbf50f..76a8b1f 100644
--- a/banyand/storage/block.go
+++ b/banyand/storage/block.go
@@ -30,8 +30,9 @@ type block struct {
 
 	l *logger.Logger
 
-	stores   map[string]kv.Store
-	tsStores map[string]kv.TimeSeriesStore
+	stores      map[string]kv.Store
+	tsStores    map[string]kv.TimeSeriesStore
+	indexStores map[string]kv.IndexStore
 
 	shardID int
 }
@@ -39,12 +40,13 @@ type block struct {
 func newBlock(shardID int, path string, plugin Plugin) (*block, error) {
 	l := logger.GetLogger("block")
 	return &block{
-		shardID:  shardID,
-		path:     path,
-		plugin:   plugin,
-		l:        l,
-		stores:   make(map[string]kv.Store),
-		tsStores: make(map[string]kv.TimeSeriesStore),
+		shardID:     shardID,
+		path:        path,
+		plugin:      plugin,
+		l:           l,
+		stores:      make(map[string]kv.Store),
+		tsStores:    make(map[string]kv.TimeSeriesStore),
+		indexStores: make(map[string]kv.IndexStore),
 	}, nil
 }
 
@@ -69,6 +71,12 @@ func (b *block) createKV(defines []KVSpec) (err error) {
 		case KVTypeNormal:
 			var s kv.Store
 			opts := make([]kv.StoreOptions, 0)
+			if define.BufferSize > 0 {
+				opts = append(opts, kv.StoreWithBufferSize(define.BufferSize))
+			}
+			if define.FlushCallback != nil {
+				opts = append(opts, kv.StoreWithFlushCallback(define.FlushCallback))
+			}
 			opts = append(opts, kv.StoreWithLogger(b.l))
 			if s, err = kv.OpenStore(b.shardID, path, opts...); err != nil {
 				return fmt.Errorf("failed to open normal store: %w", err)
@@ -76,10 +84,17 @@ func (b *block) createKV(defines []KVSpec) (err error) {
 			b.stores[storeID] = s
 		case KVTypeTimeSeries:
 			var s kv.TimeSeriesStore
-			if s, err = kv.OpenTimeSeriesStore(b.shardID, path, define.CompressLevel, define.ValueSize, kv.TSSWithLogger(b.l)); err != nil {
+			if s, err = kv.OpenTimeSeriesStore(b.shardID, path, define.CompressLevel, define.ValueSize,
+				kv.TSSWithLogger(b.l)); err != nil {
 				return fmt.Errorf("failed to open time series store: %w", err)
 			}
 			b.tsStores[storeID] = s
+		case KVTypeIndex:
+			var s kv.IndexStore
+			if s, err = kv.OpenIndexStore(b.shardID, path, kv.IndexWithLogger(b.l)); err != nil {
+				return fmt.Errorf("failed to open time series store: %w", err)
+			}
+			b.indexStores[storeID] = s
 		}
 	}
 	return nil
@@ -92,4 +107,7 @@ func (b *block) close() {
 	for _, store := range b.tsStores {
 		_ = store.Close()
 	}
+	for _, store := range b.indexStores {
+		_ = store.Close()
+	}
 }
diff --git a/banyand/storage/database.go b/banyand/storage/database.go
index f69af0f..9302ab0 100644
--- a/banyand/storage/database.go
+++ b/banyand/storage/database.go
@@ -127,6 +127,15 @@ func (s *series) TimeSeriesReader(shard uint, name string, start, end uint64) kv
 	return b.tsStores[name]
 }
 
+func (s *series) Index(shard uint, name string) kv.IndexStore {
+	//TODO: find targets in all blocks
+	b, ok := s.sLst[shard].activeBlock.Load().(*block)
+	if !ok {
+		return nil
+	}
+	return b.indexStores[name]
+}
+
 func (s *series) load(meta PluginMeta) error {
 	//TODO: to implement load instead of removing old contents
 	return os.RemoveAll(s.location)
diff --git a/banyand/storage/database_test.go b/banyand/storage/database_test.go
index 754c25b..89c910e 100644
--- a/banyand/storage/database_test.go
+++ b/banyand/storage/database_test.go
@@ -29,7 +29,12 @@ import (
 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
 
+	"github.com/apache/skywalking-banyandb/api/common"
+	"github.com/apache/skywalking-banyandb/banyand/kv"
+	"github.com/apache/skywalking-banyandb/pkg/convert"
 	"github.com/apache/skywalking-banyandb/pkg/logger"
+	"github.com/apache/skywalking-banyandb/pkg/posting"
+	"github.com/apache/skywalking-banyandb/pkg/posting/roaring"
 )
 
 func TestDB_Create_Directory(t *testing.T) {
@@ -59,6 +64,7 @@ func TestDB_Create_Directory(t *testing.T) {
 }
 
 func TestDB_Store(t *testing.T) {
+	is := require.New(t)
 	ctrl := gomock.NewController(t)
 	defer ctrl.Finish()
 	now := uint64(time.Now().UnixNano())
@@ -75,18 +81,137 @@ func TestDB_Store(t *testing.T) {
 		removeDir(tempDir)
 	}()
 
-	assert.NoError(t, ap.Writer(0, "normal").Put([]byte("key1"), []byte{12}))
+	is.NoError(ap.Writer(0, "normal").Put([]byte("key1"), []byte{12}))
 	val, err := repo.Reader(0, "normal", now, now).Get([]byte("key1"))
-	assert.NoError(t, err)
-	assert.Equal(t, []byte{12}, val)
+	is.NoError(err)
+	is.Equal([]byte{12}, val)
 
-	assert.NoError(t, ap.TimeSeriesWriter(1, "time-series").Put([]byte("key11"), []byte{33}, 1))
+	is.NoError(ap.TimeSeriesWriter(1, "time-series").Put([]byte("key11"), []byte{33}, 1))
 	val, err = repo.TimeSeriesReader(1, "time-series", now, now).Get([]byte("key11"), 1)
-	assert.NoError(t, err)
-	assert.Equal(t, []byte{33}, val)
+	is.NoError(err)
+	is.Equal([]byte{33}, val)
 	vals, allErr := repo.TimeSeriesReader(1, "time-series", now, now).GetAll([]byte("key11"))
-	assert.NoError(t, allErr)
-	assert.Equal(t, [][]byte{{33}}, vals)
+	is.NoError(allErr)
+	is.Equal([][]byte{{33}}, vals)
+
+	index := repo.Index(1, "index")
+	is.NoError(index.Handover(mockMemtable([]uint64{1, 2}, []uint64{3, 6})))
+	list, err := index.Seek(convert.Int64ToBytes(0), 2)
+	is.NoError(err)
+	is.Equal(2, list.Len())
+	is.True(list.Contains(common.ChunkID(1)))
+	is.True(list.Contains(common.ChunkID(2)))
+	list, err = index.Seek(convert.Int64ToBytes(1), 2)
+	is.NoError(err)
+	is.Equal(2, list.Len())
+	is.True(list.Contains(common.ChunkID(3)))
+	is.True(list.Contains(common.ChunkID(6)))
+
+	is.NoError(index.Handover(mockMemtable([]uint64{11, 14})))
+	list, err = index.Seek(convert.Int64ToBytes(0), 2)
+	is.NoError(err)
+	is.Equal(4, list.Len())
+	is.True(list.Contains(common.ChunkID(1)))
+	is.True(list.Contains(common.ChunkID(2)))
+	is.True(list.Contains(common.ChunkID(11)))
+	is.True(list.Contains(common.ChunkID(14)))
+}
+
+func TestDB_FlushCallback(t *testing.T) {
+	is := require.New(t)
+	ctrl := gomock.NewController(t)
+	defer ctrl.Finish()
+	now := uint64(time.Now().UnixNano())
+	var ap WritePoint
+	//var repo StoreRepo
+	p := NewMockPlugin(ctrl)
+	latch := make(chan struct{})
+	closed := false
+	p.EXPECT().Meta().Return(PluginMeta{
+		ID:          "sw",
+		Group:       "default",
+		ShardNumber: 2,
+		KVSpecs: []KVSpec{
+			{
+				Name:       "normal",
+				Type:       KVTypeNormal,
+				BufferSize: 10 << 20,
+				FlushCallback: func() {
+					if closed {
+						return
+					}
+					close(latch)
+					closed = true
+				},
+			},
+		},
+	}).AnyTimes()
+	p.EXPECT().Init(gomock.Any(), gomock.Any()).Do(func(r StoreRepo, wp GetWritePoint) {
+		ap = wp(now)
+	}).AnyTimes()
+
+	tempDir, db := setUp(t, p)
+	defer func() {
+		db.GracefulStop()
+		removeDir(tempDir)
+	}()
+	for i := 0; i < 5000; i++ {
+		key := make([]byte, i)
+		_ = ap.Writer(0, "normal").Put(key, []byte{1})
+	}
+	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
+	defer cancel()
+	select {
+	case <-latch:
+	case <-ctx.Done():
+		is.Fail("timeout")
+	}
+}
+
+var _ kv.Iterator = (*iter)(nil)
+
+type iter struct {
+	data map[int]posting.List
+	p    int
+}
+
+func (i *iter) Next() {
+	i.p++
+}
+
+func (i *iter) Rewind() {
+	i.p = 0
+}
+
+func (i *iter) Seek(key []byte) {
+	panic("implement me")
+}
+
+func (i *iter) Key() []byte {
+	return convert.Int64ToBytes(int64(i.p))
+}
+
+func (i *iter) Val() posting.List {
+	return i.data[i.p]
+}
+
+func (i *iter) Valid() bool {
+	_, ok := i.data[i.p]
+	return ok
+}
+
+func (i *iter) Close() error {
+	return nil
+}
+
+func mockMemtable(data ...[]uint64) kv.Iterator {
+	it := &iter{
+		data: make(map[int]posting.List),
+	}
+	for i, d := range data {
+		it.data[i] = roaring.NewPostingListWithInitialData(d...)
+	}
+	return it
 }
 
 func mockPlugin(ctrl *gomock.Controller, f func(repo StoreRepo, get GetWritePoint)) Plugin {
@@ -105,6 +230,10 @@ func mockPlugin(ctrl *gomock.Controller, f func(repo StoreRepo, get GetWritePoin
 				Type:          KVTypeTimeSeries,
 				CompressLevel: 3,
 			},
+			{
+				Name: "index",
+				Type: KVTypeIndex,
+			},
 		},
 	}).AnyTimes()
 	p.EXPECT().Init(gomock.Any(), gomock.Any()).Do(func(r StoreRepo, wp GetWritePoint) {
diff --git a/banyand/storage/storage.go b/banyand/storage/storage.go
index 8ac58ec..6219842 100644
--- a/banyand/storage/storage.go
+++ b/banyand/storage/storage.go
@@ -35,6 +35,8 @@ const (
 	KVTypeNormal KVType = 0
 	// KVTypeTimeSeries is a time-series KV storage
 	KVTypeTimeSeries KVType = 1
+	// KVTypeIndex is an index KV storage
+	KVTypeIndex KVType = 2
 )
 
 // Database is the storage manager which implements the physical data model
@@ -82,13 +84,16 @@ type CompressSpec struct {
 type KVSpec struct {
 	Name          string
 	Type          KVType
+	BufferSize    int64
 	CompressLevel int
 	ValueSize     int
+	FlushCallback kv.FlushCallback
 }
 
 type StoreRepo interface {
 	Reader(shard uint, name string, start, end uint64) kv.Reader
 	TimeSeriesReader(shard uint, name string, start, end uint64) kv.TimeSeriesReader
+	Index(shard uint, name string) kv.IndexStore
 }
 
 // WritePoint is a reference to a underlying area.
diff --git a/go.mod b/go.mod
index 864feed..65674a5 100644
--- a/go.mod
+++ b/go.mod
@@ -27,4 +27,4 @@ require (
 	google.golang.org/protobuf v1.27.1
 )
 
-replace github.com/dgraph-io/badger/v3 v3.2011.1 => github.com/SkyAPM/badger/v3 v3.0.0-20210808063906-49c6d778ad7d
+replace github.com/dgraph-io/badger/v3 v3.2011.1 => github.com/SkyAPM/badger/v3 v3.0.0-20210809093509-ff1b2dd81165
diff --git a/go.sum b/go.sum
index 90495e8..14301da 100644
--- a/go.sum
+++ b/go.sum
@@ -45,8 +45,8 @@ github.com/RoaringBitmap/gocroaring v0.4.0/go.mod h1:NieMwz7ZqwU2DD73/vvYwv7r4eW
 github.com/RoaringBitmap/real-roaring-datasets v0.0.0-20190726190000-eb7c87156f76/go.mod h1:oM0MHmQ3nDsq609SS36p+oYbRi16+oVvU2Bw4Ipv0SE=
 github.com/RoaringBitmap/roaring v0.9.1 h1:5PRizBmoN/PfV17nPNQou4dHQ7NcJi8FO/bihdYyCEM=
 github.com/RoaringBitmap/roaring v0.9.1/go.mod h1:h1B7iIUOmnAeb5ytYMvnHJwxMc6LUrwBnzXWRuqTQUc=
-github.com/SkyAPM/badger/v3 v3.0.0-20210808063906-49c6d778ad7d h1:6B7YHxp79aOfRuMXfWdSyNSZEDqJqSzQWWkQcmqyI9s=
-github.com/SkyAPM/badger/v3 v3.0.0-20210808063906-49c6d778ad7d/go.mod h1:dULbq6ehJ5K0cGW/1TQ9iSfUk0gbSiToDWmWmTsJ53E=
+github.com/SkyAPM/badger/v3 v3.0.0-20210809093509-ff1b2dd81165 h1:csoTNiGUMtp4H1AchgaZWJ4WY4uJQ6s+pz3sXS93jAA=
+github.com/SkyAPM/badger/v3 v3.0.0-20210809093509-ff1b2dd81165/go.mod h1:dULbq6ehJ5K0cGW/1TQ9iSfUk0gbSiToDWmWmTsJ53E=
 github.com/antihax/optional v1.0.0/go.mod h1:uupD/76wgC+ih3iEmQUL+0Ugr19nfwCT1kdvxnR2qWY=
 github.com/armon/circbuf v0.0.0-20150827004946-bbbad097214e/go.mod h1:3U/XgcO3hCbHZ8TKRvWD2dDTCfh9M9ya+I9JpbB7O8o=
 github.com/armon/consul-api v0.0.0-20180202201655-eb2c6b5be1b6/go.mod h1:grANhF5doyWs3UAsr3K4I6qtAmlQcZDesFNEHPZAzj8=
diff --git a/pkg/posting/posting.go b/pkg/posting/posting.go
index 2d7ad16..d997ab0 100644
--- a/pkg/posting/posting.go
+++ b/pkg/posting/posting.go
@@ -60,6 +60,10 @@ type List interface {
 	Reset()
 
 	ToSlice() []common.ChunkID
+
+	Marshall() ([]byte, error)
+
+	Unmarshall(data []byte) error
 }
 
 type Iterator interface {
diff --git a/pkg/posting/roaring/roaring.go b/pkg/posting/roaring/roaring.go
index 0b1c9b6..63eb2bc 100644
--- a/pkg/posting/roaring/roaring.go
+++ b/pkg/posting/roaring/roaring.go
@@ -40,6 +40,14 @@ type postingsList struct {
 	bitmap *roaring64.Bitmap
 }
 
+func (p *postingsList) Marshall() ([]byte, error) {
+	return p.bitmap.MarshalBinary()
+}
+
+func (p *postingsList) Unmarshall(data []byte) error {
+	return p.bitmap.UnmarshalBinary(data)
+}
+
 func NewPostingList() posting.List {
 	return &postingsList{
 		bitmap: roaring64.New(),