You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by ha...@apache.org on 2021/05/31 13:42:47 UTC

[skywalking-banyandb] 01/01: Add KV module and refactor storage module

This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch storage
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git

commit d85b514a89ea8614b4cc6e7f99b37c794af144ce
Author: Gao Hongtao <ha...@gmail.com>
AuthorDate: Mon May 31 21:20:26 2021 +0800

    Add KV module and refactor storage module
    
    Signed-off-by: Gao Hongtao <ha...@gmail.com>
---
 banyand/internal/cmd/standalone.go                |   2 +-
 banyand/kv/badger.go                              | 118 +++++++++++++
 banyand/kv/kv.go                                  | 139 ++++++++++++++++
 banyand/storage/block.go                          | 137 +++++++++++++++
 banyand/storage/database.go                       | 193 ++++++++++++++--------
 banyand/storage/database_test.go                  | 165 ++++++++++++++++++
 banyand/storage/storage.go                        |  56 ++++++-
 go.mod                                            |   5 +
 go.sum                                            |  82 ++++++++-
 banyand/storage/kv/kv.go => pkg/bytes/bytes.go    |  16 +-
 banyand/storage/kv/kv.go => pkg/convert/number.go |  19 ++-
 pkg/logger/logger.go                              |  22 ++-
 12 files changed, 860 insertions(+), 94 deletions(-)

