You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by ha...@apache.org on 2021/09/22 07:54:52 UTC

[skywalking-banyandb] branch tsdb-range created (now 04d3d4b)

This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a change to branch tsdb-range
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git.


      at 04d3d4b  Fix some flaws:

This branch includes the following new commits:

     new 04d3d4b  Fix some flaws:

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


[skywalking-banyandb] 01/01: Fix some flaws:

Posted by ha...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch tsdb-range
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git

commit 04d3d4b6e60ec1c8696b7a36e078539c0e0b9af1
Author: Gao Hongtao <ha...@gmail.com>
AuthorDate: Wed Sep 22 14:58:36 2021 +0800

    Fix some flaws:
    
     * failed to assign the range to the searching tree
     * use intersection operation to combine several criteria
    
    Signed-off-by: Gao Hongtao <ha...@gmail.com>
---
 banyand/stream/stream_query_test.go | 222 ++++++++++++++++++++++++++++++++++++
 banyand/tsdb/series_seek.go         |   8 +-
 banyand/tsdb/series_seek_filter.go  |  76 +++++++-----
 banyand/tsdb/series_seek_sort.go    |  39 +++++--
 pkg/index/tree.go                   |  16 ++-
 5 files changed, 312 insertions(+), 49 deletions(-)

diff --git a/banyand/stream/stream_query_test.go b/banyand/stream/stream_query_test.go
index 5a19576..21669c6 100644
--- a/banyand/stream/stream_query_test.go
+++ b/banyand/stream/stream_query_test.go
@@ -285,6 +285,228 @@ func Test_Stream_Series(t *testing.T) {
 				},
 			},
 		},
