You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicecomb.apache.org by ti...@apache.org on 2022/01/07 04:48:01 UTC
[servicecomb-service-center] branch master updated: [feat] add tag sync func and ut when db mode is etcd (#1204)
This is an automated email from the ASF dual-hosted git repository.
tianxiaoliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/servicecomb-service-center.git
The following commit(s) were added to refs/heads/master by this push:
new 738f155 [feat] add tag sync func and ut when db mode is etcd (#1204)
738f155 is described below
commit 738f155d006d5e47e4e100257e7b3a8600a3ce5f
Author: robotljw <79...@qq.com>
AuthorDate: Fri Jan 7 12:47:54 2022 +0800
[feat] add tag sync func and ut when db mode is etcd (#1204)
---
datasource/common.go | 1 +
datasource/etcd/account.go | 65 +++++-------
datasource/etcd/account_test.go | 8 +-
datasource/etcd/dep.go | 38 +++----
datasource/etcd/ms.go | 194 ++++++++++++++++------------------
datasource/etcd/role.go | 59 +++++------
datasource/etcd/role_test.go | 8 +-
datasource/etcd/sync/sync.go | 126 ++++++++++++++++++++++
datasource/etcd/sync/sync_test.go | 58 +++++++++++
datasource/etcd/tag_test.go | 214 ++++++++++++++++++++++++++++++++++++++
datasource/etcd/task_util.go | 41 --------
datasource/etcd/tombstone_util.go | 38 -------
datasource/etcd/util/tag_util.go | 21 ++--
13 files changed, 581 insertions(+), 290 deletions(-)
diff --git a/datasource/common.go b/datasource/common.go
index a3b9453..5149fb2 100644
--- a/datasource/common.go
+++ b/datasource/common.go
@@ -36,6 +36,7 @@ const (
ResourceRole = "role"
ResourceDependency = "dependency"
ResourceService = "service"
+ ResourceKV = "kv"
)
// WrapErrResponse is temp func here to wait finish to refact the discosvc pkg
diff --git a/datasource/etcd/account.go b/datasource/etcd/account.go
index d94352e..c0aded7 100644
--- a/datasource/etcd/account.go
+++ b/datasource/etcd/account.go
@@ -22,13 +22,13 @@ import (
"strconv"
"time"
- rbacmodel "github.com/go-chassis/cari/rbac"
- "github.com/go-chassis/cari/sync"
+ crbac "github.com/go-chassis/cari/rbac"
"github.com/go-chassis/foundation/stringutil"
"github.com/little-cui/etcdadpt"
"github.com/apache/servicecomb-service-center/datasource"
"github.com/apache/servicecomb-service-center/datasource/etcd/path"
+ esync "github.com/apache/servicecomb-service-center/datasource/etcd/sync"
"github.com/apache/servicecomb-service-center/datasource/rbac"
"github.com/apache/servicecomb-service-center/pkg/etcdsync"
"github.com/apache/servicecomb-service-center/pkg/log"
@@ -49,7 +49,7 @@ func NewRbacDAO(opts rbac.Options) (rbac.DAO, error) {
type RbacDAO struct {
}
-func (ds *RbacDAO) CreateAccount(ctx context.Context, a *rbacmodel.Account) error {
+func (ds *RbacDAO) CreateAccount(ctx context.Context, a *crbac.Account) error {
lock, err := etcdsync.Lock("/account-creating/"+a.Name, -1, false)
if err != nil {
return fmt.Errorf("account %s is creating", a.Name)
@@ -83,14 +83,12 @@ func (ds *RbacDAO) CreateAccount(ctx context.Context, a *rbacmodel.Account) erro
log.Error("", err)
return err
}
- if datasource.EnableSync {
- op, err := GenTaskOpts("", "", sync.CreateAction, datasource.ResourceAccount, a)
- if err != nil {
- log.Error("", err)
- return err
- }
- opts = append(opts, op)
+ syncOpts, err := esync.GenCreateOpts(ctx, datasource.ResourceAccount, a)
+ if err != nil {
+ log.Error("fail to create sync opts", err)
+ return err
}
+ opts = append(opts, syncOpts...)
err = etcdadpt.Txn(ctx, opts)
if err != nil {
log.Error("can not save account info", err)
@@ -100,7 +98,7 @@ func (ds *RbacDAO) CreateAccount(ctx context.Context, a *rbacmodel.Account) erro
return nil
}
-func GenAccountOpts(a *rbacmodel.Account, action etcdadpt.Action) ([]etcdadpt.OpOptions, error) {
+func GenAccountOpts(a *crbac.Account, action etcdadpt.Action) ([]etcdadpt.OpOptions, error) {
opts := make([]etcdadpt.OpOptions, 0)
value, err := json.Marshal(a)
if err != nil {
@@ -126,7 +124,7 @@ func GenAccountOpts(a *rbacmodel.Account, action etcdadpt.Action) ([]etcdadpt.Op
func (ds *RbacDAO) AccountExist(ctx context.Context, name string) (bool, error) {
return etcdadpt.Exist(ctx, path.GenerateRBACAccountKey(name))
}
-func (ds *RbacDAO) GetAccount(ctx context.Context, name string) (*rbacmodel.Account, error) {
+func (ds *RbacDAO) GetAccount(ctx context.Context, name string) (*crbac.Account, error) {
kv, err := etcdadpt.Get(ctx, path.GenerateRBACAccountKey(name))
if err != nil {
return nil, err
@@ -134,7 +132,7 @@ func (ds *RbacDAO) GetAccount(ctx context.Context, name string) (*rbacmodel.Acco
if kv == nil {
return nil, rbac.ErrAccountNotExist
}
- account := &rbacmodel.Account{}
+ account := &crbac.Account{}
err = json.Unmarshal(kv.Value, account)
if err != nil {
log.Error("account info format invalid", err)
@@ -144,7 +142,7 @@ func (ds *RbacDAO) GetAccount(ctx context.Context, name string) (*rbacmodel.Acco
return account, nil
}
-func (ds *RbacDAO) compatibleOldVersionAccount(a *rbacmodel.Account) {
+func (ds *RbacDAO) compatibleOldVersionAccount(a *crbac.Account) {
// old version use Role, now use Roles
// Role/Roles will not exist at the same time
if len(a.Role) == 0 {
@@ -154,14 +152,14 @@ func (ds *RbacDAO) compatibleOldVersionAccount(a *rbacmodel.Account) {
a.Role = ""
}
-func (ds *RbacDAO) ListAccount(ctx context.Context) ([]*rbacmodel.Account, int64, error) {
+func (ds *RbacDAO) ListAccount(ctx context.Context) ([]*crbac.Account, int64, error) {
kvs, n, err := etcdadpt.List(ctx, path.GenerateRBACAccountKey(""))
if err != nil {
return nil, 0, err
}
- accounts := make([]*rbacmodel.Account, 0, n)
+ accounts := make([]*crbac.Account, 0, n)
for _, v := range kvs {
- a := &rbacmodel.Account{}
+ a := &crbac.Account{}
err = json.Unmarshal(v.Value, a)
if err != nil {
log.Error("account info format invalid:", err)
@@ -194,20 +192,13 @@ func (ds *RbacDAO) DeleteAccount(ctx context.Context, names []string) (bool, err
continue //do not fail if some account is invalid
}
- if datasource.EnableSync {
- taskOpt, err := GenTaskOpts("", "", sync.DeleteAction, datasource.ResourceAccount, a)
- if err != nil {
- log.Error("", err)
- return false, err
- }
- tombstoneOpt, err := GenTombstoneOpts("", "", datasource.ResourceAccount, a.Name)
- if err != nil {
- log.Error("", err)
- return false, err
- }
- opts = append(opts, tombstoneOpt, taskOpt)
+ syncOpts, err := esync.GenDeleteOpts(ctx, datasource.ResourceAccount, a.Name, a)
+ if err != nil {
+ log.Error("fail to create sync opts", err)
+ return false, err
}
allOpts = append(allOpts, opts...)
+ allOpts = append(allOpts, syncOpts...)
}
err := etcdadpt.Txn(ctx, allOpts)
if err != nil {
@@ -217,7 +208,7 @@ func (ds *RbacDAO) DeleteAccount(ctx context.Context, names []string) (bool, err
return true, nil
}
-func (ds *RbacDAO) UpdateAccount(ctx context.Context, name string, account *rbacmodel.Account) error {
+func (ds *RbacDAO) UpdateAccount(ctx context.Context, name string, account *crbac.Account) error {
var (
opts []etcdadpt.OpOptions
err error
@@ -247,14 +238,12 @@ func (ds *RbacDAO) UpdateAccount(ctx context.Context, name string, account *rbac
}
opts = append(opts, opt)
}
- if datasource.EnableSync {
- op, err := GenTaskOpts("", "", sync.UpdateAction, datasource.ResourceAccount, account)
- if err != nil {
- log.Error("", err)
- return err
- }
- opts = append(opts, op)
+ syncOpts, err := esync.GenUpdateOpts(ctx, datasource.ResourceAccount, account)
+ if err != nil {
+ log.Error("fail to create sync opts", err)
+ return err
}
+ opts = append(opts, syncOpts...)
err = etcdadpt.Txn(ctx, opts)
if err != nil {
log.Error("BatchCommit failed", err)
@@ -262,7 +251,7 @@ func (ds *RbacDAO) UpdateAccount(ctx context.Context, name string, account *rbac
return err
}
-func hasRole(account *rbacmodel.Account, r string) bool {
+func hasRole(account *crbac.Account, r string) bool {
for _, n := range account.Roles {
if r == n {
return true
diff --git a/datasource/etcd/account_test.go b/datasource/etcd/account_test.go
index 47a3771..8284592 100644
--- a/datasource/etcd/account_test.go
+++ b/datasource/etcd/account_test.go
@@ -21,7 +21,7 @@ import (
"context"
"testing"
- rbacmodel "github.com/go-chassis/cari/rbac"
+ crbac "github.com/go-chassis/cari/rbac"
"github.com/stretchr/testify/assert"
"github.com/apache/servicecomb-service-center/datasource"
@@ -39,7 +39,7 @@ func TestSyncAccount(t *testing.T) {
t.Run("create account", func(t *testing.T) {
t.Run("creating a account then delete it will create two tasks and a tombstone should pass",
func(t *testing.T) {
- a1 := rbacmodel.Account{
+ a1 := crbac.Account{
ID: "sync-create-11111",
Name: "sync-create-account1",
Password: "tnuocca-tset",
@@ -78,7 +78,7 @@ func TestSyncAccount(t *testing.T) {
t.Run("update account", func(t *testing.T) {
t.Run("creating two accounts then update them,finally delete them, will create six tasks and two tombstones should pass",
func(t *testing.T) {
- a2 := rbacmodel.Account{
+ a2 := crbac.Account{
ID: "sync-update-22222",
Name: "sync-update-account2",
Password: "tnuocca-tset",
@@ -86,7 +86,7 @@ func TestSyncAccount(t *testing.T) {
TokenExpirationTime: "2020-12-30",
CurrentPassword: "tnuocca-tset",
}
- a3 := rbacmodel.Account{
+ a3 := crbac.Account{
ID: "sync-update-33333",
Name: "sync-update-account3",
Password: "tnuocca-tset",
diff --git a/datasource/etcd/dep.go b/datasource/etcd/dep.go
index 2a719a8..bf0eb74 100644
--- a/datasource/etcd/dep.go
+++ b/datasource/etcd/dep.go
@@ -24,14 +24,14 @@ import (
"fmt"
pb "github.com/go-chassis/cari/discovery"
- "github.com/go-chassis/cari/sync"
"github.com/little-cui/etcdadpt"
"github.com/apache/servicecomb-service-center/datasource"
"github.com/apache/servicecomb-service-center/datasource/etcd/event"
"github.com/apache/servicecomb-service-center/datasource/etcd/path"
"github.com/apache/servicecomb-service-center/datasource/etcd/sd"
- serviceUtil "github.com/apache/servicecomb-service-center/datasource/etcd/util"
+ esync "github.com/apache/servicecomb-service-center/datasource/etcd/sync"
+ eutil "github.com/apache/servicecomb-service-center/datasource/etcd/util"
"github.com/apache/servicecomb-service-center/pkg/log"
"github.com/apache/servicecomb-service-center/pkg/util"
)
@@ -42,7 +42,7 @@ type DepManager struct {
func (dm *DepManager) SearchProviderDependency(ctx context.Context, request *pb.GetDependenciesRequest) (*pb.GetProDependenciesResponse, error) {
domainProject := util.ParseDomainProject(ctx)
providerServiceID := request.ServiceId
- provider, err := serviceUtil.GetService(ctx, domainProject, providerServiceID)
+ provider, err := eutil.GetService(ctx, domainProject, providerServiceID)
if err != nil {
if errors.Is(err, datasource.ErrNoData) {
@@ -55,7 +55,7 @@ func (dm *DepManager) SearchProviderDependency(ctx context.Context, request *pb.
return nil, err
}
- services, err := serviceUtil.GetConsumers(ctx, domainProject, provider, toDependencyFilterOptions(request)...)
+ services, err := eutil.GetConsumers(ctx, domainProject, provider, toDependencyFilterOptions(request)...)
if err != nil {
log.Error(fmt.Sprintf("query provider failed, provider is %s/%s/%s/%s",
provider.Environment, provider.AppId, provider.ServiceName, provider.Version), err)
@@ -73,7 +73,7 @@ func (dm *DepManager) SearchProviderDependency(ctx context.Context, request *pb.
func (dm *DepManager) SearchConsumerDependency(ctx context.Context, request *pb.GetDependenciesRequest) (*pb.GetConDependenciesResponse, error) {
consumerID := request.ServiceId
domainProject := util.ParseDomainProject(ctx)
- consumer, err := serviceUtil.GetService(ctx, domainProject, consumerID)
+ consumer, err := eutil.GetService(ctx, domainProject, consumerID)
if err != nil {
if errors.Is(err, datasource.ErrNoData) {
@@ -86,7 +86,7 @@ func (dm *DepManager) SearchConsumerDependency(ctx context.Context, request *pb.
return nil, err
}
- services, err := serviceUtil.GetProviders(ctx, domainProject, consumer, toDependencyFilterOptions(request)...)
+ services, err := eutil.GetProviders(ctx, domainProject, consumer, toDependencyFilterOptions(request)...)
if err != nil {
log.Error(fmt.Sprintf("query consumer failed, consumer is %s/%s/%s/%s",
consumer.Environment, consumer.AppId, consumer.ServiceName, consumer.Version), err)
@@ -136,7 +136,7 @@ func (dm *DepManager) AddOrUpdateDependencies(ctx context.Context, dependencyInf
return rsp.Response, nil
}
- consumerID, err := serviceUtil.GetServiceID(ctx, consumerInfo)
+ consumerID, err := eutil.GetServiceID(ctx, consumerInfo)
if err != nil {
log.Error(fmt.Sprintf("put request into dependency queue failed, override: %t, get consumer[%s] id failed",
override, consumerFlag), err)
@@ -164,21 +164,17 @@ func (dm *DepManager) AddOrUpdateDependencies(ctx context.Context, dependencyInf
opts = append(opts, etcdadpt.OpPut(etcdadpt.WithStrKey(key), etcdadpt.WithValue(data)))
}
- if datasource.EnableSync {
- action := sync.UpdateAction
- if override {
- action = sync.CreateAction
- }
- domain := util.ParseDomain(ctx)
- project := util.ParseProject(ctx)
- taskOpt, err := GenTaskOpts(domain, project, action, datasource.ResourceDependency, dependencyInfos)
- if err != nil {
- log.Error("fail to create task", err)
- return pb.CreateResponse(pb.ErrInternal, err.Error()), err
- }
- opts = append(opts, taskOpt)
+ syncOpts, err := esync.GenCreateOpts(ctx, datasource.ResourceDependency, dependencyInfos)
+ if !override {
+ syncOpts, err = esync.GenUpdateOpts(ctx, datasource.ResourceDependency, dependencyInfos)
}
- err := etcdadpt.Txn(ctx, opts)
+ if err != nil {
+ log.Error("fail to create sync opts", err)
+ return pb.CreateResponse(pb.ErrInternal, err.Error()), err
+ }
+ opts = append(opts, syncOpts...)
+
+ err = etcdadpt.Txn(ctx, opts)
if err != nil {
log.Error(fmt.Sprintf("put request into dependency queue failed, override: %t, %v",
override, dependencyInfos), err)
diff --git a/datasource/etcd/ms.go b/datasource/etcd/ms.go
index 2747834..ce5ecf4 100644
--- a/datasource/etcd/ms.go
+++ b/datasource/etcd/ms.go
@@ -25,25 +25,25 @@ import (
"strconv"
"time"
- "github.com/go-chassis/cari/sync"
+ pb "github.com/go-chassis/cari/discovery"
+ "github.com/go-chassis/cari/pkg/errsvc"
+ "github.com/go-chassis/foundation/gopool"
+ "github.com/jinzhu/copier"
+ "github.com/little-cui/etcdadpt"
"github.com/apache/servicecomb-service-center/datasource"
"github.com/apache/servicecomb-service-center/datasource/etcd/cache"
"github.com/apache/servicecomb-service-center/datasource/etcd/path"
"github.com/apache/servicecomb-service-center/datasource/etcd/sd"
"github.com/apache/servicecomb-service-center/datasource/etcd/state/kvstore"
- serviceUtil "github.com/apache/servicecomb-service-center/datasource/etcd/util"
+ esync "github.com/apache/servicecomb-service-center/datasource/etcd/sync"
+ eutil "github.com/apache/servicecomb-service-center/datasource/etcd/util"
"github.com/apache/servicecomb-service-center/datasource/schema"
"github.com/apache/servicecomb-service-center/pkg/log"
"github.com/apache/servicecomb-service-center/pkg/util"
"github.com/apache/servicecomb-service-center/server/core"
"github.com/apache/servicecomb-service-center/server/plugin/uuid"
quotasvc "github.com/apache/servicecomb-service-center/server/service/quota"
- pb "github.com/go-chassis/cari/discovery"
- "github.com/go-chassis/cari/pkg/errsvc"
- "github.com/go-chassis/foundation/gopool"
- "github.com/jinzhu/copier"
- "github.com/little-cui/etcdadpt"
)
var (
@@ -127,18 +127,14 @@ func (ds *MetadataManager) RegisterService(ctx context.Context, request *pb.Crea
failOpts = append(failOpts, etcdadpt.OpGet(etcdadpt.WithStrKey(alias)))
}
- if datasource.EnableSync {
- domain := util.ParseDomain(ctx)
- project := util.ParseProject(ctx)
- taskOpt, err := GenTaskOpts(domain, project, sync.CreateAction, datasource.ResourceService, request)
- if err != nil {
- log.Error("fail to create task", err)
- return &pb.CreateServiceResponse{
- Response: pb.CreateResponse(pb.ErrInternal, err.Error()),
- }, err
- }
- opts = append(opts, taskOpt)
+ syncOpts, err := esync.GenCreateOpts(ctx, datasource.ResourceService, request)
+ if err != nil {
+ log.Error("fail to create sync opts", err)
+ return &pb.CreateServiceResponse{
+ Response: pb.CreateResponse(pb.ErrInternal, err.Error()),
+ }, err
}
+ opts = append(opts, syncOpts...)
resp, err := etcdadpt.TxnWithCmp(ctx, opts, uniqueCmpOpts, failOpts)
if err != nil {
@@ -192,7 +188,7 @@ func (ds *MetadataManager) RegisterService(ctx context.Context, request *pb.Crea
func (ds *MetadataManager) GetServices(ctx context.Context, request *pb.GetServicesRequest) (
*pb.GetServicesResponse, error) {
- services, err := serviceUtil.GetAllServiceUtil(ctx)
+ services, err := eutil.GetAllServiceUtil(ctx)
if err != nil {
log.Error("get all services by domain failed", err)
return &pb.GetServicesResponse{
@@ -209,7 +205,7 @@ func (ds *MetadataManager) GetServices(ctx context.Context, request *pb.GetServi
func (ds *MetadataManager) GetService(ctx context.Context, request *pb.GetServiceRequest) (
*pb.MicroService, error) {
domainProject := util.ParseDomainProject(ctx)
- singleService, err := serviceUtil.GetService(ctx, domainProject, request.ServiceId)
+ singleService, err := eutil.GetService(ctx, domainProject, request.ServiceId)
if err != nil {
if errors.Is(err, datasource.ErrNoData) {
@@ -226,7 +222,7 @@ func (ds *MetadataManager) GetServiceDetail(ctx context.Context, request *pb.Get
*pb.ServiceDetail, error) {
domainProject := util.ParseDomainProject(ctx)
- service, err := serviceUtil.GetService(ctx, domainProject, request.ServiceId)
+ service, err := eutil.GetService(ctx, domainProject, request.ServiceId)
if err != nil {
if errors.Is(err, datasource.ErrNoData) {
return nil, pb.NewError(pb.ErrServiceNotExists, "Service does not exist.")
@@ -297,7 +293,7 @@ func (ds *MetadataManager) ListServiceDetail(ctx context.Context, request *pb.Ge
}
//获取所有服务
- services, err := serviceUtil.GetAllServiceUtil(ctx)
+ services, err := eutil.GetAllServiceUtil(ctx)
if err != nil {
log.Error("get all services by domain failed", err)
return nil, pb.NewError(pb.ErrInternal, err.Error())
@@ -370,7 +366,7 @@ func (ds *MetadataManager) ListApp(ctx context.Context, request *pb.GetAppsReque
domainProject := util.ParseDomainProject(ctx)
key := path.GetServiceAppKey(domainProject, request.Environment, "")
- opts := append(serviceUtil.FromContext(ctx),
+ opts := append(eutil.FromContext(ctx),
etcdadpt.WithStrKey(key),
etcdadpt.WithPrefix(),
etcdadpt.WithKeyOnly())
@@ -410,7 +406,7 @@ func (ds *MetadataManager) ExistServiceByID(ctx context.Context, request *pb.Get
domainProject := util.ParseDomainProject(ctx)
return &pb.GetExistenceByIDResponse{
Response: pb.CreateResponse(pb.ResponseSuccess, "Get all applications successfully."),
- Exist: serviceUtil.ServiceExist(ctx, domainProject, request.ServiceId),
+ Exist: eutil.ServiceExist(ctx, domainProject, request.ServiceId),
}, nil
}
@@ -420,7 +416,7 @@ func (ds *MetadataManager) ExistService(ctx context.Context, request *pb.GetExis
serviceFlag := util.StringJoin([]string{
request.Environment, request.AppId, request.ServiceName, request.Version}, path.SPLIT)
- ids, exist, err := serviceUtil.FindServiceIds(ctx, &pb.MicroServiceKey{
+ ids, exist, err := eutil.FindServiceIds(ctx, &pb.MicroServiceKey{
Environment: request.Environment,
AppId: request.AppId,
ServiceName: request.ServiceName,
@@ -458,7 +454,7 @@ func (ds *MetadataManager) UpdateService(ctx context.Context, request *pb.Update
domainProject := util.ParseDomainProject(ctx)
key := path.GenerateServiceKey(domainProject, request.ServiceId)
- microservice, err := serviceUtil.GetService(ctx, domainProject, request.ServiceId)
+ microservice, err := eutil.GetService(ctx, domainProject, request.ServiceId)
if err != nil {
if errors.Is(err, datasource.ErrNoData) {
log.Debug(fmt.Sprintf("service does not exist, update service[%s] properties failed, operator: %s",
@@ -490,18 +486,14 @@ func (ds *MetadataManager) UpdateService(ctx context.Context, request *pb.Update
opts := []etcdadpt.OpOptions{
etcdadpt.OpPut(etcdadpt.WithStrKey(key), etcdadpt.WithValue(data)),
}
- if datasource.EnableSync {
- domain := util.ParseDomain(ctx)
- project := util.ParseProject(ctx)
- taskOpt, err := GenTaskOpts(domain, project, sync.UpdateAction, datasource.ResourceService, request)
- if err != nil {
- log.Error("fail to create task", err)
- return &pb.UpdateServicePropsResponse{
- Response: pb.CreateResponse(pb.ErrInternal, err.Error()),
- }, err
- }
- opts = append(opts, taskOpt)
+ syncOpts, err := esync.GenUpdateOpts(ctx, datasource.ResourceService, request)
+ if err != nil {
+ log.Error("fail to create task", err)
+ return &pb.UpdateServicePropsResponse{
+ Response: pb.CreateResponse(pb.ErrInternal, err.Error()),
+ }, err
}
+ opts = append(opts, syncOpts...)
// Set key file
resp, err := etcdadpt.TxnWithCmp(ctx, opts, etcdadpt.If(etcdadpt.NotEqualVer(key, 0)), nil)
@@ -660,7 +652,7 @@ func (ds *MetadataManager) sendHeartbeatInstead(ctx context.Context, instance *p
func (ds *MetadataManager) ExistInstanceByID(ctx context.Context, request *pb.MicroServiceInstanceKey) (*pb.GetExistenceByIDResponse, error) {
domainProject := util.ParseDomainProject(ctx)
- exist, _ := serviceUtil.InstanceExist(ctx, domainProject, request.ServiceId, request.InstanceId)
+ exist, _ := eutil.InstanceExist(ctx, domainProject, request.ServiceId, request.InstanceId)
if !exist {
return &pb.GetExistenceByIDResponse{
Response: pb.CreateResponse(pb.ErrInstanceNotExists, "Check instance exist failed."),
@@ -680,7 +672,7 @@ func (ds *MetadataManager) GetInstance(ctx context.Context, request *pb.GetOneIn
service := &pb.MicroService{}
var err error
if len(request.ConsumerServiceId) > 0 {
- service, err = serviceUtil.GetService(ctx, domainProject, request.ConsumerServiceId)
+ service, err = eutil.GetService(ctx, domainProject, request.ConsumerServiceId)
if err != nil {
if errors.Is(err, datasource.ErrNoData) {
log.Debug(fmt.Sprintf("consumer does not exist in db, consumer[%s] find provider instance[%s/%s]",
@@ -698,7 +690,7 @@ func (ds *MetadataManager) GetInstance(ctx context.Context, request *pb.GetOneIn
}
}
- provider, err := serviceUtil.GetService(ctx, domainProject, request.ProviderServiceId)
+ provider, err := eutil.GetService(ctx, domainProject, request.ProviderServiceId)
if err != nil {
if errors.Is(err, datasource.ErrNoData) {
log.Debug(fmt.Sprintf("provider does not exist in db, consumer[%s] find provider instance[%s/%s]",
@@ -761,7 +753,7 @@ func (ds *MetadataManager) GetInstances(ctx context.Context, request *pb.GetInst
service := &pb.MicroService{}
var err error
if len(request.ConsumerServiceId) > 0 {
- service, err = serviceUtil.GetService(ctx, domainProject, request.ConsumerServiceId)
+ service, err = eutil.GetService(ctx, domainProject, request.ConsumerServiceId)
if err != nil {
if errors.Is(err, datasource.ErrNoData) {
log.Debug(fmt.Sprintf("consumer does not exist in db, consumer[%s] find provider[%s] instances",
@@ -779,7 +771,7 @@ func (ds *MetadataManager) GetInstances(ctx context.Context, request *pb.GetInst
}
}
- provider, err := serviceUtil.GetService(ctx, domainProject, request.ProviderServiceId)
+ provider, err := eutil.GetService(ctx, domainProject, request.ProviderServiceId)
if err != nil {
if errors.Is(err, datasource.ErrNoData) {
log.Debug(fmt.Sprintf("provider does not exist, consumer[%s] find provider[%s] instances",
@@ -844,7 +836,7 @@ func (ds *MetadataManager) GetProviderInstances(ctx context.Context, request *pb
if err != nil {
return
}
- return instances, serviceUtil.FormatRevision(maxRevs, counts), nil
+ return instances, eutil.FormatRevision(maxRevs, counts), nil
}
func (ds *MetadataManager) BatchGetProviderInstances(ctx context.Context, request *pb.BatchGetInstancesRequest) (instances []*pb.MicroServiceInstance, rev string, err error) {
@@ -865,12 +857,12 @@ func (ds *MetadataManager) BatchGetProviderInstances(ctx context.Context, reques
instances = append(instances, insts...)
}
- return instances, serviceUtil.FormatRevision(maxRevs, counts), nil
+ return instances, eutil.FormatRevision(maxRevs, counts), nil
}
func (ds *MetadataManager) findInstances(ctx context.Context, domainProject, serviceID string, maxRevs []int64, counts []int64) (instances []*pb.MicroServiceInstance, err error) {
key := path.GenerateInstanceKey(domainProject, serviceID, "")
- opts := append(serviceUtil.FromContext(ctx), etcdadpt.WithStrKey(key), etcdadpt.WithPrefix())
+ opts := append(eutil.FromContext(ctx), etcdadpt.WithStrKey(key), etcdadpt.WithPrefix())
resp, err := sd.Instance().Search(ctx, opts...)
if err != nil {
return nil, err
@@ -922,7 +914,7 @@ func (ds *MetadataManager) findInstance(ctx context.Context, request *pb.FindIns
domainProject := util.ParseDomainProject(ctx)
service := &pb.MicroService{Environment: request.Environment}
if len(request.ConsumerServiceId) > 0 {
- service, err = serviceUtil.GetService(ctx, domainProject, request.ConsumerServiceId)
+ service, err = eutil.GetService(ctx, domainProject, request.ConsumerServiceId)
if err != nil {
if errors.Is(err, datasource.ErrNoData) {
log.Debug(fmt.Sprintf("consumer does not exist, consumer[%s] find provider[%s/%s/%s]",
@@ -976,7 +968,7 @@ func (ds *MetadataManager) findInstance(ctx context.Context, request *pb.FindIns
return nil, err
}
if provider != nil {
- err = serviceUtil.AddServiceVersionRule(ctx, domainProject, service, provider)
+ err = eutil.AddServiceVersionRule(ctx, domainProject, service, provider)
} else {
err := fmt.Errorf("%s failed, provider does not exist", findFlag)
log.Error("AddServiceVersionRule failed", err)
@@ -1040,7 +1032,7 @@ func (ds *MetadataManager) genFindResult(ctx context.Context, oldRev string, ite
func (ds *MetadataManager) reshapeProviderKey(ctx context.Context, provider *pb.MicroServiceKey, providerID string) (
*pb.MicroServiceKey, error) {
// service name 可能是别名,所以重新获取
- providerService, err := serviceUtil.GetService(ctx, provider.Tenant, providerID)
+ providerService, err := eutil.GetService(ctx, provider.Tenant, providerID)
if err != nil {
return nil, err
}
@@ -1054,7 +1046,7 @@ func (ds *MetadataManager) UpdateInstanceStatus(ctx context.Context, request *pb
domainProject := util.ParseDomainProject(ctx)
updateStatusFlag := util.StringJoin([]string{request.ServiceId, request.InstanceId, request.Status}, path.SPLIT)
- instance, err := serviceUtil.GetInstance(ctx, domainProject, request.ServiceId, request.InstanceId)
+ instance, err := eutil.GetInstance(ctx, domainProject, request.ServiceId, request.InstanceId)
if err != nil {
log.Error(fmt.Sprintf("update instance[%s] status failed", updateStatusFlag), err)
return &pb.UpdateInstanceStatusResponse{
@@ -1071,7 +1063,7 @@ func (ds *MetadataManager) UpdateInstanceStatus(ctx context.Context, request *pb
copyInstanceRef := *instance
copyInstanceRef.Status = request.Status
- if err := serviceUtil.UpdateInstance(ctx, domainProject, &copyInstanceRef); err != nil {
+ if err := eutil.UpdateInstance(ctx, domainProject, &copyInstanceRef); err != nil {
log.Error(fmt.Sprintf("update instance[%s] status failed", updateStatusFlag), err)
resp := &pb.UpdateInstanceStatusResponse{
Response: pb.CreateResponseWithSCErr(err),
@@ -1093,7 +1085,7 @@ func (ds *MetadataManager) UpdateInstanceProperties(ctx context.Context, request
domainProject := util.ParseDomainProject(ctx)
instanceFlag := util.StringJoin([]string{request.ServiceId, request.InstanceId}, path.SPLIT)
- instance, err := serviceUtil.GetInstance(ctx, domainProject, request.ServiceId, request.InstanceId)
+ instance, err := eutil.GetInstance(ctx, domainProject, request.ServiceId, request.InstanceId)
if err != nil {
log.Error(fmt.Sprintf("update instance[%s] properties failed", instanceFlag), err)
return &pb.UpdateInstancePropsResponse{
@@ -1110,7 +1102,7 @@ func (ds *MetadataManager) UpdateInstanceProperties(ctx context.Context, request
copyInstanceRef := *instance
copyInstanceRef.Properties = request.Properties
- if err := serviceUtil.UpdateInstance(ctx, domainProject, &copyInstanceRef); err != nil {
+ if err := eutil.UpdateInstance(ctx, domainProject, &copyInstanceRef); err != nil {
log.Error(fmt.Sprintf("update instance[%s] properties failed", instanceFlag), err)
resp := &pb.UpdateInstancePropsResponse{
Response: pb.CreateResponseWithSCErr(err),
@@ -1222,7 +1214,7 @@ func (ds *MetadataManager) batchFindServices(ctx context.Context, request *pb.Ba
return nil, err
}
failed, ok := failedResult[resp.Response.GetCode()]
- serviceUtil.AppendFindResponse(findCtx, int64(index), resp.Response, resp.Instances,
+ eutil.AppendFindResponse(findCtx, int64(index), resp.Response, resp.Instances,
&services.Updated, &services.NotModified, &failed)
if !ok && failed != nil {
failedResult[resp.Response.GetCode()] = failed
@@ -1255,7 +1247,7 @@ func (ds *MetadataManager) batchFindInstances(ctx context.Context, request *pb.B
return nil, err
}
failed, ok := failedResult[resp.Response.GetCode()]
- serviceUtil.AppendFindResponse(getCtx, int64(index), resp.Response, []*pb.MicroServiceInstance{resp.Instance},
+ eutil.AppendFindResponse(getCtx, int64(index), resp.Response, []*pb.MicroServiceInstance{resp.Instance},
&instances.Updated, &instances.NotModified, &failed)
if !ok && failed != nil {
failedResult[resp.Response.GetCode()] = failed
@@ -1300,7 +1292,7 @@ func (ds *MetadataManager) Heartbeat(ctx context.Context, request *pb.HeartbeatR
domainProject := util.ParseDomainProject(ctx)
instanceFlag := util.StringJoin([]string{request.ServiceId, request.InstanceId}, path.SPLIT)
- _, ttl, err := serviceUtil.HeartbeatUtil(ctx, domainProject, request.ServiceId, request.InstanceId)
+ _, ttl, err := eutil.HeartbeatUtil(ctx, domainProject, request.ServiceId, request.InstanceId)
if err != nil {
log.Error(fmt.Sprintf("heartbeat failed, instance[%s]. operator %s",
instanceFlag, remoteIP), err)
@@ -1329,7 +1321,7 @@ func (ds *MetadataManager) Heartbeat(ctx context.Context, request *pb.HeartbeatR
func (ds *MetadataManager) GetAllInstances(ctx context.Context, request *pb.GetAllInstancesRequest) (*pb.GetAllInstancesResponse, error) {
domainProject := util.ParseDomainProject(ctx)
key := path.GetInstanceRootKey(domainProject) + path.SPLIT
- opts := append(serviceUtil.FromContext(ctx), etcdadpt.WithStrKey(key), etcdadpt.WithPrefix())
+ opts := append(eutil.FromContext(ctx), etcdadpt.WithStrKey(key), etcdadpt.WithPrefix())
kvs, err := sd.Instance().Search(ctx, opts...)
if err != nil {
return nil, err
@@ -1354,7 +1346,7 @@ func (ds *MetadataManager) ModifySchemas(ctx context.Context, request *pb.Modify
serviceID := request.ServiceId
domainProject := util.ParseDomainProject(ctx)
- serviceInfo, err := serviceUtil.GetService(ctx, domainProject, serviceID)
+ serviceInfo, err := eutil.GetService(ctx, domainProject, serviceID)
if err != nil {
if errors.Is(err, datasource.ErrNoData) {
log.Debug(fmt.Sprintf("modify service[%s] schemas failed, service does not exist in db, operator: %s",
@@ -1403,7 +1395,7 @@ func (ds *MetadataManager) ExistSchema(ctx context.Context, request *pb.GetExist
*pb.GetExistenceResponse, error) {
domainProject := util.ParseDomainProject(ctx)
- if !serviceUtil.ServiceExist(ctx, domainProject, request.ServiceId) {
+ if !eutil.ServiceExist(ctx, domainProject, request.ServiceId) {
log.Warn(fmt.Sprintf("schema[%s/%s] exist failed, service does not exist", request.ServiceId, request.SchemaId))
return nil, pb.NewError(pb.ErrServiceNotExists, "service does not exist.")
}
@@ -1434,14 +1426,14 @@ func (ds *MetadataManager) ExistSchema(ctx context.Context, request *pb.GetExist
func (ds *MetadataManager) GetSchema(ctx context.Context, request *pb.GetSchemaRequest) (*pb.GetSchemaResponse, error) {
domainProject := util.ParseDomainProject(ctx)
- if !serviceUtil.ServiceExist(ctx, domainProject, request.ServiceId) {
+ if !eutil.ServiceExist(ctx, domainProject, request.ServiceId) {
log.Error(fmt.Sprintf("get schema[%s/%s] failed, service does not exist",
request.ServiceId, request.SchemaId), nil)
return nil, pb.NewError(pb.ErrServiceNotExists, "Service does not exist.")
}
key := path.GenerateServiceSchemaKey(domainProject, request.ServiceId, request.SchemaId)
- opts := append(serviceUtil.FromContext(ctx), etcdadpt.WithStrKey(key))
+ opts := append(eutil.FromContext(ctx), etcdadpt.WithStrKey(key))
resp, errDo := sd.Schema().Search(ctx, opts...)
if errDo != nil {
log.Error(fmt.Sprintf("get schema[%s/%s] failed", request.ServiceId, request.SchemaId), errDo)
@@ -1471,7 +1463,7 @@ func (ds *MetadataManager) GetAllSchemas(ctx context.Context, request *pb.GetAll
*pb.GetAllSchemaResponse, error) {
domainProject := util.ParseDomainProject(ctx)
- service, err := serviceUtil.GetService(ctx, domainProject, request.ServiceId)
+ service, err := eutil.GetService(ctx, domainProject, request.ServiceId)
if err != nil {
if errors.Is(err, datasource.ErrNoData) {
log.Debug(fmt.Sprintf("get service[%s] all schemas failed, service does not exist in db", request.ServiceId))
@@ -1490,7 +1482,7 @@ func (ds *MetadataManager) GetAllSchemas(ctx context.Context, request *pb.GetAll
}
key := path.GenerateServiceSchemaSummaryKey(domainProject, request.ServiceId, "")
- opts := append(serviceUtil.FromContext(ctx), etcdadpt.WithStrKey(key), etcdadpt.WithPrefix())
+ opts := append(eutil.FromContext(ctx), etcdadpt.WithStrKey(key), etcdadpt.WithPrefix())
resp, errDo := sd.SchemaSummary().Search(ctx, opts...)
if errDo != nil {
log.Error(fmt.Sprintf("get service[%s] all schema summaries failed", request.ServiceId), errDo)
@@ -1500,7 +1492,7 @@ func (ds *MetadataManager) GetAllSchemas(ctx context.Context, request *pb.GetAll
respWithSchema := &kvstore.Response{}
if request.WithSchema {
key := path.GenerateServiceSchemaKey(domainProject, request.ServiceId, "")
- opts := append(serviceUtil.FromContext(ctx), etcdadpt.WithStrKey(key), etcdadpt.WithPrefix())
+ opts := append(eutil.FromContext(ctx), etcdadpt.WithStrKey(key), etcdadpt.WithPrefix())
respWithSchema, errDo = sd.Schema().Search(ctx, opts...)
if errDo != nil {
log.Error(fmt.Sprintf("get service[%s] all schemas failed", request.ServiceId), errDo)
@@ -1540,7 +1532,7 @@ func (ds *MetadataManager) DeleteSchema(ctx context.Context, request *pb.DeleteS
domainProject := util.ParseDomainProject(ctx)
key := path.GenerateServiceSchemaKey(domainProject, request.ServiceId, request.SchemaId)
- exist, err := serviceUtil.CheckSchemaInfoExist(ctx, key)
+ exist, err := eutil.CheckSchemaInfoExist(ctx, key)
if err != nil {
log.Error(fmt.Sprintf("delete schema[%s/%s] failed, operator: %s",
request.ServiceId, request.SchemaId, remoteIP), err)
@@ -1582,7 +1574,7 @@ func (ds *MetadataManager) AddTags(ctx context.Context, request *pb.AddServiceTa
domainProject := util.ParseDomainProject(ctx)
// service id存在性校验
- if !serviceUtil.ServiceExist(ctx, domainProject, request.ServiceId) {
+ if !eutil.ServiceExist(ctx, domainProject, request.ServiceId) {
log.Error(fmt.Sprintf("add service[%s]'s tags %v failed, service does not exist, operator: %s",
request.ServiceId, request.Tags, remoteIP), nil)
return &pb.AddServiceTagsResponse{
@@ -1590,7 +1582,7 @@ func (ds *MetadataManager) AddTags(ctx context.Context, request *pb.AddServiceTa
}, nil
}
- checkErr := serviceUtil.AddTagIntoETCD(ctx, domainProject, request.ServiceId, request.Tags)
+ checkErr := eutil.AddTagIntoETCD(ctx, domainProject, request.ServiceId, request.Tags)
if checkErr != nil {
log.Error(fmt.Sprintf("add service[%s]'s tags %v failed, operator: %s",
request.ServiceId, request.Tags, remoteIP), checkErr)
@@ -1612,13 +1604,13 @@ func (ds *MetadataManager) AddTags(ctx context.Context, request *pb.AddServiceTa
func (ds *MetadataManager) GetTags(ctx context.Context, request *pb.GetServiceTagsRequest) (*pb.GetServiceTagsResponse, error) {
var err error
domainProject := util.ParseDomainProject(ctx)
- if !serviceUtil.ServiceExist(ctx, domainProject, request.ServiceId) {
+ if !eutil.ServiceExist(ctx, domainProject, request.ServiceId) {
log.Error(fmt.Sprintf("get service[%s]'s tags failed, service does not exist", request.ServiceId), err)
return &pb.GetServiceTagsResponse{
Response: pb.CreateResponse(pb.ErrServiceNotExists, "Service does not exist."),
}, nil
}
- tags, err := serviceUtil.GetTagsUtils(ctx, domainProject, request.ServiceId)
+ tags, err := eutil.GetTagsUtils(ctx, domainProject, request.ServiceId)
if err != nil {
log.Error(fmt.Sprintf("get service[%s]'s tags failed, get tags failed", request.ServiceId), err)
return &pb.GetServiceTagsResponse{
@@ -1638,7 +1630,7 @@ func (ds *MetadataManager) UpdateTag(ctx context.Context, request *pb.UpdateServ
tagFlag := util.StringJoin([]string{request.Key, request.Value}, path.SPLIT)
domainProject := util.ParseDomainProject(ctx)
- if !serviceUtil.ServiceExist(ctx, domainProject, request.ServiceId) {
+ if !eutil.ServiceExist(ctx, domainProject, request.ServiceId) {
log.Error(fmt.Sprintf("update service[%s]'s tag[%s] failed, service does not exist, operator: %s",
request.ServiceId, tagFlag, remoteIP), err)
return &pb.UpdateServiceTagResponse{
@@ -1646,7 +1638,7 @@ func (ds *MetadataManager) UpdateTag(ctx context.Context, request *pb.UpdateServ
}, nil
}
- tags, err := serviceUtil.GetTagsUtils(ctx, domainProject, request.ServiceId)
+ tags, err := eutil.GetTagsUtils(ctx, domainProject, request.ServiceId)
if err != nil {
log.Error(fmt.Sprintf("update service[%s]'s tag[%s] failed, get tag failed, operator: %s",
request.ServiceId, tagFlag, remoteIP), err)
@@ -1670,7 +1662,7 @@ func (ds *MetadataManager) UpdateTag(ctx context.Context, request *pb.UpdateServ
}
copyTags[request.Key] = request.Value
- checkErr := serviceUtil.AddTagIntoETCD(ctx, domainProject, request.ServiceId, copyTags)
+ checkErr := eutil.AddTagIntoETCD(ctx, domainProject, request.ServiceId, copyTags)
if checkErr != nil {
log.Error(fmt.Sprintf("update service[%s]'s tag[%s] failed, operator: %s",
request.ServiceId, tagFlag, remoteIP), checkErr)
@@ -1693,7 +1685,7 @@ func (ds *MetadataManager) DeleteTags(ctx context.Context, request *pb.DeleteSer
remoteIP := util.GetIPFromContext(ctx)
domainProject := util.ParseDomainProject(ctx)
- if !serviceUtil.ServiceExist(ctx, domainProject, request.ServiceId) {
+ if !eutil.ServiceExist(ctx, domainProject, request.ServiceId) {
log.Error(fmt.Sprintf("delete service[%s]'s tags %v failed, service does not exist, operator: %s",
request.ServiceId, request.Keys, remoteIP), nil)
return &pb.DeleteServiceTagsResponse{
@@ -1701,7 +1693,7 @@ func (ds *MetadataManager) DeleteTags(ctx context.Context, request *pb.DeleteSer
}, nil
}
- tags, err := serviceUtil.GetTagsUtils(ctx, domainProject, request.ServiceId)
+ tags, err := eutil.GetTagsUtils(ctx, domainProject, request.ServiceId)
if err != nil {
log.Error(fmt.Sprintf("delete service[%s]'s tags %v failed, get service tags failed, operator: %s",
request.ServiceId, request.Keys, remoteIP), err)
@@ -1737,10 +1729,18 @@ func (ds *MetadataManager) DeleteTags(ctx context.Context, request *pb.DeleteSer
key := path.GenerateServiceTagKey(domainProject, request.ServiceId)
- resp, err := etcdadpt.TxnWithCmp(ctx,
- etcdadpt.Ops(etcdadpt.OpPut(etcdadpt.WithStrKey(key), etcdadpt.WithValue(data))),
- etcdadpt.If(etcdadpt.NotEqualVer(path.GenerateServiceKey(domainProject, request.ServiceId), 0)),
- nil)
+ opts := etcdadpt.Ops(etcdadpt.OpPut(etcdadpt.WithStrKey(key), etcdadpt.WithValue(data)))
+ syncOpts, err := esync.GenDeleteOpts(ctx, datasource.ResourceKV, key, data,
+ esync.WithOpts(map[string]string{"key": key}))
+ if err != nil {
+ return &pb.DeleteServiceTagsResponse{
+ Response: pb.CreateResponse(pb.ErrInternal, err.Error()),
+ }, err
+ }
+ opts = append(opts, syncOpts...)
+
+ resp, err := etcdadpt.TxnWithCmp(ctx, opts,
+ etcdadpt.If(etcdadpt.NotEqualVer(path.GenerateServiceKey(domainProject, request.ServiceId), 0)), nil)
if err != nil {
log.Error(fmt.Sprintf("delete service[%s]'s tags %v failed, operator: %s",
request.ServiceId, request.Keys, remoteIP), err)
@@ -1786,7 +1786,7 @@ func (ds *MetadataManager) modifySchemas(ctx context.Context, domainProject stri
}
service.Schemas = nonExistSchemaIds
- opt, err := serviceUtil.UpdateService(domainProject, serviceID, service)
+ opt, err := eutil.UpdateService(domainProject, serviceID, service)
if err != nil {
log.Error(fmt.Sprintf("modify service[%s] schemas failed, update service.Schemas failed, operator: %s",
serviceID, remoteIP), err)
@@ -1851,7 +1851,7 @@ func (ds *MetadataManager) modifySchemas(ctx context.Context, domainProject stri
}
service.Schemas = schemaIDs
- opt, err := serviceUtil.UpdateService(domainProject, serviceID, service)
+ opt, err := eutil.UpdateService(domainProject, serviceID, service)
if err != nil {
log.Error(fmt.Sprintf("modify service[%s] schemas failed, update service.Schemas failed, operator: %s",
serviceID, remoteIP), err)
@@ -1883,7 +1883,7 @@ func (ds *MetadataManager) modifySchema(ctx context.Context, serviceID string, s
domainProject := util.ParseDomainProject(ctx)
schemaID := schema.SchemaId
- microService, err := serviceUtil.GetService(ctx, domainProject, serviceID)
+ microService, err := eutil.GetService(ctx, domainProject, serviceID)
if err != nil {
if errors.Is(err, datasource.ErrNoData) {
log.Debug(fmt.Sprintf("microService does not exist, modify schema[%s/%s] failed, operator: %s",
@@ -1933,7 +1933,7 @@ func (ds *MetadataManager) modifySchema(ctx context.Context, serviceID string, s
if len(microService.Schemas) == 0 {
microService.Schemas = append(microService.Schemas, schemaID)
- opt, err := serviceUtil.UpdateService(domainProject, serviceID, microService)
+ opt, err := eutil.UpdateService(domainProject, serviceID, microService)
if err != nil {
log.Error(fmt.Sprintf("modify schema[%s/%s] failed, update microService.Schemas failed, operator: %s",
serviceID, schemaID, remoteIP), err)
@@ -1944,7 +1944,7 @@ func (ds *MetadataManager) modifySchema(ctx context.Context, serviceID string, s
} else {
if !isExist {
microService.Schemas = append(microService.Schemas, schemaID)
- opt, err := serviceUtil.UpdateService(domainProject, serviceID, microService)
+ opt, err := eutil.UpdateService(domainProject, serviceID, microService)
if err != nil {
log.Error(fmt.Sprintf("modify schema[%s/%s] failed, update microService.Schemas failed, operator: %s",
serviceID, schemaID, remoteIP), err)
@@ -1984,7 +1984,7 @@ func (ds *MetadataManager) DeleteServicePri(ctx context.Context, serviceID strin
return pb.CreateResponse(pb.ErrInvalidParams, err.Error()), nil
}
- microservice, err := serviceUtil.GetService(ctx, domainProject, serviceID)
+ microservice, err := eutil.GetService(ctx, domainProject, serviceID)
if err != nil {
if errors.Is(err, datasource.ErrNoData) {
log.Debug(fmt.Sprintf("service does not exist, %s micro-service[%s] failed, operator: %s",
@@ -1998,7 +1998,7 @@ func (ds *MetadataManager) DeleteServicePri(ctx context.Context, serviceID strin
// 强制删除,则与该服务相关的信息删除,非强制删除: 如果作为该被依赖(作为provider,提供服务,且不是只存在自依赖)或者存在实例,则不能删除
if !force {
- services, err := serviceUtil.GetConsumerIds(ctx, domainProject, microservice)
+ services, err := eutil.GetConsumerIds(ctx, domainProject, microservice)
if err != nil {
log.Error(fmt.Sprintf("delete micro-service[%s] failed, get service dependency failed, operator: %s",
serviceID, remoteIP), err)
@@ -2042,26 +2042,16 @@ func (ds *MetadataManager) DeleteServicePri(ctx context.Context, serviceID strin
etcdadpt.OpDel(etcdadpt.WithStrKey(path.GenerateServiceAliasKey(serviceKey))),
etcdadpt.OpDel(etcdadpt.WithStrKey(serviceIDKey)),
}
-
- if datasource.EnableSync {
- domain := util.ParseDomain(ctx)
- project := util.ParseProject(ctx)
- taskOpt, err := GenTaskOpts(domain, project, sync.DeleteAction, datasource.ResourceService,
- &pb.DeleteServiceRequest{ServiceId: serviceID, Force: force})
- if err != nil {
- log.Error("fail to create task", err)
- return pb.CreateResponse(pb.ErrInternal, err.Error()), err
- }
- tombstoneOpt, err := GenTombstoneOpts(domain, project, datasource.ResourceService, serviceID)
- if err != nil {
- log.Error("fail to create tombstone", err)
- return pb.CreateResponse(pb.ErrInternal, err.Error()), err
- }
- opts = append(opts, taskOpt, tombstoneOpt)
+ syncOpts, err := esync.GenDeleteOpts(ctx, datasource.ResourceService, serviceID,
+ &pb.DeleteServiceRequest{ServiceId: serviceID, Force: force})
+ if err != nil {
+ log.Error("fail to sync opt", err)
+ return pb.CreateResponse(pb.ErrInternal, err.Error()), err
}
+ opts = append(opts, syncOpts...)
//删除依赖规则
- optDeleteDep, err := serviceUtil.DeleteDependencyForDeleteService(domainProject, serviceID, serviceKey)
+ optDeleteDep, err := eutil.DeleteDependencyForDeleteService(domainProject, serviceID, serviceKey)
if err != nil {
log.Error(fmt.Sprintf("%s micro-service[%s] failed, delete dependency failed, operator: %s",
title, serviceID, remoteIP), err)
@@ -2093,7 +2083,7 @@ func (ds *MetadataManager) DeleteServicePri(ctx context.Context, serviceID strin
etcdadpt.WithPrefix()))
//删除实例
- err = serviceUtil.DeleteServiceAllInstances(ctx, serviceID)
+ err = eutil.DeleteServiceAllInstances(ctx, serviceID)
if err != nil {
log.Error(fmt.Sprintf("%s micro-service[%s] failed, revoke all instances failed, operator: %s",
title, serviceID, remoteIP), err)
diff --git a/datasource/etcd/role.go b/datasource/etcd/role.go
index 9e117d9..d2a3735 100644
--- a/datasource/etcd/role.go
+++ b/datasource/etcd/role.go
@@ -24,19 +24,19 @@ import (
"strconv"
"time"
- rbacmodel "github.com/go-chassis/cari/rbac"
- "github.com/go-chassis/cari/sync"
+ crbac "github.com/go-chassis/cari/rbac"
"github.com/little-cui/etcdadpt"
"github.com/apache/servicecomb-service-center/datasource"
"github.com/apache/servicecomb-service-center/datasource/etcd/path"
+ esync "github.com/apache/servicecomb-service-center/datasource/etcd/sync"
"github.com/apache/servicecomb-service-center/datasource/rbac"
"github.com/apache/servicecomb-service-center/pkg/etcdsync"
"github.com/apache/servicecomb-service-center/pkg/log"
"github.com/apache/servicecomb-service-center/pkg/util"
)
-func (rm *RbacDAO) CreateRole(ctx context.Context, r *rbacmodel.Role) error {
+func (rm *RbacDAO) CreateRole(ctx context.Context, r *crbac.Role) error {
lock, err := etcdsync.Lock("/role-creating/"+r.Name, -1, false)
if err != nil {
return fmt.Errorf("role %s is creating", r.Name)
@@ -67,14 +67,12 @@ func (rm *RbacDAO) CreateRole(ctx context.Context, r *rbacmodel.Role) error {
opts := []etcdadpt.OpOptions{
etcdadpt.OpPut(etcdadpt.WithStrKey(key), etcdadpt.WithValue(value)),
}
- if datasource.EnableSync {
- taskOpt, err := GenTaskOpts("", "", sync.CreateAction, datasource.ResourceRole, r)
- if err != nil {
- log.Error("", err)
- return err
- }
- opts = append(opts, taskOpt)
+ syncOpts, err := esync.GenCreateOpts(ctx, datasource.ResourceRole, r)
+ if err != nil {
+ log.Error("fail to create sync opts", err)
+ return err
}
+ opts = append(opts, syncOpts...)
err = etcdadpt.Txn(ctx, opts)
if err != nil {
log.Error("can not save account info", err)
@@ -88,7 +86,7 @@ func (rm *RbacDAO) RoleExist(ctx context.Context, name string) (bool, error) {
return etcdadpt.Exist(ctx, path.GenerateRBACRoleKey(name))
}
-func (rm *RbacDAO) GetRole(ctx context.Context, name string) (*rbacmodel.Role, error) {
+func (rm *RbacDAO) GetRole(ctx context.Context, name string) (*crbac.Role, error) {
kv, err := etcdadpt.Get(ctx, path.GenerateRBACRoleKey(name))
if err != nil {
return nil, err
@@ -96,7 +94,7 @@ func (rm *RbacDAO) GetRole(ctx context.Context, name string) (*rbacmodel.Role, e
if kv == nil {
return nil, rbac.ErrRoleNotExist
}
- role := &rbacmodel.Role{}
+ role := &crbac.Role{}
err = json.Unmarshal(kv.Value, role)
if err != nil {
log.Error("role info format invalid", err)
@@ -104,14 +102,14 @@ func (rm *RbacDAO) GetRole(ctx context.Context, name string) (*rbacmodel.Role, e
}
return role, nil
}
-func (rm *RbacDAO) ListRole(ctx context.Context) ([]*rbacmodel.Role, int64, error) {
+func (rm *RbacDAO) ListRole(ctx context.Context) ([]*crbac.Role, int64, error) {
kvs, n, err := etcdadpt.List(ctx, path.GenerateRBACRoleKey(""))
if err != nil {
return nil, 0, err
}
- roles := make([]*rbacmodel.Role, 0, n)
+ roles := make([]*crbac.Role, 0, n)
for _, v := range kvs {
- r := &rbacmodel.Role{}
+ r := &crbac.Role{}
err = json.Unmarshal(v.Value, r)
if err != nil {
log.Error("role info format invalid:", err)
@@ -132,19 +130,12 @@ func (rm *RbacDAO) DeleteRole(ctx context.Context, name string) (bool, error) {
return false, rbac.ErrRoleBindingExist
}
opts := []etcdadpt.OpOptions{etcdadpt.OpDel(etcdadpt.WithStrKey(path.GenerateRBACRoleKey(name)))}
- if datasource.EnableSync {
- taskOpt, err := GenTaskOpts("", "", sync.DeleteAction, datasource.ResourceRole, name)
- if err != nil {
- log.Error("", err)
- return false, err
- }
- tombstoneOpt, err := GenTombstoneOpts("", "", datasource.ResourceRole, name)
- if err != nil {
- log.Error("", err)
- return false, err
- }
- opts = append(opts, taskOpt, tombstoneOpt)
+ syncOpts, err := esync.GenDeleteOpts(ctx, datasource.ResourceRole, name, name)
+ if err != nil {
+ log.Error("fail to create sync opts", err)
+ return false, err
}
+ opts = append(opts, syncOpts...)
err = etcdadpt.Txn(ctx, opts)
if err != nil {
return false, err
@@ -159,7 +150,7 @@ func RoleBindingExists(ctx context.Context, role string) (bool, error) {
}
return total > 0, nil
}
-func (rm *RbacDAO) UpdateRole(ctx context.Context, name string, role *rbacmodel.Role) error {
+func (rm *RbacDAO) UpdateRole(ctx context.Context, name string, role *crbac.Role) error {
role.UpdateTime = strconv.FormatInt(time.Now().Unix(), 10)
value, err := json.Marshal(role)
if err != nil {
@@ -169,13 +160,11 @@ func (rm *RbacDAO) UpdateRole(ctx context.Context, name string, role *rbacmodel.
opts := []etcdadpt.OpOptions{
etcdadpt.OpPut(etcdadpt.WithStrKey(path.GenerateRBACRoleKey(name)), etcdadpt.WithValue(value)),
}
- if datasource.EnableSync {
- taskOpt, err := GenTaskOpts("", "", sync.UpdateAction, datasource.ResourceRole, role)
- if err != nil {
- log.Error("", err)
- return err
- }
- opts = append(opts, taskOpt)
+ syncOpts, err := esync.GenUpdateOpts(ctx, datasource.ResourceRole, role)
+ if err != nil {
+ log.Error("fail to create sync opts", err)
+ return err
}
+ opts = append(opts, syncOpts...)
return etcdadpt.Txn(ctx, opts)
}
diff --git a/datasource/etcd/role_test.go b/datasource/etcd/role_test.go
index 0a447c5..a9af11f 100644
--- a/datasource/etcd/role_test.go
+++ b/datasource/etcd/role_test.go
@@ -22,7 +22,7 @@ import (
"strconv"
"testing"
- rbacmodel "github.com/go-chassis/cari/rbac"
+ crbac "github.com/go-chassis/cari/rbac"
"github.com/stretchr/testify/assert"
"github.com/apache/servicecomb-service-center/datasource"
@@ -39,7 +39,7 @@ func TestSyncRole(t *testing.T) {
t.Run("create role", func(t *testing.T) {
t.Run("creating a role and delete it will create two tasks and a tombstone should pass", func(t *testing.T) {
- r1 := rbacmodel.Role{
+ r1 := crbac.Role{
ID: "create-11111",
Name: "create-role",
Perms: nil,
@@ -78,12 +78,12 @@ func TestSyncRole(t *testing.T) {
t.Run("update role", func(t *testing.T) {
t.Run("create two roles ,then update them, finally delete them, will create six tasks and two tombstones should pass",
func(t *testing.T) {
- r2 := rbacmodel.Role{
+ r2 := crbac.Role{
ID: "update-22222",
Name: "update-role-22222",
Perms: nil,
}
- r3 := rbacmodel.Role{
+ r3 := crbac.Role{
ID: "update-33333",
Name: "update-role-33333",
Perms: nil,
diff --git a/datasource/etcd/sync/sync.go b/datasource/etcd/sync/sync.go
new file mode 100644
index 0000000..28f4112
--- /dev/null
+++ b/datasource/etcd/sync/sync.go
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package sync
+
+import (
+ "context"
+ "encoding/json"
+
+ "github.com/go-chassis/cari/sync"
+ "github.com/little-cui/etcdadpt"
+
+ "github.com/apache/servicecomb-service-center/datasource"
+ "github.com/apache/servicecomb-service-center/eventbase/datasource/etcd/key"
+ "github.com/apache/servicecomb-service-center/pkg/util"
+)
+
+type Options struct {
+ ResourceID string
+ Opts map[string]string
+}
+
+type Option func(options *Options)
+
+func NewSyncOptions() Options {
+ return Options{}
+}
+
+func WithResourceID(resourceID string) Option {
+ return func(options *Options) {
+ options.ResourceID = resourceID
+ }
+}
+
+func WithOpts(opts map[string]string) Option {
+ return func(options *Options) {
+ options.Opts = opts
+ }
+}
+
+func GenCreateOpts(ctx context.Context, resourceType string, resource interface{},
+ options ...Option) ([]etcdadpt.OpOptions, error) {
+ return genOpts(ctx, sync.CreateAction, resourceType, resource, options...)
+}
+
+func GenUpdateOpts(ctx context.Context, resourceType string, resource interface{},
+ options ...Option) ([]etcdadpt.OpOptions, error) {
+ return genOpts(ctx, sync.UpdateAction, resourceType, resource, options...)
+}
+
+func GenDeleteOpts(ctx context.Context, resourceType, resourceID string, resource interface{},
+ options ...Option) ([]etcdadpt.OpOptions, error) {
+ options = append(options, WithResourceID(resourceID))
+ return genOpts(ctx, sync.DeleteAction, resourceType, resource, options...)
+
+}
+
+func genOpts(ctx context.Context, action string, resourceType string, resource interface{},
+ options ...Option) ([]etcdadpt.OpOptions, error) {
+ if !datasource.EnableSync {
+ return nil, nil
+ }
+ syncOpts := NewSyncOptions()
+ for _, option := range options {
+ option(&syncOpts)
+ }
+ taskOpt, err := genTaskOpt(ctx, action, resourceType, resource, &syncOpts)
+ if err != nil {
+ return nil, err
+ }
+ if action != sync.DeleteAction {
+ return []etcdadpt.OpOptions{taskOpt}, nil
+ }
+ tombstoneOpt, err := genTombstoneOpt(ctx, resourceType, syncOpts.ResourceID)
+ if err != nil {
+ return nil, err
+ }
+ return []etcdadpt.OpOptions{taskOpt, tombstoneOpt}, nil
+}
+
+func genTaskOpt(ctx context.Context, action string, resourceType string, resource interface{},
+ syncOpts *Options) (etcdadpt.OpOptions, error) {
+ domain := util.ParseDomain(ctx)
+ project := util.ParseProject(ctx)
+ task, err := sync.NewTask(domain, project, action, resourceType, resource)
+ if err != nil {
+ return etcdadpt.OpOptions{}, err
+ }
+ if syncOpts.Opts != nil {
+ task.Opts = syncOpts.Opts
+ }
+ taskBytes, err := json.Marshal(task)
+ if err != nil {
+ return etcdadpt.OpOptions{}, err
+ }
+ taskOpPut := etcdadpt.OpPut(etcdadpt.WithStrKey(key.TaskKey(domain, project,
+ task.ID, task.Timestamp)), etcdadpt.WithValue(taskBytes))
+ return taskOpPut, nil
+}
+
+func genTombstoneOpt(ctx context.Context, resourceType, resourceID string) (etcdadpt.OpOptions, error) {
+ domain := util.ParseDomain(ctx)
+ project := util.ParseProject(ctx)
+ tombstone := sync.NewTombstone(domain, project, resourceType, resourceID)
+ tombstoneBytes, err := json.Marshal(tombstone)
+ if err != nil {
+ return etcdadpt.OpOptions{}, err
+ }
+ tombstoneOpPut := etcdadpt.OpPut(etcdadpt.WithStrKey(key.TombstoneKey(domain, project, tombstone.ResourceType,
+ tombstone.ResourceID)), etcdadpt.WithValue(tombstoneBytes))
+ return tombstoneOpPut, nil
+}
diff --git a/datasource/etcd/sync/sync_test.go b/datasource/etcd/sync/sync_test.go
new file mode 100644
index 0000000..c723ba4
--- /dev/null
+++ b/datasource/etcd/sync/sync_test.go
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package sync_test
+
+import (
+ "context"
+ "testing"
+
+ pb "github.com/go-chassis/cari/discovery"
+ "github.com/stretchr/testify/assert"
+
+ "github.com/apache/servicecomb-service-center/datasource"
+ "github.com/apache/servicecomb-service-center/datasource/etcd/sync"
+ "github.com/apache/servicecomb-service-center/pkg/util"
+)
+
+func optsContext() context.Context {
+ return util.WithNoCache(util.SetDomainProject(context.Background(), "sync-opts",
+ "sync-opts"))
+}
+
+func TestOpts(t *testing.T) {
+ datasource.EnableSync = true
+
+ t.Run("create func will create a task opt should pass", func(t *testing.T) {
+ opts, err := sync.GenCreateOpts(optsContext(), datasource.ResourceService, &pb.CreateServiceRequest{})
+ assert.Nil(t, err)
+ assert.Equal(t, 1, len(opts))
+ })
+
+ t.Run("update func will create a task opt should pass", func(t *testing.T) {
+ opts, err := sync.GenUpdateOpts(optsContext(), datasource.ResourceService, &pb.UpdateServicePropsRequest{})
+ assert.Nil(t, err)
+ assert.Equal(t, 1, len(opts))
+ })
+
+ t.Run("delete func will create a task and a tombstone should pass", func(t *testing.T) {
+ opts, err := sync.GenDeleteOpts(optsContext(), datasource.ResourceService, "11111", &pb.DeleteServiceRequest{})
+ assert.Nil(t, err)
+ assert.Equal(t, 2, len(opts))
+ })
+ datasource.EnableSync = false
+}
diff --git a/datasource/etcd/tag_test.go b/datasource/etcd/tag_test.go
new file mode 100644
index 0000000..829fbe1
--- /dev/null
+++ b/datasource/etcd/tag_test.go
@@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package etcd_test
+
+import (
+ "context"
+ "testing"
+
+ pb "github.com/go-chassis/cari/discovery"
+ "github.com/go-chassis/cari/sync"
+ "github.com/stretchr/testify/assert"
+
+ "github.com/apache/servicecomb-service-center/datasource"
+ "github.com/apache/servicecomb-service-center/eventbase/model"
+ "github.com/apache/servicecomb-service-center/eventbase/service/task"
+ "github.com/apache/servicecomb-service-center/eventbase/service/tombstone"
+ "github.com/apache/servicecomb-service-center/pkg/util"
+ _ "github.com/apache/servicecomb-service-center/test"
+)
+
+func tagContext() context.Context {
+ return util.WithNoCache(util.SetDomainProject(context.Background(), "sync-tag",
+ "sync-tag"))
+}
+
+func TestSyncTag(t *testing.T) {
+ var serviceID string
+ datasource.EnableSync = true
+ t.Run("create service", func(t *testing.T) {
+ t.Run("register a micro service will create a task should pass", func(t *testing.T) {
+ resp, err := datasource.GetMetadataManager().RegisterService(tagContext(), &pb.CreateServiceRequest{
+ Service: &pb.MicroService{
+ AppId: "sync_tag_group",
+ ServiceName: "sync_micro_service_tag",
+ Version: "1.0.0",
+ Level: "FRONT",
+ Status: pb.MS_UP,
+ },
+ })
+ assert.NotNil(t, resp)
+ assert.NoError(t, err)
+ assert.Equal(t, pb.ResponseSuccess, resp.Response.GetCode())
+ serviceID = resp.ServiceId
+ listTaskReq := model.ListTaskRequest{
+ Domain: "sync-tag",
+ Project: "sync-tag",
+ ResourceType: datasource.ResourceService,
+ Action: sync.CreateAction,
+ Status: sync.PendingStatus,
+ }
+ tasks, err := task.List(tagContext(), &listTaskReq)
+ assert.NoError(t, err)
+ assert.Equal(t, 1, len(tasks))
+ err = task.Delete(tagContext(), tasks...)
+ assert.NoError(t, err)
+ tasks, err = task.List(tagContext(), &listTaskReq)
+ assert.NoError(t, err)
+ assert.Equal(t, 0, len(tasks))
+ })
+ })
+
+ t.Run("add tags", func(t *testing.T) {
+ t.Run("add tags for a service will create a task should pass", func(t *testing.T) {
+ respAddTages, err := datasource.GetMetadataManager().AddTags(tagContext(), &pb.AddServiceTagsRequest{
+ ServiceId: serviceID,
+ Tags: map[string]string{
+ "a": "test",
+ "b": "b",
+ },
+ })
+ assert.NoError(t, err)
+ assert.Equal(t, pb.ResponseSuccess, respAddTages.Response.GetCode())
+ listTaskReq := model.ListTaskRequest{
+ Domain: "sync-tag",
+ Project: "sync-tag",
+ ResourceType: datasource.ResourceKV,
+ Action: sync.UpdateAction,
+ Status: sync.PendingStatus,
+ }
+ tasks, err := task.List(tagContext(), &listTaskReq)
+ assert.NoError(t, err)
+ assert.Equal(t, 1, len(tasks))
+ err = task.Delete(tagContext(), tasks...)
+ assert.NoError(t, err)
+ tasks, err = task.List(tagContext(), &listTaskReq)
+ assert.NoError(t, err)
+ assert.Equal(t, 0, len(tasks))
+ })
+ })
+
+ t.Run("update a tag", func(t *testing.T) {
+ t.Run("update a service tag will create a task should pass", func(t *testing.T) {
+ resp, err := datasource.GetMetadataManager().UpdateTag(tagContext(), &pb.UpdateServiceTagRequest{
+ ServiceId: serviceID,
+ Key: "a",
+ Value: "update",
+ })
+ assert.NoError(t, err)
+ assert.Equal(t, pb.ResponseSuccess, resp.Response.GetCode())
+ listTaskReq := model.ListTaskRequest{
+ Domain: "sync-tag",
+ Project: "sync-tag",
+ ResourceType: datasource.ResourceKV,
+ Action: sync.UpdateAction,
+ Status: sync.PendingStatus,
+ }
+ tasks, err := task.List(tagContext(), &listTaskReq)
+ assert.NoError(t, err)
+ assert.Equal(t, 1, len(tasks))
+ err = task.Delete(tagContext(), tasks...)
+ assert.NoError(t, err)
+ tasks, err = task.List(tagContext(), &listTaskReq)
+ assert.NoError(t, err)
+ assert.Equal(t, 0, len(tasks))
+ })
+ })
+
+ t.Run("delete tags", func(t *testing.T) {
+ t.Run("delete a service's tags will create a task and a tombstone should pass", func(t *testing.T) {
+ resp, err := datasource.GetMetadataManager().DeleteTags(tagContext(), &pb.DeleteServiceTagsRequest{
+ ServiceId: serviceID,
+ Keys: []string{"a", "b"},
+ })
+ assert.NoError(t, err)
+ assert.Equal(t, pb.ResponseSuccess, resp.Response.GetCode())
+ respGetTags, err := datasource.GetMetadataManager().GetTags(tagContext(), &pb.GetServiceTagsRequest{
+ ServiceId: serviceID,
+ })
+ assert.NoError(t, err)
+ assert.Equal(t, pb.ResponseSuccess, resp.Response.GetCode())
+ assert.Equal(t, "", respGetTags.Tags["a"])
+ assert.Equal(t, "", respGetTags.Tags["b"])
+ listTaskReq := model.ListTaskRequest{
+ Domain: "sync-tag",
+ Project: "sync-tag",
+ ResourceType: datasource.ResourceKV,
+ Action: sync.DeleteAction,
+ Status: sync.PendingStatus,
+ }
+ tasks, err := task.List(tagContext(), &listTaskReq)
+ assert.NoError(t, err)
+ assert.Equal(t, 1, len(tasks))
+ err = task.Delete(tagContext(), tasks...)
+ assert.NoError(t, err)
+ tasks, err = task.List(tagContext(), &listTaskReq)
+ assert.NoError(t, err)
+ assert.Equal(t, 0, len(tasks))
+ tombstoneListReq := model.ListTombstoneRequest{
+ Domain: "sync-tag",
+ Project: "sync-tag",
+ ResourceType: datasource.ResourceKV,
+ }
+ tombstones, err := tombstone.List(tagContext(), &tombstoneListReq)
+ assert.NoError(t, err)
+ assert.Equal(t, 1, len(tombstones))
+ err = tombstone.Delete(tagContext(), tombstones...)
+ assert.NoError(t, err)
+ })
+ })
+
+ t.Run("unregister service", func(t *testing.T) {
+ t.Run("unregister a service will create a task and a tombstone should pass", func(t *testing.T) {
+ resp, err := datasource.GetMetadataManager().UnregisterService(tagContext(), &pb.DeleteServiceRequest{
+ ServiceId: serviceID,
+ Force: true,
+ })
+ assert.NotNil(t, resp)
+ assert.NoError(t, err)
+ assert.Equal(t, pb.ResponseSuccess, resp.Response.GetCode())
+ listTaskReq := model.ListTaskRequest{
+ Domain: "sync-tag",
+ Project: "sync-tag",
+ ResourceType: datasource.ResourceService,
+ Action: sync.DeleteAction,
+ Status: sync.PendingStatus,
+ }
+ tasks, err := task.List(tagContext(), &listTaskReq)
+ assert.NoError(t, err)
+ assert.Equal(t, 1, len(tasks))
+ err = task.Delete(tagContext(), tasks...)
+ assert.NoError(t, err)
+ tasks, err = task.List(tagContext(), &listTaskReq)
+ assert.NoError(t, err)
+ assert.Equal(t, 0, len(tasks))
+ tombstoneListReq := model.ListTombstoneRequest{
+ Domain: "sync-tag",
+ Project: "sync-tag",
+ ResourceType: datasource.ResourceService,
+ }
+ tombstones, err := tombstone.List(tagContext(), &tombstoneListReq)
+ assert.NoError(t, err)
+ assert.Equal(t, 1, len(tombstones))
+ err = tombstone.Delete(tagContext(), tombstones...)
+ assert.NoError(t, err)
+ })
+ })
+
+ datasource.EnableSync = false
+}
diff --git a/datasource/etcd/task_util.go b/datasource/etcd/task_util.go
deleted file mode 100644
index 232a841..0000000
--- a/datasource/etcd/task_util.go
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package etcd
-
-import (
- "encoding/json"
-
- "github.com/go-chassis/cari/sync"
- "github.com/little-cui/etcdadpt"
-
- "github.com/apache/servicecomb-service-center/eventbase/datasource/etcd/key"
-)
-
-func GenTaskOpts(domain, project, action, resourceType string, resource interface{}) (etcdadpt.OpOptions, error) {
- task, err := sync.NewTask(domain, project, action, resourceType, resource)
- if err != nil {
- return etcdadpt.OpOptions{}, err
- }
- taskBytes, err := json.Marshal(task)
- if err != nil {
- return etcdadpt.OpOptions{}, err
- }
- taskOpPut := etcdadpt.OpPut(etcdadpt.WithStrKey(key.TaskKey(domain, project,
- task.ID, task.Timestamp)), etcdadpt.WithValue(taskBytes))
- return taskOpPut, nil
-}
diff --git a/datasource/etcd/tombstone_util.go b/datasource/etcd/tombstone_util.go
deleted file mode 100644
index 6a968c0..0000000
--- a/datasource/etcd/tombstone_util.go
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package etcd
-
-import (
- "encoding/json"
-
- "github.com/go-chassis/cari/sync"
- "github.com/little-cui/etcdadpt"
-
- "github.com/apache/servicecomb-service-center/eventbase/datasource/etcd/key"
-)
-
-func GenTombstoneOpts(domain, project, resourceType, resourceID string) (etcdadpt.OpOptions, error) {
- tombstone := sync.NewTombstone(domain, project, resourceType, resourceID)
- tombstoneBytes, err := json.Marshal(tombstone)
- if err != nil {
- return etcdadpt.OpOptions{}, err
- }
- tombstoneOpPut := etcdadpt.OpPut(etcdadpt.WithStrKey(key.TombstoneKey(domain, project, tombstone.ResourceType,
- tombstone.ResourceID)), etcdadpt.WithValue(tombstoneBytes))
- return tombstoneOpPut, nil
-}
diff --git a/datasource/etcd/util/tag_util.go b/datasource/etcd/util/tag_util.go
index a65fcc7..1b33f80 100644
--- a/datasource/etcd/util/tag_util.go
+++ b/datasource/etcd/util/tag_util.go
@@ -22,12 +22,15 @@ import (
"encoding/json"
"fmt"
- "github.com/apache/servicecomb-service-center/datasource/etcd/path"
- "github.com/apache/servicecomb-service-center/datasource/etcd/sd"
- "github.com/apache/servicecomb-service-center/pkg/log"
"github.com/go-chassis/cari/discovery"
"github.com/go-chassis/cari/pkg/errsvc"
"github.com/little-cui/etcdadpt"
+
+ "github.com/apache/servicecomb-service-center/datasource"
+ "github.com/apache/servicecomb-service-center/datasource/etcd/path"
+ "github.com/apache/servicecomb-service-center/datasource/etcd/sd"
+ esync "github.com/apache/servicecomb-service-center/datasource/etcd/sync"
+ "github.com/apache/servicecomb-service-center/pkg/log"
)
func AddTagIntoETCD(ctx context.Context, domainProject string, serviceID string, dataTags map[string]string) *errsvc.Error {
@@ -37,10 +40,14 @@ func AddTagIntoETCD(ctx context.Context, domainProject string, serviceID string,
return discovery.NewError(discovery.ErrInternal, err.Error())
}
- resp, err := etcdadpt.TxnWithCmp(ctx,
- etcdadpt.Ops(etcdadpt.OpPut(etcdadpt.WithStrKey(key), etcdadpt.WithValue(data))),
- etcdadpt.If(etcdadpt.NotEqualVer(path.GenerateServiceKey(domainProject, serviceID), 0)),
- nil)
+ opts := etcdadpt.Ops(etcdadpt.OpPut(etcdadpt.WithStrKey(key), etcdadpt.WithValue(data)))
+ syncOpts, err := esync.GenUpdateOpts(ctx, datasource.ResourceKV, data, esync.WithOpts(map[string]string{"key": key}))
+ if err != nil {
+ return discovery.NewError(discovery.ErrInternal, err.Error())
+ }
+ opts = append(opts, syncOpts...)
+ resp, err := etcdadpt.TxnWithCmp(ctx, opts,
+ etcdadpt.If(etcdadpt.NotEqualVer(path.GenerateServiceKey(domainProject, serviceID), 0)), nil)
if err != nil {
return discovery.NewError(discovery.ErrUnavailableBackend, err.Error())
}