diff --git a/banyand/internal/cmd/standalone.go b/banyand/internal/cmd/standalone.go
index b9f6e90..d4f4291 100644
--- a/banyand/internal/cmd/standalone.go
+++ b/banyand/internal/cmd/standalone.go
@@ -53,7 +53,7 @@ func newStandaloneCmd() *cobra.Command {
 	if err != nil {
 		l.Fatal("failed to initiate data pipeline", logger.Error(err))
 	}
-	db, err := storage.NewDB(ctx, repo, pipeline)
+	db, err := storage.NewDB(ctx, repo)
 	if err != nil {
 		l.Fatal("failed to initiate database", logger.Error(err))
 	}
diff --git a/banyand/kv/badger.go b/banyand/kv/badger.go
new file mode 100644
index 0000000..e4d510c
--- /dev/null
+++ b/banyand/kv/badger.go
@@ -0,0 +1,118 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package kv
+
+import (
+	"log"
+	"strconv"
+
+	"github.com/dgraph-io/badger/v3"
+	"go.uber.org/zap"
+
+	"github.com/apache/skywalking-banyandb/pkg/convert"
+	"github.com/apache/skywalking-banyandb/pkg/logger"
+)
+
+var _ TimeSeriesStore = (*badgerTSS)(nil)
+
+type badgerTSS struct {
+	dbOpts badger.Options
+	db     *badger.DB
+	badger.TSet
+}
+
+func (b *badgerTSS) Close() error {
+	if b.db != nil && !b.db.IsClosed() {
+		return b.db.Close()
+	}
+	return nil
+}
+
+var _ Store = (*badgerDB)(nil)
+
+type badgerDB struct {
+	dbOpts badger.Options
+	db     *badger.DB
+	seqKey string
+	seq    *badger.Sequence
+}
+
+func (b *badgerDB) Add(val []byte) (uint64, error) {
+	id, err := b.seq.Next()
+	if err != nil {
+		return 0, err
+	}
+	return id, b.Put(convert.Uint64ToBytes(id), val)
+}
+
+func (b *badgerDB) Close() error {
+	if b.db != nil && !b.db.IsClosed() {
+		return b.db.Close()
+	}
+	return nil
+}
+
+func (b *badgerDB) Put(key, val []byte) error {
+	return b.db.Update(func(txn *badger.Txn) error {
+		return txn.Set(key, val)
+	})
+}
+
+func (b *badgerDB) Get(key []byte) ([]byte, error) { // Get returns a private copy of the value stored under key
+	var bb []byte
+	err := b.db.View(func(txn *badger.Txn) error {
+		item, err := txn.Get(key)
+		if err != nil {
+			return err
+		}
+		return item.Value(func(val []byte) error {
+			bb = append([]byte{}, val...) // copy: val must not be retained once this callback returns
+			return nil
+		})
+	})
+	return bb, err
+}
+
+// badgerLog forwards badger's log calls to the delegated logger
+type badgerLog struct {
+	*log.Logger
+	delegated *logger.Logger
+}
+
+func (l *badgerLog) Errorf(f string, v ...interface{}) {
+	l.delegated.Error(f, convToFields(v...)...)
+}
+
+func (l *badgerLog) Warningf(f string, v ...interface{}) {
+	l.delegated.Warn(f, convToFields(v...)...)
+}
+
+func (l *badgerLog) Infof(f string, v ...interface{}) {
+	l.delegated.Info(f, convToFields(v...)...)
+}
+
+func (l *badgerLog) Debugf(f string, v ...interface{}) {
+	l.delegated.Debug(f, convToFields(v...)...)
+}
+
+func convToFields(vv ...interface{}) (fields []zap.Field) {
+	for i, v := range vv {
+		fields = append(fields, zap.Any(strconv.Itoa(i), v))
+	}
+	return fields
+}
diff --git a/banyand/kv/kv.go b/banyand/kv/kv.go
new file mode 100644
index 0000000..80efdb4
--- /dev/null
+++ b/banyand/kv/kv.go
@@ -0,0 +1,139 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package kv
+
+import (
+	"bytes"
+	"fmt"
+	"io"
+
+	"github.com/dgraph-io/badger/v3"
+	"github.com/dgraph-io/badger/v3/y"
+
+	"github.com/apache/skywalking-banyandb/pkg/logger"
+)
+
+// Hook contains several functions which are invoked on MemTables flushing or LevelTables retrieving
+type Hook interface {
+	// Reduce the values with an identical key on MemTables flushing
+	Reduce(left bytes.Buffer, right y.ValueStruct) bytes.Buffer
+	// Extract a chunk from a reduced value on LevelTables retrieving
+	Extract(raw []byte, ts uint64) ([]byte, error)
+	// Split a reduced value into chunks on LevelTables retrieving
+	Split(raw []byte) ([][]byte, error)
+}
+
+// Store is a common kv storage with auto-generated key
+type Store interface {
+	io.Closer
+	// Add a value whose key can be generated automatically
+	Add(val []byte) (uint64, error)
+	// Put a value
+	Put(key, val []byte) error
+	// Get a value by its key
+	Get(key []byte) ([]byte, error)
+}
+
+// TimeSeriesStore is time series storage
+type TimeSeriesStore interface {
+	io.Closer
+	// Put a value with a timestamp/version
+	Put(key, val []byte, ts uint64) error
+	// PutAsync a value with a timestamp/version asynchronously.
+	// The injected "f" func is notified of the result of the write.
+	PutAsync(key, val []byte, ts uint64, f func(error)) error
+	// Get a value by its key and timestamp/version
+	Get(key []byte, ts uint64) ([]byte, error)
+	// GetAll values with an identical key
+	GetAll(key []byte) ([][]byte, error)
+}
+
+type TimeSeriesOptions func(TimeSeriesStore)
+
+// TSSWithLogger sets an external logger into the underlying TimeSeriesStore
+func TSSWithLogger(l *logger.Logger) TimeSeriesOptions {
+	return func(store TimeSeriesStore) {
+		if btss, ok := store.(*badgerTSS); ok {
+			btss.dbOpts = btss.dbOpts.WithLogger(&badgerLog{
+				delegated: l,
+			})
+		}
+	}
+}
+
+// OpenTimeSeriesStore creates a new TimeSeriesStore
+func OpenTimeSeriesStore(path string, hook Hook, options ...TimeSeriesOptions) (TimeSeriesStore, error) {
+	btss := new(badgerTSS)
+	btss.dbOpts = badger.DefaultOptions(path)
+	for _, opt := range options {
+		opt(btss)
+	}
+	btss.dbOpts = btss.dbOpts.WithMaxLevels(1)
+	var err error
+	btss.db, err = badger.Open(btss.dbOpts)
+	if err != nil {
+		return nil, fmt.Errorf("failed to open time series store: %v", err)
+	}
+	btss.TSet = *badger.NewTSet(btss.db, hook.Reduce, hook.Extract, hook.Split)
+	return btss, nil
+}
+
+type StoreOptions func(Store)
+
+// StoreWithLogger sets an external logger into the underlying Store
+func StoreWithLogger(l *logger.Logger) StoreOptions {
+	return func(store Store) {
+		if bdb, ok := store.(*badgerDB); ok {
+			bdb.dbOpts = bdb.dbOpts.WithLogger(&badgerLog{
+				delegated: l,
+			})
+		}
+	}
+}
+
+// StoreWithAutoGeneratedKey activates automatic key generation for the Store
+func StoreWithAutoGeneratedKey() StoreOptions {
+	return func(store Store) {
+		if bdb, ok := store.(*badgerDB); ok {
+			bdb.seqKey = "sequence"
+		}
+	}
+}
+
+// OpenStore opens a badger-backed Store at path; options are applied before the database is opened
+func OpenStore(path string, options ...StoreOptions) (Store, error) {
+	bdb := new(badgerDB)
+	bdb.dbOpts = badger.DefaultOptions(path)
+	for _, opt := range options {
+		opt(bdb)
+	}
+	bdb.dbOpts = bdb.dbOpts.WithMaxLevels(1)
+
+	var err error
+	bdb.db, err = badger.Open(bdb.dbOpts)
+	if err != nil {
+		return nil, fmt.Errorf("failed to open store: %v", err) // was mislabelled as the time series store
+	}
+	if bdb.seqKey != "" { // a sequence is only requested when StoreWithAutoGeneratedKey set a key
+		bdb.seq, err = bdb.db.GetSequence([]byte(bdb.seqKey), 100)
+		if err != nil {
+			return nil, fmt.Errorf("failed to get sequence: %v", err)
+		}
+	}
+	return bdb, nil
+}
diff --git a/banyand/storage/block.go b/banyand/storage/block.go
new file mode 100644
index 0000000..4838c8d
--- /dev/null
+++ b/banyand/storage/block.go
@@ -0,0 +1,137 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package storage
+
+import (
+	"fmt"
+
+	"go.uber.org/atomic"
+
+	"github.com/apache/skywalking-banyandb/banyand/kv"
+	"github.com/apache/skywalking-banyandb/pkg/logger"
+)
+
+type block struct {
+	path    string
+	plugins []Plugin
+
+	l *logger.Logger
+
+	stores   map[string]kv.Store
+	tsStores map[string]kv.TimeSeriesStore
+
+	refCount atomic.Uint32
+}
+
+func newBlock(path string, plugins []Plugin) (*block, error) {
+	l := logger.GetLogger("block")
+	return &block{
+		path:     path,
+		plugins:  plugins,
+		l:        l,
+		stores:   make(map[string]kv.Store),
+		tsStores: make(map[string]kv.TimeSeriesStore),
+	}, nil
+}
+
+func (b *block) init() (err error) { // init opens every plugin's kv stores, then hands the plugin an AccessPoint factory
+	for _, plugin := range b.plugins {
+		kvDefines, pluginID := plugin.Init(), plugin.ID() // pluginID is a per-iteration copy for the closure below
+		if len(kvDefines) < 1 {
+			continue
+		}
+		if err = b.createKV(pluginID, kvDefines); err != nil {
+			return err
+		}
+		plugin.Start(func() AccessPoint {
+			b.refCount.Inc()
+			return &staticAccessPoint{
+				b:        b,
+				pluginID: pluginID, // not plugin.ID(): before Go 1.22 the range variable is shared by all iterations
+			}
+		})
+	}
+	return nil
+}
+
+func (b *block) createKV(pluginID string, defines []KVSpec) (err error) {
+	for _, define := range defines {
+		storeID := getStoreID(pluginID, define.Name)
+		path := fmt.Sprintf("%s/%s", b.path, storeID)
+		b.l.Info("open kv store", logger.String("path", path), logger.Any("type", define.Type))
+		switch define.Type {
+		case KVTypeNormal:
+			var s kv.Store
+			opts := make([]kv.StoreOptions, 0)
+			opts = append(opts, kv.StoreWithLogger(b.l))
+			if define.AutoGenKey {
+				opts = append(opts, kv.StoreWithAutoGeneratedKey())
+			}
+			if s, err = kv.OpenStore(path, opts...); err != nil {
+				return fmt.Errorf("failed to open normal store: %w", err)
+			}
+			b.stores[storeID] = s
+		case KVTypeTimeSeries:
+			var s kv.TimeSeriesStore
+			if s, err = kv.OpenTimeSeriesStore(path, define.TimeSeriesHook, kv.TSSWithLogger(b.l)); err != nil {
+				return fmt.Errorf("failed to open time series store: %w", err)
+			}
+			b.tsStores[storeID] = s
+		}
+	}
+	return nil
+}
+
+func (b *block) close() {
+	for _, store := range b.stores {
+		_ = store.Close()
+	}
+	for _, store := range b.tsStores {
+		_ = store.Close()
+	}
+}
+
+func getStoreID(pluginID string, defineName string) string {
+	return fmt.Sprintf("%s-%s", pluginID, defineName)
+}
+
+var _ AccessPoint = (*staticAccessPoint)(nil)
+
+type staticAccessPoint struct {
+	pluginID string
+	b        *block
+}
+
+func (s *staticAccessPoint) Close() error {
+	s.b.refCount.Dec()
+	return nil
+}
+
+func (s *staticAccessPoint) Store(name string) kv.Store {
+	if s, ok := s.b.stores[getStoreID(s.pluginID, name)]; ok {
+		return s
+	}
+	return nil
+}
+
+func (s *staticAccessPoint) TimeSeriesStore(name string) kv.TimeSeriesStore {
+	if s, ok := s.b.tsStores[getStoreID(s.pluginID, name)]; ok {
+		return s
+	}
+	return nil
+}
diff --git a/banyand/storage/database.go b/banyand/storage/database.go
index f5b0cb6..8bf76bf 100644
--- a/banyand/storage/database.go
+++ b/banyand/storage/database.go
@@ -18,29 +18,62 @@
 package storage
 
 import (
+	"errors"
 	"fmt"
+	"io/fs"
 	"io/ioutil"
 	"os"
 	"sync"
+	"time"
 
 	"go.uber.org/multierr"
 
-	"github.com/apache/skywalking-banyandb/api/data"
-	"github.com/apache/skywalking-banyandb/api/event"
 	"github.com/apache/skywalking-banyandb/banyand/discovery"
-	"github.com/apache/skywalking-banyandb/banyand/internal/bus"
-	"github.com/apache/skywalking-banyandb/banyand/queue"
-	"github.com/apache/skywalking-banyandb/banyand/storage/kv"
 	"github.com/apache/skywalking-banyandb/pkg/run"
 )
 
+const (
+	shardTemplate = "%s/shard-%d"
+	segTemplate   = "%s/seg-%s"
+	blockTemplate = "%s/block-%s"
+
+	segFormat   = "20060102"
+	blockFormat = "1504"
+
+	dirPerm = 0700
+)
+
+var ErrNoDiscoveryRepo = errors.New("no discovery repo exists")
+
 var _ Database = (*DB)(nil)
 
+// DB is a storage manager to physical data model.
+// Notice: DB is a work in progress. It only contains the features needed to verify the series module.
 type DB struct {
-	root   string
-	shards int
-	repo   discovery.ServiceRepo
-	q      queue.Queue
+	root      string
+	shards    int
+	sLst      []*shard
+	repo      discovery.ServiceRepo
+	pluginLst []Plugin
+
+	stopCh chan struct{}
+}
+
+func (d *DB) Serve() error {
+	d.stopCh = make(chan struct{})
+	<-d.stopCh
+	return nil
+}
+
+func (d *DB) GracefulStop() {
+	for _, s := range d.sLst {
+		s.stop()
+	}
+	close(d.stopCh)
+}
+
+func (d *DB) Register(plugin Plugin) {
+	d.pluginLst = append(d.pluginLst, plugin)
 }
 
 func (d *DB) Name() string {
@@ -48,10 +81,10 @@ func (d *DB) Name() string {
 }
 
 func (d *DB) FlagSet() *run.FlagSet {
-	fs := run.NewFlagSet("storage")
-	fs.StringVar(&d.root, "root-path", "/tmp", "the root path of database")
-	fs.IntVar(&d.shards, "shards", 1, "total shards size")
-	return fs
+	flagS := run.NewFlagSet("storage")
+	flagS.StringVar(&d.root, "root-path", "/tmp", "the root path of database")
+	flagS.IntVar(&d.shards, "shards", 1, "total shards size")
+	return flagS
 }
 
 func (d *DB) Validate() error {
@@ -59,95 +92,121 @@ func (d *DB) Validate() error {
 }
 
 func (d *DB) PreRun() error {
-	if d.repo == nil {
-		return nil
-	}
 	if err := d.init(); err != nil {
 		return fmt.Errorf("failed to initialize db: %v", err)
 	}
-	if err := d.start(); err != nil {
-		return fmt.Errorf("failed to start db: %v", err)
+	return nil
+}
+
+func (d *DB) init() (err error) { // init ensures the root directory exists, then creates shards if it is empty or loads them otherwise
+	if _, err = mkdir("%s", d.root); err != nil { // explicit format: the root path is user input and may contain '%'
+		return fmt.Errorf("failed to create %s: %v", d.root, err)
+	}
+	var entries []fs.FileInfo
+	if entries, err = ioutil.ReadDir(d.root); err != nil {
+		return fmt.Errorf("failed to read directory contents: %v", err)
+	}
+	if len(entries) < 1 {
+		return d.createShards()
 	}
-	return d.repo.Publish(bus.Topic(event.ShardEventKindVersion.String()), bus.NewMessage(1, event.NewShard()))
+	return d.loadShards(entries)
 }
 
-type segment struct {
-	lst []kv.Block
-	sync.Mutex
+func (d *DB) loadShards(_ []fs.FileInfo) (err error) {
+	//TODO load existing shards
+	return nil
 }
 
-func (s *segment) AddBlock(b kv.Block) {
-	s.Lock()
-	defer s.Unlock()
-	s.lst = append(s.lst, b)
+func (d *DB) createShards() (err error) {
+	for i := 0; i < d.shards; i++ {
+		var shardLocation string
+		if shardLocation, err = mkdir(shardTemplate, d.root, i); err != nil {
+			return err
+		}
+		s := newShard(i, shardLocation)
+		if sErr := s.init(d.pluginLst); sErr != nil {
+			err = multierr.Append(err, sErr)
+			continue
+		}
+		d.sLst = append(d.sLst, s)
+	}
+	return err
 }
 
 type shard struct {
 	id  int
 	lst []*segment
 	sync.Mutex
+	location string
+}
+
+func newShard(id int, location string) *shard {
+	return &shard{
+		id:       id,
+		location: location,
+	}
 }
 
-func (s *shard) newSeg() *segment {
+func (s *shard) newSeg(path string) *segment {
 	s.Lock()
 	defer s.Unlock()
-	seg := &segment{}
+	seg := &segment{
+		path: path,
+	}
 	s.lst = append(s.lst, seg)
 	return seg
 }
 
-func (s *shard) init() error {
-	seg := s.newSeg()
-	b, err := kv.NewBlock()
+func (s *shard) init(plugins []Plugin) error {
+	segPath, err := mkdir(segTemplate, s.location, time.Now().Format(segFormat))
 	if err != nil {
-		return fmt.Errorf("failed to create segment: %v", err)
+		return fmt.Errorf("failed to make segment directory: %v", err)
 	}
-	seg.AddBlock(b)
-	return nil
+	seg := s.newSeg(segPath)
+	return seg.init(plugins)
 }
 
-func (d *DB) init() (err error) {
-	if err = os.MkdirAll(d.root, os.ModeDir); err != nil {
-		return fmt.Errorf("failed to create %s: %v", d.root, err)
-	}
-	var isEmpty bool
-	if isEmpty, err = isEmptyDir(d.root); err != nil {
-		return fmt.Errorf("checking directory contents failed: %v", err)
-	}
-	if !isEmpty {
-		return nil
-	}
-	for i := 0; i < d.shards; i++ {
-		s := newShard(i)
-		err = multierr.Append(err, s.init())
-	}
-	if err != nil {
-		return fmt.Errorf("failed to init shards: %v", err)
+func (s *shard) stop() {
+	for _, seg := range s.lst {
+		seg.close()
 	}
-	return nil
 }
 
-func (d *DB) start() error {
-	return d.q.Subscribe(bus.Topic(data.TraceKindVersion.String()), d)
+func mkdir(format string, a ...interface{}) (path string, err error) {
+	path = fmt.Sprintf(format, a...)
+	if err = os.MkdirAll(path, dirPerm); err != nil {
+		return "", err
+	}
+	return path, err
 }
 
-func (d *DB) Rev(message bus.Message) {
-	//nolint
-	_, ok := message.Data().(data.Trace)
-	if !ok {
-		return
-	}
-	//TODO: save data into target shard
+type segment struct {
+	lst []*block
+	sync.Mutex
+	path string
 }
 
-func newShard(id int) *shard {
-	return &shard{id: id}
+func (s *segment) addBlock(b *block) {
+	s.Lock()
+	defer s.Unlock()
+	s.lst = append(s.lst, b)
 }
 
-func isEmptyDir(name string) (bool, error) {
-	entries, err := ioutil.ReadDir(name)
+func (s *segment) init(plugins []Plugin) error { // init creates a block directory named after the current time and initializes the block in it
+	blockPath, err := mkdir(blockTemplate, s.path, time.Now().Format(blockFormat))
 	if err != nil {
-		return false, err
+		return fmt.Errorf("failed to make block directory: %v", err)
+	}
+	var b *block
+	if b, err = newBlock(blockPath, plugins); err != nil {
+		return fmt.Errorf("failed to create block: %v", err) // it is the block, not the segment, that failed here
+	}
+	s.addBlock(b)
+	return b.init()
+}
+
+func (s *segment) close() {
+	for _, block := range s.lst {
+		block.close()
 	}
-	return len(entries) == 0, nil
 }
diff --git a/banyand/storage/database_test.go b/banyand/storage/database_test.go
new file mode 100644
index 0000000..a873420
--- /dev/null
+++ b/banyand/storage/database_test.go
@@ -0,0 +1,165 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package storage
+
+import (
+	"bytes"
+	"context"
+	"fmt"
+	"io/ioutil"
+	"os"
+	"testing"
+	"time"
+
+	"github.com/dgraph-io/badger/v3/y"
+	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/require"
+
+	"github.com/apache/skywalking-banyandb/banyand/kv"
+	"github.com/apache/skywalking-banyandb/pkg/convert"
+	"github.com/apache/skywalking-banyandb/pkg/logger"
+)
+
+func TestDB_Create_Directory(t *testing.T) {
+	tempDir, _ := setUp(t, nil)
+	defer removeDir(tempDir)
+	shardPath := fmt.Sprintf(shardTemplate, tempDir, 0)
+	validateDirectory(t, shardPath)
+	now := time.Now()
+	segPath := fmt.Sprintf(segTemplate, shardPath, now.Format(segFormat))
+	validateDirectory(t, segPath)
+	validateDirectory(t, fmt.Sprintf(blockTemplate, segPath, now.Format(blockFormat)))
+}
+
+func TestDB_Store(t *testing.T) {
+	p := new(mockPlugin)
+	tempDir, db := setUp(t, p)
+	defer func() {
+		db.GracefulStop()
+		removeDir(tempDir)
+	}()
+	ap := p.ApFunc()
+	s := ap.Store("normal")
+	assert.NoError(t, s.Put([]byte("key1"), []byte{12}))
+	val, err := s.Get([]byte("key1"))
+	assert.NoError(t, err)
+	assert.Equal(t, []byte{12}, val)
+
+	s = ap.Store("auto-gen")
+	key, addErr := s.Add([]byte{11})
+	assert.NoError(t, addErr)
+	val, err = s.Get(convert.Uint64ToBytes(key))
+	assert.NoError(t, err)
+	assert.Equal(t, []byte{11}, val)
+
+	tss := ap.TimeSeriesStore("time-series")
+	assert.NoError(t, tss.Put([]byte("key11"), []byte{33}, 1))
+	val, err = tss.Get([]byte("key11"), 1)
+	assert.NoError(t, err)
+	assert.Equal(t, []byte{33}, val)
+	vals, allErr := tss.GetAll([]byte("key11"))
+	assert.NoError(t, allErr)
+	assert.Equal(t, [][]byte{{33}}, vals)
+}
+
+func setUp(t *testing.T, p *mockPlugin) (tempDir string, db Database) { // setUp builds a DB rooted in a fresh temp dir, registers p if non-nil, and starts serving
+	require.NoError(t, logger.Init(logger.Logging{
+		Env:   "dev",
+		Level: "debug",
+	}))
+	db, err := NewDB(context.Background(), nil)
+	require.NoError(t, err)
+	require.NotNil(t, db)
+
+	var tempDirErr error
+	tempDir, tempDirErr = ioutil.TempDir("", "banyandb-test-*")
+	require.Nil(t, tempDirErr)
+
+	require.NoError(t, db.FlagSet().Parse([]string{"--root-path", tempDir, "--shards", "1"}))
+	if p != nil {
+		db.Register(p)
+	}
+	require.NoError(t, db.PreRun())
+	go func() {
+		assert.NoError(t, db.Serve()) // assert, not require: FailNow must only be called from the test goroutine
+	}()
+	return tempDir, db
+}
+
+func validateDirectory(t *testing.T, dir string) { // validateDirectory fails the test unless dir exists and is a directory
+	info, err := os.Stat(dir)
+	require.False(t, os.IsNotExist(err), "Directory does not exist: %v", dir) // require: stop before info (nil on error) is dereferenced
+	require.NoError(t, err, "Directory error: %v", dir)
+	assert.True(t, info.IsDir(), "Directory is a file, not a directory: %#v\n", dir)
+}
+
+func removeDir(dir string) {
+	if err := os.RemoveAll(dir); err != nil {
+		fmt.Printf("Error while removing dir: %v\n", err)
+	}
+}
+
+var _ Plugin = (*mockPlugin)(nil)
+
+type mockPlugin struct {
+	ApFunc GetAccessPoint
+}
+
+func (m *mockPlugin) ID() string {
+	return "foo"
+}
+
+func (m *mockPlugin) Init() []KVSpec {
+	return []KVSpec{
+		{
+			Name: "normal",
+			Type: KVTypeNormal,
+		},
+		{
+			Name:       "auto-gen",
+			Type:       KVTypeNormal,
+			AutoGenKey: true,
+		},
+		{
+			Name:           "time-series",
+			Type:           KVTypeTimeSeries,
+			TimeSeriesHook: &mockHook{},
+		},
+	}
+}
+
+func (m *mockPlugin) Start(point GetAccessPoint) {
+	m.ApFunc = point
+}
+
+var _ kv.Hook = (*mockHook)(nil)
+
+type mockHook struct {
+}
+
+func (m *mockHook) Reduce(left bytes.Buffer, right y.ValueStruct) bytes.Buffer {
+	return left
+}
+
+func (m *mockHook) Extract(raw []byte, ts uint64) ([]byte, error) {
+	return raw, nil
+}
+
+func (m *mockHook) Split(raw []byte) ([][]byte, error) {
+	return [][]byte{raw}, nil
+}
diff --git a/banyand/storage/storage.go b/banyand/storage/storage.go
index feb408c..dfad909 100644
--- a/banyand/storage/storage.go
+++ b/banyand/storage/storage.go
@@ -19,17 +19,67 @@ package storage
 
 import (
 	"context"
+	"io"
 
 	"github.com/apache/skywalking-banyandb/banyand/discovery"
-	"github.com/apache/skywalking-banyandb/banyand/queue"
+	"github.com/apache/skywalking-banyandb/banyand/kv"
 	"github.com/apache/skywalking-banyandb/pkg/run"
 )
 
+// KVType defines the kind of a KV storage
+type KVType uint8
+
+const (
+	// KVTypeNormal is normal KV storage
+	KVTypeNormal KVType = 0
+	// KVTypeTimeSeries is a time-series KV storage
+	KVTypeTimeSeries KVType = 1
+)
+
+// Database is the storage manager which implements the physical data model
 type Database interface {
 	run.Config
 	run.PreRunner
+	run.Service
+	// Register a Plugin into a Database
+	Register(plugin Plugin)
+}
+
+// NewDB returns a new Database
+func NewDB(_ context.Context, repo discovery.ServiceRepo) (Database, error) {
+	return &DB{repo: repo}, nil
 }
 
-func NewDB(ctx context.Context, repo discovery.ServiceRepo, pipeline queue.Queue) (Database, error) {
-	return &DB{repo: repo, q: pipeline}, nil
+// Plugin helps Database create kv storage with a specific data model.
+// Trace, metric and log series could be registered as Plugin to save data into kv regardless of where the kv
+// are placed on the physical devices.
+type Plugin interface {
+	// ID is unique identifier
+	ID() string
+	// Init the Plugin, and generate KVSpec to indicate how kv storages are built
+	Init() []KVSpec
+	// Start the Plugin and inject a func to get the AccessPoint
+	Start(GetAccessPoint)
 }
+
+// KVSpec defines the behaviours of a KV store.
+// Database get these specs which are generated by Plugin to create underlying kv storage.
+type KVSpec struct {
+	Name           string
+	Type           KVType
+	AutoGenKey     bool
+	TimeSeriesHook kv.Hook
+}
+
+// AccessPoint is a reference to an underlying area.
+// A single write will be split into several kv writes. These writes can be placed into the same area through an AccessPoint
+// even if an area rotation (a new area replaces the old area) takes place.
+// When a new area is created, the old one cannot be closed until all AccessPoints referring to it are closed.
+type AccessPoint interface {
+	io.Closer
+	Store(name string) kv.Store
+	TimeSeriesStore(name string) kv.TimeSeriesStore
+}
+
+// GetAccessPoint returns the AccessPoint which refers to the current active area.
+type GetAccessPoint func() AccessPoint
diff --git a/go.mod b/go.mod
index 77dcde2..4c7e9c0 100644
--- a/go.mod
+++ b/go.mod
@@ -3,12 +3,17 @@ module github.com/apache/skywalking-banyandb
 go 1.16
 
 require (
+	github.com/dgraph-io/badger/v3 v3.2011.1
 	github.com/google/flatbuffers v1.12.0
 	github.com/oklog/run v1.1.0
 	github.com/spf13/cobra v1.1.3
 	github.com/spf13/pflag v1.0.5
 	github.com/spf13/viper v1.7.1
+	github.com/stretchr/testify v1.7.0
+	go.uber.org/atomic v1.7.0
 	go.uber.org/multierr v1.6.0
 	go.uber.org/zap v1.16.0
 	google.golang.org/grpc v1.37.0
 )
+
+replace github.com/dgraph-io/badger/v3 v3.2011.1 => github.com/SkyAPM/badger/v3 v3.0.0-20210527215642-f1c960d2de88
diff --git a/go.sum b/go.sum
index 444ebce..9a41ac4 100644
--- a/go.sum
+++ b/go.sum
@@ -14,10 +14,16 @@ dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7
 github.com/BurntSushi/toml v0.3.1 h1:WXkYYl6Yr3qBf1K79EBnL4mak0OimBfB0XUf9Vl28OQ=
 github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
 github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo=
+github.com/DataDog/zstd v1.4.6-0.20210216161059-8cb8bacba7ba h1:3qB2yylqW3kVPr9QoPZtTJOXsJOUdNWT2CrZcifhs5g=
+github.com/DataDog/zstd v1.4.6-0.20210216161059-8cb8bacba7ba/go.mod h1:g4AWEaM3yOg3HYfnJ3YIawPnVdXJh9QME85blwSAmyw=
+github.com/OneOfOne/xxhash v1.2.2 h1:KMrpdQIwFcEqXDklaen+P1axHaj9BSKzvpUUfnHldSE=
 github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU=
+github.com/SkyAPM/badger/v3 v3.0.0-20210527215642-f1c960d2de88 h1:S/pluxBSFbM+1s7RMazOkHimDLNBHDhDQBUwMb18568=
+github.com/SkyAPM/badger/v3 v3.0.0-20210527215642-f1c960d2de88/go.mod h1:NiMHhAd/Elhx154lGQ4oJennK/f4ADMqhbggxs/mAew=
 github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
 github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0=
 github.com/armon/circbuf v0.0.0-20150827004946-bbbad097214e/go.mod h1:3U/XgcO3hCbHZ8TKRvWD2dDTCfh9M9ya+I9JpbB7O8o=
+github.com/armon/consul-api v0.0.0-20180202201655-eb2c6b5be1b6/go.mod h1:grANhF5doyWs3UAsr3K4I6qtAmlQcZDesFNEHPZAzj8=
 github.com/armon/go-metrics v0.0.0-20180917152333-f0300d1749da/go.mod h1:Q73ZrmVTwzkszR9V5SSuryQ31EELlFMUz1kKyl939pY=
 github.com/armon/go-radix v0.0.0-20180808171621-7fddfc383310/go.mod h1:ufUuZ+zHj4x4TnLV4JWEpy2hxWSpsRywHrMgIH9cCH8=
 github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q=
@@ -25,20 +31,33 @@ github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+Ce
 github.com/bgentry/speakeasy v0.1.0/go.mod h1:+zsyZBPWlz7T6j88CTgSN5bM796AkVf0kBD4zp0CCIs=
 github.com/bketelsen/crypt v0.0.3-0.20200106085610-5cbc8cc4026c/go.mod h1:MKsuJmJgSg28kpZDP6UIiPt0e0Oz0kqKNGyRaWEPv84=
 github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
+github.com/cespare/xxhash v1.1.0 h1:a6HrQnmkObjyL+Gs60czilIUGqrzKutQD6XZog3p+ko=
 github.com/cespare/xxhash v1.1.0/go.mod h1:XrSqR1VqqWfGrhpAt58auRo0WTKS1nRRg3ghfAqPWnc=
+github.com/cespare/xxhash/v2 v2.1.1 h1:6MnRN8NT7+YBpUIWxHtefFZOKTAPgGjpQSxqLNn0+qY=
+github.com/cespare/xxhash/v2 v2.1.1/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
 github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw=
 github.com/cncf/udpa/go v0.0.0-20201120205902-5459f2c99403/go.mod h1:WmhPx2Nbnhtbo57+VJT5O0JRkEi1Wbu0z5j0R8u5Hbk=
 github.com/coreos/bbolt v1.3.2/go.mod h1:iRUV2dpdMOn7Bo10OQBFzIJO9kkE559Wcmn+qkEiiKk=
+github.com/coreos/etcd v3.3.10+incompatible/go.mod h1:uF7uidLiAD3TWHmW31ZFd/JWoc32PjwdhPthX9715RE=
 github.com/coreos/etcd v3.3.13+incompatible/go.mod h1:uF7uidLiAD3TWHmW31ZFd/JWoc32PjwdhPthX9715RE=
+github.com/coreos/go-etcd v2.0.0+incompatible/go.mod h1:Jez6KQU2B/sWsbdaef3ED8NzMklzPG4d5KIOhIy30Tk=
+github.com/coreos/go-semver v0.2.0/go.mod h1:nnelYz7RCh+5ahJtPPxZlU+153eP4D4r3EedlOD2RNk=
 github.com/coreos/go-semver v0.3.0/go.mod h1:nnelYz7RCh+5ahJtPPxZlU+153eP4D4r3EedlOD2RNk=
 github.com/coreos/go-systemd v0.0.0-20190321100706-95778dfbb74e/go.mod h1:F5haX7vjVVG0kc13fIWeqUViNPyEJxv/OmvnBo0Yme4=
 github.com/coreos/pkg v0.0.0-20180928190104-399ea9e2e55f/go.mod h1:E3G3o1h8I7cfcXa63jLwjI0eiQQMgzzUDFVpN/nH/eA=
+github.com/cpuguy83/go-md2man v1.0.10/go.mod h1:SmD6nW6nTyfqj6ABTjUi3V3JVMnlJmwcJI5acqYI6dE=
 github.com/cpuguy83/go-md2man/v2 v2.0.0/go.mod h1:maD7wRr/U5Z6m/iR4s+kqSMx2CaBsrgA7czyZG/E6dU=
 github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
 github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
 github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
+github.com/dgraph-io/ristretto v0.0.4-0.20210504190834-0bf2acd73aa3 h1:jU/wpYsEL+8JPLf/QcjkQKI5g0dOjSuwcMjkThxt5x0=
+github.com/dgraph-io/ristretto v0.0.4-0.20210504190834-0bf2acd73aa3/go.mod h1:fux0lOrBhrVCJd3lcTHsIJhq1T2rokOu6v9Vcb3Q9ug=
 github.com/dgrijalva/jwt-go v3.2.0+incompatible/go.mod h1:E3ru+11k8xSBh+hMPgOLZmtrrCbhqsmaPHjLKYnJCaQ=
+github.com/dgryski/go-farm v0.0.0-20190423205320-6a90982ecee2 h1:tdlZCpZ/P9DhczCTSixgIKmwPv6+wP5DGjqLYw5SUiA=
+github.com/dgryski/go-farm v0.0.0-20190423205320-6a90982ecee2/go.mod h1:SqUrOPUnsFjfmXRMNPybcSiG0BgUW2AuFH8PAnS2iTw=
 github.com/dgryski/go-sip13 v0.0.0-20181026042036-e10d5fee7954/go.mod h1:vAd38F8PWV+bWy6jNmig1y/TA+kYO4g3RSRF0IAv0no=
+github.com/dustin/go-humanize v1.0.0 h1:VSnTsYCnlFHaM2/igO1h6X3HA71jcobQuxemgkq4zYo=
+github.com/dustin/go-humanize v1.0.0/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk=
 github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4=
 github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4=
 github.com/envoyproxy/go-control-plane v0.9.9-0.20210217033140-668b12f5399d/go.mod h1:cXg6YxExXjJnVBQHBLXeUAgxn2UodCpnH306RInaBQk=
@@ -54,8 +73,13 @@ github.com/go-logfmt/logfmt v0.4.0/go.mod h1:3RMwSq7FuexP4Kalkev3ejPJsZTpXXBr9+V
 github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY=
 github.com/gogo/protobuf v1.1.1/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ=
 github.com/gogo/protobuf v1.2.1/go.mod h1:hp+jE20tsWTFYpLwKvXlhS1hjn+gTNwPg2I6zVXpSg4=
+github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q=
+github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q=
+github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b h1:VKtxabqXZkF25pY9ekfRL6a582T4P37/31XEstQ5p58=
 github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q=
 github.com/golang/groupcache v0.0.0-20190129154638-5b532d6fd5ef/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc=
+github.com/golang/groupcache v0.0.0-20190702054246-869f871628b6 h1:ZgQEtGgCBiWRM39fZuwSd1LwSqqSW0hOdXCYYDX0R3I=
+github.com/golang/groupcache v0.0.0-20190702054246-869f871628b6/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc=
 github.com/golang/mock v1.1.1/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A=
 github.com/golang/mock v1.2.0/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A=
 github.com/golang/mock v1.3.1/go.mod h1:sBzyDLLjw3U8JLTeZvSv8jJB+tU5PVekmnlKIyFUx0Y=
@@ -70,6 +94,8 @@ github.com/golang/protobuf v1.4.0/go.mod h1:jodUvKwWbYaEsadDk5Fwe5c77LiNKVO9IDvq
 github.com/golang/protobuf v1.4.1/go.mod h1:U8fpvMrcmy5pZrNK1lt4xCsGvpyWQ/VVv6QDs8UjoX8=
 github.com/golang/protobuf v1.4.2 h1:+Z5KGCizgyZCbGh1KZqA0fcLLkwbsjIzS4aV2v7wJX0=
 github.com/golang/protobuf v1.4.2/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI=
+github.com/golang/snappy v0.0.1 h1:Qgr9rKW7uDUkrbSmQeiDsGa8SjGyCOGtuasMWwvp2P4=
+github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
 github.com/google/btree v0.0.0-20180813153112-4030bb1f1f0c/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ=
 github.com/google/btree v1.0.0/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ=
 github.com/google/flatbuffers v1.12.0 h1:/PtAHvnBY4Kqnx/xCQ3OIV9uYcSFGScBsWI3Oogeh6w=
@@ -78,8 +104,9 @@ github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5a
 github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
 github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
 github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
-github.com/google/go-cmp v0.5.0 h1:/QaMHBdZ26BB3SSst0Iwl10Epc+xhTquomWX0oZEB6w=
 github.com/google/go-cmp v0.5.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
+github.com/google/go-cmp v0.5.4 h1:L8R9j+yAqZuZjsqh/z+F1NCffTKKLShY6zXTItVIZ8M=
+github.com/google/go-cmp v0.5.4/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
 github.com/google/martian v2.1.0+incompatible/go.mod h1:9I4somxYTbIHy5NJKHRl3wXiIaQGbYVAs8BPL6v8lEs=
 github.com/google/pprof v0.0.0-20181206194817-3ea8567a2e57/go.mod h1:zfwlbNMJ+OItoe0UupaVj+oy1omPYYDuagoSzA8v9mc=
 github.com/google/pprof v0.0.0-20190515194954-54271f7e092f/go.mod h1:zfwlbNMJ+OItoe0UupaVj+oy1omPYYDuagoSzA8v9mc=
@@ -123,6 +150,7 @@ github.com/jtolds/gls v4.20.0+incompatible h1:xdiiI2gbIgH/gLH7ADydsJ1uDOEzR8yvV7
 github.com/jtolds/gls v4.20.0+incompatible/go.mod h1:QJZ7F/aHp+rZTRtaJ1ow/lLfFfVYBRgL+9YlvaHOwJU=
 github.com/julienschmidt/httprouter v1.2.0/go.mod h1:SYymIcj16QtmaHHD7aYtjjsJG7VTCxuUUipMqKk8s4w=
 github.com/kisielk/errcheck v1.1.0/go.mod h1:EZBBE59ingxPouuu3KfxchcWSUPOHkagtvWXihfKN4Q=
+github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8=
 github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
 github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
 github.com/kr/logfmt v0.0.0-20140226030751-b84e30acd515/go.mod h1:+0opPa2QZZtGFBFZlji/RkVcI2GknAs/DXo4wKdlNEc=
@@ -131,6 +159,7 @@ github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORN
 github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
 github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE=
 github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
+github.com/magiconair/properties v1.8.0/go.mod h1:PppfXfuXeibc/6YijjN8zIbojt8czPbwD3XqdrwzmxQ=
 github.com/magiconair/properties v1.8.1 h1:ZC2Vc7/ZFkGmsVC9KvOjumD+G5lXy2RtTKyzRKO2BQ4=
 github.com/magiconair/properties v1.8.1/go.mod h1:PppfXfuXeibc/6YijjN8zIbojt8czPbwD3XqdrwzmxQ=
 github.com/mattn/go-colorable v0.0.9/go.mod h1:9vuHe8Xs5qXnSaW/c/ABM9alt+Vo+STaOChaDxuIBZU=
@@ -156,8 +185,9 @@ github.com/pascaldekloe/goe v0.0.0-20180627143212-57f6aae5913c/go.mod h1:lzWF7FI
 github.com/pelletier/go-toml v1.2.0 h1:T5zMGML61Wp+FlcbWjRDT7yAxhJNAiPPLOFECq181zc=
 github.com/pelletier/go-toml v1.2.0/go.mod h1:5z9KED0ma1S8pY6P1sdut58dfprrGBbd/94hg7ilaic=
 github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
-github.com/pkg/errors v0.8.1 h1:iURUrRGxPUNPdy5/HRSm+Yj6okJ6UtLINN0Q9M4+h3I=
 github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
+github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
+github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
 github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
 github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
 github.com/posener/complete v1.1.1/go.mod h1:em0nMJCgc9GFtwrmVmEMR/ZL6WyhyjMBndrE9hABlRI=
@@ -173,6 +203,7 @@ github.com/prometheus/procfs v0.0.0-20190507164030-5867b95ac084/go.mod h1:TjEm7z
 github.com/prometheus/tsdb v0.7.1/go.mod h1:qhTCs0VvXwvX/y3TZrWD7rabWM+ijKTux40TwIPHuXU=
 github.com/rogpeppe/fastuuid v0.0.0-20150106093220-6724a57986af/go.mod h1:XWv6SoW27p1b0cqNHllgS5HIMJraePCO15w5zCzIWYg=
 github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4=
+github.com/russross/blackfriday v1.5.2/go.mod h1:JO/DiYxRf+HjHt06OyowR9PTA263kcR/rfWxYHBV53g=
 github.com/russross/blackfriday/v2 v2.0.1/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM=
 github.com/ryanuber/columnize v0.0.0-20160712163229-9b3edd62028f/go.mod h1:sm1tb6uqfes/u+d4ooFouqFdy9/2g9QGwK3SQygK0Ts=
 github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529/go.mod h1:DxrIzT+xaE7yg65j358z/aeFdxmN0P9QXhEzd20vsDc=
@@ -184,10 +215,13 @@ github.com/smartystreets/goconvey v1.6.4 h1:fv0U8FUIMPNf1L9lnHLvLhgicrIVChEkdzIK
 github.com/smartystreets/goconvey v1.6.4/go.mod h1:syvi0/a8iFYH4r/RixwvyeAJjdLS9QV7WQ/tjFTllLA=
 github.com/soheilhy/cmux v0.1.4/go.mod h1:IM3LyeVVIOuxMH7sFAkER9+bJ4dT7Ms6E4xg4kGIyLM=
 github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA=
+github.com/spaolacci/murmur3 v1.1.0 h1:7c1g84S4BPRrfL5Xrdp6fOJ206sU9y293DDHaoy0bLI=
+github.com/spaolacci/murmur3 v1.1.0/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA=
 github.com/spf13/afero v1.1.2 h1:m8/z1t7/fwjysjQRYbP0RD+bUIF/8tJwPdEZsI83ACI=
 github.com/spf13/afero v1.1.2/go.mod h1:j4pytiNVoe2o6bmDsKpLACNPDBIoEAkihy7loJ1B0CQ=
 github.com/spf13/cast v1.3.0 h1:oget//CVOEoFewqQxwr0Ej5yjygnqGkvggSE/gB35Q8=
 github.com/spf13/cast v1.3.0/go.mod h1:Qx5cxh0v+4UWYiBimWS+eyWzqEqokIECu5etghLkUJE=
+github.com/spf13/cobra v0.0.5/go.mod h1:3K3wKZymM7VvHMDS9+Akkh4K60UwM26emMESw8tLCHU=
 github.com/spf13/cobra v1.1.3 h1:xghbfqPkxzxP3C/f3n5DdpAbdKLj4ZE4BWQI362l53M=
 github.com/spf13/cobra v1.1.3/go.mod h1:pGADOWyqRD/YMrPZigI/zbliZ2wVD/23d+is3pSWzOo=
 github.com/spf13/jwalterweatherman v1.0.0 h1:XHEdyB+EcvlqZamSM4ZOMGlc93t6AcsBEu9Gc1vn7yk=
@@ -195,6 +229,7 @@ github.com/spf13/jwalterweatherman v1.0.0/go.mod h1:cQK4TGJAtQXfYWX+Ddv3mKDzgVb6
 github.com/spf13/pflag v1.0.3/go.mod h1:DYY7MBk1bdzusC3SYhjObp+wFpr4gzcvqqNjLnInEg4=
 github.com/spf13/pflag v1.0.5 h1:iy+VFUOCP1a+8yFto/drg2CJ5u0yRoB7fZw3DKv/JXA=
 github.com/spf13/pflag v1.0.5/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg=
+github.com/spf13/viper v1.3.2/go.mod h1:ZiWeW+zYFKm7srdB9IoDzzZXaJaI5eL9QjNiN/DMA2s=
 github.com/spf13/viper v1.7.0/go.mod h1:8WkrPz2fc9jxqZNCJI/76HCieCp4Q8HaLFoCha5qpdg=
 github.com/spf13/viper v1.7.1 h1:pM5oEahlgWv/WnHXpgbKz7iLIxRf65tye2Ci+XFK5sk=
 github.com/spf13/viper v1.7.1/go.mod h1:8WkrPz2fc9jxqZNCJI/76HCieCp4Q8HaLFoCha5qpdg=
@@ -203,15 +238,22 @@ github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+
 github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
 github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
 github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
-github.com/stretchr/testify v1.5.1 h1:nOGnQDM7FYENwehXlg/kFVnos3rEvtKTjRvOWSzb6H4=
 github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA=
+github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY=
+github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
 github.com/subosito/gotenv v1.2.0 h1:Slr1R9HxAlEKefgq5jn9U+DnETlIUa6HfgEzj0g5d7s=
 github.com/subosito/gotenv v1.2.0/go.mod h1:N0PQaV/YGNqwC0u51sEeR/aUtSLEXKX9iv69rRypqCw=
 github.com/tmc/grpc-websocket-proxy v0.0.0-20190109142713-0ad062ec5ee5/go.mod h1:ncp9v5uamzpCO7NfCPTXjqaC+bZgJeR0sMTm6dMHP7U=
+github.com/ugorji/go/codec v0.0.0-20181204163529-d75b2dcb6bc8/go.mod h1:VFNgLljTbGfSG7qAOspJ7OScBnGdDN/yBr0sguwnwf0=
 github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2/go.mod h1:UETIi67q53MR2AWcXfiuqkDkRtnGDLqkBTpCHuJHxtU=
+github.com/xordataexchange/crypt v0.0.3-0.20170626215501-b2862e3d0a77/go.mod h1:aYKd//L2LvnjZzWKhF00oedf4jCCReLcmhLdhm1A27Q=
+github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
+github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
 go.etcd.io/bbolt v1.3.2/go.mod h1:IbVyRI1SCnLcuJnV2u8VeU0CEYM7e686BmAb1XKL+uU=
 go.opencensus.io v0.21.0/go.mod h1:mSImk1erAIZhrmZN+AvHh14ztQfjbGwt4TtuofqLduU=
 go.opencensus.io v0.22.0/go.mod h1:+kGneAE2xo2IficOXnaByMWTGM9T73dGwxeWcUqIpI8=
+go.opencensus.io v0.22.5 h1:dntmOdLpSpHlVqbW5Eay97DelsZHe+55D+xC6i0dDS0=
+go.opencensus.io v0.22.5/go.mod h1:5pWMHQbX5EPX2/62yrJeAkowc+lfs/XD7Uxpq3pI6kk=
 go.uber.org/atomic v1.4.0/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE=
 go.uber.org/atomic v1.6.0/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ=
 go.uber.org/atomic v1.7.0 h1:ADUqmZGgLDDfbSL9ZmPxKTybcoEYHgpYfELNoN+7hsw=
@@ -226,9 +268,12 @@ go.uber.org/zap v1.16.0 h1:uFRZXykJGK9lLY4HtgSw44DnIcAM+kRBP7x5m+NpAOM=
 go.uber.org/zap v1.16.0/go.mod h1:MA8QOfq0BHJwdXa996Y4dYkAqRKB8/1K1QMMZVaNZjQ=
 golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
 golang.org/x/crypto v0.0.0-20181029021203-45a5f77698d3/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
+golang.org/x/crypto v0.0.0-20181203042331-505ab145d0a9/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
 golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
 golang.org/x/crypto v0.0.0-20190510104115-cbcb75029529/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
 golang.org/x/crypto v0.0.0-20190605123033-f99c8df09eb5/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
+golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
+golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
 golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
 golang.org/x/exp v0.0.0-20190306152737-a1d7652674e8/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
 golang.org/x/exp v0.0.0-20190510132918-efd6b22b2522/go.mod h1:ZjyILWgesfNpC6sMxTJOJm9Kp84zZh5NQWvqDGG3Qr8=
@@ -248,6 +293,9 @@ golang.org/x/mobile v0.0.0-20190312151609-d3739f865fa6/go.mod h1:z+o9i4GpDbdi3rU
 golang.org/x/mobile v0.0.0-20190719004257-d2bd2a29d028/go.mod h1:E/iHnbuqvinMTCcRqshq8CkpyQDoeVncDDYHnLhea+o=
 golang.org/x/mod v0.0.0-20190513183733-4bf6d317e70e/go.mod h1:mXi4GBBbnImb6dmsKGUJ2LatrhH/nqhxcFungHvyanc=
 golang.org/x/mod v0.1.0/go.mod h1:0QHyrYULN0/3qlju5TqG8bIK38QM8yzMo5ekMj3DlcY=
+golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
+golang.org/x/mod v0.3.0 h1:RM4zey1++hCTbCVQfnWeKs9/IEsaBLA8vTkd0WVtmH4=
+golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
 golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
 golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
 golang.org/x/net v0.0.0-20181023162649-9b4f9f5ad519/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
@@ -261,8 +309,10 @@ golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn
 golang.org/x/net v0.0.0-20190501004415-9ce7a6920f09/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
 golang.org/x/net v0.0.0-20190503192946-f4e77d36d62c/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
 golang.org/x/net v0.0.0-20190603091049-60506f45cf65/go.mod h1:HSz+uSET+XFnRR8LxR5pz3Of3rY3CfYBVs4xY44aLks=
-golang.org/x/net v0.0.0-20190620200207-3b0461eec859 h1:R/3boaszxrf1GEUWTVDzSKVwLmSJpwZ1yqXm8j0v2QI=
 golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
+golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
+golang.org/x/net v0.0.0-20201021035429-f5854403a974 h1:IX6qOQeG5uLjB/hjjwjedwfjND0hgjPMMyO1RoIXQNI=
+golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU=
 golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
 golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
 golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
@@ -271,24 +321,30 @@ golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJ
 golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
 golang.org/x/sync v0.0.0-20190227155943-e225da77a7e6/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
 golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
+golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
+golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
 golang.org/x/sys v0.0.0-20180823144017-11551d06cbcc/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
 golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
 golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
 golang.org/x/sys v0.0.0-20181026203630-95b1ffbd15a5/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
 golang.org/x/sys v0.0.0-20181107165924-66b7b1311ac8/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
 golang.org/x/sys v0.0.0-20181116152217-5ac8a444bdc5/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
+golang.org/x/sys v0.0.0-20181205085412-a5c9d58dba9a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
 golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
 golang.org/x/sys v0.0.0-20190312061237-fead79001313/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
 golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
 golang.org/x/sys v0.0.0-20190502145724-3ef323f4f1fd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
 golang.org/x/sys v0.0.0-20190507160741-ecd444e8653b/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
 golang.org/x/sys v0.0.0-20190606165138-5da285871e9c/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
-golang.org/x/sys v0.0.0-20190624142023-c5567b49c5d0 h1:HyfiK1WMnHj5FXFXatD+Qs1A/xC2Run6RzeW1SyHxpc=
 golang.org/x/sys v0.0.0-20190624142023-c5567b49c5d0/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
+golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
+golang.org/x/sys v0.0.0-20210124154548-22da62e12c0c h1:VwygUrnw9jn88c4u8GD3rZQbqrP/tgas88tPUbBxQrk=
+golang.org/x/sys v0.0.0-20210124154548-22da62e12c0c/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
 golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
 golang.org/x/text v0.3.1-0.20180807135948-17ff2d5776d2/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
-golang.org/x/text v0.3.2 h1:tW2bmiBqwgJj/UpqtC8EpXEZVYOwU0yG4iWbprSVAcs=
 golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk=
+golang.org/x/text v0.3.3 h1:cokOdA+Jmi5PJGXLlLllQSgYigAEfHXJAERHVMaCc2k=
+golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
 golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
 golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
 golang.org/x/tools v0.0.0-20180221164845-07fd8470d635/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
@@ -310,11 +366,16 @@ golang.org/x/tools v0.0.0-20190911174233-4f2ddba30aff/go.mod h1:b+2E5dAYhXwXZwtn
 golang.org/x/tools v0.0.0-20191012152004-8de300cfc20a/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
 golang.org/x/tools v0.0.0-20191029041327-9cc4af7d6b2c/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
 golang.org/x/tools v0.0.0-20191029190741-b9c20aec41a5/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
-golang.org/x/tools v0.0.0-20191112195655-aa38f8e97acc h1:NCy3Ohtk6Iny5V/reW2Ktypo4zIpWBdRJ1uFMjBxdg8=
 golang.org/x/tools v0.0.0-20191112195655-aa38f8e97acc/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
+golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
+golang.org/x/tools v0.0.0-20200619180055-7c47624df98f/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE=
+golang.org/x/tools v0.0.0-20210106214847-113979e3529a h1:CB3a9Nez8M13wwlr/E2YtwoU+qYHKfC+JrDa45RXXoQ=
+golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA=
 golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
-golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4=
+golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
 golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
+golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 h1:go1bK/D/BFZV2I8cIQd1NKEZ+0owSTG1fDTci4IqFcE=
+golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
 google.golang.org/api v0.4.0/go.mod h1:8k5glujaEP+g9n7WNsDg8QP6cUVNI86fCNMcbazEtwE=
 google.golang.org/api v0.7.0/go.mod h1:WtwebWUNSVBH/HAw79HIFXZNqEvBhG+Ra+ax0hx3E3M=
 google.golang.org/api v0.8.0/go.mod h1:o4eAsZoiT+ibD93RtjEohWalFOjRDx6CVaqeizhEnKg=
@@ -355,8 +416,9 @@ google.golang.org/protobuf v1.25.0 h1:Ejskq+SyPohKW+1uil0JJMtmHCgJPJ/qWTxr8qp+R4
 google.golang.org/protobuf v1.25.0/go.mod h1:9JNX74DMeImyA3h4bdi1ymwjUzf21/xIlbajtzgsN7c=
 gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw=
 gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
-gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 h1:qIbj1fsPNlZgppZ+VLlY7N33q108Sa+fhmuc+sWQYwY=
 gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
+gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 h1:YR8cESwS4TdDjEe65xsg0ogRM/Nc3DYOhEAlW+xobZo=
+gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
 gopkg.in/errgo.v2 v2.1.0/go.mod h1:hNsd1EY+bozCKY1Ytp96fpM3vjJbqLJn88ws8XvfDNI=
 gopkg.in/ini.v1 v1.51.0 h1:AQvPpx3LzTDM0AjnIRlVFwFFGC+npRopjZxLJj6gdno=
 gopkg.in/ini.v1 v1.51.0/go.mod h1:pNLf8WUiyNEtQjuu5G5vTm06TEv9tsIgeAvK8hOrP4k=
@@ -367,6 +429,8 @@ gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
 gopkg.in/yaml.v2 v2.2.4/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
 gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY=
 gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ=
+gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c h1:dUUwHk2QECo/6vqA44rthZ8ie2QXMNeKRTHCNY2nXvo=
+gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
 honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
 honnef.co/go/tools v0.0.0-20190106161140-3f1c8253044a/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
 honnef.co/go/tools v0.0.0-20190418001031-e561f6794a2a/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
diff --git a/banyand/storage/kv/kv.go b/pkg/bytes/bytes.go
similarity index 82%
copy from banyand/storage/kv/kv.go
copy to pkg/bytes/bytes.go
index c145286..b4d051c 100644
--- a/banyand/storage/kv/kv.go
+++ b/pkg/bytes/bytes.go
@@ -15,11 +15,17 @@
 // specific language governing permissions and limitations
 // under the License.
 
-package kv
+package bytes
 
-type Block interface {
-}
+func Join(s ...[]byte) []byte { // Join concatenates all slices in s, in order, into one newly allocated slice.
+	n := 0 // total length of the result, summed over every input slice
+	for _, v := range s {
+		n += len(v)
+	}
 
-func NewBlock() (Block, error) {
-	return nil, nil
+	b, i := make([]byte, n), 0 // b is allocated once at its final size; i is the next write offset
+	for _, v := range s {
+		i += copy(b[i:], v) // copy returns the number of bytes written, which advances the offset
+	}
+	return b
 }
diff --git a/banyand/storage/kv/kv.go b/pkg/convert/number.go
similarity index 71%
rename from banyand/storage/kv/kv.go
rename to pkg/convert/number.go
index c145286..9656912 100644
--- a/banyand/storage/kv/kv.go
+++ b/pkg/convert/number.go
@@ -15,11 +15,22 @@
 // specific language governing permissions and limitations
 // under the License.
 
-package kv
+package convert
 
-type Block interface {
+import "encoding/binary"
+
+func Uint64ToBytes(u uint64) []byte { // Uint64ToBytes encodes u as 8 big-endian bytes.
+	bs := make([]byte, 8) // a uint64 occupies exactly 8 bytes
+	binary.BigEndian.PutUint64(bs, u)
+	return bs
+}
+
+func Uint32ToBytes(u uint32) []byte { // Uint32ToBytes encodes u as 4 big-endian bytes.
+	bs := make([]byte, 4) // a uint32 occupies exactly 4 bytes; a longer buffer would leave zero padding after the value
+	binary.BigEndian.PutUint32(bs, u)
+	return bs
 }
 
-func NewBlock() (Block, error) {
-	return nil, nil
+func BytesToUint64(b []byte) uint64 { // BytesToUint64 decodes the first 8 bytes of b as a big-endian uint64.
+	return binary.BigEndian.Uint64(b) // panics if len(b) < 8, per encoding/binary
 }
diff --git a/pkg/logger/logger.go b/pkg/logger/logger.go
index 8195056..737c939 100644
--- a/pkg/logger/logger.go
+++ b/pkg/logger/logger.go
@@ -18,6 +18,8 @@
 package logger
 
 import (
+	"strings"
+
 	"go.uber.org/zap"
 	"go.uber.org/zap/zapcore"
 )
@@ -34,6 +36,11 @@ type Logger struct {
 	*zap.Logger
 }
 
+func (l *Logger) Named(name string) *Logger { // Named returns a new Logger whose module is l.module and name joined by ".".
+	module := strings.Join([]string{l.module, name}, ".")
+	return &Logger{module: module, Logger: root.Logger.Named(module)} // built from the package-level root logger using the full dotted module path, not from l.Logger
+}
+
 // String constructs a field with the given key and value.
 func String(key string, val string) zap.Field {
 	return zap.Field{Key: key, Type: zapcore.StringType, String: val}
@@ -44,16 +51,16 @@ func Error(err error) zap.Field {
 	return zap.NamedError("error", err)
 }
 
-// Uint16 constructs a field with the given key and value.
-func Uint16(key string, val uint16) zap.Field {
-	return zap.Field{Key: key, Type: zapcore.Uint16Type, Integer: int64(val)}
-}
-
 // Uint32 constructs a field with the given key and value.
 func Uint32(key string, val uint32) zap.Field {
 	return zap.Field{Key: key, Type: zapcore.Uint32Type, Integer: int64(val)}
 }
 
+// Uint64 constructs a uint64-typed field with the given key and value.
+func Uint64(key string, val uint64) zap.Field {
+	return zap.Field{Key: key, Type: zapcore.Uint64Type, Integer: int64(val)} // the bits are stored in the int64 Integer slot; Type records that they are a uint64
+}
+
 // Int32 constructs a field with the given key and value.
 func Int32(key string, val int32) zap.Field {
 	return zap.Field{Key: key, Type: zapcore.Int32Type, Integer: int64(val)}
@@ -64,6 +71,11 @@ func Int64(key string, val int64) zap.Field {
 	return zap.Field{Key: key, Type: zapcore.Int64Type, Integer: val}
 }
 
+// Binary constructs a field of zapcore.BinaryType with the given key and value.
+func Binary(key string, value interface{}) zap.Field {
+	return zap.Field{Key: key, Type: zapcore.BinaryType, Interface: value} // NOTE(review): zapcore appears to expect a []byte for BinaryType — confirm callers never pass anything else
+}
+
 // Any takes a key and an arbitrary value and chooses the best way to represent
 // them as a field, falling back to a reflection-based approach only if
 // necessary.