You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by ha...@apache.org on 2022/04/01 01:41:29 UTC

[skywalking-banyandb] 01/01: Improve the observability of banyandb: * Add the prometheus exporter * Export memory table gauges * Introduce a self running generator to generate stream data

This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch observability
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git

commit 18bb574205cab1fe4c0139e0c886e7452ae9243c
Author: Gao Hongtao <ha...@gmail.com>
AuthorDate: Fri Apr 1 01:33:23 2022 +0000

    Improve the observability of banyandb:
      * Add the prometheus exporter
      * Export memory table gauges
      * Introduce a self running generator to generate stream
        data
    
    Signed-off-by: Gao Hongtao <ha...@gmail.com>
---
 api/common/id.go                                   |  43 +++++-
 banyand/internal/cmd/standalone.go                 |   6 +-
 banyand/kv/badger.go                               |  17 +++
 banyand/kv/kv.go                                   |   3 +
 banyand/measure/metadata.go                        |   6 +-
 banyand/{prof/pprof.go => observability/metric.go} |  38 ++---
 banyand/{prof => observability}/pprof.go           |   7 +-
 api/common/id.go => banyand/observability/type.go  |  17 +--
 banyand/stream/metadata.go                         |   6 +-
 banyand/tsdb/block.go                              |  24 ++-
 banyand/tsdb/index/writer.go                       |   3 +
 banyand/tsdb/metric.go                             | 111 ++++++++++++++
 banyand/tsdb/scope.go                              |   9 +-
 banyand/tsdb/segment.go                            |  48 +++---
 banyand/tsdb/seriesdb.go                           |   6 +
 banyand/tsdb/shard.go                              |  50 +++++--
 go.mod                                             |  16 +-
 go.sum                                             |  30 ++--
 pkg/index/index.go                                 |   2 +
 pkg/index/inverted/field_map.go                    |  10 ++
 pkg/index/inverted/inverted.go                     |  14 ++
 pkg/index/inverted/mem.go                          |   5 +
 pkg/index/inverted/term_map.go                     |  10 ++
 pkg/index/lsm/lsm.go                               |   5 +
 pkg/index/posting/posting.go                       |   2 +
 pkg/index/posting/roaring/roaring.go               |   4 +
 pkg/test/stream/traffic/searchable_template.json   |  12 ++
 pkg/test/stream/traffic/traffic_test.go            | 163 +++++++++++++++++++++
 28 files changed, 574 insertions(+), 93 deletions(-)

diff --git a/api/common/id.go b/api/common/id.go
index 9964786..f6a55b4 100644
--- a/api/common/id.go
+++ b/api/common/id.go
@@ -17,7 +17,13 @@
 
 package common
 
-import "github.com/apache/skywalking-banyandb/pkg/convert"
+import (
+	"context"
+
+	"github.com/prometheus/client_golang/prometheus"
+
+	"github.com/apache/skywalking-banyandb/pkg/convert"
+)
 
 type SeriesID uint64
 type ShardID uint32
@@ -26,3 +32,38 @@ type ItemID uint64
 func (s SeriesID) Marshal() []byte {
 	return convert.Uint64ToBytes(uint64(s))
 }
