You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by ha...@apache.org on 2022/10/17 22:40:15 UTC

[skywalking-banyandb] branch main updated: Fix TopN sort direction issue and TopN query issue (#194)

This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git


The following commit(s) were added to refs/heads/main by this push:
     new cf6bdc1  Fix TopN sort direction issue and TopN query issue (#194)
cf6bdc1 is described below

commit cf6bdc10bf401efee1dc00034be58f675b990515
Author: Jiajing LU <lu...@gmail.com>
AuthorDate: Tue Oct 18 06:39:45 2022 +0800

    Fix TopN sort direction issue and TopN query issue (#194)
---
 banyand/measure/measure_topn.go                    | 11 ++-
 banyand/query/processor_topn.go                    | 90 +++++++++++++---------
 pkg/flow/dedup_priority_queue.go                   | 54 ++-----------
 .../data/testdata/service_cpm_minute_data.json     |  6 +-
 .../data/testdata/service_cpm_minute_data1.json    | 10 +--
 test/cases/measure/data/want/all.yaml              | 40 +++++-----
 test/cases/measure/data/want/group_max.yaml        | 66 ++++++++--------
 test/cases/measure/data/want/limit.yaml            |  8 +-
 test/cases/measure/data/want/order_asc.yaml        | 40 +++++-----
 test/cases/measure/data/want/order_desc.yaml       | 40 +++++-----
 test/cases/topn/data/want/aggr_desc.yaml           | 12 +--
 test/cases/topn/data/want/asc.yaml                 | 22 +++---
 test/cases/topn/data/want/condition_aggr_desc.yaml |  4 +-
 test/cases/topn/data/want/desc.yaml                | 22 +++---
 14 files changed, 201 insertions(+), 224 deletions(-)

diff --git a/banyand/measure/measure_topn.go b/banyand/measure/measure_topn.go
index 8a964b1..183b5f0 100644
--- a/banyand/measure/measure_topn.go
+++ b/banyand/measure/measure_topn.go
@@ -247,15 +247,20 @@ func (t *topNStreamingProcessor) locate(tagValues []*modelv1.TagValue, rankNum i
 	if len(t.topNSchema.GetGroupByTagNames()) != len(tagValues) {
 		return nil, 0, errors.New("no enough tag values for the entity")
 	}
-	entity := make(tsdb.Entity, 1+1+len(t.topNSchema.GetGroupByTagNames()))
+	// entity prefix
+	// 1) source measure Name + topN aggregation Name
+	// 2) sort direction
+	// 3) rank number
+	entity := make(tsdb.Entity, 1+1+1+len(t.topNSchema.GetGroupByTagNames()))
 	// entity prefix
 	entity[0] = []byte(formatMeasureCompanionPrefix(t.topNSchema.GetSourceMeasure().GetName(),
 		t.topNSchema.GetMetadata().GetName()))
-	entity[1] = convert.Int64ToBytes(int64(rankNum))
+	entity[1] = convert.Int64ToBytes(int64(t.sortDirection.Number()))
+	entity[2] = convert.Int64ToBytes(int64(rankNum))
 	// measureID as sharding key
 	for idx, tagVal := range tagValues {
 		var innerErr error
-		entity[idx+2], innerErr = pbv1.MarshalIndexFieldValue(tagVal)
+		entity[idx+3], innerErr = pbv1.MarshalIndexFieldValue(tagVal)
 		if innerErr != nil {
 			return nil, 0, innerErr
 		}
diff --git a/banyand/query/processor_topn.go b/banyand/query/processor_topn.go
index e8aa712..91f1199 100644
--- a/banyand/query/processor_topn.go
+++ b/banyand/query/processor_topn.go
@@ -21,7 +21,6 @@ import (
 	"bytes"
 	"container/heap"
 	"context"
-	"math"
 	"time"
 
 	"github.com/pkg/errors"
@@ -88,7 +87,7 @@ func (t *topNQueryProcessor) Rev(message bus.Message) (resp bus.Message) {
 	}
 	aggregator := createTopNPostAggregator(request.GetTopN(),
 		request.GetAgg(), request.GetFieldValueSort())
-	entity, err := locateEntity(topNSchema, request.GetConditions())
+	entity, err := locateEntity(topNSchema, request.GetFieldValueSort(), request.GetConditions())
 	if err != nil {
 		t.log.Error().Err(err).
 			Str("topN", topNMetadata.GetName()).
@@ -134,14 +133,17 @@ func (t *topNQueryProcessor) Rev(message bus.Message) (resp bus.Message) {
 	return
 }
 
-func locateEntity(topNSchema *databasev1.TopNAggregation, conditions []*modelv1.Condition) (tsdb.Entity, error) {
+func locateEntity(topNSchema *databasev1.TopNAggregation, sortDirection modelv1.Sort, conditions []*modelv1.Condition) (tsdb.Entity, error) {
 	entityMap := make(map[string]int)
-	entity := make([]tsdb.Entry, 1+len(topNSchema.GetGroupByTagNames()))
-	entity[0] = tsdb.AnyEntry
+	entity := make([]tsdb.Entry, 1+1+len(topNSchema.GetGroupByTagNames()))
+	// sortDirection
+	entity[0] = convert.Int64ToBytes(int64(sortDirection.Number()))
+	// rankNumber
+	entity[1] = tsdb.AnyEntry
 	for idx, tagName := range topNSchema.GetGroupByTagNames() {
-		entityMap[tagName] = idx + 1
+		entityMap[tagName] = idx + 2
 		// fill AnyEntry by default
-		entity[idx+1] = tsdb.AnyEntry
+		entity[idx+2] = tsdb.AnyEntry
 	}
 	for _, pairQuery := range conditions {
 		if pairQuery.GetOp() != modelv1.Condition_BINARY_OP_EQ {
@@ -199,7 +201,9 @@ func (t *topNQueryProcessor) scanSeries(series tsdb.Series, request *measurev1.T
 		request.GetTimeRange().GetEnd().AsTime()),
 	)
 	defer func(seriesSpan tsdb.SeriesSpan) {
-		_ = seriesSpan.Close()
+		if seriesSpan != nil {
+			_ = seriesSpan.Close()
+		}
 	}(seriesSpan)
 	if err != nil {
 		return nil, err
@@ -258,11 +262,14 @@ func (aggr postAggregationProcessor) Len() int {
 	return len(aggr.items)
 }
 
+// Less reports whether min/max heap has to be built.
+// For DESC, a min heap has to be built,
+// while for ASC, a max heap has to be built.
 func (aggr postAggregationProcessor) Less(i, j int) bool {
 	if aggr.sort == modelv1.Sort_SORT_DESC {
-		return aggr.items[i].int64Func.Val() > aggr.items[j].int64Func.Val()
+		return aggr.items[i].int64Func.Val() < aggr.items[j].int64Func.Val()
 	}
-	return aggr.items[i].int64Func.Val() < aggr.items[j].int64Func.Val()
+	return aggr.items[i].int64Func.Val() > aggr.items[j].int64Func.Val()
 }
 
 func (aggr *postAggregationProcessor) Swap(i, j int) {
@@ -298,6 +305,7 @@ func (aggr *postAggregationProcessor) put(key string, val int64, timestampMillis
 		heap.Fix(aggr, item.index)
 		return nil
 	}
+
 	aggrFunc, err := aggregation.NewInt64Func(aggr.aggrFunc)
 	if err != nil {
 		return err
@@ -307,24 +315,40 @@ func (aggr *postAggregationProcessor) put(key string, val int64, timestampMillis
 		int64Func: aggrFunc,
 	}
 	item.int64Func.In(val)
-	aggr.cache[key] = item
-	heap.Push(aggr, item)
+
+	if aggr.Len() < int(aggr.topN) {
+		aggr.cache[key] = item
+		heap.Push(aggr, item)
+	} else {
+		if lowest := aggr.items[0]; lowest != nil {
+			if aggr.sort == modelv1.Sort_SORT_DESC && lowest.int64Func.Val() < val {
+				aggr.cache[key] = item
+				aggr.items[0] = item
+				heap.Fix(aggr, 0)
+			} else if aggr.sort != modelv1.Sort_SORT_DESC && lowest.int64Func.Val() > val {
+				aggr.cache[key] = item
+				aggr.items[0] = item
+				heap.Fix(aggr, 0)
+			}
+		}
+	}
+
 	return nil
 }
 
 func (aggr *postAggregationProcessor) val() []*measurev1.TopNList {
-	itemLen := int(math.Min(float64(aggr.topN), float64(aggr.Len())))
-	topNItems := make([]*measurev1.TopNList_Item, 0, itemLen)
+	topNItems := make([]*measurev1.TopNList_Item, aggr.Len())
 
-	for _, item := range aggr.items[0:itemLen] {
-		topNItems = append(topNItems, &measurev1.TopNList_Item{
+	for aggr.Len() > 0 {
+		item := heap.Pop(aggr).(*aggregatorItem)
+		topNItems[aggr.Len()] = &measurev1.TopNList_Item{
 			Name: item.key,
 			Value: &modelv1.FieldValue{
 				Value: &modelv1.FieldValue_Int{
 					Int: &modelv1.Int{Value: item.int64Func.Val()},
 				},
 			},
-		})
+		}
 	}
 	return []*measurev1.TopNList{
 		{
@@ -360,8 +384,8 @@ func (naggr *postNonAggregationProcessor) val() []*measurev1.TopNList {
 	topNLists := make([]*measurev1.TopNList, 0, len(naggr.timelines))
 	for ts, timeline := range naggr.timelines {
 		items := make([]*measurev1.TopNList_Item, timeline.Len())
-		for _, elem := range timeline.Values() {
-			items[elem.GetIndex()] = &measurev1.TopNList_Item{
+		for idx, elem := range timeline.Values() {
+			items[idx] = &measurev1.TopNList_Item{
 				Name: elem.(*nonAggregatorItem).key,
 				Value: &modelv1.FieldValue{
 					Value: &modelv1.FieldValue_Int{
@@ -393,21 +417,11 @@ func (naggr *postNonAggregationProcessor) put(key string, val int64, timestampMi
 		if timeline.Len() < int(naggr.topN) {
 			heap.Push(timeline, &nonAggregatorItem{val: val, key: key})
 		} else {
-			if right := timeline.Right(); right != nil {
-				if naggr.sort == modelv1.Sort_SORT_DESC && right.(*nonAggregatorItem).val < val {
-					heap.Push(timeline, &nonAggregatorItem{val: val, key: key})
-					newTimeline, err := timeline.WithNewItems(timeline.Slice(0, int(naggr.topN)))
-					if err != nil {
-						return err
-					}
-					naggr.timelines[timestampMillis] = newTimeline
-				} else if naggr.sort != modelv1.Sort_SORT_DESC && right.(*nonAggregatorItem).val > val {
-					heap.Push(timeline, &nonAggregatorItem{val: val, key: key})
-					newTimeline, err := timeline.WithNewItems(timeline.Slice(0, int(naggr.topN)))
-					if err != nil {
-						return err
-					}
-					naggr.timelines[timestampMillis] = newTimeline
+			if lowest := timeline.Peek(); lowest != nil {
+				if naggr.sort == modelv1.Sort_SORT_DESC && lowest.(*nonAggregatorItem).val < val {
+					timeline.ReplaceLowest(&nonAggregatorItem{val: val, key: key})
+				} else if naggr.sort != modelv1.Sort_SORT_DESC && lowest.(*nonAggregatorItem).val > val {
+					timeline.ReplaceLowest(&nonAggregatorItem{val: val, key: key})
 				}
 			}
 		}
@@ -417,19 +431,19 @@ func (naggr *postNonAggregationProcessor) put(key string, val int64, timestampMi
 	timeline := flow.NewPriorityQueue(func(a, b interface{}) int {
 		if naggr.sort == modelv1.Sort_SORT_DESC {
 			if a.(*nonAggregatorItem).val < b.(*nonAggregatorItem).val {
-				return 1
+				return -1
 			} else if a.(*nonAggregatorItem).val == b.(*nonAggregatorItem).val {
 				return 0
 			} else {
-				return -1
+				return 1
 			}
 		}
 		if a.(*nonAggregatorItem).val < b.(*nonAggregatorItem).val {
-			return -1
+			return 1
 		} else if a.(*nonAggregatorItem).val == b.(*nonAggregatorItem).val {
 			return 0
 		} else {
-			return 1
+			return -1
 		}
 	}, false)
 	naggr.timelines[timestampMillis] = timeline
diff --git a/pkg/flow/dedup_priority_queue.go b/pkg/flow/dedup_priority_queue.go
index fbd1b2b..a5c0f8e 100644
--- a/pkg/flow/dedup_priority_queue.go
+++ b/pkg/flow/dedup_priority_queue.go
@@ -21,7 +21,6 @@ import (
 	"container/heap"
 
 	"github.com/emirpasic/gods/utils"
-	"github.com/pkg/errors"
 )
 
 var _ heap.Interface = (*DedupPriorityQueue)(nil)
@@ -50,20 +49,6 @@ func NewPriorityQueue(comparator utils.Comparator, allowDuplicates bool) *DedupP
 	}
 }
 
-func (pq *DedupPriorityQueue) initCache() error {
-	if pq.allowDuplicates || len(pq.Items) == 0 {
-		return nil
-	}
-	for _, elem := range pq.Items {
-		if _, ok := pq.cache[elem]; !ok {
-			pq.cache[elem] = struct{}{}
-		} else {
-			return errors.New("duplicated item is not allowed")
-		}
-	}
-	return nil
-}
-
 // Len returns the DedupPriorityQueue length.
 func (pq *DedupPriorityQueue) Len() int { return len(pq.Items) }
 
@@ -116,43 +101,16 @@ func (pq *DedupPriorityQueue) Peek() Element {
 	return nil
 }
 
-// Slice returns a sliced DedupPriorityQueue using the given bounds.
-func (pq *DedupPriorityQueue) Slice(start, end int) []Element {
-	return pq.Items[start:end]
+func (pq *DedupPriorityQueue) ReplaceLowest(newLowest Element) {
+	pq.Items[0] = newLowest
+	heap.Fix(pq, 0)
 }
 
 func (pq *DedupPriorityQueue) Values() []Element {
 	values := make([]Element, pq.Len())
-	for _, elem := range pq.Items {
-		values[elem.GetIndex()] = elem
+	for pq.Len() > 0 {
+		item := heap.Pop(pq).(Element)
+		values[pq.Len()] = item
 	}
 	return values
 }
-
-func (pq *DedupPriorityQueue) Left() Element {
-	if pq.Len() == 0 {
-		return nil
-	}
-	return pq.Items[0]
-}
-
-func (pq *DedupPriorityQueue) Right() Element {
-	if pq.Len() == 0 {
-		return nil
-	}
-	return pq.Items[pq.Len()-1]
-}
-
-func (pq *DedupPriorityQueue) WithNewItems(items []Element) (*DedupPriorityQueue, error) {
-	newPq := &DedupPriorityQueue{
-		Items:           items,
-		cache:           make(map[Element]struct{}),
-		allowDuplicates: pq.allowDuplicates,
-		comparator:      pq.comparator,
-	}
-	err := newPq.initCache()
-	if err != nil {
-		return nil, err
-	}
-	return newPq, nil
-}
diff --git a/test/cases/measure/data/testdata/service_cpm_minute_data.json b/test/cases/measure/data/testdata/service_cpm_minute_data.json
index 877290c..c2dfa53 100644
--- a/test/cases/measure/data/testdata/service_cpm_minute_data.json
+++ b/test/cases/measure/data/testdata/service_cpm_minute_data.json
@@ -54,7 +54,7 @@
       },
       {
         "int": {
-          "value": 1
+          "value": 2
         }
       }
     ]
@@ -84,7 +84,7 @@
       },
       {
         "int": {
-          "value": 1
+          "value": 3
         }
       }
     ]
@@ -174,7 +174,7 @@
       },
       {
         "int": {
-          "value": 5
+          "value": 6
         }
       }
     ]
diff --git a/test/cases/measure/data/testdata/service_cpm_minute_data1.json b/test/cases/measure/data/testdata/service_cpm_minute_data1.json
index 44bd61a..2ee3ecf 100644
--- a/test/cases/measure/data/testdata/service_cpm_minute_data1.json
+++ b/test/cases/measure/data/testdata/service_cpm_minute_data1.json
@@ -54,7 +54,7 @@
       },
       {
         "int": {
-          "value": 5
+          "value": 9
         }
       }
     ]
@@ -84,7 +84,7 @@
       },
       {
         "int": {
-          "value": 3
+          "value": 8
         }
       }
     ]
@@ -114,7 +114,7 @@
       },
       {
         "int": {
-          "value": 2
+          "value": 11
         }
       }
     ]
@@ -144,7 +144,7 @@
       },
       {
         "int": {
-          "value": 1
+          "value": 12
         }
       }
     ]
@@ -174,7 +174,7 @@
       },
       {
         "int": {
-          "value": 4
+          "value": 7
         }
       }
     ]
diff --git a/test/cases/measure/data/want/all.yaml b/test/cases/measure/data/want/all.yaml
index 5041ece..5d41213 100644
--- a/test/cases/measure/data/want/all.yaml
+++ b/test/cases/measure/data/want/all.yaml
@@ -36,7 +36,7 @@ dataPoints:
       value:
         str:
           value: entity_1
-  timestamp: "2022-10-13T13:34:18.389Z"
+  timestamp: "2022-10-17T12:49:45.912Z"
 - fields:
   - name: total
     value:
@@ -57,7 +57,7 @@ dataPoints:
       value:
         str:
           value: entity_2
-  timestamp: "2022-10-13T13:34:19.389Z"
+  timestamp: "2022-10-17T12:49:46.912Z"
 - fields:
   - name: total
     value:
@@ -66,7 +66,7 @@ dataPoints:
   - name: value
     value:
       int:
-        value: "1"
+        value: "2"
   tagFamilies:
   - name: default
     tags:
@@ -78,7 +78,7 @@ dataPoints:
       value:
         str:
           value: entity_2
-  timestamp: "2022-10-13T13:35:18.389Z"
+  timestamp: "2022-10-17T12:50:45.912Z"
 - fields:
   - name: total
     value:
@@ -87,7 +87,7 @@ dataPoints:
   - name: value
     value:
       int:
-        value: "5"
+        value: "9"
   tagFamilies:
   - name: default
     tags:
@@ -99,7 +99,7 @@ dataPoints:
       value:
         str:
           value: entity_2
-  timestamp: "2022-10-13T13:35:19.389Z"
+  timestamp: "2022-10-17T12:50:46.912Z"
 - fields:
   - name: total
     value:
@@ -108,7 +108,7 @@ dataPoints:
   - name: value
     value:
       int:
-        value: "1"
+        value: "3"
   tagFamilies:
   - name: default
     tags:
@@ -120,7 +120,7 @@ dataPoints:
       value:
         str:
           value: entity_2
-  timestamp: "2022-10-13T13:36:18.389Z"
+  timestamp: "2022-10-17T12:51:45.912Z"
 - fields:
   - name: total
     value:
@@ -129,7 +129,7 @@ dataPoints:
   - name: value
     value:
       int:
-        value: "3"
+        value: "8"
   tagFamilies:
   - name: default
     tags:
@@ -141,7 +141,7 @@ dataPoints:
       value:
         str:
           value: entity_2
-  timestamp: "2022-10-13T13:36:19.389Z"
+  timestamp: "2022-10-17T12:51:46.912Z"
 - fields:
   - name: total
     value:
@@ -162,7 +162,7 @@ dataPoints:
       value:
         str:
           value: entity_3
-  timestamp: "2022-10-13T13:37:18.389Z"
+  timestamp: "2022-10-17T12:52:45.912Z"
 - fields:
   - name: total
     value:
@@ -171,7 +171,7 @@ dataPoints:
   - name: value
     value:
       int:
-        value: "2"
+        value: "11"
   tagFamilies:
   - name: default
     tags:
@@ -183,7 +183,7 @@ dataPoints:
       value:
         str:
           value: entity_3
-  timestamp: "2022-10-13T13:37:19.389Z"
+  timestamp: "2022-10-17T12:52:46.912Z"
 - fields:
   - name: total
     value:
@@ -204,7 +204,7 @@ dataPoints:
       value:
         str:
           value: entity_1
-  timestamp: "2022-10-13T13:38:18.389Z"
+  timestamp: "2022-10-17T12:53:45.912Z"
 - fields:
   - name: total
     value:
@@ -213,7 +213,7 @@ dataPoints:
   - name: value
     value:
       int:
-        value: "1"
+        value: "12"
   tagFamilies:
   - name: default
     tags:
@@ -225,7 +225,7 @@ dataPoints:
       value:
         str:
           value: entity_1
-  timestamp: "2022-10-13T13:38:19.389Z"
+  timestamp: "2022-10-17T12:53:46.912Z"
 - fields:
   - name: total
     value:
@@ -234,7 +234,7 @@ dataPoints:
   - name: value
     value:
       int:
-        value: "5"
+        value: "6"
   tagFamilies:
   - name: default
     tags:
@@ -246,7 +246,7 @@ dataPoints:
       value:
         str:
           value: entity_1
-  timestamp: "2022-10-13T13:39:18.389Z"
+  timestamp: "2022-10-17T12:54:45.912Z"
 - fields:
   - name: total
     value:
@@ -255,7 +255,7 @@ dataPoints:
   - name: value
     value:
       int:
-        value: "4"
+        value: "7"
   tagFamilies:
   - name: default
     tags:
@@ -267,4 +267,4 @@ dataPoints:
       value:
         str:
           value: entity_1
-  timestamp: "2022-10-13T13:39:19.389Z"
+  timestamp: "2022-10-17T12:54:46.912Z"
diff --git a/test/cases/measure/data/want/group_max.yaml b/test/cases/measure/data/want/group_max.yaml
index f24bb15..bbe3923 100644
--- a/test/cases/measure/data/want/group_max.yaml
+++ b/test/cases/measure/data/want/group_max.yaml
@@ -16,39 +16,39 @@
 # under the License.
 
 dataPoints:
-  - fields:
-    - name: value
+- fields:
+  - name: value
+    value:
+      int:
+        value: "12"
+  tagFamilies:
+  - name: default
+    tags:
+    - key: entity_id
       value:
-        int:
-          value: "5"
-    tagFamilies:
-    - name: default
-      tags:
-      - key: entity_id
-        value:
-          str:
-            value: entity_1
-  - fields:
-    - name: value
+        str:
+          value: entity_1
+- fields:
+  - name: value
+    value:
+      int:
+        value: "10"
+  tagFamilies:
+  - name: default
+    tags:
+    - key: entity_id
       value:
-        int:
-          value: "10"
-    tagFamilies:
-    - name: default
-      tags:
-      - key: entity_id
-        value:
-          str:
-            value: entity_2
-  - fields:
-    - name: value
+        str:
+          value: entity_2
+- fields:
+  - name: value
+    value:
+      int:
+        value: "11"
+  tagFamilies:
+  - name: default
+    tags:
+    - key: entity_id
       value:
-        int:
-          value: "5"
-    tagFamilies:
-    - name: default
-      tags:
-      - key: entity_id
-        value:
-          str:
-            value: entity_3
\ No newline at end of file
+        str:
+          value: entity_3
\ No newline at end of file
diff --git a/test/cases/measure/data/want/limit.yaml b/test/cases/measure/data/want/limit.yaml
index 2267a74..ca93ea1 100644
--- a/test/cases/measure/data/want/limit.yaml
+++ b/test/cases/measure/data/want/limit.yaml
@@ -24,7 +24,7 @@ dataPoints:
   - name: value
     value:
       int:
-        value: "5"
+        value: "9"
   tagFamilies:
   - name: default
     tags:
@@ -32,7 +32,7 @@ dataPoints:
       value:
         str:
           value: entity_2
-  timestamp: "2022-10-13T14:13:50.440Z"
+  timestamp: "2022-10-17T12:57:03.444Z"
 - fields:
   - name: total
     value:
@@ -41,7 +41,7 @@ dataPoints:
   - name: value
     value:
       int:
-        value: "1"
+        value: "3"
   tagFamilies:
   - name: default
     tags:
@@ -49,4 +49,4 @@ dataPoints:
       value:
         str:
           value: entity_2
-  timestamp: "2022-10-13T14:14:49.440Z"
\ No newline at end of file
+  timestamp: "2022-10-17T12:58:02.444Z"
\ No newline at end of file
diff --git a/test/cases/measure/data/want/order_asc.yaml b/test/cases/measure/data/want/order_asc.yaml
index 67f6d11..d55db5e 100644
--- a/test/cases/measure/data/want/order_asc.yaml
+++ b/test/cases/measure/data/want/order_asc.yaml
@@ -32,7 +32,7 @@ dataPoints:
       value:
         str:
           value: entity_1
-  timestamp: "2022-10-13T14:11:30.182Z"
+  timestamp: "2022-10-17T12:54:36.627Z"
 - fields:
   - name: total
     value:
@@ -49,7 +49,7 @@ dataPoints:
       value:
         str:
           value: entity_2
-  timestamp: "2022-10-13T14:11:31.182Z"
+  timestamp: "2022-10-17T12:54:37.627Z"
 - fields:
   - name: total
     value:
@@ -58,7 +58,7 @@ dataPoints:
   - name: value
     value:
       int:
-        value: "1"
+        value: "2"
   tagFamilies:
   - name: default
     tags:
@@ -66,7 +66,7 @@ dataPoints:
       value:
         str:
           value: entity_2
-  timestamp: "2022-10-13T14:12:30.182Z"
+  timestamp: "2022-10-17T12:55:36.627Z"
 - fields:
   - name: total
     value:
@@ -75,7 +75,7 @@ dataPoints:
   - name: value
     value:
       int:
-        value: "5"
+        value: "9"
   tagFamilies:
   - name: default
     tags:
@@ -83,7 +83,7 @@ dataPoints:
       value:
         str:
           value: entity_2
-  timestamp: "2022-10-13T14:12:31.182Z"
+  timestamp: "2022-10-17T12:55:37.627Z"
 - fields:
   - name: total
     value:
@@ -92,7 +92,7 @@ dataPoints:
   - name: value
     value:
       int:
-        value: "1"
+        value: "3"
   tagFamilies:
   - name: default
     tags:
@@ -100,7 +100,7 @@ dataPoints:
       value:
         str:
           value: entity_2
-  timestamp: "2022-10-13T14:13:30.182Z"
+  timestamp: "2022-10-17T12:56:36.627Z"
 - fields:
   - name: total
     value:
@@ -109,7 +109,7 @@ dataPoints:
   - name: value
     value:
       int:
-        value: "3"
+        value: "8"
   tagFamilies:
   - name: default
     tags:
@@ -117,7 +117,7 @@ dataPoints:
       value:
         str:
           value: entity_2
-  timestamp: "2022-10-13T14:13:31.182Z"
+  timestamp: "2022-10-17T12:56:37.627Z"
 - fields:
   - name: total
     value:
@@ -134,7 +134,7 @@ dataPoints:
       value:
         str:
           value: entity_3
-  timestamp: "2022-10-13T14:14:30.182Z"
+  timestamp: "2022-10-17T12:57:36.627Z"
 - fields:
   - name: total
     value:
@@ -143,7 +143,7 @@ dataPoints:
   - name: value
     value:
       int:
-        value: "2"
+        value: "11"
   tagFamilies:
   - name: default
     tags:
@@ -151,7 +151,7 @@ dataPoints:
       value:
         str:
           value: entity_3
-  timestamp: "2022-10-13T14:14:31.182Z"
+  timestamp: "2022-10-17T12:57:37.627Z"
 - fields:
   - name: total
     value:
@@ -168,7 +168,7 @@ dataPoints:
       value:
         str:
           value: entity_1
-  timestamp: "2022-10-13T14:15:30.182Z"
+  timestamp: "2022-10-17T12:58:36.627Z"
 - fields:
   - name: total
     value:
@@ -177,7 +177,7 @@ dataPoints:
   - name: value
     value:
       int:
-        value: "1"
+        value: "12"
   tagFamilies:
   - name: default
     tags:
@@ -185,7 +185,7 @@ dataPoints:
       value:
         str:
           value: entity_1
-  timestamp: "2022-10-13T14:15:31.182Z"
+  timestamp: "2022-10-17T12:58:37.627Z"
 - fields:
   - name: total
     value:
@@ -194,7 +194,7 @@ dataPoints:
   - name: value
     value:
       int:
-        value: "5"
+        value: "6"
   tagFamilies:
   - name: default
     tags:
@@ -202,7 +202,7 @@ dataPoints:
       value:
         str:
           value: entity_1
-  timestamp: "2022-10-13T14:16:30.182Z"
+  timestamp: "2022-10-17T12:59:36.627Z"
 - fields:
   - name: total
     value:
@@ -211,7 +211,7 @@ dataPoints:
   - name: value
     value:
       int:
-        value: "4"
+        value: "7"
   tagFamilies:
   - name: default
     tags:
@@ -219,4 +219,4 @@ dataPoints:
       value:
         str:
           value: entity_1
-  timestamp: "2022-10-13T14:16:31.182Z"
\ No newline at end of file
+  timestamp: "2022-10-17T12:59:37.627Z"
\ No newline at end of file
diff --git a/test/cases/measure/data/want/order_desc.yaml b/test/cases/measure/data/want/order_desc.yaml
index b503b36..4c4f749 100644
--- a/test/cases/measure/data/want/order_desc.yaml
+++ b/test/cases/measure/data/want/order_desc.yaml
@@ -24,7 +24,7 @@ dataPoints:
   - name: value
     value:
       int:
-        value: "4"
+        value: "7"
   tagFamilies:
   - name: default
     tags:
@@ -32,7 +32,7 @@ dataPoints:
       value:
         str:
           value: entity_1
-  timestamp: "2022-10-13T14:17:50.440Z"
+  timestamp: "2022-10-17T13:01:03.444Z"
 - fields:
   - name: total
     value:
@@ -41,7 +41,7 @@ dataPoints:
   - name: value
     value:
       int:
-        value: "5"
+        value: "6"
   tagFamilies:
   - name: default
     tags:
@@ -49,7 +49,7 @@ dataPoints:
       value:
         str:
           value: entity_1
-  timestamp: "2022-10-13T14:17:49.440Z"
+  timestamp: "2022-10-17T13:01:02.444Z"
 - fields:
   - name: total
     value:
@@ -58,7 +58,7 @@ dataPoints:
   - name: value
     value:
       int:
-        value: "1"
+        value: "12"
   tagFamilies:
   - name: default
     tags:
@@ -66,7 +66,7 @@ dataPoints:
       value:
         str:
           value: entity_1
-  timestamp: "2022-10-13T14:16:50.440Z"
+  timestamp: "2022-10-17T13:00:03.444Z"
 - fields:
   - name: total
     value:
@@ -83,7 +83,7 @@ dataPoints:
       value:
         str:
           value: entity_1
-  timestamp: "2022-10-13T14:16:49.440Z"
+  timestamp: "2022-10-17T13:00:02.444Z"
 - fields:
   - name: total
     value:
@@ -92,7 +92,7 @@ dataPoints:
   - name: value
     value:
       int:
-        value: "2"
+        value: "11"
   tagFamilies:
   - name: default
     tags:
@@ -100,7 +100,7 @@ dataPoints:
       value:
         str:
           value: entity_3
-  timestamp: "2022-10-13T14:15:50.440Z"
+  timestamp: "2022-10-17T12:59:03.444Z"
 - fields:
   - name: total
     value:
@@ -117,7 +117,7 @@ dataPoints:
       value:
         str:
           value: entity_3
-  timestamp: "2022-10-13T14:15:49.440Z"
+  timestamp: "2022-10-17T12:59:02.444Z"
 - fields:
   - name: total
     value:
@@ -126,7 +126,7 @@ dataPoints:
   - name: value
     value:
       int:
-        value: "3"
+        value: "8"
   tagFamilies:
   - name: default
     tags:
@@ -134,7 +134,7 @@ dataPoints:
       value:
         str:
           value: entity_2
-  timestamp: "2022-10-13T14:14:50.440Z"
+  timestamp: "2022-10-17T12:58:03.444Z"
 - fields:
   - name: total
     value:
@@ -143,7 +143,7 @@ dataPoints:
   - name: value
     value:
       int:
-        value: "1"
+        value: "3"
   tagFamilies:
   - name: default
     tags:
@@ -151,7 +151,7 @@ dataPoints:
       value:
         str:
           value: entity_2
-  timestamp: "2022-10-13T14:14:49.440Z"
+  timestamp: "2022-10-17T12:58:02.444Z"
 - fields:
   - name: total
     value:
@@ -160,7 +160,7 @@ dataPoints:
   - name: value
     value:
       int:
-        value: "5"
+        value: "9"
   tagFamilies:
   - name: default
     tags:
@@ -168,7 +168,7 @@ dataPoints:
       value:
         str:
           value: entity_2
-  timestamp: "2022-10-13T14:13:50.440Z"
+  timestamp: "2022-10-17T12:57:03.444Z"
 - fields:
   - name: total
     value:
@@ -177,7 +177,7 @@ dataPoints:
   - name: value
     value:
       int:
-        value: "1"
+        value: "2"
   tagFamilies:
   - name: default
     tags:
@@ -185,7 +185,7 @@ dataPoints:
       value:
         str:
           value: entity_2
-  timestamp: "2022-10-13T14:13:49.440Z"
+  timestamp: "2022-10-17T12:57:02.444Z"
 - fields:
   - name: total
     value:
@@ -202,7 +202,7 @@ dataPoints:
       value:
         str:
           value: entity_2
-  timestamp: "2022-10-13T14:12:50.440Z"
+  timestamp: "2022-10-17T12:56:03.444Z"
 - fields:
   - name: total
     value:
@@ -219,4 +219,4 @@ dataPoints:
       value:
         str:
           value: entity_1
-  timestamp: "2022-10-13T14:12:49.440Z"
\ No newline at end of file
+  timestamp: "2022-10-17T12:56:02.444Z"
\ No newline at end of file
diff --git a/test/cases/topn/data/want/aggr_desc.yaml b/test/cases/topn/data/want/aggr_desc.yaml
index 336774a..85c4ee5 100644
--- a/test/cases/topn/data/want/aggr_desc.yaml
+++ b/test/cases/topn/data/want/aggr_desc.yaml
@@ -17,16 +17,16 @@
 
 lists:
 - items:
-  - name: entity_2
+  - name: entity_1
     value:
       int:
-        value: "10"
+        value: "12"
   - name: entity_3
     value:
       int:
-        value: "5"
-  - name: entity_1
+        value: "11"
+  - name: entity_2
     value:
       int:
-        value: "5"
-  timestamp: "2022-10-13T14:35:00Z"
\ No newline at end of file
+        value: "10"
+  timestamp: "2022-10-17T12:54:00Z"
\ No newline at end of file
diff --git a/test/cases/topn/data/want/asc.yaml b/test/cases/topn/data/want/asc.yaml
index 6eebcc6..14c2f44 100644
--- a/test/cases/topn/data/want/asc.yaml
+++ b/test/cases/topn/data/want/asc.yaml
@@ -21,34 +21,34 @@ lists:
     value:
       int:
         value: "1"
-  timestamp: "2022-10-13T14:14:00Z"
+  timestamp: "2022-10-17T12:49:00Z"
 - items:
   - name: entity_2
     value:
       int:
-        value: "1"
-  timestamp: "2022-10-13T14:15:00Z"
+        value: "2"
+  timestamp: "2022-10-17T12:50:00Z"
 - items:
   - name: entity_2
     value:
       int:
-        value: "1"
-  timestamp: "2022-10-13T14:16:00Z"
+        value: "3"
+  timestamp: "2022-10-17T12:51:00Z"
 - items:
   - name: entity_3
     value:
       int:
-        value: "2"
-  timestamp: "2022-10-13T14:17:00Z"
+        value: "5"
+  timestamp: "2022-10-17T12:52:00Z"
 - items:
   - name: entity_1
     value:
       int:
-        value: "1"
-  timestamp: "2022-10-13T14:18:00Z"
+        value: "4"
+  timestamp: "2022-10-17T12:53:00Z"
 - items:
   - name: entity_1
     value:
       int:
-        value: "5"
-  timestamp: "2022-10-13T14:19:00Z"
\ No newline at end of file
+        value: "6"
+  timestamp: "2022-10-17T12:54:00Z"
\ No newline at end of file
diff --git a/test/cases/topn/data/want/condition_aggr_desc.yaml b/test/cases/topn/data/want/condition_aggr_desc.yaml
index d1ae3bb..784e5c3 100644
--- a/test/cases/topn/data/want/condition_aggr_desc.yaml
+++ b/test/cases/topn/data/want/condition_aggr_desc.yaml
@@ -20,5 +20,5 @@ lists:
   - name: entity_1
     value:
       int:
-        value: "5"
-  timestamp: "2022-10-13T14:45:00Z"
\ No newline at end of file
+        value: "12"
+  timestamp: "2022-10-17T12:54:00Z"
\ No newline at end of file
diff --git a/test/cases/topn/data/want/desc.yaml b/test/cases/topn/data/want/desc.yaml
index dbc19ab..53f58be 100644
--- a/test/cases/topn/data/want/desc.yaml
+++ b/test/cases/topn/data/want/desc.yaml
@@ -21,34 +21,34 @@ lists:
     value:
       int:
         value: "10"
-  timestamp: "2022-10-13T14:14:00Z"
+  timestamp: "2022-10-17T12:49:00Z"
 - items:
   - name: entity_2
     value:
       int:
-        value: "5"
-  timestamp: "2022-10-13T14:15:00Z"
+        value: "9"
+  timestamp: "2022-10-17T12:50:00Z"
 - items:
   - name: entity_2
     value:
       int:
-        value: "3"
-  timestamp: "2022-10-13T14:16:00Z"
+        value: "8"
+  timestamp: "2022-10-17T12:51:00Z"
 - items:
   - name: entity_3
     value:
       int:
-        value: "5"
-  timestamp: "2022-10-13T14:17:00Z"
+        value: "11"
+  timestamp: "2022-10-17T12:52:00Z"
 - items:
   - name: entity_1
     value:
       int:
-        value: "4"
-  timestamp: "2022-10-13T14:18:00Z"
+        value: "12"
+  timestamp: "2022-10-17T12:53:00Z"
 - items:
   - name: entity_1
     value:
       int:
-        value: "5"
-  timestamp: "2022-10-13T14:19:00Z"
\ No newline at end of file
+        value: "6"
+  timestamp: "2022-10-17T12:54:00Z"
\ No newline at end of file