You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by ha...@apache.org on 2021/06/29 04:09:37 UTC

[skywalking-banyandb] branch feat/liasion created (now e1a5044)

This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a change to branch feat/liasion
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git.


      at e1a5044  Fix some test issues

This branch includes the following new commits:

     new 1822062  fix typo
     new 4834d50  write traces
     new a12f830  fix: entity
     new 67d8a41  update server info
     new 14a7fcb  Serialize Fields
     new 5e377c9  read trace
     new 542e42e  feat: add test for grpc
     new dfddd49  fix runWrite
     new 02817dd  merge
     new 45797cf  update grpc test
     new e1a5044  Fix some test issues

The 11 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


[skywalking-banyandb] 08/11: fix runWrite

Posted by ha...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch feat/liasion
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git

commit dfddd49875f8d4750b2f3fa061b12b231dfd0e57
Author: Qiuxia Fan <fi...@outlook.com>
AuthorDate: Mon Jun 28 18:08:55 2021 +0800

    fix runWrite
---
 api/fbs/Makefile                  |   2 +-
 api/fbs/v1/Trace_grpc.go          | 164 ++++++++++++++++++++
 banyand/liaison/grpc/grpc.go      | 169 +++++++--------------
 banyand/liaison/grpc/grpc_test.go | 305 ++++++++++++++++++++++++++++++++++----
 banyand/liaison/liaison.go        |   1 +
 banyand/series/service.go         |   8 +-
 pkg/convert/number.go             |   8 +
 7 files changed, 513 insertions(+), 144 deletions(-)

