You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by ha...@apache.org on 2022/04/01 01:41:28 UTC

[skywalking-banyandb] branch observability created (now 18bb574)

This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a change to branch observability
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git.


      at 18bb574  Improve the observability of banyandb: * Add the prometheus exporter * Export memory table gauges * Introduce a self running generator to generate stream data

This branch includes the following new commits:

     new 18bb574  Improve the observability of banyandb: * Add the prometheus exporter * Export memory table gauges * Introduce a self running generator to generate stream data

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


[skywalking-banyandb] 01/01: Improve the observability of banyandb: * Add the prometheus exporter * Export memory table gauges * Introduce a self running generator to generate stream data

Posted by ha...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch observability
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git

commit 18bb574205cab1fe4c0139e0c886e7452ae9243c
Author: Gao Hongtao <ha...@gmail.com>
AuthorDate: Fri Apr 1 01:33:23 2022 +0000

    Improve the observability of banyandb:
      * Add the prometheus exporter
      * Export memory table gauges
      * Introduce a self running generator to generate stream
        data
    
    Signed-off-by: Gao Hongtao <ha...@gmail.com>
---
 api/common/id.go                                   |  43 +++++-
 banyand/internal/cmd/standalone.go                 |   6 +-
 banyand/kv/badger.go                               |  17 +++
 banyand/kv/kv.go                                   |   3 +
 banyand/measure/metadata.go                        |   6 +-
 banyand/{prof/pprof.go => observability/metric.go} |  38 ++---
 banyand/{prof => observability}/pprof.go           |   7 +-
 api/common/id.go => banyand/observability/type.go  |  17 +--
 banyand/stream/metadata.go                         |   6 +-
 banyand/tsdb/block.go                              |  24 ++-
 banyand/tsdb/index/writer.go                       |   3 +
 banyand/tsdb/metric.go                             | 111 ++++++++++++++
 banyand/tsdb/scope.go                              |   9 +-
 banyand/tsdb/segment.go                            |  48 +++---
 banyand/tsdb/seriesdb.go                           |   6 +
 banyand/tsdb/shard.go                              |  50 +++++--
 go.mod                                             |  16 +-
 go.sum                                             |  30 ++--
 pkg/index/index.go                                 |   2 +
 pkg/index/inverted/field_map.go                    |  10 ++
 pkg/index/inverted/inverted.go                     |  14 ++
 pkg/index/inverted/mem.go                          |   5 +
 pkg/index/inverted/term_map.go                     |  10 ++
 pkg/index/lsm/lsm.go                               |   5 +
 pkg/index/posting/posting.go                       |   2 +
 pkg/index/posting/roaring/roaring.go               |   4 +
 pkg/test/stream/traffic/searchable_template.json   |  12 ++
 pkg/test/stream/traffic/traffic_test.go            | 163 +++++++++++++++++++++
 28 files changed, 574 insertions(+), 93 deletions(-)

diff --git a/api/common/id.go b/api/common/id.go
index 9964786..f6a55b4 100644
--- a/api/common/id.go
+++ b/api/common/id.go
@@ -17,7 +17,13 @@
 
 package common
 
-import "github.com/apache/skywalking-banyandb/pkg/convert"
+import (
+	"context"
+
+	"github.com/prometheus/client_golang/prometheus"
+
+	"github.com/apache/skywalking-banyandb/pkg/convert"
+)
 
 type SeriesID uint64
 type ShardID uint32
@@ -26,3 +32,38 @@ type ItemID uint64
 func (s SeriesID) Marshal() []byte {
 	return convert.Uint64ToBytes(uint64(s))
 }
