You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by ha...@apache.org on 2021/09/12 13:59:06 UTC

[skywalking-banyandb] branch time-series created (now eb7429f)

This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a change to branch time-series
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git.


      at eb7429f  Fix desc scan errors

This branch includes the following new commits:

     new b47fa44  Some improvement
     new 5f5c2fa  Implement two indices
     new eb7429f  Fix desc scan errors

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


[skywalking-banyandb] 02/03: Implement two indices

Posted by ha...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch time-series
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git

commit 5f5c2fa58b98168906b1bc7fd703289d5f485762
Author: Gao Hongtao <ha...@gmail.com>
AuthorDate: Sun Sep 12 20:43:11 2021 +0800

    Implement two indices
    
    Signed-off-by: Gao Hongtao <ha...@gmail.com>
---
 banyand/index/index_test.go                    |   2 +-
 banyand/kv/badger.go                           | 114 +--------
 banyand/kv/kv.go                               |  19 +-
 banyand/storage/database_test.go               | 259 ++++++++++---------
 banyand/stream/stream_query_test.go            |  54 ++--
 banyand/stream/stream_write.go                 |   1 +
 banyand/stream/stream_write_test.go            |   2 +-
 banyand/tsdb/block.go                          |  40 +--
 banyand/tsdb/series_seek.go                    |  15 +-
 banyand/tsdb/series_seek_filter.go             |   7 +
 banyand/tsdb/series_seek_sort.go               |  42 +++-
 banyand/tsdb/series_write.go                   |   4 +-
 pkg/index/index.go                             |  86 ++++++-
 pkg/index/inverted/field_map.go                |  26 +-
 pkg/index/inverted/inverted.go                 | 199 ++++++++++++++-
 pkg/index/inverted/inverted_test.go            | 142 +++++++++++
 pkg/index/inverted/mem.go                      | 173 +++++++++----
 pkg/index/inverted/mem_test.go                 | 237 +-----------------
 pkg/index/inverted/term_map.go                 | 101 +-------
 pkg/index/iterator.go                          | 209 ++++++++++++++++
 pkg/index/{inverted/inverted.go => lsm/lsm.go} |  38 +--
 pkg/index/lsm/lsm_test.go                      |  69 ++++++
 pkg/index/lsm/search.go                        |  84 +++++++
 pkg/index/posting/roaring/roaring.go           |   8 +
 pkg/index/test_cases/duration.go               | 329 +++++++++++++++++++++++++
 pkg/index/test_cases/service_name.go           | 100 ++++++++
 pkg/index/{search.go => tree.go}               |  61 +++--
 27 files changed, 1677 insertions(+), 744 deletions(-)

diff --git a/banyand/index/index_test.go b/banyand/index/index_test.go
index fde4e9f..7730f91 100644
--- a/banyand/index/index_test.go
+++ b/banyand/index/index_test.go
@@ -116,7 +116,7 @@ func Test_service_Insert(t *testing.T) {
 		t.Run(tt.name, func(t *testing.T) {
 			s := setUpModules(tester)
 			if err := s.Insert(tt.args.series, tt.args.shardID, tt.args.field); (err != nil) != tt.wantErr {
-				t.Errorf("Insert() error = %v, wantErr %v", err, tt.wantErr)
+				t.Errorf("Write() error = %v, wantErr %v", err, tt.wantErr)
 			}
 		})
 	}
