You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by lu...@apache.org on 2022/04/09 15:06:14 UTC
[skywalking-banyandb] branch polish-error updated: resolve cyclic
This is an automated email from the ASF dual-hosted git repository.
lujiajing pushed a commit to branch polish-error
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
The following commit(s) were added to refs/heads/polish-error by this push:
new 0e260e4 resolve cyclic
0e260e4 is described below
commit 0e260e4eadb7fdeae2f6356fe977315d0d433768
Author: Megrez Lu <lu...@gmail.com>
AuthorDate: Sat Apr 9 23:06:06 2022 +0800
resolve cyclic
---
banyand/liaison/grpc/registry.go | 24 ++++++++++++------------
{pkg => banyand/metadata}/schema/error.go | 0
banyand/metadata/schema/etcd.go | 5 ++---
banyand/metadata/schema/etcd_test.go | 16 ++++++++--------
banyand/metadata/schema/index.go | 5 ++---
banyand/metadata/schema/measure.go | 5 ++---
banyand/metadata/schema/property.go | 3 +--
banyand/metadata/schema/stream.go | 3 +--
banyand/metadata/schema/topn.go | 3 +--
9 files changed, 29 insertions(+), 35 deletions(-)
diff --git a/banyand/liaison/grpc/registry.go b/banyand/liaison/grpc/registry.go
index 61e2add..53b4802 100644
--- a/banyand/liaison/grpc/registry.go
+++ b/banyand/liaison/grpc/registry.go
@@ -32,7 +32,7 @@ type streamRegistryServer struct {
func (rs *streamRegistryServer) Create(ctx context.Context,
req *databasev1.StreamRegistryServiceCreateRequest) (*databasev1.StreamRegistryServiceCreateResponse, error) {
- if err := rs.schemaRegistry.StreamRegistry().UpdateStream(ctx, req.GetStream()); err != nil {
+ if err := rs.schemaRegistry.StreamRegistry().UpdateStream(ctx, req.GetStream(), false); err != nil {
return nil, err
}
return &databasev1.StreamRegistryServiceCreateResponse{}, nil
@@ -40,7 +40,7 @@ func (rs *streamRegistryServer) Create(ctx context.Context,
func (rs *streamRegistryServer) Update(ctx context.Context,
req *databasev1.StreamRegistryServiceUpdateRequest) (*databasev1.StreamRegistryServiceUpdateResponse, error) {
- if err := rs.schemaRegistry.StreamRegistry().UpdateStream(ctx, req.GetStream()); err != nil {
+ if err := rs.schemaRegistry.StreamRegistry().UpdateStream(ctx, req.GetStream(), true); err != nil {
return nil, err
}
return &databasev1.StreamRegistryServiceUpdateResponse{}, nil
@@ -87,7 +87,7 @@ type indexRuleBindingRegistryServer struct {
func (rs *indexRuleBindingRegistryServer) Create(ctx context.Context,
req *databasev1.IndexRuleBindingRegistryServiceCreateRequest) (
*databasev1.IndexRuleBindingRegistryServiceCreateResponse, error) {
- if err := rs.schemaRegistry.IndexRuleBindingRegistry().UpdateIndexRuleBinding(ctx, req.GetIndexRuleBinding()); err != nil {
+ if err := rs.schemaRegistry.IndexRuleBindingRegistry().UpdateIndexRuleBinding(ctx, req.GetIndexRuleBinding(), false); err != nil {
return nil, err
}
return &databasev1.IndexRuleBindingRegistryServiceCreateResponse{}, nil
@@ -96,7 +96,7 @@ func (rs *indexRuleBindingRegistryServer) Create(ctx context.Context,
func (rs *indexRuleBindingRegistryServer) Update(ctx context.Context,
req *databasev1.IndexRuleBindingRegistryServiceUpdateRequest) (
*databasev1.IndexRuleBindingRegistryServiceUpdateResponse, error) {
- if err := rs.schemaRegistry.IndexRuleBindingRegistry().UpdateIndexRuleBinding(ctx, req.GetIndexRuleBinding()); err != nil {
+ if err := rs.schemaRegistry.IndexRuleBindingRegistry().UpdateIndexRuleBinding(ctx, req.GetIndexRuleBinding(), true); err != nil {
return nil, err
}
return &databasev1.IndexRuleBindingRegistryServiceUpdateResponse{}, nil
@@ -146,7 +146,7 @@ type indexRuleRegistryServer struct {
func (rs *indexRuleRegistryServer) Create(ctx context.Context, req *databasev1.IndexRuleRegistryServiceCreateRequest) (
*databasev1.IndexRuleRegistryServiceCreateResponse, error) {
- if err := rs.schemaRegistry.IndexRuleRegistry().UpdateIndexRule(ctx, req.GetIndexRule()); err != nil {
+ if err := rs.schemaRegistry.IndexRuleRegistry().UpdateIndexRule(ctx, req.GetIndexRule(), false); err != nil {
return nil, err
}
return &databasev1.IndexRuleRegistryServiceCreateResponse{}, nil
@@ -154,7 +154,7 @@ func (rs *indexRuleRegistryServer) Create(ctx context.Context, req *databasev1.I
func (rs *indexRuleRegistryServer) Update(ctx context.Context, req *databasev1.IndexRuleRegistryServiceUpdateRequest) (
*databasev1.IndexRuleRegistryServiceUpdateResponse, error) {
- if err := rs.schemaRegistry.IndexRuleRegistry().UpdateIndexRule(ctx, req.GetIndexRule()); err != nil {
+ if err := rs.schemaRegistry.IndexRuleRegistry().UpdateIndexRule(ctx, req.GetIndexRule(), true); err != nil {
return nil, err
}
return &databasev1.IndexRuleRegistryServiceUpdateResponse{}, nil
@@ -200,7 +200,7 @@ type measureRegistryServer struct {
func (rs *measureRegistryServer) Create(ctx context.Context, req *databasev1.MeasureRegistryServiceCreateRequest) (
*databasev1.MeasureRegistryServiceCreateResponse, error) {
- if err := rs.schemaRegistry.MeasureRegistry().UpdateMeasure(ctx, req.GetMeasure()); err != nil {
+ if err := rs.schemaRegistry.MeasureRegistry().UpdateMeasure(ctx, req.GetMeasure(), false); err != nil {
return nil, err
}
return &databasev1.MeasureRegistryServiceCreateResponse{}, nil
@@ -208,7 +208,7 @@ func (rs *measureRegistryServer) Create(ctx context.Context, req *databasev1.Mea
func (rs *measureRegistryServer) Update(ctx context.Context, req *databasev1.MeasureRegistryServiceUpdateRequest) (
*databasev1.MeasureRegistryServiceUpdateResponse, error) {
- if err := rs.schemaRegistry.MeasureRegistry().UpdateMeasure(ctx, req.GetMeasure()); err != nil {
+ if err := rs.schemaRegistry.MeasureRegistry().UpdateMeasure(ctx, req.GetMeasure(), true); err != nil {
return nil, err
}
return &databasev1.MeasureRegistryServiceUpdateResponse{}, nil
@@ -254,7 +254,7 @@ type groupRegistryServer struct {
func (rs *groupRegistryServer) Create(ctx context.Context, req *databasev1.GroupRegistryServiceCreateRequest) (
*databasev1.GroupRegistryServiceCreateResponse, error) {
- if err := rs.schemaRegistry.GroupRegistry().UpdateGroup(ctx, req.GetGroup()); err != nil {
+ if err := rs.schemaRegistry.GroupRegistry().UpdateGroup(ctx, req.GetGroup(), false); err != nil {
return nil, err
}
return &databasev1.GroupRegistryServiceCreateResponse{}, nil
@@ -262,7 +262,7 @@ func (rs *groupRegistryServer) Create(ctx context.Context, req *databasev1.Group
func (rs *groupRegistryServer) Update(ctx context.Context, req *databasev1.GroupRegistryServiceUpdateRequest) (
*databasev1.GroupRegistryServiceUpdateResponse, error) {
- if err := rs.schemaRegistry.GroupRegistry().UpdateGroup(ctx, req.GetGroup()); err != nil {
+ if err := rs.schemaRegistry.GroupRegistry().UpdateGroup(ctx, req.GetGroup(), true); err != nil {
return nil, err
}
return &databasev1.GroupRegistryServiceUpdateResponse{}, nil
@@ -308,7 +308,7 @@ type topNAggregationRegistryServer struct {
func (ts *topNAggregationRegistryServer) Create(ctx context.Context,
req *databasev1.TopNAggregationRegistryServiceCreateRequest) (*databasev1.TopNAggregationRegistryServiceCreateResponse, error) {
- if err := ts.schemaRegistry.TopNAggregationRegistry().UpdateTopNAggregation(ctx, req.GetTopNAggregation()); err != nil {
+ if err := ts.schemaRegistry.TopNAggregationRegistry().UpdateTopNAggregation(ctx, req.GetTopNAggregation(), false); err != nil {
return nil, err
}
return &databasev1.TopNAggregationRegistryServiceCreateResponse{}, nil
@@ -316,7 +316,7 @@ func (ts *topNAggregationRegistryServer) Create(ctx context.Context,
func (ts *topNAggregationRegistryServer) Update(ctx context.Context,
req *databasev1.TopNAggregationRegistryServiceUpdateRequest) (*databasev1.TopNAggregationRegistryServiceUpdateResponse, error) {
- if err := ts.schemaRegistry.TopNAggregationRegistry().UpdateTopNAggregation(ctx, req.GetTopNAggregation()); err != nil {
+ if err := ts.schemaRegistry.TopNAggregationRegistry().UpdateTopNAggregation(ctx, req.GetTopNAggregation(), true); err != nil {
return nil, err
}
return &databasev1.TopNAggregationRegistryServiceUpdateResponse{}, nil
diff --git a/pkg/schema/error.go b/banyand/metadata/schema/error.go
similarity index 100%
rename from pkg/schema/error.go
rename to banyand/metadata/schema/error.go
diff --git a/banyand/metadata/schema/etcd.go b/banyand/metadata/schema/etcd.go
index e8e40bd..870f026 100644
--- a/banyand/metadata/schema/etcd.go
+++ b/banyand/metadata/schema/etcd.go
@@ -32,7 +32,6 @@ import (
commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
- "github.com/apache/skywalking-banyandb/pkg/schema"
)
var (
@@ -176,7 +175,7 @@ func (e *etcdSchemaRegistry) get(ctx context.Context, key string, message proto.
return err
}
if resp.Count == 0 {
- return schema.ErrGRPCResourceNotFound
+ return ErrGRPCResourceNotFound
}
if resp.Count > 1 {
return ErrUnexpectedNumberOfEntities
@@ -211,7 +210,7 @@ func (e *etcdSchemaRegistry) update(ctx context.Context, metadata Metadata, allo
replace := getResp.Count > 0
if replace {
if !allowOverwrite {
- return schema.ErrGRPCAlreadyExists
+ return ErrGRPCAlreadyExists
}
existingVal, innerErr := metadata.Unmarshal(getResp.Kvs[0].Value)
if innerErr != nil {
diff --git a/banyand/metadata/schema/etcd_test.go b/banyand/metadata/schema/etcd_test.go
index 4f53b51..be33d21 100644
--- a/banyand/metadata/schema/etcd_test.go
+++ b/banyand/metadata/schema/etcd_test.go
@@ -68,7 +68,7 @@ func preloadSchema(e Registry) error {
if err := protojson.Unmarshal([]byte(groupJSON), g); err != nil {
return err
}
- if err := e.UpdateGroup(context.TODO(), g); err != nil {
+ if err := e.UpdateGroup(context.TODO(), g, false); err != nil {
return err
}
@@ -76,7 +76,7 @@ func preloadSchema(e Registry) error {
if err := protojson.Unmarshal([]byte(streamJSON), s); err != nil {
return err
}
- err := e.UpdateStream(context.Background(), s)
+ err := e.UpdateStream(context.Background(), s, false)
if err != nil {
return err
}
@@ -85,7 +85,7 @@ func preloadSchema(e Registry) error {
if err = protojson.Unmarshal([]byte(indexRuleBindingJSON), indexRuleBinding); err != nil {
return err
}
- err = e.UpdateIndexRuleBinding(context.Background(), indexRuleBinding)
+ err = e.UpdateIndexRuleBinding(context.Background(), indexRuleBinding, false)
if err != nil {
return err
}
@@ -104,7 +104,7 @@ func preloadSchema(e Registry) error {
if err != nil {
return err
}
- err = e.UpdateIndexRule(context.Background(), &idxRule)
+ err = e.UpdateIndexRule(context.Background(), &idxRule, false)
if err != nil {
return err
}
@@ -399,7 +399,7 @@ func Test_Notify(t *testing.T) {
}
ir.Type = databasev1.IndexRule_TYPE_TREE
- return r.UpdateIndexRule(ctx, ir)
+ return r.UpdateIndexRule(ctx, ir, true)
},
validationFunc: func(mocked *mockedEventHandler) bool {
return mocked.AssertNumberOfCalls(t, "OnAddOrUpdate", 1) &&
@@ -417,7 +417,7 @@ func Test_Notify(t *testing.T) {
return err
}
- return r.UpdateIndexRule(ctx, ir)
+ return r.UpdateIndexRule(ctx, ir, true)
},
validationFunc: func(mocked *mockedEventHandler) bool {
return mocked.AssertNumberOfCalls(t, "OnAddOrUpdate", 0) &&
@@ -455,7 +455,7 @@ func Test_Notify(t *testing.T) {
}
irb.Rules = []string{"trace_id", "duration"}
- return r.UpdateIndexRuleBinding(ctx, irb)
+ return r.UpdateIndexRuleBinding(ctx, irb, true)
},
validationFunc: func(mocked *mockedEventHandler) bool {
return mocked.AssertNumberOfCalls(t, "OnAddOrUpdate", 1) &&
@@ -473,7 +473,7 @@ func Test_Notify(t *testing.T) {
return err
}
- return r.UpdateIndexRuleBinding(ctx, irb)
+ return r.UpdateIndexRuleBinding(ctx, irb, true)
},
validationFunc: func(mocked *mockedEventHandler) bool {
return mocked.AssertNumberOfCalls(t, "OnAddOrUpdate", 0) &&
diff --git a/banyand/metadata/schema/index.go b/banyand/metadata/schema/index.go
index 6b5d948..2c4d22d 100644
--- a/banyand/metadata/schema/index.go
+++ b/banyand/metadata/schema/index.go
@@ -24,7 +24,6 @@ import (
commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
- "github.com/apache/skywalking-banyandb/pkg/schema"
)
var (
@@ -42,7 +41,7 @@ func (e *etcdSchemaRegistry) GetIndexRuleBinding(ctx context.Context, metadata *
func (e *etcdSchemaRegistry) ListIndexRuleBinding(ctx context.Context, opt ListOpt) ([]*databasev1.IndexRuleBinding, error) {
if opt.Group == "" {
- return nil, schema.BadRequest("group", "group should not be empty")
+ return nil, BadRequest("group", "group should not be empty")
}
messages, err := e.listWithPrefix(ctx, listPrefixesForEntity(opt.Group, IndexRuleBindingKeyPrefix), func() proto.Message {
return &databasev1.IndexRuleBinding{}
@@ -88,7 +87,7 @@ func (e *etcdSchemaRegistry) GetIndexRule(ctx context.Context, metadata *commonv
func (e *etcdSchemaRegistry) ListIndexRule(ctx context.Context, opt ListOpt) ([]*databasev1.IndexRule, error) {
if opt.Group == "" {
- return nil, schema.BadRequest("group", "group should not be empty")
+ return nil, BadRequest("group", "group should not be empty")
}
messages, err := e.listWithPrefix(ctx, listPrefixesForEntity(opt.Group, IndexRuleKeyPrefix), func() proto.Message {
return &databasev1.IndexRule{}
diff --git a/banyand/metadata/schema/measure.go b/banyand/metadata/schema/measure.go
index 77e82a1..db47904 100644
--- a/banyand/metadata/schema/measure.go
+++ b/banyand/metadata/schema/measure.go
@@ -19,7 +19,6 @@ package schema
import (
"context"
- "github.com/apache/skywalking-banyandb/pkg/schema"
"time"
"google.golang.org/protobuf/proto"
@@ -44,7 +43,7 @@ func (e *etcdSchemaRegistry) GetMeasure(ctx context.Context, metadata *commonv1.
func (e *etcdSchemaRegistry) ListMeasure(ctx context.Context, opt ListOpt) ([]*databasev1.Measure, error) {
if opt.Group == "" {
- return nil, schema.BadRequest("group", "group should not be empty")
+ return nil, BadRequest("group", "group should not be empty")
}
messages, err := e.listWithPrefix(ctx, listPrefixesForEntity(opt.Group, MeasureKeyPrefix), func() proto.Message {
return &databasev1.Measure{}
@@ -77,7 +76,7 @@ func (e *etcdSchemaRegistry) UpdateMeasure(ctx context.Context, measure *databas
Group: measure.Metadata.Group,
}
_, err := e.GetIndexRule(ctx, idIndexRuleMetadata)
- if schema.IsNotFound(err) {
+ if IsNotFound(err) {
if errIndexRule := e.UpdateIndexRule(ctx, &databasev1.IndexRule{
Metadata: idIndexRuleMetadata,
Tags: []string{TagTypeID},
diff --git a/banyand/metadata/schema/property.go b/banyand/metadata/schema/property.go
index 529382a..bfabd05 100644
--- a/banyand/metadata/schema/property.go
+++ b/banyand/metadata/schema/property.go
@@ -24,7 +24,6 @@ import (
commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
propertyv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/property/v1"
- "github.com/apache/skywalking-banyandb/pkg/schema"
)
var PropertyKeyPrefix = "/properties/"
@@ -39,7 +38,7 @@ func (e *etcdSchemaRegistry) GetProperty(ctx context.Context, metadata *property
func (e *etcdSchemaRegistry) ListProperty(ctx context.Context, container *commonv1.Metadata) ([]*propertyv1.Property, error) {
if container.Group == "" {
- return nil, schema.BadRequest("container.group", "group should not be empty")
+ return nil, BadRequest("container.group", "group should not be empty")
}
messages, err := e.listWithPrefix(ctx, listPrefixesForEntity(container.Group, PropertyKeyPrefix+container.Name), func() proto.Message {
return &propertyv1.Property{}
diff --git a/banyand/metadata/schema/stream.go b/banyand/metadata/schema/stream.go
index 3af7ed7..9be4be8 100644
--- a/banyand/metadata/schema/stream.go
+++ b/banyand/metadata/schema/stream.go
@@ -24,7 +24,6 @@ import (
commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
- "github.com/apache/skywalking-banyandb/pkg/schema"
)
var StreamKeyPrefix = "/streams/"
@@ -39,7 +38,7 @@ func (e *etcdSchemaRegistry) GetStream(ctx context.Context, metadata *commonv1.M
func (e *etcdSchemaRegistry) ListStream(ctx context.Context, opt ListOpt) ([]*databasev1.Stream, error) {
if opt.Group == "" {
- return nil, schema.BadRequest("group", "group should not be empty")
+ return nil, BadRequest("group", "group should not be empty")
}
messages, err := e.listWithPrefix(ctx, listPrefixesForEntity(opt.Group, StreamKeyPrefix), func() proto.Message {
return &databasev1.Stream{}
diff --git a/banyand/metadata/schema/topn.go b/banyand/metadata/schema/topn.go
index c762ac7..7bd30c7 100644
--- a/banyand/metadata/schema/topn.go
+++ b/banyand/metadata/schema/topn.go
@@ -24,7 +24,6 @@ import (
commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
- "github.com/apache/skywalking-banyandb/pkg/schema"
)
var TopNAggregationKeyPrefix = "/topnagg/"
@@ -39,7 +38,7 @@ func (e *etcdSchemaRegistry) GetTopNAggregation(ctx context.Context, metadata *c
func (e *etcdSchemaRegistry) ListTopNAggregation(ctx context.Context, opt ListOpt) ([]*databasev1.TopNAggregation, error) {
if opt.Group == "" {
- return nil, schema.BadRequest("group", "group should not be empty")
+ return nil, BadRequest("group", "group should not be empty")
}
messages, err := e.listWithPrefix(ctx, listPrefixesForEntity(opt.Group, TopNAggregationKeyPrefix), func() proto.Message {
return &databasev1.TopNAggregation{}