You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by ha...@apache.org on 2022/04/14 12:15:46 UTC
[skywalking-banyandb] branch main updated: Return error created from status (#100)
This is an automated email from the ASF dual-hosted git repository.
hanahmily pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
The following commit(s) were added to refs/heads/main by this push:
new 54c252b Return error created from status (#100)
54c252b is described below
commit 54c252be79237097a324a06fdf2ea250a9bdb718
Author: Jiajing LU <lu...@gmail.com>
AuthorDate: Thu Apr 14 20:15:40 2022 +0800
Return error created from status (#100)
* add grpc errors
* add create method
---
banyand/liaison/grpc/property.go | 4 +-
banyand/liaison/grpc/registry.go | 12 +++---
banyand/liaison/grpc/registry_test.go | 8 ++--
banyand/metadata/schema/error.go | 51 +++++++++++++++++++++++
banyand/metadata/schema/etcd.go | 43 +++++++++++++++++---
banyand/metadata/schema/etcd_test.go | 8 ++--
banyand/metadata/schema/group.go | 14 +++++--
banyand/metadata/schema/index.go | 29 +++++++++++--
banyand/metadata/schema/measure.go | 76 ++++++++++++++++++++++++++++++++---
banyand/metadata/schema/property.go | 17 ++++++--
banyand/metadata/schema/schema.go | 7 ++++
banyand/metadata/schema/stream.go | 16 ++++++--
banyand/metadata/schema/topn.go | 16 ++++++--
go.mod | 2 +-
pkg/test/measure/etcd.go | 40 +++++++++++++++---
pkg/test/stream/etcd.go | 58 +++++++++++++++++++++-----
16 files changed, 340 insertions(+), 61 deletions(-)
diff --git a/banyand/liaison/grpc/property.go b/banyand/liaison/grpc/property.go
index be08309..7326d94 100644
--- a/banyand/liaison/grpc/property.go
+++ b/banyand/liaison/grpc/property.go
@@ -14,7 +14,7 @@
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
-//
+
package grpc
import (
@@ -30,7 +30,7 @@ type propertyServer struct {
}
func (ps *propertyServer) Create(ctx context.Context, req *propertyv1.CreateRequest) (*propertyv1.CreateResponse, error) {
- if err := ps.schemaRegistry.PropertyRegistry().UpdateProperty(ctx, req.GetProperty()); err != nil {
+ if err := ps.schemaRegistry.PropertyRegistry().CreateProperty(ctx, req.GetProperty()); err != nil {
return nil, err
}
return &propertyv1.CreateResponse{}, nil
diff --git a/banyand/liaison/grpc/registry.go b/banyand/liaison/grpc/registry.go
index 61e2add..5f25b8a 100644
--- a/banyand/liaison/grpc/registry.go
+++ b/banyand/liaison/grpc/registry.go
@@ -32,7 +32,7 @@ type streamRegistryServer struct {
func (rs *streamRegistryServer) Create(ctx context.Context,
req *databasev1.StreamRegistryServiceCreateRequest) (*databasev1.StreamRegistryServiceCreateResponse, error) {
- if err := rs.schemaRegistry.StreamRegistry().UpdateStream(ctx, req.GetStream()); err != nil {
+ if err := rs.schemaRegistry.StreamRegistry().CreateStream(ctx, req.GetStream()); err != nil {
return nil, err
}
return &databasev1.StreamRegistryServiceCreateResponse{}, nil
@@ -87,7 +87,7 @@ type indexRuleBindingRegistryServer struct {
func (rs *indexRuleBindingRegistryServer) Create(ctx context.Context,
req *databasev1.IndexRuleBindingRegistryServiceCreateRequest) (
*databasev1.IndexRuleBindingRegistryServiceCreateResponse, error) {
- if err := rs.schemaRegistry.IndexRuleBindingRegistry().UpdateIndexRuleBinding(ctx, req.GetIndexRuleBinding()); err != nil {
+ if err := rs.schemaRegistry.IndexRuleBindingRegistry().CreateIndexRuleBinding(ctx, req.GetIndexRuleBinding()); err != nil {
return nil, err
}
return &databasev1.IndexRuleBindingRegistryServiceCreateResponse{}, nil
@@ -146,7 +146,7 @@ type indexRuleRegistryServer struct {
func (rs *indexRuleRegistryServer) Create(ctx context.Context, req *databasev1.IndexRuleRegistryServiceCreateRequest) (
*databasev1.IndexRuleRegistryServiceCreateResponse, error) {
- if err := rs.schemaRegistry.IndexRuleRegistry().UpdateIndexRule(ctx, req.GetIndexRule()); err != nil {
+ if err := rs.schemaRegistry.IndexRuleRegistry().CreateIndexRule(ctx, req.GetIndexRule()); err != nil {
return nil, err
}
return &databasev1.IndexRuleRegistryServiceCreateResponse{}, nil
@@ -200,7 +200,7 @@ type measureRegistryServer struct {
func (rs *measureRegistryServer) Create(ctx context.Context, req *databasev1.MeasureRegistryServiceCreateRequest) (
*databasev1.MeasureRegistryServiceCreateResponse, error) {
- if err := rs.schemaRegistry.MeasureRegistry().UpdateMeasure(ctx, req.GetMeasure()); err != nil {
+ if err := rs.schemaRegistry.MeasureRegistry().CreateMeasure(ctx, req.GetMeasure()); err != nil {
return nil, err
}
return &databasev1.MeasureRegistryServiceCreateResponse{}, nil
@@ -254,7 +254,7 @@ type groupRegistryServer struct {
func (rs *groupRegistryServer) Create(ctx context.Context, req *databasev1.GroupRegistryServiceCreateRequest) (
*databasev1.GroupRegistryServiceCreateResponse, error) {
- if err := rs.schemaRegistry.GroupRegistry().UpdateGroup(ctx, req.GetGroup()); err != nil {
+ if err := rs.schemaRegistry.GroupRegistry().CreateGroup(ctx, req.GetGroup()); err != nil {
return nil, err
}
return &databasev1.GroupRegistryServiceCreateResponse{}, nil
@@ -308,7 +308,7 @@ type topNAggregationRegistryServer struct {
func (ts *topNAggregationRegistryServer) Create(ctx context.Context,
req *databasev1.TopNAggregationRegistryServiceCreateRequest) (*databasev1.TopNAggregationRegistryServiceCreateResponse, error) {
- if err := ts.schemaRegistry.TopNAggregationRegistry().UpdateTopNAggregation(ctx, req.GetTopNAggregation()); err != nil {
+ if err := ts.schemaRegistry.TopNAggregationRegistry().CreateTopNAggregation(ctx, req.GetTopNAggregation()); err != nil {
return nil, err
}
return &databasev1.TopNAggregationRegistryServiceCreateResponse{}, nil
diff --git a/banyand/liaison/grpc/registry_test.go b/banyand/liaison/grpc/registry_test.go
index 10033b1..a88ea70 100644
--- a/banyand/liaison/grpc/registry_test.go
+++ b/banyand/liaison/grpc/registry_test.go
@@ -23,6 +23,7 @@ import (
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
grpclib "google.golang.org/grpc"
+ "google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
@@ -30,7 +31,6 @@ import (
"github.com/apache/skywalking-banyandb/banyand/discovery"
"github.com/apache/skywalking-banyandb/banyand/liaison/grpc"
"github.com/apache/skywalking-banyandb/banyand/metadata"
- "github.com/apache/skywalking-banyandb/banyand/metadata/schema"
"github.com/apache/skywalking-banyandb/banyand/queue"
"github.com/apache/skywalking-banyandb/pkg/test"
)
@@ -66,7 +66,7 @@ var _ = Describe("Registry", func() {
Metadata: meta,
})
errStatus, _ := status.FromError(err)
- Expect(errStatus.Message()).To(Equal(schema.ErrEntityNotFound.Error()))
+ Expect(errStatus.Code()).To(Equal(codes.NotFound))
By("Creating a new stream")
_, err = client.Create(context.TODO(), &databasev1.StreamRegistryServiceCreateRequest{Stream: getResp.GetStream()})
Expect(err).ShouldNot(HaveOccurred())
@@ -96,7 +96,7 @@ var _ = Describe("Registry", func() {
Metadata: meta,
})
errStatus, _ := status.FromError(err)
- Expect(errStatus.Message()).To(Equal(schema.ErrEntityNotFound.Error()))
+ Expect(errStatus.Code()).To(Equal(codes.NotFound))
By("Creating a new index-rule-binding")
_, err = client.Create(context.TODO(), &databasev1.IndexRuleBindingRegistryServiceCreateRequest{IndexRuleBinding: getResp.GetIndexRuleBinding()})
Expect(err).ShouldNot(HaveOccurred())
@@ -126,7 +126,7 @@ var _ = Describe("Registry", func() {
Metadata: meta,
})
errStatus, _ := status.FromError(err)
- Expect(errStatus.Message()).To(Equal(schema.ErrEntityNotFound.Error()))
+ Expect(errStatus.Code()).To(Equal(codes.NotFound))
By("Creating a new index-rule")
_, err = client.Create(context.TODO(), &databasev1.IndexRuleRegistryServiceCreateRequest{IndexRule: getResp.GetIndexRule()})
Expect(err).ShouldNot(HaveOccurred())
diff --git a/banyand/metadata/schema/error.go b/banyand/metadata/schema/error.go
new file mode 100644
index 0000000..8853e0f
--- /dev/null
+++ b/banyand/metadata/schema/error.go
@@ -0,0 +1,51 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package schema
+
+import (
+ "github.com/pkg/errors"
+ "google.golang.org/genproto/googleapis/rpc/errdetails"
+ "google.golang.org/grpc/codes"
+ "google.golang.org/grpc/status"
+)
+
+var (
+ statusGRPCInvalidArgument = status.New(codes.InvalidArgument, "banyandb: input is invalid")
+ ErrGRPCInvalidArgument = statusGRPCInvalidArgument.Err()
+ statusGRPCResourceNotFound = status.New(codes.NotFound, "banyandb: resource not found")
+ ErrGRPCResourceNotFound = statusGRPCResourceNotFound.Err()
+ statusGRPCAlreadyExists = status.New(codes.AlreadyExists, "banyandb: resource already exists")
+ ErrGRPCAlreadyExists = statusGRPCAlreadyExists.Err()
+)
+
+func IsNotFound(err error) bool {
+ return errors.Is(err, ErrGRPCResourceNotFound)
+}
+
+// BadRequest creates a gRPC error with error details with type BadRequest,
+// which describes violations in a client request.
+func BadRequest(field, desc string) error {
+ v := &errdetails.BadRequest_FieldViolation{
+ Field: field,
+ Description: desc,
+ }
+ br := &errdetails.BadRequest{}
+ br.FieldViolations = append(br.FieldViolations, v)
+ st, _ := statusGRPCInvalidArgument.WithDetails(br)
+ return st.Err()
+}
diff --git a/banyand/metadata/schema/etcd.go b/banyand/metadata/schema/etcd.go
index 0d1d58e..7225cd5 100644
--- a/banyand/metadata/schema/etcd.go
+++ b/banyand/metadata/schema/etcd.go
@@ -42,7 +42,6 @@ var (
_ Group = (*etcdSchemaRegistry)(nil)
_ Property = (*etcdSchemaRegistry)(nil)
- ErrEntityNotFound = errors.New("entity is not found")
ErrUnexpectedNumberOfEntities = errors.New("unexpected number of entities")
ErrConcurrentModification = errors.New("concurrent modification of entities")
@@ -176,7 +175,7 @@ func (e *etcdSchemaRegistry) get(ctx context.Context, key string, message proto.
return err
}
if resp.Count == 0 {
- return ErrEntityNotFound
+ return ErrGRPCResourceNotFound
}
if resp.Count > 1 {
return ErrUnexpectedNumberOfEntities
@@ -192,6 +191,9 @@ func (e *etcdSchemaRegistry) get(ctx context.Context, key string, message proto.
return nil
}
+// update will first ensure the existence of the entity with the metadata,
+// and overwrite the existing value if so.
+// Otherwise, it will return ErrGRPCResourceNotFound.
func (e *etcdSchemaRegistry) update(ctx context.Context, metadata Metadata) error {
key, err := metadata.Key()
if err != nil {
@@ -231,11 +233,40 @@ func (e *etcdSchemaRegistry) update(ctx context.Context, metadata Metadata) erro
return ErrConcurrentModification
}
} else {
- _, err = e.kv.Put(ctx, key, string(val))
- if err != nil {
- return err
- }
+ return ErrGRPCResourceNotFound
+ }
+ e.notifyUpdate(metadata)
+ return nil
+}
+
+// create will first check existence of the entity with the metadata,
+// and put the value if it does not exist.
+// Otherwise, it will return ErrGRPCAlreadyExists.
+func (e *etcdSchemaRegistry) create(ctx context.Context, metadata Metadata) error {
+ key, err := metadata.Key()
+ if err != nil {
+ return err
}
+ getResp, err := e.kv.Get(ctx, key)
+ if err != nil {
+ return err
+ }
+ if getResp.Count > 1 {
+ return ErrUnexpectedNumberOfEntities
+ }
+ val, err := proto.Marshal(metadata.Spec.(proto.Message))
+ if err != nil {
+ return err
+ }
+ replace := getResp.Count > 0
+ if replace {
+ return ErrGRPCAlreadyExists
+ }
+ _, err = e.kv.Put(ctx, key, string(val))
+ if err != nil {
+ return err
+ }
+
e.notifyUpdate(metadata)
return nil
}
diff --git a/banyand/metadata/schema/etcd_test.go b/banyand/metadata/schema/etcd_test.go
index 4f53b51..9131e0d 100644
--- a/banyand/metadata/schema/etcd_test.go
+++ b/banyand/metadata/schema/etcd_test.go
@@ -68,7 +68,7 @@ func preloadSchema(e Registry) error {
if err := protojson.Unmarshal([]byte(groupJSON), g); err != nil {
return err
}
- if err := e.UpdateGroup(context.TODO(), g); err != nil {
+ if err := e.CreateGroup(context.TODO(), g); err != nil {
return err
}
@@ -76,7 +76,7 @@ func preloadSchema(e Registry) error {
if err := protojson.Unmarshal([]byte(streamJSON), s); err != nil {
return err
}
- err := e.UpdateStream(context.Background(), s)
+ err := e.CreateStream(context.Background(), s)
if err != nil {
return err
}
@@ -85,7 +85,7 @@ func preloadSchema(e Registry) error {
if err = protojson.Unmarshal([]byte(indexRuleBindingJSON), indexRuleBinding); err != nil {
return err
}
- err = e.UpdateIndexRuleBinding(context.Background(), indexRuleBinding)
+ err = e.CreateIndexRuleBinding(context.Background(), indexRuleBinding)
if err != nil {
return err
}
@@ -104,7 +104,7 @@ func preloadSchema(e Registry) error {
if err != nil {
return err
}
- err = e.UpdateIndexRule(context.Background(), &idxRule)
+ err = e.CreateIndexRule(context.Background(), &idxRule)
if err != nil {
return err
}
diff --git a/banyand/metadata/schema/group.go b/banyand/metadata/schema/group.go
index 786dbb4..e1b0722 100644
--- a/banyand/metadata/schema/group.go
+++ b/banyand/metadata/schema/group.go
@@ -14,7 +14,7 @@
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
-//
+
package schema
import (
@@ -31,8 +31,6 @@ import (
var (
GroupsKeyPrefix = "/groups/"
GroupMetadataKey = "/__meta_group__"
-
- ErrGroupAbsent = errors.New("group is absent")
)
func (e *etcdSchemaRegistry) GetGroup(ctx context.Context, group string) (*commonv1.Group, error) {
@@ -88,6 +86,16 @@ func (e *etcdSchemaRegistry) DeleteGroup(ctx context.Context, group string) (boo
return true, nil
}
+func (e *etcdSchemaRegistry) CreateGroup(ctx context.Context, group *commonv1.Group) error {
+ return e.create(ctx, Metadata{
+ TypeMeta: TypeMeta{
+ Kind: KindGroup,
+ Name: group.GetMetadata().GetName(),
+ },
+ Spec: group,
+ })
+}
+
func (e *etcdSchemaRegistry) UpdateGroup(ctx context.Context, group *commonv1.Group) error {
return e.update(ctx, Metadata{
TypeMeta: TypeMeta{
diff --git a/banyand/metadata/schema/index.go b/banyand/metadata/schema/index.go
index 33dd4cb..c79ebfc 100644
--- a/banyand/metadata/schema/index.go
+++ b/banyand/metadata/schema/index.go
@@ -14,13 +14,12 @@
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
-//
+
package schema
import (
"context"
- "github.com/pkg/errors"
"google.golang.org/protobuf/proto"
commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
@@ -42,7 +41,7 @@ func (e *etcdSchemaRegistry) GetIndexRuleBinding(ctx context.Context, metadata *
func (e *etcdSchemaRegistry) ListIndexRuleBinding(ctx context.Context, opt ListOpt) ([]*databasev1.IndexRuleBinding, error) {
if opt.Group == "" {
- return nil, errors.Wrap(ErrGroupAbsent, "list index rule binding")
+ return nil, BadRequest("group", "group should not be empty")
}
messages, err := e.listWithPrefix(ctx, listPrefixesForEntity(opt.Group, IndexRuleBindingKeyPrefix), func() proto.Message {
return &databasev1.IndexRuleBinding{}
@@ -57,6 +56,17 @@ func (e *etcdSchemaRegistry) ListIndexRuleBinding(ctx context.Context, opt ListO
return entities, nil
}
+func (e *etcdSchemaRegistry) CreateIndexRuleBinding(ctx context.Context, indexRuleBinding *databasev1.IndexRuleBinding) error {
+ return e.create(ctx, Metadata{
+ TypeMeta: TypeMeta{
+ Kind: KindIndexRuleBinding,
+ Name: indexRuleBinding.GetMetadata().GetName(),
+ Group: indexRuleBinding.GetMetadata().GetGroup(),
+ },
+ Spec: indexRuleBinding,
+ })
+}
+
func (e *etcdSchemaRegistry) UpdateIndexRuleBinding(ctx context.Context, indexRuleBinding *databasev1.IndexRuleBinding) error {
return e.update(ctx, Metadata{
TypeMeta: TypeMeta{
@@ -88,7 +98,7 @@ func (e *etcdSchemaRegistry) GetIndexRule(ctx context.Context, metadata *commonv
func (e *etcdSchemaRegistry) ListIndexRule(ctx context.Context, opt ListOpt) ([]*databasev1.IndexRule, error) {
if opt.Group == "" {
- return nil, errors.Wrap(ErrGroupAbsent, "list index rule")
+ return nil, BadRequest("group", "group should not be empty")
}
messages, err := e.listWithPrefix(ctx, listPrefixesForEntity(opt.Group, IndexRuleKeyPrefix), func() proto.Message {
return &databasev1.IndexRule{}
@@ -103,6 +113,17 @@ func (e *etcdSchemaRegistry) ListIndexRule(ctx context.Context, opt ListOpt) ([]
return entities, nil
}
+func (e *etcdSchemaRegistry) CreateIndexRule(ctx context.Context, indexRule *databasev1.IndexRule) error {
+ return e.create(ctx, Metadata{
+ TypeMeta: TypeMeta{
+ Kind: KindIndexRule,
+ Name: indexRule.GetMetadata().GetName(),
+ Group: indexRule.GetMetadata().GetGroup(),
+ },
+ Spec: indexRule,
+ })
+}
+
func (e *etcdSchemaRegistry) UpdateIndexRule(ctx context.Context, indexRule *databasev1.IndexRule) error {
return e.update(ctx, Metadata{
TypeMeta: TypeMeta{
diff --git a/banyand/metadata/schema/measure.go b/banyand/metadata/schema/measure.go
index 33e5c42..2bc1fae 100644
--- a/banyand/metadata/schema/measure.go
+++ b/banyand/metadata/schema/measure.go
@@ -14,14 +14,13 @@
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
-//
+
package schema
import (
"context"
"time"
- "github.com/pkg/errors"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/timestamppb"
@@ -44,7 +43,7 @@ func (e *etcdSchemaRegistry) GetMeasure(ctx context.Context, metadata *commonv1.
func (e *etcdSchemaRegistry) ListMeasure(ctx context.Context, opt ListOpt) ([]*databasev1.Measure, error) {
if opt.Group == "" {
- return nil, errors.Wrap(ErrGroupAbsent, "list measure")
+ return nil, BadRequest("group", "group should not be empty")
}
messages, err := e.listWithPrefix(ctx, listPrefixesForEntity(opt.Group, MeasureKeyPrefix), func() proto.Message {
return &databasev1.Measure{}
@@ -59,6 +58,73 @@ func (e *etcdSchemaRegistry) ListMeasure(ctx context.Context, opt ListOpt) ([]*d
return entities, nil
}
+func (e *etcdSchemaRegistry) CreateMeasure(ctx context.Context, measure *databasev1.Measure) error {
+ if err := e.create(ctx, Metadata{
+ TypeMeta: TypeMeta{
+ Kind: KindMeasure,
+ Group: measure.GetMetadata().GetGroup(),
+ Name: measure.GetMetadata().GetName(),
+ },
+ Spec: measure,
+ }); err != nil {
+ return err
+ }
+
+ // Add an index rule for the ID type tag
+ idIndexRuleMetadata := &commonv1.Metadata{
+ Name: TagTypeID,
+ Group: measure.Metadata.Group,
+ }
+ _, err := e.GetIndexRule(ctx, idIndexRuleMetadata)
+ if IsNotFound(err) {
+ if errIndexRule := e.CreateIndexRule(ctx, &databasev1.IndexRule{
+ Metadata: idIndexRuleMetadata,
+ Tags: []string{TagTypeID},
+ Type: databasev1.IndexRule_TYPE_TREE,
+ Location: databasev1.IndexRule_LOCATION_SERIES,
+ UpdatedAt: timestamppb.Now(),
+ }); errIndexRule != nil {
+ return errIndexRule
+ }
+ } else if err != nil {
+ return err
+ }
+ for _, tfs := range measure.GetTagFamilies() {
+ for _, ts := range tfs.GetTags() {
+ if ts.Type == databasev1.TagType_TAG_TYPE_ID {
+ for _, e := range measure.Entity.TagNames {
+ if ts.Name == e {
+ continue
+ }
+ }
+ irb := &databasev1.IndexRuleBinding{
+ Metadata: &commonv1.Metadata{
+ Name: TagTypeID + "_" + measure.Metadata.Name + "_" + ts.Name,
+ Group: measure.Metadata.Group,
+ },
+ Rules: []string{TagTypeID},
+ Subject: &databasev1.Subject{
+ Catalog: commonv1.Catalog_CATALOG_MEASURE,
+ Name: measure.Metadata.Name,
+ },
+ BeginAt: timestamppb.Now(),
+ ExpireAt: timestamppb.New(time.Now().AddDate(100, 0, 0)),
+ UpdatedAt: timestamppb.Now(),
+ }
+ _, innerErr := e.GetIndexRuleBinding(ctx, irb.GetMetadata())
+ if innerErr == nil {
+ return e.UpdateIndexRuleBinding(ctx, irb)
+ }
+ if IsNotFound(innerErr) {
+ return e.CreateIndexRuleBinding(ctx, irb)
+ }
+ return innerErr
+ }
+ }
+ }
+ return nil
+}
+
func (e *etcdSchemaRegistry) UpdateMeasure(ctx context.Context, measure *databasev1.Measure) error {
if err := e.update(ctx, Metadata{
TypeMeta: TypeMeta{
@@ -77,8 +143,8 @@ func (e *etcdSchemaRegistry) UpdateMeasure(ctx context.Context, measure *databas
Group: measure.Metadata.Group,
}
_, err := e.GetIndexRule(ctx, idIndexRuleMetadata)
- if errors.Is(err, ErrEntityNotFound) {
- if errIndexRule := e.UpdateIndexRule(ctx, &databasev1.IndexRule{
+ if IsNotFound(err) {
+ if errIndexRule := e.CreateIndexRule(ctx, &databasev1.IndexRule{
Metadata: idIndexRuleMetadata,
Tags: []string{TagTypeID},
Type: databasev1.IndexRule_TYPE_TREE,
diff --git a/banyand/metadata/schema/property.go b/banyand/metadata/schema/property.go
index 28cc8a5..d873014 100644
--- a/banyand/metadata/schema/property.go
+++ b/banyand/metadata/schema/property.go
@@ -14,13 +14,12 @@
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
-//
+
package schema
import (
"context"
- "github.com/pkg/errors"
"google.golang.org/protobuf/proto"
commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
@@ -39,7 +38,7 @@ func (e *etcdSchemaRegistry) GetProperty(ctx context.Context, metadata *property
func (e *etcdSchemaRegistry) ListProperty(ctx context.Context, container *commonv1.Metadata) ([]*propertyv1.Property, error) {
if container.Group == "" {
- return nil, errors.Wrap(ErrGroupAbsent, "list Property")
+ return nil, BadRequest("container.group", "group should not be empty")
}
messages, err := e.listWithPrefix(ctx, listPrefixesForEntity(container.Group, PropertyKeyPrefix+container.Name), func() proto.Message {
return &propertyv1.Property{}
@@ -54,6 +53,18 @@ func (e *etcdSchemaRegistry) ListProperty(ctx context.Context, container *common
return entities, nil
}
+func (e *etcdSchemaRegistry) CreateProperty(ctx context.Context, property *propertyv1.Property) error {
+ m := transformKey(property.GetMetadata())
+ return e.create(ctx, Metadata{
+ TypeMeta: TypeMeta{
+ Kind: KindProperty,
+ Group: m.GetGroup(),
+ Name: m.GetName(),
+ },
+ Spec: property,
+ })
+}
+
func (e *etcdSchemaRegistry) UpdateProperty(ctx context.Context, property *propertyv1.Property) error {
m := transformKey(property.GetMetadata())
return e.update(ctx, Metadata{
diff --git a/banyand/metadata/schema/schema.go b/banyand/metadata/schema/schema.go
index c0f4060..9a5c0c3 100644
--- a/banyand/metadata/schema/schema.go
+++ b/banyand/metadata/schema/schema.go
@@ -157,6 +157,7 @@ func (m Metadata) Equal(other proto.Message) bool {
type Stream interface {
GetStream(ctx context.Context, metadata *commonv1.Metadata) (*databasev1.Stream, error)
ListStream(ctx context.Context, opt ListOpt) ([]*databasev1.Stream, error)
+ CreateStream(ctx context.Context, stream *databasev1.Stream) error
UpdateStream(ctx context.Context, stream *databasev1.Stream) error
DeleteStream(ctx context.Context, metadata *commonv1.Metadata) (bool, error)
RegisterHandler(Kind, EventHandler)
@@ -165,6 +166,7 @@ type Stream interface {
type IndexRule interface {
GetIndexRule(ctx context.Context, metadata *commonv1.Metadata) (*databasev1.IndexRule, error)
ListIndexRule(ctx context.Context, opt ListOpt) ([]*databasev1.IndexRule, error)
+ CreateIndexRule(ctx context.Context, indexRule *databasev1.IndexRule) error
UpdateIndexRule(ctx context.Context, indexRule *databasev1.IndexRule) error
DeleteIndexRule(ctx context.Context, metadata *commonv1.Metadata) (bool, error)
}
@@ -172,6 +174,7 @@ type IndexRule interface {
type IndexRuleBinding interface {
GetIndexRuleBinding(ctx context.Context, metadata *commonv1.Metadata) (*databasev1.IndexRuleBinding, error)
ListIndexRuleBinding(ctx context.Context, opt ListOpt) ([]*databasev1.IndexRuleBinding, error)
+ CreateIndexRuleBinding(ctx context.Context, indexRuleBinding *databasev1.IndexRuleBinding) error
UpdateIndexRuleBinding(ctx context.Context, indexRuleBinding *databasev1.IndexRuleBinding) error
DeleteIndexRuleBinding(ctx context.Context, metadata *commonv1.Metadata) (bool, error)
}
@@ -179,6 +182,7 @@ type IndexRuleBinding interface {
type Measure interface {
GetMeasure(ctx context.Context, metadata *commonv1.Metadata) (*databasev1.Measure, error)
ListMeasure(ctx context.Context, opt ListOpt) ([]*databasev1.Measure, error)
+ CreateMeasure(ctx context.Context, measure *databasev1.Measure) error
UpdateMeasure(ctx context.Context, measure *databasev1.Measure) error
DeleteMeasure(ctx context.Context, metadata *commonv1.Metadata) (bool, error)
RegisterHandler(Kind, EventHandler)
@@ -189,12 +193,14 @@ type Group interface {
ListGroup(ctx context.Context) ([]*commonv1.Group, error)
// DeleteGroup delete all items belonging to the group
DeleteGroup(ctx context.Context, group string) (bool, error)
+ CreateGroup(ctx context.Context, group *commonv1.Group) error
UpdateGroup(ctx context.Context, group *commonv1.Group) error
}
type TopNAggregation interface {
GetTopNAggregation(ctx context.Context, metadata *commonv1.Metadata) (*databasev1.TopNAggregation, error)
ListTopNAggregation(ctx context.Context, opt ListOpt) ([]*databasev1.TopNAggregation, error)
+ CreateTopNAggregation(ctx context.Context, measure *databasev1.TopNAggregation) error
UpdateTopNAggregation(ctx context.Context, measure *databasev1.TopNAggregation) error
DeleteTopNAggregation(ctx context.Context, metadata *commonv1.Metadata) (bool, error)
}
@@ -202,6 +208,7 @@ type TopNAggregation interface {
type Property interface {
GetProperty(ctx context.Context, metadata *propertyv1.Metadata) (*propertyv1.Property, error)
ListProperty(ctx context.Context, container *commonv1.Metadata) ([]*propertyv1.Property, error)
+ CreateProperty(ctx context.Context, property *propertyv1.Property) error
UpdateProperty(ctx context.Context, property *propertyv1.Property) error
DeleteProperty(ctx context.Context, metadata *propertyv1.Metadata) (bool, error)
}
diff --git a/banyand/metadata/schema/stream.go b/banyand/metadata/schema/stream.go
index bbcad7c..accc6d8 100644
--- a/banyand/metadata/schema/stream.go
+++ b/banyand/metadata/schema/stream.go
@@ -14,13 +14,12 @@
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
-//
+
package schema
import (
"context"
- "github.com/pkg/errors"
"google.golang.org/protobuf/proto"
commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
@@ -39,7 +38,7 @@ func (e *etcdSchemaRegistry) GetStream(ctx context.Context, metadata *commonv1.M
func (e *etcdSchemaRegistry) ListStream(ctx context.Context, opt ListOpt) ([]*databasev1.Stream, error) {
if opt.Group == "" {
- return nil, errors.Wrap(ErrGroupAbsent, "list stream")
+ return nil, BadRequest("group", "group should not be empty")
}
messages, err := e.listWithPrefix(ctx, listPrefixesForEntity(opt.Group, StreamKeyPrefix), func() proto.Message {
return &databasev1.Stream{}
@@ -65,6 +64,17 @@ func (e *etcdSchemaRegistry) UpdateStream(ctx context.Context, stream *databasev
})
}
+func (e *etcdSchemaRegistry) CreateStream(ctx context.Context, stream *databasev1.Stream) error {
+ return e.create(ctx, Metadata{
+ TypeMeta: TypeMeta{
+ Kind: KindStream,
+ Group: stream.GetMetadata().GetGroup(),
+ Name: stream.GetMetadata().GetName(),
+ },
+ Spec: stream,
+ })
+}
+
func (e *etcdSchemaRegistry) DeleteStream(ctx context.Context, metadata *commonv1.Metadata) (bool, error) {
return e.delete(ctx, Metadata{
TypeMeta: TypeMeta{
diff --git a/banyand/metadata/schema/topn.go b/banyand/metadata/schema/topn.go
index af79e24..fc94dad 100644
--- a/banyand/metadata/schema/topn.go
+++ b/banyand/metadata/schema/topn.go
@@ -14,13 +14,12 @@
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
-//
+
package schema
import (
"context"
- "github.com/pkg/errors"
"google.golang.org/protobuf/proto"
commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
@@ -39,7 +38,7 @@ func (e *etcdSchemaRegistry) GetTopNAggregation(ctx context.Context, metadata *c
func (e *etcdSchemaRegistry) ListTopNAggregation(ctx context.Context, opt ListOpt) ([]*databasev1.TopNAggregation, error) {
if opt.Group == "" {
- return nil, errors.Wrap(ErrGroupAbsent, "list TopNAggregation")
+ return nil, BadRequest("group", "group should not be empty")
}
messages, err := e.listWithPrefix(ctx, listPrefixesForEntity(opt.Group, TopNAggregationKeyPrefix), func() proto.Message {
return &databasev1.TopNAggregation{}
@@ -54,6 +53,17 @@ func (e *etcdSchemaRegistry) ListTopNAggregation(ctx context.Context, opt ListOp
return entities, nil
}
+func (e *etcdSchemaRegistry) CreateTopNAggregation(ctx context.Context, topNAggregation *databasev1.TopNAggregation) error {
+ return e.create(ctx, Metadata{
+ TypeMeta: TypeMeta{
+ Kind: KindTopNAggregation,
+ Group: topNAggregation.GetMetadata().GetGroup(),
+ Name: topNAggregation.GetMetadata().GetName(),
+ },
+ Spec: topNAggregation,
+ })
+}
+
func (e *etcdSchemaRegistry) UpdateTopNAggregation(ctx context.Context, topNAggregation *databasev1.TopNAggregation) error {
return e.update(ctx, Metadata{
TypeMeta: TypeMeta{
diff --git a/go.mod b/go.mod
index 4f5498c..e56cf81 100644
--- a/go.mod
+++ b/go.mod
@@ -26,7 +26,7 @@ require (
go.uber.org/multierr v1.7.0
golang.org/x/net v0.0.0-20210716203947-853a461950ff // indirect
golang.org/x/sys v0.0.0-20220114195835-da31bd327af9 // indirect
- google.golang.org/genproto v0.0.0-20210722135532-667f2b7c528f // indirect
+ google.golang.org/genproto v0.0.0-20210722135532-667f2b7c528f
google.golang.org/grpc v1.39.0
google.golang.org/protobuf v1.27.1
)
diff --git a/pkg/test/measure/etcd.go b/pkg/test/measure/etcd.go
index d5d65aa..a8242c1 100644
--- a/pkg/test/measure/etcd.go
+++ b/pkg/test/measure/etcd.go
@@ -48,22 +48,50 @@ var (
func PreloadSchema(e schema.Registry) error {
if err := loadSchema(groupDir, &commonv1.Group{}, func(group proto.Message) error {
+ _, err := e.GetGroup(context.TODO(), group.(*commonv1.Group).GetMetadata().GetName())
+ if err != nil {
+ if schema.IsNotFound(err) {
+ return e.CreateGroup(context.TODO(), group.(*commonv1.Group))
+ }
+ return err
+ }
return e.UpdateGroup(context.TODO(), group.(*commonv1.Group))
}); err != nil {
return errors.WithStack(err)
}
- if err := loadSchema(measureDir, &databasev1.Measure{}, func(group proto.Message) error {
- return e.UpdateMeasure(context.TODO(), group.(*databasev1.Measure))
+ if err := loadSchema(measureDir, &databasev1.Measure{}, func(measure proto.Message) error {
+ _, err := e.GetMeasure(context.TODO(), measure.(*databasev1.Measure).GetMetadata())
+ if err != nil {
+ if schema.IsNotFound(err) {
+ return e.CreateMeasure(context.TODO(), measure.(*databasev1.Measure))
+ }
+ return err
+ }
+ return e.UpdateMeasure(context.TODO(), measure.(*databasev1.Measure))
}); err != nil {
return errors.WithStack(err)
}
- if err := loadSchema(indexRuleDir, &databasev1.IndexRule{}, func(group proto.Message) error {
- return e.UpdateIndexRule(context.TODO(), group.(*databasev1.IndexRule))
+ if err := loadSchema(indexRuleDir, &databasev1.IndexRule{}, func(indexRule proto.Message) error {
+ _, err := e.GetIndexRule(context.TODO(), indexRule.(*databasev1.IndexRule).GetMetadata())
+ if err != nil {
+ if schema.IsNotFound(err) {
+ return e.CreateIndexRule(context.TODO(), indexRule.(*databasev1.IndexRule))
+ }
+ return err
+ }
+ return e.UpdateIndexRule(context.TODO(), indexRule.(*databasev1.IndexRule))
}); err != nil {
return errors.WithStack(err)
}
- if err := loadSchema(indexRuleBindingDir, &databasev1.IndexRuleBinding{}, func(group proto.Message) error {
- return e.UpdateIndexRuleBinding(context.TODO(), group.(*databasev1.IndexRuleBinding))
+ if err := loadSchema(indexRuleBindingDir, &databasev1.IndexRuleBinding{}, func(indexRuleBinding proto.Message) error {
+ _, err := e.GetIndexRuleBinding(context.TODO(), indexRuleBinding.(*databasev1.IndexRuleBinding).GetMetadata())
+ if err != nil {
+ if schema.IsNotFound(err) {
+ return e.CreateIndexRuleBinding(context.TODO(), indexRuleBinding.(*databasev1.IndexRuleBinding))
+ }
+ return err
+ }
+ return e.UpdateIndexRuleBinding(context.TODO(), indexRuleBinding.(*databasev1.IndexRuleBinding))
}); err != nil {
return errors.WithStack(err)
}
diff --git a/pkg/test/stream/etcd.go b/pkg/test/stream/etcd.go
index e6ca510..6c9d7c8 100644
--- a/pkg/test/stream/etcd.go
+++ b/pkg/test/stream/etcd.go
@@ -50,25 +50,53 @@ func PreloadSchema(e schema.Registry) error {
if err := protojson.Unmarshal([]byte(groupJSON), g); err != nil {
return err
}
- if err := e.UpdateGroup(context.TODO(), g); err != nil {
+
+ _, err := e.GetGroup(context.TODO(), g.GetMetadata().GetName())
+ if err != nil && schema.IsNotFound(err) {
+ if innerErr := e.CreateGroup(context.TODO(), g); innerErr != nil {
+ return innerErr
+ }
+ } else if err != nil {
return err
+ } else {
+ if innerErr := e.UpdateGroup(context.TODO(), g); innerErr != nil {
+ return innerErr
+ }
}
+
s := &databasev1.Stream{}
- if err := protojson.Unmarshal([]byte(streamJSON), s); err != nil {
- return err
+ if unmarshalErr := protojson.Unmarshal([]byte(streamJSON), s); unmarshalErr != nil {
+ return unmarshalErr
}
- err := e.UpdateStream(context.Background(), s)
- if err != nil {
+ _, err = e.GetStream(context.Background(), s.GetMetadata())
+ if err != nil && schema.IsNotFound(err) {
+ if innerErr := e.CreateStream(context.TODO(), s); innerErr != nil {
+ return innerErr
+ }
+ } else if err != nil {
return err
+ } else {
+ if innerErr := e.UpdateStream(context.TODO(), s); innerErr != nil {
+ return innerErr
+ }
}
indexRuleBinding := &databasev1.IndexRuleBinding{}
- if err = protojson.Unmarshal([]byte(indexRuleBindingJSON), indexRuleBinding); err != nil {
- return err
+ if unmarshalErr := protojson.Unmarshal([]byte(indexRuleBindingJSON), indexRuleBinding); unmarshalErr != nil {
+ return unmarshalErr
}
- err = e.UpdateIndexRuleBinding(context.Background(), indexRuleBinding)
- if err != nil {
+
+ _, err = e.GetIndexRuleBinding(context.Background(), indexRuleBinding.GetMetadata())
+ if err != nil && schema.IsNotFound(err) {
+ if innerErr := e.CreateIndexRuleBinding(context.TODO(), indexRuleBinding); innerErr != nil {
+ return innerErr
+ }
+ } else if err != nil {
return err
+ } else {
+ if innerErr := e.UpdateIndexRuleBinding(context.TODO(), indexRuleBinding); innerErr != nil {
+ return innerErr
+ }
}
entries, err := indexRuleStore.ReadDir(indexRuleDir)
@@ -85,9 +113,17 @@ func PreloadSchema(e schema.Registry) error {
if err != nil {
return err
}
- err = e.UpdateIndexRule(context.Background(), &idxRule)
- if err != nil {
+ _, err = e.GetIndexRule(context.Background(), idxRule.GetMetadata())
+ if err != nil && schema.IsNotFound(err) {
+ if innerErr := e.CreateIndexRule(context.TODO(), &idxRule); innerErr != nil {
+ return innerErr
+ }
+ } else if err != nil {
return err
+ } else {
+ if innerErr := e.UpdateIndexRule(context.TODO(), &idxRule); innerErr != nil {
+ return innerErr
+ }
}
}