You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by lu...@apache.org on 2022/10/17 14:51:51 UTC

[skywalking-banyandb] branch fix-topn-post-processor created (now 61fd510)

This is an automated email from the ASF dual-hosted git repository.

lujiajing pushed a change to branch fix-topn-post-processor
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git


      at 61fd510  fix lint

This branch includes the following new commits:

     new c135b85  polish test data and fix post processor
     new 61fd510  fix lint

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[skywalking-banyandb] 01/02: polish test data and fix post processor

Posted by lu...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

lujiajing pushed a commit to branch fix-topn-post-processor
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git

commit c135b85cf8d6bd440299481fcb9037cd11d02994
Author: Megrez Lu <lu...@gmail.com>
AuthorDate: Mon Oct 17 22:46:12 2022 +0800

    polish test data and fix post processor
---
 banyand/measure/measure_topn.go                    | 11 ++-
 banyand/query/processor_topn.go                    | 86 ++++++++++++----------
 pkg/flow/dedup_priority_queue.go                   | 39 ++--------
 .../data/testdata/service_cpm_minute_data.json     |  6 +-
 .../data/testdata/service_cpm_minute_data1.json    | 10 +--
 test/cases/measure/data/want/all.yaml              | 40 +++++-----
 test/cases/measure/data/want/group_max.yaml        | 66 ++++++++---------
 test/cases/measure/data/want/limit.yaml            |  8 +-
 test/cases/measure/data/want/order_asc.yaml        | 40 +++++-----
 test/cases/measure/data/want/order_desc.yaml       | 40 +++++-----
 test/cases/topn/data/want/aggr_desc.yaml           | 12 +--
 test/cases/topn/data/want/asc.yaml                 | 22 +++---
 test/cases/topn/data/want/condition_aggr_desc.yaml |  4 +-
 test/cases/topn/data/want/desc.yaml                | 22 +++---
 14 files changed, 198 insertions(+), 208 deletions(-)

