You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by lu...@apache.org on 2023/04/22 13:19:22 UTC

[skywalking-banyandb] branch polish-measure-reload created (now 99f62e0d)

This is an automated email from the ASF dual-hosted git repository.

lujiajing pushed a change to branch polish-measure-reload
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git


      at 99f62e0d use topn in maxMod computation

This branch includes the following new commits:

     new 99f62e0d use topn in maxMod computation

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[skywalking-banyandb] 01/01: use topn in maxMod computation

Posted by lu...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

lujiajing pushed a commit to branch polish-measure-reload
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git

commit 99f62e0df0a8b3aad22e10733253f6eef72e307f
Author: Megrez Lu <lu...@gmail.com>
AuthorDate: Sat Apr 22 21:19:11 2023 +0800

    use topn in maxMod computation
---
 banyand/measure/measure.go |  9 +++++++--
 banyand/stream/stream.go   |  7 +++++--
 pkg/pb/v1/metadata.go      | 11 -----------
 pkg/schema/metadata.go     | 24 +++++++++++++++++++++---
 4 files changed, 33 insertions(+), 18 deletions(-)

diff --git a/banyand/measure/measure.go b/banyand/measure/measure.go
index 417766ee..a5ee5857 100644
--- a/banyand/measure/measure.go
+++ b/banyand/measure/measure.go
@@ -22,6 +22,7 @@ package measure
 
 import (
 	"context"
+	"math"
 	"time"
 
 	commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
@@ -31,8 +32,8 @@ import (
 	"github.com/apache/skywalking-banyandb/banyand/tsdb/index"
 	"github.com/apache/skywalking-banyandb/pkg/logger"
 	"github.com/apache/skywalking-banyandb/pkg/partition"
-	pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
 	"github.com/apache/skywalking-banyandb/pkg/query/logical"
+	resourceSchema "github.com/apache/skywalking-banyandb/pkg/schema"
 	"github.com/apache/skywalking-banyandb/pkg/timestamp"
 )
 
@@ -89,6 +90,10 @@ func (s *measure) GetIndexRules() []*databasev1.IndexRule {
 	return s.indexRules
 }
 
+func (s *measure) GetTopN() []*databasev1.TopNAggregation {
+	return s.topNAggregations
+}
+
 func (s *measure) MaxObservedModRevision() int64 {
 	return s.maxObservedModRevision
 }
@@ -107,7 +112,7 @@ func (s *measure) Close() error {
 func (s *measure) parseSpec() (err error) {
 	s.name, s.group = s.schema.GetMetadata().GetName(), s.schema.GetMetadata().GetGroup()
 	s.entityLocator = partition.NewEntityLocator(s.schema.GetTagFamilies(), s.schema.GetEntity())
-	s.maxObservedModRevision = pbv1.ParseMaxModRevision(s.indexRules)
+	s.maxObservedModRevision = int64(math.Max(float64(resourceSchema.ParseMaxModRevision(s.indexRules)), float64(resourceSchema.ParseMaxModRevision(s.topNAggregations))))
 	if s.schema.Interval != "" {
 		s.interval, err = timestamp.ParseDuration(s.schema.Interval)
 	}
diff --git a/banyand/stream/stream.go b/banyand/stream/stream.go
index 711b713d..ded686fe 100644
--- a/banyand/stream/stream.go
+++ b/banyand/stream/stream.go
@@ -28,7 +28,6 @@ import (
 	"github.com/apache/skywalking-banyandb/banyand/tsdb/index"
 	"github.com/apache/skywalking-banyandb/pkg/logger"
 	"github.com/apache/skywalking-banyandb/pkg/partition"
-	pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
 	"github.com/apache/skywalking-banyandb/pkg/schema"
 )
 
@@ -66,6 +65,10 @@ func (s *stream) MaxObservedModRevision() int64 {
 	return s.maxObservedModRevision
 }
 
+func (s *stream) GetTopN() []*databasev1.TopNAggregation {
+	return nil
+}
+
 func (s *stream) EntityLocator() partition.EntityLocator {
 	return s.entityLocator
 }
@@ -77,7 +80,7 @@ func (s *stream) Close() error {
 func (s *stream) parseSpec() {
 	s.name, s.group = s.schema.GetMetadata().GetName(), s.schema.GetMetadata().GetGroup()
 	s.entityLocator = partition.NewEntityLocator(s.schema.GetTagFamilies(), s.schema.GetEntity())
-	s.maxObservedModRevision = pbv1.ParseMaxModRevision(s.indexRules)
+	s.maxObservedModRevision = schema.ParseMaxModRevision(s.indexRules)
 }
 
 type streamSpec struct {
diff --git a/pkg/pb/v1/metadata.go b/pkg/pb/v1/metadata.go
index 373c0e47..8654d390 100644
--- a/pkg/pb/v1/metadata.go
+++ b/pkg/pb/v1/metadata.go
@@ -72,14 +72,3 @@ func FieldValueTypeConv(fieldValue *modelv1.FieldValue) (tagType databasev1.Fiel
 	}
 	return databasev1.FieldType_FIELD_TYPE_UNSPECIFIED, false
 }
-
-// ParseMaxModRevision parses the index rule's max revision field from its metadata.
-func ParseMaxModRevision(indexRules []*databasev1.IndexRule) (maxRevisionForIdxRules int64) {
-	maxRevisionForIdxRules = int64(0)
-	for _, idxRule := range indexRules {
-		if idxRule.GetMetadata().GetModRevision() > maxRevisionForIdxRules {
-			maxRevisionForIdxRules = idxRule.GetMetadata().GetModRevision()
-		}
-	}
-	return
-}
diff --git a/pkg/schema/metadata.go b/pkg/schema/metadata.go
index 103958d5..31f23f57 100644
--- a/pkg/schema/metadata.go
+++ b/pkg/schema/metadata.go
@@ -21,6 +21,7 @@ package schema
 import (
 	"context"
 	"io"
+	"math"
 	"sync"
 	"sync/atomic"
 	"time"
@@ -39,7 +40,6 @@ import (
 	"github.com/apache/skywalking-banyandb/pkg/bus"
 	"github.com/apache/skywalking-banyandb/pkg/logger"
 	"github.com/apache/skywalking-banyandb/pkg/partition"
-	pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
 	"github.com/apache/skywalking-banyandb/pkg/run"
 )
 
@@ -93,6 +93,7 @@ type ResourceSpec struct {
 // Resource allows access metadata from a local cache.
 type Resource interface {
 	GetIndexRules() []*databasev1.IndexRule
+	GetTopN() []*databasev1.TopNAggregation
 	MaxObservedModRevision() int64
 	EntityLocator() partition.EntityLocator
 	ResourceSchema
@@ -452,8 +453,14 @@ func (g *group) StoreResource(resourceSchema ResourceSchema) (Resource, error) {
 		if errIndexRules != nil {
 			return nil, errIndexRules
 		}
-		if len(idxRules) == len(preResource.GetIndexRules()) {
-			maxModRevision := pbv1.ParseMaxModRevision(idxRules)
+		ctx, cancel = context.WithTimeout(context.Background(), 5*time.Second)
+		topNAggrs, errTopN := g.metadata.MeasureRegistry().TopNAggregations(ctx, resourceSchema.GetMetadata())
+		cancel()
+		if errTopN != nil {
+			return nil, errTopN
+		}
+		if len(idxRules) == len(preResource.GetIndexRules()) && len(topNAggrs) == len(preResource.GetTopN()) {
+			maxModRevision := int64(math.Max(float64(ParseMaxModRevision(idxRules)), float64(ParseMaxModRevision(topNAggrs))))
 			if preResource.MaxObservedModRevision() >= maxModRevision {
 				return preResource, nil
 			}
@@ -553,3 +560,14 @@ func (g *group) close() (err error) {
 	g.mapMutex.RUnlock()
 	return multierr.Append(err, g.SupplyTSDB().Close())
 }
+
+// ParseMaxModRevision gives the max revision from resources' metadata.
+func ParseMaxModRevision[T ResourceSchema](indexRules []T) (maxRevisionForIdxRules int64) {
+	maxRevisionForIdxRules = int64(0)
+	for _, idxRule := range indexRules {
+		if idxRule.GetMetadata().GetModRevision() > maxRevisionForIdxRules {
+			maxRevisionForIdxRules = idxRule.GetMetadata().GetModRevision()
+		}
+	}
+	return
+}