You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by ha...@apache.org on 2023/04/18 21:19:34 UTC
[skywalking-banyandb] branch main updated: Fix TopN reload issue (#265)
This is an automated email from the ASF dual-hosted git repository.
hanahmily pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
The following commit(s) were added to refs/heads/main by this push:
new 65bbf7e7 Fix TopN reload issue (#265)
65bbf7e7 is described below
commit 65bbf7e7567ed55bb40e831d3194fb3618dcfeee
Author: Jiajing LU <lu...@gmail.com>
AuthorDate: Wed Apr 19 05:17:31 2023 +0800
Fix TopN reload issue (#265)
* Fix the deadlock when creating TopNAggregation's result measure.
---
CHANGES.md | 1 +
banyand/measure/measure.go | 12 +++--
banyand/measure/measure_topn.go | 67 --------------------------
banyand/measure/metadata.go | 103 ++++++++++++++++++++++++++++++++++++++--
banyand/measure/service.go | 63 ++++++++++++------------
5 files changed, 139 insertions(+), 107 deletions(-)
diff --git a/CHANGES.md b/CHANGES.md
index 27fdfb0b..2800744d 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -12,6 +12,7 @@ Release Notes.
- Add a sharded buffer to TSDB to replace Badger's memtable. Badger KV only provides SST.
- Add a meter system to control the internal metrics.
- Add multiple metrics for measuring the storage subsystem.
+- Refactor callback of TopNAggregation schema event to avoid deadlock and reload issue.
### Chores
diff --git a/banyand/measure/measure.go b/banyand/measure/measure.go
index 1e1e7e2f..417766ee 100644
--- a/banyand/measure/measure.go
+++ b/banyand/measure/measure.go
@@ -26,7 +26,6 @@ import (
commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
- "github.com/apache/skywalking-banyandb/banyand/metadata"
"github.com/apache/skywalking-banyandb/banyand/queue"
"github.com/apache/skywalking-banyandb/banyand/tsdb"
"github.com/apache/skywalking-banyandb/banyand/tsdb/index"
@@ -59,7 +58,7 @@ type measure struct {
shardNum uint32
}
-func (s *measure) startSteamingManager(pipeline queue.Queue, repo metadata.Repo) error {
+func (s *measure) startSteamingManager(pipeline queue.Queue) error {
if len(s.topNAggregations) == 0 {
return nil
}
@@ -69,7 +68,6 @@ func (s *measure) startSteamingManager(pipeline queue.Queue, repo metadata.Repo)
s.processorManager = &topNProcessorManager{
l: s.l,
pipeline: pipeline,
- repo: repo,
m: s,
s: tagMapSpec,
topNSchemas: s.topNAggregations,
@@ -122,7 +120,8 @@ type measureSpec struct {
topNAggregations []*databasev1.TopNAggregation
}
-func openMeasure(shardNum uint32, db tsdb.Supplier, spec measureSpec, l *logger.Logger) (*measure, error) {
+func openMeasure(shardNum uint32, db tsdb.Supplier, spec measureSpec, l *logger.Logger, pipeline queue.Queue,
+) (*measure, error) {
m := &measure{
shardNum: shardNum,
schema: spec.schema,
@@ -143,5 +142,10 @@ func openMeasure(shardNum uint32, db tsdb.Supplier, spec measureSpec, l *logger.
IndexRules: spec.indexRules,
})
+ if startErr := m.startSteamingManager(pipeline); startErr != nil {
+ l.Err(startErr).Str("measure", spec.schema.GetMetadata().GetName()).
+ Msg("fail to start streaming manager")
+ }
+
return m, nil
}
diff --git a/banyand/measure/measure_topn.go b/banyand/measure/measure_topn.go
index 1b5e6cd3..2fb5b00b 100644
--- a/banyand/measure/measure_topn.go
+++ b/banyand/measure/measure_topn.go
@@ -27,11 +27,9 @@ import (
"sync"
"time"
- "github.com/google/go-cmp/cmp"
"github.com/pkg/errors"
"go.uber.org/multierr"
"golang.org/x/exp/slices"
- "google.golang.org/protobuf/testing/protocmp"
"google.golang.org/protobuf/types/known/timestamppb"
"github.com/apache/skywalking-banyandb/api/common"
@@ -40,8 +38,6 @@ import (
databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
measurev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/measure/v1"
modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
- "github.com/apache/skywalking-banyandb/banyand/metadata"
- "github.com/apache/skywalking-banyandb/banyand/metadata/schema"
"github.com/apache/skywalking-banyandb/banyand/queue"
"github.com/apache/skywalking-banyandb/banyand/tsdb"
"github.com/apache/skywalking-banyandb/pkg/bus"
@@ -294,7 +290,6 @@ func (t *topNStreamingProcessor) handleError() {
type topNProcessorManager struct {
l *logger.Logger
pipeline queue.Queue
- repo metadata.Repo
m *measure
s logical.TagSpecRegistry
processorMap map[*commonv1.Metadata][]*topNStreamingProcessor
@@ -329,71 +324,9 @@ func (manager *topNProcessorManager) onMeasureWrite(request *measurev1.InternalW
}()
}
-func (manager *topNProcessorManager) createOrUpdateTopNMeasure(topNSchema *databasev1.TopNAggregation) error {
- m, err := manager.repo.MeasureRegistry().GetMeasure(context.TODO(), topNSchema.GetMetadata())
- if err != nil && !errors.Is(err, schema.ErrGRPCResourceNotFound) {
- return err
- }
-
- tagNames := manager.m.GetSchema().GetEntity().GetTagNames()
- seriesSpecs := make([]*databasev1.TagSpec, 0, len(tagNames))
-
- for _, tagName := range tagNames {
- var found bool
- for _, fSpec := range manager.m.GetSchema().GetTagFamilies() {
- for _, tSpec := range fSpec.GetTags() {
- if tSpec.GetName() == tagName {
- seriesSpecs = append(seriesSpecs, tSpec)
- found = true
- goto CHECK
- }
- }
- }
-
- CHECK:
- if !found {
- return fmt.Errorf("fail to find tag spec %s", tagName)
- }
- }
-
- // create a new "derived" measure for TopN result
- newTopNMeasure := &databasev1.Measure{
- Metadata: topNSchema.GetMetadata(),
- Interval: manager.m.schema.GetInterval(),
- TagFamilies: []*databasev1.TagFamilySpec{
- {
- Name: TopNTagFamily,
- Tags: append([]*databasev1.TagSpec{
- {
- Name: "measure_id",
- Type: databasev1.TagType_TAG_TYPE_ID,
- },
- }, seriesSpecs...),
- },
- },
- Fields: []*databasev1.FieldSpec{TopNValueFieldSpec},
- }
- if m == nil {
- return manager.repo.MeasureRegistry().CreateMeasure(context.Background(), newTopNMeasure)
- }
- // compare with the old one
- if cmp.Diff(newTopNMeasure, m,
- protocmp.IgnoreUnknown(),
- protocmp.IgnoreFields(&databasev1.Measure{}, "updated_at"),
- protocmp.IgnoreFields(&commonv1.Metadata{}, "id", "create_revision", "mod_revision"),
- protocmp.Transform()) == "" {
- return nil
- }
- // update
- return manager.repo.MeasureRegistry().UpdateMeasure(context.Background(), newTopNMeasure)
-}
-
func (manager *topNProcessorManager) start() error {
interval := manager.m.interval
for _, topNSchema := range manager.topNSchemas {
- if createErr := manager.createOrUpdateTopNMeasure(topNSchema); createErr != nil {
- return createErr
- }
sortDirections := make([]modelv1.Sort, 0, 2)
if topNSchema.GetFieldValueSort() == modelv1.Sort_SORT_UNSPECIFIED {
sortDirections = append(sortDirections, modelv1.Sort_SORT_ASC, modelv1.Sort_SORT_DESC)
diff --git a/banyand/measure/metadata.go b/banyand/measure/metadata.go
index 2ed11cdd..b19a1bc1 100644
--- a/banyand/measure/metadata.go
+++ b/banyand/measure/metadata.go
@@ -19,9 +19,14 @@ package measure
import (
"context"
+ "fmt"
"path"
"time"
+ "github.com/google/go-cmp/cmp"
+ "github.com/pkg/errors"
+ "google.golang.org/protobuf/testing/protocmp"
+
"github.com/apache/skywalking-banyandb/api/common"
"github.com/apache/skywalking-banyandb/api/event"
commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
@@ -29,6 +34,7 @@ import (
"github.com/apache/skywalking-banyandb/banyand/discovery"
"github.com/apache/skywalking-banyandb/banyand/metadata"
"github.com/apache/skywalking-banyandb/banyand/metadata/schema"
+ "github.com/apache/skywalking-banyandb/banyand/queue"
"github.com/apache/skywalking-banyandb/banyand/tsdb"
"github.com/apache/skywalking-banyandb/pkg/logger"
pb_v1 "github.com/apache/skywalking-banyandb/pkg/pb/v1/tsdb"
@@ -42,7 +48,7 @@ type schemaRepo struct {
}
func newSchemaRepo(path string, metadata metadata.Repo, repo discovery.ServiceRepo,
- dbOpts tsdb.DatabaseOpts, l *logger.Logger,
+ dbOpts tsdb.DatabaseOpts, l *logger.Logger, pipeline queue.Queue,
) schemaRepo {
return schemaRepo{
l: l,
@@ -51,7 +57,7 @@ func newSchemaRepo(path string, metadata metadata.Repo, repo discovery.ServiceRe
metadata,
repo,
l,
- newSupplier(path, metadata, dbOpts, l),
+ newSupplier(path, metadata, dbOpts, l, pipeline),
event.MeasureTopicShardEvent,
event.MeasureTopicEntityEvent,
),
@@ -115,6 +121,13 @@ func (sr *schemaRepo) OnAddOrUpdate(metadata schema.Metadata) {
})
}
case schema.KindTopNAggregation:
+ // createOrUpdate TopN schemas in advance
+ _, err := createOrUpdateTopNMeasure(sr.metadata.MeasureRegistry(), metadata.Spec.(*databasev1.TopNAggregation))
+ if err != nil {
+ sr.l.Error().Err(err).Msg("fail to create/update topN measure")
+ return
+ }
+ // reload source measure
sr.SendMetadataEvent(resourceSchema.MetadataEvent{
Typ: resourceSchema.EventAddOrUpdate,
Kind: resourceSchema.EventKindResource,
@@ -124,6 +137,76 @@ func (sr *schemaRepo) OnAddOrUpdate(metadata schema.Metadata) {
}
}
+func createOrUpdateTopNMeasure(measureSchemaRegistry schema.Measure, topNSchema *databasev1.TopNAggregation) (*databasev1.Measure, error) {
+ oldTopNSchema, err := measureSchemaRegistry.GetMeasure(context.TODO(), topNSchema.GetMetadata())
+ if err != nil && !errors.Is(err, schema.ErrGRPCResourceNotFound) {
+ return nil, err
+ }
+
+ sourceMeasureSchema, err := measureSchemaRegistry.GetMeasure(context.Background(), topNSchema.GetSourceMeasure())
+ if err != nil {
+ return nil, err
+ }
+
+ tagNames := sourceMeasureSchema.GetEntity().GetTagNames()
+ seriesSpecs := make([]*databasev1.TagSpec, 0, len(tagNames))
+
+ for _, tagName := range tagNames {
+ var found bool
+ for _, fSpec := range sourceMeasureSchema.GetTagFamilies() {
+ for _, tSpec := range fSpec.GetTags() {
+ if tSpec.GetName() == tagName {
+ seriesSpecs = append(seriesSpecs, tSpec)
+ found = true
+ goto CHECK
+ }
+ }
+ }
+
+ CHECK:
+ if !found {
+ return nil, fmt.Errorf("fail to find tag spec %s", tagName)
+ }
+ }
+
+ // create a new "derived" measure for TopN result
+ newTopNMeasure := &databasev1.Measure{
+ Metadata: topNSchema.GetMetadata(),
+ Interval: sourceMeasureSchema.GetInterval(),
+ TagFamilies: []*databasev1.TagFamilySpec{
+ {
+ Name: TopNTagFamily,
+ Tags: append([]*databasev1.TagSpec{
+ {
+ Name: "measure_id",
+ Type: databasev1.TagType_TAG_TYPE_ID,
+ },
+ }, seriesSpecs...),
+ },
+ },
+ Fields: []*databasev1.FieldSpec{TopNValueFieldSpec},
+ }
+ if oldTopNSchema == nil {
+ if innerErr := measureSchemaRegistry.CreateMeasure(context.Background(), newTopNMeasure); innerErr != nil {
+ return nil, innerErr
+ }
+ return newTopNMeasure, nil
+ }
+ // compare with the old one
+ if cmp.Diff(newTopNMeasure, oldTopNSchema,
+ protocmp.IgnoreUnknown(),
+ protocmp.IgnoreFields(&databasev1.Measure{}, "updated_at"),
+ protocmp.IgnoreFields(&commonv1.Metadata{}, "id", "create_revision", "mod_revision"),
+ protocmp.Transform()) == "" {
+ return oldTopNSchema, nil
+ }
+ // update
+ if err = measureSchemaRegistry.UpdateMeasure(context.Background(), newTopNMeasure); err != nil {
+ return nil, err
+ }
+ return newTopNMeasure, nil
+}
+
func (sr *schemaRepo) OnDelete(metadata schema.Metadata) {
switch metadata.Kind {
case schema.KindGroup:
@@ -163,6 +246,11 @@ func (sr *schemaRepo) OnDelete(metadata schema.Metadata) {
}
case schema.KindIndexRule:
case schema.KindTopNAggregation:
+ err := sr.removeTopNMeasure(metadata.Spec.(*databasev1.TopNAggregation).GetMetadata())
+ if err != nil {
+ sr.l.Error().Err(err).Msg("fail to remove topN measure")
+ return
+ }
// we should update instead of delete
sr.SendMetadataEvent(resourceSchema.MetadataEvent{
Typ: resourceSchema.EventAddOrUpdate,
@@ -173,6 +261,11 @@ func (sr *schemaRepo) OnDelete(metadata schema.Metadata) {
}
}
+func (sr *schemaRepo) removeTopNMeasure(metadata *commonv1.Metadata) error {
+ _, err := sr.metadata.MeasureRegistry().DeleteMeasure(context.Background(), metadata)
+ return err
+}
+
func (sr *schemaRepo) loadMeasure(metadata *commonv1.Metadata) (*measure, bool) {
r, ok := sr.LoadResource(metadata)
if !ok {
@@ -186,17 +279,19 @@ var _ resourceSchema.ResourceSupplier = (*supplier)(nil)
type supplier struct {
metadata metadata.Repo
+ pipeline queue.Queue
l *logger.Logger
path string
dbOpts tsdb.DatabaseOpts
}
-func newSupplier(path string, metadata metadata.Repo, dbOpts tsdb.DatabaseOpts, l *logger.Logger) *supplier {
+func newSupplier(path string, metadata metadata.Repo, dbOpts tsdb.DatabaseOpts, l *logger.Logger, pipeline queue.Queue) *supplier {
return &supplier{
path: path,
dbOpts: dbOpts,
metadata: metadata,
l: l,
+ pipeline: pipeline,
}
}
@@ -206,7 +301,7 @@ func (s *supplier) OpenResource(shardNum uint32, db tsdb.Supplier, spec resource
schema: measureSchema,
indexRules: spec.IndexRules,
topNAggregations: spec.Aggregations,
- }, s.l)
+ }, s.l, s.pipeline)
}
func (s *supplier) ResourceSchema(md *commonv1.Metadata) (resourceSchema.ResourceSchema, error) {
diff --git a/banyand/measure/service.go b/banyand/measure/service.go
index 15d1bdea..46eacd30 100644
--- a/banyand/measure/service.go
+++ b/banyand/measure/service.go
@@ -23,9 +23,11 @@ import (
"time"
"github.com/pkg/errors"
+ "go.uber.org/multierr"
"github.com/apache/skywalking-banyandb/api/data"
commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
+ databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
"github.com/apache/skywalking-banyandb/banyand/discovery"
"github.com/apache/skywalking-banyandb/banyand/metadata"
"github.com/apache/skywalking-banyandb/banyand/metadata/schema"
@@ -105,7 +107,7 @@ func (s *service) PreRun() error {
if err != nil {
return err
}
- s.schemaRepo = newSchemaRepo(path.Join(s.root, s.Name()), s.metadata, s.repo, s.dbOpts, s.l)
+ s.schemaRepo = newSchemaRepo(path.Join(s.root, s.Name()), s.metadata, s.repo, s.dbOpts, s.l, s.pipeline)
for _, g := range groups {
if g.Catalog != commonv1.Catalog_CATALOG_MEASURE {
continue
@@ -122,6 +124,11 @@ func (s *service) PreRun() error {
return innerErr
}
for _, measureSchema := range allMeasureSchemas {
+ // sanity check before calling StoreResource
+ // since StoreResource may be called inside the event loop
+ if checkErr := s.sanityCheck(gp, measureSchema); checkErr != nil {
+ return checkErr
+ }
if _, innerErr := gp.StoreResource(measureSchema); innerErr != nil {
return innerErr
}
@@ -136,6 +143,29 @@ func (s *service) PreRun() error {
return nil
}
+func (s *service) sanityCheck(group resourceSchema.Group, measureSchema *databasev1.Measure) error {
+ var topNAggrs []*databasev1.TopNAggregation
+ ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
+ defer cancel()
+ topNAggrs, err := s.metadata.MeasureRegistry().TopNAggregations(ctx, measureSchema.GetMetadata())
+ if err != nil || len(topNAggrs) == 0 {
+ return err
+ }
+
+ for _, topNAggr := range topNAggrs {
+ topNMeasure, innerErr := createOrUpdateTopNMeasure(s.metadata.MeasureRegistry(), topNAggr)
+ err = multierr.Append(err, innerErr)
+ if topNMeasure != nil {
+ _, storeErr := group.StoreResource(topNMeasure)
+ if storeErr != nil {
+ err = multierr.Append(err, storeErr)
+ }
+ }
+ }
+
+ return err
+}
+
func (s *service) Serve() run.StopNotify {
_ = s.schemaRepo.NotifyAll()
// run a serial watcher
@@ -145,37 +175,6 @@ func (s *service) Serve() run.StopNotify {
RegisterHandler(schema.KindGroup|schema.KindMeasure|schema.KindIndexRuleBinding|schema.KindIndexRule|schema.KindTopNAggregation,
&s.schemaRepo)
- // start TopN manager after registering handlers
- ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
- groups, err := s.metadata.GroupRegistry().ListGroup(ctx)
- cancel()
-
- if err != nil {
- s.l.Err(err).Msg("fail to list groups")
- return s.stopCh
- }
-
- for _, g := range groups {
- if g.Catalog != commonv1.Catalog_CATALOG_MEASURE {
- continue
- }
- ctx, cancel = context.WithTimeout(context.Background(), 5*time.Second)
- allMeasureSchemas, listErr := s.metadata.MeasureRegistry().
- ListMeasure(ctx, schema.ListOpt{Group: g.GetMetadata().GetName()})
- cancel()
- if listErr != nil {
- s.l.Err(listErr).Str("group", g.GetMetadata().GetName()).Msg("fail to list measures in the group")
- continue
- }
- for _, measureSchema := range allMeasureSchemas {
- if res, ok := s.schemaRepo.LoadResource(measureSchema.GetMetadata()); ok {
- if startErr := res.(*measure).startSteamingManager(s.pipeline, s.metadata); startErr != nil {
- s.l.Err(startErr).Str("measure", measureSchema.GetMetadata().GetName()).Msg("fail to start streaming manager")
- }
- }
- }
- }
-
return s.stopCh
}