You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by wu...@apache.org on 2022/10/31 13:18:10 UTC

[skywalking-banyandb] branch main updated: Fix flaky test cases (#201)

This is an automated email from the ASF dual-hosted git repository.

wusheng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git


The following commit(s) were added to refs/heads/main by this push:
     new 65ebe9f  Fix flaky test cases (#201)
65ebe9f is described below

commit 65ebe9f9dc18a953916c645198dbc1b4337b0887
Author: Gao Hongtao <ha...@gmail.com>
AuthorDate: Mon Oct 31 21:18:05 2022 +0800

    Fix flaky test cases (#201)
---
 banyand/measure/measure_topn.go                 | 64 +++++--------------------
 banyand/measure/measure_write.go                | 13 ++---
 banyand/measure/service.go                      | 20 +++-----
 bydbctl/internal/cmd/index_rule_binding_test.go | 32 ++++++++-----
 bydbctl/internal/cmd/index_rule_test.go         | 32 ++++++++-----
 bydbctl/internal/cmd/measure_test.go            | 36 ++++++++------
 bydbctl/internal/cmd/rest.go                    |  8 ++--
 bydbctl/internal/cmd/stream_test.go             | 40 +++++++++-------
 test/integration/cold_query/query_suite_test.go |  5 +-
 test/integration/query/query_suite_test.go      |  5 +-
 10 files changed, 116 insertions(+), 139 deletions(-)

diff --git a/banyand/measure/measure_topn.go b/banyand/measure/measure_topn.go
index 183b5f0..29295c3 100644
--- a/banyand/measure/measure_topn.go
+++ b/banyand/measure/measure_topn.go
@@ -37,7 +37,6 @@ import (
 	measurev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/measure/v1"
 	modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
 	"github.com/apache/skywalking-banyandb/banyand/tsdb"
-	"github.com/apache/skywalking-banyandb/pkg/bus"
 	"github.com/apache/skywalking-banyandb/pkg/convert"
 	"github.com/apache/skywalking-banyandb/pkg/flow"
 	"github.com/apache/skywalking-banyandb/pkg/flow/streaming"
@@ -45,7 +44,6 @@ import (
 	"github.com/apache/skywalking-banyandb/pkg/logger"
 	"github.com/apache/skywalking-banyandb/pkg/partition"
 	pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
-	"github.com/apache/skywalking-banyandb/pkg/timestamp"
 )
 
 const (
@@ -54,10 +52,9 @@ const (
 )
 
 var (
-	_ bus.MessageListener = (*topNProcessCallback)(nil)
-	_ io.Closer           = (*topNStreamingProcessor)(nil)
-	_ io.Closer           = (*topNProcessorManager)(nil)
-	_ flow.Sink           = (*topNStreamingProcessor)(nil)
+	_ io.Closer = (*topNStreamingProcessor)(nil)
+	_ io.Closer = (*topNProcessorManager)(nil)
+	_ flow.Sink = (*topNStreamingProcessor)(nil)
 
 	errUnsupportedConditionValueType = errors.New("unsupported value type in the condition")
 
@@ -169,7 +166,7 @@ func (t *topNStreamingProcessor) writeData(eventTime time.Time, timeBucket strin
 	if err != nil {
 		return err
 	}
-	span, err := series.Span(timestamp.NewInclusiveTimeRangeDuration(eventTime, 0))
+	span, err := series.Create(eventTime)
 	if err != nil {
 		if span != nil {
 			_ = span.Close()
@@ -324,16 +321,16 @@ func (manager *topNProcessorManager) Close() error {
 	return err
 }
 
-func (manager *topNProcessorManager) onMeasureWrite(request *measurev1.WriteRequest) error {
-	manager.RLock()
-	defer manager.RUnlock()
-	for _, processorList := range manager.processorMap {
-		for _, processor := range processorList {
-			processor.src <- flow.NewStreamRecordWithTimestampPb(request.GetDataPoint(), request.GetDataPoint().GetTimestamp())
+func (manager *topNProcessorManager) onMeasureWrite(request *measurev1.WriteRequest) {
+	go func() {
+		manager.RLock()
+		defer manager.RUnlock()
+		for _, processorList := range manager.processorMap {
+			for _, processor := range processorList {
+				processor.src <- flow.NewStreamRecordWithTimestampPb(request.GetDataPoint(), request.GetDataPoint().GetTimestamp())
+			}
 		}
-	}
-
-	return nil
+	}()
 }
 
 func (manager *topNProcessorManager) start() error {
@@ -518,41 +515,6 @@ func (manager *topNProcessorManager) buildMapper(fieldName string, groupByNames
 	}, nil
 }
 
-// topNProcessCallback listens pipeline for writing requests
-type topNProcessCallback struct {
-	l          *logger.Logger
-	schemaRepo *schemaRepo
-}
-
-func setUpStreamingProcessCallback(l *logger.Logger, schemaRepo *schemaRepo) bus.MessageListener {
-	return &topNProcessCallback{
-		l:          l,
-		schemaRepo: schemaRepo,
-	}
-}
-
-func (cb *topNProcessCallback) Rev(message bus.Message) (resp bus.Message) {
-	writeEvent, ok := message.Data().(*measurev1.InternalWriteRequest)
-	if !ok {
-		cb.l.Warn().Msg("invalid event data type")
-		return
-	}
-
-	// first get measure existence
-	m, ok := cb.schemaRepo.loadMeasure(writeEvent.GetRequest().GetMetadata())
-	if !ok {
-		cb.l.Warn().Msg("cannot find measure definition")
-		return
-	}
-
-	err := m.processorManager.onMeasureWrite(writeEvent.GetRequest())
-	if err != nil {
-		cb.l.Debug().Err(err).Msg("fail to send to the streaming processor")
-	}
-
-	return
-}
-
 var (
 	_ conditionFilter = (*strTagFilter)(nil)
 	_ conditionFilter = (*int64TagFilter)(nil)
diff --git a/banyand/measure/measure_write.go b/banyand/measure/measure_write.go
index 5dae6e1..c8a9aef 100644
--- a/banyand/measure/measure_write.go
+++ b/banyand/measure/measure_write.go
@@ -50,15 +50,6 @@ func (s *measure) Write(value *measurev1.DataPointValue) error {
 		close(waitCh)
 		return err
 	}
-	// send to stream processor
-	err = s.processorManager.onMeasureWrite(&measurev1.WriteRequest{
-		Metadata:  s.GetMetadata(),
-		DataPoint: value,
-	})
-	if err != nil {
-		close(waitCh)
-		return err
-	}
 	<-waitCh
 	return nil
 }
@@ -149,6 +140,10 @@ func (s *measure) write(shardID common.ShardID, seriesHashKey []byte, value *mea
 		Cb:          cb,
 	}
 	s.indexWriter.Write(m)
+	s.processorManager.onMeasureWrite(&measurev1.WriteRequest{
+		Metadata:  s.GetMetadata(),
+		DataPoint: value,
+	})
 	return err
 }
 
diff --git a/banyand/measure/service.go b/banyand/measure/service.go
index fadcc38..9b828cf 100644
--- a/banyand/measure/service.go
+++ b/banyand/measure/service.go
@@ -55,13 +55,12 @@ type service struct {
 	root   string
 	dbOpts tsdb.DatabaseOpts
 
-	schemaRepo      schemaRepo
-	writeListener   bus.MessageListener
-	processListener bus.MessageListener
-	l               *logger.Logger
-	metadata        metadata.Repo
-	pipeline        queue.Queue
-	repo            discovery.ServiceRepo
+	schemaRepo    schemaRepo
+	writeListener bus.MessageListener
+	l             *logger.Logger
+	metadata      metadata.Repo
+	pipeline      queue.Queue
+	repo          discovery.ServiceRepo
 	// stop channel for the service
 	stopCh chan struct{}
 }
@@ -133,13 +132,6 @@ func (s *service) PreRun() error {
 	if err != nil {
 		return err
 	}
-
-	s.processListener = setUpStreamingProcessCallback(s.l, &s.schemaRepo)
-	err = s.pipeline.Subscribe(data.TopicMeasureWrite, s.processListener)
-	if err != nil {
-		return err
-	}
-
 	return nil
 }
 
diff --git a/bydbctl/internal/cmd/index_rule_binding_test.go b/bydbctl/internal/cmd/index_rule_binding_test.go
index ac3e328..6379d11 100644
--- a/bydbctl/internal/cmd/index_rule_binding_test.go
+++ b/bydbctl/internal/cmd/index_rule_binding_test.go
@@ -44,7 +44,8 @@ var _ = Describe("IndexRuleBindingSchema Operation", func() {
 		rootCmd = &cobra.Command{Use: "root"}
 		cmd.RootCmdFlags(rootCmd)
 		rootCmd.SetArgs([]string{"group", "create", "-a", addr, "-f", "-"})
-		rootCmd.SetIn(strings.NewReader(`
+		createGroup := func() string {
+			rootCmd.SetIn(strings.NewReader(`
 metadata:
   name: group1
 catalog: CATALOG_STREAM
@@ -59,24 +60,31 @@ resource_opts:
   ttl:
     unit: UNIT_DAY
     num: 7`))
-		out := capturer.CaptureStdout(func() {
-			err := rootCmd.Execute()
-			Expect(err).NotTo(HaveOccurred())
-		})
-		Expect(out).To(ContainSubstring("group group1 is created"))
+			return capturer.CaptureStdout(func() {
+				err := rootCmd.Execute()
+				if err != nil {
+					GinkgoWriter.Printf("execution fails:%v", err)
+				}
+			})
+		}
+		Eventually(createGroup).Should(ContainSubstring("group group1 is created"))
 		rootCmd.SetArgs([]string{"indexRuleBinding", "create", "-a", addr, "-f", "-"})
-		rootCmd.SetIn(strings.NewReader(`
+		createIndexRuleBinding := func() string {
+			rootCmd.SetIn(strings.NewReader(`
 metadata:
   name: name1
   group: group1
 subject:
   catalog: CATALOG_STREAM
   name: stream1`))
-		out = capturer.CaptureStdout(func() {
-			err := rootCmd.Execute()
-			Expect(err).NotTo(HaveOccurred())
-		})
-		Expect(out).To(ContainSubstring("indexRuleBinding group1.name1 is created"))
+			return capturer.CaptureStdout(func() {
+				err := rootCmd.Execute()
+				if err != nil {
+					GinkgoWriter.Printf("execution fails:%v", err)
+				}
+			})
+		}
+		Eventually(createIndexRuleBinding).Should(ContainSubstring("indexRuleBinding group1.name1 is created"))
 	})
 
 	It("get indexRuleBinding schema", func() {
diff --git a/bydbctl/internal/cmd/index_rule_test.go b/bydbctl/internal/cmd/index_rule_test.go
index f09ff1e..baeb24d 100644
--- a/bydbctl/internal/cmd/index_rule_test.go
+++ b/bydbctl/internal/cmd/index_rule_test.go
@@ -44,7 +44,8 @@ var _ = Describe("IndexRuleSchema Operation", func() {
 		rootCmd = &cobra.Command{Use: "root"}
 		cmd.RootCmdFlags(rootCmd)
 		rootCmd.SetArgs([]string{"group", "create", "-a", addr, "-f", "-"})
-		rootCmd.SetIn(strings.NewReader(`
+		createGroup := func() string {
+			rootCmd.SetIn(strings.NewReader(`
 metadata:
   name: group1
 catalog: CATALOG_STREAM
@@ -59,21 +60,28 @@ resource_opts:
   ttl:
     unit: UNIT_DAY
     num: 7`))
-		out := capturer.CaptureStdout(func() {
-			err := rootCmd.Execute()
-			Expect(err).NotTo(HaveOccurred())
-		})
-		Expect(out).To(ContainSubstring("group group1 is created"))
+			return capturer.CaptureStdout(func() {
+				err := rootCmd.Execute()
+				if err != nil {
+					GinkgoWriter.Printf("execution fails:%v", err)
+				}
+			})
+		}
+		Eventually(createGroup).Should(ContainSubstring("group group1 is created"))
 		rootCmd.SetArgs([]string{"indexRule", "create", "-a", addr, "-f", "-"})
-		rootCmd.SetIn(strings.NewReader(`
+		createIndexRule := func() string {
+			rootCmd.SetIn(strings.NewReader(`
 metadata:
   name: name1
   group: group1`))
-		out = capturer.CaptureStdout(func() {
-			err := rootCmd.Execute()
-			Expect(err).NotTo(HaveOccurred())
-		})
-		Expect(out).To(ContainSubstring("indexRule group1.name1 is created"))
+			return capturer.CaptureStdout(func() {
+				err := rootCmd.Execute()
+				if err != nil {
+					GinkgoWriter.Printf("execution fails:%v", err)
+				}
+			})
+		}
+		Eventually(createIndexRule).Should(ContainSubstring("indexRule group1.name1 is created"))
 	})
 
 	It("get indexRule schema", func() {
diff --git a/bydbctl/internal/cmd/measure_test.go b/bydbctl/internal/cmd/measure_test.go
index 85ae3fb..5c29c41 100644
--- a/bydbctl/internal/cmd/measure_test.go
+++ b/bydbctl/internal/cmd/measure_test.go
@@ -50,7 +50,8 @@ var _ = Describe("Measure Schema Operation", func() {
 		rootCmd = &cobra.Command{Use: "root"}
 		cmd.RootCmdFlags(rootCmd)
 		rootCmd.SetArgs([]string{"group", "create", "-a", addr, "-f", "-"})
-		rootCmd.SetIn(strings.NewReader(`
+		createGroup := func() string {
+			rootCmd.SetIn(strings.NewReader(`
 metadata:
   name: group1
 catalog: CATALOG_MEASURE
@@ -65,21 +66,28 @@ resource_opts:
   ttl:
     unit: UNIT_DAY
     num: 7`))
-		out := capturer.CaptureStdout(func() {
-			err := rootCmd.Execute()
-			Expect(err).NotTo(HaveOccurred())
-		})
-		Expect(out).To(ContainSubstring("group group1 is created"))
+			return capturer.CaptureStdout(func() {
+				err := rootCmd.Execute()
+				if err != nil {
+					GinkgoWriter.Printf("execution fails:%v", err)
+				}
+			})
+		}
+		Eventually(createGroup).Should(ContainSubstring("group group1 is created"))
 		rootCmd.SetArgs([]string{"measure", "create", "-a", addr, "-f", "-"})
-		rootCmd.SetIn(strings.NewReader(`
+		createMeasure := func() string {
+			rootCmd.SetIn(strings.NewReader(`
 metadata:
   name: name1
   group: group1`))
-		out = capturer.CaptureStdout(func() {
-			err := rootCmd.Execute()
-			Expect(err).NotTo(HaveOccurred())
-		})
-		Expect(out).To(ContainSubstring("measure group1.name1 is created"))
+			return capturer.CaptureStdout(func() {
+				err := rootCmd.Execute()
+				if err != nil {
+					GinkgoWriter.Printf("execution fails:%v", err)
+				}
+			})
+		}
+		Eventually(createMeasure).Should(ContainSubstring("measure group1.name1 is created"))
 	})
 
 	It("get measure schema", func() {
@@ -170,9 +178,9 @@ var _ = Describe("Measure Data Query", func() {
 	var interval time.Duration
 	BeforeEach(func() {
 		now = timestamp.NowMilli()
-		startStr = now.Add(-20 * time.Minute).Format(RFC3339)
+		startStr = now.Add(-20 * time.Minute).Format(time.RFC3339)
 		interval = 1 * time.Millisecond
-		endStr = now.Add(5 * time.Minute).Format(RFC3339)
+		endStr = now.Add(5 * time.Minute).Format(time.RFC3339)
 		grpcAddr, addr, deferFunc = setup.SetUp()
 		Eventually(helpers.HTTPHealthCheck(addr), 10*time.Second).Should(Succeed())
 		addr = "http://" + addr
diff --git a/bydbctl/internal/cmd/rest.go b/bydbctl/internal/cmd/rest.go
index b42e50f..e161bdd 100644
--- a/bydbctl/internal/cmd/rest.go
+++ b/bydbctl/internal/cmd/rest.go
@@ -38,8 +38,6 @@ import (
 )
 
 const (
-	// RFC3339 refers to https://www.rfc-editor.org/rfc/rfc3339
-	RFC3339        = "2006-01-02T15:04:05Z07:00"
 	timeRange      = 30 * time.Minute
 	timeRangeUsage = `"start" and "end" specify a time range during which the query is preformed,
 		they can be absolute time like "2006-01-02T15:04:05Z07:00"(https://www.rfc-editor.org/rfc/rfc3339), 
@@ -183,8 +181,8 @@ func parseTimeRangeFromFlagAndYAML(reader io.Reader) (requests []reqBody, err er
 		}
 		startTS = endTS.Add(-timeRange)
 	}
-	s := startTS.Format(RFC3339)
-	e := endTS.Format(RFC3339)
+	s := startTS.Format(time.RFC3339)
+	e := endTS.Format(time.RFC3339)
 	var rawRequests []reqBody
 	if rawRequests, err = parseNameAndGroupFromYAML(reader); err != nil {
 		return nil, err
@@ -211,7 +209,7 @@ func parseTime(timestamp string) (time.Time, error) {
 	if len(timestamp) < 1 {
 		return time.Time{}, errors.New("time is empty")
 	}
-	t, errAbsoluteTime := time.Parse(timestamp, RFC3339)
+	t, errAbsoluteTime := time.Parse(timestamp, time.RFC3339)
 	if errAbsoluteTime == nil {
 		return t, nil
 	}
diff --git a/bydbctl/internal/cmd/stream_test.go b/bydbctl/internal/cmd/stream_test.go
index 187c8c2..87632b1 100644
--- a/bydbctl/internal/cmd/stream_test.go
+++ b/bydbctl/internal/cmd/stream_test.go
@@ -38,10 +38,6 @@ import (
 	cases_stream_data "github.com/apache/skywalking-banyandb/test/cases/stream/data"
 )
 
-const (
-	RFC3339 = "2006-01-02T15:04:05Z07:00"
-)
-
 var _ = Describe("Stream Schema Operation", func() {
 	var addr string
 	var deferFunc func()
@@ -54,7 +50,8 @@ var _ = Describe("Stream Schema Operation", func() {
 		rootCmd = &cobra.Command{Use: "root"}
 		cmd.RootCmdFlags(rootCmd)
 		rootCmd.SetArgs([]string{"group", "create", "-a", addr, "-f", "-"})
-		rootCmd.SetIn(strings.NewReader(`
+		createGroup := func() string {
+			rootCmd.SetIn(strings.NewReader(`
 metadata:
   name: group1
 catalog: CATALOG_STREAM
@@ -69,13 +66,17 @@ resource_opts:
   ttl:
     unit: UNIT_DAY
     num: 7`))
-		out := capturer.CaptureStdout(func() {
-			err := rootCmd.Execute()
-			Expect(err).NotTo(HaveOccurred())
-		})
-		Expect(out).To(ContainSubstring("group group1 is created"))
+			return capturer.CaptureStdout(func() {
+				err := rootCmd.Execute()
+				if err != nil {
+					GinkgoWriter.Printf("execution fails:%v", err)
+				}
+			})
+		}
+		Eventually(createGroup).Should(ContainSubstring("group group1 is created"))
 		rootCmd.SetArgs([]string{"stream", "create", "-a", addr, "-f", "-"})
-		rootCmd.SetIn(strings.NewReader(`
+		createStream := func() string {
+			rootCmd.SetIn(strings.NewReader(`
 metadata:
   name: name1
   group: group1
@@ -84,11 +85,14 @@ tagFamilies:
     tags: 
       - name: trace_id
         type: TAG_TYPE_STRING`))
-		out = capturer.CaptureStdout(func() {
-			err := rootCmd.Execute()
-			Expect(err).NotTo(HaveOccurred())
-		})
-		Expect(out).To(ContainSubstring("stream group1.name1 is created"))
+			return capturer.CaptureStdout(func() {
+				err := rootCmd.Execute()
+				if err != nil {
+					GinkgoWriter.Printf("execution fails:%v", err)
+				}
+			})
+		}
+		Eventually(createStream).Should(ContainSubstring("stream group1.name1 is created"))
 	})
 
 	It("get stream schema", func() {
@@ -180,9 +184,9 @@ var _ = Describe("Stream Data Query", func() {
 	var interval time.Duration
 	BeforeEach(func() {
 		now = timestamp.NowMilli()
-		nowStr = now.Format(RFC3339)
+		nowStr = now.Format(time.RFC3339)
 		interval = 500 * time.Millisecond
-		endStr = now.Add(1 * time.Hour).Format(RFC3339)
+		endStr = now.Add(1 * time.Hour).Format(time.RFC3339)
 		grpcAddr, addr, deferFunc = setup.SetUp()
 		Eventually(helpers.HTTPHealthCheck(addr), 10*time.Second).Should(Succeed())
 		addr = "http://" + addr
diff --git a/test/integration/cold_query/query_suite_test.go b/test/integration/cold_query/query_suite_test.go
index a12a3fb..9b3a190 100644
--- a/test/integration/cold_query/query_suite_test.go
+++ b/test/integration/cold_query/query_suite_test.go
@@ -60,14 +60,15 @@ var _ = SynchronizedBeforeSuite(func() []byte {
 		grpclib.WithTransportCredentials(insecure.NewCredentials()),
 	)
 	Expect(err).NotTo(HaveOccurred())
-	now = timestamp.NowMilli().Add(-time.Hour * 24)
+	ns := timestamp.NowMilli().UnixNano()
+	now = time.Unix(0, ns-ns%int64(time.Minute)).Add(-time.Hour * 24)
 	interval := 500 * time.Millisecond
 	casesStreamData.Write(conn, "data.json", now, interval)
 	interval = time.Minute
 	casesMeasureData.Write(conn, "service_traffic", "sw_metric", "service_traffic_data.json", now, interval)
 	casesMeasureData.Write(conn, "service_instance_traffic", "sw_metric", "service_instance_traffic_data.json", now, interval)
 	casesMeasureData.Write(conn, "service_cpm_minute", "sw_metric", "service_cpm_minute_data.json", now, interval)
-	casesMeasureData.Write(conn, "service_cpm_minute", "sw_metric", "service_cpm_minute_data1.json", now.Add(time.Second), interval)
+	casesMeasureData.Write(conn, "service_cpm_minute", "sw_metric", "service_cpm_minute_data1.json", now.Add(10*time.Second), interval)
 	casesMeasureData.Write(conn, "service_cpm_minute", "sw_metric", "service_cpm_minute_data2.json", now.Add(10*time.Minute), interval)
 	Expect(conn.Close()).To(Succeed())
 	return []byte(addr)
diff --git a/test/integration/query/query_suite_test.go b/test/integration/query/query_suite_test.go
index 22a7972..0bb67be 100644
--- a/test/integration/query/query_suite_test.go
+++ b/test/integration/query/query_suite_test.go
@@ -60,7 +60,8 @@ var _ = SynchronizedBeforeSuite(func() []byte {
 		grpclib.WithTransportCredentials(insecure.NewCredentials()),
 	)
 	Expect(err).NotTo(HaveOccurred())
-	now = timestamp.NowMilli()
+	ns := timestamp.NowMilli().UnixNano()
+	now = time.Unix(0, ns-ns%int64(time.Minute))
 	interval := 500 * time.Millisecond
 	// stream
 	cases_stream_data.Write(conn, "data.json", now, interval)
@@ -69,7 +70,7 @@ var _ = SynchronizedBeforeSuite(func() []byte {
 	cases_measure_data.Write(conn, "service_traffic", "sw_metric", "service_traffic_data.json", now, interval)
 	cases_measure_data.Write(conn, "service_instance_traffic", "sw_metric", "service_instance_traffic_data.json", now, interval)
 	cases_measure_data.Write(conn, "service_cpm_minute", "sw_metric", "service_cpm_minute_data.json", now, interval)
-	cases_measure_data.Write(conn, "service_cpm_minute", "sw_metric", "service_cpm_minute_data1.json", now.Add(time.Second), interval)
+	cases_measure_data.Write(conn, "service_cpm_minute", "sw_metric", "service_cpm_minute_data1.json", now.Add(10*time.Second), interval)
 	cases_measure_data.Write(conn, "service_cpm_minute", "sw_metric", "service_cpm_minute_data2.json", now.Add(10*time.Minute), interval)
 	Expect(conn.Close()).To(Succeed())
 	return []byte(addr)