You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicecomb.apache.org by ro...@apache.org on 2022/01/09 11:14:50 UTC

[servicecomb-service-center] branch master updated: [feat] add schema sync func and ut when db mode is etcd

This is an automated email from the ASF dual-hosted git repository.

robotljw pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/servicecomb-service-center.git


The following commit(s) were added to refs/heads/master by this push:
     new dcfc2ae  [feat] add schema sync func and ut when db mode is etcd
     new f1d9781  Merge pull request #1209 from robotLJW/master
dcfc2ae is described below

commit dcfc2ae871e9525795c964e0101ca20f7b739b6e
Author: robotljw <79...@qq.com>
AuthorDate: Fri Jan 7 19:32:01 2022 +0800

    [feat] add schema sync func and ut when db mode is etcd
---
 datasource/etcd/ms.go          |  21 ++-
 datasource/etcd/schema.go      |  94 ++++++++++++--
 datasource/etcd/schema_test.go | 281 +++++++++++++++++++++++++++++++++++++++++
 3 files changed, 378 insertions(+), 18 deletions(-)

diff --git a/datasource/etcd/ms.go b/datasource/etcd/ms.go
index ce5ecf4..220030c 100644
--- a/datasource/etcd/ms.go
+++ b/datasource/etcd/ms.go
@@ -1544,13 +1544,24 @@ func (ds *MetadataManager) DeleteSchema(ctx context.Context, request *pb.DeleteS
 		return nil, schema.ErrSchemaNotFound
 	}
 	epSummaryKey := path.GenerateServiceSchemaSummaryKey(domainProject, request.ServiceId, request.SchemaId)
