You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicecomb.apache.org by ti...@apache.org on 2022/01/07 04:48:01 UTC

[servicecomb-service-center] branch master updated: [feat] add tag sync func and ut when db mode is etcd (#1204)

This is an automated email from the ASF dual-hosted git repository.

tianxiaoliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/servicecomb-service-center.git


The following commit(s) were added to refs/heads/master by this push:
     new 738f155  [feat] add tag sync func and ut when db mode is etcd (#1204)
738f155 is described below

commit 738f155d006d5e47e4e100257e7b3a8600a3ce5f
Author: robotljw <79...@qq.com>
AuthorDate: Fri Jan 7 12:47:54 2022 +0800

    [feat] add tag sync func and ut when db mode is etcd (#1204)
---
 datasource/common.go              |   1 +
 datasource/etcd/account.go        |  65 +++++-------
 datasource/etcd/account_test.go   |   8 +-
 datasource/etcd/dep.go            |  38 +++----
 datasource/etcd/ms.go             | 194 ++++++++++++++++------------------
 datasource/etcd/role.go           |  59 +++++------
 datasource/etcd/role_test.go      |   8 +-
 datasource/etcd/sync/sync.go      | 126 ++++++++++++++++++++++
 datasource/etcd/sync/sync_test.go |  58 +++++++++++
 datasource/etcd/tag_test.go       | 214 ++++++++++++++++++++++++++++++++++++++
 datasource/etcd/task_util.go      |  41 --------
 datasource/etcd/tombstone_util.go |  38 -------
 datasource/etcd/util/tag_util.go  |  21 ++--
 13 files changed, 581 insertions(+), 290 deletions(-)

diff --git a/datasource/common.go b/datasource/common.go
index a3b9453..5149fb2 100644
--- a/datasource/common.go
+++ b/datasource/common.go
@@ -36,6 +36,7 @@ const (
 	ResourceRole       = "role"
 	ResourceDependency = "dependency"
 	ResourceService    = "service"
+	ResourceKV         = "kv"
 )
 
 // WrapErrResponse is temp func here to wait finish to refact the discosvc pkg
diff --git a/datasource/etcd/account.go b/datasource/etcd/account.go
index d94352e..c0aded7 100644
--- a/datasource/etcd/account.go
+++ b/datasource/etcd/account.go
@@ -22,13 +22,13 @@ import (
 	"strconv"
 	"time"
 
-	rbacmodel "github.com/go-chassis/cari/rbac"
-	"github.com/go-chassis/cari/sync"
+	crbac "github.com/go-chassis/cari/rbac"
 	"github.com/go-chassis/foundation/stringutil"
 	"github.com/little-cui/etcdadpt"
 
 	"github.com/apache/servicecomb-service-center/datasource"
 	"github.com/apache/servicecomb-service-center/datasource/etcd/path"
+	esync "github.com/apache/servicecomb-service-center/datasource/etcd/sync"
 	"github.com/apache/servicecomb-service-center/datasource/rbac"
 	"github.com/apache/servicecomb-service-center/pkg/etcdsync"
 	"github.com/apache/servicecomb-service-center/pkg/log"
@@ -49,7 +49,7 @@ func NewRbacDAO(opts rbac.Options) (rbac.DAO, error) {
 type RbacDAO struct {
 }
 
-func (ds *RbacDAO) CreateAccount(ctx context.Context, a *rbacmodel.Account) error {
+func (ds *RbacDAO) CreateAccount(ctx context.Context, a *crbac.Account) error {
 	lock, err := etcdsync.Lock("/account-creating/"+a.Name, -1, false)
 	if err != nil {
 		return fmt.Errorf("account %s is creating", a.Name)
@@ -83,14 +83,12 @@ func (ds *RbacDAO) CreateAccount(ctx context.Context, a *rbacmodel.Account) erro
 		log.Error("", err)
 		return err
 	}
-	if datasource.EnableSync {
-		op, err := GenTaskOpts("", "", sync.CreateAction, datasource.ResourceAccount, a)
-		if err != nil {
-			log.Error("", err)
-			return err
-		}
-		opts = append(opts, op)
+	syncOpts, err := esync.GenCreateOpts(ctx, datasource.ResourceAccount, a)
+	if err != nil {
+		log.Error("fail to create sync opts", err)
+		return err
 	}
+	opts = append(opts, syncOpts...)
 	err = etcdadpt.Txn(ctx, opts)
 	if err != nil {
 		log.Error("can not save account info", err)
@@ -100,7 +98,7 @@ func (ds *RbacDAO) CreateAccount(ctx context.Context, a *rbacmodel.Account) erro
 	return nil
 }
 
-func GenAccountOpts(a *rbacmodel.Account, action etcdadpt.Action) ([]etcdadpt.OpOptions, error) {
+func GenAccountOpts(a *crbac.Account, action etcdadpt.Action) ([]etcdadpt.OpOptions, error) {
 	opts := make([]etcdadpt.OpOptions, 0)
 	value, err := json.Marshal(a)
 	if err != nil {
@@ -126,7 +124,7 @@ func GenAccountOpts(a *rbacmodel.Account, action etcdadpt.Action) ([]etcdadpt.Op
 func (ds *RbacDAO) AccountExist(ctx context.Context, name string) (bool, error) {
 	return etcdadpt.Exist(ctx, path.GenerateRBACAccountKey(name))
 }
-func (ds *RbacDAO) GetAccount(ctx context.Context, name string) (*rbacmodel.Account, error) {
+func (ds *RbacDAO) GetAccount(ctx context.Context, name string) (*crbac.Account, error) {
 	kv, err := etcdadpt.Get(ctx, path.GenerateRBACAccountKey(name))
 	if err != nil {
 		return nil, err
@@ -134,7 +132,7 @@ func (ds *RbacDAO) GetAccount(ctx context.Context, name string) (*rbacmodel.Acco
 	if kv == nil {
 		return nil, rbac.ErrAccountNotExist
 	}
-	account := &rbacmodel.Account{}
+	account := &crbac.Account{}
 	err = json.Unmarshal(kv.Value, account)
 	if err != nil {
 		log.Error("account info format invalid", err)
@@ -144,7 +142,7 @@ func (ds *RbacDAO) GetAccount(ctx context.Context, name string) (*rbacmodel.Acco
 	return account, nil
 }
 
-func (ds *RbacDAO) compatibleOldVersionAccount(a *rbacmodel.Account) {
+func (ds *RbacDAO) compatibleOldVersionAccount(a *crbac.Account) {
 	// old version use Role, now use Roles
 	// Role/Roles will not exist at the same time
 	if len(a.Role) == 0 {
@@ -154,14 +152,14 @@ func (ds *RbacDAO) compatibleOldVersionAccount(a *rbacmodel.Account) {
 	a.Role = ""
 }
 
-func (ds *RbacDAO) ListAccount(ctx context.Context) ([]*rbacmodel.Account, int64, error) {
+func (ds *RbacDAO) ListAccount(ctx context.Context) ([]*crbac.Account, int64, error) {
 	kvs, n, err := etcdadpt.List(ctx, path.GenerateRBACAccountKey(""))
 	if err != nil {
 		return nil, 0, err
 	}
-	accounts := make([]*rbacmodel.Account, 0, n)
+	accounts := make([]*crbac.Account, 0, n)
 	for _, v := range kvs {
-		a := &rbacmodel.Account{}
+		a := &crbac.Account{}
 		err = json.Unmarshal(v.Value, a)
 		if err != nil {
 			log.Error("account info format invalid:", err)
@@ -194,20 +192,13 @@ func (ds *RbacDAO) DeleteAccount(ctx context.Context, names []string) (bool, err
 			continue //do not fail if some account is invalid
 
 		}
-		if datasource.EnableSync {
-			taskOpt, err := GenTaskOpts("", "", sync.DeleteAction, datasource.ResourceAccount, a)
-			if err != nil {
-				log.Error("", err)
-				return false, err
-			}
-			tombstoneOpt, err := GenTombstoneOpts("", "", datasource.ResourceAccount, a.Name)
-			if err != nil {
-				log.Error("", err)
-				return false, err
-			}
-			opts = append(opts, tombstoneOpt, taskOpt)
+		syncOpts, err := esync.GenDeleteOpts(ctx, datasource.ResourceAccount, a.Name, a)
+		if err != nil {
+			log.Error("fail to create sync opts", err)
+			return false, err
 		}
 		allOpts = append(allOpts, opts...)
+		allOpts = append(allOpts, syncOpts...)
 	}
 	err := etcdadpt.Txn(ctx, allOpts)
 	if err != nil {
@@ -217,7 +208,7 @@ func (ds *RbacDAO) DeleteAccount(ctx context.Context, names []string) (bool, err
 	return true, nil
 }
 
-func (ds *RbacDAO) UpdateAccount(ctx context.Context, name string, account *rbacmodel.Account) error {
+func (ds *RbacDAO) UpdateAccount(ctx context.Context, name string, account *crbac.Account) error {
 	var (
 		opts []etcdadpt.OpOptions
 		err  error
@@ -247,14 +238,12 @@ func (ds *RbacDAO) UpdateAccount(ctx context.Context, name string, account *rbac
 		}
 		opts = append(opts, opt)
 	}
-	if datasource.EnableSync {
-		op, err := GenTaskOpts("", "", sync.UpdateAction, datasource.ResourceAccount, account)
-		if err != nil {
-			log.Error("", err)
-			return err
-		}
-		opts = append(opts, op)
+	syncOpts, err := esync.GenUpdateOpts(ctx, datasource.ResourceAccount, account)
+	if err != nil {
+		log.Error("fail to create sync opts", err)
+		return err
 	}
+	opts = append(opts, syncOpts...)
 	err = etcdadpt.Txn(ctx, opts)
 	if err != nil {
 		log.Error("BatchCommit failed", err)
@@ -262,7 +251,7 @@ func (ds *RbacDAO) UpdateAccount(ctx context.Context, name string, account *rbac
 	return err
 }
 
-func hasRole(account *rbacmodel.Account, r string) bool {
+func hasRole(account *crbac.Account, r string) bool {
 	for _, n := range account.Roles {
 		if r == n {
 			return true
diff --git a/datasource/etcd/account_test.go b/datasource/etcd/account_test.go
index 47a3771..8284592 100644
--- a/datasource/etcd/account_test.go
+++ b/datasource/etcd/account_test.go
@@ -21,7 +21,7 @@ import (
 	"context"
 	"testing"
 
-	rbacmodel "github.com/go-chassis/cari/rbac"
+	crbac "github.com/go-chassis/cari/rbac"
 	"github.com/stretchr/testify/assert"
 
 	"github.com/apache/servicecomb-service-center/datasource"
@@ -39,7 +39,7 @@ func TestSyncAccount(t *testing.T) {
 	t.Run("create account", func(t *testing.T) {
 		t.Run("creating a account then delete it will create two tasks and a tombstone should pass",
 			func(t *testing.T) {
-				a1 := rbacmodel.Account{
+				a1 := crbac.Account{
 					ID:                  "sync-create-11111",
 					Name:                "sync-create-account1",
 					Password:            "tnuocca-tset",
@@ -78,7 +78,7 @@ func TestSyncAccount(t *testing.T) {
 	t.Run("update account", func(t *testing.T) {
 		t.Run("creating two accounts then update them,finally delete them, will create six tasks and two tombstones should pass",
 			func(t *testing.T) {
-				a2 := rbacmodel.Account{
+				a2 := crbac.Account{
 					ID:                  "sync-update-22222",
 					Name:                "sync-update-account2",
 					Password:            "tnuocca-tset",
@@ -86,7 +86,7 @@ func TestSyncAccount(t *testing.T) {
 					TokenExpirationTime: "2020-12-30",
 					CurrentPassword:     "tnuocca-tset",
 				}
-				a3 := rbacmodel.Account{
+				a3 := crbac.Account{
 					ID:                  "sync-update-33333",
 					Name:                "sync-update-account3",
 					Password:            "tnuocca-tset",
diff --git a/datasource/etcd/dep.go b/datasource/etcd/dep.go
index 2a719a8..bf0eb74 100644
--- a/datasource/etcd/dep.go
+++ b/datasource/etcd/dep.go
@@ -24,14 +24,14 @@ import (
 	"fmt"
 
 	pb "github.com/go-chassis/cari/discovery"
-	"github.com/go-chassis/cari/sync"
 	"github.com/little-cui/etcdadpt"
 
 	"github.com/apache/servicecomb-service-center/datasource"
 	"github.com/apache/servicecomb-service-center/datasource/etcd/event"
 	"github.com/apache/servicecomb-service-center/datasource/etcd/path"
 	"github.com/apache/servicecomb-service-center/datasource/etcd/sd"
-	serviceUtil "github.com/apache/servicecomb-service-center/datasource/etcd/util"
+	esync "github.com/apache/servicecomb-service-center/datasource/etcd/sync"
+	eutil "github.com/apache/servicecomb-service-center/datasource/etcd/util"
 	"github.com/apache/servicecomb-service-center/pkg/log"
 	"github.com/apache/servicecomb-service-center/pkg/util"
 )
@@ -42,7 +42,7 @@ type DepManager struct {
 func (dm *DepManager) SearchProviderDependency(ctx context.Context, request *pb.GetDependenciesRequest) (*pb.GetProDependenciesResponse, error) {
 	domainProject := util.ParseDomainProject(ctx)
 	providerServiceID := request.ServiceId
-	provider, err := serviceUtil.GetService(ctx, domainProject, providerServiceID)
+	provider, err := eutil.GetService(ctx, domainProject, providerServiceID)
 
 	if err != nil {
 		if errors.Is(err, datasource.ErrNoData) {
@@ -55,7 +55,7 @@ func (dm *DepManager) SearchProviderDependency(ctx context.Context, request *pb.
 		return nil, err
 	}
 
-	services, err := serviceUtil.GetConsumers(ctx, domainProject, provider, toDependencyFilterOptions(request)...)
+	services, err := eutil.GetConsumers(ctx, domainProject, provider, toDependencyFilterOptions(request)...)
 	if err != nil {
 		log.Error(fmt.Sprintf("query provider failed, provider is %s/%s/%s/%s",
 			provider.Environment, provider.AppId, provider.ServiceName, provider.Version), err)
@@ -73,7 +73,7 @@ func (dm *DepManager) SearchProviderDependency(ctx context.Context, request *pb.
 func (dm *DepManager) SearchConsumerDependency(ctx context.Context, request *pb.GetDependenciesRequest) (*pb.GetConDependenciesResponse, error) {
 	consumerID := request.ServiceId
 	domainProject := util.ParseDomainProject(ctx)
-	consumer, err := serviceUtil.GetService(ctx, domainProject, consumerID)
+	consumer, err := eutil.GetService(ctx, domainProject, consumerID)
 
 	if err != nil {
 		if errors.Is(err, datasource.ErrNoData) {
@@ -86,7 +86,7 @@ func (dm *DepManager) SearchConsumerDependency(ctx context.Context, request *pb.
 		return nil, err
 	}
 
-	services, err := serviceUtil.GetProviders(ctx, domainProject, consumer, toDependencyFilterOptions(request)...)
+	services, err := eutil.GetProviders(ctx, domainProject, consumer, toDependencyFilterOptions(request)...)
 	if err != nil {
 		log.Error(fmt.Sprintf("query consumer failed, consumer is %s/%s/%s/%s",
 			consumer.Environment, consumer.AppId, consumer.ServiceName, consumer.Version), err)
@@ -136,7 +136,7 @@ func (dm *DepManager) AddOrUpdateDependencies(ctx context.Context, dependencyInf
 			return rsp.Response, nil
 		}
 
-		consumerID, err := serviceUtil.GetServiceID(ctx, consumerInfo)
+		consumerID, err := eutil.GetServiceID(ctx, consumerInfo)
 		if err != nil {
 			log.Error(fmt.Sprintf("put request into dependency queue failed, override: %t, get consumer[%s] id failed",
 				override, consumerFlag), err)
@@ -164,21 +164,17 @@ func (dm *DepManager) AddOrUpdateDependencies(ctx context.Context, dependencyInf
 		opts = append(opts, etcdadpt.OpPut(etcdadpt.WithStrKey(key), etcdadpt.WithValue(data)))
 	}
 
-	if datasource.EnableSync {
-		action := sync.UpdateAction
-		if override {
-			action = sync.CreateAction
-		}
-		domain := util.ParseDomain(ctx)
-		project := util.ParseProject(ctx)
-		taskOpt, err := GenTaskOpts(domain, project, action, datasource.ResourceDependency, dependencyInfos)
-		if err != nil {
-			log.Error("fail to create task", err)
-			return pb.CreateResponse(pb.ErrInternal, err.Error()), err
-		}
-		opts = append(opts, taskOpt)
+	syncOpts, err := esync.GenCreateOpts(ctx, datasource.ResourceDependency, dependencyInfos)
+	if !override {
+		syncOpts, err = esync.GenUpdateOpts(ctx, datasource.ResourceDependency, dependencyInfos)
 	}
-	err := etcdadpt.Txn(ctx, opts)
+	if err != nil {
+		log.Error("fail to create sync opts", err)
+		return pb.CreateResponse(pb.ErrInternal, err.Error()), err
+	}
+	opts = append(opts, syncOpts...)
+
+	err = etcdadpt.Txn(ctx, opts)
 	if err != nil {
 		log.Error(fmt.Sprintf("put request into dependency queue failed, override: %t, %v",
 			override, dependencyInfos), err)
diff --git a/datasource/etcd/ms.go b/datasource/etcd/ms.go
index 2747834..ce5ecf4 100644
--- a/datasource/etcd/ms.go
+++ b/datasource/etcd/ms.go
@@ -25,25 +25,25 @@ import (
 	"strconv"
 	"time"
 
-	"github.com/go-chassis/cari/sync"
+	pb "github.com/go-chassis/cari/discovery"
+	"github.com/go-chassis/cari/pkg/errsvc"
+	"github.com/go-chassis/foundation/gopool"
+	"github.com/jinzhu/copier"
+	"github.com/little-cui/etcdadpt"
 
 	"github.com/apache/servicecomb-service-center/datasource"
 	"github.com/apache/servicecomb-service-center/datasource/etcd/cache"
 	"github.com/apache/servicecomb-service-center/datasource/etcd/path"
 	"github.com/apache/servicecomb-service-center/datasource/etcd/sd"
 	"github.com/apache/servicecomb-service-center/datasource/etcd/state/kvstore"
-	serviceUtil "github.com/apache/servicecomb-service-center/datasource/etcd/util"
+	esync "github.com/apache/servicecomb-service-center/datasource/etcd/sync"
+	eutil "github.com/apache/servicecomb-service-center/datasource/etcd/util"
 	"github.com/apache/servicecomb-service-center/datasource/schema"
 	"github.com/apache/servicecomb-service-center/pkg/log"
 	"github.com/apache/servicecomb-service-center/pkg/util"
 	"github.com/apache/servicecomb-service-center/server/core"
 	"github.com/apache/servicecomb-service-center/server/plugin/uuid"
 	quotasvc "github.com/apache/servicecomb-service-center/server/service/quota"
-	pb "github.com/go-chassis/cari/discovery"
-	"github.com/go-chassis/cari/pkg/errsvc"
-	"github.com/go-chassis/foundation/gopool"
-	"github.com/jinzhu/copier"
-	"github.com/little-cui/etcdadpt"
 )
 
 var (
@@ -127,18 +127,14 @@ func (ds *MetadataManager) RegisterService(ctx context.Context, request *pb.Crea
 		failOpts = append(failOpts, etcdadpt.OpGet(etcdadpt.WithStrKey(alias)))
 	}
 
-	if datasource.EnableSync {
-		domain := util.ParseDomain(ctx)
-		project := util.ParseProject(ctx)
-		taskOpt, err := GenTaskOpts(domain, project, sync.CreateAction, datasource.ResourceService, request)
-		if err != nil {
-			log.Error("fail to create task", err)
-			return &pb.CreateServiceResponse{
-				Response: pb.CreateResponse(pb.ErrInternal, err.Error()),
-			}, err
-		}
-		opts = append(opts, taskOpt)
+	syncOpts, err := esync.GenCreateOpts(ctx, datasource.ResourceService, request)
+	if err != nil {
+		log.Error("fail to create sync opts", err)
+		return &pb.CreateServiceResponse{
+			Response: pb.CreateResponse(pb.ErrInternal, err.Error()),
+		}, err
 	}
+	opts = append(opts, syncOpts...)
 
 	resp, err := etcdadpt.TxnWithCmp(ctx, opts, uniqueCmpOpts, failOpts)
 	if err != nil {
@@ -192,7 +188,7 @@ func (ds *MetadataManager) RegisterService(ctx context.Context, request *pb.Crea
 
 func (ds *MetadataManager) GetServices(ctx context.Context, request *pb.GetServicesRequest) (
 	*pb.GetServicesResponse, error) {
-	services, err := serviceUtil.GetAllServiceUtil(ctx)
+	services, err := eutil.GetAllServiceUtil(ctx)
 	if err != nil {
 		log.Error("get all services by domain failed", err)
 		return &pb.GetServicesResponse{
@@ -209,7 +205,7 @@ func (ds *MetadataManager) GetServices(ctx context.Context, request *pb.GetServi
 func (ds *MetadataManager) GetService(ctx context.Context, request *pb.GetServiceRequest) (
 	*pb.MicroService, error) {
 	domainProject := util.ParseDomainProject(ctx)
-	singleService, err := serviceUtil.GetService(ctx, domainProject, request.ServiceId)
+	singleService, err := eutil.GetService(ctx, domainProject, request.ServiceId)
 
 	if err != nil {
 		if errors.Is(err, datasource.ErrNoData) {
@@ -226,7 +222,7 @@ func (ds *MetadataManager) GetServiceDetail(ctx context.Context, request *pb.Get
 	*pb.ServiceDetail, error) {
 	domainProject := util.ParseDomainProject(ctx)
 
-	service, err := serviceUtil.GetService(ctx, domainProject, request.ServiceId)
+	service, err := eutil.GetService(ctx, domainProject, request.ServiceId)
 	if err != nil {
 		if errors.Is(err, datasource.ErrNoData) {
 			return nil, pb.NewError(pb.ErrServiceNotExists, "Service does not exist.")
@@ -297,7 +293,7 @@ func (ds *MetadataManager) ListServiceDetail(ctx context.Context, request *pb.Ge
 	}
 
 	//获取所有服务
-	services, err := serviceUtil.GetAllServiceUtil(ctx)
+	services, err := eutil.GetAllServiceUtil(ctx)
 	if err != nil {
 		log.Error("get all services by domain failed", err)
 		return nil, pb.NewError(pb.ErrInternal, err.Error())
@@ -370,7 +366,7 @@ func (ds *MetadataManager) ListApp(ctx context.Context, request *pb.GetAppsReque
 	domainProject := util.ParseDomainProject(ctx)
 	key := path.GetServiceAppKey(domainProject, request.Environment, "")
 
-	opts := append(serviceUtil.FromContext(ctx),
+	opts := append(eutil.FromContext(ctx),
 		etcdadpt.WithStrKey(key),
 		etcdadpt.WithPrefix(),
 		etcdadpt.WithKeyOnly())
@@ -410,7 +406,7 @@ func (ds *MetadataManager) ExistServiceByID(ctx context.Context, request *pb.Get
 	domainProject := util.ParseDomainProject(ctx)
 	return &pb.GetExistenceByIDResponse{
 		Response: pb.CreateResponse(pb.ResponseSuccess, "Get all applications successfully."),
-		Exist:    serviceUtil.ServiceExist(ctx, domainProject, request.ServiceId),
+		Exist:    eutil.ServiceExist(ctx, domainProject, request.ServiceId),
 	}, nil
 }
 
@@ -420,7 +416,7 @@ func (ds *MetadataManager) ExistService(ctx context.Context, request *pb.GetExis
 	serviceFlag := util.StringJoin([]string{
 		request.Environment, request.AppId, request.ServiceName, request.Version}, path.SPLIT)
 
-	ids, exist, err := serviceUtil.FindServiceIds(ctx, &pb.MicroServiceKey{
+	ids, exist, err := eutil.FindServiceIds(ctx, &pb.MicroServiceKey{
 		Environment: request.Environment,
 		AppId:       request.AppId,
 		ServiceName: request.ServiceName,
@@ -458,7 +454,7 @@ func (ds *MetadataManager) UpdateService(ctx context.Context, request *pb.Update
 	domainProject := util.ParseDomainProject(ctx)
 
 	key := path.GenerateServiceKey(domainProject, request.ServiceId)
-	microservice, err := serviceUtil.GetService(ctx, domainProject, request.ServiceId)
+	microservice, err := eutil.GetService(ctx, domainProject, request.ServiceId)
 	if err != nil {
 		if errors.Is(err, datasource.ErrNoData) {
 			log.Debug(fmt.Sprintf("service does not exist, update service[%s] properties failed, operator: %s",
@@ -490,18 +486,14 @@ func (ds *MetadataManager) UpdateService(ctx context.Context, request *pb.Update
 	opts := []etcdadpt.OpOptions{
 		etcdadpt.OpPut(etcdadpt.WithStrKey(key), etcdadpt.WithValue(data)),
 	}
-	if datasource.EnableSync {
-		domain := util.ParseDomain(ctx)
-		project := util.ParseProject(ctx)
-		taskOpt, err := GenTaskOpts(domain, project, sync.UpdateAction, datasource.ResourceService, request)
-		if err != nil {
-			log.Error("fail to create task", err)
-			return &pb.UpdateServicePropsResponse{
-				Response: pb.CreateResponse(pb.ErrInternal, err.Error()),
-			}, err
-		}
-		opts = append(opts, taskOpt)
+	syncOpts, err := esync.GenUpdateOpts(ctx, datasource.ResourceService, request)
+	if err != nil {
+		log.Error("fail to create task", err)
+		return &pb.UpdateServicePropsResponse{
+			Response: pb.CreateResponse(pb.ErrInternal, err.Error()),
+		}, err
 	}
+	opts = append(opts, syncOpts...)
 
 	// Set key file
 	resp, err := etcdadpt.TxnWithCmp(ctx, opts, etcdadpt.If(etcdadpt.NotEqualVer(key, 0)), nil)
@@ -660,7 +652,7 @@ func (ds *MetadataManager) sendHeartbeatInstead(ctx context.Context, instance *p
 
 func (ds *MetadataManager) ExistInstanceByID(ctx context.Context, request *pb.MicroServiceInstanceKey) (*pb.GetExistenceByIDResponse, error) {
 	domainProject := util.ParseDomainProject(ctx)
-	exist, _ := serviceUtil.InstanceExist(ctx, domainProject, request.ServiceId, request.InstanceId)
+	exist, _ := eutil.InstanceExist(ctx, domainProject, request.ServiceId, request.InstanceId)
 	if !exist {
 		return &pb.GetExistenceByIDResponse{
 			Response: pb.CreateResponse(pb.ErrInstanceNotExists, "Check instance exist failed."),
@@ -680,7 +672,7 @@ func (ds *MetadataManager) GetInstance(ctx context.Context, request *pb.GetOneIn
 	service := &pb.MicroService{}
 	var err error
 	if len(request.ConsumerServiceId) > 0 {
-		service, err = serviceUtil.GetService(ctx, domainProject, request.ConsumerServiceId)
+		service, err = eutil.GetService(ctx, domainProject, request.ConsumerServiceId)
 		if err != nil {
 			if errors.Is(err, datasource.ErrNoData) {
 				log.Debug(fmt.Sprintf("consumer does not exist in db, consumer[%s] find provider instance[%s/%s]",
@@ -698,7 +690,7 @@ func (ds *MetadataManager) GetInstance(ctx context.Context, request *pb.GetOneIn
 		}
 	}
 
-	provider, err := serviceUtil.GetService(ctx, domainProject, request.ProviderServiceId)
+	provider, err := eutil.GetService(ctx, domainProject, request.ProviderServiceId)
 	if err != nil {
 		if errors.Is(err, datasource.ErrNoData) {
 			log.Debug(fmt.Sprintf("provider does not exist in db, consumer[%s] find provider instance[%s/%s]",
@@ -761,7 +753,7 @@ func (ds *MetadataManager) GetInstances(ctx context.Context, request *pb.GetInst
 	service := &pb.MicroService{}
 	var err error
 	if len(request.ConsumerServiceId) > 0 {
-		service, err = serviceUtil.GetService(ctx, domainProject, request.ConsumerServiceId)
+		service, err = eutil.GetService(ctx, domainProject, request.ConsumerServiceId)
 		if err != nil {
 			if errors.Is(err, datasource.ErrNoData) {
 				log.Debug(fmt.Sprintf("consumer does not exist in db, consumer[%s] find provider[%s] instances",
@@ -779,7 +771,7 @@ func (ds *MetadataManager) GetInstances(ctx context.Context, request *pb.GetInst
 		}
 	}
 
-	provider, err := serviceUtil.GetService(ctx, domainProject, request.ProviderServiceId)
+	provider, err := eutil.GetService(ctx, domainProject, request.ProviderServiceId)
 	if err != nil {
 		if errors.Is(err, datasource.ErrNoData) {
 			log.Debug(fmt.Sprintf("provider does not exist, consumer[%s] find provider[%s] instances",
@@ -844,7 +836,7 @@ func (ds *MetadataManager) GetProviderInstances(ctx context.Context, request *pb
 	if err != nil {
 		return
 	}
-	return instances, serviceUtil.FormatRevision(maxRevs, counts), nil
+	return instances, eutil.FormatRevision(maxRevs, counts), nil
 }
 
 func (ds *MetadataManager) BatchGetProviderInstances(ctx context.Context, request *pb.BatchGetInstancesRequest) (instances []*pb.MicroServiceInstance, rev string, err error) {
@@ -865,12 +857,12 @@ func (ds *MetadataManager) BatchGetProviderInstances(ctx context.Context, reques
 		instances = append(instances, insts...)
 	}
 
-	return instances, serviceUtil.FormatRevision(maxRevs, counts), nil
+	return instances, eutil.FormatRevision(maxRevs, counts), nil
 }
 
 func (ds *MetadataManager) findInstances(ctx context.Context, domainProject, serviceID string, maxRevs []int64, counts []int64) (instances []*pb.MicroServiceInstance, err error) {
 	key := path.GenerateInstanceKey(domainProject, serviceID, "")
-	opts := append(serviceUtil.FromContext(ctx), etcdadpt.WithStrKey(key), etcdadpt.WithPrefix())
+	opts := append(eutil.FromContext(ctx), etcdadpt.WithStrKey(key), etcdadpt.WithPrefix())
 	resp, err := sd.Instance().Search(ctx, opts...)
 	if err != nil {
 		return nil, err
@@ -922,7 +914,7 @@ func (ds *MetadataManager) findInstance(ctx context.Context, request *pb.FindIns
 	domainProject := util.ParseDomainProject(ctx)
 	service := &pb.MicroService{Environment: request.Environment}
 	if len(request.ConsumerServiceId) > 0 {
-		service, err = serviceUtil.GetService(ctx, domainProject, request.ConsumerServiceId)
+		service, err = eutil.GetService(ctx, domainProject, request.ConsumerServiceId)
 		if err != nil {
 			if errors.Is(err, datasource.ErrNoData) {
 				log.Debug(fmt.Sprintf("consumer does not exist, consumer[%s] find provider[%s/%s/%s]",
@@ -976,7 +968,7 @@ func (ds *MetadataManager) findInstance(ctx context.Context, request *pb.FindIns
 			return nil, err
 		}
 		if provider != nil {
-			err = serviceUtil.AddServiceVersionRule(ctx, domainProject, service, provider)
+			err = eutil.AddServiceVersionRule(ctx, domainProject, service, provider)
 		} else {
 			err := fmt.Errorf("%s failed, provider does not exist", findFlag)
 			log.Error("AddServiceVersionRule failed", err)
@@ -1040,7 +1032,7 @@ func (ds *MetadataManager) genFindResult(ctx context.Context, oldRev string, ite
 func (ds *MetadataManager) reshapeProviderKey(ctx context.Context, provider *pb.MicroServiceKey, providerID string) (
 	*pb.MicroServiceKey, error) {
 	// service name 可能是别名,所以重新获取
-	providerService, err := serviceUtil.GetService(ctx, provider.Tenant, providerID)
+	providerService, err := eutil.GetService(ctx, provider.Tenant, providerID)
 	if err != nil {
 		return nil, err
 	}
@@ -1054,7 +1046,7 @@ func (ds *MetadataManager) UpdateInstanceStatus(ctx context.Context, request *pb
 	domainProject := util.ParseDomainProject(ctx)
 	updateStatusFlag := util.StringJoin([]string{request.ServiceId, request.InstanceId, request.Status}, path.SPLIT)
 
-	instance, err := serviceUtil.GetInstance(ctx, domainProject, request.ServiceId, request.InstanceId)
+	instance, err := eutil.GetInstance(ctx, domainProject, request.ServiceId, request.InstanceId)
 	if err != nil {
 		log.Error(fmt.Sprintf("update instance[%s] status failed", updateStatusFlag), err)
 		return &pb.UpdateInstanceStatusResponse{
@@ -1071,7 +1063,7 @@ func (ds *MetadataManager) UpdateInstanceStatus(ctx context.Context, request *pb
 	copyInstanceRef := *instance
 	copyInstanceRef.Status = request.Status
 
-	if err := serviceUtil.UpdateInstance(ctx, domainProject, &copyInstanceRef); err != nil {
+	if err := eutil.UpdateInstance(ctx, domainProject, &copyInstanceRef); err != nil {
 		log.Error(fmt.Sprintf("update instance[%s] status failed", updateStatusFlag), err)
 		resp := &pb.UpdateInstanceStatusResponse{
 			Response: pb.CreateResponseWithSCErr(err),
@@ -1093,7 +1085,7 @@ func (ds *MetadataManager) UpdateInstanceProperties(ctx context.Context, request
 	domainProject := util.ParseDomainProject(ctx)
 	instanceFlag := util.StringJoin([]string{request.ServiceId, request.InstanceId}, path.SPLIT)
 
-	instance, err := serviceUtil.GetInstance(ctx, domainProject, request.ServiceId, request.InstanceId)
+	instance, err := eutil.GetInstance(ctx, domainProject, request.ServiceId, request.InstanceId)
 	if err != nil {
 		log.Error(fmt.Sprintf("update instance[%s] properties failed", instanceFlag), err)
 		return &pb.UpdateInstancePropsResponse{
@@ -1110,7 +1102,7 @@ func (ds *MetadataManager) UpdateInstanceProperties(ctx context.Context, request
 	copyInstanceRef := *instance
 	copyInstanceRef.Properties = request.Properties
 
-	if err := serviceUtil.UpdateInstance(ctx, domainProject, &copyInstanceRef); err != nil {
+	if err := eutil.UpdateInstance(ctx, domainProject, &copyInstanceRef); err != nil {
 		log.Error(fmt.Sprintf("update instance[%s] properties failed", instanceFlag), err)
 		resp := &pb.UpdateInstancePropsResponse{
 			Response: pb.CreateResponseWithSCErr(err),
@@ -1222,7 +1214,7 @@ func (ds *MetadataManager) batchFindServices(ctx context.Context, request *pb.Ba
 			return nil, err
 		}
 		failed, ok := failedResult[resp.Response.GetCode()]
-		serviceUtil.AppendFindResponse(findCtx, int64(index), resp.Response, resp.Instances,
+		eutil.AppendFindResponse(findCtx, int64(index), resp.Response, resp.Instances,
 			&services.Updated, &services.NotModified, &failed)
 		if !ok && failed != nil {
 			failedResult[resp.Response.GetCode()] = failed
@@ -1255,7 +1247,7 @@ func (ds *MetadataManager) batchFindInstances(ctx context.Context, request *pb.B
 			return nil, err
 		}
 		failed, ok := failedResult[resp.Response.GetCode()]
-		serviceUtil.AppendFindResponse(getCtx, int64(index), resp.Response, []*pb.MicroServiceInstance{resp.Instance},
+		eutil.AppendFindResponse(getCtx, int64(index), resp.Response, []*pb.MicroServiceInstance{resp.Instance},
 			&instances.Updated, &instances.NotModified, &failed)
 		if !ok && failed != nil {
 			failedResult[resp.Response.GetCode()] = failed
@@ -1300,7 +1292,7 @@ func (ds *MetadataManager) Heartbeat(ctx context.Context, request *pb.HeartbeatR
 	domainProject := util.ParseDomainProject(ctx)
 	instanceFlag := util.StringJoin([]string{request.ServiceId, request.InstanceId}, path.SPLIT)
 
-	_, ttl, err := serviceUtil.HeartbeatUtil(ctx, domainProject, request.ServiceId, request.InstanceId)
+	_, ttl, err := eutil.HeartbeatUtil(ctx, domainProject, request.ServiceId, request.InstanceId)
 	if err != nil {
 		log.Error(fmt.Sprintf("heartbeat failed, instance[%s]. operator %s",
 			instanceFlag, remoteIP), err)
@@ -1329,7 +1321,7 @@ func (ds *MetadataManager) Heartbeat(ctx context.Context, request *pb.HeartbeatR
 func (ds *MetadataManager) GetAllInstances(ctx context.Context, request *pb.GetAllInstancesRequest) (*pb.GetAllInstancesResponse, error) {
 	domainProject := util.ParseDomainProject(ctx)
 	key := path.GetInstanceRootKey(domainProject) + path.SPLIT
-	opts := append(serviceUtil.FromContext(ctx), etcdadpt.WithStrKey(key), etcdadpt.WithPrefix())
+	opts := append(eutil.FromContext(ctx), etcdadpt.WithStrKey(key), etcdadpt.WithPrefix())
 	kvs, err := sd.Instance().Search(ctx, opts...)
 	if err != nil {
 		return nil, err
@@ -1354,7 +1346,7 @@ func (ds *MetadataManager) ModifySchemas(ctx context.Context, request *pb.Modify
 	serviceID := request.ServiceId
 	domainProject := util.ParseDomainProject(ctx)
 
-	serviceInfo, err := serviceUtil.GetService(ctx, domainProject, serviceID)
+	serviceInfo, err := eutil.GetService(ctx, domainProject, serviceID)
 	if err != nil {
 		if errors.Is(err, datasource.ErrNoData) {
 			log.Debug(fmt.Sprintf("modify service[%s] schemas failed, service does not exist in db, operator: %s",
@@ -1403,7 +1395,7 @@ func (ds *MetadataManager) ExistSchema(ctx context.Context, request *pb.GetExist
 	*pb.GetExistenceResponse, error) {
 	domainProject := util.ParseDomainProject(ctx)
 
-	if !serviceUtil.ServiceExist(ctx, domainProject, request.ServiceId) {
+	if !eutil.ServiceExist(ctx, domainProject, request.ServiceId) {
 		log.Warn(fmt.Sprintf("schema[%s/%s] exist failed, service does not exist", request.ServiceId, request.SchemaId))
 		return nil, pb.NewError(pb.ErrServiceNotExists, "service does not exist.")
 	}
@@ -1434,14 +1426,14 @@ func (ds *MetadataManager) ExistSchema(ctx context.Context, request *pb.GetExist
 func (ds *MetadataManager) GetSchema(ctx context.Context, request *pb.GetSchemaRequest) (*pb.GetSchemaResponse, error) {
 	domainProject := util.ParseDomainProject(ctx)
 
-	if !serviceUtil.ServiceExist(ctx, domainProject, request.ServiceId) {
+	if !eutil.ServiceExist(ctx, domainProject, request.ServiceId) {
 		log.Error(fmt.Sprintf("get schema[%s/%s] failed, service does not exist",
 			request.ServiceId, request.SchemaId), nil)
 		return nil, pb.NewError(pb.ErrServiceNotExists, "Service does not exist.")
 	}
 
 	key := path.GenerateServiceSchemaKey(domainProject, request.ServiceId, request.SchemaId)
-	opts := append(serviceUtil.FromContext(ctx), etcdadpt.WithStrKey(key))
+	opts := append(eutil.FromContext(ctx), etcdadpt.WithStrKey(key))
 	resp, errDo := sd.Schema().Search(ctx, opts...)
 	if errDo != nil {
 		log.Error(fmt.Sprintf("get schema[%s/%s] failed", request.ServiceId, request.SchemaId), errDo)
@@ -1471,7 +1463,7 @@ func (ds *MetadataManager) GetAllSchemas(ctx context.Context, request *pb.GetAll
 	*pb.GetAllSchemaResponse, error) {
 	domainProject := util.ParseDomainProject(ctx)
 
-	service, err := serviceUtil.GetService(ctx, domainProject, request.ServiceId)
+	service, err := eutil.GetService(ctx, domainProject, request.ServiceId)
 	if err != nil {
 		if errors.Is(err, datasource.ErrNoData) {
 			log.Debug(fmt.Sprintf("get service[%s] all schemas failed, service does not exist in db", request.ServiceId))
@@ -1490,7 +1482,7 @@ func (ds *MetadataManager) GetAllSchemas(ctx context.Context, request *pb.GetAll
 	}
 
 	key := path.GenerateServiceSchemaSummaryKey(domainProject, request.ServiceId, "")
-	opts := append(serviceUtil.FromContext(ctx), etcdadpt.WithStrKey(key), etcdadpt.WithPrefix())
+	opts := append(eutil.FromContext(ctx), etcdadpt.WithStrKey(key), etcdadpt.WithPrefix())
 	resp, errDo := sd.SchemaSummary().Search(ctx, opts...)
 	if errDo != nil {
 		log.Error(fmt.Sprintf("get service[%s] all schema summaries failed", request.ServiceId), errDo)
@@ -1500,7 +1492,7 @@ func (ds *MetadataManager) GetAllSchemas(ctx context.Context, request *pb.GetAll
 	respWithSchema := &kvstore.Response{}
 	if request.WithSchema {
 		key := path.GenerateServiceSchemaKey(domainProject, request.ServiceId, "")
-		opts := append(serviceUtil.FromContext(ctx), etcdadpt.WithStrKey(key), etcdadpt.WithPrefix())
+		opts := append(eutil.FromContext(ctx), etcdadpt.WithStrKey(key), etcdadpt.WithPrefix())
 		respWithSchema, errDo = sd.Schema().Search(ctx, opts...)
 		if errDo != nil {
 			log.Error(fmt.Sprintf("get service[%s] all schemas failed", request.ServiceId), errDo)
@@ -1540,7 +1532,7 @@ func (ds *MetadataManager) DeleteSchema(ctx context.Context, request *pb.DeleteS
 	domainProject := util.ParseDomainProject(ctx)
 
 	key := path.GenerateServiceSchemaKey(domainProject, request.ServiceId, request.SchemaId)
-	exist, err := serviceUtil.CheckSchemaInfoExist(ctx, key)
+	exist, err := eutil.CheckSchemaInfoExist(ctx, key)
 	if err != nil {
 		log.Error(fmt.Sprintf("delete schema[%s/%s] failed, operator: %s",
 			request.ServiceId, request.SchemaId, remoteIP), err)
@@ -1582,7 +1574,7 @@ func (ds *MetadataManager) AddTags(ctx context.Context, request *pb.AddServiceTa
 	domainProject := util.ParseDomainProject(ctx)
 
 	// service id存在性校验
-	if !serviceUtil.ServiceExist(ctx, domainProject, request.ServiceId) {
+	if !eutil.ServiceExist(ctx, domainProject, request.ServiceId) {
 		log.Error(fmt.Sprintf("add service[%s]'s tags %v failed, service does not exist, operator: %s",
 			request.ServiceId, request.Tags, remoteIP), nil)
 		return &pb.AddServiceTagsResponse{
@@ -1590,7 +1582,7 @@ func (ds *MetadataManager) AddTags(ctx context.Context, request *pb.AddServiceTa
 		}, nil
 	}
 
-	checkErr := serviceUtil.AddTagIntoETCD(ctx, domainProject, request.ServiceId, request.Tags)
+	checkErr := eutil.AddTagIntoETCD(ctx, domainProject, request.ServiceId, request.Tags)
 	if checkErr != nil {
 		log.Error(fmt.Sprintf("add service[%s]'s tags %v failed, operator: %s",
 			request.ServiceId, request.Tags, remoteIP), checkErr)
@@ -1612,13 +1604,13 @@ func (ds *MetadataManager) AddTags(ctx context.Context, request *pb.AddServiceTa
 func (ds *MetadataManager) GetTags(ctx context.Context, request *pb.GetServiceTagsRequest) (*pb.GetServiceTagsResponse, error) {
 	var err error
 	domainProject := util.ParseDomainProject(ctx)
-	if !serviceUtil.ServiceExist(ctx, domainProject, request.ServiceId) {
+	if !eutil.ServiceExist(ctx, domainProject, request.ServiceId) {
 		log.Error(fmt.Sprintf("get service[%s]'s tags failed, service does not exist", request.ServiceId), err)
 		return &pb.GetServiceTagsResponse{
 			Response: pb.CreateResponse(pb.ErrServiceNotExists, "Service does not exist."),
 		}, nil
 	}
-	tags, err := serviceUtil.GetTagsUtils(ctx, domainProject, request.ServiceId)
+	tags, err := eutil.GetTagsUtils(ctx, domainProject, request.ServiceId)
 	if err != nil {
 		log.Error(fmt.Sprintf("get service[%s]'s tags failed, get tags failed", request.ServiceId), err)
 		return &pb.GetServiceTagsResponse{
@@ -1638,7 +1630,7 @@ func (ds *MetadataManager) UpdateTag(ctx context.Context, request *pb.UpdateServ
 	tagFlag := util.StringJoin([]string{request.Key, request.Value}, path.SPLIT)
 	domainProject := util.ParseDomainProject(ctx)
 
-	if !serviceUtil.ServiceExist(ctx, domainProject, request.ServiceId) {
+	if !eutil.ServiceExist(ctx, domainProject, request.ServiceId) {
 		log.Error(fmt.Sprintf("update service[%s]'s tag[%s] failed, service does not exist, operator: %s",
 			request.ServiceId, tagFlag, remoteIP), err)
 		return &pb.UpdateServiceTagResponse{
@@ -1646,7 +1638,7 @@ func (ds *MetadataManager) UpdateTag(ctx context.Context, request *pb.UpdateServ
 		}, nil
 	}
 
-	tags, err := serviceUtil.GetTagsUtils(ctx, domainProject, request.ServiceId)
+	tags, err := eutil.GetTagsUtils(ctx, domainProject, request.ServiceId)
 	if err != nil {
 		log.Error(fmt.Sprintf("update service[%s]'s tag[%s] failed, get tag failed, operator: %s",
 			request.ServiceId, tagFlag, remoteIP), err)
@@ -1670,7 +1662,7 @@ func (ds *MetadataManager) UpdateTag(ctx context.Context, request *pb.UpdateServ
 	}
 	copyTags[request.Key] = request.Value
 
-	checkErr := serviceUtil.AddTagIntoETCD(ctx, domainProject, request.ServiceId, copyTags)
+	checkErr := eutil.AddTagIntoETCD(ctx, domainProject, request.ServiceId, copyTags)
 	if checkErr != nil {
 		log.Error(fmt.Sprintf("update service[%s]'s tag[%s] failed, operator: %s",
 			request.ServiceId, tagFlag, remoteIP), checkErr)
@@ -1693,7 +1685,7 @@ func (ds *MetadataManager) DeleteTags(ctx context.Context, request *pb.DeleteSer
 	remoteIP := util.GetIPFromContext(ctx)
 	domainProject := util.ParseDomainProject(ctx)
 
-	if !serviceUtil.ServiceExist(ctx, domainProject, request.ServiceId) {
+	if !eutil.ServiceExist(ctx, domainProject, request.ServiceId) {
 		log.Error(fmt.Sprintf("delete service[%s]'s tags %v failed, service does not exist, operator: %s",
 			request.ServiceId, request.Keys, remoteIP), nil)
 		return &pb.DeleteServiceTagsResponse{
@@ -1701,7 +1693,7 @@ func (ds *MetadataManager) DeleteTags(ctx context.Context, request *pb.DeleteSer
 		}, nil
 	}
 
-	tags, err := serviceUtil.GetTagsUtils(ctx, domainProject, request.ServiceId)
+	tags, err := eutil.GetTagsUtils(ctx, domainProject, request.ServiceId)
 	if err != nil {
 		log.Error(fmt.Sprintf("delete service[%s]'s tags %v failed, get service tags failed, operator: %s",
 			request.ServiceId, request.Keys, remoteIP), err)
@@ -1737,10 +1729,18 @@ func (ds *MetadataManager) DeleteTags(ctx context.Context, request *pb.DeleteSer
 
 	key := path.GenerateServiceTagKey(domainProject, request.ServiceId)
 
-	resp, err := etcdadpt.TxnWithCmp(ctx,
-		etcdadpt.Ops(etcdadpt.OpPut(etcdadpt.WithStrKey(key), etcdadpt.WithValue(data))),
-		etcdadpt.If(etcdadpt.NotEqualVer(path.GenerateServiceKey(domainProject, request.ServiceId), 0)),
-		nil)
+	opts := etcdadpt.Ops(etcdadpt.OpPut(etcdadpt.WithStrKey(key), etcdadpt.WithValue(data)))
+	syncOpts, err := esync.GenDeleteOpts(ctx, datasource.ResourceKV, key, data,
+		esync.WithOpts(map[string]string{"key": key}))
+	if err != nil {
+		return &pb.DeleteServiceTagsResponse{
+			Response: pb.CreateResponse(pb.ErrInternal, err.Error()),
+		}, err
+	}
+	opts = append(opts, syncOpts...)
+
+	resp, err := etcdadpt.TxnWithCmp(ctx, opts,
+		etcdadpt.If(etcdadpt.NotEqualVer(path.GenerateServiceKey(domainProject, request.ServiceId), 0)), nil)
 	if err != nil {
 		log.Error(fmt.Sprintf("delete service[%s]'s tags %v failed, operator: %s",
 			request.ServiceId, request.Keys, remoteIP), err)
@@ -1786,7 +1786,7 @@ func (ds *MetadataManager) modifySchemas(ctx context.Context, domainProject stri
 			}
 
 			service.Schemas = nonExistSchemaIds
-			opt, err := serviceUtil.UpdateService(domainProject, serviceID, service)
+			opt, err := eutil.UpdateService(domainProject, serviceID, service)
 			if err != nil {
 				log.Error(fmt.Sprintf("modify service[%s] schemas failed, update service.Schemas failed, operator: %s",
 					serviceID, remoteIP), err)
@@ -1851,7 +1851,7 @@ func (ds *MetadataManager) modifySchemas(ctx context.Context, domainProject stri
 		}
 
 		service.Schemas = schemaIDs
-		opt, err := serviceUtil.UpdateService(domainProject, serviceID, service)
+		opt, err := eutil.UpdateService(domainProject, serviceID, service)
 		if err != nil {
 			log.Error(fmt.Sprintf("modify service[%s] schemas failed, update service.Schemas failed, operator: %s",
 				serviceID, remoteIP), err)
@@ -1883,7 +1883,7 @@ func (ds *MetadataManager) modifySchema(ctx context.Context, serviceID string, s
 	domainProject := util.ParseDomainProject(ctx)
 	schemaID := schema.SchemaId
 
-	microService, err := serviceUtil.GetService(ctx, domainProject, serviceID)
+	microService, err := eutil.GetService(ctx, domainProject, serviceID)
 	if err != nil {
 		if errors.Is(err, datasource.ErrNoData) {
 			log.Debug(fmt.Sprintf("microService does not exist, modify schema[%s/%s] failed, operator: %s",
@@ -1933,7 +1933,7 @@ func (ds *MetadataManager) modifySchema(ctx context.Context, serviceID string, s
 
 		if len(microService.Schemas) == 0 {
 			microService.Schemas = append(microService.Schemas, schemaID)
-			opt, err := serviceUtil.UpdateService(domainProject, serviceID, microService)
+			opt, err := eutil.UpdateService(domainProject, serviceID, microService)
 			if err != nil {
 				log.Error(fmt.Sprintf("modify schema[%s/%s] failed, update microService.Schemas failed, operator: %s",
 					serviceID, schemaID, remoteIP), err)
@@ -1944,7 +1944,7 @@ func (ds *MetadataManager) modifySchema(ctx context.Context, serviceID string, s
 	} else {
 		if !isExist {
 			microService.Schemas = append(microService.Schemas, schemaID)
-			opt, err := serviceUtil.UpdateService(domainProject, serviceID, microService)
+			opt, err := eutil.UpdateService(domainProject, serviceID, microService)
 			if err != nil {
 				log.Error(fmt.Sprintf("modify schema[%s/%s] failed, update microService.Schemas failed, operator: %s",
 					serviceID, schemaID, remoteIP), err)
@@ -1984,7 +1984,7 @@ func (ds *MetadataManager) DeleteServicePri(ctx context.Context, serviceID strin
 		return pb.CreateResponse(pb.ErrInvalidParams, err.Error()), nil
 	}
 
-	microservice, err := serviceUtil.GetService(ctx, domainProject, serviceID)
+	microservice, err := eutil.GetService(ctx, domainProject, serviceID)
 	if err != nil {
 		if errors.Is(err, datasource.ErrNoData) {
 			log.Debug(fmt.Sprintf("service does not exist, %s micro-service[%s] failed, operator: %s",
@@ -1998,7 +1998,7 @@ func (ds *MetadataManager) DeleteServicePri(ctx context.Context, serviceID strin
 
 	// 强制删除,则与该服务相关的信息删除,非强制删除: 如果作为该被依赖(作为provider,提供服务,且不是只存在自依赖)或者存在实例,则不能删除
 	if !force {
-		services, err := serviceUtil.GetConsumerIds(ctx, domainProject, microservice)
+		services, err := eutil.GetConsumerIds(ctx, domainProject, microservice)
 		if err != nil {
 			log.Error(fmt.Sprintf("delete micro-service[%s] failed, get service dependency failed, operator: %s",
 				serviceID, remoteIP), err)
@@ -2042,26 +2042,16 @@ func (ds *MetadataManager) DeleteServicePri(ctx context.Context, serviceID strin
 		etcdadpt.OpDel(etcdadpt.WithStrKey(path.GenerateServiceAliasKey(serviceKey))),
 		etcdadpt.OpDel(etcdadpt.WithStrKey(serviceIDKey)),
 	}
-
-	if datasource.EnableSync {
-		domain := util.ParseDomain(ctx)
-		project := util.ParseProject(ctx)
-		taskOpt, err := GenTaskOpts(domain, project, sync.DeleteAction, datasource.ResourceService,
-			&pb.DeleteServiceRequest{ServiceId: serviceID, Force: force})
-		if err != nil {
-			log.Error("fail to create task", err)
-			return pb.CreateResponse(pb.ErrInternal, err.Error()), err
-		}
-		tombstoneOpt, err := GenTombstoneOpts(domain, project, datasource.ResourceService, serviceID)
-		if err != nil {
-			log.Error("fail to create tombstone", err)
-			return pb.CreateResponse(pb.ErrInternal, err.Error()), err
-		}
-		opts = append(opts, taskOpt, tombstoneOpt)
+	syncOpts, err := esync.GenDeleteOpts(ctx, datasource.ResourceService, serviceID,
+		&pb.DeleteServiceRequest{ServiceId: serviceID, Force: force})
+	if err != nil {
+		log.Error("fail to sync opt", err)
+		return pb.CreateResponse(pb.ErrInternal, err.Error()), err
 	}
+	opts = append(opts, syncOpts...)
 
 	//删除依赖规则
-	optDeleteDep, err := serviceUtil.DeleteDependencyForDeleteService(domainProject, serviceID, serviceKey)
+	optDeleteDep, err := eutil.DeleteDependencyForDeleteService(domainProject, serviceID, serviceKey)
 	if err != nil {
 		log.Error(fmt.Sprintf("%s micro-service[%s] failed, delete dependency failed, operator: %s",
 			title, serviceID, remoteIP), err)
@@ -2093,7 +2083,7 @@ func (ds *MetadataManager) DeleteServicePri(ctx context.Context, serviceID strin
 		etcdadpt.WithPrefix()))
 
 	//删除实例
-	err = serviceUtil.DeleteServiceAllInstances(ctx, serviceID)
+	err = eutil.DeleteServiceAllInstances(ctx, serviceID)
 	if err != nil {
 		log.Error(fmt.Sprintf("%s micro-service[%s] failed, revoke all instances failed, operator: %s",
 			title, serviceID, remoteIP), err)
diff --git a/datasource/etcd/role.go b/datasource/etcd/role.go
index 9e117d9..d2a3735 100644
--- a/datasource/etcd/role.go
+++ b/datasource/etcd/role.go
@@ -24,19 +24,19 @@ import (
 	"strconv"
 	"time"
 
-	rbacmodel "github.com/go-chassis/cari/rbac"
-	"github.com/go-chassis/cari/sync"
+	crbac "github.com/go-chassis/cari/rbac"
 	"github.com/little-cui/etcdadpt"
 
 	"github.com/apache/servicecomb-service-center/datasource"
 	"github.com/apache/servicecomb-service-center/datasource/etcd/path"
+	esync "github.com/apache/servicecomb-service-center/datasource/etcd/sync"
 	"github.com/apache/servicecomb-service-center/datasource/rbac"
 	"github.com/apache/servicecomb-service-center/pkg/etcdsync"
 	"github.com/apache/servicecomb-service-center/pkg/log"
 	"github.com/apache/servicecomb-service-center/pkg/util"
 )
 
-func (rm *RbacDAO) CreateRole(ctx context.Context, r *rbacmodel.Role) error {
+func (rm *RbacDAO) CreateRole(ctx context.Context, r *crbac.Role) error {
 	lock, err := etcdsync.Lock("/role-creating/"+r.Name, -1, false)
 	if err != nil {
 		return fmt.Errorf("role %s is creating", r.Name)
@@ -67,14 +67,12 @@ func (rm *RbacDAO) CreateRole(ctx context.Context, r *rbacmodel.Role) error {
 	opts := []etcdadpt.OpOptions{
 		etcdadpt.OpPut(etcdadpt.WithStrKey(key), etcdadpt.WithValue(value)),
 	}
-	if datasource.EnableSync {
-		taskOpt, err := GenTaskOpts("", "", sync.CreateAction, datasource.ResourceRole, r)
-		if err != nil {
-			log.Error("", err)
-			return err
-		}
-		opts = append(opts, taskOpt)
+	syncOpts, err := esync.GenCreateOpts(ctx, datasource.ResourceRole, r)
+	if err != nil {
+		log.Error("fail to create sync opts", err)
+		return err
 	}
+	opts = append(opts, syncOpts...)
 	err = etcdadpt.Txn(ctx, opts)
 	if err != nil {
 		log.Error("can not save account info", err)
@@ -88,7 +86,7 @@ func (rm *RbacDAO) RoleExist(ctx context.Context, name string) (bool, error) {
 	return etcdadpt.Exist(ctx, path.GenerateRBACRoleKey(name))
 }
 
-func (rm *RbacDAO) GetRole(ctx context.Context, name string) (*rbacmodel.Role, error) {
+func (rm *RbacDAO) GetRole(ctx context.Context, name string) (*crbac.Role, error) {
 	kv, err := etcdadpt.Get(ctx, path.GenerateRBACRoleKey(name))
 	if err != nil {
 		return nil, err
@@ -96,7 +94,7 @@ func (rm *RbacDAO) GetRole(ctx context.Context, name string) (*rbacmodel.Role, e
 	if kv == nil {
 		return nil, rbac.ErrRoleNotExist
 	}
-	role := &rbacmodel.Role{}
+	role := &crbac.Role{}
 	err = json.Unmarshal(kv.Value, role)
 	if err != nil {
 		log.Error("role info format invalid", err)
@@ -104,14 +102,14 @@ func (rm *RbacDAO) GetRole(ctx context.Context, name string) (*rbacmodel.Role, e
 	}
 	return role, nil
 }
-func (rm *RbacDAO) ListRole(ctx context.Context) ([]*rbacmodel.Role, int64, error) {
+func (rm *RbacDAO) ListRole(ctx context.Context) ([]*crbac.Role, int64, error) {
 	kvs, n, err := etcdadpt.List(ctx, path.GenerateRBACRoleKey(""))
 	if err != nil {
 		return nil, 0, err
 	}
-	roles := make([]*rbacmodel.Role, 0, n)
+	roles := make([]*crbac.Role, 0, n)
 	for _, v := range kvs {
-		r := &rbacmodel.Role{}
+		r := &crbac.Role{}
 		err = json.Unmarshal(v.Value, r)
 		if err != nil {
 			log.Error("role info format invalid:", err)
@@ -132,19 +130,12 @@ func (rm *RbacDAO) DeleteRole(ctx context.Context, name string) (bool, error) {
 		return false, rbac.ErrRoleBindingExist
 	}
 	opts := []etcdadpt.OpOptions{etcdadpt.OpDel(etcdadpt.WithStrKey(path.GenerateRBACRoleKey(name)))}
-	if datasource.EnableSync {
-		taskOpt, err := GenTaskOpts("", "", sync.DeleteAction, datasource.ResourceRole, name)
-		if err != nil {
-			log.Error("", err)
-			return false, err
-		}
-		tombstoneOpt, err := GenTombstoneOpts("", "", datasource.ResourceRole, name)
-		if err != nil {
-			log.Error("", err)
-			return false, err
-		}
-		opts = append(opts, taskOpt, tombstoneOpt)
+	syncOpts, err := esync.GenDeleteOpts(ctx, datasource.ResourceRole, name, name)
+	if err != nil {
+		log.Error("fail to create sync opts", err)
+		return false, err
 	}
+	opts = append(opts, syncOpts...)
 	err = etcdadpt.Txn(ctx, opts)
 	if err != nil {
 		return false, err
@@ -159,7 +150,7 @@ func RoleBindingExists(ctx context.Context, role string) (bool, error) {
 	}
 	return total > 0, nil
 }
-func (rm *RbacDAO) UpdateRole(ctx context.Context, name string, role *rbacmodel.Role) error {
+func (rm *RbacDAO) UpdateRole(ctx context.Context, name string, role *crbac.Role) error {
 	role.UpdateTime = strconv.FormatInt(time.Now().Unix(), 10)
 	value, err := json.Marshal(role)
 	if err != nil {
@@ -169,13 +160,11 @@ func (rm *RbacDAO) UpdateRole(ctx context.Context, name string, role *rbacmodel.
 	opts := []etcdadpt.OpOptions{
 		etcdadpt.OpPut(etcdadpt.WithStrKey(path.GenerateRBACRoleKey(name)), etcdadpt.WithValue(value)),
 	}
-	if datasource.EnableSync {
-		taskOpt, err := GenTaskOpts("", "", sync.UpdateAction, datasource.ResourceRole, role)
-		if err != nil {
-			log.Error("", err)
-			return err
-		}
-		opts = append(opts, taskOpt)
+	syncOpts, err := esync.GenUpdateOpts(ctx, datasource.ResourceRole, role)
+	if err != nil {
+		log.Error("fail to create sync opts", err)
+		return err
 	}
+	opts = append(opts, syncOpts...)
 	return etcdadpt.Txn(ctx, opts)
 }
diff --git a/datasource/etcd/role_test.go b/datasource/etcd/role_test.go
index 0a447c5..a9af11f 100644
--- a/datasource/etcd/role_test.go
+++ b/datasource/etcd/role_test.go
@@ -22,7 +22,7 @@ import (
 	"strconv"
 	"testing"
 
-	rbacmodel "github.com/go-chassis/cari/rbac"
+	crbac "github.com/go-chassis/cari/rbac"
 	"github.com/stretchr/testify/assert"
 
 	"github.com/apache/servicecomb-service-center/datasource"
@@ -39,7 +39,7 @@ func TestSyncRole(t *testing.T) {
 
 	t.Run("create role", func(t *testing.T) {
 		t.Run("creating a role and delete it will create two tasks and a tombstone should pass", func(t *testing.T) {
-			r1 := rbacmodel.Role{
+			r1 := crbac.Role{
 				ID:    "create-11111",
 				Name:  "create-role",
 				Perms: nil,
@@ -78,12 +78,12 @@ func TestSyncRole(t *testing.T) {
 	t.Run("update role", func(t *testing.T) {
 		t.Run("create two roles ,then update them, finally delete them, will create six tasks and two tombstones should pass",
 			func(t *testing.T) {
-				r2 := rbacmodel.Role{
+				r2 := crbac.Role{
 					ID:    "update-22222",
 					Name:  "update-role-22222",
 					Perms: nil,
 				}
-				r3 := rbacmodel.Role{
+				r3 := crbac.Role{
 					ID:    "update-33333",
 					Name:  "update-role-33333",
 					Perms: nil,
diff --git a/datasource/etcd/sync/sync.go b/datasource/etcd/sync/sync.go
new file mode 100644
index 0000000..28f4112
--- /dev/null
+++ b/datasource/etcd/sync/sync.go
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package sync
+
+import (
+	"context"
+	"encoding/json"
+
+	"github.com/go-chassis/cari/sync"
+	"github.com/little-cui/etcdadpt"
+
+	"github.com/apache/servicecomb-service-center/datasource"
+	"github.com/apache/servicecomb-service-center/eventbase/datasource/etcd/key"
+	"github.com/apache/servicecomb-service-center/pkg/util"
+)
+
+type Options struct {
+	ResourceID string
+	Opts       map[string]string
+}
+
+type Option func(options *Options)
+
+func NewSyncOptions() Options {
+	return Options{}
+}
+
+func WithResourceID(resourceID string) Option {
+	return func(options *Options) {
+		options.ResourceID = resourceID
+	}
+}
+
+func WithOpts(opts map[string]string) Option {
+	return func(options *Options) {
+		options.Opts = opts
+	}
+}
+
+func GenCreateOpts(ctx context.Context, resourceType string, resource interface{},
+	options ...Option) ([]etcdadpt.OpOptions, error) {
+	return genOpts(ctx, sync.CreateAction, resourceType, resource, options...)
+}
+
+func GenUpdateOpts(ctx context.Context, resourceType string, resource interface{},
+	options ...Option) ([]etcdadpt.OpOptions, error) {
+	return genOpts(ctx, sync.UpdateAction, resourceType, resource, options...)
+}
+
+func GenDeleteOpts(ctx context.Context, resourceType, resourceID string, resource interface{},
+	options ...Option) ([]etcdadpt.OpOptions, error) {
+	options = append(options, WithResourceID(resourceID))
+	return genOpts(ctx, sync.DeleteAction, resourceType, resource, options...)
+
+}
+
+func genOpts(ctx context.Context, action string, resourceType string, resource interface{},
+	options ...Option) ([]etcdadpt.OpOptions, error) {
+	if !datasource.EnableSync {
+		return nil, nil
+	}
+	syncOpts := NewSyncOptions()
+	for _, option := range options {
+		option(&syncOpts)
+	}
+	taskOpt, err := genTaskOpt(ctx, action, resourceType, resource, &syncOpts)
+	if err != nil {
+		return nil, err
+	}
+	if action != sync.DeleteAction {
+		return []etcdadpt.OpOptions{taskOpt}, nil
+	}
+	tombstoneOpt, err := genTombstoneOpt(ctx, resourceType, syncOpts.ResourceID)
+	if err != nil {
+		return nil, err
+	}
+	return []etcdadpt.OpOptions{taskOpt, tombstoneOpt}, nil
+}
+
+func genTaskOpt(ctx context.Context, action string, resourceType string, resource interface{},
+	syncOpts *Options) (etcdadpt.OpOptions, error) {
+	domain := util.ParseDomain(ctx)
+	project := util.ParseProject(ctx)
+	task, err := sync.NewTask(domain, project, action, resourceType, resource)
+	if err != nil {
+		return etcdadpt.OpOptions{}, err
+	}
+	if syncOpts.Opts != nil {
+		task.Opts = syncOpts.Opts
+	}
+	taskBytes, err := json.Marshal(task)
+	if err != nil {
+		return etcdadpt.OpOptions{}, err
+	}
+	taskOpPut := etcdadpt.OpPut(etcdadpt.WithStrKey(key.TaskKey(domain, project,
+		task.ID, task.Timestamp)), etcdadpt.WithValue(taskBytes))
+	return taskOpPut, nil
+}
+
+func genTombstoneOpt(ctx context.Context, resourceType, resourceID string) (etcdadpt.OpOptions, error) {
+	domain := util.ParseDomain(ctx)
+	project := util.ParseProject(ctx)
+	tombstone := sync.NewTombstone(domain, project, resourceType, resourceID)
+	tombstoneBytes, err := json.Marshal(tombstone)
+	if err != nil {
+		return etcdadpt.OpOptions{}, err
+	}
+	tombstoneOpPut := etcdadpt.OpPut(etcdadpt.WithStrKey(key.TombstoneKey(domain, project, tombstone.ResourceType,
+		tombstone.ResourceID)), etcdadpt.WithValue(tombstoneBytes))
+	return tombstoneOpPut, nil
+}
diff --git a/datasource/etcd/sync/sync_test.go b/datasource/etcd/sync/sync_test.go
new file mode 100644
index 0000000..c723ba4
--- /dev/null
+++ b/datasource/etcd/sync/sync_test.go
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package sync_test
+
+import (
+	"context"
+	"testing"
+
+	pb "github.com/go-chassis/cari/discovery"
+	"github.com/stretchr/testify/assert"
+
+	"github.com/apache/servicecomb-service-center/datasource"
+	"github.com/apache/servicecomb-service-center/datasource/etcd/sync"
+	"github.com/apache/servicecomb-service-center/pkg/util"
+)
+
+func optsContext() context.Context {
+	return util.WithNoCache(util.SetDomainProject(context.Background(), "sync-opts",
+		"sync-opts"))
+}
+
+func TestOpts(t *testing.T) {
+	datasource.EnableSync = true
+
+	t.Run("create func will create a task opt should pass", func(t *testing.T) {
+		opts, err := sync.GenCreateOpts(optsContext(), datasource.ResourceService, &pb.CreateServiceRequest{})
+		assert.Nil(t, err)
+		assert.Equal(t, 1, len(opts))
+	})
+
+	t.Run("update func will create a task opt should pass", func(t *testing.T) {
+		opts, err := sync.GenUpdateOpts(optsContext(), datasource.ResourceService, &pb.UpdateServicePropsRequest{})
+		assert.Nil(t, err)
+		assert.Equal(t, 1, len(opts))
+	})
+
+	t.Run("delete func will create a task and a tombstone should pass", func(t *testing.T) {
+		opts, err := sync.GenDeleteOpts(optsContext(), datasource.ResourceService, "11111", &pb.DeleteServiceRequest{})
+		assert.Nil(t, err)
+		assert.Equal(t, 2, len(opts))
+	})
+	datasource.EnableSync = false
+}
diff --git a/datasource/etcd/tag_test.go b/datasource/etcd/tag_test.go
new file mode 100644
index 0000000..829fbe1
--- /dev/null
+++ b/datasource/etcd/tag_test.go
@@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package etcd_test
+
+import (
+	"context"
+	"testing"
+
+	pb "github.com/go-chassis/cari/discovery"
+	"github.com/go-chassis/cari/sync"
+	"github.com/stretchr/testify/assert"
+
+	"github.com/apache/servicecomb-service-center/datasource"
+	"github.com/apache/servicecomb-service-center/eventbase/model"
+	"github.com/apache/servicecomb-service-center/eventbase/service/task"
+	"github.com/apache/servicecomb-service-center/eventbase/service/tombstone"
+	"github.com/apache/servicecomb-service-center/pkg/util"
+	_ "github.com/apache/servicecomb-service-center/test"
+)
+
+func tagContext() context.Context {
+	return util.WithNoCache(util.SetDomainProject(context.Background(), "sync-tag",
+		"sync-tag"))
+}
+
+func TestSyncTag(t *testing.T) {
+	var serviceID string
+	datasource.EnableSync = true
+	t.Run("create service", func(t *testing.T) {
+		t.Run("register a micro service will create a task should pass", func(t *testing.T) {
+			resp, err := datasource.GetMetadataManager().RegisterService(tagContext(), &pb.CreateServiceRequest{
+				Service: &pb.MicroService{
+					AppId:       "sync_tag_group",
+					ServiceName: "sync_micro_service_tag",
+					Version:     "1.0.0",
+					Level:       "FRONT",
+					Status:      pb.MS_UP,
+				},
+			})
+			assert.NotNil(t, resp)
+			assert.NoError(t, err)
+			assert.Equal(t, pb.ResponseSuccess, resp.Response.GetCode())
+			serviceID = resp.ServiceId
+			listTaskReq := model.ListTaskRequest{
+				Domain:       "sync-tag",
+				Project:      "sync-tag",
+				ResourceType: datasource.ResourceService,
+				Action:       sync.CreateAction,
+				Status:       sync.PendingStatus,
+			}
+			tasks, err := task.List(tagContext(), &listTaskReq)
+			assert.NoError(t, err)
+			assert.Equal(t, 1, len(tasks))
+			err = task.Delete(tagContext(), tasks...)
+			assert.NoError(t, err)
+			tasks, err = task.List(tagContext(), &listTaskReq)
+			assert.NoError(t, err)
+			assert.Equal(t, 0, len(tasks))
+		})
+	})
+
+	t.Run("add tags", func(t *testing.T) {
+		t.Run("add tags for a service will create a task should pass", func(t *testing.T) {
+			respAddTages, err := datasource.GetMetadataManager().AddTags(tagContext(), &pb.AddServiceTagsRequest{
+				ServiceId: serviceID,
+				Tags: map[string]string{
+					"a": "test",
+					"b": "b",
+				},
+			})
+			assert.NoError(t, err)
+			assert.Equal(t, pb.ResponseSuccess, respAddTages.Response.GetCode())
+			listTaskReq := model.ListTaskRequest{
+				Domain:       "sync-tag",
+				Project:      "sync-tag",
+				ResourceType: datasource.ResourceKV,
+				Action:       sync.UpdateAction,
+				Status:       sync.PendingStatus,
+			}
+			tasks, err := task.List(tagContext(), &listTaskReq)
+			assert.NoError(t, err)
+			assert.Equal(t, 1, len(tasks))
+			err = task.Delete(tagContext(), tasks...)
+			assert.NoError(t, err)
+			tasks, err = task.List(tagContext(), &listTaskReq)
+			assert.NoError(t, err)
+			assert.Equal(t, 0, len(tasks))
+		})
+	})
+
+	t.Run("update a tag", func(t *testing.T) {
+		t.Run("update a service tag will create a task should pass", func(t *testing.T) {
+			resp, err := datasource.GetMetadataManager().UpdateTag(tagContext(), &pb.UpdateServiceTagRequest{
+				ServiceId: serviceID,
+				Key:       "a",
+				Value:     "update",
+			})
+			assert.NoError(t, err)
+			assert.Equal(t, pb.ResponseSuccess, resp.Response.GetCode())
+			listTaskReq := model.ListTaskRequest{
+				Domain:       "sync-tag",
+				Project:      "sync-tag",
+				ResourceType: datasource.ResourceKV,
+				Action:       sync.UpdateAction,
+				Status:       sync.PendingStatus,
+			}
+			tasks, err := task.List(tagContext(), &listTaskReq)
+			assert.NoError(t, err)
+			assert.Equal(t, 1, len(tasks))
+			err = task.Delete(tagContext(), tasks...)
+			assert.NoError(t, err)
+			tasks, err = task.List(tagContext(), &listTaskReq)
+			assert.NoError(t, err)
+			assert.Equal(t, 0, len(tasks))
+		})
+	})
+
+	t.Run("delete tags", func(t *testing.T) {
+		t.Run("delete a service's tags will create a task and a tombstone should pass", func(t *testing.T) {
+			resp, err := datasource.GetMetadataManager().DeleteTags(tagContext(), &pb.DeleteServiceTagsRequest{
+				ServiceId: serviceID,
+				Keys:      []string{"a", "b"},
+			})
+			assert.NoError(t, err)
+			assert.Equal(t, pb.ResponseSuccess, resp.Response.GetCode())
+			respGetTags, err := datasource.GetMetadataManager().GetTags(tagContext(), &pb.GetServiceTagsRequest{
+				ServiceId: serviceID,
+			})
+			assert.NoError(t, err)
+			assert.Equal(t, pb.ResponseSuccess, resp.Response.GetCode())
+			assert.Equal(t, "", respGetTags.Tags["a"])
+			assert.Equal(t, "", respGetTags.Tags["b"])
+			listTaskReq := model.ListTaskRequest{
+				Domain:       "sync-tag",
+				Project:      "sync-tag",
+				ResourceType: datasource.ResourceKV,
+				Action:       sync.DeleteAction,
+				Status:       sync.PendingStatus,
+			}
+			tasks, err := task.List(tagContext(), &listTaskReq)
+			assert.NoError(t, err)
+			assert.Equal(t, 1, len(tasks))
+			err = task.Delete(tagContext(), tasks...)
+			assert.NoError(t, err)
+			tasks, err = task.List(tagContext(), &listTaskReq)
+			assert.NoError(t, err)
+			assert.Equal(t, 0, len(tasks))
+			tombstoneListReq := model.ListTombstoneRequest{
+				Domain:       "sync-tag",
+				Project:      "sync-tag",
+				ResourceType: datasource.ResourceKV,
+			}
+			tombstones, err := tombstone.List(tagContext(), &tombstoneListReq)
+			assert.NoError(t, err)
+			assert.Equal(t, 1, len(tombstones))
+			err = tombstone.Delete(tagContext(), tombstones...)
+			assert.NoError(t, err)
+		})
+	})
+
+	t.Run("unregister service", func(t *testing.T) {
+		t.Run("unregister a service will create a task and a tombstone should pass", func(t *testing.T) {
+			resp, err := datasource.GetMetadataManager().UnregisterService(tagContext(), &pb.DeleteServiceRequest{
+				ServiceId: serviceID,
+				Force:     true,
+			})
+			assert.NotNil(t, resp)
+			assert.NoError(t, err)
+			assert.Equal(t, pb.ResponseSuccess, resp.Response.GetCode())
+			listTaskReq := model.ListTaskRequest{
+				Domain:       "sync-tag",
+				Project:      "sync-tag",
+				ResourceType: datasource.ResourceService,
+				Action:       sync.DeleteAction,
+				Status:       sync.PendingStatus,
+			}
+			tasks, err := task.List(tagContext(), &listTaskReq)
+			assert.NoError(t, err)
+			assert.Equal(t, 1, len(tasks))
+			err = task.Delete(tagContext(), tasks...)
+			assert.NoError(t, err)
+			tasks, err = task.List(tagContext(), &listTaskReq)
+			assert.NoError(t, err)
+			assert.Equal(t, 0, len(tasks))
+			tombstoneListReq := model.ListTombstoneRequest{
+				Domain:       "sync-tag",
+				Project:      "sync-tag",
+				ResourceType: datasource.ResourceService,
+			}
+			tombstones, err := tombstone.List(tagContext(), &tombstoneListReq)
+			assert.NoError(t, err)
+			assert.Equal(t, 1, len(tombstones))
+			err = tombstone.Delete(tagContext(), tombstones...)
+			assert.NoError(t, err)
+		})
+	})
+
+	datasource.EnableSync = false
+}
diff --git a/datasource/etcd/task_util.go b/datasource/etcd/task_util.go
deleted file mode 100644
index 232a841..0000000
--- a/datasource/etcd/task_util.go
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package etcd
-
-import (
-	"encoding/json"
-
-	"github.com/go-chassis/cari/sync"
-	"github.com/little-cui/etcdadpt"
-
-	"github.com/apache/servicecomb-service-center/eventbase/datasource/etcd/key"
-)
-
-func GenTaskOpts(domain, project, action, resourceType string, resource interface{}) (etcdadpt.OpOptions, error) {
-	task, err := sync.NewTask(domain, project, action, resourceType, resource)
-	if err != nil {
-		return etcdadpt.OpOptions{}, err
-	}
-	taskBytes, err := json.Marshal(task)
-	if err != nil {
-		return etcdadpt.OpOptions{}, err
-	}
-	taskOpPut := etcdadpt.OpPut(etcdadpt.WithStrKey(key.TaskKey(domain, project,
-		task.ID, task.Timestamp)), etcdadpt.WithValue(taskBytes))
-	return taskOpPut, nil
-}
diff --git a/datasource/etcd/tombstone_util.go b/datasource/etcd/tombstone_util.go
deleted file mode 100644
index 6a968c0..0000000
--- a/datasource/etcd/tombstone_util.go
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package etcd
-
-import (
-	"encoding/json"
-
-	"github.com/go-chassis/cari/sync"
-	"github.com/little-cui/etcdadpt"
-
-	"github.com/apache/servicecomb-service-center/eventbase/datasource/etcd/key"
-)
-
-func GenTombstoneOpts(domain, project, resourceType, resourceID string) (etcdadpt.OpOptions, error) {
-	tombstone := sync.NewTombstone(domain, project, resourceType, resourceID)
-	tombstoneBytes, err := json.Marshal(tombstone)
-	if err != nil {
-		return etcdadpt.OpOptions{}, err
-	}
-	tombstoneOpPut := etcdadpt.OpPut(etcdadpt.WithStrKey(key.TombstoneKey(domain, project, tombstone.ResourceType,
-		tombstone.ResourceID)), etcdadpt.WithValue(tombstoneBytes))
-	return tombstoneOpPut, nil
-}
diff --git a/datasource/etcd/util/tag_util.go b/datasource/etcd/util/tag_util.go
index a65fcc7..1b33f80 100644
--- a/datasource/etcd/util/tag_util.go
+++ b/datasource/etcd/util/tag_util.go
@@ -22,12 +22,15 @@ import (
 	"encoding/json"
 	"fmt"
 
-	"github.com/apache/servicecomb-service-center/datasource/etcd/path"
-	"github.com/apache/servicecomb-service-center/datasource/etcd/sd"
-	"github.com/apache/servicecomb-service-center/pkg/log"
 	"github.com/go-chassis/cari/discovery"
 	"github.com/go-chassis/cari/pkg/errsvc"
 	"github.com/little-cui/etcdadpt"
+
+	"github.com/apache/servicecomb-service-center/datasource"
+	"github.com/apache/servicecomb-service-center/datasource/etcd/path"
+	"github.com/apache/servicecomb-service-center/datasource/etcd/sd"
+	esync "github.com/apache/servicecomb-service-center/datasource/etcd/sync"
+	"github.com/apache/servicecomb-service-center/pkg/log"
 )
 
 func AddTagIntoETCD(ctx context.Context, domainProject string, serviceID string, dataTags map[string]string) *errsvc.Error {
@@ -37,10 +40,14 @@ func AddTagIntoETCD(ctx context.Context, domainProject string, serviceID string,
 		return discovery.NewError(discovery.ErrInternal, err.Error())
 	}
 
-	resp, err := etcdadpt.TxnWithCmp(ctx,
-		etcdadpt.Ops(etcdadpt.OpPut(etcdadpt.WithStrKey(key), etcdadpt.WithValue(data))),
-		etcdadpt.If(etcdadpt.NotEqualVer(path.GenerateServiceKey(domainProject, serviceID), 0)),
-		nil)
+	opts := etcdadpt.Ops(etcdadpt.OpPut(etcdadpt.WithStrKey(key), etcdadpt.WithValue(data)))
+	syncOpts, err := esync.GenUpdateOpts(ctx, datasource.ResourceKV, data, esync.WithOpts(map[string]string{"key": key}))
+	if err != nil {
+		return discovery.NewError(discovery.ErrInternal, err.Error())
+	}
+	opts = append(opts, syncOpts...)
+	resp, err := etcdadpt.TxnWithCmp(ctx, opts,
+		etcdadpt.If(etcdadpt.NotEqualVer(path.GenerateServiceKey(domainProject, serviceID), 0)), nil)
 	if err != nil {
 		return discovery.NewError(discovery.ErrUnavailableBackend, err.Error())
 	}