You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by lu...@apache.org on 2022/10/06 13:10:49 UTC

[skywalking-banyandb] branch fix-async-write created (now 6498e3b)

This is an automated email from the ASF dual-hosted git repository.

lujiajing pushed a change to branch fix-async-write
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git


      at 6498e3b  fix test cases with Eventually semantics

This branch includes the following new commits:

     new 6498e3b  fix test cases with Eventually semantics

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[skywalking-banyandb] 01/01: fix test cases with Eventually semantics

Posted by lu...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

lujiajing pushed a commit to branch fix-async-write
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git

commit 6498e3bb5accaa4672c30ad3f8b174577ad4639e
Author: Megrez Lu <lu...@gmail.com>
AuthorDate: Thu Oct 6 21:10:37 2022 +0800

    fix test cases with Eventually semantics
    
    Signed-off-by: Megrez Lu <lu...@gmail.com>
---
 .../measure/measure_plan_indexscan_local.go        |   2 +-
 test/cases/measure/data/data.go                    |  60 +++++------
 test/cases/measure/measure.go                      |  39 ++++----
 test/cases/stream/stream.go                        | 111 +++++++++++----------
 test/integration/cold_query/query_suite_test.go    |  27 ++---
 test/integration/other/measure_test.go             |  22 ++--
 test/integration/other/tls_test.go                 |  20 ++--
 7 files changed, 147 insertions(+), 134 deletions(-)