+		{
+			name: "filter by duration",
+			args: queryOpts{
+				entity:    tsdb.Entity{tsdb.AnyEntry, tsdb.AnyEntry, tsdb.AnyEntry},
+				timeRange: tsdb.NewTimeRangeDuration(baseTime, 1*time.Hour),
+				buildFn: func(builder tsdb.SeekerBuilder) {
+					rule := &databasev2.IndexRule{
+						Metadata: &commonv2.Metadata{
+							Name:  "duration",
+							Group: "default",
+							Id:    3,
+						},
+						Tags:     []string{"duration"},
+						Type:     databasev2.IndexRule_TYPE_TREE,
+						Location: databasev2.IndexRule_LOCATION_SERIES,
+					}
+					builder.Filter(rule, tsdb.Condition{
+						"duration": []index.ConditionValue{
+							{
+								Op:     modelv2.Condition_BINARY_OP_LT,
+								Values: [][]byte{convert.Int64ToBytes(500)},
+							},
+						},
+					})
+				},
+			},
+			want: shardsForTest{
+				{
+					id:       0,
+					location: []string{"series_16283518706331625322", "data_flow_0"},
+					elements: []string{"4"},
+				},
+				{
+					id:       0,
+					location: []string{"series_4862694201852929188", "data_flow_0"},
+				},
+				{
+					id:       1,
+					location: []string{"series_13343478452567673284", "data_flow_0"},
+				},
+				{
+					id:       1,
+					location: []string{"series_7898679171060804990", "data_flow_0"},
+					elements: []string{"5", "3"},
+				},
+			},
+		},
+		{
+			name: "filter and sort by duration",
+			args: queryOpts{
+				entity:    tsdb.Entity{tsdb.AnyEntry, tsdb.AnyEntry, tsdb.AnyEntry},
+				timeRange: tsdb.NewTimeRangeDuration(baseTime, 1*time.Hour),
+				buildFn: func(builder tsdb.SeekerBuilder) {
+					rule := &databasev2.IndexRule{
+						Metadata: &commonv2.Metadata{
+							Name:  "duration",
+							Group: "default",
+							Id:    3,
+						},
+						Tags:     []string{"duration"},
+						Type:     databasev2.IndexRule_TYPE_TREE,
+						Location: databasev2.IndexRule_LOCATION_SERIES,
+					}
+					builder.Filter(rule, tsdb.Condition{
+						"duration": []index.ConditionValue{
+							{
+								Op:     modelv2.Condition_BINARY_OP_LT,
+								Values: [][]byte{convert.Int64ToBytes(500)},
+							},
+						},
+					})
+					builder.OrderByIndex(rule, modelv2.QueryOrder_SORT_ASC)
+				},
+			},
+			want: shardsForTest{
+				{
+					id:       0,
+					location: []string{"series_16283518706331625322", "data_flow_0"},
+					elements: []string{"4"},
+				},
+				{
+					id:       0,
+					location: []string{"series_4862694201852929188", "data_flow_0"},
+				},
+				{
+					id:       1,
+					location: []string{"series_13343478452567673284", "data_flow_0"},
+				},
+				{
+					id:       1,
+					location: []string{"series_7898679171060804990", "data_flow_0"},
+					elements: []string{"3", "5"},
+				},
+			},
+		},
+		{
+			name: "filter by several conditions",
+			args: queryOpts{
+				entity:    tsdb.Entity{tsdb.AnyEntry, tsdb.AnyEntry, tsdb.AnyEntry},
+				timeRange: tsdb.NewTimeRangeDuration(baseTime, 1*time.Hour),
+				buildFn: func(builder tsdb.SeekerBuilder) {
+					rule := &databasev2.IndexRule{
+						Metadata: &commonv2.Metadata{
+							Name:  "duration",
+							Group: "default",
+							Id:    3,
+						},
+						Tags:     []string{"duration"},
+						Type:     databasev2.IndexRule_TYPE_TREE,
+						Location: databasev2.IndexRule_LOCATION_SERIES,
+					}
+					builder.Filter(rule, tsdb.Condition{
+						"duration": []index.ConditionValue{
+							{
+								Op:     modelv2.Condition_BINARY_OP_LT,
+								Values: [][]byte{convert.Int64ToBytes(500)},
+							},
+						},
+					})
+					builder.Filter(&databasev2.IndexRule{
+						Metadata: &commonv2.Metadata{
+							Name:  "endpoint_id",
+							Group: "default",
+							Id:    4,
+						},
+						Tags:     []string{"endpoint_id"},
+						Type:     databasev2.IndexRule_TYPE_INVERTED,
+						Location: databasev2.IndexRule_LOCATION_SERIES,
+					}, tsdb.Condition{
+						"endpoint_id": []index.ConditionValue{
+							{
+								Op:     modelv2.Condition_BINARY_OP_EQ,
+								Values: [][]byte{[]byte("/home_id")},
+							},
+						},
+					})
+				},
+			},
+			want: shardsForTest{
+				{
+					id:       0,
+					location: []string{"series_16283518706331625322", "data_flow_0"},
+				},
+				{
+					id:       0,
+					location: []string{"series_4862694201852929188", "data_flow_0"},
+				},
+				{
+					id:       1,
+					location: []string{"series_13343478452567673284", "data_flow_0"},
+				},
+				{
+					id:       1,
+					location: []string{"series_7898679171060804990", "data_flow_0"},
+					elements: []string{"3"},
+				},
+			},
+		},
+		{
+			name: "filter by several conditions, sort by duration",
+			args: queryOpts{
+				entity:    tsdb.Entity{tsdb.AnyEntry, tsdb.AnyEntry, tsdb.AnyEntry},
+				timeRange: tsdb.NewTimeRangeDuration(baseTime, 1*time.Hour),
+				buildFn: func(builder tsdb.SeekerBuilder) {
+					rule := &databasev2.IndexRule{
+						Metadata: &commonv2.Metadata{
+							Name:  "duration",
+							Group: "default",
+							Id:    3,
+						},
+						Tags:     []string{"duration"},
+						Type:     databasev2.IndexRule_TYPE_TREE,
+						Location: databasev2.IndexRule_LOCATION_SERIES,
+					}
+					builder.Filter(rule, tsdb.Condition{
+						"duration": []index.ConditionValue{
+							{
+								Op:     modelv2.Condition_BINARY_OP_LT,
+								Values: [][]byte{convert.Int64ToBytes(500)},
+							},
+						},
+					})
+					builder.OrderByIndex(rule, modelv2.QueryOrder_SORT_ASC)
+					builder.Filter(&databasev2.IndexRule{
+						Metadata: &commonv2.Metadata{
+							Name:  "endpoint_id",
+							Group: "default",
+							Id:    4,
+						},
+						Tags:     []string{"endpoint_id"},
+						Type:     databasev2.IndexRule_TYPE_INVERTED,
+						Location: databasev2.IndexRule_LOCATION_SERIES,
+					}, tsdb.Condition{
+						"endpoint_id": []index.ConditionValue{
+							{
+								Op:     modelv2.Condition_BINARY_OP_EQ,
+								Values: [][]byte{[]byte("/home_id")},
+							},
+						},
+					})
+				},
+			},
+			want: shardsForTest{
+				{
+					id:       0,
+					location: []string{"series_16283518706331625322", "data_flow_0"},
+				},
+				{
+					id:       0,
+					location: []string{"series_4862694201852929188", "data_flow_0"},
+				},
+				{
+					id:       1,
+					location: []string{"series_13343478452567673284", "data_flow_0"},
+				},
+				{
+					id:       1,
+					location: []string{"series_7898679171060804990", "data_flow_0"},
+					elements: []string{"3"},
+				},
+			},
+		},
 	}
 
 	for _, tt := range tests {
diff --git a/banyand/tsdb/series_seek.go b/banyand/tsdb/series_seek.go
index 5f46390..0c8700b 100644
--- a/banyand/tsdb/series_seek.go
+++ b/banyand/tsdb/series_seek.go
@@ -68,15 +68,11 @@ func (s *seekerBuilder) Build() (Seeker, error) {
 	if s.order == modelv2.QueryOrder_SORT_UNSPECIFIED {
 		s.order = modelv2.QueryOrder_SORT_DESC
 	}
-	indexFilter, err := s.buildIndexFilter()
+	conditions, err := s.buildConditions()
 	if err != nil {
 		return nil, err
 	}
-	filters := make([]filterFn, 0, 2)
-	if indexFilter != nil {
-		filters = append(filters, indexFilter)
-	}
-	se, err := s.buildSeries(filters)
+	se, err := s.buildSeries(conditions)
 	if err != nil {
 		return nil, err
 	}
diff --git a/banyand/tsdb/series_seek_filter.go b/banyand/tsdb/series_seek_filter.go
index eca279b..8a698c6 100644
--- a/banyand/tsdb/series_seek_filter.go
+++ b/banyand/tsdb/series_seek_filter.go
@@ -23,7 +23,6 @@ import (
 	databasev2 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v2"
 	"github.com/apache/skywalking-banyandb/pkg/index"
 	"github.com/apache/skywalking-banyandb/pkg/index/posting"
-	"github.com/apache/skywalking-banyandb/pkg/index/posting/roaring"
 )
 
 var ErrUnsupportedIndexRule = errors.New("the index rule is not supported")
@@ -43,11 +42,16 @@ func (s *seekerBuilder) Filter(indexRule *databasev2.IndexRule, condition Condit
 	return s
 }
 
-func (s *seekerBuilder) buildIndexFilter() (filterFn, error) {
+type condWithIRT struct {
+	indexRuleType databasev2.IndexRule_Type
+	condition     index.Condition
+}
+
+func (s *seekerBuilder) buildConditions() ([]condWithIRT, error) {
 	if len(s.conditions) < 1 {
 		return nil, nil
 	}
-	var treeIndexCondition, invertedIndexCondition []index.Condition
+	conditions := make([]condWithIRT, 0, len(s.conditions))
 	for _, condition := range s.conditions {
 		if len(condition.condition) > 1 {
 			//TODO:// should support composite index rule
@@ -62,18 +66,17 @@ func (s *seekerBuilder) buildIndexFilter() (filterFn, error) {
 			cond[term] = c
 			break
 		}
-		switch condition.indexRuleType {
-		case databasev2.IndexRule_TYPE_TREE:
-			treeIndexCondition = append(treeIndexCondition, cond)
-		case databasev2.IndexRule_TYPE_INVERTED:
-			invertedIndexCondition = append(invertedIndexCondition, cond)
-		}
+		conditions = append(conditions, condWithIRT{indexRuleType: condition.indexRuleType, condition: cond})
 	}
-	allItemIDs := roaring.NewPostingList()
-	addIDs := func(allList posting.List, searcher index.Searcher, cond index.Condition) error {
+	return conditions, nil
+}
+
+func (s *seekerBuilder) buildIndexFilter(block blockDelegate, conditions []condWithIRT) (filterFn, error) {
+	var allItemIDs posting.List
+	addIDs := func(allList posting.List, searcher index.Searcher, cond index.Condition) (posting.List, bool, error) {
 		tree, err := index.BuildTree(searcher, cond)
 		if err != nil {
-			return err
+			return nil, false, err
 		}
 		rangeOpts, found := tree.TrimRangeLeaf(index.FieldKey{
 			SeriesID:    s.seriesSpan.seriesID,
@@ -83,28 +86,47 @@ func (s *seekerBuilder) buildIndexFilter() (filterFn, error) {
 			s.rangeOptsForSorting = rangeOpts
 		}
 		list, err := tree.Execute()
-		if err != nil {
-			return err
+		if errors.Is(err, index.ErrEmptyTree) {
+			return allList, false, nil
 		}
-		err = allList.Union(list)
 		if err != nil {
-			return err
+			return nil, false, err
 		}
-		return nil
-	}
-	for _, b := range s.seriesSpan.blocks {
-		for _, tCond := range treeIndexCondition {
-			err := addIDs(allItemIDs, b.lsmIndexReader(), tCond)
+		if allList == nil {
+			allList = list
+		} else {
+			err = allList.Intersect(list)
 			if err != nil {
-				return nil, err
+				return nil, false, err
 			}
 		}
-		for _, iCond := range invertedIndexCondition {
-			err := addIDs(allItemIDs, b.invertedIndexReader(), iCond)
-			if err != nil {
-				return nil, err
-			}
+		return allList, true, nil
+	}
+	allInvalid := true
+	for i, condition := range conditions {
+		var valid bool
+		var err error
+		switch condition.indexRuleType {
+		case databasev2.IndexRule_TYPE_INVERTED:
+			allItemIDs, valid, err = addIDs(allItemIDs, block.invertedIndexReader(), condition.condition)
+		case databasev2.IndexRule_TYPE_TREE:
+			allItemIDs, valid, err = addIDs(allItemIDs, block.lsmIndexReader(), condition.condition)
+		default:
+			return nil, ErrUnsupportedIndexRule
 		}
+		if err != nil {
+			return nil, err
+		}
+		if i > 0 && allItemIDs.IsEmpty() {
+			return func(_ Item) bool {
+				return false
+			}, nil
+		}
+		allInvalid = allInvalid && !valid
+	}
+
+	if allInvalid {
+		return nil, nil
 	}
 	return func(item Item) bool {
 		valid := allItemIDs.Contains(item.ID())
diff --git a/banyand/tsdb/series_seek_sort.go b/banyand/tsdb/series_seek_sort.go
index 1529aa7..1d9d944 100644
--- a/banyand/tsdb/series_seek_sort.go
+++ b/banyand/tsdb/series_seek_sort.go
@@ -33,6 +33,8 @@ import (
 	"github.com/apache/skywalking-banyandb/pkg/logger"
 )
 
+var emptyFilters = make([]filterFn, 0)
+
 func (s *seekerBuilder) OrderByIndex(indexRule *databasev2.IndexRule, order modelv2.QueryOrder_Sort) SeekerBuilder {
 	s.indexRuleForSorting = indexRule
 	s.order = order
@@ -45,22 +47,22 @@ func (s *seekerBuilder) OrderByTime(order modelv2.QueryOrder_Sort) SeekerBuilder
 	return s
 }
 
-func (s *seekerBuilder) buildSeries(filters []filterFn) ([]Iterator, error) {
+func (s *seekerBuilder) buildSeries(conditions []condWithIRT) ([]Iterator, error) {
 	if s.indexRuleForSorting == nil {
-		return s.buildSeriesByTime(filters)
+		return s.buildSeriesByTime(conditions)
 	}
-	filters = append(filters, func(item Item) bool {
+	return s.buildSeriesByIndex(conditions)
+}
+
+func (s *seekerBuilder) buildSeriesByIndex(conditions []condWithIRT) (series []Iterator, err error) {
+	timeFilter := func(item Item) bool {
 		valid := s.seriesSpan.timeRange.contains(item.Time())
 		timeRange := s.seriesSpan.timeRange
 		s.seriesSpan.l.Trace().
 			Times("time_range", []time.Time{timeRange.Start, timeRange.End}).
 			Bool("valid", valid).Msg("filter item by time range")
 		return valid
-	})
-	return s.buildSeriesByIndex(filters)
-}
-
-func (s *seekerBuilder) buildSeriesByIndex(filters []filterFn) (series []Iterator, err error) {
+	}
 	for _, b := range s.seriesSpan.blocks {
 		var inner index.FieldIterator
 		var err error
@@ -68,7 +70,14 @@ func (s *seekerBuilder) buildSeriesByIndex(filters []filterFn) (series []Iterato
 			SeriesID:    s.seriesSpan.seriesID,
 			IndexRuleID: s.indexRuleForSorting.GetMetadata().GetId(),
 		}
-
+		filters := []filterFn{timeFilter}
+		filter, err := s.buildIndexFilter(b, conditions)
+		if err != nil {
+			return nil, err
+		}
+		if filter != nil {
+			filters = append(filters, filter)
+		}
 		switch s.indexRuleForSorting.GetType() {
 		case databasev2.IndexRule_TYPE_TREE:
 			inner, err = b.lsmIndexReader().Iterator(fieldKey, s.rangeOptsForSorting, s.order)
@@ -85,7 +94,7 @@ func (s *seekerBuilder) buildSeriesByIndex(filters []filterFn) (series []Iterato
 	return
 }
 
-func (s *seekerBuilder) buildSeriesByTime(filters []filterFn) ([]Iterator, error) {
+func (s *seekerBuilder) buildSeriesByTime(conditions []condWithIRT) ([]Iterator, error) {
 	bb := s.seriesSpan.blocks
 	switch s.order {
 	case modelv2.QueryOrder_SORT_ASC:
@@ -120,7 +129,15 @@ func (s *seekerBuilder) buildSeriesByTime(filters []filterFn) ([]Iterator, error
 			return nil, err
 		}
 		if inner != nil {
-			delegated = append(delegated, newSearcherIterator(s.seriesSpan.l, inner, b.dataReader(), s.seriesSpan.seriesID, filters))
+			filter, err := s.buildIndexFilter(b, conditions)
+			if err != nil {
+				return nil, err
+			}
+			if filter == nil {
+				delegated = append(delegated, newSearcherIterator(s.seriesSpan.l, inner, b.dataReader(), s.seriesSpan.seriesID, emptyFilters))
+			} else {
+				delegated = append(delegated, newSearcherIterator(s.seriesSpan.l, inner, b.dataReader(), s.seriesSpan.seriesID, []filterFn{filter}))
+			}
 		}
 	}
 	s.seriesSpan.l.Debug().
diff --git a/pkg/index/tree.go b/pkg/index/tree.go
index 6ac7022..35f2f4b 100644
--- a/pkg/index/tree.go
+++ b/pkg/index/tree.go
@@ -29,7 +29,10 @@ import (
 	"github.com/apache/skywalking-banyandb/pkg/index/posting"
 )
 
-var ErrNotRangeOperation = errors.New("this is not an range operation")
+var (
+	ErrNotRangeOperation = errors.New("this is not an range operation")
+	ErrEmptyTree         = errors.New("tree is empty")
+)
 
 type Executor interface {
 	Execute() (posting.List, error)
@@ -151,7 +154,7 @@ func (n *node) addRangeLeaf(key FieldKey) *rangeOp {
 			Key:      key,
 			searcher: n.searcher,
 		},
-		Opts: RangeOpts{},
+		Opts: &RangeOpts{},
 	}
 	n.SubNodes = append(n.SubNodes, r)
 	return r
@@ -184,6 +187,9 @@ func (n *node) pop() (Executor, bool) {
 func execute(n *node, lp logicalOP) (posting.List, error) {
 	ex, hasNext := n.pop()
 	if !hasNext {
+		if n.value == nil {
+			return nil, ErrEmptyTree
+		}
 		return n.value, nil
 	}
 	r, err := ex.Execute()
@@ -219,7 +225,7 @@ func (an *andNode) TrimRangeLeaf(key FieldKey) (RangeOpts, bool) {
 		}
 		if key.Equal(leafRange.Key) {
 			an.SubNodes = removeLeaf(an.SubNodes, i)
-			return leafRange.Opts, true
+			return *leafRange.Opts, true
 		}
 	}
 	return RangeOpts{}, false
@@ -309,11 +315,11 @@ func (eq *eq) MarshalJSON() ([]byte, error) {
 
 type rangeOp struct {
 	*leaf
-	Opts RangeOpts
+	Opts *RangeOpts
 }
 
 func (r *rangeOp) Execute() (posting.List, error) {
-	return r.searcher.Range(r.Key, r.Opts)
+	return r.searcher.Range(r.Key, *r.Opts)
 }
 
 func (r *rangeOp) MarshalJSON() ([]byte, error) {