diff --git a/banyand/measure/measure_topn.go b/banyand/measure/measure_topn.go
index 8a964b1..183b5f0 100644
--- a/banyand/measure/measure_topn.go
+++ b/banyand/measure/measure_topn.go
@@ -247,15 +247,20 @@ func (t *topNStreamingProcessor) locate(tagValues []*modelv1.TagValue, rankNum i
 	if len(t.topNSchema.GetGroupByTagNames()) != len(tagValues) {
 		return nil, 0, errors.New("no enough tag values for the entity")
 	}
-	entity := make(tsdb.Entity, 1+1+len(t.topNSchema.GetGroupByTagNames()))
+	// entity prefix
+	// 1) source measure Name + topN aggregation Name
+	// 2) sort direction
+	// 3) rank number
+	entity := make(tsdb.Entity, 1+1+1+len(t.topNSchema.GetGroupByTagNames()))
 	// entity prefix
 	entity[0] = []byte(formatMeasureCompanionPrefix(t.topNSchema.GetSourceMeasure().GetName(),
 		t.topNSchema.GetMetadata().GetName()))
-	entity[1] = convert.Int64ToBytes(int64(rankNum))
+	entity[1] = convert.Int64ToBytes(int64(t.sortDirection.Number()))
+	entity[2] = convert.Int64ToBytes(int64(rankNum))
 	// measureID as sharding key
 	for idx, tagVal := range tagValues {
 		var innerErr error
-		entity[idx+2], innerErr = pbv1.MarshalIndexFieldValue(tagVal)
+		entity[idx+3], innerErr = pbv1.MarshalIndexFieldValue(tagVal)
 		if innerErr != nil {
 			return nil, 0, innerErr
 		}
diff --git a/banyand/query/processor_topn.go b/banyand/query/processor_topn.go
index e8aa712..65b514d 100644
--- a/banyand/query/processor_topn.go
+++ b/banyand/query/processor_topn.go
@@ -21,7 +21,6 @@ import (
 	"bytes"
 	"container/heap"
 	"context"
-	"math"
 	"time"
 
 	"github.com/pkg/errors"
@@ -88,7 +87,7 @@ func (t *topNQueryProcessor) Rev(message bus.Message) (resp bus.Message) {
 	}
 	aggregator := createTopNPostAggregator(request.GetTopN(),
 		request.GetAgg(), request.GetFieldValueSort())
-	entity, err := locateEntity(topNSchema, request.GetConditions())
+	entity, err := locateEntity(topNSchema, request.GetFieldValueSort(), request.GetConditions())
 	if err != nil {
 		t.log.Error().Err(err).
 			Str("topN", topNMetadata.GetName()).
@@ -134,14 +133,17 @@ func (t *topNQueryProcessor) Rev(message bus.Message) (resp bus.Message) {
 	return
 }
 
-func locateEntity(topNSchema *databasev1.TopNAggregation, conditions []*modelv1.Condition) (tsdb.Entity, error) {
+func locateEntity(topNSchema *databasev1.TopNAggregation, sortDirection modelv1.Sort, conditions []*modelv1.Condition) (tsdb.Entity, error) {
 	entityMap := make(map[string]int)
-	entity := make([]tsdb.Entry, 1+len(topNSchema.GetGroupByTagNames()))
-	entity[0] = tsdb.AnyEntry
+	entity := make([]tsdb.Entry, 1+1+len(topNSchema.GetGroupByTagNames()))
+	// sortDirection
+	entity[0] = convert.Int64ToBytes(int64(sortDirection.Number()))
+	// rankNumber
+	entity[1] = tsdb.AnyEntry
 	for idx, tagName := range topNSchema.GetGroupByTagNames() {
-		entityMap[tagName] = idx + 1
+		entityMap[tagName] = idx + 2
 		// fill AnyEntry by default
-		entity[idx+1] = tsdb.AnyEntry
+		entity[idx+2] = tsdb.AnyEntry
 	}
 	for _, pairQuery := range conditions {
 		if pairQuery.GetOp() != modelv1.Condition_BINARY_OP_EQ {
@@ -258,11 +260,14 @@ func (aggr postAggregationProcessor) Len() int {
 	return len(aggr.items)
 }
 
+// Less reports whether min/max heap has to be built.
+// For DESC, a min heap has to be built,
+// while for ASC, a max heap has to be built.
 func (aggr postAggregationProcessor) Less(i, j int) bool {
 	if aggr.sort == modelv1.Sort_SORT_DESC {
-		return aggr.items[i].int64Func.Val() > aggr.items[j].int64Func.Val()
+		return aggr.items[i].int64Func.Val() < aggr.items[j].int64Func.Val()
 	}
-	return aggr.items[i].int64Func.Val() < aggr.items[j].int64Func.Val()
+	return aggr.items[i].int64Func.Val() > aggr.items[j].int64Func.Val()
 }
 
 func (aggr *postAggregationProcessor) Swap(i, j int) {
@@ -298,6 +303,7 @@ func (aggr *postAggregationProcessor) put(key string, val int64, timestampMillis
 		heap.Fix(aggr, item.index)
 		return nil
 	}
+
 	aggrFunc, err := aggregation.NewInt64Func(aggr.aggrFunc)
 	if err != nil {
 		return err
@@ -307,24 +313,40 @@ func (aggr *postAggregationProcessor) put(key string, val int64, timestampMillis
 		int64Func: aggrFunc,
 	}
 	item.int64Func.In(val)
-	aggr.cache[key] = item
-	heap.Push(aggr, item)
+
+	if aggr.Len() < int(aggr.topN) {
+		aggr.cache[key] = item
+		heap.Push(aggr, item)
+	} else {
+		if lowest := aggr.items[0]; lowest != nil {
+			if aggr.sort == modelv1.Sort_SORT_DESC && lowest.int64Func.Val() < val {
+				aggr.cache[key] = item
+				aggr.items[0] = item
+				heap.Fix(aggr, 0)
+			} else if aggr.sort != modelv1.Sort_SORT_DESC && lowest.int64Func.Val() > val {
+				aggr.cache[key] = item
+				aggr.items[0] = item
+				heap.Fix(aggr, 0)
+			}
+		}
+	}
+
 	return nil
 }
 
 func (aggr *postAggregationProcessor) val() []*measurev1.TopNList {
-	itemLen := int(math.Min(float64(aggr.topN), float64(aggr.Len())))
-	topNItems := make([]*measurev1.TopNList_Item, 0, itemLen)
+	topNItems := make([]*measurev1.TopNList_Item, aggr.Len())
 
-	for _, item := range aggr.items[0:itemLen] {
-		topNItems = append(topNItems, &measurev1.TopNList_Item{
+	for aggr.Len() > 0 {
+		item := heap.Pop(aggr).(*aggregatorItem)
+		topNItems[aggr.Len()] = &measurev1.TopNList_Item{
 			Name: item.key,
 			Value: &modelv1.FieldValue{
 				Value: &modelv1.FieldValue_Int{
 					Int: &modelv1.Int{Value: item.int64Func.Val()},
 				},
 			},
-		})
+		}
 	}
 	return []*measurev1.TopNList{
 		{
@@ -360,8 +382,8 @@ func (naggr *postNonAggregationProcessor) val() []*measurev1.TopNList {
 	topNLists := make([]*measurev1.TopNList, 0, len(naggr.timelines))
 	for ts, timeline := range naggr.timelines {
 		items := make([]*measurev1.TopNList_Item, timeline.Len())
-		for _, elem := range timeline.Values() {
-			items[elem.GetIndex()] = &measurev1.TopNList_Item{
+		for idx, elem := range timeline.Values() {
+			items[idx] = &measurev1.TopNList_Item{
 				Name: elem.(*nonAggregatorItem).key,
 				Value: &modelv1.FieldValue{
 					Value: &modelv1.FieldValue_Int{
@@ -393,21 +415,11 @@ func (naggr *postNonAggregationProcessor) put(key string, val int64, timestampMi
 		if timeline.Len() < int(naggr.topN) {
 			heap.Push(timeline, &nonAggregatorItem{val: val, key: key})
 		} else {
-			if right := timeline.Right(); right != nil {
-				if naggr.sort == modelv1.Sort_SORT_DESC && right.(*nonAggregatorItem).val < val {
-					heap.Push(timeline, &nonAggregatorItem{val: val, key: key})
-					newTimeline, err := timeline.WithNewItems(timeline.Slice(0, int(naggr.topN)))
-					if err != nil {
-						return err
-					}
-					naggr.timelines[timestampMillis] = newTimeline
-				} else if naggr.sort != modelv1.Sort_SORT_DESC && right.(*nonAggregatorItem).val > val {
-					heap.Push(timeline, &nonAggregatorItem{val: val, key: key})
-					newTimeline, err := timeline.WithNewItems(timeline.Slice(0, int(naggr.topN)))
-					if err != nil {
-						return err
-					}
-					naggr.timelines[timestampMillis] = newTimeline
+			if lowest := timeline.Peek(); lowest != nil {
+				if naggr.sort == modelv1.Sort_SORT_DESC && lowest.(*nonAggregatorItem).val < val {
+					timeline.ReplaceLowest(&nonAggregatorItem{val: val, key: key})
+				} else if naggr.sort != modelv1.Sort_SORT_DESC && lowest.(*nonAggregatorItem).val > val {
+					timeline.ReplaceLowest(&nonAggregatorItem{val: val, key: key})
 				}
 			}
 		}
@@ -417,19 +429,19 @@ func (naggr *postNonAggregationProcessor) put(key string, val int64, timestampMi
 	timeline := flow.NewPriorityQueue(func(a, b interface{}) int {
 		if naggr.sort == modelv1.Sort_SORT_DESC {
 			if a.(*nonAggregatorItem).val < b.(*nonAggregatorItem).val {
-				return 1
+				return -1
 			} else if a.(*nonAggregatorItem).val == b.(*nonAggregatorItem).val {
 				return 0
 			} else {
-				return -1
+				return 1
 			}
 		}
 		if a.(*nonAggregatorItem).val < b.(*nonAggregatorItem).val {
-			return -1
+			return 1
 		} else if a.(*nonAggregatorItem).val == b.(*nonAggregatorItem).val {
 			return 0
 		} else {
-			return 1
+			return -1
 		}
 	}, false)
 	naggr.timelines[timestampMillis] = timeline
diff --git a/pkg/flow/dedup_priority_queue.go b/pkg/flow/dedup_priority_queue.go
index fbd1b2b..55fa120 100644
--- a/pkg/flow/dedup_priority_queue.go
+++ b/pkg/flow/dedup_priority_queue.go
@@ -116,43 +116,16 @@ func (pq *DedupPriorityQueue) Peek() Element {
 	return nil
 }
 
-// Slice returns a sliced DedupPriorityQueue using the given bounds.
-func (pq *DedupPriorityQueue) Slice(start, end int) []Element {
-	return pq.Items[start:end]
+func (pq *DedupPriorityQueue) ReplaceLowest(newLowest Element) {
+	pq.Items[0] = newLowest
+	heap.Fix(pq, 0)
 }
 
 func (pq *DedupPriorityQueue) Values() []Element {
 	values := make([]Element, pq.Len())
-	for _, elem := range pq.Items {
-		values[elem.GetIndex()] = elem
+	for pq.Len() > 0 {
+		item := heap.Pop(pq).(Element)
+		values[pq.Len()] = item
 	}
 	return values
 }
-
-func (pq *DedupPriorityQueue) Left() Element {
-	if pq.Len() == 0 {
-		return nil
-	}
-	return pq.Items[0]
-}
-
-func (pq *DedupPriorityQueue) Right() Element {
-	if pq.Len() == 0 {
-		return nil
-	}
-	return pq.Items[pq.Len()-1]
-}
-
-func (pq *DedupPriorityQueue) WithNewItems(items []Element) (*DedupPriorityQueue, error) {
-	newPq := &DedupPriorityQueue{
-		Items:           items,
-		cache:           make(map[Element]struct{}),
-		allowDuplicates: pq.allowDuplicates,
-		comparator:      pq.comparator,
-	}
-	err := newPq.initCache()
-	if err != nil {
-		return nil, err
-	}
-	return newPq, nil
-}
diff --git a/test/cases/measure/data/testdata/service_cpm_minute_data.json b/test/cases/measure/data/testdata/service_cpm_minute_data.json
index 877290c..c2dfa53 100644
--- a/test/cases/measure/data/testdata/service_cpm_minute_data.json
+++ b/test/cases/measure/data/testdata/service_cpm_minute_data.json
@@ -54,7 +54,7 @@
       },
       {
         "int": {
-          "value": 1
+          "value": 2
         }
       }
     ]
@@ -84,7 +84,7 @@
       },
       {
         "int": {
-          "value": 1
+          "value": 3
         }
       }
     ]
@@ -174,7 +174,7 @@
       },
       {
         "int": {
-          "value": 5
+          "value": 6
         }
       }
     ]
diff --git a/test/cases/measure/data/testdata/service_cpm_minute_data1.json b/test/cases/measure/data/testdata/service_cpm_minute_data1.json
index 44bd61a..2ee3ecf 100644
--- a/test/cases/measure/data/testdata/service_cpm_minute_data1.json
+++ b/test/cases/measure/data/testdata/service_cpm_minute_data1.json
@@ -54,7 +54,7 @@
       },
       {
         "int": {
-          "value": 5
+          "value": 9
         }
       }
     ]
@@ -84,7 +84,7 @@
       },
       {
         "int": {
-          "value": 3
+          "value": 8
         }
       }
     ]
@@ -114,7 +114,7 @@
       },
       {
         "int": {
-          "value": 2
+          "value": 11
         }
       }
     ]
@@ -144,7 +144,7 @@
       },
       {
         "int": {
-          "value": 1
+          "value": 12
         }
       }
     ]
@@ -174,7 +174,7 @@
       },
       {
         "int": {
-          "value": 4
+          "value": 7
         }
       }
     ]
diff --git a/test/cases/measure/data/want/all.yaml b/test/cases/measure/data/want/all.yaml
index 5041ece..5d41213 100644
--- a/test/cases/measure/data/want/all.yaml
+++ b/test/cases/measure/data/want/all.yaml
@@ -36,7 +36,7 @@ dataPoints:
       value:
         str:
           value: entity_1
-  timestamp: "2022-10-13T13:34:18.389Z"
+  timestamp: "2022-10-17T12:49:45.912Z"
 - fields:
   - name: total
     value:
@@ -57,7 +57,7 @@ dataPoints:
       value:
         str:
           value: entity_2
-  timestamp: "2022-10-13T13:34:19.389Z"
+  timestamp: "2022-10-17T12:49:46.912Z"
 - fields:
   - name: total
     value:
@@ -66,7 +66,7 @@ dataPoints:
   - name: value
     value:
       int:
-        value: "1"
+        value: "2"
   tagFamilies:
   - name: default
     tags:
@@ -78,7 +78,7 @@ dataPoints:
       value:
         str:
           value: entity_2
-  timestamp: "2022-10-13T13:35:18.389Z"
+  timestamp: "2022-10-17T12:50:45.912Z"
 - fields:
   - name: total
     value:
@@ -87,7 +87,7 @@ dataPoints:
   - name: value
     value:
       int:
-        value: "5"
+        value: "9"
   tagFamilies:
   - name: default
     tags:
@@ -99,7 +99,7 @@ dataPoints:
       value:
         str:
           value: entity_2
-  timestamp: "2022-10-13T13:35:19.389Z"
+  timestamp: "2022-10-17T12:50:46.912Z"
 - fields:
   - name: total
     value:
@@ -108,7 +108,7 @@ dataPoints:
   - name: value
     value:
       int:
-        value: "1"
+        value: "3"
   tagFamilies:
   - name: default
     tags:
@@ -120,7 +120,7 @@ dataPoints:
       value:
         str:
           value: entity_2
-  timestamp: "2022-10-13T13:36:18.389Z"
+  timestamp: "2022-10-17T12:51:45.912Z"
 - fields:
   - name: total
     value:
@@ -129,7 +129,7 @@ dataPoints:
   - name: value
     value:
       int:
-        value: "3"
+        value: "8"
   tagFamilies:
   - name: default
     tags:
@@ -141,7 +141,7 @@ dataPoints:
       value:
         str:
           value: entity_2
-  timestamp: "2022-10-13T13:36:19.389Z"
+  timestamp: "2022-10-17T12:51:46.912Z"
 - fields:
   - name: total
     value:
@@ -162,7 +162,7 @@ dataPoints:
       value:
         str:
           value: entity_3
-  timestamp: "2022-10-13T13:37:18.389Z"
+  timestamp: "2022-10-17T12:52:45.912Z"
 - fields:
   - name: total
     value:
@@ -171,7 +171,7 @@ dataPoints:
   - name: value
     value:
       int:
-        value: "2"
+        value: "11"
   tagFamilies:
   - name: default
     tags:
@@ -183,7 +183,7 @@ dataPoints:
       value:
         str:
           value: entity_3
-  timestamp: "2022-10-13T13:37:19.389Z"
+  timestamp: "2022-10-17T12:52:46.912Z"
 - fields:
   - name: total
     value:
@@ -204,7 +204,7 @@ dataPoints:
       value:
         str:
           value: entity_1
-  timestamp: "2022-10-13T13:38:18.389Z"
+  timestamp: "2022-10-17T12:53:45.912Z"
 - fields:
   - name: total
     value:
@@ -213,7 +213,7 @@ dataPoints:
   - name: value
     value:
       int:
-        value: "1"
+        value: "12"
   tagFamilies:
   - name: default
     tags:
@@ -225,7 +225,7 @@ dataPoints:
       value:
         str:
           value: entity_1
-  timestamp: "2022-10-13T13:38:19.389Z"
+  timestamp: "2022-10-17T12:53:46.912Z"
 - fields:
   - name: total
     value:
@@ -234,7 +234,7 @@ dataPoints:
   - name: value
     value:
       int:
-        value: "5"
+        value: "6"
   tagFamilies:
   - name: default
     tags:
@@ -246,7 +246,7 @@ dataPoints:
       value:
         str:
           value: entity_1
-  timestamp: "2022-10-13T13:39:18.389Z"
+  timestamp: "2022-10-17T12:54:45.912Z"
 - fields:
   - name: total
     value:
@@ -255,7 +255,7 @@ dataPoints:
   - name: value
     value:
       int:
-        value: "4"
+        value: "7"
   tagFamilies:
   - name: default
     tags:
@@ -267,4 +267,4 @@ dataPoints:
       value:
         str:
           value: entity_1
-  timestamp: "2022-10-13T13:39:19.389Z"
+  timestamp: "2022-10-17T12:54:46.912Z"
diff --git a/test/cases/measure/data/want/group_max.yaml b/test/cases/measure/data/want/group_max.yaml
index f24bb15..bbe3923 100644
--- a/test/cases/measure/data/want/group_max.yaml
+++ b/test/cases/measure/data/want/group_max.yaml
@@ -16,39 +16,39 @@
 # under the License.
 
 dataPoints:
-  - fields:
-    - name: value
+- fields:
+  - name: value
+    value:
+      int:
+        value: "12"
+  tagFamilies:
+  - name: default
+    tags:
+    - key: entity_id
       value:
-        int:
-          value: "5"
-    tagFamilies:
-    - name: default
-      tags:
-      - key: entity_id
-        value:
-          str:
-            value: entity_1
-  - fields:
-    - name: value
+        str:
+          value: entity_1
+- fields:
+  - name: value
+    value:
+      int:
+        value: "10"
+  tagFamilies:
+  - name: default
+    tags:
+    - key: entity_id
       value:
-        int:
-          value: "10"
-    tagFamilies:
-    - name: default
-      tags:
-      - key: entity_id
-        value:
-          str:
-            value: entity_2
-  - fields:
-    - name: value
+        str:
+          value: entity_2
+- fields:
+  - name: value
+    value:
+      int:
+        value: "11"
+  tagFamilies:
+  - name: default
+    tags:
+    - key: entity_id
       value:
-        int:
-          value: "5"
-    tagFamilies:
-    - name: default
-      tags:
-      - key: entity_id
-        value:
-          str:
-            value: entity_3
\ No newline at end of file
+        str:
+          value: entity_3
\ No newline at end of file
diff --git a/test/cases/measure/data/want/limit.yaml b/test/cases/measure/data/want/limit.yaml
index 2267a74..ca93ea1 100644
--- a/test/cases/measure/data/want/limit.yaml
+++ b/test/cases/measure/data/want/limit.yaml
@@ -24,7 +24,7 @@ dataPoints:
   - name: value
     value:
       int:
-        value: "5"
+        value: "9"
   tagFamilies:
   - name: default
     tags:
@@ -32,7 +32,7 @@ dataPoints:
       value:
         str:
           value: entity_2
-  timestamp: "2022-10-13T14:13:50.440Z"
+  timestamp: "2022-10-17T12:57:03.444Z"
 - fields:
   - name: total
     value:
@@ -41,7 +41,7 @@ dataPoints:
   - name: value
     value:
       int:
-        value: "1"
+        value: "3"
   tagFamilies:
   - name: default
     tags:
@@ -49,4 +49,4 @@ dataPoints:
       value:
         str:
           value: entity_2
-  timestamp: "2022-10-13T14:14:49.440Z"
\ No newline at end of file
+  timestamp: "2022-10-17T12:58:02.444Z"
\ No newline at end of file
diff --git a/test/cases/measure/data/want/order_asc.yaml b/test/cases/measure/data/want/order_asc.yaml
index 67f6d11..d55db5e 100644
--- a/test/cases/measure/data/want/order_asc.yaml
+++ b/test/cases/measure/data/want/order_asc.yaml
@@ -32,7 +32,7 @@ dataPoints:
       value:
         str:
           value: entity_1
-  timestamp: "2022-10-13T14:11:30.182Z"
+  timestamp: "2022-10-17T12:54:36.627Z"
 - fields:
   - name: total
     value:
@@ -49,7 +49,7 @@ dataPoints:
       value:
         str:
           value: entity_2
-  timestamp: "2022-10-13T14:11:31.182Z"
+  timestamp: "2022-10-17T12:54:37.627Z"
 - fields:
   - name: total
     value:
@@ -58,7 +58,7 @@ dataPoints:
   - name: value
     value:
       int:
-        value: "1"
+        value: "2"
   tagFamilies:
   - name: default
     tags:
@@ -66,7 +66,7 @@ dataPoints:
       value:
         str:
           value: entity_2
-  timestamp: "2022-10-13T14:12:30.182Z"
+  timestamp: "2022-10-17T12:55:36.627Z"
 - fields:
   - name: total
     value:
@@ -75,7 +75,7 @@ dataPoints:
   - name: value
     value:
       int:
-        value: "5"
+        value: "9"
   tagFamilies:
   - name: default
     tags:
@@ -83,7 +83,7 @@ dataPoints:
       value:
         str:
           value: entity_2
-  timestamp: "2022-10-13T14:12:31.182Z"
+  timestamp: "2022-10-17T12:55:37.627Z"
 - fields:
   - name: total
     value:
@@ -92,7 +92,7 @@ dataPoints:
   - name: value
     value:
       int:
-        value: "1"
+        value: "3"
   tagFamilies:
   - name: default
     tags:
@@ -100,7 +100,7 @@ dataPoints:
       value:
         str:
           value: entity_2
-  timestamp: "2022-10-13T14:13:30.182Z"
+  timestamp: "2022-10-17T12:56:36.627Z"
 - fields:
   - name: total
     value:
@@ -109,7 +109,7 @@ dataPoints:
   - name: value
     value:
       int:
-        value: "3"
+        value: "8"
   tagFamilies:
   - name: default
     tags:
@@ -117,7 +117,7 @@ dataPoints:
       value:
         str:
           value: entity_2
-  timestamp: "2022-10-13T14:13:31.182Z"
+  timestamp: "2022-10-17T12:56:37.627Z"
 - fields:
   - name: total
     value:
@@ -134,7 +134,7 @@ dataPoints:
       value:
         str:
           value: entity_3
-  timestamp: "2022-10-13T14:14:30.182Z"
+  timestamp: "2022-10-17T12:57:36.627Z"
 - fields:
   - name: total
     value:
@@ -143,7 +143,7 @@ dataPoints:
   - name: value
     value:
       int:
-        value: "2"
+        value: "11"
   tagFamilies:
   - name: default
     tags:
@@ -151,7 +151,7 @@ dataPoints:
       value:
         str:
           value: entity_3
-  timestamp: "2022-10-13T14:14:31.182Z"
+  timestamp: "2022-10-17T12:57:37.627Z"
 - fields:
   - name: total
     value:
@@ -168,7 +168,7 @@ dataPoints:
       value:
         str:
           value: entity_1
-  timestamp: "2022-10-13T14:15:30.182Z"
+  timestamp: "2022-10-17T12:58:36.627Z"
 - fields:
   - name: total
     value:
@@ -177,7 +177,7 @@ dataPoints:
   - name: value
     value:
       int:
-        value: "1"
+        value: "12"
   tagFamilies:
   - name: default
     tags:
@@ -185,7 +185,7 @@ dataPoints:
       value:
         str:
           value: entity_1
-  timestamp: "2022-10-13T14:15:31.182Z"
+  timestamp: "2022-10-17T12:58:37.627Z"
 - fields:
   - name: total
     value:
@@ -194,7 +194,7 @@ dataPoints:
   - name: value
     value:
       int:
-        value: "5"
+        value: "6"
   tagFamilies:
   - name: default
     tags:
@@ -202,7 +202,7 @@ dataPoints:
       value:
         str:
           value: entity_1
-  timestamp: "2022-10-13T14:16:30.182Z"
+  timestamp: "2022-10-17T12:59:36.627Z"
 - fields:
   - name: total
     value:
@@ -211,7 +211,7 @@ dataPoints:
   - name: value
     value:
       int:
-        value: "4"
+        value: "7"
   tagFamilies:
   - name: default
     tags:
@@ -219,4 +219,4 @@ dataPoints:
       value:
         str:
           value: entity_1
-  timestamp: "2022-10-13T14:16:31.182Z"
\ No newline at end of file
+  timestamp: "2022-10-17T12:59:37.627Z"
\ No newline at end of file
diff --git a/test/cases/measure/data/want/order_desc.yaml b/test/cases/measure/data/want/order_desc.yaml
index b503b36..4c4f749 100644
--- a/test/cases/measure/data/want/order_desc.yaml
+++ b/test/cases/measure/data/want/order_desc.yaml
@@ -24,7 +24,7 @@ dataPoints:
   - name: value
     value:
       int:
-        value: "4"
+        value: "7"
   tagFamilies:
   - name: default
     tags:
@@ -32,7 +32,7 @@ dataPoints:
       value:
         str:
           value: entity_1
-  timestamp: "2022-10-13T14:17:50.440Z"
+  timestamp: "2022-10-17T13:01:03.444Z"
 - fields:
   - name: total
     value:
@@ -41,7 +41,7 @@ dataPoints:
   - name: value
     value:
       int:
-        value: "5"
+        value: "6"
   tagFamilies:
   - name: default
     tags:
@@ -49,7 +49,7 @@ dataPoints:
       value:
         str:
           value: entity_1
-  timestamp: "2022-10-13T14:17:49.440Z"
+  timestamp: "2022-10-17T13:01:02.444Z"
 - fields:
   - name: total
     value:
@@ -58,7 +58,7 @@ dataPoints:
   - name: value
     value:
       int:
-        value: "1"
+        value: "12"
   tagFamilies:
   - name: default
     tags:
@@ -66,7 +66,7 @@ dataPoints:
       value:
         str:
           value: entity_1
-  timestamp: "2022-10-13T14:16:50.440Z"
+  timestamp: "2022-10-17T13:00:03.444Z"
 - fields:
   - name: total
     value:
@@ -83,7 +83,7 @@ dataPoints:
       value:
         str:
           value: entity_1
-  timestamp: "2022-10-13T14:16:49.440Z"
+  timestamp: "2022-10-17T13:00:02.444Z"
 - fields:
   - name: total
     value:
@@ -92,7 +92,7 @@ dataPoints:
   - name: value
     value:
       int:
-        value: "2"
+        value: "11"
   tagFamilies:
   - name: default
     tags:
@@ -100,7 +100,7 @@ dataPoints:
       value:
         str:
           value: entity_3
-  timestamp: "2022-10-13T14:15:50.440Z"
+  timestamp: "2022-10-17T12:59:03.444Z"
 - fields:
   - name: total
     value:
@@ -117,7 +117,7 @@ dataPoints:
       value:
         str:
           value: entity_3
-  timestamp: "2022-10-13T14:15:49.440Z"
+  timestamp: "2022-10-17T12:59:02.444Z"
 - fields:
   - name: total
     value:
@@ -126,7 +126,7 @@ dataPoints:
   - name: value
     value:
       int:
-        value: "3"
+        value: "8"
   tagFamilies:
   - name: default
     tags:
@@ -134,7 +134,7 @@ dataPoints:
       value:
         str:
           value: entity_2
-  timestamp: "2022-10-13T14:14:50.440Z"
+  timestamp: "2022-10-17T12:58:03.444Z"
 - fields:
   - name: total
     value:
@@ -143,7 +143,7 @@ dataPoints:
   - name: value
     value:
       int:
-        value: "1"
+        value: "3"
   tagFamilies:
   - name: default
     tags:
@@ -151,7 +151,7 @@ dataPoints:
       value:
         str:
           value: entity_2
-  timestamp: "2022-10-13T14:14:49.440Z"
+  timestamp: "2022-10-17T12:58:02.444Z"
 - fields:
   - name: total
     value:
@@ -160,7 +160,7 @@ dataPoints:
   - name: value
     value:
       int:
-        value: "5"
+        value: "9"
   tagFamilies:
   - name: default
     tags:
@@ -168,7 +168,7 @@ dataPoints:
       value:
         str:
           value: entity_2
-  timestamp: "2022-10-13T14:13:50.440Z"
+  timestamp: "2022-10-17T12:57:03.444Z"
 - fields:
   - name: total
     value:
@@ -177,7 +177,7 @@ dataPoints:
   - name: value
     value:
       int:
-        value: "1"
+        value: "2"
   tagFamilies:
   - name: default
     tags:
@@ -185,7 +185,7 @@ dataPoints:
       value:
         str:
           value: entity_2
-  timestamp: "2022-10-13T14:13:49.440Z"
+  timestamp: "2022-10-17T12:57:02.444Z"
 - fields:
   - name: total
     value:
@@ -202,7 +202,7 @@ dataPoints:
       value:
         str:
           value: entity_2
-  timestamp: "2022-10-13T14:12:50.440Z"
+  timestamp: "2022-10-17T12:56:03.444Z"
 - fields:
   - name: total
     value:
@@ -219,4 +219,4 @@ dataPoints:
       value:
         str:
           value: entity_1
-  timestamp: "2022-10-13T14:12:49.440Z"
\ No newline at end of file
+  timestamp: "2022-10-17T12:56:02.444Z"
\ No newline at end of file
diff --git a/test/cases/topn/data/want/aggr_desc.yaml b/test/cases/topn/data/want/aggr_desc.yaml
index 336774a..85c4ee5 100644
--- a/test/cases/topn/data/want/aggr_desc.yaml
+++ b/test/cases/topn/data/want/aggr_desc.yaml
@@ -17,16 +17,16 @@
 
 lists:
 - items:
-  - name: entity_2
+  - name: entity_1
     value:
       int:
-        value: "10"
+        value: "12"
   - name: entity_3
     value:
       int:
-        value: "5"
-  - name: entity_1
+        value: "11"
+  - name: entity_2
     value:
       int:
-        value: "5"
-  timestamp: "2022-10-13T14:35:00Z"
\ No newline at end of file
+        value: "10"
+  timestamp: "2022-10-17T12:54:00Z"
\ No newline at end of file
diff --git a/test/cases/topn/data/want/asc.yaml b/test/cases/topn/data/want/asc.yaml
index 6eebcc6..14c2f44 100644
--- a/test/cases/topn/data/want/asc.yaml
+++ b/test/cases/topn/data/want/asc.yaml
@@ -21,34 +21,34 @@ lists:
     value:
       int:
         value: "1"
-  timestamp: "2022-10-13T14:14:00Z"
+  timestamp: "2022-10-17T12:49:00Z"
 - items:
   - name: entity_2
     value:
       int:
-        value: "1"
-  timestamp: "2022-10-13T14:15:00Z"
+        value: "2"
+  timestamp: "2022-10-17T12:50:00Z"
 - items:
   - name: entity_2
     value:
       int:
-        value: "1"
-  timestamp: "2022-10-13T14:16:00Z"
+        value: "3"
+  timestamp: "2022-10-17T12:51:00Z"
 - items:
   - name: entity_3
     value:
       int:
-        value: "2"
-  timestamp: "2022-10-13T14:17:00Z"
+        value: "5"
+  timestamp: "2022-10-17T12:52:00Z"
 - items:
   - name: entity_1
     value:
       int:
-        value: "1"
-  timestamp: "2022-10-13T14:18:00Z"
+        value: "4"
+  timestamp: "2022-10-17T12:53:00Z"
 - items:
   - name: entity_1
     value:
       int:
-        value: "5"
-  timestamp: "2022-10-13T14:19:00Z"
\ No newline at end of file
+        value: "6"
+  timestamp: "2022-10-17T12:54:00Z"
\ No newline at end of file
diff --git a/test/cases/topn/data/want/condition_aggr_desc.yaml b/test/cases/topn/data/want/condition_aggr_desc.yaml
index d1ae3bb..784e5c3 100644
--- a/test/cases/topn/data/want/condition_aggr_desc.yaml
+++ b/test/cases/topn/data/want/condition_aggr_desc.yaml
@@ -20,5 +20,5 @@ lists:
   - name: entity_1
     value:
       int:
-        value: "5"
-  timestamp: "2022-10-13T14:45:00Z"
\ No newline at end of file
+        value: "12"
+  timestamp: "2022-10-17T12:54:00Z"
\ No newline at end of file
diff --git a/test/cases/topn/data/want/desc.yaml b/test/cases/topn/data/want/desc.yaml
index dbc19ab..53f58be 100644
--- a/test/cases/topn/data/want/desc.yaml
+++ b/test/cases/topn/data/want/desc.yaml
@@ -21,34 +21,34 @@ lists:
     value:
       int:
         value: "10"
-  timestamp: "2022-10-13T14:14:00Z"
+  timestamp: "2022-10-17T12:49:00Z"
 - items:
   - name: entity_2
     value:
       int:
-        value: "5"
-  timestamp: "2022-10-13T14:15:00Z"
+        value: "9"
+  timestamp: "2022-10-17T12:50:00Z"
 - items:
   - name: entity_2
     value:
       int:
-        value: "3"
-  timestamp: "2022-10-13T14:16:00Z"
+        value: "8"
+  timestamp: "2022-10-17T12:51:00Z"
 - items:
   - name: entity_3
     value:
       int:
-        value: "5"
-  timestamp: "2022-10-13T14:17:00Z"
+        value: "11"
+  timestamp: "2022-10-17T12:52:00Z"
 - items:
   - name: entity_1
     value:
       int:
-        value: "4"
-  timestamp: "2022-10-13T14:18:00Z"
+        value: "12"
+  timestamp: "2022-10-17T12:53:00Z"
 - items:
   - name: entity_1
     value:
       int:
-        value: "5"
-  timestamp: "2022-10-13T14:19:00Z"
\ No newline at end of file
+        value: "6"
+  timestamp: "2022-10-17T12:54:00Z"
\ No newline at end of file


[skywalking-banyandb] 02/02: fix lint

Posted by lu...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

lujiajing pushed a commit to branch fix-topn-post-processor
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git

commit 61fd5103e79352a6d8e69435fb14035e1928f1db
Author: Megrez Lu <lu...@gmail.com>
AuthorDate: Mon Oct 17 22:47:08 2022 +0800

    fix lint
---
 pkg/flow/dedup_priority_queue.go | 15 ---------------
 1 file changed, 15 deletions(-)

diff --git a/pkg/flow/dedup_priority_queue.go b/pkg/flow/dedup_priority_queue.go
index 55fa120..a5c0f8e 100644
--- a/pkg/flow/dedup_priority_queue.go
+++ b/pkg/flow/dedup_priority_queue.go
@@ -21,7 +21,6 @@ import (
 	"container/heap"
 
 	"github.com/emirpasic/gods/utils"
-	"github.com/pkg/errors"
 )
 
 var _ heap.Interface = (*DedupPriorityQueue)(nil)
@@ -50,20 +49,6 @@ func NewPriorityQueue(comparator utils.Comparator, allowDuplicates bool) *DedupP
 	}
 }
 
-func (pq *DedupPriorityQueue) initCache() error {
-	if pq.allowDuplicates || len(pq.Items) == 0 {
-		return nil
-	}
-	for _, elem := range pq.Items {
-		if _, ok := pq.cache[elem]; !ok {
-			pq.cache[elem] = struct{}{}
-		} else {
-			return errors.New("duplicated item is not allowed")
-		}
-	}
-	return nil
-}
-
 // Len returns the DedupPriorityQueue length.
 func (pq *DedupPriorityQueue) Len() int { return len(pq.Items) }