You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicecomb.apache.org by ti...@apache.org on 2021/02/07 08:44:06 UTC
[servicecomb-service-center] branch master updated: [SCB-2094] Add
cache heartbeat mode in mongo (#845) (#844)
This is an automated email from the ASF dual-hosted git repository.
tianxiaoliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/servicecomb-service-center.git
The following commit(s) were added to refs/heads/master by this push:
new 2ce0b6c [SCB-2094] Add cache heartbeat mode in mongo (#845) (#844)
2ce0b6c is described below
commit 2ce0b6c202bf23a92a412f442c148bd376ba0d2c
Author: robotLJW <79...@qq.com>
AuthorDate: Sun Feb 7 16:43:58 2021 +0800
[SCB-2094] Add cache heartbeat mode in mongo (#845) (#844)
* [SCB-2094] Add cache heartbeat mode in mongo
* [SCB-2094] Modify the field containing info in struct
---
datasource/mongo/bootstrap/bootstrap.go | 3 +-
datasource/mongo/database.go | 43 ++--
datasource/mongo/dep.go | 58 ++---
datasource/mongo/dependency_query.go | 54 ++--
datasource/mongo/engine.go | 2 +-
datasource/mongo/event/instance_event_handler.go | 14 +-
.../mongo/event/instance_event_handler_test.go | 4 +-
datasource/mongo/heartbeat/cache/heartbeat.go | 218 ++++++++++++++++
datasource/mongo/heartbeat/cache/heartbeat_test.go | 137 ++++++++++
datasource/mongo/heartbeat/cache/heartbeatcache.go | 115 +++++++++
.../mongo/heartbeat/cache/heartbeatcache_test.go | 86 +++++++
.../{heartbeatchecker => checker}/heartbeat.go | 6 +-
.../heartbeat_test.go | 14 +-
.../heartbeatchecker.go | 4 +-
.../heartbeatchecker_test.go | 10 +-
datasource/mongo/heartbeat/manager_test.go | 4 +-
datasource/mongo/mongo.go | 16 +-
datasource/mongo/ms.go | 276 ++++++++++-----------
datasource/mongo/rule_util.go | 2 +-
datasource/mongo/sd/listwatch_inner.go | 8 +-
datasource/mongo/sd/listwatch_test.go | 6 +-
datasource/mongo/sd/mongo_cacher.go | 12 +-
datasource/mongo/sd/mongo_cacher_test.go | 6 +-
datasource/mongo/sd/types.go | 16 +-
datasource/mongo/system.go | 6 +-
datasource/mongo/util.go | 2 +-
etc/conf/app.conf | 2 -
etc/conf/app.yaml | 11 +
test/test.go | 2 +-
29 files changed, 851 insertions(+), 286 deletions(-)
diff --git a/datasource/mongo/bootstrap/bootstrap.go b/datasource/mongo/bootstrap/bootstrap.go
index 1dc6119..47b4dcf 100644
--- a/datasource/mongo/bootstrap/bootstrap.go
+++ b/datasource/mongo/bootstrap/bootstrap.go
@@ -21,7 +21,8 @@ import (
_ "github.com/apache/servicecomb-service-center/datasource/mongo"
// heartbeat
- _ "github.com/apache/servicecomb-service-center/datasource/mongo/heartbeat/heartbeatchecker"
+ _ "github.com/apache/servicecomb-service-center/datasource/mongo/heartbeat/cache"
+ _ "github.com/apache/servicecomb-service-center/datasource/mongo/heartbeat/checker"
// events
_ "github.com/apache/servicecomb-service-center/datasource/mongo/event"
diff --git a/datasource/mongo/database.go b/datasource/mongo/database.go
index f98f318..876ec33 100644
--- a/datasource/mongo/database.go
+++ b/datasource/mongo/database.go
@@ -44,7 +44,7 @@ const (
ColumnSchemaID = "schemaid"
ColumnServiceID = "serviceid"
ColumnRuleID = "ruleid"
- ColumnServiceInfo = "serviceinfo"
+ ColumnService = "service"
ColumnProperty = "properties"
ColumnModTime = "modtimestamp"
ColumnEnv = "env"
@@ -57,12 +57,12 @@ const (
ColumnPattern = "pattern"
ColumnDescription = "description"
ColumnRuleType = "ruletype"
- ColumnSchemaInfo = "schemainfo"
+ ColumnSchema = "schema"
ColumnSchemaSummary = "schemasummary"
- ColumnDepInfo = "depinfo"
+ ColumnDep = "dep"
ColumnDependency = "dependency"
- ColumnRuleInfo = "ruleinfo"
- ColumnInstanceInfo = "instanceinfo"
+ ColumnRule = "rule"
+ ColumnInstance = "instance"
ColumnInstanceID = "instanceid"
ColumnConsumerID = "consumerid"
ColumnMongoID = "_id"
@@ -70,7 +70,6 @@ const (
ColumnServiceType = "type"
ColumnServiceKey = "servicekey"
ColumnConsumer = "consumer"
- ColumnDependencyInfo = "dependencyinfo"
ColumnID = "id"
ColumnAccountName = "name"
ColumnRoleName = "name"
@@ -84,10 +83,10 @@ const (
)
type Service struct {
- Domain string
- Project string
- Tags map[string]string
- ServiceInfo *pb.MicroService
+ Domain string
+ Project string
+ Tags map[string]string
+ Service *pb.MicroService
}
type Schema struct {
@@ -95,7 +94,7 @@ type Schema struct {
Project string
ServiceID string
SchemaID string
- SchemaInfo string
+ Schema string
SchemaSummary string
}
@@ -103,22 +102,22 @@ type Rule struct {
Domain string
Project string
ServiceID string
- RuleInfo *pb.ServiceRule
+ Rule *pb.ServiceRule
}
type Instance struct {
- Domain string
- Project string
- RefreshTime time.Time
- InstanceInfo *pb.MicroServiceInstance
+ Domain string
+ Project string
+ RefreshTime time.Time
+ Instance *pb.MicroServiceInstance
}
type ConsumerDep struct {
- Domain string
- Project string
- ConsumerID string
- UUID string
- ConsumerDepInfo *pb.ConsumerDependency
+ Domain string
+ Project string
+ ConsumerID string
+ UUID string
+ ConsumerDep *pb.ConsumerDependency
}
type DependencyRule struct {
@@ -126,7 +125,7 @@ type DependencyRule struct {
Domain string
Project string
ServiceKey *pb.MicroServiceKey
- DepInfo *pb.MicroServiceDependency
+ Dep *pb.MicroServiceDependency
}
type DelDepCacheKey struct {
diff --git a/datasource/mongo/dep.go b/datasource/mongo/dep.go
index 07634e6..1b0c600 100644
--- a/datasource/mongo/dep.go
+++ b/datasource/mongo/dep.go
@@ -54,11 +54,11 @@ func (ds *DataSource) SearchProviderDependency(ctx context.Context, request *dis
}, nil
}
- dr := NewProviderDependencyRelation(ctx, domainProject, provider.ServiceInfo)
+ dr := NewProviderDependencyRelation(ctx, domainProject, provider.Service)
services, err := dr.GetDependencyConsumers(ToDependencyFilterOptions(request)...)
if err != nil {
log.Error(fmt.Sprintf("GetProviderDependencies failed, provider is %s/%s/%s/%s",
- provider.ServiceInfo.Environment, provider.ServiceInfo.AppId, provider.ServiceInfo.ServiceName, provider.ServiceInfo.Version), err)
+ provider.Service.Environment, provider.Service.AppId, provider.Service.ServiceName, provider.Service.Version), err)
return &discovery.GetProDependenciesResponse{
Response: discovery.CreateResponse(discovery.ErrInternal, err.Error()),
}, err
@@ -93,11 +93,11 @@ func (ds *DataSource) SearchConsumerDependency(ctx context.Context, request *dis
}, nil
}
- dr := NewConsumerDependencyRelation(ctx, domainProject, consumer.ServiceInfo)
+ dr := NewConsumerDependencyRelation(ctx, domainProject, consumer.Service)
services, err := dr.GetDependencyProviders(ToDependencyFilterOptions(request)...)
if err != nil {
log.Error(fmt.Sprintf("query consumer failed, consumer is %s/%s/%s/%s",
- consumer.ServiceInfo.Environment, consumer.ServiceInfo.AppId, consumer.ServiceInfo.ServiceName, consumer.ServiceInfo.Version), err)
+ consumer.Service.Environment, consumer.Service.AppId, consumer.Service.ServiceName, consumer.Service.Version), err)
return &discovery.GetConDependenciesResponse{
Response: discovery.CreateResponse(discovery.ErrInternal, err.Error()),
}, err
@@ -109,16 +109,16 @@ func (ds *DataSource) SearchConsumerDependency(ctx context.Context, request *dis
}, nil
}
-func (ds *DataSource) AddOrUpdateDependencies(ctx context.Context, dependencyInfos []*discovery.ConsumerDependency, override bool) (*discovery.Response, error) {
+func (ds *DataSource) AddOrUpdateDependencies(ctx context.Context, dependencys []*discovery.ConsumerDependency, override bool) (*discovery.Response, error) {
domainProject := util.ParseDomainProject(ctx)
- for _, dependencyInfo := range dependencyInfos {
+ for _, dependency := range dependencys {
consumerFlag := util.StringJoin([]string{
- dependencyInfo.Consumer.Environment,
- dependencyInfo.Consumer.AppId,
- dependencyInfo.Consumer.ServiceName,
- dependencyInfo.Consumer.Version}, "/")
- consumerInfo := discovery.DependenciesToKeys([]*discovery.MicroServiceKey{dependencyInfo.Consumer}, domainProject)[0]
- providersInfo := discovery.DependenciesToKeys(dependencyInfo.Providers, domainProject)
+ dependency.Consumer.Environment,
+ dependency.Consumer.AppId,
+ dependency.Consumer.ServiceName,
+ dependency.Consumer.Version}, "/")
+ consumerInfo := discovery.DependenciesToKeys([]*discovery.MicroServiceKey{dependency.Consumer}, domainProject)[0]
+ providersInfo := discovery.DependenciesToKeys(dependency.Providers, domainProject)
rsp := datasource.ParamsChecker(consumerInfo, providersInfo)
if rsp != nil {
@@ -139,18 +139,18 @@ func (ds *DataSource) AddOrUpdateDependencies(ctx context.Context, dependencyInf
return discovery.CreateResponse(discovery.ErrServiceNotExists, fmt.Sprintf("Consumer %s does not exist.", consumerFlag)), nil
}
- dependencyInfo.Override = override
+ dependency.Override = override
if !override {
id := util.GenerateUUID()
domain := util.ParseDomain(ctx)
project := util.ParseProject(ctx)
data := &ConsumerDep{
- Domain: domain,
- Project: project,
- ConsumerID: consumerID,
- UUID: id,
- ConsumerDepInfo: dependencyInfo,
+ Domain: domain,
+ Project: project,
+ ConsumerID: consumerID,
+ UUID: id,
+ ConsumerDep: dependency,
}
insertRes, err := client.GetMongoClient().Insert(ctx, CollectionDep, data)
if err != nil {
@@ -159,7 +159,7 @@ func (ds *DataSource) AddOrUpdateDependencies(ctx context.Context, dependencyInf
}
log.Info(fmt.Sprintf("insert dep to mongodb success %s", insertRes.InsertedID))
}
- err = syncDependencyRule(ctx, domainProject, dependencyInfo)
+ err = syncDependencyRule(ctx, domainProject, dependency)
if err != nil {
return nil, err
}
@@ -216,14 +216,14 @@ func GetOldProviderRules(dep *datasource.Dependency) (*discovery.MicroServiceDep
if err != nil {
return nil, err
}
- return depRule.DepInfo, nil
+ return depRule.Dep, nil
}
func updateDeps(domainProject string, dep *datasource.Dependency) error {
var upsert = true
for _, r := range dep.DeleteDependencyRuleList {
filter := GenerateProviderDependencyRuleKey(domainProject, r)
- _, err := client.GetMongoClient().Update(context.TODO(), CollectionDep, filter, bson.M{"$pull": bson.M{StringBuilder([]string{ColumnDepInfo, ColumnDependency}): dep.Consumer}})
+ _, err := client.GetMongoClient().Update(context.TODO(), CollectionDep, filter, bson.M{"$pull": bson.M{StringBuilder([]string{ColumnDep, ColumnDependency}): dep.Consumer}})
if err != nil {
return err
}
@@ -234,7 +234,7 @@ func updateDeps(domainProject string, dep *datasource.Dependency) error {
for _, r := range dep.CreateDependencyRuleList {
filter := GenerateProviderDependencyRuleKey(domainProject, r)
data := bson.M{
- "$addToSet": bson.M{StringBuilder([]string{ColumnDepInfo, ColumnDependency}): dep.Consumer},
+ "$addToSet": bson.M{StringBuilder([]string{ColumnDep, ColumnDependency}): dep.Consumer},
}
_, err := client.GetMongoClient().Update(context.TODO(), CollectionDep, filter, data, &options.UpdateOptions{Upsert: &upsert})
if err != nil {
@@ -252,7 +252,7 @@ func updateDeps(domainProject string, dep *datasource.Dependency) error {
}
} else {
updateData := bson.M{
- "$set": bson.M{StringBuilder([]string{ColumnDepInfo, ColumnDependency}): dep.ProvidersRule},
+ "$set": bson.M{StringBuilder([]string{ColumnDep, ColumnDependency}): dep.ProvidersRule},
}
_, err := client.GetMongoClient().Update(context.TODO(), CollectionDep, filter, updateData, &options.UpdateOptions{Upsert: &upsert})
if err != nil {
@@ -364,7 +364,7 @@ func removeProviderDeps(ctx context.Context, depRule *DependencyRule, cache map[
func removeConsumerDeps(ctx context.Context, depRule *DependencyRule, cache map[*DelDepCacheKey]bool) (err error) {
var left []*discovery.MicroServiceKey
- for _, key := range depRule.DepInfo.Dependency {
+ for _, key := range depRule.Dep.Dependency {
if key.ServiceName == "*" {
left = append(left, key)
continue
@@ -387,7 +387,7 @@ func removeConsumerDeps(ctx context.Context, depRule *DependencyRule, cache map[
left = append(left, key)
}
}
- if len(depRule.DepInfo.Dependency) == len(left) {
+ if len(depRule.Dep.Dependency) == len(left) {
return nil
}
@@ -398,7 +398,7 @@ func removeConsumerDeps(ctx context.Context, depRule *DependencyRule, cache map[
_, err = client.GetMongoClient().DocDelete(ctx, CollectionDep, filter)
} else {
updateData := bson.M{
- "$set": bson.M{StringBuilder([]string{ColumnDepInfo, ColumnDependency}): left},
+ "$set": bson.M{StringBuilder([]string{ColumnDep, ColumnDependency}): left},
}
_, err = client.GetMongoClient().Update(ctx, CollectionDep, filter, updateData)
}
@@ -422,7 +422,7 @@ func TransferToMicroServiceDependency(ctx context.Context, filter bson.M) (*disc
if err != nil {
return nil, err
}
- microServiceDependency.Dependency = append(microServiceDependency.Dependency, depRule.DepInfo.Dependency...)
+ microServiceDependency.Dependency = append(microServiceDependency.Dependency, depRule.Dep.Dependency...)
return microServiceDependency, nil
}
return microServiceDependency, nil
@@ -442,7 +442,7 @@ func GetConsumerDepInfo(ctx context.Context, filter bson.M) ([]*discovery.Consum
if err != nil {
return nil, err
}
- ConsumerDeps = append(ConsumerDeps, dep.ConsumerDepInfo)
+ ConsumerDeps = append(ConsumerDeps, dep.ConsumerDep)
}
return ConsumerDeps, nil
}
@@ -464,7 +464,7 @@ func getServiceID(ctx context.Context, filter bson.M) (serviceID string, err err
return
}
if svc != nil {
- serviceID = svc.ServiceInfo.ServiceId
+ serviceID = svc.Service.ServiceId
return
}
return
diff --git a/datasource/mongo/dependency_query.go b/datasource/mongo/dependency_query.go
index c245dbe..a83b35c 100644
--- a/datasource/mongo/dependency_query.go
+++ b/datasource/mongo/dependency_query.go
@@ -103,7 +103,7 @@ func (dr *DependencyRelation) GetDependencyProviders(opts ...DependencyRelationF
if op.NonSelf && providerID == dr.consumer.ServiceId {
continue
}
- services = append(services, provider.ServiceInfo)
+ services = append(services, provider.Service)
}
if key.ServiceName == "*" {
break
@@ -197,8 +197,8 @@ func (dr *DependencyRelation) GetConsumerOfSameServiceNameAndAppID(provider *pb.
continue
}
}
- if len(depRule.DepInfo.Dependency) > 0 {
- allConsumers = append(allConsumers, depRule.DepInfo.Dependency...)
+ if len(depRule.Dep.Dependency) > 0 {
+ allConsumers = append(allConsumers, depRule.Dep.Dependency...)
}
}
return allConsumers, nil
@@ -244,8 +244,8 @@ func (dr *DependencyRelation) GetServiceByMicroServiceKey(service *pb.MicroServi
if err != nil {
return nil, err
}
- if service.ServiceInfo != nil {
- return service.ServiceInfo, nil
+ if service.Service != nil {
+ return service.Service, nil
}
}
return nil, nil
@@ -327,7 +327,7 @@ func (dr *DependencyRelation) parseDependencyRule(dependencyRule *pb.MicroServic
if err != nil {
return nil, err
}
- serviceIDs = append(serviceIDs, service.ServiceInfo.ServiceId)
+ serviceIDs = append(serviceIDs, service.Service.ServiceId)
}
default:
serviceIDs, _, err = FindServiceIds(dr.ctx, dependencyRule.Version, dependencyRule)
@@ -366,10 +366,10 @@ func MicroServiceKeyFilter(key *pb.MicroServiceKey) (bson.M, error) {
return bson.M{
ColumnDomain: tenant[0],
ColumnProject: tenant[1],
- StringBuilder([]string{ColumnServiceInfo, ColumnEnv}): key.Environment,
- StringBuilder([]string{ColumnServiceInfo, ColumnAppID}): key.AppId,
- StringBuilder([]string{ColumnServiceInfo, ColumnAlias}): key.Alias,
- StringBuilder([]string{ColumnServiceInfo, ColumnVersion}): key.Version}, nil
+ StringBuilder([]string{ColumnService, ColumnEnv}): key.Environment,
+ StringBuilder([]string{ColumnService, ColumnAppID}): key.AppId,
+ StringBuilder([]string{ColumnService, ColumnAlias}): key.Alias,
+ StringBuilder([]string{ColumnService, ColumnVersion}): key.Version}, nil
}
func RelyAllServiceKey(key *pb.MicroServiceKey) (bson.M, error) {
@@ -380,7 +380,7 @@ func RelyAllServiceKey(key *pb.MicroServiceKey) (bson.M, error) {
return bson.M{
ColumnDomain: tenant[0],
ColumnProject: tenant[1],
- StringBuilder([]string{ColumnServiceInfo, ColumnEnv}): key.Environment}, nil
+ StringBuilder([]string{ColumnService, ColumnEnv}): key.Environment}, nil
}
func FindServiceIds(ctx context.Context, versionRule string, key *pb.MicroServiceKey) ([]string, bool, error) {
@@ -396,8 +396,8 @@ func FindServiceIds(ctx context.Context, versionRule string, key *pb.MicroServic
baseFilter := bson.D{
{Key: ColumnDomain, Value: tenant[0]},
{Key: ColumnProject, Value: tenant[1]},
- {Key: StringBuilder([]string{ColumnServiceInfo, ColumnEnv}), Value: key.Environment},
- {Key: StringBuilder([]string{ColumnServiceInfo, ColumnAppID}), Value: key.AppId}}
+ {Key: StringBuilder([]string{ColumnService, ColumnEnv}), Value: key.Environment},
+ {Key: StringBuilder([]string{ColumnService, ColumnAppID}), Value: key.AppId}}
serviceIds, exist, err := findServiceKeysByServiceName(ctx, versionRule, key, baseFilter)
if err != nil {
@@ -444,13 +444,13 @@ func serviceVersionFilter(ctx context.Context, versionRule string, filter bson.D
func findServiceKeysByServiceName(ctx context.Context, versionRule string, key *pb.MicroServiceKey, baseFilter bson.D) ([]string, bool, error) {
filter := append(baseFilter,
- bson.E{Key: StringBuilder([]string{ColumnServiceInfo, ColumnServiceName}), Value: key.ServiceName})
+ bson.E{Key: StringBuilder([]string{ColumnService, ColumnServiceName}), Value: key.ServiceName})
return serviceVersionFilter(ctx, versionRule, filter)
}
func findServiceKeysByAlias(ctx context.Context, versionRule string, key *pb.MicroServiceKey, baseFilter bson.D) ([]string, bool, error) {
filter := append(baseFilter,
- bson.E{Key: StringBuilder([]string{ColumnServiceInfo, ColumnAlias}), Value: key.Alias})
+ bson.E{Key: StringBuilder([]string{ColumnService, ColumnAlias}), Value: key.Alias})
return serviceVersionFilter(ctx, versionRule, filter)
}
@@ -463,15 +463,15 @@ func findServiceKeys(ctx context.Context, versionRule string, filter bson.D) (fi
return GetVersionServiceLatest, filter
case versionRule[len(versionRule)-1:] == "+":
start := versionRule[:len(versionRule)-1]
- filter = append(filter, bson.E{Key: StringBuilder([]string{ColumnServiceInfo, ColumnVersion}), Value: bson.M{"$gte": start}})
+ filter = append(filter, bson.E{Key: StringBuilder([]string{ColumnService, ColumnVersion}), Value: bson.M{"$gte": start}})
return GetVersionService, filter
case rangeIdx > 0:
start := versionRule[:rangeIdx]
end := versionRule[rangeIdx+1:]
- filter = append(filter, bson.E{Key: StringBuilder([]string{ColumnServiceInfo, ColumnVersion}), Value: bson.M{"$gte": start, "$lt": end}})
+ filter = append(filter, bson.E{Key: StringBuilder([]string{ColumnService, ColumnVersion}), Value: bson.M{"$gte": start, "$lt": end}})
return GetVersionService, filter
default:
- filter = append(filter, bson.E{Key: StringBuilder([]string{ColumnServiceInfo, ColumnVersion}), Value: versionRule})
+ filter = append(filter, bson.E{Key: StringBuilder([]string{ColumnService, ColumnVersion}), Value: versionRule})
return nil, filter
}
}
@@ -479,7 +479,7 @@ func findServiceKeys(ctx context.Context, versionRule string, filter bson.D) (fi
func GetVersionServiceLatest(ctx context.Context, m bson.D) (serviceIds []string, err error) {
findRes, err := client.GetMongoClient().Find(ctx, CollectionService, m,
&options.FindOptions{
- Sort: bson.M{StringBuilder([]string{ColumnServiceInfo, ColumnVersion}): -1}})
+ Sort: bson.M{StringBuilder([]string{ColumnService, ColumnVersion}): -1}})
if err != nil {
return nil, err
}
@@ -492,7 +492,7 @@ func GetVersionServiceLatest(ctx context.Context, m bson.D) (serviceIds []string
if err != nil {
return
}
- serviceIds = append(serviceIds, service.ServiceInfo.ServiceId)
+ serviceIds = append(serviceIds, service.Service.ServiceId)
if serviceIds != nil {
return
}
@@ -502,7 +502,7 @@ func GetVersionServiceLatest(ctx context.Context, m bson.D) (serviceIds []string
func GetVersionService(ctx context.Context, m bson.D) (serviceIds []string, err error) {
findRes, err := client.GetMongoClient().Find(ctx, CollectionService, m, &options.FindOptions{
- Sort: bson.M{StringBuilder([]string{ColumnServiceInfo, ColumnVersion}): -1}})
+ Sort: bson.M{StringBuilder([]string{ColumnService, ColumnVersion}): -1}})
if err != nil {
return
}
@@ -515,7 +515,7 @@ func GetVersionService(ctx context.Context, m bson.D) (serviceIds []string, err
if err != nil {
return
}
- serviceIds = append(serviceIds, service.ServiceInfo.ServiceId)
+ serviceIds = append(serviceIds, service.Service.ServiceId)
}
return
}
@@ -541,7 +541,7 @@ func ParseVersionRule(ctx context.Context, versionRule string, key *pb.MicroServ
filter := bson.M{
ColumnDomain: tenant[0],
ColumnProject: tenant[1],
- StringBuilder([]string{ColumnServiceInfo, ColumnVersion}): bson.M{"$gte": start}}
+ StringBuilder([]string{ColumnService, ColumnVersion}): bson.M{"$gte": start}}
return GetFilterVersionService(ctx, filter)
case rangeIdx > 0:
start := versionRule[:rangeIdx]
@@ -549,7 +549,7 @@ func ParseVersionRule(ctx context.Context, versionRule string, key *pb.MicroServ
filter := bson.M{
ColumnDomain: tenant[0],
ColumnProject: tenant[1],
- StringBuilder([]string{ColumnServiceInfo, ColumnVersion}): bson.M{"$gte": start, "$lte": end}}
+ StringBuilder([]string{ColumnService, ColumnVersion}): bson.M{"$gte": start, "$lte": end}}
return GetFilterVersionService(ctx, filter)
default:
return nil, nil
@@ -570,7 +570,7 @@ func GetFilterVersionService(ctx context.Context, m bson.M) (serviceIDs []string
if err != nil {
return nil, err
}
- serviceIDs = append(serviceIDs, service.ServiceInfo.ServiceId)
+ serviceIDs = append(serviceIDs, service.Service.ServiceId)
}
return
}
@@ -578,7 +578,7 @@ func GetFilterVersionService(ctx context.Context, m bson.M) (serviceIDs []string
func GetFilterVersionServiceLatest(ctx context.Context, m bson.M) (serviceIDs []string, err error) {
findRes, err := client.GetMongoClient().Find(ctx, CollectionService, m,
&options.FindOptions{
- Sort: bson.M{StringBuilder([]string{ColumnServiceInfo, ColumnVersion}): -1}})
+ Sort: bson.M{StringBuilder([]string{ColumnService, ColumnVersion}): -1}})
if err != nil {
return nil, err
}
@@ -591,7 +591,7 @@ func GetFilterVersionServiceLatest(ctx context.Context, m bson.M) (serviceIDs []
if err != nil {
return nil, err
}
- serviceIDs = append(serviceIDs, service.ServiceInfo.ServiceId)
+ serviceIDs = append(serviceIDs, service.Service.ServiceId)
if serviceIDs != nil {
return serviceIDs, nil
}
diff --git a/datasource/mongo/engine.go b/datasource/mongo/engine.go
index e7b799a..07bca41 100644
--- a/datasource/mongo/engine.go
+++ b/datasource/mongo/engine.go
@@ -216,7 +216,7 @@ func GetAllServicesAcrossDomainProject(ctx context.Context) (map[string][]*pb.Mi
return nil, err
}
domainProject := mongoService.Domain + "/" + mongoService.Project
- services[domainProject] = append(services[domainProject], mongoService.ServiceInfo)
+ services[domainProject] = append(services[domainProject], mongoService.Service)
}
return services, nil
}
diff --git a/datasource/mongo/event/instance_event_handler.go b/datasource/mongo/event/instance_event_handler.go
index c11328c..75b846f 100644
--- a/datasource/mongo/event/instance_event_handler.go
+++ b/datasource/mongo/event/instance_event_handler.go
@@ -49,13 +49,13 @@ func (h InstanceEventHandler) Type() string {
func (h InstanceEventHandler) OnEvent(evt sd.MongoEvent) {
action := evt.Type
instance := evt.Value.(sd.Instance)
- providerID := instance.InstanceInfo.ServiceId
- providerInstanceID := instance.InstanceInfo.InstanceId
+ providerID := instance.Instance.ServiceId
+ providerInstanceID := instance.Instance.InstanceId
domainProject := instance.Domain + "/" + instance.Project
cacheService := sd.Store().Service().Cache().Get(providerID)
var microService *discovery.MicroService
if cacheService != nil {
- microService = cacheService.(sd.Service).ServiceInfo
+ microService = cacheService.(sd.Service).Service
}
if microService == nil {
log.Info("get cached service failed, then get from database")
@@ -68,10 +68,10 @@ func (h InstanceEventHandler) OnEvent(evt sd.MongoEvent) {
}
return
}
- microService = service.ServiceInfo // service in the cache may not ready, query from db once
+ microService = service.Service // service in the cache may not ready, query from db once
if microService == nil {
log.Warn(fmt.Sprintf("caught [%s] instance[%s/%s] event, endpoints %v, get provider's file failed from db\n",
- action, providerID, providerInstanceID, instance.InstanceInfo.Endpoints))
+ action, providerID, providerInstanceID, instance.Instance.Endpoints))
return
}
}
@@ -100,7 +100,7 @@ func PublishInstanceEvent(evt sd.MongoEvent, domainProject string, serviceKey *d
Response: discovery.CreateResponse(discovery.ResponseSuccess, "Watch instance successfully."),
Action: string(evt.Type),
Key: serviceKey,
- Instance: evt.Value.(sd.Instance).InstanceInfo,
+ Instance: evt.Value.(sd.Instance).Instance,
}
for _, consumerID := range subscribers {
evt := notify.NewInstanceEventWithTime(consumerID, domainProject, -1, simple.FromTime(time.Now()), response)
@@ -112,7 +112,7 @@ func PublishInstanceEvent(evt sd.MongoEvent, domainProject string, serviceKey *d
}
func NotifySyncerInstanceEvent(event sd.MongoEvent, microService *discovery.MicroService) {
- instance := event.Value.(sd.Instance).InstanceInfo
+ instance := event.Value.(sd.Instance).Instance
log.Info(fmt.Sprintf("instanceId : %s and serviceId : %s in NotifySyncerInstanceEvent", instance.InstanceId, instance.ServiceId))
instanceKey := util.StringJoin([]string{datasource.InstanceKeyPrefix, event.Value.(sd.Instance).Domain,
event.Value.(sd.Instance).Project, instance.ServiceId, instance.InstanceId}, datasource.SPLIT)
diff --git a/datasource/mongo/event/instance_event_handler_test.go b/datasource/mongo/event/instance_event_handler_test.go
index 5f0e168..798afc1 100644
--- a/datasource/mongo/event/instance_event_handler_test.go
+++ b/datasource/mongo/event/instance_event_handler_test.go
@@ -67,7 +67,7 @@ func mongoAssign() sd.MongoEvent {
Endpoints: endPoints,
}
mongoInstance := sd.Instance{}
- mongoInstance.InstanceInfo = &instance
+ mongoInstance.Instance = &instance
mongoInstance.Domain = "default"
mongoInstance.Project = "default"
mongoEvent := sd.MongoEvent{}
@@ -87,7 +87,7 @@ func mongoEventWronServiceId() sd.MongoEvent {
Endpoints: endPoints,
}
mongoInstance := sd.Instance{}
- mongoInstance.InstanceInfo = &instance
+ mongoInstance.Instance = &instance
mongoInstance.Domain = "default"
mongoInstance.Project = "default"
mongoEvent := sd.MongoEvent{}
diff --git a/datasource/mongo/heartbeat/cache/heartbeat.go b/datasource/mongo/heartbeat/cache/heartbeat.go
new file mode 100644
index 0000000..159d8c9
--- /dev/null
+++ b/datasource/mongo/heartbeat/cache/heartbeat.go
@@ -0,0 +1,218 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except request compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to request writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package heartbeatcache
+
+import (
+ "context"
+ "errors"
+ "fmt"
+ "runtime"
+ "time"
+
+ "github.com/patrickmn/go-cache"
+ "go.mongodb.org/mongo-driver/bson"
+
+ "github.com/apache/servicecomb-service-center/datasource/mongo"
+ "github.com/apache/servicecomb-service-center/datasource/mongo/client"
+ "github.com/apache/servicecomb-service-center/pkg/gopool"
+ "github.com/apache/servicecomb-service-center/pkg/log"
+ "github.com/apache/servicecomb-service-center/server/config"
+)
+
+const (
+ defaultTTL = 30
+ defaultCacheCapacity = 10000
+ defaultWorkNum = 10
+ defaultTimeout = 10
+ instanceCheckerInternal = 1 * time.Second
+ ctxTimeout = 5 * time.Second
+)
+
+// Store cache structure
+type instanceHeartbeatInfo struct {
+ serviceID string
+ instanceID string
+ ttl int32
+ lastRefresh time.Time
+}
+
+var (
+ cacheChan chan *instanceHeartbeatInfo
+ instanceHeartbeatStore = cache.New(0, instanceCheckerInternal)
+ workerNum = runtime.NumCPU()
+ heartbeatTaskTimeout = config.GetInt("registry.mongo.heartbeat.timeout", defaultTimeout)
+ ErrHeartbeatTimeout = errors.New("heartbeat task waiting for processing timeout. ")
+)
+
+func init() {
+ capacity := config.GetInt("registry.mongo.heartbeat.cacheCapacity", defaultCacheCapacity)
+ cacheChan = make(chan *instanceHeartbeatInfo, capacity)
+ num := config.GetInt("registry.mongo.heartbeat.workerNum", defaultWorkNum)
+ if num != 0 {
+ workerNum = num
+ }
+ instanceHeartbeatStore.OnEvicted(func(k string, v interface{}) {
+ instanceInfo, ok := v.(*instanceHeartbeatInfo)
+ if ok && instanceInfo != nil {
+ ctx, cancel := context.WithTimeout(context.Background(), ctxTimeout)
+ defer cancel()
+ err := cleanInstance(ctx, instanceInfo.serviceID, instanceInfo.instanceID)
+ if err != nil {
+ log.Error("failed to cleanInstance in mongodb.", err)
+ }
+ }
+ })
+
+ for i := 1; i <= workerNum; i++ {
+ gopool.Go(func(ctx context.Context) {
+ for {
+ select {
+ case <-ctx.Done():
+ log.Warn("heartbeat work protocol exit.")
+ return
+ case heartbeatInfo, ok := <-cacheChan:
+ if ok {
+ instanceHeartbeatStore.Set(heartbeatInfo.instanceID, heartbeatInfo, time.Duration(heartbeatInfo.ttl)*time.Second)
+ }
+ }
+ }
+ })
+ }
+}
+
+func addHeartbeatTask(serviceID string, instanceID string, ttl int32) error {
+ // Unassigned setting default value is 30s
+ if ttl <= 0 {
+ ttl = defaultTTL
+ }
+ newInstance := &instanceHeartbeatInfo{
+ serviceID: serviceID,
+ instanceID: instanceID,
+ ttl: ttl,
+ lastRefresh: time.Now(),
+ }
+ select {
+ case cacheChan <- newInstance:
+ return nil
+ case <-time.After(time.Duration(heartbeatTaskTimeout) * time.Second):
+ log.Warn("the heartbeat's channel is full. ")
+ return ErrHeartbeatTimeout
+ }
+}
+
+func RemoveCacheInstance(instanceID string) {
+ instanceHeartbeatStore.Delete(instanceID)
+}
+
+func cleanInstance(ctx context.Context, serviceID string, instanceID string) error {
+ session, err := client.GetMongoClient().StartSession(ctx)
+ if err != nil {
+ return err
+ }
+ if err = session.StartTransaction(); err != nil {
+ return err
+ }
+ defer session.EndSession(ctx)
+
+ filter := bson.M{
+ mongo.StringBuilder([]string{mongo.ColumnInstance, mongo.ColumnServiceID}): serviceID,
+ mongo.StringBuilder([]string{mongo.ColumnInstance, mongo.ColumnInstanceID}): instanceID,
+ }
+
+ result, err := client.GetMongoClient().FindOne(ctx, mongo.CollectionInstance, filter)
+ if err != nil {
+ log.Error("failed to query instance: %v", err)
+ return err
+ }
+ var ins mongo.Instance
+ err = result.Decode(&ins)
+ if err != nil {
+ log.Error("decode instance failed: %v", err)
+ return err
+ }
+ ttl := ins.Instance.HealthCheck.Interval * (ins.Instance.HealthCheck.Times + 1)
+ if ttl <= 0 {
+ ttl = defaultTTL
+ }
+ if isOutDate(ins.RefreshTime, ttl) {
+ return nil
+ }
+ err = removeDBInstance(ctx, ins.Instance.ServiceId, ins.Instance.InstanceId)
+ if err != nil {
+ log.Error("fail to remote instance in db: %v", err)
+ errAbort := session.AbortTransaction(ctx)
+ if errAbort != nil {
+ return errAbort
+ }
+ return err
+ }
+ err = session.CommitTransaction(ctx)
+ return err
+}
+
+func removeDBInstance(ctx context.Context, serviceID string, instanceID string) error {
+ filter := bson.M{
+ mongo.StringBuilder([]string{mongo.ColumnInstance, mongo.ColumnServiceID}): serviceID,
+ mongo.StringBuilder([]string{mongo.ColumnInstance, mongo.ColumnInstanceID}): instanceID,
+ }
+ res, err := client.GetMongoClient().DeleteOne(ctx, mongo.CollectionInstance, filter)
+ if err != nil {
+ log.Error("failed to clean instance", err)
+ return err
+ }
+ log.Info(fmt.Sprintf("delete from mongodb:%+v", res))
+ return nil
+}
+
+func findInstance(ctx context.Context, serviceID string, instanceID string) (*mongo.Instance, error) {
+ filter := bson.M{
+ mongo.StringBuilder([]string{mongo.ColumnInstance, mongo.ColumnServiceID}): serviceID,
+ mongo.StringBuilder([]string{mongo.ColumnInstance, mongo.ColumnInstanceID}): instanceID,
+ }
+ result, err := client.GetMongoClient().FindOne(ctx, mongo.CollectionInstance, filter)
+ if err != nil {
+ return nil, err
+ }
+ var ins mongo.Instance
+ err = result.Decode(&ins)
+ if err != nil {
+ log.Error("decode instance failed: ", err)
+ return nil, err
+ }
+ return &ins, nil
+}
+
+func updateInstance(ctx context.Context, serviceID string, instanceID string) error {
+ filter := bson.M{
+ mongo.StringBuilder([]string{mongo.ColumnInstance, mongo.ColumnServiceID}): serviceID,
+ mongo.StringBuilder([]string{mongo.ColumnInstance, mongo.ColumnInstanceID}): instanceID,
+ }
+ update := bson.M{
+ "$set": bson.M{mongo.ColumnRefreshTime: time.Now()},
+ }
+ result, err := client.GetMongoClient().FindOneAndUpdate(ctx, mongo.CollectionInstance, filter, update)
+ if err != nil {
+ log.Error("failed to update refresh time of instance: ", err)
+ return err
+ }
+ return result.Err()
+}
+
+func isOutDate(refreshTime time.Time, ttl int32) bool {
+ return time.Since(refreshTime) <= time.Duration(ttl)*time.Second
+}
diff --git a/datasource/mongo/heartbeat/cache/heartbeat_test.go b/datasource/mongo/heartbeat/cache/heartbeat_test.go
new file mode 100644
index 0000000..6d7670c
--- /dev/null
+++ b/datasource/mongo/heartbeat/cache/heartbeat_test.go
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except request compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to request writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package heartbeatcache
+
+import (
+ _ "github.com/apache/servicecomb-service-center/server/init"
+)
+
+import (
+ "context"
+ "testing"
+ "time"
+
+ pb "github.com/go-chassis/cari/discovery"
+ "github.com/go-chassis/go-chassis/v2/storage"
+ "github.com/stretchr/testify/assert"
+
+ "github.com/apache/servicecomb-service-center/datasource/mongo"
+ "github.com/apache/servicecomb-service-center/datasource/mongo/client"
+)
+
+func init() {
+ config := storage.Options{
+ URI: "mongodb://localhost:27017",
+ }
+ client.NewMongoClient(config)
+}
+
+func TestAddCacheInstance(t *testing.T) {
+ t.Run("add cache instance: set the ttl to 2 seconds", func(t *testing.T) {
+ instance1 := mongo.Instance{
+ RefreshTime: time.Now(),
+ Instance: &pb.MicroServiceInstance{
+ InstanceId: "instanceID1",
+ ServiceId: "serviceID1",
+ HealthCheck: &pb.HealthCheck{
+ Interval: 1,
+ Times: 1,
+ },
+ },
+ }
+ err := addHeartbeatTask(instance1.Instance.ServiceId, instance1.Instance.InstanceId, instance1.Instance.HealthCheck.Interval*(instance1.Instance.HealthCheck.Times+1))
+ assert.Equal(t, nil, err)
+ _, err = client.GetMongoClient().Insert(context.Background(), mongo.CollectionInstance, instance1)
+ assert.Equal(t, nil, err)
+ info, ok := instanceHeartbeatStore.Get(instance1.Instance.InstanceId)
+ assert.Equal(t, true, ok)
+ if ok {
+ heartBeatInfo := info.(*instanceHeartbeatInfo)
+ assert.Equal(t, instance1.Instance.InstanceId, heartBeatInfo.instanceID)
+ assert.Equal(t, instance1.Instance.HealthCheck.Interval*(instance1.Instance.HealthCheck.Times+1), heartBeatInfo.ttl)
+ }
+ time.Sleep(2 * time.Second)
+ _, ok = instanceHeartbeatStore.Get(instance1.Instance.InstanceId)
+ assert.Equal(t, false, ok)
+ _, err = client.GetMongoClient().Delete(context.Background(), mongo.CollectionInstance, instance1)
+ assert.Equal(t, nil, err)
+ })
+
+ t.Run("add cache instance: do not set interval time", func(t *testing.T) {
+ instance1 := mongo.Instance{
+ RefreshTime: time.Now(),
+ Instance: &pb.MicroServiceInstance{
+ InstanceId: "instanceID1",
+ ServiceId: "serviceID1",
+ HealthCheck: &pb.HealthCheck{
+ Interval: 0,
+ Times: 0,
+ },
+ },
+ }
+ err := addHeartbeatTask(instance1.Instance.ServiceId, instance1.Instance.InstanceId, instance1.Instance.HealthCheck.Interval*(instance1.Instance.HealthCheck.Times+1))
+ assert.Equal(t, nil, err)
+ _, err = client.GetMongoClient().Insert(context.Background(), mongo.CollectionInstance, instance1)
+ assert.Equal(t, nil, err)
+ info, ok := instanceHeartbeatStore.Get(instance1.Instance.InstanceId)
+ assert.Equal(t, true, ok)
+ if ok {
+ heartBeatInfo := info.(*instanceHeartbeatInfo)
+ assert.Equal(t, instance1.Instance.InstanceId, heartBeatInfo.instanceID)
+ assert.Equal(t, int32(defaultTTL), heartBeatInfo.ttl)
+ }
+ time.Sleep(defaultTTL * time.Second)
+ _, ok = instanceHeartbeatStore.Get(instance1.Instance.InstanceId)
+ assert.Equal(t, false, ok)
+ _, err = client.GetMongoClient().Delete(context.Background(), mongo.CollectionInstance, instance1)
+ assert.Equal(t, nil, err)
+ })
+}
+
+func TestRemoveCacheInstance(t *testing.T) {
+ t.Run("remove cache instance: the instance has cache and can be deleted successfully", func(t *testing.T) {
+ instance1 := mongo.Instance{
+ RefreshTime: time.Now(),
+ Instance: &pb.MicroServiceInstance{
+ InstanceId: "instanceID1",
+ ServiceId: "serviceID1",
+ HealthCheck: &pb.HealthCheck{
+ Interval: 3,
+ Times: 1,
+ },
+ },
+ }
+ err := addHeartbeatTask(instance1.Instance.ServiceId, instance1.Instance.InstanceId, instance1.Instance.HealthCheck.Interval*(instance1.Instance.HealthCheck.Times+1))
+ assert.Equal(t, nil, err)
+ _, err = client.GetMongoClient().Insert(context.Background(), mongo.CollectionInstance, instance1)
+ assert.Equal(t, nil, err)
+ info, ok := instanceHeartbeatStore.Get(instance1.Instance.InstanceId)
+ assert.Equal(t, true, ok)
+ if ok {
+ heartBeatInfo := info.(*instanceHeartbeatInfo)
+ assert.Equal(t, instance1.Instance.InstanceId, heartBeatInfo.instanceID)
+ assert.Equal(t, instance1.Instance.HealthCheck.Interval*(instance1.Instance.HealthCheck.Times+1), heartBeatInfo.ttl)
+ }
+ time.Sleep(4 * time.Second)
+ RemoveCacheInstance(instance1.Instance.InstanceId)
+ _, ok = instanceHeartbeatStore.Get(instance1.Instance.InstanceId)
+ assert.Equal(t, false, ok)
+ _, err = client.GetMongoClient().Delete(context.Background(), mongo.CollectionInstance, instance1)
+ assert.Equal(t, nil, err)
+ })
+}
diff --git a/datasource/mongo/heartbeat/cache/heartbeatcache.go b/datasource/mongo/heartbeat/cache/heartbeatcache.go
new file mode 100644
index 0000000..46588e9
--- /dev/null
+++ b/datasource/mongo/heartbeat/cache/heartbeatcache.go
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package heartbeatcache
+
+import (
+ "context"
+ "errors"
+ "fmt"
+
+ pb "github.com/go-chassis/cari/discovery"
+
+ "github.com/apache/servicecomb-service-center/datasource/mongo/heartbeat"
+ "github.com/apache/servicecomb-service-center/pkg/log"
+ "github.com/apache/servicecomb-service-center/pkg/util"
+)
+
+var ErrHeartbeatConversionFailed = errors.New("instanceHeartbeatInfo type conversion failed. ")
+
+func init() {
+ heartbeat.Install("cache", NewHeartBeatCheck)
+}
+
+type HeartBeatCheck struct {
+}
+
+func NewHeartBeatCheck(opts heartbeat.Options) (heartbeat.HealthCheck, error) {
+ return &HeartBeatCheck{}, nil
+}
+
+func (h *HeartBeatCheck) Heartbeat(ctx context.Context, request *pb.HeartbeatRequest) (*pb.HeartbeatResponse, error) {
+ if ins, ok := instanceHeartbeatStore.Get(request.InstanceId); ok {
+ return inCacheStrategy(ctx, request, ins)
+ } else {
+ return notInCacheStrategy(ctx, request)
+ }
+}
+
+func inCacheStrategy(ctx context.Context, request *pb.HeartbeatRequest, insHeartbeatInfo interface{}) (*pb.HeartbeatResponse, error) {
+ remoteIP := util.GetIPFromContext(ctx)
+ heartbeatInfo, ok := insHeartbeatInfo.(*instanceHeartbeatInfo)
+ if !ok {
+ log.Error("type conversion failed: %v", ErrHeartbeatConversionFailed)
+ resp := &pb.HeartbeatResponse{
+ Response: pb.CreateResponseWithSCErr(pb.NewError(pb.ErrInstanceNotExists, ErrHeartbeatConversionFailed.Error())),
+ }
+ return resp, ErrHeartbeatConversionFailed
+ }
+ err := addHeartbeatTask(request.ServiceId, request.InstanceId, heartbeatInfo.ttl)
+ if err != nil {
+ log.Error(fmt.Sprintf("heartbeat failed, instance[%s]. operator %s", request.InstanceId, remoteIP), err)
+ resp := &pb.HeartbeatResponse{
+ Response: pb.CreateResponseWithSCErr(pb.NewError(pb.ErrNotEnoughQuota, err.Error())),
+ }
+ return resp, err
+ }
+ err = updateInstance(ctx, request.ServiceId, request.InstanceId)
+ if err != nil {
+ log.Error(fmt.Sprintf("heartbeat failed, instance[%s]. operator %s", request.InstanceId, remoteIP), err)
+ resp := &pb.HeartbeatResponse{
+ Response: pb.CreateResponseWithSCErr(pb.NewError(pb.ErrInstanceNotExists, err.Error())),
+ }
+ return resp, err
+ }
+ return &pb.HeartbeatResponse{
+ Response: pb.CreateResponse(pb.ResponseSuccess, "update service instance heartbeat successfully"),
+ }, nil
+}
+
+func notInCacheStrategy(ctx context.Context, request *pb.HeartbeatRequest) (*pb.HeartbeatResponse, error) {
+ remoteIP := util.GetIPFromContext(ctx)
+ instance, err := findInstance(ctx, request.ServiceId, request.InstanceId)
+ if err != nil {
+ log.Error(fmt.Sprintf("heartbeat failed, instance[%s]. operator %s", request.InstanceId, remoteIP), err)
+ resp := &pb.HeartbeatResponse{
+ Response: pb.CreateResponseWithSCErr(pb.NewError(pb.ErrInstanceNotExists, err.Error())),
+ }
+ return resp, err
+ }
+ interval, times := instance.Instance.HealthCheck.Interval, instance.Instance.HealthCheck.Times
+ err = addHeartbeatTask(request.ServiceId, request.InstanceId, interval*(times+1))
+ if err != nil {
+ log.Error(fmt.Sprintf("heartbeat failed, instance[%s]. operator %s", request.InstanceId, remoteIP), err)
+ resp := &pb.HeartbeatResponse{
+ Response: pb.CreateResponseWithSCErr(pb.NewError(pb.ErrNotEnoughQuota, err.Error())),
+ }
+ return resp, err
+ }
+ err = updateInstance(ctx, request.ServiceId, request.InstanceId)
+ if err != nil {
+ RemoveCacheInstance(request.InstanceId)
+ log.Error(fmt.Sprintf("heartbeat failed, instance[%s]. operator %s", request.InstanceId, remoteIP), err)
+ resp := &pb.HeartbeatResponse{
+ Response: pb.CreateResponseWithSCErr(pb.NewError(pb.ErrInstanceNotExists, err.Error())),
+ }
+ return resp, err
+ }
+ return &pb.HeartbeatResponse{
+ Response: pb.CreateResponse(pb.ResponseSuccess, "update service instance heartbeat successfully"),
+ }, nil
+}
diff --git a/datasource/mongo/heartbeat/cache/heartbeatcache_test.go b/datasource/mongo/heartbeat/cache/heartbeatcache_test.go
new file mode 100644
index 0000000..58980d6
--- /dev/null
+++ b/datasource/mongo/heartbeat/cache/heartbeatcache_test.go
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except request compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to request writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package heartbeatcache
+
+import (
+ "context"
+ "testing"
+ "time"
+
+ pb "github.com/go-chassis/cari/discovery"
+ "github.com/stretchr/testify/assert"
+ "go.mongodb.org/mongo-driver/bson"
+
+ "github.com/apache/servicecomb-service-center/datasource/mongo"
+ "github.com/apache/servicecomb-service-center/datasource/mongo/client"
+)
+
+func TestHeartBeatCheck(t *testing.T) {
+ t.Run("heartbeat check: instance does not exist,it should be failed", func(t *testing.T) {
+ heartBeatCheck := &HeartBeatCheck{}
+ resp, err := heartBeatCheck.Heartbeat(context.Background(), &pb.HeartbeatRequest{
+ ServiceId: "serviceId1",
+ InstanceId: "not-exist-ins",
+ })
+ assert.NotNil(t, err)
+ assert.NotEqual(t, pb.ResponseSuccess, resp.Response.GetCode())
+ })
+
+ t.Run("heartbeat check: data exists in the cache,but not in db,it should be failed", func(t *testing.T) {
+ err := addHeartbeatTask("not-exist-svc", "not-exist-ins", 30)
+ assert.Nil(t, err)
+ heartBeatCheck := &HeartBeatCheck{}
+ resp, err := heartBeatCheck.Heartbeat(context.Background(), &pb.HeartbeatRequest{
+ ServiceId: "serviceId1",
+ InstanceId: "not-exist-ins",
+ })
+ assert.NotNil(t, err)
+ assert.NotEqual(t, pb.ResponseSuccess, resp.Response.GetCode())
+ })
+
+ t.Run("heartbeat check: data exists in the cache and db,it can be update successfully", func(t *testing.T) {
+ heartBeatCheck := &HeartBeatCheck{}
+ instanceDB := mongo.Instance{
+ RefreshTime: time.Now(),
+ Instance: &pb.MicroServiceInstance{
+ InstanceId: "instanceIdDB",
+ ServiceId: "serviceIdDB",
+ HealthCheck: &pb.HealthCheck{
+ Interval: 1,
+ Times: 1,
+ },
+ },
+ }
+ filter := bson.M{
+ mongo.StringBuilder([]string{mongo.ColumnInstance, mongo.ColumnInstanceID}): instanceDB.Instance.InstanceId,
+ }
+ _, _ = client.GetMongoClient().Delete(context.Background(), mongo.CollectionInstance, filter)
+ _, err := client.GetMongoClient().Insert(context.Background(), mongo.CollectionInstance, instanceDB)
+ assert.Equal(t, nil, err)
+ err = addHeartbeatTask(instanceDB.Instance.ServiceId, instanceDB.Instance.InstanceId, instanceDB.Instance.HealthCheck.Interval*(instanceDB.Instance.HealthCheck.Times+1))
+ assert.Equal(t, nil, err)
+ resp, err := heartBeatCheck.Heartbeat(context.Background(), &pb.HeartbeatRequest{
+ ServiceId: "serviceIdDB",
+ InstanceId: "instanceIdDB",
+ })
+ assert.Nil(t, err)
+ assert.Equal(t, pb.ResponseSuccess, resp.Response.GetCode())
+ _, err = client.GetMongoClient().Delete(context.Background(), mongo.CollectionInstance, filter)
+ assert.Nil(t, err)
+ })
+}
diff --git a/datasource/mongo/heartbeat/heartbeatchecker/heartbeat.go b/datasource/mongo/heartbeat/checker/heartbeat.go
similarity index 87%
rename from datasource/mongo/heartbeat/heartbeatchecker/heartbeat.go
rename to datasource/mongo/heartbeat/checker/heartbeat.go
index c5113df..fc949a2 100644
--- a/datasource/mongo/heartbeat/heartbeatchecker/heartbeat.go
+++ b/datasource/mongo/heartbeat/checker/heartbeat.go
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package heartbeatchecker
+package checker
import (
"context"
@@ -30,8 +30,8 @@ import (
func updateInstanceRefreshTime(ctx context.Context, serviceID string, instanceID string) error {
filter := bson.M{
- mongo.StringBuilder([]string{mongo.ColumnInstanceInfo, mongo.ColumnServiceID}): serviceID,
- mongo.StringBuilder([]string{mongo.ColumnInstanceInfo, mongo.ColumnInstanceID}): instanceID,
+ mongo.StringBuilder([]string{mongo.ColumnInstance, mongo.ColumnServiceID}): serviceID,
+ mongo.StringBuilder([]string{mongo.ColumnInstance, mongo.ColumnInstanceID}): instanceID,
}
update := bson.M{
"$set": bson.M{mongo.ColumnRefreshTime: time.Now()},
diff --git a/datasource/mongo/heartbeat/heartbeatchecker/heartbeat_test.go b/datasource/mongo/heartbeat/checker/heartbeat_test.go
similarity index 80%
rename from datasource/mongo/heartbeat/heartbeatchecker/heartbeat_test.go
rename to datasource/mongo/heartbeat/checker/heartbeat_test.go
index 5c6c35f..696203c 100644
--- a/datasource/mongo/heartbeat/heartbeatchecker/heartbeat_test.go
+++ b/datasource/mongo/heartbeat/checker/heartbeat_test.go
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package heartbeatchecker
+package checker
import (
"context"
@@ -49,18 +49,18 @@ func TestUpdateInstanceRefreshTime(t *testing.T) {
t.Run("update instance refresh time: if the instance does exist,the update should succeed", func(t *testing.T) {
instance1 := mongo.Instance{
RefreshTime: time.Now(),
- InstanceInfo: &pb.MicroServiceInstance{
+ Instance: &pb.MicroServiceInstance{
InstanceId: "instanceId1",
ServiceId: "serviceId1",
},
}
_, err := client.GetMongoClient().Insert(context.Background(), mongo.CollectionInstance, instance1)
assert.Equal(t, nil, err)
- err = updateInstanceRefreshTime(context.Background(), instance1.InstanceInfo.ServiceId, instance1.InstanceInfo.InstanceId)
+ err = updateInstanceRefreshTime(context.Background(), instance1.Instance.ServiceId, instance1.Instance.InstanceId)
assert.Equal(t, nil, err)
filter := bson.M{
- mongo.StringBuilder([]string{mongo.ColumnInstanceInfo, mongo.ColumnServiceID}): instance1.InstanceInfo.ServiceId,
- mongo.StringBuilder([]string{mongo.ColumnInstanceInfo, mongo.ColumnInstanceID}): instance1.InstanceInfo.InstanceId,
+ mongo.StringBuilder([]string{mongo.ColumnInstance, mongo.ColumnServiceID}): instance1.Instance.ServiceId,
+ mongo.StringBuilder([]string{mongo.ColumnInstance, mongo.ColumnInstanceID}): instance1.Instance.InstanceId,
}
result, err := client.GetMongoClient().FindOne(context.Background(), mongo.CollectionInstance, filter)
assert.Nil(t, err)
@@ -69,8 +69,8 @@ func TestUpdateInstanceRefreshTime(t *testing.T) {
assert.Nil(t, err)
assert.NotEqual(t, instance1.RefreshTime, ins.RefreshTime)
filter = bson.M{
- mongo.StringBuilder([]string{mongo.ColumnInstanceInfo, mongo.ColumnServiceID}): instance1.InstanceInfo.ServiceId,
- mongo.StringBuilder([]string{mongo.ColumnInstanceInfo, mongo.ColumnInstanceID}): instance1.InstanceInfo.InstanceId,
+ mongo.StringBuilder([]string{mongo.ColumnInstance, mongo.ColumnServiceID}): instance1.Instance.ServiceId,
+ mongo.StringBuilder([]string{mongo.ColumnInstance, mongo.ColumnInstanceID}): instance1.Instance.InstanceId,
}
_, err = client.GetMongoClient().Delete(context.Background(), mongo.CollectionInstance, filter)
assert.Nil(t, err)
diff --git a/datasource/mongo/heartbeat/heartbeatchecker/heartbeatchecker.go b/datasource/mongo/heartbeat/checker/heartbeatchecker.go
similarity index 95%
rename from datasource/mongo/heartbeat/heartbeatchecker/heartbeatchecker.go
rename to datasource/mongo/heartbeat/checker/heartbeatchecker.go
index 03be2af..5183185 100644
--- a/datasource/mongo/heartbeat/heartbeatchecker/heartbeatchecker.go
+++ b/datasource/mongo/heartbeat/checker/heartbeatchecker.go
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package heartbeatchecker
+package checker
import (
"context"
@@ -29,7 +29,7 @@ import (
)
func init() {
- heartbeat.Install("heartbeatchecker", NewHeartBeatChecker)
+ heartbeat.Install("checker", NewHeartBeatChecker)
}
type HeartBeatChecker struct {
diff --git a/datasource/mongo/heartbeat/heartbeatchecker/heartbeatchecker_test.go b/datasource/mongo/heartbeat/checker/heartbeatchecker_test.go
similarity index 88%
rename from datasource/mongo/heartbeat/heartbeatchecker/heartbeatchecker_test.go
rename to datasource/mongo/heartbeat/checker/heartbeatchecker_test.go
index 25d0b89..71e69b8 100644
--- a/datasource/mongo/heartbeat/heartbeatchecker/heartbeatchecker_test.go
+++ b/datasource/mongo/heartbeat/checker/heartbeatchecker_test.go
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package heartbeatchecker
+package checker
import (
"context"
@@ -44,7 +44,7 @@ func TestHeartbeat(t *testing.T) {
t.Run("heartbeat: if the instance does exist,the heartbeat should succeed", func(t *testing.T) {
instance1 := mongo.Instance{
RefreshTime: time.Now(),
- InstanceInfo: &pb.MicroServiceInstance{
+ Instance: &pb.MicroServiceInstance{
InstanceId: "instanceId1",
ServiceId: "serviceId1",
},
@@ -53,13 +53,13 @@ func TestHeartbeat(t *testing.T) {
assert.Equal(t, nil, err)
heartBeatChecker := &HeartBeatChecker{}
resp, err := heartBeatChecker.Heartbeat(context.Background(), &pb.HeartbeatRequest{
- ServiceId: instance1.InstanceInfo.ServiceId,
- InstanceId: instance1.InstanceInfo.InstanceId,
+ ServiceId: instance1.Instance.ServiceId,
+ InstanceId: instance1.Instance.InstanceId,
})
assert.Nil(t, err)
assert.Equal(t, pb.ResponseSuccess, resp.Response.GetCode())
filter := bson.M{
- mongo.StringBuilder([]string{mongo.ColumnInstanceInfo, mongo.ColumnInstanceID}): instance1.InstanceInfo.InstanceId,
+ mongo.StringBuilder([]string{mongo.ColumnInstance, mongo.ColumnInstanceID}): instance1.Instance.InstanceId,
}
_, err = client.GetMongoClient().Delete(context.Background(), mongo.CollectionInstance, filter)
assert.Nil(t, err)
diff --git a/datasource/mongo/heartbeat/manager_test.go b/datasource/mongo/heartbeat/manager_test.go
index 607c3a4..4ef38dd 100644
--- a/datasource/mongo/heartbeat/manager_test.go
+++ b/datasource/mongo/heartbeat/manager_test.go
@@ -23,7 +23,7 @@ import (
"github.com/stretchr/testify/assert"
"github.com/apache/servicecomb-service-center/datasource/mongo/heartbeat"
- _ "github.com/apache/servicecomb-service-center/datasource/mongo/heartbeat/heartbeatchecker"
+ _ "github.com/apache/servicecomb-service-center/datasource/mongo/heartbeat/checker"
)
func TestInit(t *testing.T) {
@@ -33,7 +33,7 @@ func TestInit(t *testing.T) {
assert.Error(t, err)
})
t.Run("install and init heartbeat plugin, should pass", func(t *testing.T) {
- pluginName := heartbeat.ImplName("heartbeatchecker")
+ pluginName := heartbeat.ImplName("checker")
err := heartbeat.Init(heartbeat.Options{
PluginImplName: pluginName,
})
diff --git a/datasource/mongo/mongo.go b/datasource/mongo/mongo.go
index 2248110..4ce4cdc 100644
--- a/datasource/mongo/mongo.go
+++ b/datasource/mongo/mongo.go
@@ -79,7 +79,7 @@ func (ds *DataSource) initialize() error {
}
func (ds *DataSource) initPlugins() error {
- kind := config.GetString("registry.heartbeat.kind", "heartbeatchecker", config.WithStandby("heartbeat_plugin"))
+ kind := config.GetString("registry.mongo.heartbeat.kind", "cache")
err := heartbeat.Init(heartbeat.Options{PluginImplName: heartbeat.ImplName(kind)})
if err != nil {
log.Fatal("heartbeat init failed", err)
@@ -100,16 +100,16 @@ func (ds *DataSource) initClient() error {
}
}
-//{Key: StringBuilder([]string{ColumnServiceInfo, ColumnAlias}), Value: bsonx.Int32(1)}
+//{Key: StringBuilder([]string{ColumnService, ColumnAlias}), Value: bsonx.Int32(1)}
func (ds *DataSource) createIndexes() (err error) {
err = client.GetMongoClient().CreateIndexes(context.TODO(), CollectionService, []mongo.IndexModel{{
- Keys: bsonx.Doc{{Key: StringBuilder([]string{ColumnServiceInfo, ColumnServiceID}), Value: bsonx.Int32(1)}},
+ Keys: bsonx.Doc{{Key: StringBuilder([]string{ColumnService, ColumnServiceID}), Value: bsonx.Int32(1)}},
Options: options.Index().SetUnique(true),
}, {
- Keys: bsonx.Doc{{Key: StringBuilder([]string{ColumnServiceInfo, ColumnAppID}), Value: bsonx.Int32(1)},
- {Key: StringBuilder([]string{ColumnServiceInfo, ColumnServiceName}), Value: bsonx.Int32(1)},
- {Key: StringBuilder([]string{ColumnServiceInfo, ColumnEnv}), Value: bsonx.Int32(1)},
- {Key: StringBuilder([]string{ColumnServiceInfo, ColumnVersion}), Value: bsonx.Int32(1)},
+ Keys: bsonx.Doc{{Key: StringBuilder([]string{ColumnService, ColumnAppID}), Value: bsonx.Int32(1)},
+ {Key: StringBuilder([]string{ColumnService, ColumnServiceName}), Value: bsonx.Int32(1)},
+ {Key: StringBuilder([]string{ColumnService, ColumnEnv}), Value: bsonx.Int32(1)},
+ {Key: StringBuilder([]string{ColumnService, ColumnVersion}), Value: bsonx.Int32(1)},
{Key: ColumnDomain, Value: bsonx.Int32(1)},
{Key: ColumnProject, Value: bsonx.Int32(1)},
},
@@ -119,7 +119,7 @@ func (ds *DataSource) createIndexes() (err error) {
return
}
err = client.GetMongoClient().CreateIndexes(context.TODO(), CollectionInstance, []mongo.IndexModel{{
- Keys: bsonx.Doc{{Key: StringBuilder([]string{ColumnInstanceInfo, ColumnInstanceID}), Value: bsonx.Int32(1)}},
+ Keys: bsonx.Doc{{Key: StringBuilder([]string{ColumnInstance, ColumnInstanceID}), Value: bsonx.Int32(1)}},
Options: options.Index().SetUnique(true),
}, {
Keys: bsonx.Doc{{Key: StringBuilder([]string{ColumnInstanceID, ColumnServiceID}), Value: bsonx.Int32(1)}},
diff --git a/datasource/mongo/ms.go b/datasource/mongo/ms.go
index 0a769ee..eb9cad3 100644
--- a/datasource/mongo/ms.go
+++ b/datasource/mongo/ms.go
@@ -94,7 +94,7 @@ func (ds *DataSource) RegisterService(ctx context.Context, request *discovery.Cr
}, nil
}
}
- insertRes, err := client.GetMongoClient().Insert(ctx, CollectionService, &Service{Domain: domain, Project: project, ServiceInfo: service})
+ insertRes, err := client.GetMongoClient().Insert(ctx, CollectionService, &Service{Domain: domain, Project: project, Service: service})
if err != nil {
if client.IsDuplicateKey(err) {
serviceIDInner, err := GetServiceID(ctx, &discovery.MicroServiceKey{
@@ -163,7 +163,7 @@ func (ds *DataSource) GetApplications(ctx context.Context, request *discovery.Ge
filter := bson.M{
ColumnDomain: domain,
ColumnProject: project,
- StringBuilder([]string{ColumnServiceInfo, ColumnEnv}): request.Environment}
+ StringBuilder([]string{ColumnService, ColumnEnv}): request.Environment}
services, err := GetServices(ctx, filter)
if err != nil {
@@ -212,7 +212,7 @@ func (ds *DataSource) GetService(ctx context.Context, request *discovery.GetServ
}
return &discovery.GetServiceResponse{
Response: discovery.CreateResponse(discovery.ResponseSuccess, "Get service successfully."),
- Service: svc.ServiceInfo,
+ Service: svc.Service,
}, nil
}
@@ -304,7 +304,7 @@ func (ds *DataSource) DelServicePri(ctx context.Context, serviceID string, force
}
// 强制删除,则与该服务相关的信息删除,非强制删除: 如果作为该被依赖(作为provider,提供服务,且不是只存在自依赖)或者存在实例,则不能删除
if !force {
- dr := NewProviderDependencyRelation(ctx, util.ParseDomainProject(ctx), microservice.ServiceInfo)
+ dr := NewProviderDependencyRelation(ctx, util.ParseDomainProject(ctx), microservice.Service)
services, err := dr.GetDependencyConsumerIds()
if err != nil {
log.Error(fmt.Sprintf("delete micro-service[%s] failed, get service dependency failed, operator: %s",
@@ -317,7 +317,7 @@ func (ds *DataSource) DelServicePri(ctx context.Context, serviceID string, force
return discovery.CreateResponse(discovery.ErrDependedOnConsumer, "Can not delete this service, other service rely it."), err
}
//todo wait for dep interface
- instancesExist, err := client.GetMongoClient().DocExist(ctx, CollectionInstance, bson.M{StringBuilder([]string{ColumnInstanceInfo, ColumnServiceID}): serviceID})
+ instancesExist, err := client.GetMongoClient().DocExist(ctx, CollectionInstance, bson.M{StringBuilder([]string{ColumnInstance, ColumnServiceID}): serviceID})
if err != nil {
log.Error(fmt.Sprintf("delete micro-service[%s] failed, get instances number failed, operator: %s",
serviceID, remoteIP), err)
@@ -333,8 +333,8 @@ func (ds *DataSource) DelServicePri(ctx context.Context, serviceID string, force
schemaOps := client.MongoOperation{Table: CollectionSchema, Models: []mongo.WriteModel{mongo.NewDeleteManyModel().SetFilter(bson.M{ColumnServiceID: serviceID})}}
rulesOps := client.MongoOperation{Table: CollectionRule, Models: []mongo.WriteModel{mongo.NewDeleteManyModel().SetFilter(bson.M{ColumnServiceID: serviceID})}}
- instanceOps := client.MongoOperation{Table: CollectionInstance, Models: []mongo.WriteModel{mongo.NewDeleteManyModel().SetFilter(bson.M{StringBuilder([]string{ColumnInstanceInfo, ColumnServiceID}): serviceID})}}
- serviceOps := client.MongoOperation{Table: CollectionService, Models: []mongo.WriteModel{mongo.NewDeleteOneModel().SetFilter(bson.M{StringBuilder([]string{ColumnServiceInfo, ColumnServiceID}): serviceID})}}
+ instanceOps := client.MongoOperation{Table: CollectionInstance, Models: []mongo.WriteModel{mongo.NewDeleteManyModel().SetFilter(bson.M{StringBuilder([]string{ColumnInstance, ColumnServiceID}): serviceID})}}
+ serviceOps := client.MongoOperation{Table: CollectionService, Models: []mongo.WriteModel{mongo.NewDeleteOneModel().SetFilter(bson.M{StringBuilder([]string{ColumnService, ColumnServiceID}): serviceID})}}
err = client.GetMongoClient().MultiTableBatchUpdate(ctx, []client.MongoOperation{schemaOps, rulesOps, instanceOps, serviceOps})
if err != nil {
@@ -345,11 +345,11 @@ func (ds *DataSource) DelServicePri(ctx context.Context, serviceID string, force
domainProject := util.ToDomainProject(microservice.Domain, microservice.Project)
serviceKey := &discovery.MicroServiceKey{
Tenant: domainProject,
- Environment: microservice.ServiceInfo.Environment,
- AppId: microservice.ServiceInfo.AppId,
- ServiceName: microservice.ServiceInfo.ServiceName,
- Version: microservice.ServiceInfo.Version,
- Alias: microservice.ServiceInfo.Alias,
+ Environment: microservice.Service.Environment,
+ AppId: microservice.Service.AppId,
+ ServiceName: microservice.Service.ServiceName,
+ Version: microservice.Service.Version,
+ Alias: microservice.Service.Alias,
}
err = DeleteDependencyForDeleteService(domainProject, serviceID, serviceKey)
@@ -365,8 +365,8 @@ func (ds *DataSource) UpdateService(ctx context.Context, request *discovery.Upda
*discovery.UpdateServicePropsResponse, error) {
updateData := bson.M{
"$set": bson.M{
- StringBuilder([]string{ColumnServiceInfo, ColumnModTime}): strconv.FormatInt(time.Now().Unix(), 10),
- StringBuilder([]string{ColumnServiceInfo, ColumnProperty}): request.Properties}}
+ StringBuilder([]string{ColumnService, ColumnModTime}): strconv.FormatInt(time.Now().Unix(), 10),
+ StringBuilder([]string{ColumnService, ColumnProperty}): request.Properties}}
err := UpdateService(ctx, GeneratorServiceFilter(ctx, request.ServiceId), updateData)
if err != nil {
log.Error(fmt.Sprintf("update service %s properties failed, update mongo failed", request.ServiceId), err)
@@ -410,7 +410,7 @@ func (ds *DataSource) GetServiceDetail(ctx context.Context, request *discovery.G
Response: discovery.CreateResponse(discovery.ErrInternal, err.Error()),
}, err
}
- svc := mgSvc.ServiceInfo
+ svc := mgSvc.Service
key := &discovery.MicroServiceKey{
Environment: svc.Environment,
AppId: svc.AppId,
@@ -481,14 +481,14 @@ func (ds *DataSource) GetServicesInfo(ctx context.Context, request *discovery.Ge
allServiceDetails := make([]*discovery.ServiceDetail, 0, len(services))
domainProject := util.ParseDomainProject(ctx)
for _, mgSvc := range services {
- if !request.WithShared && apt.IsGlobal(discovery.MicroServiceToKey(domainProject, mgSvc.ServiceInfo)) {
+ if !request.WithShared && apt.IsGlobal(discovery.MicroServiceToKey(domainProject, mgSvc.Service)) {
continue
}
if len(request.AppId) > 0 {
- if request.AppId != mgSvc.ServiceInfo.AppId {
+ if request.AppId != mgSvc.Service.AppId {
continue
}
- if len(request.ServiceName) > 0 && request.ServiceName != mgSvc.ServiceInfo.ServiceName {
+ if len(request.ServiceName) > 0 && request.ServiceName != mgSvc.Service.ServiceName {
continue
}
}
@@ -499,7 +499,7 @@ func (ds *DataSource) GetServicesInfo(ctx context.Context, request *discovery.Ge
Response: discovery.CreateResponse(discovery.ErrInternal, err.Error()),
}, err
}
- serviceDetail.MicroService = mgSvc.ServiceInfo
+ serviceDetail.MicroService = mgSvc.Service
allServiceDetails = append(allServiceDetails, serviceDetail)
}
@@ -654,7 +654,7 @@ func (ds *DataSource) GetSchema(ctx context.Context, request *discovery.GetSchem
}
return &discovery.GetSchemaResponse{
Response: discovery.CreateResponse(discovery.ResponseSuccess, "Get schema info successfully."),
- Schema: schema.SchemaInfo,
+ Schema: schema.Schema,
SchemaSummary: schema.SchemaSummary,
}, nil
}
@@ -673,7 +673,7 @@ func (ds *DataSource) GetAllSchemas(ctx context.Context, request *discovery.GetA
Response: discovery.CreateResponse(discovery.ErrInternal, err.Error()),
}, err
}
- schemasList := svc.ServiceInfo.Schemas
+ schemasList := svc.Service.Schemas
if len(schemasList) == 0 {
return &discovery.GetAllSchemaResponse{
Response: discovery.CreateResponse(discovery.ResponseSuccess, "Do not have this schema info."),
@@ -696,7 +696,7 @@ func (ds *DataSource) GetAllSchemas(ctx context.Context, request *discovery.GetA
}
tempSchema.Summary = schema.SchemaSummary
if request.WithSchema {
- tempSchema.Schema = schema.SchemaInfo
+ tempSchema.Schema = schema.Schema
}
schemas = append(schemas, tempSchema)
}
@@ -800,7 +800,7 @@ func (ds *DataSource) ModifySchemas(ctx context.Context, request *discovery.Modi
}
return &discovery.ModifySchemasResponse{Response: discovery.CreateResponse(discovery.ErrInternal, err.Error())}, err
}
- respErr := ds.modifySchemas(ctx, svc.ServiceInfo, request.Schemas)
+ respErr := ds.modifySchemas(ctx, svc.Service, request.Schemas)
if respErr != nil {
resp := &discovery.ModifySchemasResponse{
Response: discovery.CreateResponseWithSCErr(respErr),
@@ -842,7 +842,7 @@ func (ds *DataSource) modifySchemas(ctx context.Context, service *discovery.Micr
log.Error(fmt.Sprintf("modify service[%s] schemas failed, operator: %s", serviceID, remoteIP), errQuota)
return errQuota
}
- serviceOps = append(serviceOps, mongo.NewUpdateOneModel().SetUpdate(bson.M{"$set": bson.M{StringBuilder([]string{ColumnServiceInfo, ColumnSchemas}): nonExistSchemaIds}}).SetFilter(GeneratorServiceFilter(ctx, serviceID)))
+ serviceOps = append(serviceOps, mongo.NewUpdateOneModel().SetUpdate(bson.M{"$set": bson.M{StringBuilder([]string{ColumnService, ColumnSchemas}): nonExistSchemaIds}}).SetFilter(GeneratorServiceFilter(ctx, serviceID)))
} else {
if len(nonExistSchemaIds) != 0 {
errInfo := fmt.Errorf("non-existent schemaIDs %v", nonExistSchemaIds)
@@ -855,7 +855,7 @@ func (ds *DataSource) modifySchemas(ctx context.Context, service *discovery.Micr
return discovery.NewError(discovery.ErrInternal, err.Error())
}
if !exist {
- schemasOps = append(schemasOps, mongo.NewUpdateOneModel().SetFilter(GeneratorSchemaFilter(ctx, serviceID, needUpdateSchema.SchemaId)).SetUpdate(bson.M{"$set": bson.M{ColumnSchemaInfo: needUpdateSchema.Schema, ColumnSchemaSummary: needUpdateSchema.Summary}}))
+ schemasOps = append(schemasOps, mongo.NewUpdateOneModel().SetFilter(GeneratorSchemaFilter(ctx, serviceID, needUpdateSchema.SchemaId)).SetUpdate(bson.M{"$set": bson.M{ColumnSchema: needUpdateSchema.Schema, ColumnSchemaSummary: needUpdateSchema.Summary}}))
} else {
log.Warn(fmt.Sprintf("schema[%s/%s] and it's summary already exist, skip to update, operator: %s",
serviceID, needUpdateSchema.SchemaId, remoteIP))
@@ -870,7 +870,7 @@ func (ds *DataSource) modifySchemas(ctx context.Context, service *discovery.Micr
Project: project,
ServiceID: serviceID,
SchemaID: schema.SchemaId,
- SchemaInfo: schema.Schema,
+ Schema: schema.Schema,
SchemaSummary: schema.Summary,
}))
}
@@ -893,7 +893,7 @@ func (ds *DataSource) modifySchemas(ctx context.Context, service *discovery.Micr
Project: project,
ServiceID: serviceID,
SchemaID: schema.SchemaId,
- SchemaInfo: schema.Schema,
+ Schema: schema.Schema,
SchemaSummary: schema.Summary,
}))
schemaIDs = append(schemaIDs, schema.SchemaId)
@@ -901,7 +901,7 @@ func (ds *DataSource) modifySchemas(ctx context.Context, service *discovery.Micr
for _, schema := range needUpdateSchemas {
log.Info(fmt.Sprintf("update schema[%s/%s], operator: %s", serviceID, schema.SchemaId, remoteIP))
- schemasOps = append(schemasOps, mongo.NewUpdateOneModel().SetFilter(GeneratorSchemaFilter(ctx, serviceID, schema.SchemaId)).SetUpdate(bson.M{"$set": bson.M{ColumnSchemaInfo: schema.Schema, ColumnSchemaSummary: schema.Summary}}))
+ schemasOps = append(schemasOps, mongo.NewUpdateOneModel().SetFilter(GeneratorSchemaFilter(ctx, serviceID, schema.SchemaId)).SetUpdate(bson.M{"$set": bson.M{ColumnSchema: schema.Schema, ColumnSchemaSummary: schema.Summary}}))
schemaIDs = append(schemaIDs, schema.SchemaId)
}
@@ -910,7 +910,7 @@ func (ds *DataSource) modifySchemas(ctx context.Context, service *discovery.Micr
schemasOps = append(schemasOps, mongo.NewDeleteOneModel().SetFilter(GeneratorSchemaFilter(ctx, serviceID, schema.SchemaId)))
}
- serviceOps = append(serviceOps, mongo.NewUpdateOneModel().SetUpdate(bson.M{"$set": bson.M{StringBuilder([]string{ColumnServiceInfo, ColumnSchemas}): schemaIDs}}).SetFilter(GeneratorServiceFilter(ctx, serviceID)))
+ serviceOps = append(serviceOps, mongo.NewUpdateOneModel().SetUpdate(bson.M{"$set": bson.M{StringBuilder([]string{ColumnService, ColumnSchemas}): schemaIDs}}).SetFilter(GeneratorServiceFilter(ctx, serviceID)))
}
if len(schemasOps) > 0 {
_, err = client.GetMongoClient().BatchUpdate(ctx, CollectionSchema, schemasOps)
@@ -941,7 +941,7 @@ func (ds *DataSource) modifySchema(ctx context.Context, serviceID string, schema
}
return discovery.NewError(discovery.ErrInternal, err.Error())
}
- microservice := svc.ServiceInfo
+ microservice := svc.Service
var isExist bool
for _, sid := range microservice.Schemas {
if sid == schema.SchemaId {
@@ -982,13 +982,13 @@ func (ds *DataSource) modifySchema(ctx context.Context, serviceID string, schema
}
}
if len(newSchemas) != 0 {
- updateData := bson.M{StringBuilder([]string{ColumnServiceInfo, ColumnSchemas}): newSchemas}
+ updateData := bson.M{StringBuilder([]string{ColumnService, ColumnSchemas}): newSchemas}
err = UpdateService(ctx, GeneratorServiceFilter(ctx, serviceID), bson.M{"$set": updateData})
if err != nil {
return discovery.NewError(discovery.ErrInternal, err.Error())
}
}
- newSchema := bson.M{"$set": bson.M{ColumnSchemaInfo: schema.Schema, ColumnSchemaSummary: schema.Summary}}
+ newSchema := bson.M{"$set": bson.M{ColumnSchema: schema.Schema, ColumnSchemaSummary: schema.Summary}}
err = UpdateSchema(ctx, GeneratorSchemaFilter(ctx, serviceID, schema.SchemaId), newSchema, options.FindOneAndUpdate().SetUpsert(true))
if err != nil {
return discovery.NewError(discovery.ErrInternal, err.Error())
@@ -1055,7 +1055,7 @@ func (ds *DataSource) AddRule(ctx context.Context, request *discovery.AddService
Domain: util.ParseDomain(ctx),
Project: util.ParseProject(ctx),
ServiceID: request.ServiceId,
- RuleInfo: &discovery.ServiceRule{
+ Rule: &discovery.ServiceRule{
RuleId: util.GenerateUUID(),
RuleType: rule.RuleType,
Attribute: rule.Attribute,
@@ -1065,7 +1065,7 @@ func (ds *DataSource) AddRule(ctx context.Context, request *discovery.AddService
ModTimestamp: timestamp,
},
}
- ruleIDs = append(ruleIDs, ruleAdd.RuleInfo.RuleId)
+ ruleIDs = append(ruleIDs, ruleAdd.Rule.RuleId)
_, err = client.GetMongoClient().Insert(ctx, CollectionRule, ruleAdd)
if err != nil {
return &discovery.AddServiceRulesResponse{
@@ -1180,11 +1180,11 @@ func (ds *DataSource) UpdateRule(ctx context.Context, request *discovery.UpdateS
}
newRule := bson.M{
- StringBuilder([]string{ColumnRuleInfo, ColumnRuleType}): request.Rule.RuleType,
- StringBuilder([]string{ColumnRuleInfo, ColumnPattern}): request.Rule.Pattern,
- StringBuilder([]string{ColumnRuleInfo, ColumnAttribute}): request.Rule.Attribute,
- StringBuilder([]string{ColumnRuleInfo, ColumnDescription}): request.Rule.Description,
- StringBuilder([]string{ColumnRuleInfo, ColumnModTime}): strconv.FormatInt(time.Now().Unix(), 10)}
+ StringBuilder([]string{ColumnRule, ColumnRuleType}): request.Rule.RuleType,
+ StringBuilder([]string{ColumnRule, ColumnPattern}): request.Rule.Pattern,
+ StringBuilder([]string{ColumnRule, ColumnAttribute}): request.Rule.Attribute,
+ StringBuilder([]string{ColumnRule, ColumnDescription}): request.Rule.Description,
+ StringBuilder([]string{ColumnRule, ColumnModTime}): strconv.FormatInt(time.Now().Unix(), 10)}
err = UpdateRule(ctx, GeneratorRuleFilter(ctx, request.ServiceId, request.RuleId), bson.M{"$set": newRule})
if err != nil {
@@ -1240,7 +1240,7 @@ func GetServices(ctx context.Context, filter bson.M) ([]*discovery.MicroService,
if err != nil {
return nil, err
}
- services = append(services, tmp.ServiceInfo)
+ services = append(services, tmp.Service)
}
return services, nil
}
@@ -1274,14 +1274,14 @@ func GetServicesVersions(ctx context.Context, filter interface{}) ([]string, err
if err != nil {
return nil, err
}
- versions = append(versions, tmp.ServiceInfo.Version)
+ versions = append(versions, tmp.Service.Version)
}
return versions, nil
}
func getServiceDetailUtil(ctx context.Context, mgs *Service, countOnly bool, options []string) (*discovery.ServiceDetail, error) {
serviceDetail := new(discovery.ServiceDetail)
- serviceID := mgs.ServiceInfo.ServiceId
+ serviceID := mgs.Service.ServiceId
domainProject := util.ParseDomainProject(ctx)
if countOnly {
serviceDetail.Statics = new(discovery.Statistics)
@@ -1292,9 +1292,9 @@ func getServiceDetailUtil(ctx context.Context, mgs *Service, countOnly bool, opt
case "tags":
serviceDetail.Tags = mgs.Tags
case "rules":
- rules, err := GetRules(ctx, mgs.ServiceInfo.ServiceId)
+ rules, err := GetRules(ctx, mgs.Service.ServiceId)
if err != nil {
- log.Error(fmt.Sprintf("get service %s's all rules failed", mgs.ServiceInfo.ServiceId), err)
+ log.Error(fmt.Sprintf("get service %s's all rules failed", mgs.Service.ServiceId), err)
return nil, err
}
for _, rule := range rules {
@@ -1320,14 +1320,14 @@ func getServiceDetailUtil(ctx context.Context, mgs *Service, countOnly bool, opt
}
serviceDetail.Instances = instances
case "schemas":
- schemas, err := GetSchemas(ctx, GeneratorServiceFilter(ctx, mgs.ServiceInfo.ServiceId))
+ schemas, err := GetSchemas(ctx, GeneratorServiceFilter(ctx, mgs.Service.ServiceId))
if err != nil {
- log.Error(fmt.Sprintf("get service %s's all schemas failed", mgs.ServiceInfo.ServiceId), err)
+ log.Error(fmt.Sprintf("get service %s's all schemas failed", mgs.Service.ServiceId), err)
return nil, err
}
serviceDetail.SchemaInfos = schemas
case "dependencies":
- service := mgs.ServiceInfo
+ service := mgs.Service
dr := NewDependencyRelation(ctx, domainProject, service, service)
consumers, err := dr.GetDependencyConsumers(WithoutSelfDependency(), WithSameDomainProject())
if err != nil {
@@ -1371,7 +1371,7 @@ func GetRules(ctx context.Context, serviceID string) ([]*discovery.ServiceRule,
if err != nil {
return nil, err
}
- rules = append(rules, tmpRule.RuleInfo)
+ rules = append(rules, tmpRule.Rule)
}
return rules, nil
}
@@ -1410,7 +1410,7 @@ func GeneratorServiceFilter(ctx context.Context, serviceID string) bson.M {
return bson.M{
ColumnDomain: domain,
ColumnProject: project,
- StringBuilder([]string{ColumnServiceInfo, ColumnServiceID}): serviceID}
+ StringBuilder([]string{ColumnService, ColumnServiceID}): serviceID}
}
func GeneratorServiceNameFilter(ctx context.Context, service *discovery.MicroServiceKey) bson.M {
@@ -1420,10 +1420,10 @@ func GeneratorServiceNameFilter(ctx context.Context, service *discovery.MicroSer
return bson.M{
ColumnDomain: domain,
ColumnProject: project,
- StringBuilder([]string{ColumnServiceInfo, ColumnEnv}): service.Environment,
- StringBuilder([]string{ColumnServiceInfo, ColumnAppID}): service.AppId,
- StringBuilder([]string{ColumnServiceInfo, ColumnServiceName}): service.ServiceName,
- StringBuilder([]string{ColumnServiceInfo, ColumnVersion}): service.Version}
+ StringBuilder([]string{ColumnService, ColumnEnv}): service.Environment,
+ StringBuilder([]string{ColumnService, ColumnAppID}): service.AppId,
+ StringBuilder([]string{ColumnService, ColumnServiceName}): service.ServiceName,
+ StringBuilder([]string{ColumnService, ColumnVersion}): service.Version}
}
func GeneratorServiceAliasFilter(ctx context.Context, service *discovery.MicroServiceKey) bson.M {
@@ -1433,17 +1433,17 @@ func GeneratorServiceAliasFilter(ctx context.Context, service *discovery.MicroSe
return bson.M{
ColumnDomain: domain,
ColumnProject: project,
- StringBuilder([]string{ColumnServiceInfo, ColumnEnv}): service.Environment,
- StringBuilder([]string{ColumnServiceInfo, ColumnAppID}): service.AppId,
- StringBuilder([]string{ColumnServiceInfo, ColumnAlias}): service.Alias,
- StringBuilder([]string{ColumnServiceInfo, ColumnVersion}): service.Version}
+ StringBuilder([]string{ColumnService, ColumnEnv}): service.Environment,
+ StringBuilder([]string{ColumnService, ColumnAppID}): service.AppId,
+ StringBuilder([]string{ColumnService, ColumnAlias}): service.Alias,
+ StringBuilder([]string{ColumnService, ColumnVersion}): service.Version}
}
func GeneratorRuleAttFilter(ctx context.Context, serviceID, attribute, pattern string) bson.M {
return bson.M{
ColumnServiceID: serviceID,
- StringBuilder([]string{ColumnRuleInfo, ColumnAttribute}): attribute,
- StringBuilder([]string{ColumnRuleInfo, ColumnPattern}): pattern}
+ StringBuilder([]string{ColumnRule, ColumnAttribute}): attribute,
+ StringBuilder([]string{ColumnRule, ColumnPattern}): pattern}
}
func GeneratorSchemaFilter(ctx context.Context, serviceID, schemaID string) bson.M {
@@ -1461,7 +1461,7 @@ func GeneratorRuleFilter(ctx context.Context, serviceID, ruleID string) bson.M {
ColumnDomain: domain,
ColumnProject: project,
ColumnServiceID: serviceID,
- StringBuilder([]string{ColumnRuleInfo, ColumnRuleID}): ruleID}
+ StringBuilder([]string{ColumnRule, ColumnRuleID}): ruleID}
}
func GetSchemas(ctx context.Context, filter bson.M) ([]*discovery.Schema, error) {
@@ -1479,7 +1479,7 @@ func GetSchemas(ctx context.Context, filter bson.M) ([]*discovery.Schema, error)
schemas = append(schemas, &discovery.Schema{
SchemaId: tmp.SchemaID,
Summary: tmp.SchemaSummary,
- Schema: tmp.SchemaInfo,
+ Schema: tmp.Schema,
})
}
return schemas, nil
@@ -1610,13 +1610,13 @@ func (ds *DataSource) GetInstance(ctx context.Context, request *discovery.GetOne
}
findFlag := func() string {
return fmt.Sprintf("consumer[%s][%s/%s/%s/%s] find provider[%s][%s/%s/%s/%s] instance[%s]",
- request.ConsumerServiceId, service.ServiceInfo.Environment, service.ServiceInfo.AppId, service.ServiceInfo.ServiceName, service.ServiceInfo.Version,
- provider.ServiceInfo.ServiceId, provider.ServiceInfo.Environment, provider.ServiceInfo.AppId, provider.ServiceInfo.ServiceName, provider.ServiceInfo.Version,
+ request.ConsumerServiceId, service.Service.Environment, service.Service.AppId, service.Service.ServiceName, service.Service.Version,
+ provider.Service.ServiceId, provider.Service.Environment, provider.Service.AppId, provider.Service.ServiceName, provider.Service.Version,
request.ProviderInstanceId)
}
domainProject := util.ParseDomainProject(ctx)
- services, err := findServices(ctx, discovery.MicroServiceToKey(domainProject, provider.ServiceInfo))
+ services, err := findServices(ctx, discovery.MicroServiceToKey(domainProject, provider.Service))
if err != nil {
log.Error(fmt.Sprintf("get instance failed %s", findFlag()), err)
return &discovery.GetOneInstanceResponse{
@@ -1714,8 +1714,8 @@ func (ds *DataSource) GetInstances(ctx context.Context, request *discovery.GetIn
findFlag := func() string {
return fmt.Sprintf("consumer[%s][%s/%s/%s/%s] find provider[%s][%s/%s/%s/%s] instances",
- request.ConsumerServiceId, service.ServiceInfo.Environment, service.ServiceInfo.AppId, service.ServiceInfo.ServiceName, service.ServiceInfo.Version,
- provider.ServiceInfo.ServiceId, provider.ServiceInfo.Environment, provider.ServiceInfo.AppId, provider.ServiceInfo.ServiceName, provider.ServiceInfo.Version)
+ request.ConsumerServiceId, service.Service.Environment, service.Service.AppId, service.Service.ServiceName, service.Service.Version,
+ provider.Service.ServiceId, provider.Service.Environment, provider.Service.AppId, provider.Service.ServiceName, provider.Service.Version)
}
rev, _ := ctx.Value(util.CtxRequestRevision).(string)
@@ -1752,7 +1752,7 @@ func (ds *DataSource) GetProviderInstances(ctx context.Context, request *discove
filter := bson.M{
ColumnDomain: domain,
ColumnProject: project,
- StringBuilder([]string{ColumnInstanceInfo, ColumnServiceID}): request.ProviderServiceId}
+ StringBuilder([]string{ColumnInstance, ColumnServiceID}): request.ProviderServiceId}
findRes, err := client.GetMongoClient().Find(ctx, CollectionInstance, filter)
if err != nil {
@@ -1763,7 +1763,7 @@ func (ds *DataSource) GetProviderInstances(ctx context.Context, request *discove
var mongoInstance Instance
err := findRes.Decode(&mongoInstance)
if err == nil {
- instances = append(instances, mongoInstance.InstanceInfo)
+ instances = append(instances, mongoInstance.Instance)
}
}
@@ -1793,7 +1793,7 @@ func (ds *DataSource) GetAllInstances(ctx context.Context, request *discovery.Ge
Response: discovery.CreateResponse(discovery.ErrInternal, err.Error()),
}, err
}
- resp.Instances = append(resp.Instances, instance.InstanceInfo)
+ resp.Instances = append(resp.Instances, instance.Instance)
}
return resp, nil
@@ -1811,7 +1811,7 @@ func (ds *DataSource) BatchGetProviderInstances(ctx context.Context, request *di
filter := bson.M{
ColumnDomain: domain,
ColumnProject: project,
- StringBuilder([]string{ColumnInstanceInfo, ColumnServiceID}): providerServiceID}
+ StringBuilder([]string{ColumnInstance, ColumnServiceID}): providerServiceID}
findRes, err := client.GetMongoClient().Find(ctx, CollectionInstance, filter)
if err != nil {
return instances, "", nil
@@ -1821,7 +1821,7 @@ func (ds *DataSource) BatchGetProviderInstances(ctx context.Context, request *di
var mongoInstance Instance
err := findRes.Decode(&mongoInstance)
if err == nil {
- instances = append(instances, mongoInstance.InstanceInfo)
+ instances = append(instances, mongoInstance.Instance)
}
}
}
@@ -1874,9 +1874,9 @@ func (ds *DataSource) UpdateInstanceStatus(ctx context.Context, request *discove
}
copyInstanceRef := *instance
- copyInstanceRef.InstanceInfo.Status = request.Status
+ copyInstanceRef.Instance.Status = request.Status
- if err := UpdateInstanceS(ctx, copyInstanceRef.InstanceInfo); err != nil {
+ if err := UpdateInstanceS(ctx, copyInstanceRef.Instance); err != nil {
log.Error(fmt.Sprintf("update instance %s status failed", updateStatusFlag), err)
resp := &discovery.UpdateInstanceStatusResponse{
Response: discovery.CreateResponseWithSCErr(err),
@@ -1911,10 +1911,10 @@ func (ds *DataSource) UpdateInstanceProperties(ctx context.Context, request *dis
}
copyInstanceRef := *instance
- copyInstanceRef.InstanceInfo.Properties = request.Properties
+ copyInstanceRef.Instance.Properties = request.Properties
// todo finish update instance
- if err := UpdateInstanceP(ctx, copyInstanceRef.InstanceInfo); err != nil {
+ if err := UpdateInstanceP(ctx, copyInstanceRef.Instance); err != nil {
log.Error(fmt.Sprintf("update instance %s properties failed", instanceFlag), err)
resp := &discovery.UpdateInstancePropsResponse{
Response: discovery.CreateResponseWithSCErr(err),
@@ -1943,8 +1943,8 @@ func (ds *DataSource) UnregisterInstance(ctx context.Context, request *discovery
filter := bson.M{
ColumnDomain: domain,
ColumnProject: project,
- StringBuilder([]string{ColumnInstanceInfo, ColumnServiceID}): serviceID,
- StringBuilder([]string{ColumnInstanceInfo, ColumnInstanceID}): instanceID}
+ StringBuilder([]string{ColumnInstance, ColumnServiceID}): serviceID,
+ StringBuilder([]string{ColumnInstance, ColumnInstanceID}): instanceID}
result, err := client.GetMongoClient().Delete(ctx, CollectionInstance, filter)
if err != nil || result.DeletedCount == 0 {
log.Error(fmt.Sprintf("unregister instance failed, instance %s, operator %s revoke instance failed", instanceFlag, remoteIP), err)
@@ -2063,10 +2063,10 @@ func registryInstance(ctx context.Context, request *discovery.RegisterInstanceRe
instance := request.Instance
instanceID := instance.InstanceId
data := &Instance{
- Domain: domain,
- Project: project,
- RefreshTime: time.Now(),
- InstanceInfo: instance,
+ Domain: domain,
+ Project: project,
+ RefreshTime: time.Now(),
+ Instance: instance,
}
instanceFlag := fmt.Sprintf("endpoints %v, host '%s', serviceID %s",
@@ -2139,7 +2139,7 @@ func (ds *DataSource) findSharedServiceInstance(ctx context.Context, request *di
func (ds *DataSource) findInstance(ctx context.Context, request *discovery.FindInstancesRequest, provider *discovery.MicroServiceKey, rev string) (*discovery.FindInstancesResponse, error) {
var err error
domainProject := util.ParseDomainProject(ctx)
- service := &Service{ServiceInfo: &discovery.MicroService{Environment: request.Environment}}
+ service := &Service{Service: &discovery.MicroService{Environment: request.Environment}}
if len(request.ConsumerServiceId) > 0 {
filter := GeneratorServiceFilter(ctx, request.ConsumerServiceId)
service, err = GetService(ctx, filter)
@@ -2158,7 +2158,7 @@ func (ds *DataSource) findInstance(ctx context.Context, request *discovery.FindI
Response: discovery.CreateResponse(discovery.ErrInternal, err.Error()),
}, err
}
- provider.Environment = service.ServiceInfo.Environment
+ provider.Environment = service.Service.Environment
}
// provider is not a shared micro-service,
@@ -2168,7 +2168,7 @@ func (ds *DataSource) findInstance(ctx context.Context, request *discovery.FindI
findFlag := func() string {
return fmt.Sprintf("Consumer[%s][%s/%s/%s/%s] find provider[%s/%s/%s/%s]",
- request.ConsumerServiceId, service.ServiceInfo.Environment, service.ServiceInfo.AppId, service.ServiceInfo.ServiceName, service.ServiceInfo.Version,
+ request.ConsumerServiceId, service.Service.Environment, service.Service.AppId, service.Service.ServiceName, service.Service.Version,
provider.Environment, provider.AppId, provider.ServiceName, provider.Version)
}
basicFilterServices, err := servicesBasicFilter(ctx, provider)
@@ -2208,7 +2208,7 @@ func (ds *DataSource) findInstance(ctx context.Context, request *discovery.FindI
return nil, err
}
if provider != nil {
- err = AddServiceVersionRule(ctx, domainProject, service.ServiceInfo, provider)
+ err = AddServiceVersionRule(ctx, domainProject, service.Service, provider)
} else {
mes := fmt.Errorf("%s failed, provider does not exist", findFlag())
log.Error("add service version rule failed", mes)
@@ -2245,7 +2245,7 @@ func (ds *DataSource) reshapeProviderKey(ctx context.Context, provider *discover
}
versionRule := provider.Version
- provider = discovery.MicroServiceToKey(provider.Tenant, providerService.ServiceInfo)
+ provider = discovery.MicroServiceToKey(provider.Tenant, providerService.Service)
provider.Version = versionRule
return provider, nil
}
@@ -2309,8 +2309,8 @@ func GetInstance(ctx context.Context, serviceID string, instanceID string) (*Ins
filter := bson.M{
ColumnDomain: domain,
ColumnProject: project,
- StringBuilder([]string{ColumnInstanceInfo, ColumnServiceID}): serviceID,
- StringBuilder([]string{ColumnInstanceInfo, ColumnInstanceID}): instanceID}
+ StringBuilder([]string{ColumnInstance, ColumnServiceID}): serviceID,
+ StringBuilder([]string{ColumnInstance, ColumnInstanceID}): instanceID}
findRes, err := client.GetMongoClient().FindOne(ctx, CollectionInstance, filter)
if err != nil {
return nil, err
@@ -2333,8 +2333,8 @@ func UpdateInstanceS(ctx context.Context, instance *discovery.MicroServiceInstan
filter := bson.M{
ColumnDomain: domain,
ColumnProject: project,
- StringBuilder([]string{ColumnInstanceInfo, ColumnServiceID}): instance.ServiceId,
- StringBuilder([]string{ColumnInstanceInfo, ColumnInstanceID}): instance.InstanceId}
+ StringBuilder([]string{ColumnInstance, ColumnServiceID}): instance.ServiceId,
+ StringBuilder([]string{ColumnInstance, ColumnInstanceID}): instance.InstanceId}
_, err := client.GetMongoClient().Update(ctx, CollectionInstance, filter, bson.M{"$set": bson.M{"instance.motTimestamp": strconv.FormatInt(time.Now().Unix(), 10), "instance.status": instance.Status}})
if err != nil {
return discovery.NewError(discovery.ErrUnavailableBackend, err.Error())
@@ -2348,8 +2348,8 @@ func UpdateInstanceP(ctx context.Context, instance *discovery.MicroServiceInstan
filter := bson.M{
ColumnDomain: domain,
ColumnProject: project,
- StringBuilder([]string{ColumnInstanceInfo, ColumnServiceID}): instance.ServiceId,
- StringBuilder([]string{ColumnInstanceInfo, ColumnInstanceID}): instance.InstanceId}
+ StringBuilder([]string{ColumnInstance, ColumnServiceID}): instance.ServiceId,
+ StringBuilder([]string{ColumnInstance, ColumnInstanceID}): instance.InstanceId}
_, err := client.GetMongoClient().Update(ctx, CollectionInstance, filter, bson.M{"$set": bson.M{"instance.motTimestamp": strconv.FormatInt(time.Now().Unix(), 10), "instance.properties": instance.Properties}})
if err != nil {
return discovery.NewError(discovery.ErrUnavailableBackend, err.Error())
@@ -2519,7 +2519,7 @@ func preProcessRegisterInstance(ctx context.Context, instance *discovery.MicroSe
if err != nil {
return discovery.NewError(discovery.ErrServiceNotExists, "invalid 'serviceID' in request body.")
}
- instance.Version = microservice.ServiceInfo.Version
+ instance.Version = microservice.Service.Version
return nil
}
@@ -2532,10 +2532,10 @@ func servicesBasicFilter(ctx context.Context, key *discovery.MicroServiceKey) ([
filter := bson.M{
ColumnDomain: tenant[0],
ColumnProject: tenant[1],
- StringBuilder([]string{ColumnServiceInfo, ColumnEnv}): key.Environment,
- StringBuilder([]string{ColumnServiceInfo, ColumnAppID}): key.AppId,
- StringBuilder([]string{ColumnServiceInfo, ColumnServiceName}): key.ServiceName,
- StringBuilder([]string{ColumnServiceInfo, ColumnAlias}): key.Alias,
+ StringBuilder([]string{ColumnService, ColumnEnv}): key.Environment,
+ StringBuilder([]string{ColumnService, ColumnAppID}): key.AppId,
+ StringBuilder([]string{ColumnService, ColumnServiceName}): key.ServiceName,
+ StringBuilder([]string{ColumnService, ColumnAlias}): key.Alias,
}
rangeIdx := strings.Index(key.Version, "-")
// if the version number is clear, need to add the version number to query
@@ -2547,7 +2547,7 @@ func servicesBasicFilter(ctx context.Context, key *discovery.MicroServiceKey) ([
case rangeIdx > 0:
return servicesFilter(ctx, filter)
default:
- filter[StringBuilder([]string{ColumnServiceInfo, ColumnVersion})] = key.Version
+ filter[StringBuilder([]string{ColumnService, ColumnVersion})] = key.Version
return servicesFilter(ctx, filter)
}
}
@@ -2561,25 +2561,25 @@ func findServices(ctx context.Context, key *discovery.MicroServiceKey) ([]*Servi
filter := bson.M{
ColumnDomain: tenant[0],
ColumnProject: tenant[1],
- StringBuilder([]string{ColumnServiceInfo, ColumnEnv}): key.Environment,
- StringBuilder([]string{ColumnServiceInfo, ColumnAppID}): key.AppId,
- StringBuilder([]string{ColumnServiceInfo, ColumnServiceName}): key.ServiceName,
- StringBuilder([]string{ColumnServiceInfo, ColumnAlias}): key.Alias,
+ StringBuilder([]string{ColumnService, ColumnEnv}): key.Environment,
+ StringBuilder([]string{ColumnService, ColumnAppID}): key.AppId,
+ StringBuilder([]string{ColumnService, ColumnServiceName}): key.ServiceName,
+ StringBuilder([]string{ColumnService, ColumnAlias}): key.Alias,
}
switch {
case key.Version == "latest":
return latestServicesFilter(ctx, filter)
case len(key.Version) > 0 && key.Version[len(key.Version)-1:] == "+":
start := key.Version[:len(key.Version)-1]
- filter[StringBuilder([]string{ColumnServiceInfo, ColumnVersion})] = bson.M{"$gte": start}
+ filter[StringBuilder([]string{ColumnService, ColumnVersion})] = bson.M{"$gte": start}
return servicesFilter(ctx, filter)
case rangeIdx > 0:
start := key.Version[:rangeIdx]
end := key.Version[rangeIdx+1:]
- filter[StringBuilder([]string{ColumnServiceInfo, ColumnVersion})] = bson.M{"$gte": start, "$lte": end}
+ filter[StringBuilder([]string{ColumnService, ColumnVersion})] = bson.M{"$gte": start, "$lte": end}
return servicesFilter(ctx, filter)
default:
- filter[StringBuilder([]string{ColumnServiceInfo, ColumnVersion})] = key.Version
+ filter[StringBuilder([]string{ColumnService, ColumnVersion})] = key.Version
return servicesFilter(ctx, filter)
}
}
@@ -2589,8 +2589,8 @@ func instancesFilter(ctx context.Context, serviceIDs []string) ([]*discovery.Mic
if len(serviceIDs) == 0 {
return instances, nil
}
- resp, err := client.GetMongoClient().Find(ctx, CollectionInstance, bson.M{StringBuilder([]string{ColumnInstanceInfo, ColumnServiceID}): bson.M{"$in": serviceIDs}}, &options.FindOptions{
- Sort: bson.M{StringBuilder([]string{ColumnInstanceInfo, ColumnVersion}): -1}})
+ resp, err := client.GetMongoClient().Find(ctx, CollectionInstance, bson.M{StringBuilder([]string{ColumnInstance, ColumnServiceID}): bson.M{"$in": serviceIDs}}, &options.FindOptions{
+ Sort: bson.M{StringBuilder([]string{ColumnInstance, ColumnVersion}): -1}})
if err != nil {
return nil, err
}
@@ -2603,7 +2603,7 @@ func instancesFilter(ctx context.Context, serviceIDs []string) ([]*discovery.Mic
if err != nil {
return nil, err
}
- instances = append(instances, instance.InstanceInfo)
+ instances = append(instances, instance.Instance)
}
return instances, nil
}
@@ -2617,7 +2617,7 @@ func filterServiceIDs(ctx context.Context, consumerID string, tags []string, ser
filterService = tagsFilter(services, tags)
filterService = accessibleFilter(ctx, consumerID, filterService)
for _, service := range filterService {
- serviceIDs = append(serviceIDs, service.ServiceInfo.ServiceId)
+ serviceIDs = append(serviceIDs, service.Service.ServiceId)
}
return serviceIDs
}
@@ -2644,9 +2644,9 @@ func tagsFilter(services []*Service, tags []string) []*Service {
func accessibleFilter(ctx context.Context, consumerID string, services []*Service) []*Service {
var newServices []*Service
for _, service := range services {
- if err := accessible(ctx, consumerID, service.ServiceInfo.ServiceId); err != nil {
+ if err := accessible(ctx, consumerID, service.Service.ServiceId); err != nil {
findFlag := fmt.Sprintf("consumer '%s' find provider %s/%s/%s", consumerID,
- service.ServiceInfo.AppId, service.ServiceInfo.ServiceName, service.ServiceInfo.Version)
+ service.Service.AppId, service.Service.ServiceName, service.Service.Version)
log.Error(fmt.Sprintf("accessible filter failed, %s", findFlag), err)
continue
}
@@ -2678,7 +2678,7 @@ func servicesFilter(ctx context.Context, filter bson.M) ([]*Service, error) {
func latestServicesFilter(ctx context.Context, filter bson.M) ([]*Service, error) {
resp, err := client.GetMongoClient().Find(ctx, CollectionService, filter, &options.FindOptions{
- Sort: bson.M{StringBuilder([]string{ColumnServiceInfo, ColumnVersion}): -1}})
+ Sort: bson.M{StringBuilder([]string{ColumnService, ColumnVersion}): -1}})
if err != nil {
return nil, err
}
@@ -2705,7 +2705,7 @@ func getTags(ctx context.Context, domain string, project string, serviceID strin
filter := bson.M{
ColumnDomain: domain,
ColumnProject: project,
- StringBuilder([]string{ColumnServiceInfo, ColumnServiceID}): serviceID,
+ StringBuilder([]string{ColumnService, ColumnServiceID}): serviceID,
}
result, err := client.GetMongoClient().FindOne(ctx, CollectionService, filter)
if err != nil {
@@ -2727,7 +2727,7 @@ func getService(ctx context.Context, domain string, project string, serviceID st
filter := bson.M{
ColumnDomain: domain,
ColumnProject: project,
- StringBuilder([]string{ColumnServiceInfo, ColumnServiceID}): serviceID,
+ StringBuilder([]string{ColumnService, ColumnServiceID}): serviceID,
}
result, err := client.GetMongoClient().FindOne(ctx, CollectionService, filter)
if err != nil {
@@ -2783,11 +2783,11 @@ func accessible(ctx context.Context, consumerID string, providerID string) *disc
return nil
}
- validateTags, err := getTags(ctx, consumerDomain, consumerProject, consumerService.ServiceInfo.ServiceId)
+ validateTags, err := getTags(ctx, consumerDomain, consumerProject, consumerService.Service.ServiceId)
if err != nil {
return discovery.NewError(discovery.ErrInternal, fmt.Sprintf("an error occurred in query consumer tags(%s)", err.Error()))
}
- return MatchRules(rules, consumerService.ServiceInfo, validateTags)
+ return MatchRules(rules, consumerService.Service, validateTags)
}
func MatchRules(rulesOfProvider []*Rule, consumer *discovery.MicroService, tagsOfConsumer map[string]string) *discovery.Error {
@@ -2798,7 +2798,7 @@ func MatchRules(rulesOfProvider []*Rule, consumer *discovery.MicroService, tagsO
if len(rulesOfProvider) <= 0 {
return nil
}
- if rulesOfProvider[0].RuleInfo.RuleType == "WHITE" {
+ if rulesOfProvider[0].Rule.RuleType == "WHITE" {
return patternWhiteList(rulesOfProvider, tagsOfConsumer, consumer)
}
return patternBlackList(rulesOfProvider, tagsOfConsumer, consumer)
@@ -2827,7 +2827,7 @@ func patternWhiteList(rulesOfProvider []*Rule, tagsOfConsumer map[string]string,
v := reflect.Indirect(reflect.ValueOf(consumer))
consumerID := consumer.ServiceId
for _, rule := range rulesOfProvider {
- value, err := parsePattern(v, rule.RuleInfo, tagsOfConsumer, consumerID)
+ value, err := parsePattern(v, rule.Rule, tagsOfConsumer, consumerID)
if err != nil {
return err
}
@@ -2835,11 +2835,11 @@ func patternWhiteList(rulesOfProvider []*Rule, tagsOfConsumer map[string]string,
continue
}
- match, _ := regexp.MatchString(rule.RuleInfo.Pattern, value)
+ match, _ := regexp.MatchString(rule.Rule.Pattern, value)
if match {
log.Info(fmt.Sprintf("consumer[%s][%s/%s/%s/%s] match white list, rule.Pattern is %s, value is %s",
consumerID, consumer.Environment, consumer.AppId, consumer.ServiceName, consumer.Version,
- rule.RuleInfo.Pattern, value))
+ rule.Rule.Pattern, value))
return nil
}
}
@@ -2851,7 +2851,7 @@ func patternBlackList(rulesOfProvider []*Rule, tagsOfConsumer map[string]string,
consumerID := consumer.ServiceId
for _, rule := range rulesOfProvider {
var value string
- value, err := parsePattern(v, rule.RuleInfo, tagsOfConsumer, consumerID)
+ value, err := parsePattern(v, rule.Rule, tagsOfConsumer, consumerID)
if err != nil {
return err
}
@@ -2859,11 +2859,11 @@ func patternBlackList(rulesOfProvider []*Rule, tagsOfConsumer map[string]string,
continue
}
- match, _ := regexp.MatchString(rule.RuleInfo.Pattern, value)
+ match, _ := regexp.MatchString(rule.Rule.Pattern, value)
if match {
log.Warn(fmt.Sprintf("no permission to access, consumer[%s][%s/%s/%s/%s] match black list, rule.Pattern is %s, value is %s",
consumerID, consumer.Environment, consumer.AppId, consumer.ServiceName, consumer.Version,
- rule.RuleInfo.Pattern, value))
+ rule.Rule.Pattern, value))
return discovery.NewError(discovery.ErrPermissionDeny, "found in black list")
}
}
@@ -2898,17 +2898,17 @@ func getRulesUtil(ctx context.Context, domain string, project string, serviceID
}
func allowAcrossDimension(ctx context.Context, providerService *Service, consumerService *Service) error {
- if providerService.ServiceInfo.AppId != consumerService.ServiceInfo.AppId {
- if len(providerService.ServiceInfo.Properties) == 0 {
+ if providerService.Service.AppId != consumerService.Service.AppId {
+ if len(providerService.Service.Properties) == 0 {
return fmt.Errorf("not allow across app access")
}
- if allowCrossApp, ok := providerService.ServiceInfo.Properties[discovery.PropAllowCrossApp]; !ok || strings.ToLower(allowCrossApp) != "true" {
+ if allowCrossApp, ok := providerService.Service.Properties[discovery.PropAllowCrossApp]; !ok || strings.ToLower(allowCrossApp) != "true" {
return fmt.Errorf("not allow across app access")
}
}
- if !apt.IsGlobal(discovery.MicroServiceToKey(util.ParseTargetDomainProject(ctx), providerService.ServiceInfo)) &&
- providerService.ServiceInfo.Environment != consumerService.ServiceInfo.Environment {
+ if !apt.IsGlobal(discovery.MicroServiceToKey(util.ParseTargetDomainProject(ctx), providerService.Service)) &&
+ providerService.Service.Environment != consumerService.Service.Environment {
return fmt.Errorf("not allow across environment access")
}
return nil
@@ -2936,7 +2936,7 @@ func GetAllInstancesOfOneService(ctx context.Context, serviceID string) ([]*disc
if err != nil {
return nil, err
}
- instances = append(instances, tmp.InstanceInfo)
+ instances = append(instances, tmp.Instance)
}
return instances, nil
}
@@ -2965,9 +2965,9 @@ func GeneratorServiceVersionsFilter(ctx context.Context, service *discovery.Micr
return bson.M{
ColumnDomain: domain,
ColumnProject: project,
- StringBuilder([]string{ColumnServiceInfo, ColumnEnv}): service.Environment,
- StringBuilder([]string{ColumnServiceInfo, ColumnAppID}): service.AppId,
- StringBuilder([]string{ColumnServiceInfo, ColumnServiceName}): service.ServiceName}
+ StringBuilder([]string{ColumnService, ColumnEnv}): service.Environment,
+ StringBuilder([]string{ColumnService, ColumnAppID}): service.AppId,
+ StringBuilder([]string{ColumnService, ColumnServiceName}): service.ServiceName}
}
func GeneratorServiceInstanceFilter(ctx context.Context, serviceID string) bson.M {
@@ -2976,7 +2976,7 @@ func GeneratorServiceInstanceFilter(ctx context.Context, serviceID string) bson.
return bson.M{
ColumnDomain: domain,
ColumnProject: project,
- StringBuilder([]string{ColumnInstanceInfo, ColumnServiceID}): serviceID}
+ StringBuilder([]string{ColumnInstance, ColumnServiceID}): serviceID}
}
func GetServiceByID(ctx context.Context, serviceID string) (*Service, error) {
@@ -2990,10 +2990,10 @@ func GetServiceByID(ctx context.Context, serviceID string) (*Service, error) {
func cacheToService(service sd.Service) *Service {
return &Service{
- Domain: service.Domain,
- Project: service.Project,
- Tags: service.Tags,
- ServiceInfo: service.ServiceInfo,
+ Domain: service.Domain,
+ Project: service.Project,
+ Tags: service.Tags,
+ Service: service.Service,
}
}
@@ -3007,7 +3007,7 @@ func GetInstancesByServiceID(ctx context.Context, serviceID string) ([]*discover
cacheUnavailable = true
break
}
- res = append(res, inst.InstanceInfo)
+ res = append(res, inst.Instance)
}
if cacheUnavailable || len(res) == 0 {
res, err := instancesFilter(ctx, []string{serviceID})
diff --git a/datasource/mongo/rule_util.go b/datasource/mongo/rule_util.go
index d7df917..640514e 100644
--- a/datasource/mongo/rule_util.go
+++ b/datasource/mongo/rule_util.go
@@ -40,7 +40,7 @@ func Filter(ctx context.Context, rules []*Rule, consumerID string) (bool, error)
if err != nil {
return false, err
}
- matchErr := MatchRules(rules, consumer.ServiceInfo, tags)
+ matchErr := MatchRules(rules, consumer.Service, tags)
if matchErr != nil {
if matchErr.Code == discovery.ErrPermissionDeny {
return false, nil
diff --git a/datasource/mongo/sd/listwatch_inner.go b/datasource/mongo/sd/listwatch_inner.go
index a95c103..94b3030 100644
--- a/datasource/mongo/sd/listwatch_inner.go
+++ b/datasource/mongo/sd/listwatch_inner.go
@@ -130,9 +130,9 @@ func (lw *mongoListWatch) doParseDocumentToResource(fullDocument bson.Raw) (reso
log.Error("error to parse bson raw to documentInfo", err)
return
}
- resource.Key = instance.InstanceInfo.InstanceId
+ resource.Key = instance.Instance.InstanceId
resource.Value = instance
- resource.Index = instance.InstanceInfo.ServiceId
+ resource.Index = instance.Instance.ServiceId
case service:
service := Service{}
err := bson.Unmarshal(fullDocument, &service)
@@ -140,9 +140,9 @@ func (lw *mongoListWatch) doParseDocumentToResource(fullDocument bson.Raw) (reso
log.Error("error to parse bson raw to documentInfo", err)
return
}
- resource.Key = service.ServiceInfo.ServiceId
+ resource.Key = service.Service.ServiceId
resource.Value = service
- resource.Index = util.StringJoin([]string{service.Domain, service.Project, service.ServiceInfo.ServiceName, service.ServiceInfo.Version, service.ServiceInfo.AppId, service.ServiceInfo.Environment}, "/")
+ resource.Index = util.StringJoin([]string{service.Domain, service.Project, service.Service.ServiceName, service.Service.Version, service.Service.AppId, service.Service.Environment}, "/")
default:
return
}
diff --git a/datasource/mongo/sd/listwatch_test.go b/datasource/mongo/sd/listwatch_test.go
index ad0a03a..2e5a8b0 100644
--- a/datasource/mongo/sd/listwatch_test.go
+++ b/datasource/mongo/sd/listwatch_test.go
@@ -27,10 +27,10 @@ func TestListWatchConfig_String(t *testing.T) {
func TestDoParseWatchRspToMongoInfo(t *testing.T) {
documentID := primitive.NewObjectID()
- mockDocument, _ := bson.Marshal(bson.M{"_id": documentID, "domain": "default", "project": "default", "instanceinfo": bson.M{"instanceid": "8064a600438511eb8584fa163e8a81c9", "serviceid": "91afbe0faa9dc1594689139f099eb293b0cd048d",
+ mockDocument, _ := bson.Marshal(bson.M{"_id": documentID, "domain": "default", "project": "default", "instance": bson.M{"instanceid": "8064a600438511eb8584fa163e8a81c9", "serviceid": "91afbe0faa9dc1594689139f099eb293b0cd048d",
"hostname": "ecs-hcsadlab-dev-0002", "status": "UP", "timestamp": "1608552622", "modtimestamp": "1608552622", "version": "0.0.1"}})
- mockServiceDocument, _ := bson.Marshal(bson.M{"_id": documentID, "domain": "default", "project": "default", "serviceinfo": bson.M{"serviceid": "91afbe0faa9dc1594689139f099eb293b0cd048d", "timestamp": "1608552622", "modtimestamp": "1608552622", "version": "0.0.1"}})
+ mockServiceDocument, _ := bson.Marshal(bson.M{"_id": documentID, "domain": "default", "project": "default", "service": bson.M{"serviceid": "91afbe0faa9dc1594689139f099eb293b0cd048d", "timestamp": "1608552622", "modtimestamp": "1608552622", "version": "0.0.1"}})
// case instance insertOp
@@ -50,7 +50,7 @@ func TestDoParseWatchRspToMongoInfo(t *testing.T) {
info = ilw.doParseWatchRspToResource(mockWatchRsp)
assert.Equal(t, documentID.Hex(), info.DocumentID)
assert.Equal(t, "8064a600438511eb8584fa163e8a81c9", info.Key)
- assert.Equal(t, "1608552622", info.Value.(Instance).InstanceInfo.ModTimestamp)
+ assert.Equal(t, "1608552622", info.Value.(Instance).Instance.ModTimestamp)
// case delete
mockWatchRsp.OperationType = deleteOp
diff --git a/datasource/mongo/sd/mongo_cacher.go b/datasource/mongo/sd/mongo_cacher.go
index 891f274..16cf67b 100644
--- a/datasource/mongo/sd/mongo_cacher.go
+++ b/datasource/mongo/sd/mongo_cacher.go
@@ -368,19 +368,19 @@ func (c *MongoCacher) isValueNotUpdated(value interface{}, newValue interface{})
case instance:
instance := value.(Instance)
newInstance := newValue.(Instance)
- if instance.InstanceInfo == nil || newInstance.InstanceInfo == nil {
+ if instance.Instance == nil || newInstance.Instance == nil {
return true
}
- modTime = instance.InstanceInfo.ModTimestamp
- newModTime = newInstance.InstanceInfo.ModTimestamp
+ modTime = instance.Instance.ModTimestamp
+ newModTime = newInstance.Instance.ModTimestamp
case service:
service := value.(Service)
newService := newValue.(Service)
- if service.ServiceInfo == nil || newService.ServiceInfo == nil {
+ if service.Service == nil || newService.Service == nil {
return true
}
- modTime = service.ServiceInfo.ModTimestamp
- newModTime = newService.ServiceInfo.ModTimestamp
+ modTime = service.Service.ModTimestamp
+ newModTime = newService.Service.ModTimestamp
}
if newModTime == "" || modTime == newModTime {
diff --git a/datasource/mongo/sd/mongo_cacher_test.go b/datasource/mongo/sd/mongo_cacher_test.go
index 86f571f..700f549 100644
--- a/datasource/mongo/sd/mongo_cacher_test.go
+++ b/datasource/mongo/sd/mongo_cacher_test.go
@@ -102,7 +102,7 @@ func TestNewMongoCacher(t *testing.T) {
var resources []*sdcommon.Resource
resource := &sdcommon.Resource{Key: mockResourceID, DocumentID: mockDocumentID, Value: Instance{Domain: "default", Project: "default",
- InstanceInfo: &pb.MicroServiceInstance{InstanceId: mockResourceID, ModTimestamp: "100000"}}}
+ Instance: &pb.MicroServiceInstance{InstanceId: mockResourceID, ModTimestamp: "100000"}}}
resources = append(resources, resource)
test := &sdcommon.ListWatchResp{
Action: sdcommon.ActionCreate,
@@ -159,7 +159,7 @@ func TestNewMongoCacher(t *testing.T) {
// prepare updateOp data
dataUpdate := &sdcommon.Resource{Key: mockResourceID, DocumentID: mockDocumentID,
Value: Instance{Domain: "default", Project: "default",
- InstanceInfo: &pb.MicroServiceInstance{InstanceId: mockResourceID, HostName: "test", ModTimestamp: "100001"}}}
+ Instance: &pb.MicroServiceInstance{InstanceId: mockResourceID, HostName: "test", ModTimestamp: "100001"}}}
var mongoUpdateResources []*sdcommon.Resource
mongoUpdateResources = append(mongoUpdateResources, dataUpdate)
@@ -252,7 +252,7 @@ func TestNewMongoCacher(t *testing.T) {
// prepare updateOp data
dataUpdate := &sdcommon.Resource{Key: mockResourceID, DocumentID: mockDocumentID,
Value: Instance{Domain: "default", Project: "default",
- InstanceInfo: &pb.MicroServiceInstance{InstanceId: mockResourceID, HostName: "test", ModTimestamp: "100001"}}}
+ Instance: &pb.MicroServiceInstance{InstanceId: mockResourceID, HostName: "test", ModTimestamp: "100001"}}}
var mongoUpdateResources []*sdcommon.Resource
mongoUpdateResources = append(mongoUpdateResources, dataUpdate)
diff --git a/datasource/mongo/sd/types.go b/datasource/mongo/sd/types.go
index e5676e0..a5efe4e 100644
--- a/datasource/mongo/sd/types.go
+++ b/datasource/mongo/sd/types.go
@@ -90,15 +90,15 @@ type ResumeToken struct {
}
type Service struct {
- Domain string
- Project string
- Tags map[string]string
- ServiceInfo *pb.MicroService
+ Domain string
+ Project string
+ Tags map[string]string
+ Service *pb.MicroService
}
type Instance struct {
- Domain string
- Project string
- RefreshTime time.Time
- InstanceInfo *pb.MicroServiceInstance
+ Domain string
+ Project string
+ RefreshTime time.Time
+ Instance *pb.MicroServiceInstance
}
diff --git a/datasource/mongo/system.go b/datasource/mongo/system.go
index ab69149..1413bc4 100644
--- a/datasource/mongo/system.go
+++ b/datasource/mongo/system.go
@@ -51,7 +51,7 @@ func setServiceValue(e *sd.MongoCacher, setter dump.Setter) {
setter.SetValue(&dump.KV{
Key: util.StringJoin([]string{datasource.ServiceKeyPrefix, service.Domain, service.Project, k},
datasource.SPLIT),
- Value: service.ServiceInfo,
+ Value: service.Service,
})
return true
})
@@ -62,8 +62,8 @@ func setInstanceValue(e *sd.MongoCacher, setter dump.Setter) {
instance := kv.(cache.Item).Object.(sd.Instance)
setter.SetValue(&dump.KV{
Key: util.StringJoin([]string{datasource.InstanceKeyPrefix, instance.Domain, instance.Project,
- instance.InstanceInfo.ServiceId, k}, datasource.SPLIT),
- Value: instance.InstanceInfo,
+ instance.Instance.ServiceId, k}, datasource.SPLIT),
+ Value: instance.Instance,
})
return true
})
diff --git a/datasource/mongo/util.go b/datasource/mongo/util.go
index cc2179b..8e07918 100644
--- a/datasource/mongo/util.go
+++ b/datasource/mongo/util.go
@@ -89,7 +89,7 @@ func statistics(ctx context.Context, withShared bool) (*pb.Statistics, error) {
}
var instIDs []string
for _, inst := range instances {
- instIDs = append(instIDs, inst.InstanceInfo.ServiceId)
+ instIDs = append(instIDs, inst.Instance.ServiceId)
}
datasource.SetStaticInstances(result, svcIDToNonVerKey, instIDs)
data := <-respGetInstanceCountByDomain
diff --git a/etc/conf/app.conf b/etc/conf/app.conf
index 8f3ffbf..d40cc50 100644
--- a/etc/conf/app.conf
+++ b/etc/conf/app.conf
@@ -70,8 +70,6 @@ registry_plugin = etcd
# manager_cluster = "127.0.0.1:2379"
manager_cluster = "127.0.0.1:2379"
-# heartbeat_plugin="heartbeatchecker"
-heartbeat_plugin="heartbeatchecker"
# heartbeat that sync synchronizes client's endpoints with the known endpoints from
# the etcd membership, unit is second and value must greater then 1s, it is set
# default 30s if value less then 0
diff --git a/etc/conf/app.yaml b/etc/conf/app.yaml
index fa4a551..82b2f50 100644
--- a/etc/conf/app.yaml
+++ b/etc/conf/app.yaml
@@ -83,6 +83,17 @@ registry:
request:
timeout:
mongo:
+ heartbeat:
+ # Mongo's heartbeat plugin
+ # heartbeat.kind="checker or cache"
+ # if heartbeat.kind equals to 'cache', should set cacheCapacity,workerNum and taskTimeout
+ # capacity = 10000
+ # workerNum = 10
+ # timeout = 10
+ kind: cache
+ cacheCapacity: 10000
+ workerNum: 10
+ timeout: 10
cluster:
uri: mongodb://localhost:27017
diff --git a/test/test.go b/test/test.go
index 3639398..9ce76f1 100644
--- a/test/test.go
+++ b/test/test.go
@@ -39,7 +39,7 @@ func init() {
archaius.Set("discovery.kind", "etcd")
archaius.Set("registry.kind", "etcd")
} else {
- archaius.Set("registry.heartbeat.kind", "heartbeatchecker")
+ archaius.Set("registry.heartbeat.kind", "checker")
}
datasource.Init(datasource.Options{PluginImplName: datasource.ImplName(t.(string))})
core.ServiceAPI, core.InstanceAPI = service.AssembleResources()