You are viewing a plain-text version of this content; the link to its canonical version was not preserved in this copy.
Posted to notifications@skywalking.apache.org by lu...@apache.org on 2023/01/10 08:59:55 UTC
[skywalking-banyandb] 01/01: unify filter
This is an automated email from the ASF dual-hosted git repository.
lujiajing pushed a commit to branch unify-index-filters
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
commit 691d567d28a2e41334c79ac3c5e03e1501d3c653
Author: Megrez Lu <lu...@gmail.com>
AuthorDate: Tue Jan 10 16:59:38 2023 +0800
unify filter
---
banyand/measure/measure_topn.go | 86 +++++------------------------------------
pkg/pb/v1/write.go | 19 +++++++++
pkg/query/logical/schema.go | 19 ++++++++-
pkg/query/logical/tag_filter.go | 13 +++++--
4 files changed, 55 insertions(+), 82 deletions(-)
diff --git a/banyand/measure/measure_topn.go b/banyand/measure/measure_topn.go
index 4d51099..4dd50f3 100644
--- a/banyand/measure/measure_topn.go
+++ b/banyand/measure/measure_topn.go
@@ -43,6 +43,7 @@ import (
"github.com/apache/skywalking-banyandb/pkg/logger"
"github.com/apache/skywalking-banyandb/pkg/partition"
pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
+ "github.com/apache/skywalking-banyandb/pkg/query/logical"
)
const (
@@ -56,8 +57,6 @@ var (
_ io.Closer = (*topNProcessorManager)(nil)
_ flow.Sink = (*topNStreamingProcessor)(nil)
- errUnsupportedConditionValueType = errors.New("unsupported value type in the condition")
-
// TopNValueFieldSpec denotes the field specification of the topN calculated result.
TopNValueFieldSpec = &databasev1.FieldSpec{
Name: "value",
@@ -393,88 +392,23 @@ func (manager *topNProcessorManager) buildFilter(criteria *modelv1.Criteria) (fl
}, nil
}
- f, err := manager.buildFilterForCriteria(criteria)
+ f, err := logical.BuildSimpleTagFilter(criteria)
if err != nil {
return nil, err
}
return func(_ context.Context, dataPoint any) bool {
- tfs := dataPoint.(*measurev1.DataPointValue).GetTagFamilies()
- return f.predicate(tfs)
+ tffw := dataPoint.(*measurev1.DataPointValue).GetTagFamilies()
+ tfs := pbv1.AttachSchema(tffw, manager.m.schema)
+ ok, matchErr := f.Match(tfs)
+ if matchErr != nil {
+ manager.l.Err(matchErr).Msg("fail to match criteria")
+ return false
+ }
+ return ok
}, nil
}
-func (manager *topNProcessorManager) buildFilterForCriteria(criteria *modelv1.Criteria) (conditionFilter, error) {
- switch v := criteria.GetExp().(type) {
- case *modelv1.Criteria_Condition:
- return manager.buildFilterForCondition(v.Condition)
- case *modelv1.Criteria_Le:
- return manager.buildFilterForLogicalExpr(v.Le)
- default:
- return nil, errors.New("should not reach here")
- }
-}
-
-// buildFilterForCondition builds a logical and composable filter for a logical expression which have underlying conditions,
-// or nested logical expressions as its children.
-func (manager *topNProcessorManager) buildFilterForLogicalExpr(logicalExpr *modelv1.LogicalExpression) (conditionFilter, error) {
- left, lErr := manager.buildFilterForCriteria(logicalExpr.Left)
- if lErr != nil {
- return nil, lErr
- }
- right, rErr := manager.buildFilterForCriteria(logicalExpr.Right)
- if rErr != nil {
- return nil, rErr
- }
- return composeWithOp(left, right, logicalExpr.Op), nil
-}
-
-func composeWithOp(left, right conditionFilter, op modelv1.LogicalExpression_LogicalOp) conditionFilter {
- if op == modelv1.LogicalExpression_LOGICAL_OP_AND {
- return &andFilter{left, right}
- }
- return &orFilter{left, right}
-}
-
-// buildFilterForCondition builds a single, composable filter for a single condition.
-func (manager *topNProcessorManager) buildFilterForCondition(cond *modelv1.Condition) (conditionFilter, error) {
- familyOffset, tagOffset, spec := pbv1.FindTagByName(manager.m.GetSchema().GetTagFamilies(), cond.GetName())
- if spec == nil {
- return nil, errors.New("fail to parse tag by name")
- }
- switch v := cond.GetValue().GetValue().(type) {
- case *modelv1.TagValue_Int:
- return &int64TagFilter{
- TagLocator: partition.TagLocator{
- FamilyOffset: familyOffset,
- TagOffset: tagOffset,
- },
- op: cond.GetOp(),
- val: v.Int.GetValue(),
- }, nil
- case *modelv1.TagValue_Str:
- return &strTagFilter{
- TagLocator: partition.TagLocator{
- FamilyOffset: familyOffset,
- TagOffset: tagOffset,
- },
- op: cond.GetOp(),
- val: v.Str.GetValue(),
- }, nil
- case *modelv1.TagValue_Id:
- return &idTagFilter{
- TagLocator: partition.TagLocator{
- FamilyOffset: familyOffset,
- TagOffset: tagOffset,
- },
- op: cond.GetOp(),
- val: v.Id.GetValue(),
- }, nil
- default:
- return nil, errUnsupportedConditionValueType
- }
-}
-
func (manager *topNProcessorManager) buildMapper(fieldName string, groupByNames ...string) (flow.UnaryFunc[any], error) {
fieldIdx := slices.IndexFunc(manager.m.GetSchema().GetFields(), func(spec *databasev1.FieldSpec) bool {
return spec.GetName() == fieldName
diff --git a/pkg/pb/v1/write.go b/pkg/pb/v1/write.go
index 03c470b..a8147c0 100644
--- a/pkg/pb/v1/write.go
+++ b/pkg/pb/v1/write.go
@@ -217,3 +217,22 @@ func DecodeFieldFlag(key []byte) (*databasev1.FieldSpec, time.Duration, error) {
CompressionMethod: databasev1.CompressionMethod(int32(b[0] & 0x0F)),
}, time.Duration(convert.BytesToInt64(b[1:])), nil
}
+
+// AttachSchema converts the positional tag families of a write request into
+// named TagFamily values. Each family and each tag is paired with the name
+// declared at the same position in the measure's schema.
+//
+// Families or tags beyond what the schema declares are dropped instead of
+// being indexed out of range, so a write carrying more values than the schema
+// describes cannot panic the caller. A nil measure yields an empty result.
+func AttachSchema(tffws []*modelv1.TagFamilyForWrite, measure *databasev1.Measure) []*modelv1.TagFamily {
+	specFamilies := measure.GetTagFamilies()
+	familyCount := len(tffws)
+	if len(specFamilies) < familyCount {
+		familyCount = len(specFamilies)
+	}
+	tfs := make([]*modelv1.TagFamily, 0, familyCount)
+	for i := 0; i < familyCount; i++ {
+		familySpec := specFamilies[i]
+		values := tffws[i].GetTags()
+		tagSpecs := familySpec.GetTags()
+		// Only positions known to both the write and the schema can be named.
+		tagCount := len(values)
+		if len(tagSpecs) < tagCount {
+			tagCount = len(tagSpecs)
+		}
+		tags := make([]*modelv1.Tag, tagCount)
+		for j := 0; j < tagCount; j++ {
+			tags[j] = &modelv1.Tag{
+				Key:   tagSpecs[j].GetName(),
+				Value: values[j],
+			}
+		}
+		tfs = append(tfs, &modelv1.TagFamily{
+			Name: familySpec.GetName(),
+			Tags: tags,
+		})
+	}
+	return tfs
+}
diff --git a/pkg/query/logical/schema.go b/pkg/query/logical/schema.go
index 4512b9d..ac320a9 100644
--- a/pkg/query/logical/schema.go
+++ b/pkg/query/logical/schema.go
@@ -24,12 +24,27 @@ import (
"github.com/apache/skywalking-banyandb/banyand/tsdb"
)
+// IndexChecker allows checking the existence of index rules, either by the
+// tag they cover (IndexDefined) or by rule name (IndexRuleDefined).
+// The boolean result reports whether such a rule exists. The second result
+// carries the rule itself when one is found.
+type IndexChecker interface {
+	IndexDefined(tagName string) (bool, *databasev1.IndexRule)
+	IndexRuleDefined(ruleName string) (bool, *databasev1.IndexRule)
+}
+
+// emptyIndexChecker is an IndexChecker that knows no index rules: both
+// lookups always report false with a nil rule. BuildTagFilter therefore never
+// replaces a condition with DummyFilter on account of an index.
+type emptyIndexChecker struct{}
+
+var _ IndexChecker = emptyIndexChecker{}
+
+// IndexDefined always reports that no index rule covers the given tag.
+func (emptyIndexChecker) IndexDefined(_ string) (bool, *databasev1.IndexRule) {
+	return false, nil
+}
+
+// IndexRuleDefined always reports that no rule with the given name exists.
+func (emptyIndexChecker) IndexRuleDefined(_ string) (bool, *databasev1.IndexRule) {
+	return false, nil
+}
+
// Schema allows retrieving schemas in a convenient way.
type Schema interface {
+ IndexChecker
Scope() tsdb.Entry
EntityList() []string
- IndexDefined(tagName string) (bool, *databasev1.IndexRule)
- IndexRuleDefined(string) (bool, *databasev1.IndexRule)
CreateTagRef(tags ...[]*Tag) ([][]*TagRef, error)
CreateFieldRef(fields ...*Field) ([]*FieldRef, error)
ProjTags(refs ...[]*TagRef) Schema
diff --git a/pkg/query/logical/tag_filter.go b/pkg/query/logical/tag_filter.go
index cafd665..03b6979 100644
--- a/pkg/query/logical/tag_filter.go
+++ b/pkg/query/logical/tag_filter.go
@@ -35,8 +35,13 @@ type TagFilter interface {
Match(tagFamilies []*modelv1.TagFamily) (bool, error)
}
+// BuildSimpleTagFilter returns a TagFilter without any local-index, global index, sharding key support.
+func BuildSimpleTagFilter(criteria *modelv1.Criteria) (TagFilter, error) {
+ return BuildTagFilter(criteria, nil, emptyIndexChecker{}, false)
+}
+
// BuildTagFilter returns a TagFilter if predicates doesn't match any indices.
-func BuildTagFilter(criteria *modelv1.Criteria, entityDict map[string]int, schema Schema, hasGlobalIndex bool) (TagFilter, error) {
+func BuildTagFilter(criteria *modelv1.Criteria, entityDict map[string]int, indexChecker IndexChecker, hasGlobalIndex bool) (TagFilter, error) {
if criteria == nil {
return DummyFilter, nil
}
@@ -47,7 +52,7 @@ func BuildTagFilter(criteria *modelv1.Criteria, entityDict map[string]int, schem
if err != nil {
return nil, err
}
- if ok, _ := schema.IndexDefined(cond.Name); ok {
+ if ok, _ := indexChecker.IndexDefined(cond.Name); ok {
return DummyFilter, nil
}
if _, ok := entityDict[cond.Name]; ok {
@@ -56,11 +61,11 @@ func BuildTagFilter(criteria *modelv1.Criteria, entityDict map[string]int, schem
return parseFilter(cond, expr)
case *modelv1.Criteria_Le:
le := criteria.GetLe()
- left, err := BuildTagFilter(le.Left, entityDict, schema, hasGlobalIndex)
+ left, err := BuildTagFilter(le.Left, entityDict, indexChecker, hasGlobalIndex)
if err != nil {
return nil, err
}
- right, err := BuildTagFilter(le.Right, entityDict, schema, hasGlobalIndex)
+ right, err := BuildTagFilter(le.Right, entityDict, indexChecker, hasGlobalIndex)
if err != nil {
return nil, err
}