You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by ha...@apache.org on 2022/11/21 13:50:40 UTC

[skywalking-banyandb] branch index created (now bef1b55)

This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a change to branch index
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git


      at bef1b55  Sync inverted index writing

This branch includes the following new commits:

     new bef1b55  Sync inverted index writing

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  Revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[skywalking-banyandb] 01/01: Sync inverted index writing

Posted by ha...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch index
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git

commit bef1b550a211c8fe2cd5c84a03f2e18d587fe78f
Author: Gao Hongtao <ha...@gmail.com>
AuthorDate: Mon Nov 21 13:48:45 2022 +0000

    Sync inverted index writing
    
    Signed-off-by: Gao Hongtao <ha...@gmail.com>
---
 banyand/measure/measure_query.go    |   1 -
 banyand/measure/measure_write.go    |  23 +-------
 banyand/stream/stream_query.go      |   2 -
 banyand/stream/stream_write.go      |  22 +------
 banyand/tsdb/index/writer.go        |  56 +++++-------------
 pkg/index/inverted/inverted.go      | 113 ++++++++++++++++++++++++++++++++----
 pkg/index/inverted/inverted_test.go |   3 +
 7 files changed, 122 insertions(+), 98 deletions(-)

diff --git a/banyand/measure/measure_query.go b/banyand/measure/measure_query.go
index 89f1b13..90f77c5 100644
--- a/banyand/measure/measure_query.go
+++ b/banyand/measure/measure_query.go
@@ -44,7 +44,6 @@ type Query interface {
 
 type Measure interface {
 	io.Closer
-	Write(value *measurev1.DataPointValue) error
 	Shards(entity tsdb.Entity) ([]tsdb.Shard, error)
 	CompanionShards(metadata *commonv1.Metadata) ([]tsdb.Shard, error)
 	Shard(id common.ShardID) (tsdb.Shard, error)
diff --git a/banyand/measure/measure_write.go b/banyand/measure/measure_write.go
index 7cd8224..0e4b4fe 100644
--- a/banyand/measure/measure_write.go
+++ b/banyand/measure/measure_write.go
@@ -38,25 +38,7 @@ import (
 
 var ErrMalformedElement = errors.New("element is malformed")
 
-// Write is for testing
-func (s *measure) Write(value *measurev1.DataPointValue) error {
-	entity, shardID, err := s.entityLocator.Locate(s.name, value.GetTagFamilies(), s.shardNum)
-	if err != nil {
-		return err
-	}
-	waitCh := make(chan struct{})
-	err = s.write(shardID, tsdb.HashEntity(entity), value, func() {
-		close(waitCh)
-	})
-	if err != nil {
-		close(waitCh)
-		return err
-	}
-	<-waitCh
-	return nil
-}
-
-func (s *measure) write(shardID common.ShardID, seriesHashKey []byte, value *measurev1.DataPointValue, cb index.CallbackFn) error {
+func (s *measure) write(shardID common.ShardID, seriesHashKey []byte, value *measurev1.DataPointValue) error {
 	t := value.GetTimestamp().AsTime().Local()
 	if err := timestamp.Check(t); err != nil {
 		return errors.WithMessage(err, "writing stream")
@@ -141,7 +123,6 @@ func (s *measure) write(shardID common.ShardID, seriesHashKey []byte, value *mea
 			Timestamp:   value.GetTimestamp().AsTime(),
 		},
 		BlockCloser: wp,
-		Cb:          cb,
 	}
 	s.indexWriter.Write(m)
 	s.processorManager.onMeasureWrite(&measurev1.WriteRequest{
@@ -175,7 +156,7 @@ func (w *writeCallback) Rev(message bus.Message) (resp bus.Message) {
 		w.l.Warn().Msg("cannot find measure definition")
 		return
 	}
-	err := stm.write(common.ShardID(writeEvent.GetShardId()), writeEvent.GetSeriesHash(), writeEvent.GetRequest().GetDataPoint(), nil)
+	err := stm.write(common.ShardID(writeEvent.GetShardId()), writeEvent.GetSeriesHash(), writeEvent.GetRequest().GetDataPoint())
 	if err != nil {
 		w.l.Error().Err(err).Msg("fail to write entity")
 	}
diff --git a/banyand/stream/stream_query.go b/banyand/stream/stream_query.go
index 218fc6c..1f89be8 100644
--- a/banyand/stream/stream_query.go
+++ b/banyand/stream/stream_query.go
@@ -27,7 +27,6 @@ import (
 	commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
 	databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
 	modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
-	streamv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/stream/v1"
 	"github.com/apache/skywalking-banyandb/banyand/tsdb"
 	"github.com/apache/skywalking-banyandb/pkg/partition"
 )
@@ -40,7 +39,6 @@ type Query interface {
 
 type Stream interface {
 	io.Closer
-	Write(value *streamv1.ElementValue) error
 	Shards(entity tsdb.Entity) ([]tsdb.Shard, error)
 	Shard(id common.ShardID) (tsdb.Shard, error)
 	ParseTagFamily(family string, item tsdb.Item) (*modelv1.TagFamily, error)
diff --git a/banyand/stream/stream_write.go b/banyand/stream/stream_write.go
index af03580..ea4098c 100644
--- a/banyand/stream/stream_write.go
+++ b/banyand/stream/stream_write.go
@@ -51,24 +51,7 @@ func init() {
 	)
 }
 
-func (s *stream) Write(value *streamv1.ElementValue) error {
-	entity, shardID, err := s.entityLocator.Locate(s.name, value.GetTagFamilies(), s.shardNum)
-	if err != nil {
-		return err
-	}
-	waitCh := make(chan struct{})
-	err = s.write(shardID, tsdb.HashEntity(entity), value, func() {
-		close(waitCh)
-	})
-	if err != nil {
-		close(waitCh)
-		return err
-	}
-	<-waitCh
-	return nil
-}
-
-func (s *stream) write(shardID common.ShardID, seriesHashKey []byte, value *streamv1.ElementValue, cb index.CallbackFn) error {
+func (s *stream) write(shardID common.ShardID, seriesHashKey []byte, value *streamv1.ElementValue) error {
 	tp := value.GetTimestamp().AsTime()
 	if err := timestamp.Check(tp); err != nil {
 		return errors.WithMessage(err, "writing stream")
@@ -142,7 +125,6 @@ func (s *stream) write(shardID common.ShardID, seriesHashKey []byte, value *stre
 			Timestamp:   value.GetTimestamp().AsTime(),
 		},
 		BlockCloser: wp,
-		Cb:          cb,
 	}
 	s.indexWriter.Write(m)
 	return err
@@ -172,7 +154,7 @@ func (w *writeCallback) Rev(message bus.Message) (resp bus.Message) {
 		w.l.Warn().Msg("cannot find stream definition")
 		return
 	}
-	err := stm.write(common.ShardID(writeEvent.GetShardId()), writeEvent.GetSeriesHash(), writeEvent.GetRequest().GetElement(), nil)
+	err := stm.write(common.ShardID(writeEvent.GetShardId()), writeEvent.GetSeriesHash(), writeEvent.GetRequest().GetElement())
 	if err != nil {
 		w.l.Error().Err(err).Msg("fail to write entity")
 	}
diff --git a/banyand/tsdb/index/writer.go b/banyand/tsdb/index/writer.go
index 1dfd30b..a410bd8 100644
--- a/banyand/tsdb/index/writer.go
+++ b/banyand/tsdb/index/writer.go
@@ -33,7 +33,6 @@ import (
 	"github.com/apache/skywalking-banyandb/pkg/logger"
 	"github.com/apache/skywalking-banyandb/pkg/partition"
 	pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
-	"github.com/apache/skywalking-banyandb/pkg/run"
 )
 
 type CallbackFn func()
@@ -43,7 +42,6 @@ type Message struct {
 	Value       Value
 	LocalWriter tsdb.Writer
 	BlockCloser io.Closer
-	Cb          CallbackFn
 }
 
 type Value struct {
@@ -71,9 +69,7 @@ type Writer struct {
 	db                tsdb.Supplier
 	shardNum          uint32
 	enableGlobalIndex bool
-	ch                chan Message
 	invertRuleIndex   map[byte][]*partition.IndexRuleLocator
-	closer            *run.Closer
 }
 
 func NewWriter(ctx context.Context, options WriterOptions) *Writer {
@@ -93,68 +89,42 @@ func NewWriter(ctx context.Context, options WriterOptions) *Writer {
 		var key byte
 		switch rule.GetLocation() {
 		case databasev1.IndexRule_LOCATION_SERIES:
-			key = key | local
+			key |= local
 		case databasev1.IndexRule_LOCATION_GLOBAL:
 			if !w.enableGlobalIndex {
 				w.l.Warn().RawJSON("index-rule", logger.Proto(ruleIndex.Rule)).Msg("global index is disabled")
 				continue
 			}
-			key = key | global
+			key |= global
 		}
 		switch rule.Type {
 		case databasev1.IndexRule_TYPE_INVERTED:
-			key = key | inverted
+			key |= inverted
 		case databasev1.IndexRule_TYPE_TREE:
-			key = key | tree
+			key |= tree
 		}
 		rules := w.invertRuleIndex[key]
 		rules = append(rules, ruleIndex)
 		w.invertRuleIndex[key] = rules
 	}
-	w.ch = make(chan Message)
-	w.bootIndexGenerator()
-	w.closer = run.NewCloser(0)
 	return w
 }
 
-func (s *Writer) Write(value Message) {
-	go func(m Message) {
-		if !s.closer.AddRunning() {
-			return
-		}
-		defer s.closer.Done()
-		select {
-		case <-s.closer.CloseNotify():
-			return
-		case s.ch <- m:
-		}
-	}(value)
+func (s *Writer) Write(m Message) {
+	err := multierr.Combine(
+		s.writeLocalIndex(m.LocalWriter, m.Value),
+		s.writeGlobalIndex(m.Scope, m.LocalWriter.ItemID(), m.Value),
+		m.BlockCloser.Close(),
+	)
+	if err != nil {
+		s.l.Error().Err(err).Msg("encounter some errors when generating indices")
+	}
 }
 
 func (s *Writer) Close() error {
-	s.closer.CloseThenWait()
-	close(s.ch)
 	return nil
 }
 
-func (s *Writer) bootIndexGenerator() {
-	go func() {
-		for m := range s.ch {
-			err := multierr.Combine(
-				s.writeLocalIndex(m.LocalWriter, m.Value),
-				s.writeGlobalIndex(m.Scope, m.LocalWriter.ItemID(), m.Value),
-				m.BlockCloser.Close(),
-			)
-			if err != nil {
-				s.l.Error().Err(err).Msg("encounter some errors when generating indices")
-			}
-			if m.Cb != nil {
-				m.Cb()
-			}
-		}
-	}()
-}
-
 // TODO: should listen to pipeline in a distributed cluster
 func (s *Writer) writeGlobalIndex(scope tsdb.Entry, ref tsdb.GlobalItemID, value Value) error {
 	collect := func(ruleIndexes []*partition.IndexRuleLocator, fn func(indexWriter tsdb.IndexWriter, fields []index.Field) error) error {
diff --git a/pkg/index/inverted/inverted.go b/pkg/index/inverted/inverted.go
index 4f680b8..31e57e9 100644
--- a/pkg/index/inverted/inverted.go
+++ b/pkg/index/inverted/inverted.go
@@ -23,6 +23,7 @@ import (
 	"errors"
 	"log"
 	"math"
+	"time"
 
 	"github.com/blugelabs/bluge"
 	"github.com/blugelabs/bluge/analysis"
@@ -40,9 +41,13 @@ import (
 	"github.com/apache/skywalking-banyandb/pkg/index/posting"
 	"github.com/apache/skywalking-banyandb/pkg/index/posting/roaring"
 	"github.com/apache/skywalking-banyandb/pkg/logger"
+	"github.com/apache/skywalking-banyandb/pkg/run"
 )
 
-const docID = "_id"
+const (
+	docID     = "_id"
+	batchSize = 1024
+)
 
 var analyzers map[databasev1.IndexRule_Analyzer]*analysis.Analyzer
 
@@ -61,8 +66,20 @@ type StoreOpts struct {
 	Logger *logger.Logger
 }
 
+type doc struct {
+	fields []index.Field
+	itemID common.ItemID
+}
+
+type flushEvent struct {
+	onComplete chan struct{}
+}
+
 type store struct {
 	writer *bluge.Writer
+	ch     chan any
+	closer *run.Closer
+	l      *logger.Logger
 }
 
 func NewStore(opts StoreOpts) (index.Store, error) {
@@ -73,9 +90,14 @@ func NewStore(opts StoreOpts) (index.Store, error) {
 	if err != nil {
 		return nil, err
 	}
-	return &store{
+	s := &store{
 		writer: w,
-	}, nil
+		l:      opts.Logger,
+		ch:     make(chan any, batchSize),
+		closer: run.NewCloser(1),
+	}
+	s.run()
+	return s, nil
 }
 
 func (s *store) Stats() observability.Statistics {
@@ -83,19 +105,23 @@ func (s *store) Stats() observability.Statistics {
 }
 
 func (s *store) Close() error {
+	s.closer.CloseThenWait()
 	return s.writer.Close()
 }
 
 func (s *store) Write(fields []index.Field, itemID common.ItemID) error {
-	doc := bluge.NewDocument(string(convert.Uint64ToBytes(uint64(itemID))))
-	for _, f := range fields {
-		field := bluge.NewKeywordFieldBytes(f.Key.MarshalToStr(), f.Term).StoreValue().Sortable()
-		if f.Key.Analyzer != databasev1.IndexRule_ANALYZER_UNSPECIFIED {
-			field.WithAnalyzer(analyzers[f.Key.Analyzer])
-		}
-		doc.AddField(field)
+	if !s.closer.AddRunning() {
+		return nil
 	}
-	return s.writer.Insert(doc)
+	defer s.closer.Done()
+	select {
+	case <-s.closer.CloseNotify():
+	case s.ch <- doc{
+		fields: fields,
+		itemID: itemID,
+	}:
+	}
+	return nil
 }
 
 func (s *store) Iterator(fieldKey index.FieldKey, termRange index.RangeOpts, order modelv1.Sort) (iter index.FieldIterator, err error) {
@@ -204,6 +230,71 @@ func (s *store) Range(fieldKey index.FieldKey, opts index.RangeOpts) (list posti
 	return
 }
 
+func (s *store) run() {
+	go func() {
+		defer s.closer.Done()
+		size := 0
+		batch := bluge.NewBatch()
+		flush := func() {
+			if size < 1 {
+				return
+			}
+			if err := s.writer.Batch(batch); err != nil {
+				s.l.Error().Err(err).Msg("write to the inverted index")
+			}
+			batch.Reset()
+			size = 0
+		}
+		for {
+			timer := time.NewTimer(time.Second)
+			select {
+			case <-s.closer.CloseNotify():
+				return
+			case event, more := <-s.ch:
+				if !more {
+					return
+				}
+				switch d := event.(type) {
+				case flushEvent:
+					flush()
+					close(d.onComplete)
+				case doc:
+					doc := bluge.NewDocument(string(convert.Uint64ToBytes(uint64(d.itemID))))
+					for _, f := range d.fields {
+						field := bluge.NewKeywordFieldBytes(f.Key.MarshalToStr(), f.Term).StoreValue().Sortable()
+						if f.Key.Analyzer != databasev1.IndexRule_ANALYZER_UNSPECIFIED {
+							field.WithAnalyzer(analyzers[f.Key.Analyzer])
+						}
+						doc.AddField(field)
+					}
+					size++
+					if size >= batchSize {
+						flush()
+					} else {
+						batch.Insert(doc)
+					}
+				}
+
+			case <-timer.C:
+				flush()
+			}
+		}
+	}()
+}
+
+func (s *store) flush() {
+	if !s.closer.AddRunning() {
+		return
+	}
+	defer s.closer.Done()
+	onComplete := make(chan struct{})
+	select {
+	case <-s.closer.CloseNotify():
+	case s.ch <- flushEvent{onComplete: onComplete}:
+	}
+	<-onComplete
+}
+
 type blugeMatchIterator struct {
 	delegated search.DocumentMatchIterator
 	fieldKey  string
diff --git a/pkg/index/inverted/inverted_test.go b/pkg/index/inverted/inverted_test.go
index fda5215..d6cac53 100644
--- a/pkg/index/inverted/inverted_test.go
+++ b/pkg/index/inverted/inverted_test.go
@@ -65,6 +65,7 @@ func TestStore_Match(t *testing.T) {
 		Key:  serviceName,
 		Term: []byte("org.apache.skywalking.examples.OrderService.order"),
 	}}, common.ItemID(3)))
+	s.(*store).flush()
 
 	tests := []struct {
 		matches []string
@@ -152,6 +153,7 @@ func TestStore_MatchTerm(t *testing.T) {
 	}()
 	tester.NoError(err)
 	testcases.SetUp(tester, s)
+	s.(*store).flush()
 	testcases.RunServiceName(t, s)
 }
 
@@ -168,6 +170,7 @@ func TestStore_Iterator(t *testing.T) {
 	}()
 	tester.NoError(err)
 	data := testcases.SetUpDuration(tester, s)
+	s.(*store).flush()
 	testcases.RunDuration(t, data, s)
 }