You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicecomb.apache.org by ze...@apache.org on 2019/06/21 09:37:23 UTC
[servicecomb-service-center] branch master updated: Abstract
synchronous data structure
This is an automated email from the ASF dual-hosted git repository.
zenlin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/servicecomb-service-center.git
The following commit(s) were added to refs/heads/master by this push:
new d3619e1 Abstract synchronous data structure
new ae14fbf Merge pull request #561 from ChinX/master
d3619e1 is described below
commit d3619e1f2172093435f5eacaa880b62d2990d77a
Author: chinx <c5...@126.com>
AuthorDate: Fri Jun 21 14:45:25 2019 +0800
Abstract synchronous data structure
---
syncer/etcd/storage.go | 108 +++++++-
syncer/pkg/mock/mockplugin/instance.go | 39 +--
syncer/pkg/mock/mockplugin/service.go | 14 +-
syncer/pkg/mock/mockplugin/servicecenter.go | 204 ++++++--------
syncer/plugins/plugin_test.go | 13 +-
syncer/plugins/servicecenter.go | 8 +-
syncer/plugins/servicecenter/instance.go | 16 +-
syncer/plugins/servicecenter/instance_test.go | 22 +-
syncer/plugins/servicecenter/service.go | 9 +-
syncer/plugins/servicecenter/service_test.go | 10 +-
syncer/plugins/servicecenter/servicecenter.go | 3 +-
syncer/plugins/servicecenter/transform.go | 199 ++++++++++++--
syncer/proto/syncer.pb.go | 378 +++++++++++++++++++++++---
syncer/proto/syncer.proto | 62 ++++-
syncer/servicecenter/exclude.go | 43 ++-
syncer/servicecenter/servicecenter.go | 55 ++--
syncer/servicecenter/servicecenter_test.go | 3 +-
syncer/servicecenter/sync.go | 39 +--
18 files changed, 869 insertions(+), 356 deletions(-)
diff --git a/syncer/etcd/storage.go b/syncer/etcd/storage.go
index 5d45190..f77af90 100644
--- a/syncer/etcd/storage.go
+++ b/syncer/etcd/storage.go
@@ -34,6 +34,8 @@ var (
mappingsKey = "/syncer/v1/mappings"
// servicesKey the key of service in etcd
servicesKey = "/syncer/v1/services"
+ // instancesKey the key of instance in etcd
+ instancesKey = "/syncer/v1/instances"
)
type storage struct {
@@ -67,15 +69,52 @@ func (s *storage) getPrefixKey(prefix string, handler func(key, val []byte) (nex
// UpdateData Update data to storage
func (s *storage) UpdateData(data *pb.SyncData) {
+ services := s.GetServices()
s.UpdateServices(data.Services)
+ s.compareAndDeleteServices(services, data.Services)
+
+ instances := s.GetInstances()
+ s.UpdateInstances(data.Instances)
+ s.compareAndDeleteInstances(instances, data.Instances)
}
// GetData Get data from storage
func (s *storage) GetData() (data *pb.SyncData) {
- data = &pb.SyncData{Services: s.GetServices()}
+ data = &pb.SyncData{
+ Services: s.GetServices(),
+ Instances: s.GetInstances(),
+ }
return
}
+func (s *storage) compareAndDeleteServices(origin, renew []*pb.SyncService) {
+ expires := make([]*pb.SyncService, 0, len(origin))
+next:
+ for _, item := range origin {
+ for _, service := range renew {
+ if service.ServiceId == item.ServiceId {
+ continue next
+ }
+ }
+ expires = append(expires, item)
+ }
+ s.DeleteServices(expires)
+}
+
+func (s *storage) compareAndDeleteInstances(origin, renew []*pb.SyncInstance) {
+ expires := make([]*pb.SyncInstance, 0, len(origin))
+next:
+ for _, item := range origin {
+ for _, service := range renew {
+ if service.InstanceId == item.InstanceId {
+ continue next
+ }
+ }
+ expires = append(expires, item)
+ }
+ s.DeleteInstances(expires)
+}
+
// cleanExpired clean the expired instances in the maps
func (s *storage) cleanExpired(sources pb.SyncMapping, actives pb.SyncMapping) {
next:
@@ -89,13 +128,15 @@ next:
if _, err := s.client.Delete(context.Background(), key); err != nil {
log.Errorf(err, "Delete instance clusterName=%s instanceID=%s failed", entry.ClusterName, entry.OrgInstanceID)
}
+
+ s.deleteInstance(entry.CurInstanceID)
}
}
// UpdateServices Update services to storage
func (s *storage) UpdateServices(services []*pb.SyncService) {
for _, val := range services {
- key := servicesKey + "/" + val.Service.ServiceId
+ key := servicesKey + "/" + val.ServiceId
data, err := proto.Marshal(val)
if err != nil {
log.Errorf(err, "Proto marshal failed: %s", err)
@@ -124,6 +165,69 @@ func (s *storage) GetServices() (services []*pb.SyncService) {
return
}
+// DeleteServices Delete services from storage
+func (s *storage) DeleteServices(services []*pb.SyncService) {
+ for _, val := range services {
+ s.deleteService(val.ServiceId)
+ }
+}
+
+// DeleteServices Delete services from storage
+func (s *storage) deleteService(serviceId string) {
+ key := servicesKey + "/" + serviceId
+ _, err := s.client.Delete(context.Background(), key)
+ if err != nil {
+ log.Errorf(err, "Delete service from etcd failed: %s", err)
+ }
+}
+
+// UpdateInstances Update instances to storage
+func (s *storage) UpdateInstances(instances []*pb.SyncInstance) {
+ for _, val := range instances {
+ key := instancesKey + "/" + val.InstanceId
+ data, err := proto.Marshal(val)
+ if err != nil {
+ log.Errorf(err, "Proto marshal failed: %s", err)
+ continue
+ }
+ _, err = s.client.Put(context.Background(), key, util.BytesToStringWithNoCopy(data))
+ if err != nil {
+ log.Errorf(err, "Save instance to etcd failed: %s", err)
+ }
+ }
+}
+
+// GetInstances Get instances from storage
+func (s *storage) GetInstances() (instances []*pb.SyncInstance) {
+ instances = make([]*pb.SyncInstance, 0, 10)
+ s.getPrefixKey(instancesKey, func(key, val []byte) (next bool) {
+ next = true
+ item := &pb.SyncInstance{}
+ if err := proto.Unmarshal(val, item); err != nil {
+ log.Errorf(err, "Proto unmarshal '%s' failed: %s", val, err)
+ return
+ }
+ instances = append(instances, item)
+ return
+ })
+ return
+}
+
+// DeleteInstances Delete instances from storage
+func (s *storage) DeleteInstances(instances []*pb.SyncInstance) {
+ for _, val := range instances {
+ s.deleteInstance(val.InstanceId)
+ }
+}
+
+func (s *storage) deleteInstance(instanceID string) {
+ key := instancesKey + "/" + instanceID
+ _, err := s.client.Delete(context.Background(), key)
+ if err != nil {
+ log.Errorf(err, "Delete instance from etcd failed: %s", err)
+ }
+}
+
// UpdateMapByCluster update map to storage by clusterName of other cluster
func (s *storage) UpdateMapByCluster(clusterName string, mapping pb.SyncMapping) {
newMaps := make(pb.SyncMapping, 0, len(mapping))
diff --git a/syncer/pkg/mock/mockplugin/instance.go b/syncer/pkg/mock/mockplugin/instance.go
index 71250aa..53711d7 100644
--- a/syncer/pkg/mock/mockplugin/instance.go
+++ b/syncer/pkg/mock/mockplugin/instance.go
@@ -20,17 +20,17 @@ package mockplugin
import (
"context"
- scpb "github.com/apache/servicecomb-service-center/server/core/proto"
+ pb "github.com/apache/servicecomb-service-center/syncer/proto"
)
var (
- registerInstance func(ctx context.Context, domainProject, serviceId string, instance *scpb.MicroServiceInstance) (string, error)
+ registerInstance func(ctx context.Context, domainProject, serviceId string, instance *pb.SyncInstance) (string, error)
unregisterInstance func(ctx context.Context, domainProject, serviceId, instanceId string) error
- discoveryInstances func(ctx context.Context, domainProject, consumerId, providerAppId, providerServiceName, providerVersionRule string) ([]*scpb.MicroServiceInstance, error)
heartbeat func(ctx context.Context, domainProject, serviceId, instanceId string) error
+
)
-func SetRegisterInstance(handler func(ctx context.Context, domainProject, serviceId string, instance *scpb.MicroServiceInstance) (string, error)) {
+func SetRegisterInstance(handler func(ctx context.Context, domainProject, serviceId string, instance *pb.SyncInstance) (string, error)) {
registerInstance = handler
}
@@ -38,15 +38,11 @@ func SetUnregisterInstance(handler func(ctx context.Context, domainProject, serv
unregisterInstance = handler
}
-func SetDiscoveryInstances(handler func(ctx context.Context, domainProject, consumerId, providerAppId, providerServiceName, providerVersionRule string) ([]*scpb.MicroServiceInstance, error)) {
- discoveryInstances = handler
-}
-
func SetHeartbeat(handler func(ctx context.Context, domainProject, serviceId, instanceId string) error) {
heartbeat = handler
}
-func (c *mockPlugin) RegisterInstance(ctx context.Context, domainProject, serviceId string, instance *scpb.MicroServiceInstance) (string, error) {
+func (c *mockPlugin) RegisterInstance(ctx context.Context, domainProject, serviceId string, instance *pb.SyncInstance) (string, error) {
if registerInstance != nil {
return registerInstance(ctx, domainProject, serviceId, instance)
}
@@ -60,31 +56,6 @@ func (c *mockPlugin) UnregisterInstance(ctx context.Context, domainProject, serv
return nil
}
-func (c *mockPlugin) DiscoveryInstances(ctx context.Context, domainProject, consumerId, providerAppId, providerServiceName, providerVersionRule string) ([]*scpb.MicroServiceInstance, error) {
- if discoveryInstances != nil {
- return discoveryInstances(ctx, domainProject, consumerId, providerAppId, providerServiceName, providerVersionRule)
- }
- return []*scpb.MicroServiceInstance{
- {
- InstanceId: "4d41a637471f11e9888cfa163eca30e0",
- ServiceId: "5db1b794aa6f8a875d6e68110260b5491ee7e223",
- Endpoints: []string{
- "rest://127.0.0.1:30100/",
- },
- HostName: "testmock",
- Status: "UP",
- HealthCheck: &scpb.HealthCheck{
- Mode: "push",
- Interval: 30,
- Times: 3,
- },
- Timestamp: "1552653537",
- ModTimestamp: "1552653537",
- Version: "1.1.0",
- },
- }, nil
-}
-
func (c *mockPlugin) Heartbeat(ctx context.Context, domainProject, serviceId, instanceId string) error {
if heartbeat != nil {
return heartbeat(ctx, domainProject, serviceId, instanceId)
diff --git a/syncer/pkg/mock/mockplugin/service.go b/syncer/pkg/mock/mockplugin/service.go
index 97bbaba..39a79f8 100644
--- a/syncer/pkg/mock/mockplugin/service.go
+++ b/syncer/pkg/mock/mockplugin/service.go
@@ -20,16 +20,16 @@ package mockplugin
import (
"context"
- scpb "github.com/apache/servicecomb-service-center/server/core/proto"
+ pb "github.com/apache/servicecomb-service-center/syncer/proto"
)
var (
- createServiceHandler func(ctx context.Context, domainProject string, service *scpb.MicroService) (string, error)
+ createServiceHandler func(ctx context.Context, domainProject string, service *pb.SyncService) (string, error)
deleteService func(ctx context.Context, domainProject, serviceId string) error
- serviceExistence func(ctx context.Context, domainProject string, service *scpb.MicroService) (string, error)
+ serviceExistence func(ctx context.Context, domainProject string, service *pb.SyncService) (string, error)
)
-func SetCreateService(handler func(ctx context.Context, domainProject string, service *scpb.MicroService) (string, error)) {
+func SetCreateService(handler func(ctx context.Context, domainProject string, service *pb.SyncService) (string, error)) {
createServiceHandler = handler
}
@@ -37,11 +37,11 @@ func SetDeleteService(handler func(ctx context.Context, domainProject, serviceId
deleteService = handler
}
-func SetServiceExistence(handler func(ctx context.Context, domainProject string, service *scpb.MicroService) (string, error)) {
+func SetServiceExistence(handler func(ctx context.Context, domainProject string, source *pb.SyncService) (string, error)) {
serviceExistence = handler
}
-func (c *mockPlugin) CreateService(ctx context.Context, domainProject string, service *scpb.MicroService) (string, error) {
+func (c *mockPlugin) CreateService(ctx context.Context, domainProject string, service *pb.SyncService) (string, error) {
if createServiceHandler != nil {
return createServiceHandler(ctx, domainProject, service)
}
@@ -55,7 +55,7 @@ func (c *mockPlugin) DeleteService(ctx context.Context, domainProject, serviceId
return nil
}
-func (c *mockPlugin) ServiceExistence(ctx context.Context, domainProject string, service *scpb.MicroService) (string, error) {
+func (c *mockPlugin) ServiceExistence(ctx context.Context, domainProject string, service *pb.SyncService) (string, error) {
if serviceExistence != nil {
return serviceExistence(ctx, domainProject, service)
}
diff --git a/syncer/pkg/mock/mockplugin/servicecenter.go b/syncer/pkg/mock/mockplugin/servicecenter.go
index 695fa46..4e01074 100644
--- a/syncer/pkg/mock/mockplugin/servicecenter.go
+++ b/syncer/pkg/mock/mockplugin/servicecenter.go
@@ -20,7 +20,6 @@ package mockplugin
import (
"context"
- scpb "github.com/apache/servicecomb-service-center/server/core/proto"
"github.com/apache/servicecomb-service-center/syncer/plugins"
pb "github.com/apache/servicecomb-service-center/syncer/proto"
)
@@ -60,99 +59,69 @@ func NewGetAll(ctx context.Context) (*pb.SyncData, error) {
return &pb.SyncData{
Services: []*pb.SyncService{
{
+ ServiceId: "5db1b794aa6f8a875d6e68110260b5491ee7e223",
+ App: "default",
+ Name: "SERVICECENTER",
+ Version: "1.1.0",
+ Status: pb.SyncService_UP,
DomainProject: "default/default",
- Service: &scpb.MicroService{
- ServiceId: "5db1b794aa6f8a875d6e68110260b5491ee7e223",
- AppId: "default",
- ServiceName: "SERVICECENTER",
- Version: "1.1.0",
- Level: "BACK",
- Schemas: []string{
- "servicecenter.grpc.api.ServiceCtrl",
- "servicecenter.grpc.api.ServiceInstanceCtrl",
- },
- Status: "UP",
- Properties: map[string]string{
- "allowCrossApp": "true",
- },
- Timestamp: "1552626180",
- ModTimestamp: "1552626180",
- Environment: "production",
+ PluginName: PluginName,
+ }, {
+ ServiceId: "5db1b794aa6f8a875d6e68110260b5491ee7e211",
+ App: "default",
+ Name: "SERVICECENTER",
+ Version: "1.1.0",
+ Status: pb.SyncService_UP,
+ DomainProject: "default/default",
+ PluginName: PluginName,
+ },
+ },
+ Instances: []*pb.SyncInstance{
+ {
+ InstanceId: "4d41a637471f11e9888cfa163eca30ab",
+ ServiceId: "5db1b794aa6f8a875d6e68110260b5491ee7e223",
+ Endpoints: []string{
+ "http://127.0.0.1:30100",
},
- Instances: []*scpb.MicroServiceInstance{
- {
- InstanceId: "4d41a637471f11e9888cfa163eca30ab",
- ServiceId: "5db1b794aa6f8a875d6e68110260b5491ee7e223",
- Endpoints: []string{
- "rest://127.0.0.1:30100/",
- },
- HostName: "testmock",
- Status: "UP",
- HealthCheck: &scpb.HealthCheck{
- Mode: "push",
- Interval: 30,
- Times: 3,
- },
- Timestamp: "1552653537",
- ModTimestamp: "1552653537",
- Version: "1.1.0",
- },{
- InstanceId: "4d41a637471f11e9888cfa163eca30e0",
- ServiceId: "5db1b794aa6f8a875d6e68110260b5491ee7e223",
- Endpoints: []string{
- "rest://127.0.0.1:30100/",
- },
- HostName: "testmock",
- Status: "UP",
- HealthCheck: &scpb.HealthCheck{
- Mode: "push",
- Interval: 30,
- Times: 3,
- },
- Timestamp: "1552653537",
- ModTimestamp: "1552653537",
- Version: "1.1.0",
- },
+ HostName: "testmock",
+ Status: pb.SyncInstance_UP,
+ HealthCheck: &pb.HealthCheck{
+ Mode: pb.HealthCheck_PUSH,
+ Interval: 30,
+ Times: 3,
},
- },{
- DomainProject: "default/default",
- Service: &scpb.MicroService{
- ServiceId: "5db1b794aa6f8a875d6e68110260b5491ee7e211",
- AppId: "default",
- ServiceName: "SERVICECENTER",
- Version: "1.1.0",
- Level: "BACK",
- Schemas: []string{
- "servicecenter.grpc.api.ServiceCtrl",
- "servicecenter.grpc.api.ServiceInstanceCtrl",
- },
- Status: "UP",
- Properties: map[string]string{
- "allowCrossApp": "true",
- },
- Timestamp: "1552626180",
- ModTimestamp: "1552626180",
- Environment: "production",
+ Version: "1.1.0",
+ PluginName: PluginName,
+ }, {
+ InstanceId: "4d41a637471f11e9888cfa163eca30e0",
+ ServiceId: "5db1b794aa6f8a875d6e68110260b5491ee7e223",
+ Endpoints: []string{
+ "http://127.0.0.1:30100",
},
- Instances: []*scpb.MicroServiceInstance{
- {
- InstanceId: "4d41a637471f11e9888cfa163eca30ab",
- ServiceId: "5db1b794aa6f8a875d6e68110260b5491ee7e211",
- Endpoints: []string{
- "rest://127.0.0.1:30100/",
- },
- HostName: "testmock",
- Status: "UP",
- HealthCheck: &scpb.HealthCheck{
- Mode: "push",
- Interval: 30,
- Times: 3,
- },
- Timestamp: "1552653537",
- ModTimestamp: "1552653537",
- Version: "1.1.0",
- },
+ HostName: "testmock",
+ Status: pb.SyncInstance_UP,
+ HealthCheck: &pb.HealthCheck{
+ Mode: pb.HealthCheck_PUSH,
+ Interval: 30,
+ Times: 3,
},
+ Version: "1.1.0",
+ PluginName: PluginName,
+ }, {
+ InstanceId: "4d41a637471f11e9888cfa163eca30ab",
+ ServiceId: "5db1b794aa6f8a875d6e68110260b5491ee7e211",
+ Endpoints: []string{
+ "http://127.0.0.1:30100",
+ },
+ HostName: "testmock",
+ Status: pb.SyncInstance_UP,
+ HealthCheck: &pb.HealthCheck{
+ Mode: pb.HealthCheck_PUSH,
+ Interval: 30,
+ Times: 3,
+ },
+ Version: "1.1.0",
+ PluginName: PluginName,
},
},
}, nil
@@ -165,44 +134,31 @@ func (c *mockPlugin) GetAll(ctx context.Context) (*pb.SyncData, error) {
return &pb.SyncData{
Services: []*pb.SyncService{
{
+ ServiceId: "5db1b794aa6f8a875d6e68110260b5491ee7e223",
+ App: "default",
+ Name: "SERVICECENTER",
+ Version: "1.1.0",
+ Status: pb.SyncService_UP,
DomainProject: "default/default",
- Service: &scpb.MicroService{
- ServiceId: "5db1b794aa6f8a875d6e68110260b5491ee7e223",
- AppId: "default",
- ServiceName: "SERVICECENTER",
- Version: "1.1.0",
- Level: "BACK",
- Schemas: []string{
- "servicecenter.grpc.api.ServiceCtrl",
- "servicecenter.grpc.api.ServiceInstanceCtrl",
- },
- Status: "UP",
- Properties: map[string]string{
- "allowCrossApp": "true",
- },
- Timestamp: "1552626180",
- ModTimestamp: "1552626180",
- Environment: "production",
+ PluginName: PluginName,
+ },
+ },
+ Instances: []*pb.SyncInstance{
+ {
+ InstanceId: "4d41a637471f11e9888cfa163eca30e0",
+ ServiceId: "5db1b794aa6f8a875d6e68110260b5491ee7e223",
+ Endpoints: []string{
+ "http://127.0.0.1:30100",
},
- Instances: []*scpb.MicroServiceInstance{
- {
- InstanceId: "4d41a637471f11e9888cfa163eca30e0",
- ServiceId: "5db1b794aa6f8a875d6e68110260b5491ee7e223",
- Endpoints: []string{
- "rest://127.0.0.1:30100/",
- },
- HostName: "testmock",
- Status: "UP",
- HealthCheck: &scpb.HealthCheck{
- Mode: "push",
- Interval: 30,
- Times: 3,
- },
- Timestamp: "1552653537",
- ModTimestamp: "1552653537",
- Version: "1.1.0",
- },
+ HostName: "testmock",
+ Status: pb.SyncInstance_UP,
+ HealthCheck: &pb.HealthCheck{
+ Mode: pb.HealthCheck_PUSH,
+ Interval: 30,
+ Times: 3,
},
+ Version: "1.1.0",
+ PluginName: PluginName,
},
},
}, nil
diff --git a/syncer/plugins/plugin_test.go b/syncer/plugins/plugin_test.go
index cfaa778..f4bd53e 100644
--- a/syncer/plugins/plugin_test.go
+++ b/syncer/plugins/plugin_test.go
@@ -21,7 +21,6 @@ import (
"context"
"testing"
- scpb "github.com/apache/servicecomb-service-center/server/core/proto"
pb "github.com/apache/servicecomb-service-center/syncer/proto"
)
@@ -39,7 +38,7 @@ type mockRepository struct{}
func (r *mockRepository) GetAll(ctx context.Context) (data *pb.SyncData, err error) { return }
-func (r *mockRepository) CreateService(ctx context.Context, domainProject string, service *scpb.MicroService) (str string, err error) {
+func (r *mockRepository) CreateService(ctx context.Context, domainProject string, service *pb.SyncService) (str string, err error) {
return
}
@@ -47,11 +46,11 @@ func (r *mockRepository) DeleteService(ctx context.Context, domainProject, servi
return
}
-func (r *mockRepository) ServiceExistence(ctx context.Context, domainProject string, service *scpb.MicroService) (str string, err error) {
+func (r *mockRepository) ServiceExistence(ctx context.Context, domainProject string, service *pb.SyncService) (str string, err error) {
return
}
-func (r *mockRepository) RegisterInstance(ctx context.Context, domainProject, serviceId string, instance *scpb.MicroServiceInstance) (str string, err error) {
+func (r *mockRepository) RegisterInstance(ctx context.Context, domainProject, serviceId string, instance *pb.SyncInstance) (str string, err error) {
return
}
@@ -59,10 +58,6 @@ func (r *mockRepository) UnregisterInstance(ctx context.Context, domainProject,
return
}
-func (r *mockRepository) DiscoveryInstances(ctx context.Context, domainProject, consumerId, providerAppId, providerServiceName, providerVersionRule string) (list []*scpb.MicroServiceInstance, err error) {
- return
-}
-
func (r *mockRepository) Heartbeat(ctx context.Context, domainProject, serviceId, instanceId string) (err error) {
return
}
@@ -71,7 +66,7 @@ func TestManager_New(t *testing.T) {
pm := Plugins()
notfound := PluginType(999)
- notfound.String()
+ t.Log(notfound.String())
p := pm.Get(notfound, BUILDIN)
if p != nil {
diff --git a/syncer/plugins/servicecenter.go b/syncer/plugins/servicecenter.go
index 8797e44..e528c52 100644
--- a/syncer/plugins/servicecenter.go
+++ b/syncer/plugins/servicecenter.go
@@ -19,7 +19,6 @@ package plugins
import (
"context"
- scpb "github.com/apache/servicecomb-service-center/server/core/proto"
pb "github.com/apache/servicecomb-service-center/syncer/proto"
)
@@ -31,11 +30,10 @@ type Adaptor interface {
// Servicecenter servicecenter interface
type Servicecenter interface {
GetAll(ctx context.Context) (*pb.SyncData, error)
- CreateService(ctx context.Context, domainProject string, service *scpb.MicroService) (string, error)
+ CreateService(ctx context.Context, domainProject string, service *pb.SyncService) (string, error)
DeleteService(ctx context.Context, domainProject, serviceId string) error
- ServiceExistence(ctx context.Context, domainProject string, service *scpb.MicroService) (string, error)
- RegisterInstance(ctx context.Context, domainProject, serviceId string, instance *scpb.MicroServiceInstance) (string, error)
+ ServiceExistence(ctx context.Context, domainProject string, service *pb.SyncService) (string, error)
+ RegisterInstance(ctx context.Context, domainProject, serviceId string, instance *pb.SyncInstance) (string, error)
UnregisterInstance(ctx context.Context, domainProject, serviceId, instanceId string) error
- DiscoveryInstances(ctx context.Context, domainProject, consumerId, providerAppId, providerServiceName, providerVersionRule string) ([]*scpb.MicroServiceInstance, error)
Heartbeat(ctx context.Context, domainProject, serviceId, instanceId string) error
}
diff --git a/syncer/plugins/servicecenter/instance.go b/syncer/plugins/servicecenter/instance.go
index 05d21f3..74d80b0 100644
--- a/syncer/plugins/servicecenter/instance.go
+++ b/syncer/plugins/servicecenter/instance.go
@@ -19,11 +19,14 @@ package servicecenter
import (
"context"
- scpb "github.com/apache/servicecomb-service-center/server/core/proto"
+ pb "github.com/apache/servicecomb-service-center/syncer/proto"
)
// RegisterInstance register instance to servicecenter
-func (c *Client) RegisterInstance(ctx context.Context, domainProject, serviceId string, instance *scpb.MicroServiceInstance) (string, error) {
+func (c *Client) RegisterInstance(ctx context.Context, domainProject, serviceId string, syncInstance *pb.SyncInstance) (string, error) {
+ instance := toInstance(syncInstance)
+ instance.InstanceId = ""
+ instance.ServiceId = serviceId
instanceID, err := c.cli.RegisterInstance(ctx, domainProject, serviceId, instance)
if err != nil {
return "", err
@@ -40,15 +43,6 @@ func (c *Client) UnregisterInstance(ctx context.Context, domainProject, serviceI
return nil
}
-// DiscoveryInstances discoveries instances from servicecenter
-func (c *Client) DiscoveryInstances(ctx context.Context, domainProject, consumerId, providerAppId, providerServiceName, providerVersionRule string) ([]*scpb.MicroServiceInstance, error) {
- instances, err := c.cli.DiscoveryInstances(ctx, domainProject, consumerId, providerAppId, providerServiceName, providerVersionRule)
- if err != nil {
- return nil, err
- }
- return instances, nil
-}
-
// Heartbeat sends heartbeat to servicecenter
func (c *Client) Heartbeat(ctx context.Context, domainProject, serviceId, instanceId string) error {
err := c.cli.Heartbeat(ctx, domainProject, serviceId, instanceId)
diff --git a/syncer/plugins/servicecenter/instance_test.go b/syncer/plugins/servicecenter/instance_test.go
index 0df4d5e..726eca3 100644
--- a/syncer/plugins/servicecenter/instance_test.go
+++ b/syncer/plugins/servicecenter/instance_test.go
@@ -20,20 +20,20 @@ import (
"context"
"testing"
- scpb "github.com/apache/servicecomb-service-center/server/core/proto"
+ pb "github.com/apache/servicecomb-service-center/syncer/proto"
)
func TestClient_RegisterInstance(t *testing.T) {
svr, repo := newServiceCenter(t)
_, err := repo.RegisterInstance(context.Background(), "default/deault",
- "4042a6a3e5a2893698ae363ea99a69eb63fc51cd", &scpb.MicroServiceInstance{})
+ "4042a6a3e5a2893698ae363ea99a69eb63fc51cd", &pb.SyncInstance{})
if err != nil {
t.Errorf("register instance failed, error: %s", err)
}
svr.Close()
_, err = repo.RegisterInstance(context.Background(), "default/deault",
- "4042a6a3e5a2893698ae363ea99a69eb63fc51cd", &scpb.MicroServiceInstance{})
+ "4042a6a3e5a2893698ae363ea99a69eb63fc51cd", &pb.SyncInstance{})
if err != nil {
t.Logf("register instance failed, error: %s", err)
}
@@ -55,22 +55,6 @@ func TestClient_UnregisterInstance(t *testing.T) {
}
}
-func TestClient_DiscoveryInstances(t *testing.T) {
- svr, repo := newServiceCenter(t)
- _, err := repo.DiscoveryInstances(context.Background(), "default/deault",
- "4042a6a3e5a2893698ae363ea99a69eb63fc51cd", "default", "testservice", "1.0.1")
- if err != nil {
- t.Errorf("discovery instances failed, error: %s", err)
- }
-
- svr.Close()
- _, err = repo.DiscoveryInstances(context.Background(), "default/deault",
- "4042a6a3e5a2893698ae363ea99a69eb63fc51cd", "default", "testservice", "1.0.1")
- if err != nil {
- t.Logf("discovery instances failed, error: %s", err)
- }
-}
-
func TestClient_Heartbeat(t *testing.T) {
svr, repo := newServiceCenter(t)
err := repo.Heartbeat(context.Background(), "default/deault",
diff --git a/syncer/plugins/servicecenter/service.go b/syncer/plugins/servicecenter/service.go
index 129f904..fa853d9 100644
--- a/syncer/plugins/servicecenter/service.go
+++ b/syncer/plugins/servicecenter/service.go
@@ -19,11 +19,13 @@ package servicecenter
import (
"context"
- scpb "github.com/apache/servicecomb-service-center/server/core/proto"
+ pb "github.com/apache/servicecomb-service-center/syncer/proto"
)
// CreateService creates the service of servicecenter
-func (c *Client) CreateService(ctx context.Context, domainProject string, service *scpb.MicroService) (string, error) {
+func (c *Client) CreateService(ctx context.Context, domainProject string, syncService *pb.SyncService) (string, error) {
+ service := toService(syncService)
+ service.ServiceId = ""
serviceID, err := c.cli.CreateService(ctx, domainProject, service)
if err != nil {
return "", err
@@ -41,7 +43,8 @@ func (c *Client) DeleteService(ctx context.Context, domainProject, serviceId str
}
// ServiceExistence Checkes service exists in servicecenter
-func (c *Client) ServiceExistence(ctx context.Context, domainProject string, service *scpb.MicroService) (string, error) {
+func (c *Client) ServiceExistence(ctx context.Context, domainProject string, syncService *pb.SyncService) (string, error) {
+ service := toService(syncService)
serviceID, err := c.cli.ServiceExistence(ctx, domainProject, service.AppId, service.ServiceName, service.Version, service.Environment)
if err != nil {
return "", err
diff --git a/syncer/plugins/servicecenter/service_test.go b/syncer/plugins/servicecenter/service_test.go
index 8f4e343..b8d10f3 100644
--- a/syncer/plugins/servicecenter/service_test.go
+++ b/syncer/plugins/servicecenter/service_test.go
@@ -20,18 +20,18 @@ import (
"context"
"testing"
- scpb "github.com/apache/servicecomb-service-center/server/core/proto"
+ pb "github.com/apache/servicecomb-service-center/syncer/proto"
)
func TestClient_CreateService(t *testing.T) {
svr, repo := newServiceCenter(t)
- _, err := repo.CreateService(context.Background(), "default/deault", &scpb.MicroService{})
+ _, err := repo.CreateService(context.Background(), "default/deault", &pb.SyncService{})
if err != nil {
t.Errorf("create service failed, error: %s", err)
}
svr.Close()
- _, err = repo.CreateService(context.Background(), "default/deault", &scpb.MicroService{})
+ _, err = repo.CreateService(context.Background(), "default/deault", &pb.SyncService{})
if err != nil {
t.Logf("create service failed, error: %s", err)
}
@@ -39,13 +39,13 @@ func TestClient_CreateService(t *testing.T) {
func TestClient_ServiceExistence(t *testing.T) {
svr, repo := newServiceCenter(t)
- _, err := repo.ServiceExistence(context.Background(), "default/deault", &scpb.MicroService{})
+ _, err := repo.ServiceExistence(context.Background(), "default/deault", &pb.SyncService{})
if err != nil {
t.Errorf("check service existence failed, error: %s", err)
}
svr.Close()
- _, err = repo.ServiceExistence(context.Background(), "default/deault", &scpb.MicroService{})
+ _, err = repo.ServiceExistence(context.Background(), "default/deault", &pb.SyncService{})
if err != nil {
t.Logf("check service existence failed, error: %s", err)
}
diff --git a/syncer/plugins/servicecenter/servicecenter.go b/syncer/plugins/servicecenter/servicecenter.go
index 8d3154a..bfc9af2 100644
--- a/syncer/plugins/servicecenter/servicecenter.go
+++ b/syncer/plugins/servicecenter/servicecenter.go
@@ -60,6 +60,5 @@ func (c *Client) GetAll(ctx context.Context) (*pb.SyncData, error) {
if err != nil {
return nil, err
}
- return transform(cache), nil
-
+ return toSyncData(cache), nil
}
diff --git a/syncer/plugins/servicecenter/transform.go b/syncer/plugins/servicecenter/transform.go
index 984d678..e028d1c 100644
--- a/syncer/plugins/servicecenter/transform.go
+++ b/syncer/plugins/servicecenter/transform.go
@@ -17,43 +17,206 @@
package servicecenter
import (
+ "net/url"
+ "strconv"
"strings"
+ "github.com/apache/servicecomb-service-center/pkg/log"
"github.com/apache/servicecomb-service-center/server/admin/model"
scpb "github.com/apache/servicecomb-service-center/server/core/proto"
pb "github.com/apache/servicecomb-service-center/syncer/proto"
+ "github.com/gogo/protobuf/proto"
)
-// transform servicecenter service cache to SyncData
-func transform(cache *model.Cache) (data *pb.SyncData) {
- data = &pb.SyncData{Services: make([]*pb.SyncService, 0, 10)}
+// toSyncData transform service-center service cache to SyncData
+func toSyncData(cache *model.Cache) (data *pb.SyncData) {
+ data = &pb.SyncData{
+ Services: make([]*pb.SyncService, 0, len(cache.Microservices)),
+ Instances: make([]*pb.SyncInstance, 0, len(cache.Instances)),
+ }
+
+ for _, service := range cache.Microservices {
+ tenant := strings.Split(service.Key, "/")
+ if len(tenant) < 6 {
+ continue
+ }
+ syncService := toSyncService(service.Value)
+ syncService.DomainProject = strings.Join(tenant[4:6], "/")
- for _, svc := range cache.Microservices {
- instances := instancesFromService(svc.Value, cache.Instances)
- if len(instances) == 0 {
+ syncInstances := toSyncInstances(syncService.ServiceId, cache.Instances)
+ if len(syncInstances) == 0 {
continue
}
- data.Services = append(data.Services, &pb.SyncService{
- DomainProject: strings.Join(strings.Split(svc.Key, "/")[4:6], "/"),
- Service: svc.Value,
- Instances: instances,
- })
+ data.Services = append(data.Services, syncService)
+ data.Instances = append(data.Instances, syncInstances...)
+ }
+ return
+}
+
+// toSyncService transform service-center service to SyncService
+func toSyncService(service *scpb.MicroService) (syncService *pb.SyncService) {
+ syncService = &pb.SyncService{
+ ServiceId: service.ServiceId,
+ App: service.AppId,
+ Name: service.ServiceName,
+ Version: service.Version,
+ Environment: service.Environment,
+ PluginName: PluginName,
+ }
+ switch service.Status {
+ case scpb.MS_UP:
+ syncService.Status = pb.SyncService_UP
+ case scpb.MS_DOWN:
+ syncService.Status = pb.SyncService_DOWN
+ default:
+ syncService.Status = pb.SyncService_UNKNOWN
+ }
+
+ expansion, err := proto.Marshal(service)
+ if err != nil {
+ log.Errorf(err, "transform sc service to syncer service failed: %s", err)
+ return
}
+ syncService.Expansion = expansion
return
}
-// instancesFromService Extract instance information from the service cache of the servicecenter
-func instancesFromService(service *scpb.MicroService, instances []*model.Instance) []*scpb.MicroServiceInstance {
- instList := make([]*scpb.MicroServiceInstance, 0, 10)
+// toSyncInstances transform service-center instances to SyncInstances
+func toSyncInstances(serviceID string, instances []*model.Instance) (syncInstances []*pb.SyncInstance) {
for _, inst := range instances {
- if inst.Value.Status != "UP" {
+ if inst.Value.Status != scpb.MSI_UP {
continue
}
- if inst.Value.ServiceId == service.ServiceId {
- instList = append(instList, inst.Value)
+ if inst.Value.ServiceId == serviceID {
+ syncInstances = append(syncInstances, toSyncInstance(serviceID, inst.Value))
}
}
- return instList
+ return
+}
+
+// toSyncInstance transform service-center instance to SyncInstance
+func toSyncInstance(serviceID string, instance *scpb.MicroServiceInstance) (syncInstance *pb.SyncInstance) {
+ syncInstance = &pb.SyncInstance{
+ InstanceId: instance.InstanceId,
+ ServiceId: serviceID,
+ Endpoints: make([]string, 0, len(instance.Endpoints)),
+ HostName: instance.HostName,
+ Version: instance.Version,
+ PluginName: PluginName,
+ }
+ switch instance.Status {
+ case scpb.MSI_UP:
+ syncInstance.Status = pb.SyncInstance_UP
+ case scpb.MSI_DOWN:
+ syncInstance.Status = pb.SyncInstance_DOWN
+ case scpb.MSI_STARTING:
+ syncInstance.Status = pb.SyncInstance_STARTING
+ case scpb.MSI_OUTOFSERVICE:
+ syncInstance.Status = pb.SyncInstance_OUTOFSERVICE
+ default:
+ syncInstance.Status = pb.SyncInstance_UNKNOWN
+ }
+
+ for _, ep := range instance.Endpoints {
+ prefix := "http://"
+ addr, err := url.Parse(ep)
+ if err != nil {
+ log.Errorf(err, "parse sc instance endpoint failed: %s", err)
+ continue
+ }
+ if addr.Scheme == "rest" {
+ b, _ := strconv.ParseBool(addr.Query().Get("sslEnabled"))
+ if b {
+ prefix = "https://"
+ }
+ }
+ syncInstance.Endpoints = append(syncInstance.Endpoints, strings.Replace(ep, addr.Scheme+"://", prefix, 1))
+ }
+
+ if instance.HealthCheck != nil {
+ syncInstance.HealthCheck = &pb.HealthCheck{
+ Port: instance.HealthCheck.Port,
+ Interval: instance.HealthCheck.Interval,
+ Times: instance.HealthCheck.Times,
+ Url: instance.HealthCheck.Url,
+ }
+ }
+
+ expansion, err := proto.Marshal(instance)
+ if err != nil {
+ log.Errorf(err, "transform sc service to syncer service failed: %s", err)
+ return
+ }
+ syncInstance.Expansion = expansion
+ return
+}
+
+// toService transform SyncService to service-center service
+func toService(syncService *pb.SyncService) (service *scpb.MicroService) {
+ service = &scpb.MicroService{}
+ if syncService.PluginName == PluginName && syncService.Expansion != nil {
+ err := proto.Unmarshal(syncService.Expansion, service)
+ if err == nil {
+ service.ServiceId = syncService.ServiceId
+ return
+ }
+ log.Errorf(err, "proto unmarshal %s service, serviceID = %s, content = %v failed",
+ PluginName, syncService.ServiceId, syncService.Expansion)
+ }
+ service.AppId = syncService.App
+ service.ServiceId = syncService.ServiceId
+ service.ServiceName = syncService.Name
+ service.Version = syncService.Version
+ service.Status = pb.SyncService_Status_name[int32(syncService.Status)]
+ service.Environment = syncService.Environment
+ return
+}
+
+// toInstance transform SyncInstance to service-center instance
+func toInstance(syncInstance *pb.SyncInstance) (instance *scpb.MicroServiceInstance) {
+ instance = &scpb.MicroServiceInstance{}
+ if syncInstance.PluginName == PluginName && syncInstance.Expansion != nil {
+ err := proto.Unmarshal(syncInstance.Expansion, instance)
+ if err == nil {
+ instance.InstanceId = syncInstance.InstanceId
+ instance.ServiceId = syncInstance.ServiceId
+ return
+ }
+ log.Errorf(err, "proto unmarshal %s instance, instanceID = %s, content = %v failed",
+ PluginName, syncInstance.InstanceId, syncInstance.Expansion)
+ }
+ instance.InstanceId = syncInstance.InstanceId
+ instance.ServiceId = syncInstance.ServiceId
+ instance.Endpoints = make([]string, 0, len(syncInstance.Endpoints))
+ instance.HostName = syncInstance.HostName
+ instance.Version = syncInstance.Version
+ instance.Status = pb.SyncInstance_Status_name[int32(syncInstance.Status)]
+
+ for _, ep := range syncInstance.Endpoints {
+ addr, err := url.Parse(ep)
+ if err != nil {
+ log.Errorf(err, "parse sc instance endpoint failed: %s", err)
+ continue
+ }
+ endpoint := ""
+ switch addr.Scheme {
+ case "http":
+ endpoint = strings.Replace(ep, "http://", "rest://", 1)
+ case "https":
+ endpoint = strings.Replace(ep, "https://", "rest://", 1) + "?sslEnabled=true"
+ }
+ instance.Endpoints = append(instance.Endpoints, endpoint)
+ }
+
+ if syncInstance.HealthCheck != nil {
+ instance.HealthCheck = &scpb.HealthCheck{
+ Port: syncInstance.HealthCheck.Port,
+ Interval: syncInstance.HealthCheck.Interval,
+ Times: syncInstance.HealthCheck.Times,
+ Url: syncInstance.HealthCheck.Url,
+ }
+ }
+ return
}
diff --git a/syncer/proto/syncer.pb.go b/syncer/proto/syncer.pb.go
index 7d502a2..0d4b919 100644
--- a/syncer/proto/syncer.pb.go
+++ b/syncer/proto/syncer.pb.go
@@ -4,15 +4,15 @@
/*
Package proto is a generated protocol buffer package.
-import "../../server/core/proto/services.proto";
-
It is generated from these files:
syncer.proto
It has these top-level messages:
PullRequest
- SyncService
SyncData
+ SyncService
+ SyncInstance
+ HealthCheck
MappingEntry
*/
package proto
@@ -20,7 +20,6 @@ package proto
import proto1 "github.com/golang/protobuf/proto"
import fmt "fmt"
import math "math"
-import proto2 "github.com/apache/servicecomb-service-center/server/core/proto"
import (
context "golang.org/x/net/context"
@@ -38,6 +37,84 @@ var _ = math.Inf
// proto package needs to be updated.
const _ = proto1.ProtoPackageIsVersion2 // please upgrade the proto package
+type SyncService_Status int32
+
+const (
+ SyncService_UNKNOWN SyncService_Status = 0
+ SyncService_UP SyncService_Status = 1
+ SyncService_DOWN SyncService_Status = 2
+)
+
+var SyncService_Status_name = map[int32]string{
+ 0: "UNKNOWN",
+ 1: "UP",
+ 2: "DOWN",
+}
+var SyncService_Status_value = map[string]int32{
+ "UNKNOWN": 0,
+ "UP": 1,
+ "DOWN": 2,
+}
+
+func (x SyncService_Status) String() string {
+ return proto1.EnumName(SyncService_Status_name, int32(x))
+}
+func (SyncService_Status) EnumDescriptor() ([]byte, []int) { return fileDescriptor0, []int{2, 0} }
+
+type SyncInstance_Status int32
+
+const (
+ SyncInstance_UNKNOWN SyncInstance_Status = 0
+ SyncInstance_UP SyncInstance_Status = 1
+ SyncInstance_DOWN SyncInstance_Status = 2
+ SyncInstance_STARTING SyncInstance_Status = 3
+ SyncInstance_OUTOFSERVICE SyncInstance_Status = 4
+)
+
+var SyncInstance_Status_name = map[int32]string{
+ 0: "UNKNOWN",
+ 1: "UP",
+ 2: "DOWN",
+ 3: "STARTING",
+ 4: "OUTOFSERVICE",
+}
+var SyncInstance_Status_value = map[string]int32{
+ "UNKNOWN": 0,
+ "UP": 1,
+ "DOWN": 2,
+ "STARTING": 3,
+ "OUTOFSERVICE": 4,
+}
+
+func (x SyncInstance_Status) String() string {
+ return proto1.EnumName(SyncInstance_Status_name, int32(x))
+}
+func (SyncInstance_Status) EnumDescriptor() ([]byte, []int) { return fileDescriptor0, []int{3, 0} }
+
+type HealthCheck_Modes int32
+
+const (
+ HealthCheck_UNKNOWN HealthCheck_Modes = 0
+ HealthCheck_PUSH HealthCheck_Modes = 1
+ HealthCheck_PULL HealthCheck_Modes = 2
+)
+
+var HealthCheck_Modes_name = map[int32]string{
+ 0: "UNKNOWN",
+ 1: "PUSH",
+ 2: "PULL",
+}
+var HealthCheck_Modes_value = map[string]int32{
+ "UNKNOWN": 0,
+ "PUSH": 1,
+ "PULL": 2,
+}
+
+func (x HealthCheck_Modes) String() string {
+ return proto1.EnumName(HealthCheck_Modes_name, int32(x))
+}
+func (HealthCheck_Modes) EnumDescriptor() ([]byte, []int) { return fileDescriptor0, []int{4, 0} }
+
type PullRequest struct {
ServiceName string `protobuf:"bytes,1,opt,name=serviceName" json:"serviceName,omitempty"`
Options string `protobuf:"bytes,2,opt,name=options" json:"options,omitempty"`
@@ -70,16 +147,81 @@ func (m *PullRequest) GetTime() string {
return ""
}
+type SyncData struct {
+ Services []*SyncService `protobuf:"bytes,1,rep,name=services" json:"services,omitempty"`
+ Instances []*SyncInstance `protobuf:"bytes,2,rep,name=Instances" json:"Instances,omitempty"`
+}
+
+func (m *SyncData) Reset() { *m = SyncData{} }
+func (m *SyncData) String() string { return proto1.CompactTextString(m) }
+func (*SyncData) ProtoMessage() {}
+func (*SyncData) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{1} }
+
+func (m *SyncData) GetServices() []*SyncService {
+ if m != nil {
+ return m.Services
+ }
+ return nil
+}
+
+func (m *SyncData) GetInstances() []*SyncInstance {
+ if m != nil {
+ return m.Instances
+ }
+ return nil
+}
+
type SyncService struct {
- DomainProject string `protobuf:"bytes,1,opt,name=domainProject" json:"domainProject,omitempty"`
- Service *proto2.MicroService `protobuf:"bytes,2,opt,name=service" json:"service,omitempty"`
- Instances []*proto2.MicroServiceInstance `protobuf:"bytes,3,rep,name=instances" json:"instances,omitempty"`
+ ServiceId string `protobuf:"bytes,1,opt,name=serviceId" json:"serviceId,omitempty"`
+ App string `protobuf:"bytes,2,opt,name=app" json:"app,omitempty"`
+ Name string `protobuf:"bytes,3,opt,name=name" json:"name,omitempty"`
+ Version string `protobuf:"bytes,4,opt,name=version" json:"version,omitempty"`
+ Status SyncService_Status `protobuf:"varint,5,opt,name=status,enum=proto.SyncService_Status" json:"status,omitempty"`
+ DomainProject string `protobuf:"bytes,6,opt,name=domainProject" json:"domainProject,omitempty"`
+ Environment string `protobuf:"bytes,7,opt,name=environment" json:"environment,omitempty"`
+ PluginName string `protobuf:"bytes,8,opt,name=pluginName" json:"pluginName,omitempty"`
+ Expansion []byte `protobuf:"bytes,9,opt,name=expansion,proto3" json:"expansion,omitempty"`
}
func (m *SyncService) Reset() { *m = SyncService{} }
func (m *SyncService) String() string { return proto1.CompactTextString(m) }
func (*SyncService) ProtoMessage() {}
-func (*SyncService) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{1} }
+func (*SyncService) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{2} }
+
+func (m *SyncService) GetServiceId() string {
+ if m != nil {
+ return m.ServiceId
+ }
+ return ""
+}
+
+func (m *SyncService) GetApp() string {
+ if m != nil {
+ return m.App
+ }
+ return ""
+}
+
+func (m *SyncService) GetName() string {
+ if m != nil {
+ return m.Name
+ }
+ return ""
+}
+
+func (m *SyncService) GetVersion() string {
+ if m != nil {
+ return m.Version
+ }
+ return ""
+}
+
+func (m *SyncService) GetStatus() SyncService_Status {
+ if m != nil {
+ return m.Status
+ }
+ return SyncService_UNKNOWN
+}
func (m *SyncService) GetDomainProject() string {
if m != nil {
@@ -88,38 +230,158 @@ func (m *SyncService) GetDomainProject() string {
return ""
}
-func (m *SyncService) GetService() *proto2.MicroService {
+func (m *SyncService) GetEnvironment() string {
+ if m != nil {
+ return m.Environment
+ }
+ return ""
+}
+
+func (m *SyncService) GetPluginName() string {
+ if m != nil {
+ return m.PluginName
+ }
+ return ""
+}
+
+func (m *SyncService) GetExpansion() []byte {
if m != nil {
- return m.Service
+ return m.Expansion
}
return nil
}
-func (m *SyncService) GetInstances() []*proto2.MicroServiceInstance {
+type SyncInstance struct {
+ InstanceId string `protobuf:"bytes,1,opt,name=instanceId" json:"instanceId,omitempty"`
+ ServiceId string `protobuf:"bytes,2,opt,name=serviceId" json:"serviceId,omitempty"`
+ Endpoints []string `protobuf:"bytes,3,rep,name=endpoints" json:"endpoints,omitempty"`
+ HostName string `protobuf:"bytes,4,opt,name=hostName" json:"hostName,omitempty"`
+ Status SyncInstance_Status `protobuf:"varint,5,opt,name=status,enum=proto.SyncInstance_Status" json:"status,omitempty"`
+ HealthCheck *HealthCheck `protobuf:"bytes,6,opt,name=healthCheck" json:"healthCheck,omitempty"`
+ Version string `protobuf:"bytes,7,opt,name=version" json:"version,omitempty"`
+ PluginName string `protobuf:"bytes,8,opt,name=pluginName" json:"pluginName,omitempty"`
+ Expansion []byte `protobuf:"bytes,9,opt,name=expansion,proto3" json:"expansion,omitempty"`
+}
+
+func (m *SyncInstance) Reset() { *m = SyncInstance{} }
+func (m *SyncInstance) String() string { return proto1.CompactTextString(m) }
+func (*SyncInstance) ProtoMessage() {}
+func (*SyncInstance) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{3} }
+
+func (m *SyncInstance) GetInstanceId() string {
if m != nil {
- return m.Instances
+ return m.InstanceId
+ }
+ return ""
+}
+
+func (m *SyncInstance) GetServiceId() string {
+ if m != nil {
+ return m.ServiceId
+ }
+ return ""
+}
+
+func (m *SyncInstance) GetEndpoints() []string {
+ if m != nil {
+ return m.Endpoints
}
return nil
}
-type SyncData struct {
- Services []*SyncService `protobuf:"bytes,1,rep,name=services" json:"services,omitempty"`
+func (m *SyncInstance) GetHostName() string {
+ if m != nil {
+ return m.HostName
+ }
+ return ""
}
-func (m *SyncData) Reset() { *m = SyncData{} }
-func (m *SyncData) String() string { return proto1.CompactTextString(m) }
-func (*SyncData) ProtoMessage() {}
-func (*SyncData) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{2} }
+func (m *SyncInstance) GetStatus() SyncInstance_Status {
+ if m != nil {
+ return m.Status
+ }
+ return SyncInstance_UNKNOWN
+}
-func (m *SyncData) GetServices() []*SyncService {
+func (m *SyncInstance) GetHealthCheck() *HealthCheck {
if m != nil {
- return m.Services
+ return m.HealthCheck
+ }
+ return nil
+}
+
+func (m *SyncInstance) GetVersion() string {
+ if m != nil {
+ return m.Version
+ }
+ return ""
+}
+
+func (m *SyncInstance) GetPluginName() string {
+ if m != nil {
+ return m.PluginName
+ }
+ return ""
+}
+
+func (m *SyncInstance) GetExpansion() []byte {
+ if m != nil {
+ return m.Expansion
}
return nil
}
+type HealthCheck struct {
+ Mode HealthCheck_Modes `protobuf:"varint,1,opt,name=mode,enum=proto.HealthCheck_Modes" json:"mode,omitempty"`
+ Port int32 `protobuf:"varint,2,opt,name=port" json:"port,omitempty"`
+ Interval int32 `protobuf:"varint,3,opt,name=interval" json:"interval,omitempty"`
+ Times int32 `protobuf:"varint,4,opt,name=times" json:"times,omitempty"`
+ Url string `protobuf:"bytes,5,opt,name=url" json:"url,omitempty"`
+}
+
+func (m *HealthCheck) Reset() { *m = HealthCheck{} }
+func (m *HealthCheck) String() string { return proto1.CompactTextString(m) }
+func (*HealthCheck) ProtoMessage() {}
+func (*HealthCheck) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{4} }
+
+func (m *HealthCheck) GetMode() HealthCheck_Modes {
+ if m != nil {
+ return m.Mode
+ }
+ return HealthCheck_UNKNOWN
+}
+
+func (m *HealthCheck) GetPort() int32 {
+ if m != nil {
+ return m.Port
+ }
+ return 0
+}
+
+func (m *HealthCheck) GetInterval() int32 {
+ if m != nil {
+ return m.Interval
+ }
+ return 0
+}
+
+func (m *HealthCheck) GetTimes() int32 {
+ if m != nil {
+ return m.Times
+ }
+ return 0
+}
+
+func (m *HealthCheck) GetUrl() string {
+ if m != nil {
+ return m.Url
+ }
+ return ""
+}
+
type MappingEntry struct {
- ClusterName string `protobuf:"bytes,1,opt,name=clusterName" json:"clusterName,omitempty"`
+ ClusterName string `protobuf:"bytes,1,opt,name=clusterName" json:"clusterName,omitempty"`
+ // Tenant tenant = 2;
DomainProject string `protobuf:"bytes,2,opt,name=domainProject" json:"domainProject,omitempty"`
OrgServiceID string `protobuf:"bytes,3,opt,name=orgServiceID" json:"orgServiceID,omitempty"`
OrgInstanceID string `protobuf:"bytes,4,opt,name=orgInstanceID" json:"orgInstanceID,omitempty"`
@@ -130,7 +392,7 @@ type MappingEntry struct {
func (m *MappingEntry) Reset() { *m = MappingEntry{} }
func (m *MappingEntry) String() string { return proto1.CompactTextString(m) }
func (*MappingEntry) ProtoMessage() {}
-func (*MappingEntry) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{3} }
+func (*MappingEntry) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{5} }
func (m *MappingEntry) GetClusterName() string {
if m != nil {
@@ -176,9 +438,14 @@ func (m *MappingEntry) GetCurInstanceID() string {
func init() {
proto1.RegisterType((*PullRequest)(nil), "proto.PullRequest")
- proto1.RegisterType((*SyncService)(nil), "proto.SyncService")
proto1.RegisterType((*SyncData)(nil), "proto.SyncData")
+ proto1.RegisterType((*SyncService)(nil), "proto.SyncService")
+ proto1.RegisterType((*SyncInstance)(nil), "proto.SyncInstance")
+ proto1.RegisterType((*HealthCheck)(nil), "proto.HealthCheck")
proto1.RegisterType((*MappingEntry)(nil), "proto.MappingEntry")
+ proto1.RegisterEnum("proto.SyncService_Status", SyncService_Status_name, SyncService_Status_value)
+ proto1.RegisterEnum("proto.SyncInstance_Status", SyncInstance_Status_name, SyncInstance_Status_value)
+ proto1.RegisterEnum("proto.HealthCheck_Modes", HealthCheck_Modes_name, HealthCheck_Modes_value)
}
// Reference imports to suppress errors if they are not otherwise used.
@@ -256,26 +523,47 @@ var _Sync_serviceDesc = grpc.ServiceDesc{
func init() { proto1.RegisterFile("syncer.proto", fileDescriptor0) }
var fileDescriptor0 = []byte{
- // 336 bytes of a gzipped FileDescriptorProto
- 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x6c, 0x92, 0xbf, 0x4e, 0xc3, 0x30,
- 0x10, 0x87, 0x49, 0x9b, 0xfe, 0xbb, 0x14, 0x90, 0x8e, 0xc5, 0x2a, 0x4b, 0x15, 0x31, 0x74, 0x69,
- 0x87, 0x22, 0x06, 0x98, 0xcb, 0xd0, 0xa1, 0xa8, 0x4a, 0x67, 0x86, 0x60, 0xac, 0xca, 0xa8, 0xb5,
- 0x83, 0xed, 0x20, 0xe5, 0x61, 0x78, 0x41, 0x9e, 0x02, 0xd9, 0x71, 0xa8, 0x23, 0x3a, 0x25, 0xf9,
- 0x72, 0xfe, 0xee, 0xee, 0x27, 0xc3, 0x58, 0x57, 0x82, 0x32, 0xb5, 0x28, 0x94, 0x34, 0x12, 0x7b,
- 0xee, 0x31, 0xb9, 0xd2, 0x4c, 0x7d, 0x71, 0xca, 0x74, 0x8d, 0xd3, 0x57, 0x48, 0xb6, 0xe5, 0xe1,
- 0x90, 0xb1, 0xcf, 0x92, 0x69, 0x83, 0x53, 0x48, 0x7c, 0xc1, 0x4b, 0x7e, 0x64, 0x24, 0x9a, 0x46,
- 0xb3, 0x51, 0x16, 0x22, 0x24, 0x30, 0x90, 0x85, 0xe1, 0x52, 0x68, 0xd2, 0x71, 0x7f, 0x9b, 0x4f,
- 0x44, 0x88, 0x0d, 0x3f, 0x32, 0xd2, 0x75, 0xd8, 0xbd, 0xa7, 0xdf, 0x11, 0x24, 0xbb, 0x4a, 0xd0,
- 0x5d, 0x6d, 0xc0, 0x3b, 0xb8, 0x7c, 0x97, 0xc7, 0x9c, 0x8b, 0xad, 0x92, 0x1f, 0x8c, 0x1a, 0xdf,
- 0xa1, 0x0d, 0x71, 0x0e, 0x03, 0xdf, 0xd2, 0xf5, 0x48, 0x96, 0x37, 0xf5, 0xb4, 0x8b, 0x0d, 0xa7,
- 0x4a, 0x7a, 0x57, 0xd6, 0xd4, 0xe0, 0x23, 0x8c, 0xb8, 0xd0, 0x26, 0x17, 0x94, 0x69, 0xd2, 0x9d,
- 0x76, 0x67, 0xc9, 0xf2, 0xf6, 0xcc, 0x81, 0xb5, 0xaf, 0xc9, 0x4e, 0xd5, 0xe9, 0x13, 0x0c, 0xed,
- 0x78, 0xab, 0xdc, 0xe4, 0xb8, 0x80, 0x61, 0x13, 0x0e, 0x89, 0x9c, 0x05, 0xbd, 0x25, 0xd8, 0x20,
- 0xfb, 0xab, 0x49, 0x7f, 0x22, 0x18, 0x6f, 0xf2, 0xa2, 0xe0, 0x62, 0xff, 0x2c, 0x8c, 0xaa, 0x6c,
- 0x78, 0xf4, 0x50, 0x6a, 0xc3, 0x54, 0x18, 0x5e, 0x80, 0xfe, 0xaf, 0xdf, 0x39, 0xb7, 0x7e, 0x0a,
- 0x63, 0xa9, 0xf6, 0xcd, 0xd4, 0x2b, 0x1f, 0x68, 0x8b, 0x59, 0x93, 0x54, 0xfb, 0x66, 0xa5, 0xf5,
- 0x8a, 0xc4, 0xb5, 0xa9, 0x05, 0xad, 0x89, 0x96, 0xea, 0x64, 0xea, 0xd5, 0xa6, 0x90, 0x59, 0x13,
- 0x2d, 0x55, 0x60, 0xea, 0xd7, 0xa6, 0x16, 0x5c, 0x3e, 0x40, 0x6c, 0x53, 0xc0, 0x39, 0xc4, 0xf6,
- 0xbe, 0x60, 0x13, 0x4d, 0x70, 0x79, 0x26, 0xd7, 0x41, 0x5c, 0x36, 0xd1, 0xf4, 0xe2, 0xad, 0xef,
- 0xc8, 0xfd, 0x6f, 0x00, 0x00, 0x00, 0xff, 0xff, 0xfa, 0x2b, 0x2b, 0x6b, 0x8c, 0x02, 0x00, 0x00,
+ // 662 bytes of a gzipped FileDescriptorProto
+ 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xa4, 0x54, 0xd1, 0x6e, 0xd3, 0x30,
+ 0x14, 0x5d, 0xd2, 0xb4, 0x4d, 0x6f, 0xc2, 0x88, 0x0c, 0x0f, 0x61, 0x42, 0xa8, 0x8a, 0x90, 0xe8,
+ 0x03, 0x54, 0x5a, 0x81, 0x0f, 0x40, 0xeb, 0xd8, 0x2a, 0xb6, 0xae, 0x72, 0x57, 0x78, 0xe2, 0x21,
+ 0xa4, 0x56, 0x1b, 0x48, 0xed, 0x60, 0x3b, 0x15, 0xfb, 0x0c, 0x3e, 0x88, 0x4f, 0xe0, 0x47, 0xf8,
+ 0x0a, 0x64, 0x27, 0x6d, 0x9c, 0x6d, 0x0f, 0x48, 0x3c, 0xc5, 0x3e, 0xbe, 0xbe, 0xb9, 0xe7, 0xdc,
+ 0x73, 0x0d, 0xbe, 0xb8, 0xa1, 0x09, 0xe1, 0xc3, 0x9c, 0x33, 0xc9, 0x50, 0x5b, 0x7f, 0xa2, 0xcf,
+ 0xe0, 0xcd, 0x8a, 0x2c, 0xc3, 0xe4, 0x7b, 0x41, 0x84, 0x44, 0x7d, 0xf0, 0x04, 0xe1, 0xdb, 0x34,
+ 0x21, 0xd3, 0x78, 0x43, 0x42, 0xab, 0x6f, 0x0d, 0x7a, 0xd8, 0x84, 0x50, 0x08, 0x5d, 0x96, 0xcb,
+ 0x94, 0x51, 0x11, 0xda, 0xfa, 0x74, 0xb7, 0x45, 0x08, 0x1c, 0x99, 0x6e, 0x48, 0xd8, 0xd2, 0xb0,
+ 0x5e, 0x47, 0x1b, 0x70, 0xe7, 0x37, 0x34, 0x19, 0xc7, 0x32, 0x46, 0x43, 0x70, 0xab, 0x44, 0x22,
+ 0xb4, 0xfa, 0xad, 0x81, 0x37, 0x42, 0x65, 0x2d, 0x43, 0x15, 0x32, 0x2f, 0x8f, 0xf0, 0x3e, 0x06,
+ 0x1d, 0x43, 0x6f, 0x42, 0x85, 0x8c, 0xa9, 0xba, 0x60, 0xeb, 0x0b, 0x8f, 0x8c, 0x0b, 0xbb, 0x33,
+ 0x5c, 0x47, 0x45, 0xbf, 0x6d, 0xf0, 0x8c, 0x64, 0xe8, 0x29, 0xf4, 0xaa, 0x74, 0x93, 0x65, 0x45,
+ 0xa6, 0x06, 0x50, 0x00, 0xad, 0x38, 0xcf, 0x2b, 0x1a, 0x6a, 0xa9, 0x28, 0xd0, 0xb8, 0xa6, 0x40,
+ 0x2b, 0xc2, 0x5b, 0xc2, 0x45, 0xca, 0x68, 0xe8, 0x94, 0x84, 0xab, 0x2d, 0x3a, 0x86, 0x8e, 0x90,
+ 0xb1, 0x2c, 0x44, 0xd8, 0xee, 0x5b, 0x83, 0xc3, 0xd1, 0x93, 0xbb, 0x74, 0x86, 0x73, 0x1d, 0x80,
+ 0xab, 0x40, 0xf4, 0x1c, 0x1e, 0x2c, 0xd9, 0x26, 0x4e, 0xe9, 0x8c, 0xb3, 0xaf, 0x24, 0x91, 0x61,
+ 0x47, 0xa7, 0x6c, 0x82, 0xaa, 0x0b, 0x84, 0x6e, 0x53, 0xce, 0xe8, 0x86, 0x50, 0x19, 0x76, 0xcb,
+ 0x2e, 0x18, 0x10, 0x7a, 0x06, 0x90, 0x67, 0xc5, 0x2a, 0xa5, 0xba, 0x4d, 0xae, 0x0e, 0x30, 0x10,
+ 0x45, 0x9c, 0xfc, 0xc8, 0x63, 0xaa, 0xcb, 0xee, 0xf5, 0xad, 0x81, 0x8f, 0x6b, 0x20, 0x7a, 0x01,
+ 0x9d, 0xb2, 0x2e, 0xe4, 0x41, 0x77, 0x31, 0xfd, 0x30, 0xbd, 0xfa, 0x34, 0x0d, 0x0e, 0x50, 0x07,
+ 0xec, 0xc5, 0x2c, 0xb0, 0x90, 0x0b, 0xce, 0x58, 0x21, 0x76, 0xf4, 0xb3, 0x05, 0xbe, 0xa9, 0xb5,
+ 0xfa, 0x6f, 0x5a, 0xad, 0xf7, 0x8a, 0x1a, 0x48, 0x53, 0x70, 0xfb, 0xb6, 0xe0, 0xaa, 0x2a, 0xba,
+ 0xcc, 0x59, 0x4a, 0xa5, 0x08, 0x5b, 0xfd, 0x96, 0x3a, 0xdd, 0x03, 0xe8, 0x08, 0xdc, 0x35, 0x13,
+ 0x52, 0x33, 0x2a, 0x95, 0xde, 0xef, 0xd1, 0xe8, 0x96, 0xd4, 0x47, 0xf7, 0x18, 0xe1, 0xb6, 0xd6,
+ 0x6f, 0xc0, 0x5b, 0x93, 0x38, 0x93, 0xeb, 0x93, 0x35, 0x49, 0xbe, 0x69, 0xa5, 0x6b, 0xcb, 0x9d,
+ 0xd7, 0x27, 0xd8, 0x0c, 0x33, 0xdb, 0xdd, 0x6d, 0xb6, 0xfb, 0xff, 0x34, 0x3f, 0xfb, 0x47, 0xcd,
+ 0x91, 0x0f, 0xee, 0xfc, 0xfa, 0x1d, 0xbe, 0x9e, 0x4c, 0xcf, 0x82, 0x16, 0x0a, 0xc0, 0xbf, 0x5a,
+ 0x5c, 0x5f, 0xbd, 0x9f, 0x9f, 0xe2, 0x8f, 0x93, 0x93, 0xd3, 0xc0, 0x89, 0x7e, 0x59, 0xe0, 0x19,
+ 0xd5, 0xa3, 0x97, 0xe0, 0x6c, 0xd8, 0xb2, 0x9c, 0xd5, 0xc3, 0x51, 0x78, 0x97, 0xdf, 0xf0, 0x92,
+ 0x2d, 0x89, 0xc0, 0x3a, 0x4a, 0x39, 0x3c, 0x67, 0x5c, 0xea, 0xde, 0xb4, 0xb1, 0x5e, 0x2b, 0xe1,
+ 0x53, 0x2a, 0x09, 0xdf, 0xc6, 0x99, 0x76, 0x7e, 0x1b, 0xef, 0xf7, 0xe8, 0x31, 0xb4, 0xd5, 0x20,
+ 0x0b, 0xdd, 0x91, 0x36, 0x2e, 0x37, 0x6a, 0x72, 0x0a, 0x9e, 0xe9, 0x5e, 0xf4, 0xb0, 0x5a, 0x46,
+ 0x03, 0x68, 0xeb, 0xdf, 0x34, 0xd9, 0xb9, 0xe0, 0xcc, 0x16, 0xf3, 0xf3, 0x92, 0xdf, 0x6c, 0x71,
+ 0x71, 0x11, 0xd8, 0xd1, 0x1f, 0x0b, 0xfc, 0xcb, 0x38, 0xcf, 0x53, 0xba, 0x3a, 0xa5, 0x92, 0xdf,
+ 0x28, 0xb7, 0x27, 0x59, 0x21, 0x24, 0xe1, 0xe6, 0x9b, 0x63, 0x40, 0x77, 0xa7, 0xc6, 0xbe, 0x6f,
+ 0x6a, 0x22, 0xf0, 0x19, 0x5f, 0x55, 0x83, 0x37, 0x19, 0x57, 0x43, 0xdc, 0xc0, 0x54, 0x26, 0xc6,
+ 0x57, 0x3b, 0xc7, 0x4c, 0xc6, 0x95, 0xd1, 0x9a, 0xa0, 0xca, 0x94, 0x14, 0xbc, 0xce, 0x54, 0xf2,
+ 0x6c, 0x60, 0x2a, 0x53, 0x52, 0x70, 0x23, 0x53, 0x35, 0xc9, 0x0d, 0x70, 0xf4, 0x16, 0x1c, 0x65,
+ 0x51, 0xf4, 0x0a, 0x1c, 0xf5, 0xcc, 0xa2, 0x9d, 0xfd, 0x8c, 0x37, 0xf7, 0xe8, 0xa1, 0xe1, 0x65,
+ 0xf5, 0x50, 0x46, 0x07, 0x5f, 0x3a, 0x1a, 0x79, 0xfd, 0x37, 0x00, 0x00, 0xff, 0xff, 0xde, 0x85,
+ 0x0f, 0x71, 0xb3, 0x05, 0x00, 0x00,
}
diff --git a/syncer/proto/syncer.proto b/syncer/proto/syncer.proto
index 103e82c..3b571f7 100644
--- a/syncer/proto/syncer.proto
+++ b/syncer/proto/syncer.proto
@@ -1,6 +1,4 @@
syntax = "proto3";
-import "services.proto";
-//import "../../server/core/proto/services.proto";
package proto;
message PullRequest {
@@ -13,18 +11,68 @@ service Sync {
rpc Pull(PullRequest) returns (SyncData) {}
}
+message SyncData {
+ repeated SyncService services = 1;
+ repeated SyncInstance Instances = 2;
+}
+
message SyncService {
- string domainProject = 1;
- MicroService service = 2;
- repeated MicroServiceInstance instances = 3;
+ string serviceId = 1;
+ string app = 2;
+ string name = 3;
+ string version = 4;
+ Status status = 5;
+ string domainProject = 6;
+ string environment = 7;
+ string pluginName = 8;
+ bytes expansion = 9;
+ enum Status {
+ UNKNOWN = 0;
+ UP = 1;
+ DOWN = 2;
+ }
}
-message SyncData {
- repeated SyncService services = 1;
+message SyncInstance {
+ string instanceId = 1;
+ string serviceId = 2;
+ repeated string endpoints = 3;
+ string hostName = 4;
+ Status status = 5;
+ HealthCheck healthCheck = 6;
+ string version = 7;
+ string pluginName = 8;
+ bytes expansion = 9;
+ enum Status {
+ UNKNOWN = 0;
+ UP = 1;
+ DOWN = 2;
+ STARTING = 3;
+ OUTOFSERVICE = 4;
+ }
}
+message HealthCheck {
+ Modes mode = 1;
+ int32 port = 2;
+ int32 interval = 3;
+ int32 times = 4;
+ string url = 5;
+ enum Modes {
+ UNKNOWN = 0;
+ PUSH = 1;
+ PULL = 2;
+ }
+}
+
+//message Tenant {
+// string domain = 1;
+// string project = 2;
+//}
+
message MappingEntry {
string clusterName = 1;
+ // Tenant tenant = 2;
string domainProject = 2;
string orgServiceID = 3;
string orgInstanceID = 4;
diff --git a/syncer/servicecenter/exclude.go b/syncer/servicecenter/exclude.go
index d3f0f21..23cf3a8 100644
--- a/syncer/servicecenter/exclude.go
+++ b/syncer/servicecenter/exclude.go
@@ -17,31 +17,44 @@
package servicecenter
import (
- scpb "github.com/apache/servicecomb-service-center/server/core/proto"
+ "errors"
+
+ "github.com/apache/servicecomb-service-center/pkg/log"
pb "github.com/apache/servicecomb-service-center/syncer/proto"
)
// exclude instances from other servicecenter, exclude the expired instances in the maps
func (s *servicecenter) exclude(data *pb.SyncData, mapping pb.SyncMapping) (*pb.SyncData, pb.SyncMapping) {
services := make([]*pb.SyncService, 0, 10)
+ instances := make([]*pb.SyncInstance, 0, 10)
maps := make(pb.SyncMapping, 0, len(mapping))
- for _, svc := range data.Services {
-
- nis := make([]*scpb.MicroServiceInstance, 0, len(svc.Instances))
- for _, inst := range svc.Instances {
- if index := mapping.CurrentIndex(inst.InstanceId); index != -1 {
- // exclude the expired instances in the maps
- maps = append(maps, mapping[index])
- continue
- }
- // exclude instances from other servicecenter
- nis = append(nis, inst)
+ for _, inst := range data.Instances {
+ if index := mapping.CurrentIndex(inst.InstanceId); index != -1 {
+ // exclude the expired instances in the maps
+ maps = append(maps, mapping[index])
+ continue
}
-
- svc.Instances = nis
+ svc := searchService(inst, data.Services)
+ if svc == nil {
+ err := errors.New("service does not exist")
+ log.Errorf(err, "servicecenter.exclude, serviceID = %s, instanceId = %s", inst.ServiceId, inst.InstanceId)
+ continue
+ }
+ // exclude instances from other servicecenter
+ instances = append(instances, inst)
services = append(services, svc)
}
data.Services = services
-
+ data.Instances = instances
return data, maps
}
+
+// searchService search service by instance
+func searchService(instance *pb.SyncInstance, services []*pb.SyncService) *pb.SyncService {
+ for _, svc := range services {
+ if instance.ServiceId == svc.ServiceId {
+ return svc
+ }
+ }
+ return nil
+}
diff --git a/syncer/servicecenter/servicecenter.go b/syncer/servicecenter/servicecenter.go
index a2da4ad..63f2d29 100644
--- a/syncer/servicecenter/servicecenter.go
+++ b/syncer/servicecenter/servicecenter.go
@@ -18,6 +18,7 @@ package servicecenter
import (
"context"
+ "errors"
"github.com/apache/servicecomb-service-center/pkg/log"
"github.com/apache/servicecomb-service-center/syncer/plugins"
@@ -80,32 +81,38 @@ func (s *servicecenter) FlushData() {
// Registry registry data to the servicecenter, update mapping data
func (s *servicecenter) Registry(clusterName string, data *pb.SyncData) {
mapping := s.storage.GetMapByCluster(clusterName)
- for _, svc := range data.Services {
- log.Debugf("trying to do registration of service, serviceID = %s", svc.Service.ServiceId)
+ for _, inst := range data.Instances {
+ svc := searchService(inst, data.Services)
+ if svc == nil {
+ err := errors.New("service does not exist")
+ log.Errorf(err, "servicecenter.Registry, serviceID = %s, instanceId = %s", inst.ServiceId, inst.InstanceId)
+ continue
+ }
+
// If the svc is in the mapping, just do nothing, if not, created it in servicecenter and get the new serviceID
svcID := s.createService(svc)
- for _, inst := range svc.Instances {
- // If inst is in the mapping, just heart beat it in servicecenter
- log.Debugf("trying to do registration of instance, instanceID = %s", inst.InstanceId)
- if s.heartbeatInstances(mapping, inst) {
- continue
- }
-
- // If inst is not in the mapping, that is because this the first time syncer get the instance data
- // in this case, we should registry it to the servicecenter and get the new instanceID
- item := &pb.MappingEntry{
- DomainProject: svc.DomainProject,
- OrgServiceID: inst.ServiceId,
- OrgInstanceID: inst.InstanceId,
- CurServiceID: svcID,
- ClusterName: clusterName,
- }
- item.CurInstanceID = s.registryInstances(svc.DomainProject, svcID, inst)
-
- // Use new serviceID and instanceID to update mapping data in this servicecenter
- if item.CurInstanceID != "" {
- mapping = append(mapping, item)
- }
+ log.Debugf("create service success orgServiceID= %s, curServiceID = %s", inst.ServiceId, svcID)
+
+ // If inst is in the mapping, just heart beat it in servicecenter
+ log.Debugf("trying to do registration of instance, instanceID = %s", inst.InstanceId)
+ if s.heartbeatInstances(mapping, inst) {
+ continue
+ }
+
+ // If inst is not in the mapping, that is because this the first time syncer get the instance data
+ // in this case, we should registry it to the servicecenter and get the new instanceID
+ item := &pb.MappingEntry{
+ ClusterName: clusterName,
+ DomainProject: svc.DomainProject,
+ OrgServiceID: svc.ServiceId,
+ OrgInstanceID: inst.InstanceId,
+ CurServiceID: svcID,
+ CurInstanceID: s.registryInstances(svc.DomainProject, svcID, inst),
+ }
+
+ // Use new serviceID and instanceID to update mapping data in this servicecenter
+ if item.CurInstanceID != "" {
+ mapping = append(mapping, item)
}
}
// UnRegistry instances that is not in the data which means the instance in the mapping is no longer actived
diff --git a/syncer/servicecenter/servicecenter_test.go b/syncer/servicecenter/servicecenter_test.go
index d12d874..120b109 100644
--- a/syncer/servicecenter/servicecenter_test.go
+++ b/syncer/servicecenter/servicecenter_test.go
@@ -21,7 +21,6 @@ import (
"errors"
"testing"
- "github.com/apache/servicecomb-service-center/server/core/proto"
"github.com/apache/servicecomb-service-center/syncer/config"
"github.com/apache/servicecomb-service-center/syncer/pkg/mock/mockplugin"
"github.com/apache/servicecomb-service-center/syncer/pkg/mock/mocksotrage"
@@ -92,7 +91,7 @@ func TestOnEvent(t *testing.T) {
dc.Registry(clusterName, newData)
- mockplugin.SetRegisterInstance(func(ctx context.Context, domainProject, serviceId string, instance *proto.MicroServiceInstance) (s string, e error) {
+ mockplugin.SetRegisterInstance(func(ctx context.Context, domainProject, serviceId string, instance *pb.SyncInstance) (s string, e error) {
return "", errors.New("test error")
})
diff --git a/syncer/servicecenter/sync.go b/syncer/servicecenter/sync.go
index f86ddf8..b54c904 100644
--- a/syncer/servicecenter/sync.go
+++ b/syncer/servicecenter/sync.go
@@ -20,12 +20,11 @@ import (
"context"
"github.com/apache/servicecomb-service-center/pkg/log"
- scpb "github.com/apache/servicecomb-service-center/server/core/proto"
pb "github.com/apache/servicecomb-service-center/syncer/proto"
)
// Send an instance heartbeat if the instance has already been registered
-func (s *servicecenter) heartbeatInstances(mapping pb.SyncMapping, instance *scpb.MicroServiceInstance) bool {
+func (s *servicecenter) heartbeatInstances(mapping pb.SyncMapping, instance *pb.SyncInstance) bool {
index := mapping.OriginIndex(instance.InstanceId)
if index == -1 {
return false
@@ -37,37 +36,31 @@ func (s *servicecenter) heartbeatInstances(mapping pb.SyncMapping, instance *scp
log.Errorf(err, "Servicecenter heartbeat instance failed")
}
log.Debugf("Instance %s is already exist, sent heartbeat to service-center", item.OrgInstanceID)
- instance.InstanceId = item.CurInstanceID
return true
}
func (s *servicecenter) createService(service *pb.SyncService) string {
ctx := context.Background()
- serviceID, _ := s.servicecenter.ServiceExistence(ctx, service.DomainProject, service.Service)
- if serviceID != "" {
- return serviceID
- }
- service.Service.ServiceId = ""
- serviceID, err := s.servicecenter.CreateService(ctx, service.DomainProject, service.Service)
- if err != nil {
- log.Errorf(err, "Servicecenter create service failed")
- return ""
+ serviceID, _ := s.servicecenter.ServiceExistence(ctx, service.DomainProject, service)
+ if serviceID == "" {
+ var err error
+ serviceID, err = s.servicecenter.CreateService(ctx, service.DomainProject, service)
+ if err != nil {
+ log.Errorf(err, "Servicecenter create service failed")
+ return ""
+ }
+ log.Debugf("Create service successful, serviceID = %s", serviceID)
}
- log.Debugf("Create service successful, serviceID = %s", serviceID)
- service.Service.ServiceId = serviceID
return serviceID
}
-func (s *servicecenter) registryInstances(domainProject, serviceId string, instance *scpb.MicroServiceInstance) string {
- instance.ServiceId = serviceId
- instance.InstanceId = ""
+func (s *servicecenter) registryInstances(domainProject, serviceId string, instance *pb.SyncInstance) string {
instanceID, err := s.servicecenter.RegisterInstance(context.Background(), domainProject, serviceId, instance)
if err != nil {
log.Errorf(err, "Servicecenter registry instance failed")
return ""
}
log.Debugf("Registered instance successful, instanceID = %s", instanceID)
- instance.InstanceId = instanceID
return instanceID
}
@@ -77,12 +70,10 @@ func (s *servicecenter) unRegistryInstances(data *pb.SyncData, mapping pb.SyncMa
nm := make(pb.SyncMapping, 0, len(mapping))
next:
for _, val := range mapping {
- for _, svc := range data.Services {
- for _, inst := range svc.Instances {
- if val.CurInstanceID == inst.InstanceId {
- nm = append(nm, val)
- continue next
- }
+ for _, inst := range data.Instances {
+ if val.OrgInstanceID == inst.InstanceId {
+ nm = append(nm, val)
+ continue next
}
}