You are viewing a plain-text version of this content. The canonical link for it is here (the hyperlink itself is not preserved in this plain-text copy).
Posted to notifications@skywalking.apache.org by ha...@apache.org on 2023/03/30 13:01:57 UTC

[skywalking-banyandb] 01/01: Integrating Sharded Buffer into TSDB

This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch sst
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git

commit e4b98d5605d6cfedcb4bd31d394c7b1748e7f2a3
Author: Gao Hongtao <ha...@gmail.com>
AuthorDate: Thu Mar 30 12:56:04 2023 +0000

    Integrating Sharded Buffer into TSDB
    
    The Sharded Buffer will replace Badger's memtable for data ingestion.
    Currently, Badger KV only provides the SST (sorted string table) layer.
    
    Signed-off-by: Gao Hongtao <ha...@gmail.com>
---
 banyand/kv/badger.go          |   5 ++
 banyand/kv/kv.go              |  13 +---
 banyand/tsdb/block.go         | 143 +++++++++++++++++++++++++++++++++---------
 banyand/tsdb/buffer.go        |  26 +++++++-
 dist/LICENSE                  |   2 +-
 go.mod                        |   2 +-
 go.sum                        |   4 +-
 test/cases/measure/measure.go |   2 +-
 test/e2e-v2/script/env        |   8 +--
 9 files changed, 156 insertions(+), 49 deletions(-)

