You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by ha...@apache.org on 2022/01/14 00:07:41 UTC

[skywalking-banyandb] branch test-ginkgo created (now 5a15e6b)

This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a change to branch test-ginkgo
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git.


      at 5a15e6b  Introduce ginkgo+gomega test framework

This branch includes the following new commits:

     new 5a15e6b  Introduce ginkgo+gomega test framework

The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


[skywalking-banyandb] 01/01: Introduce ginkgo+gomega test framework

Posted by ha...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch test-ginkgo
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git

commit 5a15e6b13bbb621b3f1cd2a3fa98030d09df79fd
Author: Gao Hongtao <ha...@gmail.com>
AuthorDate: Fri Jan 14 00:05:23 2022 +0000

    Introduce ginkgo+gomega test framework
---
 banyand/liaison/grpc/discovery.go                  |  41 ++++
 .../liaison/grpc/grpc_suite_test.go                |  21 +-
 banyand/liaison/grpc/{stream.go => measure.go}     |  50 ++--
 banyand/liaison/grpc/registry_test.go              | 253 +++++++++------------
 banyand/liaison/grpc/server.go                     |  58 +++--
 banyand/liaison/grpc/stream.go                     |  22 +-
 banyand/liaison/grpc/stream_test.go                | 246 +++++++-------------
 go.mod                                             |   5 +
 go.sum                                             |  26 +++
 pkg/logger/logger.go                               |   5 +
 pkg/test/space.go                                  |   9 +
 11 files changed, 342 insertions(+), 394 deletions(-)

diff --git a/banyand/liaison/grpc/discovery.go b/banyand/liaison/grpc/discovery.go
index e7e2d76..a383589 100644
--- a/banyand/liaison/grpc/discovery.go
+++ b/banyand/liaison/grpc/discovery.go
@@ -20,13 +20,54 @@ package grpc
 import (
 	"sync"
 
+	"github.com/apache/skywalking-banyandb/api/common"
 	commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
 	databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
+	modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
+	"github.com/apache/skywalking-banyandb/banyand/queue"
+	"github.com/apache/skywalking-banyandb/banyand/tsdb"
 	"github.com/apache/skywalking-banyandb/pkg/bus"
 	"github.com/apache/skywalking-banyandb/pkg/logger"
 	"github.com/apache/skywalking-banyandb/pkg/partition"
+	"github.com/pkg/errors"
 )
 
