You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by ha...@apache.org on 2022/01/14 00:07:42 UTC
[skywalking-banyandb] 01/01: Introduce ginkgo+gomega test framework
This is an automated email from the ASF dual-hosted git repository.
hanahmily pushed a commit to branch test-ginkgo
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
commit 5a15e6b13bbb621b3f1cd2a3fa98030d09df79fd
Author: Gao Hongtao <ha...@gmail.com>
AuthorDate: Fri Jan 14 00:05:23 2022 +0000
Introduce ginkgo+gomega test framework
---
banyand/liaison/grpc/discovery.go | 41 ++++
.../liaison/grpc/grpc_suite_test.go | 21 +-
banyand/liaison/grpc/{stream.go => measure.go} | 50 ++--
banyand/liaison/grpc/registry_test.go | 253 +++++++++------------
banyand/liaison/grpc/server.go | 58 +++--
banyand/liaison/grpc/stream.go | 22 +-
banyand/liaison/grpc/stream_test.go | 246 +++++++-------------
go.mod | 5 +
go.sum | 26 +++
pkg/logger/logger.go | 5 +
pkg/test/space.go | 9 +
11 files changed, 342 insertions(+), 394 deletions(-)
diff --git a/banyand/liaison/grpc/discovery.go b/banyand/liaison/grpc/discovery.go
index e7e2d76..a383589 100644
--- a/banyand/liaison/grpc/discovery.go
+++ b/banyand/liaison/grpc/discovery.go
@@ -20,13 +20,54 @@ package grpc
import (
"sync"
+ "github.com/apache/skywalking-banyandb/api/common"
commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
+ modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
+ "github.com/apache/skywalking-banyandb/banyand/queue"
+ "github.com/apache/skywalking-banyandb/banyand/tsdb"
"github.com/apache/skywalking-banyandb/pkg/bus"
"github.com/apache/skywalking-banyandb/pkg/logger"
"github.com/apache/skywalking-banyandb/pkg/partition"
+ "github.com/pkg/errors"
)
+var ErrNotExist = errors.New("the object doesn't exist")
+
+type discoveryService struct {
+ shardRepo *shardRepo
+ entityRepo *entityRepo
+ pipeline queue.Queue
+ log *logger.Logger
+}
+
+func newDiscoveryService(pipeline queue.Queue) *discoveryService {
+ return &discoveryService{
+ shardRepo: &shardRepo{shardEventsMap: make(map[identity]uint32)},
+ entityRepo: &entityRepo{entitiesMap: make(map[identity]partition.EntityLocator)},
+ pipeline: pipeline,
+ }
+}
+
+func (ds *discoveryService) SetLogger(log *logger.Logger) {
+ ds.log = log
+ ds.shardRepo.log = log
+ ds.entityRepo.log = log
+}
+
+func (ds *discoveryService) navigate(metadata *commonv1.Metadata, tagFamilies []*modelv1.TagFamilyForWrite) (tsdb.Entity, common.ShardID, error) {
+ id := getID(metadata)
+ shardNum, existed := ds.shardRepo.shardNum(id)
+ if !existed {
+ return nil, common.ShardID(0), errors.Wrapf(ErrNotExist, "finding the shard num by: %v", metadata)
+ }
+ locator, existed := ds.entityRepo.getLocator(id)
+ if !existed {
+ return nil, common.ShardID(0), errors.Wrapf(ErrNotExist, "finding the locator by: %v", metadata)
+ }
+ return locator.Locate(tagFamilies, shardNum)
+}
+
type identity struct {
name string
group string
diff --git a/pkg/test/space.go b/banyand/liaison/grpc/grpc_suite_test.go
similarity index 67%
copy from pkg/test/space.go
copy to banyand/liaison/grpc/grpc_suite_test.go
index bbe7ea3..ae8f957 100644
--- a/pkg/test/space.go
+++ b/banyand/liaison/grpc/grpc_suite_test.go
@@ -15,23 +15,16 @@
// specific language governing permissions and limitations
// under the License.
-package test
+package grpc_test
import (
- "fmt"
- "io/ioutil"
- "os"
+ "testing"
- "github.com/stretchr/testify/require"
+ . "github.com/onsi/ginkgo/v2"
+ . "github.com/onsi/gomega"
)
-func Space(t *require.Assertions) (tempDir string, deferFunc func()) {
- var tempDirErr error
- tempDir, tempDirErr = ioutil.TempDir("", "banyandb-test-*")
- t.Nil(tempDirErr)
- return tempDir, func() {
- if err := os.RemoveAll(tempDir); err != nil {
- _, _ = fmt.Fprintf(os.Stderr, "Error while removing dir: %v\n", err)
- }
- }
+func TestGrpc(t *testing.T) {
+ RegisterFailHandler(Fail)
+ RunSpecs(t, "Grpc Suite")
}
diff --git a/banyand/liaison/grpc/stream.go b/banyand/liaison/grpc/measure.go
similarity index 51%
copy from banyand/liaison/grpc/stream.go
copy to banyand/liaison/grpc/measure.go
index df831c4..530b024 100644
--- a/banyand/liaison/grpc/stream.go
+++ b/banyand/liaison/grpc/measure.go
@@ -18,67 +18,47 @@
package grpc
import (
- "context"
"io"
"time"
"github.com/apache/skywalking-banyandb/api/data"
- streamv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/stream/v1"
+ measurev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/measure/v1"
"github.com/apache/skywalking-banyandb/banyand/tsdb"
"github.com/apache/skywalking-banyandb/pkg/bus"
)
-func (s *Server) Write(stream streamv1.StreamService_WriteServer) error {
+type measureService struct {
+ *discoveryService
+ measurev1.UnimplementedMeasureServiceServer
+}
+
+func (ms *measureService) Write(measure measurev1.MeasureService_WriteServer) error {
for {
- writeEntity, err := stream.Recv()
+ writeRequest, err := measure.Recv()
if err == io.EOF {
return nil
}
if err != nil {
return err
}
- id := getID(writeEntity.GetMetadata())
- shardNum, existed := s.shardRepo.shardNum(id)
- if !existed {
- continue
- }
- locator, existed := s.entityRepo.getLocator(id)
- if !existed {
- continue
- }
- entity, shardID, err := locator.Locate(writeEntity.GetElement().TagFamilies, shardNum)
+ entity, shardID, err := ms.navigate(writeRequest.GetMetadata(), writeRequest.GetDataPoint().GetTagFamilies())
if err != nil {
- s.log.Error().Err(err).Msg("failed to locate write target")
+ ms.log.Error().Err(err).Msg("failed to navigate to the write target")
continue
}
- message := bus.NewMessage(bus.MessageID(time.Now().UnixNano()), &streamv1.InternalWriteRequest{
- Request: writeEntity,
+ message := bus.NewMessage(bus.MessageID(time.Now().UnixNano()), &measurev1.InternalWriteRequest{
+ Request: writeRequest,
ShardId: uint32(shardID),
SeriesHash: tsdb.HashEntity(entity),
})
- _, errWritePub := s.pipeline.Publish(data.TopicStreamWrite, message)
+ _, errWritePub := ms.pipeline.Publish(data.TopicMeasureWrite, message)
if errWritePub != nil {
return errWritePub
}
- if errSend := stream.Send(&streamv1.WriteResponse{}); errSend != nil {
+ if errSend := measure.Send(&measurev1.WriteResponse{}); errSend != nil {
return errSend
}
}
}
-func (s *Server) Query(_ context.Context, entityCriteria *streamv1.QueryRequest) (*streamv1.QueryResponse, error) {
- message := bus.NewMessage(bus.MessageID(time.Now().UnixNano()), entityCriteria)
- feat, errQuery := s.pipeline.Publish(data.TopicStreamQuery, message)
- if errQuery != nil {
- return nil, errQuery
- }
- msg, errFeat := feat.Get()
- if errFeat != nil {
- return nil, errFeat
- }
- queryMsg, ok := msg.Data().([]*streamv1.Element)
- if !ok {
- return nil, ErrQueryMsg
- }
- return &streamv1.QueryResponse{Elements: queryMsg}, nil
-}
+//TODO: implement topN & Query
diff --git a/banyand/liaison/grpc/registry_test.go b/banyand/liaison/grpc/registry_test.go
index 1c21f8a..13717e1 100644
--- a/banyand/liaison/grpc/registry_test.go
+++ b/banyand/liaison/grpc/registry_test.go
@@ -15,173 +15,126 @@
// specific language governing permissions and limitations
// under the License.
-package grpc
+package grpc_test
import (
"context"
- "testing"
- "github.com/stretchr/testify/require"
"google.golang.org/grpc"
"google.golang.org/grpc/status"
commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
"github.com/apache/skywalking-banyandb/banyand/metadata/schema"
-)
-
-func TestStreamRegistry(t *testing.T) {
- req := require.New(t)
- gracefulStop := setup(req, testData{
- TLS: false,
- addr: "localhost:17912",
- })
- defer gracefulStop()
- conn, err := grpc.Dial("localhost:17912", grpc.WithInsecure())
- req.NoError(err)
- req.NotNil(conn)
-
- client := databasev1.NewStreamRegistryServiceClient(conn)
- req.NotNil(client)
+ . "github.com/onsi/ginkgo/v2"
+ . "github.com/onsi/gomega"
+)
+var _ = Describe("Registry", func() {
+ var gracefulStop func()
+ var conn *grpc.ClientConn
meta := &commonv1.Metadata{
Group: "default",
- Name: "sw",
}
-
- getResp, err := client.Get(context.TODO(), &databasev1.StreamRegistryServiceGetRequest{Metadata: meta})
-
- req.NoError(err)
- req.NotNil(getResp)
-
- // 2 - DELETE
- deleteResp, err := client.Delete(context.TODO(), &databasev1.StreamRegistryServiceDeleteRequest{
- Metadata: meta,
+ BeforeEach(func() {
+ gracefulStop = setup(nil)
+ var err error
+ conn, err = grpc.Dial("localhost:17912", grpc.WithInsecure())
+ Expect(err).NotTo(HaveOccurred())
})
- req.NoError(err)
- req.NotNil(deleteResp)
- req.True(deleteResp.GetDeleted())
-
- // 3 - GET -> Nil
- _, err = client.Get(context.TODO(), &databasev1.StreamRegistryServiceGetRequest{
- Metadata: meta,
- })
- errStatus, _ := status.FromError(err)
- req.Equal(errStatus.Message(), schema.ErrEntityNotFound.Error())
-
- // 4 - CREATE
- _, err = client.Create(context.TODO(), &databasev1.StreamRegistryServiceCreateRequest{Stream: getResp.GetStream()})
- req.NoError(err)
-
- // 5 - GET - > Not Nil
- getResp, err = client.Get(context.TODO(), &databasev1.StreamRegistryServiceGetRequest{
- Metadata: meta,
+ It("manages the stream", func() {
+ client := databasev1.NewStreamRegistryServiceClient(conn)
+ Expect(client).NotTo(BeNil())
+ meta.Name = "sw"
+ getResp, err := client.Get(context.TODO(), &databasev1.StreamRegistryServiceGetRequest{Metadata: meta})
+ Expect(err).ShouldNot(HaveOccurred())
+ Expect(getResp).NotTo(BeNil())
+ By("Cleanup the registry")
+ deleteResp, err := client.Delete(context.TODO(), &databasev1.StreamRegistryServiceDeleteRequest{
+ Metadata: meta,
+ })
+ Expect(err).ShouldNot(HaveOccurred())
+ Expect(deleteResp).NotTo(BeNil())
+ Expect(deleteResp.GetDeleted()).To(BeTrue())
+ By("Verifying the registry empty")
+ _, err = client.Get(context.TODO(), &databasev1.StreamRegistryServiceGetRequest{
+ Metadata: meta,
+ })
+ errStatus, _ := status.FromError(err)
+ Expect(errStatus.Message()).To(Equal(schema.ErrEntityNotFound.Error()))
+ By("Creating a new stream")
+ _, err = client.Create(context.TODO(), &databasev1.StreamRegistryServiceCreateRequest{Stream: getResp.GetStream()})
+ Expect(err).ShouldNot(HaveOccurred())
+ By("Verifying the new stream")
+ getResp, err = client.Get(context.TODO(), &databasev1.StreamRegistryServiceGetRequest{
+ Metadata: meta,
+ })
+ Expect(err).ShouldNot(HaveOccurred())
+ Expect(getResp).NotTo(BeNil())
})
- req.NoError(err)
- req.NotNil(getResp)
-}
-
-func TestIndexRuleBindingRegistry(t *testing.T) {
- req := require.New(t)
- gracefulStop := setup(req, testData{
- TLS: false,
- addr: "localhost:17912",
+ It("manages the index-rule-binding", func() {
+ client := databasev1.NewIndexRuleBindingRegistryServiceClient(conn)
+ Expect(client).NotTo(BeNil())
+ meta.Name = "sw-index-rule-binding"
+ getResp, err := client.Get(context.TODO(), &databasev1.IndexRuleBindingRegistryServiceGetRequest{Metadata: meta})
+ Expect(err).ShouldNot(HaveOccurred())
+ Expect(getResp).NotTo(BeNil())
+ By("Cleanup the registry")
+ deleteResp, err := client.Delete(context.TODO(), &databasev1.IndexRuleBindingRegistryServiceDeleteRequest{
+ Metadata: meta,
+ })
+ Expect(err).ShouldNot(HaveOccurred())
+ Expect(deleteResp).NotTo(BeNil())
+ Expect(deleteResp.GetDeleted()).To(BeTrue())
+ By("Verifying the registry empty")
+ _, err = client.Get(context.TODO(), &databasev1.IndexRuleBindingRegistryServiceGetRequest{
+ Metadata: meta,
+ })
+ errStatus, _ := status.FromError(err)
+ Expect(errStatus.Message()).To(Equal(schema.ErrEntityNotFound.Error()))
+ By("Creating a new index-rule-binding")
+ _, err = client.Create(context.TODO(), &databasev1.IndexRuleBindingRegistryServiceCreateRequest{IndexRuleBinding: getResp.GetIndexRuleBinding()})
+ Expect(err).ShouldNot(HaveOccurred())
+ By("Verifying the new index-rule-binding")
+ getResp, err = client.Get(context.TODO(), &databasev1.IndexRuleBindingRegistryServiceGetRequest{
+ Metadata: meta,
+ })
+ Expect(err).ShouldNot(HaveOccurred())
+ Expect(getResp).NotTo(BeNil())
})
- defer gracefulStop()
-
- conn, err := grpc.Dial("localhost:17912", grpc.WithInsecure())
- req.NoError(err)
- req.NotNil(conn)
-
- client := databasev1.NewIndexRuleBindingRegistryServiceClient(conn)
- req.NotNil(client)
-
- meta := &commonv1.Metadata{
- Group: "default",
- Name: "sw-index-rule-binding",
- }
-
- getResp, err := client.Get(context.TODO(), &databasev1.IndexRuleBindingRegistryServiceGetRequest{Metadata: meta})
-
- req.NoError(err)
- req.NotNil(getResp)
-
- // 2 - DELETE
- deleteResp, err := client.Delete(context.TODO(), &databasev1.IndexRuleBindingRegistryServiceDeleteRequest{
- Metadata: meta,
+ It("manages the index-rule", func() {
+ client := databasev1.NewIndexRuleRegistryServiceClient(conn)
+ Expect(client).NotTo(BeNil())
+ meta.Name = "db.instance"
+ getResp, err := client.Get(context.TODO(), &databasev1.IndexRuleRegistryServiceGetRequest{Metadata: meta})
+ Expect(err).ShouldNot(HaveOccurred())
+ Expect(getResp).NotTo(BeNil())
+ By("Cleanup the registry")
+ deleteResp, err := client.Delete(context.TODO(), &databasev1.IndexRuleRegistryServiceDeleteRequest{
+ Metadata: meta,
+ })
+ Expect(err).ShouldNot(HaveOccurred())
+ Expect(deleteResp).NotTo(BeNil())
+ Expect(deleteResp.GetDeleted()).To(BeTrue())
+ By("Verifying the registry empty")
+ _, err = client.Get(context.TODO(), &databasev1.IndexRuleRegistryServiceGetRequest{
+ Metadata: meta,
+ })
+ errStatus, _ := status.FromError(err)
+ Expect(errStatus.Message()).To(Equal(schema.ErrEntityNotFound.Error()))
+ By("Creating a new index-rule")
+ _, err = client.Create(context.TODO(), &databasev1.IndexRuleRegistryServiceCreateRequest{IndexRule: getResp.GetIndexRule()})
+ Expect(err).ShouldNot(HaveOccurred())
+ By("Verifying the new index-rule")
+ getResp, err = client.Get(context.TODO(), &databasev1.IndexRuleRegistryServiceGetRequest{
+ Metadata: meta,
+ })
+ Expect(err).ShouldNot(HaveOccurred())
+ Expect(getResp).NotTo(BeNil())
})
- req.NoError(err)
- req.NotNil(deleteResp)
- req.True(deleteResp.GetDeleted())
-
- // 3 - GET -> Nil
- _, err = client.Get(context.TODO(), &databasev1.IndexRuleBindingRegistryServiceGetRequest{
- Metadata: meta,
- })
- errStatus, _ := status.FromError(err)
- req.Equal(errStatus.Message(), schema.ErrEntityNotFound.Error())
-
- // 4 - CREATE
- _, err = client.Create(context.TODO(), &databasev1.IndexRuleBindingRegistryServiceCreateRequest{IndexRuleBinding: getResp.GetIndexRuleBinding()})
- req.NoError(err)
-
- // 5 - GET - > Not Nil
- getResp, err = client.Get(context.TODO(), &databasev1.IndexRuleBindingRegistryServiceGetRequest{
- Metadata: meta,
- })
- req.NoError(err)
- req.NotNil(getResp)
-}
-
-func TestIndexRuleRegistry(t *testing.T) {
- req := require.New(t)
- gracefulStop := setup(req, testData{
- TLS: false,
- addr: "localhost:17912",
- })
- defer gracefulStop()
-
- conn, err := grpc.Dial("localhost:17912", grpc.WithInsecure())
- req.NoError(err)
- req.NotNil(conn)
-
- client := databasev1.NewIndexRuleRegistryServiceClient(conn)
- req.NotNil(client)
-
- meta := &commonv1.Metadata{
- Group: "default",
- Name: "db.instance",
- }
-
- getResp, err := client.Get(context.TODO(), &databasev1.IndexRuleRegistryServiceGetRequest{Metadata: meta})
-
- req.NoError(err)
- req.NotNil(getResp)
-
- // 2 - DELETE
- deleteResp, err := client.Delete(context.TODO(), &databasev1.IndexRuleRegistryServiceDeleteRequest{
- Metadata: meta,
- })
- req.NoError(err)
- req.NotNil(deleteResp)
- req.True(deleteResp.GetDeleted())
-
- // 3 - GET -> Nil
- _, err = client.Get(context.TODO(), &databasev1.IndexRuleRegistryServiceGetRequest{
- Metadata: meta,
- })
- errStatus, _ := status.FromError(err)
- req.Equal(errStatus.Message(), schema.ErrEntityNotFound.Error())
-
- // 4 - CREATE
- _, err = client.Create(context.TODO(), &databasev1.IndexRuleRegistryServiceCreateRequest{IndexRule: getResp.GetIndexRule()})
- req.NoError(err)
-
- // 5 - GET - > Not Nil
- getResp, err = client.Get(context.TODO(), &databasev1.IndexRuleRegistryServiceGetRequest{
- Metadata: meta,
+ AfterEach(func() {
+ _ = conn.Close()
+ gracefulStop()
})
- req.NoError(err)
- req.NotNil(getResp)
-}
+})
diff --git a/banyand/liaison/grpc/server.go b/banyand/liaison/grpc/server.go
index 6720d7b..5081754 100644
--- a/banyand/liaison/grpc/server.go
+++ b/banyand/liaison/grpc/server.go
@@ -27,12 +27,13 @@ import (
"github.com/apache/skywalking-banyandb/api/event"
databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
+ measurev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/measure/v1"
streamv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/stream/v1"
"github.com/apache/skywalking-banyandb/banyand/discovery"
"github.com/apache/skywalking-banyandb/banyand/metadata"
"github.com/apache/skywalking-banyandb/banyand/queue"
+ "github.com/apache/skywalking-banyandb/pkg/bus"
"github.com/apache/skywalking-banyandb/pkg/logger"
- "github.com/apache/skywalking-banyandb/pkg/partition"
"github.com/apache/skywalking-banyandb/pkg/run"
)
@@ -56,22 +57,26 @@ type Server struct {
pipeline queue.Queue
repo discovery.ServiceRepo
creds credentials.TransportCredentials
- shardRepo *shardRepo
- entityRepo *entityRepo
+
+ streamSVC *streamService
+ measureSVC *measureService
*streamRegistryServer
*indexRuleBindingRegistryServer
*indexRuleRegistryServer
*measureRegistryServer
*groupRegistryServer
- streamv1.UnimplementedStreamServiceServer
}
func NewServer(_ context.Context, pipeline queue.Queue, repo discovery.ServiceRepo, schemaRegistry metadata.Service) *Server {
return &Server{
- pipeline: pipeline,
- repo: repo,
- shardRepo: &shardRepo{shardEventsMap: make(map[identity]uint32)},
- entityRepo: &entityRepo{entitiesMap: make(map[identity]partition.EntityLocator)},
+ pipeline: pipeline,
+ repo: repo,
+ streamSVC: &streamService{
+ discoveryService: newDiscoveryService(pipeline),
+ },
+ measureSVC: &measureService{
+ discoveryService: newDiscoveryService(pipeline),
+ },
streamRegistryServer: &streamRegistryServer{
schemaRegistry: schemaRegistry,
},
@@ -92,13 +97,34 @@ func NewServer(_ context.Context, pipeline queue.Queue, repo discovery.ServiceRe
func (s *Server) PreRun() error {
s.log = logger.GetLogger("liaison-grpc")
- s.shardRepo.log = s.log
- s.entityRepo.log = s.log
- err := s.repo.Subscribe(event.StreamTopicShardEvent, s.shardRepo)
- if err != nil {
- return err
+ components := []struct {
+ shardEvent bus.Topic
+ entityEvent bus.Topic
+ discoverySVC *discoveryService
+ }{
+ {
+ shardEvent: event.StreamTopicShardEvent,
+ entityEvent: event.StreamTopicEntityEvent,
+ discoverySVC: s.streamSVC.discoveryService,
+ },
+ {
+ shardEvent: event.MeasureTopicShardEvent,
+ entityEvent: event.MeasureTopicEntityEvent,
+ discoverySVC: s.measureSVC.discoveryService,
+ },
+ }
+ for _, c := range components {
+ c.discoverySVC.SetLogger(s.log)
+ err := s.repo.Subscribe(c.shardEvent, c.discoverySVC.shardRepo)
+ if err != nil {
+ return err
+ }
+ err = s.repo.Subscribe(c.entityEvent, c.discoverySVC.entityRepo)
+ if err != nil {
+ return err
+ }
}
- return s.repo.Subscribe(event.StreamTopicEntityEvent, s.entityRepo)
+ return nil
}
func (s *Server) Name() string {
@@ -150,7 +176,9 @@ func (s *Server) Serve() error {
}
opts = append(opts, grpclib.MaxRecvMsgSize(s.maxRecvMsgSize))
s.ser = grpclib.NewServer(opts...)
- streamv1.RegisterStreamServiceServer(s.ser, s)
+
+ streamv1.RegisterStreamServiceServer(s.ser, s.streamSVC)
+ measurev1.RegisterMeasureServiceServer(s.ser, s.measureSVC)
// register *Registry
databasev1.RegisterGroupRegistryServiceServer(s.ser, s.groupRegistryServer)
databasev1.RegisterIndexRuleBindingRegistryServiceServer(s.ser, s.indexRuleBindingRegistryServer)
diff --git a/banyand/liaison/grpc/stream.go b/banyand/liaison/grpc/stream.go
index df831c4..8d125b9 100644
--- a/banyand/liaison/grpc/stream.go
+++ b/banyand/liaison/grpc/stream.go
@@ -28,7 +28,12 @@ import (
"github.com/apache/skywalking-banyandb/pkg/bus"
)
-func (s *Server) Write(stream streamv1.StreamService_WriteServer) error {
+type streamService struct {
+ *discoveryService
+ streamv1.UnimplementedStreamServiceServer
+}
+
+func (s *streamService) Write(stream streamv1.StreamService_WriteServer) error {
for {
writeEntity, err := stream.Recv()
if err == io.EOF {
@@ -37,18 +42,9 @@ func (s *Server) Write(stream streamv1.StreamService_WriteServer) error {
if err != nil {
return err
}
- id := getID(writeEntity.GetMetadata())
- shardNum, existed := s.shardRepo.shardNum(id)
- if !existed {
- continue
- }
- locator, existed := s.entityRepo.getLocator(id)
- if !existed {
- continue
- }
- entity, shardID, err := locator.Locate(writeEntity.GetElement().TagFamilies, shardNum)
+ entity, shardID, err := s.navigate(writeEntity.GetMetadata(), writeEntity.GetElement().GetTagFamilies())
if err != nil {
- s.log.Error().Err(err).Msg("failed to locate write target")
+ s.log.Error().Err(err).Msg("failed to navigate to the write target")
continue
}
message := bus.NewMessage(bus.MessageID(time.Now().UnixNano()), &streamv1.InternalWriteRequest{
@@ -66,7 +62,7 @@ func (s *Server) Write(stream streamv1.StreamService_WriteServer) error {
}
}
-func (s *Server) Query(_ context.Context, entityCriteria *streamv1.QueryRequest) (*streamv1.QueryResponse, error) {
+func (s *streamService) Query(_ context.Context, entityCriteria *streamv1.QueryRequest) (*streamv1.QueryResponse, error) {
message := bus.NewMessage(bus.MessageID(time.Now().UnixNano()), entityCriteria)
feat, errQuery := s.pipeline.Publish(data.TopicStreamQuery, message)
if errQuery != nil {
diff --git a/banyand/liaison/grpc/stream_test.go b/banyand/liaison/grpc/stream_test.go
index 1f30088..01aea1d 100644
--- a/banyand/liaison/grpc/stream_test.go
+++ b/banyand/liaison/grpc/stream_test.go
@@ -1,40 +1,20 @@
-// Licensed to Apache Software Foundation (ASF) under one or more contributor
-// license agreements. See the NOTICE file distributed with
-// this work for additional information regarding copyright
-// ownership. Apache Software Foundation (ASF) licenses this file to you under
-// the Apache License, Version 2.0 (the "License"); you may
-// not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package grpc
+package grpc_test
import (
"context"
"encoding/base64"
- "fmt"
"io"
"path/filepath"
"runtime"
"sync"
- "testing"
"time"
- "github.com/stretchr/testify/assert"
- "github.com/stretchr/testify/require"
grpclib "google.golang.org/grpc"
"google.golang.org/grpc/credentials"
streamv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/stream/v1"
"github.com/apache/skywalking-banyandb/banyand/discovery"
+ "github.com/apache/skywalking-banyandb/banyand/liaison/grpc"
"github.com/apache/skywalking-banyandb/banyand/metadata"
"github.com/apache/skywalking-banyandb/banyand/query"
"github.com/apache/skywalking-banyandb/banyand/queue"
@@ -44,53 +24,73 @@ import (
"github.com/apache/skywalking-banyandb/pkg/run"
"github.com/apache/skywalking-banyandb/pkg/test"
teststream "github.com/apache/skywalking-banyandb/pkg/test/stream"
-)
-
-var _ run.PreRunner = (*preloadStreamService)(nil)
-
-type preloadStreamService struct {
- metaSvc metadata.Service
-}
-
-func (p *preloadStreamService) Name() string {
- return "preload-measure"
-}
-func (p *preloadStreamService) PreRun() error {
- return teststream.PreloadSchema(p.metaSvc.SchemaRegistry())
-}
+ . "github.com/onsi/ginkgo/v2"
+ . "github.com/onsi/gomega"
+)
-type testData struct {
- certFile string
- serverHostOverride string
- TLS bool
- addr string
- basePath string
-}
+var _ = Describe("Stream", func() {
+ var gracefulStop func()
+ var conn *grpclib.ClientConn
+ It("is a plain server", func() {
+ gracefulStop = setup(nil)
+ var err error
+ conn, err = grpclib.Dial("localhost:17912", grpclib.WithInsecure())
+ Expect(err).NotTo(HaveOccurred())
+ streamWrite(conn)
+ Eventually(func() (int, error) {
+ return streamQuery(conn)
+ }).Should(Equal(1))
+ })
+ It("is a TLS server", func() {
+ flags := []string{"--tls=true"}
+ _, currentFile, _, _ := runtime.Caller(0)
+ basePath := filepath.Dir(currentFile)
+ certFile := filepath.Join(basePath, "testdata/server_cert.pem")
+ keyFile := filepath.Join(basePath, "testdata/server_key.pem")
+ flags = append(flags, "--cert-file="+certFile)
+ flags = append(flags, "--key-file="+keyFile)
+ addr := "localhost:17913"
+ flags = append(flags, "--addr="+addr)
+ gracefulStop = setup(flags)
+ creds, err := credentials.NewClientTLSFromFile(certFile, "localhost")
+ Expect(err).NotTo(HaveOccurred())
+ conn, err = grpclib.Dial(addr, grpclib.WithTransportCredentials(creds))
+ Expect(err).NotTo(HaveOccurred())
+ streamWrite(conn)
+ Eventually(func() (int, error) {
+ return streamQuery(conn)
+ }).Should(Equal(1))
+ })
+ AfterEach(func() {
+ _ = conn.Close()
+ gracefulStop()
+ })
+})
-func setup(req *require.Assertions, testData testData) func() {
- req.NoError(logger.Init(logger.Logging{
+func setup(flags []string) func() {
+ Expect(logger.Init(logger.Logging{
Env: "dev",
Level: "warn",
- }))
+ })).Should(Succeed())
g := run.Group{Name: "standalone"}
// Init `Discovery` module
repo, err := discovery.NewServiceRepo(context.Background())
- req.NoError(err)
+ Expect(err).NotTo(HaveOccurred())
// Init `Queue` module
pipeline, err := queue.NewQueue(context.TODO(), repo)
- req.NoError(err)
+ Expect(err).NotTo(HaveOccurred())
// Init `Metadata` module
metaSvc, err := metadata.NewService(context.TODO())
- req.NoError(err)
+ Expect(err).NotTo(HaveOccurred())
// Init `Stream` module
streamSvc, err := stream.NewService(context.TODO(), metaSvc, repo, pipeline)
- req.NoError(err)
+ Expect(err).NotTo(HaveOccurred())
// Init `Query` module
q, err := query.NewExecutor(context.TODO(), streamSvc, metaSvc, repo, pipeline)
- req.NoError(err)
+ Expect(err).NotTo(HaveOccurred())
- tcp := NewServer(context.TODO(), pipeline, repo, metaSvc)
+ tcp := grpc.NewServer(context.TODO(), pipeline, repo, metaSvc)
closer := run.NewTester("closer")
startListener := run.NewTester("started-listener")
@@ -108,18 +108,12 @@ func setup(req *require.Assertions, testData testData) func() {
startListener,
)
// Create a random directory
- rootPath, deferFunc := test.Space(req)
- flags := []string{"--root-path=" + rootPath, "--metadata-root-path=" + teststream.RandomTempDir()}
- if testData.TLS {
- flags = append(flags, "--tls=true")
- certFile := filepath.Join(testData.basePath, "testdata/server_cert.pem")
- keyFile := filepath.Join(testData.basePath, "testdata/server_key.pem")
- flags = append(flags, "--cert-file="+certFile)
- flags = append(flags, "--key-file="+keyFile)
- flags = append(flags, "--addr="+testData.addr)
- }
+ rootPath, deferFunc, err := test.NewSpace()
+ Expect(err).NotTo(HaveOccurred())
+ flags = append(flags, "--root-path="+rootPath, "--metadata-root-path="+teststream.RandomTempDir())
+
err = g.RegisterFlags().Parse(flags)
- req.NoError(err)
+ Expect(err).NotTo(HaveOccurred())
wg := sync.WaitGroup{}
@@ -130,72 +124,27 @@ func setup(req *require.Assertions, testData testData) func() {
errRun := g.Run()
if errRun != nil {
startListener.GracefulStop()
- req.NoError(errRun)
+ Expect(errRun).Should(Succeed())
}
deferFunc()
}()
- req.NoError(startListener.WaitUntilStarted())
+ Expect(startListener.WaitUntilStarted()).Should(Succeed())
return func() {
closer.GracefulStop()
wg.Wait()
}
}
-type caseData struct {
- name string
- queryGenerator func(baseTs time.Time) *streamv1.QueryRequest
- writeGenerator func() *streamv1.WriteRequest
- args testData
- wantLen int
+type preloadStreamService struct {
+ metaSvc metadata.Service
}
-func TestStreamService(t *testing.T) {
- req := require.New(t)
- _, currentFile, _, _ := runtime.Caller(0)
- basePath := filepath.Dir(currentFile)
- certFile := filepath.Join(basePath, "testdata/server_cert.pem")
- testCases := []caseData{
- {
- name: "isTLS",
- queryGenerator: queryCriteria,
- writeGenerator: writeData,
- args: testData{
- TLS: true,
- certFile: certFile,
- serverHostOverride: "localhost",
- addr: "localhost:17913",
- basePath: basePath,
- },
- wantLen: 1,
- },
- {
- name: "noTLS",
- queryGenerator: queryCriteria,
- writeGenerator: writeData,
- args: testData{
- TLS: false,
- addr: "localhost:17912",
- },
- wantLen: 1,
- },
- }
- for _, tc := range testCases {
- t.Run(tc.name, func(t *testing.T) {
- gracefulStop := setup(req, tc.args)
- defer gracefulStop()
- if tc.args.TLS {
- var opts []grpclib.DialOption
- creds, err := credentials.NewClientTLSFromFile(tc.args.certFile, tc.args.serverHostOverride)
- assert.NoError(t, err)
- opts = append(opts, grpclib.WithTransportCredentials(creds))
- dialService(t, tc, opts)
- } else {
- var opts []grpclib.DialOption
- opts = append(opts, grpclib.WithInsecure())
- dialService(t, tc, opts)
- }
- })
- }
+func (p *preloadStreamService) Name() string {
+ return "preload-measure"
+}
+
+func (p *preloadStreamService) PreRun() error {
+ return teststream.PreloadSchema(p.metaSvc.SchemaRegistry())
}
func writeData() *streamv1.WriteRequest {
@@ -227,60 +176,23 @@ func queryCriteria(baseTs time.Time) *streamv1.QueryRequest {
Build()
}
-func dialService(t *testing.T, tc caseData, opts []grpclib.DialOption) {
- conn, err := grpclib.Dial(tc.args.addr, opts...)
- assert.NoError(t, err)
- defer func(conn *grpclib.ClientConn) {
- _ = conn.Close()
- }(conn)
- streamWrite(t, tc, conn)
- requireTester := require.New(t)
- assert.NoError(t, test.Retry(10, 100*time.Millisecond, func() error {
- now := time.Now()
- resp := streamQuery(requireTester, conn, tc.queryGenerator(now))
- if len(resp.GetElements()) == tc.wantLen {
- return nil
- }
- return fmt.Errorf("expected elements number: %d got: %d", tc.wantLen, len(resp.GetElements()))
- }))
-}
-
-func streamWrite(t *testing.T, tc caseData, conn *grpclib.ClientConn) {
+func streamWrite(conn *grpclib.ClientConn) {
client := streamv1.NewStreamServiceClient(conn)
ctx := context.Background()
writeClient, errorWrite := client.Write(ctx)
- if errorWrite != nil {
- t.Errorf("%v.write(_) = _, %v", client, errorWrite)
- }
- waitc := make(chan struct{})
- go func() {
- for {
- writeResponse, errRecv := writeClient.Recv()
- if errRecv == io.EOF {
- // read done.
- close(waitc)
- return
- }
- assert.NoError(t, errRecv)
- assert.NotNil(t, writeResponse)
- }
- }()
- if errSend := writeClient.Send(tc.writeGenerator()); errSend != nil {
- t.Errorf("Failed to send a note: %v", errSend)
- }
- if errorSend := writeClient.CloseSend(); errorSend != nil {
- t.Errorf("Failed to send a note: %v", errorSend)
- }
- <-waitc
+ Expect(errorWrite).Should(Succeed())
+ Expect(writeClient.Send(writeData())).Should(Succeed())
+ Expect(writeClient.CloseSend()).Should(Succeed())
+ Eventually(func() error {
+ _, err := writeClient.Recv()
+ return err
+ }).Should(Equal(io.EOF))
}
-func streamQuery(tester *require.Assertions, conn *grpclib.ClientConn, request *streamv1.QueryRequest) *streamv1.QueryResponse {
+func streamQuery(conn *grpclib.ClientConn) (int, error) {
client := streamv1.NewStreamServiceClient(conn)
ctx := context.Background()
- queryResponse, errRev := client.Query(ctx, request)
- if errRev != nil {
- tester.Errorf(errRev, "Retrieve client failed: %v")
- }
- tester.NotNil(queryResponse)
- return queryResponse
+ resp, err := client.Query(ctx, queryCriteria(time.Now()))
+
+ return len(resp.GetElements()), err
}
diff --git a/go.mod b/go.mod
index adda1b0..91a8dec 100644
--- a/go.mod
+++ b/go.mod
@@ -38,12 +38,14 @@ require (
github.com/dustin/go-humanize v1.0.0 // indirect
github.com/form3tech-oss/jwt-go v3.2.3+incompatible // indirect
github.com/fsnotify/fsnotify v1.4.9 // indirect
+ github.com/go-task/slim-sprig v0.0.0-20210107165309-348f09dbbbc0 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b // indirect
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect
github.com/golang/snappy v0.0.3 // indirect
github.com/google/btree v1.0.1 // indirect
github.com/google/flatbuffers v1.12.1 // indirect
+ github.com/google/pprof v0.0.0-20210407192527-94a9f03dee38 // indirect
github.com/gorilla/websocket v1.4.2 // indirect
github.com/grpc-ecosystem/go-grpc-middleware v1.3.0 // indirect
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0 // indirect
@@ -58,6 +60,8 @@ require (
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.1 // indirect
github.com/mschoch/smat v0.2.0 // indirect
+ github.com/onsi/ginkgo/v2 v2.0.0 // indirect
+ github.com/onsi/gomega v1.17.0 // indirect
github.com/pelletier/go-toml v1.9.3 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/prometheus/client_golang v1.11.0 // indirect
@@ -94,6 +98,7 @@ require (
golang.org/x/crypto v0.0.0-20201002170205-7f63de1d35b0 // indirect
golang.org/x/text v0.3.6 // indirect
golang.org/x/time v0.0.0-20210220033141-f8bda1e9f3ba // indirect
+ golang.org/x/tools v0.1.5 // indirect
gopkg.in/ini.v1 v1.62.0 // indirect
gopkg.in/natefinch/lumberjack.v2 v2.0.0 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
diff --git a/go.sum b/go.sum
index 5b62477..425e2d0 100644
--- a/go.sum
+++ b/go.sum
@@ -146,6 +146,8 @@ github.com/go-logfmt/logfmt v0.3.0/go.mod h1:Qt1PoO58o5twSAckw1HlFXLmHsOX5/0LbT9
github.com/go-logfmt/logfmt v0.4.0/go.mod h1:3RMwSq7FuexP4Kalkev3ejPJsZTpXXBr9+V4qmtdjCk=
github.com/go-logfmt/logfmt v0.5.0/go.mod h1:wCYkCAKZfumFQihp8CzCvQ3paCTfi41vtzG1KdI/P7A=
github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY=
+github.com/go-task/slim-sprig v0.0.0-20210107165309-348f09dbbbc0 h1:p104kn46Q8WdvHunIJ9dAyjPVtrBPhSr3KT2yUst43I=
+github.com/go-task/slim-sprig v0.0.0-20210107165309-348f09dbbbc0/go.mod h1:fyg7847qk6SyHyPtNmDHnmrv/HOrqktSC+C9fM+CJOE=
github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA=
github.com/gogo/protobuf v1.1.1/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ=
github.com/gogo/protobuf v1.2.1/go.mod h1:hp+jE20tsWTFYpLwKvXlhS1hjn+gTNwPg2I6zVXpSg4=
@@ -222,6 +224,8 @@ github.com/google/pprof v0.0.0-20201023163331-3e6fc7fc9c4c/go.mod h1:kpwsk12EmLe
github.com/google/pprof v0.0.0-20201203190320-1bf35d6f28c2/go.mod h1:kpwsk12EmLew5upagYY7GY0pfYCcupk39gWOCRROcvE=
github.com/google/pprof v0.0.0-20210122040257-d980be63207e/go.mod h1:kpwsk12EmLew5upagYY7GY0pfYCcupk39gWOCRROcvE=
github.com/google/pprof v0.0.0-20210226084205-cbba55b83ad5/go.mod h1:kpwsk12EmLew5upagYY7GY0pfYCcupk39gWOCRROcvE=
+github.com/google/pprof v0.0.0-20210407192527-94a9f03dee38 h1:yAJXTCF9TqKcTiHJAE8dj7HMvPfh66eeA2JYW7eFpSE=
+github.com/google/pprof v0.0.0-20210407192527-94a9f03dee38/go.mod h1:kpwsk12EmLew5upagYY7GY0pfYCcupk39gWOCRROcvE=
github.com/google/renameio v0.1.0/go.mod h1:KWCgfxg9yswjAJkECMjeO8J8rahYeXnNhOm40UhjYkI=
github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I=
@@ -261,6 +265,7 @@ github.com/hashicorp/logutils v1.0.0/go.mod h1:QIAnNjmIWmVIIkWDTG1z5v++HQmx9WQRO
github.com/hashicorp/mdns v1.0.0/go.mod h1:tL+uN++7HEJ6SQLQ2/p+z2pH24WQKWjBPkE0mNTz8vQ=
github.com/hashicorp/memberlist v0.1.3/go.mod h1:ajVTdAv/9Im8oMAAj5G31PhhMCZJV2pPBoIllUwCN7I=
github.com/hashicorp/serf v0.8.2/go.mod h1:6hOLApaqBFA1NXqRQAsxw9QxuDEvNxSQRwA/JwenrHc=
+github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU=
github.com/ianlancetaylor/demangle v0.0.0-20181102032728-5e5cf60278f6/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc=
github.com/ianlancetaylor/demangle v0.0.0-20200824232613-28f6c0f3b639/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc=
github.com/inconshreveable/mousetrap v1.0.0 h1:Z8tu5sraLXCXIcARxBp/8cbvlwVa7Z1NHg9XEKhtSvM=
@@ -325,9 +330,20 @@ github.com/mschoch/smat v0.2.0 h1:8imxQsjDm8yFEAVBe7azKmKSgzSkZXDuKkSq9374khM=
github.com/mschoch/smat v0.2.0/go.mod h1:kc9mz7DoBKqDyiRL7VZN8KvXQMWeTaVnttLRXOlotKw=
github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U=
github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U=
+github.com/nxadm/tail v1.4.4/go.mod h1:kenIhsEOeOJmVchQTgglprH7qJGnHDVpk1VPCcaMI8A=
+github.com/nxadm/tail v1.4.8/go.mod h1:+ncqLTQzXmGhMZNUePPaPqPvBxHAIsmXswZKocGu+AU=
github.com/oklog/run v1.1.0 h1:GEenZ1cK0+q0+wsJew9qUg/DyD8k3JzYsZAi5gYi2mA=
github.com/oklog/run v1.1.0/go.mod h1:sVPdnTZT1zYwAJeCMu2Th4T21pA3FPOQRfWjQlk7DVU=
github.com/oklog/ulid v1.3.1/go.mod h1:CirwcVhetQ6Lv90oh/F+FBtV6XMibvdAFo93nm5qn4U=
+github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
+github.com/onsi/ginkgo v1.12.1/go.mod h1:zj2OWP4+oCPe1qIXoGWkgMRwljMUYCdkwsT2108oapk=
+github.com/onsi/ginkgo v1.16.4/go.mod h1:dX+/inL/fNMqNlz0e9LfyB9TswhZpCVdJM/Z6Vvnwo0=
+github.com/onsi/ginkgo/v2 v2.0.0 h1:CcuG/HvWNkkaqCUpJifQY8z7qEMBJya6aLPx6ftGyjQ=
+github.com/onsi/ginkgo/v2 v2.0.0/go.mod h1:vw5CSIxN1JObi/U8gcbwft7ZxR2dgaR70JSE3/PpL4c=
+github.com/onsi/gomega v1.7.1/go.mod h1:XdKZgCCFLUoM/7CFJVPcG8C1xQ1AJ0vpAezJrB7JYyY=
+github.com/onsi/gomega v1.10.1/go.mod h1:iN09h71vgCQne3DLsj+A5owkum+a2tYe+TOCB1ybHNo=
+github.com/onsi/gomega v1.17.0 h1:9Luw4uT5HTjHTN8+aNcSThgH1vdXnmdJ8xIfZ4wyTRE=
+github.com/onsi/gomega v1.17.0/go.mod h1:HnhC7FXeEQY45zxNK3PPoIUhzk/80Xly9PcubAlGdZY=
github.com/opentracing/opentracing-go v1.1.0/go.mod h1:UkNAQd3GIcIGf0SeVgPpRdFStlNbqXla1AfSYxPUl2o=
github.com/pascaldekloe/goe v0.0.0-20180627143212-57f6aae5913c/go.mod h1:lzWF7FIEvWOWxwDKqyGYQf6ZUaNfKdP144TG7ZOy1lc=
github.com/pelletier/go-toml v1.2.0/go.mod h1:5z9KED0ma1S8pY6P1sdut58dfprrGBbd/94hg7ilaic=
@@ -543,6 +559,7 @@ golang.org/x/mod v0.4.1/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
golang.org/x/mod v0.4.2/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
+golang.org/x/net v0.0.0-20180906233101-161cd47e91fd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20181023162649-9b4f9f5ad519/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20181114220301-adae6a3d119a/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20181201002055-351d144fa1fc/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
@@ -568,6 +585,7 @@ golang.org/x/net v0.0.0-20200324143707-d3edc9973b7e/go.mod h1:qpuaurCH72eLCgpAm/
golang.org/x/net v0.0.0-20200501053045-e0ff5e5a1de5/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A=
golang.org/x/net v0.0.0-20200506145744-7e3656a0809f/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A=
golang.org/x/net v0.0.0-20200513185701-a91f0712d120/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A=
+golang.org/x/net v0.0.0-20200520004742-59133d7f0dd7/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A=
golang.org/x/net v0.0.0-20200520182314-0ba52f642ac2/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A=
golang.org/x/net v0.0.0-20200625001655-4c5254603344/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA=
golang.org/x/net v0.0.0-20200707034311-ab3426394381/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA=
@@ -581,6 +599,7 @@ golang.org/x/net v0.0.0-20210119194325-5f4716e94777/go.mod h1:m0MpNAwzfU5UDzcl9v
golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg=
golang.org/x/net v0.0.0-20210316092652-d523dce5a7f4/go.mod h1:RBQZq4jEuRlivfhVLdyRGr576XBO4/greRjx4P4O3yc=
golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4/go.mod h1:p54w0d4576C0XHj96bSt6lcn1PtDYWL6XObtHCRCNQM=
+golang.org/x/net v0.0.0-20210428140749-89ef3d95e781/go.mod h1:OJAsFXCWl8Ukc7SiCT/9KSuxbyM7479/AVlXFRxuMCk=
golang.org/x/net v0.0.0-20210716203947-853a461950ff h1:j2EK/QoxYNBsXI4R7fQkkRUk8y6wnOBI+6hgPdP/6Ds=
golang.org/x/net v0.0.0-20210716203947-853a461950ff/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
@@ -610,6 +629,7 @@ golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJ
golang.org/x/sys v0.0.0-20180823144017-11551d06cbcc/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
+golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20181026203630-95b1ffbd15a5/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20181107165924-66b7b1311ac8/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20181116152217-5ac8a444bdc5/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
@@ -623,9 +643,11 @@ golang.org/x/sys v0.0.0-20190507160741-ecd444e8653b/go.mod h1:h1NjWce9XRLGQEsW7w
golang.org/x/sys v0.0.0-20190606165138-5da285871e9c/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20190624142023-c5567b49c5d0/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20190726091711-fc99dfbffb4e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
+golang.org/x/sys v0.0.0-20190904154756-749cb33beabd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20191001151750-bb3f8db39f24/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20191005200804-aed5e4c7ecf9/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20191026070338-33540a1f6037/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
+golang.org/x/sys v0.0.0-20191120155948-bd437916bb0e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20191204072324-ce4227a45e2e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20191228213918-04cbcbbfeed8/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200106162015-b016eb3dc98e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
@@ -650,6 +672,7 @@ golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7w
golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20201201145000-ef89a241ccb3/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210104204734-6f8348627aad/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
+golang.org/x/sys v0.0.0-20210112080510-489259a85091/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210119212857-b64e53b001e4/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210124154548-22da62e12c0c/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210220050731-9a76102bfb43/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
@@ -728,6 +751,7 @@ golang.org/x/tools v0.0.0-20200904185747-39188db58858/go.mod h1:Cj7w3i3Rnn0Xh82u
golang.org/x/tools v0.0.0-20201110124207-079ba7bd75cd/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA=
golang.org/x/tools v0.0.0-20201201161351-ac6f37ff4c2a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA=
golang.org/x/tools v0.0.0-20201208233053-a543418bbed2/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA=
+golang.org/x/tools v0.0.0-20201224043029-2b0845dc783e/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA=
golang.org/x/tools v0.0.0-20210105154028-b0ab187a4818/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA=
golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA=
golang.org/x/tools v0.1.0/go.mod h1:xkSsbof2nBLbhDlRMhhhyNLN/zl3eTqcnHD5viDpcZ0=
@@ -856,12 +880,14 @@ gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8
gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 h1:YR8cESwS4TdDjEe65xsg0ogRM/Nc3DYOhEAlW+xobZo=
gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/errgo.v2 v2.1.0/go.mod h1:hNsd1EY+bozCKY1Ytp96fpM3vjJbqLJn88ws8XvfDNI=
+gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMys=
gopkg.in/ini.v1 v1.51.0/go.mod h1:pNLf8WUiyNEtQjuu5G5vTm06TEv9tsIgeAvK8hOrP4k=
gopkg.in/ini.v1 v1.62.0 h1:duBzk771uxoUuOlyRLkHsygud9+5lrlGjdFBb4mSKDU=
gopkg.in/ini.v1 v1.62.0/go.mod h1:pNLf8WUiyNEtQjuu5G5vTm06TEv9tsIgeAvK8hOrP4k=
gopkg.in/natefinch/lumberjack.v2 v2.0.0 h1:1Lc07Kr7qY4U2YPouBjpCLxpiyxIVoxqXgkXLknAOE8=
gopkg.in/natefinch/lumberjack.v2 v2.0.0/go.mod h1:l0ndWWf7gzL7RNwBG7wST/UCcT4T24xpD6X8LsfU/+k=
gopkg.in/resty.v1 v1.12.0/go.mod h1:mDo4pnntr5jdWRML875a/NmxYqAlA73dVijT2AXvQQo=
+gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw=
gopkg.in/yaml.v2 v2.0.0-20170812160011-eb3733d160e7/go.mod h1:JAlM8MvJe8wmxCU4Bli9HhUf9+ttbYbLASfIpnQbh74=
gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
diff --git a/pkg/logger/logger.go b/pkg/logger/logger.go
index 1407a4f..36896ed 100644
--- a/pkg/logger/logger.go
+++ b/pkg/logger/logger.go
@@ -46,3 +46,8 @@ func (l *Logger) Named(name string) *Logger {
subLogger := root.Logger.With().Str("module", module).Logger()
return &Logger{module: module, Logger: &subLogger}
}
+
+// Loggable indicates that the implementation supports logging
+type Loggable interface {
+ SetLogger(*Logger)
+}
diff --git a/pkg/test/space.go b/pkg/test/space.go
index bbe7ea3..8fafdb3 100644
--- a/pkg/test/space.go
+++ b/pkg/test/space.go
@@ -35,3 +35,12 @@ func Space(t *require.Assertions) (tempDir string, deferFunc func()) {
}
}
}
+
+func NewSpace() (tempDir string, deferFunc func(), err error) {
+ tempDir, err = ioutil.TempDir("", "banyandb-test-*")
+ return tempDir, func() {
+ if err = os.RemoveAll(tempDir); err != nil {
+ _, _ = fmt.Fprintf(os.Stderr, "Error while removing dir: %v\n", err)
+ }
+ }, err
+}