You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by wu...@apache.org on 2022/01/14 00:42:31 UTC

[skywalking-banyandb] branch main updated: Introduce ginkgo+gomega test framework (#67)

This is an automated email from the ASF dual-hosted git repository.

wusheng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git


The following commit(s) were added to refs/heads/main by this push:
     new 828c20e  Introduce ginkgo+gomega test framework (#67)
828c20e is described below

commit 828c20e0435d2519afba79ff8b8b9839bd5eacaf
Author: Gao Hongtao <ha...@gmail.com>
AuthorDate: Fri Jan 14 08:42:25 2022 +0800

    Introduce ginkgo+gomega test framework (#67)
---
 banyand/liaison/grpc/discovery.go                  |  42 ++++
 .../liaison/grpc/grpc_suite_test.go                |  21 +-
 banyand/liaison/grpc/{stream.go => measure.go}     |  50 ++--
 banyand/liaison/grpc/registry_test.go              | 252 +++++++++------------
 banyand/liaison/grpc/server.go                     |  58 +++--
 banyand/liaison/grpc/stream.go                     |  22 +-
 banyand/liaison/grpc/stream_test.go                | 228 +++++++------------
 go.mod                                             |   5 +
 go.sum                                             |  27 +++
 pkg/logger/logger.go                               |   5 +
 pkg/test/space.go                                  |   9 +
 11 files changed, 342 insertions(+), 377 deletions(-)

diff --git a/banyand/liaison/grpc/discovery.go b/banyand/liaison/grpc/discovery.go
index e7e2d76..609fadf 100644
--- a/banyand/liaison/grpc/discovery.go
+++ b/banyand/liaison/grpc/discovery.go
@@ -20,13 +20,55 @@ package grpc
 import (
 	"sync"
 
+	"github.com/pkg/errors"
+
+	"github.com/apache/skywalking-banyandb/api/common"
 	commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
 	databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
+	modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
+	"github.com/apache/skywalking-banyandb/banyand/queue"
+	"github.com/apache/skywalking-banyandb/banyand/tsdb"
 	"github.com/apache/skywalking-banyandb/pkg/bus"
 	"github.com/apache/skywalking-banyandb/pkg/logger"
 	"github.com/apache/skywalking-banyandb/pkg/partition"
 )
 
+var ErrNotExist = errors.New("the object doesn't exist")
+
+type discoveryService struct {
+	shardRepo  *shardRepo
+	entityRepo *entityRepo
+	pipeline   queue.Queue
+	log        *logger.Logger
+}
+
+func newDiscoveryService(pipeline queue.Queue) *discoveryService {
+	return &discoveryService{
+		shardRepo:  &shardRepo{shardEventsMap: make(map[identity]uint32)},
+		entityRepo: &entityRepo{entitiesMap: make(map[identity]partition.EntityLocator)},
+		pipeline:   pipeline,
+	}
+}
+
+func (ds *discoveryService) SetLogger(log *logger.Logger) {
+	ds.log = log
+	ds.shardRepo.log = log
+	ds.entityRepo.log = log
+}
+
+func (ds *discoveryService) navigate(metadata *commonv1.Metadata, tagFamilies []*modelv1.TagFamilyForWrite) (tsdb.Entity, common.ShardID, error) {
+	id := getID(metadata)
+	shardNum, existed := ds.shardRepo.shardNum(id)
+	if !existed {
+		return nil, common.ShardID(0), errors.Wrapf(ErrNotExist, "finding the shard num by: %v", metadata)
+	}
+	locator, existed := ds.entityRepo.getLocator(id)
+	if !existed {
+		return nil, common.ShardID(0), errors.Wrapf(ErrNotExist, "finding the locator by: %v", metadata)
+	}
+	return locator.Locate(tagFamilies, shardNum)
+}
+
 type identity struct {
 	name  string
 	group string
diff --git a/pkg/test/space.go b/banyand/liaison/grpc/grpc_suite_test.go
similarity index 67%
copy from pkg/test/space.go
copy to banyand/liaison/grpc/grpc_suite_test.go
index bbe7ea3..ae8f957 100644
--- a/pkg/test/space.go
+++ b/banyand/liaison/grpc/grpc_suite_test.go
@@ -15,23 +15,16 @@
 // specific language governing permissions and limitations
 // under the License.
 
-package test
+package grpc_test
 
 import (
-	"fmt"
-	"io/ioutil"
-	"os"
+	"testing"
 
-	"github.com/stretchr/testify/require"
+	. "github.com/onsi/ginkgo/v2"
+	. "github.com/onsi/gomega"
 )
 
-func Space(t *require.Assertions) (tempDir string, deferFunc func()) {
-	var tempDirErr error
-	tempDir, tempDirErr = ioutil.TempDir("", "banyandb-test-*")
-	t.Nil(tempDirErr)
-	return tempDir, func() {
-		if err := os.RemoveAll(tempDir); err != nil {
-			_, _ = fmt.Fprintf(os.Stderr, "Error while removing dir: %v\n", err)
-		}
-	}
+func TestGrpc(t *testing.T) {
+	RegisterFailHandler(Fail)
+	RunSpecs(t, "Grpc Suite")
 }
diff --git a/banyand/liaison/grpc/stream.go b/banyand/liaison/grpc/measure.go
similarity index 51%
copy from banyand/liaison/grpc/stream.go
copy to banyand/liaison/grpc/measure.go
index df831c4..530b024 100644
--- a/banyand/liaison/grpc/stream.go
+++ b/banyand/liaison/grpc/measure.go
@@ -18,67 +18,47 @@
 package grpc
 
 import (
-	"context"
 	"io"
 	"time"
 
 	"github.com/apache/skywalking-banyandb/api/data"
-	streamv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/stream/v1"
+	measurev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/measure/v1"
 	"github.com/apache/skywalking-banyandb/banyand/tsdb"
 	"github.com/apache/skywalking-banyandb/pkg/bus"
 )
 
-func (s *Server) Write(stream streamv1.StreamService_WriteServer) error {
+type measureService struct {
+	*discoveryService
+	measurev1.UnimplementedMeasureServiceServer
+}
+
+func (ms *measureService) Write(measure measurev1.MeasureService_WriteServer) error {
 	for {
-		writeEntity, err := stream.Recv()
+		writeRequest, err := measure.Recv()
 		if err == io.EOF {
 			return nil
 		}
 		if err != nil {
 			return err
 		}
-		id := getID(writeEntity.GetMetadata())
-		shardNum, existed := s.shardRepo.shardNum(id)
-		if !existed {
-			continue
-		}
-		locator, existed := s.entityRepo.getLocator(id)
-		if !existed {
-			continue
-		}
-		entity, shardID, err := locator.Locate(writeEntity.GetElement().TagFamilies, shardNum)
+		entity, shardID, err := ms.navigate(writeRequest.GetMetadata(), writeRequest.GetDataPoint().GetTagFamilies())
 		if err != nil {
-			s.log.Error().Err(err).Msg("failed to locate write target")
+			ms.log.Error().Err(err).Msg("failed to navigate to the write target")
 			continue
 		}
-		message := bus.NewMessage(bus.MessageID(time.Now().UnixNano()), &streamv1.InternalWriteRequest{
-			Request:    writeEntity,
+		message := bus.NewMessage(bus.MessageID(time.Now().UnixNano()), &measurev1.InternalWriteRequest{
+			Request:    writeRequest,
 			ShardId:    uint32(shardID),
 			SeriesHash: tsdb.HashEntity(entity),
 		})
-		_, errWritePub := s.pipeline.Publish(data.TopicStreamWrite, message)
+		_, errWritePub := ms.pipeline.Publish(data.TopicMeasureWrite, message)
 		if errWritePub != nil {
 			return errWritePub
 		}
-		if errSend := stream.Send(&streamv1.WriteResponse{}); errSend != nil {
+		if errSend := measure.Send(&measurev1.WriteResponse{}); errSend != nil {
 			return errSend
 		}
 	}
 }
 
-func (s *Server) Query(_ context.Context, entityCriteria *streamv1.QueryRequest) (*streamv1.QueryResponse, error) {
-	message := bus.NewMessage(bus.MessageID(time.Now().UnixNano()), entityCriteria)
-	feat, errQuery := s.pipeline.Publish(data.TopicStreamQuery, message)
-	if errQuery != nil {
-		return nil, errQuery
-	}
-	msg, errFeat := feat.Get()
-	if errFeat != nil {
-		return nil, errFeat
-	}
-	queryMsg, ok := msg.Data().([]*streamv1.Element)
-	if !ok {
-		return nil, ErrQueryMsg
-	}
-	return &streamv1.QueryResponse{Elements: queryMsg}, nil
-}
+//TODO: implement topN & Query
diff --git a/banyand/liaison/grpc/registry_test.go b/banyand/liaison/grpc/registry_test.go
index 1c21f8a..bd2ab05 100644
--- a/banyand/liaison/grpc/registry_test.go
+++ b/banyand/liaison/grpc/registry_test.go
@@ -15,13 +15,13 @@
 // specific language governing permissions and limitations
 // under the License.
 
-package grpc
+package grpc_test
 
 import (
 	"context"
-	"testing"
 
-	"github.com/stretchr/testify/require"
+	. "github.com/onsi/ginkgo/v2"
+	. "github.com/onsi/gomega"
 	"google.golang.org/grpc"
 	"google.golang.org/grpc/status"
 
@@ -30,158 +30,110 @@ import (
 	"github.com/apache/skywalking-banyandb/banyand/metadata/schema"
 )
 
-func TestStreamRegistry(t *testing.T) {
-	req := require.New(t)
-	gracefulStop := setup(req, testData{
-		TLS:  false,
-		addr: "localhost:17912",
-	})
-	defer gracefulStop()
-
-	conn, err := grpc.Dial("localhost:17912", grpc.WithInsecure())
-	req.NoError(err)
-	req.NotNil(conn)
-
-	client := databasev1.NewStreamRegistryServiceClient(conn)
-	req.NotNil(client)
-
-	meta := &commonv1.Metadata{
-		Group: "default",
-		Name:  "sw",
-	}
-
-	getResp, err := client.Get(context.TODO(), &databasev1.StreamRegistryServiceGetRequest{Metadata: meta})
-
-	req.NoError(err)
-	req.NotNil(getResp)
-
-	// 2 - DELETE
-	deleteResp, err := client.Delete(context.TODO(), &databasev1.StreamRegistryServiceDeleteRequest{
-		Metadata: meta,
-	})
-	req.NoError(err)
-	req.NotNil(deleteResp)
-	req.True(deleteResp.GetDeleted())
-
-	// 3 - GET -> Nil
-	_, err = client.Get(context.TODO(), &databasev1.StreamRegistryServiceGetRequest{
-		Metadata: meta,
-	})
-	errStatus, _ := status.FromError(err)
-	req.Equal(errStatus.Message(), schema.ErrEntityNotFound.Error())
-
-	// 4 - CREATE
-	_, err = client.Create(context.TODO(), &databasev1.StreamRegistryServiceCreateRequest{Stream: getResp.GetStream()})
-	req.NoError(err)
-
-	// 5 - GET - > Not Nil
-	getResp, err = client.Get(context.TODO(), &databasev1.StreamRegistryServiceGetRequest{
-		Metadata: meta,
-	})
-	req.NoError(err)
-	req.NotNil(getResp)
-}
-
-func TestIndexRuleBindingRegistry(t *testing.T) {
-	req := require.New(t)
-	gracefulStop := setup(req, testData{
-		TLS:  false,
-		addr: "localhost:17912",
-	})
-	defer gracefulStop()
-
-	conn, err := grpc.Dial("localhost:17912", grpc.WithInsecure())
-	req.NoError(err)
-	req.NotNil(conn)
-
-	client := databasev1.NewIndexRuleBindingRegistryServiceClient(conn)
-	req.NotNil(client)
-
+var _ = Describe("Registry", func() {
+	var gracefulStop func()
+	var conn *grpc.ClientConn
 	meta := &commonv1.Metadata{
 		Group: "default",
-		Name:  "sw-index-rule-binding",
 	}
-
-	getResp, err := client.Get(context.TODO(), &databasev1.IndexRuleBindingRegistryServiceGetRequest{Metadata: meta})
-
-	req.NoError(err)
-	req.NotNil(getResp)
-
-	// 2 - DELETE
-	deleteResp, err := client.Delete(context.TODO(), &databasev1.IndexRuleBindingRegistryServiceDeleteRequest{
-		Metadata: meta,
+	BeforeEach(func() {
+		gracefulStop = setup(nil)
+		var err error
+		conn, err = grpc.Dial("localhost:17912", grpc.WithInsecure())
+		Expect(err).NotTo(HaveOccurred())
 	})
-	req.NoError(err)
-	req.NotNil(deleteResp)
-	req.True(deleteResp.GetDeleted())
-
-	// 3 - GET -> Nil
-	_, err = client.Get(context.TODO(), &databasev1.IndexRuleBindingRegistryServiceGetRequest{
-		Metadata: meta,
+	It("manages the stream", func() {
+		client := databasev1.NewStreamRegistryServiceClient(conn)
+		Expect(client).NotTo(BeNil())
+		meta.Name = "sw"
+		getResp, err := client.Get(context.TODO(), &databasev1.StreamRegistryServiceGetRequest{Metadata: meta})
+		Expect(err).ShouldNot(HaveOccurred())
+		Expect(getResp).NotTo(BeNil())
+		By("Cleanup the registry")
+		deleteResp, err := client.Delete(context.TODO(), &databasev1.StreamRegistryServiceDeleteRequest{
+			Metadata: meta,
+		})
+		Expect(err).ShouldNot(HaveOccurred())
+		Expect(deleteResp).NotTo(BeNil())
+		Expect(deleteResp.GetDeleted()).To(BeTrue())
+		By("Verifying the registry empty")
+		_, err = client.Get(context.TODO(), &databasev1.StreamRegistryServiceGetRequest{
+			Metadata: meta,
+		})
+		errStatus, _ := status.FromError(err)
+		Expect(errStatus.Message()).To(Equal(schema.ErrEntityNotFound.Error()))
+		By("Creating a new stream")
+		_, err = client.Create(context.TODO(), &databasev1.StreamRegistryServiceCreateRequest{Stream: getResp.GetStream()})
+		Expect(err).ShouldNot(HaveOccurred())
+		By("Verifying the new stream")
+		getResp, err = client.Get(context.TODO(), &databasev1.StreamRegistryServiceGetRequest{
+			Metadata: meta,
+		})
+		Expect(err).ShouldNot(HaveOccurred())
+		Expect(getResp).NotTo(BeNil())
 	})
-	errStatus, _ := status.FromError(err)
-	req.Equal(errStatus.Message(), schema.ErrEntityNotFound.Error())
-
-	// 4 - CREATE
-	_, err = client.Create(context.TODO(), &databasev1.IndexRuleBindingRegistryServiceCreateRequest{IndexRuleBinding: getResp.GetIndexRuleBinding()})
-	req.NoError(err)
-
-	// 5 - GET - > Not Nil
-	getResp, err = client.Get(context.TODO(), &databasev1.IndexRuleBindingRegistryServiceGetRequest{
-		Metadata: meta,
+	It("manages the index-rule-binding", func() {
+		client := databasev1.NewIndexRuleBindingRegistryServiceClient(conn)
+		Expect(client).NotTo(BeNil())
+		meta.Name = "sw-index-rule-binding"
+		getResp, err := client.Get(context.TODO(), &databasev1.IndexRuleBindingRegistryServiceGetRequest{Metadata: meta})
+		Expect(err).ShouldNot(HaveOccurred())
+		Expect(getResp).NotTo(BeNil())
+		By("Cleanup the registry")
+		deleteResp, err := client.Delete(context.TODO(), &databasev1.IndexRuleBindingRegistryServiceDeleteRequest{
+			Metadata: meta,
+		})
+		Expect(err).ShouldNot(HaveOccurred())
+		Expect(deleteResp).NotTo(BeNil())
+		Expect(deleteResp.GetDeleted()).To(BeTrue())
+		By("Verifying the registry empty")
+		_, err = client.Get(context.TODO(), &databasev1.IndexRuleBindingRegistryServiceGetRequest{
+			Metadata: meta,
+		})
+		errStatus, _ := status.FromError(err)
+		Expect(errStatus.Message()).To(Equal(schema.ErrEntityNotFound.Error()))
+		By("Creating a new index-rule-binding")
+		_, err = client.Create(context.TODO(), &databasev1.IndexRuleBindingRegistryServiceCreateRequest{IndexRuleBinding: getResp.GetIndexRuleBinding()})
+		Expect(err).ShouldNot(HaveOccurred())
+		By("Verifying the new index-rule-binding")
+		getResp, err = client.Get(context.TODO(), &databasev1.IndexRuleBindingRegistryServiceGetRequest{
+			Metadata: meta,
+		})
+		Expect(err).ShouldNot(HaveOccurred())
+		Expect(getResp).NotTo(BeNil())
 	})
-	req.NoError(err)
-	req.NotNil(getResp)
-}
-
-func TestIndexRuleRegistry(t *testing.T) {
-	req := require.New(t)
-	gracefulStop := setup(req, testData{
-		TLS:  false,
-		addr: "localhost:17912",
+	It("manages the index-rule", func() {
+		client := databasev1.NewIndexRuleRegistryServiceClient(conn)
+		Expect(client).NotTo(BeNil())
+		meta.Name = "db.instance"
+		getResp, err := client.Get(context.TODO(), &databasev1.IndexRuleRegistryServiceGetRequest{Metadata: meta})
+		Expect(err).ShouldNot(HaveOccurred())
+		Expect(getResp).NotTo(BeNil())
+		By("Cleanup the registry")
+		deleteResp, err := client.Delete(context.TODO(), &databasev1.IndexRuleRegistryServiceDeleteRequest{
+			Metadata: meta,
+		})
+		Expect(err).ShouldNot(HaveOccurred())
+		Expect(deleteResp).NotTo(BeNil())
+		Expect(deleteResp.GetDeleted()).To(BeTrue())
+		By("Verifying the registry empty")
+		_, err = client.Get(context.TODO(), &databasev1.IndexRuleRegistryServiceGetRequest{
+			Metadata: meta,
+		})
+		errStatus, _ := status.FromError(err)
+		Expect(errStatus.Message()).To(Equal(schema.ErrEntityNotFound.Error()))
+		By("Creating a new index-rule")
+		_, err = client.Create(context.TODO(), &databasev1.IndexRuleRegistryServiceCreateRequest{IndexRule: getResp.GetIndexRule()})
+		Expect(err).ShouldNot(HaveOccurred())
+		By("Verifying the new index-rule")
+		getResp, err = client.Get(context.TODO(), &databasev1.IndexRuleRegistryServiceGetRequest{
+			Metadata: meta,
+		})
+		Expect(err).ShouldNot(HaveOccurred())
+		Expect(getResp).NotTo(BeNil())
 	})
-	defer gracefulStop()
-
-	conn, err := grpc.Dial("localhost:17912", grpc.WithInsecure())
-	req.NoError(err)
-	req.NotNil(conn)
-
-	client := databasev1.NewIndexRuleRegistryServiceClient(conn)
-	req.NotNil(client)
-
-	meta := &commonv1.Metadata{
-		Group: "default",
-		Name:  "db.instance",
-	}
-
-	getResp, err := client.Get(context.TODO(), &databasev1.IndexRuleRegistryServiceGetRequest{Metadata: meta})
-
-	req.NoError(err)
-	req.NotNil(getResp)
-
-	// 2 - DELETE
-	deleteResp, err := client.Delete(context.TODO(), &databasev1.IndexRuleRegistryServiceDeleteRequest{
-		Metadata: meta,
-	})
-	req.NoError(err)
-	req.NotNil(deleteResp)
-	req.True(deleteResp.GetDeleted())
-
-	// 3 - GET -> Nil
-	_, err = client.Get(context.TODO(), &databasev1.IndexRuleRegistryServiceGetRequest{
-		Metadata: meta,
-	})
-	errStatus, _ := status.FromError(err)
-	req.Equal(errStatus.Message(), schema.ErrEntityNotFound.Error())
-
-	// 4 - CREATE
-	_, err = client.Create(context.TODO(), &databasev1.IndexRuleRegistryServiceCreateRequest{IndexRule: getResp.GetIndexRule()})
-	req.NoError(err)
-
-	// 5 - GET - > Not Nil
-	getResp, err = client.Get(context.TODO(), &databasev1.IndexRuleRegistryServiceGetRequest{
-		Metadata: meta,
+	AfterEach(func() {
+		_ = conn.Close()
+		gracefulStop()
 	})
-	req.NoError(err)
-	req.NotNil(getResp)
-}
+})
diff --git a/banyand/liaison/grpc/server.go b/banyand/liaison/grpc/server.go
index 6720d7b..5081754 100644
--- a/banyand/liaison/grpc/server.go
+++ b/banyand/liaison/grpc/server.go
@@ -27,12 +27,13 @@ import (
 
 	"github.com/apache/skywalking-banyandb/api/event"
 	databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
+	measurev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/measure/v1"
 	streamv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/stream/v1"
 	"github.com/apache/skywalking-banyandb/banyand/discovery"
 	"github.com/apache/skywalking-banyandb/banyand/metadata"
 	"github.com/apache/skywalking-banyandb/banyand/queue"
+	"github.com/apache/skywalking-banyandb/pkg/bus"
 	"github.com/apache/skywalking-banyandb/pkg/logger"
-	"github.com/apache/skywalking-banyandb/pkg/partition"
 	"github.com/apache/skywalking-banyandb/pkg/run"
 )
 
@@ -56,22 +57,26 @@ type Server struct {
 	pipeline       queue.Queue
 	repo           discovery.ServiceRepo
 	creds          credentials.TransportCredentials
-	shardRepo      *shardRepo
-	entityRepo     *entityRepo
+
+	streamSVC  *streamService
+	measureSVC *measureService
 	*streamRegistryServer
 	*indexRuleBindingRegistryServer
 	*indexRuleRegistryServer
 	*measureRegistryServer
 	*groupRegistryServer
-	streamv1.UnimplementedStreamServiceServer
 }
 
 func NewServer(_ context.Context, pipeline queue.Queue, repo discovery.ServiceRepo, schemaRegistry metadata.Service) *Server {
 	return &Server{
-		pipeline:   pipeline,
-		repo:       repo,
-		shardRepo:  &shardRepo{shardEventsMap: make(map[identity]uint32)},
-		entityRepo: &entityRepo{entitiesMap: make(map[identity]partition.EntityLocator)},
+		pipeline: pipeline,
+		repo:     repo,
+		streamSVC: &streamService{
+			discoveryService: newDiscoveryService(pipeline),
+		},
+		measureSVC: &measureService{
+			discoveryService: newDiscoveryService(pipeline),
+		},
 		streamRegistryServer: &streamRegistryServer{
 			schemaRegistry: schemaRegistry,
 		},
@@ -92,13 +97,34 @@ func NewServer(_ context.Context, pipeline queue.Queue, repo discovery.ServiceRe
 
 func (s *Server) PreRun() error {
 	s.log = logger.GetLogger("liaison-grpc")
-	s.shardRepo.log = s.log
-	s.entityRepo.log = s.log
-	err := s.repo.Subscribe(event.StreamTopicShardEvent, s.shardRepo)
-	if err != nil {
-		return err
+	components := []struct {
+		shardEvent   bus.Topic
+		entityEvent  bus.Topic
+		discoverySVC *discoveryService
+	}{
+		{
+			shardEvent:   event.StreamTopicShardEvent,
+			entityEvent:  event.StreamTopicEntityEvent,
+			discoverySVC: s.streamSVC.discoveryService,
+		},
+		{
+			shardEvent:   event.MeasureTopicShardEvent,
+			entityEvent:  event.MeasureTopicEntityEvent,
+			discoverySVC: s.measureSVC.discoveryService,
+		},
+	}
+	for _, c := range components {
+		c.discoverySVC.SetLogger(s.log)
+		err := s.repo.Subscribe(c.shardEvent, c.discoverySVC.shardRepo)
+		if err != nil {
+			return err
+		}
+		err = s.repo.Subscribe(c.entityEvent, c.discoverySVC.entityRepo)
+		if err != nil {
+			return err
+		}
 	}
-	return s.repo.Subscribe(event.StreamTopicEntityEvent, s.entityRepo)
+	return nil
 }
 
 func (s *Server) Name() string {
@@ -150,7 +176,9 @@ func (s *Server) Serve() error {
 	}
 	opts = append(opts, grpclib.MaxRecvMsgSize(s.maxRecvMsgSize))
 	s.ser = grpclib.NewServer(opts...)
-	streamv1.RegisterStreamServiceServer(s.ser, s)
+
+	streamv1.RegisterStreamServiceServer(s.ser, s.streamSVC)
+	measurev1.RegisterMeasureServiceServer(s.ser, s.measureSVC)
 	// register *Registry
 	databasev1.RegisterGroupRegistryServiceServer(s.ser, s.groupRegistryServer)
 	databasev1.RegisterIndexRuleBindingRegistryServiceServer(s.ser, s.indexRuleBindingRegistryServer)
diff --git a/banyand/liaison/grpc/stream.go b/banyand/liaison/grpc/stream.go
index df831c4..8d125b9 100644
--- a/banyand/liaison/grpc/stream.go
+++ b/banyand/liaison/grpc/stream.go
@@ -28,7 +28,12 @@ import (
 	"github.com/apache/skywalking-banyandb/pkg/bus"
 )
 
-func (s *Server) Write(stream streamv1.StreamService_WriteServer) error {
+type streamService struct {
+	*discoveryService
+	streamv1.UnimplementedStreamServiceServer
+}
+
+func (s *streamService) Write(stream streamv1.StreamService_WriteServer) error {
 	for {
 		writeEntity, err := stream.Recv()
 		if err == io.EOF {
@@ -37,18 +42,9 @@ func (s *Server) Write(stream streamv1.StreamService_WriteServer) error {
 		if err != nil {
 			return err
 		}
-		id := getID(writeEntity.GetMetadata())
-		shardNum, existed := s.shardRepo.shardNum(id)
-		if !existed {
-			continue
-		}
-		locator, existed := s.entityRepo.getLocator(id)
-		if !existed {
-			continue
-		}
-		entity, shardID, err := locator.Locate(writeEntity.GetElement().TagFamilies, shardNum)
+		entity, shardID, err := s.navigate(writeEntity.GetMetadata(), writeEntity.GetElement().GetTagFamilies())
 		if err != nil {
-			s.log.Error().Err(err).Msg("failed to locate write target")
+			s.log.Error().Err(err).Msg("failed to navigate to the write target")
 			continue
 		}
 		message := bus.NewMessage(bus.MessageID(time.Now().UnixNano()), &streamv1.InternalWriteRequest{
@@ -66,7 +62,7 @@ func (s *Server) Write(stream streamv1.StreamService_WriteServer) error {
 	}
 }
 
-func (s *Server) Query(_ context.Context, entityCriteria *streamv1.QueryRequest) (*streamv1.QueryResponse, error) {
+func (s *streamService) Query(_ context.Context, entityCriteria *streamv1.QueryRequest) (*streamv1.QueryResponse, error) {
 	message := bus.NewMessage(bus.MessageID(time.Now().UnixNano()), entityCriteria)
 	feat, errQuery := s.pipeline.Publish(data.TopicStreamQuery, message)
 	if errQuery != nil {
diff --git a/banyand/liaison/grpc/stream_test.go b/banyand/liaison/grpc/stream_test.go
index 1f30088..596f4ed 100644
--- a/banyand/liaison/grpc/stream_test.go
+++ b/banyand/liaison/grpc/stream_test.go
@@ -15,26 +15,25 @@
 // specific language governing permissions and limitations
 // under the License.
 
-package grpc
+package grpc_test
 
 import (
 	"context"
 	"encoding/base64"
-	"fmt"
 	"io"
 	"path/filepath"
 	"runtime"
 	"sync"
-	"testing"
 	"time"
 
-	"github.com/stretchr/testify/assert"
-	"github.com/stretchr/testify/require"
+	. "github.com/onsi/ginkgo/v2"
+	. "github.com/onsi/gomega"
 	grpclib "google.golang.org/grpc"
 	"google.golang.org/grpc/credentials"
 
 	streamv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/stream/v1"
 	"github.com/apache/skywalking-banyandb/banyand/discovery"
+	"github.com/apache/skywalking-banyandb/banyand/liaison/grpc"
 	"github.com/apache/skywalking-banyandb/banyand/metadata"
 	"github.com/apache/skywalking-banyandb/banyand/query"
 	"github.com/apache/skywalking-banyandb/banyand/queue"
@@ -46,51 +45,68 @@ import (
 	teststream "github.com/apache/skywalking-banyandb/pkg/test/stream"
 )
 
-var _ run.PreRunner = (*preloadStreamService)(nil)
-
-type preloadStreamService struct {
-	metaSvc metadata.Service
-}
-
-func (p *preloadStreamService) Name() string {
-	return "preload-measure"
-}
-
-func (p *preloadStreamService) PreRun() error {
-	return teststream.PreloadSchema(p.metaSvc.SchemaRegistry())
-}
-
-type testData struct {
-	certFile           string
-	serverHostOverride string
-	TLS                bool
-	addr               string
-	basePath           string
-}
+var _ = Describe("Stream", func() {
+	var gracefulStop func()
+	var conn *grpclib.ClientConn
+	It("is a plain server", func() {
+		gracefulStop = setup(nil)
+		var err error
+		conn, err = grpclib.Dial("localhost:17912", grpclib.WithInsecure())
+		Expect(err).NotTo(HaveOccurred())
+		streamWrite(conn)
+		Eventually(func() (int, error) {
+			return streamQuery(conn)
+		}).Should(Equal(1))
+	})
+	It("is a TLS server", func() {
+		flags := []string{"--tls=true"}
+		_, currentFile, _, _ := runtime.Caller(0)
+		basePath := filepath.Dir(currentFile)
+		certFile := filepath.Join(basePath, "testdata/server_cert.pem")
+		keyFile := filepath.Join(basePath, "testdata/server_key.pem")
+		flags = append(flags, "--cert-file="+certFile)
+		flags = append(flags, "--key-file="+keyFile)
+		addr := "localhost:17913"
+		flags = append(flags, "--addr="+addr)
+		gracefulStop = setup(flags)
+		creds, err := credentials.NewClientTLSFromFile(certFile, "localhost")
+		Expect(err).NotTo(HaveOccurred())
+		conn, err = grpclib.Dial(addr, grpclib.WithTransportCredentials(creds))
+		Expect(err).NotTo(HaveOccurred())
+		streamWrite(conn)
+		Eventually(func() (int, error) {
+			return streamQuery(conn)
+		}).Should(Equal(1))
+	})
+	AfterEach(func() {
+		_ = conn.Close()
+		gracefulStop()
+	})
+})
 
-func setup(req *require.Assertions, testData testData) func() {
-	req.NoError(logger.Init(logger.Logging{
+func setup(flags []string) func() {
+	Expect(logger.Init(logger.Logging{
 		Env:   "dev",
 		Level: "warn",
-	}))
+	})).Should(Succeed())
 	g := run.Group{Name: "standalone"}
 	// Init `Discovery` module
 	repo, err := discovery.NewServiceRepo(context.Background())
-	req.NoError(err)
+	Expect(err).NotTo(HaveOccurred())
 	// Init `Queue` module
 	pipeline, err := queue.NewQueue(context.TODO(), repo)
-	req.NoError(err)
+	Expect(err).NotTo(HaveOccurred())
 	// Init `Metadata` module
 	metaSvc, err := metadata.NewService(context.TODO())
-	req.NoError(err)
+	Expect(err).NotTo(HaveOccurred())
 	// Init `Stream` module
 	streamSvc, err := stream.NewService(context.TODO(), metaSvc, repo, pipeline)
-	req.NoError(err)
+	Expect(err).NotTo(HaveOccurred())
 	// Init `Query` module
 	q, err := query.NewExecutor(context.TODO(), streamSvc, metaSvc, repo, pipeline)
-	req.NoError(err)
+	Expect(err).NotTo(HaveOccurred())
 
-	tcp := NewServer(context.TODO(), pipeline, repo, metaSvc)
+	tcp := grpc.NewServer(context.TODO(), pipeline, repo, metaSvc)
 
 	closer := run.NewTester("closer")
 	startListener := run.NewTester("started-listener")
@@ -108,18 +124,12 @@ func setup(req *require.Assertions, testData testData) func() {
 		startListener,
 	)
 	// Create a random directory
-	rootPath, deferFunc := test.Space(req)
-	flags := []string{"--root-path=" + rootPath, "--metadata-root-path=" + teststream.RandomTempDir()}
-	if testData.TLS {
-		flags = append(flags, "--tls=true")
-		certFile := filepath.Join(testData.basePath, "testdata/server_cert.pem")
-		keyFile := filepath.Join(testData.basePath, "testdata/server_key.pem")
-		flags = append(flags, "--cert-file="+certFile)
-		flags = append(flags, "--key-file="+keyFile)
-		flags = append(flags, "--addr="+testData.addr)
-	}
+	rootPath, deferFunc, err := test.NewSpace()
+	Expect(err).NotTo(HaveOccurred())
+	flags = append(flags, "--root-path="+rootPath, "--metadata-root-path="+teststream.RandomTempDir())
+
 	err = g.RegisterFlags().Parse(flags)
-	req.NoError(err)
+	Expect(err).NotTo(HaveOccurred())
 
 	wg := sync.WaitGroup{}
 
@@ -130,72 +140,27 @@ func setup(req *require.Assertions, testData testData) func() {
 		errRun := g.Run()
 		if errRun != nil {
 			startListener.GracefulStop()
-			req.NoError(errRun)
+			Expect(errRun).Should(Succeed())
 		}
 		deferFunc()
 	}()
-	req.NoError(startListener.WaitUntilStarted())
+	Expect(startListener.WaitUntilStarted()).Should(Succeed())
 	return func() {
 		closer.GracefulStop()
 		wg.Wait()
 	}
 }
 
-type caseData struct {
-	name           string
-	queryGenerator func(baseTs time.Time) *streamv1.QueryRequest
-	writeGenerator func() *streamv1.WriteRequest
-	args           testData
-	wantLen        int
+type preloadStreamService struct {
+	metaSvc metadata.Service
 }
 
-func TestStreamService(t *testing.T) {
-	req := require.New(t)
-	_, currentFile, _, _ := runtime.Caller(0)
-	basePath := filepath.Dir(currentFile)
-	certFile := filepath.Join(basePath, "testdata/server_cert.pem")
-	testCases := []caseData{
-		{
-			name:           "isTLS",
-			queryGenerator: queryCriteria,
-			writeGenerator: writeData,
-			args: testData{
-				TLS:                true,
-				certFile:           certFile,
-				serverHostOverride: "localhost",
-				addr:               "localhost:17913",
-				basePath:           basePath,
-			},
-			wantLen: 1,
-		},
-		{
-			name:           "noTLS",
-			queryGenerator: queryCriteria,
-			writeGenerator: writeData,
-			args: testData{
-				TLS:  false,
-				addr: "localhost:17912",
-			},
-			wantLen: 1,
-		},
-	}
-	for _, tc := range testCases {
-		t.Run(tc.name, func(t *testing.T) {
-			gracefulStop := setup(req, tc.args)
-			defer gracefulStop()
-			if tc.args.TLS {
-				var opts []grpclib.DialOption
-				creds, err := credentials.NewClientTLSFromFile(tc.args.certFile, tc.args.serverHostOverride)
-				assert.NoError(t, err)
-				opts = append(opts, grpclib.WithTransportCredentials(creds))
-				dialService(t, tc, opts)
-			} else {
-				var opts []grpclib.DialOption
-				opts = append(opts, grpclib.WithInsecure())
-				dialService(t, tc, opts)
-			}
-		})
-	}
+func (p *preloadStreamService) Name() string {
+	return "preload-measure"
+}
+
+func (p *preloadStreamService) PreRun() error {
+	return teststream.PreloadSchema(p.metaSvc.SchemaRegistry())
 }
 
 func writeData() *streamv1.WriteRequest {
@@ -227,60 +192,23 @@ func queryCriteria(baseTs time.Time) *streamv1.QueryRequest {
 		Build()
 }
 
-func dialService(t *testing.T, tc caseData, opts []grpclib.DialOption) {
-	conn, err := grpclib.Dial(tc.args.addr, opts...)
-	assert.NoError(t, err)
-	defer func(conn *grpclib.ClientConn) {
-		_ = conn.Close()
-	}(conn)
-	streamWrite(t, tc, conn)
-	requireTester := require.New(t)
-	assert.NoError(t, test.Retry(10, 100*time.Millisecond, func() error {
-		now := time.Now()
-		resp := streamQuery(requireTester, conn, tc.queryGenerator(now))
-		if len(resp.GetElements()) == tc.wantLen {
-			return nil
-		}
-		return fmt.Errorf("expected elements number: %d got: %d", tc.wantLen, len(resp.GetElements()))
-	}))
-}
-
-func streamWrite(t *testing.T, tc caseData, conn *grpclib.ClientConn) {
+func streamWrite(conn *grpclib.ClientConn) {
 	client := streamv1.NewStreamServiceClient(conn)
 	ctx := context.Background()
 	writeClient, errorWrite := client.Write(ctx)
-	if errorWrite != nil {
-		t.Errorf("%v.write(_) = _, %v", client, errorWrite)
-	}
-	waitc := make(chan struct{})
-	go func() {
-		for {
-			writeResponse, errRecv := writeClient.Recv()
-			if errRecv == io.EOF {
-				// read done.
-				close(waitc)
-				return
-			}
-			assert.NoError(t, errRecv)
-			assert.NotNil(t, writeResponse)
-		}
-	}()
-	if errSend := writeClient.Send(tc.writeGenerator()); errSend != nil {
-		t.Errorf("Failed to send a note: %v", errSend)
-	}
-	if errorSend := writeClient.CloseSend(); errorSend != nil {
-		t.Errorf("Failed to send a note: %v", errorSend)
-	}
-	<-waitc
+	Expect(errorWrite).Should(Succeed())
+	Expect(writeClient.Send(writeData())).Should(Succeed())
+	Expect(writeClient.CloseSend()).Should(Succeed())
+	Eventually(func() error {
+		_, err := writeClient.Recv()
+		return err
+	}).Should(Equal(io.EOF))
 }
 
-func streamQuery(tester *require.Assertions, conn *grpclib.ClientConn, request *streamv1.QueryRequest) *streamv1.QueryResponse {
+func streamQuery(conn *grpclib.ClientConn) (int, error) {
 	client := streamv1.NewStreamServiceClient(conn)
 	ctx := context.Background()
-	queryResponse, errRev := client.Query(ctx, request)
-	if errRev != nil {
-		tester.Errorf(errRev, "Retrieve client failed: %v")
-	}
-	tester.NotNil(queryResponse)
-	return queryResponse
+	resp, err := client.Query(ctx, queryCriteria(time.Now()))
+
+	return len(resp.GetElements()), err
 }
diff --git a/go.mod b/go.mod
index adda1b0..38d87ab 100644
--- a/go.mod
+++ b/go.mod
@@ -29,6 +29,11 @@ require (
 )
 
 require (
+	github.com/onsi/ginkgo/v2 v2.0.0
+	github.com/onsi/gomega v1.17.0
+)
+
+require (
 	github.com/beorn7/perks v1.0.1 // indirect
 	github.com/bits-and-blooms/bitset v1.2.0 // indirect
 	github.com/cespare/xxhash/v2 v2.1.1 // indirect
diff --git a/go.sum b/go.sum
index 5b62477..fb13844 100644
--- a/go.sum
+++ b/go.sum
@@ -146,6 +146,7 @@ github.com/go-logfmt/logfmt v0.3.0/go.mod h1:Qt1PoO58o5twSAckw1HlFXLmHsOX5/0LbT9
 github.com/go-logfmt/logfmt v0.4.0/go.mod h1:3RMwSq7FuexP4Kalkev3ejPJsZTpXXBr9+V4qmtdjCk=
 github.com/go-logfmt/logfmt v0.5.0/go.mod h1:wCYkCAKZfumFQihp8CzCvQ3paCTfi41vtzG1KdI/P7A=
 github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY=
+github.com/go-task/slim-sprig v0.0.0-20210107165309-348f09dbbbc0/go.mod h1:fyg7847qk6SyHyPtNmDHnmrv/HOrqktSC+C9fM+CJOE=
 github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA=
 github.com/gogo/protobuf v1.1.1/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ=
 github.com/gogo/protobuf v1.2.1/go.mod h1:hp+jE20tsWTFYpLwKvXlhS1hjn+gTNwPg2I6zVXpSg4=
@@ -222,6 +223,7 @@ github.com/google/pprof v0.0.0-20201023163331-3e6fc7fc9c4c/go.mod h1:kpwsk12EmLe
 github.com/google/pprof v0.0.0-20201203190320-1bf35d6f28c2/go.mod h1:kpwsk12EmLew5upagYY7GY0pfYCcupk39gWOCRROcvE=
 github.com/google/pprof v0.0.0-20210122040257-d980be63207e/go.mod h1:kpwsk12EmLew5upagYY7GY0pfYCcupk39gWOCRROcvE=
 github.com/google/pprof v0.0.0-20210226084205-cbba55b83ad5/go.mod h1:kpwsk12EmLew5upagYY7GY0pfYCcupk39gWOCRROcvE=
+github.com/google/pprof v0.0.0-20210407192527-94a9f03dee38/go.mod h1:kpwsk12EmLew5upagYY7GY0pfYCcupk39gWOCRROcvE=
 github.com/google/renameio v0.1.0/go.mod h1:KWCgfxg9yswjAJkECMjeO8J8rahYeXnNhOm40UhjYkI=
 github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
 github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I=
@@ -261,6 +263,7 @@ github.com/hashicorp/logutils v1.0.0/go.mod h1:QIAnNjmIWmVIIkWDTG1z5v++HQmx9WQRO
 github.com/hashicorp/mdns v1.0.0/go.mod h1:tL+uN++7HEJ6SQLQ2/p+z2pH24WQKWjBPkE0mNTz8vQ=
 github.com/hashicorp/memberlist v0.1.3/go.mod h1:ajVTdAv/9Im8oMAAj5G31PhhMCZJV2pPBoIllUwCN7I=
 github.com/hashicorp/serf v0.8.2/go.mod h1:6hOLApaqBFA1NXqRQAsxw9QxuDEvNxSQRwA/JwenrHc=
+github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU=
 github.com/ianlancetaylor/demangle v0.0.0-20181102032728-5e5cf60278f6/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc=
 github.com/ianlancetaylor/demangle v0.0.0-20200824232613-28f6c0f3b639/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc=
 github.com/inconshreveable/mousetrap v1.0.0 h1:Z8tu5sraLXCXIcARxBp/8cbvlwVa7Z1NHg9XEKhtSvM=
@@ -325,9 +328,22 @@ github.com/mschoch/smat v0.2.0 h1:8imxQsjDm8yFEAVBe7azKmKSgzSkZXDuKkSq9374khM=
 github.com/mschoch/smat v0.2.0/go.mod h1:kc9mz7DoBKqDyiRL7VZN8KvXQMWeTaVnttLRXOlotKw=
 github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U=
 github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U=
+github.com/nxadm/tail v1.4.4/go.mod h1:kenIhsEOeOJmVchQTgglprH7qJGnHDVpk1VPCcaMI8A=
+github.com/nxadm/tail v1.4.8 h1:nPr65rt6Y5JFSKQO7qToXr7pePgD6Gwiw05lkbyAQTE=
+github.com/nxadm/tail v1.4.8/go.mod h1:+ncqLTQzXmGhMZNUePPaPqPvBxHAIsmXswZKocGu+AU=
 github.com/oklog/run v1.1.0 h1:GEenZ1cK0+q0+wsJew9qUg/DyD8k3JzYsZAi5gYi2mA=
 github.com/oklog/run v1.1.0/go.mod h1:sVPdnTZT1zYwAJeCMu2Th4T21pA3FPOQRfWjQlk7DVU=
 github.com/oklog/ulid v1.3.1/go.mod h1:CirwcVhetQ6Lv90oh/F+FBtV6XMibvdAFo93nm5qn4U=
+github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
+github.com/onsi/ginkgo v1.12.1/go.mod h1:zj2OWP4+oCPe1qIXoGWkgMRwljMUYCdkwsT2108oapk=
+github.com/onsi/ginkgo v1.16.4 h1:29JGrr5oVBm5ulCWet69zQkzWipVXIol6ygQUe/EzNc=
+github.com/onsi/ginkgo v1.16.4/go.mod h1:dX+/inL/fNMqNlz0e9LfyB9TswhZpCVdJM/Z6Vvnwo0=
+github.com/onsi/ginkgo/v2 v2.0.0 h1:CcuG/HvWNkkaqCUpJifQY8z7qEMBJya6aLPx6ftGyjQ=
+github.com/onsi/ginkgo/v2 v2.0.0/go.mod h1:vw5CSIxN1JObi/U8gcbwft7ZxR2dgaR70JSE3/PpL4c=
+github.com/onsi/gomega v1.7.1/go.mod h1:XdKZgCCFLUoM/7CFJVPcG8C1xQ1AJ0vpAezJrB7JYyY=
+github.com/onsi/gomega v1.10.1/go.mod h1:iN09h71vgCQne3DLsj+A5owkum+a2tYe+TOCB1ybHNo=
+github.com/onsi/gomega v1.17.0 h1:9Luw4uT5HTjHTN8+aNcSThgH1vdXnmdJ8xIfZ4wyTRE=
+github.com/onsi/gomega v1.17.0/go.mod h1:HnhC7FXeEQY45zxNK3PPoIUhzk/80Xly9PcubAlGdZY=
 github.com/opentracing/opentracing-go v1.1.0/go.mod h1:UkNAQd3GIcIGf0SeVgPpRdFStlNbqXla1AfSYxPUl2o=
 github.com/pascaldekloe/goe v0.0.0-20180627143212-57f6aae5913c/go.mod h1:lzWF7FIEvWOWxwDKqyGYQf6ZUaNfKdP144TG7ZOy1lc=
 github.com/pelletier/go-toml v1.2.0/go.mod h1:5z9KED0ma1S8pY6P1sdut58dfprrGBbd/94hg7ilaic=
@@ -543,6 +559,7 @@ golang.org/x/mod v0.4.1/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
 golang.org/x/mod v0.4.2/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
 golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
 golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
+golang.org/x/net v0.0.0-20180906233101-161cd47e91fd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
 golang.org/x/net v0.0.0-20181023162649-9b4f9f5ad519/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
 golang.org/x/net v0.0.0-20181114220301-adae6a3d119a/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
 golang.org/x/net v0.0.0-20181201002055-351d144fa1fc/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
@@ -568,6 +585,7 @@ golang.org/x/net v0.0.0-20200324143707-d3edc9973b7e/go.mod h1:qpuaurCH72eLCgpAm/
 golang.org/x/net v0.0.0-20200501053045-e0ff5e5a1de5/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A=
 golang.org/x/net v0.0.0-20200506145744-7e3656a0809f/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A=
 golang.org/x/net v0.0.0-20200513185701-a91f0712d120/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A=
+golang.org/x/net v0.0.0-20200520004742-59133d7f0dd7/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A=
 golang.org/x/net v0.0.0-20200520182314-0ba52f642ac2/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A=
 golang.org/x/net v0.0.0-20200625001655-4c5254603344/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA=
 golang.org/x/net v0.0.0-20200707034311-ab3426394381/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA=
@@ -581,6 +599,7 @@ golang.org/x/net v0.0.0-20210119194325-5f4716e94777/go.mod h1:m0MpNAwzfU5UDzcl9v
 golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg=
 golang.org/x/net v0.0.0-20210316092652-d523dce5a7f4/go.mod h1:RBQZq4jEuRlivfhVLdyRGr576XBO4/greRjx4P4O3yc=
 golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4/go.mod h1:p54w0d4576C0XHj96bSt6lcn1PtDYWL6XObtHCRCNQM=
+golang.org/x/net v0.0.0-20210428140749-89ef3d95e781/go.mod h1:OJAsFXCWl8Ukc7SiCT/9KSuxbyM7479/AVlXFRxuMCk=
 golang.org/x/net v0.0.0-20210716203947-853a461950ff h1:j2EK/QoxYNBsXI4R7fQkkRUk8y6wnOBI+6hgPdP/6Ds=
 golang.org/x/net v0.0.0-20210716203947-853a461950ff/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
 golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
@@ -610,6 +629,7 @@ golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJ
 golang.org/x/sys v0.0.0-20180823144017-11551d06cbcc/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
 golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
 golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
+golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
 golang.org/x/sys v0.0.0-20181026203630-95b1ffbd15a5/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
 golang.org/x/sys v0.0.0-20181107165924-66b7b1311ac8/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
 golang.org/x/sys v0.0.0-20181116152217-5ac8a444bdc5/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
@@ -623,9 +643,11 @@ golang.org/x/sys v0.0.0-20190507160741-ecd444e8653b/go.mod h1:h1NjWce9XRLGQEsW7w
 golang.org/x/sys v0.0.0-20190606165138-5da285871e9c/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
 golang.org/x/sys v0.0.0-20190624142023-c5567b49c5d0/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
 golang.org/x/sys v0.0.0-20190726091711-fc99dfbffb4e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
+golang.org/x/sys v0.0.0-20190904154756-749cb33beabd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
 golang.org/x/sys v0.0.0-20191001151750-bb3f8db39f24/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
 golang.org/x/sys v0.0.0-20191005200804-aed5e4c7ecf9/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
 golang.org/x/sys v0.0.0-20191026070338-33540a1f6037/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
+golang.org/x/sys v0.0.0-20191120155948-bd437916bb0e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
 golang.org/x/sys v0.0.0-20191204072324-ce4227a45e2e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
 golang.org/x/sys v0.0.0-20191228213918-04cbcbbfeed8/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
 golang.org/x/sys v0.0.0-20200106162015-b016eb3dc98e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
@@ -650,6 +672,7 @@ golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7w
 golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
 golang.org/x/sys v0.0.0-20201201145000-ef89a241ccb3/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
 golang.org/x/sys v0.0.0-20210104204734-6f8348627aad/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
+golang.org/x/sys v0.0.0-20210112080510-489259a85091/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
 golang.org/x/sys v0.0.0-20210119212857-b64e53b001e4/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
 golang.org/x/sys v0.0.0-20210124154548-22da62e12c0c/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
 golang.org/x/sys v0.0.0-20210220050731-9a76102bfb43/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
@@ -728,6 +751,7 @@ golang.org/x/tools v0.0.0-20200904185747-39188db58858/go.mod h1:Cj7w3i3Rnn0Xh82u
 golang.org/x/tools v0.0.0-20201110124207-079ba7bd75cd/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA=
 golang.org/x/tools v0.0.0-20201201161351-ac6f37ff4c2a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA=
 golang.org/x/tools v0.0.0-20201208233053-a543418bbed2/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA=
+golang.org/x/tools v0.0.0-20201224043029-2b0845dc783e/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA=
 golang.org/x/tools v0.0.0-20210105154028-b0ab187a4818/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA=
 golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA=
 golang.org/x/tools v0.1.0/go.mod h1:xkSsbof2nBLbhDlRMhhhyNLN/zl3eTqcnHD5viDpcZ0=
@@ -856,12 +880,15 @@ gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8
 gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 h1:YR8cESwS4TdDjEe65xsg0ogRM/Nc3DYOhEAlW+xobZo=
 gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
 gopkg.in/errgo.v2 v2.1.0/go.mod h1:hNsd1EY+bozCKY1Ytp96fpM3vjJbqLJn88ws8XvfDNI=
+gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMys=
 gopkg.in/ini.v1 v1.51.0/go.mod h1:pNLf8WUiyNEtQjuu5G5vTm06TEv9tsIgeAvK8hOrP4k=
 gopkg.in/ini.v1 v1.62.0 h1:duBzk771uxoUuOlyRLkHsygud9+5lrlGjdFBb4mSKDU=
 gopkg.in/ini.v1 v1.62.0/go.mod h1:pNLf8WUiyNEtQjuu5G5vTm06TEv9tsIgeAvK8hOrP4k=
 gopkg.in/natefinch/lumberjack.v2 v2.0.0 h1:1Lc07Kr7qY4U2YPouBjpCLxpiyxIVoxqXgkXLknAOE8=
 gopkg.in/natefinch/lumberjack.v2 v2.0.0/go.mod h1:l0ndWWf7gzL7RNwBG7wST/UCcT4T24xpD6X8LsfU/+k=
 gopkg.in/resty.v1 v1.12.0/go.mod h1:mDo4pnntr5jdWRML875a/NmxYqAlA73dVijT2AXvQQo=
+gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 h1:uRGJdciOHaEIrze2W8Q3AKkepLTh2hOroT7a+7czfdQ=
+gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw=
 gopkg.in/yaml.v2 v2.0.0-20170812160011-eb3733d160e7/go.mod h1:JAlM8MvJe8wmxCU4Bli9HhUf9+ttbYbLASfIpnQbh74=
 gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
 gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
diff --git a/pkg/logger/logger.go b/pkg/logger/logger.go
index 1407a4f..36896ed 100644
--- a/pkg/logger/logger.go
+++ b/pkg/logger/logger.go
@@ -46,3 +46,8 @@ func (l *Logger) Named(name string) *Logger {
 	subLogger := root.Logger.With().Str("module", module).Logger()
 	return &Logger{module: module, Logger: &subLogger}
 }
+
+// Loggable indicates that the implementation supports logging.
+type Loggable interface {
+	SetLogger(*Logger)
+}
diff --git a/pkg/test/space.go b/pkg/test/space.go
index bbe7ea3..8fafdb3 100644
--- a/pkg/test/space.go
+++ b/pkg/test/space.go
@@ -35,3 +35,12 @@ func Space(t *require.Assertions) (tempDir string, deferFunc func()) {
 		}
 	}
 }
+
+func NewSpace() (tempDir string, deferFunc func(), err error) {
+	tempDir, err = ioutil.TempDir("", "banyandb-test-*")
+	return tempDir, func() {
+		if err = os.RemoveAll(tempDir); err != nil {
+			_, _ = fmt.Fprintf(os.Stderr, "Error while removing dir: %v\n", err)
+		}
+	}, err
+}