You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by lu...@apache.org on 2023/01/10 08:59:55 UTC

[skywalking-banyandb] 01/01: unify filter

This is an automated email from the ASF dual-hosted git repository.

lujiajing pushed a commit to branch unify-index-filters
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git

commit 691d567d28a2e41334c79ac3c5e03e1501d3c653
Author: Megrez Lu <lu...@gmail.com>
AuthorDate: Tue Jan 10 16:59:38 2023 +0800

    unify filter
---
 banyand/measure/measure_topn.go | 86 +++++------------------------------------
 pkg/pb/v1/write.go              | 19 +++++++++
 pkg/query/logical/schema.go     | 19 ++++++++-
 pkg/query/logical/tag_filter.go | 13 +++++--
 4 files changed, 55 insertions(+), 82 deletions(-)

diff --git a/banyand/measure/measure_topn.go b/banyand/measure/measure_topn.go
index 4d51099..4dd50f3 100644
--- a/banyand/measure/measure_topn.go
+++ b/banyand/measure/measure_topn.go
@@ -43,6 +43,7 @@ import (
 	"github.com/apache/skywalking-banyandb/pkg/logger"
 	"github.com/apache/skywalking-banyandb/pkg/partition"
 	pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
+	"github.com/apache/skywalking-banyandb/pkg/query/logical"
 )
 
 const (
@@ -56,8 +57,6 @@ var (
 	_ io.Closer = (*topNProcessorManager)(nil)
 	_ flow.Sink = (*topNStreamingProcessor)(nil)
 
-	errUnsupportedConditionValueType = errors.New("unsupported value type in the condition")
-
 	// TopNValueFieldSpec denotes the field specification of the topN calculated result.
 	TopNValueFieldSpec = &databasev1.FieldSpec{
 		Name:              "value",
@@ -393,88 +392,23 @@ func (manager *topNProcessorManager) buildFilter(criteria *modelv1.Criteria) (fl
 		}, nil
 	}
 
-	f, err := manager.buildFilterForCriteria(criteria)
+	f, err := logical.BuildSimpleTagFilter(criteria)
 	if err != nil {
 		return nil, err
 	}
 
 	return func(_ context.Context, dataPoint any) bool {
-		tfs := dataPoint.(*measurev1.DataPointValue).GetTagFamilies()
-		return f.predicate(tfs)
+		tffw := dataPoint.(*measurev1.DataPointValue).GetTagFamilies()
+		tfs := pbv1.AttachSchema(tffw, manager.m.schema)
+		ok, matchErr := f.Match(tfs)
+		if matchErr != nil {
+			manager.l.Err(matchErr).Msg("fail to match criteria")
+			return false
+		}
+		return ok
 	}, nil
 }
 
-func (manager *topNProcessorManager) buildFilterForCriteria(criteria *modelv1.Criteria) (conditionFilter, error) {
-	switch v := criteria.GetExp().(type) {
-	case *modelv1.Criteria_Condition:
-		return manager.buildFilterForCondition(v.Condition)
-	case *modelv1.Criteria_Le:
-		return manager.buildFilterForLogicalExpr(v.Le)
-	default:
-		return nil, errors.New("should not reach here")
-	}
-}
-
-// buildFilterForCondition builds a logical and composable filter for a logical expression which have underlying conditions,
-// or nested logical expressions as its children.
-func (manager *topNProcessorManager) buildFilterForLogicalExpr(logicalExpr *modelv1.LogicalExpression) (conditionFilter, error) {
-	left, lErr := manager.buildFilterForCriteria(logicalExpr.Left)
-	if lErr != nil {
-		return nil, lErr
-	}
-	right, rErr := manager.buildFilterForCriteria(logicalExpr.Right)
-	if rErr != nil {
-		return nil, rErr
-	}
-	return composeWithOp(left, right, logicalExpr.Op), nil
-}
-
-func composeWithOp(left, right conditionFilter, op modelv1.LogicalExpression_LogicalOp) conditionFilter {
-	if op == modelv1.LogicalExpression_LOGICAL_OP_AND {
-		return &andFilter{left, right}
-	}
-	return &orFilter{left, right}
-}
-
-// buildFilterForCondition builds a single, composable filter for a single condition.
-func (manager *topNProcessorManager) buildFilterForCondition(cond *modelv1.Condition) (conditionFilter, error) {
-	familyOffset, tagOffset, spec := pbv1.FindTagByName(manager.m.GetSchema().GetTagFamilies(), cond.GetName())
-	if spec == nil {
-		return nil, errors.New("fail to parse tag by name")
-	}
-	switch v := cond.GetValue().GetValue().(type) {
-	case *modelv1.TagValue_Int:
-		return &int64TagFilter{
-			TagLocator: partition.TagLocator{
-				FamilyOffset: familyOffset,
-				TagOffset:    tagOffset,
-			},
-			op:  cond.GetOp(),
-			val: v.Int.GetValue(),
-		}, nil
-	case *modelv1.TagValue_Str:
-		return &strTagFilter{
-			TagLocator: partition.TagLocator{
-				FamilyOffset: familyOffset,
-				TagOffset:    tagOffset,
-			},
-			op:  cond.GetOp(),
-			val: v.Str.GetValue(),
-		}, nil
-	case *modelv1.TagValue_Id:
-		return &idTagFilter{
-			TagLocator: partition.TagLocator{
-				FamilyOffset: familyOffset,
-				TagOffset:    tagOffset,
-			},
-			op:  cond.GetOp(),
-			val: v.Id.GetValue(),
-		}, nil
-	default:
-		return nil, errUnsupportedConditionValueType
-	}
-}
-
 func (manager *topNProcessorManager) buildMapper(fieldName string, groupByNames ...string) (flow.UnaryFunc[any], error) {
 	fieldIdx := slices.IndexFunc(manager.m.GetSchema().GetFields(), func(spec *databasev1.FieldSpec) bool {
 		return spec.GetName() == fieldName
diff --git a/pkg/pb/v1/write.go b/pkg/pb/v1/write.go
index 03c470b..a8147c0 100644
--- a/pkg/pb/v1/write.go
+++ b/pkg/pb/v1/write.go
@@ -217,3 +217,22 @@ func DecodeFieldFlag(key []byte) (*databasev1.FieldSpec, time.Duration, error) {
 		CompressionMethod: databasev1.CompressionMethod(int32(b[0] & 0x0F)),
 	}, time.Duration(convert.BytesToInt64(b[1:])), nil
 }
+
+func AttachSchema(tffws []*modelv1.TagFamilyForWrite, measure *databasev1.Measure) []*modelv1.TagFamily {
+	tfs := make([]*modelv1.TagFamily, len(tffws))
+	for tagFamilyIdx := 0; tagFamilyIdx < len(tffws); tagFamilyIdx++ {
+		tffw := tffws[tagFamilyIdx]
+		tags := make([]*modelv1.Tag, len(tffw.GetTags()))
+		for tagIndex := 0; tagIndex < len(tffw.GetTags()); tagIndex++ {
+			tags[tagIndex] = &modelv1.Tag{
+				Key:   measure.TagFamilies[tagFamilyIdx].Tags[tagIndex].Name,
+				Value: tffw.GetTags()[tagIndex],
+			}
+		}
+		tfs[tagFamilyIdx] = &modelv1.TagFamily{
+			Name: measure.TagFamilies[tagFamilyIdx].Name,
+			Tags: tags,
+		}
+	}
+	return tfs
+}
diff --git a/pkg/query/logical/schema.go b/pkg/query/logical/schema.go
index 4512b9d..ac320a9 100644
--- a/pkg/query/logical/schema.go
+++ b/pkg/query/logical/schema.go
@@ -24,12 +24,27 @@ import (
 	"github.com/apache/skywalking-banyandb/banyand/tsdb"
 )
 
+// IndexChecker allows checking the existence of a specific index rule.
+type IndexChecker interface {
+	IndexDefined(tagName string) (bool, *databasev1.IndexRule)
+	IndexRuleDefined(ruleName string) (bool, *databasev1.IndexRule)
+}
+
+type emptyIndexChecker struct{}
+
+func (emptyIndexChecker) IndexDefined(tagName string) (bool, *databasev1.IndexRule) {
+	return false, nil
+}
+
+func (emptyIndexChecker) IndexRuleDefined(ruleName string) (bool, *databasev1.IndexRule) {
+	return false, nil
+}
+
 // Schema allows retrieving schemas in a convenient way.
 type Schema interface {
+	IndexChecker
 	Scope() tsdb.Entry
 	EntityList() []string
-	IndexDefined(tagName string) (bool, *databasev1.IndexRule)
-	IndexRuleDefined(string) (bool, *databasev1.IndexRule)
 	CreateTagRef(tags ...[]*Tag) ([][]*TagRef, error)
 	CreateFieldRef(fields ...*Field) ([]*FieldRef, error)
 	ProjTags(refs ...[]*TagRef) Schema
diff --git a/pkg/query/logical/tag_filter.go b/pkg/query/logical/tag_filter.go
index cafd665..03b6979 100644
--- a/pkg/query/logical/tag_filter.go
+++ b/pkg/query/logical/tag_filter.go
@@ -35,8 +35,13 @@ type TagFilter interface {
 	Match(tagFamilies []*modelv1.TagFamily) (bool, error)
 }
 
+// BuildSimpleTagFilter returns a TagFilter without any local index, global index, or sharding key support.
+func BuildSimpleTagFilter(criteria *modelv1.Criteria) (TagFilter, error) {
+	return BuildTagFilter(criteria, nil, emptyIndexChecker{}, false)
+}
+
 // BuildTagFilter returns a TagFilter if predicates doesn't match any indices.
-func BuildTagFilter(criteria *modelv1.Criteria, entityDict map[string]int, schema Schema, hasGlobalIndex bool) (TagFilter, error) {
+func BuildTagFilter(criteria *modelv1.Criteria, entityDict map[string]int, indexChecker IndexChecker, hasGlobalIndex bool) (TagFilter, error) {
 	if criteria == nil {
 		return DummyFilter, nil
 	}
@@ -47,7 +52,7 @@ func BuildTagFilter(criteria *modelv1.Criteria, entityDict map[string]int, schem
 		if err != nil {
 			return nil, err
 		}
-		if ok, _ := schema.IndexDefined(cond.Name); ok {
+		if ok, _ := indexChecker.IndexDefined(cond.Name); ok {
 			return DummyFilter, nil
 		}
 		if _, ok := entityDict[cond.Name]; ok {
@@ -56,11 +61,11 @@ func BuildTagFilter(criteria *modelv1.Criteria, entityDict map[string]int, schem
 		return parseFilter(cond, expr)
 	case *modelv1.Criteria_Le:
 		le := criteria.GetLe()
-		left, err := BuildTagFilter(le.Left, entityDict, schema, hasGlobalIndex)
+		left, err := BuildTagFilter(le.Left, entityDict, indexChecker, hasGlobalIndex)
 		if err != nil {
 			return nil, err
 		}
-		right, err := BuildTagFilter(le.Right, entityDict, schema, hasGlobalIndex)
+		right, err := BuildTagFilter(le.Right, entityDict, indexChecker, hasGlobalIndex)
 		if err != nil {
 			return nil, err
 		}