You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by ha...@apache.org on 2023/03/30 13:05:49 UTC

[skywalking-banyandb] branch sst updated (e4b98d56 -> 5064d78b)

This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a change to branch sst
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git


 discard e4b98d56 Integrating Sharded Buffer into TSDB
     new 5064d78b Integrating Sharded Buffer into TSDB

This update added new revisions after undoing existing revisions.
That is to say, some revisions that were in the old version of the
branch are not in the new version.  This situation occurs
when a user --force pushes a change and generates a repository
containing something like this:

 * -- * -- B -- O -- O -- O   (e4b98d56)
            \
             N -- N -- N   refs/heads/sst (5064d78b)

You should already have received notification emails for all of the O
revisions, and so the following emails describe only the N revisions
from the common base, B.

Any revisions marked "omit" are not gone; other references still
refer to them.  Any revisions marked "discard" are gone forever.

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 CHANGES.md | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)


[skywalking-banyandb] 01/01: Integrating Sharded Buffer into TSDB

Posted by ha...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch sst
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git

commit 5064d78b706206acf213814b255c08c83edeec05
Author: Gao Hongtao <ha...@gmail.com>
AuthorDate: Thu Mar 30 12:56:04 2023 +0000

    Integrating Sharded Buffer into TSDB
    
    The Sharded Buffer will replace Badger's memtable for data ingestion.
    Currently, Badger KV only provides SST.
    
    Signed-off-by: Gao Hongtao <ha...@gmail.com>
---
 CHANGES.md                    |   2 +-
 banyand/kv/badger.go          |   5 ++
 banyand/kv/kv.go              |  13 +---
 banyand/tsdb/block.go         | 143 +++++++++++++++++++++++++++++++++---------
 banyand/tsdb/buffer.go        |  26 +++++++-
 dist/LICENSE                  |   2 +-
 go.mod                        |   2 +-
 go.sum                        |   4 +-
 test/cases/measure/measure.go |   2 +-
 test/e2e-v2/script/env        |   8 +--
 10 files changed, 157 insertions(+), 50 deletions(-)

diff --git a/CHANGES.md b/CHANGES.md
index 17c09ea6..fc47300b 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -9,7 +9,7 @@ Release Notes.
 - Add TSDB concept document.
 - [UI] Add YAML editor for inputting query criteria.
 - Refactor TopN to support `NULL` group while keeping seriesID from the source measure.
-- Add a sharded buffer to TSDB.
+- Add a sharded buffer to TSDB to replace Badger's memtable. Badger KV only provides SST.
 
 ### Chores
 
