You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by ha...@apache.org on 2021/09/07 10:32:05 UTC

[skywalking-banyandb] 01/03: Add stream query test

This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch time-series
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git

commit cd628eaa55cba84f110d92940e4badfdd3dd3fff
Author: Gao Hongtao <ha...@gmail.com>
AuthorDate: Tue Sep 7 11:15:08 2021 +0800

    Add stream query test
    
    Signed-off-by: Gao Hongtao <ha...@gmail.com>
---
 banyand/kv/badger.go                         |  36 ++--
 banyand/kv/kv.go                             |   3 +-
 banyand/stream/stream_query.go               |  35 +---
 banyand/stream/stream_query_test.go          | 301 +++++++++++++++++++++++++++
 banyand/stream/stream_write.go               |  29 ++-
 banyand/stream/stream_write_test.go          |   2 +-
 banyand/stream/testdata/multiple_shards.json |  64 ++++++
 banyand/stream/testdata/shard0.json          |  18 --
 banyand/stream/testdata/single_series.json   |  51 +++++
 banyand/tsdb/series.go                       |  74 +++++--
 banyand/tsdb/series_seek.go                  |   7 +-
 banyand/tsdb/series_seek_filter.go           |   3 +
 banyand/tsdb/series_seek_sort.go             |  71 ++++---
 banyand/tsdb/series_write.go                 |   5 +-
 banyand/tsdb/seriesdb.go                     |  38 +++-
 banyand/tsdb/seriesdb_test.go                |   2 +-
 banyand/tsdb/shard.go                        |   4 +
 banyand/tsdb/tsdb.go                         |   1 +
 18 files changed, 607 insertions(+), 137 deletions(-)

