You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by ha...@apache.org on 2021/09/07 10:32:04 UTC

[skywalking-banyandb] branch time-series updated (31fb8cf -> 1bbd271)

This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a change to branch time-series
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git.


    from 31fb8cf  Finish stream module
     new cd628ea  Add stream query test
     new e8df5de  Finish test
     new 1bbd271  Fix test issues

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 banyand/kv/badger.go                         |  39 +-
 banyand/kv/kv.go                             |   3 +-
 banyand/stream/stream_query.go               |  40 +-
 banyand/stream/stream_query_test.go          | 532 +++++++++++++++++++++++++++
 banyand/stream/stream_write.go               |  29 +-
 banyand/stream/stream_write_test.go          |   2 +-
 banyand/stream/testdata/global_index.json    |  64 ++++
 banyand/stream/testdata/multiple_shards.json |  64 ++++
 banyand/stream/testdata/shard0.json          |  18 -
 banyand/tsdb/block.go                        |  14 +-
 banyand/tsdb/indexdb.go                      |   6 +-
 banyand/tsdb/series.go                       |  94 +++--
 banyand/tsdb/series_seek.go                  |  16 +-
 banyand/tsdb/series_seek_filter.go           |   9 +-
 banyand/tsdb/series_seek_sort.go             |  75 ++--
 banyand/tsdb/series_write.go                 |  25 +-
 banyand/tsdb/seriesdb.go                     |  43 ++-
 banyand/tsdb/seriesdb_test.go                |  32 +-
 banyand/tsdb/shard.go                        |   4 +
 banyand/tsdb/tsdb.go                         |   3 +-
 banyand/tsdb/tsdb_test.go                    |   2 +-
 pkg/index/index.go                           |   4 -
 pkg/index/inverted/field_map.go              |  23 +-
 pkg/index/inverted/inverted.go               |   5 -
 pkg/index/inverted/mem.go                    |  14 +-
 pkg/index/inverted/mem_test.go               | 129 ++-----
 pkg/index/search.go                          |   4 +-
 27 files changed, 986 insertions(+), 307 deletions(-)
 create mode 100644 banyand/stream/stream_query_test.go
 create mode 100644 banyand/stream/testdata/global_index.json
 create mode 100644 banyand/stream/testdata/multiple_shards.json
 delete mode 100644 banyand/stream/testdata/shard0.json

[skywalking-banyandb] 01/03: Add stream query test

Posted by ha...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch time-series
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git

commit cd628eaa55cba84f110d92940e4badfdd3dd3fff
Author: Gao Hongtao <ha...@gmail.com>
AuthorDate: Tue Sep 7 11:15:08 2021 +0800

    Add stream query test
    
    Signed-off-by: Gao Hongtao <ha...@gmail.com>
---
 banyand/kv/badger.go                         |  36 ++--
 banyand/kv/kv.go                             |   3 +-
 banyand/stream/stream_query.go               |  35 +---
 banyand/stream/stream_query_test.go          | 301 +++++++++++++++++++++++++++
 banyand/stream/stream_write.go               |  29 ++-
 banyand/stream/stream_write_test.go          |   2 +-
 banyand/stream/testdata/multiple_shards.json |  64 ++++++
 banyand/stream/testdata/shard0.json          |  18 --
 banyand/stream/testdata/single_series.json   |  51 +++++
 banyand/tsdb/series.go                       |  74 +++++--
 banyand/tsdb/series_seek.go                  |   7 +-
 banyand/tsdb/series_seek_filter.go           |   3 +
 banyand/tsdb/series_seek_sort.go             |  71 ++++---
 banyand/tsdb/series_write.go                 |   5 +-
 banyand/tsdb/seriesdb.go                     |  38 +++-
 banyand/tsdb/seriesdb_test.go                |   2 +-
 banyand/tsdb/shard.go                        |   4 +
 banyand/tsdb/tsdb.go                         |   1 +
 18 files changed, 607 insertions(+), 137 deletions(-)

