You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by lu...@apache.org on 2023/04/16 07:32:41 UTC

[skywalking-banyandb] 01/01: reload topn

This is an automated email from the ASF dual-hosted git repository.

lujiajing pushed a commit to branch feature/reload-topn
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git

commit 5220c122742c2ff7ff8b66f2aa07e4e724dec306
Author: Megrez Lu <lu...@gmail.com>
AuthorDate: Sun Apr 16 15:32:27 2023 +0800

    reload topn
---
 banyand/measure/measure.go  |  8 +++++++-
 banyand/measure/metadata.go | 11 +++++++----
 banyand/measure/service.go  | 33 +--------------------------------
 3 files changed, 15 insertions(+), 37 deletions(-)

diff --git a/banyand/measure/measure.go b/banyand/measure/measure.go
index 1e1e7e2f..6d4a83c4 100644
--- a/banyand/measure/measure.go
+++ b/banyand/measure/measure.go
@@ -122,7 +122,8 @@ type measureSpec struct {
 	topNAggregations []*databasev1.TopNAggregation
 }
 
-func openMeasure(shardNum uint32, db tsdb.Supplier, spec measureSpec, l *logger.Logger) (*measure, error) {
+func openMeasure(shardNum uint32, db tsdb.Supplier, spec measureSpec, l *logger.Logger, repo metadata.Repo,
+	pipeline queue.Queue) (*measure, error) {
 	m := &measure{
 		shardNum:         shardNum,
 		schema:           spec.schema,
@@ -143,5 +144,10 @@ func openMeasure(shardNum uint32, db tsdb.Supplier, spec measureSpec, l *logger.
 		IndexRules: spec.indexRules,
 	})
 
+	if startErr := m.startSteamingManager(pipeline, repo); startErr != nil {
+		l.Err(startErr).Str("measure", spec.schema.GetMetadata().GetName()).
+			Msg("fail to start streaming manager")
+	}
+
 	return m, nil
 }
diff --git a/banyand/measure/metadata.go b/banyand/measure/metadata.go
index aa16588e..a0691c0a 100644
--- a/banyand/measure/metadata.go
+++ b/banyand/measure/metadata.go
@@ -19,6 +19,7 @@ package measure
 
 import (
 	"context"
+	"github.com/apache/skywalking-banyandb/banyand/queue"
 	"path"
 	"time"
 
@@ -42,7 +43,7 @@ type schemaRepo struct {
 }
 
 func newSchemaRepo(path string, metadata metadata.Repo, repo discovery.ServiceRepo,
-	dbOpts tsdb.DatabaseOpts, l *logger.Logger,
+	dbOpts tsdb.DatabaseOpts, l *logger.Logger, pipeline queue.Queue,
 ) schemaRepo {
 	return schemaRepo{
 		l:        l,
@@ -51,7 +52,7 @@ func newSchemaRepo(path string, metadata metadata.Repo, repo discovery.ServiceRe
 			metadata,
 			repo,
 			l,
-			newSupplier(path, metadata, dbOpts, l),
+			newSupplier(path, metadata, dbOpts, l, pipeline),
 			event.MeasureTopicShardEvent,
 			event.MeasureTopicEntityEvent,
 		),
@@ -189,14 +190,16 @@ type supplier struct {
 	l        *logger.Logger
 	path     string
 	dbOpts   tsdb.DatabaseOpts
+	pipeline queue.Queue
 }
 
-func newSupplier(path string, metadata metadata.Repo, dbOpts tsdb.DatabaseOpts, l *logger.Logger) *supplier {
+func newSupplier(path string, metadata metadata.Repo, dbOpts tsdb.DatabaseOpts, l *logger.Logger, pipeline queue.Queue) *supplier {
 	return &supplier{
 		path:     path,
 		dbOpts:   dbOpts,
 		metadata: metadata,
 		l:        l,
+		pipeline: pipeline,
 	}
 }
 
@@ -206,7 +209,7 @@ func (s *supplier) OpenResource(shardNum uint32, db tsdb.Supplier, spec resource
 		schema:           measureSchema,
 		indexRules:       spec.IndexRules,
 		topNAggregations: spec.Aggregations,
-	}, s.l)
+	}, s.l, s.metadata, s.pipeline)
 }
 
 func (s *supplier) ResourceSchema(md *commonv1.Metadata) (resourceSchema.ResourceSchema, error) {
diff --git a/banyand/measure/service.go b/banyand/measure/service.go
index 15d1bdea..3fe10099 100644
--- a/banyand/measure/service.go
+++ b/banyand/measure/service.go
@@ -105,7 +105,7 @@ func (s *service) PreRun() error {
 	if err != nil {
 		return err
 	}
-	s.schemaRepo = newSchemaRepo(path.Join(s.root, s.Name()), s.metadata, s.repo, s.dbOpts, s.l)
+	s.schemaRepo = newSchemaRepo(path.Join(s.root, s.Name()), s.metadata, s.repo, s.dbOpts, s.l, s.pipeline)
 	for _, g := range groups {
 		if g.Catalog != commonv1.Catalog_CATALOG_MEASURE {
 			continue
@@ -145,37 +145,6 @@ func (s *service) Serve() run.StopNotify {
 		RegisterHandler(schema.KindGroup|schema.KindMeasure|schema.KindIndexRuleBinding|schema.KindIndexRule|schema.KindTopNAggregation,
 			&s.schemaRepo)
 
-	// start TopN manager after registering handlers
-	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
-	groups, err := s.metadata.GroupRegistry().ListGroup(ctx)
-	cancel()
-
-	if err != nil {
-		s.l.Err(err).Msg("fail to list groups")
-		return s.stopCh
-	}
-
-	for _, g := range groups {
-		if g.Catalog != commonv1.Catalog_CATALOG_MEASURE {
-			continue
-		}
-		ctx, cancel = context.WithTimeout(context.Background(), 5*time.Second)
-		allMeasureSchemas, listErr := s.metadata.MeasureRegistry().
-			ListMeasure(ctx, schema.ListOpt{Group: g.GetMetadata().GetName()})
-		cancel()
-		if listErr != nil {
-			s.l.Err(listErr).Str("group", g.GetMetadata().GetName()).Msg("fail to list measures in the group")
-			continue
-		}
-		for _, measureSchema := range allMeasureSchemas {
-			if res, ok := s.schemaRepo.LoadResource(measureSchema.GetMetadata()); ok {
-				if startErr := res.(*measure).startSteamingManager(s.pipeline, s.metadata); startErr != nil {
-					s.l.Err(startErr).Str("measure", measureSchema.GetMetadata().GetName()).Msg("fail to start streaming manager")
-				}
-			}
-		}
-	}
-
 	return s.stopCh
 }