You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicecomb.apache.org by ro...@apache.org on 2022/01/09 11:14:50 UTC
[servicecomb-service-center] branch master updated: [feat] add schema sync func and ut when db mode is etcd
This is an automated email from the ASF dual-hosted git repository.
robotljw pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/servicecomb-service-center.git
The following commit(s) were added to refs/heads/master by this push:
new dcfc2ae [feat] add schema sync func and ut when db mode is etcd
new f1d9781 Merge pull request #1209 from robotLJW/master
dcfc2ae is described below
commit dcfc2ae871e9525795c964e0101ca20f7b739b6e
Author: robotljw <79...@qq.com>
AuthorDate: Fri Jan 7 19:32:01 2022 +0800
[feat] add schema sync func and ut when db mode is etcd
---
datasource/etcd/ms.go | 21 ++-
datasource/etcd/schema.go | 94 ++++++++++++--
datasource/etcd/schema_test.go | 281 +++++++++++++++++++++++++++++++++++++++++
3 files changed, 378 insertions(+), 18 deletions(-)
diff --git a/datasource/etcd/ms.go b/datasource/etcd/ms.go
index ce5ecf4..220030c 100644
--- a/datasource/etcd/ms.go
+++ b/datasource/etcd/ms.go
@@ -1544,13 +1544,24 @@ func (ds *MetadataManager) DeleteSchema(ctx context.Context, request *pb.DeleteS
return nil, schema.ErrSchemaNotFound
}
epSummaryKey := path.GenerateServiceSchemaSummaryKey(domainProject, request.ServiceId, request.SchemaId)
- resp, errDo := etcdadpt.TxnWithCmp(ctx,
- etcdadpt.Ops(
- etcdadpt.OpDel(etcdadpt.WithStrKey(epSummaryKey)),
- etcdadpt.OpDel(etcdadpt.WithStrKey(key)),
- ),
+ opts := []etcdadpt.OpOptions{etcdadpt.OpDel(etcdadpt.WithStrKey(epSummaryKey)), etcdadpt.OpDel(etcdadpt.WithStrKey(key))}
+ schemaKeyOpt, err := esync.GenDeleteOpts(ctx, datasource.ResourceKV, key, key)
+ if err != nil {
+ log.Error("fail to create delete opts", err)
+ return nil, err
+ }
+ opts = append(opts, schemaKeyOpt...)
+ schemaSummaryKeyOpt, err := esync.GenDeleteOpts(ctx, datasource.ResourceKV, epSummaryKey, epSummaryKey)
+ if err != nil {
+ log.Error("fail to create delete opts", err)
+ return nil, err
+ }
+ opts = append(opts, schemaSummaryKeyOpt...)
+
+ resp, errDo := etcdadpt.TxnWithCmp(ctx, opts,
etcdadpt.If(etcdadpt.NotEqualVer(path.GenerateServiceKey(domainProject, request.ServiceId), 0)),
nil)
+
if errDo != nil {
log.Error(fmt.Sprintf("delete schema[%s/%s] failed, operator: %s",
request.ServiceId, request.SchemaId, remoteIP), errDo)
diff --git a/datasource/etcd/schema.go b/datasource/etcd/schema.go
index 892cb8b..2f16dcd 100644
--- a/datasource/etcd/schema.go
+++ b/datasource/etcd/schema.go
@@ -22,17 +22,19 @@ import (
"encoding/json"
"fmt"
+ mapset "github.com/deckarep/golang-set"
+ "github.com/go-chassis/cari/discovery"
+ "github.com/little-cui/etcdadpt"
+ "go.etcd.io/etcd/api/v3/mvccpb"
+
"github.com/apache/servicecomb-service-center/datasource"
"github.com/apache/servicecomb-service-center/datasource/etcd/path"
"github.com/apache/servicecomb-service-center/datasource/etcd/sd"
- serviceUtil "github.com/apache/servicecomb-service-center/datasource/etcd/util"
+ "github.com/apache/servicecomb-service-center/datasource/etcd/sync"
+ eutil "github.com/apache/servicecomb-service-center/datasource/etcd/util"
"github.com/apache/servicecomb-service-center/datasource/schema"
"github.com/apache/servicecomb-service-center/pkg/log"
"github.com/apache/servicecomb-service-center/pkg/util"
- mapset "github.com/deckarep/golang-set"
- "github.com/go-chassis/cari/discovery"
- "github.com/little-cui/etcdadpt"
- "go.etcd.io/etcd/api/v3/mvccpb"
)
func init() {
@@ -55,7 +57,7 @@ func (dao *SchemaDAO) GetRef(ctx context.Context, refRequest *schema.RefRequest)
schemaID := refRequest.SchemaID
refKey := path.GenerateServiceSchemaRefKey(domainProject, serviceID, schemaID)
- refResp, err := sd.SchemaRef().Search(ctx, serviceUtil.ContextOptions(ctx, etcdadpt.WithStrKey(refKey))...)
+ refResp, err := sd.SchemaRef().Search(ctx, eutil.ContextOptions(ctx, etcdadpt.WithStrKey(refKey))...)
if err != nil {
log.Error(fmt.Sprintf("get service[%s] schema-ref[%s] failed", serviceID, schemaID), err)
return nil, err
@@ -83,7 +85,7 @@ func (dao *SchemaDAO) GetRef(ctx context.Context, refRequest *schema.RefRequest)
func getSummary(ctx context.Context, serviceID string, schemaID string) (string, error) {
domainProject := util.ParseDomainProject(ctx)
summaryKey := path.GenerateServiceSchemaSummaryKey(domainProject, serviceID, schemaID)
- summaryResp, err := sd.SchemaSummary().Search(ctx, serviceUtil.ContextOptions(ctx, etcdadpt.WithStrKey(summaryKey))...)
+ summaryResp, err := sd.SchemaSummary().Search(ctx, eutil.ContextOptions(ctx, etcdadpt.WithStrKey(summaryKey))...)
if err != nil {
return "", err
}
@@ -101,7 +103,7 @@ func (dao *SchemaDAO) ListRef(ctx context.Context, refRequest *schema.RefRequest
serviceID := refRequest.ServiceID
refPrefixKey := path.GenerateServiceSchemaRefKey(domainProject, serviceID, "")
- refResp, err := sd.SchemaRef().Search(ctx, serviceUtil.ContextOptions(ctx,
+ refResp, err := sd.SchemaRef().Search(ctx, eutil.ContextOptions(ctx,
etcdadpt.WithStrKey(refPrefixKey), etcdadpt.WithPrefix())...)
if err != nil {
log.Error(fmt.Sprintf("get service[%s] schema-refs failed", serviceID), err)
@@ -132,7 +134,7 @@ func (dao *SchemaDAO) ListRef(ctx context.Context, refRequest *schema.RefRequest
func getSummaryMap(ctx context.Context, serviceID string) (map[string]string, error) {
domainProject := util.ParseDomainProject(ctx)
summaryPrefixKey := path.GenerateServiceSchemaSummaryKey(domainProject, serviceID, "")
- summaryResp, err := sd.SchemaSummary().Search(ctx, serviceUtil.ContextOptions(ctx,
+ summaryResp, err := sd.SchemaSummary().Search(ctx, eutil.ContextOptions(ctx,
etcdadpt.WithStrKey(summaryPrefixKey), etcdadpt.WithPrefix())...)
if err != nil {
return nil, err
@@ -156,6 +158,19 @@ func (dao *SchemaDAO) DeleteRef(ctx context.Context, refRequest *schema.RefReque
etcdadpt.OpDel(etcdadpt.WithStrKey(refKey)),
etcdadpt.OpDel(etcdadpt.WithStrKey(summaryKey)),
}
+ refOpts, err := sync.GenDeleteOpts(ctx, datasource.ResourceKV, refKey, refKey)
+ if err != nil {
+ log.Error("fail to create delete opts", err)
+ return err
+ }
+ options = append(options, refOpts...)
+ summaryOpts, err := sync.GenDeleteOpts(ctx, datasource.ResourceKV, summaryKey, summaryKey)
+ if err != nil {
+ log.Error("fail to create delete opts", err)
+ return err
+ }
+ options = append(options, summaryOpts...)
+
cmp, err := etcdadpt.TxnWithCmp(ctx, options, etcdadpt.If(etcdadpt.ExistKey(refKey)), options)
if err != nil {
log.Error(fmt.Sprintf("delete service[%s] schema-ref[%s] failed", serviceID, schemaID), err)
@@ -213,6 +228,18 @@ func (dao *SchemaDAO) PutContent(ctx context.Context, contentRequest *schema.Put
etcdadpt.OpPut(etcdadpt.WithStrKey(refKey), etcdadpt.WithStrValue(content.Hash)),
etcdadpt.OpPut(etcdadpt.WithStrKey(summaryKey), etcdadpt.WithStrValue(content.Summary)),
}
+ refOpts, err := sync.GenUpdateOpts(ctx, datasource.ResourceKV, content.Hash, sync.WithOpts(map[string]string{"key": refKey}))
+ if err != nil {
+ log.Error("fail to create update opts", err)
+ return err
+ }
+ summaryOpts, err := sync.GenUpdateOpts(ctx, datasource.ResourceKV, content.Summary, sync.WithOpts(map[string]string{"key": summaryKey}))
+ if err != nil {
+ log.Error("fail to create update opts", err)
+ return err
+ }
+ existContentOptions = append(existContentOptions, refOpts...)
+ existContentOptions = append(existContentOptions, summaryOpts...)
// append the schemaID into service.Schemas if schemaID is new
if !util.SliceHave(service.Schemas, schemaID) {
@@ -230,6 +257,13 @@ func (dao *SchemaDAO) PutContent(ctx context.Context, contentRequest *schema.Put
newContentOptions := append(existContentOptions,
etcdadpt.OpPut(etcdadpt.WithStrKey(contentKey), etcdadpt.WithStrValue(content.Content)),
)
+ contentOpts, err := sync.GenUpdateOpts(ctx, datasource.ResourceKV, content.Content, sync.WithOpts(map[string]string{"key": contentKey}))
+ if err != nil {
+ log.Error("fail to create update opts", err)
+ return err
+ }
+ newContentOptions = append(newContentOptions, contentOpts...)
+
cmp, err := etcdadpt.TxnWithCmp(ctx, newContentOptions, etcdadpt.If(etcdadpt.NotExistKey(contentKey)), existContentOptions)
if err != nil {
log.Error(fmt.Sprintf("put kv[%s] failed", refKey), err)
@@ -261,7 +295,7 @@ func (dao *SchemaDAO) PutManyContent(ctx context.Context, contentRequest *schema
}
// unsafe!
- schemaIDs, options := transformSchemaIDsAndOptions(domainProject, serviceID, service.Schemas, contentRequest)
+ schemaIDs, options := transformSchemaIDsAndOptions(ctx, domainProject, serviceID, service.Schemas, contentRequest)
// should update service.Schemas
service.Schemas = schemaIDs
@@ -272,10 +306,18 @@ func (dao *SchemaDAO) PutManyContent(ctx context.Context, contentRequest *schema
}
serviceKey := path.GenerateServiceKey(domainProject, serviceID)
options = append(options, etcdadpt.OpPut(etcdadpt.WithStrKey(serviceKey), etcdadpt.WithValue(body)))
+ // update service task
+ serviceOpts, err := sync.GenUpdateOpts(ctx, datasource.ResourceKV, body, sync.WithOpts(map[string]string{"key": serviceKey}))
+ if err != nil {
+ log.Error("fail to create update opts", err)
+ }
+ options = append(options, serviceOpts...)
+
return etcdadpt.Txn(ctx, options)
}
-func transformSchemaIDsAndOptions(domainProject string, serviceID string, oldSchemaIDs []string, contentRequest *schema.PutManyContentRequest) ([]string, []etcdadpt.OpOptions) {
+func transformSchemaIDsAndOptions(ctx context.Context, domainProject string, serviceID string,
+ oldSchemaIDs []string, contentRequest *schema.PutManyContentRequest) ([]string, []etcdadpt.OpOptions) {
pendingDeleteSchemaIDs := mapset.NewSet()
for _, schemaID := range oldSchemaIDs {
pendingDeleteSchemaIDs.Add(schemaID)
@@ -293,6 +335,22 @@ func transformSchemaIDsAndOptions(domainProject string, serviceID string, oldSch
etcdadpt.OpPut(etcdadpt.WithStrKey(contentKey), etcdadpt.WithStrValue(content.Content)),
etcdadpt.OpPut(etcdadpt.WithStrKey(summaryKey), etcdadpt.WithStrValue(content.Summary)),
)
+ refOpts, err := sync.GenUpdateOpts(ctx, datasource.ResourceKV, content.Hash, sync.WithOpts(map[string]string{"key": refKey}))
+ if err != nil {
+ log.Error("fail to create update opts", err)
+ }
+ options = append(options, refOpts...)
+ contentOpts, err := sync.GenUpdateOpts(ctx, datasource.ResourceKV, content.Content, sync.WithOpts(map[string]string{"key": contentKey}))
+ if err != nil {
+ log.Error("fail to create update opts", err)
+ }
+ options = append(options, contentOpts...)
+ summaryOpts, err := sync.GenUpdateOpts(ctx, datasource.ResourceKV, content.Summary, sync.WithOpts(map[string]string{"key": summaryKey}))
+ if err != nil {
+ log.Error("fail to create update opts", err)
+ }
+ options = append(options, summaryOpts...)
+
schemaIDs = append(schemaIDs, schemaID)
pendingDeleteSchemaIDs.Remove(schemaID)
}
@@ -305,6 +363,16 @@ func transformSchemaIDsAndOptions(domainProject string, serviceID string, oldSch
etcdadpt.OpDel(etcdadpt.WithStrKey(refKey)),
etcdadpt.OpDel(etcdadpt.WithStrKey(summaryKey)),
)
+ refOpts, err := sync.GenDeleteOpts(ctx, datasource.ResourceKV, refKey, refKey)
+ if err != nil {
+ log.Error("fail to create update opts", err)
+ }
+ options = append(options, refOpts...)
+ summaryOpt, err := sync.GenDeleteOpts(ctx, datasource.ResourceKV, summaryKey, summaryKey)
+ if err != nil {
+ log.Error("fail to create update opts", err)
+ }
+ options = append(options, summaryOpt...)
}
return schemaIDs, options
}
@@ -341,7 +409,7 @@ func (dao *SchemaDAO) DeleteContent(ctx context.Context, contentRequest *schema.
func getContentHashMap(ctx context.Context) (map[string]struct{}, error) {
domainProject := util.ParseDomainProject(ctx)
refPrefixKey := path.GetServiceSchemaRefRootKey(domainProject) + path.SPLIT
- refResp, err := sd.SchemaRef().Search(ctx, serviceUtil.ContextOptions(ctx,
+ refResp, err := sd.SchemaRef().Search(ctx, eutil.ContextOptions(ctx,
etcdadpt.WithStrKey(refPrefixKey), etcdadpt.WithPrefix())...)
if err != nil {
return nil, err
@@ -393,7 +461,7 @@ func filterNoRefContentHashes(ctx context.Context, kvs []*mvccpb.KeyValue) (maps
}
refPrefixKey := path.GetServiceSchemaRefRootKey("")
- resp, err := sd.SchemaRef().Search(ctx, serviceUtil.ContextOptions(ctx,
+ resp, err := sd.SchemaRef().Search(ctx, eutil.ContextOptions(ctx,
etcdadpt.WithStrKey(refPrefixKey), etcdadpt.WithPrefix())...)
if err != nil {
return nil, err
diff --git a/datasource/etcd/schema_test.go b/datasource/etcd/schema_test.go
new file mode 100644
index 0000000..efc62e6
--- /dev/null
+++ b/datasource/etcd/schema_test.go
@@ -0,0 +1,281 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package etcd_test
+
+import (
+ "context"
+ "testing"
+
+ pb "github.com/go-chassis/cari/discovery"
+ csync "github.com/go-chassis/cari/sync"
+ "github.com/stretchr/testify/assert"
+
+ "github.com/apache/servicecomb-service-center/datasource"
+ "github.com/apache/servicecomb-service-center/datasource/schema"
+ "github.com/apache/servicecomb-service-center/eventbase/model"
+ "github.com/apache/servicecomb-service-center/eventbase/service/task"
+ "github.com/apache/servicecomb-service-center/eventbase/service/tombstone"
+ "github.com/apache/servicecomb-service-center/pkg/util"
+ _ "github.com/apache/servicecomb-service-center/test"
+)
+
+// schemaContext returns the context shared by every call in this test file.
+// It starts from context.Background, tags it with the "sync-schema" domain and
+// "sync-schema" project via util.SetDomainProject, and then passes the result
+// through util.WithNoCache.
+func schemaContext() context.Context {
+	scoped := util.SetDomainProject(context.Background(), "sync-schema", "sync-schema")
+	return util.WithNoCache(scoped)
+}
+
+// TestSyncSchema walks a micro service through register -> PutContent ->
+// PutManyContent -> DeleteRef/DeleteSchema -> unregister with
+// datasource.EnableSync switched on. After each step it lists the pending
+// sync tasks (and, for deletions, the tombstones) recorded for the
+// "sync-schema" domain/project, asserts the expected count, and deletes them
+// so the next step starts from a clean slate.
+func TestSyncSchema(t *testing.T) {
+	datasource.EnableSync = true
+	var serviceID string
+
+	// Clean up every content hash written by this test: hash_1 comes from
+	// PutContent, hash_2 and hash_3 come from PutManyContent.
+	defer schema.Instance().DeleteContent(schemaContext(), &schema.ContentRequest{
+		Hash: "hash_1",
+	})
+	defer schema.Instance().DeleteContent(schemaContext(), &schema.ContentRequest{
+		Hash: "hash_2",
+	})
+	defer schema.Instance().DeleteContent(schemaContext(), &schema.ContentRequest{
+		Hash: "hash_3",
+	})
+	// Registered last so that it runs first (defers are LIFO): the flag is
+	// switched off before the DeleteContent cleanups above execute, and it is
+	// restored even if the test function exits early.
+	defer func() { datasource.EnableSync = false }()
+
+	t.Run("register a micro service", func(t *testing.T) {
+		t.Run("register a service will create a service task should pass", func(t *testing.T) {
+			resp, err := datasource.GetMetadataManager().RegisterService(schemaContext(), &pb.CreateServiceRequest{
+				Service: &pb.MicroService{
+					AppId:       "sync_schemas_prod",
+					ServiceName: "sync_schemas_service",
+					Version:     "1.0.1",
+					Level:       "FRONT",
+					Status:      pb.MS_UP,
+					Environment: pb.ENV_PROD,
+				},
+			})
+			assert.NoError(t, err)
+			assert.Equal(t, pb.ResponseSuccess, resp.Response.GetCode())
+			// Remember the generated ID; every later subtest operates on it.
+			serviceID = resp.ServiceId
+			listTaskReq := model.ListTaskRequest{
+				Domain:       "sync-schema",
+				Project:      "sync-schema",
+				Action:       csync.CreateAction,
+				ResourceType: datasource.ResourceService,
+				Status:       csync.PendingStatus,
+			}
+			tasks, err := task.List(context.Background(), &listTaskReq)
+			assert.NoError(t, err)
+			assert.Equal(t, 1, len(tasks))
+			err = task.Delete(context.Background(), tasks...)
+			assert.NoError(t, err)
+		})
+	})
+
+	t.Run("put schema will execute the PutContent func", func(t *testing.T) {
+		t.Run("put content with valid request, will create 3 kv tasks(hash summary content) should pass", func(t *testing.T) {
+			err := schema.Instance().PutContent(schemaContext(), &schema.PutContentRequest{
+				ServiceID: serviceID,
+				SchemaID:  "schemaID_1",
+				Content: &schema.ContentItem{
+					Hash:    "hash_1",
+					Summary: "summary_1",
+					Content: "1111111111",
+				},
+			})
+			assert.NoError(t, err)
+
+			// Read the ref back to confirm the write itself succeeded.
+			ref, err := schema.Instance().GetRef(schemaContext(), &schema.RefRequest{
+				ServiceID: serviceID,
+				SchemaID:  "schemaID_1",
+			})
+			assert.NoError(t, err)
+			assert.NotNil(t, ref)
+			assert.Equal(t, "summary_1", ref.Summary)
+			assert.Equal(t, "hash_1", ref.Hash)
+			listTaskReq := model.ListTaskRequest{
+				Domain:       "sync-schema",
+				Project:      "sync-schema",
+				Action:       csync.UpdateAction,
+				ResourceType: datasource.ResourceKV,
+				Status:       csync.PendingStatus,
+			}
+			tasks, err := task.List(context.Background(), &listTaskReq)
+			assert.NoError(t, err)
+			assert.Equal(t, 3, len(tasks))
+			err = task.Delete(context.Background(), tasks...)
+			assert.NoError(t, err)
+		})
+	})
+
+	t.Run("put schemas will execute the PutManyContent func", func(t *testing.T) {
+		t.Run("put many content with valid request, will create 7 kv update task (2 ref tasks, 2 content tasks, 2 summary tasks"+
+			" 1 service task), two delete kv task, two tombstones(ref and summary) should pass", func(t *testing.T) {
+			err := schema.Instance().PutManyContent(schemaContext(), &schema.PutManyContentRequest{
+				ServiceID: serviceID,
+				SchemaIDs: []string{"schemaID_2", "schemaID_3"},
+				Contents: []*schema.ContentItem{
+					{
+						Hash:    "hash_2",
+						Content: "content_2",
+						Summary: "summary_2",
+					},
+					{
+						Hash:    "hash_3",
+						Content: "content_3",
+						Summary: "summary_3",
+					},
+				},
+			})
+			assert.NoError(t, err)
+			ref, err := schema.Instance().GetRef(schemaContext(), &schema.RefRequest{
+				ServiceID: serviceID,
+				SchemaID:  "schemaID_2",
+			})
+			assert.NoError(t, err)
+			assert.NotNil(t, ref)
+			assert.Equal(t, "summary_2", ref.Summary)
+			assert.Equal(t, "hash_2", ref.Hash)
+			ref, err = schema.Instance().GetRef(schemaContext(), &schema.RefRequest{
+				ServiceID: serviceID,
+				SchemaID:  "schemaID_3",
+			})
+			assert.NoError(t, err)
+			assert.NotNil(t, ref)
+			assert.Equal(t, "summary_3", ref.Summary)
+			assert.Equal(t, "hash_3", ref.Hash)
+
+			// Update tasks produced by the batch put.
+			listTaskReq := model.ListTaskRequest{
+				Domain:       "sync-schema",
+				Project:      "sync-schema",
+				Action:       csync.UpdateAction,
+				ResourceType: datasource.ResourceKV,
+				Status:       csync.PendingStatus,
+			}
+			tasks, err := task.List(context.Background(), &listTaskReq)
+			assert.NoError(t, err)
+			assert.Equal(t, 7, len(tasks))
+			err = task.Delete(context.Background(), tasks...)
+			assert.NoError(t, err)
+
+			// Delete tasks: schemaID_1 is not part of the new schema list.
+			listTaskReq = model.ListTaskRequest{
+				Domain:       "sync-schema",
+				Project:      "sync-schema",
+				Action:       csync.DeleteAction,
+				ResourceType: datasource.ResourceKV,
+				Status:       csync.PendingStatus,
+			}
+			tasks, err = task.List(context.Background(), &listTaskReq)
+			assert.NoError(t, err)
+			assert.Equal(t, 2, len(tasks))
+			err = task.Delete(context.Background(), tasks...)
+			assert.NoError(t, err)
+			tombstoneListReq := model.ListTombstoneRequest{
+				Domain:       "sync-schema",
+				Project:      "sync-schema",
+				ResourceType: datasource.ResourceKV,
+			}
+			tombstones, err := tombstone.List(context.Background(), &tombstoneListReq)
+			assert.NoError(t, err)
+			assert.Equal(t, 2, len(tombstones))
+			err = tombstone.Delete(context.Background(), tombstones...)
+			assert.NoError(t, err)
+		})
+	})
+
+	t.Run("delete schemas will execute the DeleteRef func and DeleteSchema func ", func(t *testing.T) {
+		t.Run("delete schemaID_2 and schemaID_3 will create 4 tasks(2 from DeleteRef, 2 from DeleteSchema ) "+
+			"and 4 tombstones (2 from DeleteRef, 2 from DeleteSchema) should pass", func(t *testing.T) {
+			err := schema.Instance().DeleteRef(schemaContext(), &schema.RefRequest{
+				ServiceID: serviceID,
+				SchemaID:  "schemaID_2",
+			})
+			assert.NoError(t, err)
+			// The test expects ErrSchemaNotFound from DeleteSchema after the
+			// DeleteRef above; presumably the schema is already gone by then.
+			_, err = datasource.GetMetadataManager().DeleteSchema(schemaContext(), &pb.DeleteSchemaRequest{
+				ServiceId: serviceID,
+				SchemaId:  "schemaID_2",
+			})
+			assert.Equal(t, schema.ErrSchemaNotFound, err)
+			err = schema.Instance().DeleteRef(schemaContext(), &schema.RefRequest{
+				ServiceID: serviceID,
+				SchemaID:  "schemaID_3",
+			})
+			assert.NoError(t, err)
+			_, err = datasource.GetMetadataManager().DeleteSchema(schemaContext(), &pb.DeleteSchemaRequest{
+				ServiceId: serviceID,
+				SchemaId:  "schemaID_3",
+			})
+			assert.Equal(t, schema.ErrSchemaNotFound, err)
+			listTaskReq := model.ListTaskRequest{
+				Domain:       "sync-schema",
+				Project:      "sync-schema",
+				Action:       csync.DeleteAction,
+				ResourceType: datasource.ResourceKV,
+				Status:       csync.PendingStatus,
+			}
+			tasks, err := task.List(context.Background(), &listTaskReq)
+			assert.NoError(t, err)
+			assert.Equal(t, 4, len(tasks))
+			err = task.Delete(context.Background(), tasks...)
+			assert.NoError(t, err)
+			tombstoneListReq := model.ListTombstoneRequest{
+				Domain:       "sync-schema",
+				Project:      "sync-schema",
+				ResourceType: datasource.ResourceKV,
+			}
+			tombstones, err := tombstone.List(context.Background(), &tombstoneListReq)
+			assert.NoError(t, err)
+			assert.Equal(t, 4, len(tombstones))
+			err = tombstone.Delete(context.Background(), tombstones...)
+			assert.NoError(t, err)
+		})
+	})
+
+	t.Run("unregister micro-service", func(t *testing.T) {
+		t.Run("unregister a micro service will create a task and a tombstone should pass", func(t *testing.T) {
+			resp, err := datasource.GetMetadataManager().UnregisterService(schemaContext(), &pb.DeleteServiceRequest{
+				ServiceId: serviceID,
+				Force:     true,
+			})
+			assert.NotNil(t, resp)
+			assert.NoError(t, err)
+			assert.Equal(t, pb.ResponseSuccess, resp.Response.GetCode())
+			listTaskReq := model.ListTaskRequest{
+				Domain:       "sync-schema",
+				Project:      "sync-schema",
+				ResourceType: datasource.ResourceService,
+				Action:       csync.DeleteAction,
+				Status:       csync.PendingStatus,
+			}
+			tasks, err := task.List(schemaContext(), &listTaskReq)
+			assert.NoError(t, err)
+			assert.Equal(t, 1, len(tasks))
+			err = task.Delete(context.Background(), tasks...)
+			assert.NoError(t, err)
+			// List again to confirm the task was really removed.
+			tasks, err = task.List(schemaContext(), &listTaskReq)
+			assert.NoError(t, err)
+			assert.Equal(t, 0, len(tasks))
+			tombstoneListReq := model.ListTombstoneRequest{
+				Domain:       "sync-schema",
+				Project:      "sync-schema",
+				ResourceType: datasource.ResourceService,
+			}
+			tombstones, err := tombstone.List(schemaContext(), &tombstoneListReq)
+			assert.NoError(t, err)
+			assert.Equal(t, 1, len(tombstones))
+			err = tombstone.Delete(schemaContext(), tombstones...)
+			assert.NoError(t, err)
+		})
+	})
+}