You are viewing a plain text version of this content. The canonical link for it was not preserved in this plain-text copy.
Posted to notifications@skywalking.apache.org by ha...@apache.org on 2022/10/24 10:30:12 UTC

[skywalking-banyandb] branch main updated: Add more data points to flush topn results (#196)

This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git


The following commit(s) were added to refs/heads/main by this push:
     new a05f94a  Add more data points to flush topn results (#196)
a05f94a is described below

commit a05f94a5b3a48b13062c1349ad1fb1140fa69ac8
Author: Gao Hongtao <ha...@gmail.com>
AuthorDate: Mon Oct 24 18:30:07 2022 +0800

    Add more data points to flush topn results (#196)
    
    Signed-off-by: Gao Hongtao <ha...@gmail.com>
---
 bydbctl/internal/cmd/measure_test.go               | 14 ++---
 pkg/test/helpers/http_health.go                    |  1 +
 test/cases/measure/data/data.go                    |  2 +-
 .../data/testdata/service_cpm_minute_data2.json    | 62 ++++++++++++++++++++++
 test/cases/measure/measure.go                      | 28 +++++-----
 test/cases/topn/data/want/desc.yaml                |  2 +-
 test/cases/topn/topn.go                            |  8 +--
 test/integration/cold_query/query_suite_test.go    |  1 +
 test/integration/query/query_suite_test.go         |  1 +
 9 files changed, 92 insertions(+), 27 deletions(-)

diff --git a/bydbctl/internal/cmd/measure_test.go b/bydbctl/internal/cmd/measure_test.go
index bf7dcb6..85ae3fb 100644
--- a/bydbctl/internal/cmd/measure_test.go
+++ b/bydbctl/internal/cmd/measure_test.go
@@ -166,13 +166,13 @@ var _ = Describe("Measure Data Query", func() {
 	var deferFunc func()
 	var rootCmd *cobra.Command
 	var now time.Time
-	var nowStr, endStr string
+	var startStr, endStr string
 	var interval time.Duration
 	BeforeEach(func() {
 		now = timestamp.NowMilli()
-		nowStr = now.Format(RFC3339)
+		startStr = now.Add(-20 * time.Minute).Format(RFC3339)
 		interval = 1 * time.Millisecond
-		endStr = now.Add(1 * time.Hour).Format(RFC3339)
+		endStr = now.Add(5 * time.Minute).Format(RFC3339)
 		grpcAddr, addr, deferFunc = setup.SetUp()
 		Eventually(helpers.HTTPHealthCheck(addr), 10*time.Second).Should(Succeed())
 		addr = "http://" + addr
@@ -200,7 +200,7 @@ tagProjection:
   tagFamilies:
     - name: default
       tags:
-        - id`, nowStr, endStr)))
+        - id`, startStr, endStr)))
 			return capturer.CaptureStdout(func() {
 				err := rootCmd.Execute()
 				Expect(err).NotTo(HaveOccurred())
@@ -224,7 +224,7 @@ tagProjection:
 		)
 		Expect(err).NotTo(HaveOccurred())
 		now := timestamp.NowMilli()
-		interval := -time.Minute
+		interval := time.Minute
 		cases_measure_data.Write(conn, "service_cpm_minute", "sw_metric", "service_cpm_minute_data.json", now, interval)
 		args := []string{"measure", "query", "-a", addr}
 		args = append(args, timeArgs...)
@@ -256,11 +256,11 @@ tagProjection:
 	},
 		Entry("relative start", "--start", "-30m"),
 		Entry("relative end", "--end", "0m"),
-		Entry("absolute start", "--start", nowStr),
+		Entry("absolute start", "--start", startStr),
 		Entry("absolute end", "--end", endStr),
 		Entry("default"),
 		Entry("all relative", "--start", "-30m", "--end", "0m"),
-		Entry("all absolute", "--start", nowStr, "--end", endStr),
+		Entry("all absolute", "--start", startStr, "--end", endStr),
 	)
 
 	AfterEach(func() {
diff --git a/pkg/test/helpers/http_health.go b/pkg/test/helpers/http_health.go
index 47fa01f..abd33d6 100644
--- a/pkg/test/helpers/http_health.go
+++ b/pkg/test/helpers/http_health.go
@@ -41,6 +41,7 @@ func HTTPHealthCheck(addr string) func() error {
 			return ErrServiceUnhealthy
 		}
 		l.Info().Stringer("response", resp).Msg("connected")
+		time.Sleep(500 * time.Millisecond)
 		return nil
 	}
 }
diff --git a/test/cases/measure/data/data.go b/test/cases/measure/data/data.go
index 21a35d7..e683672 100644
--- a/test/cases/measure/data/data.go
+++ b/test/cases/measure/data/data.go
@@ -103,7 +103,7 @@ func loadData(md *commonv1.Metadata, measure measurev1.MeasureService_WriteClien
 		gm.Expect(errMarshal).ShouldNot(gm.HaveOccurred())
 		dataPointValue := &measurev1.DataPointValue{}
 		gm.Expect(protojson.Unmarshal(rawDataPointValue, dataPointValue)).ShouldNot(gm.HaveOccurred())
-		dataPointValue.Timestamp = timestamppb.New(baseTime.Add(time.Duration(i) * interval))
+		dataPointValue.Timestamp = timestamppb.New(baseTime.Add(-time.Duration(len(templates)-i-1) * interval))
 		gm.Expect(measure.Send(&measurev1.WriteRequest{Metadata: md, DataPoint: dataPointValue})).
 			Should(gm.Succeed())
 	}
diff --git a/test/cases/measure/data/testdata/service_cpm_minute_data2.json b/test/cases/measure/data/testdata/service_cpm_minute_data2.json
new file mode 100644
index 0000000..7e36417
--- /dev/null
+++ b/test/cases/measure/data/testdata/service_cpm_minute_data2.json
@@ -0,0 +1,62 @@
+[
+  {
+    "tag_families": [
+      {
+        "tags": [
+          {
+            "id": {
+              "value": "100"
+            }
+          },
+          {
+            "str": {
+              "value": "entity_2"
+            }
+          }
+        ]
+      }
+    ],
+    "fields": [
+      {
+        "int": {
+          "value": 100
+        }
+      },
+      {
+        "int": {
+          "value": 100
+        }
+      }
+    ]
+  },
+  {
+    "tag_families": [
+      {
+        "tags": [
+          {
+            "id": {
+              "value": "110"
+            }
+          },
+          {
+            "str": {
+              "value": "entity_2"
+            }
+          }
+        ]
+      }
+    ],
+    "fields": [
+      {
+        "int": {
+          "value": 100
+        }
+      },
+      {
+        "int": {
+          "value": 110
+        }
+      }
+    ]
+  }
+]
diff --git a/test/cases/measure/measure.go b/test/cases/measure/measure.go
index 3cb8394..2dbc9c2 100644
--- a/test/cases/measure/measure.go
+++ b/test/cases/measure/measure.go
@@ -39,18 +39,18 @@ var (
 )
 
 var _ = g.DescribeTable("Scanning Measures", verify,
-	g.Entry("all", helpers.Args{Input: "all", Duration: 1 * time.Hour}),
-	g.Entry("filter by tag", helpers.Args{Input: "tag_filter", Duration: 1 * time.Hour}),
-	g.Entry("filter by an unknown tag", helpers.Args{Input: "tag_filter_unknown", Duration: 1 * time.Hour, WantEmpty: true}),
-	g.Entry("group and max", helpers.Args{Input: "group_max", Duration: 1 * time.Hour}),
-	g.Entry("group without field", helpers.Args{Input: "group_no_field", Duration: 1 * time.Hour}),
-	g.Entry("top 2", helpers.Args{Input: "top", Duration: 1 * time.Hour}),
-	g.Entry("bottom 2", helpers.Args{Input: "bottom", Duration: 1 * time.Hour}),
-	g.Entry("order by time asc", helpers.Args{Input: "order_asc", Duration: 1 * time.Hour}),
-	g.Entry("order by time desc", helpers.Args{Input: "order_desc", Duration: 1 * time.Hour}),
-	g.Entry("limit 3,2", helpers.Args{Input: "limit", Duration: 1 * time.Hour}),
-	g.Entry("match a node", helpers.Args{Input: "match_node", Duration: 1 * time.Hour}),
-	g.Entry("match nodes", helpers.Args{Input: "match_nodes", Duration: 1 * time.Hour}),
-	g.Entry("filter by entity id", helpers.Args{Input: "entity", Duration: 1 * time.Hour}),
-	g.Entry("without field", helpers.Args{Input: "no_field", Duration: 1 * time.Hour}),
+	g.Entry("all", helpers.Args{Input: "all", Duration: 25 * time.Minute, Offset: -20 * time.Minute}),
+	g.Entry("filter by tag", helpers.Args{Input: "tag_filter", Duration: 25 * time.Minute, Offset: -20 * time.Minute}),
+	g.Entry("filter by an unknown tag", helpers.Args{Input: "tag_filter_unknown", Duration: 25 * time.Minute, Offset: -20 * time.Minute, WantEmpty: true}),
+	g.Entry("group and max", helpers.Args{Input: "group_max", Duration: 25 * time.Minute, Offset: -20 * time.Minute}),
+	g.Entry("group without field", helpers.Args{Input: "group_no_field", Duration: 25 * time.Minute, Offset: -20 * time.Minute}),
+	g.Entry("top 2", helpers.Args{Input: "top", Duration: 25 * time.Minute, Offset: -20 * time.Minute}),
+	g.Entry("bottom 2", helpers.Args{Input: "bottom", Duration: 25 * time.Minute, Offset: -20 * time.Minute}),
+	g.Entry("order by time asc", helpers.Args{Input: "order_asc", Duration: 25 * time.Minute, Offset: -20 * time.Minute}),
+	g.Entry("order by time desc", helpers.Args{Input: "order_desc", Duration: 25 * time.Minute, Offset: -20 * time.Minute}),
+	g.Entry("limit 3,2", helpers.Args{Input: "limit", Duration: 25 * time.Minute, Offset: -20 * time.Minute}),
+	g.Entry("match a node", helpers.Args{Input: "match_node", Duration: 25 * time.Minute, Offset: -20 * time.Minute}),
+	g.Entry("match nodes", helpers.Args{Input: "match_nodes", Duration: 25 * time.Minute, Offset: -20 * time.Minute}),
+	g.Entry("filter by entity id", helpers.Args{Input: "entity", Duration: 25 * time.Minute, Offset: -20 * time.Minute}),
+	g.Entry("without field", helpers.Args{Input: "no_field", Duration: 25 * time.Minute, Offset: -20 * time.Minute}),
 )
diff --git a/test/cases/topn/data/want/desc.yaml b/test/cases/topn/data/want/desc.yaml
index 53f58be..40a10b0 100644
--- a/test/cases/topn/data/want/desc.yaml
+++ b/test/cases/topn/data/want/desc.yaml
@@ -50,5 +50,5 @@ lists:
   - name: entity_1
     value:
       int:
-        value: "6"
+        value: "7"
   timestamp: "2022-10-17T12:54:00Z"
\ No newline at end of file
diff --git a/test/cases/topn/topn.go b/test/cases/topn/topn.go
index dd055d3..200a141 100644
--- a/test/cases/topn/topn.go
+++ b/test/cases/topn/topn.go
@@ -39,8 +39,8 @@ var (
 )
 
 var _ = g.DescribeTable("TopN Tests", verify,
-	g.Entry("desc", helpers.Args{Input: "desc", Duration: 1 * time.Hour, Offset: -5 * time.Minute}),
-	g.Entry("asc", helpers.Args{Input: "asc", Duration: 1 * time.Hour, Offset: -5 * time.Minute}),
-	g.Entry("max top3 order by desc", helpers.Args{Input: "aggr_desc", Duration: 1 * time.Hour, Offset: -5 * time.Minute}),
-	g.Entry("max top3 with condition order by desc", helpers.Args{Input: "condition_aggr_desc", Duration: 1 * time.Hour, Offset: -5 * time.Minute}),
+	g.Entry("desc", helpers.Args{Input: "desc", Duration: 25 * time.Minute, Offset: -20 * time.Minute}),
+	g.Entry("asc", helpers.Args{Input: "asc", Duration: 25 * time.Minute, Offset: -20 * time.Minute}),
+	g.Entry("max top3 order by desc", helpers.Args{Input: "aggr_desc", Duration: 25 * time.Minute, Offset: -20 * time.Minute}),
+	g.Entry("max top3 with condition order by desc", helpers.Args{Input: "condition_aggr_desc", Duration: 25 * time.Minute, Offset: -20 * time.Minute}),
 )
diff --git a/test/integration/cold_query/query_suite_test.go b/test/integration/cold_query/query_suite_test.go
index 649cc5f..a12a3fb 100644
--- a/test/integration/cold_query/query_suite_test.go
+++ b/test/integration/cold_query/query_suite_test.go
@@ -68,6 +68,7 @@ var _ = SynchronizedBeforeSuite(func() []byte {
 	casesMeasureData.Write(conn, "service_instance_traffic", "sw_metric", "service_instance_traffic_data.json", now, interval)
 	casesMeasureData.Write(conn, "service_cpm_minute", "sw_metric", "service_cpm_minute_data.json", now, interval)
 	casesMeasureData.Write(conn, "service_cpm_minute", "sw_metric", "service_cpm_minute_data1.json", now.Add(time.Second), interval)
+	casesMeasureData.Write(conn, "service_cpm_minute", "sw_metric", "service_cpm_minute_data2.json", now.Add(10*time.Minute), interval)
 	Expect(conn.Close()).To(Succeed())
 	return []byte(addr)
 }, func(address []byte) {
diff --git a/test/integration/query/query_suite_test.go b/test/integration/query/query_suite_test.go
index 3b2bb9f..22a7972 100644
--- a/test/integration/query/query_suite_test.go
+++ b/test/integration/query/query_suite_test.go
@@ -70,6 +70,7 @@ var _ = SynchronizedBeforeSuite(func() []byte {
 	cases_measure_data.Write(conn, "service_instance_traffic", "sw_metric", "service_instance_traffic_data.json", now, interval)
 	cases_measure_data.Write(conn, "service_cpm_minute", "sw_metric", "service_cpm_minute_data.json", now, interval)
 	cases_measure_data.Write(conn, "service_cpm_minute", "sw_metric", "service_cpm_minute_data1.json", now.Add(time.Second), interval)
+	cases_measure_data.Write(conn, "service_cpm_minute", "sw_metric", "service_cpm_minute_data2.json", now.Add(10*time.Minute), interval)
 	Expect(conn.Close()).To(Succeed())
 	return []byte(addr)
 }, func(address []byte) {