You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by ha...@apache.org on 2023/04/18 21:19:34 UTC

[skywalking-banyandb] branch main updated: Fix TopN reload issue (#265)

This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git


The following commit(s) were added to refs/heads/main by this push:
     new 65bbf7e7 Fix TopN reload issue (#265)
65bbf7e7 is described below

commit 65bbf7e7567ed55bb40e831d3194fb3618dcfeee
Author: Jiajing LU <lu...@gmail.com>
AuthorDate: Wed Apr 19 05:17:31 2023 +0800

    Fix TopN reload issue (#265)
    
    * Fix the deadlock when creating TopNAggregation's result measure.
---
 CHANGES.md                      |   1 +
 banyand/measure/measure.go      |  12 +++--
 banyand/measure/measure_topn.go |  67 --------------------------
 banyand/measure/metadata.go     | 103 ++++++++++++++++++++++++++++++++++++++--
 banyand/measure/service.go      |  63 ++++++++++++------------
 5 files changed, 139 insertions(+), 107 deletions(-)

diff --git a/CHANGES.md b/CHANGES.md
index 27fdfb0b..2800744d 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -12,6 +12,7 @@ Release Notes.
 - Add a sharded buffer to TSDB to replace Badger's memtable. Badger KV only provides SST.
 - Add a meter system to control the internal metrics.
 - Add multiple metrics for measuring the storage subsystem.
+- Refactor the callback of the TopNAggregation schema event to avoid a deadlock and reload issues.
 
 ### Chores
 
diff --git a/banyand/measure/measure.go b/banyand/measure/measure.go
index 1e1e7e2f..417766ee 100644
--- a/banyand/measure/measure.go
+++ b/banyand/measure/measure.go
@@ -26,7 +26,6 @@ import (
 
 	commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
 	databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
-	"github.com/apache/skywalking-banyandb/banyand/metadata"
 	"github.com/apache/skywalking-banyandb/banyand/queue"
 	"github.com/apache/skywalking-banyandb/banyand/tsdb"
 	"github.com/apache/skywalking-banyandb/banyand/tsdb/index"
@@ -59,7 +58,7 @@ type measure struct {
 	shardNum               uint32
 }
 
-func (s *measure) startSteamingManager(pipeline queue.Queue, repo metadata.Repo) error {
+func (s *measure) startSteamingManager(pipeline queue.Queue) error {
 	if len(s.topNAggregations) == 0 {
 		return nil
 	}
@@ -69,7 +68,6 @@ func (s *measure) startSteamingManager(pipeline queue.Queue, repo metadata.Repo)
 	s.processorManager = &topNProcessorManager{
 		l:            s.l,
 		pipeline:     pipeline,
-		repo:         repo,
 		m:            s,
 		s:            tagMapSpec,
 		topNSchemas:  s.topNAggregations,
@@ -122,7 +120,8 @@ type measureSpec struct {
 	topNAggregations []*databasev1.TopNAggregation
 }
 
-func openMeasure(shardNum uint32, db tsdb.Supplier, spec measureSpec, l *logger.Logger) (*measure, error) {
+func openMeasure(shardNum uint32, db tsdb.Supplier, spec measureSpec, l *logger.Logger, pipeline queue.Queue,
+) (*measure, error) {
 	m := &measure{
 		shardNum:         shardNum,
 		schema:           spec.schema,
@@ -143,5 +142,10 @@ func openMeasure(shardNum uint32, db tsdb.Supplier, spec measureSpec, l *logger.
 		IndexRules: spec.indexRules,
 	})
 
+	if startErr := m.startSteamingManager(pipeline); startErr != nil {
+		l.Err(startErr).Str("measure", spec.schema.GetMetadata().GetName()).
+			Msg("fail to start streaming manager")
+	}
+
 	return m, nil
 }
diff --git a/banyand/measure/measure_topn.go b/banyand/measure/measure_topn.go
index 1b5e6cd3..2fb5b00b 100644
--- a/banyand/measure/measure_topn.go
+++ b/banyand/measure/measure_topn.go
@@ -27,11 +27,9 @@ import (
 	"sync"
 	"time"
 
-	"github.com/google/go-cmp/cmp"
 	"github.com/pkg/errors"
 	"go.uber.org/multierr"
 	"golang.org/x/exp/slices"
-	"google.golang.org/protobuf/testing/protocmp"
 	"google.golang.org/protobuf/types/known/timestamppb"
 
 	"github.com/apache/skywalking-banyandb/api/common"
@@ -40,8 +38,6 @@ import (
 	databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
 	measurev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/measure/v1"
 	modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
-	"github.com/apache/skywalking-banyandb/banyand/metadata"
-	"github.com/apache/skywalking-banyandb/banyand/metadata/schema"
 	"github.com/apache/skywalking-banyandb/banyand/queue"
 	"github.com/apache/skywalking-banyandb/banyand/tsdb"
 	"github.com/apache/skywalking-banyandb/pkg/bus"
@@ -294,7 +290,6 @@ func (t *topNStreamingProcessor) handleError() {
 type topNProcessorManager struct {
 	l            *logger.Logger
 	pipeline     queue.Queue
-	repo         metadata.Repo
 	m            *measure
 	s            logical.TagSpecRegistry
 	processorMap map[*commonv1.Metadata][]*topNStreamingProcessor
@@ -329,71 +324,9 @@ func (manager *topNProcessorManager) onMeasureWrite(request *measurev1.InternalW
 	}()
 }
 
-func (manager *topNProcessorManager) createOrUpdateTopNMeasure(topNSchema *databasev1.TopNAggregation) error {
-	m, err := manager.repo.MeasureRegistry().GetMeasure(context.TODO(), topNSchema.GetMetadata())
-	if err != nil && !errors.Is(err, schema.ErrGRPCResourceNotFound) {
-		return err
-	}
-
-	tagNames := manager.m.GetSchema().GetEntity().GetTagNames()
-	seriesSpecs := make([]*databasev1.TagSpec, 0, len(tagNames))
-
-	for _, tagName := range tagNames {
-		var found bool
-		for _, fSpec := range manager.m.GetSchema().GetTagFamilies() {
-			for _, tSpec := range fSpec.GetTags() {
-				if tSpec.GetName() == tagName {
-					seriesSpecs = append(seriesSpecs, tSpec)
-					found = true
-					goto CHECK
-				}
-			}
-		}
-
-	CHECK:
-		if !found {
-			return fmt.Errorf("fail to find tag spec %s", tagName)
-		}
-	}
-
-	// create a new "derived" measure for TopN result
-	newTopNMeasure := &databasev1.Measure{
-		Metadata: topNSchema.GetMetadata(),
-		Interval: manager.m.schema.GetInterval(),
-		TagFamilies: []*databasev1.TagFamilySpec{
-			{
-				Name: TopNTagFamily,
-				Tags: append([]*databasev1.TagSpec{
-					{
-						Name: "measure_id",
-						Type: databasev1.TagType_TAG_TYPE_ID,
-					},
-				}, seriesSpecs...),
-			},
-		},
-		Fields: []*databasev1.FieldSpec{TopNValueFieldSpec},
-	}
-	if m == nil {
-		return manager.repo.MeasureRegistry().CreateMeasure(context.Background(), newTopNMeasure)
-	}
-	// compare with the old one
-	if cmp.Diff(newTopNMeasure, m,
-		protocmp.IgnoreUnknown(),
-		protocmp.IgnoreFields(&databasev1.Measure{}, "updated_at"),
-		protocmp.IgnoreFields(&commonv1.Metadata{}, "id", "create_revision", "mod_revision"),
-		protocmp.Transform()) == "" {
-		return nil
-	}
-	// update
-	return manager.repo.MeasureRegistry().UpdateMeasure(context.Background(), newTopNMeasure)
-}
-
 func (manager *topNProcessorManager) start() error {
 	interval := manager.m.interval
 	for _, topNSchema := range manager.topNSchemas {
-		if createErr := manager.createOrUpdateTopNMeasure(topNSchema); createErr != nil {
-			return createErr
-		}
 		sortDirections := make([]modelv1.Sort, 0, 2)
 		if topNSchema.GetFieldValueSort() == modelv1.Sort_SORT_UNSPECIFIED {
 			sortDirections = append(sortDirections, modelv1.Sort_SORT_ASC, modelv1.Sort_SORT_DESC)
diff --git a/banyand/measure/metadata.go b/banyand/measure/metadata.go
index 2ed11cdd..b19a1bc1 100644
--- a/banyand/measure/metadata.go
+++ b/banyand/measure/metadata.go
@@ -19,9 +19,14 @@ package measure
 
 import (
 	"context"
+	"fmt"
 	"path"
 	"time"
 
+	"github.com/google/go-cmp/cmp"
+	"github.com/pkg/errors"
+	"google.golang.org/protobuf/testing/protocmp"
+
 	"github.com/apache/skywalking-banyandb/api/common"
 	"github.com/apache/skywalking-banyandb/api/event"
 	commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
@@ -29,6 +34,7 @@ import (
 	"github.com/apache/skywalking-banyandb/banyand/discovery"
 	"github.com/apache/skywalking-banyandb/banyand/metadata"
 	"github.com/apache/skywalking-banyandb/banyand/metadata/schema"
+	"github.com/apache/skywalking-banyandb/banyand/queue"
 	"github.com/apache/skywalking-banyandb/banyand/tsdb"
 	"github.com/apache/skywalking-banyandb/pkg/logger"
 	pb_v1 "github.com/apache/skywalking-banyandb/pkg/pb/v1/tsdb"
@@ -42,7 +48,7 @@ type schemaRepo struct {
 }
 
 func newSchemaRepo(path string, metadata metadata.Repo, repo discovery.ServiceRepo,
-	dbOpts tsdb.DatabaseOpts, l *logger.Logger,
+	dbOpts tsdb.DatabaseOpts, l *logger.Logger, pipeline queue.Queue,
 ) schemaRepo {
 	return schemaRepo{
 		l:        l,
@@ -51,7 +57,7 @@ func newSchemaRepo(path string, metadata metadata.Repo, repo discovery.ServiceRe
 			metadata,
 			repo,
 			l,
-			newSupplier(path, metadata, dbOpts, l),
+			newSupplier(path, metadata, dbOpts, l, pipeline),
 			event.MeasureTopicShardEvent,
 			event.MeasureTopicEntityEvent,
 		),
@@ -115,6 +121,13 @@ func (sr *schemaRepo) OnAddOrUpdate(metadata schema.Metadata) {
 			})
 		}
 	case schema.KindTopNAggregation:
+		// createOrUpdate TopN schemas in advance
+		_, err := createOrUpdateTopNMeasure(sr.metadata.MeasureRegistry(), metadata.Spec.(*databasev1.TopNAggregation))
+		if err != nil {
+			sr.l.Error().Err(err).Msg("fail to create/update topN measure")
+			return
+		}
+		// reload source measure
 		sr.SendMetadataEvent(resourceSchema.MetadataEvent{
 			Typ:      resourceSchema.EventAddOrUpdate,
 			Kind:     resourceSchema.EventKindResource,
@@ -124,6 +137,76 @@ func (sr *schemaRepo) OnAddOrUpdate(metadata schema.Metadata) {
 	}
 }
 
+func createOrUpdateTopNMeasure(measureSchemaRegistry schema.Measure, topNSchema *databasev1.TopNAggregation) (*databasev1.Measure, error) {
+	oldTopNSchema, err := measureSchemaRegistry.GetMeasure(context.TODO(), topNSchema.GetMetadata())
+	if err != nil && !errors.Is(err, schema.ErrGRPCResourceNotFound) {
+		return nil, err
+	}
+
+	sourceMeasureSchema, err := measureSchemaRegistry.GetMeasure(context.Background(), topNSchema.GetSourceMeasure())
+	if err != nil {
+		return nil, err
+	}
+
+	tagNames := sourceMeasureSchema.GetEntity().GetTagNames()
+	seriesSpecs := make([]*databasev1.TagSpec, 0, len(tagNames))
+
+	for _, tagName := range tagNames {
+		var found bool
+		for _, fSpec := range sourceMeasureSchema.GetTagFamilies() {
+			for _, tSpec := range fSpec.GetTags() {
+				if tSpec.GetName() == tagName {
+					seriesSpecs = append(seriesSpecs, tSpec)
+					found = true
+					goto CHECK
+				}
+			}
+		}
+
+	CHECK:
+		if !found {
+			return nil, fmt.Errorf("fail to find tag spec %s", tagName)
+		}
+	}
+
+	// create a new "derived" measure for TopN result
+	newTopNMeasure := &databasev1.Measure{
+		Metadata: topNSchema.GetMetadata(),
+		Interval: sourceMeasureSchema.GetInterval(),
+		TagFamilies: []*databasev1.TagFamilySpec{
+			{
+				Name: TopNTagFamily,
+				Tags: append([]*databasev1.TagSpec{
+					{
+						Name: "measure_id",
+						Type: databasev1.TagType_TAG_TYPE_ID,
+					},
+				}, seriesSpecs...),
+			},
+		},
+		Fields: []*databasev1.FieldSpec{TopNValueFieldSpec},
+	}
+	if oldTopNSchema == nil {
+		if innerErr := measureSchemaRegistry.CreateMeasure(context.Background(), newTopNMeasure); innerErr != nil {
+			return nil, innerErr
+		}
+		return newTopNMeasure, nil
+	}
+	// compare with the old one
+	if cmp.Diff(newTopNMeasure, oldTopNSchema,
+		protocmp.IgnoreUnknown(),
+		protocmp.IgnoreFields(&databasev1.Measure{}, "updated_at"),
+		protocmp.IgnoreFields(&commonv1.Metadata{}, "id", "create_revision", "mod_revision"),
+		protocmp.Transform()) == "" {
+		return oldTopNSchema, nil
+	}
+	// update
+	if err = measureSchemaRegistry.UpdateMeasure(context.Background(), newTopNMeasure); err != nil {
+		return nil, err
+	}
+	return newTopNMeasure, nil
+}
+
 func (sr *schemaRepo) OnDelete(metadata schema.Metadata) {
 	switch metadata.Kind {
 	case schema.KindGroup:
@@ -163,6 +246,11 @@ func (sr *schemaRepo) OnDelete(metadata schema.Metadata) {
 		}
 	case schema.KindIndexRule:
 	case schema.KindTopNAggregation:
+		// the derived TopN result measure is registered under the aggregation's own metadata, not the source measure's
+		if err := sr.removeTopNMeasure(metadata.Spec.(*databasev1.TopNAggregation).GetMetadata()); err != nil {
+			sr.l.Error().Err(err).Msg("fail to remove topN measure")
+			return
+		}
 		// we should update instead of delete
 		sr.SendMetadataEvent(resourceSchema.MetadataEvent{
 			Typ:      resourceSchema.EventAddOrUpdate,
@@ -173,6 +261,11 @@ func (sr *schemaRepo) OnDelete(metadata schema.Metadata) {
 	}
 }
 
+func (sr *schemaRepo) removeTopNMeasure(metadata *commonv1.Metadata) error {
+	_, err := sr.metadata.MeasureRegistry().DeleteMeasure(context.Background(), metadata)
+	return err
+}
+
 func (sr *schemaRepo) loadMeasure(metadata *commonv1.Metadata) (*measure, bool) {
 	r, ok := sr.LoadResource(metadata)
 	if !ok {
@@ -186,17 +279,19 @@ var _ resourceSchema.ResourceSupplier = (*supplier)(nil)
 
 type supplier struct {
 	metadata metadata.Repo
+	pipeline queue.Queue
 	l        *logger.Logger
 	path     string
 	dbOpts   tsdb.DatabaseOpts
 }
 
-func newSupplier(path string, metadata metadata.Repo, dbOpts tsdb.DatabaseOpts, l *logger.Logger) *supplier {
+func newSupplier(path string, metadata metadata.Repo, dbOpts tsdb.DatabaseOpts, l *logger.Logger, pipeline queue.Queue) *supplier {
 	return &supplier{
 		path:     path,
 		dbOpts:   dbOpts,
 		metadata: metadata,
 		l:        l,
+		pipeline: pipeline,
 	}
 }
 
@@ -206,7 +301,7 @@ func (s *supplier) OpenResource(shardNum uint32, db tsdb.Supplier, spec resource
 		schema:           measureSchema,
 		indexRules:       spec.IndexRules,
 		topNAggregations: spec.Aggregations,
-	}, s.l)
+	}, s.l, s.pipeline)
 }
 
 func (s *supplier) ResourceSchema(md *commonv1.Metadata) (resourceSchema.ResourceSchema, error) {
diff --git a/banyand/measure/service.go b/banyand/measure/service.go
index 15d1bdea..46eacd30 100644
--- a/banyand/measure/service.go
+++ b/banyand/measure/service.go
@@ -23,9 +23,11 @@ import (
 	"time"
 
 	"github.com/pkg/errors"
+	"go.uber.org/multierr"
 
 	"github.com/apache/skywalking-banyandb/api/data"
 	commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
+	databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
 	"github.com/apache/skywalking-banyandb/banyand/discovery"
 	"github.com/apache/skywalking-banyandb/banyand/metadata"
 	"github.com/apache/skywalking-banyandb/banyand/metadata/schema"
@@ -105,7 +107,7 @@ func (s *service) PreRun() error {
 	if err != nil {
 		return err
 	}
-	s.schemaRepo = newSchemaRepo(path.Join(s.root, s.Name()), s.metadata, s.repo, s.dbOpts, s.l)
+	s.schemaRepo = newSchemaRepo(path.Join(s.root, s.Name()), s.metadata, s.repo, s.dbOpts, s.l, s.pipeline)
 	for _, g := range groups {
 		if g.Catalog != commonv1.Catalog_CATALOG_MEASURE {
 			continue
@@ -122,6 +124,11 @@ func (s *service) PreRun() error {
 			return innerErr
 		}
 		for _, measureSchema := range allMeasureSchemas {
+			// sanity check before calling StoreResource
+			// since StoreResource may be called inside the event loop
+			if checkErr := s.sanityCheck(gp, measureSchema); checkErr != nil {
+				return checkErr
+			}
 			if _, innerErr := gp.StoreResource(measureSchema); innerErr != nil {
 				return innerErr
 			}
@@ -136,6 +143,29 @@ func (s *service) PreRun() error {
 	return nil
 }
 
+func (s *service) sanityCheck(group resourceSchema.Group, measureSchema *databasev1.Measure) error {
+	var topNAggrs []*databasev1.TopNAggregation
+	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
+	defer cancel()
+	topNAggrs, err := s.metadata.MeasureRegistry().TopNAggregations(ctx, measureSchema.GetMetadata())
+	if err != nil || len(topNAggrs) == 0 {
+		return err
+	}
+
+	for _, topNAggr := range topNAggrs {
+		topNMeasure, innerErr := createOrUpdateTopNMeasure(s.metadata.MeasureRegistry(), topNAggr)
+		err = multierr.Append(err, innerErr)
+		if topNMeasure != nil {
+			_, storeErr := group.StoreResource(topNMeasure)
+			if storeErr != nil {
+				err = multierr.Append(err, storeErr)
+			}
+		}
+	}
+
+	return err
+}
+
 func (s *service) Serve() run.StopNotify {
 	_ = s.schemaRepo.NotifyAll()
 	// run a serial watcher
@@ -145,37 +175,6 @@ func (s *service) Serve() run.StopNotify {
 		RegisterHandler(schema.KindGroup|schema.KindMeasure|schema.KindIndexRuleBinding|schema.KindIndexRule|schema.KindTopNAggregation,
 			&s.schemaRepo)
 
-	// start TopN manager after registering handlers
-	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
-	groups, err := s.metadata.GroupRegistry().ListGroup(ctx)
-	cancel()
-
-	if err != nil {
-		s.l.Err(err).Msg("fail to list groups")
-		return s.stopCh
-	}
-
-	for _, g := range groups {
-		if g.Catalog != commonv1.Catalog_CATALOG_MEASURE {
-			continue
-		}
-		ctx, cancel = context.WithTimeout(context.Background(), 5*time.Second)
-		allMeasureSchemas, listErr := s.metadata.MeasureRegistry().
-			ListMeasure(ctx, schema.ListOpt{Group: g.GetMetadata().GetName()})
-		cancel()
-		if listErr != nil {
-			s.l.Err(listErr).Str("group", g.GetMetadata().GetName()).Msg("fail to list measures in the group")
-			continue
-		}
-		for _, measureSchema := range allMeasureSchemas {
-			if res, ok := s.schemaRepo.LoadResource(measureSchema.GetMetadata()); ok {
-				if startErr := res.(*measure).startSteamingManager(s.pipeline, s.metadata); startErr != nil {
-					s.l.Err(startErr).Str("measure", measureSchema.GetMetadata().GetName()).Msg("fail to start streaming manager")
-				}
-			}
-		}
-	}
-
 	return s.stopCh
 }