diff --git a/banyand/kv/badger.go b/banyand/kv/badger.go
index 0dfbbe7..131032d 100644
--- a/banyand/kv/badger.go
+++ b/banyand/kv/badger.go
@@ -27,7 +27,9 @@ import (
 	"github.com/dgraph-io/badger/v3/y"
 	"go.uber.org/multierr"
 
+	"github.com/apache/skywalking-banyandb/api/common"
 	modelv2 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v2"
+	"github.com/apache/skywalking-banyandb/pkg/convert"
 	"github.com/apache/skywalking-banyandb/pkg/index"
 	"github.com/apache/skywalking-banyandb/pkg/index/posting"
 	"github.com/apache/skywalking-banyandb/pkg/index/posting/roaring"
@@ -196,15 +198,11 @@ func (i *iterator) Seek(key []byte) {
 }
 
 func (i *iterator) Key() []byte {
-	return i.delegated.Key()
+	return y.ParseKey(i.delegated.Key())
 }
 
-func (i *iterator) Val() posting.List {
-	list := roaring.NewPostingList()
-	data := make([]byte, len(i.delegated.Value().Value))
-	copy(data, i.delegated.Value().Value)
-	_ = list.Unmarshall(data)
-	return list
+func (i *iterator) Val() []byte {
+	return y.Copy(i.delegated.Value().Value)
 }
 
 func (i *iterator) Valid() bool {
@@ -286,22 +284,30 @@ var _ index.FieldIterator = (*fIterator)(nil)
 type fIterator struct {
 	init     bool
 	delegate Iterator
+	curr     *index.PostingValue
 }
 
 func (f *fIterator) Next() bool {
-	if f.init {
-		f.delegate.Next()
-	} else {
+	if !f.init {
 		f.init = true
+		f.delegate.Rewind()
 	}
-	return f.delegate.Valid()
+	if !f.delegate.Valid() {
+		return false
+	}
+	pv := &index.PostingValue{
+		Key:   f.delegate.Key(),
+		Value: roaring.NewPostingListWithInitialData(convert.BytesToUint64(f.delegate.Val())),
+	}
+	for ; f.delegate.Valid() && bytes.Equal(pv.Key, f.delegate.Key()); f.delegate.Next() {
+		pv.Value.Insert(common.ItemID(convert.BytesToUint64(f.delegate.Val())))
+	}
+	f.curr = pv
+	return true
 }
 
 func (f *fIterator) Val() *index.PostingValue {
-	return &index.PostingValue{
-		Key:   f.delegate.Key(),
-		Value: f.delegate.Val(),
-	}
+	return f.curr
 }
 
 func (f *fIterator) Close() error {
diff --git a/banyand/kv/kv.go b/banyand/kv/kv.go
index 5cea6ed..4e17456 100644
--- a/banyand/kv/kv.go
+++ b/banyand/kv/kv.go
@@ -26,7 +26,6 @@ import (
 	"github.com/pkg/errors"
 
 	"github.com/apache/skywalking-banyandb/pkg/index"
-	"github.com/apache/skywalking-banyandb/pkg/index/posting"
 	"github.com/apache/skywalking-banyandb/pkg/logger"
 	posting2 "github.com/apache/skywalking-banyandb/pkg/posting"
 )
@@ -109,7 +108,7 @@ type Iterator interface {
 	Rewind()
 	Seek(key []byte)
 	Key() []byte
-	Val() posting.List
+	Val() []byte
 	Valid() bool
 	Close() error
 }
diff --git a/banyand/stream/stream_query.go b/banyand/stream/stream_query.go
index ac7d038..3402665 100644
--- a/banyand/stream/stream_query.go
+++ b/banyand/stream/stream_query.go
@@ -18,8 +18,6 @@
 package stream
 
 import (
-	"bytes"
-
 	"github.com/golang/protobuf/proto"
 	"github.com/pkg/errors"
 
@@ -41,37 +39,22 @@ type Query interface {
 	Stream(stream *commonv2.Metadata) (Stream, error)
 }
 
-type EqualCondition struct {
-	tag   string
-	value []byte
-}
-
 type Stream interface {
-	Shards(equalConditions []EqualCondition) ([]tsdb.Shard, error)
+	Shards(entity tsdb.Entity) ([]tsdb.Shard, error)
 }
 
 var _ Stream = (*stream)(nil)
 
-func (s *stream) Shards(equalConditions []EqualCondition) ([]tsdb.Shard, error) {
-	entityItemLen := len(s.entityIndex)
-	entityItems := make([][]byte, entityItemLen)
-	var entityCount int
-	for _, ec := range equalConditions {
-		fi, ti, tag := s.findTagByName(ec.tag)
-		if tag == nil {
-			return nil, ErrTagNotExist
-		}
-		for i, eIndex := range s.entityIndex {
-			if eIndex.family == fi && eIndex.tag == ti {
-				entityItems[i] = ec.value
-				entityCount++
-			}
-		}
-	}
-	if entityCount < entityItemLen {
+func (s *stream) Shards(entity tsdb.Entity) ([]tsdb.Shard, error) {
+	if len(entity) < 1 {
 		return s.db.Shards(), nil
 	}
-	shardID, err := partition.ShardID(bytes.Join(entityItems, nil), s.schema.GetShardNum())
+	for _, e := range entity {
+		if e == nil {
+			return s.db.Shards(), nil
+		}
+	}
+	shardID, err := partition.ShardID(entity.Marshal(), s.schema.GetShardNum())
 	if err != nil {
 		return nil, err
 	}
diff --git a/banyand/stream/stream_query_test.go b/banyand/stream/stream_query_test.go
new file mode 100644
index 0000000..8b6cc12
--- /dev/null
+++ b/banyand/stream/stream_query_test.go
@@ -0,0 +1,301 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package stream
+
+import (
+	"bytes"
+	"embed"
+	_ "embed"
+	"encoding/base64"
+	"encoding/json"
+	"fmt"
+	"sort"
+	"strconv"
+	"testing"
+	"time"
+
+	"github.com/golang/protobuf/jsonpb"
+	"github.com/stretchr/testify/assert"
+	"google.golang.org/protobuf/types/known/timestamppb"
+
+	"github.com/apache/skywalking-banyandb/api/common"
+	modelv2 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v2"
+	streamv2 "github.com/apache/skywalking-banyandb/api/proto/banyandb/stream/v2"
+	"github.com/apache/skywalking-banyandb/banyand/tsdb"
+	"github.com/apache/skywalking-banyandb/pkg/convert"
+	"github.com/apache/skywalking-banyandb/pkg/partition"
+)
+
+func Test_Stream_SelectShard(t *testing.T) {
+	tester := assert.New(t)
+	s, deferFunc := setup(tester)
+	defer deferFunc()
+	_ = setupQueryData(tester, "multiple_shards.json", s)
+	tests := []struct {
+		name         string
+		entity       tsdb.Entity
+		wantShardNum int
+		wantErr      bool
+	}{
+		{
+			name:         "all shards",
+			wantShardNum: 2,
+		},
+		{
+			name:         "select a shard",
+			entity:       tsdb.Entity{tsdb.Entry("webapp_id"), tsdb.Entry("10.0.0.1_id"), convert.Int64ToBytes(0)},
+			wantShardNum: 1,
+		},
+		{
+			name:         "select shards",
+			entity:       tsdb.Entity{tsdb.Entry("webapp_id"), tsdb.AnyEntry, convert.Int64ToBytes(0)},
+			wantShardNum: 2,
+		},
+	}
+
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			shards, err := s.Shards(tt.entity)
+			if tt.wantErr {
+				tester.Error(err)
+				return
+			}
+			tester.NoError(err)
+			tester.Equal(tt.wantShardNum, len(shards))
+		})
+	}
+
+}
+
+func Test_Stream_Series(t *testing.T) {
+	tester := assert.New(t)
+	s, deferFunc := setup(tester)
+	defer deferFunc()
+	baseTime := setupQueryData(tester, "multiple_shards.json", s)
+	type args struct {
+		entity tsdb.Entity
+	}
+	type shardStruct struct {
+		id       common.ShardID
+		location []string
+		elements []string
+	}
+	type want struct {
+		shards []shardStruct
+	}
+
+	tests := []struct {
+		name    string
+		args    args
+		want    want
+		wantErr bool
+	}{
+		{
+			name: "all",
+			args: args{
+				entity: tsdb.Entity{tsdb.AnyEntry, tsdb.AnyEntry, tsdb.AnyEntry},
+			},
+			want: want{
+				shards: []shardStruct{
+					{
+						id:       0,
+						location: []string{"series_12243341348514563931", "data_flow_0"},
+						elements: []string{"1"},
+					},
+					{
+						id:       0,
+						location: []string{"series_1671844747554927007", "data_flow_0"},
+						elements: []string{"2"},
+					},
+					{
+						id:       1,
+						location: []string{"series_2374367181827824198", "data_flow_0"},
+						elements: []string{"5", "3"},
+					},
+					{
+						id:       1,
+						location: []string{"series_8429137420168685297", "data_flow_0"},
+						elements: []string{"4"},
+					},
+				},
+			},
+		},
+		{
+			name: "find series by service_id and instance_id",
+			args: args{
+				entity: tsdb.Entity{tsdb.Entry("webapp_id"), tsdb.Entry("10.0.0.1_id"), tsdb.AnyEntry},
+			},
+			want: want{
+				shards: []shardStruct{
+					{
+						id:       0,
+						location: []string{"series_12243341348514563931", "data_flow_0"},
+						elements: []string{"1"},
+					},
+					{
+						id:       1,
+						location: []string{"series_2374367181827824198", "data_flow_0"},
+						elements: []string{"5", "3"},
+					},
+				},
+			},
+		},
+		{
+			name: "find a series",
+			args: args{
+				entity: tsdb.Entity{tsdb.Entry("webapp_id"), tsdb.Entry("10.0.0.1_id"), convert.Uint64ToBytes(1)},
+			},
+			want: want{
+				shards: []shardStruct{
+					{
+						id:       1,
+						location: []string{"series_2374367181827824198", "data_flow_0"},
+						elements: []string{"5", "3"},
+					},
+				},
+			},
+		},
+	}
+
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			shards, err := s.Shards(tt.args.entity)
+			tester.NoError(err)
+			got := want{
+				shards: []shardStruct{},
+			}
+
+			for _, shard := range shards {
+				seriesList, err := shard.Series().List(tsdb.NewPath(tt.args.entity))
+				tester.NoError(err)
+				for _, series := range seriesList {
+					func(g *want) {
+						sp, err := series.Span(tsdb.NewTimeRangeDuration(baseTime, 1*time.Hour))
+						defer func(sp tsdb.SeriesSpan) {
+							_ = sp.Close()
+						}(sp)
+						tester.NoError(err)
+						seeker, err := sp.SeekerBuilder().Build()
+						tester.NoError(err)
+						iter, err := seeker.Seek()
+						tester.NoError(err)
+						for dataFlowID, iterator := range iter {
+							var elements []string
+							for iterator.Next() {
+								tagFamily, err := s.ParseTagFamily("searchable", iterator.Val())
+								tester.NoError(err)
+								for _, tag := range tagFamily.GetTags() {
+									if tag.GetKey() == "trace_id" {
+										elements = append(elements, tag.GetValue().GetStr().GetValue())
+									}
+								}
+							}
+							_ = iterator.Close()
+							g.shards = append(g.shards, shardStruct{
+								id: shard.ID(),
+								location: []string{
+									fmt.Sprintf("series_%v", series.ID()),
+									"data_flow_" + strconv.Itoa(dataFlowID),
+								},
+								elements: elements,
+							})
+						}
+
+					}(&got)
+				}
+			}
+			if tt.wantErr {
+				tester.Error(err)
+				return
+			}
+			tester.NoError(err)
+			sort.SliceStable(got.shards, func(i, j int) bool {
+				a := got.shards[i]
+				b := got.shards[j]
+				if a.id > b.id {
+					return false
+				}
+				for i, al := range a.location {
+					bl := b.location[i]
+					if bytes.Compare([]byte(al), []byte(bl)) > 0 {
+						return false
+					}
+				}
+				return true
+			})
+			tester.Equal(tt.want, got)
+		})
+	}
+
+}
+
+//go:embed testdata/*.json
+var dataFS embed.FS
+
+func setupQueryData(t *assert.Assertions, dataFile string, stream *stream) (baseTime time.Time) {
+	var templates []interface{}
+	baseTime = time.Now()
+	content, err := dataFS.ReadFile("testdata/" + dataFile)
+	t.NoError(err)
+	t.NoError(json.Unmarshal(content, &templates))
+	bb, _ := base64.StdEncoding.DecodeString("YWJjMTIzIT8kKiYoKSctPUB+")
+	for i, template := range templates {
+		rawSearchTagFamily, err := json.Marshal(template)
+		t.NoError(err)
+		searchTagFamily := &streamv2.ElementValue_TagFamily{}
+		t.NoError(jsonpb.UnmarshalString(string(rawSearchTagFamily), searchTagFamily))
+		e := &streamv2.ElementValue{
+			ElementId: strconv.Itoa(i),
+			Timestamp: timestamppb.New(baseTime.Add(time.Millisecond * time.Duration(i))),
+			TagFamilies: []*streamv2.ElementValue_TagFamily{
+				{
+					Tags: []*modelv2.TagValue{
+						{
+							Value: &modelv2.TagValue_BinaryData{
+								BinaryData: bb,
+							},
+						},
+					},
+				},
+			},
+		}
+		e.TagFamilies = append(e.TagFamilies, searchTagFamily)
+		entity, err := stream.buildEntity(e)
+		t.NoError(err)
+		shardID, err := partition.ShardID(entity.Marshal(), stream.schema.GetShardNum())
+		t.NoError(err)
+		itemID, err := stream.write(common.ShardID(shardID), e)
+		t.NoError(err)
+		sa, err := stream.Shards(entity)
+		for _, shard := range sa {
+			se, err := shard.Series().Get(entity)
+			t.NoError(err)
+			for {
+				item, closer, _ := se.Get(*itemID)
+				rawTagFamily, _ := item.Val("searchable")
+				if len(rawTagFamily) > 0 {
+					_ = closer.Close()
+					break
+				}
+				_ = closer.Close()
+			}
+
+		}
+	}
+	return baseTime
+}
diff --git a/banyand/stream/stream_write.go b/banyand/stream/stream_write.go
index 2bcd0bf..fa80dc3 100644
--- a/banyand/stream/stream_write.go
+++ b/banyand/stream/stream_write.go
@@ -40,34 +40,34 @@ var (
 	ErrUnsupportedTagForIndexField = errors.New("the tag type(for example, null) can not be as the index field value")
 )
 
-func (s *stream) write(shardID common.ShardID, value *streamv2.ElementValue) error {
+func (s *stream) write(shardID common.ShardID, value *streamv2.ElementValue) (*tsdb.GlobalItemID, error) {
 	sm := s.schema
 	fLen := len(value.GetTagFamilies())
 	if fLen < 1 {
-		return errors.Wrap(ErrMalformedElement, "no tag family")
+		return nil, errors.Wrap(ErrMalformedElement, "no tag family")
 	}
 	if fLen > len(sm.TagFamilies) {
-		return errors.Wrap(ErrMalformedElement, "tag family number is more than expected")
+		return nil, errors.Wrap(ErrMalformedElement, "tag family number is more than expected")
 	}
 	shard, err := s.db.Shard(shardID)
 	if err != nil {
-		return err
+		return nil, err
 	}
 	entity, err := s.buildEntity(value)
 	if err != nil {
-		return err
+		return nil, err
 	}
 	series, err := shard.Series().Get(entity)
 	if err != nil {
-		return err
+		return nil, err
 	}
 	t := value.GetTimestamp().AsTime()
-	wp, err := series.Span(tsdb.NewTimeRange(t, 0))
+	wp, err := series.Span(tsdb.NewTimeRangeDuration(t, 0))
 	if err != nil {
 		if wp != nil {
 			_ = wp.Close()
 		}
-		return err
+		return nil, err
 	}
 	writeFn := func() (tsdb.Writer, error) {
 		builder := wp.WriterBuilder().Time(t)
@@ -97,12 +97,18 @@ func (s *stream) write(shardID common.ShardID, value *streamv2.ElementValue) err
 			return nil, errWrite
 		}
 		_, errWrite = writer.Write()
+		s.l.Debug().
+			Time("ts", t).
+			Int("ts_nano", t.Nanosecond()).
+			Interface("data", value).
+			Uint64("series_id", uint64(series.ID())).
+			Msg("write stream")
 		return writer, errWrite
 	}
 	writer, err := writeFn()
 	if err != nil {
 		_ = wp.Close()
-		return err
+		return nil, err
 	}
 	m := indexMessage{
 		localWriter: writer,
@@ -117,7 +123,8 @@ func (s *stream) write(shardID common.ShardID, value *streamv2.ElementValue) err
 		}()
 		s.indexCh <- m
 	}(m)
-	return err
+	itemID := writer.ItemID()
+	return &itemID, err
 }
 
 func getIndexValue(ruleIndex indexRule, value *streamv2.ElementValue) (val []byte, err error) {
@@ -204,7 +211,7 @@ func (w *writeCallback) Rev(message bus.Message) (resp bus.Message) {
 	}
 	sm := writeEvent.WriteRequest.GetMetadata()
 	id := formatStreamID(sm.GetName(), sm.GetGroup())
-	err := w.schemaMap[id].write(common.ShardID(writeEvent.ShardID), writeEvent.WriteRequest.GetElement())
+	_, err := w.schemaMap[id].write(common.ShardID(writeEvent.ShardID), writeEvent.WriteRequest.GetElement())
 	if err != nil {
 		w.l.Debug().Err(err)
 	}
diff --git a/banyand/stream/stream_write_test.go b/banyand/stream/stream_write_test.go
index 1853a7f..0ffe0d5 100644
--- a/banyand/stream/stream_write_test.go
+++ b/banyand/stream/stream_write_test.go
@@ -197,7 +197,7 @@ func Test_Stream_Write(t *testing.T) {
 
 	for _, tt := range tests {
 		t.Run(tt.name, func(t *testing.T) {
-			err := s.write(common.ShardID(tt.args.shardID), tt.args.ele)
+			_, err := s.write(common.ShardID(tt.args.shardID), tt.args.ele)
 			if tt.wantErr {
 				tester.Error(err)
 				return
diff --git a/banyand/stream/testdata/multiple_shards.json b/banyand/stream/testdata/multiple_shards.json
new file mode 100644
index 0000000..791c7c9
--- /dev/null
+++ b/banyand/stream/testdata/multiple_shards.json
@@ -0,0 +1,64 @@
+[
+  {
+    "tags": [
+      {"str":{"value": "1"}},
+      {"int":{"value": 0}},
+      {"str":{"value": "webapp_id"}},
+      {"str":{"value": "10.0.0.1_id"}},
+      {"str":{"value": "/home_id"}},
+      {"int":{"value": 1000}},
+      {"int":{"value": 1622933202000000000}}
+    ]
+  },
+  {
+    "tags": [
+      {"str":{"value": "2"}},
+      {"int":{"value": 0}},
+      {"str":{"value": "webapp_id"}},
+      {"str":{"value": "10.0.0.3_id"}},
+      {"str":{"value": "/home_id"}},
+      {"int":{"value": 500}},
+      {"int":{"value": 1622933202000000000}}
+    ]
+  },
+  {
+    "tags": [
+      {"str":{"value": "3"}},
+      {"int":{"value": 1}},
+      {"str":{"value": "webapp_id"}},
+      {"str":{"value": "10.0.0.1_id"}},
+      {"str":{"value": "/home_id"}},
+      {"int":{"value": 30}},
+      {"int":{"value": 1622933202000000000}},
+      {"str":{"value": "GET"}},
+      {"str":{"value": "500"}}
+    ]
+  },
+  {
+    "tags": [
+      {"str":{"value": "4"}},
+      {"int":{"value": 1}},
+      {"str":{"value": "webapp_id"}},
+      {"str":{"value": "10.0.0.5_id"}},
+      {"str":{"value": "/home_id"}},
+      {"int":{"value": 60}},
+      {"int":{"value": 1622933202000000000}},
+      {"str":{"value": "GET"}},
+      {"str":{"value": "400"}}
+    ]
+  },
+  {
+    "tags": [
+      {"str":{"value": "5"}},
+      {"int":{"value": 1}},
+      {"str":{"value": "webapp_id"}},
+      {"str":{"value": "10.0.0.1_id"}},
+      {"str":{"value": "/home_id"}},
+      {"int":{"value": 300}},
+      {"int":{"value": 1622933202000000000}},
+      {"str":{"value": "GET"}},
+      {"str":{"value": "500"}}
+    ]
+  }
+  
+]
\ No newline at end of file
diff --git a/banyand/stream/testdata/shard0.json b/banyand/stream/testdata/shard0.json
deleted file mode 100644
index 28de2f3..0000000
--- a/banyand/stream/testdata/shard0.json
+++ /dev/null
@@ -1,18 +0,0 @@
-[
-  {
-    "element_id": "1",
-    "timestamp": "2021-04-15T01:30:15.01Z",
-    "tag_families": [
-      {
-        "tags": [
-          {"binary_data": "YWJjMTIzIT8kKiYoKSctPUB+"}
-        ]
-      },
-      {
-        "tags": [
-          {"str": ""}
-        ]
-      }
-    ]
-  }
-]
\ No newline at end of file
diff --git a/banyand/stream/testdata/single_series.json b/banyand/stream/testdata/single_series.json
new file mode 100644
index 0000000..049ec0c
--- /dev/null
+++ b/banyand/stream/testdata/single_series.json
@@ -0,0 +1,51 @@
+[
+  {
+    "tags": [
+      {"str":{"value": "trace_id-xxfff.111323"}},
+      {"int":{"value": 0}},
+      {"str":{"value": "webapp_id"}},
+      {"str":{"value": "10.0.0.1_id"}},
+      {"str":{"value": "/home_id"}},
+      {"int":{"value": 1000}},
+      {"int":{"value": 1622933202000000000}}
+    ]
+  },
+  {
+    "tags": [
+      {"str":{"value": "trace_id-xxfff.111323"}},
+      {"int":{"value": 0}},
+      {"str":{"value": "webapp_id"}},
+      {"str":{"value": "10.0.0.1_id"}},
+      {"str":{"value": "/home_id"}},
+      {"int":{"value": 500}},
+      {"int":{"value": 1622933202000000000}}
+    ]
+  },
+  {
+    "tags": [
+      {"str":{"value": "trace_id-xxfff.111323"}},
+      {"int":{"value": 0}},
+      {"str":{"value": "webapp_id"}},
+      {"str":{"value": "10.0.0.1_id"}},
+      {"str":{"value": "/home_id"}},
+      {"int":{"value": 30}},
+      {"int":{"value": 1622933202000000000}},
+      {"str":{"value": "GET"}},
+      {"str":{"value": "200"}}
+    ]
+  },
+  {
+    "tags": [
+      {"str":{"value": "trace_id-xxfff.111323"}},
+      {"int":{"value": 1}},
+      {"str":{"value": "httpserver_id"}},
+      {"str":{"value": "10.0.0.1_id"}},
+      {"str":{"value": "/home_id"}},
+      {"int":{"value": 300}},
+      {"int":{"value": 1622933202000000000}},
+      {"str":{"value": "GET"}},
+      {"str":{"value": "500"}}
+    ]
+  }
+  
+]
\ No newline at end of file
diff --git a/banyand/tsdb/series.go b/banyand/tsdb/series.go
index efdf8ba..376e39c 100644
--- a/banyand/tsdb/series.go
+++ b/banyand/tsdb/series.go
@@ -19,6 +19,7 @@ package tsdb
 
 import (
 	"bytes"
+	"context"
 	"io"
 	"time"
 
@@ -27,6 +28,7 @@ import (
 
 	"github.com/apache/skywalking-banyandb/api/common"
 	"github.com/apache/skywalking-banyandb/pkg/convert"
+	"github.com/apache/skywalking-banyandb/pkg/logger"
 )
 
 var (
@@ -70,9 +72,8 @@ func (i *GlobalItemID) UnMarshal(data []byte) error {
 }
 
 type TimeRange struct {
-	Start    time.Time
-	Duration time.Duration
-	End      time.Time
+	Start time.Time
+	End   time.Time
 }
 
 func (t TimeRange) contains(unixNano uint64) bool {
@@ -83,18 +84,24 @@ func (t TimeRange) contains(unixNano uint64) bool {
 	return tp.Equal(t.Start) || tp.After(t.Start)
 }
 
-func NewTimeRange(Start time.Time, Duration time.Duration) TimeRange {
+func NewTimeRange(Start, End time.Time) TimeRange {
 	return TimeRange{
-		Start:    Start,
-		Duration: Duration,
-		End:      Start.Add(Duration),
+		Start: Start,
+		End:   End,
+	}
+}
+
+func NewTimeRangeDuration(Start time.Time, Duration time.Duration) TimeRange {
+	return TimeRange{
+		Start: Start,
+		End:   Start.Add(Duration),
 	}
 }
 
 type Series interface {
 	ID() common.SeriesID
 	Span(timeRange TimeRange) (SeriesSpan, error)
-	Get(id GlobalItemID) (Item, error)
+	Get(id GlobalItemID) (Item, io.Closer, error)
 }
 
 type SeriesSpan interface {
@@ -109,18 +116,16 @@ type series struct {
 	id      common.SeriesID
 	blockDB blockDatabase
 	shardID common.ShardID
+	l       *logger.Logger
 }
 
-func (s *series) Get(id GlobalItemID) (Item, error) {
-	panic("implement me")
-}
-
-func newSeries(id common.SeriesID, blockDB blockDatabase) *series {
-	return &series{
-		id:      id,
-		blockDB: blockDB,
-		shardID: blockDB.shardID(),
-	}
+func (s *series) Get(id GlobalItemID) (Item, io.Closer, error) {
+	b := s.blockDB.block(id)
+	return &item{
+		data:     b.dataReader(),
+		itemID:   id.id,
+		seriesID: s.id,
+	}, b, nil
 }
 
 func (s *series) ID() common.SeriesID {
@@ -132,7 +137,25 @@ func (s *series) Span(timeRange TimeRange) (SeriesSpan, error) {
 	if len(blocks) < 1 {
 		return nil, ErrEmptySeriesSpan
 	}
-	return newSeriesSpan(timeRange, blocks, s.id, s.shardID), nil
+	s.l.Debug().
+		Times("time_range", []time.Time{timeRange.Start, timeRange.End}).
+		Msg("select series span")
+	return newSeriesSpan(context.WithValue(context.Background(), logger.ContextKey, s.l), timeRange, blocks, s.id, s.shardID), nil
+}
+
+func newSeries(ctx context.Context, id common.SeriesID, blockDB blockDatabase) *series {
+	s := &series{
+		id:      id,
+		blockDB: blockDB,
+		shardID: blockDB.shardID(),
+	}
+	parentLogger := ctx.Value(logger.ContextKey)
+	if pl, ok := parentLogger.(*logger.Logger); ok {
+		s.l = pl.Named("series")
+	} else {
+		s.l = logger.GetLogger("series")
+	}
+	return s
 }
 
 var _ SeriesSpan = (*seriesSpan)(nil)
@@ -142,6 +165,7 @@ type seriesSpan struct {
 	seriesID  common.SeriesID
 	shardID   common.ShardID
 	timeRange TimeRange
+	l         *logger.Logger
 }
 
 func (s *seriesSpan) Close() (err error) {
@@ -159,12 +183,18 @@ func (s *seriesSpan) SeekerBuilder() SeekerBuilder {
 	return newSeekerBuilder(s)
 }
 
-func newSeriesSpan(timeRange TimeRange, blocks []blockDelegate,
-	id common.SeriesID, shardID common.ShardID) *seriesSpan {
-	return &seriesSpan{
+func newSeriesSpan(ctx context.Context, timeRange TimeRange, blocks []blockDelegate, id common.SeriesID, shardID common.ShardID) *seriesSpan {
+	s := &seriesSpan{
 		blocks:    blocks,
 		seriesID:  id,
 		shardID:   shardID,
 		timeRange: timeRange,
 	}
+	parentLogger := ctx.Value(logger.ContextKey)
+	if pl, ok := parentLogger.(*logger.Logger); ok {
+		s.l = pl.Named("series_span")
+	} else {
+		s.l = logger.GetLogger("series_span")
+	}
+	return s
 }
diff --git a/banyand/tsdb/series_seek.go b/banyand/tsdb/series_seek.go
index 478efa0..1c4f338 100644
--- a/banyand/tsdb/series_seek.go
+++ b/banyand/tsdb/series_seek.go
@@ -63,16 +63,21 @@ type seekerBuilder struct {
 }
 
 func (s *seekerBuilder) Build() (Seeker, error) {
+	if s.order == modelv2.QueryOrder_SORT_UNSPECIFIED {
+		s.order = modelv2.QueryOrder_SORT_DESC
+	}
 	indexFilter, err := s.buildIndexFilter()
 	if err != nil {
 		return nil, err
 	}
 	filters := []filterFn{
-		indexFilter,
 		func(item Item) bool {
 			return s.seriesSpan.timeRange.contains(item.Time())
 		},
 	}
+	if indexFilter != nil {
+		filters = append(filters, indexFilter)
+	}
 	return newSeeker(s.buildSeries(filters)), nil
 }
 
diff --git a/banyand/tsdb/series_seek_filter.go b/banyand/tsdb/series_seek_filter.go
index 65147ea..8814a16 100644
--- a/banyand/tsdb/series_seek_filter.go
+++ b/banyand/tsdb/series_seek_filter.go
@@ -44,6 +44,9 @@ func (s *seekerBuilder) Filter(indexRule *databasev2.IndexRule, condition Condit
 }
 
 func (s *seekerBuilder) buildIndexFilter() (filterFn, error) {
+	if len(s.conditions) < 1 {
+		return nil, nil
+	}
 	var treeIndexCondition, invertedIndexCondition []index.Condition
 	for _, condition := range s.conditions {
 		if len(condition.condition) > 1 {
diff --git a/banyand/tsdb/series_seek_sort.go b/banyand/tsdb/series_seek_sort.go
index 1646bce..3b26a8c 100644
--- a/banyand/tsdb/series_seek_sort.go
+++ b/banyand/tsdb/series_seek_sort.go
@@ -19,6 +19,9 @@ package tsdb
 
 import (
 	"sort"
+	"time"
+
+	"go.uber.org/multierr"
 
 	"github.com/apache/skywalking-banyandb/api/common"
 	databasev2 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v2"
@@ -26,6 +29,7 @@ import (
 	"github.com/apache/skywalking-banyandb/banyand/kv"
 	"github.com/apache/skywalking-banyandb/pkg/index"
 	"github.com/apache/skywalking-banyandb/pkg/index/posting"
+	"github.com/apache/skywalking-banyandb/pkg/logger"
 )
 
 func (s *seekerBuilder) OrderByIndex(indexRule *databasev2.IndexRule, order modelv2.QueryOrder_Sort) SeekerBuilder {
@@ -42,29 +46,20 @@ func (s *seekerBuilder) OrderByTime(order modelv2.QueryOrder_Sort) SeekerBuilder
 
 func (s *seekerBuilder) buildSeries(filters []filterFn) []Iterator {
 	if s.indexRuleForSorting == nil {
-		return s.buildSeriesByIndex(filters)
+		return s.buildSeriesByTime(filters)
 	}
-	return s.buildSeriesByTime(filters)
+	return s.buildSeriesByIndex(filters)
 }
 
 func (s *seekerBuilder) buildSeriesByIndex(filters []filterFn) (series []Iterator) {
 	for _, b := range s.seriesSpan.blocks {
 		switch s.indexRuleForSorting.GetType() {
 		case databasev2.IndexRule_TYPE_TREE:
-			series = append(series, newSearcherIterator(
-				b.lsmIndexReader().
-					FieldIterator([]byte(s.indexRuleForSorting.GetMetadata().GetName()), s.order),
-				b.dataReader(),
-				s.seriesSpan.seriesID,
-				filters,
-			))
+			series = append(series, newSearcherIterator(s.seriesSpan.l, b.lsmIndexReader().
+				FieldIterator([]byte(s.indexRuleForSorting.GetMetadata().GetName()), s.order), b.dataReader(), s.seriesSpan.seriesID, filters))
 		case databasev2.IndexRule_TYPE_INVERTED:
-			series = append(series, newSearcherIterator(b.invertedIndexReader().
-				FieldIterator([]byte(s.indexRuleForSorting.GetMetadata().GetName()), s.order),
-				b.dataReader(),
-				s.seriesSpan.seriesID,
-				filters,
-			))
+			series = append(series, newSearcherIterator(s.seriesSpan.l, b.invertedIndexReader().
+				FieldIterator([]byte(s.indexRuleForSorting.GetMetadata().GetName()), s.order), b.dataReader(), s.seriesSpan.seriesID, filters))
 		}
 	}
 	return
@@ -83,15 +78,23 @@ func (s *seekerBuilder) buildSeriesByTime(filters []filterFn) []Iterator {
 			return bb[i].startTime().After(bb[j].startTime())
 		})
 	}
-	delegated := make([]Iterator, len(bb))
+	delegated := make([]Iterator, 0, len(bb))
+	var bTimes []time.Time
 	for _, b := range bb {
-		delegated = append(delegated, newSearcherIterator(b.
-			primaryIndexReader().
-			FieldIterator(
-				s.seriesSpan.seriesID.Marshal(),
-				s.order,
-			), b.dataReader(), s.seriesSpan.seriesID, filters))
+		bTimes = append(bTimes, b.startTime())
+		delegated = append(delegated, newSearcherIterator(
+			s.seriesSpan.l,
+			b.primaryIndexReader().
+				FieldIterator(
+					s.seriesSpan.seriesID.Marshal(),
+					s.order,
+				), b.dataReader(), s.seriesSpan.seriesID, filters))
 	}
+	s.seriesSpan.l.Debug().
+		Str("order", modelv2.QueryOrder_Sort_name[int32(s.order)]).
+		Times("blocks", bTimes).
+		Uint64("series_id", uint64(s.seriesSpan.seriesID)).
+		Msg("seek series by time")
 	return []Iterator{newMergedIterator(delegated)}
 }
 
@@ -104,6 +107,7 @@ type searcherIterator struct {
 	data          kv.TimeSeriesReader
 	seriesID      common.SeriesID
 	filters       []filterFn
+	l             *logger.Logger
 }
 
 func (s *searcherIterator) Next() bool {
@@ -112,20 +116,22 @@ func (s *searcherIterator) Next() bool {
 			v := s.fieldIterator.Val()
 			s.cur = v.Value.Iterator()
 			s.curKey = v.Key
+			s.l.Trace().Hex("term_field", s.curKey).Msg("got a new field")
 		} else {
-			_ = s.Close()
 			return false
 		}
 	}
 	if s.cur.Next() {
+
 		for _, filter := range s.filters {
 			if !filter(s.Val()) {
+				s.l.Trace().Uint64("item_id", uint64(s.Val().ID())).Msg("ignore the item")
 				return s.Next()
 			}
 		}
+		s.l.Trace().Uint64("item_id", uint64(s.Val().ID())).Msg("got an item")
 		return true
 	}
-	_ = s.cur.Close()
 	s.cur = nil
 	return s.Next()
 }
@@ -143,12 +149,14 @@ func (s *searcherIterator) Close() error {
 	return s.fieldIterator.Close()
 }
 
-func newSearcherIterator(fieldIterator index.FieldIterator, data kv.TimeSeriesReader, seriesID common.SeriesID, filters []filterFn) Iterator {
+func newSearcherIterator(l *logger.Logger, fieldIterator index.FieldIterator, data kv.TimeSeriesReader,
+	seriesID common.SeriesID, filters []filterFn) Iterator {
 	return &searcherIterator{
 		fieldIterator: fieldIterator,
 		data:          data,
 		seriesID:      seriesID,
 		filters:       filters,
+		l:             l,
 	}
 }
 
@@ -158,17 +166,12 @@ type mergedIterator struct {
 	curr      Iterator
 	index     int
 	delegated []Iterator
-	closed    bool
 }
 
 func (m *mergedIterator) Next() bool {
-	if m.closed {
-		return false
-	}
 	if m.curr == nil {
 		m.index++
 		if m.index >= len(m.delegated) {
-			_ = m.Close()
 			return false
 		} else {
 			m.curr = m.delegated[m.index]
@@ -176,7 +179,6 @@ func (m *mergedIterator) Next() bool {
 	}
 	hasNext := m.curr.Next()
 	if !hasNext {
-		_ = m.curr.Close()
 		m.curr = nil
 		return m.Next()
 	}
@@ -188,8 +190,11 @@ func (m *mergedIterator) Val() Item {
 }
 
 func (m *mergedIterator) Close() error {
-	m.closed = true
-	return nil
+	var err error
+	for _, d := range m.delegated {
+		err = multierr.Append(err, d.Close())
+	}
+	return err
 }
 
 func newMergedIterator(delegated []Iterator) Iterator {
diff --git a/banyand/tsdb/series_write.go b/banyand/tsdb/series_write.go
index 0f69332..af59b53 100644
--- a/banyand/tsdb/series_write.go
+++ b/banyand/tsdb/series_write.go
@@ -159,9 +159,10 @@ func (d dataBucket) marshal() []byte {
 	}, nil)
 }
 
-func (w *writer) Write() (id GlobalItemID, err error) {
+func (w *writer) Write() (GlobalItemID, error) {
+	id := w.ItemID()
 	for _, c := range w.columns {
-		err = w.block.write(dataBucket{
+		err := w.block.write(dataBucket{
 			seriesID: w.itemID.seriesID,
 			family:   c.family,
 		}.marshal(),
diff --git a/banyand/tsdb/seriesdb.go b/banyand/tsdb/seriesdb.go
index 24a622f..ccae717 100644
--- a/banyand/tsdb/seriesdb.go
+++ b/banyand/tsdb/seriesdb.go
@@ -41,6 +41,14 @@ type Entry []byte
 
 type Entity []Entry
 
+func (e Entity) Marshal() []byte {
+	data := make([][]byte, len(e))
+	for i, entry := range e {
+		data[i] = entry
+	}
+	return bytes.Join(data, nil)
+}
+
 type Path struct {
 	prefix   []byte
 	mask     []byte
@@ -86,6 +94,7 @@ type SeriesDatabase interface {
 type blockDatabase interface {
 	shardID() common.ShardID
 	span(timeRange TimeRange) []blockDelegate
+	block(id GlobalItemID) blockDelegate
 }
 
 var _ SeriesDatabase = (*seriesDB)(nil)
@@ -100,6 +109,10 @@ type seriesDB struct {
 	sID            common.ShardID
 }
 
+func (s *seriesDB) block(id GlobalItemID) blockDelegate {
+	return s.lst[id.segID].lst[id.blockID].delegate()
+}
+
 func (s *seriesDB) shardID() common.ShardID {
 	return s.sID
 }
@@ -111,7 +124,7 @@ func (s *seriesDB) Get(entity Entity) (Series, error) {
 		return nil, err
 	}
 	if err == nil {
-		return newSeries(bytesConvSeriesID(seriesID), s), nil
+		return newSeries(s.context(), bytesConvSeriesID(seriesID), s), nil
 	}
 	s.Lock()
 	defer s.Unlock()
@@ -120,7 +133,7 @@ func (s *seriesDB) Get(entity Entity) (Series, error) {
 	if err != nil {
 		return nil, err
 	}
-	return newSeries(bytesConvSeriesID(seriesID), s), nil
+	return newSeries(s.context(), bytesConvSeriesID(seriesID), s), nil
 }
 
 func (s *seriesDB) List(path Path) (SeriesList, error) {
@@ -130,8 +143,14 @@ func (s *seriesDB) List(path Path) (SeriesList, error) {
 			return nil, err
 		}
 		if err == nil {
-			return []Series{newSeries(bytesConvSeriesID(id), s)}, nil
+			seriesID := bytesConvSeriesID(id)
+			s.l.Debug().
+				Hex("path", path.prefix).
+				Uint64("series_id", uint64(seriesID)).
+				Msg("got a series")
+			return []Series{newSeries(s.context(), seriesID, s)}, nil
 		}
+		s.l.Debug().Hex("path", path.prefix).Msg("doesn't get any series")
 		return nil, nil
 	}
 	result := make([]Series, 0)
@@ -147,7 +166,12 @@ func (s *seriesDB) List(path Path) (SeriesList, error) {
 				err = multierr.Append(err, errGetVal)
 				return nil
 			}
-			result = append(result, newSeries(common.SeriesID(convert.BytesToUint64(id)), s))
+			seriesID := bytesConvSeriesID(id)
+			s.l.Debug().
+				Hex("path", path.prefix).
+				Uint64("series_id", uint64(seriesID)).
+				Msg("got a series")
+			result = append(result, newSeries(s.context(), seriesID, s))
 		}
 		return nil
 	})
@@ -166,6 +190,10 @@ func (s *seriesDB) span(_ TimeRange) []blockDelegate {
 	return result
 }
 
+func (s *seriesDB) context() context.Context {
+	return context.WithValue(context.Background(), logger.ContextKey, s.l)
+}
+
 func (s *seriesDB) Close() error {
 	for _, seg := range s.lst {
 		seg.close()
@@ -183,7 +211,7 @@ func newSeriesDataBase(ctx context.Context, shardID common.ShardID, path string,
 		return nil, logger.ErrNoLoggerInContext
 	}
 	if pl, ok := parentLogger.(*logger.Logger); ok {
-		sdb.l = pl.Named("seriesSpan")
+		sdb.l = pl.Named("series_database")
 	}
 	var err error
 	sdb.seriesMetadata, err = kv.OpenStore(0, path+"/md", kv.StoreWithNamedLogger("metadata", sdb.l))
diff --git a/banyand/tsdb/seriesdb_test.go b/banyand/tsdb/seriesdb_test.go
index 3968564..a87d24c 100644
--- a/banyand/tsdb/seriesdb_test.go
+++ b/banyand/tsdb/seriesdb_test.go
@@ -338,7 +338,7 @@ func setUpEntities(t *assert.Assertions, db SeriesDatabase) []*entityWithID {
 }
 
 func newMockSeries(id common.SeriesID) *series {
-	return newSeries(id, nil)
+	return newSeries(nil, id, nil)
 }
 
 func transform(list SeriesList) (seriesIDs []common.SeriesID) {
diff --git a/banyand/tsdb/shard.go b/banyand/tsdb/shard.go
index 3b2ed4e..4bcc40e 100644
--- a/banyand/tsdb/shard.go
+++ b/banyand/tsdb/shard.go
@@ -37,6 +37,10 @@ type shard struct {
 	lst            []*segment
 }
 
+func (s *shard) ID() common.ShardID {
+	return s.id
+}
+
 func (s *shard) Series() SeriesDatabase {
 	return s.seriesDatabase
 }
diff --git a/banyand/tsdb/tsdb.go b/banyand/tsdb/tsdb.go
index 0d3da45..4528b46 100644
--- a/banyand/tsdb/tsdb.go
+++ b/banyand/tsdb/tsdb.go
@@ -60,6 +60,7 @@ type Database interface {
 
 type Shard interface {
 	io.Closer
+	ID() common.ShardID
 	Series() SeriesDatabase
 	Index() IndexDatabase
 }