You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by ha...@apache.org on 2022/01/14 00:07:41 UTC
[skywalking-banyandb] branch test-ginkgo created (now 5a15e6b)
This is an automated email from the ASF dual-hosted git repository.
hanahmily pushed a change to branch test-ginkgo
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git.
at 5a15e6b Introduce ginkgo+gomega test framework
This branch includes the following new commits:
new 5a15e6b Introduce ginkgo+gomega test framework
The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails. The revisions
listed as "add" were already present in the repository and have only
been added to this reference.
[skywalking-banyandb] 01/01: Introduce ginkgo+gomega test framework
Posted by ha...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
hanahmily pushed a commit to branch test-ginkgo
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
commit 5a15e6b13bbb621b3f1cd2a3fa98030d09df79fd
Author: Gao Hongtao <ha...@gmail.com>
AuthorDate: Fri Jan 14 00:05:23 2022 +0000
Introduce ginkgo+gomega test framework
---
banyand/liaison/grpc/discovery.go | 41 ++++
.../liaison/grpc/grpc_suite_test.go | 21 +-
banyand/liaison/grpc/{stream.go => measure.go} | 50 ++--
banyand/liaison/grpc/registry_test.go | 253 +++++++++------------
banyand/liaison/grpc/server.go | 58 +++--
banyand/liaison/grpc/stream.go | 22 +-
banyand/liaison/grpc/stream_test.go | 246 +++++++-------------
go.mod | 5 +
go.sum | 26 +++
pkg/logger/logger.go | 5 +
pkg/test/space.go | 9 +
11 files changed, 342 insertions(+), 394 deletions(-)
diff --git a/banyand/liaison/grpc/discovery.go b/banyand/liaison/grpc/discovery.go
index e7e2d76..a383589 100644
--- a/banyand/liaison/grpc/discovery.go
+++ b/banyand/liaison/grpc/discovery.go
@@ -20,13 +20,54 @@ package grpc
import (
"sync"
+ "github.com/apache/skywalking-banyandb/api/common"
commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
+ modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
+ "github.com/apache/skywalking-banyandb/banyand/queue"
+ "github.com/apache/skywalking-banyandb/banyand/tsdb"
"github.com/apache/skywalking-banyandb/pkg/bus"
"github.com/apache/skywalking-banyandb/pkg/logger"
"github.com/apache/skywalking-banyandb/pkg/partition"
+ "github.com/pkg/errors"
)
+var ErrNotExist = errors.New("the object doesn't exist")
+
+type discoveryService struct {
+ shardRepo *shardRepo
+ entityRepo *entityRepo
+ pipeline queue.Queue
+ log *logger.Logger
+}
+
+func newDiscoveryService(pipeline queue.Queue) *discoveryService {
+ return &discoveryService{
+ shardRepo: &shardRepo{shardEventsMap: make(map[identity]uint32)},
+ entityRepo: &entityRepo{entitiesMap: make(map[identity]partition.EntityLocator)},
+ pipeline: pipeline,
+ }
+}
+
+func (ds *discoveryService) SetLogger(log *logger.Logger) {
+ ds.log = log
+ ds.shardRepo.log = log
+ ds.entityRepo.log = log
+}
+
+func (ds *discoveryService) navigate(metadata *commonv1.Metadata, tagFamilies []*modelv1.TagFamilyForWrite) (tsdb.Entity, common.ShardID, error) {
+ id := getID(metadata)
+ shardNum, existed := ds.shardRepo.shardNum(id)
+ if !existed {
+ return nil, common.ShardID(0), errors.Wrapf(ErrNotExist, "finding the shard num by: %v", metadata)
+ }
+ locator, existed := ds.entityRepo.getLocator(id)
+ if !existed {
+ return nil, common.ShardID(0), errors.Wrapf(ErrNotExist, "finding the locator by: %v", metadata)
+ }
+ return locator.Locate(tagFamilies, shardNum)
+}
+
type identity struct {
name string
group string
diff --git a/pkg/test/space.go b/banyand/liaison/grpc/grpc_suite_test.go
similarity index 67%
copy from pkg/test/space.go
copy to banyand/liaison/grpc/grpc_suite_test.go
index bbe7ea3..ae8f957 100644
--- a/pkg/test/space.go
+++ b/banyand/liaison/grpc/grpc_suite_test.go
@@ -15,23 +15,16 @@
// specific language governing permissions and limitations
// under the License.
-package test
+package grpc_test
import (
- "fmt"
- "io/ioutil"
- "os"
+ "testing"
- "github.com/stretchr/testify/require"
+ . "github.com/onsi/ginkgo/v2"
+ . "github.com/onsi/gomega"
)
-func Space(t *require.Assertions) (tempDir string, deferFunc func()) {
- var tempDirErr error
- tempDir, tempDirErr = ioutil.TempDir("", "banyandb-test-*")
- t.Nil(tempDirErr)
- return tempDir, func() {
- if err := os.RemoveAll(tempDir); err != nil {
- _, _ = fmt.Fprintf(os.Stderr, "Error while removing dir: %v\n", err)
- }
- }
+func TestGrpc(t *testing.T) {
+ RegisterFailHandler(Fail)
+ RunSpecs(t, "Grpc Suite")
}
diff --git a/banyand/liaison/grpc/stream.go b/banyand/liaison/grpc/measure.go
similarity index 51%
copy from banyand/liaison/grpc/stream.go
copy to banyand/liaison/grpc/measure.go
index df831c4..530b024 100644
--- a/banyand/liaison/grpc/stream.go
+++ b/banyand/liaison/grpc/measure.go
@@ -18,67 +18,47 @@
package grpc
import (
- "context"
"io"
"time"
"github.com/apache/skywalking-banyandb/api/data"
- streamv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/stream/v1"
+ measurev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/measure/v1"
"github.com/apache/skywalking-banyandb/banyand/tsdb"
"github.com/apache/skywalking-banyandb/pkg/bus"
)
-func (s *Server) Write(stream streamv1.StreamService_WriteServer) error {
+type measureService struct {
+ *discoveryService
+ measurev1.UnimplementedMeasureServiceServer
+}
+
+func (ms *measureService) Write(measure measurev1.MeasureService_WriteServer) error {
for {
- writeEntity, err := stream.Recv()
+ writeRequest, err := measure.Recv()
if err == io.EOF {
return nil
}
if err != nil {
return err
}
- id := getID(writeEntity.GetMetadata())
- shardNum, existed := s.shardRepo.shardNum(id)
- if !existed {
- continue
- }
- locator, existed := s.entityRepo.getLocator(id)
- if !existed {
- continue
- }
- entity, shardID, err := locator.Locate(writeEntity.GetElement().TagFamilies, shardNum)
+ entity, shardID, err := ms.navigate(writeRequest.GetMetadata(), writeRequest.GetDataPoint().GetTagFamilies())
if err != nil {
- s.log.Error().Err(err).Msg("failed to locate write target")
+ ms.log.Error().Err(err).Msg("failed to navigate to the write target")
continue
}
- message := bus.NewMessage(bus.MessageID(time.Now().UnixNano()), &streamv1.InternalWriteRequest{
- Request: writeEntity,
+ message := bus.NewMessage(bus.MessageID(time.Now().UnixNano()), &measurev1.InternalWriteRequest{
+ Request: writeRequest,
ShardId: uint32(shardID),
SeriesHash: tsdb.HashEntity(entity),
})
- _, errWritePub := s.pipeline.Publish(data.TopicStreamWrite, message)
+ _, errWritePub := ms.pipeline.Publish(data.TopicMeasureWrite, message)
if errWritePub != nil {
return errWritePub
}
- if errSend := stream.Send(&streamv1.WriteResponse{}); errSend != nil {
+ if errSend := measure.Send(&measurev1.WriteResponse{}); errSend != nil {
return errSend
}
}
}
-func (s *Server) Query(_ context.Context, entityCriteria *streamv1.QueryRequest) (*streamv1.QueryResponse, error) {
- message := bus.NewMessage(bus.MessageID(time.Now().UnixNano()), entityCriteria)
- feat, errQuery := s.pipeline.Publish(data.TopicStreamQuery, message)
- if errQuery != nil {
- return nil, errQuery
- }
- msg, errFeat := feat.Get()
- if errFeat != nil {
- return nil, errFeat
- }
- queryMsg, ok := msg.Data().([]*streamv1.Element)
- if !ok {
- return nil, ErrQueryMsg
- }
- return &streamv1.QueryResponse{Elements: queryMsg}, nil
-}
+//TODO: implement topN & Query
diff --git a/banyand/liaison/grpc/registry_test.go b/banyand/liaison/grpc/registry_test.go
index 1c21f8a..13717e1 100644
--- a/banyand/liaison/grpc/registry_test.go
+++ b/banyand/liaison/grpc/registry_test.go
@@ -15,173 +15,126 @@
// specific language governing permissions and limitations
// under the License.
-package grpc
+package grpc_test
import (
"context"
- "testing"
- "github.com/stretchr/testify/require"
"google.golang.org/grpc"
"google.golang.org/grpc/status"
commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
"github.com/apache/skywalking-banyandb/banyand/metadata/schema"
-)
-
-func TestStreamRegistry(t *testing.T) {
- req := require.New(t)
- gracefulStop := setup(req, testData{
- TLS: false,
- addr: "localhost:17912",
- })
- defer gracefulStop()
- conn, err := grpc.Dial("localhost:17912", grpc.WithInsecure())
- req.NoError(err)
- req.NotNil(conn)
-
- client := databasev1.NewStreamRegistryServiceClient(conn)
- req.NotNil(client)
+ . "github.com/onsi/ginkgo/v2"
+ . "github.com/onsi/gomega"
+)
+var _ = Describe("Registry", func() {
+ var gracefulStop func()
+ var conn *grpc.ClientConn
meta := &commonv1.Metadata{
Group: "default",
- Name: "sw",
}
-
- getResp, err := client.Get(context.TODO(), &databasev1.StreamRegistryServiceGetRequest{Metadata: meta})
-
- req.NoError(err)
- req.NotNil(getResp)
-
- // 2 - DELETE
- deleteResp, err := client.Delete(context.TODO(), &databasev1.StreamRegistryServiceDeleteRequest{
- Metadata: meta,
+ BeforeEach(func() {
+ gracefulStop = setup(nil)
+ var err error
+ conn, err = grpc.Dial("localhost:17912", grpc.WithInsecure())
+ Expect(err).NotTo(HaveOccurred())
})
- req.NoError(err)
- req.NotNil(deleteResp)
- req.True(deleteResp.GetDeleted())
-
- // 3 - GET -> Nil
- _, err = client.Get(context.TODO(), &databasev1.StreamRegistryServiceGetRequest{
- Metadata: meta,
- })
- errStatus, _ := status.FromError(err)
- req.Equal(errStatus.Message(), schema.ErrEntityNotFound.Error())
-
- // 4 - CREATE
- _, err = client.Create(context.TODO(), &databasev1.StreamRegistryServiceCreateRequest{Stream: getResp.GetStream()})
- req.NoError(err)
-
- // 5 - GET - > Not Nil
- getResp, err = client.Get(context.TODO(), &databasev1.StreamRegistryServiceGetRequest{
- Metadata: meta,
+ It("manages the stream", func() {
+ client := databasev1.NewStreamRegistryServiceClient(conn)
+ Expect(client).NotTo(BeNil())
+ meta.Name = "sw"
+ getResp, err := client.Get(context.TODO(), &databasev1.StreamRegistryServiceGetRequest{Metadata: meta})
+ Expect(err).ShouldNot(HaveOccurred())
+ Expect(getResp).NotTo(BeNil())
+ By("Cleanup the registry")
+ deleteResp, err := client.Delete(context.TODO(), &databasev1.StreamRegistryServiceDeleteRequest{
+ Metadata: meta,
+ })
+ Expect(err).ShouldNot(HaveOccurred())
+ Expect(deleteResp).NotTo(BeNil())
+ Expect(deleteResp.GetDeleted()).To(BeTrue())
+ By("Verifying the registry empty")
+ _, err = client.Get(context.TODO(), &databasev1.StreamRegistryServiceGetRequest{
+ Metadata: meta,
+ })
+ errStatus, _ := status.FromError(err)
+ Expect(errStatus.Message()).To(Equal(schema.ErrEntityNotFound.Error()))
+ By("Creating a new stream")
+ _, err = client.Create(context.TODO(), &databasev1.StreamRegistryServiceCreateRequest{Stream: getResp.GetStream()})
+ Expect(err).ShouldNot(HaveOccurred())
+ By("Verifying the new stream")
+ getResp, err = client.Get(context.TODO(), &databasev1.StreamRegistryServiceGetRequest{
+ Metadata: meta,
+ })
+ Expect(err).ShouldNot(HaveOccurred())
+ Expect(getResp).NotTo(BeNil())
})
- req.NoError(err)
- req.NotNil(getResp)
-}
-
-func TestIndexRuleBindingRegistry(t *testing.T) {
- req := require.New(t)
- gracefulStop := setup(req, testData{
- TLS: false,
- addr: "localhost:17912",
+ It("manages the index-rule-binding", func() {
+ client := databasev1.NewIndexRuleBindingRegistryServiceClient(conn)
+ Expect(client).NotTo(BeNil())
+ meta.Name = "sw-index-rule-binding"
+ getResp, err := client.Get(context.TODO(), &databasev1.IndexRuleBindingRegistryServiceGetRequest{Metadata: meta})
+ Expect(err).ShouldNot(HaveOccurred())
+ Expect(getResp).NotTo(BeNil())
+ By("Cleanup the registry")
+ deleteResp, err := client.Delete(context.TODO(), &databasev1.IndexRuleBindingRegistryServiceDeleteRequest{
+ Metadata: meta,
+ })
+ Expect(err).ShouldNot(HaveOccurred())
+ Expect(deleteResp).NotTo(BeNil())
+ Expect(deleteResp.GetDeleted()).To(BeTrue())
+ By("Verifying the registry empty")
+ _, err = client.Get(context.TODO(), &databasev1.IndexRuleBindingRegistryServiceGetRequest{
+ Metadata: meta,
+ })
+ errStatus, _ := status.FromError(err)
+ Expect(errStatus.Message()).To(Equal(schema.ErrEntityNotFound.Error()))
+ By("Creating a new index-rule-binding")
+ _, err = client.Create(context.TODO(), &databasev1.IndexRuleBindingRegistryServiceCreateRequest{IndexRuleBinding: getResp.GetIndexRuleBinding()})
+ Expect(err).ShouldNot(HaveOccurred())
+ By("Verifying the new index-rule-binding")
+ getResp, err = client.Get(context.TODO(), &databasev1.IndexRuleBindingRegistryServiceGetRequest{
+ Metadata: meta,
+ })
+ Expect(err).ShouldNot(HaveOccurred())
+ Expect(getResp).NotTo(BeNil())
})
- defer gracefulStop()
-
- conn, err := grpc.Dial("localhost:17912", grpc.WithInsecure())
- req.NoError(err)
- req.NotNil(conn)
-
- client := databasev1.NewIndexRuleBindingRegistryServiceClient(conn)
- req.NotNil(client)
-
- meta := &commonv1.Metadata{
- Group: "default",
- Name: "sw-index-rule-binding",
- }
-
- getResp, err := client.Get(context.TODO(), &databasev1.IndexRuleBindingRegistryServiceGetRequest{Metadata: meta})
-
- req.NoError(err)
- req.NotNil(getResp)
-
- // 2 - DELETE
- deleteResp, err := client.Delete(context.TODO(), &databasev1.IndexRuleBindingRegistryServiceDeleteRequest{
- Metadata: meta,
+ It("manages the index-rule", func() {
+ client := databasev1.NewIndexRuleRegistryServiceClient(conn)
+ Expect(client).NotTo(BeNil())
+ meta.Name = "db.instance"
+ getResp, err := client.Get(context.TODO(), &databasev1.IndexRuleRegistryServiceGetRequest{Metadata: meta})
+ Expect(err).ShouldNot(HaveOccurred())
+ Expect(getResp).NotTo(BeNil())
+ By("Cleanup the registry")
+ deleteResp, err := client.Delete(context.TODO(), &databasev1.IndexRuleRegistryServiceDeleteRequest{
+ Metadata: meta,
+ })
+ Expect(err).ShouldNot(HaveOccurred())
+ Expect(deleteResp).NotTo(BeNil())
+ Expect(deleteResp.GetDeleted()).To(BeTrue())
+ By("Verifying the registry empty")
+ _, err = client.Get(context.TODO(), &databasev1.IndexRuleRegistryServiceGetRequest{
+ Metadata: meta,
+ })
+ errStatus, _ := status.FromError(err)
+ Expect(errStatus.Message()).To(Equal(schema.ErrEntityNotFound.Error()))
+ By("Creating a new index-rule")
+ _, err = client.Create(context.TODO(), &databasev1.IndexRuleRegistryServiceCreateRequest{IndexRule: getResp.GetIndexRule()})
+ Expect(err).ShouldNot(HaveOccurred())
+ By("Verifying the new index-rule")
+ getResp, err = client.Get(context.TODO(), &databasev1.IndexRuleRegistryServiceGetRequest{
+ Metadata: meta,
+ })
+ Expect(err).ShouldNot(HaveOccurred())
+ Expect(getResp).NotTo(BeNil())
})
- req.NoError(err)
- req.NotNil(deleteResp)
- req.True(deleteResp.GetDeleted())
-
- // 3 - GET -> Nil
- _, err = client.Get(context.TODO(), &databasev1.IndexRuleBindingRegistryServiceGetRequest{
- Metadata: meta,
- })
- errStatus, _ := status.FromError(err)
- req.Equal(errStatus.Message(), schema.ErrEntityNotFound.Error())
-
- // 4 - CREATE
- _, err = client.Create(context.TODO(), &databasev1.IndexRuleBindingRegistryServiceCreateRequest{IndexRuleBinding: getResp.GetIndexRuleBinding()})
- req.NoError(err)
-
- // 5 - GET - > Not Nil
- getResp, err = client.Get(context.TODO(), &databasev1.IndexRuleBindingRegistryServiceGetRequest{
- Metadata: meta,
- })
- req.NoError(err)
- req.NotNil(getResp)
-}
-
-func TestIndexRuleRegistry(t *testing.T) {
- req := require.New(t)
- gracefulStop := setup(req, testData{
- TLS: false,
- addr: "localhost:17912",
- })
- defer gracefulStop()
-
- conn, err := grpc.Dial("localhost:17912", grpc.WithInsecure())
- req.NoError(err)
- req.NotNil(conn)
-
- client := databasev1.NewIndexRuleRegistryServiceClient(conn)
- req.NotNil(client)
-
- meta := &commonv1.Metadata{
- Group: "default",
- Name: "db.instance",
- }
-
- getResp, err := client.Get(context.TODO(), &databasev1.IndexRuleRegistryServiceGetRequest{Metadata: meta})
-
- req.NoError(err)
- req.NotNil(getResp)
-
- // 2 - DELETE
- deleteResp, err := client.Delete(context.TODO(), &databasev1.IndexRuleRegistryServiceDeleteRequest{
- Metadata: meta,
- })
- req.NoError(err)
- req.NotNil(deleteResp)
- req.True(deleteResp.GetDeleted())
-
- // 3 - GET -> Nil
- _, err = client.Get(context.TODO(), &databasev1.IndexRuleRegistryServiceGetRequest{
- Metadata: meta,
- })
- errStatus, _ := status.FromError(err)
- req.Equal(errStatus.Message(), schema.ErrEntityNotFound.Error())
-
- // 4 - CREATE
- _, err = client.Create(context.TODO(), &databasev1.IndexRuleRegistryServiceCreateRequest{IndexRule: getResp.GetIndexRule()})
- req.NoError(err)
-
- // 5 - GET - > Not Nil
- getResp, err = client.Get(context.TODO(), &databasev1.IndexRuleRegistryServiceGetRequest{
- Metadata: meta,
+ AfterEach(func() {
+ _ = conn.Close()
+ gracefulStop()
})
- req.NoError(err)
- req.NotNil(getResp)
-}
+})
diff --git a/banyand/liaison/grpc/server.go b/banyand/liaison/grpc/server.go
index 6720d7b..5081754 100644
--- a/banyand/liaison/grpc/server.go
+++ b/banyand/liaison/grpc/server.go
@@ -27,12 +27,13 @@ import (
"github.com/apache/skywalking-banyandb/api/event"
databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
+ measurev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/measure/v1"
streamv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/stream/v1"
"github.com/apache/skywalking-banyandb/banyand/discovery"
"github.com/apache/skywalking-banyandb/banyand/metadata"
"github.com/apache/skywalking-banyandb/banyand/queue"
+ "github.com/apache/skywalking-banyandb/pkg/bus"
"github.com/apache/skywalking-banyandb/pkg/logger"
- "github.com/apache/skywalking-banyandb/pkg/partition"
"github.com/apache/skywalking-banyandb/pkg/run"
)
@@ -56,22 +57,26 @@ type Server struct {
pipeline queue.Queue
repo discovery.ServiceRepo
creds credentials.TransportCredentials
- shardRepo *shardRepo
- entityRepo *entityRepo
+
+ streamSVC *streamService
+ measureSVC *measureService
*streamRegistryServer
*indexRuleBindingRegistryServer
*indexRuleRegistryServer
*measureRegistryServer
*groupRegistryServer
- streamv1.UnimplementedStreamServiceServer
}
func NewServer(_ context.Context, pipeline queue.Queue, repo discovery.ServiceRepo, schemaRegistry metadata.Service) *Server {
return &Server{
- pipeline: pipeline,
- repo: repo,
- shardRepo: &shardRepo{shardEventsMap: make(map[identity]uint32)},
- entityRepo: &entityRepo{entitiesMap: make(map[identity]partition.EntityLocator)},
+ pipeline: pipeline,
+ repo: repo,
+ streamSVC: &streamService{
+ discoveryService: newDiscoveryService(pipeline),
+ },
+ measureSVC: &measureService{
+ discoveryService: newDiscoveryService(pipeline),
+ },
streamRegistryServer: &streamRegistryServer{
schemaRegistry: schemaRegistry,
},
@@ -92,13 +97,34 @@ func NewServer(_ context.Context, pipeline queue.Queue, repo discovery.ServiceRe
func (s *Server) PreRun() error {
s.log = logger.GetLogger("liaison-grpc")
- s.shardRepo.log = s.log
- s.entityRepo.log = s.log
- err := s.repo.Subscribe(event.StreamTopicShardEvent, s.shardRepo)
- if err != nil {
- return err
+ components := []struct {
+ shardEvent bus.Topic
+ entityEvent bus.Topic
+ discoverySVC *discoveryService
+ }{
+ {
+ shardEvent: event.StreamTopicShardEvent,
+ entityEvent: event.StreamTopicEntityEvent,
+ discoverySVC: s.streamSVC.discoveryService,
+ },
+ {
+ shardEvent: event.MeasureTopicShardEvent,
+ entityEvent: event.MeasureTopicEntityEvent,
+ discoverySVC: s.measureSVC.discoveryService,
+ },
+ }
+ for _, c := range components {
+ c.discoverySVC.SetLogger(s.log)
+ err := s.repo.Subscribe(c.shardEvent, c.discoverySVC.shardRepo)
+ if err != nil {
+ return err
+ }
+ err = s.repo.Subscribe(c.entityEvent, c.discoverySVC.entityRepo)
+ if err != nil {
+ return err
+ }
}
- return s.repo.Subscribe(event.StreamTopicEntityEvent, s.entityRepo)
+ return nil
}
func (s *Server) Name() string {
@@ -150,7 +176,9 @@ func (s *Server) Serve() error {
}
opts = append(opts, grpclib.MaxRecvMsgSize(s.maxRecvMsgSize))
s.ser = grpclib.NewServer(opts...)
- streamv1.RegisterStreamServiceServer(s.ser, s)
+
+ streamv1.RegisterStreamServiceServer(s.ser, s.streamSVC)
+ measurev1.RegisterMeasureServiceServer(s.ser, s.measureSVC)
// register *Registry
databasev1.RegisterGroupRegistryServiceServer(s.ser, s.groupRegistryServer)
databasev1.RegisterIndexRuleBindingRegistryServiceServer(s.ser, s.indexRuleBindingRegistryServer)
diff --git a/banyand/liaison/grpc/stream.go b/banyand/liaison/grpc/stream.go
index df831c4..8d125b9 100644
--- a/banyand/liaison/grpc/stream.go
+++ b/banyand/liaison/grpc/stream.go
@@ -28,7 +28,12 @@ import (
"github.com/apache/skywalking-banyandb/pkg/bus"
)
-func (s *Server) Write(stream streamv1.StreamService_WriteServer) error {
+type streamService struct {
+ *discoveryService
+ streamv1.UnimplementedStreamServiceServer
+}
+
+func (s *streamService) Write(stream streamv1.StreamService_WriteServer) error {
for {
writeEntity, err := stream.Recv()
if err == io.EOF {
@@ -37,18 +42,9 @@ func (s *Server) Write(stream streamv1.StreamService_WriteServer) error {
if err != nil {
return err
}
- id := getID(writeEntity.GetMetadata())
- shardNum, existed := s.shardRepo.shardNum(id)
- if !existed {
- continue
- }
- locator, existed := s.entityRepo.getLocator(id)
- if !existed {
- continue
- }
- entity, shardID, err := locator.Locate(writeEntity.GetElement().TagFamilies, shardNum)
+ entity, shardID, err := s.navigate(writeEntity.GetMetadata(), writeEntity.GetElement().GetTagFamilies())
if err != nil {
- s.log.Error().Err(err).Msg("failed to locate write target")
+ s.log.Error().Err(err).Msg("failed to navigate to the write target")
continue
}
message := bus.NewMessage(bus.MessageID(time.Now().UnixNano()), &streamv1.InternalWriteRequest{
@@ -66,7 +62,7 @@ func (s *Server) Write(stream streamv1.StreamService_WriteServer) error {
}
}
-func (s *Server) Query(_ context.Context, entityCriteria *streamv1.QueryRequest) (*streamv1.QueryResponse, error) {
+func (s *streamService) Query(_ context.Context, entityCriteria *streamv1.QueryRequest) (*streamv1.QueryResponse, error) {
message := bus.NewMessage(bus.MessageID(time.Now().UnixNano()), entityCriteria)
feat, errQuery := s.pipeline.Publish(data.TopicStreamQuery, message)
if errQuery != nil {
diff --git a/banyand/liaison/grpc/stream_test.go b/banyand/liaison/grpc/stream_test.go
index 1f30088..01aea1d 100644
--- a/banyand/liaison/grpc/stream_test.go
+++ b/banyand/liaison/grpc/stream_test.go
@@ -1,40 +1,20 @@
-// Licensed to Apache Software Foundation (ASF) under one or more contributor
-// license agreements. See the NOTICE file distributed with
-// this work for additional information regarding copyright
-// ownership. Apache Software Foundation (ASF) licenses this file to you under
-// the Apache License, Version 2.0 (the "License"); you may
-// not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package grpc
+package grpc_test
import (
"context"
"encoding/base64"
- "fmt"
"io"
"path/filepath"
"runtime"
"sync"
- "testing"
"time"
- "github.com/stretchr/testify/assert"
- "github.com/stretchr/testify/require"
grpclib "google.golang.org/grpc"
"google.golang.org/grpc/credentials"
streamv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/stream/v1"
"github.com/apache/skywalking-banyandb/banyand/discovery"
+ "github.com/apache/skywalking-banyandb/banyand/liaison/grpc"
"github.com/apache/skywalking-banyandb/banyand/metadata"
"github.com/apache/skywalking-banyandb/banyand/query"
"github.com/apache/skywalking-banyandb/banyand/queue"
@@ -44,53 +24,73 @@ import (
"github.com/apache/skywalking-banyandb/pkg/run"
"github.com/apache/skywalking-banyandb/pkg/test"
teststream "github.com/apache/skywalking-banyandb/pkg/test/stream"
-)
-
-var _ run.PreRunner = (*preloadStreamService)(nil)
-
-type preloadStreamService struct {
- metaSvc metadata.Service
-}
-
-func (p *preloadStreamService) Name() string {
- return "preload-measure"
-}
-func (p *preloadStreamService) PreRun() error {
- return teststream.PreloadSchema(p.metaSvc.SchemaRegistry())
-}
+ . "github.com/onsi/ginkgo/v2"
+ . "github.com/onsi/gomega"
+)
-type testData struct {
- certFile string
- serverHostOverride string
- TLS bool
- addr string
- basePath string
-}
+var _ = Describe("Stream", func() {
+ var gracefulStop func()
+ var conn *grpclib.ClientConn
+ It("is a plain server", func() {
+ gracefulStop = setup(nil)
+ var err error
+ conn, err = grpclib.Dial("localhost:17912", grpclib.WithInsecure())
+ Expect(err).NotTo(HaveOccurred())
+ streamWrite(conn)
+ Eventually(func() (int, error) {
+ return streamQuery(conn)
+ }).Should(Equal(1))
+ })
+ It("is a TLS server", func() {
+ flags := []string{"--tls=true"}
+ _, currentFile, _, _ := runtime.Caller(0)
+ basePath := filepath.Dir(currentFile)
+ certFile := filepath.Join(basePath, "testdata/server_cert.pem")
+ keyFile := filepath.Join(basePath, "testdata/server_key.pem")
+ flags = append(flags, "--cert-file="+certFile)
+ flags = append(flags, "--key-file="+keyFile)
+ addr := "localhost:17913"
+ flags = append(flags, "--addr="+addr)
+ gracefulStop = setup(flags)
+ creds, err := credentials.NewClientTLSFromFile(certFile, "localhost")
+ Expect(err).NotTo(HaveOccurred())
+ conn, err = grpclib.Dial(addr, grpclib.WithTransportCredentials(creds))
+ Expect(err).NotTo(HaveOccurred())
+ streamWrite(conn)
+ Eventually(func() (int, error) {
+ return streamQuery(conn)
+ }).Should(Equal(1))
+ })
+ AfterEach(func() {
+ _ = conn.Close()
+ gracefulStop()
+ })
+})
-func setup(req *require.Assertions, testData testData) func() {
- req.NoError(logger.Init(logger.Logging{
+func setup(flags []string) func() {
+ Expect(logger.Init(logger.Logging{
Env: "dev",
Level: "warn",
- }))
+ })).Should(Succeed())
g := run.Group{Name: "standalone"}
// Init `Discovery` module
repo, err := discovery.NewServiceRepo(context.Background())
- req.NoError(err)
+ Expect(err).NotTo(HaveOccurred())
// Init `Queue` module
pipeline, err := queue.NewQueue(context.TODO(), repo)
- req.NoError(err)
+ Expect(err).NotTo(HaveOccurred())
// Init `Metadata` module
metaSvc, err := metadata.NewService(context.TODO())
- req.NoError(err)
+ Expect(err).NotTo(HaveOccurred())
// Init `Stream` module
streamSvc, err := stream.NewService(context.TODO(), metaSvc, repo, pipeline)
- req.NoError(err)
+ Expect(err).NotTo(HaveOccurred())
// Init `Query` module
q, err := query.NewExecutor(context.TODO(), streamSvc, metaSvc, repo, pipeline)
- req.NoError(err)
+ Expect(err).NotTo(HaveOccurred())
- tcp := NewServer(context.TODO(), pipeline, repo, metaSvc)
+ tcp := grpc.NewServer(context.TODO(), pipeline, repo, metaSvc)
closer := run.NewTester("closer")
startListener := run.NewTester("started-listener")
@@ -108,18 +108,12 @@ func setup(req *require.Assertions, testData testData) func() {
startListener,
)
// Create a random directory
- rootPath, deferFunc := test.Space(req)
- flags := []string{"--root-path=" + rootPath, "--metadata-root-path=" + teststream.RandomTempDir()}
- if testData.TLS {
- flags = append(flags, "--tls=true")
- certFile := filepath.Join(testData.basePath, "testdata/server_cert.pem")
- keyFile := filepath.Join(testData.basePath, "testdata/server_key.pem")
- flags = append(flags, "--cert-file="+certFile)
- flags = append(flags, "--key-file="+keyFile)
- flags = append(flags, "--addr="+testData.addr)
- }
+ rootPath, deferFunc, err := test.NewSpace()
+ Expect(err).NotTo(HaveOccurred())
+ flags = append(flags, "--root-path="+rootPath, "--metadata-root-path="+teststream.RandomTempDir())
+
err = g.RegisterFlags().Parse(flags)
- req.NoError(err)
+ Expect(err).NotTo(HaveOccurred())
wg := sync.WaitGroup{}
@@ -130,72 +124,27 @@ func setup(req *require.Assertions, testData testData) func() {
errRun := g.Run()
if errRun != nil {
startListener.GracefulStop()
- req.NoError(errRun)
+ Expect(errRun).Should(Succeed())
}
deferFunc()
}()
- req.NoError(startListener.WaitUntilStarted())
+ Expect(startListener.WaitUntilStarted()).Should(Succeed())
return func() {
closer.GracefulStop()
wg.Wait()
}
}
-type caseData struct {
- name string
- queryGenerator func(baseTs time.Time) *streamv1.QueryRequest
- writeGenerator func() *streamv1.WriteRequest
- args testData
- wantLen int
+type preloadStreamService struct {
+ metaSvc metadata.Service
}
-func TestStreamService(t *testing.T) {
- req := require.New(t)
- _, currentFile, _, _ := runtime.Caller(0)
- basePath := filepath.Dir(currentFile)
- certFile := filepath.Join(basePath, "testdata/server_cert.pem")
- testCases := []caseData{
- {
- name: "isTLS",
- queryGenerator: queryCriteria,
- writeGenerator: writeData,
- args: testData{
- TLS: true,
- certFile: certFile,
- serverHostOverride: "localhost",
- addr: "localhost:17913",
- basePath: basePath,
- },
- wantLen: 1,
- },
- {
- name: "noTLS",
- queryGenerator: queryCriteria,
- writeGenerator: writeData,
- args: testData{
- TLS: false,
- addr: "localhost:17912",
- },
- wantLen: 1,
- },
- }
- for _, tc := range testCases {
- t.Run(tc.name, func(t *testing.T) {
- gracefulStop := setup(req, tc.args)
- defer gracefulStop()
- if tc.args.TLS {
- var opts []grpclib.DialOption
- creds, err := credentials.NewClientTLSFromFile(tc.args.certFile, tc.args.serverHostOverride)
- assert.NoError(t, err)
- opts = append(opts, grpclib.WithTransportCredentials(creds))
- dialService(t, tc, opts)
- } else {
- var opts []grpclib.DialOption
- opts = append(opts, grpclib.WithInsecure())
- dialService(t, tc, opts)
- }
- })
- }
+func (p *preloadStreamService) Name() string {
+ return "preload-measure"
+}
+
+func (p *preloadStreamService) PreRun() error {
+ return teststream.PreloadSchema(p.metaSvc.SchemaRegistry())
}
func writeData() *streamv1.WriteRequest {
@@ -227,60 +176,23 @@ func queryCriteria(baseTs time.Time) *streamv1.QueryRequest {
Build()
}
-func dialService(t *testing.T, tc caseData, opts []grpclib.DialOption) {
- conn, err := grpclib.Dial(tc.args.addr, opts...)
- assert.NoError(t, err)
- defer func(conn *grpclib.ClientConn) {
- _ = conn.Close()
- }(conn)
- streamWrite(t, tc, conn)
- requireTester := require.New(t)
- assert.NoError(t, test.Retry(10, 100*time.Millisecond, func() error {
- now := time.Now()
- resp := streamQuery(requireTester, conn, tc.queryGenerator(now))
- if len(resp.GetElements()) == tc.wantLen {
- return nil
- }
- return fmt.Errorf("expected elements number: %d got: %d", tc.wantLen, len(resp.GetElements()))
- }))
-}
-
-func streamWrite(t *testing.T, tc caseData, conn *grpclib.ClientConn) {
+func streamWrite(conn *grpclib.ClientConn) {
client := streamv1.NewStreamServiceClient(conn)
ctx := context.Background()
writeClient, errorWrite := client.Write(ctx)
- if errorWrite != nil {
- t.Errorf("%v.write(_) = _, %v", client, errorWrite)
- }
- waitc := make(chan struct{})
- go func() {
- for {
- writeResponse, errRecv := writeClient.Recv()
- if errRecv == io.EOF {
- // read done.
- close(waitc)
- return
- }
- assert.NoError(t, errRecv)
- assert.NotNil(t, writeResponse)
- }
- }()
- if errSend := writeClient.Send(tc.writeGenerator()); errSend != nil {
- t.Errorf("Failed to send a note: %v", errSend)
- }
- if errorSend := writeClient.CloseSend(); errorSend != nil {
- t.Errorf("Failed to send a note: %v", errorSend)
- }
- <-waitc
+ Expect(errorWrite).Should(Succeed())
+ Expect(writeClient.Send(writeData())).Should(Succeed())
+ Expect(writeClient.CloseSend()).Should(Succeed())
+ Eventually(func() error {
+ _, err := writeClient.Recv()
+ return err
+ }).Should(Equal(io.EOF))
}
-func streamQuery(tester *require.Assertions, conn *grpclib.ClientConn, request *streamv1.QueryRequest) *streamv1.QueryResponse {
+func streamQuery(conn *grpclib.ClientConn) (int, error) {
client := streamv1.NewStreamServiceClient(conn)
ctx := context.Background()
- queryResponse, errRev := client.Query(ctx, request)
- if errRev != nil {
- tester.Errorf(errRev, "Retrieve client failed: %v")
- }
- tester.NotNil(queryResponse)
- return queryResponse
+ resp, err := client.Query(ctx, queryCriteria(time.Now()))
+
+ return len(resp.GetElements()), err
}
diff --git a/go.mod b/go.mod
index adda1b0..91a8dec 100644
--- a/go.mod
+++ b/go.mod
@@ -38,12 +38,14 @@ require (
github.com/dustin/go-humanize v1.0.0 // indirect
github.com/form3tech-oss/jwt-go v3.2.3+incompatible // indirect
github.com/fsnotify/fsnotify v1.4.9 // indirect
+ github.com/go-task/slim-sprig v0.0.0-20210107165309-348f09dbbbc0 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b // indirect
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect
github.com/golang/snappy v0.0.3 // indirect
github.com/google/btree v1.0.1 // indirect
github.com/google/flatbuffers v1.12.1 // indirect
+ github.com/google/pprof v0.0.0-20210407192527-94a9f03dee38 // indirect
github.com/gorilla/websocket v1.4.2 // indirect
github.com/grpc-ecosystem/go-grpc-middleware v1.3.0 // indirect
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0 // indirect
@@ -58,6 +60,8 @@ require (
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.1 // indirect
github.com/mschoch/smat v0.2.0 // indirect
+ github.com/onsi/ginkgo/v2 v2.0.0 // indirect
+ github.com/onsi/gomega v1.17.0 // indirect
github.com/pelletier/go-toml v1.9.3 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/prometheus/client_golang v1.11.0 // indirect
@@ -94,6 +98,7 @@ require (
golang.org/x/crypto v0.0.0-20201002170205-7f63de1d35b0 // indirect
golang.org/x/text v0.3.6 // indirect
golang.org/x/time v0.0.0-20210220033141-f8bda1e9f3ba // indirect
+ golang.org/x/tools v0.1.5 // indirect
gopkg.in/ini.v1 v1.62.0 // indirect
gopkg.in/natefinch/lumberjack.v2 v2.0.0 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
diff --git a/go.sum b/go.sum
index 5b62477..425e2d0 100644
--- a/go.sum
+++ b/go.sum
@@ -146,6 +146,8 @@ github.com/go-logfmt/logfmt v0.3.0/go.mod h1:Qt1PoO58o5twSAckw1HlFXLmHsOX5/0LbT9
github.com/go-logfmt/logfmt v0.4.0/go.mod h1:3RMwSq7FuexP4Kalkev3ejPJsZTpXXBr9+V4qmtdjCk=
github.com/go-logfmt/logfmt v0.5.0/go.mod h1:wCYkCAKZfumFQihp8CzCvQ3paCTfi41vtzG1KdI/P7A=
github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY=
+github.com/go-task/slim-sprig v0.0.0-20210107165309-348f09dbbbc0 h1:p104kn46Q8WdvHunIJ9dAyjPVtrBPhSr3KT2yUst43I=
+github.com/go-task/slim-sprig v0.0.0-20210107165309-348f09dbbbc0/go.mod h1:fyg7847qk6SyHyPtNmDHnmrv/HOrqktSC+C9fM+CJOE=
github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA=
github.com/gogo/protobuf v1.1.1/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ=
github.com/gogo/protobuf v1.2.1/go.mod h1:hp+jE20tsWTFYpLwKvXlhS1hjn+gTNwPg2I6zVXpSg4=
@@ -222,6 +224,8 @@ github.com/google/pprof v0.0.0-20201023163331-3e6fc7fc9c4c/go.mod h1:kpwsk12EmLe
github.com/google/pprof v0.0.0-20201203190320-1bf35d6f28c2/go.mod h1:kpwsk12EmLew5upagYY7GY0pfYCcupk39gWOCRROcvE=
github.com/google/pprof v0.0.0-20210122040257-d980be63207e/go.mod h1:kpwsk12EmLew5upagYY7GY0pfYCcupk39gWOCRROcvE=
github.com/google/pprof v0.0.0-20210226084205-cbba55b83ad5/go.mod h1:kpwsk12EmLew5upagYY7GY0pfYCcupk39gWOCRROcvE=
+github.com/google/pprof v0.0.0-20210407192527-94a9f03dee38 h1:yAJXTCF9TqKcTiHJAE8dj7HMvPfh66eeA2JYW7eFpSE=
+github.com/google/pprof v0.0.0-20210407192527-94a9f03dee38/go.mod h1:kpwsk12EmLew5upagYY7GY0pfYCcupk39gWOCRROcvE=
github.com/google/renameio v0.1.0/go.mod h1:KWCgfxg9yswjAJkECMjeO8J8rahYeXnNhOm40UhjYkI=
github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I=
@@ -261,6 +265,7 @@ github.com/hashicorp/logutils v1.0.0/go.mod h1:QIAnNjmIWmVIIkWDTG1z5v++HQmx9WQRO
github.com/hashicorp/mdns v1.0.0/go.mod h1:tL+uN++7HEJ6SQLQ2/p+z2pH24WQKWjBPkE0mNTz8vQ=
github.com/hashicorp/memberlist v0.1.3/go.mod h1:ajVTdAv/9Im8oMAAj5G31PhhMCZJV2pPBoIllUwCN7I=
github.com/hashicorp/serf v0.8.2/go.mod h1:6hOLApaqBFA1NXqRQAsxw9QxuDEvNxSQRwA/JwenrHc=
+github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU=
github.com/ianlancetaylor/demangle v0.0.0-20181102032728-5e5cf60278f6/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc=
github.com/ianlancetaylor/demangle v0.0.0-20200824232613-28f6c0f3b639/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc=
github.com/inconshreveable/mousetrap v1.0.0 h1:Z8tu5sraLXCXIcARxBp/8cbvlwVa7Z1NHg9XEKhtSvM=
@@ -325,9 +330,20 @@ github.com/mschoch/smat v0.2.0 h1:8imxQsjDm8yFEAVBe7azKmKSgzSkZXDuKkSq9374khM=
github.com/mschoch/smat v0.2.0/go.mod h1:kc9mz7DoBKqDyiRL7VZN8KvXQMWeTaVnttLRXOlotKw=
github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U=
github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U=
+github.com/nxadm/tail v1.4.4/go.mod h1:kenIhsEOeOJmVchQTgglprH7qJGnHDVpk1VPCcaMI8A=
+github.com/nxadm/tail v1.4.8/go.mod h1:+ncqLTQzXmGhMZNUePPaPqPvBxHAIsmXswZKocGu+AU=
github.com/oklog/run v1.1.0 h1:GEenZ1cK0+q0+wsJew9qUg/DyD8k3JzYsZAi5gYi2mA=
github.com/oklog/run v1.1.0/go.mod h1:sVPdnTZT1zYwAJeCMu2Th4T21pA3FPOQRfWjQlk7DVU=
github.com/oklog/ulid v1.3.1/go.mod h1:CirwcVhetQ6Lv90oh/F+FBtV6XMibvdAFo93nm5qn4U=
+github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
+github.com/onsi/ginkgo v1.12.1/go.mod h1:zj2OWP4+oCPe1qIXoGWkgMRwljMUYCdkwsT2108oapk=
+github.com/onsi/ginkgo v1.16.4/go.mod h1:dX+/inL/fNMqNlz0e9LfyB9TswhZpCVdJM/Z6Vvnwo0=
+github.com/onsi/ginkgo/v2 v2.0.0 h1:CcuG/HvWNkkaqCUpJifQY8z7qEMBJya6aLPx6ftGyjQ=
+github.com/onsi/ginkgo/v2 v2.0.0/go.mod h1:vw5CSIxN1JObi/U8gcbwft7ZxR2dgaR70JSE3/PpL4c=
+github.com/onsi/gomega v1.7.1/go.mod h1:XdKZgCCFLUoM/7CFJVPcG8C1xQ1AJ0vpAezJrB7JYyY=
+github.com/onsi/gomega v1.10.1/go.mod h1:iN09h71vgCQne3DLsj+A5owkum+a2tYe+TOCB1ybHNo=
+github.com/onsi/gomega v1.17.0 h1:9Luw4uT5HTjHTN8+aNcSThgH1vdXnmdJ8xIfZ4wyTRE=
+github.com/onsi/gomega v1.17.0/go.mod h1:HnhC7FXeEQY45zxNK3PPoIUhzk/80Xly9PcubAlGdZY=
github.com/opentracing/opentracing-go v1.1.0/go.mod h1:UkNAQd3GIcIGf0SeVgPpRdFStlNbqXla1AfSYxPUl2o=
github.com/pascaldekloe/goe v0.0.0-20180627143212-57f6aae5913c/go.mod h1:lzWF7FIEvWOWxwDKqyGYQf6ZUaNfKdP144TG7ZOy1lc=
github.com/pelletier/go-toml v1.2.0/go.mod h1:5z9KED0ma1S8pY6P1sdut58dfprrGBbd/94hg7ilaic=
@@ -543,6 +559,7 @@ golang.org/x/mod v0.4.1/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
golang.org/x/mod v0.4.2/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
+golang.org/x/net v0.0.0-20180906233101-161cd47e91fd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20181023162649-9b4f9f5ad519/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20181114220301-adae6a3d119a/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20181201002055-351d144fa1fc/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
@@ -568,6 +585,7 @@ golang.org/x/net v0.0.0-20200324143707-d3edc9973b7e/go.mod h1:qpuaurCH72eLCgpAm/
golang.org/x/net v0.0.0-20200501053045-e0ff5e5a1de5/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A=
golang.org/x/net v0.0.0-20200506145744-7e3656a0809f/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A=
golang.org/x/net v0.0.0-20200513185701-a91f0712d120/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A=
+golang.org/x/net v0.0.0-20200520004742-59133d7f0dd7/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A=
golang.org/x/net v0.0.0-20200520182314-0ba52f642ac2/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A=
golang.org/x/net v0.0.0-20200625001655-4c5254603344/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA=
golang.org/x/net v0.0.0-20200707034311-ab3426394381/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA=
@@ -581,6 +599,7 @@ golang.org/x/net v0.0.0-20210119194325-5f4716e94777/go.mod h1:m0MpNAwzfU5UDzcl9v
golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg=
golang.org/x/net v0.0.0-20210316092652-d523dce5a7f4/go.mod h1:RBQZq4jEuRlivfhVLdyRGr576XBO4/greRjx4P4O3yc=
golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4/go.mod h1:p54w0d4576C0XHj96bSt6lcn1PtDYWL6XObtHCRCNQM=
+golang.org/x/net v0.0.0-20210428140749-89ef3d95e781/go.mod h1:OJAsFXCWl8Ukc7SiCT/9KSuxbyM7479/AVlXFRxuMCk=
golang.org/x/net v0.0.0-20210716203947-853a461950ff h1:j2EK/QoxYNBsXI4R7fQkkRUk8y6wnOBI+6hgPdP/6Ds=
golang.org/x/net v0.0.0-20210716203947-853a461950ff/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
@@ -610,6 +629,7 @@ golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJ
golang.org/x/sys v0.0.0-20180823144017-11551d06cbcc/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
+golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20181026203630-95b1ffbd15a5/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20181107165924-66b7b1311ac8/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20181116152217-5ac8a444bdc5/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
@@ -623,9 +643,11 @@ golang.org/x/sys v0.0.0-20190507160741-ecd444e8653b/go.mod h1:h1NjWce9XRLGQEsW7w
golang.org/x/sys v0.0.0-20190606165138-5da285871e9c/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20190624142023-c5567b49c5d0/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20190726091711-fc99dfbffb4e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
+golang.org/x/sys v0.0.0-20190904154756-749cb33beabd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20191001151750-bb3f8db39f24/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20191005200804-aed5e4c7ecf9/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20191026070338-33540a1f6037/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
+golang.org/x/sys v0.0.0-20191120155948-bd437916bb0e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20191204072324-ce4227a45e2e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20191228213918-04cbcbbfeed8/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200106162015-b016eb3dc98e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
@@ -650,6 +672,7 @@ golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7w
golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20201201145000-ef89a241ccb3/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210104204734-6f8348627aad/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
+golang.org/x/sys v0.0.0-20210112080510-489259a85091/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210119212857-b64e53b001e4/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210124154548-22da62e12c0c/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210220050731-9a76102bfb43/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
@@ -728,6 +751,7 @@ golang.org/x/tools v0.0.0-20200904185747-39188db58858/go.mod h1:Cj7w3i3Rnn0Xh82u
golang.org/x/tools v0.0.0-20201110124207-079ba7bd75cd/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA=
golang.org/x/tools v0.0.0-20201201161351-ac6f37ff4c2a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA=
golang.org/x/tools v0.0.0-20201208233053-a543418bbed2/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA=
+golang.org/x/tools v0.0.0-20201224043029-2b0845dc783e/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA=
golang.org/x/tools v0.0.0-20210105154028-b0ab187a4818/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA=
golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA=
golang.org/x/tools v0.1.0/go.mod h1:xkSsbof2nBLbhDlRMhhhyNLN/zl3eTqcnHD5viDpcZ0=
@@ -856,12 +880,14 @@ gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8
gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 h1:YR8cESwS4TdDjEe65xsg0ogRM/Nc3DYOhEAlW+xobZo=
gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/errgo.v2 v2.1.0/go.mod h1:hNsd1EY+bozCKY1Ytp96fpM3vjJbqLJn88ws8XvfDNI=
+gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMys=
gopkg.in/ini.v1 v1.51.0/go.mod h1:pNLf8WUiyNEtQjuu5G5vTm06TEv9tsIgeAvK8hOrP4k=
gopkg.in/ini.v1 v1.62.0 h1:duBzk771uxoUuOlyRLkHsygud9+5lrlGjdFBb4mSKDU=
gopkg.in/ini.v1 v1.62.0/go.mod h1:pNLf8WUiyNEtQjuu5G5vTm06TEv9tsIgeAvK8hOrP4k=
gopkg.in/natefinch/lumberjack.v2 v2.0.0 h1:1Lc07Kr7qY4U2YPouBjpCLxpiyxIVoxqXgkXLknAOE8=
gopkg.in/natefinch/lumberjack.v2 v2.0.0/go.mod h1:l0ndWWf7gzL7RNwBG7wST/UCcT4T24xpD6X8LsfU/+k=
gopkg.in/resty.v1 v1.12.0/go.mod h1:mDo4pnntr5jdWRML875a/NmxYqAlA73dVijT2AXvQQo=
+gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw=
gopkg.in/yaml.v2 v2.0.0-20170812160011-eb3733d160e7/go.mod h1:JAlM8MvJe8wmxCU4Bli9HhUf9+ttbYbLASfIpnQbh74=
gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
diff --git a/pkg/logger/logger.go b/pkg/logger/logger.go
index 1407a4f..36896ed 100644
--- a/pkg/logger/logger.go
+++ b/pkg/logger/logger.go
@@ -46,3 +46,8 @@ func (l *Logger) Named(name string) *Logger {
subLogger := root.Logger.With().Str("module", module).Logger()
return &Logger{module: module, Logger: &subLogger}
}
+
+// Loggable indicates the implement supports logging
+type Loggable interface {
+ SetLogger(*Logger)
+}
diff --git a/pkg/test/space.go b/pkg/test/space.go
index bbe7ea3..8fafdb3 100644
--- a/pkg/test/space.go
+++ b/pkg/test/space.go
@@ -35,3 +35,12 @@ func Space(t *require.Assertions) (tempDir string, deferFunc func()) {
}
}
}
+
+func NewSpace() (tempDir string, deferFunc func(), err error) {
+ tempDir, err = ioutil.TempDir("", "banyandb-test-*")
+ return tempDir, func() {
+ if err = os.RemoveAll(tempDir); err != nil {
+ _, _ = fmt.Fprintf(os.Stderr, "Error while removing dir: %v\n", err)
+ }
+ }, err
+}