You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by ha...@apache.org on 2021/09/12 13:59:08 UTC
[skywalking-banyandb] 02/03: Implement two indices
This is an automated email from the ASF dual-hosted git repository.
hanahmily pushed a commit to branch time-series
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
commit 5f5c2fa58b98168906b1bc7fd703289d5f485762
Author: Gao Hongtao <ha...@gmail.com>
AuthorDate: Sun Sep 12 20:43:11 2021 +0800
Implement two indices
Signed-off-by: Gao Hongtao <ha...@gmail.com>
---
banyand/index/index_test.go | 2 +-
banyand/kv/badger.go | 114 +--------
banyand/kv/kv.go | 19 +-
banyand/storage/database_test.go | 259 ++++++++++---------
banyand/stream/stream_query_test.go | 54 ++--
banyand/stream/stream_write.go | 1 +
banyand/stream/stream_write_test.go | 2 +-
banyand/tsdb/block.go | 40 +--
banyand/tsdb/series_seek.go | 15 +-
banyand/tsdb/series_seek_filter.go | 7 +
banyand/tsdb/series_seek_sort.go | 42 +++-
banyand/tsdb/series_write.go | 4 +-
pkg/index/index.go | 86 ++++++-
pkg/index/inverted/field_map.go | 26 +-
pkg/index/inverted/inverted.go | 199 ++++++++++++++-
pkg/index/inverted/inverted_test.go | 142 +++++++++++
pkg/index/inverted/mem.go | 173 +++++++++----
pkg/index/inverted/mem_test.go | 237 +-----------------
pkg/index/inverted/term_map.go | 101 +-------
pkg/index/iterator.go | 209 ++++++++++++++++
pkg/index/{inverted/inverted.go => lsm/lsm.go} | 38 +--
pkg/index/lsm/lsm_test.go | 69 ++++++
pkg/index/lsm/search.go | 84 +++++++
pkg/index/posting/roaring/roaring.go | 8 +
pkg/index/test_cases/duration.go | 329 +++++++++++++++++++++++++
pkg/index/test_cases/service_name.go | 100 ++++++++
pkg/index/{search.go => tree.go} | 61 +++--
27 files changed, 1677 insertions(+), 744 deletions(-)
diff --git a/banyand/index/index_test.go b/banyand/index/index_test.go
index fde4e9f..7730f91 100644
--- a/banyand/index/index_test.go
+++ b/banyand/index/index_test.go
@@ -116,7 +116,7 @@ func Test_service_Insert(t *testing.T) {
t.Run(tt.name, func(t *testing.T) {
s := setUpModules(tester)
if err := s.Insert(tt.args.series, tt.args.shardID, tt.args.field); (err != nil) != tt.wantErr {
- t.Errorf("Insert() error = %v, wantErr %v", err, tt.wantErr)
+ t.Errorf("Write() error = %v, wantErr %v", err, tt.wantErr)
}
})
}
diff --git a/banyand/kv/badger.go b/banyand/kv/badger.go
index a3767d5..92a8529 100644
--- a/banyand/kv/badger.go
+++ b/banyand/kv/badger.go
@@ -25,17 +25,8 @@ import (
"github.com/dgraph-io/badger/v3"
"github.com/dgraph-io/badger/v3/y"
- "go.uber.org/multierr"
-
- "github.com/apache/skywalking-banyandb/api/common"
- modelv2 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v2"
- "github.com/apache/skywalking-banyandb/pkg/convert"
- "github.com/apache/skywalking-banyandb/pkg/index"
- "github.com/apache/skywalking-banyandb/pkg/index/posting"
- "github.com/apache/skywalking-banyandb/pkg/index/posting/roaring"
+
"github.com/apache/skywalking-banyandb/pkg/logger"
- posting2 "github.com/apache/skywalking-banyandb/pkg/posting"
- roaring2 "github.com/apache/skywalking-banyandb/pkg/posting/roaring"
)
var (
@@ -62,7 +53,7 @@ func (b *badgerTSS) Close() error {
}
type mergedIter struct {
- delegated Iterator2
+ delegated Iterator
valid bool
data []byte
}
@@ -95,12 +86,7 @@ func (i *mergedIter) parseData() {
if !i.valid {
return
}
- data, err := i.delegated.Val().Marshall()
- if err != nil {
- i.valid = false
- return
- }
- i.data = data
+ i.data = i.delegated.Val()
}
func (i *mergedIter) Close() error {
@@ -122,39 +108,12 @@ type badgerDB struct {
db *badger.DB
}
-func (b *badgerDB) Handover(iterator Iterator2) error {
+func (b *badgerDB) Handover(iterator Iterator) error {
return b.db.HandoverIterator(&mergedIter{
delegated: iterator,
})
}
-func (b *badgerDB) Seek(key []byte, limit int) (posting2.List, error) {
- opts := badger.DefaultIteratorOptions
- it := b.db.NewIterator(opts)
- defer func() {
- _ = it.Close()
- }()
- result := roaring2.NewPostingList()
- var errMerged error
- for it.Seek(y.KeyWithTs(key, math.MaxInt64)); it.Valid(); it.Next() {
- k := y.ParseKey(it.Key())
- if !bytes.Equal(key, k) {
- break
- }
- list := roaring2.NewPostingList()
- err := list.Unmarshall(it.Value().Value)
- if err != nil {
- errMerged = multierr.Append(errMerged, err)
- continue
- }
- _ = result.Union(list)
- if result.Len() > limit {
- break
- }
- }
- return result, errMerged
-}
-
func (b *badgerDB) Scan(key []byte, opt ScanOpts, f ScanFunc) error {
opts := badger.DefaultIteratorOptions
opts.PrefetchSize = opt.PrefetchSize
@@ -194,7 +153,7 @@ func (i *iterator) Rewind() {
}
func (i *iterator) Seek(key []byte) {
- i.delegated.Seek(key)
+ i.delegated.Seek(y.KeyWithTs(key, math.MaxInt64))
}
func (i *iterator) Key() []byte {
@@ -254,7 +213,7 @@ func (b *badgerDB) Get(key []byte) ([]byte, error) {
func (b *badgerDB) GetAll(key []byte, applyFn func([]byte) error) error {
iter := b.db.NewIterator(badger.DefaultIteratorOptions)
var count int
- for iter.Seek(key); iter.Valid(); iter.Next() {
+ for iter.Seek(y.KeyWithTs(key, math.MaxInt64)); iter.Valid(); iter.Next() {
if !bytes.Equal(y.ParseKey(iter.Key()), key) {
break
}
@@ -270,67 +229,6 @@ func (b *badgerDB) GetAll(key []byte, applyFn func([]byte) error) error {
return ErrKeyNotFound
}
-func (b *badgerDB) MatchField(fieldName []byte) (list posting.List) {
- panic("implement me")
-}
-
-func (b *badgerDB) MatchTerms(field index.Field) (list posting.List) {
- panic("implement me")
-}
-
-func (b *badgerDB) Range(fieldName []byte, opts index.RangeOpts) (list posting.List) {
- panic("implement me")
-}
-
-var _ index.FieldIterator = (*fIterator)(nil)
-
-type fIterator struct {
- init bool
- delegate Iterator
- curr *index.PostingValue
-}
-
-func (f *fIterator) Next() bool {
- if !f.init {
- f.init = true
- f.delegate.Rewind()
- }
- if !f.delegate.Valid() {
- return false
- }
- pv := &index.PostingValue{
- Key: f.delegate.Key(),
- Value: roaring.NewPostingListWithInitialData(convert.BytesToUint64(f.delegate.Val())),
- }
- for ; f.delegate.Valid() && bytes.Equal(pv.Key, f.delegate.Key()); f.delegate.Next() {
- pv.Value.Insert(common.ItemID(convert.BytesToUint64(f.delegate.Val())))
- }
- f.curr = pv
- return true
-}
-
-func (f *fIterator) Val() *index.PostingValue {
- return f.curr
-}
-
-func (f *fIterator) Close() error {
- return f.delegate.Close()
-}
-
-func (b *badgerDB) FieldIterator(fieldName []byte, order modelv2.QueryOrder_Sort) index.FieldIterator {
- var reverse bool
- if order == modelv2.QueryOrder_SORT_DESC {
- reverse = true
- }
- iter := b.NewIterator(ScanOpts{
- Prefix: fieldName,
- Reverse: reverse,
- })
- return &fIterator{
- delegate: iter,
- }
-}
-
// badgerLog delegates the zap log to the badger logger
type badgerLog struct {
*log.Logger
diff --git a/banyand/kv/kv.go b/banyand/kv/kv.go
index 0e49715..fc829d3 100644
--- a/banyand/kv/kv.go
+++ b/banyand/kv/kv.go
@@ -25,9 +25,7 @@ import (
"github.com/dgraph-io/badger/v3"
"github.com/pkg/errors"
- "github.com/apache/skywalking-banyandb/pkg/index"
"github.com/apache/skywalking-banyandb/pkg/logger"
- posting2 "github.com/apache/skywalking-banyandb/pkg/posting"
)
var (
@@ -54,6 +52,7 @@ type ScanOpts struct {
}
type Reader interface {
+ Iterable
// Get a value by its key
Get(key []byte) ([]byte, error)
GetAll(key []byte, applyFn func([]byte) error) error
@@ -65,7 +64,6 @@ type Store interface {
io.Closer
Writer
Reader
- index.Searcher
}
type TimeSeriesWriter interface {
@@ -113,21 +111,16 @@ type Iterator interface {
Close() error
}
-type Iterator2 interface {
- Next()
- Rewind()
- Seek(key []byte)
- Key() []byte
- Val() posting2.List
- Valid() bool
- Close() error
+type Iterable interface {
+ NewIterator(opt ScanOpts) Iterator
}
type HandoverCallback func()
type IndexStore interface {
- Handover(iterator Iterator2) error
- Seek(key []byte, limit int) (posting2.List, error)
+ Iterable
+ Reader
+ Handover(iterator Iterator) error
Close() error
}
diff --git a/banyand/storage/database_test.go b/banyand/storage/database_test.go
index 971cdf7..42893e0 100644
--- a/banyand/storage/database_test.go
+++ b/banyand/storage/database_test.go
@@ -29,12 +29,7 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
- "github.com/apache/skywalking-banyandb/api/common"
- "github.com/apache/skywalking-banyandb/banyand/kv"
- "github.com/apache/skywalking-banyandb/pkg/convert"
"github.com/apache/skywalking-banyandb/pkg/logger"
- "github.com/apache/skywalking-banyandb/pkg/posting"
- "github.com/apache/skywalking-banyandb/pkg/posting/roaring"
)
func TestDB_Create_Directory(t *testing.T) {
@@ -63,59 +58,60 @@ func TestDB_Create_Directory(t *testing.T) {
validateDirectory(t, fmt.Sprintf(blockTemplate, segPath, now.Format(blockFormat)))
}
-func TestDB_Store(t *testing.T) {
- is := require.New(t)
- ctrl := gomock.NewController(t)
- defer ctrl.Finish()
- now := uint64(time.Now().UnixNano())
- var ap WritePoint
- var repo StoreRepo
- p := mockPlugin(ctrl, func(r StoreRepo, get GetWritePoint) {
- ap = get(now)
- repo = r
- })
-
- tempDir, db := setUp(t, p)
- defer func() {
- db.GracefulStop()
- removeDir(tempDir)
- }()
-
- is.NoError(ap.Writer(0, "normal").Put([]byte("key1"), []byte{12}))
- val, err := repo.Reader(0, "normal", now, now).Get([]byte("key1"))
- is.NoError(err)
- is.Equal([]byte{12}, val)
-
- is.NoError(ap.TimeSeriesWriter(1, "time-series").Put([]byte("key11"), []byte{33}, 1))
- val, err = repo.TimeSeriesReader(1, "time-series", now, now).Get([]byte("key11"), 1)
- is.NoError(err)
- is.Equal([]byte{33}, val)
- vals, allErr := repo.TimeSeriesReader(1, "time-series", now, now).GetAll([]byte("key11"))
- is.NoError(allErr)
- is.Equal([][]byte{{33}}, vals)
-
- index := repo.Index(1, "index")
- is.NoError(index.Handover(mockMemtable([]uint64{1, 2}, []uint64{3, 6})))
- list, err := index.Seek(convert.Int64ToBytes(0), 2)
- is.NoError(err)
- is.Equal(2, list.Len())
- is.True(list.Contains(common.ChunkID(1)))
- is.True(list.Contains(common.ChunkID(2)))
- list, err = index.Seek(convert.Int64ToBytes(1), 2)
- is.NoError(err)
- is.Equal(2, list.Len())
- is.True(list.Contains(common.ChunkID(3)))
- is.True(list.Contains(common.ChunkID(6)))
-
- is.NoError(index.Handover(mockMemtable([]uint64{11, 14})))
- list, err = index.Seek(convert.Int64ToBytes(0), 2)
- is.NoError(err)
- is.Equal(4, list.Len())
- is.True(list.Contains(common.ChunkID(1)))
- is.True(list.Contains(common.ChunkID(2)))
- is.True(list.Contains(common.ChunkID(11)))
- is.True(list.Contains(common.ChunkID(14)))
-}
+//
+//func TestDB_Store(t *testing.T) {
+// is := require.New(t)
+// ctrl := gomock.NewController(t)
+// defer ctrl.Finish()
+// now := uint64(time.Now().UnixNano())
+// var ap WritePoint
+// var repo StoreRepo
+// p := mockPlugin(ctrl, func(r StoreRepo, get GetWritePoint) {
+// ap = get(now)
+// repo = r
+// })
+//
+// tempDir, db := setUp(t, p)
+// defer func() {
+// db.GracefulStop()
+// removeDir(tempDir)
+// }()
+//
+// is.NoError(ap.Writer(0, "normal").Put([]byte("key1"), []byte{12}))
+// val, err := repo.Reader(0, "normal", now, now).Get([]byte("key1"))
+// is.NoError(err)
+// is.Equal([]byte{12}, val)
+//
+// is.NoError(ap.TimeSeriesWriter(1, "time-series").Put([]byte("key11"), []byte{33}, 1))
+// val, err = repo.TimeSeriesReader(1, "time-series", now, now).Get([]byte("key11"), 1)
+// is.NoError(err)
+// is.Equal([]byte{33}, val)
+// vals, allErr := repo.TimeSeriesReader(1, "time-series", now, now).GetAll([]byte("key11"))
+// is.NoError(allErr)
+// is.Equal([][]byte{{33}}, vals)
+//
+// index := repo.Index(1, "index")
+// is.NoError(index.Handover(mockMemtable([]uint64{1, 2}, []uint64{3, 6})))
+// list, err := index.Seek(convert.Int64ToBytes(0), 2)
+// is.NoError(err)
+// is.Equal(2, list.Len())
+// is.True(list.Contains(common.ChunkID(1)))
+// is.True(list.Contains(common.ChunkID(2)))
+// list, err = index.Seek(convert.Int64ToBytes(1), 2)
+// is.NoError(err)
+// is.Equal(2, list.Len())
+// is.True(list.Contains(common.ChunkID(3)))
+// is.True(list.Contains(common.ChunkID(6)))
+//
+// is.NoError(index.Handover(mockMemtable([]uint64{11, 14})))
+// list, err = index.Seek(convert.Int64ToBytes(0), 2)
+// is.NoError(err)
+// is.Equal(4, list.Len())
+// is.True(list.Contains(common.ChunkID(1)))
+// is.True(list.Contains(common.ChunkID(2)))
+// is.True(list.Contains(common.ChunkID(11)))
+// is.True(list.Contains(common.ChunkID(14)))
+//}
func TestDB_FlushCallback(t *testing.T) {
is := require.New(t)
@@ -168,79 +164,80 @@ func TestDB_FlushCallback(t *testing.T) {
}
}
-var _ kv.Iterator2 = (*iter)(nil)
-
-type iter struct {
- data map[int]posting.List
- p int
-}
-
-func (i *iter) Next() {
- i.p++
-}
-
-func (i *iter) Rewind() {
- i.p = 0
-}
-
-func (i *iter) Seek(key []byte) {
- panic("implement me")
-}
-
-func (i *iter) Key() []byte {
- return convert.Int64ToBytes(int64(i.p))
-}
-
-func (i *iter) Val() posting.List {
- return i.data[i.p]
-}
-
-func (i *iter) Valid() bool {
- _, ok := i.data[i.p]
- return ok
-}
-
-func (i *iter) Close() error {
- return nil
-}
-
-func mockMemtable(data ...[]uint64) kv.Iterator2 {
- it := &iter{
- data: make(map[int]posting.List),
- }
- for i, d := range data {
- it.data[i] = roaring.NewPostingListWithInitialData(d...)
- }
- return it
-}
-
-func mockPlugin(ctrl *gomock.Controller, f func(repo StoreRepo, get GetWritePoint)) Plugin {
- p := NewMockPlugin(ctrl)
- p.EXPECT().Meta().Return(PluginMeta{
- ID: "sw",
- Group: "default",
- ShardNumber: 2,
- KVSpecs: []KVSpec{
- {
- Name: "normal",
- Type: KVTypeNormal,
- },
- {
- Name: "time-series",
- Type: KVTypeTimeSeries,
- CompressLevel: 3,
- },
- {
- Name: "index",
- Type: KVTypeIndex,
- },
- },
- }).AnyTimes()
- p.EXPECT().Init(gomock.Any(), gomock.Any()).Do(func(r StoreRepo, wp GetWritePoint) {
- f(r, wp)
- }).AnyTimes()
- return p
-}
+//
+//var _ kv.Iterator2 = (*iter)(nil)
+//
+//type iter struct {
+// data map[int]posting.List
+// p int
+//}
+//
+//func (i *iter) Next() {
+// i.p++
+//}
+//
+//func (i *iter) Rewind() {
+// i.p = 0
+//}
+//
+//func (i *iter) Seek(key []byte) {
+// panic("implement me")
+//}
+//
+//func (i *iter) Key() []byte {
+// return convert.Int64ToBytes(int64(i.p))
+//}
+//
+//func (i *iter) Val() posting.List {
+// return i.data[i.p]
+//}
+//
+//func (i *iter) Valid() bool {
+// _, ok := i.data[i.p]
+// return ok
+//}
+//
+//func (i *iter) Close() error {
+// return nil
+//}
+//
+//func mockMemtable(data ...[]uint64) kv.Iterator2 {
+// it := &iter{
+// data: make(map[int]posting.List),
+// }
+// for i, d := range data {
+// it.data[i] = roaring.NewPostingListWithInitialData(d...)
+// }
+// return it
+//}
+//
+//func mockPlugin(ctrl *gomock.Controller, f func(repo StoreRepo, get GetWritePoint)) Plugin {
+// p := NewMockPlugin(ctrl)
+// p.EXPECT().Meta().Return(PluginMeta{
+// ID: "sw",
+// Group: "default",
+// ShardNumber: 2,
+// KVSpecs: []KVSpec{
+// {
+// Name: "normal",
+// Type: KVTypeNormal,
+// },
+// {
+// Name: "time-series",
+// Type: KVTypeTimeSeries,
+// CompressLevel: 3,
+// },
+// {
+// Name: "index",
+// Type: KVTypeIndex,
+// },
+// },
+// }).AnyTimes()
+// p.EXPECT().Init(gomock.Any(), gomock.Any()).Do(func(r StoreRepo, wp GetWritePoint) {
+// f(r, wp)
+// }).AnyTimes()
+// return p
+//}
func setUp(t *testing.T, p Plugin) (tempDir string, db Database) {
require.NoError(t, logger.Init(logger.Logging{
diff --git a/banyand/stream/stream_query_test.go b/banyand/stream/stream_query_test.go
index a060ff3..f02921b 100644
--- a/banyand/stream/stream_query_test.go
+++ b/banyand/stream/stream_query_test.go
@@ -19,6 +19,7 @@ package stream
import (
"bytes"
+ "context"
"embed"
_ "embed"
"encoding/base64"
@@ -33,6 +34,7 @@ import (
"github.com/golang/protobuf/jsonpb"
"github.com/pkg/errors"
"github.com/stretchr/testify/assert"
+ "github.com/stretchr/testify/require"
"google.golang.org/protobuf/types/known/timestamppb"
"github.com/apache/skywalking-banyandb/api/common"
@@ -58,7 +60,7 @@ func Test_Stream_SelectShard(t *testing.T) {
tester := assert.New(t)
s, deferFunc := setup(tester)
defer deferFunc()
- _ = setupQueryData(tester, "multiple_shards.json", s)
+ _ = setupQueryData(t, "multiple_shards.json", s)
tests := []struct {
name string
entity tsdb.Entity
@@ -99,7 +101,7 @@ func Test_Stream_Series(t *testing.T) {
tester := assert.New(t)
s, deferFunc := setup(tester)
defer deferFunc()
- baseTime := setupQueryData(tester, "multiple_shards.json", s)
+ baseTime := setupQueryData(t, "multiple_shards.json", s)
tests := []struct {
name string
args queryOpts
@@ -315,7 +317,7 @@ func Test_Stream_Global_Index(t *testing.T) {
tester := assert.New(t)
s, deferFunc := setup(tester)
defer deferFunc()
- _ = setupQueryData(tester, "global_index.json", s)
+ _ = setupQueryData(t, "global_index.json", s)
tests := []struct {
name string
traceID string
@@ -477,7 +479,8 @@ func queryData(tester *assert.Assertions, s *stream, opts queryOpts) (shardsForT
//go:embed testdata/*.json
var dataFS embed.FS
-func setupQueryData(t *assert.Assertions, dataFile string, stream *stream) (baseTime time.Time) {
+func setupQueryData(testing *testing.T, dataFile string, stream *stream) (baseTime time.Time) {
+ t := assert.New(testing)
var templates []interface{}
baseTime = time.Now()
content, err := dataFS.ReadFile("testdata/" + dataFile)
@@ -509,24 +512,37 @@ func setupQueryData(t *assert.Assertions, dataFile string, stream *stream) (base
t.NoError(err)
shardID, err := partition.ShardID(entity.Marshal(), stream.schema.GetShardNum())
t.NoError(err)
- itemID, err := stream.write(common.ShardID(shardID), e)
+ _, err = stream.write(common.ShardID(shardID), e)
t.NoError(err)
- sa, err := stream.Shards(entity)
- t.NoError(err)
- for _, shard := range sa {
- se, err := shard.Series().Get(entity)
- t.NoError(err)
- for {
- item, closer, _ := se.Get(*itemID)
- rawTagFamily, _ := item.Val("searchable")
- if len(rawTagFamily) > 0 {
- _ = closer.Close()
- break
+ }
+ ctx, cancelFunc := context.WithTimeout(context.Background(), 10*time.Second)
+ defer cancelFunc()
+ err = ready(ctx, t, stream, queryOpts{
+ entity: tsdb.Entity{tsdb.AnyEntry, tsdb.AnyEntry, tsdb.AnyEntry},
+ timeRange: tsdb.NewTimeRangeDuration(baseTime, 1*time.Hour),
+ })
+ require.NoError(testing, err)
+ return baseTime
+}
+
+func ready(ctx context.Context, t *assert.Assertions, stream *stream, options queryOpts) error {
+ for {
+ loop:
+ select {
+ case <-ctx.Done():
+ return ctx.Err()
+ default:
+ data, err := queryData(t, stream, options)
+ if err != nil {
+ return err
+ }
+ for _, d := range data {
+ if len(d.elements) < 1 {
+ time.Sleep(300 * time.Millisecond)
+ break loop
}
- _ = closer.Close()
}
-
+ return nil
}
}
- return baseTime
}
diff --git a/banyand/stream/stream_write.go b/banyand/stream/stream_write.go
index fa80dc3..9ce2732 100644
--- a/banyand/stream/stream_write.go
+++ b/banyand/stream/stream_write.go
@@ -102,6 +102,7 @@ func (s *stream) write(shardID common.ShardID, value *streamv2.ElementValue) (*t
Int("ts_nano", t.Nanosecond()).
Interface("data", value).
Uint64("series_id", uint64(series.ID())).
+ Int("shard_id", int(shardID)).
Msg("write stream")
return writer, errWrite
}
diff --git a/banyand/stream/stream_write_test.go b/banyand/stream/stream_write_test.go
index 0ffe0d5..b0d8880 100644
--- a/banyand/stream/stream_write_test.go
+++ b/banyand/stream/stream_write_test.go
@@ -211,7 +211,7 @@ func Test_Stream_Write(t *testing.T) {
func setup(t *assert.Assertions) (*stream, func()) {
t.NoError(logger.Init(logger.Logging{
Env: "dev",
- Level: "info",
+ Level: "trace",
}))
tempDir, deferFunc := test.Space(t)
streamRepo, err := schema.NewStream()
diff --git a/banyand/tsdb/block.go b/banyand/tsdb/block.go
index 44f1922..824c861 100644
--- a/banyand/tsdb/block.go
+++ b/banyand/tsdb/block.go
@@ -27,9 +27,9 @@ import (
"github.com/apache/skywalking-banyandb/api/common"
databasev2 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v2"
"github.com/apache/skywalking-banyandb/banyand/kv"
- "github.com/apache/skywalking-banyandb/pkg/convert"
"github.com/apache/skywalking-banyandb/pkg/index"
"github.com/apache/skywalking-banyandb/pkg/index/inverted"
+ "github.com/apache/skywalking-banyandb/pkg/index/lsm"
"github.com/apache/skywalking-banyandb/pkg/logger"
)
@@ -39,15 +39,14 @@ type block struct {
ref *z.Closer
store kv.TimeSeriesStore
- primaryIndex kv.Store
- invertedIndex inverted.GlobalStore
+ primaryIndex index.Store
+ invertedIndex index.Store
+ lsmIndex index.Store
closableLst []io.Closer
endTime time.Time
startTime time.Time
segID uint16
blockID uint16
-
- //revertedIndex kv.Store
}
type blockOpts struct {
@@ -76,7 +75,10 @@ func newBlock(ctx context.Context, opts blockOpts) (b *block, err error) {
kv.TSSWithLogger(b.l)); err != nil {
return nil, err
}
- if b.primaryIndex, err = kv.OpenStore(0, b.path+"/p_index", kv.StoreWithLogger(b.l)); err != nil {
+ if b.primaryIndex, err = lsm.NewStore(lsm.StoreOpts{
+ Path: b.path + "/primary",
+ Logger: b.l,
+ }); err != nil {
return nil, err
}
b.closableLst = append(b.closableLst, b.store, b.primaryIndex)
@@ -84,8 +86,18 @@ func newBlock(ctx context.Context, opts blockOpts) (b *block, err error) {
if !ok || len(rules) == 0 {
return b, nil
}
- b.invertedIndex = inverted.NewStore("inverted")
- return b, nil
+ b.invertedIndex, err = inverted.NewStore(inverted.StoreOpts{
+ Path: b.path + "/inverted",
+ Logger: b.l,
+ })
+ if b.lsmIndex, err = lsm.NewStore(lsm.StoreOpts{
+ Path: b.path + "/lsm",
+ Logger: b.l,
+ }); err != nil {
+ return nil, err
+ }
+ b.closableLst = append(b.closableLst, b.invertedIndex, b.lsmIndex)
+ return b, err
}
func (b *block) delegate() blockDelegate {
@@ -137,11 +149,11 @@ func (d *bDelegate) dataReader() kv.TimeSeriesReader {
}
func (d *bDelegate) lsmIndexReader() index.Searcher {
- return d.delegate.invertedIndex.Searcher()
+ return d.delegate.lsmIndex
}
func (d *bDelegate) invertedIndexReader() index.Searcher {
- return d.delegate.invertedIndex.Searcher()
+ return d.delegate.invertedIndex
}
func (d *bDelegate) primaryIndexReader() index.Searcher {
@@ -161,21 +173,21 @@ func (d *bDelegate) write(key []byte, val []byte, ts time.Time) error {
}
func (d *bDelegate) writePrimaryIndex(field index.Field, id common.ItemID) error {
- return d.delegate.primaryIndex.Put(field.Marshal(), convert.Uint64ToBytes(uint64(id)))
+ return d.delegate.primaryIndex.Write(field, id)
}
func (d *bDelegate) writeLSMIndex(field index.Field, id common.ItemID) error {
- if d.delegate.invertedIndex == nil {
+ if d.delegate.lsmIndex == nil {
return nil
}
- return d.delegate.invertedIndex.Insert(field, id)
+ return d.delegate.lsmIndex.Write(field, id)
}
func (d *bDelegate) writeInvertedIndex(field index.Field, id common.ItemID) error {
if d.delegate.invertedIndex == nil {
return nil
}
- return d.delegate.invertedIndex.Insert(field, id)
+ return d.delegate.invertedIndex.Write(field, id)
}
func (d *bDelegate) contains(ts time.Time) bool {
diff --git a/banyand/tsdb/series_seek.go b/banyand/tsdb/series_seek.go
index a6cf6c0..4763ec6 100644
--- a/banyand/tsdb/series_seek.go
+++ b/banyand/tsdb/series_seek.go
@@ -18,12 +18,11 @@
package tsdb
import (
- "time"
-
"github.com/apache/skywalking-banyandb/api/common"
databasev2 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v2"
modelv2 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v2"
"github.com/apache/skywalking-banyandb/banyand/kv"
+ "github.com/apache/skywalking-banyandb/pkg/index"
)
type Iterator interface {
@@ -62,6 +61,7 @@ type seekerBuilder struct {
}
order modelv2.QueryOrder_Sort
indexRuleForSorting *databasev2.IndexRule
+ rangeOptsForSorting index.RangeOpts
}
func (s *seekerBuilder) Build() (Seeker, error) {
@@ -72,16 +72,7 @@ func (s *seekerBuilder) Build() (Seeker, error) {
if err != nil {
return nil, err
}
- filters := []filterFn{
- func(item Item) bool {
- valid := s.seriesSpan.timeRange.contains(item.Time())
- timeRange := s.seriesSpan.timeRange
- s.seriesSpan.l.Trace().
- Times("time_range", []time.Time{timeRange.Start, timeRange.End}).
- Bool("valid", valid).Msg("filter item by time range")
- return valid
- },
- }
+ filters := make([]filterFn, 0, 2)
if indexFilter != nil {
filters = append(filters, indexFilter)
}
diff --git a/banyand/tsdb/series_seek_filter.go b/banyand/tsdb/series_seek_filter.go
index bd55f1c..8e92715 100644
--- a/banyand/tsdb/series_seek_filter.go
+++ b/banyand/tsdb/series_seek_filter.go
@@ -75,6 +75,13 @@ func (s *seekerBuilder) buildIndexFilter() (filterFn, error) {
if err != nil {
return err
}
+ rangeOpts, found := tree.TrimRangeLeaf(index.FieldKey{
+ SeriesID: s.seriesSpan.seriesID,
+ IndexRule: s.indexRuleForSorting.GetMetadata().GetName(),
+ })
+ if found {
+ s.rangeOptsForSorting = rangeOpts
+ }
list, err := tree.Execute()
if err != nil {
return err
diff --git a/banyand/tsdb/series_seek_sort.go b/banyand/tsdb/series_seek_sort.go
index 7e9a308..dbe1947 100644
--- a/banyand/tsdb/series_seek_sort.go
+++ b/banyand/tsdb/series_seek_sort.go
@@ -27,6 +27,7 @@ import (
databasev2 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v2"
modelv2 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v2"
"github.com/apache/skywalking-banyandb/banyand/kv"
+ "github.com/apache/skywalking-banyandb/pkg/convert"
"github.com/apache/skywalking-banyandb/pkg/index"
"github.com/apache/skywalking-banyandb/pkg/index/posting"
"github.com/apache/skywalking-banyandb/pkg/logger"
@@ -48,23 +49,36 @@ func (s *seekerBuilder) buildSeries(filters []filterFn) []Iterator {
if s.indexRuleForSorting == nil {
return s.buildSeriesByTime(filters)
}
+ filters = append(filters, func(item Item) bool {
+ valid := s.seriesSpan.timeRange.contains(item.Time())
+ timeRange := s.seriesSpan.timeRange
+ s.seriesSpan.l.Trace().
+ Times("time_range", []time.Time{timeRange.Start, timeRange.End}).
+ Bool("valid", valid).Msg("filter item by time range")
+ return valid
+ })
return s.buildSeriesByIndex(filters)
}
func (s *seekerBuilder) buildSeriesByIndex(filters []filterFn) (series []Iterator) {
for _, b := range s.seriesSpan.blocks {
var inner index.FieldIterator
- term := index.FieldKey{
+ var found bool
+ fieldKey := index.FieldKey{
SeriesID: s.seriesSpan.seriesID,
IndexRule: s.indexRuleForSorting.GetMetadata().GetName(),
}
+
switch s.indexRuleForSorting.GetType() {
case databasev2.IndexRule_TYPE_TREE:
- inner = b.lsmIndexReader().FieldIterator(term.Marshal(), s.order)
+ inner, found = b.lsmIndexReader().Iterator(fieldKey, s.rangeOptsForSorting, s.order)
case databasev2.IndexRule_TYPE_INVERTED:
- inner = b.invertedIndexReader().FieldIterator(term.Marshal(), s.order)
+ inner, found = b.invertedIndexReader().Iterator(fieldKey, s.rangeOptsForSorting, s.order)
+ default:
+ // only tree index supports sorting
+ continue
}
- if inner != nil {
+ if found {
series = append(series, newSearcherIterator(s.seriesSpan.l, inner, b.dataReader(), s.seriesSpan.seriesID, filters))
}
}
@@ -86,14 +100,23 @@ func (s *seekerBuilder) buildSeriesByTime(filters []filterFn) []Iterator {
}
delegated := make([]Iterator, 0, len(bb))
bTimes := make([]time.Time, 0, len(bb))
+ timeRange := s.seriesSpan.timeRange
+ termRange := index.RangeOpts{
+ Lower: convert.Int64ToBytes(timeRange.Start.UnixNano()),
+ Upper: convert.Int64ToBytes(timeRange.End.UnixNano()),
+ IncludesLower: true,
+ }
for _, b := range bb {
bTimes = append(bTimes, b.startTime())
- inner := b.primaryIndexReader().
- FieldIterator(
- s.seriesSpan.seriesID.Marshal(),
+ inner, found := b.primaryIndexReader().
+ Iterator(
+ index.FieldKey{
+ SeriesID: s.seriesSpan.seriesID,
+ },
+ termRange,
s.order,
)
- if inner != nil {
+ if found {
delegated = append(delegated, newSearcherIterator(s.seriesSpan.l, inner, b.dataReader(), s.seriesSpan.seriesID, filters))
}
}
@@ -101,6 +124,7 @@ func (s *seekerBuilder) buildSeriesByTime(filters []filterFn) []Iterator {
Str("order", modelv2.QueryOrder_Sort_name[int32(s.order)]).
Times("blocks", bTimes).
Uint64("series_id", uint64(s.seriesSpan.seriesID)).
+ Int("shard_id", int(s.seriesSpan.shardID)).
Msg("seek series by time")
return []Iterator{newMergedIterator(delegated)}
}
@@ -122,7 +146,7 @@ func (s *searcherIterator) Next() bool {
if s.fieldIterator.Next() {
v := s.fieldIterator.Val()
s.cur = v.Value.Iterator()
- s.curKey = v.Key
+ s.curKey = v.Term
s.l.Trace().Uint64("series_id", uint64(s.seriesID)).Hex("term", s.curKey).Msg("got a new field")
} else {
return false
diff --git a/banyand/tsdb/series_write.go b/banyand/tsdb/series_write.go
index 60fba52..ea5404b 100644
--- a/banyand/tsdb/series_write.go
+++ b/banyand/tsdb/series_write.go
@@ -172,7 +172,9 @@ func (w *writer) Write() (GlobalItemID, error) {
}
}
return id, w.block.writePrimaryIndex(index.Field{
- Key: id.SeriesID.Marshal(),
+ Key: index.FieldKey{
+ SeriesID: id.SeriesID,
+ }.Marshal(),
Term: convert.Int64ToBytes(w.ts.UnixNano()),
}, id.ID)
}
diff --git a/pkg/index/index.go b/pkg/index/index.go
index bec268e..9be8831 100644
--- a/pkg/index/index.go
+++ b/pkg/index/index.go
@@ -19,18 +19,52 @@ package index
import (
"bytes"
+ "io"
+ "github.com/pkg/errors"
+
+ "github.com/apache/skywalking-banyandb/api/common"
modelv2 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v2"
"github.com/apache/skywalking-banyandb/pkg/index/posting"
)
+var ErrMalformed = errors.New("the data is malformed")
+
+type FieldKey struct {
+ SeriesID common.SeriesID
+ IndexRule string
+}
+
+func (f FieldKey) Marshal() []byte {
+ return bytes.Join([][]byte{
+ f.SeriesID.Marshal(),
+ []byte(f.IndexRule),
+ }, nil)
+}
+
+func (f FieldKey) Equal(other FieldKey) bool {
+ return f.SeriesID == other.SeriesID && f.IndexRule == other.IndexRule
+}
+
type Field struct {
Key []byte
Term []byte
}
func (f Field) Marshal() []byte {
- return bytes.Join([][]byte{f.Key, f.Term}, nil)
+ return bytes.Join([][]byte{f.Key, f.Term}, []byte(":"))
+}
+
+func (f *Field) Unmarshal(raw []byte) error {
+ bb := bytes.SplitN(raw, []byte(":"), 2)
+ if len(bb) < 2 {
+ return errors.Wrap(ErrMalformed, "unable to unmarshal the field")
+ }
+ f.Key = make([]byte, len(bb[0]))
+ copy(f.Key, bb[0])
+ f.Term = make([]byte, len(bb[1]))
+ copy(f.Term, bb[1])
+ return nil
}
type RangeOpts struct {
@@ -40,6 +74,32 @@ type RangeOpts struct {
IncludesLower bool
}
+func (r RangeOpts) Between(value []byte) int {
+ if r.Upper != nil {
+ var in bool
+ if r.IncludesUpper {
+ in = bytes.Compare(r.Upper, value) >= 0
+ } else {
+ in = bytes.Compare(r.Upper, value) > 0
+ }
+ if !in {
+ return 1
+ }
+ }
+ if r.Lower != nil {
+ var in bool
+ if r.IncludesLower {
+ in = bytes.Compare(r.Lower, value) <= 0
+ } else {
+ in = bytes.Compare(r.Lower, value) < 0
+ }
+ if !in {
+ return -1
+ }
+ }
+ return 0
+}
+
type FieldIterator interface {
Next() bool
Val() *PostingValue
@@ -47,13 +107,27 @@ type FieldIterator interface {
}
type PostingValue struct {
- Key []byte
+ Term []byte
Value posting.List
}
+type Writer interface {
+ Write(field Field, itemID common.ItemID) error
+}
+
+type FieldIterable interface {
+ Iterator(fieldKey FieldKey, termRange RangeOpts, order modelv2.QueryOrder_Sort) (iter FieldIterator, found bool)
+}
+
type Searcher interface {
- MatchField(fieldName []byte) (list posting.List)
- MatchTerms(field Field) (list posting.List)
- Range(fieldName []byte, opts RangeOpts) (list posting.List)
- FieldIterator(fieldName []byte, order modelv2.QueryOrder_Sort) FieldIterator
+ FieldIterable
+ MatchField(fieldKey FieldKey) (list posting.List, err error)
+ MatchTerms(field Field) (list posting.List, err error)
+ Range(fieldKey FieldKey, opts RangeOpts) (list posting.List, err error)
+}
+
+type Store interface {
+ io.Closer
+ Writer
+ Searcher
}
diff --git a/pkg/index/inverted/field_map.go b/pkg/index/inverted/field_map.go
index 85c7771..2ee0831 100644
--- a/pkg/index/inverted/field_map.go
+++ b/pkg/index/inverted/field_map.go
@@ -20,44 +20,44 @@ package inverted
import (
"sync"
- "github.com/pkg/errors"
-
"github.com/apache/skywalking-banyandb/api/common"
"github.com/apache/skywalking-banyandb/pkg/convert"
"github.com/apache/skywalking-banyandb/pkg/index"
)
-var ErrFieldAbsent = errors.New("field doesn't exist")
-
type fieldHashID uint64
type fieldMap struct {
- repo map[fieldHashID]*fieldValue
+ repo map[fieldHashID]*termContainer
+ lst []fieldHashID
mutex sync.RWMutex
}
func newFieldMap(initialSize int) *fieldMap {
return &fieldMap{
- repo: make(map[fieldHashID]*fieldValue, initialSize),
+ repo: make(map[fieldHashID]*termContainer, initialSize),
+ lst: make([]fieldHashID, 0),
}
}
-func (fm *fieldMap) createKey(key []byte) *fieldValue {
- result := &fieldValue{
+func (fm *fieldMap) createKey(key []byte) *termContainer {
+ k := fieldHashID(convert.Hash(key))
+ result := &termContainer{
key: key,
value: newPostingMap(),
}
- fm.repo[fieldHashID(convert.Hash(key))] = result
+ fm.repo[k] = result
+ fm.lst = append(fm.lst, k)
return result
}
-func (fm *fieldMap) get(key []byte) (*fieldValue, bool) {
+func (fm *fieldMap) get(key []byte) (*termContainer, bool) {
fm.mutex.RLock()
defer fm.mutex.RUnlock()
return fm.getWithoutLock(key)
}
-func (fm *fieldMap) getWithoutLock(key []byte) (*fieldValue, bool) {
+func (fm *fieldMap) getWithoutLock(key []byte) (*termContainer, bool) {
v, ok := fm.repo[fieldHashID(convert.Hash(key))]
return v, ok
}
@@ -72,7 +72,7 @@ func (fm *fieldMap) put(fv index.Field, id common.ItemID) error {
return pm.value.put(fv.Term, id)
}
-type fieldValue struct {
+type termContainer struct {
key []byte
- value *postingMap
+ value *termMap
}
diff --git a/pkg/index/inverted/inverted.go b/pkg/index/inverted/inverted.go
index 569d434..ce50740 100644
--- a/pkg/index/inverted/inverted.go
+++ b/pkg/index/inverted/inverted.go
@@ -18,30 +18,203 @@
package inverted
import (
+ "bytes"
+ "sync"
+
+ "github.com/pkg/errors"
+ "go.uber.org/multierr"
+
"github.com/apache/skywalking-banyandb/api/common"
+ modelv2 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v2"
+ "github.com/apache/skywalking-banyandb/banyand/kv"
"github.com/apache/skywalking-banyandb/pkg/index"
+ "github.com/apache/skywalking-banyandb/pkg/index/posting"
+ "github.com/apache/skywalking-banyandb/pkg/index/posting/roaring"
+ "github.com/apache/skywalking-banyandb/pkg/logger"
)
-type GlobalStore interface {
- Searcher() index.Searcher
- Insert(field index.Field, docID common.ItemID) error
-}
+var _ index.Store = (*store)(nil)
type store struct {
- memTable *MemTable
- //TODO: add data tables
+ diskTable kv.IndexStore
+ memTable *memTable
+ immutableMemTable *memTable
+ rwMutex sync.RWMutex
}
-func (s *store) Searcher() index.Searcher {
- return s.memTable
+type StoreOpts struct {
+ Path string
+ Logger *logger.Logger
}
-func (s *store) Insert(field index.Field, chunkID common.ItemID) error {
- return s.memTable.Insert(field, chunkID)
+func NewStore(opts StoreOpts) (index.Store, error) {
+ diskTable, err := kv.OpenIndexStore(0, opts.Path, kv.IndexWithLogger(opts.Logger))
+ if err != nil {
+ return nil, err
+ }
+ return &store{
+ memTable: newMemTable(),
+ diskTable: diskTable,
+ }, nil
}
-func NewStore(name string) GlobalStore {
- return &store{
- memTable: NewMemTable(name),
+func (s *store) Close() error {
+ return s.diskTable.Close()
+}
+
+func (s *store) Write(field index.Field, chunkID common.ItemID) error {
+ return s.memTable.Write(field, chunkID)
+}
+
+func (s *store) Flush() error {
+ s.rwMutex.Lock()
+ defer s.rwMutex.Unlock()
+ if s.immutableMemTable == nil {
+ s.immutableMemTable = s.memTable
+ s.memTable = newMemTable()
+ }
+ err := s.diskTable.
+ Handover(s.immutableMemTable.Iter())
+ if err != nil {
+ return err
+ }
+ s.immutableMemTable = nil
+ return nil
+}
+
+func (s *store) MatchField(fieldKey index.FieldKey) (posting.List, error) {
+ return s.Range(fieldKey, index.RangeOpts{})
+}
+
+func (s *store) MatchTerms(field index.Field) (posting.List, error) {
+ result := roaring.NewPostingList()
+ result, errMem := s.searchInMemTables(result, func(table *memTable) (posting.List, error) {
+ list, errInner := table.MatchTerms(field)
+ if errInner != nil {
+ return nil, errInner
+ }
+ return list, nil
+ })
+ if errMem != nil {
+ return nil, errors.Wrap(errMem, "mem table of inverted index")
+ }
+ raw, errTable := s.diskTable.Get(field.Marshal())
+ switch {
+ case errors.Is(errTable, kv.ErrKeyNotFound):
+ return result, nil
+ case errTable != nil:
+ return nil, errors.Wrap(errTable, "disk table of inverted index")
}
+ list := roaring.NewPostingList()
+ err := list.Unmarshall(raw)
+ if err != nil {
+ return nil, err
+ }
+ err = result.Union(list)
+ if err != nil {
+ return nil, err
+ }
+ return result, nil
}
+
+func (s *store) Range(fieldKey index.FieldKey, opts index.RangeOpts) (list posting.List, err error) {
+ iter, found := s.Iterator(fieldKey, opts, modelv2.QueryOrder_SORT_ASC)
+ if !found {
+ return roaring.EmptyPostingList, nil
+ }
+ list = roaring.NewPostingList()
+ for iter.Next() {
+ err = multierr.Append(err, list.Union(iter.Val().Value))
+ }
+ err = multierr.Append(err, iter.Close())
+ return
+}
+
+func (s *store) Iterator(fieldKey index.FieldKey, termRange index.RangeOpts,
+ order modelv2.QueryOrder_Sort) (index.FieldIterator, bool) {
+ s.rwMutex.RLock()
+ defer s.rwMutex.RUnlock()
+ tt := []*memTable{s.memTable, s.immutableMemTable}
+ iters := make([]index.FieldIterator, 0, len(tt)+1)
+ for _, table := range tt {
+ if table == nil {
+ continue
+ }
+ it, found := table.Iterator(fieldKey, termRange, order)
+ if !found {
+ continue
+ }
+ iters = append(iters, it)
+ }
+ it := index.NewFieldIteratorTemplate(fieldKey, termRange, order, s.diskTable, func(term, val []byte, delegated kv.Iterator) (*index.PostingValue, error) {
+ list := roaring.NewPostingList()
+ err := list.Unmarshall(val)
+ if err != nil {
+ return nil, err
+ }
+
+ pv := &index.PostingValue{
+ Term: term,
+ Value: list,
+ }
+
+ for ; delegated.Valid(); delegated.Next() {
+ f := index.Field{}
+ err := f.Unmarshal(delegated.Key())
+ if err != nil {
+ return nil, err
+ }
+ if !bytes.Equal(f.Term, term) {
+ break
+ }
+ l := roaring.NewPostingList()
+ err = l.Unmarshall(delegated.Val())
+ if err != nil {
+ return nil, err
+ }
+ err = pv.Value.Union(l)
+ if err != nil {
+ return nil, err
+ }
+ }
+ return pv, nil
+ })
+ iters = append(iters, it)
+ if len(iters) < 1 {
+ return nil, false
+ }
+ var fn index.SwitchFn
+ switch order {
+ case modelv2.QueryOrder_SORT_ASC, modelv2.QueryOrder_SORT_UNSPECIFIED:
+ fn = func(a, b []byte) bool {
+ return bytes.Compare(a, b) > 0
+ }
+ case modelv2.QueryOrder_SORT_DESC:
+ fn = func(a, b []byte) bool {
+ return bytes.Compare(a, b) < 0
+ }
+ }
+ return index.NewMergedIterator(iters, fn), true
+}
+
+func (s *store) searchInMemTables(result posting.List, entityFunc entityFunc) (posting.List, error) {
+ s.rwMutex.RLock()
+ defer s.rwMutex.RUnlock()
+ tt := []*memTable{s.memTable, s.immutableMemTable}
+ for _, table := range tt {
+ if table == nil {
+ continue
+ }
+ list, err := entityFunc(table)
+ if err != nil {
+ return result, err
+ }
+ err = result.Union(list)
+ if err != nil {
+ return result, err
+ }
+ }
+ return result, nil
+}
+
+type entityFunc func(table *memTable) (posting.List, error)
diff --git a/pkg/index/inverted/inverted_test.go b/pkg/index/inverted/inverted_test.go
new file mode 100644
index 0000000..88a427b
--- /dev/null
+++ b/pkg/index/inverted/inverted_test.go
@@ -0,0 +1,142 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package inverted
+
+import (
+ "testing"
+
+ "github.com/stretchr/testify/assert"
+
+ "github.com/apache/skywalking-banyandb/pkg/index/posting"
+ "github.com/apache/skywalking-banyandb/pkg/index/posting/roaring"
+ "github.com/apache/skywalking-banyandb/pkg/index/test_cases"
+ "github.com/apache/skywalking-banyandb/pkg/logger"
+ "github.com/apache/skywalking-banyandb/pkg/test"
+)
+
+func TestStore_MatchTerm(t *testing.T) {
+ tester := assert.New(t)
+ path, fn := setUp(tester)
+ s, err := NewStore(StoreOpts{
+ Path: path,
+ Logger: logger.GetLogger("test"),
+ })
+ defer func() {
+ tester.NoError(s.Close())
+ fn()
+ }()
+ tester.NoError(err)
+ test_cases.SetUp(tester, s)
+ test_cases.RunServiceName(t, s)
+}
+
+func TestStore_MatchTerm_AfterFlush(t *testing.T) {
+ tester := assert.New(t)
+ path, fn := setUp(tester)
+ s, err := NewStore(StoreOpts{
+ Path: path,
+ Logger: logger.GetLogger("test"),
+ })
+ defer func() {
+ tester.NoError(s.Close())
+ fn()
+ }()
+ tester.NoError(err)
+ test_cases.SetUp(tester, s)
+ tester.NoError(s.(*store).Flush())
+ test_cases.RunServiceName(t, s)
+}
+
+func TestStore_Iterator(t *testing.T) {
+ tester := assert.New(t)
+ path, fn := setUp(tester)
+ s, err := NewStore(StoreOpts{
+ Path: path,
+ Logger: logger.GetLogger("test"),
+ })
+ defer func() {
+ tester.NoError(s.Close())
+ fn()
+ }()
+ tester.NoError(err)
+ data := test_cases.SetUpDuration(tester, s)
+ test_cases.RunDuration(t, data, s)
+}
+
+func TestStore_Iterator_AfterFlush(t *testing.T) {
+ tester := assert.New(t)
+ path, fn := setUp(tester)
+ s, err := NewStore(StoreOpts{
+ Path: path,
+ Logger: logger.GetLogger("test"),
+ })
+ defer func() {
+ tester.NoError(s.Close())
+ fn()
+ }()
+ tester.NoError(err)
+ data := test_cases.SetUpDuration(tester, s)
+ tester.NoError(s.(*store).Flush())
+ test_cases.RunDuration(t, data, s)
+}
+
+func TestStore_Iterator_Hybrid(t *testing.T) {
+ tester := assert.New(t)
+ path, fn := setUp(tester)
+ s, err := NewStore(StoreOpts{
+ Path: path,
+ Logger: logger.GetLogger("test"),
+ })
+ defer func() {
+ tester.NoError(s.Close())
+ fn()
+ }()
+ tester.NoError(err)
+ r := map[int]posting.List{
+ 50: roaring.NewPostingList(),
+ 200: nil,
+ 500: roaring.NewPostingList(),
+ 1000: nil,
+ 2000: roaring.NewPostingList(),
+ }
+ data1 := test_cases.SetUpPartialDuration(tester, s, r)
+ tester.NoError(s.(*store).Flush())
+ r = map[int]posting.List{
+ 50: nil,
+ 200: roaring.NewPostingList(),
+ 500: nil,
+ 1000: roaring.NewPostingList(),
+ 2000: nil,
+ }
+ data := test_cases.SetUpPartialDuration(tester, s, r)
+ for i, list := range data {
+ if list == nil {
+ data[i] = data1[i]
+ }
+ }
+ test_cases.RunDuration(t, data, s)
+}
+
+func setUp(t *assert.Assertions) (tempDir string, deferFunc func()) {
+ t.NoError(logger.Init(logger.Logging{
+ Env: "dev",
+ Level: "debug",
+ }))
+ tempDir, deferFunc = test.Space(t)
+ return tempDir, deferFunc
+}
diff --git a/pkg/index/inverted/mem.go b/pkg/index/inverted/mem.go
index f8db9b7..cd19027 100644
--- a/pkg/index/inverted/mem.go
+++ b/pkg/index/inverted/mem.go
@@ -21,41 +21,33 @@ import (
"bytes"
"sort"
- "github.com/pkg/errors"
+ "go.uber.org/multierr"
"github.com/apache/skywalking-banyandb/api/common"
modelv2 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v2"
+ "github.com/apache/skywalking-banyandb/banyand/kv"
"github.com/apache/skywalking-banyandb/pkg/index"
"github.com/apache/skywalking-banyandb/pkg/index/posting"
"github.com/apache/skywalking-banyandb/pkg/index/posting/roaring"
)
-var ErrFieldsAbsent = errors.New("fields are absent")
-
-var _ index.Searcher = (*MemTable)(nil)
+var (
+ _ index.Writer = (*memTable)(nil)
+ _ index.FieldIterable = (*memTable)(nil)
+)
-type MemTable struct {
- terms *fieldMap
- name string
+type memTable struct {
+ fields *fieldMap
}
-func NewMemTable(name string) *MemTable {
- return &MemTable{
- name: name,
- terms: newFieldMap(1000),
+func newMemTable() *memTable {
+ return &memTable{
+ fields: newFieldMap(1000),
}
}
-func (m *MemTable) Insert(field index.Field, chunkID common.ItemID) error {
- return m.terms.put(field, chunkID)
-}
-
-func (m *MemTable) MatchField(fieldName []byte) (list posting.List) {
- fieldsValues, ok := m.terms.get(fieldName)
- if !ok {
- return roaring.EmptyPostingList
- }
- return fieldsValues.value.allValues()
+func (m *memTable) Write(field index.Field, chunkID common.ItemID) error {
+ return m.fields.put(field, chunkID)
}
var _ index.FieldIterator = (*fIterator)(nil)
@@ -64,7 +56,7 @@ type fIterator struct {
index int
val *index.PostingValue
keys [][]byte
- valueRepo *postingMap
+ valueRepo *termMap
closed bool
}
@@ -74,7 +66,6 @@ func (f *fIterator) Next() bool {
}
f.index++
if f.index >= len(f.keys) {
- _ = f.Close()
return false
}
f.val = f.valueRepo.getEntry(f.keys[f.index])
@@ -93,7 +84,7 @@ func (f *fIterator) Close() error {
return nil
}
-func newFieldIterator(keys [][]byte, fValue *postingMap) index.FieldIterator {
+func newFieldIterator(keys [][]byte, fValue *termMap) index.FieldIterator {
return &fIterator{
keys: keys,
valueRepo: fValue,
@@ -101,45 +92,139 @@ func newFieldIterator(keys [][]byte, fValue *postingMap) index.FieldIterator {
}
}
-func (m *MemTable) FieldIterator(fieldName []byte, order modelv2.QueryOrder_Sort) index.FieldIterator {
- fieldsValues, ok := m.terms.get(fieldName)
+func (m *memTable) Iterator(fieldKey index.FieldKey, rangeOpts index.RangeOpts,
+ order modelv2.QueryOrder_Sort) (iter index.FieldIterator, found bool) {
+ fieldsValues, ok := m.fields.get(fieldKey.Marshal())
if !ok {
- return nil
+ return nil, false
}
fValue := fieldsValues.value
- var keys [][]byte
+ var terms [][]byte
{
fValue.mutex.RLock()
defer fValue.mutex.RUnlock()
for _, value := range fValue.repo {
- keys = append(keys, value.Key)
+ if rangeOpts.Between(value.Term) == 0 {
+ terms = append(terms, value.Term)
+ }
}
}
+ if len(terms) < 1 {
+ return nil, false
+ }
switch order {
- case modelv2.QueryOrder_SORT_ASC:
- sort.SliceStable(keys, func(i, j int) bool {
- return bytes.Compare(keys[i], keys[j]) < 0
+ case modelv2.QueryOrder_SORT_ASC, modelv2.QueryOrder_SORT_UNSPECIFIED:
+ sort.SliceStable(terms, func(i, j int) bool {
+ return bytes.Compare(terms[i], terms[j]) < 0
})
case modelv2.QueryOrder_SORT_DESC:
- sort.SliceStable(keys, func(i, j int) bool {
- return bytes.Compare(keys[i], keys[j]) > 0
+ sort.SliceStable(terms, func(i, j int) bool {
+ return bytes.Compare(terms[i], terms[j]) > 0
})
}
- return newFieldIterator(keys, fValue)
+ return newFieldIterator(terms, fValue), true
}
-func (m *MemTable) MatchTerms(field index.Field) (list posting.List) {
- fieldsValues, ok := m.terms.get(field.Key)
+func (m *memTable) MatchTerms(field index.Field) (posting.List, error) {
+ fieldsValues, ok := m.fields.get(field.Key)
if !ok {
- return roaring.EmptyPostingList
+ return roaring.EmptyPostingList, nil
+ }
+ list := fieldsValues.value.get(field.Term)
+ if list == nil {
+ return roaring.EmptyPostingList, nil
}
- return fieldsValues.value.get(field.Term).Clone()
+ return list, nil
}
-func (m *MemTable) Range(fieldName []byte, opts index.RangeOpts) (list posting.List) {
- fieldsValues, ok := m.terms.get(fieldName)
- if !ok {
- return roaring.EmptyPostingList
+var _ kv.Iterator = (*flushIterator)(nil)
+
+type flushIterator struct {
+ fieldIdx int
+ termIdx int
+ key []byte
+ value []byte
+ fields *fieldMap
+ valid bool
+ err error
+}
+
+func (i *flushIterator) Next() {
+ if i.fieldIdx >= len(i.fields.lst) {
+ i.valid = false
+ return
+ }
+ fieldID := i.fields.lst[i.fieldIdx]
+ terms := i.fields.repo[fieldID]
+ if i.termIdx < len(terms.value.lst) {
+ i.termIdx++
+ if !i.setCurr() {
+ i.Next()
+ }
+ return
+ }
+ i.fieldIdx++
+ i.termIdx = 0
+ if !i.setCurr() {
+ i.Next()
+ }
+}
+
+func (i *flushIterator) Rewind() {
+ i.fieldIdx = 0
+ i.termIdx = 0
+ i.valid = true
+ if !i.setCurr() {
+ i.valid = false
+ }
+}
+
+func (i *flushIterator) Seek(_ []byte) {
+ panic("unsupported")
+}
+
+func (i *flushIterator) Key() []byte {
+ return i.key
+}
+
+func (i *flushIterator) Val() []byte {
+ return i.value
+}
+
+func (i *flushIterator) Valid() bool {
+ return i.valid
+}
+
+func (i *flushIterator) Close() error {
+ return i.err
+}
+
+func (i *flushIterator) setCurr() bool {
+ if i.fieldIdx >= len(i.fields.lst) {
+ return false
+ }
+ fieldID := i.fields.lst[i.fieldIdx]
+ term := i.fields.repo[fieldID]
+ if i.termIdx >= len(term.value.lst) {
+ return false
+ }
+ valueID := term.value.lst[i.termIdx]
+ value := term.value.repo[valueID]
+ v, err := value.Value.Marshall()
+ if err != nil {
+ i.err = multierr.Append(i.err, err)
+ return false
+ }
+ i.value = v
+ i.key = index.Field{
+ Key: term.key,
+ Term: value.Term,
+ }.Marshal()
+ return true
+}
+
+func (m *memTable) Iter() kv.Iterator {
+ return &flushIterator{
+ fields: m.fields,
}
- return fieldsValues.value.getRange(opts)
}
diff --git a/pkg/index/inverted/mem_test.go b/pkg/index/inverted/mem_test.go
index 99caa2c..9e7c4eb 100644
--- a/pkg/index/inverted/mem_test.go
+++ b/pkg/index/inverted/mem_test.go
@@ -18,242 +18,21 @@
package inverted
import (
- "reflect"
"testing"
"github.com/stretchr/testify/assert"
- "github.com/apache/skywalking-banyandb/api/common"
- modelv2 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v2"
- "github.com/apache/skywalking-banyandb/pkg/convert"
- "github.com/apache/skywalking-banyandb/pkg/index"
- "github.com/apache/skywalking-banyandb/pkg/index/posting"
- "github.com/apache/skywalking-banyandb/pkg/index/posting/roaring"
+ "github.com/apache/skywalking-banyandb/pkg/index/test_cases"
)
-func TestMemTable_Range(t *testing.T) {
- type args struct {
- fieldName []byte
- opts index.RangeOpts
- }
- m := NewMemTable("sw")
- setUp(t, m)
- tests := []struct {
- name string
- args args
- wantList posting.List
- }{
- {
- name: "in range",
- args: args{
- fieldName: []byte("duration"),
- opts: index.RangeOpts{
- Lower: convert.Uint16ToBytes(100),
- Upper: convert.Uint16ToBytes(500),
- },
- },
- wantList: m.MatchTerms(index.Field{
- Key: []byte("duration"),
- Term: convert.Uint16ToBytes(200),
- }),
- },
- {
- name: "excludes edge",
- args: args{
- fieldName: []byte("duration"),
- opts: index.RangeOpts{
- Lower: convert.Uint16ToBytes(50),
- Upper: convert.Uint16ToBytes(1000),
- },
- },
- wantList: union(m,
- index.Field{
- Key: []byte("duration"),
- Term: convert.Uint16ToBytes(200),
- },
- ),
- },
- {
- name: "includes lower",
- args: args{
- fieldName: []byte("duration"),
- opts: index.RangeOpts{
- Lower: convert.Uint16ToBytes(50),
- Upper: convert.Uint16ToBytes(1000),
- IncludesLower: true,
- },
- },
- wantList: union(m,
- index.Field{
- Key: []byte("duration"),
- Term: convert.Uint16ToBytes(50),
- },
- index.Field{
- Key: []byte("duration"),
- Term: convert.Uint16ToBytes(200),
- },
- ),
- },
- {
- name: "includes upper",
- args: args{
- fieldName: []byte("duration"),
- opts: index.RangeOpts{
- Lower: convert.Uint16ToBytes(50),
- Upper: convert.Uint16ToBytes(1000),
- IncludesUpper: true,
- },
- },
- wantList: union(m,
- index.Field{
- Key: []byte("duration"),
- Term: convert.Uint16ToBytes(200),
- },
- index.Field{
- Key: []byte("duration"),
- Term: convert.Uint16ToBytes(1000),
- },
- ),
- },
- {
- name: "includes edges",
- args: args{
- fieldName: []byte("duration"),
- opts: index.RangeOpts{
- Lower: convert.Uint16ToBytes(50),
- Upper: convert.Uint16ToBytes(1000),
- IncludesUpper: true,
- IncludesLower: true,
- },
- },
- wantList: union(m,
- index.Field{
- Key: []byte("duration"),
- Term: convert.Uint16ToBytes(50),
- },
- index.Field{
- Key: []byte("duration"),
- Term: convert.Uint16ToBytes(200),
- },
- index.Field{
- Key: []byte("duration"),
- Term: convert.Uint16ToBytes(1000),
- },
- ),
- },
- {
- name: "match one",
- args: args{
- fieldName: []byte("duration"),
- opts: index.RangeOpts{
- Lower: convert.Uint16ToBytes(200),
- Upper: convert.Uint16ToBytes(200),
- IncludesUpper: true,
- IncludesLower: true,
- },
- },
- wantList: union(m,
- index.Field{
- Key: []byte("duration"),
- Term: convert.Uint16ToBytes(200),
- },
- ),
- },
- }
- for _, tt := range tests {
- t.Run(tt.name, func(t *testing.T) {
- if gotList := m.Range(tt.args.fieldName, tt.args.opts); !reflect.DeepEqual(gotList, tt.wantList) {
- t.Errorf("Range() = %v, want %v", gotList.Len(), tt.wantList.Len())
- }
- })
- }
+func TestMemTable_MatchTerm(t *testing.T) {
+ mt := newMemTable()
+ test_cases.SetUp(assert.New(t), mt)
+ test_cases.RunServiceName(t, mt)
}
func TestMemTable_Iterator(t *testing.T) {
- tester := assert.New(t)
- type args struct {
- fieldName []byte
- orderType modelv2.QueryOrder_Sort
- }
- m := NewMemTable("sw")
- setUp(t, m)
- tests := []struct {
- name string
- args args
- want [][]byte
- }{
- {
- name: "sort asc",
- args: args{
- fieldName: []byte("duration"),
- orderType: modelv2.QueryOrder_SORT_ASC,
- },
- want: [][]byte{convert.Uint16ToBytes(50), convert.Uint16ToBytes(200), convert.Uint16ToBytes(1000)},
- },
- {
- name: "sort desc",
- args: args{
- fieldName: []byte("duration"),
- orderType: modelv2.QueryOrder_SORT_DESC,
- },
- want: [][]byte{convert.Uint16ToBytes(1000), convert.Uint16ToBytes(200), convert.Uint16ToBytes(50)},
- },
- }
- for _, tt := range tests {
- t.Run(tt.name, func(t *testing.T) {
- iter := m.FieldIterator(tt.args.fieldName, tt.args.orderType)
- tester.NotNil(iter)
- var got [][]byte
- defer func() {
- _ = iter.Close()
- }()
- for iter.Next() {
- got = append(got, iter.Val().Key)
- }
- tester.Equal(tt.want, got)
- })
- }
-}
-
-func union(memTable *MemTable, fields ...index.Field) posting.List {
- result := roaring.NewPostingList()
- for _, f := range fields {
- _ = result.Union(memTable.MatchTerms(f))
- }
- return result
-}
-
-func setUp(t *testing.T, mt *MemTable) {
- for i := 0; i < 100; i++ {
- if i%2 == 0 {
- assert.NoError(t, mt.Insert(index.Field{
- Key: []byte("service_name"),
- Term: []byte("gateway"),
- }, common.ItemID(i)))
- } else {
- assert.NoError(t, mt.Insert(index.Field{
- Key: []byte("service_name"),
- Term: []byte("webpage"),
- }, common.ItemID(i)))
- }
- }
- for i := 100; i < 200; i++ {
- switch {
- case i%3 == 0:
- assert.NoError(t, mt.Insert(index.Field{
- Key: []byte("duration"),
- Term: convert.Uint16ToBytes(50),
- }, common.ItemID(i)))
- case i%3 == 1:
- assert.NoError(t, mt.Insert(index.Field{
- Key: []byte("duration"),
- Term: convert.Uint16ToBytes(200),
- }, common.ItemID(i)))
- case i%3 == 2:
- assert.NoError(t, mt.Insert(index.Field{
- Key: []byte("duration"),
- Term: convert.Uint16ToBytes(1000),
- }, common.ItemID(i)))
- }
- }
+ mt := newMemTable()
+ data := test_cases.SetUpDuration(assert.New(t), mt)
+ test_cases.RunDuration(t, data, mt)
}
diff --git a/pkg/index/inverted/term_map.go b/pkg/index/inverted/term_map.go
index c672239..e683837 100644
--- a/pkg/index/inverted/term_map.go
+++ b/pkg/index/inverted/term_map.go
@@ -18,8 +18,6 @@
package inverted
import (
- "bytes"
- "sort"
"sync"
"github.com/apache/skywalking-banyandb/api/common"
@@ -31,48 +29,50 @@ import (
type termHashID uint64
-type postingMap struct {
+type termMap struct {
repo map[termHashID]*index.PostingValue
+ lst []termHashID
mutex sync.RWMutex
}
-func newPostingMap() *postingMap {
- return &postingMap{
+func newPostingMap() *termMap {
+ return &termMap{
repo: make(map[termHashID]*index.PostingValue),
}
}
-func (p *postingMap) put(key []byte, id common.ItemID) error {
+func (p *termMap) put(key []byte, id common.ItemID) error {
list := p.getOrCreate(key)
list.Insert(id)
return nil
}
-func (p *postingMap) getOrCreate(key []byte) posting.List {
+func (p *termMap) getOrCreate(key []byte) posting.List {
list := p.get(key)
- if list != roaring.EmptyPostingList {
+ if list != nil {
return list
}
p.mutex.Lock()
defer p.mutex.Unlock()
hashedKey := termHashID(convert.Hash(key))
v := &index.PostingValue{
- Key: key,
+ Term: key,
Value: roaring.NewPostingList(),
}
p.repo[hashedKey] = v
+ p.lst = append(p.lst, hashedKey)
return v.Value
}
-func (p *postingMap) get(key []byte) posting.List {
+func (p *termMap) get(key []byte) posting.List {
e := p.getEntry(key)
if e == nil {
- return roaring.EmptyPostingList
+ return nil
}
return e.Value
}
-func (p *postingMap) getEntry(key []byte) *index.PostingValue {
+func (p *termMap) getEntry(key []byte) *index.PostingValue {
p.mutex.RLock()
defer p.mutex.RUnlock()
hashedKey := termHashID(convert.Hash(key))
@@ -82,80 +82,3 @@ func (p *postingMap) getEntry(key []byte) *index.PostingValue {
}
return v
}
-
-func (p *postingMap) allValues() posting.List {
- result := roaring.NewPostingList()
- for _, value := range p.repo {
- _ = result.Union(value.Value)
- }
- return result
-}
-
-func (p *postingMap) getRange(opts index.RangeOpts) posting.List {
- switch bytes.Compare(opts.Upper, opts.Lower) {
- case -1:
- return roaring.EmptyPostingList
- case 0:
- if opts.IncludesUpper && opts.IncludesLower {
- return p.get(opts.Upper)
- }
- return roaring.EmptyPostingList
- }
- p.mutex.RLock()
- defer p.mutex.RUnlock()
- keys := make(Asc, 0, len(p.repo))
- for _, v := range p.repo {
- keys = append(keys, v.Key)
- }
- sort.Sort(keys)
- index := sort.Search(len(keys), func(i int) bool {
- return bytes.Compare(keys[i], opts.Lower) >= 0
- })
- result := roaring.NewPostingList()
- for i := index; i < len(keys); i++ {
- k := keys[i]
- switch {
- case bytes.Equal(k, opts.Lower):
- if opts.IncludesLower {
- _ = result.Union(p.repo[termHashID(convert.Hash(k))].Value)
- }
- case bytes.Compare(k, opts.Upper) > 0:
- break
- case bytes.Equal(k, opts.Upper):
- if opts.IncludesUpper {
- _ = result.Union(p.repo[termHashID(convert.Hash(k))].Value)
- }
- default:
- _ = result.Union(p.repo[termHashID(convert.Hash(k))].Value)
- }
- }
- return result
-}
-
-type Asc [][]byte
-
-func (a Asc) Len() int {
- return len(a)
-}
-
-func (a Asc) Less(i, j int) bool {
- return bytes.Compare(a[i], a[j]) < 0
-}
-
-func (a Asc) Swap(i, j int) {
- a[i], a[j] = a[j], a[i]
-}
-
-type Desc [][]byte
-
-func (d Desc) Len() int {
- return len(d)
-}
-
-func (d Desc) Less(i, j int) bool {
- return bytes.Compare(d[i], d[j]) > 0
-}
-
-func (d Desc) Swap(i, j int) {
- d[i], d[j] = d[j], d[i]
-}
diff --git a/pkg/index/iterator.go b/pkg/index/iterator.go
new file mode 100644
index 0000000..439b627
--- /dev/null
+++ b/pkg/index/iterator.go
@@ -0,0 +1,209 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package index
+
+import (
+ "bytes"
+
+ "go.uber.org/multierr"
+
+ modelv2 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v2"
+ "github.com/apache/skywalking-banyandb/banyand/kv"
+)
+
+type CompositePostingValueFn = func(term, value []byte, delegated kv.Iterator) (*PostingValue, error)
+
+var _ FieldIterator = (*FieldIteratorTemplate)(nil)
+
+type FieldIteratorTemplate struct {
+ delegated kv.Iterator
+
+ init bool
+ curr *PostingValue
+ err error
+ termRange RangeOpts
+ fn CompositePostingValueFn
+ reverse bool
+ field Field
+}
+
+func (f *FieldIteratorTemplate) Next() bool {
+ if !f.init {
+ f.init = true
+ f.delegated.Seek(f.field.Marshal())
+ }
+ if !f.delegated.Valid() {
+ return false
+ }
+ field := &Field{}
+ err := field.Unmarshal(f.delegated.Key())
+ if err != nil {
+ f.err = err
+ return false
+ }
+ if !bytes.Equal(field.Key, f.field.Key) {
+ return false
+ }
+ pv, err := f.fn(field.Term, f.delegated.Val(), f.delegated)
+ if err != nil {
+ f.err = err
+ return false
+ }
+ in := f.termRange.Between(pv.Term)
+ switch {
+ case in > 0:
+ if f.reverse {
+ return f.Next()
+ } else {
+ return false
+ }
+ case in < 0:
+ if f.reverse {
+ return false
+ } else {
+ return f.Next()
+ }
+ }
+ f.curr = pv
+ return true
+}
+
+func (f *FieldIteratorTemplate) Val() *PostingValue {
+ return f.curr
+}
+
+func (f *FieldIteratorTemplate) Close() error {
+ return f.delegated.Close()
+}
+
+func NewFieldIteratorTemplate(fieldKey FieldKey, termRange RangeOpts, order modelv2.QueryOrder_Sort, iterable kv.Iterable, fn CompositePostingValueFn) *FieldIteratorTemplate {
+ var reverse bool
+ var term []byte
+ switch order {
+ case modelv2.QueryOrder_SORT_ASC, modelv2.QueryOrder_SORT_UNSPECIFIED:
+ term = termRange.Lower
+ reverse = false
+ case modelv2.QueryOrder_SORT_DESC:
+ term = termRange.Upper
+ reverse = true
+ }
+ if order == modelv2.QueryOrder_SORT_DESC {
+ reverse = true
+ }
+ iter := iterable.NewIterator(kv.ScanOpts{
+ Prefix: fieldKey.Marshal(),
+ Reverse: reverse,
+ })
+ return &FieldIteratorTemplate{
+ delegated: iter,
+ termRange: termRange,
+ fn: fn,
+ reverse: reverse,
+ field: Field{
+ Key: fieldKey.Marshal(),
+ Term: term,
+ },
+ }
+}
+
+type SwitchFn = func(a, b []byte) bool
+
+var _ FieldIterator = (*mergedIterator)(nil)
+
+type mergedIterator struct {
+ inner []FieldIterator
+ drained []FieldIterator
+ drainedCount int
+ cur *PostingValue
+ switchFn SwitchFn
+ init bool
+ closed bool
+}
+
+func NewMergedIterator(merged []FieldIterator, fn SwitchFn) FieldIterator {
+ return &mergedIterator{
+ inner: merged,
+ drained: make([]FieldIterator, len(merged)),
+ switchFn: fn,
+ }
+}
+
+func (m *mergedIterator) Next() bool {
+ if m.closed {
+ return false
+ }
+ if m.allDrained() {
+ return false
+ }
+ if !m.init {
+ for i, iterator := range m.inner {
+ if !iterator.Next() {
+ m.drain(i)
+ }
+ }
+ if m.allDrained() {
+ return false
+ }
+ m.init = true
+ }
+ var head FieldIterator
+ var headIndex int
+ for i, iterator := range m.inner {
+ if iterator == nil {
+ continue
+ }
+ if head == nil {
+ head = iterator
+ continue
+ }
+ if m.switchFn(head.Val().Term, iterator.Val().Term) {
+ head = iterator
+ headIndex = i
+ }
+ }
+ m.cur = head.Val()
+ if !head.Next() {
+ m.drain(headIndex)
+ }
+ return true
+}
+
+func (m *mergedIterator) Val() *PostingValue {
+ return m.cur
+}
+
+func (m *mergedIterator) Close() error {
+ m.closed = true
+ var err error
+ for _, iterator := range m.drained {
+ if iterator == nil {
+ continue
+ }
+ err = multierr.Append(err, iterator.Close())
+ }
+ return err
+}
+
+func (m *mergedIterator) drain(index int) {
+ m.drained[index], m.inner[index] = m.inner[index], nil
+ m.drainedCount++
+}
+
+func (m *mergedIterator) allDrained() bool {
+ return m.drainedCount == len(m.inner)
+}
diff --git a/pkg/index/inverted/inverted.go b/pkg/index/lsm/lsm.go
similarity index 57%
copy from pkg/index/inverted/inverted.go
copy to pkg/index/lsm/lsm.go
index 569d434..14961a8 100644
--- a/pkg/index/inverted/inverted.go
+++ b/pkg/index/lsm/lsm.go
@@ -15,33 +15,43 @@
// specific language governing permissions and limitations
// under the License.
-package inverted
+package lsm
import (
"github.com/apache/skywalking-banyandb/api/common"
+ "github.com/apache/skywalking-banyandb/banyand/kv"
+ "github.com/apache/skywalking-banyandb/pkg/convert"
"github.com/apache/skywalking-banyandb/pkg/index"
+ "github.com/apache/skywalking-banyandb/pkg/logger"
)
-type GlobalStore interface {
- Searcher() index.Searcher
- Insert(field index.Field, docID common.ItemID) error
-}
+var _ index.Store = (*store)(nil)
type store struct {
- memTable *MemTable
- //TODO: add data tables
+ lsm kv.Store
}
-func (s *store) Searcher() index.Searcher {
- return s.memTable
+func (s *store) Close() error {
+ return s.lsm.Close()
}
-func (s *store) Insert(field index.Field, chunkID common.ItemID) error {
- return s.memTable.Insert(field, chunkID)
+func (s *store) Write(field index.Field, itemID common.ItemID) error {
+ itemIDInt := uint64(itemID)
+ return s.lsm.PutWithVersion(field.Marshal(), convert.Uint64ToBytes(itemIDInt), itemIDInt)
}
-func NewStore(name string) GlobalStore {
- return &store{
- memTable: NewMemTable(name),
+type StoreOpts struct {
+ Path string
+ Logger *logger.Logger
+}
+
+func NewStore(opts StoreOpts) (index.Store, error) {
+ var err error
+ var lsm kv.Store
+ if lsm, err = kv.OpenStore(0, opts.Path, kv.StoreWithLogger(opts.Logger)); err != nil {
+ return nil, err
}
+ return &store{
+ lsm: lsm,
+ }, nil
}
diff --git a/pkg/index/lsm/lsm_test.go b/pkg/index/lsm/lsm_test.go
new file mode 100644
index 0000000..2e8f8f8
--- /dev/null
+++ b/pkg/index/lsm/lsm_test.go
@@ -0,0 +1,69 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package lsm
+
+import (
+ "testing"
+
+ "github.com/stretchr/testify/assert"
+
+ "github.com/apache/skywalking-banyandb/pkg/index/test_cases"
+ "github.com/apache/skywalking-banyandb/pkg/logger"
+ "github.com/apache/skywalking-banyandb/pkg/test"
+)
+
+func TestStore_MatchTerm(t *testing.T) {
+ tester := assert.New(t)
+ path, fn := setUp(tester)
+ s, err := NewStore(StoreOpts{
+ Path: path,
+ Logger: logger.GetLogger("test"),
+ })
+ defer func() {
+ tester.NoError(s.Close())
+ fn()
+ }()
+ tester.NoError(err)
+ test_cases.SetUp(tester, s)
+ test_cases.RunServiceName(t, s)
+}
+
+func TestStore_Iterator(t *testing.T) {
+ tester := assert.New(t)
+ path, fn := setUp(tester)
+ s, err := NewStore(StoreOpts{
+ Path: path,
+ Logger: logger.GetLogger("test"),
+ })
+ defer func() {
+ tester.NoError(s.Close())
+ fn()
+ }()
+ tester.NoError(err)
+ data := test_cases.SetUpDuration(tester, s)
+ test_cases.RunDuration(t, data, s)
+}
+
+func setUp(t *assert.Assertions) (tempDir string, deferFunc func()) {
+ t.NoError(logger.Init(logger.Logging{
+ Env: "dev",
+ Level: "info",
+ }))
+ tempDir, deferFunc = test.Space(t)
+ return tempDir, deferFunc
+}
diff --git a/pkg/index/lsm/search.go b/pkg/index/lsm/search.go
new file mode 100644
index 0000000..53bf0aa
--- /dev/null
+++ b/pkg/index/lsm/search.go
@@ -0,0 +1,84 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package lsm
+
+import (
+ "bytes"
+
+ "github.com/pkg/errors"
+ "go.uber.org/multierr"
+
+ "github.com/apache/skywalking-banyandb/api/common"
+ modelv2 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v2"
+ "github.com/apache/skywalking-banyandb/banyand/kv"
+ "github.com/apache/skywalking-banyandb/pkg/convert"
+ "github.com/apache/skywalking-banyandb/pkg/index"
+ "github.com/apache/skywalking-banyandb/pkg/index/posting"
+ "github.com/apache/skywalking-banyandb/pkg/index/posting/roaring"
+)
+
+func (s *store) MatchField(fieldKey index.FieldKey) (list posting.List, err error) {
+ return s.Range(fieldKey, index.RangeOpts{})
+}
+
+func (s *store) MatchTerms(field index.Field) (list posting.List, err error) {
+ list = roaring.NewPostingList()
+ err = s.lsm.GetAll(field.Marshal(), func(itemID []byte) error {
+ list.Insert(common.ItemID(convert.BytesToUint64(itemID)))
+ return nil
+ })
+ if errors.Is(err, kv.ErrKeyNotFound) {
+ return roaring.EmptyPostingList, nil
+ }
+ return
+}
+
+func (s *store) Range(fieldKey index.FieldKey, opts index.RangeOpts) (list posting.List, err error) {
+ iter, found := s.Iterator(fieldKey, opts, modelv2.QueryOrder_SORT_ASC)
+ if !found {
+ return roaring.EmptyPostingList, nil
+ }
+ list = roaring.NewPostingList()
+ for iter.Next() {
+ err = multierr.Append(err, list.Union(iter.Val().Value))
+ }
+ err = multierr.Append(err, iter.Close())
+ return
+}
+
+func (s *store) Iterator(fieldKey index.FieldKey, termRange index.RangeOpts, order modelv2.QueryOrder_Sort) (index.FieldIterator, bool) {
+ return index.NewFieldIteratorTemplate(fieldKey, termRange, order, s.lsm, func(term, value []byte, delegated kv.Iterator) (*index.PostingValue, error) {
+ pv := &index.PostingValue{
+ Term: term,
+ Value: roaring.NewPostingListWithInitialData(convert.BytesToUint64(value)),
+ }
+
+ for ; delegated.Valid(); delegated.Next() {
+ f := index.Field{}
+ err := f.Unmarshal(delegated.Key())
+ if err != nil {
+ return nil, err
+ }
+ if !bytes.Equal(f.Term, term) {
+ break
+ }
+ pv.Value.Insert(common.ItemID(convert.BytesToUint64(delegated.Val())))
+ }
+ return pv, nil
+ }), true
+}
diff --git a/pkg/index/posting/roaring/roaring.go b/pkg/index/posting/roaring/roaring.go
index 8f1f3d7..d121f89 100644
--- a/pkg/index/posting/roaring/roaring.go
+++ b/pkg/index/posting/roaring/roaring.go
@@ -62,6 +62,14 @@ func NewPostingListWithInitialData(data ...uint64) posting.List {
return list
}
+func NewRange(start, end uint64) posting.List {
+ list := &postingsList{
+ bitmap: roaring64.New(),
+ }
+ list.bitmap.AddRange(start, end)
+ return list
+}
+
func (p *postingsList) Contains(id common.ItemID) bool {
return p.bitmap.Contains(uint64(id))
}
diff --git a/pkg/index/test_cases/duration.go b/pkg/index/test_cases/duration.go
new file mode 100644
index 0000000..af4bf1a
--- /dev/null
+++ b/pkg/index/test_cases/duration.go
@@ -0,0 +1,329 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package test_cases
+
+import (
+ "sort"
+ "testing"
+
+ "github.com/stretchr/testify/assert"
+ "github.com/stretchr/testify/require"
+
+ "github.com/apache/skywalking-banyandb/api/common"
+ modelv2 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v2"
+ "github.com/apache/skywalking-banyandb/pkg/convert"
+ "github.com/apache/skywalking-banyandb/pkg/index"
+ "github.com/apache/skywalking-banyandb/pkg/index/posting"
+ "github.com/apache/skywalking-banyandb/pkg/index/posting/roaring"
+)
+
+var (
+ duration = index.FieldKey{
+ IndexRule: "duration",
+ }
+)
+
+type SimpleStore interface {
+ index.FieldIterable
+ index.Writer
+ MatchTerms(field index.Field) (list posting.List, err error)
+}
+
+type args struct {
+ fieldKey index.FieldKey
+ termRange index.RangeOpts
+ orderType modelv2.QueryOrder_Sort
+}
+
+type result struct {
+ key []byte
+ items posting.List
+}
+
+func RunDuration(t *testing.T, data map[int]posting.List, store SimpleStore) {
+ tester := assert.New(t)
+ is := require.New(t)
+ tests := []struct {
+ name string
+ args args
+ want []int
+ }{
+ //{
+ // name: "sort in asc order",
+ // args: args{
+ // fieldKey: duration,
+ // orderType: modelv2.QueryOrder_SORT_ASC,
+ // },
+ // want: []int{50, 200, 500, 1000, 2000},
+ //},
+ //{
+ // name: "sort in desc order",
+ // args: args{
+ // fieldKey: duration,
+ // orderType: modelv2.QueryOrder_SORT_DESC,
+ // },
+ // want: []int{2000, 1000, 500, 200, 50},
+ //},
+ //{
+ // name: "scan in (lower, upper) and sort in asc order",
+ // args: args{
+ // fieldKey: duration,
+ // orderType: modelv2.QueryOrder_SORT_ASC,
+ // termRange: index.RangeOpts{
+ // Lower: convert.Int64ToBytes(50),
+ // Upper: convert.Int64ToBytes(2000),
+ // },
+ // },
+ // want: []int{200, 500, 1000},
+ //},
+ //{
+ // name: "scan in (lower, upper) and sort in desc order",
+ // args: args{
+ // fieldKey: duration,
+ // orderType: modelv2.QueryOrder_SORT_DESC,
+ // termRange: index.RangeOpts{
+ // Lower: convert.Int64ToBytes(50),
+ // Upper: convert.Int64ToBytes(2000),
+ // },
+ // },
+ // want: []int{1000, 500, 200},
+ //},
+ //{
+ // name: "scan in [lower, upper] and sort in asc order",
+ // args: args{
+ // fieldKey: duration,
+ // orderType: modelv2.QueryOrder_SORT_ASC,
+ // termRange: index.RangeOpts{
+ // Lower: convert.Int64ToBytes(200),
+ // IncludesLower: true,
+ // Upper: convert.Int64ToBytes(1000),
+ // IncludesUpper: true,
+ // },
+ // },
+ // want: []int{200, 500, 1000},
+ //},
+ //{
+ // name: "scan in [lower, upper] and sort in desc order",
+ // args: args{
+ // fieldKey: duration,
+ // orderType: modelv2.QueryOrder_SORT_DESC,
+ // termRange: index.RangeOpts{
+ // Lower: convert.Int64ToBytes(200),
+ // IncludesLower: true,
+ // Upper: convert.Int64ToBytes(1000),
+ // IncludesUpper: true,
+ // },
+ // },
+ // want: []int{1000, 500, 200},
+ //},
+ {
+ name: "scan in [lower, undefined) and sort in asc order",
+ args: args{
+ fieldKey: duration,
+ orderType: modelv2.QueryOrder_SORT_ASC,
+ termRange: index.RangeOpts{
+ Lower: convert.Int64ToBytes(200),
+ IncludesLower: true,
+ },
+ },
+ want: []int{200, 500, 1000, 2000},
+ },
+ //{
+ // name: "scan in [lower, undefined) and sort in desc order",
+ // args: args{
+ // fieldKey: duration,
+ // orderType: modelv2.QueryOrder_SORT_DESC,
+ // termRange: index.RangeOpts{
+ // Lower: convert.Int64ToBytes(200),
+ // IncludesLower: true,
+ // },
+ // },
+ // want: []int{2000, 1000, 500, 200},
+ //},
+ {
+ name: "scan in (undefined, upper] and sort in asc order",
+ args: args{
+ fieldKey: duration,
+ orderType: modelv2.QueryOrder_SORT_ASC,
+ termRange: index.RangeOpts{
+ Upper: convert.Int64ToBytes(1000),
+ IncludesUpper: true,
+ },
+ },
+ want: []int{50, 200, 500, 1000},
+ },
+ //{
+ // name: "scan in (undefined, upper] and sort in desc order",
+ // args: args{
+ // fieldKey: duration,
+ // orderType: modelv2.QueryOrder_SORT_DESC,
+ // termRange: index.RangeOpts{
+ // Upper: convert.Int64ToBytes(1000),
+ // IncludesUpper: true,
+ // },
+ // },
+ // want: []int{1000, 500, 200, 50},
+ //},
+ {
+ name: "scan splice in (lower, upper) and sort in asc order",
+ args: args{
+ fieldKey: duration,
+ orderType: modelv2.QueryOrder_SORT_ASC,
+ termRange: index.RangeOpts{
+ Lower: convert.Int64ToBytes(50 + 100),
+ Upper: convert.Int64ToBytes(2000 - 100),
+ },
+ },
+ want: []int{200, 500, 1000},
+ },
+ {
+ name: "scan splice in (lower, upper) and sort in desc order",
+ args: args{
+ fieldKey: duration,
+ orderType: modelv2.QueryOrder_SORT_DESC,
+ termRange: index.RangeOpts{
+ Lower: convert.Int64ToBytes(50 + 100),
+ Upper: convert.Int64ToBytes(2000 - 100),
+ },
+ },
+ want: []int{1000, 500, 200},
+ },
+ {
+ name: "scan splice in [lower, upper] and sort in asc order",
+ args: args{
+ fieldKey: duration,
+ orderType: modelv2.QueryOrder_SORT_ASC,
+ termRange: index.RangeOpts{
+ Lower: convert.Int64ToBytes(50 + 100),
+ IncludesLower: true,
+ Upper: convert.Int64ToBytes(2000 - 100),
+ IncludesUpper: true,
+ },
+ },
+ want: []int{200, 500, 1000},
+ },
+ {
+ name: "scan splice in [lower, upper] and sort in desc order",
+ args: args{
+ fieldKey: duration,
+ orderType: modelv2.QueryOrder_SORT_DESC,
+ termRange: index.RangeOpts{
+ Lower: convert.Int64ToBytes(50 + 100),
+ IncludesLower: true,
+ Upper: convert.Int64ToBytes(2000 - 100),
+ IncludesUpper: true,
+ },
+ },
+ want: []int{1000, 500, 200},
+ },
+ {
+ name: "no field key",
+ args: args{},
+ },
+ {
+ name: "unknown field key",
+ args: args{
+ fieldKey: index.FieldKey{
+ IndexRule: "unknown",
+ },
+ },
+ },
+ {
+ name: "default order",
+ args: args{
+ fieldKey: duration,
+ },
+ want: []int{50, 200, 500, 1000, 2000},
+ },
+ {
+ name: "invalid range",
+ args: args{
+ fieldKey: duration,
+ termRange: index.RangeOpts{
+ Lower: convert.Int64ToBytes(100),
+ Upper: convert.Int64ToBytes(50),
+ },
+ },
+ },
+ }
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ iter, found := store.Iterator(tt.args.fieldKey, tt.args.termRange, tt.args.orderType)
+ if !found {
+ tester.Empty(tt.want)
+ return
+ }
+ defer func() {
+ tester.NoError(iter.Close())
+ for i := 0; i < 10; i++ {
+ is.False(iter.Next())
+ }
+ }()
+ is.NotNil(iter)
+ var got []result
+ for iter.Next() {
+ got = append(got, result{
+ key: iter.Val().Term,
+ items: iter.Val().Value,
+ })
+ }
+ for i := 0; i < 10; i++ {
+ is.False(iter.Next())
+ }
+ is.Equal(len(tt.want), len(got))
+ for i, w := range tt.want {
+ g := got[i]
+ tester.Equal(int64(w), convert.BytesToInt64(g.key))
+ tester.True(data[w].Equal(g.items))
+ }
+ })
+ }
+}
+
+func SetUpDuration(t *assert.Assertions, store index.Writer) map[int]posting.List {
+ r := map[int]posting.List{
+ 50: roaring.NewPostingList(),
+ 200: roaring.NewPostingList(),
+ 500: roaring.NewPostingList(),
+ 1000: roaring.NewPostingList(),
+ 2000: roaring.NewPostingList(),
+ }
+ return SetUpPartialDuration(t, store, r)
+}
+
+func SetUpPartialDuration(t *assert.Assertions, store index.Writer, r map[int]posting.List) map[int]posting.List {
+ idx := make([]int, 0, len(r))
+ for key, _ := range r {
+ idx = append(idx, key)
+ }
+ sort.Ints(idx)
+ for i := 100; i < 200; i++ {
+ id := common.ItemID(i)
+ for i2, term := range idx {
+ if i%len(idx) != i2 || r[term] == nil {
+ continue
+ }
+ t.NoError(store.Write(index.Field{
+ Key: duration.Marshal(),
+ Term: convert.Int64ToBytes(int64(term)),
+ }, id))
+ r[term].Insert(id)
+ }
+ }
+ return r
+}
diff --git a/pkg/index/test_cases/service_name.go b/pkg/index/test_cases/service_name.go
new file mode 100644
index 0000000..40a0714
--- /dev/null
+++ b/pkg/index/test_cases/service_name.go
@@ -0,0 +1,100 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package test_cases
+
+import (
+ "testing"
+
+ "github.com/stretchr/testify/assert"
+
+ "github.com/apache/skywalking-banyandb/api/common"
+ "github.com/apache/skywalking-banyandb/pkg/index"
+ "github.com/apache/skywalking-banyandb/pkg/index/posting"
+ "github.com/apache/skywalking-banyandb/pkg/index/posting/roaring"
+)
+
+var serviceName = index.FieldKey{
+ IndexRule: "service_name",
+}.Marshal()
+
+func RunServiceName(t *testing.T, store SimpleStore) {
+ tester := assert.New(t)
+ tests := []struct {
+ name string
+ arg index.Field
+ want posting.List
+ wantErr bool
+ }{
+ {
+ name: "match gateway",
+ arg: index.Field{
+ Key: serviceName,
+ Term: []byte("gateway"),
+ },
+ want: roaring.NewRange(0, 50),
+ },
+ {
+ name: "match webpage",
+ arg: index.Field{
+ Key: serviceName,
+ Term: []byte("webpage"),
+ },
+ want: roaring.NewRange(50, 100),
+ },
+ {
+ name: "unknown field",
+ want: roaring.EmptyPostingList,
+ },
+ {
+ name: "unknown term",
+ arg: index.Field{
+ Key: serviceName,
+ Term: []byte("unknown"),
+ },
+ want: roaring.EmptyPostingList,
+ },
+ }
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ list, err := store.MatchTerms(tt.arg)
+ if tt.wantErr {
+ tester.Error(err)
+ return
+ }
+ tester.NoError(err)
+ tester.NotNil(list)
+ tester.True(tt.want.Equal(list))
+ })
+ }
+}
+
+func SetUp(t *assert.Assertions, store SimpleStore) {
+ for i := 0; i < 100; i++ {
+ if i < 100/2 {
+ t.NoError(store.Write(index.Field{
+ Key: serviceName,
+ Term: []byte("gateway"),
+ }, common.ItemID(i)))
+ } else {
+ t.NoError(store.Write(index.Field{
+ Key: serviceName,
+ Term: []byte("webpage"),
+ }, common.ItemID(i)))
+ }
+ }
+}
diff --git a/pkg/index/search.go b/pkg/index/tree.go
similarity index 87%
rename from pkg/index/search.go
rename to pkg/index/tree.go
index 3f304d5..1f5bda1 100644
--- a/pkg/index/search.go
+++ b/pkg/index/tree.go
@@ -25,7 +25,6 @@ import (
"github.com/pkg/errors"
- "github.com/apache/skywalking-banyandb/api/common"
modelv2 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v2"
"github.com/apache/skywalking-banyandb/pkg/index/posting"
)
@@ -38,18 +37,7 @@ type Executor interface {
type Tree interface {
Executor
-}
-
-type FieldKey struct {
- SeriesID common.SeriesID
- IndexRule string
-}
-
-func (t *FieldKey) Marshal() []byte {
- return bytes.Join([][]byte{
- t.SeriesID.Marshal(),
- []byte(t.IndexRule),
- }, []byte(":"))
+ TrimRangeLeaf(key FieldKey) (rangeOpts RangeOpts, found bool)
}
type Condition map[FieldKey][]ConditionValue
@@ -66,8 +54,7 @@ func BuildTree(searcher Searcher, condMap Condition) (Tree, error) {
searcher: searcher,
},
}
- for term, conds := range condMap {
- key := term.Marshal()
+ for key, conds := range condMap {
var rangeLeaf *rangeOp
for _, cond := range conds {
if rangeLeaf != nil && !rangeOP(cond.Op) {
@@ -136,7 +123,7 @@ type node struct {
SubNodes []Executor `json:"sub_nodes,omitempty"`
}
-func (n *node) newEq(key []byte, values [][]byte) *eq {
+func (n *node) newEq(key FieldKey, values [][]byte) *eq {
return &eq{
leaf: &leaf{
Key: key,
@@ -146,11 +133,11 @@ func (n *node) newEq(key []byte, values [][]byte) *eq {
}
}
-func (n *node) addEq(key []byte, values [][]byte) {
+func (n *node) addEq(key FieldKey, values [][]byte) {
n.SubNodes = append(n.SubNodes, n.newEq(key, values))
}
-func (n *node) addNot(key []byte, inner Executor) {
+func (n *node) addNot(key FieldKey, inner Executor) {
n.SubNodes = append(n.SubNodes, ¬{
Key: key,
searcher: n.searcher,
@@ -158,13 +145,13 @@ func (n *node) addNot(key []byte, inner Executor) {
})
}
-func (n *node) addRangeLeaf(key []byte) *rangeOp {
+func (n *node) addRangeLeaf(key FieldKey) *rangeOp {
r := &rangeOp{
leaf: &leaf{
Key: key,
searcher: n.searcher,
},
- Opts: &RangeOpts{},
+ Opts: RangeOpts{},
}
n.SubNodes = append(n.SubNodes, r)
return r
@@ -221,6 +208,23 @@ type andNode struct {
*node
}
+func (an *andNode) TrimRangeLeaf(key FieldKey) (RangeOpts, bool) {
+ removeLeaf := func(s []Executor, index int) []Executor {
+ return append(s[:index], s[index+1:]...)
+ }
+ for i, subNode := range an.SubNodes {
+ leafRange, ok := subNode.(*rangeOp)
+ if !ok {
+ continue
+ }
+ if key.Equal(leafRange.Key) {
+ an.SubNodes = removeLeaf(an.SubNodes, i)
+ return leafRange.Opts, true
+ }
+ }
+ return RangeOpts{}, false
+}
+
func (an *andNode) merge(list posting.List) error {
return an.value.Intersect(list)
}
@@ -255,20 +259,23 @@ func (on *orNode) MarshalJSON() ([]byte, error) {
type leaf struct {
Executor
- Key []byte
+ Key FieldKey
Values [][]byte
searcher Searcher
}
type not struct {
Executor
- Key []byte
+ Key FieldKey
searcher Searcher
Inner Executor
}
func (n *not) Execute() (posting.List, error) {
- all := n.searcher.MatchField(n.Key)
+ all, err := n.searcher.MatchField(n.Key)
+ if err != nil {
+ return nil, err
+ }
list, err := n.Inner.Execute()
if err != nil {
return nil, err
@@ -289,9 +296,9 @@ type eq struct {
func (eq *eq) Execute() (posting.List, error) {
return eq.searcher.MatchTerms(Field{
- Key: eq.Key,
+ Key: eq.Key.Marshal(),
Term: bytes.Join(eq.Values, nil),
- }), nil
+ })
}
func (eq *eq) MarshalJSON() ([]byte, error) {
@@ -302,11 +309,11 @@ func (eq *eq) MarshalJSON() ([]byte, error) {
type rangeOp struct {
*leaf
- Opts *RangeOpts
+ Opts RangeOpts
}
func (r *rangeOp) Execute() (posting.List, error) {
- return r.searcher.Range(r.Key, *r.Opts), nil
+ return r.searcher.Range(r.Key, r.Opts)
}
func (r *rangeOp) MarshalJSON() ([]byte, error) {