You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by lu...@apache.org on 2022/04/09 15:06:14 UTC

[skywalking-banyandb] branch polish-error updated: resolve cyclic

This is an automated email from the ASF dual-hosted git repository.

lujiajing pushed a commit to branch polish-error
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git


The following commit(s) were added to refs/heads/polish-error by this push:
     new 0e260e4  resolve cyclic
0e260e4 is described below

commit 0e260e4eadb7fdeae2f6356fe977315d0d433768
Author: Megrez Lu <lu...@gmail.com>
AuthorDate: Sat Apr 9 23:06:06 2022 +0800

    resolve cyclic
---
 banyand/liaison/grpc/registry.go          | 24 ++++++++++++------------
 {pkg => banyand/metadata}/schema/error.go |  0
 banyand/metadata/schema/etcd.go           |  5 ++---
 banyand/metadata/schema/etcd_test.go      | 16 ++++++++--------
 banyand/metadata/schema/index.go          |  5 ++---
 banyand/metadata/schema/measure.go        |  5 ++---
 banyand/metadata/schema/property.go       |  3 +--
 banyand/metadata/schema/stream.go         |  3 +--
 banyand/metadata/schema/topn.go           |  3 +--
 9 files changed, 29 insertions(+), 35 deletions(-)

diff --git a/banyand/liaison/grpc/registry.go b/banyand/liaison/grpc/registry.go
index 61e2add..53b4802 100644
--- a/banyand/liaison/grpc/registry.go
+++ b/banyand/liaison/grpc/registry.go
@@ -32,7 +32,7 @@ type streamRegistryServer struct {
 
 func (rs *streamRegistryServer) Create(ctx context.Context,
 	req *databasev1.StreamRegistryServiceCreateRequest) (*databasev1.StreamRegistryServiceCreateResponse, error) {
-	if err := rs.schemaRegistry.StreamRegistry().UpdateStream(ctx, req.GetStream()); err != nil {
+	if err := rs.schemaRegistry.StreamRegistry().UpdateStream(ctx, req.GetStream(), false); err != nil {
 		return nil, err
 	}
 	return &databasev1.StreamRegistryServiceCreateResponse{}, nil
@@ -40,7 +40,7 @@ func (rs *streamRegistryServer) Create(ctx context.Context,
 
 func (rs *streamRegistryServer) Update(ctx context.Context,
 	req *databasev1.StreamRegistryServiceUpdateRequest) (*databasev1.StreamRegistryServiceUpdateResponse, error) {
-	if err := rs.schemaRegistry.StreamRegistry().UpdateStream(ctx, req.GetStream()); err != nil {
+	if err := rs.schemaRegistry.StreamRegistry().UpdateStream(ctx, req.GetStream(), true); err != nil {
 		return nil, err
 	}
 	return &databasev1.StreamRegistryServiceUpdateResponse{}, nil
@@ -87,7 +87,7 @@ type indexRuleBindingRegistryServer struct {
 func (rs *indexRuleBindingRegistryServer) Create(ctx context.Context,
 	req *databasev1.IndexRuleBindingRegistryServiceCreateRequest) (
 	*databasev1.IndexRuleBindingRegistryServiceCreateResponse, error) {
-	if err := rs.schemaRegistry.IndexRuleBindingRegistry().UpdateIndexRuleBinding(ctx, req.GetIndexRuleBinding()); err != nil {
+	if err := rs.schemaRegistry.IndexRuleBindingRegistry().UpdateIndexRuleBinding(ctx, req.GetIndexRuleBinding(), false); err != nil {
 		return nil, err
 	}
 	return &databasev1.IndexRuleBindingRegistryServiceCreateResponse{}, nil
@@ -96,7 +96,7 @@ func (rs *indexRuleBindingRegistryServer) Create(ctx context.Context,
 func (rs *indexRuleBindingRegistryServer) Update(ctx context.Context,
 	req *databasev1.IndexRuleBindingRegistryServiceUpdateRequest) (
 	*databasev1.IndexRuleBindingRegistryServiceUpdateResponse, error) {
-	if err := rs.schemaRegistry.IndexRuleBindingRegistry().UpdateIndexRuleBinding(ctx, req.GetIndexRuleBinding()); err != nil {
+	if err := rs.schemaRegistry.IndexRuleBindingRegistry().UpdateIndexRuleBinding(ctx, req.GetIndexRuleBinding(), true); err != nil {
 		return nil, err
 	}
 	return &databasev1.IndexRuleBindingRegistryServiceUpdateResponse{}, nil
@@ -146,7 +146,7 @@ type indexRuleRegistryServer struct {
 
 func (rs *indexRuleRegistryServer) Create(ctx context.Context, req *databasev1.IndexRuleRegistryServiceCreateRequest) (
 	*databasev1.IndexRuleRegistryServiceCreateResponse, error) {
-	if err := rs.schemaRegistry.IndexRuleRegistry().UpdateIndexRule(ctx, req.GetIndexRule()); err != nil {
+	if err := rs.schemaRegistry.IndexRuleRegistry().UpdateIndexRule(ctx, req.GetIndexRule(), false); err != nil {
 		return nil, err
 	}
 	return &databasev1.IndexRuleRegistryServiceCreateResponse{}, nil
@@ -154,7 +154,7 @@ func (rs *indexRuleRegistryServer) Create(ctx context.Context, req *databasev1.I
 
 func (rs *indexRuleRegistryServer) Update(ctx context.Context, req *databasev1.IndexRuleRegistryServiceUpdateRequest) (
 	*databasev1.IndexRuleRegistryServiceUpdateResponse, error) {
-	if err := rs.schemaRegistry.IndexRuleRegistry().UpdateIndexRule(ctx, req.GetIndexRule()); err != nil {
+	if err := rs.schemaRegistry.IndexRuleRegistry().UpdateIndexRule(ctx, req.GetIndexRule(), true); err != nil {
 		return nil, err
 	}
 	return &databasev1.IndexRuleRegistryServiceUpdateResponse{}, nil
@@ -200,7 +200,7 @@ type measureRegistryServer struct {
 
 func (rs *measureRegistryServer) Create(ctx context.Context, req *databasev1.MeasureRegistryServiceCreateRequest) (
 	*databasev1.MeasureRegistryServiceCreateResponse, error) {
-	if err := rs.schemaRegistry.MeasureRegistry().UpdateMeasure(ctx, req.GetMeasure()); err != nil {
+	if err := rs.schemaRegistry.MeasureRegistry().UpdateMeasure(ctx, req.GetMeasure(), false); err != nil {
 		return nil, err
 	}
 	return &databasev1.MeasureRegistryServiceCreateResponse{}, nil
@@ -208,7 +208,7 @@ func (rs *measureRegistryServer) Create(ctx context.Context, req *databasev1.Mea
 
 func (rs *measureRegistryServer) Update(ctx context.Context, req *databasev1.MeasureRegistryServiceUpdateRequest) (
 	*databasev1.MeasureRegistryServiceUpdateResponse, error) {
-	if err := rs.schemaRegistry.MeasureRegistry().UpdateMeasure(ctx, req.GetMeasure()); err != nil {
+	if err := rs.schemaRegistry.MeasureRegistry().UpdateMeasure(ctx, req.GetMeasure(), true); err != nil {
 		return nil, err
 	}
 	return &databasev1.MeasureRegistryServiceUpdateResponse{}, nil
@@ -254,7 +254,7 @@ type groupRegistryServer struct {
 
 func (rs *groupRegistryServer) Create(ctx context.Context, req *databasev1.GroupRegistryServiceCreateRequest) (
 	*databasev1.GroupRegistryServiceCreateResponse, error) {
-	if err := rs.schemaRegistry.GroupRegistry().UpdateGroup(ctx, req.GetGroup()); err != nil {
+	if err := rs.schemaRegistry.GroupRegistry().UpdateGroup(ctx, req.GetGroup(), false); err != nil {
 		return nil, err
 	}
 	return &databasev1.GroupRegistryServiceCreateResponse{}, nil
@@ -262,7 +262,7 @@ func (rs *groupRegistryServer) Create(ctx context.Context, req *databasev1.Group
 
 func (rs *groupRegistryServer) Update(ctx context.Context, req *databasev1.GroupRegistryServiceUpdateRequest) (
 	*databasev1.GroupRegistryServiceUpdateResponse, error) {
-	if err := rs.schemaRegistry.GroupRegistry().UpdateGroup(ctx, req.GetGroup()); err != nil {
+	if err := rs.schemaRegistry.GroupRegistry().UpdateGroup(ctx, req.GetGroup(), true); err != nil {
 		return nil, err
 	}
 	return &databasev1.GroupRegistryServiceUpdateResponse{}, nil
@@ -308,7 +308,7 @@ type topNAggregationRegistryServer struct {
 
 func (ts *topNAggregationRegistryServer) Create(ctx context.Context,
 	req *databasev1.TopNAggregationRegistryServiceCreateRequest) (*databasev1.TopNAggregationRegistryServiceCreateResponse, error) {
-	if err := ts.schemaRegistry.TopNAggregationRegistry().UpdateTopNAggregation(ctx, req.GetTopNAggregation()); err != nil {
+	if err := ts.schemaRegistry.TopNAggregationRegistry().UpdateTopNAggregation(ctx, req.GetTopNAggregation(), false); err != nil {
 		return nil, err
 	}
 	return &databasev1.TopNAggregationRegistryServiceCreateResponse{}, nil
@@ -316,7 +316,7 @@ func (ts *topNAggregationRegistryServer) Create(ctx context.Context,
 
 func (ts *topNAggregationRegistryServer) Update(ctx context.Context,
 	req *databasev1.TopNAggregationRegistryServiceUpdateRequest) (*databasev1.TopNAggregationRegistryServiceUpdateResponse, error) {
-	if err := ts.schemaRegistry.TopNAggregationRegistry().UpdateTopNAggregation(ctx, req.GetTopNAggregation()); err != nil {
+	if err := ts.schemaRegistry.TopNAggregationRegistry().UpdateTopNAggregation(ctx, req.GetTopNAggregation(), true); err != nil {
 		return nil, err
 	}
 	return &databasev1.TopNAggregationRegistryServiceUpdateResponse{}, nil
diff --git a/pkg/schema/error.go b/banyand/metadata/schema/error.go
similarity index 100%
rename from pkg/schema/error.go
rename to banyand/metadata/schema/error.go
diff --git a/banyand/metadata/schema/etcd.go b/banyand/metadata/schema/etcd.go
index e8e40bd..870f026 100644
--- a/banyand/metadata/schema/etcd.go
+++ b/banyand/metadata/schema/etcd.go
@@ -32,7 +32,6 @@ import (
 
 	commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
 	databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
-	"github.com/apache/skywalking-banyandb/pkg/schema"
 )
 
 var (
@@ -176,7 +175,7 @@ func (e *etcdSchemaRegistry) get(ctx context.Context, key string, message proto.
 		return err
 	}
 	if resp.Count == 0 {
-		return schema.ErrGRPCResourceNotFound
+		return ErrGRPCResourceNotFound
 	}
 	if resp.Count > 1 {
 		return ErrUnexpectedNumberOfEntities
@@ -211,7 +210,7 @@ func (e *etcdSchemaRegistry) update(ctx context.Context, metadata Metadata, allo
 	replace := getResp.Count > 0
 	if replace {
 		if !allowOverwrite {
-			return schema.ErrGRPCAlreadyExists
+			return ErrGRPCAlreadyExists
 		}
 		existingVal, innerErr := metadata.Unmarshal(getResp.Kvs[0].Value)
 		if innerErr != nil {
diff --git a/banyand/metadata/schema/etcd_test.go b/banyand/metadata/schema/etcd_test.go
index 4f53b51..be33d21 100644
--- a/banyand/metadata/schema/etcd_test.go
+++ b/banyand/metadata/schema/etcd_test.go
@@ -68,7 +68,7 @@ func preloadSchema(e Registry) error {
 	if err := protojson.Unmarshal([]byte(groupJSON), g); err != nil {
 		return err
 	}
-	if err := e.UpdateGroup(context.TODO(), g); err != nil {
+	if err := e.UpdateGroup(context.TODO(), g, false); err != nil {
 		return err
 	}
 
@@ -76,7 +76,7 @@ func preloadSchema(e Registry) error {
 	if err := protojson.Unmarshal([]byte(streamJSON), s); err != nil {
 		return err
 	}
-	err := e.UpdateStream(context.Background(), s)
+	err := e.UpdateStream(context.Background(), s, false)
 	if err != nil {
 		return err
 	}
@@ -85,7 +85,7 @@ func preloadSchema(e Registry) error {
 	if err = protojson.Unmarshal([]byte(indexRuleBindingJSON), indexRuleBinding); err != nil {
 		return err
 	}
-	err = e.UpdateIndexRuleBinding(context.Background(), indexRuleBinding)
+	err = e.UpdateIndexRuleBinding(context.Background(), indexRuleBinding, false)
 	if err != nil {
 		return err
 	}
@@ -104,7 +104,7 @@ func preloadSchema(e Registry) error {
 		if err != nil {
 			return err
 		}
-		err = e.UpdateIndexRule(context.Background(), &idxRule)
+		err = e.UpdateIndexRule(context.Background(), &idxRule, false)
 		if err != nil {
 			return err
 		}
@@ -399,7 +399,7 @@ func Test_Notify(t *testing.T) {
 				}
 
 				ir.Type = databasev1.IndexRule_TYPE_TREE
-				return r.UpdateIndexRule(ctx, ir)
+				return r.UpdateIndexRule(ctx, ir, true)
 			},
 			validationFunc: func(mocked *mockedEventHandler) bool {
 				return mocked.AssertNumberOfCalls(t, "OnAddOrUpdate", 1) &&
@@ -417,7 +417,7 @@ func Test_Notify(t *testing.T) {
 					return err
 				}
 
-				return r.UpdateIndexRule(ctx, ir)
+				return r.UpdateIndexRule(ctx, ir, true)
 			},
 			validationFunc: func(mocked *mockedEventHandler) bool {
 				return mocked.AssertNumberOfCalls(t, "OnAddOrUpdate", 0) &&
@@ -455,7 +455,7 @@ func Test_Notify(t *testing.T) {
 				}
 
 				irb.Rules = []string{"trace_id", "duration"}
-				return r.UpdateIndexRuleBinding(ctx, irb)
+				return r.UpdateIndexRuleBinding(ctx, irb, true)
 			},
 			validationFunc: func(mocked *mockedEventHandler) bool {
 				return mocked.AssertNumberOfCalls(t, "OnAddOrUpdate", 1) &&
@@ -473,7 +473,7 @@ func Test_Notify(t *testing.T) {
 					return err
 				}
 
-				return r.UpdateIndexRuleBinding(ctx, irb)
+				return r.UpdateIndexRuleBinding(ctx, irb, true)
 			},
 			validationFunc: func(mocked *mockedEventHandler) bool {
 				return mocked.AssertNumberOfCalls(t, "OnAddOrUpdate", 0) &&
diff --git a/banyand/metadata/schema/index.go b/banyand/metadata/schema/index.go
index 6b5d948..2c4d22d 100644
--- a/banyand/metadata/schema/index.go
+++ b/banyand/metadata/schema/index.go
@@ -24,7 +24,6 @@ import (
 
 	commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
 	databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
-	"github.com/apache/skywalking-banyandb/pkg/schema"
 )
 
 var (
@@ -42,7 +41,7 @@ func (e *etcdSchemaRegistry) GetIndexRuleBinding(ctx context.Context, metadata *
 
 func (e *etcdSchemaRegistry) ListIndexRuleBinding(ctx context.Context, opt ListOpt) ([]*databasev1.IndexRuleBinding, error) {
 	if opt.Group == "" {
-		return nil, schema.BadRequest("group", "group should not be empty")
+		return nil, BadRequest("group", "group should not be empty")
 	}
 	messages, err := e.listWithPrefix(ctx, listPrefixesForEntity(opt.Group, IndexRuleBindingKeyPrefix), func() proto.Message {
 		return &databasev1.IndexRuleBinding{}
@@ -88,7 +87,7 @@ func (e *etcdSchemaRegistry) GetIndexRule(ctx context.Context, metadata *commonv
 
 func (e *etcdSchemaRegistry) ListIndexRule(ctx context.Context, opt ListOpt) ([]*databasev1.IndexRule, error) {
 	if opt.Group == "" {
-		return nil, schema.BadRequest("group", "group should not be empty")
+		return nil, BadRequest("group", "group should not be empty")
 	}
 	messages, err := e.listWithPrefix(ctx, listPrefixesForEntity(opt.Group, IndexRuleKeyPrefix), func() proto.Message {
 		return &databasev1.IndexRule{}
diff --git a/banyand/metadata/schema/measure.go b/banyand/metadata/schema/measure.go
index 77e82a1..db47904 100644
--- a/banyand/metadata/schema/measure.go
+++ b/banyand/metadata/schema/measure.go
@@ -19,7 +19,6 @@ package schema
 
 import (
 	"context"
-	"github.com/apache/skywalking-banyandb/pkg/schema"
 	"time"
 
 	"google.golang.org/protobuf/proto"
@@ -44,7 +43,7 @@ func (e *etcdSchemaRegistry) GetMeasure(ctx context.Context, metadata *commonv1.
 
 func (e *etcdSchemaRegistry) ListMeasure(ctx context.Context, opt ListOpt) ([]*databasev1.Measure, error) {
 	if opt.Group == "" {
-		return nil, schema.BadRequest("group", "group should not be empty")
+		return nil, BadRequest("group", "group should not be empty")
 	}
 	messages, err := e.listWithPrefix(ctx, listPrefixesForEntity(opt.Group, MeasureKeyPrefix), func() proto.Message {
 		return &databasev1.Measure{}
@@ -77,7 +76,7 @@ func (e *etcdSchemaRegistry) UpdateMeasure(ctx context.Context, measure *databas
 		Group: measure.Metadata.Group,
 	}
 	_, err := e.GetIndexRule(ctx, idIndexRuleMetadata)
-	if schema.IsNotFound(err) {
+	if IsNotFound(err) {
 		if errIndexRule := e.UpdateIndexRule(ctx, &databasev1.IndexRule{
 			Metadata:  idIndexRuleMetadata,
 			Tags:      []string{TagTypeID},
diff --git a/banyand/metadata/schema/property.go b/banyand/metadata/schema/property.go
index 529382a..bfabd05 100644
--- a/banyand/metadata/schema/property.go
+++ b/banyand/metadata/schema/property.go
@@ -24,7 +24,6 @@ import (
 
 	commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
 	propertyv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/property/v1"
-	"github.com/apache/skywalking-banyandb/pkg/schema"
 )
 
 var PropertyKeyPrefix = "/properties/"
@@ -39,7 +38,7 @@ func (e *etcdSchemaRegistry) GetProperty(ctx context.Context, metadata *property
 
 func (e *etcdSchemaRegistry) ListProperty(ctx context.Context, container *commonv1.Metadata) ([]*propertyv1.Property, error) {
 	if container.Group == "" {
-		return nil, schema.BadRequest("container.group", "group should not be empty")
+		return nil, BadRequest("container.group", "group should not be empty")
 	}
 	messages, err := e.listWithPrefix(ctx, listPrefixesForEntity(container.Group, PropertyKeyPrefix+container.Name), func() proto.Message {
 		return &propertyv1.Property{}
diff --git a/banyand/metadata/schema/stream.go b/banyand/metadata/schema/stream.go
index 3af7ed7..9be4be8 100644
--- a/banyand/metadata/schema/stream.go
+++ b/banyand/metadata/schema/stream.go
@@ -24,7 +24,6 @@ import (
 
 	commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
 	databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
-	"github.com/apache/skywalking-banyandb/pkg/schema"
 )
 
 var StreamKeyPrefix = "/streams/"
@@ -39,7 +38,7 @@ func (e *etcdSchemaRegistry) GetStream(ctx context.Context, metadata *commonv1.M
 
 func (e *etcdSchemaRegistry) ListStream(ctx context.Context, opt ListOpt) ([]*databasev1.Stream, error) {
 	if opt.Group == "" {
-		return nil, schema.BadRequest("group", "group should not be empty")
+		return nil, BadRequest("group", "group should not be empty")
 	}
 	messages, err := e.listWithPrefix(ctx, listPrefixesForEntity(opt.Group, StreamKeyPrefix), func() proto.Message {
 		return &databasev1.Stream{}
diff --git a/banyand/metadata/schema/topn.go b/banyand/metadata/schema/topn.go
index c762ac7..7bd30c7 100644
--- a/banyand/metadata/schema/topn.go
+++ b/banyand/metadata/schema/topn.go
@@ -24,7 +24,6 @@ import (
 
 	commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
 	databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
-	"github.com/apache/skywalking-banyandb/pkg/schema"
 )
 
 var TopNAggregationKeyPrefix = "/topnagg/"
@@ -39,7 +38,7 @@ func (e *etcdSchemaRegistry) GetTopNAggregation(ctx context.Context, metadata *c
 
 func (e *etcdSchemaRegistry) ListTopNAggregation(ctx context.Context, opt ListOpt) ([]*databasev1.TopNAggregation, error) {
 	if opt.Group == "" {
-		return nil, schema.BadRequest("group", "group should not be empty")
+		return nil, BadRequest("group", "group should not be empty")
 	}
 	messages, err := e.listWithPrefix(ctx, listPrefixesForEntity(opt.Group, TopNAggregationKeyPrefix), func() proto.Message {
 		return &databasev1.TopNAggregation{}