diff --git a/banyand/kv/badger.go b/banyand/kv/badger.go
index a3767d5..92a8529 100644
--- a/banyand/kv/badger.go
+++ b/banyand/kv/badger.go
@@ -25,17 +25,8 @@ import (
 
 	"github.com/dgraph-io/badger/v3"
 	"github.com/dgraph-io/badger/v3/y"
-	"go.uber.org/multierr"
-
-	"github.com/apache/skywalking-banyandb/api/common"
-	modelv2 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v2"
-	"github.com/apache/skywalking-banyandb/pkg/convert"
-	"github.com/apache/skywalking-banyandb/pkg/index"
-	"github.com/apache/skywalking-banyandb/pkg/index/posting"
-	"github.com/apache/skywalking-banyandb/pkg/index/posting/roaring"
+
 	"github.com/apache/skywalking-banyandb/pkg/logger"
-	posting2 "github.com/apache/skywalking-banyandb/pkg/posting"
-	roaring2 "github.com/apache/skywalking-banyandb/pkg/posting/roaring"
 )
 
 var (
@@ -62,7 +53,7 @@ func (b *badgerTSS) Close() error {
 }
 
 type mergedIter struct {
-	delegated Iterator2
+	delegated Iterator
 	valid     bool
 	data      []byte
 }
@@ -95,12 +86,7 @@ func (i *mergedIter) parseData() {
 	if !i.valid {
 		return
 	}
-	data, err := i.delegated.Val().Marshall()
-	if err != nil {
-		i.valid = false
-		return
-	}
-	i.data = data
+	i.data = i.delegated.Val()
 }
 
 func (i *mergedIter) Close() error {
@@ -122,39 +108,12 @@ type badgerDB struct {
 	db      *badger.DB
 }
 
-func (b *badgerDB) Handover(iterator Iterator2) error {
+func (b *badgerDB) Handover(iterator Iterator) error {
 	return b.db.HandoverIterator(&mergedIter{
 		delegated: iterator,
 	})
 }
 
-func (b *badgerDB) Seek(key []byte, limit int) (posting2.List, error) {
-	opts := badger.DefaultIteratorOptions
-	it := b.db.NewIterator(opts)
-	defer func() {
-		_ = it.Close()
-	}()
-	result := roaring2.NewPostingList()
-	var errMerged error
-	for it.Seek(y.KeyWithTs(key, math.MaxInt64)); it.Valid(); it.Next() {
-		k := y.ParseKey(it.Key())
-		if !bytes.Equal(key, k) {
-			break
-		}
-		list := roaring2.NewPostingList()
-		err := list.Unmarshall(it.Value().Value)
-		if err != nil {
-			errMerged = multierr.Append(errMerged, err)
-			continue
-		}
-		_ = result.Union(list)
-		if result.Len() > limit {
-			break
-		}
-	}
-	return result, errMerged
-}
-
 func (b *badgerDB) Scan(key []byte, opt ScanOpts, f ScanFunc) error {
 	opts := badger.DefaultIteratorOptions
 	opts.PrefetchSize = opt.PrefetchSize
@@ -194,7 +153,7 @@ func (i *iterator) Rewind() {
 }
 
 func (i *iterator) Seek(key []byte) {
-	i.delegated.Seek(key)
+	i.delegated.Seek(y.KeyWithTs(key, math.MaxInt64))
 }
 
 func (i *iterator) Key() []byte {
@@ -254,7 +213,7 @@ func (b *badgerDB) Get(key []byte) ([]byte, error) {
 func (b *badgerDB) GetAll(key []byte, applyFn func([]byte) error) error {
 	iter := b.db.NewIterator(badger.DefaultIteratorOptions)
 	var count int
-	for iter.Seek(key); iter.Valid(); iter.Next() {
+	for iter.Seek(y.KeyWithTs(key, math.MaxInt64)); iter.Valid(); iter.Next() {
 		if !bytes.Equal(y.ParseKey(iter.Key()), key) {
 			break
 		}
@@ -270,67 +229,6 @@ func (b *badgerDB) GetAll(key []byte, applyFn func([]byte) error) error {
 	return ErrKeyNotFound
 }
 
-func (b *badgerDB) MatchField(fieldName []byte) (list posting.List) {
-	panic("implement me")
-}
-
-func (b *badgerDB) MatchTerms(field index.Field) (list posting.List) {
-	panic("implement me")
-}
-
-func (b *badgerDB) Range(fieldName []byte, opts index.RangeOpts) (list posting.List) {
-	panic("implement me")
-}
-
-var _ index.FieldIterator = (*fIterator)(nil)
-
-type fIterator struct {
-	init     bool
-	delegate Iterator
-	curr     *index.PostingValue
-}
-
-func (f *fIterator) Next() bool {
-	if !f.init {
-		f.init = true
-		f.delegate.Rewind()
-	}
-	if !f.delegate.Valid() {
-		return false
-	}
-	pv := &index.PostingValue{
-		Key:   f.delegate.Key(),
-		Value: roaring.NewPostingListWithInitialData(convert.BytesToUint64(f.delegate.Val())),
-	}
-	for ; f.delegate.Valid() && bytes.Equal(pv.Key, f.delegate.Key()); f.delegate.Next() {
-		pv.Value.Insert(common.ItemID(convert.BytesToUint64(f.delegate.Val())))
-	}
-	f.curr = pv
-	return true
-}
-
-func (f *fIterator) Val() *index.PostingValue {
-	return f.curr
-}
-
-func (f *fIterator) Close() error {
-	return f.delegate.Close()
-}
-
-func (b *badgerDB) FieldIterator(fieldName []byte, order modelv2.QueryOrder_Sort) index.FieldIterator {
-	var reverse bool
-	if order == modelv2.QueryOrder_SORT_DESC {
-		reverse = true
-	}
-	iter := b.NewIterator(ScanOpts{
-		Prefix:  fieldName,
-		Reverse: reverse,
-	})
-	return &fIterator{
-		delegate: iter,
-	}
-}
-
 // badgerLog delegates the zap log to the badger logger
 type badgerLog struct {
 	*log.Logger
diff --git a/banyand/kv/kv.go b/banyand/kv/kv.go
index 0e49715..fc829d3 100644
--- a/banyand/kv/kv.go
+++ b/banyand/kv/kv.go
@@ -25,9 +25,7 @@ import (
 	"github.com/dgraph-io/badger/v3"
 	"github.com/pkg/errors"
 
-	"github.com/apache/skywalking-banyandb/pkg/index"
 	"github.com/apache/skywalking-banyandb/pkg/logger"
-	posting2 "github.com/apache/skywalking-banyandb/pkg/posting"
 )
 
 var (
@@ -54,6 +52,7 @@ type ScanOpts struct {
 }
 
 type Reader interface {
+	Iterable
 	// Get a value by its key
 	Get(key []byte) ([]byte, error)
 	GetAll(key []byte, applyFn func([]byte) error) error
@@ -65,7 +64,6 @@ type Store interface {
 	io.Closer
 	Writer
 	Reader
-	index.Searcher
 }
 
 type TimeSeriesWriter interface {
@@ -113,21 +111,16 @@ type Iterator interface {
 	Close() error
 }
 
-type Iterator2 interface {
-	Next()
-	Rewind()
-	Seek(key []byte)
-	Key() []byte
-	Val() posting2.List
-	Valid() bool
-	Close() error
+type Iterable interface {
+	NewIterator(opt ScanOpts) Iterator
 }
 
 type HandoverCallback func()
 
 type IndexStore interface {
-	Handover(iterator Iterator2) error
-	Seek(key []byte, limit int) (posting2.List, error)
+	Iterable
+	Reader
+	Handover(iterator Iterator) error
 	Close() error
 }
 
diff --git a/banyand/storage/database_test.go b/banyand/storage/database_test.go
index 971cdf7..42893e0 100644
--- a/banyand/storage/database_test.go
+++ b/banyand/storage/database_test.go
@@ -29,12 +29,7 @@ import (
 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
 
-	"github.com/apache/skywalking-banyandb/api/common"
-	"github.com/apache/skywalking-banyandb/banyand/kv"
-	"github.com/apache/skywalking-banyandb/pkg/convert"
 	"github.com/apache/skywalking-banyandb/pkg/logger"
-	"github.com/apache/skywalking-banyandb/pkg/posting"
-	"github.com/apache/skywalking-banyandb/pkg/posting/roaring"
 )
 
 func TestDB_Create_Directory(t *testing.T) {
@@ -63,59 +58,60 @@ func TestDB_Create_Directory(t *testing.T) {
 	validateDirectory(t, fmt.Sprintf(blockTemplate, segPath, now.Format(blockFormat)))
 }
 
-func TestDB_Store(t *testing.T) {
-	is := require.New(t)
-	ctrl := gomock.NewController(t)
-	defer ctrl.Finish()
-	now := uint64(time.Now().UnixNano())
-	var ap WritePoint
-	var repo StoreRepo
-	p := mockPlugin(ctrl, func(r StoreRepo, get GetWritePoint) {
-		ap = get(now)
-		repo = r
-	})
-
-	tempDir, db := setUp(t, p)
-	defer func() {
-		db.GracefulStop()
-		removeDir(tempDir)
-	}()
-
-	is.NoError(ap.Writer(0, "normal").Put([]byte("key1"), []byte{12}))
-	val, err := repo.Reader(0, "normal", now, now).Get([]byte("key1"))
-	is.NoError(err)
-	is.Equal([]byte{12}, val)
-
-	is.NoError(ap.TimeSeriesWriter(1, "time-series").Put([]byte("key11"), []byte{33}, 1))
-	val, err = repo.TimeSeriesReader(1, "time-series", now, now).Get([]byte("key11"), 1)
-	is.NoError(err)
-	is.Equal([]byte{33}, val)
-	vals, allErr := repo.TimeSeriesReader(1, "time-series", now, now).GetAll([]byte("key11"))
-	is.NoError(allErr)
-	is.Equal([][]byte{{33}}, vals)
-
-	index := repo.Index(1, "index")
-	is.NoError(index.Handover(mockMemtable([]uint64{1, 2}, []uint64{3, 6})))
-	list, err := index.Seek(convert.Int64ToBytes(0), 2)
-	is.NoError(err)
-	is.Equal(2, list.Len())
-	is.True(list.Contains(common.ChunkID(1)))
-	is.True(list.Contains(common.ChunkID(2)))
-	list, err = index.Seek(convert.Int64ToBytes(1), 2)
-	is.NoError(err)
-	is.Equal(2, list.Len())
-	is.True(list.Contains(common.ChunkID(3)))
-	is.True(list.Contains(common.ChunkID(6)))
-
-	is.NoError(index.Handover(mockMemtable([]uint64{11, 14})))
-	list, err = index.Seek(convert.Int64ToBytes(0), 2)
-	is.NoError(err)
-	is.Equal(4, list.Len())
-	is.True(list.Contains(common.ChunkID(1)))
-	is.True(list.Contains(common.ChunkID(2)))
-	is.True(list.Contains(common.ChunkID(11)))
-	is.True(list.Contains(common.ChunkID(14)))
-}
+//
+//func TestDB_Store(t *testing.T) {
+//	is := require.New(t)
+//	ctrl := gomock.NewController(t)
+//	defer ctrl.Finish()
+//	now := uint64(time.Now().UnixNano())
+//	var ap WritePoint
+//	var repo StoreRepo
+//	p := mockPlugin(ctrl, func(r StoreRepo, get GetWritePoint) {
+//		ap = get(now)
+//		repo = r
+//	})
+//
+//	tempDir, db := setUp(t, p)
+//	defer func() {
+//		db.GracefulStop()
+//		removeDir(tempDir)
+//	}()
+//
+//	is.NoError(ap.Writer(0, "normal").Put([]byte("key1"), []byte{12}))
+//	val, err := repo.Reader(0, "normal", now, now).Get([]byte("key1"))
+//	is.NoError(err)
+//	is.Equal([]byte{12}, val)
+//
+//	is.NoError(ap.TimeSeriesWriter(1, "time-series").Put([]byte("key11"), []byte{33}, 1))
+//	val, err = repo.TimeSeriesReader(1, "time-series", now, now).Get([]byte("key11"), 1)
+//	is.NoError(err)
+//	is.Equal([]byte{33}, val)
+//	vals, allErr := repo.TimeSeriesReader(1, "time-series", now, now).GetAll([]byte("key11"))
+//	is.NoError(allErr)
+//	is.Equal([][]byte{{33}}, vals)
+//
+//	index := repo.Index(1, "index")
+//	is.NoError(index.Handover(mockMemtable([]uint64{1, 2}, []uint64{3, 6})))
+//	list, err := index.Seek(convert.Int64ToBytes(0), 2)
+//	is.NoError(err)
+//	is.Equal(2, list.Len())
+//	is.True(list.Contains(common.ChunkID(1)))
+//	is.True(list.Contains(common.ChunkID(2)))
+//	list, err = index.Seek(convert.Int64ToBytes(1), 2)
+//	is.NoError(err)
+//	is.Equal(2, list.Len())
+//	is.True(list.Contains(common.ChunkID(3)))
+//	is.True(list.Contains(common.ChunkID(6)))
+//
+//	is.NoError(index.Handover(mockMemtable([]uint64{11, 14})))
+//	list, err = index.Seek(convert.Int64ToBytes(0), 2)
+//	is.NoError(err)
+//	is.Equal(4, list.Len())
+//	is.True(list.Contains(common.ChunkID(1)))
+//	is.True(list.Contains(common.ChunkID(2)))
+//	is.True(list.Contains(common.ChunkID(11)))
+//	is.True(list.Contains(common.ChunkID(14)))
+//}
 
 func TestDB_FlushCallback(t *testing.T) {
 	is := require.New(t)
@@ -168,79 +164,80 @@ func TestDB_FlushCallback(t *testing.T) {
 	}
 }
 
-var _ kv.Iterator2 = (*iter)(nil)
-
-type iter struct {
-	data map[int]posting.List
-	p    int
-}
-
-func (i *iter) Next() {
-	i.p++
-}
-
-func (i *iter) Rewind() {
-	i.p = 0
-}
-
-func (i *iter) Seek(key []byte) {
-	panic("implement me")
-}
-
-func (i *iter) Key() []byte {
-	return convert.Int64ToBytes(int64(i.p))
-}
-
-func (i *iter) Val() posting.List {
-	return i.data[i.p]
-}
-
-func (i *iter) Valid() bool {
-	_, ok := i.data[i.p]
-	return ok
-}
-
-func (i *iter) Close() error {
-	return nil
-}
-
-func mockMemtable(data ...[]uint64) kv.Iterator2 {
-	it := &iter{
-		data: make(map[int]posting.List),
-	}
-	for i, d := range data {
-		it.data[i] = roaring.NewPostingListWithInitialData(d...)
-	}
-	return it
-}
-
-func mockPlugin(ctrl *gomock.Controller, f func(repo StoreRepo, get GetWritePoint)) Plugin {
-	p := NewMockPlugin(ctrl)
-	p.EXPECT().Meta().Return(PluginMeta{
-		ID:          "sw",
-		Group:       "default",
-		ShardNumber: 2,
-		KVSpecs: []KVSpec{
-			{
-				Name: "normal",
-				Type: KVTypeNormal,
-			},
-			{
-				Name:          "time-series",
-				Type:          KVTypeTimeSeries,
-				CompressLevel: 3,
-			},
-			{
-				Name: "index",
-				Type: KVTypeIndex,
-			},
-		},
-	}).AnyTimes()
-	p.EXPECT().Init(gomock.Any(), gomock.Any()).Do(func(r StoreRepo, wp GetWritePoint) {
-		f(r, wp)
-	}).AnyTimes()
-	return p
-}
+//
+//var _ kv.Iterator2 = (*iter)(nil)
+//
+//type iter struct {
+//	data map[int]posting.List
+//	p    int
+//}
+//
+//func (i *iter) Next() {
+//	i.p++
+//}
+//
+//func (i *iter) Rewind() {
+//	i.p = 0
+//}
+//
+//func (i *iter) Seek(key []byte) {
+//	panic("implement me")
+//}
+//
+//func (i *iter) Key() []byte {
+//	return convert.Int64ToBytes(int64(i.p))
+//}
+//
+//func (i *iter) Val() posting.List {
+//	return i.data[i.p]
+//}
+//
+//func (i *iter) Valid() bool {
+//	_, ok := i.data[i.p]
+//	return ok
+//}
+//
+//func (i *iter) Close() error {
+//	return nil
+//}
+//
+//func mockMemtable(data ...[]uint64) kv.Iterator2 {
+//	it := &iter{
+//		data: make(map[int]posting.List),
+//	}
+//	for i, d := range data {
+//		it.data[i] = roaring.NewPostingListWithInitialData(d...)
+//	}
+//	return it
+//}
+//
+//func mockPlugin(ctrl *gomock.Controller, f func(repo StoreRepo, get GetWritePoint)) Plugin {
+//	p := NewMockPlugin(ctrl)
+//	p.EXPECT().Meta().Return(PluginMeta{
+//		ID:          "sw",
+//		Group:       "default",
+//		ShardNumber: 2,
+//		KVSpecs: []KVSpec{
+//			{
+//				Name: "normal",
+//				Type: KVTypeNormal,
+//			},
+//			{
+//				Name:          "time-series",
+//				Type:          KVTypeTimeSeries,
+//				CompressLevel: 3,
+//			},
+//			{
+//				Name: "index",
+//				Type: KVTypeIndex,
+//			},
+//		},
+//	}).AnyTimes()
+//	p.EXPECT().Init(gomock.Any(), gomock.Any()).Do(func(r StoreRepo, wp GetWritePoint) {
+//		f(r, wp)
+//	}).AnyTimes()
+//	return p
+//}
 
 func setUp(t *testing.T, p Plugin) (tempDir string, db Database) {
 	require.NoError(t, logger.Init(logger.Logging{
diff --git a/banyand/stream/stream_query_test.go b/banyand/stream/stream_query_test.go
index a060ff3..f02921b 100644
--- a/banyand/stream/stream_query_test.go
+++ b/banyand/stream/stream_query_test.go
@@ -19,6 +19,7 @@ package stream
 
 import (
 	"bytes"
+	"context"
 	"embed"
 	_ "embed"
 	"encoding/base64"
@@ -33,6 +34,7 @@ import (
 	"github.com/golang/protobuf/jsonpb"
 	"github.com/pkg/errors"
 	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/require"
 	"google.golang.org/protobuf/types/known/timestamppb"
 
 	"github.com/apache/skywalking-banyandb/api/common"
@@ -58,7 +60,7 @@ func Test_Stream_SelectShard(t *testing.T) {
 	tester := assert.New(t)
 	s, deferFunc := setup(tester)
 	defer deferFunc()
-	_ = setupQueryData(tester, "multiple_shards.json", s)
+	_ = setupQueryData(t, "multiple_shards.json", s)
 	tests := []struct {
 		name         string
 		entity       tsdb.Entity
@@ -99,7 +101,7 @@ func Test_Stream_Series(t *testing.T) {
 	tester := assert.New(t)
 	s, deferFunc := setup(tester)
 	defer deferFunc()
-	baseTime := setupQueryData(tester, "multiple_shards.json", s)
+	baseTime := setupQueryData(t, "multiple_shards.json", s)
 	tests := []struct {
 		name    string
 		args    queryOpts
@@ -315,7 +317,7 @@ func Test_Stream_Global_Index(t *testing.T) {
 	tester := assert.New(t)
 	s, deferFunc := setup(tester)
 	defer deferFunc()
-	_ = setupQueryData(tester, "global_index.json", s)
+	_ = setupQueryData(t, "global_index.json", s)
 	tests := []struct {
 		name                string
 		traceID             string
@@ -477,7 +479,8 @@ func queryData(tester *assert.Assertions, s *stream, opts queryOpts) (shardsForT
 //go:embed testdata/*.json
 var dataFS embed.FS
 
-func setupQueryData(t *assert.Assertions, dataFile string, stream *stream) (baseTime time.Time) {
+func setupQueryData(testing *testing.T, dataFile string, stream *stream) (baseTime time.Time) {
+	t := assert.New(testing)
 	var templates []interface{}
 	baseTime = time.Now()
 	content, err := dataFS.ReadFile("testdata/" + dataFile)
@@ -509,24 +512,37 @@ func setupQueryData(t *assert.Assertions, dataFile string, stream *stream) (base
 		t.NoError(err)
 		shardID, err := partition.ShardID(entity.Marshal(), stream.schema.GetShardNum())
 		t.NoError(err)
-		itemID, err := stream.write(common.ShardID(shardID), e)
+		_, err = stream.write(common.ShardID(shardID), e)
 		t.NoError(err)
-		sa, err := stream.Shards(entity)
-		t.NoError(err)
-		for _, shard := range sa {
-			se, err := shard.Series().Get(entity)
-			t.NoError(err)
-			for {
-				item, closer, _ := se.Get(*itemID)
-				rawTagFamily, _ := item.Val("searchable")
-				if len(rawTagFamily) > 0 {
-					_ = closer.Close()
-					break
+	}
+	ctx, cancelFunc := context.WithTimeout(context.Background(), 10*time.Second)
+	defer cancelFunc()
+	err = ready(ctx, t, stream, queryOpts{
+		entity:    tsdb.Entity{tsdb.AnyEntry, tsdb.AnyEntry, tsdb.AnyEntry},
+		timeRange: tsdb.NewTimeRangeDuration(baseTime, 1*time.Hour),
+	})
+	require.NoError(testing, err)
+	return baseTime
+}
+
+func ready(ctx context.Context, t *assert.Assertions, stream *stream, options queryOpts) error {
+	for {
+	loop:
+		select {
+		case <-ctx.Done():
+			return ctx.Err()
+		default:
+			data, err := queryData(t, stream, options)
+			if err != nil {
+				return err
+			}
+			for _, d := range data {
+				if len(d.elements) < 1 {
+					time.Sleep(300 * time.Millisecond)
+					break loop
 				}
-				_ = closer.Close()
 			}
-
+			return nil
 		}
 	}
-	return baseTime
 }
diff --git a/banyand/stream/stream_write.go b/banyand/stream/stream_write.go
index fa80dc3..9ce2732 100644
--- a/banyand/stream/stream_write.go
+++ b/banyand/stream/stream_write.go
@@ -102,6 +102,7 @@ func (s *stream) write(shardID common.ShardID, value *streamv2.ElementValue) (*t
 			Int("ts_nano", t.Nanosecond()).
 			Interface("data", value).
 			Uint64("series_id", uint64(series.ID())).
+			Int("shard_id", int(shardID)).
 			Msg("write stream")
 		return writer, errWrite
 	}
diff --git a/banyand/stream/stream_write_test.go b/banyand/stream/stream_write_test.go
index 0ffe0d5..b0d8880 100644
--- a/banyand/stream/stream_write_test.go
+++ b/banyand/stream/stream_write_test.go
@@ -211,7 +211,7 @@ func Test_Stream_Write(t *testing.T) {
 func setup(t *assert.Assertions) (*stream, func()) {
 	t.NoError(logger.Init(logger.Logging{
 		Env:   "dev",
-		Level: "info",
+		Level: "trace",
 	}))
 	tempDir, deferFunc := test.Space(t)
 	streamRepo, err := schema.NewStream()
diff --git a/banyand/tsdb/block.go b/banyand/tsdb/block.go
index 44f1922..824c861 100644
--- a/banyand/tsdb/block.go
+++ b/banyand/tsdb/block.go
@@ -27,9 +27,9 @@ import (
 	"github.com/apache/skywalking-banyandb/api/common"
 	databasev2 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v2"
 	"github.com/apache/skywalking-banyandb/banyand/kv"
-	"github.com/apache/skywalking-banyandb/pkg/convert"
 	"github.com/apache/skywalking-banyandb/pkg/index"
 	"github.com/apache/skywalking-banyandb/pkg/index/inverted"
+	"github.com/apache/skywalking-banyandb/pkg/index/lsm"
 	"github.com/apache/skywalking-banyandb/pkg/logger"
 )
 
@@ -39,15 +39,14 @@ type block struct {
 	ref  *z.Closer
 
 	store         kv.TimeSeriesStore
-	primaryIndex  kv.Store
-	invertedIndex inverted.GlobalStore
+	primaryIndex  index.Store
+	invertedIndex index.Store
+	lsmIndex      index.Store
 	closableLst   []io.Closer
 	endTime       time.Time
 	startTime     time.Time
 	segID         uint16
 	blockID       uint16
-
-	//revertedIndex kv.Store
 }
 
 type blockOpts struct {
@@ -76,7 +75,10 @@ func newBlock(ctx context.Context, opts blockOpts) (b *block, err error) {
 		kv.TSSWithLogger(b.l)); err != nil {
 		return nil, err
 	}
-	if b.primaryIndex, err = kv.OpenStore(0, b.path+"/p_index", kv.StoreWithLogger(b.l)); err != nil {
+	if b.primaryIndex, err = lsm.NewStore(lsm.StoreOpts{
+		Path:   b.path + "/primary",
+		Logger: b.l,
+	}); err != nil {
 		return nil, err
 	}
 	b.closableLst = append(b.closableLst, b.store, b.primaryIndex)
@@ -84,8 +86,18 @@ func newBlock(ctx context.Context, opts blockOpts) (b *block, err error) {
 	if !ok || len(rules) == 0 {
 		return b, nil
 	}
-	b.invertedIndex = inverted.NewStore("inverted")
-	return b, nil
+	b.invertedIndex, err = inverted.NewStore(inverted.StoreOpts{
+		Path:   b.path + "/inverted",
+		Logger: b.l,
+	})
+	if b.lsmIndex, err = lsm.NewStore(lsm.StoreOpts{
+		Path:   b.path + "/lsm",
+		Logger: b.l,
+	}); err != nil {
+		return nil, err
+	}
+	b.closableLst = append(b.closableLst, b.invertedIndex, b.lsmIndex)
+	return b, err
 }
 
 func (b *block) delegate() blockDelegate {
@@ -137,11 +149,11 @@ func (d *bDelegate) dataReader() kv.TimeSeriesReader {
 }
 
 func (d *bDelegate) lsmIndexReader() index.Searcher {
-	return d.delegate.invertedIndex.Searcher()
+	return d.delegate.lsmIndex
 }
 
 func (d *bDelegate) invertedIndexReader() index.Searcher {
-	return d.delegate.invertedIndex.Searcher()
+	return d.delegate.invertedIndex
 }
 
 func (d *bDelegate) primaryIndexReader() index.Searcher {
@@ -161,21 +173,21 @@ func (d *bDelegate) write(key []byte, val []byte, ts time.Time) error {
 }
 
 func (d *bDelegate) writePrimaryIndex(field index.Field, id common.ItemID) error {
-	return d.delegate.primaryIndex.Put(field.Marshal(), convert.Uint64ToBytes(uint64(id)))
+	return d.delegate.primaryIndex.Write(field, id)
 }
 
 func (d *bDelegate) writeLSMIndex(field index.Field, id common.ItemID) error {
-	if d.delegate.invertedIndex == nil {
+	if d.delegate.lsmIndex == nil {
 		return nil
 	}
-	return d.delegate.invertedIndex.Insert(field, id)
+	return d.delegate.lsmIndex.Write(field, id)
 }
 
 func (d *bDelegate) writeInvertedIndex(field index.Field, id common.ItemID) error {
 	if d.delegate.invertedIndex == nil {
 		return nil
 	}
-	return d.delegate.invertedIndex.Insert(field, id)
+	return d.delegate.invertedIndex.Write(field, id)
 }
 
 func (d *bDelegate) contains(ts time.Time) bool {
diff --git a/banyand/tsdb/series_seek.go b/banyand/tsdb/series_seek.go
index a6cf6c0..4763ec6 100644
--- a/banyand/tsdb/series_seek.go
+++ b/banyand/tsdb/series_seek.go
@@ -18,12 +18,11 @@
 package tsdb
 
 import (
-	"time"
-
 	"github.com/apache/skywalking-banyandb/api/common"
 	databasev2 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v2"
 	modelv2 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v2"
 	"github.com/apache/skywalking-banyandb/banyand/kv"
+	"github.com/apache/skywalking-banyandb/pkg/index"
 )
 
 type Iterator interface {
@@ -62,6 +61,7 @@ type seekerBuilder struct {
 	}
 	order               modelv2.QueryOrder_Sort
 	indexRuleForSorting *databasev2.IndexRule
+	rangeOptsForSorting index.RangeOpts
 }
 
 func (s *seekerBuilder) Build() (Seeker, error) {
@@ -72,16 +72,7 @@ func (s *seekerBuilder) Build() (Seeker, error) {
 	if err != nil {
 		return nil, err
 	}
-	filters := []filterFn{
-		func(item Item) bool {
-			valid := s.seriesSpan.timeRange.contains(item.Time())
-			timeRange := s.seriesSpan.timeRange
-			s.seriesSpan.l.Trace().
-				Times("time_range", []time.Time{timeRange.Start, timeRange.End}).
-				Bool("valid", valid).Msg("filter item by time range")
-			return valid
-		},
-	}
+	filters := make([]filterFn, 0, 2)
 	if indexFilter != nil {
 		filters = append(filters, indexFilter)
 	}
diff --git a/banyand/tsdb/series_seek_filter.go b/banyand/tsdb/series_seek_filter.go
index bd55f1c..8e92715 100644
--- a/banyand/tsdb/series_seek_filter.go
+++ b/banyand/tsdb/series_seek_filter.go
@@ -75,6 +75,13 @@ func (s *seekerBuilder) buildIndexFilter() (filterFn, error) {
 		if err != nil {
 			return err
 		}
+		rangeOpts, found := tree.TrimRangeLeaf(index.FieldKey{
+			SeriesID:  s.seriesSpan.seriesID,
+			IndexRule: s.indexRuleForSorting.GetMetadata().GetName(),
+		})
+		if found {
+			s.rangeOptsForSorting = rangeOpts
+		}
 		list, err := tree.Execute()
 		if err != nil {
 			return err
diff --git a/banyand/tsdb/series_seek_sort.go b/banyand/tsdb/series_seek_sort.go
index 7e9a308..dbe1947 100644
--- a/banyand/tsdb/series_seek_sort.go
+++ b/banyand/tsdb/series_seek_sort.go
@@ -27,6 +27,7 @@ import (
 	databasev2 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v2"
 	modelv2 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v2"
 	"github.com/apache/skywalking-banyandb/banyand/kv"
+	"github.com/apache/skywalking-banyandb/pkg/convert"
 	"github.com/apache/skywalking-banyandb/pkg/index"
 	"github.com/apache/skywalking-banyandb/pkg/index/posting"
 	"github.com/apache/skywalking-banyandb/pkg/logger"
@@ -48,23 +49,36 @@ func (s *seekerBuilder) buildSeries(filters []filterFn) []Iterator {
 	if s.indexRuleForSorting == nil {
 		return s.buildSeriesByTime(filters)
 	}
+	filters = append(filters, func(item Item) bool {
+		valid := s.seriesSpan.timeRange.contains(item.Time())
+		timeRange := s.seriesSpan.timeRange
+		s.seriesSpan.l.Trace().
+			Times("time_range", []time.Time{timeRange.Start, timeRange.End}).
+			Bool("valid", valid).Msg("filter item by time range")
+		return valid
+	})
 	return s.buildSeriesByIndex(filters)
 }
 
 func (s *seekerBuilder) buildSeriesByIndex(filters []filterFn) (series []Iterator) {
 	for _, b := range s.seriesSpan.blocks {
 		var inner index.FieldIterator
-		term := index.FieldKey{
+		var found bool
+		fieldKey := index.FieldKey{
 			SeriesID:  s.seriesSpan.seriesID,
 			IndexRule: s.indexRuleForSorting.GetMetadata().GetName(),
 		}
+
 		switch s.indexRuleForSorting.GetType() {
 		case databasev2.IndexRule_TYPE_TREE:
-			inner = b.lsmIndexReader().FieldIterator(term.Marshal(), s.order)
+			inner, found = b.lsmIndexReader().Iterator(fieldKey, s.rangeOptsForSorting, s.order)
 		case databasev2.IndexRule_TYPE_INVERTED:
-			inner = b.invertedIndexReader().FieldIterator(term.Marshal(), s.order)
+			inner, found = b.invertedIndexReader().Iterator(fieldKey, s.rangeOptsForSorting, s.order)
+		default:
+			// only tree index supports sorting
+			continue
 		}
-		if inner != nil {
+		if found {
 			series = append(series, newSearcherIterator(s.seriesSpan.l, inner, b.dataReader(), s.seriesSpan.seriesID, filters))
 		}
 	}
@@ -86,14 +100,23 @@ func (s *seekerBuilder) buildSeriesByTime(filters []filterFn) []Iterator {
 	}
 	delegated := make([]Iterator, 0, len(bb))
 	bTimes := make([]time.Time, 0, len(bb))
+	timeRange := s.seriesSpan.timeRange
+	termRange := index.RangeOpts{
+		Lower:         convert.Int64ToBytes(timeRange.Start.UnixNano()),
+		Upper:         convert.Int64ToBytes(timeRange.End.UnixNano()),
+		IncludesLower: true,
+	}
 	for _, b := range bb {
 		bTimes = append(bTimes, b.startTime())
-		inner := b.primaryIndexReader().
-			FieldIterator(
-				s.seriesSpan.seriesID.Marshal(),
+		inner, found := b.primaryIndexReader().
+			Iterator(
+				index.FieldKey{
+					SeriesID: s.seriesSpan.seriesID,
+				},
+				termRange,
 				s.order,
 			)
-		if inner != nil {
+		if found {
 			delegated = append(delegated, newSearcherIterator(s.seriesSpan.l, inner, b.dataReader(), s.seriesSpan.seriesID, filters))
 		}
 	}
@@ -101,6 +124,7 @@ func (s *seekerBuilder) buildSeriesByTime(filters []filterFn) []Iterator {
 		Str("order", modelv2.QueryOrder_Sort_name[int32(s.order)]).
 		Times("blocks", bTimes).
 		Uint64("series_id", uint64(s.seriesSpan.seriesID)).
+		Int("shard_id", int(s.seriesSpan.shardID)).
 		Msg("seek series by time")
 	return []Iterator{newMergedIterator(delegated)}
 }
@@ -122,7 +146,7 @@ func (s *searcherIterator) Next() bool {
 		if s.fieldIterator.Next() {
 			v := s.fieldIterator.Val()
 			s.cur = v.Value.Iterator()
-			s.curKey = v.Key
+			s.curKey = v.Term
 			s.l.Trace().Uint64("series_id", uint64(s.seriesID)).Hex("term", s.curKey).Msg("got a new field")
 		} else {
 			return false
diff --git a/banyand/tsdb/series_write.go b/banyand/tsdb/series_write.go
index 60fba52..ea5404b 100644
--- a/banyand/tsdb/series_write.go
+++ b/banyand/tsdb/series_write.go
@@ -172,7 +172,9 @@ func (w *writer) Write() (GlobalItemID, error) {
 		}
 	}
 	return id, w.block.writePrimaryIndex(index.Field{
-		Key:  id.SeriesID.Marshal(),
+		Key: index.FieldKey{
+			SeriesID: id.SeriesID,
+		}.Marshal(),
 		Term: convert.Int64ToBytes(w.ts.UnixNano()),
 	}, id.ID)
 }
diff --git a/pkg/index/index.go b/pkg/index/index.go
index bec268e..9be8831 100644
--- a/pkg/index/index.go
+++ b/pkg/index/index.go
@@ -19,18 +19,52 @@ package index
 
 import (
 	"bytes"
+	"io"
 
+	"github.com/pkg/errors"
+
+	"github.com/apache/skywalking-banyandb/api/common"
 	modelv2 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v2"
 	"github.com/apache/skywalking-banyandb/pkg/index/posting"
 )
 
+var ErrMalformed = errors.New("the data is malformed")
+
+type FieldKey struct {
+	SeriesID  common.SeriesID
+	IndexRule string
+}
+
+func (f FieldKey) Marshal() []byte {
+	return bytes.Join([][]byte{
+		f.SeriesID.Marshal(),
+		[]byte(f.IndexRule),
+	}, nil)
+}
+
+func (f FieldKey) Equal(other FieldKey) bool {
+	return f.SeriesID == other.SeriesID && f.IndexRule == other.IndexRule
+}
+
 type Field struct {
 	Key  []byte
 	Term []byte
 }
 
 func (f Field) Marshal() []byte {
-	return bytes.Join([][]byte{f.Key, f.Term}, nil)
+	return bytes.Join([][]byte{f.Key, f.Term}, []byte(":"))
+}
+
+func (f *Field) Unmarshal(raw []byte) error {
+	bb := bytes.SplitN(raw, []byte(":"), 2)
+	if len(bb) < 2 {
+		return errors.Wrap(ErrMalformed, "unable to unmarshal the field")
+	}
+	f.Key = make([]byte, len(bb[0]))
+	copy(f.Key, bb[0])
+	f.Term = make([]byte, len(bb[1]))
+	copy(f.Term, bb[1])
+	return nil
 }
 
 type RangeOpts struct {
@@ -40,6 +74,32 @@ type RangeOpts struct {
 	IncludesLower bool
 }
 
+func (r RangeOpts) Between(value []byte) int {
+	if r.Upper != nil {
+		var in bool
+		if r.IncludesUpper {
+			in = bytes.Compare(r.Upper, value) >= 0
+		} else {
+			in = bytes.Compare(r.Upper, value) > 0
+		}
+		if !in {
+			return 1
+		}
+	}
+	if r.Lower != nil {
+		var in bool
+		if r.IncludesLower {
+			in = bytes.Compare(r.Lower, value) <= 0
+		} else {
+			in = bytes.Compare(r.Lower, value) < 0
+		}
+		if !in {
+			return -1
+		}
+	}
+	return 0
+}
+
 type FieldIterator interface {
 	Next() bool
 	Val() *PostingValue
@@ -47,13 +107,27 @@ type FieldIterator interface {
 }
 
 type PostingValue struct {
-	Key   []byte
+	Term  []byte
 	Value posting.List
 }
 
+type Writer interface {
+	Write(field Field, itemID common.ItemID) error
+}
+
+type FieldIterable interface {
+	Iterator(fieldKey FieldKey, termRange RangeOpts, order modelv2.QueryOrder_Sort) (iter FieldIterator, found bool)
+}
+
 type Searcher interface {
-	MatchField(fieldName []byte) (list posting.List)
-	MatchTerms(field Field) (list posting.List)
-	Range(fieldName []byte, opts RangeOpts) (list posting.List)
-	FieldIterator(fieldName []byte, order modelv2.QueryOrder_Sort) FieldIterator
+	FieldIterable
+	MatchField(fieldKey FieldKey) (list posting.List, err error)
+	MatchTerms(field Field) (list posting.List, err error)
+	Range(fieldKey FieldKey, opts RangeOpts) (list posting.List, err error)
+}
+
+type Store interface {
+	io.Closer
+	Writer
+	Searcher
 }
diff --git a/pkg/index/inverted/field_map.go b/pkg/index/inverted/field_map.go
index 85c7771..2ee0831 100644
--- a/pkg/index/inverted/field_map.go
+++ b/pkg/index/inverted/field_map.go
@@ -20,44 +20,44 @@ package inverted
 import (
 	"sync"
 
-	"github.com/pkg/errors"
-
 	"github.com/apache/skywalking-banyandb/api/common"
 	"github.com/apache/skywalking-banyandb/pkg/convert"
 	"github.com/apache/skywalking-banyandb/pkg/index"
 )
 
-var ErrFieldAbsent = errors.New("field doesn't exist")
-
 type fieldHashID uint64
 
 type fieldMap struct {
-	repo  map[fieldHashID]*fieldValue
+	repo  map[fieldHashID]*termContainer
+	lst   []fieldHashID
 	mutex sync.RWMutex
 }
 
 func newFieldMap(initialSize int) *fieldMap {
 	return &fieldMap{
-		repo: make(map[fieldHashID]*fieldValue, initialSize),
+		repo: make(map[fieldHashID]*termContainer, initialSize),
+		lst:  make([]fieldHashID, 0),
 	}
 }
 
-func (fm *fieldMap) createKey(key []byte) *fieldValue {
-	result := &fieldValue{
+func (fm *fieldMap) createKey(key []byte) *termContainer {
+	k := fieldHashID(convert.Hash(key))
+	result := &termContainer{
 		key:   key,
 		value: newPostingMap(),
 	}
-	fm.repo[fieldHashID(convert.Hash(key))] = result
+	fm.repo[k] = result
+	fm.lst = append(fm.lst, k)
 	return result
 }
 
-func (fm *fieldMap) get(key []byte) (*fieldValue, bool) {
+func (fm *fieldMap) get(key []byte) (*termContainer, bool) {
 	fm.mutex.RLock()
 	defer fm.mutex.RUnlock()
 	return fm.getWithoutLock(key)
 }
 
-func (fm *fieldMap) getWithoutLock(key []byte) (*fieldValue, bool) {
+func (fm *fieldMap) getWithoutLock(key []byte) (*termContainer, bool) {
 	v, ok := fm.repo[fieldHashID(convert.Hash(key))]
 	return v, ok
 }
@@ -72,7 +72,7 @@ func (fm *fieldMap) put(fv index.Field, id common.ItemID) error {
 	return pm.value.put(fv.Term, id)
 }
 
-type fieldValue struct {
+type termContainer struct {
 	key   []byte
-	value *postingMap
+	value *termMap
 }
diff --git a/pkg/index/inverted/inverted.go b/pkg/index/inverted/inverted.go
index 569d434..ce50740 100644
--- a/pkg/index/inverted/inverted.go
+++ b/pkg/index/inverted/inverted.go
@@ -18,30 +18,203 @@
 package inverted
 
 import (
+	"bytes"
+	"sync"
+
+	"github.com/pkg/errors"
+	"go.uber.org/multierr"
+
 	"github.com/apache/skywalking-banyandb/api/common"
+	modelv2 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v2"
+	"github.com/apache/skywalking-banyandb/banyand/kv"
 	"github.com/apache/skywalking-banyandb/pkg/index"
+	"github.com/apache/skywalking-banyandb/pkg/index/posting"
+	"github.com/apache/skywalking-banyandb/pkg/index/posting/roaring"
+	"github.com/apache/skywalking-banyandb/pkg/logger"
 )
 
-type GlobalStore interface {
-	Searcher() index.Searcher
-	Insert(field index.Field, docID common.ItemID) error
-}
+var _ index.Store = (*store)(nil)
 
 type store struct {
-	memTable *MemTable
-	//TODO: add data tables
+	diskTable         kv.IndexStore
+	memTable          *memTable
+	immutableMemTable *memTable
+	rwMutex           sync.RWMutex
 }
 
-func (s *store) Searcher() index.Searcher {
-	return s.memTable
+type StoreOpts struct {
+	Path   string
+	Logger *logger.Logger
 }
 
-func (s *store) Insert(field index.Field, chunkID common.ItemID) error {
-	return s.memTable.Insert(field, chunkID)
+func NewStore(opts StoreOpts) (index.Store, error) {
+	diskTable, err := kv.OpenIndexStore(0, opts.Path, kv.IndexWithLogger(opts.Logger))
+	if err != nil {
+		return nil, err
+	}
+	return &store{
+		memTable:  newMemTable(),
+		diskTable: diskTable,
+	}, nil
 }
 
-func NewStore(name string) GlobalStore {
-	return &store{
-		memTable: NewMemTable(name),
+func (s *store) Close() error {
+	return s.diskTable.Close()
+}
+
+func (s *store) Write(field index.Field, chunkID common.ItemID) error {
+	return s.memTable.Write(field, chunkID)
+}
+
+func (s *store) Flush() error {
+	s.rwMutex.Lock()
+	defer s.rwMutex.Unlock()
+	if s.immutableMemTable == nil {
+		s.immutableMemTable = s.memTable
+		s.memTable = newMemTable()
+	}
+	err := s.diskTable.
+		Handover(s.immutableMemTable.Iter())
+	if err != nil {
+		return err
+	}
+	s.immutableMemTable = nil
+	return nil
+}
+
+func (s *store) MatchField(fieldKey index.FieldKey) (posting.List, error) {
+	return s.Range(fieldKey, index.RangeOpts{})
+}
+
+func (s *store) MatchTerms(field index.Field) (posting.List, error) {
+	result := roaring.NewPostingList()
+	result, errMem := s.searchInMemTables(result, func(table *memTable) (posting.List, error) {
+		list, errInner := table.MatchTerms(field)
+		if errInner != nil {
+			return nil, errInner
+		}
+		return list, nil
+	})
+	if errMem != nil {
+		return nil, errors.Wrap(errMem, "mem table of inverted index")
+	}
+	raw, errTable := s.diskTable.Get(field.Marshal())
+	switch {
+	case errors.Is(errTable, kv.ErrKeyNotFound):
+		return result, nil
+	case errTable != nil:
+		return nil, errors.Wrap(errTable, "disk table of inverted index")
 	}
+	list := roaring.NewPostingList()
+	err := list.Unmarshall(raw)
+	if err != nil {
+		return nil, err
+	}
+	err = result.Union(list)
+	if err != nil {
+		return nil, err
+	}
+	return result, nil
 }
+
+func (s *store) Range(fieldKey index.FieldKey, opts index.RangeOpts) (list posting.List, err error) {
+	iter, found := s.Iterator(fieldKey, opts, modelv2.QueryOrder_SORT_ASC)
+	if !found {
+		return roaring.EmptyPostingList, nil
+	}
+	list = roaring.NewPostingList()
+	for iter.Next() {
+		err = multierr.Append(err, list.Union(iter.Val().Value))
+	}
+	err = multierr.Append(err, iter.Close())
+	return
+}
+
+func (s *store) Iterator(fieldKey index.FieldKey, termRange index.RangeOpts,
+	order modelv2.QueryOrder_Sort) (index.FieldIterator, bool) {
+	s.rwMutex.RLock()
+	defer s.rwMutex.RUnlock()
+	tt := []*memTable{s.memTable, s.immutableMemTable}
+	iters := make([]index.FieldIterator, 0, len(tt)+1)
+	for _, table := range tt {
+		if table == nil {
+			continue
+		}
+		it, found := table.Iterator(fieldKey, termRange, order)
+		if !found {
+			continue
+		}
+		iters = append(iters, it)
+	}
+	it := index.NewFieldIteratorTemplate(fieldKey, termRange, order, s.diskTable, func(term, val []byte, delegated kv.Iterator) (*index.PostingValue, error) {
+		list := roaring.NewPostingList()
+		err := list.Unmarshall(val)
+		if err != nil {
+			return nil, err
+		}
+
+		pv := &index.PostingValue{
+			Term:  term,
+			Value: list,
+		}
+
+		for ; delegated.Valid(); delegated.Next() {
+			f := index.Field{}
+			err := f.Unmarshal(delegated.Key())
+			if err != nil {
+				return nil, err
+			}
+			if !bytes.Equal(f.Term, term) {
+				break
+			}
+			l := roaring.NewPostingList()
+			err = l.Unmarshall(delegated.Val())
+			if err != nil {
+				return nil, err
+			}
+			err = pv.Value.Union(l)
+			if err != nil {
+				return nil, err
+			}
+		}
+		return pv, nil
+	})
+	iters = append(iters, it)
+	if len(iters) < 1 {
+		return nil, false
+	}
+	var fn index.SwitchFn
+	switch order {
+	case modelv2.QueryOrder_SORT_ASC, modelv2.QueryOrder_SORT_UNSPECIFIED:
+		fn = func(a, b []byte) bool {
+			return bytes.Compare(a, b) > 0
+		}
+	case modelv2.QueryOrder_SORT_DESC:
+		fn = func(a, b []byte) bool {
+			return bytes.Compare(a, b) < 0
+		}
+	}
+	return index.NewMergedIterator(iters, fn), true
+}
+
+func (s *store) searchInMemTables(result posting.List, entityFunc entityFunc) (posting.List, error) {
+	s.rwMutex.RLock()
+	defer s.rwMutex.RUnlock()
+	tt := []*memTable{s.memTable, s.immutableMemTable}
+	for _, table := range tt {
+		if table == nil {
+			continue
+		}
+		list, err := entityFunc(table)
+		if err != nil {
+			return result, err
+		}
+		err = result.Union(list)
+		if err != nil {
+			return result, err
+		}
+	}
+	return result, nil
+}
+
+type entityFunc func(table *memTable) (posting.List, error)
diff --git a/pkg/index/inverted/inverted_test.go b/pkg/index/inverted/inverted_test.go
new file mode 100644
index 0000000..88a427b
--- /dev/null
+++ b/pkg/index/inverted/inverted_test.go
@@ -0,0 +1,142 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package inverted
+
+import (
+	"testing"
+
+	"github.com/stretchr/testify/assert"
+
+	"github.com/apache/skywalking-banyandb/pkg/index/posting"
+	"github.com/apache/skywalking-banyandb/pkg/index/posting/roaring"
+	"github.com/apache/skywalking-banyandb/pkg/index/test_cases"
+	"github.com/apache/skywalking-banyandb/pkg/logger"
+	"github.com/apache/skywalking-banyandb/pkg/test"
+)
+
+func TestStore_MatchTerm(t *testing.T) {
+	tester := assert.New(t)
+	path, fn := setUp(tester)
+	s, err := NewStore(StoreOpts{
+		Path:   path,
+		Logger: logger.GetLogger("test"),
+	})
+	defer func() {
+		tester.NoError(s.Close())
+		fn()
+	}()
+	tester.NoError(err)
+	test_cases.SetUp(tester, s)
+	test_cases.RunServiceName(t, s)
+}
+
+func TestStore_MatchTerm_AfterFlush(t *testing.T) {
+	tester := assert.New(t)
+	path, fn := setUp(tester)
+	s, err := NewStore(StoreOpts{
+		Path:   path,
+		Logger: logger.GetLogger("test"),
+	})
+	defer func() {
+		tester.NoError(s.Close())
+		fn()
+	}()
+	tester.NoError(err)
+	test_cases.SetUp(tester, s)
+	tester.NoError(s.(*store).Flush())
+	test_cases.RunServiceName(t, s)
+}
+
+func TestStore_Iterator(t *testing.T) {
+	tester := assert.New(t)
+	path, fn := setUp(tester)
+	s, err := NewStore(StoreOpts{
+		Path:   path,
+		Logger: logger.GetLogger("test"),
+	})
+	defer func() {
+		tester.NoError(s.Close())
+		fn()
+	}()
+	tester.NoError(err)
+	data := test_cases.SetUpDuration(tester, s)
+	test_cases.RunDuration(t, data, s)
+}
+
+func TestStore_Iterator_AfterFlush(t *testing.T) {
+	tester := assert.New(t)
+	path, fn := setUp(tester)
+	s, err := NewStore(StoreOpts{
+		Path:   path,
+		Logger: logger.GetLogger("test"),
+	})
+	defer func() {
+		tester.NoError(s.Close())
+		fn()
+	}()
+	tester.NoError(err)
+	data := test_cases.SetUpDuration(tester, s)
+	tester.NoError(s.(*store).Flush())
+	test_cases.RunDuration(t, data, s)
+}
+
+func TestStore_Iterator_Hybrid(t *testing.T) {
+	tester := assert.New(t)
+	path, fn := setUp(tester)
+	s, err := NewStore(StoreOpts{
+		Path:   path,
+		Logger: logger.GetLogger("test"),
+	})
+	defer func() {
+		tester.NoError(s.Close())
+		fn()
+	}()
+	tester.NoError(err)
+	r := map[int]posting.List{
+		50:   roaring.NewPostingList(),
+		200:  nil,
+		500:  roaring.NewPostingList(),
+		1000: nil,
+		2000: roaring.NewPostingList(),
+	}
+	data1 := test_cases.SetUpPartialDuration(tester, s, r)
+	tester.NoError(s.(*store).Flush())
+	r = map[int]posting.List{
+		50:   nil,
+		200:  roaring.NewPostingList(),
+		500:  nil,
+		1000: roaring.NewPostingList(),
+		2000: nil,
+	}
+	data := test_cases.SetUpPartialDuration(tester, s, r)
+	for i, list := range data {
+		if list == nil {
+			data[i] = data1[i]
+		}
+	}
+	test_cases.RunDuration(t, data, s)
+}
+
+func setUp(t *assert.Assertions) (tempDir string, deferFunc func()) {
+	t.NoError(logger.Init(logger.Logging{
+		Env:   "dev",
+		Level: "debug",
+	}))
+	tempDir, deferFunc = test.Space(t)
+	return tempDir, deferFunc
+}
diff --git a/pkg/index/inverted/mem.go b/pkg/index/inverted/mem.go
index f8db9b7..cd19027 100644
--- a/pkg/index/inverted/mem.go
+++ b/pkg/index/inverted/mem.go
@@ -21,41 +21,33 @@ import (
 	"bytes"
 	"sort"
 
-	"github.com/pkg/errors"
+	"go.uber.org/multierr"
 
 	"github.com/apache/skywalking-banyandb/api/common"
 	modelv2 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v2"
+	"github.com/apache/skywalking-banyandb/banyand/kv"
 	"github.com/apache/skywalking-banyandb/pkg/index"
 	"github.com/apache/skywalking-banyandb/pkg/index/posting"
 	"github.com/apache/skywalking-banyandb/pkg/index/posting/roaring"
 )
 
-var ErrFieldsAbsent = errors.New("fields are absent")
-
-var _ index.Searcher = (*MemTable)(nil)
+var (
+	_ index.Writer        = (*memTable)(nil)
+	_ index.FieldIterable = (*memTable)(nil)
+)
 
-type MemTable struct {
-	terms *fieldMap
-	name  string
+type memTable struct {
+	fields *fieldMap
 }
 
-func NewMemTable(name string) *MemTable {
-	return &MemTable{
-		name:  name,
-		terms: newFieldMap(1000),
+func newMemTable() *memTable {
+	return &memTable{
+		fields: newFieldMap(1000),
 	}
 }
 
-func (m *MemTable) Insert(field index.Field, chunkID common.ItemID) error {
-	return m.terms.put(field, chunkID)
-}
-
-func (m *MemTable) MatchField(fieldName []byte) (list posting.List) {
-	fieldsValues, ok := m.terms.get(fieldName)
-	if !ok {
-		return roaring.EmptyPostingList
-	}
-	return fieldsValues.value.allValues()
+func (m *memTable) Write(field index.Field, chunkID common.ItemID) error {
+	return m.fields.put(field, chunkID)
 }
 
 var _ index.FieldIterator = (*fIterator)(nil)
@@ -64,7 +56,7 @@ type fIterator struct {
 	index     int
 	val       *index.PostingValue
 	keys      [][]byte
-	valueRepo *postingMap
+	valueRepo *termMap
 	closed    bool
 }
 
@@ -74,7 +66,6 @@ func (f *fIterator) Next() bool {
 	}
 	f.index++
 	if f.index >= len(f.keys) {
-		_ = f.Close()
 		return false
 	}
 	f.val = f.valueRepo.getEntry(f.keys[f.index])
@@ -93,7 +84,7 @@ func (f *fIterator) Close() error {
 	return nil
 }
 
-func newFieldIterator(keys [][]byte, fValue *postingMap) index.FieldIterator {
+func newFieldIterator(keys [][]byte, fValue *termMap) index.FieldIterator {
 	return &fIterator{
 		keys:      keys,
 		valueRepo: fValue,
@@ -101,45 +92,139 @@ func newFieldIterator(keys [][]byte, fValue *postingMap) index.FieldIterator {
 	}
 }
 
-func (m *MemTable) FieldIterator(fieldName []byte, order modelv2.QueryOrder_Sort) index.FieldIterator {
-	fieldsValues, ok := m.terms.get(fieldName)
+func (m *memTable) Iterator(fieldKey index.FieldKey, rangeOpts index.RangeOpts,
+	order modelv2.QueryOrder_Sort) (iter index.FieldIterator, found bool) {
+	fieldsValues, ok := m.fields.get(fieldKey.Marshal())
 	if !ok {
-		return nil
+		return nil, false
 	}
 	fValue := fieldsValues.value
-	var keys [][]byte
+	var terms [][]byte
 	{
 		fValue.mutex.RLock()
 		defer fValue.mutex.RUnlock()
 		for _, value := range fValue.repo {
-			keys = append(keys, value.Key)
+			if rangeOpts.Between(value.Term) == 0 {
+				terms = append(terms, value.Term)
+			}
 		}
 	}
+	if len(terms) < 1 {
+		return nil, false
+	}
 	switch order {
-	case modelv2.QueryOrder_SORT_ASC:
-		sort.SliceStable(keys, func(i, j int) bool {
-			return bytes.Compare(keys[i], keys[j]) < 0
+	case modelv2.QueryOrder_SORT_ASC, modelv2.QueryOrder_SORT_UNSPECIFIED:
+		sort.SliceStable(terms, func(i, j int) bool {
+			return bytes.Compare(terms[i], terms[j]) < 0
 		})
 	case modelv2.QueryOrder_SORT_DESC:
-		sort.SliceStable(keys, func(i, j int) bool {
-			return bytes.Compare(keys[i], keys[j]) > 0
+		sort.SliceStable(terms, func(i, j int) bool {
+			return bytes.Compare(terms[i], terms[j]) > 0
 		})
 	}
-	return newFieldIterator(keys, fValue)
+	return newFieldIterator(terms, fValue), true
 }
 
-func (m *MemTable) MatchTerms(field index.Field) (list posting.List) {
-	fieldsValues, ok := m.terms.get(field.Key)
+func (m *memTable) MatchTerms(field index.Field) (posting.List, error) {
+	fieldsValues, ok := m.fields.get(field.Key)
 	if !ok {
-		return roaring.EmptyPostingList
+		return roaring.EmptyPostingList, nil
+	}
+	list := fieldsValues.value.get(field.Term)
+	if list == nil {
+		return roaring.EmptyPostingList, nil
 	}
-	return fieldsValues.value.get(field.Term).Clone()
+	return list, nil
 }
 
-func (m *MemTable) Range(fieldName []byte, opts index.RangeOpts) (list posting.List) {
-	fieldsValues, ok := m.terms.get(fieldName)
-	if !ok {
-		return roaring.EmptyPostingList
+var _ kv.Iterator = (*flushIterator)(nil)
+
+type flushIterator struct {
+	fieldIdx int
+	termIdx  int
+	key      []byte
+	value    []byte
+	fields   *fieldMap
+	valid    bool
+	err      error
+}
+
+func (i *flushIterator) Next() {
+	if i.fieldIdx >= len(i.fields.lst) {
+		i.valid = false
+		return
+	}
+	fieldID := i.fields.lst[i.fieldIdx]
+	terms := i.fields.repo[fieldID]
+	if i.termIdx < len(terms.value.lst) {
+		i.termIdx++
+		if !i.setCurr() {
+			i.Next()
+		}
+		return
+	}
+	i.fieldIdx++
+	i.termIdx = 0
+	if !i.setCurr() {
+		i.Next()
+	}
+}
+
+func (i *flushIterator) Rewind() {
+	i.fieldIdx = 0
+	i.termIdx = 0
+	i.valid = true
+	if !i.setCurr() {
+		i.valid = false
+	}
+}
+
+func (i *flushIterator) Seek(_ []byte) {
+	panic("unsupported")
+}
+
+func (i *flushIterator) Key() []byte {
+	return i.key
+}
+
+func (i *flushIterator) Val() []byte {
+	return i.value
+}
+
+func (i *flushIterator) Valid() bool {
+	return i.valid
+}
+
+func (i *flushIterator) Close() error {
+	return i.err
+}
+
+func (i *flushIterator) setCurr() bool {
+	if i.fieldIdx >= len(i.fields.lst) {
+		return false
+	}
+	fieldID := i.fields.lst[i.fieldIdx]
+	term := i.fields.repo[fieldID]
+	if i.termIdx >= len(term.value.lst) {
+		return false
+	}
+	valueID := term.value.lst[i.termIdx]
+	value := term.value.repo[valueID]
+	v, err := value.Value.Marshall()
+	if err != nil {
+		i.err = multierr.Append(i.err, err)
+		return false
+	}
+	i.value = v
+	i.key = index.Field{
+		Key:  term.key,
+		Term: value.Term,
+	}.Marshal()
+	return true
+}
+
+func (m *memTable) Iter() kv.Iterator {
+	return &flushIterator{
+		fields: m.fields,
 	}
-	return fieldsValues.value.getRange(opts)
 }
diff --git a/pkg/index/inverted/mem_test.go b/pkg/index/inverted/mem_test.go
index 99caa2c..9e7c4eb 100644
--- a/pkg/index/inverted/mem_test.go
+++ b/pkg/index/inverted/mem_test.go
@@ -18,242 +18,21 @@
 package inverted
 
 import (
-	"reflect"
 	"testing"
 
 	"github.com/stretchr/testify/assert"
 
-	"github.com/apache/skywalking-banyandb/api/common"
-	modelv2 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v2"
-	"github.com/apache/skywalking-banyandb/pkg/convert"
-	"github.com/apache/skywalking-banyandb/pkg/index"
-	"github.com/apache/skywalking-banyandb/pkg/index/posting"
-	"github.com/apache/skywalking-banyandb/pkg/index/posting/roaring"
+	"github.com/apache/skywalking-banyandb/pkg/index/test_cases"
 )
 
-func TestMemTable_Range(t *testing.T) {
-	type args struct {
-		fieldName []byte
-		opts      index.RangeOpts
-	}
-	m := NewMemTable("sw")
-	setUp(t, m)
-	tests := []struct {
-		name     string
-		args     args
-		wantList posting.List
-	}{
-		{
-			name: "in range",
-			args: args{
-				fieldName: []byte("duration"),
-				opts: index.RangeOpts{
-					Lower: convert.Uint16ToBytes(100),
-					Upper: convert.Uint16ToBytes(500),
-				},
-			},
-			wantList: m.MatchTerms(index.Field{
-				Key:  []byte("duration"),
-				Term: convert.Uint16ToBytes(200),
-			}),
-		},
-		{
-			name: "excludes edge",
-			args: args{
-				fieldName: []byte("duration"),
-				opts: index.RangeOpts{
-					Lower: convert.Uint16ToBytes(50),
-					Upper: convert.Uint16ToBytes(1000),
-				},
-			},
-			wantList: union(m,
-				index.Field{
-					Key:  []byte("duration"),
-					Term: convert.Uint16ToBytes(200),
-				},
-			),
-		},
-		{
-			name: "includes lower",
-			args: args{
-				fieldName: []byte("duration"),
-				opts: index.RangeOpts{
-					Lower:         convert.Uint16ToBytes(50),
-					Upper:         convert.Uint16ToBytes(1000),
-					IncludesLower: true,
-				},
-			},
-			wantList: union(m,
-				index.Field{
-					Key:  []byte("duration"),
-					Term: convert.Uint16ToBytes(50),
-				},
-				index.Field{
-					Key:  []byte("duration"),
-					Term: convert.Uint16ToBytes(200),
-				},
-			),
-		},
-		{
-			name: "includes upper",
-			args: args{
-				fieldName: []byte("duration"),
-				opts: index.RangeOpts{
-					Lower:         convert.Uint16ToBytes(50),
-					Upper:         convert.Uint16ToBytes(1000),
-					IncludesUpper: true,
-				},
-			},
-			wantList: union(m,
-				index.Field{
-					Key:  []byte("duration"),
-					Term: convert.Uint16ToBytes(200),
-				},
-				index.Field{
-					Key:  []byte("duration"),
-					Term: convert.Uint16ToBytes(1000),
-				},
-			),
-		},
-		{
-			name: "includes edges",
-			args: args{
-				fieldName: []byte("duration"),
-				opts: index.RangeOpts{
-					Lower:         convert.Uint16ToBytes(50),
-					Upper:         convert.Uint16ToBytes(1000),
-					IncludesUpper: true,
-					IncludesLower: true,
-				},
-			},
-			wantList: union(m,
-				index.Field{
-					Key:  []byte("duration"),
-					Term: convert.Uint16ToBytes(50),
-				},
-				index.Field{
-					Key:  []byte("duration"),
-					Term: convert.Uint16ToBytes(200),
-				},
-				index.Field{
-					Key:  []byte("duration"),
-					Term: convert.Uint16ToBytes(1000),
-				},
-			),
-		},
-		{
-			name: "match one",
-			args: args{
-				fieldName: []byte("duration"),
-				opts: index.RangeOpts{
-					Lower:         convert.Uint16ToBytes(200),
-					Upper:         convert.Uint16ToBytes(200),
-					IncludesUpper: true,
-					IncludesLower: true,
-				},
-			},
-			wantList: union(m,
-				index.Field{
-					Key:  []byte("duration"),
-					Term: convert.Uint16ToBytes(200),
-				},
-			),
-		},
-	}
-	for _, tt := range tests {
-		t.Run(tt.name, func(t *testing.T) {
-			if gotList := m.Range(tt.args.fieldName, tt.args.opts); !reflect.DeepEqual(gotList, tt.wantList) {
-				t.Errorf("Range() = %v, want %v", gotList.Len(), tt.wantList.Len())
-			}
-		})
-	}
+func TestMemTable_MatchTerm(t *testing.T) {
+	mt := newMemTable()
+	test_cases.SetUp(assert.New(t), mt)
+	test_cases.RunServiceName(t, mt)
 }
 
 func TestMemTable_Iterator(t *testing.T) {
-	tester := assert.New(t)
-	type args struct {
-		fieldName []byte
-		orderType modelv2.QueryOrder_Sort
-	}
-	m := NewMemTable("sw")
-	setUp(t, m)
-	tests := []struct {
-		name string
-		args args
-		want [][]byte
-	}{
-		{
-			name: "sort asc",
-			args: args{
-				fieldName: []byte("duration"),
-				orderType: modelv2.QueryOrder_SORT_ASC,
-			},
-			want: [][]byte{convert.Uint16ToBytes(50), convert.Uint16ToBytes(200), convert.Uint16ToBytes(1000)},
-		},
-		{
-			name: "sort desc",
-			args: args{
-				fieldName: []byte("duration"),
-				orderType: modelv2.QueryOrder_SORT_DESC,
-			},
-			want: [][]byte{convert.Uint16ToBytes(1000), convert.Uint16ToBytes(200), convert.Uint16ToBytes(50)},
-		},
-	}
-	for _, tt := range tests {
-		t.Run(tt.name, func(t *testing.T) {
-			iter := m.FieldIterator(tt.args.fieldName, tt.args.orderType)
-			tester.NotNil(iter)
-			var got [][]byte
-			defer func() {
-				_ = iter.Close()
-			}()
-			for iter.Next() {
-				got = append(got, iter.Val().Key)
-			}
-			tester.Equal(tt.want, got)
-		})
-	}
-}
-
-func union(memTable *MemTable, fields ...index.Field) posting.List {
-	result := roaring.NewPostingList()
-	for _, f := range fields {
-		_ = result.Union(memTable.MatchTerms(f))
-	}
-	return result
-}
-
-func setUp(t *testing.T, mt *MemTable) {
-	for i := 0; i < 100; i++ {
-		if i%2 == 0 {
-			assert.NoError(t, mt.Insert(index.Field{
-				Key:  []byte("service_name"),
-				Term: []byte("gateway"),
-			}, common.ItemID(i)))
-		} else {
-			assert.NoError(t, mt.Insert(index.Field{
-				Key:  []byte("service_name"),
-				Term: []byte("webpage"),
-			}, common.ItemID(i)))
-		}
-	}
-	for i := 100; i < 200; i++ {
-		switch {
-		case i%3 == 0:
-			assert.NoError(t, mt.Insert(index.Field{
-				Key:  []byte("duration"),
-				Term: convert.Uint16ToBytes(50),
-			}, common.ItemID(i)))
-		case i%3 == 1:
-			assert.NoError(t, mt.Insert(index.Field{
-				Key:  []byte("duration"),
-				Term: convert.Uint16ToBytes(200),
-			}, common.ItemID(i)))
-		case i%3 == 2:
-			assert.NoError(t, mt.Insert(index.Field{
-				Key:  []byte("duration"),
-				Term: convert.Uint16ToBytes(1000),
-			}, common.ItemID(i)))
-		}
-	}
+	mt := newMemTable()
+	data := test_cases.SetUpDuration(assert.New(t), mt)
+	test_cases.RunDuration(t, data, mt)
 }
diff --git a/pkg/index/inverted/term_map.go b/pkg/index/inverted/term_map.go
index c672239..e683837 100644
--- a/pkg/index/inverted/term_map.go
+++ b/pkg/index/inverted/term_map.go
@@ -18,8 +18,6 @@
 package inverted
 
 import (
-	"bytes"
-	"sort"
 	"sync"
 
 	"github.com/apache/skywalking-banyandb/api/common"
@@ -31,48 +29,50 @@ import (
 
 type termHashID uint64
 
-type postingMap struct {
+type termMap struct {
 	repo  map[termHashID]*index.PostingValue
+	lst   []termHashID
 	mutex sync.RWMutex
 }
 
-func newPostingMap() *postingMap {
-	return &postingMap{
+func newPostingMap() *termMap {
+	return &termMap{
 		repo: make(map[termHashID]*index.PostingValue),
 	}
 }
 
-func (p *postingMap) put(key []byte, id common.ItemID) error {
+func (p *termMap) put(key []byte, id common.ItemID) error {
 	list := p.getOrCreate(key)
 	list.Insert(id)
 	return nil
 }
 
-func (p *postingMap) getOrCreate(key []byte) posting.List {
+func (p *termMap) getOrCreate(key []byte) posting.List {
 	list := p.get(key)
-	if list != roaring.EmptyPostingList {
+	if list != nil {
 		return list
 	}
 	p.mutex.Lock()
 	defer p.mutex.Unlock()
 	hashedKey := termHashID(convert.Hash(key))
 	v := &index.PostingValue{
-		Key:   key,
+		Term:  key,
 		Value: roaring.NewPostingList(),
 	}
 	p.repo[hashedKey] = v
+	p.lst = append(p.lst, hashedKey)
 	return v.Value
 }
 
-func (p *postingMap) get(key []byte) posting.List {
+func (p *termMap) get(key []byte) posting.List {
 	e := p.getEntry(key)
 	if e == nil {
-		return roaring.EmptyPostingList
+		return nil
 	}
 	return e.Value
 }
 
-func (p *postingMap) getEntry(key []byte) *index.PostingValue {
+func (p *termMap) getEntry(key []byte) *index.PostingValue {
 	p.mutex.RLock()
 	defer p.mutex.RUnlock()
 	hashedKey := termHashID(convert.Hash(key))
@@ -82,80 +82,3 @@ func (p *postingMap) getEntry(key []byte) *index.PostingValue {
 	}
 	return v
 }
-
-func (p *postingMap) allValues() posting.List {
-	result := roaring.NewPostingList()
-	for _, value := range p.repo {
-		_ = result.Union(value.Value)
-	}
-	return result
-}
-
-func (p *postingMap) getRange(opts index.RangeOpts) posting.List {
-	switch bytes.Compare(opts.Upper, opts.Lower) {
-	case -1:
-		return roaring.EmptyPostingList
-	case 0:
-		if opts.IncludesUpper && opts.IncludesLower {
-			return p.get(opts.Upper)
-		}
-		return roaring.EmptyPostingList
-	}
-	p.mutex.RLock()
-	defer p.mutex.RUnlock()
-	keys := make(Asc, 0, len(p.repo))
-	for _, v := range p.repo {
-		keys = append(keys, v.Key)
-	}
-	sort.Sort(keys)
-	index := sort.Search(len(keys), func(i int) bool {
-		return bytes.Compare(keys[i], opts.Lower) >= 0
-	})
-	result := roaring.NewPostingList()
-	for i := index; i < len(keys); i++ {
-		k := keys[i]
-		switch {
-		case bytes.Equal(k, opts.Lower):
-			if opts.IncludesLower {
-				_ = result.Union(p.repo[termHashID(convert.Hash(k))].Value)
-			}
-		case bytes.Compare(k, opts.Upper) > 0:
-			break
-		case bytes.Equal(k, opts.Upper):
-			if opts.IncludesUpper {
-				_ = result.Union(p.repo[termHashID(convert.Hash(k))].Value)
-			}
-		default:
-			_ = result.Union(p.repo[termHashID(convert.Hash(k))].Value)
-		}
-	}
-	return result
-}
-
-type Asc [][]byte
-
-func (a Asc) Len() int {
-	return len(a)
-}
-
-func (a Asc) Less(i, j int) bool {
-	return bytes.Compare(a[i], a[j]) < 0
-}
-
-func (a Asc) Swap(i, j int) {
-	a[i], a[j] = a[j], a[i]
-}
-
-type Desc [][]byte
-
-func (d Desc) Len() int {
-	return len(d)
-}
-
-func (d Desc) Less(i, j int) bool {
-	return bytes.Compare(d[i], d[j]) > 0
-}
-
-func (d Desc) Swap(i, j int) {
-	d[i], d[j] = d[j], d[i]
-}
diff --git a/pkg/index/iterator.go b/pkg/index/iterator.go
new file mode 100644
index 0000000..439b627
--- /dev/null
+++ b/pkg/index/iterator.go
@@ -0,0 +1,209 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package index
+
+import (
+	"bytes"
+
+	"go.uber.org/multierr"
+
+	modelv2 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v2"
+	"github.com/apache/skywalking-banyandb/banyand/kv"
+)
+
+// CompositePostingValueFn builds the PostingValue of a term. It is handed the
+// term, the value stored under the key the iterator currently points at, and
+// the underlying kv.Iterator itself so that it can read further entries.
+type CompositePostingValueFn = func(term, value []byte, delegated kv.Iterator) (*PostingValue, error)
+
+var _ FieldIterator = (*FieldIteratorTemplate)(nil)
+
+// FieldIteratorTemplate is a FieldIterator that walks a kv.Iterator and
+// leaves the construction of every PostingValue to a CompositePostingValueFn.
+type FieldIteratorTemplate struct {
+	// delegated is the underlying key/value iterator.
+	delegated kv.Iterator
+
+	// init is set once Next has issued the initial Seek.
+	init      bool
+	// curr is the value produced by the last successful Next.
+	curr      *PostingValue
+	// err keeps the last error hit by Next.
+	err       error
+	// termRange holds the term bounds checked through RangeOpts.Between.
+	termRange RangeOpts
+	// fn builds the PostingValue of the term under the cursor.
+	fn        CompositePostingValueFn
+	// reverse is true when the scan runs in descending order.
+	reverse   bool
+	// field is the seek target: the marshaled field key plus the start term.
+	field     Field
+}
+
+// Next moves to the next term accepted by termRange and reports whether one
+// was found; the term is then available through Val.
+//
+// The first call seeks the underlying iterator to the marshaled start field.
+// Next returns false when the underlying iterator is no longer valid, when the
+// current key belongs to another field key, when a key cannot be decoded or fn
+// fails (the error is kept in f.err), or when Between places the term on the
+// side of the range where the scan ends for the current direction.
+// NOTE(review): a positive Between result is presumably "above the range" and
+// a negative one "below it" — confirm against RangeOpts.Between.
+func (f *FieldIteratorTemplate) Next() bool {
+	if !f.init {
+		f.init = true
+		f.delegated.Seek(f.field.Marshal())
+	}
+	// Loop rather than recurse: skipping a long run of out-of-range terms must
+	// not grow the call stack by one frame per skipped term.
+	for {
+		if !f.delegated.Valid() {
+			return false
+		}
+		field := &Field{}
+		if err := field.Unmarshal(f.delegated.Key()); err != nil {
+			f.err = err
+			return false
+		}
+		if !bytes.Equal(field.Key, f.field.Key) {
+			return false
+		}
+		// This method never advances the underlying iterator itself; moving
+		// past the entries of the current term is left to fn.
+		pv, err := f.fn(field.Term, f.delegated.Val(), f.delegated)
+		if err != nil {
+			f.err = err
+			return false
+		}
+		in := f.termRange.Between(pv.Term)
+		switch {
+		case in > 0:
+			if !f.reverse {
+				return false
+			}
+			continue
+		case in < 0:
+			if f.reverse {
+				return false
+			}
+			continue
+		}
+		f.curr = pv
+		return true
+	}
+}
+
+// Val returns the PostingValue stored by the most recent successful Next.
+func (f *FieldIteratorTemplate) Val() *PostingValue {
+	return f.curr
+}
+
+// Close closes the underlying kv.Iterator and returns its error.
+func (f *FieldIteratorTemplate) Close() error {
+	return f.delegated.Close()
+}
+
+// NewFieldIteratorTemplate creates a FieldIteratorTemplate that scans the
+// entries of iterable prefixed with the marshaled fieldKey.
+//
+// For SORT_ASC and SORT_UNSPECIFIED the scan runs forward and starts at
+// termRange.Lower; for SORT_DESC it runs in reverse and starts at
+// termRange.Upper. Any other order value scans forward with a nil start term.
+// fn is called by Next to build the PostingValue of each term.
+func NewFieldIteratorTemplate(fieldKey FieldKey, termRange RangeOpts, order modelv2.QueryOrder_Sort, iterable kv.Iterable, fn CompositePostingValueFn) *FieldIteratorTemplate {
+	var reverse bool
+	var term []byte
+	switch order {
+	case modelv2.QueryOrder_SORT_ASC, modelv2.QueryOrder_SORT_UNSPECIFIED:
+		term = termRange.Lower
+	case modelv2.QueryOrder_SORT_DESC:
+		term = termRange.Upper
+		reverse = true
+	}
+	iter := iterable.NewIterator(kv.ScanOpts{
+		Prefix:  fieldKey.Marshal(),
+		Reverse: reverse,
+	})
+	return &FieldIteratorTemplate{
+		delegated: iter,
+		termRange: termRange,
+		fn:        fn,
+		reverse:   reverse,
+		field: Field{
+			Key:  fieldKey.Marshal(),
+			Term: term,
+		},
+	}
+}
+
+// SwitchFn decides, for the merged iterator, whether the candidate term b
+// should replace the currently selected term a as the next value to emit.
+type SwitchFn = func(a, b []byte) bool
+
+var _ FieldIterator = (*mergedIterator)(nil)
+
+// mergedIterator merges several FieldIterators into a single FieldIterator,
+// using switchFn to pick which inner iterator supplies the next value.
+type mergedIterator struct {
+	// inner holds the iterators that still have values; a drained slot is nil.
+	inner        []FieldIterator
+	// drained holds, at their original index, the iterators whose Next
+	// returned false.
+	drained      []FieldIterator
+	// drainedCount is the number of iterators moved to drained so far.
+	drainedCount int
+	// cur is the value handed out by Val.
+	cur          *PostingValue
+	switchFn     SwitchFn
+	// init is set once every inner iterator has been advanced to its first value.
+	init         bool
+	// closed is set by Close; Next returns false afterwards.
+	closed       bool
+}
+
+// NewMergedIterator returns a FieldIterator that merges the given iterators,
+// selecting the next value with fn. The merged slice is used directly as the
+// iterator's working set (it is not copied), so its elements are set to nil as
+// the inner iterators drain.
+func NewMergedIterator(merged []FieldIterator, fn SwitchFn) FieldIterator {
+	return &mergedIterator{
+		inner:    merged,
+		drained:  make([]FieldIterator, len(merged)),
+		switchFn: fn,
+	}
+}
+
+// Next selects the next value among the inner iterators and reports whether
+// one is available through Val. It returns false once the iterator is closed
+// or every inner iterator is drained.
+//
+// On the first call every inner iterator is advanced once. On each call the
+// head is the first live iterator, replaced by any later one for which
+// switchFn(head term, candidate term) is true. The head's value becomes the
+// current value and the head is advanced, being drained when it has no more.
+func (m *mergedIterator) Next() bool {
+	if m.closed || m.allDrained() {
+		return false
+	}
+	if !m.init {
+		for i, iterator := range m.inner {
+			if !iterator.Next() {
+				m.drain(i)
+			}
+		}
+		if m.allDrained() {
+			return false
+		}
+		m.init = true
+	}
+	var head FieldIterator
+	var headIndex int
+	for i, iterator := range m.inner {
+		if iterator == nil {
+			continue
+		}
+		// Record the index together with the head, including for the first
+		// live iterator; otherwise a head that is not at slot 0 would drain
+		// slot 0 instead of its own.
+		if head == nil || m.switchFn(head.Val().Term, iterator.Val().Term) {
+			head = iterator
+			headIndex = i
+		}
+	}
+	m.cur = head.Val()
+	if !head.Next() {
+		m.drain(headIndex)
+	}
+	return true
+}
+
+// Val returns the value selected by the most recent successful Next.
+func (m *mergedIterator) Val() *PostingValue {
+	return m.cur
+}
+
+// Close marks the iterator as closed and closes every inner iterator, both
+// those already drained and those that still had values, so that closing
+// before exhaustion does not leak the live ones. All close errors are
+// combined into the returned error.
+func (m *mergedIterator) Close() error {
+	m.closed = true
+	var err error
+	// drain moves an iterator from inner to drained, so the two slices never
+	// hold the same iterator and nothing is closed twice here.
+	for _, iterators := range [][]FieldIterator{m.inner, m.drained} {
+		for _, iterator := range iterators {
+			if iterator == nil {
+				continue
+			}
+			err = multierr.Append(err, iterator.Close())
+		}
+	}
+	return err
+}
+
+// drain moves the iterator at index from inner to drained, leaving nil in
+// its inner slot, and increments drainedCount.
+func (m *mergedIterator) drain(index int) {
+	m.drained[index], m.inner[index] = m.inner[index], nil
+	m.drainedCount++
+}
+
+// allDrained reports whether drainedCount has reached the number of inner slots.
+func (m *mergedIterator) allDrained() bool {
+	return m.drainedCount == len(m.inner)
+}
diff --git a/pkg/index/inverted/inverted.go b/pkg/index/lsm/lsm.go
similarity index 57%
copy from pkg/index/inverted/inverted.go
copy to pkg/index/lsm/lsm.go
index 569d434..14961a8 100644
--- a/pkg/index/inverted/inverted.go
+++ b/pkg/index/lsm/lsm.go
@@ -15,33 +15,43 @@
 // specific language governing permissions and limitations
 // under the License.
 
-package inverted
+package lsm
 
 import (
 	"github.com/apache/skywalking-banyandb/api/common"
+	"github.com/apache/skywalking-banyandb/banyand/kv"
+	"github.com/apache/skywalking-banyandb/pkg/convert"
 	"github.com/apache/skywalking-banyandb/pkg/index"
+	"github.com/apache/skywalking-banyandb/pkg/logger"
 )
 
-type GlobalStore interface {
-	Searcher() index.Searcher
-	Insert(field index.Field, docID common.ItemID) error
-}
+var _ index.Store = (*store)(nil)
 
 type store struct {
-	memTable *MemTable
-	//TODO: add data tables
+	lsm kv.Store
 }
 
-func (s *store) Searcher() index.Searcher {
-	return s.memTable
+func (s *store) Close() error {
+	return s.lsm.Close()
 }
 
-func (s *store) Insert(field index.Field, chunkID common.ItemID) error {
-	return s.memTable.Insert(field, chunkID)
+func (s *store) Write(field index.Field, itemID common.ItemID) error {
+	itemIDInt := uint64(itemID)
+	return s.lsm.PutWithVersion(field.Marshal(), convert.Uint64ToBytes(itemIDInt), itemIDInt)
 }
 
-func NewStore(name string) GlobalStore {
-	return &store{
-		memTable: NewMemTable(name),
+type StoreOpts struct {
+	Path   string
+	Logger *logger.Logger
+}
+
+func NewStore(opts StoreOpts) (index.Store, error) {
+	var err error
+	var lsm kv.Store
+	if lsm, err = kv.OpenStore(0, opts.Path, kv.StoreWithLogger(opts.Logger)); err != nil {
+		return nil, err
 	}
+	return &store{
+		lsm: lsm,
+	}, nil
 }
diff --git a/pkg/index/lsm/lsm_test.go b/pkg/index/lsm/lsm_test.go
new file mode 100644
index 0000000..2e8f8f8
--- /dev/null
+++ b/pkg/index/lsm/lsm_test.go
@@ -0,0 +1,69 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package lsm
+
+import (
+	"testing"
+
+	"github.com/stretchr/testify/assert"
+
+	"github.com/apache/skywalking-banyandb/pkg/index/test_cases"
+	"github.com/apache/skywalking-banyandb/pkg/logger"
+	"github.com/apache/skywalking-banyandb/pkg/test"
+)
+
+// TestStore_MatchTerm opens a store in a temporary directory, loads the
+// service-name fixtures and runs the shared term-matching cases against it.
+func TestStore_MatchTerm(t *testing.T) {
+	tester := assert.New(t)
+	path, fn := setUp(tester)
+	// Registered first so the temporary space is released even when the store
+	// cannot be opened; deferred calls run last-in first-out, so the store is
+	// still closed before the space is removed.
+	defer fn()
+	s, err := NewStore(StoreOpts{
+		Path:   path,
+		Logger: logger.GetLogger("test"),
+	})
+	// s is nil when NewStore fails; stop before anything dereferences it.
+	if !tester.NoError(err) {
+		return
+	}
+	defer func() {
+		tester.NoError(s.Close())
+	}()
+	test_cases.SetUp(tester, s)
+	test_cases.RunServiceName(t, s)
+}
+
+// TestStore_Iterator opens a store in a temporary directory, loads the
+// duration fixtures and runs the shared iterator cases against it.
+func TestStore_Iterator(t *testing.T) {
+	tester := assert.New(t)
+	path, fn := setUp(tester)
+	// Registered first so the temporary space is released even when the store
+	// cannot be opened; deferred calls run last-in first-out, so the store is
+	// still closed before the space is removed.
+	defer fn()
+	s, err := NewStore(StoreOpts{
+		Path:   path,
+		Logger: logger.GetLogger("test"),
+	})
+	// s is nil when NewStore fails; stop before anything dereferences it.
+	if !tester.NoError(err) {
+		return
+	}
+	defer func() {
+		tester.NoError(s.Close())
+	}()
+	data := test_cases.SetUpDuration(tester, s)
+	test_cases.RunDuration(t, data, s)
+}
+
+// setUp initialises logging with the "dev" environment at "info" level and
+// returns the directory and clean-up function provided by test.Space.
+func setUp(t *assert.Assertions) (tempDir string, deferFunc func()) {
+	loggingConf := logger.Logging{
+		Env:   "dev",
+		Level: "info",
+	}
+	t.NoError(logger.Init(loggingConf))
+	return test.Space(t)
+}
diff --git a/pkg/index/lsm/search.go b/pkg/index/lsm/search.go
new file mode 100644
index 0000000..53bf0aa
--- /dev/null
+++ b/pkg/index/lsm/search.go
@@ -0,0 +1,84 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package lsm
+
+import (
+	"bytes"
+
+	"github.com/pkg/errors"
+	"go.uber.org/multierr"
+
+	"github.com/apache/skywalking-banyandb/api/common"
+	modelv2 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v2"
+	"github.com/apache/skywalking-banyandb/banyand/kv"
+	"github.com/apache/skywalking-banyandb/pkg/convert"
+	"github.com/apache/skywalking-banyandb/pkg/index"
+	"github.com/apache/skywalking-banyandb/pkg/index/posting"
+	"github.com/apache/skywalking-banyandb/pkg/index/posting/roaring"
+)
+
+// MatchField returns the result of Range over fieldKey with an empty
+// RangeOpts, i.e. with no term bounds set.
+func (s *store) MatchField(fieldKey index.FieldKey) (list posting.List, err error) {
+	return s.Range(fieldKey, index.RangeOpts{})
+}
+
+// MatchTerms collects every item ID stored under the marshaled field into a
+// posting list. A kv.ErrKeyNotFound from the store is turned into the empty
+// posting list with a nil error; any other error is returned with the list.
+func (s *store) MatchTerms(field index.Field) (list posting.List, err error) {
+	matched := roaring.NewPostingList()
+	collect := func(itemID []byte) error {
+		matched.Insert(common.ItemID(convert.BytesToUint64(itemID)))
+		return nil
+	}
+	getErr := s.lsm.GetAll(field.Marshal(), collect)
+	if errors.Is(getErr, kv.ErrKeyNotFound) {
+		return roaring.EmptyPostingList, nil
+	}
+	return matched, getErr
+}
+
+// Range unions the posting lists of every term that an ascending Iterator
+// over fieldKey yields for opts. When Iterator reports nothing found, the
+// empty posting list is returned. Union errors and the iterator's Close error
+// are combined into the returned error.
+func (s *store) Range(fieldKey index.FieldKey, opts index.RangeOpts) (list posting.List, err error) {
+	iter, found := s.Iterator(fieldKey, opts, modelv2.QueryOrder_SORT_ASC)
+	if !found {
+		return roaring.EmptyPostingList, nil
+	}
+	merged := roaring.NewPostingList()
+	var errs error
+	for iter.Next() {
+		errs = multierr.Append(errs, merged.Union(iter.Val().Value))
+	}
+	return merged, multierr.Append(errs, iter.Close())
+}
+
+// Iterator returns a field iterator over fieldKey restricted to termRange in
+// the given order; the boolean result is always true here.
+//
+// For every term the iterator stops on, the posting value starts with the
+// item ID decoded from the current value and then absorbs the item IDs of the
+// following entries for as long as they decode to the same term, advancing
+// the underlying iterator past them.
+func (s *store) Iterator(fieldKey index.FieldKey, termRange index.RangeOpts, order modelv2.QueryOrder_Sort) (index.FieldIterator, bool) {
+	collectTerm := func(term, value []byte, delegated kv.Iterator) (*index.PostingValue, error) {
+		items := roaring.NewPostingListWithInitialData(convert.BytesToUint64(value))
+		for delegated.Valid() {
+			var f index.Field
+			if err := f.Unmarshal(delegated.Key()); err != nil {
+				return nil, err
+			}
+			if !bytes.Equal(f.Term, term) {
+				break
+			}
+			items.Insert(common.ItemID(convert.BytesToUint64(delegated.Val())))
+			delegated.Next()
+		}
+		return &index.PostingValue{
+			Term:  term,
+			Value: items,
+		}, nil
+	}
+	return index.NewFieldIteratorTemplate(fieldKey, termRange, order, s.lsm, collectTerm), true
+}
diff --git a/pkg/index/posting/roaring/roaring.go b/pkg/index/posting/roaring/roaring.go
index 8f1f3d7..d121f89 100644
--- a/pkg/index/posting/roaring/roaring.go
+++ b/pkg/index/posting/roaring/roaring.go
@@ -62,6 +62,14 @@ func NewPostingListWithInitialData(data ...uint64) posting.List {
 	return list
 }
 
+// NewRange returns a posting list whose bitmap is filled by a single
+// AddRange(start, end) call.
+// NOTE(review): roaring64's AddRange presumably treats end as exclusive — confirm.
+func NewRange(start, end uint64) posting.List {
+	bitmap := roaring64.New()
+	bitmap.AddRange(start, end)
+	return &postingsList{bitmap: bitmap}
+}
+
 func (p *postingsList) Contains(id common.ItemID) bool {
 	return p.bitmap.Contains(uint64(id))
 }
diff --git a/pkg/index/test_cases/duration.go b/pkg/index/test_cases/duration.go
new file mode 100644
index 0000000..af4bf1a
--- /dev/null
+++ b/pkg/index/test_cases/duration.go
@@ -0,0 +1,329 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package test_cases
+
+import (
+	"sort"
+	"testing"
+
+	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/require"
+
+	"github.com/apache/skywalking-banyandb/api/common"
+	modelv2 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v2"
+	"github.com/apache/skywalking-banyandb/pkg/convert"
+	"github.com/apache/skywalking-banyandb/pkg/index"
+	"github.com/apache/skywalking-banyandb/pkg/index/posting"
+	"github.com/apache/skywalking-banyandb/pkg/index/posting/roaring"
+)
+
+var (
+	// duration is the field key under which the duration fixtures are written
+	// and queried.
+	duration = index.FieldKey{
+		IndexRule: "duration",
+	}
+)
+
+// SimpleStore is the subset of an index store that the shared test cases
+// need: iterating a field, writing entries and matching a single term.
+type SimpleStore interface {
+	index.FieldIterable
+	index.Writer
+	MatchTerms(field index.Field) (list posting.List, err error)
+}
+
+// args bundles the arguments passed to SimpleStore.Iterator by a test case.
+type args struct {
+	fieldKey  index.FieldKey
+	termRange index.RangeOpts
+	orderType modelv2.QueryOrder_Sort
+}
+
+// result records one step of an iteration: the term and its posting list.
+type result struct {
+	key   []byte
+	items posting.List
+}
+
+// RunDuration runs the table-driven iterator cases against store. For every
+// case it calls store.Iterator, collects the yielded terms and posting lists,
+// and compares them, in order, with the wanted terms and with the posting
+// lists recorded in data (keyed by term). It also checks that Next keeps
+// returning false after exhaustion and after Close.
+func RunDuration(t *testing.T, data map[int]posting.List, store SimpleStore) {
+	tests := []struct {
+		name string
+		args args
+		want []int
+	}{
+		//{
+		//	name: "sort in asc order",
+		//	args: args{
+		//		fieldKey:  duration,
+		//		orderType: modelv2.QueryOrder_SORT_ASC,
+		//	},
+		//	want: []int{50, 200, 500, 1000, 2000},
+		//},
+		//{
+		//	name: "sort in desc order",
+		//	args: args{
+		//		fieldKey:  duration,
+		//		orderType: modelv2.QueryOrder_SORT_DESC,
+		//	},
+		//	want: []int{2000, 1000, 500, 200, 50},
+		//},
+		//{
+		//	name: "scan in (lower, upper) and sort in asc order",
+		//	args: args{
+		//		fieldKey:  duration,
+		//		orderType: modelv2.QueryOrder_SORT_ASC,
+		//		termRange: index.RangeOpts{
+		//			Lower: convert.Int64ToBytes(50),
+		//			Upper: convert.Int64ToBytes(2000),
+		//		},
+		//	},
+		//	want: []int{200, 500, 1000},
+		//},
+		//{
+		//	name: "scan in (lower, upper) and sort in desc order",
+		//	args: args{
+		//		fieldKey:  duration,
+		//		orderType: modelv2.QueryOrder_SORT_DESC,
+		//		termRange: index.RangeOpts{
+		//			Lower: convert.Int64ToBytes(50),
+		//			Upper: convert.Int64ToBytes(2000),
+		//		},
+		//	},
+		//	want: []int{1000, 500, 200},
+		//},
+		//{
+		//	name: "scan in [lower, upper] and sort in asc order",
+		//	args: args{
+		//		fieldKey:  duration,
+		//		orderType: modelv2.QueryOrder_SORT_ASC,
+		//		termRange: index.RangeOpts{
+		//			Lower:         convert.Int64ToBytes(200),
+		//			IncludesLower: true,
+		//			Upper:         convert.Int64ToBytes(1000),
+		//			IncludesUpper: true,
+		//		},
+		//	},
+		//	want: []int{200, 500, 1000},
+		//},
+		//{
+		//	name: "scan in [lower, upper] and sort in desc order",
+		//	args: args{
+		//		fieldKey:  duration,
+		//		orderType: modelv2.QueryOrder_SORT_DESC,
+		//		termRange: index.RangeOpts{
+		//			Lower:         convert.Int64ToBytes(200),
+		//			IncludesLower: true,
+		//			Upper:         convert.Int64ToBytes(1000),
+		//			IncludesUpper: true,
+		//		},
+		//	},
+		//	want: []int{1000, 500, 200},
+		//},
+		{
+			name: "scan in [lower, undefined)  and sort in asc order",
+			args: args{
+				fieldKey:  duration,
+				orderType: modelv2.QueryOrder_SORT_ASC,
+				termRange: index.RangeOpts{
+					Lower:         convert.Int64ToBytes(200),
+					IncludesLower: true,
+				},
+			},
+			want: []int{200, 500, 1000, 2000},
+		},
+		//{
+		//	name: "scan in [lower, undefined) and sort in desc order",
+		//	args: args{
+		//		fieldKey:  duration,
+		//		orderType: modelv2.QueryOrder_SORT_DESC,
+		//		termRange: index.RangeOpts{
+		//			Lower:         convert.Int64ToBytes(200),
+		//			IncludesLower: true,
+		//		},
+		//	},
+		//	want: []int{2000, 1000, 500, 200},
+		//},
+		{
+			name: "scan in (undefined, upper] and sort in asc order",
+			args: args{
+				fieldKey:  duration,
+				orderType: modelv2.QueryOrder_SORT_ASC,
+				termRange: index.RangeOpts{
+					Upper:         convert.Int64ToBytes(1000),
+					IncludesUpper: true,
+				},
+			},
+			want: []int{50, 200, 500, 1000},
+		},
+		//{
+		//	name: "scan in (undefined, upper] and sort in desc order",
+		//	args: args{
+		//		fieldKey:  duration,
+		//		orderType: modelv2.QueryOrder_SORT_DESC,
+		//		termRange: index.RangeOpts{
+		//			Upper:         convert.Int64ToBytes(1000),
+		//			IncludesUpper: true,
+		//		},
+		//	},
+		//	want: []int{1000, 500, 200, 50},
+		//},
+		{
+			name: "scan splice in (lower, upper) and sort in asc order",
+			args: args{
+				fieldKey:  duration,
+				orderType: modelv2.QueryOrder_SORT_ASC,
+				termRange: index.RangeOpts{
+					Lower: convert.Int64ToBytes(50 + 100),
+					Upper: convert.Int64ToBytes(2000 - 100),
+				},
+			},
+			want: []int{200, 500, 1000},
+		},
+		{
+			name: "scan splice in (lower, upper) and sort in desc order",
+			args: args{
+				fieldKey:  duration,
+				orderType: modelv2.QueryOrder_SORT_DESC,
+				termRange: index.RangeOpts{
+					Lower: convert.Int64ToBytes(50 + 100),
+					Upper: convert.Int64ToBytes(2000 - 100),
+				},
+			},
+			want: []int{1000, 500, 200},
+		},
+		{
+			name: "scan splice in [lower, upper] and sort in asc order",
+			args: args{
+				fieldKey:  duration,
+				orderType: modelv2.QueryOrder_SORT_ASC,
+				termRange: index.RangeOpts{
+					Lower:         convert.Int64ToBytes(50 + 100),
+					IncludesLower: true,
+					Upper:         convert.Int64ToBytes(2000 - 100),
+					IncludesUpper: true,
+				},
+			},
+			want: []int{200, 500, 1000},
+		},
+		{
+			name: "scan splice in [lower, upper] and sort in desc order",
+			args: args{
+				fieldKey:  duration,
+				orderType: modelv2.QueryOrder_SORT_DESC,
+				termRange: index.RangeOpts{
+					Lower:         convert.Int64ToBytes(50 + 100),
+					IncludesLower: true,
+					Upper:         convert.Int64ToBytes(2000 - 100),
+					IncludesUpper: true,
+				},
+			},
+			want: []int{1000, 500, 200},
+		},
+		{
+			name: "no field key",
+			args: args{},
+		},
+		{
+			name: "unknown field key",
+			args: args{
+				fieldKey: index.FieldKey{
+					IndexRule: "unknown",
+				},
+			},
+		},
+		{
+			name: "default order",
+			args: args{
+				fieldKey: duration,
+			},
+			want: []int{50, 200, 500, 1000, 2000},
+		},
+		{
+			name: "invalid range",
+			args: args{
+				fieldKey: duration,
+				termRange: index.RangeOpts{
+					Lower: convert.Int64ToBytes(100),
+					Upper: convert.Int64ToBytes(50),
+				},
+			},
+		},
+	}
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			// Bind the assertion helpers to the subtest's own t: a require
+			// failure calls FailNow, which must run on the goroutine of the
+			// test it belongs to, and failures are then reported against the
+			// right subtest.
+			tester := assert.New(t)
+			is := require.New(t)
+			iter, found := store.Iterator(tt.args.fieldKey, tt.args.termRange, tt.args.orderType)
+			if !found {
+				tester.Empty(tt.want)
+				return
+			}
+			defer func() {
+				tester.NoError(iter.Close())
+				// A closed iterator must keep reporting exhaustion.
+				for i := 0; i < 10; i++ {
+					is.False(iter.Next())
+				}
+			}()
+			is.NotNil(iter)
+			var got []result
+			for iter.Next() {
+				got = append(got, result{
+					key:   iter.Val().Term,
+					items: iter.Val().Value,
+				})
+			}
+			// An exhausted iterator must keep reporting exhaustion.
+			for i := 0; i < 10; i++ {
+				is.False(iter.Next())
+			}
+			is.Equal(len(tt.want), len(got))
+			for i, w := range tt.want {
+				g := got[i]
+				tester.Equal(int64(w), convert.BytesToInt64(g.key))
+				tester.True(data[w].Equal(g.items))
+			}
+		})
+	}
+}
+
+// SetUpDuration writes the duration fixtures for the terms 50, 200, 500, 1000
+// and 2000 through SetUpPartialDuration and returns the expected posting list
+// of every term.
+func SetUpDuration(t *assert.Assertions, store index.Writer) map[int]posting.List {
+	terms := []int{50, 200, 500, 1000, 2000}
+	r := make(map[int]posting.List, len(terms))
+	for _, term := range terms {
+		r[term] = roaring.NewPostingList()
+	}
+	return SetUpPartialDuration(t, store, r)
+}
+
+// SetUpPartialDuration distributes the item IDs 100..199 round-robin over the
+// keys of r taken in ascending order: item i belongs to the key at position
+// i % len(r). For keys whose list in r is non-nil the entry is written to
+// store under the duration field and the ID is added to that list; keys
+// mapped to nil keep their slot in the rotation but are neither written nor
+// recorded. r is returned after being updated in place.
+func SetUpPartialDuration(t *assert.Assertions, store index.Writer, r map[int]posting.List) map[int]posting.List {
+	if len(r) == 0 {
+		// Nothing to distribute; also keeps the modulo below well defined.
+		return r
+	}
+	idx := make([]int, 0, len(r))
+	for key := range r {
+		idx = append(idx, key)
+	}
+	sort.Ints(idx)
+	for i := 100; i < 200; i++ {
+		term := idx[i%len(idx)]
+		if r[term] == nil {
+			continue
+		}
+		id := common.ItemID(i)
+		t.NoError(store.Write(index.Field{
+			Key:  duration.Marshal(),
+			Term: convert.Int64ToBytes(int64(term)),
+		}, id))
+		r[term].Insert(id)
+	}
+	return r
+}
diff --git a/pkg/index/test_cases/service_name.go b/pkg/index/test_cases/service_name.go
new file mode 100644
index 0000000..40a0714
--- /dev/null
+++ b/pkg/index/test_cases/service_name.go
@@ -0,0 +1,100 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package test_cases
+
+import (
+	"testing"
+
+	"github.com/stretchr/testify/assert"
+
+	"github.com/apache/skywalking-banyandb/api/common"
+	"github.com/apache/skywalking-banyandb/pkg/index"
+	"github.com/apache/skywalking-banyandb/pkg/index/posting"
+	"github.com/apache/skywalking-banyandb/pkg/index/posting/roaring"
+)
+
+// serviceName is the marshaled field key under which the service-name
+// fixtures are written and queried.
+var serviceName = index.FieldKey{
+	IndexRule: "service_name",
+}.Marshal()
+
+// RunServiceName runs the table-driven MatchTerms cases against store and
+// compares every returned posting list with the wanted one.
+func RunServiceName(t *testing.T, store SimpleStore) {
+	tests := []struct {
+		name    string
+		arg     index.Field
+		want    posting.List
+		wantErr bool
+	}{
+		{
+			name: "match gateway",
+			arg: index.Field{
+				Key:  serviceName,
+				Term: []byte("gateway"),
+			},
+			want: roaring.NewRange(0, 50),
+		},
+		{
+			name: "match webpage",
+			arg: index.Field{
+				Key:  serviceName,
+				Term: []byte("webpage"),
+			},
+			want: roaring.NewRange(50, 100),
+		},
+		{
+			name: "unknown field",
+			want: roaring.EmptyPostingList,
+		},
+		{
+			name: "unknown term",
+			arg: index.Field{
+				Key:  serviceName,
+				Term: []byte("unknown"),
+			},
+			want: roaring.EmptyPostingList,
+		},
+	}
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			// Bind the assertions to the subtest's own t so that failures are
+			// reported against the failing case rather than the parent test.
+			tester := assert.New(t)
+			list, err := store.MatchTerms(tt.arg)
+			if tt.wantErr {
+				tester.Error(err)
+				return
+			}
+			tester.NoError(err)
+			tester.NotNil(list)
+			tester.True(tt.want.Equal(list))
+		})
+	}
+}
+
+// SetUp writes the service-name fixtures: item IDs 0..49 under the term
+// "gateway" and item IDs 50..99 under the term "webpage".
+func SetUp(t *assert.Assertions, store SimpleStore) {
+	const total = 100
+	for i := 0; i < total; i++ {
+		term := "webpage"
+		if i < total/2 {
+			term = "gateway"
+		}
+		t.NoError(store.Write(index.Field{
+			Key:  serviceName,
+			Term: []byte(term),
+		}, common.ItemID(i)))
+	}
+}
diff --git a/pkg/index/search.go b/pkg/index/tree.go
similarity index 87%
rename from pkg/index/search.go
rename to pkg/index/tree.go
index 3f304d5..1f5bda1 100644
--- a/pkg/index/search.go
+++ b/pkg/index/tree.go
@@ -25,7 +25,6 @@ import (
 
 	"github.com/pkg/errors"
 
-	"github.com/apache/skywalking-banyandb/api/common"
 	modelv2 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v2"
 	"github.com/apache/skywalking-banyandb/pkg/index/posting"
 )
@@ -38,18 +37,7 @@ type Executor interface {
 
 type Tree interface {
 	Executor
-}
-
-type FieldKey struct {
-	SeriesID  common.SeriesID
-	IndexRule string
-}
-
-func (t *FieldKey) Marshal() []byte {
-	return bytes.Join([][]byte{
-		t.SeriesID.Marshal(),
-		[]byte(t.IndexRule),
-	}, []byte(":"))
+	TrimRangeLeaf(key FieldKey) (rangeOpts RangeOpts, found bool)
 }
 
 type Condition map[FieldKey][]ConditionValue
@@ -66,8 +54,7 @@ func BuildTree(searcher Searcher, condMap Condition) (Tree, error) {
 			searcher: searcher,
 		},
 	}
-	for term, conds := range condMap {
-		key := term.Marshal()
+	for key, conds := range condMap {
 		var rangeLeaf *rangeOp
 		for _, cond := range conds {
 			if rangeLeaf != nil && !rangeOP(cond.Op) {
@@ -136,7 +123,7 @@ type node struct {
 	SubNodes []Executor `json:"sub_nodes,omitempty"`
 }
 
-func (n *node) newEq(key []byte, values [][]byte) *eq {
+func (n *node) newEq(key FieldKey, values [][]byte) *eq {
 	return &eq{
 		leaf: &leaf{
 			Key:      key,
@@ -146,11 +133,11 @@ func (n *node) newEq(key []byte, values [][]byte) *eq {
 	}
 }
 
-func (n *node) addEq(key []byte, values [][]byte) {
+func (n *node) addEq(key FieldKey, values [][]byte) {
 	n.SubNodes = append(n.SubNodes, n.newEq(key, values))
 }
 
-func (n *node) addNot(key []byte, inner Executor) {
+func (n *node) addNot(key FieldKey, inner Executor) {
 	n.SubNodes = append(n.SubNodes, &not{
 		Key:      key,
 		searcher: n.searcher,
@@ -158,13 +145,13 @@ func (n *node) addNot(key []byte, inner Executor) {
 	})
 }
 
-func (n *node) addRangeLeaf(key []byte) *rangeOp {
+func (n *node) addRangeLeaf(key FieldKey) *rangeOp {
 	r := &rangeOp{
 		leaf: &leaf{
 			Key:      key,
 			searcher: n.searcher,
 		},
-		Opts: &RangeOpts{},
+		Opts: RangeOpts{},
 	}
 	n.SubNodes = append(n.SubNodes, r)
 	return r
@@ -221,6 +208,23 @@ type andNode struct {
 	*node
 }
 
+// TrimRangeLeaf removes the first range leaf among the sub-nodes whose key
+// equals key and returns its range options with true. When no such leaf
+// exists it returns an empty RangeOpts and false, leaving the sub-nodes as
+// they are.
+func (an *andNode) TrimRangeLeaf(key FieldKey) (RangeOpts, bool) {
+	for i := range an.SubNodes {
+		leafRange, isRange := an.SubNodes[i].(*rangeOp)
+		if !isRange || !key.Equal(leafRange.Key) {
+			continue
+		}
+		// Cut the leaf out in place; returning right away means the loop
+		// never continues over the shifted slice.
+		an.SubNodes = append(an.SubNodes[:i], an.SubNodes[i+1:]...)
+		return leafRange.Opts, true
+	}
+	return RangeOpts{}, false
+}
+
 func (an *andNode) merge(list posting.List) error {
 	return an.value.Intersect(list)
 }
@@ -255,20 +259,23 @@ func (on *orNode) MarshalJSON() ([]byte, error) {
 
 type leaf struct {
 	Executor
-	Key      []byte
+	Key      FieldKey
 	Values   [][]byte
 	searcher Searcher
 }
 
 type not struct {
 	Executor
-	Key      []byte
+	Key      FieldKey
 	searcher Searcher
 	Inner    Executor
 }
 
 func (n *not) Execute() (posting.List, error) {
-	all := n.searcher.MatchField(n.Key)
+	all, err := n.searcher.MatchField(n.Key)
+	if err != nil {
+		return nil, err
+	}
 	list, err := n.Inner.Execute()
 	if err != nil {
 		return nil, err
@@ -289,9 +296,9 @@ type eq struct {
 
 func (eq *eq) Execute() (posting.List, error) {
 	return eq.searcher.MatchTerms(Field{
-		Key:  eq.Key,
+		Key:  eq.Key.Marshal(),
 		Term: bytes.Join(eq.Values, nil),
-	}), nil
+	})
 }
 
 func (eq *eq) MarshalJSON() ([]byte, error) {
@@ -302,11 +309,11 @@ func (eq *eq) MarshalJSON() ([]byte, error) {
 
 type rangeOp struct {
 	*leaf
-	Opts *RangeOpts
+	Opts RangeOpts
 }
 
 func (r *rangeOp) Execute() (posting.List, error) {
-	return r.searcher.Range(r.Key, *r.Opts), nil
+	return r.searcher.Range(r.Key, r.Opts)
 }
 
 func (r *rangeOp) MarshalJSON() ([]byte, error) {

[skywalking-banyandb] 03/03: Fix desc scan errors

Posted by ha...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch time-series
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git

commit eb7429f7904e205867b17bc134da43122d3d0267
Author: Gao Hongtao <ha...@gmail.com>
AuthorDate: Sun Sep 12 21:57:58 2021 +0800

    Fix desc scan errors
    
    Signed-off-by: Gao Hongtao <ha...@gmail.com>
---
 banyand/kv/badger.go             |   8 +-
 pkg/index/iterator.go            |   4 +
 pkg/index/test_cases/duration.go | 178 +++++++++++++++++++++------------------
 3 files changed, 107 insertions(+), 83 deletions(-)

diff --git a/banyand/kv/badger.go b/banyand/kv/badger.go
index 92a8529..a0b0a1b 100644
--- a/banyand/kv/badger.go
+++ b/banyand/kv/badger.go
@@ -141,6 +141,7 @@ func (b *badgerDB) Scan(key []byte, opt ScanOpts, f ScanFunc) error {
 var _ Iterator = (*iterator)(nil)
 
 type iterator struct {
+	reverse   bool
 	delegated y.Iterator
 }
 
@@ -153,7 +154,11 @@ func (i *iterator) Rewind() {
 }
 
 func (i *iterator) Seek(key []byte) {
-	i.delegated.Seek(y.KeyWithTs(key, math.MaxInt64))
+	if i.reverse {
+		i.delegated.Seek(y.KeyWithTs(key, 0))
+	} else {
+		i.delegated.Seek(y.KeyWithTs(key, math.MaxInt64))
+	}
 }
 
 func (i *iterator) Key() []byte {
@@ -181,6 +186,7 @@ func (b *badgerDB) NewIterator(opt ScanOpts) Iterator {
 	it := b.db.NewIterator(opts)
 	return &iterator{
 		delegated: it,
+		reverse:   opts.Reverse,
 	}
 }
 
diff --git a/pkg/index/iterator.go b/pkg/index/iterator.go
index 439b627..ef41c92 100644
--- a/pkg/index/iterator.go
+++ b/pkg/index/iterator.go
@@ -109,6 +109,10 @@ func NewFieldIteratorTemplate(fieldKey FieldKey, termRange RangeOpts, order mode
 		Prefix:  fieldKey.Marshal(),
 		Reverse: reverse,
 	})
+	iter = iterable.NewIterator(kv.ScanOpts{
+		Prefix:  fieldKey.Marshal(),
+		Reverse: reverse,
+	})
 	return &FieldIteratorTemplate{
 		delegated: iter,
 		termRange: termRange,
diff --git a/pkg/index/test_cases/duration.go b/pkg/index/test_cases/duration.go
index af4bf1a..e6277c2 100644
--- a/pkg/index/test_cases/duration.go
+++ b/pkg/index/test_cases/duration.go
@@ -51,8 +51,8 @@ type args struct {
 }
 
 type result struct {
-	key   []byte
-	items posting.List
+	key   int
+	items []int
 }
 
 func RunDuration(t *testing.T, data map[int]posting.List, store SimpleStore) {
@@ -63,14 +63,14 @@ func RunDuration(t *testing.T, data map[int]posting.List, store SimpleStore) {
 		args args
 		want []int
 	}{
-		//{
-		//	name: "sort in asc order",
-		//	args: args{
-		//		fieldKey:  duration,
-		//		orderType: modelv2.QueryOrder_SORT_ASC,
-		//	},
-		//	want: []int{50, 200, 500, 1000, 2000},
-		//},
+		{
+			name: "sort in asc order",
+			args: args{
+				fieldKey:  duration,
+				orderType: modelv2.QueryOrder_SORT_ASC,
+			},
+			want: []int{50, 200, 500, 1000, 2000},
+		},
 		//{
 		//	name: "sort in desc order",
 		//	args: args{
@@ -79,58 +79,58 @@ func RunDuration(t *testing.T, data map[int]posting.List, store SimpleStore) {
 		//	},
 		//	want: []int{2000, 1000, 500, 200, 50},
 		//},
-		//{
-		//	name: "scan in (lower, upper) and sort in asc order",
-		//	args: args{
-		//		fieldKey:  duration,
-		//		orderType: modelv2.QueryOrder_SORT_ASC,
-		//		termRange: index.RangeOpts{
-		//			Lower: convert.Int64ToBytes(50),
-		//			Upper: convert.Int64ToBytes(2000),
-		//		},
-		//	},
-		//	want: []int{200, 500, 1000},
-		//},
-		//{
-		//	name: "scan in (lower, upper) and sort in desc order",
-		//	args: args{
-		//		fieldKey:  duration,
-		//		orderType: modelv2.QueryOrder_SORT_DESC,
-		//		termRange: index.RangeOpts{
-		//			Lower: convert.Int64ToBytes(50),
-		//			Upper: convert.Int64ToBytes(2000),
-		//		},
-		//	},
-		//	want: []int{1000, 500, 200},
-		//},
-		//{
-		//	name: "scan in [lower, upper] and sort in asc order",
-		//	args: args{
-		//		fieldKey:  duration,
-		//		orderType: modelv2.QueryOrder_SORT_ASC,
-		//		termRange: index.RangeOpts{
-		//			Lower:         convert.Int64ToBytes(200),
-		//			IncludesLower: true,
-		//			Upper:         convert.Int64ToBytes(1000),
-		//			IncludesUpper: true,
-		//		},
-		//	},
-		//	want: []int{200, 500, 1000},
-		//},
-		//{
-		//	name: "scan in [lower, upper] and sort in desc order",
-		//	args: args{
-		//		fieldKey:  duration,
-		//		orderType: modelv2.QueryOrder_SORT_DESC,
-		//		termRange: index.RangeOpts{
-		//			Lower:         convert.Int64ToBytes(200),
-		//			IncludesLower: true,
-		//			Upper:         convert.Int64ToBytes(1000),
-		//			IncludesUpper: true,
-		//		},
-		//	},
-		//	want: []int{1000, 500, 200},
-		//},
+		{
+			name: "scan in (lower, upper) and sort in asc order",
+			args: args{
+				fieldKey:  duration,
+				orderType: modelv2.QueryOrder_SORT_ASC,
+				termRange: index.RangeOpts{
+					Lower: convert.Int64ToBytes(50),
+					Upper: convert.Int64ToBytes(2000),
+				},
+			},
+			want: []int{200, 500, 1000},
+		},
+		{
+			name: "scan in (lower, upper) and sort in desc order",
+			args: args{
+				fieldKey:  duration,
+				orderType: modelv2.QueryOrder_SORT_DESC,
+				termRange: index.RangeOpts{
+					Lower: convert.Int64ToBytes(50),
+					Upper: convert.Int64ToBytes(2000),
+				},
+			},
+			want: []int{1000, 500, 200},
+		},
+		{
+			name: "scan in [lower, upper] and sort in asc order",
+			args: args{
+				fieldKey:  duration,
+				orderType: modelv2.QueryOrder_SORT_ASC,
+				termRange: index.RangeOpts{
+					Lower:         convert.Int64ToBytes(200),
+					IncludesLower: true,
+					Upper:         convert.Int64ToBytes(1000),
+					IncludesUpper: true,
+				},
+			},
+			want: []int{200, 500, 1000},
+		},
+		{
+			name: "scan in [lower, upper] and sort in desc order",
+			args: args{
+				fieldKey:  duration,
+				orderType: modelv2.QueryOrder_SORT_DESC,
+				termRange: index.RangeOpts{
+					Lower:         convert.Int64ToBytes(200),
+					IncludesLower: true,
+					Upper:         convert.Int64ToBytes(1000),
+					IncludesUpper: true,
+				},
+			},
+			want: []int{1000, 500, 200},
+		},
 		{
 			name: "scan in [lower, undefined)  and sort in asc order",
 			args: args{
@@ -167,18 +167,18 @@ func RunDuration(t *testing.T, data map[int]posting.List, store SimpleStore) {
 			},
 			want: []int{50, 200, 500, 1000},
 		},
-		//{
-		//	name: "scan in (undefined, upper] and sort in desc order",
-		//	args: args{
-		//		fieldKey:  duration,
-		//		orderType: modelv2.QueryOrder_SORT_DESC,
-		//		termRange: index.RangeOpts{
-		//			Upper:         convert.Int64ToBytes(1000),
-		//			IncludesUpper: true,
-		//		},
-		//	},
-		//	want: []int{1000, 500, 200, 50},
-		//},
+		{
+			name: "scan in (undefined, upper] and sort in desc order",
+			args: args{
+				fieldKey:  duration,
+				orderType: modelv2.QueryOrder_SORT_DESC,
+				termRange: index.RangeOpts{
+					Upper:         convert.Int64ToBytes(1000),
+					IncludesUpper: true,
+				},
+			},
+			want: []int{1000, 500, 200, 50},
+		},
 		{
 			name: "scan splice in (lower, upper) and sort in asc order",
 			args: args{
@@ -275,26 +275,40 @@ func RunDuration(t *testing.T, data map[int]posting.List, store SimpleStore) {
 				}
 			}()
 			is.NotNil(iter)
-			var got []result
+			got := make([]result, 0)
 			for iter.Next() {
 				got = append(got, result{
-					key:   iter.Val().Term,
-					items: iter.Val().Value,
+					key:   int(convert.BytesToInt64(iter.Val().Term)),
+					items: toArray(iter.Val().Value),
 				})
 			}
 			for i := 0; i < 10; i++ {
 				is.False(iter.Next())
 			}
-			is.Equal(len(tt.want), len(got))
-			for i, w := range tt.want {
-				g := got[i]
-				tester.Equal(int64(w), convert.BytesToInt64(g.key))
-				tester.True(data[w].Equal(g.items))
+			wants := make([]result, 0, len(tt.want))
+			for _, w := range tt.want {
+				wants = append(wants, result{
+					key:   w,
+					items: toArray(data[w]),
+				})
 			}
+			tester.Equal(wants, got)
 		})
 	}
 }
 
+func toArray(list posting.List) []int {
+	ints := make([]int, 0, list.Len())
+	iter := list.Iterator()
+	defer func(iter posting.Iterator) {
+		_ = iter.Close()
+	}(iter)
+	for iter.Next() {
+		ints = append(ints, int(iter.Current()))
+	}
+	return ints
+}
+
 func SetUpDuration(t *assert.Assertions, store index.Writer) map[int]posting.List {
 	r := map[int]posting.List{
 		50:   roaring.NewPostingList(),

[skywalking-banyandb] 01/03: Some improvment

Posted by ha...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch time-series
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git

commit b47fa44d5754d02ee51cc08495828a9417785a05
Author: Gao Hongtao <ha...@gmail.com>
AuthorDate: Thu Sep 9 11:06:59 2021 +0800

    Some improvment
    
    Signed-off-by: Gao Hongtao <ha...@gmail.com>
---
 api/proto/banyandb/database/v2/schema.pb.go | 238 ++++++++++++++--------------
 api/proto/banyandb/database/v2/schema.proto |   4 +-
 banyand/kv/kv.go                            |   3 -
 3 files changed, 121 insertions(+), 124 deletions(-)

diff --git a/api/proto/banyandb/database/v2/schema.pb.go b/api/proto/banyandb/database/v2/schema.pb.go
index f2fed85..f277eb9 100644
--- a/api/proto/banyandb/database/v2/schema.pb.go
+++ b/api/proto/banyandb/database/v2/schema.pb.go
@@ -309,7 +309,7 @@ func (x *Duration) GetUnit() Duration_DurationUnit {
 	return Duration_DURATION_UNIT_UNSPECIFIED
 }
 
-type TagFamily struct {
+type TagFamilySpec struct {
 	state         protoimpl.MessageState
 	sizeCache     protoimpl.SizeCache
 	unknownFields protoimpl.UnknownFields
@@ -319,8 +319,8 @@ type TagFamily struct {
 	Tags []*TagSpec `protobuf:"bytes,2,rep,name=tags,proto3" json:"tags,omitempty"`
 }
 
-func (x *TagFamily) Reset() {
-	*x = TagFamily{}
+func (x *TagFamilySpec) Reset() {
+	*x = TagFamilySpec{}
 	if protoimpl.UnsafeEnabled {
 		mi := &file_banyandb_database_v2_schema_proto_msgTypes[1]
 		ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
@@ -328,13 +328,13 @@ func (x *TagFamily) Reset() {
 	}
 }
 
-func (x *TagFamily) String() string {
+func (x *TagFamilySpec) String() string {
 	return protoimpl.X.MessageStringOf(x)
 }
 
-func (*TagFamily) ProtoMessage() {}
+func (*TagFamilySpec) ProtoMessage() {}
 
-func (x *TagFamily) ProtoReflect() protoreflect.Message {
+func (x *TagFamilySpec) ProtoReflect() protoreflect.Message {
 	mi := &file_banyandb_database_v2_schema_proto_msgTypes[1]
 	if protoimpl.UnsafeEnabled && x != nil {
 		ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
@@ -346,19 +346,19 @@ func (x *TagFamily) ProtoReflect() protoreflect.Message {
 	return mi.MessageOf(x)
 }
 
-// Deprecated: Use TagFamily.ProtoReflect.Descriptor instead.
-func (*TagFamily) Descriptor() ([]byte, []int) {
+// Deprecated: Use TagFamilySpec.ProtoReflect.Descriptor instead.
+func (*TagFamilySpec) Descriptor() ([]byte, []int) {
 	return file_banyandb_database_v2_schema_proto_rawDescGZIP(), []int{1}
 }
 
-func (x *TagFamily) GetName() string {
+func (x *TagFamilySpec) GetName() string {
 	if x != nil {
 		return x.Name
 	}
 	return ""
 }
 
-func (x *TagFamily) GetTags() []*TagSpec {
+func (x *TagFamilySpec) GetTags() []*TagSpec {
 	if x != nil {
 		return x.Tags
 	}
@@ -429,7 +429,7 @@ type Stream struct {
 	// metadata is the identity of a trace series
 	Metadata *v2.Metadata `protobuf:"bytes,1,opt,name=metadata,proto3" json:"metadata,omitempty"`
 	// tag_families
-	TagFamilies []*TagFamily `protobuf:"bytes,2,rep,name=tag_families,json=tagFamilies,proto3" json:"tag_families,omitempty"`
+	TagFamilies []*TagFamilySpec `protobuf:"bytes,2,rep,name=tag_families,json=tagFamilies,proto3" json:"tag_families,omitempty"`
 	// entity indicates how to generate a series and shard a stream
 	Entity *Entity `protobuf:"bytes,3,opt,name=entity,proto3" json:"entity,omitempty"`
 	// duration determines how long a TraceSeries keeps its data
@@ -478,7 +478,7 @@ func (x *Stream) GetMetadata() *v2.Metadata {
 	return nil
 }
 
-func (x *Stream) GetTagFamilies() []*TagFamily {
+func (x *Stream) GetTagFamilies() []*TagFamilySpec {
 	if x != nil {
 		return x.TagFamilies
 	}
@@ -828,111 +828,111 @@ var file_banyandb_database_v2_schema_proto_rawDesc = []byte{
 	0x55, 0x4e, 0x49, 0x54, 0x5f, 0x44, 0x41, 0x59, 0x10, 0x02, 0x12, 0x16, 0x0a, 0x12, 0x44, 0x55,
 	0x52, 0x41, 0x54, 0x49, 0x4f, 0x4e, 0x5f, 0x55, 0x4e, 0x49, 0x54, 0x5f, 0x57, 0x45, 0x45, 0x4b,
 	0x10, 0x03, 0x12, 0x17, 0x0a, 0x13, 0x44, 0x55, 0x52, 0x41, 0x54, 0x49, 0x4f, 0x4e, 0x5f, 0x55,
-	0x4e, 0x49, 0x54, 0x5f, 0x4d, 0x4f, 0x4e, 0x54, 0x48, 0x10, 0x04, 0x22, 0x52, 0x0a, 0x09, 0x54,
-	0x61, 0x67, 0x46, 0x61, 0x6d, 0x69, 0x6c, 0x79, 0x12, 0x12, 0x0a, 0x04, 0x6e, 0x61, 0x6d, 0x65,
-	0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x12, 0x31, 0x0a, 0x04,
-	0x74, 0x61, 0x67, 0x73, 0x18, 0x02, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x1d, 0x2e, 0x62, 0x61, 0x6e,
-	0x79, 0x61, 0x6e, 0x64, 0x62, 0x2e, 0x64, 0x61, 0x74, 0x61, 0x62, 0x61, 0x73, 0x65, 0x2e, 0x76,
-	0x32, 0x2e, 0x54, 0x61, 0x67, 0x53, 0x70, 0x65, 0x63, 0x52, 0x04, 0x74, 0x61, 0x67, 0x73, 0x22,
-	0x50, 0x0a, 0x07, 0x54, 0x61, 0x67, 0x53, 0x70, 0x65, 0x63, 0x12, 0x12, 0x0a, 0x04, 0x6e, 0x61,
-	0x6d, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x12, 0x31,
-	0x0a, 0x04, 0x74, 0x79, 0x70, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0e, 0x32, 0x1d, 0x2e, 0x62,
-	0x61, 0x6e, 0x79, 0x61, 0x6e, 0x64, 0x62, 0x2e, 0x64, 0x61, 0x74, 0x61, 0x62, 0x61, 0x73, 0x65,
-	0x2e, 0x76, 0x32, 0x2e, 0x54, 0x61, 0x67, 0x54, 0x79, 0x70, 0x65, 0x52, 0x04, 0x74, 0x79, 0x70,
-	0x65, 0x22, 0xd0, 0x02, 0x0a, 0x06, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x12, 0x38, 0x0a, 0x08,
-	0x6d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1c,
-	0x2e, 0x62, 0x61, 0x6e, 0x79, 0x61, 0x6e, 0x64, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x6d, 0x6f, 0x6e,
-	0x2e, 0x76, 0x32, 0x2e, 0x4d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, 0x52, 0x08, 0x6d, 0x65,
-	0x74, 0x61, 0x64, 0x61, 0x74, 0x61, 0x12, 0x42, 0x0a, 0x0c, 0x74, 0x61, 0x67, 0x5f, 0x66, 0x61,
-	0x6d, 0x69, 0x6c, 0x69, 0x65, 0x73, 0x18, 0x02, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x1f, 0x2e, 0x62,
-	0x61, 0x6e, 0x79, 0x61, 0x6e, 0x64, 0x62, 0x2e, 0x64, 0x61, 0x74, 0x61, 0x62, 0x61, 0x73, 0x65,
-	0x2e, 0x76, 0x32, 0x2e, 0x54, 0x61, 0x67, 0x46, 0x61, 0x6d, 0x69, 0x6c, 0x79, 0x52, 0x0b, 0x74,
-	0x61, 0x67, 0x46, 0x61, 0x6d, 0x69, 0x6c, 0x69, 0x65, 0x73, 0x12, 0x34, 0x0a, 0x06, 0x65, 0x6e,
-	0x74, 0x69, 0x74, 0x79, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1c, 0x2e, 0x62, 0x61, 0x6e,
-	0x79, 0x61, 0x6e, 0x64, 0x62, 0x2e, 0x64, 0x61, 0x74, 0x61, 0x62, 0x61, 0x73, 0x65, 0x2e, 0x76,
-	0x32, 0x2e, 0x45, 0x6e, 0x74, 0x69, 0x74, 0x79, 0x52, 0x06, 0x65, 0x6e, 0x74, 0x69, 0x74, 0x79,
-	0x12, 0x1b, 0x0a, 0x09, 0x73, 0x68, 0x61, 0x72, 0x64, 0x5f, 0x6e, 0x75, 0x6d, 0x18, 0x04, 0x20,
-	0x01, 0x28, 0x0d, 0x52, 0x08, 0x73, 0x68, 0x61, 0x72, 0x64, 0x4e, 0x75, 0x6d, 0x12, 0x3a, 0x0a,
-	0x08, 0x64, 0x75, 0x72, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x18, 0x05, 0x20, 0x01, 0x28, 0x0b, 0x32,
-	0x1e, 0x2e, 0x62, 0x61, 0x6e, 0x79, 0x61, 0x6e, 0x64, 0x62, 0x2e, 0x64, 0x61, 0x74, 0x61, 0x62,
-	0x61, 0x73, 0x65, 0x2e, 0x76, 0x32, 0x2e, 0x44, 0x75, 0x72, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x52,
-	0x08, 0x64, 0x75, 0x72, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x12, 0x39, 0x0a, 0x0a, 0x75, 0x70, 0x64,
-	0x61, 0x74, 0x65, 0x64, 0x5f, 0x61, 0x74, 0x18, 0x06, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1a, 0x2e,
-	0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e,
-	0x54, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x52, 0x09, 0x75, 0x70, 0x64, 0x61, 0x74,
-	0x65, 0x64, 0x41, 0x74, 0x22, 0x25, 0x0a, 0x06, 0x45, 0x6e, 0x74, 0x69, 0x74, 0x79, 0x12, 0x1b,
-	0x0a, 0x09, 0x74, 0x61, 0x67, 0x5f, 0x6e, 0x61, 0x6d, 0x65, 0x73, 0x18, 0x01, 0x20, 0x03, 0x28,
-	0x09, 0x52, 0x08, 0x74, 0x61, 0x67, 0x4e, 0x61, 0x6d, 0x65, 0x73, 0x22, 0xa4, 0x03, 0x0a, 0x09,
-	0x49, 0x6e, 0x64, 0x65, 0x78, 0x52, 0x75, 0x6c, 0x65, 0x12, 0x38, 0x0a, 0x08, 0x6d, 0x65, 0x74,
-	0x61, 0x64, 0x61, 0x74, 0x61, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1c, 0x2e, 0x62, 0x61,
-	0x6e, 0x79, 0x61, 0x6e, 0x64, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x6d, 0x6f, 0x6e, 0x2e, 0x76, 0x32,
-	0x2e, 0x4d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, 0x52, 0x08, 0x6d, 0x65, 0x74, 0x61, 0x64,
-	0x61, 0x74, 0x61, 0x12, 0x12, 0x0a, 0x04, 0x74, 0x61, 0x67, 0x73, 0x18, 0x02, 0x20, 0x03, 0x28,
-	0x09, 0x52, 0x04, 0x74, 0x61, 0x67, 0x73, 0x12, 0x38, 0x0a, 0x04, 0x74, 0x79, 0x70, 0x65, 0x18,
-	0x03, 0x20, 0x01, 0x28, 0x0e, 0x32, 0x24, 0x2e, 0x62, 0x61, 0x6e, 0x79, 0x61, 0x6e, 0x64, 0x62,
-	0x2e, 0x64, 0x61, 0x74, 0x61, 0x62, 0x61, 0x73, 0x65, 0x2e, 0x76, 0x32, 0x2e, 0x49, 0x6e, 0x64,
-	0x65, 0x78, 0x52, 0x75, 0x6c, 0x65, 0x2e, 0x54, 0x79, 0x70, 0x65, 0x52, 0x04, 0x74, 0x79, 0x70,
-	0x65, 0x12, 0x44, 0x0a, 0x08, 0x6c, 0x6f, 0x63, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x18, 0x04, 0x20,
-	0x01, 0x28, 0x0e, 0x32, 0x28, 0x2e, 0x62, 0x61, 0x6e, 0x79, 0x61, 0x6e, 0x64, 0x62, 0x2e, 0x64,
-	0x61, 0x74, 0x61, 0x62, 0x61, 0x73, 0x65, 0x2e, 0x76, 0x32, 0x2e, 0x49, 0x6e, 0x64, 0x65, 0x78,
-	0x52, 0x75, 0x6c, 0x65, 0x2e, 0x4c, 0x6f, 0x63, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x52, 0x08, 0x6c,
-	0x6f, 0x63, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x12, 0x39, 0x0a, 0x0a, 0x75, 0x70, 0x64, 0x61, 0x74,
-	0x65, 0x64, 0x5f, 0x61, 0x74, 0x18, 0x05, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1a, 0x2e, 0x67, 0x6f,
-	0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x54, 0x69,
-	0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x52, 0x09, 0x75, 0x70, 0x64, 0x61, 0x74, 0x65, 0x64,
-	0x41, 0x74, 0x22, 0x3e, 0x0a, 0x04, 0x54, 0x79, 0x70, 0x65, 0x12, 0x14, 0x0a, 0x10, 0x54, 0x59,
-	0x50, 0x45, 0x5f, 0x55, 0x4e, 0x53, 0x50, 0x45, 0x43, 0x49, 0x46, 0x49, 0x45, 0x44, 0x10, 0x00,
-	0x12, 0x0d, 0x0a, 0x09, 0x54, 0x59, 0x50, 0x45, 0x5f, 0x54, 0x52, 0x45, 0x45, 0x10, 0x01, 0x12,
-	0x11, 0x0a, 0x0d, 0x54, 0x59, 0x50, 0x45, 0x5f, 0x49, 0x4e, 0x56, 0x45, 0x52, 0x54, 0x45, 0x44,
-	0x10, 0x02, 0x22, 0x4e, 0x0a, 0x08, 0x4c, 0x6f, 0x63, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x12, 0x18,
-	0x0a, 0x14, 0x4c, 0x4f, 0x43, 0x41, 0x54, 0x49, 0x4f, 0x4e, 0x5f, 0x55, 0x4e, 0x53, 0x50, 0x45,
-	0x43, 0x49, 0x46, 0x49, 0x45, 0x44, 0x10, 0x00, 0x12, 0x13, 0x0a, 0x0f, 0x4c, 0x4f, 0x43, 0x41,
-	0x54, 0x49, 0x4f, 0x4e, 0x5f, 0x53, 0x45, 0x52, 0x49, 0x45, 0x53, 0x10, 0x01, 0x12, 0x13, 0x0a,
-	0x0f, 0x4c, 0x4f, 0x43, 0x41, 0x54, 0x49, 0x4f, 0x4e, 0x5f, 0x47, 0x4c, 0x4f, 0x42, 0x41, 0x4c,
-	0x10, 0x02, 0x22, 0x54, 0x0a, 0x07, 0x53, 0x75, 0x62, 0x6a, 0x65, 0x63, 0x74, 0x12, 0x35, 0x0a,
-	0x07, 0x63, 0x61, 0x74, 0x61, 0x6c, 0x6f, 0x67, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0e, 0x32, 0x1b,
-	0x2e, 0x62, 0x61, 0x6e, 0x79, 0x61, 0x6e, 0x64, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x6d, 0x6f, 0x6e,
-	0x2e, 0x76, 0x32, 0x2e, 0x43, 0x61, 0x74, 0x61, 0x6c, 0x6f, 0x67, 0x52, 0x07, 0x63, 0x61, 0x74,
-	0x61, 0x6c, 0x6f, 0x67, 0x12, 0x12, 0x0a, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x02, 0x20, 0x01,
-	0x28, 0x09, 0x52, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x22, 0xc6, 0x02, 0x0a, 0x10, 0x49, 0x6e, 0x64,
-	0x65, 0x78, 0x52, 0x75, 0x6c, 0x65, 0x42, 0x69, 0x6e, 0x64, 0x69, 0x6e, 0x67, 0x12, 0x38, 0x0a,
-	0x08, 0x6d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32,
-	0x1c, 0x2e, 0x62, 0x61, 0x6e, 0x79, 0x61, 0x6e, 0x64, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x6d, 0x6f,
-	0x6e, 0x2e, 0x76, 0x32, 0x2e, 0x4d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, 0x52, 0x08, 0x6d,
-	0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, 0x12, 0x14, 0x0a, 0x05, 0x72, 0x75, 0x6c, 0x65, 0x73,
-	0x18, 0x02, 0x20, 0x03, 0x28, 0x09, 0x52, 0x05, 0x72, 0x75, 0x6c, 0x65, 0x73, 0x12, 0x37, 0x0a,
-	0x07, 0x73, 0x75, 0x62, 0x6a, 0x65, 0x63, 0x74, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1d,
+	0x4e, 0x49, 0x54, 0x5f, 0x4d, 0x4f, 0x4e, 0x54, 0x48, 0x10, 0x04, 0x22, 0x56, 0x0a, 0x0d, 0x54,
+	0x61, 0x67, 0x46, 0x61, 0x6d, 0x69, 0x6c, 0x79, 0x53, 0x70, 0x65, 0x63, 0x12, 0x12, 0x0a, 0x04,
+	0x6e, 0x61, 0x6d, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x6e, 0x61, 0x6d, 0x65,
+	0x12, 0x31, 0x0a, 0x04, 0x74, 0x61, 0x67, 0x73, 0x18, 0x02, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x1d,
 	0x2e, 0x62, 0x61, 0x6e, 0x79, 0x61, 0x6e, 0x64, 0x62, 0x2e, 0x64, 0x61, 0x74, 0x61, 0x62, 0x61,
-	0x73, 0x65, 0x2e, 0x76, 0x32, 0x2e, 0x53, 0x75, 0x62, 0x6a, 0x65, 0x63, 0x74, 0x52, 0x07, 0x73,
-	0x75, 0x62, 0x6a, 0x65, 0x63, 0x74, 0x12, 0x35, 0x0a, 0x08, 0x62, 0x65, 0x67, 0x69, 0x6e, 0x5f,
-	0x61, 0x74, 0x18, 0x04, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1a, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c,
-	0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x54, 0x69, 0x6d, 0x65, 0x73,
-	0x74, 0x61, 0x6d, 0x70, 0x52, 0x07, 0x62, 0x65, 0x67, 0x69, 0x6e, 0x41, 0x74, 0x12, 0x37, 0x0a,
-	0x09, 0x65, 0x78, 0x70, 0x69, 0x72, 0x65, 0x5f, 0x61, 0x74, 0x18, 0x05, 0x20, 0x01, 0x28, 0x0b,
+	0x73, 0x65, 0x2e, 0x76, 0x32, 0x2e, 0x54, 0x61, 0x67, 0x53, 0x70, 0x65, 0x63, 0x52, 0x04, 0x74,
+	0x61, 0x67, 0x73, 0x22, 0x50, 0x0a, 0x07, 0x54, 0x61, 0x67, 0x53, 0x70, 0x65, 0x63, 0x12, 0x12,
+	0x0a, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x6e, 0x61,
+	0x6d, 0x65, 0x12, 0x31, 0x0a, 0x04, 0x74, 0x79, 0x70, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0e,
+	0x32, 0x1d, 0x2e, 0x62, 0x61, 0x6e, 0x79, 0x61, 0x6e, 0x64, 0x62, 0x2e, 0x64, 0x61, 0x74, 0x61,
+	0x62, 0x61, 0x73, 0x65, 0x2e, 0x76, 0x32, 0x2e, 0x54, 0x61, 0x67, 0x54, 0x79, 0x70, 0x65, 0x52,
+	0x04, 0x74, 0x79, 0x70, 0x65, 0x22, 0xd4, 0x02, 0x0a, 0x06, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d,
+	0x12, 0x38, 0x0a, 0x08, 0x6d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, 0x18, 0x01, 0x20, 0x01,
+	0x28, 0x0b, 0x32, 0x1c, 0x2e, 0x62, 0x61, 0x6e, 0x79, 0x61, 0x6e, 0x64, 0x62, 0x2e, 0x63, 0x6f,
+	0x6d, 0x6d, 0x6f, 0x6e, 0x2e, 0x76, 0x32, 0x2e, 0x4d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61,
+	0x52, 0x08, 0x6d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, 0x12, 0x46, 0x0a, 0x0c, 0x74, 0x61,
+	0x67, 0x5f, 0x66, 0x61, 0x6d, 0x69, 0x6c, 0x69, 0x65, 0x73, 0x18, 0x02, 0x20, 0x03, 0x28, 0x0b,
+	0x32, 0x23, 0x2e, 0x62, 0x61, 0x6e, 0x79, 0x61, 0x6e, 0x64, 0x62, 0x2e, 0x64, 0x61, 0x74, 0x61,
+	0x62, 0x61, 0x73, 0x65, 0x2e, 0x76, 0x32, 0x2e, 0x54, 0x61, 0x67, 0x46, 0x61, 0x6d, 0x69, 0x6c,
+	0x79, 0x53, 0x70, 0x65, 0x63, 0x52, 0x0b, 0x74, 0x61, 0x67, 0x46, 0x61, 0x6d, 0x69, 0x6c, 0x69,
+	0x65, 0x73, 0x12, 0x34, 0x0a, 0x06, 0x65, 0x6e, 0x74, 0x69, 0x74, 0x79, 0x18, 0x03, 0x20, 0x01,
+	0x28, 0x0b, 0x32, 0x1c, 0x2e, 0x62, 0x61, 0x6e, 0x79, 0x61, 0x6e, 0x64, 0x62, 0x2e, 0x64, 0x61,
+	0x74, 0x61, 0x62, 0x61, 0x73, 0x65, 0x2e, 0x76, 0x32, 0x2e, 0x45, 0x6e, 0x74, 0x69, 0x74, 0x79,
+	0x52, 0x06, 0x65, 0x6e, 0x74, 0x69, 0x74, 0x79, 0x12, 0x1b, 0x0a, 0x09, 0x73, 0x68, 0x61, 0x72,
+	0x64, 0x5f, 0x6e, 0x75, 0x6d, 0x18, 0x04, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x08, 0x73, 0x68, 0x61,
+	0x72, 0x64, 0x4e, 0x75, 0x6d, 0x12, 0x3a, 0x0a, 0x08, 0x64, 0x75, 0x72, 0x61, 0x74, 0x69, 0x6f,
+	0x6e, 0x18, 0x05, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1e, 0x2e, 0x62, 0x61, 0x6e, 0x79, 0x61, 0x6e,
+	0x64, 0x62, 0x2e, 0x64, 0x61, 0x74, 0x61, 0x62, 0x61, 0x73, 0x65, 0x2e, 0x76, 0x32, 0x2e, 0x44,
+	0x75, 0x72, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x52, 0x08, 0x64, 0x75, 0x72, 0x61, 0x74, 0x69, 0x6f,
+	0x6e, 0x12, 0x39, 0x0a, 0x0a, 0x75, 0x70, 0x64, 0x61, 0x74, 0x65, 0x64, 0x5f, 0x61, 0x74, 0x18,
+	0x06, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1a, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70,
+	0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x54, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d,
+	0x70, 0x52, 0x09, 0x75, 0x70, 0x64, 0x61, 0x74, 0x65, 0x64, 0x41, 0x74, 0x22, 0x25, 0x0a, 0x06,
+	0x45, 0x6e, 0x74, 0x69, 0x74, 0x79, 0x12, 0x1b, 0x0a, 0x09, 0x74, 0x61, 0x67, 0x5f, 0x6e, 0x61,
+	0x6d, 0x65, 0x73, 0x18, 0x01, 0x20, 0x03, 0x28, 0x09, 0x52, 0x08, 0x74, 0x61, 0x67, 0x4e, 0x61,
+	0x6d, 0x65, 0x73, 0x22, 0xa4, 0x03, 0x0a, 0x09, 0x49, 0x6e, 0x64, 0x65, 0x78, 0x52, 0x75, 0x6c,
+	0x65, 0x12, 0x38, 0x0a, 0x08, 0x6d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, 0x18, 0x01, 0x20,
+	0x01, 0x28, 0x0b, 0x32, 0x1c, 0x2e, 0x62, 0x61, 0x6e, 0x79, 0x61, 0x6e, 0x64, 0x62, 0x2e, 0x63,
+	0x6f, 0x6d, 0x6d, 0x6f, 0x6e, 0x2e, 0x76, 0x32, 0x2e, 0x4d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74,
+	0x61, 0x52, 0x08, 0x6d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, 0x12, 0x12, 0x0a, 0x04, 0x74,
+	0x61, 0x67, 0x73, 0x18, 0x02, 0x20, 0x03, 0x28, 0x09, 0x52, 0x04, 0x74, 0x61, 0x67, 0x73, 0x12,
+	0x38, 0x0a, 0x04, 0x74, 0x79, 0x70, 0x65, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0e, 0x32, 0x24, 0x2e,
+	0x62, 0x61, 0x6e, 0x79, 0x61, 0x6e, 0x64, 0x62, 0x2e, 0x64, 0x61, 0x74, 0x61, 0x62, 0x61, 0x73,
+	0x65, 0x2e, 0x76, 0x32, 0x2e, 0x49, 0x6e, 0x64, 0x65, 0x78, 0x52, 0x75, 0x6c, 0x65, 0x2e, 0x54,
+	0x79, 0x70, 0x65, 0x52, 0x04, 0x74, 0x79, 0x70, 0x65, 0x12, 0x44, 0x0a, 0x08, 0x6c, 0x6f, 0x63,
+	0x61, 0x74, 0x69, 0x6f, 0x6e, 0x18, 0x04, 0x20, 0x01, 0x28, 0x0e, 0x32, 0x28, 0x2e, 0x62, 0x61,
+	0x6e, 0x79, 0x61, 0x6e, 0x64, 0x62, 0x2e, 0x64, 0x61, 0x74, 0x61, 0x62, 0x61, 0x73, 0x65, 0x2e,
+	0x76, 0x32, 0x2e, 0x49, 0x6e, 0x64, 0x65, 0x78, 0x52, 0x75, 0x6c, 0x65, 0x2e, 0x4c, 0x6f, 0x63,
+	0x61, 0x74, 0x69, 0x6f, 0x6e, 0x52, 0x08, 0x6c, 0x6f, 0x63, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x12,
+	0x39, 0x0a, 0x0a, 0x75, 0x70, 0x64, 0x61, 0x74, 0x65, 0x64, 0x5f, 0x61, 0x74, 0x18, 0x05, 0x20,
+	0x01, 0x28, 0x0b, 0x32, 0x1a, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f,
+	0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x54, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x52,
+	0x09, 0x75, 0x70, 0x64, 0x61, 0x74, 0x65, 0x64, 0x41, 0x74, 0x22, 0x3e, 0x0a, 0x04, 0x54, 0x79,
+	0x70, 0x65, 0x12, 0x14, 0x0a, 0x10, 0x54, 0x59, 0x50, 0x45, 0x5f, 0x55, 0x4e, 0x53, 0x50, 0x45,
+	0x43, 0x49, 0x46, 0x49, 0x45, 0x44, 0x10, 0x00, 0x12, 0x0d, 0x0a, 0x09, 0x54, 0x59, 0x50, 0x45,
+	0x5f, 0x54, 0x52, 0x45, 0x45, 0x10, 0x01, 0x12, 0x11, 0x0a, 0x0d, 0x54, 0x59, 0x50, 0x45, 0x5f,
+	0x49, 0x4e, 0x56, 0x45, 0x52, 0x54, 0x45, 0x44, 0x10, 0x02, 0x22, 0x4e, 0x0a, 0x08, 0x4c, 0x6f,
+	0x63, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x12, 0x18, 0x0a, 0x14, 0x4c, 0x4f, 0x43, 0x41, 0x54, 0x49,
+	0x4f, 0x4e, 0x5f, 0x55, 0x4e, 0x53, 0x50, 0x45, 0x43, 0x49, 0x46, 0x49, 0x45, 0x44, 0x10, 0x00,
+	0x12, 0x13, 0x0a, 0x0f, 0x4c, 0x4f, 0x43, 0x41, 0x54, 0x49, 0x4f, 0x4e, 0x5f, 0x53, 0x45, 0x52,
+	0x49, 0x45, 0x53, 0x10, 0x01, 0x12, 0x13, 0x0a, 0x0f, 0x4c, 0x4f, 0x43, 0x41, 0x54, 0x49, 0x4f,
+	0x4e, 0x5f, 0x47, 0x4c, 0x4f, 0x42, 0x41, 0x4c, 0x10, 0x02, 0x22, 0x54, 0x0a, 0x07, 0x53, 0x75,
+	0x62, 0x6a, 0x65, 0x63, 0x74, 0x12, 0x35, 0x0a, 0x07, 0x63, 0x61, 0x74, 0x61, 0x6c, 0x6f, 0x67,
+	0x18, 0x01, 0x20, 0x01, 0x28, 0x0e, 0x32, 0x1b, 0x2e, 0x62, 0x61, 0x6e, 0x79, 0x61, 0x6e, 0x64,
+	0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x6d, 0x6f, 0x6e, 0x2e, 0x76, 0x32, 0x2e, 0x43, 0x61, 0x74, 0x61,
+	0x6c, 0x6f, 0x67, 0x52, 0x07, 0x63, 0x61, 0x74, 0x61, 0x6c, 0x6f, 0x67, 0x12, 0x12, 0x0a, 0x04,
+	0x6e, 0x61, 0x6d, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x6e, 0x61, 0x6d, 0x65,
+	0x22, 0xc6, 0x02, 0x0a, 0x10, 0x49, 0x6e, 0x64, 0x65, 0x78, 0x52, 0x75, 0x6c, 0x65, 0x42, 0x69,
+	0x6e, 0x64, 0x69, 0x6e, 0x67, 0x12, 0x38, 0x0a, 0x08, 0x6d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74,
+	0x61, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1c, 0x2e, 0x62, 0x61, 0x6e, 0x79, 0x61, 0x6e,
+	0x64, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x6d, 0x6f, 0x6e, 0x2e, 0x76, 0x32, 0x2e, 0x4d, 0x65, 0x74,
+	0x61, 0x64, 0x61, 0x74, 0x61, 0x52, 0x08, 0x6d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, 0x12,
+	0x14, 0x0a, 0x05, 0x72, 0x75, 0x6c, 0x65, 0x73, 0x18, 0x02, 0x20, 0x03, 0x28, 0x09, 0x52, 0x05,
+	0x72, 0x75, 0x6c, 0x65, 0x73, 0x12, 0x37, 0x0a, 0x07, 0x73, 0x75, 0x62, 0x6a, 0x65, 0x63, 0x74,
+	0x18, 0x03, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1d, 0x2e, 0x62, 0x61, 0x6e, 0x79, 0x61, 0x6e, 0x64,
+	0x62, 0x2e, 0x64, 0x61, 0x74, 0x61, 0x62, 0x61, 0x73, 0x65, 0x2e, 0x76, 0x32, 0x2e, 0x53, 0x75,
+	0x62, 0x6a, 0x65, 0x63, 0x74, 0x52, 0x07, 0x73, 0x75, 0x62, 0x6a, 0x65, 0x63, 0x74, 0x12, 0x35,
+	0x0a, 0x08, 0x62, 0x65, 0x67, 0x69, 0x6e, 0x5f, 0x61, 0x74, 0x18, 0x04, 0x20, 0x01, 0x28, 0x0b,
 	0x32, 0x1a, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62,
-	0x75, 0x66, 0x2e, 0x54, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x52, 0x08, 0x65, 0x78,
-	0x70, 0x69, 0x72, 0x65, 0x41, 0x74, 0x12, 0x39, 0x0a, 0x0a, 0x75, 0x70, 0x64, 0x61, 0x74, 0x65,
-	0x64, 0x5f, 0x61, 0x74, 0x18, 0x06, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1a, 0x2e, 0x67, 0x6f, 0x6f,
-	0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x54, 0x69, 0x6d,
-	0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x52, 0x09, 0x75, 0x70, 0x64, 0x61, 0x74, 0x65, 0x64, 0x41,
-	0x74, 0x2a, 0x97, 0x01, 0x0a, 0x07, 0x54, 0x61, 0x67, 0x54, 0x79, 0x70, 0x65, 0x12, 0x18, 0x0a,
-	0x14, 0x54, 0x41, 0x47, 0x5f, 0x54, 0x59, 0x50, 0x45, 0x5f, 0x55, 0x4e, 0x53, 0x50, 0x45, 0x43,
-	0x49, 0x46, 0x49, 0x45, 0x44, 0x10, 0x00, 0x12, 0x13, 0x0a, 0x0f, 0x54, 0x41, 0x47, 0x5f, 0x54,
-	0x59, 0x50, 0x45, 0x5f, 0x53, 0x54, 0x52, 0x49, 0x4e, 0x47, 0x10, 0x01, 0x12, 0x10, 0x0a, 0x0c,
-	0x54, 0x41, 0x47, 0x5f, 0x54, 0x59, 0x50, 0x45, 0x5f, 0x49, 0x4e, 0x54, 0x10, 0x02, 0x12, 0x19,
-	0x0a, 0x15, 0x54, 0x41, 0x47, 0x5f, 0x54, 0x59, 0x50, 0x45, 0x5f, 0x53, 0x54, 0x52, 0x49, 0x4e,
-	0x47, 0x5f, 0x41, 0x52, 0x52, 0x41, 0x59, 0x10, 0x03, 0x12, 0x16, 0x0a, 0x12, 0x54, 0x41, 0x47,
-	0x5f, 0x54, 0x59, 0x50, 0x45, 0x5f, 0x49, 0x4e, 0x54, 0x5f, 0x41, 0x52, 0x52, 0x41, 0x59, 0x10,
-	0x04, 0x12, 0x18, 0x0a, 0x14, 0x54, 0x41, 0x47, 0x5f, 0x54, 0x59, 0x50, 0x45, 0x5f, 0x44, 0x41,
-	0x54, 0x41, 0x5f, 0x42, 0x49, 0x4e, 0x41, 0x52, 0x59, 0x10, 0x05, 0x42, 0x72, 0x0a, 0x2a, 0x6f,
-	0x72, 0x67, 0x2e, 0x61, 0x70, 0x61, 0x63, 0x68, 0x65, 0x2e, 0x73, 0x6b, 0x79, 0x77, 0x61, 0x6c,
-	0x6b, 0x69, 0x6e, 0x67, 0x2e, 0x62, 0x61, 0x6e, 0x79, 0x61, 0x6e, 0x64, 0x62, 0x2e, 0x64, 0x61,
-	0x74, 0x61, 0x62, 0x61, 0x73, 0x65, 0x2e, 0x76, 0x32, 0x5a, 0x44, 0x67, 0x69, 0x74, 0x68, 0x75,
-	0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x61, 0x70, 0x61, 0x63, 0x68, 0x65, 0x2f, 0x73, 0x6b, 0x79,
-	0x77, 0x61, 0x6c, 0x6b, 0x69, 0x6e, 0x67, 0x2d, 0x62, 0x61, 0x6e, 0x79, 0x61, 0x6e, 0x64, 0x62,
-	0x2f, 0x61, 0x70, 0x69, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2f, 0x62, 0x61, 0x6e, 0x79, 0x61,
-	0x6e, 0x64, 0x62, 0x2f, 0x64, 0x61, 0x74, 0x61, 0x62, 0x61, 0x73, 0x65, 0x2f, 0x76, 0x32, 0x62,
-	0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33,
+	0x75, 0x66, 0x2e, 0x54, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x52, 0x07, 0x62, 0x65,
+	0x67, 0x69, 0x6e, 0x41, 0x74, 0x12, 0x37, 0x0a, 0x09, 0x65, 0x78, 0x70, 0x69, 0x72, 0x65, 0x5f,
+	0x61, 0x74, 0x18, 0x05, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1a, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c,
+	0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x54, 0x69, 0x6d, 0x65, 0x73,
+	0x74, 0x61, 0x6d, 0x70, 0x52, 0x08, 0x65, 0x78, 0x70, 0x69, 0x72, 0x65, 0x41, 0x74, 0x12, 0x39,
+	0x0a, 0x0a, 0x75, 0x70, 0x64, 0x61, 0x74, 0x65, 0x64, 0x5f, 0x61, 0x74, 0x18, 0x06, 0x20, 0x01,
+	0x28, 0x0b, 0x32, 0x1a, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74,
+	0x6f, 0x62, 0x75, 0x66, 0x2e, 0x54, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x52, 0x09,
+	0x75, 0x70, 0x64, 0x61, 0x74, 0x65, 0x64, 0x41, 0x74, 0x2a, 0x97, 0x01, 0x0a, 0x07, 0x54, 0x61,
+	0x67, 0x54, 0x79, 0x70, 0x65, 0x12, 0x18, 0x0a, 0x14, 0x54, 0x41, 0x47, 0x5f, 0x54, 0x59, 0x50,
+	0x45, 0x5f, 0x55, 0x4e, 0x53, 0x50, 0x45, 0x43, 0x49, 0x46, 0x49, 0x45, 0x44, 0x10, 0x00, 0x12,
+	0x13, 0x0a, 0x0f, 0x54, 0x41, 0x47, 0x5f, 0x54, 0x59, 0x50, 0x45, 0x5f, 0x53, 0x54, 0x52, 0x49,
+	0x4e, 0x47, 0x10, 0x01, 0x12, 0x10, 0x0a, 0x0c, 0x54, 0x41, 0x47, 0x5f, 0x54, 0x59, 0x50, 0x45,
+	0x5f, 0x49, 0x4e, 0x54, 0x10, 0x02, 0x12, 0x19, 0x0a, 0x15, 0x54, 0x41, 0x47, 0x5f, 0x54, 0x59,
+	0x50, 0x45, 0x5f, 0x53, 0x54, 0x52, 0x49, 0x4e, 0x47, 0x5f, 0x41, 0x52, 0x52, 0x41, 0x59, 0x10,
+	0x03, 0x12, 0x16, 0x0a, 0x12, 0x54, 0x41, 0x47, 0x5f, 0x54, 0x59, 0x50, 0x45, 0x5f, 0x49, 0x4e,
+	0x54, 0x5f, 0x41, 0x52, 0x52, 0x41, 0x59, 0x10, 0x04, 0x12, 0x18, 0x0a, 0x14, 0x54, 0x41, 0x47,
+	0x5f, 0x54, 0x59, 0x50, 0x45, 0x5f, 0x44, 0x41, 0x54, 0x41, 0x5f, 0x42, 0x49, 0x4e, 0x41, 0x52,
+	0x59, 0x10, 0x05, 0x42, 0x72, 0x0a, 0x2a, 0x6f, 0x72, 0x67, 0x2e, 0x61, 0x70, 0x61, 0x63, 0x68,
+	0x65, 0x2e, 0x73, 0x6b, 0x79, 0x77, 0x61, 0x6c, 0x6b, 0x69, 0x6e, 0x67, 0x2e, 0x62, 0x61, 0x6e,
+	0x79, 0x61, 0x6e, 0x64, 0x62, 0x2e, 0x64, 0x61, 0x74, 0x61, 0x62, 0x61, 0x73, 0x65, 0x2e, 0x76,
+	0x32, 0x5a, 0x44, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x61, 0x70,
+	0x61, 0x63, 0x68, 0x65, 0x2f, 0x73, 0x6b, 0x79, 0x77, 0x61, 0x6c, 0x6b, 0x69, 0x6e, 0x67, 0x2d,
+	0x62, 0x61, 0x6e, 0x79, 0x61, 0x6e, 0x64, 0x62, 0x2f, 0x61, 0x70, 0x69, 0x2f, 0x70, 0x72, 0x6f,
+	0x74, 0x6f, 0x2f, 0x62, 0x61, 0x6e, 0x79, 0x61, 0x6e, 0x64, 0x62, 0x2f, 0x64, 0x61, 0x74, 0x61,
+	0x62, 0x61, 0x73, 0x65, 0x2f, 0x76, 0x32, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33,
 }
 
 var (
@@ -955,7 +955,7 @@ var file_banyandb_database_v2_schema_proto_goTypes = []interface{}{
 	(IndexRule_Type)(0),           // 2: banyandb.database.v2.IndexRule.Type
 	(IndexRule_Location)(0),       // 3: banyandb.database.v2.IndexRule.Location
 	(*Duration)(nil),              // 4: banyandb.database.v2.Duration
-	(*TagFamily)(nil),             // 5: banyandb.database.v2.TagFamily
+	(*TagFamilySpec)(nil),         // 5: banyandb.database.v2.TagFamilySpec
 	(*TagSpec)(nil),               // 6: banyandb.database.v2.TagSpec
 	(*Stream)(nil),                // 7: banyandb.database.v2.Stream
 	(*Entity)(nil),                // 8: banyandb.database.v2.Entity
@@ -968,10 +968,10 @@ var file_banyandb_database_v2_schema_proto_goTypes = []interface{}{
 }
 var file_banyandb_database_v2_schema_proto_depIdxs = []int32{
 	1,  // 0: banyandb.database.v2.Duration.unit:type_name -> banyandb.database.v2.Duration.DurationUnit
-	6,  // 1: banyandb.database.v2.TagFamily.tags:type_name -> banyandb.database.v2.TagSpec
+	6,  // 1: banyandb.database.v2.TagFamilySpec.tags:type_name -> banyandb.database.v2.TagSpec
 	0,  // 2: banyandb.database.v2.TagSpec.type:type_name -> banyandb.database.v2.TagType
 	12, // 3: banyandb.database.v2.Stream.metadata:type_name -> banyandb.common.v2.Metadata
-	5,  // 4: banyandb.database.v2.Stream.tag_families:type_name -> banyandb.database.v2.TagFamily
+	5,  // 4: banyandb.database.v2.Stream.tag_families:type_name -> banyandb.database.v2.TagFamilySpec
 	8,  // 5: banyandb.database.v2.Stream.entity:type_name -> banyandb.database.v2.Entity
 	4,  // 6: banyandb.database.v2.Stream.duration:type_name -> banyandb.database.v2.Duration
 	13, // 7: banyandb.database.v2.Stream.updated_at:type_name -> google.protobuf.Timestamp
@@ -1011,7 +1011,7 @@ func file_banyandb_database_v2_schema_proto_init() {
 			}
 		}
 		file_banyandb_database_v2_schema_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} {
-			switch v := v.(*TagFamily); i {
+			switch v := v.(*TagFamilySpec); i {
 			case 0:
 				return &v.state
 			case 1:
diff --git a/api/proto/banyandb/database/v2/schema.proto b/api/proto/banyandb/database/v2/schema.proto
index cb358aa..a5647a6 100644
--- a/api/proto/banyandb/database/v2/schema.proto
+++ b/api/proto/banyandb/database/v2/schema.proto
@@ -47,7 +47,7 @@ enum TagType {
     TAG_TYPE_DATA_BINARY = 5;
 }
 
-message TagFamily {
+message TagFamilySpec {
     string name = 1;
     // tags defines accepted tags
     repeated TagSpec tags = 2; 
@@ -63,7 +63,7 @@ message Stream {
     // metadata is the identity of a trace series
     common.v2.Metadata metadata = 1;
     // tag_families 
-    repeated TagFamily tag_families = 2;
+    repeated TagFamilySpec tag_families = 2;
     // entity indicates how to generate a series and shard a stream
     Entity entity = 3;
     // duration determines how long a TraceSeries keeps its data
diff --git a/banyand/kv/kv.go b/banyand/kv/kv.go
index 4e17456..0e49715 100644
--- a/banyand/kv/kv.go
+++ b/banyand/kv/kv.go
@@ -139,7 +139,6 @@ func OpenTimeSeriesStore(shardID int, path string, compressLevel int, valueSize
 	for _, opt := range options {
 		opt(btss)
 	}
-	btss.dbOpts = btss.dbOpts.WithMaxLevels(1)
 	// Put all values into LSM
 	btss.dbOpts = btss.dbOpts.WithVLogPercentile(1.0)
 	var err error
@@ -197,7 +196,6 @@ func OpenStore(shardID int, path string, options ...StoreOptions) (Store, error)
 	for _, opt := range options {
 		opt(bdb)
 	}
-	bdb.dbOpts = bdb.dbOpts.WithMaxLevels(2)
 	bdb.dbOpts = bdb.dbOpts.WithNumVersionsToKeep(math.MaxUint32)
 
 	var err error
@@ -229,7 +227,6 @@ func OpenIndexStore(shardID int, path string, options ...IndexOptions) (IndexSto
 	for _, opt := range options {
 		opt(bdb)
 	}
-	bdb.dbOpts = bdb.dbOpts.WithMaxLevels(2)
 	bdb.dbOpts = bdb.dbOpts.WithNumVersionsToKeep(math.MaxUint32)
 
 	var err error