+
+var PositionKey = contextPositionKey{}
+
+type contextPositionKey struct{}
+
+type Position struct {
+	Module   string
+	Database string
+	Shard    string
+	Segment  string
+	Block    string
+	KV       string
+}
+
+func (p Position) Labels() prometheus.Labels {
+	return prometheus.Labels{
+		"module":   p.Module,
+		"database": p.Database,
+		"shard":    p.Shard,
+		"seg":      p.Segment,
+		"block":    p.Block,
+		"kv":       p.KV,
+	}
+}
+
+func SetPosition(ctx context.Context, fn func(p Position) Position) context.Context {
+	val := ctx.Value(PositionKey)
+	var p Position
+	if val == nil {
+		p = Position{}
+	} else {
+		p = val.(Position)
+	}
+	return context.WithValue(ctx, PositionKey, fn(p))
+}
diff --git a/banyand/internal/cmd/standalone.go b/banyand/internal/cmd/standalone.go
index f9cde55..2919212 100644
--- a/banyand/internal/cmd/standalone.go
+++ b/banyand/internal/cmd/standalone.go
@@ -27,7 +27,7 @@ import (
 	"github.com/apache/skywalking-banyandb/banyand/liaison"
 	"github.com/apache/skywalking-banyandb/banyand/measure"
 	"github.com/apache/skywalking-banyandb/banyand/metadata"
-	"github.com/apache/skywalking-banyandb/banyand/prof"
+	"github.com/apache/skywalking-banyandb/banyand/observability"
 	"github.com/apache/skywalking-banyandb/banyand/query"
 	"github.com/apache/skywalking-banyandb/banyand/queue"
 	"github.com/apache/skywalking-banyandb/banyand/stream"
@@ -74,7 +74,8 @@ func newStandaloneCmd() *cobra.Command {
 	if err != nil {
 		l.Fatal().Err(err).Msg("failed to initiate Endpoint transport layer")
 	}
-	profSvc := prof.NewProfService()
+	profSvc := observability.NewProfService()
+	metricSvc := observability.NewMetricService()
 
 	// Meta the run Group units.
 	g.Register(
@@ -86,6 +87,7 @@ func newStandaloneCmd() *cobra.Command {
 		streamSvc,
 		q,
 		tcp,
+		metricSvc,
 		profSvc,
 	)
 	logging := logger.Logging{}
diff --git a/banyand/kv/badger.go b/banyand/kv/badger.go
index de6394b..4482721 100644
--- a/banyand/kv/badger.go
+++ b/banyand/kv/badger.go
@@ -27,6 +27,7 @@ import (
 	"github.com/dgraph-io/badger/v3/bydb"
 	"github.com/dgraph-io/badger/v3/y"
 
+	"github.com/apache/skywalking-banyandb/banyand/observability"
 	"github.com/apache/skywalking-banyandb/pkg/encoding"
 	"github.com/apache/skywalking-banyandb/pkg/logger"
 )
@@ -47,6 +48,18 @@ type badgerTSS struct {
 	badger.TSet
 }
 
+func (b *badgerTSS) Stats() (s observability.Statistics) {
+	return badgerStats(b.db)
+}
+
+func badgerStats(db *badger.DB) (s observability.Statistics) {
+	stat := db.Stats()
+	return observability.Statistics{
+		MemBytes:    stat.MemBytes,
+		MaxMemBytes: db.Opts().MemTableSize,
+	}
+}
+
 func (b *badgerTSS) Close() error {
 	if b.db != nil && !b.db.IsClosed() {
 		return b.db.Close()
@@ -110,6 +123,10 @@ type badgerDB struct {
 	db      *badger.DB
 }
 
+func (b *badgerDB) Stats() observability.Statistics {
+	return badgerStats(b.db)
+}
+
 func (b *badgerDB) Handover(iterator Iterator) error {
 	return b.db.HandoverIterator(&mergedIter{
 		delegated: iterator,
diff --git a/banyand/kv/kv.go b/banyand/kv/kv.go
index fdca551..103959f 100644
--- a/banyand/kv/kv.go
+++ b/banyand/kv/kv.go
@@ -25,6 +25,7 @@ import (
 	"github.com/dgraph-io/badger/v3"
 	"github.com/pkg/errors"
 
+	"github.com/apache/skywalking-banyandb/banyand/observability"
 	"github.com/apache/skywalking-banyandb/pkg/encoding"
 	"github.com/apache/skywalking-banyandb/pkg/logger"
 )
@@ -62,6 +63,7 @@ type Reader interface {
 
 // Store is a common kv storage with auto-generated key
 type Store interface {
+	observability.Observable
 	io.Closer
 	Writer
 	Reader
@@ -84,6 +86,7 @@ type TimeSeriesReader interface {
 
 // TimeSeriesStore is time series storage
 type TimeSeriesStore interface {
+	observability.Observable
 	io.Closer
 	TimeSeriesWriter
 	TimeSeriesReader
diff --git a/banyand/measure/metadata.go b/banyand/measure/metadata.go
index 2a2e96e..4858b30 100644
--- a/banyand/measure/metadata.go
+++ b/banyand/measure/metadata.go
@@ -21,6 +21,7 @@ import (
 	"context"
 	"time"
 
+	"github.com/apache/skywalking-banyandb/api/common"
 	"github.com/apache/skywalking-banyandb/api/event"
 	commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
 	databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
@@ -195,7 +196,10 @@ func (s *supplier) ResourceSchema(repo metadata.Repo, md *commonv1.Metadata) (re
 
 func (s *supplier) OpenDB(groupSchema *commonv1.Group) (tsdb.Database, error) {
 	return tsdb.OpenDatabase(
-		context.TODO(),
+		context.WithValue(context.Background(), common.PositionKey, common.Position{
+			Module:   "stream",
+			Database: groupSchema.Metadata.Name,
+		}),
 		tsdb.DatabaseOpts{
 			Location: s.path,
 			ShardNum: groupSchema.ResourceOpts.ShardNum,
diff --git a/banyand/prof/pprof.go b/banyand/observability/metric.go
similarity index 61%
copy from banyand/prof/pprof.go
copy to banyand/observability/metric.go
index ac27f62..29f353b 100644
--- a/banyand/prof/pprof.go
+++ b/banyand/observability/metric.go
@@ -14,59 +14,61 @@
 // KIND, either express or implied.  See the License for the
 // specific language governing permissions and limitations
 // under the License.
-
-package prof
+//
+package observability
 
 import (
 	"net/http"
-	// Register pprof package
-	_ "net/http/pprof"
+
+	"github.com/prometheus/client_golang/prometheus/promhttp"
 
 	"github.com/apache/skywalking-banyandb/pkg/logger"
 	"github.com/apache/skywalking-banyandb/pkg/run"
 )
 
 var (
-	_ run.Service = (*pprofService)(nil)
-	_ run.Config  = (*pprofService)(nil)
+	_ run.Service = (*metricService)(nil)
+	_ run.Config  = (*metricService)(nil)
 )
 
-func NewProfService() run.Service {
-	return &pprofService{
+func NewMetricService() run.Service {
+	return &metricService{
 		stopCh: make(chan struct{}),
 	}
 }
 
-type pprofService struct {
+type metricService struct {
 	listenAddr string
 	stopCh     chan struct{}
 	l          *logger.Logger
 }
 
-func (p *pprofService) FlagSet() *run.FlagSet {
-	flagSet := run.NewFlagSet("prof")
-	flagSet.StringVar(&p.listenAddr, "pprof-listener-addr", "127.0.0.1:6060", "listen addr for pprof")
+func (p *metricService) FlagSet() *run.FlagSet {
+	flagSet := run.NewFlagSet("observability")
+	flagSet.StringVar(&p.listenAddr, "observability-listener-addr", ":2121", "listen addr for observability")
 	return flagSet
 }
 
-func (p *pprofService) Validate() error {
+func (p *metricService) Validate() error {
 	return nil
 }
 
-func (p *pprofService) Name() string {
-	return "pprof-service"
+func (p *metricService) Name() string {
+	return "metric-service"
 }
 
-func (p *pprofService) Serve() run.StopNotify {
+func (p *metricService) Serve() run.StopNotify {
 	p.l = logger.GetLogger(p.Name())
+	http.Handle("/metrics", promhttp.Handler())
 	go func() {
-		p.l.Info().Str("listenAddr", p.listenAddr).Msg("Start pprof server")
+		p.l.Info().Str("listenAddr", p.listenAddr).Msg("Start metric server")
 		_ = http.ListenAndServe(p.listenAddr, nil)
+		p.stopCh <- struct{}{}
 	}()
 
 	return p.stopCh
 }
 
-func (p *pprofService) GracefulStop() {
+func (p *metricService) GracefulStop() {
 	close(p.stopCh)
 }
diff --git a/banyand/prof/pprof.go b/banyand/observability/pprof.go
similarity index 93%
rename from banyand/prof/pprof.go
rename to banyand/observability/pprof.go
index ac27f62..6e9bfe8 100644
--- a/banyand/prof/pprof.go
+++ b/banyand/observability/pprof.go
@@ -15,7 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
-package prof
+package observability
 
 import (
 	"net/http"
@@ -27,8 +27,8 @@ import (
 )
 
 var (
-	_ run.Service = (*pprofService)(nil)
-	_ run.Config  = (*pprofService)(nil)
+	_ run.Service = (*metricService)(nil)
+	_ run.Config  = (*metricService)(nil)
 )
 
 func NewProfService() run.Service {
@@ -62,6 +62,7 @@ func (p *pprofService) Serve() run.StopNotify {
 	go func() {
 		p.l.Info().Str("listenAddr", p.listenAddr).Msg("Start pprof server")
 		_ = http.ListenAndServe(p.listenAddr, nil)
+		p.stopCh <- struct{}{}
 	}()
 
 	return p.stopCh
diff --git a/api/common/id.go b/banyand/observability/type.go
similarity index 79%
copy from api/common/id.go
copy to banyand/observability/type.go
index 9964786..1b26330 100644
--- a/api/common/id.go
+++ b/banyand/observability/type.go
@@ -14,15 +14,14 @@
 // KIND, either express or implied.  See the License for the
 // specific language governing permissions and limitations
 // under the License.
+//
+package observability
 
-package common
-
-import "github.com/apache/skywalking-banyandb/pkg/convert"
-
-type SeriesID uint64
-type ShardID uint32
-type ItemID uint64
+type Statistics struct {
+	MemBytes    int64
+	MaxMemBytes int64
+}
 
-func (s SeriesID) Marshal() []byte {
-	return convert.Uint64ToBytes(uint64(s))
+type Observable interface {
+	Stats() Statistics
 }
diff --git a/banyand/stream/metadata.go b/banyand/stream/metadata.go
index 97972ba..f1b8f79 100644
--- a/banyand/stream/metadata.go
+++ b/banyand/stream/metadata.go
@@ -21,6 +21,7 @@ import (
 	"context"
 	"time"
 
+	"github.com/apache/skywalking-banyandb/api/common"
 	"github.com/apache/skywalking-banyandb/api/event"
 	commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
 	databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
@@ -195,7 +196,10 @@ func (s *supplier) ResourceSchema(repo metadata.Repo, md *commonv1.Metadata) (re
 
 func (s *supplier) OpenDB(groupSchema *commonv1.Group) (tsdb.Database, error) {
 	return tsdb.OpenDatabase(
-		context.TODO(),
+		context.WithValue(context.Background(), common.PositionKey, common.Position{
+			Module:   "stream",
+			Database: groupSchema.Metadata.Name,
+		}),
 		tsdb.DatabaseOpts{
 			Location: s.path,
 			ShardNum: groupSchema.ResourceOpts.ShardNum,
diff --git a/banyand/tsdb/block.go b/banyand/tsdb/block.go
index f93b446..cafe15c 100644
--- a/banyand/tsdb/block.go
+++ b/banyand/tsdb/block.go
@@ -29,6 +29,7 @@ import (
 
 	"github.com/apache/skywalking-banyandb/api/common"
 	"github.com/apache/skywalking-banyandb/banyand/kv"
+	"github.com/apache/skywalking-banyandb/banyand/observability"
 	"github.com/apache/skywalking-banyandb/banyand/tsdb/bucket"
 	"github.com/apache/skywalking-banyandb/pkg/encoding"
 	"github.com/apache/skywalking-banyandb/pkg/index"
@@ -39,12 +40,13 @@ import (
 )
 
 type block struct {
-	path   string
-	l      *logger.Logger
-	suffix string
-	ref    *z.Closer
-	lock   sync.RWMutex
-	closed *atomic.Bool
+	path     string
+	l        *logger.Logger
+	suffix   string
+	ref      *z.Closer
+	lock     sync.RWMutex
+	closed   *atomic.Bool
+	position common.Position
 
 	store         kv.TimeSeriesStore
 	primaryIndex  index.Store
@@ -90,6 +92,10 @@ func newBlock(ctx context.Context, opts blockOpts) (b *block, err error) {
 		closed:         atomic.NewBool(true),
 		encodingMethod: encodingMethodObject.(EncodingMethod),
 	}
+	position := ctx.Value(common.PositionKey)
+	if position != nil {
+		b.position = position.(common.Position)
+	}
 	return b, err
 }
 
@@ -172,6 +178,12 @@ func (b *block) String() string {
 	return b.Reporter.String()
 }
 
+func (b *block) stats() (names []string, stats []observability.Statistics) {
+	names = append(names, "main", "p-idx", "si-idx", "sl-idx")
+	stats = append(stats, b.store.Stats(), b.primaryIndex.Stats(), b.invertedIndex.Stats(), b.lsmIndex.Stats())
+	return names, stats
+}
+
 type blockDelegate interface {
 	io.Closer
 	contains(ts time.Time) bool
diff --git a/banyand/tsdb/index/writer.go b/banyand/tsdb/index/writer.go
index 424155f..bb459a0 100644
--- a/banyand/tsdb/index/writer.go
+++ b/banyand/tsdb/index/writer.go
@@ -198,6 +198,9 @@ func getIndexValue(ruleIndex *partition.IndexRuleLocator, value Value) (val []by
 	var existInt bool
 	for _, tIndex := range ruleIndex.TagIndices {
 		tag, err := partition.GetTagByOffset(value.TagFamilies, tIndex.FamilyOffset, tIndex.TagOffset)
+		if errors.Is(err, partition.ErrMalformedElement) {
+			continue
+		}
 		if err != nil {
 			return nil, false, errors.WithMessagef(err, "index rule:%v", ruleIndex.Rule.Metadata)
 		}
diff --git a/banyand/tsdb/metric.go b/banyand/tsdb/metric.go
new file mode 100644
index 0000000..9c4005b
--- /dev/null
+++ b/banyand/tsdb/metric.go
@@ -0,0 +1,111 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+package tsdb
+
+import (
+	"time"
+
+	"github.com/prometheus/client_golang/prometheus"
+	"github.com/prometheus/client_golang/prometheus/promauto"
+
+	"github.com/apache/skywalking-banyandb/banyand/observability"
+)
+
+const statInterval = 5 * time.Second
+
+var (
+	mtBytes    *prometheus.GaugeVec
+	maxMtBytes *prometheus.GaugeVec
+)
+
+func init() {
+	labels := []string{"module", "database", "shard", "component"}
+	mtBytes = promauto.NewGaugeVec(
+		prometheus.GaugeOpts{
+			Name: "banyand_memtables_bytes",
+			Help: "Memory table size in bytes",
+		},
+		labels,
+	)
+	maxMtBytes = promauto.NewGaugeVec(
+		prometheus.GaugeOpts{
+			Name: "banyand_memtables_max_bytes",
+			Help: "Maximum amount of memory table available in bytes",
+		},
+		labels,
+	)
+}
+
+func (s *shard) runStat() {
+	go func() {
+		ticker := time.NewTicker(statInterval)
+		defer ticker.Stop()
+		for {
+			select {
+			case <-ticker.C:
+				s.stat()
+			case <-s.stopCh:
+				return
+			}
+		}
+	}()
+}
+
+func (s *shard) stat() {
+	seriesStat := s.seriesDatabase.Stats()
+	s.curry(mtBytes).WithLabelValues("series").Set(float64(seriesStat.MemBytes))
+	s.curry(maxMtBytes).WithLabelValues("series").Set(float64(seriesStat.MaxMemBytes))
+	segStats := observability.Statistics{}
+	blockStats := observability.Statistics{}
+	localIndexStats := observability.Statistics{}
+	for _, seg := range s.segmentController.segments() {
+		segStat := seg.Stats()
+		segStats.MaxMemBytes += segStat.MaxMemBytes
+		segStats.MemBytes += segStat.MemBytes
+		for _, b := range seg.blockController.blocks() {
+			if b.closed.Load() {
+				continue
+			}
+			names, bss := b.stats()
+			for i, bs := range bss {
+				if names[i] == "main" {
+					blockStats.MaxMemBytes += bs.MaxMemBytes
+					blockStats.MemBytes += bs.MemBytes
+					continue
+				}
+				localIndexStats.MaxMemBytes += bs.MaxMemBytes
+				localIndexStats.MemBytes += bs.MemBytes
+			}
+
+		}
+	}
+	s.curry(mtBytes).WithLabelValues("global-index").Set(float64(segStats.MemBytes))
+	s.curry(maxMtBytes).WithLabelValues("global-index").Set(float64(segStats.MaxMemBytes))
+	s.curry(mtBytes).WithLabelValues("block").Set(float64(blockStats.MemBytes))
+	s.curry(maxMtBytes).WithLabelValues("block").Set(float64(blockStats.MaxMemBytes))
+	s.curry(mtBytes).WithLabelValues("local-index").Set(float64(localIndexStats.MemBytes))
+	s.curry(maxMtBytes).WithLabelValues("local-index").Set(float64(localIndexStats.MaxMemBytes))
+}
+
+func (s *shard) curry(gv *prometheus.GaugeVec) *prometheus.GaugeVec {
+	return gv.MustCurryWith(prometheus.Labels{
+		"module":   s.position.Module,
+		"database": s.position.Database,
+		"shard":    s.position.Shard,
+	})
+}
diff --git a/banyand/tsdb/scope.go b/banyand/tsdb/scope.go
index cbbe687..e2d0ff2 100644
--- a/banyand/tsdb/scope.go
+++ b/banyand/tsdb/scope.go
@@ -17,7 +17,10 @@
 //
 package tsdb
 
-import "github.com/apache/skywalking-banyandb/api/common"
+import (
+	"github.com/apache/skywalking-banyandb/api/common"
+	"github.com/apache/skywalking-banyandb/banyand/observability"
+)
 
 var _ Shard = (*ScopedShard)(nil)
 
@@ -64,6 +67,10 @@ type scopedSeriesDatabase struct {
 	delegated SeriesDatabase
 }
 
+func (sdd *scopedSeriesDatabase) Stats() observability.Statistics {
+	return sdd.delegated.Stats()
+}
+
 func (sdd *scopedSeriesDatabase) Close() error {
 	return nil
 }
diff --git a/banyand/tsdb/segment.go b/banyand/tsdb/segment.go
index 2db1f91..bf50669 100644
--- a/banyand/tsdb/segment.go
+++ b/banyand/tsdb/segment.go
@@ -24,7 +24,9 @@ import (
 	"sync"
 	"time"
 
+	"github.com/apache/skywalking-banyandb/api/common"
 	"github.com/apache/skywalking-banyandb/banyand/kv"
+	"github.com/apache/skywalking-banyandb/banyand/observability"
 	"github.com/apache/skywalking-banyandb/banyand/tsdb/bucket"
 	"github.com/apache/skywalking-banyandb/pkg/logger"
 	"github.com/apache/skywalking-banyandb/pkg/timestamp"
@@ -54,16 +56,17 @@ func openSegment(ctx context.Context, startTime time.Time, path, suffix string,
 	id := uint16(segmentSize.Unit)<<12 | ((uint16(suffixInteger) << 4) >> 4)
 	timeRange := timestamp.NewTimeRange(startTime, segmentSize.NextTime(startTime), true, false)
 	l := logger.Fetch(ctx, "segment")
+	segCtx := context.WithValue(ctx, logger.ContextKey, l)
 	s = &segment{
 		id:              id,
 		path:            path,
 		suffix:          suffix,
 		l:               l,
-		blockController: newBlockController(id, path, timeRange, blockSize, l, blockQueue),
+		blockController: newBlockController(segCtx, id, path, timeRange, blockSize, l, blockQueue),
 		TimeRange:       timeRange,
 		Reporter:        bucket.NewTimeBasedReporter(timeRange),
 	}
-	err = s.blockController.open(context.WithValue(ctx, logger.ContextKey, s.l))
+	err = s.blockController.open()
 	if err != nil {
 		return nil, err
 	}
@@ -98,8 +101,13 @@ func (s segment) String() string {
 	return s.Reporter.String()
 }
 
+func (s *segment) Stats() observability.Statistics {
+	return s.globalIndex.Stats()
+}
+
 type blockController struct {
 	sync.RWMutex
+	segCtx       context.Context
 	segID        uint16
 	location     string
 	segTimeRange timestamp.TimeRange
@@ -110,9 +118,10 @@ type blockController struct {
 	l *logger.Logger
 }
 
-func newBlockController(segID uint16, location string, segTimeRange timestamp.TimeRange,
+func newBlockController(segCtx context.Context, segID uint16, location string, segTimeRange timestamp.TimeRange,
 	blockSize IntervalRule, l *logger.Logger, blockQueue bucket.Queue) *blockController {
 	return &blockController{
+		segCtx:       segCtx,
 		segID:        segID,
 		location:     location,
 		blockSize:    blockSize,
@@ -140,7 +149,7 @@ func (bc *blockController) Current() bucket.Reporter {
 
 func (bc *blockController) Next() (bucket.Reporter, error) {
 	b := bc.Current().(*block)
-	reporter, err := bc.create(context.TODO(),
+	reporter, err := bc.create(
 		bc.blockSize.NextTime(b.Start))
 	if errors.Is(err, ErrEndOfSegment) {
 		return nil, bucket.ErrNoMoreBucket
@@ -282,19 +291,19 @@ func (bc *blockController) startTime(suffix string) (time.Time, error) {
 	panic("invalid interval unit")
 }
 
-func (bc *blockController) open(ctx context.Context) error {
+func (bc *blockController) open() error {
 	err := WalkDir(
 		bc.location,
 		segPathPrefix,
 		func(suffix, absolutePath string) error {
-			_, err := bc.load(ctx, suffix, absolutePath)
+			_, err := bc.load(suffix, absolutePath)
 			return err
 		})
 	if err != nil {
 		return err
 	}
 	if bc.Current() == nil {
-		b, err := bc.create(ctx, time.Now())
+		b, err := bc.create(time.Now())
 		if err != nil {
 			return err
 		}
@@ -305,7 +314,7 @@ func (bc *blockController) open(ctx context.Context) error {
 	return nil
 }
 
-func (bc *blockController) create(ctx context.Context, startTime time.Time) (*block, error) {
+func (bc *blockController) create(startTime time.Time) (*block, error) {
 	if startTime.Before(bc.segTimeRange.Start) {
 		startTime = bc.segTimeRange.Start
 	}
@@ -317,21 +326,26 @@ func (bc *blockController) create(ctx context.Context, startTime time.Time) (*bl
 	if err != nil {
 		return nil, err
 	}
-	return bc.load(ctx, suffix, segPath)
+	return bc.load(suffix, segPath)
 }
 
-func (bc *blockController) load(ctx context.Context, suffix, path string) (b *block, err error) {
+func (bc *blockController) load(suffix, path string) (b *block, err error) {
 	starTime, err := bc.startTime(suffix)
 	if err != nil {
 		return nil, err
 	}
-	if b, err = newBlock(ctx, blockOpts{
-		segID:     bc.segID,
-		path:      path,
-		startTime: starTime,
-		suffix:    suffix,
-		blockSize: bc.blockSize,
-	}); err != nil {
+	if b, err = newBlock(
+		common.SetPosition(bc.segCtx, func(p common.Position) common.Position {
+			p.Block = suffix
+			return p
+		}),
+		blockOpts{
+			segID:     bc.segID,
+			path:      path,
+			startTime: starTime,
+			suffix:    suffix,
+			blockSize: bc.blockSize,
+		}); err != nil {
 		return nil, err
 	}
 	bc.Lock()
diff --git a/banyand/tsdb/seriesdb.go b/banyand/tsdb/seriesdb.go
index ed62632..4afb96f 100644
--- a/banyand/tsdb/seriesdb.go
+++ b/banyand/tsdb/seriesdb.go
@@ -28,6 +28,7 @@ import (
 
 	"github.com/apache/skywalking-banyandb/api/common"
 	"github.com/apache/skywalking-banyandb/banyand/kv"
+	"github.com/apache/skywalking-banyandb/banyand/observability"
 	"github.com/apache/skywalking-banyandb/pkg/convert"
 	"github.com/apache/skywalking-banyandb/pkg/logger"
 	"github.com/apache/skywalking-banyandb/pkg/timestamp"
@@ -119,6 +120,7 @@ func (p Path) Prepand(entry Entry) Path {
 }
 
 type SeriesDatabase interface {
+	observability.Observable
 	io.Closer
 	GetByID(id common.SeriesID) (Series, error)
 	Get(entity Entity) (Series, error)
@@ -250,6 +252,10 @@ func (s *seriesDB) context() context.Context {
 	return context.WithValue(context.Background(), logger.ContextKey, s.l)
 }
 
+func (s *seriesDB) Stats() observability.Statistics {
+	return s.seriesMetadata.Stats()
+}
+
 func (s *seriesDB) Close() error {
 	return s.seriesMetadata.Close()
 }
diff --git a/banyand/tsdb/shard.go b/banyand/tsdb/shard.go
index cd4120c..3299e57 100644
--- a/banyand/tsdb/shard.go
+++ b/banyand/tsdb/shard.go
@@ -36,13 +36,15 @@ const defaultBlockQueueSize = 1 << 4
 var _ Shard = (*shard)(nil)
 
 type shard struct {
-	l  *logger.Logger
-	id common.ShardID
+	l        *logger.Logger
+	id       common.ShardID
+	position common.Position
 
 	seriesDatabase        SeriesDatabase
 	indexDatabase         IndexDatabase
 	segmentController     *segmentController
 	segmentManageStrategy *bucket.Strategy
+	stopCh                chan struct{}
 }
 
 func OpenShard(ctx context.Context, id common.ShardID,
@@ -56,7 +58,12 @@ func OpenShard(ctx context.Context, id common.ShardID,
 	if openedBlockSize < 1 {
 		openedBlockSize = defaultBlockQueueSize
 	}
-	sc, err := newSegmentController(path, segmentSize, blockSize, openedBlockSize, l)
+	shardCtx := context.WithValue(ctx, logger.ContextKey, l)
+	shardCtx = common.SetPosition(shardCtx, func(p common.Position) common.Position {
+		p.Shard = strconv.Itoa(int(id))
+		return p
+	})
+	sc, err := newSegmentController(shardCtx, path, segmentSize, blockSize, openedBlockSize, l)
 	if err != nil {
 		return nil, errors.Wrapf(err, "create the segment controller of the shard %d", int(id))
 	}
@@ -64,9 +71,9 @@ func OpenShard(ctx context.Context, id common.ShardID,
 		id:                id,
 		segmentController: sc,
 		l:                 l,
+		stopCh:            make(chan struct{}),
 	}
-	shardCtx := context.WithValue(ctx, logger.ContextKey, s.l)
-	err = s.segmentController.open(shardCtx)
+	err = s.segmentController.open()
 	if err != nil {
 		return nil, err
 	}
@@ -89,6 +96,11 @@ func OpenShard(ctx context.Context, id common.ShardID,
 		return nil, err
 	}
 	s.segmentManageStrategy.Run()
+	position := shardCtx.Value(common.PositionKey)
+	if position != nil {
+		s.position = position.(common.Position)
+	}
+	s.runStat()
 	return s, nil
 }
 
@@ -123,7 +135,9 @@ func (s *shard) State() (shardState ShardState) {
 func (s *shard) Close() error {
 	s.segmentManageStrategy.Close()
 	s.segmentController.close()
-	return s.seriesDatabase.Close()
+	err := s.seriesDatabase.Close()
+	close(s.stopCh)
+	return err
 }
 
 type IntervalUnit int
@@ -178,6 +192,7 @@ func (ir IntervalRule) EstimatedDuration() time.Duration {
 
 type segmentController struct {
 	sync.RWMutex
+	shardCtx    context.Context
 	location    string
 	segmentSize IntervalRule
 	blockSize   IntervalRule
@@ -187,8 +202,10 @@ type segmentController struct {
 	l *logger.Logger
 }
 
-func newSegmentController(location string, segmentSize, blockSize IntervalRule, openedBlockSize int, l *logger.Logger) (*segmentController, error) {
+func newSegmentController(shardCtx context.Context, location string,
+	segmentSize, blockSize IntervalRule, openedBlockSize int, l *logger.Logger) (*segmentController, error) {
 	sc := &segmentController{
+		shardCtx:    shardCtx,
 		location:    location,
 		segmentSize: segmentSize,
 		blockSize:   blockSize,
@@ -264,7 +281,7 @@ func (sc *segmentController) Current() bucket.Reporter {
 
 func (sc *segmentController) Next() (bucket.Reporter, error) {
 	seg := sc.Current().(*segment)
-	reporter, err := sc.create(context.TODO(), sc.Format(
+	reporter, err := sc.create(sc.Format(
 		sc.segmentSize.NextTime(seg.Start)))
 	if errors.Is(err, ErrEndOfSegment) {
 		return nil, bucket.ErrNoMoreBucket
@@ -307,12 +324,12 @@ func (sc *segmentController) Parse(value string) (time.Time, error) {
 	panic("invalid interval unit")
 }
 
-func (sc *segmentController) open(ctx context.Context) error {
+func (sc *segmentController) open() error {
 	err := WalkDir(
 		sc.location,
 		segPathPrefix,
 		func(suffix, absolutePath string) error {
-			_, err := sc.load(ctx, suffix, absolutePath)
+			_, err := sc.load(suffix, absolutePath)
 			if errors.Is(err, ErrEndOfSegment) {
 				return nil
 			}
@@ -322,7 +339,7 @@ func (sc *segmentController) open(ctx context.Context) error {
 		return err
 	}
 	if sc.Current() == nil {
-		_, err = sc.create(ctx, sc.Format(time.Now()))
+		_, err = sc.create(sc.Format(time.Now()))
 		if err != nil {
 			return err
 		}
@@ -330,20 +347,23 @@ func (sc *segmentController) open(ctx context.Context) error {
 	return nil
 }
 
-func (sc *segmentController) create(ctx context.Context, suffix string) (*segment, error) {
+func (sc *segmentController) create(suffix string) (*segment, error) {
 	segPath, err := mkdir(segTemplate, sc.location, suffix)
 	if err != nil {
 		return nil, err
 	}
-	return sc.load(ctx, suffix, segPath)
+	return sc.load(suffix, segPath)
 }
 
-func (sc *segmentController) load(ctx context.Context, suffix, path string) (seg *segment, err error) {
+func (sc *segmentController) load(suffix, path string) (seg *segment, err error) {
 	startTime, err := sc.Parse(suffix)
 	if err != nil {
 		return nil, err
 	}
-	seg, err = openSegment(ctx, startTime, path, suffix, sc.segmentSize, sc.blockSize, sc.blockQueue)
+	seg, err = openSegment(common.SetPosition(sc.shardCtx, func(p common.Position) common.Position {
+		p.Segment = suffix
+		return p
+	}), startTime, path, suffix, sc.segmentSize, sc.blockSize, sc.blockQueue)
 	if err != nil {
 		return nil, err
 	}
diff --git a/go.mod b/go.mod
index a445dc6..e8db453 100644
--- a/go.mod
+++ b/go.mod
@@ -25,7 +25,7 @@ require (
 	go.etcd.io/etcd/server/v3 v3.5.0
 	go.uber.org/multierr v1.7.0
 	golang.org/x/net v0.0.0-20210716203947-853a461950ff // indirect
-	golang.org/x/sys v0.0.0-20210630005230-0f9fa26af87c // indirect
+	golang.org/x/sys v0.0.0-20220114195835-da31bd327af9 // indirect
 	google.golang.org/genproto v0.0.0-20210722135532-667f2b7c528f // indirect
 	google.golang.org/grpc v1.39.0
 	google.golang.org/protobuf v1.27.1
@@ -34,7 +34,7 @@ require (
 require (
 	github.com/beorn7/perks v1.0.1 // indirect
 	github.com/bits-and-blooms/bitset v1.2.0 // indirect
-	github.com/cespare/xxhash/v2 v2.1.1 // indirect
+	github.com/cespare/xxhash/v2 v2.1.2 // indirect
 	github.com/coreos/go-semver v0.3.0 // indirect
 	github.com/coreos/go-systemd/v22 v22.3.2 // indirect
 	github.com/davecgh/go-spew v1.1.1 // indirect
@@ -55,19 +55,19 @@ require (
 	github.com/hashicorp/hcl v1.0.0 // indirect
 	github.com/inconshreveable/mousetrap v1.0.0 // indirect
 	github.com/jonboulle/clockwork v0.2.2 // indirect
-	github.com/json-iterator/go v1.1.11 // indirect
+	github.com/json-iterator/go v1.1.12 // indirect
 	github.com/magiconair/properties v1.8.5 // indirect
 	github.com/matttproud/golang_protobuf_extensions v1.0.1 // indirect
 	github.com/mitchellh/mapstructure v1.4.1 // indirect
 	github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
-	github.com/modern-go/reflect2 v1.0.1 // indirect
+	github.com/modern-go/reflect2 v1.0.2 // indirect
 	github.com/mschoch/smat v0.2.0 // indirect
 	github.com/pelletier/go-toml v1.9.3 // indirect
 	github.com/pmezard/go-difflib v1.0.0 // indirect
-	github.com/prometheus/client_golang v1.11.0 // indirect
+	github.com/prometheus/client_golang v1.12.1
 	github.com/prometheus/client_model v0.2.0 // indirect
-	github.com/prometheus/common v0.26.0 // indirect
-	github.com/prometheus/procfs v0.6.0 // indirect
+	github.com/prometheus/common v0.32.1 // indirect
+	github.com/prometheus/procfs v0.7.3 // indirect
 	github.com/sirupsen/logrus v1.7.0 // indirect
 	github.com/soheilhy/cmux v0.1.5 // indirect
 	github.com/spf13/afero v1.6.0 // indirect
@@ -107,4 +107,4 @@ require (
 	sigs.k8s.io/yaml v1.2.0 // indirect
 )
 
-replace github.com/dgraph-io/badger/v3 v3.2011.1 => github.com/SkyAPM/badger/v3 v3.0.0-20220117144524-89781ee8a386
+replace github.com/dgraph-io/badger/v3 v3.2011.1 => github.com/SkyAPM/badger/v3 v3.0.0-20220331082938-b907904d6089
diff --git a/go.sum b/go.sum
index ee831c2..4189cd5 100644
--- a/go.sum
+++ b/go.sum
@@ -47,8 +47,8 @@ github.com/RoaringBitmap/gocroaring v0.4.0/go.mod h1:NieMwz7ZqwU2DD73/vvYwv7r4eW
 github.com/RoaringBitmap/real-roaring-datasets v0.0.0-20190726190000-eb7c87156f76/go.mod h1:oM0MHmQ3nDsq609SS36p+oYbRi16+oVvU2Bw4Ipv0SE=
 github.com/RoaringBitmap/roaring v0.9.1 h1:5PRizBmoN/PfV17nPNQou4dHQ7NcJi8FO/bihdYyCEM=
 github.com/RoaringBitmap/roaring v0.9.1/go.mod h1:h1B7iIUOmnAeb5ytYMvnHJwxMc6LUrwBnzXWRuqTQUc=
-github.com/SkyAPM/badger/v3 v3.0.0-20220117144524-89781ee8a386 h1:2VmCNyGlF/yY+Ev9bxCiPcKGaWSEQDiRCRiCJYWPh7o=
-github.com/SkyAPM/badger/v3 v3.0.0-20220117144524-89781ee8a386/go.mod h1:RHo4/GmYcKKh5Lxu63wLEMHJ70Pac2JqZRYGhlyAo2M=
+github.com/SkyAPM/badger/v3 v3.0.0-20220331082938-b907904d6089 h1:MEqSQssrKviX+uU4xfAcIvvpFeTamsT3cPc9k4Ol4Ks=
+github.com/SkyAPM/badger/v3 v3.0.0-20220331082938-b907904d6089/go.mod h1:Q0luV7nB94o3Bl4hYqAPy03+QTtLxs9pWdUEQb0i0K0=
 github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
 github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
 github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0=
@@ -76,8 +76,9 @@ github.com/certifi/gocertifi v0.0.0-20200922220541-2c3bb06c6054 h1:uH66TXeswKn5P
 github.com/certifi/gocertifi v0.0.0-20200922220541-2c3bb06c6054/go.mod h1:sGbDF6GwGcLpkNXPUTkMRoywsNa/ol15pxFe6ERfguA=
 github.com/cespare/xxhash v1.1.0 h1:a6HrQnmkObjyL+Gs60czilIUGqrzKutQD6XZog3p+ko=
 github.com/cespare/xxhash v1.1.0/go.mod h1:XrSqR1VqqWfGrhpAt58auRo0WTKS1nRRg3ghfAqPWnc=
-github.com/cespare/xxhash/v2 v2.1.1 h1:6MnRN8NT7+YBpUIWxHtefFZOKTAPgGjpQSxqLNn0+qY=
 github.com/cespare/xxhash/v2 v2.1.1/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
+github.com/cespare/xxhash/v2 v2.1.2 h1:YRXhKfTDauu4ajMg1TPgFO5jnlC2HCbmLXMcTG5cbYE=
+github.com/cespare/xxhash/v2 v2.1.2/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
 github.com/chzyer/logex v1.1.10/go.mod h1:+Ywpsq7O8HXn0nuIou7OrIPyXbp3wmkHB+jjWRnGsAI=
 github.com/chzyer/readline v0.0.0-20180603132655-2972be24d48e/go.mod h1:nSuG5e5PlCu98SY8svDHJxuZscDgtXS6KTTbou5AhLI=
 github.com/chzyer/test v0.0.0-20180213035817-a1ea475d72b1/go.mod h1:Q3SI9o4m/ZMnBNeIyt5eFwwo7qiLfzFZmjNmxjkiQlU=
@@ -277,8 +278,9 @@ github.com/jonboulle/clockwork v0.2.2/go.mod h1:Pkfl5aHPm1nk2H9h0bjmnJD/BcgbGXUB
 github.com/jpillora/backoff v1.0.0/go.mod h1:J/6gKK9jxlEcS3zixgDgUAsiuZ7yrSoa/FX5e0EB2j4=
 github.com/json-iterator/go v1.1.6/go.mod h1:+SdeFBvtyEkXs7REEP0seUULqWtbJapLOCVDaaPEHmU=
 github.com/json-iterator/go v1.1.10/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4=
-github.com/json-iterator/go v1.1.11 h1:uVUAXhF2To8cbw/3xN3pxj6kk7TYKs98NIrTqPlMWAQ=
 github.com/json-iterator/go v1.1.11/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4=
+github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM=
+github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo=
 github.com/jstemmer/go-junit-report v0.0.0-20190106144839-af01ea7f8024/go.mod h1:6v2b51hI/fHJwM22ozAgKL4VKDeJcHhJFhtBdhmNjmU=
 github.com/jstemmer/go-junit-report v0.9.1/go.mod h1:Brl9GWCQeLvo8nXZwPNNblvFj/XSXhF0NWZEnDohbsk=
 github.com/jtolds/gls v4.20.0+incompatible h1:xdiiI2gbIgH/gLH7ADydsJ1uDOEzR8yvV7C0MuV77Wo=
@@ -325,8 +327,9 @@ github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJ
 github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd h1:TRLaZ9cD/w8PVh93nsPXa1VrQ6jlwL5oN8l14QlcNfg=
 github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
 github.com/modern-go/reflect2 v0.0.0-20180701023420-4b7aa43c6742/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0=
-github.com/modern-go/reflect2 v1.0.1 h1:9f412s+6RmYXLWZSEzVVgPGK7C2PphHj5RJrvfx9AWI=
 github.com/modern-go/reflect2 v1.0.1/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0=
+github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9Gz0M=
+github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk=
 github.com/mschoch/smat v0.2.0 h1:8imxQsjDm8yFEAVBe7azKmKSgzSkZXDuKkSq9374khM=
 github.com/mschoch/smat v0.2.0/go.mod h1:kc9mz7DoBKqDyiRL7VZN8KvXQMWeTaVnttLRXOlotKw=
 github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U=
@@ -364,8 +367,9 @@ github.com/prometheus/client_golang v0.9.1/go.mod h1:7SWBe2y4D6OKWSNQJUaRYU/AaXP
 github.com/prometheus/client_golang v0.9.3/go.mod h1:/TN21ttK/J9q6uSwhBd54HahCDft0ttaMvbicHlPoso=
 github.com/prometheus/client_golang v1.0.0/go.mod h1:db9x61etRT2tGnBNRi70OPL5FsnadC4Ky3P0J6CfImo=
 github.com/prometheus/client_golang v1.7.1/go.mod h1:PY5Wy2awLA44sXw4AOSfFBetzPP4j5+D6mVACh+pe2M=
-github.com/prometheus/client_golang v1.11.0 h1:HNkLOAEQMIDv/K+04rukrLx6ch7msSRwf3/SASFAGtQ=
 github.com/prometheus/client_golang v1.11.0/go.mod h1:Z6t4BnS23TR94PD6BsDNk8yVqroYurpAkEiz0P2BEV0=
+github.com/prometheus/client_golang v1.12.1 h1:ZiaPsmm9uiBeaSMRznKsCDNtPCS0T3JVDGF+06gjBzk=
+github.com/prometheus/client_golang v1.12.1/go.mod h1:3Z9XVyYiZYEO+YQWt3RD2R3jrbd179Rt297l4aS6nDY=
 github.com/prometheus/client_model v0.0.0-20180712105110-5c3871d89910/go.mod h1:MbSGuTsp3dbXC40dX6PRTWyKYBIrTGTE9sqQNg2J8bo=
 github.com/prometheus/client_model v0.0.0-20190129233127-fd36f4220a90/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA=
 github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA=
@@ -375,14 +379,16 @@ github.com/prometheus/common v0.0.0-20181113130724-41aa239b4cce/go.mod h1:daVV7q
 github.com/prometheus/common v0.4.0/go.mod h1:TNfzLD0ON7rHzMJeJkieUDPYmFC7Snx/y86RQel1bk4=
 github.com/prometheus/common v0.4.1/go.mod h1:TNfzLD0ON7rHzMJeJkieUDPYmFC7Snx/y86RQel1bk4=
 github.com/prometheus/common v0.10.0/go.mod h1:Tlit/dnDKsSWFlCLTWaA1cyBgKHSMdTB80sz/V91rCo=
-github.com/prometheus/common v0.26.0 h1:iMAkS2TDoNWnKM+Kopnx/8tnEStIfpYA0ur0xQzzhMQ=
 github.com/prometheus/common v0.26.0/go.mod h1:M7rCNAaPfAosfx8veZJCuw84e35h3Cfd9VFqTh1DIvc=
+github.com/prometheus/common v0.32.1 h1:hWIdL3N2HoUx3B8j3YN9mWor0qhY/NlEKZEaXxuIRh4=
+github.com/prometheus/common v0.32.1/go.mod h1:vu+V0TpY+O6vW9J44gczi3Ap/oXXR10b+M/gUGO4Hls=
 github.com/prometheus/procfs v0.0.0-20181005140218-185b4288413d/go.mod h1:c3At6R/oaqEKCNdg8wHV1ftS6bRYblBhIjjI8uT2IGk=
 github.com/prometheus/procfs v0.0.0-20190507164030-5867b95ac084/go.mod h1:TjEm7ze935MbeOT/UhFTIMYKhuLP4wbCsTZCD3I8kEA=
 github.com/prometheus/procfs v0.0.2/go.mod h1:TjEm7ze935MbeOT/UhFTIMYKhuLP4wbCsTZCD3I8kEA=
 github.com/prometheus/procfs v0.1.3/go.mod h1:lV6e/gmhEcM9IjHGsFOCxxuZ+z1YqCvr4OA4YeYWdaU=
-github.com/prometheus/procfs v0.6.0 h1:mxy4L2jP6qMonqmq+aTtOx1ifVWUgG/TAmntgbh3xv4=
 github.com/prometheus/procfs v0.6.0/go.mod h1:cz+aTbrPOrUb4q7XlbU9ygM+/jj0fzG6c1xBZuNvfVA=
+github.com/prometheus/procfs v0.7.3 h1:4jVXhlkAyzOScmCkXBTOLRLTz8EeU+eyjrwB/EPq0VU=
+github.com/prometheus/procfs v0.7.3/go.mod h1:cz+aTbrPOrUb4q7XlbU9ygM+/jj0fzG6c1xBZuNvfVA=
 github.com/prometheus/tsdb v0.7.1/go.mod h1:qhTCs0VvXwvX/y3TZrWD7rabWM+ijKTux40TwIPHuXU=
 github.com/rogpeppe/fastuuid v0.0.0-20150106093220-6724a57986af/go.mod h1:XWv6SoW27p1b0cqNHllgS5HIMJraePCO15w5zCzIWYg=
 github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6LYCDYWNEvQ=
@@ -604,6 +610,7 @@ golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v
 golang.org/x/net v0.0.0-20210316092652-d523dce5a7f4/go.mod h1:RBQZq4jEuRlivfhVLdyRGr576XBO4/greRjx4P4O3yc=
 golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4/go.mod h1:p54w0d4576C0XHj96bSt6lcn1PtDYWL6XObtHCRCNQM=
 golang.org/x/net v0.0.0-20210428140749-89ef3d95e781/go.mod h1:OJAsFXCWl8Ukc7SiCT/9KSuxbyM7479/AVlXFRxuMCk=
+golang.org/x/net v0.0.0-20210525063256-abc453219eb5/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
 golang.org/x/net v0.0.0-20210716203947-853a461950ff h1:j2EK/QoxYNBsXI4R7fQkkRUk8y6wnOBI+6hgPdP/6Ds=
 golang.org/x/net v0.0.0-20210716203947-853a461950ff/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
 golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
@@ -617,8 +624,9 @@ golang.org/x/oauth2 v0.0.0-20201208152858-08078c50e5b5/go.mod h1:KelEdhl1UZF7XfJ
 golang.org/x/oauth2 v0.0.0-20210218202405-ba52d332ba99/go.mod h1:KelEdhl1UZF7XfJ4dDtk6s++YSgaE7mD/BuKKDLBl4A=
 golang.org/x/oauth2 v0.0.0-20210220000619-9bb904979d93/go.mod h1:KelEdhl1UZF7XfJ4dDtk6s++YSgaE7mD/BuKKDLBl4A=
 golang.org/x/oauth2 v0.0.0-20210313182246-cd4f82c27b84/go.mod h1:KelEdhl1UZF7XfJ4dDtk6s++YSgaE7mD/BuKKDLBl4A=
-golang.org/x/oauth2 v0.0.0-20210402161424-2e8d93401602 h1:0Ja1LBD+yisY6RWM/BH7TJVXWsSjs2VwBSmvSX4HdBc=
 golang.org/x/oauth2 v0.0.0-20210402161424-2e8d93401602/go.mod h1:KelEdhl1UZF7XfJ4dDtk6s++YSgaE7mD/BuKKDLBl4A=
+golang.org/x/oauth2 v0.0.0-20210514164344-f6687ab2804c h1:pkQiBZBvdos9qq4wBAHqlzuZHEXo07pqV06ef90u1WI=
+golang.org/x/oauth2 v0.0.0-20210514164344-f6687ab2804c/go.mod h1:KelEdhl1UZF7XfJ4dDtk6s++YSgaE7mD/BuKKDLBl4A=
 golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
 golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
 golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
@@ -688,8 +696,8 @@ golang.org/x/sys v0.0.0-20210403161142-5e06dd20ab57/go.mod h1:h1NjWce9XRLGQEsW7w
 golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
 golang.org/x/sys v0.0.0-20210510120138-977fb7262007/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
 golang.org/x/sys v0.0.0-20210603081109-ebe580a85c40/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
-golang.org/x/sys v0.0.0-20210630005230-0f9fa26af87c h1:F1jZWGFhYfh0Ci55sIpILtKKK8p3i2/krTr0H1rg74I=
-golang.org/x/sys v0.0.0-20210630005230-0f9fa26af87c/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
+golang.org/x/sys v0.0.0-20220114195835-da31bd327af9 h1:XfKQ4OlFl8okEOr5UvAqFRVj8pY/4yfcXrddB8qAbU0=
+golang.org/x/sys v0.0.0-20220114195835-da31bd327af9/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
 golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
 golang.org/x/text v0.0.0-20170915032832-14c0d48ead0c/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
 golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
diff --git a/pkg/index/index.go b/pkg/index/index.go
index 39bd478..82acfd8 100644
--- a/pkg/index/index.go
+++ b/pkg/index/index.go
@@ -25,6 +25,7 @@ import (
 
 	"github.com/apache/skywalking-banyandb/api/common"
 	modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
+	"github.com/apache/skywalking-banyandb/banyand/observability"
 	"github.com/apache/skywalking-banyandb/pkg/convert"
 	"github.com/apache/skywalking-banyandb/pkg/index/metadata"
 	"github.com/apache/skywalking-banyandb/pkg/index/posting"
@@ -172,6 +173,7 @@ type Searcher interface {
 }
 
 type Store interface {
+	observability.Observable
 	io.Closer
 	Writer
 	Searcher
diff --git a/pkg/index/inverted/field_map.go b/pkg/index/inverted/field_map.go
index e94df9a..9b719e0 100644
--- a/pkg/index/inverted/field_map.go
+++ b/pkg/index/inverted/field_map.go
@@ -21,6 +21,7 @@ import (
 	"sync"
 
 	"github.com/apache/skywalking-banyandb/api/common"
+	"github.com/apache/skywalking-banyandb/banyand/observability"
 	"github.com/apache/skywalking-banyandb/pkg/convert"
 	"github.com/apache/skywalking-banyandb/pkg/index"
 )
@@ -72,6 +73,15 @@ func (fm *fieldMap) put(fv index.Field, id common.ItemID) error {
 	return pm.value.put(fv.Term, id)
 }
 
+func (fm *fieldMap) Stats() (s observability.Statistics) {
+	for _, pv := range fm.repo {
+		// 8 is the size of key
+		s.MemBytes += 8
+		s.MemBytes += pv.value.Stats().MemBytes
+	}
+	return s
+}
+
 type termContainer struct {
 	key   index.FieldKey
 	value *termMap
diff --git a/pkg/index/inverted/inverted.go b/pkg/index/inverted/inverted.go
index 7387b63..9af43fd 100644
--- a/pkg/index/inverted/inverted.go
+++ b/pkg/index/inverted/inverted.go
@@ -27,6 +27,7 @@ import (
 	"github.com/apache/skywalking-banyandb/api/common"
 	modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
 	"github.com/apache/skywalking-banyandb/banyand/kv"
+	"github.com/apache/skywalking-banyandb/banyand/observability"
 	"github.com/apache/skywalking-banyandb/pkg/index"
 	"github.com/apache/skywalking-banyandb/pkg/index/metadata"
 	"github.com/apache/skywalking-banyandb/pkg/index/posting"
@@ -95,6 +96,19 @@ func (s *store) Flush() error {
 	return nil
 }
 
+func (s *store) Stats() (stat observability.Statistics) {
+	s.rwMutex.RLock()
+	defer s.rwMutex.RUnlock()
+	//TODO: add MaxMem
+	main := s.memTable.Stats()
+	stat.MemBytes += main.MemBytes
+	if s.immutableMemTable != nil {
+		sub := s.immutableMemTable.Stats()
+		stat.MemBytes += sub.MemBytes
+	}
+	return stat
+}
+
 func (s *store) MatchField(fieldKey index.FieldKey) (posting.List, error) {
 	return s.Range(fieldKey, index.RangeOpts{})
 }
diff --git a/pkg/index/inverted/mem.go b/pkg/index/inverted/mem.go
index aed68fe..279a9d4 100644
--- a/pkg/index/inverted/mem.go
+++ b/pkg/index/inverted/mem.go
@@ -26,6 +26,7 @@ import (
 	"github.com/apache/skywalking-banyandb/api/common"
 	modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
 	"github.com/apache/skywalking-banyandb/banyand/kv"
+	"github.com/apache/skywalking-banyandb/banyand/observability"
 	"github.com/apache/skywalking-banyandb/pkg/index"
 	"github.com/apache/skywalking-banyandb/pkg/index/metadata"
 	"github.com/apache/skywalking-banyandb/pkg/index/posting"
@@ -51,6 +52,10 @@ func (m *memTable) Write(field index.Field, itemID common.ItemID) error {
 	return m.fields.put(field, itemID)
 }
 
+func (m *memTable) Stats() observability.Statistics {
+	return m.fields.Stats()
+}
+
 var _ index.FieldIterator = (*fIterator)(nil)
 
 type fIterator struct {
diff --git a/pkg/index/inverted/term_map.go b/pkg/index/inverted/term_map.go
index e683837..a3d9adb 100644
--- a/pkg/index/inverted/term_map.go
+++ b/pkg/index/inverted/term_map.go
@@ -21,6 +21,7 @@ import (
 	"sync"
 
 	"github.com/apache/skywalking-banyandb/api/common"
+	"github.com/apache/skywalking-banyandb/banyand/observability"
 	"github.com/apache/skywalking-banyandb/pkg/convert"
 	"github.com/apache/skywalking-banyandb/pkg/index"
 	"github.com/apache/skywalking-banyandb/pkg/index/posting"
@@ -82,3 +83,12 @@ func (p *termMap) getEntry(key []byte) *index.PostingValue {
 	}
 	return v
 }
+
+func (p *termMap) Stats() (s observability.Statistics) {
+	for _, pv := range p.repo {
+		// 8 is the size of key
+		s.MemBytes += 8
+		s.MemBytes += pv.Value.SizeInBytes()
+	}
+	return s
+}
diff --git a/pkg/index/lsm/lsm.go b/pkg/index/lsm/lsm.go
index 58f8f93..12d5a6c 100644
--- a/pkg/index/lsm/lsm.go
+++ b/pkg/index/lsm/lsm.go
@@ -22,6 +22,7 @@ import (
 
 	"github.com/apache/skywalking-banyandb/api/common"
 	"github.com/apache/skywalking-banyandb/banyand/kv"
+	"github.com/apache/skywalking-banyandb/banyand/observability"
 	"github.com/apache/skywalking-banyandb/pkg/convert"
 	"github.com/apache/skywalking-banyandb/pkg/index"
 	"github.com/apache/skywalking-banyandb/pkg/index/metadata"
@@ -36,6 +37,10 @@ type store struct {
 	l            *logger.Logger
 }
 
+func (s *store) Stats() observability.Statistics {
+	return s.lsm.Stats()
+}
+
 func (s *store) Close() error {
 	return multierr.Combine(s.lsm.Close(), s.termMetadata.Close())
 }
diff --git a/pkg/index/posting/posting.go b/pkg/index/posting/posting.go
index 593e61c..6f810fd 100644
--- a/pkg/index/posting/posting.go
+++ b/pkg/index/posting/posting.go
@@ -64,6 +64,8 @@ type List interface {
 	Marshall() ([]byte, error)
 
 	Unmarshall(data []byte) error
+
+	SizeInBytes() int64
 }
 
 type Iterator interface {
diff --git a/pkg/index/posting/roaring/roaring.go b/pkg/index/posting/roaring/roaring.go
index d121f89..eaab6a3 100644
--- a/pkg/index/posting/roaring/roaring.go
+++ b/pkg/index/posting/roaring/roaring.go
@@ -187,6 +187,10 @@ func (p *postingsList) Reset() {
 	p.bitmap.Clear()
 }
 
+func (p *postingsList) SizeInBytes() int64 {
+	return int64(p.bitmap.GetSizeInBytes())
+}
+
 type roaringIterator struct {
 	iter    roaring64.IntIterable64
 	current common.ItemID
diff --git a/pkg/test/stream/traffic/searchable_template.json b/pkg/test/stream/traffic/searchable_template.json
new file mode 100644
index 0000000..f50a757
--- /dev/null
+++ b/pkg/test/stream/traffic/searchable_template.json
@@ -0,0 +1,12 @@
+{
+  "tags": [
+    {"str":{"value": "1"}},
+    {"int":{"value": 0}},
+    {"str":{"value": "webapp_id"}},
+    {"str":{"value": "10.0.0.1_id"}},
+    {"str":{"value": "/home_id"}},
+    {"int":{"value": 1000}},
+    {"int":{"value": 1622933202000000000}}
+  ]
+}
+  
\ No newline at end of file
diff --git a/pkg/test/stream/traffic/traffic_test.go b/pkg/test/stream/traffic/traffic_test.go
new file mode 100644
index 0000000..dc8b45a
--- /dev/null
+++ b/pkg/test/stream/traffic/traffic_test.go
@@ -0,0 +1,163 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+package traffic_test
+
+import (
+	"context"
+	"crypto/rand"
+	_ "embed"
+	"strconv"
+	"time"
+
+	"github.com/golang/protobuf/jsonpb"
+	"google.golang.org/protobuf/proto"
+	"google.golang.org/protobuf/types/known/timestamppb"
+
+	commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
+	modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
+	streamv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/stream/v1"
+	"github.com/apache/skywalking-banyandb/banyand/metadata"
+	"github.com/apache/skywalking-banyandb/banyand/stream"
+	"github.com/apache/skywalking-banyandb/pkg/logger"
+	"github.com/apache/skywalking-banyandb/pkg/run"
+	stream_test "github.com/apache/skywalking-banyandb/pkg/test/stream"
+)
+
+type Service interface {
+	run.PreRunner
+	run.Config
+	run.Service
+}
+
+var _ Service = (*service)(nil)
+
+// service to preload stream
+type service struct {
+	metaSvc   metadata.Service
+	streamSvc stream.Service
+	l         *logger.Logger
+	stopCh    chan struct{}
+}
+
+func NewService(ctx context.Context, metaSvc metadata.Service, streamSvc stream.Service) Service {
+	return &service{
+		metaSvc:   metaSvc,
+		streamSvc: streamSvc,
+		stopCh:    make(chan struct{}),
+	}
+}
+
+func (s *service) Name() string {
+	return "stream-traffic-gen"
+}
+
+func (s *service) FlagSet() *run.FlagSet {
+	flagS := run.NewFlagSet("stream-traffic-gen")
+	return flagS
+}
+
+func (*service) Validate() error {
+	return nil
+}
+
+func (s *service) PreRun() error {
+	s.l = logger.GetLogger(s.Name())
+	return stream_test.PreloadSchema(s.metaSvc.SchemaRegistry())
+}
+
+//go:embed searchable_template.json
+var content string
+
+func (s *service) Serve() run.StopNotify {
+	searchTagFamily := &modelv1.TagFamilyForWrite{}
+	err := jsonpb.UnmarshalString(content, searchTagFamily)
+	if err != nil {
+		s.l.Err(err).Msg("unmarshal template")
+		close(s.stopCh)
+		return s.stopCh
+	}
+	stream, err := s.streamSvc.Stream(&commonv1.Metadata{
+		Name:  "sw",
+		Group: "default",
+	})
+	if err != nil {
+		s.l.Err(err).Msg("get the stream")
+		close(s.stopCh)
+		return s.stopCh
+	}
+	for i := 0; i < 5; i++ {
+		svc := "svc-" + strconv.Itoa(i)
+		for j := 0; j < 10; j++ {
+			instance := "instance-" + strconv.Itoa(j)
+			go func() {
+				ticker := time.NewTicker(1 * time.Second)
+				defer ticker.Stop()
+				for {
+					select {
+					case <-ticker.C:
+						tf := proto.Clone(searchTagFamily).(*modelv1.TagFamilyForWrite)
+						tf.Tags[2] = &modelv1.TagValue{
+							Value: &modelv1.TagValue_Str{
+								Str: &modelv1.Str{
+									Value: svc,
+								},
+							},
+						}
+						tf.Tags[3] = &modelv1.TagValue{
+							Value: &modelv1.TagValue_Str{
+								Str: &modelv1.Str{
+									Value: instance,
+								},
+							},
+						}
+						data := make([]byte, 10*1024)
+						_, _ = rand.Read(data)
+						e := &streamv1.ElementValue{
+							ElementId: strconv.Itoa(i),
+							Timestamp: timestamppb.Now(),
+							TagFamilies: []*modelv1.TagFamilyForWrite{
+								{
+									Tags: []*modelv1.TagValue{
+										{
+											Value: &modelv1.TagValue_BinaryData{
+												BinaryData: data,
+											},
+										},
+									},
+								},
+							},
+						}
+						e.TagFamilies = append(e.TagFamilies, tf)
+						errInner := stream.Write(e)
+						if err != nil {
+							s.l.Err(errInner).Msg("writing to the stream")
+						}
+					case <-s.stopCh:
+						return
+					}
+				}
+			}()
+
+		}
+
+	}
+	return s.stopCh
+}
+
+func (s *service) GracefulStop() {
+}