diff --git a/api/fbs/Makefile b/api/fbs/Makefile
index 660aae1..825c495 100644
--- a/api/fbs/Makefile
+++ b/api/fbs/Makefile
@@ -21,4 +21,4 @@ fbs := $(wildcard $(VERSION)/*.fbs)
 
 .PHONY: generate
 generate: $(fbs)
-	flatc --go --gen-onefile --go-namespace $(VERSION) -o $(VERSION) $^
+	flatc --go --gen-onefile --grpc --go-namespace $(VERSION) -o $(VERSION) $^
diff --git a/api/fbs/v1/Trace_grpc.go b/api/fbs/v1/Trace_grpc.go
new file mode 100644
index 0000000..7d2cb69
--- /dev/null
+++ b/api/fbs/v1/Trace_grpc.go
@@ -0,0 +1,164 @@
+//Generated by gRPC Go plugin
+//If you make any local changes, they will be lost
+//source: rpc
+
+package v1
+
+import (
+	context "context"
+
+	flatbuffers "github.com/google/flatbuffers/go"
+	grpc "google.golang.org/grpc"
+	"google.golang.org/grpc/codes"
+	"google.golang.org/grpc/status"
+)
+
+// Client API for Trace service
+type TraceClient interface {
+	Query(ctx context.Context, in *flatbuffers.Builder,
+		opts ...grpc.CallOption) (*QueryResponse, error)
+	Write(ctx context.Context,
+		opts ...grpc.CallOption) (Trace_WriteClient, error)
+}
+
+type traceClient struct {
+	cc grpc.ClientConnInterface
+}
+
+func NewTraceClient(cc grpc.ClientConnInterface) TraceClient {
+	return &traceClient{cc}
+}
+
+func (c *traceClient) Query(ctx context.Context, in *flatbuffers.Builder,
+	opts ...grpc.CallOption) (*QueryResponse, error) {
+	out := new(QueryResponse)
+	err := c.cc.Invoke(ctx, "/banyandb.v1.Trace/Query", in, out, opts...)
+	if err != nil {
+		return nil, err
+	}
+	return out, nil
+}
+
+func (c *traceClient) Write(ctx context.Context,
+	opts ...grpc.CallOption) (Trace_WriteClient, error) {
+	stream, err := c.cc.NewStream(ctx, &_Trace_serviceDesc.Streams[0], "/banyandb.v1.Trace/Write", opts...)
+	if err != nil {
+		return nil, err
+	}
+	x := &traceWriteClient{stream}
+	return x, nil
+}
+
+type Trace_WriteClient interface {
+	Send(*flatbuffers.Builder) error
+	Recv() (*WriteResponse, error)
+	grpc.ClientStream
+}
+
+type traceWriteClient struct {
+	grpc.ClientStream
+}
+
+func (x *traceWriteClient) Send(m *flatbuffers.Builder) error {
+	return x.ClientStream.SendMsg(m)
+}
+
+func (x *traceWriteClient) Recv() (*WriteResponse, error) {
+	m := new(WriteResponse)
+	if err := x.ClientStream.RecvMsg(m); err != nil {
+		return nil, err
+	}
+	return m, nil
+}
+
+// Server API for Trace service
+type TraceServer interface {
+	Query(context.Context, *EntityCriteria) (*flatbuffers.Builder, error)
+	Write(Trace_WriteServer) error
+	mustEmbedUnimplementedTraceServer()
+}
+
+type UnimplementedTraceServer struct {
+}
+
+func (UnimplementedTraceServer) Query(context.Context, *EntityCriteria) (*flatbuffers.Builder, error) {
+	return nil, status.Errorf(codes.Unimplemented, "method Query not implemented")
+}
+
+func (UnimplementedTraceServer) Write(Trace_WriteServer) error {
+	return status.Errorf(codes.Unimplemented, "method Write not implemented")
+}
+
+func (UnimplementedTraceServer) mustEmbedUnimplementedTraceServer() {}
+
+type UnsafeTraceServer interface {
+	mustEmbedUnimplementedTraceServer()
+}
+
+func RegisterTraceServer(s grpc.ServiceRegistrar, srv TraceServer) {
+	s.RegisterService(&_Trace_serviceDesc, srv)
+}
+
+func _Trace_Query_Handler(srv interface{}, ctx context.Context,
+	dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
+	in := new(EntityCriteria)
+	if err := dec(in); err != nil {
+		return nil, err
+	}
+	if interceptor == nil {
+		return srv.(TraceServer).Query(ctx, in)
+	}
+	info := &grpc.UnaryServerInfo{
+		Server:     srv,
+		FullMethod: "/banyandb.v1.Trace/Query",
+	}
+
+	handler := func(ctx context.Context, req interface{}) (interface{}, error) {
+		return srv.(TraceServer).Query(ctx, req.(*EntityCriteria))
+	}
+	return interceptor(ctx, in, info, handler)
+}
+func _Trace_Write_Handler(srv interface{}, stream grpc.ServerStream) error {
+	return srv.(TraceServer).Write(&traceWriteServer{stream})
+}
+
+type Trace_WriteServer interface {
+	Send(*flatbuffers.Builder) error
+	Recv() (*WriteEntity, error)
+	grpc.ServerStream
+}
+
+type traceWriteServer struct {
+	grpc.ServerStream
+}
+
+func (x *traceWriteServer) Send(m *flatbuffers.Builder) error {
+	return x.ServerStream.SendMsg(m)
+}
+
+func (x *traceWriteServer) Recv() (*WriteEntity, error) {
+	m := new(WriteEntity)
+	if err := x.ServerStream.RecvMsg(m); err != nil {
+		return nil, err
+	}
+	return m, nil
+}
+
+var _Trace_serviceDesc = grpc.ServiceDesc{
+	ServiceName: "banyandb.v1.Trace",
+	HandlerType: (*TraceServer)(nil),
+	Methods: []grpc.MethodDesc{
+		{
+			MethodName: "Query",
+			Handler:    _Trace_Query_Handler,
+		},
+	},
+	Streams: []grpc.StreamDesc{
+		{
+			StreamName:    "Write",
+			Handler:       _Trace_Write_Handler,
+			ServerStreams: true,
+			ClientStreams: true,
+		},
+	},
+}
diff --git a/banyand/liaison/grpc/grpc.go b/banyand/liaison/grpc/grpc.go
index 0e82346..4156efb 100644
--- a/banyand/liaison/grpc/grpc.go
+++ b/banyand/liaison/grpc/grpc.go
@@ -20,18 +20,15 @@ package grpc
 import (
 	"context"
 	"fmt"
-	"net"
-	"strconv"
-	"time"
-
-	flatbuffers "github.com/google/flatbuffers/go"
-	grpclib "google.golang.org/grpc"
-	"google.golang.org/grpc/encoding"
-
 	v1 "github.com/apache/skywalking-banyandb/api/fbs/v1"
 	"github.com/apache/skywalking-banyandb/banyand/queue"
 	"github.com/apache/skywalking-banyandb/pkg/logger"
 	"github.com/apache/skywalking-banyandb/pkg/run"
+	flatbuffers "github.com/google/flatbuffers/go"
+	grpclib "google.golang.org/grpc"
+	"io"
+	"net"
+	"sync"
 )
 
 type Server struct {
@@ -39,6 +36,7 @@ type Server struct {
 	log      *logger.Logger
 	ser      *grpclib.Server
 	pipeline queue.Queue
+	writeEntity *v1.WriteEntity
 }
 
 func NewServer(ctx context.Context, pipeline queue.Queue) *Server {
@@ -58,7 +56,9 @@ func (s *Server) FlagSet() *run.FlagSet {
 func (s *Server) Validate() error {
 	return nil
 }
-
+func init(){
+	//encoding.RegisterCodec(flatbuffers.FlatbuffersCodec{})
+}
 func (s *Server) Serve() error {
 	s.log = logger.GetLogger("grpc")
 	lis, err := net.Listen("tcp", s.addr)
@@ -66,8 +66,11 @@ func (s *Server) Serve() error {
 		s.log.Fatal("Failed to listen", logger.Error(err))
 	}
 
-	encoding.RegisterCodec(flatbuffers.FlatbuffersCodec{})
-	s.ser = grpclib.NewServer()
+
+	s.ser = grpclib.NewServer(grpclib.CustomCodec(flatbuffers.FlatbuffersCodec{}))
+	//s.ser = grpclib.NewServer()
+
+	v1.RegisterTraceServer(s.ser, &TraceServer{})
 
 	return s.ser.Serve(lis)
 }
@@ -77,113 +80,49 @@ func (s *Server) GracefulStop() {
 	s.ser.GracefulStop()
 }
 
-func WriteTraces(writeEntity *v1.WriteEntity) []byte {
-	fmt.Println("Write called...")
-	builder := flatbuffers.NewBuilder(0)
-	metaData := writeEntity.MetaData(nil)
-	entityValue := writeEntity.Entity(nil)
-	// Serialize MetaData
-	group, name := builder.CreateString(string(metaData.Group())), builder.CreateString(string(metaData.Name()))
-	v1.MetadataStart(builder)
-	v1.MetadataAddGroup(builder, group)
-	v1.MetadataAddName(builder, name)
-	v1.MetadataEnd(builder)
-	// Serialize Fields
-	v1.FieldStart(builder)
-	var fieldList []flatbuffers.UOffsetT
-	for i := 0; i < entityValue.FieldsLength(); i++ {
-		var f v1.Field
-		var str string
-		if ok := entityValue.Fields(&f, i); ok {
-			unionValueType := new(flatbuffers.Table)
-			if f.Value(unionValueType) {
-				valueType := f.ValueType()
-				if valueType == v1.ValueTypeString {
-					unionStr := new(v1.String)
-					unionStr.Init(unionValueType.Bytes, unionValueType.Pos)
-					v1.FieldAddValueType(builder, v1.ValueTypeString)
-					str = string(unionStr.Value())
-				} else if valueType == v1.ValueTypeInt {
-					unionInt := new(v1.Int)
-					unionInt.Init(unionValueType.Bytes, unionValueType.Pos)
-					v1.FieldAddValueType(builder, v1.ValueTypeInt)
-					field := flatbuffers.UOffsetT(unionInt.Value())
-					v1.FieldAddValue(builder, field)
-					fieldList = append(fieldList, field)
-				} else if valueType == v1.ValueTypeStringArray {
-					unionStrArray := new(v1.StringArray)
-					unionStrArray.Init(unionValueType.Bytes, unionValueType.Pos)
-					l := unionStrArray.ValueLength()
-					if l == 1 {
-						str += string(unionStrArray.Value(0))
-					} else {
-						str += "["
-						for j := 0; j < l; j++ {
-							str += string(unionStrArray.Value(j))
-						}
-						str += "]"
-					}
-				} else if valueType == v1.ValueTypeIntArray {
-					unionIntArray := new(v1.IntArray)
-					unionIntArray.Init(unionValueType.Bytes, unionValueType.Pos)
-					l := unionIntArray.ValueLength()
-					if l == 1 {
-						str += strconv.FormatInt(unionIntArray.Value(0), 10)
-					} else if l > 1{
-						str += "["
-						for j := 0; j < l; j++ {
-							str += strconv.FormatInt(unionIntArray.Value(j), 10)
-						}
-						str += "]"
-					}
-				}
-				if valueType == v1.ValueTypeIntArray || valueType == v1.ValueTypeStringArray || valueType == v1.ValueTypeString {
-					field := builder.CreateString(str)
-					v1.FieldAddValue(builder, field)
-					fieldList = append(fieldList, field)
-				}
-			}
-		}
-	}
-	v1.FieldEnd(builder)
-	// Serialize EntityValue
-	dataBinaryLength := entityValue.DataBinaryLength()
-	v1.EntityStartDataBinaryVector(builder, dataBinaryLength)
-	for i := dataBinaryLength; i >= 0; i-- {
-		builder.PrependByte(byte(i))
-	}
-	dataBinary := builder.EndVector(dataBinaryLength)
-	entityId := builder.CreateString(string(entityValue.EntityId()))
-	v1.EntityValueStart(builder)
-	v1.EntityValueAddEntityId(builder, entityId)
-	time := uint64(time.Now().UnixNano())
-	v1.EntityValueAddTimestampNanoseconds(builder, time)
-	v1.EntityValueAddDataBinary(builder, dataBinary)
-	v1.EntityValueStartFieldsVector(builder, len(fieldList))
-	for val := range fieldList {
-		builder.PrependUOffsetT(flatbuffers.UOffsetT(val))
-	}
-	fields := builder.EndVector(len(fieldList))
-	v1.EntityValueAddFields(builder, fields)
-	v1.EntityValueEnd(builder)
-	trace := v1.WriteEntityEnd(builder)
-
-	builder.Finish(trace)
+//var _ gomock.TestHelper = (*TraceServer)(nil)
 
-	return builder.Bytes[builder.Head():]
+type TraceServer struct {
+	v1.UnimplementedTraceServer
+	writeData []*v1.WriteEntity
+	mu         sync.Mutex
 }
 
-type ComponentBuilderFunc func(*flatbuffers.Builder)
-
-func ReadTraces(funcs ...ComponentBuilderFunc) *v1.Entity {
-	b := flatbuffers.NewBuilder(1024)
-	v1.EntityStart(b)
-	for _, fun := range funcs {
-		fun(b)
+func (t *TraceServer) Write(TraceWriteServer v1.Trace_WriteServer) error {
+	for {
+		writeEntity, err := TraceWriteServer.Recv()
+		fmt.Println(writeEntity)
+		if err == io.EOF {
+			return nil
+		}
+		if err != nil {
+			return err
+		}
+		t.writeData = append(t.writeData, writeEntity)
+		builder := flatbuffers.NewBuilder(0)
+		v1.WriteResponseStart(builder)
+		builder.Finish(v1.WriteResponseEnd(builder))
+		if err := TraceWriteServer.Send(builder); err != nil {
+			return err
+		}
+		//writeEntity.Entity().Fields()
+		//writeEntity.MetaData(nil).Group()
+		//serviceID+instanceID
+		//seriesID := hash(fieds, f1, f2)
+		//shardID := shardingFunc(seriesID, shardNum)
+		//queue
+		//for _, l := range t.writeData {
+		//	if err := TraceWriteServer.Send(l); err != nil {
+		//		return err
+		//	}
+		//}
 	}
-	entityOffset := v1.EntityEnd(b)
-	b.Finish(entityOffset)
+}
+
+func (t *TraceServer) Query(ctx context.Context, entityCriteria *v1.EntityCriteria) (*flatbuffers.Builder, error) {
+	b := flatbuffers.NewBuilder(0)
+	v1.EntityCriteriaStart(b)
+	b.Finish(v1.EntityCriteriaEnd(b))
 
-	buf := b.Bytes[b.Head():]
-	return v1.GetRootAsEntity(buf, 0)
+	return b, nil
 }
diff --git a/banyand/liaison/grpc/grpc_test.go b/banyand/liaison/grpc/grpc_test.go
index a5bf5ca..b7d1ef1 100644
--- a/banyand/liaison/grpc/grpc_test.go
+++ b/banyand/liaison/grpc/grpc_test.go
@@ -15,28 +15,140 @@
 // specific language governing permissions and limitations
 // under the License.
 
-package grpc_test
+package grpc
 
 import (
-	flatbuffers "github.com/google/flatbuffers/go"
-
+	"context"
+	"fmt"
 	v1 "github.com/apache/skywalking-banyandb/api/fbs/v1"
-	"github.com/apache/skywalking-banyandb/banyand/liaison/grpc"
-	"github.com/stretchr/testify/assert"
+	"github.com/apache/skywalking-banyandb/pkg/convert"
+	flatbuffers "github.com/google/flatbuffers/go"
+	"google.golang.org/grpc"
+	"io"
+	"log"
 	"testing"
+	"time"
 )
 
+var serverAddr = "localhost:17912"
+
 type ComponentBuilderFunc func(*flatbuffers.Builder)
-type criteriaBuilder struct {
+type writeEntityBuilder struct {
 	*flatbuffers.Builder
 }
-func NewCriteriaBuilder() *criteriaBuilder {
-	return &criteriaBuilder{
+func NewCriteriaBuilder() *writeEntityBuilder {
+	return &writeEntityBuilder{
 		flatbuffers.NewBuilder(1024),
 	}
 }
+func (b *writeEntityBuilder) BuildMetaData(group, name string) ComponentBuilderFunc {
+	g, n := b.Builder.CreateString(group), b.Builder.CreateString(name)
+	v1.MetadataStart(b.Builder)
+	v1.MetadataAddGroup(b.Builder, g)
+	v1.MetadataAddName(b.Builder, n)
+	metadata := v1.MetadataEnd(b.Builder)
+	return func(b *flatbuffers.Builder) {
+		v1.WriteEntityAddMetaData(b, metadata)
+	}
+}
+
+func (b *writeEntityBuilder) BuildEntity(id string, binary []byte, items ...interface{}) ComponentBuilderFunc {
+	entityId := b.Builder.CreateString(id)
+	binaryOffset := b.Builder.CreateByteVector(binary)
+	l := len(items)
+	var fieldOffsets []flatbuffers.UOffsetT
+	for i := 0; i < l; i++ {
+		o := b.BuildField(items[i])
+		fieldOffsets = append(fieldOffsets, o)
+	}
+	v1.EntityValueStartFieldsVector(b.Builder, len(fieldOffsets))
+	for i := 0; i < len(fieldOffsets); i++ {
+		b.PrependUOffsetT(fieldOffsets[i])
+	}
+	fields := b.EndVector(len(fieldOffsets))
+	v1.EntityValueStart(b.Builder)
+	v1.EntityValueAddEntityId(b.Builder, entityId)
+	t := uint64(time.Now().UnixNano())
+	v1.EntityValueAddTimestampNanoseconds(b.Builder, t)
+	v1.EntityValueAddDataBinary(b.Builder, binaryOffset)
+	v1.EntityValueAddFields(b.Builder, fields)
+	entity := v1.EntityValueEnd(b.Builder)
+	return func(b *flatbuffers.Builder) {
+		v1.WriteEntityAddEntity(b, entity)
+	}
+}
+
+func (b *writeEntityBuilder) BuildField(val interface{}) flatbuffers.UOffsetT  {
+	var ValueTypeOffset flatbuffers.UOffsetT
+	var valType v1.ValueType
+	switch v := val.(type) {
+	case int:
+		ValueTypeOffset = b.buildInt(int64(v))
+		valType = v1.ValueTypeInt
+	case []int:
+		ValueTypeOffset = b.buildInt(convert.IntToInt64(v...)...)
+		valType = v1.ValueTypeIntArray
+	case int64:
+		ValueTypeOffset = b.buildInt(v)
+		valType = v1.ValueTypeInt
+	case []int64:
+		ValueTypeOffset = b.buildInt(v...)
+		valType = v1.ValueTypeIntArray
+	case string:
+		ValueTypeOffset = b.buildStrValueType(v)
+		valType = v1.ValueTypeString
+	case []string:
+		ValueTypeOffset = b.buildStrValueType(v...)
+		valType = v1.ValueTypeStringArray
+	default:
+		panic("not supported values")
+	}
+
+	v1.FieldStart(b.Builder)
+	v1.FieldAddValue(b.Builder, ValueTypeOffset)
+	v1.FieldAddValueType(b.Builder, valType)
+	return v1.FieldEnd(b.Builder)
+}
+
+func (b *writeEntityBuilder) buildStrValueType(values ...string) flatbuffers.UOffsetT {
+	var strOffsets []flatbuffers.UOffsetT
+	for i := 0; i < len(values); i++ {
+		strOffsets = append(strOffsets, b.CreateString(values[i]))
+	}
+	v1.StringArrayStartValueVector(b.Builder, len(values))
+	for i := 0; i < len(strOffsets); i++ {
+		b.Builder.PrependUOffsetT(strOffsets[i])
+	}
+	int64Arr := b.Builder.EndVector(len(values))
+	v1.IntArrayStart(b.Builder)
+	v1.IntArrayAddValue(b.Builder, int64Arr)
+	return v1.IntArrayEnd(b.Builder)
+}
+
+func (b *writeEntityBuilder) buildInt(values ...int64) flatbuffers.UOffsetT {
+	v1.IntArrayStartValueVector(b.Builder, len(values))
+	for i := 0; i < len(values); i++ {
+		b.Builder.PrependInt64(values[i])
+	}
+	int64Arr := b.Builder.EndVector(len(values))
 
-func (b *criteriaBuilder) Build(funcs ...ComponentBuilderFunc) *v1.WriteEntity {
+	v1.IntArrayStart(b.Builder)
+	v1.IntArrayAddValue(b.Builder, int64Arr)
+	return v1.IntArrayEnd(b.Builder)
+}
+
+func (b *writeEntityBuilder) BuildDataBinary(binary []byte) flatbuffers.UOffsetT {
+	dataBinaryLength := len(binary)
+	v1.EntityStartDataBinaryVector(b.Builder, dataBinaryLength)
+	for i := dataBinaryLength; i >= 0; i-- {
+		b.Builder.PrependByte(byte(i))
+	}
+	dataBinaryOffset := b.Builder.EndVector(dataBinaryLength)
+
+	return dataBinaryOffset
+}
+
+func (b *writeEntityBuilder) Build(funcs ...ComponentBuilderFunc) *v1.WriteEntity {
 	v1.WriteEntityStart(b.Builder)
 	for _, fun := range funcs {
 		fun(b.Builder)
@@ -47,21 +159,164 @@ func (b *criteriaBuilder) Build(funcs ...ComponentBuilderFunc) *v1.WriteEntity {
 	buf := b.Bytes[b.Head():]
 	return v1.GetRootAsWriteEntity(buf, 0)
 }
-func BenchmarkWriteTraces(t *testing.T) {
-	tester := assert.New(t)
-	builder := NewCriteriaBuilder()
-	entity := builder.Build(
-	)
-	res := grpc.WriteTraces(entity)
-	//tester.NoError(err)
-	tester.NotNil(res)
-	//tester.NoError(plan.Validate())
+
+func runWrite (writeEntity *v1.WriteEntity) (*flatbuffers.Builder, error) {
+	builder := flatbuffers.NewBuilder(0)
+	metaData := writeEntity.MetaData(nil)
+	entityValue := writeEntity.Entity(nil)
+	// Serialize MetaData
+	group, name := builder.CreateString(string(metaData.Group())), builder.CreateString(string(metaData.Name()))
+	v1.MetadataStart(builder)
+	v1.MetadataAddGroup(builder, group)
+	v1.MetadataAddName(builder, name)
+	v1.MetadataEnd(builder)
+	// Serialize Fields
+	var fieldList []flatbuffers.UOffsetT
+	for i := 0; i < 1; i++ {
+		var field v1.Field
+		var str string
+		if ok := entityValue.Fields(&field, i); ok {
+			unionValueType := new(flatbuffers.Table)
+			if field.Value(unionValueType) {
+				valueType := field.ValueType()
+				if valueType == v1.ValueTypeString {
+					unionStr := new(v1.String)
+					unionStr.Init(unionValueType.Bytes, unionValueType.Pos)
+					v1.FieldStart(builder)
+					v1.FieldAddValueType(builder, v1.ValueTypeString)
+					v1.FieldEnd(builder)
+					str = string(unionStr.Value())
+					f := builder.CreateString(str)
+					fieldList = append(fieldList, f)
+				} else if valueType == v1.ValueTypeInt {
+					unionInt := new(v1.Int)
+					unionInt.Init(unionValueType.Bytes, unionValueType.Pos)
+					v1.FieldStart(builder)
+					v1.FieldAddValueType(builder, v1.ValueTypeInt)
+					v1.FieldEnd(builder)
+					f := flatbuffers.UOffsetT(unionInt.Value())
+					//v1.IntAddValue(builder, int64(f))
+					fieldList = append(fieldList, f)
+				} else if valueType == v1.ValueTypeStringArray {
+					unionStrArray := new(v1.StringArray)
+					unionStrArray.Init(unionValueType.Bytes, unionValueType.Pos)
+					v1.FieldStart(builder)
+					v1.FieldAddValueType(builder, v1.ValueTypeStringArray)
+					v1.FieldEnd(builder)
+					l := unionStrArray.ValueLength()
+					var offsets []flatbuffers.UOffsetT
+					for j := 0; j < l; j++ {
+						v := builder.CreateString(string(unionStrArray.Value(j)))
+						v1.StringArrayStart(builder)
+						v1.StringArrayAddValue(builder, v)
+						offset := v1.StringArrayEnd(builder)
+						offsets = append(offsets, offset)
+					}
+					v1.StringArrayStartValueVector(builder, l)
+					for o := range offsets {
+						builder.PrependUOffsetT(flatbuffers.UOffsetT(o))
+					}
+					f := builder.EndVector(l)
+					fieldList = append(fieldList, f)
+				} else if valueType == v1.ValueTypeIntArray {
+					unionIntArray := new(v1.IntArray)
+					unionIntArray.Init(unionValueType.Bytes, unionValueType.Pos)
+					v1.FieldStart(builder)
+					v1.FieldAddValueType(builder, v1.ValueTypeIntArray)
+					v1.FieldEnd(builder)
+					l := unionIntArray.ValueLength()
+					var offsets []flatbuffers.UOffsetT
+					for j := 0; j < l; j++ {
+						v1.IntArrayStart(builder)
+						v1.IntArrayAddValue(builder, flatbuffers.UOffsetT(unionIntArray.Value(j)))
+						offset := v1.StringArrayEnd(builder)
+						offsets = append(offsets, offset)
+					}
+					v1.IntArrayStartValueVector(builder, len(offsets))
+					for o := range offsets {
+						builder.PrependUOffsetT(flatbuffers.UOffsetT(o))
+					}
+					f := builder.EndVector(len(offsets))
+					fieldList = append(fieldList, f)
+				}
+			}
+		}
+	}
+	v1.FieldStart(builder)
+	for field := range fieldList {
+		v1.FieldAddValue(builder, flatbuffers.UOffsetT(field))
+	}
+	v1.FieldEnd(builder)
+	// Serialize EntityValue
+	dataBinaryLength := 10
+	v1.EntityStartDataBinaryVector(builder, dataBinaryLength)
+	for i := dataBinaryLength; i >= 0; i-- {
+		builder.PrependByte(byte(i))
+	}
+	dataBinaryP := builder.EndVector(dataBinaryLength)
+	v1.EntityValueStartFieldsVector(builder, len(fieldList))
+	for val := range fieldList {
+		builder.PrependUOffsetT(flatbuffers.UOffsetT(val))
+	}
+	fieldsP := builder.EndVector(len(fieldList))
+	entityId := builder.CreateString(string(entityValue.EntityId()))
+	v1.EntityValueStart(builder)
+	v1.EntityValueAddEntityId(builder, entityId)
+	time := uint64(time.Now().UnixNano())
+	v1.EntityValueAddTimestampNanoseconds(builder, time)
+	v1.EntityValueAddDataBinary(builder, dataBinaryP)
+	v1.EntityValueAddFields(builder, fieldsP)
+	v1.EntityValueEnd(builder)
+	v1.WriteEntityStart(builder)
+	position := v1.WriteEntityEnd(builder)
+	builder.Finish(position)
+
+	return  builder, nil
 }
 
-func BenchmarkReadTraces(t *testing.T) {
-	tester := assert.New(t)
-	builder := NewCriteriaBuilder()
-	entity := builder.Build()
-	res := grpc.ReadTraces(entity)
-	tester.NotNil(res)
-}
\ No newline at end of file
+func Test_grpc_write(t *testing.T) {
+	conn, err := grpc.Dial(serverAddr, grpc.WithInsecure(), grpc.WithDefaultCallOptions(grpc.CustomCodecCallOption{Codec: flatbuffers.FlatbuffersCodec{}}))
+	if err != nil {
+		log.Fatalf("Failed to connect: %v", err)
+	}
+	defer conn.Close()
+
+	client := v1.NewTraceClient(conn)
+	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
+	b := NewCriteriaBuilder()
+	binary := byte(123)
+	entity := b.Build(
+		b.BuildEntity("entityId", []byte{binary}, "service_name", "endpoint_id"),
+		b.BuildMetaData("default", "trace"),
+	)
+	fmt.Println(entity)
+	builder, e := runWrite(entity)
+	if e != nil {
+		log.Fatalf("Failed to connect: %v", err)
+	}
+	defer cancel()
+	stream, er := client.Write(ctx)
+	if er != nil {
+		log.Fatalf("%v.runWrite(_) = _, %v", client, err)
+	}
+	//waitc := make(chan struct{})
+	go func() {
+		for {
+			writeResponse, err := stream.Recv()
+			if err == io.EOF {
+				// read done.
+				//close(waitc)
+				return
+			}
+			if err != nil {
+				log.Fatalf("Failed to receive data : %v", err)
+			}
+			println( writeResponse)
+		}
+	}()
+	if error := stream.Send(builder); error != nil {
+		log.Fatalf("Failed to send a note: %v", err)
+	}
+	stream.CloseSend()
+	//<-waitc
+}
diff --git a/banyand/liaison/liaison.go b/banyand/liaison/liaison.go
index 76e8d14..117b361 100644
--- a/banyand/liaison/liaison.go
+++ b/banyand/liaison/liaison.go
@@ -19,6 +19,7 @@ package liaison
 
 import (
 	"context"
+
 	"github.com/apache/skywalking-banyandb/banyand/liaison/grpc"
 	"github.com/apache/skywalking-banyandb/banyand/queue"
 	"github.com/apache/skywalking-banyandb/pkg/run"
diff --git a/banyand/series/service.go b/banyand/series/service.go
index a62c7d0..beb3361 100644
--- a/banyand/series/service.go
+++ b/banyand/series/service.go
@@ -20,6 +20,10 @@ package series
 import (
 	"bytes"
 	"context"
+	"time"
+
+	"go.uber.org/multierr"
+
 	"github.com/apache/skywalking-banyandb/api/common"
 	"github.com/apache/skywalking-banyandb/api/data"
 	v1 "github.com/apache/skywalking-banyandb/api/fbs/v1"
@@ -27,14 +31,12 @@ import (
 	"github.com/apache/skywalking-banyandb/banyand/series/schema/sw"
 	"github.com/apache/skywalking-banyandb/banyand/storage"
 	"github.com/apache/skywalking-banyandb/pkg/run"
-	"go.uber.org/multierr"
-	"time"
 )
 
 var _ Service = (*service)(nil)
 
 type service struct {
-	db storage.Database
+	db   storage.Database
 	addr string
 }
 
diff --git a/pkg/convert/number.go b/pkg/convert/number.go
index 9656912..0dbc4ca 100644
--- a/pkg/convert/number.go
+++ b/pkg/convert/number.go
@@ -34,3 +34,11 @@ func Uint32ToBytes(u uint32) []byte {
 func BytesToUint64(b []byte) uint64 {
 	return binary.BigEndian.Uint64(b)
 }
+
+func IntToInt64(numbers ...int) []int64 {
+	var arr []int64
+	for i := 0; i < len(numbers); i++ {
+		arr = append(arr, int64(numbers[i]))
+	}
+	return arr
+}
\ No newline at end of file

[skywalking-banyandb] 07/11: feat: add test for grpc

Posted by ha...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch feat/liasion
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git

commit 542e42eb482dc01e6be8f8d1f4f8d2e8c14e2e87
Author: Qiuxia Fan <fi...@outlook.com>
AuthorDate: Tue Jun 22 14:08:52 2021 +0800

    feat: add test for grpc
---
 banyand/liaison/grpc/grpc.go      |  4 +++-
 banyand/liaison/grpc/grpc_test.go | 49 +++++++++++++++++++++++++++++++++++++++
 2 files changed, 52 insertions(+), 1 deletion(-)

diff --git a/banyand/liaison/grpc/grpc.go b/banyand/liaison/grpc/grpc.go
index 92dcc48..0e82346 100644
--- a/banyand/liaison/grpc/grpc.go
+++ b/banyand/liaison/grpc/grpc.go
@@ -77,9 +77,11 @@ func (s *Server) GracefulStop() {
 	s.ser.GracefulStop()
 }
 
-func WriteTraces(entityValue *v1.EntityValue, metaData *v1.Metadata) []byte {
+func WriteTraces(writeEntity *v1.WriteEntity) []byte {
 	fmt.Println("Write called...")
 	builder := flatbuffers.NewBuilder(0)
+	metaData := writeEntity.MetaData(nil)
+	entityValue := writeEntity.Entity(nil)
 	// Serialize MetaData
 	group, name := builder.CreateString(string(metaData.Group())), builder.CreateString(string(metaData.Name()))
 	v1.MetadataStart(builder)
diff --git a/banyand/liaison/grpc/grpc_test.go b/banyand/liaison/grpc/grpc_test.go
index 761af64..a5bf5ca 100644
--- a/banyand/liaison/grpc/grpc_test.go
+++ b/banyand/liaison/grpc/grpc_test.go
@@ -16,3 +16,52 @@
 // under the License.
 
 package grpc_test
+
+import (
+	flatbuffers "github.com/google/flatbuffers/go"
+
+	v1 "github.com/apache/skywalking-banyandb/api/fbs/v1"
+	"github.com/apache/skywalking-banyandb/banyand/liaison/grpc"
+	"github.com/stretchr/testify/assert"
+	"testing"
+)
+
+type ComponentBuilderFunc func(*flatbuffers.Builder)
+type criteriaBuilder struct {
+	*flatbuffers.Builder
+}
+func NewCriteriaBuilder() *criteriaBuilder {
+	return &criteriaBuilder{
+		flatbuffers.NewBuilder(1024),
+	}
+}
+
+func (b *criteriaBuilder) Build(funcs ...ComponentBuilderFunc) *v1.WriteEntity {
+	v1.WriteEntityStart(b.Builder)
+	for _, fun := range funcs {
+		fun(b.Builder)
+	}
+	entityOffset := v1.WriteEntityEnd(b.Builder)
+	b.Builder.Finish(entityOffset)
+
+	buf := b.Bytes[b.Head():]
+	return v1.GetRootAsWriteEntity(buf, 0)
+}
+func BenchmarkWriteTraces(t *testing.T) {
+	tester := assert.New(t)
+	builder := NewCriteriaBuilder()
+	entity := builder.Build(
+	)
+	res := grpc.WriteTraces(entity)
+	//tester.NoError(err)
+	tester.NotNil(res)
+	//tester.NoError(plan.Validate())
+}
+
+func BenchmarkReadTraces(t *testing.T) {
+	tester := assert.New(t)
+	builder := NewCriteriaBuilder()
+	entity := builder.Build()
+	res := grpc.ReadTraces(entity)
+	tester.NotNil(res)
+}
\ No newline at end of file

[skywalking-banyandb] 02/11: write traces

Posted by ha...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch feat/liasion
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git

commit 4834d500998e24fa3bc92328e0c722edaf01b7f4
Author: Qiuxia Fan <fi...@outlook.com>
AuthorDate: Thu Jun 17 21:46:41 2021 +0800

    write traces
---
 api/fbs/v1/write.fbs         |  2 ++
 banyand/liaison/grpc/grpc.go | 47 ++++++++++++++++++++++++++++++++++++++++++++
 2 files changed, 49 insertions(+)

diff --git a/api/fbs/v1/write.fbs b/api/fbs/v1/write.fbs
index 57e1378..794bbbd 100644
--- a/api/fbs/v1/write.fbs
+++ b/api/fbs/v1/write.fbs
@@ -70,3 +70,5 @@ table WriteEntity {
 }
 
 table WriteResponse {}
+
+root_type WriteEntity;
diff --git a/banyand/liaison/grpc/grpc.go b/banyand/liaison/grpc/grpc.go
index cdf7e71..ae3457d 100644
--- a/banyand/liaison/grpc/grpc.go
+++ b/banyand/liaison/grpc/grpc.go
@@ -25,6 +25,7 @@ import (
 	grpclib "google.golang.org/grpc"
 	"google.golang.org/grpc/encoding"
 
+	v1 "github.com/apache/skywalking-banyandb/api/fbs/v1"
 	"github.com/apache/skywalking-banyandb/banyand/queue"
 	"github.com/apache/skywalking-banyandb/pkg/logger"
 	"github.com/apache/skywalking-banyandb/pkg/run"
@@ -72,3 +73,49 @@ func (s *Server) GracefulStop() {
 	s.log.Info("stopping")
 	s.ser.GracefulStop()
 }
+
+func (s *Server) WriteTraces(context context.Context, in *v1.WriteEntity) (*flatbuffers.Builder, error) {
+	s.log.Info("Write called...")
+	builder := flatbuffers.NewBuilder(0)
+	// Serialize MetaData
+	name := builder.CreateString("name")
+	group := builder.CreateString("group")
+	v1.MetadataStart(builder)
+	v1.MetadataAddGroup(builder, group)
+	v1.MetadataAddName(builder, name)
+	v1.MetadataEnd(builder)
+	// Serialize Field
+	v1.FieldStart(builder)
+	v1.FieldAddValueType(builder, v1.ValueTypeString)
+	field := builder.CreateString("test")
+	v1.FieldAddValue(builder, field)
+	v1.FieldEnd(builder)
+	// Serialize WriteEntity
+	dataBinaryLength := v1.EntityValue.DataBinaryLength(v1.EntityValue{})
+	v1.EntityStartDataBinaryVector(builder, dataBinaryLength)
+	for i := dataBinaryLength; i >= 0; i-- {
+		builder.PrependByte(byte(i))
+	}
+	dataBinary := builder.EndVector(dataBinaryLength)
+	entityId := builder.CreateString("id")
+	v1.EntityValueStart(builder)
+	v1.EntityValueAddEntityId(builder, entityId)
+	v1.EntityValueAddTimestampNanoseconds(builder, 1613526708000)
+	v1.EntityValueAddDataBinary(builder, dataBinary)
+	v1.EntityValueStartFieldsVector(builder, 1)
+	builder.PrependUOffsetT(field)
+	fields := builder.EndVector(1)
+	v1.EntityValueAddFields(builder, fields)
+	v1.EntityValueEnd(builder)
+
+	builder.Finish(v1.WriteResponseEnd(builder))
+
+	return builder, nil
+}
+
+func (s *Server) FetchTraces(ctx context.Context, in *v1.EntityCriteria) (*flatbuffers.Builder, error) {
+	s.log.Info("Fetch called...")
+	builder := flatbuffers.NewBuilder(0)
+
+	return builder, nil
+}

[skywalking-banyandb] 03/11: fix: entity

Posted by ha...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch feat/liasion
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git

commit a12f830e39e07e6653528db7b6bbd52a66eabde0
Author: Qiuxia Fan <fi...@outlook.com>
AuthorDate: Fri Jun 18 15:36:06 2021 +0800

    fix: entity
---
 api/fbs/v1/write.fbs         |  2 --
 banyand/liaison/grpc/grpc.go | 30 ++++++++++++++++++++----------
 2 files changed, 20 insertions(+), 12 deletions(-)

diff --git a/api/fbs/v1/write.fbs b/api/fbs/v1/write.fbs
index 794bbbd..57e1378 100644
--- a/api/fbs/v1/write.fbs
+++ b/api/fbs/v1/write.fbs
@@ -70,5 +70,3 @@ table WriteEntity {
 }
 
 table WriteResponse {}
-
-root_type WriteEntity;
diff --git a/banyand/liaison/grpc/grpc.go b/banyand/liaison/grpc/grpc.go
index ae3457d..c9b804d 100644
--- a/banyand/liaison/grpc/grpc.go
+++ b/banyand/liaison/grpc/grpc.go
@@ -19,11 +19,12 @@ package grpc
 
 import (
 	"context"
-	"net"
-
+	"fmt"
 	flatbuffers "github.com/google/flatbuffers/go"
 	grpclib "google.golang.org/grpc"
 	"google.golang.org/grpc/encoding"
+	"net"
+	"time"
 
 	v1 "github.com/apache/skywalking-banyandb/api/fbs/v1"
 	"github.com/apache/skywalking-banyandb/banyand/queue"
@@ -74,48 +75,57 @@ func (s *Server) GracefulStop() {
 	s.ser.GracefulStop()
 }
 
-func (s *Server) WriteTraces(context context.Context, in *v1.WriteEntity) (*flatbuffers.Builder, error) {
+func (s *Server) WriteTraces(entityValue *v1.EntityValue, metaData *v1.Metadata) (*flatbuffers.Builder, error) {
 	s.log.Info("Write called...")
 	builder := flatbuffers.NewBuilder(0)
 	// Serialize MetaData
-	name := builder.CreateString("name")
-	group := builder.CreateString("group")
+	group, name := builder.CreateString(string(metaData.Group())), builder.CreateString(string(metaData.Name()))
 	v1.MetadataStart(builder)
 	v1.MetadataAddGroup(builder, group)
 	v1.MetadataAddName(builder, name)
 	v1.MetadataEnd(builder)
 	// Serialize Field
 	v1.FieldStart(builder)
+	//for i := 0; i < entityValue.FieldsLength(); i++ {
+	//	v1.FieldAddValueType(builder, v1.ValueTypeString)
+	//	field := builder.CreateString("test")
+	//	v1.FieldAddValue(builder, field)
+	//}
 	v1.FieldAddValueType(builder, v1.ValueTypeString)
 	field := builder.CreateString("test")
+	fmt.Println(field)
 	v1.FieldAddValue(builder, field)
 	v1.FieldEnd(builder)
 	// Serialize WriteEntity
-	dataBinaryLength := v1.EntityValue.DataBinaryLength(v1.EntityValue{})
+	dataBinaryLength := entityValue.DataBinaryLength()
 	v1.EntityStartDataBinaryVector(builder, dataBinaryLength)
 	for i := dataBinaryLength; i >= 0; i-- {
 		builder.PrependByte(byte(i))
 	}
 	dataBinary := builder.EndVector(dataBinaryLength)
-	entityId := builder.CreateString("id")
+	entityId := builder.CreateString(string(entityValue.EntityId()))
 	v1.EntityValueStart(builder)
 	v1.EntityValueAddEntityId(builder, entityId)
-	v1.EntityValueAddTimestampNanoseconds(builder, 1613526708000)
+	time := uint64(time.Now().UnixNano())
+	v1.EntityValueAddTimestampNanoseconds(builder, time)
 	v1.EntityValueAddDataBinary(builder, dataBinary)
 	v1.EntityValueStartFieldsVector(builder, 1)
 	builder.PrependUOffsetT(field)
 	fields := builder.EndVector(1)
 	v1.EntityValueAddFields(builder, fields)
 	v1.EntityValueEnd(builder)
+	trace := v1.WriteEntityEnd(builder)
 
-	builder.Finish(v1.WriteResponseEnd(builder))
+	builder.Finish(trace)
 
 	return builder, nil
 }
 
-func (s *Server) FetchTraces(ctx context.Context, in *v1.EntityCriteria) (*flatbuffers.Builder, error) {
+func (s *Server) FetchTraces(in *v1.EntityCriteria) (*flatbuffers.Builder, error) {
 	s.log.Info("Fetch called...")
 	builder := flatbuffers.NewBuilder(0)
+	//buf := builder.FinishedBytes()
+	//entityCriteria := v1.GetRootAsEntityCriteria(buf, 0)
 
 	return builder, nil
 }

[skywalking-banyandb] 04/11: update server info

Posted by ha...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch feat/liasion
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git

commit 67d8a41ee7a80ad4adfca6852f2c245e5a3ccdb3
Author: Qiuxia Fan <fi...@outlook.com>
AuthorDate: Fri Jun 18 16:56:18 2021 +0800

    update server info
---
 banyand/liaison/grpc/grpc.go |  5 +++--
 banyand/series/service.go    | 17 +++++++++--------
 2 files changed, 12 insertions(+), 10 deletions(-)

diff --git a/banyand/liaison/grpc/grpc.go b/banyand/liaison/grpc/grpc.go
index c9b804d..58014cf 100644
--- a/banyand/liaison/grpc/grpc.go
+++ b/banyand/liaison/grpc/grpc.go
@@ -20,11 +20,12 @@ package grpc
 import (
 	"context"
 	"fmt"
+	"net"
+	"time"
+
 	flatbuffers "github.com/google/flatbuffers/go"
 	grpclib "google.golang.org/grpc"
 	"google.golang.org/grpc/encoding"
-	"net"
-	"time"
 
 	v1 "github.com/apache/skywalking-banyandb/api/fbs/v1"
 	"github.com/apache/skywalking-banyandb/banyand/queue"
diff --git a/banyand/series/service.go b/banyand/series/service.go
index 6c6a6d7..a62c7d0 100644
--- a/banyand/series/service.go
+++ b/banyand/series/service.go
@@ -20,10 +20,6 @@ package series
 import (
 	"bytes"
 	"context"
-	"time"
-
-	"go.uber.org/multierr"
-
 	"github.com/apache/skywalking-banyandb/api/common"
 	"github.com/apache/skywalking-banyandb/api/data"
 	v1 "github.com/apache/skywalking-banyandb/api/fbs/v1"
@@ -31,12 +27,15 @@ import (
 	"github.com/apache/skywalking-banyandb/banyand/series/schema/sw"
 	"github.com/apache/skywalking-banyandb/banyand/storage"
 	"github.com/apache/skywalking-banyandb/pkg/run"
+	"go.uber.org/multierr"
+	"time"
 )
 
 var _ Service = (*service)(nil)
 
 type service struct {
 	db storage.Database
+	addr string
 }
 
 //Methods for query objects in the schema
@@ -133,17 +132,19 @@ func (s *service) ScanEntity(traceSeries common.Metadata, startTime, endTime uin
 }
 
 func (s *service) Name() string {
-	panic("implement me")
+	return "series"
 }
 
 func (s *service) FlagSet() *run.FlagSet {
-	panic("implement me")
+	fs := run.NewFlagSet("series")
+	fs.StringVarP(&s.addr, "series", "", ":17911", "the address of banyand listens")
+	return fs
 }
 
 func (s *service) Validate() error {
-	panic("implement me")
+	return nil
 }
 
 func (s *service) PreRun() error {
-	panic("implement me")
+	return nil
 }

[skywalking-banyandb] 10/11: update grpc test

Posted by ha...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch feat/liasion
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git

commit 45797cf2784cfafaef81f048ef4f99e61ad540a0
Author: Qiuxia Fan <fi...@outlook.com>
AuthorDate: Tue Jun 29 11:19:00 2021 +0800

    update grpc test
---
 banyand/liaison/grpc/grpc.go      | 9 +++------
 banyand/liaison/grpc/grpc_test.go | 9 ++-------
 2 files changed, 5 insertions(+), 13 deletions(-)

diff --git a/banyand/liaison/grpc/grpc.go b/banyand/liaison/grpc/grpc.go
index 4156efb..e84f599 100644
--- a/banyand/liaison/grpc/grpc.go
+++ b/banyand/liaison/grpc/grpc.go
@@ -28,7 +28,6 @@ import (
 	grpclib "google.golang.org/grpc"
 	"io"
 	"net"
-	"sync"
 )
 
 type Server struct {
@@ -36,7 +35,6 @@ type Server struct {
 	log      *logger.Logger
 	ser      *grpclib.Server
 	pipeline queue.Queue
-	writeEntity *v1.WriteEntity
 }
 
 func NewServer(ctx context.Context, pipeline queue.Queue) *Server {
@@ -85,13 +83,12 @@ func (s *Server) GracefulStop() {
 type TraceServer struct {
 	v1.UnimplementedTraceServer
 	writeData []*v1.WriteEntity
-	mu         sync.Mutex
 }
 
 func (t *TraceServer) Write(TraceWriteServer v1.Trace_WriteServer) error {
 	for {
 		writeEntity, err := TraceWriteServer.Recv()
-		fmt.Println(writeEntity)
+		fmt.Println(123, writeEntity)
 		if err == io.EOF {
 			return nil
 		}
@@ -102,8 +99,8 @@ func (t *TraceServer) Write(TraceWriteServer v1.Trace_WriteServer) error {
 		builder := flatbuffers.NewBuilder(0)
 		v1.WriteResponseStart(builder)
 		builder.Finish(v1.WriteResponseEnd(builder))
-		if err := TraceWriteServer.Send(builder); err != nil {
-			return err
+		if error := TraceWriteServer.Send(builder); error != nil {
+			return error
 		}
 		//writeEntity.Entity().Fields()
 		//writeEntity.MetaData(nil).Group()
diff --git a/banyand/liaison/grpc/grpc_test.go b/banyand/liaison/grpc/grpc_test.go
index b7d1ef1..a4c2ee4 100644
--- a/banyand/liaison/grpc/grpc_test.go
+++ b/banyand/liaison/grpc/grpc_test.go
@@ -19,7 +19,6 @@ package grpc
 
 import (
 	"context"
-	"fmt"
 	v1 "github.com/apache/skywalking-banyandb/api/fbs/v1"
 	"github.com/apache/skywalking-banyandb/pkg/convert"
 	flatbuffers "github.com/google/flatbuffers/go"
@@ -282,14 +281,13 @@ func Test_grpc_write(t *testing.T) {
 	defer conn.Close()
 
 	client := v1.NewTraceClient(conn)
-	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
+	ctx, cancel := context.WithTimeout(context.Background(), 10 * time.Second)
 	b := NewCriteriaBuilder()
-	binary := byte(123)
+	binary := byte(12)
 	entity := b.Build(
 		b.BuildEntity("entityId", []byte{binary}, "service_name", "endpoint_id"),
 		b.BuildMetaData("default", "trace"),
 	)
-	fmt.Println(entity)
 	builder, e := runWrite(entity)
 	if e != nil {
 		log.Fatalf("Failed to connect: %v", err)
@@ -299,13 +297,11 @@ func Test_grpc_write(t *testing.T) {
 	if er != nil {
 		log.Fatalf("%v.runWrite(_) = _, %v", client, err)
 	}
-	//waitc := make(chan struct{})
 	go func() {
 		for {
 			writeResponse, err := stream.Recv()
 			if err == io.EOF {
 				// read done.
-				//close(waitc)
 				return
 			}
 			if err != nil {
@@ -318,5 +314,4 @@ func Test_grpc_write(t *testing.T) {
 		log.Fatalf("Failed to send a note: %v", err)
 	}
 	stream.CloseSend()
-	//<-waitc
 }

[skywalking-banyandb] 01/11: fix typo

Posted by ha...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch feat/liasion
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git

commit 182206222b21809c842f0b2db55ea4c0ee9bcb91
Author: Qiuxia Fan <fi...@outlook.com>
AuthorDate: Tue Jun 15 18:07:21 2021 +0800

    fix typo
---
 api/fbs/v1/write.fbs          |  6 ++---
 api/fbs/v1/write_generated.go | 52 +++++++++++++++++++++----------------------
 2 files changed, 29 insertions(+), 29 deletions(-)

diff --git a/api/fbs/v1/write.fbs b/api/fbs/v1/write.fbs
index ff000f2..57e1378 100644
--- a/api/fbs/v1/write.fbs
+++ b/api/fbs/v1/write.fbs
@@ -46,7 +46,7 @@ table Field {
     value: ValueType;
 }
 
-table entityValue {
+table EntityValue {
     // entity_id could be span_id of a Span or segment_id of a Segment in the context of Trace
     entity_id: string;
     // timestamp_nanoseconds is in the timeunit of nanoseconds. It represents
@@ -56,7 +56,7 @@ table entityValue {
     // binary representation of segments, including tags, spans...
     data_binary: [ubyte];
     // support all of indexed fields in the fields.
-    // Pair only has value, as the value of PairValue match with the key
+    // Field only has value, as the value of ValueType match with the key
     // by the index rules and index rule bindings of Metadata group.
     // indexed fields of multiple entities are compression in the fields.
     fields: [Field];
@@ -66,7 +66,7 @@ table WriteEntity {
     // the mate_data is only required in the first write.
     meta_data: Metadata;
     // the entity is required.
-    entity: entityValue;
+    entity: EntityValue;
 }
 
 table WriteResponse {}
diff --git a/api/fbs/v1/write_generated.go b/api/fbs/v1/write_generated.go
index 3d71f94..d3b561c 100644
--- a/api/fbs/v1/write_generated.go
+++ b/api/fbs/v1/write_generated.go
@@ -319,34 +319,34 @@ func FieldEnd(builder *flatbuffers.Builder) flatbuffers.UOffsetT {
 	return builder.EndObject()
 }
 
-type entityValue struct {
+type EntityValue struct {
 	_tab flatbuffers.Table
 }
 
-func GetRootAsentityValue(buf []byte, offset flatbuffers.UOffsetT) *entityValue {
+func GetRootAsEntityValue(buf []byte, offset flatbuffers.UOffsetT) *EntityValue {
 	n := flatbuffers.GetUOffsetT(buf[offset:])
-	x := &entityValue{}
+	x := &EntityValue{}
 	x.Init(buf, n+offset)
 	return x
 }
 
-func GetSizePrefixedRootAsentityValue(buf []byte, offset flatbuffers.UOffsetT) *entityValue {
+func GetSizePrefixedRootAsEntityValue(buf []byte, offset flatbuffers.UOffsetT) *EntityValue {
 	n := flatbuffers.GetUOffsetT(buf[offset+flatbuffers.SizeUint32:])
-	x := &entityValue{}
+	x := &EntityValue{}
 	x.Init(buf, n+offset+flatbuffers.SizeUint32)
 	return x
 }
 
-func (rcv *entityValue) Init(buf []byte, i flatbuffers.UOffsetT) {
+func (rcv *EntityValue) Init(buf []byte, i flatbuffers.UOffsetT) {
 	rcv._tab.Bytes = buf
 	rcv._tab.Pos = i
 }
 
-func (rcv *entityValue) Table() flatbuffers.Table {
+func (rcv *EntityValue) Table() flatbuffers.Table {
 	return rcv._tab
 }
 
-func (rcv *entityValue) EntityId() []byte {
+func (rcv *EntityValue) EntityId() []byte {
 	o := flatbuffers.UOffsetT(rcv._tab.Offset(4))
 	if o != 0 {
 		return rcv._tab.ByteVector(o + rcv._tab.Pos)
@@ -354,7 +354,7 @@ func (rcv *entityValue) EntityId() []byte {
 	return nil
 }
 
-func (rcv *entityValue) TimestampNanoseconds() uint64 {
+func (rcv *EntityValue) TimestampNanoseconds() uint64 {
 	o := flatbuffers.UOffsetT(rcv._tab.Offset(6))
 	if o != 0 {
 		return rcv._tab.GetUint64(o + rcv._tab.Pos)
@@ -362,11 +362,11 @@ func (rcv *entityValue) TimestampNanoseconds() uint64 {
 	return 0
 }
 
-func (rcv *entityValue) MutateTimestampNanoseconds(n uint64) bool {
+func (rcv *EntityValue) MutateTimestampNanoseconds(n uint64) bool {
 	return rcv._tab.MutateUint64Slot(6, n)
 }
 
-func (rcv *entityValue) DataBinary(j int) byte {
+func (rcv *EntityValue) DataBinary(j int) byte {
 	o := flatbuffers.UOffsetT(rcv._tab.Offset(8))
 	if o != 0 {
 		a := rcv._tab.Vector(o)
@@ -375,7 +375,7 @@ func (rcv *entityValue) DataBinary(j int) byte {
 	return 0
 }
 
-func (rcv *entityValue) DataBinaryLength() int {
+func (rcv *EntityValue) DataBinaryLength() int {
 	o := flatbuffers.UOffsetT(rcv._tab.Offset(8))
 	if o != 0 {
 		return rcv._tab.VectorLen(o)
@@ -383,7 +383,7 @@ func (rcv *entityValue) DataBinaryLength() int {
 	return 0
 }
 
-func (rcv *entityValue) DataBinaryBytes() []byte {
+func (rcv *EntityValue) DataBinaryBytes() []byte {
 	o := flatbuffers.UOffsetT(rcv._tab.Offset(8))
 	if o != 0 {
 		return rcv._tab.ByteVector(o + rcv._tab.Pos)
@@ -391,7 +391,7 @@ func (rcv *entityValue) DataBinaryBytes() []byte {
 	return nil
 }
 
-func (rcv *entityValue) MutateDataBinary(j int, n byte) bool {
+func (rcv *EntityValue) MutateDataBinary(j int, n byte) bool {
 	o := flatbuffers.UOffsetT(rcv._tab.Offset(8))
 	if o != 0 {
 		a := rcv._tab.Vector(o)
@@ -400,7 +400,7 @@ func (rcv *entityValue) MutateDataBinary(j int, n byte) bool {
 	return false
 }
 
-func (rcv *entityValue) Fields(obj *Field, j int) bool {
+func (rcv *EntityValue) Fields(obj *Field, j int) bool {
 	o := flatbuffers.UOffsetT(rcv._tab.Offset(10))
 	if o != 0 {
 		x := rcv._tab.Vector(o)
@@ -412,7 +412,7 @@ func (rcv *entityValue) Fields(obj *Field, j int) bool {
 	return false
 }
 
-func (rcv *entityValue) FieldsLength() int {
+func (rcv *EntityValue) FieldsLength() int {
 	o := flatbuffers.UOffsetT(rcv._tab.Offset(10))
 	if o != 0 {
 		return rcv._tab.VectorLen(o)
@@ -420,28 +420,28 @@ func (rcv *entityValue) FieldsLength() int {
 	return 0
 }
 
-func entityValueStart(builder *flatbuffers.Builder) {
+func EntityValueStart(builder *flatbuffers.Builder) {
 	builder.StartObject(4)
 }
-func entityValueAddEntityId(builder *flatbuffers.Builder, entityId flatbuffers.UOffsetT) {
+func EntityValueAddEntityId(builder *flatbuffers.Builder, entityId flatbuffers.UOffsetT) {
 	builder.PrependUOffsetTSlot(0, flatbuffers.UOffsetT(entityId), 0)
 }
-func entityValueAddTimestampNanoseconds(builder *flatbuffers.Builder, timestampNanoseconds uint64) {
+func EntityValueAddTimestampNanoseconds(builder *flatbuffers.Builder, timestampNanoseconds uint64) {
 	builder.PrependUint64Slot(1, timestampNanoseconds, 0)
 }
-func entityValueAddDataBinary(builder *flatbuffers.Builder, dataBinary flatbuffers.UOffsetT) {
+func EntityValueAddDataBinary(builder *flatbuffers.Builder, dataBinary flatbuffers.UOffsetT) {
 	builder.PrependUOffsetTSlot(2, flatbuffers.UOffsetT(dataBinary), 0)
 }
-func entityValueStartDataBinaryVector(builder *flatbuffers.Builder, numElems int) flatbuffers.UOffsetT {
+func EntityValueStartDataBinaryVector(builder *flatbuffers.Builder, numElems int) flatbuffers.UOffsetT {
 	return builder.StartVector(1, numElems, 1)
 }
-func entityValueAddFields(builder *flatbuffers.Builder, fields flatbuffers.UOffsetT) {
+func EntityValueAddFields(builder *flatbuffers.Builder, fields flatbuffers.UOffsetT) {
 	builder.PrependUOffsetTSlot(3, flatbuffers.UOffsetT(fields), 0)
 }
-func entityValueStartFieldsVector(builder *flatbuffers.Builder, numElems int) flatbuffers.UOffsetT {
+func EntityValueStartFieldsVector(builder *flatbuffers.Builder, numElems int) flatbuffers.UOffsetT {
 	return builder.StartVector(4, numElems, 4)
 }
-func entityValueEnd(builder *flatbuffers.Builder) flatbuffers.UOffsetT {
+func EntityValueEnd(builder *flatbuffers.Builder) flatbuffers.UOffsetT {
 	return builder.EndObject()
 }
 
@@ -485,12 +485,12 @@ func (rcv *WriteEntity) MetaData(obj *Metadata) *Metadata {
 	return nil
 }
 
-func (rcv *WriteEntity) Entity(obj *entityValue) *entityValue {
+func (rcv *WriteEntity) Entity(obj *EntityValue) *EntityValue {
 	o := flatbuffers.UOffsetT(rcv._tab.Offset(6))
 	if o != 0 {
 		x := rcv._tab.Indirect(o + rcv._tab.Pos)
 		if obj == nil {
-			obj = new(entityValue)
+			obj = new(EntityValue)
 		}
 		obj.Init(rcv._tab.Bytes, x)
 		return obj

[skywalking-banyandb] 09/11: merge

Posted by ha...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch feat/liasion
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git

commit 02817dd2e104ce80f4807faa59441b94a24340ab
Merge: dfddd49 9d487a4
Author: Qiuxia Fan <fi...@outlook.com>
AuthorDate: Mon Jun 28 18:11:38 2021 +0800

    merge

 api/common/metadata.go                          |  12 +-
 api/fbs/v1/query.fbs                            |   4 +-
 api/fbs/v1/query_generated.go                   |   6 +-
 api/fbs/v1/schema.fbs                           |  10 +-
 api/fbs/v1/schema_generated.go                  |  36 ++--
 banyand/series/series.go                        |   7 +-
 go.mod                                          |   2 +
 go.sum                                          |   3 +-
 pkg/convert/number.go                           |  24 +++
 pkg/logical/analyzer.go                         | 200 +++++++++++++++++++
 pkg/logical/analyzer_test.go                    | 255 ++++++++++++++++++++++++
 pkg/logical/common_test.go                      | 232 +++++++++++++++++++++
 pkg/logical/expr.go                             | 192 ++++++++++++++++++
 pkg/logical/expr_literal.go                     | 135 +++++++++++++
 api/common/metadata.go => pkg/logical/format.go |  27 ++-
 pkg/{convert/number.go => logical/interface.go} |  54 +++--
 pkg/logical/plan.go                             | 133 ++++++++++++
 pkg/logical/plan_indexscan.go                   | 162 +++++++++++++++
 pkg/logical/plan_orderby.go                     |  96 +++++++++
 pkg/logical/plan_projection.go                  | 100 ++++++++++
 pkg/logical/plan_tablescan.go                   | 122 ++++++++++++
 pkg/logical/plan_traceid.go                     |  72 +++++++
 pkg/logical/schema.go                           | 117 +++++++++++
 23 files changed, 1943 insertions(+), 58 deletions(-)

diff --cc pkg/convert/number.go
index 0dbc4ca,88c80fc..f57f87c
--- a/pkg/convert/number.go
+++ b/pkg/convert/number.go
@@@ -41,4 -41,28 +41,28 @@@ func IntToInt64(numbers ...int) []int6
  		arr = append(arr, int64(numbers[i]))
  	}
  	return arr
+ }
+ 
+ func Int8ToInt64(numbers ...int8) []int64 {
+ 	var arr []int64
+ 	for i := 0; i < len(numbers); i++ {
+ 		arr = append(arr, int64(numbers[i]))
+ 	}
+ 	return arr
+ }
+ 
+ func Int16ToInt64(numbers ...int16) []int64 {
+ 	var arr []int64
+ 	for i := 0; i < len(numbers); i++ {
+ 		arr = append(arr, int64(numbers[i]))
+ 	}
+ 	return arr
+ }
+ 
+ func Int32ToInt64(numbers ...int32) []int64 {
+ 	var arr []int64
+ 	for i := 0; i < len(numbers); i++ {
+ 		arr = append(arr, int64(numbers[i]))
+ 	}
+ 	return arr
 -}
 +}

[skywalking-banyandb] 06/11: read trace

Posted by ha...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch feat/liasion
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git

commit 5e377c973615c8489f7076b33296b7185c56b2c2
Author: Qiuxia Fan <fi...@outlook.com>
AuthorDate: Mon Jun 21 11:07:59 2021 +0800

    read trace
---
 banyand/liaison/grpc/grpc.go                      | 55 +++++++++++++----------
 banyand/liaison/{liaison.go => grpc/grpc_test.go} | 19 +-------
 banyand/liaison/liaison.go                        |  1 -
 3 files changed, 32 insertions(+), 43 deletions(-)

diff --git a/banyand/liaison/grpc/grpc.go b/banyand/liaison/grpc/grpc.go
index 9f8b108..92dcc48 100644
--- a/banyand/liaison/grpc/grpc.go
+++ b/banyand/liaison/grpc/grpc.go
@@ -19,6 +19,7 @@ package grpc
 
 import (
 	"context"
+	"fmt"
 	"net"
 	"strconv"
 	"time"
@@ -76,8 +77,8 @@ func (s *Server) GracefulStop() {
 	s.ser.GracefulStop()
 }
 
-func (s *Server) WriteTraces(entityValue *v1.EntityValue, metaData *v1.Metadata) (*flatbuffers.Builder, error) {
-	s.log.Info("WriteTraces called...")
+func WriteTraces(entityValue *v1.EntityValue, metaData *v1.Metadata) []byte {
+	fmt.Println("Write called...")
 	builder := flatbuffers.NewBuilder(0)
 	// Serialize MetaData
 	group, name := builder.CreateString(string(metaData.Group())), builder.CreateString(string(metaData.Name()))
@@ -90,7 +91,7 @@ func (s *Server) WriteTraces(entityValue *v1.EntityValue, metaData *v1.Metadata)
 	var fieldList []flatbuffers.UOffsetT
 	for i := 0; i < entityValue.FieldsLength(); i++ {
 		var f v1.Field
-		var s string
+		var str string
 		if ok := entityValue.Fields(&f, i); ok {
 			unionValueType := new(flatbuffers.Table)
 			if f.Value(unionValueType) {
@@ -99,7 +100,7 @@ func (s *Server) WriteTraces(entityValue *v1.EntityValue, metaData *v1.Metadata)
 					unionStr := new(v1.String)
 					unionStr.Init(unionValueType.Bytes, unionValueType.Pos)
 					v1.FieldAddValueType(builder, v1.ValueTypeString)
-					s = string(unionStr.Value())
+					str = string(unionStr.Value())
 				} else if valueType == v1.ValueTypeInt {
 					unionInt := new(v1.Int)
 					unionInt.Init(unionValueType.Bytes, unionValueType.Pos)
@@ -110,32 +111,32 @@ func (s *Server) WriteTraces(entityValue *v1.EntityValue, metaData *v1.Metadata)
 				} else if valueType == v1.ValueTypeStringArray {
 					unionStrArray := new(v1.StringArray)
 					unionStrArray.Init(unionValueType.Bytes, unionValueType.Pos)
-					len := unionStrArray.ValueLength()
-					if len == 1 {
-						s += string(unionStrArray.Value(0))
+					l := unionStrArray.ValueLength()
+					if l == 1 {
+						str += string(unionStrArray.Value(0))
 					} else {
-						s += "["
-						for i := 0; i < len; i++ {
-							s += string(unionStrArray.Value(i))
+						str += "["
+						for j := 0; j < l; j++ {
+							str += string(unionStrArray.Value(j))
 						}
-						s += "]"
+						str += "]"
 					}
 				} else if valueType == v1.ValueTypeIntArray {
 					unionIntArray := new(v1.IntArray)
 					unionIntArray.Init(unionValueType.Bytes, unionValueType.Pos)
 					l := unionIntArray.ValueLength()
 					if l == 1 {
-						s += strconv.FormatInt(unionIntArray.Value(0), 10)
+						str += strconv.FormatInt(unionIntArray.Value(0), 10)
 					} else if l > 1{
-						s += "["
-						for i := 0; i < l; i++ {
-							s += strconv.FormatInt(unionIntArray.Value(i), 10)
+						str += "["
+						for j := 0; j < l; j++ {
+							str += strconv.FormatInt(unionIntArray.Value(j), 10)
 						}
-						s += "]"
+						str += "]"
 					}
 				}
 				if valueType == v1.ValueTypeIntArray || valueType == v1.ValueTypeStringArray || valueType == v1.ValueTypeString {
-					field := builder.CreateString(s)
+					field := builder.CreateString(str)
 					v1.FieldAddValue(builder, field)
 					fieldList = append(fieldList, field)
 				}
@@ -167,14 +168,20 @@ func (s *Server) WriteTraces(entityValue *v1.EntityValue, metaData *v1.Metadata)
 
 	builder.Finish(trace)
 
-	return builder, nil
+	return builder.Bytes[builder.Head():]
 }
 
-func (s *Server) FetchTraces(in *v1.EntityCriteria) (*flatbuffers.Builder, error) {
-	s.log.Info("Fetch called...")
-	builder := flatbuffers.NewBuilder(0)
-	//buf := builder.FinishedBytes()
-	//entityCriteria := v1.GetRootAsEntityCriteria(buf, 0)
+type ComponentBuilderFunc func(*flatbuffers.Builder)
+
+func ReadTraces(funcs ...ComponentBuilderFunc) *v1.Entity {
+	b := flatbuffers.NewBuilder(1024)
+	v1.EntityStart(b)
+	for _, fun := range funcs {
+		fun(b)
+	}
+	entityOffset := v1.EntityEnd(b)
+	b.Finish(entityOffset)
 
-	return builder, nil
+	buf := b.Bytes[b.Head():]
+	return v1.GetRootAsEntity(buf, 0)
 }
diff --git a/banyand/liaison/liaison.go b/banyand/liaison/grpc/grpc_test.go
similarity index 68%
copy from banyand/liaison/liaison.go
copy to banyand/liaison/grpc/grpc_test.go
index 117b361..761af64 100644
--- a/banyand/liaison/liaison.go
+++ b/banyand/liaison/grpc/grpc_test.go
@@ -15,21 +15,4 @@
 // specific language governing permissions and limitations
 // under the License.
 
-package liaison
-
-import (
-	"context"
-
-	"github.com/apache/skywalking-banyandb/banyand/liaison/grpc"
-	"github.com/apache/skywalking-banyandb/banyand/queue"
-	"github.com/apache/skywalking-banyandb/pkg/run"
-)
-
-type Endpoint interface {
-	run.Config
-	run.Service
-}
-
-func NewEndpoint(ctx context.Context, pipeline queue.Queue) (Endpoint, error) {
-	return grpc.NewServer(ctx, pipeline), nil
-}
+package grpc_test
diff --git a/banyand/liaison/liaison.go b/banyand/liaison/liaison.go
index 117b361..76e8d14 100644
--- a/banyand/liaison/liaison.go
+++ b/banyand/liaison/liaison.go
@@ -19,7 +19,6 @@ package liaison
 
 import (
 	"context"
-
 	"github.com/apache/skywalking-banyandb/banyand/liaison/grpc"
 	"github.com/apache/skywalking-banyandb/banyand/queue"
 	"github.com/apache/skywalking-banyandb/pkg/run"

[skywalking-banyandb] 05/11: Serialize Fields

Posted by ha...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch feat/liasion
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git

commit 14a7fcb5cedf21ce18aefcad44c164932c299810
Author: Qiuxia Fan <fi...@outlook.com>
AuthorDate: Sun Jun 20 18:15:46 2021 +0800

    Serialize Fields
---
 banyand/liaison/grpc/grpc.go | 80 +++++++++++++++++++++++++++++++++++---------
 1 file changed, 64 insertions(+), 16 deletions(-)

diff --git a/banyand/liaison/grpc/grpc.go b/banyand/liaison/grpc/grpc.go
index 58014cf..9f8b108 100644
--- a/banyand/liaison/grpc/grpc.go
+++ b/banyand/liaison/grpc/grpc.go
@@ -19,8 +19,8 @@ package grpc
 
 import (
 	"context"
-	"fmt"
 	"net"
+	"strconv"
 	"time"
 
 	flatbuffers "github.com/google/flatbuffers/go"
@@ -77,7 +77,7 @@ func (s *Server) GracefulStop() {
 }
 
 func (s *Server) WriteTraces(entityValue *v1.EntityValue, metaData *v1.Metadata) (*flatbuffers.Builder, error) {
-	s.log.Info("Write called...")
+	s.log.Info("WriteTraces called...")
 	builder := flatbuffers.NewBuilder(0)
 	// Serialize MetaData
 	group, name := builder.CreateString(string(metaData.Group())), builder.CreateString(string(metaData.Name()))
@@ -85,19 +85,65 @@ func (s *Server) WriteTraces(entityValue *v1.EntityValue, metaData *v1.Metadata)
 	v1.MetadataAddGroup(builder, group)
 	v1.MetadataAddName(builder, name)
 	v1.MetadataEnd(builder)
-	// Serialize Field
+	// Serialize Fields
 	v1.FieldStart(builder)
-	//for i := 0; i < entityValue.FieldsLength(); i++ {
-	//	v1.FieldAddValueType(builder, v1.ValueTypeString)
-	//	field := builder.CreateString("test")
-	//	v1.FieldAddValue(builder, field)
-	//}
-	v1.FieldAddValueType(builder, v1.ValueTypeString)
-	field := builder.CreateString("test")
-	fmt.Println(field)
-	v1.FieldAddValue(builder, field)
+	var fieldList []flatbuffers.UOffsetT
+	for i := 0; i < entityValue.FieldsLength(); i++ {
+		var f v1.Field
+		var s string
+		if ok := entityValue.Fields(&f, i); ok {
+			unionValueType := new(flatbuffers.Table)
+			if f.Value(unionValueType) {
+				valueType := f.ValueType()
+				if valueType == v1.ValueTypeString {
+					unionStr := new(v1.String)
+					unionStr.Init(unionValueType.Bytes, unionValueType.Pos)
+					v1.FieldAddValueType(builder, v1.ValueTypeString)
+					s = string(unionStr.Value())
+				} else if valueType == v1.ValueTypeInt {
+					unionInt := new(v1.Int)
+					unionInt.Init(unionValueType.Bytes, unionValueType.Pos)
+					v1.FieldAddValueType(builder, v1.ValueTypeInt)
+					field := flatbuffers.UOffsetT(unionInt.Value())
+					v1.FieldAddValue(builder, field)
+					fieldList = append(fieldList, field)
+				} else if valueType == v1.ValueTypeStringArray {
+					unionStrArray := new(v1.StringArray)
+					unionStrArray.Init(unionValueType.Bytes, unionValueType.Pos)
+					len := unionStrArray.ValueLength()
+					if len == 1 {
+						s += string(unionStrArray.Value(0))
+					} else {
+						s += "["
+						for i := 0; i < len; i++ {
+							s += string(unionStrArray.Value(i))
+						}
+						s += "]"
+					}
+				} else if valueType == v1.ValueTypeIntArray {
+					unionIntArray := new(v1.IntArray)
+					unionIntArray.Init(unionValueType.Bytes, unionValueType.Pos)
+					l := unionIntArray.ValueLength()
+					if l == 1 {
+						s += strconv.FormatInt(unionIntArray.Value(0), 10)
+					} else if l > 1{
+						s += "["
+						for i := 0; i < l; i++ {
+							s += strconv.FormatInt(unionIntArray.Value(i), 10)
+						}
+						s += "]"
+					}
+				}
+				if valueType == v1.ValueTypeIntArray || valueType == v1.ValueTypeStringArray || valueType == v1.ValueTypeString {
+					field := builder.CreateString(s)
+					v1.FieldAddValue(builder, field)
+					fieldList = append(fieldList, field)
+				}
+			}
+		}
+	}
 	v1.FieldEnd(builder)
-	// Serialize WriteEntity
+	// Serialize EntityValue
 	dataBinaryLength := entityValue.DataBinaryLength()
 	v1.EntityStartDataBinaryVector(builder, dataBinaryLength)
 	for i := dataBinaryLength; i >= 0; i-- {
@@ -110,9 +156,11 @@ func (s *Server) WriteTraces(entityValue *v1.EntityValue, metaData *v1.Metadata)
 	time := uint64(time.Now().UnixNano())
 	v1.EntityValueAddTimestampNanoseconds(builder, time)
 	v1.EntityValueAddDataBinary(builder, dataBinary)
-	v1.EntityValueStartFieldsVector(builder, 1)
-	builder.PrependUOffsetT(field)
-	fields := builder.EndVector(1)
+	v1.EntityValueStartFieldsVector(builder, len(fieldList))
+	for val := range fieldList {
+		builder.PrependUOffsetT(flatbuffers.UOffsetT(val))
+	}
+	fields := builder.EndVector(len(fieldList))
 	v1.EntityValueAddFields(builder, fields)
 	v1.EntityValueEnd(builder)
 	trace := v1.WriteEntityEnd(builder)

[skywalking-banyandb] 11/11: Fix some test issues

Posted by ha...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch feat/liasion
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git

commit e1a5044da1a0a316dd70538930aee0f6c607cf02
Author: Gao Hongtao <ha...@gmail.com>
AuthorDate: Tue Jun 29 12:05:22 2021 +0800

    Fix some test issues
    
    Signed-off-by: Gao Hongtao <ha...@gmail.com>
---
 banyand/liaison/grpc/grpc.go      | 14 +++++++-------
 banyand/liaison/grpc/grpc_test.go | 35 ++++++++++++++++++++---------------
 2 files changed, 27 insertions(+), 22 deletions(-)

diff --git a/banyand/liaison/grpc/grpc.go b/banyand/liaison/grpc/grpc.go
index e84f599..58092a1 100644
--- a/banyand/liaison/grpc/grpc.go
+++ b/banyand/liaison/grpc/grpc.go
@@ -20,14 +20,15 @@ package grpc
 import (
 	"context"
 	"fmt"
+	"io"
+	"net"
+
 	v1 "github.com/apache/skywalking-banyandb/api/fbs/v1"
 	"github.com/apache/skywalking-banyandb/banyand/queue"
 	"github.com/apache/skywalking-banyandb/pkg/logger"
 	"github.com/apache/skywalking-banyandb/pkg/run"
 	flatbuffers "github.com/google/flatbuffers/go"
 	grpclib "google.golang.org/grpc"
-	"io"
-	"net"
 )
 
 type Server struct {
@@ -54,7 +55,7 @@ func (s *Server) FlagSet() *run.FlagSet {
 func (s *Server) Validate() error {
 	return nil
 }
-func init(){
+func init() {
 	//encoding.RegisterCodec(flatbuffers.FlatbuffersCodec{})
 }
 func (s *Server) Serve() error {
@@ -64,7 +65,6 @@ func (s *Server) Serve() error {
 		s.log.Fatal("Failed to listen", logger.Error(err))
 	}
 
-
 	s.ser = grpclib.NewServer(grpclib.CustomCodec(flatbuffers.FlatbuffersCodec{}))
 	//s.ser = grpclib.NewServer()
 
@@ -88,19 +88,19 @@ type TraceServer struct {
 func (t *TraceServer) Write(TraceWriteServer v1.Trace_WriteServer) error {
 	for {
 		writeEntity, err := TraceWriteServer.Recv()
-		fmt.Println(123, writeEntity)
 		if err == io.EOF {
 			return nil
 		}
 		if err != nil {
 			return err
 		}
+		fmt.Println(123, writeEntity)
 		t.writeData = append(t.writeData, writeEntity)
 		builder := flatbuffers.NewBuilder(0)
 		v1.WriteResponseStart(builder)
 		builder.Finish(v1.WriteResponseEnd(builder))
-		if error := TraceWriteServer.Send(builder); error != nil {
-			return error
+		if errSend := TraceWriteServer.Send(builder); errSend != nil {
+			return errSend
 		}
 		//writeEntity.Entity().Fields()
 		//writeEntity.MetaData(nil).Group()
diff --git a/banyand/liaison/grpc/grpc_test.go b/banyand/liaison/grpc/grpc_test.go
index a4c2ee4..7f865ee 100644
--- a/banyand/liaison/grpc/grpc_test.go
+++ b/banyand/liaison/grpc/grpc_test.go
@@ -19,14 +19,15 @@ package grpc
 
 import (
 	"context"
-	v1 "github.com/apache/skywalking-banyandb/api/fbs/v1"
-	"github.com/apache/skywalking-banyandb/pkg/convert"
-	flatbuffers "github.com/google/flatbuffers/go"
-	"google.golang.org/grpc"
 	"io"
 	"log"
 	"testing"
 	"time"
+
+	v1 "github.com/apache/skywalking-banyandb/api/fbs/v1"
+	"github.com/apache/skywalking-banyandb/pkg/convert"
+	flatbuffers "github.com/google/flatbuffers/go"
+	"google.golang.org/grpc"
 )
 
 var serverAddr = "localhost:17912"
@@ -35,6 +36,7 @@ type ComponentBuilderFunc func(*flatbuffers.Builder)
 type writeEntityBuilder struct {
 	*flatbuffers.Builder
 }
+
 func NewCriteriaBuilder() *writeEntityBuilder {
 	return &writeEntityBuilder{
 		flatbuffers.NewBuilder(1024),
@@ -77,7 +79,7 @@ func (b *writeEntityBuilder) BuildEntity(id string, binary []byte, items ...inte
 	}
 }
 
-func (b *writeEntityBuilder) BuildField(val interface{}) flatbuffers.UOffsetT  {
+func (b *writeEntityBuilder) BuildField(val interface{}) flatbuffers.UOffsetT {
 	var ValueTypeOffset flatbuffers.UOffsetT
 	var valType v1.ValueType
 	switch v := val.(type) {
@@ -159,7 +161,7 @@ func (b *writeEntityBuilder) Build(funcs ...ComponentBuilderFunc) *v1.WriteEntit
 	return v1.GetRootAsWriteEntity(buf, 0)
 }
 
-func runWrite (writeEntity *v1.WriteEntity) (*flatbuffers.Builder, error) {
+func runWrite(writeEntity *v1.WriteEntity) (*flatbuffers.Builder, error) {
 	builder := flatbuffers.NewBuilder(0)
 	metaData := writeEntity.MetaData(nil)
 	entityValue := writeEntity.Entity(nil)
@@ -270,7 +272,7 @@ func runWrite (writeEntity *v1.WriteEntity) (*flatbuffers.Builder, error) {
 	position := v1.WriteEntityEnd(builder)
 	builder.Finish(position)
 
-	return  builder, nil
+	return builder, nil
 }
 
 func Test_grpc_write(t *testing.T) {
@@ -281,7 +283,7 @@ func Test_grpc_write(t *testing.T) {
 	defer conn.Close()
 
 	client := v1.NewTraceClient(conn)
-	ctx, cancel := context.WithTimeout(context.Background(), 10 * time.Second)
+	ctx := context.Background()
 	b := NewCriteriaBuilder()
 	binary := byte(12)
 	entity := b.Build(
@@ -292,26 +294,29 @@ func Test_grpc_write(t *testing.T) {
 	if e != nil {
 		log.Fatalf("Failed to connect: %v", err)
 	}
-	defer cancel()
 	stream, er := client.Write(ctx)
 	if er != nil {
 		log.Fatalf("%v.runWrite(_) = _, %v", client, err)
 	}
+	waitc := make(chan struct{})
 	go func() {
 		for {
-			writeResponse, err := stream.Recv()
-			if err == io.EOF {
+			writeResponse, errRecv := stream.Recv()
+			if errRecv == io.EOF {
 				// read done.
+				close(waitc)
 				return
 			}
-			if err != nil {
+			if errRecv != nil {
 				log.Fatalf("Failed to receive data : %v", err)
 			}
-			println( writeResponse)
+			println(writeResponse)
 		}
 	}()
-	if error := stream.Send(builder); error != nil {
-		log.Fatalf("Failed to send a note: %v", err)
+	if errSend := stream.Send(builder); errSend != nil {
+		log.Fatalf("Failed to send a note: %v", errSend)
 	}
+
 	stream.CloseSend()
+	<-waitc
 }