You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by ha...@apache.org on 2023/03/19 12:58:08 UTC

[skywalking-banyandb] branch main updated: [OAP Integration] Register missing TopN registry service (#257)

This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git


The following commit(s) were added to refs/heads/main by this push:
     new 6be427d0 [OAP Integration] Register missing TopN registry service (#257)
6be427d0 is described below

commit 6be427d092b28e5dd31e5e3b181b775fe088e47a
Author: Jiajing LU <lu...@gmail.com>
AuthorDate: Sun Mar 19 20:58:02 2023 +0800

    [OAP Integration] Register missing TopN registry service (#257)
    
    * register missing service
    
    * support multiple groups in topn and use seriesID as identity
    
    * add null group
---
 CHANGES.md                                         |   1 +
 api/proto/banyandb/measure/v1/topn.proto           |   2 +-
 banyand/liaison/grpc/measure.go                    |  10 +-
 banyand/liaison/grpc/server.go                     |   1 +
 banyand/measure/measure_topn.go                    | 136 +++++++-----
 banyand/measure/measure_write.go                   |   9 +-
 banyand/query/processor_topn.go                    |  99 ++++++---
 banyand/tsdb/seriesdb.go                           |   6 +-
 docs/api-reference.md                              |   2 +-
 pkg/flow/streaming/streaming_test.go               |  30 ++-
 pkg/flow/streaming/topn.go                         | 109 ++++++----
 pkg/flow/streaming/topn_test.go                    |  73 +++++--
 pkg/flow/types.go                                  |   6 +
 .../service_instance_endpoint_cpm_minute.json      |  51 +++++
 .../service_cpm_minute_nogroup_top100.json         |  14 --
 ...ervice_instance_cpm_minute_top_bottom_100.json} |   6 +-
 ...stance_endpoint_cpm_minute_top_bottom_100.json} |   6 +-
 .../testdata/service_instance_cpm_minute_data.json | 212 +++++++++++++++++++
 .../service_instance_cpm_minute_data1.json         | 212 +++++++++++++++++++
 .../service_instance_cpm_minute_data2.json         |  72 +++++++
 .../service_instance_endpoint_cpm_minute_data.json | 228 +++++++++++++++++++++
 ...service_instance_endpoint_cpm_minute_data1.json | 228 +++++++++++++++++++++
 ...service_instance_endpoint_cpm_minute_data2.json |  82 ++++++++
 test/cases/topn/data/input/aggr_desc.yaml          |   2 +-
 test/cases/topn/data/input/asc.yaml                |   4 +-
 .../cases/topn/data/input/condition_aggr_desc.yaml |   6 +-
 test/cases/topn/data/input/desc.yaml               |   4 +-
 .../{condition_aggr_desc.yaml => null_group.yaml}  |   8 +-
 test/cases/topn/data/want/aggr_desc.yaml           |  30 ++-
 test/cases/topn/data/want/asc.yaml                 | 132 +++++++++++-
 test/cases/topn/data/want/condition_aggr_desc.yaml |  12 +-
 test/cases/topn/data/want/desc.yaml                | 132 +++++++++++-
 .../data/want/{aggr_desc.yaml => null_group.yaml}  |  32 ++-
 test/cases/topn/topn.go                            |   1 +
 test/integration/cold_query/query_suite_test.go    |   6 +
 test/integration/query/query_suite_test.go         |   6 +
 36 files changed, 1762 insertions(+), 208 deletions(-)

diff --git a/CHANGES.md b/CHANGES.md
index 0ff0aefd..d4df9af1 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -8,6 +8,7 @@ Release Notes.
 
 - Add TSDB concept document.
 - [UI] Add YAML editor for inputting query criteria.
+- Refactor TopN to support `NULL` group while keeping seriesID from the source measure.
 
 ### Chores
 
diff --git a/api/proto/banyandb/measure/v1/topn.proto b/api/proto/banyandb/measure/v1/topn.proto
index 53644510..f7b426a1 100644
--- a/api/proto/banyandb/measure/v1/topn.proto
+++ b/api/proto/banyandb/measure/v1/topn.proto
@@ -33,7 +33,7 @@ message TopNList {
   // timestamp is in the timeunit of milliseconds.
   google.protobuf.Timestamp timestamp = 1;
   message Item {
-    string name = 1;
+    repeated model.v1.Tag entity = 1;
     model.v1.FieldValue value = 2;
   }
   // items contains top-n items in a list
diff --git a/banyand/liaison/grpc/measure.go b/banyand/liaison/grpc/measure.go
index 9d72fa6b..34f746a9 100644
--- a/banyand/liaison/grpc/measure.go
+++ b/banyand/liaison/grpc/measure.go
@@ -79,12 +79,10 @@ func (ms *measureService) Write(measure measurev1.MeasureService_WriteServer) er
 			continue
 		}
 		iwr := &measurev1.InternalWriteRequest{
-			Request:    writeRequest,
-			ShardId:    uint32(shardID),
-			SeriesHash: tsdb.HashEntity(entity),
-		}
-		if ms.log.Debug().Enabled() {
-			iwr.EntityValues = tagValues.Encode()
+			Request:      writeRequest,
+			ShardId:      uint32(shardID),
+			SeriesHash:   tsdb.HashEntity(entity),
+			EntityValues: tagValues.Encode(),
 		}
 		message := bus.NewMessage(bus.MessageID(time.Now().UnixNano()), iwr)
 		_, errWritePub := ms.pipeline.Publish(data.TopicMeasureWrite, message)
diff --git a/banyand/liaison/grpc/server.go b/banyand/liaison/grpc/server.go
index 4758585d..057486dc 100644
--- a/banyand/liaison/grpc/server.go
+++ b/banyand/liaison/grpc/server.go
@@ -199,6 +199,7 @@ func (s *server) Serve() run.StopNotify {
 	databasev1.RegisterStreamRegistryServiceServer(s.ser, s.streamRegistryServer)
 	databasev1.RegisterMeasureRegistryServiceServer(s.ser, s.measureRegistryServer)
 	propertyv1.RegisterPropertyServiceServer(s.ser, s.propertyServer)
+	databasev1.RegisterTopNAggregationRegistryServiceServer(s.ser, s.topNAggregationRegistryServer)
 	grpc_health_v1.RegisterHealthServer(s.ser, health.NewServer())
 
 	s.stopCh = make(chan struct{})
diff --git a/banyand/measure/measure_topn.go b/banyand/measure/measure_topn.go
index ac207ae0..1b5e6cd3 100644
--- a/banyand/measure/measure_topn.go
+++ b/banyand/measure/measure_topn.go
@@ -20,6 +20,7 @@ package measure
 import (
 	"context"
 	"encoding/base64"
+	"fmt"
 	"io"
 	"strconv"
 	"strings"
@@ -73,6 +74,11 @@ var (
 	}
 )
 
+type dataPointWithEntityValues struct {
+	*measurev1.DataPointValue
+	entityValues tsdb.EntityValues
+}
+
 type topNStreamingProcessor struct {
 	m             *measure
 	streamingFlow flow.Flow
@@ -134,7 +140,7 @@ func (t *topNStreamingProcessor) Close() error {
 }
 
 func (t *topNStreamingProcessor) writeStreamRecord(record flow.StreamRecord) error {
-	tuples, ok := record.Data().([]*streaming.Tuple2)
+	tuplesGroups, ok := record.Data().(map[string][]*streaming.Tuple2)
 	if !ok {
 		return errors.New("invalid data type")
 	}
@@ -142,24 +148,29 @@ func (t *topNStreamingProcessor) writeStreamRecord(record flow.StreamRecord) err
 	eventTime := t.downSampleTimeBucket(record.TimestampMillis())
 	timeBucket := eventTime.Format(timeBucketFormat)
 	var err error
-	if e := t.l.Debug(); e.Enabled() {
-		e.Str("TopN", t.topNSchema.GetMetadata().GetName()).
-			Int("rankNums", len(tuples)).
-			Msg("Write a tuple")
-	}
-	for rankNum, tuple := range tuples {
-		fieldValue := tuple.V1.(int64)
-		data := tuple.V2.(flow.StreamRecord).Data().(flow.Data)
-		err = multierr.Append(err, t.writeData(eventTime, timeBucket, fieldValue, data, rankNum))
+	for group, tuples := range tuplesGroups {
+		if e := t.l.Debug(); e.Enabled() {
+			e.Str("TopN", t.topNSchema.GetMetadata().GetName()).
+				Str("group", group).
+				Int("rankNums", len(tuples)).
+				Msg("Write tuples")
+		}
+		for rankNum, tuple := range tuples {
+			fieldValue := tuple.V1.(int64)
+			data := tuple.V2.(flow.StreamRecord).Data().(flow.Data)
+			err = multierr.Append(err, t.writeData(eventTime, timeBucket, fieldValue, group, data, rankNum))
+		}
 	}
 	return err
 }
 
-func (t *topNStreamingProcessor) writeData(eventTime time.Time, timeBucket string, fieldValue int64, data flow.Data, rankNum int) error {
+func (t *topNStreamingProcessor) writeData(eventTime time.Time, timeBucket string, fieldValue int64,
+	group string, data flow.Data, rankNum int,
+) error {
 	var tagValues []*modelv1.TagValue
 	if len(t.topNSchema.GetGroupByTagNames()) > 0 {
 		var ok bool
-		if tagValues, ok = data[2].([]*modelv1.TagValue); !ok {
+		if tagValues, ok = data[3].([]*modelv1.TagValue); !ok {
 			return errors.New("fail to extract tag values from topN result")
 		}
 	}
@@ -172,7 +183,7 @@ func (t *topNStreamingProcessor) writeData(eventTime time.Time, timeBucket strin
 	// 1. groupValues
 	// 2. rankNumber
 	// 3. timeBucket
-	measureID := data[0].(string) + "_" + strconv.Itoa(rankNum) + "_" + timeBucket
+	measureID := group + "_" + strconv.Itoa(rankNum) + "_" + timeBucket
 	iwr := &measurev1.InternalWriteRequest{
 		Request: &measurev1.WriteRequest{
 			Metadata: t.topNSchema.GetMetadata(),
@@ -180,7 +191,7 @@ func (t *topNStreamingProcessor) writeData(eventTime time.Time, timeBucket strin
 				Timestamp: timestamppb.New(eventTime),
 				TagFamilies: []*modelv1.TagFamilyForWrite{
 					{
-						Tags: []*modelv1.TagValue{
+						Tags: append([]*modelv1.TagValue{
 							// MeasureID
 							{
 								Value: &modelv1.TagValue_Id{
@@ -189,15 +200,7 @@ func (t *topNStreamingProcessor) writeData(eventTime time.Time, timeBucket strin
 									},
 								},
 							},
-							// GroupValues for merge in post processor
-							{
-								Value: &modelv1.TagValue_Str{
-									Str: &modelv1.Str{
-										Value: data[0].(string),
-									},
-								},
-							},
-						},
+						}, data[0].(tsdb.EntityValues)...),
 					},
 				},
 				Fields: []*modelv1.FieldValue{
@@ -227,14 +230,15 @@ func (t *topNStreamingProcessor) downSampleTimeBucket(eventTimeMillis int64) tim
 }
 
 func (t *topNStreamingProcessor) locate(tagValues []*modelv1.TagValue, rankNum int) (tsdb.Entity, tsdb.EntityValues, common.ShardID, error) {
-	if len(t.topNSchema.GetGroupByTagNames()) != len(tagValues) {
+	if len(tagValues) != 0 && len(t.topNSchema.GetGroupByTagNames()) != len(tagValues) {
 		return nil, nil, 0, errors.New("no enough tag values for the entity")
 	}
 	// entity prefix
 	// 1) source measure Name + topN aggregation Name
 	// 2) sort direction
 	// 3) rank number
-	entity := make(tsdb.EntityValues, 1+1+1+len(t.topNSchema.GetGroupByTagNames()))
+	// 4) and onwards: group tag values, if any
+	entity := make(tsdb.EntityValues, 1+1+1+len(tagValues))
 	// entity prefix
 	entity[0] = tsdb.StrValue(formatMeasureCompanionPrefix(t.topNSchema.GetSourceMeasure().GetName(),
 		t.topNSchema.GetMetadata().GetName()))
@@ -260,9 +264,12 @@ func (t *topNStreamingProcessor) start() *topNStreamingProcessor {
 		AllowedMaxWindows(int(t.topNSchema.GetLruSize())).
 		TopN(int(t.topNSchema.GetCountersNumber()),
 			streaming.WithSortKeyExtractor(func(record flow.StreamRecord) int64 {
-				return record.Data().(flow.Data)[1].(int64)
+				return record.Data().(flow.Data)[2].(int64)
 			}),
 			orderBy(t.topNSchema.GetFieldValueSort()),
+			streaming.WithGroupKeyExtractor(func(record flow.StreamRecord) string {
+				return record.Data().(flow.Data)[1].(string)
+			}),
 		).To(t).Open()
 	go t.handleError()
 	return t
@@ -307,13 +314,16 @@ func (manager *topNProcessorManager) Close() error {
 	return err
 }
 
-func (manager *topNProcessorManager) onMeasureWrite(request *measurev1.WriteRequest) {
+func (manager *topNProcessorManager) onMeasureWrite(request *measurev1.InternalWriteRequest) {
 	go func() {
 		manager.RLock()
 		defer manager.RUnlock()
 		for _, processorList := range manager.processorMap {
 			for _, processor := range processorList {
-				processor.src <- flow.NewStreamRecordWithTimestampPb(request.GetDataPoint(), request.GetDataPoint().GetTimestamp())
+				processor.src <- flow.NewStreamRecordWithTimestampPb(&dataPointWithEntityValues{
+					request.GetRequest().GetDataPoint(),
+					request.GetEntityValues(),
+				}, request.GetRequest().GetDataPoint().GetTimestamp())
 			}
 		}
 	}()
@@ -325,6 +335,27 @@ func (manager *topNProcessorManager) createOrUpdateTopNMeasure(topNSchema *datab
 		return err
 	}
 
+	tagNames := manager.m.GetSchema().GetEntity().GetTagNames()
+	seriesSpecs := make([]*databasev1.TagSpec, 0, len(tagNames))
+
+	for _, tagName := range tagNames {
+		var found bool
+		for _, fSpec := range manager.m.GetSchema().GetTagFamilies() {
+			for _, tSpec := range fSpec.GetTags() {
+				if tSpec.GetName() == tagName {
+					seriesSpecs = append(seriesSpecs, tSpec)
+					found = true
+					goto CHECK
+				}
+			}
+		}
+
+	CHECK:
+		if !found {
+			return fmt.Errorf("fail to find tag spec %s", tagName)
+		}
+	}
+
 	// create a new "derived" measure for TopN result
 	newTopNMeasure := &databasev1.Measure{
 		Metadata: topNSchema.GetMetadata(),
@@ -332,16 +363,12 @@ func (manager *topNProcessorManager) createOrUpdateTopNMeasure(topNSchema *datab
 		TagFamilies: []*databasev1.TagFamilySpec{
 			{
 				Name: TopNTagFamily,
-				Tags: []*databasev1.TagSpec{
+				Tags: append([]*databasev1.TagSpec{
 					{
 						Name: "measure_id",
 						Type: databasev1.TagType_TAG_TYPE_ID,
 					},
-					{
-						Name: "group_values",
-						Type: databasev1.TagType_TAG_TYPE_STRING,
-					},
-				},
+				}, seriesSpecs...),
 			},
 		},
 		Fields: []*databasev1.FieldSpec{TopNValueFieldSpec},
@@ -425,8 +452,8 @@ func (manager *topNProcessorManager) buildFilter(criteria *modelv1.Criteria) (fl
 		return nil, err
 	}
 
-	return func(_ context.Context, dataPoint any) bool {
-		tffws := dataPoint.(*measurev1.DataPointValue).GetTagFamilies()
+	return func(_ context.Context, request any) bool {
+		tffws := request.(*dataPointWithEntityValues).GetTagFamilies()
 		ok, matchErr := f.Match(logical.TagFamiliesForWrite(tffws), manager.s)
 		if matchErr != nil {
 			manager.l.Err(matchErr).Msg("fail to match criteria")
@@ -445,20 +472,22 @@ func (manager *topNProcessorManager) buildMapper(fieldName string, groupByNames
 	}
 	if len(groupByNames) == 0 {
 		return func(_ context.Context, request any) any {
-			dataPoint := request.(*measurev1.DataPointValue)
-			if len(dataPoint.GetFields()) <= fieldIdx {
-				manager.l.Warn().Interface("point", dataPoint).
+			dpWithEvs := request.(*dataPointWithEntityValues)
+			if len(dpWithEvs.GetFields()) <= fieldIdx {
+				manager.l.Warn().Interface("point", dpWithEvs.DataPointValue).
 					Str("fieldName", fieldName).
-					Int("len", len(dataPoint.GetFields())).
+					Int("len", len(dpWithEvs.GetFields())).
 					Int("fieldIdx", fieldIdx).
 					Msg("out of range")
 			}
 			return flow.Data{
+				// EntityValues as identity
+				dpWithEvs.entityValues,
 				// save string representation of group values as the key, i.e. v1
 				"",
 				// field value as v2
 				// TODO: we only support int64
-				dataPoint.GetFields()[fieldIdx].GetInt().GetValue(),
+				dpWithEvs.GetFields()[fieldIdx].GetInt().GetValue(),
 				// groupBy tag values as v3
 				nil,
 			}
@@ -469,18 +498,20 @@ func (manager *topNProcessorManager) buildMapper(fieldName string, groupByNames
 		return nil, err
 	}
 	return func(_ context.Context, request any) any {
-		dataPoint := request.(*measurev1.DataPointValue)
+		dpWithEvs := request.(*dataPointWithEntityValues)
 		return flow.Data{
+			// EntityValues as identity
+			dpWithEvs.entityValues,
 			// save string representation of group values as the key, i.e. v1
 			strings.Join(transform(groupLocator, func(locator partition.TagLocator) string {
-				return stringify(dataPoint.GetTagFamilies()[locator.FamilyOffset].GetTags()[locator.TagOffset])
+				return stringify(extractTagValue(dpWithEvs.DataPointValue, locator))
 			}), "|"),
 			// field value as v2
 			// TODO: we only support int64
-			dataPoint.GetFields()[fieldIdx].GetInt().GetValue(),
+			dpWithEvs.GetFields()[fieldIdx].GetInt().GetValue(),
 			// groupBy tag values as v3
 			transform(groupLocator, func(locator partition.TagLocator) *modelv1.TagValue {
-				return dataPoint.GetTagFamilies()[locator.FamilyOffset].GetTags()[locator.TagOffset]
+				return extractTagValue(dpWithEvs.DataPointValue, locator)
 			}),
 		}
 	}, nil
@@ -495,7 +526,7 @@ func newGroupLocator(m *databasev1.Measure, groupByNames []string) (groupTagsLoc
 	for _, groupByName := range groupByNames {
 		fIdx, tIdx, spec := pbv1.FindTagByName(m.GetTagFamilies(), groupByName)
 		if spec == nil {
-			return nil, errors.New("tag is not found")
+			return nil, fmt.Errorf("tag %s is not found", groupByName)
 		}
 		groupTags = append(groupTags, partition.TagLocator{
 			FamilyOffset: fIdx,
@@ -505,6 +536,17 @@ func newGroupLocator(m *databasev1.Measure, groupByNames []string) (groupTagsLoc
 	return groupTags, nil
 }
 
+func extractTagValue(dpv *measurev1.DataPointValue, locator partition.TagLocator) *modelv1.TagValue {
+	if locator.FamilyOffset >= len(dpv.GetTagFamilies()) {
+		return &modelv1.TagValue{Value: &modelv1.TagValue_Null{}}
+	}
+	tagFamily := dpv.GetTagFamilies()[locator.FamilyOffset]
+	if locator.TagOffset >= len(tagFamily.GetTags()) {
+		return &modelv1.TagValue{Value: &modelv1.TagValue_Null{}}
+	}
+	return tagFamily.GetTags()[locator.TagOffset]
+}
+
 func stringify(tagValue *modelv1.TagValue) string {
 	switch v := tagValue.GetValue().(type) {
 	case *modelv1.TagValue_Str:
@@ -522,7 +564,7 @@ func stringify(tagValue *modelv1.TagValue) string {
 	case *modelv1.TagValue_StrArray:
 		return strings.Join(v.StrArray.GetValue(), ",")
 	default:
-		return "<nil>"
+		return ""
 	}
 }
 
diff --git a/banyand/measure/measure_write.go b/banyand/measure/measure_write.go
index 2c2eb233..b3615741 100644
--- a/banyand/measure/measure_write.go
+++ b/banyand/measure/measure_write.go
@@ -131,9 +131,12 @@ func (s *measure) write(shardID common.ShardID, entity []byte, entityValues tsdb
 	}
 	s.indexWriter.Write(m)
 	if s.processorManager != nil {
-		s.processorManager.onMeasureWrite(&measurev1.WriteRequest{
-			Metadata:  s.GetMetadata(),
-			DataPoint: value,
+		s.processorManager.onMeasureWrite(&measurev1.InternalWriteRequest{
+			Request: &measurev1.WriteRequest{
+				Metadata:  s.GetMetadata(),
+				DataPoint: value,
+			},
+			EntityValues: entityValues[1:],
 		})
 	}
 	return err
diff --git a/banyand/query/processor_topn.go b/banyand/query/processor_topn.go
index e72520af..336dc8e6 100644
--- a/banyand/query/processor_topn.go
+++ b/banyand/query/processor_topn.go
@@ -130,7 +130,7 @@ func (t *topNQueryProcessor) Rev(message bus.Message) (resp bus.Message) {
 							Msg("fail to parse topN family")
 						return
 					}
-					_ = aggregator.put(tuple.V1.(string), tuple.V2.(int64), iter.Val().Time())
+					_ = aggregator.put(tuple.V1.([]*modelv1.TagValue), tuple.V2.(int64), iter.Val().Time())
 				}
 				_ = iter.Close()
 			}
@@ -138,12 +138,14 @@ func (t *topNQueryProcessor) Rev(message bus.Message) (resp bus.Message) {
 	}
 
 	now := time.Now().UnixNano()
-	resp = bus.NewMessage(bus.MessageID(now), aggregator.val())
+	resp = bus.NewMessage(bus.MessageID(now), aggregator.val(sourceMeasure.GetSchema().GetEntity().GetTagNames()))
 
 	return
 }
 
-func locateEntity(topNSchema *databasev1.TopNAggregation, sortDirection modelv1.Sort, conditions []*modelv1.Condition) (tsdb.Entity, error) {
+func locateEntity(topNSchema *databasev1.TopNAggregation, sortDirection modelv1.Sort,
+	conditions []*modelv1.Condition,
+) (tsdb.Entity, error) {
 	entityMap := make(map[string]int)
 	entity := make([]tsdb.Entry, 1+1+len(topNSchema.GetGroupByTagNames()))
 	// sortDirection
@@ -152,7 +154,7 @@ func locateEntity(topNSchema *databasev1.TopNAggregation, sortDirection modelv1.
 	entity[1] = tsdb.AnyEntry
 	for idx, tagName := range topNSchema.GetGroupByTagNames() {
 		entityMap[tagName] = idx + 2
-		// fill AnyEntry by default
+		// default to AnyEntry so that a search with only partial conditions still matches
 		entity[idx+2] = tsdb.AnyEntry
 	}
 	for _, pairQuery := range conditions {
@@ -174,6 +176,7 @@ func locateEntity(topNSchema *databasev1.TopNAggregation, sortDirection modelv1.
 		}
 		return nil, errors.New("only groupBy tag name is supported")
 	}
+
 	return entity, nil
 }
 
@@ -198,7 +201,7 @@ func parseTopNFamily(item tsdb.Item, interval time.Duration) (*streaming.Tuple2,
 	}
 	return &streaming.Tuple2{
 		// GroupValues
-		V1: tagFamily.GetTags()[1].GetStr().GetValue(),
+		V1: tagFamily.GetTags()[1:],
 		// FieldValue
 		V2: fieldValue.GetInt().GetValue(),
 	}, nil
@@ -238,13 +241,25 @@ var _ heap.Interface = (*postAggregationProcessor)(nil)
 type aggregatorItem struct {
 	int64Func aggregation.Func[int64]
 	key       string
+	values    tsdb.EntityValues
 	index     int
 }
 
+func (n *aggregatorItem) GetTags(tagNames []string) []*modelv1.Tag {
+	tags := make([]*modelv1.Tag, len(n.values))
+	for i := 0; i < len(tags); i++ {
+		tags[i] = &modelv1.Tag{
+			Key:   tagNames[i],
+			Value: n.values[i],
+		}
+	}
+	return tags
+}
+
 // postProcessor defines necessary methods for Top-N post processor with or without aggregation.
 type postProcessor interface {
-	put(key string, val int64, timestampMillis uint64) error
-	val() []*measurev1.TopNList
+	put(entityValues tsdb.EntityValues, val int64, timestampMillis uint64) error
+	val([]string) []*measurev1.TopNList
 }
 
 func createTopNPostAggregator(topN int32, aggrFunc modelv1.AggregationFunction, sort modelv1.Sort) postProcessor {
@@ -261,6 +276,7 @@ func createTopNPostAggregator(topN int32, aggrFunc modelv1.AggregationFunction,
 		sort:     sort,
 		aggrFunc: aggrFunc,
 		cache:    make(map[string]*aggregatorItem),
+		items:    make([]*aggregatorItem, 0, topN),
 	}
 	heap.Init(aggregator)
 	return aggregator
@@ -313,14 +329,15 @@ func (aggr *postAggregationProcessor) Pop() any {
 	return item
 }
 
-func (aggr *postAggregationProcessor) put(key string, val int64, timestampMillis uint64) error {
+func (aggr *postAggregationProcessor) put(entityValues tsdb.EntityValues, val int64, timestampMillis uint64) error {
 	// update latest ts
 	if aggr.latestTimestamp < timestampMillis {
 		aggr.latestTimestamp = timestampMillis
 	}
+	key := entityValues.String()
 	if item, found := aggr.cache[key]; found {
 		item.int64Func.In(val)
-		heap.Fix(aggr, item.index)
+		aggr.tryEnqueue(key, item)
 		return nil
 	}
 
@@ -331,6 +348,7 @@ func (aggr *postAggregationProcessor) put(key string, val int64, timestampMillis
 	item := &aggregatorItem{
 		key:       key,
 		int64Func: aggrFunc,
+		values:    entityValues,
 	}
 	item.int64Func.In(val)
 
@@ -338,29 +356,33 @@ func (aggr *postAggregationProcessor) put(key string, val int64, timestampMillis
 		aggr.cache[key] = item
 		heap.Push(aggr, item)
 	} else {
-		if lowest := aggr.items[0]; lowest != nil {
-			if aggr.sort == modelv1.Sort_SORT_DESC && lowest.int64Func.Val() < val {
-				aggr.cache[key] = item
-				aggr.items[0] = item
-				heap.Fix(aggr, 0)
-			} else if aggr.sort != modelv1.Sort_SORT_DESC && lowest.int64Func.Val() > val {
-				aggr.cache[key] = item
-				aggr.items[0] = item
-				heap.Fix(aggr, 0)
-			}
-		}
+		aggr.tryEnqueue(key, item)
 	}
 
 	return nil
 }
 
-func (aggr *postAggregationProcessor) val() []*measurev1.TopNList {
+func (aggr *postAggregationProcessor) tryEnqueue(key string, item *aggregatorItem) {
+	if lowest := aggr.items[0]; lowest != nil {
+		if aggr.sort == modelv1.Sort_SORT_DESC && lowest.int64Func.Val() < item.int64Func.Val() {
+			aggr.cache[key] = item
+			aggr.items[0] = item
+			heap.Fix(aggr, 0)
+		} else if aggr.sort != modelv1.Sort_SORT_DESC && lowest.int64Func.Val() > item.int64Func.Val() {
+			aggr.cache[key] = item
+			aggr.items[0] = item
+			heap.Fix(aggr, 0)
+		}
+	}
+}
+
+func (aggr *postAggregationProcessor) val(tagNames []string) []*measurev1.TopNList {
 	topNItems := make([]*measurev1.TopNList_Item, aggr.Len())
 
 	for aggr.Len() > 0 {
 		item := heap.Pop(aggr).(*aggregatorItem)
 		topNItems[aggr.Len()] = &measurev1.TopNList_Item{
-			Name: item.key,
+			Entity: item.GetTags(tagNames),
 			Value: &modelv1.FieldValue{
 				Value: &modelv1.FieldValue_Int{
 					Int: &modelv1.Int{Value: item.int64Func.Val()},
@@ -379,9 +401,21 @@ func (aggr *postAggregationProcessor) val() []*measurev1.TopNList {
 var _ flow.Element = (*nonAggregatorItem)(nil)
 
 type nonAggregatorItem struct {
-	key   string
-	val   int64
-	index int
+	key    string
+	values tsdb.EntityValues
+	val    int64
+	index  int
+}
+
+func (n *nonAggregatorItem) GetTags(tagNames []string) []*modelv1.Tag {
+	tags := make([]*modelv1.Tag, len(n.values))
+	for i := 0; i < len(tags); i++ {
+		tags[i] = &modelv1.Tag{
+			Key:   tagNames[i],
+			Value: n.values[i],
+		}
+	}
+	return tags
 }
 
 func (n *nonAggregatorItem) GetIndex() int {
@@ -398,13 +432,13 @@ type postNonAggregationProcessor struct {
 	sort      modelv1.Sort
 }
 
-func (naggr *postNonAggregationProcessor) val() []*measurev1.TopNList {
+func (naggr *postNonAggregationProcessor) val(tagNames []string) []*measurev1.TopNList {
 	topNLists := make([]*measurev1.TopNList, 0, len(naggr.timelines))
 	for ts, timeline := range naggr.timelines {
 		items := make([]*measurev1.TopNList_Item, timeline.Len())
 		for idx, elem := range timeline.Values() {
 			items[idx] = &measurev1.TopNList_Item{
-				Name: elem.(*nonAggregatorItem).key,
+				Entity: elem.(*nonAggregatorItem).GetTags(tagNames),
 				Value: &modelv1.FieldValue{
 					Value: &modelv1.FieldValue_Int{
 						Int: &modelv1.Int{Value: elem.(*nonAggregatorItem).val},
@@ -430,16 +464,17 @@ func (naggr *postNonAggregationProcessor) val() []*measurev1.TopNList {
 	return topNLists
 }
 
-func (naggr *postNonAggregationProcessor) put(key string, val int64, timestampMillis uint64) error {
+func (naggr *postNonAggregationProcessor) put(entityValues tsdb.EntityValues, val int64, timestampMillis uint64) error {
+	key := entityValues.String()
 	if timeline, ok := naggr.timelines[timestampMillis]; ok {
 		if timeline.Len() < int(naggr.topN) {
-			heap.Push(timeline, &nonAggregatorItem{val: val, key: key})
+			heap.Push(timeline, &nonAggregatorItem{val: val, key: key, values: entityValues})
 		} else {
 			if lowest := timeline.Peek(); lowest != nil {
 				if naggr.sort == modelv1.Sort_SORT_DESC && lowest.(*nonAggregatorItem).val < val {
-					timeline.ReplaceLowest(&nonAggregatorItem{val: val, key: key})
+					timeline.ReplaceLowest(&nonAggregatorItem{val: val, key: key, values: entityValues})
 				} else if naggr.sort != modelv1.Sort_SORT_DESC && lowest.(*nonAggregatorItem).val > val {
-					timeline.ReplaceLowest(&nonAggregatorItem{val: val, key: key})
+					timeline.ReplaceLowest(&nonAggregatorItem{val: val, key: key, values: entityValues})
 				}
 			}
 		}
@@ -465,7 +500,7 @@ func (naggr *postNonAggregationProcessor) put(key string, val int64, timestampMi
 		}
 	}, false)
 	naggr.timelines[timestampMillis] = timeline
-	heap.Push(timeline, &nonAggregatorItem{val: val, key: key})
+	heap.Push(timeline, &nonAggregatorItem{val: val, key: key, values: entityValues})
 
 	return nil
 }
diff --git a/banyand/tsdb/seriesdb.go b/banyand/tsdb/seriesdb.go
index 33e1798c..3fc996c4 100644
--- a/banyand/tsdb/seriesdb.go
+++ b/banyand/tsdb/seriesdb.go
@@ -92,7 +92,7 @@ func NewEntity(length int) Entity {
 }
 
 // EntityValue represents the value of a tag which is a part of an entity.
-type EntityValue *modelv1.TagValue
+type EntityValue = *modelv1.TagValue
 
 // EntityValueToEntry transforms EntityValue to Entry.
 func EntityValueToEntry(ev EntityValue) (Entry, error) {
@@ -404,6 +404,10 @@ func (s *seriesDB) List(ctx context.Context, path Path) (SeriesList, error) {
 	errScan := s.seriesMetadata.Scan(prefix, prepend(path.seekKey, entityPrefix), kv.DefaultScanOpts, func(key []byte, getVal func() ([]byte, error)) error {
 		key = key[entityPrefixLen:]
 		comparableKey := make([]byte, len(key))
+		// skip keys longer than the mask to avoid indexing path.mask out of range
+		if len(key) > len(path.mask) {
+			return nil
+		}
 		for i, b := range key {
 			comparableKey[i] = path.mask[i] & b
 		}
diff --git a/docs/api-reference.md b/docs/api-reference.md
index 6d05c457..71ca74c4 100644
--- a/docs/api-reference.md
+++ b/docs/api-reference.md
@@ -2468,7 +2468,7 @@ TopNList contains a series of topN items
 
 | Field | Type | Label | Description |
 | ----- | ---- | ----- | ----------- |
-| name | [string](#string) |  |  |
+| entity | [banyandb.model.v1.Tag](#banyandb-model-v1-Tag) | repeated |  |
 | value | [banyandb.model.v1.FieldValue](#banyandb-model-v1-FieldValue) |  |  |
 
 
diff --git a/pkg/flow/streaming/streaming_test.go b/pkg/flow/streaming/streaming_test.go
index a89e10ef..437af1ec 100644
--- a/pkg/flow/streaming/streaming_test.go
+++ b/pkg/flow/streaming/streaming_test.go
@@ -151,14 +151,16 @@ var _ = Describe("Streaming", func() {
 				Window(NewTumblingTimeWindows(15*time.Second)).
 				TopN(3, WithSortKeyExtractor(func(record flow.StreamRecord) int64 {
 					return record.Data().(flow.Data)[1].(int64)
-				}), OrderBy(ASC)).
+				}), OrderBy(ASC), WithGroupKeyExtractor(func(record flow.StreamRecord) string {
+					return record.Data().(flow.Data)[0].(string)
+				})).
 				To(snk)
 
 			errCh = f.Open()
 			Expect(errCh).ShouldNot(BeNil())
 		})
 
-		When("Top3", func() {
+		When("Bottom3", func() {
 			BeforeEach(func() {
 				input = []flow.StreamRecord{
 					flow.NewStreamRecord(&record{"e2e-service-provider", "instance-001", 10000}, 1000),
@@ -172,14 +174,21 @@ var _ = Describe("Streaming", func() {
 				}
 			})
 
-			It("Should take top 3 elements", func() {
+			It("Should take bottom 3 elements", func() {
 				Eventually(func(g Gomega) {
 					g.Expect(len(snk.Value())).Should(BeNumerically(">=", 1))
-					g.Expect(snk.Value()[0].(flow.StreamRecord).Data()).Should(BeEquivalentTo([]*Tuple2{
+					// e2e-service-consumer Group
+					g.Expect(snk.Value()[0].(flow.StreamRecord).Data().(map[string][]*Tuple2)["e2e-service-consumer"]).Should(BeEquivalentTo([]*Tuple2{
 						{int64(9500), flow.NewStreamRecord(flow.Data{"e2e-service-consumer", int64(9500)}, 7000)},
 						{int64(9600), flow.NewStreamRecord(flow.Data{"e2e-service-consumer", int64(9600)}, 6000)},
 						{int64(9700), flow.NewStreamRecord(flow.Data{"e2e-service-consumer", int64(9700)}, 4000)},
 					}))
+					// e2e-service-provider Group
+					g.Expect(snk.Value()[0].(flow.StreamRecord).Data().(map[string][]*Tuple2)["e2e-service-provider"]).Should(BeEquivalentTo([]*Tuple2{
+						{int64(9700), flow.NewStreamRecord(flow.Data{"e2e-service-provider", int64(9700)}, 5000)},
+						{int64(9800), flow.NewStreamRecord(flow.Data{"e2e-service-provider", int64(9800)}, 3000)},
+						{int64(10000), flow.NewStreamRecord(flow.Data{"e2e-service-provider", int64(10000)}, 1000)},
+					}))
 				}).WithTimeout(flags.EventuallyTimeout).Should(Succeed())
 			})
 		})
@@ -205,6 +214,8 @@ var _ = Describe("Streaming", func() {
 				Window(NewTumblingTimeWindows(15*time.Second)).
 				TopN(3, WithSortKeyExtractor(func(record flow.StreamRecord) int64 {
 					return record.Data().(flow.Data)[1].(int64)
+				}), WithGroupKeyExtractor(func(record flow.StreamRecord) string {
+					return record.Data().(flow.Data)[0].(string)
 				})).
 				To(snk)
 
@@ -229,10 +240,17 @@ var _ = Describe("Streaming", func() {
 			It("Should take top 3 elements", func() {
 				Eventually(func(g Gomega) {
 					g.Expect(len(snk.Value())).Should(BeNumerically(">=", 1))
-					g.Expect(snk.Value()[0].(flow.StreamRecord).Data()).Should(BeEquivalentTo([]*Tuple2{
-						{int64(10000), flow.NewStreamRecord(flow.Data{"e2e-service-provider", int64(10000)}, 1000)},
+					// e2e-service-consumer Group
+					g.Expect(snk.Value()[0].(flow.StreamRecord).Data().(map[string][]*Tuple2)["e2e-service-consumer"]).Should(BeEquivalentTo([]*Tuple2{
 						{int64(9900), flow.NewStreamRecord(flow.Data{"e2e-service-consumer", int64(9900)}, 2000)},
+						{int64(9700), flow.NewStreamRecord(flow.Data{"e2e-service-consumer", int64(9700)}, 4000)},
+						{int64(9600), flow.NewStreamRecord(flow.Data{"e2e-service-consumer", int64(9600)}, 6000)},
+					}))
+					// e2e-service-provider Group
+					g.Expect(snk.Value()[0].(flow.StreamRecord).Data().(map[string][]*Tuple2)["e2e-service-provider"]).Should(BeEquivalentTo([]*Tuple2{
+						{int64(10000), flow.NewStreamRecord(flow.Data{"e2e-service-provider", int64(10000)}, 1000)},
 						{int64(9800), flow.NewStreamRecord(flow.Data{"e2e-service-provider", int64(9800)}, 3000)},
+						{int64(9700), flow.NewStreamRecord(flow.Data{"e2e-service-provider", int64(9700)}, 5000)},
 					}))
 				}).WithTimeout(flags.EventuallyTimeout).Should(Succeed())
 			})
diff --git a/pkg/flow/streaming/topn.go b/pkg/flow/streaming/topn.go
index de63c7d8..746faa66 100644
--- a/pkg/flow/streaming/topn.go
+++ b/pkg/flow/streaming/topn.go
@@ -41,10 +41,9 @@ type windowedFlow struct {
 
 func (s *windowedFlow) TopN(topNum int, opts ...any) flow.Flow {
 	s.wa.(*tumblingTimeWindows).aggregationFactory = func() flow.AggregationOp {
-		topNAggrFunc := &topNAggregator{
+		topNAggrFunc := &topNAggregatorGroup{
 			cacheSize: topNum,
 			sort:      DESC,
-			dirty:     false,
 		}
 		// apply user customized options
 		for _, opt := range opts {
@@ -62,50 +61,105 @@ func (s *windowedFlow) TopN(topNum int, opts ...any) flow.Flow {
 				return utils.Int64Comparator(b, a)
 			}
 		}
-		topNAggrFunc.treeMap = treemap.NewWith(topNAggrFunc.comparator)
+		topNAggrFunc.aggregatorGroup = make(map[string]*topNAggregator)
 		return topNAggrFunc
 	}
 	return s.f
 }
 
+type topNAggregatorGroup struct {
+	aggregatorGroup   map[string]*topNAggregator
+	sortKeyExtractor  func(flow.StreamRecord) int64
+	groupKeyExtractor func(flow.StreamRecord) string
+	comparator        utils.Comparator
+	cacheSize         int
+	sort              TopNSort
+}
+
 type topNAggregator struct {
-	treeMap          *treemap.Map
-	sortKeyExtractor func(flow.StreamRecord) int64
-	comparator       utils.Comparator
-	cacheSize        int
-	currentTopNum    int
-	sort             TopNSort
-	dirty            bool
+	*topNAggregatorGroup
+	treeMap       *treemap.Map
+	currentTopNum int
+	dirty         bool
 }
 
-// TopNOption is the option to set up a top-n aggregator.
-type TopNOption func(aggregator *topNAggregator)
+// TopNOption is the option to set up a top-n aggregator group.
+type TopNOption func(aggregator *topNAggregatorGroup)
 
 // WithSortKeyExtractor sets a closure to extract the sorting key.
 func WithSortKeyExtractor(sortKeyExtractor func(flow.StreamRecord) int64) TopNOption {
-	return func(aggregator *topNAggregator) {
+	return func(aggregator *topNAggregatorGroup) {
 		aggregator.sortKeyExtractor = sortKeyExtractor
 	}
 }
 
+// WithGroupKeyExtractor sets a closure to extract the group key from the StreamRecord.
+func WithGroupKeyExtractor(groupKeyExtractor func(flow.StreamRecord) string) TopNOption {
+	return func(aggregator *topNAggregatorGroup) {
+		aggregator.groupKeyExtractor = groupKeyExtractor
+	}
+}
+
 // OrderBy sets the sorting order.
 func OrderBy(sort TopNSort) TopNOption {
-	return func(aggregator *topNAggregator) {
+	return func(aggregator *topNAggregatorGroup) {
 		aggregator.sort = sort
 	}
 }
 
-func (t *topNAggregator) Add(input []flow.StreamRecord) {
+func (t *topNAggregatorGroup) Add(input []flow.StreamRecord) {
 	for _, item := range input {
 		sortKey := t.sortKeyExtractor(item)
-		// check
-		if t.checkSortKeyInBufferRange(sortKey) {
-			t.put(sortKey, item)
-			t.doCleanUp()
+		groupKey := t.groupKeyExtractor(item)
+		aggregator := t.getOrCreateGroup(groupKey)
+		if aggregator.checkSortKeyInBufferRange(sortKey) {
+			aggregator.put(sortKey, item)
+			aggregator.doCleanUp()
 		}
 	}
 }
 
+func (t *topNAggregatorGroup) Snapshot() interface{} {
+	groupRanks := make(map[string][]*Tuple2)
+	for group, aggregator := range t.aggregatorGroup {
+		if !aggregator.dirty {
+			continue
+		}
+		aggregator.dirty = false
+		iter := aggregator.treeMap.Iterator()
+		items := make([]*Tuple2, 0, aggregator.currentTopNum)
+		for iter.Next() {
+			list := iter.Value().([]interface{})
+			for _, item := range list {
+				items = append(items, &Tuple2{iter.Key(), item})
+			}
+		}
+		groupRanks[group] = items
+	}
+	return groupRanks
+}
+
+func (t *topNAggregatorGroup) Dirty() bool {
+	for _, aggregator := range t.aggregatorGroup {
+		if aggregator.dirty {
+			return true
+		}
+	}
+	return false
+}
+
+func (t *topNAggregatorGroup) getOrCreateGroup(group string) *topNAggregator {
+	aggregator, groupExist := t.aggregatorGroup[group]
+	if groupExist {
+		return aggregator
+	}
+	t.aggregatorGroup[group] = &topNAggregator{
+		topNAggregatorGroup: t,
+		treeMap:             treemap.NewWith(t.comparator),
+	}
+	return t.aggregatorGroup[group]
+}
+
 func (t *topNAggregator) doCleanUp() {
 	// do cleanup: maintain the treeMap windowSize
 	if t.currentTopNum > t.cacheSize {
@@ -148,23 +202,6 @@ func (t *topNAggregator) checkSortKeyInBufferRange(sortKey int64) bool {
 	return t.currentTopNum < t.cacheSize
 }
 
-func (t *topNAggregator) Snapshot() interface{} {
-	t.dirty = false
-	iter := t.treeMap.Iterator()
-	items := make([]*Tuple2, 0, t.currentTopNum)
-	for iter.Next() {
-		list := iter.Value().([]interface{})
-		for _, item := range list {
-			items = append(items, &Tuple2{iter.Key(), item})
-		}
-	}
-	return items
-}
-
-func (t *topNAggregator) Dirty() bool {
-	return t.dirty
-}
-
 // Tuple2 is a tuple with 2 fields. Each field may be a separate type.
 type Tuple2 struct {
 	V1 interface{} `json:"v1"`
diff --git a/pkg/flow/streaming/topn_test.go b/pkg/flow/streaming/topn_test.go
index bc2592ee..988fe825 100644
--- a/pkg/flow/streaming/topn_test.go
+++ b/pkg/flow/streaming/topn_test.go
@@ -20,8 +20,8 @@ package streaming
 import (
 	"testing"
 
-	"github.com/emirpasic/gods/maps/treemap"
 	"github.com/emirpasic/gods/utils"
+	"github.com/google/go-cmp/cmp"
 	"github.com/stretchr/testify/require"
 
 	"github.com/apache/skywalking-banyandb/pkg/flow"
@@ -29,7 +29,7 @@ import (
 
 func TestFlow_TopN_Aggregator(t *testing.T) {
 	input := []flow.StreamRecord{
-		// 1. string
+		// 1. group by values
 		// 2. number
 		// 3. slices of groupBy values
 		flow.NewStreamRecordWithoutTS(flow.Data{"e2e-service-provider", 10000, []interface{}{"e2e-service-provider"}}),
@@ -42,35 +42,56 @@ func TestFlow_TopN_Aggregator(t *testing.T) {
 		flow.NewStreamRecordWithoutTS(flow.Data{"e2e-service-consumer", 9500, []interface{}{"e2e-service-consumer"}}),
 	}
 	tests := []struct {
+		expected map[string][]*Tuple2
 		name     string
-		expected []*Tuple2
 		sort     TopNSort
 	}{
 		{
 			name: "DESC",
 			sort: DESC,
-			expected: []*Tuple2{
-				{int64(10000), flow.NewStreamRecordWithoutTS(flow.Data{"e2e-service-provider", 10000, []interface{}{"e2e-service-provider"}})},
-				{int64(9900), flow.NewStreamRecordWithoutTS(flow.Data{"e2e-service-consumer", 9900, []interface{}{"e2e-service-consumer"}})},
-				{int64(9800), flow.NewStreamRecordWithoutTS(flow.Data{"e2e-service-provider", 9800, []interface{}{"e2e-service-provider"}})},
+			expected: map[string][]*Tuple2{
+				"e2e-service-provider": {
+					{int64(10000), flow.NewStreamRecordWithoutTS(flow.Data{"e2e-service-provider", 10000, []interface{}{"e2e-service-provider"}})},
+					{int64(9800), flow.NewStreamRecordWithoutTS(flow.Data{"e2e-service-provider", 9800, []interface{}{"e2e-service-provider"}})},
+					{int64(9700), flow.NewStreamRecordWithoutTS(flow.Data{"e2e-service-provider", 9700, []interface{}{"e2e-service-provider"}})},
+				},
+				"e2e-service-consumer": {
+					{int64(9900), flow.NewStreamRecordWithoutTS(flow.Data{"e2e-service-consumer", 9900, []interface{}{"e2e-service-consumer"}})},
+					{int64(9800), flow.NewStreamRecordWithoutTS(flow.Data{"e2e-service-consumer", 9800, []interface{}{"e2e-service-consumer"}})},
+					{int64(9700), flow.NewStreamRecordWithoutTS(flow.Data{"e2e-service-consumer", 9700, []interface{}{"e2e-service-consumer"}})},
+				},
 			},
 		},
 		{
 			name: "DESC by default",
 			sort: 0,
-			expected: []*Tuple2{
-				{int64(10000), flow.NewStreamRecordWithoutTS(flow.Data{"e2e-service-provider", 10000, []interface{}{"e2e-service-provider"}})},
-				{int64(9900), flow.NewStreamRecordWithoutTS(flow.Data{"e2e-service-consumer", 9900, []interface{}{"e2e-service-consumer"}})},
-				{int64(9800), flow.NewStreamRecordWithoutTS(flow.Data{"e2e-service-provider", 9800, []interface{}{"e2e-service-provider"}})},
+			expected: map[string][]*Tuple2{
+				"e2e-service-provider": {
+					{int64(10000), flow.NewStreamRecordWithoutTS(flow.Data{"e2e-service-provider", 10000, []interface{}{"e2e-service-provider"}})},
+					{int64(9800), flow.NewStreamRecordWithoutTS(flow.Data{"e2e-service-provider", 9800, []interface{}{"e2e-service-provider"}})},
+					{int64(9700), flow.NewStreamRecordWithoutTS(flow.Data{"e2e-service-provider", 9700, []interface{}{"e2e-service-provider"}})},
+				},
+				"e2e-service-consumer": {
+					{int64(9900), flow.NewStreamRecordWithoutTS(flow.Data{"e2e-service-consumer", 9900, []interface{}{"e2e-service-consumer"}})},
+					{int64(9800), flow.NewStreamRecordWithoutTS(flow.Data{"e2e-service-consumer", 9800, []interface{}{"e2e-service-consumer"}})},
+					{int64(9700), flow.NewStreamRecordWithoutTS(flow.Data{"e2e-service-consumer", 9700, []interface{}{"e2e-service-consumer"}})},
+				},
 			},
 		},
 		{
 			name: "ASC",
 			sort: ASC,
-			expected: []*Tuple2{
-				{int64(9500), flow.NewStreamRecordWithoutTS(flow.Data{"e2e-service-consumer", 9500, []interface{}{"e2e-service-consumer"}})},
-				{int64(9600), flow.NewStreamRecordWithoutTS(flow.Data{"e2e-service-consumer", 9600, []interface{}{"e2e-service-consumer"}})},
-				{int64(9700), flow.NewStreamRecordWithoutTS(flow.Data{"e2e-service-consumer", 9700, []interface{}{"e2e-service-consumer"}})},
+			expected: map[string][]*Tuple2{
+				"e2e-service-consumer": {
+					{int64(9500), flow.NewStreamRecordWithoutTS(flow.Data{"e2e-service-consumer", 9500, []interface{}{"e2e-service-consumer"}})},
+					{int64(9600), flow.NewStreamRecordWithoutTS(flow.Data{"e2e-service-consumer", 9600, []interface{}{"e2e-service-consumer"}})},
+					{int64(9700), flow.NewStreamRecordWithoutTS(flow.Data{"e2e-service-consumer", 9700, []interface{}{"e2e-service-consumer"}})},
+				},
+				"e2e-service-provider": {
+					{int64(9700), flow.NewStreamRecordWithoutTS(flow.Data{"e2e-service-provider", 9700, []interface{}{"e2e-service-provider"}})},
+					{int64(9800), flow.NewStreamRecordWithoutTS(flow.Data{"e2e-service-provider", 9800, []interface{}{"e2e-service-provider"}})},
+					{int64(10000), flow.NewStreamRecordWithoutTS(flow.Data{"e2e-service-provider", 10000, []interface{}{"e2e-service-provider"}})},
+				},
 			},
 		},
 	}
@@ -85,18 +106,26 @@ func TestFlow_TopN_Aggregator(t *testing.T) {
 			} else {
 				comparator = utils.Int64Comparator
 			}
-			topN := &topNAggregator{
-				cacheSize:  3,
-				sort:       tt.sort,
-				comparator: comparator,
-				treeMap:    treemap.NewWith(comparator),
+			topN := &topNAggregatorGroup{
+				cacheSize:       3,
+				sort:            tt.sort,
+				comparator:      comparator,
+				aggregatorGroup: make(map[string]*topNAggregator),
 				sortKeyExtractor: func(record flow.StreamRecord) int64 {
 					return int64(record.Data().(flow.Data)[1].(int))
 				},
+				groupKeyExtractor: func(record flow.StreamRecord) string {
+					return record.Data().(flow.Data)[0].(string)
+				},
 			}
 			topN.Add(input)
-			require.Len(topN.Snapshot(), 3)
-			require.Equal(tt.expected, topN.Snapshot())
+			snapshot := topN.Snapshot()
+			require.Len(snapshot, 2)
+			require.Contains(snapshot, "e2e-service-provider") // provider group
+			require.Contains(snapshot, "e2e-service-consumer") // consumer group
+			if diff := cmp.Diff(tt.expected, snapshot); diff != "" {
+				t.Errorf("Snapshot() mismatch (-want +got):\n%s", diff)
+			}
 		})
 	}
 }
diff --git a/pkg/flow/types.go b/pkg/flow/types.go
index 778f9a7d..538ce849 100644
--- a/pkg/flow/types.go
+++ b/pkg/flow/types.go
@@ -23,6 +23,7 @@ import (
 	"io"
 	"sync"
 
+	"github.com/google/go-cmp/cmp"
 	"google.golang.org/protobuf/types/known/timestamppb"
 )
 
@@ -136,6 +137,11 @@ func (sr StreamRecord) Data() interface{} {
 	return sr.data
 }
 
+// Equal reports whether two StreamRecords have the same timestamp and equal data.
+func (sr StreamRecord) Equal(other StreamRecord) bool {
+	return sr.ts == other.ts && cmp.Equal(sr.data, other.data)
+}
+
 // Inlet represents a type that exposes one open input.
 //
 //go:generate mockgen -destination=./inlet_mock.go -package=flow github.com/apache/skywalking-banyandb/pkg/flow Inlet
diff --git a/pkg/test/measure/testdata/measures/service_instance_endpoint_cpm_minute.json b/pkg/test/measure/testdata/measures/service_instance_endpoint_cpm_minute.json
new file mode 100644
index 00000000..47241abc
--- /dev/null
+++ b/pkg/test/measure/testdata/measures/service_instance_endpoint_cpm_minute.json
@@ -0,0 +1,51 @@
+{
+  "metadata": {
+    "name": "service_instance_endpoint_cpm_minute",
+    "group": "sw_metric"
+  },
+  "tag_families": [
+    {
+      "name": "default",
+      "tags": [
+        {
+          "name": "id",
+          "type": "TAG_TYPE_ID"
+        },
+        {
+          "name": "entity_id",
+          "type": "TAG_TYPE_STRING"
+        },
+        {
+          "name": "service_id",
+          "type": "TAG_TYPE_STRING"
+        },
+        {
+          "name": "http.uri",
+          "type": "TAG_TYPE_STRING"
+        }
+      ]
+    }
+  ],
+  "fields": [
+    {
+      "name": "total",
+      "field_type": "FIELD_TYPE_INT",
+      "encoding_method": "ENCODING_METHOD_GORILLA",
+      "compression_method": "COMPRESSION_METHOD_ZSTD"
+    },
+    {
+      "name": "value",
+      "field_type": "FIELD_TYPE_INT",
+      "encoding_method": "ENCODING_METHOD_GORILLA",
+      "compression_method": "COMPRESSION_METHOD_ZSTD"
+    }
+  ],
+  "entity": {
+    "tag_names": [
+      "service_id",
+      "entity_id"
+    ]
+  },
+  "interval": "1m",
+  "updated_at": "2021-04-15T01:30:15.01Z"
+}
\ No newline at end of file
diff --git a/pkg/test/measure/testdata/topn_aggregations/service_cpm_minute_nogroup_top100.json b/pkg/test/measure/testdata/topn_aggregations/service_cpm_minute_nogroup_top100.json
deleted file mode 100644
index a4e1456c..00000000
--- a/pkg/test/measure/testdata/topn_aggregations/service_cpm_minute_nogroup_top100.json
+++ /dev/null
@@ -1,14 +0,0 @@
-{
-  "metadata": {
-    "name": "service_cpm_minute_no_group_by_top100",
-    "group": "sw_metric"
-  },
-  "source_measure": {
-    "name": "service_cpm_minute",
-    "group": "sw_metric"
-  },
-  "field_name": "value",
-  "field_value_sort": 1,
-  "counters_number": 1000,
-  "lru_size": 10
-}
\ No newline at end of file
diff --git a/pkg/test/measure/testdata/topn_aggregations/service_cpm_minute_top_bottom100.json b/pkg/test/measure/testdata/topn_aggregations/service_instance_cpm_minute_top_bottom_100.json
similarity index 65%
copy from pkg/test/measure/testdata/topn_aggregations/service_cpm_minute_top_bottom100.json
copy to pkg/test/measure/testdata/topn_aggregations/service_instance_cpm_minute_top_bottom_100.json
index fec9f49d..08d6a109 100644
--- a/pkg/test/measure/testdata/topn_aggregations/service_cpm_minute_top_bottom100.json
+++ b/pkg/test/measure/testdata/topn_aggregations/service_instance_cpm_minute_top_bottom_100.json
@@ -1,16 +1,16 @@
 {
   "metadata": {
-    "name": "service_cpm_minute_top_bottom_100",
+    "name": "service_instance_cpm_minute_top_bottom_100",
     "group": "sw_metric"
   },
   "source_measure": {
-    "name": "service_cpm_minute",
+    "name": "service_instance_cpm_minute",
     "group": "sw_metric"
   },
   "field_name": "value",
   "field_value_sort": 0,
   "group_by_tag_names": [
-    "entity_id"
+    "service_id"
   ],
   "counters_number": 1000,
   "lru_size": 10
diff --git a/pkg/test/measure/testdata/topn_aggregations/service_cpm_minute_top_bottom100.json b/pkg/test/measure/testdata/topn_aggregations/service_instance_endpoint_cpm_minute_top_bottom_100.json
similarity index 62%
rename from pkg/test/measure/testdata/topn_aggregations/service_cpm_minute_top_bottom100.json
rename to pkg/test/measure/testdata/topn_aggregations/service_instance_endpoint_cpm_minute_top_bottom_100.json
index fec9f49d..65f3af15 100644
--- a/pkg/test/measure/testdata/topn_aggregations/service_cpm_minute_top_bottom100.json
+++ b/pkg/test/measure/testdata/topn_aggregations/service_instance_endpoint_cpm_minute_top_bottom_100.json
@@ -1,16 +1,16 @@
 {
   "metadata": {
-    "name": "service_cpm_minute_top_bottom_100",
+    "name": "service_instance_endpoint_cpm_minute_top_bottom_100",
     "group": "sw_metric"
   },
   "source_measure": {
-    "name": "service_cpm_minute",
+    "name": "service_instance_endpoint_cpm_minute",
     "group": "sw_metric"
   },
   "field_name": "value",
   "field_value_sort": 0,
   "group_by_tag_names": [
-    "entity_id"
+    "http.uri"
   ],
   "counters_number": 1000,
   "lru_size": 10
diff --git a/test/cases/measure/data/testdata/service_instance_cpm_minute_data.json b/test/cases/measure/data/testdata/service_instance_cpm_minute_data.json
new file mode 100644
index 00000000..b67eef16
--- /dev/null
+++ b/test/cases/measure/data/testdata/service_instance_cpm_minute_data.json
@@ -0,0 +1,212 @@
+[
+  {
+    "tag_families": [
+      {
+        "tags": [
+          {
+            "id": {
+              "value": "1"
+            }
+          },
+          {
+            "str": {
+              "value": "entity_1"
+            }
+          },
+          {
+            "str": {
+              "value": "svc_1"
+            }
+          }
+        ]
+      }
+    ],
+    "fields": [
+      {
+        "int": {
+          "value": 100
+        }
+      },
+      {
+        "int": {
+          "value": 1
+        }
+      }
+    ]
+  },
+  {
+    "tag_families": [
+      {
+        "tags": [
+          {
+            "id": {
+              "value": "4"
+            }
+          },
+          {
+            "str": {
+              "value": "entity_2"
+            }
+          },
+          {
+            "str": {
+              "value": "svc_2"
+            }
+          }
+        ]
+      }
+    ],
+    "fields": [
+      {
+        "int": {
+          "value": 100
+        }
+      },
+      {
+        "int": {
+          "value": 2
+        }
+      }
+    ]
+  },
+  {
+    "tag_families": [
+      {
+        "tags": [
+          {
+            "id": {
+              "value": "5"
+            }
+          },
+          {
+            "str": {
+              "value": "entity_2"
+            }
+          },
+          {
+            "str": {
+              "value": "svc_1"
+            }
+          }
+        ]
+      }
+    ],
+    "fields": [
+      {
+        "int": {
+          "value": 100
+        }
+      },
+      {
+        "int": {
+          "value": 3
+        }
+      }
+    ]
+  },
+  {
+    "tag_families": [
+      {
+        "tags": [
+          {
+            "id": {
+              "value": "6"
+            }
+          },
+          {
+            "str": {
+              "value": "entity_3"
+            }
+          },
+          {
+            "str": {
+              "value": "svc_2"
+            }
+          }
+        ]
+      }
+    ],
+    "fields": [
+      {
+        "int": {
+          "value": 100
+        }
+      },
+      {
+        "int": {
+          "value": 5
+        }
+      }
+    ]
+  },
+  {
+    "tag_families": [
+      {
+        "tags": [
+          {
+            "id": {
+              "value": "2"
+            }
+          },
+          {
+            "str": {
+              "value": "entity_1"
+            }
+          },
+          {
+            "str": {
+              "value": ""
+            }
+          }
+        ]
+      }
+    ],
+    "fields": [
+      {
+        "int": {
+          "value": 50
+        }
+      },
+      {
+        "int": {
+          "value": 4
+        }
+      }
+    ]
+  },
+  {
+    "tag_families": [
+      {
+        "tags": [
+          {
+            "id": {
+              "value": "3"
+            }
+          },
+          {
+            "str": {
+              "value": "entity_1"
+            }
+          },
+          {
+            "str": {
+              "value": "svc_1"
+            }
+          }
+        ]
+      }
+    ],
+    "fields": [
+      {
+        "int": {
+          "value": 300
+        }
+      },
+      {
+        "int": {
+          "value": 6
+        }
+      }
+    ]
+  }
+]
diff --git a/test/cases/measure/data/testdata/service_instance_cpm_minute_data1.json b/test/cases/measure/data/testdata/service_instance_cpm_minute_data1.json
new file mode 100644
index 00000000..93b8b834
--- /dev/null
+++ b/test/cases/measure/data/testdata/service_instance_cpm_minute_data1.json
@@ -0,0 +1,212 @@
+[
+  {
+    "tag_families": [
+      {
+        "tags": [
+          {
+            "id": {
+              "value": "7"
+            }
+          },
+          {
+            "str": {
+              "value": "entity_2"
+            }
+          },
+          {
+            "str": {
+              "value": ""
+            }
+          }
+        ]
+      }
+    ],
+    "fields": [
+      {
+        "int": {
+          "value": 100
+        }
+      },
+      {
+        "int": {
+          "value": 10
+        }
+      }
+    ]
+  },
+  {
+    "tag_families": [
+      {
+        "tags": [
+          {
+            "id": {
+              "value": "8"
+            }
+          },
+          {
+            "str": {
+              "value": "entity_2"
+            }
+          },
+          {
+            "str": {
+              "value": "svc_1"
+            }
+          }
+        ]
+      }
+    ],
+    "fields": [
+      {
+        "int": {
+          "value": 100
+        }
+      },
+      {
+        "int": {
+          "value": 9
+        }
+      }
+    ]
+  },
+  {
+    "tag_families": [
+      {
+        "tags": [
+          {
+            "id": {
+              "value": "9"
+            }
+          },
+          {
+            "str": {
+              "value": "entity_2"
+            }
+          },
+          {
+            "str": {
+              "value": "svc_4"
+            }
+          }
+        ]
+      }
+    ],
+    "fields": [
+      {
+        "int": {
+          "value": 100
+        }
+      },
+      {
+        "int": {
+          "value": 8
+        }
+      }
+    ]
+  },
+  {
+    "tag_families": [
+      {
+        "tags": [
+          {
+            "id": {
+              "value": "10"
+            }
+          },
+          {
+            "str": {
+              "value": "entity_3"
+            }
+          },
+          {
+            "str": {
+              "value": "svc_1"
+            }
+          }
+        ]
+      }
+    ],
+    "fields": [
+      {
+        "int": {
+          "value": 100
+        }
+      },
+      {
+        "int": {
+          "value": 11
+        }
+      }
+    ]
+  },
+  {
+    "tag_families": [
+      {
+        "tags": [
+          {
+            "id": {
+              "value": "11"
+            }
+          },
+          {
+            "str": {
+              "value": "entity_1"
+            }
+          },
+          {
+            "str": {
+              "value": "svc_2"
+            }
+          }
+        ]
+      }
+    ],
+    "fields": [
+      {
+        "int": {
+          "value": 50
+        }
+      },
+      {
+        "int": {
+          "value": 12
+        }
+      }
+    ]
+  },
+  {
+    "tag_families": [
+      {
+        "tags": [
+          {
+            "id": {
+              "value": "12"
+            }
+          },
+          {
+            "str": {
+              "value": "entity_1"
+            }
+          },
+          {
+            "str": {
+              "value": ""
+            }
+          }
+        ]
+      }
+    ],
+    "fields": [
+      {
+        "int": {
+          "value": 300
+        }
+      },
+      {
+        "int": {
+          "value": 7
+        }
+      }
+    ]
+  }
+]
diff --git a/test/cases/measure/data/testdata/service_instance_cpm_minute_data2.json b/test/cases/measure/data/testdata/service_instance_cpm_minute_data2.json
new file mode 100644
index 00000000..e98f1412
--- /dev/null
+++ b/test/cases/measure/data/testdata/service_instance_cpm_minute_data2.json
@@ -0,0 +1,72 @@
+[
+  {
+    "tag_families": [
+      {
+        "tags": [
+          {
+            "id": {
+              "value": "100"
+            }
+          },
+          {
+            "str": {
+              "value": "entity_2"
+            }
+          },
+          {
+            "str": {
+              "value": "svc_2"
+            }
+          }
+        ]
+      }
+    ],
+    "fields": [
+      {
+        "int": {
+          "value": 100
+        }
+      },
+      {
+        "int": {
+          "value": 100
+        }
+      }
+    ]
+  },
+  {
+    "tag_families": [
+      {
+        "tags": [
+          {
+            "id": {
+              "value": "110"
+            }
+          },
+          {
+            "str": {
+              "value": "entity_2"
+            }
+          },
+          {
+            "str": {
+              "value": ""
+            }
+          }
+        ]
+      }
+    ],
+    "fields": [
+      {
+        "int": {
+          "value": 100
+        }
+      },
+      {
+        "int": {
+          "value": 110
+        }
+      }
+    ]
+  }
+]
diff --git a/test/cases/measure/data/testdata/service_instance_endpoint_cpm_minute_data.json b/test/cases/measure/data/testdata/service_instance_endpoint_cpm_minute_data.json
new file mode 100644
index 00000000..c5817343
--- /dev/null
+++ b/test/cases/measure/data/testdata/service_instance_endpoint_cpm_minute_data.json
@@ -0,0 +1,228 @@
+[
+  {
+    "tag_families": [
+      {
+        "tags": [
+          {
+            "id": {
+              "value": "1"
+            }
+          },
+          {
+            "str": {
+              "value": "entity_1"
+            }
+          },
+          {
+            "str": {
+              "value": "svc_1"
+            }
+          },
+          {
+            "str": {
+              "value": "GET:/metrics"
+            }
+          }
+        ]
+      }
+    ],
+    "fields": [
+      {
+        "int": {
+          "value": 100
+        }
+      },
+      {
+        "int": {
+          "value": 1
+        }
+      }
+    ]
+  },
+  {
+    "tag_families": [
+      {
+        "tags": [
+          {
+            "id": {
+              "value": "4"
+            }
+          },
+          {
+            "str": {
+              "value": "entity_2"
+            }
+          },
+          {
+            "str": {
+              "value": "svc_2"
+            }
+          },
+          {
+            "str": {
+              "value": "GET:/check"
+            }
+          }
+        ]
+      }
+    ],
+    "fields": [
+      {
+        "int": {
+          "value": 100
+        }
+      },
+      {
+        "int": {
+          "value": 2
+        }
+      }
+    ]
+  },
+  {
+    "tag_families": [
+      {
+        "tags": [
+          {
+            "id": {
+              "value": "5"
+            }
+          },
+          {
+            "str": {
+              "value": "entity_2"
+            }
+          },
+          {
+            "str": {
+              "value": "svc_1"
+            }
+          }
+        ]
+      }
+    ],
+    "fields": [
+      {
+        "int": {
+          "value": 100
+        }
+      },
+      {
+        "int": {
+          "value": 3
+        }
+      }
+    ]
+  },
+  {
+    "tag_families": [
+      {
+        "tags": [
+          {
+            "id": {
+              "value": "6"
+            }
+          },
+          {
+            "str": {
+              "value": "entity_3"
+            }
+          },
+          {
+            "str": {
+              "value": "svc_2"
+            }
+          }
+        ]
+      }
+    ],
+    "fields": [
+      {
+        "int": {
+          "value": 100
+        }
+      },
+      {
+        "int": {
+          "value": 5
+        }
+      }
+    ]
+  },
+  {
+    "tag_families": [
+      {
+        "tags": [
+          {
+            "id": {
+              "value": "2"
+            }
+          },
+          {
+            "str": {
+              "value": "entity_1"
+            }
+          },
+          {
+            "str": {
+              "value": ""
+            }
+          },
+          {
+            "null": 0
+          }
+        ]
+      }
+    ],
+    "fields": [
+      {
+        "int": {
+          "value": 50
+        }
+      },
+      {
+        "int": {
+          "value": 4
+        }
+      }
+    ]
+  },
+  {
+    "tag_families": [
+      {
+        "tags": [
+          {
+            "id": {
+              "value": "3"
+            }
+          },
+          {
+            "str": {
+              "value": "entity_1"
+            }
+          },
+          {
+            "str": {
+              "value": "svc_1"
+            }
+          },
+          {
+            "null": 0
+          }
+        ]
+      }
+    ],
+    "fields": [
+      {
+        "int": {
+          "value": 300
+        }
+      },
+      {
+        "int": {
+          "value": 6
+        }
+      }
+    ]
+  }
+]
diff --git a/test/cases/measure/data/testdata/service_instance_endpoint_cpm_minute_data1.json b/test/cases/measure/data/testdata/service_instance_endpoint_cpm_minute_data1.json
new file mode 100644
index 00000000..d1313b86
--- /dev/null
+++ b/test/cases/measure/data/testdata/service_instance_endpoint_cpm_minute_data1.json
@@ -0,0 +1,228 @@
+[
+  {
+    "tag_families": [
+      {
+        "tags": [
+          {
+            "id": {
+              "value": "7"
+            }
+          },
+          {
+            "str": {
+              "value": "entity_2"
+            }
+          },
+          {
+            "str": {
+              "value": ""
+            }
+          },
+          {
+            "null": 0
+          }
+        ]
+      }
+    ],
+    "fields": [
+      {
+        "int": {
+          "value": 100
+        }
+      },
+      {
+        "int": {
+          "value": 10
+        }
+      }
+    ]
+  },
+  {
+    "tag_families": [
+      {
+        "tags": [
+          {
+            "id": {
+              "value": "8"
+            }
+          },
+          {
+            "str": {
+              "value": "entity_2"
+            }
+          },
+          {
+            "str": {
+              "value": "svc_1"
+            }
+          },
+          {
+            "str": {
+              "value": "GET:/metrics"
+            }
+          }
+        ]
+      }
+    ],
+    "fields": [
+      {
+        "int": {
+          "value": 100
+        }
+      },
+      {
+        "int": {
+          "value": 9
+        }
+      }
+    ]
+  },
+  {
+    "tag_families": [
+      {
+        "tags": [
+          {
+            "id": {
+              "value": "9"
+            }
+          },
+          {
+            "str": {
+              "value": "entity_2"
+            }
+          },
+          {
+            "str": {
+              "value": "svc_4"
+            }
+          },
+          {
+            "str": {
+              "value": "GET:/check"
+            }
+          }
+        ]
+      }
+    ],
+    "fields": [
+      {
+        "int": {
+          "value": 100
+        }
+      },
+      {
+        "int": {
+          "value": 8
+        }
+      }
+    ]
+  },
+  {
+    "tag_families": [
+      {
+        "tags": [
+          {
+            "id": {
+              "value": "10"
+            }
+          },
+          {
+            "str": {
+              "value": "entity_3"
+            }
+          },
+          {
+            "str": {
+              "value": "svc_1"
+            }
+          }
+        ]
+      }
+    ],
+    "fields": [
+      {
+        "int": {
+          "value": 100
+        }
+      },
+      {
+        "int": {
+          "value": 11
+        }
+      }
+    ]
+  },
+  {
+    "tag_families": [
+      {
+        "tags": [
+          {
+            "id": {
+              "value": "11"
+            }
+          },
+          {
+            "str": {
+              "value": "entity_1"
+            }
+          },
+          {
+            "str": {
+              "value": "svc_2"
+            }
+          }
+        ]
+      }
+    ],
+    "fields": [
+      {
+        "int": {
+          "value": 50
+        }
+      },
+      {
+        "int": {
+          "value": 12
+        }
+      }
+    ]
+  },
+  {
+    "tag_families": [
+      {
+        "tags": [
+          {
+            "id": {
+              "value": "12"
+            }
+          },
+          {
+            "str": {
+              "value": "entity_1"
+            }
+          },
+          {
+            "str": {
+              "value": ""
+            }
+          },
+          {
+            "null": 0
+          }
+        ]
+      }
+    ],
+    "fields": [
+      {
+        "int": {
+          "value": 300
+        }
+      },
+      {
+        "int": {
+          "value": 7
+        }
+      }
+    ]
+  }
+]
diff --git a/test/cases/measure/data/testdata/service_instance_endpoint_cpm_minute_data2.json b/test/cases/measure/data/testdata/service_instance_endpoint_cpm_minute_data2.json
new file mode 100644
index 00000000..170f45d6
--- /dev/null
+++ b/test/cases/measure/data/testdata/service_instance_endpoint_cpm_minute_data2.json
@@ -0,0 +1,82 @@
+[
+  {
+    "tag_families": [
+      {
+        "tags": [
+          {
+            "id": {
+              "value": "100"
+            }
+          },
+          {
+            "str": {
+              "value": "entity_2"
+            }
+          },
+          {
+            "str": {
+              "value": "svc_2"
+            }
+          },
+          {
+            "str": {
+              "value": "GET:/metrics"
+            }
+          }
+        ]
+      }
+    ],
+    "fields": [
+      {
+        "int": {
+          "value": 100
+        }
+      },
+      {
+        "int": {
+          "value": 100
+        }
+      }
+    ]
+  },
+  {
+    "tag_families": [
+      {
+        "tags": [
+          {
+            "id": {
+              "value": "110"
+            }
+          },
+          {
+            "str": {
+              "value": "entity_2"
+            }
+          },
+          {
+            "str": {
+              "value": ""
+            }
+          },
+          {
+            "str": {
+              "value": "GET:/metrics"
+            }
+          }
+        ]
+      }
+    ],
+    "fields": [
+      {
+        "int": {
+          "value": 100
+        }
+      },
+      {
+        "int": {
+          "value": 110
+        }
+      }
+    ]
+  }
+]
diff --git a/test/cases/topn/data/input/aggr_desc.yaml b/test/cases/topn/data/input/aggr_desc.yaml
index 83d2f7db..3151ab64 100644
--- a/test/cases/topn/data/input/aggr_desc.yaml
+++ b/test/cases/topn/data/input/aggr_desc.yaml
@@ -16,7 +16,7 @@
 # under the License.
 
 metadata:
-  name: "service_cpm_minute_top_bottom_100"
+  name: "service_instance_cpm_minute_top_bottom_100"
   group: "sw_metric"
 topN: 3
 fieldValueSort: 1
diff --git a/test/cases/topn/data/input/asc.yaml b/test/cases/topn/data/input/asc.yaml
index 75e714b0..9a8732df 100644
--- a/test/cases/topn/data/input/asc.yaml
+++ b/test/cases/topn/data/input/asc.yaml
@@ -16,7 +16,7 @@
 # under the License.
 
 metadata:
-  name: "service_cpm_minute_top_bottom_100"
+  name: "service_instance_cpm_minute_top_bottom_100"
   group: "sw_metric"
-topN: 1
+topN: 2
 fieldValueSort: 2
diff --git a/test/cases/topn/data/input/condition_aggr_desc.yaml b/test/cases/topn/data/input/condition_aggr_desc.yaml
index 88f701c6..844af5a3 100644
--- a/test/cases/topn/data/input/condition_aggr_desc.yaml
+++ b/test/cases/topn/data/input/condition_aggr_desc.yaml
@@ -16,14 +16,14 @@
 # under the License.
 
 metadata:
-  name: "service_cpm_minute_top_bottom_100"
+  name: "service_instance_cpm_minute_top_bottom_100"
   group: "sw_metric"
 topN: 1
 fieldValueSort: 1
 agg: 2
 conditions:
-- name: entity_id
+- name: service_id
   op: 1
   value:
     str:
-      value: entity_1
+      value: "svc_1"
diff --git a/test/cases/topn/data/input/desc.yaml b/test/cases/topn/data/input/desc.yaml
index d40dfb52..63a4d565 100644
--- a/test/cases/topn/data/input/desc.yaml
+++ b/test/cases/topn/data/input/desc.yaml
@@ -16,7 +16,7 @@
 # under the License.
 
 metadata:
-  name: "service_cpm_minute_top_bottom_100"
+  name: "service_instance_cpm_minute_top_bottom_100"
   group: "sw_metric"
-topN: 1
+topN: 2
 fieldValueSort: 1
diff --git a/test/cases/topn/data/input/condition_aggr_desc.yaml b/test/cases/topn/data/input/null_group.yaml
similarity index 89%
copy from test/cases/topn/data/input/condition_aggr_desc.yaml
copy to test/cases/topn/data/input/null_group.yaml
index 88f701c6..5020488f 100644
--- a/test/cases/topn/data/input/condition_aggr_desc.yaml
+++ b/test/cases/topn/data/input/null_group.yaml
@@ -16,14 +16,14 @@
 # under the License.
 
 metadata:
-  name: "service_cpm_minute_top_bottom_100"
+  name: "service_instance_endpoint_cpm_minute_top_bottom_100"
   group: "sw_metric"
-topN: 1
+topN: 3
 fieldValueSort: 1
 agg: 2
 conditions:
-- name: entity_id
+- name: http.uri
   op: 1
   value:
     str:
-      value: entity_1
+      value: ""
diff --git a/test/cases/topn/data/want/aggr_desc.yaml b/test/cases/topn/data/want/aggr_desc.yaml
index 85c4ee51..96cc77b2 100644
--- a/test/cases/topn/data/want/aggr_desc.yaml
+++ b/test/cases/topn/data/want/aggr_desc.yaml
@@ -17,15 +17,39 @@
 
 lists:
 - items:
-  - name: entity_1
+  - entity:
+    - key: service_id
+      value:
+        str:
+          value: svc_2
+    - key: entity_id
+      value:
+        str:
+          value: entity_1
     value:
       int:
         value: "12"
-  - name: entity_3
+  - entity:
+    - key: service_id
+      value:
+        str:
+          value: svc_1
+    - key: entity_id
+      value:
+        str:
+          value: entity_3
     value:
       int:
         value: "11"
-  - name: entity_2
+  - entity:
+    - key: service_id
+      value:
+        str:
+          value: ""
+    - key: entity_id
+      value:
+        str:
+          value: entity_2
     value:
       int:
         value: "10"
diff --git a/test/cases/topn/data/want/asc.yaml b/test/cases/topn/data/want/asc.yaml
index 14c2f447..2731344d 100644
--- a/test/cases/topn/data/want/asc.yaml
+++ b/test/cases/topn/data/want/asc.yaml
@@ -17,38 +17,158 @@
 
 lists:
 - items:
-  - name: entity_1
+  - entity:
+    - key: service_id
+      value:
+        str:
+          value: svc_1
+    - key: entity_id
+      value:
+        str:
+          value: entity_1
     value:
       int:
         value: "1"
+  - entity:
+      - key: service_id
+        value:
+          str:
+            value: ""
+      - key: entity_id
+        value:
+          str:
+            value: entity_2
+    value:
+      int:
+        value: "10"
   timestamp: "2022-10-17T12:49:00Z"
 - items:
-  - name: entity_2
+  - entity:
+    - key: service_id
+      value:
+        str:
+          value: svc_2
+    - key: entity_id
+      value:
+        str:
+          value: entity_2
     value:
       int:
         value: "2"
+  - entity:
+      - key: service_id
+        value:
+          str:
+            value: svc_1
+      - key: entity_id
+        value:
+          str:
+            value: entity_2
+    value:
+      int:
+        value: "9"
   timestamp: "2022-10-17T12:50:00Z"
 - items:
-  - name: entity_2
+  - entity:
+    - key: service_id
+      value:
+        str:
+          value: svc_1
+    - key: entity_id
+      value:
+        str:
+          value: entity_2
     value:
       int:
         value: "3"
+  - entity:
+      - key: service_id
+        value:
+          str:
+            value: svc_4
+      - key: entity_id
+        value:
+          str:
+            value: entity_2
+    value:
+      int:
+        value: "8"
   timestamp: "2022-10-17T12:51:00Z"
 - items:
-  - name: entity_3
+  - entity:
+    - key: service_id
+      value:
+        str:
+          value: svc_2
+    - key: entity_id
+      value:
+        str:
+          value: entity_3
     value:
       int:
         value: "5"
+  - entity:
+      - key: service_id
+        value:
+          str:
+            value: svc_1
+      - key: entity_id
+        value:
+          str:
+            value: entity_3
+    value:
+      int:
+        value: "11"
   timestamp: "2022-10-17T12:52:00Z"
 - items:
-  - name: entity_1
+  - entity:
+    - key: service_id
+      value:
+        str:
+          value: ""
+    - key: entity_id
+      value:
+        str:
+          value: entity_1
     value:
       int:
         value: "4"
+  - entity:
+      - key: service_id
+        value:
+          str:
+            value: svc_2
+      - key: entity_id
+        value:
+          str:
+            value: entity_1
+    value:
+      int:
+        value: "12"
   timestamp: "2022-10-17T12:53:00Z"
 - items:
-  - name: entity_1
+  - entity:
+    - key: service_id
+      value:
+        str:
+          value: svc_1
+    - key: entity_id
+      value:
+        str:
+          value: entity_1
     value:
       int:
         value: "6"
+  - entity:
+      - key: service_id
+        value:
+          str:
+            value: ""
+      - key: entity_id
+        value:
+          str:
+            value: entity_1
+    value:
+      int:
+        value: "7"
   timestamp: "2022-10-17T12:54:00Z"
\ No newline at end of file
diff --git a/test/cases/topn/data/want/condition_aggr_desc.yaml b/test/cases/topn/data/want/condition_aggr_desc.yaml
index 784e5c38..ebbb77f3 100644
--- a/test/cases/topn/data/want/condition_aggr_desc.yaml
+++ b/test/cases/topn/data/want/condition_aggr_desc.yaml
@@ -17,8 +17,16 @@
 
 lists:
 - items:
-  - name: entity_1
+  - entity:
+    - key: service_id
+      value:
+        str:
+          value: svc_1
+    - key: entity_id
+      value:
+        str:
+          value: entity_3
     value:
       int:
-        value: "12"
+        value: "11"
   timestamp: "2022-10-17T12:54:00Z"
\ No newline at end of file
diff --git a/test/cases/topn/data/want/desc.yaml b/test/cases/topn/data/want/desc.yaml
index 40a10b07..a937323c 100644
--- a/test/cases/topn/data/want/desc.yaml
+++ b/test/cases/topn/data/want/desc.yaml
@@ -17,38 +17,158 @@
 
 lists:
 - items:
-  - name: entity_2
+  - entity:
+    - key: service_id
+      value:
+        str:
+          value: ""
+    - key: entity_id
+      value:
+        str:
+          value: entity_2
     value:
       int:
         value: "10"
+  - entity:
+    - key: service_id
+      value:
+        str:
+          value: svc_1
+    - key: entity_id
+      value:
+        str:
+          value: entity_1
+    value:
+      int:
+        value: "1"
   timestamp: "2022-10-17T12:49:00Z"
 - items:
-  - name: entity_2
+  - entity:
+    - key: service_id
+      value:
+        str:
+          value: svc_1
+    - key: entity_id
+      value:
+        str:
+          value: entity_2
     value:
       int:
         value: "9"
+  - entity:
+    - key: service_id
+      value:
+        str:
+          value: svc_2
+    - key: entity_id
+      value:
+        str:
+          value: entity_2
+    value:
+      int:
+        value: "2"
   timestamp: "2022-10-17T12:50:00Z"
 - items:
-  - name: entity_2
+  - entity:
+    - key: service_id
+      value:
+        str:
+          value: svc_4
+    - key: entity_id
+      value:
+        str:
+          value: entity_2
     value:
       int:
         value: "8"
+  - entity:
+    - key: service_id
+      value:
+        str:
+          value: svc_1
+    - key: entity_id
+      value:
+        str:
+          value: entity_2
+    value:
+      int:
+        value: "3"
   timestamp: "2022-10-17T12:51:00Z"
 - items:
-  - name: entity_3
+  - entity:
+    - key: service_id
+      value:
+        str:
+          value: svc_1
+    - key: entity_id
+      value:
+        str:
+          value: entity_3
     value:
       int:
         value: "11"
+  - entity:
+    - key: service_id
+      value:
+        str:
+          value: svc_2
+    - key: entity_id
+      value:
+        str:
+          value: entity_3
+    value:
+      int:
+        value: "5"
   timestamp: "2022-10-17T12:52:00Z"
 - items:
-  - name: entity_1
+  - entity:
+    - key: service_id
+      value:
+        str:
+          value: svc_2
+    - key: entity_id
+      value:
+        str:
+          value: entity_1
     value:
       int:
         value: "12"
+  - entity:
+    - key: service_id
+      value:
+        str:
+          value: ""
+    - key: entity_id
+      value:
+        str:
+          value: entity_1
+    value:
+      int:
+        value: "4"
   timestamp: "2022-10-17T12:53:00Z"
 - items:
-  - name: entity_1
+  - entity:
+    - key: service_id
+      value:
+        str:
+          value: ""
+    - key: entity_id
+      value:
+        str:
+          value: entity_1
     value:
       int:
         value: "7"
+  - entity:
+    - key: service_id
+      value:
+        str:
+          value: svc_1
+    - key: entity_id
+      value:
+        str:
+          value: entity_1
+    value:
+      int:
+        value: "6"
   timestamp: "2022-10-17T12:54:00Z"
\ No newline at end of file
diff --git a/test/cases/topn/data/want/aggr_desc.yaml b/test/cases/topn/data/want/null_group.yaml
similarity index 65%
copy from test/cases/topn/data/want/aggr_desc.yaml
copy to test/cases/topn/data/want/null_group.yaml
index 85c4ee51..c9841a7d 100644
--- a/test/cases/topn/data/want/aggr_desc.yaml
+++ b/test/cases/topn/data/want/null_group.yaml
@@ -17,16 +17,40 @@
 
 lists:
 - items:
-  - name: entity_1
+  - entity:
+    - key: service_id
+      value:
+        str:
+          value: svc_2
+    - key: entity_id
+      value:
+        str:
+          value: entity_1
     value:
       int:
         value: "12"
-  - name: entity_3
+  - entity:
+    - key: service_id
+      value:
+        str:
+          value: svc_1
+    - key: entity_id
+      value:
+        str:
+          value: entity_3
     value:
       int:
         value: "11"
-  - name: entity_2
+  - entity:
+    - key: service_id
+      value:
+        str:
+          value: ""
+    - key: entity_id
+      value:
+        str:
+          value: entity_2
     value:
       int:
         value: "10"
-  timestamp: "2022-10-17T12:54:00Z"
\ No newline at end of file
+  timestamp: "2023-03-18T13:19:00Z"
\ No newline at end of file
diff --git a/test/cases/topn/topn.go b/test/cases/topn/topn.go
index 3147048a..48f6d54e 100644
--- a/test/cases/topn/topn.go
+++ b/test/cases/topn/topn.go
@@ -44,4 +44,5 @@ var _ = g.DescribeTable("TopN Tests", verify,
 	g.Entry("asc", helpers.Args{Input: "asc", Duration: 25 * time.Minute, Offset: -20 * time.Minute}),
 	g.Entry("max top3 order by desc", helpers.Args{Input: "aggr_desc", Duration: 25 * time.Minute, Offset: -20 * time.Minute}),
 	g.Entry("max top3 with condition order by desc", helpers.Args{Input: "condition_aggr_desc", Duration: 25 * time.Minute, Offset: -20 * time.Minute}),
+	g.Entry("max top3 for null group order by desc", helpers.Args{Input: "null_group", Duration: 25 * time.Minute, Offset: -20 * time.Minute}),
 )
diff --git a/test/integration/cold_query/query_suite_test.go b/test/integration/cold_query/query_suite_test.go
index 04296ae4..0319b5f5 100644
--- a/test/integration/cold_query/query_suite_test.go
+++ b/test/integration/cold_query/query_suite_test.go
@@ -76,6 +76,12 @@ var _ = SynchronizedBeforeSuite(func() []byte {
 	casesmeasureData.Write(conn, "service_cpm_minute", "sw_metric", "service_cpm_minute_data1.json", now.Add(10*time.Second), interval)
 	casesmeasureData.Write(conn, "service_cpm_minute", "sw_metric", "service_cpm_minute_data2.json", now.Add(10*time.Minute), interval)
 	casesmeasureData.Write(conn, "instance_clr_cpu_minute", "sw_metric", "instance_clr_cpu_minute_data.json", now, interval)
+	casesmeasureData.Write(conn, "service_instance_cpm_minute", "sw_metric", "service_instance_cpm_minute_data.json", now, interval)
+	casesmeasureData.Write(conn, "service_instance_cpm_minute", "sw_metric", "service_instance_cpm_minute_data1.json", now.Add(10*time.Second), interval)
+	casesmeasureData.Write(conn, "service_instance_cpm_minute", "sw_metric", "service_instance_cpm_minute_data2.json", now.Add(10*time.Minute), interval)
+	casesmeasureData.Write(conn, "service_instance_endpoint_cpm_minute", "sw_metric", "service_instance_endpoint_cpm_minute_data.json", now, interval)
+	casesmeasureData.Write(conn, "service_instance_endpoint_cpm_minute", "sw_metric", "service_instance_endpoint_cpm_minute_data1.json", now.Add(10*time.Second), interval)
+	casesmeasureData.Write(conn, "service_instance_endpoint_cpm_minute", "sw_metric", "service_instance_endpoint_cpm_minute_data2.json", now.Add(10*time.Minute), interval)
 	Expect(conn.Close()).To(Succeed())
 	return []byte(addr)
 }, func(address []byte) {
diff --git a/test/integration/query/query_suite_test.go b/test/integration/query/query_suite_test.go
index 49bb25ce..762b9889 100644
--- a/test/integration/query/query_suite_test.go
+++ b/test/integration/query/query_suite_test.go
@@ -77,6 +77,12 @@ var _ = SynchronizedBeforeSuite(func() []byte {
 	casesmeasuredata.Write(conn, "service_cpm_minute", "sw_metric", "service_cpm_minute_data1.json", now.Add(10*time.Second), interval)
 	casesmeasuredata.Write(conn, "service_cpm_minute", "sw_metric", "service_cpm_minute_data2.json", now.Add(10*time.Minute), interval)
 	casesmeasuredata.Write(conn, "instance_clr_cpu_minute", "sw_metric", "instance_clr_cpu_minute_data.json", now, interval)
+	casesmeasuredata.Write(conn, "service_instance_cpm_minute", "sw_metric", "service_instance_cpm_minute_data.json", now, interval)
+	casesmeasuredata.Write(conn, "service_instance_cpm_minute", "sw_metric", "service_instance_cpm_minute_data1.json", now.Add(10*time.Second), interval)
+	casesmeasuredata.Write(conn, "service_instance_cpm_minute", "sw_metric", "service_instance_cpm_minute_data2.json", now.Add(10*time.Minute), interval)
+	casesmeasuredata.Write(conn, "service_instance_endpoint_cpm_minute", "sw_metric", "service_instance_endpoint_cpm_minute_data.json", now, interval)
+	casesmeasuredata.Write(conn, "service_instance_endpoint_cpm_minute", "sw_metric", "service_instance_endpoint_cpm_minute_data1.json", now.Add(10*time.Second), interval)
+	casesmeasuredata.Write(conn, "service_instance_endpoint_cpm_minute", "sw_metric", "service_instance_endpoint_cpm_minute_data2.json", now.Add(10*time.Minute), interval)
 	Expect(conn.Close()).To(Succeed())
 	return []byte(addr)
 }, func(address []byte) {