You are viewing a plain-text version of this content; the link to the canonical version was not preserved in this rendering.
Posted to notifications@skywalking.apache.org by lu...@apache.org on 2023/04/22 13:19:23 UTC
[skywalking-banyandb] 01/01: use topn in maxMod computation
This is an automated email from the ASF dual-hosted git repository.
lujiajing pushed a commit to branch polish-measure-reload
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
commit 99f62e0df0a8b3aad22e10733253f6eef72e307f
Author: Megrez Lu <lu...@gmail.com>
AuthorDate: Sat Apr 22 21:19:11 2023 +0800
use topn in maxMod computation
---
banyand/measure/measure.go | 9 +++++++--
banyand/stream/stream.go | 7 +++++--
pkg/pb/v1/metadata.go | 11 -----------
pkg/schema/metadata.go | 24 +++++++++++++++++++++---
4 files changed, 33 insertions(+), 18 deletions(-)
diff --git a/banyand/measure/measure.go b/banyand/measure/measure.go
index 417766ee..a5ee5857 100644
--- a/banyand/measure/measure.go
+++ b/banyand/measure/measure.go
@@ -22,6 +22,7 @@ package measure
import (
"context"
+ "math"
"time"
commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
@@ -31,8 +32,8 @@ import (
"github.com/apache/skywalking-banyandb/banyand/tsdb/index"
"github.com/apache/skywalking-banyandb/pkg/logger"
"github.com/apache/skywalking-banyandb/pkg/partition"
- pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
"github.com/apache/skywalking-banyandb/pkg/query/logical"
+ resourceSchema "github.com/apache/skywalking-banyandb/pkg/schema"
"github.com/apache/skywalking-banyandb/pkg/timestamp"
)
@@ -89,6 +90,10 @@ func (s *measure) GetIndexRules() []*databasev1.IndexRule {
return s.indexRules
}
+// GetTopN returns the TopN aggregations held by this measure. The field is
+// also consulted by parseSpec when computing maxObservedModRevision.
+func (s *measure) GetTopN() []*databasev1.TopNAggregation {
+	return s.topNAggregations
+}
+
+// MaxObservedModRevision returns the value parseSpec stored: the larger of the
+// max ModRevision across the measure's index rules and its TopN aggregations.
func (s *measure) MaxObservedModRevision() int64 {
return s.maxObservedModRevision
}
@@ -107,7 +112,7 @@ func (s *measure) Close() error {
func (s *measure) parseSpec() (err error) {
s.name, s.group = s.schema.GetMetadata().GetName(), s.schema.GetMetadata().GetGroup()
s.entityLocator = partition.NewEntityLocator(s.schema.GetTagFamilies(), s.schema.GetEntity())
- s.maxObservedModRevision = pbv1.ParseMaxModRevision(s.indexRules)
+ s.maxObservedModRevision = int64(math.Max(float64(resourceSchema.ParseMaxModRevision(s.indexRules)), float64(resourceSchema.ParseMaxModRevision(s.topNAggregations))))
if s.schema.Interval != "" {
s.interval, err = timestamp.ParseDuration(s.schema.Interval)
}
diff --git a/banyand/stream/stream.go b/banyand/stream/stream.go
index 711b713d..ded686fe 100644
--- a/banyand/stream/stream.go
+++ b/banyand/stream/stream.go
@@ -28,7 +28,6 @@ import (
"github.com/apache/skywalking-banyandb/banyand/tsdb/index"
"github.com/apache/skywalking-banyandb/pkg/logger"
"github.com/apache/skywalking-banyandb/pkg/partition"
- pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
"github.com/apache/skywalking-banyandb/pkg/schema"
)
@@ -66,6 +65,10 @@ func (s *stream) MaxObservedModRevision() int64 {
return s.maxObservedModRevision
}
+// GetTopN satisfies the Resource interface for streams. This implementation
+// always returns nil.
+//
+// NOTE(review): StoreResource appears to fetch TopN aggregations from the
+// measure registry for every resource and compares their count with
+// GetTopN(); confirm that the lookup yields an empty result (not an error)
+// for stream metadata.
+func (s *stream) GetTopN() []*databasev1.TopNAggregation {
+	return nil
+}
+
+// EntityLocator returns the locator that parseSpec built from the stream
+// schema's tag families and entity.
func (s *stream) EntityLocator() partition.EntityLocator {
return s.entityLocator
}
@@ -77,7 +80,7 @@ func (s *stream) Close() error {
+// parseSpec caches the stream's name and group from its schema metadata, builds
+// its entity locator, and records the max ModRevision of its index rules.
func (s *stream) parseSpec() {
s.name, s.group = s.schema.GetMetadata().GetName(), s.schema.GetMetadata().GetGroup()
s.entityLocator = partition.NewEntityLocator(s.schema.GetTagFamilies(), s.schema.GetEntity())
- s.maxObservedModRevision = pbv1.ParseMaxModRevision(s.indexRules)
+ // Only index rules contribute here: stream.GetTopN always returns nil.
+ s.maxObservedModRevision = schema.ParseMaxModRevision(s.indexRules)
}
type streamSpec struct {
diff --git a/pkg/pb/v1/metadata.go b/pkg/pb/v1/metadata.go
index 373c0e47..8654d390 100644
--- a/pkg/pb/v1/metadata.go
+++ b/pkg/pb/v1/metadata.go
@@ -72,14 +72,3 @@ func FieldValueTypeConv(fieldValue *modelv1.FieldValue) (tagType databasev1.Fiel
}
return databasev1.FieldType_FIELD_TYPE_UNSPECIFIED, false
}
-
-// ParseMaxModRevision parses the index rule's max revision field from its metadata.
-func ParseMaxModRevision(indexRules []*databasev1.IndexRule) (maxRevisionForIdxRules int64) {
- maxRevisionForIdxRules = int64(0)
- for _, idxRule := range indexRules {
- if idxRule.GetMetadata().GetModRevision() > maxRevisionForIdxRules {
- maxRevisionForIdxRules = idxRule.GetMetadata().GetModRevision()
- }
- }
- return
-}
diff --git a/pkg/schema/metadata.go b/pkg/schema/metadata.go
index 103958d5..31f23f57 100644
--- a/pkg/schema/metadata.go
+++ b/pkg/schema/metadata.go
@@ -21,6 +21,7 @@ package schema
import (
"context"
"io"
+ "math"
"sync"
"sync/atomic"
"time"
@@ -39,7 +40,6 @@ import (
"github.com/apache/skywalking-banyandb/pkg/bus"
"github.com/apache/skywalking-banyandb/pkg/logger"
"github.com/apache/skywalking-banyandb/pkg/partition"
- pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
"github.com/apache/skywalking-banyandb/pkg/run"
)
@@ -93,6 +93,7 @@ type ResourceSpec struct {
// Resource allows access metadata from a local cache.
type Resource interface {
GetIndexRules() []*databasev1.IndexRule
+ GetTopN() []*databasev1.TopNAggregation
MaxObservedModRevision() int64
EntityLocator() partition.EntityLocator
ResourceSchema
@@ -452,8 +453,14 @@ func (g *group) StoreResource(resourceSchema ResourceSchema) (Resource, error) {
if errIndexRules != nil {
return nil, errIndexRules
}
- if len(idxRules) == len(preResource.GetIndexRules()) {
- maxModRevision := pbv1.ParseMaxModRevision(idxRules)
+ ctx, cancel = context.WithTimeout(context.Background(), 5*time.Second)
+ topNAggrs, errTopN := g.metadata.MeasureRegistry().TopNAggregations(ctx, resourceSchema.GetMetadata())
+ cancel()
+ if errTopN != nil {
+ return nil, errTopN
+ }
+ if len(idxRules) == len(preResource.GetIndexRules()) && len(topNAggrs) == len(preResource.GetTopN()) {
+ maxModRevision := int64(math.Max(float64(ParseMaxModRevision(idxRules)), float64(ParseMaxModRevision(topNAggrs))))
if preResource.MaxObservedModRevision() >= maxModRevision {
return preResource, nil
}
@@ -553,3 +560,14 @@ func (g *group) close() (err error) {
g.mapMutex.RUnlock()
return multierr.Append(err, g.SupplyTSDB().Close())
}
+
+// ParseMaxModRevision gives the max revision from resources' metadata.
+func ParseMaxModRevision[T ResourceSchema](indexRules []T) (maxRevisionForIdxRules int64) {
+ maxRevisionForIdxRules = int64(0)
+ for _, idxRule := range indexRules {
+ if idxRule.GetMetadata().GetModRevision() > maxRevisionForIdxRules {
+ maxRevisionForIdxRules = idxRule.GetMetadata().GetModRevision()
+ }
+ }
+ return
+}