You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by ha...@apache.org on 2022/10/07 05:39:58 UTC

[skywalking-banyandb] branch main updated: Fix test cases with Eventually semantics (#183)

This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git


The following commit(s) were added to refs/heads/main by this push:
     new 4bfbb84  Fix test cases with Eventually semantics (#183)
4bfbb84 is described below

commit 4bfbb842cc20a8cc1647a9ebcbd0278c33f4d6c3
Author: Jiajing LU <lu...@gmail.com>
AuthorDate: Fri Oct 7 13:39:53 2022 +0800

    Fix test cases with Eventually semantics (#183)
    
    * fix test cases with Eventually semantics
---
 .../measure/measure_plan_indexscan_local.go        |  2 +-
 test/cases/measure/data/data.go                    | 34 ++++++-------
 test/cases/measure/measure.go                      |  7 ++-
 test/cases/stream/stream.go                        | 57 ++++++++++++----------
 test/integration/cold_query/query_suite_test.go    | 47 +++++++++---------
 test/integration/other/measure_test.go             | 38 ++++++++-------
 test/integration/other/other_suite_test.go         | 15 +++---
 test/integration/other/tls_test.go                 | 38 ++++++++-------
 test/integration/query/query_suite_test.go         | 29 +++++------
 9 files changed, 141 insertions(+), 126 deletions(-)

diff --git a/pkg/query/logical/measure/measure_plan_indexscan_local.go b/pkg/query/logical/measure/measure_plan_indexscan_local.go
index 5002238..c9147d4 100644
--- a/pkg/query/logical/measure/measure_plan_indexscan_local.go
+++ b/pkg/query/logical/measure/measure_plan_indexscan_local.go
@@ -115,7 +115,7 @@ func (i *localIndexScan) Execute(ec executor.MeasureExecutionContext) (executor.
 		}
 	}
 	if len(seriesList) == 0 {
-		return nil, nil
+		return executor.EmptyMIterator, nil
 	}
 	var builders []logical.SeekerBuilder
 	if i.Index != nil {
diff --git a/test/cases/measure/data/data.go b/test/cases/measure/data/data.go
index 978af69..967d02d 100644
--- a/test/cases/measure/data/data.go
+++ b/test/cases/measure/data/data.go
@@ -34,8 +34,8 @@ import (
 	"google.golang.org/protobuf/types/known/timestamppb"
 	"sigs.k8s.io/yaml"
 
-	common_v1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
-	measure_v1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/measure/v1"
+	commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
+	measurev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/measure/v1"
 	"github.com/apache/skywalking-banyandb/pkg/test/helpers"
 )
 
@@ -46,13 +46,13 @@ var inputFS embed.FS
 var wantFS embed.FS
 
 // VerifyFn verify whether the query response matches the wanted result
-var VerifyFn = func(sharedContext helpers.SharedContext, args helpers.Args) {
+var VerifyFn = func(innerGm gm.Gomega, sharedContext helpers.SharedContext, args helpers.Args) {
 	i, err := inputFS.ReadFile("input/" + args.Input + ".yaml")
-	gm.Expect(err).NotTo(gm.HaveOccurred())
-	query := &measure_v1.QueryRequest{}
+	innerGm.Expect(err).NotTo(gm.HaveOccurred())
+	query := &measurev1.QueryRequest{}
 	helpers.UnmarshalYAML(i, query)
 	query.TimeRange = helpers.TimeRange(args, sharedContext)
-	c := measure_v1.NewMeasureServiceClient(sharedContext.Connection)
+	c := measurev1.NewMeasureServiceClient(sharedContext.Connection)
 	ctx := context.Background()
 	resp, err := c.Query(ctx, query)
 	if args.WantErr {
@@ -61,21 +61,21 @@ var VerifyFn = func(sharedContext helpers.SharedContext, args helpers.Args) {
 		}
 		return
 	}
-	gm.Expect(err).NotTo(gm.HaveOccurred(), query.String())
+	innerGm.Expect(err).NotTo(gm.HaveOccurred(), query.String())
 	if args.WantEmpty {
-		gm.Expect(resp.DataPoints).To(gm.BeEmpty())
+		innerGm.Expect(resp.DataPoints).To(gm.BeEmpty())
 		return
 	}
 	if args.Want == "" {
 		args.Want = args.Input
 	}
 	ww, err := wantFS.ReadFile("want/" + args.Want + ".yaml")
-	gm.Expect(err).NotTo(gm.HaveOccurred())
-	want := &measure_v1.QueryResponse{}
+	innerGm.Expect(err).NotTo(gm.HaveOccurred())
+	want := &measurev1.QueryResponse{}
 	helpers.UnmarshalYAML(ww, want)
-	gm.Expect(cmp.Equal(resp, want,
+	innerGm.Expect(cmp.Equal(resp, want,
 		protocmp.IgnoreUnknown(),
-		protocmp.IgnoreFields(&measure_v1.DataPoint{}, "timestamp"),
+		protocmp.IgnoreFields(&measurev1.DataPoint{}, "timestamp"),
 		protocmp.Transform())).
 		To(gm.BeTrue(), func() string {
 			j, err := protojson.Marshal(resp)
@@ -93,7 +93,7 @@ var VerifyFn = func(sharedContext helpers.SharedContext, args helpers.Args) {
 //go:embed testdata/*.json
 var dataFS embed.FS
 
-func loadData(md *common_v1.Metadata, measure measure_v1.MeasureService_WriteClient, dataFile string, baseTime time.Time, interval time.Duration) {
+func loadData(md *commonv1.Metadata, measure measurev1.MeasureService_WriteClient, dataFile string, baseTime time.Time, interval time.Duration) {
 	var templates []interface{}
 	content, err := dataFS.ReadFile("testdata/" + dataFile)
 	gm.Expect(err).ShouldNot(gm.HaveOccurred())
@@ -101,10 +101,10 @@ func loadData(md *common_v1.Metadata, measure measure_v1.MeasureService_WriteCli
 	for i, template := range templates {
 		rawDataPointValue, errMarshal := json.Marshal(template)
 		gm.Expect(errMarshal).ShouldNot(gm.HaveOccurred())
-		dataPointValue := &measure_v1.DataPointValue{}
+		dataPointValue := &measurev1.DataPointValue{}
 		gm.Expect(protojson.Unmarshal(rawDataPointValue, dataPointValue)).ShouldNot(gm.HaveOccurred())
 		dataPointValue.Timestamp = timestamppb.New(baseTime.Add(time.Duration(i) * time.Minute))
-		gm.Expect(measure.Send(&measure_v1.WriteRequest{Metadata: md, DataPoint: dataPointValue})).
+		gm.Expect(measure.Send(&measurev1.WriteRequest{Metadata: md, DataPoint: dataPointValue})).
 			Should(gm.Succeed())
 	}
 }
@@ -113,11 +113,11 @@ func loadData(md *common_v1.Metadata, measure measure_v1.MeasureService_WriteCli
 func Write(conn *grpclib.ClientConn, name, group, dataFile string,
 	baseTime time.Time, interval time.Duration,
 ) {
-	c := measure_v1.NewMeasureServiceClient(conn)
+	c := measurev1.NewMeasureServiceClient(conn)
 	ctx := context.Background()
 	writeClient, err := c.Write(ctx)
 	gm.Expect(err).NotTo(gm.HaveOccurred())
-	loadData(&common_v1.Metadata{
+	loadData(&commonv1.Metadata{
 		Name:  name,
 		Group: group,
 	}, writeClient, dataFile, baseTime, interval)
diff --git a/test/cases/measure/measure.go b/test/cases/measure/measure.go
index 87c9d13..b7939ae 100644
--- a/test/cases/measure/measure.go
+++ b/test/cases/measure/measure.go
@@ -22,16 +22,19 @@ import (
 	"time"
 
 	g "github.com/onsi/ginkgo/v2"
+	gm "github.com/onsi/gomega"
 
 	"github.com/apache/skywalking-banyandb/pkg/test/helpers"
-	measure_test_data "github.com/apache/skywalking-banyandb/test/cases/measure/data"
+	measureTestData "github.com/apache/skywalking-banyandb/test/cases/measure/data"
 )
 
 var (
 	// SharedContext is the parallel execution context
 	SharedContext helpers.SharedContext
 	verify        = func(args helpers.Args) {
-		measure_test_data.VerifyFn(SharedContext, args)
+		gm.Eventually(func(innerGm gm.Gomega) {
+			measureTestData.VerifyFn(innerGm, SharedContext, args)
+		})
 	}
 )
 
diff --git a/test/cases/stream/stream.go b/test/cases/stream/stream.go
index 46d6817..600862d 100644
--- a/test/cases/stream/stream.go
+++ b/test/cases/stream/stream.go
@@ -31,17 +31,16 @@ import (
 	"github.com/google/go-cmp/cmp"
 	g "github.com/onsi/ginkgo/v2"
 	gm "github.com/onsi/gomega"
-	"sigs.k8s.io/yaml"
-
-	common_v1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
-	model_v1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
-	stream_v1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/stream/v1"
-	"github.com/apache/skywalking-banyandb/pkg/test/helpers"
-
 	grpclib "google.golang.org/grpc"
 	"google.golang.org/protobuf/encoding/protojson"
 	"google.golang.org/protobuf/testing/protocmp"
 	"google.golang.org/protobuf/types/known/timestamppb"
+	"sigs.k8s.io/yaml"
+
+	commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
+	modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
+	streamv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/stream/v1"
+	"github.com/apache/skywalking-banyandb/pkg/test/helpers"
 )
 
 // SharedContext is the parallel execution context
@@ -53,13 +52,13 @@ var inputFS embed.FS
 //go:embed want/*.yaml
 var wantFS embed.FS
 
-var _ = g.DescribeTable("Scanning Streams", func(args helpers.Args) {
+var verifyFn = func(innerGm gm.Gomega, args helpers.Args) {
 	i, err := inputFS.ReadFile("input/" + args.Input + ".yaml")
-	gm.Expect(err).NotTo(gm.HaveOccurred())
-	query := &stream_v1.QueryRequest{}
+	innerGm.Expect(err).NotTo(gm.HaveOccurred())
+	query := &streamv1.QueryRequest{}
 	helpers.UnmarshalYAML(i, query)
 	query.TimeRange = helpers.TimeRange(args, SharedContext)
-	c := stream_v1.NewStreamServiceClient(SharedContext.Connection)
+	c := streamv1.NewStreamServiceClient(SharedContext.Connection)
 	ctx := context.Background()
 	resp, err := c.Query(ctx, query)
 	if args.WantErr {
@@ -68,21 +67,21 @@ var _ = g.DescribeTable("Scanning Streams", func(args helpers.Args) {
 		}
 		return
 	}
-	gm.Expect(err).NotTo(gm.HaveOccurred(), query.String())
+	innerGm.Expect(err).NotTo(gm.HaveOccurred(), query.String())
 	if args.WantEmpty {
-		gm.Expect(resp.Elements).To(gm.BeEmpty())
+		innerGm.Expect(resp.Elements).To(gm.BeEmpty())
 		return
 	}
 	if args.Want == "" {
 		args.Want = args.Input
 	}
 	ww, err := wantFS.ReadFile("want/" + args.Want + ".yaml")
-	gm.Expect(err).NotTo(gm.HaveOccurred())
-	want := &stream_v1.QueryResponse{}
+	innerGm.Expect(err).NotTo(gm.HaveOccurred())
+	want := &streamv1.QueryResponse{}
 	helpers.UnmarshalYAML(ww, want)
-	gm.Expect(cmp.Equal(resp, want,
+	innerGm.Expect(cmp.Equal(resp, want,
 		protocmp.IgnoreUnknown(),
-		protocmp.IgnoreFields(&stream_v1.Element{}, "timestamp"),
+		protocmp.IgnoreFields(&streamv1.Element{}, "timestamp"),
 		protocmp.Transform())).
 		To(gm.BeTrue(), func() string {
 			j, err := protojson.Marshal(resp)
@@ -95,6 +94,12 @@ var _ = g.DescribeTable("Scanning Streams", func(args helpers.Args) {
 			}
 			return string(y)
 		})
+}
+
+var _ = g.DescribeTable("Scanning Streams", func(args helpers.Args) {
+	gm.Eventually(func(innerGm gm.Gomega) {
+		verifyFn(innerGm, args)
+	})
 },
 	g.Entry("all elements", helpers.Args{Input: "all", Duration: 1 * time.Hour}),
 	g.Entry("limit", helpers.Args{Input: "limit", Duration: 1 * time.Hour}),
@@ -120,7 +125,7 @@ var _ = g.DescribeTable("Scanning Streams", func(args helpers.Args) {
 //go:embed testdata/*.json
 var dataFS embed.FS
 
-func loadData(stream stream_v1.StreamService_WriteClient, dataFile string, baseTime time.Time, interval time.Duration) {
+func loadData(stream streamv1.StreamService_WriteClient, dataFile string, baseTime time.Time, interval time.Duration) {
 	var templates []interface{}
 	content, err := dataFS.ReadFile("testdata/" + dataFile)
 	gm.Expect(err).ShouldNot(gm.HaveOccurred())
@@ -129,16 +134,16 @@ func loadData(stream stream_v1.StreamService_WriteClient, dataFile string, baseT
 	for i, template := range templates {
 		rawSearchTagFamily, errMarshal := json.Marshal(template)
 		gm.Expect(errMarshal).ShouldNot(gm.HaveOccurred())
-		searchTagFamily := &model_v1.TagFamilyForWrite{}
+		searchTagFamily := &modelv1.TagFamilyForWrite{}
 		gm.Expect(protojson.Unmarshal(rawSearchTagFamily, searchTagFamily)).ShouldNot(gm.HaveOccurred())
-		e := &stream_v1.ElementValue{
+		e := &streamv1.ElementValue{
 			ElementId: strconv.Itoa(i),
 			Timestamp: timestamppb.New(baseTime.Add(interval * time.Duration(i))),
-			TagFamilies: []*model_v1.TagFamilyForWrite{
+			TagFamilies: []*modelv1.TagFamilyForWrite{
 				{
-					Tags: []*model_v1.TagValue{
+					Tags: []*modelv1.TagValue{
 						{
-							Value: &model_v1.TagValue_BinaryData{
+							Value: &modelv1.TagValue_BinaryData{
 								BinaryData: bb,
 							},
 						},
@@ -147,8 +152,8 @@ func loadData(stream stream_v1.StreamService_WriteClient, dataFile string, baseT
 			},
 		}
 		e.TagFamilies = append(e.TagFamilies, searchTagFamily)
-		errInner := stream.Send(&stream_v1.WriteRequest{
-			Metadata: &common_v1.Metadata{
+		errInner := stream.Send(&streamv1.WriteRequest{
+			Metadata: &commonv1.Metadata{
 				Name:  "sw",
 				Group: "default",
 			},
@@ -160,7 +165,7 @@ func loadData(stream stream_v1.StreamService_WriteClient, dataFile string, baseT
 
 // Write data into the server
 func Write(conn *grpclib.ClientConn, dataFile string, baseTime time.Time, interval time.Duration) {
-	c := stream_v1.NewStreamServiceClient(conn)
+	c := streamv1.NewStreamServiceClient(conn)
 	ctx := context.Background()
 	writeClient, err := c.Write(ctx)
 	gm.Expect(err).NotTo(gm.HaveOccurred())
diff --git a/test/integration/cold_query/query_suite_test.go b/test/integration/cold_query/query_suite_test.go
index 6f9ed17..3971560 100644
--- a/test/integration/cold_query/query_suite_test.go
+++ b/test/integration/cold_query/query_suite_test.go
@@ -21,22 +21,23 @@ import (
 	"testing"
 	"time"
 
+	g "github.com/onsi/ginkgo/v2"
+	gm "github.com/onsi/gomega"
+	grpclib "google.golang.org/grpc"
+	"google.golang.org/grpc/credentials/insecure"
+
 	"github.com/apache/skywalking-banyandb/pkg/logger"
 	"github.com/apache/skywalking-banyandb/pkg/test/helpers"
 	"github.com/apache/skywalking-banyandb/pkg/test/setup"
 	"github.com/apache/skywalking-banyandb/pkg/timestamp"
-	cases_measure "github.com/apache/skywalking-banyandb/test/cases/measure"
-	cases_measure_data "github.com/apache/skywalking-banyandb/test/cases/measure/data"
-	cases_stream "github.com/apache/skywalking-banyandb/test/cases/stream"
-	. "github.com/onsi/ginkgo/v2"
-	. "github.com/onsi/gomega"
-	grpclib "google.golang.org/grpc"
-	"google.golang.org/grpc/credentials/insecure"
+	casesMeasure "github.com/apache/skywalking-banyandb/test/cases/measure"
+	casesMeasureData "github.com/apache/skywalking-banyandb/test/cases/measure/data"
+	casesStream "github.com/apache/skywalking-banyandb/test/cases/stream"
 )
 
 func TestIntegrationColdQuery(t *testing.T) {
-	RegisterFailHandler(Fail)
-	RunSpecs(t, "Integration Query Cold Data Suite")
+	gm.RegisterFailHandler(g.Fail)
+	g.RunSpecs(t, "Integration Query Cold Data Suite")
 }
 
 var (
@@ -45,25 +46,25 @@ var (
 	deferFunc  func()
 )
 
-var _ = SynchronizedBeforeSuite(func() []byte {
-	Expect(logger.Init(logger.Logging{
+var _ = g.SynchronizedBeforeSuite(func() []byte {
+	gm.Expect(logger.Init(logger.Logging{
 		Env:   "dev",
 		Level: "warn",
-	})).To(Succeed())
+	})).To(gm.Succeed())
 	var addr string
 	addr, deferFunc = setup.SetUp()
 	conn, err := grpclib.Dial(
 		addr,
 		grpclib.WithTransportCredentials(insecure.NewCredentials()),
 	)
-	Expect(err).NotTo(HaveOccurred())
+	gm.Expect(err).NotTo(gm.HaveOccurred())
 	now = timestamp.NowMilli().Add(-time.Hour * 24)
 	interval := 500 * time.Millisecond
-	cases_stream.Write(conn, "data.json", now, interval)
-	cases_measure_data.Write(conn, "service_traffic", "sw_metric", "service_traffic_data.json", now, interval)
-	cases_measure_data.Write(conn, "service_instance_traffic", "sw_metric", "service_instance_traffic_data.json", now, interval)
-	cases_measure_data.Write(conn, "service_cpm_minute", "sw_metric", "service_cpm_minute_data.json", now, interval)
-	Expect(conn.Close()).To(Succeed())
+	casesStream.Write(conn, "data.json", now, interval)
+	casesMeasureData.Write(conn, "service_traffic", "sw_metric", "service_traffic_data.json", now, interval)
+	casesMeasureData.Write(conn, "service_instance_traffic", "sw_metric", "service_instance_traffic_data.json", now, interval)
+	casesMeasureData.Write(conn, "service_cpm_minute", "sw_metric", "service_cpm_minute_data.json", now, interval)
+	gm.Expect(conn.Close()).To(gm.Succeed())
 	return []byte(addr)
 }, func(address []byte) {
 	var err error
@@ -72,20 +73,20 @@ var _ = SynchronizedBeforeSuite(func() []byte {
 		grpclib.WithTransportCredentials(insecure.NewCredentials()),
 		grpclib.WithBlock(),
 	)
-	cases_stream.SharedContext = helpers.SharedContext{
+	casesStream.SharedContext = helpers.SharedContext{
 		Connection: connection,
 		BaseTime:   now,
 	}
-	cases_measure.SharedContext = helpers.SharedContext{
+	casesMeasure.SharedContext = helpers.SharedContext{
 		Connection: connection,
 		BaseTime:   now,
 	}
-	Expect(err).NotTo(HaveOccurred())
+	gm.Expect(err).NotTo(gm.HaveOccurred())
 })
 
-var _ = SynchronizedAfterSuite(func() {
+var _ = g.SynchronizedAfterSuite(func() {
 	if connection != nil {
-		Expect(connection.Close()).To(Succeed())
+		gm.Expect(connection.Close()).To(gm.Succeed())
 	}
 }, func() {
 	deferFunc()
diff --git a/test/integration/other/measure_test.go b/test/integration/other/measure_test.go
index 3ba7f4b..f162e64 100644
--- a/test/integration/other/measure_test.go
+++ b/test/integration/other/measure_test.go
@@ -20,24 +20,24 @@ package integration_other_test
 import (
 	"time"
 
-	"github.com/apache/skywalking-banyandb/pkg/test/helpers"
-	"github.com/apache/skywalking-banyandb/pkg/test/setup"
-	"github.com/apache/skywalking-banyandb/pkg/timestamp"
-	. "github.com/onsi/ginkgo/v2"
-	. "github.com/onsi/gomega"
+	g "github.com/onsi/ginkgo/v2"
+	gm "github.com/onsi/gomega"
 	grpclib "google.golang.org/grpc"
 	"google.golang.org/grpc/credentials/insecure"
 
-	cases_measure "github.com/apache/skywalking-banyandb/test/cases/measure/data"
+	"github.com/apache/skywalking-banyandb/pkg/test/helpers"
+	"github.com/apache/skywalking-banyandb/pkg/test/setup"
+	"github.com/apache/skywalking-banyandb/pkg/timestamp"
+	casesMeasureData "github.com/apache/skywalking-banyandb/test/cases/measure/data"
 )
 
-var _ = Describe("Query service_cpm_minute", func() {
+var _ = g.Describe("Query service_cpm_minute", func() {
 	var deferFn func()
 	var baseTime time.Time
 	var interval time.Duration
 	var conn *grpclib.ClientConn
 
-	BeforeEach(func() {
+	g.BeforeEach(func() {
 		var addr string
 		addr, deferFn = setup.SetUp()
 		var err error
@@ -45,20 +45,22 @@ var _ = Describe("Query service_cpm_minute", func() {
 			addr,
 			grpclib.WithTransportCredentials(insecure.NewCredentials()),
 		)
-		Expect(err).NotTo(HaveOccurred())
+		gm.Expect(err).NotTo(gm.HaveOccurred())
 		baseTime = timestamp.NowMilli()
 		interval = 500 * time.Millisecond
-		cases_measure.Write(conn, "service_cpm_minute", "sw_metric", "service_cpm_minute_data.json", baseTime, interval)
+		casesMeasureData.Write(conn, "service_cpm_minute", "sw_metric", "service_cpm_minute_data.json", baseTime, interval)
 	})
-	AfterEach(func() {
-		Expect(conn.Close()).To(Succeed())
+	g.AfterEach(func() {
+		gm.Expect(conn.Close()).To(gm.Succeed())
 		deferFn()
 	})
-	It("queries service_cpm_minute by id after updating", func() {
-		cases_measure.Write(conn, "service_cpm_minute", "sw_metric", "service_cpm_minute_data1.json", baseTime, interval)
-		cases_measure.VerifyFn(helpers.SharedContext{
-			Connection: conn,
-			BaseTime:   baseTime,
-		}, helpers.Args{Input: "all", Want: "update", Duration: 1 * time.Hour})
+	g.It("queries service_cpm_minute by id after updating", func() {
+		casesMeasureData.Write(conn, "service_cpm_minute", "sw_metric", "service_cpm_minute_data1.json", baseTime, interval)
+		gm.Eventually(func(innerGm gm.Gomega) {
+			casesMeasureData.VerifyFn(innerGm, helpers.SharedContext{
+				Connection: conn,
+				BaseTime:   baseTime,
+			}, helpers.Args{Input: "all", Want: "update", Duration: 1 * time.Hour})
+		})
 	})
 })
diff --git a/test/integration/other/other_suite_test.go b/test/integration/other/other_suite_test.go
index 0225ce9..4eaee84 100644
--- a/test/integration/other/other_suite_test.go
+++ b/test/integration/other/other_suite_test.go
@@ -20,19 +20,20 @@ package integration_other_test
 import (
 	"testing"
 
+	g "github.com/onsi/ginkgo/v2"
+	gm "github.com/onsi/gomega"
+
 	"github.com/apache/skywalking-banyandb/pkg/logger"
-	. "github.com/onsi/ginkgo/v2"
-	. "github.com/onsi/gomega"
 )
 
 func TestIntegrationOther(t *testing.T) {
-	RegisterFailHandler(Fail)
-	RunSpecs(t, "Integration Other Suite")
+	gm.RegisterFailHandler(g.Fail)
+	g.RunSpecs(t, "Integration Other Suite")
 }
 
-var _ = BeforeSuite(func() {
-	Expect(logger.Init(logger.Logging{
+var _ = g.BeforeSuite(func() {
+	gm.Expect(logger.Init(logger.Logging{
 		Env:   "dev",
 		Level: "warn",
-	})).To(Succeed())
+	})).To(gm.Succeed())
 })
diff --git a/test/integration/other/tls_test.go b/test/integration/other/tls_test.go
index 607e7ca..97fc855 100644
--- a/test/integration/other/tls_test.go
+++ b/test/integration/other/tls_test.go
@@ -22,24 +22,24 @@ import (
 	"runtime"
 	"time"
 
-	"github.com/apache/skywalking-banyandb/pkg/test/helpers"
-	"github.com/apache/skywalking-banyandb/pkg/test/setup"
-	"github.com/apache/skywalking-banyandb/pkg/timestamp"
-	. "github.com/onsi/ginkgo/v2"
-	. "github.com/onsi/gomega"
+	g "github.com/onsi/ginkgo/v2"
+	gm "github.com/onsi/gomega"
 	grpclib "google.golang.org/grpc"
 	"google.golang.org/grpc/credentials"
 
-	cases_measure "github.com/apache/skywalking-banyandb/test/cases/measure/data"
+	"github.com/apache/skywalking-banyandb/pkg/test/helpers"
+	"github.com/apache/skywalking-banyandb/pkg/test/setup"
+	"github.com/apache/skywalking-banyandb/pkg/timestamp"
+	casesMeasureData "github.com/apache/skywalking-banyandb/test/cases/measure/data"
 )
 
-var _ = Describe("Query service_cpm_minute", func() {
+var _ = g.Describe("Query service_cpm_minute", func() {
 	var deferFn func()
 	var baseTime time.Time
 	var interval time.Duration
 	var conn *grpclib.ClientConn
 
-	BeforeEach(func() {
+	g.BeforeEach(func() {
 		_, currentFile, _, _ := runtime.Caller(0)
 		basePath := filepath.Dir(currentFile)
 		certFile := filepath.Join(basePath, "testdata/server_cert.pem")
@@ -48,21 +48,23 @@ var _ = Describe("Query service_cpm_minute", func() {
 		addr, deferFn = setup.SetUp("--tls=true", "--cert-file="+certFile, "--key-file="+keyFile)
 		var err error
 		creds, err := credentials.NewClientTLSFromFile(certFile, "localhost")
-		Expect(err).NotTo(HaveOccurred())
+		gm.Expect(err).NotTo(gm.HaveOccurred())
 		conn, err = grpclib.Dial(addr, grpclib.WithTransportCredentials(creds))
-		Expect(err).NotTo(HaveOccurred())
+		gm.Expect(err).NotTo(gm.HaveOccurred())
 		baseTime = timestamp.NowMilli()
 		interval = 500 * time.Millisecond
-		cases_measure.Write(conn, "service_cpm_minute", "sw_metric", "service_cpm_minute_data.json", baseTime, interval)
+		casesMeasureData.Write(conn, "service_cpm_minute", "sw_metric", "service_cpm_minute_data.json", baseTime, interval)
 	})
-	AfterEach(func() {
-		Expect(conn.Close()).To(Succeed())
+	g.AfterEach(func() {
+		gm.Expect(conn.Close()).To(gm.Succeed())
 		deferFn()
 	})
-	It("queries a tls server", func() {
-		cases_measure.VerifyFn(helpers.SharedContext{
-			Connection: conn,
-			BaseTime:   baseTime,
-		}, helpers.Args{Input: "all", Duration: 1 * time.Hour})
+	g.It("queries a tls server", func() {
+		gm.Eventually(func(innerGm gm.Gomega) {
+			casesMeasureData.VerifyFn(innerGm, helpers.SharedContext{
+				Connection: conn,
+				BaseTime:   baseTime,
+			}, helpers.Args{Input: "all", Duration: 1 * time.Hour})
+		})
 	})
 })
diff --git a/test/integration/query/query_suite_test.go b/test/integration/query/query_suite_test.go
index 424334a..3579482 100644
--- a/test/integration/query/query_suite_test.go
+++ b/test/integration/query/query_suite_test.go
@@ -21,6 +21,11 @@ import (
 	"testing"
 	"time"
 
+	g "github.com/onsi/ginkgo/v2"
+	gm "github.com/onsi/gomega"
+	grpclib "google.golang.org/grpc"
+	"google.golang.org/grpc/credentials/insecure"
+
 	"github.com/apache/skywalking-banyandb/pkg/logger"
 	"github.com/apache/skywalking-banyandb/pkg/test/helpers"
 	"github.com/apache/skywalking-banyandb/pkg/test/setup"
@@ -28,15 +33,11 @@ import (
 	cases_measure "github.com/apache/skywalking-banyandb/test/cases/measure"
 	cases_measure_data "github.com/apache/skywalking-banyandb/test/cases/measure/data"
 	cases_stream "github.com/apache/skywalking-banyandb/test/cases/stream"
-	. "github.com/onsi/ginkgo/v2"
-	. "github.com/onsi/gomega"
-	grpclib "google.golang.org/grpc"
-	"google.golang.org/grpc/credentials/insecure"
 )
 
 func TestIntegrationQuery(t *testing.T) {
-	RegisterFailHandler(Fail)
-	RunSpecs(t, "Integration Query Suite")
+	gm.RegisterFailHandler(g.Fail)
+	g.RunSpecs(t, "Integration Query Suite")
 }
 
 var (
@@ -45,25 +46,25 @@ var (
 	deferFunc  func()
 )
 
-var _ = SynchronizedBeforeSuite(func() []byte {
-	Expect(logger.Init(logger.Logging{
+var _ = g.SynchronizedBeforeSuite(func() []byte {
+	gm.Expect(logger.Init(logger.Logging{
 		Env:   "dev",
 		Level: "warn",
-	})).To(Succeed())
+	})).To(gm.Succeed())
 	var addr string
 	addr, deferFunc = setup.SetUp()
 	conn, err := grpclib.Dial(
 		addr,
 		grpclib.WithTransportCredentials(insecure.NewCredentials()),
 	)
-	Expect(err).NotTo(HaveOccurred())
+	gm.Expect(err).NotTo(gm.HaveOccurred())
 	now = timestamp.NowMilli()
 	interval := 500 * time.Millisecond
 	cases_stream.Write(conn, "data.json", now, interval)
 	cases_measure_data.Write(conn, "service_traffic", "sw_metric", "service_traffic_data.json", now, interval)
 	cases_measure_data.Write(conn, "service_instance_traffic", "sw_metric", "service_instance_traffic_data.json", now, interval)
 	cases_measure_data.Write(conn, "service_cpm_minute", "sw_metric", "service_cpm_minute_data.json", now, interval)
-	Expect(conn.Close()).To(Succeed())
+	gm.Expect(conn.Close()).To(gm.Succeed())
 	return []byte(addr)
 }, func(address []byte) {
 	var err error
@@ -80,12 +81,12 @@ var _ = SynchronizedBeforeSuite(func() []byte {
 		Connection: connection,
 		BaseTime:   now,
 	}
-	Expect(err).NotTo(HaveOccurred())
+	gm.Expect(err).NotTo(gm.HaveOccurred())
 })
 
-var _ = SynchronizedAfterSuite(func() {
+var _ = g.SynchronizedAfterSuite(func() {
 	if connection != nil {
-		Expect(connection.Close()).To(Succeed())
+		gm.Expect(connection.Close()).To(gm.Succeed())
 	}
 }, func() {
 	deferFunc()