You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by lu...@apache.org on 2022/04/04 11:11:03 UTC

[skywalking-banyandb] branch add-liaison-measure-test created (now ee6e814)

This is an automated email from the ASF dual-hosted git repository.

lujiajing pushed a change to branch add-liaison-measure-test
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git


      at ee6e814  add tests

This branch includes the following new commits:

     new ee6e814  add tests

The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[skywalking-banyandb] 01/01: add tests

Posted by lu...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

lujiajing pushed a commit to branch add-liaison-measure-test
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git

commit ee6e81479ca024e93f2db0bf571bb52f1c39af7d
Author: Megrez Lu <lu...@gmail.com>
AuthorDate: Mon Apr 4 19:10:53 2022 +0800

    add tests
---
 .../grpc/{stream_test.go => measure_test.go}       | 122 +++++++--------------
 banyand/liaison/grpc/stream_test.go                |  25 +++--
 pkg/pb/v1/write.go                                 | 118 ++++++++++++++++++++
 3 files changed, 171 insertions(+), 94 deletions(-)

diff --git a/banyand/liaison/grpc/stream_test.go b/banyand/liaison/grpc/measure_test.go
similarity index 54%
copy from banyand/liaison/grpc/stream_test.go
copy to banyand/liaison/grpc/measure_test.go
index a116426..a7ae577 100644
--- a/banyand/liaison/grpc/stream_test.go
+++ b/banyand/liaison/grpc/measure_test.go
@@ -19,7 +19,6 @@ package grpc_test
 
 import (
 	"context"
-	"encoding/base64"
 	"io"
 	"path/filepath"
 	"runtime"
@@ -30,19 +29,14 @@ import (
 	grpclib "google.golang.org/grpc"
 	"google.golang.org/grpc/credentials"
 
-	streamv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/stream/v1"
-	"github.com/apache/skywalking-banyandb/banyand/discovery"
-	"github.com/apache/skywalking-banyandb/banyand/liaison/grpc"
+	measurev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/measure/v1"
 	"github.com/apache/skywalking-banyandb/banyand/metadata"
-	"github.com/apache/skywalking-banyandb/banyand/query"
-	"github.com/apache/skywalking-banyandb/banyand/queue"
-	"github.com/apache/skywalking-banyandb/banyand/stream"
 	pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
 	"github.com/apache/skywalking-banyandb/pkg/test"
-	teststream "github.com/apache/skywalking-banyandb/pkg/test/stream"
+	testmeasure "github.com/apache/skywalking-banyandb/pkg/test/measure"
 )
 
-var _ = Describe("Stream", func() {
+var _ = Describe("Measure", func() {
 	var rootPath, metadataPath string
 	var gracefulStop, deferRootFunc, deferMetadataFunc func()
 	var conn *grpclib.ClientConn
@@ -55,14 +49,14 @@ var _ = Describe("Stream", func() {
 	})
 	It("is a plain server", func() {
 		By("Verifying an empty server")
-		flags := []string{"--stream-root-path=" + rootPath, "--metadata-root-path=" + metadataPath}
+		flags := []string{"--measure-root-path=" + rootPath, "--metadata-root-path=" + metadataPath}
 		gracefulStop = setup(flags)
 		var err error
 		conn, err = grpclib.Dial("localhost:17912", grpclib.WithInsecure())
 		Expect(err).NotTo(HaveOccurred())
-		streamWrite(conn)
+		measureWrite(conn)
 		Eventually(func() (int, error) {
-			return streamQuery(conn)
+			return measureQuery(conn)
 		}).Should(Equal(1))
 		_ = conn.Close()
 		gracefulStop()
@@ -71,16 +65,16 @@ var _ = Describe("Stream", func() {
 		conn, err = grpclib.Dial("localhost:17912", grpclib.WithInsecure())
 		Expect(err).NotTo(HaveOccurred())
 		Eventually(func() int {
-			num, err := streamQuery(conn)
+			num, err := measureQuery(conn)
 			if err != nil {
-				GinkgoWriter.Printf("stream query err: %v \n", err)
+				GinkgoWriter.Printf("measure query err: %v \n", err)
 				return 0
 			}
 			return num
 		}).Should(Equal(1))
 	})
 	It("is a TLS server", func() {
-		flags := []string{"--tls=true", "--stream-root-path=" + rootPath, "--metadata-root-path=" + metadataPath}
+		flags := []string{"--tls=true", "--measure-root-path=" + rootPath, "--metadata-root-path=" + metadataPath}
 		_, currentFile, _, _ := runtime.Caller(0)
 		basePath := filepath.Dir(currentFile)
 		certFile := filepath.Join(basePath, "testdata/server_cert.pem")
@@ -94,9 +88,9 @@ var _ = Describe("Stream", func() {
 		Expect(err).NotTo(HaveOccurred())
 		conn, err = grpclib.Dial(addr, grpclib.WithTransportCredentials(creds))
 		Expect(err).NotTo(HaveOccurred())
-		streamWrite(conn)
+		measureWrite(conn)
 		Eventually(func() (int, error) {
-			return streamQuery(conn)
+			return measureQuery(conn)
 		}).Should(Equal(1))
 	})
 	AfterEach(func() {
@@ -107,90 +101,48 @@ var _ = Describe("Stream", func() {
 	})
 })
 
-func setup(flags []string) func() {
-	// Init `Discovery` module
-	repo, err := discovery.NewServiceRepo(context.Background())
-	Expect(err).NotTo(HaveOccurred())
-	// Init `Queue` module
-	pipeline, err := queue.NewQueue(context.TODO(), repo)
-	Expect(err).NotTo(HaveOccurred())
-	// Init `Metadata` module
-	metaSvc, err := metadata.NewService(context.TODO())
-	Expect(err).NotTo(HaveOccurred())
-	// Init `Stream` module
-	streamSvc, err := stream.NewService(context.TODO(), metaSvc, repo, pipeline)
-	Expect(err).NotTo(HaveOccurred())
-	// Init `Query` module
-	q, err := query.NewExecutor(context.TODO(), streamSvc, nil, metaSvc, repo, pipeline)
-	Expect(err).NotTo(HaveOccurred())
-
-	tcp := grpc.NewServer(context.TODO(), pipeline, repo, metaSvc)
-	preloadStreamSvc := &preloadStreamService{metaSvc: metaSvc}
-
-	return test.SetUpModules(
-		flags,
-		repo,
-		pipeline,
-		metaSvc,
-		preloadStreamSvc,
-		streamSvc,
-		q,
-		tcp,
-	)
+func writeMeasureData() *measurev1.WriteRequest {
+	return pbv1.NewMeasureWriteRequestBuilder().
+		Metadata("sw_metric", "service_cpm_minute").
+		Timestamp(time.Now()).
+		TagFamily(
+			pbv1.ID("1"),
+			"entity_1",
+		).Fields(100, 1).
+		Build()
 }
 
-type preloadStreamService struct {
+type preloadMeasureService struct {
 	metaSvc metadata.Service
 }
 
-func (p *preloadStreamService) Name() string {
+func (p *preloadMeasureService) Name() string {
 	return "preload-measure"
 }
 
-func (p *preloadStreamService) PreRun() error {
-	return teststream.PreloadSchema(p.metaSvc.SchemaRegistry())
-}
-
-func writeData() *streamv1.WriteRequest {
-	bb, _ := base64.StdEncoding.DecodeString("YWJjMTIzIT8kKiYoKSctPUB+")
-	return pbv1.NewStreamWriteRequestBuilder().
-		ID("1").
-		Metadata("default", "sw").
-		Timestamp(time.Now()).
-		TagFamily(bb).
-		TagFamily(
-			"trace_id-xxfff.111",
-			0,
-			"webapp_id",
-			"10.0.0.1_id",
-			"/home_id",
-			300,
-			1622933202000000000,
-		).
-		Build()
+func (p *preloadMeasureService) PreRun() error {
+	return testmeasure.PreloadSchema(p.metaSvc.SchemaRegistry())
 }
 
-func queryCriteria(baseTs time.Time) *streamv1.QueryRequest {
-	return pbv1.NewStreamQueryRequestBuilder().
-		Limit(10).
-		Offset(0).
-		Metadata("default", "sw").
+func queryMeasureCriteria(baseTs time.Time) *measurev1.QueryRequest {
+	return pbv1.NewMeasureQueryRequestBuilder().
+		Metadata("sw_metric", "service_cpm_minute").
 		TimeRange(baseTs.Add(-1*time.Minute), baseTs.Add(1*time.Minute)).
-		Projection("searchable", "trace_id").
+		TagProjection("searchable", "trace_id").
 		Build()
 }
 
-func streamWrite(conn *grpclib.ClientConn) {
-	client := streamv1.NewStreamServiceClient(conn)
+func measureWrite(conn *grpclib.ClientConn) {
+	c := measurev1.NewMeasureServiceClient(conn)
 	ctx := context.Background()
-	var writeClient streamv1.StreamService_WriteClient
+	var writeClient measurev1.MeasureService_WriteClient
 	Eventually(func(g Gomega) {
 		var err error
-		writeClient, err = client.Write(ctx)
+		writeClient, err = c.Write(ctx)
 		g.Expect(err).NotTo(HaveOccurred())
 	}).Should(Succeed())
 	Eventually(func() error {
-		return writeClient.Send(writeData())
+		return writeClient.Send(writeMeasureData())
 	}).ShouldNot(HaveOccurred())
 	Expect(writeClient.CloseSend()).Should(Succeed())
 	Eventually(func() error {
@@ -199,10 +151,10 @@ func streamWrite(conn *grpclib.ClientConn) {
 	}).Should(Equal(io.EOF))
 }
 
-func streamQuery(conn *grpclib.ClientConn) (int, error) {
-	client := streamv1.NewStreamServiceClient(conn)
+func measureQuery(conn *grpclib.ClientConn) (int, error) {
+	c := measurev1.NewMeasureServiceClient(conn)
 	ctx := context.Background()
-	resp, err := client.Query(ctx, queryCriteria(time.Now()))
+	resp, err := c.Query(ctx, queryMeasureCriteria(time.Now()))
 
-	return len(resp.GetElements()), err
+	return len(resp.GetDataPoints()), err
 }
diff --git a/banyand/liaison/grpc/stream_test.go b/banyand/liaison/grpc/stream_test.go
index a116426..dbc271d 100644
--- a/banyand/liaison/grpc/stream_test.go
+++ b/banyand/liaison/grpc/stream_test.go
@@ -33,6 +33,7 @@ import (
 	streamv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/stream/v1"
 	"github.com/apache/skywalking-banyandb/banyand/discovery"
 	"github.com/apache/skywalking-banyandb/banyand/liaison/grpc"
+	"github.com/apache/skywalking-banyandb/banyand/measure"
 	"github.com/apache/skywalking-banyandb/banyand/metadata"
 	"github.com/apache/skywalking-banyandb/banyand/query"
 	"github.com/apache/skywalking-banyandb/banyand/queue"
@@ -120,12 +121,16 @@ func setup(flags []string) func() {
 	// Init `Stream` module
 	streamSvc, err := stream.NewService(context.TODO(), metaSvc, repo, pipeline)
 	Expect(err).NotTo(HaveOccurred())
+	// Init `Measure` module
+	measureSvc, err := measure.NewService(context.TODO(), metaSvc, repo, pipeline)
+	Expect(err).NotTo(HaveOccurred())
 	// Init `Query` module
-	q, err := query.NewExecutor(context.TODO(), streamSvc, nil, metaSvc, repo, pipeline)
+	q, err := query.NewExecutor(context.TODO(), streamSvc, measureSvc, metaSvc, repo, pipeline)
 	Expect(err).NotTo(HaveOccurred())
 
 	tcp := grpc.NewServer(context.TODO(), pipeline, repo, metaSvc)
 	preloadStreamSvc := &preloadStreamService{metaSvc: metaSvc}
+	preloadMeasureSvc := &preloadMeasureService{metaSvc: metaSvc}
 
 	return test.SetUpModules(
 		flags,
@@ -133,7 +138,9 @@ func setup(flags []string) func() {
 		pipeline,
 		metaSvc,
 		preloadStreamSvc,
+		preloadMeasureSvc,
 		streamSvc,
+		measureSvc,
 		q,
 		tcp,
 	)
@@ -144,14 +151,14 @@ type preloadStreamService struct {
 }
 
 func (p *preloadStreamService) Name() string {
-	return "preload-measure"
+	return "preload-stream"
 }
 
 func (p *preloadStreamService) PreRun() error {
 	return teststream.PreloadSchema(p.metaSvc.SchemaRegistry())
 }
 
-func writeData() *streamv1.WriteRequest {
+func writeStreamData() *streamv1.WriteRequest {
 	bb, _ := base64.StdEncoding.DecodeString("YWJjMTIzIT8kKiYoKSctPUB+")
 	return pbv1.NewStreamWriteRequestBuilder().
 		ID("1").
@@ -170,7 +177,7 @@ func writeData() *streamv1.WriteRequest {
 		Build()
 }
 
-func queryCriteria(baseTs time.Time) *streamv1.QueryRequest {
+func queryStreamCriteria(baseTs time.Time) *streamv1.QueryRequest {
 	return pbv1.NewStreamQueryRequestBuilder().
 		Limit(10).
 		Offset(0).
@@ -181,16 +188,16 @@ func queryCriteria(baseTs time.Time) *streamv1.QueryRequest {
 }
 
 func streamWrite(conn *grpclib.ClientConn) {
-	client := streamv1.NewStreamServiceClient(conn)
+	c := streamv1.NewStreamServiceClient(conn)
 	ctx := context.Background()
 	var writeClient streamv1.StreamService_WriteClient
 	Eventually(func(g Gomega) {
 		var err error
-		writeClient, err = client.Write(ctx)
+		writeClient, err = c.Write(ctx)
 		g.Expect(err).NotTo(HaveOccurred())
 	}).Should(Succeed())
 	Eventually(func() error {
-		return writeClient.Send(writeData())
+		return writeClient.Send(writeStreamData())
 	}).ShouldNot(HaveOccurred())
 	Expect(writeClient.CloseSend()).Should(Succeed())
 	Eventually(func() error {
@@ -200,9 +207,9 @@ func streamWrite(conn *grpclib.ClientConn) {
 }
 
 func streamQuery(conn *grpclib.ClientConn) (int, error) {
-	client := streamv1.NewStreamServiceClient(conn)
+	c := streamv1.NewStreamServiceClient(conn)
 	ctx := context.Background()
-	resp, err := client.Query(ctx, queryCriteria(time.Now()))
+	resp, err := c.Query(ctx, queryStreamCriteria(time.Now()))
 
 	return len(resp.GetElements()), err
 }
diff --git a/pkg/pb/v1/write.go b/pkg/pb/v1/write.go
index e39a3a0..7eb8cbc 100644
--- a/pkg/pb/v1/write.go
+++ b/pkg/pb/v1/write.go
@@ -26,11 +26,14 @@ import (
 	"google.golang.org/protobuf/types/known/timestamppb"
 
 	commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
+	measurev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/measure/v1"
 	modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
 	streamv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/stream/v1"
 	"github.com/apache/skywalking-banyandb/pkg/convert"
 )
 
+type ID string
+
 const strDelimiter = "\n"
 
 var ErrUnsupportedTagForIndexField = errors.New("the tag type(for example, null) can not be as the index field value")
@@ -131,6 +134,121 @@ func getTag(tag interface{}) *modelv1.TagValue {
 				BinaryData: t,
 			},
 		}
+	case ID:
+		return &modelv1.TagValue{
+			Value: &modelv1.TagValue_Id{
+				Id: &modelv1.ID{
+					Value: string(t),
+				},
+			},
+		}
+	}
+	return nil
+}
+
+type MeasureWriteRequestBuilder struct {
+	ec *measurev1.WriteRequest
+}
+
+func NewMeasureWriteRequestBuilder() *MeasureWriteRequestBuilder {
+	return &MeasureWriteRequestBuilder{
+		ec: &measurev1.WriteRequest{
+			DataPoint: &measurev1.DataPointValue{
+				TagFamilies: make([]*modelv1.TagFamilyForWrite, 0),
+				Fields:      make([]*modelv1.FieldValue, 0),
+			},
+		},
+	}
+}
+
+func (b *MeasureWriteRequestBuilder) Metadata(group, name string) *MeasureWriteRequestBuilder {
+	b.ec.Metadata = &commonv1.Metadata{
+		Group: group,
+		Name:  name,
+	}
+	return b
+}
+
+func (b *MeasureWriteRequestBuilder) TagFamily(tags ...interface{}) *MeasureWriteRequestBuilder {
+	tagFamily := &modelv1.TagFamilyForWrite{}
+	for _, tag := range tags {
+		tagFamily.Tags = append(tagFamily.Tags, getTag(tag))
+	}
+	b.ec.DataPoint.TagFamilies = append(b.ec.DataPoint.TagFamilies, tagFamily)
+	return b
+}
+
+func (b *MeasureWriteRequestBuilder) Fields(fields ...interface{}) *MeasureWriteRequestBuilder {
+	var fieldValues []*modelv1.FieldValue
+	for _, field := range fields {
+		fieldValues = append(fieldValues, getField(field))
+	}
+	b.ec.DataPoint.Fields = append(b.ec.DataPoint.Fields, fieldValues...)
+	return b
+}
+
+func (b *MeasureWriteRequestBuilder) Timestamp(t time.Time) *MeasureWriteRequestBuilder {
+	b.ec.DataPoint.Timestamp = timestamppb.New(t)
+	return b
+}
+
+func (b *MeasureWriteRequestBuilder) Build() *measurev1.WriteRequest {
+	return b.ec
+}
+
+func getField(field interface{}) *modelv1.FieldValue {
+	if field == nil {
+		return &modelv1.FieldValue{
+			Value: &modelv1.FieldValue_Null{},
+		}
+	}
+	switch t := field.(type) {
+	case int8:
+		return &modelv1.FieldValue{
+			Value: &modelv1.FieldValue_Int{
+				Int: &modelv1.Int{
+					Value: int64(t),
+				},
+			},
+		}
+	case int32:
+		return &modelv1.FieldValue{
+			Value: &modelv1.FieldValue_Int{
+				Int: &modelv1.Int{
+					Value: int64(t),
+				},
+			},
+		}
+	case int64:
+		return &modelv1.FieldValue{
+			Value: &modelv1.FieldValue_Int{
+				Int: &modelv1.Int{
+					Value: t,
+				},
+			},
+		}
+	case int:
+		return &modelv1.FieldValue{
+			Value: &modelv1.FieldValue_Int{
+				Int: &modelv1.Int{
+					Value: int64(t),
+				},
+			},
+		}
+	case string:
+		return &modelv1.FieldValue{
+			Value: &modelv1.FieldValue_Str{
+				Str: &modelv1.Str{
+					Value: t,
+				},
+			},
+		}
+	case []byte:
+		return &modelv1.FieldValue{
+			Value: &modelv1.FieldValue_BinaryData{
+				BinaryData: t,
+			},
+		}
 	}
 	return nil
 }