You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by lu...@apache.org on 2023/04/16 07:32:41 UTC
[skywalking-banyandb] 01/01: reload topn
This is an automated email from the ASF dual-hosted git repository.
lujiajing pushed a commit to branch feature/reload-topn
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
commit 5220c122742c2ff7ff8b66f2aa07e4e724dec306
Author: Megrez Lu <lu...@gmail.com>
AuthorDate: Sun Apr 16 15:32:27 2023 +0800
reload topn
---
banyand/measure/measure.go | 8 +++++++-
banyand/measure/metadata.go | 11 +++++++----
banyand/measure/service.go | 33 +--------------------------------
3 files changed, 15 insertions(+), 37 deletions(-)
diff --git a/banyand/measure/measure.go b/banyand/measure/measure.go
index 1e1e7e2f..6d4a83c4 100644
--- a/banyand/measure/measure.go
+++ b/banyand/measure/measure.go
@@ -122,7 +122,8 @@ type measureSpec struct {
topNAggregations []*databasev1.TopNAggregation
}
-func openMeasure(shardNum uint32, db tsdb.Supplier, spec measureSpec, l *logger.Logger) (*measure, error) {
+func openMeasure(shardNum uint32, db tsdb.Supplier, spec measureSpec, l *logger.Logger, repo metadata.Repo,
+ pipeline queue.Queue) (*measure, error) {
m := &measure{
shardNum: shardNum,
schema: spec.schema,
@@ -143,5 +144,10 @@ func openMeasure(shardNum uint32, db tsdb.Supplier, spec measureSpec, l *logger.
IndexRules: spec.indexRules,
})
+ if startErr := m.startSteamingManager(pipeline, repo); startErr != nil {
+ l.Err(startErr).Str("measure", spec.schema.GetMetadata().GetName()).
+ Msg("fail to start streaming manager")
+ }
+
return m, nil
}
diff --git a/banyand/measure/metadata.go b/banyand/measure/metadata.go
index aa16588e..a0691c0a 100644
--- a/banyand/measure/metadata.go
+++ b/banyand/measure/metadata.go
@@ -19,6 +19,7 @@ package measure
import (
"context"
+ "github.com/apache/skywalking-banyandb/banyand/queue"
"path"
"time"
@@ -42,7 +43,7 @@ type schemaRepo struct {
}
func newSchemaRepo(path string, metadata metadata.Repo, repo discovery.ServiceRepo,
- dbOpts tsdb.DatabaseOpts, l *logger.Logger,
+ dbOpts tsdb.DatabaseOpts, l *logger.Logger, pipeline queue.Queue,
) schemaRepo {
return schemaRepo{
l: l,
@@ -51,7 +52,7 @@ func newSchemaRepo(path string, metadata metadata.Repo, repo discovery.ServiceRe
metadata,
repo,
l,
- newSupplier(path, metadata, dbOpts, l),
+ newSupplier(path, metadata, dbOpts, l, pipeline),
event.MeasureTopicShardEvent,
event.MeasureTopicEntityEvent,
),
@@ -189,14 +190,16 @@ type supplier struct {
l *logger.Logger
path string
dbOpts tsdb.DatabaseOpts
+ pipeline queue.Queue
}
-func newSupplier(path string, metadata metadata.Repo, dbOpts tsdb.DatabaseOpts, l *logger.Logger) *supplier {
+func newSupplier(path string, metadata metadata.Repo, dbOpts tsdb.DatabaseOpts, l *logger.Logger, pipeline queue.Queue) *supplier {
return &supplier{
path: path,
dbOpts: dbOpts,
metadata: metadata,
l: l,
+ pipeline: pipeline,
}
}
@@ -206,7 +209,7 @@ func (s *supplier) OpenResource(shardNum uint32, db tsdb.Supplier, spec resource
schema: measureSchema,
indexRules: spec.IndexRules,
topNAggregations: spec.Aggregations,
- }, s.l)
+ }, s.l, s.metadata, s.pipeline)
}
func (s *supplier) ResourceSchema(md *commonv1.Metadata) (resourceSchema.ResourceSchema, error) {
diff --git a/banyand/measure/service.go b/banyand/measure/service.go
index 15d1bdea..3fe10099 100644
--- a/banyand/measure/service.go
+++ b/banyand/measure/service.go
@@ -105,7 +105,7 @@ func (s *service) PreRun() error {
if err != nil {
return err
}
- s.schemaRepo = newSchemaRepo(path.Join(s.root, s.Name()), s.metadata, s.repo, s.dbOpts, s.l)
+ s.schemaRepo = newSchemaRepo(path.Join(s.root, s.Name()), s.metadata, s.repo, s.dbOpts, s.l, s.pipeline)
for _, g := range groups {
if g.Catalog != commonv1.Catalog_CATALOG_MEASURE {
continue
@@ -145,37 +145,6 @@ func (s *service) Serve() run.StopNotify {
RegisterHandler(schema.KindGroup|schema.KindMeasure|schema.KindIndexRuleBinding|schema.KindIndexRule|schema.KindTopNAggregation,
&s.schemaRepo)
- // start TopN manager after registering handlers
- ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
- groups, err := s.metadata.GroupRegistry().ListGroup(ctx)
- cancel()
-
- if err != nil {
- s.l.Err(err).Msg("fail to list groups")
- return s.stopCh
- }
-
- for _, g := range groups {
- if g.Catalog != commonv1.Catalog_CATALOG_MEASURE {
- continue
- }
- ctx, cancel = context.WithTimeout(context.Background(), 5*time.Second)
- allMeasureSchemas, listErr := s.metadata.MeasureRegistry().
- ListMeasure(ctx, schema.ListOpt{Group: g.GetMetadata().GetName()})
- cancel()
- if listErr != nil {
- s.l.Err(listErr).Str("group", g.GetMetadata().GetName()).Msg("fail to list measures in the group")
- continue
- }
- for _, measureSchema := range allMeasureSchemas {
- if res, ok := s.schemaRepo.LoadResource(measureSchema.GetMetadata()); ok {
- if startErr := res.(*measure).startSteamingManager(s.pipeline, s.metadata); startErr != nil {
- s.l.Err(startErr).Str("measure", measureSchema.GetMetadata().GetName()).Msg("fail to start streaming manager")
- }
- }
- }
- }
-
return s.stopCh
}