You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by ha...@apache.org on 2021/07/27 09:18:28 UTC

[skywalking-banyandb] 09/11: feat: use protobuf

This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch gao-feat/rpc
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git

commit 3f61301a339f73cb85dbd830e2d55806983ee14b
Author: Qiuxia Fan <fi...@outlook.com>
AuthorDate: Tue Jul 27 13:16:31 2021 +0800

    feat: use protobuf
---
 api/fbs/v1/write.fbs              | 75 -----------------------------------
 banyand/liaison/grpc/grpc.go      | 83 +++++++++++++++------------------------
 banyand/liaison/grpc/grpc_test.go | 48 +++++++++++-----------
 banyand/series/trace/write.go     |  5 ++-
 pkg/partition/route.go            |  5 ++-
 pkg/query/logical/schema.go       | 14 +++----
 6 files changed, 66 insertions(+), 164 deletions(-)

diff --git a/api/fbs/v1/write.fbs b/api/fbs/v1/write.fbs
deleted file mode 100644
index ac7314c..0000000
--- a/api/fbs/v1/write.fbs
+++ /dev/null
@@ -1,75 +0,0 @@
-// Licensed to Apache Software Foundation (ASF) under one or more contributor
-// license agreements. See the NOTICE file distributed with
-// this work for additional information regarding copyright
-// ownership. Apache Software Foundation (ASF) licenses this file to you under
-// the Apache License, Version 2.0 (the "License"); you may
-// not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-//     http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-include "database.fbs";
-
-namespace banyandb.v1;
-
-table String {
-    value: string;
-}
-
-table Int {
-    value: int64;
-}
-
-table StringArray {
-    value: [string];
-}
-
-table IntArray {
-    value: [int64];
-}
-
-union ValueType {
-    String,
-    StringArray,
-    Int,
-    IntArray
-}
-
-table Field {
-    value: ValueType;
-}
-
-table EntityValue {
-    // entity_id could be span_id of a Span or segment_id of a Segment in the context of Trace
-    entity_id: string;
-    // timestamp_nanoseconds is in the timeunit of nanoseconds. It represents
-    // 1) either the start time of a Span/Segment,
-    // 2) or the timestamp of a log
-    timestamp_nanoseconds: uint64;
-    // binary representation of segments, including tags, spans...
-    data_binary: [ubyte];
-    // support all of indexed fields in the fields.
-    // Field only has value, as the value of ValueType match with the key
-    // by the index rules and index rule bindings of Metadata group.
-    // indexed fields of multiple entities are compression in the fields.
-    fields: [Field];
-}
-
-table WriteEntity {
-    // the mate_data is only required in the first write.
-    meta_data: Metadata;
-    // the entity is required.
-    entity: EntityValue;
-}
-
-table WriteResponse {}
-
-root_type WriteEntity;
-root_type EntityValue;
diff --git a/banyand/liaison/grpc/grpc.go b/banyand/liaison/grpc/grpc.go
index 43efff6..bc0f9ca 100644
--- a/banyand/liaison/grpc/grpc.go
+++ b/banyand/liaison/grpc/grpc.go
@@ -19,10 +19,11 @@ package grpc
 
 import (
 	"context"
+	"fmt"
+	"github.com/apache/skywalking-banyandb/api/common"
+	apischema "github.com/apache/skywalking-banyandb/api/schema"
 	"net"
 
-	"google.golang.org/grpc"
-
 	"github.com/apache/skywalking-banyandb/api/event"
 	v1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/v1"
 	"github.com/apache/skywalking-banyandb/banyand/discovery"
@@ -30,21 +31,19 @@ import (
 	"github.com/apache/skywalking-banyandb/pkg/bus"
 	"github.com/apache/skywalking-banyandb/pkg/logger"
 	"github.com/apache/skywalking-banyandb/pkg/partition"
-	logical "github.com/apache/skywalking-banyandb/pkg/query/logical"
+	"github.com/apache/skywalking-banyandb/pkg/query/logical"
 	"github.com/apache/skywalking-banyandb/pkg/run"
-	flatbuffers "github.com/google/flatbuffers/go"
 	"github.com/pkg/errors"
 	grpclib "google.golang.org/grpc"
 	"io"
 	"log"
-	"net"
 	"strings"
 )
 
 type Server struct {
 	addr       string
 	log        *logger.Logger
-	ser        *grpc.Server
+	ser        *grpclib.Server
 	pipeline   queue.Queue
 	repo       discovery.ServiceRepo
 	shardInfo  *shardInfo
@@ -127,7 +126,7 @@ func (s *Server) Serve() error {
 		s.log.Fatal().Err(err).Msg("Failed to listen")
 	}
 
-	s.ser = grpc.NewServer()
+	s.ser = grpclib.NewServer()
 	// TODO: add server implementation here
 	v1.RegisterTraceServer(s.ser, v1.UnimplementedTraceServer{})
 
@@ -153,50 +152,38 @@ func (t *TraceServer) Write(TraceWriteServer v1.Trace_WriteServer) error {
 			return err
 		}
 
-		//log.Println("writeEntity:", writeEntity)
+		log.Println("writeEntity:", writeEntity)
 		ana := logical.DefaultAnalyzer()
 		metadata := common.Metadata{
 			KindVersion: apischema.SeriesKindVersion,
-			Spec:        writeEntity.MetaData(nil),
+			Spec:        writeEntity.GetMetadata(),
 		}
 		schema, ruleError := ana.BuildTraceSchema(context.TODO(), metadata)
 		if ruleError != nil {
 			return  ruleError
 		}
-		seriesIdLen := seriesEventData.FieldNamesCompositeSeriesIdLength()
+		seriesIdLen := len(seriesEventData.FieldNamesCompositeSeriesId)
 		var str string
 		var arr []string
 		for i := 0; i < seriesIdLen; i++ {
-			id := seriesEventData.FieldNamesCompositeSeriesId(i)
-			if defined, sub := schema.FieldSubscript(string(id)); defined {
+			id := seriesEventData.FieldNamesCompositeSeriesId[i]
+			if defined, sub := schema.FieldSubscript(id); defined {
 				var field v1.Field
-				if ok := writeEntity.Entity(nil).Fields(&field, sub); !ok {
-					return nil
-				}
-				unionValueTable := new(flatbuffers.Table)
-				if ok := field.Value(unionValueTable); !ok {
-					return nil
-				}
-				if field.ValueType() == v1.ValueTypeStringArray {
-					unionStrArr := new(v1.StringArray)
-					unionStrArr.Init(unionValueTable.Bytes, unionValueTable.Pos)
-					for j := 0; j < unionStrArr.ValueLength(); j++ {
-						arr = append(arr, string(unionStrArr.Value(j)))
-					}
-				} else if field.ValueType() == v1.ValueTypeIntArray {
-					unionIntArr := new(v1.IntArray)
-					unionIntArr.Init(unionValueTable.Bytes, unionValueTable.Pos)
-					for t := 0; t < unionIntArr.ValueLength(); t++ {
-						arr = append(arr, fmt.Sprint(unionIntArr.Value(t)))
-					}
-				} else if field.ValueType() == v1.ValueTypeInt {
-					unionInt := new(v1.Int)
-					unionInt.Init(unionValueTable.Bytes, unionValueTable.Pos)
-					arr = append(arr, fmt.Sprint(unionInt.Value()))
-				} else if field.ValueType() == v1.ValueTypeString {
-					unionStr := new(v1.String)
-					unionStr.Init(unionValueTable.Bytes, unionValueTable.Pos)
-					arr = append(arr, string(unionStr.Value()))
+				switch v := field.GetValueType().(type) {
+					case *v1.Field_StrArray:
+						for j := 0; j < len(v.StrArray.Value); j++ {
+							if sub == j {
+								arr = append(arr, v.StrArray.Value[j])
+							}
+						}
+					case *v1.Field_IntArray:
+						for t := 0; t < len(v.IntArray.Value); t++ {
+							arr = append(arr, fmt.Sprint(v.IntArray.Value[t]))
+						}
+					case *v1.Field_Int:
+						arr = append(arr, fmt.Sprint(v.Int.Value))
+					case *v1.Field_Str:
+						arr = append(arr, fmt.Sprint(v.Str.Value))
 				}
 			}
 		}
@@ -205,32 +192,24 @@ func (t *TraceServer) Write(TraceWriteServer v1.Trace_WriteServer) error {
 			return errors.New("invalid seriesID")
 		}
 		seriesID := []byte(str)
-		shardNum := shardEventData.Shard(nil).Id()
+		shardNum := shardEventData.GetShard().GetId()
 		if shardNum < 1 {
 			shardNum = 1
 		}
-		shardID, shardIdError := partition.ShardID(seriesID, uint(shardNum))
+		shardID, shardIdError := partition.ShardID(seriesID, uint32(shardNum))
 		if shardIdError != nil {
 			return shardIdError
 		}
 		log.Println("shardID:", shardID)
-		builder := flatbuffers.NewBuilder(0)
-		v1.WriteResponseStart(builder)
-		builder.Finish(v1.WriteResponseEnd(builder))
-		if errSend := TraceWriteServer.Send(builder); errSend != nil {
+		if errSend := TraceWriteServer.Send(nil); errSend != nil {
 			return errSend
 		}
 		//queue
 	}
 }
 
-func (t *TraceServer) Query(ctx context.Context, entityCriteria *v1.EntityCriteria) (*flatbuffers.Builder, error) {
+func (t *TraceServer) Query(ctx context.Context, entityCriteria *v1.EntityCriteria) error { // *v1.QueryResponse,
 	log.Println("entityCriteria:", entityCriteria)
 
-	// receive entity, then serialize entity
-	b := flatbuffers.NewBuilder(0)
-	v1.EntityStart(b)
-	b.Finish(v1.EntityEnd(b))
-
-	return b, nil
+	return nil
 }
diff --git a/banyand/liaison/grpc/grpc_test.go b/banyand/liaison/grpc/grpc_test.go
index 252d0d6..6a995ad 100644
--- a/banyand/liaison/grpc/grpc_test.go
+++ b/banyand/liaison/grpc/grpc_test.go
@@ -26,10 +26,10 @@ import (
 	"testing"
 	"time"
 
-	grpclib "google.golang.org/grpc"
-	apiv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/v1"
-
+	v1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/v1"
 	"github.com/apache/skywalking-banyandb/banyand/liaison/grpc"
+	"github.com/apache/skywalking-banyandb/pkg/pb"
+	grpclib "google.golang.org/grpc"
 )
 
 var serverAddr = "localhost:17912"
@@ -57,15 +57,16 @@ func Test_trace_write(t *testing.T) {
 
 	client := v1.NewTraceClient(conn)
 	ctx := context.Background()
-	b := fb.NewWriteEntityBuilder()
 	binary := byte(12)
-	criteria, e := b.BuildWriteEntity(
-		b.BuildEntity("entityId", []byte{binary}, "service_instance_id", "service_id_1234", "service_instance_id_43543"),
-		b.BuildMetaData("default", "trace"),
-	)
-	if e != nil {
-		log.Fatalf("Failed to connect: %v", e)
-	}
+	entityValue := pb.NewEntityValueBuilder().
+		EntityID("entityId").
+		DataBinary([]byte{binary}).
+		Fields("service_instance_id", "service_id_1234", "service_instance_id_43543").
+		Build()
+	criteria := pb.NewWriteEntityBuilder().
+		EntityValue(entityValue).
+		Metadata("default", "trace").
+		Build()
 	stream, errorWrite := client.Write(ctx)
 	if errorWrite != nil {
 		log.Fatalf("%v.runWrite(_) = _, %v", client, errorWrite)
@@ -103,21 +104,16 @@ func Test_trace_query(t *testing.T) {
 	client := v1.NewTraceClient(conn)
 	ctx := context.Background()
 	sT, eT := time.Now().Add(-3*time.Hour), time.Now()
-
-	b := fb.NewCriteriaBuilder()
-	builder, e := b.BuildQueryEntity(
-		fb.AddLimit(5),
-		fb.AddOffset(10),
-		b.BuildMetaData("default", "trace"),
-		b.BuildTimeStampNanoSeconds(sT, eT),
-		b.BuildFields("service_id", "=", "my_app", "http.method", "=", "GET"),
-		b.BuildProjection("http.method", "service_id", "service_instance_id"),
-		b.BuildOrderBy("service_instance_id", v1.SortDESC),
-	)
-	if e != nil {
-		log.Fatalf("Failed to connect: %v", e)
-	}
-	stream, errRev := client.Query(ctx, builder)
+	criteria := pb.NewEntityCriteriaBuilder().
+		Limit(5).
+		Offset(10).
+		OrderBy("service_instance_id", v1.QueryOrder_SORT_DESC).
+		Metadata("default", "trace").
+		Projection("http.method", "service_id", "service_instance_id").
+		Fields("service_id", "=", "my_app", "http.method", "=", "GET").
+		TimeRange(sT, eT).
+		Build()
+	stream, errRev := client.Query(ctx, criteria)
 	if errRev != nil {
 		log.Fatalf("Retrieve client failed: %v", errRev)
 	}
diff --git a/banyand/series/trace/write.go b/banyand/series/trace/write.go
index 2b4466f..305ab43 100644
--- a/banyand/series/trace/write.go
+++ b/banyand/series/trace/write.go
@@ -73,7 +73,10 @@ func (t *traceSeries) Write(seriesID common.SeriesID, shardID uint, entity data.
 	if err = wp.Writer(shardID, chunkIDMapping).Put(chunkIDBytes, bydb_bytes.Join(stateBytes, seriesIDBytes, tsBytes)); err != nil {
 		return 0, errors.Wrap(err, "failed to write chunkID index")
 	}
-	traceIDShardID := partition.ShardID(traceID, t.shardNum)
+	traceIDShardID, shardIdError := partition.ShardID(traceID, t.shardNum)
+	if shardIdError != nil {
+		return 0, shardIdError
+	}
 	if err = wp.TimeSeriesWriter(traceIDShardID, traceIndex).
 		Put(traceID, bydb_bytes.Join(convert.Uint16ToBytes(uint16(shardID)), chunkIDBytes), tts); err != nil {
 		return 0, errors.Wrap(err, "failed to Trace index")
diff --git a/pkg/partition/route.go b/pkg/partition/route.go
index 2928eb7..b1f00ca 100644
--- a/pkg/partition/route.go
+++ b/pkg/partition/route.go
@@ -22,7 +22,10 @@ import (
 	"github.com/pkg/errors"
 )
 
-func ShardID(key []byte, shardNum uint32) uint {
+func ShardID(key []byte, shardNum uint32) (uint, error) { // hashes key and maps it onto one of shardNum shards
+	if shardNum < 1 { // reject zero: it would be the divisor of the modulo below
+		return 0, errors.New("invalid shardNum")
+	}
 	encodeKey := convert.Hash(key)
 	return uint(encodeKey % uint64(shardNum)), nil
 }
diff --git a/pkg/query/logical/schema.go b/pkg/query/logical/schema.go
index 9925d1d..9911daa 100644
--- a/pkg/query/logical/schema.go
+++ b/pkg/query/logical/schema.go
@@ -27,6 +27,7 @@ import (
 
 type Schema interface {
 	IndexDefined(string) (bool, *apiv1.IndexObject)
+	FieldSubscript(string) (bool, int)
 	FieldDefined(string) bool
 	CreateRef(names ...string) ([]*FieldRef, error)
 	Map(refs ...*FieldRef) Schema
@@ -64,16 +65,11 @@ func (s *schema) IndexDefined(field string) (bool, *apiv1.IndexObject) {
 
 func (s *schema) FieldSubscript(field string) (bool, int) {
 	idxRule := s.indexRule.Spec
-	for i := 0; i < idxRule.ObjectsLength(); i++ {
-		var idxObj apiv1.IndexObject
-		if ok := idxRule.Objects(&idxObj, i); ok {
-			for j := 0; j < idxObj.FieldsLength(); j++ {
-				if field == string(idxObj.Fields(j)) {
-					return true, i
-				}
+	for i, indexObj := range idxRule.GetObjects() {
+		for _, fieldName := range indexObj.GetFields() {
+			if field == fieldName {
+				return true, i
 			}
-		} else {
-			return false, -1
 		}
 	}
 	return false, -1