You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by ha...@apache.org on 2023/04/18 06:54:31 UTC

[skywalking-banyandb] 01/01: Add several metrics to measure storage sub system.

This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch metrics-stroage
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git

commit cf341f20eb4bbc521fa5904da17da9b1bb1b0387
Author: Gao Hongtao <ha...@gmail.com>
AuthorDate: Tue Apr 18 06:51:59 2023 +0000

    Add several metrics to measure storage sub system.
    
    Signed-off-by: Gao Hongtao <ha...@gmail.com>
---
 CHANGES.md                              |   1 +
 api/common/id.go                        |  27 +++++--
 banyand/internal/cmd/standalone.go      |   4 +-
 banyand/kv/badger.go                    |  10 +++
 banyand/kv/kv.go                        |   3 +
 banyand/liaison/grpc/server.go          |  33 ++++++++-
 banyand/measure/metadata.go             |   8 ++-
 banyand/observability/collector.go      |  34 +++++++++
 banyand/observability/meter_noop.go     |  14 ++++
 banyand/observability/meter_prom.go     | 107 ++++++++++++++++++++++++++-
 banyand/observability/metric.go         |  90 -----------------------
 banyand/observability/metrics_system.go | 123 ++++++++++++++++++++++++++++++++
 banyand/observability/pprof.go          |   4 +-
 banyand/stream/metadata.go              |   7 +-
 banyand/tsdb/block.go                   |  60 +++++++++++++---
 banyand/tsdb/buffer.go                  |  57 +++++++++------
 banyand/tsdb/indexdb.go                 |  29 ++++++--
 banyand/tsdb/scope.go                   |   4 ++
 banyand/tsdb/segment.go                 |   3 +
 banyand/tsdb/seriesdb.go                |  16 ++++-
 banyand/tsdb/shard.go                   |  79 ++++++++++++++++++--
 banyand/tsdb/tsdb.go                    |   3 +-
 go.mod                                  |  40 +++++++----
 go.sum                                  |  74 +++++++++++++------
 pkg/index/index.go                      |   1 +
 pkg/index/inverted/inverted.go          |  30 +++++---
 pkg/index/lsm/lsm.go                    |   4 ++
 pkg/meter/meter.go                      |  35 +++++----
 pkg/meter/prom/instruments.go           |  12 ++++
 pkg/meter/prom/prom.go                  |  23 +++---
 pkg/timestamp/scheduler.go              |  27 ++++---
 31 files changed, 732 insertions(+), 230 deletions(-)

diff --git a/CHANGES.md b/CHANGES.md
index 515a2181..27fdfb0b 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -11,6 +11,7 @@ Release Notes.
 - Refactor TopN to support `NULL` group while keeping seriesID from the source measure.
 - Add a sharded buffer to TSDB to replace Badger's memtable. Badger KV only provides SST.
 - Add a meter system to control the internal metrics.
+- Add multiple metrics for measuring the storage subsystem.
 
 ### Chores
 
diff --git a/api/common/id.go b/api/common/id.go
index 41ae7551..6cc879ca 100644
--- a/api/common/id.go
+++ b/api/common/id.go
@@ -40,8 +40,8 @@ func (s SeriesID) Marshal() []byte {
 	return convert.Uint64ToBytes(uint64(s))
 }
 
-// PositionKey is a context key to store the module position.
-var PositionKey = contextPositionKey{}
+// positionKey is a context key to store the module position.
+var positionKey = contextPositionKey{}
 
 type contextPositionKey struct{}
 
@@ -60,21 +60,40 @@ func LabelNames() []string {
 	return []string{"module", "database", "shard", "seg", "block"}
 }
 
+// ShardLabelNames returns the module, database and shard label names of Position. It is used for shard-level metrics.
+func ShardLabelNames() []string {
+	return []string{"module", "database", "shard"}
+}
+
 // LabelValues returns the label values of Position.
 func (p Position) LabelValues() []string {
 	return []string{p.Module, p.Database, p.Shard, p.Segment, p.Block}
 }
 
+// ShardLabelValues returns the module, database and shard label values of Position. It is used for shard-level metrics.
+func (p Position) ShardLabelValues() []string {
+	return []string{p.Module, p.Database, p.Shard}
+}
+
 // SetPosition sets a position returned from fn to attach it to ctx, then return a new context.
 func SetPosition(ctx context.Context, fn func(p Position) Position) context.Context {
-	val := ctx.Value(PositionKey)
+	val := ctx.Value(positionKey)
 	var p Position
 	if val == nil {
 		p = Position{}
 	} else {
 		p = val.(Position)
 	}
-	return context.WithValue(ctx, PositionKey, fn(p))
+	return context.WithValue(ctx, positionKey, fn(p))
+}
+
+// GetPosition returns the position stored in ctx, or a zero Position if ctx carries none.
+func GetPosition(ctx context.Context) Position {
+	val := ctx.Value(positionKey)
+	if val == nil {
+		return Position{}
+	}
+	return val.(Position)
 }
 
 // Error wraps a error msg.