diff --git a/banyand/kv/badger.go b/banyand/kv/badger.go
index 7f616d2f..7bc7ee99 100644
--- a/banyand/kv/badger.go
+++ b/banyand/kv/badger.go
@@ -26,6 +26,7 @@ import (
 
 	"github.com/dgraph-io/badger/v3"
 	"github.com/dgraph-io/badger/v3/banyandb"
+	"github.com/dgraph-io/badger/v3/skl"
 	"github.com/dgraph-io/badger/v3/y"
 
 	"github.com/apache/skywalking-banyandb/banyand/observability"
@@ -49,6 +50,10 @@ type badgerTSS struct {
 	dbOpts badger.Options
 }
 
+func (b *badgerTSS) Handover(skl *skl.Skiplist) error {
+	return b.db.HandoverIterator(skl.NewUniIterator(false))
+}
+
 func (b *badgerTSS) Stats() (s observability.Statistics) {
 	return badgerStats(b.db)
 }
diff --git a/banyand/kv/kv.go b/banyand/kv/kv.go
index 8f85432e..1a9a7cd6 100644
--- a/banyand/kv/kv.go
+++ b/banyand/kv/kv.go
@@ -25,6 +25,7 @@ import (
 
 	"github.com/dgraph-io/badger/v3"
 	"github.com/dgraph-io/badger/v3/options"
+	"github.com/dgraph-io/badger/v3/skl"
 	"github.com/pkg/errors"
 
 	"github.com/apache/skywalking-banyandb/banyand/observability"
@@ -76,15 +77,6 @@ type Store interface {
 	Reader
 }
 
-// TimeSeriesWriter allows writing to a time-series storage.
-type TimeSeriesWriter interface {
-	// Put a value with a timestamp/version
-	Put(key, val []byte, ts uint64) error
-	// PutAsync a value with a timestamp/version asynchronously.
-	// Injected "f" func will notice the result of value write.
-	PutAsync(key, val []byte, ts uint64, f func(error)) error
-}
-
 // TimeSeriesReader allows retrieving data from a time-series storage.
 type TimeSeriesReader interface {
 	// Get a value by its key and timestamp/version
@@ -95,7 +87,7 @@ type TimeSeriesReader interface {
 type TimeSeriesStore interface {
 	observability.Observable
 	io.Closer
-	TimeSeriesWriter
+	Handover(skl *skl.Skiplist) error
 	TimeSeriesReader
 }
 
@@ -191,6 +183,7 @@ func OpenTimeSeriesStore(path string, options ...TimeSeriesOptions) (TimeSeriesS
 	if btss.dbOpts.MemTableSize < 8<<20 {
 		btss.dbOpts = btss.dbOpts.WithValueThreshold(1 << 10)
 	}
+	btss.dbOpts = btss.dbOpts.WithInTable()
 	var err error
 	btss.db, err = badger.Open(btss.dbOpts)
 	if err != nil {
diff --git a/banyand/tsdb/block.go b/banyand/tsdb/block.go
index 70e94db6..763f86bd 100644
--- a/banyand/tsdb/block.go
+++ b/banyand/tsdb/block.go
@@ -29,6 +29,7 @@ import (
 	"sync/atomic"
 	"time"
 
+	"github.com/dgraph-io/badger/v3/skl"
 	"github.com/pkg/errors"
 	"go.uber.org/multierr"
 
@@ -48,40 +49,46 @@ const (
 	componentSecondInvertedIdx = "inverted"
 	componentSecondLSMIdx      = "lsm"
 
-	defaultMainMemorySize = 8 << 20
-	defaultEnqueueTimeout = 500 * time.Millisecond
+	defaultBufferSize       = 8 << 20
+	defaultEnqueueTimeout   = 500 * time.Millisecond
+	maxBlockAge             = time.Hour
+	defaultWriteConcurrency = 1000
+	defaultNumBufferShards  = 2
 )
 
 var errBlockClosingInterrupted = errors.New("interrupt to close the block")
 
 type block struct {
-	store    kv.TimeSeriesStore
-	openOpts openOpts
-	queue    bucket.Queue
-	bucket.Reporter
-	clock         timestamp.Clock
-	lsmIndex      index.Store
 	invertedIndex index.Store
-	closed        *atomic.Bool
-	l             *logger.Logger
-	deleted       *atomic.Bool
-	ref           *atomic.Int32
-	position      common.Position
+	sst           kv.TimeSeriesStore
+	queue         bucket.Queue
+	bucket.Reporter
+	clock            timestamp.Clock
+	lsmIndex         index.Store
+	closed           *atomic.Bool
+	buffer           *Buffer
+	l                *logger.Logger
+	deleted          *atomic.Bool
+	ref              *atomic.Int32
+	closeBufferTimer *time.Timer
+	position         common.Position
 	timestamp.TimeRange
 	segSuffix   string
 	suffix      string
 	path        string
 	closableLst []io.Closer
+	openOpts    openOpts
 	lock        sync.RWMutex
 	segID       SectionID
 	blockID     SectionID
 }
 
 type openOpts struct {
-	store     []kv.TimeSeriesOptions
-	storePath string
-	inverted  inverted.StoreOpts
-	lsm       lsm.StoreOpts
+	storePath  string
+	inverted   inverted.StoreOpts
+	lsm        lsm.StoreOpts
+	store      []kv.TimeSeriesOptions
+	bufferSize int64
 }
 
 type blockOpts struct {
@@ -140,20 +147,21 @@ func (b *block) options(ctx context.Context) {
 	if options.CompressionMethod.Type == CompressionTypeZSTD {
 		b.openOpts.store = append(b.openOpts.store, kv.TSSWithZSTDCompression(options.CompressionMethod.ChunkSizeInBytes))
 	}
-	var memSize int64
+	var bufferSize int64
 	if options.BlockMemSize < 1 {
-		memSize = defaultMainMemorySize
+		bufferSize = defaultBufferSize
 	} else {
-		memSize = options.BlockMemSize
+		bufferSize = options.BlockMemSize
 	}
-	b.openOpts.store = append(b.openOpts.store, kv.TSSWithMemTableSize(memSize), kv.TSSWithLogger(b.l.Named(componentMain)))
+	b.openOpts.bufferSize = bufferSize
+	b.openOpts.store = append(b.openOpts.store, kv.TSSWithMemTableSize(bufferSize), kv.TSSWithLogger(b.l.Named(componentMain)))
 	b.openOpts.storePath = path.Join(b.path, componentMain)
 	b.openOpts.inverted = inverted.StoreOpts{
 		Path:         path.Join(b.path, componentSecondInvertedIdx),
 		Logger:       b.l.Named(componentSecondInvertedIdx),
 		BatchWaitSec: options.BlockInvertedIndex.BatchWaitSec,
 	}
-	lsmMemSize := memSize / 8
+	lsmMemSize := bufferSize / 8
 	if lsmMemSize < defaultKVMemorySize {
 		lsmMemSize = defaultKVMemorySize
 	}
@@ -177,10 +185,15 @@ func (b *block) openSafely() (err error) {
 }
 
 func (b *block) open() (err error) {
-	if b.store, err = kv.OpenTimeSeriesStore(b.openOpts.storePath, b.openOpts.store...); err != nil {
+	if b.isActive() {
+		if err = b.openBuffer(); err != nil {
+			return err
+		}
+	}
+	if b.sst, err = kv.OpenTimeSeriesStore(b.openOpts.storePath, b.openOpts.store...); err != nil {
 		return err
 	}
-	b.closableLst = append(b.closableLst, b.store)
+	b.closableLst = append(b.closableLst, b.sst)
 	if b.invertedIndex, err = inverted.NewStore(b.openOpts.inverted); err != nil {
 		return err
 	}
@@ -193,6 +206,52 @@ func (b *block) open() (err error) {
 	return nil
 }
 
+func (b *block) openBuffer() (err error) {
+	if b.buffer != nil {
+		return nil
+	}
+	if b.buffer, err = NewBuffer(b.l, int(b.openOpts.bufferSize/defaultNumBufferShards),
+		defaultWriteConcurrency, defaultNumBufferShards, b.flush); err != nil {
+		return err
+	}
+	now := b.clock.Now()
+	max := b.maxTime()
+	closeAfter := max.Sub(now)
+	// TODO: we should move inactive write buffer to segment or shard.
+	if now.After(max) {
+		closeAfter = maxBlockAge
+	}
+	// create a timer to close buffer
+	b.closeBufferTimer = time.AfterFunc(closeAfter, func() {
+		if b.l.Debug().Enabled() {
+			b.l.Debug().Msg("closing buffer")
+		}
+		b.lock.Lock()
+		defer b.lock.Unlock()
+		if b.buffer == nil {
+			return
+		}
+		if err := b.buffer.Close(); err != nil {
+			b.l.Error().Err(err).Msg("close buffer error")
+		}
+		b.buffer = nil
+	})
+	return nil
+}
+
+func (b *block) isActive() bool {
+	return !b.clock.Now().After(b.maxTime())
+}
+
+func (b *block) maxTime() time.Time {
+	return b.End.Add(maxBlockAge)
+}
+
+func (b *block) flush(shardIndex int, skl *skl.Skiplist) error {
+	b.l.Info().Int("shard", shardIndex).Msg("flushing buffer")
+	return b.sst.Handover(skl)
+}
+
 func (b *block) delegate(ctx context.Context) (blockDelegate, error) {
 	if b.deleted.Load() {
 		return nil, errors.WithMessagef(errBlockAbsent, "block %s is deleted", b)
@@ -284,6 +343,12 @@ func (b *block) close(ctx context.Context) (err error) {
 	case <-ch:
 	}
 	b.closed.Store(true)
+	if b.closeBufferTimer != nil {
+		b.closeBufferTimer.Stop()
+	}
+	if b.buffer != nil {
+		err = multierr.Append(err, b.buffer.Close())
+	}
 	for _, closer := range b.closableLst {
 		err = multierr.Append(err, closer.Close())
 	}
@@ -308,15 +373,25 @@ func (b *block) String() string {
 }
 
 func (b *block) stats() (names []string, stats []observability.Statistics) {
-	names = append(names, componentMain, componentSecondInvertedIdx, componentSecondLSMIdx)
 	if b.Closed() {
-		stats = make([]observability.Statistics, 3)
+		stats = make([]observability.Statistics, 0)
 		return
 	}
-	stats = append(stats, b.store.Stats(), b.invertedIndex.Stats(), b.lsmIndex.Stats())
+	bnn, bss := b.buffer.Stats()
+	names = append(names, bnn...)
+	stats = append(stats, bss...)
+	names = append(names, componentSecondInvertedIdx, componentSecondLSMIdx)
+	stats = append(stats, b.invertedIndex.Stats(), b.lsmIndex.Stats())
 	return names, stats
 }
 
+func (b *block) Get(key []byte, ts uint64) ([]byte, error) {
+	if v, ok := b.buffer.Read(key, time.Unix(0, int64(ts))); ok {
+		return v, nil
+	}
+	return b.sst.Get(key, ts)
+}
+
 type blockDelegate interface {
 	io.Closer
 	contains(ts time.Time) bool
@@ -340,7 +415,7 @@ type bDelegate struct {
 }
 
 func (d *bDelegate) dataReader() kv.TimeSeriesReader {
-	return d.delegate.store
+	return d.delegate
 }
 
 func (d *bDelegate) lsmIndexReader() index.Searcher {
@@ -364,7 +439,17 @@ func (d *bDelegate) identity() (segID SectionID, blockID SectionID) {
 }
 
 func (d *bDelegate) write(key []byte, val []byte, ts time.Time) error {
-	return d.delegate.store.Put(key, val, uint64(ts.UnixNano()))
+	// On-demand open buffer
+	if d.delegate.buffer == nil {
+		d.delegate.lock.Lock()
+		if err := d.delegate.openBuffer(); err != nil {
+			d.delegate.lock.Unlock()
+			return err
+		}
+		d.delegate.lock.Unlock()
+	}
+	d.delegate.buffer.Write(key, val, ts)
+	return nil
 }
 
 func (d *bDelegate) writePrimaryIndex(field index.Field, id common.ItemID) error {
diff --git a/banyand/tsdb/buffer.go b/banyand/tsdb/buffer.go
index 8372554a..e6f4ac0a 100644
--- a/banyand/tsdb/buffer.go
+++ b/banyand/tsdb/buffer.go
@@ -25,6 +25,7 @@ import (
 	"github.com/dgraph-io/badger/v3/skl"
 	"github.com/dgraph-io/badger/v3/y"
 
+	"github.com/apache/skywalking-banyandb/banyand/observability"
 	"github.com/apache/skywalking-banyandb/pkg/convert"
 	"github.com/apache/skywalking-banyandb/pkg/logger"
 	"github.com/apache/skywalking-banyandb/pkg/run"
@@ -105,7 +106,7 @@ func NewBuffer(log *logger.Logger, flushSize, writeConcurrency, numShards int, o
 
 // Write adds a key-value pair with a timestamp to the appropriate shard bucket in the buffer.
 func (b *Buffer) Write(key, value []byte, timestamp time.Time) {
-	if !b.entryCloser.AddRunning() {
+	if b == nil || !b.entryCloser.AddRunning() {
 		return
 	}
 	defer b.entryCloser.Done()
@@ -119,6 +120,10 @@ func (b *Buffer) Write(key, value []byte, timestamp time.Time) {
 
 // Read retrieves the value associated with the given key and timestamp from the appropriate shard bucket in the buffer.
 func (b *Buffer) Read(key []byte, ts time.Time) ([]byte, bool) {
+	if b == nil || !b.entryCloser.AddRunning() {
+		return nil, false
+	}
+	defer b.entryCloser.Done()
 	keyWithTS := y.KeyWithTs(key, uint64(ts.UnixNano()))
 	index := b.getShardIndex(key)
 	epoch := uint64(ts.UnixNano())
@@ -157,6 +162,25 @@ func (b *Buffer) Close() error {
 	return nil
 }
 
+// Stats returns the statistics for the buffer.
+func (b *Buffer) Stats() ([]string, []observability.Statistics) {
+	names := make([]string, b.numShards)
+	stats := make([]observability.Statistics, b.numShards)
+	for i := 0; i < b.numShards; i++ {
+		names[i] = fmt.Sprintf("buffer-%d", i)
+		var size, maxSize int64
+		for _, l := range b.buckets[i].getAll() {
+			size += l.MemSize()
+			maxSize += int64(b.buckets[i].size)
+		}
+		stats[i] = observability.Statistics{
+			MemBytes:    size,
+			MaxMemBytes: maxSize,
+		}
+	}
+	return names, stats
+}
+
 func (b *Buffer) getShardIndex(key []byte) uint64 {
 	return convert.Hash(key) % uint64(b.numShards)
 }
diff --git a/dist/LICENSE b/dist/LICENSE
index 75485dd4..d04bca78 100644
--- a/dist/LICENSE
+++ b/dist/LICENSE
@@ -178,7 +178,7 @@
 Apache-2.0 licenses
 ========================================================================
 
-    github.com/SkyAPM/badger/v3 v3.0.0-20221227124922-b88a2f7d336f Apache-2.0
+    github.com/SkyAPM/badger/v3 v3.0.0-20230329010346-dfdf57f0581b Apache-2.0
     github.com/blevesearch/segment v0.9.0 Apache-2.0
     github.com/blevesearch/vellum v1.0.7 Apache-2.0
     github.com/coreos/go-semver v0.3.0 Apache-2.0
diff --git a/go.mod b/go.mod
index aba7feed..a11d213a 100644
--- a/go.mod
+++ b/go.mod
@@ -136,5 +136,5 @@ replace (
 	github.com/blugelabs/bluge => github.com/zinclabs/bluge v1.1.5
 	github.com/blugelabs/bluge_segment_api => github.com/zinclabs/bluge_segment_api v1.0.0
 	github.com/blugelabs/ice => github.com/zinclabs/ice v1.1.3
-	github.com/dgraph-io/badger/v3 v3.2011.1 => github.com/SkyAPM/badger/v3 v3.0.0-20221227124922-b88a2f7d336f
+	github.com/dgraph-io/badger/v3 v3.2011.1 => github.com/SkyAPM/badger/v3 v3.0.0-20230329010346-dfdf57f0581b
 )
diff --git a/go.sum b/go.sum
index c5d2e026..ef0a79f4 100644
--- a/go.sum
+++ b/go.sum
@@ -46,8 +46,8 @@ github.com/OneOfOne/xxhash v1.2.2 h1:KMrpdQIwFcEqXDklaen+P1axHaj9BSKzvpUUfnHldSE
 github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU=
 github.com/RoaringBitmap/roaring v0.9.4 h1:ckvZSX5gwCRaJYBNe7syNawCU5oruY9gQmjXlp4riwo=
 github.com/RoaringBitmap/roaring v0.9.4/go.mod h1:icnadbWcNyfEHlYdr+tDlOTih1Bf/h+rzPpv4sbomAA=
-github.com/SkyAPM/badger/v3 v3.0.0-20221227124922-b88a2f7d336f h1:fcfR1l/cO03QQWnwhlYdgXkjZBV58ZmxwgRJj7HiNQw=
-github.com/SkyAPM/badger/v3 v3.0.0-20221227124922-b88a2f7d336f/go.mod h1:4WETftF8A4mEeFgIsYB/VvGo5kfTVl/neYgEqlVW9Ag=
+github.com/SkyAPM/badger/v3 v3.0.0-20230329010346-dfdf57f0581b h1:ZThiWg9yHnIMYy+KaAgkNZhnFgF7kvtytsdjb24zAGU=
+github.com/SkyAPM/badger/v3 v3.0.0-20230329010346-dfdf57f0581b/go.mod h1:4WETftF8A4mEeFgIsYB/VvGo5kfTVl/neYgEqlVW9Ag=
 github.com/SkyAPM/clock v1.3.1-0.20220809233656-dc7607c94a97 h1:FKuhJ+6n/DHspGeLleeNbziWnKr9gHKYN4q7NcoCp4s=
 github.com/SkyAPM/clock v1.3.1-0.20220809233656-dc7607c94a97/go.mod h1:2xGRl9H1pllhxTbEGO1W3gDkip8P9GQaHPni/wpdR44=
 github.com/ajstarks/svgo v0.0.0-20180226025133-644b8db467af/go.mod h1:K08gAheRH3/J6wwsYMMT4xOr94bZjxIelGM0+d/wbFw=
diff --git a/test/cases/measure/measure.go b/test/cases/measure/measure.go
index b906d86b..4db0b05d 100644
--- a/test/cases/measure/measure.go
+++ b/test/cases/measure/measure.go
@@ -39,7 +39,7 @@ var (
 	}
 )
 
-var _ = g.FDescribeTable("Scanning Measures", verify,
+var _ = g.DescribeTable("Scanning Measures", verify,
 	g.Entry("all", helpers.Args{Input: "all", Duration: 25 * time.Minute, Offset: -20 * time.Minute}),
 	g.Entry("all_only_fields", helpers.Args{Input: "all", Duration: 25 * time.Minute, Offset: -20 * time.Minute}),
 	g.Entry("filter by tag", helpers.Args{Input: "tag_filter", Duration: 25 * time.Minute, Offset: -20 * time.Minute}),
diff --git a/test/e2e-v2/script/env b/test/e2e-v2/script/env
index 493eb132..52d79dc8 100644
--- a/test/e2e-v2/script/env
+++ b/test/e2e-v2/script/env
@@ -22,8 +22,8 @@ SW_AGENT_PYTHON_COMMIT=c76a6ec51a478ac91abb20ec8f22a99b8d4d6a58
 SW_AGENT_CLIENT_JS_COMMIT=af0565a67d382b683c1dbd94c379b7080db61449
 SW_AGENT_CLIENT_JS_TEST_COMMIT=4f1eb1dcdbde3ec4a38534bf01dded4ab5d2f016
 SW_KUBERNETES_COMMIT_SHA=b670c41d94a82ddefcf466d54bab5c492d88d772
-SW_ROVER_COMMIT=d956eaede57b62108b78bca48045bd09ba88e653
-SW_CTL_COMMIT=e684fae0107045fc23799146d62f04cb68bd5a3b
+SW_ROVER_COMMIT=fc8d074c6d34ecfee585a7097cbd5aef1ca680a5
+SW_CTL_COMMIT=23debb3b77426edd70192095a5fe9b0fc9031068
 
-SW_OAP_COMMIT=1335a48f1c034abc1fe24f6197ee7acfa3118bf0
-SW_AGENT_E2E_SERVICE_PROVIDER_COMMIT=828e6e2f2b57a0f06bb0d507e3296d2377943d9a
+SW_OAP_COMMIT=0664bcdcb4bbf9e37ed3d497ed28e527d40bd819
+SW_AGENT_E2E_SERVICE_PROVIDER_COMMIT=cc7a2c9e97fd2c421adbe3e9c471688459a446d9