You are viewing a plain text version of this content. The canonical link for it was not preserved in this plain-text copy.
Posted to notifications@skywalking.apache.org by ha...@apache.org on 2022/11/21 13:50:41 UTC

[skywalking-banyandb] 01/01: Sync inverted index writing

This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch index
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git

commit bef1b550a211c8fe2cd5c84a03f2e18d587fe78f
Author: Gao Hongtao <ha...@gmail.com>
AuthorDate: Mon Nov 21 13:48:45 2022 +0000

    Sync inverted index writing
    
    Signed-off-by: Gao Hongtao <ha...@gmail.com>
---
 banyand/measure/measure_query.go    |   1 -
 banyand/measure/measure_write.go    |  23 +-------
 banyand/stream/stream_query.go      |   2 -
 banyand/stream/stream_write.go      |  22 +------
 banyand/tsdb/index/writer.go        |  56 +++++-------------
 pkg/index/inverted/inverted.go      | 113 ++++++++++++++++++++++++++++++++----
 pkg/index/inverted/inverted_test.go |   3 +
 7 files changed, 122 insertions(+), 98 deletions(-)

diff --git a/banyand/measure/measure_query.go b/banyand/measure/measure_query.go
index 89f1b13..90f77c5 100644
--- a/banyand/measure/measure_query.go
+++ b/banyand/measure/measure_query.go
@@ -44,7 +44,6 @@ type Query interface {
 
 type Measure interface {
 	io.Closer
-	Write(value *measurev1.DataPointValue) error
 	Shards(entity tsdb.Entity) ([]tsdb.Shard, error)
 	CompanionShards(metadata *commonv1.Metadata) ([]tsdb.Shard, error)
 	Shard(id common.ShardID) (tsdb.Shard, error)
diff --git a/banyand/measure/measure_write.go b/banyand/measure/measure_write.go
index 7cd8224..0e4b4fe 100644
--- a/banyand/measure/measure_write.go
+++ b/banyand/measure/measure_write.go
@@ -38,25 +38,7 @@ import (
 
 var ErrMalformedElement = errors.New("element is malformed")
 
-// Write is for testing
-func (s *measure) Write(value *measurev1.DataPointValue) error {
-	entity, shardID, err := s.entityLocator.Locate(s.name, value.GetTagFamilies(), s.shardNum)
-	if err != nil {
-		return err
-	}
-	waitCh := make(chan struct{})
-	err = s.write(shardID, tsdb.HashEntity(entity), value, func() {
-		close(waitCh)
-	})
-	if err != nil {
-		close(waitCh)
-		return err
-	}
-	<-waitCh
-	return nil
-}
-
-func (s *measure) write(shardID common.ShardID, seriesHashKey []byte, value *measurev1.DataPointValue, cb index.CallbackFn) error {
+func (s *measure) write(shardID common.ShardID, seriesHashKey []byte, value *measurev1.DataPointValue) error {
 	t := value.GetTimestamp().AsTime().Local()
 	if err := timestamp.Check(t); err != nil {
 		return errors.WithMessage(err, "writing stream")
@@ -141,7 +123,6 @@ func (s *measure) write(shardID common.ShardID, seriesHashKey []byte, value *mea
 			Timestamp:   value.GetTimestamp().AsTime(),
 		},
 		BlockCloser: wp,
-		Cb:          cb,
 	}
 	s.indexWriter.Write(m)
 	s.processorManager.onMeasureWrite(&measurev1.WriteRequest{
@@ -175,7 +156,7 @@ func (w *writeCallback) Rev(message bus.Message) (resp bus.Message) {
 		w.l.Warn().Msg("cannot find measure definition")
 		return
 	}
-	err := stm.write(common.ShardID(writeEvent.GetShardId()), writeEvent.GetSeriesHash(), writeEvent.GetRequest().GetDataPoint(), nil)
+	err := stm.write(common.ShardID(writeEvent.GetShardId()), writeEvent.GetSeriesHash(), writeEvent.GetRequest().GetDataPoint())
 	if err != nil {
 		w.l.Error().Err(err).Msg("fail to write entity")
 	}
diff --git a/banyand/stream/stream_query.go b/banyand/stream/stream_query.go
index 218fc6c..1f89be8 100644
--- a/banyand/stream/stream_query.go
+++ b/banyand/stream/stream_query.go
@@ -27,7 +27,6 @@ import (
 	commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
 	databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
 	modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
-	streamv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/stream/v1"
 	"github.com/apache/skywalking-banyandb/banyand/tsdb"
 	"github.com/apache/skywalking-banyandb/pkg/partition"
 )
@@ -40,7 +39,6 @@ type Query interface {
 
 type Stream interface {
 	io.Closer
-	Write(value *streamv1.ElementValue) error
 	Shards(entity tsdb.Entity) ([]tsdb.Shard, error)
 	Shard(id common.ShardID) (tsdb.Shard, error)
 	ParseTagFamily(family string, item tsdb.Item) (*modelv1.TagFamily, error)
diff --git a/banyand/stream/stream_write.go b/banyand/stream/stream_write.go
index af03580..ea4098c 100644
--- a/banyand/stream/stream_write.go
+++ b/banyand/stream/stream_write.go
@@ -51,24 +51,7 @@ func init() {
 	)
 }
 
-func (s *stream) Write(value *streamv1.ElementValue) error {
-	entity, shardID, err := s.entityLocator.Locate(s.name, value.GetTagFamilies(), s.shardNum)
-	if err != nil {
-		return err
-	}
-	waitCh := make(chan struct{})
-	err = s.write(shardID, tsdb.HashEntity(entity), value, func() {
-		close(waitCh)
-	})
-	if err != nil {
-		close(waitCh)
-		return err
-	}
-	<-waitCh
-	return nil
-}
-
-func (s *stream) write(shardID common.ShardID, seriesHashKey []byte, value *streamv1.ElementValue, cb index.CallbackFn) error {
+func (s *stream) write(shardID common.ShardID, seriesHashKey []byte, value *streamv1.ElementValue) error {
 	tp := value.GetTimestamp().AsTime()
 	if err := timestamp.Check(tp); err != nil {
 		return errors.WithMessage(err, "writing stream")
@@ -142,7 +125,6 @@ func (s *stream) write(shardID common.ShardID, seriesHashKey []byte, value *stre
 			Timestamp:   value.GetTimestamp().AsTime(),
 		},
 		BlockCloser: wp,
-		Cb:          cb,
 	}
 	s.indexWriter.Write(m)
 	return err
@@ -172,7 +154,7 @@ func (w *writeCallback) Rev(message bus.Message) (resp bus.Message) {
 		w.l.Warn().Msg("cannot find stream definition")
 		return
 	}
-	err := stm.write(common.ShardID(writeEvent.GetShardId()), writeEvent.GetSeriesHash(), writeEvent.GetRequest().GetElement(), nil)
+	err := stm.write(common.ShardID(writeEvent.GetShardId()), writeEvent.GetSeriesHash(), writeEvent.GetRequest().GetElement())
 	if err != nil {
 		w.l.Error().Err(err).Msg("fail to write entity")
 	}
diff --git a/banyand/tsdb/index/writer.go b/banyand/tsdb/index/writer.go
index 1dfd30b..a410bd8 100644
--- a/banyand/tsdb/index/writer.go
+++ b/banyand/tsdb/index/writer.go
@@ -33,7 +33,6 @@ import (
 	"github.com/apache/skywalking-banyandb/pkg/logger"
 	"github.com/apache/skywalking-banyandb/pkg/partition"
 	pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
-	"github.com/apache/skywalking-banyandb/pkg/run"
 )
 
 type CallbackFn func()
@@ -43,7 +42,6 @@ type Message struct {
 	Value       Value
 	LocalWriter tsdb.Writer
 	BlockCloser io.Closer
-	Cb          CallbackFn
 }
 
 type Value struct {
@@ -71,9 +69,7 @@ type Writer struct {
 	db                tsdb.Supplier
 	shardNum          uint32
 	enableGlobalIndex bool
-	ch                chan Message
 	invertRuleIndex   map[byte][]*partition.IndexRuleLocator
-	closer            *run.Closer
 }
 
 func NewWriter(ctx context.Context, options WriterOptions) *Writer {
@@ -93,68 +89,42 @@ func NewWriter(ctx context.Context, options WriterOptions) *Writer {
 		var key byte
 		switch rule.GetLocation() {
 		case databasev1.IndexRule_LOCATION_SERIES:
-			key = key | local
+			key |= local
 		case databasev1.IndexRule_LOCATION_GLOBAL:
 			if !w.enableGlobalIndex {
 				w.l.Warn().RawJSON("index-rule", logger.Proto(ruleIndex.Rule)).Msg("global index is disabled")
 				continue
 			}
-			key = key | global
+			key |= global
 		}
 		switch rule.Type {
 		case databasev1.IndexRule_TYPE_INVERTED:
-			key = key | inverted
+			key |= inverted
 		case databasev1.IndexRule_TYPE_TREE:
-			key = key | tree
+			key |= tree
 		}
 		rules := w.invertRuleIndex[key]
 		rules = append(rules, ruleIndex)
 		w.invertRuleIndex[key] = rules
 	}
-	w.ch = make(chan Message)
-	w.bootIndexGenerator()
-	w.closer = run.NewCloser(0)
 	return w
 }
 
-func (s *Writer) Write(value Message) {
-	go func(m Message) {
-		if !s.closer.AddRunning() {
-			return
-		}
-		defer s.closer.Done()
-		select {
-		case <-s.closer.CloseNotify():
-			return
-		case s.ch <- m:
-		}
-	}(value)
+func (s *Writer) Write(m Message) {
+	err := multierr.Combine(
+		s.writeLocalIndex(m.LocalWriter, m.Value),
+		s.writeGlobalIndex(m.Scope, m.LocalWriter.ItemID(), m.Value),
+		m.BlockCloser.Close(),
+	)
+	if err != nil {
+		s.l.Error().Err(err).Msg("encounter some errors when generating indices")
+	}
 }
 
 func (s *Writer) Close() error {
-	s.closer.CloseThenWait()
-	close(s.ch)
 	return nil
 }
 
-func (s *Writer) bootIndexGenerator() {
-	go func() {
-		for m := range s.ch {
-			err := multierr.Combine(
-				s.writeLocalIndex(m.LocalWriter, m.Value),
-				s.writeGlobalIndex(m.Scope, m.LocalWriter.ItemID(), m.Value),
-				m.BlockCloser.Close(),
-			)
-			if err != nil {
-				s.l.Error().Err(err).Msg("encounter some errors when generating indices")
-			}
-			if m.Cb != nil {
-				m.Cb()
-			}
-		}
-	}()
-}
-
 // TODO: should listen to pipeline in a distributed cluster
 func (s *Writer) writeGlobalIndex(scope tsdb.Entry, ref tsdb.GlobalItemID, value Value) error {
 	collect := func(ruleIndexes []*partition.IndexRuleLocator, fn func(indexWriter tsdb.IndexWriter, fields []index.Field) error) error {
diff --git a/pkg/index/inverted/inverted.go b/pkg/index/inverted/inverted.go
index 4f680b8..31e57e9 100644
--- a/pkg/index/inverted/inverted.go
+++ b/pkg/index/inverted/inverted.go
@@ -23,6 +23,7 @@ import (
 	"errors"
 	"log"
 	"math"
+	"time"
 
 	"github.com/blugelabs/bluge"
 	"github.com/blugelabs/bluge/analysis"
@@ -40,9 +41,13 @@ import (
 	"github.com/apache/skywalking-banyandb/pkg/index/posting"
 	"github.com/apache/skywalking-banyandb/pkg/index/posting/roaring"
 	"github.com/apache/skywalking-banyandb/pkg/logger"
+	"github.com/apache/skywalking-banyandb/pkg/run"
 )
 
-const docID = "_id"
+const (
+	docID     = "_id"
+	batchSize = 1024
+)
 
 var analyzers map[databasev1.IndexRule_Analyzer]*analysis.Analyzer
 
@@ -61,8 +66,20 @@ type StoreOpts struct {
 	Logger *logger.Logger
 }
 
+type doc struct {
+	fields []index.Field
+	itemID common.ItemID
+}
+
+type flushEvent struct {
+	onComplete chan struct{}
+}
+
 type store struct {
 	writer *bluge.Writer
+	ch     chan any
+	closer *run.Closer
+	l      *logger.Logger
 }
 
 func NewStore(opts StoreOpts) (index.Store, error) {
@@ -73,9 +90,14 @@ func NewStore(opts StoreOpts) (index.Store, error) {
 	if err != nil {
 		return nil, err
 	}
-	return &store{
+	s := &store{
 		writer: w,
-	}, nil
+		l:      opts.Logger,
+		ch:     make(chan any, batchSize),
+		closer: run.NewCloser(1),
+	}
+	s.run()
+	return s, nil
 }
 
 func (s *store) Stats() observability.Statistics {
@@ -83,19 +105,23 @@ func (s *store) Stats() observability.Statistics {
 }
 
 func (s *store) Close() error {
+	s.closer.CloseThenWait()
 	return s.writer.Close()
 }
 
 func (s *store) Write(fields []index.Field, itemID common.ItemID) error {
-	doc := bluge.NewDocument(string(convert.Uint64ToBytes(uint64(itemID))))
-	for _, f := range fields {
-		field := bluge.NewKeywordFieldBytes(f.Key.MarshalToStr(), f.Term).StoreValue().Sortable()
-		if f.Key.Analyzer != databasev1.IndexRule_ANALYZER_UNSPECIFIED {
-			field.WithAnalyzer(analyzers[f.Key.Analyzer])
-		}
-		doc.AddField(field)
+	if !s.closer.AddRunning() {
+		return nil
 	}
-	return s.writer.Insert(doc)
+	defer s.closer.Done()
+	select {
+	case <-s.closer.CloseNotify():
+	case s.ch <- doc{
+		fields: fields,
+		itemID: itemID,
+	}:
+	}
+	return nil
 }
 
 func (s *store) Iterator(fieldKey index.FieldKey, termRange index.RangeOpts, order modelv1.Sort) (iter index.FieldIterator, err error) {
@@ -204,6 +230,71 @@ func (s *store) Range(fieldKey index.FieldKey, opts index.RangeOpts) (list posti
 	return
 }
 
+func (s *store) run() {
+	go func() {
+		defer s.closer.Done()
+		size := 0
+		batch := bluge.NewBatch()
+		flush := func() {
+			if size < 1 {
+				return
+			}
+			if err := s.writer.Batch(batch); err != nil {
+				s.l.Error().Err(err).Msg("write to the inverted index")
+			}
+			batch.Reset()
+			size = 0
+		}
+		for {
+			timer := time.NewTimer(time.Second)
+			select {
+			case <-s.closer.CloseNotify():
+				return
+			case event, more := <-s.ch:
+				if !more {
+					return
+				}
+				switch d := event.(type) {
+				case flushEvent:
+					flush()
+					close(d.onComplete)
+				case doc:
+					doc := bluge.NewDocument(string(convert.Uint64ToBytes(uint64(d.itemID))))
+					for _, f := range d.fields {
+						field := bluge.NewKeywordFieldBytes(f.Key.MarshalToStr(), f.Term).StoreValue().Sortable()
+						if f.Key.Analyzer != databasev1.IndexRule_ANALYZER_UNSPECIFIED {
+							field.WithAnalyzer(analyzers[f.Key.Analyzer])
+						}
+						doc.AddField(field)
+					}
+					size++
+					if size >= batchSize {
+						flush()
+					} else {
+						batch.Insert(doc)
+					}
+				}
+
+			case <-timer.C:
+				flush()
+			}
+		}
+	}()
+}
+
+func (s *store) flush() {
+	if !s.closer.AddRunning() {
+		return
+	}
+	defer s.closer.Done()
+	onComplete := make(chan struct{})
+	select {
+	case <-s.closer.CloseNotify():
+	case s.ch <- flushEvent{onComplete: onComplete}:
+	}
+	<-onComplete
+}
+
 type blugeMatchIterator struct {
 	delegated search.DocumentMatchIterator
 	fieldKey  string
diff --git a/pkg/index/inverted/inverted_test.go b/pkg/index/inverted/inverted_test.go
index fda5215..d6cac53 100644
--- a/pkg/index/inverted/inverted_test.go
+++ b/pkg/index/inverted/inverted_test.go
@@ -65,6 +65,7 @@ func TestStore_Match(t *testing.T) {
 		Key:  serviceName,
 		Term: []byte("org.apache.skywalking.examples.OrderService.order"),
 	}}, common.ItemID(3)))
+	s.(*store).flush()
 
 	tests := []struct {
 		matches []string
@@ -152,6 +153,7 @@ func TestStore_MatchTerm(t *testing.T) {
 	}()
 	tester.NoError(err)
 	testcases.SetUp(tester, s)
+	s.(*store).flush()
 	testcases.RunServiceName(t, s)
 }
 
@@ -168,6 +170,7 @@ func TestStore_Iterator(t *testing.T) {
 	}()
 	tester.NoError(err)
 	data := testcases.SetUpDuration(tester, s)
+	s.(*store).flush()
 	testcases.RunDuration(t, data, s)
 }