You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by ha...@apache.org on 2021/08/03 13:53:21 UTC
[skywalking-banyandb] 05/05: Test search
This is an automated email from the ASF dual-hosted git repository.
hanahmily pushed a commit to branch index-mem
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
commit 6e6081647be408fc825594a6bbeb2c4c4f483688
Author: Gao Hongtao <ha...@gmail.com>
AuthorDate: Tue Aug 3 21:07:15 2021 +0800
Test search
Signed-off-by: Gao Hongtao <ha...@gmail.com>
---
api/common/metadata.go | 10 ++
banyand/index/index.go | 13 +-
banyand/index/index_test.go | 185 +++++++++++++++++++++++++++++
banyand/index/search.go | 127 +++++++++++++++-----
banyand/index/search_test.go | 255 ++++++++++++++++++++++++++++++++++++++++
banyand/index/tsdb/mem.go | 2 +-
banyand/index/tsdb/term_map.go | 2 +
banyand/query/processor_test.go | 11 +-
pkg/convert/number.go | 4 +
pkg/logger/logger.go | 7 ++
pkg/posting/posting.go | 2 +
pkg/posting/roaring/roaring.go | 10 ++
12 files changed, 596 insertions(+), 32 deletions(-)
diff --git a/api/common/metadata.go b/api/common/metadata.go
index 1facc9e..8706aaf 100644
--- a/api/common/metadata.go
+++ b/api/common/metadata.go
@@ -40,3 +40,13 @@ func NewMetadata(spec *v1.Metadata) *Metadata {
Spec: spec,
}
}
+
+// NewMetadataByNameAndGroup builds a Metadata of kind MetadataKindVersion
+// whose spec carries only the given name and group.
+func NewMetadataByNameAndGroup(name, group string) *Metadata {
+	spec := &v1.Metadata{Name: name, Group: group}
+	return &Metadata{KindVersion: MetadataKindVersion, Spec: spec}
+}
diff --git a/banyand/index/index.go b/banyand/index/index.go
index c09f27f..00f2861 100644
--- a/banyand/index/index.go
+++ b/banyand/index/index.go
@@ -160,6 +160,16 @@ type indexMeta struct {
sync.RWMutex
}
+// get looks up the entry stored under the composite ID of the given series
+// metadata while holding the read lock. It returns nil when nothing is
+// registered for that ID.
+func (i *indexMeta) get(series *apiv1.Metadata) *series {
+	i.RWMutex.RLock()
+	defer i.RWMutex.RUnlock()
+	if found, exists := i.meta[compositeSeriesID(series)]; exists {
+		return found
+	}
+	return nil
+}
+
type indexRuleListener struct {
log *logger.Logger
indexMeta *indexMeta
@@ -174,7 +184,8 @@ func (i *indexRuleListener) Rev(message bus.Message) (resp bus.Message) {
}
i.log.Info().
Str("action", apiv1.Action_name[int32(indexRuleEvent.Action)]).
- Str("series", indexRuleEvent.Series.String()).
+ Str("series-name", indexRuleEvent.Series.Name).
+ Str("series-group", indexRuleEvent.Series.Group).
Msg("received an index rule")
i.indexMeta.Lock()
defer i.indexMeta.Unlock()
diff --git a/banyand/index/index_test.go b/banyand/index/index_test.go
new file mode 100644
index 0000000..44a89af
--- /dev/null
+++ b/banyand/index/index_test.go
@@ -0,0 +1,185 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package index
+
+import (
+ "context"
+ "math"
+ "testing"
+ "time"
+
+ "github.com/stretchr/testify/assert"
+ "google.golang.org/protobuf/types/known/timestamppb"
+
+ "github.com/apache/skywalking-banyandb/api/common"
+ "github.com/apache/skywalking-banyandb/api/event"
+ apiv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/v1"
+ "github.com/apache/skywalking-banyandb/banyand/discovery"
+ "github.com/apache/skywalking-banyandb/pkg/bus"
+ "github.com/apache/skywalking-banyandb/pkg/convert"
+ "github.com/apache/skywalking-banyandb/pkg/logger"
+)
+
+// Test_service_Insert verifies that Insert accepts fields that match a known
+// series, shard and index object, and that it returns an error when the
+// series, the shard or the field name is unknown.
+func Test_service_Insert(t *testing.T) {
+	type args struct {
+		series  common.Metadata
+		shardID uint
+		field   *Field
+	}
+	tests := []struct {
+		name    string
+		args    args
+		wantErr bool
+	}{
+		{
+			name: "str field",
+			args: args{
+				series:  *common.NewMetadataByNameAndGroup("sw", "default"),
+				shardID: 0,
+				field: &Field{
+					ChunkID: common.ChunkID(1),
+					Name:    "endpoint",
+					Value:   []byte("/test"),
+				},
+			},
+		},
+		{
+			name: "int field",
+			args: args{
+				series:  *common.NewMetadataByNameAndGroup("sw", "default"),
+				shardID: 1,
+				field: &Field{
+					ChunkID: common.ChunkID(2),
+					Name:    "duration",
+					Value:   convert.Int64ToBytes(500),
+				},
+			},
+		},
+		{
+			name: "unknown series",
+			args: args{
+				series:  *common.NewMetadataByNameAndGroup("unknown", "default"),
+				shardID: 0,
+				field: &Field{
+					ChunkID: common.ChunkID(2),
+					Name:    "duration",
+					Value:   convert.Int64ToBytes(500),
+				},
+			},
+			wantErr: true,
+		},
+		{
+			name: "unknown shard",
+			args: args{
+				series: *common.NewMetadataByNameAndGroup("sw", "default"),
+				// Any shard other than the ones set up by setUpModules will
+				// do. MaxUint32 still fits uint on 32-bit platforms, whereas
+				// MaxInt64 overflows the constant there.
+				shardID: math.MaxUint32,
+				field: &Field{
+					ChunkID: common.ChunkID(2),
+					Name:    "duration",
+					Value:   convert.Int64ToBytes(500),
+				},
+			},
+			wantErr: true,
+		},
+		{
+			name: "unknown field",
+			args: args{
+				series:  *common.NewMetadataByNameAndGroup("sw", "default"),
+				shardID: 0,
+				field: &Field{
+					ChunkID: common.ChunkID(2),
+					Name:    "unknown",
+					Value:   convert.Int64ToBytes(500),
+				},
+			},
+			wantErr: true,
+		},
+	}
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			// Bind the assertions to the subtest so that a failing setup is
+			// reported against the case that triggered it, not the parent.
+			s := setUpModules(assert.New(t))
+			err := s.Insert(tt.args.series, tt.args.shardID, tt.args.field)
+			if (err != nil) != tt.wantErr {
+				t.Errorf("Insert() error = %v, wantErr %v", err, tt.wantErr)
+			}
+		})
+	}
+}
+
+// Test_service_Init checks the state left behind by setUpModules: exactly one
+// series is registered, and the "sw-default" entry holds two items in its
+// repo.
+func Test_service_Init(t *testing.T) {
+	tester := assert.New(t)
+	svc := setUpModules(tester)
+	registered := svc.meta.meta
+	tester.Equal(1, len(registered))
+	tester.Equal(2, len(registered["sw-default"].repo))
+}
+
+// setUpModules boots a discovery repo and an index service, publishes a PUT
+// index rule event for the series "sw"/"default" covering shards 0 and 1
+// (each with the "endpoint" and "duration" index objects), and waits up to
+// ten seconds for the service to register the series.
+//
+// Failures are recorded on tester. On timeout the service is still returned,
+// so callers should expect follow-up assertions to fail as well.
+func setUpModules(tester *assert.Assertions) *service {
+	_ = logger.Bootstrap()
+	repo, err := discovery.NewServiceRepo(context.TODO())
+	tester.NoError(err)
+	svc, err := NewService(context.TODO(), repo)
+	tester.NoError(err)
+	tester.NoError(svc.PreRun())
+
+	rules := []*apiv1.IndexRule{
+		{
+			Objects: []*apiv1.IndexObject{
+				{
+					Name:   "endpoint",
+					Fields: []string{"endpoint"},
+				},
+				{
+					Name:   "duration",
+					Fields: []string{"duration"},
+				},
+			},
+		},
+	}
+	seriesID := &apiv1.Metadata{
+		Name:  "sw",
+		Group: "default",
+	}
+	_, err = repo.Publish(event.TopicIndexRule, bus.NewMessage(bus.MessageID(time.Now().UnixNano()), &apiv1.IndexRuleEvent{
+		Series: seriesID,
+		Rules: []*apiv1.IndexRuleEvent_ShardedIndexRule{
+			{
+				ShardId: 0,
+				Rules:   rules,
+			},
+			{
+				ShardId: 1,
+				Rules:   rules,
+			},
+		},
+		Action: apiv1.Action_ACTION_PUT,
+		Time:   timestamppb.Now(),
+	}))
+	tester.NoError(err)
+	s, ok := svc.(*service)
+	tester.True(ok)
+
+	// The series may not be visible right after Publish returns, so poll for
+	// it. assert-style Fail only records the failure and keeps running, so
+	// the loop must be left explicitly or a timeout would spin forever.
+	deadline := time.Now().Add(10 * time.Second)
+	for s.meta.get(seriesID) == nil {
+		if time.Now().After(deadline) {
+			tester.Fail("timeout")
+			break
+		}
+		// Yield between polls instead of burning a core.
+		time.Sleep(10 * time.Millisecond)
+	}
+	return s
+}
diff --git a/banyand/index/search.go b/banyand/index/search.go
index 0c829f6..7d231c0 100644
--- a/banyand/index/search.go
+++ b/banyand/index/search.go
@@ -18,7 +18,12 @@
package index
import (
+ "encoding/base64"
+ "encoding/json"
+ "strings"
+
"github.com/pkg/errors"
+ "github.com/rs/zerolog"
"github.com/apache/skywalking-banyandb/api/common"
apiv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/v1"
@@ -52,7 +57,15 @@ func (s *service) Search(series common.Metadata, shardID uint, startTime, endTim
if errBuild != nil {
return nil, err
}
- return tree.execute()
+ if s.log.Should(zerolog.DebugLevel) {
+ s.log.Debug().Interface("search-tree", tree).Msg("build search tree")
+ }
+
+ result, err := tree.execute()
+ if result == nil {
+ return roaring.EmptyPostingList, err
+ }
+ return result, err
}
func buildSearchTree(searcher tsdb.Searcher, indexObject string, conditions []Condition) (searchTree, error) {
@@ -99,7 +112,7 @@ func buildSearchTree(searcher tsdb.Searcher, indexObject string, conditions []Co
n.addEq(key, [][]byte{v})
}
case apiv1.PairQuery_BINARY_OP_NOT_HAVING:
- n := root.addOrNode(len(cond.Values))
+ n := root.newOrNode(len(cond.Values))
for _, v := range cond.Values {
n.addEq(key, [][]byte{v})
}
@@ -112,10 +125,10 @@ func buildSearchTree(searcher tsdb.Searcher, indexObject string, conditions []Co
func rangeOP(op apiv1.PairQuery_BinaryOp) bool {
switch op {
- case apiv1.PairQuery_BINARY_OP_GT:
- case apiv1.PairQuery_BINARY_OP_GE:
- case apiv1.PairQuery_BINARY_OP_LT:
- case apiv1.PairQuery_BINARY_OP_LE:
+ case apiv1.PairQuery_BINARY_OP_GT,
+ apiv1.PairQuery_BINARY_OP_GE,
+ apiv1.PairQuery_BINARY_OP_LT,
+ apiv1.PairQuery_BINARY_OP_LE:
return true
}
return false
@@ -137,12 +150,11 @@ func toMap(indexObject string, condition []Condition) map[string][]Condition {
}
type logicalOP interface {
+ executable
merge(posting.List) error
}
type node struct {
- logicalOP
- executable
searcher tsdb.Searcher
value posting.List
SubNodes []executable `json:"sub_nodes,omitempty"`
@@ -151,8 +163,8 @@ type node struct {
func (n *node) newEq(key string, values [][]byte) *eq {
return &eq{
leaf: &leaf{
- Key: []byte(key),
- values: values,
+ Key: key,
+ Values: values,
searcher: n.searcher,
},
}
@@ -164,7 +176,7 @@ func (n *node) addEq(key string, values [][]byte) {
func (n *node) addNot(key string, inner executable) {
n.SubNodes = append(n.SubNodes, ¬{
- Key: []byte(key),
+ Key: key,
searcher: n.searcher,
Inner: inner,
})
@@ -173,7 +185,7 @@ func (n *node) addNot(key string, inner executable) {
func (n *node) addRangeLeaf(key string) *rangeOp {
r := &rangeOp{
leaf: &leaf{
- Key: []byte(key),
+ Key: key,
searcher: n.searcher,
},
Opts: &tsdb.RangeOpts{},
@@ -182,13 +194,17 @@ func (n *node) addRangeLeaf(key string) *rangeOp {
return r
}
-func (n *node) addOrNode(size int) *orNode {
- on := &orNode{
+func (n *node) newOrNode(size int) *orNode {
+ return &orNode{
node: &node{
searcher: n.searcher,
SubNodes: make([]executable, 0, size),
},
}
+}
+
+func (n *node) addOrNode(size int) *orNode {
+ on := n.newOrNode(size)
n.SubNodes = append(n.SubNodes, on)
return on
}
@@ -202,7 +218,7 @@ func (n *node) pop() (executable, bool) {
return sn, true
}
-func (n *node) execute() (posting.List, error) {
+func execute(n *node, lp logicalOP) (posting.List, error) {
ex, hasNext := n.pop()
if !hasNext {
return n.value, nil
@@ -213,16 +229,16 @@ func (n *node) execute() (posting.List, error) {
}
if n.value == nil {
n.value = r
- return n.execute()
+ return lp.execute()
}
- err = n.merge(r)
+ err = lp.merge(r)
if err != nil {
return nil, err
}
if n.value.IsEmpty() {
return n.value, nil
}
- return n.execute()
+ return lp.execute()
}
type andNode struct {
@@ -233,6 +249,16 @@ func (an *andNode) merge(list posting.List) error {
return an.value.Intersect(list)
}
+func (an *andNode) execute() (posting.List, error) {
+ return execute(an.node, an)
+}
+
+// MarshalJSON renders the node as a single-key object: {"and": <sub nodes>}.
+func (an *andNode) MarshalJSON() ([]byte, error) {
+	return json.Marshal(map[string]interface{}{
+		"and": an.node.SubNodes,
+	})
+}
+
type orNode struct {
*node
}
@@ -241,22 +267,32 @@ func (on *orNode) merge(list posting.List) error {
return on.value.Union(list)
}
+func (on *orNode) execute() (posting.List, error) {
+ return execute(on.node, on)
+}
+
+// MarshalJSON renders the node as a single-key object: {"or": <sub nodes>}.
+func (on *orNode) MarshalJSON() ([]byte, error) {
+	return json.Marshal(map[string]interface{}{
+		"or": on.node.SubNodes,
+	})
+}
+
type leaf struct {
executable
- Key []byte `json:"Key"`
- values [][]byte
+ Key string
+ Values [][]byte
searcher tsdb.Searcher
}
type not struct {
executable
- Key []byte `json:"key"`
+ Key string
searcher tsdb.Searcher
- Inner executable `json:"inner,omitempty"`
+ Inner executable
}
func (n *not) execute() (posting.List, error) {
- all := n.searcher.MatchField(n.Key)
+ all := n.searcher.MatchField([]byte(n.Key))
list, err := n.Inner.execute()
if err != nil {
return nil, err
@@ -265,22 +301,59 @@ func (n *not) execute() (posting.List, error) {
return all, err
}
+// MarshalJSON renders the negation as {"not": <inner node>}. The key of the
+// negated field is not part of the output.
+func (n *not) MarshalJSON() ([]byte, error) {
+	return json.Marshal(map[string]interface{}{
+		"not": n.Inner,
+	})
+}
+
type eq struct {
*leaf
}
func (eq *eq) execute() (posting.List, error) {
return eq.searcher.MatchTerms(&tsdb.Field{
- Name: eq.Key,
- Value: bytes.Join(eq.values...),
+ Name: []byte(eq.Key),
+ Value: bytes.Join(eq.Values...),
}), nil
}
+// MarshalJSON renders the equality leaf as {"eq": <leaf>}.
+func (eq *eq) MarshalJSON() ([]byte, error) {
+	wrapped := map[string]interface{}{"eq": eq.leaf}
+	return json.Marshal(wrapped)
+}
+
type rangeOp struct {
*leaf
- Opts *tsdb.RangeOpts `json:"opts"`
+ Opts *tsdb.RangeOpts
}
func (r *rangeOp) execute() (posting.List, error) {
- return r.searcher.Range(r.Key, r.Opts), nil
+ return r.searcher.Range([]byte(r.Key), r.Opts), nil
+}
+
+// MarshalJSON renders the range leaf as {"key": ..., "range": ...}. The range
+// string is "<lower>,<upper>" with both bounds base64-encoded. A bound that
+// is set gets a bracket: "[" / "]" when it is inclusive, "(" / ")" when it is
+// exclusive. A nil bound gets no bracket and encodes to the empty string.
+func (r *rangeOp) MarshalJSON() ([]byte, error) {
+	lower, upper := r.Opts.Lower, r.Opts.Upper
+	var sb strings.Builder
+	switch {
+	case lower == nil:
+		// No lower bound: leave the opening bracket out.
+	case r.Opts.IncludesLower:
+		sb.WriteString("[")
+	default:
+		sb.WriteString("(")
+	}
+	sb.WriteString(base64.StdEncoding.EncodeToString(lower))
+	sb.WriteString(",")
+	sb.WriteString(base64.StdEncoding.EncodeToString(upper))
+	switch {
+	case upper == nil:
+		// No upper bound: leave the closing bracket out.
+	case r.Opts.IncludesUpper:
+		sb.WriteString("]")
+	default:
+		sb.WriteString(")")
+	}
+	return json.Marshal(map[string]interface{}{
+		"key":   r.Key,
+		"range": sb.String(),
+	})
+}
diff --git a/banyand/index/search_test.go b/banyand/index/search_test.go
new file mode 100644
index 0000000..d9eb8d9
--- /dev/null
+++ b/banyand/index/search_test.go
@@ -0,0 +1,255 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package index
+
+import (
+ "math"
+ "testing"
+
+ "github.com/stretchr/testify/assert"
+
+ "github.com/apache/skywalking-banyandb/api/common"
+ apiv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/v1"
+ "github.com/apache/skywalking-banyandb/pkg/convert"
+ "github.com/apache/skywalking-banyandb/pkg/posting"
+ "github.com/apache/skywalking-banyandb/pkg/posting/roaring"
+)
+
+// Test_service_Search runs Search against shard 0 of series "sw"/"default",
+// populated by setupData, and compares the returned posting list with the
+// chunk IDs expected for each operator (EQ, NE, HAVING, NOT_HAVING and the
+// GT/GE/LT/LE range combinations) on a string and an integer index object.
+func Test_service_Search(t *testing.T) {
+	tester := assert.New(t)
+	type args struct {
+		indexObjectName string
+		conditions      []Condition
+	}
+	tests := []struct {
+		name    string
+		args    args
+		want    posting.List
+		wantErr bool
+	}{
+		{
+			name: "str equal",
+			args: args{
+				indexObjectName: "endpoint",
+				conditions: []Condition{
+					{
+						Key:    "endpoint",
+						Op:     apiv1.PairQuery_BINARY_OP_EQ,
+						Values: [][]byte{[]byte("/product")},
+					},
+				},
+			},
+			want: roaring.NewPostingListWithInitialData(1),
+		},
+		{
+			name: "str not equal",
+			args: args{
+				indexObjectName: "endpoint",
+				conditions: []Condition{
+					{
+						Key:    "endpoint",
+						Op:     apiv1.PairQuery_BINARY_OP_NE,
+						Values: [][]byte{[]byte("/product")},
+					},
+				},
+			},
+			want: roaring.NewPostingListWithInitialData(2, 3),
+		},
+		{
+			name: "str having",
+			args: args{
+				indexObjectName: "endpoint",
+				conditions: []Condition{
+					{
+						Key:    "endpoint",
+						Op:     apiv1.PairQuery_BINARY_OP_HAVING,
+						Values: [][]byte{[]byte("/product"), []byte("/sales")},
+					},
+				},
+			},
+			want: roaring.NewPostingListWithInitialData(1, 3),
+		},
+		{
+			name: "str not having",
+			args: args{
+				indexObjectName: "endpoint",
+				conditions: []Condition{
+					{
+						Key:    "endpoint",
+						Op:     apiv1.PairQuery_BINARY_OP_NOT_HAVING,
+						Values: [][]byte{[]byte("/product"), []byte("/sales")},
+					},
+				},
+			},
+			want: roaring.NewPostingListWithInitialData(2),
+		},
+		{
+			name: "int equal",
+			args: args{
+				indexObjectName: "duration",
+				conditions: []Condition{
+					{
+						Key:    "duration",
+						Op:     apiv1.PairQuery_BINARY_OP_EQ,
+						Values: [][]byte{convert.Int64ToBytes(500)},
+					},
+				},
+			},
+			want: roaring.NewPostingListWithInitialData(12),
+		},
+		{
+			name: "int not equal",
+			args: args{
+				indexObjectName: "duration",
+				conditions: []Condition{
+					{
+						Key:    "duration",
+						Op:     apiv1.PairQuery_BINARY_OP_NE,
+						Values: [][]byte{convert.Int64ToBytes(500)},
+					},
+				},
+			},
+			want: roaring.NewPostingListWithInitialData(11, 13, 14),
+		},
+		{
+			name: "int having",
+			args: args{
+				indexObjectName: "duration",
+				conditions: []Condition{
+					{
+						Key:    "duration",
+						Op:     apiv1.PairQuery_BINARY_OP_HAVING,
+						Values: [][]byte{convert.Int64ToBytes(500), convert.Int64ToBytes(50)},
+					},
+				},
+			},
+			want: roaring.NewPostingListWithInitialData(11, 12),
+		},
+		{
+			name: "int not having",
+			args: args{
+				indexObjectName: "duration",
+				conditions: []Condition{
+					{
+						Key:    "duration",
+						Op:     apiv1.PairQuery_BINARY_OP_NOT_HAVING,
+						Values: [][]byte{convert.Int64ToBytes(500), convert.Int64ToBytes(50)},
+					},
+				},
+			},
+			want: roaring.NewPostingListWithInitialData(13, 14),
+		},
+		{
+			name: "int in range",
+			args: args{
+				indexObjectName: "duration",
+				conditions: []Condition{
+					{
+						Key:    "duration",
+						Op:     apiv1.PairQuery_BINARY_OP_GT,
+						Values: [][]byte{convert.Int64ToBytes(50)},
+					},
+					{
+						Key:    "duration",
+						Op:     apiv1.PairQuery_BINARY_OP_LT,
+						Values: [][]byte{convert.Int64ToBytes(5000)},
+					},
+				},
+			},
+			want: roaring.NewPostingListWithInitialData(13, 12),
+		},
+		{
+			name: "int includes edges",
+			args: args{
+				indexObjectName: "duration",
+				conditions: []Condition{
+					{
+						Key:    "duration",
+						Op:     apiv1.PairQuery_BINARY_OP_GE,
+						Values: [][]byte{convert.Int64ToBytes(50)},
+					},
+					{
+						Key:    "duration",
+						Op:     apiv1.PairQuery_BINARY_OP_LE,
+						Values: [][]byte{convert.Int64ToBytes(5000)},
+					},
+				},
+			},
+			want: roaring.NewPostingListWithInitialData(13, 12, 11, 14),
+		},
+	}
+	// All cases share one service instance and one data set.
+	s := setUpModules(tester)
+	setupData(tester, s)
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			got, err := s.Search(*common.NewMetadataByNameAndGroup("sw", "default"), 0, 0, math.MaxInt64, tt.args.indexObjectName, tt.args.conditions)
+			if (err != nil) != tt.wantErr {
+				t.Errorf("Search() error = %v, wantErr %v", err, tt.wantErr)
+				return
+			}
+			if err != nil {
+				// Expected failure: Search can return a nil list alongside
+				// the error, so there is nothing to compare.
+				return
+			}
+			if !got.Equal(tt.want) {
+				t.Errorf("Search() got = %v, want %v", got.ToSlice(), tt.want.ToSlice())
+			}
+		})
+	}
+}
+
+// setupData inserts the search fixture into shard 0 of series "sw"/"default":
+// three "endpoint" values under chunk IDs 1-3 and four "duration" values
+// under chunk IDs 11-14. Any insert error is recorded on tester.
+func setupData(tester *assert.Assertions, s *service) {
+	fixture := []*Field{
+		{
+			ChunkID: common.ChunkID(1),
+			Name:    "endpoint",
+			Value:   []byte("/product"),
+		},
+		{
+			ChunkID: common.ChunkID(2),
+			Name:    "endpoint",
+			Value:   []byte("/home"),
+		},
+		{
+			ChunkID: common.ChunkID(3),
+			Name:    "endpoint",
+			Value:   []byte("/sales"),
+		},
+		{
+			ChunkID: common.ChunkID(11),
+			Name:    "duration",
+			Value:   convert.Int64ToBytes(50),
+		},
+		{
+			ChunkID: common.ChunkID(12),
+			Name:    "duration",
+			Value:   convert.Int64ToBytes(500),
+		},
+		{
+			ChunkID: common.ChunkID(13),
+			Name:    "duration",
+			Value:   convert.Int64ToBytes(100),
+		},
+		{
+			ChunkID: common.ChunkID(14),
+			Name:    "duration",
+			Value:   convert.Int64ToBytes(5000),
+		},
+	}
+	for idx := range fixture {
+		// NoError is a no-op for a nil error, so no explicit guard is needed.
+		tester.NoError(s.Insert(*common.NewMetadataByNameAndGroup("sw", "default"), 0, fixture[idx]))
+	}
+}
diff --git a/banyand/index/tsdb/mem.go b/banyand/index/tsdb/mem.go
index a96b107..2e2ea9b 100644
--- a/banyand/index/tsdb/mem.go
+++ b/banyand/index/tsdb/mem.go
@@ -79,7 +79,7 @@ func (m *MemTable) MatchTerms(field *Field) (list posting.List) {
if !ok {
return roaring.EmptyPostingList
}
- return fieldsValues.value.get(field.Value)
+ return fieldsValues.value.get(field.Value).Clone()
}
func (m *MemTable) Range(fieldName []byte, opts *RangeOpts) (list posting.List) {
diff --git a/banyand/index/tsdb/term_map.go b/banyand/index/tsdb/term_map.go
index f325282..20bc492 100644
--- a/banyand/index/tsdb/term_map.go
+++ b/banyand/index/tsdb/term_map.go
@@ -92,6 +92,8 @@ func (p *postingMap) getRange(opts *RangeOpts) posting.List {
}
return roaring.EmptyPostingList
}
+ p.mutex.RLock()
+ defer p.mutex.RUnlock()
keys := make(Asc, 0, len(p.repo))
for _, v := range p.repo {
keys = append(keys, v.key)
diff --git a/banyand/query/processor_test.go b/banyand/query/processor_test.go
index 64e1bb3..7136e0e 100644
--- a/banyand/query/processor_test.go
+++ b/banyand/query/processor_test.go
@@ -24,6 +24,7 @@ import (
"testing"
"time"
+ "github.com/golang/mock/gomock"
googleUUID "github.com/google/uuid"
"github.com/stretchr/testify/require"
@@ -31,6 +32,7 @@ import (
"github.com/apache/skywalking-banyandb/api/event"
v1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/v1"
"github.com/apache/skywalking-banyandb/banyand/discovery"
+ "github.com/apache/skywalking-banyandb/banyand/index"
"github.com/apache/skywalking-banyandb/banyand/series"
"github.com/apache/skywalking-banyandb/banyand/series/trace"
"github.com/apache/skywalking-banyandb/banyand/storage"
@@ -49,7 +51,7 @@ type entityValue struct {
items []interface{}
}
-func setupServices(tester *require.Assertions) (discovery.ServiceRepo, series.Service, func()) {
+func setupServices(t *testing.T, tester *require.Assertions) (discovery.ServiceRepo, series.Service, func()) {
// Bootstrap logger system
tester.NoError(logger.Init(logger.Logging{
Env: "dev",
@@ -70,7 +72,10 @@ func setupServices(tester *require.Assertions) (discovery.ServiceRepo, series.Se
tester.NoError(db.FlagSet().Parse([]string{"--root-path=" + rootPath}))
// Init `Trace` module
- traceSvc, err := trace.NewService(context.TODO(), db, repo, nil)
+ ctrl := gomock.NewController(t)
+ mockIndex := index.NewMockService(ctrl)
+ mockIndex.EXPECT().Insert(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes()
+ traceSvc, err := trace.NewService(context.TODO(), db, repo, mockIndex)
tester.NoError(err)
// Init `Query` module
@@ -257,7 +262,7 @@ func TestQueryProcessor(t *testing.T) {
tester := require.New(t)
// setup services
- repo, traceSvc, gracefulStop := setupServices(tester)
+ repo, traceSvc, gracefulStop := setupServices(t, tester)
defer gracefulStop()
baseTs := time.Now()
diff --git a/pkg/convert/number.go b/pkg/convert/number.go
index c06d7a4..fa2d688 100644
--- a/pkg/convert/number.go
+++ b/pkg/convert/number.go
@@ -43,6 +43,10 @@ func Uint32ToBytes(u uint32) []byte {
return bs
}
+// BytesToInt64 decodes the leading eight bytes of b as a big-endian unsigned
+// integer and reinterprets the result as an int64.
+func BytesToInt64(b []byte) int64 {
+	raw := binary.BigEndian.Uint64(b)
+	return int64(raw)
+}
+
func BytesToUint64(b []byte) uint64 {
return binary.BigEndian.Uint64(b)
}
diff --git a/pkg/logger/logger.go b/pkg/logger/logger.go
index 1a08dc5..0651e41 100644
--- a/pkg/logger/logger.go
+++ b/pkg/logger/logger.go
@@ -40,3 +40,10 @@ func (l *Logger) Named(name string) *Logger {
subLogger := root.Logger.With().Str("module", module).Logger()
return &Logger{module: module, Logger: &subLogger}
}
+
+// Should reports whether lvl is at or above both this logger's own level and
+// zerolog's global level.
+func (l *Logger) Should(lvl zerolog.Level) bool {
+	return lvl >= l.GetLevel() && lvl >= zerolog.GlobalLevel()
+}
diff --git a/pkg/posting/posting.go b/pkg/posting/posting.go
index 3812162..2d7ad16 100644
--- a/pkg/posting/posting.go
+++ b/pkg/posting/posting.go
@@ -58,6 +58,8 @@ type List interface {
RemoveRange(min, max common.ChunkID) error
Reset()
+
+ ToSlice() []common.ChunkID
}
type Iterator interface {
diff --git a/pkg/posting/roaring/roaring.go b/pkg/posting/roaring/roaring.go
index 0593813..0b1c9b6 100644
--- a/pkg/posting/roaring/roaring.go
+++ b/pkg/posting/roaring/roaring.go
@@ -194,3 +194,13 @@ func (it *roaringIterator) Close() error {
it.closed = true
return nil
}
+
+// ToSlice walks the list's iterator and collects every chunk ID into a new
+// slice, in iteration order. The iterator is closed before returning.
+func (p *postingsList) ToSlice() []common.ChunkID {
+	it := p.Iterator()
+	defer it.Close()
+	ids := make([]common.ChunkID, 0, p.Len())
+	for more := it.Next(); more; more = it.Next() {
+		ids = append(ids, it.Current())
+	}
+	return ids
+}