Posted to notifications@skywalking.apache.org by GitBox <gi...@apache.org> on 2021/10/10 11:11:01 UTC

[GitHub] [skywalking-banyandb] lujiajing1126 opened a new pull request #55: Feat: metadata registry with embedded etcd

lujiajing1126 opened a new pull request #55:
URL: https://github.com/apache/skywalking-banyandb/pull/55


   This PR adds a registry for `Stream`, `IndexRuleBinding` and `IndexRule`, backed by embedded etcd.
   
   @hanahmily Please review.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@skywalking.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [skywalking-banyandb] hanahmily commented on pull request #55: Feat: metadata registry with embedded etcd

Posted by GitBox <gi...@apache.org>.
hanahmily commented on pull request #55:
URL: https://github.com/apache/skywalking-banyandb/pull/55#issuecomment-940648915


   @lujiajing1126 Please add the `Measure`-related APIs to this PR as well.





[GitHub] [skywalking-banyandb] hanahmily commented on a change in pull request #55: Feat: metadata registry with embedded etcd

Posted by GitBox <gi...@apache.org>.
hanahmily commented on a change in pull request #55:
URL: https://github.com/apache/skywalking-banyandb/pull/55#discussion_r725626454



##########
File path: api/proto/banyandb/database/v1/rpc.proto
##########
@@ -0,0 +1,77 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+syntax = "proto3";
+
+option java_package = "org.apache.skywalking.banyandb.database.v1";
+option go_package = "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1";
+
+package banyandb.database.v1;
+
+import "banyandb/database/v1/schema.proto";
+import "banyandb/common/v1/common.proto";
+import "google/protobuf/struct.proto";
+
+// EntityType includes three kinds of entities (schema)
+// that can be create/delete/modified at runtime.
+enum EntityType {
+  ENTITY_TYPE_UNSPECIFIED = 0;
+  ENTITY_TYPE_STREAM = 1;
+  ENTITY_TYPE_INDEX_RULE_BINDING = 2;
+  ENTITY_TYPE_INDEX_RULE = 3;
+}
+
+// EntityCreateOrUpdateRequest is the request for creating
+// or updating entity.
+message EntityCreateOrUpdateRequest {
+  oneof entity {
+    banyandb.database.v1.Stream stream = 1;
+    banyandb.database.v1.IndexRuleBinding index_rule_binding = 2;
+    banyandb.database.v1.IndexRule index_rule = 3;
+  }
+}
+
+message EntityCreateOrUpdateResponse {
+}
+
+// GeneralEntityRequest can be used to delete, get entity
+// with the given entity_type and its metadata.
+message GeneralEntityRequest {
+  banyandb.common.v1.Metadata metadata = 1;
+  EntityType entity_type = 2;
+}
+
+message EntityDeleteResponse {
+}
+
+// EntityGetResponse is the response for Get method.
+// It contains an entity which is possibly nullable which
+// means the entity cannot be found
+message EntityGetResponse {
+  oneof entity {
+    google.protobuf.NullValue null = 1;
+    banyandb.database.v1.Stream stream = 2;
+    banyandb.database.v1.IndexRuleBinding index_rule_binding = 3;
+    banyandb.database.v1.IndexRule index_rule = 4;
+  }
+}
+
+service EntityRegistry {
+  rpc CreateOrUpdate(EntityCreateOrUpdateRequest) returns (EntityCreateOrUpdateResponse);

Review comment:
       We should split it into `Create` and `Update`, as we can grant different permissions to them once the auth sub-system is built.
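
A minimal Go sketch of the split suggested above, not code from this PR. The `registry` type, its in-memory map, and the two sentinel errors are hypothetical names introduced only for illustration. It shows why separate calls make per-operation permissions straightforward: `Create` can never overwrite, and `Update` can never introduce a new entity.

```go
package main

import (
	"errors"
	"fmt"
)

// Hypothetical sentinel errors for this sketch; not taken from the PR.
var (
	errAlreadyExists = errors.New("entity already exists")
	errNotFound      = errors.New("entity is not found")
)

// registry is a hypothetical in-memory stand-in for the schema registry.
type registry struct {
	entities map[string]string
}

func newRegistry() *registry {
	return &registry{entities: make(map[string]string)}
}

// Create fails if the key is already present, so a caller that is only
// allowed to create can never overwrite an existing entity.
func (r *registry) Create(key, value string) error {
	if _, ok := r.entities[key]; ok {
		return errAlreadyExists
	}
	r.entities[key] = value
	return nil
}

// Update fails if the key is absent, so a caller that is only allowed
// to update can never introduce a new entity.
func (r *registry) Update(key, value string) error {
	if _, ok := r.entities[key]; !ok {
		return errNotFound
	}
	r.entities[key] = value
	return nil
}

func main() {
	r := newRegistry()
	fmt.Println(r.Create("/stream/default/sw", "v1"))
	fmt.Println(r.Create("/stream/default/sw", "v2"))
	fmt.Println(r.Update("/stream/default/sw", "v2"))
	fmt.Println(r.Update("/stream/default/missing", "v1"))
}
```

With a combined `CreateOrUpdate`, the server would have to inspect existing state before it could decide which permission applies; with two calls the decision can be made from the method name alone.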

##########
File path: api/proto/banyandb/database/v1/rpc.proto
##########
@@ -0,0 +1,77 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+syntax = "proto3";
+
+option java_package = "org.apache.skywalking.banyandb.database.v1";
+option go_package = "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1";
+
+package banyandb.database.v1;
+
+import "banyandb/database/v1/schema.proto";
+import "banyandb/common/v1/common.proto";
+import "google/protobuf/struct.proto";
+
+// EntityType includes three kinds of entities (schema)
+// that can be create/delete/modified at runtime.
+enum EntityType {
+  ENTITY_TYPE_UNSPECIFIED = 0;
+  ENTITY_TYPE_STREAM = 1;
+  ENTITY_TYPE_INDEX_RULE_BINDING = 2;
+  ENTITY_TYPE_INDEX_RULE = 3;
+}
+
+// EntityCreateOrUpdateRequest is the request for creating
+// or updating entity.
+message EntityCreateOrUpdateRequest {
+  oneof entity {
+    banyandb.database.v1.Stream stream = 1;
+    banyandb.database.v1.IndexRuleBinding index_rule_binding = 2;
+    banyandb.database.v1.IndexRule index_rule = 3;
+  }
+}
+
+message EntityCreateOrUpdateResponse {
+}
+
+// GeneralEntityRequest can be used to delete, get entity
+// with the given entity_type and its metadata.
+message GeneralEntityRequest {
+  banyandb.common.v1.Metadata metadata = 1;
+  EntityType entity_type = 2;
+}
+
+message EntityDeleteResponse {
+}
+
+// EntityGetResponse is the response for Get method.
+// It contains an entity which is possibly nullable which
+// means the entity cannot be found
+message EntityGetResponse {
+  oneof entity {
+    google.protobuf.NullValue null = 1;
+    banyandb.database.v1.Stream stream = 2;
+    banyandb.database.v1.IndexRuleBinding index_rule_binding = 3;
+    banyandb.database.v1.IndexRule index_rule = 4;
+  }
+}
+
+service EntityRegistry {
+  rpc CreateOrUpdate(EntityCreateOrUpdateRequest) returns (EntityCreateOrUpdateResponse);
+  rpc Delete(GeneralEntityRequest) returns (EntityDeleteResponse);
+  rpc Get(GeneralEntityRequest) returns (EntityGetResponse);

Review comment:
       We need a `List` service, which would be helpful for observing the database.
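
One way a `List` call could be served, sketched in Go under stated assumptions. The key layout `/stream/<group>/<name>` is an assumption that extends the `/stream/` key prefix appearing in `etcd.go` in this PR; `listByPrefix` and the plain map standing in for the key-value store are hypothetical.

```go
package main

import (
	"fmt"
	"sort"
	"strings"
)

// listByPrefix returns every key in store that starts with prefix,
// in sorted order. The map is a stand-in for a key-value store.
func listByPrefix(store map[string]string, prefix string) []string {
	var keys []string
	for k := range store {
		if strings.HasPrefix(k, prefix) {
			keys = append(keys, k)
		}
	}
	sort.Strings(keys)
	return keys
}

func main() {
	// Assumed key layout: /<kind>/<group>/<name>.
	store := map[string]string{
		"/stream/default/sw":      "{}",
		"/stream/default/access":  "{}",
		"/stream/other/sw":        "{}",
		"/index-rule/default/dur": "{}",
	}
	for _, k := range listByPrefix(store, "/stream/default/") {
		fmt.Println(k)
	}
}
```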

##########
File path: api/proto/banyandb/database/v1/rpc.proto
##########
@@ -0,0 +1,77 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+syntax = "proto3";
+
+option java_package = "org.apache.skywalking.banyandb.database.v1";
+option go_package = "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1";
+
+package banyandb.database.v1;
+
+import "banyandb/database/v1/schema.proto";
+import "banyandb/common/v1/common.proto";
+import "google/protobuf/struct.proto";
+
+// EntityType includes three kinds of entities (schema)
+// that can be create/delete/modified at runtime.
+enum EntityType {
+  ENTITY_TYPE_UNSPECIFIED = 0;
+  ENTITY_TYPE_STREAM = 1;
+  ENTITY_TYPE_INDEX_RULE_BINDING = 2;
+  ENTITY_TYPE_INDEX_RULE = 3;
+}
+
+// EntityCreateOrUpdateRequest is the request for creating
+// or updating entity.
+message EntityCreateOrUpdateRequest {
+  oneof entity {
+    banyandb.database.v1.Stream stream = 1;
+    banyandb.database.v1.IndexRuleBinding index_rule_binding = 2;
+    banyandb.database.v1.IndexRule index_rule = 3;
+  }
+}
+
+message EntityCreateOrUpdateResponse {
+}
+
+// GeneralEntityRequest can be used to delete, get entity
+// with the given entity_type and its metadata.
+message GeneralEntityRequest {
+  banyandb.common.v1.Metadata metadata = 1;
+  EntityType entity_type = 2;
+}
+
+message EntityDeleteResponse {
+}
+
+// EntityGetResponse is the response for Get method.
+// It contains an entity which is possibly nullable which
+// means the entity cannot be found
+message EntityGetResponse {
+  oneof entity {
+    google.protobuf.NullValue null = 1;
+    banyandb.database.v1.Stream stream = 2;
+    banyandb.database.v1.IndexRuleBinding index_rule_binding = 3;
+    banyandb.database.v1.IndexRule index_rule = 4;
+  }
+}
+
+service EntityRegistry {
+  rpc CreateOrUpdate(EntityCreateOrUpdateRequest) returns (EntityCreateOrUpdateResponse);
+  rpc Delete(GeneralEntityRequest) returns (EntityDeleteResponse);
+  rpc Get(GeneralEntityRequest) returns (EntityGetResponse);

Review comment:
       We should introduce the `Group` CRUD service here. The other "entities" belong to a group.

##########
File path: api/proto/banyandb/database/v1/rpc.proto
##########
@@ -0,0 +1,77 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+syntax = "proto3";
+
+option java_package = "org.apache.skywalking.banyandb.database.v1";
+option go_package = "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1";
+
+package banyandb.database.v1;
+
+import "banyandb/database/v1/schema.proto";
+import "banyandb/common/v1/common.proto";
+import "google/protobuf/struct.proto";
+
+// EntityType includes three kinds of entities (schema)
+// that can be create/delete/modified at runtime.
+enum EntityType {
+  ENTITY_TYPE_UNSPECIFIED = 0;
+  ENTITY_TYPE_STREAM = 1;
+  ENTITY_TYPE_INDEX_RULE_BINDING = 2;
+  ENTITY_TYPE_INDEX_RULE = 3;
+}
+
+// EntityCreateOrUpdateRequest is the request for creating
+// or updating entity.
+message EntityCreateOrUpdateRequest {
+  oneof entity {
+    banyandb.database.v1.Stream stream = 1;
+    banyandb.database.v1.IndexRuleBinding index_rule_binding = 2;
+    banyandb.database.v1.IndexRule index_rule = 3;
+  }
+}
+
+message EntityCreateOrUpdateResponse {
+}
+
+// GeneralEntityRequest can be used to delete, get entity
+// with the given entity_type and its metadata.
+message GeneralEntityRequest {
+  banyandb.common.v1.Metadata metadata = 1;
+  EntityType entity_type = 2;
+}
+
+message EntityDeleteResponse {
+}
+
+// EntityGetResponse is the response for Get method.
+// It contains an entity which is possibly nullable which
+// means the entity cannot be found
+message EntityGetResponse {
+  oneof entity {
+    google.protobuf.NullValue null = 1;
+    banyandb.database.v1.Stream stream = 2;
+    banyandb.database.v1.IndexRuleBinding index_rule_binding = 3;
+    banyandb.database.v1.IndexRule index_rule = 4;
+  }
+}
+
+service EntityRegistry {

Review comment:
       I don't like this style.
   
   It’s important to keep in mind that APIs may change over time, and we don’t want to couple these separate RPC calls tightly together. If we reuse the same message object across RPC calls, some fields are set for one call but ignored for another.
   
   The server also has a harder time, as it needs to ignore certain fields depending on the call.
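
The point about not sharing one message across calls can be sketched in Go. Everything here is hypothetical and only illustrates the idea: `Metadata` stands in for `banyandb.common.v1.Metadata`, and `StreamGetRequest`, `StreamDeleteRequest`, the `Force` field, and `streamRegistry` are invented for the example.

```go
package main

import "fmt"

// Metadata is a hypothetical stand-in for banyandb.common.v1.Metadata.
type Metadata struct {
	Group string
	Name  string
}

// StreamGetRequest carries only what Get needs.
type StreamGetRequest struct {
	Metadata Metadata
}

// StreamDeleteRequest can grow delete-only options without touching Get.
type StreamDeleteRequest struct {
	Metadata Metadata
	// Force is a hypothetical option: treat a missing entity as success.
	Force bool
}

// streamRegistry is a hypothetical in-memory service.
type streamRegistry struct {
	streams map[Metadata]string
}

// Get returns the stored schema and whether it was found.
func (s *streamRegistry) Get(req StreamGetRequest) (string, bool) {
	v, ok := s.streams[req.Metadata]
	return v, ok
}

// Delete reports success if the entity existed, or if Force is set.
func (s *streamRegistry) Delete(req StreamDeleteRequest) bool {
	_, ok := s.streams[req.Metadata]
	if !ok && !req.Force {
		return false
	}
	delete(s.streams, req.Metadata)
	return true
}

func main() {
	md := Metadata{Group: "default", Name: "sw"}
	s := &streamRegistry{streams: map[Metadata]string{md: "schema"}}
	fmt.Println(s.Get(StreamGetRequest{Metadata: md}))
	fmt.Println(s.Delete(StreamDeleteRequest{Metadata: md}))
	fmt.Println(s.Delete(StreamDeleteRequest{Metadata: md}))
	fmt.Println(s.Delete(StreamDeleteRequest{Metadata: md, Force: true}))
}
```

Because each call owns its request type, adding `Force` changes nothing for `Get` callers, and the server never has to decide which fields of a shared message to ignore.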







[GitHub] [skywalking-banyandb] lujiajing1126 commented on pull request #55: Feat: metadata registry with embedded etcd

Posted by GitBox <gi...@apache.org>.
lujiajing1126 commented on pull request #55:
URL: https://github.com/apache/skywalking-banyandb/pull/55#issuecomment-943385120


   > > @hanahmily It seems `protoc` is bundled with `buf`, and the copy we've installed is not actually used by `buf generate`.
   > 
   > After some local verification, I got the same result. How about removing the `protoc` installation step from `base.mk`?
   
   Removed.





[GitHub] [skywalking-banyandb] lujiajing1126 commented on pull request #55: Feat: metadata registry with embedded etcd

Posted by GitBox <gi...@apache.org>.
lujiajing1126 commented on pull request #55:
URL: https://github.com/apache/skywalking-banyandb/pull/55#issuecomment-942936677


   @hanahmily It seems `protoc` is bundled with `buf`, and the copy we've installed is not actually used by `buf generate`.





[GitHub] [skywalking-banyandb] lujiajing1126 commented on a change in pull request #55: Feat: metadata registry with embedded etcd

Posted by GitBox <gi...@apache.org>.
lujiajing1126 commented on a change in pull request #55:
URL: https://github.com/apache/skywalking-banyandb/pull/55#discussion_r730591005



##########
File path: banyand/metadata/schema/etcd.go
##########
@@ -0,0 +1,520 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package schema
+
+import (
+	"context"
+	"fmt"
+	"math/rand"
+	"net/url"
+	"os"
+	"path"
+	"path/filepath"
+
+	googleUUID "github.com/google/uuid"
+	"github.com/pkg/errors"
+	clientv3 "go.etcd.io/etcd/client/v3"
+	"go.etcd.io/etcd/server/v3/embed"
+	"google.golang.org/protobuf/encoding/protojson"
+	"google.golang.org/protobuf/proto"
+
+	commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
+	databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
+)
+
+var (
+	_ Stream           = (*etcdSchemaRegistry)(nil)
+	_ IndexRuleBinding = (*etcdSchemaRegistry)(nil)
+	_ IndexRule        = (*etcdSchemaRegistry)(nil)
+	_ Measure          = (*etcdSchemaRegistry)(nil)
+	_ Group            = (*etcdSchemaRegistry)(nil)
+
+	ErrEntityNotFound             = errors.New("entity is not found")
+	ErrUnexpectedNumberOfEntities = errors.New("unexpected number of entities")
+	ErrGroupAlreadyDefined        = errors.New("group is already defined")
+	ErrGroupNotDefined            = errors.New("group is not defined or has already been deleted")
+
+	StreamKeyPrefix           = "/stream/"
+	IndexRuleBindingKeyPrefix = "/index-rule-binding/"
+	IndexRuleKeyPrefix        = "/index-rule/"
+	MeasureKeyPrefix          = "/measure/"
+	GroupsKeyPrefix           = "/groups/"
+)
+
+type RegistryOption func(*etcdSchemaRegistryConfig)
+
+func PreloadSchema() RegistryOption {
+	return func(config *etcdSchemaRegistryConfig) {
+		config.preload = true
+	}
+}
+
+func UseRandomTempDir() RegistryOption {
+	return func(config *etcdSchemaRegistryConfig) {
+		config.rootDir = RandomTempDir()
+	}
+}
+
+func RandomTempDir() string {
+	return path.Join(os.TempDir(), fmt.Sprintf("banyandb-embed-etcd-%s", googleUUID.New().String()))
+}
+
+func RootDir(rootDir string) RegistryOption {
+	return func(config *etcdSchemaRegistryConfig) {
+		config.rootDir = rootDir
+	}
+}
+
+func UseListener(client, peer string) RegistryOption {
+	return func(config *etcdSchemaRegistryConfig) {
+		config.listenerClientURL = client
+		config.listenerPeerURL = peer
+	}
+}
+
+func UseUnixDomain() RegistryOption {
+	return func(config *etcdSchemaRegistryConfig) {
+		config.listenerClientURL, config.listenerPeerURL = RandomUnixDomainListener()
+	}
+}
+
+func RandomUnixDomainListener() (string, string) {
+	i := rand.Uint64()
+	return fmt.Sprintf("%s://localhost:%d%06d", "unix", os.Getpid(), i), fmt.Sprintf("%s://localhost:%d%06d", "unix", os.Getpid(), i+1)
+}
+
+type etcdSchemaRegistry struct {
+	server *embed.Etcd
+	kv     clientv3.KV
+}
+
+type etcdSchemaRegistryConfig struct {
+	// preload internal schema
+	preload bool
+	// rootDir is the root directory for etcd storage
+	rootDir string
+	// listenerClientURL is the listener for client
+	listenerClientURL string
+	// listenerPeerURL is the listener for peer
+	listenerPeerURL string
+}
+
+func (e *etcdSchemaRegistry) ExistGroup(ctx context.Context, group string) (bool, error) {

Review comment:
       Should `GetGroup` return everything in this group?







[GitHub] [skywalking-banyandb] hanahmily merged pull request #55: Feat: metadata registry with embedded etcd

Posted by GitBox <gi...@apache.org>.
hanahmily merged pull request #55:
URL: https://github.com/apache/skywalking-banyandb/pull/55


   





[GitHub] [skywalking-banyandb] lujiajing1126 commented on a change in pull request #55: Feat: metadata registry with embedded etcd

Posted by GitBox <gi...@apache.org>.
lujiajing1126 commented on a change in pull request #55:
URL: https://github.com/apache/skywalking-banyandb/pull/55#discussion_r730590802



##########
File path: banyand/metadata/schema/etcd.go
##########
@@ -0,0 +1,520 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package schema
+
+import (
+	"context"
+	"fmt"
+	"math/rand"
+	"net/url"
+	"os"
+	"path"
+	"path/filepath"
+
+	googleUUID "github.com/google/uuid"
+	"github.com/pkg/errors"
+	clientv3 "go.etcd.io/etcd/client/v3"
+	"go.etcd.io/etcd/server/v3/embed"
+	"google.golang.org/protobuf/encoding/protojson"
+	"google.golang.org/protobuf/proto"
+
+	commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
+	databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
+)
+
+var (
+	_ Stream           = (*etcdSchemaRegistry)(nil)
+	_ IndexRuleBinding = (*etcdSchemaRegistry)(nil)
+	_ IndexRule        = (*etcdSchemaRegistry)(nil)
+	_ Measure          = (*etcdSchemaRegistry)(nil)
+	_ Group            = (*etcdSchemaRegistry)(nil)
+
+	ErrEntityNotFound             = errors.New("entity is not found")
+	ErrUnexpectedNumberOfEntities = errors.New("unexpected number of entities")
+	ErrGroupAlreadyDefined        = errors.New("group is already defined")
+	ErrGroupNotDefined            = errors.New("group is not defined or has already been deleted")
+
+	StreamKeyPrefix           = "/stream/"
+	IndexRuleBindingKeyPrefix = "/index-rule-binding/"
+	IndexRuleKeyPrefix        = "/index-rule/"
+	MeasureKeyPrefix          = "/measure/"
+	GroupsKeyPrefix           = "/groups/"

Review comment:
       Done







[GitHub] [skywalking-banyandb] lujiajing1126 commented on a change in pull request #55: Feat: metadata registry with embedded etcd

Posted by GitBox <gi...@apache.org>.
lujiajing1126 commented on a change in pull request #55:
URL: https://github.com/apache/skywalking-banyandb/pull/55#discussion_r729012821



##########
File path: api/proto/banyandb/database/v1/rpc.proto
##########
@@ -0,0 +1,77 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+syntax = "proto3";
+
+option java_package = "org.apache.skywalking.banyandb.database.v1";
+option go_package = "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1";
+
+package banyandb.database.v1;
+
+import "banyandb/database/v1/schema.proto";
+import "banyandb/common/v1/common.proto";
+import "google/protobuf/struct.proto";
+
+// EntityType includes three kinds of entities (schema)
+// that can be create/delete/modified at runtime.
+enum EntityType {
+  ENTITY_TYPE_UNSPECIFIED = 0;
+  ENTITY_TYPE_STREAM = 1;
+  ENTITY_TYPE_INDEX_RULE_BINDING = 2;
+  ENTITY_TYPE_INDEX_RULE = 3;
+}
+
+// EntityCreateOrUpdateRequest is the request for creating
+// or updating entity.
+message EntityCreateOrUpdateRequest {
+  oneof entity {
+    banyandb.database.v1.Stream stream = 1;
+    banyandb.database.v1.IndexRuleBinding index_rule_binding = 2;
+    banyandb.database.v1.IndexRule index_rule = 3;
+  }
+}
+
+message EntityCreateOrUpdateResponse {
+}
+
+// GeneralEntityRequest can be used to delete, get entity
+// with the given entity_type and its metadata.
+message GeneralEntityRequest {
+  banyandb.common.v1.Metadata metadata = 1;
+  EntityType entity_type = 2;
+}
+
+message EntityDeleteResponse {
+}
+
+// EntityGetResponse is the response for Get method.
+// It contains an entity which is possibly nullable which
+// means the entity cannot be found
+message EntityGetResponse {
+  oneof entity {
+    google.protobuf.NullValue null = 1;
+    banyandb.database.v1.Stream stream = 2;
+    banyandb.database.v1.IndexRuleBinding index_rule_binding = 3;
+    banyandb.database.v1.IndexRule index_rule = 4;
+  }
+}
+
+service EntityRegistry {
+  rpc CreateOrUpdate(EntityCreateOrUpdateRequest) returns (EntityCreateOrUpdateResponse);
+  rpc Delete(GeneralEntityRequest) returns (EntityDeleteResponse);
+  rpc Get(GeneralEntityRequest) returns (EntityGetResponse);

Review comment:
       `Group` and `List` have been added.







[GitHub] [skywalking-banyandb] hanahmily commented on a change in pull request #55: Feat: metadata registry with embedded etcd

Posted by GitBox <gi...@apache.org>.
hanahmily commented on a change in pull request #55:
URL: https://github.com/apache/skywalking-banyandb/pull/55#discussion_r730617968



##########
File path: banyand/metadata/schema/etcd.go
##########
@@ -0,0 +1,520 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package schema
+
+import (
+	"context"
+	"fmt"
+	"math/rand"
+	"net/url"
+	"os"
+	"path"
+	"path/filepath"
+
+	googleUUID "github.com/google/uuid"
+	"github.com/pkg/errors"
+	clientv3 "go.etcd.io/etcd/client/v3"
+	"go.etcd.io/etcd/server/v3/embed"
+	"google.golang.org/protobuf/encoding/protojson"
+	"google.golang.org/protobuf/proto"
+
+	commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
+	databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
+)
+
+var (
+	_ Stream           = (*etcdSchemaRegistry)(nil)
+	_ IndexRuleBinding = (*etcdSchemaRegistry)(nil)
+	_ IndexRule        = (*etcdSchemaRegistry)(nil)
+	_ Measure          = (*etcdSchemaRegistry)(nil)
+	_ Group            = (*etcdSchemaRegistry)(nil)
+
+	ErrEntityNotFound             = errors.New("entity is not found")
+	ErrUnexpectedNumberOfEntities = errors.New("unexpected number of entities")
+	ErrGroupAlreadyDefined        = errors.New("group is already defined")
+	ErrGroupNotDefined            = errors.New("group is not defined or has already been deleted")
+
+	StreamKeyPrefix           = "/stream/"
+	IndexRuleBindingKeyPrefix = "/index-rule-binding/"
+	IndexRuleKeyPrefix        = "/index-rule/"
+	MeasureKeyPrefix          = "/measure/"
+	GroupsKeyPrefix           = "/groups/"
+)
+
+type RegistryOption func(*etcdSchemaRegistryConfig)
+
+func PreloadSchema() RegistryOption {

Review comment:
       Move this procedure into the `testdata` folder, and load the schemas in the `setup` phase of the test cases.
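
A rough Go sketch of what loading fixtures during test setup might look like, instead of a `PreloadSchema` option on the registry itself. The helper names (`loadFixtures`, `newFixtureDir`, `writeFixture`), the `*.json` file convention, and the temporary directory standing in for a real `testdata` folder are all assumptions made for illustration.

```go
package main

import (
	"fmt"
	"os"
	"path/filepath"
)

// loadFixtures reads every *.json file in dir and returns the raw
// contents keyed by file name. A test's setup phase could feed these
// into the registry through its public API.
func loadFixtures(dir string) (map[string][]byte, error) {
	paths, err := filepath.Glob(filepath.Join(dir, "*.json"))
	if err != nil {
		return nil, err
	}
	fixtures := make(map[string][]byte, len(paths))
	for _, p := range paths {
		data, err := os.ReadFile(p)
		if err != nil {
			return nil, err
		}
		fixtures[filepath.Base(p)] = data
	}
	return fixtures, nil
}

// newFixtureDir creates a temporary directory that stands in for a
// testdata folder and returns a cleanup function that removes it.
func newFixtureDir() (string, func(), error) {
	dir, err := os.MkdirTemp("", "testdata-")
	if err != nil {
		return "", nil, err
	}
	return dir, func() { _ = os.RemoveAll(dir) }, nil
}

// writeFixture writes one fixture file into dir.
func writeFixture(dir, name, content string) error {
	return os.WriteFile(filepath.Join(dir, name), []byte(content), 0o600)
}

func main() {
	dir, cleanup, err := newFixtureDir()
	if err != nil {
		panic(err)
	}
	defer cleanup()
	if err := writeFixture(dir, "stream.json", `{"name":"sw"}`); err != nil {
		panic(err)
	}
	fixtures, err := loadFixtures(dir)
	if err != nil {
		panic(err)
	}
	fmt.Println(len(fixtures), string(fixtures["stream.json"]))
}
```

Keeping the fixtures outside the registry means production code carries no test-only option, and each test case chooses which schemas it needs.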







[GitHub] [skywalking-banyandb] lujiajing1126 commented on a change in pull request #55: Feat: metadata registry with embedded etcd

Posted by GitBox <gi...@apache.org>.
lujiajing1126 commented on a change in pull request #55:
URL: https://github.com/apache/skywalking-banyandb/pull/55#discussion_r730593875



##########
File path: banyand/metadata/schema/etcd.go
##########
@@ -0,0 +1,520 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package schema
+
+import (
+	"context"
+	"fmt"
+	"math/rand"
+	"net/url"
+	"os"
+	"path"
+	"path/filepath"
+
+	googleUUID "github.com/google/uuid"
+	"github.com/pkg/errors"
+	clientv3 "go.etcd.io/etcd/client/v3"
+	"go.etcd.io/etcd/server/v3/embed"
+	"google.golang.org/protobuf/encoding/protojson"
+	"google.golang.org/protobuf/proto"
+
+	commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
+	databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
+)
+
+var (
+	_ Stream           = (*etcdSchemaRegistry)(nil)
+	_ IndexRuleBinding = (*etcdSchemaRegistry)(nil)
+	_ IndexRule        = (*etcdSchemaRegistry)(nil)
+	_ Measure          = (*etcdSchemaRegistry)(nil)
+	_ Group            = (*etcdSchemaRegistry)(nil)
+
+	ErrEntityNotFound             = errors.New("entity is not found")
+	ErrUnexpectedNumberOfEntities = errors.New("unexpected number of entities")
+	ErrGroupAlreadyDefined        = errors.New("group is already defined")
+	ErrGroupNotDefined            = errors.New("group is not defined or has already been deleted")
+
+	StreamKeyPrefix           = "/stream/"
+	IndexRuleBindingKeyPrefix = "/index-rule-binding/"
+	IndexRuleKeyPrefix        = "/index-rule/"
+	MeasureKeyPrefix          = "/measure/"
+	GroupsKeyPrefix           = "/groups/"
+)
+
+type RegistryOption func(*etcdSchemaRegistryConfig)
+
+func PreloadSchema() RegistryOption {
+	return func(config *etcdSchemaRegistryConfig) {
+		config.preload = true
+	}
+}
+
+func UseRandomTempDir() RegistryOption {
+	return func(config *etcdSchemaRegistryConfig) {
+		config.rootDir = RandomTempDir()
+	}
+}
+
+func RandomTempDir() string {
+	return path.Join(os.TempDir(), fmt.Sprintf("banyandb-embed-etcd-%s", googleUUID.New().String()))
+}
+
+func RootDir(rootDir string) RegistryOption {
+	return func(config *etcdSchemaRegistryConfig) {
+		config.rootDir = rootDir
+	}
+}
+
+func UseListener(client, peer string) RegistryOption {
+	return func(config *etcdSchemaRegistryConfig) {
+		config.listenerClientURL = client
+		config.listenerPeerURL = peer
+	}
+}
+
+func UseUnixDomain() RegistryOption {
+	return func(config *etcdSchemaRegistryConfig) {
+		config.listenerClientURL, config.listenerPeerURL = RandomUnixDomainListener()
+	}
+}
+
+func RandomUnixDomainListener() (string, string) {
+	i := rand.Uint64()
+	return fmt.Sprintf("%s://localhost:%d%06d", "unix", os.Getpid(), i), fmt.Sprintf("%s://localhost:%d%06d", "unix", os.Getpid(), i+1)
+}
+
+type etcdSchemaRegistry struct {
+	server *embed.Etcd
+	kv     clientv3.KV
+}
+
+type etcdSchemaRegistryConfig struct {
+	// preload internal schema
+	preload bool
+	// rootDir is the root directory for etcd storage
+	rootDir string
+	// listenerClientURL is the listener for client
+	listenerClientURL string
+	// listenerPeerURL is the listener for peer
+	listenerPeerURL string
+}
+
+func (e *etcdSchemaRegistry) ExistGroup(ctx context.Context, group string) (bool, error) {
+	var entity commonv1.Group
+	err := e.get(ctx, formatGroupKey(group), &entity)
+	if err != nil && !errors.Is(err, ErrEntityNotFound) {
+		return false, err
+	}
+	return entity.GetName() != "" && !entity.Deleted, nil
+}
+
+func (e *etcdSchemaRegistry) ListGroup(ctx context.Context) ([]string, error) {
+	messages, err := e.listWithPrefix(ctx, GroupsKeyPrefix, func() proto.Message {
+		return &commonv1.Group{}
+	})
+	if err != nil {
+		return nil, err
+	}
+	groups := make([]string, len(messages))
+	for i, message := range messages {
+		groups[i] = message.(*commonv1.Group).GetName()
+	}
+	return groups, nil
+}
+
+func (e *etcdSchemaRegistry) DeleteGroup(ctx context.Context, group string) (bool, error) {
+	exist, err := e.ExistGroup(ctx, group)
+	if err != nil {
+		return false, err
+	}
+	if !exist {
+		return false, errors.Wrap(ErrGroupNotDefined, group)
+	}
+	if err := e.update(ctx, formatGroupKey(group), &commonv1.Group{
+		Name:    group,
+		Deleted: true,
+	}); err != nil {
+		return false, err
+	}
+	return true, nil
+}
+
+func (e *etcdSchemaRegistry) CreateGroup(ctx context.Context, group string) error {
+	exist, err := e.ExistGroup(ctx, group)
+	if err != nil {
+		return err
+	}
+	if exist {
+		return errors.Wrap(ErrGroupAlreadyDefined, group)
+	}
+	return e.update(ctx, formatGroupKey(group), &commonv1.Group{
+		Name:    group,
+		Deleted: false,
+	})
+}
+
+func (e *etcdSchemaRegistry) GetMeasure(ctx context.Context, metadata *commonv1.Metadata) (*databasev1.Measure, error) {
+	var entity databasev1.Measure
+	if err := e.get(ctx, formatMeasureKey(metadata), &entity); err != nil {
+		return nil, err
+	}
+	return &entity, nil
+}
+
+func (e *etcdSchemaRegistry) ListMeasure(ctx context.Context, opt ListOpt) ([]*databasev1.Measure, error) {
+	keyPrefix := MeasureKeyPrefix
+	if opt.Group != "" {
+		keyPrefix += opt.Group + "/"
+	}
+	messages, err := e.listWithPrefix(ctx, keyPrefix, func() proto.Message {
+		return &databasev1.Measure{}
+	})
+	if err != nil {
+		return nil, err
+	}
+	entities := make([]*databasev1.Measure, len(messages))
+	for i, message := range messages {
+		entities[i] = message.(*databasev1.Measure)
+	}
+	return entities, nil
+}
+
+func (e *etcdSchemaRegistry) UpdateMeasure(ctx context.Context, measure *databasev1.Measure) error {
+	groupExist, err := e.ExistGroup(ctx, measure.GetMetadata().GetGroup())
+	if err != nil {
+		return err
+	}
+	if !groupExist {
+		return errors.Wrap(ErrGroupNotDefined, measure.GetMetadata().GetGroup())
+	}
+	return e.update(ctx, formatMeasureKey(measure.GetMetadata()), measure)
+}
+
+func (e *etcdSchemaRegistry) DeleteMeasure(ctx context.Context, metadata *commonv1.Metadata) (bool, error) {
+	return e.delete(ctx, formatMeasureKey(metadata))
+}
+
+func (e *etcdSchemaRegistry) GetStream(ctx context.Context, metadata *commonv1.Metadata) (*databasev1.Stream, error) {
+	var entity databasev1.Stream
+	if err := e.get(ctx, formatSteamKey(metadata), &entity); err != nil {
+		return nil, err
+	}
+	return &entity, nil
+}
+
+func (e *etcdSchemaRegistry) ListStream(ctx context.Context, opt ListOpt) ([]*databasev1.Stream, error) {
+	keyPrefix := StreamKeyPrefix
+	if opt.Group != "" {
+		keyPrefix += opt.Group + "/"
+	}
+	messages, err := e.listWithPrefix(ctx, keyPrefix, func() proto.Message {
+		return &databasev1.Stream{}
+	})
+	if err != nil {
+		return nil, err
+	}
+	entities := make([]*databasev1.Stream, len(messages))
+	for i, message := range messages {
+		entities[i] = message.(*databasev1.Stream)
+	}
+	return entities, nil
+}
+
+func (e *etcdSchemaRegistry) UpdateStream(ctx context.Context, stream *databasev1.Stream) error {
+	groupExist, err := e.ExistGroup(ctx, stream.GetMetadata().GetGroup())
+	if err != nil {
+		return err
+	}
+	if !groupExist {
+		return errors.Wrap(ErrGroupNotDefined, stream.GetMetadata().GetGroup())
+	}
+	return e.update(ctx, formatSteamKey(stream.GetMetadata()), stream)
+}
+
+func (e *etcdSchemaRegistry) DeleteStream(ctx context.Context, metadata *commonv1.Metadata) (bool, error) {
+	return e.delete(ctx, formatSteamKey(metadata))
+}
+
+func (e *etcdSchemaRegistry) GetIndexRuleBinding(ctx context.Context, metadata *commonv1.Metadata) (*databasev1.IndexRuleBinding, error) {
+	var indexRuleBinding databasev1.IndexRuleBinding
+	if err := e.get(ctx, formatIndexRuleBindingKey(metadata), &indexRuleBinding); err != nil {
+		return nil, err
+	}
+	return &indexRuleBinding, nil
+}
+
+func (e *etcdSchemaRegistry) ListIndexRuleBinding(ctx context.Context, opt ListOpt) ([]*databasev1.IndexRuleBinding, error) {
+	keyPrefix := IndexRuleBindingKeyPrefix
+	if opt.Group != "" {
+		keyPrefix += opt.Group + "/"
+	}
+	messages, err := e.listWithPrefix(ctx, keyPrefix, func() proto.Message {
+		return &databasev1.IndexRuleBinding{}
+	})
+	if err != nil {
+		return nil, err
+	}
+	entities := make([]*databasev1.IndexRuleBinding, len(messages))
+	for i, message := range messages {
+		entities[i] = message.(*databasev1.IndexRuleBinding)
+	}
+	return entities, nil
+}
+
+func (e *etcdSchemaRegistry) UpdateIndexRuleBinding(ctx context.Context, indexRuleBinding *databasev1.IndexRuleBinding) error {
+	groupExist, err := e.ExistGroup(ctx, indexRuleBinding.GetMetadata().GetGroup())
+	if err != nil {
+		return err
+	}
+	if !groupExist {
+		return errors.Wrap(ErrGroupNotDefined, indexRuleBinding.GetMetadata().GetGroup())
+	}
+	return e.update(ctx, formatIndexRuleBindingKey(indexRuleBinding.GetMetadata()), indexRuleBinding)
+}
+
+func (e *etcdSchemaRegistry) DeleteIndexRuleBinding(ctx context.Context, metadata *commonv1.Metadata) (bool, error) {
+	return e.delete(ctx, formatIndexRuleBindingKey(metadata))
+}
+
+func (e *etcdSchemaRegistry) GetIndexRule(ctx context.Context, metadata *commonv1.Metadata) (*databasev1.IndexRule, error) {
+	var entity databasev1.IndexRule
+	if err := e.get(ctx, formatIndexRuleKey(metadata), &entity); err != nil {
+		return nil, err
+	}
+	return &entity, nil
+}
+
+func (e *etcdSchemaRegistry) ListIndexRule(ctx context.Context, opt ListOpt) ([]*databasev1.IndexRule, error) {
+	keyPrefix := IndexRuleKeyPrefix
+	if opt.Group != "" {
+		keyPrefix += opt.Group + "/"
+	}
+	messages, err := e.listWithPrefix(ctx, keyPrefix, func() proto.Message {
+		return &databasev1.IndexRule{}
+	})
+	if err != nil {
+		return nil, err
+	}
+	entities := make([]*databasev1.IndexRule, len(messages))
+	for i, message := range messages {
+		entities[i] = message.(*databasev1.IndexRule)
+	}
+	return entities, nil
+}
+
+func (e *etcdSchemaRegistry) UpdateIndexRule(ctx context.Context, indexRule *databasev1.IndexRule) error {
+	groupExist, err := e.ExistGroup(ctx, indexRule.GetMetadata().GetGroup())
+	if err != nil {
+		return err
+	}
+	if !groupExist {
+		return errors.Wrap(ErrGroupNotDefined, indexRule.GetMetadata().GetGroup())
+	}
+	return e.update(ctx, formatIndexRuleKey(indexRule.GetMetadata()), indexRule)
+}
+
+func (e *etcdSchemaRegistry) DeleteIndexRule(ctx context.Context, metadata *commonv1.Metadata) (bool, error) {
+	return e.delete(ctx, formatIndexRuleKey(metadata))
+}
+
+func (e *etcdSchemaRegistry) preload() error {
+	if err := e.CreateGroup(context.TODO(), "default"); err != nil {
+		return err
+	}
+
+	s := &databasev1.Stream{}
+	if err := protojson.Unmarshal([]byte(streamJSON), s); err != nil {
+		return err
+	}
+	err := e.UpdateStream(context.Background(), s)
+	if err != nil {
+		return err
+	}
+
+	indexRuleBinding := &databasev1.IndexRuleBinding{}
+	if err = protojson.Unmarshal([]byte(indexRuleBindingJSON), indexRuleBinding); err != nil {
+		return err
+	}
+	err = e.UpdateIndexRuleBinding(context.Background(), indexRuleBinding)
+	if err != nil {
+		return err
+	}
+
+	entries, err := indexRuleStore.ReadDir(indexRuleDir)
+	if err != nil {
+		return err
+	}
+	for _, entry := range entries {
+		data, err := indexRuleStore.ReadFile(indexRuleDir + "/" + entry.Name())
+		if err != nil {
+			return err
+		}
+		var idxRule databasev1.IndexRule
+		err = protojson.Unmarshal(data, &idxRule)
+		if err != nil {
+			return err
+		}
+		err = e.UpdateIndexRule(context.Background(), &idxRule)
+		if err != nil {
+			return err
+		}
+	}
+
+	return nil
+}
+
+func (e *etcdSchemaRegistry) Close() error {
+	e.server.Close()
+	return nil
+}
+
+func NewEtcdSchemaRegistry(options ...RegistryOption) (Registry, error) {
+	registryConfig := &etcdSchemaRegistryConfig{
+		rootDir:           os.TempDir(),
+		listenerClientURL: embed.DefaultListenClientURLs,
+		listenerPeerURL:   embed.DefaultListenPeerURLs,
+	}
+	for _, opt := range options {
+		opt(registryConfig)
+	}
+	// TODO: allow use cluster setting
+	embedConfig := newStandaloneEtcdConfig(registryConfig)
+	e, err := embed.StartEtcd(embedConfig)
+	if err != nil {
+		return nil, err
+	}
+	if e != nil {
+		<-e.Server.ReadyNotify() // wait for e.Server to join the cluster
+	}
+	client, err := clientv3.NewFromURL(e.Config().ACUrls[0].String())
+	if err != nil {
+		return nil, err
+	}
+	kvClient := clientv3.NewKV(client)
+	reg := &etcdSchemaRegistry{
+		server: e,
+		kv:     kvClient,
+	}
+	if registryConfig.preload {
+		err := reg.preload()
+		if err != nil {
+			return nil, err
+		}
+	}
+	return reg, nil
+}
+
+func (e *etcdSchemaRegistry) get(ctx context.Context, key string, message proto.Message) error {
+	resp, err := e.kv.Get(ctx, key)
+	if err != nil {
+		return err
+	}
+	if resp.Count == 0 {
+		return ErrEntityNotFound
+	}
+	if resp.Count > 1 {
+		return ErrUnexpectedNumberOfEntities
+	}
+	if err := proto.Unmarshal(resp.Kvs[0].Value, message); err != nil {
+		return err
+	}
+	return nil
+}
+
+func (e *etcdSchemaRegistry) update(ctx context.Context, key string, message proto.Message) error {
+	val, err := proto.Marshal(message)
+	if err != nil {
+		return err
+	}
+	_, err = e.kv.Put(ctx, key, string(val))
+	if err != nil {
+		return err
+	}
+	return nil
+}
+
+func (e *etcdSchemaRegistry) listWithPrefix(ctx context.Context, prefix string, factory func() proto.Message) ([]proto.Message, error) {
+	resp, err := e.kv.Get(ctx, prefix, clientv3.WithFromKey(), clientv3.WithRange(incrementLastByte(prefix)))
+	if err != nil {
+		return nil, err
+	}
+	if resp.Count == 0 {
+		return nil, ErrEntityNotFound
+	}
+	entities := make([]proto.Message, resp.Count)
+	for i := int64(0); i < resp.Count; i++ {
+		message := factory()
+		if err := proto.Unmarshal(resp.Kvs[i].Value, message); err != nil {
+			return nil, err
+		}
+		entities[i] = message
+	}
+	return entities, nil
+}
+
+func (e *etcdSchemaRegistry) delete(ctx context.Context, key string) (bool, error) {
+	resp, err := e.kv.Delete(ctx, key)
+	if err != nil {
+		return false, err
+	}
+	return resp.Deleted > 0, nil
+}
+
+func formatIndexRuleKey(metadata *commonv1.Metadata) string {
+	return formatKey(IndexRuleKeyPrefix, metadata)
+}
+
+func formatIndexRuleBindingKey(metadata *commonv1.Metadata) string {
+	return formatKey(IndexRuleBindingKeyPrefix, metadata)
+}
+
+func formatSteamKey(metadata *commonv1.Metadata) string {
+	return formatKey(StreamKeyPrefix, metadata)
+}
+
+func formatMeasureKey(metadata *commonv1.Metadata) string {
+	return formatKey(MeasureKeyPrefix, metadata)
+}
+
+func formatKey(prefix string, metadata *commonv1.Metadata) string {
+	return prefix + metadata.GetGroup() + "/" + metadata.GetName()
+}
+
+func formatGroupKey(group string) string {
+	return GroupsKeyPrefix + group
+}
+
+func incrementLastByte(key string) string {
+	bb := []byte(key)
+	bb[len(bb)-1]++
+	return string(bb)
+}
+
+func newStandaloneEtcdConfig(config *etcdSchemaRegistryConfig) *embed.Config {
+	cfg := embed.NewConfig()
+	// TODO: allow user to set path
+	cfg.Dir = filepath.Join(config.rootDir, "embed-etcd")

Review comment:
       Fixed
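
A side note on the prefix scan in `listWithPrefix` in the hunk above: `incrementLastByte` builds the exclusive range end by bumping the last byte of the prefix, which would panic on an empty prefix and wrap around to 0x00 on a trailing 0xff byte. Below is a minimal sketch of a more defensive variant. It is not the code in this PR, and `prefixRangeEnd` is a hypothetical name. The etcd client also provides `clientv3.WithPrefix()`, which may make a hand-written helper unnecessary.

```go
package main

import "fmt"

// prefixRangeEnd (hypothetical helper) returns the smallest key that sorts
// after every key starting with prefix. ok is false when no such key exists
// (empty prefix, or every byte is 0xff), meaning the scan has no upper bound.
func prefixRangeEnd(prefix string) (end string, ok bool) {
	bb := []byte(prefix)
	for i := len(bb) - 1; i >= 0; i-- {
		if bb[i] < 0xff {
			// Bump the last byte that can be incremented and drop the rest.
			bb[i]++
			return string(bb[:i+1]), true
		}
	}
	return "", false
}

func main() {
	end, ok := prefixRangeEnd("/stream/")
	fmt.Printf("%q %v\n", end, ok)
}
```

With this shape the caller decides what an unbounded scan means instead of relying on byte overflow.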







[GitHub] [skywalking-banyandb] lujiajing1126 commented on a change in pull request #55: Feat: metadata registry with embedded etcd

Posted by GitBox <gi...@apache.org>.
lujiajing1126 commented on a change in pull request #55:
URL: https://github.com/apache/skywalking-banyandb/pull/55#discussion_r730591070



##########
File path: pkg/query/logical/analyzer.go
##########
@@ -77,25 +78,49 @@ func (t *Tag) GetFamilyName() string {
 }
 
 type Analyzer struct {
-	indexRuleRepo        metadataSchema.IndexRule
-	indexRuleBindingRepo metadataSchema.IndexRuleBinding
-	metadataRepoImpl     metadata.Repo
+	metadataRepoImpl metadata.Repo
 }
 
-func DefaultAnalyzer() *Analyzer {
-	indexRule, _ := metadataSchema.NewIndexRule()
-	indexRuleBinding, _ := metadataSchema.NewIndexRuleBinding()
-	metadataService, _ := metadata.NewService(context.TODO())
+// DefaultAnalyzer creates a default analyzer for testing.
+// You have to close the underlying metadata after test
+func DefaultAnalyzer() (*Analyzer, func(), error) {

Review comment:
       Moved
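
For readers unfamiliar with the `(*Analyzer, func(), error)` shape in the hunk above, here is a rough sketch of how such a constructor is typically consumed in a test. The `repo`, `analyzer` and `newTestAnalyzer` names are hypothetical stand-ins and do not come from this PR. The sketch assumes the returned func releases the underlying metadata service.

```go
package main

import (
	"errors"
	"fmt"
)

// repo is a hypothetical stand-in for the metadata repository.
type repo struct{ closed bool }

func (r *repo) Close() error {
	r.closed = true
	return nil
}

// analyzer is a hypothetical stand-in for the Analyzer type.
type analyzer struct{ metadataRepo *repo }

// newTestAnalyzer returns the analyzer, a cleanup func, and an error.
// The cleanup func is never nil, so callers can always defer it.
func newTestAnalyzer(fail bool) (*analyzer, func(), error) {
	if fail {
		return nil, func() {}, errors.New("cannot start metadata service")
	}
	r := &repo{}
	return &analyzer{metadataRepo: r}, func() { _ = r.Close() }, nil
}

func main() {
	a, cleanup, err := newTestAnalyzer(false)
	if err != nil {
		fmt.Println("setup failed:", err)
		return
	}
	defer cleanup()
	fmt.Println("analyzer ready:", a.metadataRepo != nil)
}
```

Returning a no-op cleanup on failure is a design choice in this sketch. It keeps `defer cleanup()` safe even when the caller forgets to check the error first.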







[GitHub] [skywalking-banyandb] lujiajing1126 commented on a change in pull request #55: Feat: metadata registry with embedded etcd

Posted by GitBox <gi...@apache.org>.
lujiajing1126 commented on a change in pull request #55:
URL: https://github.com/apache/skywalking-banyandb/pull/55#discussion_r730593657



##########
File path: banyand/metadata/schema/etcd.go
##########
@@ -0,0 +1,520 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package schema
+
+import (
+	"context"
+	"fmt"
+	"math/rand"
+	"net/url"
+	"os"
+	"path"
+	"path/filepath"
+
+	googleUUID "github.com/google/uuid"
+	"github.com/pkg/errors"
+	clientv3 "go.etcd.io/etcd/client/v3"
+	"go.etcd.io/etcd/server/v3/embed"
+	"google.golang.org/protobuf/encoding/protojson"
+	"google.golang.org/protobuf/proto"
+
+	commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
+	databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
+)
+
+var (
+	_ Stream           = (*etcdSchemaRegistry)(nil)
+	_ IndexRuleBinding = (*etcdSchemaRegistry)(nil)
+	_ IndexRule        = (*etcdSchemaRegistry)(nil)
+	_ Measure          = (*etcdSchemaRegistry)(nil)
+	_ Group            = (*etcdSchemaRegistry)(nil)
+
+	ErrEntityNotFound             = errors.New("entity is not found")
+	ErrUnexpectedNumberOfEntities = errors.New("unexpected number of entities")
+	ErrGroupAlreadyDefined        = errors.New("group is already defined")
+	ErrGroupNotDefined            = errors.New("group is not defined or has already been deleted")
+
+	StreamKeyPrefix           = "/stream/"
+	IndexRuleBindingKeyPrefix = "/index-rule-binding/"
+	IndexRuleKeyPrefix        = "/index-rule/"
+	MeasureKeyPrefix          = "/measure/"
+	GroupsKeyPrefix           = "/groups/"
+)
+
+type RegistryOption func(*etcdSchemaRegistryConfig)
+
+func PreloadSchema() RegistryOption {
+	return func(config *etcdSchemaRegistryConfig) {
+		config.preload = true
+	}
+}
+
+func UseRandomTempDir() RegistryOption {
+	return func(config *etcdSchemaRegistryConfig) {
+		config.rootDir = RandomTempDir()
+	}
+}
+
+func RandomTempDir() string {
+	return path.Join(os.TempDir(), fmt.Sprintf("banyandb-embed-etcd-%s", googleUUID.New().String()))
+}

Review comment:
       Done

##########
File path: banyand/metadata/schema/etcd.go
##########
@@ -0,0 +1,520 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package schema
+
+import (
+	"context"
+	"fmt"
+	"math/rand"
+	"net/url"
+	"os"
+	"path"
+	"path/filepath"
+
+	googleUUID "github.com/google/uuid"
+	"github.com/pkg/errors"
+	clientv3 "go.etcd.io/etcd/client/v3"
+	"go.etcd.io/etcd/server/v3/embed"
+	"google.golang.org/protobuf/encoding/protojson"
+	"google.golang.org/protobuf/proto"
+
+	commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
+	databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
+)
+
+var (
+	_ Stream           = (*etcdSchemaRegistry)(nil)
+	_ IndexRuleBinding = (*etcdSchemaRegistry)(nil)
+	_ IndexRule        = (*etcdSchemaRegistry)(nil)
+	_ Measure          = (*etcdSchemaRegistry)(nil)
+	_ Group            = (*etcdSchemaRegistry)(nil)
+
+	ErrEntityNotFound             = errors.New("entity is not found")
+	ErrUnexpectedNumberOfEntities = errors.New("unexpected number of entities")
+	ErrGroupAlreadyDefined        = errors.New("group is already defined")
+	ErrGroupNotDefined            = errors.New("group is not defined or has already been deleted")
+
+	StreamKeyPrefix           = "/stream/"
+	IndexRuleBindingKeyPrefix = "/index-rule-binding/"
+	IndexRuleKeyPrefix        = "/index-rule/"
+	MeasureKeyPrefix          = "/measure/"
+	GroupsKeyPrefix           = "/groups/"
+)
+
+type RegistryOption func(*etcdSchemaRegistryConfig)
+
+func PreloadSchema() RegistryOption {
+	return func(config *etcdSchemaRegistryConfig) {
+		config.preload = true
+	}
+}
+
+func UseRandomTempDir() RegistryOption {
+	return func(config *etcdSchemaRegistryConfig) {
+		config.rootDir = RandomTempDir()
+	}
+}
+
+func RandomTempDir() string {
+	return path.Join(os.TempDir(), fmt.Sprintf("banyandb-embed-etcd-%s", googleUUID.New().String()))
+}
+
+func RootDir(rootDir string) RegistryOption {
+	return func(config *etcdSchemaRegistryConfig) {
+		config.rootDir = rootDir
+	}
+}
+
+func UseListener(client, peer string) RegistryOption {
+	return func(config *etcdSchemaRegistryConfig) {
+		config.listenerClientURL = client
+		config.listenerPeerURL = peer
+	}
+}
+
+func UseUnixDomain() RegistryOption {
+	return func(config *etcdSchemaRegistryConfig) {
+		config.listenerClientURL, config.listenerPeerURL = RandomUnixDomainListener()
+	}
+}
+
+func RandomUnixDomainListener() (string, string) {
+	i := rand.Uint64()
+	return fmt.Sprintf("%s://localhost:%d%06d", "unix", os.Getpid(), i), fmt.Sprintf("%s://localhost:%d%06d", "unix", os.Getpid(), i+1)

Review comment:
       Done
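
One detail in `RandomUnixDomainListener` above: `%06d` only pads to six digits and does not cap the width, so a `uint64` suffix can run to twenty digits. The sketch below shows one way to keep the suffix bounded while still producing two distinct URLs. `unixListenerPair` and its parameters are hypothetical and only illustrate the idea.

```go
package main

import (
	"fmt"
	"math/rand"
	"os"
)

// unixListenerPair (hypothetical helper) builds two distinct unix-scheme
// listener URLs from a process id and a seed. The suffix is reduced modulo
// 1,000,000 so that it always fits the six-digit width.
func unixListenerPair(pid int, seed uint32) (client, peer string) {
	const width = 1000000
	client = fmt.Sprintf("unix://localhost:%d%06d", pid, seed%width)
	peer = fmt.Sprintf("unix://localhost:%d%06d", pid, (seed+1)%width)
	return client, peer
}

func main() {
	client, peer := unixListenerPair(os.Getpid(), rand.Uint32())
	fmt.Println(client)
	fmt.Println(peer)
}
```

Taking the pid and seed as parameters keeps the function deterministic, which makes it straightforward to unit test.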







[GitHub] [skywalking-banyandb] lujiajing1126 commented on a change in pull request #55: Feat: metadata registry with embedded etcd

Posted by GitBox <gi...@apache.org>.
lujiajing1126 commented on a change in pull request #55:
URL: https://github.com/apache/skywalking-banyandb/pull/55#discussion_r730711665



##########
File path: banyand/metadata/metadata.go
##########
@@ -37,50 +39,99 @@ type IndexFilter interface {
 
 type Repo interface {
 	IndexFilter
-	Stream() schema.Stream
+	StreamRegistry() schema.Stream
+	IndexRuleRegistry() schema.IndexRule
+	IndexRuleBindingRegistry() schema.IndexRuleBinding
+	MeasureRegistry() schema.Measure
+	GroupRegistry() schema.Group
 }
 
 type Service interface {
 	Repo
-	run.Unit
+	run.PreRunner
+	run.Service
+	run.Config
 }
 
 type service struct {
-	stream           schema.Stream
-	indexRule        schema.IndexRule
-	indexRuleBinding schema.IndexRuleBinding
+	schemaRegistry    schema.Registry
+	stopCh            chan struct{}
+	clientListenerURL string
+	peerListenerURL   string
+	rootDir           string
 }
 
-func NewService(_ context.Context) (Service, error) {
-	stream, err := schema.NewStream()
-	if err != nil {
-		return nil, err
+func (s *service) FlagSet() *run.FlagSet {
+	fs := run.NewFlagSet("metadata")
+	fs.StringVarP(&s.clientListenerURL, "listener-client-url", "", embed.DefaultListenClientURLs,
+		"listener for client")
+	fs.StringVarP(&s.peerListenerURL, "listener-peer-url", "", embed.DefaultListenPeerURLs,
+		"listener for peer")
+	fs.StringVarP(&s.rootDir, "etcd-root-path", "", "/tmp", "the root path of database")
+	return fs
+}
+
+func (s *service) Validate() error {
+	if s.clientListenerURL == "" || s.peerListenerURL == "" {
+		return errors.New("listener cannot be set to empty")
 	}
-	indexRule, err := schema.NewIndexRule()
-	if err != nil {
-		return nil, err
+	if s.rootDir == "" {
+		return errors.New("rootDir is empty")
 	}
-	indexRuleBinding, err := schema.NewIndexRuleBinding()
-	if err != nil {
-		return nil, err
+	return nil
+}
+
+func (s *service) PreRun() error {
+	var err error
+	s.schemaRegistry, err = schema.NewEtcdSchemaRegistry(schema.PreloadSchema(),
+		schema.UseListener(s.clientListenerURL, s.peerListenerURL),
+		schema.RootDir(s.rootDir))
+	return err
+}
+
+func (s *service) Serve() error {
+	s.stopCh = make(chan struct{})

Review comment:
       Done
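
The hunk above is cut off right where `Serve` creates `stopCh`, so the rest of the lifecycle is not visible here. Assuming `Serve` is meant to block until the service is stopped, a minimal sketch of that pattern could look like the following. The `service` type, `newService` and `GracefulStop` body are illustrative and not taken from this PR.

```go
package main

import (
	"fmt"
	"sync"
)

// service is a hypothetical stand-in for the metadata service.
type service struct {
	stopCh   chan struct{}
	stopOnce sync.Once
}

// newService creates the stop channel up front, which avoids a race if
// GracefulStop happens to run before Serve.
func newService() *service {
	return &service{stopCh: make(chan struct{})}
}

// Serve blocks until GracefulStop is called.
func (s *service) Serve() error {
	<-s.stopCh
	return nil
}

// GracefulStop unblocks Serve; sync.Once makes repeated calls safe.
func (s *service) GracefulStop() {
	s.stopOnce.Do(func() { close(s.stopCh) })
}

func main() {
	s := newService()
	done := make(chan error, 1)
	go func() { done <- s.Serve() }()
	s.GracefulStop()
	s.GracefulStop() // the second call is a no-op
	fmt.Println("serve returned:", <-done)
}
```

In a real service the stop path would presumably also close the schema registry. That part is left out because the hunk does not show it.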







[GitHub] [skywalking-banyandb] lujiajing1126 commented on a change in pull request #55: Feat: metadata registry with embedded etcd

Posted by GitBox <gi...@apache.org>.
lujiajing1126 commented on a change in pull request #55:
URL: https://github.com/apache/skywalking-banyandb/pull/55#discussion_r730711889



##########
File path: banyand/metadata/schema/etcd.go
##########
@@ -0,0 +1,520 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package schema
+
+import (
+	"context"
+	"fmt"
+	"math/rand"
+	"net/url"
+	"os"
+	"path"
+	"path/filepath"
+
+	googleUUID "github.com/google/uuid"
+	"github.com/pkg/errors"
+	clientv3 "go.etcd.io/etcd/client/v3"
+	"go.etcd.io/etcd/server/v3/embed"
+	"google.golang.org/protobuf/encoding/protojson"
+	"google.golang.org/protobuf/proto"
+
+	commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
+	databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
+)
+
+var (
+	_ Stream           = (*etcdSchemaRegistry)(nil)
+	_ IndexRuleBinding = (*etcdSchemaRegistry)(nil)
+	_ IndexRule        = (*etcdSchemaRegistry)(nil)
+	_ Measure          = (*etcdSchemaRegistry)(nil)
+	_ Group            = (*etcdSchemaRegistry)(nil)
+
+	ErrEntityNotFound             = errors.New("entity is not found")
+	ErrUnexpectedNumberOfEntities = errors.New("unexpected number of entities")
+	ErrGroupAlreadyDefined        = errors.New("group is already defined")
+	ErrGroupNotDefined            = errors.New("group is not defined or has already been deleted")
+
+	StreamKeyPrefix           = "/stream/"
+	IndexRuleBindingKeyPrefix = "/index-rule-binding/"
+	IndexRuleKeyPrefix        = "/index-rule/"
+	MeasureKeyPrefix          = "/measure/"
+	GroupsKeyPrefix           = "/groups/"
+)
+
+type RegistryOption func(*etcdSchemaRegistryConfig)
+
+func PreloadSchema() RegistryOption {
+	return func(config *etcdSchemaRegistryConfig) {
+		config.preload = true
+	}
+}
+
+func UseRandomTempDir() RegistryOption {
+	return func(config *etcdSchemaRegistryConfig) {
+		config.rootDir = RandomTempDir()
+	}
+}
+
+func RandomTempDir() string {
+	return path.Join(os.TempDir(), fmt.Sprintf("banyandb-embed-etcd-%s", googleUUID.New().String()))
+}
+
+func RootDir(rootDir string) RegistryOption {
+	return func(config *etcdSchemaRegistryConfig) {
+		config.rootDir = rootDir
+	}
+}
+
+func UseListener(client, peer string) RegistryOption {
+	return func(config *etcdSchemaRegistryConfig) {
+		config.listenerClientURL = client
+		config.listenerPeerURL = peer
+	}
+}
+
+func UseUnixDomain() RegistryOption {
+	return func(config *etcdSchemaRegistryConfig) {
+		config.listenerClientURL, config.listenerPeerURL = RandomUnixDomainListener()
+	}
+}
+
+func RandomUnixDomainListener() (string, string) {
+	i := rand.Uint64()
+	return fmt.Sprintf("%s://localhost:%d%06d", "unix", os.Getpid(), i), fmt.Sprintf("%s://localhost:%d%06d", "unix", os.Getpid(), i+1)
+}
+
+type etcdSchemaRegistry struct {
+	server *embed.Etcd
+	kv     clientv3.KV
+}
+
+type etcdSchemaRegistryConfig struct {
+	// preload internal schema
+	preload bool
+	// rootDir is the root directory for etcd storage
+	rootDir string
+	// listenerClientURL is the listener for client
+	listenerClientURL string
+	// listenerPeerURL is the listener for peer
+	listenerPeerURL string
+}
+
+func (e *etcdSchemaRegistry) ExistGroup(ctx context.Context, group string) (bool, error) {
+	var entity commonv1.Group
+	err := e.get(ctx, formatGroupKey(group), &entity)
+	if err != nil && !errors.Is(err, ErrEntityNotFound) {
+		return false, err
+	}
+	return entity.GetName() != "" && !entity.Deleted, nil

Review comment:
       Removed







[GitHub] [skywalking-banyandb] lujiajing1126 commented on pull request #55: Feat: metadata registry with embedded etcd

Posted by GitBox <gi...@apache.org>.
lujiajing1126 commented on pull request #55:
URL: https://github.com/apache/skywalking-banyandb/pull/55#issuecomment-945898273


   > Some suggestions about how to organize data in etcd.
   
   All suggestions applied





[GitHub] [skywalking-banyandb] hanahmily commented on a change in pull request #55: Feat: metadata registry with embedded etcd

Posted by GitBox <gi...@apache.org>.
hanahmily commented on a change in pull request #55:
URL: https://github.com/apache/skywalking-banyandb/pull/55#discussion_r730617348



##########
File path: banyand/metadata/schema/etcd.go
##########
@@ -0,0 +1,520 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package schema
+
+import (
+	"context"
+	"fmt"
+	"math/rand"
+	"net/url"
+	"os"
+	"path"
+	"path/filepath"
+
+	googleUUID "github.com/google/uuid"
+	"github.com/pkg/errors"
+	clientv3 "go.etcd.io/etcd/client/v3"
+	"go.etcd.io/etcd/server/v3/embed"
+	"google.golang.org/protobuf/encoding/protojson"
+	"google.golang.org/protobuf/proto"
+
+	commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
+	databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
+)
+
+var (
+	_ Stream           = (*etcdSchemaRegistry)(nil)
+	_ IndexRuleBinding = (*etcdSchemaRegistry)(nil)
+	_ IndexRule        = (*etcdSchemaRegistry)(nil)
+	_ Measure          = (*etcdSchemaRegistry)(nil)
+	_ Group            = (*etcdSchemaRegistry)(nil)
+
+	ErrEntityNotFound             = errors.New("entity is not found")
+	ErrUnexpectedNumberOfEntities = errors.New("unexpected number of entities")
+	ErrGroupAlreadyDefined        = errors.New("group is already defined")
+	ErrGroupNotDefined            = errors.New("group is not defined or has already been deleted")
+
+	StreamKeyPrefix           = "/stream/"
+	IndexRuleBindingKeyPrefix = "/index-rule-binding/"
+	IndexRuleKeyPrefix        = "/index-rule/"
+	MeasureKeyPrefix          = "/measure/"
+	GroupsKeyPrefix           = "/groups/"
+)
+
+type RegistryOption func(*etcdSchemaRegistryConfig)
+
+func PreloadSchema() RegistryOption {
+	return func(config *etcdSchemaRegistryConfig) {
+		config.preload = true
+	}
+}
+
+func UseRandomTempDir() RegistryOption {
+	return func(config *etcdSchemaRegistryConfig) {
+		config.rootDir = RandomTempDir()
+	}
+}
+
+func RandomTempDir() string {
+	return path.Join(os.TempDir(), fmt.Sprintf("banyandb-embed-etcd-%s", googleUUID.New().String()))
+}
+
+func RootDir(rootDir string) RegistryOption {
+	return func(config *etcdSchemaRegistryConfig) {
+		config.rootDir = rootDir
+	}
+}
+
+func UseListener(client, peer string) RegistryOption {
+	return func(config *etcdSchemaRegistryConfig) {
+		config.listenerClientURL = client
+		config.listenerPeerURL = peer
+	}
+}
+
+func UseUnixDomain() RegistryOption {
+	return func(config *etcdSchemaRegistryConfig) {
+		config.listenerClientURL, config.listenerPeerURL = RandomUnixDomainListener()
+	}
+}
+
+func RandomUnixDomainListener() (string, string) {
+	i := rand.Uint64()
+	return fmt.Sprintf("%s://localhost:%d%06d", "unix", os.Getpid(), i), fmt.Sprintf("%s://localhost:%d%06d", "unix", os.Getpid(), i+1)
+}
+
+type etcdSchemaRegistry struct {
+	server *embed.Etcd
+	kv     clientv3.KV
+}
+
+type etcdSchemaRegistryConfig struct {
+	// preload internal schema
+	preload bool
+	// rootDir is the root directory for etcd storage
+	rootDir string
+	// listenerClientURL is the listener for client
+	listenerClientURL string
+	// listenerPeerURL is the listener for peer
+	listenerPeerURL string
+}
+
+func (e *etcdSchemaRegistry) ExistGroup(ctx context.Context, group string) (bool, error) {

Review comment:
       Yes, it should contain at least the following items:
   
      * group name
      * updated time







[GitHub] [skywalking-banyandb] lujiajing1126 commented on a change in pull request #55: Feat: metadata registry with embedded etcd

Posted by GitBox <gi...@apache.org>.
lujiajing1126 commented on a change in pull request #55:
URL: https://github.com/apache/skywalking-banyandb/pull/55#discussion_r731050751



##########
File path: banyand/metadata/metadata.go
##########
@@ -37,50 +39,99 @@ type IndexFilter interface {
 
 type Repo interface {
 	IndexFilter
-	Stream() schema.Stream
+	StreamRegistry() schema.Stream
+	IndexRuleRegistry() schema.IndexRule
+	IndexRuleBindingRegistry() schema.IndexRuleBinding
+	MeasureRegistry() schema.Measure
+	GroupRegistry() schema.Group
 }
 
 type Service interface {
 	Repo
-	run.Unit
+	run.PreRunner
+	run.Service
+	run.Config
 }
 
 type service struct {
-	stream           schema.Stream
-	indexRule        schema.IndexRule
-	indexRuleBinding schema.IndexRuleBinding
+	schemaRegistry    schema.Registry
+	stopCh            chan struct{}
+	clientListenerURL string
+	peerListenerURL   string
+	rootDir           string
 }
 
-func NewService(_ context.Context) (Service, error) {
-	stream, err := schema.NewStream()
-	if err != nil {
-		return nil, err
+func (s *service) FlagSet() *run.FlagSet {
+	fs := run.NewFlagSet("metadata")
+	fs.StringVarP(&s.clientListenerURL, "listener-client-url", "", embed.DefaultListenClientURLs,
+		"listener for client")
+	fs.StringVarP(&s.peerListenerURL, "listener-peer-url", "", embed.DefaultListenPeerURLs,
+		"listener for peer")

Review comment:
       Removed

##########
File path: banyand/metadata/schema/etcd.go
##########
@@ -0,0 +1,520 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package schema
+
+import (
+	"context"
+	"fmt"
+	"math/rand"
+	"net/url"
+	"os"
+	"path"
+	"path/filepath"
+
+	googleUUID "github.com/google/uuid"
+	"github.com/pkg/errors"
+	clientv3 "go.etcd.io/etcd/client/v3"
+	"go.etcd.io/etcd/server/v3/embed"
+	"google.golang.org/protobuf/encoding/protojson"
+	"google.golang.org/protobuf/proto"
+
+	commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
+	databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
+)
+
+var (
+	_ Stream           = (*etcdSchemaRegistry)(nil)
+	_ IndexRuleBinding = (*etcdSchemaRegistry)(nil)
+	_ IndexRule        = (*etcdSchemaRegistry)(nil)
+	_ Measure          = (*etcdSchemaRegistry)(nil)
+	_ Group            = (*etcdSchemaRegistry)(nil)
+
+	ErrEntityNotFound             = errors.New("entity is not found")
+	ErrUnexpectedNumberOfEntities = errors.New("unexpected number of entities")
+	ErrGroupAlreadyDefined        = errors.New("group is already defined")
+	ErrGroupNotDefined            = errors.New("group is not defined or has already been deleted")
+
+	StreamKeyPrefix           = "/stream/"
+	IndexRuleBindingKeyPrefix = "/index-rule-binding/"
+	IndexRuleKeyPrefix        = "/index-rule/"
+	MeasureKeyPrefix          = "/measure/"
+	GroupsKeyPrefix           = "/groups/"
+)
+
+type RegistryOption func(*etcdSchemaRegistryConfig)
+
+func PreloadSchema() RegistryOption {

Review comment:
       Done







[GitHub] [skywalking-banyandb] lujiajing1126 commented on a change in pull request #55: Feat: metadata registry with embedded etcd

Posted by GitBox <gi...@apache.org>.
lujiajing1126 commented on a change in pull request #55:
URL: https://github.com/apache/skywalking-banyandb/pull/55#discussion_r730591855



##########
File path: banyand/metadata/schema/etcd.go
##########
@@ -0,0 +1,520 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package schema
+
+import (
+	"context"
+	"fmt"
+	"math/rand"
+	"net/url"
+	"os"
+	"path"
+	"path/filepath"
+
+	googleUUID "github.com/google/uuid"
+	"github.com/pkg/errors"
+	clientv3 "go.etcd.io/etcd/client/v3"
+	"go.etcd.io/etcd/server/v3/embed"
+	"google.golang.org/protobuf/encoding/protojson"
+	"google.golang.org/protobuf/proto"
+
+	commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
+	databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
+)
+
+var (
+	_ Stream           = (*etcdSchemaRegistry)(nil)
+	_ IndexRuleBinding = (*etcdSchemaRegistry)(nil)
+	_ IndexRule        = (*etcdSchemaRegistry)(nil)
+	_ Measure          = (*etcdSchemaRegistry)(nil)
+	_ Group            = (*etcdSchemaRegistry)(nil)
+
+	ErrEntityNotFound             = errors.New("entity is not found")
+	ErrUnexpectedNumberOfEntities = errors.New("unexpected number of entities")
+	ErrGroupAlreadyDefined        = errors.New("group is already defined")
+	ErrGroupNotDefined            = errors.New("group is not defined or has already been deleted")
+
+	StreamKeyPrefix           = "/stream/"
+	IndexRuleBindingKeyPrefix = "/index-rule-binding/"
+	IndexRuleKeyPrefix        = "/index-rule/"
+	MeasureKeyPrefix          = "/measure/"
+	GroupsKeyPrefix           = "/groups/"
+)
+
+type RegistryOption func(*etcdSchemaRegistryConfig)
+
+func PreloadSchema() RegistryOption {

Review comment:
       > banyandb should pre-load anything. That's the OAP client's work.
   
   Did you mean `nothing`? But we have to preload everything for the tests.







[GitHub] [skywalking-banyandb] lujiajing1126 commented on a change in pull request #55: Feat: metadata registry with embedded etcd

Posted by GitBox <gi...@apache.org>.
lujiajing1126 commented on a change in pull request #55:
URL: https://github.com/apache/skywalking-banyandb/pull/55#discussion_r730593786



##########
File path: banyand/metadata/metadata.go
##########
@@ -37,50 +39,99 @@ type IndexFilter interface {
 
 type Repo interface {
 	IndexFilter
-	Stream() schema.Stream
+	StreamRegistry() schema.Stream
+	IndexRuleRegistry() schema.IndexRule
+	IndexRuleBindingRegistry() schema.IndexRuleBinding
+	MeasureRegistry() schema.Measure
+	GroupRegistry() schema.Group
 }
 
 type Service interface {
 	Repo
-	run.Unit
+	run.PreRunner
+	run.Service
+	run.Config
 }
 
 type service struct {
-	stream           schema.Stream
-	indexRule        schema.IndexRule
-	indexRuleBinding schema.IndexRuleBinding
+	schemaRegistry    schema.Registry
+	stopCh            chan struct{}
+	clientListenerURL string
+	peerListenerURL   string
+	rootDir           string
 }
 
-func NewService(_ context.Context) (Service, error) {
-	stream, err := schema.NewStream()
-	if err != nil {
-		return nil, err
+func (s *service) FlagSet() *run.FlagSet {
+	fs := run.NewFlagSet("metadata")
+	fs.StringVarP(&s.clientListenerURL, "listener-client-url", "", embed.DefaultListenClientURLs,
+		"listener for client")
+	fs.StringVarP(&s.peerListenerURL, "listener-peer-url", "", embed.DefaultListenPeerURLs,
+		"listener for peer")
+	fs.StringVarP(&s.rootDir, "etcd-root-path", "", "/tmp", "the root path of database")

Review comment:
       Done







[GitHub] [skywalking-banyandb] lujiajing1126 commented on a change in pull request #55: Feat: metadata registry with embedded etcd

Posted by GitBox <gi...@apache.org>.
lujiajing1126 commented on a change in pull request #55:
URL: https://github.com/apache/skywalking-banyandb/pull/55#discussion_r729013058



##########
File path: api/proto/banyandb/database/v1/rpc.proto
##########
@@ -0,0 +1,77 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+syntax = "proto3";
+
+option java_package = "org.apache.skywalking.banyandb.database.v1";
+option go_package = "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1";
+
+package banyandb.database.v1;
+
+import "banyandb/database/v1/schema.proto";
+import "banyandb/common/v1/common.proto";
+import "google/protobuf/struct.proto";
+
+// EntityType includes three kinds of entities (schema)
+// that can be create/delete/modified at runtime.
+enum EntityType {
+  ENTITY_TYPE_UNSPECIFIED = 0;
+  ENTITY_TYPE_STREAM = 1;
+  ENTITY_TYPE_INDEX_RULE_BINDING = 2;
+  ENTITY_TYPE_INDEX_RULE = 3;
+}
+
+// EntityCreateOrUpdateRequest is the request for creating
+// or updating entity.
+message EntityCreateOrUpdateRequest {
+  oneof entity {
+    banyandb.database.v1.Stream stream = 1;
+    banyandb.database.v1.IndexRuleBinding index_rule_binding = 2;
+    banyandb.database.v1.IndexRule index_rule = 3;
+  }
+}
+
+message EntityCreateOrUpdateResponse {
+}
+
+// GeneralEntityRequest can be used to delete, get entity
+// with the given entity_type and its metadata.
+message GeneralEntityRequest {
+  banyandb.common.v1.Metadata metadata = 1;
+  EntityType entity_type = 2;
+}
+
+message EntityDeleteResponse {
+}
+
+// EntityGetResponse is the response for Get method.
+// It contains an entity which is possibly nullable which
+// means the entity cannot be found
+message EntityGetResponse {
+  oneof entity {
+    google.protobuf.NullValue null = 1;
+    banyandb.database.v1.Stream stream = 2;
+    banyandb.database.v1.IndexRuleBinding index_rule_binding = 3;
+    banyandb.database.v1.IndexRule index_rule = 4;
+  }
+}
+
+service EntityRegistry {

Review comment:
       Could you please check the protocols again?







[GitHub] [skywalking-banyandb] hanahmily commented on a change in pull request #55: Feat: metadata registry with embedded etcd

Posted by GitBox <gi...@apache.org>.
hanahmily commented on a change in pull request #55:
URL: https://github.com/apache/skywalking-banyandb/pull/55#discussion_r730234010



##########
File path: banyand/metadata/schema/etcd.go
##########
@@ -0,0 +1,520 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package schema
+
+import (
+	"context"
+	"fmt"
+	"math/rand"
+	"net/url"
+	"os"
+	"path"
+	"path/filepath"
+
+	googleUUID "github.com/google/uuid"
+	"github.com/pkg/errors"
+	clientv3 "go.etcd.io/etcd/client/v3"
+	"go.etcd.io/etcd/server/v3/embed"
+	"google.golang.org/protobuf/encoding/protojson"
+	"google.golang.org/protobuf/proto"
+
+	commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
+	databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
+)
+
+var (
+	_ Stream           = (*etcdSchemaRegistry)(nil)
+	_ IndexRuleBinding = (*etcdSchemaRegistry)(nil)
+	_ IndexRule        = (*etcdSchemaRegistry)(nil)
+	_ Measure          = (*etcdSchemaRegistry)(nil)
+	_ Group            = (*etcdSchemaRegistry)(nil)
+
+	ErrEntityNotFound             = errors.New("entity is not found")
+	ErrUnexpectedNumberOfEntities = errors.New("unexpected number of entities")
+	ErrGroupAlreadyDefined        = errors.New("group is already defined")
+	ErrGroupNotDefined            = errors.New("group is not defined or has already been deleted")
+
+	StreamKeyPrefix           = "/stream/"
+	IndexRuleBindingKeyPrefix = "/index-rule-binding/"
+	IndexRuleKeyPrefix        = "/index-rule/"
+	MeasureKeyPrefix          = "/measure/"
+	GroupsKeyPrefix           = "/groups/"

Review comment:
       How about using [restful resource naming](https://restfulapi.net/resource-naming/): `groups/<group_name>/streams|index-rule-bindings|.../<stream_name|index-rule-binding_name|...>`
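
To make the proposed layout concrete, here is a minimal sketch of key formatting under the `groups/<group_name>/<kind>/<name>` scheme. The helper names (`formatGroupPrefix`, `formatResourceKey`), the plural kind segments, and the leading slash are assumptions for illustration only; they are not taken from the PR.

```go
package main

import "fmt"

// Hypothetical plural resource segments, following the RESTful layout
// suggested in the review comment.
const (
	streamsSegment           = "streams"
	indexRuleBindingsSegment = "index-rule-bindings"
)

// formatGroupPrefix returns the prefix shared by every resource in a group.
// The trailing slash keeps "default" from also matching "default2".
func formatGroupPrefix(group string) string {
	return fmt.Sprintf("/groups/%s/", group)
}

// formatResourceKey builds /groups/<group>/<kind>/<name>.
func formatResourceKey(group, kind, name string) string {
	return formatGroupPrefix(group) + kind + "/" + name
}

func main() {
	fmt.Println(formatResourceKey("default", streamsSegment, "sw"))
	fmt.Println(formatResourceKey("default", indexRuleBindingsSegment, "sw-binding"))
	fmt.Println(formatGroupPrefix("default"))
}
```

With this layout, all resources of one group share a single key prefix, which is what makes group-wide operations straightforward.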

##########
File path: banyand/metadata/schema/etcd.go
##########
@@ -0,0 +1,520 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package schema
+
+import (
+	"context"
+	"fmt"
+	"math/rand"
+	"net/url"
+	"os"
+	"path"
+	"path/filepath"
+
+	googleUUID "github.com/google/uuid"
+	"github.com/pkg/errors"
+	clientv3 "go.etcd.io/etcd/client/v3"
+	"go.etcd.io/etcd/server/v3/embed"
+	"google.golang.org/protobuf/encoding/protojson"
+	"google.golang.org/protobuf/proto"
+
+	commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
+	databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
+)
+
+var (
+	_ Stream           = (*etcdSchemaRegistry)(nil)
+	_ IndexRuleBinding = (*etcdSchemaRegistry)(nil)
+	_ IndexRule        = (*etcdSchemaRegistry)(nil)
+	_ Measure          = (*etcdSchemaRegistry)(nil)
+	_ Group            = (*etcdSchemaRegistry)(nil)
+
+	ErrEntityNotFound             = errors.New("entity is not found")
+	ErrUnexpectedNumberOfEntities = errors.New("unexpected number of entities")
+	ErrGroupAlreadyDefined        = errors.New("group is already defined")
+	ErrGroupNotDefined            = errors.New("group is not defined or has already been deleted")
+
+	StreamKeyPrefix           = "/stream/"
+	IndexRuleBindingKeyPrefix = "/index-rule-binding/"
+	IndexRuleKeyPrefix        = "/index-rule/"
+	MeasureKeyPrefix          = "/measure/"
+	GroupsKeyPrefix           = "/groups/"
+)
+
+type RegistryOption func(*etcdSchemaRegistryConfig)
+
+func PreloadSchema() RegistryOption {
+	return func(config *etcdSchemaRegistryConfig) {
+		config.preload = true
+	}
+}
+
+func UseRandomTempDir() RegistryOption {
+	return func(config *etcdSchemaRegistryConfig) {
+		config.rootDir = RandomTempDir()
+	}
+}
+
+func RandomTempDir() string {
+	return path.Join(os.TempDir(), fmt.Sprintf("banyandb-embed-etcd-%s", googleUUID.New().String()))
+}
+
+func RootDir(rootDir string) RegistryOption {
+	return func(config *etcdSchemaRegistryConfig) {
+		config.rootDir = rootDir
+	}
+}
+
+func UseListener(client, peer string) RegistryOption {
+	return func(config *etcdSchemaRegistryConfig) {
+		config.listenerClientURL = client
+		config.listenerPeerURL = peer
+	}
+}
+
+func UseUnixDomain() RegistryOption {
+	return func(config *etcdSchemaRegistryConfig) {
+		config.listenerClientURL, config.listenerPeerURL = RandomUnixDomainListener()
+	}
+}
+
+func RandomUnixDomainListener() (string, string) {
+	i := rand.Uint64()
+	return fmt.Sprintf("%s://localhost:%d%06d", "unix", os.Getpid(), i), fmt.Sprintf("%s://localhost:%d%06d", "unix", os.Getpid(), i+1)
+}
+
+type etcdSchemaRegistry struct {
+	server *embed.Etcd
+	kv     clientv3.KV
+}
+
+type etcdSchemaRegistryConfig struct {
+	// preload internal schema
+	preload bool
+	// rootDir is the root directory for etcd storage
+	rootDir string
+	// listenerClientURL is the listener for client
+	listenerClientURL string
+	// listenerPeerURL is the listener for peer
+	listenerPeerURL string
+}
+
+func (e *etcdSchemaRegistry) ExistGroup(ctx context.Context, group string) (bool, error) {
+	var entity commonv1.Group
+	err := e.get(ctx, formatGroupKey(group), &entity)
+	if err != nil && !errors.Is(err, ErrEntityNotFound) {
+		return false, err
+	}
+	return entity.GetName() != "" && !entity.Deleted, nil

Review comment:
       We don't need a `deleted` flag here. Instead, we should support deleting all resources in the same group.
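
A rough sketch of what "delete all resources in the same group" could look like without a tombstone flag. It uses a hypothetical in-memory map (`memStore`) as a stand-in for the etcd key space and assumes the keys are laid out as `/groups/<group>/...`; neither the type nor the method names come from the PR.

```go
package main

import (
	"fmt"
	"strings"
)

// memStore is a hypothetical in-memory stand-in for the etcd key space.
type memStore map[string][]byte

// deleteGroup removes the group key and every key stored under the group's
// prefix, and returns how many keys were removed.
func (s memStore) deleteGroup(group string) int {
	groupKey := "/groups/" + group
	prefix := groupKey + "/"
	removed := 0
	for k := range s {
		if k == groupKey || strings.HasPrefix(k, prefix) {
			delete(s, k) // deleting during range is safe in Go
			removed++
		}
	}
	return removed
}

// groupExists reports whether the group key is present. Absence of the key
// is the only signal needed, so no `deleted` field is required.
func (s memStore) groupExists(group string) bool {
	_, ok := s["/groups/"+group]
	return ok
}

func main() {
	s := memStore{
		"/groups/default":            nil,
		"/groups/default/streams/sw": nil,
		"/groups/other/streams/sw":   nil,
	}
	fmt.Println(s.deleteGroup("default"), s.groupExists("default"), len(s))
}
```

Against a real etcd backend the loop would presumably be replaced by a single ranged delete over the group prefix.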

##########
File path: banyand/metadata/metadata.go
##########
@@ -37,50 +39,99 @@ type IndexFilter interface {
 
 type Repo interface {
 	IndexFilter
-	Stream() schema.Stream
+	StreamRegistry() schema.Stream
+	IndexRuleRegistry() schema.IndexRule
+	IndexRuleBindingRegistry() schema.IndexRuleBinding
+	MeasureRegistry() schema.Measure
+	GroupRegistry() schema.Group
 }
 
 type Service interface {
 	Repo
-	run.Unit
+	run.PreRunner
+	run.Service
+	run.Config
 }
 
 type service struct {
-	stream           schema.Stream
-	indexRule        schema.IndexRule
-	indexRuleBinding schema.IndexRuleBinding
+	schemaRegistry    schema.Registry
+	stopCh            chan struct{}
+	clientListenerURL string
+	peerListenerURL   string
+	rootDir           string
 }
 
-func NewService(_ context.Context) (Service, error) {
-	stream, err := schema.NewStream()
-	if err != nil {
-		return nil, err
+func (s *service) FlagSet() *run.FlagSet {
+	fs := run.NewFlagSet("metadata")
+	fs.StringVarP(&s.clientListenerURL, "listener-client-url", "", embed.DefaultListenClientURLs,
+		"listener for client")
+	fs.StringVarP(&s.peerListenerURL, "listener-peer-url", "", embed.DefaultListenPeerURLs,
+		"listener for peer")
+	fs.StringVarP(&s.rootDir, "etcd-root-path", "", "/tmp", "the root path of database")

Review comment:
       Please use `metadata-root-path`. We don't intend to expose which implementation BanyanDB uses for metadata.
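
For illustration, a minimal sketch of an implementation-neutral flag name. It uses the standard library `flag` package rather than the project's `run.FlagSet`, and the `metadataConfig` type and `newMetadataFlagSet` helper are hypothetical names, so treat it as an approximation of the idea rather than the code in the PR.

```go
package main

import (
	"flag"
	"fmt"
)

// metadataConfig is a hypothetical holder for the metadata service settings.
type metadataConfig struct {
	rootDir string
}

// newMetadataFlagSet registers the flag under a name that does not reveal
// the storage backend (no "etcd" in the flag name or help text).
func newMetadataFlagSet(cfg *metadataConfig) *flag.FlagSet {
	fs := flag.NewFlagSet("metadata", flag.ContinueOnError)
	fs.StringVar(&cfg.rootDir, "metadata-root-path", "/tmp", "the root path of metadata storage")
	return fs
}

func main() {
	var cfg metadataConfig
	fs := newMetadataFlagSet(&cfg)
	if err := fs.Parse([]string{"--metadata-root-path=/var/lib/banyandb/metadata"}); err != nil {
		fmt.Println("parse error:", err)
		return
	}
	fmt.Println(cfg.rootDir)
}
```

Keeping the backend out of the flag name means the storage implementation can change later without breaking existing command lines.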

##########
File path: banyand/metadata/schema/etcd.go
##########
@@ -0,0 +1,520 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package schema
+
+import (
+	"context"
+	"fmt"
+	"math/rand"
+	"net/url"
+	"os"
+	"path"
+	"path/filepath"
+
+	googleUUID "github.com/google/uuid"
+	"github.com/pkg/errors"
+	clientv3 "go.etcd.io/etcd/client/v3"
+	"go.etcd.io/etcd/server/v3/embed"
+	"google.golang.org/protobuf/encoding/protojson"
+	"google.golang.org/protobuf/proto"
+
+	commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
+	databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
+)
+
+var (
+	_ Stream           = (*etcdSchemaRegistry)(nil)
+	_ IndexRuleBinding = (*etcdSchemaRegistry)(nil)
+	_ IndexRule        = (*etcdSchemaRegistry)(nil)
+	_ Measure          = (*etcdSchemaRegistry)(nil)
+	_ Group            = (*etcdSchemaRegistry)(nil)
+
+	ErrEntityNotFound             = errors.New("entity is not found")
+	ErrUnexpectedNumberOfEntities = errors.New("unexpected number of entities")
+	ErrGroupAlreadyDefined        = errors.New("group is already defined")
+	ErrGroupNotDefined            = errors.New("group is not defined or has already been deleted")
+
+	StreamKeyPrefix           = "/stream/"
+	IndexRuleBindingKeyPrefix = "/index-rule-binding/"
+	IndexRuleKeyPrefix        = "/index-rule/"
+	MeasureKeyPrefix          = "/measure/"
+	GroupsKeyPrefix           = "/groups/"
+)
+
+type RegistryOption func(*etcdSchemaRegistryConfig)
+
+func PreloadSchema() RegistryOption {

Review comment:
       banyandb should pre-load anything. That's the OAP client's work.

##########
File path: banyand/metadata/schema/etcd.go
##########
@@ -0,0 +1,520 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package schema
+
+import (
+	"context"
+	"fmt"
+	"math/rand"
+	"net/url"
+	"os"
+	"path"
+	"path/filepath"
+
+	googleUUID "github.com/google/uuid"
+	"github.com/pkg/errors"
+	clientv3 "go.etcd.io/etcd/client/v3"
+	"go.etcd.io/etcd/server/v3/embed"
+	"google.golang.org/protobuf/encoding/protojson"
+	"google.golang.org/protobuf/proto"
+
+	commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
+	databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
+)
+
+var (
+	_ Stream           = (*etcdSchemaRegistry)(nil)
+	_ IndexRuleBinding = (*etcdSchemaRegistry)(nil)
+	_ IndexRule        = (*etcdSchemaRegistry)(nil)
+	_ Measure          = (*etcdSchemaRegistry)(nil)
+	_ Group            = (*etcdSchemaRegistry)(nil)
+
+	ErrEntityNotFound             = errors.New("entity is not found")
+	ErrUnexpectedNumberOfEntities = errors.New("unexpected number of entities")
+	ErrGroupAlreadyDefined        = errors.New("group is already defined")
+	ErrGroupNotDefined            = errors.New("group is not defined or has already been deleted")
+
+	StreamKeyPrefix           = "/stream/"
+	IndexRuleBindingKeyPrefix = "/index-rule-binding/"
+	IndexRuleKeyPrefix        = "/index-rule/"
+	MeasureKeyPrefix          = "/measure/"
+	GroupsKeyPrefix           = "/groups/"

Review comment:
       We can use `WithPrefix` to implement `Group` operations.
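
As background, `clientv3.WithPrefix()` turns a `Get` or `Delete` on a single key into a range operation over all keys that start with that key. The sketch below illustrates the underlying idea using only the standard library: compute the exclusive upper bound of a prefix range, then test membership. The function names `prefixRangeEnd` and `inPrefixRange` are hypothetical, and this is a simplified illustration, not the etcd client's code.

```go
package main

import "fmt"

// prefixRangeEnd returns the smallest key greater than every key that starts
// with prefix. bounded is false when no such key exists (empty prefix or a
// prefix made only of 0xff bytes), meaning the range is open-ended.
func prefixRangeEnd(prefix string) (end string, bounded bool) {
	b := []byte(prefix)
	for i := len(b) - 1; i >= 0; i-- {
		if b[i] < 0xff {
			b[i]++
			return string(b[:i+1]), true
		}
	}
	return "", false
}

// inPrefixRange reports whether key falls inside [prefix, prefixRangeEnd(prefix)).
func inPrefixRange(key, prefix string) bool {
	end, bounded := prefixRangeEnd(prefix)
	return key >= prefix && (!bounded || key < end)
}

func main() {
	prefix := "/groups/default/"
	end, _ := prefixRangeEnd(prefix)
	fmt.Printf("range: [%q, %q)\n", prefix, end)
	fmt.Println(inPrefixRange("/groups/default/streams/sw", prefix))
	fmt.Println(inPrefixRange("/groups/default2/streams/sw", prefix))
}
```

Combined with a `groups/<group_name>/...` key layout, a single prefix range covers every resource of a group, so listing or deleting a group's contents becomes one ranged request.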

##########
File path: banyand/metadata/schema/etcd.go
##########
@@ -0,0 +1,520 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package schema
+
+import (
+	"context"
+	"fmt"
+	"math/rand"
+	"net/url"
+	"os"
+	"path"
+	"path/filepath"
+
+	googleUUID "github.com/google/uuid"
+	"github.com/pkg/errors"
+	clientv3 "go.etcd.io/etcd/client/v3"
+	"go.etcd.io/etcd/server/v3/embed"
+	"google.golang.org/protobuf/encoding/protojson"
+	"google.golang.org/protobuf/proto"
+
+	commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
+	databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
+)
+
+var (
+	_ Stream           = (*etcdSchemaRegistry)(nil)
+	_ IndexRuleBinding = (*etcdSchemaRegistry)(nil)
+	_ IndexRule        = (*etcdSchemaRegistry)(nil)
+	_ Measure          = (*etcdSchemaRegistry)(nil)
+	_ Group            = (*etcdSchemaRegistry)(nil)
+
+	ErrEntityNotFound             = errors.New("entity is not found")
+	ErrUnexpectedNumberOfEntities = errors.New("unexpected number of entities")
+	ErrGroupAlreadyDefined        = errors.New("group is already defined")
+	ErrGroupNotDefined            = errors.New("group is not defined or has already been deleted")
+
+	StreamKeyPrefix           = "/stream/"
+	IndexRuleBindingKeyPrefix = "/index-rule-binding/"
+	IndexRuleKeyPrefix        = "/index-rule/"
+	MeasureKeyPrefix          = "/measure/"
+	GroupsKeyPrefix           = "/groups/"
+)
+
+type RegistryOption func(*etcdSchemaRegistryConfig)
+
+func PreloadSchema() RegistryOption {
+	return func(config *etcdSchemaRegistryConfig) {
+		config.preload = true
+	}
+}
+
+func UseRandomTempDir() RegistryOption {
+	return func(config *etcdSchemaRegistryConfig) {
+		config.rootDir = RandomTempDir()
+	}
+}
+
+func RandomTempDir() string {
+	return path.Join(os.TempDir(), fmt.Sprintf("banyandb-embed-etcd-%s", googleUUID.New().String()))
+}
+
+func RootDir(rootDir string) RegistryOption {
+	return func(config *etcdSchemaRegistryConfig) {
+		config.rootDir = rootDir
+	}
+}
+
+func UseListener(client, peer string) RegistryOption {
+	return func(config *etcdSchemaRegistryConfig) {
+		config.listenerClientURL = client
+		config.listenerPeerURL = peer
+	}
+}
+
+func UseUnixDomain() RegistryOption {
+	return func(config *etcdSchemaRegistryConfig) {
+		config.listenerClientURL, config.listenerPeerURL = RandomUnixDomainListener()
+	}
+}
+
+func RandomUnixDomainListener() (string, string) {
+	i := rand.Uint64()
+	return fmt.Sprintf("%s://localhost:%d%06d", "unix", os.Getpid(), i), fmt.Sprintf("%s://localhost:%d%06d", "unix", os.Getpid(), i+1)
+}
+
+type etcdSchemaRegistry struct {
+	server *embed.Etcd
+	kv     clientv3.KV
+}
+
+type etcdSchemaRegistryConfig struct {
+	// preload internal schema
+	preload bool
+	// rootDir is the root directory for etcd storage
+	rootDir string
+	// listenerClientURL is the listener for client
+	listenerClientURL string
+	// listenerPeerURL is the listener for peer
+	listenerPeerURL string
+}
+
+func (e *etcdSchemaRegistry) ExistGroup(ctx context.Context, group string) (bool, error) {

Review comment:
       `GetGroup` is a better name.

##########
File path: banyand/metadata/schema/etcd.go
##########
@@ -0,0 +1,520 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package schema
+
+import (
+	"context"
+	"fmt"
+	"math/rand"
+	"net/url"
+	"os"
+	"path"
+	"path/filepath"
+
+	googleUUID "github.com/google/uuid"
+	"github.com/pkg/errors"
+	clientv3 "go.etcd.io/etcd/client/v3"
+	"go.etcd.io/etcd/server/v3/embed"
+	"google.golang.org/protobuf/encoding/protojson"
+	"google.golang.org/protobuf/proto"
+
+	commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
+	databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
+)
+
+var (
+	_ Stream           = (*etcdSchemaRegistry)(nil)
+	_ IndexRuleBinding = (*etcdSchemaRegistry)(nil)
+	_ IndexRule        = (*etcdSchemaRegistry)(nil)
+	_ Measure          = (*etcdSchemaRegistry)(nil)
+	_ Group            = (*etcdSchemaRegistry)(nil)
+
+	ErrEntityNotFound             = errors.New("entity is not found")
+	ErrUnexpectedNumberOfEntities = errors.New("unexpected number of entities")
+	ErrGroupAlreadyDefined        = errors.New("group is already defined")
+	ErrGroupNotDefined            = errors.New("group is not defined or has already been deleted")
+
+	StreamKeyPrefix           = "/stream/"
+	IndexRuleBindingKeyPrefix = "/index-rule-binding/"
+	IndexRuleKeyPrefix        = "/index-rule/"
+	MeasureKeyPrefix          = "/measure/"
+	GroupsKeyPrefix           = "/groups/"
+)
+
+type RegistryOption func(*etcdSchemaRegistryConfig)
+
+func PreloadSchema() RegistryOption {
+	return func(config *etcdSchemaRegistryConfig) {
+		config.preload = true
+	}
+}
+
+func UseRandomTempDir() RegistryOption {
+	return func(config *etcdSchemaRegistryConfig) {
+		config.rootDir = RandomTempDir()
+	}
+}
+
+func RandomTempDir() string {
+	return path.Join(os.TempDir(), fmt.Sprintf("banyandb-embed-etcd-%s", googleUUID.New().String()))
+}
+
+func RootDir(rootDir string) RegistryOption {
+	return func(config *etcdSchemaRegistryConfig) {
+		config.rootDir = rootDir
+	}
+}
+
+func UseListener(client, peer string) RegistryOption {
+	return func(config *etcdSchemaRegistryConfig) {
+		config.listenerClientURL = client
+		config.listenerPeerURL = peer
+	}
+}
+
+func UseUnixDomain() RegistryOption {
+	return func(config *etcdSchemaRegistryConfig) {
+		config.listenerClientURL, config.listenerPeerURL = RandomUnixDomainListener()
+	}
+}
+
+func RandomUnixDomainListener() (string, string) {
+	i := rand.Uint64()
+	return fmt.Sprintf("%s://localhost:%d%06d", "unix", os.Getpid(), i), fmt.Sprintf("%s://localhost:%d%06d", "unix", os.Getpid(), i+1)

Review comment:
       Move to `pkg/test` as well.

##########
File path: banyand/metadata/schema/etcd.go
##########
@@ -0,0 +1,520 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package schema
+
+import (
+	"context"
+	"fmt"
+	"math/rand"
+	"net/url"
+	"os"
+	"path"
+	"path/filepath"
+
+	googleUUID "github.com/google/uuid"
+	"github.com/pkg/errors"
+	clientv3 "go.etcd.io/etcd/client/v3"
+	"go.etcd.io/etcd/server/v3/embed"
+	"google.golang.org/protobuf/encoding/protojson"
+	"google.golang.org/protobuf/proto"
+
+	commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
+	databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
+)
+
+var (
+	_ Stream           = (*etcdSchemaRegistry)(nil)
+	_ IndexRuleBinding = (*etcdSchemaRegistry)(nil)
+	_ IndexRule        = (*etcdSchemaRegistry)(nil)
+	_ Measure          = (*etcdSchemaRegistry)(nil)
+	_ Group            = (*etcdSchemaRegistry)(nil)
+
+	ErrEntityNotFound             = errors.New("entity is not found")
+	ErrUnexpectedNumberOfEntities = errors.New("unexpected number of entities")
+	ErrGroupAlreadyDefined        = errors.New("group is already defined")
+	ErrGroupNotDefined            = errors.New("group is not defined or has already been deleted")
+
+	StreamKeyPrefix           = "/stream/"
+	IndexRuleBindingKeyPrefix = "/index-rule-binding/"
+	IndexRuleKeyPrefix        = "/index-rule/"
+	MeasureKeyPrefix          = "/measure/"
+	GroupsKeyPrefix           = "/groups/"
+)
+
+type RegistryOption func(*etcdSchemaRegistryConfig)
+
+func PreloadSchema() RegistryOption {
+	return func(config *etcdSchemaRegistryConfig) {
+		config.preload = true
+	}
+}
+
+func UseRandomTempDir() RegistryOption {
+	return func(config *etcdSchemaRegistryConfig) {
+		config.rootDir = RandomTempDir()
+	}
+}
+
+func RandomTempDir() string {
+	return path.Join(os.TempDir(), fmt.Sprintf("banyandb-embed-etcd-%s", googleUUID.New().String()))
+}

Review comment:
       Are they for testing purposes? If so, move them to `pkg/test`.

##########
File path: api/proto/banyandb/common/v1/common.proto
##########
@@ -36,3 +36,11 @@ message Metadata {
     string name = 2;
     uint32 id = 3;
 }
+
+// Group is an internal object for Group management
+message Group {
+    // name of the group
+    string name = 1;
+    // deleted marks whether this group has been deleted
+    bool deleted = 2;

Review comment:
       Why do we need this? Couldn't we delete the `Group` directly?
   
   BTW, `Group` needs an `updated_time` field, like the other resources.

##########
File path: banyand/metadata/metadata.go
##########
@@ -37,50 +39,99 @@ type IndexFilter interface {
 
 type Repo interface {
 	IndexFilter
-	Stream() schema.Stream
+	StreamRegistry() schema.Stream
+	IndexRuleRegistry() schema.IndexRule
+	IndexRuleBindingRegistry() schema.IndexRuleBinding
+	MeasureRegistry() schema.Measure
+	GroupRegistry() schema.Group
 }
 
 type Service interface {
 	Repo
-	run.Unit
+	run.PreRunner
+	run.Service
+	run.Config
 }
 
 type service struct {
-	stream           schema.Stream
-	indexRule        schema.IndexRule
-	indexRuleBinding schema.IndexRuleBinding
+	schemaRegistry    schema.Registry
+	stopCh            chan struct{}
+	clientListenerURL string
+	peerListenerURL   string
+	rootDir           string
 }
 
-func NewService(_ context.Context) (Service, error) {
-	stream, err := schema.NewStream()
-	if err != nil {
-		return nil, err
+func (s *service) FlagSet() *run.FlagSet {
+	fs := run.NewFlagSet("metadata")
+	fs.StringVarP(&s.clientListenerURL, "listener-client-url", "", embed.DefaultListenClientURLs,
+		"listener for client")
+	fs.StringVarP(&s.peerListenerURL, "listener-peer-url", "", embed.DefaultListenPeerURLs,
+		"listener for peer")

Review comment:
       In which scenario would users have to configure them? They look like internal components.
   
   In distributed mode, banyandb could leverage the discovery module to distribute this info across the cluster.

##########
File path: pkg/query/logical/analyzer.go
##########
@@ -77,25 +78,49 @@ func (t *Tag) GetFamilyName() string {
 }
 
 type Analyzer struct {
-	indexRuleRepo        metadataSchema.IndexRule
-	indexRuleBindingRepo metadataSchema.IndexRuleBinding
-	metadataRepoImpl     metadata.Repo
+	metadataRepoImpl metadata.Repo
 }
 
-func DefaultAnalyzer() *Analyzer {
-	indexRule, _ := metadataSchema.NewIndexRule()
-	indexRuleBinding, _ := metadataSchema.NewIndexRuleBinding()
-	metadataService, _ := metadata.NewService(context.TODO())
+// DefaultAnalyzer creates a default analyzer for testing.
+// You have to close the underlying metadata after test
+func DefaultAnalyzer() (*Analyzer, func(), error) {

Review comment:
       We should move this function into a test file since it's for testing. 

##########
File path: banyand/metadata/schema/etcd.go
##########
@@ -0,0 +1,520 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package schema
+
+import (
+	"context"
+	"fmt"
+	"math/rand"
+	"net/url"
+	"os"
+	"path"
+	"path/filepath"
+
+	googleUUID "github.com/google/uuid"
+	"github.com/pkg/errors"
+	clientv3 "go.etcd.io/etcd/client/v3"
+	"go.etcd.io/etcd/server/v3/embed"
+	"google.golang.org/protobuf/encoding/protojson"
+	"google.golang.org/protobuf/proto"
+
+	commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
+	databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
+)
+
+var (
+	_ Stream           = (*etcdSchemaRegistry)(nil)
+	_ IndexRuleBinding = (*etcdSchemaRegistry)(nil)
+	_ IndexRule        = (*etcdSchemaRegistry)(nil)
+	_ Measure          = (*etcdSchemaRegistry)(nil)
+	_ Group            = (*etcdSchemaRegistry)(nil)
+
+	ErrEntityNotFound             = errors.New("entity is not found")
+	ErrUnexpectedNumberOfEntities = errors.New("unexpected number of entities")
+	ErrGroupAlreadyDefined        = errors.New("group is already defined")
+	ErrGroupNotDefined            = errors.New("group is not defined or has already been deleted")
+
+	StreamKeyPrefix           = "/stream/"
+	IndexRuleBindingKeyPrefix = "/index-rule-binding/"
+	IndexRuleKeyPrefix        = "/index-rule/"
+	MeasureKeyPrefix          = "/measure/"
+	GroupsKeyPrefix           = "/groups/"
+)
+
+type RegistryOption func(*etcdSchemaRegistryConfig)
+
+func PreloadSchema() RegistryOption {
+	return func(config *etcdSchemaRegistryConfig) {
+		config.preload = true
+	}
+}
+
+func UseRandomTempDir() RegistryOption {
+	return func(config *etcdSchemaRegistryConfig) {
+		config.rootDir = RandomTempDir()
+	}
+}
+
+func RandomTempDir() string {
+	return path.Join(os.TempDir(), fmt.Sprintf("banyandb-embed-etcd-%s", googleUUID.New().String()))
+}
+
+func RootDir(rootDir string) RegistryOption {
+	return func(config *etcdSchemaRegistryConfig) {
+		config.rootDir = rootDir
+	}
+}
+
+func UseListener(client, peer string) RegistryOption {
+	return func(config *etcdSchemaRegistryConfig) {
+		config.listenerClientURL = client
+		config.listenerPeerURL = peer
+	}
+}
+
+func UseUnixDomain() RegistryOption {
+	return func(config *etcdSchemaRegistryConfig) {
+		config.listenerClientURL, config.listenerPeerURL = RandomUnixDomainListener()
+	}
+}
+
+func RandomUnixDomainListener() (string, string) {
+	i := rand.Uint64()
+	return fmt.Sprintf("%s://localhost:%d%06d", "unix", os.Getpid(), i), fmt.Sprintf("%s://localhost:%d%06d", "unix", os.Getpid(), i+1)
+}
+
+type etcdSchemaRegistry struct {
+	server *embed.Etcd
+	kv     clientv3.KV
+}
+
+type etcdSchemaRegistryConfig struct {
+	// preload internal schema
+	preload bool
+	// rootDir is the root directory for etcd storage
+	rootDir string
+	// listenerClientURL is the listener for client
+	listenerClientURL string
+	// listenerPeerURL is the listener for peer
+	listenerPeerURL string
+}
+
+func (e *etcdSchemaRegistry) ExistGroup(ctx context.Context, group string) (bool, error) {
+	var entity commonv1.Group
+	err := e.get(ctx, formatGroupKey(group), &entity)
+	if err != nil && !errors.Is(err, ErrEntityNotFound) {
+		return false, err
+	}
+	return entity.GetName() != "" && !entity.Deleted, nil
+}
+
+func (e *etcdSchemaRegistry) ListGroup(ctx context.Context) ([]string, error) {
+	messages, err := e.listWithPrefix(ctx, GroupsKeyPrefix, func() proto.Message {
+		return &commonv1.Group{}
+	})
+	if err != nil {
+		return nil, err
+	}
+	groups := make([]string, len(messages))
+	for i, message := range messages {
+		groups[i] = message.(*commonv1.Group).GetName()
+	}
+	return groups, nil
+}
+
+func (e *etcdSchemaRegistry) DeleteGroup(ctx context.Context, group string) (bool, error) {
+	exist, err := e.ExistGroup(ctx, group)
+	if err != nil {
+		return false, err
+	}
+	if !exist {
+		return false, errors.Wrap(ErrGroupNotDefined, group)
+	}
+	if err := e.update(ctx, formatGroupKey(group), &commonv1.Group{
+		Name:    group,
+		Deleted: true,
+	}); err != nil {
+		return false, err
+	}
+	return true, nil
+}
+
+func (e *etcdSchemaRegistry) CreateGroup(ctx context.Context, group string) error {
+	exist, err := e.ExistGroup(ctx, group)
+	if err != nil {
+		return err
+	}
+	if exist {
+		return errors.Wrap(ErrGroupAlreadyDefined, group)
+	}
+	return e.update(ctx, formatGroupKey(group), &commonv1.Group{
+		Name:    group,
+		Deleted: false,
+	})
+}
+
+func (e *etcdSchemaRegistry) GetMeasure(ctx context.Context, metadata *commonv1.Metadata) (*databasev1.Measure, error) {
+	var entity databasev1.Measure
+	if err := e.get(ctx, formatMeasureKey(metadata), &entity); err != nil {
+		return nil, err
+	}
+	return &entity, nil
+}
+
+func (e *etcdSchemaRegistry) ListMeasure(ctx context.Context, opt ListOpt) ([]*databasev1.Measure, error) {
+	keyPrefix := MeasureKeyPrefix
+	if opt.Group != "" {
+		keyPrefix += opt.Group + "/"
+	}
+	messages, err := e.listWithPrefix(ctx, keyPrefix, func() proto.Message {
+		return &databasev1.Measure{}
+	})
+	if err != nil {
+		return nil, err
+	}
+	entities := make([]*databasev1.Measure, len(messages))
+	for i, message := range messages {
+		entities[i] = message.(*databasev1.Measure)
+	}
+	return entities, nil
+}
+
+func (e *etcdSchemaRegistry) UpdateMeasure(ctx context.Context, measure *databasev1.Measure) error {
+	groupExist, err := e.ExistGroup(ctx, measure.GetMetadata().GetGroup())
+	if err != nil {
+		return err
+	}
+	if !groupExist {
+		return errors.Wrap(ErrGroupNotDefined, measure.GetMetadata().GetGroup())
+	}
+	return e.update(ctx, formatMeasureKey(measure.GetMetadata()), measure)
+}
+
+func (e *etcdSchemaRegistry) DeleteMeasure(ctx context.Context, metadata *commonv1.Metadata) (bool, error) {
+	return e.delete(ctx, formatMeasureKey(metadata))
+}
+
+func (e *etcdSchemaRegistry) GetStream(ctx context.Context, metadata *commonv1.Metadata) (*databasev1.Stream, error) {
+	var entity databasev1.Stream
+	if err := e.get(ctx, formatSteamKey(metadata), &entity); err != nil {
+		return nil, err
+	}
+	return &entity, nil
+}
+
+func (e *etcdSchemaRegistry) ListStream(ctx context.Context, opt ListOpt) ([]*databasev1.Stream, error) {
+	keyPrefix := StreamKeyPrefix
+	if opt.Group != "" {
+		keyPrefix += opt.Group + "/"
+	}
+	messages, err := e.listWithPrefix(ctx, keyPrefix, func() proto.Message {
+		return &databasev1.Stream{}
+	})
+	if err != nil {
+		return nil, err
+	}
+	entities := make([]*databasev1.Stream, len(messages))
+	for i, message := range messages {
+		entities[i] = message.(*databasev1.Stream)
+	}
+	return entities, nil
+}
+
+func (e *etcdSchemaRegistry) UpdateStream(ctx context.Context, stream *databasev1.Stream) error {
+	groupExist, err := e.ExistGroup(ctx, stream.GetMetadata().GetGroup())
+	if err != nil {
+		return err
+	}
+	if !groupExist {
+		return errors.Wrap(ErrGroupNotDefined, stream.GetMetadata().GetGroup())
+	}
+	return e.update(ctx, formatSteamKey(stream.GetMetadata()), stream)
+}
+
+func (e *etcdSchemaRegistry) DeleteStream(ctx context.Context, metadata *commonv1.Metadata) (bool, error) {
+	return e.delete(ctx, formatSteamKey(metadata))
+}
+
+func (e *etcdSchemaRegistry) GetIndexRuleBinding(ctx context.Context, metadata *commonv1.Metadata) (*databasev1.IndexRuleBinding, error) {
+	var indexRuleBinding databasev1.IndexRuleBinding
+	if err := e.get(ctx, formatIndexRuleBindingKey(metadata), &indexRuleBinding); err != nil {
+		return nil, err
+	}
+	return &indexRuleBinding, nil
+}
+
+func (e *etcdSchemaRegistry) ListIndexRuleBinding(ctx context.Context, opt ListOpt) ([]*databasev1.IndexRuleBinding, error) {
+	keyPrefix := IndexRuleBindingKeyPrefix
+	if opt.Group != "" {
+		keyPrefix += opt.Group + "/"
+	}
+	messages, err := e.listWithPrefix(ctx, keyPrefix, func() proto.Message {
+		return &databasev1.IndexRuleBinding{}
+	})
+	if err != nil {
+		return nil, err
+	}
+	entities := make([]*databasev1.IndexRuleBinding, len(messages))
+	for i, message := range messages {
+		entities[i] = message.(*databasev1.IndexRuleBinding)
+	}
+	return entities, nil
+}
+
+func (e *etcdSchemaRegistry) UpdateIndexRuleBinding(ctx context.Context, indexRuleBinding *databasev1.IndexRuleBinding) error {
+	groupExist, err := e.ExistGroup(ctx, indexRuleBinding.GetMetadata().GetGroup())
+	if err != nil {
+		return err
+	}
+	if !groupExist {
+		return errors.Wrap(ErrGroupNotDefined, indexRuleBinding.GetMetadata().GetGroup())
+	}
+	return e.update(ctx, formatIndexRuleBindingKey(indexRuleBinding.GetMetadata()), indexRuleBinding)
+}
+
+func (e *etcdSchemaRegistry) DeleteIndexRuleBinding(ctx context.Context, metadata *commonv1.Metadata) (bool, error) {
+	return e.delete(ctx, formatIndexRuleBindingKey(metadata))
+}
+
+func (e *etcdSchemaRegistry) GetIndexRule(ctx context.Context, metadata *commonv1.Metadata) (*databasev1.IndexRule, error) {
+	var entity databasev1.IndexRule
+	if err := e.get(ctx, formatIndexRuleKey(metadata), &entity); err != nil {
+		return nil, err
+	}
+	return &entity, nil
+}
+
+func (e *etcdSchemaRegistry) ListIndexRule(ctx context.Context, opt ListOpt) ([]*databasev1.IndexRule, error) {
+	keyPrefix := IndexRuleKeyPrefix
+	if opt.Group != "" {
+		keyPrefix += opt.Group + "/"
+	}
+	messages, err := e.listWithPrefix(ctx, keyPrefix, func() proto.Message {
+		return &databasev1.IndexRule{}
+	})
+	if err != nil {
+		return nil, err
+	}
+	entities := make([]*databasev1.IndexRule, len(messages))
+	for i, message := range messages {
+		entities[i] = message.(*databasev1.IndexRule)
+	}
+	return entities, nil
+}
+
+func (e *etcdSchemaRegistry) UpdateIndexRule(ctx context.Context, indexRule *databasev1.IndexRule) error {
+	groupExist, err := e.ExistGroup(ctx, indexRule.GetMetadata().GetGroup())
+	if err != nil {
+		return err
+	}
+	if !groupExist {
+		return errors.Wrap(ErrGroupNotDefined, indexRule.GetMetadata().GetGroup())
+	}
+	return e.update(ctx, formatIndexRuleKey(indexRule.GetMetadata()), indexRule)
+}
+
+func (e *etcdSchemaRegistry) DeleteIndexRule(ctx context.Context, metadata *commonv1.Metadata) (bool, error) {
+	return e.delete(ctx, formatIndexRuleKey(metadata))
+}
+
+func (e *etcdSchemaRegistry) preload() error {
+	if err := e.CreateGroup(context.TODO(), "default"); err != nil {
+		return err
+	}
+
+	s := &databasev1.Stream{}
+	if err := protojson.Unmarshal([]byte(streamJSON), s); err != nil {
+		return err
+	}
+	err := e.UpdateStream(context.Background(), s)
+	if err != nil {
+		return err
+	}
+
+	indexRuleBinding := &databasev1.IndexRuleBinding{}
+	if err = protojson.Unmarshal([]byte(indexRuleBindingJSON), indexRuleBinding); err != nil {
+		return err
+	}
+	err = e.UpdateIndexRuleBinding(context.Background(), indexRuleBinding)
+	if err != nil {
+		return err
+	}
+
+	entries, err := indexRuleStore.ReadDir(indexRuleDir)
+	if err != nil {
+		return err
+	}
+	for _, entry := range entries {
+		data, err := indexRuleStore.ReadFile(indexRuleDir + "/" + entry.Name())
+		if err != nil {
+			return err
+		}
+		var idxRule databasev1.IndexRule
+		err = protojson.Unmarshal(data, &idxRule)
+		if err != nil {
+			return err
+		}
+		err = e.UpdateIndexRule(context.Background(), &idxRule)
+		if err != nil {
+			return err
+		}
+	}
+
+	return nil
+}
+
+func (e *etcdSchemaRegistry) Close() error {
+	e.server.Close()
+	return nil
+}
+
+func NewEtcdSchemaRegistry(options ...RegistryOption) (Registry, error) {
+	registryConfig := &etcdSchemaRegistryConfig{
+		rootDir:           os.TempDir(),
+		listenerClientURL: embed.DefaultListenClientURLs,
+		listenerPeerURL:   embed.DefaultListenPeerURLs,
+	}
+	for _, opt := range options {
+		opt(registryConfig)
+	}
+	// TODO: allow use cluster setting
+	embedConfig := newStandaloneEtcdConfig(registryConfig)
+	e, err := embed.StartEtcd(embedConfig)
+	if err != nil {
+		return nil, err
+	}
+	if e != nil {
+		<-e.Server.ReadyNotify() // wait for e.Server to join the cluster
+	}
+	client, err := clientv3.NewFromURL(e.Config().ACUrls[0].String())
+	if err != nil {
+		return nil, err
+	}
+	kvClient := clientv3.NewKV(client)
+	reg := &etcdSchemaRegistry{
+		server: e,
+		kv:     kvClient,
+	}
+	if registryConfig.preload {
+		err := reg.preload()
+		if err != nil {
+			return nil, err
+		}
+	}
+	return reg, nil
+}
+
+func (e *etcdSchemaRegistry) get(ctx context.Context, key string, message proto.Message) error {
+	resp, err := e.kv.Get(ctx, key)
+	if err != nil {
+		return err
+	}
+	if resp.Count == 0 {
+		return ErrEntityNotFound
+	}
+	if resp.Count > 1 {
+		return ErrUnexpectedNumberOfEntities
+	}
+	if err := proto.Unmarshal(resp.Kvs[0].Value, message); err != nil {
+		return err
+	}
+	return nil
+}
+
+func (e *etcdSchemaRegistry) update(ctx context.Context, key string, message proto.Message) error {
+	val, err := proto.Marshal(message)
+	if err != nil {
+		return err
+	}
+	_, err = e.kv.Put(ctx, key, string(val))
+	if err != nil {
+		return err
+	}
+	return nil
+}
+
+func (e *etcdSchemaRegistry) listWithPrefix(ctx context.Context, prefix string, factory func() proto.Message) ([]proto.Message, error) {
+	resp, err := e.kv.Get(ctx, prefix, clientv3.WithFromKey(), clientv3.WithRange(incrementLastByte(prefix)))
+	if err != nil {
+		return nil, err
+	}
+	if resp.Count == 0 {
+		return nil, ErrEntityNotFound
+	}
+	entities := make([]proto.Message, resp.Count)
+	for i := int64(0); i < resp.Count; i++ {
+		message := factory()
+		if err := proto.Unmarshal(resp.Kvs[0].Value, message); err != nil {
+			return nil, err
+		}
+		entities[i] = message
+	}
+	return entities, nil
+}
+
+func (e *etcdSchemaRegistry) delete(ctx context.Context, key string) (bool, error) {
+	resp, err := e.kv.Delete(ctx, key)
+	if err != nil {
+		return false, err
+	}
+	return resp.Deleted > 0, nil
+}
+
+func formatIndexRuleKey(metadata *commonv1.Metadata) string {
+	return formatKey(IndexRuleKeyPrefix, metadata)
+}
+
+func formatIndexRuleBindingKey(metadata *commonv1.Metadata) string {
+	return formatKey(IndexRuleBindingKeyPrefix, metadata)
+}
+
+func formatSteamKey(metadata *commonv1.Metadata) string {
+	return formatKey(StreamKeyPrefix, metadata)
+}
+
+func formatMeasureKey(metadata *commonv1.Metadata) string {
+	return formatKey(MeasureKeyPrefix, metadata)
+}
+
+func formatKey(prefix string, metadata *commonv1.Metadata) string {
+	return prefix + metadata.GetGroup() + "/" + metadata.GetName()
+}
+
+func formatGroupKey(group string) string {
+	return GroupsKeyPrefix + group
+}
+
+func incrementLastByte(key string) string {
+	bb := []byte(key)
+	bb[len(bb)-1]++
+	return string(bb)
+}
+
+func newStandaloneEtcdConfig(config *etcdSchemaRegistryConfig) *embed.Config {
+	cfg := embed.NewConfig()
+	// TODO: allow user to set path
+	cfg.Dir = filepath.Join(config.rootDir, "embed-etcd")

Review comment:
       Replace `embed-etcd` with `metadata`.

##########
File path: banyand/metadata/metadata.go
##########
@@ -37,50 +39,99 @@ type IndexFilter interface {
 
 type Repo interface {
 	IndexFilter
-	Stream() schema.Stream
+	StreamRegistry() schema.Stream
+	IndexRuleRegistry() schema.IndexRule
+	IndexRuleBindingRegistry() schema.IndexRuleBinding
+	MeasureRegistry() schema.Measure
+	GroupRegistry() schema.Group
 }
 
 type Service interface {
 	Repo
-	run.Unit
+	run.PreRunner
+	run.Service
+	run.Config
 }
 
 type service struct {
-	stream           schema.Stream
-	indexRule        schema.IndexRule
-	indexRuleBinding schema.IndexRuleBinding
+	schemaRegistry    schema.Registry
+	stopCh            chan struct{}
+	clientListenerURL string
+	peerListenerURL   string
+	rootDir           string
 }
 
-func NewService(_ context.Context) (Service, error) {
-	stream, err := schema.NewStream()
-	if err != nil {
-		return nil, err
+func (s *service) FlagSet() *run.FlagSet {
+	fs := run.NewFlagSet("metadata")
+	fs.StringVarP(&s.clientListenerURL, "listener-client-url", "", embed.DefaultListenClientURLs,
+		"listener for client")
+	fs.StringVarP(&s.peerListenerURL, "listener-peer-url", "", embed.DefaultListenPeerURLs,
+		"listener for peer")
+	fs.StringVarP(&s.rootDir, "etcd-root-path", "", "/tmp", "the root path of database")
+	return fs
+}
+
+func (s *service) Validate() error {
+	if s.clientListenerURL == "" || s.peerListenerURL == "" {
+		return errors.New("listener cannot be set to empty")
 	}
-	indexRule, err := schema.NewIndexRule()
-	if err != nil {
-		return nil, err
+	if s.rootDir == "" {
+		return errors.New("rootDir is empty")
 	}
-	indexRuleBinding, err := schema.NewIndexRuleBinding()
-	if err != nil {
-		return nil, err
+	return nil
+}
+
+func (s *service) PreRun() error {
+	var err error
+	s.schemaRegistry, err = schema.NewEtcdSchemaRegistry(schema.PreloadSchema(),
+		schema.UseListener(s.clientListenerURL, s.peerListenerURL),
+		schema.RootDir(s.rootDir))
+	return err
+}
+
+func (s *service) Serve() error {
+	s.stopCh = make(chan struct{})

Review comment:
       Could you use etcd's `StoppingNotify` to block this method in case of etcd server error?







[GitHub] [skywalking-banyandb] hanahmily commented on pull request #55: Feat: metadata registry with embedded etcd

Posted by GitBox <gi...@apache.org>.
hanahmily commented on pull request #55:
URL: https://github.com/apache/skywalking-banyandb/pull/55#issuecomment-943279077


   > @hanahmily It seems `protoc` is included in `buf`, and the one we've installed is not actually used by `buf generate`.
   
   After some local verification, I got the same result. How about removing the `protoc` installation process from `base.mk`?





[GitHub] [skywalking-banyandb] lujiajing1126 commented on a change in pull request #55: Feat: metadata registry with embedded etcd

Posted by GitBox <gi...@apache.org>.
lujiajing1126 commented on a change in pull request #55:
URL: https://github.com/apache/skywalking-banyandb/pull/55#discussion_r729012657



##########
File path: api/proto/banyandb/database/v1/rpc.proto
##########
@@ -0,0 +1,77 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+syntax = "proto3";
+
+option java_package = "org.apache.skywalking.banyandb.database.v1";
+option go_package = "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1";
+
+package banyandb.database.v1;
+
+import "banyandb/database/v1/schema.proto";
+import "banyandb/common/v1/common.proto";
+import "google/protobuf/struct.proto";
+
+// EntityType includes three kinds of entities (schema)
+// that can be create/delete/modified at runtime.
+enum EntityType {
+  ENTITY_TYPE_UNSPECIFIED = 0;
+  ENTITY_TYPE_STREAM = 1;
+  ENTITY_TYPE_INDEX_RULE_BINDING = 2;
+  ENTITY_TYPE_INDEX_RULE = 3;
+}
+
+// EntityCreateOrUpdateRequest is the request for creating
+// or updating entity.
+message EntityCreateOrUpdateRequest {
+  oneof entity {
+    banyandb.database.v1.Stream stream = 1;
+    banyandb.database.v1.IndexRuleBinding index_rule_binding = 2;
+    banyandb.database.v1.IndexRule index_rule = 3;
+  }
+}
+
+message EntityCreateOrUpdateResponse {
+}
+
+// GeneralEntityRequest can be used to delete, get entity
+// with the given entity_type and its metadata.
+message GeneralEntityRequest {
+  banyandb.common.v1.Metadata metadata = 1;
+  EntityType entity_type = 2;
+}
+
+message EntityDeleteResponse {
+}
+
+// EntityGetResponse is the response for Get method.
+// It contains an entity which is possibly nullable which
+// means the entity cannot be found
+message EntityGetResponse {
+  oneof entity {
+    google.protobuf.NullValue null = 1;
+    banyandb.database.v1.Stream stream = 2;
+    banyandb.database.v1.IndexRuleBinding index_rule_binding = 3;
+    banyandb.database.v1.IndexRule index_rule = 4;
+  }
+}
+
+service EntityRegistry {
+  rpc CreateOrUpdate(EntityCreateOrUpdateRequest) returns (EntityCreateOrUpdateResponse);

Review comment:
       Done



