Posted to notifications@skywalking.apache.org by ha...@apache.org on 2021/09/12 13:59:08 UTC

[skywalking-banyandb] 02/03: Implement two indices

This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch time-series
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git

commit 5f5c2fa58b98168906b1bc7fd703289d5f485762
Author: Gao Hongtao <ha...@gmail.com>
AuthorDate: Sun Sep 12 20:43:11 2021 +0800

    Implement two indices
    
    Signed-off-by: Gao Hongtao <ha...@gmail.com>
---
 banyand/index/index_test.go                    |   2 +-
 banyand/kv/badger.go                           | 114 +--------
 banyand/kv/kv.go                               |  19 +-
 banyand/storage/database_test.go               | 259 ++++++++++---------
 banyand/stream/stream_query_test.go            |  54 ++--
 banyand/stream/stream_write.go                 |   1 +
 banyand/stream/stream_write_test.go            |   2 +-
 banyand/tsdb/block.go                          |  40 +--
 banyand/tsdb/series_seek.go                    |  15 +-
 banyand/tsdb/series_seek_filter.go             |   7 +
 banyand/tsdb/series_seek_sort.go               |  42 +++-
 banyand/tsdb/series_write.go                   |   4 +-
 pkg/index/index.go                             |  86 ++++++-
 pkg/index/inverted/field_map.go                |  26 +-
 pkg/index/inverted/inverted.go                 | 199 ++++++++++++++-
 pkg/index/inverted/inverted_test.go            | 142 +++++++++++
 pkg/index/inverted/mem.go                      | 173 +++++++++----
 pkg/index/inverted/mem_test.go                 | 237 +-----------------
 pkg/index/inverted/term_map.go                 | 101 +-------
 pkg/index/iterator.go                          | 209 ++++++++++++++++
 pkg/index/{inverted/inverted.go => lsm/lsm.go} |  38 +--
 pkg/index/lsm/lsm_test.go                      |  69 ++++++
 pkg/index/lsm/search.go                        |  84 +++++++
 pkg/index/posting/roaring/roaring.go           |   8 +
 pkg/index/test_cases/duration.go               | 329 +++++++++++++++++++++++++
 pkg/index/test_cases/service_name.go           | 100 ++++++++
 pkg/index/{search.go => tree.go}               |  61 +++--
 27 files changed, 1677 insertions(+), 744 deletions(-)
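
For readers skimming the diff, the gist: the old inverted.GlobalStore and the kv Iterator2/Seek plumbing are replaced by two index.Store implementations, an inverted index (pkg/index/inverted, memtable plus badger-backed disk table) and an LSM-backed index (pkg/index/lsm), both opened per block and addressed through the FieldKey/Field types in pkg/index. What follows is a hypothetical usage sketch, not part of the commit; the path, index-rule name and ids are placeholders, and the calls mirror the signatures added below.

// Hypothetical sketch of the new index.Store contract shared by
// pkg/index/inverted and pkg/index/lsm. Path, rule name and ids are placeholders.
package main

import (
	"fmt"

	"github.com/apache/skywalking-banyandb/api/common"
	modelv2 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v2"
	"github.com/apache/skywalking-banyandb/pkg/index"
	"github.com/apache/skywalking-banyandb/pkg/index/inverted"
	"github.com/apache/skywalking-banyandb/pkg/logger"
)

func main() {
	if err := logger.Init(logger.Logging{Env: "dev", Level: "info"}); err != nil {
		panic(err)
	}
	store, err := inverted.NewStore(inverted.StoreOpts{
		Path:   "/tmp/inverted-example", // placeholder path
		Logger: logger.GetLogger("example"),
	})
	if err != nil {
		panic(err)
	}
	defer store.Close()

	// A field is addressed by (SeriesID, IndexRule) plus a term.
	fieldKey := index.FieldKey{
		SeriesID:  common.SeriesID(1),
		IndexRule: "endpoint_id", // placeholder rule name
	}
	field := index.Field{
		Key:  fieldKey.Marshal(),
		Term: []byte("/product/order"),
	}
	if err := store.Write(field, common.ItemID(100)); err != nil {
		panic(err)
	}

	// Exact-term lookup returns a posting list of item ids.
	list, err := store.MatchTerms(field)
	if err != nil {
		panic(err)
	}
	fmt.Println("matched items:", list.Len())

	// Ordered/range access goes through the new FieldIterable contract.
	if iter, found := store.Iterator(fieldKey, index.RangeOpts{}, modelv2.QueryOrder_SORT_ASC); found {
		for iter.Next() {
			fmt.Printf("term=%x items=%d\n", iter.Val().Term, iter.Val().Value.Len())
		}
		_ = iter.Close()
	}
}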

diff --git a/banyand/index/index_test.go b/banyand/index/index_test.go
index fde4e9f..7730f91 100644
--- a/banyand/index/index_test.go
+++ b/banyand/index/index_test.go
@@ -116,7 +116,7 @@ func Test_service_Insert(t *testing.T) {
 		t.Run(tt.name, func(t *testing.T) {
 			s := setUpModules(tester)
 			if err := s.Insert(tt.args.series, tt.args.shardID, tt.args.field); (err != nil) != tt.wantErr {
-				t.Errorf("Insert() error = %v, wantErr %v", err, tt.wantErr)
+				t.Errorf("Write() error = %v, wantErr %v", err, tt.wantErr)
 			}
 		})
 	}
diff --git a/banyand/kv/badger.go b/banyand/kv/badger.go
index a3767d5..92a8529 100644
--- a/banyand/kv/badger.go
+++ b/banyand/kv/badger.go
@@ -25,17 +25,8 @@ import (
 
 	"github.com/dgraph-io/badger/v3"
 	"github.com/dgraph-io/badger/v3/y"
-	"go.uber.org/multierr"
-
-	"github.com/apache/skywalking-banyandb/api/common"
-	modelv2 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v2"
-	"github.com/apache/skywalking-banyandb/pkg/convert"
-	"github.com/apache/skywalking-banyandb/pkg/index"
-	"github.com/apache/skywalking-banyandb/pkg/index/posting"
-	"github.com/apache/skywalking-banyandb/pkg/index/posting/roaring"
+
 	"github.com/apache/skywalking-banyandb/pkg/logger"
-	posting2 "github.com/apache/skywalking-banyandb/pkg/posting"
-	roaring2 "github.com/apache/skywalking-banyandb/pkg/posting/roaring"
 )
 
 var (
@@ -62,7 +53,7 @@ func (b *badgerTSS) Close() error {
 }
 
 type mergedIter struct {
-	delegated Iterator2
+	delegated Iterator
 	valid     bool
 	data      []byte
 }
@@ -95,12 +86,7 @@ func (i *mergedIter) parseData() {
 	if !i.valid {
 		return
 	}
-	data, err := i.delegated.Val().Marshall()
-	if err != nil {
-		i.valid = false
-		return
-	}
-	i.data = data
+	i.data = i.delegated.Val()
 }
 
 func (i *mergedIter) Close() error {
@@ -122,39 +108,12 @@ type badgerDB struct {
 	db      *badger.DB
 }
 
-func (b *badgerDB) Handover(iterator Iterator2) error {
+func (b *badgerDB) Handover(iterator Iterator) error {
 	return b.db.HandoverIterator(&mergedIter{
 		delegated: iterator,
 	})
 }
 
-func (b *badgerDB) Seek(key []byte, limit int) (posting2.List, error) {
-	opts := badger.DefaultIteratorOptions
-	it := b.db.NewIterator(opts)
-	defer func() {
-		_ = it.Close()
-	}()
-	result := roaring2.NewPostingList()
-	var errMerged error
-	for it.Seek(y.KeyWithTs(key, math.MaxInt64)); it.Valid(); it.Next() {
-		k := y.ParseKey(it.Key())
-		if !bytes.Equal(key, k) {
-			break
-		}
-		list := roaring2.NewPostingList()
-		err := list.Unmarshall(it.Value().Value)
-		if err != nil {
-			errMerged = multierr.Append(errMerged, err)
-			continue
-		}
-		_ = result.Union(list)
-		if result.Len() > limit {
-			break
-		}
-	}
-	return result, errMerged
-}
-
 func (b *badgerDB) Scan(key []byte, opt ScanOpts, f ScanFunc) error {
 	opts := badger.DefaultIteratorOptions
 	opts.PrefetchSize = opt.PrefetchSize
@@ -194,7 +153,7 @@ func (i *iterator) Rewind() {
 }
 
 func (i *iterator) Seek(key []byte) {
-	i.delegated.Seek(key)
+	i.delegated.Seek(y.KeyWithTs(key, math.MaxInt64))
 }
 
 func (i *iterator) Key() []byte {
@@ -254,7 +213,7 @@ func (b *badgerDB) Get(key []byte) ([]byte, error) {
 func (b *badgerDB) GetAll(key []byte, applyFn func([]byte) error) error {
 	iter := b.db.NewIterator(badger.DefaultIteratorOptions)
 	var count int
-	for iter.Seek(key); iter.Valid(); iter.Next() {
+	for iter.Seek(y.KeyWithTs(key, math.MaxInt64)); iter.Valid(); iter.Next() {
 		if !bytes.Equal(y.ParseKey(iter.Key()), key) {
 			break
 		}
@@ -270,67 +229,6 @@ func (b *badgerDB) GetAll(key []byte, applyFn func([]byte) error) error {
 	return ErrKeyNotFound
 }
 
-func (b *badgerDB) MatchField(fieldName []byte) (list posting.List) {
-	panic("implement me")
-}
-
-func (b *badgerDB) MatchTerms(field index.Field) (list posting.List) {
-	panic("implement me")
-}
-
-func (b *badgerDB) Range(fieldName []byte, opts index.RangeOpts) (list posting.List) {
-	panic("implement me")
-}
-
-var _ index.FieldIterator = (*fIterator)(nil)
-
-type fIterator struct {
-	init     bool
-	delegate Iterator
-	curr     *index.PostingValue
-}
-
-func (f *fIterator) Next() bool {
-	if !f.init {
-		f.init = true
-		f.delegate.Rewind()
-	}
-	if !f.delegate.Valid() {
-		return false
-	}
-	pv := &index.PostingValue{
-		Key:   f.delegate.Key(),
-		Value: roaring.NewPostingListWithInitialData(convert.BytesToUint64(f.delegate.Val())),
-	}
-	for ; f.delegate.Valid() && bytes.Equal(pv.Key, f.delegate.Key()); f.delegate.Next() {
-		pv.Value.Insert(common.ItemID(convert.BytesToUint64(f.delegate.Val())))
-	}
-	f.curr = pv
-	return true
-}
-
-func (f *fIterator) Val() *index.PostingValue {
-	return f.curr
-}
-
-func (f *fIterator) Close() error {
-	return f.delegate.Close()
-}
-
-func (b *badgerDB) FieldIterator(fieldName []byte, order modelv2.QueryOrder_Sort) index.FieldIterator {
-	var reverse bool
-	if order == modelv2.QueryOrder_SORT_DESC {
-		reverse = true
-	}
-	iter := b.NewIterator(ScanOpts{
-		Prefix:  fieldName,
-		Reverse: reverse,
-	})
-	return &fIterator{
-		delegate: iter,
-	}
-}
-
 // badgerLog delegates the zap log to the badger logger
 type badgerLog struct {
 	*log.Logger
diff --git a/banyand/kv/kv.go b/banyand/kv/kv.go
index 0e49715..fc829d3 100644
--- a/banyand/kv/kv.go
+++ b/banyand/kv/kv.go
@@ -25,9 +25,7 @@ import (
 	"github.com/dgraph-io/badger/v3"
 	"github.com/pkg/errors"
 
-	"github.com/apache/skywalking-banyandb/pkg/index"
 	"github.com/apache/skywalking-banyandb/pkg/logger"
-	posting2 "github.com/apache/skywalking-banyandb/pkg/posting"
 )
 
 var (
@@ -54,6 +52,7 @@ type ScanOpts struct {
 }
 
 type Reader interface {
+	Iterable
 	// Get a value by its key
 	Get(key []byte) ([]byte, error)
 	GetAll(key []byte, applyFn func([]byte) error) error
@@ -65,7 +64,6 @@ type Store interface {
 	io.Closer
 	Writer
 	Reader
-	index.Searcher
 }
 
 type TimeSeriesWriter interface {
@@ -113,21 +111,16 @@ type Iterator interface {
 	Close() error
 }
 
-type Iterator2 interface {
-	Next()
-	Rewind()
-	Seek(key []byte)
-	Key() []byte
-	Val() posting2.List
-	Valid() bool
-	Close() error
+type Iterable interface {
+	NewIterator(opt ScanOpts) Iterator
 }
 
 type HandoverCallback func()
 
 type IndexStore interface {
-	Handover(iterator Iterator2) error
-	Seek(key []byte, limit int) (posting2.List, error)
+	Iterable
+	Reader
+	Handover(iterator Iterator) error
 	Close() error
 }
 
diff --git a/banyand/storage/database_test.go b/banyand/storage/database_test.go
index 971cdf7..42893e0 100644
--- a/banyand/storage/database_test.go
+++ b/banyand/storage/database_test.go
@@ -29,12 +29,7 @@ import (
 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
 
-	"github.com/apache/skywalking-banyandb/api/common"
-	"github.com/apache/skywalking-banyandb/banyand/kv"
-	"github.com/apache/skywalking-banyandb/pkg/convert"
 	"github.com/apache/skywalking-banyandb/pkg/logger"
-	"github.com/apache/skywalking-banyandb/pkg/posting"
-	"github.com/apache/skywalking-banyandb/pkg/posting/roaring"
 )
 
 func TestDB_Create_Directory(t *testing.T) {
@@ -63,59 +58,60 @@ func TestDB_Create_Directory(t *testing.T) {
 	validateDirectory(t, fmt.Sprintf(blockTemplate, segPath, now.Format(blockFormat)))
 }
 
-func TestDB_Store(t *testing.T) {
-	is := require.New(t)
-	ctrl := gomock.NewController(t)
-	defer ctrl.Finish()
-	now := uint64(time.Now().UnixNano())
-	var ap WritePoint
-	var repo StoreRepo
-	p := mockPlugin(ctrl, func(r StoreRepo, get GetWritePoint) {
-		ap = get(now)
-		repo = r
-	})
-
-	tempDir, db := setUp(t, p)
-	defer func() {
-		db.GracefulStop()
-		removeDir(tempDir)
-	}()
-
-	is.NoError(ap.Writer(0, "normal").Put([]byte("key1"), []byte{12}))
-	val, err := repo.Reader(0, "normal", now, now).Get([]byte("key1"))
-	is.NoError(err)
-	is.Equal([]byte{12}, val)
-
-	is.NoError(ap.TimeSeriesWriter(1, "time-series").Put([]byte("key11"), []byte{33}, 1))
-	val, err = repo.TimeSeriesReader(1, "time-series", now, now).Get([]byte("key11"), 1)
-	is.NoError(err)
-	is.Equal([]byte{33}, val)
-	vals, allErr := repo.TimeSeriesReader(1, "time-series", now, now).GetAll([]byte("key11"))
-	is.NoError(allErr)
-	is.Equal([][]byte{{33}}, vals)
-
-	index := repo.Index(1, "index")
-	is.NoError(index.Handover(mockMemtable([]uint64{1, 2}, []uint64{3, 6})))
-	list, err := index.Seek(convert.Int64ToBytes(0), 2)
-	is.NoError(err)
-	is.Equal(2, list.Len())
-	is.True(list.Contains(common.ChunkID(1)))
-	is.True(list.Contains(common.ChunkID(2)))
-	list, err = index.Seek(convert.Int64ToBytes(1), 2)
-	is.NoError(err)
-	is.Equal(2, list.Len())
-	is.True(list.Contains(common.ChunkID(3)))
-	is.True(list.Contains(common.ChunkID(6)))
-
-	is.NoError(index.Handover(mockMemtable([]uint64{11, 14})))
-	list, err = index.Seek(convert.Int64ToBytes(0), 2)
-	is.NoError(err)
-	is.Equal(4, list.Len())
-	is.True(list.Contains(common.ChunkID(1)))
-	is.True(list.Contains(common.ChunkID(2)))
-	is.True(list.Contains(common.ChunkID(11)))
-	is.True(list.Contains(common.ChunkID(14)))
-}
+//
+//func TestDB_Store(t *testing.T) {
+//	is := require.New(t)
+//	ctrl := gomock.NewController(t)
+//	defer ctrl.Finish()
+//	now := uint64(time.Now().UnixNano())
+//	var ap WritePoint
+//	var repo StoreRepo
+//	p := mockPlugin(ctrl, func(r StoreRepo, get GetWritePoint) {
+//		ap = get(now)
+//		repo = r
+//	})
+//
+//	tempDir, db := setUp(t, p)
+//	defer func() {
+//		db.GracefulStop()
+//		removeDir(tempDir)
+//	}()
+//
+//	is.NoError(ap.Writer(0, "normal").Put([]byte("key1"), []byte{12}))
+//	val, err := repo.Reader(0, "normal", now, now).Get([]byte("key1"))
+//	is.NoError(err)
+//	is.Equal([]byte{12}, val)
+//
+//	is.NoError(ap.TimeSeriesWriter(1, "time-series").Put([]byte("key11"), []byte{33}, 1))
+//	val, err = repo.TimeSeriesReader(1, "time-series", now, now).Get([]byte("key11"), 1)
+//	is.NoError(err)
+//	is.Equal([]byte{33}, val)
+//	vals, allErr := repo.TimeSeriesReader(1, "time-series", now, now).GetAll([]byte("key11"))
+//	is.NoError(allErr)
+//	is.Equal([][]byte{{33}}, vals)
+//
+//	index := repo.Index(1, "index")
+//	is.NoError(index.Handover(mockMemtable([]uint64{1, 2}, []uint64{3, 6})))
+//	list, err := index.Seek(convert.Int64ToBytes(0), 2)
+//	is.NoError(err)
+//	is.Equal(2, list.Len())
+//	is.True(list.Contains(common.ChunkID(1)))
+//	is.True(list.Contains(common.ChunkID(2)))
+//	list, err = index.Seek(convert.Int64ToBytes(1), 2)
+//	is.NoError(err)
+//	is.Equal(2, list.Len())
+//	is.True(list.Contains(common.ChunkID(3)))
+//	is.True(list.Contains(common.ChunkID(6)))
+//
+//	is.NoError(index.Handover(mockMemtable([]uint64{11, 14})))
+//	list, err = index.Seek(convert.Int64ToBytes(0), 2)
+//	is.NoError(err)
+//	is.Equal(4, list.Len())
+//	is.True(list.Contains(common.ChunkID(1)))
+//	is.True(list.Contains(common.ChunkID(2)))
+//	is.True(list.Contains(common.ChunkID(11)))
+//	is.True(list.Contains(common.ChunkID(14)))
+//}
 
 func TestDB_FlushCallback(t *testing.T) {
 	is := require.New(t)
@@ -168,79 +164,80 @@ func TestDB_FlushCallback(t *testing.T) {
 	}
 }
 
-var _ kv.Iterator2 = (*iter)(nil)
-
-type iter struct {
-	data map[int]posting.List
-	p    int
-}
-
-func (i *iter) Next() {
-	i.p++
-}
-
-func (i *iter) Rewind() {
-	i.p = 0
-}
-
-func (i *iter) Seek(key []byte) {
-	panic("implement me")
-}
-
-func (i *iter) Key() []byte {
-	return convert.Int64ToBytes(int64(i.p))
-}
-
-func (i *iter) Val() posting.List {
-	return i.data[i.p]
-}
-
-func (i *iter) Valid() bool {
-	_, ok := i.data[i.p]
-	return ok
-}
-
-func (i *iter) Close() error {
-	return nil
-}
-
-func mockMemtable(data ...[]uint64) kv.Iterator2 {
-	it := &iter{
-		data: make(map[int]posting.List),
-	}
-	for i, d := range data {
-		it.data[i] = roaring.NewPostingListWithInitialData(d...)
-	}
-	return it
-}
-
-func mockPlugin(ctrl *gomock.Controller, f func(repo StoreRepo, get GetWritePoint)) Plugin {
-	p := NewMockPlugin(ctrl)
-	p.EXPECT().Meta().Return(PluginMeta{
-		ID:          "sw",
-		Group:       "default",
-		ShardNumber: 2,
-		KVSpecs: []KVSpec{
-			{
-				Name: "normal",
-				Type: KVTypeNormal,
-			},
-			{
-				Name:          "time-series",
-				Type:          KVTypeTimeSeries,
-				CompressLevel: 3,
-			},
-			{
-				Name: "index",
-				Type: KVTypeIndex,
-			},
-		},
-	}).AnyTimes()
-	p.EXPECT().Init(gomock.Any(), gomock.Any()).Do(func(r StoreRepo, wp GetWritePoint) {
-		f(r, wp)
-	}).AnyTimes()
-	return p
-}
+//
+//var _ kv.Iterator2 = (*iter)(nil)
+//
+//type iter struct {
+//	data map[int]posting.List
+//	p    int
+//}
+//
+//func (i *iter) Next() {
+//	i.p++
+//}
+//
+//func (i *iter) Rewind() {
+//	i.p = 0
+//}
+//
+//func (i *iter) Seek(key []byte) {
+//	panic("implement me")
+//}
+//
+//func (i *iter) Key() []byte {
+//	return convert.Int64ToBytes(int64(i.p))
+//}
+//
+//func (i *iter) Val() posting.List {
+//	return i.data[i.p]
+//}
+//
+//func (i *iter) Valid() bool {
+//	_, ok := i.data[i.p]
+//	return ok
+//}
+//
+//func (i *iter) Close() error {
+//	return nil
+//}
+//
+//func mockMemtable(data ...[]uint64) kv.Iterator2 {
+//	it := &iter{
+//		data: make(map[int]posting.List),
+//	}
+//	for i, d := range data {
+//		it.data[i] = roaring.NewPostingListWithInitialData(d...)
+//	}
+//	return it
+//}
+//
+//func mockPlugin(ctrl *gomock.Controller, f func(repo StoreRepo, get GetWritePoint)) Plugin {
+//	p := NewMockPlugin(ctrl)
+//	p.EXPECT().Meta().Return(PluginMeta{
+//		ID:          "sw",
+//		Group:       "default",
+//		ShardNumber: 2,
+//		KVSpecs: []KVSpec{
+//			{
+//				Name: "normal",
+//				Type: KVTypeNormal,
+//			},
+//			{
+//				Name:          "time-series",
+//				Type:          KVTypeTimeSeries,
+//				CompressLevel: 3,
+//			},
+//			{
+//				Name: "index",
+//				Type: KVTypeIndex,
+//			},
+//		},
+//	}).AnyTimes()
+//	p.EXPECT().Init(gomock.Any(), gomock.Any()).Do(func(r StoreRepo, wp GetWritePoint) {
+//		f(r, wp)
+//	}).AnyTimes()
+//	return p
+//}
 
 func setUp(t *testing.T, p Plugin) (tempDir string, db Database) {
 	require.NoError(t, logger.Init(logger.Logging{
diff --git a/banyand/stream/stream_query_test.go b/banyand/stream/stream_query_test.go
index a060ff3..f02921b 100644
--- a/banyand/stream/stream_query_test.go
+++ b/banyand/stream/stream_query_test.go
@@ -19,6 +19,7 @@ package stream
 
 import (
 	"bytes"
+	"context"
 	"embed"
 	_ "embed"
 	"encoding/base64"
@@ -33,6 +34,7 @@ import (
 	"github.com/golang/protobuf/jsonpb"
 	"github.com/pkg/errors"
 	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/require"
 	"google.golang.org/protobuf/types/known/timestamppb"
 
 	"github.com/apache/skywalking-banyandb/api/common"
@@ -58,7 +60,7 @@ func Test_Stream_SelectShard(t *testing.T) {
 	tester := assert.New(t)
 	s, deferFunc := setup(tester)
 	defer deferFunc()
-	_ = setupQueryData(tester, "multiple_shards.json", s)
+	_ = setupQueryData(t, "multiple_shards.json", s)
 	tests := []struct {
 		name         string
 		entity       tsdb.Entity
@@ -99,7 +101,7 @@ func Test_Stream_Series(t *testing.T) {
 	tester := assert.New(t)
 	s, deferFunc := setup(tester)
 	defer deferFunc()
-	baseTime := setupQueryData(tester, "multiple_shards.json", s)
+	baseTime := setupQueryData(t, "multiple_shards.json", s)
 	tests := []struct {
 		name    string
 		args    queryOpts
@@ -315,7 +317,7 @@ func Test_Stream_Global_Index(t *testing.T) {
 	tester := assert.New(t)
 	s, deferFunc := setup(tester)
 	defer deferFunc()
-	_ = setupQueryData(tester, "global_index.json", s)
+	_ = setupQueryData(t, "global_index.json", s)
 	tests := []struct {
 		name                string
 		traceID             string
@@ -477,7 +479,8 @@ func queryData(tester *assert.Assertions, s *stream, opts queryOpts) (shardsForT
 //go:embed testdata/*.json
 var dataFS embed.FS
 
-func setupQueryData(t *assert.Assertions, dataFile string, stream *stream) (baseTime time.Time) {
+func setupQueryData(testing *testing.T, dataFile string, stream *stream) (baseTime time.Time) {
+	t := assert.New(testing)
 	var templates []interface{}
 	baseTime = time.Now()
 	content, err := dataFS.ReadFile("testdata/" + dataFile)
@@ -509,24 +512,37 @@ func setupQueryData(t *assert.Assertions, dataFile string, stream *stream) (base
 		t.NoError(err)
 		shardID, err := partition.ShardID(entity.Marshal(), stream.schema.GetShardNum())
 		t.NoError(err)
-		itemID, err := stream.write(common.ShardID(shardID), e)
+		_, err = stream.write(common.ShardID(shardID), e)
 		t.NoError(err)
-		sa, err := stream.Shards(entity)
-		t.NoError(err)
-		for _, shard := range sa {
-			se, err := shard.Series().Get(entity)
-			t.NoError(err)
-			for {
-				item, closer, _ := se.Get(*itemID)
-				rawTagFamily, _ := item.Val("searchable")
-				if len(rawTagFamily) > 0 {
-					_ = closer.Close()
-					break
+	}
+	ctx, cancelFunc := context.WithTimeout(context.Background(), 10*time.Second)
+	defer cancelFunc()
+	err = ready(ctx, t, stream, queryOpts{
+		entity:    tsdb.Entity{tsdb.AnyEntry, tsdb.AnyEntry, tsdb.AnyEntry},
+		timeRange: tsdb.NewTimeRangeDuration(baseTime, 1*time.Hour),
+	})
+	require.NoError(testing, err)
+	return baseTime
+}
+
+func ready(ctx context.Context, t *assert.Assertions, stream *stream, options queryOpts) error {
+	for {
+	loop:
+		select {
+		case <-ctx.Done():
+			return ctx.Err()
+		default:
+			data, err := queryData(t, stream, options)
+			if err != nil {
+				return err
+			}
+			for _, d := range data {
+				if len(d.elements) < 1 {
+					time.Sleep(300 * time.Millisecond)
+					break loop
 				}
-				_ = closer.Close()
 			}
-
+			return nil
 		}
 	}
-	return baseTime
 }
diff --git a/banyand/stream/stream_write.go b/banyand/stream/stream_write.go
index fa80dc3..9ce2732 100644
--- a/banyand/stream/stream_write.go
+++ b/banyand/stream/stream_write.go
@@ -102,6 +102,7 @@ func (s *stream) write(shardID common.ShardID, value *streamv2.ElementValue) (*t
 			Int("ts_nano", t.Nanosecond()).
 			Interface("data", value).
 			Uint64("series_id", uint64(series.ID())).
+			Int("shard_id", int(shardID)).
 			Msg("write stream")
 		return writer, errWrite
 	}
diff --git a/banyand/stream/stream_write_test.go b/banyand/stream/stream_write_test.go
index 0ffe0d5..b0d8880 100644
--- a/banyand/stream/stream_write_test.go
+++ b/banyand/stream/stream_write_test.go
@@ -211,7 +211,7 @@ func Test_Stream_Write(t *testing.T) {
 func setup(t *assert.Assertions) (*stream, func()) {
 	t.NoError(logger.Init(logger.Logging{
 		Env:   "dev",
-		Level: "info",
+		Level: "trace",
 	}))
 	tempDir, deferFunc := test.Space(t)
 	streamRepo, err := schema.NewStream()
diff --git a/banyand/tsdb/block.go b/banyand/tsdb/block.go
index 44f1922..824c861 100644
--- a/banyand/tsdb/block.go
+++ b/banyand/tsdb/block.go
@@ -27,9 +27,9 @@ import (
 	"github.com/apache/skywalking-banyandb/api/common"
 	databasev2 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v2"
 	"github.com/apache/skywalking-banyandb/banyand/kv"
-	"github.com/apache/skywalking-banyandb/pkg/convert"
 	"github.com/apache/skywalking-banyandb/pkg/index"
 	"github.com/apache/skywalking-banyandb/pkg/index/inverted"
+	"github.com/apache/skywalking-banyandb/pkg/index/lsm"
 	"github.com/apache/skywalking-banyandb/pkg/logger"
 )
 
@@ -39,15 +39,14 @@ type block struct {
 	ref  *z.Closer
 
 	store         kv.TimeSeriesStore
-	primaryIndex  kv.Store
-	invertedIndex inverted.GlobalStore
+	primaryIndex  index.Store
+	invertedIndex index.Store
+	lsmIndex      index.Store
 	closableLst   []io.Closer
 	endTime       time.Time
 	startTime     time.Time
 	segID         uint16
 	blockID       uint16
-
-	//revertedIndex kv.Store
 }
 
 type blockOpts struct {
@@ -76,7 +75,10 @@ func newBlock(ctx context.Context, opts blockOpts) (b *block, err error) {
 		kv.TSSWithLogger(b.l)); err != nil {
 		return nil, err
 	}
-	if b.primaryIndex, err = kv.OpenStore(0, b.path+"/p_index", kv.StoreWithLogger(b.l)); err != nil {
+	if b.primaryIndex, err = lsm.NewStore(lsm.StoreOpts{
+		Path:   b.path + "/primary",
+		Logger: b.l,
+	}); err != nil {
 		return nil, err
 	}
 	b.closableLst = append(b.closableLst, b.store, b.primaryIndex)
@@ -84,8 +86,18 @@ func newBlock(ctx context.Context, opts blockOpts) (b *block, err error) {
 	if !ok || len(rules) == 0 {
 		return b, nil
 	}
-	b.invertedIndex = inverted.NewStore("inverted")
-	return b, nil
+	b.invertedIndex, err = inverted.NewStore(inverted.StoreOpts{
+		Path:   b.path + "/inverted",
+		Logger: b.l,
+	})
+	if b.lsmIndex, err = lsm.NewStore(lsm.StoreOpts{
+		Path:   b.path + "/lsm",
+		Logger: b.l,
+	}); err != nil {
+		return nil, err
+	}
+	b.closableLst = append(b.closableLst, b.invertedIndex, b.lsmIndex)
+	return b, err
 }
 
 func (b *block) delegate() blockDelegate {
@@ -137,11 +149,11 @@ func (d *bDelegate) dataReader() kv.TimeSeriesReader {
 }
 
 func (d *bDelegate) lsmIndexReader() index.Searcher {
-	return d.delegate.invertedIndex.Searcher()
+	return d.delegate.lsmIndex
 }
 
 func (d *bDelegate) invertedIndexReader() index.Searcher {
-	return d.delegate.invertedIndex.Searcher()
+	return d.delegate.invertedIndex
 }
 
 func (d *bDelegate) primaryIndexReader() index.Searcher {
@@ -161,21 +173,21 @@ func (d *bDelegate) write(key []byte, val []byte, ts time.Time) error {
 }
 
 func (d *bDelegate) writePrimaryIndex(field index.Field, id common.ItemID) error {
-	return d.delegate.primaryIndex.Put(field.Marshal(), convert.Uint64ToBytes(uint64(id)))
+	return d.delegate.primaryIndex.Write(field, id)
 }
 
 func (d *bDelegate) writeLSMIndex(field index.Field, id common.ItemID) error {
-	if d.delegate.invertedIndex == nil {
+	if d.delegate.lsmIndex == nil {
 		return nil
 	}
-	return d.delegate.invertedIndex.Insert(field, id)
+	return d.delegate.lsmIndex.Write(field, id)
 }
 
 func (d *bDelegate) writeInvertedIndex(field index.Field, id common.ItemID) error {
 	if d.delegate.invertedIndex == nil {
 		return nil
 	}
-	return d.delegate.invertedIndex.Insert(field, id)
+	return d.delegate.invertedIndex.Write(field, id)
 }
 
 func (d *bDelegate) contains(ts time.Time) bool {
diff --git a/banyand/tsdb/series_seek.go b/banyand/tsdb/series_seek.go
index a6cf6c0..4763ec6 100644
--- a/banyand/tsdb/series_seek.go
+++ b/banyand/tsdb/series_seek.go
@@ -18,12 +18,11 @@
 package tsdb
 
 import (
-	"time"
-
 	"github.com/apache/skywalking-banyandb/api/common"
 	databasev2 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v2"
 	modelv2 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v2"
 	"github.com/apache/skywalking-banyandb/banyand/kv"
+	"github.com/apache/skywalking-banyandb/pkg/index"
 )
 
 type Iterator interface {
@@ -62,6 +61,7 @@ type seekerBuilder struct {
 	}
 	order               modelv2.QueryOrder_Sort
 	indexRuleForSorting *databasev2.IndexRule
+	rangeOptsForSorting index.RangeOpts
 }
 
 func (s *seekerBuilder) Build() (Seeker, error) {
@@ -72,16 +72,7 @@ func (s *seekerBuilder) Build() (Seeker, error) {
 	if err != nil {
 		return nil, err
 	}
-	filters := []filterFn{
-		func(item Item) bool {
-			valid := s.seriesSpan.timeRange.contains(item.Time())
-			timeRange := s.seriesSpan.timeRange
-			s.seriesSpan.l.Trace().
-				Times("time_range", []time.Time{timeRange.Start, timeRange.End}).
-				Bool("valid", valid).Msg("filter item by time range")
-			return valid
-		},
-	}
+	filters := make([]filterFn, 0, 2)
 	if indexFilter != nil {
 		filters = append(filters, indexFilter)
 	}
diff --git a/banyand/tsdb/series_seek_filter.go b/banyand/tsdb/series_seek_filter.go
index bd55f1c..8e92715 100644
--- a/banyand/tsdb/series_seek_filter.go
+++ b/banyand/tsdb/series_seek_filter.go
@@ -75,6 +75,13 @@ func (s *seekerBuilder) buildIndexFilter() (filterFn, error) {
 		if err != nil {
 			return err
 		}
+		rangeOpts, found := tree.TrimRangeLeaf(index.FieldKey{
+			SeriesID:  s.seriesSpan.seriesID,
+			IndexRule: s.indexRuleForSorting.GetMetadata().GetName(),
+		})
+		if found {
+			s.rangeOptsForSorting = rangeOpts
+		}
 		list, err := tree.Execute()
 		if err != nil {
 			return err
diff --git a/banyand/tsdb/series_seek_sort.go b/banyand/tsdb/series_seek_sort.go
index 7e9a308..dbe1947 100644
--- a/banyand/tsdb/series_seek_sort.go
+++ b/banyand/tsdb/series_seek_sort.go
@@ -27,6 +27,7 @@ import (
 	databasev2 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v2"
 	modelv2 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v2"
 	"github.com/apache/skywalking-banyandb/banyand/kv"
+	"github.com/apache/skywalking-banyandb/pkg/convert"
 	"github.com/apache/skywalking-banyandb/pkg/index"
 	"github.com/apache/skywalking-banyandb/pkg/index/posting"
 	"github.com/apache/skywalking-banyandb/pkg/logger"
@@ -48,23 +49,36 @@ func (s *seekerBuilder) buildSeries(filters []filterFn) []Iterator {
 	if s.indexRuleForSorting == nil {
 		return s.buildSeriesByTime(filters)
 	}
+	filters = append(filters, func(item Item) bool {
+		valid := s.seriesSpan.timeRange.contains(item.Time())
+		timeRange := s.seriesSpan.timeRange
+		s.seriesSpan.l.Trace().
+			Times("time_range", []time.Time{timeRange.Start, timeRange.End}).
+			Bool("valid", valid).Msg("filter item by time range")
+		return valid
+	})
 	return s.buildSeriesByIndex(filters)
 }
 
 func (s *seekerBuilder) buildSeriesByIndex(filters []filterFn) (series []Iterator) {
 	for _, b := range s.seriesSpan.blocks {
 		var inner index.FieldIterator
-		term := index.FieldKey{
+		var found bool
+		fieldKey := index.FieldKey{
 			SeriesID:  s.seriesSpan.seriesID,
 			IndexRule: s.indexRuleForSorting.GetMetadata().GetName(),
 		}
+
 		switch s.indexRuleForSorting.GetType() {
 		case databasev2.IndexRule_TYPE_TREE:
-			inner = b.lsmIndexReader().FieldIterator(term.Marshal(), s.order)
+			inner, found = b.lsmIndexReader().Iterator(fieldKey, s.rangeOptsForSorting, s.order)
 		case databasev2.IndexRule_TYPE_INVERTED:
-			inner = b.invertedIndexReader().FieldIterator(term.Marshal(), s.order)
+			inner, found = b.invertedIndexReader().Iterator(fieldKey, s.rangeOptsForSorting, s.order)
+		default:
+			// only tree index supports sorting
+			continue
 		}
-		if inner != nil {
+		if found {
 			series = append(series, newSearcherIterator(s.seriesSpan.l, inner, b.dataReader(), s.seriesSpan.seriesID, filters))
 		}
 	}
@@ -86,14 +100,23 @@ func (s *seekerBuilder) buildSeriesByTime(filters []filterFn) []Iterator {
 	}
 	delegated := make([]Iterator, 0, len(bb))
 	bTimes := make([]time.Time, 0, len(bb))
+	timeRange := s.seriesSpan.timeRange
+	termRange := index.RangeOpts{
+		Lower:         convert.Int64ToBytes(timeRange.Start.UnixNano()),
+		Upper:         convert.Int64ToBytes(timeRange.End.UnixNano()),
+		IncludesLower: true,
+	}
 	for _, b := range bb {
 		bTimes = append(bTimes, b.startTime())
-		inner := b.primaryIndexReader().
-			FieldIterator(
-				s.seriesSpan.seriesID.Marshal(),
+		inner, found := b.primaryIndexReader().
+			Iterator(
+				index.FieldKey{
+					SeriesID: s.seriesSpan.seriesID,
+				},
+				termRange,
 				s.order,
 			)
-		if inner != nil {
+		if found {
 			delegated = append(delegated, newSearcherIterator(s.seriesSpan.l, inner, b.dataReader(), s.seriesSpan.seriesID, filters))
 		}
 	}
@@ -101,6 +124,7 @@ func (s *seekerBuilder) buildSeriesByTime(filters []filterFn) []Iterator {
 		Str("order", modelv2.QueryOrder_Sort_name[int32(s.order)]).
 		Times("blocks", bTimes).
 		Uint64("series_id", uint64(s.seriesSpan.seriesID)).
+		Int("shard_id", int(s.seriesSpan.shardID)).
 		Msg("seek series by time")
 	return []Iterator{newMergedIterator(delegated)}
 }
@@ -122,7 +146,7 @@ func (s *searcherIterator) Next() bool {
 		if s.fieldIterator.Next() {
 			v := s.fieldIterator.Val()
 			s.cur = v.Value.Iterator()
-			s.curKey = v.Key
+			s.curKey = v.Term
 			s.l.Trace().Uint64("series_id", uint64(s.seriesID)).Hex("term", s.curKey).Msg("got a new field")
 		} else {
 			return false
diff --git a/banyand/tsdb/series_write.go b/banyand/tsdb/series_write.go
index 60fba52..ea5404b 100644
--- a/banyand/tsdb/series_write.go
+++ b/banyand/tsdb/series_write.go
@@ -172,7 +172,9 @@ func (w *writer) Write() (GlobalItemID, error) {
 		}
 	}
 	return id, w.block.writePrimaryIndex(index.Field{
-		Key:  id.SeriesID.Marshal(),
+		Key: index.FieldKey{
+			SeriesID: id.SeriesID,
+		}.Marshal(),
 		Term: convert.Int64ToBytes(w.ts.UnixNano()),
 	}, id.ID)
 }
diff --git a/pkg/index/index.go b/pkg/index/index.go
index bec268e..9be8831 100644
--- a/pkg/index/index.go
+++ b/pkg/index/index.go
@@ -19,18 +19,52 @@ package index
 
 import (
 	"bytes"
+	"io"
 
+	"github.com/pkg/errors"
+
+	"github.com/apache/skywalking-banyandb/api/common"
 	modelv2 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v2"
 	"github.com/apache/skywalking-banyandb/pkg/index/posting"
 )
 
+var ErrMalformed = errors.New("the data is malformed")
+
+type FieldKey struct {
+	SeriesID  common.SeriesID
+	IndexRule string
+}
+
+func (f FieldKey) Marshal() []byte {
+	return bytes.Join([][]byte{
+		f.SeriesID.Marshal(),
+		[]byte(f.IndexRule),
+	}, nil)
+}
+
+func (f FieldKey) Equal(other FieldKey) bool {
+	return f.SeriesID == other.SeriesID && f.IndexRule == other.IndexRule
+}
+
 type Field struct {
 	Key  []byte
 	Term []byte
 }
 
 func (f Field) Marshal() []byte {
-	return bytes.Join([][]byte{f.Key, f.Term}, nil)
+	return bytes.Join([][]byte{f.Key, f.Term}, []byte(":"))
+}
+
+func (f *Field) Unmarshal(raw []byte) error {
+	bb := bytes.SplitN(raw, []byte(":"), 2)
+	if len(bb) < 2 {
+		return errors.Wrap(ErrMalformed, "unable to unmarshal the field")
+	}
+	f.Key = make([]byte, len(bb[0]))
+	copy(f.Key, bb[0])
+	f.Term = make([]byte, len(bb[1]))
+	copy(f.Term, bb[1])
+	return nil
 }
 
 type RangeOpts struct {
@@ -40,6 +74,32 @@ type RangeOpts struct {
 	IncludesLower bool
 }
 
+func (r RangeOpts) Between(value []byte) int {
+	if r.Upper != nil {
+		var in bool
+		if r.IncludesUpper {
+			in = bytes.Compare(r.Upper, value) >= 0
+		} else {
+			in = bytes.Compare(r.Upper, value) > 0
+		}
+		if !in {
+			return 1
+		}
+	}
+	if r.Lower != nil {
+		var in bool
+		if r.IncludesLower {
+			in = bytes.Compare(r.Lower, value) <= 0
+		} else {
+			in = bytes.Compare(r.Lower, value) < 0
+		}
+		if !in {
+			return -1
+		}
+	}
+	return 0
+}
+
 type FieldIterator interface {
 	Next() bool
 	Val() *PostingValue
@@ -47,13 +107,27 @@ type FieldIterator interface {
 }
 
 type PostingValue struct {
-	Key   []byte
+	Term  []byte
 	Value posting.List
 }
 
+type Writer interface {
+	Write(field Field, itemID common.ItemID) error
+}
+
+type FieldIterable interface {
+	Iterator(fieldKey FieldKey, termRange RangeOpts, order modelv2.QueryOrder_Sort) (iter FieldIterator, found bool)
+}
+
 type Searcher interface {
-	MatchField(fieldName []byte) (list posting.List)
-	MatchTerms(field Field) (list posting.List)
-	Range(fieldName []byte, opts RangeOpts) (list posting.List)
-	FieldIterator(fieldName []byte, order modelv2.QueryOrder_Sort) FieldIterator
+	FieldIterable
+	MatchField(fieldKey FieldKey) (list posting.List, err error)
+	MatchTerms(field Field) (list posting.List, err error)
+	Range(fieldKey FieldKey, opts RangeOpts) (list posting.List, err error)
+}
+
+type Store interface {
+	io.Closer
+	Writer
+	Searcher
 }
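
Two small building blocks from the file above are worth spelling out: Field is now encoded as key, a ':' separator, then the term (Unmarshal returns ErrMalformed when the separator is missing), and RangeOpts.Between gives a three-way answer used for term filtering. Below is a self-contained sketch of both, not part of the commit; the numeric values are placeholders.

// Sketch (assumed values): Field marshal/unmarshal round trip and the
// three-way result of RangeOpts.Between.
package main

import (
	"fmt"

	"github.com/apache/skywalking-banyandb/api/common"
	"github.com/apache/skywalking-banyandb/pkg/convert"
	"github.com/apache/skywalking-banyandb/pkg/index"
)

func main() {
	key := index.FieldKey{SeriesID: common.SeriesID(7), IndexRule: "duration"}
	f := index.Field{Key: key.Marshal(), Term: convert.Int64ToBytes(200)}

	raw := f.Marshal() // key bytes, ':' separator, term bytes

	var decoded index.Field
	if err := decoded.Unmarshal(raw); err != nil {
		panic(err) // index.ErrMalformed when the ':' separator is missing
	}
	fmt.Println(len(decoded.Key), len(decoded.Term))

	// Between reports 0 for values inside the range, -1 below the lower
	// bound and 1 above the upper bound; either bound may be nil.
	r := index.RangeOpts{
		Lower:         convert.Int64ToBytes(100),
		Upper:         convert.Int64ToBytes(500),
		IncludesLower: true,
	}
	fmt.Println(r.Between(convert.Int64ToBytes(200))) // 0
	fmt.Println(r.Between(convert.Int64ToBytes(50)))  // -1
	fmt.Println(r.Between(convert.Int64ToBytes(900))) // 1
}
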
diff --git a/pkg/index/inverted/field_map.go b/pkg/index/inverted/field_map.go
index 85c7771..2ee0831 100644
--- a/pkg/index/inverted/field_map.go
+++ b/pkg/index/inverted/field_map.go
@@ -20,44 +20,44 @@ package inverted
 import (
 	"sync"
 
-	"github.com/pkg/errors"
-
 	"github.com/apache/skywalking-banyandb/api/common"
 	"github.com/apache/skywalking-banyandb/pkg/convert"
 	"github.com/apache/skywalking-banyandb/pkg/index"
 )
 
-var ErrFieldAbsent = errors.New("field doesn't exist")
-
 type fieldHashID uint64
 
 type fieldMap struct {
-	repo  map[fieldHashID]*fieldValue
+	repo  map[fieldHashID]*termContainer
+	lst   []fieldHashID
 	mutex sync.RWMutex
 }
 
 func newFieldMap(initialSize int) *fieldMap {
 	return &fieldMap{
-		repo: make(map[fieldHashID]*fieldValue, initialSize),
+		repo: make(map[fieldHashID]*termContainer, initialSize),
+		lst:  make([]fieldHashID, 0),
 	}
 }
 
-func (fm *fieldMap) createKey(key []byte) *fieldValue {
-	result := &fieldValue{
+func (fm *fieldMap) createKey(key []byte) *termContainer {
+	k := fieldHashID(convert.Hash(key))
+	result := &termContainer{
 		key:   key,
 		value: newPostingMap(),
 	}
-	fm.repo[fieldHashID(convert.Hash(key))] = result
+	fm.repo[k] = result
+	fm.lst = append(fm.lst, k)
 	return result
 }
 
-func (fm *fieldMap) get(key []byte) (*fieldValue, bool) {
+func (fm *fieldMap) get(key []byte) (*termContainer, bool) {
 	fm.mutex.RLock()
 	defer fm.mutex.RUnlock()
 	return fm.getWithoutLock(key)
 }
 
-func (fm *fieldMap) getWithoutLock(key []byte) (*fieldValue, bool) {
+func (fm *fieldMap) getWithoutLock(key []byte) (*termContainer, bool) {
 	v, ok := fm.repo[fieldHashID(convert.Hash(key))]
 	return v, ok
 }
@@ -72,7 +72,7 @@ func (fm *fieldMap) put(fv index.Field, id common.ItemID) error {
 	return pm.value.put(fv.Term, id)
 }
 
-type fieldValue struct {
+type termContainer struct {
 	key   []byte
-	value *postingMap
+	value *termMap
 }
diff --git a/pkg/index/inverted/inverted.go b/pkg/index/inverted/inverted.go
index 569d434..ce50740 100644
--- a/pkg/index/inverted/inverted.go
+++ b/pkg/index/inverted/inverted.go
@@ -18,30 +18,203 @@
 package inverted
 
 import (
+	"bytes"
+	"sync"
+
+	"github.com/pkg/errors"
+	"go.uber.org/multierr"
+
 	"github.com/apache/skywalking-banyandb/api/common"
+	modelv2 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v2"
+	"github.com/apache/skywalking-banyandb/banyand/kv"
 	"github.com/apache/skywalking-banyandb/pkg/index"
+	"github.com/apache/skywalking-banyandb/pkg/index/posting"
+	"github.com/apache/skywalking-banyandb/pkg/index/posting/roaring"
+	"github.com/apache/skywalking-banyandb/pkg/logger"
 )
 
-type GlobalStore interface {
-	Searcher() index.Searcher
-	Insert(field index.Field, docID common.ItemID) error
-}
+var _ index.Store = (*store)(nil)
 
 type store struct {
-	memTable *MemTable
-	//TODO: add data tables
+	diskTable         kv.IndexStore
+	memTable          *memTable
+	immutableMemTable *memTable
+	rwMutex           sync.RWMutex
 }
 
-func (s *store) Searcher() index.Searcher {
-	return s.memTable
+type StoreOpts struct {
+	Path   string
+	Logger *logger.Logger
 }
 
-func (s *store) Insert(field index.Field, chunkID common.ItemID) error {
-	return s.memTable.Insert(field, chunkID)
+func NewStore(opts StoreOpts) (index.Store, error) {
+	diskTable, err := kv.OpenIndexStore(0, opts.Path, kv.IndexWithLogger(opts.Logger))
+	if err != nil {
+		return nil, err
+	}
+	return &store{
+		memTable:  newMemTable(),
+		diskTable: diskTable,
+	}, nil
 }
 
-func NewStore(name string) GlobalStore {
-	return &store{
-		memTable: NewMemTable(name),
+func (s *store) Close() error {
+	return s.diskTable.Close()
+}
+
+func (s *store) Write(field index.Field, chunkID common.ItemID) error {
+	return s.memTable.Write(field, chunkID)
+}
+
+func (s *store) Flush() error {
+	s.rwMutex.Lock()
+	defer s.rwMutex.Unlock()
+	if s.immutableMemTable == nil {
+		s.immutableMemTable = s.memTable
+		s.memTable = newMemTable()
+	}
+	err := s.diskTable.
+		Handover(s.immutableMemTable.Iter())
+	if err != nil {
+		return err
+	}
+	s.immutableMemTable = nil
+	return nil
+}
+
+func (s *store) MatchField(fieldKey index.FieldKey) (posting.List, error) {
+	return s.Range(fieldKey, index.RangeOpts{})
+}
+
+func (s *store) MatchTerms(field index.Field) (posting.List, error) {
+	result := roaring.NewPostingList()
+	result, errMem := s.searchInMemTables(result, func(table *memTable) (posting.List, error) {
+		list, errInner := table.MatchTerms(field)
+		if errInner != nil {
+			return nil, errInner
+		}
+		return list, nil
+	})
+	if errMem != nil {
+		return nil, errors.Wrap(errMem, "mem table of inverted index")
+	}
+	raw, errTable := s.diskTable.Get(field.Marshal())
+	switch {
+	case errors.Is(errTable, kv.ErrKeyNotFound):
+		return result, nil
+	case errTable != nil:
+		return nil, errors.Wrap(errTable, "disk table of inverted index")
 	}
+	list := roaring.NewPostingList()
+	err := list.Unmarshall(raw)
+	if err != nil {
+		return nil, err
+	}
+	err = result.Union(list)
+	if err != nil {
+		return nil, err
+	}
+	return result, nil
 }
+
+func (s *store) Range(fieldKey index.FieldKey, opts index.RangeOpts) (list posting.List, err error) {
+	iter, found := s.Iterator(fieldKey, opts, modelv2.QueryOrder_SORT_ASC)
+	if !found {
+		return roaring.EmptyPostingList, nil
+	}
+	list = roaring.NewPostingList()
+	for iter.Next() {
+		err = multierr.Append(err, list.Union(iter.Val().Value))
+	}
+	err = multierr.Append(err, iter.Close())
+	return
+}
+
+func (s *store) Iterator(fieldKey index.FieldKey, termRange index.RangeOpts,
+	order modelv2.QueryOrder_Sort) (index.FieldIterator, bool) {
+	s.rwMutex.RLock()
+	defer s.rwMutex.RUnlock()
+	tt := []*memTable{s.memTable, s.immutableMemTable}
+	iters := make([]index.FieldIterator, 0, len(tt)+1)
+	for _, table := range tt {
+		if table == nil {
+			continue
+		}
+		it, found := table.Iterator(fieldKey, termRange, order)
+		if !found {
+			continue
+		}
+		iters = append(iters, it)
+	}
+	it := index.NewFieldIteratorTemplate(fieldKey, termRange, order, s.diskTable, func(term, val []byte, delegated kv.Iterator) (*index.PostingValue, error) {
+		list := roaring.NewPostingList()
+		err := list.Unmarshall(val)
+		if err != nil {
+			return nil, err
+		}
+
+		pv := &index.PostingValue{
+			Term:  term,
+			Value: list,
+		}
+
+		for ; delegated.Valid(); delegated.Next() {
+			f := index.Field{}
+			err := f.Unmarshal(delegated.Key())
+			if err != nil {
+				return nil, err
+			}
+			if !bytes.Equal(f.Term, term) {
+				break
+			}
+			l := roaring.NewPostingList()
+			err = l.Unmarshall(delegated.Val())
+			if err != nil {
+				return nil, err
+			}
+			err = pv.Value.Union(l)
+			if err != nil {
+				return nil, err
+			}
+		}
+		return pv, nil
+	})
+	iters = append(iters, it)
+	if len(iters) < 1 {
+		return nil, false
+	}
+	var fn index.SwitchFn
+	switch order {
+	case modelv2.QueryOrder_SORT_ASC, modelv2.QueryOrder_SORT_UNSPECIFIED:
+		fn = func(a, b []byte) bool {
+			return bytes.Compare(a, b) > 0
+		}
+	case modelv2.QueryOrder_SORT_DESC:
+		fn = func(a, b []byte) bool {
+			return bytes.Compare(a, b) < 0
+		}
+	}
+	return index.NewMergedIterator(iters, fn), true
+}
+
+func (s *store) searchInMemTables(result posting.List, entityFunc entityFunc) (posting.List, error) {
+	s.rwMutex.RLock()
+	defer s.rwMutex.RUnlock()
+	tt := []*memTable{s.memTable, s.immutableMemTable}
+	for _, table := range tt {
+		if table == nil {
+			continue
+		}
+		list, err := entityFunc(table)
+		if err != nil {
+			return result, err
+		}
+		err = result.Union(list)
+		if err != nil {
+			return result, err
+		}
+	}
+	return result, nil
+}
+
+type entityFunc func(table *memTable) (posting.List, error)
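
The inverted store above layers a mutable memtable, an optional immutable memtable and a badger-backed kv.IndexStore; Flush swaps the memtable out and hands its iterator over to the disk table, while reads union all three. A rough sketch in the style of the tests that follow (it would have to live in package inverted, since *store and Flush are unexported); the path, rule name and ids are placeholders, and logging is assumed to be initialized as in the tests.

// Sketch only, not in the commit: the memtable -> disk-table lifecycle.
package inverted

import (
	"fmt"

	"github.com/apache/skywalking-banyandb/api/common"
	"github.com/apache/skywalking-banyandb/pkg/index"
	"github.com/apache/skywalking-banyandb/pkg/logger"
)

func exampleFlushLifecycle() error {
	s, err := NewStore(StoreOpts{
		Path:   "/tmp/inverted-example", // placeholder path
		Logger: logger.GetLogger("example"),
	})
	if err != nil {
		return err
	}
	defer s.Close()

	key := index.FieldKey{SeriesID: common.SeriesID(1), IndexRule: "service_name"}
	field := index.Field{Key: key.Marshal(), Term: []byte("gateway")}

	// Writes land in the mutable memtable.
	if err := s.Write(field, common.ItemID(1)); err != nil {
		return err
	}

	// Flush marks the memtable immutable and hands its iterator over to the
	// badger-backed kv.IndexStore; reads union the memtable, the immutable
	// memtable and the disk table, so the item stays visible afterwards.
	if err := s.(*store).Flush(); err != nil {
		return err
	}

	list, err := s.MatchTerms(field)
	if err != nil {
		return err
	}
	fmt.Println(list.Len()) // expect 1
	return nil
}
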
diff --git a/pkg/index/inverted/inverted_test.go b/pkg/index/inverted/inverted_test.go
new file mode 100644
index 0000000..88a427b
--- /dev/null
+++ b/pkg/index/inverted/inverted_test.go
@@ -0,0 +1,142 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package inverted
+
+import (
+	"testing"
+
+	"github.com/stretchr/testify/assert"
+
+	"github.com/apache/skywalking-banyandb/pkg/index/posting"
+	"github.com/apache/skywalking-banyandb/pkg/index/posting/roaring"
+	"github.com/apache/skywalking-banyandb/pkg/index/test_cases"
+	"github.com/apache/skywalking-banyandb/pkg/logger"
+	"github.com/apache/skywalking-banyandb/pkg/test"
+)
+
+func TestStore_MatchTerm(t *testing.T) {
+	tester := assert.New(t)
+	path, fn := setUp(tester)
+	s, err := NewStore(StoreOpts{
+		Path:   path,
+		Logger: logger.GetLogger("test"),
+	})
+	defer func() {
+		tester.NoError(s.Close())
+		fn()
+	}()
+	tester.NoError(err)
+	test_cases.SetUp(tester, s)
+	test_cases.RunServiceName(t, s)
+}
+
+func TestStore_MatchTerm_AfterFlush(t *testing.T) {
+	tester := assert.New(t)
+	path, fn := setUp(tester)
+	s, err := NewStore(StoreOpts{
+		Path:   path,
+		Logger: logger.GetLogger("test"),
+	})
+	defer func() {
+		tester.NoError(s.Close())
+		fn()
+	}()
+	tester.NoError(err)
+	test_cases.SetUp(tester, s)
+	tester.NoError(s.(*store).Flush())
+	test_cases.RunServiceName(t, s)
+}
+
+func TestStore_Iterator(t *testing.T) {
+	tester := assert.New(t)
+	path, fn := setUp(tester)
+	s, err := NewStore(StoreOpts{
+		Path:   path,
+		Logger: logger.GetLogger("test"),
+	})
+	defer func() {
+		tester.NoError(s.Close())
+		fn()
+	}()
+	tester.NoError(err)
+	data := test_cases.SetUpDuration(tester, s)
+	test_cases.RunDuration(t, data, s)
+}
+
+func TestStore_Iterator_AfterFlush(t *testing.T) {
+	tester := assert.New(t)
+	path, fn := setUp(tester)
+	s, err := NewStore(StoreOpts{
+		Path:   path,
+		Logger: logger.GetLogger("test"),
+	})
+	defer func() {
+		tester.NoError(s.Close())
+		fn()
+	}()
+	tester.NoError(err)
+	data := test_cases.SetUpDuration(tester, s)
+	tester.NoError(s.(*store).Flush())
+	test_cases.RunDuration(t, data, s)
+}
+
+func TestStore_Iterator_Hybrid(t *testing.T) {
+	tester := assert.New(t)
+	path, fn := setUp(tester)
+	s, err := NewStore(StoreOpts{
+		Path:   path,
+		Logger: logger.GetLogger("test"),
+	})
+	defer func() {
+		tester.NoError(s.Close())
+		fn()
+	}()
+	tester.NoError(err)
+	r := map[int]posting.List{
+		50:   roaring.NewPostingList(),
+		200:  nil,
+		500:  roaring.NewPostingList(),
+		1000: nil,
+		2000: roaring.NewPostingList(),
+	}
+	data1 := test_cases.SetUpPartialDuration(tester, s, r)
+	tester.NoError(s.(*store).Flush())
+	r = map[int]posting.List{
+		50:   nil,
+		200:  roaring.NewPostingList(),
+		500:  nil,
+		1000: roaring.NewPostingList(),
+		2000: nil,
+	}
+	data := test_cases.SetUpPartialDuration(tester, s, r)
+	for i, list := range data {
+		if list == nil {
+			data[i] = data1[i]
+		}
+	}
+	test_cases.RunDuration(t, data, s)
+}
+
+func setUp(t *assert.Assertions) (tempDir string, deferFunc func()) {
+	t.NoError(logger.Init(logger.Logging{
+		Env:   "dev",
+		Level: "debug",
+	}))
+	tempDir, deferFunc = test.Space(t)
+	return tempDir, deferFunc
+}
diff --git a/pkg/index/inverted/mem.go b/pkg/index/inverted/mem.go
index f8db9b7..cd19027 100644
--- a/pkg/index/inverted/mem.go
+++ b/pkg/index/inverted/mem.go
@@ -21,41 +21,33 @@ import (
 	"bytes"
 	"sort"
 
-	"github.com/pkg/errors"
+	"go.uber.org/multierr"
 
 	"github.com/apache/skywalking-banyandb/api/common"
 	modelv2 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v2"
+	"github.com/apache/skywalking-banyandb/banyand/kv"
 	"github.com/apache/skywalking-banyandb/pkg/index"
 	"github.com/apache/skywalking-banyandb/pkg/index/posting"
 	"github.com/apache/skywalking-banyandb/pkg/index/posting/roaring"
 )
 
-var ErrFieldsAbsent = errors.New("fields are absent")
-
-var _ index.Searcher = (*MemTable)(nil)
+var (
+	_ index.Writer        = (*memTable)(nil)
+	_ index.FieldIterable = (*memTable)(nil)
+)
 
-type MemTable struct {
-	terms *fieldMap
-	name  string
+type memTable struct {
+	fields *fieldMap
 }
 
-func NewMemTable(name string) *MemTable {
-	return &MemTable{
-		name:  name,
-		terms: newFieldMap(1000),
+func newMemTable() *memTable {
+	return &memTable{
+		fields: newFieldMap(1000),
 	}
 }
 
-func (m *MemTable) Insert(field index.Field, chunkID common.ItemID) error {
-	return m.terms.put(field, chunkID)
-}
-
-func (m *MemTable) MatchField(fieldName []byte) (list posting.List) {
-	fieldsValues, ok := m.terms.get(fieldName)
-	if !ok {
-		return roaring.EmptyPostingList
-	}
-	return fieldsValues.value.allValues()
+func (m *memTable) Write(field index.Field, chunkID common.ItemID) error {
+	return m.fields.put(field, chunkID)
 }
 
 var _ index.FieldIterator = (*fIterator)(nil)
@@ -64,7 +56,7 @@ type fIterator struct {
 	index     int
 	val       *index.PostingValue
 	keys      [][]byte
-	valueRepo *postingMap
+	valueRepo *termMap
 	closed    bool
 }
 
@@ -74,7 +66,6 @@ func (f *fIterator) Next() bool {
 	}
 	f.index++
 	if f.index >= len(f.keys) {
-		_ = f.Close()
 		return false
 	}
 	f.val = f.valueRepo.getEntry(f.keys[f.index])
@@ -93,7 +84,7 @@ func (f *fIterator) Close() error {
 	return nil
 }
 
-func newFieldIterator(keys [][]byte, fValue *postingMap) index.FieldIterator {
+func newFieldIterator(keys [][]byte, fValue *termMap) index.FieldIterator {
 	return &fIterator{
 		keys:      keys,
 		valueRepo: fValue,
@@ -101,45 +92,139 @@ func newFieldIterator(keys [][]byte, fValue *postingMap) index.FieldIterator {
 	}
 }
 
-func (m *MemTable) FieldIterator(fieldName []byte, order modelv2.QueryOrder_Sort) index.FieldIterator {
-	fieldsValues, ok := m.terms.get(fieldName)
+func (m *memTable) Iterator(fieldKey index.FieldKey, rangeOpts index.RangeOpts,
+	order modelv2.QueryOrder_Sort) (iter index.FieldIterator, found bool) {
+	fieldsValues, ok := m.fields.get(fieldKey.Marshal())
 	if !ok {
-		return nil
+		return nil, false
 	}
 	fValue := fieldsValues.value
-	var keys [][]byte
+	var terms [][]byte
 	{
 		fValue.mutex.RLock()
 		defer fValue.mutex.RUnlock()
 		for _, value := range fValue.repo {
-			keys = append(keys, value.Key)
+			if rangeOpts.Between(value.Term) == 0 {
+				terms = append(terms, value.Term)
+			}
 		}
 	}
+	if len(terms) < 1 {
+		return nil, false
+	}
 	switch order {
-	case modelv2.QueryOrder_SORT_ASC:
-		sort.SliceStable(keys, func(i, j int) bool {
-			return bytes.Compare(keys[i], keys[j]) < 0
+	case modelv2.QueryOrder_SORT_ASC, modelv2.QueryOrder_SORT_UNSPECIFIED:
+		sort.SliceStable(terms, func(i, j int) bool {
+			return bytes.Compare(terms[i], terms[j]) < 0
 		})
 	case modelv2.QueryOrder_SORT_DESC:
-		sort.SliceStable(keys, func(i, j int) bool {
-			return bytes.Compare(keys[i], keys[j]) > 0
+		sort.SliceStable(terms, func(i, j int) bool {
+			return bytes.Compare(terms[i], terms[j]) > 0
 		})
 	}
-	return newFieldIterator(keys, fValue)
+	return newFieldIterator(terms, fValue), true
 }
 
-func (m *MemTable) MatchTerms(field index.Field) (list posting.List) {
-	fieldsValues, ok := m.terms.get(field.Key)
+func (m *memTable) MatchTerms(field index.Field) (posting.List, error) {
+	fieldsValues, ok := m.fields.get(field.Key)
 	if !ok {
-		return roaring.EmptyPostingList
+		return roaring.EmptyPostingList, nil
+	}
+	list := fieldsValues.value.get(field.Term)
+	if list == nil {
+		return roaring.EmptyPostingList, nil
 	}
-	return fieldsValues.value.get(field.Term).Clone()
+	return list, nil
 }
 
-func (m *MemTable) Range(fieldName []byte, opts index.RangeOpts) (list posting.List) {
-	fieldsValues, ok := m.terms.get(fieldName)
-	if !ok {
-		return roaring.EmptyPostingList
+var _ kv.Iterator = (*flushIterator)(nil)
+
+type flushIterator struct {
+	fieldIdx int
+	termIdx  int
+	key      []byte
+	value    []byte
+	fields   *fieldMap
+	valid    bool
+	err      error
+}
+
+func (i *flushIterator) Next() {
+	if i.fieldIdx >= len(i.fields.lst) {
+		i.valid = false
+		return
+	}
+	fieldID := i.fields.lst[i.fieldIdx]
+	terms := i.fields.repo[fieldID]
+	if i.termIdx < len(terms.value.lst) {
+		i.termIdx++
+		if !i.setCurr() {
+			i.Next()
+		}
+		return
+	}
+	i.fieldIdx++
+	i.termIdx = 0
+	if !i.setCurr() {
+		i.Next()
+	}
+}
+
+func (i *flushIterator) Rewind() {
+	i.fieldIdx = 0
+	i.termIdx = 0
+	i.valid = true
+	if !i.setCurr() {
+		i.valid = false
+	}
+}
+
+func (i *flushIterator) Seek(_ []byte) {
+	panic("unsupported")
+}
+
+func (i *flushIterator) Key() []byte {
+	return i.key
+}
+
+func (i *flushIterator) Val() []byte {
+	return i.value
+}
+
+func (i *flushIterator) Valid() bool {
+	return i.valid
+}
+
+func (i *flushIterator) Close() error {
+	return i.err
+}
+
+func (i *flushIterator) setCurr() bool {
+	if i.fieldIdx >= len(i.fields.lst) {
+		return false
+	}
+	fieldID := i.fields.lst[i.fieldIdx]
+	term := i.fields.repo[fieldID]
+	if i.termIdx >= len(term.value.lst) {
+		return false
+	}
+	valueID := term.value.lst[i.termIdx]
+	value := term.value.repo[valueID]
+	v, err := value.Value.Marshall()
+	if err != nil {
+		i.err = multierr.Append(i.err, err)
+		return false
+	}
+	i.value = v
+	i.key = index.Field{
+		Key:  term.key,
+		Term: value.Term,
+	}.Marshal()
+	return true
+}
+
+func (m *memTable) Iter() kv.Iterator {
+	return &flushIterator{
+		fields: m.fields,
 	}
-	return fieldsValues.value.getRange(opts)
 }
diff --git a/pkg/index/inverted/mem_test.go b/pkg/index/inverted/mem_test.go
index 99caa2c..9e7c4eb 100644
--- a/pkg/index/inverted/mem_test.go
+++ b/pkg/index/inverted/mem_test.go
@@ -18,242 +18,21 @@
 package inverted
 
 import (
-	"reflect"
 	"testing"
 
 	"github.com/stretchr/testify/assert"
 
-	"github.com/apache/skywalking-banyandb/api/common"
-	modelv2 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v2"
-	"github.com/apache/skywalking-banyandb/pkg/convert"
-	"github.com/apache/skywalking-banyandb/pkg/index"
-	"github.com/apache/skywalking-banyandb/pkg/index/posting"
-	"github.com/apache/skywalking-banyandb/pkg/index/posting/roaring"
+	"github.com/apache/skywalking-banyandb/pkg/index/test_cases"
 )
 
-func TestMemTable_Range(t *testing.T) {
-	type args struct {
-		fieldName []byte
-		opts      index.RangeOpts
-	}
-	m := NewMemTable("sw")
-	setUp(t, m)
-	tests := []struct {
-		name     string
-		args     args
-		wantList posting.List
-	}{
-		{
-			name: "in range",
-			args: args{
-				fieldName: []byte("duration"),
-				opts: index.RangeOpts{
-					Lower: convert.Uint16ToBytes(100),
-					Upper: convert.Uint16ToBytes(500),
-				},
-			},
-			wantList: m.MatchTerms(index.Field{
-				Key:  []byte("duration"),
-				Term: convert.Uint16ToBytes(200),
-			}),
-		},
-		{
-			name: "excludes edge",
-			args: args{
-				fieldName: []byte("duration"),
-				opts: index.RangeOpts{
-					Lower: convert.Uint16ToBytes(50),
-					Upper: convert.Uint16ToBytes(1000),
-				},
-			},
-			wantList: union(m,
-				index.Field{
-					Key:  []byte("duration"),
-					Term: convert.Uint16ToBytes(200),
-				},
-			),
-		},
-		{
-			name: "includes lower",
-			args: args{
-				fieldName: []byte("duration"),
-				opts: index.RangeOpts{
-					Lower:         convert.Uint16ToBytes(50),
-					Upper:         convert.Uint16ToBytes(1000),
-					IncludesLower: true,
-				},
-			},
-			wantList: union(m,
-				index.Field{
-					Key:  []byte("duration"),
-					Term: convert.Uint16ToBytes(50),
-				},
-				index.Field{
-					Key:  []byte("duration"),
-					Term: convert.Uint16ToBytes(200),
-				},
-			),
-		},
-		{
-			name: "includes upper",
-			args: args{
-				fieldName: []byte("duration"),
-				opts: index.RangeOpts{
-					Lower:         convert.Uint16ToBytes(50),
-					Upper:         convert.Uint16ToBytes(1000),
-					IncludesUpper: true,
-				},
-			},
-			wantList: union(m,
-				index.Field{
-					Key:  []byte("duration"),
-					Term: convert.Uint16ToBytes(200),
-				},
-				index.Field{
-					Key:  []byte("duration"),
-					Term: convert.Uint16ToBytes(1000),
-				},
-			),
-		},
-		{
-			name: "includes edges",
-			args: args{
-				fieldName: []byte("duration"),
-				opts: index.RangeOpts{
-					Lower:         convert.Uint16ToBytes(50),
-					Upper:         convert.Uint16ToBytes(1000),
-					IncludesUpper: true,
-					IncludesLower: true,
-				},
-			},
-			wantList: union(m,
-				index.Field{
-					Key:  []byte("duration"),
-					Term: convert.Uint16ToBytes(50),
-				},
-				index.Field{
-					Key:  []byte("duration"),
-					Term: convert.Uint16ToBytes(200),
-				},
-				index.Field{
-					Key:  []byte("duration"),
-					Term: convert.Uint16ToBytes(1000),
-				},
-			),
-		},
-		{
-			name: "match one",
-			args: args{
-				fieldName: []byte("duration"),
-				opts: index.RangeOpts{
-					Lower:         convert.Uint16ToBytes(200),
-					Upper:         convert.Uint16ToBytes(200),
-					IncludesUpper: true,
-					IncludesLower: true,
-				},
-			},
-			wantList: union(m,
-				index.Field{
-					Key:  []byte("duration"),
-					Term: convert.Uint16ToBytes(200),
-				},
-			),
-		},
-	}
-	for _, tt := range tests {
-		t.Run(tt.name, func(t *testing.T) {
-			if gotList := m.Range(tt.args.fieldName, tt.args.opts); !reflect.DeepEqual(gotList, tt.wantList) {
-				t.Errorf("Range() = %v, want %v", gotList.Len(), tt.wantList.Len())
-			}
-		})
-	}
+func TestMemTable_MatchTerm(t *testing.T) {
+	mt := newMemTable()
+	test_cases.SetUp(assert.New(t), mt)
+	test_cases.RunServiceName(t, mt)
 }
 
 func TestMemTable_Iterator(t *testing.T) {
-	tester := assert.New(t)
-	type args struct {
-		fieldName []byte
-		orderType modelv2.QueryOrder_Sort
-	}
-	m := NewMemTable("sw")
-	setUp(t, m)
-	tests := []struct {
-		name string
-		args args
-		want [][]byte
-	}{
-		{
-			name: "sort asc",
-			args: args{
-				fieldName: []byte("duration"),
-				orderType: modelv2.QueryOrder_SORT_ASC,
-			},
-			want: [][]byte{convert.Uint16ToBytes(50), convert.Uint16ToBytes(200), convert.Uint16ToBytes(1000)},
-		},
-		{
-			name: "sort desc",
-			args: args{
-				fieldName: []byte("duration"),
-				orderType: modelv2.QueryOrder_SORT_DESC,
-			},
-			want: [][]byte{convert.Uint16ToBytes(1000), convert.Uint16ToBytes(200), convert.Uint16ToBytes(50)},
-		},
-	}
-	for _, tt := range tests {
-		t.Run(tt.name, func(t *testing.T) {
-			iter := m.FieldIterator(tt.args.fieldName, tt.args.orderType)
-			tester.NotNil(iter)
-			var got [][]byte
-			defer func() {
-				_ = iter.Close()
-			}()
-			for iter.Next() {
-				got = append(got, iter.Val().Key)
-			}
-			tester.Equal(tt.want, got)
-		})
-	}
-}
-
-func union(memTable *MemTable, fields ...index.Field) posting.List {
-	result := roaring.NewPostingList()
-	for _, f := range fields {
-		_ = result.Union(memTable.MatchTerms(f))
-	}
-	return result
-}
-
-func setUp(t *testing.T, mt *MemTable) {
-	for i := 0; i < 100; i++ {
-		if i%2 == 0 {
-			assert.NoError(t, mt.Insert(index.Field{
-				Key:  []byte("service_name"),
-				Term: []byte("gateway"),
-			}, common.ItemID(i)))
-		} else {
-			assert.NoError(t, mt.Insert(index.Field{
-				Key:  []byte("service_name"),
-				Term: []byte("webpage"),
-			}, common.ItemID(i)))
-		}
-	}
-	for i := 100; i < 200; i++ {
-		switch {
-		case i%3 == 0:
-			assert.NoError(t, mt.Insert(index.Field{
-				Key:  []byte("duration"),
-				Term: convert.Uint16ToBytes(50),
-			}, common.ItemID(i)))
-		case i%3 == 1:
-			assert.NoError(t, mt.Insert(index.Field{
-				Key:  []byte("duration"),
-				Term: convert.Uint16ToBytes(200),
-			}, common.ItemID(i)))
-		case i%3 == 2:
-			assert.NoError(t, mt.Insert(index.Field{
-				Key:  []byte("duration"),
-				Term: convert.Uint16ToBytes(1000),
-			}, common.ItemID(i)))
-		}
-	}
+	mt := newMemTable()
+	data := test_cases.SetUpDuration(assert.New(t), mt)
+	test_cases.RunDuration(t, data, mt)
 }
diff --git a/pkg/index/inverted/term_map.go b/pkg/index/inverted/term_map.go
index c672239..e683837 100644
--- a/pkg/index/inverted/term_map.go
+++ b/pkg/index/inverted/term_map.go
@@ -18,8 +18,6 @@
 package inverted
 
 import (
-	"bytes"
-	"sort"
 	"sync"
 
 	"github.com/apache/skywalking-banyandb/api/common"
@@ -31,48 +29,50 @@ import (
 
 type termHashID uint64
 
-type postingMap struct {
+type termMap struct {
 	repo  map[termHashID]*index.PostingValue
+	lst   []termHashID
 	mutex sync.RWMutex
 }
 
-func newPostingMap() *postingMap {
-	return &postingMap{
+func newPostingMap() *termMap {
+	return &termMap{
 		repo: make(map[termHashID]*index.PostingValue),
 	}
 }
 
-func (p *postingMap) put(key []byte, id common.ItemID) error {
+func (p *termMap) put(key []byte, id common.ItemID) error {
 	list := p.getOrCreate(key)
 	list.Insert(id)
 	return nil
 }
 
-func (p *postingMap) getOrCreate(key []byte) posting.List {
+func (p *termMap) getOrCreate(key []byte) posting.List {
 	list := p.get(key)
-	if list != roaring.EmptyPostingList {
+	if list != nil {
 		return list
 	}
 	p.mutex.Lock()
 	defer p.mutex.Unlock()
 	hashedKey := termHashID(convert.Hash(key))
 	v := &index.PostingValue{
-		Key:   key,
+		Term:  key,
 		Value: roaring.NewPostingList(),
 	}
 	p.repo[hashedKey] = v
+	p.lst = append(p.lst, hashedKey)
 	return v.Value
 }
 
-func (p *postingMap) get(key []byte) posting.List {
+func (p *termMap) get(key []byte) posting.List {
 	e := p.getEntry(key)
 	if e == nil {
-		return roaring.EmptyPostingList
+		return nil
 	}
 	return e.Value
 }
 
-func (p *postingMap) getEntry(key []byte) *index.PostingValue {
+func (p *termMap) getEntry(key []byte) *index.PostingValue {
 	p.mutex.RLock()
 	defer p.mutex.RUnlock()
 	hashedKey := termHashID(convert.Hash(key))
@@ -82,80 +82,3 @@ func (p *postingMap) getEntry(key []byte) *index.PostingValue {
 	}
 	return v
 }
-
-func (p *postingMap) allValues() posting.List {
-	result := roaring.NewPostingList()
-	for _, value := range p.repo {
-		_ = result.Union(value.Value)
-	}
-	return result
-}
-
-func (p *postingMap) getRange(opts index.RangeOpts) posting.List {
-	switch bytes.Compare(opts.Upper, opts.Lower) {
-	case -1:
-		return roaring.EmptyPostingList
-	case 0:
-		if opts.IncludesUpper && opts.IncludesLower {
-			return p.get(opts.Upper)
-		}
-		return roaring.EmptyPostingList
-	}
-	p.mutex.RLock()
-	defer p.mutex.RUnlock()
-	keys := make(Asc, 0, len(p.repo))
-	for _, v := range p.repo {
-		keys = append(keys, v.Key)
-	}
-	sort.Sort(keys)
-	index := sort.Search(len(keys), func(i int) bool {
-		return bytes.Compare(keys[i], opts.Lower) >= 0
-	})
-	result := roaring.NewPostingList()
-	for i := index; i < len(keys); i++ {
-		k := keys[i]
-		switch {
-		case bytes.Equal(k, opts.Lower):
-			if opts.IncludesLower {
-				_ = result.Union(p.repo[termHashID(convert.Hash(k))].Value)
-			}
-		case bytes.Compare(k, opts.Upper) > 0:
-			break
-		case bytes.Equal(k, opts.Upper):
-			if opts.IncludesUpper {
-				_ = result.Union(p.repo[termHashID(convert.Hash(k))].Value)
-			}
-		default:
-			_ = result.Union(p.repo[termHashID(convert.Hash(k))].Value)
-		}
-	}
-	return result
-}
-
-type Asc [][]byte
-
-func (a Asc) Len() int {
-	return len(a)
-}
-
-func (a Asc) Less(i, j int) bool {
-	return bytes.Compare(a[i], a[j]) < 0
-}
-
-func (a Asc) Swap(i, j int) {
-	a[i], a[j] = a[j], a[i]
-}
-
-type Desc [][]byte
-
-func (d Desc) Len() int {
-	return len(d)
-}
-
-func (d Desc) Less(i, j int) bool {
-	return bytes.Compare(d[i], d[j]) > 0
-}
-
-func (d Desc) Swap(i, j int) {
-	d[i], d[j] = d[j], d[i]
-}
diff --git a/pkg/index/iterator.go b/pkg/index/iterator.go
new file mode 100644
index 0000000..439b627
--- /dev/null
+++ b/pkg/index/iterator.go
@@ -0,0 +1,209 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package index
+
+import (
+	"bytes"
+
+	"go.uber.org/multierr"
+
+	modelv2 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v2"
+	"github.com/apache/skywalking-banyandb/banyand/kv"
+)
+
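+// CompositePostingValueFn builds a composite PostingValue from a term, its first encoded value,
+// and the delegated kv.Iterator positioned at that entry.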
+type CompositePostingValueFn = func(term, value []byte, delegated kv.Iterator) (*PostingValue, error)
+
+var _ FieldIterator = (*FieldIteratorTemplate)(nil)
+
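+// FieldIteratorTemplate is a reusable FieldIterator that walks a kv.Iterable in term order
+// and filters the terms against a RangeOpts.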
+type FieldIteratorTemplate struct {
+	delegated kv.Iterator
+
+	init      bool
+	curr      *PostingValue
+	err       error
+	termRange RangeOpts
+	fn        CompositePostingValueFn
+	reverse   bool
+	field     Field
+}
+
+func (f *FieldIteratorTemplate) Next() bool {
+	if !f.init {
+		f.init = true
+		f.delegated.Seek(f.field.Marshal())
+	}
+	if !f.delegated.Valid() {
+		return false
+	}
+	field := &Field{}
+	err := field.Unmarshal(f.delegated.Key())
+	if err != nil {
+		f.err = err
+		return false
+	}
+	if !bytes.Equal(field.Key, f.field.Key) {
+		return false
+	}
+	pv, err := f.fn(field.Term, f.delegated.Val(), f.delegated)
+	if err != nil {
+		f.err = err
+		return false
+	}
+	in := f.termRange.Between(pv.Term)
+	switch {
+	case in > 0:
+		if f.reverse {
+			return f.Next()
+		} else {
+			return false
+		}
+	case in < 0:
+		if f.reverse {
+			return false
+		} else {
+			return f.Next()
+		}
+	}
+	f.curr = pv
+	return true
+}
+
+func (f *FieldIteratorTemplate) Val() *PostingValue {
+	return f.curr
+}
+
+func (f *FieldIteratorTemplate) Close() error {
+	return f.delegated.Close()
+}
+
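+// NewFieldIteratorTemplate creates a FieldIteratorTemplate that scans entries prefixed with the
+// field key, starting from the lower or upper bound according to the requested order.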
+func NewFieldIteratorTemplate(fieldKey FieldKey, termRange RangeOpts, order modelv2.QueryOrder_Sort, iterable kv.Iterable, fn CompositePostingValueFn) *FieldIteratorTemplate {
+	var reverse bool
+	var term []byte
+	switch order {
+	case modelv2.QueryOrder_SORT_ASC, modelv2.QueryOrder_SORT_UNSPECIFIED:
+		term = termRange.Lower
+		reverse = false
+	case modelv2.QueryOrder_SORT_DESC:
+		term = termRange.Upper
+		reverse = true
+	}
+	iter := iterable.NewIterator(kv.ScanOpts{
+		Prefix:  fieldKey.Marshal(),
+		Reverse: reverse,
+	})
+	return &FieldIteratorTemplate{
+		delegated: iter,
+		termRange: termRange,
+		fn:        fn,
+		reverse:   reverse,
+		field: Field{
+			Key:  fieldKey.Marshal(),
+			Term: term,
+		},
+	}
+}
+
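+// SwitchFn reports whether term b should replace the current head term a while merging.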
+type SwitchFn = func(a, b []byte) bool
+
+var _ FieldIterator = (*mergedIterator)(nil)
+
+type mergedIterator struct {
+	inner        []FieldIterator
+	drained      []FieldIterator
+	drainedCount int
+	cur          *PostingValue
+	switchFn     SwitchFn
+	init         bool
+	closed       bool
+}
+
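+// NewMergedIterator merges several FieldIterators into one, using fn to pick the next head among them.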
+func NewMergedIterator(merged []FieldIterator, fn SwitchFn) FieldIterator {
+	return &mergedIterator{
+		inner:    merged,
+		drained:  make([]FieldIterator, len(merged)),
+		switchFn: fn,
+	}
+}
+
+func (m *mergedIterator) Next() bool {
+	if m.closed {
+		return false
+	}
+	if m.allDrained() {
+		return false
+	}
+	if !m.init {
+		for i, iterator := range m.inner {
+			if !iterator.Next() {
+				m.drain(i)
+			}
+		}
+		if m.allDrained() {
+			return false
+		}
+		m.init = true
+	}
+	var head FieldIterator
+	var headIndex int
+	for i, iterator := range m.inner {
+		if iterator == nil {
+			continue
+		}
+		if head == nil {
+			head = iterator
+			headIndex = i
+			continue
+		}
+		if m.switchFn(head.Val().Term, iterator.Val().Term) {
+			head = iterator
+			headIndex = i
+		}
+	}
+	m.cur = head.Val()
+	if !head.Next() {
+		m.drain(headIndex)
+	}
+	return true
+}
+
+func (m *mergedIterator) Val() *PostingValue {
+	return m.cur
+}
+
+func (m *mergedIterator) Close() error {
+	m.closed = true
+	var err error
+	for _, iterator := range m.drained {
+		if iterator == nil {
+			continue
+		}
+		err = multierr.Append(err, iterator.Close())
+	}
+	return err
+}
+
+func (m *mergedIterator) drain(index int) {
+	m.drained[index], m.inner[index] = m.inner[index], nil
+	m.drainedCount++
+}
+
+func (m *mergedIterator) allDrained() bool {
+	return m.drainedCount == len(m.inner)
+}
diff --git a/pkg/index/inverted/inverted.go b/pkg/index/lsm/lsm.go
similarity index 57%
copy from pkg/index/inverted/inverted.go
copy to pkg/index/lsm/lsm.go
index 569d434..14961a8 100644
--- a/pkg/index/inverted/inverted.go
+++ b/pkg/index/lsm/lsm.go
@@ -15,33 +15,43 @@
 // specific language governing permissions and limitations
 // under the License.
 
-package inverted
+package lsm
 
 import (
 	"github.com/apache/skywalking-banyandb/api/common"
+	"github.com/apache/skywalking-banyandb/banyand/kv"
+	"github.com/apache/skywalking-banyandb/pkg/convert"
 	"github.com/apache/skywalking-banyandb/pkg/index"
+	"github.com/apache/skywalking-banyandb/pkg/logger"
 )
 
-type GlobalStore interface {
-	Searcher() index.Searcher
-	Insert(field index.Field, docID common.ItemID) error
-}
+var _ index.Store = (*store)(nil)
 
 type store struct {
-	memTable *MemTable
-	//TODO: add data tables
+	lsm kv.Store
 }
 
-func (s *store) Searcher() index.Searcher {
-	return s.memTable
+func (s *store) Close() error {
+	return s.lsm.Close()
 }
 
-func (s *store) Insert(field index.Field, chunkID common.ItemID) error {
-	return s.memTable.Insert(field, chunkID)
+func (s *store) Write(field index.Field, itemID common.ItemID) error {
+	itemIDInt := uint64(itemID)
+	return s.lsm.PutWithVersion(field.Marshal(), convert.Uint64ToBytes(itemIDInt), itemIDInt)
 }
 
-func NewStore(name string) GlobalStore {
-	return &store{
-		memTable: NewMemTable(name),
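+// StoreOpts collects the options for opening an LSM-backed index store.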
+type StoreOpts struct {
+	Path   string
+	Logger *logger.Logger
+}
+
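+// NewStore opens a kv.Store at opts.Path and wraps it as an index.Store.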
+func NewStore(opts StoreOpts) (index.Store, error) {
+	var err error
+	var lsm kv.Store
+	if lsm, err = kv.OpenStore(0, opts.Path, kv.StoreWithLogger(opts.Logger)); err != nil {
+		return nil, err
 	}
+	return &store{
+		lsm: lsm,
+	}, nil
 }
diff --git a/pkg/index/lsm/lsm_test.go b/pkg/index/lsm/lsm_test.go
new file mode 100644
index 0000000..2e8f8f8
--- /dev/null
+++ b/pkg/index/lsm/lsm_test.go
@@ -0,0 +1,69 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package lsm
+
+import (
+	"testing"
+
+	"github.com/stretchr/testify/assert"
+
+	"github.com/apache/skywalking-banyandb/pkg/index/test_cases"
+	"github.com/apache/skywalking-banyandb/pkg/logger"
+	"github.com/apache/skywalking-banyandb/pkg/test"
+)
+
+func TestStore_MatchTerm(t *testing.T) {
+	tester := assert.New(t)
+	path, fn := setUp(tester)
+	s, err := NewStore(StoreOpts{
+		Path:   path,
+		Logger: logger.GetLogger("test"),
+	})
+	defer func() {
+		tester.NoError(s.Close())
+		fn()
+	}()
+	tester.NoError(err)
+	test_cases.SetUp(tester, s)
+	test_cases.RunServiceName(t, s)
+}
+
+func TestStore_Iterator(t *testing.T) {
+	tester := assert.New(t)
+	path, fn := setUp(tester)
+	s, err := NewStore(StoreOpts{
+		Path:   path,
+		Logger: logger.GetLogger("test"),
+	})
+	defer func() {
+		tester.NoError(s.Close())
+		fn()
+	}()
+	tester.NoError(err)
+	data := test_cases.SetUpDuration(tester, s)
+	test_cases.RunDuration(t, data, s)
+}
+
+func setUp(t *assert.Assertions) (tempDir string, deferFunc func()) {
+	t.NoError(logger.Init(logger.Logging{
+		Env:   "dev",
+		Level: "info",
+	}))
+	tempDir, deferFunc = test.Space(t)
+	return tempDir, deferFunc
+}
diff --git a/pkg/index/lsm/search.go b/pkg/index/lsm/search.go
new file mode 100644
index 0000000..53bf0aa
--- /dev/null
+++ b/pkg/index/lsm/search.go
@@ -0,0 +1,84 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package lsm
+
+import (
+	"bytes"
+
+	"github.com/pkg/errors"
+	"go.uber.org/multierr"
+
+	"github.com/apache/skywalking-banyandb/api/common"
+	modelv2 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v2"
+	"github.com/apache/skywalking-banyandb/banyand/kv"
+	"github.com/apache/skywalking-banyandb/pkg/convert"
+	"github.com/apache/skywalking-banyandb/pkg/index"
+	"github.com/apache/skywalking-banyandb/pkg/index/posting"
+	"github.com/apache/skywalking-banyandb/pkg/index/posting/roaring"
+)
+
+func (s *store) MatchField(fieldKey index.FieldKey) (list posting.List, err error) {
+	return s.Range(fieldKey, index.RangeOpts{})
+}
+
+func (s *store) MatchTerms(field index.Field) (list posting.List, err error) {
+	list = roaring.NewPostingList()
+	err = s.lsm.GetAll(field.Marshal(), func(itemID []byte) error {
+		list.Insert(common.ItemID(convert.BytesToUint64(itemID)))
+		return nil
+	})
+	if errors.Is(err, kv.ErrKeyNotFound) {
+		return roaring.EmptyPostingList, nil
+	}
+	return
+}
+
+func (s *store) Range(fieldKey index.FieldKey, opts index.RangeOpts) (list posting.List, err error) {
+	iter, found := s.Iterator(fieldKey, opts, modelv2.QueryOrder_SORT_ASC)
+	if !found {
+		return roaring.EmptyPostingList, nil
+	}
+	list = roaring.NewPostingList()
+	for iter.Next() {
+		err = multierr.Append(err, list.Union(iter.Val().Value))
+	}
+	err = multierr.Append(err, iter.Close())
+	return
+}
+
+func (s *store) Iterator(fieldKey index.FieldKey, termRange index.RangeOpts, order modelv2.QueryOrder_Sort) (index.FieldIterator, bool) {
+	return index.NewFieldIteratorTemplate(fieldKey, termRange, order, s.lsm, func(term, value []byte, delegated kv.Iterator) (*index.PostingValue, error) {
+		pv := &index.PostingValue{
+			Term:  term,
+			Value: roaring.NewPostingListWithInitialData(convert.BytesToUint64(value)),
+		}
+
+		for ; delegated.Valid(); delegated.Next() {
+			f := index.Field{}
+			err := f.Unmarshal(delegated.Key())
+			if err != nil {
+				return nil, err
+			}
+			if !bytes.Equal(f.Term, term) {
+				break
+			}
+			pv.Value.Insert(common.ItemID(convert.BytesToUint64(delegated.Val())))
+		}
+		return pv, nil
+	}), true
+}
diff --git a/pkg/index/posting/roaring/roaring.go b/pkg/index/posting/roaring/roaring.go
index 8f1f3d7..d121f89 100644
--- a/pkg/index/posting/roaring/roaring.go
+++ b/pkg/index/posting/roaring/roaring.go
@@ -62,6 +62,14 @@ func NewPostingListWithInitialData(data ...uint64) posting.List {
 	return list
 }
 
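+// NewRange creates a posting list containing all item IDs in [start, end).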
+func NewRange(start, end uint64) posting.List {
+	list := &postingsList{
+		bitmap: roaring64.New(),
+	}
+	list.bitmap.AddRange(start, end)
+	return list
+}
+
 func (p *postingsList) Contains(id common.ItemID) bool {
 	return p.bitmap.Contains(uint64(id))
 }
diff --git a/pkg/index/test_cases/duration.go b/pkg/index/test_cases/duration.go
new file mode 100644
index 0000000..af4bf1a
--- /dev/null
+++ b/pkg/index/test_cases/duration.go
@@ -0,0 +1,329 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package test_cases
+
+import (
+	"sort"
+	"testing"
+
+	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/require"
+
+	"github.com/apache/skywalking-banyandb/api/common"
+	modelv2 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v2"
+	"github.com/apache/skywalking-banyandb/pkg/convert"
+	"github.com/apache/skywalking-banyandb/pkg/index"
+	"github.com/apache/skywalking-banyandb/pkg/index/posting"
+	"github.com/apache/skywalking-banyandb/pkg/index/posting/roaring"
+)
+
+var (
+	duration = index.FieldKey{
+		IndexRule: "duration",
+	}
+)
+
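+// SimpleStore is the minimal index store surface shared by these test cases:
+// writing fields, iterating them, and matching exact terms.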
+type SimpleStore interface {
+	index.FieldIterable
+	index.Writer
+	MatchTerms(field index.Field) (list posting.List, err error)
+}
+
+type args struct {
+	fieldKey  index.FieldKey
+	termRange index.RangeOpts
+	orderType modelv2.QueryOrder_Sort
+}
+
+type result struct {
+	key   []byte
+	items posting.List
+}
+
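+// RunDuration exercises FieldIterable against the "duration" field populated by
+// SetUpDuration or SetUpPartialDuration.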
+func RunDuration(t *testing.T, data map[int]posting.List, store SimpleStore) {
+	tester := assert.New(t)
+	is := require.New(t)
+	tests := []struct {
+		name string
+		args args
+		want []int
+	}{
+		//{
+		//	name: "sort in asc order",
+		//	args: args{
+		//		fieldKey:  duration,
+		//		orderType: modelv2.QueryOrder_SORT_ASC,
+		//	},
+		//	want: []int{50, 200, 500, 1000, 2000},
+		//},
+		//{
+		//	name: "sort in desc order",
+		//	args: args{
+		//		fieldKey:  duration,
+		//		orderType: modelv2.QueryOrder_SORT_DESC,
+		//	},
+		//	want: []int{2000, 1000, 500, 200, 50},
+		//},
+		//{
+		//	name: "scan in (lower, upper) and sort in asc order",
+		//	args: args{
+		//		fieldKey:  duration,
+		//		orderType: modelv2.QueryOrder_SORT_ASC,
+		//		termRange: index.RangeOpts{
+		//			Lower: convert.Int64ToBytes(50),
+		//			Upper: convert.Int64ToBytes(2000),
+		//		},
+		//	},
+		//	want: []int{200, 500, 1000},
+		//},
+		//{
+		//	name: "scan in (lower, upper) and sort in desc order",
+		//	args: args{
+		//		fieldKey:  duration,
+		//		orderType: modelv2.QueryOrder_SORT_DESC,
+		//		termRange: index.RangeOpts{
+		//			Lower: convert.Int64ToBytes(50),
+		//			Upper: convert.Int64ToBytes(2000),
+		//		},
+		//	},
+		//	want: []int{1000, 500, 200},
+		//},
+		//{
+		//	name: "scan in [lower, upper] and sort in asc order",
+		//	args: args{
+		//		fieldKey:  duration,
+		//		orderType: modelv2.QueryOrder_SORT_ASC,
+		//		termRange: index.RangeOpts{
+		//			Lower:         convert.Int64ToBytes(200),
+		//			IncludesLower: true,
+		//			Upper:         convert.Int64ToBytes(1000),
+		//			IncludesUpper: true,
+		//		},
+		//	},
+		//	want: []int{200, 500, 1000},
+		//},
+		//{
+		//	name: "scan in [lower, upper] and sort in desc order",
+		//	args: args{
+		//		fieldKey:  duration,
+		//		orderType: modelv2.QueryOrder_SORT_DESC,
+		//		termRange: index.RangeOpts{
+		//			Lower:         convert.Int64ToBytes(200),
+		//			IncludesLower: true,
+		//			Upper:         convert.Int64ToBytes(1000),
+		//			IncludesUpper: true,
+		//		},
+		//	},
+		//	want: []int{1000, 500, 200},
+		//},
+		{
+			name: "scan in [lower, undefined)  and sort in asc order",
+			args: args{
+				fieldKey:  duration,
+				orderType: modelv2.QueryOrder_SORT_ASC,
+				termRange: index.RangeOpts{
+					Lower:         convert.Int64ToBytes(200),
+					IncludesLower: true,
+				},
+			},
+			want: []int{200, 500, 1000, 2000},
+		},
+		//{
+		//	name: "scan in [lower, undefined) and sort in desc order",
+		//	args: args{
+		//		fieldKey:  duration,
+		//		orderType: modelv2.QueryOrder_SORT_DESC,
+		//		termRange: index.RangeOpts{
+		//			Lower:         convert.Int64ToBytes(200),
+		//			IncludesLower: true,
+		//		},
+		//	},
+		//	want: []int{2000, 1000, 500, 200},
+		//},
+		{
+			name: "scan in (undefined, upper] and sort in asc order",
+			args: args{
+				fieldKey:  duration,
+				orderType: modelv2.QueryOrder_SORT_ASC,
+				termRange: index.RangeOpts{
+					Upper:         convert.Int64ToBytes(1000),
+					IncludesUpper: true,
+				},
+			},
+			want: []int{50, 200, 500, 1000},
+		},
+		//{
+		//	name: "scan in (undefined, upper] and sort in desc order",
+		//	args: args{
+		//		fieldKey:  duration,
+		//		orderType: modelv2.QueryOrder_SORT_DESC,
+		//		termRange: index.RangeOpts{
+		//			Upper:         convert.Int64ToBytes(1000),
+		//			IncludesUpper: true,
+		//		},
+		//	},
+		//	want: []int{1000, 500, 200, 50},
+		//},
+		{
+			name: "scan splice in (lower, upper) and sort in asc order",
+			args: args{
+				fieldKey:  duration,
+				orderType: modelv2.QueryOrder_SORT_ASC,
+				termRange: index.RangeOpts{
+					Lower: convert.Int64ToBytes(50 + 100),
+					Upper: convert.Int64ToBytes(2000 - 100),
+				},
+			},
+			want: []int{200, 500, 1000},
+		},
+		{
+			name: "scan splice in (lower, upper) and sort in desc order",
+			args: args{
+				fieldKey:  duration,
+				orderType: modelv2.QueryOrder_SORT_DESC,
+				termRange: index.RangeOpts{
+					Lower: convert.Int64ToBytes(50 + 100),
+					Upper: convert.Int64ToBytes(2000 - 100),
+				},
+			},
+			want: []int{1000, 500, 200},
+		},
+		{
+			name: "scan splice in [lower, upper] and sort in asc order",
+			args: args{
+				fieldKey:  duration,
+				orderType: modelv2.QueryOrder_SORT_ASC,
+				termRange: index.RangeOpts{
+					Lower:         convert.Int64ToBytes(50 + 100),
+					IncludesLower: true,
+					Upper:         convert.Int64ToBytes(2000 - 100),
+					IncludesUpper: true,
+				},
+			},
+			want: []int{200, 500, 1000},
+		},
+		{
+			name: "scan splice in [lower, upper] and sort in desc order",
+			args: args{
+				fieldKey:  duration,
+				orderType: modelv2.QueryOrder_SORT_DESC,
+				termRange: index.RangeOpts{
+					Lower:         convert.Int64ToBytes(50 + 100),
+					IncludesLower: true,
+					Upper:         convert.Int64ToBytes(2000 - 100),
+					IncludesUpper: true,
+				},
+			},
+			want: []int{1000, 500, 200},
+		},
+		{
+			name: "no field key",
+			args: args{},
+		},
+		{
+			name: "unknown field key",
+			args: args{
+				fieldKey: index.FieldKey{
+					IndexRule: "unknown",
+				},
+			},
+		},
+		{
+			name: "default order",
+			args: args{
+				fieldKey: duration,
+			},
+			want: []int{50, 200, 500, 1000, 2000},
+		},
+		{
+			name: "invalid range",
+			args: args{
+				fieldKey: duration,
+				termRange: index.RangeOpts{
+					Lower: convert.Int64ToBytes(100),
+					Upper: convert.Int64ToBytes(50),
+				},
+			},
+		},
+	}
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			iter, found := store.Iterator(tt.args.fieldKey, tt.args.termRange, tt.args.orderType)
+			if !found {
+				tester.Empty(tt.want)
+				return
+			}
+			defer func() {
+				tester.NoError(iter.Close())
+				for i := 0; i < 10; i++ {
+					is.False(iter.Next())
+				}
+			}()
+			is.NotNil(iter)
+			var got []result
+			for iter.Next() {
+				got = append(got, result{
+					key:   iter.Val().Term,
+					items: iter.Val().Value,
+				})
+			}
+			for i := 0; i < 10; i++ {
+				is.False(iter.Next())
+			}
+			is.Equal(len(tt.want), len(got))
+			for i, w := range tt.want {
+				g := got[i]
+				tester.Equal(int64(w), convert.BytesToInt64(g.key))
+				tester.True(data[w].Equal(g.items))
+			}
+		})
+	}
+}
+
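+// SetUpDuration writes item IDs 100-199 spread over five duration terms and
+// returns the expected posting list per term.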
+func SetUpDuration(t *assert.Assertions, store index.Writer) map[int]posting.List {
+	r := map[int]posting.List{
+		50:   roaring.NewPostingList(),
+		200:  roaring.NewPostingList(),
+		500:  roaring.NewPostingList(),
+		1000: roaring.NewPostingList(),
+		2000: roaring.NewPostingList(),
+	}
+	return SetUpPartialDuration(t, store, r)
+}
+
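+// SetUpPartialDuration writes item IDs 100-199 to the non-nil terms in r and
+// records each written ID in the matching posting list.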
+func SetUpPartialDuration(t *assert.Assertions, store index.Writer, r map[int]posting.List) map[int]posting.List {
+	idx := make([]int, 0, len(r))
+	for key := range r {
+		idx = append(idx, key)
+	}
+	sort.Ints(idx)
+	for i := 100; i < 200; i++ {
+		id := common.ItemID(i)
+		for i2, term := range idx {
+			if i%len(idx) != i2 || r[term] == nil {
+				continue
+			}
+			t.NoError(store.Write(index.Field{
+				Key:  duration.Marshal(),
+				Term: convert.Int64ToBytes(int64(term)),
+			}, id))
+			r[term].Insert(id)
+		}
+	}
+	return r
+}
diff --git a/pkg/index/test_cases/service_name.go b/pkg/index/test_cases/service_name.go
new file mode 100644
index 0000000..40a0714
--- /dev/null
+++ b/pkg/index/test_cases/service_name.go
@@ -0,0 +1,100 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package test_cases
+
+import (
+	"testing"
+
+	"github.com/stretchr/testify/assert"
+
+	"github.com/apache/skywalking-banyandb/api/common"
+	"github.com/apache/skywalking-banyandb/pkg/index"
+	"github.com/apache/skywalking-banyandb/pkg/index/posting"
+	"github.com/apache/skywalking-banyandb/pkg/index/posting/roaring"
+)
+
+var serviceName = index.FieldKey{
+	IndexRule: "service_name",
+}.Marshal()
+
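+// RunServiceName verifies MatchTerms against the "service_name" field populated by SetUp.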
+func RunServiceName(t *testing.T, store SimpleStore) {
+	tester := assert.New(t)
+	tests := []struct {
+		name    string
+		arg     index.Field
+		want    posting.List
+		wantErr bool
+	}{
+		{
+			name: "match gateway",
+			arg: index.Field{
+				Key:  serviceName,
+				Term: []byte("gateway"),
+			},
+			want: roaring.NewRange(0, 50),
+		},
+		{
+			name: "match webpage",
+			arg: index.Field{
+				Key:  serviceName,
+				Term: []byte("webpage"),
+			},
+			want: roaring.NewRange(50, 100),
+		},
+		{
+			name: "unknown field",
+			want: roaring.EmptyPostingList,
+		},
+		{
+			name: "unknown term",
+			arg: index.Field{
+				Key:  serviceName,
+				Term: []byte("unknown"),
+			},
+			want: roaring.EmptyPostingList,
+		},
+	}
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			list, err := store.MatchTerms(tt.arg)
+			if tt.wantErr {
+				tester.Error(err)
+				return
+			}
+			tester.NoError(err)
+			tester.NotNil(list)
+			tester.True(tt.want.Equal(list))
+		})
+	}
+}
+
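+// SetUp writes item IDs 0-49 under the term "gateway" and 50-99 under "webpage"
+// for the service_name field.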
+func SetUp(t *assert.Assertions, store SimpleStore) {
+	for i := 0; i < 100; i++ {
+		if i < 100/2 {
+			t.NoError(store.Write(index.Field{
+				Key:  serviceName,
+				Term: []byte("gateway"),
+			}, common.ItemID(i)))
+		} else {
+			t.NoError(store.Write(index.Field{
+				Key:  serviceName,
+				Term: []byte("webpage"),
+			}, common.ItemID(i)))
+		}
+	}
+}
diff --git a/pkg/index/search.go b/pkg/index/tree.go
similarity index 87%
rename from pkg/index/search.go
rename to pkg/index/tree.go
index 3f304d5..1f5bda1 100644
--- a/pkg/index/search.go
+++ b/pkg/index/tree.go
@@ -25,7 +25,6 @@ import (
 
 	"github.com/pkg/errors"
 
-	"github.com/apache/skywalking-banyandb/api/common"
 	modelv2 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v2"
 	"github.com/apache/skywalking-banyandb/pkg/index/posting"
 )
@@ -38,18 +37,7 @@ type Executor interface {
 
 type Tree interface {
 	Executor
-}
-
-type FieldKey struct {
-	SeriesID  common.SeriesID
-	IndexRule string
-}
-
-func (t *FieldKey) Marshal() []byte {
-	return bytes.Join([][]byte{
-		t.SeriesID.Marshal(),
-		[]byte(t.IndexRule),
-	}, []byte(":"))
+	TrimRangeLeaf(key FieldKey) (rangeOpts RangeOpts, found bool)
 }
 
 type Condition map[FieldKey][]ConditionValue
@@ -66,8 +54,7 @@ func BuildTree(searcher Searcher, condMap Condition) (Tree, error) {
 			searcher: searcher,
 		},
 	}
-	for term, conds := range condMap {
-		key := term.Marshal()
+	for key, conds := range condMap {
 		var rangeLeaf *rangeOp
 		for _, cond := range conds {
 			if rangeLeaf != nil && !rangeOP(cond.Op) {
@@ -136,7 +123,7 @@ type node struct {
 	SubNodes []Executor `json:"sub_nodes,omitempty"`
 }
 
-func (n *node) newEq(key []byte, values [][]byte) *eq {
+func (n *node) newEq(key FieldKey, values [][]byte) *eq {
 	return &eq{
 		leaf: &leaf{
 			Key:      key,
@@ -146,11 +133,11 @@ func (n *node) newEq(key []byte, values [][]byte) *eq {
 	}
 }
 
-func (n *node) addEq(key []byte, values [][]byte) {
+func (n *node) addEq(key FieldKey, values [][]byte) {
 	n.SubNodes = append(n.SubNodes, n.newEq(key, values))
 }
 
-func (n *node) addNot(key []byte, inner Executor) {
+func (n *node) addNot(key FieldKey, inner Executor) {
 	n.SubNodes = append(n.SubNodes, &not{
 		Key:      key,
 		searcher: n.searcher,
@@ -158,13 +145,13 @@ func (n *node) addNot(key []byte, inner Executor) {
 	})
 }
 
-func (n *node) addRangeLeaf(key []byte) *rangeOp {
+func (n *node) addRangeLeaf(key FieldKey) *rangeOp {
 	r := &rangeOp{
 		leaf: &leaf{
 			Key:      key,
 			searcher: n.searcher,
 		},
-		Opts: &RangeOpts{},
+		Opts: RangeOpts{},
 	}
 	n.SubNodes = append(n.SubNodes, r)
 	return r
@@ -221,6 +208,23 @@ type andNode struct {
 	*node
 }
 
+func (an *andNode) TrimRangeLeaf(key FieldKey) (RangeOpts, bool) {
+	removeLeaf := func(s []Executor, index int) []Executor {
+		return append(s[:index], s[index+1:]...)
+	}
+	for i, subNode := range an.SubNodes {
+		leafRange, ok := subNode.(*rangeOp)
+		if !ok {
+			continue
+		}
+		if key.Equal(leafRange.Key) {
+			an.SubNodes = removeLeaf(an.SubNodes, i)
+			return leafRange.Opts, true
+		}
+	}
+	return RangeOpts{}, false
+}
+
 func (an *andNode) merge(list posting.List) error {
 	return an.value.Intersect(list)
 }
@@ -255,20 +259,23 @@ func (on *orNode) MarshalJSON() ([]byte, error) {
 
 type leaf struct {
 	Executor
-	Key      []byte
+	Key      FieldKey
 	Values   [][]byte
 	searcher Searcher
 }
 
 type not struct {
 	Executor
-	Key      []byte
+	Key      FieldKey
 	searcher Searcher
 	Inner    Executor
 }
 
 func (n *not) Execute() (posting.List, error) {
-	all := n.searcher.MatchField(n.Key)
+	all, err := n.searcher.MatchField(n.Key)
+	if err != nil {
+		return nil, err
+	}
 	list, err := n.Inner.Execute()
 	if err != nil {
 		return nil, err
@@ -289,9 +296,9 @@ type eq struct {
 
 func (eq *eq) Execute() (posting.List, error) {
 	return eq.searcher.MatchTerms(Field{
-		Key:  eq.Key,
+		Key:  eq.Key.Marshal(),
 		Term: bytes.Join(eq.Values, nil),
-	}), nil
+	})
 }
 
 func (eq *eq) MarshalJSON() ([]byte, error) {
@@ -302,11 +309,11 @@ func (eq *eq) MarshalJSON() ([]byte, error) {
 
 type rangeOp struct {
 	*leaf
-	Opts *RangeOpts
+	Opts RangeOpts
 }
 
 func (r *rangeOp) Execute() (posting.List, error) {
-	return r.searcher.Range(r.Key, *r.Opts), nil
+	return r.searcher.Range(r.Key, r.Opts)
 }
 
 func (r *rangeOp) MarshalJSON() ([]byte, error) {