diff --git a/banyand/kv/badger.go b/banyand/kv/badger.go
index 0dfbbe7..131032d 100644
--- a/banyand/kv/badger.go
+++ b/banyand/kv/badger.go
@@ -27,7 +27,9 @@ import (
 	"github.com/dgraph-io/badger/v3/y"
 	"go.uber.org/multierr"
 
+	"github.com/apache/skywalking-banyandb/api/common"
 	modelv2 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v2"
+	"github.com/apache/skywalking-banyandb/pkg/convert"
 	"github.com/apache/skywalking-banyandb/pkg/index"
 	"github.com/apache/skywalking-banyandb/pkg/index/posting"
 	"github.com/apache/skywalking-banyandb/pkg/index/posting/roaring"
@@ -196,15 +198,11 @@ func (i *iterator) Seek(key []byte) {
 }
 
 func (i *iterator) Key() []byte {
-	return i.delegated.Key()
+	return y.ParseKey(i.delegated.Key())
 }
 
-func (i *iterator) Val() posting.List {
-	list := roaring.NewPostingList()
-	data := make([]byte, len(i.delegated.Value().Value))
-	copy(data, i.delegated.Value().Value)
-	_ = list.Unmarshall(data)
-	return list
+func (i *iterator) Val() []byte {
+	return y.Copy(i.delegated.Value().Value)
 }
 
 func (i *iterator) Valid() bool {
@@ -286,22 +284,30 @@ var _ index.FieldIterator = (*fIterator)(nil)
 type fIterator struct {
 	init     bool
 	delegate Iterator
+	curr     *index.PostingValue
 }
 
 func (f *fIterator) Next() bool {
-	if f.init {
-		f.delegate.Next()
-	} else {
+	if !f.init {
 		f.init = true
+		f.delegate.Rewind()
 	}
-	return f.delegate.Valid()
+	if !f.delegate.Valid() {
+		return false
+	}
+	pv := &index.PostingValue{
+		Key:   f.delegate.Key(),
+		Value: roaring.NewPostingListWithInitialData(convert.BytesToUint64(f.delegate.Val())),
+	}
+	for ; f.delegate.Valid() && bytes.Equal(pv.Key, f.delegate.Key()); f.delegate.Next() {
+		pv.Value.Insert(common.ItemID(convert.BytesToUint64(f.delegate.Val())))
+	}
+	f.curr = pv
+	return true
 }
 
 func (f *fIterator) Val() *index.PostingValue {
-	return &index.PostingValue{
-		Key:   f.delegate.Key(),
-		Value: f.delegate.Val(),
-	}
+	return f.curr
 }
 
 func (f *fIterator) Close() error {
diff --git a/banyand/kv/kv.go b/banyand/kv/kv.go
index 5cea6ed..4e17456 100644
--- a/banyand/kv/kv.go
+++ b/banyand/kv/kv.go
@@ -26,7 +26,6 @@ import (
 	"github.com/pkg/errors"
 
 	"github.com/apache/skywalking-banyandb/pkg/index"
-	"github.com/apache/skywalking-banyandb/pkg/index/posting"
 	"github.com/apache/skywalking-banyandb/pkg/logger"
 	posting2 "github.com/apache/skywalking-banyandb/pkg/posting"
 )
@@ -109,7 +108,7 @@ type Iterator interface {
 	Rewind()
 	Seek(key []byte)
 	Key() []byte
-	Val() posting.List
+	Val() []byte
 	Valid() bool
 	Close() error
 }
diff --git a/banyand/stream/stream_query.go b/banyand/stream/stream_query.go
index ac7d038..3402665 100644
--- a/banyand/stream/stream_query.go
+++ b/banyand/stream/stream_query.go
@@ -18,8 +18,6 @@
 package stream
 
 import (
-	"bytes"
-
 	"github.com/golang/protobuf/proto"
 	"github.com/pkg/errors"
 
@@ -41,37 +39,22 @@ type Query interface {
 	Stream(stream *commonv2.Metadata) (Stream, error)
 }
 
-type EqualCondition struct {
-	tag   string
-	value []byte
-}
-
 type Stream interface {
-	Shards(equalConditions []EqualCondition) ([]tsdb.Shard, error)
+	Shards(entity tsdb.Entity) ([]tsdb.Shard, error)
 }
 
 var _ Stream = (*stream)(nil)
 
-func (s *stream) Shards(equalConditions []EqualCondition) ([]tsdb.Shard, error) {
-	entityItemLen := len(s.entityIndex)
-	entityItems := make([][]byte, entityItemLen)
-	var entityCount int
-	for _, ec := range equalConditions {
-		fi, ti, tag := s.findTagByName(ec.tag)
-		if tag == nil {
-			return nil, ErrTagNotExist
-		}
-		for i, eIndex := range s.entityIndex {
-			if eIndex.family == fi && eIndex.tag == ti {
-				entityItems[i] = ec.value
-				entityCount++
-			}
-		}
-	}
-	if entityCount < entityItemLen {
+func (s *stream) Shards(entity tsdb.Entity) ([]tsdb.Shard, error) {
+	if len(entity) < 1 {
 		return s.db.Shards(), nil
 	}
-	shardID, err := partition.ShardID(bytes.Join(entityItems, nil), s.schema.GetShardNum())
+	for _, e := range entity {
+		if e == nil {
+			return s.db.Shards(), nil
+		}
+	}
+	shardID, err := partition.ShardID(entity.Marshal(), s.schema.GetShardNum())
 	if err != nil {
 		return nil, err
 	}
diff --git a/banyand/stream/stream_query_test.go b/banyand/stream/stream_query_test.go
new file mode 100644
index 0000000..8b6cc12
--- /dev/null
+++ b/banyand/stream/stream_query_test.go
@@ -0,0 +1,301 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package stream
+
+import (
+	"bytes"
+	"embed"
+	_ "embed"
+	"encoding/base64"
+	"encoding/json"
+	"fmt"
+	"sort"
+	"strconv"
+	"testing"
+	"time"
+
+	"github.com/golang/protobuf/jsonpb"
+	"github.com/stretchr/testify/assert"
+	"google.golang.org/protobuf/types/known/timestamppb"
+
+	"github.com/apache/skywalking-banyandb/api/common"
+	modelv2 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v2"
+	streamv2 "github.com/apache/skywalking-banyandb/api/proto/banyandb/stream/v2"
+	"github.com/apache/skywalking-banyandb/banyand/tsdb"
+	"github.com/apache/skywalking-banyandb/pkg/convert"
+	"github.com/apache/skywalking-banyandb/pkg/partition"
+)
+
+func Test_Stream_SelectShard(t *testing.T) {
+	tester := assert.New(t)
+	s, deferFunc := setup(tester)
+	defer deferFunc()
+	_ = setupQueryData(tester, "multiple_shards.json", s)
+	tests := []struct {
+		name         string
+		entity       tsdb.Entity
+		wantShardNum int
+		wantErr      bool
+	}{
+		{
+			name:         "all shards",
+			wantShardNum: 2,
+		},
+		{
+			name:         "select a shard",
+			entity:       tsdb.Entity{tsdb.Entry("webapp_id"), tsdb.Entry("10.0.0.1_id"), convert.Int64ToBytes(0)},
+			wantShardNum: 1,
+		},
+		{
+			name:         "select shards",
+			entity:       tsdb.Entity{tsdb.Entry("webapp_id"), tsdb.AnyEntry, convert.Int64ToBytes(0)},
+			wantShardNum: 2,
+		},
+	}
+
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			shards, err := s.Shards(tt.entity)
+			if tt.wantErr {
+				tester.Error(err)
+				return
+			}
+			tester.NoError(err)
+			tester.Equal(tt.wantShardNum, len(shards))
+		})
+	}
+
+}
+
+func Test_Stream_Series(t *testing.T) {
+	tester := assert.New(t)
+	s, deferFunc := setup(tester)
+	defer deferFunc()
+	baseTime := setupQueryData(tester, "multiple_shards.json", s)
+	type args struct {
+		entity tsdb.Entity
+	}
+	type shardStruct struct {
+		id       common.ShardID
+		location []string
+		elements []string
+	}
+	type want struct {
+		shards []shardStruct
+	}
+
+	tests := []struct {
+		name    string
+		args    args
+		want    want
+		wantErr bool
+	}{
+		{
+			name: "all",
+			args: args{
+				entity: tsdb.Entity{tsdb.AnyEntry, tsdb.AnyEntry, tsdb.AnyEntry},
+			},
+			want: want{
+				shards: []shardStruct{
+					{
+						id:       0,
+						location: []string{"series_12243341348514563931", "data_flow_0"},
+						elements: []string{"1"},
+					},
+					{
+						id:       0,
+						location: []string{"series_1671844747554927007", "data_flow_0"},
+						elements: []string{"2"},
+					},
+					{
+						id:       1,
+						location: []string{"series_2374367181827824198", "data_flow_0"},
+						elements: []string{"5", "3"},
+					},
+					{
+						id:       1,
+						location: []string{"series_8429137420168685297", "data_flow_0"},
+						elements: []string{"4"},
+					},
+				},
+			},
+		},
+		{
+			name: "find series by service_id and instance_id",
+			args: args{
+				entity: tsdb.Entity{tsdb.Entry("webapp_id"), tsdb.Entry("10.0.0.1_id"), tsdb.AnyEntry},
+			},
+			want: want{
+				shards: []shardStruct{
+					{
+						id:       0,
+						location: []string{"series_12243341348514563931", "data_flow_0"},
+						elements: []string{"1"},
+					},
+					{
+						id:       1,
+						location: []string{"series_2374367181827824198", "data_flow_0"},
+						elements: []string{"5", "3"},
+					},
+				},
+			},
+		},
+		{
+			name: "find a series",
+			args: args{
+				entity: tsdb.Entity{tsdb.Entry("webapp_id"), tsdb.Entry("10.0.0.1_id"), convert.Uint64ToBytes(1)},
+			},
+			want: want{
+				shards: []shardStruct{
+					{
+						id:       1,
+						location: []string{"series_2374367181827824198", "data_flow_0"},
+						elements: []string{"5", "3"},
+					},
+				},
+			},
+		},
+	}
+
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			shards, err := s.Shards(tt.args.entity)
+			tester.NoError(err)
+			got := want{
+				shards: []shardStruct{},
+			}
+
+			for _, shard := range shards {
+				seriesList, err := shard.Series().List(tsdb.NewPath(tt.args.entity))
+				tester.NoError(err)
+				for _, series := range seriesList {
+					func(g *want) {
+						sp, err := series.Span(tsdb.NewTimeRangeDuration(baseTime, 1*time.Hour))
+						defer func(sp tsdb.SeriesSpan) {
+							_ = sp.Close()
+						}(sp)
+						tester.NoError(err)
+						seeker, err := sp.SeekerBuilder().Build()
+						tester.NoError(err)
+						iter, err := seeker.Seek()
+						tester.NoError(err)
+						for dataFlowID, iterator := range iter {
+							var elements []string
+							for iterator.Next() {
+								tagFamily, err := s.ParseTagFamily("searchable", iterator.Val())
+								tester.NoError(err)
+								for _, tag := range tagFamily.GetTags() {
+									if tag.GetKey() == "trace_id" {
+										elements = append(elements, tag.GetValue().GetStr().GetValue())
+									}
+								}
+							}
+							_ = iterator.Close()
+							g.shards = append(g.shards, shardStruct{
+								id: shard.ID(),
+								location: []string{
+									fmt.Sprintf("series_%v", series.ID()),
+									"data_flow_" + strconv.Itoa(dataFlowID),
+								},
+								elements: elements,
+							})
+						}
+
+					}(&got)
+				}
+			}
+			if tt.wantErr {
+				tester.Error(err)
+				return
+			}
+			tester.NoError(err)
+			sort.SliceStable(got.shards, func(i, j int) bool {
+				a := got.shards[i]
+				b := got.shards[j]
+				if a.id > b.id {
+					return false
+				}
+				for i, al := range a.location {
+					bl := b.location[i]
+					if bytes.Compare([]byte(al), []byte(bl)) > 0 {
+						return false
+					}
+				}
+				return true
+			})
+			tester.Equal(tt.want, got)
+		})
+	}
+
+}
+
+//go:embed testdata/*.json
+var dataFS embed.FS
+
+func setupQueryData(t *assert.Assertions, dataFile string, stream *stream) (baseTime time.Time) {
+	var templates []interface{}
+	baseTime = time.Now()
+	content, err := dataFS.ReadFile("testdata/" + dataFile)
+	t.NoError(err)
+	t.NoError(json.Unmarshal(content, &templates))
+	bb, _ := base64.StdEncoding.DecodeString("YWJjMTIzIT8kKiYoKSctPUB+")
+	for i, template := range templates {
+		rawSearchTagFamily, err := json.Marshal(template)
+		t.NoError(err)
+		searchTagFamily := &streamv2.ElementValue_TagFamily{}
+		t.NoError(jsonpb.UnmarshalString(string(rawSearchTagFamily), searchTagFamily))
+		e := &streamv2.ElementValue{
+			ElementId: strconv.Itoa(i),
+			Timestamp: timestamppb.New(baseTime.Add(time.Millisecond * time.Duration(i))),
+			TagFamilies: []*streamv2.ElementValue_TagFamily{
+				{
+					Tags: []*modelv2.TagValue{
+						{
+							Value: &modelv2.TagValue_BinaryData{
+								BinaryData: bb,
+							},
+						},
+					},
+				},
+			},
+		}
+		e.TagFamilies = append(e.TagFamilies, searchTagFamily)
+		entity, err := stream.buildEntity(e)
+		t.NoError(err)
+		shardID, err := partition.ShardID(entity.Marshal(), stream.schema.GetShardNum())
+		t.NoError(err)
+		itemID, err := stream.write(common.ShardID(shardID), e)
+		t.NoError(err)
+		sa, err := stream.Shards(entity)
+		for _, shard := range sa {
+			se, err := shard.Series().Get(entity)
+			t.NoError(err)
+			for {
+				item, closer, _ := se.Get(*itemID)
+				rawTagFamily, _ := item.Val("searchable")
+				if len(rawTagFamily) > 0 {
+					_ = closer.Close()
+					break
+				}
+				_ = closer.Close()
+			}
+
+		}
+	}
+	return baseTime
+}
diff --git a/banyand/stream/stream_write.go b/banyand/stream/stream_write.go
index 2bcd0bf..fa80dc3 100644
--- a/banyand/stream/stream_write.go
+++ b/banyand/stream/stream_write.go
@@ -40,34 +40,34 @@ var (
 	ErrUnsupportedTagForIndexField = errors.New("the tag type(for example, null) can not be as the index field value")
 )
 
-func (s *stream) write(shardID common.ShardID, value *streamv2.ElementValue) error {
+func (s *stream) write(shardID common.ShardID, value *streamv2.ElementValue) (*tsdb.GlobalItemID, error) {
 	sm := s.schema
 	fLen := len(value.GetTagFamilies())
 	if fLen < 1 {
-		return errors.Wrap(ErrMalformedElement, "no tag family")
+		return nil, errors.Wrap(ErrMalformedElement, "no tag family")
 	}
 	if fLen > len(sm.TagFamilies) {
-		return errors.Wrap(ErrMalformedElement, "tag family number is more than expected")
+		return nil, errors.Wrap(ErrMalformedElement, "tag family number is more than expected")
 	}
 	shard, err := s.db.Shard(shardID)
 	if err != nil {
-		return err
+		return nil, err
 	}
 	entity, err := s.buildEntity(value)
 	if err != nil {
-		return err
+		return nil, err
 	}
 	series, err := shard.Series().Get(entity)
 	if err != nil {
-		return err
+		return nil, err
 	}
 	t := value.GetTimestamp().AsTime()
-	wp, err := series.Span(tsdb.NewTimeRange(t, 0))
+	wp, err := series.Span(tsdb.NewTimeRangeDuration(t, 0))
 	if err != nil {
 		if wp != nil {
 			_ = wp.Close()
 		}
-		return err
+		return nil, err
 	}
 	writeFn := func() (tsdb.Writer, error) {
 		builder := wp.WriterBuilder().Time(t)
@@ -97,12 +97,18 @@ func (s *stream) write(shardID common.ShardID, value *streamv2.ElementValue) err
 			return nil, errWrite
 		}
 		_, errWrite = writer.Write()
+		s.l.Debug().
+			Time("ts", t).
+			Int("ts_nano", t.Nanosecond()).
+			Interface("data", value).
+			Uint64("series_id", uint64(series.ID())).
+			Msg("write stream")
 		return writer, errWrite
 	}
 	writer, err := writeFn()
 	if err != nil {
 		_ = wp.Close()
-		return err
+		return nil, err
 	}
 	m := indexMessage{
 		localWriter: writer,
@@ -117,7 +123,8 @@ func (s *stream) write(shardID common.ShardID, value *streamv2.ElementValue) err
 		}()
 		s.indexCh <- m
 	}(m)
-	return err
+	itemID := writer.ItemID()
+	return &itemID, err
 }
 
 func getIndexValue(ruleIndex indexRule, value *streamv2.ElementValue) (val []byte, err error) {
@@ -204,7 +211,7 @@ func (w *writeCallback) Rev(message bus.Message) (resp bus.Message) {
 	}
 	sm := writeEvent.WriteRequest.GetMetadata()
 	id := formatStreamID(sm.GetName(), sm.GetGroup())
-	err := w.schemaMap[id].write(common.ShardID(writeEvent.ShardID), writeEvent.WriteRequest.GetElement())
+	_, err := w.schemaMap[id].write(common.ShardID(writeEvent.ShardID), writeEvent.WriteRequest.GetElement())
 	if err != nil {
 		w.l.Debug().Err(err)
 	}
diff --git a/banyand/stream/stream_write_test.go b/banyand/stream/stream_write_test.go
index 1853a7f..0ffe0d5 100644
--- a/banyand/stream/stream_write_test.go
+++ b/banyand/stream/stream_write_test.go
@@ -197,7 +197,7 @@ func Test_Stream_Write(t *testing.T) {
 
 	for _, tt := range tests {
 		t.Run(tt.name, func(t *testing.T) {
-			err := s.write(common.ShardID(tt.args.shardID), tt.args.ele)
+			_, err := s.write(common.ShardID(tt.args.shardID), tt.args.ele)
 			if tt.wantErr {
 				tester.Error(err)
 				return
diff --git a/banyand/stream/testdata/multiple_shards.json b/banyand/stream/testdata/multiple_shards.json
new file mode 100644
index 0000000..791c7c9
--- /dev/null
+++ b/banyand/stream/testdata/multiple_shards.json
@@ -0,0 +1,64 @@
+[
+  {
+    "tags": [
+      {"str":{"value": "1"}},
+      {"int":{"value": 0}},
+      {"str":{"value": "webapp_id"}},
+      {"str":{"value": "10.0.0.1_id"}},
+      {"str":{"value": "/home_id"}},
+      {"int":{"value": 1000}},
+      {"int":{"value": 1622933202000000000}}
+    ]
+  },
+  {
+    "tags": [
+      {"str":{"value": "2"}},
+      {"int":{"value": 0}},
+      {"str":{"value": "webapp_id"}},
+      {"str":{"value": "10.0.0.3_id"}},
+      {"str":{"value": "/home_id"}},
+      {"int":{"value": 500}},
+      {"int":{"value": 1622933202000000000}}
+    ]
+  },
+  {
+    "tags": [
+      {"str":{"value": "3"}},
+      {"int":{"value": 1}},
+      {"str":{"value": "webapp_id"}},
+      {"str":{"value": "10.0.0.1_id"}},
+      {"str":{"value": "/home_id"}},
+      {"int":{"value": 30}},
+      {"int":{"value": 1622933202000000000}},
+      {"str":{"value": "GET"}},
+      {"str":{"value": "500"}}
+    ]
+  },
+  {
+    "tags": [
+      {"str":{"value": "4"}},
+      {"int":{"value": 1}},
+      {"str":{"value": "webapp_id"}},
+      {"str":{"value": "10.0.0.5_id"}},
+      {"str":{"value": "/home_id"}},
+      {"int":{"value": 60}},
+      {"int":{"value": 1622933202000000000}},
+      {"str":{"value": "GET"}},
+      {"str":{"value": "400"}}
+    ]
+  },
+  {
+    "tags": [
+      {"str":{"value": "5"}},
+      {"int":{"value": 1}},
+      {"str":{"value": "webapp_id"}},
+      {"str":{"value": "10.0.0.1_id"}},
+      {"str":{"value": "/home_id"}},
+      {"int":{"value": 300}},
+      {"int":{"value": 1622933202000000000}},
+      {"str":{"value": "GET"}},
+      {"str":{"value": "500"}}
+    ]
+  }
+  
+]
\ No newline at end of file
diff --git a/banyand/stream/testdata/shard0.json b/banyand/stream/testdata/shard0.json
deleted file mode 100644
index 28de2f3..0000000
--- a/banyand/stream/testdata/shard0.json
+++ /dev/null
@@ -1,18 +0,0 @@
-[
-  {
-    "element_id": "1",
-    "timestamp": "2021-04-15T01:30:15.01Z",
-    "tag_families": [
-      {
-        "tags": [
-          {"binary_data": "YWJjMTIzIT8kKiYoKSctPUB+"}
-        ]
-      },
-      {
-        "tags": [
-          {"str": ""}
-        ]
-      }
-    ]
-  }
-]
\ No newline at end of file
diff --git a/banyand/stream/testdata/single_series.json b/banyand/stream/testdata/single_series.json
new file mode 100644
index 0000000..049ec0c
--- /dev/null
+++ b/banyand/stream/testdata/single_series.json
@@ -0,0 +1,51 @@
+[
+  {
+    "tags": [
+      {"str":{"value": "trace_id-xxfff.111323"}},
+      {"int":{"value": 0}},
+      {"str":{"value": "webapp_id"}},
+      {"str":{"value": "10.0.0.1_id"}},
+      {"str":{"value": "/home_id"}},
+      {"int":{"value": 1000}},
+      {"int":{"value": 1622933202000000000}}
+    ]
+  },
+  {
+    "tags": [
+      {"str":{"value": "trace_id-xxfff.111323"}},
+      {"int":{"value": 0}},
+      {"str":{"value": "webapp_id"}},
+      {"str":{"value": "10.0.0.1_id"}},
+      {"str":{"value": "/home_id"}},
+      {"int":{"value": 500}},
+      {"int":{"value": 1622933202000000000}}
+    ]
+  },
+  {
+    "tags": [
+      {"str":{"value": "trace_id-xxfff.111323"}},
+      {"int":{"value": 0}},
+      {"str":{"value": "webapp_id"}},
+      {"str":{"value": "10.0.0.1_id"}},
+      {"str":{"value": "/home_id"}},
+      {"int":{"value": 30}},
+      {"int":{"value": 1622933202000000000}},
+      {"str":{"value": "GET"}},
+      {"str":{"value": "200"}}
+    ]
+  },
+  {
+    "tags": [
+      {"str":{"value": "trace_id-xxfff.111323"}},
+      {"int":{"value": 1}},
+      {"str":{"value": "httpserver_id"}},
+      {"str":{"value": "10.0.0.1_id"}},
+      {"str":{"value": "/home_id"}},
+      {"int":{"value": 300}},
+      {"int":{"value": 1622933202000000000}},
+      {"str":{"value": "GET"}},
+      {"str":{"value": "500"}}
+    ]
+  }
+  
+]
\ No newline at end of file
diff --git a/banyand/tsdb/series.go b/banyand/tsdb/series.go
index efdf8ba..376e39c 100644
--- a/banyand/tsdb/series.go
+++ b/banyand/tsdb/series.go
@@ -19,6 +19,7 @@ package tsdb
 
 import (
 	"bytes"
+	"context"
 	"io"
 	"time"
 
@@ -27,6 +28,7 @@ import (
 
 	"github.com/apache/skywalking-banyandb/api/common"
 	"github.com/apache/skywalking-banyandb/pkg/convert"
+	"github.com/apache/skywalking-banyandb/pkg/logger"
 )
 
 var (
@@ -70,9 +72,8 @@ func (i *GlobalItemID) UnMarshal(data []byte) error {
 }
 
 type TimeRange struct {
-	Start    time.Time
-	Duration time.Duration
-	End      time.Time
+	Start time.Time
+	End   time.Time
 }
 
 func (t TimeRange) contains(unixNano uint64) bool {
@@ -83,18 +84,24 @@ func (t TimeRange) contains(unixNano uint64) bool {
 	return tp.Equal(t.Start) || tp.After(t.Start)
 }
 
-func NewTimeRange(Start time.Time, Duration time.Duration) TimeRange {
+func NewTimeRange(Start, End time.Time) TimeRange {
 	return TimeRange{
-		Start:    Start,
-		Duration: Duration,
-		End:      Start.Add(Duration),
+		Start: Start,
+		End:   End,
+	}
+}
+
+func NewTimeRangeDuration(Start time.Time, Duration time.Duration) TimeRange {
+	return TimeRange{
+		Start: Start,
+		End:   Start.Add(Duration),
 	}
 }
 
 type Series interface {
 	ID() common.SeriesID
 	Span(timeRange TimeRange) (SeriesSpan, error)
-	Get(id GlobalItemID) (Item, error)
+	Get(id GlobalItemID) (Item, io.Closer, error)
 }
 
 type SeriesSpan interface {
@@ -109,18 +116,16 @@ type series struct {
 	id      common.SeriesID
 	blockDB blockDatabase
 	shardID common.ShardID
+	l       *logger.Logger
 }
 
-func (s *series) Get(id GlobalItemID) (Item, error) {
-	panic("implement me")
-}
-
-func newSeries(id common.SeriesID, blockDB blockDatabase) *series {
-	return &series{
-		id:      id,
-		blockDB: blockDB,
-		shardID: blockDB.shardID(),
-	}
+func (s *series) Get(id GlobalItemID) (Item, io.Closer, error) {
+	b := s.blockDB.block(id)
+	return &item{
+		data:     b.dataReader(),
+		itemID:   id.id,
+		seriesID: s.id,
+	}, b, nil
 }
 
 func (s *series) ID() common.SeriesID {
@@ -132,7 +137,25 @@ func (s *series) Span(timeRange TimeRange) (SeriesSpan, error) {
 	if len(blocks) < 1 {
 		return nil, ErrEmptySeriesSpan
 	}
-	return newSeriesSpan(timeRange, blocks, s.id, s.shardID), nil
+	s.l.Debug().
+		Times("time_range", []time.Time{timeRange.Start, timeRange.End}).
+		Msg("select series span")
+	return newSeriesSpan(context.WithValue(context.Background(), logger.ContextKey, s.l), timeRange, blocks, s.id, s.shardID), nil
+}
+
+func newSeries(ctx context.Context, id common.SeriesID, blockDB blockDatabase) *series {
+	s := &series{
+		id:      id,
+		blockDB: blockDB,
+		shardID: blockDB.shardID(),
+	}
+	parentLogger := ctx.Value(logger.ContextKey)
+	if pl, ok := parentLogger.(*logger.Logger); ok {
+		s.l = pl.Named("series")
+	} else {
+		s.l = logger.GetLogger("series")
+	}
+	return s
 }
 
 var _ SeriesSpan = (*seriesSpan)(nil)
@@ -142,6 +165,7 @@ type seriesSpan struct {
 	seriesID  common.SeriesID
 	shardID   common.ShardID
 	timeRange TimeRange
+	l         *logger.Logger
 }
 
 func (s *seriesSpan) Close() (err error) {
@@ -159,12 +183,18 @@ func (s *seriesSpan) SeekerBuilder() SeekerBuilder {
 	return newSeekerBuilder(s)
 }
 
-func newSeriesSpan(timeRange TimeRange, blocks []blockDelegate,
-	id common.SeriesID, shardID common.ShardID) *seriesSpan {
-	return &seriesSpan{
+func newSeriesSpan(ctx context.Context, timeRange TimeRange, blocks []blockDelegate, id common.SeriesID, shardID common.ShardID) *seriesSpan {
+	s := &seriesSpan{
 		blocks:    blocks,
 		seriesID:  id,
 		shardID:   shardID,
 		timeRange: timeRange,
 	}
+	parentLogger := ctx.Value(logger.ContextKey)
+	if pl, ok := parentLogger.(*logger.Logger); ok {
+		s.l = pl.Named("series_span")
+	} else {
+		s.l = logger.GetLogger("series_span")
+	}
+	return s
 }
diff --git a/banyand/tsdb/series_seek.go b/banyand/tsdb/series_seek.go
index 478efa0..1c4f338 100644
--- a/banyand/tsdb/series_seek.go
+++ b/banyand/tsdb/series_seek.go
@@ -63,16 +63,21 @@ type seekerBuilder struct {
 }
 
 func (s *seekerBuilder) Build() (Seeker, error) {
+	if s.order == modelv2.QueryOrder_SORT_UNSPECIFIED {
+		s.order = modelv2.QueryOrder_SORT_DESC
+	}
 	indexFilter, err := s.buildIndexFilter()
 	if err != nil {
 		return nil, err
 	}
 	filters := []filterFn{
-		indexFilter,
 		func(item Item) bool {
 			return s.seriesSpan.timeRange.contains(item.Time())
 		},
 	}
+	if indexFilter != nil {
+		filters = append(filters, indexFilter)
+	}
 	return newSeeker(s.buildSeries(filters)), nil
 }
 
diff --git a/banyand/tsdb/series_seek_filter.go b/banyand/tsdb/series_seek_filter.go
index 65147ea..8814a16 100644
--- a/banyand/tsdb/series_seek_filter.go
+++ b/banyand/tsdb/series_seek_filter.go
@@ -44,6 +44,9 @@ func (s *seekerBuilder) Filter(indexRule *databasev2.IndexRule, condition Condit
 }
 
 func (s *seekerBuilder) buildIndexFilter() (filterFn, error) {
+	if len(s.conditions) < 1 {
+		return nil, nil
+	}
 	var treeIndexCondition, invertedIndexCondition []index.Condition
 	for _, condition := range s.conditions {
 		if len(condition.condition) > 1 {
diff --git a/banyand/tsdb/series_seek_sort.go b/banyand/tsdb/series_seek_sort.go
index 1646bce..3b26a8c 100644
--- a/banyand/tsdb/series_seek_sort.go
+++ b/banyand/tsdb/series_seek_sort.go
@@ -19,6 +19,9 @@ package tsdb
 
 import (
 	"sort"
+	"time"
+
+	"go.uber.org/multierr"
 
 	"github.com/apache/skywalking-banyandb/api/common"
 	databasev2 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v2"
@@ -26,6 +29,7 @@ import (
 	"github.com/apache/skywalking-banyandb/banyand/kv"
 	"github.com/apache/skywalking-banyandb/pkg/index"
 	"github.com/apache/skywalking-banyandb/pkg/index/posting"
+	"github.com/apache/skywalking-banyandb/pkg/logger"
 )
 
 func (s *seekerBuilder) OrderByIndex(indexRule *databasev2.IndexRule, order modelv2.QueryOrder_Sort) SeekerBuilder {
@@ -42,29 +46,20 @@ func (s *seekerBuilder) OrderByTime(order modelv2.QueryOrder_Sort) SeekerBuilder
 
 func (s *seekerBuilder) buildSeries(filters []filterFn) []Iterator {
 	if s.indexRuleForSorting == nil {
-		return s.buildSeriesByIndex(filters)
+		return s.buildSeriesByTime(filters)
 	}
-	return s.buildSeriesByTime(filters)
+	return s.buildSeriesByIndex(filters)
 }
 
 func (s *seekerBuilder) buildSeriesByIndex(filters []filterFn) (series []Iterator) {
 	for _, b := range s.seriesSpan.blocks {
 		switch s.indexRuleForSorting.GetType() {
 		case databasev2.IndexRule_TYPE_TREE:
-			series = append(series, newSearcherIterator(
-				b.lsmIndexReader().
-					FieldIterator([]byte(s.indexRuleForSorting.GetMetadata().GetName()), s.order),
-				b.dataReader(),
-				s.seriesSpan.seriesID,
-				filters,
-			))
+			series = append(series, newSearcherIterator(s.seriesSpan.l, b.lsmIndexReader().
+				FieldIterator([]byte(s.indexRuleForSorting.GetMetadata().GetName()), s.order), b.dataReader(), s.seriesSpan.seriesID, filters))
 		case databasev2.IndexRule_TYPE_INVERTED:
-			series = append(series, newSearcherIterator(b.invertedIndexReader().
-				FieldIterator([]byte(s.indexRuleForSorting.GetMetadata().GetName()), s.order),
-				b.dataReader(),
-				s.seriesSpan.seriesID,
-				filters,
-			))
+			series = append(series, newSearcherIterator(s.seriesSpan.l, b.invertedIndexReader().
+				FieldIterator([]byte(s.indexRuleForSorting.GetMetadata().GetName()), s.order), b.dataReader(), s.seriesSpan.seriesID, filters))
 		}
 	}
 	return
@@ -83,15 +78,23 @@ func (s *seekerBuilder) buildSeriesByTime(filters []filterFn) []Iterator {
 			return bb[i].startTime().After(bb[j].startTime())
 		})
 	}
-	delegated := make([]Iterator, len(bb))
+	delegated := make([]Iterator, 0, len(bb))
+	var bTimes []time.Time
 	for _, b := range bb {
-		delegated = append(delegated, newSearcherIterator(b.
-			primaryIndexReader().
-			FieldIterator(
-				s.seriesSpan.seriesID.Marshal(),
-				s.order,
-			), b.dataReader(), s.seriesSpan.seriesID, filters))
+		bTimes = append(bTimes, b.startTime())
+		delegated = append(delegated, newSearcherIterator(
+			s.seriesSpan.l,
+			b.primaryIndexReader().
+				FieldIterator(
+					s.seriesSpan.seriesID.Marshal(),
+					s.order,
+				), b.dataReader(), s.seriesSpan.seriesID, filters))
 	}
+	s.seriesSpan.l.Debug().
+		Str("order", modelv2.QueryOrder_Sort_name[int32(s.order)]).
+		Times("blocks", bTimes).
+		Uint64("series_id", uint64(s.seriesSpan.seriesID)).
+		Msg("seek series by time")
 	return []Iterator{newMergedIterator(delegated)}
 }
 
@@ -104,6 +107,7 @@ type searcherIterator struct {
 	data          kv.TimeSeriesReader
 	seriesID      common.SeriesID
 	filters       []filterFn
+	l             *logger.Logger
 }
 
 func (s *searcherIterator) Next() bool {
@@ -112,20 +116,22 @@ func (s *searcherIterator) Next() bool {
 			v := s.fieldIterator.Val()
 			s.cur = v.Value.Iterator()
 			s.curKey = v.Key
+			s.l.Trace().Hex("term_field", s.curKey).Msg("got a new field")
 		} else {
-			_ = s.Close()
 			return false
 		}
 	}
 	if s.cur.Next() {
+
 		for _, filter := range s.filters {
 			if !filter(s.Val()) {
+				s.l.Trace().Uint64("item_id", uint64(s.Val().ID())).Msg("ignore the item")
 				return s.Next()
 			}
 		}
+		s.l.Trace().Uint64("item_id", uint64(s.Val().ID())).Msg("got an item")
 		return true
 	}
-	_ = s.cur.Close()
 	s.cur = nil
 	return s.Next()
 }
@@ -143,12 +149,14 @@ func (s *searcherIterator) Close() error {
 	return s.fieldIterator.Close()
 }
 
-func newSearcherIterator(fieldIterator index.FieldIterator, data kv.TimeSeriesReader, seriesID common.SeriesID, filters []filterFn) Iterator {
+func newSearcherIterator(l *logger.Logger, fieldIterator index.FieldIterator, data kv.TimeSeriesReader,
+	seriesID common.SeriesID, filters []filterFn) Iterator {
 	return &searcherIterator{
 		fieldIterator: fieldIterator,
 		data:          data,
 		seriesID:      seriesID,
 		filters:       filters,
+		l:             l,
 	}
 }
 
@@ -158,17 +166,12 @@ type mergedIterator struct {
 	curr      Iterator
 	index     int
 	delegated []Iterator
-	closed    bool
 }
 
 func (m *mergedIterator) Next() bool {
-	if m.closed {
-		return false
-	}
 	if m.curr == nil {
 		m.index++
 		if m.index >= len(m.delegated) {
-			_ = m.Close()
 			return false
 		} else {
 			m.curr = m.delegated[m.index]
@@ -176,7 +179,6 @@ func (m *mergedIterator) Next() bool {
 	}
 	hasNext := m.curr.Next()
 	if !hasNext {
-		_ = m.curr.Close()
 		m.curr = nil
 		return m.Next()
 	}
@@ -188,8 +190,11 @@ func (m *mergedIterator) Val() Item {
 }
 
 func (m *mergedIterator) Close() error {
-	m.closed = true
-	return nil
+	var err error
+	for _, d := range m.delegated {
+		err = multierr.Append(err, d.Close())
+	}
+	return err
 }
 
 func newMergedIterator(delegated []Iterator) Iterator {
diff --git a/banyand/tsdb/series_write.go b/banyand/tsdb/series_write.go
index 0f69332..af59b53 100644
--- a/banyand/tsdb/series_write.go
+++ b/banyand/tsdb/series_write.go
@@ -159,9 +159,10 @@ func (d dataBucket) marshal() []byte {
 	}, nil)
 }
 
-func (w *writer) Write() (id GlobalItemID, err error) {
+func (w *writer) Write() (GlobalItemID, error) {
+	id := w.ItemID()
 	for _, c := range w.columns {
-		err = w.block.write(dataBucket{
+		err := w.block.write(dataBucket{
 			seriesID: w.itemID.seriesID,
 			family:   c.family,
 		}.marshal(),
diff --git a/banyand/tsdb/seriesdb.go b/banyand/tsdb/seriesdb.go
index 24a622f..ccae717 100644
--- a/banyand/tsdb/seriesdb.go
+++ b/banyand/tsdb/seriesdb.go
@@ -41,6 +41,14 @@ type Entry []byte
 
 type Entity []Entry
 
+func (e Entity) Marshal() []byte {
+	data := make([][]byte, len(e))
+	for i, entry := range e {
+		data[i] = entry
+	}
+	return bytes.Join(data, nil)
+}
+
 type Path struct {
 	prefix   []byte
 	mask     []byte
@@ -86,6 +94,7 @@ type SeriesDatabase interface {
 type blockDatabase interface {
 	shardID() common.ShardID
 	span(timeRange TimeRange) []blockDelegate
+	block(id GlobalItemID) blockDelegate
 }
 
 var _ SeriesDatabase = (*seriesDB)(nil)
@@ -100,6 +109,10 @@ type seriesDB struct {
 	sID            common.ShardID
 }
 
+func (s *seriesDB) block(id GlobalItemID) blockDelegate {
+	return s.lst[id.segID].lst[id.blockID].delegate()
+}
+
 func (s *seriesDB) shardID() common.ShardID {
 	return s.sID
 }
@@ -111,7 +124,7 @@ func (s *seriesDB) Get(entity Entity) (Series, error) {
 		return nil, err
 	}
 	if err == nil {
-		return newSeries(bytesConvSeriesID(seriesID), s), nil
+		return newSeries(s.context(), bytesConvSeriesID(seriesID), s), nil
 	}
 	s.Lock()
 	defer s.Unlock()
@@ -120,7 +133,7 @@ func (s *seriesDB) Get(entity Entity) (Series, error) {
 	if err != nil {
 		return nil, err
 	}
-	return newSeries(bytesConvSeriesID(seriesID), s), nil
+	return newSeries(s.context(), bytesConvSeriesID(seriesID), s), nil
 }
 
 func (s *seriesDB) List(path Path) (SeriesList, error) {
@@ -130,8 +143,14 @@ func (s *seriesDB) List(path Path) (SeriesList, error) {
 			return nil, err
 		}
 		if err == nil {
-			return []Series{newSeries(bytesConvSeriesID(id), s)}, nil
+			seriesID := bytesConvSeriesID(id)
+			s.l.Debug().
+				Hex("path", path.prefix).
+				Uint64("series_id", uint64(seriesID)).
+				Msg("got a series")
+			return []Series{newSeries(s.context(), seriesID, s)}, nil
 		}
+		s.l.Debug().Hex("path", path.prefix).Msg("doesn't get any series")
 		return nil, nil
 	}
 	result := make([]Series, 0)
@@ -147,7 +166,12 @@ func (s *seriesDB) List(path Path) (SeriesList, error) {
 				err = multierr.Append(err, errGetVal)
 				return nil
 			}
-			result = append(result, newSeries(common.SeriesID(convert.BytesToUint64(id)), s))
+			seriesID := bytesConvSeriesID(id)
+			s.l.Debug().
+				Hex("path", path.prefix).
+				Uint64("series_id", uint64(seriesID)).
+				Msg("got a series")
+			result = append(result, newSeries(s.context(), seriesID, s))
 		}
 		return nil
 	})
@@ -166,6 +190,10 @@ func (s *seriesDB) span(_ TimeRange) []blockDelegate {
 	return result
 }
 
+func (s *seriesDB) context() context.Context {
+	return context.WithValue(context.Background(), logger.ContextKey, s.l)
+}
+
 func (s *seriesDB) Close() error {
 	for _, seg := range s.lst {
 		seg.close()
@@ -183,7 +211,7 @@ func newSeriesDataBase(ctx context.Context, shardID common.ShardID, path string,
 		return nil, logger.ErrNoLoggerInContext
 	}
 	if pl, ok := parentLogger.(*logger.Logger); ok {
-		sdb.l = pl.Named("seriesSpan")
+		sdb.l = pl.Named("series_database")
 	}
 	var err error
 	sdb.seriesMetadata, err = kv.OpenStore(0, path+"/md", kv.StoreWithNamedLogger("metadata", sdb.l))
diff --git a/banyand/tsdb/seriesdb_test.go b/banyand/tsdb/seriesdb_test.go
index 3968564..a87d24c 100644
--- a/banyand/tsdb/seriesdb_test.go
+++ b/banyand/tsdb/seriesdb_test.go
@@ -338,7 +338,7 @@ func setUpEntities(t *assert.Assertions, db SeriesDatabase) []*entityWithID {
 }
 
 func newMockSeries(id common.SeriesID) *series {
-	return newSeries(id, nil)
+	return newSeries(nil, id, nil)
 }
 
 func transform(list SeriesList) (seriesIDs []common.SeriesID) {
diff --git a/banyand/tsdb/shard.go b/banyand/tsdb/shard.go
index 3b2ed4e..4bcc40e 100644
--- a/banyand/tsdb/shard.go
+++ b/banyand/tsdb/shard.go
@@ -37,6 +37,10 @@ type shard struct {
 	lst            []*segment
 }
 
+func (s *shard) ID() common.ShardID {
+	return s.id
+}
+
 func (s *shard) Series() SeriesDatabase {
 	return s.seriesDatabase
 }
diff --git a/banyand/tsdb/tsdb.go b/banyand/tsdb/tsdb.go
index 0d3da45..4528b46 100644
--- a/banyand/tsdb/tsdb.go
+++ b/banyand/tsdb/tsdb.go
@@ -60,6 +60,7 @@ type Database interface {
 
 type Shard interface {
 	io.Closer
+	ID() common.ShardID
 	Series() SeriesDatabase
 	Index() IndexDatabase
 }

[skywalking-banyandb] 02/03: Finish test

Posted by ha...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch time-series
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git

commit e8df5deb25e057b4233a63f93736dd5b826d9902
Author: Gao Hongtao <ha...@gmail.com>
AuthorDate: Tue Sep 7 17:51:01 2021 +0800

    Finish test
    
    Signed-off-by: Gao Hongtao <ha...@gmail.com>
---
 banyand/kv/badger.go                               |   3 +
 banyand/stream/stream_query.go                     |   5 +
 banyand/stream/stream_query_test.go                | 451 ++++++++++++++++-----
 .../{single_series.json => global_index.json}      |  33 +-
 banyand/stream/testdata/multiple_shards.json       |   6 +-
 banyand/tsdb/block.go                              |  14 +-
 banyand/tsdb/indexdb.go                            |   6 +-
 banyand/tsdb/series.go                             |  22 +-
 banyand/tsdb/series_seek.go                        |   9 +-
 banyand/tsdb/series_seek_filter.go                 |   6 +-
 banyand/tsdb/series_seek_sort.go                   |  40 +-
 banyand/tsdb/series_write.go                       |  20 +-
 banyand/tsdb/seriesdb.go                           |   5 +
 banyand/tsdb/seriesdb_test.go                      |   2 +-
 pkg/index/index.go                                 |   4 -
 pkg/index/inverted/field_map.go                    |  23 +-
 pkg/index/inverted/inverted.go                     |   5 -
 pkg/index/inverted/mem.go                          |  14 +-
 pkg/index/inverted/mem_test.go                     | 129 ++----
 pkg/index/search.go                                |   4 +-
 20 files changed, 505 insertions(+), 296 deletions(-)

diff --git a/banyand/kv/badger.go b/banyand/kv/badger.go
index 131032d..a3767d5 100644
--- a/banyand/kv/badger.go
+++ b/banyand/kv/badger.go
@@ -255,6 +255,9 @@ func (b *badgerDB) GetAll(key []byte, applyFn func([]byte) error) error {
 	iter := b.db.NewIterator(badger.DefaultIteratorOptions)
 	var count int
 	for iter.Seek(key); iter.Valid(); iter.Next() {
+		if !bytes.Equal(y.ParseKey(iter.Key()), key) {
+			break
+		}
 		count++
 		err := applyFn(y.Copy(iter.Value().Value))
 		if err != nil {
diff --git a/banyand/stream/stream_query.go b/banyand/stream/stream_query.go
index 3402665..5bd9844 100644
--- a/banyand/stream/stream_query.go
+++ b/banyand/stream/stream_query.go
@@ -41,6 +41,7 @@ type Query interface {
 
 type Stream interface {
 	Shards(entity tsdb.Entity) ([]tsdb.Shard, error)
+	Shard(id common.ShardID) (tsdb.Shard, error)
 }
 
 var _ Stream = (*stream)(nil)
@@ -65,6 +66,10 @@ func (s *stream) Shards(entity tsdb.Entity) ([]tsdb.Shard, error) {
 	return []tsdb.Shard{shard}, nil
 }
 
+func (s *stream) Shard(id common.ShardID) (tsdb.Shard, error) {
+	return s.db.Shard(id)
+}
+
 func (s *stream) ParseTagFamily(family string, item tsdb.Item) (*modelv2.TagFamily, error) {
 	familyRawBytes, err := item.Val(family)
 	if err != nil {
diff --git a/banyand/stream/stream_query_test.go b/banyand/stream/stream_query_test.go
index 8b6cc12..65311ce 100644
--- a/banyand/stream/stream_query_test.go
+++ b/banyand/stream/stream_query_test.go
@@ -24,23 +24,36 @@ import (
 	"encoding/base64"
 	"encoding/json"
 	"fmt"
+	"io"
 	"sort"
 	"strconv"
 	"testing"
 	"time"
 
 	"github.com/golang/protobuf/jsonpb"
+	"github.com/pkg/errors"
 	"github.com/stretchr/testify/assert"
 	"google.golang.org/protobuf/types/known/timestamppb"
 
 	"github.com/apache/skywalking-banyandb/api/common"
+	commonv2 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v2"
+	databasev2 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v2"
 	modelv2 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v2"
 	streamv2 "github.com/apache/skywalking-banyandb/api/proto/banyandb/stream/v2"
 	"github.com/apache/skywalking-banyandb/banyand/tsdb"
 	"github.com/apache/skywalking-banyandb/pkg/convert"
+	"github.com/apache/skywalking-banyandb/pkg/index"
 	"github.com/apache/skywalking-banyandb/pkg/partition"
 )
 
+type shardStruct struct {
+	id       common.ShardID
+	location []string
+	elements []string
+}
+
+type shardsForTest []shardStruct
+
 func Test_Stream_SelectShard(t *testing.T) {
 	tester := assert.New(t)
 	s, deferFunc := setup(tester)
@@ -87,86 +100,184 @@ func Test_Stream_Series(t *testing.T) {
 	s, deferFunc := setup(tester)
 	defer deferFunc()
 	baseTime := setupQueryData(tester, "multiple_shards.json", s)
-	type args struct {
-		entity tsdb.Entity
-	}
-	type shardStruct struct {
-		id       common.ShardID
-		location []string
-		elements []string
-	}
-	type want struct {
-		shards []shardStruct
-	}
-
 	tests := []struct {
 		name    string
-		args    args
-		want    want
+		args    queryOpts
+		want    shardsForTest
 		wantErr bool
 	}{
 		{
 			name: "all",
-			args: args{
-				entity: tsdb.Entity{tsdb.AnyEntry, tsdb.AnyEntry, tsdb.AnyEntry},
+			args: queryOpts{
+				entity:    tsdb.Entity{tsdb.AnyEntry, tsdb.AnyEntry, tsdb.AnyEntry},
+				timeRange: tsdb.NewTimeRangeDuration(baseTime, 1*time.Hour),
 			},
-			want: want{
-				shards: []shardStruct{
-					{
-						id:       0,
-						location: []string{"series_12243341348514563931", "data_flow_0"},
-						elements: []string{"1"},
-					},
-					{
-						id:       0,
-						location: []string{"series_1671844747554927007", "data_flow_0"},
-						elements: []string{"2"},
-					},
-					{
-						id:       1,
-						location: []string{"series_2374367181827824198", "data_flow_0"},
-						elements: []string{"5", "3"},
-					},
-					{
-						id:       1,
-						location: []string{"series_8429137420168685297", "data_flow_0"},
-						elements: []string{"4"},
-					},
+			want: shardsForTest{
+				{
+					id:       0,
+					location: []string{"series_12243341348514563931", "data_flow_0"},
+					elements: []string{"1"},
+				},
+				{
+					id:       0,
+					location: []string{"series_1671844747554927007", "data_flow_0"},
+					elements: []string{"2"},
+				},
+				{
+					id:       1,
+					location: []string{"series_2374367181827824198", "data_flow_0"},
+					elements: []string{"5", "3"},
+				},
+				{
+					id:       1,
+					location: []string{"series_8429137420168685297", "data_flow_0"},
+					elements: []string{"4"},
+				},
+			},
+		},
+
+		{
+			name: "time range",
+			args: queryOpts{
+				entity:    tsdb.Entity{tsdb.AnyEntry, tsdb.AnyEntry, tsdb.AnyEntry},
+				timeRange: tsdb.NewTimeRangeDuration(baseTime.Add(1500*time.Millisecond), 1*time.Hour),
+			},
+			want: shardsForTest{
+				{
+					id:       0,
+					location: []string{"series_12243341348514563931", "data_flow_0"},
+				},
+				{
+					id:       0,
+					location: []string{"series_1671844747554927007", "data_flow_0"},
+				},
+				{
+					id:       1,
+					location: []string{"series_2374367181827824198", "data_flow_0"},
+					elements: []string{"5"},
+				},
+				{
+					id:       1,
+					location: []string{"series_8429137420168685297", "data_flow_0"},
+					elements: []string{"4"},
 				},
 			},
 		},
 		{
 			name: "find series by service_id and instance_id",
-			args: args{
-				entity: tsdb.Entity{tsdb.Entry("webapp_id"), tsdb.Entry("10.0.0.1_id"), tsdb.AnyEntry},
+			args: queryOpts{
+				entity:    tsdb.Entity{tsdb.Entry("webapp_id"), tsdb.Entry("10.0.0.1_id"), tsdb.AnyEntry},
+				timeRange: tsdb.NewTimeRangeDuration(baseTime, 1*time.Hour),
 			},
-			want: want{
-				shards: []shardStruct{
-					{
-						id:       0,
-						location: []string{"series_12243341348514563931", "data_flow_0"},
-						elements: []string{"1"},
-					},
-					{
-						id:       1,
-						location: []string{"series_2374367181827824198", "data_flow_0"},
-						elements: []string{"5", "3"},
-					},
+			want: shardsForTest{
+				{
+					id:       0,
+					location: []string{"series_12243341348514563931", "data_flow_0"},
+					elements: []string{"1"},
+				},
+				{
+					id:       1,
+					location: []string{"series_2374367181827824198", "data_flow_0"},
+					elements: []string{"5", "3"},
 				},
 			},
 		},
 		{
 			name: "find a series",
-			args: args{
-				entity: tsdb.Entity{tsdb.Entry("webapp_id"), tsdb.Entry("10.0.0.1_id"), convert.Uint64ToBytes(1)},
+			args: queryOpts{
+				entity:    tsdb.Entity{tsdb.Entry("webapp_id"), tsdb.Entry("10.0.0.1_id"), convert.Uint64ToBytes(1)},
+				timeRange: tsdb.NewTimeRangeDuration(baseTime, 1*time.Hour),
 			},
-			want: want{
-				shards: []shardStruct{
-					{
-						id:       1,
-						location: []string{"series_2374367181827824198", "data_flow_0"},
-						elements: []string{"5", "3"},
-					},
+			want: shardsForTest{
+				{
+					id:       1,
+					location: []string{"series_2374367181827824198", "data_flow_0"},
+					elements: []string{"5", "3"},
+				},
+			},
+		},
+		{
+			name: "filter",
+			args: queryOpts{
+				entity:    tsdb.Entity{tsdb.AnyEntry, tsdb.AnyEntry, tsdb.AnyEntry},
+				timeRange: tsdb.NewTimeRangeDuration(baseTime, 1*time.Hour),
+				buildFn: func(builder tsdb.SeekerBuilder) {
+					builder.Filter(&databasev2.IndexRule{
+						Metadata: &commonv2.Metadata{
+							Name:  "endpoint_id",
+							Group: "default",
+						},
+						Tags:     []string{"endpoint_id"},
+						Type:     databasev2.IndexRule_TYPE_INVERTED,
+						Location: databasev2.IndexRule_LOCATION_SERIES,
+					}, tsdb.Condition{
+						"endpoint_id": []index.ConditionValue{
+							{
+								Op:     modelv2.Condition_BINARY_OP_EQ,
+								Values: [][]byte{[]byte("/home_id")},
+							},
+						},
+					})
+				},
+			},
+			want: shardsForTest{
+				{
+					id:       0,
+					location: []string{"series_12243341348514563931", "data_flow_0"},
+					elements: []string{"1"},
+				},
+				{
+					id:       0,
+					location: []string{"series_1671844747554927007", "data_flow_0"},
+				},
+				{
+					id:       1,
+					location: []string{"series_2374367181827824198", "data_flow_0"},
+					elements: []string{"3"},
+				},
+				{
+					id:       1,
+					location: []string{"series_8429137420168685297", "data_flow_0"},
+				},
+			},
+		},
+		{
+			name: "order by duration",
+			args: queryOpts{
+				entity:    tsdb.Entity{tsdb.AnyEntry, tsdb.AnyEntry, tsdb.AnyEntry},
+				timeRange: tsdb.NewTimeRangeDuration(baseTime, 1*time.Hour),
+				buildFn: func(builder tsdb.SeekerBuilder) {
+					builder.OrderByIndex(&databasev2.IndexRule{
+						Metadata: &commonv2.Metadata{
+							Name:  "duration",
+							Group: "default",
+						},
+						Tags:     []string{"duration"},
+						Type:     databasev2.IndexRule_TYPE_TREE,
+						Location: databasev2.IndexRule_LOCATION_SERIES,
+					}, modelv2.QueryOrder_SORT_ASC)
+				},
+			},
+			want: shardsForTest{
+				{
+					id:       0,
+					location: []string{"series_12243341348514563931", "data_flow_0"},
+					elements: []string{"1"},
+				},
+				{
+					id:       0,
+					location: []string{"series_1671844747554927007", "data_flow_0"},
+					elements: []string{"2"},
+				},
+				{
+					id:       1,
+					location: []string{"series_2374367181827824198", "data_flow_0"},
+					elements: []string{"3", "5"},
+				},
+				{
+					id:       1,
+					location: []string{"series_8429137420168685297", "data_flow_0"},
+					elements: []string{"4"},
 				},
 			},
 		},
@@ -174,59 +285,15 @@ func Test_Stream_Series(t *testing.T) {
 
 	for _, tt := range tests {
 		t.Run(tt.name, func(t *testing.T) {
-			shards, err := s.Shards(tt.args.entity)
-			tester.NoError(err)
-			got := want{
-				shards: []shardStruct{},
-			}
-
-			for _, shard := range shards {
-				seriesList, err := shard.Series().List(tsdb.NewPath(tt.args.entity))
-				tester.NoError(err)
-				for _, series := range seriesList {
-					func(g *want) {
-						sp, err := series.Span(tsdb.NewTimeRangeDuration(baseTime, 1*time.Hour))
-						defer func(sp tsdb.SeriesSpan) {
-							_ = sp.Close()
-						}(sp)
-						tester.NoError(err)
-						seeker, err := sp.SeekerBuilder().Build()
-						tester.NoError(err)
-						iter, err := seeker.Seek()
-						tester.NoError(err)
-						for dataFlowID, iterator := range iter {
-							var elements []string
-							for iterator.Next() {
-								tagFamily, err := s.ParseTagFamily("searchable", iterator.Val())
-								tester.NoError(err)
-								for _, tag := range tagFamily.GetTags() {
-									if tag.GetKey() == "trace_id" {
-										elements = append(elements, tag.GetValue().GetStr().GetValue())
-									}
-								}
-							}
-							_ = iterator.Close()
-							g.shards = append(g.shards, shardStruct{
-								id: shard.ID(),
-								location: []string{
-									fmt.Sprintf("series_%v", series.ID()),
-									"data_flow_" + strconv.Itoa(dataFlowID),
-								},
-								elements: elements,
-							})
-						}
-
-					}(&got)
-				}
-			}
+			got, err := queryData(tester, s, tt.args)
 			if tt.wantErr {
 				tester.Error(err)
 				return
 			}
 			tester.NoError(err)
-			sort.SliceStable(got.shards, func(i, j int) bool {
-				a := got.shards[i]
-				b := got.shards[j]
+			sort.SliceStable(got, func(i, j int) bool {
+				a := got[i]
+				b := got[j]
 				if a.id > b.id {
 					return false
 				}
@@ -244,6 +311,169 @@ func Test_Stream_Series(t *testing.T) {
 
 }
 
+func Test_Stream_Global_Index(t *testing.T) {
+	tester := assert.New(t)
+	s, deferFunc := setup(tester)
+	defer deferFunc()
+	_ = setupQueryData(tester, "global_index.json", s)
+	tests := []struct {
+		name                string
+		traceID             string
+		wantTraceSegmentNum int
+		wantErr             bool
+	}{
+		{
+			name:                "trace id is 1",
+			traceID:             "1",
+			wantTraceSegmentNum: 2,
+		},
+		{
+			name:                "trace id is 2",
+			traceID:             "2",
+			wantTraceSegmentNum: 3,
+		},
+		{
+			name:    "unknown trace id",
+			traceID: "foo",
+		},
+	}
+
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			shards, errShards := s.Shards(nil)
+			tester.NoError(errShards)
+			err := func() error {
+				for _, shard := range shards {
+					itemIDs, err := shard.Index().Seek(index.Field{
+						Term:  []byte("trace_id"),
+						Value: []byte(tt.traceID),
+					})
+					if err != nil {
+						return errors.WithStack(err)
+					}
+					if len(itemIDs) < 1 {
+						continue
+					}
+					if err != nil {
+						return errors.WithStack(err)
+					}
+					tester.Equal(tt.wantTraceSegmentNum, len(itemIDs))
+					for _, itemID := range itemIDs {
+						segShard, err := s.Shard(itemID.ShardID)
+						if err != nil {
+							return errors.WithStack(err)
+						}
+						series, err := segShard.Series().GetByID(itemID.SeriesID)
+						if err != nil {
+							return errors.WithStack(err)
+						}
+						err = func() error {
+							item, closer, errInner := series.Get(itemID)
+							defer func(closer io.Closer) {
+								_ = closer.Close()
+							}(closer)
+							if errInner != nil {
+								return errors.WithStack(errInner)
+							}
+							tagFamily, errInner := s.ParseTagFamily("searchable", item)
+							if errInner != nil {
+								return errors.WithStack(errInner)
+							}
+							for _, tag := range tagFamily.GetTags() {
+								if tag.GetKey() == "trace_id" {
+									tester.Equal(tt.traceID, tag.GetValue().GetStr().GetValue())
+								}
+							}
+							return nil
+						}()
+						if err != nil {
+							return errors.WithStack(err)
+						}
+
+					}
+				}
+				return nil
+			}()
+			if tt.wantErr {
+				tester.Error(err)
+				return
+			}
+			tester.NoError(err)
+		})
+	}
+
+}
+
+type queryOpts struct {
+	entity    tsdb.Entity
+	timeRange tsdb.TimeRange
+	buildFn   func(builder tsdb.SeekerBuilder)
+}
+
+func queryData(tester *assert.Assertions, s *stream, opts queryOpts) (shardsForTest, error) {
+	shards, err := s.Shards(opts.entity)
+	tester.NoError(err)
+	got := shardsForTest{}
+	for _, shard := range shards {
+		seriesList, err := shard.Series().List(tsdb.NewPath(opts.entity))
+		if err != nil {
+			return nil, err
+		}
+		for _, series := range seriesList {
+			got, err = func(g shardsForTest) (shardsForTest, error) {
+				sp, errInner := series.Span(opts.timeRange)
+				defer func(sp tsdb.SeriesSpan) {
+					_ = sp.Close()
+				}(sp)
+				if errInner != nil {
+					return nil, errInner
+				}
+				builder := sp.SeekerBuilder()
+				if opts.buildFn != nil {
+					opts.buildFn(builder)
+				}
+				seeker, errInner := builder.Build()
+				if errInner != nil {
+					return nil, errInner
+				}
+				iter, errInner := seeker.Seek()
+				if errInner != nil {
+					return nil, errInner
+				}
+				for dataFlowID, iterator := range iter {
+					var elements []string
+					for iterator.Next() {
+						tagFamily, errInner := s.ParseTagFamily("searchable", iterator.Val())
+						if errInner != nil {
+							return nil, errInner
+						}
+						for _, tag := range tagFamily.GetTags() {
+							if tag.GetKey() == "trace_id" {
+								elements = append(elements, tag.GetValue().GetStr().GetValue())
+							}
+						}
+					}
+					_ = iterator.Close()
+					g = append(g, shardStruct{
+						id: shard.ID(),
+						location: []string{
+							fmt.Sprintf("series_%v", series.ID()),
+							"data_flow_" + strconv.Itoa(dataFlowID),
+						},
+						elements: elements,
+					})
+				}
+
+				return g, nil
+			}(got)
+			if err != nil {
+				return nil, err
+			}
+		}
+	}
+	return got, nil
+}
+
 //go:embed testdata/*.json
 var dataFS embed.FS
 
@@ -261,7 +491,7 @@ func setupQueryData(t *assert.Assertions, dataFile string, stream *stream) (base
 		t.NoError(jsonpb.UnmarshalString(string(rawSearchTagFamily), searchTagFamily))
 		e := &streamv2.ElementValue{
 			ElementId: strconv.Itoa(i),
-			Timestamp: timestamppb.New(baseTime.Add(time.Millisecond * time.Duration(i))),
+			Timestamp: timestamppb.New(baseTime.Add(500 * time.Millisecond * time.Duration(i))),
 			TagFamilies: []*streamv2.ElementValue_TagFamily{
 				{
 					Tags: []*modelv2.TagValue{
@@ -282,6 +512,7 @@ func setupQueryData(t *assert.Assertions, dataFile string, stream *stream) (base
 		itemID, err := stream.write(common.ShardID(shardID), e)
 		t.NoError(err)
 		sa, err := stream.Shards(entity)
+		t.NoError(err)
 		for _, shard := range sa {
 			se, err := shard.Series().Get(entity)
 			t.NoError(err)
diff --git a/banyand/stream/testdata/single_series.json b/banyand/stream/testdata/global_index.json
similarity index 57%
rename from banyand/stream/testdata/single_series.json
rename to banyand/stream/testdata/global_index.json
index 049ec0c..9e81928 100644
--- a/banyand/stream/testdata/single_series.json
+++ b/banyand/stream/testdata/global_index.json
@@ -1,7 +1,7 @@
 [
   {
     "tags": [
-      {"str":{"value": "trace_id-xxfff.111323"}},
+      {"str":{"value": "1"}},
       {"int":{"value": 0}},
       {"str":{"value": "webapp_id"}},
       {"str":{"value": "10.0.0.1_id"}},
@@ -12,35 +12,48 @@
   },
   {
     "tags": [
-      {"str":{"value": "trace_id-xxfff.111323"}},
+      {"str":{"value": "2"}},
       {"int":{"value": 0}},
       {"str":{"value": "webapp_id"}},
-      {"str":{"value": "10.0.0.1_id"}},
-      {"str":{"value": "/home_id"}},
+      {"str":{"value": "10.0.0.3_id"}},
+      {"str":{"value": "/product_id"}},
       {"int":{"value": 500}},
       {"int":{"value": 1622933202000000000}}
     ]
   },
   {
     "tags": [
-      {"str":{"value": "trace_id-xxfff.111323"}},
-      {"int":{"value": 0}},
+      {"str":{"value": "1"}},
+      {"int":{"value": 1}},
       {"str":{"value": "webapp_id"}},
       {"str":{"value": "10.0.0.1_id"}},
       {"str":{"value": "/home_id"}},
       {"int":{"value": 30}},
       {"int":{"value": 1622933202000000000}},
       {"str":{"value": "GET"}},
-      {"str":{"value": "200"}}
+      {"str":{"value": "500"}}
     ]
   },
   {
     "tags": [
-      {"str":{"value": "trace_id-xxfff.111323"}},
+      {"str":{"value": "2"}},
       {"int":{"value": 1}},
-      {"str":{"value": "httpserver_id"}},
+      {"str":{"value": "webapp_id"}},
+      {"str":{"value": "10.0.0.5_id"}},
+      {"str":{"value": "/price_id"}},
+      {"int":{"value": 60}},
+      {"int":{"value": 1622933202000000000}},
+      {"str":{"value": "GET"}},
+      {"str":{"value": "400"}}
+    ]
+  },
+  {
+    "tags": [
+      {"str":{"value": "2"}},
+      {"int":{"value": 1}},
+      {"str":{"value": "webapp_id"}},
       {"str":{"value": "10.0.0.1_id"}},
-      {"str":{"value": "/home_id"}},
+      {"str":{"value": "/item_id"}},
       {"int":{"value": 300}},
       {"int":{"value": 1622933202000000000}},
       {"str":{"value": "GET"}},
diff --git a/banyand/stream/testdata/multiple_shards.json b/banyand/stream/testdata/multiple_shards.json
index 791c7c9..3f1a721 100644
--- a/banyand/stream/testdata/multiple_shards.json
+++ b/banyand/stream/testdata/multiple_shards.json
@@ -16,7 +16,7 @@
       {"int":{"value": 0}},
       {"str":{"value": "webapp_id"}},
       {"str":{"value": "10.0.0.3_id"}},
-      {"str":{"value": "/home_id"}},
+      {"str":{"value": "/product_id"}},
       {"int":{"value": 500}},
       {"int":{"value": 1622933202000000000}}
     ]
@@ -40,7 +40,7 @@
       {"int":{"value": 1}},
       {"str":{"value": "webapp_id"}},
       {"str":{"value": "10.0.0.5_id"}},
-      {"str":{"value": "/home_id"}},
+      {"str":{"value": "/price_id"}},
       {"int":{"value": 60}},
       {"int":{"value": 1622933202000000000}},
       {"str":{"value": "GET"}},
@@ -53,7 +53,7 @@
       {"int":{"value": 1}},
       {"str":{"value": "webapp_id"}},
       {"str":{"value": "10.0.0.1_id"}},
-      {"str":{"value": "/home_id"}},
+      {"str":{"value": "/item_id"}},
       {"int":{"value": 300}},
       {"int":{"value": 1622933202000000000}},
       {"str":{"value": "GET"}},
diff --git a/banyand/tsdb/block.go b/banyand/tsdb/block.go
index 7d5168f..44f1922 100644
--- a/banyand/tsdb/block.go
+++ b/banyand/tsdb/block.go
@@ -81,22 +81,10 @@ func newBlock(ctx context.Context, opts blockOpts) (b *block, err error) {
 	}
 	b.closableLst = append(b.closableLst, b.store, b.primaryIndex)
 	rules, ok := ctx.Value(indexRulesKey).([]*databasev2.IndexRule)
-	var specs []index.FieldSpec
-	for _, rule := range rules {
-		if rule.GetLocation() == databasev2.IndexRule_LOCATION_SERIES {
-			specs = append(specs, index.FieldSpec{
-				Name: rule.GetMetadata().GetName(),
-			})
-		}
-	}
-	if !ok || len(specs) == 0 {
+	if !ok || len(rules) == 0 {
 		return b, nil
 	}
 	b.invertedIndex = inverted.NewStore("inverted")
-	err = b.invertedIndex.Initialize(specs)
-	if err != nil {
-		return nil, err
-	}
 	return b, nil
 }
 
diff --git a/banyand/tsdb/indexdb.go b/banyand/tsdb/indexdb.go
index c81c727..c9fd9d0 100644
--- a/banyand/tsdb/indexdb.go
+++ b/banyand/tsdb/indexdb.go
@@ -24,6 +24,7 @@ import (
 	"github.com/pkg/errors"
 
 	"github.com/apache/skywalking-banyandb/api/common"
+	"github.com/apache/skywalking-banyandb/banyand/kv"
 	"github.com/apache/skywalking-banyandb/pkg/index"
 )
 
@@ -54,7 +55,7 @@ type indexDB struct {
 }
 
 func (i *indexDB) Seek(term index.Field) ([]GlobalItemID, error) {
-	var result []GlobalItemID
+	result := make([]GlobalItemID, 0)
 	err := i.lst[0].globalIndex.GetAll(term.Marshal(), func(rawBytes []byte) error {
 		id := &GlobalItemID{}
 		err := id.UnMarshal(rawBytes)
@@ -64,6 +65,9 @@ func (i *indexDB) Seek(term index.Field) ([]GlobalItemID, error) {
 		result = append(result, *id)
 		return nil
 	})
+	if err == kv.ErrKeyNotFound {
+		return result, nil
+	}
 	return result, err
 }
 
diff --git a/banyand/tsdb/series.go b/banyand/tsdb/series.go
index 376e39c..c9ec07b 100644
--- a/banyand/tsdb/series.go
+++ b/banyand/tsdb/series.go
@@ -37,37 +37,37 @@ var (
 )
 
 type GlobalItemID struct {
-	shardID  common.ShardID
+	ShardID  common.ShardID
 	segID    uint16
 	blockID  uint16
-	seriesID common.SeriesID
-	id       common.ItemID
+	SeriesID common.SeriesID
+	ID       common.ItemID
 }
 
 func (i *GlobalItemID) Marshal() []byte {
 	return bytes.Join([][]byte{
-		convert.Uint32ToBytes(uint32(i.shardID)),
+		convert.Uint32ToBytes(uint32(i.ShardID)),
 		convert.Uint16ToBytes(i.segID),
 		convert.Uint16ToBytes(i.blockID),
-		convert.Uint64ToBytes(uint64(i.seriesID)),
-		convert.Uint64ToBytes(uint64(i.id)),
+		convert.Uint64ToBytes(uint64(i.SeriesID)),
+		convert.Uint64ToBytes(uint64(i.ID)),
 	}, nil)
 }
 
 func (i *GlobalItemID) UnMarshal(data []byte) error {
-	if len(data) <= 32+16+16+64+64 {
+	if len(data) != 4+2+2+8+8 {
 		return ErrItemIDMalformed
 	}
 	var offset int
-	i.shardID = common.ShardID(convert.BytesToUint32(data[offset : offset+4]))
+	i.ShardID = common.ShardID(convert.BytesToUint32(data[offset : offset+4]))
 	offset += 4
 	i.segID = convert.BytesToUint16(data[offset : offset+2])
 	offset += 2
 	i.blockID = convert.BytesToUint16(data[offset : offset+2])
 	offset += 2
-	i.seriesID = common.SeriesID(convert.BytesToUint64(data[offset : offset+8]))
+	i.SeriesID = common.SeriesID(convert.BytesToUint64(data[offset : offset+8]))
 	offset += 8
-	i.id = common.ItemID(convert.BytesToUint64(data[offset:]))
+	i.ID = common.ItemID(convert.BytesToUint64(data[offset:]))
 	return nil
 }
 
@@ -123,7 +123,7 @@ func (s *series) Get(id GlobalItemID) (Item, io.Closer, error) {
 	b := s.blockDB.block(id)
 	return &item{
 		data:     b.dataReader(),
-		itemID:   id.id,
+		itemID:   id.ID,
 		seriesID: s.id,
 	}, b, nil
 }
diff --git a/banyand/tsdb/series_seek.go b/banyand/tsdb/series_seek.go
index 1c4f338..a6cf6c0 100644
--- a/banyand/tsdb/series_seek.go
+++ b/banyand/tsdb/series_seek.go
@@ -18,6 +18,8 @@
 package tsdb
 
 import (
+	"time"
+
 	"github.com/apache/skywalking-banyandb/api/common"
 	databasev2 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v2"
 	modelv2 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v2"
@@ -72,7 +74,12 @@ func (s *seekerBuilder) Build() (Seeker, error) {
 	}
 	filters := []filterFn{
 		func(item Item) bool {
-			return s.seriesSpan.timeRange.contains(item.Time())
+			valid := s.seriesSpan.timeRange.contains(item.Time())
+			timeRange := s.seriesSpan.timeRange
+			s.seriesSpan.l.Trace().
+				Times("time_range", []time.Time{timeRange.Start, timeRange.End}).
+				Bool("valid", valid).Msg("filter item by time range")
+			return valid
 		},
 	}
 	if indexFilter != nil {
diff --git a/banyand/tsdb/series_seek_filter.go b/banyand/tsdb/series_seek_filter.go
index 8814a16..990d1ae 100644
--- a/banyand/tsdb/series_seek_filter.go
+++ b/banyand/tsdb/series_seek_filter.go
@@ -53,7 +53,7 @@ func (s *seekerBuilder) buildIndexFilter() (filterFn, error) {
 			//TODO:// should support composite index rule
 			return nil, ErrUnsupportedIndexRule
 		}
-		var cond index.Condition
+		cond := make(index.Condition)
 		term := index.Term{
 			SeriesID:  s.seriesSpan.seriesID,
 			IndexRule: condition.indexRule,
@@ -100,7 +100,9 @@ func (s *seekerBuilder) buildIndexFilter() (filterFn, error) {
 		}
 	}
 	return func(item Item) bool {
-		return allItemIDs.Contains(item.ID())
+		valid := allItemIDs.Contains(item.ID())
+		s.seriesSpan.l.Trace().Int("valid_item_num", allItemIDs.Len()).Bool("valid", valid).Msg("filter item by index")
+		return valid
 	}, nil
 }
 
diff --git a/banyand/tsdb/series_seek_sort.go b/banyand/tsdb/series_seek_sort.go
index 3b26a8c..8e45a91 100644
--- a/banyand/tsdb/series_seek_sort.go
+++ b/banyand/tsdb/series_seek_sort.go
@@ -53,13 +53,19 @@ func (s *seekerBuilder) buildSeries(filters []filterFn) []Iterator {
 
 func (s *seekerBuilder) buildSeriesByIndex(filters []filterFn) (series []Iterator) {
 	for _, b := range s.seriesSpan.blocks {
+		var inner index.FieldIterator
+		term := index.Term{
+			SeriesID:  s.seriesSpan.seriesID,
+			IndexRule: s.indexRuleForSorting.GetMetadata().GetName(),
+		}
 		switch s.indexRuleForSorting.GetType() {
 		case databasev2.IndexRule_TYPE_TREE:
-			series = append(series, newSearcherIterator(s.seriesSpan.l, b.lsmIndexReader().
-				FieldIterator([]byte(s.indexRuleForSorting.GetMetadata().GetName()), s.order), b.dataReader(), s.seriesSpan.seriesID, filters))
+			inner = b.lsmIndexReader().FieldIterator(term.Marshal(), s.order)
 		case databasev2.IndexRule_TYPE_INVERTED:
-			series = append(series, newSearcherIterator(s.seriesSpan.l, b.invertedIndexReader().
-				FieldIterator([]byte(s.indexRuleForSorting.GetMetadata().GetName()), s.order), b.dataReader(), s.seriesSpan.seriesID, filters))
+			inner = b.invertedIndexReader().FieldIterator(term.Marshal(), s.order)
+		}
+		if inner != nil {
+			series = append(series, newSearcherIterator(s.seriesSpan.l, inner, b.dataReader(), s.seriesSpan.seriesID, filters))
 		}
 	}
 	return
@@ -79,16 +85,17 @@ func (s *seekerBuilder) buildSeriesByTime(filters []filterFn) []Iterator {
 		})
 	}
 	delegated := make([]Iterator, 0, len(bb))
-	var bTimes []time.Time
+	bTimes := make([]time.Time, 0, len(bb))
 	for _, b := range bb {
 		bTimes = append(bTimes, b.startTime())
-		delegated = append(delegated, newSearcherIterator(
-			s.seriesSpan.l,
-			b.primaryIndexReader().
-				FieldIterator(
-					s.seriesSpan.seriesID.Marshal(),
-					s.order,
-				), b.dataReader(), s.seriesSpan.seriesID, filters))
+		inner := b.primaryIndexReader().
+			FieldIterator(
+				s.seriesSpan.seriesID.Marshal(),
+				s.order,
+			)
+		if inner != nil {
+			delegated = append(delegated, newSearcherIterator(s.seriesSpan.l, inner, b.dataReader(), s.seriesSpan.seriesID, filters))
+		}
 	}
 	s.seriesSpan.l.Debug().
 		Str("order", modelv2.QueryOrder_Sort_name[int32(s.order)]).
@@ -116,7 +123,7 @@ func (s *searcherIterator) Next() bool {
 			v := s.fieldIterator.Val()
 			s.cur = v.Value.Iterator()
 			s.curKey = v.Key
-			s.l.Trace().Hex("term_field", s.curKey).Msg("got a new field")
+			s.l.Trace().Uint64("series_id", uint64(s.seriesID)).Hex("term_field", s.curKey).Msg("got a new field")
 		} else {
 			return false
 		}
@@ -125,11 +132,11 @@ func (s *searcherIterator) Next() bool {
 
 		for _, filter := range s.filters {
 			if !filter(s.Val()) {
-				s.l.Trace().Uint64("item_id", uint64(s.Val().ID())).Msg("ignore the item")
+				s.l.Trace().Uint64("series_id", uint64(s.seriesID)).Uint64("item_id", uint64(s.Val().ID())).Msg("ignore the item")
 				return s.Next()
 			}
 		}
-		s.l.Trace().Uint64("item_id", uint64(s.Val().ID())).Msg("got an item")
+		s.l.Trace().Uint64("series_id", uint64(s.seriesID)).Uint64("item_id", uint64(s.Val().ID())).Msg("got an item")
 		return true
 	}
 	s.cur = nil
@@ -173,9 +180,8 @@ func (m *mergedIterator) Next() bool {
 		m.index++
 		if m.index >= len(m.delegated) {
 			return false
-		} else {
-			m.curr = m.delegated[m.index]
 		}
+		m.curr = m.delegated[m.index]
 	}
 	hasNext := m.curr.Next()
 	if !hasNext {
diff --git a/banyand/tsdb/series_write.go b/banyand/tsdb/series_write.go
index af59b53..b335be9 100644
--- a/banyand/tsdb/series_write.go
+++ b/banyand/tsdb/series_write.go
@@ -96,11 +96,11 @@ func (w *writerBuilder) Build() (Writer, error) {
 		block: w.block,
 		ts:    w.ts,
 		itemID: &GlobalItemID{
-			shardID:  w.series.shardID,
+			ShardID:  w.series.shardID,
 			segID:    segID,
 			blockID:  blockID,
-			seriesID: w.series.seriesID,
-			id:       common.ItemID(uint64(w.ts.UnixNano())),
+			SeriesID: w.series.seriesID,
+			ID:       common.ItemID(uint64(w.ts.UnixNano())),
 		},
 		columns: w.values,
 	}, nil
@@ -131,20 +131,20 @@ func (w *writer) ItemID() GlobalItemID {
 
 func (w *writer) WriteLSMIndex(field index.Field) error {
 	t := index.Term{
-		SeriesID:  w.itemID.seriesID,
+		SeriesID:  w.itemID.SeriesID,
 		IndexRule: string(field.Term),
 	}
 	field.Term = t.Marshal()
-	return w.block.writeLSMIndex(field, w.itemID.id)
+	return w.block.writeLSMIndex(field, w.itemID.ID)
 }
 
 func (w *writer) WriteInvertedIndex(field index.Field) error {
 	t := index.Term{
-		SeriesID:  w.itemID.seriesID,
+		SeriesID:  w.itemID.SeriesID,
 		IndexRule: string(field.Term),
 	}
 	field.Term = t.Marshal()
-	return w.block.writeInvertedIndex(field, w.itemID.id)
+	return w.block.writeInvertedIndex(field, w.itemID.ID)
 }
 
 type dataBucket struct {
@@ -163,7 +163,7 @@ func (w *writer) Write() (GlobalItemID, error) {
 	id := w.ItemID()
 	for _, c := range w.columns {
 		err := w.block.write(dataBucket{
-			seriesID: w.itemID.seriesID,
+			seriesID: w.itemID.SeriesID,
 			family:   c.family,
 		}.marshal(),
 			c.val, w.ts)
@@ -172,7 +172,7 @@ func (w *writer) Write() (GlobalItemID, error) {
 		}
 	}
 	return id, w.block.writePrimaryIndex(index.Field{
-		Term:  id.seriesID.Marshal(),
+		Term:  id.SeriesID.Marshal(),
 		Value: convert.Int64ToBytes(w.ts.UnixNano()),
-	}, id.id)
+	}, id.ID)
 }
diff --git a/banyand/tsdb/seriesdb.go b/banyand/tsdb/seriesdb.go
index ccae717..422029f 100644
--- a/banyand/tsdb/seriesdb.go
+++ b/banyand/tsdb/seriesdb.go
@@ -87,6 +87,7 @@ func NewPath(entries []Entry) Path {
 
 type SeriesDatabase interface {
 	io.Closer
+	GetByID(id common.SeriesID) (Series, error)
 	Get(entity Entity) (Series, error)
 	List(path Path) (SeriesList, error)
 }
@@ -109,6 +110,10 @@ type seriesDB struct {
 	sID            common.ShardID
 }
 
+func (s *seriesDB) GetByID(id common.SeriesID) (Series, error) {
+	return newSeries(s.context(), id, s), nil
+}
+
 func (s *seriesDB) block(id GlobalItemID) blockDelegate {
 	return s.lst[id.segID].lst[id.blockID].delegate()
 }
diff --git a/banyand/tsdb/seriesdb_test.go b/banyand/tsdb/seriesdb_test.go
index a87d24c..8819772 100644
--- a/banyand/tsdb/seriesdb_test.go
+++ b/banyand/tsdb/seriesdb_test.go
@@ -338,7 +338,7 @@ func setUpEntities(t *assert.Assertions, db SeriesDatabase) []*entityWithID {
 }
 
 func newMockSeries(id common.SeriesID) *series {
-	return newSeries(nil, id, nil)
+	return newSeries(context.TODO(), id, nil)
 }
 
 func transform(list SeriesList) (seriesIDs []common.SeriesID) {
diff --git a/pkg/index/index.go b/pkg/index/index.go
index 46c70be..d6cbf86 100644
--- a/pkg/index/index.go
+++ b/pkg/index/index.go
@@ -33,10 +33,6 @@ func (f Field) Marshal() []byte {
 	return bytes.Join([][]byte{f.Term, f.Value}, nil)
 }
 
-type FieldSpec struct {
-	Name string
-}
-
 type RangeOpts struct {
 	Upper         []byte
 	Lower         []byte
diff --git a/pkg/index/inverted/field_map.go b/pkg/index/inverted/field_map.go
index f8d3789..57826d1 100644
--- a/pkg/index/inverted/field_map.go
+++ b/pkg/index/inverted/field_map.go
@@ -18,6 +18,8 @@
 package inverted
 
 import (
+	"sync"
+
 	"github.com/pkg/errors"
 
 	"github.com/apache/skywalking-banyandb/api/common"
@@ -30,7 +32,8 @@ var ErrFieldAbsent = errors.New("field doesn't exist")
 type fieldHashID uint64
 
 type fieldMap struct {
-	repo map[fieldHashID]*fieldValue
+	repo  map[fieldHashID]*fieldValue
+	mutex sync.RWMutex
 }
 
 func newFieldMap(initialSize int) *fieldMap {
@@ -39,22 +42,32 @@ func newFieldMap(initialSize int) *fieldMap {
 	}
 }
 
-func (fm *fieldMap) createKey(key []byte) {
-	fm.repo[fieldHashID(convert.Hash(key))] = &fieldValue{
+func (fm *fieldMap) createKey(key []byte) *fieldValue {
+	result := &fieldValue{
 		key:   key,
 		value: newPostingMap(),
 	}
+	fm.repo[fieldHashID(convert.Hash(key))] = result
+	return result
 }
 
 func (fm *fieldMap) get(key []byte) (*fieldValue, bool) {
+	fm.mutex.RLock()
+	defer fm.mutex.RUnlock()
+	return fm.getWithoutLock(key)
+}
+
+func (fm *fieldMap) getWithoutLock(key []byte) (*fieldValue, bool) {
 	v, ok := fm.repo[fieldHashID(convert.Hash(key))]
 	return v, ok
 }
 
 func (fm *fieldMap) put(fv index.Field, id common.ItemID) error {
-	pm, ok := fm.get(fv.Term)
+	fm.mutex.Lock()
+	defer fm.mutex.Unlock()
+	pm, ok := fm.getWithoutLock(fv.Term)
 	if !ok {
-		return errors.Wrapf(ErrFieldAbsent, "filed Term:%s", fv.Term)
+		pm = fm.createKey(fv.Term)
 	}
 	return pm.value.put(fv.Value, id)
 }
diff --git a/pkg/index/inverted/inverted.go b/pkg/index/inverted/inverted.go
index d610d54..569d434 100644
--- a/pkg/index/inverted/inverted.go
+++ b/pkg/index/inverted/inverted.go
@@ -24,7 +24,6 @@ import (
 
 type GlobalStore interface {
 	Searcher() index.Searcher
-	Initialize(fields []index.FieldSpec) error
 	Insert(field index.Field, docID common.ItemID) error
 }
 
@@ -37,10 +36,6 @@ func (s *store) Searcher() index.Searcher {
 	return s.memTable
 }
 
-func (s *store) Initialize(fields []index.FieldSpec) error {
-	return s.memTable.Initialize(fields)
-}
-
 func (s *store) Insert(field index.Field, chunkID common.ItemID) error {
 	return s.memTable.Insert(field, chunkID)
 }
diff --git a/pkg/index/inverted/mem.go b/pkg/index/inverted/mem.go
index e2db1f6..a6e2c12 100644
--- a/pkg/index/inverted/mem.go
+++ b/pkg/index/inverted/mem.go
@@ -41,21 +41,11 @@ type MemTable struct {
 
 func NewMemTable(name string) *MemTable {
 	return &MemTable{
-		name: name,
+		name:  name,
+		terms: newFieldMap(1000),
 	}
 }
 
-func (m *MemTable) Initialize(fields []index.FieldSpec) error {
-	if len(fields) < 1 {
-		return ErrFieldsAbsent
-	}
-	m.terms = newFieldMap(len(fields))
-	for _, f := range fields {
-		m.terms.createKey([]byte(f.Name))
-	}
-	return nil
-}
-
 func (m *MemTable) Insert(field index.Field, chunkID common.ItemID) error {
 	return m.terms.put(field, chunkID)
 }
diff --git a/pkg/index/inverted/mem_test.go b/pkg/index/inverted/mem_test.go
index 45117a7..d3c4731 100644
--- a/pkg/index/inverted/mem_test.go
+++ b/pkg/index/inverted/mem_test.go
@@ -26,56 +26,15 @@ import (
 	"github.com/apache/skywalking-banyandb/api/common"
 	modelv2 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v2"
 	"github.com/apache/skywalking-banyandb/pkg/convert"
+	"github.com/apache/skywalking-banyandb/pkg/index"
 	"github.com/apache/skywalking-banyandb/pkg/index/posting"
 	"github.com/apache/skywalking-banyandb/pkg/index/posting/roaring"
 )
 
-func TestMemTable_Initialize(t *testing.T) {
-	type args struct {
-		fields []FieldSpec
-	}
-	tests := []struct {
-		name    string
-		args    args
-		wantErr bool
-	}{
-		{
-			name: "golden path",
-			args: args{
-				fields: []FieldSpec{
-					{
-						Name: "service_name",
-					},
-					{
-						Name: "duration",
-					},
-				},
-			},
-		},
-		{
-			name:    "fields absent",
-			wantErr: true,
-		},
-	}
-	for _, tt := range tests {
-		t.Run(tt.name, func(t *testing.T) {
-			m := NewMemTable("sw")
-			var err error
-			if err = m.Initialize(tt.args.fields); (err != nil) != tt.wantErr {
-				t.Errorf("Initialize() error = %v, wantErr %v", err, tt.wantErr)
-			}
-			if err != nil {
-				return
-			}
-			assert.Equal(t, len(m.terms.repo), len(tt.args.fields))
-		})
-	}
-}
-
 func TestMemTable_Range(t *testing.T) {
 	type args struct {
 		fieldName []byte
-		opts      *RangeOpts
+		opts      index.RangeOpts
 	}
 	m := NewMemTable("sw")
 	setUp(t, m)
@@ -88,13 +47,13 @@ func TestMemTable_Range(t *testing.T) {
 			name: "in range",
 			args: args{
 				fieldName: []byte("duration"),
-				opts: &RangeOpts{
+				opts: index.RangeOpts{
 					Lower: convert.Uint16ToBytes(100),
 					Upper: convert.Uint16ToBytes(500),
 				},
 			},
-			wantList: m.MatchTerms(&Field{
-				Name:  []byte("duration"),
+			wantList: m.MatchTerms(index.Field{
+				Term:  []byte("duration"),
 				Value: convert.Uint16ToBytes(200),
 			}),
 		},
@@ -102,14 +61,14 @@ func TestMemTable_Range(t *testing.T) {
 			name: "excludes edge",
 			args: args{
 				fieldName: []byte("duration"),
-				opts: &RangeOpts{
+				opts: index.RangeOpts{
 					Lower: convert.Uint16ToBytes(50),
 					Upper: convert.Uint16ToBytes(1000),
 				},
 			},
 			wantList: union(m,
-				&Field{
-					Name:  []byte("duration"),
+				index.Field{
+					Term:  []byte("duration"),
 					Value: convert.Uint16ToBytes(200),
 				},
 			),
@@ -118,19 +77,19 @@ func TestMemTable_Range(t *testing.T) {
 			name: "includes lower",
 			args: args{
 				fieldName: []byte("duration"),
-				opts: &RangeOpts{
+				opts: index.RangeOpts{
 					Lower:         convert.Uint16ToBytes(50),
 					Upper:         convert.Uint16ToBytes(1000),
 					IncludesLower: true,
 				},
 			},
 			wantList: union(m,
-				&Field{
-					Name:  []byte("duration"),
+				index.Field{
+					Term:  []byte("duration"),
 					Value: convert.Uint16ToBytes(50),
 				},
-				&Field{
-					Name:  []byte("duration"),
+				index.Field{
+					Term:  []byte("duration"),
 					Value: convert.Uint16ToBytes(200),
 				},
 			),
@@ -139,19 +98,19 @@ func TestMemTable_Range(t *testing.T) {
 			name: "includes upper",
 			args: args{
 				fieldName: []byte("duration"),
-				opts: &RangeOpts{
+				opts: index.RangeOpts{
 					Lower:         convert.Uint16ToBytes(50),
 					Upper:         convert.Uint16ToBytes(1000),
 					IncludesUpper: true,
 				},
 			},
 			wantList: union(m,
-				&Field{
-					Name:  []byte("duration"),
+				index.Field{
+					Term:  []byte("duration"),
 					Value: convert.Uint16ToBytes(200),
 				},
-				&Field{
-					Name:  []byte("duration"),
+				index.Field{
+					Term:  []byte("duration"),
 					Value: convert.Uint16ToBytes(1000),
 				},
 			),
@@ -160,7 +119,7 @@ func TestMemTable_Range(t *testing.T) {
 			name: "includes edges",
 			args: args{
 				fieldName: []byte("duration"),
-				opts: &RangeOpts{
+				opts: index.RangeOpts{
 					Lower:         convert.Uint16ToBytes(50),
 					Upper:         convert.Uint16ToBytes(1000),
 					IncludesUpper: true,
@@ -168,16 +127,16 @@ func TestMemTable_Range(t *testing.T) {
 				},
 			},
 			wantList: union(m,
-				&Field{
-					Name:  []byte("duration"),
+				index.Field{
+					Term:  []byte("duration"),
 					Value: convert.Uint16ToBytes(50),
 				},
-				&Field{
-					Name:  []byte("duration"),
+				index.Field{
+					Term:  []byte("duration"),
 					Value: convert.Uint16ToBytes(200),
 				},
-				&Field{
-					Name:  []byte("duration"),
+				index.Field{
+					Term:  []byte("duration"),
 					Value: convert.Uint16ToBytes(1000),
 				},
 			),
@@ -186,7 +145,7 @@ func TestMemTable_Range(t *testing.T) {
 			name: "match one",
 			args: args{
 				fieldName: []byte("duration"),
-				opts: &RangeOpts{
+				opts: index.RangeOpts{
 					Lower:         convert.Uint16ToBytes(200),
 					Upper:         convert.Uint16ToBytes(200),
 					IncludesUpper: true,
@@ -194,8 +153,8 @@ func TestMemTable_Range(t *testing.T) {
 				},
 			},
 			wantList: union(m,
-				&Field{
-					Name:  []byte("duration"),
+				index.Field{
+					Term:  []byte("duration"),
 					Value: convert.Uint16ToBytes(200),
 				},
 			),
@@ -249,14 +208,14 @@ func TestMemTable_Iterator(t *testing.T) {
 				_ = iter.Close()
 			}()
 			for iter.Next() {
-				got = append(got, iter.Val().key)
+				got = append(got, iter.Val().Key)
 			}
 			tester.Equal(tt.want, got)
 		})
 	}
 }
 
-func union(memTable *MemTable, fields ...*Field) posting.List {
+func union(memTable *MemTable, fields ...index.Field) posting.List {
 	result := roaring.NewPostingList()
 	for _, f := range fields {
 		_ = result.Union(memTable.MatchTerms(f))
@@ -265,23 +224,15 @@ func union(memTable *MemTable, fields ...*Field) posting.List {
 }
 
 func setUp(t *testing.T, mt *MemTable) {
-	assert.NoError(t, mt.Initialize([]FieldSpec{
-		{
-			Name: "service_name",
-		},
-		{
-			Name: "duration",
-		},
-	}))
 	for i := 0; i < 100; i++ {
 		if i%2 == 0 {
-			assert.NoError(t, mt.Insert(&Field{
-				Name:  []byte("service_name"),
+			assert.NoError(t, mt.Insert(index.Field{
+				Term:  []byte("service_name"),
 				Value: []byte("gateway"),
 			}, common.ItemID(i)))
 		} else {
-			assert.NoError(t, mt.Insert(&Field{
-				Name:  []byte("service_name"),
+			assert.NoError(t, mt.Insert(index.Field{
+				Term:  []byte("service_name"),
 				Value: []byte("webpage"),
 			}, common.ItemID(i)))
 		}
@@ -289,18 +240,18 @@ func setUp(t *testing.T, mt *MemTable) {
 	for i := 100; i < 200; i++ {
 		switch {
 		case i%3 == 0:
-			assert.NoError(t, mt.Insert(&Field{
-				Name:  []byte("duration"),
+			assert.NoError(t, mt.Insert(index.Field{
+				Term:  []byte("duration"),
 				Value: convert.Uint16ToBytes(50),
 			}, common.ItemID(i)))
 		case i%3 == 1:
-			assert.NoError(t, mt.Insert(&Field{
-				Name:  []byte("duration"),
+			assert.NoError(t, mt.Insert(index.Field{
+				Term:  []byte("duration"),
 				Value: convert.Uint16ToBytes(200),
 			}, common.ItemID(i)))
 		case i%3 == 2:
-			assert.NoError(t, mt.Insert(&Field{
-				Name:  []byte("duration"),
+			assert.NoError(t, mt.Insert(index.Field{
+				Term:  []byte("duration"),
 				Value: convert.Uint16ToBytes(1000),
 			}, common.ItemID(i)))
 		}
diff --git a/pkg/index/search.go b/pkg/index/search.go
index 3fa8bd4..833d904 100644
--- a/pkg/index/search.go
+++ b/pkg/index/search.go
@@ -268,7 +268,7 @@ type not struct {
 }
 
 func (n *not) Execute() (posting.List, error) {
-	all := n.searcher.MatchField([]byte(n.Key))
+	all := n.searcher.MatchField(n.Key)
 	list, err := n.Inner.Execute()
 	if err != nil {
 		return nil, err
@@ -289,7 +289,7 @@ type eq struct {
 
 func (eq *eq) Execute() (posting.List, error) {
 	return eq.searcher.MatchTerms(Field{
-		Term:  []byte(eq.Key),
+		Term:  eq.Key,
 		Value: bytes.Join(eq.Values, nil),
 	}), nil
 }

[skywalking-banyandb] 03/03: Fix test issues

Posted by ha...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch time-series
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git

commit 1bbd27153e60f8768174b3c49fd78a9fe60fe930
Author: Gao Hongtao <ha...@gmail.com>
AuthorDate: Tue Sep 7 18:31:39 2021 +0800

    Fix test issues
    
    Signed-off-by: Gao Hongtao <ha...@gmail.com>
---
 banyand/tsdb/seriesdb_test.go | 32 ++++++++++++++++----------------
 banyand/tsdb/tsdb.go          |  2 +-
 banyand/tsdb/tsdb_test.go     |  2 +-
 3 files changed, 18 insertions(+), 18 deletions(-)

diff --git a/banyand/tsdb/seriesdb_test.go b/banyand/tsdb/seriesdb_test.go
index 8819772..8429295 100644
--- a/banyand/tsdb/seriesdb_test.go
+++ b/banyand/tsdb/seriesdb_test.go
@@ -185,7 +185,7 @@ func Test_SeriesDatabase_Get(t *testing.T) {
 			for _, entity := range tt.entities {
 				series, err := s.Get(entity)
 				tester.NoError(err)
-				tester.Equal(hashEntity(entity), series.ID())
+				tester.Greater(uint(series.ID()), uint(0))
 			}
 		})
 	}
@@ -216,7 +216,7 @@ func Test_SeriesDatabase_List(t *testing.T) {
 				convert.Uint64ToBytes(0),
 			}),
 			want: SeriesList{
-				newMockSeries(data[0].id),
+				newMockSeries(data[0].id, s.(*seriesDB)),
 			},
 		},
 		{
@@ -227,8 +227,8 @@ func Test_SeriesDatabase_List(t *testing.T) {
 				AnyEntry,
 			}),
 			want: SeriesList{
-				newMockSeries(data[1].id),
-				newMockSeries(data[2].id),
+				newMockSeries(data[1].id, s.(*seriesDB)),
+				newMockSeries(data[2].id, s.(*seriesDB)),
 			},
 		},
 		{
@@ -239,10 +239,10 @@ func Test_SeriesDatabase_List(t *testing.T) {
 				AnyEntry,
 			}),
 			want: SeriesList{
-				newMockSeries(data[0].id),
-				newMockSeries(data[1].id),
-				newMockSeries(data[2].id),
-				newMockSeries(data[3].id),
+				newMockSeries(data[0].id, s.(*seriesDB)),
+				newMockSeries(data[1].id, s.(*seriesDB)),
+				newMockSeries(data[2].id, s.(*seriesDB)),
+				newMockSeries(data[3].id, s.(*seriesDB)),
 			},
 		},
 		{
@@ -253,9 +253,9 @@ func Test_SeriesDatabase_List(t *testing.T) {
 				convert.Uint64ToBytes(0),
 			}),
 			want: SeriesList{
-				newMockSeries(data[0].id),
-				newMockSeries(data[1].id),
-				newMockSeries(data[3].id),
+				newMockSeries(data[0].id, s.(*seriesDB)),
+				newMockSeries(data[1].id, s.(*seriesDB)),
+				newMockSeries(data[3].id, s.(*seriesDB)),
 			},
 		},
 		{
@@ -266,8 +266,8 @@ func Test_SeriesDatabase_List(t *testing.T) {
 				convert.Uint64ToBytes(1),
 			}),
 			want: SeriesList{
-				newMockSeries(data[2].id),
-				newMockSeries(data[4].id),
+				newMockSeries(data[2].id, s.(*seriesDB)),
+				newMockSeries(data[4].id, s.(*seriesDB)),
 			},
 		},
 	}
@@ -332,13 +332,13 @@ func setUpEntities(t *assert.Assertions, db SeriesDatabase) []*entityWithID {
 		d.id = common.SeriesID(convert.BytesToUint64(hash(hashEntity(d.entity))))
 		series, err := db.Get(d.entity)
 		t.NoError(err)
-		t.Equal(hashEntity(d.entity), series.ID())
+		t.Greater(uint(series.ID()), uint(0))
 	}
 	return data
 }
 
-func newMockSeries(id common.SeriesID) *series {
-	return newSeries(context.TODO(), id, nil)
+func newMockSeries(id common.SeriesID, blockDB *seriesDB) *series {
+	return newSeries(context.TODO(), id, blockDB)
 }
 
 func transform(list SeriesList) (seriesIDs []common.SeriesID) {
diff --git a/banyand/tsdb/tsdb.go b/banyand/tsdb/tsdb.go
index 4528b46..373e438 100644
--- a/banyand/tsdb/tsdb.go
+++ b/banyand/tsdb/tsdb.go
@@ -36,7 +36,7 @@ import (
 
 const (
 	shardTemplate       = "%s/shard-%d"
-	seriesTemplate      = "%s/seriesSpan"
+	seriesTemplate      = "%s/series"
 	segTemplate         = "%s/seg-%s"
 	blockTemplate       = "%s/block-%s"
 	globalIndexTemplate = "%s/index"
diff --git a/banyand/tsdb/tsdb_test.go b/banyand/tsdb/tsdb_test.go
index e12e0a6..1c17c2b 100644
--- a/banyand/tsdb/tsdb_test.go
+++ b/banyand/tsdb/tsdb_test.go
@@ -39,7 +39,7 @@ func TestOpenDatabase(t *testing.T) {
 	seriesPath := fmt.Sprintf(seriesTemplate, shardPath)
 	validateDirectory(tester, seriesPath)
 	now := time.Now()
-	segPath := fmt.Sprintf(segTemplate, seriesPath, now.Format(segFormat))
+	segPath := fmt.Sprintf(segTemplate, shardPath, now.Format(segFormat))
 	validateDirectory(tester, segPath)
 	validateDirectory(tester, fmt.Sprintf(blockTemplate, segPath, now.Format(blockFormat)))
 }