diff --git a/banyand/kv/badger.go b/banyand/kv/badger.go
index 7f616d2f..7bc7ee99 100644
--- a/banyand/kv/badger.go
+++ b/banyand/kv/badger.go
@@ -26,6 +26,7 @@ import (
 
 	"github.com/dgraph-io/badger/v3"
 	"github.com/dgraph-io/badger/v3/banyandb"
+	"github.com/dgraph-io/badger/v3/skl"
 	"github.com/dgraph-io/badger/v3/y"
 
 	"github.com/apache/skywalking-banyandb/banyand/observability"
@@ -49,6 +50,10 @@ type badgerTSS struct {
 	dbOpts badger.Options
 }
 
+func (b *badgerTSS) Handover(skl *skl.Skiplist) error {
+	return b.db.HandoverIterator(skl.NewUniIterator(false))
+}
+
 func (b *badgerTSS) Stats() (s observability.Statistics) {
 	return badgerStats(b.db)
 }
diff --git a/banyand/kv/kv.go b/banyand/kv/kv.go
index 8f85432e..1a9a7cd6 100644
--- a/banyand/kv/kv.go
+++ b/banyand/kv/kv.go
@@ -25,6 +25,7 @@ import (
 
 	"github.com/dgraph-io/badger/v3"
 	"github.com/dgraph-io/badger/v3/options"
+	"github.com/dgraph-io/badger/v3/skl"
 	"github.com/pkg/errors"
 
 	"github.com/apache/skywalking-banyandb/banyand/observability"
@@ -76,15 +77,6 @@ type Store interface {
 	Reader
 }
 
-// TimeSeriesWriter allows writing to a time-series storage.
-type TimeSeriesWriter interface {
-	// Put a value with a timestamp/version
-	Put(key, val []byte, ts uint64) error
-	// PutAsync a value with a timestamp/version asynchronously.
-	// Injected "f" func will notice the result of value write.
-	PutAsync(key, val []byte, ts uint64, f func(error)) error
-}
-
 // TimeSeriesReader allows retrieving data from a time-series storage.
 type TimeSeriesReader interface {
 	// Get a value by its key and timestamp/version
@@ -95,7 +87,7 @@ type TimeSeriesReader interface {
 type TimeSeriesStore interface {
 	observability.Observable
 	io.Closer
-	TimeSeriesWriter
+	Handover(skl *skl.Skiplist) error
 	TimeSeriesReader
 }
 
@@ -191,6 +183,7 @@ func OpenTimeSeriesStore(path string, options ...TimeSeriesOptions) (TimeSeriesS
 	if btss.dbOpts.MemTableSize < 8<<20 {
 		btss.dbOpts = btss.dbOpts.WithValueThreshold(1 << 10)
 	}
+	btss.dbOpts = btss.dbOpts.WithInTable()
 	var err error
 	btss.db, err = badger.Open(btss.dbOpts)
 	if err != nil {
diff --git a/banyand/tsdb/block.go b/banyand/tsdb/block.go
index 70e94db6..763f86bd 100644
--- a/banyand/tsdb/block.go
+++ b/banyand/tsdb/block.go
@@ -29,6 +29,7 @@ import (
 	"sync/atomic"
 	"time"
 
+	"github.com/dgraph-io/badger/v3/skl"
 	"github.com/pkg/errors"
 	"go.uber.org/multierr"
 
@@ -48,40 +49,46 @@ const (
 	componentSecondInvertedIdx = "inverted"
 	componentSecondLSMIdx      = "lsm"
 
-	defaultMainMemorySize = 8 << 20
-	defaultEnqueueTimeout = 500 * time.Millisecond
+	defaultBufferSize       = 8 << 20
+	defaultEnqueueTimeout   = 500 * time.Millisecond
+	maxBlockAge             = time.Hour
+	defaultWriteConcurrency = 1000
+	defaultNumBufferShards  = 2
 )
 
 var errBlockClosingInterrupted = errors.New("interrupt to close the block")
 
 type block struct {
-	store    kv.TimeSeriesStore
-	openOpts openOpts
-	queue    bucket.Queue
-	bucket.Reporter
-	clock         timestamp.Clock
-	lsmIndex      index.Store
 	invertedIndex index.Store
-	closed        *atomic.Bool
-	l             *logger.Logger
-	deleted       *atomic.Bool
-	ref           *atomic.Int32
-	position      common.Position
+	sst           kv.TimeSeriesStore
+	queue         bucket.Queue
+	bucket.Reporter
+	clock            timestamp.Clock
+	lsmIndex         index.Store
+	closed           *atomic.Bool
+	buffer           *Buffer
+	l                *logger.Logger
+	deleted          *atomic.Bool
+	ref              *atomic.Int32
+	closeBufferTimer *time.Timer
+	position         common.Position
 	timestamp.TimeRange
 	segSuffix   string
 	suffix      string
 	path        string
 	closableLst []io.Closer
+	openOpts    openOpts
 	lock        sync.RWMutex
 	segID       SectionID
 	blockID     SectionID
 }
 
 type openOpts struct {
-	store     []kv.TimeSeriesOptions
-	storePath string
-	inverted  inverted.StoreOpts
-	lsm       lsm.StoreOpts
+	storePath  string
+	inverted   inverted.StoreOpts
+	lsm        lsm.StoreOpts
+	store      []kv.TimeSeriesOptions
+	bufferSize int64
 }
 
 type blockOpts struct {
@@ -140,20 +147,21 @@ func (b *block) options(ctx context.Context) {
 	if options.CompressionMethod.Type == CompressionTypeZSTD {
 		b.openOpts.store = append(b.openOpts.store, kv.TSSWithZSTDCompression(options.CompressionMethod.ChunkSizeInBytes))
 	}
-	var memSize int64
+	var bufferSize int64
 	if options.BlockMemSize < 1 {
-		memSize = defaultMainMemorySize
+		bufferSize = defaultBufferSize
 	} else {
-		memSize = options.BlockMemSize
+		bufferSize = options.BlockMemSize
 	}
-	b.openOpts.store = append(b.openOpts.store, kv.TSSWithMemTableSize(memSize), kv.TSSWithLogger(b.l.Named(componentMain)))
+	b.openOpts.bufferSize = bufferSize
+	b.openOpts.store = append(b.openOpts.store, kv.TSSWithMemTableSize(bufferSize), kv.TSSWithLogger(b.l.Named(componentMain)))
 	b.openOpts.storePath = path.Join(b.path, componentMain)
 	b.openOpts.inverted = inverted.StoreOpts{
 		Path:         path.Join(b.path, componentSecondInvertedIdx),
 		Logger:       b.l.Named(componentSecondInvertedIdx),
 		BatchWaitSec: options.BlockInvertedIndex.BatchWaitSec,
 	}
-	lsmMemSize := memSize / 8
+	lsmMemSize := bufferSize / 8
 	if lsmMemSize < defaultKVMemorySize {
 		lsmMemSize = defaultKVMemorySize
 	}
@@ -177,10 +185,15 @@ func (b *block) openSafely() (err error) {
 }
 
 func (b *block) open() (err error) {
-	if b.store, err = kv.OpenTimeSeriesStore(b.openOpts.storePath, b.openOpts.store...); err != nil {
+	if b.isActive() {
+		if err = b.openBuffer(); err != nil {
+			return err
+		}
+	}
+	if b.sst, err = kv.OpenTimeSeriesStore(b.openOpts.storePath, b.openOpts.store...); err != nil {
 		return err
 	}
-	b.closableLst = append(b.closableLst, b.store)
+	b.closableLst = append(b.closableLst, b.sst)
 	if b.invertedIndex, err = inverted.NewStore(b.openOpts.inverted); err != nil {
 		return err
 	}
@@ -193,6 +206,52 @@ func (b *block) open() (err error) {
 	return nil
 }
 
+func (b *block) openBuffer() (err error) {
+	if b.buffer != nil {
+		return nil
+	}
+	if b.buffer, err = NewBuffer(b.l, int(b.openOpts.bufferSize/defaultNumBufferShards),
+		defaultWriteConcurrency, defaultNumBufferShards, b.flush); err != nil {
+		return err
+	}
+	now := b.clock.Now()
+	max := b.maxTime()
+	closeAfter := max.Sub(now)
+	// TODO: we should move inactive write buffer to segment or shard.
+	if now.After(max) {
+		closeAfter = maxBlockAge
+	}
+	// create a timer to close buffer
+	b.closeBufferTimer = time.AfterFunc(closeAfter, func() {
+		if b.l.Debug().Enabled() {
+			b.l.Debug().Msg("closing buffer")
+		}
+		b.lock.Lock()
+		defer b.lock.Unlock()
+		if b.buffer == nil {
+			return
+		}
+		if err := b.buffer.Close(); err != nil {
+			b.l.Error().Err(err).Msg("close buffer error")
+		}
+		b.buffer = nil
+	})
+	return nil
+}
+
+func (b *block) isActive() bool {
+	return !b.clock.Now().After(b.maxTime())
+}
+
+func (b *block) maxTime() time.Time {
+	return b.End.Add(maxBlockAge)
+}
+
+func (b *block) flush(shardIndex int, skl *skl.Skiplist) error {
+	b.l.Info().Int("shard", shardIndex).Msg("flushing buffer")
+	return b.sst.Handover(skl)
+}
+
 func (b *block) delegate(ctx context.Context) (blockDelegate, error) {
 	if b.deleted.Load() {
 		return nil, errors.WithMessagef(errBlockAbsent, "block %s is deleted", b)
@@ -284,6 +343,12 @@ func (b *block) close(ctx context.Context) (err error) {
 	case <-ch:
 	}
 	b.closed.Store(true)
+	if b.closeBufferTimer != nil {
+		b.closeBufferTimer.Stop()
+	}
+	if b.buffer != nil {
+		err = multierr.Append(err, b.buffer.Close())
+	}
 	for _, closer := range b.closableLst {
 		err = multierr.Append(err, closer.Close())
 	}
@@ -308,15 +373,25 @@ func (b *block) String() string {
 }
 
 func (b *block) stats() (names []string, stats []observability.Statistics) {
-	names = append(names, componentMain, componentSecondInvertedIdx, componentSecondLSMIdx)
 	if b.Closed() {
-		stats = make([]observability.Statistics, 3)
+		stats = make([]observability.Statistics, 0)
 		return
 	}
-	stats = append(stats, b.store.Stats(), b.invertedIndex.Stats(), b.lsmIndex.Stats())
+	bnn, bss := b.buffer.Stats()
+	names = append(names, bnn...)
+	stats = append(stats, bss...)
+	names = append(names, componentSecondInvertedIdx, componentSecondLSMIdx)
+	stats = append(stats, b.invertedIndex.Stats(), b.lsmIndex.Stats())
 	return names, stats
 }
 
+func (b *block) Get(key []byte, ts uint64) ([]byte, error) {
+	if v, ok := b.buffer.Read(key, time.Unix(0, int64(ts))); ok {
+		return v, nil
+	}
+	return b.sst.Get(key, ts)
+}
+
 type blockDelegate interface {
 	io.Closer
 	contains(ts time.Time) bool
@@ -340,7 +415,7 @@ type bDelegate struct {
 }
 
 func (d *bDelegate) dataReader() kv.TimeSeriesReader {
-	return d.delegate.store
+	return d.delegate
 }
 
 func (d *bDelegate) lsmIndexReader() index.Searcher {
@@ -364,7 +439,17 @@ func (d *bDelegate) identity() (segID SectionID, blockID SectionID) {
 }
 
 func (d *bDelegate) write(key []byte, val []byte, ts time.Time) error {
-	return d.delegate.store.Put(key, val, uint64(ts.UnixNano()))
+	// On-demand open buffer
+	if d.delegate.buffer == nil {
+		d.delegate.lock.Lock()
+		if err := d.delegate.openBuffer(); err != nil {
+			d.delegate.lock.Unlock()
+			return err
+		}
+		d.delegate.lock.Unlock()
+	}
+	d.delegate.buffer.Write(key, val, ts)
+	return nil
 }
 
 func (d *bDelegate) writePrimaryIndex(field index.Field, id common.ItemID) error {
diff --git a/banyand/tsdb/buffer.go b/banyand/tsdb/buffer.go
index 8372554a..e6f4ac0a 100644
--- a/banyand/tsdb/buffer.go
+++ b/banyand/tsdb/buffer.go
@@ -25,6 +25,7 @@ import (
 	"github.com/dgraph-io/badger/v3/skl"
 	"github.com/dgraph-io/badger/v3/y"
 
+	"github.com/apache/skywalking-banyandb/banyand/observability"
 	"github.com/apache/skywalking-banyandb/pkg/convert"
 	"github.com/apache/skywalking-banyandb/pkg/logger"
 	"github.com/apache/skywalking-banyandb/pkg/run"
@@ -105,7 +106,7 @@ func NewBuffer(log *logger.Logger, flushSize, writeConcurrency, numShards int, o
 
 // Write adds a key-value pair with a timestamp to the appropriate shard bucket in the buffer.
 func (b *Buffer) Write(key, value []byte, timestamp time.Time) {
-	if !b.entryCloser.AddRunning() {
+	if b == nil || !b.entryCloser.AddRunning() {
 		return
 	}
 	defer b.entryCloser.Done()
@@ -119,6 +120,10 @@ func (b *Buffer) Write(key, value []byte, timestamp time.Time) {
 
 // Read retrieves the value associated with the given key and timestamp from the appropriate shard bucket in the buffer.
 func (b *Buffer) Read(key []byte, ts time.Time) ([]byte, bool) {
+	if b == nil || !b.entryCloser.AddRunning() {
+		return nil, false
+	}
+	defer b.entryCloser.Done()
 	keyWithTS := y.KeyWithTs(key, uint64(ts.UnixNano()))
 	index := b.getShardIndex(key)
 	epoch := uint64(ts.UnixNano())
@@ -157,6 +162,25 @@ func (b *Buffer) Close() error {
 	return nil
 }
 
+// Stats returns the statistics for the buffer.
+func (b *Buffer) Stats() ([]string, []observability.Statistics) {
+	names := make([]string, b.numShards)
+	stats := make([]observability.Statistics, b.numShards)
+	for i := 0; i < b.numShards; i++ {
+		names[i] = fmt.Sprintf("buffer-%d", i)
+		var size, maxSize int64
+		for _, l := range b.buckets[i].getAll() {
+			size += l.MemSize()
+			maxSize += int64(b.buckets[i].size)
+		}
+		stats[i] = observability.Statistics{
+			MemBytes:    size,
+			MaxMemBytes: maxSize,
+		}
+	}
+	return names, stats
+}
+
 func (b *Buffer) getShardIndex(key []byte) uint64 {
 	return convert.Hash(key) % uint64(b.numShards)
 }
diff --git a/dist/LICENSE b/dist/LICENSE
index 75485dd4..d04bca78 100644
--- a/dist/LICENSE
+++ b/dist/LICENSE
@@ -178,7 +178,7 @@
 Apache-2.0 licenses
 ========================================================================
 
-    github.com/SkyAPM/badger/v3 v3.0.0-20221227124922-b88a2f7d336f Apache-2.0
+    github.com/SkyAPM/badger/v3 v3.0.0-20230329010346-dfdf57f0581b Apache-2.0
     github.com/blevesearch/segment v0.9.0 Apache-2.0
     github.com/blevesearch/vellum v1.0.7 Apache-2.0
     github.com/coreos/go-semver v0.3.0 Apache-2.0
diff --git a/go.mod b/go.mod
index aba7feed..a11d213a 100644
--- a/go.mod
+++ b/go.mod
@@ -136,5 +136,5 @@ replace (
 	github.com/blugelabs/bluge => github.com/zinclabs/bluge v1.1.5
 	github.com/blugelabs/bluge_segment_api => github.com/zinclabs/bluge_segment_api v1.0.0
 	github.com/blugelabs/ice => github.com/zinclabs/ice v1.1.3
-	github.com/dgraph-io/badger/v3 v3.2011.1 => github.com/SkyAPM/badger/v3 v3.0.0-20221227124922-b88a2f7d336f
+	github.com/dgraph-io/badger/v3 v3.2011.1 => github.com/SkyAPM/badger/v3 v3.0.0-20230329010346-dfdf57f0581b
 )
diff --git a/go.sum b/go.sum
index c5d2e026..ef0a79f4 100644
--- a/go.sum
+++ b/go.sum
@@ -46,8 +46,8 @@ github.com/OneOfOne/xxhash v1.2.2 h1:KMrpdQIwFcEqXDklaen+P1axHaj9BSKzvpUUfnHldSE
 github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU=
 github.com/RoaringBitmap/roaring v0.9.4 h1:ckvZSX5gwCRaJYBNe7syNawCU5oruY9gQmjXlp4riwo=
 github.com/RoaringBitmap/roaring v0.9.4/go.mod h1:icnadbWcNyfEHlYdr+tDlOTih1Bf/h+rzPpv4sbomAA=
-github.com/SkyAPM/badger/v3 v3.0.0-20221227124922-b88a2f7d336f h1:fcfR1l/cO03QQWnwhlYdgXkjZBV58ZmxwgRJj7HiNQw=
-github.com/SkyAPM/badger/v3 v3.0.0-20221227124922-b88a2f7d336f/go.mod h1:4WETftF8A4mEeFgIsYB/VvGo5kfTVl/neYgEqlVW9Ag=
+github.com/SkyAPM/badger/v3 v3.0.0-20230329010346-dfdf57f0581b h1:ZThiWg9yHnIMYy+KaAgkNZhnFgF7kvtytsdjb24zAGU=
+github.com/SkyAPM/badger/v3 v3.0.0-20230329010346-dfdf57f0581b/go.mod h1:4WETftF8A4mEeFgIsYB/VvGo5kfTVl/neYgEqlVW9Ag=
 github.com/SkyAPM/clock v1.3.1-0.20220809233656-dc7607c94a97 h1:FKuhJ+6n/DHspGeLleeNbziWnKr9gHKYN4q7NcoCp4s=
 github.com/SkyAPM/clock v1.3.1-0.20220809233656-dc7607c94a97/go.mod h1:2xGRl9H1pllhxTbEGO1W3gDkip8P9GQaHPni/wpdR44=
 github.com/ajstarks/svgo v0.0.0-20180226025133-644b8db467af/go.mod h1:K08gAheRH3/J6wwsYMMT4xOr94bZjxIelGM0+d/wbFw=
diff --git a/test/cases/measure/measure.go b/test/cases/measure/measure.go
index b906d86b..4db0b05d 100644
--- a/test/cases/measure/measure.go
+++ b/test/cases/measure/measure.go
@@ -39,7 +39,7 @@ var (
 	}
 )
 
-var _ = g.FDescribeTable("Scanning Measures", verify,
+var _ = g.DescribeTable("Scanning Measures", verify,
 	g.Entry("all", helpers.Args{Input: "all", Duration: 25 * time.Minute, Offset: -20 * time.Minute}),
 	g.Entry("all_only_fields", helpers.Args{Input: "all", Duration: 25 * time.Minute, Offset: -20 * time.Minute}),
 	g.Entry("filter by tag", helpers.Args{Input: "tag_filter", Duration: 25 * time.Minute, Offset: -20 * time.Minute}),
diff --git a/test/e2e-v2/script/env b/test/e2e-v2/script/env
index 493eb132..52d79dc8 100644
--- a/test/e2e-v2/script/env
+++ b/test/e2e-v2/script/env
@@ -22,8 +22,8 @@ SW_AGENT_PYTHON_COMMIT=c76a6ec51a478ac91abb20ec8f22a99b8d4d6a58
 SW_AGENT_CLIENT_JS_COMMIT=af0565a67d382b683c1dbd94c379b7080db61449
 SW_AGENT_CLIENT_JS_TEST_COMMIT=4f1eb1dcdbde3ec4a38534bf01dded4ab5d2f016
 SW_KUBERNETES_COMMIT_SHA=b670c41d94a82ddefcf466d54bab5c492d88d772
-SW_ROVER_COMMIT=d956eaede57b62108b78bca48045bd09ba88e653
-SW_CTL_COMMIT=e684fae0107045fc23799146d62f04cb68bd5a3b
+SW_ROVER_COMMIT=fc8d074c6d34ecfee585a7097cbd5aef1ca680a5
+SW_CTL_COMMIT=23debb3b77426edd70192095a5fe9b0fc9031068
 
-SW_OAP_COMMIT=1335a48f1c034abc1fe24f6197ee7acfa3118bf0
-SW_AGENT_E2E_SERVICE_PROVIDER_COMMIT=828e6e2f2b57a0f06bb0d507e3296d2377943d9a
+SW_OAP_COMMIT=0664bcdcb4bbf9e37ed3d497ed28e527d40bd819
+SW_AGENT_E2E_SERVICE_PROVIDER_COMMIT=cc7a2c9e97fd2c421adbe3e9c471688459a446d9