You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by GitBox <gi...@apache.org> on 2021/08/07 14:35:23 UTC

[GitHub] [skywalking-banyandb] hanahmily commented on a change in pull request #30: feat: Implement Write in the liaison

hanahmily commented on a change in pull request #30:
URL: https://github.com/apache/skywalking-banyandb/pull/30#discussion_r684636138



##########
File path: banyand/liaison/grpc/grpc.go
##########
@@ -116,10 +142,25 @@ func (s *Server) Serve() error {
 	if err != nil {
 		s.log.Fatal().Err(err).Msg("Failed to listen")
 	}
-
-	s.ser = grpc.NewServer()
-	// TODO: add server implementation here
-	v1.RegisterTraceServiceServer(s.ser, v1.UnimplementedTraceServiceServer{})
+	var opts []grpclib.ServerOption
+	var f embed.FS
+	if *tls {
+		if *certFile == "" {
+			serverCert, _ := f.ReadFile("data/x509/server_cert.pem")
+			*certFile = string(serverCert)
+		}
+		if *keyFile == "" {
+			serverKey, _ := f.ReadFile("data/x509/server_key.pem")
+			*keyFile = string(serverKey)
+		}
+		cred, err := credentials.NewServerTLSFromFile(*certFile, *keyFile)
+		if err != nil {
+			log.Fatalf("Failed to generate credentials %v", err)
+		}
+		opts = []grpclib.ServerOption{grpclib.Creds(cred)}
+	}
+	s.ser = grpclib.NewServer(opts...)

Review comment:
       We should create a flag to set [MaxRecvMsgSize](https://github.com/grpc/grpc-go/blob/master/server.go#L347) in case the binary data exceeds the default maximum message size.

##########
File path: banyand/liaison/data/x509/server_cert.pem
##########
@@ -0,0 +1,32 @@
+-----BEGIN CERTIFICATE-----

Review comment:
       Is this file for test purposes? If so, please move such files to `liaison/testdata/...`

##########
File path: banyand/liaison/grpc/grpc.go
##########
@@ -128,3 +169,87 @@ func (s *Server) GracefulStop() {
 	s.log.Info().Msg("stopping")
 	s.ser.GracefulStop()
 }
+
+func assemblyWriteData(shardID uint, writeEntity *v1.WriteRequest, seriesID uint64) data.TraceWriteDate {
+	return data.TraceWriteDate{ShardID: shardID, SeriesID: seriesID, WriteRequest: writeEntity}
+}
+
+func (s *Server) Write(TraceWriteServer v1.TraceService_WriteServer) error {
+	for {
+		writeEntity, err := TraceWriteServer.Recv()
+		if err == io.EOF {
+			return nil
+		}
+		if err != nil {
+			return err
+		}
+
+		//log.Println("writeEntity:", writeEntity)
+		ana := logical.DefaultAnalyzer()
+		metadata := common.Metadata{
+			KindVersion: apischema.SeriesKindVersion,
+			Spec:        writeEntity.GetMetadata(),
+		}
+		schema, ruleError := ana.BuildTraceSchema(context.TODO(), metadata)
+		if ruleError != nil {
+			return ruleError
+		}
+		if s.seriesInfo.seriesEvent == nil {

Review comment:
       There is a data race here. Please use `sync.RWMutex` to protect `seriesEvent`.

##########
File path: banyand/liaison/grpc/grpc.go
##########
@@ -116,10 +142,25 @@ func (s *Server) Serve() error {
 	if err != nil {
 		s.log.Fatal().Err(err).Msg("Failed to listen")
 	}
-
-	s.ser = grpc.NewServer()
-	// TODO: add server implementation here
-	v1.RegisterTraceServiceServer(s.ser, v1.UnimplementedTraceServiceServer{})
+	var opts []grpclib.ServerOption
+	var f embed.FS
+	if *tls {
+		if *certFile == "" {
+			serverCert, _ := f.ReadFile("data/x509/server_cert.pem")

Review comment:
       Why would you set default cert and key files here? If they are absent when tls=true, you should return an error in https://github.com/apache/skywalking-banyandb/blob/199c1b0334b8cf4851fe1f1e8b0ebd2bf3f665db/banyand/liaison/grpc/grpc.go#L110

##########
File path: banyand/liaison/grpc/grpc_test.go
##########
@@ -0,0 +1,203 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package grpc_test
+
+import (
+	"context"
+	"io"
+	"log"
+	"os"
+	"path"
+	"testing"
+	"time"
+
+	googleUUID "github.com/google/uuid"
+	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/require"
+	grpclib "google.golang.org/grpc"
+
+	v1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/v1"
+	"github.com/apache/skywalking-banyandb/banyand/discovery"
+	"github.com/apache/skywalking-banyandb/banyand/index"
+	"github.com/apache/skywalking-banyandb/banyand/liaison"
+	"github.com/apache/skywalking-banyandb/banyand/query"
+	"github.com/apache/skywalking-banyandb/banyand/queue"
+	"github.com/apache/skywalking-banyandb/banyand/series/trace"
+	"github.com/apache/skywalking-banyandb/banyand/storage"
+	"github.com/apache/skywalking-banyandb/pkg/logger"
+	"github.com/apache/skywalking-banyandb/pkg/pb"
+)
+
+var (
+	serverAddr = "localhost:17912"
+)
+
+func setupService(t *testing.T, tester *require.Assertions) func() {
+	tester.NoError(logger.Init(logger.Logging{
+		Env:   "dev",
+		Level: "warn",
+	}))
+	// Init `Discovery` module
+	repo, err := discovery.NewServiceRepo(context.Background())
+	tester.NoError(err)
+	tester.NotNil(repo)
+	// Init `Queue` module
+	pipeline, err := queue.NewQueue(context.TODO(), repo)
+	tester.NoError(err)
+	// Init `Database` module
+	db, err := storage.NewDB(context.TODO(), repo)
+	tester.NoError(err)
+	uuid, err := googleUUID.NewUUID()
+	tester.NoError(err)
+	rootPath := path.Join(os.TempDir(), "banyandb-"+uuid.String())
+	tester.NoError(db.FlagSet().Parse([]string{"--root-path=" + rootPath}))
+	// Init `Index` module
+	indexSvc, err := index.NewService(context.TODO(), repo)
+	tester.NoError(err)
+	// Init `Trace` module
+	traceSvc, err := trace.NewService(context.TODO(), db, repo, indexSvc, pipeline)
+	tester.NoError(err)
+	// Init `Query` module
+	executor, err := query.NewExecutor(context.TODO(), repo, indexSvc, traceSvc, traceSvc)
+	tester.NoError(err)
+	// Init `liaison` module
+	tcp, err := liaison.NewEndpoint(context.TODO(), pipeline, repo)
+	tester.NoError(err)
+
+	err = indexSvc.PreRun()
+	tester.NoError(err)
+
+	err = traceSvc.PreRun()
+	tester.NoError(err)
+
+	err = db.PreRun()
+	tester.NoError(err)
+
+	err = executor.PreRun()
+	tester.NoError(err)
+
+	tcp.FlagSet()
+	err = tcp.PreRun()
+	tester.NoError(err)
+
+	go func() {
+		tester.NoError(traceSvc.Serve())
+		tester.NoError(err)
+	}()
+
+	go func() {
+		tester.NoError(tcp.Serve())
+		tester.NoError(err)
+	}()
+
+	ctx, cancelFunc := context.WithTimeout(context.Background(), 10*time.Second)
+	defer cancelFunc()
+
+	tester.True(indexSvc.Ready(ctx, index.MetaExists("default", "sw")))
+
+	return func() {
+		db.GracefulStop()
+		_ = os.RemoveAll(rootPath)
+	}
+}
+
+func TestTraceWrite(t *testing.T) {
+	tester := require.New(t)
+	gracefulStop := setupService(t, tester)
+	defer gracefulStop()
+
+	conn, err := grpclib.Dial(serverAddr, grpclib.WithInsecure())
+	assert.NoError(t, err)
+	defer conn.Close()
+
+	client := v1.NewTraceServiceClient(conn)
+	ctx := context.Background()
+	entityValue := pb.NewEntityValueBuilder().
+		EntityID("entityId").
+		DataBinary([]byte{12}).
+		Fields("trace_id-xxfff.111323",
+			0,
+			"webapp_id",
+			"10.0.0.1_id",
+			"/home_id",
+			"webapp",
+			"10.0.0.1",
+			"/home",
+			300,
+			1622933202000000000).
+		Timestamp(time.Now()).
+		Build()
+	criteria := pb.NewWriteEntityBuilder().
+		EntityValue(entityValue).
+		Metadata("default", "sw").
+		Build()
+	stream, errorWrite := client.Write(ctx)
+	if errorWrite != nil {
+		log.Fatalf("%v.runWrite(_) = _, %v", client, errorWrite)
+	}
+	waitc := make(chan struct{})
+	go func() {
+		for {
+			writeResponse, errRecv := stream.Recv()
+			if errRecv == io.EOF {
+				// read done.
+				close(waitc)
+				return
+			}
+			if errRecv != nil {
+				log.Fatalf("Failed to receive data : %v", errRecv)
+			}
+			log.Println("writeResponse: ", writeResponse)

Review comment:
       Please use `assert` instead of `log` here.

##########
File path: banyand/series/trace/service.go
##########
@@ -65,7 +72,9 @@ func (s *service) PreRun() error {
 		return err
 	}
 	s.schemaMap = make(map[string]*traceSeries, len(schemas))
+	s.writeListener.schemaMap = make(map[string]*traceSeries, len(schemas))
 	s.l = logger.GetLogger(s.Name())
+	s.writeListener.l = logger.GetLogger(s.Name())

Review comment:
       Please assign `s.l` to `s.writeListener.l`.

##########
File path: banyand/liaison/grpc/grpc.go
##########
@@ -128,3 +169,87 @@ func (s *Server) GracefulStop() {
 	s.log.Info().Msg("stopping")
 	s.ser.GracefulStop()
 }
+
+func assemblyWriteData(shardID uint, writeEntity *v1.WriteRequest, seriesID uint64) data.TraceWriteDate {
+	return data.TraceWriteDate{ShardID: shardID, SeriesID: seriesID, WriteRequest: writeEntity}
+}
+
+func (s *Server) Write(TraceWriteServer v1.TraceService_WriteServer) error {
+	for {
+		writeEntity, err := TraceWriteServer.Recv()
+		if err == io.EOF {
+			return nil
+		}
+		if err != nil {
+			return err
+		}
+
+		//log.Println("writeEntity:", writeEntity)
+		ana := logical.DefaultAnalyzer()
+		metadata := common.Metadata{
+			KindVersion: apischema.SeriesKindVersion,
+			Spec:        writeEntity.GetMetadata(),
+		}
+		schema, ruleError := ana.BuildTraceSchema(context.TODO(), metadata)
+		if ruleError != nil {
+			return ruleError
+		}
+		if s.seriesInfo.seriesEvent == nil {
+			return errors.New("No seriesEvents")

Review comment:
       Could you create a global error and return it here?

##########
File path: banyand/liaison/grpc/grpc.go
##########
@@ -128,3 +169,87 @@ func (s *Server) GracefulStop() {
 	s.log.Info().Msg("stopping")
 	s.ser.GracefulStop()
 }
+
+func assemblyWriteData(shardID uint, writeEntity *v1.WriteRequest, seriesID uint64) data.TraceWriteDate {
+	return data.TraceWriteDate{ShardID: shardID, SeriesID: seriesID, WriteRequest: writeEntity}
+}
+
+func (s *Server) Write(TraceWriteServer v1.TraceService_WriteServer) error {
+	for {
+		writeEntity, err := TraceWriteServer.Recv()
+		if err == io.EOF {
+			return nil
+		}
+		if err != nil {
+			return err
+		}
+
+		//log.Println("writeEntity:", writeEntity)

Review comment:
       Please remove the debugging code.

##########
File path: banyand/series/trace/service.go
##########
@@ -65,7 +72,9 @@ func (s *service) PreRun() error {
 		return err
 	}
 	s.schemaMap = make(map[string]*traceSeries, len(schemas))
+	s.writeListener.schemaMap = make(map[string]*traceSeries, len(schemas))

Review comment:
       Could you assign `s.schemaMap` to `s.writeListener.schemaMap` directly?

##########
File path: banyand/liaison/grpc/grpc.go
##########
@@ -19,31 +19,54 @@ package grpc
 
 import (
 	"context"
+	"flag"
+	"fmt"
+	"io"
+	"log"
 	"net"
+	"strings"
+	"time"
 
-	"google.golang.org/grpc"
+	"github.com/pkg/errors"
+	grpclib "google.golang.org/grpc"
+	"google.golang.org/grpc/credentials"
 
+	"github.com/apache/skywalking-banyandb/api/common"
+	"github.com/apache/skywalking-banyandb/api/data"
 	"github.com/apache/skywalking-banyandb/api/event"
 	v1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/v1"
+	apischema "github.com/apache/skywalking-banyandb/api/schema"
 	"github.com/apache/skywalking-banyandb/banyand/discovery"
+	serverData "github.com/apache/skywalking-banyandb/banyand/liaison/data"
 	"github.com/apache/skywalking-banyandb/banyand/queue"
 	"github.com/apache/skywalking-banyandb/pkg/bus"
+	"github.com/apache/skywalking-banyandb/pkg/convert"
 	"github.com/apache/skywalking-banyandb/pkg/logger"
+	"github.com/apache/skywalking-banyandb/pkg/partition"
+	"github.com/apache/skywalking-banyandb/pkg/query/logical"
 	"github.com/apache/skywalking-banyandb/pkg/run"
 )
 
+var (
+	tls      = flag.Bool("tls", false, "Connection uses TLS if true, else plain TCP")

Review comment:
       Exactly. Move these flags into the function above; for reference, see https://github.com/apache/skywalking-banyandb/blob/199c1b0334b8cf4851fe1f1e8b0ebd2bf3f665db/banyand/liaison/grpc/grpc.go#L104

##########
File path: banyand/liaison/grpc/grpc_test.go
##########
@@ -0,0 +1,203 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package grpc_test
+
+import (
+	"context"
+	"io"
+	"log"
+	"os"
+	"path"
+	"testing"
+	"time"
+
+	googleUUID "github.com/google/uuid"
+	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/require"
+	grpclib "google.golang.org/grpc"
+
+	v1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/v1"
+	"github.com/apache/skywalking-banyandb/banyand/discovery"
+	"github.com/apache/skywalking-banyandb/banyand/index"
+	"github.com/apache/skywalking-banyandb/banyand/liaison"
+	"github.com/apache/skywalking-banyandb/banyand/query"
+	"github.com/apache/skywalking-banyandb/banyand/queue"
+	"github.com/apache/skywalking-banyandb/banyand/series/trace"
+	"github.com/apache/skywalking-banyandb/banyand/storage"
+	"github.com/apache/skywalking-banyandb/pkg/logger"
+	"github.com/apache/skywalking-banyandb/pkg/pb"
+)
+
+var (
+	serverAddr = "localhost:17912"
+)
+
+func setupService(t *testing.T, tester *require.Assertions) func() {
+	tester.NoError(logger.Init(logger.Logging{
+		Env:   "dev",
+		Level: "warn",
+	}))
+	// Init `Discovery` module
+	repo, err := discovery.NewServiceRepo(context.Background())
+	tester.NoError(err)
+	tester.NotNil(repo)
+	// Init `Queue` module
+	pipeline, err := queue.NewQueue(context.TODO(), repo)
+	tester.NoError(err)
+	// Init `Database` module
+	db, err := storage.NewDB(context.TODO(), repo)
+	tester.NoError(err)
+	uuid, err := googleUUID.NewUUID()
+	tester.NoError(err)
+	rootPath := path.Join(os.TempDir(), "banyandb-"+uuid.String())
+	tester.NoError(db.FlagSet().Parse([]string{"--root-path=" + rootPath}))
+	// Init `Index` module
+	indexSvc, err := index.NewService(context.TODO(), repo)
+	tester.NoError(err)
+	// Init `Trace` module
+	traceSvc, err := trace.NewService(context.TODO(), db, repo, indexSvc, pipeline)
+	tester.NoError(err)
+	// Init `Query` module
+	executor, err := query.NewExecutor(context.TODO(), repo, indexSvc, traceSvc, traceSvc)
+	tester.NoError(err)
+	// Init `liaison` module
+	tcp, err := liaison.NewEndpoint(context.TODO(), pipeline, repo)
+	tester.NoError(err)
+
+	err = indexSvc.PreRun()
+	tester.NoError(err)
+
+	err = traceSvc.PreRun()
+	tester.NoError(err)
+
+	err = db.PreRun()
+	tester.NoError(err)
+
+	err = executor.PreRun()
+	tester.NoError(err)
+
+	tcp.FlagSet()
+	err = tcp.PreRun()
+	tester.NoError(err)
+
+	go func() {
+		tester.NoError(traceSvc.Serve())
+		tester.NoError(err)
+	}()
+
+	go func() {
+		tester.NoError(tcp.Serve())
+		tester.NoError(err)
+	}()
+
+	ctx, cancelFunc := context.WithTimeout(context.Background(), 10*time.Second)
+	defer cancelFunc()
+
+	tester.True(indexSvc.Ready(ctx, index.MetaExists("default", "sw")))
+
+	return func() {
+		db.GracefulStop()
+		_ = os.RemoveAll(rootPath)
+	}
+}
+
+func TestTraceWrite(t *testing.T) {
+	tester := require.New(t)
+	gracefulStop := setupService(t, tester)
+	defer gracefulStop()
+
+	conn, err := grpclib.Dial(serverAddr, grpclib.WithInsecure())
+	assert.NoError(t, err)
+	defer conn.Close()
+
+	client := v1.NewTraceServiceClient(conn)
+	ctx := context.Background()
+	entityValue := pb.NewEntityValueBuilder().
+		EntityID("entityId").
+		DataBinary([]byte{12}).
+		Fields("trace_id-xxfff.111323",
+			0,
+			"webapp_id",
+			"10.0.0.1_id",
+			"/home_id",
+			"webapp",
+			"10.0.0.1",
+			"/home",
+			300,
+			1622933202000000000).
+		Timestamp(time.Now()).
+		Build()
+	criteria := pb.NewWriteEntityBuilder().
+		EntityValue(entityValue).
+		Metadata("default", "sw").
+		Build()
+	stream, errorWrite := client.Write(ctx)
+	if errorWrite != nil {
+		log.Fatalf("%v.runWrite(_) = _, %v", client, errorWrite)
+	}
+	waitc := make(chan struct{})
+	go func() {
+		for {
+			writeResponse, errRecv := stream.Recv()
+			if errRecv == io.EOF {
+				// read done.
+				close(waitc)
+				return
+			}
+			if errRecv != nil {
+				log.Fatalf("Failed to receive data : %v", errRecv)

Review comment:
       Please use `assert.NoError` (testify) here.

##########
File path: banyand/liaison/grpc/grpc.go
##########
@@ -128,3 +169,87 @@ func (s *Server) GracefulStop() {
 	s.log.Info().Msg("stopping")
 	s.ser.GracefulStop()
 }
+
+func assemblyWriteData(shardID uint, writeEntity *v1.WriteRequest, seriesID uint64) data.TraceWriteDate {
+	return data.TraceWriteDate{ShardID: shardID, SeriesID: seriesID, WriteRequest: writeEntity}
+}
+
+func (s *Server) Write(TraceWriteServer v1.TraceService_WriteServer) error {
+	for {
+		writeEntity, err := TraceWriteServer.Recv()
+		if err == io.EOF {
+			return nil
+		}
+		if err != nil {
+			return err
+		}
+
+		//log.Println("writeEntity:", writeEntity)
+		ana := logical.DefaultAnalyzer()
+		metadata := common.Metadata{
+			KindVersion: apischema.SeriesKindVersion,
+			Spec:        writeEntity.GetMetadata(),
+		}
+		schema, ruleError := ana.BuildTraceSchema(context.TODO(), metadata)
+		if ruleError != nil {
+			return ruleError
+		}
+		if s.seriesInfo.seriesEvent == nil {
+			return errors.New("No seriesEvents")
+		}
+		seriesIDLen := len(s.seriesInfo.seriesEvent.FieldNamesCompositeSeriesId)
+		var str string
+		var arr []string
+		for i := 0; i < seriesIDLen; i++ {
+			id := s.seriesInfo.seriesEvent.FieldNamesCompositeSeriesId[i]
+			if defined, sub := schema.FieldSubscript(id); defined {
+				for _, field := range writeEntity.GetEntity().GetFields() {
+					switch v := field.GetValueType().(type) {
+					case *v1.Field_StrArray:
+						for j := 0; j < len(v.StrArray.Value); j++ {
+							if sub == j {
+								arr = append(arr, v.StrArray.Value[j])
+							}
+						}
+					case *v1.Field_IntArray:
+						for t := 0; t < len(v.IntArray.Value); t++ {
+							arr = append(arr, fmt.Sprint(v.IntArray.Value[t]))
+						}
+					case *v1.Field_Int:
+						arr = append(arr, fmt.Sprint(v.Int.Value))
+					case *v1.Field_Str:
+						arr = append(arr, fmt.Sprint(v.Str.Value))
+					}
+				}
+			}
+		}
+		str = strings.Join(arr, "")
+		if str == "" {
+			return errors.New("invalid seriesID")
+		}
+		seriesID := []byte(str)
+		shardNum := s.shardInfo.shardEvent.GetShard().GetId()

Review comment:
       `shardEvent` might be nil; please check whether it is nil before reading it. A `sync.RWMutex` is also needed.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@skywalking.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org