Posted to notifications@skywalking.apache.org by ha...@apache.org on 2023/03/23 00:07:15 UTC

[skywalking-banyandb] branch tsdb-buffer updated (0bb5d0b9 -> 5e3d45b5)

This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a change to branch tsdb-buffer
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git


 discard 0bb5d0b9 Add a sharded buffer to ingest data
     new 5e3d45b5 Add a sharded buffer to ingest data

This update added new revisions after undoing existing revisions.
That is to say, some revisions that were in the old version of the
branch are not in the new version.  This situation occurs when a user
force-pushes a change (git push --force) and generates a repository
containing something like this:

 * -- * -- B -- O -- O -- O   (0bb5d0b9)
            \
             N -- N -- N   refs/heads/tsdb-buffer (5e3d45b5)

You should already have received notification emails for all of the O
revisions, and so the following emails describe only the N revisions
from the common base, B.

Any revisions marked "omit" are not gone; other references still
refer to them.  Any revisions marked "discard" are gone forever.

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 CHANGES.md | 1 +
 1 file changed, 1 insertion(+)


[skywalking-banyandb] 01/01: Add a sharded buffer to ingest data

Posted by ha...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch tsdb-buffer
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git

commit 5e3d45b5056d50896f9dd79d1b0965ed56da0170
Author: Gao Hongtao <ha...@gmail.com>
AuthorDate: Thu Mar 23 00:02:30 2023 +0000

    Add a sharded buffer to ingest data
    
    Signed-off-by: Gao Hongtao <ha...@gmail.com>
---
 CHANGES.md                  |   1 +
 banyand/tsdb/buffer.go      | 212 ++++++++++++++++++++++++++++++++++++++++++++
 banyand/tsdb/buffer_test.go | 147 ++++++++++++++++++++++++++++++
 3 files changed, 360 insertions(+)

diff --git a/CHANGES.md b/CHANGES.md
index d4df9af1..17c09ea6 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -9,6 +9,7 @@ Release Notes.
 - Add TSDB concept document.
 - [UI] Add YAML editor for inputting query criteria.
 - Refactor TopN to support `NULL` group while keeping seriesID from the source measure.
+- Add a sharded buffer to TSDB.
 
 ### Chores
 
diff --git a/banyand/tsdb/buffer.go b/banyand/tsdb/buffer.go
new file mode 100644
index 00000000..0e351f3a
--- /dev/null
+++ b/banyand/tsdb/buffer.go
@@ -0,0 +1,212 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package tsdb
+
+import (
+	"fmt"
+	"sync"
+	"time"
+
+	"github.com/dgraph-io/badger/v3/skl"
+	"github.com/dgraph-io/badger/v3/y"
+
+	"github.com/apache/skywalking-banyandb/pkg/convert"
+	"github.com/apache/skywalking-banyandb/pkg/logger"
+	"github.com/apache/skywalking-banyandb/pkg/run"
+)
+
+const defaultSize = 1 << 20 // 1MB
+
+type operation struct {
+	key   []byte
+	value []byte
+	epoch uint64
+}
+
+type flushEvent struct {
+	data *skl.Skiplist
+}
+
+type onFlush func(shardIndex int, skl *skl.Skiplist) error
+
+type bufferShardBucket struct {
+	mutable        *skl.Skiplist
+	writeCh        chan operation
+	flushCh        chan flushEvent
+	writeWaitGroup *sync.WaitGroup
+	flushWaitGroup *sync.WaitGroup
+	log            *logger.Logger
+	immutables     []*skl.Skiplist
+	index          int
+	flushSize      int
+	size           int
+	mutex          sync.RWMutex
+}
+
+// Buffer is a sharded write buffer that distributes incoming entries across multiple shard buckets.
+type Buffer struct {
+	onFlushFn      onFlush
+	entryCloser    *run.Closer
+	log            *logger.Logger
+	buckets        []bufferShardBucket
+	writeWaitGroup sync.WaitGroup
+	flushWaitGroup sync.WaitGroup
+	numShards      int
+	closerOnce     sync.Once
+}
+
+// NewBuffer creates a new Buffer instance with the given parameters.
+func NewBuffer(log *logger.Logger, flushSize, writeConcurrency, numShards int, onFlushFn onFlush) (*Buffer, error) {
+	size := flushSize
+	if size < defaultSize {
+		size = defaultSize
+	}
+	buckets := make([]bufferShardBucket, numShards)
+	buffer := &Buffer{
+		buckets:     buckets,
+		numShards:   numShards,
+		onFlushFn:   onFlushFn,
+		entryCloser: run.NewCloser(1),
+		log:         log.Named("buffer"),
+	}
+	buffer.writeWaitGroup.Add(numShards)
+	buffer.flushWaitGroup.Add(numShards)
+	for i := 0; i < numShards; i++ {
+		buckets[i] = bufferShardBucket{
+			index:          i,
+			size:           size,
+			mutable:        skl.NewSkiplist(int64(size)),
+			flushSize:      flushSize,
+			writeCh:        make(chan operation, writeConcurrency),
+			flushCh:        make(chan flushEvent, 1),
+			writeWaitGroup: &buffer.writeWaitGroup,
+			flushWaitGroup: &buffer.flushWaitGroup,
+			log:            buffer.log.Named(fmt.Sprintf("shard-%d", i)),
+		}
+		buckets[i].start(onFlushFn)
+	}
+	return buffer, nil
+}
+
+// Write adds a key-value pair with a timestamp to the appropriate shard bucket in the buffer.
+func (b *Buffer) Write(key, value []byte, timestamp time.Time) {
+	if !b.entryCloser.AddRunning() {
+		return
+	}
+	defer b.entryCloser.Done()
+	index := b.getShardIndex(key)
+	if b.log.Debug().Enabled() {
+		b.log.Debug().Uint64("shard", index).Bytes("key", key).
+			Time("ts", timestamp).Msg("route a shard")
+	}
+	b.buckets[index].writeCh <- operation{key: key, value: value, epoch: uint64(timestamp.UnixNano())}
+}
+
+// Read retrieves the value associated with the given key and timestamp from the appropriate shard bucket in the buffer.
+func (b *Buffer) Read(key []byte, ts time.Time) ([]byte, bool) {
+	keyWithTS := y.KeyWithTs(key, uint64(ts.UnixNano()))
+	index := b.getShardIndex(key)
+	epoch := uint64(ts.UnixNano())
+	for _, bk := range b.buckets[index].getAll() {
+		value := bk.Get(keyWithTS)
+		if value.Meta == 0 && value.Value == nil {
+			continue
+		}
+		if value.Version == epoch {
+			return value.Value, true
+		}
+	}
+	return nil, false
+}
+
+// Close gracefully closes the Buffer and ensures that all pending operations are completed.
+func (b *Buffer) Close() {
+	b.closerOnce.Do(func() {
+		b.entryCloser.Done()
+		b.entryCloser.CloseThenWait()
+		for i := 0; i < b.numShards; i++ {
+			close(b.buckets[i].writeCh)
+		}
+		b.writeWaitGroup.Wait()
+		for i := 0; i < b.numShards; i++ {
+			if err := b.onFlushFn(i, b.buckets[i].mutable); err != nil {
+				b.buckets[i].log.Err(err).Msg("flushing mutable buffer failed")
+			}
+			b.buckets[i].mutable.DecrRef()
+		}
+		for i := 0; i < b.numShards; i++ {
+			close(b.buckets[i].flushCh)
+		}
+		b.flushWaitGroup.Wait()
+	})
+}
+
+func (b *Buffer) getShardIndex(key []byte) uint64 {
+	return convert.Hash(key) % uint64(b.numShards)
+}
+
+func (bsb *bufferShardBucket) getAll() []*skl.Skiplist {
+	bsb.mutex.RLock()
+	defer bsb.mutex.RUnlock()
+	allList := make([]*skl.Skiplist, len(bsb.immutables)+1)
+	bsb.mutable.IncrRef()
+	allList[0] = bsb.mutable
+	last := len(bsb.immutables) - 1
+	for i := range bsb.immutables {
+		allList[i+1] = bsb.immutables[last-i]
+		bsb.immutables[last-i].IncrRef()
+	}
+	return allList
+}
+
+func (bsb *bufferShardBucket) start(onFlushFn onFlush) {
+	go func() {
+		defer bsb.flushWaitGroup.Done()
+		for event := range bsb.flushCh {
+			oldSkipList := event.data
+			if err := onFlushFn(bsb.index, oldSkipList); err != nil {
+				bsb.log.Err(err).Msg("flushing immutable buffer failed")
+				continue
+			}
+			bsb.mutex.Lock()
+			bsb.immutables = bsb.immutables[1:]
+			oldSkipList.DecrRef()
+			bsb.mutex.Unlock()
+		}
+	}()
+	go func() {
+		defer bsb.writeWaitGroup.Done()
+		for op := range bsb.writeCh {
+			bsb.mutex.Lock()
+			if bsb.mutable.MemSize() >= int64(bsb.flushSize) {
+				select {
+				case bsb.flushCh <- flushEvent{data: bsb.mutable}:
+				default:
+				}
+				bsb.swap()
+			}
+			bsb.mutex.Unlock()
+			bsb.mutable.Put(y.KeyWithTs(op.key, op.epoch), y.ValueStruct{Value: op.value, Version: op.epoch})
+		}
+	}()
+}
+
+func (bsb *bufferShardBucket) swap() {
+	bsb.immutables = append(bsb.immutables, bsb.mutable)
+	bsb.mutable = skl.NewSkiplist(int64(bsb.size))
+}
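
A note for reviewers trying out the new API: the sketch below is a
minimal, hypothetical usage example assembled only from the exported
surface shown above (NewBuffer, Write, Read, Close); the flush callback
just logs, where a real one would persist the skiplist it receives.
Keep in mind that Write is asynchronous (it enqueues the operation on
the owning shard's channel), so a Read issued immediately after a
Write can miss; the test suite polls with Eventually for the same
reason.

    package main

    import (
        "fmt"
        "time"

        "github.com/dgraph-io/badger/v3/skl"

        "github.com/apache/skywalking-banyandb/banyand/tsdb"
        "github.com/apache/skywalking-banyandb/pkg/logger"
    )

    func main() {
        log := logger.GetLogger("buffer-example") // logger setup mirrors buffer_test.go
        // 8 MiB flush threshold, write channels of depth 16, 4 shard buckets.
        buf, err := tsdb.NewBuffer(log, 8<<20, 16, 4,
            func(shardIndex int, list *skl.Skiplist) error {
                // Invoked with an immutable skiplist when a shard passes the
                // flush threshold, and once more per shard during Close.
                fmt.Printf("flush shard %d: %d bytes\n", shardIndex, list.MemSize())
                return nil
            })
        if err != nil {
            panic(err)
        }
        defer buf.Close()

        ts := time.Now()
        buf.Write([]byte("series-1"), []byte("datapoint"), ts)

        // Write is asynchronous; poll briefly instead of reading once.
        for i := 0; i < 100; i++ {
            if v, ok := buf.Read([]byte("series-1"), ts); ok {
                fmt.Printf("read back: %s\n", v)
                return
            }
            time.Sleep(10 * time.Millisecond)
        }
    }

Note that Read only matches the exact epoch the entry was written with
(value.Version == epoch in Read above), so the timestamp passed to Read
must be the one that was passed to Write.
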
diff --git a/banyand/tsdb/buffer_test.go b/banyand/tsdb/buffer_test.go
new file mode 100644
index 00000000..0cffedc5
--- /dev/null
+++ b/banyand/tsdb/buffer_test.go
@@ -0,0 +1,147 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//	http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package tsdb_test
+
+import (
+	"bytes"
+	"crypto/rand"
+	"fmt"
+	"math/big"
+	"sync"
+	"time"
+
+	"github.com/dgraph-io/badger/v3/skl"
+	. "github.com/onsi/ginkgo/v2"
+	. "github.com/onsi/gomega"
+	"github.com/onsi/gomega/gleak"
+
+	"github.com/apache/skywalking-banyandb/banyand/tsdb"
+	"github.com/apache/skywalking-banyandb/pkg/logger"
+	"github.com/apache/skywalking-banyandb/pkg/test/flags"
+)
+
+var _ = Describe("Buffer", func() {
+	var (
+		buffer *tsdb.Buffer
+		log    = logger.GetLogger("buffer-test")
+		goods  []gleak.Goroutine
+	)
+
+	BeforeEach(func() {
+		goods = gleak.Goroutines()
+	})
+	AfterEach(func() {
+		Eventually(gleak.Goroutines, flags.EventuallyTimeout).ShouldNot(gleak.HaveLeaked(goods))
+	})
+	Context("Write and Read", func() {
+		BeforeEach(func() {
+			var err error
+			buffer, err = tsdb.NewBuffer(log, 1024*1024, 16, 4, func(shardIndex int, skl *skl.Skiplist) error {
+				return nil
+			})
+			Expect(err).ToNot(HaveOccurred())
+		})
+
+		AfterEach(func() {
+			buffer.Close()
+		})
+		It("should write and read data correctly", func() {
+			var wg sync.WaitGroup
+			wg.Add(100)
+
+			for i := 0; i < 100; i++ {
+				go func(idx int) {
+					defer GinkgoRecover()
+					defer wg.Done()
+
+					key := []byte(fmt.Sprintf("key-%d", idx))
+					value := []byte(fmt.Sprintf("value-%d", idx))
+					ts := time.Now()
+
+					buffer.Write(key, value, ts)
+					Eventually(func(g Gomega) {
+						readValue, ok := buffer.Read(key, ts)
+						g.Expect(ok).To(BeTrue())
+						g.Expect(bytes.Equal(value, readValue)).To(BeTrue())
+					}, flags.EventuallyTimeout).Should(Succeed())
+				}(i)
+			}
+
+			wg.Wait()
+		})
+	})
+
+	Context("Flush", func() {
+		It("should trigger flush when buffer size exceeds the limit", func() {
+			numShards := 4
+			doneChs := make([]chan struct{}, numShards)
+			for i := 0; i < numShards; i++ {
+				doneChs[i] = make(chan struct{})
+			}
+
+			onFlushFn := func(shardIndex int, skl *skl.Skiplist) error {
+				if doneChs[shardIndex] == nil {
+					return nil
+				}
+				close(doneChs[shardIndex])
+				doneChs[shardIndex] = nil
+				return nil
+			}
+
+			var wg sync.WaitGroup
+			wg.Add(numShards)
+
+			for _, ch := range doneChs {
+				go func(c <-chan struct{}) {
+					select {
+					case <-c:
+						GinkgoWriter.Println("received flush signal")
+					case <-time.After(10 * time.Second):
+						GinkgoWriter.Println("timed out waiting for flush")
+					}
+					wg.Done()
+				}(ch)
+			}
+
+			buffer, err := tsdb.NewBuffer(log, 1024, 16, numShards, onFlushFn)
+			Expect(err).ToNot(HaveOccurred())
+			defer buffer.Close()
+
+			randInt := func() int {
+				n, err := rand.Int(rand.Reader, big.NewInt(1000))
+				if err != nil {
+					return 0
+				}
+				return int(n.Int64())
+			}
+			for i := 0; i < 1000; i++ {
+				key := fmt.Sprintf("key-%d", randInt())
+				value := fmt.Sprintf("value-%d", randInt())
+				ts := time.Now()
+
+				buffer.Write([]byte(key), []byte(value), ts)
+			}
+
+			wg.Wait()
+			for i, elem := range doneChs {
+				if elem != nil {
+					Fail(fmt.Sprintf("%d in doneChs is not nil", i))
+				}
+			}
+		})
+	})
+})
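
The suite above passes a no-op flush callback. In a real integration
the onFlush hook is where a shard's immutable skiplist gets persisted,
so for completeness here is a hypothetical sketch of such a callback.
It assumes badger v3's skiplist iterator along with the
y.ParseKey/y.ParseTs helpers that invert y.KeyWithTs, and it hides the
actual storage call behind a stub named persist:

    package example

    import (
        "github.com/dgraph-io/badger/v3/skl"
        "github.com/dgraph-io/badger/v3/y"
    )

    // persist is a hypothetical stand-in for the real storage layer.
    func persist(shardIndex int, key []byte, epoch uint64, value []byte) error {
        return nil
    }

    // flushToStorage walks the immutable skiplist handed over by a shard
    // bucket and persists every entry it contains. It matches the onFlush
    // signature expected by NewBuffer.
    func flushToStorage(shardIndex int, list *skl.Skiplist) error {
        it := list.NewIterator()
        defer func() { _ = it.Close() }() // Close releases the iterator's reference
        for it.SeekToFirst(); it.Valid(); it.Next() {
            rawKey := it.Key()
            key := y.ParseKey(rawKey)  // user key, minus the 8-byte timestamp suffix
            epoch := y.ParseTs(rawKey) // the epoch packed in by y.KeyWithTs
            if err := persist(shardIndex, key, epoch, it.Value().Value); err != nil {
                return err
            }
        }
        return nil
    }

A callback like this would be passed to NewBuffer in place of the no-op
used by the tests.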