You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicecomb.apache.org by ro...@apache.org on 2022/01/26 08:25:51 UTC

[servicecomb-service-center] branch master updated: [feat] add SyncAll func when db is mongo

This is an automated email from the ASF dual-hosted git repository.

robotljw pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/servicecomb-service-center.git


The following commit(s) were added to refs/heads/master by this push:
     new e29c607  [feat] add SyncAll func when db is mongo
     new 31619d1  Merge pull request #1249 from robotLJW/master
e29c607 is described below

commit e29c6074644ea19ffd152478657792486dfab847
Author: robotljw <79...@qq.com>
AuthorDate: Wed Jan 26 15:51:40 2022 +0800

    [feat] add SyncAll func when db is mongo
---
 datasource/etcd/sync_test.go            |  25 ++-
 datasource/mongo/db.go                  |   6 +
 datasource/mongo/model/types.go         |   2 +
 datasource/mongo/sync.go                | 185 +++++++++++++++++++++-
 datasource/{etcd => mongo}/sync_test.go | 266 +++++++-------------------------
 5 files changed, 272 insertions(+), 212 deletions(-)

diff --git a/datasource/etcd/sync_test.go b/datasource/etcd/sync_test.go
index 09ed470..718f092 100644
--- a/datasource/etcd/sync_test.go
+++ b/datasource/etcd/sync_test.go
@@ -337,8 +337,10 @@ func TestSyncAll(t *testing.T) {
 			assert.Equal(t, 3, len(tasks))
 			err = task.Delete(syncAllContext(), tasks...)
 			assert.NoError(t, err)
-			err = task.Delete(syncAllContext(), tasks...)
+			tasks, err = task.List(syncAllContext(), &listServiceTaskReq)
 			assert.NoError(t, err)
+			assert.Equal(t, 0, len(tasks))
+
 			listKVTaskReq := model.ListTaskRequest{
 				Domain:       "sync-all",
 				Project:      "sync-all",
@@ -350,8 +352,13 @@ func TestSyncAll(t *testing.T) {
 			assert.Equal(t, 4, len(tasks))
 			err = task.Delete(syncAllContext(), tasks...)
 			assert.NoError(t, err)
+			tasks, err = task.List(syncAllContext(), &listKVTaskReq)
+			assert.NoError(t, err)
+			// the three schema tasks and one tag task were deleted above, so none should remain
+			assert.Equal(t, 0, len(tasks))
 			err = task.Delete(syncAllContext(), tasks...)
 			assert.NoError(t, err)
+
 			listAccountTaskReq := model.ListTaskRequest{
 				Domain:       "",
 				Project:      "",
@@ -362,8 +369,10 @@ func TestSyncAll(t *testing.T) {
 			assert.Equal(t, 1, len(tasks))
 			err = task.Delete(syncAllContext(), tasks...)
 			assert.NoError(t, err)
-			err = task.Delete(syncAllContext(), tasks...)
+			tasks, err = task.List(syncAllContext(), &listAccountTaskReq)
 			assert.NoError(t, err)
+			assert.Equal(t, 0, len(tasks))
+
 			listRoleTaskReq := model.ListTaskRequest{
 				Domain:       "",
 				Project:      "",
@@ -374,8 +383,10 @@ func TestSyncAll(t *testing.T) {
 			assert.Equal(t, 1, len(tasks))
 			err = task.Delete(syncAllContext(), tasks...)
 			assert.NoError(t, err)
-			err = task.Delete(syncAllContext(), tasks...)
+			tasks, err = task.List(syncAllContext(), &listRoleTaskReq)
 			assert.NoError(t, err)
+			assert.Equal(t, 0, len(tasks))
+
 			listDepTaskReq := model.ListTaskRequest{
 				Domain:       "sync-all",
 				Project:      "sync-all",
@@ -386,11 +397,17 @@ func TestSyncAll(t *testing.T) {
 			assert.Equal(t, 1, len(tasks))
 			err = task.Delete(syncAllContext(), tasks...)
 			assert.NoError(t, err)
-			err = task.Delete(syncAllContext(), tasks...)
+			tasks, err = task.List(syncAllContext(), &listDepTaskReq)
 			assert.NoError(t, err)
+			assert.Equal(t, 0, len(tasks))
+
 			exist, err := etcdadpt.Exist(syncAllContext(), etcd.SyncAllKey)
 			assert.Equal(t, true, exist)
 			assert.Nil(t, err)
+
+			isDelete, err := etcdadpt.Delete(syncAllContext(), etcd.SyncAllKey)
+			assert.Equal(t, true, isDelete)
+			assert.Nil(t, err)
 		})
 
 		t.Run("delete all resources should pass", func(t *testing.T) {
diff --git a/datasource/mongo/db.go b/datasource/mongo/db.go
index 4597dfa..e64e1eb 100644
--- a/datasource/mongo/db.go
+++ b/datasource/mongo/db.go
@@ -33,6 +33,7 @@ func ensureDB() {
 	ensureDep()
 	ensureAccount()
 	ensureAccountLock()
+	ensureSyncLock()
 }
 
 func ensureService() {
@@ -93,3 +94,8 @@ func ensureAccountLock() {
 	dmongo.EnsureCollection(model.CollectionAccountLock, nil, []mongo.IndexModel{
 		util.BuildIndexDoc(model.ColumnAccountLockKey)})
 }
+
+func ensureSyncLock() {
+	dmongo.EnsureCollection(model.CollectionSync, nil, []mongo.IndexModel{
+		util.BuildIndexDoc(model.ColumnKey)})
+}
diff --git a/datasource/mongo/model/types.go b/datasource/mongo/model/types.go
index 0a34020..cd42798 100644
--- a/datasource/mongo/model/types.go
+++ b/datasource/mongo/model/types.go
@@ -33,6 +33,7 @@ const (
 	CollectionRole        = "role"
 	CollectionDomain      = "domain"
 	CollectionProject     = "project"
+	CollectionSync        = "sync"
 )
 
 const (
@@ -75,6 +76,7 @@ const (
 	ColumnAccountLockKey       = "key"
 	ColumnAccountLockStatus    = "status"
 	ColumnAccountLockReleaseAt = "release_at"
+	ColumnKey                  = "key"
 )
 
 type Service struct {
diff --git a/datasource/mongo/sync.go b/datasource/mongo/sync.go
index 7fcaaa1..92a615f 100644
--- a/datasource/mongo/sync.go
+++ b/datasource/mongo/sync.go
@@ -19,8 +19,25 @@ package mongo
 
 import (
 	"context"
+	"fmt"
+	"strings"
 
+	dmongo "github.com/go-chassis/cari/db/mongo"
+	"github.com/go-chassis/cari/discovery"
+	rbacmodel "github.com/go-chassis/cari/rbac"
+	"go.mongodb.org/mongo-driver/bson"
+	"go.mongodb.org/mongo-driver/mongo"
+
+	"github.com/apache/servicecomb-service-center/datasource"
+	"github.com/apache/servicecomb-service-center/datasource/mongo/model"
+	"github.com/apache/servicecomb-service-center/datasource/mongo/sync"
 	"github.com/apache/servicecomb-service-center/pkg/log"
+	putil "github.com/apache/servicecomb-service-center/pkg/util"
+	"github.com/apache/servicecomb-service-center/server/config"
+)
+
+const (
+	SyncAllKey = "sync-all"
 )
 
 type SyncManager struct {
@@ -28,7 +45,171 @@ type SyncManager struct {
 
 // SyncAll will list all services,accounts,roles,schemas,tags,deps and use tasks to store
 func (s *SyncManager) SyncAll(ctx context.Context) error {
-	// TODO mongo should implement it
-	log.Info("Mongo does not implement this method")
+	enable := config.GetBool("sync.enableOnStart", false)
+	if !enable {
+		return nil
+	}
+	exist, err := syncAllKeyExist(ctx)
+	if err != nil {
+		return err
+	}
+	if exist {
+		log.Info(fmt.Sprintf("%s key already exists, do not need to do tasks", SyncAllKey))
+		return datasource.ErrSyncAllKeyExists
+	}
+	// TODO use mongo distributed lock
+	err = syncAllAccounts(ctx)
+	if err != nil {
+		return err
+	}
+	err = syncAllRoles(ctx)
+	if err != nil {
+		return err
+	}
+	err = syncAllServices(ctx)
+	if err != nil {
+		return err
+	}
+	err = syncAllDependencies(ctx)
+	if err != nil {
+		return err
+	}
+	return insertSyncAllKey(ctx)
+}
+
+func syncAllKeyExist(ctx context.Context) (bool, error) {
+	count, err := dmongo.GetClient().GetDB().Collection(model.CollectionSync).CountDocuments(ctx, bson.M{"key": SyncAllKey})
+	if err != nil {
+		return false, err
+	}
+	if count > 0 {
+		return true, nil
+	}
+	return false, nil
+}
+
+func syncAllAccounts(ctx context.Context) error {
+	cursor, err := dmongo.GetClient().GetDB().Collection(model.CollectionAccount).Find(ctx, bson.M{})
+	if err != nil {
+		return err
+	}
+	defer func(cursor *mongo.Cursor, ctx context.Context) {
+		err := cursor.Close(ctx)
+		if err != nil {
+			log.Error("fail to close mongo cursor", err)
+		}
+	}(cursor, ctx)
+	for cursor.Next(ctx) {
+		var account rbacmodel.Account
+		err = cursor.Decode(&account)
+		if err != nil {
+			log.Error("failed to decode account", err)
+			return err
+		}
+		err = sync.DoCreateOpts(ctx, datasource.ResourceAccount, &account)
+		if err != nil {
+			log.Error("failed to create account task", err)
+			return err
+		}
+	}
+	return nil
+}
+
+func syncAllRoles(ctx context.Context) error {
+	cursor, err := dmongo.GetClient().GetDB().Collection(model.CollectionRole).Find(ctx, bson.M{})
+	if err != nil {
+		return err
+	}
+	defer func(cursor *mongo.Cursor, ctx context.Context) {
+		err := cursor.Close(ctx)
+		if err != nil {
+			log.Error("fail to close mongo cursor", err)
+		}
+	}(cursor, ctx)
+	for cursor.Next(ctx) {
+		var role rbacmodel.Role
+		err = cursor.Decode(&role)
+		if err != nil {
+			log.Error("failed to decode role", err)
+			return err
+		}
+		err = sync.DoCreateOpts(ctx, datasource.ResourceRole, &role)
+		if err != nil {
+			log.Error("failed to create role task", err)
+			return err
+		}
+	}
 	return nil
 }
+
+func syncAllServices(ctx context.Context) error {
+	cursor, err := dmongo.GetClient().GetDB().Collection(model.CollectionService).Find(ctx, bson.M{})
+	if err != nil {
+		return err
+	}
+	defer func(cursor *mongo.Cursor, ctx context.Context) {
+		err := cursor.Close(ctx)
+		if err != nil {
+			log.Error("fail to close mongo cursor", err)
+		}
+	}(cursor, ctx)
+	for cursor.Next(ctx) {
+		var tmp model.Service
+		err := cursor.Decode(&tmp)
+		if err != nil {
+			return err
+		}
+		request := &discovery.CreateServiceRequest{
+			Service: tmp.Service,
+			Tags:    tmp.Tags,
+		}
+		putil.SetDomain(ctx, tmp.Domain)
+		putil.SetProject(ctx, tmp.Project)
+		err = sync.DoCreateOpts(ctx, datasource.ResourceService, request)
+		if err != nil {
+			log.Error("failed to create service task", err)
+			return err
+		}
+	}
+	return nil
+}
+
+func syncAllDependencies(ctx context.Context) error {
+	cursor, err := dmongo.GetClient().GetDB().Collection(model.CollectionDep).Find(ctx, bson.M{})
+	if err != nil {
+		return err
+	}
+	depInfosMap := make(map[string][]*discovery.ConsumerDependency)
+	defer func(cursor *mongo.Cursor, ctx context.Context) {
+		err := cursor.Close(ctx)
+		if err != nil {
+			log.Error("fail to close mongo cursor", err)
+		}
+	}(cursor, ctx)
+	for cursor.Next(ctx) {
+		var tmp model.ConsumerDep
+		err := cursor.Decode(&tmp)
+		if err != nil {
+			return err
+		}
+		key := tmp.Domain + "/" + tmp.Project
+		depInfosMap[key] = append(depInfosMap[key], tmp.ConsumerDep)
+	}
+	for key, dependencies := range depInfosMap {
+		splitKey := strings.Split(key, "/")
+		domain, project := splitKey[0], splitKey[1]
+		putil.SetDomain(ctx, domain)
+		putil.SetProject(ctx, project)
+		err := sync.DoUpdateOpts(ctx, datasource.ResourceDependency, dependencies)
+		if err != nil {
+			log.Error("fail to create dep tasks", err)
+			return err
+		}
+	}
+	return nil
+}
+
+func insertSyncAllKey(ctx context.Context) error {
+	_, err := dmongo.GetClient().GetDB().Collection(model.CollectionSync).InsertOne(ctx, bson.M{"key": SyncAllKey})
+	return err
+}
diff --git a/datasource/etcd/sync_test.go b/datasource/mongo/sync_test.go
similarity index 59%
copy from datasource/etcd/sync_test.go
copy to datasource/mongo/sync_test.go
index 09ed470..816a2bf 100644
--- a/datasource/etcd/sync_test.go
+++ b/datasource/mongo/sync_test.go
@@ -3,41 +3,40 @@
  * contributor license agreements.  See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
  * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except request compliance with
+ * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
  *
  *     http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to request writing, software
+ * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
 
-package etcd_test
+package mongo_test
 
 import (
 	"context"
 	"strconv"
 	"testing"
 
+	dmongo "github.com/go-chassis/cari/db/mongo"
 	pb "github.com/go-chassis/cari/discovery"
 	crbac "github.com/go-chassis/cari/rbac"
 	"github.com/go-chassis/cari/sync"
 	"github.com/go-chassis/go-archaius"
-	"github.com/little-cui/etcdadpt"
 	"github.com/stretchr/testify/assert"
+	"go.mongodb.org/mongo-driver/bson"
 
 	"github.com/apache/servicecomb-service-center/datasource"
-	"github.com/apache/servicecomb-service-center/datasource/etcd"
+	"github.com/apache/servicecomb-service-center/datasource/mongo"
+	"github.com/apache/servicecomb-service-center/datasource/mongo/model"
 	"github.com/apache/servicecomb-service-center/datasource/rbac"
-	"github.com/apache/servicecomb-service-center/datasource/schema"
-	"github.com/apache/servicecomb-service-center/eventbase/model"
+	emodel "github.com/apache/servicecomb-service-center/eventbase/model"
 	"github.com/apache/servicecomb-service-center/eventbase/service/task"
-	"github.com/apache/servicecomb-service-center/eventbase/service/tombstone"
 	"github.com/apache/servicecomb-service-center/pkg/util"
-	_ "github.com/apache/servicecomb-service-center/test"
 )
 
 func syncAllContext() context.Context {
@@ -54,69 +53,22 @@ func TestSyncAll(t *testing.T) {
 
 	t.Run("enableOnStart is true and syncAllKey exists will not do sync", func(t *testing.T) {
 		_ = archaius.Set("sync.enableOnStart", true)
-		err := etcdadpt.Put(syncAllContext(), etcd.SyncAllKey, "1")
+		_, err := dmongo.GetClient().GetDB().Collection(model.CollectionSync).InsertOne(syncAllContext(),
+			bson.M{"key": mongo.SyncAllKey})
 		assert.Nil(t, err)
 		err = datasource.GetSyncManager().SyncAll(syncAllContext())
 		assert.Equal(t, datasource.ErrSyncAllKeyExists, err)
-		isDeleted, err := etcdadpt.Delete(syncAllContext(), etcd.SyncAllKey)
-		assert.Equal(t, isDeleted, true)
-		assert.Nil(t, err)
-	})
-
-	t.Run("enableOnstart is true and syncAllKey not exists but SyncAllLockKey is lock will not do sync", func(t *testing.T) {
-		_ = archaius.Set("sync.enableOnStart", true)
-		lock, err := etcdadpt.TryLock(etcd.SyncAllLockKey, 600)
-		assert.Nil(t, err)
-		err = datasource.GetSyncManager().SyncAll(syncAllContext())
-		assert.Nil(t, err)
-		listTaskReq := model.ListTaskRequest{
-			Domain:  "sync-all",
-			Project: "sync-all",
-		}
-		tasks, err := task.List(syncAllContext(), &listTaskReq)
-		assert.NoError(t, err)
-		assert.Equal(t, 0, len(tasks))
-		err = lock.Unlock()
+		_, err = dmongo.GetClient().GetDB().Collection(model.CollectionSync).DeleteOne(syncAllContext(),
+			bson.M{"key": mongo.SyncAllKey})
 		assert.Nil(t, err)
 	})
 
 	t.Run("enableOnStart is true and syncAllKey not exists will do sync", func(t *testing.T) {
 		_ = archaius.Set("sync.enableOnStart", true)
-		var serviceID string
 		var accountName string
 		var roleName string
 		var consumerID string
 		var providerID string
-		t.Run("register a service and delete the task should pass", func(t *testing.T) {
-			resp, err := datasource.GetMetadataManager().RegisterService(syncAllContext(), &pb.CreateServiceRequest{
-				Service: &pb.MicroService{
-					AppId:       "sync_micro_service_group",
-					ServiceName: "sync_micro_service_sync_all",
-					Version:     "1.0.0",
-					Level:       "FRONT",
-					Status:      pb.MS_UP,
-				},
-			})
-			assert.NotNil(t, resp)
-			assert.NoError(t, err)
-			assert.Equal(t, pb.ResponseSuccess, resp.Response.GetCode())
-			serviceID = resp.ServiceId
-			listTaskReq := model.ListTaskRequest{
-				Domain:       "sync-all",
-				Project:      "sync-all",
-				ResourceType: datasource.ResourceService,
-				Action:       sync.CreateAction,
-				Status:       sync.PendingStatus,
-			}
-			tasks, err := task.List(syncAllContext(), &listTaskReq)
-			assert.NoError(t, err)
-			assert.Equal(t, 1, len(tasks))
-			err = task.Delete(syncAllContext(), tasks...)
-			assert.NoError(t, err)
-			tasks, err = task.List(syncAllContext(), &listTaskReq)
-			assert.NoError(t, err)
-			assert.Equal(t, 0, len(tasks))
-		})
 		t.Run("create a account and delete the task should pass", func(t *testing.T) {
 			a1 := crbac.Account{
 				ID:                  "sync-create-11111-sync-all",
@@ -132,7 +84,7 @@ func TestSyncAll(t *testing.T) {
 			r, err := rbac.Instance().GetAccount(syncAllContext(), a1.Name)
 			assert.NoError(t, err)
 			assert.Equal(t, a1, *r)
-			listTaskReq := model.ListTaskRequest{
+			listTaskReq := emodel.ListTaskRequest{
 				Domain:       "",
 				Project:      "",
 				ResourceType: datasource.ResourceAccount,
@@ -161,7 +113,7 @@ func TestSyncAll(t *testing.T) {
 			assert.Less(t, 0, dt)
 			assert.Equal(t, r.CreateTime, r.UpdateTime)
 			roleName = r1.Name
-			listTaskReq := model.ListTaskRequest{
+			listTaskReq := emodel.ListTaskRequest{
 				Domain:       "",
 				Project:      "",
 				ResourceType: datasource.ResourceRole,
@@ -172,63 +124,6 @@ func TestSyncAll(t *testing.T) {
 			err = task.Delete(syncAllContext(), tasks...)
 			assert.NoError(t, err)
 		})
-		t.Run("put content with valid request and delete three task should pass", func(t *testing.T) {
-			err := schema.Instance().PutContent(syncAllContext(), &schema.PutContentRequest{
-				ServiceID: serviceID,
-				SchemaID:  "schemaID_sync_all",
-				Content: &schema.ContentItem{
-					Hash:    "hash_sync_all",
-					Summary: "summary_sync_all",
-					Content: "1111111111",
-				},
-			})
-			assert.NoError(t, err)
-			ref, err := schema.Instance().GetRef(syncAllContext(), &schema.RefRequest{
-				ServiceID: serviceID,
-				SchemaID:  "schemaID_sync_all",
-			})
-			assert.NoError(t, err)
-			assert.NotNil(t, ref)
-			assert.Equal(t, "summary_sync_all", ref.Summary)
-			assert.Equal(t, "hash_sync_all", ref.Hash)
-			listTaskReq := model.ListTaskRequest{
-				Domain:       "sync-all",
-				Project:      "sync-all",
-				Action:       sync.UpdateAction,
-				ResourceType: datasource.ResourceKV,
-				Status:       sync.PendingStatus,
-			}
-			tasks, err := task.List(syncAllContext(), &listTaskReq)
-			assert.NoError(t, err)
-			assert.Equal(t, 3, len(tasks))
-			err = task.Delete(syncAllContext(), tasks...)
-			assert.NoError(t, err)
-		})
-		t.Run("update a service tag and delete the task should pass", func(t *testing.T) {
-			err := datasource.GetMetadataManager().PutManyTags(syncAllContext(), &pb.AddServiceTagsRequest{
-				ServiceId: serviceID,
-				Tags: map[string]string{
-					"a": "test",
-					"b": "b",
-				},
-			})
-			assert.NoError(t, err)
-			listTaskReq := model.ListTaskRequest{
-				Domain:       "sync-all",
-				Project:      "sync-all",
-				ResourceType: datasource.ResourceKV,
-				Action:       sync.UpdateAction,
-				Status:       sync.PendingStatus,
-			}
-			tasks, err := task.List(syncAllContext(), &listTaskReq)
-			assert.NoError(t, err)
-			assert.Equal(t, 1, len(tasks))
-			err = task.Delete(syncAllContext(), tasks...)
-			assert.NoError(t, err)
-			tasks, err = task.List(syncAllContext(), &listTaskReq)
-			assert.NoError(t, err)
-			assert.Equal(t, 0, len(tasks))
-		})
 		t.Run("create a consumer service will create a service task should pass", func(t *testing.T) {
 			resp, err := datasource.GetMetadataManager().RegisterService(syncAllContext(), &pb.CreateServiceRequest{
 				Service: &pb.MicroService{
@@ -242,7 +137,7 @@ func TestSyncAll(t *testing.T) {
 			assert.NotNil(t, resp)
 			assert.NoError(t, err)
 			consumerID = resp.ServiceId
-			listTaskReq := model.ListTaskRequest{
+			listTaskReq := emodel.ListTaskRequest{
 				Domain:       "sync-all",
 				Project:      "sync-all",
 				ResourceType: datasource.ResourceService,
@@ -272,7 +167,7 @@ func TestSyncAll(t *testing.T) {
 			assert.NoError(t, err)
 			providerID = resp.ServiceId
 
-			listTaskReq := model.ListTaskRequest{
+			listTaskReq := emodel.ListTaskRequest{
 				Domain:       "sync-all",
 				Project:      "sync-all",
 				ResourceType: datasource.ResourceService,
@@ -304,14 +199,14 @@ func TestSyncAll(t *testing.T) {
 						},
 					},
 				},
-			}, true)
+			}, false)
 			assert.NoError(t, err)
 
-			listTaskReq := model.ListTaskRequest{
+			listTaskReq := emodel.ListTaskRequest{
 				Domain:       "sync-all",
 				Project:      "sync-all",
 				ResourceType: datasource.ResourceDependency,
-				Action:       sync.CreateAction,
+				Action:       sync.UpdateAction,
 				Status:       sync.PendingStatus,
 			}
 			tasks, err := task.List(syncAllContext(), &listTaskReq)
@@ -327,32 +222,21 @@ func TestSyncAll(t *testing.T) {
 		t.Run("do sync will create task should pass", func(t *testing.T) {
 			err := datasource.GetSyncManager().SyncAll(syncAllContext())
 			assert.Nil(t, err)
-			listServiceTaskReq := model.ListTaskRequest{
+			listServiceTaskReq := emodel.ListTaskRequest{
 				Domain:       "sync-all",
 				Project:      "sync-all",
 				ResourceType: datasource.ResourceService,
 			}
 			tasks, err := task.List(syncAllContext(), &listServiceTaskReq)
 			assert.NoError(t, err)
-			assert.Equal(t, 3, len(tasks))
-			err = task.Delete(syncAllContext(), tasks...)
-			assert.NoError(t, err)
-			err = task.Delete(syncAllContext(), tasks...)
-			assert.NoError(t, err)
-			listKVTaskReq := model.ListTaskRequest{
-				Domain:       "sync-all",
-				Project:      "sync-all",
-				ResourceType: datasource.ResourceKV,
-			}
-			tasks, err = task.List(syncAllContext(), &listKVTaskReq)
-			assert.NoError(t, err)
-			// three schema and one tag
-			assert.Equal(t, 4, len(tasks))
+			assert.Equal(t, 2, len(tasks))
 			err = task.Delete(syncAllContext(), tasks...)
 			assert.NoError(t, err)
-			err = task.Delete(syncAllContext(), tasks...)
+			tasks, err = task.List(syncAllContext(), &listServiceTaskReq)
 			assert.NoError(t, err)
-			listAccountTaskReq := model.ListTaskRequest{
+			assert.Equal(t, 0, len(tasks))
+
+			listAccountTaskReq := emodel.ListTaskRequest{
 				Domain:       "",
 				Project:      "",
 				ResourceType: datasource.ResourceAccount,
@@ -362,9 +246,11 @@ func TestSyncAll(t *testing.T) {
 			assert.Equal(t, 1, len(tasks))
 			err = task.Delete(syncAllContext(), tasks...)
 			assert.NoError(t, err)
-			err = task.Delete(syncAllContext(), tasks...)
+			tasks, err = task.List(syncAllContext(), &listAccountTaskReq)
 			assert.NoError(t, err)
-			listRoleTaskReq := model.ListTaskRequest{
+			assert.Equal(t, 0, len(tasks))
+
+			listRoleTaskReq := emodel.ListTaskRequest{
 				Domain:       "",
 				Project:      "",
 				ResourceType: datasource.ResourceRole,
@@ -374,9 +260,11 @@ func TestSyncAll(t *testing.T) {
 			assert.Equal(t, 1, len(tasks))
 			err = task.Delete(syncAllContext(), tasks...)
 			assert.NoError(t, err)
-			err = task.Delete(syncAllContext(), tasks...)
+			tasks, err = task.List(syncAllContext(), &listRoleTaskReq)
 			assert.NoError(t, err)
-			listDepTaskReq := model.ListTaskRequest{
+			assert.Equal(t, 0, len(tasks))
+
+			listDepTaskReq := emodel.ListTaskRequest{
 				Domain:       "sync-all",
 				Project:      "sync-all",
 				ResourceType: datasource.ResourceDependency,
@@ -386,61 +274,24 @@ func TestSyncAll(t *testing.T) {
 			assert.Equal(t, 1, len(tasks))
 			err = task.Delete(syncAllContext(), tasks...)
 			assert.NoError(t, err)
-			err = task.Delete(syncAllContext(), tasks...)
-			assert.NoError(t, err)
-			exist, err := etcdadpt.Exist(syncAllContext(), etcd.SyncAllKey)
-			assert.Equal(t, true, exist)
+			count, err := dmongo.GetClient().GetDB().Collection(model.CollectionSync).CountDocuments(syncAllContext(),
+				bson.M{"key": mongo.SyncAllKey})
+			assert.Nil(t, err)
+			assert.Equal(t, int64(1), count)
+			_, err = dmongo.GetClient().GetDB().Collection(model.CollectionSync).DeleteOne(syncAllContext(),
+				bson.M{"key": mongo.SyncAllKey})
 			assert.Nil(t, err)
 		})
 
 		t.Run("delete all resources should pass", func(t *testing.T) {
-			err := schema.Instance().DeleteRef(syncAllContext(), &schema.RefRequest{
-				ServiceID: serviceID,
-				SchemaID:  "schemaID_sync_all",
-			})
-			assert.NoError(t, err)
-			err = datasource.GetMetadataManager().DeleteSchema(syncAllContext(), &pb.DeleteSchemaRequest{
-				ServiceId: serviceID,
-				SchemaId:  "schemaID_sync_all",
-			})
-			err = datasource.GetMetadataManager().UnregisterService(syncAllContext(), &pb.DeleteServiceRequest{
-				ServiceId: serviceID,
-				Force:     true,
-			})
-			assert.NoError(t, err)
-			_, err = rbac.Instance().DeleteAccount(syncAllContext(), []string{accountName})
-			assert.NoError(t, err)
-			_, err = rbac.Instance().DeleteRole(syncAllContext(), roleName)
-			assert.NoError(t, err)
-			err = datasource.GetMetadataManager().UnregisterService(syncAllContext(), &pb.DeleteServiceRequest{
-				ServiceId: consumerID, Force: true,
-			})
-			assert.NoError(t, err)
-
-			err = datasource.GetMetadataManager().UnregisterService(syncAllContext(), &pb.DeleteServiceRequest{
-				ServiceId: providerID, Force: true,
-			})
+			_, err := rbac.Instance().DeleteAccount(syncAllContext(), []string{accountName})
 			assert.NoError(t, err)
-
-			listSeviceTaskReq := model.ListTaskRequest{
-				Domain:       "sync-all",
-				Project:      "sync-all",
-				ResourceType: datasource.ResourceService,
-			}
-			tasks, err := task.List(syncAllContext(), &listSeviceTaskReq)
-			assert.NoError(t, err)
-			assert.Equal(t, 3, len(tasks))
-			err = task.Delete(syncAllContext(), tasks...)
-			assert.NoError(t, err)
-			tasks, err = task.List(syncAllContext(), &listSeviceTaskReq)
-			assert.NoError(t, err)
-			assert.Equal(t, 0, len(tasks))
-			listAccountTaskReq := model.ListTaskRequest{
+			listAccountTaskReq := emodel.ListTaskRequest{
 				Domain:       "",
 				Project:      "",
 				ResourceType: datasource.ResourceAccount,
 			}
-			tasks, err = task.List(syncAllContext(), &listAccountTaskReq)
+			tasks, err := task.List(syncAllContext(), &listAccountTaskReq)
 			assert.NoError(t, err)
 			assert.Equal(t, 1, len(tasks))
 			err = task.Delete(syncAllContext(), tasks...)
@@ -449,7 +300,9 @@ func TestSyncAll(t *testing.T) {
 			assert.NoError(t, err)
 			assert.Equal(t, 0, len(tasks))
 
-			listRoleTaskReq := model.ListTaskRequest{
+			_, err = rbac.Instance().DeleteRole(syncAllContext(), roleName)
+			assert.NoError(t, err)
+			listRoleTaskReq := emodel.ListTaskRequest{
 				Domain:       "",
 				Project:      "",
 				ResourceType: datasource.ResourceRole,
@@ -463,29 +316,30 @@ func TestSyncAll(t *testing.T) {
 			assert.NoError(t, err)
 			assert.Equal(t, 0, len(tasks))
 
-			listKVTaskReq := model.ListTaskRequest{
+			err = datasource.GetMetadataManager().UnregisterService(syncAllContext(), &pb.DeleteServiceRequest{
+				ServiceId: consumerID, Force: true,
+			})
+			assert.NoError(t, err)
+
+			err = datasource.GetMetadataManager().UnregisterService(syncAllContext(), &pb.DeleteServiceRequest{
+				ServiceId: providerID, Force: true,
+			})
+			assert.NoError(t, err)
+
+			listSeviceTaskReq := emodel.ListTaskRequest{
 				Domain:       "sync-all",
 				Project:      "sync-all",
-				ResourceType: datasource.ResourceKV,
+				ResourceType: datasource.ResourceService,
 			}
-			tasks, err = task.List(syncAllContext(), &listKVTaskReq)
+			tasks, err = task.List(syncAllContext(), &listSeviceTaskReq)
 			assert.NoError(t, err)
 			assert.Equal(t, 2, len(tasks))
 			err = task.Delete(syncAllContext(), tasks...)
 			assert.NoError(t, err)
-			tasks, err = task.List(syncAllContext(), &listKVTaskReq)
+			tasks, err = task.List(syncAllContext(), &listSeviceTaskReq)
 			assert.NoError(t, err)
 			assert.Equal(t, 0, len(tasks))
-
-			tombstoneListReq := model.ListTombstoneRequest{
-				Domain:  "sync-all",
-				Project: "sync-all",
-			}
-			tombstones, err := tombstone.List(syncAllContext(), &tombstoneListReq)
-			assert.NoError(t, err)
-			assert.Equal(t, 7, len(tombstones))
-			err = tombstone.Delete(syncAllContext(), tombstones...)
-			assert.NoError(t, err)
 		})
 	})
+
 }