You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicecomb.apache.org by li...@apache.org on 2022/01/25 13:27:32 UTC
[servicecomb-service-center] branch master updated: [feat] add SyncAll func (#1245)
This is an automated email from the ASF dual-hosted git repository.
littlecui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/servicecomb-service-center.git
The following commit(s) were added to refs/heads/master by this push:
new ae18050 [feat] add SyncAll func (#1245)
ae18050 is described below
commit ae180500f95f699e6c7ac071c681ec0a1ba00d12
Author: robotljw <79...@qq.com>
AuthorDate: Tue Jan 25 21:27:23 2022 +0800
[feat] add SyncAll func (#1245)
---
datasource/datasource.go | 1 +
datasource/etcd/etcd.go | 36 ++-
datasource/etcd/path/key_generator_test.go | 2 +
datasource/etcd/sync.go | 405 ++++++++++++++++++++++++
datasource/etcd/sync_test.go | 474 ++++++++++++++++++++++++++++
datasource/manager.go | 8 +
datasource/mongo/mongo.go | 6 +
datasource/{datasource.go => mongo/sync.go} | 23 +-
datasource/{datasource.go => sync.go} | 20 +-
9 files changed, 944 insertions(+), 31 deletions(-)
diff --git a/datasource/datasource.go b/datasource/datasource.go
index ae0727b..0c8a8cb 100644
--- a/datasource/datasource.go
+++ b/datasource/datasource.go
@@ -24,4 +24,5 @@ type DataSource interface {
MetadataManager() MetadataManager
SCManager() SCManager
MetricsManager() MetricsManager
+ SyncManager() SyncManager
}
diff --git a/datasource/etcd/etcd.go b/datasource/etcd/etcd.go
index 55884bf..e4f0cdd 100644
--- a/datasource/etcd/etcd.go
+++ b/datasource/etcd/etcd.go
@@ -1,17 +1,19 @@
-// Licensed to the Apache Software Foundation (ASF) under one or more
-// contributor license agreements. See the NOTICE file distributed with
-// this work for additional information regarding copyright ownership.
-// The ASF licenses this file to You under the Apache License, Version 2.0
-// (the "License"); you may not use this file except in compliance with
-// the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package etcd
@@ -55,6 +57,7 @@ type DataSource struct {
depManager datasource.DependencyManager
scManager datasource.SCManager
metricsManager datasource.MetricsManager
+ syncManager datasource.SyncManager
}
func (ds *DataSource) SystemManager() datasource.SystemManager {
@@ -77,6 +80,10 @@ func (ds *DataSource) MetricsManager() datasource.MetricsManager {
return ds.metricsManager
}
+func (ds *DataSource) SyncManager() datasource.SyncManager {
+ return ds.syncManager
+}
+
func NewDataSource(opts datasource.Options) (datasource.DataSource, error) {
log.Warn("data source enable etcd mode")
@@ -105,6 +112,7 @@ func NewDataSource(opts datasource.Options) (datasource.DataSource, error) {
inst.depManager = &DepManager{}
inst.scManager = &SCManager{}
inst.metricsManager = &MetricsManager{}
+ inst.syncManager = &SyncManager{}
return inst, nil
}
diff --git a/datasource/etcd/path/key_generator_test.go b/datasource/etcd/path/key_generator_test.go
index 7e132d8..82541c3 100644
--- a/datasource/etcd/path/key_generator_test.go
+++ b/datasource/etcd/path/key_generator_test.go
@@ -41,9 +41,11 @@ func TestGenerateETCDDomainKey(t *testing.T) {
func TestGenerateAccountKey(t *testing.T) {
assert.Equal(t, "/cse-sr/accounts/admin", path.GenerateAccountKey("admin"))
}
+
func TestGenerateAccountSecretKey(t *testing.T) {
assert.Equal(t, "/cse-sr/rbac/secret", path.GenerateRBACSecretKey())
}
+
func TestGenerateDependencyRuleKey(t *testing.T) {
// consumer
k := path.GenerateConsumerDependencyRuleKey("a", nil)
diff --git a/datasource/etcd/sync.go b/datasource/etcd/sync.go
new file mode 100644
index 0000000..0c0c5d4
--- /dev/null
+++ b/datasource/etcd/sync.go
@@ -0,0 +1,405 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except request compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to request writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package etcd
+
+import (
+ "context"
+ "encoding/json"
+ "errors"
+ "fmt"
+ "strings"
+
+ "github.com/go-chassis/cari/discovery"
+ crbac "github.com/go-chassis/cari/rbac"
+ "github.com/little-cui/etcdadpt"
+
+ "github.com/apache/servicecomb-service-center/datasource"
+ "github.com/apache/servicecomb-service-center/datasource/etcd/path"
+ esync "github.com/apache/servicecomb-service-center/datasource/etcd/sync"
+ "github.com/apache/servicecomb-service-center/pkg/log"
+ putil "github.com/apache/servicecomb-service-center/pkg/util"
+ "github.com/apache/servicecomb-service-center/server/config"
+)
+
+const (
+ SyncAllKey = "/cse-sr/sync-all"
+)
+
+var (
+ ErrWithoutDomainProject = errors.New("key without domain and project")
+)
+
+type SyncManager struct {
+}
+
+// SyncAll will list all services,accounts,roles,schemas,tags,deps and use tasks to store
+func (s *SyncManager) SyncAll(ctx context.Context) error {
+ enable := config.GetBool("sync.enableOnStart", false)
+ if !enable {
+ return nil
+ }
+ exist, err := etcdadpt.Exist(ctx, SyncAllKey)
+ if err != nil {
+ return err
+ }
+ if exist {
+ log.Info(fmt.Sprintf("%s key already exists, do not need to do tasks", SyncAllKey))
+ return datasource.ErrSyncAllKeyExists
+ }
+ err = syncAllAccounts(ctx)
+ if err != nil {
+ return err
+ }
+ err = syncAllRoles(ctx)
+ if err != nil {
+ return err
+ }
+ err = syncAllServices(ctx)
+ if err != nil {
+ return err
+ }
+ err = syncAllTags(ctx)
+ if err != nil {
+ return err
+ }
+ err = syncAllSchemas(ctx)
+ if err != nil {
+ return err
+ }
+ err = syncAllDependencies(ctx)
+ if err != nil {
+ return err
+ }
+ return etcdadpt.Put(ctx, SyncAllKey, "1")
+}
+
+func syncAllAccounts(ctx context.Context) error {
+ kvs, _, err := etcdadpt.List(ctx, path.GenerateRBACAccountKey(""))
+ if err != nil {
+ return err
+ }
+ syncOpts := make([]etcdadpt.OpOptions, 0)
+ putil.SetDomain(ctx, "")
+ putil.SetProject(ctx, "")
+ for _, v := range kvs {
+ a := &crbac.Account{}
+ err = json.Unmarshal(v.Value, a)
+ if err != nil {
+ log.Error("fail to unmarshal account ", err)
+ return err
+ }
+ opt, err := esync.GenCreateOpts(ctx, datasource.ResourceAccount, a)
+ if err != nil {
+ log.Error("fail to create sync opts", err)
+ return err
+ }
+ syncOpts = append(syncOpts, opt...)
+ }
+ err = etcdadpt.Txn(ctx, syncOpts)
+ if err != nil {
+ log.Error("fail to account tasks", err)
+ }
+ return err
+}
+
+func syncAllRoles(ctx context.Context) error {
+ kvs, _, err := etcdadpt.List(ctx, path.GenerateRBACRoleKey(""))
+ if err != nil {
+ return err
+ }
+ syncOpts := make([]etcdadpt.OpOptions, 0)
+ putil.SetDomain(ctx, "")
+ putil.SetProject(ctx, "")
+ for _, v := range kvs {
+ r := &crbac.Role{}
+ err = json.Unmarshal(v.Value, r)
+ if err != nil {
+ log.Error("fail to unmarshal role", err)
+ return err
+ }
+ opt, err := esync.GenCreateOpts(ctx, datasource.ResourceRole, r)
+ if err != nil {
+ log.Error("fail to create sync opts", err)
+ return err
+ }
+ syncOpts = append(syncOpts, opt...)
+ }
+ err = etcdadpt.Txn(ctx, syncOpts)
+ if err != nil {
+ log.Error("fail to role tasks", err)
+ }
+ return err
+}
+
+// syncAllTags func use kv resource task to store tags
+func syncAllTags(ctx context.Context) error {
+ kvs, _, err := etcdadpt.List(ctx, path.GetServiceTagRootKey(""))
+ if err != nil {
+ return err
+ }
+ syncOpts := make([]etcdadpt.OpOptions, 0)
+ for _, kv := range kvs {
+ domain, project, err := getDomainProject(string(kv.Key), path.GetServiceTagRootKey(""))
+ if err != nil {
+ log.Error("fail to get domain and project", err)
+ return err
+ }
+ putil.SetDomain(ctx, domain)
+ putil.SetProject(ctx, project)
+ opts, err := esync.GenCreateOpts(ctx, datasource.ResourceKV, kv.Value,
+ esync.WithOpts(map[string]string{"key": string(kv.Key)}))
+ if err != nil {
+ log.Error("fail to create tag opts", err)
+ return err
+ }
+ syncOpts = append(syncOpts, opts...)
+ }
+ err = etcdadpt.Txn(ctx, syncOpts)
+ if err != nil {
+ log.Error("fail to create tag tasks", err)
+ }
+ return err
+}
+
+func syncAllServices(ctx context.Context) error {
+ kvs, _, err := etcdadpt.List(ctx, path.GetServiceRootKey(""))
+ if err != nil {
+ return err
+ }
+ syncOpts := make([]etcdadpt.OpOptions, 0)
+ for _, kv := range kvs {
+ service := &discovery.MicroService{}
+ err := json.Unmarshal(kv.Value, service)
+ if err != nil {
+ log.Error("fail to unmarshal service", err)
+ return err
+ }
+ domain, project, err := getDomainProject(string(kv.Key), path.GetServiceRootKey(""))
+ if err != nil {
+ log.Error("fail to get domain and project", err)
+ return err
+ }
+ putil.SetDomain(ctx, domain)
+ putil.SetProject(ctx, project)
+ request := &discovery.CreateServiceRequest{
+ Service: service,
+ }
+ opts, err := esync.GenCreateOpts(ctx, datasource.ResourceService, request)
+ if err != nil {
+ log.Error("fail to create service task", err)
+ return err
+ }
+ syncOpts = append(syncOpts, opts...)
+ }
+ err = etcdadpt.Txn(ctx, syncOpts)
+ if err != nil {
+ log.Error("fail to create service tasks", err)
+ }
+ return err
+}
+
+// syncAllSchemas func use kv resource task to store schemas
+func syncAllSchemas(ctx context.Context) error {
+ putil.SetDomain(ctx, "")
+ putil.SetProject(ctx, "")
+ err := syncAllServiceSchemas(ctx)
+ if err != nil {
+ return err
+ }
+ err = syncAllServiceSchemaRefs(ctx)
+ if err != nil {
+ return err
+ }
+ err = syncAllServiceSchemaContents(ctx)
+ if err != nil {
+ return err
+ }
+ return syncAllServiceSchemaSummaries(ctx)
+}
+
+func syncAllServiceSchemas(ctx context.Context) error {
+ kvs, _, err := etcdadpt.List(ctx, path.GetServiceSchemaRootKey(""))
+ if err != nil {
+ return err
+ }
+ syncOpts := make([]etcdadpt.OpOptions, 0)
+ for _, kv := range kvs {
+ domain, project, err := getDomainProject(string(kv.Key), path.GetServiceSchemaRootKey(""))
+ if err != nil {
+ log.Error("fail to get domain and project", err)
+ return err
+ }
+ putil.SetDomain(ctx, domain)
+ putil.SetProject(ctx, project)
+ opts, err := esync.GenCreateOpts(ctx, datasource.ResourceKV, kv.Value,
+ esync.WithOpts(map[string]string{"key": string(kv.Key)}))
+ if err != nil {
+ log.Error("fail to create schema opts", err)
+ return err
+ }
+ syncOpts = append(syncOpts, opts...)
+ }
+ err = etcdadpt.Txn(ctx, syncOpts)
+ if err != nil {
+ log.Error("fail to create schema tasks", err)
+ }
+ return err
+}
+
+func syncAllServiceSchemaRefs(ctx context.Context) error {
+ kvs, _, err := etcdadpt.List(ctx, path.GetServiceSchemaRefRootKey(""))
+ if err != nil {
+ return err
+ }
+ syncOpts := make([]etcdadpt.OpOptions, 0)
+ for _, kv := range kvs {
+ domain, project, err := getDomainProject(string(kv.Key), path.GetServiceSchemaRefRootKey(""))
+ if err != nil {
+ log.Error("fail to get domain and project", err)
+ return err
+ }
+ putil.SetDomain(ctx, domain)
+ putil.SetProject(ctx, project)
+ opts, err := esync.GenCreateOpts(ctx, datasource.ResourceKV, kv.Value,
+ esync.WithOpts(map[string]string{"key": string(kv.Key)}))
+ if err != nil {
+ log.Error("fail to create schema ref opts", err)
+ return err
+ }
+ syncOpts = append(syncOpts, opts...)
+ }
+ err = etcdadpt.Txn(ctx, syncOpts)
+ if err != nil {
+ log.Error("fail to create schema ref tasks", err)
+ }
+ return err
+}
+
+func syncAllServiceSchemaContents(ctx context.Context) error {
+ kvs, _, err := etcdadpt.List(ctx, path.GetServiceSchemaContentRootKey(""))
+ if err != nil {
+ return err
+ }
+ syncOpts := make([]etcdadpt.OpOptions, 0)
+ for _, kv := range kvs {
+ domain, project, err := getDomainProject(string(kv.Key), path.GetServiceSchemaContentRootKey(""))
+ if err != nil {
+ log.Error("fail to get domain and project", err)
+ return err
+ }
+ putil.SetDomain(ctx, domain)
+ putil.SetProject(ctx, project)
+ opts, err := esync.GenCreateOpts(ctx, datasource.ResourceKV, kv.Value,
+ esync.WithOpts(map[string]string{"key": string(kv.Key)}))
+ if err != nil {
+ log.Error("fail to create schema content opts", err)
+ return err
+ }
+ syncOpts = append(syncOpts, opts...)
+ }
+ err = etcdadpt.Txn(ctx, syncOpts)
+ if err != nil {
+ log.Error("fail to create schema content tasks", err)
+ }
+ return err
+}
+
+func syncAllServiceSchemaSummaries(ctx context.Context) error {
+ kvs, _, err := etcdadpt.List(ctx, path.GetServiceSchemaSummaryRootKey(""))
+ if err != nil {
+ return err
+ }
+ syncOpts := make([]etcdadpt.OpOptions, 0)
+ for _, kv := range kvs {
+ domain, project, err := getDomainProject(string(kv.Key), path.GetServiceSchemaSummaryRootKey(""))
+ if err != nil {
+ log.Error("fail to get domain and project", err)
+ return err
+ }
+ putil.SetDomain(ctx, domain)
+ putil.SetProject(ctx, project)
+ opts, err := esync.GenCreateOpts(ctx, datasource.ResourceKV, kv.Value,
+ esync.WithOpts(map[string]string{"key": string(kv.Key)}))
+ if err != nil {
+ log.Error("fail to create schema summary opts", err)
+ return err
+ }
+ syncOpts = append(syncOpts, opts...)
+ }
+ err = etcdadpt.Txn(ctx, syncOpts)
+ if err != nil {
+ log.Error("fail to create schema summary tasks", err)
+ }
+ return err
+}
+
+func syncAllDependencies(ctx context.Context) error {
+ kvs, _, err := etcdadpt.List(ctx, path.GetServiceDependencyQueueRootKey(""))
+ if err != nil {
+ return err
+ }
+ syncOpts := make([]etcdadpt.OpOptions, 0)
+ depInfosMap := make(map[string][]*discovery.ConsumerDependency)
+ for _, kv := range kvs {
+ dep := &discovery.ConsumerDependency{}
+ err = json.Unmarshal(kv.Value, dep)
+ if err != nil {
+ log.Error("fail to unmarshal dependency ", err)
+ return err
+ }
+ domain, project, err := getDomainProject(string(kv.Key), path.GetServiceDependencyQueueRootKey(""))
+ if err != nil {
+ log.Error("fail to get domain and project", err)
+ return err
+ }
+ key := domain + "/" + project
+ depInfosMap[key] = append(depInfosMap[key], dep)
+ }
+ for key, dependencies := range depInfosMap {
+ splitKey := strings.Split(key, "/")
+ domain, project := splitKey[0], splitKey[1]
+ putil.SetDomain(ctx, domain)
+ putil.SetProject(ctx, project)
+ opts, err := esync.GenUpdateOpts(ctx, datasource.ResourceDependency, dependencies)
+ if err != nil {
+ log.Error("fail to create dep opts", err)
+ return err
+ }
+ syncOpts = append(syncOpts, opts...)
+ }
+ err = etcdadpt.Txn(ctx, syncOpts)
+ if err != nil {
+ log.Error("fail to create dep tasks", err)
+ }
+ return err
+}
+
+func getDomainProject(key string, prefixKey string) (domain string, project string, err error) {
+ splitKey := strings.Split(key, prefixKey)
+ if len(splitKey) != 2 {
+ return "", "", ErrWithoutDomainProject
+ }
+ suffixKey := splitKey[len(splitKey)-1]
+ splitStr := strings.Split(suffixKey, "/")
+ if len(splitStr) < 2 {
+ return "", "", ErrWithoutDomainProject
+ }
+ domain = splitStr[0]
+ project = splitStr[1]
+ return
+}
diff --git a/datasource/etcd/sync_test.go b/datasource/etcd/sync_test.go
new file mode 100644
index 0000000..f4400b0
--- /dev/null
+++ b/datasource/etcd/sync_test.go
@@ -0,0 +1,474 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except request compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to request writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package etcd_test
+
+import (
+ "context"
+ "strconv"
+ "testing"
+
+ pb "github.com/go-chassis/cari/discovery"
+ crbac "github.com/go-chassis/cari/rbac"
+ "github.com/go-chassis/cari/sync"
+ "github.com/go-chassis/go-archaius"
+ "github.com/little-cui/etcdadpt"
+ "github.com/stretchr/testify/assert"
+
+ "github.com/apache/servicecomb-service-center/datasource"
+ "github.com/apache/servicecomb-service-center/datasource/etcd"
+ "github.com/apache/servicecomb-service-center/datasource/rbac"
+ "github.com/apache/servicecomb-service-center/datasource/schema"
+ "github.com/apache/servicecomb-service-center/eventbase/model"
+ "github.com/apache/servicecomb-service-center/eventbase/service/task"
+ "github.com/apache/servicecomb-service-center/eventbase/service/tombstone"
+ "github.com/apache/servicecomb-service-center/pkg/util"
+ _ "github.com/apache/servicecomb-service-center/test"
+)
+
+func syncAllContext() context.Context {
+ ctx := util.WithNoCache(util.SetDomainProject(context.Background(), "sync-all", "sync-all"))
+ return util.WithNoCache(util.SetContext(ctx, util.CtxEnableSync, "1"))
+}
+
+func TestSyncAll(t *testing.T) {
+ t.Run("enableOnStart is false will not do sync", func(t *testing.T) {
+ _ = archaius.Set("sync.enableOnStart", false)
+ err := datasource.GetSyncManager().SyncAll(syncAllContext())
+ assert.Nil(t, err)
+ })
+
+ t.Run("enableOnStart is true and syncAllKey exists will not do sync", func(t *testing.T) {
+ _ = archaius.Set("sync.enableOnStart", true)
+ err := etcdadpt.Put(syncAllContext(), etcd.SyncAllKey, "1")
+ assert.Nil(t, err)
+ err = datasource.GetSyncManager().SyncAll(syncAllContext())
+ assert.Equal(t, datasource.ErrSyncAllKeyExists, err)
+ isDeleted, err := etcdadpt.Delete(syncAllContext(), etcd.SyncAllKey)
+ assert.Equal(t, isDeleted, true)
+ assert.Nil(t, err)
+ })
+
+ t.Run("enableOnStart is true and syncAllKey not exists will do sync", func(t *testing.T) {
+ _ = archaius.Set("sync.enableOnStart", true)
+ var serviceID string
+ var accountName string
+ var roleName string
+ var consumerID string
+ var providerID string
+ t.Run("register a service and delete the task should pass", func(t *testing.T) {
+ resp, err := datasource.GetMetadataManager().RegisterService(syncAllContext(), &pb.CreateServiceRequest{
+ Service: &pb.MicroService{
+ AppId: "sync_micro_service_group",
+ ServiceName: "sync_micro_service_sync_all",
+ Version: "1.0.0",
+ Level: "FRONT",
+ Status: pb.MS_UP,
+ },
+ })
+ assert.NotNil(t, resp)
+ assert.NoError(t, err)
+ assert.Equal(t, pb.ResponseSuccess, resp.Response.GetCode())
+ serviceID = resp.ServiceId
+ listTaskReq := model.ListTaskRequest{
+ Domain: "sync-all",
+ Project: "sync-all",
+ ResourceType: datasource.ResourceService,
+ Action: sync.CreateAction,
+ Status: sync.PendingStatus,
+ }
+ tasks, err := task.List(syncAllContext(), &listTaskReq)
+ assert.NoError(t, err)
+ assert.Equal(t, 1, len(tasks))
+ err = task.Delete(syncAllContext(), tasks...)
+ assert.NoError(t, err)
+ tasks, err = task.List(syncAllContext(), &listTaskReq)
+ assert.NoError(t, err)
+ assert.Equal(t, 0, len(tasks))
+ })
+ t.Run("create a account and delete the task should pass", func(t *testing.T) {
+ a1 := crbac.Account{
+ ID: "sync-create-11111-sync-all",
+ Name: "sync-create-account1-sync-all",
+ Password: "tnuocca-tset",
+ Roles: []string{"admin"},
+ TokenExpirationTime: "2020-12-30",
+ CurrentPassword: "tnuocca-tset1",
+ }
+ err := rbac.Instance().CreateAccount(syncAllContext(), &a1)
+ assert.NoError(t, err)
+ accountName = a1.Name
+ r, err := rbac.Instance().GetAccount(syncAllContext(), a1.Name)
+ assert.NoError(t, err)
+ assert.Equal(t, a1, *r)
+ listTaskReq := model.ListTaskRequest{
+ Domain: "",
+ Project: "",
+ ResourceType: datasource.ResourceAccount,
+ }
+ tasks, err := task.List(syncAllContext(), &listTaskReq)
+ assert.NoError(t, err)
+ assert.Equal(t, 1, len(tasks))
+ err = task.Delete(syncAllContext(), tasks...)
+ assert.NoError(t, err)
+ tasks, err = task.List(syncAllContext(), &listTaskReq)
+ assert.NoError(t, err)
+ assert.Equal(t, 0, len(tasks))
+ })
+ t.Run("create a role and delete the task should pass", func(t *testing.T) {
+ r1 := crbac.Role{
+ ID: "create-11111-sync-all",
+ Name: "create-role-sync-all",
+ Perms: nil,
+ }
+ err := rbac.Instance().CreateRole(syncAllContext(), &r1)
+ assert.NoError(t, err)
+ r, err := rbac.Instance().GetRole(syncAllContext(), "create-role-sync-all")
+ assert.NoError(t, err)
+ assert.Equal(t, r1, *r)
+ dt, _ := strconv.Atoi(r.CreateTime)
+ assert.Less(t, 0, dt)
+ assert.Equal(t, r.CreateTime, r.UpdateTime)
+ roleName = r1.Name
+ listTaskReq := model.ListTaskRequest{
+ Domain: "",
+ Project: "",
+ ResourceType: datasource.ResourceRole,
+ }
+ tasks, err := task.List(syncAllContext(), &listTaskReq)
+ assert.NoError(t, err)
+ assert.Equal(t, 1, len(tasks))
+ err = task.Delete(syncAllContext(), tasks...)
+ assert.NoError(t, err)
+ })
+ t.Run("put content with valid request and delete three task should pass", func(t *testing.T) {
+ err := schema.Instance().PutContent(syncAllContext(), &schema.PutContentRequest{
+ ServiceID: serviceID,
+ SchemaID: "schemaID_sync_all",
+ Content: &schema.ContentItem{
+ Hash: "hash_sync_all",
+ Summary: "summary_sync_all",
+ Content: "1111111111",
+ },
+ })
+ assert.NoError(t, err)
+ ref, err := schema.Instance().GetRef(syncAllContext(), &schema.RefRequest{
+ ServiceID: serviceID,
+ SchemaID: "schemaID_sync_all",
+ })
+ assert.NoError(t, err)
+ assert.NotNil(t, ref)
+ assert.Equal(t, "summary_sync_all", ref.Summary)
+ assert.Equal(t, "hash_sync_all", ref.Hash)
+ listTaskReq := model.ListTaskRequest{
+ Domain: "sync-all",
+ Project: "sync-all",
+ Action: sync.UpdateAction,
+ ResourceType: datasource.ResourceKV,
+ Status: sync.PendingStatus,
+ }
+ tasks, err := task.List(syncAllContext(), &listTaskReq)
+ assert.NoError(t, err)
+ assert.Equal(t, 3, len(tasks))
+ err = task.Delete(syncAllContext(), tasks...)
+ assert.NoError(t, err)
+ })
+ t.Run("update a service tag and delete the task should pass", func(t *testing.T) {
+ err := datasource.GetMetadataManager().PutManyTags(syncAllContext(), &pb.AddServiceTagsRequest{
+ ServiceId: serviceID,
+ Tags: map[string]string{
+ "a": "test",
+ "b": "b",
+ },
+ })
+ assert.NoError(t, err)
+ listTaskReq := model.ListTaskRequest{
+ Domain: "sync-all",
+ Project: "sync-all",
+ ResourceType: datasource.ResourceKV,
+ Action: sync.UpdateAction,
+ Status: sync.PendingStatus,
+ }
+ tasks, err := task.List(syncAllContext(), &listTaskReq)
+ assert.NoError(t, err)
+ assert.Equal(t, 1, len(tasks))
+ err = task.Delete(syncAllContext(), tasks...)
+ assert.NoError(t, err)
+ tasks, err = task.List(syncAllContext(), &listTaskReq)
+ assert.NoError(t, err)
+ assert.Equal(t, 0, len(tasks))
+ })
+ t.Run("create a consumer service will create a service task should pass", func(t *testing.T) {
+ resp, err := datasource.GetMetadataManager().RegisterService(syncAllContext(), &pb.CreateServiceRequest{
+ Service: &pb.MicroService{
+ AppId: "sync_dep_group_sync_all",
+ ServiceName: "sync_dep_consumer_sync_all",
+ Version: "1.0.0",
+ Level: "FRONT",
+ Status: pb.MS_UP,
+ },
+ })
+ assert.NotNil(t, resp)
+ assert.NoError(t, err)
+ consumerID = resp.ServiceId
+ listTaskReq := model.ListTaskRequest{
+ Domain: "sync-all",
+ Project: "sync-all",
+ ResourceType: datasource.ResourceService,
+ Action: sync.CreateAction,
+ Status: sync.PendingStatus,
+ }
+ tasks, err := task.List(syncAllContext(), &listTaskReq)
+ assert.NoError(t, err)
+ assert.Equal(t, 1, len(tasks))
+ err = task.Delete(syncAllContext(), tasks...)
+ assert.NoError(t, err)
+ tasks, err = task.List(syncAllContext(), &listTaskReq)
+ assert.NoError(t, err)
+ assert.Equal(t, 0, len(tasks))
+ })
+ t.Run("create one provider service will create one service task should pass", func(t *testing.T) {
+ resp, err := datasource.GetMetadataManager().RegisterService(syncAllContext(), &pb.CreateServiceRequest{
+ Service: &pb.MicroService{
+ AppId: "sync_dep_group_sync_all",
+ ServiceName: "sync_dep_provider_sync_all",
+ Version: "1.0.0",
+ Level: "FRONT",
+ Status: pb.MS_UP,
+ },
+ })
+ assert.NotNil(t, resp)
+ assert.NoError(t, err)
+ providerID = resp.ServiceId
+
+ listTaskReq := model.ListTaskRequest{
+ Domain: "sync-all",
+ Project: "sync-all",
+ ResourceType: datasource.ResourceService,
+ Action: sync.CreateAction,
+ Status: sync.PendingStatus,
+ }
+ tasks, err := task.List(syncAllContext(), &listTaskReq)
+ assert.NoError(t, err)
+ assert.Equal(t, 1, len(tasks))
+ err = task.Delete(syncAllContext(), tasks...)
+ assert.NoError(t, err)
+ tasks, err = task.List(syncAllContext(), &listTaskReq)
+ assert.NoError(t, err)
+ assert.Equal(t, 0, len(tasks))
+ })
+ t.Run("create dependencies for microServices will create a dependency task should pass", func(t *testing.T) {
+ consumer := &pb.MicroServiceKey{
+ ServiceName: "sync_dep_consumer_sync_all",
+ AppId: "sync_dep_group_sync_all",
+ Version: "1.0.0",
+ }
+ err := datasource.GetDependencyManager().PutDependencies(syncAllContext(), []*pb.ConsumerDependency{
+ {
+ Consumer: consumer,
+ Providers: []*pb.MicroServiceKey{
+ {
+ AppId: "sync_dep_group_sync_all",
+ ServiceName: "sync_dep_provider_sync_all",
+ },
+ },
+ },
+ }, true)
+ assert.NoError(t, err)
+
+ listTaskReq := model.ListTaskRequest{
+ Domain: "sync-all",
+ Project: "sync-all",
+ ResourceType: datasource.ResourceDependency,
+ Action: sync.CreateAction,
+ Status: sync.PendingStatus,
+ }
+ tasks, err := task.List(syncAllContext(), &listTaskReq)
+ assert.NoError(t, err)
+ assert.Equal(t, 1, len(tasks))
+ err = task.Delete(syncAllContext(), tasks...)
+ assert.NoError(t, err)
+ tasks, err = task.List(syncAllContext(), &listTaskReq)
+ assert.NoError(t, err)
+ assert.Equal(t, 0, len(tasks))
+ })
+
+ t.Run("do sync will create task should pass", func(t *testing.T) {
+ err := datasource.GetSyncManager().SyncAll(syncAllContext())
+ assert.Nil(t, err)
+ listServiceTaskReq := model.ListTaskRequest{
+ Domain: "sync-all",
+ Project: "sync-all",
+ ResourceType: datasource.ResourceService,
+ }
+ tasks, err := task.List(syncAllContext(), &listServiceTaskReq)
+ assert.NoError(t, err)
+ assert.Equal(t, 3, len(tasks))
+ err = task.Delete(syncAllContext(), tasks...)
+ assert.NoError(t, err)
+ err = task.Delete(syncAllContext(), tasks...)
+ assert.NoError(t, err)
+ listKVTaskReq := model.ListTaskRequest{
+ Domain: "sync-all",
+ Project: "sync-all",
+ ResourceType: datasource.ResourceKV,
+ }
+ tasks, err = task.List(syncAllContext(), &listKVTaskReq)
+ assert.NoError(t, err)
+ // three schema and one tag
+ assert.Equal(t, 4, len(tasks))
+ err = task.Delete(syncAllContext(), tasks...)
+ assert.NoError(t, err)
+ err = task.Delete(syncAllContext(), tasks...)
+ assert.NoError(t, err)
+ listAccountTaskReq := model.ListTaskRequest{
+ Domain: "",
+ Project: "",
+ ResourceType: datasource.ResourceAccount,
+ }
+ tasks, err = task.List(syncAllContext(), &listAccountTaskReq)
+ assert.NoError(t, err)
+ assert.Equal(t, 1, len(tasks))
+ err = task.Delete(syncAllContext(), tasks...)
+ assert.NoError(t, err)
+ err = task.Delete(syncAllContext(), tasks...)
+ assert.NoError(t, err)
+ listRoleTaskReq := model.ListTaskRequest{
+ Domain: "",
+ Project: "",
+ ResourceType: datasource.ResourceRole,
+ }
+ tasks, err = task.List(syncAllContext(), &listRoleTaskReq)
+ assert.NoError(t, err)
+ assert.Equal(t, 1, len(tasks))
+ err = task.Delete(syncAllContext(), tasks...)
+ assert.NoError(t, err)
+ err = task.Delete(syncAllContext(), tasks...)
+ assert.NoError(t, err)
+ listDepTaskReq := model.ListTaskRequest{
+ Domain: "sync-all",
+ Project: "sync-all",
+ ResourceType: datasource.ResourceDependency,
+ }
+ tasks, err = task.List(syncAllContext(), &listDepTaskReq)
+ assert.NoError(t, err)
+ assert.Equal(t, 1, len(tasks))
+ err = task.Delete(syncAllContext(), tasks...)
+ assert.NoError(t, err)
+ err = task.Delete(syncAllContext(), tasks...)
+ assert.NoError(t, err)
+ exist, err := etcdadpt.Exist(syncAllContext(), etcd.SyncAllKey)
+ assert.Equal(t, true, exist)
+ assert.Nil(t, err)
+ })
+
+ t.Run("delete all resources should pass", func(t *testing.T) {
+ err := schema.Instance().DeleteRef(syncAllContext(), &schema.RefRequest{
+ ServiceID: serviceID,
+ SchemaID: "schemaID_sync_all",
+ })
+ assert.NoError(t, err)
+ err = datasource.GetMetadataManager().DeleteSchema(syncAllContext(), &pb.DeleteSchemaRequest{
+ ServiceId: serviceID,
+ SchemaId: "schemaID_sync_all",
+ })
+ err = datasource.GetMetadataManager().UnregisterService(syncAllContext(), &pb.DeleteServiceRequest{
+ ServiceId: serviceID,
+ Force: true,
+ })
+ assert.NoError(t, err)
+ _, err = rbac.Instance().DeleteAccount(syncAllContext(), []string{accountName})
+ assert.NoError(t, err)
+ _, err = rbac.Instance().DeleteRole(syncAllContext(), roleName)
+ assert.NoError(t, err)
+ err = datasource.GetMetadataManager().UnregisterService(syncAllContext(), &pb.DeleteServiceRequest{
+ ServiceId: consumerID, Force: true,
+ })
+ assert.NoError(t, err)
+
+ err = datasource.GetMetadataManager().UnregisterService(syncAllContext(), &pb.DeleteServiceRequest{
+ ServiceId: providerID, Force: true,
+ })
+ assert.NoError(t, err)
+
+ listSeviceTaskReq := model.ListTaskRequest{
+ Domain: "sync-all",
+ Project: "sync-all",
+ ResourceType: datasource.ResourceService,
+ }
+ tasks, err := task.List(syncAllContext(), &listSeviceTaskReq)
+ assert.NoError(t, err)
+ assert.Equal(t, 3, len(tasks))
+ err = task.Delete(syncAllContext(), tasks...)
+ assert.NoError(t, err)
+ tasks, err = task.List(syncAllContext(), &listSeviceTaskReq)
+ assert.NoError(t, err)
+ assert.Equal(t, 0, len(tasks))
+ listAccountTaskReq := model.ListTaskRequest{
+ Domain: "",
+ Project: "",
+ ResourceType: datasource.ResourceAccount,
+ }
+ tasks, err = task.List(syncAllContext(), &listAccountTaskReq)
+ assert.NoError(t, err)
+ assert.Equal(t, 1, len(tasks))
+ err = task.Delete(syncAllContext(), tasks...)
+ assert.NoError(t, err)
+ tasks, err = task.List(syncAllContext(), &listAccountTaskReq)
+ assert.NoError(t, err)
+ assert.Equal(t, 0, len(tasks))
+
+ listRoleTaskReq := model.ListTaskRequest{
+ Domain: "",
+ Project: "",
+ ResourceType: datasource.ResourceRole,
+ }
+ tasks, err = task.List(syncAllContext(), &listRoleTaskReq)
+ assert.NoError(t, err)
+ assert.Equal(t, 1, len(tasks))
+ err = task.Delete(syncAllContext(), tasks...)
+ assert.NoError(t, err)
+ tasks, err = task.List(syncAllContext(), &listRoleTaskReq)
+ assert.NoError(t, err)
+ assert.Equal(t, 0, len(tasks))
+
+ listKVTaskReq := model.ListTaskRequest{
+ Domain: "sync-all",
+ Project: "sync-all",
+ ResourceType: datasource.ResourceKV,
+ }
+ tasks, err = task.List(syncAllContext(), &listKVTaskReq)
+ assert.NoError(t, err)
+ assert.Equal(t, 2, len(tasks))
+ err = task.Delete(syncAllContext(), tasks...)
+ assert.NoError(t, err)
+ tasks, err = task.List(syncAllContext(), &listKVTaskReq)
+ assert.NoError(t, err)
+ assert.Equal(t, 0, len(tasks))
+
+ tombstoneListReq := model.ListTombstoneRequest{
+ Domain: "sync-all",
+ Project: "sync-all",
+ }
+ tombstones, err := tombstone.List(syncAllContext(), &tombstoneListReq)
+ assert.NoError(t, err)
+ assert.Equal(t, 7, len(tombstones))
+ err = tombstone.Delete(syncAllContext(), tombstones...)
+ assert.NoError(t, err)
+ })
+ })
+}
diff --git a/datasource/manager.go b/datasource/manager.go
index 9d3e5c2..9ede3fa 100644
--- a/datasource/manager.go
+++ b/datasource/manager.go
@@ -18,6 +18,7 @@
package datasource
import (
+ "context"
"fmt"
"github.com/go-chassis/cari/dlock"
@@ -51,6 +52,10 @@ func Init(opts Options) error {
if err != nil {
return err
}
+ err = GetSyncManager().SyncAll(context.Background())
+ if err != nil && err != ErrSyncAllKeyExists {
+ return err
+ }
err = schema.Init(schema.Options{Kind: opts.Kind})
if err != nil {
return err
@@ -100,3 +105,6 @@ func GetDependencyManager() DependencyManager {
func GetMetricsManager() MetricsManager {
return dataSourceInst.MetricsManager()
}
+func GetSyncManager() SyncManager {
+ return dataSourceInst.SyncManager()
+}
diff --git a/datasource/mongo/mongo.go b/datasource/mongo/mongo.go
index 51d2506..bbde3f4 100644
--- a/datasource/mongo/mongo.go
+++ b/datasource/mongo/mongo.go
@@ -44,6 +44,7 @@ type DataSource struct {
depManager datasource.DependencyManager
scManager datasource.SCManager
metricsManager datasource.MetricsManager
+ syncManager datasource.SyncManager
}
func (ds *DataSource) SystemManager() datasource.SystemManager {
@@ -66,6 +67,10 @@ func (ds *DataSource) MetricsManager() datasource.MetricsManager {
return ds.metricsManager
}
+func (ds *DataSource) SyncManager() datasource.SyncManager {
+ return ds.syncManager
+}
+
func NewDataSource(opts datasource.Options) (datasource.DataSource, error) {
// TODO: construct a reasonable DataSource instance
inst := &DataSource{}
@@ -82,6 +87,7 @@ func NewDataSource(opts datasource.Options) (datasource.DataSource, error) {
InstanceProperties: opts.InstanceProperties,
}
inst.metricsManager = &MetricsManager{}
+ inst.syncManager = &SyncManager{}
return inst, nil
}
diff --git a/datasource/datasource.go b/datasource/mongo/sync.go
similarity index 68%
copy from datasource/datasource.go
copy to datasource/mongo/sync.go
index ae0727b..7fcaaa1 100644
--- a/datasource/datasource.go
+++ b/datasource/mongo/sync.go
@@ -15,13 +15,20 @@
* limitations under the License.
*/
-package datasource
+package mongo
-// DataSource is the DAO layer
-type DataSource interface {
- SystemManager() SystemManager
- DependencyManager() DependencyManager
- MetadataManager() MetadataManager
- SCManager() SCManager
- MetricsManager() MetricsManager
+import (
+ "context"
+
+ "github.com/apache/servicecomb-service-center/pkg/log"
+)
+
+type SyncManager struct {
+}
+
+// SyncAll will list all services,accounts,roles,schemas,tags,deps and use tasks to store
+func (s *SyncManager) SyncAll(ctx context.Context) error {
+ // TODO mongo should implement it
+ log.Info("Mongo does not implement this method")
+ return nil
}
diff --git a/datasource/datasource.go b/datasource/sync.go
similarity index 65%
copy from datasource/datasource.go
copy to datasource/sync.go
index ae0727b..81205c2 100644
--- a/datasource/datasource.go
+++ b/datasource/sync.go
@@ -3,12 +3,12 @@
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
+ * (the "License"); you may not use this file except request compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software
+ * Unless required by applicable law or agreed to request writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
@@ -17,11 +17,13 @@
package datasource
-// DataSource is the DAO layer
-type DataSource interface {
- SystemManager() SystemManager
- DependencyManager() DependencyManager
- MetadataManager() MetadataManager
- SCManager() SCManager
- MetricsManager() MetricsManager
+import (
+ "context"
+ "errors"
+)
+
+var ErrSyncAllKeyExists = errors.New("sync all key already exists")
+
+type SyncManager interface {
+ SyncAll(ctx context.Context) error
}