You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by ha...@apache.org on 2023/03/23 00:03:07 UTC
[skywalking-banyandb] 01/01: Add a sharded buffer to ingest data
This is an automated email from the ASF dual-hosted git repository.
hanahmily pushed a commit to branch tsdb-buffer
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
commit 0bb5d0b9bb994353c191cf4049008c3f6001f7d8
Author: Gao Hongtao <ha...@gmail.com>
AuthorDate: Thu Mar 23 00:02:30 2023 +0000
Add a sharded buffer to ingest data
Signed-off-by: Gao Hongtao <ha...@gmail.com>
---
banyand/tsdb/buffer.go | 212 ++++++++++++++++++++++++++++++++++++++++++++
banyand/tsdb/buffer_test.go | 147 ++++++++++++++++++++++++++++++
2 files changed, 359 insertions(+)
diff --git a/banyand/tsdb/buffer.go b/banyand/tsdb/buffer.go
new file mode 100644
index 00000000..0e351f3a
--- /dev/null
+++ b/banyand/tsdb/buffer.go
@@ -0,0 +1,212 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package tsdb
+
+import (
+ "fmt"
+ "sync"
+ "time"
+
+ "github.com/dgraph-io/badger/v3/skl"
+ "github.com/dgraph-io/badger/v3/y"
+
+ "github.com/apache/skywalking-banyandb/pkg/convert"
+ "github.com/apache/skywalking-banyandb/pkg/logger"
+ "github.com/apache/skywalking-banyandb/pkg/run"
+)
+
// defaultSize is a baseline capacity, in bytes (1 MiB), from which NewBuffer
// derives the size passed to skl.NewSkiplist for every shard's skiplist.
const defaultSize = 1 << 20 // 1MB
+
// operation is a single pending write queued on a shard bucket's writeCh and
// applied to the bucket's mutable skiplist by its writer goroutine.
type operation struct {
	key   []byte
	value []byte
	// epoch is the write timestamp in Unix nanoseconds (set by Buffer.Write);
	// it becomes both the key's timestamp suffix and the skiplist value version.
	epoch uint64
}
+
// flushEvent hands a retired skiplist from a shard bucket's writer goroutine
// to the same bucket's flusher goroutine over flushCh.
type flushEvent struct {
	data *skl.Skiplist
}
+
// onFlush is invoked with a skiplist the buffer is retiring from the shard
// numbered shardIndex (presumably to persist it — TODO confirm with callers).
// The buffer only logs a returned error; it does not retry.
type onFlush func(shardIndex int, skl *skl.Skiplist) error
+
// bufferShardBucket is one shard of a Buffer. Its writer goroutine applies
// operations from writeCh to the mutable skiplist. Once that skiplist's
// MemSize reaches flushSize, the writer retires it into immutables and queues
// it on flushCh for the flusher goroutine (both goroutines are launched by
// start).
type bufferShardBucket struct {
	// mutable is the skiplist currently receiving writes.
	mutable *skl.Skiplist
	// writeCh queues operations routed to this shard by Buffer.Write.
	writeCh chan operation
	// flushCh carries retired skiplists to the flusher goroutine.
	flushCh chan flushEvent
	// writeWaitGroup and flushWaitGroup point at the owning Buffer's groups;
	// the corresponding goroutine calls Done when it exits.
	writeWaitGroup *sync.WaitGroup
	flushWaitGroup *sync.WaitGroup
	log            *logger.Logger
	// immutables holds retired skiplists that are still served to readers, in
	// the order they were retired (swap appends to the end).
	immutables []*skl.Skiplist
	// index is this shard's position in Buffer.buckets; it is passed to onFlush.
	index int
	// flushSize is the MemSize threshold at which mutable is retired.
	flushSize int
	// size is the capacity passed to skl.NewSkiplist for every new skiplist.
	size int
	// mutex guards the mutable and immutables fields: the writer swaps them
	// under Lock and getAll reads them under RLock.
	mutex sync.RWMutex
}
+
// Buffer is an exported struct that represents a buffer composed of multiple shard buckets.
// Keys are routed to a bucket by hashing them modulo numShards; each bucket
// runs its own writer and flusher goroutines and hands the skiplists it
// retires to onFlushFn.
type Buffer struct {
	// onFlushFn receives every retired skiplist, including the mutable
	// skiplists that Close flushes.
	onFlushFn onFlush
	// entryCloser gates Write so that Close can wait for in-flight writes.
	entryCloser *run.Closer
	log         *logger.Logger
	buckets     []bufferShardBucket
	// writeWaitGroup and flushWaitGroup count the buckets' writer and flusher
	// goroutines; NewBuffer adds numShards to each.
	writeWaitGroup sync.WaitGroup
	flushWaitGroup sync.WaitGroup
	numShards      int
	// closerOnce makes Close idempotent.
	closerOnce sync.Once
}
+
+// NewBuffer creates a new Buffer instance with the given parameters.
+func NewBuffer(log *logger.Logger, flushSize, writeConcurrency, numShards int, onFlushFn onFlush) (*Buffer, error) {
+ size := flushSize
+ if size < defaultSize {
+ size = defaultSize
+ }
+ buckets := make([]bufferShardBucket, numShards)
+ buffer := &Buffer{
+ buckets: buckets,
+ numShards: numShards,
+ onFlushFn: onFlushFn,
+ entryCloser: run.NewCloser(1),
+ log: log.Named("buffer"),
+ }
+ buffer.writeWaitGroup.Add(numShards)
+ buffer.flushWaitGroup.Add(numShards)
+ for i := 0; i < numShards; i++ {
+ buckets[i] = bufferShardBucket{
+ index: i,
+ size: size,
+ mutable: skl.NewSkiplist(int64(size)),
+ flushSize: flushSize,
+ writeCh: make(chan operation, writeConcurrency),
+ flushCh: make(chan flushEvent, 1),
+ writeWaitGroup: &buffer.writeWaitGroup,
+ flushWaitGroup: &buffer.flushWaitGroup,
+ log: buffer.log.Named(fmt.Sprintf("shard-%d", i)),
+ }
+ buckets[i].start(onFlushFn)
+ }
+ return buffer, nil
+}
+
+// Write adds a key-value pair with a timestamp to the appropriate shard bucket in the buffer.
+func (b *Buffer) Write(key, value []byte, timestamp time.Time) {
+ if !b.entryCloser.AddRunning() {
+ return
+ }
+ defer b.entryCloser.Done()
+ index := b.getShardIndex(key)
+ if b.log.Debug().Enabled() {
+ b.log.Debug().Uint64("shard", index).Bytes("key", key).
+ Time("ts", timestamp).Msg("route a shard")
+ }
+ b.buckets[index].writeCh <- operation{key: key, value: value, epoch: uint64(timestamp.UnixNano())}
+}
+
+// Read retrieves the value associated with the given key and timestamp from the appropriate shard bucket in the buffer.
+func (b *Buffer) Read(key []byte, ts time.Time) ([]byte, bool) {
+ keyWithTS := y.KeyWithTs(key, uint64(ts.UnixNano()))
+ index := b.getShardIndex(key)
+ epoch := uint64(ts.UnixNano())
+ for _, bk := range b.buckets[index].getAll() {
+ value := bk.Get(keyWithTS)
+ if value.Meta == 0 && value.Value == nil {
+ continue
+ }
+ if value.Version == epoch {
+ return value.Value, true
+ }
+ }
+ return nil, false
+}
+
+// Close gracefully closes the Buffer and ensures that all pending operations are completed.
+func (b *Buffer) Close() {
+ b.closerOnce.Do(func() {
+ b.entryCloser.Done()
+ b.entryCloser.CloseThenWait()
+ for i := 0; i < b.numShards; i++ {
+ close(b.buckets[i].writeCh)
+ }
+ b.writeWaitGroup.Wait()
+ for i := 0; i < b.numShards; i++ {
+ if err := b.onFlushFn(i, b.buckets[i].mutable); err != nil {
+ b.buckets[i].log.Err(err).Msg("flushing mutable buffer failed")
+ }
+ b.buckets[i].mutable.DecrRef()
+ }
+ for i := 0; i < b.numShards; i++ {
+ close(b.buckets[i].flushCh)
+ }
+ b.flushWaitGroup.Wait()
+ })
+}
+
+func (b *Buffer) getShardIndex(key []byte) uint64 {
+ return convert.Hash(key) % uint64(b.numShards)
+}
+
+func (bsb *bufferShardBucket) getAll() []*skl.Skiplist {
+ bsb.mutex.RLock()
+ defer bsb.mutex.RUnlock()
+ allList := make([]*skl.Skiplist, len(bsb.immutables)+1)
+ bsb.mutable.IncrRef()
+ allList[0] = bsb.mutable
+ last := len(bsb.immutables) - 1
+ for i := range bsb.immutables {
+ allList[i+1] = bsb.immutables[last-i]
+ bsb.immutables[last-i].IncrRef()
+ }
+ return allList
+}
+
+func (bsb *bufferShardBucket) start(onFlushFn onFlush) {
+ go func() {
+ defer bsb.flushWaitGroup.Done()
+ for event := range bsb.flushCh {
+ oldSkipList := event.data
+ if err := onFlushFn(bsb.index, oldSkipList); err != nil {
+ bsb.log.Err(err).Msg("flushing immutable buffer failed")
+ continue
+ }
+ bsb.mutex.Lock()
+ bsb.immutables = bsb.immutables[1:]
+ oldSkipList.DecrRef()
+ bsb.mutex.Unlock()
+ }
+ }()
+ go func() {
+ defer bsb.writeWaitGroup.Done()
+ for op := range bsb.writeCh {
+ bsb.mutex.Lock()
+ if bsb.mutable.MemSize() >= int64(bsb.flushSize) {
+ select {
+ case bsb.flushCh <- flushEvent{data: bsb.mutable}:
+ default:
+ }
+ bsb.swap()
+ }
+ bsb.mutex.Unlock()
+ bsb.mutable.Put(y.KeyWithTs(op.key, op.epoch), y.ValueStruct{Value: op.value, Version: op.epoch})
+ }
+ }()
+}
+
+func (bsb *bufferShardBucket) swap() {
+ bsb.immutables = append(bsb.immutables, bsb.mutable)
+ bsb.mutable = skl.NewSkiplist(int64(bsb.size))
+}
diff --git a/banyand/tsdb/buffer_test.go b/banyand/tsdb/buffer_test.go
new file mode 100644
index 00000000..0cffedc5
--- /dev/null
+++ b/banyand/tsdb/buffer_test.go
@@ -0,0 +1,147 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package tsdb_test
+
+import (
+ "bytes"
+ "crypto/rand"
+ "fmt"
+ "math/big"
+ "sync"
+ "time"
+
+ "github.com/dgraph-io/badger/v3/skl"
+ . "github.com/onsi/ginkgo/v2"
+ . "github.com/onsi/gomega"
+ "github.com/onsi/gomega/gleak"
+
+ "github.com/apache/skywalking-banyandb/banyand/tsdb"
+ "github.com/apache/skywalking-banyandb/pkg/logger"
+ "github.com/apache/skywalking-banyandb/pkg/test/flags"
+)
+
+var _ = Describe("Buffer", func() {
+ var (
+ buffer *tsdb.Buffer
+ log = logger.GetLogger("buffer-test")
+ goods []gleak.Goroutine
+ )
+
+ BeforeEach(func() {
+ goods = gleak.Goroutines()
+ })
+ AfterEach(func() {
+ Eventually(gleak.Goroutines, flags.EventuallyTimeout).ShouldNot(gleak.HaveLeaked(goods))
+ })
+ Context("Write and Read", func() {
+ BeforeEach(func() {
+ var err error
+ buffer, err = tsdb.NewBuffer(log, 1024*1024, 16, 4, func(shardIndex int, skl *skl.Skiplist) error {
+ return nil
+ })
+ Expect(err).ToNot(HaveOccurred())
+ })
+
+ AfterEach(func() {
+ buffer.Close()
+ })
+ It("should write and read data correctly", func() {
+ var wg sync.WaitGroup
+ wg.Add(100)
+
+ for i := 0; i < 100; i++ {
+ go func(idx int) {
+ defer GinkgoRecover()
+ defer wg.Done()
+
+ key := []byte(fmt.Sprintf("key-%d", idx))
+ value := []byte(fmt.Sprintf("value-%d", idx))
+ ts := time.Now()
+
+ buffer.Write(key, value, ts)
+ Eventually(func(g Gomega) {
+ readValue, ok := buffer.Read(key, ts)
+ g.Expect(ok).To(BeTrue())
+ g.Expect(bytes.Equal(value, readValue)).To(BeTrue())
+ }, flags.EventuallyTimeout).Should(Succeed())
+ }(i)
+ }
+
+ wg.Wait()
+ })
+ })
+
+ Context("Flush", func() {
+ It("should trigger flush when buffer size exceeds the limit", func() {
+ numShards := 4
+ doneChs := make([]chan struct{}, numShards)
+ for i := 0; i < numShards; i++ {
+ doneChs[i] = make(chan struct{})
+ }
+
+ onFlushFn := func(shardIndex int, skl *skl.Skiplist) error {
+ if doneChs[shardIndex] == nil {
+ return nil
+ }
+ close(doneChs[shardIndex])
+ doneChs[shardIndex] = nil
+ return nil
+ }
+
+ var wg sync.WaitGroup
+ wg.Add(numShards)
+
+ for _, ch := range doneChs {
+ go func(c <-chan struct{}) {
+ select {
+ case res := <-c:
+ GinkgoWriter.Printf("Received value: %d\n", res)
+ case <-time.After(10 * time.Second):
+ GinkgoWriter.Printf("Timeout")
+ }
+ wg.Done()
+ }(ch)
+ }
+
+ buffer, err := tsdb.NewBuffer(log, 1024, 16, numShards, onFlushFn)
+ defer buffer.Close()
+ Expect(err).ToNot(HaveOccurred())
+
+ randInt := func() int {
+ n, err := rand.Int(rand.Reader, big.NewInt(1000))
+ if err != nil {
+ return 0
+ }
+ return int(n.Int64())
+ }
+ for i := 0; i < 1000; i++ {
+ key := fmt.Sprintf("key-%d", randInt())
+ value := fmt.Sprintf("value-%d", randInt())
+ ts := time.Now()
+
+ buffer.Write([]byte(key), []byte(value), ts)
+ }
+
+ wg.Wait()
+ for i, elem := range doneChs {
+ if elem != nil {
+ Fail(fmt.Sprintf("%d in doneChs is not nil", i))
+ }
+ }
+ })
+ })
+})