You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by ha...@apache.org on 2022/11/21 13:50:40 UTC
[skywalking-banyandb] branch index created (now bef1b55)
This is an automated email from the ASF dual-hosted git repository.
hanahmily pushed a change to branch index
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
at bef1b55 Sync inverted index writing
This branch includes the following new commits:
new bef1b55 Sync inverted index writing
The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email. The revisions
listed as "add" were already present in the repository and have only
been added to this reference.
[skywalking-banyandb] 01/01: Sync inverted index writing
Posted by ha...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
hanahmily pushed a commit to branch index
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
commit bef1b550a211c8fe2cd5c84a03f2e18d587fe78f
Author: Gao Hongtao <ha...@gmail.com>
AuthorDate: Mon Nov 21 13:48:45 2022 +0000
Sync inverted index writing
Signed-off-by: Gao Hongtao <ha...@gmail.com>
---
banyand/measure/measure_query.go | 1 -
banyand/measure/measure_write.go | 23 +-------
banyand/stream/stream_query.go | 2 -
banyand/stream/stream_write.go | 22 +------
banyand/tsdb/index/writer.go | 56 +++++-------------
pkg/index/inverted/inverted.go | 113 ++++++++++++++++++++++++++++++++----
pkg/index/inverted/inverted_test.go | 3 +
7 files changed, 122 insertions(+), 98 deletions(-)
diff --git a/banyand/measure/measure_query.go b/banyand/measure/measure_query.go
index 89f1b13..90f77c5 100644
--- a/banyand/measure/measure_query.go
+++ b/banyand/measure/measure_query.go
@@ -44,7 +44,6 @@ type Query interface {
type Measure interface {
io.Closer
- Write(value *measurev1.DataPointValue) error
Shards(entity tsdb.Entity) ([]tsdb.Shard, error)
CompanionShards(metadata *commonv1.Metadata) ([]tsdb.Shard, error)
Shard(id common.ShardID) (tsdb.Shard, error)
diff --git a/banyand/measure/measure_write.go b/banyand/measure/measure_write.go
index 7cd8224..0e4b4fe 100644
--- a/banyand/measure/measure_write.go
+++ b/banyand/measure/measure_write.go
@@ -38,25 +38,7 @@ import (
var ErrMalformedElement = errors.New("element is malformed")
-// Write is for testing
-func (s *measure) Write(value *measurev1.DataPointValue) error {
- entity, shardID, err := s.entityLocator.Locate(s.name, value.GetTagFamilies(), s.shardNum)
- if err != nil {
- return err
- }
- waitCh := make(chan struct{})
- err = s.write(shardID, tsdb.HashEntity(entity), value, func() {
- close(waitCh)
- })
- if err != nil {
- close(waitCh)
- return err
- }
- <-waitCh
- return nil
-}
-
-func (s *measure) write(shardID common.ShardID, seriesHashKey []byte, value *measurev1.DataPointValue, cb index.CallbackFn) error {
+func (s *measure) write(shardID common.ShardID, seriesHashKey []byte, value *measurev1.DataPointValue) error {
t := value.GetTimestamp().AsTime().Local()
if err := timestamp.Check(t); err != nil {
return errors.WithMessage(err, "writing stream")
@@ -141,7 +123,6 @@ func (s *measure) write(shardID common.ShardID, seriesHashKey []byte, value *mea
Timestamp: value.GetTimestamp().AsTime(),
},
BlockCloser: wp,
- Cb: cb,
}
s.indexWriter.Write(m)
s.processorManager.onMeasureWrite(&measurev1.WriteRequest{
@@ -175,7 +156,7 @@ func (w *writeCallback) Rev(message bus.Message) (resp bus.Message) {
w.l.Warn().Msg("cannot find measure definition")
return
}
- err := stm.write(common.ShardID(writeEvent.GetShardId()), writeEvent.GetSeriesHash(), writeEvent.GetRequest().GetDataPoint(), nil)
+ err := stm.write(common.ShardID(writeEvent.GetShardId()), writeEvent.GetSeriesHash(), writeEvent.GetRequest().GetDataPoint())
if err != nil {
w.l.Error().Err(err).Msg("fail to write entity")
}
diff --git a/banyand/stream/stream_query.go b/banyand/stream/stream_query.go
index 218fc6c..1f89be8 100644
--- a/banyand/stream/stream_query.go
+++ b/banyand/stream/stream_query.go
@@ -27,7 +27,6 @@ import (
commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
- streamv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/stream/v1"
"github.com/apache/skywalking-banyandb/banyand/tsdb"
"github.com/apache/skywalking-banyandb/pkg/partition"
)
@@ -40,7 +39,6 @@ type Query interface {
type Stream interface {
io.Closer
- Write(value *streamv1.ElementValue) error
Shards(entity tsdb.Entity) ([]tsdb.Shard, error)
Shard(id common.ShardID) (tsdb.Shard, error)
ParseTagFamily(family string, item tsdb.Item) (*modelv1.TagFamily, error)
diff --git a/banyand/stream/stream_write.go b/banyand/stream/stream_write.go
index af03580..ea4098c 100644
--- a/banyand/stream/stream_write.go
+++ b/banyand/stream/stream_write.go
@@ -51,24 +51,7 @@ func init() {
)
}
-func (s *stream) Write(value *streamv1.ElementValue) error {
- entity, shardID, err := s.entityLocator.Locate(s.name, value.GetTagFamilies(), s.shardNum)
- if err != nil {
- return err
- }
- waitCh := make(chan struct{})
- err = s.write(shardID, tsdb.HashEntity(entity), value, func() {
- close(waitCh)
- })
- if err != nil {
- close(waitCh)
- return err
- }
- <-waitCh
- return nil
-}
-
-func (s *stream) write(shardID common.ShardID, seriesHashKey []byte, value *streamv1.ElementValue, cb index.CallbackFn) error {
+func (s *stream) write(shardID common.ShardID, seriesHashKey []byte, value *streamv1.ElementValue) error {
tp := value.GetTimestamp().AsTime()
if err := timestamp.Check(tp); err != nil {
return errors.WithMessage(err, "writing stream")
@@ -142,7 +125,6 @@ func (s *stream) write(shardID common.ShardID, seriesHashKey []byte, value *stre
Timestamp: value.GetTimestamp().AsTime(),
},
BlockCloser: wp,
- Cb: cb,
}
s.indexWriter.Write(m)
return err
@@ -172,7 +154,7 @@ func (w *writeCallback) Rev(message bus.Message) (resp bus.Message) {
w.l.Warn().Msg("cannot find stream definition")
return
}
- err := stm.write(common.ShardID(writeEvent.GetShardId()), writeEvent.GetSeriesHash(), writeEvent.GetRequest().GetElement(), nil)
+ err := stm.write(common.ShardID(writeEvent.GetShardId()), writeEvent.GetSeriesHash(), writeEvent.GetRequest().GetElement())
if err != nil {
w.l.Error().Err(err).Msg("fail to write entity")
}
diff --git a/banyand/tsdb/index/writer.go b/banyand/tsdb/index/writer.go
index 1dfd30b..a410bd8 100644
--- a/banyand/tsdb/index/writer.go
+++ b/banyand/tsdb/index/writer.go
@@ -33,7 +33,6 @@ import (
"github.com/apache/skywalking-banyandb/pkg/logger"
"github.com/apache/skywalking-banyandb/pkg/partition"
pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
- "github.com/apache/skywalking-banyandb/pkg/run"
)
type CallbackFn func()
@@ -43,7 +42,6 @@ type Message struct {
Value Value
LocalWriter tsdb.Writer
BlockCloser io.Closer
- Cb CallbackFn
}
type Value struct {
@@ -71,9 +69,7 @@ type Writer struct {
db tsdb.Supplier
shardNum uint32
enableGlobalIndex bool
- ch chan Message
invertRuleIndex map[byte][]*partition.IndexRuleLocator
- closer *run.Closer
}
func NewWriter(ctx context.Context, options WriterOptions) *Writer {
@@ -93,68 +89,42 @@ func NewWriter(ctx context.Context, options WriterOptions) *Writer {
var key byte
switch rule.GetLocation() {
case databasev1.IndexRule_LOCATION_SERIES:
- key = key | local
+ key |= local
case databasev1.IndexRule_LOCATION_GLOBAL:
if !w.enableGlobalIndex {
w.l.Warn().RawJSON("index-rule", logger.Proto(ruleIndex.Rule)).Msg("global index is disabled")
continue
}
- key = key | global
+ key |= global
}
switch rule.Type {
case databasev1.IndexRule_TYPE_INVERTED:
- key = key | inverted
+ key |= inverted
case databasev1.IndexRule_TYPE_TREE:
- key = key | tree
+ key |= tree
}
rules := w.invertRuleIndex[key]
rules = append(rules, ruleIndex)
w.invertRuleIndex[key] = rules
}
- w.ch = make(chan Message)
- w.bootIndexGenerator()
- w.closer = run.NewCloser(0)
return w
}
-func (s *Writer) Write(value Message) {
- go func(m Message) {
- if !s.closer.AddRunning() {
- return
- }
- defer s.closer.Done()
- select {
- case <-s.closer.CloseNotify():
- return
- case s.ch <- m:
- }
- }(value)
+func (s *Writer) Write(m Message) {
+ err := multierr.Combine(
+ s.writeLocalIndex(m.LocalWriter, m.Value),
+ s.writeGlobalIndex(m.Scope, m.LocalWriter.ItemID(), m.Value),
+ m.BlockCloser.Close(),
+ )
+ if err != nil {
+ s.l.Error().Err(err).Msg("encounter some errors when generating indices")
+ }
}
func (s *Writer) Close() error {
- s.closer.CloseThenWait()
- close(s.ch)
return nil
}
-func (s *Writer) bootIndexGenerator() {
- go func() {
- for m := range s.ch {
- err := multierr.Combine(
- s.writeLocalIndex(m.LocalWriter, m.Value),
- s.writeGlobalIndex(m.Scope, m.LocalWriter.ItemID(), m.Value),
- m.BlockCloser.Close(),
- )
- if err != nil {
- s.l.Error().Err(err).Msg("encounter some errors when generating indices")
- }
- if m.Cb != nil {
- m.Cb()
- }
- }
- }()
-}
-
// TODO: should listen to pipeline in a distributed cluster
func (s *Writer) writeGlobalIndex(scope tsdb.Entry, ref tsdb.GlobalItemID, value Value) error {
collect := func(ruleIndexes []*partition.IndexRuleLocator, fn func(indexWriter tsdb.IndexWriter, fields []index.Field) error) error {
diff --git a/pkg/index/inverted/inverted.go b/pkg/index/inverted/inverted.go
index 4f680b8..31e57e9 100644
--- a/pkg/index/inverted/inverted.go
+++ b/pkg/index/inverted/inverted.go
@@ -23,6 +23,7 @@ import (
"errors"
"log"
"math"
+ "time"
"github.com/blugelabs/bluge"
"github.com/blugelabs/bluge/analysis"
@@ -40,9 +41,13 @@ import (
"github.com/apache/skywalking-banyandb/pkg/index/posting"
"github.com/apache/skywalking-banyandb/pkg/index/posting/roaring"
"github.com/apache/skywalking-banyandb/pkg/logger"
+ "github.com/apache/skywalking-banyandb/pkg/run"
)
-const docID = "_id"
+const (
+ docID = "_id"
+ batchSize = 1024
+)
var analyzers map[databasev1.IndexRule_Analyzer]*analysis.Analyzer
@@ -61,8 +66,20 @@ type StoreOpts struct {
Logger *logger.Logger
}
+type doc struct {
+ fields []index.Field
+ itemID common.ItemID
+}
+
+type flushEvent struct {
+ onComplete chan struct{}
+}
+
type store struct {
writer *bluge.Writer
+ ch chan any
+ closer *run.Closer
+ l *logger.Logger
}
func NewStore(opts StoreOpts) (index.Store, error) {
@@ -73,9 +90,14 @@ func NewStore(opts StoreOpts) (index.Store, error) {
if err != nil {
return nil, err
}
- return &store{
+ s := &store{
writer: w,
- }, nil
+ l: opts.Logger,
+ ch: make(chan any, batchSize),
+ closer: run.NewCloser(1),
+ }
+ s.run()
+ return s, nil
}
func (s *store) Stats() observability.Statistics {
@@ -83,19 +105,23 @@ func (s *store) Stats() observability.Statistics {
}
func (s *store) Close() error {
+ s.closer.CloseThenWait()
return s.writer.Close()
}
func (s *store) Write(fields []index.Field, itemID common.ItemID) error {
- doc := bluge.NewDocument(string(convert.Uint64ToBytes(uint64(itemID))))
- for _, f := range fields {
- field := bluge.NewKeywordFieldBytes(f.Key.MarshalToStr(), f.Term).StoreValue().Sortable()
- if f.Key.Analyzer != databasev1.IndexRule_ANALYZER_UNSPECIFIED {
- field.WithAnalyzer(analyzers[f.Key.Analyzer])
- }
- doc.AddField(field)
+ if !s.closer.AddRunning() {
+ return nil
}
- return s.writer.Insert(doc)
+ defer s.closer.Done()
+ select {
+ case <-s.closer.CloseNotify():
+ case s.ch <- doc{
+ fields: fields,
+ itemID: itemID,
+ }:
+ }
+ return nil
}
func (s *store) Iterator(fieldKey index.FieldKey, termRange index.RangeOpts, order modelv1.Sort) (iter index.FieldIterator, err error) {
@@ -204,6 +230,71 @@ func (s *store) Range(fieldKey index.FieldKey, opts index.RangeOpts) (list posti
return
}
+func (s *store) run() {
+ go func() {
+ defer s.closer.Done()
+ size := 0
+ batch := bluge.NewBatch()
+ flush := func() {
+ if size < 1 {
+ return
+ }
+ if err := s.writer.Batch(batch); err != nil {
+ s.l.Error().Err(err).Msg("write to the inverted index")
+ }
+ batch.Reset()
+ size = 0
+ }
+ for {
+ timer := time.NewTimer(time.Second)
+ select {
+ case <-s.closer.CloseNotify():
+ return
+ case event, more := <-s.ch:
+ if !more {
+ return
+ }
+ switch d := event.(type) {
+ case flushEvent:
+ flush()
+ close(d.onComplete)
+ case doc:
+ doc := bluge.NewDocument(string(convert.Uint64ToBytes(uint64(d.itemID))))
+ for _, f := range d.fields {
+ field := bluge.NewKeywordFieldBytes(f.Key.MarshalToStr(), f.Term).StoreValue().Sortable()
+ if f.Key.Analyzer != databasev1.IndexRule_ANALYZER_UNSPECIFIED {
+ field.WithAnalyzer(analyzers[f.Key.Analyzer])
+ }
+ doc.AddField(field)
+ }
+ size++
+ if size >= batchSize {
+ flush()
+ } else {
+ batch.Insert(doc)
+ }
+ }
+
+ case <-timer.C:
+ flush()
+ }
+ }
+ }()
+}
+
+func (s *store) flush() {
+ if !s.closer.AddRunning() {
+ return
+ }
+ defer s.closer.Done()
+ onComplete := make(chan struct{})
+ select {
+ case <-s.closer.CloseNotify():
+ case s.ch <- flushEvent{onComplete: onComplete}:
+ }
+ <-onComplete
+}
+
type blugeMatchIterator struct {
delegated search.DocumentMatchIterator
fieldKey string
diff --git a/pkg/index/inverted/inverted_test.go b/pkg/index/inverted/inverted_test.go
index fda5215..d6cac53 100644
--- a/pkg/index/inverted/inverted_test.go
+++ b/pkg/index/inverted/inverted_test.go
@@ -65,6 +65,7 @@ func TestStore_Match(t *testing.T) {
Key: serviceName,
Term: []byte("org.apache.skywalking.examples.OrderService.order"),
}}, common.ItemID(3)))
+ s.(*store).flush()
tests := []struct {
matches []string
@@ -152,6 +153,7 @@ func TestStore_MatchTerm(t *testing.T) {
}()
tester.NoError(err)
testcases.SetUp(tester, s)
+ s.(*store).flush()
testcases.RunServiceName(t, s)
}
@@ -168,6 +170,7 @@ func TestStore_Iterator(t *testing.T) {
}()
tester.NoError(err)
data := testcases.SetUpDuration(tester, s)
+ s.(*store).flush()
testcases.RunDuration(t, data, s)
}