You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by ha...@apache.org on 2023/03/30 13:05:49 UTC
[skywalking-banyandb] branch sst updated (e4b98d56 -> 5064d78b)
This is an automated email from the ASF dual-hosted git repository.
hanahmily pushed a change to branch sst
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
discard e4b98d56 Integrating Sharded Buffer into TSDB
new 5064d78b Integrating Sharded Buffer into TSDB
This update added new revisions after undoing existing revisions.
That is to say, some revisions that were in the old version of the
branch are not in the new version. This situation occurs
when a user --force pushes a change and generates a repository
containing something like this:
* -- * -- B -- O -- O -- O (e4b98d56)
\
N -- N -- N refs/heads/sst (5064d78b)
You should already have received notification emails for all of the O
revisions, and so the following emails describe only the N revisions
from the common base, B.
Any revisions marked "omit" are not gone; other references still
refer to them. Any revisions marked "discard" are gone forever.
The 1 revision listed above as "new" is entirely new to this
repository and will be described in separate emails. The revisions
listed as "add" were already present in the repository and have only
been added to this reference.
Summary of changes:
CHANGES.md | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
[skywalking-banyandb] 01/01: Integrating Sharded Buffer into TSDB
Posted by ha...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
hanahmily pushed a commit to branch sst
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
commit 5064d78b706206acf213814b255c08c83edeec05
Author: Gao Hongtao <ha...@gmail.com>
AuthorDate: Thu Mar 30 12:56:04 2023 +0000
Integrating Sharded Buffer into TSDB
The Sharded Buffer will replace Badger's memtable for data ingestion.
Currently, Badger KV only provides SST.
Signed-off-by: Gao Hongtao <ha...@gmail.com>
---
CHANGES.md | 2 +-
banyand/kv/badger.go | 5 ++
banyand/kv/kv.go | 13 +---
banyand/tsdb/block.go | 143 +++++++++++++++++++++++++++++++++---------
banyand/tsdb/buffer.go | 26 +++++++-
dist/LICENSE | 2 +-
go.mod | 2 +-
go.sum | 4 +-
test/cases/measure/measure.go | 2 +-
test/e2e-v2/script/env | 8 +--
10 files changed, 157 insertions(+), 50 deletions(-)
diff --git a/CHANGES.md b/CHANGES.md
index 17c09ea6..fc47300b 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -9,7 +9,7 @@ Release Notes.
- Add TSDB concept document.
- [UI] Add YAML editor for inputting query criteria.
- Refactor TopN to support `NULL` group while keeping seriesID from the source measure.
-- Add a sharded buffer to TSDB.
+- Add a sharded buffer to TSDB to replace Badger's memtable. Badger KV only provides SST.
### Chores
diff --git a/banyand/kv/badger.go b/banyand/kv/badger.go
index 7f616d2f..7bc7ee99 100644
--- a/banyand/kv/badger.go
+++ b/banyand/kv/badger.go
@@ -26,6 +26,7 @@ import (
"github.com/dgraph-io/badger/v3"
"github.com/dgraph-io/badger/v3/banyandb"
+ "github.com/dgraph-io/badger/v3/skl"
"github.com/dgraph-io/badger/v3/y"
"github.com/apache/skywalking-banyandb/banyand/observability"
@@ -49,6 +50,10 @@ type badgerTSS struct {
dbOpts badger.Options
}
+func (b *badgerTSS) Handover(skl *skl.Skiplist) error {
+ return b.db.HandoverIterator(skl.NewUniIterator(false))
+}
+
func (b *badgerTSS) Stats() (s observability.Statistics) {
return badgerStats(b.db)
}
diff --git a/banyand/kv/kv.go b/banyand/kv/kv.go
index 8f85432e..1a9a7cd6 100644
--- a/banyand/kv/kv.go
+++ b/banyand/kv/kv.go
@@ -25,6 +25,7 @@ import (
"github.com/dgraph-io/badger/v3"
"github.com/dgraph-io/badger/v3/options"
+ "github.com/dgraph-io/badger/v3/skl"
"github.com/pkg/errors"
"github.com/apache/skywalking-banyandb/banyand/observability"
@@ -76,15 +77,6 @@ type Store interface {
Reader
}
-// TimeSeriesWriter allows writing to a time-series storage.
-type TimeSeriesWriter interface {
- // Put a value with a timestamp/version
- Put(key, val []byte, ts uint64) error
- // PutAsync a value with a timestamp/version asynchronously.
- // Injected "f" func will notice the result of value write.
- PutAsync(key, val []byte, ts uint64, f func(error)) error
-}
-
// TimeSeriesReader allows retrieving data from a time-series storage.
type TimeSeriesReader interface {
// Get a value by its key and timestamp/version
@@ -95,7 +87,7 @@ type TimeSeriesReader interface {
type TimeSeriesStore interface {
observability.Observable
io.Closer
- TimeSeriesWriter
+ Handover(skl *skl.Skiplist) error
TimeSeriesReader
}
@@ -191,6 +183,7 @@ func OpenTimeSeriesStore(path string, options ...TimeSeriesOptions) (TimeSeriesS
if btss.dbOpts.MemTableSize < 8<<20 {
btss.dbOpts = btss.dbOpts.WithValueThreshold(1 << 10)
}
+ btss.dbOpts = btss.dbOpts.WithInTable()
var err error
btss.db, err = badger.Open(btss.dbOpts)
if err != nil {
diff --git a/banyand/tsdb/block.go b/banyand/tsdb/block.go
index 70e94db6..763f86bd 100644
--- a/banyand/tsdb/block.go
+++ b/banyand/tsdb/block.go
@@ -29,6 +29,7 @@ import (
"sync/atomic"
"time"
+ "github.com/dgraph-io/badger/v3/skl"
"github.com/pkg/errors"
"go.uber.org/multierr"
@@ -48,40 +49,46 @@ const (
componentSecondInvertedIdx = "inverted"
componentSecondLSMIdx = "lsm"
- defaultMainMemorySize = 8 << 20
- defaultEnqueueTimeout = 500 * time.Millisecond
+ defaultBufferSize = 8 << 20
+ defaultEnqueueTimeout = 500 * time.Millisecond
+ maxBlockAge = time.Hour
+ defaultWriteConcurrency = 1000
+ defaultNumBufferShards = 2
)
var errBlockClosingInterrupted = errors.New("interrupt to close the block")
type block struct {
- store kv.TimeSeriesStore
- openOpts openOpts
- queue bucket.Queue
- bucket.Reporter
- clock timestamp.Clock
- lsmIndex index.Store
invertedIndex index.Store
- closed *atomic.Bool
- l *logger.Logger
- deleted *atomic.Bool
- ref *atomic.Int32
- position common.Position
+ sst kv.TimeSeriesStore
+ queue bucket.Queue
+ bucket.Reporter
+ clock timestamp.Clock
+ lsmIndex index.Store
+ closed *atomic.Bool
+ buffer *Buffer
+ l *logger.Logger
+ deleted *atomic.Bool
+ ref *atomic.Int32
+ closeBufferTimer *time.Timer
+ position common.Position
timestamp.TimeRange
segSuffix string
suffix string
path string
closableLst []io.Closer
+ openOpts openOpts
lock sync.RWMutex
segID SectionID
blockID SectionID
}
type openOpts struct {
- store []kv.TimeSeriesOptions
- storePath string
- inverted inverted.StoreOpts
- lsm lsm.StoreOpts
+ storePath string
+ inverted inverted.StoreOpts
+ lsm lsm.StoreOpts
+ store []kv.TimeSeriesOptions
+ bufferSize int64
}
type blockOpts struct {
@@ -140,20 +147,21 @@ func (b *block) options(ctx context.Context) {
if options.CompressionMethod.Type == CompressionTypeZSTD {
b.openOpts.store = append(b.openOpts.store, kv.TSSWithZSTDCompression(options.CompressionMethod.ChunkSizeInBytes))
}
- var memSize int64
+ var bufferSize int64
if options.BlockMemSize < 1 {
- memSize = defaultMainMemorySize
+ bufferSize = defaultBufferSize
} else {
- memSize = options.BlockMemSize
+ bufferSize = options.BlockMemSize
}
- b.openOpts.store = append(b.openOpts.store, kv.TSSWithMemTableSize(memSize), kv.TSSWithLogger(b.l.Named(componentMain)))
+ b.openOpts.bufferSize = bufferSize
+ b.openOpts.store = append(b.openOpts.store, kv.TSSWithMemTableSize(bufferSize), kv.TSSWithLogger(b.l.Named(componentMain)))
b.openOpts.storePath = path.Join(b.path, componentMain)
b.openOpts.inverted = inverted.StoreOpts{
Path: path.Join(b.path, componentSecondInvertedIdx),
Logger: b.l.Named(componentSecondInvertedIdx),
BatchWaitSec: options.BlockInvertedIndex.BatchWaitSec,
}
- lsmMemSize := memSize / 8
+ lsmMemSize := bufferSize / 8
if lsmMemSize < defaultKVMemorySize {
lsmMemSize = defaultKVMemorySize
}
@@ -177,10 +185,15 @@ func (b *block) openSafely() (err error) {
}
func (b *block) open() (err error) {
- if b.store, err = kv.OpenTimeSeriesStore(b.openOpts.storePath, b.openOpts.store...); err != nil {
+ if b.isActive() {
+ if err = b.openBuffer(); err != nil {
+ return err
+ }
+ }
+ if b.sst, err = kv.OpenTimeSeriesStore(b.openOpts.storePath, b.openOpts.store...); err != nil {
return err
}
- b.closableLst = append(b.closableLst, b.store)
+ b.closableLst = append(b.closableLst, b.sst)
if b.invertedIndex, err = inverted.NewStore(b.openOpts.inverted); err != nil {
return err
}
@@ -193,6 +206,52 @@ func (b *block) open() (err error) {
return nil
}
+func (b *block) openBuffer() (err error) {
+ if b.buffer != nil {
+ return nil
+ }
+ if b.buffer, err = NewBuffer(b.l, int(b.openOpts.bufferSize/defaultNumBufferShards),
+ defaultWriteConcurrency, defaultNumBufferShards, b.flush); err != nil {
+ return err
+ }
+ now := b.clock.Now()
+ max := b.maxTime()
+ closeAfter := max.Sub(now)
+ // TODO: we should move inactive write buffer to segment or shard.
+ if now.After(max) {
+ closeAfter = maxBlockAge
+ }
+ // create a timer to close buffer
+ b.closeBufferTimer = time.AfterFunc(closeAfter, func() {
+ if b.l.Debug().Enabled() {
+ b.l.Debug().Msg("closing buffer")
+ }
+ b.lock.Lock()
+ defer b.lock.Unlock()
+ if b.buffer == nil {
+ return
+ }
+ if err := b.buffer.Close(); err != nil {
+ b.l.Error().Err(err).Msg("close buffer error")
+ }
+ b.buffer = nil
+ })
+ return nil
+}
+
+func (b *block) isActive() bool {
+ return !b.clock.Now().After(b.maxTime())
+}
+
+func (b *block) maxTime() time.Time {
+ return b.End.Add(maxBlockAge)
+}
+
+func (b *block) flush(shardIndex int, skl *skl.Skiplist) error {
+ b.l.Info().Int("shard", shardIndex).Msg("flushing buffer")
+ return b.sst.Handover(skl)
+}
+
func (b *block) delegate(ctx context.Context) (blockDelegate, error) {
if b.deleted.Load() {
return nil, errors.WithMessagef(errBlockAbsent, "block %s is deleted", b)
@@ -284,6 +343,12 @@ func (b *block) close(ctx context.Context) (err error) {
case <-ch:
}
b.closed.Store(true)
+ if b.closeBufferTimer != nil {
+ b.closeBufferTimer.Stop()
+ }
+ if b.buffer != nil {
+ err = multierr.Append(err, b.buffer.Close())
+ }
for _, closer := range b.closableLst {
err = multierr.Append(err, closer.Close())
}
@@ -308,15 +373,25 @@ func (b *block) String() string {
}
func (b *block) stats() (names []string, stats []observability.Statistics) {
- names = append(names, componentMain, componentSecondInvertedIdx, componentSecondLSMIdx)
if b.Closed() {
- stats = make([]observability.Statistics, 3)
+ stats = make([]observability.Statistics, 0)
return
}
- stats = append(stats, b.store.Stats(), b.invertedIndex.Stats(), b.lsmIndex.Stats())
+ bnn, bss := b.buffer.Stats()
+ names = append(names, bnn...)
+ stats = append(stats, bss...)
+ names = append(names, componentSecondInvertedIdx, componentSecondLSMIdx)
+ stats = append(stats, b.invertedIndex.Stats(), b.lsmIndex.Stats())
return names, stats
}
+func (b *block) Get(key []byte, ts uint64) ([]byte, error) {
+ if v, ok := b.buffer.Read(key, time.Unix(0, int64(ts))); ok {
+ return v, nil
+ }
+ return b.sst.Get(key, ts)
+}
+
type blockDelegate interface {
io.Closer
contains(ts time.Time) bool
@@ -340,7 +415,7 @@ type bDelegate struct {
}
func (d *bDelegate) dataReader() kv.TimeSeriesReader {
- return d.delegate.store
+ return d.delegate
}
func (d *bDelegate) lsmIndexReader() index.Searcher {
@@ -364,7 +439,17 @@ func (d *bDelegate) identity() (segID SectionID, blockID SectionID) {
}
func (d *bDelegate) write(key []byte, val []byte, ts time.Time) error {
- return d.delegate.store.Put(key, val, uint64(ts.UnixNano()))
+ // On-demand open buffer
+ if d.delegate.buffer == nil {
+ d.delegate.lock.Lock()
+ if err := d.delegate.openBuffer(); err != nil {
+ d.delegate.lock.Unlock()
+ return err
+ }
+ d.delegate.lock.Unlock()
+ }
+ d.delegate.buffer.Write(key, val, ts)
+ return nil
}
func (d *bDelegate) writePrimaryIndex(field index.Field, id common.ItemID) error {
diff --git a/banyand/tsdb/buffer.go b/banyand/tsdb/buffer.go
index 8372554a..e6f4ac0a 100644
--- a/banyand/tsdb/buffer.go
+++ b/banyand/tsdb/buffer.go
@@ -25,6 +25,7 @@ import (
"github.com/dgraph-io/badger/v3/skl"
"github.com/dgraph-io/badger/v3/y"
+ "github.com/apache/skywalking-banyandb/banyand/observability"
"github.com/apache/skywalking-banyandb/pkg/convert"
"github.com/apache/skywalking-banyandb/pkg/logger"
"github.com/apache/skywalking-banyandb/pkg/run"
@@ -105,7 +106,7 @@ func NewBuffer(log *logger.Logger, flushSize, writeConcurrency, numShards int, o
// Write adds a key-value pair with a timestamp to the appropriate shard bucket in the buffer.
func (b *Buffer) Write(key, value []byte, timestamp time.Time) {
- if !b.entryCloser.AddRunning() {
+ if b == nil || !b.entryCloser.AddRunning() {
return
}
defer b.entryCloser.Done()
@@ -119,6 +120,10 @@ func (b *Buffer) Write(key, value []byte, timestamp time.Time) {
// Read retrieves the value associated with the given key and timestamp from the appropriate shard bucket in the buffer.
func (b *Buffer) Read(key []byte, ts time.Time) ([]byte, bool) {
+ if b == nil || !b.entryCloser.AddRunning() {
+ return nil, false
+ }
+ defer b.entryCloser.Done()
keyWithTS := y.KeyWithTs(key, uint64(ts.UnixNano()))
index := b.getShardIndex(key)
epoch := uint64(ts.UnixNano())
@@ -157,6 +162,25 @@ func (b *Buffer) Close() error {
return nil
}
+// Stats returns the statistics for the buffer.
+func (b *Buffer) Stats() ([]string, []observability.Statistics) {
+ names := make([]string, b.numShards)
+ stats := make([]observability.Statistics, b.numShards)
+ for i := 0; i < b.numShards; i++ {
+ names[i] = fmt.Sprintf("buffer-%d", i)
+ var size, maxSize int64
+ for _, l := range b.buckets[i].getAll() {
+ size += l.MemSize()
+ maxSize += int64(b.buckets[i].size)
+ }
+ stats[i] = observability.Statistics{
+ MemBytes: size,
+ MaxMemBytes: maxSize,
+ }
+ }
+ return names, stats
+}
+
func (b *Buffer) getShardIndex(key []byte) uint64 {
return convert.Hash(key) % uint64(b.numShards)
}
diff --git a/dist/LICENSE b/dist/LICENSE
index 75485dd4..d04bca78 100644
--- a/dist/LICENSE
+++ b/dist/LICENSE
@@ -178,7 +178,7 @@
Apache-2.0 licenses
========================================================================
- github.com/SkyAPM/badger/v3 v3.0.0-20221227124922-b88a2f7d336f Apache-2.0
+ github.com/SkyAPM/badger/v3 v3.0.0-20230329010346-dfdf57f0581b Apache-2.0
github.com/blevesearch/segment v0.9.0 Apache-2.0
github.com/blevesearch/vellum v1.0.7 Apache-2.0
github.com/coreos/go-semver v0.3.0 Apache-2.0
diff --git a/go.mod b/go.mod
index aba7feed..a11d213a 100644
--- a/go.mod
+++ b/go.mod
@@ -136,5 +136,5 @@ replace (
github.com/blugelabs/bluge => github.com/zinclabs/bluge v1.1.5
github.com/blugelabs/bluge_segment_api => github.com/zinclabs/bluge_segment_api v1.0.0
github.com/blugelabs/ice => github.com/zinclabs/ice v1.1.3
- github.com/dgraph-io/badger/v3 v3.2011.1 => github.com/SkyAPM/badger/v3 v3.0.0-20221227124922-b88a2f7d336f
+ github.com/dgraph-io/badger/v3 v3.2011.1 => github.com/SkyAPM/badger/v3 v3.0.0-20230329010346-dfdf57f0581b
)
diff --git a/go.sum b/go.sum
index c5d2e026..ef0a79f4 100644
--- a/go.sum
+++ b/go.sum
@@ -46,8 +46,8 @@ github.com/OneOfOne/xxhash v1.2.2 h1:KMrpdQIwFcEqXDklaen+P1axHaj9BSKzvpUUfnHldSE
github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU=
github.com/RoaringBitmap/roaring v0.9.4 h1:ckvZSX5gwCRaJYBNe7syNawCU5oruY9gQmjXlp4riwo=
github.com/RoaringBitmap/roaring v0.9.4/go.mod h1:icnadbWcNyfEHlYdr+tDlOTih1Bf/h+rzPpv4sbomAA=
-github.com/SkyAPM/badger/v3 v3.0.0-20221227124922-b88a2f7d336f h1:fcfR1l/cO03QQWnwhlYdgXkjZBV58ZmxwgRJj7HiNQw=
-github.com/SkyAPM/badger/v3 v3.0.0-20221227124922-b88a2f7d336f/go.mod h1:4WETftF8A4mEeFgIsYB/VvGo5kfTVl/neYgEqlVW9Ag=
+github.com/SkyAPM/badger/v3 v3.0.0-20230329010346-dfdf57f0581b h1:ZThiWg9yHnIMYy+KaAgkNZhnFgF7kvtytsdjb24zAGU=
+github.com/SkyAPM/badger/v3 v3.0.0-20230329010346-dfdf57f0581b/go.mod h1:4WETftF8A4mEeFgIsYB/VvGo5kfTVl/neYgEqlVW9Ag=
github.com/SkyAPM/clock v1.3.1-0.20220809233656-dc7607c94a97 h1:FKuhJ+6n/DHspGeLleeNbziWnKr9gHKYN4q7NcoCp4s=
github.com/SkyAPM/clock v1.3.1-0.20220809233656-dc7607c94a97/go.mod h1:2xGRl9H1pllhxTbEGO1W3gDkip8P9GQaHPni/wpdR44=
github.com/ajstarks/svgo v0.0.0-20180226025133-644b8db467af/go.mod h1:K08gAheRH3/J6wwsYMMT4xOr94bZjxIelGM0+d/wbFw=
diff --git a/test/cases/measure/measure.go b/test/cases/measure/measure.go
index b906d86b..4db0b05d 100644
--- a/test/cases/measure/measure.go
+++ b/test/cases/measure/measure.go
@@ -39,7 +39,7 @@ var (
}
)
-var _ = g.FDescribeTable("Scanning Measures", verify,
+var _ = g.DescribeTable("Scanning Measures", verify,
g.Entry("all", helpers.Args{Input: "all", Duration: 25 * time.Minute, Offset: -20 * time.Minute}),
g.Entry("all_only_fields", helpers.Args{Input: "all", Duration: 25 * time.Minute, Offset: -20 * time.Minute}),
g.Entry("filter by tag", helpers.Args{Input: "tag_filter", Duration: 25 * time.Minute, Offset: -20 * time.Minute}),
diff --git a/test/e2e-v2/script/env b/test/e2e-v2/script/env
index 493eb132..52d79dc8 100644
--- a/test/e2e-v2/script/env
+++ b/test/e2e-v2/script/env
@@ -22,8 +22,8 @@ SW_AGENT_PYTHON_COMMIT=c76a6ec51a478ac91abb20ec8f22a99b8d4d6a58
SW_AGENT_CLIENT_JS_COMMIT=af0565a67d382b683c1dbd94c379b7080db61449
SW_AGENT_CLIENT_JS_TEST_COMMIT=4f1eb1dcdbde3ec4a38534bf01dded4ab5d2f016
SW_KUBERNETES_COMMIT_SHA=b670c41d94a82ddefcf466d54bab5c492d88d772
-SW_ROVER_COMMIT=d956eaede57b62108b78bca48045bd09ba88e653
-SW_CTL_COMMIT=e684fae0107045fc23799146d62f04cb68bd5a3b
+SW_ROVER_COMMIT=fc8d074c6d34ecfee585a7097cbd5aef1ca680a5
+SW_CTL_COMMIT=23debb3b77426edd70192095a5fe9b0fc9031068
-SW_OAP_COMMIT=1335a48f1c034abc1fe24f6197ee7acfa3118bf0
-SW_AGENT_E2E_SERVICE_PROVIDER_COMMIT=828e6e2f2b57a0f06bb0d507e3296d2377943d9a
+SW_OAP_COMMIT=0664bcdcb4bbf9e37ed3d497ed28e527d40bd819
+SW_AGENT_E2E_SERVICE_PROVIDER_COMMIT=cc7a2c9e97fd2c421adbe3e9c471688459a446d9