You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by ha...@apache.org on 2022/04/14 12:15:46 UTC

[skywalking-banyandb] branch main updated: Return error created from status (#100)

This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git


The following commit(s) were added to refs/heads/main by this push:
     new 54c252b  Return error created from status (#100)
54c252b is described below

commit 54c252be79237097a324a06fdf2ea250a9bdb718
Author: Jiajing LU <lu...@gmail.com>
AuthorDate: Thu Apr 14 20:15:40 2022 +0800

    Return error created from status (#100)
    
    * add grpc errors
    
    * add create method
---
 banyand/liaison/grpc/property.go      |  4 +-
 banyand/liaison/grpc/registry.go      | 12 +++---
 banyand/liaison/grpc/registry_test.go |  8 ++--
 banyand/metadata/schema/error.go      | 51 +++++++++++++++++++++++
 banyand/metadata/schema/etcd.go       | 43 +++++++++++++++++---
 banyand/metadata/schema/etcd_test.go  |  8 ++--
 banyand/metadata/schema/group.go      | 14 +++++--
 banyand/metadata/schema/index.go      | 29 +++++++++++--
 banyand/metadata/schema/measure.go    | 76 ++++++++++++++++++++++++++++++++---
 banyand/metadata/schema/property.go   | 17 ++++++--
 banyand/metadata/schema/schema.go     |  7 ++++
 banyand/metadata/schema/stream.go     | 16 ++++++--
 banyand/metadata/schema/topn.go       | 16 ++++++--
 go.mod                                |  2 +-
 pkg/test/measure/etcd.go              | 40 +++++++++++++++---
 pkg/test/stream/etcd.go               | 58 +++++++++++++++++++++-----
 16 files changed, 340 insertions(+), 61 deletions(-)

diff --git a/banyand/liaison/grpc/property.go b/banyand/liaison/grpc/property.go
index be08309..7326d94 100644
--- a/banyand/liaison/grpc/property.go
+++ b/banyand/liaison/grpc/property.go
@@ -14,7 +14,7 @@
 // KIND, either express or implied.  See the License for the
 // specific language governing permissions and limitations
 // under the License.
-//
+
 package grpc
 
 import (
@@ -30,7 +30,7 @@ type propertyServer struct {
 }
 
 func (ps *propertyServer) Create(ctx context.Context, req *propertyv1.CreateRequest) (*propertyv1.CreateResponse, error) {
-	if err := ps.schemaRegistry.PropertyRegistry().UpdateProperty(ctx, req.GetProperty()); err != nil {
+	if err := ps.schemaRegistry.PropertyRegistry().CreateProperty(ctx, req.GetProperty()); err != nil {
 		return nil, err
 	}
 	return &propertyv1.CreateResponse{}, nil
diff --git a/banyand/liaison/grpc/registry.go b/banyand/liaison/grpc/registry.go
index 61e2add..5f25b8a 100644
--- a/banyand/liaison/grpc/registry.go
+++ b/banyand/liaison/grpc/registry.go
@@ -32,7 +32,7 @@ type streamRegistryServer struct {
 
 func (rs *streamRegistryServer) Create(ctx context.Context,
 	req *databasev1.StreamRegistryServiceCreateRequest) (*databasev1.StreamRegistryServiceCreateResponse, error) {
-	if err := rs.schemaRegistry.StreamRegistry().UpdateStream(ctx, req.GetStream()); err != nil {
+	if err := rs.schemaRegistry.StreamRegistry().CreateStream(ctx, req.GetStream()); err != nil {
 		return nil, err
 	}
 	return &databasev1.StreamRegistryServiceCreateResponse{}, nil
@@ -87,7 +87,7 @@ type indexRuleBindingRegistryServer struct {
 func (rs *indexRuleBindingRegistryServer) Create(ctx context.Context,
 	req *databasev1.IndexRuleBindingRegistryServiceCreateRequest) (
 	*databasev1.IndexRuleBindingRegistryServiceCreateResponse, error) {
-	if err := rs.schemaRegistry.IndexRuleBindingRegistry().UpdateIndexRuleBinding(ctx, req.GetIndexRuleBinding()); err != nil {
+	if err := rs.schemaRegistry.IndexRuleBindingRegistry().CreateIndexRuleBinding(ctx, req.GetIndexRuleBinding()); err != nil {
 		return nil, err
 	}
 	return &databasev1.IndexRuleBindingRegistryServiceCreateResponse{}, nil
@@ -146,7 +146,7 @@ type indexRuleRegistryServer struct {
 
 func (rs *indexRuleRegistryServer) Create(ctx context.Context, req *databasev1.IndexRuleRegistryServiceCreateRequest) (
 	*databasev1.IndexRuleRegistryServiceCreateResponse, error) {
-	if err := rs.schemaRegistry.IndexRuleRegistry().UpdateIndexRule(ctx, req.GetIndexRule()); err != nil {
+	if err := rs.schemaRegistry.IndexRuleRegistry().CreateIndexRule(ctx, req.GetIndexRule()); err != nil {
 		return nil, err
 	}
 	return &databasev1.IndexRuleRegistryServiceCreateResponse{}, nil
@@ -200,7 +200,7 @@ type measureRegistryServer struct {
 
 func (rs *measureRegistryServer) Create(ctx context.Context, req *databasev1.MeasureRegistryServiceCreateRequest) (
 	*databasev1.MeasureRegistryServiceCreateResponse, error) {
-	if err := rs.schemaRegistry.MeasureRegistry().UpdateMeasure(ctx, req.GetMeasure()); err != nil {
+	if err := rs.schemaRegistry.MeasureRegistry().CreateMeasure(ctx, req.GetMeasure()); err != nil {
 		return nil, err
 	}
 	return &databasev1.MeasureRegistryServiceCreateResponse{}, nil
@@ -254,7 +254,7 @@ type groupRegistryServer struct {
 
 func (rs *groupRegistryServer) Create(ctx context.Context, req *databasev1.GroupRegistryServiceCreateRequest) (
 	*databasev1.GroupRegistryServiceCreateResponse, error) {
-	if err := rs.schemaRegistry.GroupRegistry().UpdateGroup(ctx, req.GetGroup()); err != nil {
+	if err := rs.schemaRegistry.GroupRegistry().CreateGroup(ctx, req.GetGroup()); err != nil {
 		return nil, err
 	}
 	return &databasev1.GroupRegistryServiceCreateResponse{}, nil
@@ -308,7 +308,7 @@ type topNAggregationRegistryServer struct {
 
 func (ts *topNAggregationRegistryServer) Create(ctx context.Context,
 	req *databasev1.TopNAggregationRegistryServiceCreateRequest) (*databasev1.TopNAggregationRegistryServiceCreateResponse, error) {
-	if err := ts.schemaRegistry.TopNAggregationRegistry().UpdateTopNAggregation(ctx, req.GetTopNAggregation()); err != nil {
+	if err := ts.schemaRegistry.TopNAggregationRegistry().CreateTopNAggregation(ctx, req.GetTopNAggregation()); err != nil {
 		return nil, err
 	}
 	return &databasev1.TopNAggregationRegistryServiceCreateResponse{}, nil
diff --git a/banyand/liaison/grpc/registry_test.go b/banyand/liaison/grpc/registry_test.go
index 10033b1..a88ea70 100644
--- a/banyand/liaison/grpc/registry_test.go
+++ b/banyand/liaison/grpc/registry_test.go
@@ -23,6 +23,7 @@ import (
 	. "github.com/onsi/ginkgo/v2"
 	. "github.com/onsi/gomega"
 	grpclib "google.golang.org/grpc"
+	"google.golang.org/grpc/codes"
 	"google.golang.org/grpc/status"
 
 	commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
@@ -30,7 +31,6 @@ import (
 	"github.com/apache/skywalking-banyandb/banyand/discovery"
 	"github.com/apache/skywalking-banyandb/banyand/liaison/grpc"
 	"github.com/apache/skywalking-banyandb/banyand/metadata"
-	"github.com/apache/skywalking-banyandb/banyand/metadata/schema"
 	"github.com/apache/skywalking-banyandb/banyand/queue"
 	"github.com/apache/skywalking-banyandb/pkg/test"
 )
@@ -66,7 +66,7 @@ var _ = Describe("Registry", func() {
 			Metadata: meta,
 		})
 		errStatus, _ := status.FromError(err)
-		Expect(errStatus.Message()).To(Equal(schema.ErrEntityNotFound.Error()))
+		Expect(errStatus.Code()).To(Equal(codes.NotFound))
 		By("Creating a new stream")
 		_, err = client.Create(context.TODO(), &databasev1.StreamRegistryServiceCreateRequest{Stream: getResp.GetStream()})
 		Expect(err).ShouldNot(HaveOccurred())
@@ -96,7 +96,7 @@ var _ = Describe("Registry", func() {
 			Metadata: meta,
 		})
 		errStatus, _ := status.FromError(err)
-		Expect(errStatus.Message()).To(Equal(schema.ErrEntityNotFound.Error()))
+		Expect(errStatus.Code()).To(Equal(codes.NotFound))
 		By("Creating a new index-rule-binding")
 		_, err = client.Create(context.TODO(), &databasev1.IndexRuleBindingRegistryServiceCreateRequest{IndexRuleBinding: getResp.GetIndexRuleBinding()})
 		Expect(err).ShouldNot(HaveOccurred())
@@ -126,7 +126,7 @@ var _ = Describe("Registry", func() {
 			Metadata: meta,
 		})
 		errStatus, _ := status.FromError(err)
-		Expect(errStatus.Message()).To(Equal(schema.ErrEntityNotFound.Error()))
+		Expect(errStatus.Code()).To(Equal(codes.NotFound))
 		By("Creating a new index-rule")
 		_, err = client.Create(context.TODO(), &databasev1.IndexRuleRegistryServiceCreateRequest{IndexRule: getResp.GetIndexRule()})
 		Expect(err).ShouldNot(HaveOccurred())
diff --git a/banyand/metadata/schema/error.go b/banyand/metadata/schema/error.go
new file mode 100644
index 0000000..8853e0f
--- /dev/null
+++ b/banyand/metadata/schema/error.go
@@ -0,0 +1,51 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package schema
+
+import (
+	"github.com/pkg/errors"
+	"google.golang.org/genproto/googleapis/rpc/errdetails"
+	"google.golang.org/grpc/codes"
+	"google.golang.org/grpc/status"
+)
+
+var (
+	statusGRPCInvalidArgument  = status.New(codes.InvalidArgument, "banyandb: input is invalid")
+	ErrGRPCInvalidArgument     = statusGRPCInvalidArgument.Err()
+	statusGRPCResourceNotFound = status.New(codes.NotFound, "banyandb: resource not found")
+	ErrGRPCResourceNotFound    = statusGRPCResourceNotFound.Err()
+	statusGRPCAlreadyExists    = status.New(codes.AlreadyExists, "banyandb: resource already exists")
+	ErrGRPCAlreadyExists       = statusGRPCAlreadyExists.Err()
+)
+
+func IsNotFound(err error) bool {
+	return errors.Is(err, ErrGRPCResourceNotFound)
+}
+
+// BadRequest creates a gRPC InvalidArgument error that carries a BadRequest
+// detail describing which field of the client request is invalid and why.
+func BadRequest(field, desc string) error {
+	v := &errdetails.BadRequest_FieldViolation{
+		Field:       field,
+		Description: desc,
+	}
+	br := &errdetails.BadRequest{}
+	br.FieldViolations = append(br.FieldViolations, v)
+	st, _ := statusGRPCInvalidArgument.WithDetails(br)
+	return st.Err()
+}
diff --git a/banyand/metadata/schema/etcd.go b/banyand/metadata/schema/etcd.go
index 0d1d58e..7225cd5 100644
--- a/banyand/metadata/schema/etcd.go
+++ b/banyand/metadata/schema/etcd.go
@@ -42,7 +42,6 @@ var (
 	_ Group            = (*etcdSchemaRegistry)(nil)
 	_ Property         = (*etcdSchemaRegistry)(nil)
 
-	ErrEntityNotFound             = errors.New("entity is not found")
 	ErrUnexpectedNumberOfEntities = errors.New("unexpected number of entities")
 	ErrConcurrentModification     = errors.New("concurrent modification of entities")
 
@@ -176,7 +175,7 @@ func (e *etcdSchemaRegistry) get(ctx context.Context, key string, message proto.
 		return err
 	}
 	if resp.Count == 0 {
-		return ErrEntityNotFound
+		return ErrGRPCResourceNotFound
 	}
 	if resp.Count > 1 {
 		return ErrUnexpectedNumberOfEntities
@@ -192,6 +191,9 @@ func (e *etcdSchemaRegistry) get(ctx context.Context, key string, message proto.
 	return nil
 }
 
+// update first checks that an entity with the given metadata already exists
+// and, if it does, overwrites the stored value.
+// Otherwise, it returns ErrGRPCResourceNotFound.
 func (e *etcdSchemaRegistry) update(ctx context.Context, metadata Metadata) error {
 	key, err := metadata.Key()
 	if err != nil {
@@ -231,11 +233,40 @@ func (e *etcdSchemaRegistry) update(ctx context.Context, metadata Metadata) erro
 			return ErrConcurrentModification
 		}
 	} else {
-		_, err = e.kv.Put(ctx, key, string(val))
-		if err != nil {
-			return err
-		}
+		return ErrGRPCResourceNotFound
+	}
+	e.notifyUpdate(metadata)
+	return nil
+}
+
+// create first checks whether an entity with the given metadata exists
+// and stores the value only if it does not.
+// Otherwise, it returns ErrGRPCAlreadyExists.
+func (e *etcdSchemaRegistry) create(ctx context.Context, metadata Metadata) error {
+	key, err := metadata.Key()
+	if err != nil {
+		return err
 	}
+	getResp, err := e.kv.Get(ctx, key)
+	if err != nil {
+		return err
+	}
+	if getResp.Count > 1 {
+		return ErrUnexpectedNumberOfEntities
+	}
+	val, err := proto.Marshal(metadata.Spec.(proto.Message))
+	if err != nil {
+		return err
+	}
+	replace := getResp.Count > 0
+	if replace {
+		return ErrGRPCAlreadyExists
+	}
+	_, err = e.kv.Put(ctx, key, string(val))
+	if err != nil {
+		return err
+	}
+
 	e.notifyUpdate(metadata)
 	return nil
 }
diff --git a/banyand/metadata/schema/etcd_test.go b/banyand/metadata/schema/etcd_test.go
index 4f53b51..9131e0d 100644
--- a/banyand/metadata/schema/etcd_test.go
+++ b/banyand/metadata/schema/etcd_test.go
@@ -68,7 +68,7 @@ func preloadSchema(e Registry) error {
 	if err := protojson.Unmarshal([]byte(groupJSON), g); err != nil {
 		return err
 	}
-	if err := e.UpdateGroup(context.TODO(), g); err != nil {
+	if err := e.CreateGroup(context.TODO(), g); err != nil {
 		return err
 	}
 
@@ -76,7 +76,7 @@ func preloadSchema(e Registry) error {
 	if err := protojson.Unmarshal([]byte(streamJSON), s); err != nil {
 		return err
 	}
-	err := e.UpdateStream(context.Background(), s)
+	err := e.CreateStream(context.Background(), s)
 	if err != nil {
 		return err
 	}
@@ -85,7 +85,7 @@ func preloadSchema(e Registry) error {
 	if err = protojson.Unmarshal([]byte(indexRuleBindingJSON), indexRuleBinding); err != nil {
 		return err
 	}
-	err = e.UpdateIndexRuleBinding(context.Background(), indexRuleBinding)
+	err = e.CreateIndexRuleBinding(context.Background(), indexRuleBinding)
 	if err != nil {
 		return err
 	}
@@ -104,7 +104,7 @@ func preloadSchema(e Registry) error {
 		if err != nil {
 			return err
 		}
-		err = e.UpdateIndexRule(context.Background(), &idxRule)
+		err = e.CreateIndexRule(context.Background(), &idxRule)
 		if err != nil {
 			return err
 		}
diff --git a/banyand/metadata/schema/group.go b/banyand/metadata/schema/group.go
index 786dbb4..e1b0722 100644
--- a/banyand/metadata/schema/group.go
+++ b/banyand/metadata/schema/group.go
@@ -14,7 +14,7 @@
 // KIND, either express or implied.  See the License for the
 // specific language governing permissions and limitations
 // under the License.
-//
+
 package schema
 
 import (
@@ -31,8 +31,6 @@ import (
 var (
 	GroupsKeyPrefix  = "/groups/"
 	GroupMetadataKey = "/__meta_group__"
-
-	ErrGroupAbsent = errors.New("group is absent")
 )
 
 func (e *etcdSchemaRegistry) GetGroup(ctx context.Context, group string) (*commonv1.Group, error) {
@@ -88,6 +86,16 @@ func (e *etcdSchemaRegistry) DeleteGroup(ctx context.Context, group string) (boo
 	return true, nil
 }
 
+func (e *etcdSchemaRegistry) CreateGroup(ctx context.Context, group *commonv1.Group) error {
+	return e.create(ctx, Metadata{
+		TypeMeta: TypeMeta{
+			Kind: KindGroup,
+			Name: group.GetMetadata().GetName(),
+		},
+		Spec: group,
+	})
+}
+
 func (e *etcdSchemaRegistry) UpdateGroup(ctx context.Context, group *commonv1.Group) error {
 	return e.update(ctx, Metadata{
 		TypeMeta: TypeMeta{
diff --git a/banyand/metadata/schema/index.go b/banyand/metadata/schema/index.go
index 33dd4cb..c79ebfc 100644
--- a/banyand/metadata/schema/index.go
+++ b/banyand/metadata/schema/index.go
@@ -14,13 +14,12 @@
 // KIND, either express or implied.  See the License for the
 // specific language governing permissions and limitations
 // under the License.
-//
+
 package schema
 
 import (
 	"context"
 
-	"github.com/pkg/errors"
 	"google.golang.org/protobuf/proto"
 
 	commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
@@ -42,7 +41,7 @@ func (e *etcdSchemaRegistry) GetIndexRuleBinding(ctx context.Context, metadata *
 
 func (e *etcdSchemaRegistry) ListIndexRuleBinding(ctx context.Context, opt ListOpt) ([]*databasev1.IndexRuleBinding, error) {
 	if opt.Group == "" {
-		return nil, errors.Wrap(ErrGroupAbsent, "list index rule binding")
+		return nil, BadRequest("group", "group should not be empty")
 	}
 	messages, err := e.listWithPrefix(ctx, listPrefixesForEntity(opt.Group, IndexRuleBindingKeyPrefix), func() proto.Message {
 		return &databasev1.IndexRuleBinding{}
@@ -57,6 +56,17 @@ func (e *etcdSchemaRegistry) ListIndexRuleBinding(ctx context.Context, opt ListO
 	return entities, nil
 }
 
+func (e *etcdSchemaRegistry) CreateIndexRuleBinding(ctx context.Context, indexRuleBinding *databasev1.IndexRuleBinding) error {
+	return e.create(ctx, Metadata{
+		TypeMeta: TypeMeta{
+			Kind:  KindIndexRuleBinding,
+			Name:  indexRuleBinding.GetMetadata().GetName(),
+			Group: indexRuleBinding.GetMetadata().GetGroup(),
+		},
+		Spec: indexRuleBinding,
+	})
+}
+
 func (e *etcdSchemaRegistry) UpdateIndexRuleBinding(ctx context.Context, indexRuleBinding *databasev1.IndexRuleBinding) error {
 	return e.update(ctx, Metadata{
 		TypeMeta: TypeMeta{
@@ -88,7 +98,7 @@ func (e *etcdSchemaRegistry) GetIndexRule(ctx context.Context, metadata *commonv
 
 func (e *etcdSchemaRegistry) ListIndexRule(ctx context.Context, opt ListOpt) ([]*databasev1.IndexRule, error) {
 	if opt.Group == "" {
-		return nil, errors.Wrap(ErrGroupAbsent, "list index rule")
+		return nil, BadRequest("group", "group should not be empty")
 	}
 	messages, err := e.listWithPrefix(ctx, listPrefixesForEntity(opt.Group, IndexRuleKeyPrefix), func() proto.Message {
 		return &databasev1.IndexRule{}
@@ -103,6 +113,17 @@ func (e *etcdSchemaRegistry) ListIndexRule(ctx context.Context, opt ListOpt) ([]
 	return entities, nil
 }
 
+func (e *etcdSchemaRegistry) CreateIndexRule(ctx context.Context, indexRule *databasev1.IndexRule) error {
+	return e.create(ctx, Metadata{
+		TypeMeta: TypeMeta{
+			Kind:  KindIndexRule,
+			Name:  indexRule.GetMetadata().GetName(),
+			Group: indexRule.GetMetadata().GetGroup(),
+		},
+		Spec: indexRule,
+	})
+}
+
 func (e *etcdSchemaRegistry) UpdateIndexRule(ctx context.Context, indexRule *databasev1.IndexRule) error {
 	return e.update(ctx, Metadata{
 		TypeMeta: TypeMeta{
diff --git a/banyand/metadata/schema/measure.go b/banyand/metadata/schema/measure.go
index 33e5c42..2bc1fae 100644
--- a/banyand/metadata/schema/measure.go
+++ b/banyand/metadata/schema/measure.go
@@ -14,14 +14,13 @@
 // KIND, either express or implied.  See the License for the
 // specific language governing permissions and limitations
 // under the License.
-//
+
 package schema
 
 import (
 	"context"
 	"time"
 
-	"github.com/pkg/errors"
 	"google.golang.org/protobuf/proto"
 	"google.golang.org/protobuf/types/known/timestamppb"
 
@@ -44,7 +43,7 @@ func (e *etcdSchemaRegistry) GetMeasure(ctx context.Context, metadata *commonv1.
 
 func (e *etcdSchemaRegistry) ListMeasure(ctx context.Context, opt ListOpt) ([]*databasev1.Measure, error) {
 	if opt.Group == "" {
-		return nil, errors.Wrap(ErrGroupAbsent, "list measure")
+		return nil, BadRequest("group", "group should not be empty")
 	}
 	messages, err := e.listWithPrefix(ctx, listPrefixesForEntity(opt.Group, MeasureKeyPrefix), func() proto.Message {
 		return &databasev1.Measure{}
@@ -59,6 +58,73 @@ func (e *etcdSchemaRegistry) ListMeasure(ctx context.Context, opt ListOpt) ([]*d
 	return entities, nil
 }
 
+func (e *etcdSchemaRegistry) CreateMeasure(ctx context.Context, measure *databasev1.Measure) error {
+	if err := e.create(ctx, Metadata{
+		TypeMeta: TypeMeta{
+			Kind:  KindMeasure,
+			Group: measure.GetMetadata().GetGroup(),
+			Name:  measure.GetMetadata().GetName(),
+		},
+		Spec: measure,
+	}); err != nil {
+		return err
+	}
+
+	// Add an index rule for the ID type tag
+	idIndexRuleMetadata := &commonv1.Metadata{
+		Name:  TagTypeID,
+		Group: measure.Metadata.Group,
+	}
+	_, err := e.GetIndexRule(ctx, idIndexRuleMetadata)
+	if IsNotFound(err) {
+		if errIndexRule := e.CreateIndexRule(ctx, &databasev1.IndexRule{
+			Metadata:  idIndexRuleMetadata,
+			Tags:      []string{TagTypeID},
+			Type:      databasev1.IndexRule_TYPE_TREE,
+			Location:  databasev1.IndexRule_LOCATION_SERIES,
+			UpdatedAt: timestamppb.Now(),
+		}); errIndexRule != nil {
+			return errIndexRule
+		}
+	} else if err != nil {
+		return err
+	}
+	for _, tfs := range measure.GetTagFamilies() {
+		for _, ts := range tfs.GetTags() {
+			if ts.Type == databasev1.TagType_TAG_TYPE_ID {
+				for _, e := range measure.Entity.TagNames {
+					if ts.Name == e {
+						continue
+					}
+				}
+				irb := &databasev1.IndexRuleBinding{
+					Metadata: &commonv1.Metadata{
+						Name:  TagTypeID + "_" + measure.Metadata.Name + "_" + ts.Name,
+						Group: measure.Metadata.Group,
+					},
+					Rules: []string{TagTypeID},
+					Subject: &databasev1.Subject{
+						Catalog: commonv1.Catalog_CATALOG_MEASURE,
+						Name:    measure.Metadata.Name,
+					},
+					BeginAt:   timestamppb.Now(),
+					ExpireAt:  timestamppb.New(time.Now().AddDate(100, 0, 0)),
+					UpdatedAt: timestamppb.Now(),
+				}
+				_, innerErr := e.GetIndexRuleBinding(ctx, irb.GetMetadata())
+				if innerErr == nil {
+					return e.UpdateIndexRuleBinding(ctx, irb)
+				}
+				if IsNotFound(innerErr) {
+					return e.CreateIndexRuleBinding(ctx, irb)
+				}
+				return innerErr
+			}
+		}
+	}
+	return nil
+}
+
 func (e *etcdSchemaRegistry) UpdateMeasure(ctx context.Context, measure *databasev1.Measure) error {
 	if err := e.update(ctx, Metadata{
 		TypeMeta: TypeMeta{
@@ -77,8 +143,8 @@ func (e *etcdSchemaRegistry) UpdateMeasure(ctx context.Context, measure *databas
 		Group: measure.Metadata.Group,
 	}
 	_, err := e.GetIndexRule(ctx, idIndexRuleMetadata)
-	if errors.Is(err, ErrEntityNotFound) {
-		if errIndexRule := e.UpdateIndexRule(ctx, &databasev1.IndexRule{
+	if IsNotFound(err) {
+		if errIndexRule := e.CreateIndexRule(ctx, &databasev1.IndexRule{
 			Metadata:  idIndexRuleMetadata,
 			Tags:      []string{TagTypeID},
 			Type:      databasev1.IndexRule_TYPE_TREE,
diff --git a/banyand/metadata/schema/property.go b/banyand/metadata/schema/property.go
index 28cc8a5..d873014 100644
--- a/banyand/metadata/schema/property.go
+++ b/banyand/metadata/schema/property.go
@@ -14,13 +14,12 @@
 // KIND, either express or implied.  See the License for the
 // specific language governing permissions and limitations
 // under the License.
-//
+
 package schema
 
 import (
 	"context"
 
-	"github.com/pkg/errors"
 	"google.golang.org/protobuf/proto"
 
 	commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
@@ -39,7 +38,7 @@ func (e *etcdSchemaRegistry) GetProperty(ctx context.Context, metadata *property
 
 func (e *etcdSchemaRegistry) ListProperty(ctx context.Context, container *commonv1.Metadata) ([]*propertyv1.Property, error) {
 	if container.Group == "" {
-		return nil, errors.Wrap(ErrGroupAbsent, "list Property")
+		return nil, BadRequest("container.group", "group should not be empty")
 	}
 	messages, err := e.listWithPrefix(ctx, listPrefixesForEntity(container.Group, PropertyKeyPrefix+container.Name), func() proto.Message {
 		return &propertyv1.Property{}
@@ -54,6 +53,18 @@ func (e *etcdSchemaRegistry) ListProperty(ctx context.Context, container *common
 	return entities, nil
 }
 
+func (e *etcdSchemaRegistry) CreateProperty(ctx context.Context, property *propertyv1.Property) error {
+	m := transformKey(property.GetMetadata())
+	return e.create(ctx, Metadata{
+		TypeMeta: TypeMeta{
+			Kind:  KindProperty,
+			Group: m.GetGroup(),
+			Name:  m.GetName(),
+		},
+		Spec: property,
+	})
+}
+
 func (e *etcdSchemaRegistry) UpdateProperty(ctx context.Context, property *propertyv1.Property) error {
 	m := transformKey(property.GetMetadata())
 	return e.update(ctx, Metadata{
diff --git a/banyand/metadata/schema/schema.go b/banyand/metadata/schema/schema.go
index c0f4060..9a5c0c3 100644
--- a/banyand/metadata/schema/schema.go
+++ b/banyand/metadata/schema/schema.go
@@ -157,6 +157,7 @@ func (m Metadata) Equal(other proto.Message) bool {
 type Stream interface {
 	GetStream(ctx context.Context, metadata *commonv1.Metadata) (*databasev1.Stream, error)
 	ListStream(ctx context.Context, opt ListOpt) ([]*databasev1.Stream, error)
+	CreateStream(ctx context.Context, stream *databasev1.Stream) error
 	UpdateStream(ctx context.Context, stream *databasev1.Stream) error
 	DeleteStream(ctx context.Context, metadata *commonv1.Metadata) (bool, error)
 	RegisterHandler(Kind, EventHandler)
@@ -165,6 +166,7 @@ type Stream interface {
 type IndexRule interface {
 	GetIndexRule(ctx context.Context, metadata *commonv1.Metadata) (*databasev1.IndexRule, error)
 	ListIndexRule(ctx context.Context, opt ListOpt) ([]*databasev1.IndexRule, error)
+	CreateIndexRule(ctx context.Context, indexRule *databasev1.IndexRule) error
 	UpdateIndexRule(ctx context.Context, indexRule *databasev1.IndexRule) error
 	DeleteIndexRule(ctx context.Context, metadata *commonv1.Metadata) (bool, error)
 }
@@ -172,6 +174,7 @@ type IndexRule interface {
 type IndexRuleBinding interface {
 	GetIndexRuleBinding(ctx context.Context, metadata *commonv1.Metadata) (*databasev1.IndexRuleBinding, error)
 	ListIndexRuleBinding(ctx context.Context, opt ListOpt) ([]*databasev1.IndexRuleBinding, error)
+	CreateIndexRuleBinding(ctx context.Context, indexRuleBinding *databasev1.IndexRuleBinding) error
 	UpdateIndexRuleBinding(ctx context.Context, indexRuleBinding *databasev1.IndexRuleBinding) error
 	DeleteIndexRuleBinding(ctx context.Context, metadata *commonv1.Metadata) (bool, error)
 }
@@ -179,6 +182,7 @@ type IndexRuleBinding interface {
 type Measure interface {
 	GetMeasure(ctx context.Context, metadata *commonv1.Metadata) (*databasev1.Measure, error)
 	ListMeasure(ctx context.Context, opt ListOpt) ([]*databasev1.Measure, error)
+	CreateMeasure(ctx context.Context, measure *databasev1.Measure) error
 	UpdateMeasure(ctx context.Context, measure *databasev1.Measure) error
 	DeleteMeasure(ctx context.Context, metadata *commonv1.Metadata) (bool, error)
 	RegisterHandler(Kind, EventHandler)
@@ -189,12 +193,14 @@ type Group interface {
 	ListGroup(ctx context.Context) ([]*commonv1.Group, error)
 	// DeleteGroup delete all items belonging to the group
 	DeleteGroup(ctx context.Context, group string) (bool, error)
+	CreateGroup(ctx context.Context, group *commonv1.Group) error
 	UpdateGroup(ctx context.Context, group *commonv1.Group) error
 }
 
 type TopNAggregation interface {
 	GetTopNAggregation(ctx context.Context, metadata *commonv1.Metadata) (*databasev1.TopNAggregation, error)
 	ListTopNAggregation(ctx context.Context, opt ListOpt) ([]*databasev1.TopNAggregation, error)
+	CreateTopNAggregation(ctx context.Context, measure *databasev1.TopNAggregation) error
 	UpdateTopNAggregation(ctx context.Context, measure *databasev1.TopNAggregation) error
 	DeleteTopNAggregation(ctx context.Context, metadata *commonv1.Metadata) (bool, error)
 }
@@ -202,6 +208,7 @@ type TopNAggregation interface {
 type Property interface {
 	GetProperty(ctx context.Context, metadata *propertyv1.Metadata) (*propertyv1.Property, error)
 	ListProperty(ctx context.Context, container *commonv1.Metadata) ([]*propertyv1.Property, error)
+	CreateProperty(ctx context.Context, property *propertyv1.Property) error
 	UpdateProperty(ctx context.Context, property *propertyv1.Property) error
 	DeleteProperty(ctx context.Context, metadata *propertyv1.Metadata) (bool, error)
 }
diff --git a/banyand/metadata/schema/stream.go b/banyand/metadata/schema/stream.go
index bbcad7c..accc6d8 100644
--- a/banyand/metadata/schema/stream.go
+++ b/banyand/metadata/schema/stream.go
@@ -14,13 +14,12 @@
 // KIND, either express or implied.  See the License for the
 // specific language governing permissions and limitations
 // under the License.
-//
+
 package schema
 
 import (
 	"context"
 
-	"github.com/pkg/errors"
 	"google.golang.org/protobuf/proto"
 
 	commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
@@ -39,7 +38,7 @@ func (e *etcdSchemaRegistry) GetStream(ctx context.Context, metadata *commonv1.M
 
 func (e *etcdSchemaRegistry) ListStream(ctx context.Context, opt ListOpt) ([]*databasev1.Stream, error) {
 	if opt.Group == "" {
-		return nil, errors.Wrap(ErrGroupAbsent, "list stream")
+		return nil, BadRequest("group", "group should not be empty")
 	}
 	messages, err := e.listWithPrefix(ctx, listPrefixesForEntity(opt.Group, StreamKeyPrefix), func() proto.Message {
 		return &databasev1.Stream{}
@@ -65,6 +64,17 @@ func (e *etcdSchemaRegistry) UpdateStream(ctx context.Context, stream *databasev
 	})
 }
 
+func (e *etcdSchemaRegistry) CreateStream(ctx context.Context, stream *databasev1.Stream) error {
+	return e.create(ctx, Metadata{
+		TypeMeta: TypeMeta{
+			Kind:  KindStream,
+			Group: stream.GetMetadata().GetGroup(),
+			Name:  stream.GetMetadata().GetName(),
+		},
+		Spec: stream,
+	})
+}
+
 func (e *etcdSchemaRegistry) DeleteStream(ctx context.Context, metadata *commonv1.Metadata) (bool, error) {
 	return e.delete(ctx, Metadata{
 		TypeMeta: TypeMeta{
diff --git a/banyand/metadata/schema/topn.go b/banyand/metadata/schema/topn.go
index af79e24..fc94dad 100644
--- a/banyand/metadata/schema/topn.go
+++ b/banyand/metadata/schema/topn.go
@@ -14,13 +14,12 @@
 // KIND, either express or implied.  See the License for the
 // specific language governing permissions and limitations
 // under the License.
-//
+
 package schema
 
 import (
 	"context"
 
-	"github.com/pkg/errors"
 	"google.golang.org/protobuf/proto"
 
 	commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
@@ -39,7 +38,7 @@ func (e *etcdSchemaRegistry) GetTopNAggregation(ctx context.Context, metadata *c
 
 func (e *etcdSchemaRegistry) ListTopNAggregation(ctx context.Context, opt ListOpt) ([]*databasev1.TopNAggregation, error) {
 	if opt.Group == "" {
-		return nil, errors.Wrap(ErrGroupAbsent, "list TopNAggregation")
+		return nil, BadRequest("group", "group should not be empty")
 	}
 	messages, err := e.listWithPrefix(ctx, listPrefixesForEntity(opt.Group, TopNAggregationKeyPrefix), func() proto.Message {
 		return &databasev1.TopNAggregation{}
@@ -54,6 +53,17 @@ func (e *etcdSchemaRegistry) ListTopNAggregation(ctx context.Context, opt ListOp
 	return entities, nil
 }
 
+func (e *etcdSchemaRegistry) CreateTopNAggregation(ctx context.Context, topNAggregation *databasev1.TopNAggregation) error {
+	return e.create(ctx, Metadata{
+		TypeMeta: TypeMeta{
+			Kind:  KindTopNAggregation,
+			Group: topNAggregation.GetMetadata().GetGroup(),
+			Name:  topNAggregation.GetMetadata().GetName(),
+		},
+		Spec: topNAggregation,
+	})
+}
+
 func (e *etcdSchemaRegistry) UpdateTopNAggregation(ctx context.Context, topNAggregation *databasev1.TopNAggregation) error {
 	return e.update(ctx, Metadata{
 		TypeMeta: TypeMeta{
diff --git a/go.mod b/go.mod
index 4f5498c..e56cf81 100644
--- a/go.mod
+++ b/go.mod
@@ -26,7 +26,7 @@ require (
 	go.uber.org/multierr v1.7.0
 	golang.org/x/net v0.0.0-20210716203947-853a461950ff // indirect
 	golang.org/x/sys v0.0.0-20220114195835-da31bd327af9 // indirect
-	google.golang.org/genproto v0.0.0-20210722135532-667f2b7c528f // indirect
+	google.golang.org/genproto v0.0.0-20210722135532-667f2b7c528f
 	google.golang.org/grpc v1.39.0
 	google.golang.org/protobuf v1.27.1
 )
diff --git a/pkg/test/measure/etcd.go b/pkg/test/measure/etcd.go
index d5d65aa..a8242c1 100644
--- a/pkg/test/measure/etcd.go
+++ b/pkg/test/measure/etcd.go
@@ -48,22 +48,50 @@ var (
 
 func PreloadSchema(e schema.Registry) error {
 	if err := loadSchema(groupDir, &commonv1.Group{}, func(group proto.Message) error {
+		_, err := e.GetGroup(context.TODO(), group.(*commonv1.Group).GetMetadata().GetName())
+		if err != nil {
+			if schema.IsNotFound(err) {
+				return e.CreateGroup(context.TODO(), group.(*commonv1.Group))
+			}
+			return err
+		}
 		return e.UpdateGroup(context.TODO(), group.(*commonv1.Group))
 	}); err != nil {
 		return errors.WithStack(err)
 	}
-	if err := loadSchema(measureDir, &databasev1.Measure{}, func(group proto.Message) error {
-		return e.UpdateMeasure(context.TODO(), group.(*databasev1.Measure))
+	if err := loadSchema(measureDir, &databasev1.Measure{}, func(measure proto.Message) error {
+		_, err := e.GetMeasure(context.TODO(), measure.(*databasev1.Measure).GetMetadata())
+		if err != nil {
+			if schema.IsNotFound(err) {
+				return e.CreateMeasure(context.TODO(), measure.(*databasev1.Measure))
+			}
+			return err
+		}
+		return e.UpdateMeasure(context.TODO(), measure.(*databasev1.Measure))
 	}); err != nil {
 		return errors.WithStack(err)
 	}
-	if err := loadSchema(indexRuleDir, &databasev1.IndexRule{}, func(group proto.Message) error {
-		return e.UpdateIndexRule(context.TODO(), group.(*databasev1.IndexRule))
+	if err := loadSchema(indexRuleDir, &databasev1.IndexRule{}, func(indexRule proto.Message) error {
+		_, err := e.GetIndexRule(context.TODO(), indexRule.(*databasev1.IndexRule).GetMetadata())
+		if err != nil {
+			if schema.IsNotFound(err) {
+				return e.CreateIndexRule(context.TODO(), indexRule.(*databasev1.IndexRule))
+			}
+			return err
+		}
+		return e.UpdateIndexRule(context.TODO(), indexRule.(*databasev1.IndexRule))
 	}); err != nil {
 		return errors.WithStack(err)
 	}
-	if err := loadSchema(indexRuleBindingDir, &databasev1.IndexRuleBinding{}, func(group proto.Message) error {
-		return e.UpdateIndexRuleBinding(context.TODO(), group.(*databasev1.IndexRuleBinding))
+	if err := loadSchema(indexRuleBindingDir, &databasev1.IndexRuleBinding{}, func(indexRuleBinding proto.Message) error {
+		_, err := e.GetIndexRuleBinding(context.TODO(), indexRuleBinding.(*databasev1.IndexRuleBinding).GetMetadata())
+		if err != nil {
+			if schema.IsNotFound(err) {
+				return e.CreateIndexRuleBinding(context.TODO(), indexRuleBinding.(*databasev1.IndexRuleBinding))
+			}
+			return err
+		}
+		return e.UpdateIndexRuleBinding(context.TODO(), indexRuleBinding.(*databasev1.IndexRuleBinding))
 	}); err != nil {
 		return errors.WithStack(err)
 	}
diff --git a/pkg/test/stream/etcd.go b/pkg/test/stream/etcd.go
index e6ca510..6c9d7c8 100644
--- a/pkg/test/stream/etcd.go
+++ b/pkg/test/stream/etcd.go
@@ -50,25 +50,53 @@ func PreloadSchema(e schema.Registry) error {
 	if err := protojson.Unmarshal([]byte(groupJSON), g); err != nil {
 		return err
 	}
-	if err := e.UpdateGroup(context.TODO(), g); err != nil {
+
+	_, err := e.GetGroup(context.TODO(), g.GetMetadata().GetName())
+	if err != nil && schema.IsNotFound(err) {
+		if innerErr := e.CreateGroup(context.TODO(), g); innerErr != nil {
+			return innerErr
+		}
+	} else if err != nil {
 		return err
+	} else {
+		if innerErr := e.UpdateGroup(context.TODO(), g); innerErr != nil {
+			return innerErr
+		}
 	}
+
 	s := &databasev1.Stream{}
-	if err := protojson.Unmarshal([]byte(streamJSON), s); err != nil {
-		return err
+	if unmarshalErr := protojson.Unmarshal([]byte(streamJSON), s); unmarshalErr != nil {
+		return unmarshalErr
 	}
-	err := e.UpdateStream(context.Background(), s)
-	if err != nil {
+	_, err = e.GetStream(context.Background(), s.GetMetadata())
+	if err != nil && schema.IsNotFound(err) {
+		if innerErr := e.CreateStream(context.TODO(), s); innerErr != nil {
+			return innerErr
+		}
+	} else if err != nil {
 		return err
+	} else {
+		if innerErr := e.UpdateStream(context.TODO(), s); innerErr != nil {
+			return innerErr
+		}
 	}
 
 	indexRuleBinding := &databasev1.IndexRuleBinding{}
-	if err = protojson.Unmarshal([]byte(indexRuleBindingJSON), indexRuleBinding); err != nil {
-		return err
+	if unmarshalErr := protojson.Unmarshal([]byte(indexRuleBindingJSON), indexRuleBinding); unmarshalErr != nil {
+		return unmarshalErr
 	}
-	err = e.UpdateIndexRuleBinding(context.Background(), indexRuleBinding)
-	if err != nil {
+
+	_, err = e.GetIndexRuleBinding(context.Background(), indexRuleBinding.GetMetadata())
+	if err != nil && schema.IsNotFound(err) {
+		if innerErr := e.CreateIndexRuleBinding(context.TODO(), indexRuleBinding); innerErr != nil {
+			return innerErr
+		}
+	} else if err != nil {
 		return err
+	} else {
+		if innerErr := e.UpdateIndexRuleBinding(context.TODO(), indexRuleBinding); innerErr != nil {
+			return innerErr
+		}
 	}
 
 	entries, err := indexRuleStore.ReadDir(indexRuleDir)
@@ -85,9 +113,17 @@ func PreloadSchema(e schema.Registry) error {
 		if err != nil {
 			return err
 		}
-		err = e.UpdateIndexRule(context.Background(), &idxRule)
-		if err != nil {
+		_, err = e.GetIndexRule(context.Background(), idxRule.GetMetadata())
+		if err != nil && schema.IsNotFound(err) {
+			if innerErr := e.CreateIndexRule(context.TODO(), &idxRule); innerErr != nil {
+				return innerErr
+			}
+		} else if err != nil {
 			return err
+		} else {
+			if innerErr := e.UpdateIndexRule(context.TODO(), &idxRule); innerErr != nil {
+				return innerErr
+			}
 		}
 	}