You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by ha...@apache.org on 2023/03/30 13:01:57 UTC
[skywalking-banyandb] 01/01: Integrating Sharded Buffer into TSDB
This is an automated email from the ASF dual-hosted git repository.
hanahmily pushed a commit to branch sst
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
commit e4b98d5605d6cfedcb4bd31d394c7b1748e7f2a3
Author: Gao Hongtao <ha...@gmail.com>
AuthorDate: Thu Mar 30 12:56:04 2023 +0000
Integrating Sharded Buffer into TSDB
The Sharded Buffer will replace Badger's memtable for data ingestion.
With this change, Badger KV only provides the SST (sorted string table) storage layer.
Signed-off-by: Gao Hongtao <ha...@gmail.com>
---
banyand/kv/badger.go | 5 ++
banyand/kv/kv.go | 13 +---
banyand/tsdb/block.go | 143 +++++++++++++++++++++++++++++++++---------
banyand/tsdb/buffer.go | 26 +++++++-
dist/LICENSE | 2 +-
go.mod | 2 +-
go.sum | 4 +-
test/cases/measure/measure.go | 2 +-
test/e2e-v2/script/env | 8 +--
9 files changed, 156 insertions(+), 49 deletions(-)
diff --git a/banyand/kv/badger.go b/banyand/kv/badger.go
index 7f616d2f..7bc7ee99 100644
--- a/banyand/kv/badger.go
+++ b/banyand/kv/badger.go
@@ -26,6 +26,7 @@ import (
"github.com/dgraph-io/badger/v3"
"github.com/dgraph-io/badger/v3/banyandb"
+ "github.com/dgraph-io/badger/v3/skl"
"github.com/dgraph-io/badger/v3/y"
"github.com/apache/skywalking-banyandb/banyand/observability"
@@ -49,6 +50,10 @@ type badgerTSS struct {
dbOpts badger.Options
}
+func (b *badgerTSS) Handover(skl *skl.Skiplist) error {
+ return b.db.HandoverIterator(skl.NewUniIterator(false))
+}
+
func (b *badgerTSS) Stats() (s observability.Statistics) {
return badgerStats(b.db)
}
diff --git a/banyand/kv/kv.go b/banyand/kv/kv.go
index 8f85432e..1a9a7cd6 100644
--- a/banyand/kv/kv.go
+++ b/banyand/kv/kv.go
@@ -25,6 +25,7 @@ import (
"github.com/dgraph-io/badger/v3"
"github.com/dgraph-io/badger/v3/options"
+ "github.com/dgraph-io/badger/v3/skl"
"github.com/pkg/errors"
"github.com/apache/skywalking-banyandb/banyand/observability"
@@ -76,15 +77,6 @@ type Store interface {
Reader
}
-// TimeSeriesWriter allows writing to a time-series storage.
-type TimeSeriesWriter interface {
- // Put a value with a timestamp/version
- Put(key, val []byte, ts uint64) error
- // PutAsync a value with a timestamp/version asynchronously.
- // Injected "f" func will notice the result of value write.
- PutAsync(key, val []byte, ts uint64, f func(error)) error
-}
-
// TimeSeriesReader allows retrieving data from a time-series storage.
type TimeSeriesReader interface {
// Get a value by its key and timestamp/version
@@ -95,7 +87,7 @@ type TimeSeriesReader interface {
type TimeSeriesStore interface {
observability.Observable
io.Closer
- TimeSeriesWriter
+ Handover(skl *skl.Skiplist) error
TimeSeriesReader
}
@@ -191,6 +183,7 @@ func OpenTimeSeriesStore(path string, options ...TimeSeriesOptions) (TimeSeriesS
if btss.dbOpts.MemTableSize < 8<<20 {
btss.dbOpts = btss.dbOpts.WithValueThreshold(1 << 10)
}
+ btss.dbOpts = btss.dbOpts.WithInTable()
var err error
btss.db, err = badger.Open(btss.dbOpts)
if err != nil {
diff --git a/banyand/tsdb/block.go b/banyand/tsdb/block.go
index 70e94db6..763f86bd 100644
--- a/banyand/tsdb/block.go
+++ b/banyand/tsdb/block.go
@@ -29,6 +29,7 @@ import (
"sync/atomic"
"time"
+ "github.com/dgraph-io/badger/v3/skl"
"github.com/pkg/errors"
"go.uber.org/multierr"
@@ -48,40 +49,46 @@ const (
componentSecondInvertedIdx = "inverted"
componentSecondLSMIdx = "lsm"
- defaultMainMemorySize = 8 << 20
- defaultEnqueueTimeout = 500 * time.Millisecond
+ defaultBufferSize = 8 << 20
+ defaultEnqueueTimeout = 500 * time.Millisecond
+ maxBlockAge = time.Hour
+ defaultWriteConcurrency = 1000
+ defaultNumBufferShards = 2
)
var errBlockClosingInterrupted = errors.New("interrupt to close the block")
type block struct {
- store kv.TimeSeriesStore
- openOpts openOpts
- queue bucket.Queue
- bucket.Reporter
- clock timestamp.Clock
- lsmIndex index.Store
invertedIndex index.Store
- closed *atomic.Bool
- l *logger.Logger
- deleted *atomic.Bool
- ref *atomic.Int32
- position common.Position
+ sst kv.TimeSeriesStore
+ queue bucket.Queue
+ bucket.Reporter
+ clock timestamp.Clock
+ lsmIndex index.Store
+ closed *atomic.Bool
+ buffer *Buffer
+ l *logger.Logger
+ deleted *atomic.Bool
+ ref *atomic.Int32
+ closeBufferTimer *time.Timer
+ position common.Position
timestamp.TimeRange
segSuffix string
suffix string
path string
closableLst []io.Closer
+ openOpts openOpts
lock sync.RWMutex
segID SectionID
blockID SectionID
}
type openOpts struct {
- store []kv.TimeSeriesOptions
- storePath string
- inverted inverted.StoreOpts
- lsm lsm.StoreOpts
+ storePath string
+ inverted inverted.StoreOpts
+ lsm lsm.StoreOpts
+ store []kv.TimeSeriesOptions
+ bufferSize int64
}
type blockOpts struct {
@@ -140,20 +147,21 @@ func (b *block) options(ctx context.Context) {
if options.CompressionMethod.Type == CompressionTypeZSTD {
b.openOpts.store = append(b.openOpts.store, kv.TSSWithZSTDCompression(options.CompressionMethod.ChunkSizeInBytes))
}
- var memSize int64
+ var bufferSize int64
if options.BlockMemSize < 1 {
- memSize = defaultMainMemorySize
+ bufferSize = defaultBufferSize
} else {
- memSize = options.BlockMemSize
+ bufferSize = options.BlockMemSize
}
- b.openOpts.store = append(b.openOpts.store, kv.TSSWithMemTableSize(memSize), kv.TSSWithLogger(b.l.Named(componentMain)))
+ b.openOpts.bufferSize = bufferSize
+ b.openOpts.store = append(b.openOpts.store, kv.TSSWithMemTableSize(bufferSize), kv.TSSWithLogger(b.l.Named(componentMain)))
b.openOpts.storePath = path.Join(b.path, componentMain)
b.openOpts.inverted = inverted.StoreOpts{
Path: path.Join(b.path, componentSecondInvertedIdx),
Logger: b.l.Named(componentSecondInvertedIdx),
BatchWaitSec: options.BlockInvertedIndex.BatchWaitSec,
}
- lsmMemSize := memSize / 8
+ lsmMemSize := bufferSize / 8
if lsmMemSize < defaultKVMemorySize {
lsmMemSize = defaultKVMemorySize
}
@@ -177,10 +185,15 @@ func (b *block) openSafely() (err error) {
}
func (b *block) open() (err error) {
- if b.store, err = kv.OpenTimeSeriesStore(b.openOpts.storePath, b.openOpts.store...); err != nil {
+ if b.isActive() {
+ if err = b.openBuffer(); err != nil {
+ return err
+ }
+ }
+ if b.sst, err = kv.OpenTimeSeriesStore(b.openOpts.storePath, b.openOpts.store...); err != nil {
return err
}
- b.closableLst = append(b.closableLst, b.store)
+ b.closableLst = append(b.closableLst, b.sst)
if b.invertedIndex, err = inverted.NewStore(b.openOpts.inverted); err != nil {
return err
}
@@ -193,6 +206,52 @@ func (b *block) open() (err error) {
return nil
}
+func (b *block) openBuffer() (err error) {
+ if b.buffer != nil {
+ return nil
+ }
+ if b.buffer, err = NewBuffer(b.l, int(b.openOpts.bufferSize/defaultNumBufferShards),
+ defaultWriteConcurrency, defaultNumBufferShards, b.flush); err != nil {
+ return err
+ }
+ now := b.clock.Now()
+ max := b.maxTime()
+ closeAfter := max.Sub(now)
+ // TODO: we should move inactive write buffer to segment or shard.
+ if now.After(max) {
+ closeAfter = maxBlockAge
+ }
+ // create a timer to close buffer
+ b.closeBufferTimer = time.AfterFunc(closeAfter, func() {
+ if b.l.Debug().Enabled() {
+ b.l.Debug().Msg("closing buffer")
+ }
+ b.lock.Lock()
+ defer b.lock.Unlock()
+ if b.buffer == nil {
+ return
+ }
+ if err := b.buffer.Close(); err != nil {
+ b.l.Error().Err(err).Msg("close buffer error")
+ }
+ b.buffer = nil
+ })
+ return nil
+}
+
+func (b *block) isActive() bool {
+ return !b.clock.Now().After(b.maxTime())
+}
+
+func (b *block) maxTime() time.Time {
+ return b.End.Add(maxBlockAge)
+}
+
+func (b *block) flush(shardIndex int, skl *skl.Skiplist) error {
+ b.l.Info().Int("shard", shardIndex).Msg("flushing buffer")
+ return b.sst.Handover(skl)
+}
+
func (b *block) delegate(ctx context.Context) (blockDelegate, error) {
if b.deleted.Load() {
return nil, errors.WithMessagef(errBlockAbsent, "block %s is deleted", b)
@@ -284,6 +343,12 @@ func (b *block) close(ctx context.Context) (err error) {
case <-ch:
}
b.closed.Store(true)
+ if b.closeBufferTimer != nil {
+ b.closeBufferTimer.Stop()
+ }
+ if b.buffer != nil {
+ err = multierr.Append(err, b.buffer.Close())
+ }
for _, closer := range b.closableLst {
err = multierr.Append(err, closer.Close())
}
@@ -308,15 +373,25 @@ func (b *block) String() string {
}
func (b *block) stats() (names []string, stats []observability.Statistics) {
- names = append(names, componentMain, componentSecondInvertedIdx, componentSecondLSMIdx)
if b.Closed() {
- stats = make([]observability.Statistics, 3)
+ stats = make([]observability.Statistics, 0)
return
}
- stats = append(stats, b.store.Stats(), b.invertedIndex.Stats(), b.lsmIndex.Stats())
+ bnn, bss := b.buffer.Stats()
+ names = append(names, bnn...)
+ stats = append(stats, bss...)
+ names = append(names, componentSecondInvertedIdx, componentSecondLSMIdx)
+ stats = append(stats, b.invertedIndex.Stats(), b.lsmIndex.Stats())
return names, stats
}
+func (b *block) Get(key []byte, ts uint64) ([]byte, error) {
+ if v, ok := b.buffer.Read(key, time.Unix(0, int64(ts))); ok {
+ return v, nil
+ }
+ return b.sst.Get(key, ts)
+}
+
type blockDelegate interface {
io.Closer
contains(ts time.Time) bool
@@ -340,7 +415,7 @@ type bDelegate struct {
}
func (d *bDelegate) dataReader() kv.TimeSeriesReader {
- return d.delegate.store
+ return d.delegate
}
func (d *bDelegate) lsmIndexReader() index.Searcher {
@@ -364,7 +439,17 @@ func (d *bDelegate) identity() (segID SectionID, blockID SectionID) {
}
func (d *bDelegate) write(key []byte, val []byte, ts time.Time) error {
- return d.delegate.store.Put(key, val, uint64(ts.UnixNano()))
+ // On-demand open buffer
+ if d.delegate.buffer == nil {
+ d.delegate.lock.Lock()
+ if err := d.delegate.openBuffer(); err != nil {
+ d.delegate.lock.Unlock()
+ return err
+ }
+ d.delegate.lock.Unlock()
+ }
+ d.delegate.buffer.Write(key, val, ts)
+ return nil
}
func (d *bDelegate) writePrimaryIndex(field index.Field, id common.ItemID) error {
diff --git a/banyand/tsdb/buffer.go b/banyand/tsdb/buffer.go
index 8372554a..e6f4ac0a 100644
--- a/banyand/tsdb/buffer.go
+++ b/banyand/tsdb/buffer.go
@@ -25,6 +25,7 @@ import (
"github.com/dgraph-io/badger/v3/skl"
"github.com/dgraph-io/badger/v3/y"
+ "github.com/apache/skywalking-banyandb/banyand/observability"
"github.com/apache/skywalking-banyandb/pkg/convert"
"github.com/apache/skywalking-banyandb/pkg/logger"
"github.com/apache/skywalking-banyandb/pkg/run"
@@ -105,7 +106,7 @@ func NewBuffer(log *logger.Logger, flushSize, writeConcurrency, numShards int, o
// Write adds a key-value pair with a timestamp to the appropriate shard bucket in the buffer.
func (b *Buffer) Write(key, value []byte, timestamp time.Time) {
- if !b.entryCloser.AddRunning() {
+ if b == nil || !b.entryCloser.AddRunning() {
return
}
defer b.entryCloser.Done()
@@ -119,6 +120,10 @@ func (b *Buffer) Write(key, value []byte, timestamp time.Time) {
// Read retrieves the value associated with the given key and timestamp from the appropriate shard bucket in the buffer.
func (b *Buffer) Read(key []byte, ts time.Time) ([]byte, bool) {
+ if b == nil || !b.entryCloser.AddRunning() {
+ return nil, false
+ }
+ defer b.entryCloser.Done()
keyWithTS := y.KeyWithTs(key, uint64(ts.UnixNano()))
index := b.getShardIndex(key)
epoch := uint64(ts.UnixNano())
@@ -157,6 +162,25 @@ func (b *Buffer) Close() error {
return nil
}
+// Stats returns the statistics for the buffer.
+func (b *Buffer) Stats() ([]string, []observability.Statistics) {
+ names := make([]string, b.numShards)
+ stats := make([]observability.Statistics, b.numShards)
+ for i := 0; i < b.numShards; i++ {
+ names[i] = fmt.Sprintf("buffer-%d", i)
+ var size, maxSize int64
+ for _, l := range b.buckets[i].getAll() {
+ size += l.MemSize()
+ maxSize += int64(b.buckets[i].size)
+ }
+ stats[i] = observability.Statistics{
+ MemBytes: size,
+ MaxMemBytes: maxSize,
+ }
+ }
+ return names, stats
+}
+
func (b *Buffer) getShardIndex(key []byte) uint64 {
return convert.Hash(key) % uint64(b.numShards)
}
diff --git a/dist/LICENSE b/dist/LICENSE
index 75485dd4..d04bca78 100644
--- a/dist/LICENSE
+++ b/dist/LICENSE
@@ -178,7 +178,7 @@
Apache-2.0 licenses
========================================================================
- github.com/SkyAPM/badger/v3 v3.0.0-20221227124922-b88a2f7d336f Apache-2.0
+ github.com/SkyAPM/badger/v3 v3.0.0-20230329010346-dfdf57f0581b Apache-2.0
github.com/blevesearch/segment v0.9.0 Apache-2.0
github.com/blevesearch/vellum v1.0.7 Apache-2.0
github.com/coreos/go-semver v0.3.0 Apache-2.0
diff --git a/go.mod b/go.mod
index aba7feed..a11d213a 100644
--- a/go.mod
+++ b/go.mod
@@ -136,5 +136,5 @@ replace (
github.com/blugelabs/bluge => github.com/zinclabs/bluge v1.1.5
github.com/blugelabs/bluge_segment_api => github.com/zinclabs/bluge_segment_api v1.0.0
github.com/blugelabs/ice => github.com/zinclabs/ice v1.1.3
- github.com/dgraph-io/badger/v3 v3.2011.1 => github.com/SkyAPM/badger/v3 v3.0.0-20221227124922-b88a2f7d336f
+ github.com/dgraph-io/badger/v3 v3.2011.1 => github.com/SkyAPM/badger/v3 v3.0.0-20230329010346-dfdf57f0581b
)
diff --git a/go.sum b/go.sum
index c5d2e026..ef0a79f4 100644
--- a/go.sum
+++ b/go.sum
@@ -46,8 +46,8 @@ github.com/OneOfOne/xxhash v1.2.2 h1:KMrpdQIwFcEqXDklaen+P1axHaj9BSKzvpUUfnHldSE
github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU=
github.com/RoaringBitmap/roaring v0.9.4 h1:ckvZSX5gwCRaJYBNe7syNawCU5oruY9gQmjXlp4riwo=
github.com/RoaringBitmap/roaring v0.9.4/go.mod h1:icnadbWcNyfEHlYdr+tDlOTih1Bf/h+rzPpv4sbomAA=
-github.com/SkyAPM/badger/v3 v3.0.0-20221227124922-b88a2f7d336f h1:fcfR1l/cO03QQWnwhlYdgXkjZBV58ZmxwgRJj7HiNQw=
-github.com/SkyAPM/badger/v3 v3.0.0-20221227124922-b88a2f7d336f/go.mod h1:4WETftF8A4mEeFgIsYB/VvGo5kfTVl/neYgEqlVW9Ag=
+github.com/SkyAPM/badger/v3 v3.0.0-20230329010346-dfdf57f0581b h1:ZThiWg9yHnIMYy+KaAgkNZhnFgF7kvtytsdjb24zAGU=
+github.com/SkyAPM/badger/v3 v3.0.0-20230329010346-dfdf57f0581b/go.mod h1:4WETftF8A4mEeFgIsYB/VvGo5kfTVl/neYgEqlVW9Ag=
github.com/SkyAPM/clock v1.3.1-0.20220809233656-dc7607c94a97 h1:FKuhJ+6n/DHspGeLleeNbziWnKr9gHKYN4q7NcoCp4s=
github.com/SkyAPM/clock v1.3.1-0.20220809233656-dc7607c94a97/go.mod h1:2xGRl9H1pllhxTbEGO1W3gDkip8P9GQaHPni/wpdR44=
github.com/ajstarks/svgo v0.0.0-20180226025133-644b8db467af/go.mod h1:K08gAheRH3/J6wwsYMMT4xOr94bZjxIelGM0+d/wbFw=
diff --git a/test/cases/measure/measure.go b/test/cases/measure/measure.go
index b906d86b..4db0b05d 100644
--- a/test/cases/measure/measure.go
+++ b/test/cases/measure/measure.go
@@ -39,7 +39,7 @@ var (
}
)
-var _ = g.FDescribeTable("Scanning Measures", verify,
+var _ = g.DescribeTable("Scanning Measures", verify,
g.Entry("all", helpers.Args{Input: "all", Duration: 25 * time.Minute, Offset: -20 * time.Minute}),
g.Entry("all_only_fields", helpers.Args{Input: "all", Duration: 25 * time.Minute, Offset: -20 * time.Minute}),
g.Entry("filter by tag", helpers.Args{Input: "tag_filter", Duration: 25 * time.Minute, Offset: -20 * time.Minute}),
diff --git a/test/e2e-v2/script/env b/test/e2e-v2/script/env
index 493eb132..52d79dc8 100644
--- a/test/e2e-v2/script/env
+++ b/test/e2e-v2/script/env
@@ -22,8 +22,8 @@ SW_AGENT_PYTHON_COMMIT=c76a6ec51a478ac91abb20ec8f22a99b8d4d6a58
SW_AGENT_CLIENT_JS_COMMIT=af0565a67d382b683c1dbd94c379b7080db61449
SW_AGENT_CLIENT_JS_TEST_COMMIT=4f1eb1dcdbde3ec4a38534bf01dded4ab5d2f016
SW_KUBERNETES_COMMIT_SHA=b670c41d94a82ddefcf466d54bab5c492d88d772
-SW_ROVER_COMMIT=d956eaede57b62108b78bca48045bd09ba88e653
-SW_CTL_COMMIT=e684fae0107045fc23799146d62f04cb68bd5a3b
+SW_ROVER_COMMIT=fc8d074c6d34ecfee585a7097cbd5aef1ca680a5
+SW_CTL_COMMIT=23debb3b77426edd70192095a5fe9b0fc9031068
-SW_OAP_COMMIT=1335a48f1c034abc1fe24f6197ee7acfa3118bf0
-SW_AGENT_E2E_SERVICE_PROVIDER_COMMIT=828e6e2f2b57a0f06bb0d507e3296d2377943d9a
+SW_OAP_COMMIT=0664bcdcb4bbf9e37ed3d497ed28e527d40bd819
+SW_AGENT_E2E_SERVICE_PROVIDER_COMMIT=cc7a2c9e97fd2c421adbe3e9c471688459a446d9