You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by ha...@apache.org on 2022/04/14 12:08:50 UTC

[skywalking-banyandb] branch patch-inverted-index created (now e98a5fc)

This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a change to branch patch-inverted-index
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git


      at e98a5fc  Trigger inverted index flushing on flushing the main store

This branch includes the following new commits:

     new e98a5fc  Trigger inverted index flushing on flushing the main store

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  Revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[skywalking-banyandb] 01/01: Trigger inverted index flushing on flushing the main store

Posted by ha...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch patch-inverted-index
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git

commit e98a5fca497417478644033d51c13e698671bec1
Author: Gao Hongtao <ha...@gmail.com>
AuthorDate: Thu Apr 14 11:41:03 2022 +0000

    Trigger inverted index flushing on flushing the main store
    
    Signed-off-by: Gao Hongtao <ha...@gmail.com>
---
 banyand/kv/kv.go      |  8 ++++++++
 banyand/tsdb/block.go | 26 ++++++++++++++++++++++++++
 pkg/index/index.go    |  2 ++
 pkg/index/lsm/lsm.go  |  4 ++++
 4 files changed, 40 insertions(+)

diff --git a/banyand/kv/kv.go b/banyand/kv/kv.go
index 103959f..4f0b795 100644
--- a/banyand/kv/kv.go
+++ b/banyand/kv/kv.go
@@ -118,6 +118,14 @@ func TSSWithEncoding(encoderPool encoding.SeriesEncoderPool, decoderPool encodin
 	}
 }
 
+func TSSWithFlushCallback(callback func()) TimeSeriesOptions { // TSSWithFlushCallback returns an option that stores callback in the store's dbOpts.FlushCallBack.
+	return func(store TimeSeriesStore) {
+		if btss, ok := store.(*badgerTSS); ok { // applies only to *badgerTSS; any other store type is left untouched
+			btss.dbOpts.FlushCallBack = callback // NOTE(review): presumably invoked by the underlying DB when it flushes - confirm
+		}
+	}
+}
+
 type Iterator interface {
 	Next()
 	Rewind()
diff --git a/banyand/tsdb/block.go b/banyand/tsdb/block.go
index 3296b26..88d88c2 100644
--- a/banyand/tsdb/block.go
+++ b/banyand/tsdb/block.go
@@ -66,6 +66,7 @@ type block struct {
 	segID          uint16
 	blockID        uint16
 	encodingMethod EncodingMethod
+	flushCh        chan struct{}
 }
 
 type blockOpts struct {
@@ -119,6 +120,9 @@ func (b *block) open() (err error) {
 		path.Join(b.path, componentMain),
 		kv.TSSWithEncoding(b.encodingMethod.EncoderPool, b.encodingMethod.DecoderPool),
 		kv.TSSWithLogger(b.l.Named(componentMain)),
+		kv.TSSWithFlushCallback(func() {
+
+		}),
 	); err != nil {
 		return err
 	}
@@ -143,6 +147,16 @@ func (b *block) open() (err error) {
 	}
 	b.closableLst = append(b.closableLst, b.invertedIndex, b.lsmIndex)
 	b.closed.Store(false)
+	b.flushCh = make(chan struct{})
+	go func() {
+		for {
+			_, more := <-b.flushCh
+			if !more {
+				return
+			}
+			b.flush()
+		}
+	}()
 	return nil
 }
 
@@ -164,6 +178,17 @@ func (b *block) incRef() {
 	b.ref.AddRunning(1)
 }
 
+func (b *block) flush() { // flush calls Flush on the inverted index, making at most 10 attempts.
+	for i := 0; i < 10; i++ {
+		err := b.invertedIndex.Flush()
+		if err == nil {
+			return
+		}
+		b.l.Warn().Err(err).Int("retried", i).Msg("failed to flush inverted index") // log before backing off so the warning is not delayed
+		time.Sleep(time.Second) // fixed one-second pause between attempts
+	}
+}
+
 func (b *block) close() {
 	b.lock.Lock()
 	defer b.lock.Unlock()
@@ -176,6 +201,7 @@ func (b *block) close() {
 		_ = closer.Close()
 	}
 	b.closed.Store(true)
+	close(b.flushCh)
 }
 
 func (b *block) isClosed() bool {
diff --git a/pkg/index/index.go b/pkg/index/index.go
index 82acfd8..ebedc32 100644
--- a/pkg/index/index.go
+++ b/pkg/index/index.go
@@ -177,4 +177,6 @@ type Store interface {
 	io.Closer
 	Writer
 	Searcher
+	// Flush flushes in-memory data to disk.
+	Flush() error
 }
diff --git a/pkg/index/lsm/lsm.go b/pkg/index/lsm/lsm.go
index 12d5a6c..a6a3557 100644
--- a/pkg/index/lsm/lsm.go
+++ b/pkg/index/lsm/lsm.go
@@ -37,6 +37,10 @@ type store struct {
 	l            *logger.Logger
 }
 
+func (*store) Flush() error { // Flush always panics and never returns; NOTE(review): presumably present only to satisfy index.Store - confirm
+	panic("do not call flush here. LSM index is using its own controller to flush memory data")
+}
+
 func (s *store) Stats() observability.Statistics {
 	return s.lsm.Stats()
 }