You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by lu...@apache.org on 2023/04/22 13:19:23 UTC

[skywalking-banyandb] 01/01: use topn in maxMod computation

This is an automated email from the ASF dual-hosted git repository.

lujiajing pushed a commit to branch polish-measure-reload
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git

commit 99f62e0df0a8b3aad22e10733253f6eef72e307f
Author: Megrez Lu <lu...@gmail.com>
AuthorDate: Sat Apr 22 21:19:11 2023 +0800

    use topn in maxMod computation
---
 banyand/measure/measure.go |  9 +++++++--
 banyand/stream/stream.go   |  7 +++++--
 pkg/pb/v1/metadata.go      | 11 -----------
 pkg/schema/metadata.go     | 24 +++++++++++++++++++++---
 4 files changed, 33 insertions(+), 18 deletions(-)

diff --git a/banyand/measure/measure.go b/banyand/measure/measure.go
index 417766ee..a5ee5857 100644
--- a/banyand/measure/measure.go
+++ b/banyand/measure/measure.go
@@ -22,6 +22,7 @@ package measure
 
 import (
 	"context"
+	"math"
 	"time"
 
 	commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
@@ -31,8 +32,8 @@ import (
 	"github.com/apache/skywalking-banyandb/banyand/tsdb/index"
 	"github.com/apache/skywalking-banyandb/pkg/logger"
 	"github.com/apache/skywalking-banyandb/pkg/partition"
-	pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
 	"github.com/apache/skywalking-banyandb/pkg/query/logical"
+	resourceSchema "github.com/apache/skywalking-banyandb/pkg/schema"
 	"github.com/apache/skywalking-banyandb/pkg/timestamp"
 )
 
@@ -89,6 +90,10 @@ func (s *measure) GetIndexRules() []*databasev1.IndexRule {
 	return s.indexRules
 }
 
+func (s *measure) GetTopN() []*databasev1.TopNAggregation { // GetTopN exposes the measure's topNAggregations slice as-is (not copied).
+	return s.topNAggregations
+}
+
 func (s *measure) MaxObservedModRevision() int64 {
 	return s.maxObservedModRevision
 }
@@ -107,7 +112,7 @@ func (s *measure) Close() error {
 func (s *measure) parseSpec() (err error) {
 	s.name, s.group = s.schema.GetMetadata().GetName(), s.schema.GetMetadata().GetGroup()
 	s.entityLocator = partition.NewEntityLocator(s.schema.GetTagFamilies(), s.schema.GetEntity())
-	s.maxObservedModRevision = pbv1.ParseMaxModRevision(s.indexRules)
+	s.maxObservedModRevision = int64(math.Max(float64(resourceSchema.ParseMaxModRevision(s.indexRules)), float64(resourceSchema.ParseMaxModRevision(s.topNAggregations))))
 	if s.schema.Interval != "" {
 		s.interval, err = timestamp.ParseDuration(s.schema.Interval)
 	}
diff --git a/banyand/stream/stream.go b/banyand/stream/stream.go
index 711b713d..ded686fe 100644
--- a/banyand/stream/stream.go
+++ b/banyand/stream/stream.go
@@ -28,7 +28,6 @@ import (
 	"github.com/apache/skywalking-banyandb/banyand/tsdb/index"
 	"github.com/apache/skywalking-banyandb/pkg/logger"
 	"github.com/apache/skywalking-banyandb/pkg/partition"
-	pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
 	"github.com/apache/skywalking-banyandb/pkg/schema"
 )
 
@@ -66,6 +65,10 @@ func (s *stream) MaxObservedModRevision() int64 {
 	return s.maxObservedModRevision
 }
 
+func (s *stream) GetTopN() []*databasev1.TopNAggregation { // GetTopN always yields nil for a stream.
+	return nil // NOTE(review): presumably exists only so stream satisfies schema.Resource — confirm.
+}
+
 func (s *stream) EntityLocator() partition.EntityLocator {
 	return s.entityLocator
 }
@@ -77,7 +80,7 @@ func (s *stream) Close() error {
 func (s *stream) parseSpec() {
 	s.name, s.group = s.schema.GetMetadata().GetName(), s.schema.GetMetadata().GetGroup()
 	s.entityLocator = partition.NewEntityLocator(s.schema.GetTagFamilies(), s.schema.GetEntity())
-	s.maxObservedModRevision = pbv1.ParseMaxModRevision(s.indexRules)
+	s.maxObservedModRevision = schema.ParseMaxModRevision(s.indexRules)
 }
 
 type streamSpec struct {
diff --git a/pkg/pb/v1/metadata.go b/pkg/pb/v1/metadata.go
index 373c0e47..8654d390 100644
--- a/pkg/pb/v1/metadata.go
+++ b/pkg/pb/v1/metadata.go
@@ -72,14 +72,3 @@ func FieldValueTypeConv(fieldValue *modelv1.FieldValue) (tagType databasev1.Fiel
 	}
 	return databasev1.FieldType_FIELD_TYPE_UNSPECIFIED, false
 }
-
-// ParseMaxModRevision parses the index rule's max revision field from its metadata.
-func ParseMaxModRevision(indexRules []*databasev1.IndexRule) (maxRevisionForIdxRules int64) {
-	maxRevisionForIdxRules = int64(0)
-	for _, idxRule := range indexRules {
-		if idxRule.GetMetadata().GetModRevision() > maxRevisionForIdxRules {
-			maxRevisionForIdxRules = idxRule.GetMetadata().GetModRevision()
-		}
-	}
-	return
-}
diff --git a/pkg/schema/metadata.go b/pkg/schema/metadata.go
index 103958d5..31f23f57 100644
--- a/pkg/schema/metadata.go
+++ b/pkg/schema/metadata.go
@@ -21,6 +21,7 @@ package schema
 import (
 	"context"
 	"io"
+	"math"
 	"sync"
 	"sync/atomic"
 	"time"
@@ -39,7 +40,6 @@ import (
 	"github.com/apache/skywalking-banyandb/pkg/bus"
 	"github.com/apache/skywalking-banyandb/pkg/logger"
 	"github.com/apache/skywalking-banyandb/pkg/partition"
-	pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
 	"github.com/apache/skywalking-banyandb/pkg/run"
 )
 
@@ -93,6 +93,7 @@ type ResourceSpec struct {
 // Resource allows access metadata from a local cache.
 type Resource interface {
 	GetIndexRules() []*databasev1.IndexRule
+	GetTopN() []*databasev1.TopNAggregation
 	MaxObservedModRevision() int64
 	EntityLocator() partition.EntityLocator
 	ResourceSchema
@@ -452,8 +453,14 @@ func (g *group) StoreResource(resourceSchema ResourceSchema) (Resource, error) {
 		if errIndexRules != nil {
 			return nil, errIndexRules
 		}
-		if len(idxRules) == len(preResource.GetIndexRules()) {
-			maxModRevision := pbv1.ParseMaxModRevision(idxRules)
+		ctx, cancel = context.WithTimeout(context.Background(), 5*time.Second)
+		topNAggrs, errTopN := g.metadata.MeasureRegistry().TopNAggregations(ctx, resourceSchema.GetMetadata())
+		cancel()
+		if errTopN != nil {
+			return nil, errTopN
+		}
+		if len(idxRules) == len(preResource.GetIndexRules()) && len(topNAggrs) == len(preResource.GetTopN()) {
+			maxModRevision := int64(math.Max(float64(ParseMaxModRevision(idxRules)), float64(ParseMaxModRevision(topNAggrs))))
 			if preResource.MaxObservedModRevision() >= maxModRevision {
 				return preResource, nil
 			}
@@ -553,3 +560,14 @@ func (g *group) close() (err error) {
 	g.mapMutex.RUnlock()
 	return multierr.Append(err, g.SupplyTSDB().Close())
 }
+
+// ParseMaxModRevision returns the largest ModRevision found in the given resources' metadata, or 0 when the slice is empty or no revision exceeds 0.
+func ParseMaxModRevision[T ResourceSchema](indexRules []T) (maxRevisionForIdxRules int64) { // despite its name, indexRules may hold any ResourceSchema.
+	maxRevisionForIdxRules = int64(0)
+	for _, idxRule := range indexRules {
+		if idxRule.GetMetadata().GetModRevision() > maxRevisionForIdxRules {
+			maxRevisionForIdxRules = idxRule.GetMetadata().GetModRevision()
+		}
+	}
+	return
+}