You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by wu...@apache.org on 2022/10/31 13:18:10 UTC
[skywalking-banyandb] branch main updated: Fix flaky test cases (#201)
This is an automated email from the ASF dual-hosted git repository.
wusheng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
The following commit(s) were added to refs/heads/main by this push:
new 65ebe9f Fix flaky test cases (#201)
65ebe9f is described below
commit 65ebe9f9dc18a953916c645198dbc1b4337b0887
Author: Gao Hongtao <ha...@gmail.com>
AuthorDate: Mon Oct 31 21:18:05 2022 +0800
Fix flaky test cases (#201)
---
banyand/measure/measure_topn.go | 64 +++++--------------------
banyand/measure/measure_write.go | 13 ++---
banyand/measure/service.go | 20 +++-----
bydbctl/internal/cmd/index_rule_binding_test.go | 32 ++++++++-----
bydbctl/internal/cmd/index_rule_test.go | 32 ++++++++-----
bydbctl/internal/cmd/measure_test.go | 36 ++++++++------
bydbctl/internal/cmd/rest.go | 8 ++--
bydbctl/internal/cmd/stream_test.go | 40 +++++++++-------
test/integration/cold_query/query_suite_test.go | 5 +-
test/integration/query/query_suite_test.go | 5 +-
10 files changed, 116 insertions(+), 139 deletions(-)
diff --git a/banyand/measure/measure_topn.go b/banyand/measure/measure_topn.go
index 183b5f0..29295c3 100644
--- a/banyand/measure/measure_topn.go
+++ b/banyand/measure/measure_topn.go
@@ -37,7 +37,6 @@ import (
measurev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/measure/v1"
modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
"github.com/apache/skywalking-banyandb/banyand/tsdb"
- "github.com/apache/skywalking-banyandb/pkg/bus"
"github.com/apache/skywalking-banyandb/pkg/convert"
"github.com/apache/skywalking-banyandb/pkg/flow"
"github.com/apache/skywalking-banyandb/pkg/flow/streaming"
@@ -45,7 +44,6 @@ import (
"github.com/apache/skywalking-banyandb/pkg/logger"
"github.com/apache/skywalking-banyandb/pkg/partition"
pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
- "github.com/apache/skywalking-banyandb/pkg/timestamp"
)
const (
@@ -54,10 +52,9 @@ const (
)
var (
- _ bus.MessageListener = (*topNProcessCallback)(nil)
- _ io.Closer = (*topNStreamingProcessor)(nil)
- _ io.Closer = (*topNProcessorManager)(nil)
- _ flow.Sink = (*topNStreamingProcessor)(nil)
+ _ io.Closer = (*topNStreamingProcessor)(nil)
+ _ io.Closer = (*topNProcessorManager)(nil)
+ _ flow.Sink = (*topNStreamingProcessor)(nil)
errUnsupportedConditionValueType = errors.New("unsupported value type in the condition")
@@ -169,7 +166,7 @@ func (t *topNStreamingProcessor) writeData(eventTime time.Time, timeBucket strin
if err != nil {
return err
}
- span, err := series.Span(timestamp.NewInclusiveTimeRangeDuration(eventTime, 0))
+ span, err := series.Create(eventTime)
if err != nil {
if span != nil {
_ = span.Close()
@@ -324,16 +321,16 @@ func (manager *topNProcessorManager) Close() error {
return err
}
-func (manager *topNProcessorManager) onMeasureWrite(request *measurev1.WriteRequest) error {
- manager.RLock()
- defer manager.RUnlock()
- for _, processorList := range manager.processorMap {
- for _, processor := range processorList {
- processor.src <- flow.NewStreamRecordWithTimestampPb(request.GetDataPoint(), request.GetDataPoint().GetTimestamp())
+func (manager *topNProcessorManager) onMeasureWrite(request *measurev1.WriteRequest) {
+ go func() {
+ manager.RLock()
+ defer manager.RUnlock()
+ for _, processorList := range manager.processorMap {
+ for _, processor := range processorList {
+ processor.src <- flow.NewStreamRecordWithTimestampPb(request.GetDataPoint(), request.GetDataPoint().GetTimestamp())
+ }
}
- }
-
- return nil
+ }()
}
func (manager *topNProcessorManager) start() error {
@@ -518,41 +515,6 @@ func (manager *topNProcessorManager) buildMapper(fieldName string, groupByNames
}, nil
}
-// topNProcessCallback listens pipeline for writing requests
-type topNProcessCallback struct {
- l *logger.Logger
- schemaRepo *schemaRepo
-}
-
-func setUpStreamingProcessCallback(l *logger.Logger, schemaRepo *schemaRepo) bus.MessageListener {
- return &topNProcessCallback{
- l: l,
- schemaRepo: schemaRepo,
- }
-}
-
-func (cb *topNProcessCallback) Rev(message bus.Message) (resp bus.Message) {
- writeEvent, ok := message.Data().(*measurev1.InternalWriteRequest)
- if !ok {
- cb.l.Warn().Msg("invalid event data type")
- return
- }
-
- // first get measure existence
- m, ok := cb.schemaRepo.loadMeasure(writeEvent.GetRequest().GetMetadata())
- if !ok {
- cb.l.Warn().Msg("cannot find measure definition")
- return
- }
-
- err := m.processorManager.onMeasureWrite(writeEvent.GetRequest())
- if err != nil {
- cb.l.Debug().Err(err).Msg("fail to send to the streaming processor")
- }
-
- return
-}
-
var (
_ conditionFilter = (*strTagFilter)(nil)
_ conditionFilter = (*int64TagFilter)(nil)
diff --git a/banyand/measure/measure_write.go b/banyand/measure/measure_write.go
index 5dae6e1..c8a9aef 100644
--- a/banyand/measure/measure_write.go
+++ b/banyand/measure/measure_write.go
@@ -50,15 +50,6 @@ func (s *measure) Write(value *measurev1.DataPointValue) error {
close(waitCh)
return err
}
- // send to stream processor
- err = s.processorManager.onMeasureWrite(&measurev1.WriteRequest{
- Metadata: s.GetMetadata(),
- DataPoint: value,
- })
- if err != nil {
- close(waitCh)
- return err
- }
<-waitCh
return nil
}
@@ -149,6 +140,10 @@ func (s *measure) write(shardID common.ShardID, seriesHashKey []byte, value *mea
Cb: cb,
}
s.indexWriter.Write(m)
+ s.processorManager.onMeasureWrite(&measurev1.WriteRequest{
+ Metadata: s.GetMetadata(),
+ DataPoint: value,
+ })
return err
}
diff --git a/banyand/measure/service.go b/banyand/measure/service.go
index fadcc38..9b828cf 100644
--- a/banyand/measure/service.go
+++ b/banyand/measure/service.go
@@ -55,13 +55,12 @@ type service struct {
root string
dbOpts tsdb.DatabaseOpts
- schemaRepo schemaRepo
- writeListener bus.MessageListener
- processListener bus.MessageListener
- l *logger.Logger
- metadata metadata.Repo
- pipeline queue.Queue
- repo discovery.ServiceRepo
+ schemaRepo schemaRepo
+ writeListener bus.MessageListener
+ l *logger.Logger
+ metadata metadata.Repo
+ pipeline queue.Queue
+ repo discovery.ServiceRepo
// stop channel for the service
stopCh chan struct{}
}
@@ -133,13 +132,6 @@ func (s *service) PreRun() error {
if err != nil {
return err
}
-
- s.processListener = setUpStreamingProcessCallback(s.l, &s.schemaRepo)
- err = s.pipeline.Subscribe(data.TopicMeasureWrite, s.processListener)
- if err != nil {
- return err
- }
-
return nil
}
diff --git a/bydbctl/internal/cmd/index_rule_binding_test.go b/bydbctl/internal/cmd/index_rule_binding_test.go
index ac3e328..6379d11 100644
--- a/bydbctl/internal/cmd/index_rule_binding_test.go
+++ b/bydbctl/internal/cmd/index_rule_binding_test.go
@@ -44,7 +44,8 @@ var _ = Describe("IndexRuleBindingSchema Operation", func() {
rootCmd = &cobra.Command{Use: "root"}
cmd.RootCmdFlags(rootCmd)
rootCmd.SetArgs([]string{"group", "create", "-a", addr, "-f", "-"})
- rootCmd.SetIn(strings.NewReader(`
+ createGroup := func() string {
+ rootCmd.SetIn(strings.NewReader(`
metadata:
name: group1
catalog: CATALOG_STREAM
@@ -59,24 +60,31 @@ resource_opts:
ttl:
unit: UNIT_DAY
num: 7`))
- out := capturer.CaptureStdout(func() {
- err := rootCmd.Execute()
- Expect(err).NotTo(HaveOccurred())
- })
- Expect(out).To(ContainSubstring("group group1 is created"))
+ return capturer.CaptureStdout(func() {
+ err := rootCmd.Execute()
+ if err != nil {
+ GinkgoWriter.Printf("execution fails:%v", err)
+ }
+ })
+ }
+ Eventually(createGroup).Should(ContainSubstring("group group1 is created"))
rootCmd.SetArgs([]string{"indexRuleBinding", "create", "-a", addr, "-f", "-"})
- rootCmd.SetIn(strings.NewReader(`
+ createIndexRuleBinding := func() string {
+ rootCmd.SetIn(strings.NewReader(`
metadata:
name: name1
group: group1
subject:
catalog: CATALOG_STREAM
name: stream1`))
- out = capturer.CaptureStdout(func() {
- err := rootCmd.Execute()
- Expect(err).NotTo(HaveOccurred())
- })
- Expect(out).To(ContainSubstring("indexRuleBinding group1.name1 is created"))
+ return capturer.CaptureStdout(func() {
+ err := rootCmd.Execute()
+ if err != nil {
+ GinkgoWriter.Printf("execution fails:%v", err)
+ }
+ })
+ }
+ Eventually(createIndexRuleBinding).Should(ContainSubstring("indexRuleBinding group1.name1 is created"))
})
It("get indexRuleBinding schema", func() {
diff --git a/bydbctl/internal/cmd/index_rule_test.go b/bydbctl/internal/cmd/index_rule_test.go
index f09ff1e..baeb24d 100644
--- a/bydbctl/internal/cmd/index_rule_test.go
+++ b/bydbctl/internal/cmd/index_rule_test.go
@@ -44,7 +44,8 @@ var _ = Describe("IndexRuleSchema Operation", func() {
rootCmd = &cobra.Command{Use: "root"}
cmd.RootCmdFlags(rootCmd)
rootCmd.SetArgs([]string{"group", "create", "-a", addr, "-f", "-"})
- rootCmd.SetIn(strings.NewReader(`
+ createGroup := func() string {
+ rootCmd.SetIn(strings.NewReader(`
metadata:
name: group1
catalog: CATALOG_STREAM
@@ -59,21 +60,28 @@ resource_opts:
ttl:
unit: UNIT_DAY
num: 7`))
- out := capturer.CaptureStdout(func() {
- err := rootCmd.Execute()
- Expect(err).NotTo(HaveOccurred())
- })
- Expect(out).To(ContainSubstring("group group1 is created"))
+ return capturer.CaptureStdout(func() {
+ err := rootCmd.Execute()
+ if err != nil {
+ GinkgoWriter.Printf("execution fails:%v", err)
+ }
+ })
+ }
+ Eventually(createGroup).Should(ContainSubstring("group group1 is created"))
rootCmd.SetArgs([]string{"indexRule", "create", "-a", addr, "-f", "-"})
- rootCmd.SetIn(strings.NewReader(`
+ createIndexRule := func() string {
+ rootCmd.SetIn(strings.NewReader(`
metadata:
name: name1
group: group1`))
- out = capturer.CaptureStdout(func() {
- err := rootCmd.Execute()
- Expect(err).NotTo(HaveOccurred())
- })
- Expect(out).To(ContainSubstring("indexRule group1.name1 is created"))
+ return capturer.CaptureStdout(func() {
+ err := rootCmd.Execute()
+ if err != nil {
+ GinkgoWriter.Printf("execution fails:%v", err)
+ }
+ })
+ }
+ Eventually(createIndexRule).Should(ContainSubstring("indexRule group1.name1 is created"))
})
It("get indexRule schema", func() {
diff --git a/bydbctl/internal/cmd/measure_test.go b/bydbctl/internal/cmd/measure_test.go
index 85ae3fb..5c29c41 100644
--- a/bydbctl/internal/cmd/measure_test.go
+++ b/bydbctl/internal/cmd/measure_test.go
@@ -50,7 +50,8 @@ var _ = Describe("Measure Schema Operation", func() {
rootCmd = &cobra.Command{Use: "root"}
cmd.RootCmdFlags(rootCmd)
rootCmd.SetArgs([]string{"group", "create", "-a", addr, "-f", "-"})
- rootCmd.SetIn(strings.NewReader(`
+ createGroup := func() string {
+ rootCmd.SetIn(strings.NewReader(`
metadata:
name: group1
catalog: CATALOG_MEASURE
@@ -65,21 +66,28 @@ resource_opts:
ttl:
unit: UNIT_DAY
num: 7`))
- out := capturer.CaptureStdout(func() {
- err := rootCmd.Execute()
- Expect(err).NotTo(HaveOccurred())
- })
- Expect(out).To(ContainSubstring("group group1 is created"))
+ return capturer.CaptureStdout(func() {
+ err := rootCmd.Execute()
+ if err != nil {
+ GinkgoWriter.Printf("execution fails:%v", err)
+ }
+ })
+ }
+ Eventually(createGroup).Should(ContainSubstring("group group1 is created"))
rootCmd.SetArgs([]string{"measure", "create", "-a", addr, "-f", "-"})
- rootCmd.SetIn(strings.NewReader(`
+ createMeasure := func() string {
+ rootCmd.SetIn(strings.NewReader(`
metadata:
name: name1
group: group1`))
- out = capturer.CaptureStdout(func() {
- err := rootCmd.Execute()
- Expect(err).NotTo(HaveOccurred())
- })
- Expect(out).To(ContainSubstring("measure group1.name1 is created"))
+ return capturer.CaptureStdout(func() {
+ err := rootCmd.Execute()
+ if err != nil {
+ GinkgoWriter.Printf("execution fails:%v", err)
+ }
+ })
+ }
+ Eventually(createMeasure).Should(ContainSubstring("measure group1.name1 is created"))
})
It("get measure schema", func() {
@@ -170,9 +178,9 @@ var _ = Describe("Measure Data Query", func() {
var interval time.Duration
BeforeEach(func() {
now = timestamp.NowMilli()
- startStr = now.Add(-20 * time.Minute).Format(RFC3339)
+ startStr = now.Add(-20 * time.Minute).Format(time.RFC3339)
interval = 1 * time.Millisecond
- endStr = now.Add(5 * time.Minute).Format(RFC3339)
+ endStr = now.Add(5 * time.Minute).Format(time.RFC3339)
grpcAddr, addr, deferFunc = setup.SetUp()
Eventually(helpers.HTTPHealthCheck(addr), 10*time.Second).Should(Succeed())
addr = "http://" + addr
diff --git a/bydbctl/internal/cmd/rest.go b/bydbctl/internal/cmd/rest.go
index b42e50f..e161bdd 100644
--- a/bydbctl/internal/cmd/rest.go
+++ b/bydbctl/internal/cmd/rest.go
@@ -38,8 +38,6 @@ import (
)
const (
- // RFC3339 refers to https://www.rfc-editor.org/rfc/rfc3339
- RFC3339 = "2006-01-02T15:04:05Z07:00"
timeRange = 30 * time.Minute
timeRangeUsage = `"start" and "end" specify a time range during which the query is performed,
they can be absolute time like "2006-01-02T15:04:05Z07:00"(https://www.rfc-editor.org/rfc/rfc3339),
@@ -183,8 +181,8 @@ func parseTimeRangeFromFlagAndYAML(reader io.Reader) (requests []reqBody, err er
}
startTS = endTS.Add(-timeRange)
}
- s := startTS.Format(RFC3339)
- e := endTS.Format(RFC3339)
+ s := startTS.Format(time.RFC3339)
+ e := endTS.Format(time.RFC3339)
var rawRequests []reqBody
if rawRequests, err = parseNameAndGroupFromYAML(reader); err != nil {
return nil, err
@@ -211,7 +209,7 @@ func parseTime(timestamp string) (time.Time, error) {
if len(timestamp) < 1 {
return time.Time{}, errors.New("time is empty")
}
- t, errAbsoluteTime := time.Parse(timestamp, RFC3339)
+ t, errAbsoluteTime := time.Parse(timestamp, time.RFC3339)
if errAbsoluteTime == nil {
return t, nil
}
diff --git a/bydbctl/internal/cmd/stream_test.go b/bydbctl/internal/cmd/stream_test.go
index 187c8c2..87632b1 100644
--- a/bydbctl/internal/cmd/stream_test.go
+++ b/bydbctl/internal/cmd/stream_test.go
@@ -38,10 +38,6 @@ import (
cases_stream_data "github.com/apache/skywalking-banyandb/test/cases/stream/data"
)
-const (
- RFC3339 = "2006-01-02T15:04:05Z07:00"
-)
-
var _ = Describe("Stream Schema Operation", func() {
var addr string
var deferFunc func()
@@ -54,7 +50,8 @@ var _ = Describe("Stream Schema Operation", func() {
rootCmd = &cobra.Command{Use: "root"}
cmd.RootCmdFlags(rootCmd)
rootCmd.SetArgs([]string{"group", "create", "-a", addr, "-f", "-"})
- rootCmd.SetIn(strings.NewReader(`
+ createGroup := func() string {
+ rootCmd.SetIn(strings.NewReader(`
metadata:
name: group1
catalog: CATALOG_STREAM
@@ -69,13 +66,17 @@ resource_opts:
ttl:
unit: UNIT_DAY
num: 7`))
- out := capturer.CaptureStdout(func() {
- err := rootCmd.Execute()
- Expect(err).NotTo(HaveOccurred())
- })
- Expect(out).To(ContainSubstring("group group1 is created"))
+ return capturer.CaptureStdout(func() {
+ err := rootCmd.Execute()
+ if err != nil {
+ GinkgoWriter.Printf("execution fails:%v", err)
+ }
+ })
+ }
+ Eventually(createGroup).Should(ContainSubstring("group group1 is created"))
rootCmd.SetArgs([]string{"stream", "create", "-a", addr, "-f", "-"})
- rootCmd.SetIn(strings.NewReader(`
+ createStream := func() string {
+ rootCmd.SetIn(strings.NewReader(`
metadata:
name: name1
group: group1
@@ -84,11 +85,14 @@ tagFamilies:
tags:
- name: trace_id
type: TAG_TYPE_STRING`))
- out = capturer.CaptureStdout(func() {
- err := rootCmd.Execute()
- Expect(err).NotTo(HaveOccurred())
- })
- Expect(out).To(ContainSubstring("stream group1.name1 is created"))
+ return capturer.CaptureStdout(func() {
+ err := rootCmd.Execute()
+ if err != nil {
+ GinkgoWriter.Printf("execution fails:%v", err)
+ }
+ })
+ }
+ Eventually(createStream).Should(ContainSubstring("stream group1.name1 is created"))
})
It("get stream schema", func() {
@@ -180,9 +184,9 @@ var _ = Describe("Stream Data Query", func() {
var interval time.Duration
BeforeEach(func() {
now = timestamp.NowMilli()
- nowStr = now.Format(RFC3339)
+ nowStr = now.Format(time.RFC3339)
interval = 500 * time.Millisecond
- endStr = now.Add(1 * time.Hour).Format(RFC3339)
+ endStr = now.Add(1 * time.Hour).Format(time.RFC3339)
grpcAddr, addr, deferFunc = setup.SetUp()
Eventually(helpers.HTTPHealthCheck(addr), 10*time.Second).Should(Succeed())
addr = "http://" + addr
diff --git a/test/integration/cold_query/query_suite_test.go b/test/integration/cold_query/query_suite_test.go
index a12a3fb..9b3a190 100644
--- a/test/integration/cold_query/query_suite_test.go
+++ b/test/integration/cold_query/query_suite_test.go
@@ -60,14 +60,15 @@ var _ = SynchronizedBeforeSuite(func() []byte {
grpclib.WithTransportCredentials(insecure.NewCredentials()),
)
Expect(err).NotTo(HaveOccurred())
- now = timestamp.NowMilli().Add(-time.Hour * 24)
+ ns := timestamp.NowMilli().UnixNano()
+ now = time.Unix(0, ns-ns%int64(time.Minute)).Add(-time.Hour * 24)
interval := 500 * time.Millisecond
casesStreamData.Write(conn, "data.json", now, interval)
interval = time.Minute
casesMeasureData.Write(conn, "service_traffic", "sw_metric", "service_traffic_data.json", now, interval)
casesMeasureData.Write(conn, "service_instance_traffic", "sw_metric", "service_instance_traffic_data.json", now, interval)
casesMeasureData.Write(conn, "service_cpm_minute", "sw_metric", "service_cpm_minute_data.json", now, interval)
- casesMeasureData.Write(conn, "service_cpm_minute", "sw_metric", "service_cpm_minute_data1.json", now.Add(time.Second), interval)
+ casesMeasureData.Write(conn, "service_cpm_minute", "sw_metric", "service_cpm_minute_data1.json", now.Add(10*time.Second), interval)
casesMeasureData.Write(conn, "service_cpm_minute", "sw_metric", "service_cpm_minute_data2.json", now.Add(10*time.Minute), interval)
Expect(conn.Close()).To(Succeed())
return []byte(addr)
diff --git a/test/integration/query/query_suite_test.go b/test/integration/query/query_suite_test.go
index 22a7972..0bb67be 100644
--- a/test/integration/query/query_suite_test.go
+++ b/test/integration/query/query_suite_test.go
@@ -60,7 +60,8 @@ var _ = SynchronizedBeforeSuite(func() []byte {
grpclib.WithTransportCredentials(insecure.NewCredentials()),
)
Expect(err).NotTo(HaveOccurred())
- now = timestamp.NowMilli()
+ ns := timestamp.NowMilli().UnixNano()
+ now = time.Unix(0, ns-ns%int64(time.Minute))
interval := 500 * time.Millisecond
// stream
cases_stream_data.Write(conn, "data.json", now, interval)
@@ -69,7 +70,7 @@ var _ = SynchronizedBeforeSuite(func() []byte {
cases_measure_data.Write(conn, "service_traffic", "sw_metric", "service_traffic_data.json", now, interval)
cases_measure_data.Write(conn, "service_instance_traffic", "sw_metric", "service_instance_traffic_data.json", now, interval)
cases_measure_data.Write(conn, "service_cpm_minute", "sw_metric", "service_cpm_minute_data.json", now, interval)
- cases_measure_data.Write(conn, "service_cpm_minute", "sw_metric", "service_cpm_minute_data1.json", now.Add(time.Second), interval)
+ cases_measure_data.Write(conn, "service_cpm_minute", "sw_metric", "service_cpm_minute_data1.json", now.Add(10*time.Second), interval)
cases_measure_data.Write(conn, "service_cpm_minute", "sw_metric", "service_cpm_minute_data2.json", now.Add(10*time.Minute), interval)
Expect(conn.Close()).To(Succeed())
return []byte(addr)