You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by ha...@apache.org on 2021/08/03 13:53:21 UTC

[skywalking-banyandb] 05/05: Test search

This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch index-mem
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git

commit 6e6081647be408fc825594a6bbeb2c4c4f483688
Author: Gao Hongtao <ha...@gmail.com>
AuthorDate: Tue Aug 3 21:07:15 2021 +0800

    Test search
    
    Signed-off-by: Gao Hongtao <ha...@gmail.com>
---
 api/common/metadata.go          |  10 ++
 banyand/index/index.go          |  13 +-
 banyand/index/index_test.go     | 185 +++++++++++++++++++++++++++++
 banyand/index/search.go         | 127 +++++++++++++++-----
 banyand/index/search_test.go    | 255 ++++++++++++++++++++++++++++++++++++++++
 banyand/index/tsdb/mem.go       |   2 +-
 banyand/index/tsdb/term_map.go  |   2 +
 banyand/query/processor_test.go |  11 +-
 pkg/convert/number.go           |   4 +
 pkg/logger/logger.go            |   7 ++
 pkg/posting/posting.go          |   2 +
 pkg/posting/roaring/roaring.go  |  10 ++
 12 files changed, 596 insertions(+), 32 deletions(-)

diff --git a/api/common/metadata.go b/api/common/metadata.go
index 1facc9e..8706aaf 100644
--- a/api/common/metadata.go
+++ b/api/common/metadata.go
@@ -40,3 +40,13 @@ func NewMetadata(spec *v1.Metadata) *Metadata {
 		Spec:        spec,
 	}
 }
+
+func NewMetadataByNameAndGroup(name, group string) *Metadata {
+	return &Metadata{
+		KindVersion: MetadataKindVersion,
+		Spec: &v1.Metadata{
+			Name:  name,
+			Group: group,
+		},
+	}
+}
diff --git a/banyand/index/index.go b/banyand/index/index.go
index c09f27f..00f2861 100644
--- a/banyand/index/index.go
+++ b/banyand/index/index.go
@@ -160,6 +160,16 @@ type indexMeta struct {
 	sync.RWMutex
 }
 