+var ErrNotExist = errors.New("the object doesn't exist")
+
+type discoveryService struct {
+	shardRepo  *shardRepo
+	entityRepo *entityRepo
+	pipeline   queue.Queue
+	log        *logger.Logger
+}
+
+func newDiscoveryService(pipeline queue.Queue) *discoveryService {
+	return &discoveryService{
+		shardRepo:  &shardRepo{shardEventsMap: make(map[identity]uint32)},
+		entityRepo: &entityRepo{entitiesMap: make(map[identity]partition.EntityLocator)},
+		pipeline:   pipeline,
+	}
+}
+
+func (ds *discoveryService) SetLogger(log *logger.Logger) {
+	ds.log = log
+	ds.shardRepo.log = log
+	ds.entityRepo.log = log
+}
+
+func (ds *discoveryService) navigate(metadata *commonv1.Metadata, tagFamilies []*modelv1.TagFamilyForWrite) (tsdb.Entity, common.ShardID, error) {
+	id := getID(metadata)
+	shardNum, existed := ds.shardRepo.shardNum(id)
+	if !existed {
+		return nil, common.ShardID(0), errors.Wrapf(ErrNotExist, "finding the shard num by: %v", metadata)
+	}
+	locator, existed := ds.entityRepo.getLocator(id)
+	if !existed {
+		return nil, common.ShardID(0), errors.Wrapf(ErrNotExist, "finding the locator by: %v", metadata)
+	}
+	return locator.Locate(tagFamilies, shardNum)
+}
+
 type identity struct {
 	name  string
 	group string
diff --git a/pkg/test/space.go b/banyand/liaison/grpc/grpc_suite_test.go
similarity index 67%
copy from pkg/test/space.go
copy to banyand/liaison/grpc/grpc_suite_test.go
index bbe7ea3..ae8f957 100644
--- a/pkg/test/space.go
+++ b/banyand/liaison/grpc/grpc_suite_test.go
@@ -15,23 +15,16 @@
 // specific language governing permissions and limitations
 // under the License.
 
-package test
+package grpc_test
 
 import (
-	"fmt"
-	"io/ioutil"
-	"os"
+	"testing"
 
-	"github.com/stretchr/testify/require"
+	. "github.com/onsi/ginkgo/v2"
+	. "github.com/onsi/gomega"
 )
 
-func Space(t *require.Assertions) (tempDir string, deferFunc func()) {
-	var tempDirErr error
-	tempDir, tempDirErr = ioutil.TempDir("", "banyandb-test-*")
-	t.Nil(tempDirErr)
-	return tempDir, func() {
-		if err := os.RemoveAll(tempDir); err != nil {
-			_, _ = fmt.Fprintf(os.Stderr, "Error while removing dir: %v\n", err)
-		}
-	}
+func TestGrpc(t *testing.T) {
+	RegisterFailHandler(Fail)
+	RunSpecs(t, "Grpc Suite")
 }
diff --git a/banyand/liaison/grpc/stream.go b/banyand/liaison/grpc/measure.go
similarity index 51%
copy from banyand/liaison/grpc/stream.go
copy to banyand/liaison/grpc/measure.go
index df831c4..530b024 100644
--- a/banyand/liaison/grpc/stream.go
+++ b/banyand/liaison/grpc/measure.go
@@ -18,67 +18,47 @@
 package grpc
 
 import (
-	"context"
 	"io"
 	"time"
 
 	"github.com/apache/skywalking-banyandb/api/data"
-	streamv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/stream/v1"
+	measurev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/measure/v1"
 	"github.com/apache/skywalking-banyandb/banyand/tsdb"
 	"github.com/apache/skywalking-banyandb/pkg/bus"
 )
 
-func (s *Server) Write(stream streamv1.StreamService_WriteServer) error {
+type measureService struct {
+	*discoveryService
+	measurev1.UnimplementedMeasureServiceServer
+}
+
+func (ms *measureService) Write(measure measurev1.MeasureService_WriteServer) error {
 	for {
-		writeEntity, err := stream.Recv()
+		writeRequest, err := measure.Recv()
 		if err == io.EOF {
 			return nil
 		}
 		if err != nil {
 			return err
 		}
-		id := getID(writeEntity.GetMetadata())
-		shardNum, existed := s.shardRepo.shardNum(id)
-		if !existed {
-			continue
-		}
-		locator, existed := s.entityRepo.getLocator(id)
-		if !existed {
-			continue
-		}
-		entity, shardID, err := locator.Locate(writeEntity.GetElement().TagFamilies, shardNum)
+		entity, shardID, err := ms.navigate(writeRequest.GetMetadata(), writeRequest.GetDataPoint().GetTagFamilies())
 		if err != nil {
-			s.log.Error().Err(err).Msg("failed to locate write target")
+			ms.log.Error().Err(err).Msg("failed to navigate to the write target")
 			continue
 		}
-		message := bus.NewMessage(bus.MessageID(time.Now().UnixNano()), &streamv1.InternalWriteRequest{
-			Request:    writeEntity,
+		message := bus.NewMessage(bus.MessageID(time.Now().UnixNano()), &measurev1.InternalWriteRequest{
+			Request:    writeRequest,
 			ShardId:    uint32(shardID),
 			SeriesHash: tsdb.HashEntity(entity),
 		})
-		_, errWritePub := s.pipeline.Publish(data.TopicStreamWrite, message)
+		_, errWritePub := ms.pipeline.Publish(data.TopicMeasureWrite, message)
 		if errWritePub != nil {
 			return errWritePub
 		}
-		if errSend := stream.Send(&streamv1.WriteResponse{}); errSend != nil {
+		if errSend := measure.Send(&measurev1.WriteResponse{}); errSend != nil {
 			return errSend
 		}
 	}
 }
 
-func (s *Server) Query(_ context.Context, entityCriteria *streamv1.QueryRequest) (*streamv1.QueryResponse, error) {
-	message := bus.NewMessage(bus.MessageID(time.Now().UnixNano()), entityCriteria)
-	feat, errQuery := s.pipeline.Publish(data.TopicStreamQuery, message)
-	if errQuery != nil {
-		return nil, errQuery
-	}
-	msg, errFeat := feat.Get()
-	if errFeat != nil {
-		return nil, errFeat
-	}
-	queryMsg, ok := msg.Data().([]*streamv1.Element)
-	if !ok {
-		return nil, ErrQueryMsg
-	}
-	return &streamv1.QueryResponse{Elements: queryMsg}, nil
-}
+//TODO: implement topN & Query
diff --git a/banyand/liaison/grpc/registry_test.go b/banyand/liaison/grpc/registry_test.go
index 1c21f8a..13717e1 100644
--- a/banyand/liaison/grpc/registry_test.go
+++ b/banyand/liaison/grpc/registry_test.go
@@ -15,173 +15,126 @@
 // specific language governing permissions and limitations
 // under the License.
 
-package grpc
+package grpc_test
 
 import (
 	"context"
-	"testing"
 
-	"github.com/stretchr/testify/require"
 	"google.golang.org/grpc"
 	"google.golang.org/grpc/status"
 
 	commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
 	databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
 	"github.com/apache/skywalking-banyandb/banyand/metadata/schema"
-)
-
-func TestStreamRegistry(t *testing.T) {
-	req := require.New(t)
-	gracefulStop := setup(req, testData{
-		TLS:  false,
-		addr: "localhost:17912",
-	})
-	defer gracefulStop()
 
-	conn, err := grpc.Dial("localhost:17912", grpc.WithInsecure())
-	req.NoError(err)
-	req.NotNil(conn)
-
-	client := databasev1.NewStreamRegistryServiceClient(conn)
-	req.NotNil(client)
+	. "github.com/onsi/ginkgo/v2"
+	. "github.com/onsi/gomega"
+)
 
+var _ = Describe("Registry", func() {
+	var gracefulStop func()
+	var conn *grpc.ClientConn
 	meta := &commonv1.Metadata{
 		Group: "default",
-		Name:  "sw",
 	}
-
-	getResp, err := client.Get(context.TODO(), &databasev1.StreamRegistryServiceGetRequest{Metadata: meta})
-
-	req.NoError(err)
-	req.NotNil(getResp)
-
-	// 2 - DELETE
-	deleteResp, err := client.Delete(context.TODO(), &databasev1.StreamRegistryServiceDeleteRequest{
-		Metadata: meta,
+	BeforeEach(func() {
+		gracefulStop = setup(nil)
+		var err error
+		conn, err = grpc.Dial("localhost:17912", grpc.WithInsecure())
+		Expect(err).NotTo(HaveOccurred())
 	})
-	req.NoError(err)
-	req.NotNil(deleteResp)
-	req.True(deleteResp.GetDeleted())
-
-	// 3 - GET -> Nil
-	_, err = client.Get(context.TODO(), &databasev1.StreamRegistryServiceGetRequest{
-		Metadata: meta,
-	})
-	errStatus, _ := status.FromError(err)
-	req.Equal(errStatus.Message(), schema.ErrEntityNotFound.Error())
-
-	// 4 - CREATE
-	_, err = client.Create(context.TODO(), &databasev1.StreamRegistryServiceCreateRequest{Stream: getResp.GetStream()})
-	req.NoError(err)
-
-	// 5 - GET - > Not Nil
-	getResp, err = client.Get(context.TODO(), &databasev1.StreamRegistryServiceGetRequest{
-		Metadata: meta,
+	It("manages the stream", func() {
+		client := databasev1.NewStreamRegistryServiceClient(conn)
+		Expect(client).NotTo(BeNil())
+		meta.Name = "sw"
+		getResp, err := client.Get(context.TODO(), &databasev1.StreamRegistryServiceGetRequest{Metadata: meta})
+		Expect(err).ShouldNot(HaveOccurred())
+		Expect(getResp).NotTo(BeNil())
+		By("Cleanup the registry")
+		deleteResp, err := client.Delete(context.TODO(), &databasev1.StreamRegistryServiceDeleteRequest{
+			Metadata: meta,
+		})
+		Expect(err).ShouldNot(HaveOccurred())
+		Expect(deleteResp).NotTo(BeNil())
+		Expect(deleteResp.GetDeleted()).To(BeTrue())
+		By("Verifying the registry empty")
+		_, err = client.Get(context.TODO(), &databasev1.StreamRegistryServiceGetRequest{
+			Metadata: meta,
+		})
+		errStatus, _ := status.FromError(err)
+		Expect(errStatus.Message()).To(Equal(schema.ErrEntityNotFound.Error()))
+		By("Creating a new stream")
+		_, err = client.Create(context.TODO(), &databasev1.StreamRegistryServiceCreateRequest{Stream: getResp.GetStream()})
+		Expect(err).ShouldNot(HaveOccurred())
+		By("Verifying the new stream")
+		getResp, err = client.Get(context.TODO(), &databasev1.StreamRegistryServiceGetRequest{
+			Metadata: meta,
+		})
+		Expect(err).ShouldNot(HaveOccurred())
+		Expect(getResp).NotTo(BeNil())
 	})
-	req.NoError(err)
-	req.NotNil(getResp)
-}
-
-func TestIndexRuleBindingRegistry(t *testing.T) {
-	req := require.New(t)
-	gracefulStop := setup(req, testData{
-		TLS:  false,
-		addr: "localhost:17912",
+	It("manages the index-rule-binding", func() {
+		client := databasev1.NewIndexRuleBindingRegistryServiceClient(conn)
+		Expect(client).NotTo(BeNil())
+		meta.Name = "sw-index-rule-binding"
+		getResp, err := client.Get(context.TODO(), &databasev1.IndexRuleBindingRegistryServiceGetRequest{Metadata: meta})
+		Expect(err).ShouldNot(HaveOccurred())
+		Expect(getResp).NotTo(BeNil())
+		By("Cleanup the registry")
+		deleteResp, err := client.Delete(context.TODO(), &databasev1.IndexRuleBindingRegistryServiceDeleteRequest{
+			Metadata: meta,
+		})
+		Expect(err).ShouldNot(HaveOccurred())
+		Expect(deleteResp).NotTo(BeNil())
+		Expect(deleteResp.GetDeleted()).To(BeTrue())
+		By("Verifying the registry empty")
+		_, err = client.Get(context.TODO(), &databasev1.IndexRuleBindingRegistryServiceGetRequest{
+			Metadata: meta,
+		})
+		errStatus, _ := status.FromError(err)
+		Expect(errStatus.Message()).To(Equal(schema.ErrEntityNotFound.Error()))
+		By("Creating a new index-rule-binding")
+		_, err = client.Create(context.TODO(), &databasev1.IndexRuleBindingRegistryServiceCreateRequest{IndexRuleBinding: getResp.GetIndexRuleBinding()})
+		Expect(err).ShouldNot(HaveOccurred())
+		By("Verifying the new index-rule-binding")
+		getResp, err = client.Get(context.TODO(), &databasev1.IndexRuleBindingRegistryServiceGetRequest{
+			Metadata: meta,
+		})
+		Expect(err).ShouldNot(HaveOccurred())
+		Expect(getResp).NotTo(BeNil())
 	})
-	defer gracefulStop()
-
-	conn, err := grpc.Dial("localhost:17912", grpc.WithInsecure())
-	req.NoError(err)
-	req.NotNil(conn)
-
-	client := databasev1.NewIndexRuleBindingRegistryServiceClient(conn)
-	req.NotNil(client)
-
-	meta := &commonv1.Metadata{
-		Group: "default",
-		Name:  "sw-index-rule-binding",
-	}
-
-	getResp, err := client.Get(context.TODO(), &databasev1.IndexRuleBindingRegistryServiceGetRequest{Metadata: meta})
-
-	req.NoError(err)
-	req.NotNil(getResp)
-
-	// 2 - DELETE
-	deleteResp, err := client.Delete(context.TODO(), &databasev1.IndexRuleBindingRegistryServiceDeleteRequest{
-		Metadata: meta,
+	It("manages the index-rule", func() {
+		client := databasev1.NewIndexRuleRegistryServiceClient(conn)
+		Expect(client).NotTo(BeNil())
+		meta.Name = "db.instance"
+		getResp, err := client.Get(context.TODO(), &databasev1.IndexRuleRegistryServiceGetRequest{Metadata: meta})
+		Expect(err).ShouldNot(HaveOccurred())
+		Expect(getResp).NotTo(BeNil())
+		By("Cleanup the registry")
+		deleteResp, err := client.Delete(context.TODO(), &databasev1.IndexRuleRegistryServiceDeleteRequest{
+			Metadata: meta,
+		})
+		Expect(err).ShouldNot(HaveOccurred())
+		Expect(deleteResp).NotTo(BeNil())
+		Expect(deleteResp.GetDeleted()).To(BeTrue())
+		By("Verifying the registry empty")
+		_, err = client.Get(context.TODO(), &databasev1.IndexRuleRegistryServiceGetRequest{
+			Metadata: meta,
+		})
+		errStatus, _ := status.FromError(err)
+		Expect(errStatus.Message()).To(Equal(schema.ErrEntityNotFound.Error()))
+		By("Creating a new index-rule")
+		_, err = client.Create(context.TODO(), &databasev1.IndexRuleRegistryServiceCreateRequest{IndexRule: getResp.GetIndexRule()})
+		Expect(err).ShouldNot(HaveOccurred())
+		By("Verifying the new index-rule")
+		getResp, err = client.Get(context.TODO(), &databasev1.IndexRuleRegistryServiceGetRequest{
+			Metadata: meta,
+		})
+		Expect(err).ShouldNot(HaveOccurred())
+		Expect(getResp).NotTo(BeNil())
 	})
-	req.NoError(err)
-	req.NotNil(deleteResp)
-	req.True(deleteResp.GetDeleted())
-
-	// 3 - GET -> Nil
-	_, err = client.Get(context.TODO(), &databasev1.IndexRuleBindingRegistryServiceGetRequest{
-		Metadata: meta,
-	})
-	errStatus, _ := status.FromError(err)
-	req.Equal(errStatus.Message(), schema.ErrEntityNotFound.Error())
-
-	// 4 - CREATE
-	_, err = client.Create(context.TODO(), &databasev1.IndexRuleBindingRegistryServiceCreateRequest{IndexRuleBinding: getResp.GetIndexRuleBinding()})
-	req.NoError(err)
-
-	// 5 - GET - > Not Nil
-	getResp, err = client.Get(context.TODO(), &databasev1.IndexRuleBindingRegistryServiceGetRequest{
-		Metadata: meta,
-	})
-	req.NoError(err)
-	req.NotNil(getResp)
-}
-
-func TestIndexRuleRegistry(t *testing.T) {
-	req := require.New(t)
-	gracefulStop := setup(req, testData{
-		TLS:  false,
-		addr: "localhost:17912",
-	})
-	defer gracefulStop()
-
-	conn, err := grpc.Dial("localhost:17912", grpc.WithInsecure())
-	req.NoError(err)
-	req.NotNil(conn)
-
-	client := databasev1.NewIndexRuleRegistryServiceClient(conn)
-	req.NotNil(client)
-
-	meta := &commonv1.Metadata{
-		Group: "default",
-		Name:  "db.instance",
-	}
-
-	getResp, err := client.Get(context.TODO(), &databasev1.IndexRuleRegistryServiceGetRequest{Metadata: meta})
-
-	req.NoError(err)
-	req.NotNil(getResp)
-
-	// 2 - DELETE
-	deleteResp, err := client.Delete(context.TODO(), &databasev1.IndexRuleRegistryServiceDeleteRequest{
-		Metadata: meta,
-	})
-	req.NoError(err)
-	req.NotNil(deleteResp)
-	req.True(deleteResp.GetDeleted())
-
-	// 3 - GET -> Nil
-	_, err = client.Get(context.TODO(), &databasev1.IndexRuleRegistryServiceGetRequest{
-		Metadata: meta,
-	})
-	errStatus, _ := status.FromError(err)
-	req.Equal(errStatus.Message(), schema.ErrEntityNotFound.Error())
-
-	// 4 - CREATE
-	_, err = client.Create(context.TODO(), &databasev1.IndexRuleRegistryServiceCreateRequest{IndexRule: getResp.GetIndexRule()})
-	req.NoError(err)
-
-	// 5 - GET - > Not Nil
-	getResp, err = client.Get(context.TODO(), &databasev1.IndexRuleRegistryServiceGetRequest{
-		Metadata: meta,
+	AfterEach(func() {
+		_ = conn.Close()
+		gracefulStop()
 	})
-	req.NoError(err)
-	req.NotNil(getResp)
-}
+})
diff --git a/banyand/liaison/grpc/server.go b/banyand/liaison/grpc/server.go
index 6720d7b..5081754 100644
--- a/banyand/liaison/grpc/server.go
+++ b/banyand/liaison/grpc/server.go
@@ -27,12 +27,13 @@ import (
 
 	"github.com/apache/skywalking-banyandb/api/event"
 	databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
+	measurev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/measure/v1"
 	streamv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/stream/v1"
 	"github.com/apache/skywalking-banyandb/banyand/discovery"
 	"github.com/apache/skywalking-banyandb/banyand/metadata"
 	"github.com/apache/skywalking-banyandb/banyand/queue"
+	"github.com/apache/skywalking-banyandb/pkg/bus"
 	"github.com/apache/skywalking-banyandb/pkg/logger"
-	"github.com/apache/skywalking-banyandb/pkg/partition"
 	"github.com/apache/skywalking-banyandb/pkg/run"
 )
 
@@ -56,22 +57,26 @@ type Server struct {
 	pipeline       queue.Queue
 	repo           discovery.ServiceRepo
 	creds          credentials.TransportCredentials
-	shardRepo      *shardRepo
-	entityRepo     *entityRepo
+
+	streamSVC  *streamService
+	measureSVC *measureService
 	*streamRegistryServer
 	*indexRuleBindingRegistryServer
 	*indexRuleRegistryServer
 	*measureRegistryServer
 	*groupRegistryServer
-	streamv1.UnimplementedStreamServiceServer
 }
 
 func NewServer(_ context.Context, pipeline queue.Queue, repo discovery.ServiceRepo, schemaRegistry metadata.Service) *Server {
 	return &Server{
-		pipeline:   pipeline,
-		repo:       repo,
-		shardRepo:  &shardRepo{shardEventsMap: make(map[identity]uint32)},
-		entityRepo: &entityRepo{entitiesMap: make(map[identity]partition.EntityLocator)},
+		pipeline: pipeline,
+		repo:     repo,
+		streamSVC: &streamService{
+			discoveryService: newDiscoveryService(pipeline),
+		},
+		measureSVC: &measureService{
+			discoveryService: newDiscoveryService(pipeline),
+		},
 		streamRegistryServer: &streamRegistryServer{
 			schemaRegistry: schemaRegistry,
 		},
@@ -92,13 +97,34 @@ func NewServer(_ context.Context, pipeline queue.Queue, repo discovery.ServiceRe
 
 func (s *Server) PreRun() error {
 	s.log = logger.GetLogger("liaison-grpc")
-	s.shardRepo.log = s.log
-	s.entityRepo.log = s.log
-	err := s.repo.Subscribe(event.StreamTopicShardEvent, s.shardRepo)
-	if err != nil {
-		return err
+	components := []struct {
+		shardEvent   bus.Topic
+		entityEvent  bus.Topic
+		discoverySVC *discoveryService
+	}{
+		{
+			shardEvent:   event.StreamTopicShardEvent,
+			entityEvent:  event.StreamTopicEntityEvent,
+			discoverySVC: s.streamSVC.discoveryService,
+		},
+		{
+			shardEvent:   event.MeasureTopicShardEvent,
+			entityEvent:  event.MeasureTopicEntityEvent,
+			discoverySVC: s.measureSVC.discoveryService,
+		},
+	}
+	for _, c := range components {
+		c.discoverySVC.SetLogger(s.log)
+		err := s.repo.Subscribe(c.shardEvent, c.discoverySVC.shardRepo)
+		if err != nil {
+			return err
+		}
+		err = s.repo.Subscribe(c.entityEvent, c.discoverySVC.entityRepo)
+		if err != nil {
+			return err
+		}
 	}
-	return s.repo.Subscribe(event.StreamTopicEntityEvent, s.entityRepo)
+	return nil
 }
 
 func (s *Server) Name() string {
@@ -150,7 +176,9 @@ func (s *Server) Serve() error {
 	}
 	opts = append(opts, grpclib.MaxRecvMsgSize(s.maxRecvMsgSize))
 	s.ser = grpclib.NewServer(opts...)
-	streamv1.RegisterStreamServiceServer(s.ser, s)
+
+	streamv1.RegisterStreamServiceServer(s.ser, s.streamSVC)
+	measurev1.RegisterMeasureServiceServer(s.ser, s.measureSVC)
 	// register *Registry
 	databasev1.RegisterGroupRegistryServiceServer(s.ser, s.groupRegistryServer)
 	databasev1.RegisterIndexRuleBindingRegistryServiceServer(s.ser, s.indexRuleBindingRegistryServer)
diff --git a/banyand/liaison/grpc/stream.go b/banyand/liaison/grpc/stream.go
index df831c4..8d125b9 100644
--- a/banyand/liaison/grpc/stream.go
+++ b/banyand/liaison/grpc/stream.go
@@ -28,7 +28,12 @@ import (
 	"github.com/apache/skywalking-banyandb/pkg/bus"
 )
 
-func (s *Server) Write(stream streamv1.StreamService_WriteServer) error {
+type streamService struct {
+	*discoveryService
+	streamv1.UnimplementedStreamServiceServer
+}
+
+func (s *streamService) Write(stream streamv1.StreamService_WriteServer) error {
 	for {
 		writeEntity, err := stream.Recv()
 		if err == io.EOF {
@@ -37,18 +42,9 @@ func (s *Server) Write(stream streamv1.StreamService_WriteServer) error {
 		if err != nil {
 			return err
 		}
-		id := getID(writeEntity.GetMetadata())
-		shardNum, existed := s.shardRepo.shardNum(id)
-		if !existed {
-			continue
-		}
-		locator, existed := s.entityRepo.getLocator(id)
-		if !existed {
-			continue
-		}
-		entity, shardID, err := locator.Locate(writeEntity.GetElement().TagFamilies, shardNum)
+		entity, shardID, err := s.navigate(writeEntity.GetMetadata(), writeEntity.GetElement().GetTagFamilies())
 		if err != nil {
-			s.log.Error().Err(err).Msg("failed to locate write target")
+			s.log.Error().Err(err).Msg("failed to navigate to the write target")
 			continue
 		}
 		message := bus.NewMessage(bus.MessageID(time.Now().UnixNano()), &streamv1.InternalWriteRequest{
@@ -66,7 +62,7 @@ func (s *Server) Write(stream streamv1.StreamService_WriteServer) error {
 	}
 }
 
-func (s *Server) Query(_ context.Context, entityCriteria *streamv1.QueryRequest) (*streamv1.QueryResponse, error) {
+func (s *streamService) Query(_ context.Context, entityCriteria *streamv1.QueryRequest) (*streamv1.QueryResponse, error) {
 	message := bus.NewMessage(bus.MessageID(time.Now().UnixNano()), entityCriteria)
 	feat, errQuery := s.pipeline.Publish(data.TopicStreamQuery, message)
 	if errQuery != nil {
diff --git a/banyand/liaison/grpc/stream_test.go b/banyand/liaison/grpc/stream_test.go
index 1f30088..01aea1d 100644
--- a/banyand/liaison/grpc/stream_test.go
+++ b/banyand/liaison/grpc/stream_test.go
@@ -1,40 +1,20 @@
-// Licensed to Apache Software Foundation (ASF) under one or more contributor
-// license agreements. See the NOTICE file distributed with
-// this work for additional information regarding copyright
-// ownership. Apache Software Foundation (ASF) licenses this file to you under
-// the Apache License, Version 2.0 (the "License"); you may
-// not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-//     http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package grpc
+package grpc_test
 
 import (
 	"context"
 	"encoding/base64"
-	"fmt"
 	"io"
 	"path/filepath"
 	"runtime"
 	"sync"
-	"testing"
 	"time"
 
-	"github.com/stretchr/testify/assert"
-	"github.com/stretchr/testify/require"
 	grpclib "google.golang.org/grpc"
 	"google.golang.org/grpc/credentials"
 
 	streamv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/stream/v1"
 	"github.com/apache/skywalking-banyandb/banyand/discovery"
+	"github.com/apache/skywalking-banyandb/banyand/liaison/grpc"
 	"github.com/apache/skywalking-banyandb/banyand/metadata"
 	"github.com/apache/skywalking-banyandb/banyand/query"
 	"github.com/apache/skywalking-banyandb/banyand/queue"
@@ -44,53 +24,73 @@ import (
 	"github.com/apache/skywalking-banyandb/pkg/run"
 	"github.com/apache/skywalking-banyandb/pkg/test"
 	teststream "github.com/apache/skywalking-banyandb/pkg/test/stream"
-)
-
-var _ run.PreRunner = (*preloadStreamService)(nil)
-
-type preloadStreamService struct {
-	metaSvc metadata.Service
-}
-
-func (p *preloadStreamService) Name() string {
-	return "preload-measure"
-}
 
-func (p *preloadStreamService) PreRun() error {
-	return teststream.PreloadSchema(p.metaSvc.SchemaRegistry())
-}
+	. "github.com/onsi/ginkgo/v2"
+	. "github.com/onsi/gomega"
+)
 
-type testData struct {
-	certFile           string
-	serverHostOverride string
-	TLS                bool
-	addr               string
-	basePath           string
-}
+var _ = Describe("Stream", func() {
+	var gracefulStop func()
+	var conn *grpclib.ClientConn
+	It("is a plain server", func() {
+		gracefulStop = setup(nil)
+		var err error
+		conn, err = grpclib.Dial("localhost:17912", grpclib.WithInsecure())
+		Expect(err).NotTo(HaveOccurred())
+		streamWrite(conn)
+		Eventually(func() (int, error) {
+			return streamQuery(conn)
+		}).Should(Equal(1))
+	})
+	It("is a TLS server", func() {
+		flags := []string{"--tls=true"}
+		_, currentFile, _, _ := runtime.Caller(0)
+		basePath := filepath.Dir(currentFile)
+		certFile := filepath.Join(basePath, "testdata/server_cert.pem")
+		keyFile := filepath.Join(basePath, "testdata/server_key.pem")
+		flags = append(flags, "--cert-file="+certFile)
+		flags = append(flags, "--key-file="+keyFile)
+		addr := "localhost:17913"
+		flags = append(flags, "--addr="+addr)
+		gracefulStop = setup(flags)
+		creds, err := credentials.NewClientTLSFromFile(certFile, "localhost")
+		Expect(err).NotTo(HaveOccurred())
+		conn, err = grpclib.Dial(addr, grpclib.WithTransportCredentials(creds))
+		Expect(err).NotTo(HaveOccurred())
+		streamWrite(conn)
+		Eventually(func() (int, error) {
+			return streamQuery(conn)
+		}).Should(Equal(1))
+	})
+	AfterEach(func() {
+		_ = conn.Close()
+		gracefulStop()
+	})
+})
 
-func setup(req *require.Assertions, testData testData) func() {
-	req.NoError(logger.Init(logger.Logging{
+func setup(flags []string) func() {
+	Expect(logger.Init(logger.Logging{
 		Env:   "dev",
 		Level: "warn",
-	}))
+	})).Should(Succeed())
 	g := run.Group{Name: "standalone"}
 	// Init `Discovery` module
 	repo, err := discovery.NewServiceRepo(context.Background())
-	req.NoError(err)
+	Expect(err).NotTo(HaveOccurred())
 	// Init `Queue` module
 	pipeline, err := queue.NewQueue(context.TODO(), repo)
-	req.NoError(err)
+	Expect(err).NotTo(HaveOccurred())
 	// Init `Metadata` module
 	metaSvc, err := metadata.NewService(context.TODO())
-	req.NoError(err)
+	Expect(err).NotTo(HaveOccurred())
 	// Init `Stream` module
 	streamSvc, err := stream.NewService(context.TODO(), metaSvc, repo, pipeline)
-	req.NoError(err)
+	Expect(err).NotTo(HaveOccurred())
 	// Init `Query` module
 	q, err := query.NewExecutor(context.TODO(), streamSvc, metaSvc, repo, pipeline)
-	req.NoError(err)
+	Expect(err).NotTo(HaveOccurred())
 
-	tcp := NewServer(context.TODO(), pipeline, repo, metaSvc)
+	tcp := grpc.NewServer(context.TODO(), pipeline, repo, metaSvc)
 
 	closer := run.NewTester("closer")
 	startListener := run.NewTester("started-listener")
@@ -108,18 +108,12 @@ func setup(req *require.Assertions, testData testData) func() {
 		startListener,
 	)
 	// Create a random directory
-	rootPath, deferFunc := test.Space(req)
-	flags := []string{"--root-path=" + rootPath, "--metadata-root-path=" + teststream.RandomTempDir()}
-	if testData.TLS {
-		flags = append(flags, "--tls=true")
-		certFile := filepath.Join(testData.basePath, "testdata/server_cert.pem")
-		keyFile := filepath.Join(testData.basePath, "testdata/server_key.pem")
-		flags = append(flags, "--cert-file="+certFile)
-		flags = append(flags, "--key-file="+keyFile)
-		flags = append(flags, "--addr="+testData.addr)
-	}
+	rootPath, deferFunc, err := test.NewSpace()
+	Expect(err).NotTo(HaveOccurred())
+	flags = append(flags, "--root-path="+rootPath, "--metadata-root-path="+teststream.RandomTempDir())
+
 	err = g.RegisterFlags().Parse(flags)
-	req.NoError(err)
+	Expect(err).NotTo(HaveOccurred())
 
 	wg := sync.WaitGroup{}
 
@@ -130,72 +124,27 @@ func setup(req *require.Assertions, testData testData) func() {
 		errRun := g.Run()
 		if errRun != nil {
 			startListener.GracefulStop()
-			req.NoError(errRun)
+			Expect(errRun).Should(Succeed())
 		}
 		deferFunc()
 	}()
-	req.NoError(startListener.WaitUntilStarted())
+	Expect(startListener.WaitUntilStarted()).Should(Succeed())
 	return func() {
 		closer.GracefulStop()
 		wg.Wait()
 	}
 }
 
-type caseData struct {
-	name           string
-	queryGenerator func(baseTs time.Time) *streamv1.QueryRequest
-	writeGenerator func() *streamv1.WriteRequest
-	args           testData
-	wantLen        int
+type preloadStreamService struct {
+	metaSvc metadata.Service
 }
 
-func TestStreamService(t *testing.T) {
-	req := require.New(t)
-	_, currentFile, _, _ := runtime.Caller(0)
-	basePath := filepath.Dir(currentFile)
-	certFile := filepath.Join(basePath, "testdata/server_cert.pem")
-	testCases := []caseData{
-		{
-			name:           "isTLS",
-			queryGenerator: queryCriteria,
-			writeGenerator: writeData,
-			args: testData{
-				TLS:                true,
-				certFile:           certFile,
-				serverHostOverride: "localhost",
-				addr:               "localhost:17913",
-				basePath:           basePath,
-			},
-			wantLen: 1,
-		},
-		{
-			name:           "noTLS",
-			queryGenerator: queryCriteria,
-			writeGenerator: writeData,
-			args: testData{
-				TLS:  false,
-				addr: "localhost:17912",
-			},
-			wantLen: 1,
-		},
-	}
-	for _, tc := range testCases {
-		t.Run(tc.name, func(t *testing.T) {
-			gracefulStop := setup(req, tc.args)
-			defer gracefulStop()
-			if tc.args.TLS {
-				var opts []grpclib.DialOption
-				creds, err := credentials.NewClientTLSFromFile(tc.args.certFile, tc.args.serverHostOverride)
-				assert.NoError(t, err)
-				opts = append(opts, grpclib.WithTransportCredentials(creds))
-				dialService(t, tc, opts)
-			} else {
-				var opts []grpclib.DialOption
-				opts = append(opts, grpclib.WithInsecure())
-				dialService(t, tc, opts)
-			}
-		})
-	}
+func (p *preloadStreamService) Name() string {
+	return "preload-measure"
+}
+
+func (p *preloadStreamService) PreRun() error {
+	return teststream.PreloadSchema(p.metaSvc.SchemaRegistry())
 }
 
 func writeData() *streamv1.WriteRequest {
@@ -227,60 +176,23 @@ func queryCriteria(baseTs time.Time) *streamv1.QueryRequest {
 		Build()
 }
 
-func dialService(t *testing.T, tc caseData, opts []grpclib.DialOption) {
-	conn, err := grpclib.Dial(tc.args.addr, opts...)
-	assert.NoError(t, err)
-	defer func(conn *grpclib.ClientConn) {
-		_ = conn.Close()
-	}(conn)
-	streamWrite(t, tc, conn)
-	requireTester := require.New(t)
-	assert.NoError(t, test.Retry(10, 100*time.Millisecond, func() error {
-		now := time.Now()
-		resp := streamQuery(requireTester, conn, tc.queryGenerator(now))
-		if len(resp.GetElements()) == tc.wantLen {
-			return nil
-		}
-		return fmt.Errorf("expected elements number: %d got: %d", tc.wantLen, len(resp.GetElements()))
-	}))
-}
-
-func streamWrite(t *testing.T, tc caseData, conn *grpclib.ClientConn) {
+func streamWrite(conn *grpclib.ClientConn) {
 	client := streamv1.NewStreamServiceClient(conn)
 	ctx := context.Background()
 	writeClient, errorWrite := client.Write(ctx)
-	if errorWrite != nil {
-		t.Errorf("%v.write(_) = _, %v", client, errorWrite)
-	}
-	waitc := make(chan struct{})
-	go func() {
-		for {
-			writeResponse, errRecv := writeClient.Recv()
-			if errRecv == io.EOF {
-				// read done.
-				close(waitc)
-				return
-			}
-			assert.NoError(t, errRecv)
-			assert.NotNil(t, writeResponse)
-		}
-	}()
-	if errSend := writeClient.Send(tc.writeGenerator()); errSend != nil {
-		t.Errorf("Failed to send a note: %v", errSend)
-	}
-	if errorSend := writeClient.CloseSend(); errorSend != nil {
-		t.Errorf("Failed to send a note: %v", errorSend)
-	}
-	<-waitc
+	Expect(errorWrite).Should(Succeed())
+	Expect(writeClient.Send(writeData())).Should(Succeed())
+	Expect(writeClient.CloseSend()).Should(Succeed())
+	Eventually(func() error {
+		_, err := writeClient.Recv()
+		return err
+	}).Should(Equal(io.EOF))
 }
 
-func streamQuery(tester *require.Assertions, conn *grpclib.ClientConn, request *streamv1.QueryRequest) *streamv1.QueryResponse {
+func streamQuery(conn *grpclib.ClientConn) (int, error) {
 	client := streamv1.NewStreamServiceClient(conn)
 	ctx := context.Background()
-	queryResponse, errRev := client.Query(ctx, request)
-	if errRev != nil {
-		tester.Errorf(errRev, "Retrieve client failed: %v")
-	}
-	tester.NotNil(queryResponse)
-	return queryResponse
+	resp, err := client.Query(ctx, queryCriteria(time.Now()))
+
+	return len(resp.GetElements()), err
 }
diff --git a/go.mod b/go.mod
index adda1b0..91a8dec 100644
--- a/go.mod
+++ b/go.mod
@@ -38,12 +38,14 @@ require (
 	github.com/dustin/go-humanize v1.0.0 // indirect
 	github.com/form3tech-oss/jwt-go v3.2.3+incompatible // indirect
 	github.com/fsnotify/fsnotify v1.4.9 // indirect
+	github.com/go-task/slim-sprig v0.0.0-20210107165309-348f09dbbbc0 // indirect
 	github.com/gogo/protobuf v1.3.2 // indirect
 	github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b // indirect
 	github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect
 	github.com/golang/snappy v0.0.3 // indirect
 	github.com/google/btree v1.0.1 // indirect
 	github.com/google/flatbuffers v1.12.1 // indirect
+	github.com/google/pprof v0.0.0-20210407192527-94a9f03dee38 // indirect
 	github.com/gorilla/websocket v1.4.2 // indirect
 	github.com/grpc-ecosystem/go-grpc-middleware v1.3.0 // indirect
 	github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0 // indirect
@@ -58,6 +60,8 @@ require (
 	github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
 	github.com/modern-go/reflect2 v1.0.1 // indirect
 	github.com/mschoch/smat v0.2.0 // indirect
+	github.com/onsi/ginkgo/v2 v2.0.0 // indirect
+	github.com/onsi/gomega v1.17.0 // indirect
 	github.com/pelletier/go-toml v1.9.3 // indirect
 	github.com/pmezard/go-difflib v1.0.0 // indirect
 	github.com/prometheus/client_golang v1.11.0 // indirect
@@ -94,6 +98,7 @@ require (
 	golang.org/x/crypto v0.0.0-20201002170205-7f63de1d35b0 // indirect
 	golang.org/x/text v0.3.6 // indirect
 	golang.org/x/time v0.0.0-20210220033141-f8bda1e9f3ba // indirect
+	golang.org/x/tools v0.1.5 // indirect
 	gopkg.in/ini.v1 v1.62.0 // indirect
 	gopkg.in/natefinch/lumberjack.v2 v2.0.0 // indirect
 	gopkg.in/yaml.v2 v2.4.0 // indirect
diff --git a/go.sum b/go.sum
index 5b62477..425e2d0 100644
--- a/go.sum
+++ b/go.sum
@@ -146,6 +146,8 @@ github.com/go-logfmt/logfmt v0.3.0/go.mod h1:Qt1PoO58o5twSAckw1HlFXLmHsOX5/0LbT9
 github.com/go-logfmt/logfmt v0.4.0/go.mod h1:3RMwSq7FuexP4Kalkev3ejPJsZTpXXBr9+V4qmtdjCk=
 github.com/go-logfmt/logfmt v0.5.0/go.mod h1:wCYkCAKZfumFQihp8CzCvQ3paCTfi41vtzG1KdI/P7A=
 github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY=
+github.com/go-task/slim-sprig v0.0.0-20210107165309-348f09dbbbc0 h1:p104kn46Q8WdvHunIJ9dAyjPVtrBPhSr3KT2yUst43I=
+github.com/go-task/slim-sprig v0.0.0-20210107165309-348f09dbbbc0/go.mod h1:fyg7847qk6SyHyPtNmDHnmrv/HOrqktSC+C9fM+CJOE=
 github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA=
 github.com/gogo/protobuf v1.1.1/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ=
 github.com/gogo/protobuf v1.2.1/go.mod h1:hp+jE20tsWTFYpLwKvXlhS1hjn+gTNwPg2I6zVXpSg4=
@@ -222,6 +224,8 @@ github.com/google/pprof v0.0.0-20201023163331-3e6fc7fc9c4c/go.mod h1:kpwsk12EmLe
 github.com/google/pprof v0.0.0-20201203190320-1bf35d6f28c2/go.mod h1:kpwsk12EmLew5upagYY7GY0pfYCcupk39gWOCRROcvE=
 github.com/google/pprof v0.0.0-20210122040257-d980be63207e/go.mod h1:kpwsk12EmLew5upagYY7GY0pfYCcupk39gWOCRROcvE=
 github.com/google/pprof v0.0.0-20210226084205-cbba55b83ad5/go.mod h1:kpwsk12EmLew5upagYY7GY0pfYCcupk39gWOCRROcvE=
+github.com/google/pprof v0.0.0-20210407192527-94a9f03dee38 h1:yAJXTCF9TqKcTiHJAE8dj7HMvPfh66eeA2JYW7eFpSE=
+github.com/google/pprof v0.0.0-20210407192527-94a9f03dee38/go.mod h1:kpwsk12EmLew5upagYY7GY0pfYCcupk39gWOCRROcvE=
 github.com/google/renameio v0.1.0/go.mod h1:KWCgfxg9yswjAJkECMjeO8J8rahYeXnNhOm40UhjYkI=
 github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
 github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I=
@@ -261,6 +265,7 @@ github.com/hashicorp/logutils v1.0.0/go.mod h1:QIAnNjmIWmVIIkWDTG1z5v++HQmx9WQRO
 github.com/hashicorp/mdns v1.0.0/go.mod h1:tL+uN++7HEJ6SQLQ2/p+z2pH24WQKWjBPkE0mNTz8vQ=
 github.com/hashicorp/memberlist v0.1.3/go.mod h1:ajVTdAv/9Im8oMAAj5G31PhhMCZJV2pPBoIllUwCN7I=
 github.com/hashicorp/serf v0.8.2/go.mod h1:6hOLApaqBFA1NXqRQAsxw9QxuDEvNxSQRwA/JwenrHc=
+github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU=
 github.com/ianlancetaylor/demangle v0.0.0-20181102032728-5e5cf60278f6/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc=
 github.com/ianlancetaylor/demangle v0.0.0-20200824232613-28f6c0f3b639/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc=
 github.com/inconshreveable/mousetrap v1.0.0 h1:Z8tu5sraLXCXIcARxBp/8cbvlwVa7Z1NHg9XEKhtSvM=
@@ -325,9 +330,20 @@ github.com/mschoch/smat v0.2.0 h1:8imxQsjDm8yFEAVBe7azKmKSgzSkZXDuKkSq9374khM=
 github.com/mschoch/smat v0.2.0/go.mod h1:kc9mz7DoBKqDyiRL7VZN8KvXQMWeTaVnttLRXOlotKw=
 github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U=
 github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U=
+github.com/nxadm/tail v1.4.4/go.mod h1:kenIhsEOeOJmVchQTgglprH7qJGnHDVpk1VPCcaMI8A=
+github.com/nxadm/tail v1.4.8/go.mod h1:+ncqLTQzXmGhMZNUePPaPqPvBxHAIsmXswZKocGu+AU=
 github.com/oklog/run v1.1.0 h1:GEenZ1cK0+q0+wsJew9qUg/DyD8k3JzYsZAi5gYi2mA=
 github.com/oklog/run v1.1.0/go.mod h1:sVPdnTZT1zYwAJeCMu2Th4T21pA3FPOQRfWjQlk7DVU=
 github.com/oklog/ulid v1.3.1/go.mod h1:CirwcVhetQ6Lv90oh/F+FBtV6XMibvdAFo93nm5qn4U=
+github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
+github.com/onsi/ginkgo v1.12.1/go.mod h1:zj2OWP4+oCPe1qIXoGWkgMRwljMUYCdkwsT2108oapk=
+github.com/onsi/ginkgo v1.16.4/go.mod h1:dX+/inL/fNMqNlz0e9LfyB9TswhZpCVdJM/Z6Vvnwo0=
+github.com/onsi/ginkgo/v2 v2.0.0 h1:CcuG/HvWNkkaqCUpJifQY8z7qEMBJya6aLPx6ftGyjQ=
+github.com/onsi/ginkgo/v2 v2.0.0/go.mod h1:vw5CSIxN1JObi/U8gcbwft7ZxR2dgaR70JSE3/PpL4c=
+github.com/onsi/gomega v1.7.1/go.mod h1:XdKZgCCFLUoM/7CFJVPcG8C1xQ1AJ0vpAezJrB7JYyY=
+github.com/onsi/gomega v1.10.1/go.mod h1:iN09h71vgCQne3DLsj+A5owkum+a2tYe+TOCB1ybHNo=
+github.com/onsi/gomega v1.17.0 h1:9Luw4uT5HTjHTN8+aNcSThgH1vdXnmdJ8xIfZ4wyTRE=
+github.com/onsi/gomega v1.17.0/go.mod h1:HnhC7FXeEQY45zxNK3PPoIUhzk/80Xly9PcubAlGdZY=
 github.com/opentracing/opentracing-go v1.1.0/go.mod h1:UkNAQd3GIcIGf0SeVgPpRdFStlNbqXla1AfSYxPUl2o=
 github.com/pascaldekloe/goe v0.0.0-20180627143212-57f6aae5913c/go.mod h1:lzWF7FIEvWOWxwDKqyGYQf6ZUaNfKdP144TG7ZOy1lc=
 github.com/pelletier/go-toml v1.2.0/go.mod h1:5z9KED0ma1S8pY6P1sdut58dfprrGBbd/94hg7ilaic=
@@ -543,6 +559,7 @@ golang.org/x/mod v0.4.1/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
 golang.org/x/mod v0.4.2/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
 golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
 golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
+golang.org/x/net v0.0.0-20180906233101-161cd47e91fd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
 golang.org/x/net v0.0.0-20181023162649-9b4f9f5ad519/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
 golang.org/x/net v0.0.0-20181114220301-adae6a3d119a/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
 golang.org/x/net v0.0.0-20181201002055-351d144fa1fc/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
@@ -568,6 +585,7 @@ golang.org/x/net v0.0.0-20200324143707-d3edc9973b7e/go.mod h1:qpuaurCH72eLCgpAm/
 golang.org/x/net v0.0.0-20200501053045-e0ff5e5a1de5/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A=
 golang.org/x/net v0.0.0-20200506145744-7e3656a0809f/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A=
 golang.org/x/net v0.0.0-20200513185701-a91f0712d120/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A=
+golang.org/x/net v0.0.0-20200520004742-59133d7f0dd7/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A=
 golang.org/x/net v0.0.0-20200520182314-0ba52f642ac2/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A=
 golang.org/x/net v0.0.0-20200625001655-4c5254603344/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA=
 golang.org/x/net v0.0.0-20200707034311-ab3426394381/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA=
@@ -581,6 +599,7 @@ golang.org/x/net v0.0.0-20210119194325-5f4716e94777/go.mod h1:m0MpNAwzfU5UDzcl9v
 golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg=
 golang.org/x/net v0.0.0-20210316092652-d523dce5a7f4/go.mod h1:RBQZq4jEuRlivfhVLdyRGr576XBO4/greRjx4P4O3yc=
 golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4/go.mod h1:p54w0d4576C0XHj96bSt6lcn1PtDYWL6XObtHCRCNQM=
+golang.org/x/net v0.0.0-20210428140749-89ef3d95e781/go.mod h1:OJAsFXCWl8Ukc7SiCT/9KSuxbyM7479/AVlXFRxuMCk=
 golang.org/x/net v0.0.0-20210716203947-853a461950ff h1:j2EK/QoxYNBsXI4R7fQkkRUk8y6wnOBI+6hgPdP/6Ds=
 golang.org/x/net v0.0.0-20210716203947-853a461950ff/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
 golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
@@ -610,6 +629,7 @@ golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJ
 golang.org/x/sys v0.0.0-20180823144017-11551d06cbcc/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
 golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
 golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
+golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
 golang.org/x/sys v0.0.0-20181026203630-95b1ffbd15a5/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
 golang.org/x/sys v0.0.0-20181107165924-66b7b1311ac8/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
 golang.org/x/sys v0.0.0-20181116152217-5ac8a444bdc5/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
@@ -623,9 +643,11 @@ golang.org/x/sys v0.0.0-20190507160741-ecd444e8653b/go.mod h1:h1NjWce9XRLGQEsW7w
 golang.org/x/sys v0.0.0-20190606165138-5da285871e9c/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
 golang.org/x/sys v0.0.0-20190624142023-c5567b49c5d0/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
 golang.org/x/sys v0.0.0-20190726091711-fc99dfbffb4e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
+golang.org/x/sys v0.0.0-20190904154756-749cb33beabd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
 golang.org/x/sys v0.0.0-20191001151750-bb3f8db39f24/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
 golang.org/x/sys v0.0.0-20191005200804-aed5e4c7ecf9/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
 golang.org/x/sys v0.0.0-20191026070338-33540a1f6037/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
+golang.org/x/sys v0.0.0-20191120155948-bd437916bb0e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
 golang.org/x/sys v0.0.0-20191204072324-ce4227a45e2e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
 golang.org/x/sys v0.0.0-20191228213918-04cbcbbfeed8/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
 golang.org/x/sys v0.0.0-20200106162015-b016eb3dc98e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
@@ -650,6 +672,7 @@ golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7w
 golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
 golang.org/x/sys v0.0.0-20201201145000-ef89a241ccb3/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
 golang.org/x/sys v0.0.0-20210104204734-6f8348627aad/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
+golang.org/x/sys v0.0.0-20210112080510-489259a85091/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
 golang.org/x/sys v0.0.0-20210119212857-b64e53b001e4/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
 golang.org/x/sys v0.0.0-20210124154548-22da62e12c0c/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
 golang.org/x/sys v0.0.0-20210220050731-9a76102bfb43/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
@@ -728,6 +751,7 @@ golang.org/x/tools v0.0.0-20200904185747-39188db58858/go.mod h1:Cj7w3i3Rnn0Xh82u
 golang.org/x/tools v0.0.0-20201110124207-079ba7bd75cd/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA=
 golang.org/x/tools v0.0.0-20201201161351-ac6f37ff4c2a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA=
 golang.org/x/tools v0.0.0-20201208233053-a543418bbed2/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA=
+golang.org/x/tools v0.0.0-20201224043029-2b0845dc783e/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA=
 golang.org/x/tools v0.0.0-20210105154028-b0ab187a4818/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA=
 golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA=
 golang.org/x/tools v0.1.0/go.mod h1:xkSsbof2nBLbhDlRMhhhyNLN/zl3eTqcnHD5viDpcZ0=
@@ -856,12 +880,14 @@ gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8
 gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 h1:YR8cESwS4TdDjEe65xsg0ogRM/Nc3DYOhEAlW+xobZo=
 gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
 gopkg.in/errgo.v2 v2.1.0/go.mod h1:hNsd1EY+bozCKY1Ytp96fpM3vjJbqLJn88ws8XvfDNI=
+gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMys=
 gopkg.in/ini.v1 v1.51.0/go.mod h1:pNLf8WUiyNEtQjuu5G5vTm06TEv9tsIgeAvK8hOrP4k=
 gopkg.in/ini.v1 v1.62.0 h1:duBzk771uxoUuOlyRLkHsygud9+5lrlGjdFBb4mSKDU=
 gopkg.in/ini.v1 v1.62.0/go.mod h1:pNLf8WUiyNEtQjuu5G5vTm06TEv9tsIgeAvK8hOrP4k=
 gopkg.in/natefinch/lumberjack.v2 v2.0.0 h1:1Lc07Kr7qY4U2YPouBjpCLxpiyxIVoxqXgkXLknAOE8=
 gopkg.in/natefinch/lumberjack.v2 v2.0.0/go.mod h1:l0ndWWf7gzL7RNwBG7wST/UCcT4T24xpD6X8LsfU/+k=
 gopkg.in/resty.v1 v1.12.0/go.mod h1:mDo4pnntr5jdWRML875a/NmxYqAlA73dVijT2AXvQQo=
+gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw=
 gopkg.in/yaml.v2 v2.0.0-20170812160011-eb3733d160e7/go.mod h1:JAlM8MvJe8wmxCU4Bli9HhUf9+ttbYbLASfIpnQbh74=
 gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
 gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
diff --git a/pkg/logger/logger.go b/pkg/logger/logger.go
index 1407a4f..36896ed 100644
--- a/pkg/logger/logger.go
+++ b/pkg/logger/logger.go
@@ -46,3 +46,8 @@ func (l *Logger) Named(name string) *Logger {
 	subLogger := root.Logger.With().Str("module", module).Logger()
 	return &Logger{module: module, Logger: &subLogger}
 }
+
+// Loggable indicates the implement supports logging
+type Loggable interface {
+	SetLogger(*Logger)
+}
diff --git a/pkg/test/space.go b/pkg/test/space.go
index bbe7ea3..8fafdb3 100644
--- a/pkg/test/space.go
+++ b/pkg/test/space.go
@@ -35,3 +35,12 @@ func Space(t *require.Assertions) (tempDir string, deferFunc func()) {
 		}
 	}
 }
+
+func NewSpace() (tempDir string, deferFunc func(), err error) {
+	tempDir, err = ioutil.TempDir("", "banyandb-test-*")
+	return tempDir, func() {
+		if err = os.RemoveAll(tempDir); err != nil {
+			_, _ = fmt.Fprintf(os.Stderr, "Error while removing dir: %v\n", err)
+		}
+	}, err
+}