You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by lu...@apache.org on 2022/10/17 14:51:51 UTC
[skywalking-banyandb] branch fix-topn-post-processor created (now 61fd510)
This is an automated email from the ASF dual-hosted git repository.
lujiajing pushed a change to branch fix-topn-post-processor
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
at 61fd510 fix lint
This branch includes the following new commits:
new c135b85 polish test data and fix post processor
new 61fd510 fix lint
The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails. The revisions
listed as "add" were already present in the repository and have only
been added to this reference.
[skywalking-banyandb] 01/02: polish test data and fix post processor
Posted by lu...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
lujiajing pushed a commit to branch fix-topn-post-processor
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
commit c135b85cf8d6bd440299481fcb9037cd11d02994
Author: Megrez Lu <lu...@gmail.com>
AuthorDate: Mon Oct 17 22:46:12 2022 +0800
polish test data and fix post processor
---
banyand/measure/measure_topn.go | 11 ++-
banyand/query/processor_topn.go | 86 ++++++++++++----------
pkg/flow/dedup_priority_queue.go | 39 ++--------
.../data/testdata/service_cpm_minute_data.json | 6 +-
.../data/testdata/service_cpm_minute_data1.json | 10 +--
test/cases/measure/data/want/all.yaml | 40 +++++-----
test/cases/measure/data/want/group_max.yaml | 66 ++++++++---------
test/cases/measure/data/want/limit.yaml | 8 +-
test/cases/measure/data/want/order_asc.yaml | 40 +++++-----
test/cases/measure/data/want/order_desc.yaml | 40 +++++-----
test/cases/topn/data/want/aggr_desc.yaml | 12 +--
test/cases/topn/data/want/asc.yaml | 22 +++---
test/cases/topn/data/want/condition_aggr_desc.yaml | 4 +-
test/cases/topn/data/want/desc.yaml | 22 +++---
14 files changed, 198 insertions(+), 208 deletions(-)
diff --git a/banyand/measure/measure_topn.go b/banyand/measure/measure_topn.go
index 8a964b1..183b5f0 100644
--- a/banyand/measure/measure_topn.go
+++ b/banyand/measure/measure_topn.go
@@ -247,15 +247,20 @@ func (t *topNStreamingProcessor) locate(tagValues []*modelv1.TagValue, rankNum i
if len(t.topNSchema.GetGroupByTagNames()) != len(tagValues) {
return nil, 0, errors.New("no enough tag values for the entity")
}
- entity := make(tsdb.Entity, 1+1+len(t.topNSchema.GetGroupByTagNames()))
+ // entity prefix
+ // 1) source measure Name + topN aggregation Name
+ // 2) sort direction
+ // 3) rank number
+ entity := make(tsdb.Entity, 1+1+1+len(t.topNSchema.GetGroupByTagNames()))
// entity prefix
entity[0] = []byte(formatMeasureCompanionPrefix(t.topNSchema.GetSourceMeasure().GetName(),
t.topNSchema.GetMetadata().GetName()))
- entity[1] = convert.Int64ToBytes(int64(rankNum))
+ entity[1] = convert.Int64ToBytes(int64(t.sortDirection.Number()))
+ entity[2] = convert.Int64ToBytes(int64(rankNum))
// measureID as sharding key
for idx, tagVal := range tagValues {
var innerErr error
- entity[idx+2], innerErr = pbv1.MarshalIndexFieldValue(tagVal)
+ entity[idx+3], innerErr = pbv1.MarshalIndexFieldValue(tagVal)
if innerErr != nil {
return nil, 0, innerErr
}
diff --git a/banyand/query/processor_topn.go b/banyand/query/processor_topn.go
index e8aa712..65b514d 100644
--- a/banyand/query/processor_topn.go
+++ b/banyand/query/processor_topn.go
@@ -21,7 +21,6 @@ import (
"bytes"
"container/heap"
"context"
- "math"
"time"
"github.com/pkg/errors"
@@ -88,7 +87,7 @@ func (t *topNQueryProcessor) Rev(message bus.Message) (resp bus.Message) {
}
aggregator := createTopNPostAggregator(request.GetTopN(),
request.GetAgg(), request.GetFieldValueSort())
- entity, err := locateEntity(topNSchema, request.GetConditions())
+ entity, err := locateEntity(topNSchema, request.GetFieldValueSort(), request.GetConditions())
if err != nil {
t.log.Error().Err(err).
Str("topN", topNMetadata.GetName()).
@@ -134,14 +133,17 @@ func (t *topNQueryProcessor) Rev(message bus.Message) (resp bus.Message) {
return
}
-func locateEntity(topNSchema *databasev1.TopNAggregation, conditions []*modelv1.Condition) (tsdb.Entity, error) {
+func locateEntity(topNSchema *databasev1.TopNAggregation, sortDirection modelv1.Sort, conditions []*modelv1.Condition) (tsdb.Entity, error) {
entityMap := make(map[string]int)
- entity := make([]tsdb.Entry, 1+len(topNSchema.GetGroupByTagNames()))
- entity[0] = tsdb.AnyEntry
+ entity := make([]tsdb.Entry, 1+1+len(topNSchema.GetGroupByTagNames()))
+ // sortDirection
+ entity[0] = convert.Int64ToBytes(int64(sortDirection.Number()))
+ // rankNumber
+ entity[1] = tsdb.AnyEntry
for idx, tagName := range topNSchema.GetGroupByTagNames() {
- entityMap[tagName] = idx + 1
+ entityMap[tagName] = idx + 2
// fill AnyEntry by default
- entity[idx+1] = tsdb.AnyEntry
+ entity[idx+2] = tsdb.AnyEntry
}
for _, pairQuery := range conditions {
if pairQuery.GetOp() != modelv1.Condition_BINARY_OP_EQ {
@@ -258,11 +260,14 @@ func (aggr postAggregationProcessor) Len() int {
return len(aggr.items)
}
+// Less orders items so that the heap root is the weakest retained candidate.
+// For DESC (largest values wanted), this yields a min heap,
+// while for ASC (smallest values wanted), it yields a max heap.
func (aggr postAggregationProcessor) Less(i, j int) bool {
if aggr.sort == modelv1.Sort_SORT_DESC {
- return aggr.items[i].int64Func.Val() > aggr.items[j].int64Func.Val()
+ return aggr.items[i].int64Func.Val() < aggr.items[j].int64Func.Val()
}
- return aggr.items[i].int64Func.Val() < aggr.items[j].int64Func.Val()
+ return aggr.items[i].int64Func.Val() > aggr.items[j].int64Func.Val()
}
func (aggr *postAggregationProcessor) Swap(i, j int) {
@@ -298,6 +303,7 @@ func (aggr *postAggregationProcessor) put(key string, val int64, timestampMillis
heap.Fix(aggr, item.index)
return nil
}
+
aggrFunc, err := aggregation.NewInt64Func(aggr.aggrFunc)
if err != nil {
return err
@@ -307,24 +313,40 @@ func (aggr *postAggregationProcessor) put(key string, val int64, timestampMillis
int64Func: aggrFunc,
}
item.int64Func.In(val)
- aggr.cache[key] = item
- heap.Push(aggr, item)
+
+ if aggr.Len() < int(aggr.topN) {
+ aggr.cache[key] = item
+ heap.Push(aggr, item)
+ } else {
+ if lowest := aggr.items[0]; lowest != nil {
+ if aggr.sort == modelv1.Sort_SORT_DESC && lowest.int64Func.Val() < val {
+ aggr.cache[key] = item
+ aggr.items[0] = item
+ heap.Fix(aggr, 0)
+ } else if aggr.sort != modelv1.Sort_SORT_DESC && lowest.int64Func.Val() > val {
+ aggr.cache[key] = item
+ aggr.items[0] = item
+ heap.Fix(aggr, 0)
+ }
+ }
+ }
+
return nil
}
func (aggr *postAggregationProcessor) val() []*measurev1.TopNList {
- itemLen := int(math.Min(float64(aggr.topN), float64(aggr.Len())))
- topNItems := make([]*measurev1.TopNList_Item, 0, itemLen)
+ topNItems := make([]*measurev1.TopNList_Item, aggr.Len())
- for _, item := range aggr.items[0:itemLen] {
- topNItems = append(topNItems, &measurev1.TopNList_Item{
+ for aggr.Len() > 0 {
+ item := heap.Pop(aggr).(*aggregatorItem)
+ topNItems[aggr.Len()] = &measurev1.TopNList_Item{
Name: item.key,
Value: &modelv1.FieldValue{
Value: &modelv1.FieldValue_Int{
Int: &modelv1.Int{Value: item.int64Func.Val()},
},
},
- })
+ }
}
return []*measurev1.TopNList{
{
@@ -360,8 +382,8 @@ func (naggr *postNonAggregationProcessor) val() []*measurev1.TopNList {
topNLists := make([]*measurev1.TopNList, 0, len(naggr.timelines))
for ts, timeline := range naggr.timelines {
items := make([]*measurev1.TopNList_Item, timeline.Len())
- for _, elem := range timeline.Values() {
- items[elem.GetIndex()] = &measurev1.TopNList_Item{
+ for idx, elem := range timeline.Values() {
+ items[idx] = &measurev1.TopNList_Item{
Name: elem.(*nonAggregatorItem).key,
Value: &modelv1.FieldValue{
Value: &modelv1.FieldValue_Int{
@@ -393,21 +415,11 @@ func (naggr *postNonAggregationProcessor) put(key string, val int64, timestampMi
if timeline.Len() < int(naggr.topN) {
heap.Push(timeline, &nonAggregatorItem{val: val, key: key})
} else {
- if right := timeline.Right(); right != nil {
- if naggr.sort == modelv1.Sort_SORT_DESC && right.(*nonAggregatorItem).val < val {
- heap.Push(timeline, &nonAggregatorItem{val: val, key: key})
- newTimeline, err := timeline.WithNewItems(timeline.Slice(0, int(naggr.topN)))
- if err != nil {
- return err
- }
- naggr.timelines[timestampMillis] = newTimeline
- } else if naggr.sort != modelv1.Sort_SORT_DESC && right.(*nonAggregatorItem).val > val {
- heap.Push(timeline, &nonAggregatorItem{val: val, key: key})
- newTimeline, err := timeline.WithNewItems(timeline.Slice(0, int(naggr.topN)))
- if err != nil {
- return err
- }
- naggr.timelines[timestampMillis] = newTimeline
+ if lowest := timeline.Peek(); lowest != nil {
+ if naggr.sort == modelv1.Sort_SORT_DESC && lowest.(*nonAggregatorItem).val < val {
+ timeline.ReplaceLowest(&nonAggregatorItem{val: val, key: key})
+ } else if naggr.sort != modelv1.Sort_SORT_DESC && lowest.(*nonAggregatorItem).val > val {
+ timeline.ReplaceLowest(&nonAggregatorItem{val: val, key: key})
}
}
}
@@ -417,19 +429,19 @@ func (naggr *postNonAggregationProcessor) put(key string, val int64, timestampMi
timeline := flow.NewPriorityQueue(func(a, b interface{}) int {
if naggr.sort == modelv1.Sort_SORT_DESC {
if a.(*nonAggregatorItem).val < b.(*nonAggregatorItem).val {
- return 1
+ return -1
} else if a.(*nonAggregatorItem).val == b.(*nonAggregatorItem).val {
return 0
} else {
- return -1
+ return 1
}
}
if a.(*nonAggregatorItem).val < b.(*nonAggregatorItem).val {
- return -1
+ return 1
} else if a.(*nonAggregatorItem).val == b.(*nonAggregatorItem).val {
return 0
} else {
- return 1
+ return -1
}
}, false)
naggr.timelines[timestampMillis] = timeline
diff --git a/pkg/flow/dedup_priority_queue.go b/pkg/flow/dedup_priority_queue.go
index fbd1b2b..55fa120 100644
--- a/pkg/flow/dedup_priority_queue.go
+++ b/pkg/flow/dedup_priority_queue.go
@@ -116,43 +116,16 @@ func (pq *DedupPriorityQueue) Peek() Element {
return nil
}
-// Slice returns a sliced DedupPriorityQueue using the given bounds.
-func (pq *DedupPriorityQueue) Slice(start, end int) []Element {
- return pq.Items[start:end]
+func (pq *DedupPriorityQueue) ReplaceLowest(newLowest Element) {
+ pq.Items[0] = newLowest
+ heap.Fix(pq, 0)
}
func (pq *DedupPriorityQueue) Values() []Element {
values := make([]Element, pq.Len())
- for _, elem := range pq.Items {
- values[elem.GetIndex()] = elem
+ for pq.Len() > 0 {
+ item := heap.Pop(pq).(Element)
+ values[pq.Len()] = item
}
return values
}
-
-func (pq *DedupPriorityQueue) Left() Element {
- if pq.Len() == 0 {
- return nil
- }
- return pq.Items[0]
-}
-
-func (pq *DedupPriorityQueue) Right() Element {
- if pq.Len() == 0 {
- return nil
- }
- return pq.Items[pq.Len()-1]
-}
-
-func (pq *DedupPriorityQueue) WithNewItems(items []Element) (*DedupPriorityQueue, error) {
- newPq := &DedupPriorityQueue{
- Items: items,
- cache: make(map[Element]struct{}),
- allowDuplicates: pq.allowDuplicates,
- comparator: pq.comparator,
- }
- err := newPq.initCache()
- if err != nil {
- return nil, err
- }
- return newPq, nil
-}
diff --git a/test/cases/measure/data/testdata/service_cpm_minute_data.json b/test/cases/measure/data/testdata/service_cpm_minute_data.json
index 877290c..c2dfa53 100644
--- a/test/cases/measure/data/testdata/service_cpm_minute_data.json
+++ b/test/cases/measure/data/testdata/service_cpm_minute_data.json
@@ -54,7 +54,7 @@
},
{
"int": {
- "value": 1
+ "value": 2
}
}
]
@@ -84,7 +84,7 @@
},
{
"int": {
- "value": 1
+ "value": 3
}
}
]
@@ -174,7 +174,7 @@
},
{
"int": {
- "value": 5
+ "value": 6
}
}
]
diff --git a/test/cases/measure/data/testdata/service_cpm_minute_data1.json b/test/cases/measure/data/testdata/service_cpm_minute_data1.json
index 44bd61a..2ee3ecf 100644
--- a/test/cases/measure/data/testdata/service_cpm_minute_data1.json
+++ b/test/cases/measure/data/testdata/service_cpm_minute_data1.json
@@ -54,7 +54,7 @@
},
{
"int": {
- "value": 5
+ "value": 9
}
}
]
@@ -84,7 +84,7 @@
},
{
"int": {
- "value": 3
+ "value": 8
}
}
]
@@ -114,7 +114,7 @@
},
{
"int": {
- "value": 2
+ "value": 11
}
}
]
@@ -144,7 +144,7 @@
},
{
"int": {
- "value": 1
+ "value": 12
}
}
]
@@ -174,7 +174,7 @@
},
{
"int": {
- "value": 4
+ "value": 7
}
}
]
diff --git a/test/cases/measure/data/want/all.yaml b/test/cases/measure/data/want/all.yaml
index 5041ece..5d41213 100644
--- a/test/cases/measure/data/want/all.yaml
+++ b/test/cases/measure/data/want/all.yaml
@@ -36,7 +36,7 @@ dataPoints:
value:
str:
value: entity_1
- timestamp: "2022-10-13T13:34:18.389Z"
+ timestamp: "2022-10-17T12:49:45.912Z"
- fields:
- name: total
value:
@@ -57,7 +57,7 @@ dataPoints:
value:
str:
value: entity_2
- timestamp: "2022-10-13T13:34:19.389Z"
+ timestamp: "2022-10-17T12:49:46.912Z"
- fields:
- name: total
value:
@@ -66,7 +66,7 @@ dataPoints:
- name: value
value:
int:
- value: "1"
+ value: "2"
tagFamilies:
- name: default
tags:
@@ -78,7 +78,7 @@ dataPoints:
value:
str:
value: entity_2
- timestamp: "2022-10-13T13:35:18.389Z"
+ timestamp: "2022-10-17T12:50:45.912Z"
- fields:
- name: total
value:
@@ -87,7 +87,7 @@ dataPoints:
- name: value
value:
int:
- value: "5"
+ value: "9"
tagFamilies:
- name: default
tags:
@@ -99,7 +99,7 @@ dataPoints:
value:
str:
value: entity_2
- timestamp: "2022-10-13T13:35:19.389Z"
+ timestamp: "2022-10-17T12:50:46.912Z"
- fields:
- name: total
value:
@@ -108,7 +108,7 @@ dataPoints:
- name: value
value:
int:
- value: "1"
+ value: "3"
tagFamilies:
- name: default
tags:
@@ -120,7 +120,7 @@ dataPoints:
value:
str:
value: entity_2
- timestamp: "2022-10-13T13:36:18.389Z"
+ timestamp: "2022-10-17T12:51:45.912Z"
- fields:
- name: total
value:
@@ -129,7 +129,7 @@ dataPoints:
- name: value
value:
int:
- value: "3"
+ value: "8"
tagFamilies:
- name: default
tags:
@@ -141,7 +141,7 @@ dataPoints:
value:
str:
value: entity_2
- timestamp: "2022-10-13T13:36:19.389Z"
+ timestamp: "2022-10-17T12:51:46.912Z"
- fields:
- name: total
value:
@@ -162,7 +162,7 @@ dataPoints:
value:
str:
value: entity_3
- timestamp: "2022-10-13T13:37:18.389Z"
+ timestamp: "2022-10-17T12:52:45.912Z"
- fields:
- name: total
value:
@@ -171,7 +171,7 @@ dataPoints:
- name: value
value:
int:
- value: "2"
+ value: "11"
tagFamilies:
- name: default
tags:
@@ -183,7 +183,7 @@ dataPoints:
value:
str:
value: entity_3
- timestamp: "2022-10-13T13:37:19.389Z"
+ timestamp: "2022-10-17T12:52:46.912Z"
- fields:
- name: total
value:
@@ -204,7 +204,7 @@ dataPoints:
value:
str:
value: entity_1
- timestamp: "2022-10-13T13:38:18.389Z"
+ timestamp: "2022-10-17T12:53:45.912Z"
- fields:
- name: total
value:
@@ -213,7 +213,7 @@ dataPoints:
- name: value
value:
int:
- value: "1"
+ value: "12"
tagFamilies:
- name: default
tags:
@@ -225,7 +225,7 @@ dataPoints:
value:
str:
value: entity_1
- timestamp: "2022-10-13T13:38:19.389Z"
+ timestamp: "2022-10-17T12:53:46.912Z"
- fields:
- name: total
value:
@@ -234,7 +234,7 @@ dataPoints:
- name: value
value:
int:
- value: "5"
+ value: "6"
tagFamilies:
- name: default
tags:
@@ -246,7 +246,7 @@ dataPoints:
value:
str:
value: entity_1
- timestamp: "2022-10-13T13:39:18.389Z"
+ timestamp: "2022-10-17T12:54:45.912Z"
- fields:
- name: total
value:
@@ -255,7 +255,7 @@ dataPoints:
- name: value
value:
int:
- value: "4"
+ value: "7"
tagFamilies:
- name: default
tags:
@@ -267,4 +267,4 @@ dataPoints:
value:
str:
value: entity_1
- timestamp: "2022-10-13T13:39:19.389Z"
+ timestamp: "2022-10-17T12:54:46.912Z"
diff --git a/test/cases/measure/data/want/group_max.yaml b/test/cases/measure/data/want/group_max.yaml
index f24bb15..bbe3923 100644
--- a/test/cases/measure/data/want/group_max.yaml
+++ b/test/cases/measure/data/want/group_max.yaml
@@ -16,39 +16,39 @@
# under the License.
dataPoints:
- - fields:
- - name: value
+- fields:
+ - name: value
+ value:
+ int:
+ value: "12"
+ tagFamilies:
+ - name: default
+ tags:
+ - key: entity_id
value:
- int:
- value: "5"
- tagFamilies:
- - name: default
- tags:
- - key: entity_id
- value:
- str:
- value: entity_1
- - fields:
- - name: value
+ str:
+ value: entity_1
+- fields:
+ - name: value
+ value:
+ int:
+ value: "10"
+ tagFamilies:
+ - name: default
+ tags:
+ - key: entity_id
value:
- int:
- value: "10"
- tagFamilies:
- - name: default
- tags:
- - key: entity_id
- value:
- str:
- value: entity_2
- - fields:
- - name: value
+ str:
+ value: entity_2
+- fields:
+ - name: value
+ value:
+ int:
+ value: "11"
+ tagFamilies:
+ - name: default
+ tags:
+ - key: entity_id
value:
- int:
- value: "5"
- tagFamilies:
- - name: default
- tags:
- - key: entity_id
- value:
- str:
- value: entity_3
\ No newline at end of file
+ str:
+ value: entity_3
\ No newline at end of file
diff --git a/test/cases/measure/data/want/limit.yaml b/test/cases/measure/data/want/limit.yaml
index 2267a74..ca93ea1 100644
--- a/test/cases/measure/data/want/limit.yaml
+++ b/test/cases/measure/data/want/limit.yaml
@@ -24,7 +24,7 @@ dataPoints:
- name: value
value:
int:
- value: "5"
+ value: "9"
tagFamilies:
- name: default
tags:
@@ -32,7 +32,7 @@ dataPoints:
value:
str:
value: entity_2
- timestamp: "2022-10-13T14:13:50.440Z"
+ timestamp: "2022-10-17T12:57:03.444Z"
- fields:
- name: total
value:
@@ -41,7 +41,7 @@ dataPoints:
- name: value
value:
int:
- value: "1"
+ value: "3"
tagFamilies:
- name: default
tags:
@@ -49,4 +49,4 @@ dataPoints:
value:
str:
value: entity_2
- timestamp: "2022-10-13T14:14:49.440Z"
\ No newline at end of file
+ timestamp: "2022-10-17T12:58:02.444Z"
\ No newline at end of file
diff --git a/test/cases/measure/data/want/order_asc.yaml b/test/cases/measure/data/want/order_asc.yaml
index 67f6d11..d55db5e 100644
--- a/test/cases/measure/data/want/order_asc.yaml
+++ b/test/cases/measure/data/want/order_asc.yaml
@@ -32,7 +32,7 @@ dataPoints:
value:
str:
value: entity_1
- timestamp: "2022-10-13T14:11:30.182Z"
+ timestamp: "2022-10-17T12:54:36.627Z"
- fields:
- name: total
value:
@@ -49,7 +49,7 @@ dataPoints:
value:
str:
value: entity_2
- timestamp: "2022-10-13T14:11:31.182Z"
+ timestamp: "2022-10-17T12:54:37.627Z"
- fields:
- name: total
value:
@@ -58,7 +58,7 @@ dataPoints:
- name: value
value:
int:
- value: "1"
+ value: "2"
tagFamilies:
- name: default
tags:
@@ -66,7 +66,7 @@ dataPoints:
value:
str:
value: entity_2
- timestamp: "2022-10-13T14:12:30.182Z"
+ timestamp: "2022-10-17T12:55:36.627Z"
- fields:
- name: total
value:
@@ -75,7 +75,7 @@ dataPoints:
- name: value
value:
int:
- value: "5"
+ value: "9"
tagFamilies:
- name: default
tags:
@@ -83,7 +83,7 @@ dataPoints:
value:
str:
value: entity_2
- timestamp: "2022-10-13T14:12:31.182Z"
+ timestamp: "2022-10-17T12:55:37.627Z"
- fields:
- name: total
value:
@@ -92,7 +92,7 @@ dataPoints:
- name: value
value:
int:
- value: "1"
+ value: "3"
tagFamilies:
- name: default
tags:
@@ -100,7 +100,7 @@ dataPoints:
value:
str:
value: entity_2
- timestamp: "2022-10-13T14:13:30.182Z"
+ timestamp: "2022-10-17T12:56:36.627Z"
- fields:
- name: total
value:
@@ -109,7 +109,7 @@ dataPoints:
- name: value
value:
int:
- value: "3"
+ value: "8"
tagFamilies:
- name: default
tags:
@@ -117,7 +117,7 @@ dataPoints:
value:
str:
value: entity_2
- timestamp: "2022-10-13T14:13:31.182Z"
+ timestamp: "2022-10-17T12:56:37.627Z"
- fields:
- name: total
value:
@@ -134,7 +134,7 @@ dataPoints:
value:
str:
value: entity_3
- timestamp: "2022-10-13T14:14:30.182Z"
+ timestamp: "2022-10-17T12:57:36.627Z"
- fields:
- name: total
value:
@@ -143,7 +143,7 @@ dataPoints:
- name: value
value:
int:
- value: "2"
+ value: "11"
tagFamilies:
- name: default
tags:
@@ -151,7 +151,7 @@ dataPoints:
value:
str:
value: entity_3
- timestamp: "2022-10-13T14:14:31.182Z"
+ timestamp: "2022-10-17T12:57:37.627Z"
- fields:
- name: total
value:
@@ -168,7 +168,7 @@ dataPoints:
value:
str:
value: entity_1
- timestamp: "2022-10-13T14:15:30.182Z"
+ timestamp: "2022-10-17T12:58:36.627Z"
- fields:
- name: total
value:
@@ -177,7 +177,7 @@ dataPoints:
- name: value
value:
int:
- value: "1"
+ value: "12"
tagFamilies:
- name: default
tags:
@@ -185,7 +185,7 @@ dataPoints:
value:
str:
value: entity_1
- timestamp: "2022-10-13T14:15:31.182Z"
+ timestamp: "2022-10-17T12:58:37.627Z"
- fields:
- name: total
value:
@@ -194,7 +194,7 @@ dataPoints:
- name: value
value:
int:
- value: "5"
+ value: "6"
tagFamilies:
- name: default
tags:
@@ -202,7 +202,7 @@ dataPoints:
value:
str:
value: entity_1
- timestamp: "2022-10-13T14:16:30.182Z"
+ timestamp: "2022-10-17T12:59:36.627Z"
- fields:
- name: total
value:
@@ -211,7 +211,7 @@ dataPoints:
- name: value
value:
int:
- value: "4"
+ value: "7"
tagFamilies:
- name: default
tags:
@@ -219,4 +219,4 @@ dataPoints:
value:
str:
value: entity_1
- timestamp: "2022-10-13T14:16:31.182Z"
\ No newline at end of file
+ timestamp: "2022-10-17T12:59:37.627Z"
\ No newline at end of file
diff --git a/test/cases/measure/data/want/order_desc.yaml b/test/cases/measure/data/want/order_desc.yaml
index b503b36..4c4f749 100644
--- a/test/cases/measure/data/want/order_desc.yaml
+++ b/test/cases/measure/data/want/order_desc.yaml
@@ -24,7 +24,7 @@ dataPoints:
- name: value
value:
int:
- value: "4"
+ value: "7"
tagFamilies:
- name: default
tags:
@@ -32,7 +32,7 @@ dataPoints:
value:
str:
value: entity_1
- timestamp: "2022-10-13T14:17:50.440Z"
+ timestamp: "2022-10-17T13:01:03.444Z"
- fields:
- name: total
value:
@@ -41,7 +41,7 @@ dataPoints:
- name: value
value:
int:
- value: "5"
+ value: "6"
tagFamilies:
- name: default
tags:
@@ -49,7 +49,7 @@ dataPoints:
value:
str:
value: entity_1
- timestamp: "2022-10-13T14:17:49.440Z"
+ timestamp: "2022-10-17T13:01:02.444Z"
- fields:
- name: total
value:
@@ -58,7 +58,7 @@ dataPoints:
- name: value
value:
int:
- value: "1"
+ value: "12"
tagFamilies:
- name: default
tags:
@@ -66,7 +66,7 @@ dataPoints:
value:
str:
value: entity_1
- timestamp: "2022-10-13T14:16:50.440Z"
+ timestamp: "2022-10-17T13:00:03.444Z"
- fields:
- name: total
value:
@@ -83,7 +83,7 @@ dataPoints:
value:
str:
value: entity_1
- timestamp: "2022-10-13T14:16:49.440Z"
+ timestamp: "2022-10-17T13:00:02.444Z"
- fields:
- name: total
value:
@@ -92,7 +92,7 @@ dataPoints:
- name: value
value:
int:
- value: "2"
+ value: "11"
tagFamilies:
- name: default
tags:
@@ -100,7 +100,7 @@ dataPoints:
value:
str:
value: entity_3
- timestamp: "2022-10-13T14:15:50.440Z"
+ timestamp: "2022-10-17T12:59:03.444Z"
- fields:
- name: total
value:
@@ -117,7 +117,7 @@ dataPoints:
value:
str:
value: entity_3
- timestamp: "2022-10-13T14:15:49.440Z"
+ timestamp: "2022-10-17T12:59:02.444Z"
- fields:
- name: total
value:
@@ -126,7 +126,7 @@ dataPoints:
- name: value
value:
int:
- value: "3"
+ value: "8"
tagFamilies:
- name: default
tags:
@@ -134,7 +134,7 @@ dataPoints:
value:
str:
value: entity_2
- timestamp: "2022-10-13T14:14:50.440Z"
+ timestamp: "2022-10-17T12:58:03.444Z"
- fields:
- name: total
value:
@@ -143,7 +143,7 @@ dataPoints:
- name: value
value:
int:
- value: "1"
+ value: "3"
tagFamilies:
- name: default
tags:
@@ -151,7 +151,7 @@ dataPoints:
value:
str:
value: entity_2
- timestamp: "2022-10-13T14:14:49.440Z"
+ timestamp: "2022-10-17T12:58:02.444Z"
- fields:
- name: total
value:
@@ -160,7 +160,7 @@ dataPoints:
- name: value
value:
int:
- value: "5"
+ value: "9"
tagFamilies:
- name: default
tags:
@@ -168,7 +168,7 @@ dataPoints:
value:
str:
value: entity_2
- timestamp: "2022-10-13T14:13:50.440Z"
+ timestamp: "2022-10-17T12:57:03.444Z"
- fields:
- name: total
value:
@@ -177,7 +177,7 @@ dataPoints:
- name: value
value:
int:
- value: "1"
+ value: "2"
tagFamilies:
- name: default
tags:
@@ -185,7 +185,7 @@ dataPoints:
value:
str:
value: entity_2
- timestamp: "2022-10-13T14:13:49.440Z"
+ timestamp: "2022-10-17T12:57:02.444Z"
- fields:
- name: total
value:
@@ -202,7 +202,7 @@ dataPoints:
value:
str:
value: entity_2
- timestamp: "2022-10-13T14:12:50.440Z"
+ timestamp: "2022-10-17T12:56:03.444Z"
- fields:
- name: total
value:
@@ -219,4 +219,4 @@ dataPoints:
value:
str:
value: entity_1
- timestamp: "2022-10-13T14:12:49.440Z"
\ No newline at end of file
+ timestamp: "2022-10-17T12:56:02.444Z"
\ No newline at end of file
diff --git a/test/cases/topn/data/want/aggr_desc.yaml b/test/cases/topn/data/want/aggr_desc.yaml
index 336774a..85c4ee5 100644
--- a/test/cases/topn/data/want/aggr_desc.yaml
+++ b/test/cases/topn/data/want/aggr_desc.yaml
@@ -17,16 +17,16 @@
lists:
- items:
- - name: entity_2
+ - name: entity_1
value:
int:
- value: "10"
+ value: "12"
- name: entity_3
value:
int:
- value: "5"
- - name: entity_1
+ value: "11"
+ - name: entity_2
value:
int:
- value: "5"
- timestamp: "2022-10-13T14:35:00Z"
\ No newline at end of file
+ value: "10"
+ timestamp: "2022-10-17T12:54:00Z"
\ No newline at end of file
diff --git a/test/cases/topn/data/want/asc.yaml b/test/cases/topn/data/want/asc.yaml
index 6eebcc6..14c2f44 100644
--- a/test/cases/topn/data/want/asc.yaml
+++ b/test/cases/topn/data/want/asc.yaml
@@ -21,34 +21,34 @@ lists:
value:
int:
value: "1"
- timestamp: "2022-10-13T14:14:00Z"
+ timestamp: "2022-10-17T12:49:00Z"
- items:
- name: entity_2
value:
int:
- value: "1"
- timestamp: "2022-10-13T14:15:00Z"
+ value: "2"
+ timestamp: "2022-10-17T12:50:00Z"
- items:
- name: entity_2
value:
int:
- value: "1"
- timestamp: "2022-10-13T14:16:00Z"
+ value: "3"
+ timestamp: "2022-10-17T12:51:00Z"
- items:
- name: entity_3
value:
int:
- value: "2"
- timestamp: "2022-10-13T14:17:00Z"
+ value: "5"
+ timestamp: "2022-10-17T12:52:00Z"
- items:
- name: entity_1
value:
int:
- value: "1"
- timestamp: "2022-10-13T14:18:00Z"
+ value: "4"
+ timestamp: "2022-10-17T12:53:00Z"
- items:
- name: entity_1
value:
int:
- value: "5"
- timestamp: "2022-10-13T14:19:00Z"
\ No newline at end of file
+ value: "6"
+ timestamp: "2022-10-17T12:54:00Z"
\ No newline at end of file
diff --git a/test/cases/topn/data/want/condition_aggr_desc.yaml b/test/cases/topn/data/want/condition_aggr_desc.yaml
index d1ae3bb..784e5c3 100644
--- a/test/cases/topn/data/want/condition_aggr_desc.yaml
+++ b/test/cases/topn/data/want/condition_aggr_desc.yaml
@@ -20,5 +20,5 @@ lists:
- name: entity_1
value:
int:
- value: "5"
- timestamp: "2022-10-13T14:45:00Z"
\ No newline at end of file
+ value: "12"
+ timestamp: "2022-10-17T12:54:00Z"
\ No newline at end of file
diff --git a/test/cases/topn/data/want/desc.yaml b/test/cases/topn/data/want/desc.yaml
index dbc19ab..53f58be 100644
--- a/test/cases/topn/data/want/desc.yaml
+++ b/test/cases/topn/data/want/desc.yaml
@@ -21,34 +21,34 @@ lists:
value:
int:
value: "10"
- timestamp: "2022-10-13T14:14:00Z"
+ timestamp: "2022-10-17T12:49:00Z"
- items:
- name: entity_2
value:
int:
- value: "5"
- timestamp: "2022-10-13T14:15:00Z"
+ value: "9"
+ timestamp: "2022-10-17T12:50:00Z"
- items:
- name: entity_2
value:
int:
- value: "3"
- timestamp: "2022-10-13T14:16:00Z"
+ value: "8"
+ timestamp: "2022-10-17T12:51:00Z"
- items:
- name: entity_3
value:
int:
- value: "5"
- timestamp: "2022-10-13T14:17:00Z"
+ value: "11"
+ timestamp: "2022-10-17T12:52:00Z"
- items:
- name: entity_1
value:
int:
- value: "4"
- timestamp: "2022-10-13T14:18:00Z"
+ value: "12"
+ timestamp: "2022-10-17T12:53:00Z"
- items:
- name: entity_1
value:
int:
- value: "5"
- timestamp: "2022-10-13T14:19:00Z"
\ No newline at end of file
+ value: "6"
+ timestamp: "2022-10-17T12:54:00Z"
\ No newline at end of file
[skywalking-banyandb] 02/02: fix lint
Posted by lu...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
lujiajing pushed a commit to branch fix-topn-post-processor
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
commit 61fd5103e79352a6d8e69435fb14035e1928f1db
Author: Megrez Lu <lu...@gmail.com>
AuthorDate: Mon Oct 17 22:47:08 2022 +0800
fix lint
---
pkg/flow/dedup_priority_queue.go | 15 ---------------
1 file changed, 15 deletions(-)
diff --git a/pkg/flow/dedup_priority_queue.go b/pkg/flow/dedup_priority_queue.go
index 55fa120..a5c0f8e 100644
--- a/pkg/flow/dedup_priority_queue.go
+++ b/pkg/flow/dedup_priority_queue.go
@@ -21,7 +21,6 @@ import (
"container/heap"
"github.com/emirpasic/gods/utils"
- "github.com/pkg/errors"
)
var _ heap.Interface = (*DedupPriorityQueue)(nil)
@@ -50,20 +49,6 @@ func NewPriorityQueue(comparator utils.Comparator, allowDuplicates bool) *DedupP
}
}
-func (pq *DedupPriorityQueue) initCache() error {
- if pq.allowDuplicates || len(pq.Items) == 0 {
- return nil
- }
- for _, elem := range pq.Items {
- if _, ok := pq.cache[elem]; !ok {
- pq.cache[elem] = struct{}{}
- } else {
- return errors.New("duplicated item is not allowed")
- }
- }
- return nil
-}
-
// Len returns the DedupPriorityQueue length.
func (pq *DedupPriorityQueue) Len() int { return len(pq.Items) }