You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicecomb.apache.org by li...@apache.org on 2022/01/25 13:27:32 UTC

[servicecomb-service-center] branch master updated: [feat] add SyncAll func (#1245)

This is an automated email from the ASF dual-hosted git repository.

littlecui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/servicecomb-service-center.git


The following commit(s) were added to refs/heads/master by this push:
     new ae18050  [feat] add SyncAll func (#1245)
ae18050 is described below

commit ae180500f95f699e6c7ac071c681ec0a1ba00d12
Author: robotljw <79...@qq.com>
AuthorDate: Tue Jan 25 21:27:23 2022 +0800

    [feat] add SyncAll func (#1245)
---
 datasource/datasource.go                    |   1 +
 datasource/etcd/etcd.go                     |  36 ++-
 datasource/etcd/path/key_generator_test.go  |   2 +
 datasource/etcd/sync.go                     | 405 ++++++++++++++++++++++++
 datasource/etcd/sync_test.go                | 474 ++++++++++++++++++++++++++++
 datasource/manager.go                       |   8 +
 datasource/mongo/mongo.go                   |   6 +
 datasource/{datasource.go => mongo/sync.go} |  23 +-
 datasource/{datasource.go => sync.go}       |  20 +-
 9 files changed, 944 insertions(+), 31 deletions(-)

diff --git a/datasource/datasource.go b/datasource/datasource.go
index ae0727b..0c8a8cb 100644
--- a/datasource/datasource.go
+++ b/datasource/datasource.go
@@ -24,4 +24,5 @@ type DataSource interface {
 	MetadataManager() MetadataManager
 	SCManager() SCManager
 	MetricsManager() MetricsManager
+	SyncManager() SyncManager
 }
diff --git a/datasource/etcd/etcd.go b/datasource/etcd/etcd.go
index 55884bf..e4f0cdd 100644
--- a/datasource/etcd/etcd.go
+++ b/datasource/etcd/etcd.go
@@ -1,17 +1,19 @@
-// Licensed to the Apache Software Foundation (ASF) under one or more
-// contributor license agreements.  See the NOTICE file distributed with
-// this work for additional information regarding copyright ownership.
-// The ASF licenses this file to You under the Apache License, Version 2.0
-// (the "License"); you may not use this file except in compliance with
-// the License.  You may obtain a copy of the License at
-//
-//     http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 
 package etcd
 
@@ -55,6 +57,7 @@ type DataSource struct {
 	depManager      datasource.DependencyManager
 	scManager       datasource.SCManager
 	metricsManager  datasource.MetricsManager
+	syncManager     datasource.SyncManager
 }
 
 func (ds *DataSource) SystemManager() datasource.SystemManager {
@@ -77,6 +80,10 @@ func (ds *DataSource) MetricsManager() datasource.MetricsManager {
 	return ds.metricsManager
 }
 
+func (ds *DataSource) SyncManager() datasource.SyncManager {
+	return ds.syncManager
+}
+
 func NewDataSource(opts datasource.Options) (datasource.DataSource, error) {
 	log.Warn("data source enable etcd mode")
 
@@ -105,6 +112,7 @@ func NewDataSource(opts datasource.Options) (datasource.DataSource, error) {
 	inst.depManager = &DepManager{}
 	inst.scManager = &SCManager{}
 	inst.metricsManager = &MetricsManager{}
+	inst.syncManager = &SyncManager{}
 	return inst, nil
 }
 
diff --git a/datasource/etcd/path/key_generator_test.go b/datasource/etcd/path/key_generator_test.go
index 7e132d8..82541c3 100644
--- a/datasource/etcd/path/key_generator_test.go
+++ b/datasource/etcd/path/key_generator_test.go
@@ -41,9 +41,11 @@ func TestGenerateETCDDomainKey(t *testing.T) {
 func TestGenerateAccountKey(t *testing.T) {
 	assert.Equal(t, "/cse-sr/accounts/admin", path.GenerateAccountKey("admin"))
 }
+
 func TestGenerateAccountSecretKey(t *testing.T) {
 	assert.Equal(t, "/cse-sr/rbac/secret", path.GenerateRBACSecretKey())
 }
+
 func TestGenerateDependencyRuleKey(t *testing.T) {
 	// consumer
 	k := path.GenerateConsumerDependencyRuleKey("a", nil)
diff --git a/datasource/etcd/sync.go b/datasource/etcd/sync.go
new file mode 100644
index 0000000..0c0c5d4
--- /dev/null
+++ b/datasource/etcd/sync.go
@@ -0,0 +1,405 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package etcd
+
+import (
+	"context"
+	"encoding/json"
+	"errors"
+	"fmt"
+	"strings"
+
+	"github.com/go-chassis/cari/discovery"
+	crbac "github.com/go-chassis/cari/rbac"
+	"github.com/little-cui/etcdadpt"
+
+	"github.com/apache/servicecomb-service-center/datasource"
+	"github.com/apache/servicecomb-service-center/datasource/etcd/path"
+	esync "github.com/apache/servicecomb-service-center/datasource/etcd/sync"
+	"github.com/apache/servicecomb-service-center/pkg/log"
+	putil "github.com/apache/servicecomb-service-center/pkg/util"
+	"github.com/apache/servicecomb-service-center/server/config"
+)
+
+const (
+	SyncAllKey = "/cse-sr/sync-all"
+)
+
+var (
+	ErrWithoutDomainProject = errors.New("key without domain and project")
+)
+
+type SyncManager struct {
+}
+
+// SyncAll lists all accounts, roles, services, tags, schemas and dependencies and stores them as sync tasks.
+func (s *SyncManager) SyncAll(ctx context.Context) error {
+	enable := config.GetBool("sync.enableOnStart", false)
+	if !enable {
+		return nil
+	}
+	exist, err := etcdadpt.Exist(ctx, SyncAllKey)
+	if err != nil {
+		return err
+	}
+	if exist {
+		log.Info(fmt.Sprintf("%s key already exists, do not need to do tasks", SyncAllKey))
+		return datasource.ErrSyncAllKeyExists
+	}
+	err = syncAllAccounts(ctx)
+	if err != nil {
+		return err
+	}
+	err = syncAllRoles(ctx)
+	if err != nil {
+		return err
+	}
+	err = syncAllServices(ctx)
+	if err != nil {
+		return err
+	}
+	err = syncAllTags(ctx)
+	if err != nil {
+		return err
+	}
+	err = syncAllSchemas(ctx)
+	if err != nil {
+		return err
+	}
+	err = syncAllDependencies(ctx)
+	if err != nil {
+		return err
+	}
+	return etcdadpt.Put(ctx, SyncAllKey, "1")
+}
+
+func syncAllAccounts(ctx context.Context) error {
+	kvs, _, err := etcdadpt.List(ctx, path.GenerateRBACAccountKey(""))
+	if err != nil {
+		return err
+	}
+	syncOpts := make([]etcdadpt.OpOptions, 0)
+	putil.SetDomain(ctx, "")
+	putil.SetProject(ctx, "")
+	for _, v := range kvs {
+		a := &crbac.Account{}
+		err = json.Unmarshal(v.Value, a)
+		if err != nil {
+			log.Error("fail to unmarshal account ", err)
+			return err
+		}
+		opt, err := esync.GenCreateOpts(ctx, datasource.ResourceAccount, a)
+		if err != nil {
+			log.Error("fail to create sync opts", err)
+			return err
+		}
+		syncOpts = append(syncOpts, opt...)
+	}
+	err = etcdadpt.Txn(ctx, syncOpts)
+	if err != nil {
+		log.Error("fail to account tasks", err)
+	}
+	return err
+}
+
+func syncAllRoles(ctx context.Context) error {
+	kvs, _, err := etcdadpt.List(ctx, path.GenerateRBACRoleKey(""))
+	if err != nil {
+		return err
+	}
+	syncOpts := make([]etcdadpt.OpOptions, 0)
+	putil.SetDomain(ctx, "")
+	putil.SetProject(ctx, "")
+	for _, v := range kvs {
+		r := &crbac.Role{}
+		err = json.Unmarshal(v.Value, r)
+		if err != nil {
+			log.Error("fail to unmarshal role", err)
+			return err
+		}
+		opt, err := esync.GenCreateOpts(ctx, datasource.ResourceRole, r)
+		if err != nil {
+			log.Error("fail to create sync opts", err)
+			return err
+		}
+		syncOpts = append(syncOpts, opt...)
+	}
+	err = etcdadpt.Txn(ctx, syncOpts)
+	if err != nil {
+		log.Error("fail to role tasks", err)
+	}
+	return err
+}
+
+// syncAllTags stores every service tag as a KV resource sync task.
+func syncAllTags(ctx context.Context) error {
+	kvs, _, err := etcdadpt.List(ctx, path.GetServiceTagRootKey(""))
+	if err != nil {
+		return err
+	}
+	syncOpts := make([]etcdadpt.OpOptions, 0)
+	for _, kv := range kvs {
+		domain, project, err := getDomainProject(string(kv.Key), path.GetServiceTagRootKey(""))
+		if err != nil {
+			log.Error("fail to get domain and project", err)
+			return err
+		}
+		putil.SetDomain(ctx, domain)
+		putil.SetProject(ctx, project)
+		opts, err := esync.GenCreateOpts(ctx, datasource.ResourceKV, kv.Value,
+			esync.WithOpts(map[string]string{"key": string(kv.Key)}))
+		if err != nil {
+			log.Error("fail to create tag opts", err)
+			return err
+		}
+		syncOpts = append(syncOpts, opts...)
+	}
+	err = etcdadpt.Txn(ctx, syncOpts)
+	if err != nil {
+		log.Error("fail to create tag tasks", err)
+	}
+	return err
+}
+
+func syncAllServices(ctx context.Context) error {
+	kvs, _, err := etcdadpt.List(ctx, path.GetServiceRootKey(""))
+	if err != nil {
+		return err
+	}
+	syncOpts := make([]etcdadpt.OpOptions, 0)
+	for _, kv := range kvs {
+		service := &discovery.MicroService{}
+		err := json.Unmarshal(kv.Value, service)
+		if err != nil {
+			log.Error("fail to unmarshal service", err)
+			return err
+		}
+		domain, project, err := getDomainProject(string(kv.Key), path.GetServiceRootKey(""))
+		if err != nil {
+			log.Error("fail to get domain and project", err)
+			return err
+		}
+		putil.SetDomain(ctx, domain)
+		putil.SetProject(ctx, project)
+		request := &discovery.CreateServiceRequest{
+			Service: service,
+		}
+		opts, err := esync.GenCreateOpts(ctx, datasource.ResourceService, request)
+		if err != nil {
+			log.Error("fail to create service task", err)
+			return err
+		}
+		syncOpts = append(syncOpts, opts...)
+	}
+	err = etcdadpt.Txn(ctx, syncOpts)
+	if err != nil {
+		log.Error("fail to create service tasks", err)
+	}
+	return err
+}
+
+// syncAllSchemas stores every service schema as a KV resource sync task.
+func syncAllSchemas(ctx context.Context) error {
+	putil.SetDomain(ctx, "")
+	putil.SetProject(ctx, "")
+	err := syncAllServiceSchemas(ctx)
+	if err != nil {
+		return err
+	}
+	err = syncAllServiceSchemaRefs(ctx)
+	if err != nil {
+		return err
+	}
+	err = syncAllServiceSchemaContents(ctx)
+	if err != nil {
+		return err
+	}
+	return syncAllServiceSchemaSummaries(ctx)
+}
+
+func syncAllServiceSchemas(ctx context.Context) error {
+	kvs, _, err := etcdadpt.List(ctx, path.GetServiceSchemaRootKey(""))
+	if err != nil {
+		return err
+	}
+	syncOpts := make([]etcdadpt.OpOptions, 0)
+	for _, kv := range kvs {
+		domain, project, err := getDomainProject(string(kv.Key), path.GetServiceSchemaRootKey(""))
+		if err != nil {
+			log.Error("fail to get domain and project", err)
+			return err
+		}
+		putil.SetDomain(ctx, domain)
+		putil.SetProject(ctx, project)
+		opts, err := esync.GenCreateOpts(ctx, datasource.ResourceKV, kv.Value,
+			esync.WithOpts(map[string]string{"key": string(kv.Key)}))
+		if err != nil {
+			log.Error("fail to create schema opts", err)
+			return err
+		}
+		syncOpts = append(syncOpts, opts...)
+	}
+	err = etcdadpt.Txn(ctx, syncOpts)
+	if err != nil {
+		log.Error("fail to create schema tasks", err)
+	}
+	return err
+}
+
+func syncAllServiceSchemaRefs(ctx context.Context) error {
+	kvs, _, err := etcdadpt.List(ctx, path.GetServiceSchemaRefRootKey(""))
+	if err != nil {
+		return err
+	}
+	syncOpts := make([]etcdadpt.OpOptions, 0)
+	for _, kv := range kvs {
+		domain, project, err := getDomainProject(string(kv.Key), path.GetServiceSchemaRefRootKey(""))
+		if err != nil {
+			log.Error("fail to get domain and project", err)
+			return err
+		}
+		putil.SetDomain(ctx, domain)
+		putil.SetProject(ctx, project)
+		opts, err := esync.GenCreateOpts(ctx, datasource.ResourceKV, kv.Value,
+			esync.WithOpts(map[string]string{"key": string(kv.Key)}))
+		if err != nil {
+			log.Error("fail to create schema ref opts", err)
+			return err
+		}
+		syncOpts = append(syncOpts, opts...)
+	}
+	err = etcdadpt.Txn(ctx, syncOpts)
+	if err != nil {
+		log.Error("fail to create schema ref tasks", err)
+	}
+	return err
+}
+
+func syncAllServiceSchemaContents(ctx context.Context) error {
+	kvs, _, err := etcdadpt.List(ctx, path.GetServiceSchemaContentRootKey(""))
+	if err != nil {
+		return err
+	}
+	syncOpts := make([]etcdadpt.OpOptions, 0)
+	for _, kv := range kvs {
+		domain, project, err := getDomainProject(string(kv.Key), path.GetServiceSchemaContentRootKey(""))
+		if err != nil {
+			log.Error("fail to get domain and project", err)
+			return err
+		}
+		putil.SetDomain(ctx, domain)
+		putil.SetProject(ctx, project)
+		opts, err := esync.GenCreateOpts(ctx, datasource.ResourceKV, kv.Value,
+			esync.WithOpts(map[string]string{"key": string(kv.Key)}))
+		if err != nil {
+			log.Error("fail to create schema content opts", err)
+			return err
+		}
+		syncOpts = append(syncOpts, opts...)
+	}
+	err = etcdadpt.Txn(ctx, syncOpts)
+	if err != nil {
+		log.Error("fail to create schema content tasks", err)
+	}
+	return err
+}
+
+func syncAllServiceSchemaSummaries(ctx context.Context) error {
+	kvs, _, err := etcdadpt.List(ctx, path.GetServiceSchemaSummaryRootKey(""))
+	if err != nil {
+		return err
+	}
+	syncOpts := make([]etcdadpt.OpOptions, 0)
+	for _, kv := range kvs {
+		domain, project, err := getDomainProject(string(kv.Key), path.GetServiceSchemaSummaryRootKey(""))
+		if err != nil {
+			log.Error("fail to get domain and project", err)
+			return err
+		}
+		putil.SetDomain(ctx, domain)
+		putil.SetProject(ctx, project)
+		opts, err := esync.GenCreateOpts(ctx, datasource.ResourceKV, kv.Value,
+			esync.WithOpts(map[string]string{"key": string(kv.Key)}))
+		if err != nil {
+			log.Error("fail to create schema summary opts", err)
+			return err
+		}
+		syncOpts = append(syncOpts, opts...)
+	}
+	err = etcdadpt.Txn(ctx, syncOpts)
+	if err != nil {
+		log.Error("fail to create schema summary tasks", err)
+	}
+	return err
+}
+
+func syncAllDependencies(ctx context.Context) error {
+	kvs, _, err := etcdadpt.List(ctx, path.GetServiceDependencyQueueRootKey(""))
+	if err != nil {
+		return err
+	}
+	syncOpts := make([]etcdadpt.OpOptions, 0)
+	depInfosMap := make(map[string][]*discovery.ConsumerDependency)
+	for _, kv := range kvs {
+		dep := &discovery.ConsumerDependency{}
+		err = json.Unmarshal(kv.Value, dep)
+		if err != nil {
+			log.Error("fail to unmarshal dependency ", err)
+			return err
+		}
+		domain, project, err := getDomainProject(string(kv.Key), path.GetServiceDependencyQueueRootKey(""))
+		if err != nil {
+			log.Error("fail to get domain and project", err)
+			return err
+		}
+		key := domain + "/" + project
+		depInfosMap[key] = append(depInfosMap[key], dep)
+	}
+	for key, dependencies := range depInfosMap {
+		splitKey := strings.Split(key, "/")
+		domain, project := splitKey[0], splitKey[1]
+		putil.SetDomain(ctx, domain)
+		putil.SetProject(ctx, project)
+		opts, err := esync.GenUpdateOpts(ctx, datasource.ResourceDependency, dependencies)
+		if err != nil {
+			log.Error("fail to create dep opts", err)
+			return err
+		}
+		syncOpts = append(syncOpts, opts...)
+	}
+	err = etcdadpt.Txn(ctx, syncOpts)
+	if err != nil {
+		log.Error("fail to create dep tasks", err)
+	}
+	return err
+}
+
+func getDomainProject(key string, prefixKey string) (domain string, project string, err error) {
+	splitKey := strings.Split(key, prefixKey)
+	if len(splitKey) != 2 {
+		return "", "", ErrWithoutDomainProject
+	}
+	suffixKey := splitKey[len(splitKey)-1]
+	splitStr := strings.Split(suffixKey, "/")
+	if len(splitStr) < 2 {
+		return "", "", ErrWithoutDomainProject
+	}
+	domain = splitStr[0]
+	project = splitStr[1]
+	return
+}
diff --git a/datasource/etcd/sync_test.go b/datasource/etcd/sync_test.go
new file mode 100644
index 0000000..f4400b0
--- /dev/null
+++ b/datasource/etcd/sync_test.go
@@ -0,0 +1,474 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package etcd_test
+
+import (
+	"context"
+	"strconv"
+	"testing"
+
+	pb "github.com/go-chassis/cari/discovery"
+	crbac "github.com/go-chassis/cari/rbac"
+	"github.com/go-chassis/cari/sync"
+	"github.com/go-chassis/go-archaius"
+	"github.com/little-cui/etcdadpt"
+	"github.com/stretchr/testify/assert"
+
+	"github.com/apache/servicecomb-service-center/datasource"
+	"github.com/apache/servicecomb-service-center/datasource/etcd"
+	"github.com/apache/servicecomb-service-center/datasource/rbac"
+	"github.com/apache/servicecomb-service-center/datasource/schema"
+	"github.com/apache/servicecomb-service-center/eventbase/model"
+	"github.com/apache/servicecomb-service-center/eventbase/service/task"
+	"github.com/apache/servicecomb-service-center/eventbase/service/tombstone"
+	"github.com/apache/servicecomb-service-center/pkg/util"
+	_ "github.com/apache/servicecomb-service-center/test"
+)
+
+func syncAllContext() context.Context {
+	ctx := util.WithNoCache(util.SetDomainProject(context.Background(), "sync-all", "sync-all"))
+	return util.WithNoCache(util.SetContext(ctx, util.CtxEnableSync, "1"))
+}
+
+func TestSyncAll(t *testing.T) {
+	t.Run("enableOnStart is false will not do sync", func(t *testing.T) {
+		_ = archaius.Set("sync.enableOnStart", false)
+		err := datasource.GetSyncManager().SyncAll(syncAllContext())
+		assert.Nil(t, err)
+	})
+
+	t.Run("enableOnStart is true and syncAllKey exists will not do sync", func(t *testing.T) {
+		_ = archaius.Set("sync.enableOnStart", true)
+		err := etcdadpt.Put(syncAllContext(), etcd.SyncAllKey, "1")
+		assert.Nil(t, err)
+		err = datasource.GetSyncManager().SyncAll(syncAllContext())
+		assert.Equal(t, datasource.ErrSyncAllKeyExists, err)
+		isDeleted, err := etcdadpt.Delete(syncAllContext(), etcd.SyncAllKey)
+		assert.Equal(t, isDeleted, true)
+		assert.Nil(t, err)
+	})
+
+	t.Run("enableOnStart is true and syncAllKey not exists will do sync", func(t *testing.T) {
+		_ = archaius.Set("sync.enableOnStart", true)
+		var serviceID string
+		var accountName string
+		var roleName string
+		var consumerID string
+		var providerID string
+		t.Run("register a service and delete the task should pass", func(t *testing.T) {
+			resp, err := datasource.GetMetadataManager().RegisterService(syncAllContext(), &pb.CreateServiceRequest{
+				Service: &pb.MicroService{
+					AppId:       "sync_micro_service_group",
+					ServiceName: "sync_micro_service_sync_all",
+					Version:     "1.0.0",
+					Level:       "FRONT",
+					Status:      pb.MS_UP,
+				},
+			})
+			assert.NotNil(t, resp)
+			assert.NoError(t, err)
+			assert.Equal(t, pb.ResponseSuccess, resp.Response.GetCode())
+			serviceID = resp.ServiceId
+			listTaskReq := model.ListTaskRequest{
+				Domain:       "sync-all",
+				Project:      "sync-all",
+				ResourceType: datasource.ResourceService,
+				Action:       sync.CreateAction,
+				Status:       sync.PendingStatus,
+			}
+			tasks, err := task.List(syncAllContext(), &listTaskReq)
+			assert.NoError(t, err)
+			assert.Equal(t, 1, len(tasks))
+			err = task.Delete(syncAllContext(), tasks...)
+			assert.NoError(t, err)
+			tasks, err = task.List(syncAllContext(), &listTaskReq)
+			assert.NoError(t, err)
+			assert.Equal(t, 0, len(tasks))
+		})
+		t.Run("create a account and delete the task should pass", func(t *testing.T) {
+			a1 := crbac.Account{
+				ID:                  "sync-create-11111-sync-all",
+				Name:                "sync-create-account1-sync-all",
+				Password:            "tnuocca-tset",
+				Roles:               []string{"admin"},
+				TokenExpirationTime: "2020-12-30",
+				CurrentPassword:     "tnuocca-tset1",
+			}
+			err := rbac.Instance().CreateAccount(syncAllContext(), &a1)
+			assert.NoError(t, err)
+			accountName = a1.Name
+			r, err := rbac.Instance().GetAccount(syncAllContext(), a1.Name)
+			assert.NoError(t, err)
+			assert.Equal(t, a1, *r)
+			listTaskReq := model.ListTaskRequest{
+				Domain:       "",
+				Project:      "",
+				ResourceType: datasource.ResourceAccount,
+			}
+			tasks, err := task.List(syncAllContext(), &listTaskReq)
+			assert.NoError(t, err)
+			assert.Equal(t, 1, len(tasks))
+			err = task.Delete(syncAllContext(), tasks...)
+			assert.NoError(t, err)
+			tasks, err = task.List(syncAllContext(), &listTaskReq)
+			assert.NoError(t, err)
+			assert.Equal(t, 0, len(tasks))
+		})
+		t.Run("create a role and delete the task should pass", func(t *testing.T) {
+			r1 := crbac.Role{
+				ID:    "create-11111-sync-all",
+				Name:  "create-role-sync-all",
+				Perms: nil,
+			}
+			err := rbac.Instance().CreateRole(syncAllContext(), &r1)
+			assert.NoError(t, err)
+			r, err := rbac.Instance().GetRole(syncAllContext(), "create-role-sync-all")
+			assert.NoError(t, err)
+			assert.Equal(t, r1, *r)
+			dt, _ := strconv.Atoi(r.CreateTime)
+			assert.Less(t, 0, dt)
+			assert.Equal(t, r.CreateTime, r.UpdateTime)
+			roleName = r1.Name
+			listTaskReq := model.ListTaskRequest{
+				Domain:       "",
+				Project:      "",
+				ResourceType: datasource.ResourceRole,
+			}
+			tasks, err := task.List(syncAllContext(), &listTaskReq)
+			assert.NoError(t, err)
+			assert.Equal(t, 1, len(tasks))
+			err = task.Delete(syncAllContext(), tasks...)
+			assert.NoError(t, err)
+		})
+		t.Run("put content with valid request and delete three task should pass", func(t *testing.T) {
+			err := schema.Instance().PutContent(syncAllContext(), &schema.PutContentRequest{
+				ServiceID: serviceID,
+				SchemaID:  "schemaID_sync_all",
+				Content: &schema.ContentItem{
+					Hash:    "hash_sync_all",
+					Summary: "summary_sync_all",
+					Content: "1111111111",
+				},
+			})
+			assert.NoError(t, err)
+			ref, err := schema.Instance().GetRef(syncAllContext(), &schema.RefRequest{
+				ServiceID: serviceID,
+				SchemaID:  "schemaID_sync_all",
+			})
+			assert.NoError(t, err)
+			assert.NotNil(t, ref)
+			assert.Equal(t, "summary_sync_all", ref.Summary)
+			assert.Equal(t, "hash_sync_all", ref.Hash)
+			listTaskReq := model.ListTaskRequest{
+				Domain:       "sync-all",
+				Project:      "sync-all",
+				Action:       sync.UpdateAction,
+				ResourceType: datasource.ResourceKV,
+				Status:       sync.PendingStatus,
+			}
+			tasks, err := task.List(syncAllContext(), &listTaskReq)
+			assert.NoError(t, err)
+			assert.Equal(t, 3, len(tasks))
+			err = task.Delete(syncAllContext(), tasks...)
+			assert.NoError(t, err)
+		})
+		t.Run("update a service tag and delete the task should pass", func(t *testing.T) {
+			err := datasource.GetMetadataManager().PutManyTags(syncAllContext(), &pb.AddServiceTagsRequest{
+				ServiceId: serviceID,
+				Tags: map[string]string{
+					"a": "test",
+					"b": "b",
+				},
+			})
+			assert.NoError(t, err)
+			listTaskReq := model.ListTaskRequest{
+				Domain:       "sync-all",
+				Project:      "sync-all",
+				ResourceType: datasource.ResourceKV,
+				Action:       sync.UpdateAction,
+				Status:       sync.PendingStatus,
+			}
+			tasks, err := task.List(syncAllContext(), &listTaskReq)
+			assert.NoError(t, err)
+			assert.Equal(t, 1, len(tasks))
+			err = task.Delete(syncAllContext(), tasks...)
+			assert.NoError(t, err)
+			tasks, err = task.List(syncAllContext(), &listTaskReq)
+			assert.NoError(t, err)
+			assert.Equal(t, 0, len(tasks))
+		})
+		t.Run("create a consumer service will create a service task should pass", func(t *testing.T) {
+			resp, err := datasource.GetMetadataManager().RegisterService(syncAllContext(), &pb.CreateServiceRequest{
+				Service: &pb.MicroService{
+					AppId:       "sync_dep_group_sync_all",
+					ServiceName: "sync_dep_consumer_sync_all",
+					Version:     "1.0.0",
+					Level:       "FRONT",
+					Status:      pb.MS_UP,
+				},
+			})
+			assert.NotNil(t, resp)
+			assert.NoError(t, err)
+			consumerID = resp.ServiceId
+			listTaskReq := model.ListTaskRequest{
+				Domain:       "sync-all",
+				Project:      "sync-all",
+				ResourceType: datasource.ResourceService,
+				Action:       sync.CreateAction,
+				Status:       sync.PendingStatus,
+			}
+			tasks, err := task.List(syncAllContext(), &listTaskReq)
+			assert.NoError(t, err)
+			assert.Equal(t, 1, len(tasks))
+			err = task.Delete(syncAllContext(), tasks...)
+			assert.NoError(t, err)
+			tasks, err = task.List(syncAllContext(), &listTaskReq)
+			assert.NoError(t, err)
+			assert.Equal(t, 0, len(tasks))
+		})
+		t.Run("create one provider service will create one service task should pass", func(t *testing.T) {
+			resp, err := datasource.GetMetadataManager().RegisterService(syncAllContext(), &pb.CreateServiceRequest{
+				Service: &pb.MicroService{
+					AppId:       "sync_dep_group_sync_all",
+					ServiceName: "sync_dep_provider_sync_all",
+					Version:     "1.0.0",
+					Level:       "FRONT",
+					Status:      pb.MS_UP,
+				},
+			})
+			assert.NotNil(t, resp)
+			assert.NoError(t, err)
+			providerID = resp.ServiceId
+
+			listTaskReq := model.ListTaskRequest{
+				Domain:       "sync-all",
+				Project:      "sync-all",
+				ResourceType: datasource.ResourceService,
+				Action:       sync.CreateAction,
+				Status:       sync.PendingStatus,
+			}
+			tasks, err := task.List(syncAllContext(), &listTaskReq)
+			assert.NoError(t, err)
+			assert.Equal(t, 1, len(tasks))
+			err = task.Delete(syncAllContext(), tasks...)
+			assert.NoError(t, err)
+			tasks, err = task.List(syncAllContext(), &listTaskReq)
+			assert.NoError(t, err)
+			assert.Equal(t, 0, len(tasks))
+		})
+		t.Run("create dependencies for microServices will create a dependency task should pass", func(t *testing.T) {
+			consumer := &pb.MicroServiceKey{
+				ServiceName: "sync_dep_consumer_sync_all",
+				AppId:       "sync_dep_group_sync_all",
+				Version:     "1.0.0",
+			}
+			err := datasource.GetDependencyManager().PutDependencies(syncAllContext(), []*pb.ConsumerDependency{
+				{
+					Consumer: consumer,
+					Providers: []*pb.MicroServiceKey{
+						{
+							AppId:       "sync_dep_group_sync_all",
+							ServiceName: "sync_dep_provider_sync_all",
+						},
+					},
+				},
+			}, true)
+			assert.NoError(t, err)
+
+			listTaskReq := model.ListTaskRequest{
+				Domain:       "sync-all",
+				Project:      "sync-all",
+				ResourceType: datasource.ResourceDependency,
+				Action:       sync.CreateAction,
+				Status:       sync.PendingStatus,
+			}
+			tasks, err := task.List(syncAllContext(), &listTaskReq)
+			assert.NoError(t, err)
+			assert.Equal(t, 1, len(tasks))
+			err = task.Delete(syncAllContext(), tasks...)
+			assert.NoError(t, err)
+			tasks, err = task.List(syncAllContext(), &listTaskReq)
+			assert.NoError(t, err)
+			assert.Equal(t, 0, len(tasks))
+		})
+
+		t.Run("do sync will create task should pass", func(t *testing.T) {
+			err := datasource.GetSyncManager().SyncAll(syncAllContext())
+			assert.Nil(t, err)
+			listServiceTaskReq := model.ListTaskRequest{
+				Domain:       "sync-all",
+				Project:      "sync-all",
+				ResourceType: datasource.ResourceService,
+			}
+			tasks, err := task.List(syncAllContext(), &listServiceTaskReq)
+			assert.NoError(t, err)
+			assert.Equal(t, 3, len(tasks))
+			err = task.Delete(syncAllContext(), tasks...)
+			assert.NoError(t, err)
+			err = task.Delete(syncAllContext(), tasks...)
+			assert.NoError(t, err)
+			listKVTaskReq := model.ListTaskRequest{
+				Domain:       "sync-all",
+				Project:      "sync-all",
+				ResourceType: datasource.ResourceKV,
+			}
+			tasks, err = task.List(syncAllContext(), &listKVTaskReq)
+			assert.NoError(t, err)
+			// three schema tasks and one tag task
+			assert.Equal(t, 4, len(tasks))
+			err = task.Delete(syncAllContext(), tasks...)
+			assert.NoError(t, err)
+			err = task.Delete(syncAllContext(), tasks...)
+			assert.NoError(t, err)
+			listAccountTaskReq := model.ListTaskRequest{
+				Domain:       "",
+				Project:      "",
+				ResourceType: datasource.ResourceAccount,
+			}
+			tasks, err = task.List(syncAllContext(), &listAccountTaskReq)
+			assert.NoError(t, err)
+			assert.Equal(t, 1, len(tasks))
+			err = task.Delete(syncAllContext(), tasks...)
+			assert.NoError(t, err)
+			err = task.Delete(syncAllContext(), tasks...)
+			assert.NoError(t, err)
+			listRoleTaskReq := model.ListTaskRequest{
+				Domain:       "",
+				Project:      "",
+				ResourceType: datasource.ResourceRole,
+			}
+			tasks, err = task.List(syncAllContext(), &listRoleTaskReq)
+			assert.NoError(t, err)
+			assert.Equal(t, 1, len(tasks))
+			err = task.Delete(syncAllContext(), tasks...)
+			assert.NoError(t, err)
+			err = task.Delete(syncAllContext(), tasks...)
+			assert.NoError(t, err)
+			listDepTaskReq := model.ListTaskRequest{
+				Domain:       "sync-all",
+				Project:      "sync-all",
+				ResourceType: datasource.ResourceDependency,
+			}
+			tasks, err = task.List(syncAllContext(), &listDepTaskReq)
+			assert.NoError(t, err)
+			assert.Equal(t, 1, len(tasks))
+			err = task.Delete(syncAllContext(), tasks...)
+			assert.NoError(t, err)
+			err = task.Delete(syncAllContext(), tasks...)
+			assert.NoError(t, err)
+			exist, err := etcdadpt.Exist(syncAllContext(), etcd.SyncAllKey)
+			assert.Equal(t, true, exist)
+			assert.Nil(t, err)
+		})
+
+		t.Run("delete all resources should pass", func(t *testing.T) {
+			err := schema.Instance().DeleteRef(syncAllContext(), &schema.RefRequest{
+				ServiceID: serviceID,
+				SchemaID:  "schemaID_sync_all",
+			})
+			assert.NoError(t, err)
+			err = datasource.GetMetadataManager().DeleteSchema(syncAllContext(), &pb.DeleteSchemaRequest{
+				ServiceId: serviceID,
+				SchemaId:  "schemaID_sync_all",
+			})
+			err = datasource.GetMetadataManager().UnregisterService(syncAllContext(), &pb.DeleteServiceRequest{
+				ServiceId: serviceID,
+				Force:     true,
+			})
+			assert.NoError(t, err)
+			_, err = rbac.Instance().DeleteAccount(syncAllContext(), []string{accountName})
+			assert.NoError(t, err)
+			_, err = rbac.Instance().DeleteRole(syncAllContext(), roleName)
+			assert.NoError(t, err)
+			err = datasource.GetMetadataManager().UnregisterService(syncAllContext(), &pb.DeleteServiceRequest{
+				ServiceId: consumerID, Force: true,
+			})
+			assert.NoError(t, err)
+
+			err = datasource.GetMetadataManager().UnregisterService(syncAllContext(), &pb.DeleteServiceRequest{
+				ServiceId: providerID, Force: true,
+			})
+			assert.NoError(t, err)
+
+			listSeviceTaskReq := model.ListTaskRequest{
+				Domain:       "sync-all",
+				Project:      "sync-all",
+				ResourceType: datasource.ResourceService,
+			}
+			tasks, err := task.List(syncAllContext(), &listSeviceTaskReq)
+			assert.NoError(t, err)
+			assert.Equal(t, 3, len(tasks))
+			err = task.Delete(syncAllContext(), tasks...)
+			assert.NoError(t, err)
+			tasks, err = task.List(syncAllContext(), &listSeviceTaskReq)
+			assert.NoError(t, err)
+			assert.Equal(t, 0, len(tasks))
+			listAccountTaskReq := model.ListTaskRequest{
+				Domain:       "",
+				Project:      "",
+				ResourceType: datasource.ResourceAccount,
+			}
+			tasks, err = task.List(syncAllContext(), &listAccountTaskReq)
+			assert.NoError(t, err)
+			assert.Equal(t, 1, len(tasks))
+			err = task.Delete(syncAllContext(), tasks...)
+			assert.NoError(t, err)
+			tasks, err = task.List(syncAllContext(), &listAccountTaskReq)
+			assert.NoError(t, err)
+			assert.Equal(t, 0, len(tasks))
+
+			listRoleTaskReq := model.ListTaskRequest{
+				Domain:       "",
+				Project:      "",
+				ResourceType: datasource.ResourceRole,
+			}
+			tasks, err = task.List(syncAllContext(), &listRoleTaskReq)
+			assert.NoError(t, err)
+			assert.Equal(t, 1, len(tasks))
+			err = task.Delete(syncAllContext(), tasks...)
+			assert.NoError(t, err)
+			tasks, err = task.List(syncAllContext(), &listRoleTaskReq)
+			assert.NoError(t, err)
+			assert.Equal(t, 0, len(tasks))
+
+			listKVTaskReq := model.ListTaskRequest{
+				Domain:       "sync-all",
+				Project:      "sync-all",
+				ResourceType: datasource.ResourceKV,
+			}
+			tasks, err = task.List(syncAllContext(), &listKVTaskReq)
+			assert.NoError(t, err)
+			assert.Equal(t, 2, len(tasks))
+			err = task.Delete(syncAllContext(), tasks...)
+			assert.NoError(t, err)
+			tasks, err = task.List(syncAllContext(), &listKVTaskReq)
+			assert.NoError(t, err)
+			assert.Equal(t, 0, len(tasks))
+
+			tombstoneListReq := model.ListTombstoneRequest{
+				Domain:  "sync-all",
+				Project: "sync-all",
+			}
+			tombstones, err := tombstone.List(syncAllContext(), &tombstoneListReq)
+			assert.NoError(t, err)
+			assert.Equal(t, 7, len(tombstones))
+			err = tombstone.Delete(syncAllContext(), tombstones...)
+			assert.NoError(t, err)
+		})
+	})
+}
diff --git a/datasource/manager.go b/datasource/manager.go
index 9d3e5c2..9ede3fa 100644
--- a/datasource/manager.go
+++ b/datasource/manager.go
@@ -18,6 +18,7 @@
 package datasource
 
 import (
+	"context"
 	"fmt"
 
 	"github.com/go-chassis/cari/dlock"
@@ -51,6 +52,10 @@ func Init(opts Options) error {
 	if err != nil {
 		return err
 	}
+	err = GetSyncManager().SyncAll(context.Background())
+	if err != nil && err != ErrSyncAllKeyExists {
+		return err
+	}
 	err = schema.Init(schema.Options{Kind: opts.Kind})
 	if err != nil {
 		return err
@@ -100,3 +105,6 @@ func GetDependencyManager() DependencyManager {
 func GetMetricsManager() MetricsManager {
 	return dataSourceInst.MetricsManager()
 }
+func GetSyncManager() SyncManager {
+	return dataSourceInst.SyncManager()
+}
diff --git a/datasource/mongo/mongo.go b/datasource/mongo/mongo.go
index 51d2506..bbde3f4 100644
--- a/datasource/mongo/mongo.go
+++ b/datasource/mongo/mongo.go
@@ -44,6 +44,7 @@ type DataSource struct {
 	depManager      datasource.DependencyManager
 	scManager       datasource.SCManager
 	metricsManager  datasource.MetricsManager
+	syncManager     datasource.SyncManager
 }
 
 func (ds *DataSource) SystemManager() datasource.SystemManager {
@@ -66,6 +67,10 @@ func (ds *DataSource) MetricsManager() datasource.MetricsManager {
 	return ds.metricsManager
 }
 
+func (ds *DataSource) SyncManager() datasource.SyncManager {
+	return ds.syncManager
+}
+
 func NewDataSource(opts datasource.Options) (datasource.DataSource, error) {
 	// TODO: construct a reasonable DataSource instance
 	inst := &DataSource{}
@@ -82,6 +87,7 @@ func NewDataSource(opts datasource.Options) (datasource.DataSource, error) {
 		InstanceProperties: opts.InstanceProperties,
 	}
 	inst.metricsManager = &MetricsManager{}
+	inst.syncManager = &SyncManager{}
 	return inst, nil
 }
 
diff --git a/datasource/datasource.go b/datasource/mongo/sync.go
similarity index 68%
copy from datasource/datasource.go
copy to datasource/mongo/sync.go
index ae0727b..7fcaaa1 100644
--- a/datasource/datasource.go
+++ b/datasource/mongo/sync.go
@@ -15,13 +15,20 @@
  * limitations under the License.
  */
 
-package datasource
+package mongo
 
-// DataSource is the DAO layer
-type DataSource interface {
-	SystemManager() SystemManager
-	DependencyManager() DependencyManager
-	MetadataManager() MetadataManager
-	SCManager() SCManager
-	MetricsManager() MetricsManager
+import (
+	"context"
+
+	"github.com/apache/servicecomb-service-center/pkg/log"
+)
+
+type SyncManager struct {
+}
+
+// SyncAll should list all services, accounts, roles, schemas, tags and deps and use tasks to store them; mongo does not implement it yet and returns nil.
+func (s *SyncManager) SyncAll(ctx context.Context) error {
+	// TODO mongo should implement it
+	log.Info("Mongo does not implement this method")
+	return nil
 }
diff --git a/datasource/datasource.go b/datasource/sync.go
similarity index 65%
copy from datasource/datasource.go
copy to datasource/sync.go
index ae0727b..81205c2 100644
--- a/datasource/datasource.go
+++ b/datasource/sync.go
@@ -3,12 +3,12 @@
  * contributor license agreements.  See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
  * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
+ * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
  *
  *     http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
+ * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
@@ -17,11 +17,13 @@
 
 package datasource
 
-// DataSource is the DAO layer
-type DataSource interface {
-	SystemManager() SystemManager
-	DependencyManager() DependencyManager
-	MetadataManager() MetadataManager
-	SCManager() SCManager
-	MetricsManager() MetricsManager
+import (
+	"context"
+	"errors"
+)
+
+var ErrSyncAllKeyExists = errors.New("sync all key already exists")
+
+type SyncManager interface {
+	SyncAll(ctx context.Context) error
 }