+
+var PositionKey = contextPositionKey{}
+
+type contextPositionKey struct{}
+
+type Position struct {
+	Module   string
+	Database string
+	Shard    string
+	Segment  string
+	Block    string
+	KV       string
+}
+
+func (p Position) Labels() prometheus.Labels {
+	return prometheus.Labels{
+		"module":   p.Module,
+		"database": p.Database,
+		"shard":    p.Shard,
+		"seg":      p.Segment,
+		"block":    p.Block,
+		"kv":       p.KV,
+	}
+}
+
+func SetPosition(ctx context.Context, fn func(p Position) Position) context.Context {
+	val := ctx.Value(PositionKey)
+	var p Position
+	if val == nil {
+		p = Position{}
+	} else {
+		p = val.(Position)
+	}
+	return context.WithValue(ctx, PositionKey, fn(p))
+}
diff --git a/banyand/internal/cmd/standalone.go b/banyand/internal/cmd/standalone.go
index f9cde55..2919212 100644
--- a/banyand/internal/cmd/standalone.go
+++ b/banyand/internal/cmd/standalone.go
@@ -27,7 +27,7 @@ import (
 	"github.com/apache/skywalking-banyandb/banyand/liaison"
 	"github.com/apache/skywalking-banyandb/banyand/measure"
 	"github.com/apache/skywalking-banyandb/banyand/metadata"
-	"github.com/apache/skywalking-banyandb/banyand/prof"
+	"github.com/apache/skywalking-banyandb/banyand/observability"
 	"github.com/apache/skywalking-banyandb/banyand/query"
 	"github.com/apache/skywalking-banyandb/banyand/queue"
 	"github.com/apache/skywalking-banyandb/banyand/stream"
@@ -74,7 +74,8 @@ func newStandaloneCmd() *cobra.Command {
 	if err != nil {
 		l.Fatal().Err(err).Msg("failed to initiate Endpoint transport layer")
 	}
-	profSvc := prof.NewProfService()
+	profSvc := observability.NewProfService()
+	metricSvc := observability.NewMetricService()
 
 	// Meta the run Group units.
 	g.Register(
@@ -86,6 +87,7 @@ func newStandaloneCmd() *cobra.Command {
 		streamSvc,
 		q,
 		tcp,
+		metricSvc,
 		profSvc,
 	)
 	logging := logger.Logging{}
diff --git a/banyand/kv/badger.go b/banyand/kv/badger.go
index de6394b..4482721 100644
--- a/banyand/kv/badger.go
+++ b/banyand/kv/badger.go
@@ -27,6 +27,7 @@ import (
 	"github.com/dgraph-io/badger/v3/bydb"
 	"github.com/dgraph-io/badger/v3/y"
 
+	"github.com/apache/skywalking-banyandb/banyand/observability"
 	"github.com/apache/skywalking-banyandb/pkg/encoding"
 	"github.com/apache/skywalking-banyandb/pkg/logger"
 )
@@ -47,6 +48,18 @@ type badgerTSS struct {
 	badger.TSet
 }
 
+func (b *badgerTSS) Stats() (s observability.Statistics) {
+	return badgerStats(b.db)
+}
+
+func badgerStats(db *badger.DB) (s observability.Statistics) {
+	stat := db.Stats()
+	return observability.Statistics{
+		MemBytes:    stat.MemBytes,
+		MaxMemBytes: db.Opts().MemTableSize,
+	}
+}
+
 func (b *badgerTSS) Close() error {
 	if b.db != nil && !b.db.IsClosed() {
 		return b.db.Close()
@@ -110,6 +123,10 @@ type badgerDB struct {
 	db      *badger.DB
 }
 
+func (b *badgerDB) Stats() observability.Statistics {
+	return badgerStats(b.db)
+}
+
 func (b *badgerDB) Handover(iterator Iterator) error {
 	return b.db.HandoverIterator(&mergedIter{
 		delegated: iterator,
diff --git a/banyand/kv/kv.go b/banyand/kv/kv.go
index fdca551..103959f 100644
--- a/banyand/kv/kv.go
+++ b/banyand/kv/kv.go
@@ -25,6 +25,7 @@ import (
 	"github.com/dgraph-io/badger/v3"
 	"github.com/pkg/errors"
 
+	"github.com/apache/skywalking-banyandb/banyand/observability"
 	"github.com/apache/skywalking-banyandb/pkg/encoding"
 	"github.com/apache/skywalking-banyandb/pkg/logger"
 )
@@ -62,6 +63,7 @@ type Reader interface {
 
 // Store is a common kv storage with auto-generated key
 type Store interface {
+	observability.Observable
 	io.Closer
 	Writer
 	Reader
@@ -84,6 +86,7 @@ type TimeSeriesReader interface {
 
 // TimeSeriesStore is time series storage
 type TimeSeriesStore interface {
+	observability.Observable
 	io.Closer
 	TimeSeriesWriter
 	TimeSeriesReader
diff --git a/banyand/measure/metadata.go b/banyand/measure/metadata.go
index 2a2e96e..4858b30 100644
--- a/banyand/measure/metadata.go
+++ b/banyand/measure/metadata.go
@@ -21,6 +21,7 @@ import (
 	"context"
 	"time"
 
+	"github.com/apache/skywalking-banyandb/api/common"
 	"github.com/apache/skywalking-banyandb/api/event"
 	commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
 	databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
@@ -195,7 +196,10 @@ func (s *supplier) ResourceSchema(repo metadata.Repo, md *commonv1.Metadata) (re
 
 func (s *supplier) OpenDB(groupSchema *commonv1.Group) (tsdb.Database, error) {
 	return tsdb.OpenDatabase(
-		context.TODO(),
+		context.WithValue(context.Background(), common.PositionKey, common.Position{
+			Module:   "stream",
+			Database: groupSchema.Metadata.Name,
+		}),
 		tsdb.DatabaseOpts{
 			Location: s.path,
 			ShardNum: groupSchema.ResourceOpts.ShardNum,
diff --git a/banyand/prof/pprof.go b/banyand/observability/metric.go
similarity index 61%
copy from banyand/prof/pprof.go
copy to banyand/observability/metric.go
index ac27f62..29f353b 100644
--- a/banyand/prof/pprof.go
+++ b/banyand/observability/metric.go
@@ -14,59 +14,61 @@
 // KIND, either express or implied.  See the License for the
 // specific language governing permissions and limitations
 // under the License.
-
-package prof
+//
+package observability
 
 import (
 	"net/http"
-	// Register pprof package
-	_ "net/http/pprof"
+
+	"github.com/prometheus/client_golang/prometheus/promhttp"
 
 	"github.com/apache/skywalking-banyandb/pkg/logger"
 	"github.com/apache/skywalking-banyandb/pkg/run"
 )
 
 var (
-	_ run.Service = (*pprofService)(nil)
-	_ run.Config  = (*pprofService)(nil)
+	_ run.Service = (*metricService)(nil)
+	_ run.Config  = (*metricService)(nil)
 )
 
-func NewProfService() run.Service {
-	return &pprofService{
+func NewMetricService() run.Service {
+	return &metricService{
 		stopCh: make(chan struct{}),
 	}
 }
 
-type pprofService struct {
+type metricService struct {
 	listenAddr string
 	stopCh     chan struct{}
 	l          *logger.Logger
 }
 
-func (p *pprofService) FlagSet() *run.FlagSet {
-	flagSet := run.NewFlagSet("prof")
-	flagSet.StringVar(&p.listenAddr, "pprof-listener-addr", "127.0.0.1:6060", "listen addr for pprof")
+func (p *metricService) FlagSet() *run.FlagSet {
+	flagSet := run.NewFlagSet("observability")
+	flagSet.StringVar(&p.listenAddr, "observability-listener-addr", ":2121", "listen addr for observability")
 	return flagSet
 }
 
-func (p *pprofService) Validate() error {
+func (p *metricService) Validate() error {
 	return nil
 }
 
-func (p *pprofService) Name() string {
-	return "pprof-service"
+func (p *metricService) Name() string {
+	return "metric-service"
 }
 
-func (p *pprofService) Serve() run.StopNotify {
+func (p *metricService) Serve() run.StopNotify {
 	p.l = logger.GetLogger(p.Name())
+	http.Handle("/metrics", promhttp.Handler())
 	go func() {
-		p.l.Info().Str("listenAddr", p.listenAddr).Msg("Start pprof server")
+		p.l.Info().Str("listenAddr", p.listenAddr).Msg("Start metric server")
 		_ = http.ListenAndServe(p.listenAddr, nil)
+		p.stopCh <- struct{}{}
 	}()
 
 	return p.stopCh
 }
 
-func (p *pprofService) GracefulStop() {
+func (p *metricService) GracefulStop() {
 	close(p.stopCh)
 }
diff --git a/banyand/prof/pprof.go b/banyand/observability/pprof.go
similarity index 93%
rename from banyand/prof/pprof.go
rename to banyand/observability/pprof.go
index ac27f62..6e9bfe8 100644
--- a/banyand/prof/pprof.go
+++ b/banyand/observability/pprof.go
@@ -15,7 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
-package prof
+package observability
 
 import (
 	"net/http"
@@ -27,8 +27,8 @@ import (
 )
 
 var (
-	_ run.Service = (*pprofService)(nil)
-	_ run.Config  = (*pprofService)(nil)
+	_ run.Service = (*metricService)(nil)
+	_ run.Config  = (*metricService)(nil)
 )
 
 func NewProfService() run.Service {
@@ -62,6 +62,7 @@ func (p *pprofService) Serve() run.StopNotify {
 	go func() {
 		p.l.Info().Str("listenAddr", p.listenAddr).Msg("Start pprof server")
 		_ = http.ListenAndServe(p.listenAddr, nil)
+		p.stopCh <- struct{}{}
 	}()
 
 	return p.stopCh
diff --git a/api/common/id.go b/banyand/observability/type.go
similarity index 79%
copy from api/common/id.go
copy to banyand/observability/type.go
index 9964786..1b26330 100644
--- a/api/common/id.go
+++ b/banyand/observability/type.go
@@ -14,15 +14,14 @@
 // KIND, either express or implied.  See the License for the
 // specific language governing permissions and limitations
 // under the License.
+//
+package observability
 
-package common
-
-import "github.com/apache/skywalking-banyandb/pkg/convert"
-
-type SeriesID uint64
-type ShardID uint32
-type ItemID uint64
+type Statistics struct {
+	MemBytes    int64
+	MaxMemBytes int64
+}
 
-func (s SeriesID) Marshal() []byte {
-	return convert.Uint64ToBytes(uint64(s))
+type Observable interface {
+	Stats() Statistics
 }
diff --git a/banyand/stream/metadata.go b/banyand/stream/metadata.go
index 97972ba..f1b8f79 100644
--- a/banyand/stream/metadata.go
+++ b/banyand/stream/metadata.go
@@ -21,6 +21,7 @@ import (
 	"context"
 	"time"
 
+	"github.com/apache/skywalking-banyandb/api/common"
 	"github.com/apache/skywalking-banyandb/api/event"
 	commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
 	databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
@@ -195,7 +196,10 @@ func (s *supplier) ResourceSchema(repo metadata.Repo, md *commonv1.Metadata) (re
 
 func (s *supplier) OpenDB(groupSchema *commonv1.Group) (tsdb.Database, error) {
 	return tsdb.OpenDatabase(
-		context.TODO(),
+		context.WithValue(context.Background(), common.PositionKey, common.Position{
+			Module:   "stream",
+			Database: groupSchema.Metadata.Name,
+		}),
 		tsdb.DatabaseOpts{
 			Location: s.path,
 			ShardNum: groupSchema.ResourceOpts.ShardNum,
diff --git a/banyand/tsdb/block.go b/banyand/tsdb/block.go
index f93b446..cafe15c 100644
--- a/banyand/tsdb/block.go
+++ b/banyand/tsdb/block.go
@@ -29,6 +29,7 @@ import (
 
 	"github.com/apache/skywalking-banyandb/api/common"
 	"github.com/apache/skywalking-banyandb/banyand/kv"
+	"github.com/apache/skywalking-banyandb/banyand/observability"
 	"github.com/apache/skywalking-banyandb/banyand/tsdb/bucket"
 	"github.com/apache/skywalking-banyandb/pkg/encoding"
 	"github.com/apache/skywalking-banyandb/pkg/index"
@@ -39,12 +40,13 @@ import (
 )
 
 type block struct {
-	path   string
-	l      *logger.Logger
-	suffix string
-	ref    *z.Closer
-	lock   sync.RWMutex
-	closed *atomic.Bool
+	path     string
+	l        *logger.Logger
+	suffix   string
+	ref      *z.Closer
+	lock     sync.RWMutex
+	closed   *atomic.Bool
+	position common.Position
 
 	store         kv.TimeSeriesStore
 	primaryIndex  index.Store
@@ -90,6 +92,10 @@ func newBlock(ctx context.Context, opts blockOpts) (b *block, err error) {
 		closed:         atomic.NewBool(true),
 		encodingMethod: encodingMethodObject.(EncodingMethod),
 	}
+	position := ctx.Value(common.PositionKey)
+	if position != nil {
+		b.position = position.(common.Position)
+	}
 	return b, err
 }
 
@@ -172,6 +178,12 @@ func (b *block) String() string {
 	return b.Reporter.String()
 }
 
+func (b *block) stats() (names []string, stats []observability.Statistics) {
+	names = append(names, "main", "p-idx", "si-idx", "sl-idx")
+	stats = append(stats, b.store.Stats(), b.primaryIndex.Stats(), b.invertedIndex.Stats(), b.lsmIndex.Stats())
+	return names, stats
+}
+
 type blockDelegate interface {
 	io.Closer
 	contains(ts time.Time) bool
diff --git a/banyand/tsdb/index/writer.go b/banyand/tsdb/index/writer.go
index 424155f..bb459a0 100644
--- a/banyand/tsdb/index/writer.go
+++ b/banyand/tsdb/index/writer.go
@@ -198,6 +198,9 @@ func getIndexValue(ruleIndex *partition.IndexRuleLocator, value Value) (val []by
 	var existInt bool
 	for _, tIndex := range ruleIndex.TagIndices {
 		tag, err := partition.GetTagByOffset(value.TagFamilies, tIndex.FamilyOffset, tIndex.TagOffset)
+		if errors.Is(err, partition.ErrMalformedElement) {
+			continue
+		}
 		if err != nil {
 			return nil, false, errors.WithMessagef(err, "index rule:%v", ruleIndex.Rule.Metadata)
 		}
diff --git a/banyand/tsdb/metric.go b/banyand/tsdb/metric.go
new file mode 100644
index 0000000..9c4005b
--- /dev/null
+++ b/banyand/tsdb/metric.go
@@ -0,0 +1,111 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+package tsdb
+
+import (
+	"time"
+
+	"github.com/prometheus/client_golang/prometheus"
+	"github.com/prometheus/client_golang/prometheus/promauto"
+
+	"github.com/apache/skywalking-banyandb/banyand/observability"
+)
+
+const statInterval = 5 * time.Second
+
+var (
+	mtBytes    *prometheus.GaugeVec
+	maxMtBytes *prometheus.GaugeVec
+)
+
+func init() {
+	labels := []string{"module", "database", "shard", "component"}
+	mtBytes = promauto.NewGaugeVec(
+		prometheus.GaugeOpts{
+			Name: "banyand_memtables_bytes",
+			Help: "Memory table size in bytes",
+		},
+		labels,
+	)
+	maxMtBytes = promauto.NewGaugeVec(
+		prometheus.GaugeOpts{
+			Name: "banyand_memtables_max_bytes",
+			Help: "Maximum amount of memory table available in bytes",
+		},
+		labels,
+	)
+}
+
+func (s *shard) runStat() {
+	go func() {
+		ticker := time.NewTicker(statInterval)
+		defer ticker.Stop()
+		for {
+			select {
+			case <-ticker.C:
+				s.stat()
+			case <-s.stopCh:
+				return
+			}
+		}
+	}()
+}
+
+func (s *shard) stat() {
+	seriesStat := s.seriesDatabase.Stats()
+	s.curry(mtBytes).WithLabelValues("series").Set(float64(seriesStat.MemBytes))
+	s.curry(maxMtBytes).WithLabelValues("series").Set(float64(seriesStat.MaxMemBytes))
+	segStats := observability.Statistics{}
+	blockStats := observability.Statistics{}
+	localIndexStats := observability.Statistics{}
+	for _, seg := range s.segmentController.segments() {
+		segStat := seg.Stats()
+		segStats.MaxMemBytes += segStat.MaxMemBytes
+		segStats.MemBytes += segStat.MemBytes
+		for _, b := range seg.blockController.blocks() {
+			if b.closed.Load() {
+				continue
+			}
+			names, bss := b.stats()
+			for i, bs := range bss {
+				if names[i] == "main" {
+					blockStats.MaxMemBytes += bs.MaxMemBytes
+					blockStats.MemBytes += bs.MemBytes
+					continue
+				}
+				localIndexStats.MaxMemBytes += bs.MaxMemBytes
+				localIndexStats.MemBytes += bs.MemBytes
+			}
+
+		}
+	}
+	s.curry(mtBytes).WithLabelValues("global-index").Set(float64(segStats.MemBytes))
+	s.curry(maxMtBytes).WithLabelValues("global-index").Set(float64(segStats.MaxMemBytes))
+	s.curry(mtBytes).WithLabelValues("block").Set(float64(blockStats.MemBytes))
+	s.curry(maxMtBytes).WithLabelValues("block").Set(float64(blockStats.MaxMemBytes))
+	s.curry(mtBytes).WithLabelValues("local-index").Set(float64(localIndexStats.MemBytes))
+	s.curry(maxMtBytes).WithLabelValues("local-index").Set(float64(localIndexStats.MaxMemBytes))
+}
+
+func (s *shard) curry(gv *prometheus.GaugeVec) *prometheus.GaugeVec {
+	return gv.MustCurryWith(prometheus.Labels{
+		"module":   s.position.Module,
+		"database": s.position.Database,
+		"shard":    s.position.Shard,
+	})
+}
diff --git a/banyand/tsdb/scope.go b/banyand/tsdb/scope.go
index cbbe687..e2d0ff2 100644
--- a/banyand/tsdb/scope.go
+++ b/banyand/tsdb/scope.go
@@ -17,7 +17,10 @@
 //
 package tsdb
 
-import "github.com/apache/skywalking-banyandb/api/common"
+import (
+	"github.com/apache/skywalking-banyandb/api/common"
+	"github.com/apache/skywalking-banyandb/banyand/observability"
+)
 
 var _ Shard = (*ScopedShard)(nil)
 
@@ -64,6 +67,10 @@ type scopedSeriesDatabase struct {
 	delegated SeriesDatabase
 }
 
+func (sdd *scopedSeriesDatabase) Stats() observability.Statistics {
+	return sdd.delegated.Stats()
+}
+
 func (sdd *scopedSeriesDatabase) Close() error {
 	return nil
 }
diff --git a/banyand/tsdb/segment.go b/banyand/tsdb/segment.go
index 2db1f91..bf50669 100644
--- a/banyand/tsdb/segment.go
+++ b/banyand/tsdb/segment.go
@@ -24,7 +24,9 @@ import (
 	"sync"
 	"time"
 
+	"github.com/apache/skywalking-banyandb/api/common"
 	"github.com/apache/skywalking-banyandb/banyand/kv"
+	"github.com/apache/skywalking-banyandb/banyand/observability"
 	"github.com/apache/skywalking-banyandb/banyand/tsdb/bucket"
 	"github.com/apache/skywalking-banyandb/pkg/logger"
 	"github.com/apache/skywalking-banyandb/pkg/timestamp"
@@ -54,16 +56,17 @@ func openSegment(ctx context.Context, startTime time.Time, path, suffix string,
 	id := uint16(segmentSize.Unit)<<12 | ((uint16(suffixInteger) << 4) >> 4)
 	timeRange := timestamp.NewTimeRange(startTime, segmentSize.NextTime(startTime), true, false)
 	l := logger.Fetch(ctx, "segment")
+	segCtx := context.WithValue(ctx, logger.ContextKey, l)
 	s = &segment{
 		id:              id,
 		path:            path,
 		suffix:          suffix,
 		l:               l,
-		blockController: newBlockController(id, path, timeRange, blockSize, l, blockQueue),
+		blockController: newBlockController(segCtx, id, path, timeRange, blockSize, l, blockQueue),
 		TimeRange:       timeRange,
 		Reporter:        bucket.NewTimeBasedReporter(timeRange),
 	}
-	err = s.blockController.open(context.WithValue(ctx, logger.ContextKey, s.l))
+	err = s.blockController.open()
 	if err != nil {
 		return nil, err
 	}
@@ -98,8 +101,13 @@ func (s segment) String() string {
 	return s.Reporter.String()
 }
 
+func (s *segment) Stats() observability.Statistics {
+	return s.globalIndex.Stats()
+}
+
 type blockController struct {
 	sync.RWMutex
+	segCtx       context.Context
 	segID        uint16
 	location     string
 	segTimeRange timestamp.TimeRange
@@ -110,9 +118,10 @@ type blockController struct {
 	l *logger.Logger
 }
 
-func newBlockController(segID uint16, location string, segTimeRange timestamp.TimeRange,
+func newBlockController(segCtx context.Context, segID uint16, location string, segTimeRange timestamp.TimeRange,
 	blockSize IntervalRule, l *logger.Logger, blockQueue bucket.Queue) *blockController {
 	return &blockController{
+		segCtx:       segCtx,
 		segID:        segID,
 		location:     location,
 		blockSize:    blockSize,
@@ -140,7 +149,7 @@ func (bc *blockController) Current() bucket.Reporter {
 
 func (bc *blockController) Next() (bucket.Reporter, error) {
 	b := bc.Current().(*block)
-	reporter, err := bc.create(context.TODO(),
+	reporter, err := bc.create(
 		bc.blockSize.NextTime(b.Start))
 	if errors.Is(err, ErrEndOfSegment) {
 		return nil, bucket.ErrNoMoreBucket
@@ -282,19 +291,19 @@ func (bc *blockController) startTime(suffix string) (time.Time, error) {
 	panic("invalid interval unit")
 }
 
-func (bc *blockController) open(ctx context.Context) error {
+func (bc *blockController) open() error {
 	err := WalkDir(
 		bc.location,
 		segPathPrefix,
 		func(suffix, absolutePath string) error {
-			_, err := bc.load(ctx, suffix, absolutePath)
+			_, err := bc.load(suffix, absolutePath)
 			return err
 		})
 	if err != nil {
 		return err
 	}
 	if bc.Current() == nil {
-		b, err := bc.create(ctx, time.Now())
+		b, err := bc.create(time.Now())
 		if err != nil {
 			return err
 		}
@@ -305,7 +314,7 @@ func (bc *blockController) open(ctx context.Context) error {
 	return nil
 }
 
-func (bc *blockController) create(ctx context.Context, startTime time.Time) (*block, error) {
+func (bc *blockController) create(startTime time.Time) (*block, error) {
 	if startTime.Before(bc.segTimeRange.Start) {
 		startTime = bc.segTimeRange.Start
 	}
@@ -317,21 +326,26 @@ func (bc *blockController) create(ctx context.Context, startTime time.Time) (*bl
 	if err != nil {
 		return nil, err
 	}
-	return bc.load(ctx, suffix, segPath)
+	return bc.load(suffix, segPath)
 }
 
-func (bc *blockController) load(ctx context.Context, suffix, path string) (b *block, err error) {
+func (bc *blockController) load(suffix, path string) (b *block, err error) {
 	starTime, err := bc.startTime(suffix)
 	if err != nil {
 		return nil, err
 	}
-	if b, err = newBlock(ctx, blockOpts{
-		segID:     bc.segID,
-		path:      path,
-		startTime: starTime,
-		suffix:    suffix,
-		blockSize: bc.blockSize,
-	}); err != nil {
+	if b, err = newBlock(
+		common.SetPosition(bc.segCtx, func(p common.Position) common.Position {
+			p.Block = suffix
+			return p
+		}),
+		blockOpts{
+			segID:     bc.segID,
+			path:      path,
+			startTime: starTime,
+			suffix:    suffix,
+			blockSize: bc.blockSize,
+		}); err != nil {
 		return nil, err
 	}
 	bc.Lock()
diff --git a/banyand/tsdb/seriesdb.go b/banyand/tsdb/seriesdb.go
index ed62632..4afb96f 100644
--- a/banyand/tsdb/seriesdb.go
+++ b/banyand/tsdb/seriesdb.go
@@ -28,6 +28,7 @@ import (
 
 	"github.com/apache/skywalking-banyandb/api/common"
 	"github.com/apache/skywalking-banyandb/banyand/kv"
+	"github.com/apache/skywalking-banyandb/banyand/observability"
 	"github.com/apache/skywalking-banyandb/pkg/convert"
 	"github.com/apache/skywalking-banyandb/pkg/logger"
 	"github.com/apache/skywalking-banyandb/pkg/timestamp"
@@ -119,6 +120,7 @@ func (p Path) Prepand(entry Entry) Path {
 }
 
 type SeriesDatabase interface {
+	observability.Observable
 	io.Closer
 	GetByID(id common.SeriesID) (Series, error)
 	Get(entity Entity) (Series, error)
@@ -250,6 +252,10 @@ func (s *seriesDB) context() context.Context {
 	return context.WithValue(context.Background(), logger.ContextKey, s.l)
 }
 
+func (s *seriesDB) Stats() observability.Statistics {
+	return s.seriesMetadata.Stats()
+}
+
 func (s *seriesDB) Close() error {
 	return s.seriesMetadata.Close()
 }
diff --git a/banyand/tsdb/shard.go b/banyand/tsdb/shard.go
index cd4120c..3299e57 100644
--- a/banyand/tsdb/shard.go
+++ b/banyand/tsdb/shard.go
@@ -36,13 +36,15 @@ const defaultBlockQueueSize = 1 << 4
 var _ Shard = (*shard)(nil)
 
 type shard struct {
-	l  *logger.Logger
-	id common.ShardID
+	l        *logger.Logger
+	id       common.ShardID
+	position common.Position
 
 	seriesDatabase        SeriesDatabase
 	indexDatabase         IndexDatabase
 	segmentController     *segmentController
 	segmentManageStrategy *bucket.Strategy
+	stopCh                chan struct{}
 }
 
 func OpenShard(ctx context.Context, id common.ShardID,
@@ -56,7 +58,12 @@ func OpenShard(ctx context.Context, id common.ShardID,
 	if openedBlockSize < 1 {
 		openedBlockSize = defaultBlockQueueSize
 	}
-	sc, err := newSegmentController(path, segmentSize, blockSize, openedBlockSize, l)
+	shardCtx := context.WithValue(ctx, logger.ContextKey, l)
+	shardCtx = common.SetPosition(shardCtx, func(p common.Position) common.Position {
+		p.Shard = strconv.Itoa(int(id))
+		return p
+	})
+	sc, err := newSegmentController(shardCtx, path, segmentSize, blockSize, openedBlockSize, l)
 	if err != nil {
 		return nil, errors.Wrapf(err, "create the segment controller of the shard %d", int(id))
 	}
@@ -64,9 +71,9 @@ func OpenShard(ctx context.Context, id common.ShardID,
 		id:                id,
 		segmentController: sc,
 		l:                 l,
+		stopCh:            make(chan struct{}),
 	}
-	shardCtx := context.WithValue(ctx, logger.ContextKey, s.l)
-	err = s.segmentController.open(shardCtx)
+	err = s.segmentController.open()
 	if err != nil {
 		return nil, err
 	}
@@ -89,6 +96,11 @@ func OpenShard(ctx context.Context, id common.ShardID,
 		return nil, err
 	}
 	s.segmentManageStrategy.Run()
+	position := shardCtx.Value(common.PositionKey)
+	if position != nil {
+		s.position = position.(common.Position)
+	}
+	s.runStat()
 	return s, nil
 }
 
@@ -123,7 +135,9 @@ func (s *shard) State() (shardState ShardState) {
 func (s *shard) Close() error {
 	s.segmentManageStrategy.Close()
 	s.segmentController.close()
-	return s.seriesDatabase.Close()
+	err := s.seriesDatabase.Close()
+	close(s.stopCh)
+	return err
 }
 
 type IntervalUnit int
@@ -178,6 +192,7 @@ func (ir IntervalRule) EstimatedDuration() time.Duration {
 
 type segmentController struct {
 	sync.RWMutex
+	shardCtx    context.Context
 	location    string
 	segmentSize IntervalRule
 	blockSize   IntervalRule
@@ -187,8 +202,10 @@ type segmentController struct {
 	l *logger.Logger
 }
 
-func newSegmentController(location string, segmentSize, blockSize IntervalRule, openedBlockSize int, l *logger.Logger) (*segmentController, error) {
+func newSegmentController(shardCtx context.Context, location string,
+	segmentSize, blockSize IntervalRule, openedBlockSize int, l *logger.Logger) (*segmentController, error) {
 	sc := &segmentController{
+		shardCtx:    shardCtx,
 		location:    location,
 		segmentSize: segmentSize,
 		blockSize:   blockSize,
@@ -264,7 +281,7 @@ func (sc *segmentController) Current() bucket.Reporter {
 
 func (sc *segmentController) Next() (bucket.Reporter, error) {
 	seg := sc.Current().(*segment)
-	reporter, err := sc.create(context.TODO(), sc.Format(
+	reporter, err := sc.create(sc.Format(
 		sc.segmentSize.NextTime(seg.Start)))
 	if errors.Is(err, ErrEndOfSegment) {
 		return nil, bucket.ErrNoMoreBucket
@@ -307,12 +324,12 @@ func (sc *segmentController) Parse(value string) (time.Time, error) {
 	panic("invalid interval unit")
 }
 
-func (sc *segmentController) open(ctx context.Context) error {
+func (sc *segmentController) open() error {
 	err := WalkDir(
 		sc.location,
 		segPathPrefix,
 		func(suffix, absolutePath string) error {
-			_, err := sc.load(ctx, suffix, absolutePath)
+			_, err := sc.load(suffix, absolutePath)
 			if errors.Is(err, ErrEndOfSegment) {
 				return nil
 			}
@@ -322,7 +339,7 @@ func (sc *segmentController) open(ctx context.Context) error {
 		return err
 	}
 	if sc.Current() == nil {
-		_, err = sc.create(ctx, sc.Format(time.Now()))
+		_, err = sc.create(sc.Format(time.Now()))
 		if err != nil {
 			return err
 		}
@@ -330,20 +347,23 @@ func (sc *segmentController) open(ctx context.Context) error {
 	return nil
 }
 
-func (sc *segmentController) create(ctx context.Context, suffix string) (*segment, error) {
+func (sc *segmentController) create(suffix string) (*segment, error) {
 	segPath, err := mkdir(segTemplate, sc.location, suffix)
 	if err != nil {
 		return nil, err
 	}
-	return sc.load(ctx, suffix, segPath)
+	return sc.load(suffix, segPath)
 }
 
-func (sc *segmentController) load(ctx context.Context, suffix, path string) (seg *segment, err error) {
+func (sc *segmentController) load(suffix, path string) (seg *segment, err error) {
 	startTime, err := sc.Parse(suffix)
 	if err != nil {
 		return nil, err
 	}
-	seg, err = openSegment(ctx, startTime, path, suffix, sc.segmentSize, sc.blockSize, sc.blockQueue)
+	seg, err = openSegment(common.SetPosition(sc.shardCtx, func(p common.Position) common.Position {
+		p.Segment = suffix
+		return p
+	}), startTime, path, suffix, sc.segmentSize, sc.blockSize, sc.blockQueue)
 	if err != nil {
 		return nil, err
 	}
diff --git a/go.mod b/go.mod
index a445dc6..e8db453 100644
--- a/go.mod
+++ b/go.mod
@@ -25,7 +25,7 @@ require (
 	go.etcd.io/etcd/server/v3 v3.5.0
 	go.uber.org/multierr v1.7.0
 	golang.org/x/net v0.0.0-20210716203947-853a461950ff // indirect
-	golang.org/x/sys v0.0.0-20210630005230-0f9fa26af87c // indirect
+	golang.org/x/sys v0.0.0-20220114195835-da31bd327af9 // indirect
 	google.golang.org/genproto v0.0.0-20210722135532-667f2b7c528f // indirect
 	google.golang.org/grpc v1.39.0
 	google.golang.org/protobuf v1.27.1
@@ -34,7 +34,7 @@ require (
 require (
 	github.com/beorn7/perks v1.0.1 // indirect
 	github.com/bits-and-blooms/bitset v1.2.0 // indirect
-	github.com/cespare/xxhash/v2 v2.1.1 // indirect
+	github.com/cespare/xxhash/v2 v2.1.2 // indirect
 	github.com/coreos/go-semver v0.3.0 // indirect
 	github.com/coreos/go-systemd/v22 v22.3.2 // indirect
 	github.com/davecgh/go-spew v1.1.1 // indirect
@@ -55,19 +55,19 @@ require (
 	github.com/hashicorp/hcl v1.0.0 // indirect
 	github.com/inconshreveable/mousetrap v1.0.0 // indirect
 	github.com/jonboulle/clockwork v0.2.2 // indirect
-	github.com/json-iterator/go v1.1.11 // indirect
+	github.com/json-iterator/go v1.1.12 // indirect
 	github.com/magiconair/properties v1.8.5 // indirect
 	github.com/matttproud/golang_protobuf_extensions v1.0.1 // indirect
 	github.com/mitchellh/mapstructure v1.4.1 // indirect
 	github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
-	github.com/modern-go/reflect2 v1.0.1 // indirect
+	github.com/modern-go/reflect2 v1.0.2 // indirect
 	github.com/mschoch/smat v0.2.0 // indirect
 	github.com/pelletier/go-toml v1.9.3 // indirect
 	github.com/pmezard/go-difflib v1.0.0 // indirect
-	github.com/prometheus/client_golang v1.11.0 // indirect
+	github.com/prometheus/client_golang v1.12.1
 	github.com/prometheus/client_model v0.2.0 // indirect
-	github.com/prometheus/common v0.26.0 // indirect
-	github.com/prometheus/procfs v0.6.0 // indirect
+	github.com/prometheus/common v0.32.1 // indirect
+	github.com/prometheus/procfs v0.7.3 // indirect
 	github.com/sirupsen/logrus v1.7.0 // indirect
 	github.com/soheilhy/cmux v0.1.5 // indirect
 	github.com/spf13/afero v1.6.0 // indirect
@@ -107,4 +107,4 @@ require (
 	sigs.k8s.io/yaml v1.2.0 // indirect
 )
 
-replace github.com/dgraph-io/badger/v3 v3.2011.1 => github.com/SkyAPM/badger/v3 v3.0.0-20220117144524-89781ee8a386
+replace github.com/dgraph-io/badger/v3 v3.2011.1 => github.com/SkyAPM/badger/v3 v3.0.0-20220331082938-b907904d6089
diff --git a/go.sum b/go.sum
index ee831c2..4189cd5 100644
--- a/go.sum
+++ b/go.sum
@@ -47,8 +47,8 @@ github.com/RoaringBitmap/gocroaring v0.4.0/go.mod h1:NieMwz7ZqwU2DD73/vvYwv7r4eW
 github.com/RoaringBitmap/real-roaring-datasets v0.0.0-20190726190000-eb7c87156f76/go.mod h1:oM0MHmQ3nDsq609SS36p+oYbRi16+oVvU2Bw4Ipv0SE=
 github.com/RoaringBitmap/roaring v0.9.1 h1:5PRizBmoN/PfV17nPNQou4dHQ7NcJi8FO/bihdYyCEM=
 github.com/RoaringBitmap/roaring v0.9.1/go.mod h1:h1B7iIUOmnAeb5ytYMvnHJwxMc6LUrwBnzXWRuqTQUc=
-github.com/SkyAPM/badger/v3 v3.0.0-20220117144524-89781ee8a386 h1:2VmCNyGlF/yY+Ev9bxCiPcKGaWSEQDiRCRiCJYWPh7o=
-github.com/SkyAPM/badger/v3 v3.0.0-20220117144524-89781ee8a386/go.mod h1:RHo4/GmYcKKh5Lxu63wLEMHJ70Pac2JqZRYGhlyAo2M=
+github.com/SkyAPM/badger/v3 v3.0.0-20220331082938-b907904d6089 h1:MEqSQssrKviX+uU4xfAcIvvpFeTamsT3cPc9k4Ol4Ks=
+github.com/SkyAPM/badger/v3 v3.0.0-20220331082938-b907904d6089/go.mod h1:Q0luV7nB94o3Bl4hYqAPy03+QTtLxs9pWdUEQb0i0K0=
 github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
 github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
 github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0=
@@ -76,8 +76,9 @@ github.com/certifi/gocertifi v0.0.0-20200922220541-2c3bb06c6054 h1:uH66TXeswKn5P
 github.com/certifi/gocertifi v0.0.0-20200922220541-2c3bb06c6054/go.mod h1:sGbDF6GwGcLpkNXPUTkMRoywsNa/ol15pxFe6ERfguA=
 github.com/cespare/xxhash v1.1.0 h1:a6HrQnmkObjyL+Gs60czilIUGqrzKutQD6XZog3p+ko=
 github.com/cespare/xxhash v1.1.0/go.mod h1:XrSqR1VqqWfGrhpAt58auRo0WTKS1nRRg3ghfAqPWnc=
-github.com/cespare/xxhash/v2 v2.1.1 h1:6MnRN8NT7+YBpUIWxHtefFZOKTAPgGjpQSxqLNn0+qY=
 github.com/cespare/xxhash/v2 v2.1.1/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
+github.com/cespare/xxhash/v2 v2.1.2 h1:YRXhKfTDauu4ajMg1TPgFO5jnlC2HCbmLXMcTG5cbYE=
+github.com/cespare/xxhash/v2 v2.1.2/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
 github.com/chzyer/logex v1.1.10/go.mod h1:+Ywpsq7O8HXn0nuIou7OrIPyXbp3wmkHB+jjWRnGsAI=
 github.com/chzyer/readline v0.0.0-20180603132655-2972be24d48e/go.mod h1:nSuG5e5PlCu98SY8svDHJxuZscDgtXS6KTTbou5AhLI=
 github.com/chzyer/test v0.0.0-20180213035817-a1ea475d72b1/go.mod h1:Q3SI9o4m/ZMnBNeIyt5eFwwo7qiLfzFZmjNmxjkiQlU=
@@ -277,8 +278,9 @@ github.com/jonboulle/clockwork v0.2.2/go.mod h1:Pkfl5aHPm1nk2H9h0bjmnJD/BcgbGXUB
 github.com/jpillora/backoff v1.0.0/go.mod h1:J/6gKK9jxlEcS3zixgDgUAsiuZ7yrSoa/FX5e0EB2j4=
 github.com/json-iterator/go v1.1.6/go.mod h1:+SdeFBvtyEkXs7REEP0seUULqWtbJapLOCVDaaPEHmU=
 github.com/json-iterator/go v1.1.10/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4=
-github.com/json-iterator/go v1.1.11 h1:uVUAXhF2To8cbw/3xN3pxj6kk7TYKs98NIrTqPlMWAQ=
 github.com/json-iterator/go v1.1.11/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4=
+github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM=
+github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo=
 github.com/jstemmer/go-junit-report v0.0.0-20190106144839-af01ea7f8024/go.mod h1:6v2b51hI/fHJwM22ozAgKL4VKDeJcHhJFhtBdhmNjmU=
 github.com/jstemmer/go-junit-report v0.9.1/go.mod h1:Brl9GWCQeLvo8nXZwPNNblvFj/XSXhF0NWZEnDohbsk=
 github.com/jtolds/gls v4.20.0+incompatible h1:xdiiI2gbIgH/gLH7ADydsJ1uDOEzR8yvV7C0MuV77Wo=
@@ -325,8 +327,9 @@ github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJ
 github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd h1:TRLaZ9cD/w8PVh93nsPXa1VrQ6jlwL5oN8l14QlcNfg=
 github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
 github.com/modern-go/reflect2 v0.0.0-20180701023420-4b7aa43c6742/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0=
-github.com/modern-go/reflect2 v1.0.1 h1:9f412s+6RmYXLWZSEzVVgPGK7C2PphHj5RJrvfx9AWI=
 github.com/modern-go/reflect2 v1.0.1/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0=
+github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9Gz0M=
+github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk=
 github.com/mschoch/smat v0.2.0 h1:8imxQsjDm8yFEAVBe7azKmKSgzSkZXDuKkSq9374khM=
 github.com/mschoch/smat v0.2.0/go.mod h1:kc9mz7DoBKqDyiRL7VZN8KvXQMWeTaVnttLRXOlotKw=
 github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U=
@@ -364,8 +367,9 @@ github.com/prometheus/client_golang v0.9.1/go.mod h1:7SWBe2y4D6OKWSNQJUaRYU/AaXP
 github.com/prometheus/client_golang v0.9.3/go.mod h1:/TN21ttK/J9q6uSwhBd54HahCDft0ttaMvbicHlPoso=
 github.com/prometheus/client_golang v1.0.0/go.mod h1:db9x61etRT2tGnBNRi70OPL5FsnadC4Ky3P0J6CfImo=
 github.com/prometheus/client_golang v1.7.1/go.mod h1:PY5Wy2awLA44sXw4AOSfFBetzPP4j5+D6mVACh+pe2M=
-github.com/prometheus/client_golang v1.11.0 h1:HNkLOAEQMIDv/K+04rukrLx6ch7msSRwf3/SASFAGtQ=
 github.com/prometheus/client_golang v1.11.0/go.mod h1:Z6t4BnS23TR94PD6BsDNk8yVqroYurpAkEiz0P2BEV0=
+github.com/prometheus/client_golang v1.12.1 h1:ZiaPsmm9uiBeaSMRznKsCDNtPCS0T3JVDGF+06gjBzk=
+github.com/prometheus/client_golang v1.12.1/go.mod h1:3Z9XVyYiZYEO+YQWt3RD2R3jrbd179Rt297l4aS6nDY=
 github.com/prometheus/client_model v0.0.0-20180712105110-5c3871d89910/go.mod h1:MbSGuTsp3dbXC40dX6PRTWyKYBIrTGTE9sqQNg2J8bo=
 github.com/prometheus/client_model v0.0.0-20190129233127-fd36f4220a90/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA=
 github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA=
@@ -375,14 +379,16 @@ github.com/prometheus/common v0.0.0-20181113130724-41aa239b4cce/go.mod h1:daVV7q
 github.com/prometheus/common v0.4.0/go.mod h1:TNfzLD0ON7rHzMJeJkieUDPYmFC7Snx/y86RQel1bk4=
 github.com/prometheus/common v0.4.1/go.mod h1:TNfzLD0ON7rHzMJeJkieUDPYmFC7Snx/y86RQel1bk4=
 github.com/prometheus/common v0.10.0/go.mod h1:Tlit/dnDKsSWFlCLTWaA1cyBgKHSMdTB80sz/V91rCo=
-github.com/prometheus/common v0.26.0 h1:iMAkS2TDoNWnKM+Kopnx/8tnEStIfpYA0ur0xQzzhMQ=
 github.com/prometheus/common v0.26.0/go.mod h1:M7rCNAaPfAosfx8veZJCuw84e35h3Cfd9VFqTh1DIvc=
+github.com/prometheus/common v0.32.1 h1:hWIdL3N2HoUx3B8j3YN9mWor0qhY/NlEKZEaXxuIRh4=
+github.com/prometheus/common v0.32.1/go.mod h1:vu+V0TpY+O6vW9J44gczi3Ap/oXXR10b+M/gUGO4Hls=
 github.com/prometheus/procfs v0.0.0-20181005140218-185b4288413d/go.mod h1:c3At6R/oaqEKCNdg8wHV1ftS6bRYblBhIjjI8uT2IGk=
 github.com/prometheus/procfs v0.0.0-20190507164030-5867b95ac084/go.mod h1:TjEm7ze935MbeOT/UhFTIMYKhuLP4wbCsTZCD3I8kEA=
 github.com/prometheus/procfs v0.0.2/go.mod h1:TjEm7ze935MbeOT/UhFTIMYKhuLP4wbCsTZCD3I8kEA=
 github.com/prometheus/procfs v0.1.3/go.mod h1:lV6e/gmhEcM9IjHGsFOCxxuZ+z1YqCvr4OA4YeYWdaU=
-github.com/prometheus/procfs v0.6.0 h1:mxy4L2jP6qMonqmq+aTtOx1ifVWUgG/TAmntgbh3xv4=
 github.com/prometheus/procfs v0.6.0/go.mod h1:cz+aTbrPOrUb4q7XlbU9ygM+/jj0fzG6c1xBZuNvfVA=
+github.com/prometheus/procfs v0.7.3 h1:4jVXhlkAyzOScmCkXBTOLRLTz8EeU+eyjrwB/EPq0VU=
+github.com/prometheus/procfs v0.7.3/go.mod h1:cz+aTbrPOrUb4q7XlbU9ygM+/jj0fzG6c1xBZuNvfVA=
 github.com/prometheus/tsdb v0.7.1/go.mod h1:qhTCs0VvXwvX/y3TZrWD7rabWM+ijKTux40TwIPHuXU=
 github.com/rogpeppe/fastuuid v0.0.0-20150106093220-6724a57986af/go.mod h1:XWv6SoW27p1b0cqNHllgS5HIMJraePCO15w5zCzIWYg=
 github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6LYCDYWNEvQ=
@@ -604,6 +610,7 @@ golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v
 golang.org/x/net v0.0.0-20210316092652-d523dce5a7f4/go.mod h1:RBQZq4jEuRlivfhVLdyRGr576XBO4/greRjx4P4O3yc=
 golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4/go.mod h1:p54w0d4576C0XHj96bSt6lcn1PtDYWL6XObtHCRCNQM=
 golang.org/x/net v0.0.0-20210428140749-89ef3d95e781/go.mod h1:OJAsFXCWl8Ukc7SiCT/9KSuxbyM7479/AVlXFRxuMCk=
+golang.org/x/net v0.0.0-20210525063256-abc453219eb5/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
 golang.org/x/net v0.0.0-20210716203947-853a461950ff h1:j2EK/QoxYNBsXI4R7fQkkRUk8y6wnOBI+6hgPdP/6Ds=
 golang.org/x/net v0.0.0-20210716203947-853a461950ff/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
 golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
@@ -617,8 +624,9 @@ golang.org/x/oauth2 v0.0.0-20201208152858-08078c50e5b5/go.mod h1:KelEdhl1UZF7XfJ
 golang.org/x/oauth2 v0.0.0-20210218202405-ba52d332ba99/go.mod h1:KelEdhl1UZF7XfJ4dDtk6s++YSgaE7mD/BuKKDLBl4A=
 golang.org/x/oauth2 v0.0.0-20210220000619-9bb904979d93/go.mod h1:KelEdhl1UZF7XfJ4dDtk6s++YSgaE7mD/BuKKDLBl4A=
 golang.org/x/oauth2 v0.0.0-20210313182246-cd4f82c27b84/go.mod h1:KelEdhl1UZF7XfJ4dDtk6s++YSgaE7mD/BuKKDLBl4A=
-golang.org/x/oauth2 v0.0.0-20210402161424-2e8d93401602 h1:0Ja1LBD+yisY6RWM/BH7TJVXWsSjs2VwBSmvSX4HdBc=
 golang.org/x/oauth2 v0.0.0-20210402161424-2e8d93401602/go.mod h1:KelEdhl1UZF7XfJ4dDtk6s++YSgaE7mD/BuKKDLBl4A=
+golang.org/x/oauth2 v0.0.0-20210514164344-f6687ab2804c h1:pkQiBZBvdos9qq4wBAHqlzuZHEXo07pqV06ef90u1WI=
+golang.org/x/oauth2 v0.0.0-20210514164344-f6687ab2804c/go.mod h1:KelEdhl1UZF7XfJ4dDtk6s++YSgaE7mD/BuKKDLBl4A=
 golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
 golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
 golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
@@ -688,8 +696,8 @@ golang.org/x/sys v0.0.0-20210403161142-5e06dd20ab57/go.mod h1:h1NjWce9XRLGQEsW7w
 golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
 golang.org/x/sys v0.0.0-20210510120138-977fb7262007/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
 golang.org/x/sys v0.0.0-20210603081109-ebe580a85c40/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
-golang.org/x/sys v0.0.0-20210630005230-0f9fa26af87c h1:F1jZWGFhYfh0Ci55sIpILtKKK8p3i2/krTr0H1rg74I=
-golang.org/x/sys v0.0.0-20210630005230-0f9fa26af87c/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
+golang.org/x/sys v0.0.0-20220114195835-da31bd327af9 h1:XfKQ4OlFl8okEOr5UvAqFRVj8pY/4yfcXrddB8qAbU0=
+golang.org/x/sys v0.0.0-20220114195835-da31bd327af9/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
 golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
 golang.org/x/text v0.0.0-20170915032832-14c0d48ead0c/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
 golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
diff --git a/pkg/index/index.go b/pkg/index/index.go
index 39bd478..82acfd8 100644
--- a/pkg/index/index.go
+++ b/pkg/index/index.go
@@ -25,6 +25,7 @@ import (
 
 	"github.com/apache/skywalking-banyandb/api/common"
 	modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
+	"github.com/apache/skywalking-banyandb/banyand/observability"
 	"github.com/apache/skywalking-banyandb/pkg/convert"
 	"github.com/apache/skywalking-banyandb/pkg/index/metadata"
 	"github.com/apache/skywalking-banyandb/pkg/index/posting"
@@ -172,6 +173,7 @@ type Searcher interface {
 }
 
 type Store interface {
+	observability.Observable
 	io.Closer
 	Writer
 	Searcher
diff --git a/pkg/index/inverted/field_map.go b/pkg/index/inverted/field_map.go
index e94df9a..9b719e0 100644
--- a/pkg/index/inverted/field_map.go
+++ b/pkg/index/inverted/field_map.go
@@ -21,6 +21,7 @@ import (
 	"sync"
 
 	"github.com/apache/skywalking-banyandb/api/common"
+	"github.com/apache/skywalking-banyandb/banyand/observability"
 	"github.com/apache/skywalking-banyandb/pkg/convert"
 	"github.com/apache/skywalking-banyandb/pkg/index"
 )
@@ -72,6 +73,15 @@ func (fm *fieldMap) put(fv index.Field, id common.ItemID) error {
 	return pm.value.put(fv.Term, id)
 }
 
+func (fm *fieldMap) Stats() (s observability.Statistics) { // Stats totals an estimated byte size over every entry of fm.repo.
+	for _, pv := range fm.repo {
+		// Each entry is charged a flat 8 bytes for its key (NOTE(review): fm.repo is read without a lock here; confirm callers serialize access).
+		s.MemBytes += 8
+		s.MemBytes += pv.value.Stats().MemBytes
+	}
+	return s
+}
+
 type termContainer struct {
 	key   index.FieldKey
 	value *termMap
diff --git a/pkg/index/inverted/inverted.go b/pkg/index/inverted/inverted.go
index 7387b63..9af43fd 100644
--- a/pkg/index/inverted/inverted.go
+++ b/pkg/index/inverted/inverted.go
@@ -27,6 +27,7 @@ import (
 	"github.com/apache/skywalking-banyandb/api/common"
 	modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
 	"github.com/apache/skywalking-banyandb/banyand/kv"
+	"github.com/apache/skywalking-banyandb/banyand/observability"
 	"github.com/apache/skywalking-banyandb/pkg/index"
 	"github.com/apache/skywalking-banyandb/pkg/index/metadata"
 	"github.com/apache/skywalking-banyandb/pkg/index/posting"
@@ -95,6 +96,19 @@ func (s *store) Flush() error {
 	return nil
 }
 
+func (s *store) Stats() (stat observability.Statistics) { // Stats reports the combined MemBytes of the mutable and, if present, immutable memtable.
+	s.rwMutex.RLock()
+	defer s.rwMutex.RUnlock()
+	// TODO: add MaxMem (only MemBytes is populated by this method).
+	main := s.memTable.Stats()
+	stat.MemBytes += main.MemBytes
+	if s.immutableMemTable != nil { // counted only while an immutable table exists
+		sub := s.immutableMemTable.Stats()
+		stat.MemBytes += sub.MemBytes
+	}
+	return stat
+}
+
 func (s *store) MatchField(fieldKey index.FieldKey) (posting.List, error) {
 	return s.Range(fieldKey, index.RangeOpts{})
 }
diff --git a/pkg/index/inverted/mem.go b/pkg/index/inverted/mem.go
index aed68fe..279a9d4 100644
--- a/pkg/index/inverted/mem.go
+++ b/pkg/index/inverted/mem.go
@@ -26,6 +26,7 @@ import (
 	"github.com/apache/skywalking-banyandb/api/common"
 	modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
 	"github.com/apache/skywalking-banyandb/banyand/kv"
+	"github.com/apache/skywalking-banyandb/banyand/observability"
 	"github.com/apache/skywalking-banyandb/pkg/index"
 	"github.com/apache/skywalking-banyandb/pkg/index/metadata"
 	"github.com/apache/skywalking-banyandb/pkg/index/posting"
@@ -51,6 +52,10 @@ func (m *memTable) Write(field index.Field, itemID common.ItemID) error {
 	return m.fields.put(field, itemID)
 }
 
+func (m *memTable) Stats() observability.Statistics {
+	return m.fields.Stats() // the table's statistics are exactly what its field map reports
+}
+
 var _ index.FieldIterator = (*fIterator)(nil)
 
 type fIterator struct {
diff --git a/pkg/index/inverted/term_map.go b/pkg/index/inverted/term_map.go
index e683837..a3d9adb 100644
--- a/pkg/index/inverted/term_map.go
+++ b/pkg/index/inverted/term_map.go
@@ -21,6 +21,7 @@ import (
 	"sync"
 
 	"github.com/apache/skywalking-banyandb/api/common"
+	"github.com/apache/skywalking-banyandb/banyand/observability"
 	"github.com/apache/skywalking-banyandb/pkg/convert"
 	"github.com/apache/skywalking-banyandb/pkg/index"
 	"github.com/apache/skywalking-banyandb/pkg/index/posting"
@@ -82,3 +83,12 @@ func (p *termMap) getEntry(key []byte) *index.PostingValue {
 	}
 	return v
 }
+
+func (p *termMap) Stats() (s observability.Statistics) { // Stats totals an estimated byte size over every entry of p.repo.
+	for _, pv := range p.repo {
+		// Each entry is charged a flat 8 bytes for its key (NOTE(review): p.repo is read without a lock here; confirm callers serialize access).
+		s.MemBytes += 8
+		s.MemBytes += pv.Value.SizeInBytes()
+	}
+	return s
+}
diff --git a/pkg/index/lsm/lsm.go b/pkg/index/lsm/lsm.go
index 58f8f93..12d5a6c 100644
--- a/pkg/index/lsm/lsm.go
+++ b/pkg/index/lsm/lsm.go
@@ -22,6 +22,7 @@ import (
 
 	"github.com/apache/skywalking-banyandb/api/common"
 	"github.com/apache/skywalking-banyandb/banyand/kv"
+	"github.com/apache/skywalking-banyandb/banyand/observability"
 	"github.com/apache/skywalking-banyandb/pkg/convert"
 	"github.com/apache/skywalking-banyandb/pkg/index"
 	"github.com/apache/skywalking-banyandb/pkg/index/metadata"
@@ -36,6 +37,10 @@ type store struct {
 	l            *logger.Logger
 }
 
+func (s *store) Stats() observability.Statistics {
+	return s.lsm.Stats() // forwards whatever s.lsm reports; s.termMetadata is not included
+}
+
 func (s *store) Close() error {
 	return multierr.Combine(s.lsm.Close(), s.termMetadata.Close())
 }
diff --git a/pkg/index/posting/posting.go b/pkg/index/posting/posting.go
index 593e61c..6f810fd 100644
--- a/pkg/index/posting/posting.go
+++ b/pkg/index/posting/posting.go
@@ -64,6 +64,8 @@ type List interface {
 	Marshall() ([]byte, error)
 
 	Unmarshall(data []byte) error
+
+	SizeInBytes() int64
 }
 
 type Iterator interface {
diff --git a/pkg/index/posting/roaring/roaring.go b/pkg/index/posting/roaring/roaring.go
index d121f89..eaab6a3 100644
--- a/pkg/index/posting/roaring/roaring.go
+++ b/pkg/index/posting/roaring/roaring.go
@@ -187,6 +187,10 @@ func (p *postingsList) Reset() {
 	p.bitmap.Clear()
 }
 
+func (p *postingsList) SizeInBytes() int64 {
+	return int64(p.bitmap.GetSizeInBytes()) // the bitmap's own size figure, converted to the int64 this method returns
+}
+
 type roaringIterator struct {
 	iter    roaring64.IntIterable64
 	current common.ItemID
diff --git a/pkg/test/stream/traffic/searchable_template.json b/pkg/test/stream/traffic/searchable_template.json
new file mode 100644
index 0000000..f50a757
--- /dev/null
+++ b/pkg/test/stream/traffic/searchable_template.json
@@ -0,0 +1,12 @@
+{
+  "tags": [
+    {"str":{"value": "1"}},
+    {"int":{"value": 0}},
+    {"str":{"value": "webapp_id"}},
+    {"str":{"value": "10.0.0.1_id"}},
+    {"str":{"value": "/home_id"}},
+    {"int":{"value": 1000}},
+    {"int":{"value": 1622933202000000000}}
+  ]
+}
+  
\ No newline at end of file
diff --git a/pkg/test/stream/traffic/traffic_test.go b/pkg/test/stream/traffic/traffic_test.go
new file mode 100644
index 0000000..dc8b45a
--- /dev/null
+++ b/pkg/test/stream/traffic/traffic_test.go
@@ -0,0 +1,163 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+package traffic_test
+
+import (
+	"context"
+	"crypto/rand"
+	_ "embed"
+	"strconv"
+	"time"
+
+	"github.com/golang/protobuf/jsonpb"
+	"google.golang.org/protobuf/proto"
+	"google.golang.org/protobuf/types/known/timestamppb"
+
+	commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
+	modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
+	streamv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/stream/v1"
+	"github.com/apache/skywalking-banyandb/banyand/metadata"
+	"github.com/apache/skywalking-banyandb/banyand/stream"
+	"github.com/apache/skywalking-banyandb/pkg/logger"
+	"github.com/apache/skywalking-banyandb/pkg/run"
+	stream_test "github.com/apache/skywalking-banyandb/pkg/test/stream"
+)
+
+type Service interface { // Service combines the run.PreRunner, run.Config and run.Service interfaces.
+	run.PreRunner
+	run.Config
+	run.Service
+}
+
+var _ Service = (*service)(nil)
+
+// service preloads the stream schema in PreRun and then generates synthetic stream writes in Serve.
+type service struct {
+	metaSvc   metadata.Service
+	streamSvc stream.Service
+	l         *logger.Logger
+	stopCh    chan struct{}
+}
+
+func NewService(ctx context.Context, metaSvc metadata.Service, streamSvc stream.Service) Service { // NOTE(review): ctx is accepted but never used.
+	return &service{
+		metaSvc:   metaSvc,
+		streamSvc: streamSvc,
+		stopCh:    make(chan struct{}), // signalling channel; the logger field stays nil until PreRun
+	}
+}
+
+func (s *service) Name() string {
+	return "stream-traffic-gen" // PreRun also uses this value as the logger name
+}
+
+func (s *service) FlagSet() *run.FlagSet {
+	flagS := run.NewFlagSet("stream-traffic-gen") // same literal as Name(); no flags are registered on the set
+	return flagS
+}
+
+func (*service) Validate() error {
+	return nil // FlagSet registers no flags, so there is nothing to check
+}
+
+func (s *service) PreRun() error {
+	s.l = logger.GetLogger(s.Name()) // the logger is created here, not in NewService
+	return stream_test.PreloadSchema(s.metaSvc.SchemaRegistry())
+}
+
+//go:embed searchable_template.json
+var content string
+
+func (s *service) Serve() run.StopNotify { // Serve starts one writer goroutine per (svc, instance) pair and returns stopCh.
+	searchTagFamily := &modelv1.TagFamilyForWrite{}
+	err := jsonpb.UnmarshalString(content, searchTagFamily)
+	if err != nil {
+		s.l.Err(err).Msg("unmarshal template")
+		close(s.stopCh) // the returned channel is already closed, so a receiver unblocks at once
+		return s.stopCh
+	}
+	stream, err := s.streamSvc.Stream(&commonv1.Metadata{
+		Name:  "sw",
+		Group: "default",
+	})
+	if err != nil {
+		s.l.Err(err).Msg("get the stream")
+		close(s.stopCh)
+		return s.stopCh
+	}
+	for i := 0; i < 5; i++ {
+		svc := "svc-" + strconv.Itoa(i)
+		for j := 0; j < 10; j++ {
+			instance := "instance-" + strconv.Itoa(j)
+			go func(svcIdx int) { // index passed by value: with pre-1.22 loop semantics a captured i would read 5 in every goroutine
+				ticker := time.NewTicker(1 * time.Second)
+				defer ticker.Stop()
+				for {
+					select {
+					case <-ticker.C:
+						tf := proto.Clone(searchTagFamily).(*modelv1.TagFamilyForWrite) // clone so goroutines never share the template
+						tf.Tags[2] = &modelv1.TagValue{
+							Value: &modelv1.TagValue_Str{
+								Str: &modelv1.Str{
+									Value: svc,
+								},
+							},
+						}
+						tf.Tags[3] = &modelv1.TagValue{
+							Value: &modelv1.TagValue_Str{
+								Str: &modelv1.Str{
+									Value: instance,
+								},
+							},
+						}
+						data := make([]byte, 10*1024)
+						_, _ = rand.Read(data) // error ignored as in the original; data is only the random binary tag payload
+						e := &streamv1.ElementValue{
+							ElementId: strconv.Itoa(svcIdx),
+							Timestamp: timestamppb.Now(),
+							TagFamilies: []*modelv1.TagFamilyForWrite{
+								{
+									Tags: []*modelv1.TagValue{
+										{
+											Value: &modelv1.TagValue_BinaryData{
+												BinaryData: data,
+											},
+										},
+									},
+								},
+							},
+						}
+						e.TagFamilies = append(e.TagFamilies, tf)
+						errInner := stream.Write(e)
+						if errInner != nil { // was `err != nil`: the outer err is always nil here, so write failures were never logged
+							s.l.Err(errInner).Msg("writing to the stream")
+						}
+					case <-s.stopCh:
+						return
+					}
+				}
+			}(i)
+
+		}
+
+	}
+	return s.stopCh
+}
+
+func (s *service) GracefulStop() { // NOTE(review): no-op; stopCh is not closed here, so the goroutines started by Serve are never told to exit. Confirm this is intended.
+}