You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by ha...@apache.org on 2023/03/19 12:58:08 UTC
[skywalking-banyandb] branch main updated: [OAP Integration] Register missing TopN registry service (#257)
This is an automated email from the ASF dual-hosted git repository.
hanahmily pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
The following commit(s) were added to refs/heads/main by this push:
new 6be427d0 [OAP Integration] Register missing TopN registry service (#257)
6be427d0 is described below
commit 6be427d092b28e5dd31e5e3b181b775fe088e47a
Author: Jiajing LU <lu...@gmail.com>
AuthorDate: Sun Mar 19 20:58:02 2023 +0800
[OAP Integration] Register missing TopN registry service (#257)
* register missing service
* support multiple groups in topn and use seriesID as identity
* add null group
---
CHANGES.md | 1 +
api/proto/banyandb/measure/v1/topn.proto | 2 +-
banyand/liaison/grpc/measure.go | 10 +-
banyand/liaison/grpc/server.go | 1 +
banyand/measure/measure_topn.go | 136 +++++++-----
banyand/measure/measure_write.go | 9 +-
banyand/query/processor_topn.go | 99 ++++++---
banyand/tsdb/seriesdb.go | 6 +-
docs/api-reference.md | 2 +-
pkg/flow/streaming/streaming_test.go | 30 ++-
pkg/flow/streaming/topn.go | 109 ++++++----
pkg/flow/streaming/topn_test.go | 73 +++++--
pkg/flow/types.go | 6 +
.../service_instance_endpoint_cpm_minute.json | 51 +++++
.../service_cpm_minute_nogroup_top100.json | 14 --
...ervice_instance_cpm_minute_top_bottom_100.json} | 6 +-
...stance_endpoint_cpm_minute_top_bottom_100.json} | 6 +-
.../testdata/service_instance_cpm_minute_data.json | 212 +++++++++++++++++++
.../service_instance_cpm_minute_data1.json | 212 +++++++++++++++++++
.../service_instance_cpm_minute_data2.json | 72 +++++++
.../service_instance_endpoint_cpm_minute_data.json | 228 +++++++++++++++++++++
...service_instance_endpoint_cpm_minute_data1.json | 228 +++++++++++++++++++++
...service_instance_endpoint_cpm_minute_data2.json | 82 ++++++++
test/cases/topn/data/input/aggr_desc.yaml | 2 +-
test/cases/topn/data/input/asc.yaml | 4 +-
.../cases/topn/data/input/condition_aggr_desc.yaml | 6 +-
test/cases/topn/data/input/desc.yaml | 4 +-
.../{condition_aggr_desc.yaml => null_group.yaml} | 8 +-
test/cases/topn/data/want/aggr_desc.yaml | 30 ++-
test/cases/topn/data/want/asc.yaml | 132 +++++++++++-
test/cases/topn/data/want/condition_aggr_desc.yaml | 12 +-
test/cases/topn/data/want/desc.yaml | 132 +++++++++++-
.../data/want/{aggr_desc.yaml => null_group.yaml} | 32 ++-
test/cases/topn/topn.go | 1 +
test/integration/cold_query/query_suite_test.go | 6 +
test/integration/query/query_suite_test.go | 6 +
36 files changed, 1762 insertions(+), 208 deletions(-)
diff --git a/CHANGES.md b/CHANGES.md
index 0ff0aefd..d4df9af1 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -8,6 +8,7 @@ Release Notes.
- Add TSDB concept document.
- [UI] Add YAML editor for inputting query criteria.
+- Refactor TopN to support `NULL` group while keeping seriesID from the source measure.
### Chores
diff --git a/api/proto/banyandb/measure/v1/topn.proto b/api/proto/banyandb/measure/v1/topn.proto
index 53644510..f7b426a1 100644
--- a/api/proto/banyandb/measure/v1/topn.proto
+++ b/api/proto/banyandb/measure/v1/topn.proto
@@ -33,7 +33,7 @@ message TopNList {
// timestamp is in the timeunit of milliseconds.
google.protobuf.Timestamp timestamp = 1;
message Item {
- string name = 1;
+ repeated model.v1.Tag entity = 1;
model.v1.FieldValue value = 2;
}
// items contains top-n items in a list
diff --git a/banyand/liaison/grpc/measure.go b/banyand/liaison/grpc/measure.go
index 9d72fa6b..34f746a9 100644
--- a/banyand/liaison/grpc/measure.go
+++ b/banyand/liaison/grpc/measure.go
@@ -79,12 +79,10 @@ func (ms *measureService) Write(measure measurev1.MeasureService_WriteServer) er
continue
}
iwr := &measurev1.InternalWriteRequest{
- Request: writeRequest,
- ShardId: uint32(shardID),
- SeriesHash: tsdb.HashEntity(entity),
- }
- if ms.log.Debug().Enabled() {
- iwr.EntityValues = tagValues.Encode()
+ Request: writeRequest,
+ ShardId: uint32(shardID),
+ SeriesHash: tsdb.HashEntity(entity),
+ EntityValues: tagValues.Encode(),
}
message := bus.NewMessage(bus.MessageID(time.Now().UnixNano()), iwr)
_, errWritePub := ms.pipeline.Publish(data.TopicMeasureWrite, message)
diff --git a/banyand/liaison/grpc/server.go b/banyand/liaison/grpc/server.go
index 4758585d..057486dc 100644
--- a/banyand/liaison/grpc/server.go
+++ b/banyand/liaison/grpc/server.go
@@ -199,6 +199,7 @@ func (s *server) Serve() run.StopNotify {
databasev1.RegisterStreamRegistryServiceServer(s.ser, s.streamRegistryServer)
databasev1.RegisterMeasureRegistryServiceServer(s.ser, s.measureRegistryServer)
propertyv1.RegisterPropertyServiceServer(s.ser, s.propertyServer)
+ databasev1.RegisterTopNAggregationRegistryServiceServer(s.ser, s.topNAggregationRegistryServer)
grpc_health_v1.RegisterHealthServer(s.ser, health.NewServer())
s.stopCh = make(chan struct{})
diff --git a/banyand/measure/measure_topn.go b/banyand/measure/measure_topn.go
index ac207ae0..1b5e6cd3 100644
--- a/banyand/measure/measure_topn.go
+++ b/banyand/measure/measure_topn.go
@@ -20,6 +20,7 @@ package measure
import (
"context"
"encoding/base64"
+ "fmt"
"io"
"strconv"
"strings"
@@ -73,6 +74,11 @@ var (
}
)
+type dataPointWithEntityValues struct {
+ *measurev1.DataPointValue
+ entityValues tsdb.EntityValues
+}
+
type topNStreamingProcessor struct {
m *measure
streamingFlow flow.Flow
@@ -134,7 +140,7 @@ func (t *topNStreamingProcessor) Close() error {
}
func (t *topNStreamingProcessor) writeStreamRecord(record flow.StreamRecord) error {
- tuples, ok := record.Data().([]*streaming.Tuple2)
+ tuplesGroups, ok := record.Data().(map[string][]*streaming.Tuple2)
if !ok {
return errors.New("invalid data type")
}
@@ -142,24 +148,29 @@ func (t *topNStreamingProcessor) writeStreamRecord(record flow.StreamRecord) err
eventTime := t.downSampleTimeBucket(record.TimestampMillis())
timeBucket := eventTime.Format(timeBucketFormat)
var err error
- if e := t.l.Debug(); e.Enabled() {
- e.Str("TopN", t.topNSchema.GetMetadata().GetName()).
- Int("rankNums", len(tuples)).
- Msg("Write a tuple")
- }
- for rankNum, tuple := range tuples {
- fieldValue := tuple.V1.(int64)
- data := tuple.V2.(flow.StreamRecord).Data().(flow.Data)
- err = multierr.Append(err, t.writeData(eventTime, timeBucket, fieldValue, data, rankNum))
+ for group, tuples := range tuplesGroups {
+ if e := t.l.Debug(); e.Enabled() {
+ e.Str("TopN", t.topNSchema.GetMetadata().GetName()).
+ Str("group", group).
+ Int("rankNums", len(tuples)).
+ Msg("Write tuples")
+ }
+ for rankNum, tuple := range tuples {
+ fieldValue := tuple.V1.(int64)
+ data := tuple.V2.(flow.StreamRecord).Data().(flow.Data)
+ err = multierr.Append(err, t.writeData(eventTime, timeBucket, fieldValue, group, data, rankNum))
+ }
}
return err
}
-func (t *topNStreamingProcessor) writeData(eventTime time.Time, timeBucket string, fieldValue int64, data flow.Data, rankNum int) error {
+func (t *topNStreamingProcessor) writeData(eventTime time.Time, timeBucket string, fieldValue int64,
+ group string, data flow.Data, rankNum int,
+) error {
var tagValues []*modelv1.TagValue
if len(t.topNSchema.GetGroupByTagNames()) > 0 {
var ok bool
- if tagValues, ok = data[2].([]*modelv1.TagValue); !ok {
+ if tagValues, ok = data[3].([]*modelv1.TagValue); !ok {
return errors.New("fail to extract tag values from topN result")
}
}
@@ -172,7 +183,7 @@ func (t *topNStreamingProcessor) writeData(eventTime time.Time, timeBucket strin
// 1. groupValues
// 2. rankNumber
// 3. timeBucket
- measureID := data[0].(string) + "_" + strconv.Itoa(rankNum) + "_" + timeBucket
+ measureID := group + "_" + strconv.Itoa(rankNum) + "_" + timeBucket
iwr := &measurev1.InternalWriteRequest{
Request: &measurev1.WriteRequest{
Metadata: t.topNSchema.GetMetadata(),
@@ -180,7 +191,7 @@ func (t *topNStreamingProcessor) writeData(eventTime time.Time, timeBucket strin
Timestamp: timestamppb.New(eventTime),
TagFamilies: []*modelv1.TagFamilyForWrite{
{
- Tags: []*modelv1.TagValue{
+ Tags: append([]*modelv1.TagValue{
// MeasureID
{
Value: &modelv1.TagValue_Id{
@@ -189,15 +200,7 @@ func (t *topNStreamingProcessor) writeData(eventTime time.Time, timeBucket strin
},
},
},
- // GroupValues for merge in post processor
- {
- Value: &modelv1.TagValue_Str{
- Str: &modelv1.Str{
- Value: data[0].(string),
- },
- },
- },
- },
+ }, data[0].(tsdb.EntityValues)...),
},
},
Fields: []*modelv1.FieldValue{
@@ -227,14 +230,15 @@ func (t *topNStreamingProcessor) downSampleTimeBucket(eventTimeMillis int64) tim
}
func (t *topNStreamingProcessor) locate(tagValues []*modelv1.TagValue, rankNum int) (tsdb.Entity, tsdb.EntityValues, common.ShardID, error) {
- if len(t.topNSchema.GetGroupByTagNames()) != len(tagValues) {
+ if len(tagValues) != 0 && len(t.topNSchema.GetGroupByTagNames()) != len(tagValues) {
return nil, nil, 0, errors.New("no enough tag values for the entity")
}
// entity prefix
// 1) source measure Name + topN aggregation Name
// 2) sort direction
// 3) rank number
- entity := make(tsdb.EntityValues, 1+1+1+len(t.topNSchema.GetGroupByTagNames()))
+ // >4) group tag values if needed
+ entity := make(tsdb.EntityValues, 1+1+1+len(tagValues))
// entity prefix
entity[0] = tsdb.StrValue(formatMeasureCompanionPrefix(t.topNSchema.GetSourceMeasure().GetName(),
t.topNSchema.GetMetadata().GetName()))
@@ -260,9 +264,12 @@ func (t *topNStreamingProcessor) start() *topNStreamingProcessor {
AllowedMaxWindows(int(t.topNSchema.GetLruSize())).
TopN(int(t.topNSchema.GetCountersNumber()),
streaming.WithSortKeyExtractor(func(record flow.StreamRecord) int64 {
- return record.Data().(flow.Data)[1].(int64)
+ return record.Data().(flow.Data)[2].(int64)
}),
orderBy(t.topNSchema.GetFieldValueSort()),
+ streaming.WithGroupKeyExtractor(func(record flow.StreamRecord) string {
+ return record.Data().(flow.Data)[1].(string)
+ }),
).To(t).Open()
go t.handleError()
return t
@@ -307,13 +314,16 @@ func (manager *topNProcessorManager) Close() error {
return err
}
-func (manager *topNProcessorManager) onMeasureWrite(request *measurev1.WriteRequest) {
+func (manager *topNProcessorManager) onMeasureWrite(request *measurev1.InternalWriteRequest) {
go func() {
manager.RLock()
defer manager.RUnlock()
for _, processorList := range manager.processorMap {
for _, processor := range processorList {
- processor.src <- flow.NewStreamRecordWithTimestampPb(request.GetDataPoint(), request.GetDataPoint().GetTimestamp())
+ processor.src <- flow.NewStreamRecordWithTimestampPb(&dataPointWithEntityValues{
+ request.GetRequest().GetDataPoint(),
+ request.GetEntityValues(),
+ }, request.GetRequest().GetDataPoint().GetTimestamp())
}
}
}()
@@ -325,6 +335,27 @@ func (manager *topNProcessorManager) createOrUpdateTopNMeasure(topNSchema *datab
return err
}
+ tagNames := manager.m.GetSchema().GetEntity().GetTagNames()
+ seriesSpecs := make([]*databasev1.TagSpec, 0, len(tagNames))
+
+ for _, tagName := range tagNames {
+ var found bool
+ for _, fSpec := range manager.m.GetSchema().GetTagFamilies() {
+ for _, tSpec := range fSpec.GetTags() {
+ if tSpec.GetName() == tagName {
+ seriesSpecs = append(seriesSpecs, tSpec)
+ found = true
+ goto CHECK
+ }
+ }
+ }
+
+ CHECK:
+ if !found {
+ return fmt.Errorf("fail to find tag spec %s", tagName)
+ }
+ }
+
// create a new "derived" measure for TopN result
newTopNMeasure := &databasev1.Measure{
Metadata: topNSchema.GetMetadata(),
@@ -332,16 +363,12 @@ func (manager *topNProcessorManager) createOrUpdateTopNMeasure(topNSchema *datab
TagFamilies: []*databasev1.TagFamilySpec{
{
Name: TopNTagFamily,
- Tags: []*databasev1.TagSpec{
+ Tags: append([]*databasev1.TagSpec{
{
Name: "measure_id",
Type: databasev1.TagType_TAG_TYPE_ID,
},
- {
- Name: "group_values",
- Type: databasev1.TagType_TAG_TYPE_STRING,
- },
- },
+ }, seriesSpecs...),
},
},
Fields: []*databasev1.FieldSpec{TopNValueFieldSpec},
@@ -425,8 +452,8 @@ func (manager *topNProcessorManager) buildFilter(criteria *modelv1.Criteria) (fl
return nil, err
}
- return func(_ context.Context, dataPoint any) bool {
- tffws := dataPoint.(*measurev1.DataPointValue).GetTagFamilies()
+ return func(_ context.Context, request any) bool {
+ tffws := request.(*dataPointWithEntityValues).GetTagFamilies()
ok, matchErr := f.Match(logical.TagFamiliesForWrite(tffws), manager.s)
if matchErr != nil {
manager.l.Err(matchErr).Msg("fail to match criteria")
@@ -445,20 +472,22 @@ func (manager *topNProcessorManager) buildMapper(fieldName string, groupByNames
}
if len(groupByNames) == 0 {
return func(_ context.Context, request any) any {
- dataPoint := request.(*measurev1.DataPointValue)
- if len(dataPoint.GetFields()) <= fieldIdx {
- manager.l.Warn().Interface("point", dataPoint).
+ dpWithEvs := request.(*dataPointWithEntityValues)
+ if len(dpWithEvs.GetFields()) <= fieldIdx {
+ manager.l.Warn().Interface("point", dpWithEvs.DataPointValue).
Str("fieldName", fieldName).
- Int("len", len(dataPoint.GetFields())).
+ Int("len", len(dpWithEvs.GetFields())).
Int("fieldIdx", fieldIdx).
Msg("out of range")
}
return flow.Data{
+ // EntityValues as identity
+ dpWithEvs.entityValues,
// save string representation of group values as the key, i.e. v1
"",
// field value as v2
// TODO: we only support int64
- dataPoint.GetFields()[fieldIdx].GetInt().GetValue(),
+ dpWithEvs.GetFields()[fieldIdx].GetInt().GetValue(),
// groupBy tag values as v3
nil,
}
@@ -469,18 +498,20 @@ func (manager *topNProcessorManager) buildMapper(fieldName string, groupByNames
return nil, err
}
return func(_ context.Context, request any) any {
- dataPoint := request.(*measurev1.DataPointValue)
+ dpWithEvs := request.(*dataPointWithEntityValues)
return flow.Data{
+ // EntityValues as identity
+ dpWithEvs.entityValues,
// save string representation of group values as the key, i.e. v1
strings.Join(transform(groupLocator, func(locator partition.TagLocator) string {
- return stringify(dataPoint.GetTagFamilies()[locator.FamilyOffset].GetTags()[locator.TagOffset])
+ return stringify(extractTagValue(dpWithEvs.DataPointValue, locator))
}), "|"),
// field value as v2
// TODO: we only support int64
- dataPoint.GetFields()[fieldIdx].GetInt().GetValue(),
+ dpWithEvs.GetFields()[fieldIdx].GetInt().GetValue(),
// groupBy tag values as v3
transform(groupLocator, func(locator partition.TagLocator) *modelv1.TagValue {
- return dataPoint.GetTagFamilies()[locator.FamilyOffset].GetTags()[locator.TagOffset]
+ return extractTagValue(dpWithEvs.DataPointValue, locator)
}),
}
}, nil
@@ -495,7 +526,7 @@ func newGroupLocator(m *databasev1.Measure, groupByNames []string) (groupTagsLoc
for _, groupByName := range groupByNames {
fIdx, tIdx, spec := pbv1.FindTagByName(m.GetTagFamilies(), groupByName)
if spec == nil {
- return nil, errors.New("tag is not found")
+ return nil, fmt.Errorf("tag %s is not found", groupByName)
}
groupTags = append(groupTags, partition.TagLocator{
FamilyOffset: fIdx,
@@ -505,6 +536,17 @@ func newGroupLocator(m *databasev1.Measure, groupByNames []string) (groupTagsLoc
return groupTags, nil
}
+func extractTagValue(dpv *measurev1.DataPointValue, locator partition.TagLocator) *modelv1.TagValue {
+ if locator.FamilyOffset >= len(dpv.GetTagFamilies()) {
+ return &modelv1.TagValue{Value: &modelv1.TagValue_Null{}}
+ }
+ tagFamily := dpv.GetTagFamilies()[locator.FamilyOffset]
+ if locator.TagOffset >= len(tagFamily.GetTags()) {
+ return &modelv1.TagValue{Value: &modelv1.TagValue_Null{}}
+ }
+ return tagFamily.GetTags()[locator.TagOffset]
+}
+
func stringify(tagValue *modelv1.TagValue) string {
switch v := tagValue.GetValue().(type) {
case *modelv1.TagValue_Str:
@@ -522,7 +564,7 @@ func stringify(tagValue *modelv1.TagValue) string {
case *modelv1.TagValue_StrArray:
return strings.Join(v.StrArray.GetValue(), ",")
default:
- return "<nil>"
+ return ""
}
}
diff --git a/banyand/measure/measure_write.go b/banyand/measure/measure_write.go
index 2c2eb233..b3615741 100644
--- a/banyand/measure/measure_write.go
+++ b/banyand/measure/measure_write.go
@@ -131,9 +131,12 @@ func (s *measure) write(shardID common.ShardID, entity []byte, entityValues tsdb
}
s.indexWriter.Write(m)
if s.processorManager != nil {
- s.processorManager.onMeasureWrite(&measurev1.WriteRequest{
- Metadata: s.GetMetadata(),
- DataPoint: value,
+ s.processorManager.onMeasureWrite(&measurev1.InternalWriteRequest{
+ Request: &measurev1.WriteRequest{
+ Metadata: s.GetMetadata(),
+ DataPoint: value,
+ },
+ EntityValues: entityValues[1:],
})
}
return err
diff --git a/banyand/query/processor_topn.go b/banyand/query/processor_topn.go
index e72520af..336dc8e6 100644
--- a/banyand/query/processor_topn.go
+++ b/banyand/query/processor_topn.go
@@ -130,7 +130,7 @@ func (t *topNQueryProcessor) Rev(message bus.Message) (resp bus.Message) {
Msg("fail to parse topN family")
return
}
- _ = aggregator.put(tuple.V1.(string), tuple.V2.(int64), iter.Val().Time())
+ _ = aggregator.put(tuple.V1.([]*modelv1.TagValue), tuple.V2.(int64), iter.Val().Time())
}
_ = iter.Close()
}
@@ -138,12 +138,14 @@ func (t *topNQueryProcessor) Rev(message bus.Message) (resp bus.Message) {
}
now := time.Now().UnixNano()
- resp = bus.NewMessage(bus.MessageID(now), aggregator.val())
+ resp = bus.NewMessage(bus.MessageID(now), aggregator.val(sourceMeasure.GetSchema().GetEntity().GetTagNames()))
return
}
-func locateEntity(topNSchema *databasev1.TopNAggregation, sortDirection modelv1.Sort, conditions []*modelv1.Condition) (tsdb.Entity, error) {
+func locateEntity(topNSchema *databasev1.TopNAggregation, sortDirection modelv1.Sort,
+ conditions []*modelv1.Condition,
+) (tsdb.Entity, error) {
entityMap := make(map[string]int)
entity := make([]tsdb.Entry, 1+1+len(topNSchema.GetGroupByTagNames()))
// sortDirection
@@ -152,7 +154,7 @@ func locateEntity(topNSchema *databasev1.TopNAggregation, sortDirection modelv1.
entity[1] = tsdb.AnyEntry
for idx, tagName := range topNSchema.GetGroupByTagNames() {
entityMap[tagName] = idx + 2
- // fill AnyEntry by default
+ // allow to make fuzzy search with partial conditions
entity[idx+2] = tsdb.AnyEntry
}
for _, pairQuery := range conditions {
@@ -174,6 +176,7 @@ func locateEntity(topNSchema *databasev1.TopNAggregation, sortDirection modelv1.
}
return nil, errors.New("only groupBy tag name is supported")
}
+
return entity, nil
}
@@ -198,7 +201,7 @@ func parseTopNFamily(item tsdb.Item, interval time.Duration) (*streaming.Tuple2,
}
return &streaming.Tuple2{
// GroupValues
- V1: tagFamily.GetTags()[1].GetStr().GetValue(),
+ V1: tagFamily.GetTags()[1:],
// FieldValue
V2: fieldValue.GetInt().GetValue(),
}, nil
@@ -238,13 +241,25 @@ var _ heap.Interface = (*postAggregationProcessor)(nil)
type aggregatorItem struct {
int64Func aggregation.Func[int64]
key string
+ values tsdb.EntityValues
index int
}
+func (n *aggregatorItem) GetTags(tagNames []string) []*modelv1.Tag {
+ tags := make([]*modelv1.Tag, len(n.values))
+ for i := 0; i < len(tags); i++ {
+ tags[i] = &modelv1.Tag{
+ Key: tagNames[i],
+ Value: n.values[i],
+ }
+ }
+ return tags
+}
+
// postProcessor defines necessary methods for Top-N post processor with or without aggregation.
type postProcessor interface {
- put(key string, val int64, timestampMillis uint64) error
- val() []*measurev1.TopNList
+ put(entityValues tsdb.EntityValues, val int64, timestampMillis uint64) error
+ val([]string) []*measurev1.TopNList
}
func createTopNPostAggregator(topN int32, aggrFunc modelv1.AggregationFunction, sort modelv1.Sort) postProcessor {
@@ -261,6 +276,7 @@ func createTopNPostAggregator(topN int32, aggrFunc modelv1.AggregationFunction,
sort: sort,
aggrFunc: aggrFunc,
cache: make(map[string]*aggregatorItem),
+ items: make([]*aggregatorItem, 0, topN),
}
heap.Init(aggregator)
return aggregator
@@ -313,14 +329,15 @@ func (aggr *postAggregationProcessor) Pop() any {
return item
}
-func (aggr *postAggregationProcessor) put(key string, val int64, timestampMillis uint64) error {
+func (aggr *postAggregationProcessor) put(entityValues tsdb.EntityValues, val int64, timestampMillis uint64) error {
// update latest ts
if aggr.latestTimestamp < timestampMillis {
aggr.latestTimestamp = timestampMillis
}
+ key := entityValues.String()
if item, found := aggr.cache[key]; found {
item.int64Func.In(val)
- heap.Fix(aggr, item.index)
+ aggr.tryEnqueue(key, item)
return nil
}
@@ -331,6 +348,7 @@ func (aggr *postAggregationProcessor) put(key string, val int64, timestampMillis
item := &aggregatorItem{
key: key,
int64Func: aggrFunc,
+ values: entityValues,
}
item.int64Func.In(val)
@@ -338,29 +356,33 @@ func (aggr *postAggregationProcessor) put(key string, val int64, timestampMillis
aggr.cache[key] = item
heap.Push(aggr, item)
} else {
- if lowest := aggr.items[0]; lowest != nil {
- if aggr.sort == modelv1.Sort_SORT_DESC && lowest.int64Func.Val() < val {
- aggr.cache[key] = item
- aggr.items[0] = item
- heap.Fix(aggr, 0)
- } else if aggr.sort != modelv1.Sort_SORT_DESC && lowest.int64Func.Val() > val {
- aggr.cache[key] = item
- aggr.items[0] = item
- heap.Fix(aggr, 0)
- }
- }
+ aggr.tryEnqueue(key, item)
}
return nil
}
-func (aggr *postAggregationProcessor) val() []*measurev1.TopNList {
+func (aggr *postAggregationProcessor) tryEnqueue(key string, item *aggregatorItem) {
+ if lowest := aggr.items[0]; lowest != nil {
+ if aggr.sort == modelv1.Sort_SORT_DESC && lowest.int64Func.Val() < item.int64Func.Val() {
+ aggr.cache[key] = item
+ aggr.items[0] = item
+ heap.Fix(aggr, 0)
+ } else if aggr.sort != modelv1.Sort_SORT_DESC && lowest.int64Func.Val() > item.int64Func.Val() {
+ aggr.cache[key] = item
+ aggr.items[0] = item
+ heap.Fix(aggr, 0)
+ }
+ }
+}
+
+func (aggr *postAggregationProcessor) val(tagNames []string) []*measurev1.TopNList {
topNItems := make([]*measurev1.TopNList_Item, aggr.Len())
for aggr.Len() > 0 {
item := heap.Pop(aggr).(*aggregatorItem)
topNItems[aggr.Len()] = &measurev1.TopNList_Item{
- Name: item.key,
+ Entity: item.GetTags(tagNames),
Value: &modelv1.FieldValue{
Value: &modelv1.FieldValue_Int{
Int: &modelv1.Int{Value: item.int64Func.Val()},
@@ -379,9 +401,21 @@ func (aggr *postAggregationProcessor) val() []*measurev1.TopNList {
var _ flow.Element = (*nonAggregatorItem)(nil)
type nonAggregatorItem struct {
- key string
- val int64
- index int
+ key string
+ values tsdb.EntityValues
+ val int64
+ index int
+}
+
+func (n *nonAggregatorItem) GetTags(tagNames []string) []*modelv1.Tag {
+ tags := make([]*modelv1.Tag, len(n.values))
+ for i := 0; i < len(tags); i++ {
+ tags[i] = &modelv1.Tag{
+ Key: tagNames[i],
+ Value: n.values[i],
+ }
+ }
+ return tags
}
func (n *nonAggregatorItem) GetIndex() int {
@@ -398,13 +432,13 @@ type postNonAggregationProcessor struct {
sort modelv1.Sort
}
-func (naggr *postNonAggregationProcessor) val() []*measurev1.TopNList {
+func (naggr *postNonAggregationProcessor) val(tagNames []string) []*measurev1.TopNList {
topNLists := make([]*measurev1.TopNList, 0, len(naggr.timelines))
for ts, timeline := range naggr.timelines {
items := make([]*measurev1.TopNList_Item, timeline.Len())
for idx, elem := range timeline.Values() {
items[idx] = &measurev1.TopNList_Item{
- Name: elem.(*nonAggregatorItem).key,
+ Entity: elem.(*nonAggregatorItem).GetTags(tagNames),
Value: &modelv1.FieldValue{
Value: &modelv1.FieldValue_Int{
Int: &modelv1.Int{Value: elem.(*nonAggregatorItem).val},
@@ -430,16 +464,17 @@ func (naggr *postNonAggregationProcessor) val() []*measurev1.TopNList {
return topNLists
}
-func (naggr *postNonAggregationProcessor) put(key string, val int64, timestampMillis uint64) error {
+func (naggr *postNonAggregationProcessor) put(entityValues tsdb.EntityValues, val int64, timestampMillis uint64) error {
+ key := entityValues.String()
if timeline, ok := naggr.timelines[timestampMillis]; ok {
if timeline.Len() < int(naggr.topN) {
- heap.Push(timeline, &nonAggregatorItem{val: val, key: key})
+ heap.Push(timeline, &nonAggregatorItem{val: val, key: key, values: entityValues})
} else {
if lowest := timeline.Peek(); lowest != nil {
if naggr.sort == modelv1.Sort_SORT_DESC && lowest.(*nonAggregatorItem).val < val {
- timeline.ReplaceLowest(&nonAggregatorItem{val: val, key: key})
+ timeline.ReplaceLowest(&nonAggregatorItem{val: val, key: key, values: entityValues})
} else if naggr.sort != modelv1.Sort_SORT_DESC && lowest.(*nonAggregatorItem).val > val {
- timeline.ReplaceLowest(&nonAggregatorItem{val: val, key: key})
+ timeline.ReplaceLowest(&nonAggregatorItem{val: val, key: key, values: entityValues})
}
}
}
@@ -465,7 +500,7 @@ func (naggr *postNonAggregationProcessor) put(key string, val int64, timestampMi
}
}, false)
naggr.timelines[timestampMillis] = timeline
- heap.Push(timeline, &nonAggregatorItem{val: val, key: key})
+ heap.Push(timeline, &nonAggregatorItem{val: val, key: key, values: entityValues})
return nil
}
diff --git a/banyand/tsdb/seriesdb.go b/banyand/tsdb/seriesdb.go
index 33e1798c..3fc996c4 100644
--- a/banyand/tsdb/seriesdb.go
+++ b/banyand/tsdb/seriesdb.go
@@ -92,7 +92,7 @@ func NewEntity(length int) Entity {
}
// EntityValue represents the value of a tag which is a part of an entity.
-type EntityValue *modelv1.TagValue
+type EntityValue = *modelv1.TagValue
// EntityValueToEntry transforms EntityValue to Entry.
func EntityValueToEntry(ev EntityValue) (Entry, error) {
@@ -404,6 +404,10 @@ func (s *seriesDB) List(ctx context.Context, path Path) (SeriesList, error) {
errScan := s.seriesMetadata.Scan(prefix, prepend(path.seekKey, entityPrefix), kv.DefaultScanOpts, func(key []byte, getVal func() ([]byte, error)) error {
key = key[entityPrefixLen:]
comparableKey := make([]byte, len(key))
+ // avoid slice out of bound
+ if len(key) > len(path.mask) {
+ return nil
+ }
for i, b := range key {
comparableKey[i] = path.mask[i] & b
}
diff --git a/docs/api-reference.md b/docs/api-reference.md
index 6d05c457..71ca74c4 100644
--- a/docs/api-reference.md
+++ b/docs/api-reference.md
@@ -2468,7 +2468,7 @@ TopNList contains a series of topN items
| Field | Type | Label | Description |
| ----- | ---- | ----- | ----------- |
-| name | [string](#string) | | |
+| entity | [banyandb.model.v1.Tag](#banyandb-model-v1-Tag) | repeated | |
| value | [banyandb.model.v1.FieldValue](#banyandb-model-v1-FieldValue) | | |
diff --git a/pkg/flow/streaming/streaming_test.go b/pkg/flow/streaming/streaming_test.go
index a89e10ef..437af1ec 100644
--- a/pkg/flow/streaming/streaming_test.go
+++ b/pkg/flow/streaming/streaming_test.go
@@ -151,14 +151,16 @@ var _ = Describe("Streaming", func() {
Window(NewTumblingTimeWindows(15*time.Second)).
TopN(3, WithSortKeyExtractor(func(record flow.StreamRecord) int64 {
return record.Data().(flow.Data)[1].(int64)
- }), OrderBy(ASC)).
+ }), OrderBy(ASC), WithGroupKeyExtractor(func(record flow.StreamRecord) string {
+ return record.Data().(flow.Data)[0].(string)
+ })).
To(snk)
errCh = f.Open()
Expect(errCh).ShouldNot(BeNil())
})
- When("Top3", func() {
+ When("Bottom3", func() {
BeforeEach(func() {
input = []flow.StreamRecord{
flow.NewStreamRecord(&record{"e2e-service-provider", "instance-001", 10000}, 1000),
@@ -172,14 +174,21 @@ var _ = Describe("Streaming", func() {
}
})
- It("Should take top 3 elements", func() {
+ It("Should take bottom 3 elements", func() {
Eventually(func(g Gomega) {
g.Expect(len(snk.Value())).Should(BeNumerically(">=", 1))
- g.Expect(snk.Value()[0].(flow.StreamRecord).Data()).Should(BeEquivalentTo([]*Tuple2{
+ // e2e-service-consumer Group
+ g.Expect(snk.Value()[0].(flow.StreamRecord).Data().(map[string][]*Tuple2)["e2e-service-consumer"]).Should(BeEquivalentTo([]*Tuple2{
{int64(9500), flow.NewStreamRecord(flow.Data{"e2e-service-consumer", int64(9500)}, 7000)},
{int64(9600), flow.NewStreamRecord(flow.Data{"e2e-service-consumer", int64(9600)}, 6000)},
{int64(9700), flow.NewStreamRecord(flow.Data{"e2e-service-consumer", int64(9700)}, 4000)},
}))
+ // e2e-service-provider Group
+ g.Expect(snk.Value()[0].(flow.StreamRecord).Data().(map[string][]*Tuple2)["e2e-service-provider"]).Should(BeEquivalentTo([]*Tuple2{
+ {int64(9700), flow.NewStreamRecord(flow.Data{"e2e-service-provider", int64(9700)}, 5000)},
+ {int64(9800), flow.NewStreamRecord(flow.Data{"e2e-service-provider", int64(9800)}, 3000)},
+ {int64(10000), flow.NewStreamRecord(flow.Data{"e2e-service-provider", int64(10000)}, 1000)},
+ }))
}).WithTimeout(flags.EventuallyTimeout).Should(Succeed())
})
})
@@ -205,6 +214,8 @@ var _ = Describe("Streaming", func() {
Window(NewTumblingTimeWindows(15*time.Second)).
TopN(3, WithSortKeyExtractor(func(record flow.StreamRecord) int64 {
return record.Data().(flow.Data)[1].(int64)
+ }), WithGroupKeyExtractor(func(record flow.StreamRecord) string {
+ return record.Data().(flow.Data)[0].(string)
})).
To(snk)
@@ -229,10 +240,17 @@ var _ = Describe("Streaming", func() {
It("Should take top 3 elements", func() {
Eventually(func(g Gomega) {
g.Expect(len(snk.Value())).Should(BeNumerically(">=", 1))
- g.Expect(snk.Value()[0].(flow.StreamRecord).Data()).Should(BeEquivalentTo([]*Tuple2{
- {int64(10000), flow.NewStreamRecord(flow.Data{"e2e-service-provider", int64(10000)}, 1000)},
+ // e2e-service-consumer Group
+ g.Expect(snk.Value()[0].(flow.StreamRecord).Data().(map[string][]*Tuple2)["e2e-service-consumer"]).Should(BeEquivalentTo([]*Tuple2{
{int64(9900), flow.NewStreamRecord(flow.Data{"e2e-service-consumer", int64(9900)}, 2000)},
+ {int64(9700), flow.NewStreamRecord(flow.Data{"e2e-service-consumer", int64(9700)}, 4000)},
+ {int64(9600), flow.NewStreamRecord(flow.Data{"e2e-service-consumer", int64(9600)}, 6000)},
+ }))
+ // e2e-service-provider Group
+ g.Expect(snk.Value()[0].(flow.StreamRecord).Data().(map[string][]*Tuple2)["e2e-service-provider"]).Should(BeEquivalentTo([]*Tuple2{
+ {int64(10000), flow.NewStreamRecord(flow.Data{"e2e-service-provider", int64(10000)}, 1000)},
{int64(9800), flow.NewStreamRecord(flow.Data{"e2e-service-provider", int64(9800)}, 3000)},
+ {int64(9700), flow.NewStreamRecord(flow.Data{"e2e-service-provider", int64(9700)}, 5000)},
}))
}).WithTimeout(flags.EventuallyTimeout).Should(Succeed())
})
diff --git a/pkg/flow/streaming/topn.go b/pkg/flow/streaming/topn.go
index de63c7d8..746faa66 100644
--- a/pkg/flow/streaming/topn.go
+++ b/pkg/flow/streaming/topn.go
@@ -41,10 +41,9 @@ type windowedFlow struct {
func (s *windowedFlow) TopN(topNum int, opts ...any) flow.Flow {
s.wa.(*tumblingTimeWindows).aggregationFactory = func() flow.AggregationOp {
- topNAggrFunc := &topNAggregator{
+ topNAggrFunc := &topNAggregatorGroup{
cacheSize: topNum,
sort: DESC,
- dirty: false,
}
// apply user customized options
for _, opt := range opts {
@@ -62,50 +61,105 @@ func (s *windowedFlow) TopN(topNum int, opts ...any) flow.Flow {
return utils.Int64Comparator(b, a)
}
}
- topNAggrFunc.treeMap = treemap.NewWith(topNAggrFunc.comparator)
+ topNAggrFunc.aggregatorGroup = make(map[string]*topNAggregator)
return topNAggrFunc
}
return s.f
}
+type topNAggregatorGroup struct {
+ aggregatorGroup map[string]*topNAggregator
+ sortKeyExtractor func(flow.StreamRecord) int64
+ groupKeyExtractor func(flow.StreamRecord) string
+ comparator utils.Comparator
+ cacheSize int
+ sort TopNSort
+}
+
type topNAggregator struct {
- treeMap *treemap.Map
- sortKeyExtractor func(flow.StreamRecord) int64
- comparator utils.Comparator
- cacheSize int
- currentTopNum int
- sort TopNSort
- dirty bool
+ *topNAggregatorGroup
+ treeMap *treemap.Map
+ currentTopNum int
+ dirty bool
}
-// TopNOption is the option to set up a top-n aggregator.
-type TopNOption func(aggregator *topNAggregator)
+// TopNOption is the option to set up a top-n aggregator group.
+type TopNOption func(aggregator *topNAggregatorGroup)
// WithSortKeyExtractor sets a closure to extract the sorting key.
func WithSortKeyExtractor(sortKeyExtractor func(flow.StreamRecord) int64) TopNOption {
- return func(aggregator *topNAggregator) {
+ return func(aggregator *topNAggregatorGroup) {
aggregator.sortKeyExtractor = sortKeyExtractor
}
}
+// WithGroupKeyExtractor extract group key from the StreamRecord.
+func WithGroupKeyExtractor(groupKeyExtractor func(flow.StreamRecord) string) TopNOption {
+ return func(aggregator *topNAggregatorGroup) {
+ aggregator.groupKeyExtractor = groupKeyExtractor
+ }
+}
+
// OrderBy sets the sorting order.
func OrderBy(sort TopNSort) TopNOption {
- return func(aggregator *topNAggregator) {
+ return func(aggregator *topNAggregatorGroup) {
aggregator.sort = sort
}
}
-func (t *topNAggregator) Add(input []flow.StreamRecord) {
+func (t *topNAggregatorGroup) Add(input []flow.StreamRecord) {
for _, item := range input {
sortKey := t.sortKeyExtractor(item)
- // check
- if t.checkSortKeyInBufferRange(sortKey) {
- t.put(sortKey, item)
- t.doCleanUp()
+ groupKey := t.groupKeyExtractor(item)
+ aggregator := t.getOrCreateGroup(groupKey)
+ if aggregator.checkSortKeyInBufferRange(sortKey) {
+ aggregator.put(sortKey, item)
+ aggregator.doCleanUp()
}
}
}
+func (t *topNAggregatorGroup) Snapshot() interface{} {
+ groupRanks := make(map[string][]*Tuple2)
+ for group, aggregator := range t.aggregatorGroup {
+ if !aggregator.dirty {
+ continue
+ }
+ aggregator.dirty = false
+ iter := aggregator.treeMap.Iterator()
+ items := make([]*Tuple2, 0, aggregator.currentTopNum)
+ for iter.Next() {
+ list := iter.Value().([]interface{})
+ for _, item := range list {
+ items = append(items, &Tuple2{iter.Key(), item})
+ }
+ }
+ groupRanks[group] = items
+ }
+ return groupRanks
+}
+
+func (t *topNAggregatorGroup) Dirty() bool {
+ for _, aggregator := range t.aggregatorGroup {
+ if aggregator.dirty {
+ return true
+ }
+ }
+ return false
+}
+
+func (t *topNAggregatorGroup) getOrCreateGroup(group string) *topNAggregator {
+ aggregator, groupExist := t.aggregatorGroup[group]
+ if groupExist {
+ return aggregator
+ }
+ t.aggregatorGroup[group] = &topNAggregator{
+ topNAggregatorGroup: t,
+ treeMap: treemap.NewWith(t.comparator),
+ }
+ return t.aggregatorGroup[group]
+}
+
func (t *topNAggregator) doCleanUp() {
// do cleanup: maintain the treeMap windowSize
if t.currentTopNum > t.cacheSize {
@@ -148,23 +202,6 @@ func (t *topNAggregator) checkSortKeyInBufferRange(sortKey int64) bool {
return t.currentTopNum < t.cacheSize
}
-func (t *topNAggregator) Snapshot() interface{} {
- t.dirty = false
- iter := t.treeMap.Iterator()
- items := make([]*Tuple2, 0, t.currentTopNum)
- for iter.Next() {
- list := iter.Value().([]interface{})
- for _, item := range list {
- items = append(items, &Tuple2{iter.Key(), item})
- }
- }
- return items
-}
-
-func (t *topNAggregator) Dirty() bool {
- return t.dirty
-}
-
// Tuple2 is a tuple with 2 fields. Each field may be a separate type.
type Tuple2 struct {
V1 interface{} `json:"v1"`
diff --git a/pkg/flow/streaming/topn_test.go b/pkg/flow/streaming/topn_test.go
index bc2592ee..988fe825 100644
--- a/pkg/flow/streaming/topn_test.go
+++ b/pkg/flow/streaming/topn_test.go
@@ -20,8 +20,8 @@ package streaming
import (
"testing"
- "github.com/emirpasic/gods/maps/treemap"
"github.com/emirpasic/gods/utils"
+ "github.com/google/go-cmp/cmp"
"github.com/stretchr/testify/require"
"github.com/apache/skywalking-banyandb/pkg/flow"
@@ -29,7 +29,7 @@ import (
func TestFlow_TopN_Aggregator(t *testing.T) {
input := []flow.StreamRecord{
- // 1. string
+ // 1. group key (string)
// 2. number
// 3. slices of groupBy values
flow.NewStreamRecordWithoutTS(flow.Data{"e2e-service-provider", 10000, []interface{}{"e2e-service-provider"}}),
@@ -42,35 +42,56 @@ func TestFlow_TopN_Aggregator(t *testing.T) {
flow.NewStreamRecordWithoutTS(flow.Data{"e2e-service-consumer", 9500, []interface{}{"e2e-service-consumer"}}),
}
tests := []struct {
+ expected map[string][]*Tuple2
name string
- expected []*Tuple2
sort TopNSort
}{
{
name: "DESC",
sort: DESC,
- expected: []*Tuple2{
- {int64(10000), flow.NewStreamRecordWithoutTS(flow.Data{"e2e-service-provider", 10000, []interface{}{"e2e-service-provider"}})},
- {int64(9900), flow.NewStreamRecordWithoutTS(flow.Data{"e2e-service-consumer", 9900, []interface{}{"e2e-service-consumer"}})},
- {int64(9800), flow.NewStreamRecordWithoutTS(flow.Data{"e2e-service-provider", 9800, []interface{}{"e2e-service-provider"}})},
+ expected: map[string][]*Tuple2{
+ "e2e-service-provider": {
+ {int64(10000), flow.NewStreamRecordWithoutTS(flow.Data{"e2e-service-provider", 10000, []interface{}{"e2e-service-provider"}})},
+ {int64(9800), flow.NewStreamRecordWithoutTS(flow.Data{"e2e-service-provider", 9800, []interface{}{"e2e-service-provider"}})},
+ {int64(9700), flow.NewStreamRecordWithoutTS(flow.Data{"e2e-service-provider", 9700, []interface{}{"e2e-service-provider"}})},
+ },
+ "e2e-service-consumer": {
+ {int64(9900), flow.NewStreamRecordWithoutTS(flow.Data{"e2e-service-consumer", 9900, []interface{}{"e2e-service-consumer"}})},
+ {int64(9800), flow.NewStreamRecordWithoutTS(flow.Data{"e2e-service-consumer", 9800, []interface{}{"e2e-service-consumer"}})},
+ {int64(9700), flow.NewStreamRecordWithoutTS(flow.Data{"e2e-service-consumer", 9700, []interface{}{"e2e-service-consumer"}})},
+ },
},
},
{
name: "DESC by default",
sort: 0,
- expected: []*Tuple2{
- {int64(10000), flow.NewStreamRecordWithoutTS(flow.Data{"e2e-service-provider", 10000, []interface{}{"e2e-service-provider"}})},
- {int64(9900), flow.NewStreamRecordWithoutTS(flow.Data{"e2e-service-consumer", 9900, []interface{}{"e2e-service-consumer"}})},
- {int64(9800), flow.NewStreamRecordWithoutTS(flow.Data{"e2e-service-provider", 9800, []interface{}{"e2e-service-provider"}})},
+ expected: map[string][]*Tuple2{
+ "e2e-service-provider": {
+ {int64(10000), flow.NewStreamRecordWithoutTS(flow.Data{"e2e-service-provider", 10000, []interface{}{"e2e-service-provider"}})},
+ {int64(9800), flow.NewStreamRecordWithoutTS(flow.Data{"e2e-service-provider", 9800, []interface{}{"e2e-service-provider"}})},
+ {int64(9700), flow.NewStreamRecordWithoutTS(flow.Data{"e2e-service-provider", 9700, []interface{}{"e2e-service-provider"}})},
+ },
+ "e2e-service-consumer": {
+ {int64(9900), flow.NewStreamRecordWithoutTS(flow.Data{"e2e-service-consumer", 9900, []interface{}{"e2e-service-consumer"}})},
+ {int64(9800), flow.NewStreamRecordWithoutTS(flow.Data{"e2e-service-consumer", 9800, []interface{}{"e2e-service-consumer"}})},
+ {int64(9700), flow.NewStreamRecordWithoutTS(flow.Data{"e2e-service-consumer", 9700, []interface{}{"e2e-service-consumer"}})},
+ },
},
},
{
name: "ASC",
sort: ASC,
- expected: []*Tuple2{
- {int64(9500), flow.NewStreamRecordWithoutTS(flow.Data{"e2e-service-consumer", 9500, []interface{}{"e2e-service-consumer"}})},
- {int64(9600), flow.NewStreamRecordWithoutTS(flow.Data{"e2e-service-consumer", 9600, []interface{}{"e2e-service-consumer"}})},
- {int64(9700), flow.NewStreamRecordWithoutTS(flow.Data{"e2e-service-consumer", 9700, []interface{}{"e2e-service-consumer"}})},
+ expected: map[string][]*Tuple2{
+ "e2e-service-consumer": {
+ {int64(9500), flow.NewStreamRecordWithoutTS(flow.Data{"e2e-service-consumer", 9500, []interface{}{"e2e-service-consumer"}})},
+ {int64(9600), flow.NewStreamRecordWithoutTS(flow.Data{"e2e-service-consumer", 9600, []interface{}{"e2e-service-consumer"}})},
+ {int64(9700), flow.NewStreamRecordWithoutTS(flow.Data{"e2e-service-consumer", 9700, []interface{}{"e2e-service-consumer"}})},
+ },
+ "e2e-service-provider": {
+ {int64(9700), flow.NewStreamRecordWithoutTS(flow.Data{"e2e-service-provider", 9700, []interface{}{"e2e-service-provider"}})},
+ {int64(9800), flow.NewStreamRecordWithoutTS(flow.Data{"e2e-service-provider", 9800, []interface{}{"e2e-service-provider"}})},
+ {int64(10000), flow.NewStreamRecordWithoutTS(flow.Data{"e2e-service-provider", 10000, []interface{}{"e2e-service-provider"}})},
+ },
},
},
}
@@ -85,18 +106,26 @@ func TestFlow_TopN_Aggregator(t *testing.T) {
} else {
comparator = utils.Int64Comparator
}
- topN := &topNAggregator{
- cacheSize: 3,
- sort: tt.sort,
- comparator: comparator,
- treeMap: treemap.NewWith(comparator),
+ topN := &topNAggregatorGroup{
+ cacheSize: 3,
+ sort: tt.sort,
+ comparator: comparator,
+ aggregatorGroup: make(map[string]*topNAggregator),
sortKeyExtractor: func(record flow.StreamRecord) int64 {
return int64(record.Data().(flow.Data)[1].(int))
},
+ groupKeyExtractor: func(record flow.StreamRecord) string {
+ return record.Data().(flow.Data)[0].(string)
+ },
}
topN.Add(input)
- require.Len(topN.Snapshot(), 3)
- require.Equal(tt.expected, topN.Snapshot())
+ snapshot := topN.Snapshot()
+ require.Len(snapshot, 2)
+ require.Contains(snapshot, "e2e-service-provider") // provider group
+ require.Contains(snapshot, "e2e-service-consumer") // consumer group
+ if diff := cmp.Diff(tt.expected, snapshot); diff != "" {
+ t.Errorf("Snapshot() mismatch (-want +got):\n%s", diff)
+ }
})
}
}
diff --git a/pkg/flow/types.go b/pkg/flow/types.go
index 778f9a7d..538ce849 100644
--- a/pkg/flow/types.go
+++ b/pkg/flow/types.go
@@ -23,6 +23,7 @@ import (
"io"
"sync"
+ "github.com/google/go-cmp/cmp"
"google.golang.org/protobuf/types/known/timestamppb"
)
@@ -136,6 +137,11 @@ func (sr StreamRecord) Data() interface{} {
return sr.data
}
+// Equal reports whether two StreamRecords have the same timestamp and data.
+func (sr StreamRecord) Equal(other StreamRecord) bool {
+ return sr.ts == other.ts && cmp.Equal(sr.data, other.data)
+}
+
// Inlet represents a type that exposes one open input.
//
//go:generate mockgen -destination=./inlet_mock.go -package=flow github.com/apache/skywalking-banyandb/pkg/flow Inlet
diff --git a/pkg/test/measure/testdata/measures/service_instance_endpoint_cpm_minute.json b/pkg/test/measure/testdata/measures/service_instance_endpoint_cpm_minute.json
new file mode 100644
index 00000000..47241abc
--- /dev/null
+++ b/pkg/test/measure/testdata/measures/service_instance_endpoint_cpm_minute.json
@@ -0,0 +1,51 @@
+{
+ "metadata": {
+ "name": "service_instance_endpoint_cpm_minute",
+ "group": "sw_metric"
+ },
+ "tag_families": [
+ {
+ "name": "default",
+ "tags": [
+ {
+ "name": "id",
+ "type": "TAG_TYPE_ID"
+ },
+ {
+ "name": "entity_id",
+ "type": "TAG_TYPE_STRING"
+ },
+ {
+ "name": "service_id",
+ "type": "TAG_TYPE_STRING"
+ },
+ {
+ "name": "http.uri",
+ "type": "TAG_TYPE_STRING"
+ }
+ ]
+ }
+ ],
+ "fields": [
+ {
+ "name": "total",
+ "field_type": "FIELD_TYPE_INT",
+ "encoding_method": "ENCODING_METHOD_GORILLA",
+ "compression_method": "COMPRESSION_METHOD_ZSTD"
+ },
+ {
+ "name": "value",
+ "field_type": "FIELD_TYPE_INT",
+ "encoding_method": "ENCODING_METHOD_GORILLA",
+ "compression_method": "COMPRESSION_METHOD_ZSTD"
+ }
+ ],
+ "entity": {
+ "tag_names": [
+ "service_id",
+ "entity_id"
+ ]
+ },
+ "interval": "1m",
+ "updated_at": "2021-04-15T01:30:15.01Z"
+}
\ No newline at end of file
diff --git a/pkg/test/measure/testdata/topn_aggregations/service_cpm_minute_nogroup_top100.json b/pkg/test/measure/testdata/topn_aggregations/service_cpm_minute_nogroup_top100.json
deleted file mode 100644
index a4e1456c..00000000
--- a/pkg/test/measure/testdata/topn_aggregations/service_cpm_minute_nogroup_top100.json
+++ /dev/null
@@ -1,14 +0,0 @@
-{
- "metadata": {
- "name": "service_cpm_minute_no_group_by_top100",
- "group": "sw_metric"
- },
- "source_measure": {
- "name": "service_cpm_minute",
- "group": "sw_metric"
- },
- "field_name": "value",
- "field_value_sort": 1,
- "counters_number": 1000,
- "lru_size": 10
-}
\ No newline at end of file
diff --git a/pkg/test/measure/testdata/topn_aggregations/service_cpm_minute_top_bottom100.json b/pkg/test/measure/testdata/topn_aggregations/service_instance_cpm_minute_top_bottom_100.json
similarity index 65%
copy from pkg/test/measure/testdata/topn_aggregations/service_cpm_minute_top_bottom100.json
copy to pkg/test/measure/testdata/topn_aggregations/service_instance_cpm_minute_top_bottom_100.json
index fec9f49d..08d6a109 100644
--- a/pkg/test/measure/testdata/topn_aggregations/service_cpm_minute_top_bottom100.json
+++ b/pkg/test/measure/testdata/topn_aggregations/service_instance_cpm_minute_top_bottom_100.json
@@ -1,16 +1,16 @@
{
"metadata": {
- "name": "service_cpm_minute_top_bottom_100",
+ "name": "service_instance_cpm_minute_top_bottom_100",
"group": "sw_metric"
},
"source_measure": {
- "name": "service_cpm_minute",
+ "name": "service_instance_cpm_minute",
"group": "sw_metric"
},
"field_name": "value",
"field_value_sort": 0,
"group_by_tag_names": [
- "entity_id"
+ "service_id"
],
"counters_number": 1000,
"lru_size": 10
diff --git a/pkg/test/measure/testdata/topn_aggregations/service_cpm_minute_top_bottom100.json b/pkg/test/measure/testdata/topn_aggregations/service_instance_endpoint_cpm_minute_top_bottom_100.json
similarity index 62%
rename from pkg/test/measure/testdata/topn_aggregations/service_cpm_minute_top_bottom100.json
rename to pkg/test/measure/testdata/topn_aggregations/service_instance_endpoint_cpm_minute_top_bottom_100.json
index fec9f49d..65f3af15 100644
--- a/pkg/test/measure/testdata/topn_aggregations/service_cpm_minute_top_bottom100.json
+++ b/pkg/test/measure/testdata/topn_aggregations/service_instance_endpoint_cpm_minute_top_bottom_100.json
@@ -1,16 +1,16 @@
{
"metadata": {
- "name": "service_cpm_minute_top_bottom_100",
+ "name": "service_instance_endpoint_cpm_minute_top_bottom_100",
"group": "sw_metric"
},
"source_measure": {
- "name": "service_cpm_minute",
+ "name": "service_instance_endpoint_cpm_minute",
"group": "sw_metric"
},
"field_name": "value",
"field_value_sort": 0,
"group_by_tag_names": [
- "entity_id"
+ "http.uri"
],
"counters_number": 1000,
"lru_size": 10
diff --git a/test/cases/measure/data/testdata/service_instance_cpm_minute_data.json b/test/cases/measure/data/testdata/service_instance_cpm_minute_data.json
new file mode 100644
index 00000000..b67eef16
--- /dev/null
+++ b/test/cases/measure/data/testdata/service_instance_cpm_minute_data.json
@@ -0,0 +1,212 @@
+[
+ {
+ "tag_families": [
+ {
+ "tags": [
+ {
+ "id": {
+ "value": "1"
+ }
+ },
+ {
+ "str": {
+ "value": "entity_1"
+ }
+ },
+ {
+ "str": {
+ "value": "svc_1"
+ }
+ }
+ ]
+ }
+ ],
+ "fields": [
+ {
+ "int": {
+ "value": 100
+ }
+ },
+ {
+ "int": {
+ "value": 1
+ }
+ }
+ ]
+ },
+ {
+ "tag_families": [
+ {
+ "tags": [
+ {
+ "id": {
+ "value": "4"
+ }
+ },
+ {
+ "str": {
+ "value": "entity_2"
+ }
+ },
+ {
+ "str": {
+ "value": "svc_2"
+ }
+ }
+ ]
+ }
+ ],
+ "fields": [
+ {
+ "int": {
+ "value": 100
+ }
+ },
+ {
+ "int": {
+ "value": 2
+ }
+ }
+ ]
+ },
+ {
+ "tag_families": [
+ {
+ "tags": [
+ {
+ "id": {
+ "value": "5"
+ }
+ },
+ {
+ "str": {
+ "value": "entity_2"
+ }
+ },
+ {
+ "str": {
+ "value": "svc_1"
+ }
+ }
+ ]
+ }
+ ],
+ "fields": [
+ {
+ "int": {
+ "value": 100
+ }
+ },
+ {
+ "int": {
+ "value": 3
+ }
+ }
+ ]
+ },
+ {
+ "tag_families": [
+ {
+ "tags": [
+ {
+ "id": {
+ "value": "6"
+ }
+ },
+ {
+ "str": {
+ "value": "entity_3"
+ }
+ },
+ {
+ "str": {
+ "value": "svc_2"
+ }
+ }
+ ]
+ }
+ ],
+ "fields": [
+ {
+ "int": {
+ "value": 100
+ }
+ },
+ {
+ "int": {
+ "value": 5
+ }
+ }
+ ]
+ },
+ {
+ "tag_families": [
+ {
+ "tags": [
+ {
+ "id": {
+ "value": "2"
+ }
+ },
+ {
+ "str": {
+ "value": "entity_1"
+ }
+ },
+ {
+ "str": {
+ "value": ""
+ }
+ }
+ ]
+ }
+ ],
+ "fields": [
+ {
+ "int": {
+ "value": 50
+ }
+ },
+ {
+ "int": {
+ "value": 4
+ }
+ }
+ ]
+ },
+ {
+ "tag_families": [
+ {
+ "tags": [
+ {
+ "id": {
+ "value": "3"
+ }
+ },
+ {
+ "str": {
+ "value": "entity_1"
+ }
+ },
+ {
+ "str": {
+ "value": "svc_1"
+ }
+ }
+ ]
+ }
+ ],
+ "fields": [
+ {
+ "int": {
+ "value": 300
+ }
+ },
+ {
+ "int": {
+ "value": 6
+ }
+ }
+ ]
+ }
+]
diff --git a/test/cases/measure/data/testdata/service_instance_cpm_minute_data1.json b/test/cases/measure/data/testdata/service_instance_cpm_minute_data1.json
new file mode 100644
index 00000000..93b8b834
--- /dev/null
+++ b/test/cases/measure/data/testdata/service_instance_cpm_minute_data1.json
@@ -0,0 +1,212 @@
+[
+ {
+ "tag_families": [
+ {
+ "tags": [
+ {
+ "id": {
+ "value": "7"
+ }
+ },
+ {
+ "str": {
+ "value": "entity_2"
+ }
+ },
+ {
+ "str": {
+ "value": ""
+ }
+ }
+ ]
+ }
+ ],
+ "fields": [
+ {
+ "int": {
+ "value": 100
+ }
+ },
+ {
+ "int": {
+ "value": 10
+ }
+ }
+ ]
+ },
+ {
+ "tag_families": [
+ {
+ "tags": [
+ {
+ "id": {
+ "value": "8"
+ }
+ },
+ {
+ "str": {
+ "value": "entity_2"
+ }
+ },
+ {
+ "str": {
+ "value": "svc_1"
+ }
+ }
+ ]
+ }
+ ],
+ "fields": [
+ {
+ "int": {
+ "value": 100
+ }
+ },
+ {
+ "int": {
+ "value": 9
+ }
+ }
+ ]
+ },
+ {
+ "tag_families": [
+ {
+ "tags": [
+ {
+ "id": {
+ "value": "9"
+ }
+ },
+ {
+ "str": {
+ "value": "entity_2"
+ }
+ },
+ {
+ "str": {
+ "value": "svc_4"
+ }
+ }
+ ]
+ }
+ ],
+ "fields": [
+ {
+ "int": {
+ "value": 100
+ }
+ },
+ {
+ "int": {
+ "value": 8
+ }
+ }
+ ]
+ },
+ {
+ "tag_families": [
+ {
+ "tags": [
+ {
+ "id": {
+ "value": "10"
+ }
+ },
+ {
+ "str": {
+ "value": "entity_3"
+ }
+ },
+ {
+ "str": {
+ "value": "svc_1"
+ }
+ }
+ ]
+ }
+ ],
+ "fields": [
+ {
+ "int": {
+ "value": 100
+ }
+ },
+ {
+ "int": {
+ "value": 11
+ }
+ }
+ ]
+ },
+ {
+ "tag_families": [
+ {
+ "tags": [
+ {
+ "id": {
+ "value": "11"
+ }
+ },
+ {
+ "str": {
+ "value": "entity_1"
+ }
+ },
+ {
+ "str": {
+ "value": "svc_2"
+ }
+ }
+ ]
+ }
+ ],
+ "fields": [
+ {
+ "int": {
+ "value": 50
+ }
+ },
+ {
+ "int": {
+ "value": 12
+ }
+ }
+ ]
+ },
+ {
+ "tag_families": [
+ {
+ "tags": [
+ {
+ "id": {
+ "value": "12"
+ }
+ },
+ {
+ "str": {
+ "value": "entity_1"
+ }
+ },
+ {
+ "str": {
+ "value": ""
+ }
+ }
+ ]
+ }
+ ],
+ "fields": [
+ {
+ "int": {
+ "value": 300
+ }
+ },
+ {
+ "int": {
+ "value": 7
+ }
+ }
+ ]
+ }
+]
diff --git a/test/cases/measure/data/testdata/service_instance_cpm_minute_data2.json b/test/cases/measure/data/testdata/service_instance_cpm_minute_data2.json
new file mode 100644
index 00000000..e98f1412
--- /dev/null
+++ b/test/cases/measure/data/testdata/service_instance_cpm_minute_data2.json
@@ -0,0 +1,72 @@
+[
+ {
+ "tag_families": [
+ {
+ "tags": [
+ {
+ "id": {
+ "value": "100"
+ }
+ },
+ {
+ "str": {
+ "value": "entity_2"
+ }
+ },
+ {
+ "str": {
+ "value": "svc_2"
+ }
+ }
+ ]
+ }
+ ],
+ "fields": [
+ {
+ "int": {
+ "value": 100
+ }
+ },
+ {
+ "int": {
+ "value": 100
+ }
+ }
+ ]
+ },
+ {
+ "tag_families": [
+ {
+ "tags": [
+ {
+ "id": {
+ "value": "110"
+ }
+ },
+ {
+ "str": {
+ "value": "entity_2"
+ }
+ },
+ {
+ "str": {
+ "value": ""
+ }
+ }
+ ]
+ }
+ ],
+ "fields": [
+ {
+ "int": {
+ "value": 100
+ }
+ },
+ {
+ "int": {
+ "value": 110
+ }
+ }
+ ]
+ }
+]
diff --git a/test/cases/measure/data/testdata/service_instance_endpoint_cpm_minute_data.json b/test/cases/measure/data/testdata/service_instance_endpoint_cpm_minute_data.json
new file mode 100644
index 00000000..c5817343
--- /dev/null
+++ b/test/cases/measure/data/testdata/service_instance_endpoint_cpm_minute_data.json
@@ -0,0 +1,228 @@
+[
+ {
+ "tag_families": [
+ {
+ "tags": [
+ {
+ "id": {
+ "value": "1"
+ }
+ },
+ {
+ "str": {
+ "value": "entity_1"
+ }
+ },
+ {
+ "str": {
+ "value": "svc_1"
+ }
+ },
+ {
+ "str": {
+ "value": "GET:/metrics"
+ }
+ }
+ ]
+ }
+ ],
+ "fields": [
+ {
+ "int": {
+ "value": 100
+ }
+ },
+ {
+ "int": {
+ "value": 1
+ }
+ }
+ ]
+ },
+ {
+ "tag_families": [
+ {
+ "tags": [
+ {
+ "id": {
+ "value": "4"
+ }
+ },
+ {
+ "str": {
+ "value": "entity_2"
+ }
+ },
+ {
+ "str": {
+ "value": "svc_2"
+ }
+ },
+ {
+ "str": {
+ "value": "GET:/check"
+ }
+ }
+ ]
+ }
+ ],
+ "fields": [
+ {
+ "int": {
+ "value": 100
+ }
+ },
+ {
+ "int": {
+ "value": 2
+ }
+ }
+ ]
+ },
+ {
+ "tag_families": [
+ {
+ "tags": [
+ {
+ "id": {
+ "value": "5"
+ }
+ },
+ {
+ "str": {
+ "value": "entity_2"
+ }
+ },
+ {
+ "str": {
+ "value": "svc_1"
+ }
+ }
+ ]
+ }
+ ],
+ "fields": [
+ {
+ "int": {
+ "value": 100
+ }
+ },
+ {
+ "int": {
+ "value": 3
+ }
+ }
+ ]
+ },
+ {
+ "tag_families": [
+ {
+ "tags": [
+ {
+ "id": {
+ "value": "6"
+ }
+ },
+ {
+ "str": {
+ "value": "entity_3"
+ }
+ },
+ {
+ "str": {
+ "value": "svc_2"
+ }
+ }
+ ]
+ }
+ ],
+ "fields": [
+ {
+ "int": {
+ "value": 100
+ }
+ },
+ {
+ "int": {
+ "value": 5
+ }
+ }
+ ]
+ },
+ {
+ "tag_families": [
+ {
+ "tags": [
+ {
+ "id": {
+ "value": "2"
+ }
+ },
+ {
+ "str": {
+ "value": "entity_1"
+ }
+ },
+ {
+ "str": {
+ "value": ""
+ }
+ },
+ {
+ "null": 0
+ }
+ ]
+ }
+ ],
+ "fields": [
+ {
+ "int": {
+ "value": 50
+ }
+ },
+ {
+ "int": {
+ "value": 4
+ }
+ }
+ ]
+ },
+ {
+ "tag_families": [
+ {
+ "tags": [
+ {
+ "id": {
+ "value": "3"
+ }
+ },
+ {
+ "str": {
+ "value": "entity_1"
+ }
+ },
+ {
+ "str": {
+ "value": "svc_1"
+ }
+ },
+ {
+ "null": 0
+ }
+ ]
+ }
+ ],
+ "fields": [
+ {
+ "int": {
+ "value": 300
+ }
+ },
+ {
+ "int": {
+ "value": 6
+ }
+ }
+ ]
+ }
+]
diff --git a/test/cases/measure/data/testdata/service_instance_endpoint_cpm_minute_data1.json b/test/cases/measure/data/testdata/service_instance_endpoint_cpm_minute_data1.json
new file mode 100644
index 00000000..d1313b86
--- /dev/null
+++ b/test/cases/measure/data/testdata/service_instance_endpoint_cpm_minute_data1.json
@@ -0,0 +1,228 @@
+[
+ {
+ "tag_families": [
+ {
+ "tags": [
+ {
+ "id": {
+ "value": "7"
+ }
+ },
+ {
+ "str": {
+ "value": "entity_2"
+ }
+ },
+ {
+ "str": {
+ "value": ""
+ }
+ },
+ {
+ "null": 0
+ }
+ ]
+ }
+ ],
+ "fields": [
+ {
+ "int": {
+ "value": 100
+ }
+ },
+ {
+ "int": {
+ "value": 10
+ }
+ }
+ ]
+ },
+ {
+ "tag_families": [
+ {
+ "tags": [
+ {
+ "id": {
+ "value": "8"
+ }
+ },
+ {
+ "str": {
+ "value": "entity_2"
+ }
+ },
+ {
+ "str": {
+ "value": "svc_1"
+ }
+ },
+ {
+ "str": {
+ "value": "GET:/metrics"
+ }
+ }
+ ]
+ }
+ ],
+ "fields": [
+ {
+ "int": {
+ "value": 100
+ }
+ },
+ {
+ "int": {
+ "value": 9
+ }
+ }
+ ]
+ },
+ {
+ "tag_families": [
+ {
+ "tags": [
+ {
+ "id": {
+ "value": "9"
+ }
+ },
+ {
+ "str": {
+ "value": "entity_2"
+ }
+ },
+ {
+ "str": {
+ "value": "svc_4"
+ }
+ },
+ {
+ "str": {
+ "value": "GET:/check"
+ }
+ }
+ ]
+ }
+ ],
+ "fields": [
+ {
+ "int": {
+ "value": 100
+ }
+ },
+ {
+ "int": {
+ "value": 8
+ }
+ }
+ ]
+ },
+ {
+ "tag_families": [
+ {
+ "tags": [
+ {
+ "id": {
+ "value": "10"
+ }
+ },
+ {
+ "str": {
+ "value": "entity_3"
+ }
+ },
+ {
+ "str": {
+ "value": "svc_1"
+ }
+ }
+ ]
+ }
+ ],
+ "fields": [
+ {
+ "int": {
+ "value": 100
+ }
+ },
+ {
+ "int": {
+ "value": 11
+ }
+ }
+ ]
+ },
+ {
+ "tag_families": [
+ {
+ "tags": [
+ {
+ "id": {
+ "value": "11"
+ }
+ },
+ {
+ "str": {
+ "value": "entity_1"
+ }
+ },
+ {
+ "str": {
+ "value": "svc_2"
+ }
+ }
+ ]
+ }
+ ],
+ "fields": [
+ {
+ "int": {
+ "value": 50
+ }
+ },
+ {
+ "int": {
+ "value": 12
+ }
+ }
+ ]
+ },
+ {
+ "tag_families": [
+ {
+ "tags": [
+ {
+ "id": {
+ "value": "12"
+ }
+ },
+ {
+ "str": {
+ "value": "entity_1"
+ }
+ },
+ {
+ "str": {
+ "value": ""
+ }
+ },
+ {
+ "null": 0
+ }
+ ]
+ }
+ ],
+ "fields": [
+ {
+ "int": {
+ "value": 300
+ }
+ },
+ {
+ "int": {
+ "value": 7
+ }
+ }
+ ]
+ }
+]
diff --git a/test/cases/measure/data/testdata/service_instance_endpoint_cpm_minute_data2.json b/test/cases/measure/data/testdata/service_instance_endpoint_cpm_minute_data2.json
new file mode 100644
index 00000000..170f45d6
--- /dev/null
+++ b/test/cases/measure/data/testdata/service_instance_endpoint_cpm_minute_data2.json
@@ -0,0 +1,82 @@
+[
+ {
+ "tag_families": [
+ {
+ "tags": [
+ {
+ "id": {
+ "value": "100"
+ }
+ },
+ {
+ "str": {
+ "value": "entity_2"
+ }
+ },
+ {
+ "str": {
+ "value": "svc_2"
+ }
+ },
+ {
+ "str": {
+ "value": "GET:/metrics"
+ }
+ }
+ ]
+ }
+ ],
+ "fields": [
+ {
+ "int": {
+ "value": 100
+ }
+ },
+ {
+ "int": {
+ "value": 100
+ }
+ }
+ ]
+ },
+ {
+ "tag_families": [
+ {
+ "tags": [
+ {
+ "id": {
+ "value": "110"
+ }
+ },
+ {
+ "str": {
+ "value": "entity_2"
+ }
+ },
+ {
+ "str": {
+ "value": ""
+ }
+ },
+ {
+ "str": {
+ "value": "GET:/metrics"
+ }
+ }
+ ]
+ }
+ ],
+ "fields": [
+ {
+ "int": {
+ "value": 100
+ }
+ },
+ {
+ "int": {
+ "value": 110
+ }
+ }
+ ]
+ }
+]
diff --git a/test/cases/topn/data/input/aggr_desc.yaml b/test/cases/topn/data/input/aggr_desc.yaml
index 83d2f7db..3151ab64 100644
--- a/test/cases/topn/data/input/aggr_desc.yaml
+++ b/test/cases/topn/data/input/aggr_desc.yaml
@@ -16,7 +16,7 @@
# under the License.
metadata:
- name: "service_cpm_minute_top_bottom_100"
+ name: "service_instance_cpm_minute_top_bottom_100"
group: "sw_metric"
topN: 3
fieldValueSort: 1
diff --git a/test/cases/topn/data/input/asc.yaml b/test/cases/topn/data/input/asc.yaml
index 75e714b0..9a8732df 100644
--- a/test/cases/topn/data/input/asc.yaml
+++ b/test/cases/topn/data/input/asc.yaml
@@ -16,7 +16,7 @@
# under the License.
metadata:
- name: "service_cpm_minute_top_bottom_100"
+ name: "service_instance_cpm_minute_top_bottom_100"
group: "sw_metric"
-topN: 1
+topN: 2
fieldValueSort: 2
diff --git a/test/cases/topn/data/input/condition_aggr_desc.yaml b/test/cases/topn/data/input/condition_aggr_desc.yaml
index 88f701c6..844af5a3 100644
--- a/test/cases/topn/data/input/condition_aggr_desc.yaml
+++ b/test/cases/topn/data/input/condition_aggr_desc.yaml
@@ -16,14 +16,14 @@
# under the License.
metadata:
- name: "service_cpm_minute_top_bottom_100"
+ name: "service_instance_cpm_minute_top_bottom_100"
group: "sw_metric"
topN: 1
fieldValueSort: 1
agg: 2
conditions:
-- name: entity_id
+- name: service_id
op: 1
value:
str:
- value: entity_1
+ value: "svc_1"
diff --git a/test/cases/topn/data/input/desc.yaml b/test/cases/topn/data/input/desc.yaml
index d40dfb52..63a4d565 100644
--- a/test/cases/topn/data/input/desc.yaml
+++ b/test/cases/topn/data/input/desc.yaml
@@ -16,7 +16,7 @@
# under the License.
metadata:
- name: "service_cpm_minute_top_bottom_100"
+ name: "service_instance_cpm_minute_top_bottom_100"
group: "sw_metric"
-topN: 1
+topN: 2
fieldValueSort: 1
diff --git a/test/cases/topn/data/input/condition_aggr_desc.yaml b/test/cases/topn/data/input/null_group.yaml
similarity index 89%
copy from test/cases/topn/data/input/condition_aggr_desc.yaml
copy to test/cases/topn/data/input/null_group.yaml
index 88f701c6..5020488f 100644
--- a/test/cases/topn/data/input/condition_aggr_desc.yaml
+++ b/test/cases/topn/data/input/null_group.yaml
@@ -16,14 +16,14 @@
# under the License.
metadata:
- name: "service_cpm_minute_top_bottom_100"
+ name: "service_instance_endpoint_cpm_minute_top_bottom_100"
group: "sw_metric"
-topN: 1
+topN: 3
fieldValueSort: 1
agg: 2
conditions:
-- name: entity_id
+- name: http.uri
op: 1
value:
str:
- value: entity_1
+ value: ""
diff --git a/test/cases/topn/data/want/aggr_desc.yaml b/test/cases/topn/data/want/aggr_desc.yaml
index 85c4ee51..96cc77b2 100644
--- a/test/cases/topn/data/want/aggr_desc.yaml
+++ b/test/cases/topn/data/want/aggr_desc.yaml
@@ -17,15 +17,39 @@
lists:
- items:
- - name: entity_1
+ - entity:
+ - key: service_id
+ value:
+ str:
+ value: svc_2
+ - key: entity_id
+ value:
+ str:
+ value: entity_1
value:
int:
value: "12"
- - name: entity_3
+ - entity:
+ - key: service_id
+ value:
+ str:
+ value: svc_1
+ - key: entity_id
+ value:
+ str:
+ value: entity_3
value:
int:
value: "11"
- - name: entity_2
+ - entity:
+ - key: service_id
+ value:
+ str:
+ value: ""
+ - key: entity_id
+ value:
+ str:
+ value: entity_2
value:
int:
value: "10"
diff --git a/test/cases/topn/data/want/asc.yaml b/test/cases/topn/data/want/asc.yaml
index 14c2f447..2731344d 100644
--- a/test/cases/topn/data/want/asc.yaml
+++ b/test/cases/topn/data/want/asc.yaml
@@ -17,38 +17,158 @@
lists:
- items:
- - name: entity_1
+ - entity:
+ - key: service_id
+ value:
+ str:
+ value: svc_1
+ - key: entity_id
+ value:
+ str:
+ value: entity_1
value:
int:
value: "1"
+ - entity:
+ - key: service_id
+ value:
+ str:
+ value: ""
+ - key: entity_id
+ value:
+ str:
+ value: entity_2
+ value:
+ int:
+ value: "10"
timestamp: "2022-10-17T12:49:00Z"
- items:
- - name: entity_2
+ - entity:
+ - key: service_id
+ value:
+ str:
+ value: svc_2
+ - key: entity_id
+ value:
+ str:
+ value: entity_2
value:
int:
value: "2"
+ - entity:
+ - key: service_id
+ value:
+ str:
+ value: svc_1
+ - key: entity_id
+ value:
+ str:
+ value: entity_2
+ value:
+ int:
+ value: "9"
timestamp: "2022-10-17T12:50:00Z"
- items:
- - name: entity_2
+ - entity:
+ - key: service_id
+ value:
+ str:
+ value: svc_1
+ - key: entity_id
+ value:
+ str:
+ value: entity_2
value:
int:
value: "3"
+ - entity:
+ - key: service_id
+ value:
+ str:
+ value: svc_4
+ - key: entity_id
+ value:
+ str:
+ value: entity_2
+ value:
+ int:
+ value: "8"
timestamp: "2022-10-17T12:51:00Z"
- items:
- - name: entity_3
+ - entity:
+ - key: service_id
+ value:
+ str:
+ value: svc_2
+ - key: entity_id
+ value:
+ str:
+ value: entity_3
value:
int:
value: "5"
+ - entity:
+ - key: service_id
+ value:
+ str:
+ value: svc_1
+ - key: entity_id
+ value:
+ str:
+ value: entity_3
+ value:
+ int:
+ value: "11"
timestamp: "2022-10-17T12:52:00Z"
- items:
- - name: entity_1
+ - entity:
+ - key: service_id
+ value:
+ str:
+ value: ""
+ - key: entity_id
+ value:
+ str:
+ value: entity_1
value:
int:
value: "4"
+ - entity:
+ - key: service_id
+ value:
+ str:
+ value: svc_2
+ - key: entity_id
+ value:
+ str:
+ value: entity_1
+ value:
+ int:
+ value: "12"
timestamp: "2022-10-17T12:53:00Z"
- items:
- - name: entity_1
+ - entity:
+ - key: service_id
+ value:
+ str:
+ value: svc_1
+ - key: entity_id
+ value:
+ str:
+ value: entity_1
value:
int:
value: "6"
+ - entity:
+ - key: service_id
+ value:
+ str:
+ value: ""
+ - key: entity_id
+ value:
+ str:
+ value: entity_1
+ value:
+ int:
+ value: "7"
timestamp: "2022-10-17T12:54:00Z"
\ No newline at end of file
diff --git a/test/cases/topn/data/want/condition_aggr_desc.yaml b/test/cases/topn/data/want/condition_aggr_desc.yaml
index 784e5c38..ebbb77f3 100644
--- a/test/cases/topn/data/want/condition_aggr_desc.yaml
+++ b/test/cases/topn/data/want/condition_aggr_desc.yaml
@@ -17,8 +17,16 @@
lists:
- items:
- - name: entity_1
+ - entity:
+ - key: service_id
+ value:
+ str:
+ value: svc_1
+ - key: entity_id
+ value:
+ str:
+ value: entity_3
value:
int:
- value: "12"
+ value: "11"
timestamp: "2022-10-17T12:54:00Z"
\ No newline at end of file
diff --git a/test/cases/topn/data/want/desc.yaml b/test/cases/topn/data/want/desc.yaml
index 40a10b07..a937323c 100644
--- a/test/cases/topn/data/want/desc.yaml
+++ b/test/cases/topn/data/want/desc.yaml
@@ -17,38 +17,158 @@
lists:
- items:
- - name: entity_2
+ - entity:
+ - key: service_id
+ value:
+ str:
+ value: ""
+ - key: entity_id
+ value:
+ str:
+ value: entity_2
value:
int:
value: "10"
+ - entity:
+ - key: service_id
+ value:
+ str:
+ value: svc_1
+ - key: entity_id
+ value:
+ str:
+ value: entity_1
+ value:
+ int:
+ value: "1"
timestamp: "2022-10-17T12:49:00Z"
- items:
- - name: entity_2
+ - entity:
+ - key: service_id
+ value:
+ str:
+ value: svc_1
+ - key: entity_id
+ value:
+ str:
+ value: entity_2
value:
int:
value: "9"
+ - entity:
+ - key: service_id
+ value:
+ str:
+ value: svc_2
+ - key: entity_id
+ value:
+ str:
+ value: entity_2
+ value:
+ int:
+ value: "2"
timestamp: "2022-10-17T12:50:00Z"
- items:
- - name: entity_2
+ - entity:
+ - key: service_id
+ value:
+ str:
+ value: svc_4
+ - key: entity_id
+ value:
+ str:
+ value: entity_2
value:
int:
value: "8"
+ - entity:
+ - key: service_id
+ value:
+ str:
+ value: svc_1
+ - key: entity_id
+ value:
+ str:
+ value: entity_2
+ value:
+ int:
+ value: "3"
timestamp: "2022-10-17T12:51:00Z"
- items:
- - name: entity_3
+ - entity:
+ - key: service_id
+ value:
+ str:
+ value: svc_1
+ - key: entity_id
+ value:
+ str:
+ value: entity_3
value:
int:
value: "11"
+ - entity:
+ - key: service_id
+ value:
+ str:
+ value: svc_2
+ - key: entity_id
+ value:
+ str:
+ value: entity_3
+ value:
+ int:
+ value: "5"
timestamp: "2022-10-17T12:52:00Z"
- items:
- - name: entity_1
+ - entity:
+ - key: service_id
+ value:
+ str:
+ value: svc_2
+ - key: entity_id
+ value:
+ str:
+ value: entity_1
value:
int:
value: "12"
+ - entity:
+ - key: service_id
+ value:
+ str:
+ value: ""
+ - key: entity_id
+ value:
+ str:
+ value: entity_1
+ value:
+ int:
+ value: "4"
timestamp: "2022-10-17T12:53:00Z"
- items:
- - name: entity_1
+ - entity:
+ - key: service_id
+ value:
+ str:
+ value: ""
+ - key: entity_id
+ value:
+ str:
+ value: entity_1
value:
int:
value: "7"
+ - entity:
+ - key: service_id
+ value:
+ str:
+ value: svc_1
+ - key: entity_id
+ value:
+ str:
+ value: entity_1
+ value:
+ int:
+ value: "6"
timestamp: "2022-10-17T12:54:00Z"
\ No newline at end of file
diff --git a/test/cases/topn/data/want/aggr_desc.yaml b/test/cases/topn/data/want/null_group.yaml
similarity index 65%
copy from test/cases/topn/data/want/aggr_desc.yaml
copy to test/cases/topn/data/want/null_group.yaml
index 85c4ee51..c9841a7d 100644
--- a/test/cases/topn/data/want/aggr_desc.yaml
+++ b/test/cases/topn/data/want/null_group.yaml
@@ -17,16 +17,40 @@
lists:
- items:
- - name: entity_1
+ - entity:
+ - key: service_id
+ value:
+ str:
+ value: svc_2
+ - key: entity_id
+ value:
+ str:
+ value: entity_1
value:
int:
value: "12"
- - name: entity_3
+ - entity:
+ - key: service_id
+ value:
+ str:
+ value: svc_1
+ - key: entity_id
+ value:
+ str:
+ value: entity_3
value:
int:
value: "11"
- - name: entity_2
+ - entity:
+ - key: service_id
+ value:
+ str:
+ value: ""
+ - key: entity_id
+ value:
+ str:
+ value: entity_2
value:
int:
value: "10"
- timestamp: "2022-10-17T12:54:00Z"
\ No newline at end of file
+ timestamp: "2023-03-18T13:19:00Z"
\ No newline at end of file
diff --git a/test/cases/topn/topn.go b/test/cases/topn/topn.go
index 3147048a..48f6d54e 100644
--- a/test/cases/topn/topn.go
+++ b/test/cases/topn/topn.go
@@ -44,4 +44,5 @@ var _ = g.DescribeTable("TopN Tests", verify,
g.Entry("asc", helpers.Args{Input: "asc", Duration: 25 * time.Minute, Offset: -20 * time.Minute}),
g.Entry("max top3 order by desc", helpers.Args{Input: "aggr_desc", Duration: 25 * time.Minute, Offset: -20 * time.Minute}),
g.Entry("max top3 with condition order by desc", helpers.Args{Input: "condition_aggr_desc", Duration: 25 * time.Minute, Offset: -20 * time.Minute}),
+ g.Entry("max top3 for null group order by desc", helpers.Args{Input: "null_group", Duration: 25 * time.Minute, Offset: -20 * time.Minute}),
)
diff --git a/test/integration/cold_query/query_suite_test.go b/test/integration/cold_query/query_suite_test.go
index 04296ae4..0319b5f5 100644
--- a/test/integration/cold_query/query_suite_test.go
+++ b/test/integration/cold_query/query_suite_test.go
@@ -76,6 +76,12 @@ var _ = SynchronizedBeforeSuite(func() []byte {
casesmeasureData.Write(conn, "service_cpm_minute", "sw_metric", "service_cpm_minute_data1.json", now.Add(10*time.Second), interval)
casesmeasureData.Write(conn, "service_cpm_minute", "sw_metric", "service_cpm_minute_data2.json", now.Add(10*time.Minute), interval)
casesmeasureData.Write(conn, "instance_clr_cpu_minute", "sw_metric", "instance_clr_cpu_minute_data.json", now, interval)
+ casesmeasureData.Write(conn, "service_instance_cpm_minute", "sw_metric", "service_instance_cpm_minute_data.json", now, interval)
+ casesmeasureData.Write(conn, "service_instance_cpm_minute", "sw_metric", "service_instance_cpm_minute_data1.json", now.Add(10*time.Second), interval)
+ casesmeasureData.Write(conn, "service_instance_cpm_minute", "sw_metric", "service_instance_cpm_minute_data2.json", now.Add(10*time.Minute), interval)
+ casesmeasureData.Write(conn, "service_instance_endpoint_cpm_minute", "sw_metric", "service_instance_endpoint_cpm_minute_data.json", now, interval)
+ casesmeasureData.Write(conn, "service_instance_endpoint_cpm_minute", "sw_metric", "service_instance_endpoint_cpm_minute_data1.json", now.Add(10*time.Second), interval)
+ casesmeasureData.Write(conn, "service_instance_endpoint_cpm_minute", "sw_metric", "service_instance_endpoint_cpm_minute_data2.json", now.Add(10*time.Minute), interval)
Expect(conn.Close()).To(Succeed())
return []byte(addr)
}, func(address []byte) {
diff --git a/test/integration/query/query_suite_test.go b/test/integration/query/query_suite_test.go
index 49bb25ce..762b9889 100644
--- a/test/integration/query/query_suite_test.go
+++ b/test/integration/query/query_suite_test.go
@@ -77,6 +77,12 @@ var _ = SynchronizedBeforeSuite(func() []byte {
casesmeasuredata.Write(conn, "service_cpm_minute", "sw_metric", "service_cpm_minute_data1.json", now.Add(10*time.Second), interval)
casesmeasuredata.Write(conn, "service_cpm_minute", "sw_metric", "service_cpm_minute_data2.json", now.Add(10*time.Minute), interval)
casesmeasuredata.Write(conn, "instance_clr_cpu_minute", "sw_metric", "instance_clr_cpu_minute_data.json", now, interval)
+ casesmeasuredata.Write(conn, "service_instance_cpm_minute", "sw_metric", "service_instance_cpm_minute_data.json", now, interval)
+ casesmeasuredata.Write(conn, "service_instance_cpm_minute", "sw_metric", "service_instance_cpm_minute_data1.json", now.Add(10*time.Second), interval)
+ casesmeasuredata.Write(conn, "service_instance_cpm_minute", "sw_metric", "service_instance_cpm_minute_data2.json", now.Add(10*time.Minute), interval)
+ casesmeasuredata.Write(conn, "service_instance_endpoint_cpm_minute", "sw_metric", "service_instance_endpoint_cpm_minute_data.json", now, interval)
+ casesmeasuredata.Write(conn, "service_instance_endpoint_cpm_minute", "sw_metric", "service_instance_endpoint_cpm_minute_data1.json", now.Add(10*time.Second), interval)
+ casesmeasuredata.Write(conn, "service_instance_endpoint_cpm_minute", "sw_metric", "service_instance_endpoint_cpm_minute_data2.json", now.Add(10*time.Minute), interval)
Expect(conn.Close()).To(Succeed())
return []byte(addr)
}, func(address []byte) {