You are viewing a plain text version of this content. The canonical link for it is here (the hyperlink itself was not preserved in this plain-text copy).
Posted to notifications@skywalking.apache.org by ha...@apache.org on 2022/11/21 13:50:41 UTC
[skywalking-banyandb] 01/01: Sync inverted index writing
This is an automated email from the ASF dual-hosted git repository.
hanahmily pushed a commit to branch index
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
commit bef1b550a211c8fe2cd5c84a03f2e18d587fe78f
Author: Gao Hongtao <ha...@gmail.com>
AuthorDate: Mon Nov 21 13:48:45 2022 +0000
Sync inverted index writing
Signed-off-by: Gao Hongtao <ha...@gmail.com>
---
banyand/measure/measure_query.go | 1 -
banyand/measure/measure_write.go | 23 +-------
banyand/stream/stream_query.go | 2 -
banyand/stream/stream_write.go | 22 +------
banyand/tsdb/index/writer.go | 56 +++++-------------
pkg/index/inverted/inverted.go | 113 ++++++++++++++++++++++++++++++++----
pkg/index/inverted/inverted_test.go | 3 +
7 files changed, 122 insertions(+), 98 deletions(-)
diff --git a/banyand/measure/measure_query.go b/banyand/measure/measure_query.go
index 89f1b13..90f77c5 100644
--- a/banyand/measure/measure_query.go
+++ b/banyand/measure/measure_query.go
@@ -44,7 +44,6 @@ type Query interface {
type Measure interface {
io.Closer
- Write(value *measurev1.DataPointValue) error
Shards(entity tsdb.Entity) ([]tsdb.Shard, error)
CompanionShards(metadata *commonv1.Metadata) ([]tsdb.Shard, error)
Shard(id common.ShardID) (tsdb.Shard, error)
diff --git a/banyand/measure/measure_write.go b/banyand/measure/measure_write.go
index 7cd8224..0e4b4fe 100644
--- a/banyand/measure/measure_write.go
+++ b/banyand/measure/measure_write.go
@@ -38,25 +38,7 @@ import (
var ErrMalformedElement = errors.New("element is malformed")
-// Write is for testing
-func (s *measure) Write(value *measurev1.DataPointValue) error {
- entity, shardID, err := s.entityLocator.Locate(s.name, value.GetTagFamilies(), s.shardNum)
- if err != nil {
- return err
- }
- waitCh := make(chan struct{})
- err = s.write(shardID, tsdb.HashEntity(entity), value, func() {
- close(waitCh)
- })
- if err != nil {
- close(waitCh)
- return err
- }
- <-waitCh
- return nil
-}
-
-func (s *measure) write(shardID common.ShardID, seriesHashKey []byte, value *measurev1.DataPointValue, cb index.CallbackFn) error {
+func (s *measure) write(shardID common.ShardID, seriesHashKey []byte, value *measurev1.DataPointValue) error {
t := value.GetTimestamp().AsTime().Local()
if err := timestamp.Check(t); err != nil {
return errors.WithMessage(err, "writing stream")
@@ -141,7 +123,6 @@ func (s *measure) write(shardID common.ShardID, seriesHashKey []byte, value *mea
Timestamp: value.GetTimestamp().AsTime(),
},
BlockCloser: wp,
- Cb: cb,
}
s.indexWriter.Write(m)
s.processorManager.onMeasureWrite(&measurev1.WriteRequest{
@@ -175,7 +156,7 @@ func (w *writeCallback) Rev(message bus.Message) (resp bus.Message) {
w.l.Warn().Msg("cannot find measure definition")
return
}
- err := stm.write(common.ShardID(writeEvent.GetShardId()), writeEvent.GetSeriesHash(), writeEvent.GetRequest().GetDataPoint(), nil)
+ err := stm.write(common.ShardID(writeEvent.GetShardId()), writeEvent.GetSeriesHash(), writeEvent.GetRequest().GetDataPoint())
if err != nil {
w.l.Error().Err(err).Msg("fail to write entity")
}
diff --git a/banyand/stream/stream_query.go b/banyand/stream/stream_query.go
index 218fc6c..1f89be8 100644
--- a/banyand/stream/stream_query.go
+++ b/banyand/stream/stream_query.go
@@ -27,7 +27,6 @@ import (
commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
- streamv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/stream/v1"
"github.com/apache/skywalking-banyandb/banyand/tsdb"
"github.com/apache/skywalking-banyandb/pkg/partition"
)
@@ -40,7 +39,6 @@ type Query interface {
type Stream interface {
io.Closer
- Write(value *streamv1.ElementValue) error
Shards(entity tsdb.Entity) ([]tsdb.Shard, error)
Shard(id common.ShardID) (tsdb.Shard, error)
ParseTagFamily(family string, item tsdb.Item) (*modelv1.TagFamily, error)
diff --git a/banyand/stream/stream_write.go b/banyand/stream/stream_write.go
index af03580..ea4098c 100644
--- a/banyand/stream/stream_write.go
+++ b/banyand/stream/stream_write.go
@@ -51,24 +51,7 @@ func init() {
)
}
-func (s *stream) Write(value *streamv1.ElementValue) error {
- entity, shardID, err := s.entityLocator.Locate(s.name, value.GetTagFamilies(), s.shardNum)
- if err != nil {
- return err
- }
- waitCh := make(chan struct{})
- err = s.write(shardID, tsdb.HashEntity(entity), value, func() {
- close(waitCh)
- })
- if err != nil {
- close(waitCh)
- return err
- }
- <-waitCh
- return nil
-}
-
-func (s *stream) write(shardID common.ShardID, seriesHashKey []byte, value *streamv1.ElementValue, cb index.CallbackFn) error {
+func (s *stream) write(shardID common.ShardID, seriesHashKey []byte, value *streamv1.ElementValue) error {
tp := value.GetTimestamp().AsTime()
if err := timestamp.Check(tp); err != nil {
return errors.WithMessage(err, "writing stream")
@@ -142,7 +125,6 @@ func (s *stream) write(shardID common.ShardID, seriesHashKey []byte, value *stre
Timestamp: value.GetTimestamp().AsTime(),
},
BlockCloser: wp,
- Cb: cb,
}
s.indexWriter.Write(m)
return err
@@ -172,7 +154,7 @@ func (w *writeCallback) Rev(message bus.Message) (resp bus.Message) {
w.l.Warn().Msg("cannot find stream definition")
return
}
- err := stm.write(common.ShardID(writeEvent.GetShardId()), writeEvent.GetSeriesHash(), writeEvent.GetRequest().GetElement(), nil)
+ err := stm.write(common.ShardID(writeEvent.GetShardId()), writeEvent.GetSeriesHash(), writeEvent.GetRequest().GetElement())
if err != nil {
w.l.Error().Err(err).Msg("fail to write entity")
}
diff --git a/banyand/tsdb/index/writer.go b/banyand/tsdb/index/writer.go
index 1dfd30b..a410bd8 100644
--- a/banyand/tsdb/index/writer.go
+++ b/banyand/tsdb/index/writer.go
@@ -33,7 +33,6 @@ import (
"github.com/apache/skywalking-banyandb/pkg/logger"
"github.com/apache/skywalking-banyandb/pkg/partition"
pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
- "github.com/apache/skywalking-banyandb/pkg/run"
)
type CallbackFn func()
@@ -43,7 +42,6 @@ type Message struct {
Value Value
LocalWriter tsdb.Writer
BlockCloser io.Closer
- Cb CallbackFn
}
type Value struct {
@@ -71,9 +69,7 @@ type Writer struct {
db tsdb.Supplier
shardNum uint32
enableGlobalIndex bool
- ch chan Message
invertRuleIndex map[byte][]*partition.IndexRuleLocator
- closer *run.Closer
}
func NewWriter(ctx context.Context, options WriterOptions) *Writer {
@@ -93,68 +89,42 @@ func NewWriter(ctx context.Context, options WriterOptions) *Writer {
var key byte
switch rule.GetLocation() {
case databasev1.IndexRule_LOCATION_SERIES:
- key = key | local
+ key |= local
case databasev1.IndexRule_LOCATION_GLOBAL:
if !w.enableGlobalIndex {
w.l.Warn().RawJSON("index-rule", logger.Proto(ruleIndex.Rule)).Msg("global index is disabled")
continue
}
- key = key | global
+ key |= global
}
switch rule.Type {
case databasev1.IndexRule_TYPE_INVERTED:
- key = key | inverted
+ key |= inverted
case databasev1.IndexRule_TYPE_TREE:
- key = key | tree
+ key |= tree
}
rules := w.invertRuleIndex[key]
rules = append(rules, ruleIndex)
w.invertRuleIndex[key] = rules
}
- w.ch = make(chan Message)
- w.bootIndexGenerator()
- w.closer = run.NewCloser(0)
return w
}
-func (s *Writer) Write(value Message) {
- go func(m Message) {
- if !s.closer.AddRunning() {
- return
- }
- defer s.closer.Done()
- select {
- case <-s.closer.CloseNotify():
- return
- case s.ch <- m:
- }
- }(value)
+// Write builds the local and global indices for m on the caller's goroutine and
+// then closes m.BlockCloser. All three steps always run (multierr.Combine
+// evaluates every argument), so the block is closed even if indexing fails.
+// Errors are combined and logged, not returned. m.LocalWriter must be non-nil.
+func (s *Writer) Write(m Message) {
+ err := multierr.Combine(
+ s.writeLocalIndex(m.LocalWriter, m.Value),
+ s.writeGlobalIndex(m.Scope, m.LocalWriter.ItemID(), m.Value),
+ m.BlockCloser.Close(),
+ )
+ if err != nil {
+ s.l.Error().Err(err).Msg("encounter some errors when generating indices")
+ }
}
+// Close always returns nil and releases nothing. Write does all of its work
+// synchronously on the caller's goroutine, so there is nothing to stop or wait for.
func (s *Writer) Close() error {
- s.closer.CloseThenWait()
- close(s.ch)
return nil
}
-func (s *Writer) bootIndexGenerator() {
- go func() {
- for m := range s.ch {
- err := multierr.Combine(
- s.writeLocalIndex(m.LocalWriter, m.Value),
- s.writeGlobalIndex(m.Scope, m.LocalWriter.ItemID(), m.Value),
- m.BlockCloser.Close(),
- )
- if err != nil {
- s.l.Error().Err(err).Msg("encounter some errors when generating indices")
- }
- if m.Cb != nil {
- m.Cb()
- }
- }
- }()
-}
-
// TODO: should listen to pipeline in a distributed cluster
func (s *Writer) writeGlobalIndex(scope tsdb.Entry, ref tsdb.GlobalItemID, value Value) error {
collect := func(ruleIndexes []*partition.IndexRuleLocator, fn func(indexWriter tsdb.IndexWriter, fields []index.Field) error) error {
diff --git a/pkg/index/inverted/inverted.go b/pkg/index/inverted/inverted.go
index 4f680b8..31e57e9 100644
--- a/pkg/index/inverted/inverted.go
+++ b/pkg/index/inverted/inverted.go
@@ -23,6 +23,7 @@ import (
"errors"
"log"
"math"
+ "time"
"github.com/blugelabs/bluge"
"github.com/blugelabs/bluge/analysis"
@@ -40,9 +41,13 @@ import (
"github.com/apache/skywalking-banyandb/pkg/index/posting"
"github.com/apache/skywalking-banyandb/pkg/index/posting/roaring"
"github.com/apache/skywalking-banyandb/pkg/logger"
+ "github.com/apache/skywalking-banyandb/pkg/run"
)
-const docID = "_id"
+const (
+ docID = "_id"
+ // batchSize is both the capacity of the store's event channel and the number
+ // of queued documents that triggers a batch commit in the run goroutine.
+ batchSize = 1024
+)
var analyzers map[databasev1.IndexRule_Analyzer]*analysis.Analyzer
@@ -61,8 +66,20 @@ type StoreOpts struct {
Logger *logger.Logger
}
+// doc is an indexing request queued by Write for the run goroutine.
+type doc struct {
+ fields []index.Field
+ itemID common.ItemID
+}
+
+// flushEvent asks the run goroutine to commit its pending batch. The goroutine
+// closes onComplete once the commit has been attempted.
+type flushEvent struct {
+ onComplete chan struct{}
+}
+
+// store implements index.Store on top of a bluge writer. Writes are queued on ch
+// and applied in batches by the goroutine started in run.
type store struct {
writer *bluge.Writer
+ // ch carries doc and flushEvent values to the run goroutine.
+ ch chan any
+ // closer tracks the run goroutine and in-flight Write/flush calls so that
+ // Close can wait for them.
+ closer *run.Closer
+ l *logger.Logger
}
func NewStore(opts StoreOpts) (index.Store, error) {
@@ -73,9 +90,14 @@ func NewStore(opts StoreOpts) (index.Store, error) {
if err != nil {
return nil, err
}
- return &store{
+ s := &store{
writer: w,
- }, nil
+ l: opts.Logger,
+ ch: make(chan any, batchSize),
+ closer: run.NewCloser(1),
+ }
+ s.run()
+ return s, nil
}
func (s *store) Stats() observability.Statistics {
@@ -83,19 +105,23 @@ func (s *store) Stats() observability.Statistics {
}
+// Close signals the run goroutine and any in-flight Write/flush calls to stop.
+// It waits for them via the closer (CloseThenWait; presumably it blocks until
+// every AddRunning holder has called Done, so confirm against pkg/run) and then
+// closes the underlying bluge writer.
func (s *store) Close() error {
+ s.closer.CloseThenWait()
return s.writer.Close()
}
+// errStoreClosed is returned by Write once the store is closing or closed.
+var errStoreClosed = errors.New("inverted index store is closed")
+
+// Write queues fields for itemID for the run goroutine and returns without
+// waiting for them to be indexed. The document becomes searchable only after
+// the next batch commit or an explicit flush.
+//
+// The fields slice (including each Term) is retained until it is processed, so
+// callers must not modify it after Write returns. Write returns errStoreClosed
+// instead of silently dropping the document when the store is shutting down.
func (s *store) Write(fields []index.Field, itemID common.ItemID) error {
- doc := bluge.NewDocument(string(convert.Uint64ToBytes(uint64(itemID))))
- for _, f := range fields {
- field := bluge.NewKeywordFieldBytes(f.Key.MarshalToStr(), f.Term).StoreValue().Sortable()
- if f.Key.Analyzer != databasev1.IndexRule_ANALYZER_UNSPECIFIED {
- field.WithAnalyzer(analyzers[f.Key.Analyzer])
- }
- doc.AddField(field)
- }
- return s.writer.Insert(doc)
+	if !s.closer.AddRunning() {
+		return errStoreClosed
+	}
+	defer s.closer.Done()
+	select {
+	case <-s.closer.CloseNotify():
+		return errStoreClosed
+	case s.ch <- doc{fields: fields, itemID: itemID}:
+		return nil
+	}
}
func (s *store) Iterator(fieldKey index.FieldKey, termRange index.RangeOpts, order modelv1.Sort) (iter index.FieldIterator, err error) {
@@ -204,6 +230,71 @@ func (s *store) Range(fieldKey index.FieldKey, opts index.RangeOpts) (list posti
return
}
+// run starts the goroutine that owns the bluge batch. It collects documents
+// queued by Write and commits them in four cases:
+//   - when the batch reaches batchSize,
+//   - when a flushEvent arrives,
+//   - on every one-second tick,
+//   - one last time when the store is closed.
+// On close it first drains the events already buffered in ch. The goroutine
+// accounts for the initial running slot of s.closer and releases it on exit.
+func (s *store) run() {
+	go func() {
+		defer s.closer.Done()
+		size := 0
+		batch := bluge.NewBatch()
+		// commit writes the pending batch, if any, and resets it for reuse.
+		commit := func() {
+			if size < 1 {
+				return
+			}
+			if err := s.writer.Batch(batch); err != nil {
+				s.l.Error().Err(err).Msg("write to the inverted index")
+			}
+			batch.Reset()
+			size = 0
+		}
+		// handle applies one event received from s.ch.
+		handle := func(event any) {
+			switch e := event.(type) {
+			case flushEvent:
+				commit()
+				close(e.onComplete)
+			case doc:
+				d := bluge.NewDocument(string(convert.Uint64ToBytes(uint64(e.itemID))))
+				for _, f := range e.fields {
+					field := bluge.NewKeywordFieldBytes(f.Key.MarshalToStr(), f.Term).StoreValue().Sortable()
+					if f.Key.Analyzer != databasev1.IndexRule_ANALYZER_UNSPECIFIED {
+						field.WithAnalyzer(analyzers[f.Key.Analyzer])
+					}
+					d.AddField(field)
+				}
+				// Insert before counting. The document that fills the batch must be
+				// part of the commit; committing instead of inserting drops it.
+				batch.Insert(d)
+				size++
+				if size >= batchSize {
+					commit()
+				}
+			}
+		}
+		// Use one ticker rather than a timer recreated per event. A fresh timer is
+		// re-armed by every event, so a steady trickle of writes would never hit
+		// the time-based commit. The ticker bounds how long a document stays
+		// unindexed.
+		ticker := time.NewTicker(time.Second)
+		defer ticker.Stop()
+		for {
+			select {
+			case <-s.closer.CloseNotify():
+				// Drain events queued before Close so they are committed, not lost.
+				for {
+					select {
+					case event, more := <-s.ch:
+						if !more {
+							commit()
+							return
+						}
+						handle(event)
+					default:
+						commit()
+						return
+					}
+				}
+			case event, more := <-s.ch:
+				if !more {
+					commit()
+					return
+				}
+				handle(event)
+			case <-ticker.C:
+				commit()
+			}
+		}
+	}()
+}
+
+// flush asks the run goroutine to commit everything queued so far and waits
+// until it has done so. It returns early, without waiting for the commit, if the
+// store is closing or closed.
+func (s *store) flush() {
+	if !s.closer.AddRunning() {
+		return
+	}
+	defer s.closer.Done()
+	onComplete := make(chan struct{})
+	select {
+	case <-s.closer.CloseNotify():
+		// The event was never queued, so nobody would ever close onComplete.
+		return
+	case s.ch <- flushEvent{onComplete: onComplete}:
+	}
+	// The run goroutine may exit on Close before it reaches this event. An
+	// unconditional wait would block forever and, because this call still holds
+	// a running slot on the closer, would also hold up Close.
+	select {
+	case <-onComplete:
+	case <-s.closer.CloseNotify():
+	}
+}
+
type blugeMatchIterator struct {
delegated search.DocumentMatchIterator
fieldKey string
diff --git a/pkg/index/inverted/inverted_test.go b/pkg/index/inverted/inverted_test.go
index fda5215..d6cac53 100644
--- a/pkg/index/inverted/inverted_test.go
+++ b/pkg/index/inverted/inverted_test.go
@@ -65,6 +65,7 @@ func TestStore_Match(t *testing.T) {
Key: serviceName,
Term: []byte("org.apache.skywalking.examples.OrderService.order"),
}}, common.ItemID(3)))
+ s.(*store).flush()
tests := []struct {
matches []string
@@ -152,6 +153,7 @@ func TestStore_MatchTerm(t *testing.T) {
}()
tester.NoError(err)
testcases.SetUp(tester, s)
+ s.(*store).flush()
testcases.RunServiceName(t, s)
}
@@ -168,6 +170,7 @@ func TestStore_Iterator(t *testing.T) {
}()
tester.NoError(err)
data := testcases.SetUpDuration(tester, s)
+ s.(*store).flush()
testcases.RunDuration(t, data, s)
}