Posted to notifications@skywalking.apache.org by ha...@apache.org on 2023/01/22 12:46:17 UTC

[skywalking-banyandb] branch main updated: Unify TagFilter for Streaming scenario (#239)

This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git


The following commit(s) were added to refs/heads/main by this push:
     new c25160d0 Unify TagFilter for Streaming scenario (#239)
c25160d0 is described below

commit c25160d0321d6596fb84891628beba4470cbab3d
Author: Jiajing LU <lu...@gmail.com>
AuthorDate: Sun Jan 22 20:46:11 2023 +0800

    Unify TagFilter for Streaming scenario (#239)
    
    * unify filter
    * move processor manager start to Run stage
---
 banyand/measure/measure.go                         |  48 ++-
 banyand/measure/measure_topn.go                    | 391 +++++++--------------
 banyand/measure/measure_write.go                   |  30 +-
 banyand/measure/service.go                         |  31 ++
 banyand/metadata/schema/measure.go                 |   2 +-
 pkg/query/logical/measure/measure_analyzer.go      |   8 +-
 pkg/query/logical/measure/schema.go                |  12 +-
 pkg/query/logical/schema.go                        |  71 +++-
 pkg/query/logical/stream/schema.go                 |  11 +-
 pkg/query/logical/stream/stream_analyzer.go        |   9 +-
 pkg/query/logical/stream/stream_plan_tag_filter.go |   5 +-
 pkg/query/logical/tag_filter.go                    |  97 +++--
 12 files changed, 359 insertions(+), 356 deletions(-)
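
The core of this change lands in pkg/query/logical/tag_filter.go below: TagFilter.Match no
longer takes a []*modelv1.TagFamily directly, but a TagValueIndexAccessor (anything that can
return the tag value at a (tagFamilyIdx, tagIdx) position) together with a TagSpecRegistry
(anything that can map a tag name to that position). One filter implementation therefore
serves both the query path (TagFamily) and the streaming/TopN write path (TagFamilyForWrite).
Below is a minimal, self-contained sketch of that shape, using simplified stand-in types
rather than the actual BanyanDB API:

    // Sketch only: simplified stand-ins for the types touched by this commit.
    package main

    import "fmt"

    // TagValue stands in for modelv1.TagValue.
    type TagValue struct{ Str string }

    // TagSpec records where a named tag lives: (tag-family index, tag index).
    type TagSpec struct{ FamilyIdx, TagIdx int }

    // registry plays the role of logical.TagSpecMap / TagSpecRegistry.
    type registry map[string]*TagSpec

    func (r registry) FindTagSpecByName(name string) *TagSpec { return r[name] }

    // accessor plays the role of logical.TagValueIndexAccessor.
    type accessor interface {
        GetTagValue(familyIdx, tagIdx int) *TagValue
    }

    // writeFamilies mimics TagFamiliesForWrite: positional values, no tag keys.
    type writeFamilies [][]*TagValue

    func (w writeFamilies) GetTagValue(familyIdx, tagIdx int) *TagValue {
        if familyIdx >= len(w) || tagIdx >= len(w[familyIdx]) {
            return nil
        }
        return w[familyIdx][tagIdx]
    }

    // eqFilter is the "unified" filter: it needs only an accessor and a registry,
    // so it does not care whether the data came from a query or a write request.
    type eqFilter struct{ name, want string }

    func (e eqFilter) Match(a accessor, r registry) (bool, error) {
        spec := r.FindTagSpecByName(e.name)
        if spec == nil {
            return false, fmt.Errorf("tag %q is not defined", e.name)
        }
        v := a.GetTagValue(spec.FamilyIdx, spec.TagIdx)
        return v != nil && v.Str == e.want, nil
    }

    func main() {
        reg := registry{"service": {FamilyIdx: 0, TagIdx: 1}}
        families := writeFamilies{{{Str: "id-1"}, {Str: "svc-a"}}}
        ok, err := eqFilter{name: "service", want: "svc-a"}.Match(families, reg)
        fmt.Println(ok, err) // true <nil>
    }

In the real code the accessor is logical.TagFamilies or logical.TagFamiliesForWrite and the
registry is logical.TagSpecMap; see the schema.go and tag_filter.go hunks below.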

diff --git a/banyand/measure/measure.go b/banyand/measure/measure.go
index 46485256..1e1e7e2f 100644
--- a/banyand/measure/measure.go
+++ b/banyand/measure/measure.go
@@ -26,11 +26,14 @@ import (
 
 	commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
 	databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
+	"github.com/apache/skywalking-banyandb/banyand/metadata"
+	"github.com/apache/skywalking-banyandb/banyand/queue"
 	"github.com/apache/skywalking-banyandb/banyand/tsdb"
 	"github.com/apache/skywalking-banyandb/banyand/tsdb/index"
 	"github.com/apache/skywalking-banyandb/pkg/logger"
 	"github.com/apache/skywalking-banyandb/pkg/partition"
 	pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
+	"github.com/apache/skywalking-banyandb/pkg/query/logical"
 	"github.com/apache/skywalking-banyandb/pkg/timestamp"
 )
 
@@ -50,11 +53,32 @@ type measure struct {
 	group                  string
 	entityLocator          partition.EntityLocator
 	indexRules             []*databasev1.IndexRule
+	topNAggregations       []*databasev1.TopNAggregation
 	maxObservedModRevision int64
 	interval               time.Duration
 	shardNum               uint32
 }
 
+func (s *measure) startSteamingManager(pipeline queue.Queue, repo metadata.Repo) error {
+	if len(s.topNAggregations) == 0 {
+		return nil
+	}
+	tagMapSpec := logical.TagSpecMap{}
+	tagMapSpec.RegisterTagFamilies(s.schema.GetTagFamilies())
+
+	s.processorManager = &topNProcessorManager{
+		l:            s.l,
+		pipeline:     pipeline,
+		repo:         repo,
+		m:            s,
+		s:            tagMapSpec,
+		topNSchemas:  s.topNAggregations,
+		processorMap: make(map[*commonv1.Metadata][]*topNStreamingProcessor),
+	}
+
+	return s.processorManager.start()
+}
+
 func (s *measure) GetSchema() *databasev1.Measure {
 	return s.schema
 }
@@ -76,6 +100,9 @@ func (s *measure) EntityLocator() partition.EntityLocator {
 }
 
 func (s *measure) Close() error {
+	if s.processorManager == nil {
+		return nil
+	}
 	return s.processorManager.Close()
 }
 
@@ -97,10 +124,11 @@ type measureSpec struct {
 
 func openMeasure(shardNum uint32, db tsdb.Supplier, spec measureSpec, l *logger.Logger) (*measure, error) {
 	m := &measure{
-		shardNum:   shardNum,
-		schema:     spec.schema,
-		indexRules: spec.indexRules,
-		l:          l,
+		shardNum:         shardNum,
+		schema:           spec.schema,
+		indexRules:       spec.indexRules,
+		topNAggregations: spec.topNAggregations,
+		l:                l,
 	}
 	if err := m.parseSpec(); err != nil {
 		return nil, err
@@ -115,17 +143,5 @@ func openMeasure(shardNum uint32, db tsdb.Supplier, spec measureSpec, l *logger.
 		IndexRules: spec.indexRules,
 	})
 
-	m.processorManager = &topNProcessorManager{
-		l:            l,
-		m:            m,
-		topNSchemas:  spec.topNAggregations,
-		processorMap: make(map[*commonv1.Metadata][]*topNStreamingProcessor),
-	}
-
-	err := m.processorManager.start()
-	if err != nil {
-		return nil, err
-	}
-
 	return m, nil
 }
diff --git a/banyand/measure/measure_topn.go b/banyand/measure/measure_topn.go
index 4d510991..ac207ae0 100644
--- a/banyand/measure/measure_topn.go
+++ b/banyand/measure/measure_topn.go
@@ -26,23 +26,31 @@ import (
 	"sync"
 	"time"
 
+	"github.com/google/go-cmp/cmp"
 	"github.com/pkg/errors"
 	"go.uber.org/multierr"
 	"golang.org/x/exp/slices"
-	"google.golang.org/protobuf/proto"
+	"google.golang.org/protobuf/testing/protocmp"
+	"google.golang.org/protobuf/types/known/timestamppb"
 
 	"github.com/apache/skywalking-banyandb/api/common"
+	apiData "github.com/apache/skywalking-banyandb/api/data"
 	commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
 	databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
 	measurev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/measure/v1"
 	modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
+	"github.com/apache/skywalking-banyandb/banyand/metadata"
+	"github.com/apache/skywalking-banyandb/banyand/metadata/schema"
+	"github.com/apache/skywalking-banyandb/banyand/queue"
 	"github.com/apache/skywalking-banyandb/banyand/tsdb"
+	"github.com/apache/skywalking-banyandb/pkg/bus"
 	"github.com/apache/skywalking-banyandb/pkg/flow"
 	"github.com/apache/skywalking-banyandb/pkg/flow/streaming"
 	"github.com/apache/skywalking-banyandb/pkg/flow/streaming/sources"
 	"github.com/apache/skywalking-banyandb/pkg/logger"
 	"github.com/apache/skywalking-banyandb/pkg/partition"
 	pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
+	"github.com/apache/skywalking-banyandb/pkg/query/logical"
 )
 
 const (
@@ -56,8 +64,6 @@ var (
 	_ io.Closer = (*topNProcessorManager)(nil)
 	_ flow.Sink = (*topNStreamingProcessor)(nil)
 
-	errUnsupportedConditionValueType = errors.New("unsupported value type in the condition")
-
 	// TopNValueFieldSpec denotes the field specification of the topN calculated result.
 	TopNValueFieldSpec = &databasev1.FieldSpec{
 		Name:              "value",
@@ -68,17 +74,17 @@ var (
 )
 
 type topNStreamingProcessor struct {
-	databaseSupplier tsdb.Supplier
-	streamingFlow    flow.Flow
-	l                *logger.Logger
-	topNSchema       *databasev1.TopNAggregation
-	src              chan interface{}
-	in               chan flow.StreamRecord
-	errCh            <-chan error
-	stopCh           chan struct{}
+	m             *measure
+	streamingFlow flow.Flow
+	l             *logger.Logger
+	pipeline      queue.Queue
+	topNSchema    *databasev1.TopNAggregation
+	src           chan interface{}
+	in            chan flow.StreamRecord
+	errCh         <-chan error
+	stopCh        chan struct{}
 	flow.ComponentState
 	interval      time.Duration
-	shardNum      uint32
 	sortDirection modelv1.Sort
 }
 
@@ -100,7 +106,8 @@ func (t *topNStreamingProcessor) run(ctx context.Context) {
 			if !ok {
 				return
 			}
-			if err := t.writeStreamRecord(ctx, record); err != nil {
+			// nolint: contextcheck
+			if err := t.writeStreamRecord(record); err != nil {
 				t.l.Err(err).Msg("fail to write stream record")
 			}
 		case <-ctx.Done():
@@ -126,7 +133,7 @@ func (t *topNStreamingProcessor) Close() error {
 	return err
 }
 
-func (t *topNStreamingProcessor) writeStreamRecord(ctx context.Context, record flow.StreamRecord) error {
+func (t *topNStreamingProcessor) writeStreamRecord(record flow.StreamRecord) error {
 	tuples, ok := record.Data().([]*streaming.Tuple2)
 	if !ok {
 		return errors.New("invalid data type")
@@ -143,12 +150,12 @@ func (t *topNStreamingProcessor) writeStreamRecord(ctx context.Context, record f
 	for rankNum, tuple := range tuples {
 		fieldValue := tuple.V1.(int64)
 		data := tuple.V2.(flow.StreamRecord).Data().(flow.Data)
-		err = multierr.Append(err, t.writeData(ctx, eventTime, timeBucket, fieldValue, data, rankNum))
+		err = multierr.Append(err, t.writeData(eventTime, timeBucket, fieldValue, data, rankNum))
 	}
 	return err
 }
 
-func (t *topNStreamingProcessor) writeData(ctx context.Context, eventTime time.Time, timeBucket string, fieldValue int64, data flow.Data, rankNum int) error {
+func (t *topNStreamingProcessor) writeData(eventTime time.Time, timeBucket string, fieldValue int64, data flow.Data, rankNum int) error {
 	var tagValues []*modelv1.TagValue
 	if len(t.topNSchema.GetGroupByTagNames()) > 0 {
 		var ok bool
@@ -160,86 +167,59 @@ func (t *topNStreamingProcessor) writeData(ctx context.Context, eventTime time.T
 	if err != nil {
 		return err
 	}
-	shard, err := t.databaseSupplier.SupplyTSDB().Shard(shardID)
-	if err != nil {
-		return err
-	}
-	series, err := shard.Series().Get(tsdb.HashEntity(entity), entityValues)
-	if err != nil {
-		return err
-	}
-	ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
-	defer cancel()
-	span, err := series.Create(ctx, eventTime)
-	if err != nil {
-		if span != nil {
-			_ = span.Close()
-		}
-		return err
-	}
+
 	// measureID consists of three parts,
 	// 1. groupValues
 	// 2. rankNumber
 	// 3. timeBucket
 	measureID := data[0].(string) + "_" + strconv.Itoa(rankNum) + "_" + timeBucket
-	writeFn := func() (tsdb.Writer, error) {
-		builder := span.WriterBuilder().Time(eventTime)
-		virtualTagFamily := &modelv1.TagFamilyForWrite{
-			Tags: []*modelv1.TagValue{
-				// MeasureID
-				{
-					Value: &modelv1.TagValue_Id{
-						Id: &modelv1.ID{
-							Value: measureID,
+	iwr := &measurev1.InternalWriteRequest{
+		Request: &measurev1.WriteRequest{
+			Metadata: t.topNSchema.GetMetadata(),
+			DataPoint: &measurev1.DataPointValue{
+				Timestamp: timestamppb.New(eventTime),
+				TagFamilies: []*modelv1.TagFamilyForWrite{
+					{
+						Tags: []*modelv1.TagValue{
+							// MeasureID
+							{
+								Value: &modelv1.TagValue_Id{
+									Id: &modelv1.ID{
+										Value: measureID,
+									},
+								},
+							},
+							// GroupValues for merge in post processor
+							{
+								Value: &modelv1.TagValue_Str{
+									Str: &modelv1.Str{
+										Value: data[0].(string),
+									},
+								},
+							},
 						},
 					},
 				},
-				// GroupValues for merge in post processor
-				{
-					Value: &modelv1.TagValue_Str{
-						Str: &modelv1.Str{
-							Value: data[0].(string),
+				Fields: []*modelv1.FieldValue{
+					{
+						Value: &modelv1.FieldValue_Int{
+							Int: &modelv1.Int{
+								Value: fieldValue,
+							},
 						},
 					},
 				},
 			},
-		}
-		payload, errMarshal := proto.Marshal(virtualTagFamily)
-		if errMarshal != nil {
-			return nil, errMarshal
-		}
-		builder.Family(familyIdentity(TopNTagFamily, pbv1.TagFlag), payload)
-		virtualFieldValue := &modelv1.FieldValue{
-			Value: &modelv1.FieldValue_Int{
-				Int: &modelv1.Int{
-					Value: fieldValue,
-				},
-			},
-		}
-		fieldData := encodeFieldValue(virtualFieldValue)
-		builder.Family(familyIdentity(TopNValueFieldSpec.GetName(), pbv1.EncoderFieldFlag(TopNValueFieldSpec, t.interval)), fieldData)
-		writer, errWrite := builder.Build()
-		if errWrite != nil {
-			return nil, errWrite
-		}
-		_, errWrite = writer.Write()
-		if e := t.l.Debug(); e.Enabled() {
-			e.Time("ts", eventTime).
-				Int("ts_nano", eventTime.Nanosecond()).
-				Uint64("series_id", uint64(series.ID())).
-				Stringer("series", series).
-				Uint64("item_id", uint64(writer.ItemID().ID)).
-				Int("shard_id", int(shardID)).
-				Msg("write measure")
-		}
-		return writer, errWrite
+		},
+		ShardId:    uint32(shardID),
+		SeriesHash: tsdb.HashEntity(entity),
 	}
-	_, err = writeFn()
-	if err != nil {
-		_ = span.Close()
-		return err
+	if t.l.Debug().Enabled() {
+		iwr.EntityValues = entityValues.Encode()
 	}
-	return span.Close()
+	message := bus.NewMessage(bus.MessageID(time.Now().UnixNano()), iwr)
+	_, errWritePub := t.pipeline.Publish(apiData.TopicMeasureWrite, message)
+	return errWritePub
 }
 
 func (t *topNStreamingProcessor) downSampleTimeBucket(eventTimeMillis int64) time.Time {
@@ -268,7 +248,7 @@ func (t *topNStreamingProcessor) locate(tagValues []*modelv1.TagValue, rankNum i
 	if err != nil {
 		return nil, nil, 0, err
 	}
-	id, err := partition.ShardID(e.Marshal(), t.shardNum)
+	id, err := partition.ShardID(e.Marshal(), t.m.shardNum)
 	if err != nil {
 		return nil, nil, 0, err
 	}
@@ -306,7 +286,10 @@ func (t *topNStreamingProcessor) handleError() {
 // topNProcessorManager manages multiple topNStreamingProcessor(s) belonging to a single measure.
 type topNProcessorManager struct {
 	l            *logger.Logger
+	pipeline     queue.Queue
+	repo         metadata.Repo
 	m            *measure
+	s            logical.TagSpecRegistry
 	processorMap map[*commonv1.Metadata][]*topNStreamingProcessor
 	topNSchemas  []*databasev1.TopNAggregation
 	sync.RWMutex
@@ -336,9 +319,54 @@ func (manager *topNProcessorManager) onMeasureWrite(request *measurev1.WriteRequ
 	}()
 }
 
+func (manager *topNProcessorManager) createOrUpdateTopNMeasure(topNSchema *databasev1.TopNAggregation) error {
+	m, err := manager.repo.MeasureRegistry().GetMeasure(context.TODO(), topNSchema.GetMetadata())
+	if err != nil && !errors.Is(err, schema.ErrGRPCResourceNotFound) {
+		return err
+	}
+
+	// create a new "derived" measure for TopN result
+	newTopNMeasure := &databasev1.Measure{
+		Metadata: topNSchema.GetMetadata(),
+		Interval: manager.m.schema.GetInterval(),
+		TagFamilies: []*databasev1.TagFamilySpec{
+			{
+				Name: TopNTagFamily,
+				Tags: []*databasev1.TagSpec{
+					{
+						Name: "measure_id",
+						Type: databasev1.TagType_TAG_TYPE_ID,
+					},
+					{
+						Name: "group_values",
+						Type: databasev1.TagType_TAG_TYPE_STRING,
+					},
+				},
+			},
+		},
+		Fields: []*databasev1.FieldSpec{TopNValueFieldSpec},
+	}
+	if m == nil {
+		return manager.repo.MeasureRegistry().CreateMeasure(context.Background(), newTopNMeasure)
+	}
+	// compare with the old one
+	if cmp.Diff(newTopNMeasure, m,
+		protocmp.IgnoreUnknown(),
+		protocmp.IgnoreFields(&databasev1.Measure{}, "updated_at"),
+		protocmp.IgnoreFields(&commonv1.Metadata{}, "id", "create_revision", "mod_revision"),
+		protocmp.Transform()) == "" {
+		return nil
+	}
+	// update
+	return manager.repo.MeasureRegistry().UpdateMeasure(context.Background(), newTopNMeasure)
+}
+
 func (manager *topNProcessorManager) start() error {
 	interval := manager.m.interval
 	for _, topNSchema := range manager.topNSchemas {
+		if createErr := manager.createOrUpdateTopNMeasure(topNSchema); createErr != nil {
+			return createErr
+		}
 		sortDirections := make([]modelv1.Sort, 0, 2)
 		if topNSchema.GetFieldValueSort() == modelv1.Sort_SORT_UNSPECIFIED {
 			sortDirections = append(sortDirections, modelv1.Sort_SORT_ASC, modelv1.Sort_SORT_DESC)
@@ -363,18 +391,17 @@ func (manager *topNProcessorManager) start() error {
 				return innerErr
 			}
 			streamingFlow = streamingFlow.Map(mapper)
-
 			processor := &topNStreamingProcessor{
-				l:                manager.l,
-				shardNum:         manager.m.shardNum,
-				interval:         interval,
-				topNSchema:       topNSchema,
-				sortDirection:    sortDirection,
-				databaseSupplier: manager.m.databaseSupplier,
-				src:              srcCh,
-				in:               make(chan flow.StreamRecord),
-				stopCh:           make(chan struct{}),
-				streamingFlow:    streamingFlow,
+				m:             manager.m,
+				l:             manager.l,
+				interval:      interval,
+				topNSchema:    topNSchema,
+				sortDirection: sortDirection,
+				src:           srcCh,
+				in:            make(chan flow.StreamRecord),
+				stopCh:        make(chan struct{}),
+				streamingFlow: streamingFlow,
+				pipeline:      manager.pipeline,
 			}
 			processorList[i] = processor.start()
 		}
@@ -393,88 +420,22 @@ func (manager *topNProcessorManager) buildFilter(criteria *modelv1.Criteria) (fl
 		}, nil
 	}
 
-	f, err := manager.buildFilterForCriteria(criteria)
+	f, err := logical.BuildSimpleTagFilter(criteria)
 	if err != nil {
 		return nil, err
 	}
 
 	return func(_ context.Context, dataPoint any) bool {
-		tfs := dataPoint.(*measurev1.DataPointValue).GetTagFamilies()
-		return f.predicate(tfs)
+		tffws := dataPoint.(*measurev1.DataPointValue).GetTagFamilies()
+		ok, matchErr := f.Match(logical.TagFamiliesForWrite(tffws), manager.s)
+		if matchErr != nil {
+			manager.l.Err(matchErr).Msg("fail to match criteria")
+			return false
+		}
+		return ok
 	}, nil
 }
 
-func (manager *topNProcessorManager) buildFilterForCriteria(criteria *modelv1.Criteria) (conditionFilter, error) {
-	switch v := criteria.GetExp().(type) {
-	case *modelv1.Criteria_Condition:
-		return manager.buildFilterForCondition(v.Condition)
-	case *modelv1.Criteria_Le:
-		return manager.buildFilterForLogicalExpr(v.Le)
-	default:
-		return nil, errors.New("should not reach here")
-	}
-}
-
-// buildFilterForCondition builds a logical and composable filter for a logical expression which have underlying conditions,
-// or nested logical expressions as its children.
-func (manager *topNProcessorManager) buildFilterForLogicalExpr(logicalExpr *modelv1.LogicalExpression) (conditionFilter, error) {
-	left, lErr := manager.buildFilterForCriteria(logicalExpr.Left)
-	if lErr != nil {
-		return nil, lErr
-	}
-	right, rErr := manager.buildFilterForCriteria(logicalExpr.Right)
-	if rErr != nil {
-		return nil, rErr
-	}
-	return composeWithOp(left, right, logicalExpr.Op), nil
-}
-
-func composeWithOp(left, right conditionFilter, op modelv1.LogicalExpression_LogicalOp) conditionFilter {
-	if op == modelv1.LogicalExpression_LOGICAL_OP_AND {
-		return &andFilter{left, right}
-	}
-	return &orFilter{left, right}
-}
-
-// buildFilterForCondition builds a single, composable filter for a single condition.
-func (manager *topNProcessorManager) buildFilterForCondition(cond *modelv1.Condition) (conditionFilter, error) {
-	familyOffset, tagOffset, spec := pbv1.FindTagByName(manager.m.GetSchema().GetTagFamilies(), cond.GetName())
-	if spec == nil {
-		return nil, errors.New("fail to parse tag by name")
-	}
-	switch v := cond.GetValue().GetValue().(type) {
-	case *modelv1.TagValue_Int:
-		return &int64TagFilter{
-			TagLocator: partition.TagLocator{
-				FamilyOffset: familyOffset,
-				TagOffset:    tagOffset,
-			},
-			op:  cond.GetOp(),
-			val: v.Int.GetValue(),
-		}, nil
-	case *modelv1.TagValue_Str:
-		return &strTagFilter{
-			TagLocator: partition.TagLocator{
-				FamilyOffset: familyOffset,
-				TagOffset:    tagOffset,
-			},
-			op:  cond.GetOp(),
-			val: v.Str.GetValue(),
-		}, nil
-	case *modelv1.TagValue_Id:
-		return &idTagFilter{
-			TagLocator: partition.TagLocator{
-				FamilyOffset: familyOffset,
-				TagOffset:    tagOffset,
-			},
-			op:  cond.GetOp(),
-			val: v.Id.GetValue(),
-		}, nil
-	default:
-		return nil, errUnsupportedConditionValueType
-	}
-}
-
 func (manager *topNProcessorManager) buildMapper(fieldName string, groupByNames ...string) (flow.UnaryFunc[any], error) {
 	fieldIdx := slices.IndexFunc(manager.m.GetSchema().GetFields(), func(spec *databasev1.FieldSpec) bool {
 		return spec.GetName() == fieldName
@@ -485,6 +446,13 @@ func (manager *topNProcessorManager) buildMapper(fieldName string, groupByNames
 	if len(groupByNames) == 0 {
 		return func(_ context.Context, request any) any {
 			dataPoint := request.(*measurev1.DataPointValue)
+			if len(dataPoint.GetFields()) <= fieldIdx {
+				manager.l.Warn().Interface("point", dataPoint).
+					Str("fieldName", fieldName).
+					Int("len", len(dataPoint.GetFields())).
+					Int("fieldIdx", fieldIdx).
+					Msg("out of range")
+			}
 			return flow.Data{
 				// save string representation of group values as the key, i.e. v1
 				"",
@@ -518,93 +486,6 @@ func (manager *topNProcessorManager) buildMapper(fieldName string, groupByNames
 	}, nil
 }
 
-var (
-	_ conditionFilter = (*strTagFilter)(nil)
-	_ conditionFilter = (*int64TagFilter)(nil)
-)
-
-type conditionFilter interface {
-	predicate(tagFamilies []*modelv1.TagFamilyForWrite) bool
-}
-
-type strTagFilter struct {
-	val string
-	partition.TagLocator
-	op modelv1.Condition_BinaryOp
-}
-
-func (f *strTagFilter) predicate(tagFamilies []*modelv1.TagFamilyForWrite) bool {
-	strValue := tagFamilies[f.FamilyOffset].GetTags()[f.TagOffset].GetStr().GetValue()
-	switch f.op {
-	case modelv1.Condition_BINARY_OP_EQ:
-		return strValue == f.val
-	case modelv1.Condition_BINARY_OP_NE:
-		return strValue != f.val
-	default:
-		return false
-	}
-}
-
-type idTagFilter struct {
-	val string
-	partition.TagLocator
-	op modelv1.Condition_BinaryOp
-}
-
-func (f *idTagFilter) predicate(tagFamilies []*modelv1.TagFamilyForWrite) bool {
-	val := tagFamilies[f.FamilyOffset].GetTags()[f.TagOffset].GetId().GetValue()
-	switch f.op {
-	case modelv1.Condition_BINARY_OP_EQ:
-		return val == f.val
-	case modelv1.Condition_BINARY_OP_NE:
-		return val != f.val
-	default:
-		return false
-	}
-}
-
-type int64TagFilter struct {
-	partition.TagLocator
-	op  modelv1.Condition_BinaryOp
-	val int64
-}
-
-func (f *int64TagFilter) predicate(tagFamilies []*modelv1.TagFamilyForWrite) bool {
-	val := tagFamilies[f.FamilyOffset].GetTags()[f.TagOffset].GetInt().GetValue()
-	switch f.op {
-	case modelv1.Condition_BINARY_OP_EQ:
-		return val == f.val
-	case modelv1.Condition_BINARY_OP_NE:
-		return val != f.val
-	case modelv1.Condition_BINARY_OP_GE:
-		return val >= f.val
-	case modelv1.Condition_BINARY_OP_GT:
-		return val > f.val
-	case modelv1.Condition_BINARY_OP_LE:
-		return val <= f.val
-	case modelv1.Condition_BINARY_OP_LT:
-		return val < f.val
-	default:
-		return false
-	}
-}
-
-type andFilter struct {
-	l, r conditionFilter
-}
-
-func (f *andFilter) predicate(tagFamilies []*modelv1.TagFamilyForWrite) bool {
-	return f.l.predicate(tagFamilies) && f.r.predicate(tagFamilies)
-}
-
-type orFilter struct {
-	l, r conditionFilter
-}
-
-func (f *orFilter) predicate(tagFamilies []*modelv1.TagFamilyForWrite) bool {
-	return f.l.predicate(tagFamilies) || f.r.predicate(tagFamilies)
-}
-
 // groupTagsLocator can be used to locate tags within families.
 type groupTagsLocator []partition.TagLocator
 
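With the measure_topn.go changes above, the TopN processor no longer opens a TSDB shard and
writes the derived series itself: it assembles a measurev1.InternalWriteRequest (shard ID,
series hash, and the data point) and publishes it on the measure-write topic, so the regular
write callback persists the result. A rough sketch of this publish-instead-of-write pattern,
using simplified stand-ins rather than the real pkg/bus and banyand/queue packages:

    // Sketch only: stand-ins for the bus message and the write queue.
    package main

    import (
        "fmt"
        "time"
    )

    type message struct {
        id      int64
        payload any
    }

    type publisher interface {
        Publish(topic string, m message) error
    }

    type logPublisher struct{}

    func (logPublisher) Publish(topic string, m message) error {
        fmt.Printf("publish topic=%s id=%d payload=%#v\n", topic, m.id, m.payload)
        return nil
    }

    // internalWriteRequest mirrors the shape of measurev1.InternalWriteRequest:
    // routing info (shard ID, series hash) plus the derived data point to persist.
    type internalWriteRequest struct {
        ShardID    uint32
        SeriesHash []byte
        MeasureID  string
        FieldValue int64
    }

    // writeTopN is what topNStreamingProcessor.writeData now does in spirit:
    // build the request and hand it to the write pipeline instead of the TSDB.
    func writeTopN(q publisher, shardID uint32, seriesHash []byte, measureID string, value int64) error {
        iwr := internalWriteRequest{ShardID: shardID, SeriesHash: seriesHash, MeasureID: measureID, FieldValue: value}
        return q.Publish("measure-write", message{id: time.Now().UnixNano(), payload: iwr})
    }

    func main() {
        // measureID follows the groupValues_rankNum_timeBucket convention noted above.
        _ = writeTopN(logPublisher{}, 2, []byte{0xab, 0xcd}, "svc-a_0_202301221240", 42)
    }
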
diff --git a/banyand/measure/measure_write.go b/banyand/measure/measure_write.go
index b91d5a0d..2c2eb233 100644
--- a/banyand/measure/measure_write.go
+++ b/banyand/measure/measure_write.go
@@ -25,7 +25,6 @@ import (
 	"github.com/pkg/errors"
 
 	"github.com/apache/skywalking-banyandb/api/common"
-	commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
 	measurev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/measure/v1"
 	modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
 	"github.com/apache/skywalking-banyandb/banyand/tsdb"
@@ -39,17 +38,18 @@ import (
 
 var errMalformedElement = errors.New("element is malformed")
 
-func (s *measure) write(md *commonv1.Metadata, shardID common.ShardID, entity []byte, entityValues tsdb.EntityValues, value *measurev1.DataPointValue) error {
+func (s *measure) write(shardID common.ShardID, entity []byte, entityValues tsdb.EntityValues,
+	value *measurev1.DataPointValue,
+) error {
 	t := value.GetTimestamp().AsTime().Local()
 	if err := timestamp.Check(t); err != nil {
 		return errors.WithMessage(err, "writing stream")
 	}
-	sm := s.schema
 	fLen := len(value.GetTagFamilies())
 	if fLen < 1 {
 		return errors.Wrap(errMalformedElement, "no tag family")
 	}
-	if fLen > len(sm.TagFamilies) {
+	if fLen > len(s.schema.GetTagFamilies()) {
 		return errors.Wrap(errMalformedElement, "tag family number is more than expected")
 	}
 	shard, err := s.databaseSupplier.SupplyTSDB().Shard(shardID)
@@ -72,18 +72,18 @@ func (s *measure) write(md *commonv1.Metadata, shardID common.ShardID, entity []
 	writeFn := func() (tsdb.Writer, error) {
 		builder := wp.WriterBuilder().Time(t)
 		for fi, family := range value.GetTagFamilies() {
-			spec := sm.GetTagFamilies()[fi]
+			spec := s.schema.GetTagFamilies()[fi]
 			bb, errMarshal := pbv1.EncodeFamily(spec, family)
 			if errMarshal != nil {
 				return nil, errMarshal
 			}
 			builder.Family(familyIdentity(spec.GetName(), pbv1.TagFlag), bb)
 		}
-		if len(value.GetFields()) > len(sm.GetFields()) {
+		if len(value.GetFields()) > len(s.schema.GetFields()) {
 			return nil, errors.Wrap(errMalformedElement, "fields number is more than expected")
 		}
 		for fi, fieldValue := range value.GetFields() {
-			fieldSpec := sm.GetFields()[fi]
+			fieldSpec := s.schema.GetFields()[fi]
 			fType, isNull := pbv1.FieldValueTypeConv(fieldValue)
 			if isNull {
 				s.l.Warn().RawJSON("written", logger.Proto(value)).Msg("ignore null field")
@@ -97,14 +97,14 @@ func (s *measure) write(md *commonv1.Metadata, shardID common.ShardID, entity []
 				s.l.Warn().RawJSON("written", logger.Proto(value)).Msg("ignore unknown field")
 				continue
 			}
-			builder.Family(familyIdentity(sm.GetFields()[fi].GetName(), pbv1.EncoderFieldFlag(fieldSpec, s.interval)), data)
+			builder.Family(familyIdentity(s.schema.GetFields()[fi].GetName(), pbv1.EncoderFieldFlag(fieldSpec, s.interval)), data)
 		}
 		writer, errWrite := builder.Build()
 		if errWrite != nil {
 			return nil, errWrite
 		}
 		_, errWrite = writer.Write()
-		if e := s.l.Named(md.Group, md.Name).Debug(); e.Enabled() {
+		if e := s.l.Named(s.schema.GetMetadata().GetGroup(), s.schema.GetMetadata().GetName()).Debug(); e.Enabled() {
 			e.Time("ts", t).
 				Int("ts_nano", t.Nanosecond()).
 				RawJSON("data", logger.Proto(value)).
@@ -130,10 +130,12 @@ func (s *measure) write(md *commonv1.Metadata, shardID common.ShardID, entity []
 		BlockCloser: wp,
 	}
 	s.indexWriter.Write(m)
-	s.processorManager.onMeasureWrite(&measurev1.WriteRequest{
-		Metadata:  s.GetMetadata(),
-		DataPoint: value,
-	})
+	if s.processorManager != nil {
+		s.processorManager.onMeasureWrite(&measurev1.WriteRequest{
+			Metadata:  s.GetMetadata(),
+			DataPoint: value,
+		})
+	}
 	return err
 }
 
@@ -161,7 +163,7 @@ func (w *writeCallback) Rev(message bus.Message) (resp bus.Message) {
 		w.l.Warn().Msg("cannot find measure definition")
 		return
 	}
-	err := stm.write(writeEvent.GetRequest().GetMetadata(), common.ShardID(writeEvent.GetShardId()),
+	err := stm.write(common.ShardID(writeEvent.GetShardId()),
 		writeEvent.SeriesHash, tsdb.DecodeEntityValues(writeEvent.GetEntityValues()), writeEvent.GetRequest().GetDataPoint())
 	if err != nil {
 		w.l.Error().Err(err).RawJSON("written", logger.Proto(writeEvent)).Msg("fail to write entity")
diff --git a/banyand/measure/service.go b/banyand/measure/service.go
index a606c9fc..15d1bdea 100644
--- a/banyand/measure/service.go
+++ b/banyand/measure/service.go
@@ -145,6 +145,37 @@ func (s *service) Serve() run.StopNotify {
 		RegisterHandler(schema.KindGroup|schema.KindMeasure|schema.KindIndexRuleBinding|schema.KindIndexRule|schema.KindTopNAggregation,
 			&s.schemaRepo)
 
+	// start TopN manager after registering handlers
+	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
+	groups, err := s.metadata.GroupRegistry().ListGroup(ctx)
+	cancel()
+
+	if err != nil {
+		s.l.Err(err).Msg("fail to list groups")
+		return s.stopCh
+	}
+
+	for _, g := range groups {
+		if g.Catalog != commonv1.Catalog_CATALOG_MEASURE {
+			continue
+		}
+		ctx, cancel = context.WithTimeout(context.Background(), 5*time.Second)
+		allMeasureSchemas, listErr := s.metadata.MeasureRegistry().
+			ListMeasure(ctx, schema.ListOpt{Group: g.GetMetadata().GetName()})
+		cancel()
+		if listErr != nil {
+			s.l.Err(listErr).Str("group", g.GetMetadata().GetName()).Msg("fail to list measures in the group")
+			continue
+		}
+		for _, measureSchema := range allMeasureSchemas {
+			if res, ok := s.schemaRepo.LoadResource(measureSchema.GetMetadata()); ok {
+				if startErr := res.(*measure).startSteamingManager(s.pipeline, s.metadata); startErr != nil {
+					s.l.Err(startErr).Str("measure", measureSchema.GetMetadata().GetName()).Msg("fail to start streaming manager")
+				}
+			}
+		}
+	}
+
 	return s.stopCh
 }
 
diff --git a/banyand/metadata/schema/measure.go b/banyand/metadata/schema/measure.go
index 8895b8ba..b4276b71 100644
--- a/banyand/metadata/schema/measure.go
+++ b/banyand/metadata/schema/measure.go
@@ -92,7 +92,7 @@ func (e *etcdSchemaRegistry) CreateMeasure(ctx context.Context, measure *databas
 	for _, tfs := range measure.GetTagFamilies() {
 		for _, ts := range tfs.GetTags() {
 			if ts.Type == databasev1.TagType_TAG_TYPE_ID {
-				for _, e := range measure.Entity.TagNames {
+				for _, e := range measure.GetEntity().GetTagNames() {
 					if ts.Name == e {
 						continue
 					}
diff --git a/pkg/query/logical/measure/measure_analyzer.go b/pkg/query/logical/measure/measure_analyzer.go
index a8f6150b..95368147 100644
--- a/pkg/query/logical/measure/measure_analyzer.go
+++ b/pkg/query/logical/measure/measure_analyzer.go
@@ -36,18 +36,14 @@ func BuildSchema(measureSchema measure.Measure) (logical.Schema, error) {
 	ms := &schema{
 		common: &logical.CommonSchema{
 			IndexRules: measureSchema.GetIndexRules(),
-			TagMap:     make(map[string]*logical.TagSpec),
+			TagSpecMap: make(map[string]*logical.TagSpec),
 			EntityList: md.GetEntity().GetTagNames(),
 		},
 		measure:  md,
 		fieldMap: make(map[string]*logical.FieldSpec),
 	}
 
-	for tagFamilyIdx, tagFamily := range md.GetTagFamilies() {
-		for tagIdx, spec := range tagFamily.GetTags() {
-			ms.registerTag(tagFamilyIdx, tagIdx, spec)
-		}
-	}
+	ms.common.RegisterTagFamilies(md.GetTagFamilies())
 
 	for fieldIdx, spec := range md.GetFields() {
 		ms.registerField(fieldIdx, spec)
diff --git a/pkg/query/logical/measure/schema.go b/pkg/query/logical/measure/schema.go
index 59656612..b389053f 100644
--- a/pkg/query/logical/measure/schema.go
+++ b/pkg/query/logical/measure/schema.go
@@ -27,6 +27,7 @@ import (
 	"github.com/apache/skywalking-banyandb/pkg/query/logical"
 )
 
+// errFieldNotDefined indicates that the field is not defined in the measure schema.
 var errFieldNotDefined = errors.New("field is not defined")
 
 type schema struct {
@@ -43,6 +44,10 @@ func (m *schema) EntityList() []string {
 	return m.common.EntityList
 }
 
+func (m *schema) FindTagSpecByName(name string) *logical.TagSpec {
+	return m.common.FindTagSpecByName(name)
+}
+
 func (m *schema) IndexDefined(tagName string) (bool, *databasev1.IndexRule) {
 	return m.common.IndexDefined(tagName)
 }
@@ -101,16 +106,11 @@ func (m *schema) ProjFields(fieldRefs ...*logical.FieldRef) logical.Schema {
 func (m *schema) Equal(s2 logical.Schema) bool {
 	if other, ok := s2.(*schema); ok {
 		// TODO: add more equality checks
-		return cmp.Equal(other.common.TagMap, m.common.TagMap)
+		return cmp.Equal(other.common.TagSpecMap, m.common.TagSpecMap)
 	}
 	return false
 }
 
-// registerTag registers the tag spec with given tagFamilyIdx and tagIdx.
-func (m *schema) registerTag(tagFamilyIdx, tagIdx int, spec *databasev1.TagSpec) {
-	m.common.RegisterTag(tagFamilyIdx, tagIdx, spec)
-}
-
 // registerField registers the field spec with given index.
 func (m *schema) registerField(fieldIdx int, spec *databasev1.FieldSpec) {
 	m.fieldMap[spec.GetName()] = &logical.FieldSpec{
diff --git a/pkg/query/logical/schema.go b/pkg/query/logical/schema.go
index 4512b9de..e0a14c07 100644
--- a/pkg/query/logical/schema.go
+++ b/pkg/query/logical/schema.go
@@ -24,12 +24,33 @@ import (
 	"github.com/apache/skywalking-banyandb/banyand/tsdb"
 )
 
+// IndexChecker allows checking the existence of a specific index rule.
+type IndexChecker interface {
+	IndexDefined(tagName string) (bool, *databasev1.IndexRule)
+	IndexRuleDefined(ruleName string) (bool, *databasev1.IndexRule)
+}
+
+type emptyIndexChecker struct{}
+
+func (emptyIndexChecker) IndexDefined(_ string) (bool, *databasev1.IndexRule) {
+	return false, nil
+}
+
+func (emptyIndexChecker) IndexRuleDefined(_ string) (bool, *databasev1.IndexRule) {
+	return false, nil
+}
+
+// TagSpecRegistry enables finding a TagSpec by its name.
+type TagSpecRegistry interface {
+	FindTagSpecByName(string) *TagSpec
+}
+
 // Schema allows retrieving schemas in a convenient way.
 type Schema interface {
+	TagSpecRegistry
+	IndexChecker
 	Scope() tsdb.Entry
 	EntityList() []string
-	IndexDefined(tagName string) (bool, *databasev1.IndexRule)
-	IndexRuleDefined(string) (bool, *databasev1.IndexRule)
 	CreateTagRef(tags ...[]*Tag) ([][]*TagRef, error)
 	CreateFieldRef(fields ...*Field) ([]*FieldRef, error)
 	ProjTags(refs ...[]*TagRef) Schema
@@ -50,11 +71,40 @@ func (fs *TagSpec) Equal(other *TagSpec) bool {
 		fs.Spec.GetType() == other.Spec.GetType() && fs.Spec.GetName() == other.Spec.GetName()
 }
 
+// TagSpecMap is a map of TagSpec that implements TagSpecRegistry.
+type TagSpecMap map[string]*TagSpec
+
+// FindTagSpecByName finds TagSpec by its name in the registry.
+func (tagSpecMap TagSpecMap) FindTagSpecByName(name string) *TagSpec {
+	if spec, ok := tagSpecMap[name]; ok {
+		return spec
+	}
+	return nil
+}
+
+// RegisterTagFamilies registers the tag specs with a given slice of TagFamilySpec.
+func (tagSpecMap TagSpecMap) RegisterTagFamilies(tagFamilies []*databasev1.TagFamilySpec) {
+	for tagFamilyIdx, tagFamily := range tagFamilies {
+		for tagIdx, spec := range tagFamily.GetTags() {
+			tagSpecMap.RegisterTag(tagFamilyIdx, tagIdx, spec)
+		}
+	}
+}
+
+// RegisterTag registers the tag spec with the given tagFamilyIdx and tagIdx.
+func (tagSpecMap TagSpecMap) RegisterTag(tagFamilyIdx, tagIdx int, spec *databasev1.TagSpec) {
+	tagSpecMap[spec.GetName()] = &TagSpec{
+		TagIdx:       tagIdx,
+		TagFamilyIdx: tagFamilyIdx,
+		Spec:         spec,
+	}
+}
+
 // CommonSchema represents fields sharable between independent schemas.
 // It provides common access methods at the same time.
 type CommonSchema struct {
+	TagSpecMap
 	IndexRules []*databasev1.IndexRule
-	TagMap     map[string]*TagSpec
 	EntityList []string
 }
 
@@ -65,12 +115,12 @@ func (cs *CommonSchema) ProjTags(refs ...[]*TagRef) *CommonSchema {
 	}
 	newCommonSchema := &CommonSchema{
 		IndexRules: cs.IndexRules,
-		TagMap:     make(map[string]*TagSpec),
+		TagSpecMap: make(map[string]*TagSpec),
 		EntityList: cs.EntityList,
 	}
 	for projFamilyIdx, refInFamily := range refs {
 		for projIdx, ref := range refInFamily {
-			newCommonSchema.TagMap[ref.Tag.getTagName()] = &TagSpec{
+			newCommonSchema.TagSpecMap[ref.Tag.getTagName()] = &TagSpec{
 				TagFamilyIdx: projFamilyIdx,
 				TagIdx:       projIdx,
 				Spec:         ref.Spec.Spec,
@@ -80,15 +130,6 @@ func (cs *CommonSchema) ProjTags(refs ...[]*TagRef) *CommonSchema {
 	return newCommonSchema
 }
 
-// RegisterTag registers the tag spec with given tagFamilyName, tagName and indexes.
-func (cs *CommonSchema) RegisterTag(tagFamilyIdx, tagIdx int, spec *databasev1.TagSpec) {
-	cs.TagMap[spec.GetName()] = &TagSpec{
-		TagIdx:       tagIdx,
-		TagFamilyIdx: tagFamilyIdx,
-		Spec:         spec,
-	}
-}
-
 // IndexDefined checks whether the field given is indexed.
 func (cs *CommonSchema) IndexDefined(tagName string) (bool, *databasev1.IndexRule) {
 	for _, idxRule := range cs.IndexRules {
@@ -119,7 +160,7 @@ func (cs *CommonSchema) CreateRef(tags ...[]*Tag) ([][]*TagRef, error) {
 	for i, tagInFamily := range tags {
 		var tagRefsInFamily []*TagRef
 		for _, tag := range tagInFamily {
-			if ts, ok := cs.TagMap[tag.getTagName()]; ok {
+			if ts, ok := cs.TagSpecMap[tag.getTagName()]; ok {
 				tagRefsInFamily = append(tagRefsInFamily, &TagRef{tag, ts})
 			} else {
 				return nil, errors.Wrap(errTagNotDefined, tag.GetCompoundName())
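
The schema.go hunk above moves tag registration off *CommonSchema and onto the exported
TagSpecMap type, so any component, the TopN processor manager included, can build a
tag-name-to-(tagFamilyIdx, tagIdx) registry straight from a schema and pass it around as a
TagSpecRegistry. A hedged usage sketch, assuming only the types and import paths visible in
this diff:

    package main

    import (
        "fmt"

        databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
        "github.com/apache/skywalking-banyandb/pkg/query/logical"
    )

    func main() {
        // The family name is arbitrary for this sketch; the tag layout mirrors the
        // TopN measure created by createOrUpdateTopNMeasure above.
        families := []*databasev1.TagFamilySpec{
            {
                Name: "default",
                Tags: []*databasev1.TagSpec{
                    {Name: "measure_id", Type: databasev1.TagType_TAG_TYPE_ID},
                    {Name: "group_values", Type: databasev1.TagType_TAG_TYPE_STRING},
                },
            },
        }

        specMap := logical.TagSpecMap{}
        specMap.RegisterTagFamilies(families)

        if spec := specMap.FindTagSpecByName("group_values"); spec != nil {
            fmt.Println(spec.TagFamilyIdx, spec.TagIdx) // 0 1
        }
    }
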
diff --git a/pkg/query/logical/stream/schema.go b/pkg/query/logical/stream/schema.go
index fc7bf59b..47f468d5 100644
--- a/pkg/query/logical/stream/schema.go
+++ b/pkg/query/logical/stream/schema.go
@@ -33,6 +33,10 @@ type schema struct {
 	common *logical.CommonSchema
 }
 
+func (s *schema) FindTagSpecByName(name string) *logical.TagSpec {
+	return s.common.FindTagSpecByName(name)
+}
+
 func (s *schema) CreateFieldRef(_ ...*logical.Field) ([]*logical.FieldRef, error) {
 	panic("no field for stream")
 }
@@ -52,16 +56,11 @@ func (s *schema) IndexDefined(tagName string) (bool, *databasev1.IndexRule) {
 
 func (s *schema) Equal(s2 logical.Schema) bool {
 	if other, ok := s2.(*schema); ok {
-		return cmp.Equal(other.common.TagMap, s.common.TagMap)
+		return cmp.Equal(other.common.TagSpecMap, s.common.TagSpecMap)
 	}
 	return false
 }
 
-// registerTag registers the tag spec with given tagFamilyName, tagName and indexes.
-func (s *schema) registerTag(tagFamilyIdx, tagIdx int, spec *databasev1.TagSpec) {
-	s.common.RegisterTag(tagFamilyIdx, tagIdx, spec)
-}
-
 // CreateTagRef create TagRef to the given tags.
 // The family name of the tag is actually not used
 // since the uniqueness of the tag names can be guaranteed across families.
diff --git a/pkg/query/logical/stream/stream_analyzer.go b/pkg/query/logical/stream/stream_analyzer.go
index 5972caa1..4771eeab 100644
--- a/pkg/query/logical/stream/stream_analyzer.go
+++ b/pkg/query/logical/stream/stream_analyzer.go
@@ -37,18 +37,13 @@ func BuildSchema(streamSchema stream.Stream) (logical.Schema, error) {
 	s := &schema{
 		common: &logical.CommonSchema{
 			IndexRules: streamSchema.GetIndexRules(),
-			TagMap:     make(map[string]*logical.TagSpec),
+			TagSpecMap: make(map[string]*logical.TagSpec),
 			EntityList: sm.GetEntity().GetTagNames(),
 		},
 		stream: sm,
 	}
 
-	// generate the streamSchema of the fields for the traceSeries
-	for tagFamilyIdx, tagFamily := range sm.GetTagFamilies() {
-		for tagIdx, spec := range tagFamily.GetTags() {
-			s.registerTag(tagFamilyIdx, tagIdx, spec)
-		}
-	}
+	s.common.RegisterTagFamilies(sm.GetTagFamilies())
 
 	return s, nil
 }
diff --git a/pkg/query/logical/stream/stream_plan_tag_filter.go b/pkg/query/logical/stream/stream_plan_tag_filter.go
index d74c3654..4ea0b3bf 100644
--- a/pkg/query/logical/stream/stream_plan_tag_filter.go
+++ b/pkg/query/logical/stream/stream_plan_tag_filter.go
@@ -85,7 +85,8 @@ func (uis *unresolvedTagFilter) Analyze(s logical.Schema) (logical.Plan, error)
 			return nil, errFilter
 		}
 		if tagFilter != logical.DummyFilter {
-			plan = newTagFilter(s, plan, tagFilter)
+			// create tagFilter with a projected view
+			plan = newTagFilter(s.ProjTags(ctx.projTagsRefs...), plan, tagFilter)
 		}
 	}
 	return plan, err
@@ -167,7 +168,7 @@ func (t *tagFilterPlan) Execute(ec executor.StreamExecutionContext) ([]*streamv1
 	}
 	filteredElements := make([]*streamv1.Element, 0)
 	for _, e := range entities {
-		ok, err := t.tagFilter.Match(e.TagFamilies)
+		ok, err := t.tagFilter.Match(logical.TagFamilies(e.TagFamilies), t.s)
 		if err != nil {
 			return nil, err
 		}
diff --git a/pkg/query/logical/tag_filter.go b/pkg/query/logical/tag_filter.go
index cafd6658..8e4fdce2 100644
--- a/pkg/query/logical/tag_filter.go
+++ b/pkg/query/logical/tag_filter.go
@@ -29,14 +29,55 @@ import (
 
 var errUnsupportedLogicalOperation = errors.New("unsupported logical operation")
 
+// TagValueIndexAccessor provides an accessor to get a TagValue by two indexes, i.e. tagFamilyIndex and tagIndex.
+// It works like a matrix.
+type TagValueIndexAccessor interface {
+	GetTagValue(tagFamilyIdx, tagIdx int) *modelv1.TagValue
+}
+
+// TagFamilies wraps a slice of TagFamily.
+type TagFamilies []*modelv1.TagFamily
+
+// GetTagValue gets TagValue from the underlying TagFamily slice.
+func (tfs TagFamilies) GetTagValue(tagFamilyIdx, tagIdx int) *modelv1.TagValue {
+	if len(tfs)-1 < tagFamilyIdx {
+		return nil
+	}
+	tags := tfs[tagFamilyIdx].GetTags()
+	if len(tags)-1 < tagIdx {
+		return nil
+	}
+	return tags[tagIdx].GetValue()
+}
+
+// TagFamiliesForWrite wraps a slice of TagFamilyForWrite.
+type TagFamiliesForWrite []*modelv1.TagFamilyForWrite
+
+// GetTagValue gets TagValue from the underlying TagFamilyForWrite slice.
+func (tffws TagFamiliesForWrite) GetTagValue(tagFamilyIdx, tagIdx int) *modelv1.TagValue {
+	if len(tffws)-1 < tagFamilyIdx {
+		return nil
+	}
+	tagVals := tffws[tagFamilyIdx].GetTags()
+	if len(tagVals)-1 < tagIdx {
+		return nil
+	}
+	return tagVals[tagIdx]
+}
+
 // TagFilter allows matching a tag based on a predicate.
 type TagFilter interface {
 	fmt.Stringer
-	Match(tagFamilies []*modelv1.TagFamily) (bool, error)
+	Match(accessor TagValueIndexAccessor, registry TagSpecRegistry) (bool, error)
+}
+
+// BuildSimpleTagFilter returns a TagFilter without any local-index, global-index, or sharding-key support.
+func BuildSimpleTagFilter(criteria *modelv1.Criteria) (TagFilter, error) {
+	return BuildTagFilter(criteria, nil, emptyIndexChecker{}, false)
 }
 
 // BuildTagFilter returns a TagFilter if the predicates don't match any indices.
-func BuildTagFilter(criteria *modelv1.Criteria, entityDict map[string]int, schema Schema, hasGlobalIndex bool) (TagFilter, error) {
+func BuildTagFilter(criteria *modelv1.Criteria, entityDict map[string]int, indexChecker IndexChecker, hasGlobalIndex bool) (TagFilter, error) {
 	if criteria == nil {
 		return DummyFilter, nil
 	}
@@ -47,7 +88,7 @@ func BuildTagFilter(criteria *modelv1.Criteria, entityDict map[string]int, schem
 		if err != nil {
 			return nil, err
 		}
-		if ok, _ := schema.IndexDefined(cond.Name); ok {
+		if ok, _ := indexChecker.IndexDefined(cond.Name); ok {
 			return DummyFilter, nil
 		}
 		if _, ok := entityDict[cond.Name]; ok {
@@ -56,11 +97,11 @@ func BuildTagFilter(criteria *modelv1.Criteria, entityDict map[string]int, schem
 		return parseFilter(cond, expr)
 	case *modelv1.Criteria_Le:
 		le := criteria.GetLe()
-		left, err := BuildTagFilter(le.Left, entityDict, schema, hasGlobalIndex)
+		left, err := BuildTagFilter(le.Left, entityDict, indexChecker, hasGlobalIndex)
 		if err != nil {
 			return nil, err
 		}
-		right, err := BuildTagFilter(le.Right, entityDict, schema, hasGlobalIndex)
+		right, err := BuildTagFilter(le.Right, entityDict, indexChecker, hasGlobalIndex)
 		if err != nil {
 			return nil, err
 		}
@@ -150,7 +191,9 @@ var DummyFilter = new(dummyTagFilter)
 
 type dummyTagFilter struct{}
 
-func (dummyTagFilter) Match(_ []*modelv1.TagFamily) (bool, error) { return true, nil }
+func (dummyTagFilter) Match(_ TagValueIndexAccessor, _ TagSpecRegistry) (bool, error) {
+	return true, nil
+}
 
 func (dummyTagFilter) String() string { return "dummy" }
 
@@ -171,10 +214,10 @@ func (n *logicalNode) append(sub TagFilter) *logicalNode {
 	return n
 }
 
-func matchTag(tagFamilies []*modelv1.TagFamily, n *logicalNode, lp logicalNodeOP) (bool, error) {
+func matchTag(accessor TagValueIndexAccessor, registry TagSpecRegistry, n *logicalNode, lp logicalNodeOP) (bool, error) {
 	var result *bool
 	for _, sn := range n.SubNodes {
-		r, err := sn.Match(tagFamilies)
+		r, err := sn.Match(accessor, registry)
 		if err != nil {
 			return false, err
 		}
@@ -209,8 +252,8 @@ func (an *andLogicalNode) merge(bb ...bool) bool {
 	return true
 }
 
-func (an *andLogicalNode) Match(tagFamilies []*modelv1.TagFamily) (bool, error) {
-	return matchTag(tagFamilies, an.logicalNode, an)
+func (an *andLogicalNode) Match(accessor TagValueIndexAccessor, registry TagSpecRegistry) (bool, error) {
+	return matchTag(accessor, registry, an.logicalNode, an)
 }
 
 func (an *andLogicalNode) MarshalJSON() ([]byte, error) {
@@ -244,8 +287,8 @@ func (on *orLogicalNode) merge(bb ...bool) bool {
 	return false
 }
 
-func (on *orLogicalNode) Match(tagFamilies []*modelv1.TagFamily) (bool, error) {
-	return matchTag(tagFamilies, on.logicalNode, on)
+func (on *orLogicalNode) Match(accessor TagValueIndexAccessor, registry TagSpecRegistry) (bool, error) {
+	return matchTag(accessor, registry, on.logicalNode, on)
 }
 
 func (on *orLogicalNode) MarshalJSON() ([]byte, error) {
@@ -282,8 +325,8 @@ func newNotTag(inner TagFilter) *notTag {
 	}
 }
 
-func (n *notTag) Match(tagFamilies []*modelv1.TagFamily) (bool, error) {
-	b, err := n.Inner.Match(tagFamilies)
+func (n *notTag) Match(accessor TagValueIndexAccessor, registry TagSpecRegistry) (bool, error) {
+	b, err := n.Inner.Match(accessor, registry)
 	if err != nil {
 		return false, err
 	}
@@ -313,8 +356,8 @@ func newInTag(tagName string, values LiteralExpr) *inTag {
 	}
 }
 
-func (h *inTag) Match(tagFamilies []*modelv1.TagFamily) (bool, error) {
-	expr, err := tagExpr(tagFamilies, h.Name)
+func (h *inTag) Match(accessor TagValueIndexAccessor, registry TagSpecRegistry) (bool, error) {
+	expr, err := tagExpr(accessor, registry, h.Name)
 	if err != nil {
 		return false, err
 	}
@@ -334,8 +377,8 @@ func newEqTag(tagName string, values LiteralExpr) *eqTag {
 	}
 }
 
-func (eq *eqTag) Match(tagFamilies []*modelv1.TagFamily) (bool, error) {
-	expr, err := tagExpr(tagFamilies, eq.Name)
+func (eq *eqTag) Match(accessor TagValueIndexAccessor, registry TagSpecRegistry) (bool, error) {
+	expr, err := tagExpr(accessor, registry, eq.Name)
 	if err != nil {
 		return false, err
 	}
@@ -373,8 +416,8 @@ func newRangeTag(tagName string, opts rangeOpts) *rangeTag {
 	}
 }
 
-func (r *rangeTag) Match(tagFamilies []*modelv1.TagFamily) (bool, error) {
-	expr, err := tagExpr(tagFamilies, r.Name)
+func (r *rangeTag) Match(accessor TagValueIndexAccessor, registry TagSpecRegistry) (bool, error) {
+	expr, err := tagExpr(accessor, registry, r.Name)
 	if err != nil {
 		return false, err
 	}
@@ -442,12 +485,10 @@ func (r *rangeTag) String() string {
 	return jsonToString(r)
 }
 
-func tagExpr(tagFamilies []*modelv1.TagFamily, tagName string) (ComparableExpr, error) {
-	for _, tf := range tagFamilies {
-		for _, t := range tf.Tags {
-			if t.Key == tagName {
-				return parseExpr(t.Value)
-			}
+func tagExpr(accessor TagValueIndexAccessor, registry TagSpecRegistry, tagName string) (ComparableExpr, error) {
+	if tagSpec := registry.FindTagSpecByName(tagName); tagSpec != nil {
+		if tagVal := accessor.GetTagValue(tagSpec.TagFamilyIdx, tagSpec.TagIdx); tagVal != nil {
+			return parseExpr(tagVal)
 		}
 	}
 	return nil, errTagNotDefined
@@ -466,8 +507,8 @@ func newHavingTag(tagName string, values LiteralExpr) *havingTag {
 	}
 }
 
-func (h *havingTag) Match(tagFamilies []*modelv1.TagFamily) (bool, error) {
-	expr, err := tagExpr(tagFamilies, h.Name)
+func (h *havingTag) Match(accessor TagValueIndexAccessor, registry TagSpecRegistry) (bool, error) {
+	expr, err := tagExpr(accessor, registry, h.Name)
 	if err != nil {
 		return false, err
 	}
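
Putting the pieces together: the streaming path can now reuse exactly the filter machinery the
query planner uses. topNProcessorManager.buildFilter builds a filter from the TopN criteria
with BuildSimpleTagFilter and matches it against write-path tag families, with a TagSpecMap as
the registry. A hedged end-to-end sketch of that flow, assuming the protobuf field names
implied by the getters appearing in this diff:

    package main

    import (
        "fmt"

        databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
        modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
        "github.com/apache/skywalking-banyandb/pkg/query/logical"
    )

    func main() {
        // Criteria: service == "svc-a".
        criteria := &modelv1.Criteria{
            Exp: &modelv1.Criteria_Condition{
                Condition: &modelv1.Condition{
                    Name: "service",
                    Op:   modelv1.Condition_BINARY_OP_EQ,
                    Value: &modelv1.TagValue{
                        Value: &modelv1.TagValue_Str{Str: &modelv1.Str{Value: "svc-a"}},
                    },
                },
            },
        }
        filter, err := logical.BuildSimpleTagFilter(criteria)
        if err != nil {
            panic(err)
        }

        // Registry: "service" lives at tag family 0, tag 0.
        registry := logical.TagSpecMap{}
        registry.RegisterTag(0, 0, &databasev1.TagSpec{Name: "service", Type: databasev1.TagType_TAG_TYPE_STRING})

        // Write-path tag families carry positional values only; the registry
        // supplies the positions the filter looks up by name.
        families := logical.TagFamiliesForWrite{
            {Tags: []*modelv1.TagValue{
                {Value: &modelv1.TagValue_Str{Str: &modelv1.Str{Value: "svc-a"}}},
            }},
        }

        ok, matchErr := filter.Match(families, registry)
        fmt.Println(ok, matchErr) // true <nil>
    }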