You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by ha...@apache.org on 2021/06/29 04:09:48 UTC

[skywalking-banyandb] 11/11: Fix some test issues

This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch feat/liasion
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git

commit e1a5044da1a0a316dd70538930aee0f6c607cf02
Author: Gao Hongtao <ha...@gmail.com>
AuthorDate: Tue Jun 29 12:05:22 2021 +0800

    Fix some test issues
    
    Signed-off-by: Gao Hongtao <ha...@gmail.com>
---
 banyand/liaison/grpc/grpc.go      | 14 +++++++-------
 banyand/liaison/grpc/grpc_test.go | 35 ++++++++++++++++++++---------------
 2 files changed, 27 insertions(+), 22 deletions(-)

diff --git a/banyand/liaison/grpc/grpc.go b/banyand/liaison/grpc/grpc.go
index e84f599..58092a1 100644
--- a/banyand/liaison/grpc/grpc.go
+++ b/banyand/liaison/grpc/grpc.go
@@ -20,14 +20,15 @@ package grpc
 import (
 	"context"
 	"fmt"
+	"io"
+	"net"
+
 	v1 "github.com/apache/skywalking-banyandb/api/fbs/v1"
 	"github.com/apache/skywalking-banyandb/banyand/queue"
 	"github.com/apache/skywalking-banyandb/pkg/logger"
 	"github.com/apache/skywalking-banyandb/pkg/run"
 	flatbuffers "github.com/google/flatbuffers/go"
 	grpclib "google.golang.org/grpc"
-	"io"
-	"net"
 )
 
 type Server struct {
@@ -54,7 +55,7 @@ func (s *Server) FlagSet() *run.FlagSet {
 func (s *Server) Validate() error {
 	return nil
 }
-func init(){
+func init() {
 	//encoding.RegisterCodec(flatbuffers.FlatbuffersCodec{})
 }
 func (s *Server) Serve() error {
@@ -64,7 +65,6 @@ func (s *Server) Serve() error {
 		s.log.Fatal("Failed to listen", logger.Error(err))
 	}
 
-
 	s.ser = grpclib.NewServer(grpclib.CustomCodec(flatbuffers.FlatbuffersCodec{}))
 	//s.ser = grpclib.NewServer()
 
@@ -88,19 +88,19 @@ type TraceServer struct {
 func (t *TraceServer) Write(TraceWriteServer v1.Trace_WriteServer) error {
 	for {
 		writeEntity, err := TraceWriteServer.Recv()
-		fmt.Println(123, writeEntity)
 		if err == io.EOF {
 			return nil
 		}
 		if err != nil {
 			return err
 		}
+		fmt.Println(123, writeEntity)
 		t.writeData = append(t.writeData, writeEntity)
 		builder := flatbuffers.NewBuilder(0)
 		v1.WriteResponseStart(builder)
 		builder.Finish(v1.WriteResponseEnd(builder))
-		if error := TraceWriteServer.Send(builder); error != nil {
-			return error
+		if errSend := TraceWriteServer.Send(builder); errSend != nil {
+			return errSend
 		}
 		//writeEntity.Entity().Fields()
 		//writeEntity.MetaData(nil).Group()
diff --git a/banyand/liaison/grpc/grpc_test.go b/banyand/liaison/grpc/grpc_test.go
index a4c2ee4..7f865ee 100644
--- a/banyand/liaison/grpc/grpc_test.go
+++ b/banyand/liaison/grpc/grpc_test.go
@@ -19,14 +19,15 @@ package grpc
 
 import (
 	"context"
-	v1 "github.com/apache/skywalking-banyandb/api/fbs/v1"
-	"github.com/apache/skywalking-banyandb/pkg/convert"
-	flatbuffers "github.com/google/flatbuffers/go"
-	"google.golang.org/grpc"
 	"io"
 	"log"
 	"testing"
 	"time"
+
+	v1 "github.com/apache/skywalking-banyandb/api/fbs/v1"
+	"github.com/apache/skywalking-banyandb/pkg/convert"
+	flatbuffers "github.com/google/flatbuffers/go"
+	"google.golang.org/grpc"
 )
 
 var serverAddr = "localhost:17912"
@@ -35,6 +36,7 @@ type ComponentBuilderFunc func(*flatbuffers.Builder)
 type writeEntityBuilder struct {
 	*flatbuffers.Builder
 }
+
 func NewCriteriaBuilder() *writeEntityBuilder {
 	return &writeEntityBuilder{
 		flatbuffers.NewBuilder(1024),
@@ -77,7 +79,7 @@ func (b *writeEntityBuilder) BuildEntity(id string, binary []byte, items ...inte
 	}
 }
 
-func (b *writeEntityBuilder) BuildField(val interface{}) flatbuffers.UOffsetT  {
+func (b *writeEntityBuilder) BuildField(val interface{}) flatbuffers.UOffsetT {
 	var ValueTypeOffset flatbuffers.UOffsetT
 	var valType v1.ValueType
 	switch v := val.(type) {
@@ -159,7 +161,7 @@ func (b *writeEntityBuilder) Build(funcs ...ComponentBuilderFunc) *v1.WriteEntit
 	return v1.GetRootAsWriteEntity(buf, 0)
 }
 
-func runWrite (writeEntity *v1.WriteEntity) (*flatbuffers.Builder, error) {
+func runWrite(writeEntity *v1.WriteEntity) (*flatbuffers.Builder, error) {
 	builder := flatbuffers.NewBuilder(0)
 	metaData := writeEntity.MetaData(nil)
 	entityValue := writeEntity.Entity(nil)
@@ -270,7 +272,7 @@ func runWrite (writeEntity *v1.WriteEntity) (*flatbuffers.Builder, error) {
 	position := v1.WriteEntityEnd(builder)
 	builder.Finish(position)
 
-	return  builder, nil
+	return builder, nil
 }
 
 func Test_grpc_write(t *testing.T) {
@@ -281,7 +283,7 @@ func Test_grpc_write(t *testing.T) {
 	defer conn.Close()
 
 	client := v1.NewTraceClient(conn)
-	ctx, cancel := context.WithTimeout(context.Background(), 10 * time.Second)
+	ctx := context.Background()
 	b := NewCriteriaBuilder()
 	binary := byte(12)
 	entity := b.Build(
@@ -292,26 +294,29 @@ func Test_grpc_write(t *testing.T) {
 	if e != nil {
 		log.Fatalf("Failed to connect: %v", err)
 	}
-	defer cancel()
 	stream, er := client.Write(ctx)
 	if er != nil {
 		log.Fatalf("%v.runWrite(_) = _, %v", client, err)
 	}
+	waitc := make(chan struct{})
 	go func() {
 		for {
-			writeResponse, err := stream.Recv()
-			if err == io.EOF {
+			writeResponse, errRecv := stream.Recv()
+			if errRecv == io.EOF {
 				// read done.
+				close(waitc)
 				return
 			}
-			if err != nil {
+			if errRecv != nil {
 				log.Fatalf("Failed to receive data : %v", err)
 			}
-			println( writeResponse)
+			println(writeResponse)
 		}
 	}()
-	if error := stream.Send(builder); error != nil {
-		log.Fatalf("Failed to send a note: %v", err)
+	if errSend := stream.Send(builder); errSend != nil {
+		log.Fatalf("Failed to send a note: %v", errSend)
 	}
+
 	stream.CloseSend()
+	<-waitc
 }