You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by ha...@apache.org on 2021/07/27 09:18:28 UTC
[skywalking-banyandb] 09/11: feat: use protobuf
This is an automated email from the ASF dual-hosted git repository.
hanahmily pushed a commit to branch gao-feat/rpc
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
commit 3f61301a339f73cb85dbd830e2d55806983ee14b
Author: Qiuxia Fan <fi...@outlook.com>
AuthorDate: Tue Jul 27 13:16:31 2021 +0800
feat: use protobuf
---
api/fbs/v1/write.fbs | 75 -----------------------------------
banyand/liaison/grpc/grpc.go | 83 +++++++++++++++------------------------
banyand/liaison/grpc/grpc_test.go | 48 +++++++++++-----------
banyand/series/trace/write.go | 5 ++-
pkg/partition/route.go | 5 ++-
pkg/query/logical/schema.go | 14 +++----
6 files changed, 66 insertions(+), 164 deletions(-)
diff --git a/api/fbs/v1/write.fbs b/api/fbs/v1/write.fbs
deleted file mode 100644
index ac7314c..0000000
--- a/api/fbs/v1/write.fbs
+++ /dev/null
@@ -1,75 +0,0 @@
-// Licensed to Apache Software Foundation (ASF) under one or more contributor
-// license agreements. See the NOTICE file distributed with
-// this work for additional information regarding copyright
-// ownership. Apache Software Foundation (ASF) licenses this file to you under
-// the Apache License, Version 2.0 (the "License"); you may
-// not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-include "database.fbs";
-
-namespace banyandb.v1;
-
-table String {
- value: string;
-}
-
-table Int {
- value: int64;
-}
-
-table StringArray {
- value: [string];
-}
-
-table IntArray {
- value: [int64];
-}
-
-union ValueType {
- String,
- StringArray,
- Int,
- IntArray
-}
-
-table Field {
- value: ValueType;
-}
-
-table EntityValue {
- // entity_id could be span_id of a Span or segment_id of a Segment in the context of Trace
- entity_id: string;
- // timestamp_nanoseconds is in the timeunit of nanoseconds. It represents
- // 1) either the start time of a Span/Segment,
- // 2) or the timestamp of a log
- timestamp_nanoseconds: uint64;
- // binary representation of segments, including tags, spans...
- data_binary: [ubyte];
- // support all of indexed fields in the fields.
- // Field only has a value, as the value of ValueType is matched with the key
- // by the index rules and index rule bindings of the Metadata group.
- // Indexed fields of multiple entities are compressed in the fields.
- fields: [Field];
-}
-
-table WriteEntity {
- // the meta_data is only required in the first write.
- meta_data: Metadata;
- // the entity is required.
- entity: EntityValue;
-}
-
-table WriteResponse {}
-
-root_type WriteEntity;
-root_type EntityValue;
diff --git a/banyand/liaison/grpc/grpc.go b/banyand/liaison/grpc/grpc.go
index 43efff6..bc0f9ca 100644
--- a/banyand/liaison/grpc/grpc.go
+++ b/banyand/liaison/grpc/grpc.go
@@ -19,10 +19,11 @@ package grpc
import (
"context"
+ "fmt"
+ "github.com/apache/skywalking-banyandb/api/common"
+ apischema "github.com/apache/skywalking-banyandb/api/schema"
"net"
- "google.golang.org/grpc"
-
"github.com/apache/skywalking-banyandb/api/event"
v1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/v1"
"github.com/apache/skywalking-banyandb/banyand/discovery"
@@ -30,21 +31,19 @@ import (
"github.com/apache/skywalking-banyandb/pkg/bus"
"github.com/apache/skywalking-banyandb/pkg/logger"
"github.com/apache/skywalking-banyandb/pkg/partition"
- logical "github.com/apache/skywalking-banyandb/pkg/query/logical"
+ "github.com/apache/skywalking-banyandb/pkg/query/logical"
"github.com/apache/skywalking-banyandb/pkg/run"
- flatbuffers "github.com/google/flatbuffers/go"
"github.com/pkg/errors"
grpclib "google.golang.org/grpc"
"io"
"log"
- "net"
"strings"
)
type Server struct {
addr string
log *logger.Logger
- ser *grpc.Server
+ ser *grpclib.Server
pipeline queue.Queue
repo discovery.ServiceRepo
shardInfo *shardInfo
@@ -127,7 +126,7 @@ func (s *Server) Serve() error {
s.log.Fatal().Err(err).Msg("Failed to listen")
}
- s.ser = grpc.NewServer()
+ s.ser = grpclib.NewServer()
// TODO: add server implementation here
v1.RegisterTraceServer(s.ser, v1.UnimplementedTraceServer{})
@@ -153,50 +152,38 @@ func (t *TraceServer) Write(TraceWriteServer v1.Trace_WriteServer) error {
return err
}
- //log.Println("writeEntity:", writeEntity)
+ log.Println("writeEntity:", writeEntity)
ana := logical.DefaultAnalyzer()
metadata := common.Metadata{
KindVersion: apischema.SeriesKindVersion,
- Spec: writeEntity.MetaData(nil),
+ Spec: writeEntity.GetMetadata(),
}
schema, ruleError := ana.BuildTraceSchema(context.TODO(), metadata)
if ruleError != nil {
return ruleError
}
- seriesIdLen := seriesEventData.FieldNamesCompositeSeriesIdLength()
+ seriesIdLen := len(seriesEventData.FieldNamesCompositeSeriesId)
var str string
var arr []string
for i := 0; i < seriesIdLen; i++ {
- id := seriesEventData.FieldNamesCompositeSeriesId(i)
- if defined, sub := schema.FieldSubscript(string(id)); defined {
+ id := seriesEventData.FieldNamesCompositeSeriesId[i]
+ if defined, sub := schema.FieldSubscript(id); defined {
var field v1.Field
- if ok := writeEntity.Entity(nil).Fields(&field, sub); !ok {
- return nil
- }
- unionValueTable := new(flatbuffers.Table)
- if ok := field.Value(unionValueTable); !ok {
- return nil
- }
- if field.ValueType() == v1.ValueTypeStringArray {
- unionStrArr := new(v1.StringArray)
- unionStrArr.Init(unionValueTable.Bytes, unionValueTable.Pos)
- for j := 0; j < unionStrArr.ValueLength(); j++ {
- arr = append(arr, string(unionStrArr.Value(j)))
- }
- } else if field.ValueType() == v1.ValueTypeIntArray {
- unionIntArr := new(v1.IntArray)
- unionIntArr.Init(unionValueTable.Bytes, unionValueTable.Pos)
- for t := 0; t < unionIntArr.ValueLength(); t++ {
- arr = append(arr, fmt.Sprint(unionIntArr.Value(t)))
- }
- } else if field.ValueType() == v1.ValueTypeInt {
- unionInt := new(v1.Int)
- unionInt.Init(unionValueTable.Bytes, unionValueTable.Pos)
- arr = append(arr, fmt.Sprint(unionInt.Value()))
- } else if field.ValueType() == v1.ValueTypeString {
- unionStr := new(v1.String)
- unionStr.Init(unionValueTable.Bytes, unionValueTable.Pos)
- arr = append(arr, string(unionStr.Value()))
+ switch v := field.GetValueType().(type) {
+ case *v1.Field_StrArray:
+ for j := 0; j < len(v.StrArray.Value); j++ {
+ if sub == j {
+ arr = append(arr, v.StrArray.Value[j])
+ }
+ }
+ case *v1.Field_IntArray:
+ for t := 0; t < len(v.IntArray.Value); t++ {
+ arr = append(arr, fmt.Sprint(v.IntArray.Value[t]))
+ }
+ case *v1.Field_Int:
+ arr = append(arr, fmt.Sprint(v.Int.Value))
+ case *v1.Field_Str:
+ arr = append(arr, fmt.Sprint(v.Str.Value))
}
}
}
@@ -205,32 +192,24 @@ func (t *TraceServer) Write(TraceWriteServer v1.Trace_WriteServer) error {
return errors.New("invalid seriesID")
}
seriesID := []byte(str)
- shardNum := shardEventData.Shard(nil).Id()
+ shardNum := shardEventData.GetShard().GetId()
if shardNum < 1 {
shardNum = 1
}
- shardID, shardIdError := partition.ShardID(seriesID, uint(shardNum))
+ shardID, shardIdError := partition.ShardID(seriesID, uint32(shardNum))
if shardIdError != nil {
return shardIdError
}
log.Println("shardID:", shardID)
- builder := flatbuffers.NewBuilder(0)
- v1.WriteResponseStart(builder)
- builder.Finish(v1.WriteResponseEnd(builder))
- if errSend := TraceWriteServer.Send(builder); errSend != nil {
+ if errSend := TraceWriteServer.Send(nil); errSend != nil {
return errSend
}
//queue
}
}
-func (t *TraceServer) Query(ctx context.Context, entityCriteria *v1.EntityCriteria) (*flatbuffers.Builder, error) {
+// Query logs the received entity criteria and returns nil. It is a stub: no
+// query is executed and no response value is produced.
+// NOTE(review): the trailing comment on the signature suggests a
+// *v1.QueryResponse result is planned — confirm against the generated
+// v1.TraceServer interface.
+func (t *TraceServer) Query(ctx context.Context, entityCriteria *v1.EntityCriteria) error { // *v1.QueryResponse,
log.Println("entityCriteria:", entityCriteria)
- // receive entity, then serialize entity
- b := flatbuffers.NewBuilder(0)
- v1.EntityStart(b)
- b.Finish(v1.EntityEnd(b))
-
- return b, nil
+ return nil
}
diff --git a/banyand/liaison/grpc/grpc_test.go b/banyand/liaison/grpc/grpc_test.go
index 252d0d6..6a995ad 100644
--- a/banyand/liaison/grpc/grpc_test.go
+++ b/banyand/liaison/grpc/grpc_test.go
@@ -26,10 +26,10 @@ import (
"testing"
"time"
- grpclib "google.golang.org/grpc"
- apiv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/v1"
-
+ v1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/v1"
"github.com/apache/skywalking-banyandb/banyand/liaison/grpc"
+ "github.com/apache/skywalking-banyandb/pkg/pb"
+ grpclib "google.golang.org/grpc"
)
var serverAddr = "localhost:17912"
@@ -57,15 +57,16 @@ func Test_trace_write(t *testing.T) {
client := v1.NewTraceClient(conn)
ctx := context.Background()
- b := fb.NewWriteEntityBuilder()
binary := byte(12)
- criteria, e := b.BuildWriteEntity(
- b.BuildEntity("entityId", []byte{binary}, "service_instance_id", "service_id_1234", "service_instance_id_43543"),
- b.BuildMetaData("default", "trace"),
- )
- if e != nil {
- log.Fatalf("Failed to connect: %v", e)
- }
+ entityValue := pb.NewEntityValueBuilder().
+ EntityID("entityId").
+ DataBinary([]byte{binary}).
+ Fields("service_instance_id", "service_id_1234", "service_instance_id_43543").
+ Build()
+ criteria := pb.NewWriteEntityBuilder().
+ EntityValue(entityValue).
+ Metadata("default", "trace").
+ Build()
stream, errorWrite := client.Write(ctx)
if errorWrite != nil {
log.Fatalf("%v.runWrite(_) = _, %v", client, errorWrite)
@@ -103,21 +104,16 @@ func Test_trace_query(t *testing.T) {
client := v1.NewTraceClient(conn)
ctx := context.Background()
sT, eT := time.Now().Add(-3*time.Hour), time.Now()
-
- b := fb.NewCriteriaBuilder()
- builder, e := b.BuildQueryEntity(
- fb.AddLimit(5),
- fb.AddOffset(10),
- b.BuildMetaData("default", "trace"),
- b.BuildTimeStampNanoSeconds(sT, eT),
- b.BuildFields("service_id", "=", "my_app", "http.method", "=", "GET"),
- b.BuildProjection("http.method", "service_id", "service_instance_id"),
- b.BuildOrderBy("service_instance_id", v1.SortDESC),
- )
- if e != nil {
- log.Fatalf("Failed to connect: %v", e)
- }
- stream, errRev := client.Query(ctx, builder)
+ criteria := pb.NewEntityCriteriaBuilder().
+ Limit(5).
+ Offset(10).
+ OrderBy("service_instance_id", v1.QueryOrder_SORT_DESC).
+ Metadata("default", "trace").
+ Projection("http.method", "service_id", "service_instance_id").
+ Fields("service_id", "=", "my_app", "http.method", "=", "GET").
+ TimeRange(sT, eT).
+ Build()
+ stream, errRev := client.Query(ctx, criteria)
if errRev != nil {
log.Fatalf("Retrieve client failed: %v", errRev)
}
diff --git a/banyand/series/trace/write.go b/banyand/series/trace/write.go
index 2b4466f..305ab43 100644
--- a/banyand/series/trace/write.go
+++ b/banyand/series/trace/write.go
@@ -73,7 +73,10 @@ func (t *traceSeries) Write(seriesID common.SeriesID, shardID uint, entity data.
if err = wp.Writer(shardID, chunkIDMapping).Put(chunkIDBytes, bydb_bytes.Join(stateBytes, seriesIDBytes, tsBytes)); err != nil {
return 0, errors.Wrap(err, "failed to write chunkID index")
}
- traceIDShardID := partition.ShardID(traceID, t.shardNum)
+ traceIDShardID, shardIdError := partition.ShardID(traceID, t.shardNum)
+ if shardIdError != nil {
+ return 0, shardIdError
+ }
if err = wp.TimeSeriesWriter(traceIDShardID, traceIndex).
Put(traceID, bydb_bytes.Join(convert.Uint16ToBytes(uint16(shardID)), chunkIDBytes), tts); err != nil {
return 0, errors.Wrap(err, "failed to Trace index")
diff --git a/pkg/partition/route.go b/pkg/partition/route.go
index 2928eb7..b1f00ca 100644
--- a/pkg/partition/route.go
+++ b/pkg/partition/route.go
@@ -22,7 +22,10 @@ import (
"github.com/pkg/errors"
)
-func ShardID(key []byte, shardNum uint32) uint {
+// ShardID maps key to a shard index in the range [0, shardNum) by hashing the
+// key with convert.Hash and reducing the hash modulo shardNum.
+//
+// It returns an error when shardNum is zero, because the modulo below would
+// otherwise panic with a division by zero. Any positive shardNum is valid.
+func ShardID(key []byte, shardNum uint32) (uint, error) {
+ if shardNum == 0 {
+ return 0, errors.New("invalid shardNum")
+ }
encodeKey := convert.Hash(key)
return uint(encodeKey % uint64(shardNum)), nil
}
diff --git a/pkg/query/logical/schema.go b/pkg/query/logical/schema.go
index 9925d1d..9911daa 100644
--- a/pkg/query/logical/schema.go
+++ b/pkg/query/logical/schema.go
@@ -27,6 +27,7 @@ import (
type Schema interface {
IndexDefined(string) (bool, *apiv1.IndexObject)
+ FieldSubscript(string) (bool, int)
FieldDefined(string) bool
CreateRef(names ...string) ([]*FieldRef, error)
Map(refs ...*FieldRef) Schema
@@ -64,16 +65,11 @@ func (s *schema) IndexDefined(field string) (bool, *apiv1.IndexObject) {
func (s *schema) FieldSubscript(field string) (bool, int) {
idxRule := s.indexRule.Spec
- for i := 0; i < idxRule.ObjectsLength(); i++ {
- var idxObj apiv1.IndexObject
- if ok := idxRule.Objects(&idxObj, i); ok {
- for j := 0; j < idxObj.FieldsLength(); j++ {
- if field == string(idxObj.Fields(j)) {
- return true, i
- }
+ for i, indexObj := range idxRule.GetObjects() {
+ for _, fieldName := range indexObj.GetFields() {
+ if field == fieldName {
+ return true, i
}
- } else {
- return false, -1
}
}
return false, -1