+func (i *indexMeta) get(series *apiv1.Metadata) *series {
+	i.RWMutex.RLock()
+	defer i.RWMutex.RUnlock()
+	s, ok := i.meta[compositeSeriesID(series)]
+	if ok {
+		return s
+	}
+	return nil
+}
+
 type indexRuleListener struct {
 	log       *logger.Logger
 	indexMeta *indexMeta
@@ -174,7 +184,8 @@ func (i *indexRuleListener) Rev(message bus.Message) (resp bus.Message) {
 	}
 	i.log.Info().
 		Str("action", apiv1.Action_name[int32(indexRuleEvent.Action)]).
-		Str("series", indexRuleEvent.Series.String()).
+		Str("series-name", indexRuleEvent.Series.Name).
+		Str("series-group", indexRuleEvent.Series.Group).
 		Msg("received an index rule")
 	i.indexMeta.Lock()
 	defer i.indexMeta.Unlock()
diff --git a/banyand/index/index_test.go b/banyand/index/index_test.go
new file mode 100644
index 0000000..44a89af
--- /dev/null
+++ b/banyand/index/index_test.go
@@ -0,0 +1,185 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package index
+
+import (
+	"context"
+	"math"
+	"testing"
+	"time"
+
+	"github.com/stretchr/testify/assert"
+	"google.golang.org/protobuf/types/known/timestamppb"
+
+	"github.com/apache/skywalking-banyandb/api/common"
+	"github.com/apache/skywalking-banyandb/api/event"
+	apiv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/v1"
+	"github.com/apache/skywalking-banyandb/banyand/discovery"
+	"github.com/apache/skywalking-banyandb/pkg/bus"
+	"github.com/apache/skywalking-banyandb/pkg/convert"
+	"github.com/apache/skywalking-banyandb/pkg/logger"
+)
+
+func Test_service_Insert(t *testing.T) {
+	tester := assert.New(t)
+	type args struct {
+		series  common.Metadata
+		shardID uint
+		field   *Field
+	}
+	tests := []struct {
+		name    string
+		args    args
+		wantErr bool
+	}{
+		{
+			name: "str field",
+			args: args{
+				series:  *common.NewMetadataByNameAndGroup("sw", "default"),
+				shardID: 0,
+				field: &Field{
+					ChunkID: common.ChunkID(1),
+					Name:    "endpoint",
+					Value:   []byte("/test"),
+				},
+			},
+		},
+		{
+			name: "int field",
+			args: args{
+				series:  *common.NewMetadataByNameAndGroup("sw", "default"),
+				shardID: 1,
+				field: &Field{
+					ChunkID: common.ChunkID(2),
+					Name:    "duration",
+					Value:   convert.Int64ToBytes(500),
+				},
+			},
+		},
+		{
+			name: "unknown series",
+			args: args{
+				series:  *common.NewMetadataByNameAndGroup("unknown", "default"),
+				shardID: 0,
+				field: &Field{
+					ChunkID: common.ChunkID(2),
+					Name:    "duration",
+					Value:   convert.Int64ToBytes(500),
+				},
+			},
+			wantErr: true,
+		},
+		{
+			name: "unknown shard",
+			args: args{
+				series:  *common.NewMetadataByNameAndGroup("sw", "default"),
+				shardID: math.MaxInt64,
+				field: &Field{
+					ChunkID: common.ChunkID(2),
+					Name:    "duration",
+					Value:   convert.Int64ToBytes(500),
+				},
+			},
+			wantErr: true,
+		},
+		{
+			name: "unknown field",
+			args: args{
+				series:  *common.NewMetadataByNameAndGroup("sw", "default"),
+				shardID: 0,
+				field: &Field{
+					ChunkID: common.ChunkID(2),
+					Name:    "unknown",
+					Value:   convert.Int64ToBytes(500),
+				},
+			},
+			wantErr: true,
+		},
+	}
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			s := setUpModules(tester)
+			if err := s.Insert(tt.args.series, tt.args.shardID, tt.args.field); (err != nil) != tt.wantErr {
+				t.Errorf("Insert() error = %v, wantErr %v", err, tt.wantErr)
+			}
+		})
+	}
+}
+
+func Test_service_Init(t *testing.T) {
+	tester := assert.New(t)
+	s := setUpModules(tester)
+	tester.Equal(1, len(s.meta.meta))
+	tester.Equal(2, len(s.meta.meta["sw-default"].repo))
+}
+
+func setUpModules(tester *assert.Assertions) *service {
+	_ = logger.Bootstrap()
+	repo, err := discovery.NewServiceRepo(context.TODO())
+	tester.NoError(err)
+	svc, err := NewService(context.TODO(), repo)
+	tester.NoError(err)
+	tester.NoError(svc.PreRun())
+
+	rules := []*apiv1.IndexRule{
+		{
+			Objects: []*apiv1.IndexObject{
+				{
+					Name:   "endpoint",
+					Fields: []string{"endpoint"},
+				},
+				{
+					Name:   "duration",
+					Fields: []string{"duration"},
+				},
+			},
+		},
+	}
+	seriesID := &apiv1.Metadata{
+		Name:  "sw",
+		Group: "default",
+	}
+	_, err = repo.Publish(event.TopicIndexRule, bus.NewMessage(bus.MessageID(time.Now().UnixNano()), &apiv1.IndexRuleEvent{
+		Series: seriesID,
+		Rules: []*apiv1.IndexRuleEvent_ShardedIndexRule{
+			{
+				ShardId: 0,
+				Rules:   rules,
+			},
+			{
+				ShardId: 1,
+				Rules:   rules,
+			},
+		},
+		Action: apiv1.Action_ACTION_PUT,
+		Time:   timestamppb.Now(),
+	}))
+	tester.NoError(err)
+	s, ok := svc.(*service)
+	tester.True(ok)
+	deadline := time.Now().Add(10 * time.Second)
+	for {
+		if s.meta.get(seriesID) != nil {
+			break
+		}
+		if time.Now().After(deadline) {
+			tester.Fail("timeout")
+		}
+	}
+	return s
+}
diff --git a/banyand/index/search.go b/banyand/index/search.go
index 0c829f6..7d231c0 100644
--- a/banyand/index/search.go
+++ b/banyand/index/search.go
@@ -18,7 +18,12 @@
 package index
 
 import (
+	"encoding/base64"
+	"encoding/json"
+	"strings"
+
 	"github.com/pkg/errors"
+	"github.com/rs/zerolog"
 
 	"github.com/apache/skywalking-banyandb/api/common"
 	apiv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/v1"
@@ -52,7 +57,15 @@ func (s *service) Search(series common.Metadata, shardID uint, startTime, endTim
 	if errBuild != nil {
 		return nil, err
 	}
-	return tree.execute()
+	if s.log.Should(zerolog.DebugLevel) {
+		s.log.Debug().Interface("search-tree", tree).Msg("build search tree")
+	}
+
+	result, err := tree.execute()
+	if result == nil {
+		return roaring.EmptyPostingList, err
+	}
+	return result, err
 }
 
 func buildSearchTree(searcher tsdb.Searcher, indexObject string, conditions []Condition) (searchTree, error) {
@@ -99,7 +112,7 @@ func buildSearchTree(searcher tsdb.Searcher, indexObject string, conditions []Co
 					n.addEq(key, [][]byte{v})
 				}
 			case apiv1.PairQuery_BINARY_OP_NOT_HAVING:
-				n := root.addOrNode(len(cond.Values))
+				n := root.newOrNode(len(cond.Values))
 				for _, v := range cond.Values {
 					n.addEq(key, [][]byte{v})
 				}
@@ -112,10 +125,10 @@ func buildSearchTree(searcher tsdb.Searcher, indexObject string, conditions []Co
 
 func rangeOP(op apiv1.PairQuery_BinaryOp) bool {
 	switch op {
-	case apiv1.PairQuery_BINARY_OP_GT:
-	case apiv1.PairQuery_BINARY_OP_GE:
-	case apiv1.PairQuery_BINARY_OP_LT:
-	case apiv1.PairQuery_BINARY_OP_LE:
+	case apiv1.PairQuery_BINARY_OP_GT,
+		apiv1.PairQuery_BINARY_OP_GE,
+		apiv1.PairQuery_BINARY_OP_LT,
+		apiv1.PairQuery_BINARY_OP_LE:
 		return true
 	}
 	return false
@@ -137,12 +150,11 @@ func toMap(indexObject string, condition []Condition) map[string][]Condition {
 }
 
 type logicalOP interface {
+	executable
 	merge(posting.List) error
 }
 
 type node struct {
-	logicalOP
-	executable
 	searcher tsdb.Searcher
 	value    posting.List
 	SubNodes []executable `json:"sub_nodes,omitempty"`
@@ -151,8 +163,8 @@ type node struct {
 func (n *node) newEq(key string, values [][]byte) *eq {
 	return &eq{
 		leaf: &leaf{
-			Key:      []byte(key),
-			values:   values,
+			Key:      key,
+			Values:   values,
 			searcher: n.searcher,
 		},
 	}
@@ -164,7 +176,7 @@ func (n *node) addEq(key string, values [][]byte) {
 
 func (n *node) addNot(key string, inner executable) {
 	n.SubNodes = append(n.SubNodes, &not{
-		Key:      []byte(key),
+		Key:      key,
 		searcher: n.searcher,
 		Inner:    inner,
 	})
@@ -173,7 +185,7 @@ func (n *node) addNot(key string, inner executable) {
 func (n *node) addRangeLeaf(key string) *rangeOp {
 	r := &rangeOp{
 		leaf: &leaf{
-			Key:      []byte(key),
+			Key:      key,
 			searcher: n.searcher,
 		},
 		Opts: &tsdb.RangeOpts{},
@@ -182,13 +194,17 @@ func (n *node) addRangeLeaf(key string) *rangeOp {
 	return r
 }
 
-func (n *node) addOrNode(size int) *orNode {
-	on := &orNode{
+func (n *node) newOrNode(size int) *orNode {
+	return &orNode{
 		node: &node{
 			searcher: n.searcher,
 			SubNodes: make([]executable, 0, size),
 		},
 	}
+}
+
+func (n *node) addOrNode(size int) *orNode {
+	on := n.newOrNode(size)
 	n.SubNodes = append(n.SubNodes, on)
 	return on
 }
@@ -202,7 +218,7 @@ func (n *node) pop() (executable, bool) {
 	return sn, true
 }
 
-func (n *node) execute() (posting.List, error) {
+func execute(n *node, lp logicalOP) (posting.List, error) {
 	ex, hasNext := n.pop()
 	if !hasNext {
 		return n.value, nil
@@ -213,16 +229,16 @@ func (n *node) execute() (posting.List, error) {
 	}
 	if n.value == nil {
 		n.value = r
-		return n.execute()
+		return lp.execute()
 	}
-	err = n.merge(r)
+	err = lp.merge(r)
 	if err != nil {
 		return nil, err
 	}
 	if n.value.IsEmpty() {
 		return n.value, nil
 	}
-	return n.execute()
+	return lp.execute()
 }
 
 type andNode struct {
@@ -233,6 +249,16 @@ func (an *andNode) merge(list posting.List) error {
 	return an.value.Intersect(list)
 }
 
+func (an *andNode) execute() (posting.List, error) {
+	return execute(an.node, an)
+}
+
+func (an *andNode) MarshalJSON() ([]byte, error) {
+	data := make(map[string]interface{}, 1)
+	data["and"] = an.node.SubNodes
+	return json.Marshal(data)
+}
+
 type orNode struct {
 	*node
 }
@@ -241,22 +267,32 @@ func (on *orNode) merge(list posting.List) error {
 	return on.value.Union(list)
 }
 
+func (on *orNode) execute() (posting.List, error) {
+	return execute(on.node, on)
+}
+
+func (on *orNode) MarshalJSON() ([]byte, error) {
+	data := make(map[string]interface{}, 1)
+	data["or"] = on.node.SubNodes
+	return json.Marshal(data)
+}
+
 type leaf struct {
 	executable
-	Key      []byte `json:"Key"`
-	values   [][]byte
+	Key      string
+	Values   [][]byte
 	searcher tsdb.Searcher
 }
 
 type not struct {
 	executable
-	Key      []byte `json:"key"`
+	Key      string
 	searcher tsdb.Searcher
-	Inner    executable `json:"inner,omitempty"`
+	Inner    executable
 }
 
 func (n *not) execute() (posting.List, error) {
-	all := n.searcher.MatchField(n.Key)
+	all := n.searcher.MatchField([]byte(n.Key))
 	list, err := n.Inner.execute()
 	if err != nil {
 		return nil, err
@@ -265,22 +301,59 @@ func (n *not) execute() (posting.List, error) {
 	return all, err
 }
 
+func (n *not) MarshalJSON() ([]byte, error) {
+	data := make(map[string]interface{}, 1)
+	data["not"] = n.Inner
+	return json.Marshal(data)
+}
+
 type eq struct {
 	*leaf
 }
 
 func (eq *eq) execute() (posting.List, error) {
 	return eq.searcher.MatchTerms(&tsdb.Field{
-		Name:  eq.Key,
-		Value: bytes.Join(eq.values...),
+		Name:  []byte(eq.Key),
+		Value: bytes.Join(eq.Values...),
 	}), nil
 }
 
+func (eq *eq) MarshalJSON() ([]byte, error) {
+	data := make(map[string]interface{}, 1)
+	data["eq"] = eq.leaf
+	return json.Marshal(data)
+}
+
 type rangeOp struct {
 	*leaf
-	Opts *tsdb.RangeOpts `json:"opts"`
+	Opts *tsdb.RangeOpts
 }
 
 func (r *rangeOp) execute() (posting.List, error) {
-	return r.searcher.Range(r.Key, r.Opts), nil
+	return r.searcher.Range([]byte(r.Key), r.Opts), nil
+}
+
+func (r *rangeOp) MarshalJSON() ([]byte, error) {
+	data := make(map[string]interface{}, 1)
+	var builder strings.Builder
+	if r.Opts.Lower != nil {
+		if r.Opts.IncludesLower {
+			builder.WriteString("[")
+		} else {
+			builder.WriteString("(")
+		}
+	}
+	builder.WriteString(base64.StdEncoding.EncodeToString(r.Opts.Lower))
+	builder.WriteString(",")
+	builder.WriteString(base64.StdEncoding.EncodeToString(r.Opts.Upper))
+	if r.Opts.Upper != nil {
+		if r.Opts.IncludesUpper {
+			builder.WriteString("]")
+		} else {
+			builder.WriteString(")")
+		}
+	}
+	data["key"] = r.Key
+	data["range"] = builder.String()
+	return json.Marshal(data)
 }
diff --git a/banyand/index/search_test.go b/banyand/index/search_test.go
new file mode 100644
index 0000000..d9eb8d9
--- /dev/null
+++ b/banyand/index/search_test.go
@@ -0,0 +1,255 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package index
+
+import (
+	"math"
+	"testing"
+
+	"github.com/stretchr/testify/assert"
+
+	"github.com/apache/skywalking-banyandb/api/common"
+	apiv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/v1"
+	"github.com/apache/skywalking-banyandb/pkg/convert"
+	"github.com/apache/skywalking-banyandb/pkg/posting"
+	"github.com/apache/skywalking-banyandb/pkg/posting/roaring"
+)
+
+func Test_service_Search(t *testing.T) {
+	tester := assert.New(t)
+	type args struct {
+		indexObjectName string
+		conditions      []Condition
+	}
+	tests := []struct {
+		name    string
+		args    args
+		want    posting.List
+		wantErr bool
+	}{
+		{
+			name: "str equal",
+			args: args{
+				indexObjectName: "endpoint",
+				conditions: []Condition{
+					{
+						Key:    "endpoint",
+						Op:     apiv1.PairQuery_BINARY_OP_EQ,
+						Values: [][]byte{[]byte("/product")},
+					},
+				},
+			},
+			want: roaring.NewPostingListWithInitialData(1),
+		},
+		{
+			name: "str not equal",
+			args: args{
+				indexObjectName: "endpoint",
+				conditions: []Condition{
+					{
+						Key:    "endpoint",
+						Op:     apiv1.PairQuery_BINARY_OP_NE,
+						Values: [][]byte{[]byte("/product")},
+					},
+				},
+			},
+			want: roaring.NewPostingListWithInitialData(2, 3),
+		},
+		{
+			name: "str having",
+			args: args{
+				indexObjectName: "endpoint",
+				conditions: []Condition{
+					{
+						Key:    "endpoint",
+						Op:     apiv1.PairQuery_BINARY_OP_HAVING,
+						Values: [][]byte{[]byte("/product"), []byte("/sales")},
+					},
+				},
+			},
+			want: roaring.NewPostingListWithInitialData(1, 3),
+		},
+		{
+			name: "str not having",
+			args: args{
+				indexObjectName: "endpoint",
+				conditions: []Condition{
+					{
+						Key:    "endpoint",
+						Op:     apiv1.PairQuery_BINARY_OP_NOT_HAVING,
+						Values: [][]byte{[]byte("/product"), []byte("/sales")},
+					},
+				},
+			},
+			want: roaring.NewPostingListWithInitialData(2),
+		},
+		{
+			name: "int equal",
+			args: args{
+				indexObjectName: "duration",
+				conditions: []Condition{
+					{
+						Key:    "duration",
+						Op:     apiv1.PairQuery_BINARY_OP_EQ,
+						Values: [][]byte{convert.Int64ToBytes(500)},
+					},
+				},
+			},
+			want: roaring.NewPostingListWithInitialData(12),
+		},
+		{
+			name: "int not equal",
+			args: args{
+				indexObjectName: "duration",
+				conditions: []Condition{
+					{
+						Key:    "duration",
+						Op:     apiv1.PairQuery_BINARY_OP_NE,
+						Values: [][]byte{convert.Int64ToBytes(500)},
+					},
+				},
+			},
+			want: roaring.NewPostingListWithInitialData(11, 13, 14),
+		},
+		{
+			name: "int having",
+			args: args{
+				indexObjectName: "duration",
+				conditions: []Condition{
+					{
+						Key:    "duration",
+						Op:     apiv1.PairQuery_BINARY_OP_HAVING,
+						Values: [][]byte{convert.Int64ToBytes(500), convert.Int64ToBytes(50)},
+					},
+				},
+			},
+			want: roaring.NewPostingListWithInitialData(11, 12),
+		},
+		{
+			name: "int not having",
+			args: args{
+				indexObjectName: "duration",
+				conditions: []Condition{
+					{
+						Key:    "duration",
+						Op:     apiv1.PairQuery_BINARY_OP_NOT_HAVING,
+						Values: [][]byte{convert.Int64ToBytes(500), convert.Int64ToBytes(50)},
+					},
+				},
+			},
+			want: roaring.NewPostingListWithInitialData(13, 14),
+		},
+		{
+			name: "int in range",
+			args: args{
+				indexObjectName: "duration",
+				conditions: []Condition{
+					{
+						Key:    "duration",
+						Op:     apiv1.PairQuery_BINARY_OP_GT,
+						Values: [][]byte{convert.Int64ToBytes(50)},
+					},
+					{
+						Key:    "duration",
+						Op:     apiv1.PairQuery_BINARY_OP_LT,
+						Values: [][]byte{convert.Int64ToBytes(5000)},
+					},
+				},
+			},
+			want: roaring.NewPostingListWithInitialData(13, 12),
+		},
+		{
+			name: "int includes edges",
+			args: args{
+				indexObjectName: "duration",
+				conditions: []Condition{
+					{
+						Key:    "duration",
+						Op:     apiv1.PairQuery_BINARY_OP_GE,
+						Values: [][]byte{convert.Int64ToBytes(50)},
+					},
+					{
+						Key:    "duration",
+						Op:     apiv1.PairQuery_BINARY_OP_LE,
+						Values: [][]byte{convert.Int64ToBytes(5000)},
+					},
+				},
+			},
+			want: roaring.NewPostingListWithInitialData(13, 12, 11, 14),
+		},
+	}
+	s := setUpModules(tester)
+	setupData(tester, s)
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			got, err := s.Search(*common.NewMetadataByNameAndGroup("sw", "default"), 0, 0, math.MaxInt64, tt.args.indexObjectName, tt.args.conditions)
+			if (err != nil) != tt.wantErr {
+				t.Errorf("Search() error = %v, wantErr %v", err, tt.wantErr)
+				return
+			}
+			if !got.Equal(tt.want) {
+				t.Errorf("Search() got = %v, want %v", got.ToSlice(), tt.want.ToSlice())
+			}
+		})
+	}
+}
+
+func setupData(tester *assert.Assertions, s *service) {
+	fields := []*Field{
+		{
+			ChunkID: common.ChunkID(1),
+			Name:    "endpoint",
+			Value:   []byte("/product"),
+		},
+		{
+			ChunkID: common.ChunkID(2),
+			Name:    "endpoint",
+			Value:   []byte("/home"),
+		},
+		{
+			ChunkID: common.ChunkID(3),
+			Name:    "endpoint",
+			Value:   []byte("/sales"),
+		},
+		{
+			ChunkID: common.ChunkID(11),
+			Name:    "duration",
+			Value:   convert.Int64ToBytes(50),
+		},
+		{
+			ChunkID: common.ChunkID(12),
+			Name:    "duration",
+			Value:   convert.Int64ToBytes(500),
+		},
+		{
+			ChunkID: common.ChunkID(13),
+			Name:    "duration",
+			Value:   convert.Int64ToBytes(100),
+		},
+		{
+			ChunkID: common.ChunkID(14),
+			Name:    "duration",
+			Value:   convert.Int64ToBytes(5000),
+		},
+	}
+	for _, field := range fields {
+		if err := s.Insert(*common.NewMetadataByNameAndGroup("sw", "default"), 0, field); err != nil {
+			tester.NoError(err)
+		}
+	}
+}
diff --git a/banyand/index/tsdb/mem.go b/banyand/index/tsdb/mem.go
index a96b107..2e2ea9b 100644
--- a/banyand/index/tsdb/mem.go
+++ b/banyand/index/tsdb/mem.go
@@ -79,7 +79,7 @@ func (m *MemTable) MatchTerms(field *Field) (list posting.List) {
 	if !ok {
 		return roaring.EmptyPostingList
 	}
-	return fieldsValues.value.get(field.Value)
+	return fieldsValues.value.get(field.Value).Clone()
 }
 
 func (m *MemTable) Range(fieldName []byte, opts *RangeOpts) (list posting.List) {
diff --git a/banyand/index/tsdb/term_map.go b/banyand/index/tsdb/term_map.go
index f325282..20bc492 100644
--- a/banyand/index/tsdb/term_map.go
+++ b/banyand/index/tsdb/term_map.go
@@ -92,6 +92,8 @@ func (p *postingMap) getRange(opts *RangeOpts) posting.List {
 		}
 		return roaring.EmptyPostingList
 	}
+	p.mutex.RLock()
+	defer p.mutex.RUnlock()
 	keys := make(Asc, 0, len(p.repo))
 	for _, v := range p.repo {
 		keys = append(keys, v.key)
diff --git a/banyand/query/processor_test.go b/banyand/query/processor_test.go
index 64e1bb3..7136e0e 100644
--- a/banyand/query/processor_test.go
+++ b/banyand/query/processor_test.go
@@ -24,6 +24,7 @@ import (
 	"testing"
 	"time"
 
+	"github.com/golang/mock/gomock"
 	googleUUID "github.com/google/uuid"
 	"github.com/stretchr/testify/require"
 
@@ -31,6 +32,7 @@ import (
 	"github.com/apache/skywalking-banyandb/api/event"
 	v1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/v1"
 	"github.com/apache/skywalking-banyandb/banyand/discovery"
+	"github.com/apache/skywalking-banyandb/banyand/index"
 	"github.com/apache/skywalking-banyandb/banyand/series"
 	"github.com/apache/skywalking-banyandb/banyand/series/trace"
 	"github.com/apache/skywalking-banyandb/banyand/storage"
@@ -49,7 +51,7 @@ type entityValue struct {
 	items      []interface{}
 }
 
-func setupServices(tester *require.Assertions) (discovery.ServiceRepo, series.Service, func()) {
+func setupServices(t *testing.T, tester *require.Assertions) (discovery.ServiceRepo, series.Service, func()) {
 	// Bootstrap logger system
 	tester.NoError(logger.Init(logger.Logging{
 		Env:   "dev",
@@ -70,7 +72,10 @@ func setupServices(tester *require.Assertions) (discovery.ServiceRepo, series.Se
 	tester.NoError(db.FlagSet().Parse([]string{"--root-path=" + rootPath}))
 
 	// Init `Trace` module
-	traceSvc, err := trace.NewService(context.TODO(), db, repo, nil)
+	ctrl := gomock.NewController(t)
+	mockIndex := index.NewMockService(ctrl)
+	mockIndex.EXPECT().Insert(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes()
+	traceSvc, err := trace.NewService(context.TODO(), db, repo, mockIndex)
 	tester.NoError(err)
 
 	// Init `Query` module
@@ -257,7 +262,7 @@ func TestQueryProcessor(t *testing.T) {
 	tester := require.New(t)
 
 	// setup services
-	repo, traceSvc, gracefulStop := setupServices(tester)
+	repo, traceSvc, gracefulStop := setupServices(t, tester)
 	defer gracefulStop()
 
 	baseTs := time.Now()
diff --git a/pkg/convert/number.go b/pkg/convert/number.go
index c06d7a4..fa2d688 100644
--- a/pkg/convert/number.go
+++ b/pkg/convert/number.go
@@ -43,6 +43,10 @@ func Uint32ToBytes(u uint32) []byte {
 	return bs
 }
 
+func BytesToInt64(b []byte) int64 {
+	return int64(binary.BigEndian.Uint64(b))
+}
+
 func BytesToUint64(b []byte) uint64 {
 	return binary.BigEndian.Uint64(b)
 }
diff --git a/pkg/logger/logger.go b/pkg/logger/logger.go
index 1a08dc5..0651e41 100644
--- a/pkg/logger/logger.go
+++ b/pkg/logger/logger.go
@@ -40,3 +40,10 @@ func (l *Logger) Named(name string) *Logger {
 	subLogger := root.Logger.With().Str("module", module).Logger()
 	return &Logger{module: module, Logger: &subLogger}
 }
+
+func (l *Logger) Should(lvl zerolog.Level) bool {
+	if lvl < l.GetLevel() || lvl < zerolog.GlobalLevel() {
+		return false
+	}
+	return true
+}
diff --git a/pkg/posting/posting.go b/pkg/posting/posting.go
index 3812162..2d7ad16 100644
--- a/pkg/posting/posting.go
+++ b/pkg/posting/posting.go
@@ -58,6 +58,8 @@ type List interface {
 	RemoveRange(min, max common.ChunkID) error
 
 	Reset()
+
+	ToSlice() []common.ChunkID
 }
 
 type Iterator interface {
diff --git a/pkg/posting/roaring/roaring.go b/pkg/posting/roaring/roaring.go
index 0593813..0b1c9b6 100644
--- a/pkg/posting/roaring/roaring.go
+++ b/pkg/posting/roaring/roaring.go
@@ -194,3 +194,13 @@ func (it *roaringIterator) Close() error {
 	it.closed = true
 	return nil
 }
+
+func (p *postingsList) ToSlice() []common.ChunkID {
+	iter := p.Iterator()
+	defer iter.Close()
+	s := make([]common.ChunkID, 0, p.Len())
+	for iter.Next() {
+		s = append(s, iter.Current())
+	}
+	return s
+}