You are viewing a plain-text version of this content; the canonical version is available in the original mailing-list archive.
Posted to notifications@skywalking.apache.org by ha...@apache.org on 2021/06/29 04:09:48 UTC
[skywalking-banyandb] 11/11: Fix some test issues
This is an automated email from the ASF dual-hosted git repository.
hanahmily pushed a commit to branch feat/liasion
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
commit e1a5044da1a0a316dd70538930aee0f6c607cf02
Author: Gao Hongtao <ha...@gmail.com>
AuthorDate: Tue Jun 29 12:05:22 2021 +0800
Fix some test issues
Signed-off-by: Gao Hongtao <ha...@gmail.com>
---
banyand/liaison/grpc/grpc.go | 14 +++++++-------
banyand/liaison/grpc/grpc_test.go | 35 ++++++++++++++++++++---------------
2 files changed, 27 insertions(+), 22 deletions(-)
diff --git a/banyand/liaison/grpc/grpc.go b/banyand/liaison/grpc/grpc.go
index e84f599..58092a1 100644
--- a/banyand/liaison/grpc/grpc.go
+++ b/banyand/liaison/grpc/grpc.go
@@ -20,14 +20,15 @@ package grpc
import (
"context"
"fmt"
+ "io"
+ "net"
+
v1 "github.com/apache/skywalking-banyandb/api/fbs/v1"
"github.com/apache/skywalking-banyandb/banyand/queue"
"github.com/apache/skywalking-banyandb/pkg/logger"
"github.com/apache/skywalking-banyandb/pkg/run"
flatbuffers "github.com/google/flatbuffers/go"
grpclib "google.golang.org/grpc"
- "io"
- "net"
)
type Server struct {
@@ -54,7 +55,7 @@ func (s *Server) FlagSet() *run.FlagSet {
func (s *Server) Validate() error {
return nil
}
-func init(){
+func init() {
//encoding.RegisterCodec(flatbuffers.FlatbuffersCodec{})
}
func (s *Server) Serve() error {
@@ -64,7 +65,6 @@ func (s *Server) Serve() error {
s.log.Fatal("Failed to listen", logger.Error(err))
}
-
s.ser = grpclib.NewServer(grpclib.CustomCodec(flatbuffers.FlatbuffersCodec{}))
//s.ser = grpclib.NewServer()
@@ -88,19 +88,19 @@ type TraceServer struct {
func (t *TraceServer) Write(TraceWriteServer v1.Trace_WriteServer) error {
for {
writeEntity, err := TraceWriteServer.Recv()
- fmt.Println(123, writeEntity)
if err == io.EOF {
return nil
}
if err != nil {
return err
}
+ fmt.Println(123, writeEntity)
t.writeData = append(t.writeData, writeEntity)
builder := flatbuffers.NewBuilder(0)
v1.WriteResponseStart(builder)
builder.Finish(v1.WriteResponseEnd(builder))
- if error := TraceWriteServer.Send(builder); error != nil {
- return error
+ if errSend := TraceWriteServer.Send(builder); errSend != nil {
+ return errSend
}
//writeEntity.Entity().Fields()
//writeEntity.MetaData(nil).Group()
diff --git a/banyand/liaison/grpc/grpc_test.go b/banyand/liaison/grpc/grpc_test.go
index a4c2ee4..7f865ee 100644
--- a/banyand/liaison/grpc/grpc_test.go
+++ b/banyand/liaison/grpc/grpc_test.go
@@ -19,14 +19,15 @@ package grpc
import (
"context"
- v1 "github.com/apache/skywalking-banyandb/api/fbs/v1"
- "github.com/apache/skywalking-banyandb/pkg/convert"
- flatbuffers "github.com/google/flatbuffers/go"
- "google.golang.org/grpc"
"io"
"log"
"testing"
"time"
+
+ v1 "github.com/apache/skywalking-banyandb/api/fbs/v1"
+ "github.com/apache/skywalking-banyandb/pkg/convert"
+ flatbuffers "github.com/google/flatbuffers/go"
+ "google.golang.org/grpc"
)
var serverAddr = "localhost:17912"
@@ -35,6 +36,7 @@ type ComponentBuilderFunc func(*flatbuffers.Builder)
type writeEntityBuilder struct {
*flatbuffers.Builder
}
+
func NewCriteriaBuilder() *writeEntityBuilder {
return &writeEntityBuilder{
flatbuffers.NewBuilder(1024),
@@ -77,7 +79,7 @@ func (b *writeEntityBuilder) BuildEntity(id string, binary []byte, items ...inte
}
}
-func (b *writeEntityBuilder) BuildField(val interface{}) flatbuffers.UOffsetT {
+func (b *writeEntityBuilder) BuildField(val interface{}) flatbuffers.UOffsetT {
var ValueTypeOffset flatbuffers.UOffsetT
var valType v1.ValueType
switch v := val.(type) {
@@ -159,7 +161,7 @@ func (b *writeEntityBuilder) Build(funcs ...ComponentBuilderFunc) *v1.WriteEntit
return v1.GetRootAsWriteEntity(buf, 0)
}
-func runWrite (writeEntity *v1.WriteEntity) (*flatbuffers.Builder, error) {
+func runWrite(writeEntity *v1.WriteEntity) (*flatbuffers.Builder, error) {
builder := flatbuffers.NewBuilder(0)
metaData := writeEntity.MetaData(nil)
entityValue := writeEntity.Entity(nil)
@@ -270,7 +272,7 @@ func runWrite (writeEntity *v1.WriteEntity) (*flatbuffers.Builder, error) {
position := v1.WriteEntityEnd(builder)
builder.Finish(position)
- return builder, nil
+ return builder, nil
}
func Test_grpc_write(t *testing.T) {
@@ -281,7 +283,7 @@ func Test_grpc_write(t *testing.T) {
defer conn.Close()
client := v1.NewTraceClient(conn)
- ctx, cancel := context.WithTimeout(context.Background(), 10 * time.Second)
+ ctx := context.Background()
b := NewCriteriaBuilder()
binary := byte(12)
entity := b.Build(
@@ -292,26 +294,29 @@ func Test_grpc_write(t *testing.T) {
if e != nil {
log.Fatalf("Failed to connect: %v", err)
}
- defer cancel()
stream, er := client.Write(ctx)
if er != nil {
log.Fatalf("%v.runWrite(_) = _, %v", client, err)
}
+ waitc := make(chan struct{})
go func() {
for {
- writeResponse, err := stream.Recv()
- if err == io.EOF {
+ writeResponse, errRecv := stream.Recv()
+ if errRecv == io.EOF {
// read done.
+ close(waitc)
return
}
- if err != nil {
+ if errRecv != nil {
log.Fatalf("Failed to receive data : %v", err)
}
- println( writeResponse)
+ println(writeResponse)
}
}()
- if error := stream.Send(builder); error != nil {
- log.Fatalf("Failed to send a note: %v", err)
+ if errSend := stream.Send(builder); errSend != nil {
+ log.Fatalf("Failed to send a note: %v", errSend)
}
+
stream.CloseSend()
+ <-waitc
}