diff --git a/pkg/query/logical/measure/measure_plan_indexscan_local.go b/pkg/query/logical/measure/measure_plan_indexscan_local.go
index 5002238..c9147d4 100644
--- a/pkg/query/logical/measure/measure_plan_indexscan_local.go
+++ b/pkg/query/logical/measure/measure_plan_indexscan_local.go
@@ -115,7 +115,7 @@ func (i *localIndexScan) Execute(ec executor.MeasureExecutionContext) (executor.
 		}
 	}
 	if len(seriesList) == 0 {
-		return nil, nil
+		return executor.EmptyMIterator, nil
 	}
 	var builders []logical.SeekerBuilder
 	if i.Index != nil {
diff --git a/test/cases/measure/data/data.go b/test/cases/measure/data/data.go
index 978af69..2910565 100644
--- a/test/cases/measure/data/data.go
+++ b/test/cases/measure/data/data.go
@@ -26,16 +26,16 @@ import (
 	"time"
 
 	"github.com/google/go-cmp/cmp"
-	g "github.com/onsi/ginkgo/v2"
-	gm "github.com/onsi/gomega"
+	. "github.com/onsi/ginkgo/v2"
+	. "github.com/onsi/gomega"
 	grpclib "google.golang.org/grpc"
 	"google.golang.org/protobuf/encoding/protojson"
 	"google.golang.org/protobuf/testing/protocmp"
 	"google.golang.org/protobuf/types/known/timestamppb"
 	"sigs.k8s.io/yaml"
 
-	common_v1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
-	measure_v1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/measure/v1"
+	commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
+	measurev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/measure/v1"
 	"github.com/apache/skywalking-banyandb/pkg/test/helpers"
 )
 
@@ -46,38 +46,38 @@ var inputFS embed.FS
 var wantFS embed.FS
 
 // VerifyFn verify whether the query response matches the wanted result
-var VerifyFn = func(sharedContext helpers.SharedContext, args helpers.Args) {
+var VerifyFn = func(g Gomega, sharedContext helpers.SharedContext, args helpers.Args) {
 	i, err := inputFS.ReadFile("input/" + args.Input + ".yaml")
-	gm.Expect(err).NotTo(gm.HaveOccurred())
-	query := &measure_v1.QueryRequest{}
+	g.Expect(err).NotTo(HaveOccurred())
+	query := &measurev1.QueryRequest{}
 	helpers.UnmarshalYAML(i, query)
 	query.TimeRange = helpers.TimeRange(args, sharedContext)
-	c := measure_v1.NewMeasureServiceClient(sharedContext.Connection)
+	c := measurev1.NewMeasureServiceClient(sharedContext.Connection)
 	ctx := context.Background()
 	resp, err := c.Query(ctx, query)
 	if args.WantErr {
 		if err == nil {
-			g.Fail("expect error")
+			Fail("expect error")
 		}
 		return
 	}
-	gm.Expect(err).NotTo(gm.HaveOccurred(), query.String())
+	Expect(err).NotTo(HaveOccurred(), query.String())
 	if args.WantEmpty {
-		gm.Expect(resp.DataPoints).To(gm.BeEmpty())
+		Expect(resp.DataPoints).To(BeEmpty())
 		return
 	}
 	if args.Want == "" {
 		args.Want = args.Input
 	}
 	ww, err := wantFS.ReadFile("want/" + args.Want + ".yaml")
-	gm.Expect(err).NotTo(gm.HaveOccurred())
-	want := &measure_v1.QueryResponse{}
+	Expect(err).NotTo(HaveOccurred())
+	want := &measurev1.QueryResponse{}
 	helpers.UnmarshalYAML(ww, want)
-	gm.Expect(cmp.Equal(resp, want,
+	Expect(cmp.Equal(resp, want,
 		protocmp.IgnoreUnknown(),
-		protocmp.IgnoreFields(&measure_v1.DataPoint{}, "timestamp"),
+		protocmp.IgnoreFields(&measurev1.DataPoint{}, "timestamp"),
 		protocmp.Transform())).
-		To(gm.BeTrue(), func() string {
+		To(BeTrue(), func() string {
 			j, err := protojson.Marshal(resp)
 			if err != nil {
 				return err.Error()
@@ -93,19 +93,19 @@ var VerifyFn = func(sharedContext helpers.SharedContext, args helpers.Args) {
 //go:embed testdata/*.json
 var dataFS embed.FS
 
-func loadData(md *common_v1.Metadata, measure measure_v1.MeasureService_WriteClient, dataFile string, baseTime time.Time, interval time.Duration) {
+func loadData(md *commonv1.Metadata, measure measurev1.MeasureService_WriteClient, dataFile string, baseTime time.Time, interval time.Duration) {
 	var templates []interface{}
 	content, err := dataFS.ReadFile("testdata/" + dataFile)
-	gm.Expect(err).ShouldNot(gm.HaveOccurred())
-	gm.Expect(json.Unmarshal(content, &templates)).ShouldNot(gm.HaveOccurred())
+	Expect(err).ShouldNot(HaveOccurred())
+	Expect(json.Unmarshal(content, &templates)).ShouldNot(HaveOccurred())
 	for i, template := range templates {
 		rawDataPointValue, errMarshal := json.Marshal(template)
-		gm.Expect(errMarshal).ShouldNot(gm.HaveOccurred())
-		dataPointValue := &measure_v1.DataPointValue{}
-		gm.Expect(protojson.Unmarshal(rawDataPointValue, dataPointValue)).ShouldNot(gm.HaveOccurred())
+		Expect(errMarshal).ShouldNot(HaveOccurred())
+		dataPointValue := &measurev1.DataPointValue{}
+		Expect(protojson.Unmarshal(rawDataPointValue, dataPointValue)).ShouldNot(HaveOccurred())
 		dataPointValue.Timestamp = timestamppb.New(baseTime.Add(time.Duration(i) * time.Minute))
-		gm.Expect(measure.Send(&measure_v1.WriteRequest{Metadata: md, DataPoint: dataPointValue})).
-			Should(gm.Succeed())
+		Expect(measure.Send(&measurev1.WriteRequest{Metadata: md, DataPoint: dataPointValue})).
+			Should(Succeed())
 	}
 }
 
@@ -113,17 +113,17 @@ func loadData(md *common_v1.Metadata, measure measure_v1.MeasureService_WriteCli
 func Write(conn *grpclib.ClientConn, name, group, dataFile string,
 	baseTime time.Time, interval time.Duration,
 ) {
-	c := measure_v1.NewMeasureServiceClient(conn)
+	c := measurev1.NewMeasureServiceClient(conn)
 	ctx := context.Background()
 	writeClient, err := c.Write(ctx)
-	gm.Expect(err).NotTo(gm.HaveOccurred())
-	loadData(&common_v1.Metadata{
+	Expect(err).NotTo(HaveOccurred())
+	loadData(&commonv1.Metadata{
 		Name:  name,
 		Group: group,
 	}, writeClient, dataFile, baseTime, interval)
-	gm.Expect(writeClient.CloseSend()).To(gm.Succeed())
-	gm.Eventually(func() error {
+	Expect(writeClient.CloseSend()).To(Succeed())
+	Eventually(func() error {
 		_, err := writeClient.Recv()
 		return err
-	}).Should(gm.Equal(io.EOF))
+	}).Should(Equal(io.EOF))
 }
diff --git a/test/cases/measure/measure.go b/test/cases/measure/measure.go
index 87c9d13..7feab43 100644
--- a/test/cases/measure/measure.go
+++ b/test/cases/measure/measure.go
@@ -21,33 +21,36 @@ package measure_test
 import (
 	"time"
 
-	g "github.com/onsi/ginkgo/v2"
+	. "github.com/onsi/ginkgo/v2"
+	. "github.com/onsi/gomega"
 
 	"github.com/apache/skywalking-banyandb/pkg/test/helpers"
-	measure_test_data "github.com/apache/skywalking-banyandb/test/cases/measure/data"
+	measureTestData "github.com/apache/skywalking-banyandb/test/cases/measure/data"
 )
 
 var (
 	// SharedContext is the parallel execution context
 	SharedContext helpers.SharedContext
 	verify        = func(args helpers.Args) {
-		measure_test_data.VerifyFn(SharedContext, args)
+		Eventually(func(g Gomega) {
+			measureTestData.VerifyFn(g, SharedContext, args)
+		})
 	}
 )
 
-var _ = g.DescribeTable("Scanning Measures", verify,
-	g.Entry("all", helpers.Args{Input: "all", Duration: 1 * time.Hour}),
-	g.Entry("filter by tag", helpers.Args{Input: "tag_filter", Duration: 1 * time.Hour}),
-	g.Entry("filter by an unknown tag", helpers.Args{Input: "tag_filter_unknown", Duration: 1 * time.Hour, WantEmpty: true}),
-	g.Entry("group and max", helpers.Args{Input: "group_max", Duration: 1 * time.Hour}),
-	g.Entry("group without field", helpers.Args{Input: "group_no_field", Duration: 1 * time.Hour}),
-	g.Entry("top 2", helpers.Args{Input: "top", Duration: 1 * time.Hour}),
-	g.Entry("bottom 2", helpers.Args{Input: "bottom", Duration: 1 * time.Hour}),
-	g.Entry("order by time asc", helpers.Args{Input: "order_asc", Duration: 1 * time.Hour}),
-	g.Entry("order by time desc", helpers.Args{Input: "order_desc", Duration: 1 * time.Hour}),
-	g.Entry("limit 3,2", helpers.Args{Input: "limit", Duration: 1 * time.Hour}),
-	g.Entry("match a node", helpers.Args{Input: "match_node", Duration: 1 * time.Hour}),
-	g.Entry("match nodes", helpers.Args{Input: "match_nodes", Duration: 1 * time.Hour}),
-	g.Entry("filter by entity id", helpers.Args{Input: "entity", Duration: 1 * time.Hour}),
-	g.Entry("without field", helpers.Args{Input: "no_field", Duration: 1 * time.Hour}),
+var _ = DescribeTable("Scanning Measures", verify,
+	Entry("all", helpers.Args{Input: "all", Duration: 1 * time.Hour}),
+	Entry("filter by tag", helpers.Args{Input: "tag_filter", Duration: 1 * time.Hour}),
+	Entry("filter by an unknown tag", helpers.Args{Input: "tag_filter_unknown", Duration: 1 * time.Hour, WantEmpty: true}),
+	Entry("group and max", helpers.Args{Input: "group_max", Duration: 1 * time.Hour}),
+	Entry("group without field", helpers.Args{Input: "group_no_field", Duration: 1 * time.Hour}),
+	Entry("top 2", helpers.Args{Input: "top", Duration: 1 * time.Hour}),
+	Entry("bottom 2", helpers.Args{Input: "bottom", Duration: 1 * time.Hour}),
+	Entry("order by time asc", helpers.Args{Input: "order_asc", Duration: 1 * time.Hour}),
+	Entry("order by time desc", helpers.Args{Input: "order_desc", Duration: 1 * time.Hour}),
+	Entry("limit 3,2", helpers.Args{Input: "limit", Duration: 1 * time.Hour}),
+	Entry("match a node", helpers.Args{Input: "match_node", Duration: 1 * time.Hour}),
+	Entry("match nodes", helpers.Args{Input: "match_nodes", Duration: 1 * time.Hour}),
+	Entry("filter by entity id", helpers.Args{Input: "entity", Duration: 1 * time.Hour}),
+	Entry("without field", helpers.Args{Input: "no_field", Duration: 1 * time.Hour}),
 )
diff --git a/test/cases/stream/stream.go b/test/cases/stream/stream.go
index b532d45..c88c8a1 100644
--- a/test/cases/stream/stream.go
+++ b/test/cases/stream/stream.go
@@ -29,19 +29,18 @@ import (
 	"time"
 
 	"github.com/google/go-cmp/cmp"
-	g "github.com/onsi/ginkgo/v2"
-	gm "github.com/onsi/gomega"
-	"sigs.k8s.io/yaml"
-
-	common_v1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
-	model_v1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
-	stream_v1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/stream/v1"
-	"github.com/apache/skywalking-banyandb/pkg/test/helpers"
-
+	. "github.com/onsi/ginkgo/v2"
+	. "github.com/onsi/gomega"
 	grpclib "google.golang.org/grpc"
 	"google.golang.org/protobuf/encoding/protojson"
 	"google.golang.org/protobuf/testing/protocmp"
 	"google.golang.org/protobuf/types/known/timestamppb"
+	"sigs.k8s.io/yaml"
+
+	commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
+	modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
+	streamv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/stream/v1"
+	"github.com/apache/skywalking-banyandb/pkg/test/helpers"
 )
 
 // SharedContext is the parallel execution context
@@ -53,38 +52,38 @@ var inputFS embed.FS
 //go:embed want/*.yaml
 var wantFS embed.FS
 
-var _ = g.DescribeTable("Scanning Streams", func(args helpers.Args) {
+var verifyFn = func(g Gomega, args helpers.Args) {
 	i, err := inputFS.ReadFile("input/" + args.Input + ".yaml")
-	gm.Expect(err).NotTo(gm.HaveOccurred())
-	query := &stream_v1.QueryRequest{}
+	Expect(err).NotTo(HaveOccurred())
+	query := &streamv1.QueryRequest{}
 	helpers.UnmarshalYAML(i, query)
 	query.TimeRange = helpers.TimeRange(args, SharedContext)
-	c := stream_v1.NewStreamServiceClient(SharedContext.Connection)
+	c := streamv1.NewStreamServiceClient(SharedContext.Connection)
 	ctx := context.Background()
 	resp, err := c.Query(ctx, query)
 	if args.WantErr {
 		if err == nil {
-			g.Fail("expect error")
+			Fail("expect error")
 		}
 		return
 	}
-	gm.Expect(err).NotTo(gm.HaveOccurred(), query.String())
+	Expect(err).NotTo(HaveOccurred(), query.String())
 	if args.WantEmpty {
-		gm.Expect(resp.Elements).To(gm.BeEmpty())
+		Expect(resp.Elements).To(BeEmpty())
 		return
 	}
 	if args.Want == "" {
 		args.Want = args.Input
 	}
 	ww, err := wantFS.ReadFile("want/" + args.Want + ".yaml")
-	gm.Expect(err).NotTo(gm.HaveOccurred())
-	want := &stream_v1.QueryResponse{}
+	Expect(err).NotTo(HaveOccurred())
+	want := &streamv1.QueryResponse{}
 	helpers.UnmarshalYAML(ww, want)
-	gm.Expect(cmp.Equal(resp, want,
+	Expect(cmp.Equal(resp, want,
 		protocmp.IgnoreUnknown(),
-		protocmp.IgnoreFields(&stream_v1.Element{}, "timestamp"),
+		protocmp.IgnoreFields(&streamv1.Element{}, "timestamp"),
 		protocmp.Transform())).
-		To(gm.BeTrue(), func() string {
+		To(BeTrue(), func() string {
 			j, err := protojson.Marshal(resp)
 			if err != nil {
 				return err.Error()
@@ -95,49 +94,55 @@ var _ = g.DescribeTable("Scanning Streams", func(args helpers.Args) {
 			}
 			return string(y)
 		})
+}
+
+var _ = DescribeTable("Scanning Streams", func(args helpers.Args) {
+	Eventually(func(g Gomega) {
+		verifyFn(g, args)
+	})
 },
-	g.Entry("all elements", helpers.Args{Input: "all", Duration: 1 * time.Hour}),
-	g.Entry("limit", helpers.Args{Input: "limit", Duration: 1 * time.Hour}),
-	g.Entry("offset", helpers.Args{Input: "offset", Duration: 1 * time.Hour}),
-	g.Entry("nothing", helpers.Args{Input: "all", WantEmpty: true}),
-	g.Entry("invalid time range", helpers.Args{
+	Entry("all elements", helpers.Args{Input: "all", Duration: 1 * time.Hour}),
+	Entry("limit", helpers.Args{Input: "limit", Duration: 1 * time.Hour}),
+	Entry("offset", helpers.Args{Input: "offset", Duration: 1 * time.Hour}),
+	Entry("nothing", helpers.Args{Input: "all", WantEmpty: true}),
+	Entry("invalid time range", helpers.Args{
 		Input: "all",
 		Begin: timestamppb.New(time.Unix(0, int64(math.MinInt64+time.Millisecond)).Truncate(time.Millisecond)),
 		End:   timestamppb.New(time.Unix(0, math.MaxInt64).Truncate(time.Millisecond)),
 	}),
-	g.Entry("sort desc", helpers.Args{Input: "sort_desc", Duration: 1 * time.Hour}),
-	g.Entry("global index", helpers.Args{Input: "global_index", Duration: 1 * time.Hour}),
-	g.Entry("filter by non-indexed tag", helpers.Args{Input: "filter_tag", Duration: 1 * time.Hour}),
-	g.Entry("get empty result by non-indexed tag", helpers.Args{Input: "filter_tag_empty", Duration: 1 * time.Hour, WantEmpty: true}),
-	g.Entry("numeric local index: less", helpers.Args{Input: "less", Duration: 1 * time.Hour}),
-	g.Entry("numeric local index: less and eq", helpers.Args{Input: "less_eq", Duration: 1 * time.Hour}),
-	g.Entry("logical expression", helpers.Args{Input: "logical", Duration: 1 * time.Hour}),
-	g.Entry("having", helpers.Args{Input: "having", Duration: 1 * time.Hour}),
-	g.Entry("full text searching", helpers.Args{Input: "search", Duration: 1 * time.Hour}),
+	Entry("sort desc", helpers.Args{Input: "sort_desc", Duration: 1 * time.Hour}),
+	Entry("global index", helpers.Args{Input: "global_index", Duration: 1 * time.Hour}),
+	Entry("filter by non-indexed tag", helpers.Args{Input: "filter_tag", Duration: 1 * time.Hour}),
+	Entry("get empty result by non-indexed tag", helpers.Args{Input: "filter_tag_empty", Duration: 1 * time.Hour, WantEmpty: true}),
+	Entry("numeric local index: less", helpers.Args{Input: "less", Duration: 1 * time.Hour}),
+	Entry("numeric local index: less and eq", helpers.Args{Input: "less_eq", Duration: 1 * time.Hour}),
+	Entry("logical expression", helpers.Args{Input: "logical", Duration: 1 * time.Hour}),
+	Entry("having", helpers.Args{Input: "having", Duration: 1 * time.Hour}),
+	Entry("full text searching", helpers.Args{Input: "search", Duration: 1 * time.Hour}),
 )
 
 //go:embed testdata/*.json
 var dataFS embed.FS
 
-func loadData(stream stream_v1.StreamService_WriteClient, dataFile string, baseTime time.Time, interval time.Duration) {
+func loadData(stream streamv1.StreamService_WriteClient, dataFile string, baseTime time.Time, interval time.Duration) {
 	var templates []interface{}
 	content, err := dataFS.ReadFile("testdata/" + dataFile)
-	gm.Expect(err).ShouldNot(gm.HaveOccurred())
-	gm.Expect(json.Unmarshal(content, &templates)).ShouldNot(gm.HaveOccurred())
+	Expect(err).ShouldNot(HaveOccurred())
+	Expect(json.Unmarshal(content, &templates)).ShouldNot(HaveOccurred())
 	bb, _ := base64.StdEncoding.DecodeString("YWJjMTIzIT8kKiYoKSctPUB+")
 	for i, template := range templates {
 		rawSearchTagFamily, errMarshal := json.Marshal(template)
-		gm.Expect(errMarshal).ShouldNot(gm.HaveOccurred())
-		searchTagFamily := &model_v1.TagFamilyForWrite{}
-		gm.Expect(protojson.Unmarshal(rawSearchTagFamily, searchTagFamily)).ShouldNot(gm.HaveOccurred())
-		e := &stream_v1.ElementValue{
+		Expect(errMarshal).ShouldNot(HaveOccurred())
+		searchTagFamily := &modelv1.TagFamilyForWrite{}
+		Expect(protojson.Unmarshal(rawSearchTagFamily, searchTagFamily)).ShouldNot(HaveOccurred())
+		e := &streamv1.ElementValue{
 			ElementId: strconv.Itoa(i),
 			Timestamp: timestamppb.New(baseTime.Add(interval * time.Duration(i))),
-			TagFamilies: []*model_v1.TagFamilyForWrite{
+			TagFamilies: []*modelv1.TagFamilyForWrite{
 				{
-					Tags: []*model_v1.TagValue{
+					Tags: []*modelv1.TagValue{
 						{
-							Value: &model_v1.TagValue_BinaryData{
+							Value: &modelv1.TagValue_BinaryData{
 								BinaryData: bb,
 							},
 						},
@@ -146,27 +151,27 @@ func loadData(stream stream_v1.StreamService_WriteClient, dataFile string, baseT
 			},
 		}
 		e.TagFamilies = append(e.TagFamilies, searchTagFamily)
-		errInner := stream.Send(&stream_v1.WriteRequest{
-			Metadata: &common_v1.Metadata{
+		errInner := stream.Send(&streamv1.WriteRequest{
+			Metadata: &commonv1.Metadata{
 				Name:  "sw",
 				Group: "default",
 			},
 			Element: e,
 		})
-		gm.Expect(errInner).ShouldNot(gm.HaveOccurred())
+		Expect(errInner).ShouldNot(HaveOccurred())
 	}
 }
 
 // Write data into the server
 func Write(conn *grpclib.ClientConn, dataFile string, baseTime time.Time, interval time.Duration) {
-	c := stream_v1.NewStreamServiceClient(conn)
+	c := streamv1.NewStreamServiceClient(conn)
 	ctx := context.Background()
 	writeClient, err := c.Write(ctx)
-	gm.Expect(err).NotTo(gm.HaveOccurred())
+	Expect(err).NotTo(HaveOccurred())
 	loadData(writeClient, dataFile, baseTime, interval)
-	gm.Expect(writeClient.CloseSend()).To(gm.Succeed())
-	gm.Eventually(func() error {
+	Expect(writeClient.CloseSend()).To(Succeed())
+	Eventually(func() error {
 		_, err := writeClient.Recv()
 		return err
-	}).Should(gm.Equal(io.EOF))
+	}).Should(Equal(io.EOF))
 }
diff --git a/test/integration/cold_query/query_suite_test.go b/test/integration/cold_query/query_suite_test.go
index 6f9ed17..822ec1f 100644
--- a/test/integration/cold_query/query_suite_test.go
+++ b/test/integration/cold_query/query_suite_test.go
@@ -21,17 +21,18 @@ import (
 	"testing"
 	"time"
 
-	"github.com/apache/skywalking-banyandb/pkg/logger"
-	"github.com/apache/skywalking-banyandb/pkg/test/helpers"
-	"github.com/apache/skywalking-banyandb/pkg/test/setup"
-	"github.com/apache/skywalking-banyandb/pkg/timestamp"
-	cases_measure "github.com/apache/skywalking-banyandb/test/cases/measure"
-	cases_measure_data "github.com/apache/skywalking-banyandb/test/cases/measure/data"
-	cases_stream "github.com/apache/skywalking-banyandb/test/cases/stream"
 	. "github.com/onsi/ginkgo/v2"
 	. "github.com/onsi/gomega"
 	grpclib "google.golang.org/grpc"
 	"google.golang.org/grpc/credentials/insecure"
+
+	"github.com/apache/skywalking-banyandb/pkg/logger"
+	"github.com/apache/skywalking-banyandb/pkg/test/helpers"
+	"github.com/apache/skywalking-banyandb/pkg/test/setup"
+	"github.com/apache/skywalking-banyandb/pkg/timestamp"
+	casesMeasure "github.com/apache/skywalking-banyandb/test/cases/measure"
+	casesMeasureData "github.com/apache/skywalking-banyandb/test/cases/measure/data"
+	casesStream "github.com/apache/skywalking-banyandb/test/cases/stream"
 )
 
 func TestIntegrationColdQuery(t *testing.T) {
@@ -59,10 +60,10 @@ var _ = SynchronizedBeforeSuite(func() []byte {
 	Expect(err).NotTo(HaveOccurred())
 	now = timestamp.NowMilli().Add(-time.Hour * 24)
 	interval := 500 * time.Millisecond
-	cases_stream.Write(conn, "data.json", now, interval)
-	cases_measure_data.Write(conn, "service_traffic", "sw_metric", "service_traffic_data.json", now, interval)
-	cases_measure_data.Write(conn, "service_instance_traffic", "sw_metric", "service_instance_traffic_data.json", now, interval)
-	cases_measure_data.Write(conn, "service_cpm_minute", "sw_metric", "service_cpm_minute_data.json", now, interval)
+	casesStream.Write(conn, "data.json", now, interval)
+	casesMeasureData.Write(conn, "service_traffic", "sw_metric", "service_traffic_data.json", now, interval)
+	casesMeasureData.Write(conn, "service_instance_traffic", "sw_metric", "service_instance_traffic_data.json", now, interval)
+	casesMeasureData.Write(conn, "service_cpm_minute", "sw_metric", "service_cpm_minute_data.json", now, interval)
 	Expect(conn.Close()).To(Succeed())
 	return []byte(addr)
 }, func(address []byte) {
@@ -72,11 +73,11 @@ var _ = SynchronizedBeforeSuite(func() []byte {
 		grpclib.WithTransportCredentials(insecure.NewCredentials()),
 		grpclib.WithBlock(),
 	)
-	cases_stream.SharedContext = helpers.SharedContext{
+	casesStream.SharedContext = helpers.SharedContext{
 		Connection: connection,
 		BaseTime:   now,
 	}
-	cases_measure.SharedContext = helpers.SharedContext{
+	casesMeasure.SharedContext = helpers.SharedContext{
 		Connection: connection,
 		BaseTime:   now,
 	}
diff --git a/test/integration/other/measure_test.go b/test/integration/other/measure_test.go
index 3ba7f4b..aab5ef0 100644
--- a/test/integration/other/measure_test.go
+++ b/test/integration/other/measure_test.go
@@ -20,15 +20,15 @@ package integration_other_test
 import (
 	"time"
 
-	"github.com/apache/skywalking-banyandb/pkg/test/helpers"
-	"github.com/apache/skywalking-banyandb/pkg/test/setup"
-	"github.com/apache/skywalking-banyandb/pkg/timestamp"
 	. "github.com/onsi/ginkgo/v2"
 	. "github.com/onsi/gomega"
 	grpclib "google.golang.org/grpc"
 	"google.golang.org/grpc/credentials/insecure"
 
-	cases_measure "github.com/apache/skywalking-banyandb/test/cases/measure/data"
+	"github.com/apache/skywalking-banyandb/pkg/test/helpers"
+	"github.com/apache/skywalking-banyandb/pkg/test/setup"
+	"github.com/apache/skywalking-banyandb/pkg/timestamp"
+	casesMeasureData "github.com/apache/skywalking-banyandb/test/cases/measure/data"
 )
 
 var _ = Describe("Query service_cpm_minute", func() {
@@ -48,17 +48,19 @@ var _ = Describe("Query service_cpm_minute", func() {
 		Expect(err).NotTo(HaveOccurred())
 		baseTime = timestamp.NowMilli()
 		interval = 500 * time.Millisecond
-		cases_measure.Write(conn, "service_cpm_minute", "sw_metric", "service_cpm_minute_data.json", baseTime, interval)
+		casesMeasureData.Write(conn, "service_cpm_minute", "sw_metric", "service_cpm_minute_data.json", baseTime, interval)
 	})
 	AfterEach(func() {
 		Expect(conn.Close()).To(Succeed())
 		deferFn()
 	})
 	It("queries service_cpm_minute by id after updating", func() {
-		cases_measure.Write(conn, "service_cpm_minute", "sw_metric", "service_cpm_minute_data1.json", baseTime, interval)
-		cases_measure.VerifyFn(helpers.SharedContext{
-			Connection: conn,
-			BaseTime:   baseTime,
-		}, helpers.Args{Input: "all", Want: "update", Duration: 1 * time.Hour})
+		casesMeasureData.Write(conn, "service_cpm_minute", "sw_metric", "service_cpm_minute_data1.json", baseTime, interval)
+		Eventually(func(g Gomega) {
+			casesMeasureData.VerifyFn(g, helpers.SharedContext{
+				Connection: conn,
+				BaseTime:   baseTime,
+			}, helpers.Args{Input: "all", Want: "update", Duration: 1 * time.Hour})
+		})
 	})
 })
diff --git a/test/integration/other/tls_test.go b/test/integration/other/tls_test.go
index 607e7ca..603ebd2 100644
--- a/test/integration/other/tls_test.go
+++ b/test/integration/other/tls_test.go
@@ -22,15 +22,15 @@ import (
 	"runtime"
 	"time"
 
-	"github.com/apache/skywalking-banyandb/pkg/test/helpers"
-	"github.com/apache/skywalking-banyandb/pkg/test/setup"
-	"github.com/apache/skywalking-banyandb/pkg/timestamp"
 	. "github.com/onsi/ginkgo/v2"
 	. "github.com/onsi/gomega"
 	grpclib "google.golang.org/grpc"
 	"google.golang.org/grpc/credentials"
 
-	cases_measure "github.com/apache/skywalking-banyandb/test/cases/measure/data"
+	"github.com/apache/skywalking-banyandb/pkg/test/helpers"
+	"github.com/apache/skywalking-banyandb/pkg/test/setup"
+	"github.com/apache/skywalking-banyandb/pkg/timestamp"
+	casesMeasureData "github.com/apache/skywalking-banyandb/test/cases/measure/data"
 )
 
 var _ = Describe("Query service_cpm_minute", func() {
@@ -53,16 +53,18 @@ var _ = Describe("Query service_cpm_minute", func() {
 		Expect(err).NotTo(HaveOccurred())
 		baseTime = timestamp.NowMilli()
 		interval = 500 * time.Millisecond
-		cases_measure.Write(conn, "service_cpm_minute", "sw_metric", "service_cpm_minute_data.json", baseTime, interval)
+		casesMeasureData.Write(conn, "service_cpm_minute", "sw_metric", "service_cpm_minute_data.json", baseTime, interval)
 	})
 	AfterEach(func() {
 		Expect(conn.Close()).To(Succeed())
 		deferFn()
 	})
 	It("queries a tls server", func() {
-		cases_measure.VerifyFn(helpers.SharedContext{
-			Connection: conn,
-			BaseTime:   baseTime,
-		}, helpers.Args{Input: "all", Duration: 1 * time.Hour})
+		Eventually(func(g Gomega) {
+			casesMeasureData.VerifyFn(g, helpers.SharedContext{
+				Connection: conn,
+				BaseTime:   baseTime,
+			}, helpers.Args{Input: "all", Duration: 1 * time.Hour})
+		})
 	})
 })