You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by ha...@apache.org on 2023/04/18 06:54:31 UTC
[skywalking-banyandb] 01/01: Add several metrics to measure storage sub system.
This is an automated email from the ASF dual-hosted git repository.
hanahmily pushed a commit to branch metrics-stroage
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
commit cf341f20eb4bbc521fa5904da17da9b1bb1b0387
Author: Gao Hongtao <ha...@gmail.com>
AuthorDate: Tue Apr 18 06:51:59 2023 +0000
Add several metrics to measure storage sub system.
Signed-off-by: Gao Hongtao <ha...@gmail.com>
---
CHANGES.md | 1 +
api/common/id.go | 27 +++++--
banyand/internal/cmd/standalone.go | 4 +-
banyand/kv/badger.go | 10 +++
banyand/kv/kv.go | 3 +
banyand/liaison/grpc/server.go | 33 ++++++++-
banyand/measure/metadata.go | 8 ++-
banyand/observability/collector.go | 34 +++++++++
banyand/observability/meter_noop.go | 14 ++++
banyand/observability/meter_prom.go | 107 ++++++++++++++++++++++++++-
banyand/observability/metric.go | 90 -----------------------
banyand/observability/metrics_system.go | 123 ++++++++++++++++++++++++++++++++
banyand/observability/pprof.go | 4 +-
banyand/stream/metadata.go | 7 +-
banyand/tsdb/block.go | 60 +++++++++++++---
banyand/tsdb/buffer.go | 57 +++++++++------
banyand/tsdb/indexdb.go | 29 ++++++--
banyand/tsdb/scope.go | 4 ++
banyand/tsdb/segment.go | 3 +
banyand/tsdb/seriesdb.go | 16 ++++-
banyand/tsdb/shard.go | 79 ++++++++++++++++++--
banyand/tsdb/tsdb.go | 3 +-
go.mod | 40 +++++++----
go.sum | 74 +++++++++++++------
pkg/index/index.go | 1 +
pkg/index/inverted/inverted.go | 30 +++++---
pkg/index/lsm/lsm.go | 4 ++
pkg/meter/meter.go | 35 +++++----
pkg/meter/prom/instruments.go | 12 ++++
pkg/meter/prom/prom.go | 23 +++---
pkg/timestamp/scheduler.go | 27 ++++---
31 files changed, 732 insertions(+), 230 deletions(-)
diff --git a/CHANGES.md b/CHANGES.md
index 515a2181..27fdfb0b 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -11,6 +11,7 @@ Release Notes.
- Refactor TopN to support `NULL` group while keeping seriesID from the source measure.
- Add a sharded buffer to TSDB to replace Badger's memtable. Badger KV only provides SST.
- Add a meter system to control the internal metrics.
+- Add multiple metrics for measuring the storage subsystem.
### Chores
diff --git a/api/common/id.go b/api/common/id.go
index 41ae7551..6cc879ca 100644
--- a/api/common/id.go
+++ b/api/common/id.go
@@ -40,8 +40,8 @@ func (s SeriesID) Marshal() []byte {
return convert.Uint64ToBytes(uint64(s))
}
-// PositionKey is a context key to store the module position.
-var PositionKey = contextPositionKey{}
+// positionKey is a context key to store the module position.
+var positionKey = contextPositionKey{}
type contextPositionKey struct{}
@@ -60,21 +60,40 @@ func LabelNames() []string {
return []string{"module", "database", "shard", "seg", "block"}
}
+// ShardLabelNames lists the label names used by shard level metrics:
+// "module", "database" and "shard", in that order.
+func ShardLabelNames() []string {
+	names := make([]string, 0, 3)
+	names = append(names, "module", "database", "shard")
+	return names
+}
+
// LabelValues returns the label values of Position.
func (p Position) LabelValues() []string {
return []string{p.Module, p.Database, p.Shard, p.Segment, p.Block}
}
+// ShardLabelValues returns the Module, Database and Shard of the Position,
+// in that order. It is used for shard level metrics.
+func (p Position) ShardLabelValues() []string {
+	values := make([]string, 0, 3)
+	values = append(values, p.Module, p.Database, p.Shard)
+	return values
+}
+
// SetPosition sets a position returned from fn to attach it to ctx, then return a new context.
func SetPosition(ctx context.Context, fn func(p Position) Position) context.Context {
- val := ctx.Value(PositionKey)
+ val := ctx.Value(positionKey)
var p Position
if val == nil {
p = Position{}
} else {
p = val.(Position)
}
- return context.WithValue(ctx, PositionKey, fn(p))
+ return context.WithValue(ctx, positionKey, fn(p))
+}
+
+// GetPosition returns the position from ctx.
+func GetPosition(ctx context.Context) Position {
+ val := ctx.Value(positionKey)
+ if val == nil {
+ return Position{}
+ }
+ return val.(Position)
}
// Error wraps a error msg.
diff --git a/banyand/internal/cmd/standalone.go b/banyand/internal/cmd/standalone.go
index 795a9dc6..215dcb2f 100644
--- a/banyand/internal/cmd/standalone.go
+++ b/banyand/internal/cmd/standalone.go
@@ -35,7 +35,6 @@ import (
"github.com/apache/skywalking-banyandb/banyand/stream"
"github.com/apache/skywalking-banyandb/pkg/config"
"github.com/apache/skywalking-banyandb/pkg/logger"
- "github.com/apache/skywalking-banyandb/pkg/meter"
"github.com/apache/skywalking-banyandb/pkg/run"
"github.com/apache/skywalking-banyandb/pkg/signal"
"github.com/apache/skywalking-banyandb/pkg/version"
@@ -90,8 +89,7 @@ func newStandaloneCmd() *cobra.Command {
httpServer,
profSvc,
}
- _, noMetricProvider := observability.NewMeterProvider(observability.RootScope).(meter.NoopProvider)
- if !noMetricProvider {
+ if metricSvc != nil {
units = append(units, metricSvc)
}
// Meta the run Group units.
diff --git a/banyand/kv/badger.go b/banyand/kv/badger.go
index 8a6bf80b..48a0a367 100644
--- a/banyand/kv/badger.go
+++ b/banyand/kv/badger.go
@@ -60,6 +60,11 @@ func (b *badgerTSS) Close() error {
return nil
}
+// SizeOnDisk returns the sum of the two sizes reported by the underlying
+// database's Size method.
+func (b *badgerTSS) SizeOnDisk() int64 {
+	lsm, vlog := b.db.Size()
+	total := lsm + vlog
+	return total
+}
+
type mergedIter struct {
delegated Iterator
data []byte
@@ -145,6 +150,11 @@ func (b *badgerDB) Scan(prefix, seekKey []byte, opt ScanOpts, f ScanFunc) error
return nil
}
+// SizeOnDisk returns the sum of the two sizes reported by the underlying
+// database's Size method.
+func (b *badgerDB) SizeOnDisk() int64 {
+	lsm, vlog := b.db.Size()
+	total := lsm + vlog
+	return total
+}
+
var _ Iterator = (*iterator)(nil)
type iterator struct {
diff --git a/banyand/kv/kv.go b/banyand/kv/kv.go
index c2e8a97a..c5ae7a89 100644
--- a/banyand/kv/kv.go
+++ b/banyand/kv/kv.go
@@ -73,6 +73,7 @@ type Store interface {
io.Closer
writer
Reader
+ SizeOnDisk() int64
}
// TimeSeriesReader allows retrieving data from a time-series storage.
@@ -86,6 +87,7 @@ type TimeSeriesStore interface {
io.Closer
Handover(skl *skl.Skiplist) error
TimeSeriesReader
+ SizeOnDisk() int64
}
// TimeSeriesOptions sets an options for creating a TimeSeriesStore.
@@ -163,6 +165,7 @@ type IndexStore interface {
Iterable
Reader
Close() error
+ SizeOnDisk() int64
}
// OpenTimeSeriesStore creates a new TimeSeriesStore.
diff --git a/banyand/liaison/grpc/server.go b/banyand/liaison/grpc/server.go
index 057486dc..5bcf3596 100644
--- a/banyand/liaison/grpc/server.go
+++ b/banyand/liaison/grpc/server.go
@@ -21,14 +21,18 @@ package grpc
import (
"context"
"net"
+ "runtime/debug"
"time"
- grpc_validator "github.com/grpc-ecosystem/go-grpc-middleware/validator"
+ "github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors/recovery"
+ grpc_validator "github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors/validator"
"github.com/pkg/errors"
grpclib "google.golang.org/grpc"
+ "google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/health"
"google.golang.org/grpc/health/grpc_health_v1"
+ "google.golang.org/grpc/status"
"github.com/apache/skywalking-banyandb/api/event"
databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
@@ -37,6 +41,7 @@ import (
streamv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/stream/v1"
"github.com/apache/skywalking-banyandb/banyand/discovery"
"github.com/apache/skywalking-banyandb/banyand/metadata"
+ "github.com/apache/skywalking-banyandb/banyand/observability"
"github.com/apache/skywalking-banyandb/banyand/queue"
"github.com/apache/skywalking-banyandb/pkg/bus"
"github.com/apache/skywalking-banyandb/pkg/logger"
@@ -184,9 +189,31 @@ func (s *server) Serve() run.StopNotify {
if s.tls {
opts = []grpclib.ServerOption{grpclib.Creds(s.creds)}
}
+ grpcPanicRecoveryHandler := func(p any) (err error) {
+ s.log.Error().Interface("panic", p).Str("stack", string(debug.Stack())).Msg("recovered from panic")
+
+ return status.Errorf(codes.Internal, "%s", p)
+ }
+
+ unaryMetrics, streamMetrics := observability.MetricsServerInterceptor()
+ streamChain := []grpclib.StreamServerInterceptor{
+ grpc_validator.StreamServerInterceptor(),
+ recovery.StreamServerInterceptor(recovery.WithRecoveryHandler(grpcPanicRecoveryHandler)),
+ }
+ if streamMetrics != nil {
+ streamChain = append(streamChain, streamMetrics)
+ }
+ unaryChain := []grpclib.UnaryServerInterceptor{
+ grpc_validator.UnaryServerInterceptor(),
+ recovery.UnaryServerInterceptor(recovery.WithRecoveryHandler(grpcPanicRecoveryHandler)),
+ }
+ if unaryMetrics != nil {
+ unaryChain = append(unaryChain, unaryMetrics)
+ }
+
opts = append(opts, grpclib.MaxRecvMsgSize(s.maxRecvMsgSize),
- grpclib.UnaryInterceptor(grpc_validator.UnaryServerInterceptor()),
- grpclib.StreamInterceptor(grpc_validator.StreamServerInterceptor()),
+ grpclib.ChainUnaryInterceptor(unaryChain...),
+ grpclib.ChainStreamInterceptor(streamChain...),
)
s.ser = grpclib.NewServer(opts...)
diff --git a/banyand/measure/metadata.go b/banyand/measure/metadata.go
index aa16588e..2ed11cdd 100644
--- a/banyand/measure/metadata.go
+++ b/banyand/measure/metadata.go
@@ -240,10 +240,12 @@ func (s *supplier) OpenDB(groupSchema *commonv1.Group) (tsdb.Database, error) {
if opts.TTL, err = pb_v1.ToIntervalRule(groupSchema.ResourceOpts.Ttl); err != nil {
return nil, err
}
+
return tsdb.OpenDatabase(
- context.WithValue(context.Background(), common.PositionKey, common.Position{
- Module: "measure",
- Database: name,
+ common.SetPosition(context.Background(), func(p common.Position) common.Position {
+ p.Module = "measure"
+ p.Database = name
+ return p
}),
opts)
}
diff --git a/banyand/observability/collector.go b/banyand/observability/collector.go
new file mode 100644
index 00000000..f03d8f52
--- /dev/null
+++ b/banyand/observability/collector.go
@@ -0,0 +1,34 @@
+package observability
+
+import (
+ "sync"
+)
+
+// MetricsCollector is a global metrics collector.
+// Its getters map is allocated up front; callbacks are added with Register.
+var MetricsCollector = Collector{
+ getters: make(map[string]MetricsGetter),
+}
+
+// MetricsGetter is a function that collects metrics.
+// It takes no arguments and returns nothing; any result is published by the
+// function itself.
+type MetricsGetter func()
+
+// Collector is a metrics collector.
+// It holds named MetricsGetter callbacks guarded by a read/write mutex.
+type Collector struct {
+ // getters maps a registration name to its callback.
+ getters map[string]MetricsGetter
+ // gMux guards getters.
+ gMux sync.RWMutex
+}
+
+// Register registers a metrics getter under name.
+// Registering the same name again replaces the previously stored getter.
+// The getters map is allocated on first use, so a zero-value Collector can be
+// used without panicking on a nil map.
+func (c *Collector) Register(name string, getter MetricsGetter) {
+	c.gMux.Lock()
+	defer c.gMux.Unlock()
+	if c.getters == nil {
+		c.getters = make(map[string]MetricsGetter)
+	}
+	c.getters[name] = getter
+}
+
+// collect invokes every registered getter once.
+// The getters are copied while the read lock is held and invoked after it is
+// released, so a getter that calls Register does not deadlock on gMux and a
+// slow getter does not block registration. Nil getters are skipped instead
+// of panicking.
+func (c *Collector) collect() {
+	c.gMux.RLock()
+	getters := make([]MetricsGetter, 0, len(c.getters))
+	for _, getter := range c.getters {
+		if getter != nil {
+			getters = append(getters, getter)
+		}
+	}
+	c.gMux.RUnlock()
+
+	for _, getter := range getters {
+		getter()
+	}
+}
diff --git a/banyand/observability/meter_noop.go b/banyand/observability/meter_noop.go
index bc25b6e6..29bb737d 100644
--- a/banyand/observability/meter_noop.go
+++ b/banyand/observability/meter_noop.go
@@ -21,10 +21,24 @@
package observability
import (
+ "google.golang.org/grpc"
+
"github.com/apache/skywalking-banyandb/pkg/meter"
+ "github.com/apache/skywalking-banyandb/pkg/run"
)
+// NewMetricService returns a metric service.
+func NewMetricService() run.Service {
+ MetricsCollector.collect()
+ return nil
+}
+
// NewMeterProvider returns a meter.Provider based on the given scope.
func NewMeterProvider(_ meter.Scope) meter.Provider {
return meter.NoopProvider{}
}
+
+// MetricsServerInterceptor returns a grpc.UnaryServerInterceptor and a grpc.StreamServerInterceptor.
+func MetricsServerInterceptor() (grpc.UnaryServerInterceptor, grpc.StreamServerInterceptor) {
+ return nil, nil
+}
diff --git a/banyand/observability/meter_prom.go b/banyand/observability/meter_prom.go
index 1047e99f..173bb116 100644
--- a/banyand/observability/meter_prom.go
+++ b/banyand/observability/meter_prom.go
@@ -21,11 +21,116 @@
package observability
import (
+ "context"
+ "net/http"
+ "time"
+
+ grpcprom "github.com/grpc-ecosystem/go-grpc-middleware/providers/prometheus"
+ "github.com/prometheus/client_golang/prometheus"
+ "github.com/prometheus/client_golang/prometheus/promhttp"
+ "github.com/robfig/cron/v3"
+
+ "github.com/apache/skywalking-banyandb/pkg/logger"
"github.com/apache/skywalking-banyandb/pkg/meter"
"github.com/apache/skywalking-banyandb/pkg/meter/prom"
+ "github.com/apache/skywalking-banyandb/pkg/run"
+ "github.com/apache/skywalking-banyandb/pkg/timestamp"
+ "google.golang.org/grpc"
)
+var (
+ // Compile-time checks that metricService satisfies run.Service and run.Config.
+ _ run.Service = (*metricService)(nil)
+ _ run.Config = (*metricService)(nil)
+
+ // reg is the package-level Prometheus registry that this file registers
+ // collectors on and serves over HTTP.
+ reg = prometheus.NewRegistry()
+)
+
+// NewMetricService returns a metric service whose closer is created with
+// run.NewCloser(1).
+func NewMetricService() run.Service {
+	svc := new(metricService)
+	svc.closer = run.NewCloser(1)
+	return svc
+}
+
+// metricService serves the metrics registry over HTTP and periodically
+// triggers MetricsCollector.
+type metricService struct {
+ // l is the service logger; it is assigned in Serve.
+ l *logger.Logger
+ // svr is the HTTP server created in Serve.
+ svr *http.Server
+ // closer tracks the serving goroutine.
+ closer *run.Closer
+ // listenAddr is bound to the "observability-listener-addr" flag.
+ listenAddr string
+ // scheduler runs the periodic metrics collection; it is created in Serve.
+ scheduler *timestamp.Scheduler
+}
+
+// FlagSet declares the "observability" flags: the listen address, which
+// defaults to ":2121".
+func (p *metricService) FlagSet() *run.FlagSet {
+	fs := run.NewFlagSet("observability")
+	fs.StringVar(&p.listenAddr, "observability-listener-addr", ":2121", "listen addr for observability")
+	return fs
+}
+
+// Validate returns errNoAddr when no listen address is configured.
+func (p *metricService) Validate() error {
+	if p.listenAddr != "" {
+		return nil
+	}
+	return errNoAddr
+}
+
+func (p *metricService) Name() string {
+ return "metric-service"
+}
+
+// Serve starts the metrics endpoint and returns the closer's stop notification.
+// It registers the Go and process collectors on reg, schedules
+// MetricsCollector.collect with the "@every 15s" expression, and serves reg
+// on "/metrics" at listenAddr from a background goroutine.
+func (p *metricService) Serve() run.StopNotify {
+	p.l = logger.GetLogger(p.Name())
+
+	reg.MustRegister(prometheus.NewGoCollector())
+	reg.MustRegister(prometheus.NewProcessCollector(prometheus.ProcessCollectorOpts{}))
+
+	clock, _ := timestamp.GetClock(context.TODO())
+	p.scheduler = timestamp.NewScheduler(p.l, clock)
+	// The callback arguments are unused; naming them "_" also avoids shadowing
+	// the logger package inside the closure.
+	p.scheduler.Register("metrics-collector", cron.Descriptor, "@every 15s", func(_ time.Time, _ *logger.Logger) bool {
+		MetricsCollector.collect()
+		return true
+	})
+
+	mux := http.NewServeMux()
+	mux.Handle("/metrics", promhttp.HandlerFor(
+		reg,
+		promhttp.HandlerOpts{},
+	))
+	p.svr = &http.Server{
+		Addr:              p.listenAddr,
+		ReadHeaderTimeout: 3 * time.Second,
+		Handler:           mux,
+	}
+
+	go func() {
+		defer p.closer.Done()
+		p.l.Info().Str("listenAddr", p.listenAddr).Msg("Start metric server")
+		// ListenAndServe always returns a non-nil error. http.ErrServerClosed
+		// is the expected result of GracefulStop closing the server; anything
+		// else (for example a failed bind) must not be dropped silently.
+		if err := p.svr.ListenAndServe(); err != http.ErrServerClosed {
+			p.l.Error().Err(err).Str("listenAddr", p.listenAddr).Msg("metric server stopped unexpectedly")
+		}
+	}()
+	return p.closer.CloseNotify()
+}
+
+// GracefulStop closes the scheduler and the HTTP server when they exist,
+// then closes the closer and waits on it.
+func (p *metricService) GracefulStop() {
+	if sched := p.scheduler; sched != nil {
+		sched.Close()
+	}
+	if srv := p.svr; srv != nil {
+		// The close error is intentionally ignored.
+		_ = srv.Close()
+	}
+	p.closer.CloseThenWait()
+}
+
// NewMeterProvider returns a meter.Provider based on the given scope.
func NewMeterProvider(scope meter.Scope) meter.Provider {
- return prom.NewProvider(scope)
+ return prom.NewProvider(scope, reg)
+}
+
+// MetricsServerInterceptor returns a server interceptor for metrics.
+func MetricsServerInterceptor() (grpc.UnaryServerInterceptor, grpc.StreamServerInterceptor) {
+ srvMetrics := grpcprom.NewServerMetrics(
+ grpcprom.WithServerHandlingTimeHistogram(
+ grpcprom.WithHistogramBuckets([]float64{0.001, 0.01, 0.1, 0.3, 0.6, 1, 3, 6, 9, 20, 30, 60, 90, 120}),
+ ),
+ )
+ reg.MustRegister(srvMetrics)
+ return srvMetrics.UnaryServerInterceptor(), srvMetrics.StreamServerInterceptor()
}
diff --git a/banyand/observability/metric.go b/banyand/observability/metric.go
deleted file mode 100644
index d34b7a5d..00000000
--- a/banyand/observability/metric.go
+++ /dev/null
@@ -1,90 +0,0 @@
-// Licensed to Apache Software Foundation (ASF) under one or more contributor
-// license agreements. See the NOTICE file distributed with
-// this work for additional information regarding copyright
-// ownership. Apache Software Foundation (ASF) licenses this file to you under
-// the Apache License, Version 2.0 (the "License"); you may
-// not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package observability
-
-import (
- "net/http"
- "time"
-
- "github.com/prometheus/client_golang/prometheus/promhttp"
-
- "github.com/apache/skywalking-banyandb/pkg/logger"
- "github.com/apache/skywalking-banyandb/pkg/meter"
- "github.com/apache/skywalking-banyandb/pkg/run"
-)
-
-var (
- _ run.Service = (*metricService)(nil)
- _ run.Config = (*metricService)(nil)
-
- // RootScope is the root scope for all metrics.
- RootScope = meter.NewHierarchicalScope("banyandb", "_")
-)
-
-// NewMetricService returns a metric service.
-func NewMetricService() run.Service {
- return &metricService{
- closer: run.NewCloser(1),
- }
-}
-
-type metricService struct {
- l *logger.Logger
- svr *http.Server
- closer *run.Closer
- listenAddr string
-}
-
-func (p *metricService) FlagSet() *run.FlagSet {
- flagSet := run.NewFlagSet("observability")
- flagSet.StringVar(&p.listenAddr, "observability-listener-addr", ":2121", "listen addr for observability")
- return flagSet
-}
-
-func (p *metricService) Validate() error {
- if p.listenAddr == "" {
- return errNoAddr
- }
- return nil
-}
-
-func (p *metricService) Name() string {
- return "metric-service"
-}
-
-func (p *metricService) Serve() run.StopNotify {
- p.l = logger.GetLogger(p.Name())
- mux := http.NewServeMux()
- mux.Handle("/metrics", promhttp.Handler())
- p.svr = &http.Server{
- Addr: p.listenAddr,
- ReadHeaderTimeout: 3 * time.Second,
- Handler: mux,
- }
- go func() {
- defer p.closer.Done()
- p.l.Info().Str("listenAddr", p.listenAddr).Msg("Start metric server")
- _ = p.svr.ListenAndServe()
- }()
- return p.closer.CloseNotify()
-}
-
-func (p *metricService) GracefulStop() {
- _ = p.svr.Close()
- p.closer.CloseThenWait()
-}
diff --git a/banyand/observability/metrics_system.go b/banyand/observability/metrics_system.go
new file mode 100644
index 00000000..abb5cb8f
--- /dev/null
+++ b/banyand/observability/metrics_system.go
@@ -0,0 +1,123 @@
+package observability
+
+import (
+ "context"
+ "strings"
+ "sync"
+
+ "github.com/shirou/gopsutil/v3/cpu"
+ "github.com/shirou/gopsutil/v3/mem"
+ "github.com/shirou/gopsutil/v3/net"
+
+ "github.com/apache/skywalking-banyandb/pkg/logger"
+ "github.com/apache/skywalking-banyandb/pkg/meter"
+)
+
+// log is the logger used by the system metrics getters in this file.
+var log = logger.GetLogger("observability", "metrics", "system")
+
+var (
+ // cpuCount caches the CPU count; collectCPU fills it once.
+ cpuCount = 0
+ // once4CpuCount guards the one-time initialization of cpuCount.
+ once4CpuCount sync.Once
+ // cpuCountsFunc and cpuTimesFunc are indirections over the gopsutil calls.
+ // NOTE(review): presumably kept as variables so tests can replace them; confirm.
+ cpuCountsFunc = cpu.Counts
+ cpuTimesFunc = cpu.Times
+)
+
+var (
+ // RootScope is the root scope for all metrics.
+ RootScope = meter.NewHierarchicalScope("banyandb", "_")
+
+ // systemScope is the "system" sub-scope of RootScope.
+ systemScope = RootScope.SubScope("system")
+
+ // systemProvider creates the gauges below within systemScope.
+ systemProvider = NewMeterProvider(systemScope)
+ // cpuStateGauge is labeled by "kind".
+ cpuStateGauge = systemProvider.Gauge("cpu_state", "kind")
+ // cpuNumGauge has no labels.
+ cpuNumGauge = systemProvider.Gauge("cpu_num")
+ // memorySateGauge is labeled by "kind".
+ memorySateGauge = systemProvider.Gauge("memory_state", "kind")
+ // netStateGauge is labeled by "kind" and "name".
+ netStateGauge = systemProvider.Gauge("net_state", "kind", "name")
+)
+
+// init registers the cpu, memory and net getters on MetricsCollector.
+func init() {
+	for _, entry := range []struct {
+		name   string
+		getter MetricsGetter
+	}{
+		{name: "cpu", getter: collectCPU},
+		{name: "memory", getter: collectMemory},
+		{name: "net", getter: collectNet},
+	} {
+		MetricsCollector.Register(entry.name, entry.getter)
+	}
+}
+
+// collectCPU publishes the CPU count on cpuNumGauge and, per state, the
+// fraction of total CPU time on cpuStateGauge.
+// The CPU count is looked up once and cached in cpuCount. When the CPU times
+// cannot be read, are empty, or sum to zero, the per-state gauges are left
+// untouched instead of indexing an empty slice or publishing NaN.
+func collectCPU() {
+	once4CpuCount.Do(func() {
+		c, err := cpuCountsFunc(false)
+		if err != nil {
+			log.Error().Err(err).Msg("cannot get cpu count")
+			return
+		}
+		cpuCount = c
+	})
+	cpuNumGauge.Set(float64(cpuCount))
+
+	s, err := cpuTimesFunc(false)
+	if err != nil {
+		log.Error().Err(err).Msg("cannot get cpu stat")
+		return
+	}
+	if len(s) == 0 {
+		log.Error().Msg("cannot get cpu stat")
+		return
+	}
+	allStat := s[0]
+	total := allStat.User + allStat.System + allStat.Idle + allStat.Nice + allStat.Iowait + allStat.Irq +
+		allStat.Softirq + allStat.Steal + allStat.Guest + allStat.GuestNice
+	if total <= 0 {
+		// Dividing by zero would publish NaN for every state.
+		return
+	}
+	cpuStateGauge.Set(allStat.User/total, "user")
+	cpuStateGauge.Set(allStat.System/total, "system")
+	cpuStateGauge.Set(allStat.Idle/total, "idle")
+	cpuStateGauge.Set(allStat.Nice/total, "nice")
+	cpuStateGauge.Set(allStat.Iowait/total, "iowait")
+	cpuStateGauge.Set(allStat.Irq/total, "irq")
+	cpuStateGauge.Set(allStat.Softirq/total, "softirq")
+	cpuStateGauge.Set(allStat.Steal/total, "steal")
+}
+
+// collectMemory publishes memory usage ratios on memorySateGauge:
+// "used_percent" is UsedPercent scaled to 0..1 and "used" is Used/Total.
+// When the memory stat cannot be read, nothing is published; the returned
+// pointer is not dereferenced in that case.
+func collectMemory() {
+	m, err := mem.VirtualMemory()
+	if err != nil {
+		log.Error().Err(err).Msg("cannot get memory stat")
+		return
+	}
+	memorySateGauge.Set(m.UsedPercent/100, "used_percent")
+	// Skip the ratio when Total is zero to avoid publishing NaN or Inf.
+	if m.Total > 0 {
+		memorySateGauge.Set(float64(m.Used)/float64(m.Total), "used")
+	}
+}
+
+// collectNet publishes the I/O counters of every interface returned by
+// getNetStat on netStateGauge, labeled by counter kind and interface name.
+// A getNetStat error is logged; it returns no stats in that case, so nothing
+// is published.
+func collectNet() {
+	stats, err := getNetStat(context.Background())
+	if err != nil {
+		log.Error().Err(err).Msg("cannot get net stat")
+	}
+	for i := range stats {
+		st := &stats[i]
+		samples := [...]struct {
+			kind  string
+			value uint64
+		}{
+			{"bytes_recv", st.BytesRecv},
+			{"bytes_sent", st.BytesSent},
+			{"packets_recv", st.PacketsRecv},
+			{"packets_sent", st.PacketsSent},
+			{"errin", st.Errin},
+			{"errout", st.Errout},
+			{"dropin", st.Dropin},
+			{"dropout", st.Dropout},
+			{"fifoin", st.Fifoin},
+			{"fifoout", st.Fifoout},
+		}
+		for _, sample := range samples {
+			netStateGauge.Set(float64(sample.value), sample.kind, st.Name)
+		}
+	}
+}
+
+// getNetStat returns the per-interface I/O counters, keeping only interfaces
+// whose name starts with "en" (OS X) or "eth" (Linux) and whose received and
+// sent byte counts are both non-zero.
+func getNetStat(ctx context.Context) ([]net.IOCountersStat, error) {
+	all, err := net.IOCountersWithContext(ctx, true)
+	if err != nil {
+		return nil, err
+	}
+	var availableStats []net.IOCountersStat
+	for _, candidate := range all {
+		knownName := strings.HasPrefix(candidate.Name, "en") || // OS X
+			strings.HasPrefix(candidate.Name, "eth") // Linux
+		if !knownName {
+			continue
+		}
+		// ignore empty interface
+		if candidate.BytesRecv != 0 && candidate.BytesSent != 0 {
+			availableStats = append(availableStats, candidate)
+		}
+	}
+	return availableStats, nil
+}
diff --git a/banyand/observability/pprof.go b/banyand/observability/pprof.go
index 0e651367..46cbb0da 100644
--- a/banyand/observability/pprof.go
+++ b/banyand/observability/pprof.go
@@ -27,8 +27,8 @@ import (
)
var (
- _ run.Service = (*metricService)(nil)
- _ run.Config = (*metricService)(nil)
+ _ run.Service = (*pprofService)(nil)
+ _ run.Config = (*pprofService)(nil)
)
// NewProfService returns a pprof service.
diff --git a/banyand/stream/metadata.go b/banyand/stream/metadata.go
index 968fd7a1..11de21d9 100644
--- a/banyand/stream/metadata.go
+++ b/banyand/stream/metadata.go
@@ -220,9 +220,10 @@ func (s *supplier) OpenDB(groupSchema *commonv1.Group) (tsdb.Database, error) {
return nil, err
}
return tsdb.OpenDatabase(
- context.WithValue(context.Background(), common.PositionKey, common.Position{
- Module: "stream",
- Database: name,
+ common.SetPosition(context.Background(), func(p common.Position) common.Position {
+ p.Module = "stream"
+ p.Database = name
+ return p
}),
opts)
}
diff --git a/banyand/tsdb/block.go b/banyand/tsdb/block.go
index 53f29f47..55dbb83b 100644
--- a/banyand/tsdb/block.go
+++ b/banyand/tsdb/block.go
@@ -28,6 +28,7 @@ import (
"sync"
"sync/atomic"
"time"
+ "unsafe"
"github.com/dgraph-io/badger/v3/skl"
"github.com/pkg/errors"
@@ -35,11 +36,13 @@ import (
"github.com/apache/skywalking-banyandb/api/common"
"github.com/apache/skywalking-banyandb/banyand/kv"
+ "github.com/apache/skywalking-banyandb/banyand/observability"
"github.com/apache/skywalking-banyandb/banyand/tsdb/bucket"
"github.com/apache/skywalking-banyandb/pkg/index"
"github.com/apache/skywalking-banyandb/pkg/index/inverted"
"github.com/apache/skywalking-banyandb/pkg/index/lsm"
"github.com/apache/skywalking-banyandb/pkg/logger"
+ "github.com/apache/skywalking-banyandb/pkg/meter"
"github.com/apache/skywalking-banyandb/pkg/timestamp"
)
@@ -53,9 +56,16 @@ const (
maxBlockAge = time.Hour
defaultWriteConcurrency = 1000
defaultNumBufferShards = 2
+ itemIDLength = unsafe.Sizeof(common.ItemID(0))
)
-var errBlockClosingInterrupted = errors.New("interrupt to close the block")
+var (
+ blockMeterProvider = observability.NewMeterProvider(meterTSDB.SubScope("block"))
+ blockOpenedTimeSecondsGauge = blockMeterProvider.Gauge("opened_time_seconds", common.LabelNames()...)
+ blockReferencesGauge = blockMeterProvider.Gauge("refs", common.LabelNames()...)
+
+ errBlockClosingInterrupted = errors.New("interrupt to close the block")
+)
type block struct {
invertedIndex index.Store
@@ -125,10 +135,7 @@ func newBlock(ctx context.Context, opts blockOpts) (b *block, err error) {
b.Reporter = bucket.NewTimeBasedReporter(b.String(), opts.timeRange, clock, opts.scheduler)
b.closed.Store(true)
b.options(ctx)
- position := ctx.Value(common.PositionKey)
- if position != nil {
- b.position = position.(common.Position)
- }
+ b.position = common.GetPosition(ctx)
return b, nil
}
@@ -202,6 +209,7 @@ func (b *block) open() (err error) {
b.closableLst = append(b.closableLst, b.invertedIndex, b.lsmIndex)
b.ref.Store(0)
b.closed.Store(false)
+ blockOpenedTimeSecondsGauge.Set(float64(time.Now().Unix()), b.position.LabelValues()...)
return nil
}
@@ -287,11 +295,13 @@ func (b *block) incRef() bool {
return false
}
b.ref.Add(1)
+ blockReferencesGauge.Set(float64(b.ref.Load()), b.position.LabelValues()...)
return true
}
func (b *block) Done() {
b.ref.Add(-1)
+ blockReferencesGauge.Set(float64(b.ref.Load()), b.position.LabelValues()...)
}
func (b *block) waitDone(stopped *atomic.Bool) <-chan struct{} {
@@ -337,6 +347,9 @@ func (b *block) close(ctx context.Context) (err error) {
for _, closer := range b.closableLst {
err = multierr.Append(err, closer.Close())
}
+ for _, g := range []meter.Gauge{blockOpenedTimeSecondsGauge, blockReferencesGauge} {
+ g.Delete(b.position.LabelValues()...)
+ }
return err
}
@@ -416,24 +429,55 @@ func (d *bDelegate) write(key []byte, val []byte, ts time.Time) error {
d.delegate.lock.Lock()
if err := d.delegate.openBuffer(); err != nil {
d.delegate.lock.Unlock()
+ receivedNumCounter.Inc(1, append(d.delegate.position.ShardLabelValues(), "main", "true")...)
return err
}
d.delegate.lock.Unlock()
}
d.delegate.buffer.Write(key, val, ts)
+ receivedBytesCounter.Inc(float64(len(key)+len(val)), append(d.delegate.position.ShardLabelValues(), "main")...)
+ receivedNumCounter.Inc(1, append(d.delegate.position.ShardLabelValues(), "main", "false")...)
return nil
}
func (d *bDelegate) writePrimaryIndex(field index.Field, id common.ItemID) error {
- return d.delegate.lsmIndex.Write([]index.Field{field}, id)
+ if err := d.delegate.lsmIndex.Write([]index.Field{field}, id); err != nil {
+ receivedNumCounter.Inc(1, append(d.delegate.position.ShardLabelValues(), "primary", "true")...)
+ return err
+ }
+ receivedBytesCounter.Inc(float64(len(field.Marshal())+int(itemIDLength)), append(d.delegate.position.ShardLabelValues(), "primary")...)
+ receivedNumCounter.Inc(1, append(d.delegate.position.ShardLabelValues(), "primary", "false")...)
+ return nil
}
func (d *bDelegate) writeLSMIndex(fields []index.Field, id common.ItemID) error {
- return d.delegate.lsmIndex.Write(fields, id)
+ total := 0
+ for _, f := range fields {
+ total += len(f.Marshal())
+ }
+
+ if err := d.delegate.lsmIndex.Write(fields, id); err != nil {
+ receivedNumCounter.Inc(1, append(d.delegate.position.ShardLabelValues(), "local_lsm", "true")...)
+ return err
+ }
+ receivedBytesCounter.Inc(float64(total+int(itemIDLength)), append(d.delegate.position.ShardLabelValues(), "local_lsm")...)
+ receivedNumCounter.Inc(1, append(d.delegate.position.ShardLabelValues(), "local_lsm", "false")...)
+ return nil
}
func (d *bDelegate) writeInvertedIndex(fields []index.Field, id common.ItemID) error {
- return d.delegate.invertedIndex.Write(fields, id)
+ total := 0
+ for _, f := range fields {
+ total += len(f.Marshal())
+ }
+
+ if err := d.delegate.invertedIndex.Write(fields, id); err != nil {
+ receivedNumCounter.Inc(1, append(d.delegate.position.ShardLabelValues(), "local_inverted", "true")...)
+ return err
+ }
+ receivedBytesCounter.Inc(float64(total+int(itemIDLength)), append(d.delegate.position.ShardLabelValues(), "local_inverted")...)
+ receivedNumCounter.Inc(1, append(d.delegate.position.ShardLabelValues(), "local_inverted", "false")...)
+ return nil
}
func (d *bDelegate) contains(ts time.Time) bool {
diff --git a/banyand/tsdb/buffer.go b/banyand/tsdb/buffer.go
index 001501df..d2e6ebe6 100644
--- a/banyand/tsdb/buffer.go
+++ b/banyand/tsdb/buffer.go
@@ -67,17 +67,18 @@ type flushEvent struct {
type onFlush func(shardIndex int, skl *skl.Skiplist) error
type bufferShardBucket struct {
- mutable *skl.Skiplist
- writeCh chan operation
- flushCh chan flushEvent
- writeWaitGroup *sync.WaitGroup
- flushWaitGroup *sync.WaitGroup
- log *logger.Logger
- immutables []*skl.Skiplist
- labelValues []string
- index int
- capacity int
- mutex sync.RWMutex
+ mutable *skl.Skiplist
+ writeCh chan operation
+ flushCh chan flushEvent
+ writeWaitGroup *sync.WaitGroup
+ flushWaitGroup *sync.WaitGroup
+ log *logger.Logger
+ immutables []*skl.Skiplist
+ labelValues []string
+ shardLabelValues []string
+ index int
+ capacity int
+ mutex sync.RWMutex
}
// Buffer is an exported struct that represents a buffer composed of multiple shard buckets.
@@ -106,15 +107,16 @@ func NewBuffer(log *logger.Logger, position common.Position, flushSize, writeCon
buffer.flushWaitGroup.Add(numShards)
for i := 0; i < numShards; i++ {
buckets[i] = bufferShardBucket{
- index: i,
- capacity: flushSize,
- mutable: skl.NewSkiplist(int64(flushSize)),
- writeCh: make(chan operation, writeConcurrency),
- flushCh: make(chan flushEvent, 1),
- writeWaitGroup: &buffer.writeWaitGroup,
- flushWaitGroup: &buffer.flushWaitGroup,
- log: buffer.log.Named(fmt.Sprintf("shard-%d", i)),
- labelValues: append(position.LabelValues(), fmt.Sprintf("%d", i)),
+ index: i,
+ capacity: flushSize,
+ mutable: skl.NewSkiplist(int64(flushSize)),
+ writeCh: make(chan operation, writeConcurrency),
+ flushCh: make(chan flushEvent, 1),
+ writeWaitGroup: &buffer.writeWaitGroup,
+ flushWaitGroup: &buffer.flushWaitGroup,
+ log: buffer.log.Named(fmt.Sprintf("shard-%d", i)),
+ labelValues: append(position.LabelValues(), fmt.Sprintf("%d", i)),
+ shardLabelValues: position.ShardLabelValues(),
}
buckets[i].start(onFlushFn)
maxBytes.Set(float64(flushSize), buckets[i].labelValues...)
@@ -206,23 +208,34 @@ func (bsb *bufferShardBucket) getAll() ([]*skl.Skiplist, func()) {
func (bsb *bufferShardBucket) start(onFlushFn onFlush) {
go func() {
+ defer func() {
+ for _, g := range []meter.Gauge{maxBytes, immutableBytes, mutableBytes} {
+ g.Delete(bsb.labelValues...)
+ }
+ }()
defer bsb.flushWaitGroup.Done()
for event := range bsb.flushCh {
oldSkipList := event.data
memSize := oldSkipList.MemSize()
+ t1 := time.Now()
for {
if err := onFlushFn(bsb.index, oldSkipList); err != nil {
bsb.log.Err(err).Msg("flushing immutable buffer failed. Retrying...")
+ flushNum.Inc(1, append(bsb.labelValues[:2], "true")...)
time.Sleep(time.Second)
continue
}
break
}
+ flushLatency.Observe(time.Since(t1).Seconds(), bsb.shardLabelValues...)
+ immutableBytes.Add(float64(-memSize), bsb.labelValues...)
+ flushBytes.Inc(float64(memSize), bsb.shardLabelValues...)
+ flushNum.Inc(1, append(bsb.shardLabelValues, "false")...)
+
bsb.mutex.Lock()
bsb.immutables = bsb.immutables[1:]
bsb.mutex.Unlock()
oldSkipList.DecrRef()
- immutableBytes.Add(float64(-memSize), bsb.labelValues...)
}
}()
go func() {
@@ -259,6 +272,6 @@ func (bsb *bufferShardBucket) triggerFlushing() {
func (bsb *bufferShardBucket) swap() {
bsb.immutables = append(bsb.immutables, bsb.mutable)
- bsb.mutable = skl.NewSkiplist(int64(bsb.capacity))
immutableBytes.Add(float64(bsb.mutable.MemSize()), bsb.labelValues...)
+ bsb.mutable = skl.NewSkiplist(int64(bsb.capacity))
}
diff --git a/banyand/tsdb/indexdb.go b/banyand/tsdb/indexdb.go
index d0fff8fe..d9e57068 100644
--- a/banyand/tsdb/indexdb.go
+++ b/banyand/tsdb/indexdb.go
@@ -147,23 +147,44 @@ type indexWriter struct {
}
func (i *indexWriter) WriteLSMIndex(fields []index.Field) (err error) {
+ total := 0
for _, field := range fields {
if i.scope != nil {
field.Key.SeriesID = GlobalSeriesID(i.scope)
}
- err = multierr.Append(err, i.seg.globalIndex.PutWithVersion(field.Marshal(), i.itemID.marshal(), uint64(i.ts.UnixNano())))
+ bb := field.Marshal()
+ ibb := i.itemID.marshal()
+ err = multierr.Append(err, i.seg.globalIndex.PutWithVersion(bb, ibb, uint64(i.ts.UnixNano())))
+ total += len(bb) + len(ibb) + 8
}
- return err
+ if err != nil {
+ receivedNumCounter.Inc(1, append(i.seg.position.ShardLabelValues(), "global_lsm", "true")...)
+ return err
+ }
+ receivedNumCounter.Inc(1, append(i.seg.position.ShardLabelValues(), "global_lsm", "false")...)
+ receivedBytesCounter.Inc(float64(total), append(i.seg.position.ShardLabelValues(), "global_lsm")...)
+ return nil
}
func (i *indexWriter) WriteInvertedIndex(fields []index.Field) (err error) {
+ total := 0
for _, field := range fields {
if i.scope != nil {
field.Key.SeriesID = GlobalSeriesID(i.scope)
}
- err = multierr.Append(err, i.seg.globalIndex.PutWithVersion(field.Marshal(), i.itemID.marshal(), uint64(i.ts.UnixNano())))
+ bb := field.Marshal()
+ ibb := i.itemID.marshal()
+ err = multierr.Append(err, i.seg.globalIndex.PutWithVersion(bb, ibb, uint64(i.ts.UnixNano())))
+ total += len(bb) + len(ibb) + 8
+ }
+
+ if err != nil {
+ receivedNumCounter.Inc(1, append(i.seg.position.ShardLabelValues(), "global_inverted", "true")...)
+ return err
}
- return err
+ receivedNumCounter.Inc(1, append(i.seg.position.ShardLabelValues(), "global_inverted", "false")...)
+ receivedBytesCounter.Inc(float64(total), append(i.seg.position.ShardLabelValues(), "global_inverted")...)
+ return nil
}
// GlobalSeriesID encodes Entry to common.SeriesID.
diff --git a/banyand/tsdb/scope.go b/banyand/tsdb/scope.go
index 15aebb9b..d28de9eb 100644
--- a/banyand/tsdb/scope.go
+++ b/banyand/tsdb/scope.go
@@ -88,3 +88,7 @@ func (sdd *scopedSeriesDatabase) GetByID(id common.SeriesID) (Series, error) {
func (sdd *scopedSeriesDatabase) List(ctx context.Context, path Path) (SeriesList, error) {
return sdd.delegated.List(ctx, path.prepend(sdd.scope))
}
+
+func (sdd *scopedSeriesDatabase) SizeOnDisk() int64 {
+ return sdd.delegated.SizeOnDisk()
+}
diff --git a/banyand/tsdb/segment.go b/banyand/tsdb/segment.go
index 61122f21..1e2abaa7 100644
--- a/banyand/tsdb/segment.go
+++ b/banyand/tsdb/segment.go
@@ -25,6 +25,7 @@ import (
"sync"
"time"
+ "github.com/apache/skywalking-banyandb/api/common"
"github.com/apache/skywalking-banyandb/banyand/kv"
"github.com/apache/skywalking-banyandb/banyand/tsdb/bucket"
"github.com/apache/skywalking-banyandb/pkg/logger"
@@ -39,6 +40,7 @@ type segment struct {
l *logger.Logger
blockController *blockController
blockManageStrategy *bucket.Strategy
+ position common.Position
timestamp.TimeRange
path string
suffix string
@@ -60,6 +62,7 @@ func openSegment(ctx context.Context, startTime, endTime time.Time, path, suffix
path: path,
suffix: suffix,
TimeRange: timeRange,
+ position: common.GetPosition(ctx),
}
l := logger.Fetch(ctx, s.String())
s.l = l
diff --git a/banyand/tsdb/seriesdb.go b/banyand/tsdb/seriesdb.go
index d715c0a3..679fcf43 100644
--- a/banyand/tsdb/seriesdb.go
+++ b/banyand/tsdb/seriesdb.go
@@ -251,6 +251,7 @@ type SeriesDatabase interface {
GetByID(id common.SeriesID) (Series, error)
Get(key []byte, entityValues EntityValues) (Series, error)
List(ctx context.Context, path Path) (SeriesList, error)
+ SizeOnDisk() int64
}
type blockDatabase interface {
@@ -271,6 +272,7 @@ type seriesDB struct {
seriesMetadata kv.Store
l *logger.Logger
segCtrl *segmentController
+ position common.Position
sync.Mutex
sID common.ShardID
}
@@ -322,8 +324,11 @@ func (s *seriesDB) Get(key []byte, entityValues EntityValues) (Series, error) {
}
errDecode = s.seriesMetadata.Put(entityKey, encodedData)
if errDecode != nil {
+ receivedNumCounter.Inc(1, append(s.position.ShardLabelValues(), "series", "true")...)
return nil, errDecode
}
+ receivedBytesCounter.Inc(float64(len(entityKey)+len(encodedData)), append(s.position.ShardLabelValues(), "series")...)
+ receivedNumCounter.Inc(1, append(s.position.ShardLabelValues(), "series", "false")...)
var series string
if e := s.l.Debug(); e.Enabled() {
@@ -349,6 +354,10 @@ func (s *seriesDB) Get(key []byte, entityValues EntityValues) (Series, error) {
return newSeries(s.context(), seriesID, entityValues.String(), s), nil
}
+func (s *seriesDB) SizeOnDisk() int64 {
+ return s.seriesMetadata.SizeOnDisk()
+}
+
func encode(seriesID common.SeriesID, evv EntityValues) ([]byte, []byte, error) {
data, err := MarshalEntityValues(evv)
if err != nil {
@@ -493,9 +502,10 @@ func (s *seriesDB) Close() error {
func newSeriesDataBase(ctx context.Context, shardID common.ShardID, path string, segCtrl *segmentController) (SeriesDatabase, error) {
sdb := &seriesDB{
- sID: shardID,
- segCtrl: segCtrl,
- l: logger.Fetch(ctx, "series_database"),
+ sID: shardID,
+ segCtrl: segCtrl,
+ l: logger.Fetch(ctx, "series_database"),
+ position: common.GetPosition(ctx),
}
o := ctx.Value(optionsKey)
var memSize int64
diff --git a/banyand/tsdb/shard.go b/banyand/tsdb/shard.go
index 3f561d69..fafd8b1d 100644
--- a/banyand/tsdb/shard.go
+++ b/banyand/tsdb/shard.go
@@ -21,15 +21,19 @@ import (
"context"
"sort"
"strconv"
+ "strings"
"sync"
"time"
"github.com/pkg/errors"
+ "github.com/shirou/gopsutil/v3/disk"
"go.uber.org/multierr"
"github.com/apache/skywalking-banyandb/api/common"
+ "github.com/apache/skywalking-banyandb/banyand/observability"
"github.com/apache/skywalking-banyandb/banyand/tsdb/bucket"
"github.com/apache/skywalking-banyandb/pkg/logger"
+ "github.com/apache/skywalking-banyandb/pkg/meter"
"github.com/apache/skywalking-banyandb/pkg/timestamp"
)
@@ -39,7 +43,31 @@ const (
defaultKVMemorySize = 1 << 20
)
-var _ Shard = (*shard)(nil)
+var (
+ _ Shard = (*shard)(nil)
+
+ shardProvider = observability.NewMeterProvider(meterTSDB.SubScope("shard"))
+ diskStateGetter = disk.UsageWithContext
+
+ diskStateGauge meter.Gauge
+ flushBytes meter.Counter
+ flushNum meter.Counter
+ flushLatency meter.Histogram
+ receivedBytesCounter meter.Counter
+ receivedNumCounter meter.Counter
+ onDiskBytesGauge meter.Gauge
+)
+
+func init() {
+ labelNames := common.ShardLabelNames()
+ diskStateGauge = shardProvider.Gauge("disk_state", append(labelNames, "kind")...)
+ flushBytes = shardProvider.Counter("flush_bytes", labelNames...)
+ flushNum = shardProvider.Counter("flush_num", append(labelNames, "is_error")...)
+ flushLatency = shardProvider.Histogram("flush_latency", meter.DefBuckets, labelNames...)
+ receivedBytesCounter = shardProvider.Counter("received_bytes", append(labelNames, "kind")...)
+ receivedNumCounter = shardProvider.Counter("received_num", append(labelNames, "kind", "is_error")...)
+ onDiskBytesGauge = shardProvider.Gauge("on_disk_bytes", append(labelNames, "kind")...)
+}
type shard struct {
seriesDatabase SeriesDatabase
@@ -103,14 +131,27 @@ func OpenShard(ctx context.Context, id common.ShardID,
return nil, err
}
s.segmentManageStrategy.Run()
- position := shardCtx.Value(common.PositionKey)
- if position != nil {
- s.position = position.(common.Position)
- }
+ s.position = common.GetPosition(shardCtx)
retentionTask := newRetentionTask(s.segmentController, ttl)
if err := scheduler.Register("retention", retentionTask.option, retentionTask.expr, retentionTask.run); err != nil {
return nil, err
}
+ plv := s.position.ShardLabelValues()
+ observability.MetricsCollector.Register(strings.Join(plv, "-"), func() {
+ if stat, err := diskStateGetter(ctx, path); err != nil {
+ s.l.Error().Err(err).Msg("get disk usage stat")
+ } else {
+ diskStateGauge.Set(stat.UsedPercent, append(plv, "used_percent")...)
+ diskStateGauge.Set(float64(stat.Free), append(plv, "free")...)
+ diskStateGauge.Set(float64(stat.Total), append(plv, "total")...)
+ diskStateGauge.Set(float64(stat.Used), append(plv, "used")...)
+ diskStateGauge.Set(float64(stat.InodesUsed), append(plv, "inodes_used")...)
+ diskStateGauge.Set(float64(stat.InodesFree), append(plv, "inodes_free")...)
+ diskStateGauge.Set(float64(stat.InodesTotal), append(plv, "inodes_total")...)
+ diskStateGauge.Set(stat.InodesUsedPercent, append(plv, "inodes_used_percent")...)
+ }
+ s.collectSizeOnDisk(plv)
+ })
return s, nil
}
@@ -126,6 +167,34 @@ func (s *shard) Index() IndexDatabase {
return s.indexDatabase
}
+func (s *shard) collectSizeOnDisk(labelsValues []string) {
+ var globalIndex, localLSM, localInverted, main int64
+ for _, seg := range s.segmentController.segments() {
+ if seg.globalIndex != nil {
+ globalIndex += seg.globalIndex.SizeOnDisk()
+ }
+ for _, b := range seg.blockController.blocks() {
+ if b.Closed() {
+ continue
+ }
+ if b.sst != nil {
+ main += b.sst.SizeOnDisk()
+ }
+ if b.lsmIndex != nil {
+ localLSM += b.lsmIndex.SizeOnDisk()
+ }
+ if b.invertedIndex != nil {
+ localInverted += b.invertedIndex.SizeOnDisk()
+ }
+ }
+ }
+ onDiskBytesGauge.Set(float64(s.seriesDatabase.SizeOnDisk()), append(labelsValues, "series")...)
+ onDiskBytesGauge.Set(float64(globalIndex), append(labelsValues, "global_index")...)
+ onDiskBytesGauge.Set(float64(localLSM), append(labelsValues, "local_lsm")...)
+ onDiskBytesGauge.Set(float64(localInverted), append(labelsValues, "local_inverted")...)
+ onDiskBytesGauge.Set(float64(main), append(labelsValues, "main")...)
+}
+
func (s *shard) State() (shardState ShardState) {
shardState.StrategyManagers = append(shardState.StrategyManagers, s.segmentManageStrategy.String())
for _, seg := range s.segmentController.segments() {
diff --git a/banyand/tsdb/tsdb.go b/banyand/tsdb/tsdb.go
index daa7b4a9..fb499c4b 100644
--- a/banyand/tsdb/tsdb.go
+++ b/banyand/tsdb/tsdb.go
@@ -239,10 +239,11 @@ func OpenDatabase(ctx context.Context, opts DatabaseOpts) (Database, error) {
if opts.TTL.Num == 0 {
return nil, errors.Wrap(errOpenDatabase, "ttl is absent")
}
+ p := common.GetPosition(ctx)
db := &database{
location: opts.Location,
shardNum: opts.ShardNum,
- logger: logger.Fetch(ctx, "tsdb"),
+ logger: logger.Fetch(ctx, p.Database),
segmentSize: opts.SegmentInterval,
blockSize: opts.BlockInterval,
ttl: opts.TTL,
diff --git a/go.mod b/go.mod
index a11d213a..881d6470 100644
--- a/go.mod
+++ b/go.mod
@@ -9,13 +9,14 @@ require (
github.com/cespare/xxhash v1.1.0
github.com/dgraph-io/badger/v3 v3.2011.1
github.com/emirpasic/gods v1.18.1
- github.com/envoyproxy/protoc-gen-validate v0.1.0
+ github.com/envoyproxy/protoc-gen-validate v0.9.1
github.com/go-chi/chi/v5 v5.0.7
github.com/go-resty/resty/v2 v2.7.0
github.com/golang/mock v1.6.0
github.com/google/go-cmp v0.5.9
github.com/google/uuid v1.3.0
- github.com/grpc-ecosystem/go-grpc-middleware v1.3.0
+ github.com/grpc-ecosystem/go-grpc-middleware/providers/prometheus v1.0.0-rc.0
+ github.com/grpc-ecosystem/go-grpc-middleware/v2 v2.0.0-rc.5
github.com/grpc-ecosystem/grpc-gateway/v2 v2.14.0
github.com/hashicorp/golang-lru v0.5.4
github.com/oklog/run v1.1.0
@@ -27,21 +28,31 @@ require (
github.com/spf13/cobra v1.6.1
github.com/spf13/pflag v1.0.5
github.com/spf13/viper v1.12.0
- github.com/stretchr/testify v1.8.0
+ github.com/stretchr/testify v1.8.2
github.com/xhit/go-str2duration/v2 v2.0.0
github.com/zenizh/go-capturer v0.0.0-20211219060012-52ea6c8fed04
go.etcd.io/etcd/client/v3 v3.5.6
go.etcd.io/etcd/server/v3 v3.5.6
go.uber.org/multierr v1.8.0
golang.org/x/exp v0.0.0-20220602145555-4a0574d9293f
- golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4
- google.golang.org/genproto v0.0.0-20221118155620-16455021b5e6
- google.golang.org/grpc v1.51.0
- google.golang.org/protobuf v1.28.1
+ golang.org/x/mod v0.8.0
+ google.golang.org/genproto v0.0.0-20230110181048-76db0878b65f
+ google.golang.org/grpc v1.53.0
+ google.golang.org/protobuf v1.30.0
sigs.k8s.io/yaml v1.3.0
)
-require github.com/klauspost/compress v1.15.6 // indirect
+require (
+ github.com/go-ole/go-ole v1.2.6 // indirect
+ github.com/grpc-ecosystem/go-grpc-middleware v1.3.0 // indirect
+ github.com/klauspost/compress v1.15.6 // indirect
+ github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0 // indirect
+ github.com/power-devops/perfstat v0.0.0-20210106213030-5aafc221ea8c // indirect
+ github.com/shoenig/go-m1cpu v0.1.4 // indirect
+ github.com/tklauser/go-sysconf v0.3.11 // indirect
+ github.com/tklauser/numcpus v0.6.0 // indirect
+ github.com/yusufpapurcu/wmi v1.2.2 // indirect
+)
require (
github.com/axiomhq/hyperloglog v0.0.0-20191112132149-a4c4c47bc57f // indirect
@@ -56,7 +67,7 @@ require (
github.com/blugelabs/ice v1.0.0 // indirect
github.com/caio/go-tdigest v3.1.0+incompatible // indirect
github.com/cenkalti/backoff/v4 v4.2.0 // indirect
- github.com/cespare/xxhash/v2 v2.1.2 // indirect
+ github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/coreos/go-semver v0.3.0 // indirect
github.com/coreos/go-systemd/v22 v22.5.0 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
@@ -94,12 +105,13 @@ require (
github.com/prometheus/common v0.37.0 // indirect
github.com/prometheus/procfs v0.8.0 // indirect
github.com/robfig/cron/v3 v3.0.1
+ github.com/shirou/gopsutil/v3 v3.23.3
github.com/sirupsen/logrus v1.9.0 // indirect
github.com/soheilhy/cmux v0.1.5 // indirect
- github.com/spf13/afero v1.8.2 // indirect
+ github.com/spf13/afero v1.9.2 // indirect
github.com/spf13/cast v1.5.0 // indirect
github.com/spf13/jwalterweatherman v1.1.0 // indirect
- github.com/stretchr/objx v0.4.0 // indirect
+ github.com/stretchr/objx v0.5.0 // indirect
github.com/subosito/gotenv v1.3.0 // indirect
github.com/tmc/grpc-websocket-proxy v0.0.0-20220101234140-673ab2c3ae75 // indirect
github.com/xiang90/probing v0.0.0-20221125231312-a49e3df8f510 // indirect
@@ -121,9 +133,9 @@ require (
go.uber.org/atomic v1.10.0 // indirect
go.uber.org/zap v1.23.0
golang.org/x/crypto v0.3.0 // indirect
- golang.org/x/net v0.7.0 // indirect
- golang.org/x/sys v0.5.0 // indirect
- golang.org/x/text v0.7.0 // indirect
+ golang.org/x/net v0.8.0 // indirect
+ golang.org/x/sys v0.6.0 // indirect
+ golang.org/x/text v0.8.0 // indirect
golang.org/x/time v0.2.0 // indirect
gopkg.in/ini.v1 v1.66.4 // indirect
gopkg.in/natefinch/lumberjack.v2 v2.0.0 // indirect
diff --git a/go.sum b/go.sum
index ef0a79f4..4029823a 100644
--- a/go.sum
+++ b/go.sum
@@ -24,7 +24,8 @@ cloud.google.com/go/bigquery v1.4.0/go.mod h1:S8dzgnTigyfTmLBfrtrhyYhwRxG72rYxvf
cloud.google.com/go/bigquery v1.5.0/go.mod h1:snEHRnqQbz117VIFhE8bmtwIDY80NLUZUMb4Nv6dBIg=
cloud.google.com/go/bigquery v1.7.0/go.mod h1://okPTzCYNXSlb24MZs83e2Do+h+VXtc4gLoIoXIAPc=
cloud.google.com/go/bigquery v1.8.0/go.mod h1:J5hqkt3O0uAFnINi6JXValWIb1v0goeZM77hZzJN/fQ=
-cloud.google.com/go/compute v1.6.1 h1:2sMmt8prCn7DPaG4Pmh0N3Inmc8cT8ae5k1M6VJ9Wqc=
+cloud.google.com/go/compute v1.15.1 h1:7UGq3QknM33pw5xATlpzeoomNxsacIVvTqTTvbfajmE=
+cloud.google.com/go/compute/metadata v0.2.3 h1:mg4jlk7mCAj6xXp9UJ4fjI9VUI5rubuGBW5aJ7UnBMY=
cloud.google.com/go/datastore v1.0.0/go.mod h1:LXYbyblFSglQ5pkeyhO+Qmw7ukd3C+pD7TKLgZqpHYE=
cloud.google.com/go/datastore v1.1.0/go.mod h1:umbIZjpQpHh4hmRpGhH4tLFup+FVzqBi1b3c64qFpCk=
cloud.google.com/go/firestore v1.1.0/go.mod h1:ulACoGHTpvq5r8rxGJ4ddJZBZqakUQqClKRT5SZwBmk=
@@ -94,8 +95,9 @@ github.com/certifi/gocertifi v0.0.0-20200922220541-2c3bb06c6054/go.mod h1:sGbDF6
github.com/cespare/xxhash v1.1.0 h1:a6HrQnmkObjyL+Gs60czilIUGqrzKutQD6XZog3p+ko=
github.com/cespare/xxhash v1.1.0/go.mod h1:XrSqR1VqqWfGrhpAt58auRo0WTKS1nRRg3ghfAqPWnc=
github.com/cespare/xxhash/v2 v2.1.1/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
-github.com/cespare/xxhash/v2 v2.1.2 h1:YRXhKfTDauu4ajMg1TPgFO5jnlC2HCbmLXMcTG5cbYE=
github.com/cespare/xxhash/v2 v2.1.2/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
+github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44=
+github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/chzyer/logex v1.1.10/go.mod h1:+Ywpsq7O8HXn0nuIou7OrIPyXbp3wmkHB+jjWRnGsAI=
github.com/chzyer/readline v0.0.0-20180603132655-2972be24d48e/go.mod h1:nSuG5e5PlCu98SY8svDHJxuZscDgtXS6KTTbou5AhLI=
github.com/chzyer/test v0.0.0-20180213035817-a1ea475d72b1/go.mod h1:Q3SI9o4m/ZMnBNeIyt5eFwwo7qiLfzFZmjNmxjkiQlU=
@@ -154,8 +156,9 @@ github.com/envoyproxy/go-control-plane v0.9.9-0.20201210154907-fd9021fe5dad/go.m
github.com/envoyproxy/go-control-plane v0.9.9-0.20210217033140-668b12f5399d/go.mod h1:cXg6YxExXjJnVBQHBLXeUAgxn2UodCpnH306RInaBQk=
github.com/envoyproxy/go-control-plane v0.9.9-0.20210512163311-63b5d3c536b0/go.mod h1:hliV/p42l8fGbc6Y9bQ70uLwIvmJyVE5k4iMKlh8wCQ=
github.com/envoyproxy/go-control-plane v0.9.10-0.20210907150352-cf90f659a021/go.mod h1:AFq3mo9L8Lqqiid3OhADV3RfLJnjiw63cSpi+fDTRC0=
-github.com/envoyproxy/protoc-gen-validate v0.1.0 h1:EQciDnbrYxy13PgWoY8AqoxGiPrpgBZ1R8UNe3ddc+A=
github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c=
+github.com/envoyproxy/protoc-gen-validate v0.9.1 h1:PS7VIOgmSVhWUEeZwTe7z7zouA22Cr590PzXKbZHOVY=
+github.com/envoyproxy/protoc-gen-validate v0.9.1/go.mod h1:OKNgG7TCp5pF4d6XftA0++PMirau2/yoOwVac3AbF2w=
github.com/fatih/color v1.7.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5KwzbycvMj4=
github.com/fogleman/gg v1.2.1-0.20190220221249-0403632d5b90/go.mod h1:R/bRT+9gY/C5z7JzPU0zXsXHKM4/ayA+zqcVNZzPa1k=
github.com/frankban/quicktest v1.14.3 h1:FJKSZTDHjyhriyC81FLQ0LY93eSai0ZyR/ZIkd3ZUKE=
@@ -183,6 +186,8 @@ github.com/go-logr/logr v1.2.3 h1:2DntVwHkVopvECVRSlL5PSo9eG+cAkDCuckLubN+rq0=
github.com/go-logr/logr v1.2.3/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A=
github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag=
github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE=
+github.com/go-ole/go-ole v1.2.6 h1:/Fpf6oFPoeFik9ty7siob0G6Ke8QvQEuVcuChpwXzpY=
+github.com/go-ole/go-ole v1.2.6/go.mod h1:pprOEPIfldk/42T2oK7lQ4v4JSDwmV0As9GaiUsvbm0=
github.com/go-resty/resty/v2 v2.7.0 h1:me+K9p3uhSmXtrBZ4k9jcEAfJmuC8IivWHwaLZwPrFY=
github.com/go-resty/resty/v2 v2.7.0/go.mod h1:9PWDzw47qPphMRFfhsyk0NnSgvluHcljSMVIq3w7q0I=
github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY=
@@ -282,6 +287,10 @@ github.com/gorilla/websocket v1.5.0/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/ad
github.com/grpc-ecosystem/go-grpc-middleware v1.0.0/go.mod h1:FiyG127CGDf3tlThmgyCl78X/SZQqEOJBCDaAfeWzPs=
github.com/grpc-ecosystem/go-grpc-middleware v1.3.0 h1:+9834+KizmvFV7pXQGSXQTsaWhq2GjuNUt0aUU0YBYw=
github.com/grpc-ecosystem/go-grpc-middleware v1.3.0/go.mod h1:z0ButlSOZa5vEBq9m2m2hlwIgKw+rp3sdCBRoJY+30Y=
+github.com/grpc-ecosystem/go-grpc-middleware/providers/prometheus v1.0.0-rc.0 h1:mdLirNAJBxnGgyB6pjZLcs6ue/6eZGBui6gXspfq4ks=
+github.com/grpc-ecosystem/go-grpc-middleware/providers/prometheus v1.0.0-rc.0/go.mod h1:kdXbOySqcQeTxiqglW7aahTmWZy3Pgi6SYL36yvKeyA=
+github.com/grpc-ecosystem/go-grpc-middleware/v2 v2.0.0-rc.5 h1:3IZOAnD058zZllQTZNBioTlrzrBG/IjpiZ133IEtusM=
+github.com/grpc-ecosystem/go-grpc-middleware/v2 v2.0.0-rc.5/go.mod h1:xbKERva94Pw2cPen0s79J3uXmGzbbpDYFBFDlZ4mV/w=
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0 h1:Ovs26xHkKqVztRpIrF/92BcuyuQ/YW4NSIpoGtfXNho=
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0/go.mod h1:8NvIoxWQoOIhqOTXgfV/d3M/q6VIi02HzZEHgUlZvzk=
github.com/grpc-ecosystem/grpc-gateway v1.9.0/go.mod h1:vNeuVxBJEsws4ogUvrchl83t/GYV9WGTSLVdBhOQFDY=
@@ -355,6 +364,8 @@ github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
github.com/leesper/go_rng v0.0.0-20190531154944-a612b043e353 h1:X/79QL0b4YJVO5+OsPH9rF2u428CIrGL/jLmPsoOQQ4=
github.com/leesper/go_rng v0.0.0-20190531154944-a612b043e353/go.mod h1:N0SVk0uhy+E1PZ3C9ctsPRlvOPAFPkCNlcPBDkt0N3U=
+github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0 h1:6E+4a0GO5zZEnZ81pIr0yLvtUWk2if982qA3F3QD6H4=
+github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0/go.mod h1:zJYVVT2jmtg6P3p1VtQj7WsuWi/y4VnjVBn7F8KPB3I=
github.com/magiconair/properties v1.8.0/go.mod h1:PppfXfuXeibc/6YijjN8zIbojt8czPbwD3XqdrwzmxQ=
github.com/magiconair/properties v1.8.1/go.mod h1:PppfXfuXeibc/6YijjN8zIbojt8czPbwD3XqdrwzmxQ=
github.com/magiconair/properties v1.8.6 h1:5ibWZ6iY0NctNGWo87LalDlEZ6R41TqbbDamhfG/Qzo=
@@ -408,6 +419,8 @@ github.com/pkg/sftp v1.13.1/go.mod h1:3HaPG6Dq1ILlpPZRO0HVMrsydcdLt6HRDccSgb87qR
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/posener/complete v1.1.1/go.mod h1:em0nMJCgc9GFtwrmVmEMR/ZL6WyhyjMBndrE9hABlRI=
+github.com/power-devops/perfstat v0.0.0-20210106213030-5aafc221ea8c h1:ncq/mPwQF4JjgDlrVEn3C11VoGHZN7m8qihwgMEtzYw=
+github.com/power-devops/perfstat v0.0.0-20210106213030-5aafc221ea8c/go.mod h1:OmDBASR4679mdNQnz2pUhc2G8CO2JrUAVFDRBDP/hJE=
github.com/prometheus/client_golang v0.9.1/go.mod h1:7SWBe2y4D6OKWSNQJUaRYU/AaXPKyh/dDVn+NZz0KFw=
github.com/prometheus/client_golang v0.9.3/go.mod h1:/TN21ttK/J9q6uSwhBd54HahCDft0ttaMvbicHlPoso=
github.com/prometheus/client_golang v1.0.0/go.mod h1:db9x61etRT2tGnBNRi70OPL5FsnadC4Ky3P0J6CfImo=
@@ -454,6 +467,12 @@ github.com/russross/blackfriday/v2 v2.0.1/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQD
github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM=
github.com/ryanuber/columnize v0.0.0-20160712163229-9b3edd62028f/go.mod h1:sm1tb6uqfes/u+d4ooFouqFdy9/2g9QGwK3SQygK0Ts=
github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529/go.mod h1:DxrIzT+xaE7yg65j358z/aeFdxmN0P9QXhEzd20vsDc=
+github.com/shirou/gopsutil/v3 v3.23.3 h1:Syt5vVZXUDXPEXpIBt5ziWsJ4LdSAAxF4l/xZeQgSEE=
+github.com/shirou/gopsutil/v3 v3.23.3/go.mod h1:lSBNN6t3+D6W5e5nXTxc8KIMMVxAcS+6IJlffjRRlMU=
+github.com/shoenig/go-m1cpu v0.1.4 h1:SZPIgRM2sEF9NJy50mRHu9PKGwxyyTTJIWvCtgVbozs=
+github.com/shoenig/go-m1cpu v0.1.4/go.mod h1:Wwvst4LR89UxjeFtLRMrpgRiyY4xPsejnVZym39dbAQ=
+github.com/shoenig/test v0.6.3 h1:GVXWJFk9PiOjN0KoJ7VrJGH6uLPnqxR7/fe3HUPfE0c=
+github.com/shoenig/test v0.6.3/go.mod h1:byHiCGXqrVaflBLAMq/srcZIHynQPQgeyvkvXnjqq0k=
github.com/shurcooL/sanitized_anchor_name v1.0.0/go.mod h1:1NzhyTcUVG4SuEtjjoZeVRXNmyL/1OwPU0+IJeTBvfc=
github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo=
github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE=
@@ -470,8 +489,8 @@ github.com/soheilhy/cmux v0.1.5/go.mod h1:T7TcVDs9LWfQgPlPsdngu6I6QIoyIFZDDC6sNE
github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72 h1:qLC7fQah7D6K1B0ujays3HV9gkFtllcxhzImRR7ArPQ=
github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA=
github.com/spf13/afero v1.1.2/go.mod h1:j4pytiNVoe2o6bmDsKpLACNPDBIoEAkihy7loJ1B0CQ=
-github.com/spf13/afero v1.8.2 h1:xehSyVa0YnHWsJ49JFljMpg1HX19V6NDZ1fkm1Xznbo=
-github.com/spf13/afero v1.8.2/go.mod h1:CtAatgMJh6bJEIs48Ay/FOnkljP3WeGUG0MC1RfAqwo=
+github.com/spf13/afero v1.9.2 h1:j49Hj62F0n+DaZ1dDCvhABaPNSGNkt32oRFxI33IEMw=
+github.com/spf13/afero v1.9.2/go.mod h1:iUV7ddyEEZPO5gA3zD4fJt6iStLlL+Lg4m2cihcDf8Y=
github.com/spf13/cast v1.3.0/go.mod h1:Qx5cxh0v+4UWYiBimWS+eyWzqEqokIECu5etghLkUJE=
github.com/spf13/cast v1.5.0 h1:rj3WzYc11XZaIZMPKmwP96zkFEnnAmV8s6XbB2aY32w=
github.com/spf13/cast v1.5.0/go.mod h1:SpXXQ5YoyJw6s3/6cMTQuxvgRl3PCJiyaX9p6b155UU=
@@ -491,8 +510,9 @@ github.com/spf13/viper v1.12.0 h1:CZ7eSOd3kZoaYDLbXnmzgQI5RlciuXBMA+18HwHRfZQ=
github.com/spf13/viper v1.12.0/go.mod h1:b6COn30jlNxbm/V2IqWiNWkJ+vZNiMNksliPCiuKtSI=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
-github.com/stretchr/objx v0.4.0 h1:M2gUjqZET1qApGOWNSnZ49BAIMX4F/1plDv3+l31EJ4=
github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
+github.com/stretchr/objx v0.5.0 h1:1zr/of2m5FGMsad5YfcqgdqdWrIhu+EBEJRhR1U7z/c=
+github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo=
github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
@@ -500,11 +520,16 @@ github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5
github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
-github.com/stretchr/testify v1.8.0 h1:pSgiaMZlXftHpm5L7V1+rVB+AZJydKsMxsQBIJw4PKk=
github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
+github.com/stretchr/testify v1.8.2 h1:+h33VjcLVPDHtOdpUCuF+7gSuG3yGIftsP1YvFihtJ8=
+github.com/stretchr/testify v1.8.2/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
github.com/subosito/gotenv v1.2.0/go.mod h1:N0PQaV/YGNqwC0u51sEeR/aUtSLEXKX9iv69rRypqCw=
github.com/subosito/gotenv v1.3.0 h1:mjC+YW8QpAdXibNi+vNWgzmgBH4+5l5dCXv8cNysBLI=
github.com/subosito/gotenv v1.3.0/go.mod h1:YzJjq/33h7nrwdY+iHMhEOEEbW0ovIz0tB6t6PwAXzs=
+github.com/tklauser/go-sysconf v0.3.11 h1:89WgdJhk5SNwJfu+GKyYveZ4IaJ7xAkecBo+KdJV0CM=
+github.com/tklauser/go-sysconf v0.3.11/go.mod h1:GqXfhXY3kiPa0nAXPDIQIWzJbMCB7AmcWpGR8lSZfqI=
+github.com/tklauser/numcpus v0.6.0 h1:kebhY2Qt+3U6RNK7UqpYNA+tJ23IBEGKkB7JQBfDYms=
+github.com/tklauser/numcpus v0.6.0/go.mod h1:FEZLMke0lhOUG6w2JadTzp0a+Nl8PF/GFkQ5UVIcaL4=
github.com/tmc/grpc-websocket-proxy v0.0.0-20190109142713-0ad062ec5ee5/go.mod h1:ncp9v5uamzpCO7NfCPTXjqaC+bZgJeR0sMTm6dMHP7U=
github.com/tmc/grpc-websocket-proxy v0.0.0-20201229170055-e5319fda7802/go.mod h1:ncp9v5uamzpCO7NfCPTXjqaC+bZgJeR0sMTm6dMHP7U=
github.com/tmc/grpc-websocket-proxy v0.0.0-20220101234140-673ab2c3ae75 h1:6fotK7otjonDflCTK0BCfls4SPy3NcCVb5dqqmbRknE=
@@ -522,6 +547,8 @@ github.com/yuin/goldmark v1.1.32/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9de
github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k=
github.com/yuin/goldmark v1.4.0/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k=
+github.com/yusufpapurcu/wmi v1.2.2 h1:KBNDSne4vP5mbSWnJbO+51IMOXJB67QiYCSBrubbPRg=
+github.com/yusufpapurcu/wmi v1.2.2/go.mod h1:SBZ9tNy3G9/m5Oi98Zks0QjeHVDvuK0qfxQmPyzfmi0=
github.com/zenizh/go-capturer v0.0.0-20211219060012-52ea6c8fed04 h1:qXafrlZL1WsJW5OokjraLLRURHiw0OzKHD/RNdspp4w=
github.com/zenizh/go-capturer v0.0.0-20211219060012-52ea6c8fed04/go.mod h1:FiwNQxz6hGoNFBC4nIx+CxZhI3nne5RmIOlT/MXcSD4=
github.com/zinclabs/bluge v1.1.5 h1:QJhkweeBVRaaEPdaRptkYOJDLCeyo+JBgc2hNyFehAM=
@@ -647,8 +674,8 @@ golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
golang.org/x/mod v0.4.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
golang.org/x/mod v0.4.1/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
golang.org/x/mod v0.4.2/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
-golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4 h1:6zppjxzCulZykYSLyVDYbneBfbaBIQPYMevg0bEwv2s=
-golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4=
+golang.org/x/mod v0.8.0 h1:LUYupSeNrTNCGzR/hVBk2NHZO4hXcVaW1k4Qx7rjPx8=
+golang.org/x/mod v0.8.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs=
golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20181023162649-9b4f9f5ad519/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
@@ -695,8 +722,8 @@ golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2/go.mod h1:9nx3DQGgdP8bBQD5qx
golang.org/x/net v0.0.0-20211123203042-d83791d6bcd9/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
golang.org/x/net v0.0.0-20220127200216-cd36cc0744dd/go.mod h1:CfG3xpIq0wQ8r1q4Su4UZFWDARRcnwPjda9FqA0JpMk=
golang.org/x/net v0.0.0-20220225172249-27dd8689420f/go.mod h1:CfG3xpIq0wQ8r1q4Su4UZFWDARRcnwPjda9FqA0JpMk=
-golang.org/x/net v0.7.0 h1:rJrUqqhjsgNp7KqAIc25s9pZnjU7TUcSY7HcVZjdn1g=
-golang.org/x/net v0.7.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs=
+golang.org/x/net v0.8.0 h1:Zrh2ngAOFYneWTAIAPethzeaQLuHwhuBkuV6ZiRnUaQ=
+golang.org/x/net v0.8.0/go.mod h1:QVkue5JL9kW//ek3r6jTKnTFis1tRmNAW2P1shuFdJc=
golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
@@ -709,7 +736,7 @@ golang.org/x/oauth2 v0.0.0-20210218202405-ba52d332ba99/go.mod h1:KelEdhl1UZF7XfJ
golang.org/x/oauth2 v0.0.0-20210514164344-f6687ab2804c/go.mod h1:KelEdhl1UZF7XfJ4dDtk6s++YSgaE7mD/BuKKDLBl4A=
golang.org/x/oauth2 v0.0.0-20211104180415-d3ed0bb246c8/go.mod h1:KelEdhl1UZF7XfJ4dDtk6s++YSgaE7mD/BuKKDLBl4A=
golang.org/x/oauth2 v0.0.0-20220223155221-ee480838109b/go.mod h1:DAh4E804XQdzx2j+YRIaUnCqCV2RuMz24cGBJ5QYIrc=
-golang.org/x/oauth2 v0.2.0 h1:GtQkldQ9m7yvzCL1V+LrYow3Khe0eJH0w7RbX/VbaIU=
+golang.org/x/oauth2 v0.4.0 h1:NF0gk8LVPg1Ml7SSbGyySuoxdsXitj7TvgvuRxIMc/M=
golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
@@ -738,6 +765,7 @@ golang.org/x/sys v0.0.0-20190606165138-5da285871e9c/go.mod h1:h1NjWce9XRLGQEsW7w
golang.org/x/sys v0.0.0-20190624142023-c5567b49c5d0/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20190726091711-fc99dfbffb4e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20190813064441-fde4db37ae7a/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
+golang.org/x/sys v0.0.0-20190916202348-b4ddaad3f8a3/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20191001151750-bb3f8db39f24/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20191026070338-33540a1f6037/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20191204072324-ce4227a45e2e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
@@ -763,6 +791,7 @@ golang.org/x/sys v0.0.0-20200923182605-d9f96fdee20d/go.mod h1:h1NjWce9XRLGQEsW7w
golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20201201145000-ef89a241ccb3/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
+golang.org/x/sys v0.0.0-20201204225414-ed752295db88/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210104204734-6f8348627aad/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210119212857-b64e53b001e4/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210124154548-22da62e12c0c/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
@@ -781,8 +810,9 @@ golang.org/x/sys v0.0.0-20220412211240-33da011f77ad/go.mod h1:oPkhp1MJrh7nUepCBc
golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20221010170243-090e33056c14/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
-golang.org/x/sys v0.5.0 h1:MUK/U/4lj1t1oPg0HfuXDN/Z1wv31ZJ/YcPiGccS4DU=
-golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
+golang.org/x/sys v0.2.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
+golang.org/x/sys v0.6.0 h1:MVltZSvRTcU2ljQOhs94SXPftV6DCNnZViHeQps87pQ=
+golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
golang.org/x/text v0.0.0-20170915032832-14c0d48ead0c/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
@@ -794,8 +824,8 @@ golang.org/x/text v0.3.4/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.3.5/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ=
-golang.org/x/text v0.7.0 h1:4BRB4x83lYWy72KwLD/qYDuTu7q9PjSagHvijDw7cLo=
-golang.org/x/text v0.7.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8=
+golang.org/x/text v0.8.0 h1:57P1ETyNKtuIjB4SRd15iJxuhj8Gc416Y78H3qgMh68=
+golang.org/x/text v0.8.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8=
golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/time v0.0.0-20191024005414-555d28b269f0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
@@ -936,8 +966,8 @@ google.golang.org/genproto v0.0.0-20210108203827-ffc7fda8c3d7/go.mod h1:FWY/as6D
google.golang.org/genproto v0.0.0-20210226172003-ab064af71705/go.mod h1:FWY/as6DDZQgahTzZj3fqbO1CbirC29ZNUFHwi0/+no=
google.golang.org/genproto v0.0.0-20210602131652-f16073e35f0c/go.mod h1:UODoCrxHCcBojKKwX1terBiRUaqAsFqJiF615XL43r0=
google.golang.org/genproto v0.0.0-20211118181313-81c1377c94b1/go.mod h1:5CzLGKJ67TSI2B9POpiiyGha0AjJvZIUgRMt1dSmuhc=
-google.golang.org/genproto v0.0.0-20221118155620-16455021b5e6 h1:a2S6M0+660BgMNl++4JPlcAO/CjkqYItDEZwkoDQK7c=
-google.golang.org/genproto v0.0.0-20221118155620-16455021b5e6/go.mod h1:rZS5c/ZVYMaOGBfO68GWtjOw/eLaZM1X6iVtgjZ+EWg=
+google.golang.org/genproto v0.0.0-20230110181048-76db0878b65f h1:BWUVssLB0HVOSY78gIdvk1dTVYtT1y8SBWtPYuTJ/6w=
+google.golang.org/genproto v0.0.0-20230110181048-76db0878b65f/go.mod h1:RGgjbofJ8xD9Sq1VVhDM1Vok1vRONV+rg+CjzG4SZKM=
google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c=
google.golang.org/grpc v1.20.1/go.mod h1:10oTOabMzJvdu6/UiuZezV6QK5dSlG84ov/aaiqXj38=
google.golang.org/grpc v1.21.1/go.mod h1:oYelfM1adQP15Ek0mdvEgi9Df8B9CZIaU1084ijfRaM=
@@ -961,8 +991,8 @@ google.golang.org/grpc v1.38.0/go.mod h1:NREThFqKR1f3iQ6oBuvc5LadQuXVGo9rkm5ZGrQ
google.golang.org/grpc v1.40.0/go.mod h1:ogyxbiOoUXAkP+4+xa6PZSE9DZgIHtSpzjDTB9KAK34=
google.golang.org/grpc v1.41.0/go.mod h1:U3l9uK9J0sini8mHphKoXyaqDA/8VyGnDee1zzIUK6k=
google.golang.org/grpc v1.42.0/go.mod h1:k+4IHHFw41K8+bbowsex27ge2rCb65oeWqe4jJ590SU=
-google.golang.org/grpc v1.51.0 h1:E1eGv1FTqoLIdnBCZufiSHgKjlqG6fKFf6pPWtMTh8U=
-google.golang.org/grpc v1.51.0/go.mod h1:wgNDFcnuBGmxLKI/qn4T+m5BtEBYXJPvibbUPsAIPww=
+google.golang.org/grpc v1.53.0 h1:LAv2ds7cmFV/XTS3XG1NneeENYrXGmorPxsBbptIjNc=
+google.golang.org/grpc v1.53.0/go.mod h1:OnIrk0ipVdj4N5d9IUoFUx72/VlD7+jUsHwZgwSMQpw=
google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8=
google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0=
google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod h1:cfTl7dwQJ+fmap5saPgwCLgHXTUD7jkjRqWcaiX5VyM=
@@ -976,8 +1006,8 @@ google.golang.org/protobuf v1.25.0/go.mod h1:9JNX74DMeImyA3h4bdi1ymwjUzf21/xIlba
google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw=
google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=
google.golang.org/protobuf v1.27.1/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=
-google.golang.org/protobuf v1.28.1 h1:d0NfwRgPtno5B1Wa6L2DAG+KivqkdutMf1UhdNx175w=
-google.golang.org/protobuf v1.28.1/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I=
+google.golang.org/protobuf v1.30.0 h1:kPPoIgf3TsEvrm0PFe15JQ+570QVxYzEvvHqChK+cng=
+google.golang.org/protobuf v1.30.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I=
gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
diff --git a/pkg/index/index.go b/pkg/index/index.go
index 95483d80..024811ab 100644
--- a/pkg/index/index.go
+++ b/pkg/index/index.go
@@ -207,6 +207,7 @@ type Store interface {
io.Closer
Writer
Searcher
+ SizeOnDisk() int64
}
// GetSearcher returns a searcher associated with input index rule type.
diff --git a/pkg/index/inverted/inverted.go b/pkg/index/inverted/inverted.go
index 9574f023..05aa471c 100644
--- a/pkg/index/inverted/inverted.go
+++ b/pkg/index/inverted/inverted.go
@@ -21,6 +21,8 @@ package inverted
import (
"bytes"
"context"
+ "errors"
+ "io"
"log"
"math"
"time"
@@ -160,7 +162,6 @@ func (s *store) Iterator(fieldKey index.FieldKey, termRange index.RangeOpts, ord
if err != nil {
return nil, err
}
- defer reader.Close()
fk := fieldKey.MarshalIndexRule()
var query bluge.Query
shouldDecodeTerm := true
@@ -193,7 +194,7 @@ func (s *store) Iterator(fieldKey index.FieldKey, termRange index.RangeOpts, ord
if err != nil {
return nil, err
}
- result := newBlugeMatchIterator(documentMatchIterator, fk, shouldDecodeTerm)
+ result := newBlugeMatchIterator(documentMatchIterator, fk, shouldDecodeTerm, reader)
return &result, nil
}
@@ -206,7 +207,6 @@ func (s *store) MatchTerms(field index.Field) (list posting.List, err error) {
if err != nil {
return nil, err
}
- defer reader.Close()
fk := field.Key.MarshalIndexRule()
var query bluge.Query
shouldDecodeTerm := true
@@ -222,12 +222,14 @@ func (s *store) MatchTerms(field index.Field) (list posting.List, err error) {
if err != nil {
return nil, err
}
- iter := newBlugeMatchIterator(documentMatchIterator, fk, shouldDecodeTerm)
+ iter := newBlugeMatchIterator(documentMatchIterator, fk, shouldDecodeTerm, reader)
+ defer func() {
+ err = multierr.Append(err, iter.Close())
+ }()
list = roaring.NewPostingList()
for iter.Next() {
err = multierr.Append(err, list.Union(iter.Val().Value))
}
- err = multierr.Append(err, iter.Close())
return list, err
}
@@ -239,7 +241,6 @@ func (s *store) Match(fieldKey index.FieldKey, matches []string) (posting.List,
if err != nil {
return nil, err
}
- defer reader.Close()
analyzer := analyzers[fieldKey.Analyzer]
fk := fieldKey.MarshalIndexRule()
query := bluge.NewBooleanQuery()
@@ -252,12 +253,14 @@ func (s *store) Match(fieldKey index.FieldKey, matches []string) (posting.List,
if err != nil {
return nil, err
}
- iter := newBlugeMatchIterator(documentMatchIterator, fk, false)
+ iter := newBlugeMatchIterator(documentMatchIterator, fk, false, reader)
+ defer func() {
+ err = multierr.Append(err, iter.Close())
+ }()
list := roaring.NewPostingList()
for iter.Next() {
err = multierr.Append(err, list.Union(iter.Val().Value))
}
- err = multierr.Append(err, iter.Close())
return list, err
}
@@ -274,6 +277,11 @@ func (s *store) Range(fieldKey index.FieldKey, opts index.RangeOpts) (list posti
return
}
+func (s *store) SizeOnDisk() int64 {
+ _, bytes := s.writer.DirectoryStats()
+ return int64(bytes)
+}
+
func (s *store) run() {
go func() {
defer s.closer.Done()
@@ -352,6 +360,7 @@ func (s *store) flush() {
type blugeMatchIterator struct {
delegated search.DocumentMatchIterator
err error
+ closer io.Closer
current *index.PostingValue
agg *index.PostingValue
fieldKey string
@@ -359,11 +368,12 @@ type blugeMatchIterator struct {
closed bool
}
-func newBlugeMatchIterator(delegated search.DocumentMatchIterator, fieldKey string, shouldDecodeTerm bool) blugeMatchIterator {
+func newBlugeMatchIterator(delegated search.DocumentMatchIterator, fieldKey string, shouldDecodeTerm bool, closer io.Closer) blugeMatchIterator {
return blugeMatchIterator{
delegated: delegated,
fieldKey: fieldKey,
shouldDecodeTerm: shouldDecodeTerm,
+ closer: closer,
}
}
@@ -450,5 +460,5 @@ func (bmi *blugeMatchIterator) Val() *index.PostingValue {
func (bmi *blugeMatchIterator) Close() error {
bmi.closed = true
- return bmi.err
+ return errors.Join(bmi.err, bmi.closer.Close())
}
diff --git a/pkg/index/lsm/lsm.go b/pkg/index/lsm/lsm.go
index b800aaf2..fa0e3527 100644
--- a/pkg/index/lsm/lsm.go
+++ b/pkg/index/lsm/lsm.go
@@ -47,6 +47,10 @@ func (s *store) Write(fields []index.Field, itemID common.ItemID) (err error) {
return err
}
+func (s *store) SizeOnDisk() int64 {
+ return s.lsm.SizeOnDisk()
+}
+
// StoreOpts wraps options to create the lsm repository.
type StoreOpts struct {
Logger *logger.Logger
diff --git a/pkg/meter/meter.go b/pkg/meter/meter.go
index 86fa1fa4..75132327 100644
--- a/pkg/meter/meter.go
+++ b/pkg/meter/meter.go
@@ -26,6 +26,9 @@ type (
LabelPairs map[string]string
)
+// DefBuckets is the default buckets for histograms.
+var DefBuckets = Buckets{.005, .01, .025, .05, .1, .25, .5, 1, 2.5, 5, 10}
+
// Merge merges the given label pairs with the current label pairs.
func (p LabelPairs) Merge(other LabelPairs) LabelPairs {
result := make(LabelPairs, len(p)+len(other))
@@ -53,49 +56,53 @@ type Scope interface {
GetLabels() LabelPairs
}
+// Instrument is the interface for a metric.
+type Instrument interface {
+ // Delete the metric with the given label values.
+ Delete(labelValues ...string) bool
+}
+
// Counter is a metric that represents a single numerical value that only ever goes up.
type Counter interface {
+ Instrument
Inc(delta float64, labelValues ...string)
}
// Gauge is a metric that represents a single numerical value that can arbitrarily go up and down.
type Gauge interface {
+ Instrument
Set(value float64, labelValues ...string)
Add(delta float64, labelValues ...string)
}
// Histogram is a metric that represents the statistical distribution of a set of values.
type Histogram interface {
+ Instrument
Observe(value float64, labelValues ...string)
}
-type noopCounter struct{}
-
-func (noopCounter) Inc(_ float64, _ ...string) {}
-
-type noopGauge struct{}
-
-func (noopGauge) Set(_ float64, _ ...string) {}
-func (noopGauge) Add(_ float64, _ ...string) {}
-
-type noopHistogram struct{}
+type noopInstrument struct{}
-func (noopHistogram) Observe(_ float64, _ ...string) {}
+func (noopInstrument) Inc(_ float64, _ ...string) {}
+func (noopInstrument) Set(_ float64, _ ...string) {}
+func (noopInstrument) Add(_ float64, _ ...string) {}
+func (noopInstrument) Observe(_ float64, _ ...string) {}
+func (noopInstrument) Delete(_ ...string) bool { return false }
// NoopProvider is a no-op implementation of the Provider interface.
type NoopProvider struct{}
// Counter returns a no-op implementation of the Counter interface.
func (NoopProvider) Counter(_ string, _ ...string) Counter {
- return noopCounter{}
+ return noopInstrument{}
}
// Gauge returns a no-op implementation of the Gauge interface.
func (NoopProvider) Gauge(_ string, _ ...string) Gauge {
- return noopGauge{}
+ return noopInstrument{}
}
// Histogram returns a no-op implementation of the Histogram interface.
func (NoopProvider) Histogram(_ string, _ Buckets, _ ...string) Histogram {
- return noopHistogram{}
+ return noopInstrument{}
}
diff --git a/pkg/meter/prom/instruments.go b/pkg/meter/prom/instruments.go
index 6ea434c4..9f29238c 100644
--- a/pkg/meter/prom/instruments.go
+++ b/pkg/meter/prom/instruments.go
@@ -28,6 +28,10 @@ func (c *counter) Inc(delta float64, labelValues ...string) {
c.counter.WithLabelValues(labelValues...).Add(delta)
}
+func (c *counter) Delete(labelValues ...string) bool {
+ return c.counter.DeleteLabelValues(labelValues...)
+}
+
type gauge struct {
gauge *prometheus.GaugeVec
}
@@ -40,6 +44,10 @@ func (g *gauge) Add(delta float64, labelValues ...string) {
g.gauge.WithLabelValues(labelValues...).Add(delta)
}
+func (g *gauge) Delete(labelValues ...string) bool {
+ return g.gauge.DeleteLabelValues(labelValues...)
+}
+
type histogram struct {
histogram *prometheus.HistogramVec
}
@@ -47,3 +55,7 @@ type histogram struct {
func (h *histogram) Observe(value float64, labelValues ...string) {
h.histogram.WithLabelValues(labelValues...).Observe(value)
}
+
+func (h *histogram) Delete(labelValues ...string) bool {
+ return h.histogram.DeleteLabelValues(labelValues...)
+}
diff --git a/pkg/meter/prom/prom.go b/pkg/meter/prom/prom.go
index 50583732..0ca74abe 100644
--- a/pkg/meter/prom/prom.go
+++ b/pkg/meter/prom/prom.go
@@ -21,6 +21,7 @@ import (
"unsafe"
"github.com/prometheus/client_golang/prometheus"
+ "github.com/prometheus/client_golang/prometheus/promauto"
"github.com/apache/skywalking-banyandb/pkg/meter"
)
@@ -28,21 +29,23 @@ import (
// Provider is a prometheus provider.
type provider struct {
scope meter.Scope
+ reg prometheus.Registerer
}
// NewProvider creates a new prometheus provider with given meter.Scope.
-func NewProvider(scope meter.Scope) meter.Provider {
+func NewProvider(scope meter.Scope, reg prometheus.Registerer) meter.Provider {
return &provider{
scope: scope,
+ reg: reg,
}
}
// Counter returns a prometheus counter.
func (p *provider) Counter(name string, labels ...string) meter.Counter {
return &counter{
- counter: prometheus.NewCounterVec(prometheus.CounterOpts{
- Name: name,
- Help: name,
+ counter: promauto.With(p.reg).NewCounterVec(prometheus.CounterOpts{
+ Name: p.scope.GetNamespace() + "_" + name,
+ Help: p.scope.GetNamespace() + "_" + name,
ConstLabels: convertLabels(p.scope.GetLabels()),
}, labels),
}
@@ -51,9 +54,9 @@ func (p *provider) Counter(name string, labels ...string) meter.Counter {
// Gauge returns a prometheus gauge.
func (p *provider) Gauge(name string, labels ...string) meter.Gauge {
return &gauge{
- gauge: prometheus.NewGaugeVec(prometheus.GaugeOpts{
- Name: name,
- Help: name,
+ gauge: promauto.With(p.reg).NewGaugeVec(prometheus.GaugeOpts{
+ Name: p.scope.GetNamespace() + "_" + name,
+ Help: p.scope.GetNamespace() + "_" + name,
ConstLabels: convertLabels(p.scope.GetLabels()),
}, labels),
}
@@ -62,9 +65,9 @@ func (p *provider) Gauge(name string, labels ...string) meter.Gauge {
// Histogram returns a prometheus histogram.
func (p *provider) Histogram(name string, buckets meter.Buckets, labels ...string) meter.Histogram {
return &histogram{
- histogram: prometheus.NewHistogramVec(prometheus.HistogramOpts{
- Name: name,
- Help: name,
+ histogram: promauto.With(p.reg).NewHistogramVec(prometheus.HistogramOpts{
+ Name: p.scope.GetNamespace() + "_" + name,
+ Help: p.scope.GetNamespace() + "_" + name,
ConstLabels: convertLabels(p.scope.GetLabels()),
Buckets: buckets,
}, labels),
diff --git a/pkg/timestamp/scheduler.go b/pkg/timestamp/scheduler.go
index 0c85e9c6..effadf37 100644
--- a/pkg/timestamp/scheduler.go
+++ b/pkg/timestamp/scheduler.go
@@ -18,6 +18,7 @@
package timestamp
import (
+ "runtime/debug"
"sync"
"time"
@@ -90,7 +91,7 @@ func (s *Scheduler) Register(name string, options cron.ParseOption, expr string,
} else {
clock = s.clock
}
- t := newTask(s.l.Named(name), clock, schedule, action)
+ t := newTask(s.l.Named(name), name, clock, schedule, action)
s.tasks[name] = t
go func() {
t.run()
@@ -146,11 +147,13 @@ type task struct {
closer *run.Closer
l *logger.Logger
action SchedulerAction
+ name string
}
-func newTask(l *logger.Logger, clock clock.Clock, schedule cron.Schedule, action SchedulerAction) *task {
+func newTask(l *logger.Logger, name string, clock clock.Clock, schedule cron.Schedule, action SchedulerAction) *task {
return &task{
l: l,
+ name: name,
clock: clock,
schedule: schedule,
action: action,
@@ -161,26 +164,34 @@ func newTask(l *logger.Logger, clock clock.Clock, schedule cron.Schedule, action
func (t *task) run() {
defer t.closer.Done()
now := t.clock.Now()
- t.l.Info().Time("now", now).Msg("start")
+ t.l.Info().Str("name", t.name).Time("now", now).Msg("start")
for {
next := t.schedule.Next(now)
d := next.Sub(now)
if e := t.l.Debug(); e.Enabled() {
- e.Time("now", now).Time("next", next).Dur("dur", d).Msg("schedule to")
+ e.Str("name", t.name).Time("now", now).Time("next", next).Dur("dur", d).Msg("schedule to")
}
timer := t.clock.Timer(d)
select {
case now = <-timer.C:
if e := t.l.Debug(); e.Enabled() {
- e.Time("now", now).Msg("wake")
+ e.Str("name", t.name).Time("now", now).Msg("wake")
}
- if !t.action(now, t.l) {
- t.l.Info().Msg("action stops the task")
+ if !func() (ret bool) {
+ defer func() {
+ if r := recover(); r != nil {
+ t.l.Error().Str("name", t.name).Interface("panic", r).Str("stack", string(debug.Stack())).Msg("panic")
+ ret = true
+ }
+ }()
+ return t.action(now, t.l)
+ }() {
+ t.l.Info().Str("name", t.name).Msg("action stops the task")
return
}
case <-t.closer.CloseNotify():
timer.Stop()
- t.l.Info().Msg("closed")
+ t.l.Info().Str("name", t.name).Msg("closed")
return
}
}