You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by wu...@apache.org on 2022/04/01 01:51:11 UTC
[skywalking-banyandb] branch main updated: Improve the observability of banyandb: (#96)
This is an automated email from the ASF dual-hosted git repository.
wusheng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
The following commit(s) were added to refs/heads/main by this push:
new 1d5c0a6 Improve the observability of banyandb: (#96)
1d5c0a6 is described below
commit 1d5c0a6a4c214bc4d2ccf4a6a1925c3f3df1f716
Author: Gao Hongtao <ha...@gmail.com>
AuthorDate: Fri Apr 1 09:51:06 2022 +0800
Improve the observability of banyandb: (#96)
* Add the prometheus exporter
* Export memory table gauges
* Introduce a self running generator to generate stream data
---
api/common/id.go | 43 +++++-
banyand/internal/cmd/standalone.go | 6 +-
banyand/kv/badger.go | 17 +++
banyand/kv/kv.go | 3 +
banyand/measure/metadata.go | 6 +-
banyand/{prof/pprof.go => observability/metric.go} | 38 ++---
banyand/{prof => observability}/pprof.go | 7 +-
api/common/id.go => banyand/observability/type.go | 17 +--
banyand/stream/metadata.go | 6 +-
banyand/tsdb/block.go | 24 ++-
banyand/tsdb/index/writer.go | 3 +
banyand/tsdb/metric.go | 111 ++++++++++++++
banyand/tsdb/scope.go | 9 +-
banyand/tsdb/segment.go | 48 +++---
banyand/tsdb/seriesdb.go | 6 +
banyand/tsdb/shard.go | 50 +++++--
go.mod | 16 +-
go.sum | 30 ++--
pkg/index/index.go | 2 +
pkg/index/inverted/field_map.go | 10 ++
pkg/index/inverted/inverted.go | 14 ++
pkg/index/inverted/mem.go | 5 +
pkg/index/inverted/term_map.go | 10 ++
pkg/index/lsm/lsm.go | 5 +
pkg/index/posting/posting.go | 2 +
pkg/index/posting/roaring/roaring.go | 4 +
pkg/test/stream/traffic/searchable_template.json | 12 ++
pkg/test/stream/traffic/traffic_test.go | 163 +++++++++++++++++++++
28 files changed, 574 insertions(+), 93 deletions(-)
diff --git a/api/common/id.go b/api/common/id.go
index 9964786..f6a55b4 100644
--- a/api/common/id.go
+++ b/api/common/id.go
@@ -17,7 +17,13 @@
package common
-import "github.com/apache/skywalking-banyandb/pkg/convert"
+import (
+ "context"
+
+ "github.com/prometheus/client_golang/prometheus"
+
+ "github.com/apache/skywalking-banyandb/pkg/convert"
+)
type SeriesID uint64
type ShardID uint32
@@ -26,3 +32,38 @@ type ItemID uint64
func (s SeriesID) Marshal() []byte {
return convert.Uint64ToBytes(uint64(s))
}
+
+var PositionKey = contextPositionKey{}
+
+type contextPositionKey struct{}
+
+type Position struct {
+ Module string
+ Database string
+ Shard string
+ Segment string
+ Block string
+ KV string
+}
+
+func (p Position) Labels() prometheus.Labels {
+ return prometheus.Labels{
+ "module": p.Module,
+ "database": p.Database,
+ "shard": p.Shard,
+ "seg": p.Segment,
+ "block": p.Block,
+ "kv": p.KV,
+ }
+}
+
+func SetPosition(ctx context.Context, fn func(p Position) Position) context.Context {
+ val := ctx.Value(PositionKey)
+ var p Position
+ if val == nil {
+ p = Position{}
+ } else {
+ p = val.(Position)
+ }
+ return context.WithValue(ctx, PositionKey, fn(p))
+}
diff --git a/banyand/internal/cmd/standalone.go b/banyand/internal/cmd/standalone.go
index f9cde55..2919212 100644
--- a/banyand/internal/cmd/standalone.go
+++ b/banyand/internal/cmd/standalone.go
@@ -27,7 +27,7 @@ import (
"github.com/apache/skywalking-banyandb/banyand/liaison"
"github.com/apache/skywalking-banyandb/banyand/measure"
"github.com/apache/skywalking-banyandb/banyand/metadata"
- "github.com/apache/skywalking-banyandb/banyand/prof"
+ "github.com/apache/skywalking-banyandb/banyand/observability"
"github.com/apache/skywalking-banyandb/banyand/query"
"github.com/apache/skywalking-banyandb/banyand/queue"
"github.com/apache/skywalking-banyandb/banyand/stream"
@@ -74,7 +74,8 @@ func newStandaloneCmd() *cobra.Command {
if err != nil {
l.Fatal().Err(err).Msg("failed to initiate Endpoint transport layer")
}
- profSvc := prof.NewProfService()
+ profSvc := observability.NewProfService()
+ metricSvc := observability.NewMetricService()
// Meta the run Group units.
g.Register(
@@ -86,6 +87,7 @@ func newStandaloneCmd() *cobra.Command {
streamSvc,
q,
tcp,
+ metricSvc,
profSvc,
)
logging := logger.Logging{}
diff --git a/banyand/kv/badger.go b/banyand/kv/badger.go
index de6394b..4482721 100644
--- a/banyand/kv/badger.go
+++ b/banyand/kv/badger.go
@@ -27,6 +27,7 @@ import (
"github.com/dgraph-io/badger/v3/bydb"
"github.com/dgraph-io/badger/v3/y"
+ "github.com/apache/skywalking-banyandb/banyand/observability"
"github.com/apache/skywalking-banyandb/pkg/encoding"
"github.com/apache/skywalking-banyandb/pkg/logger"
)
@@ -47,6 +48,18 @@ type badgerTSS struct {
badger.TSet
}
+func (b *badgerTSS) Stats() (s observability.Statistics) {
+ return badgerStats(b.db)
+}
+
+func badgerStats(db *badger.DB) (s observability.Statistics) {
+ stat := db.Stats()
+ return observability.Statistics{
+ MemBytes: stat.MemBytes,
+ MaxMemBytes: db.Opts().MemTableSize,
+ }
+}
+
func (b *badgerTSS) Close() error {
if b.db != nil && !b.db.IsClosed() {
return b.db.Close()
@@ -110,6 +123,10 @@ type badgerDB struct {
db *badger.DB
}
+func (b *badgerDB) Stats() observability.Statistics {
+ return badgerStats(b.db)
+}
+
func (b *badgerDB) Handover(iterator Iterator) error {
return b.db.HandoverIterator(&mergedIter{
delegated: iterator,
diff --git a/banyand/kv/kv.go b/banyand/kv/kv.go
index fdca551..103959f 100644
--- a/banyand/kv/kv.go
+++ b/banyand/kv/kv.go
@@ -25,6 +25,7 @@ import (
"github.com/dgraph-io/badger/v3"
"github.com/pkg/errors"
+ "github.com/apache/skywalking-banyandb/banyand/observability"
"github.com/apache/skywalking-banyandb/pkg/encoding"
"github.com/apache/skywalking-banyandb/pkg/logger"
)
@@ -62,6 +63,7 @@ type Reader interface {
// Store is a common kv storage with auto-generated key
type Store interface {
+ observability.Observable
io.Closer
Writer
Reader
@@ -84,6 +86,7 @@ type TimeSeriesReader interface {
// TimeSeriesStore is time series storage
type TimeSeriesStore interface {
+ observability.Observable
io.Closer
TimeSeriesWriter
TimeSeriesReader
diff --git a/banyand/measure/metadata.go b/banyand/measure/metadata.go
index 2a2e96e..4858b30 100644
--- a/banyand/measure/metadata.go
+++ b/banyand/measure/metadata.go
@@ -21,6 +21,7 @@ import (
"context"
"time"
+ "github.com/apache/skywalking-banyandb/api/common"
"github.com/apache/skywalking-banyandb/api/event"
commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
@@ -195,7 +196,10 @@ func (s *supplier) ResourceSchema(repo metadata.Repo, md *commonv1.Metadata) (re
func (s *supplier) OpenDB(groupSchema *commonv1.Group) (tsdb.Database, error) {
return tsdb.OpenDatabase(
- context.TODO(),
+ context.WithValue(context.Background(), common.PositionKey, common.Position{
+ Module: "stream",
+ Database: groupSchema.Metadata.Name,
+ }),
tsdb.DatabaseOpts{
Location: s.path,
ShardNum: groupSchema.ResourceOpts.ShardNum,
diff --git a/banyand/prof/pprof.go b/banyand/observability/metric.go
similarity index 61%
copy from banyand/prof/pprof.go
copy to banyand/observability/metric.go
index ac27f62..29f353b 100644
--- a/banyand/prof/pprof.go
+++ b/banyand/observability/metric.go
@@ -14,59 +14,61 @@
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
-
-package prof
+//
+package observability
import (
"net/http"
- // Register pprof package
- _ "net/http/pprof"
+
+ "github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/apache/skywalking-banyandb/pkg/logger"
"github.com/apache/skywalking-banyandb/pkg/run"
)
var (
- _ run.Service = (*pprofService)(nil)
- _ run.Config = (*pprofService)(nil)
+ _ run.Service = (*metricService)(nil)
+ _ run.Config = (*metricService)(nil)
)
-func NewProfService() run.Service {
- return &pprofService{
+func NewMetricService() run.Service {
+ return &metricService{
stopCh: make(chan struct{}),
}
}
-type pprofService struct {
+type metricService struct {
listenAddr string
stopCh chan struct{}
l *logger.Logger
}
-func (p *pprofService) FlagSet() *run.FlagSet {
- flagSet := run.NewFlagSet("prof")
- flagSet.StringVar(&p.listenAddr, "pprof-listener-addr", "127.0.0.1:6060", "listen addr for pprof")
+func (p *metricService) FlagSet() *run.FlagSet {
+ flagSet := run.NewFlagSet("observability")
+ flagSet.StringVar(&p.listenAddr, "observability-listener-addr", ":2121", "listen addr for observability")
return flagSet
}
-func (p *pprofService) Validate() error {
+func (p *metricService) Validate() error {
return nil
}
-func (p *pprofService) Name() string {
- return "pprof-service"
+func (p *metricService) Name() string {
+ return "metric-service"
}
-func (p *pprofService) Serve() run.StopNotify {
+func (p *metricService) Serve() run.StopNotify {
p.l = logger.GetLogger(p.Name())
+ http.Handle("/metrics", promhttp.Handler())
go func() {
- p.l.Info().Str("listenAddr", p.listenAddr).Msg("Start pprof server")
+ p.l.Info().Str("listenAddr", p.listenAddr).Msg("Start metric server")
_ = http.ListenAndServe(p.listenAddr, nil)
+ p.stopCh <- struct{}{}
}()
return p.stopCh
}
-func (p *pprofService) GracefulStop() {
+func (p *metricService) GracefulStop() {
close(p.stopCh)
}
diff --git a/banyand/prof/pprof.go b/banyand/observability/pprof.go
similarity index 93%
rename from banyand/prof/pprof.go
rename to banyand/observability/pprof.go
index ac27f62..6e9bfe8 100644
--- a/banyand/prof/pprof.go
+++ b/banyand/observability/pprof.go
@@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.
-package prof
+package observability
import (
"net/http"
@@ -27,8 +27,8 @@ import (
)
var (
- _ run.Service = (*pprofService)(nil)
- _ run.Config = (*pprofService)(nil)
+ _ run.Service = (*metricService)(nil)
+ _ run.Config = (*metricService)(nil)
)
func NewProfService() run.Service {
@@ -62,6 +62,7 @@ func (p *pprofService) Serve() run.StopNotify {
go func() {
p.l.Info().Str("listenAddr", p.listenAddr).Msg("Start pprof server")
_ = http.ListenAndServe(p.listenAddr, nil)
+ p.stopCh <- struct{}{}
}()
return p.stopCh
diff --git a/api/common/id.go b/banyand/observability/type.go
similarity index 79%
copy from api/common/id.go
copy to banyand/observability/type.go
index 9964786..1b26330 100644
--- a/api/common/id.go
+++ b/banyand/observability/type.go
@@ -14,15 +14,14 @@
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
+//
+package observability
-package common
-
-import "github.com/apache/skywalking-banyandb/pkg/convert"
-
-type SeriesID uint64
-type ShardID uint32
-type ItemID uint64
+type Statistics struct {
+ MemBytes int64
+ MaxMemBytes int64
+}
-func (s SeriesID) Marshal() []byte {
- return convert.Uint64ToBytes(uint64(s))
+type Observable interface {
+ Stats() Statistics
}
diff --git a/banyand/stream/metadata.go b/banyand/stream/metadata.go
index 97972ba..f1b8f79 100644
--- a/banyand/stream/metadata.go
+++ b/banyand/stream/metadata.go
@@ -21,6 +21,7 @@ import (
"context"
"time"
+ "github.com/apache/skywalking-banyandb/api/common"
"github.com/apache/skywalking-banyandb/api/event"
commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
@@ -195,7 +196,10 @@ func (s *supplier) ResourceSchema(repo metadata.Repo, md *commonv1.Metadata) (re
func (s *supplier) OpenDB(groupSchema *commonv1.Group) (tsdb.Database, error) {
return tsdb.OpenDatabase(
- context.TODO(),
+ context.WithValue(context.Background(), common.PositionKey, common.Position{
+ Module: "stream",
+ Database: groupSchema.Metadata.Name,
+ }),
tsdb.DatabaseOpts{
Location: s.path,
ShardNum: groupSchema.ResourceOpts.ShardNum,
diff --git a/banyand/tsdb/block.go b/banyand/tsdb/block.go
index f93b446..cafe15c 100644
--- a/banyand/tsdb/block.go
+++ b/banyand/tsdb/block.go
@@ -29,6 +29,7 @@ import (
"github.com/apache/skywalking-banyandb/api/common"
"github.com/apache/skywalking-banyandb/banyand/kv"
+ "github.com/apache/skywalking-banyandb/banyand/observability"
"github.com/apache/skywalking-banyandb/banyand/tsdb/bucket"
"github.com/apache/skywalking-banyandb/pkg/encoding"
"github.com/apache/skywalking-banyandb/pkg/index"
@@ -39,12 +40,13 @@ import (
)
type block struct {
- path string
- l *logger.Logger
- suffix string
- ref *z.Closer
- lock sync.RWMutex
- closed *atomic.Bool
+ path string
+ l *logger.Logger
+ suffix string
+ ref *z.Closer
+ lock sync.RWMutex
+ closed *atomic.Bool
+ position common.Position
store kv.TimeSeriesStore
primaryIndex index.Store
@@ -90,6 +92,10 @@ func newBlock(ctx context.Context, opts blockOpts) (b *block, err error) {
closed: atomic.NewBool(true),
encodingMethod: encodingMethodObject.(EncodingMethod),
}
+ position := ctx.Value(common.PositionKey)
+ if position != nil {
+ b.position = position.(common.Position)
+ }
return b, err
}
@@ -172,6 +178,12 @@ func (b *block) String() string {
return b.Reporter.String()
}
+func (b *block) stats() (names []string, stats []observability.Statistics) {
+ names = append(names, "main", "p-idx", "si-idx", "sl-idx")
+ stats = append(stats, b.store.Stats(), b.primaryIndex.Stats(), b.invertedIndex.Stats(), b.lsmIndex.Stats())
+ return names, stats
+}
+
type blockDelegate interface {
io.Closer
contains(ts time.Time) bool
diff --git a/banyand/tsdb/index/writer.go b/banyand/tsdb/index/writer.go
index 424155f..bb459a0 100644
--- a/banyand/tsdb/index/writer.go
+++ b/banyand/tsdb/index/writer.go
@@ -198,6 +198,9 @@ func getIndexValue(ruleIndex *partition.IndexRuleLocator, value Value) (val []by
var existInt bool
for _, tIndex := range ruleIndex.TagIndices {
tag, err := partition.GetTagByOffset(value.TagFamilies, tIndex.FamilyOffset, tIndex.TagOffset)
+ if errors.Is(err, partition.ErrMalformedElement) {
+ continue
+ }
if err != nil {
return nil, false, errors.WithMessagef(err, "index rule:%v", ruleIndex.Rule.Metadata)
}
diff --git a/banyand/tsdb/metric.go b/banyand/tsdb/metric.go
new file mode 100644
index 0000000..9c4005b
--- /dev/null
+++ b/banyand/tsdb/metric.go
@@ -0,0 +1,111 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+package tsdb
+
+import (
+ "time"
+
+ "github.com/prometheus/client_golang/prometheus"
+ "github.com/prometheus/client_golang/prometheus/promauto"
+
+ "github.com/apache/skywalking-banyandb/banyand/observability"
+)
+
+const statInterval = 5 * time.Second
+
+var (
+ mtBytes *prometheus.GaugeVec
+ maxMtBytes *prometheus.GaugeVec
+)
+
+func init() {
+ labels := []string{"module", "database", "shard", "component"}
+ mtBytes = promauto.NewGaugeVec(
+ prometheus.GaugeOpts{
+ Name: "banyand_memtables_bytes",
+ Help: "Memory table size in bytes",
+ },
+ labels,
+ )
+ maxMtBytes = promauto.NewGaugeVec(
+ prometheus.GaugeOpts{
+ Name: "banyand_memtables_max_bytes",
+ Help: "Maximum amount of memory table available in bytes",
+ },
+ labels,
+ )
+}
+
+func (s *shard) runStat() {
+ go func() {
+ ticker := time.NewTicker(statInterval)
+ defer ticker.Stop()
+ for {
+ select {
+ case <-ticker.C:
+ s.stat()
+ case <-s.stopCh:
+ return
+ }
+ }
+ }()
+}
+
+func (s *shard) stat() {
+ seriesStat := s.seriesDatabase.Stats()
+ s.curry(mtBytes).WithLabelValues("series").Set(float64(seriesStat.MemBytes))
+ s.curry(maxMtBytes).WithLabelValues("series").Set(float64(seriesStat.MaxMemBytes))
+ segStats := observability.Statistics{}
+ blockStats := observability.Statistics{}
+ localIndexStats := observability.Statistics{}
+ for _, seg := range s.segmentController.segments() {
+ segStat := seg.Stats()
+ segStats.MaxMemBytes += segStat.MaxMemBytes
+ segStats.MemBytes += segStat.MemBytes
+ for _, b := range seg.blockController.blocks() {
+ if b.closed.Load() {
+ continue
+ }
+ names, bss := b.stats()
+ for i, bs := range bss {
+ if names[i] == "main" {
+ blockStats.MaxMemBytes += bs.MaxMemBytes
+ blockStats.MemBytes += bs.MemBytes
+ continue
+ }
+ localIndexStats.MaxMemBytes += bs.MaxMemBytes
+ localIndexStats.MemBytes += bs.MemBytes
+ }
+
+ }
+ }
+ s.curry(mtBytes).WithLabelValues("global-index").Set(float64(segStats.MemBytes))
+ s.curry(maxMtBytes).WithLabelValues("global-index").Set(float64(segStats.MaxMemBytes))
+ s.curry(mtBytes).WithLabelValues("block").Set(float64(blockStats.MemBytes))
+ s.curry(maxMtBytes).WithLabelValues("block").Set(float64(blockStats.MaxMemBytes))
+ s.curry(mtBytes).WithLabelValues("local-index").Set(float64(localIndexStats.MemBytes))
+ s.curry(maxMtBytes).WithLabelValues("local-index").Set(float64(localIndexStats.MaxMemBytes))
+}
+
+func (s *shard) curry(gv *prometheus.GaugeVec) *prometheus.GaugeVec {
+ return gv.MustCurryWith(prometheus.Labels{
+ "module": s.position.Module,
+ "database": s.position.Database,
+ "shard": s.position.Shard,
+ })
+}
diff --git a/banyand/tsdb/scope.go b/banyand/tsdb/scope.go
index cbbe687..e2d0ff2 100644
--- a/banyand/tsdb/scope.go
+++ b/banyand/tsdb/scope.go
@@ -17,7 +17,10 @@
//
package tsdb
-import "github.com/apache/skywalking-banyandb/api/common"
+import (
+ "github.com/apache/skywalking-banyandb/api/common"
+ "github.com/apache/skywalking-banyandb/banyand/observability"
+)
var _ Shard = (*ScopedShard)(nil)
@@ -64,6 +67,10 @@ type scopedSeriesDatabase struct {
delegated SeriesDatabase
}
+func (sdd *scopedSeriesDatabase) Stats() observability.Statistics {
+ return sdd.delegated.Stats()
+}
+
func (sdd *scopedSeriesDatabase) Close() error {
return nil
}
diff --git a/banyand/tsdb/segment.go b/banyand/tsdb/segment.go
index 2db1f91..bf50669 100644
--- a/banyand/tsdb/segment.go
+++ b/banyand/tsdb/segment.go
@@ -24,7 +24,9 @@ import (
"sync"
"time"
+ "github.com/apache/skywalking-banyandb/api/common"
"github.com/apache/skywalking-banyandb/banyand/kv"
+ "github.com/apache/skywalking-banyandb/banyand/observability"
"github.com/apache/skywalking-banyandb/banyand/tsdb/bucket"
"github.com/apache/skywalking-banyandb/pkg/logger"
"github.com/apache/skywalking-banyandb/pkg/timestamp"
@@ -54,16 +56,17 @@ func openSegment(ctx context.Context, startTime time.Time, path, suffix string,
id := uint16(segmentSize.Unit)<<12 | ((uint16(suffixInteger) << 4) >> 4)
timeRange := timestamp.NewTimeRange(startTime, segmentSize.NextTime(startTime), true, false)
l := logger.Fetch(ctx, "segment")
+ segCtx := context.WithValue(ctx, logger.ContextKey, l)
s = &segment{
id: id,
path: path,
suffix: suffix,
l: l,
- blockController: newBlockController(id, path, timeRange, blockSize, l, blockQueue),
+ blockController: newBlockController(segCtx, id, path, timeRange, blockSize, l, blockQueue),
TimeRange: timeRange,
Reporter: bucket.NewTimeBasedReporter(timeRange),
}
- err = s.blockController.open(context.WithValue(ctx, logger.ContextKey, s.l))
+ err = s.blockController.open()
if err != nil {
return nil, err
}
@@ -98,8 +101,13 @@ func (s segment) String() string {
return s.Reporter.String()
}
+func (s *segment) Stats() observability.Statistics {
+ return s.globalIndex.Stats()
+}
+
type blockController struct {
sync.RWMutex
+ segCtx context.Context
segID uint16
location string
segTimeRange timestamp.TimeRange
@@ -110,9 +118,10 @@ type blockController struct {
l *logger.Logger
}
-func newBlockController(segID uint16, location string, segTimeRange timestamp.TimeRange,
+func newBlockController(segCtx context.Context, segID uint16, location string, segTimeRange timestamp.TimeRange,
blockSize IntervalRule, l *logger.Logger, blockQueue bucket.Queue) *blockController {
return &blockController{
+ segCtx: segCtx,
segID: segID,
location: location,
blockSize: blockSize,
@@ -140,7 +149,7 @@ func (bc *blockController) Current() bucket.Reporter {
func (bc *blockController) Next() (bucket.Reporter, error) {
b := bc.Current().(*block)
- reporter, err := bc.create(context.TODO(),
+ reporter, err := bc.create(
bc.blockSize.NextTime(b.Start))
if errors.Is(err, ErrEndOfSegment) {
return nil, bucket.ErrNoMoreBucket
@@ -282,19 +291,19 @@ func (bc *blockController) startTime(suffix string) (time.Time, error) {
panic("invalid interval unit")
}
-func (bc *blockController) open(ctx context.Context) error {
+func (bc *blockController) open() error {
err := WalkDir(
bc.location,
segPathPrefix,
func(suffix, absolutePath string) error {
- _, err := bc.load(ctx, suffix, absolutePath)
+ _, err := bc.load(suffix, absolutePath)
return err
})
if err != nil {
return err
}
if bc.Current() == nil {
- b, err := bc.create(ctx, time.Now())
+ b, err := bc.create(time.Now())
if err != nil {
return err
}
@@ -305,7 +314,7 @@ func (bc *blockController) open(ctx context.Context) error {
return nil
}
-func (bc *blockController) create(ctx context.Context, startTime time.Time) (*block, error) {
+func (bc *blockController) create(startTime time.Time) (*block, error) {
if startTime.Before(bc.segTimeRange.Start) {
startTime = bc.segTimeRange.Start
}
@@ -317,21 +326,26 @@ func (bc *blockController) create(ctx context.Context, startTime time.Time) (*bl
if err != nil {
return nil, err
}
- return bc.load(ctx, suffix, segPath)
+ return bc.load(suffix, segPath)
}
-func (bc *blockController) load(ctx context.Context, suffix, path string) (b *block, err error) {
+func (bc *blockController) load(suffix, path string) (b *block, err error) {
starTime, err := bc.startTime(suffix)
if err != nil {
return nil, err
}
- if b, err = newBlock(ctx, blockOpts{
- segID: bc.segID,
- path: path,
- startTime: starTime,
- suffix: suffix,
- blockSize: bc.blockSize,
- }); err != nil {
+ if b, err = newBlock(
+ common.SetPosition(bc.segCtx, func(p common.Position) common.Position {
+ p.Block = suffix
+ return p
+ }),
+ blockOpts{
+ segID: bc.segID,
+ path: path,
+ startTime: starTime,
+ suffix: suffix,
+ blockSize: bc.blockSize,
+ }); err != nil {
return nil, err
}
bc.Lock()
diff --git a/banyand/tsdb/seriesdb.go b/banyand/tsdb/seriesdb.go
index ed62632..4afb96f 100644
--- a/banyand/tsdb/seriesdb.go
+++ b/banyand/tsdb/seriesdb.go
@@ -28,6 +28,7 @@ import (
"github.com/apache/skywalking-banyandb/api/common"
"github.com/apache/skywalking-banyandb/banyand/kv"
+ "github.com/apache/skywalking-banyandb/banyand/observability"
"github.com/apache/skywalking-banyandb/pkg/convert"
"github.com/apache/skywalking-banyandb/pkg/logger"
"github.com/apache/skywalking-banyandb/pkg/timestamp"
@@ -119,6 +120,7 @@ func (p Path) Prepand(entry Entry) Path {
}
type SeriesDatabase interface {
+ observability.Observable
io.Closer
GetByID(id common.SeriesID) (Series, error)
Get(entity Entity) (Series, error)
@@ -250,6 +252,10 @@ func (s *seriesDB) context() context.Context {
return context.WithValue(context.Background(), logger.ContextKey, s.l)
}
+func (s *seriesDB) Stats() observability.Statistics {
+ return s.seriesMetadata.Stats()
+}
+
func (s *seriesDB) Close() error {
return s.seriesMetadata.Close()
}
diff --git a/banyand/tsdb/shard.go b/banyand/tsdb/shard.go
index cd4120c..3299e57 100644
--- a/banyand/tsdb/shard.go
+++ b/banyand/tsdb/shard.go
@@ -36,13 +36,15 @@ const defaultBlockQueueSize = 1 << 4
var _ Shard = (*shard)(nil)
type shard struct {
- l *logger.Logger
- id common.ShardID
+ l *logger.Logger
+ id common.ShardID
+ position common.Position
seriesDatabase SeriesDatabase
indexDatabase IndexDatabase
segmentController *segmentController
segmentManageStrategy *bucket.Strategy
+ stopCh chan struct{}
}
func OpenShard(ctx context.Context, id common.ShardID,
@@ -56,7 +58,12 @@ func OpenShard(ctx context.Context, id common.ShardID,
if openedBlockSize < 1 {
openedBlockSize = defaultBlockQueueSize
}
- sc, err := newSegmentController(path, segmentSize, blockSize, openedBlockSize, l)
+ shardCtx := context.WithValue(ctx, logger.ContextKey, l)
+ shardCtx = common.SetPosition(shardCtx, func(p common.Position) common.Position {
+ p.Shard = strconv.Itoa(int(id))
+ return p
+ })
+ sc, err := newSegmentController(shardCtx, path, segmentSize, blockSize, openedBlockSize, l)
if err != nil {
return nil, errors.Wrapf(err, "create the segment controller of the shard %d", int(id))
}
@@ -64,9 +71,9 @@ func OpenShard(ctx context.Context, id common.ShardID,
id: id,
segmentController: sc,
l: l,
+ stopCh: make(chan struct{}),
}
- shardCtx := context.WithValue(ctx, logger.ContextKey, s.l)
- err = s.segmentController.open(shardCtx)
+ err = s.segmentController.open()
if err != nil {
return nil, err
}
@@ -89,6 +96,11 @@ func OpenShard(ctx context.Context, id common.ShardID,
return nil, err
}
s.segmentManageStrategy.Run()
+ position := shardCtx.Value(common.PositionKey)
+ if position != nil {
+ s.position = position.(common.Position)
+ }
+ s.runStat()
return s, nil
}
@@ -123,7 +135,9 @@ func (s *shard) State() (shardState ShardState) {
func (s *shard) Close() error {
s.segmentManageStrategy.Close()
s.segmentController.close()
- return s.seriesDatabase.Close()
+ err := s.seriesDatabase.Close()
+ close(s.stopCh)
+ return err
}
type IntervalUnit int
@@ -178,6 +192,7 @@ func (ir IntervalRule) EstimatedDuration() time.Duration {
type segmentController struct {
sync.RWMutex
+ shardCtx context.Context
location string
segmentSize IntervalRule
blockSize IntervalRule
@@ -187,8 +202,10 @@ type segmentController struct {
l *logger.Logger
}
-func newSegmentController(location string, segmentSize, blockSize IntervalRule, openedBlockSize int, l *logger.Logger) (*segmentController, error) {
+func newSegmentController(shardCtx context.Context, location string,
+ segmentSize, blockSize IntervalRule, openedBlockSize int, l *logger.Logger) (*segmentController, error) {
sc := &segmentController{
+ shardCtx: shardCtx,
location: location,
segmentSize: segmentSize,
blockSize: blockSize,
@@ -264,7 +281,7 @@ func (sc *segmentController) Current() bucket.Reporter {
func (sc *segmentController) Next() (bucket.Reporter, error) {
seg := sc.Current().(*segment)
- reporter, err := sc.create(context.TODO(), sc.Format(
+ reporter, err := sc.create(sc.Format(
sc.segmentSize.NextTime(seg.Start)))
if errors.Is(err, ErrEndOfSegment) {
return nil, bucket.ErrNoMoreBucket
@@ -307,12 +324,12 @@ func (sc *segmentController) Parse(value string) (time.Time, error) {
panic("invalid interval unit")
}
-func (sc *segmentController) open(ctx context.Context) error {
+func (sc *segmentController) open() error {
err := WalkDir(
sc.location,
segPathPrefix,
func(suffix, absolutePath string) error {
- _, err := sc.load(ctx, suffix, absolutePath)
+ _, err := sc.load(suffix, absolutePath)
if errors.Is(err, ErrEndOfSegment) {
return nil
}
@@ -322,7 +339,7 @@ func (sc *segmentController) open(ctx context.Context) error {
return err
}
if sc.Current() == nil {
- _, err = sc.create(ctx, sc.Format(time.Now()))
+ _, err = sc.create(sc.Format(time.Now()))
if err != nil {
return err
}
@@ -330,20 +347,23 @@ func (sc *segmentController) open(ctx context.Context) error {
return nil
}
-func (sc *segmentController) create(ctx context.Context, suffix string) (*segment, error) {
+func (sc *segmentController) create(suffix string) (*segment, error) {
segPath, err := mkdir(segTemplate, sc.location, suffix)
if err != nil {
return nil, err
}
- return sc.load(ctx, suffix, segPath)
+ return sc.load(suffix, segPath)
}
-func (sc *segmentController) load(ctx context.Context, suffix, path string) (seg *segment, err error) {
+func (sc *segmentController) load(suffix, path string) (seg *segment, err error) {
startTime, err := sc.Parse(suffix)
if err != nil {
return nil, err
}
- seg, err = openSegment(ctx, startTime, path, suffix, sc.segmentSize, sc.blockSize, sc.blockQueue)
+ seg, err = openSegment(common.SetPosition(sc.shardCtx, func(p common.Position) common.Position {
+ p.Segment = suffix
+ return p
+ }), startTime, path, suffix, sc.segmentSize, sc.blockSize, sc.blockQueue)
if err != nil {
return nil, err
}
diff --git a/go.mod b/go.mod
index a445dc6..e8db453 100644
--- a/go.mod
+++ b/go.mod
@@ -25,7 +25,7 @@ require (
go.etcd.io/etcd/server/v3 v3.5.0
go.uber.org/multierr v1.7.0
golang.org/x/net v0.0.0-20210716203947-853a461950ff // indirect
- golang.org/x/sys v0.0.0-20210630005230-0f9fa26af87c // indirect
+ golang.org/x/sys v0.0.0-20220114195835-da31bd327af9 // indirect
google.golang.org/genproto v0.0.0-20210722135532-667f2b7c528f // indirect
google.golang.org/grpc v1.39.0
google.golang.org/protobuf v1.27.1
@@ -34,7 +34,7 @@ require (
require (
github.com/beorn7/perks v1.0.1 // indirect
github.com/bits-and-blooms/bitset v1.2.0 // indirect
- github.com/cespare/xxhash/v2 v2.1.1 // indirect
+ github.com/cespare/xxhash/v2 v2.1.2 // indirect
github.com/coreos/go-semver v0.3.0 // indirect
github.com/coreos/go-systemd/v22 v22.3.2 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
@@ -55,19 +55,19 @@ require (
github.com/hashicorp/hcl v1.0.0 // indirect
github.com/inconshreveable/mousetrap v1.0.0 // indirect
github.com/jonboulle/clockwork v0.2.2 // indirect
- github.com/json-iterator/go v1.1.11 // indirect
+ github.com/json-iterator/go v1.1.12 // indirect
github.com/magiconair/properties v1.8.5 // indirect
github.com/matttproud/golang_protobuf_extensions v1.0.1 // indirect
github.com/mitchellh/mapstructure v1.4.1 // indirect
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
- github.com/modern-go/reflect2 v1.0.1 // indirect
+ github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/mschoch/smat v0.2.0 // indirect
github.com/pelletier/go-toml v1.9.3 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
- github.com/prometheus/client_golang v1.11.0 // indirect
+ github.com/prometheus/client_golang v1.12.1
github.com/prometheus/client_model v0.2.0 // indirect
- github.com/prometheus/common v0.26.0 // indirect
- github.com/prometheus/procfs v0.6.0 // indirect
+ github.com/prometheus/common v0.32.1 // indirect
+ github.com/prometheus/procfs v0.7.3 // indirect
github.com/sirupsen/logrus v1.7.0 // indirect
github.com/soheilhy/cmux v0.1.5 // indirect
github.com/spf13/afero v1.6.0 // indirect
@@ -107,4 +107,4 @@ require (
sigs.k8s.io/yaml v1.2.0 // indirect
)
-replace github.com/dgraph-io/badger/v3 v3.2011.1 => github.com/SkyAPM/badger/v3 v3.0.0-20220117144524-89781ee8a386
+replace github.com/dgraph-io/badger/v3 v3.2011.1 => github.com/SkyAPM/badger/v3 v3.0.0-20220331082938-b907904d6089
diff --git a/go.sum b/go.sum
index ee831c2..4189cd5 100644
--- a/go.sum
+++ b/go.sum
@@ -47,8 +47,8 @@ github.com/RoaringBitmap/gocroaring v0.4.0/go.mod h1:NieMwz7ZqwU2DD73/vvYwv7r4eW
github.com/RoaringBitmap/real-roaring-datasets v0.0.0-20190726190000-eb7c87156f76/go.mod h1:oM0MHmQ3nDsq609SS36p+oYbRi16+oVvU2Bw4Ipv0SE=
github.com/RoaringBitmap/roaring v0.9.1 h1:5PRizBmoN/PfV17nPNQou4dHQ7NcJi8FO/bihdYyCEM=
github.com/RoaringBitmap/roaring v0.9.1/go.mod h1:h1B7iIUOmnAeb5ytYMvnHJwxMc6LUrwBnzXWRuqTQUc=
-github.com/SkyAPM/badger/v3 v3.0.0-20220117144524-89781ee8a386 h1:2VmCNyGlF/yY+Ev9bxCiPcKGaWSEQDiRCRiCJYWPh7o=
-github.com/SkyAPM/badger/v3 v3.0.0-20220117144524-89781ee8a386/go.mod h1:RHo4/GmYcKKh5Lxu63wLEMHJ70Pac2JqZRYGhlyAo2M=
+github.com/SkyAPM/badger/v3 v3.0.0-20220331082938-b907904d6089 h1:MEqSQssrKviX+uU4xfAcIvvpFeTamsT3cPc9k4Ol4Ks=
+github.com/SkyAPM/badger/v3 v3.0.0-20220331082938-b907904d6089/go.mod h1:Q0luV7nB94o3Bl4hYqAPy03+QTtLxs9pWdUEQb0i0K0=
github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0=
@@ -76,8 +76,9 @@ github.com/certifi/gocertifi v0.0.0-20200922220541-2c3bb06c6054 h1:uH66TXeswKn5P
github.com/certifi/gocertifi v0.0.0-20200922220541-2c3bb06c6054/go.mod h1:sGbDF6GwGcLpkNXPUTkMRoywsNa/ol15pxFe6ERfguA=
github.com/cespare/xxhash v1.1.0 h1:a6HrQnmkObjyL+Gs60czilIUGqrzKutQD6XZog3p+ko=
github.com/cespare/xxhash v1.1.0/go.mod h1:XrSqR1VqqWfGrhpAt58auRo0WTKS1nRRg3ghfAqPWnc=
-github.com/cespare/xxhash/v2 v2.1.1 h1:6MnRN8NT7+YBpUIWxHtefFZOKTAPgGjpQSxqLNn0+qY=
github.com/cespare/xxhash/v2 v2.1.1/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
+github.com/cespare/xxhash/v2 v2.1.2 h1:YRXhKfTDauu4ajMg1TPgFO5jnlC2HCbmLXMcTG5cbYE=
+github.com/cespare/xxhash/v2 v2.1.2/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/chzyer/logex v1.1.10/go.mod h1:+Ywpsq7O8HXn0nuIou7OrIPyXbp3wmkHB+jjWRnGsAI=
github.com/chzyer/readline v0.0.0-20180603132655-2972be24d48e/go.mod h1:nSuG5e5PlCu98SY8svDHJxuZscDgtXS6KTTbou5AhLI=
github.com/chzyer/test v0.0.0-20180213035817-a1ea475d72b1/go.mod h1:Q3SI9o4m/ZMnBNeIyt5eFwwo7qiLfzFZmjNmxjkiQlU=
@@ -277,8 +278,9 @@ github.com/jonboulle/clockwork v0.2.2/go.mod h1:Pkfl5aHPm1nk2H9h0bjmnJD/BcgbGXUB
github.com/jpillora/backoff v1.0.0/go.mod h1:J/6gKK9jxlEcS3zixgDgUAsiuZ7yrSoa/FX5e0EB2j4=
github.com/json-iterator/go v1.1.6/go.mod h1:+SdeFBvtyEkXs7REEP0seUULqWtbJapLOCVDaaPEHmU=
github.com/json-iterator/go v1.1.10/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4=
-github.com/json-iterator/go v1.1.11 h1:uVUAXhF2To8cbw/3xN3pxj6kk7TYKs98NIrTqPlMWAQ=
github.com/json-iterator/go v1.1.11/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4=
+github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM=
+github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo=
github.com/jstemmer/go-junit-report v0.0.0-20190106144839-af01ea7f8024/go.mod h1:6v2b51hI/fHJwM22ozAgKL4VKDeJcHhJFhtBdhmNjmU=
github.com/jstemmer/go-junit-report v0.9.1/go.mod h1:Brl9GWCQeLvo8nXZwPNNblvFj/XSXhF0NWZEnDohbsk=
github.com/jtolds/gls v4.20.0+incompatible h1:xdiiI2gbIgH/gLH7ADydsJ1uDOEzR8yvV7C0MuV77Wo=
@@ -325,8 +327,9 @@ github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJ
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd h1:TRLaZ9cD/w8PVh93nsPXa1VrQ6jlwL5oN8l14QlcNfg=
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
github.com/modern-go/reflect2 v0.0.0-20180701023420-4b7aa43c6742/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0=
-github.com/modern-go/reflect2 v1.0.1 h1:9f412s+6RmYXLWZSEzVVgPGK7C2PphHj5RJrvfx9AWI=
github.com/modern-go/reflect2 v1.0.1/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0=
+github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9Gz0M=
+github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk=
github.com/mschoch/smat v0.2.0 h1:8imxQsjDm8yFEAVBe7azKmKSgzSkZXDuKkSq9374khM=
github.com/mschoch/smat v0.2.0/go.mod h1:kc9mz7DoBKqDyiRL7VZN8KvXQMWeTaVnttLRXOlotKw=
github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U=
@@ -364,8 +367,9 @@ github.com/prometheus/client_golang v0.9.1/go.mod h1:7SWBe2y4D6OKWSNQJUaRYU/AaXP
github.com/prometheus/client_golang v0.9.3/go.mod h1:/TN21ttK/J9q6uSwhBd54HahCDft0ttaMvbicHlPoso=
github.com/prometheus/client_golang v1.0.0/go.mod h1:db9x61etRT2tGnBNRi70OPL5FsnadC4Ky3P0J6CfImo=
github.com/prometheus/client_golang v1.7.1/go.mod h1:PY5Wy2awLA44sXw4AOSfFBetzPP4j5+D6mVACh+pe2M=
-github.com/prometheus/client_golang v1.11.0 h1:HNkLOAEQMIDv/K+04rukrLx6ch7msSRwf3/SASFAGtQ=
github.com/prometheus/client_golang v1.11.0/go.mod h1:Z6t4BnS23TR94PD6BsDNk8yVqroYurpAkEiz0P2BEV0=
+github.com/prometheus/client_golang v1.12.1 h1:ZiaPsmm9uiBeaSMRznKsCDNtPCS0T3JVDGF+06gjBzk=
+github.com/prometheus/client_golang v1.12.1/go.mod h1:3Z9XVyYiZYEO+YQWt3RD2R3jrbd179Rt297l4aS6nDY=
github.com/prometheus/client_model v0.0.0-20180712105110-5c3871d89910/go.mod h1:MbSGuTsp3dbXC40dX6PRTWyKYBIrTGTE9sqQNg2J8bo=
github.com/prometheus/client_model v0.0.0-20190129233127-fd36f4220a90/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA=
github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA=
@@ -375,14 +379,16 @@ github.com/prometheus/common v0.0.0-20181113130724-41aa239b4cce/go.mod h1:daVV7q
github.com/prometheus/common v0.4.0/go.mod h1:TNfzLD0ON7rHzMJeJkieUDPYmFC7Snx/y86RQel1bk4=
github.com/prometheus/common v0.4.1/go.mod h1:TNfzLD0ON7rHzMJeJkieUDPYmFC7Snx/y86RQel1bk4=
github.com/prometheus/common v0.10.0/go.mod h1:Tlit/dnDKsSWFlCLTWaA1cyBgKHSMdTB80sz/V91rCo=
-github.com/prometheus/common v0.26.0 h1:iMAkS2TDoNWnKM+Kopnx/8tnEStIfpYA0ur0xQzzhMQ=
github.com/prometheus/common v0.26.0/go.mod h1:M7rCNAaPfAosfx8veZJCuw84e35h3Cfd9VFqTh1DIvc=
+github.com/prometheus/common v0.32.1 h1:hWIdL3N2HoUx3B8j3YN9mWor0qhY/NlEKZEaXxuIRh4=
+github.com/prometheus/common v0.32.1/go.mod h1:vu+V0TpY+O6vW9J44gczi3Ap/oXXR10b+M/gUGO4Hls=
github.com/prometheus/procfs v0.0.0-20181005140218-185b4288413d/go.mod h1:c3At6R/oaqEKCNdg8wHV1ftS6bRYblBhIjjI8uT2IGk=
github.com/prometheus/procfs v0.0.0-20190507164030-5867b95ac084/go.mod h1:TjEm7ze935MbeOT/UhFTIMYKhuLP4wbCsTZCD3I8kEA=
github.com/prometheus/procfs v0.0.2/go.mod h1:TjEm7ze935MbeOT/UhFTIMYKhuLP4wbCsTZCD3I8kEA=
github.com/prometheus/procfs v0.1.3/go.mod h1:lV6e/gmhEcM9IjHGsFOCxxuZ+z1YqCvr4OA4YeYWdaU=
-github.com/prometheus/procfs v0.6.0 h1:mxy4L2jP6qMonqmq+aTtOx1ifVWUgG/TAmntgbh3xv4=
github.com/prometheus/procfs v0.6.0/go.mod h1:cz+aTbrPOrUb4q7XlbU9ygM+/jj0fzG6c1xBZuNvfVA=
+github.com/prometheus/procfs v0.7.3 h1:4jVXhlkAyzOScmCkXBTOLRLTz8EeU+eyjrwB/EPq0VU=
+github.com/prometheus/procfs v0.7.3/go.mod h1:cz+aTbrPOrUb4q7XlbU9ygM+/jj0fzG6c1xBZuNvfVA=
github.com/prometheus/tsdb v0.7.1/go.mod h1:qhTCs0VvXwvX/y3TZrWD7rabWM+ijKTux40TwIPHuXU=
github.com/rogpeppe/fastuuid v0.0.0-20150106093220-6724a57986af/go.mod h1:XWv6SoW27p1b0cqNHllgS5HIMJraePCO15w5zCzIWYg=
github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6LYCDYWNEvQ=
@@ -604,6 +610,7 @@ golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v
golang.org/x/net v0.0.0-20210316092652-d523dce5a7f4/go.mod h1:RBQZq4jEuRlivfhVLdyRGr576XBO4/greRjx4P4O3yc=
golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4/go.mod h1:p54w0d4576C0XHj96bSt6lcn1PtDYWL6XObtHCRCNQM=
golang.org/x/net v0.0.0-20210428140749-89ef3d95e781/go.mod h1:OJAsFXCWl8Ukc7SiCT/9KSuxbyM7479/AVlXFRxuMCk=
+golang.org/x/net v0.0.0-20210525063256-abc453219eb5/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
golang.org/x/net v0.0.0-20210716203947-853a461950ff h1:j2EK/QoxYNBsXI4R7fQkkRUk8y6wnOBI+6hgPdP/6Ds=
golang.org/x/net v0.0.0-20210716203947-853a461950ff/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
@@ -617,8 +624,9 @@ golang.org/x/oauth2 v0.0.0-20201208152858-08078c50e5b5/go.mod h1:KelEdhl1UZF7XfJ
golang.org/x/oauth2 v0.0.0-20210218202405-ba52d332ba99/go.mod h1:KelEdhl1UZF7XfJ4dDtk6s++YSgaE7mD/BuKKDLBl4A=
golang.org/x/oauth2 v0.0.0-20210220000619-9bb904979d93/go.mod h1:KelEdhl1UZF7XfJ4dDtk6s++YSgaE7mD/BuKKDLBl4A=
golang.org/x/oauth2 v0.0.0-20210313182246-cd4f82c27b84/go.mod h1:KelEdhl1UZF7XfJ4dDtk6s++YSgaE7mD/BuKKDLBl4A=
-golang.org/x/oauth2 v0.0.0-20210402161424-2e8d93401602 h1:0Ja1LBD+yisY6RWM/BH7TJVXWsSjs2VwBSmvSX4HdBc=
golang.org/x/oauth2 v0.0.0-20210402161424-2e8d93401602/go.mod h1:KelEdhl1UZF7XfJ4dDtk6s++YSgaE7mD/BuKKDLBl4A=
+golang.org/x/oauth2 v0.0.0-20210514164344-f6687ab2804c h1:pkQiBZBvdos9qq4wBAHqlzuZHEXo07pqV06ef90u1WI=
+golang.org/x/oauth2 v0.0.0-20210514164344-f6687ab2804c/go.mod h1:KelEdhl1UZF7XfJ4dDtk6s++YSgaE7mD/BuKKDLBl4A=
golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
@@ -688,8 +696,8 @@ golang.org/x/sys v0.0.0-20210403161142-5e06dd20ab57/go.mod h1:h1NjWce9XRLGQEsW7w
golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210510120138-977fb7262007/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20210603081109-ebe580a85c40/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
-golang.org/x/sys v0.0.0-20210630005230-0f9fa26af87c h1:F1jZWGFhYfh0Ci55sIpILtKKK8p3i2/krTr0H1rg74I=
-golang.org/x/sys v0.0.0-20210630005230-0f9fa26af87c/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
+golang.org/x/sys v0.0.0-20220114195835-da31bd327af9 h1:XfKQ4OlFl8okEOr5UvAqFRVj8pY/4yfcXrddB8qAbU0=
+golang.org/x/sys v0.0.0-20220114195835-da31bd327af9/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/text v0.0.0-20170915032832-14c0d48ead0c/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
diff --git a/pkg/index/index.go b/pkg/index/index.go
index 39bd478..82acfd8 100644
--- a/pkg/index/index.go
+++ b/pkg/index/index.go
@@ -25,6 +25,7 @@ import (
"github.com/apache/skywalking-banyandb/api/common"
modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
+ "github.com/apache/skywalking-banyandb/banyand/observability"
"github.com/apache/skywalking-banyandb/pkg/convert"
"github.com/apache/skywalking-banyandb/pkg/index/metadata"
"github.com/apache/skywalking-banyandb/pkg/index/posting"
@@ -172,6 +173,7 @@ type Searcher interface {
}
type Store interface {
+ observability.Observable
io.Closer
Writer
Searcher
diff --git a/pkg/index/inverted/field_map.go b/pkg/index/inverted/field_map.go
index e94df9a..9b719e0 100644
--- a/pkg/index/inverted/field_map.go
+++ b/pkg/index/inverted/field_map.go
@@ -21,6 +21,7 @@ import (
"sync"
"github.com/apache/skywalking-banyandb/api/common"
+ "github.com/apache/skywalking-banyandb/banyand/observability"
"github.com/apache/skywalking-banyandb/pkg/convert"
"github.com/apache/skywalking-banyandb/pkg/index"
)
@@ -72,6 +73,15 @@ func (fm *fieldMap) put(fv index.Field, id common.ItemID) error {
return pm.value.put(fv.Term, id)
}
+func (fm *fieldMap) Stats() (s observability.Statistics) {
+ for _, pv := range fm.repo {
+ // 8 is the size of key
+ s.MemBytes += 8
+ s.MemBytes += pv.value.Stats().MemBytes
+ }
+ return s
+}
+
type termContainer struct {
key index.FieldKey
value *termMap
diff --git a/pkg/index/inverted/inverted.go b/pkg/index/inverted/inverted.go
index 7387b63..9af43fd 100644
--- a/pkg/index/inverted/inverted.go
+++ b/pkg/index/inverted/inverted.go
@@ -27,6 +27,7 @@ import (
"github.com/apache/skywalking-banyandb/api/common"
modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
"github.com/apache/skywalking-banyandb/banyand/kv"
+ "github.com/apache/skywalking-banyandb/banyand/observability"
"github.com/apache/skywalking-banyandb/pkg/index"
"github.com/apache/skywalking-banyandb/pkg/index/metadata"
"github.com/apache/skywalking-banyandb/pkg/index/posting"
@@ -95,6 +96,19 @@ func (s *store) Flush() error {
return nil
}
+func (s *store) Stats() (stat observability.Statistics) {
+ s.rwMutex.RLock()
+ defer s.rwMutex.RUnlock()
+ //TODO: add MaxMem
+ main := s.memTable.Stats()
+ stat.MemBytes += main.MemBytes
+ if s.immutableMemTable != nil {
+ sub := s.immutableMemTable.Stats()
+ stat.MemBytes += sub.MemBytes
+ }
+ return stat
+}
+
func (s *store) MatchField(fieldKey index.FieldKey) (posting.List, error) {
return s.Range(fieldKey, index.RangeOpts{})
}
diff --git a/pkg/index/inverted/mem.go b/pkg/index/inverted/mem.go
index aed68fe..279a9d4 100644
--- a/pkg/index/inverted/mem.go
+++ b/pkg/index/inverted/mem.go
@@ -26,6 +26,7 @@ import (
"github.com/apache/skywalking-banyandb/api/common"
modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
"github.com/apache/skywalking-banyandb/banyand/kv"
+ "github.com/apache/skywalking-banyandb/banyand/observability"
"github.com/apache/skywalking-banyandb/pkg/index"
"github.com/apache/skywalking-banyandb/pkg/index/metadata"
"github.com/apache/skywalking-banyandb/pkg/index/posting"
@@ -51,6 +52,10 @@ func (m *memTable) Write(field index.Field, itemID common.ItemID) error {
return m.fields.put(field, itemID)
}
+func (m *memTable) Stats() observability.Statistics {
+ return m.fields.Stats()
+}
+
var _ index.FieldIterator = (*fIterator)(nil)
type fIterator struct {
diff --git a/pkg/index/inverted/term_map.go b/pkg/index/inverted/term_map.go
index e683837..a3d9adb 100644
--- a/pkg/index/inverted/term_map.go
+++ b/pkg/index/inverted/term_map.go
@@ -21,6 +21,7 @@ import (
"sync"
"github.com/apache/skywalking-banyandb/api/common"
+ "github.com/apache/skywalking-banyandb/banyand/observability"
"github.com/apache/skywalking-banyandb/pkg/convert"
"github.com/apache/skywalking-banyandb/pkg/index"
"github.com/apache/skywalking-banyandb/pkg/index/posting"
@@ -82,3 +83,12 @@ func (p *termMap) getEntry(key []byte) *index.PostingValue {
}
return v
}
+
+func (p *termMap) Stats() (s observability.Statistics) {
+ for _, pv := range p.repo {
+ // 8 is the size of key
+ s.MemBytes += 8
+ s.MemBytes += pv.Value.SizeInBytes()
+ }
+ return s
+}
diff --git a/pkg/index/lsm/lsm.go b/pkg/index/lsm/lsm.go
index 58f8f93..12d5a6c 100644
--- a/pkg/index/lsm/lsm.go
+++ b/pkg/index/lsm/lsm.go
@@ -22,6 +22,7 @@ import (
"github.com/apache/skywalking-banyandb/api/common"
"github.com/apache/skywalking-banyandb/banyand/kv"
+ "github.com/apache/skywalking-banyandb/banyand/observability"
"github.com/apache/skywalking-banyandb/pkg/convert"
"github.com/apache/skywalking-banyandb/pkg/index"
"github.com/apache/skywalking-banyandb/pkg/index/metadata"
@@ -36,6 +37,10 @@ type store struct {
l *logger.Logger
}
+func (s *store) Stats() observability.Statistics {
+ return s.lsm.Stats()
+}
+
func (s *store) Close() error {
return multierr.Combine(s.lsm.Close(), s.termMetadata.Close())
}
diff --git a/pkg/index/posting/posting.go b/pkg/index/posting/posting.go
index 593e61c..6f810fd 100644
--- a/pkg/index/posting/posting.go
+++ b/pkg/index/posting/posting.go
@@ -64,6 +64,8 @@ type List interface {
Marshall() ([]byte, error)
Unmarshall(data []byte) error
+
+ SizeInBytes() int64
}
type Iterator interface {
diff --git a/pkg/index/posting/roaring/roaring.go b/pkg/index/posting/roaring/roaring.go
index d121f89..eaab6a3 100644
--- a/pkg/index/posting/roaring/roaring.go
+++ b/pkg/index/posting/roaring/roaring.go
@@ -187,6 +187,10 @@ func (p *postingsList) Reset() {
p.bitmap.Clear()
}
+func (p *postingsList) SizeInBytes() int64 {
+ return int64(p.bitmap.GetSizeInBytes())
+}
+
type roaringIterator struct {
iter roaring64.IntIterable64
current common.ItemID
diff --git a/pkg/test/stream/traffic/searchable_template.json b/pkg/test/stream/traffic/searchable_template.json
new file mode 100644
index 0000000..f50a757
--- /dev/null
+++ b/pkg/test/stream/traffic/searchable_template.json
@@ -0,0 +1,12 @@
+{
+ "tags": [
+ {"str":{"value": "1"}},
+ {"int":{"value": 0}},
+ {"str":{"value": "webapp_id"}},
+ {"str":{"value": "10.0.0.1_id"}},
+ {"str":{"value": "/home_id"}},
+ {"int":{"value": 1000}},
+ {"int":{"value": 1622933202000000000}}
+ ]
+}
+
\ No newline at end of file
diff --git a/pkg/test/stream/traffic/traffic_test.go b/pkg/test/stream/traffic/traffic_test.go
new file mode 100644
index 0000000..dc8b45a
--- /dev/null
+++ b/pkg/test/stream/traffic/traffic_test.go
@@ -0,0 +1,163 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+package traffic_test
+
+import (
+ "context"
+ "crypto/rand"
+ _ "embed"
+ "strconv"
+ "time"
+
+ "github.com/golang/protobuf/jsonpb"
+ "google.golang.org/protobuf/proto"
+ "google.golang.org/protobuf/types/known/timestamppb"
+
+ commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
+ modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
+ streamv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/stream/v1"
+ "github.com/apache/skywalking-banyandb/banyand/metadata"
+ "github.com/apache/skywalking-banyandb/banyand/stream"
+ "github.com/apache/skywalking-banyandb/pkg/logger"
+ "github.com/apache/skywalking-banyandb/pkg/run"
+ stream_test "github.com/apache/skywalking-banyandb/pkg/test/stream"
+)
+
+type Service interface {
+ run.PreRunner
+ run.Config
+ run.Service
+}
+
+var _ Service = (*service)(nil)
+
+// service to preload stream
+type service struct {
+ metaSvc metadata.Service
+ streamSvc stream.Service
+ l *logger.Logger
+ stopCh chan struct{}
+}
+
+func NewService(ctx context.Context, metaSvc metadata.Service, streamSvc stream.Service) Service {
+ return &service{
+ metaSvc: metaSvc,
+ streamSvc: streamSvc,
+ stopCh: make(chan struct{}),
+ }
+}
+
+func (s *service) Name() string {
+ return "stream-traffic-gen"
+}
+
+func (s *service) FlagSet() *run.FlagSet {
+ flagS := run.NewFlagSet("stream-traffic-gen")
+ return flagS
+}
+
+func (*service) Validate() error {
+ return nil
+}
+
+func (s *service) PreRun() error {
+ s.l = logger.GetLogger(s.Name())
+ return stream_test.PreloadSchema(s.metaSvc.SchemaRegistry())
+}
+
+//go:embed searchable_template.json
+var content string
+
+func (s *service) Serve() run.StopNotify {
+ searchTagFamily := &modelv1.TagFamilyForWrite{}
+ err := jsonpb.UnmarshalString(content, searchTagFamily)
+ if err != nil {
+ s.l.Err(err).Msg("unmarshal template")
+ close(s.stopCh)
+ return s.stopCh
+ }
+ stream, err := s.streamSvc.Stream(&commonv1.Metadata{
+ Name: "sw",
+ Group: "default",
+ })
+ if err != nil {
+ s.l.Err(err).Msg("get the stream")
+ close(s.stopCh)
+ return s.stopCh
+ }
+ for i := 0; i < 5; i++ {
+ svc := "svc-" + strconv.Itoa(i)
+ for j := 0; j < 10; j++ {
+ instance := "instance-" + strconv.Itoa(j)
+ go func() {
+ ticker := time.NewTicker(1 * time.Second)
+ defer ticker.Stop()
+ for {
+ select {
+ case <-ticker.C:
+ tf := proto.Clone(searchTagFamily).(*modelv1.TagFamilyForWrite)
+ tf.Tags[2] = &modelv1.TagValue{
+ Value: &modelv1.TagValue_Str{
+ Str: &modelv1.Str{
+ Value: svc,
+ },
+ },
+ }
+ tf.Tags[3] = &modelv1.TagValue{
+ Value: &modelv1.TagValue_Str{
+ Str: &modelv1.Str{
+ Value: instance,
+ },
+ },
+ }
+ data := make([]byte, 10*1024)
+ _, _ = rand.Read(data)
+ e := &streamv1.ElementValue{
+ ElementId: strconv.Itoa(i),
+ Timestamp: timestamppb.Now(),
+ TagFamilies: []*modelv1.TagFamilyForWrite{
+ {
+ Tags: []*modelv1.TagValue{
+ {
+ Value: &modelv1.TagValue_BinaryData{
+ BinaryData: data,
+ },
+ },
+ },
+ },
+ },
+ }
+ e.TagFamilies = append(e.TagFamilies, tf)
+ errInner := stream.Write(e)
+ if err != nil {
+ s.l.Err(errInner).Msg("writing to the stream")
+ }
+ case <-s.stopCh:
+ return
+ }
+ }
+ }()
+
+ }
+
+ }
+ return s.stopCh
+}
+
+func (s *service) GracefulStop() {
+}