diff --git a/banyand/internal/cmd/standalone.go b/banyand/internal/cmd/standalone.go
index 795a9dc6..215dcb2f 100644
--- a/banyand/internal/cmd/standalone.go
+++ b/banyand/internal/cmd/standalone.go
@@ -35,7 +35,6 @@ import (
 	"github.com/apache/skywalking-banyandb/banyand/stream"
 	"github.com/apache/skywalking-banyandb/pkg/config"
 	"github.com/apache/skywalking-banyandb/pkg/logger"
-	"github.com/apache/skywalking-banyandb/pkg/meter"
 	"github.com/apache/skywalking-banyandb/pkg/run"
 	"github.com/apache/skywalking-banyandb/pkg/signal"
 	"github.com/apache/skywalking-banyandb/pkg/version"
@@ -90,8 +89,7 @@ func newStandaloneCmd() *cobra.Command {
 		httpServer,
 		profSvc,
 	}
-	_, noMetricProvider := observability.NewMeterProvider(observability.RootScope).(meter.NoopProvider)
-	if !noMetricProvider {
+	if metricSvc != nil {
 		units = append(units, metricSvc)
 	}
 	// Meta the run Group units.
diff --git a/banyand/kv/badger.go b/banyand/kv/badger.go
index 8a6bf80b..48a0a367 100644
--- a/banyand/kv/badger.go
+++ b/banyand/kv/badger.go
@@ -60,6 +60,11 @@ func (b *badgerTSS) Close() error {
 	return nil
 }
 
+func (b *badgerTSS) SizeOnDisk() int64 {
+	lsmSize, vlogSize := b.db.Size()
+	return lsmSize + vlogSize
+}
+
 type mergedIter struct {
 	delegated Iterator
 	data      []byte
@@ -145,6 +150,11 @@ func (b *badgerDB) Scan(prefix, seekKey []byte, opt ScanOpts, f ScanFunc) error
 	return nil
 }
 
+func (b *badgerDB) SizeOnDisk() int64 {
+	lsmSize, vlogSize := b.db.Size()
+	return lsmSize + vlogSize
+}
+
 var _ Iterator = (*iterator)(nil)
 
 type iterator struct {
diff --git a/banyand/kv/kv.go b/banyand/kv/kv.go
index c2e8a97a..c5ae7a89 100644
--- a/banyand/kv/kv.go
+++ b/banyand/kv/kv.go
@@ -73,6 +73,7 @@ type Store interface {
 	io.Closer
 	writer
 	Reader
+	SizeOnDisk() int64
 }
 
 // TimeSeriesReader allows retrieving data from a time-series storage.
@@ -86,6 +87,7 @@ type TimeSeriesStore interface {
 	io.Closer
 	Handover(skl *skl.Skiplist) error
 	TimeSeriesReader
+	SizeOnDisk() int64
 }
 
 // TimeSeriesOptions sets an options for creating a TimeSeriesStore.
@@ -163,6 +165,7 @@ type IndexStore interface {
 	Iterable
 	Reader
 	Close() error
+	SizeOnDisk() int64
 }
 
 // OpenTimeSeriesStore creates a new TimeSeriesStore.
diff --git a/banyand/liaison/grpc/server.go b/banyand/liaison/grpc/server.go
index 057486dc..5bcf3596 100644
--- a/banyand/liaison/grpc/server.go
+++ b/banyand/liaison/grpc/server.go
@@ -21,14 +21,18 @@ package grpc
 import (
 	"context"
 	"net"
+	"runtime/debug"
 	"time"
 
-	grpc_validator "github.com/grpc-ecosystem/go-grpc-middleware/validator"
+	"github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors/recovery"
+	grpc_validator "github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors/validator"
 	"github.com/pkg/errors"
 	grpclib "google.golang.org/grpc"
+	"google.golang.org/grpc/codes"
 	"google.golang.org/grpc/credentials"
 	"google.golang.org/grpc/health"
 	"google.golang.org/grpc/health/grpc_health_v1"
+	"google.golang.org/grpc/status"
 
 	"github.com/apache/skywalking-banyandb/api/event"
 	databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
@@ -37,6 +41,7 @@ import (
 	streamv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/stream/v1"
 	"github.com/apache/skywalking-banyandb/banyand/discovery"
 	"github.com/apache/skywalking-banyandb/banyand/metadata"
+	"github.com/apache/skywalking-banyandb/banyand/observability"
 	"github.com/apache/skywalking-banyandb/banyand/queue"
 	"github.com/apache/skywalking-banyandb/pkg/bus"
 	"github.com/apache/skywalking-banyandb/pkg/logger"
@@ -184,9 +189,31 @@ func (s *server) Serve() run.StopNotify {
 	if s.tls {
 		opts = []grpclib.ServerOption{grpclib.Creds(s.creds)}
 	}
+	grpcPanicRecoveryHandler := func(p any) (err error) {
+		s.log.Error().Interface("panic", p).Str("stack", string(debug.Stack())).Msg("recovered from panic")
+
+		return status.Errorf(codes.Internal, "%s", p)
+	}
+
+	unaryMetrics, streamMetrics := observability.MetricsServerInterceptor()
+	streamChain := []grpclib.StreamServerInterceptor{
+		grpc_validator.StreamServerInterceptor(),
+		recovery.StreamServerInterceptor(recovery.WithRecoveryHandler(grpcPanicRecoveryHandler)),
+	}
+	if streamMetrics != nil {
+		streamChain = append(streamChain, streamMetrics)
+	}
+	unaryChain := []grpclib.UnaryServerInterceptor{
+		grpc_validator.UnaryServerInterceptor(),
+		recovery.UnaryServerInterceptor(recovery.WithRecoveryHandler(grpcPanicRecoveryHandler)),
+	}
+	if unaryMetrics != nil {
+		unaryChain = append(unaryChain, unaryMetrics)
+	}
+
 	opts = append(opts, grpclib.MaxRecvMsgSize(s.maxRecvMsgSize),
-		grpclib.UnaryInterceptor(grpc_validator.UnaryServerInterceptor()),
-		grpclib.StreamInterceptor(grpc_validator.StreamServerInterceptor()),
+		grpclib.ChainUnaryInterceptor(unaryChain...),
+		grpclib.ChainStreamInterceptor(streamChain...),
 	)
 	s.ser = grpclib.NewServer(opts...)
 
diff --git a/banyand/measure/metadata.go b/banyand/measure/metadata.go
index aa16588e..2ed11cdd 100644
--- a/banyand/measure/metadata.go
+++ b/banyand/measure/metadata.go
@@ -240,10 +240,12 @@ func (s *supplier) OpenDB(groupSchema *commonv1.Group) (tsdb.Database, error) {
 	if opts.TTL, err = pb_v1.ToIntervalRule(groupSchema.ResourceOpts.Ttl); err != nil {
 		return nil, err
 	}
+
 	return tsdb.OpenDatabase(
-		context.WithValue(context.Background(), common.PositionKey, common.Position{
-			Module:   "measure",
-			Database: name,
+		common.SetPosition(context.Background(), func(p common.Position) common.Position {
+			p.Module = "measure"
+			p.Database = name
+			return p
 		}),
 		opts)
 }
diff --git a/banyand/observability/collector.go b/banyand/observability/collector.go
new file mode 100644
index 00000000..f03d8f52
--- /dev/null
+++ b/banyand/observability/collector.go
@@ -0,0 +1,34 @@
+package observability
+
+import (
+	"sync"
+)
+
+// MetricsCollector is a global metrics collector.
+var MetricsCollector = Collector{
+	getters: make(map[string]MetricsGetter),
+}
+
+// MetricsGetter is a function that collects metrics.
+type MetricsGetter func()
+
+// Collector is a metrics collector.
+type Collector struct {
+	getters map[string]MetricsGetter
+	gMux    sync.RWMutex
+}
+
+// Register registers a metrics getter.
+func (c *Collector) Register(name string, getter MetricsGetter) {
+	c.gMux.Lock()
+	defer c.gMux.Unlock()
+	c.getters[name] = getter
+}
+
+func (c *Collector) collect() {
+	c.gMux.RLock()
+	defer c.gMux.RUnlock()
+	for _, getter := range c.getters {
+		getter()
+	}
+}
diff --git a/banyand/observability/meter_noop.go b/banyand/observability/meter_noop.go
index bc25b6e6..29bb737d 100644
--- a/banyand/observability/meter_noop.go
+++ b/banyand/observability/meter_noop.go
@@ -21,10 +21,24 @@
 package observability
 
 import (
+	"google.golang.org/grpc"
+
 	"github.com/apache/skywalking-banyandb/pkg/meter"
+	"github.com/apache/skywalking-banyandb/pkg/run"
 )
 
+// NewMetricService runs the registered metrics getters once and returns nil: this variant provides no metric service.
+func NewMetricService() run.Service {
+	MetricsCollector.collect()
+	return nil
+}
+
 // NewMeterProvider returns a meter.Provider based on the given scope.
 func NewMeterProvider(_ meter.Scope) meter.Provider {
 	return meter.NoopProvider{}
 }
+
+// MetricsServerInterceptor returns nil for both the unary and the stream interceptor; callers must check for nil before chaining them.
+func MetricsServerInterceptor() (grpc.UnaryServerInterceptor, grpc.StreamServerInterceptor) {
+	return nil, nil
+}
diff --git a/banyand/observability/meter_prom.go b/banyand/observability/meter_prom.go
index 1047e99f..173bb116 100644
--- a/banyand/observability/meter_prom.go
+++ b/banyand/observability/meter_prom.go
@@ -21,11 +21,116 @@
 package observability
 
 import (
+	"context"
+	"net/http"
+	"time"
+
+	grpcprom "github.com/grpc-ecosystem/go-grpc-middleware/providers/prometheus"
+	"github.com/prometheus/client_golang/prometheus"
+	"github.com/prometheus/client_golang/prometheus/promhttp"
+	"github.com/robfig/cron/v3"
+
+	"github.com/apache/skywalking-banyandb/pkg/logger"
 	"github.com/apache/skywalking-banyandb/pkg/meter"
 	"github.com/apache/skywalking-banyandb/pkg/meter/prom"
+	"github.com/apache/skywalking-banyandb/pkg/run"
+	"github.com/apache/skywalking-banyandb/pkg/timestamp"
+	"google.golang.org/grpc"
 )
 
+var (
+	_ run.Service = (*metricService)(nil)
+	_ run.Config  = (*metricService)(nil)
+
+	reg = prometheus.NewRegistry()
+)
+
+// NewMetricService returns a metric service.
+func NewMetricService() run.Service {
+	return &metricService{
+		closer: run.NewCloser(1),
+	}
+}
+
+type metricService struct {
+	l          *logger.Logger
+	svr        *http.Server
+	closer     *run.Closer
+	listenAddr string
+	scheduler  *timestamp.Scheduler
+}
+
+func (p *metricService) FlagSet() *run.FlagSet {
+	flagSet := run.NewFlagSet("observability")
+	flagSet.StringVar(&p.listenAddr, "observability-listener-addr", ":2121", "listen addr for observability")
+	return flagSet
+}
+
+func (p *metricService) Validate() error {
+	if p.listenAddr == "" {
+		return errNoAddr
+	}
+	return nil
+}
+
+func (p *metricService) Name() string {
+	return "metric-service"
+}
+
+func (p *metricService) Serve() run.StopNotify {
+	p.l = logger.GetLogger(p.Name())
+
+	reg.MustRegister(prometheus.NewGoCollector())
+	reg.MustRegister(prometheus.NewProcessCollector(prometheus.ProcessCollectorOpts{}))
+
+	clock, _ := timestamp.GetClock(context.TODO())
+	p.scheduler = timestamp.NewScheduler(p.l, clock)
+	p.scheduler.Register("metrics-collector", cron.Descriptor, "@every 15s", func(now time.Time, logger *logger.Logger) bool {
+		MetricsCollector.collect()
+		return true
+	})
+
+	mux := http.NewServeMux()
+	mux.Handle("/metrics", promhttp.HandlerFor(
+		reg,
+		promhttp.HandlerOpts{},
+	))
+	p.svr = &http.Server{
+		Addr:              p.listenAddr,
+		ReadHeaderTimeout: 3 * time.Second,
+		Handler:           mux,
+	}
+
+	go func() {
+		defer p.closer.Done()
+		p.l.Info().Str("listenAddr", p.listenAddr).Msg("Start metric server")
+		_ = p.svr.ListenAndServe()
+	}()
+	return p.closer.CloseNotify()
+}
+
+func (p *metricService) GracefulStop() {
+	if p.scheduler != nil {
+		p.scheduler.Close()
+	}
+	if p.svr != nil {
+		_ = p.svr.Close()
+	}
+	p.closer.CloseThenWait()
+}
+
 // NewMeterProvider returns a meter.Provider based on the given scope.
 func NewMeterProvider(scope meter.Scope) meter.Provider {
-	return prom.NewProvider(scope)
+	return prom.NewProvider(scope, reg)
+}
+
+// MetricsServerInterceptor registers gRPC server metrics on the shared registry and returns the unary and stream interceptors that record them.
+func MetricsServerInterceptor() (grpc.UnaryServerInterceptor, grpc.StreamServerInterceptor) {
+	srvMetrics := grpcprom.NewServerMetrics(
+		grpcprom.WithServerHandlingTimeHistogram(
+			grpcprom.WithHistogramBuckets([]float64{0.001, 0.01, 0.1, 0.3, 0.6, 1, 3, 6, 9, 20, 30, 60, 90, 120}),
+		),
+	)
+	reg.MustRegister(srvMetrics)
+	return srvMetrics.UnaryServerInterceptor(), srvMetrics.StreamServerInterceptor()
 }
diff --git a/banyand/observability/metric.go b/banyand/observability/metric.go
deleted file mode 100644
index d34b7a5d..00000000
--- a/banyand/observability/metric.go
+++ /dev/null
@@ -1,90 +0,0 @@
-// Licensed to Apache Software Foundation (ASF) under one or more contributor
-// license agreements. See the NOTICE file distributed with
-// this work for additional information regarding copyright
-// ownership. Apache Software Foundation (ASF) licenses this file to you under
-// the Apache License, Version 2.0 (the "License"); you may
-// not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-//	http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package observability
-
-import (
-	"net/http"
-	"time"
-
-	"github.com/prometheus/client_golang/prometheus/promhttp"
-
-	"github.com/apache/skywalking-banyandb/pkg/logger"
-	"github.com/apache/skywalking-banyandb/pkg/meter"
-	"github.com/apache/skywalking-banyandb/pkg/run"
-)
-
-var (
-	_ run.Service = (*metricService)(nil)
-	_ run.Config  = (*metricService)(nil)
-
-	// RootScope is the root scope for all metrics.
-	RootScope = meter.NewHierarchicalScope("banyandb", "_")
-)
-
-// NewMetricService returns a metric service.
-func NewMetricService() run.Service {
-	return &metricService{
-		closer: run.NewCloser(1),
-	}
-}
-
-type metricService struct {
-	l          *logger.Logger
-	svr        *http.Server
-	closer     *run.Closer
-	listenAddr string
-}
-
-func (p *metricService) FlagSet() *run.FlagSet {
-	flagSet := run.NewFlagSet("observability")
-	flagSet.StringVar(&p.listenAddr, "observability-listener-addr", ":2121", "listen addr for observability")
-	return flagSet
-}
-
-func (p *metricService) Validate() error {
-	if p.listenAddr == "" {
-		return errNoAddr
-	}
-	return nil
-}
-
-func (p *metricService) Name() string {
-	return "metric-service"
-}
-
-func (p *metricService) Serve() run.StopNotify {
-	p.l = logger.GetLogger(p.Name())
-	mux := http.NewServeMux()
-	mux.Handle("/metrics", promhttp.Handler())
-	p.svr = &http.Server{
-		Addr:              p.listenAddr,
-		ReadHeaderTimeout: 3 * time.Second,
-		Handler:           mux,
-	}
-	go func() {
-		defer p.closer.Done()
-		p.l.Info().Str("listenAddr", p.listenAddr).Msg("Start metric server")
-		_ = p.svr.ListenAndServe()
-	}()
-	return p.closer.CloseNotify()
-}
-
-func (p *metricService) GracefulStop() {
-	_ = p.svr.Close()
-	p.closer.CloseThenWait()
-}
diff --git a/banyand/observability/metrics_system.go b/banyand/observability/metrics_system.go
new file mode 100644
index 00000000..abb5cb8f
--- /dev/null
+++ b/banyand/observability/metrics_system.go
@@ -0,0 +1,123 @@
+package observability
+
+import (
+	"context"
+	"strings"
+	"sync"
+
+	"github.com/shirou/gopsutil/v3/cpu"
+	"github.com/shirou/gopsutil/v3/mem"
+	"github.com/shirou/gopsutil/v3/net"
+
+	"github.com/apache/skywalking-banyandb/pkg/logger"
+	"github.com/apache/skywalking-banyandb/pkg/meter"
+)
+
+var log = logger.GetLogger("observability", "metrics", "system")
+
+var (
+	cpuCount      = 0
+	once4CpuCount sync.Once
+	cpuCountsFunc = cpu.Counts
+	cpuTimesFunc  = cpu.Times
+)
+
+var (
+	// RootScope is the root scope for all metrics.
+	RootScope = meter.NewHierarchicalScope("banyandb", "_")
+
+	systemScope = RootScope.SubScope("system")
+
+	systemProvider  = NewMeterProvider(systemScope)
+	cpuStateGauge   = systemProvider.Gauge("cpu_state", "kind")
+	cpuNumGauge     = systemProvider.Gauge("cpu_num")
+	memorySateGauge = systemProvider.Gauge("memory_state", "kind")
+	netStateGauge   = systemProvider.Gauge("net_state", "kind", "name")
+)
+
+func init() {
+	MetricsCollector.Register("cpu", collectCPU)
+	MetricsCollector.Register("memory", collectMemory)
+	MetricsCollector.Register("net", collectNet)
+}
+
+func collectCPU() {
+	once4CpuCount.Do(func() {
+		if c, err := cpuCountsFunc(false); err != nil {
+			log.Error().Err(err).Msg("cannot get cpu count")
+		} else {
+			cpuCount = c
+		}
+	})
+	cpuNumGauge.Set(float64(cpuCount))
+	s, err := cpuTimesFunc(false)
+	if err != nil {
+		log.Error().Err(err).Msg("cannot get cpu stat")
+	}
+	if len(s) == 0 {
+		log.Error().Msg("cannot get cpu stat")
+	}
+	allStat := s[0]
+	total := allStat.User + allStat.System + allStat.Idle + allStat.Nice + allStat.Iowait + allStat.Irq +
+		allStat.Softirq + allStat.Steal + allStat.Guest + allStat.GuestNice
+	cpuStateGauge.Set(allStat.User/total, "user")
+	cpuStateGauge.Set(allStat.System/total, "system")
+	cpuStateGauge.Set(allStat.Idle/total, "idle")
+	cpuStateGauge.Set(allStat.Nice/total, "nice")
+	cpuStateGauge.Set(allStat.Iowait/total, "iowait")
+	cpuStateGauge.Set(allStat.Irq/total, "irq")
+	cpuStateGauge.Set(allStat.Softirq/total, "softirq")
+	cpuStateGauge.Set(allStat.Steal/total, "steal")
+}
+
+func collectMemory() {
+	m, err := mem.VirtualMemory()
+	if err != nil {
+		log.Error().Err(err).Msg("cannot get memory stat")
+	}
+	memorySateGauge.Set(m.UsedPercent/100, "used_percent")
+	memorySateGauge.Set(float64(m.Used)/float64(m.Total), "used")
+}
+
+func collectNet() {
+	stats, err := getNetStat(context.Background())
+	if err != nil {
+		log.Error().Err(err).Msg("cannot get net stat")
+	}
+	for _, stat := range stats {
+		netStateGauge.Set(float64(stat.BytesRecv), "bytes_recv", stat.Name)
+		netStateGauge.Set(float64(stat.BytesSent), "bytes_sent", stat.Name)
+		netStateGauge.Set(float64(stat.PacketsRecv), "packets_recv", stat.Name)
+		netStateGauge.Set(float64(stat.PacketsSent), "packets_sent", stat.Name)
+		netStateGauge.Set(float64(stat.Errin), "errin", stat.Name)
+		netStateGauge.Set(float64(stat.Errout), "errout", stat.Name)
+		netStateGauge.Set(float64(stat.Dropin), "dropin", stat.Name)
+		netStateGauge.Set(float64(stat.Dropout), "dropout", stat.Name)
+		netStateGauge.Set(float64(stat.Fifoin), "fifoin", stat.Name)
+		netStateGauge.Set(float64(stat.Fifoout), "fifoout", stat.Name)
+	}
+}
+
+func getNetStat(ctx context.Context) ([]net.IOCountersStat, error) {
+	stats, err := net.IOCountersWithContext(ctx, true)
+	if err != nil {
+		return nil, err
+	}
+	var availableStats []net.IOCountersStat
+	for _, stat := range stats {
+		switch {
+		// OS X
+		case strings.HasPrefix(stat.Name, "en"):
+		// Linux
+		case strings.HasPrefix(stat.Name, "eth"):
+		default:
+			continue
+		}
+		// ignore interfaces that have received or sent no bytes
+		if stat.BytesRecv == 0 || stat.BytesSent == 0 {
+			continue
+		}
+		availableStats = append(availableStats, stat)
+	}
+	return availableStats, nil
+}
diff --git a/banyand/observability/pprof.go b/banyand/observability/pprof.go
index 0e651367..46cbb0da 100644
--- a/banyand/observability/pprof.go
+++ b/banyand/observability/pprof.go
@@ -27,8 +27,8 @@ import (
 )
 
 var (
-	_ run.Service = (*metricService)(nil)
-	_ run.Config  = (*metricService)(nil)
+	_ run.Service = (*pprofService)(nil)
+	_ run.Config  = (*pprofService)(nil)
 )
 
 // NewProfService returns a pprof service.
diff --git a/banyand/stream/metadata.go b/banyand/stream/metadata.go
index 968fd7a1..11de21d9 100644
--- a/banyand/stream/metadata.go
+++ b/banyand/stream/metadata.go
@@ -220,9 +220,10 @@ func (s *supplier) OpenDB(groupSchema *commonv1.Group) (tsdb.Database, error) {
 		return nil, err
 	}
 	return tsdb.OpenDatabase(
-		context.WithValue(context.Background(), common.PositionKey, common.Position{
-			Module:   "stream",
-			Database: name,
+		common.SetPosition(context.Background(), func(p common.Position) common.Position {
+			p.Module = "stream"
+			p.Database = name
+			return p
 		}),
 		opts)
 }
diff --git a/banyand/tsdb/block.go b/banyand/tsdb/block.go
index 53f29f47..55dbb83b 100644
--- a/banyand/tsdb/block.go
+++ b/banyand/tsdb/block.go
@@ -28,6 +28,7 @@ import (
 	"sync"
 	"sync/atomic"
 	"time"
+	"unsafe"
 
 	"github.com/dgraph-io/badger/v3/skl"
 	"github.com/pkg/errors"
@@ -35,11 +36,13 @@ import (
 
 	"github.com/apache/skywalking-banyandb/api/common"
 	"github.com/apache/skywalking-banyandb/banyand/kv"
+	"github.com/apache/skywalking-banyandb/banyand/observability"
 	"github.com/apache/skywalking-banyandb/banyand/tsdb/bucket"
 	"github.com/apache/skywalking-banyandb/pkg/index"
 	"github.com/apache/skywalking-banyandb/pkg/index/inverted"
 	"github.com/apache/skywalking-banyandb/pkg/index/lsm"
 	"github.com/apache/skywalking-banyandb/pkg/logger"
+	"github.com/apache/skywalking-banyandb/pkg/meter"
 	"github.com/apache/skywalking-banyandb/pkg/timestamp"
 )
 
@@ -53,9 +56,16 @@ const (
 	maxBlockAge             = time.Hour
 	defaultWriteConcurrency = 1000
 	defaultNumBufferShards  = 2
+	itemIDLength            = unsafe.Sizeof(common.ItemID(0))
 )
 
-var errBlockClosingInterrupted = errors.New("interrupt to close the block")
+var (
+	blockMeterProvider          = observability.NewMeterProvider(meterTSDB.SubScope("block"))
+	blockOpenedTimeSecondsGauge = blockMeterProvider.Gauge("opened_time_seconds", common.LabelNames()...)
+	blockReferencesGauge        = blockMeterProvider.Gauge("refs", common.LabelNames()...)
+
+	errBlockClosingInterrupted = errors.New("interrupt to close the block")
+)
 
 type block struct {
 	invertedIndex index.Store
@@ -125,10 +135,7 @@ func newBlock(ctx context.Context, opts blockOpts) (b *block, err error) {
 	b.Reporter = bucket.NewTimeBasedReporter(b.String(), opts.timeRange, clock, opts.scheduler)
 	b.closed.Store(true)
 	b.options(ctx)
-	position := ctx.Value(common.PositionKey)
-	if position != nil {
-		b.position = position.(common.Position)
-	}
+	b.position = common.GetPosition(ctx)
 
 	return b, nil
 }
@@ -202,6 +209,7 @@ func (b *block) open() (err error) {
 	b.closableLst = append(b.closableLst, b.invertedIndex, b.lsmIndex)
 	b.ref.Store(0)
 	b.closed.Store(false)
+	blockOpenedTimeSecondsGauge.Set(float64(time.Now().Unix()), b.position.LabelValues()...)
 	return nil
 }
 
@@ -287,11 +295,13 @@ func (b *block) incRef() bool {
 		return false
 	}
 	b.ref.Add(1)
+	blockReferencesGauge.Set(float64(b.ref.Load()), b.position.LabelValues()...)
 	return true
 }
 
 func (b *block) Done() {
 	b.ref.Add(-1)
+	blockReferencesGauge.Set(float64(b.ref.Load()), b.position.LabelValues()...)
 }
 
 func (b *block) waitDone(stopped *atomic.Bool) <-chan struct{} {
@@ -337,6 +347,9 @@ func (b *block) close(ctx context.Context) (err error) {
 	for _, closer := range b.closableLst {
 		err = multierr.Append(err, closer.Close())
 	}
+	for _, g := range []meter.Gauge{blockOpenedTimeSecondsGauge, blockReferencesGauge} {
+		g.Delete(b.position.LabelValues()...)
+	}
 	return err
 }
 
@@ -416,24 +429,55 @@ func (d *bDelegate) write(key []byte, val []byte, ts time.Time) error {
 		d.delegate.lock.Lock()
 		if err := d.delegate.openBuffer(); err != nil {
 			d.delegate.lock.Unlock()
+			receivedNumCounter.Inc(1, append(d.delegate.position.ShardLabelValues(), "main", "true")...)
 			return err
 		}
 		d.delegate.lock.Unlock()
 	}
 	d.delegate.buffer.Write(key, val, ts)
+	receivedBytesCounter.Inc(float64(len(key)+len(val)), append(d.delegate.position.ShardLabelValues(), "main")...)
+	receivedNumCounter.Inc(1, append(d.delegate.position.ShardLabelValues(), "main", "false")...)
 	return nil
 }
 
 func (d *bDelegate) writePrimaryIndex(field index.Field, id common.ItemID) error {
-	return d.delegate.lsmIndex.Write([]index.Field{field}, id)
+	if err := d.delegate.lsmIndex.Write([]index.Field{field}, id); err != nil {
+		receivedNumCounter.Inc(1, append(d.delegate.position.ShardLabelValues(), "primary", "true")...)
+		return err
+	}
+	receivedBytesCounter.Inc(float64(len(field.Marshal())+int(itemIDLength)), append(d.delegate.position.ShardLabelValues(), "primary")...)
+	receivedNumCounter.Inc(1, append(d.delegate.position.ShardLabelValues(), "primary", "false")...)
+	return nil
 }
 
 func (d *bDelegate) writeLSMIndex(fields []index.Field, id common.ItemID) error {
-	return d.delegate.lsmIndex.Write(fields, id)
+	total := 0
+	for _, f := range fields {
+		total += len(f.Marshal())
+	}
+
+	if err := d.delegate.lsmIndex.Write(fields, id); err != nil {
+		receivedNumCounter.Inc(1, append(d.delegate.position.ShardLabelValues(), "local_lsm", "true")...)
+		return err
+	}
+	receivedBytesCounter.Inc(float64(total+int(itemIDLength)), append(d.delegate.position.ShardLabelValues(), "local_lsm")...)
+	receivedNumCounter.Inc(1, append(d.delegate.position.ShardLabelValues(), "local_lsm", "false")...)
+	return nil
 }
 
 func (d *bDelegate) writeInvertedIndex(fields []index.Field, id common.ItemID) error {
-	return d.delegate.invertedIndex.Write(fields, id)
+	total := 0
+	for _, f := range fields {
+		total += len(f.Marshal())
+	}
+
+	if err := d.delegate.invertedIndex.Write(fields, id); err != nil {
+		receivedNumCounter.Inc(1, append(d.delegate.position.ShardLabelValues(), "local_inverted", "true")...)
+		return err
+	}
+	receivedBytesCounter.Inc(float64(total+int(itemIDLength)), append(d.delegate.position.ShardLabelValues(), "local_inverted")...)
+	receivedNumCounter.Inc(1, append(d.delegate.position.ShardLabelValues(), "local_inverted", "false")...)
+	return nil
 }
 
 func (d *bDelegate) contains(ts time.Time) bool {
diff --git a/banyand/tsdb/buffer.go b/banyand/tsdb/buffer.go
index 001501df..d2e6ebe6 100644
--- a/banyand/tsdb/buffer.go
+++ b/banyand/tsdb/buffer.go
@@ -67,17 +67,18 @@ type flushEvent struct {
 type onFlush func(shardIndex int, skl *skl.Skiplist) error
 
 type bufferShardBucket struct {
-	mutable        *skl.Skiplist
-	writeCh        chan operation
-	flushCh        chan flushEvent
-	writeWaitGroup *sync.WaitGroup
-	flushWaitGroup *sync.WaitGroup
-	log            *logger.Logger
-	immutables     []*skl.Skiplist
-	labelValues    []string
-	index          int
-	capacity       int
-	mutex          sync.RWMutex
+	mutable          *skl.Skiplist
+	writeCh          chan operation
+	flushCh          chan flushEvent
+	writeWaitGroup   *sync.WaitGroup
+	flushWaitGroup   *sync.WaitGroup
+	log              *logger.Logger
+	immutables       []*skl.Skiplist
+	labelValues      []string
+	shardLabelValues []string
+	index            int
+	capacity         int
+	mutex            sync.RWMutex
 }
 
 // Buffer is an exported struct that represents a buffer composed of multiple shard buckets.
@@ -106,15 +107,16 @@ func NewBuffer(log *logger.Logger, position common.Position, flushSize, writeCon
 	buffer.flushWaitGroup.Add(numShards)
 	for i := 0; i < numShards; i++ {
 		buckets[i] = bufferShardBucket{
-			index:          i,
-			capacity:       flushSize,
-			mutable:        skl.NewSkiplist(int64(flushSize)),
-			writeCh:        make(chan operation, writeConcurrency),
-			flushCh:        make(chan flushEvent, 1),
-			writeWaitGroup: &buffer.writeWaitGroup,
-			flushWaitGroup: &buffer.flushWaitGroup,
-			log:            buffer.log.Named(fmt.Sprintf("shard-%d", i)),
-			labelValues:    append(position.LabelValues(), fmt.Sprintf("%d", i)),
+			index:            i,
+			capacity:         flushSize,
+			mutable:          skl.NewSkiplist(int64(flushSize)),
+			writeCh:          make(chan operation, writeConcurrency),
+			flushCh:          make(chan flushEvent, 1),
+			writeWaitGroup:   &buffer.writeWaitGroup,
+			flushWaitGroup:   &buffer.flushWaitGroup,
+			log:              buffer.log.Named(fmt.Sprintf("shard-%d", i)),
+			labelValues:      append(position.LabelValues(), fmt.Sprintf("%d", i)),
+			shardLabelValues: position.ShardLabelValues(),
 		}
 		buckets[i].start(onFlushFn)
 		maxBytes.Set(float64(flushSize), buckets[i].labelValues...)
@@ -206,23 +208,34 @@ func (bsb *bufferShardBucket) getAll() ([]*skl.Skiplist, func()) {
 
 func (bsb *bufferShardBucket) start(onFlushFn onFlush) {
 	go func() {
+		defer func() {
+			for _, g := range []meter.Gauge{maxBytes, immutableBytes, mutableBytes} {
+				g.Delete(bsb.labelValues...)
+			}
+		}()
 		defer bsb.flushWaitGroup.Done()
 		for event := range bsb.flushCh {
 			oldSkipList := event.data
 			memSize := oldSkipList.MemSize()
+			t1 := time.Now()
 			for {
 				if err := onFlushFn(bsb.index, oldSkipList); err != nil {
 					bsb.log.Err(err).Msg("flushing immutable buffer failed. Retrying...")
+					flushNum.Inc(1, append(bsb.labelValues[:2], "true")...)
 					time.Sleep(time.Second)
 					continue
 				}
 				break
 			}
+			flushLatency.Observe(time.Since(t1).Seconds(), bsb.shardLabelValues...)
+			immutableBytes.Add(float64(-memSize), bsb.labelValues...)
+			flushBytes.Inc(float64(memSize), bsb.shardLabelValues...)
+			flushNum.Inc(1, append(bsb.shardLabelValues, "false")...)
+
 			bsb.mutex.Lock()
 			bsb.immutables = bsb.immutables[1:]
 			bsb.mutex.Unlock()
 			oldSkipList.DecrRef()
-			immutableBytes.Add(float64(-memSize), bsb.labelValues...)
 		}
 	}()
 	go func() {
@@ -259,6 +272,6 @@ func (bsb *bufferShardBucket) triggerFlushing() {
 
 func (bsb *bufferShardBucket) swap() {
 	bsb.immutables = append(bsb.immutables, bsb.mutable)
-	bsb.mutable = skl.NewSkiplist(int64(bsb.capacity))
 	immutableBytes.Add(float64(bsb.mutable.MemSize()), bsb.labelValues...)
+	bsb.mutable = skl.NewSkiplist(int64(bsb.capacity))
 }
diff --git a/banyand/tsdb/indexdb.go b/banyand/tsdb/indexdb.go
index d0fff8fe..d9e57068 100644
--- a/banyand/tsdb/indexdb.go
+++ b/banyand/tsdb/indexdb.go
@@ -147,23 +147,44 @@ type indexWriter struct {
 }
 
 func (i *indexWriter) WriteLSMIndex(fields []index.Field) (err error) {
+	total := 0
 	for _, field := range fields {
 		if i.scope != nil {
 			field.Key.SeriesID = GlobalSeriesID(i.scope)
 		}
-		err = multierr.Append(err, i.seg.globalIndex.PutWithVersion(field.Marshal(), i.itemID.marshal(), uint64(i.ts.UnixNano())))
+		bb := field.Marshal()
+		ibb := i.itemID.marshal()
+		err = multierr.Append(err, i.seg.globalIndex.PutWithVersion(bb, ibb, uint64(i.ts.UnixNano())))
+		total += len(bb) + len(ibb) + 8
 	}
-	return err
+	if err != nil {
+		receivedNumCounter.Inc(1, append(i.seg.position.ShardLabelValues(), "global_lsm", "true")...)
+		return err
+	}
+	receivedNumCounter.Inc(1, append(i.seg.position.ShardLabelValues(), "global_lsm", "false")...)
+	receivedBytesCounter.Inc(float64(total), append(i.seg.position.ShardLabelValues(), "global_lsm")...)
+	return nil
 }
 
 func (i *indexWriter) WriteInvertedIndex(fields []index.Field) (err error) {
+	total := 0
 	for _, field := range fields {
 		if i.scope != nil {
 			field.Key.SeriesID = GlobalSeriesID(i.scope)
 		}
-		err = multierr.Append(err, i.seg.globalIndex.PutWithVersion(field.Marshal(), i.itemID.marshal(), uint64(i.ts.UnixNano())))
+		bb := field.Marshal()
+		ibb := i.itemID.marshal()
+		err = multierr.Append(err, i.seg.globalIndex.PutWithVersion(bb, ibb, uint64(i.ts.UnixNano())))
+		total += len(bb) + len(ibb) + 8
+	}
+
+	if err != nil {
+		receivedNumCounter.Inc(1, append(i.seg.position.ShardLabelValues(), "global_inverted", "true")...)
+		return err
 	}
-	return err
+	receivedNumCounter.Inc(1, append(i.seg.position.ShardLabelValues(), "global_inverted", "false")...)
+	receivedBytesCounter.Inc(float64(total), append(i.seg.position.ShardLabelValues(), "global_inverted")...)
+	return nil
 }
 
 // GlobalSeriesID encodes Entry to common.SeriesID.
diff --git a/banyand/tsdb/scope.go b/banyand/tsdb/scope.go
index 15aebb9b..d28de9eb 100644
--- a/banyand/tsdb/scope.go
+++ b/banyand/tsdb/scope.go
@@ -88,3 +88,7 @@ func (sdd *scopedSeriesDatabase) GetByID(id common.SeriesID) (Series, error) {
 func (sdd *scopedSeriesDatabase) List(ctx context.Context, path Path) (SeriesList, error) {
 	return sdd.delegated.List(ctx, path.prepend(sdd.scope))
 }
+
+func (sdd *scopedSeriesDatabase) SizeOnDisk() int64 {
+	return sdd.delegated.SizeOnDisk()
+}
diff --git a/banyand/tsdb/segment.go b/banyand/tsdb/segment.go
index 61122f21..1e2abaa7 100644
--- a/banyand/tsdb/segment.go
+++ b/banyand/tsdb/segment.go
@@ -25,6 +25,7 @@ import (
 	"sync"
 	"time"
 
+	"github.com/apache/skywalking-banyandb/api/common"
 	"github.com/apache/skywalking-banyandb/banyand/kv"
 	"github.com/apache/skywalking-banyandb/banyand/tsdb/bucket"
 	"github.com/apache/skywalking-banyandb/pkg/logger"
@@ -39,6 +40,7 @@ type segment struct {
 	l                   *logger.Logger
 	blockController     *blockController
 	blockManageStrategy *bucket.Strategy
+	position            common.Position
 	timestamp.TimeRange
 	path      string
 	suffix    string
@@ -60,6 +62,7 @@ func openSegment(ctx context.Context, startTime, endTime time.Time, path, suffix
 		path:      path,
 		suffix:    suffix,
 		TimeRange: timeRange,
+		position:  common.GetPosition(ctx),
 	}
 	l := logger.Fetch(ctx, s.String())
 	s.l = l
diff --git a/banyand/tsdb/seriesdb.go b/banyand/tsdb/seriesdb.go
index d715c0a3..679fcf43 100644
--- a/banyand/tsdb/seriesdb.go
+++ b/banyand/tsdb/seriesdb.go
@@ -251,6 +251,7 @@ type SeriesDatabase interface {
 	GetByID(id common.SeriesID) (Series, error)
 	Get(key []byte, entityValues EntityValues) (Series, error)
 	List(ctx context.Context, path Path) (SeriesList, error)
+	SizeOnDisk() int64
 }
 
 type blockDatabase interface {
@@ -271,6 +272,7 @@ type seriesDB struct {
 	seriesMetadata kv.Store
 	l              *logger.Logger
 	segCtrl        *segmentController
+	position       common.Position
 	sync.Mutex
 	sID common.ShardID
 }
@@ -322,8 +324,11 @@ func (s *seriesDB) Get(key []byte, entityValues EntityValues) (Series, error) {
 		}
 		errDecode = s.seriesMetadata.Put(entityKey, encodedData)
 		if errDecode != nil {
+			receivedNumCounter.Inc(1, append(s.position.ShardLabelValues(), "series", "true")...)
 			return nil, errDecode
 		}
+		receivedBytesCounter.Inc(float64(len(entityKey)+len(encodedData)), append(s.position.ShardLabelValues(), "series")...)
+		receivedNumCounter.Inc(1, append(s.position.ShardLabelValues(), "series", "false")...)
 
 		var series string
 		if e := s.l.Debug(); e.Enabled() {
@@ -349,6 +354,10 @@ func (s *seriesDB) Get(key []byte, entityValues EntityValues) (Series, error) {
 	return newSeries(s.context(), seriesID, entityValues.String(), s), nil
 }
 
+func (s *seriesDB) SizeOnDisk() int64 {
+	return s.seriesMetadata.SizeOnDisk()
+}
+
 func encode(seriesID common.SeriesID, evv EntityValues) ([]byte, []byte, error) {
 	data, err := MarshalEntityValues(evv)
 	if err != nil {
@@ -493,9 +502,10 @@ func (s *seriesDB) Close() error {
 
 func newSeriesDataBase(ctx context.Context, shardID common.ShardID, path string, segCtrl *segmentController) (SeriesDatabase, error) {
 	sdb := &seriesDB{
-		sID:     shardID,
-		segCtrl: segCtrl,
-		l:       logger.Fetch(ctx, "series_database"),
+		sID:      shardID,
+		segCtrl:  segCtrl,
+		l:        logger.Fetch(ctx, "series_database"),
+		position: common.GetPosition(ctx),
 	}
 	o := ctx.Value(optionsKey)
 	var memSize int64
diff --git a/banyand/tsdb/shard.go b/banyand/tsdb/shard.go
index 3f561d69..fafd8b1d 100644
--- a/banyand/tsdb/shard.go
+++ b/banyand/tsdb/shard.go
@@ -21,15 +21,19 @@ import (
 	"context"
 	"sort"
 	"strconv"
+	"strings"
 	"sync"
 	"time"
 
 	"github.com/pkg/errors"
+	"github.com/shirou/gopsutil/v3/disk"
 	"go.uber.org/multierr"
 
 	"github.com/apache/skywalking-banyandb/api/common"
+	"github.com/apache/skywalking-banyandb/banyand/observability"
 	"github.com/apache/skywalking-banyandb/banyand/tsdb/bucket"
 	"github.com/apache/skywalking-banyandb/pkg/logger"
+	"github.com/apache/skywalking-banyandb/pkg/meter"
 	"github.com/apache/skywalking-banyandb/pkg/timestamp"
 )
 
@@ -39,7 +43,31 @@ const (
 	defaultKVMemorySize      = 1 << 20
 )
 
-var _ Shard = (*shard)(nil)
+var (
+	_ Shard = (*shard)(nil)
+
+	shardProvider   = observability.NewMeterProvider(meterTSDB.SubScope("shard"))
+	diskStateGetter = disk.UsageWithContext
+
+	diskStateGauge       meter.Gauge
+	flushBytes           meter.Counter
+	flushNum             meter.Counter
+	flushLatency         meter.Histogram
+	receivedBytesCounter meter.Counter
+	receivedNumCounter   meter.Counter
+	onDiskBytesGauge     meter.Gauge
+)
+
+func init() {
+	labelNames := common.ShardLabelNames()
+	diskStateGauge = shardProvider.Gauge("disk_state", append(labelNames, "kind")...)
+	flushBytes = shardProvider.Counter("flush_bytes", labelNames...)
+	flushNum = shardProvider.Counter("flush_num", append(labelNames, "is_error")...)
+	flushLatency = shardProvider.Histogram("flush_latency", meter.DefBuckets, labelNames...)
+	receivedBytesCounter = shardProvider.Counter("received_bytes", append(labelNames, "kind")...)
+	receivedNumCounter = shardProvider.Counter("received_num", append(labelNames, "kind", "is_error")...)
+	onDiskBytesGauge = shardProvider.Gauge("on_disk_bytes", append(labelNames, "kind")...)
+}
 
 type shard struct {
 	seriesDatabase        SeriesDatabase
@@ -103,14 +131,27 @@ func OpenShard(ctx context.Context, id common.ShardID,
 		return nil, err
 	}
 	s.segmentManageStrategy.Run()
-	position := shardCtx.Value(common.PositionKey)
-	if position != nil {
-		s.position = position.(common.Position)
-	}
+	s.position = common.GetPosition(shardCtx)
 	retentionTask := newRetentionTask(s.segmentController, ttl)
 	if err := scheduler.Register("retention", retentionTask.option, retentionTask.expr, retentionTask.run); err != nil {
 		return nil, err
 	}
+	plv := s.position.ShardLabelValues()
+	observability.MetricsCollector.Register(strings.Join(plv, "-"), func() {
+		if stat, err := diskStateGetter(ctx, path); err != nil {
+			s.l.Error().Err(err).Msg("get disk usage stat")
+		} else {
+			diskStateGauge.Set(stat.UsedPercent, append(plv, "used_percent")...)
+			diskStateGauge.Set(float64(stat.Free), append(plv, "free")...)
+			diskStateGauge.Set(float64(stat.Total), append(plv, "total")...)
+			diskStateGauge.Set(float64(stat.Used), append(plv, "used")...)
+			diskStateGauge.Set(float64(stat.InodesUsed), append(plv, "inodes_used")...)
+			diskStateGauge.Set(float64(stat.InodesFree), append(plv, "inodes_free")...)
+			diskStateGauge.Set(float64(stat.InodesTotal), append(plv, "inodes_total")...)
+			diskStateGauge.Set(stat.InodesUsedPercent, append(plv, "inodes_used_percent")...)
+		}
+		s.collectSizeOnDisk(plv)
+	})
 	return s, nil
 }
 
@@ -126,6 +167,34 @@ func (s *shard) Index() IndexDatabase {
 	return s.indexDatabase
 }
 
+func (s *shard) collectSizeOnDisk(labelsValues []string) {
+	var globalIndex, localLSM, localInverted, main int64
+	for _, seg := range s.segmentController.segments() {
+		if seg.globalIndex != nil {
+			globalIndex += seg.globalIndex.SizeOnDisk()
+		}
+		for _, b := range seg.blockController.blocks() {
+			if b.Closed() {
+				continue
+			}
+			if b.sst != nil {
+				main += b.sst.SizeOnDisk()
+			}
+			if b.lsmIndex != nil {
+				localLSM += b.lsmIndex.SizeOnDisk()
+			}
+			if b.invertedIndex != nil {
+				localInverted += b.invertedIndex.SizeOnDisk()
+			}
+		}
+	}
+	onDiskBytesGauge.Set(float64(s.seriesDatabase.SizeOnDisk()), append(labelsValues, "series")...)
+	onDiskBytesGauge.Set(float64(globalIndex), append(labelsValues, "global_index")...)
+	onDiskBytesGauge.Set(float64(localLSM), append(labelsValues, "local_lsm")...)
+	onDiskBytesGauge.Set(float64(localInverted), append(labelsValues, "local_inverted")...)
+	onDiskBytesGauge.Set(float64(main), append(labelsValues, "main")...)
+}
+
 func (s *shard) State() (shardState ShardState) {
 	shardState.StrategyManagers = append(shardState.StrategyManagers, s.segmentManageStrategy.String())
 	for _, seg := range s.segmentController.segments() {
diff --git a/banyand/tsdb/tsdb.go b/banyand/tsdb/tsdb.go
index daa7b4a9..fb499c4b 100644
--- a/banyand/tsdb/tsdb.go
+++ b/banyand/tsdb/tsdb.go
@@ -239,10 +239,11 @@ func OpenDatabase(ctx context.Context, opts DatabaseOpts) (Database, error) {
 	if opts.TTL.Num == 0 {
 		return nil, errors.Wrap(errOpenDatabase, "ttl is absent")
 	}
+	p := common.GetPosition(ctx)
 	db := &database{
 		location:    opts.Location,
 		shardNum:    opts.ShardNum,
-		logger:      logger.Fetch(ctx, "tsdb"),
+		logger:      logger.Fetch(ctx, p.Database),
 		segmentSize: opts.SegmentInterval,
 		blockSize:   opts.BlockInterval,
 		ttl:         opts.TTL,
diff --git a/go.mod b/go.mod
index a11d213a..881d6470 100644
--- a/go.mod
+++ b/go.mod
@@ -9,13 +9,14 @@ require (
 	github.com/cespare/xxhash v1.1.0
 	github.com/dgraph-io/badger/v3 v3.2011.1
 	github.com/emirpasic/gods v1.18.1
-	github.com/envoyproxy/protoc-gen-validate v0.1.0
+	github.com/envoyproxy/protoc-gen-validate v0.9.1
 	github.com/go-chi/chi/v5 v5.0.7
 	github.com/go-resty/resty/v2 v2.7.0
 	github.com/golang/mock v1.6.0
 	github.com/google/go-cmp v0.5.9
 	github.com/google/uuid v1.3.0
-	github.com/grpc-ecosystem/go-grpc-middleware v1.3.0
+	github.com/grpc-ecosystem/go-grpc-middleware/providers/prometheus v1.0.0-rc.0
+	github.com/grpc-ecosystem/go-grpc-middleware/v2 v2.0.0-rc.5
 	github.com/grpc-ecosystem/grpc-gateway/v2 v2.14.0
 	github.com/hashicorp/golang-lru v0.5.4
 	github.com/oklog/run v1.1.0
@@ -27,21 +28,31 @@ require (
 	github.com/spf13/cobra v1.6.1
 	github.com/spf13/pflag v1.0.5
 	github.com/spf13/viper v1.12.0
-	github.com/stretchr/testify v1.8.0
+	github.com/stretchr/testify v1.8.2
 	github.com/xhit/go-str2duration/v2 v2.0.0
 	github.com/zenizh/go-capturer v0.0.0-20211219060012-52ea6c8fed04
 	go.etcd.io/etcd/client/v3 v3.5.6
 	go.etcd.io/etcd/server/v3 v3.5.6
 	go.uber.org/multierr v1.8.0
 	golang.org/x/exp v0.0.0-20220602145555-4a0574d9293f
-	golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4
-	google.golang.org/genproto v0.0.0-20221118155620-16455021b5e6
-	google.golang.org/grpc v1.51.0
-	google.golang.org/protobuf v1.28.1
+	golang.org/x/mod v0.8.0
+	google.golang.org/genproto v0.0.0-20230110181048-76db0878b65f
+	google.golang.org/grpc v1.53.0
+	google.golang.org/protobuf v1.30.0
 	sigs.k8s.io/yaml v1.3.0
 )
 
-require github.com/klauspost/compress v1.15.6 // indirect
+require (
+	github.com/go-ole/go-ole v1.2.6 // indirect
+	github.com/grpc-ecosystem/go-grpc-middleware v1.3.0 // indirect
+	github.com/klauspost/compress v1.15.6 // indirect
+	github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0 // indirect
+	github.com/power-devops/perfstat v0.0.0-20210106213030-5aafc221ea8c // indirect
+	github.com/shoenig/go-m1cpu v0.1.4 // indirect
+	github.com/tklauser/go-sysconf v0.3.11 // indirect
+	github.com/tklauser/numcpus v0.6.0 // indirect
+	github.com/yusufpapurcu/wmi v1.2.2 // indirect
+)
 
 require (
 	github.com/axiomhq/hyperloglog v0.0.0-20191112132149-a4c4c47bc57f // indirect
@@ -56,7 +67,7 @@ require (
 	github.com/blugelabs/ice v1.0.0 // indirect
 	github.com/caio/go-tdigest v3.1.0+incompatible // indirect
 	github.com/cenkalti/backoff/v4 v4.2.0 // indirect
-	github.com/cespare/xxhash/v2 v2.1.2 // indirect
+	github.com/cespare/xxhash/v2 v2.2.0 // indirect
 	github.com/coreos/go-semver v0.3.0 // indirect
 	github.com/coreos/go-systemd/v22 v22.5.0 // indirect
 	github.com/davecgh/go-spew v1.1.1 // indirect
@@ -94,12 +105,13 @@ require (
 	github.com/prometheus/common v0.37.0 // indirect
 	github.com/prometheus/procfs v0.8.0 // indirect
 	github.com/robfig/cron/v3 v3.0.1
+	github.com/shirou/gopsutil/v3 v3.23.3
 	github.com/sirupsen/logrus v1.9.0 // indirect
 	github.com/soheilhy/cmux v0.1.5 // indirect
-	github.com/spf13/afero v1.8.2 // indirect
+	github.com/spf13/afero v1.9.2 // indirect
 	github.com/spf13/cast v1.5.0 // indirect
 	github.com/spf13/jwalterweatherman v1.1.0 // indirect
-	github.com/stretchr/objx v0.4.0 // indirect
+	github.com/stretchr/objx v0.5.0 // indirect
 	github.com/subosito/gotenv v1.3.0 // indirect
 	github.com/tmc/grpc-websocket-proxy v0.0.0-20220101234140-673ab2c3ae75 // indirect
 	github.com/xiang90/probing v0.0.0-20221125231312-a49e3df8f510 // indirect
@@ -121,9 +133,9 @@ require (
 	go.uber.org/atomic v1.10.0 // indirect
 	go.uber.org/zap v1.23.0
 	golang.org/x/crypto v0.3.0 // indirect
-	golang.org/x/net v0.7.0 // indirect
-	golang.org/x/sys v0.5.0 // indirect
-	golang.org/x/text v0.7.0 // indirect
+	golang.org/x/net v0.8.0 // indirect
+	golang.org/x/sys v0.6.0 // indirect
+	golang.org/x/text v0.8.0 // indirect
 	golang.org/x/time v0.2.0 // indirect
 	gopkg.in/ini.v1 v1.66.4 // indirect
 	gopkg.in/natefinch/lumberjack.v2 v2.0.0 // indirect
diff --git a/go.sum b/go.sum
index ef0a79f4..4029823a 100644
--- a/go.sum
+++ b/go.sum
@@ -24,7 +24,8 @@ cloud.google.com/go/bigquery v1.4.0/go.mod h1:S8dzgnTigyfTmLBfrtrhyYhwRxG72rYxvf
 cloud.google.com/go/bigquery v1.5.0/go.mod h1:snEHRnqQbz117VIFhE8bmtwIDY80NLUZUMb4Nv6dBIg=
 cloud.google.com/go/bigquery v1.7.0/go.mod h1://okPTzCYNXSlb24MZs83e2Do+h+VXtc4gLoIoXIAPc=
 cloud.google.com/go/bigquery v1.8.0/go.mod h1:J5hqkt3O0uAFnINi6JXValWIb1v0goeZM77hZzJN/fQ=
-cloud.google.com/go/compute v1.6.1 h1:2sMmt8prCn7DPaG4Pmh0N3Inmc8cT8ae5k1M6VJ9Wqc=
+cloud.google.com/go/compute v1.15.1 h1:7UGq3QknM33pw5xATlpzeoomNxsacIVvTqTTvbfajmE=
+cloud.google.com/go/compute/metadata v0.2.3 h1:mg4jlk7mCAj6xXp9UJ4fjI9VUI5rubuGBW5aJ7UnBMY=
 cloud.google.com/go/datastore v1.0.0/go.mod h1:LXYbyblFSglQ5pkeyhO+Qmw7ukd3C+pD7TKLgZqpHYE=
 cloud.google.com/go/datastore v1.1.0/go.mod h1:umbIZjpQpHh4hmRpGhH4tLFup+FVzqBi1b3c64qFpCk=
 cloud.google.com/go/firestore v1.1.0/go.mod h1:ulACoGHTpvq5r8rxGJ4ddJZBZqakUQqClKRT5SZwBmk=
@@ -94,8 +95,9 @@ github.com/certifi/gocertifi v0.0.0-20200922220541-2c3bb06c6054/go.mod h1:sGbDF6
 github.com/cespare/xxhash v1.1.0 h1:a6HrQnmkObjyL+Gs60czilIUGqrzKutQD6XZog3p+ko=
 github.com/cespare/xxhash v1.1.0/go.mod h1:XrSqR1VqqWfGrhpAt58auRo0WTKS1nRRg3ghfAqPWnc=
 github.com/cespare/xxhash/v2 v2.1.1/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
-github.com/cespare/xxhash/v2 v2.1.2 h1:YRXhKfTDauu4ajMg1TPgFO5jnlC2HCbmLXMcTG5cbYE=
 github.com/cespare/xxhash/v2 v2.1.2/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
+github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44=
+github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
 github.com/chzyer/logex v1.1.10/go.mod h1:+Ywpsq7O8HXn0nuIou7OrIPyXbp3wmkHB+jjWRnGsAI=
 github.com/chzyer/readline v0.0.0-20180603132655-2972be24d48e/go.mod h1:nSuG5e5PlCu98SY8svDHJxuZscDgtXS6KTTbou5AhLI=
 github.com/chzyer/test v0.0.0-20180213035817-a1ea475d72b1/go.mod h1:Q3SI9o4m/ZMnBNeIyt5eFwwo7qiLfzFZmjNmxjkiQlU=
@@ -154,8 +156,9 @@ github.com/envoyproxy/go-control-plane v0.9.9-0.20201210154907-fd9021fe5dad/go.m
 github.com/envoyproxy/go-control-plane v0.9.9-0.20210217033140-668b12f5399d/go.mod h1:cXg6YxExXjJnVBQHBLXeUAgxn2UodCpnH306RInaBQk=
 github.com/envoyproxy/go-control-plane v0.9.9-0.20210512163311-63b5d3c536b0/go.mod h1:hliV/p42l8fGbc6Y9bQ70uLwIvmJyVE5k4iMKlh8wCQ=
 github.com/envoyproxy/go-control-plane v0.9.10-0.20210907150352-cf90f659a021/go.mod h1:AFq3mo9L8Lqqiid3OhADV3RfLJnjiw63cSpi+fDTRC0=
-github.com/envoyproxy/protoc-gen-validate v0.1.0 h1:EQciDnbrYxy13PgWoY8AqoxGiPrpgBZ1R8UNe3ddc+A=
 github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c=
+github.com/envoyproxy/protoc-gen-validate v0.9.1 h1:PS7VIOgmSVhWUEeZwTe7z7zouA22Cr590PzXKbZHOVY=
+github.com/envoyproxy/protoc-gen-validate v0.9.1/go.mod h1:OKNgG7TCp5pF4d6XftA0++PMirau2/yoOwVac3AbF2w=
 github.com/fatih/color v1.7.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5KwzbycvMj4=
 github.com/fogleman/gg v1.2.1-0.20190220221249-0403632d5b90/go.mod h1:R/bRT+9gY/C5z7JzPU0zXsXHKM4/ayA+zqcVNZzPa1k=
 github.com/frankban/quicktest v1.14.3 h1:FJKSZTDHjyhriyC81FLQ0LY93eSai0ZyR/ZIkd3ZUKE=
@@ -183,6 +186,8 @@ github.com/go-logr/logr v1.2.3 h1:2DntVwHkVopvECVRSlL5PSo9eG+cAkDCuckLubN+rq0=
 github.com/go-logr/logr v1.2.3/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A=
 github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag=
 github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE=
+github.com/go-ole/go-ole v1.2.6 h1:/Fpf6oFPoeFik9ty7siob0G6Ke8QvQEuVcuChpwXzpY=
+github.com/go-ole/go-ole v1.2.6/go.mod h1:pprOEPIfldk/42T2oK7lQ4v4JSDwmV0As9GaiUsvbm0=
 github.com/go-resty/resty/v2 v2.7.0 h1:me+K9p3uhSmXtrBZ4k9jcEAfJmuC8IivWHwaLZwPrFY=
 github.com/go-resty/resty/v2 v2.7.0/go.mod h1:9PWDzw47qPphMRFfhsyk0NnSgvluHcljSMVIq3w7q0I=
 github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY=
@@ -282,6 +287,10 @@ github.com/gorilla/websocket v1.5.0/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/ad
 github.com/grpc-ecosystem/go-grpc-middleware v1.0.0/go.mod h1:FiyG127CGDf3tlThmgyCl78X/SZQqEOJBCDaAfeWzPs=
 github.com/grpc-ecosystem/go-grpc-middleware v1.3.0 h1:+9834+KizmvFV7pXQGSXQTsaWhq2GjuNUt0aUU0YBYw=
 github.com/grpc-ecosystem/go-grpc-middleware v1.3.0/go.mod h1:z0ButlSOZa5vEBq9m2m2hlwIgKw+rp3sdCBRoJY+30Y=
+github.com/grpc-ecosystem/go-grpc-middleware/providers/prometheus v1.0.0-rc.0 h1:mdLirNAJBxnGgyB6pjZLcs6ue/6eZGBui6gXspfq4ks=
+github.com/grpc-ecosystem/go-grpc-middleware/providers/prometheus v1.0.0-rc.0/go.mod h1:kdXbOySqcQeTxiqglW7aahTmWZy3Pgi6SYL36yvKeyA=
+github.com/grpc-ecosystem/go-grpc-middleware/v2 v2.0.0-rc.5 h1:3IZOAnD058zZllQTZNBioTlrzrBG/IjpiZ133IEtusM=
+github.com/grpc-ecosystem/go-grpc-middleware/v2 v2.0.0-rc.5/go.mod h1:xbKERva94Pw2cPen0s79J3uXmGzbbpDYFBFDlZ4mV/w=
 github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0 h1:Ovs26xHkKqVztRpIrF/92BcuyuQ/YW4NSIpoGtfXNho=
 github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0/go.mod h1:8NvIoxWQoOIhqOTXgfV/d3M/q6VIi02HzZEHgUlZvzk=
 github.com/grpc-ecosystem/grpc-gateway v1.9.0/go.mod h1:vNeuVxBJEsws4ogUvrchl83t/GYV9WGTSLVdBhOQFDY=
@@ -355,6 +364,8 @@ github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
 github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
 github.com/leesper/go_rng v0.0.0-20190531154944-a612b043e353 h1:X/79QL0b4YJVO5+OsPH9rF2u428CIrGL/jLmPsoOQQ4=
 github.com/leesper/go_rng v0.0.0-20190531154944-a612b043e353/go.mod h1:N0SVk0uhy+E1PZ3C9ctsPRlvOPAFPkCNlcPBDkt0N3U=
+github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0 h1:6E+4a0GO5zZEnZ81pIr0yLvtUWk2if982qA3F3QD6H4=
+github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0/go.mod h1:zJYVVT2jmtg6P3p1VtQj7WsuWi/y4VnjVBn7F8KPB3I=
 github.com/magiconair/properties v1.8.0/go.mod h1:PppfXfuXeibc/6YijjN8zIbojt8czPbwD3XqdrwzmxQ=
 github.com/magiconair/properties v1.8.1/go.mod h1:PppfXfuXeibc/6YijjN8zIbojt8czPbwD3XqdrwzmxQ=
 github.com/magiconair/properties v1.8.6 h1:5ibWZ6iY0NctNGWo87LalDlEZ6R41TqbbDamhfG/Qzo=
@@ -408,6 +419,8 @@ github.com/pkg/sftp v1.13.1/go.mod h1:3HaPG6Dq1ILlpPZRO0HVMrsydcdLt6HRDccSgb87qR
 github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
 github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
 github.com/posener/complete v1.1.1/go.mod h1:em0nMJCgc9GFtwrmVmEMR/ZL6WyhyjMBndrE9hABlRI=
+github.com/power-devops/perfstat v0.0.0-20210106213030-5aafc221ea8c h1:ncq/mPwQF4JjgDlrVEn3C11VoGHZN7m8qihwgMEtzYw=
+github.com/power-devops/perfstat v0.0.0-20210106213030-5aafc221ea8c/go.mod h1:OmDBASR4679mdNQnz2pUhc2G8CO2JrUAVFDRBDP/hJE=
 github.com/prometheus/client_golang v0.9.1/go.mod h1:7SWBe2y4D6OKWSNQJUaRYU/AaXPKyh/dDVn+NZz0KFw=
 github.com/prometheus/client_golang v0.9.3/go.mod h1:/TN21ttK/J9q6uSwhBd54HahCDft0ttaMvbicHlPoso=
 github.com/prometheus/client_golang v1.0.0/go.mod h1:db9x61etRT2tGnBNRi70OPL5FsnadC4Ky3P0J6CfImo=
@@ -454,6 +467,12 @@ github.com/russross/blackfriday/v2 v2.0.1/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQD
 github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM=
 github.com/ryanuber/columnize v0.0.0-20160712163229-9b3edd62028f/go.mod h1:sm1tb6uqfes/u+d4ooFouqFdy9/2g9QGwK3SQygK0Ts=
 github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529/go.mod h1:DxrIzT+xaE7yg65j358z/aeFdxmN0P9QXhEzd20vsDc=
+github.com/shirou/gopsutil/v3 v3.23.3 h1:Syt5vVZXUDXPEXpIBt5ziWsJ4LdSAAxF4l/xZeQgSEE=
+github.com/shirou/gopsutil/v3 v3.23.3/go.mod h1:lSBNN6t3+D6W5e5nXTxc8KIMMVxAcS+6IJlffjRRlMU=
+github.com/shoenig/go-m1cpu v0.1.4 h1:SZPIgRM2sEF9NJy50mRHu9PKGwxyyTTJIWvCtgVbozs=
+github.com/shoenig/go-m1cpu v0.1.4/go.mod h1:Wwvst4LR89UxjeFtLRMrpgRiyY4xPsejnVZym39dbAQ=
+github.com/shoenig/test v0.6.3 h1:GVXWJFk9PiOjN0KoJ7VrJGH6uLPnqxR7/fe3HUPfE0c=
+github.com/shoenig/test v0.6.3/go.mod h1:byHiCGXqrVaflBLAMq/srcZIHynQPQgeyvkvXnjqq0k=
 github.com/shurcooL/sanitized_anchor_name v1.0.0/go.mod h1:1NzhyTcUVG4SuEtjjoZeVRXNmyL/1OwPU0+IJeTBvfc=
 github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo=
 github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE=
@@ -470,8 +489,8 @@ github.com/soheilhy/cmux v0.1.5/go.mod h1:T7TcVDs9LWfQgPlPsdngu6I6QIoyIFZDDC6sNE
 github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72 h1:qLC7fQah7D6K1B0ujays3HV9gkFtllcxhzImRR7ArPQ=
 github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA=
 github.com/spf13/afero v1.1.2/go.mod h1:j4pytiNVoe2o6bmDsKpLACNPDBIoEAkihy7loJ1B0CQ=
-github.com/spf13/afero v1.8.2 h1:xehSyVa0YnHWsJ49JFljMpg1HX19V6NDZ1fkm1Xznbo=
-github.com/spf13/afero v1.8.2/go.mod h1:CtAatgMJh6bJEIs48Ay/FOnkljP3WeGUG0MC1RfAqwo=
+github.com/spf13/afero v1.9.2 h1:j49Hj62F0n+DaZ1dDCvhABaPNSGNkt32oRFxI33IEMw=
+github.com/spf13/afero v1.9.2/go.mod h1:iUV7ddyEEZPO5gA3zD4fJt6iStLlL+Lg4m2cihcDf8Y=
 github.com/spf13/cast v1.3.0/go.mod h1:Qx5cxh0v+4UWYiBimWS+eyWzqEqokIECu5etghLkUJE=
 github.com/spf13/cast v1.5.0 h1:rj3WzYc11XZaIZMPKmwP96zkFEnnAmV8s6XbB2aY32w=
 github.com/spf13/cast v1.5.0/go.mod h1:SpXXQ5YoyJw6s3/6cMTQuxvgRl3PCJiyaX9p6b155UU=
@@ -491,8 +510,9 @@ github.com/spf13/viper v1.12.0 h1:CZ7eSOd3kZoaYDLbXnmzgQI5RlciuXBMA+18HwHRfZQ=
 github.com/spf13/viper v1.12.0/go.mod h1:b6COn30jlNxbm/V2IqWiNWkJ+vZNiMNksliPCiuKtSI=
 github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
 github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
-github.com/stretchr/objx v0.4.0 h1:M2gUjqZET1qApGOWNSnZ49BAIMX4F/1plDv3+l31EJ4=
 github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
+github.com/stretchr/objx v0.5.0 h1:1zr/of2m5FGMsad5YfcqgdqdWrIhu+EBEJRhR1U7z/c=
+github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo=
 github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
 github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
 github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
@@ -500,11 +520,16 @@ github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5
 github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
 github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
 github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
-github.com/stretchr/testify v1.8.0 h1:pSgiaMZlXftHpm5L7V1+rVB+AZJydKsMxsQBIJw4PKk=
 github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
+github.com/stretchr/testify v1.8.2 h1:+h33VjcLVPDHtOdpUCuF+7gSuG3yGIftsP1YvFihtJ8=
+github.com/stretchr/testify v1.8.2/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
 github.com/subosito/gotenv v1.2.0/go.mod h1:N0PQaV/YGNqwC0u51sEeR/aUtSLEXKX9iv69rRypqCw=
 github.com/subosito/gotenv v1.3.0 h1:mjC+YW8QpAdXibNi+vNWgzmgBH4+5l5dCXv8cNysBLI=
 github.com/subosito/gotenv v1.3.0/go.mod h1:YzJjq/33h7nrwdY+iHMhEOEEbW0ovIz0tB6t6PwAXzs=
+github.com/tklauser/go-sysconf v0.3.11 h1:89WgdJhk5SNwJfu+GKyYveZ4IaJ7xAkecBo+KdJV0CM=
+github.com/tklauser/go-sysconf v0.3.11/go.mod h1:GqXfhXY3kiPa0nAXPDIQIWzJbMCB7AmcWpGR8lSZfqI=
+github.com/tklauser/numcpus v0.6.0 h1:kebhY2Qt+3U6RNK7UqpYNA+tJ23IBEGKkB7JQBfDYms=
+github.com/tklauser/numcpus v0.6.0/go.mod h1:FEZLMke0lhOUG6w2JadTzp0a+Nl8PF/GFkQ5UVIcaL4=
 github.com/tmc/grpc-websocket-proxy v0.0.0-20190109142713-0ad062ec5ee5/go.mod h1:ncp9v5uamzpCO7NfCPTXjqaC+bZgJeR0sMTm6dMHP7U=
 github.com/tmc/grpc-websocket-proxy v0.0.0-20201229170055-e5319fda7802/go.mod h1:ncp9v5uamzpCO7NfCPTXjqaC+bZgJeR0sMTm6dMHP7U=
 github.com/tmc/grpc-websocket-proxy v0.0.0-20220101234140-673ab2c3ae75 h1:6fotK7otjonDflCTK0BCfls4SPy3NcCVb5dqqmbRknE=
@@ -522,6 +547,8 @@ github.com/yuin/goldmark v1.1.32/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9de
 github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
 github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k=
 github.com/yuin/goldmark v1.4.0/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k=
+github.com/yusufpapurcu/wmi v1.2.2 h1:KBNDSne4vP5mbSWnJbO+51IMOXJB67QiYCSBrubbPRg=
+github.com/yusufpapurcu/wmi v1.2.2/go.mod h1:SBZ9tNy3G9/m5Oi98Zks0QjeHVDvuK0qfxQmPyzfmi0=
 github.com/zenizh/go-capturer v0.0.0-20211219060012-52ea6c8fed04 h1:qXafrlZL1WsJW5OokjraLLRURHiw0OzKHD/RNdspp4w=
 github.com/zenizh/go-capturer v0.0.0-20211219060012-52ea6c8fed04/go.mod h1:FiwNQxz6hGoNFBC4nIx+CxZhI3nne5RmIOlT/MXcSD4=
 github.com/zinclabs/bluge v1.1.5 h1:QJhkweeBVRaaEPdaRptkYOJDLCeyo+JBgc2hNyFehAM=
@@ -647,8 +674,8 @@ golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
 golang.org/x/mod v0.4.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
 golang.org/x/mod v0.4.1/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
 golang.org/x/mod v0.4.2/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
-golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4 h1:6zppjxzCulZykYSLyVDYbneBfbaBIQPYMevg0bEwv2s=
-golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4=
+golang.org/x/mod v0.8.0 h1:LUYupSeNrTNCGzR/hVBk2NHZO4hXcVaW1k4Qx7rjPx8=
+golang.org/x/mod v0.8.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs=
 golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
 golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
 golang.org/x/net v0.0.0-20181023162649-9b4f9f5ad519/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
@@ -695,8 +722,8 @@ golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2/go.mod h1:9nx3DQGgdP8bBQD5qx
 golang.org/x/net v0.0.0-20211123203042-d83791d6bcd9/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
 golang.org/x/net v0.0.0-20220127200216-cd36cc0744dd/go.mod h1:CfG3xpIq0wQ8r1q4Su4UZFWDARRcnwPjda9FqA0JpMk=
 golang.org/x/net v0.0.0-20220225172249-27dd8689420f/go.mod h1:CfG3xpIq0wQ8r1q4Su4UZFWDARRcnwPjda9FqA0JpMk=
-golang.org/x/net v0.7.0 h1:rJrUqqhjsgNp7KqAIc25s9pZnjU7TUcSY7HcVZjdn1g=
-golang.org/x/net v0.7.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs=
+golang.org/x/net v0.8.0 h1:Zrh2ngAOFYneWTAIAPethzeaQLuHwhuBkuV6ZiRnUaQ=
+golang.org/x/net v0.8.0/go.mod h1:QVkue5JL9kW//ek3r6jTKnTFis1tRmNAW2P1shuFdJc=
 golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
 golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
 golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
@@ -709,7 +736,7 @@ golang.org/x/oauth2 v0.0.0-20210218202405-ba52d332ba99/go.mod h1:KelEdhl1UZF7XfJ
 golang.org/x/oauth2 v0.0.0-20210514164344-f6687ab2804c/go.mod h1:KelEdhl1UZF7XfJ4dDtk6s++YSgaE7mD/BuKKDLBl4A=
 golang.org/x/oauth2 v0.0.0-20211104180415-d3ed0bb246c8/go.mod h1:KelEdhl1UZF7XfJ4dDtk6s++YSgaE7mD/BuKKDLBl4A=
 golang.org/x/oauth2 v0.0.0-20220223155221-ee480838109b/go.mod h1:DAh4E804XQdzx2j+YRIaUnCqCV2RuMz24cGBJ5QYIrc=
-golang.org/x/oauth2 v0.2.0 h1:GtQkldQ9m7yvzCL1V+LrYow3Khe0eJH0w7RbX/VbaIU=
+golang.org/x/oauth2 v0.4.0 h1:NF0gk8LVPg1Ml7SSbGyySuoxdsXitj7TvgvuRxIMc/M=
 golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
 golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
 golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
@@ -738,6 +765,7 @@ golang.org/x/sys v0.0.0-20190606165138-5da285871e9c/go.mod h1:h1NjWce9XRLGQEsW7w
 golang.org/x/sys v0.0.0-20190624142023-c5567b49c5d0/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
 golang.org/x/sys v0.0.0-20190726091711-fc99dfbffb4e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
 golang.org/x/sys v0.0.0-20190813064441-fde4db37ae7a/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
+golang.org/x/sys v0.0.0-20190916202348-b4ddaad3f8a3/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
 golang.org/x/sys v0.0.0-20191001151750-bb3f8db39f24/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
 golang.org/x/sys v0.0.0-20191026070338-33540a1f6037/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
 golang.org/x/sys v0.0.0-20191204072324-ce4227a45e2e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
@@ -763,6 +791,7 @@ golang.org/x/sys v0.0.0-20200923182605-d9f96fdee20d/go.mod h1:h1NjWce9XRLGQEsW7w
 golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
 golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
 golang.org/x/sys v0.0.0-20201201145000-ef89a241ccb3/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
+golang.org/x/sys v0.0.0-20201204225414-ed752295db88/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
 golang.org/x/sys v0.0.0-20210104204734-6f8348627aad/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
 golang.org/x/sys v0.0.0-20210119212857-b64e53b001e4/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
 golang.org/x/sys v0.0.0-20210124154548-22da62e12c0c/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
@@ -781,8 +810,9 @@ golang.org/x/sys v0.0.0-20220412211240-33da011f77ad/go.mod h1:oPkhp1MJrh7nUepCBc
 golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
 golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
 golang.org/x/sys v0.0.0-20221010170243-090e33056c14/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
-golang.org/x/sys v0.5.0 h1:MUK/U/4lj1t1oPg0HfuXDN/Z1wv31ZJ/YcPiGccS4DU=
-golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
+golang.org/x/sys v0.2.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
+golang.org/x/sys v0.6.0 h1:MVltZSvRTcU2ljQOhs94SXPftV6DCNnZViHeQps87pQ=
+golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
 golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
 golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
 golang.org/x/text v0.0.0-20170915032832-14c0d48ead0c/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
@@ -794,8 +824,8 @@ golang.org/x/text v0.3.4/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
 golang.org/x/text v0.3.5/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
 golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
 golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ=
-golang.org/x/text v0.7.0 h1:4BRB4x83lYWy72KwLD/qYDuTu7q9PjSagHvijDw7cLo=
-golang.org/x/text v0.7.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8=
+golang.org/x/text v0.8.0 h1:57P1ETyNKtuIjB4SRd15iJxuhj8Gc416Y78H3qgMh68=
+golang.org/x/text v0.8.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8=
 golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
 golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
 golang.org/x/time v0.0.0-20191024005414-555d28b269f0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
@@ -936,8 +966,8 @@ google.golang.org/genproto v0.0.0-20210108203827-ffc7fda8c3d7/go.mod h1:FWY/as6D
 google.golang.org/genproto v0.0.0-20210226172003-ab064af71705/go.mod h1:FWY/as6DDZQgahTzZj3fqbO1CbirC29ZNUFHwi0/+no=
 google.golang.org/genproto v0.0.0-20210602131652-f16073e35f0c/go.mod h1:UODoCrxHCcBojKKwX1terBiRUaqAsFqJiF615XL43r0=
 google.golang.org/genproto v0.0.0-20211118181313-81c1377c94b1/go.mod h1:5CzLGKJ67TSI2B9POpiiyGha0AjJvZIUgRMt1dSmuhc=
-google.golang.org/genproto v0.0.0-20221118155620-16455021b5e6 h1:a2S6M0+660BgMNl++4JPlcAO/CjkqYItDEZwkoDQK7c=
-google.golang.org/genproto v0.0.0-20221118155620-16455021b5e6/go.mod h1:rZS5c/ZVYMaOGBfO68GWtjOw/eLaZM1X6iVtgjZ+EWg=
+google.golang.org/genproto v0.0.0-20230110181048-76db0878b65f h1:BWUVssLB0HVOSY78gIdvk1dTVYtT1y8SBWtPYuTJ/6w=
+google.golang.org/genproto v0.0.0-20230110181048-76db0878b65f/go.mod h1:RGgjbofJ8xD9Sq1VVhDM1Vok1vRONV+rg+CjzG4SZKM=
 google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c=
 google.golang.org/grpc v1.20.1/go.mod h1:10oTOabMzJvdu6/UiuZezV6QK5dSlG84ov/aaiqXj38=
 google.golang.org/grpc v1.21.1/go.mod h1:oYelfM1adQP15Ek0mdvEgi9Df8B9CZIaU1084ijfRaM=
@@ -961,8 +991,8 @@ google.golang.org/grpc v1.38.0/go.mod h1:NREThFqKR1f3iQ6oBuvc5LadQuXVGo9rkm5ZGrQ
 google.golang.org/grpc v1.40.0/go.mod h1:ogyxbiOoUXAkP+4+xa6PZSE9DZgIHtSpzjDTB9KAK34=
 google.golang.org/grpc v1.41.0/go.mod h1:U3l9uK9J0sini8mHphKoXyaqDA/8VyGnDee1zzIUK6k=
 google.golang.org/grpc v1.42.0/go.mod h1:k+4IHHFw41K8+bbowsex27ge2rCb65oeWqe4jJ590SU=
-google.golang.org/grpc v1.51.0 h1:E1eGv1FTqoLIdnBCZufiSHgKjlqG6fKFf6pPWtMTh8U=
-google.golang.org/grpc v1.51.0/go.mod h1:wgNDFcnuBGmxLKI/qn4T+m5BtEBYXJPvibbUPsAIPww=
+google.golang.org/grpc v1.53.0 h1:LAv2ds7cmFV/XTS3XG1NneeENYrXGmorPxsBbptIjNc=
+google.golang.org/grpc v1.53.0/go.mod h1:OnIrk0ipVdj4N5d9IUoFUx72/VlD7+jUsHwZgwSMQpw=
 google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8=
 google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0=
 google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod h1:cfTl7dwQJ+fmap5saPgwCLgHXTUD7jkjRqWcaiX5VyM=
@@ -976,8 +1006,8 @@ google.golang.org/protobuf v1.25.0/go.mod h1:9JNX74DMeImyA3h4bdi1ymwjUzf21/xIlba
 google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw=
 google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=
 google.golang.org/protobuf v1.27.1/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=
-google.golang.org/protobuf v1.28.1 h1:d0NfwRgPtno5B1Wa6L2DAG+KivqkdutMf1UhdNx175w=
-google.golang.org/protobuf v1.28.1/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I=
+google.golang.org/protobuf v1.30.0 h1:kPPoIgf3TsEvrm0PFe15JQ+570QVxYzEvvHqChK+cng=
+google.golang.org/protobuf v1.30.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I=
 gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw=
 gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
 gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
diff --git a/pkg/index/index.go b/pkg/index/index.go
index 95483d80..024811ab 100644
--- a/pkg/index/index.go
+++ b/pkg/index/index.go
@@ -207,6 +207,7 @@ type Store interface {
 	io.Closer
 	Writer
 	Searcher
+	SizeOnDisk() int64
 }
 
 // GetSearcher returns a searcher associated with input index rule type.
diff --git a/pkg/index/inverted/inverted.go b/pkg/index/inverted/inverted.go
index 9574f023..05aa471c 100644
--- a/pkg/index/inverted/inverted.go
+++ b/pkg/index/inverted/inverted.go
@@ -21,6 +21,8 @@ package inverted
 import (
 	"bytes"
 	"context"
+	"errors"
+	"io"
 	"log"
 	"math"
 	"time"
@@ -160,7 +162,6 @@ func (s *store) Iterator(fieldKey index.FieldKey, termRange index.RangeOpts, ord
 	if err != nil {
 		return nil, err
 	}
-	defer reader.Close()
 	fk := fieldKey.MarshalIndexRule()
 	var query bluge.Query
 	shouldDecodeTerm := true
@@ -193,7 +194,7 @@ func (s *store) Iterator(fieldKey index.FieldKey, termRange index.RangeOpts, ord
 	if err != nil {
 		return nil, err
 	}
-	result := newBlugeMatchIterator(documentMatchIterator, fk, shouldDecodeTerm)
+	result := newBlugeMatchIterator(documentMatchIterator, fk, shouldDecodeTerm, reader)
 	return &result, nil
 }
 
@@ -206,7 +207,6 @@ func (s *store) MatchTerms(field index.Field) (list posting.List, err error) {
 	if err != nil {
 		return nil, err
 	}
-	defer reader.Close()
 	fk := field.Key.MarshalIndexRule()
 	var query bluge.Query
 	shouldDecodeTerm := true
@@ -222,12 +222,14 @@ func (s *store) MatchTerms(field index.Field) (list posting.List, err error) {
 	if err != nil {
 		return nil, err
 	}
-	iter := newBlugeMatchIterator(documentMatchIterator, fk, shouldDecodeTerm)
+	iter := newBlugeMatchIterator(documentMatchIterator, fk, shouldDecodeTerm, reader)
+	defer func() {
+		err = multierr.Append(err, iter.Close())
+	}()
 	list = roaring.NewPostingList()
 	for iter.Next() {
 		err = multierr.Append(err, list.Union(iter.Val().Value))
 	}
-	err = multierr.Append(err, iter.Close())
 	return list, err
 }
 
@@ -239,7 +241,6 @@ func (s *store) Match(fieldKey index.FieldKey, matches []string) (posting.List,
 	if err != nil {
 		return nil, err
 	}
-	defer reader.Close()
 	analyzer := analyzers[fieldKey.Analyzer]
 	fk := fieldKey.MarshalIndexRule()
 	query := bluge.NewBooleanQuery()
@@ -252,12 +253,14 @@ func (s *store) Match(fieldKey index.FieldKey, matches []string) (posting.List,
 	if err != nil {
 		return nil, err
 	}
-	iter := newBlugeMatchIterator(documentMatchIterator, fk, false)
+	iter := newBlugeMatchIterator(documentMatchIterator, fk, false, reader)
+	defer func() {
+		err = multierr.Append(err, iter.Close())
+	}()
 	list := roaring.NewPostingList()
 	for iter.Next() {
 		err = multierr.Append(err, list.Union(iter.Val().Value))
 	}
-	err = multierr.Append(err, iter.Close())
 	return list, err
 }
 
@@ -274,6 +277,11 @@ func (s *store) Range(fieldKey index.FieldKey, opts index.RangeOpts) (list posti
 	return
 }
 
+func (s *store) SizeOnDisk() int64 {
+	_, bytes := s.writer.DirectoryStats()
+	return int64(bytes)
+}
+
 func (s *store) run() {
 	go func() {
 		defer s.closer.Done()
@@ -352,6 +360,7 @@ func (s *store) flush() {
 type blugeMatchIterator struct {
 	delegated        search.DocumentMatchIterator
 	err              error
+	closer           io.Closer
 	current          *index.PostingValue
 	agg              *index.PostingValue
 	fieldKey         string
@@ -359,11 +368,12 @@ type blugeMatchIterator struct {
 	closed           bool
 }
 
-func newBlugeMatchIterator(delegated search.DocumentMatchIterator, fieldKey string, shouldDecodeTerm bool) blugeMatchIterator {
+func newBlugeMatchIterator(delegated search.DocumentMatchIterator, fieldKey string, shouldDecodeTerm bool, closer io.Closer) blugeMatchIterator {
 	return blugeMatchIterator{
 		delegated:        delegated,
 		fieldKey:         fieldKey,
 		shouldDecodeTerm: shouldDecodeTerm,
+		closer:           closer,
 	}
 }
 
@@ -450,5 +460,5 @@ func (bmi *blugeMatchIterator) Val() *index.PostingValue {
 
 func (bmi *blugeMatchIterator) Close() error {
 	bmi.closed = true
-	return bmi.err
+	return errors.Join(bmi.err, bmi.closer.Close())
 }
diff --git a/pkg/index/lsm/lsm.go b/pkg/index/lsm/lsm.go
index b800aaf2..fa0e3527 100644
--- a/pkg/index/lsm/lsm.go
+++ b/pkg/index/lsm/lsm.go
@@ -47,6 +47,10 @@ func (s *store) Write(fields []index.Field, itemID common.ItemID) (err error) {
 	return err
 }
 
+func (s *store) SizeOnDisk() int64 {
+	return s.lsm.SizeOnDisk()
+}
+
 // StoreOpts wraps options to create the lsm repository.
 type StoreOpts struct {
 	Logger       *logger.Logger
diff --git a/pkg/meter/meter.go b/pkg/meter/meter.go
index 86fa1fa4..75132327 100644
--- a/pkg/meter/meter.go
+++ b/pkg/meter/meter.go
@@ -26,6 +26,9 @@ type (
 	LabelPairs map[string]string
 )
 
+// DefBuckets is the default set of buckets for histograms.
+var DefBuckets = Buckets{.005, .01, .025, .05, .1, .25, .5, 1, 2.5, 5, 10}
+
 // Merge merges the given label pairs with the current label pairs.
 func (p LabelPairs) Merge(other LabelPairs) LabelPairs {
 	result := make(LabelPairs, len(p)+len(other))
@@ -53,49 +56,53 @@ type Scope interface {
 	GetLabels() LabelPairs
 }
 
+// Instrument is the interface shared by every metric kind (Counter, Gauge and Histogram).
+type Instrument interface {
+	// Delete removes the metric with the given label values and reports whether a metric was removed.
+	Delete(labelValues ...string) bool
+}
+
 // Counter is a metric that represents a single numerical value that only ever goes up.
 type Counter interface {
+	Instrument
 	Inc(delta float64, labelValues ...string)
 }
 
 // Gauge is a metric that represents a single numerical value that can arbitrarily go up and down.
 type Gauge interface {
+	Instrument
 	Set(value float64, labelValues ...string)
 	Add(delta float64, labelValues ...string)
 }
 
 // Histogram is a metric that represents the statistical distribution of a set of values.
 type Histogram interface {
+	Instrument
 	Observe(value float64, labelValues ...string)
 }
 
-type noopCounter struct{}
-
-func (noopCounter) Inc(_ float64, _ ...string) {}
-
-type noopGauge struct{}
-
-func (noopGauge) Set(_ float64, _ ...string) {}
-func (noopGauge) Add(_ float64, _ ...string) {}
-
-type noopHistogram struct{}
+type noopInstrument struct{}
 
-func (noopHistogram) Observe(_ float64, _ ...string) {}
+func (noopInstrument) Inc(_ float64, _ ...string)     {}
+func (noopInstrument) Set(_ float64, _ ...string)     {}
+func (noopInstrument) Add(_ float64, _ ...string)     {}
+func (noopInstrument) Observe(_ float64, _ ...string) {}
+func (noopInstrument) Delete(_ ...string) bool        { return false }
 
 // NoopProvider is a no-op implementation of the Provider interface.
 type NoopProvider struct{}
 
 // Counter returns a no-op implementation of the Counter interface.
 func (NoopProvider) Counter(_ string, _ ...string) Counter {
-	return noopCounter{}
+	return noopInstrument{}
 }
 
 // Gauge returns a no-op implementation of the Gauge interface.
 func (NoopProvider) Gauge(_ string, _ ...string) Gauge {
-	return noopGauge{}
+	return noopInstrument{}
 }
 
 // Histogram returns a no-op implementation of the Histogram interface.
 func (NoopProvider) Histogram(_ string, _ Buckets, _ ...string) Histogram {
-	return noopHistogram{}
+	return noopInstrument{}
 }
diff --git a/pkg/meter/prom/instruments.go b/pkg/meter/prom/instruments.go
index 6ea434c4..9f29238c 100644
--- a/pkg/meter/prom/instruments.go
+++ b/pkg/meter/prom/instruments.go
@@ -28,6 +28,10 @@ func (c *counter) Inc(delta float64, labelValues ...string) {
 	c.counter.WithLabelValues(labelValues...).Add(delta)
 }
 
+func (c *counter) Delete(labelValues ...string) bool {
+	return c.counter.DeleteLabelValues(labelValues...)
+}
+
 type gauge struct {
 	gauge *prometheus.GaugeVec
 }
@@ -40,6 +44,10 @@ func (g *gauge) Add(delta float64, labelValues ...string) {
 	g.gauge.WithLabelValues(labelValues...).Add(delta)
 }
 
+func (g *gauge) Delete(labelValues ...string) bool {
+	return g.gauge.DeleteLabelValues(labelValues...)
+}
+
 type histogram struct {
 	histogram *prometheus.HistogramVec
 }
@@ -47,3 +55,7 @@ type histogram struct {
 func (h *histogram) Observe(value float64, labelValues ...string) {
 	h.histogram.WithLabelValues(labelValues...).Observe(value)
 }
+
+func (h *histogram) Delete(labelValues ...string) bool {
+	return h.histogram.DeleteLabelValues(labelValues...)
+}
diff --git a/pkg/meter/prom/prom.go b/pkg/meter/prom/prom.go
index 50583732..0ca74abe 100644
--- a/pkg/meter/prom/prom.go
+++ b/pkg/meter/prom/prom.go
@@ -21,6 +21,7 @@ import (
 	"unsafe"
 
 	"github.com/prometheus/client_golang/prometheus"
+	"github.com/prometheus/client_golang/prometheus/promauto"
 
 	"github.com/apache/skywalking-banyandb/pkg/meter"
 )
@@ -28,21 +29,23 @@ import (
 // Provider is a prometheus provider.
 type provider struct {
 	scope meter.Scope
+	reg   prometheus.Registerer
 }
 
 // NewProvider creates a new prometheus provider with given meter.Scope.
-func NewProvider(scope meter.Scope) meter.Provider {
+func NewProvider(scope meter.Scope, reg prometheus.Registerer) meter.Provider {
 	return &provider{
 		scope: scope,
+		reg:   reg,
 	}
 }
 
 // Counter returns a prometheus counter.
 func (p *provider) Counter(name string, labels ...string) meter.Counter {
 	return &counter{
-		counter: prometheus.NewCounterVec(prometheus.CounterOpts{
-			Name:        name,
-			Help:        name,
+		counter: promauto.With(p.reg).NewCounterVec(prometheus.CounterOpts{
+			Name:        p.scope.GetNamespace() + "_" + name,
+			Help:        p.scope.GetNamespace() + "_" + name,
 			ConstLabels: convertLabels(p.scope.GetLabels()),
 		}, labels),
 	}
@@ -51,9 +54,9 @@ func (p *provider) Counter(name string, labels ...string) meter.Counter {
 // Gauge returns a prometheus gauge.
 func (p *provider) Gauge(name string, labels ...string) meter.Gauge {
 	return &gauge{
-		gauge: prometheus.NewGaugeVec(prometheus.GaugeOpts{
-			Name:        name,
-			Help:        name,
+		gauge: promauto.With(p.reg).NewGaugeVec(prometheus.GaugeOpts{
+			Name:        p.scope.GetNamespace() + "_" + name,
+			Help:        p.scope.GetNamespace() + "_" + name,
 			ConstLabels: convertLabels(p.scope.GetLabels()),
 		}, labels),
 	}
@@ -62,9 +65,9 @@ func (p *provider) Gauge(name string, labels ...string) meter.Gauge {
 // Histogram returns a prometheus histogram.
 func (p *provider) Histogram(name string, buckets meter.Buckets, labels ...string) meter.Histogram {
 	return &histogram{
-		histogram: prometheus.NewHistogramVec(prometheus.HistogramOpts{
-			Name:        name,
-			Help:        name,
+		histogram: promauto.With(p.reg).NewHistogramVec(prometheus.HistogramOpts{
+			Name:        p.scope.GetNamespace() + "_" + name,
+			Help:        p.scope.GetNamespace() + "_" + name,
 			ConstLabels: convertLabels(p.scope.GetLabels()),
 			Buckets:     buckets,
 		}, labels),
diff --git a/pkg/timestamp/scheduler.go b/pkg/timestamp/scheduler.go
index 0c85e9c6..effadf37 100644
--- a/pkg/timestamp/scheduler.go
+++ b/pkg/timestamp/scheduler.go
@@ -18,6 +18,7 @@
 package timestamp
 
 import (
+	"runtime/debug"
 	"sync"
 	"time"
 
@@ -90,7 +91,7 @@ func (s *Scheduler) Register(name string, options cron.ParseOption, expr string,
 	} else {
 		clock = s.clock
 	}
-	t := newTask(s.l.Named(name), clock, schedule, action)
+	t := newTask(s.l.Named(name), name, clock, schedule, action)
 	s.tasks[name] = t
 	go func() {
 		t.run()
@@ -146,11 +147,13 @@ type task struct {
 	closer   *run.Closer
 	l        *logger.Logger
 	action   SchedulerAction
+	name     string
 }
 
-func newTask(l *logger.Logger, clock clock.Clock, schedule cron.Schedule, action SchedulerAction) *task {
+func newTask(l *logger.Logger, name string, clock clock.Clock, schedule cron.Schedule, action SchedulerAction) *task {
 	return &task{
 		l:        l,
+		name:     name,
 		clock:    clock,
 		schedule: schedule,
 		action:   action,
@@ -161,26 +164,34 @@ func newTask(l *logger.Logger, clock clock.Clock, schedule cron.Schedule, action
 func (t *task) run() {
 	defer t.closer.Done()
 	now := t.clock.Now()
-	t.l.Info().Time("now", now).Msg("start")
+	t.l.Info().Str("name", t.name).Time("now", now).Msg("start")
 	for {
 		next := t.schedule.Next(now)
 		d := next.Sub(now)
 		if e := t.l.Debug(); e.Enabled() {
-			e.Time("now", now).Time("next", next).Dur("dur", d).Msg("schedule to")
+			e.Str("name", t.name).Time("now", now).Time("next", next).Dur("dur", d).Msg("schedule to")
 		}
 		timer := t.clock.Timer(d)
 		select {
 		case now = <-timer.C:
 			if e := t.l.Debug(); e.Enabled() {
-				e.Time("now", now).Msg("wake")
+				e.Str("name", t.name).Time("now", now).Msg("wake")
 			}
-			if !t.action(now, t.l) {
-				t.l.Info().Msg("action stops the task")
+			if !func() (ret bool) {
+				defer func() {
+					if r := recover(); r != nil {
+						t.l.Error().Str("name", t.name).Interface("panic", r).Str("stack", string(debug.Stack())).Msg("panic")
+						ret = true
+					}
+				}()
+				return t.action(now, t.l)
+			}() {
+				t.l.Info().Str("name", t.name).Msg("action stops the task")
 				return
 			}
 		case <-t.closer.CloseNotify():
 			timer.Stop()
-			t.l.Info().Msg("closed")
+			t.l.Info().Str("name", t.name).Msg("closed")
 			return
 		}
 	}