You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by ha...@apache.org on 2023/01/22 12:46:17 UTC
[skywalking-banyandb] branch main updated: Unify TagFilter for Streaming scenario (#239)
This is an automated email from the ASF dual-hosted git repository.
hanahmily pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
The following commit(s) were added to refs/heads/main by this push:
new c25160d0 Unify TagFilter for Streaming scenario (#239)
c25160d0 is described below
commit c25160d0321d6596fb84891628beba4470cbab3d
Author: Jiajing LU <lu...@gmail.com>
AuthorDate: Sun Jan 22 20:46:11 2023 +0800
Unify TagFilter for Streaming scenario (#239)
* unify filter
* move processor manager start to Run stage
---
banyand/measure/measure.go | 48 ++-
banyand/measure/measure_topn.go | 391 +++++++--------------
banyand/measure/measure_write.go | 30 +-
banyand/measure/service.go | 31 ++
banyand/metadata/schema/measure.go | 2 +-
pkg/query/logical/measure/measure_analyzer.go | 8 +-
pkg/query/logical/measure/schema.go | 12 +-
pkg/query/logical/schema.go | 71 +++-
pkg/query/logical/stream/schema.go | 11 +-
pkg/query/logical/stream/stream_analyzer.go | 9 +-
pkg/query/logical/stream/stream_plan_tag_filter.go | 5 +-
pkg/query/logical/tag_filter.go | 97 +++--
12 files changed, 359 insertions(+), 356 deletions(-)
diff --git a/banyand/measure/measure.go b/banyand/measure/measure.go
index 46485256..1e1e7e2f 100644
--- a/banyand/measure/measure.go
+++ b/banyand/measure/measure.go
@@ -26,11 +26,14 @@ import (
commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
+ "github.com/apache/skywalking-banyandb/banyand/metadata"
+ "github.com/apache/skywalking-banyandb/banyand/queue"
"github.com/apache/skywalking-banyandb/banyand/tsdb"
"github.com/apache/skywalking-banyandb/banyand/tsdb/index"
"github.com/apache/skywalking-banyandb/pkg/logger"
"github.com/apache/skywalking-banyandb/pkg/partition"
pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
+ "github.com/apache/skywalking-banyandb/pkg/query/logical"
"github.com/apache/skywalking-banyandb/pkg/timestamp"
)
@@ -50,11 +53,32 @@ type measure struct {
group string
entityLocator partition.EntityLocator
indexRules []*databasev1.IndexRule
+ topNAggregations []*databasev1.TopNAggregation
maxObservedModRevision int64
interval time.Duration
shardNum uint32
}
+func (s *measure) startSteamingManager(pipeline queue.Queue, repo metadata.Repo) error {
+ if len(s.topNAggregations) == 0 {
+ return nil
+ }
+ tagMapSpec := logical.TagSpecMap{}
+ tagMapSpec.RegisterTagFamilies(s.schema.GetTagFamilies())
+
+ s.processorManager = &topNProcessorManager{
+ l: s.l,
+ pipeline: pipeline,
+ repo: repo,
+ m: s,
+ s: tagMapSpec,
+ topNSchemas: s.topNAggregations,
+ processorMap: make(map[*commonv1.Metadata][]*topNStreamingProcessor),
+ }
+
+ return s.processorManager.start()
+}
+
func (s *measure) GetSchema() *databasev1.Measure {
return s.schema
}
@@ -76,6 +100,9 @@ func (s *measure) EntityLocator() partition.EntityLocator {
}
func (s *measure) Close() error {
+ if s.processorManager == nil {
+ return nil
+ }
return s.processorManager.Close()
}
@@ -97,10 +124,11 @@ type measureSpec struct {
func openMeasure(shardNum uint32, db tsdb.Supplier, spec measureSpec, l *logger.Logger) (*measure, error) {
m := &measure{
- shardNum: shardNum,
- schema: spec.schema,
- indexRules: spec.indexRules,
- l: l,
+ shardNum: shardNum,
+ schema: spec.schema,
+ indexRules: spec.indexRules,
+ topNAggregations: spec.topNAggregations,
+ l: l,
}
if err := m.parseSpec(); err != nil {
return nil, err
@@ -115,17 +143,5 @@ func openMeasure(shardNum uint32, db tsdb.Supplier, spec measureSpec, l *logger.
IndexRules: spec.indexRules,
})
- m.processorManager = &topNProcessorManager{
- l: l,
- m: m,
- topNSchemas: spec.topNAggregations,
- processorMap: make(map[*commonv1.Metadata][]*topNStreamingProcessor),
- }
-
- err := m.processorManager.start()
- if err != nil {
- return nil, err
- }
-
return m, nil
}
diff --git a/banyand/measure/measure_topn.go b/banyand/measure/measure_topn.go
index 4d510991..ac207ae0 100644
--- a/banyand/measure/measure_topn.go
+++ b/banyand/measure/measure_topn.go
@@ -26,23 +26,31 @@ import (
"sync"
"time"
+ "github.com/google/go-cmp/cmp"
"github.com/pkg/errors"
"go.uber.org/multierr"
"golang.org/x/exp/slices"
- "google.golang.org/protobuf/proto"
+ "google.golang.org/protobuf/testing/protocmp"
+ "google.golang.org/protobuf/types/known/timestamppb"
"github.com/apache/skywalking-banyandb/api/common"
+ apiData "github.com/apache/skywalking-banyandb/api/data"
commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
measurev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/measure/v1"
modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
+ "github.com/apache/skywalking-banyandb/banyand/metadata"
+ "github.com/apache/skywalking-banyandb/banyand/metadata/schema"
+ "github.com/apache/skywalking-banyandb/banyand/queue"
"github.com/apache/skywalking-banyandb/banyand/tsdb"
+ "github.com/apache/skywalking-banyandb/pkg/bus"
"github.com/apache/skywalking-banyandb/pkg/flow"
"github.com/apache/skywalking-banyandb/pkg/flow/streaming"
"github.com/apache/skywalking-banyandb/pkg/flow/streaming/sources"
"github.com/apache/skywalking-banyandb/pkg/logger"
"github.com/apache/skywalking-banyandb/pkg/partition"
pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
+ "github.com/apache/skywalking-banyandb/pkg/query/logical"
)
const (
@@ -56,8 +64,6 @@ var (
_ io.Closer = (*topNProcessorManager)(nil)
_ flow.Sink = (*topNStreamingProcessor)(nil)
- errUnsupportedConditionValueType = errors.New("unsupported value type in the condition")
-
// TopNValueFieldSpec denotes the field specification of the topN calculated result.
TopNValueFieldSpec = &databasev1.FieldSpec{
Name: "value",
@@ -68,17 +74,17 @@ var (
)
type topNStreamingProcessor struct {
- databaseSupplier tsdb.Supplier
- streamingFlow flow.Flow
- l *logger.Logger
- topNSchema *databasev1.TopNAggregation
- src chan interface{}
- in chan flow.StreamRecord
- errCh <-chan error
- stopCh chan struct{}
+ m *measure
+ streamingFlow flow.Flow
+ l *logger.Logger
+ pipeline queue.Queue
+ topNSchema *databasev1.TopNAggregation
+ src chan interface{}
+ in chan flow.StreamRecord
+ errCh <-chan error
+ stopCh chan struct{}
flow.ComponentState
interval time.Duration
- shardNum uint32
sortDirection modelv1.Sort
}
@@ -100,7 +106,8 @@ func (t *topNStreamingProcessor) run(ctx context.Context) {
if !ok {
return
}
- if err := t.writeStreamRecord(ctx, record); err != nil {
+ // nolint: contextcheck
+ if err := t.writeStreamRecord(record); err != nil {
t.l.Err(err).Msg("fail to write stream record")
}
case <-ctx.Done():
@@ -126,7 +133,7 @@ func (t *topNStreamingProcessor) Close() error {
return err
}
-func (t *topNStreamingProcessor) writeStreamRecord(ctx context.Context, record flow.StreamRecord) error {
+func (t *topNStreamingProcessor) writeStreamRecord(record flow.StreamRecord) error {
tuples, ok := record.Data().([]*streaming.Tuple2)
if !ok {
return errors.New("invalid data type")
@@ -143,12 +150,12 @@ func (t *topNStreamingProcessor) writeStreamRecord(ctx context.Context, record f
for rankNum, tuple := range tuples {
fieldValue := tuple.V1.(int64)
data := tuple.V2.(flow.StreamRecord).Data().(flow.Data)
- err = multierr.Append(err, t.writeData(ctx, eventTime, timeBucket, fieldValue, data, rankNum))
+ err = multierr.Append(err, t.writeData(eventTime, timeBucket, fieldValue, data, rankNum))
}
return err
}
-func (t *topNStreamingProcessor) writeData(ctx context.Context, eventTime time.Time, timeBucket string, fieldValue int64, data flow.Data, rankNum int) error {
+func (t *topNStreamingProcessor) writeData(eventTime time.Time, timeBucket string, fieldValue int64, data flow.Data, rankNum int) error {
var tagValues []*modelv1.TagValue
if len(t.topNSchema.GetGroupByTagNames()) > 0 {
var ok bool
@@ -160,86 +167,59 @@ func (t *topNStreamingProcessor) writeData(ctx context.Context, eventTime time.T
if err != nil {
return err
}
- shard, err := t.databaseSupplier.SupplyTSDB().Shard(shardID)
- if err != nil {
- return err
- }
- series, err := shard.Series().Get(tsdb.HashEntity(entity), entityValues)
- if err != nil {
- return err
- }
- ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
- defer cancel()
- span, err := series.Create(ctx, eventTime)
- if err != nil {
- if span != nil {
- _ = span.Close()
- }
- return err
- }
+
// measureID is consist of three parts,
// 1. groupValues
// 2. rankNumber
// 3. timeBucket
measureID := data[0].(string) + "_" + strconv.Itoa(rankNum) + "_" + timeBucket
- writeFn := func() (tsdb.Writer, error) {
- builder := span.WriterBuilder().Time(eventTime)
- virtualTagFamily := &modelv1.TagFamilyForWrite{
- Tags: []*modelv1.TagValue{
- // MeasureID
- {
- Value: &modelv1.TagValue_Id{
- Id: &modelv1.ID{
- Value: measureID,
+ iwr := &measurev1.InternalWriteRequest{
+ Request: &measurev1.WriteRequest{
+ Metadata: t.topNSchema.GetMetadata(),
+ DataPoint: &measurev1.DataPointValue{
+ Timestamp: timestamppb.New(eventTime),
+ TagFamilies: []*modelv1.TagFamilyForWrite{
+ {
+ Tags: []*modelv1.TagValue{
+ // MeasureID
+ {
+ Value: &modelv1.TagValue_Id{
+ Id: &modelv1.ID{
+ Value: measureID,
+ },
+ },
+ },
+ // GroupValues for merge in post processor
+ {
+ Value: &modelv1.TagValue_Str{
+ Str: &modelv1.Str{
+ Value: data[0].(string),
+ },
+ },
+ },
},
},
},
- // GroupValues for merge in post processor
- {
- Value: &modelv1.TagValue_Str{
- Str: &modelv1.Str{
- Value: data[0].(string),
+ Fields: []*modelv1.FieldValue{
+ {
+ Value: &modelv1.FieldValue_Int{
+ Int: &modelv1.Int{
+ Value: fieldValue,
+ },
},
},
},
},
- }
- payload, errMarshal := proto.Marshal(virtualTagFamily)
- if errMarshal != nil {
- return nil, errMarshal
- }
- builder.Family(familyIdentity(TopNTagFamily, pbv1.TagFlag), payload)
- virtualFieldValue := &modelv1.FieldValue{
- Value: &modelv1.FieldValue_Int{
- Int: &modelv1.Int{
- Value: fieldValue,
- },
- },
- }
- fieldData := encodeFieldValue(virtualFieldValue)
- builder.Family(familyIdentity(TopNValueFieldSpec.GetName(), pbv1.EncoderFieldFlag(TopNValueFieldSpec, t.interval)), fieldData)
- writer, errWrite := builder.Build()
- if errWrite != nil {
- return nil, errWrite
- }
- _, errWrite = writer.Write()
- if e := t.l.Debug(); e.Enabled() {
- e.Time("ts", eventTime).
- Int("ts_nano", eventTime.Nanosecond()).
- Uint64("series_id", uint64(series.ID())).
- Stringer("series", series).
- Uint64("item_id", uint64(writer.ItemID().ID)).
- Int("shard_id", int(shardID)).
- Msg("write measure")
- }
- return writer, errWrite
+ },
+ ShardId: uint32(shardID),
+ SeriesHash: tsdb.HashEntity(entity),
}
- _, err = writeFn()
- if err != nil {
- _ = span.Close()
- return err
+ if t.l.Debug().Enabled() {
+ iwr.EntityValues = entityValues.Encode()
}
- return span.Close()
+ message := bus.NewMessage(bus.MessageID(time.Now().UnixNano()), iwr)
+ _, errWritePub := t.pipeline.Publish(apiData.TopicMeasureWrite, message)
+ return errWritePub
}
func (t *topNStreamingProcessor) downSampleTimeBucket(eventTimeMillis int64) time.Time {
@@ -268,7 +248,7 @@ func (t *topNStreamingProcessor) locate(tagValues []*modelv1.TagValue, rankNum i
if err != nil {
return nil, nil, 0, err
}
- id, err := partition.ShardID(e.Marshal(), t.shardNum)
+ id, err := partition.ShardID(e.Marshal(), t.m.shardNum)
if err != nil {
return nil, nil, 0, err
}
@@ -306,7 +286,10 @@ func (t *topNStreamingProcessor) handleError() {
// topNProcessorManager manages multiple topNStreamingProcessor(s) belonging to a single measure.
type topNProcessorManager struct {
l *logger.Logger
+ pipeline queue.Queue
+ repo metadata.Repo
m *measure
+ s logical.TagSpecRegistry
processorMap map[*commonv1.Metadata][]*topNStreamingProcessor
topNSchemas []*databasev1.TopNAggregation
sync.RWMutex
@@ -336,9 +319,54 @@ func (manager *topNProcessorManager) onMeasureWrite(request *measurev1.WriteRequ
}()
}
+func (manager *topNProcessorManager) createOrUpdateTopNMeasure(topNSchema *databasev1.TopNAggregation) error {
+ m, err := manager.repo.MeasureRegistry().GetMeasure(context.TODO(), topNSchema.GetMetadata())
+ if err != nil && !errors.Is(err, schema.ErrGRPCResourceNotFound) {
+ return err
+ }
+
+ // create a new "derived" measure for TopN result
+ newTopNMeasure := &databasev1.Measure{
+ Metadata: topNSchema.GetMetadata(),
+ Interval: manager.m.schema.GetInterval(),
+ TagFamilies: []*databasev1.TagFamilySpec{
+ {
+ Name: TopNTagFamily,
+ Tags: []*databasev1.TagSpec{
+ {
+ Name: "measure_id",
+ Type: databasev1.TagType_TAG_TYPE_ID,
+ },
+ {
+ Name: "group_values",
+ Type: databasev1.TagType_TAG_TYPE_STRING,
+ },
+ },
+ },
+ },
+ Fields: []*databasev1.FieldSpec{TopNValueFieldSpec},
+ }
+ if m == nil {
+ return manager.repo.MeasureRegistry().CreateMeasure(context.Background(), newTopNMeasure)
+ }
+ // compare with the old one
+ if cmp.Diff(newTopNMeasure, m,
+ protocmp.IgnoreUnknown(),
+ protocmp.IgnoreFields(&databasev1.Measure{}, "updated_at"),
+ protocmp.IgnoreFields(&commonv1.Metadata{}, "id", "create_revision", "mod_revision"),
+ protocmp.Transform()) == "" {
+ return nil
+ }
+ // update
+ return manager.repo.MeasureRegistry().UpdateMeasure(context.Background(), newTopNMeasure)
+}
+
func (manager *topNProcessorManager) start() error {
interval := manager.m.interval
for _, topNSchema := range manager.topNSchemas {
+ if createErr := manager.createOrUpdateTopNMeasure(topNSchema); createErr != nil {
+ return createErr
+ }
sortDirections := make([]modelv1.Sort, 0, 2)
if topNSchema.GetFieldValueSort() == modelv1.Sort_SORT_UNSPECIFIED {
sortDirections = append(sortDirections, modelv1.Sort_SORT_ASC, modelv1.Sort_SORT_DESC)
@@ -363,18 +391,17 @@ func (manager *topNProcessorManager) start() error {
return innerErr
}
streamingFlow = streamingFlow.Map(mapper)
-
processor := &topNStreamingProcessor{
- l: manager.l,
- shardNum: manager.m.shardNum,
- interval: interval,
- topNSchema: topNSchema,
- sortDirection: sortDirection,
- databaseSupplier: manager.m.databaseSupplier,
- src: srcCh,
- in: make(chan flow.StreamRecord),
- stopCh: make(chan struct{}),
- streamingFlow: streamingFlow,
+ m: manager.m,
+ l: manager.l,
+ interval: interval,
+ topNSchema: topNSchema,
+ sortDirection: sortDirection,
+ src: srcCh,
+ in: make(chan flow.StreamRecord),
+ stopCh: make(chan struct{}),
+ streamingFlow: streamingFlow,
+ pipeline: manager.pipeline,
}
processorList[i] = processor.start()
}
@@ -393,88 +420,22 @@ func (manager *topNProcessorManager) buildFilter(criteria *modelv1.Criteria) (fl
}, nil
}
- f, err := manager.buildFilterForCriteria(criteria)
+ f, err := logical.BuildSimpleTagFilter(criteria)
if err != nil {
return nil, err
}
return func(_ context.Context, dataPoint any) bool {
- tfs := dataPoint.(*measurev1.DataPointValue).GetTagFamilies()
- return f.predicate(tfs)
+ tffws := dataPoint.(*measurev1.DataPointValue).GetTagFamilies()
+ ok, matchErr := f.Match(logical.TagFamiliesForWrite(tffws), manager.s)
+ if matchErr != nil {
+ manager.l.Err(matchErr).Msg("fail to match criteria")
+ return false
+ }
+ return ok
}, nil
}
-func (manager *topNProcessorManager) buildFilterForCriteria(criteria *modelv1.Criteria) (conditionFilter, error) {
- switch v := criteria.GetExp().(type) {
- case *modelv1.Criteria_Condition:
- return manager.buildFilterForCondition(v.Condition)
- case *modelv1.Criteria_Le:
- return manager.buildFilterForLogicalExpr(v.Le)
- default:
- return nil, errors.New("should not reach here")
- }
-}
-
-// buildFilterForCondition builds a logical and composable filter for a logical expression which have underlying conditions,
-// or nested logical expressions as its children.
-func (manager *topNProcessorManager) buildFilterForLogicalExpr(logicalExpr *modelv1.LogicalExpression) (conditionFilter, error) {
- left, lErr := manager.buildFilterForCriteria(logicalExpr.Left)
- if lErr != nil {
- return nil, lErr
- }
- right, rErr := manager.buildFilterForCriteria(logicalExpr.Right)
- if rErr != nil {
- return nil, rErr
- }
- return composeWithOp(left, right, logicalExpr.Op), nil
-}
-
-func composeWithOp(left, right conditionFilter, op modelv1.LogicalExpression_LogicalOp) conditionFilter {
- if op == modelv1.LogicalExpression_LOGICAL_OP_AND {
- return &andFilter{left, right}
- }
- return &orFilter{left, right}
-}
-
-// buildFilterForCondition builds a single, composable filter for a single condition.
-func (manager *topNProcessorManager) buildFilterForCondition(cond *modelv1.Condition) (conditionFilter, error) {
- familyOffset, tagOffset, spec := pbv1.FindTagByName(manager.m.GetSchema().GetTagFamilies(), cond.GetName())
- if spec == nil {
- return nil, errors.New("fail to parse tag by name")
- }
- switch v := cond.GetValue().GetValue().(type) {
- case *modelv1.TagValue_Int:
- return &int64TagFilter{
- TagLocator: partition.TagLocator{
- FamilyOffset: familyOffset,
- TagOffset: tagOffset,
- },
- op: cond.GetOp(),
- val: v.Int.GetValue(),
- }, nil
- case *modelv1.TagValue_Str:
- return &strTagFilter{
- TagLocator: partition.TagLocator{
- FamilyOffset: familyOffset,
- TagOffset: tagOffset,
- },
- op: cond.GetOp(),
- val: v.Str.GetValue(),
- }, nil
- case *modelv1.TagValue_Id:
- return &idTagFilter{
- TagLocator: partition.TagLocator{
- FamilyOffset: familyOffset,
- TagOffset: tagOffset,
- },
- op: cond.GetOp(),
- val: v.Id.GetValue(),
- }, nil
- default:
- return nil, errUnsupportedConditionValueType
- }
-}
-
func (manager *topNProcessorManager) buildMapper(fieldName string, groupByNames ...string) (flow.UnaryFunc[any], error) {
fieldIdx := slices.IndexFunc(manager.m.GetSchema().GetFields(), func(spec *databasev1.FieldSpec) bool {
return spec.GetName() == fieldName
@@ -485,6 +446,13 @@ func (manager *topNProcessorManager) buildMapper(fieldName string, groupByNames
if len(groupByNames) == 0 {
return func(_ context.Context, request any) any {
dataPoint := request.(*measurev1.DataPointValue)
+ if len(dataPoint.GetFields()) <= fieldIdx {
+ manager.l.Warn().Interface("point", dataPoint).
+ Str("fieldName", fieldName).
+ Int("len", len(dataPoint.GetFields())).
+ Int("fieldIdx", fieldIdx).
+ Msg("out of range")
+ }
return flow.Data{
// save string representation of group values as the key, i.e. v1
"",
@@ -518,93 +486,6 @@ func (manager *topNProcessorManager) buildMapper(fieldName string, groupByNames
}, nil
}
-var (
- _ conditionFilter = (*strTagFilter)(nil)
- _ conditionFilter = (*int64TagFilter)(nil)
-)
-
-type conditionFilter interface {
- predicate(tagFamilies []*modelv1.TagFamilyForWrite) bool
-}
-
-type strTagFilter struct {
- val string
- partition.TagLocator
- op modelv1.Condition_BinaryOp
-}
-
-func (f *strTagFilter) predicate(tagFamilies []*modelv1.TagFamilyForWrite) bool {
- strValue := tagFamilies[f.FamilyOffset].GetTags()[f.TagOffset].GetStr().GetValue()
- switch f.op {
- case modelv1.Condition_BINARY_OP_EQ:
- return strValue == f.val
- case modelv1.Condition_BINARY_OP_NE:
- return strValue != f.val
- default:
- return false
- }
-}
-
-type idTagFilter struct {
- val string
- partition.TagLocator
- op modelv1.Condition_BinaryOp
-}
-
-func (f *idTagFilter) predicate(tagFamilies []*modelv1.TagFamilyForWrite) bool {
- val := tagFamilies[f.FamilyOffset].GetTags()[f.TagOffset].GetId().GetValue()
- switch f.op {
- case modelv1.Condition_BINARY_OP_EQ:
- return val == f.val
- case modelv1.Condition_BINARY_OP_NE:
- return val != f.val
- default:
- return false
- }
-}
-
-type int64TagFilter struct {
- partition.TagLocator
- op modelv1.Condition_BinaryOp
- val int64
-}
-
-func (f *int64TagFilter) predicate(tagFamilies []*modelv1.TagFamilyForWrite) bool {
- val := tagFamilies[f.FamilyOffset].GetTags()[f.TagOffset].GetInt().GetValue()
- switch f.op {
- case modelv1.Condition_BINARY_OP_EQ:
- return val == f.val
- case modelv1.Condition_BINARY_OP_NE:
- return val != f.val
- case modelv1.Condition_BINARY_OP_GE:
- return val >= f.val
- case modelv1.Condition_BINARY_OP_GT:
- return val > f.val
- case modelv1.Condition_BINARY_OP_LE:
- return val <= f.val
- case modelv1.Condition_BINARY_OP_LT:
- return val < f.val
- default:
- return false
- }
-}
-
-type andFilter struct {
- l, r conditionFilter
-}
-
-func (f *andFilter) predicate(tagFamilies []*modelv1.TagFamilyForWrite) bool {
- return f.l.predicate(tagFamilies) && f.r.predicate(tagFamilies)
-}
-
-type orFilter struct {
- l, r conditionFilter
-}
-
-func (f *orFilter) predicate(tagFamilies []*modelv1.TagFamilyForWrite) bool {
- return f.l.predicate(tagFamilies) || f.r.predicate(tagFamilies)
-}
-
// groupTagsLocator can be used to locate tags within families.
type groupTagsLocator []partition.TagLocator
diff --git a/banyand/measure/measure_write.go b/banyand/measure/measure_write.go
index b91d5a0d..2c2eb233 100644
--- a/banyand/measure/measure_write.go
+++ b/banyand/measure/measure_write.go
@@ -25,7 +25,6 @@ import (
"github.com/pkg/errors"
"github.com/apache/skywalking-banyandb/api/common"
- commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
measurev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/measure/v1"
modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
"github.com/apache/skywalking-banyandb/banyand/tsdb"
@@ -39,17 +38,18 @@ import (
var errMalformedElement = errors.New("element is malformed")
-func (s *measure) write(md *commonv1.Metadata, shardID common.ShardID, entity []byte, entityValues tsdb.EntityValues, value *measurev1.DataPointValue) error {
+func (s *measure) write(shardID common.ShardID, entity []byte, entityValues tsdb.EntityValues,
+ value *measurev1.DataPointValue,
+) error {
t := value.GetTimestamp().AsTime().Local()
if err := timestamp.Check(t); err != nil {
return errors.WithMessage(err, "writing stream")
}
- sm := s.schema
fLen := len(value.GetTagFamilies())
if fLen < 1 {
return errors.Wrap(errMalformedElement, "no tag family")
}
- if fLen > len(sm.TagFamilies) {
+ if fLen > len(s.schema.GetTagFamilies()) {
return errors.Wrap(errMalformedElement, "tag family number is more than expected")
}
shard, err := s.databaseSupplier.SupplyTSDB().Shard(shardID)
@@ -72,18 +72,18 @@ func (s *measure) write(md *commonv1.Metadata, shardID common.ShardID, entity []
writeFn := func() (tsdb.Writer, error) {
builder := wp.WriterBuilder().Time(t)
for fi, family := range value.GetTagFamilies() {
- spec := sm.GetTagFamilies()[fi]
+ spec := s.schema.GetTagFamilies()[fi]
bb, errMarshal := pbv1.EncodeFamily(spec, family)
if errMarshal != nil {
return nil, errMarshal
}
builder.Family(familyIdentity(spec.GetName(), pbv1.TagFlag), bb)
}
- if len(value.GetFields()) > len(sm.GetFields()) {
+ if len(value.GetFields()) > len(s.schema.GetFields()) {
return nil, errors.Wrap(errMalformedElement, "fields number is more than expected")
}
for fi, fieldValue := range value.GetFields() {
- fieldSpec := sm.GetFields()[fi]
+ fieldSpec := s.schema.GetFields()[fi]
fType, isNull := pbv1.FieldValueTypeConv(fieldValue)
if isNull {
s.l.Warn().RawJSON("written", logger.Proto(value)).Msg("ignore null field")
@@ -97,14 +97,14 @@ func (s *measure) write(md *commonv1.Metadata, shardID common.ShardID, entity []
s.l.Warn().RawJSON("written", logger.Proto(value)).Msg("ignore unknown field")
continue
}
- builder.Family(familyIdentity(sm.GetFields()[fi].GetName(), pbv1.EncoderFieldFlag(fieldSpec, s.interval)), data)
+ builder.Family(familyIdentity(s.schema.GetFields()[fi].GetName(), pbv1.EncoderFieldFlag(fieldSpec, s.interval)), data)
}
writer, errWrite := builder.Build()
if errWrite != nil {
return nil, errWrite
}
_, errWrite = writer.Write()
- if e := s.l.Named(md.Group, md.Name).Debug(); e.Enabled() {
+ if e := s.l.Named(s.schema.GetMetadata().GetGroup(), s.schema.GetMetadata().GetName()).Debug(); e.Enabled() {
e.Time("ts", t).
Int("ts_nano", t.Nanosecond()).
RawJSON("data", logger.Proto(value)).
@@ -130,10 +130,12 @@ func (s *measure) write(md *commonv1.Metadata, shardID common.ShardID, entity []
BlockCloser: wp,
}
s.indexWriter.Write(m)
- s.processorManager.onMeasureWrite(&measurev1.WriteRequest{
- Metadata: s.GetMetadata(),
- DataPoint: value,
- })
+ if s.processorManager != nil {
+ s.processorManager.onMeasureWrite(&measurev1.WriteRequest{
+ Metadata: s.GetMetadata(),
+ DataPoint: value,
+ })
+ }
return err
}
@@ -161,7 +163,7 @@ func (w *writeCallback) Rev(message bus.Message) (resp bus.Message) {
w.l.Warn().Msg("cannot find measure definition")
return
}
- err := stm.write(writeEvent.GetRequest().GetMetadata(), common.ShardID(writeEvent.GetShardId()),
+ err := stm.write(common.ShardID(writeEvent.GetShardId()),
writeEvent.SeriesHash, tsdb.DecodeEntityValues(writeEvent.GetEntityValues()), writeEvent.GetRequest().GetDataPoint())
if err != nil {
w.l.Error().Err(err).RawJSON("written", logger.Proto(writeEvent)).Msg("fail to write entity")
diff --git a/banyand/measure/service.go b/banyand/measure/service.go
index a606c9fc..15d1bdea 100644
--- a/banyand/measure/service.go
+++ b/banyand/measure/service.go
@@ -145,6 +145,37 @@ func (s *service) Serve() run.StopNotify {
RegisterHandler(schema.KindGroup|schema.KindMeasure|schema.KindIndexRuleBinding|schema.KindIndexRule|schema.KindTopNAggregation,
&s.schemaRepo)
+ // start TopN manager after registering handlers
+ ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
+ groups, err := s.metadata.GroupRegistry().ListGroup(ctx)
+ cancel()
+
+ if err != nil {
+ s.l.Err(err).Msg("fail to list groups")
+ return s.stopCh
+ }
+
+ for _, g := range groups {
+ if g.Catalog != commonv1.Catalog_CATALOG_MEASURE {
+ continue
+ }
+ ctx, cancel = context.WithTimeout(context.Background(), 5*time.Second)
+ allMeasureSchemas, listErr := s.metadata.MeasureRegistry().
+ ListMeasure(ctx, schema.ListOpt{Group: g.GetMetadata().GetName()})
+ cancel()
+ if listErr != nil {
+ s.l.Err(listErr).Str("group", g.GetMetadata().GetName()).Msg("fail to list measures in the group")
+ continue
+ }
+ for _, measureSchema := range allMeasureSchemas {
+ if res, ok := s.schemaRepo.LoadResource(measureSchema.GetMetadata()); ok {
+ if startErr := res.(*measure).startSteamingManager(s.pipeline, s.metadata); startErr != nil {
+ s.l.Err(startErr).Str("measure", measureSchema.GetMetadata().GetName()).Msg("fail to start streaming manager")
+ }
+ }
+ }
+ }
+
return s.stopCh
}
diff --git a/banyand/metadata/schema/measure.go b/banyand/metadata/schema/measure.go
index 8895b8ba..b4276b71 100644
--- a/banyand/metadata/schema/measure.go
+++ b/banyand/metadata/schema/measure.go
@@ -92,7 +92,7 @@ func (e *etcdSchemaRegistry) CreateMeasure(ctx context.Context, measure *databas
for _, tfs := range measure.GetTagFamilies() {
for _, ts := range tfs.GetTags() {
if ts.Type == databasev1.TagType_TAG_TYPE_ID {
- for _, e := range measure.Entity.TagNames {
+ for _, e := range measure.GetEntity().GetTagNames() {
if ts.Name == e {
continue
}
diff --git a/pkg/query/logical/measure/measure_analyzer.go b/pkg/query/logical/measure/measure_analyzer.go
index a8f6150b..95368147 100644
--- a/pkg/query/logical/measure/measure_analyzer.go
+++ b/pkg/query/logical/measure/measure_analyzer.go
@@ -36,18 +36,14 @@ func BuildSchema(measureSchema measure.Measure) (logical.Schema, error) {
ms := &schema{
common: &logical.CommonSchema{
IndexRules: measureSchema.GetIndexRules(),
- TagMap: make(map[string]*logical.TagSpec),
+ TagSpecMap: make(map[string]*logical.TagSpec),
EntityList: md.GetEntity().GetTagNames(),
},
measure: md,
fieldMap: make(map[string]*logical.FieldSpec),
}
- for tagFamilyIdx, tagFamily := range md.GetTagFamilies() {
- for tagIdx, spec := range tagFamily.GetTags() {
- ms.registerTag(tagFamilyIdx, tagIdx, spec)
- }
- }
+ ms.common.RegisterTagFamilies(md.GetTagFamilies())
for fieldIdx, spec := range md.GetFields() {
ms.registerField(fieldIdx, spec)
diff --git a/pkg/query/logical/measure/schema.go b/pkg/query/logical/measure/schema.go
index 59656612..b389053f 100644
--- a/pkg/query/logical/measure/schema.go
+++ b/pkg/query/logical/measure/schema.go
@@ -27,6 +27,7 @@ import (
"github.com/apache/skywalking-banyandb/pkg/query/logical"
)
+// errFieldNotDefined indicated the field is not defined in the measure schema.
var errFieldNotDefined = errors.New("field is not defined")
type schema struct {
@@ -43,6 +44,10 @@ func (m *schema) EntityList() []string {
return m.common.EntityList
}
+func (m *schema) FindTagSpecByName(name string) *logical.TagSpec {
+ return m.common.FindTagSpecByName(name)
+}
+
func (m *schema) IndexDefined(tagName string) (bool, *databasev1.IndexRule) {
return m.common.IndexDefined(tagName)
}
@@ -101,16 +106,11 @@ func (m *schema) ProjFields(fieldRefs ...*logical.FieldRef) logical.Schema {
func (m *schema) Equal(s2 logical.Schema) bool {
if other, ok := s2.(*schema); ok {
// TODO: add more equality checks
- return cmp.Equal(other.common.TagMap, m.common.TagMap)
+ return cmp.Equal(other.common.TagSpecMap, m.common.TagSpecMap)
}
return false
}
-// registerTag registers the tag spec with given tagFamilyIdx and tagIdx.
-func (m *schema) registerTag(tagFamilyIdx, tagIdx int, spec *databasev1.TagSpec) {
- m.common.RegisterTag(tagFamilyIdx, tagIdx, spec)
-}
-
// registerField registers the field spec with given index.
func (m *schema) registerField(fieldIdx int, spec *databasev1.FieldSpec) {
m.fieldMap[spec.GetName()] = &logical.FieldSpec{
diff --git a/pkg/query/logical/schema.go b/pkg/query/logical/schema.go
index 4512b9de..e0a14c07 100644
--- a/pkg/query/logical/schema.go
+++ b/pkg/query/logical/schema.go
@@ -24,12 +24,33 @@ import (
"github.com/apache/skywalking-banyandb/banyand/tsdb"
)
+// IndexChecker allows checking the existence of a specific index rule.
+type IndexChecker interface {
+ IndexDefined(tagName string) (bool, *databasev1.IndexRule)
+ IndexRuleDefined(ruleName string) (bool, *databasev1.IndexRule)
+}
+
+type emptyIndexChecker struct{}
+
+func (emptyIndexChecker) IndexDefined(_ string) (bool, *databasev1.IndexRule) {
+ return false, nil
+}
+
+func (emptyIndexChecker) IndexRuleDefined(_ string) (bool, *databasev1.IndexRule) {
+ return false, nil
+}
+
+// TagSpecRegistry enables to find TagSpec by its name.
+type TagSpecRegistry interface {
+ FindTagSpecByName(string) *TagSpec
+}
+
// Schema allows retrieving schemas in a convenient way.
type Schema interface {
+ TagSpecRegistry
+ IndexChecker
Scope() tsdb.Entry
EntityList() []string
- IndexDefined(tagName string) (bool, *databasev1.IndexRule)
- IndexRuleDefined(string) (bool, *databasev1.IndexRule)
CreateTagRef(tags ...[]*Tag) ([][]*TagRef, error)
CreateFieldRef(fields ...*Field) ([]*FieldRef, error)
ProjTags(refs ...[]*TagRef) Schema
@@ -50,11 +71,40 @@ func (fs *TagSpec) Equal(other *TagSpec) bool {
fs.Spec.GetType() == other.Spec.GetType() && fs.Spec.GetName() == other.Spec.GetName()
}
+// TagSpecMap is a map of TapSpec implements TagSpecRegistry.
+type TagSpecMap map[string]*TagSpec
+
+// FindTagSpecByName finds TagSpec by its name in the registry.
+func (tagSpecMap TagSpecMap) FindTagSpecByName(name string) *TagSpec {
+ if spec, ok := tagSpecMap[name]; ok {
+ return spec
+ }
+ return nil
+}
+
+// RegisterTagFamilies registers the tag specs with a given slice of TagFamilySpec.
+func (tagSpecMap TagSpecMap) RegisterTagFamilies(tagFamilies []*databasev1.TagFamilySpec) {
+ for tagFamilyIdx, tagFamily := range tagFamilies {
+ for tagIdx, spec := range tagFamily.GetTags() {
+ tagSpecMap.RegisterTag(tagFamilyIdx, tagIdx, spec)
+ }
+ }
+}
+
+// RegisterTag registers the tag spec with given tagFamilyName, tagName and indexes.
+func (tagSpecMap TagSpecMap) RegisterTag(tagFamilyIdx, tagIdx int, spec *databasev1.TagSpec) {
+ tagSpecMap[spec.GetName()] = &TagSpec{
+ TagIdx: tagIdx,
+ TagFamilyIdx: tagFamilyIdx,
+ Spec: spec,
+ }
+}
+
// CommonSchema represents a sharable fields between independent schemas.
// It provides common access methods at the same time.
type CommonSchema struct {
+ TagSpecMap
IndexRules []*databasev1.IndexRule
- TagMap map[string]*TagSpec
EntityList []string
}
@@ -65,12 +115,12 @@ func (cs *CommonSchema) ProjTags(refs ...[]*TagRef) *CommonSchema {
}
newCommonSchema := &CommonSchema{
IndexRules: cs.IndexRules,
- TagMap: make(map[string]*TagSpec),
+ TagSpecMap: make(map[string]*TagSpec),
EntityList: cs.EntityList,
}
for projFamilyIdx, refInFamily := range refs {
for projIdx, ref := range refInFamily {
- newCommonSchema.TagMap[ref.Tag.getTagName()] = &TagSpec{
+ newCommonSchema.TagSpecMap[ref.Tag.getTagName()] = &TagSpec{
TagFamilyIdx: projFamilyIdx,
TagIdx: projIdx,
Spec: ref.Spec.Spec,
@@ -80,15 +130,6 @@ func (cs *CommonSchema) ProjTags(refs ...[]*TagRef) *CommonSchema {
return newCommonSchema
}
-// RegisterTag registers the tag spec with given tagFamilyName, tagName and indexes.
-func (cs *CommonSchema) RegisterTag(tagFamilyIdx, tagIdx int, spec *databasev1.TagSpec) {
- cs.TagMap[spec.GetName()] = &TagSpec{
- TagIdx: tagIdx,
- TagFamilyIdx: tagFamilyIdx,
- Spec: spec,
- }
-}
-
// IndexDefined checks whether the field given is indexed.
func (cs *CommonSchema) IndexDefined(tagName string) (bool, *databasev1.IndexRule) {
for _, idxRule := range cs.IndexRules {
@@ -119,7 +160,7 @@ func (cs *CommonSchema) CreateRef(tags ...[]*Tag) ([][]*TagRef, error) {
for i, tagInFamily := range tags {
var tagRefsInFamily []*TagRef
for _, tag := range tagInFamily {
- if ts, ok := cs.TagMap[tag.getTagName()]; ok {
+ if ts, ok := cs.TagSpecMap[tag.getTagName()]; ok {
tagRefsInFamily = append(tagRefsInFamily, &TagRef{tag, ts})
} else {
return nil, errors.Wrap(errTagNotDefined, tag.GetCompoundName())
diff --git a/pkg/query/logical/stream/schema.go b/pkg/query/logical/stream/schema.go
index fc7bf59b..47f468d5 100644
--- a/pkg/query/logical/stream/schema.go
+++ b/pkg/query/logical/stream/schema.go
@@ -33,6 +33,10 @@ type schema struct {
common *logical.CommonSchema
}
+func (s *schema) FindTagSpecByName(name string) *logical.TagSpec {
+ return s.common.FindTagSpecByName(name)
+}
+
func (s *schema) CreateFieldRef(_ ...*logical.Field) ([]*logical.FieldRef, error) {
panic("no field for stream")
}
@@ -52,16 +56,11 @@ func (s *schema) IndexDefined(tagName string) (bool, *databasev1.IndexRule) {
func (s *schema) Equal(s2 logical.Schema) bool {
if other, ok := s2.(*schema); ok {
- return cmp.Equal(other.common.TagMap, s.common.TagMap)
+ return cmp.Equal(other.common.TagSpecMap, s.common.TagSpecMap)
}
return false
}
-// registerTag registers the tag spec with given tagFamilyName, tagName and indexes.
-func (s *schema) registerTag(tagFamilyIdx, tagIdx int, spec *databasev1.TagSpec) {
- s.common.RegisterTag(tagFamilyIdx, tagIdx, spec)
-}
-
// CreateTagRef create TagRef to the given tags.
// The family name of the tag is actually not used
// since the uniqueness of the tag names can be guaranteed across families.
diff --git a/pkg/query/logical/stream/stream_analyzer.go b/pkg/query/logical/stream/stream_analyzer.go
index 5972caa1..4771eeab 100644
--- a/pkg/query/logical/stream/stream_analyzer.go
+++ b/pkg/query/logical/stream/stream_analyzer.go
@@ -37,18 +37,13 @@ func BuildSchema(streamSchema stream.Stream) (logical.Schema, error) {
s := &schema{
common: &logical.CommonSchema{
IndexRules: streamSchema.GetIndexRules(),
- TagMap: make(map[string]*logical.TagSpec),
+ TagSpecMap: make(map[string]*logical.TagSpec),
EntityList: sm.GetEntity().GetTagNames(),
},
stream: sm,
}
- // generate the streamSchema of the fields for the traceSeries
- for tagFamilyIdx, tagFamily := range sm.GetTagFamilies() {
- for tagIdx, spec := range tagFamily.GetTags() {
- s.registerTag(tagFamilyIdx, tagIdx, spec)
- }
- }
+ s.common.RegisterTagFamilies(sm.GetTagFamilies())
return s, nil
}
diff --git a/pkg/query/logical/stream/stream_plan_tag_filter.go b/pkg/query/logical/stream/stream_plan_tag_filter.go
index d74c3654..4ea0b3bf 100644
--- a/pkg/query/logical/stream/stream_plan_tag_filter.go
+++ b/pkg/query/logical/stream/stream_plan_tag_filter.go
@@ -85,7 +85,8 @@ func (uis *unresolvedTagFilter) Analyze(s logical.Schema) (logical.Plan, error)
return nil, errFilter
}
if tagFilter != logical.DummyFilter {
- plan = newTagFilter(s, plan, tagFilter)
+ // create tagFilter with a projected view
+ plan = newTagFilter(s.ProjTags(ctx.projTagsRefs...), plan, tagFilter)
}
}
return plan, err
@@ -167,7 +168,7 @@ func (t *tagFilterPlan) Execute(ec executor.StreamExecutionContext) ([]*streamv1
}
filteredElements := make([]*streamv1.Element, 0)
for _, e := range entities {
- ok, err := t.tagFilter.Match(e.TagFamilies)
+ ok, err := t.tagFilter.Match(logical.TagFamilies(e.TagFamilies), t.s)
if err != nil {
return nil, err
}
diff --git a/pkg/query/logical/tag_filter.go b/pkg/query/logical/tag_filter.go
index cafd6658..8e4fdce2 100644
--- a/pkg/query/logical/tag_filter.go
+++ b/pkg/query/logical/tag_filter.go
@@ -29,14 +29,55 @@ import (
var errUnsupportedLogicalOperation = errors.New("unsupported logical operation")
+// TagValueIndexAccessor provides accessor to get TagValue by two indexes, i.e. tagFamilyIndex and tagIndex.
+// It works like a matrix.
+type TagValueIndexAccessor interface {
+ GetTagValue(tagFamilyIdx, tagIdx int) *modelv1.TagValue
+}
+
+// TagFamilies wraps a slice of TagFamily.
+type TagFamilies []*modelv1.TagFamily
+
+// GetTagValue gets TagValue from the underlying TagFamily slice.
+func (tfs TagFamilies) GetTagValue(tagFamilyIdx, tagIdx int) *modelv1.TagValue {
+ if len(tfs)-1 < tagFamilyIdx {
+ return nil
+ }
+ tags := tfs[tagFamilyIdx].GetTags()
+ if len(tags)-1 < tagIdx {
+ return nil
+ }
+ return tags[tagIdx].GetValue()
+}
+
+// TagFamiliesForWrite wraps a slice of TagFamilyForWrite.
+type TagFamiliesForWrite []*modelv1.TagFamilyForWrite
+
+// GetTagValue gets TagValue from the underlying TagFamilyForWrite slice.
+func (tffws TagFamiliesForWrite) GetTagValue(tagFamilyIdx, tagIdx int) *modelv1.TagValue {
+ if len(tffws)-1 < tagFamilyIdx {
+ return nil
+ }
+ tagVals := tffws[tagFamilyIdx].GetTags()
+ if len(tagVals)-1 < tagIdx {
+ return nil
+ }
+ return tagVals[tagIdx]
+}
+
// TagFilter allows matching a tag based on a predicate.
type TagFilter interface {
fmt.Stringer
- Match(tagFamilies []*modelv1.TagFamily) (bool, error)
+ Match(accessor TagValueIndexAccessor, registry TagSpecRegistry) (bool, error)
+}
+
+// BuildSimpleTagFilter returns a TagFilter without any local-index, global index, sharding key support.
+func BuildSimpleTagFilter(criteria *modelv1.Criteria) (TagFilter, error) {
+ return BuildTagFilter(criteria, nil, emptyIndexChecker{}, false)
}
// BuildTagFilter returns a TagFilter if predicates doesn't match any indices.
-func BuildTagFilter(criteria *modelv1.Criteria, entityDict map[string]int, schema Schema, hasGlobalIndex bool) (TagFilter, error) {
+func BuildTagFilter(criteria *modelv1.Criteria, entityDict map[string]int, indexChecker IndexChecker, hasGlobalIndex bool) (TagFilter, error) {
if criteria == nil {
return DummyFilter, nil
}
@@ -47,7 +88,7 @@ func BuildTagFilter(criteria *modelv1.Criteria, entityDict map[string]int, schem
if err != nil {
return nil, err
}
- if ok, _ := schema.IndexDefined(cond.Name); ok {
+ if ok, _ := indexChecker.IndexDefined(cond.Name); ok {
return DummyFilter, nil
}
if _, ok := entityDict[cond.Name]; ok {
@@ -56,11 +97,11 @@ func BuildTagFilter(criteria *modelv1.Criteria, entityDict map[string]int, schem
return parseFilter(cond, expr)
case *modelv1.Criteria_Le:
le := criteria.GetLe()
- left, err := BuildTagFilter(le.Left, entityDict, schema, hasGlobalIndex)
+ left, err := BuildTagFilter(le.Left, entityDict, indexChecker, hasGlobalIndex)
if err != nil {
return nil, err
}
- right, err := BuildTagFilter(le.Right, entityDict, schema, hasGlobalIndex)
+ right, err := BuildTagFilter(le.Right, entityDict, indexChecker, hasGlobalIndex)
if err != nil {
return nil, err
}
@@ -150,7 +191,9 @@ var DummyFilter = new(dummyTagFilter)
type dummyTagFilter struct{}
-func (dummyTagFilter) Match(_ []*modelv1.TagFamily) (bool, error) { return true, nil }
+func (dummyTagFilter) Match(_ TagValueIndexAccessor, _ TagSpecRegistry) (bool, error) {
+ return true, nil
+}
func (dummyTagFilter) String() string { return "dummy" }
@@ -171,10 +214,10 @@ func (n *logicalNode) append(sub TagFilter) *logicalNode {
return n
}
-func matchTag(tagFamilies []*modelv1.TagFamily, n *logicalNode, lp logicalNodeOP) (bool, error) {
+func matchTag(accessor TagValueIndexAccessor, registry TagSpecRegistry, n *logicalNode, lp logicalNodeOP) (bool, error) {
var result *bool
for _, sn := range n.SubNodes {
- r, err := sn.Match(tagFamilies)
+ r, err := sn.Match(accessor, registry)
if err != nil {
return false, err
}
@@ -209,8 +252,8 @@ func (an *andLogicalNode) merge(bb ...bool) bool {
return true
}
-func (an *andLogicalNode) Match(tagFamilies []*modelv1.TagFamily) (bool, error) {
- return matchTag(tagFamilies, an.logicalNode, an)
+func (an *andLogicalNode) Match(accessor TagValueIndexAccessor, registry TagSpecRegistry) (bool, error) {
+ return matchTag(accessor, registry, an.logicalNode, an)
}
func (an *andLogicalNode) MarshalJSON() ([]byte, error) {
@@ -244,8 +287,8 @@ func (on *orLogicalNode) merge(bb ...bool) bool {
return false
}
-func (on *orLogicalNode) Match(tagFamilies []*modelv1.TagFamily) (bool, error) {
- return matchTag(tagFamilies, on.logicalNode, on)
+func (on *orLogicalNode) Match(accessor TagValueIndexAccessor, registry TagSpecRegistry) (bool, error) {
+ return matchTag(accessor, registry, on.logicalNode, on)
}
func (on *orLogicalNode) MarshalJSON() ([]byte, error) {
@@ -282,8 +325,8 @@ func newNotTag(inner TagFilter) *notTag {
}
}
-func (n *notTag) Match(tagFamilies []*modelv1.TagFamily) (bool, error) {
- b, err := n.Inner.Match(tagFamilies)
+func (n *notTag) Match(accessor TagValueIndexAccessor, registry TagSpecRegistry) (bool, error) {
+ b, err := n.Inner.Match(accessor, registry)
if err != nil {
return false, err
}
@@ -313,8 +356,8 @@ func newInTag(tagName string, values LiteralExpr) *inTag {
}
}
-func (h *inTag) Match(tagFamilies []*modelv1.TagFamily) (bool, error) {
- expr, err := tagExpr(tagFamilies, h.Name)
+func (h *inTag) Match(accessor TagValueIndexAccessor, registry TagSpecRegistry) (bool, error) {
+ expr, err := tagExpr(accessor, registry, h.Name)
if err != nil {
return false, err
}
@@ -334,8 +377,8 @@ func newEqTag(tagName string, values LiteralExpr) *eqTag {
}
}
-func (eq *eqTag) Match(tagFamilies []*modelv1.TagFamily) (bool, error) {
- expr, err := tagExpr(tagFamilies, eq.Name)
+func (eq *eqTag) Match(accessor TagValueIndexAccessor, registry TagSpecRegistry) (bool, error) {
+ expr, err := tagExpr(accessor, registry, eq.Name)
if err != nil {
return false, err
}
@@ -373,8 +416,8 @@ func newRangeTag(tagName string, opts rangeOpts) *rangeTag {
}
}
-func (r *rangeTag) Match(tagFamilies []*modelv1.TagFamily) (bool, error) {
- expr, err := tagExpr(tagFamilies, r.Name)
+func (r *rangeTag) Match(accessor TagValueIndexAccessor, registry TagSpecRegistry) (bool, error) {
+ expr, err := tagExpr(accessor, registry, r.Name)
if err != nil {
return false, err
}
@@ -442,12 +485,10 @@ func (r *rangeTag) String() string {
return jsonToString(r)
}
-func tagExpr(tagFamilies []*modelv1.TagFamily, tagName string) (ComparableExpr, error) {
- for _, tf := range tagFamilies {
- for _, t := range tf.Tags {
- if t.Key == tagName {
- return parseExpr(t.Value)
- }
+func tagExpr(accessor TagValueIndexAccessor, registry TagSpecRegistry, tagName string) (ComparableExpr, error) {
+ if tagSpec := registry.FindTagSpecByName(tagName); tagSpec != nil {
+ if tagVal := accessor.GetTagValue(tagSpec.TagFamilyIdx, tagSpec.TagIdx); tagVal != nil {
+ return parseExpr(tagVal)
}
}
return nil, errTagNotDefined
@@ -466,8 +507,8 @@ func newHavingTag(tagName string, values LiteralExpr) *havingTag {
}
}
-func (h *havingTag) Match(tagFamilies []*modelv1.TagFamily) (bool, error) {
- expr, err := tagExpr(tagFamilies, h.Name)
+func (h *havingTag) Match(accessor TagValueIndexAccessor, registry TagSpecRegistry) (bool, error) {
+ expr, err := tagExpr(accessor, registry, h.Name)
if err != nil {
return false, err
}