You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by ha...@apache.org on 2021/06/29 04:09:47 UTC
[skywalking-banyandb] 10/11: update grpc test
This is an automated email from the ASF dual-hosted git repository.
hanahmily pushed a commit to branch feat/liasion
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
commit 45797cf2784cfafaef81f048ef4f99e61ad540a0
Author: Qiuxia Fan <fi...@outlook.com>
AuthorDate: Tue Jun 29 11:19:00 2021 +0800
update grpc test
---
banyand/liaison/grpc/grpc.go | 9 +++------
banyand/liaison/grpc/grpc_test.go | 9 ++-------
2 files changed, 5 insertions(+), 13 deletions(-)
diff --git a/banyand/liaison/grpc/grpc.go b/banyand/liaison/grpc/grpc.go
index 4156efb..e84f599 100644
--- a/banyand/liaison/grpc/grpc.go
+++ b/banyand/liaison/grpc/grpc.go
@@ -28,7 +28,6 @@ import (
grpclib "google.golang.org/grpc"
"io"
"net"
- "sync"
)
type Server struct {
@@ -36,7 +35,6 @@ type Server struct {
log *logger.Logger
ser *grpclib.Server
pipeline queue.Queue
- writeEntity *v1.WriteEntity
}
func NewServer(ctx context.Context, pipeline queue.Queue) *Server {
@@ -85,13 +83,12 @@ func (s *Server) GracefulStop() {
type TraceServer struct {
v1.UnimplementedTraceServer
writeData []*v1.WriteEntity
- mu sync.Mutex
}
func (t *TraceServer) Write(TraceWriteServer v1.Trace_WriteServer) error {
for {
writeEntity, err := TraceWriteServer.Recv()
- fmt.Println(writeEntity)
+ fmt.Println(123, writeEntity)
if err == io.EOF {
return nil
}
@@ -102,8 +99,8 @@ func (t *TraceServer) Write(TraceWriteServer v1.Trace_WriteServer) error {
builder := flatbuffers.NewBuilder(0)
v1.WriteResponseStart(builder)
builder.Finish(v1.WriteResponseEnd(builder))
- if err := TraceWriteServer.Send(builder); err != nil {
- return err
+ if error := TraceWriteServer.Send(builder); error != nil {
+ return error
}
//writeEntity.Entity().Fields()
//writeEntity.MetaData(nil).Group()
diff --git a/banyand/liaison/grpc/grpc_test.go b/banyand/liaison/grpc/grpc_test.go
index b7d1ef1..a4c2ee4 100644
--- a/banyand/liaison/grpc/grpc_test.go
+++ b/banyand/liaison/grpc/grpc_test.go
@@ -19,7 +19,6 @@ package grpc
import (
"context"
- "fmt"
v1 "github.com/apache/skywalking-banyandb/api/fbs/v1"
"github.com/apache/skywalking-banyandb/pkg/convert"
flatbuffers "github.com/google/flatbuffers/go"
@@ -282,14 +281,13 @@ func Test_grpc_write(t *testing.T) {
defer conn.Close()
client := v1.NewTraceClient(conn)
- ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
+ ctx, cancel := context.WithTimeout(context.Background(), 10 * time.Second)
b := NewCriteriaBuilder()
- binary := byte(123)
+ binary := byte(12)
entity := b.Build(
b.BuildEntity("entityId", []byte{binary}, "service_name", "endpoint_id"),
b.BuildMetaData("default", "trace"),
)
- fmt.Println(entity)
builder, e := runWrite(entity)
if e != nil {
log.Fatalf("Failed to connect: %v", err)
@@ -299,13 +297,11 @@ func Test_grpc_write(t *testing.T) {
if er != nil {
log.Fatalf("%v.runWrite(_) = _, %v", client, err)
}
- //waitc := make(chan struct{})
go func() {
for {
writeResponse, err := stream.Recv()
if err == io.EOF {
// read done.
- //close(waitc)
return
}
if err != nil {
@@ -318,5 +314,4 @@ func Test_grpc_write(t *testing.T) {
log.Fatalf("Failed to send a note: %v", err)
}
stream.CloseSend()
- //<-waitc
}