You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by ha...@apache.org on 2023/04/23 11:16:10 UTC
[skywalking-banyandb] branch main updated: Use TopN in max revision (#267)
This is an automated email from the ASF dual-hosted git repository.
hanahmily pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
The following commit(s) were added to refs/heads/main by this push:
new dea8c1e3 Use TopN in max revision (#267)
dea8c1e3 is described below
commit dea8c1e37d4dc19fe18397deb576151a22e2fad8
Author: Jiajing LU <lu...@gmail.com>
AuthorDate: Sun Apr 23 19:16:04 2023 +0800
Use TopN in max revision (#267)
* Use TopN in max ModRevision computation
---
CHANGES.md | 1 +
banyand/measure/measure.go | 9 +++++++--
banyand/stream/stream.go | 7 +++++--
pkg/pb/v1/metadata.go | 11 -----------
pkg/schema/metadata.go | 24 +++++++++++++++++++++---
5 files changed, 34 insertions(+), 18 deletions(-)
diff --git a/CHANGES.md b/CHANGES.md
index 2800744d..81da5ab3 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -13,6 +13,7 @@ Release Notes.
- Add a meter system to control the internal metrics.
- Add multiple metrics for measuring the storage subsystem.
- Refactor callback of TopNAggregation schema event to avoid deadlock and reload issue.
+- Fix max ModRevision computation by including `TopNAggregation` revisions.
### Chores
diff --git a/banyand/measure/measure.go b/banyand/measure/measure.go
index 417766ee..a5ee5857 100644
--- a/banyand/measure/measure.go
+++ b/banyand/measure/measure.go
@@ -22,6 +22,7 @@ package measure
import (
"context"
+ "math"
"time"
commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
@@ -31,8 +32,8 @@ import (
"github.com/apache/skywalking-banyandb/banyand/tsdb/index"
"github.com/apache/skywalking-banyandb/pkg/logger"
"github.com/apache/skywalking-banyandb/pkg/partition"
- pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
"github.com/apache/skywalking-banyandb/pkg/query/logical"
+ resourceSchema "github.com/apache/skywalking-banyandb/pkg/schema"
"github.com/apache/skywalking-banyandb/pkg/timestamp"
)
@@ -89,6 +90,10 @@ func (s *measure) GetIndexRules() []*databasev1.IndexRule {
return s.indexRules
}
+func (s *measure) GetTopN() []*databasev1.TopNAggregation {
+ return s.topNAggregations
+}
+
func (s *measure) MaxObservedModRevision() int64 {
return s.maxObservedModRevision
}
@@ -107,7 +112,7 @@ func (s *measure) Close() error {
func (s *measure) parseSpec() (err error) {
s.name, s.group = s.schema.GetMetadata().GetName(), s.schema.GetMetadata().GetGroup()
s.entityLocator = partition.NewEntityLocator(s.schema.GetTagFamilies(), s.schema.GetEntity())
- s.maxObservedModRevision = pbv1.ParseMaxModRevision(s.indexRules)
+ s.maxObservedModRevision = int64(math.Max(float64(resourceSchema.ParseMaxModRevision(s.indexRules)), float64(resourceSchema.ParseMaxModRevision(s.topNAggregations))))
if s.schema.Interval != "" {
s.interval, err = timestamp.ParseDuration(s.schema.Interval)
}
diff --git a/banyand/stream/stream.go b/banyand/stream/stream.go
index 711b713d..ded686fe 100644
--- a/banyand/stream/stream.go
+++ b/banyand/stream/stream.go
@@ -28,7 +28,6 @@ import (
"github.com/apache/skywalking-banyandb/banyand/tsdb/index"
"github.com/apache/skywalking-banyandb/pkg/logger"
"github.com/apache/skywalking-banyandb/pkg/partition"
- pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
"github.com/apache/skywalking-banyandb/pkg/schema"
)
@@ -66,6 +65,10 @@ func (s *stream) MaxObservedModRevision() int64 {
return s.maxObservedModRevision
}
+func (s *stream) GetTopN() []*databasev1.TopNAggregation {
+ return nil
+}
+
func (s *stream) EntityLocator() partition.EntityLocator {
return s.entityLocator
}
@@ -77,7 +80,7 @@ func (s *stream) Close() error {
func (s *stream) parseSpec() {
s.name, s.group = s.schema.GetMetadata().GetName(), s.schema.GetMetadata().GetGroup()
s.entityLocator = partition.NewEntityLocator(s.schema.GetTagFamilies(), s.schema.GetEntity())
- s.maxObservedModRevision = pbv1.ParseMaxModRevision(s.indexRules)
+ s.maxObservedModRevision = schema.ParseMaxModRevision(s.indexRules)
}
type streamSpec struct {
diff --git a/pkg/pb/v1/metadata.go b/pkg/pb/v1/metadata.go
index 373c0e47..8654d390 100644
--- a/pkg/pb/v1/metadata.go
+++ b/pkg/pb/v1/metadata.go
@@ -72,14 +72,3 @@ func FieldValueTypeConv(fieldValue *modelv1.FieldValue) (tagType databasev1.Fiel
}
return databasev1.FieldType_FIELD_TYPE_UNSPECIFIED, false
}
-
-// ParseMaxModRevision parses the index rule's max revision field from its metadata.
-func ParseMaxModRevision(indexRules []*databasev1.IndexRule) (maxRevisionForIdxRules int64) {
- maxRevisionForIdxRules = int64(0)
- for _, idxRule := range indexRules {
- if idxRule.GetMetadata().GetModRevision() > maxRevisionForIdxRules {
- maxRevisionForIdxRules = idxRule.GetMetadata().GetModRevision()
- }
- }
- return
-}
diff --git a/pkg/schema/metadata.go b/pkg/schema/metadata.go
index 103958d5..31f23f57 100644
--- a/pkg/schema/metadata.go
+++ b/pkg/schema/metadata.go
@@ -21,6 +21,7 @@ package schema
import (
"context"
"io"
+ "math"
"sync"
"sync/atomic"
"time"
@@ -39,7 +40,6 @@ import (
"github.com/apache/skywalking-banyandb/pkg/bus"
"github.com/apache/skywalking-banyandb/pkg/logger"
"github.com/apache/skywalking-banyandb/pkg/partition"
- pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
"github.com/apache/skywalking-banyandb/pkg/run"
)
@@ -93,6 +93,7 @@ type ResourceSpec struct {
// Resource allows access metadata from a local cache.
type Resource interface {
GetIndexRules() []*databasev1.IndexRule
+ GetTopN() []*databasev1.TopNAggregation
MaxObservedModRevision() int64
EntityLocator() partition.EntityLocator
ResourceSchema
@@ -452,8 +453,14 @@ func (g *group) StoreResource(resourceSchema ResourceSchema) (Resource, error) {
if errIndexRules != nil {
return nil, errIndexRules
}
- if len(idxRules) == len(preResource.GetIndexRules()) {
- maxModRevision := pbv1.ParseMaxModRevision(idxRules)
+ ctx, cancel = context.WithTimeout(context.Background(), 5*time.Second)
+ topNAggrs, errTopN := g.metadata.MeasureRegistry().TopNAggregations(ctx, resourceSchema.GetMetadata())
+ cancel()
+ if errTopN != nil {
+ return nil, errTopN
+ }
+ if len(idxRules) == len(preResource.GetIndexRules()) && len(topNAggrs) == len(preResource.GetTopN()) {
+ maxModRevision := int64(math.Max(float64(ParseMaxModRevision(idxRules)), float64(ParseMaxModRevision(topNAggrs))))
if preResource.MaxObservedModRevision() >= maxModRevision {
return preResource, nil
}
@@ -553,3 +560,14 @@ func (g *group) close() (err error) {
g.mapMutex.RUnlock()
return multierr.Append(err, g.SupplyTSDB().Close())
}
+
+// ParseMaxModRevision returns the largest ModRevision among the given resources' metadata, or 0 if none is positive.
+func ParseMaxModRevision[T ResourceSchema](indexRules []T) (maxRevisionForIdxRules int64) {
+ maxRevisionForIdxRules = int64(0)
+ for _, idxRule := range indexRules {
+ if idxRule.GetMetadata().GetModRevision() > maxRevisionForIdxRules {
+ maxRevisionForIdxRules = idxRule.GetMetadata().GetModRevision()
+ }
+ }
+ return
+}