-	resp, errDo := etcdadpt.TxnWithCmp(ctx,
-		etcdadpt.Ops(
-			etcdadpt.OpDel(etcdadpt.WithStrKey(epSummaryKey)),
-			etcdadpt.OpDel(etcdadpt.WithStrKey(key)),
-		),
+	opts := []etcdadpt.OpOptions{etcdadpt.OpDel(etcdadpt.WithStrKey(epSummaryKey)), etcdadpt.OpDel(etcdadpt.WithStrKey(key))}
+	schemaKeyOpt, err := esync.GenDeleteOpts(ctx, datasource.ResourceKV, key, key)
+	if err != nil {
+		log.Error("fail to create delete opts", err)
+		return nil, err
+	}
+	opts = append(opts, schemaKeyOpt...)
+	schemaSummaryKeyOpt, err := esync.GenDeleteOpts(ctx, datasource.ResourceKV, epSummaryKey, epSummaryKey)
+	if err != nil {
+		log.Error("fail to create delete opts", err)
+		return nil, err
+	}
+	opts = append(opts, schemaSummaryKeyOpt...)
+
+	resp, errDo := etcdadpt.TxnWithCmp(ctx, opts,
 		etcdadpt.If(etcdadpt.NotEqualVer(path.GenerateServiceKey(domainProject, request.ServiceId), 0)),
 		nil)
+
 	if errDo != nil {
 		log.Error(fmt.Sprintf("delete schema[%s/%s] failed, operator: %s",
 			request.ServiceId, request.SchemaId, remoteIP), errDo)
diff --git a/datasource/etcd/schema.go b/datasource/etcd/schema.go
index 892cb8b..2f16dcd 100644
--- a/datasource/etcd/schema.go
+++ b/datasource/etcd/schema.go
@@ -22,17 +22,19 @@ import (
 	"encoding/json"
 	"fmt"
 
+	mapset "github.com/deckarep/golang-set"
+	"github.com/go-chassis/cari/discovery"
+	"github.com/little-cui/etcdadpt"
+	"go.etcd.io/etcd/api/v3/mvccpb"
+
 	"github.com/apache/servicecomb-service-center/datasource"
 	"github.com/apache/servicecomb-service-center/datasource/etcd/path"
 	"github.com/apache/servicecomb-service-center/datasource/etcd/sd"
-	serviceUtil "github.com/apache/servicecomb-service-center/datasource/etcd/util"
+	"github.com/apache/servicecomb-service-center/datasource/etcd/sync"
+	eutil "github.com/apache/servicecomb-service-center/datasource/etcd/util"
 	"github.com/apache/servicecomb-service-center/datasource/schema"
 	"github.com/apache/servicecomb-service-center/pkg/log"
 	"github.com/apache/servicecomb-service-center/pkg/util"
-	mapset "github.com/deckarep/golang-set"
-	"github.com/go-chassis/cari/discovery"
-	"github.com/little-cui/etcdadpt"
-	"go.etcd.io/etcd/api/v3/mvccpb"
 )
 
 func init() {
@@ -55,7 +57,7 @@ func (dao *SchemaDAO) GetRef(ctx context.Context, refRequest *schema.RefRequest)
 	schemaID := refRequest.SchemaID
 
 	refKey := path.GenerateServiceSchemaRefKey(domainProject, serviceID, schemaID)
-	refResp, err := sd.SchemaRef().Search(ctx, serviceUtil.ContextOptions(ctx, etcdadpt.WithStrKey(refKey))...)
+	refResp, err := sd.SchemaRef().Search(ctx, eutil.ContextOptions(ctx, etcdadpt.WithStrKey(refKey))...)
 	if err != nil {
 		log.Error(fmt.Sprintf("get service[%s] schema-ref[%s] failed", serviceID, schemaID), err)
 		return nil, err
@@ -83,7 +85,7 @@ func (dao *SchemaDAO) GetRef(ctx context.Context, refRequest *schema.RefRequest)
 func getSummary(ctx context.Context, serviceID string, schemaID string) (string, error) {
 	domainProject := util.ParseDomainProject(ctx)
 	summaryKey := path.GenerateServiceSchemaSummaryKey(domainProject, serviceID, schemaID)
-	summaryResp, err := sd.SchemaSummary().Search(ctx, serviceUtil.ContextOptions(ctx, etcdadpt.WithStrKey(summaryKey))...)
+	summaryResp, err := sd.SchemaSummary().Search(ctx, eutil.ContextOptions(ctx, etcdadpt.WithStrKey(summaryKey))...)
 	if err != nil {
 		return "", err
 	}
@@ -101,7 +103,7 @@ func (dao *SchemaDAO) ListRef(ctx context.Context, refRequest *schema.RefRequest
 	serviceID := refRequest.ServiceID
 
 	refPrefixKey := path.GenerateServiceSchemaRefKey(domainProject, serviceID, "")
-	refResp, err := sd.SchemaRef().Search(ctx, serviceUtil.ContextOptions(ctx,
+	refResp, err := sd.SchemaRef().Search(ctx, eutil.ContextOptions(ctx,
 		etcdadpt.WithStrKey(refPrefixKey), etcdadpt.WithPrefix())...)
 	if err != nil {
 		log.Error(fmt.Sprintf("get service[%s] schema-refs failed", serviceID), err)
@@ -132,7 +134,7 @@ func (dao *SchemaDAO) ListRef(ctx context.Context, refRequest *schema.RefRequest
 func getSummaryMap(ctx context.Context, serviceID string) (map[string]string, error) {
 	domainProject := util.ParseDomainProject(ctx)
 	summaryPrefixKey := path.GenerateServiceSchemaSummaryKey(domainProject, serviceID, "")
-	summaryResp, err := sd.SchemaSummary().Search(ctx, serviceUtil.ContextOptions(ctx,
+	summaryResp, err := sd.SchemaSummary().Search(ctx, eutil.ContextOptions(ctx,
 		etcdadpt.WithStrKey(summaryPrefixKey), etcdadpt.WithPrefix())...)
 	if err != nil {
 		return nil, err
@@ -156,6 +158,19 @@ func (dao *SchemaDAO) DeleteRef(ctx context.Context, refRequest *schema.RefReque
 		etcdadpt.OpDel(etcdadpt.WithStrKey(refKey)),
 		etcdadpt.OpDel(etcdadpt.WithStrKey(summaryKey)),
 	}
+	refOpts, err := sync.GenDeleteOpts(ctx, datasource.ResourceKV, refKey, refKey)
+	if err != nil {
+		log.Error("fail to create delete opts", err)
+		return err
+	}
+	options = append(options, refOpts...)
+	summaryOpts, err := sync.GenDeleteOpts(ctx, datasource.ResourceKV, summaryKey, summaryKey)
+	if err != nil {
+		log.Error("fail to create delete opts", err)
+		return err
+	}
+	options = append(options, summaryOpts...)
+
 	cmp, err := etcdadpt.TxnWithCmp(ctx, options, etcdadpt.If(etcdadpt.ExistKey(refKey)), options)
 	if err != nil {
 		log.Error(fmt.Sprintf("delete service[%s] schema-ref[%s] failed", serviceID, schemaID), err)
@@ -213,6 +228,18 @@ func (dao *SchemaDAO) PutContent(ctx context.Context, contentRequest *schema.Put
 		etcdadpt.OpPut(etcdadpt.WithStrKey(refKey), etcdadpt.WithStrValue(content.Hash)),
 		etcdadpt.OpPut(etcdadpt.WithStrKey(summaryKey), etcdadpt.WithStrValue(content.Summary)),
 	}
+	refOpts, err := sync.GenUpdateOpts(ctx, datasource.ResourceKV, content.Hash, sync.WithOpts(map[string]string{"key": refKey}))
+	if err != nil {
+		log.Error("fail to create update opts", err)
+		return err
+	}
+	summaryOpts, err := sync.GenUpdateOpts(ctx, datasource.ResourceKV, content.Summary, sync.WithOpts(map[string]string{"key": summaryKey}))
+	if err != nil {
+		log.Error("fail to create update opts", err)
+		return err
+	}
+	existContentOptions = append(existContentOptions, refOpts...)
+	existContentOptions = append(existContentOptions, summaryOpts...)
 
 	// append the schemaID into service.Schemas if schemaID is new
 	if !util.SliceHave(service.Schemas, schemaID) {
@@ -230,6 +257,13 @@ func (dao *SchemaDAO) PutContent(ctx context.Context, contentRequest *schema.Put
 	newContentOptions := append(existContentOptions,
 		etcdadpt.OpPut(etcdadpt.WithStrKey(contentKey), etcdadpt.WithStrValue(content.Content)),
 	)
+	contentOpts, err := sync.GenUpdateOpts(ctx, datasource.ResourceKV, content.Content, sync.WithOpts(map[string]string{"key": contentKey}))
+	if err != nil {
+		log.Error("fail to create update opts", err)
+		return err
+	}
+	newContentOptions = append(newContentOptions, contentOpts...)
+
 	cmp, err := etcdadpt.TxnWithCmp(ctx, newContentOptions, etcdadpt.If(etcdadpt.NotExistKey(contentKey)), existContentOptions)
 	if err != nil {
 		log.Error(fmt.Sprintf("put kv[%s] failed", refKey), err)
@@ -261,7 +295,7 @@ func (dao *SchemaDAO) PutManyContent(ctx context.Context, contentRequest *schema
 	}
 
 	// unsafe!
-	schemaIDs, options := transformSchemaIDsAndOptions(domainProject, serviceID, service.Schemas, contentRequest)
+	schemaIDs, options := transformSchemaIDsAndOptions(ctx, domainProject, serviceID, service.Schemas, contentRequest)
 
 	// should update service.Schemas
 	service.Schemas = schemaIDs
@@ -272,10 +306,18 @@ func (dao *SchemaDAO) PutManyContent(ctx context.Context, contentRequest *schema
 	}
 	serviceKey := path.GenerateServiceKey(domainProject, serviceID)
 	options = append(options, etcdadpt.OpPut(etcdadpt.WithStrKey(serviceKey), etcdadpt.WithValue(body)))
+	// update service task
+	serviceOpts, err := sync.GenUpdateOpts(ctx, datasource.ResourceKV, body, sync.WithOpts(map[string]string{"key": serviceKey}))
+	if err != nil {
+		log.Error("fail to create update opts", err)
+	}
+	options = append(options, serviceOpts...)
+
 	return etcdadpt.Txn(ctx, options)
 }
 
-func transformSchemaIDsAndOptions(domainProject string, serviceID string, oldSchemaIDs []string, contentRequest *schema.PutManyContentRequest) ([]string, []etcdadpt.OpOptions) {
+func transformSchemaIDsAndOptions(ctx context.Context, domainProject string, serviceID string,
+	oldSchemaIDs []string, contentRequest *schema.PutManyContentRequest) ([]string, []etcdadpt.OpOptions) {
 	pendingDeleteSchemaIDs := mapset.NewSet()
 	for _, schemaID := range oldSchemaIDs {
 		pendingDeleteSchemaIDs.Add(schemaID)
@@ -293,6 +335,22 @@ func transformSchemaIDsAndOptions(domainProject string, serviceID string, oldSch
 			etcdadpt.OpPut(etcdadpt.WithStrKey(contentKey), etcdadpt.WithStrValue(content.Content)),
 			etcdadpt.OpPut(etcdadpt.WithStrKey(summaryKey), etcdadpt.WithStrValue(content.Summary)),
 		)
+		refOpts, err := sync.GenUpdateOpts(ctx, datasource.ResourceKV, content.Hash, sync.WithOpts(map[string]string{"key": refKey}))
+		if err != nil {
+			log.Error("fail to create update opts", err)
+		}
+		options = append(options, refOpts...)
+		contentOpts, err := sync.GenUpdateOpts(ctx, datasource.ResourceKV, content.Content, sync.WithOpts(map[string]string{"key": contentKey}))
+		if err != nil {
+			log.Error("fail to create update opts", err)
+		}
+		options = append(options, contentOpts...)
+		summaryOpts, err := sync.GenUpdateOpts(ctx, datasource.ResourceKV, content.Summary, sync.WithOpts(map[string]string{"key": summaryKey}))
+		if err != nil {
+			log.Error("fail to create update opts", err)
+		}
+		options = append(options, summaryOpts...)
+
 		schemaIDs = append(schemaIDs, schemaID)
 		pendingDeleteSchemaIDs.Remove(schemaID)
 	}
@@ -305,6 +363,16 @@ func transformSchemaIDsAndOptions(domainProject string, serviceID string, oldSch
 			etcdadpt.OpDel(etcdadpt.WithStrKey(refKey)),
 			etcdadpt.OpDel(etcdadpt.WithStrKey(summaryKey)),
 		)
+		refOpts, err := sync.GenDeleteOpts(ctx, datasource.ResourceKV, refKey, refKey)
+		if err != nil {
+			log.Error("fail to create update opts", err)
+		}
+		options = append(options, refOpts...)
+		summaryOpt, err := sync.GenDeleteOpts(ctx, datasource.ResourceKV, summaryKey, summaryKey)
+		if err != nil {
+			log.Error("fail to create update opts", err)
+		}
+		options = append(options, summaryOpt...)
 	}
 	return schemaIDs, options
 }
@@ -341,7 +409,7 @@ func (dao *SchemaDAO) DeleteContent(ctx context.Context, contentRequest *schema.
 func getContentHashMap(ctx context.Context) (map[string]struct{}, error) {
 	domainProject := util.ParseDomainProject(ctx)
 	refPrefixKey := path.GetServiceSchemaRefRootKey(domainProject) + path.SPLIT
-	refResp, err := sd.SchemaRef().Search(ctx, serviceUtil.ContextOptions(ctx,
+	refResp, err := sd.SchemaRef().Search(ctx, eutil.ContextOptions(ctx,
 		etcdadpt.WithStrKey(refPrefixKey), etcdadpt.WithPrefix())...)
 	if err != nil {
 		return nil, err
@@ -393,7 +461,7 @@ func filterNoRefContentHashes(ctx context.Context, kvs []*mvccpb.KeyValue) (maps
 	}
 
 	refPrefixKey := path.GetServiceSchemaRefRootKey("")
-	resp, err := sd.SchemaRef().Search(ctx, serviceUtil.ContextOptions(ctx,
+	resp, err := sd.SchemaRef().Search(ctx, eutil.ContextOptions(ctx,
 		etcdadpt.WithStrKey(refPrefixKey), etcdadpt.WithPrefix())...)
 	if err != nil {
 		return nil, err
diff --git a/datasource/etcd/schema_test.go b/datasource/etcd/schema_test.go
new file mode 100644
index 0000000..efc62e6
--- /dev/null
+++ b/datasource/etcd/schema_test.go
@@ -0,0 +1,281 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package etcd_test
+
+import (
+	"context"
+	"testing"
+
+	pb "github.com/go-chassis/cari/discovery"
+	csync "github.com/go-chassis/cari/sync"
+	"github.com/stretchr/testify/assert"
+
+	"github.com/apache/servicecomb-service-center/datasource"
+	"github.com/apache/servicecomb-service-center/datasource/schema"
+	"github.com/apache/servicecomb-service-center/eventbase/model"
+	"github.com/apache/servicecomb-service-center/eventbase/service/task"
+	"github.com/apache/servicecomb-service-center/eventbase/service/tombstone"
+	"github.com/apache/servicecomb-service-center/pkg/util"
+	_ "github.com/apache/servicecomb-service-center/test"
+)
+
+// schemaContext returns a new context built on context.Background():
+// util.SetDomainProject is applied with "sync-schema" as both arguments and
+// the result is then passed through util.WithNoCache.
+func schemaContext() context.Context {
+	ctx := util.SetDomainProject(context.Background(), "sync-schema", "sync-schema")
+	return util.WithNoCache(ctx)
+}
+
+// TestSyncSchema turns datasource.EnableSync on and drives one micro service
+// through register -> PutContent -> PutManyContent -> DeleteRef/DeleteSchema
+// -> unregister. After each step it lists the pending tasks (and, where
+// relevant, the tombstones) recorded for the "sync-schema" domain/project,
+// checks their count, and deletes them so the next step starts from zero.
+// datasource.EnableSync is set back to false at the end of the test body.
+func TestSyncSchema(t *testing.T) {
+
+	datasource.EnableSync = true
+	var serviceID string
+
+	// Clean up the three content hashes written below (hash_1 by PutContent,
+	// hash_2 and hash_3 by PutManyContent). Deferred calls run after the test
+	// body, i.e. after EnableSync has been reset to false.
+	defer schema.Instance().DeleteContent(schemaContext(), &schema.ContentRequest{
+		Hash: "hash_1",
+	})
+	defer schema.Instance().DeleteContent(schemaContext(), &schema.ContentRequest{
+		Hash: "hash_2",
+	})
+	defer schema.Instance().DeleteContent(schemaContext(), &schema.ContentRequest{
+		Hash: "hash_3",
+	})
+
+	t.Run("register a micro service", func(t *testing.T) {
+		t.Run("register a service will create a service task should pass", func(t *testing.T) {
+			resp, err := datasource.GetMetadataManager().RegisterService(schemaContext(), &pb.CreateServiceRequest{
+				Service: &pb.MicroService{
+					AppId:       "sync_schemas_prod",
+					ServiceName: "sync_schemas_service",
+					Version:     "1.0.1",
+					Level:       "FRONT",
+					Status:      pb.MS_UP,
+					Environment: pb.ENV_PROD,
+				},
+			})
+			assert.NoError(t, err)
+			// assert does not stop the test; bail out instead of dereferencing a nil response.
+			if !assert.NotNil(t, resp) {
+				return
+			}
+			assert.Equal(t, pb.ResponseSuccess, resp.Response.GetCode())
+			serviceID = resp.ServiceId
+			listTaskReq := model.ListTaskRequest{
+				Domain:       "sync-schema",
+				Project:      "sync-schema",
+				Action:       csync.CreateAction,
+				ResourceType: datasource.ResourceService,
+				Status:       csync.PendingStatus,
+			}
+			tasks, err := task.List(context.Background(), &listTaskReq)
+			assert.NoError(t, err)
+			assert.Equal(t, 1, len(tasks))
+			err = task.Delete(context.Background(), tasks...)
+			assert.NoError(t, err)
+		})
+	})
+
+	t.Run("put schema will execute the PutContent func", func(t *testing.T) {
+		t.Run("put content with valid request, will create 3 kv tasks(hash summary content) should pass", func(t *testing.T) {
+			err := schema.Instance().PutContent(schemaContext(), &schema.PutContentRequest{
+				ServiceID: serviceID,
+				SchemaID:  "schemaID_1",
+				Content: &schema.ContentItem{
+					Hash:    "hash_1",
+					Summary: "summary_1",
+					Content: "1111111111",
+				},
+			})
+			assert.NoError(t, err)
+
+			ref, err := schema.Instance().GetRef(schemaContext(), &schema.RefRequest{
+				ServiceID: serviceID,
+				SchemaID:  "schemaID_1",
+			})
+			assert.NoError(t, err)
+			if assert.NotNil(t, ref) {
+				assert.Equal(t, "summary_1", ref.Summary)
+				assert.Equal(t, "hash_1", ref.Hash)
+			}
+			listTaskReq := model.ListTaskRequest{
+				Domain:       "sync-schema",
+				Project:      "sync-schema",
+				Action:       csync.UpdateAction,
+				ResourceType: datasource.ResourceKV,
+				Status:       csync.PendingStatus,
+			}
+			tasks, err := task.List(context.Background(), &listTaskReq)
+			assert.NoError(t, err)
+			assert.Equal(t, 3, len(tasks))
+			err = task.Delete(context.Background(), tasks...)
+			assert.NoError(t, err)
+		})
+	})
+
+	t.Run("put schemas will execute the PutManyContent func", func(t *testing.T) {
+		t.Run("put many content with valid request, will create 7 kv update task (2 ref tasks, 2 content tasks, 2 summary tasks"+
+			" 1 service task), two delete kv task, two tombstones(ref and summary) should pass", func(t *testing.T) {
+			err := schema.Instance().PutManyContent(schemaContext(), &schema.PutManyContentRequest{
+				ServiceID: serviceID,
+				SchemaIDs: []string{"schemaID_2", "schemaID_3"},
+				Contents: []*schema.ContentItem{
+					{
+						Hash:    "hash_2",
+						Content: "content_2",
+						Summary: "summary_2",
+					},
+					{
+						Hash:    "hash_3",
+						Content: "content_3",
+						Summary: "summary_3",
+					},
+				},
+			})
+			assert.NoError(t, err)
+			ref, err := schema.Instance().GetRef(schemaContext(), &schema.RefRequest{
+				ServiceID: serviceID,
+				SchemaID:  "schemaID_2",
+			})
+			assert.NoError(t, err)
+			if assert.NotNil(t, ref) {
+				assert.Equal(t, "summary_2", ref.Summary)
+				assert.Equal(t, "hash_2", ref.Hash)
+			}
+			ref, err = schema.Instance().GetRef(schemaContext(), &schema.RefRequest{
+				ServiceID: serviceID,
+				SchemaID:  "schemaID_3",
+			})
+			assert.NoError(t, err)
+			if assert.NotNil(t, ref) {
+				assert.Equal(t, "summary_3", ref.Summary)
+				assert.Equal(t, "hash_3", ref.Hash)
+			}
+			listTaskReq := model.ListTaskRequest{
+				Domain:       "sync-schema",
+				Project:      "sync-schema",
+				Action:       csync.UpdateAction,
+				ResourceType: datasource.ResourceKV,
+				Status:       csync.PendingStatus,
+			}
+			tasks, err := task.List(context.Background(), &listTaskReq)
+			assert.NoError(t, err)
+			assert.Equal(t, 7, len(tasks))
+			err = task.Delete(context.Background(), tasks...)
+			assert.NoError(t, err)
+			listTaskReq = model.ListTaskRequest{
+				Domain:       "sync-schema",
+				Project:      "sync-schema",
+				Action:       csync.DeleteAction,
+				ResourceType: datasource.ResourceKV,
+				Status:       csync.PendingStatus,
+			}
+			tasks, err = task.List(context.Background(), &listTaskReq)
+			assert.NoError(t, err)
+			assert.Equal(t, 2, len(tasks))
+			err = task.Delete(context.Background(), tasks...)
+			assert.NoError(t, err)
+			tombstoneListReq := model.ListTombstoneRequest{
+				Domain:       "sync-schema",
+				Project:      "sync-schema",
+				ResourceType: datasource.ResourceKV,
+			}
+			tombstones, err := tombstone.List(context.Background(), &tombstoneListReq)
+			assert.NoError(t, err)
+			assert.Equal(t, 2, len(tombstones))
+			err = tombstone.Delete(context.Background(), tombstones...)
+			assert.NoError(t, err)
+		})
+	})
+
+	t.Run("delete schemas will execute the DeleteRef func and DeleteSchema func ", func(t *testing.T) {
+		t.Run("delete schemaID_2 and schemaID_3 will create 4 tasks(2 from DeleteRef, 2 from DeleteSchema ) "+
+			"and 4 tombstones (2 from DeleteRef, 2 from DeleteSchema) should pass", func(t *testing.T) {
+			err := schema.Instance().DeleteRef(schemaContext(), &schema.RefRequest{
+				ServiceID: serviceID,
+				SchemaID:  "schemaID_2",
+			})
+			assert.NoError(t, err)
+			// After DeleteRef, DeleteSchema for the same schema ID is expected to
+			// report schema.ErrSchemaNotFound.
+			_, err = datasource.GetMetadataManager().DeleteSchema(schemaContext(), &pb.DeleteSchemaRequest{
+				ServiceId: serviceID,
+				SchemaId:  "schemaID_2",
+			})
+			assert.Equal(t, schema.ErrSchemaNotFound, err)
+			err = schema.Instance().DeleteRef(schemaContext(), &schema.RefRequest{
+				ServiceID: serviceID,
+				SchemaID:  "schemaID_3",
+			})
+			assert.NoError(t, err)
+			_, err = datasource.GetMetadataManager().DeleteSchema(schemaContext(), &pb.DeleteSchemaRequest{
+				ServiceId: serviceID,
+				SchemaId:  "schemaID_3",
+			})
+			assert.Equal(t, schema.ErrSchemaNotFound, err)
+			listTaskReq := model.ListTaskRequest{
+				Domain:       "sync-schema",
+				Project:      "sync-schema",
+				Action:       csync.DeleteAction,
+				ResourceType: datasource.ResourceKV,
+				Status:       csync.PendingStatus,
+			}
+			tasks, err := task.List(context.Background(), &listTaskReq)
+			assert.NoError(t, err)
+			assert.Equal(t, 4, len(tasks))
+			err = task.Delete(context.Background(), tasks...)
+			assert.NoError(t, err)
+			tombstoneListReq := model.ListTombstoneRequest{
+				Domain:       "sync-schema",
+				Project:      "sync-schema",
+				ResourceType: datasource.ResourceKV,
+			}
+			tombstones, err := tombstone.List(context.Background(), &tombstoneListReq)
+			assert.NoError(t, err)
+			assert.Equal(t, 4, len(tombstones))
+			err = tombstone.Delete(context.Background(), tombstones...)
+			assert.NoError(t, err)
+		})
+	})
+
+	t.Run("unregister micro-service", func(t *testing.T) {
+		t.Run("unregister a micro service will create a task and a tombstone should pass", func(t *testing.T) {
+			resp, err := datasource.GetMetadataManager().UnregisterService(schemaContext(), &pb.DeleteServiceRequest{
+				ServiceId: serviceID,
+				Force:     true,
+			})
+			assert.NoError(t, err)
+			// assert does not stop the test; bail out instead of dereferencing a nil response.
+			if !assert.NotNil(t, resp) {
+				return
+			}
+			assert.Equal(t, pb.ResponseSuccess, resp.Response.GetCode())
+			listTaskReq := model.ListTaskRequest{
+				Domain:       "sync-schema",
+				Project:      "sync-schema",
+				ResourceType: datasource.ResourceService,
+				Action:       csync.DeleteAction,
+				Status:       csync.PendingStatus,
+			}
+			tasks, err := task.List(schemaContext(), &listTaskReq)
+			assert.NoError(t, err)
+			assert.Equal(t, 1, len(tasks))
+			err = task.Delete(context.Background(), tasks...)
+			assert.NoError(t, err)
+			tasks, err = task.List(schemaContext(), &listTaskReq)
+			assert.NoError(t, err)
+			assert.Equal(t, 0, len(tasks))
+			tombstoneListReq := model.ListTombstoneRequest{
+				Domain:       "sync-schema",
+				Project:      "sync-schema",
+				ResourceType: datasource.ResourceService,
+			}
+			tombstones, err := tombstone.List(schemaContext(), &tombstoneListReq)
+			assert.NoError(t, err)
+			assert.Equal(t, 1, len(tombstones))
+			err = tombstone.Delete(schemaContext(), tombstones...)
+			assert.NoError(t, err)
+		})
+	})
+
+	datasource.EnableSync = false
+}