You are viewing a plain-text version of this content. The canonical link for it is here (the link target was not preserved in this plain-text copy).
Posted to notifications@skywalking.apache.org by ha...@apache.org on 2021/09/07 10:32:05 UTC
[skywalking-banyandb] 01/03: Add stream query test
This is an automated email from the ASF dual-hosted git repository.
hanahmily pushed a commit to branch time-series
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
commit cd628eaa55cba84f110d92940e4badfdd3dd3fff
Author: Gao Hongtao <ha...@gmail.com>
AuthorDate: Tue Sep 7 11:15:08 2021 +0800
Add stream query test
Signed-off-by: Gao Hongtao <ha...@gmail.com>
---
banyand/kv/badger.go | 36 ++--
banyand/kv/kv.go | 3 +-
banyand/stream/stream_query.go | 35 +---
banyand/stream/stream_query_test.go | 301 +++++++++++++++++++++++++++
banyand/stream/stream_write.go | 29 ++-
banyand/stream/stream_write_test.go | 2 +-
banyand/stream/testdata/multiple_shards.json | 64 ++++++
banyand/stream/testdata/shard0.json | 18 --
banyand/stream/testdata/single_series.json | 51 +++++
banyand/tsdb/series.go | 74 +++++--
banyand/tsdb/series_seek.go | 7 +-
banyand/tsdb/series_seek_filter.go | 3 +
banyand/tsdb/series_seek_sort.go | 71 ++++---
banyand/tsdb/series_write.go | 5 +-
banyand/tsdb/seriesdb.go | 38 +++-
banyand/tsdb/seriesdb_test.go | 2 +-
banyand/tsdb/shard.go | 4 +
banyand/tsdb/tsdb.go | 1 +
18 files changed, 607 insertions(+), 137 deletions(-)
diff --git a/banyand/kv/badger.go b/banyand/kv/badger.go
index 0dfbbe7..131032d 100644
--- a/banyand/kv/badger.go
+++ b/banyand/kv/badger.go
@@ -27,7 +27,9 @@ import (
"github.com/dgraph-io/badger/v3/y"
"go.uber.org/multierr"
+ "github.com/apache/skywalking-banyandb/api/common"
modelv2 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v2"
+ "github.com/apache/skywalking-banyandb/pkg/convert"
"github.com/apache/skywalking-banyandb/pkg/index"
"github.com/apache/skywalking-banyandb/pkg/index/posting"
"github.com/apache/skywalking-banyandb/pkg/index/posting/roaring"
@@ -196,15 +198,11 @@ func (i *iterator) Seek(key []byte) {
}
func (i *iterator) Key() []byte {
- return i.delegated.Key()
+ return y.ParseKey(i.delegated.Key())
}
-func (i *iterator) Val() posting.List {
- list := roaring.NewPostingList()
- data := make([]byte, len(i.delegated.Value().Value))
- copy(data, i.delegated.Value().Value)
- _ = list.Unmarshall(data)
- return list
+func (i *iterator) Val() []byte {
+ return y.Copy(i.delegated.Value().Value)
}
func (i *iterator) Valid() bool {
@@ -286,22 +284,30 @@ var _ index.FieldIterator = (*fIterator)(nil)
type fIterator struct {
init bool
delegate Iterator
+ curr *index.PostingValue
}
func (f *fIterator) Next() bool {
- if f.init {
- f.delegate.Next()
- } else {
+ if !f.init {
f.init = true
+ f.delegate.Rewind()
}
- return f.delegate.Valid()
+ if !f.delegate.Valid() {
+ return false
+ }
+ pv := &index.PostingValue{
+ Key: f.delegate.Key(),
+ Value: roaring.NewPostingListWithInitialData(convert.BytesToUint64(f.delegate.Val())),
+ }
+ for ; f.delegate.Valid() && bytes.Equal(pv.Key, f.delegate.Key()); f.delegate.Next() {
+ pv.Value.Insert(common.ItemID(convert.BytesToUint64(f.delegate.Val())))
+ }
+ f.curr = pv
+ return true
}
func (f *fIterator) Val() *index.PostingValue {
- return &index.PostingValue{
- Key: f.delegate.Key(),
- Value: f.delegate.Val(),
- }
+ return f.curr
}
func (f *fIterator) Close() error {
diff --git a/banyand/kv/kv.go b/banyand/kv/kv.go
index 5cea6ed..4e17456 100644
--- a/banyand/kv/kv.go
+++ b/banyand/kv/kv.go
@@ -26,7 +26,6 @@ import (
"github.com/pkg/errors"
"github.com/apache/skywalking-banyandb/pkg/index"
- "github.com/apache/skywalking-banyandb/pkg/index/posting"
"github.com/apache/skywalking-banyandb/pkg/logger"
posting2 "github.com/apache/skywalking-banyandb/pkg/posting"
)
@@ -109,7 +108,7 @@ type Iterator interface {
Rewind()
Seek(key []byte)
Key() []byte
- Val() posting.List
+ Val() []byte
Valid() bool
Close() error
}
diff --git a/banyand/stream/stream_query.go b/banyand/stream/stream_query.go
index ac7d038..3402665 100644
--- a/banyand/stream/stream_query.go
+++ b/banyand/stream/stream_query.go
@@ -18,8 +18,6 @@
package stream
import (
- "bytes"
-
"github.com/golang/protobuf/proto"
"github.com/pkg/errors"
@@ -41,37 +39,22 @@ type Query interface {
Stream(stream *commonv2.Metadata) (Stream, error)
}
-type EqualCondition struct {
- tag string
- value []byte
-}
-
type Stream interface {
- Shards(equalConditions []EqualCondition) ([]tsdb.Shard, error)
+ Shards(entity tsdb.Entity) ([]tsdb.Shard, error)
}
var _ Stream = (*stream)(nil)
-func (s *stream) Shards(equalConditions []EqualCondition) ([]tsdb.Shard, error) {
- entityItemLen := len(s.entityIndex)
- entityItems := make([][]byte, entityItemLen)
- var entityCount int
- for _, ec := range equalConditions {
- fi, ti, tag := s.findTagByName(ec.tag)
- if tag == nil {
- return nil, ErrTagNotExist
- }
- for i, eIndex := range s.entityIndex {
- if eIndex.family == fi && eIndex.tag == ti {
- entityItems[i] = ec.value
- entityCount++
- }
- }
- }
- if entityCount < entityItemLen {
+func (s *stream) Shards(entity tsdb.Entity) ([]tsdb.Shard, error) {
+ if len(entity) < 1 {
return s.db.Shards(), nil
}
- shardID, err := partition.ShardID(bytes.Join(entityItems, nil), s.schema.GetShardNum())
+ for _, e := range entity {
+ if e == nil {
+ return s.db.Shards(), nil
+ }
+ }
+ shardID, err := partition.ShardID(entity.Marshal(), s.schema.GetShardNum())
if err != nil {
return nil, err
}
diff --git a/banyand/stream/stream_query_test.go b/banyand/stream/stream_query_test.go
new file mode 100644
index 0000000..8b6cc12
--- /dev/null
+++ b/banyand/stream/stream_query_test.go
@@ -0,0 +1,301 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package stream
+
+import (
+ "bytes"
+ "embed"
+ _ "embed"
+ "encoding/base64"
+ "encoding/json"
+ "fmt"
+ "sort"
+ "strconv"
+ "testing"
+ "time"
+
+ "github.com/golang/protobuf/jsonpb"
+ "github.com/stretchr/testify/assert"
+ "google.golang.org/protobuf/types/known/timestamppb"
+
+ "github.com/apache/skywalking-banyandb/api/common"
+ modelv2 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v2"
+ streamv2 "github.com/apache/skywalking-banyandb/api/proto/banyandb/stream/v2"
+ "github.com/apache/skywalking-banyandb/banyand/tsdb"
+ "github.com/apache/skywalking-banyandb/pkg/convert"
+ "github.com/apache/skywalking-banyandb/pkg/partition"
+)
+
+func Test_Stream_SelectShard(t *testing.T) {
+ tester := assert.New(t)
+ s, deferFunc := setup(tester)
+ defer deferFunc()
+ _ = setupQueryData(tester, "multiple_shards.json", s)
+ tests := []struct {
+ name string
+ entity tsdb.Entity
+ wantShardNum int
+ wantErr bool
+ }{
+ {
+ name: "all shards",
+ wantShardNum: 2,
+ },
+ {
+ name: "select a shard",
+ entity: tsdb.Entity{tsdb.Entry("webapp_id"), tsdb.Entry("10.0.0.1_id"), convert.Int64ToBytes(0)},
+ wantShardNum: 1,
+ },
+ {
+ name: "select shards",
+ entity: tsdb.Entity{tsdb.Entry("webapp_id"), tsdb.AnyEntry, convert.Int64ToBytes(0)},
+ wantShardNum: 2,
+ },
+ }
+
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ shards, err := s.Shards(tt.entity)
+ if tt.wantErr {
+ tester.Error(err)
+ return
+ }
+ tester.NoError(err)
+ tester.Equal(tt.wantShardNum, len(shards))
+ })
+ }
+
+}
+
+func Test_Stream_Series(t *testing.T) {
+ tester := assert.New(t)
+ s, deferFunc := setup(tester)
+ defer deferFunc()
+ baseTime := setupQueryData(tester, "multiple_shards.json", s)
+ type args struct {
+ entity tsdb.Entity
+ }
+ type shardStruct struct {
+ id common.ShardID
+ location []string
+ elements []string
+ }
+ type want struct {
+ shards []shardStruct
+ }
+
+ tests := []struct {
+ name string
+ args args
+ want want
+ wantErr bool
+ }{
+ {
+ name: "all",
+ args: args{
+ entity: tsdb.Entity{tsdb.AnyEntry, tsdb.AnyEntry, tsdb.AnyEntry},
+ },
+ want: want{
+ shards: []shardStruct{
+ {
+ id: 0,
+ location: []string{"series_12243341348514563931", "data_flow_0"},
+ elements: []string{"1"},
+ },
+ {
+ id: 0,
+ location: []string{"series_1671844747554927007", "data_flow_0"},
+ elements: []string{"2"},
+ },
+ {
+ id: 1,
+ location: []string{"series_2374367181827824198", "data_flow_0"},
+ elements: []string{"5", "3"},
+ },
+ {
+ id: 1,
+ location: []string{"series_8429137420168685297", "data_flow_0"},
+ elements: []string{"4"},
+ },
+ },
+ },
+ },
+ {
+ name: "find series by service_id and instance_id",
+ args: args{
+ entity: tsdb.Entity{tsdb.Entry("webapp_id"), tsdb.Entry("10.0.0.1_id"), tsdb.AnyEntry},
+ },
+ want: want{
+ shards: []shardStruct{
+ {
+ id: 0,
+ location: []string{"series_12243341348514563931", "data_flow_0"},
+ elements: []string{"1"},
+ },
+ {
+ id: 1,
+ location: []string{"series_2374367181827824198", "data_flow_0"},
+ elements: []string{"5", "3"},
+ },
+ },
+ },
+ },
+ {
+ name: "find a series",
+ args: args{
+ entity: tsdb.Entity{tsdb.Entry("webapp_id"), tsdb.Entry("10.0.0.1_id"), convert.Uint64ToBytes(1)},
+ },
+ want: want{
+ shards: []shardStruct{
+ {
+ id: 1,
+ location: []string{"series_2374367181827824198", "data_flow_0"},
+ elements: []string{"5", "3"},
+ },
+ },
+ },
+ },
+ }
+
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ shards, err := s.Shards(tt.args.entity)
+ tester.NoError(err)
+ got := want{
+ shards: []shardStruct{},
+ }
+
+ for _, shard := range shards {
+ seriesList, err := shard.Series().List(tsdb.NewPath(tt.args.entity))
+ tester.NoError(err)
+ for _, series := range seriesList {
+ func(g *want) {
+ sp, err := series.Span(tsdb.NewTimeRangeDuration(baseTime, 1*time.Hour))
+ defer func(sp tsdb.SeriesSpan) {
+ _ = sp.Close()
+ }(sp)
+ tester.NoError(err)
+ seeker, err := sp.SeekerBuilder().Build()
+ tester.NoError(err)
+ iter, err := seeker.Seek()
+ tester.NoError(err)
+ for dataFlowID, iterator := range iter {
+ var elements []string
+ for iterator.Next() {
+ tagFamily, err := s.ParseTagFamily("searchable", iterator.Val())
+ tester.NoError(err)
+ for _, tag := range tagFamily.GetTags() {
+ if tag.GetKey() == "trace_id" {
+ elements = append(elements, tag.GetValue().GetStr().GetValue())
+ }
+ }
+ }
+ _ = iterator.Close()
+ g.shards = append(g.shards, shardStruct{
+ id: shard.ID(),
+ location: []string{
+ fmt.Sprintf("series_%v", series.ID()),
+ "data_flow_" + strconv.Itoa(dataFlowID),
+ },
+ elements: elements,
+ })
+ }
+
+ }(&got)
+ }
+ }
+ if tt.wantErr {
+ tester.Error(err)
+ return
+ }
+ tester.NoError(err)
+ sort.SliceStable(got.shards, func(i, j int) bool {
+ a := got.shards[i]
+ b := got.shards[j]
+ if a.id > b.id {
+ return false
+ }
+ for i, al := range a.location {
+ bl := b.location[i]
+ if bytes.Compare([]byte(al), []byte(bl)) > 0 {
+ return false
+ }
+ }
+ return true
+ })
+ tester.Equal(tt.want, got)
+ })
+ }
+
+}
+
+//go:embed testdata/*.json
+var dataFS embed.FS
+
+func setupQueryData(t *assert.Assertions, dataFile string, stream *stream) (baseTime time.Time) {
+ var templates []interface{}
+ baseTime = time.Now()
+ content, err := dataFS.ReadFile("testdata/" + dataFile)
+ t.NoError(err)
+ t.NoError(json.Unmarshal(content, &templates))
+ bb, _ := base64.StdEncoding.DecodeString("YWJjMTIzIT8kKiYoKSctPUB+")
+ for i, template := range templates {
+ rawSearchTagFamily, err := json.Marshal(template)
+ t.NoError(err)
+ searchTagFamily := &streamv2.ElementValue_TagFamily{}
+ t.NoError(jsonpb.UnmarshalString(string(rawSearchTagFamily), searchTagFamily))
+ e := &streamv2.ElementValue{
+ ElementId: strconv.Itoa(i),
+ Timestamp: timestamppb.New(baseTime.Add(time.Millisecond * time.Duration(i))),
+ TagFamilies: []*streamv2.ElementValue_TagFamily{
+ {
+ Tags: []*modelv2.TagValue{
+ {
+ Value: &modelv2.TagValue_BinaryData{
+ BinaryData: bb,
+ },
+ },
+ },
+ },
+ },
+ }
+ e.TagFamilies = append(e.TagFamilies, searchTagFamily)
+ entity, err := stream.buildEntity(e)
+ t.NoError(err)
+ shardID, err := partition.ShardID(entity.Marshal(), stream.schema.GetShardNum())
+ t.NoError(err)
+ itemID, err := stream.write(common.ShardID(shardID), e)
+ t.NoError(err)
+ sa, err := stream.Shards(entity)
+ for _, shard := range sa {
+ se, err := shard.Series().Get(entity)
+ t.NoError(err)
+ for {
+ item, closer, _ := se.Get(*itemID)
+ rawTagFamily, _ := item.Val("searchable")
+ if len(rawTagFamily) > 0 {
+ _ = closer.Close()
+ break
+ }
+ _ = closer.Close()
+ }
+
+ }
+ }
+ return baseTime
+}
diff --git a/banyand/stream/stream_write.go b/banyand/stream/stream_write.go
index 2bcd0bf..fa80dc3 100644
--- a/banyand/stream/stream_write.go
+++ b/banyand/stream/stream_write.go
@@ -40,34 +40,34 @@ var (
ErrUnsupportedTagForIndexField = errors.New("the tag type(for example, null) can not be as the index field value")
)
-func (s *stream) write(shardID common.ShardID, value *streamv2.ElementValue) error {
+func (s *stream) write(shardID common.ShardID, value *streamv2.ElementValue) (*tsdb.GlobalItemID, error) {
sm := s.schema
fLen := len(value.GetTagFamilies())
if fLen < 1 {
- return errors.Wrap(ErrMalformedElement, "no tag family")
+ return nil, errors.Wrap(ErrMalformedElement, "no tag family")
}
if fLen > len(sm.TagFamilies) {
- return errors.Wrap(ErrMalformedElement, "tag family number is more than expected")
+ return nil, errors.Wrap(ErrMalformedElement, "tag family number is more than expected")
}
shard, err := s.db.Shard(shardID)
if err != nil {
- return err
+ return nil, err
}
entity, err := s.buildEntity(value)
if err != nil {
- return err
+ return nil, err
}
series, err := shard.Series().Get(entity)
if err != nil {
- return err
+ return nil, err
}
t := value.GetTimestamp().AsTime()
- wp, err := series.Span(tsdb.NewTimeRange(t, 0))
+ wp, err := series.Span(tsdb.NewTimeRangeDuration(t, 0))
if err != nil {
if wp != nil {
_ = wp.Close()
}
- return err
+ return nil, err
}
writeFn := func() (tsdb.Writer, error) {
builder := wp.WriterBuilder().Time(t)
@@ -97,12 +97,18 @@ func (s *stream) write(shardID common.ShardID, value *streamv2.ElementValue) err
return nil, errWrite
}
_, errWrite = writer.Write()
+ s.l.Debug().
+ Time("ts", t).
+ Int("ts_nano", t.Nanosecond()).
+ Interface("data", value).
+ Uint64("series_id", uint64(series.ID())).
+ Msg("write stream")
return writer, errWrite
}
writer, err := writeFn()
if err != nil {
_ = wp.Close()
- return err
+ return nil, err
}
m := indexMessage{
localWriter: writer,
@@ -117,7 +123,8 @@ func (s *stream) write(shardID common.ShardID, value *streamv2.ElementValue) err
}()
s.indexCh <- m
}(m)
- return err
+ itemID := writer.ItemID()
+ return &itemID, err
}
func getIndexValue(ruleIndex indexRule, value *streamv2.ElementValue) (val []byte, err error) {
@@ -204,7 +211,7 @@ func (w *writeCallback) Rev(message bus.Message) (resp bus.Message) {
}
sm := writeEvent.WriteRequest.GetMetadata()
id := formatStreamID(sm.GetName(), sm.GetGroup())
- err := w.schemaMap[id].write(common.ShardID(writeEvent.ShardID), writeEvent.WriteRequest.GetElement())
+ _, err := w.schemaMap[id].write(common.ShardID(writeEvent.ShardID), writeEvent.WriteRequest.GetElement())
if err != nil {
w.l.Debug().Err(err)
}
diff --git a/banyand/stream/stream_write_test.go b/banyand/stream/stream_write_test.go
index 1853a7f..0ffe0d5 100644
--- a/banyand/stream/stream_write_test.go
+++ b/banyand/stream/stream_write_test.go
@@ -197,7 +197,7 @@ func Test_Stream_Write(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
- err := s.write(common.ShardID(tt.args.shardID), tt.args.ele)
+ _, err := s.write(common.ShardID(tt.args.shardID), tt.args.ele)
if tt.wantErr {
tester.Error(err)
return
diff --git a/banyand/stream/testdata/multiple_shards.json b/banyand/stream/testdata/multiple_shards.json
new file mode 100644
index 0000000..791c7c9
--- /dev/null
+++ b/banyand/stream/testdata/multiple_shards.json
@@ -0,0 +1,64 @@
+[
+ {
+ "tags": [
+ {"str":{"value": "1"}},
+ {"int":{"value": 0}},
+ {"str":{"value": "webapp_id"}},
+ {"str":{"value": "10.0.0.1_id"}},
+ {"str":{"value": "/home_id"}},
+ {"int":{"value": 1000}},
+ {"int":{"value": 1622933202000000000}}
+ ]
+ },
+ {
+ "tags": [
+ {"str":{"value": "2"}},
+ {"int":{"value": 0}},
+ {"str":{"value": "webapp_id"}},
+ {"str":{"value": "10.0.0.3_id"}},
+ {"str":{"value": "/home_id"}},
+ {"int":{"value": 500}},
+ {"int":{"value": 1622933202000000000}}
+ ]
+ },
+ {
+ "tags": [
+ {"str":{"value": "3"}},
+ {"int":{"value": 1}},
+ {"str":{"value": "webapp_id"}},
+ {"str":{"value": "10.0.0.1_id"}},
+ {"str":{"value": "/home_id"}},
+ {"int":{"value": 30}},
+ {"int":{"value": 1622933202000000000}},
+ {"str":{"value": "GET"}},
+ {"str":{"value": "500"}}
+ ]
+ },
+ {
+ "tags": [
+ {"str":{"value": "4"}},
+ {"int":{"value": 1}},
+ {"str":{"value": "webapp_id"}},
+ {"str":{"value": "10.0.0.5_id"}},
+ {"str":{"value": "/home_id"}},
+ {"int":{"value": 60}},
+ {"int":{"value": 1622933202000000000}},
+ {"str":{"value": "GET"}},
+ {"str":{"value": "400"}}
+ ]
+ },
+ {
+ "tags": [
+ {"str":{"value": "5"}},
+ {"int":{"value": 1}},
+ {"str":{"value": "webapp_id"}},
+ {"str":{"value": "10.0.0.1_id"}},
+ {"str":{"value": "/home_id"}},
+ {"int":{"value": 300}},
+ {"int":{"value": 1622933202000000000}},
+ {"str":{"value": "GET"}},
+ {"str":{"value": "500"}}
+ ]
+ }
+
+]
\ No newline at end of file
diff --git a/banyand/stream/testdata/shard0.json b/banyand/stream/testdata/shard0.json
deleted file mode 100644
index 28de2f3..0000000
--- a/banyand/stream/testdata/shard0.json
+++ /dev/null
@@ -1,18 +0,0 @@
-[
- {
- "element_id": "1",
- "timestamp": "2021-04-15T01:30:15.01Z",
- "tag_families": [
- {
- "tags": [
- {"binary_data": "YWJjMTIzIT8kKiYoKSctPUB+"}
- ]
- },
- {
- "tags": [
- {"str": ""}
- ]
- }
- ]
- }
-]
\ No newline at end of file
diff --git a/banyand/stream/testdata/single_series.json b/banyand/stream/testdata/single_series.json
new file mode 100644
index 0000000..049ec0c
--- /dev/null
+++ b/banyand/stream/testdata/single_series.json
@@ -0,0 +1,51 @@
+[
+ {
+ "tags": [
+ {"str":{"value": "trace_id-xxfff.111323"}},
+ {"int":{"value": 0}},
+ {"str":{"value": "webapp_id"}},
+ {"str":{"value": "10.0.0.1_id"}},
+ {"str":{"value": "/home_id"}},
+ {"int":{"value": 1000}},
+ {"int":{"value": 1622933202000000000}}
+ ]
+ },
+ {
+ "tags": [
+ {"str":{"value": "trace_id-xxfff.111323"}},
+ {"int":{"value": 0}},
+ {"str":{"value": "webapp_id"}},
+ {"str":{"value": "10.0.0.1_id"}},
+ {"str":{"value": "/home_id"}},
+ {"int":{"value": 500}},
+ {"int":{"value": 1622933202000000000}}
+ ]
+ },
+ {
+ "tags": [
+ {"str":{"value": "trace_id-xxfff.111323"}},
+ {"int":{"value": 0}},
+ {"str":{"value": "webapp_id"}},
+ {"str":{"value": "10.0.0.1_id"}},
+ {"str":{"value": "/home_id"}},
+ {"int":{"value": 30}},
+ {"int":{"value": 1622933202000000000}},
+ {"str":{"value": "GET"}},
+ {"str":{"value": "200"}}
+ ]
+ },
+ {
+ "tags": [
+ {"str":{"value": "trace_id-xxfff.111323"}},
+ {"int":{"value": 1}},
+ {"str":{"value": "httpserver_id"}},
+ {"str":{"value": "10.0.0.1_id"}},
+ {"str":{"value": "/home_id"}},
+ {"int":{"value": 300}},
+ {"int":{"value": 1622933202000000000}},
+ {"str":{"value": "GET"}},
+ {"str":{"value": "500"}}
+ ]
+ }
+
+]
\ No newline at end of file
diff --git a/banyand/tsdb/series.go b/banyand/tsdb/series.go
index efdf8ba..376e39c 100644
--- a/banyand/tsdb/series.go
+++ b/banyand/tsdb/series.go
@@ -19,6 +19,7 @@ package tsdb
import (
"bytes"
+ "context"
"io"
"time"
@@ -27,6 +28,7 @@ import (
"github.com/apache/skywalking-banyandb/api/common"
"github.com/apache/skywalking-banyandb/pkg/convert"
+ "github.com/apache/skywalking-banyandb/pkg/logger"
)
var (
@@ -70,9 +72,8 @@ func (i *GlobalItemID) UnMarshal(data []byte) error {
}
type TimeRange struct {
- Start time.Time
- Duration time.Duration
- End time.Time
+ Start time.Time
+ End time.Time
}
func (t TimeRange) contains(unixNano uint64) bool {
@@ -83,18 +84,24 @@ func (t TimeRange) contains(unixNano uint64) bool {
return tp.Equal(t.Start) || tp.After(t.Start)
}
-func NewTimeRange(Start time.Time, Duration time.Duration) TimeRange {
+func NewTimeRange(Start, End time.Time) TimeRange {
return TimeRange{
- Start: Start,
- Duration: Duration,
- End: Start.Add(Duration),
+ Start: Start,
+ End: End,
+ }
+}
+
+func NewTimeRangeDuration(Start time.Time, Duration time.Duration) TimeRange {
+ return TimeRange{
+ Start: Start,
+ End: Start.Add(Duration),
}
}
type Series interface {
ID() common.SeriesID
Span(timeRange TimeRange) (SeriesSpan, error)
- Get(id GlobalItemID) (Item, error)
+ Get(id GlobalItemID) (Item, io.Closer, error)
}
type SeriesSpan interface {
@@ -109,18 +116,16 @@ type series struct {
id common.SeriesID
blockDB blockDatabase
shardID common.ShardID
+ l *logger.Logger
}
-func (s *series) Get(id GlobalItemID) (Item, error) {
- panic("implement me")
-}
-
-func newSeries(id common.SeriesID, blockDB blockDatabase) *series {
- return &series{
- id: id,
- blockDB: blockDB,
- shardID: blockDB.shardID(),
- }
+func (s *series) Get(id GlobalItemID) (Item, io.Closer, error) {
+ b := s.blockDB.block(id)
+ return &item{
+ data: b.dataReader(),
+ itemID: id.id,
+ seriesID: s.id,
+ }, b, nil
}
func (s *series) ID() common.SeriesID {
@@ -132,7 +137,25 @@ func (s *series) Span(timeRange TimeRange) (SeriesSpan, error) {
if len(blocks) < 1 {
return nil, ErrEmptySeriesSpan
}
- return newSeriesSpan(timeRange, blocks, s.id, s.shardID), nil
+ s.l.Debug().
+ Times("time_range", []time.Time{timeRange.Start, timeRange.End}).
+ Msg("select series span")
+ return newSeriesSpan(context.WithValue(context.Background(), logger.ContextKey, s.l), timeRange, blocks, s.id, s.shardID), nil
+}
+
+func newSeries(ctx context.Context, id common.SeriesID, blockDB blockDatabase) *series {
+ s := &series{
+ id: id,
+ blockDB: blockDB,
+ shardID: blockDB.shardID(),
+ }
+ parentLogger := ctx.Value(logger.ContextKey)
+ if pl, ok := parentLogger.(*logger.Logger); ok {
+ s.l = pl.Named("series")
+ } else {
+ s.l = logger.GetLogger("series")
+ }
+ return s
}
var _ SeriesSpan = (*seriesSpan)(nil)
@@ -142,6 +165,7 @@ type seriesSpan struct {
seriesID common.SeriesID
shardID common.ShardID
timeRange TimeRange
+ l *logger.Logger
}
func (s *seriesSpan) Close() (err error) {
@@ -159,12 +183,18 @@ func (s *seriesSpan) SeekerBuilder() SeekerBuilder {
return newSeekerBuilder(s)
}
-func newSeriesSpan(timeRange TimeRange, blocks []blockDelegate,
- id common.SeriesID, shardID common.ShardID) *seriesSpan {
- return &seriesSpan{
+func newSeriesSpan(ctx context.Context, timeRange TimeRange, blocks []blockDelegate, id common.SeriesID, shardID common.ShardID) *seriesSpan {
+ s := &seriesSpan{
blocks: blocks,
seriesID: id,
shardID: shardID,
timeRange: timeRange,
}
+ parentLogger := ctx.Value(logger.ContextKey)
+ if pl, ok := parentLogger.(*logger.Logger); ok {
+ s.l = pl.Named("series_span")
+ } else {
+ s.l = logger.GetLogger("series_span")
+ }
+ return s
}
diff --git a/banyand/tsdb/series_seek.go b/banyand/tsdb/series_seek.go
index 478efa0..1c4f338 100644
--- a/banyand/tsdb/series_seek.go
+++ b/banyand/tsdb/series_seek.go
@@ -63,16 +63,21 @@ type seekerBuilder struct {
}
func (s *seekerBuilder) Build() (Seeker, error) {
+ if s.order == modelv2.QueryOrder_SORT_UNSPECIFIED {
+ s.order = modelv2.QueryOrder_SORT_DESC
+ }
indexFilter, err := s.buildIndexFilter()
if err != nil {
return nil, err
}
filters := []filterFn{
- indexFilter,
func(item Item) bool {
return s.seriesSpan.timeRange.contains(item.Time())
},
}
+ if indexFilter != nil {
+ filters = append(filters, indexFilter)
+ }
return newSeeker(s.buildSeries(filters)), nil
}
diff --git a/banyand/tsdb/series_seek_filter.go b/banyand/tsdb/series_seek_filter.go
index 65147ea..8814a16 100644
--- a/banyand/tsdb/series_seek_filter.go
+++ b/banyand/tsdb/series_seek_filter.go
@@ -44,6 +44,9 @@ func (s *seekerBuilder) Filter(indexRule *databasev2.IndexRule, condition Condit
}
func (s *seekerBuilder) buildIndexFilter() (filterFn, error) {
+ if len(s.conditions) < 1 {
+ return nil, nil
+ }
var treeIndexCondition, invertedIndexCondition []index.Condition
for _, condition := range s.conditions {
if len(condition.condition) > 1 {
diff --git a/banyand/tsdb/series_seek_sort.go b/banyand/tsdb/series_seek_sort.go
index 1646bce..3b26a8c 100644
--- a/banyand/tsdb/series_seek_sort.go
+++ b/banyand/tsdb/series_seek_sort.go
@@ -19,6 +19,9 @@ package tsdb
import (
"sort"
+ "time"
+
+ "go.uber.org/multierr"
"github.com/apache/skywalking-banyandb/api/common"
databasev2 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v2"
@@ -26,6 +29,7 @@ import (
"github.com/apache/skywalking-banyandb/banyand/kv"
"github.com/apache/skywalking-banyandb/pkg/index"
"github.com/apache/skywalking-banyandb/pkg/index/posting"
+ "github.com/apache/skywalking-banyandb/pkg/logger"
)
func (s *seekerBuilder) OrderByIndex(indexRule *databasev2.IndexRule, order modelv2.QueryOrder_Sort) SeekerBuilder {
@@ -42,29 +46,20 @@ func (s *seekerBuilder) OrderByTime(order modelv2.QueryOrder_Sort) SeekerBuilder
func (s *seekerBuilder) buildSeries(filters []filterFn) []Iterator {
if s.indexRuleForSorting == nil {
- return s.buildSeriesByIndex(filters)
+ return s.buildSeriesByTime(filters)
}
- return s.buildSeriesByTime(filters)
+ return s.buildSeriesByIndex(filters)
}
func (s *seekerBuilder) buildSeriesByIndex(filters []filterFn) (series []Iterator) {
for _, b := range s.seriesSpan.blocks {
switch s.indexRuleForSorting.GetType() {
case databasev2.IndexRule_TYPE_TREE:
- series = append(series, newSearcherIterator(
- b.lsmIndexReader().
- FieldIterator([]byte(s.indexRuleForSorting.GetMetadata().GetName()), s.order),
- b.dataReader(),
- s.seriesSpan.seriesID,
- filters,
- ))
+ series = append(series, newSearcherIterator(s.seriesSpan.l, b.lsmIndexReader().
+ FieldIterator([]byte(s.indexRuleForSorting.GetMetadata().GetName()), s.order), b.dataReader(), s.seriesSpan.seriesID, filters))
case databasev2.IndexRule_TYPE_INVERTED:
- series = append(series, newSearcherIterator(b.invertedIndexReader().
- FieldIterator([]byte(s.indexRuleForSorting.GetMetadata().GetName()), s.order),
- b.dataReader(),
- s.seriesSpan.seriesID,
- filters,
- ))
+ series = append(series, newSearcherIterator(s.seriesSpan.l, b.invertedIndexReader().
+ FieldIterator([]byte(s.indexRuleForSorting.GetMetadata().GetName()), s.order), b.dataReader(), s.seriesSpan.seriesID, filters))
}
}
return
@@ -83,15 +78,23 @@ func (s *seekerBuilder) buildSeriesByTime(filters []filterFn) []Iterator {
return bb[i].startTime().After(bb[j].startTime())
})
}
- delegated := make([]Iterator, len(bb))
+ delegated := make([]Iterator, 0, len(bb))
+ var bTimes []time.Time
for _, b := range bb {
- delegated = append(delegated, newSearcherIterator(b.
- primaryIndexReader().
- FieldIterator(
- s.seriesSpan.seriesID.Marshal(),
- s.order,
- ), b.dataReader(), s.seriesSpan.seriesID, filters))
+ bTimes = append(bTimes, b.startTime())
+ delegated = append(delegated, newSearcherIterator(
+ s.seriesSpan.l,
+ b.primaryIndexReader().
+ FieldIterator(
+ s.seriesSpan.seriesID.Marshal(),
+ s.order,
+ ), b.dataReader(), s.seriesSpan.seriesID, filters))
}
+ s.seriesSpan.l.Debug().
+ Str("order", modelv2.QueryOrder_Sort_name[int32(s.order)]).
+ Times("blocks", bTimes).
+ Uint64("series_id", uint64(s.seriesSpan.seriesID)).
+ Msg("seek series by time")
return []Iterator{newMergedIterator(delegated)}
}
@@ -104,6 +107,7 @@ type searcherIterator struct {
data kv.TimeSeriesReader
seriesID common.SeriesID
filters []filterFn
+ l *logger.Logger
}
func (s *searcherIterator) Next() bool {
@@ -112,20 +116,22 @@ func (s *searcherIterator) Next() bool {
v := s.fieldIterator.Val()
s.cur = v.Value.Iterator()
s.curKey = v.Key
+ s.l.Trace().Hex("term_field", s.curKey).Msg("got a new field")
} else {
- _ = s.Close()
return false
}
}
if s.cur.Next() {
+
for _, filter := range s.filters {
if !filter(s.Val()) {
+ s.l.Trace().Uint64("item_id", uint64(s.Val().ID())).Msg("ignore the item")
return s.Next()
}
}
+ s.l.Trace().Uint64("item_id", uint64(s.Val().ID())).Msg("got an item")
return true
}
- _ = s.cur.Close()
s.cur = nil
return s.Next()
}
@@ -143,12 +149,14 @@ func (s *searcherIterator) Close() error {
return s.fieldIterator.Close()
}
-func newSearcherIterator(fieldIterator index.FieldIterator, data kv.TimeSeriesReader, seriesID common.SeriesID, filters []filterFn) Iterator {
+func newSearcherIterator(l *logger.Logger, fieldIterator index.FieldIterator, data kv.TimeSeriesReader,
+ seriesID common.SeriesID, filters []filterFn) Iterator {
return &searcherIterator{
fieldIterator: fieldIterator,
data: data,
seriesID: seriesID,
filters: filters,
+ l: l,
}
}
@@ -158,17 +166,12 @@ type mergedIterator struct {
curr Iterator
index int
delegated []Iterator
- closed bool
}
func (m *mergedIterator) Next() bool {
- if m.closed {
- return false
- }
if m.curr == nil {
m.index++
if m.index >= len(m.delegated) {
- _ = m.Close()
return false
} else {
m.curr = m.delegated[m.index]
@@ -176,7 +179,6 @@ func (m *mergedIterator) Next() bool {
}
hasNext := m.curr.Next()
if !hasNext {
- _ = m.curr.Close()
m.curr = nil
return m.Next()
}
@@ -188,8 +190,11 @@ func (m *mergedIterator) Val() Item {
}
func (m *mergedIterator) Close() error {
- m.closed = true
- return nil
+ var err error
+ for _, d := range m.delegated {
+ err = multierr.Append(err, d.Close())
+ }
+ return err
}
func newMergedIterator(delegated []Iterator) Iterator {
diff --git a/banyand/tsdb/series_write.go b/banyand/tsdb/series_write.go
index 0f69332..af59b53 100644
--- a/banyand/tsdb/series_write.go
+++ b/banyand/tsdb/series_write.go
@@ -159,9 +159,10 @@ func (d dataBucket) marshal() []byte {
}, nil)
}
-func (w *writer) Write() (id GlobalItemID, err error) {
+func (w *writer) Write() (GlobalItemID, error) {
+ id := w.ItemID()
for _, c := range w.columns {
- err = w.block.write(dataBucket{
+ err := w.block.write(dataBucket{
seriesID: w.itemID.seriesID,
family: c.family,
}.marshal(),
diff --git a/banyand/tsdb/seriesdb.go b/banyand/tsdb/seriesdb.go
index 24a622f..ccae717 100644
--- a/banyand/tsdb/seriesdb.go
+++ b/banyand/tsdb/seriesdb.go
@@ -41,6 +41,14 @@ type Entry []byte
type Entity []Entry
+func (e Entity) Marshal() []byte {
+ data := make([][]byte, len(e))
+ for i, entry := range e {
+ data[i] = entry
+ }
+ return bytes.Join(data, nil)
+}
+
type Path struct {
prefix []byte
mask []byte
@@ -86,6 +94,7 @@ type SeriesDatabase interface {
type blockDatabase interface {
shardID() common.ShardID
span(timeRange TimeRange) []blockDelegate
+ block(id GlobalItemID) blockDelegate
}
var _ SeriesDatabase = (*seriesDB)(nil)
@@ -100,6 +109,10 @@ type seriesDB struct {
sID common.ShardID
}
+func (s *seriesDB) block(id GlobalItemID) blockDelegate {
+ return s.lst[id.segID].lst[id.blockID].delegate()
+}
+
func (s *seriesDB) shardID() common.ShardID {
return s.sID
}
@@ -111,7 +124,7 @@ func (s *seriesDB) Get(entity Entity) (Series, error) {
return nil, err
}
if err == nil {
- return newSeries(bytesConvSeriesID(seriesID), s), nil
+ return newSeries(s.context(), bytesConvSeriesID(seriesID), s), nil
}
s.Lock()
defer s.Unlock()
@@ -120,7 +133,7 @@ func (s *seriesDB) Get(entity Entity) (Series, error) {
if err != nil {
return nil, err
}
- return newSeries(bytesConvSeriesID(seriesID), s), nil
+ return newSeries(s.context(), bytesConvSeriesID(seriesID), s), nil
}
func (s *seriesDB) List(path Path) (SeriesList, error) {
@@ -130,8 +143,14 @@ func (s *seriesDB) List(path Path) (SeriesList, error) {
return nil, err
}
if err == nil {
- return []Series{newSeries(bytesConvSeriesID(id), s)}, nil
+ seriesID := bytesConvSeriesID(id)
+ s.l.Debug().
+ Hex("path", path.prefix).
+ Uint64("series_id", uint64(seriesID)).
+ Msg("got a series")
+ return []Series{newSeries(s.context(), seriesID, s)}, nil
}
+ s.l.Debug().Hex("path", path.prefix).Msg("doesn't get any series")
return nil, nil
}
result := make([]Series, 0)
@@ -147,7 +166,12 @@ func (s *seriesDB) List(path Path) (SeriesList, error) {
err = multierr.Append(err, errGetVal)
return nil
}
- result = append(result, newSeries(common.SeriesID(convert.BytesToUint64(id)), s))
+ seriesID := bytesConvSeriesID(id)
+ s.l.Debug().
+ Hex("path", path.prefix).
+ Uint64("series_id", uint64(seriesID)).
+ Msg("got a series")
+ result = append(result, newSeries(s.context(), seriesID, s))
}
return nil
})
@@ -166,6 +190,10 @@ func (s *seriesDB) span(_ TimeRange) []blockDelegate {
return result
}
+func (s *seriesDB) context() context.Context {
+ return context.WithValue(context.Background(), logger.ContextKey, s.l)
+}
+
func (s *seriesDB) Close() error {
for _, seg := range s.lst {
seg.close()
@@ -183,7 +211,7 @@ func newSeriesDataBase(ctx context.Context, shardID common.ShardID, path string,
return nil, logger.ErrNoLoggerInContext
}
if pl, ok := parentLogger.(*logger.Logger); ok {
- sdb.l = pl.Named("seriesSpan")
+ sdb.l = pl.Named("series_database")
}
var err error
sdb.seriesMetadata, err = kv.OpenStore(0, path+"/md", kv.StoreWithNamedLogger("metadata", sdb.l))
diff --git a/banyand/tsdb/seriesdb_test.go b/banyand/tsdb/seriesdb_test.go
index 3968564..a87d24c 100644
--- a/banyand/tsdb/seriesdb_test.go
+++ b/banyand/tsdb/seriesdb_test.go
@@ -338,7 +338,7 @@ func setUpEntities(t *assert.Assertions, db SeriesDatabase) []*entityWithID {
}
func newMockSeries(id common.SeriesID) *series {
- return newSeries(id, nil)
+ return newSeries(nil, id, nil)
}
func transform(list SeriesList) (seriesIDs []common.SeriesID) {
diff --git a/banyand/tsdb/shard.go b/banyand/tsdb/shard.go
index 3b2ed4e..4bcc40e 100644
--- a/banyand/tsdb/shard.go
+++ b/banyand/tsdb/shard.go
@@ -37,6 +37,10 @@ type shard struct {
lst []*segment
}
+func (s *shard) ID() common.ShardID {
+ return s.id
+}
+
func (s *shard) Series() SeriesDatabase {
return s.seriesDatabase
}
diff --git a/banyand/tsdb/tsdb.go b/banyand/tsdb/tsdb.go
index 0d3da45..4528b46 100644
--- a/banyand/tsdb/tsdb.go
+++ b/banyand/tsdb/tsdb.go
@@ -60,6 +60,7 @@ type Database interface {
type Shard interface {
io.Closer
+ ID() common.ShardID
Series() SeriesDatabase
Index() IndexDatabase
}