You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicecomb.apache.org by ti...@apache.org on 2021/12/29 01:49:12 UTC
[servicecomb-service-center] branch master updated: Refactor: add self registry service (#1190)
This is an automated email from the ASF dual-hosted git repository.
tianxiaoliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/servicecomb-service-center.git
The following commit(s) were added to refs/heads/master by this push:
new 35b4680 Refactor: add self registry service (#1190)
35b4680 is described below
commit 35b46800a60df832170c10964fca48d897fc052c
Author: little-cui <su...@qq.com>
AuthorDate: Wed Dec 29 09:49:04 2021 +0800
Refactor: add self registry service (#1190)
---
datasource/engine.go | 2 -
datasource/engine_test.go | 98 ---------------
datasource/etcd/engine.go | 139 ---------------------
datasource/mongo/engine.go | 138 --------------------
server/api_server.go | 9 +-
server/datacache/cache.go | 8 --
.../service/registry/registry.go | 70 +++--------
7 files changed, 18 insertions(+), 446 deletions(-)
diff --git a/datasource/engine.go b/datasource/engine.go
index d2c3265..1c2d6d5 100644
--- a/datasource/engine.go
+++ b/datasource/engine.go
@@ -25,8 +25,6 @@ import (
// SCManager contains the APIs of registration of SC itself
type SCManager interface {
- SelfRegister(ctx context.Context) error
- SelfUnregister(ctx context.Context) error
UpgradeVersion(ctx context.Context) error
GetClusters(ctx context.Context) (etcdadpt.Clusters, error)
}
diff --git a/datasource/engine_test.go b/datasource/engine_test.go
deleted file mode 100644
index cc3a048..0000000
--- a/datasource/engine_test.go
+++ /dev/null
@@ -1,98 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package datasource_test
-
-import (
- "context"
- "fmt"
-
- "github.com/apache/servicecomb-service-center/datasource/etcd/path"
-
- "github.com/apache/servicecomb-service-center/pkg/util"
- apt "github.com/apache/servicecomb-service-center/server/core"
- pb "github.com/go-chassis/cari/discovery"
- . "github.com/onsi/ginkgo"
- . "github.com/onsi/gomega"
-)
-
-// map[domainProject][serviceName]*serviceCleanInfo
-var svcCleanInfos = make(map[string]map[string]*serviceCleanInfo)
-
-type serviceCleanInfo struct {
- ServiceName string
- ServiceId string
- WithInstance bool
- ShouldClear bool
-}
-
-func getContextWith(domain string, project string) context.Context {
- return util.WithNoCache(util.SetDomainProject(context.Background(), domain, project))
-}
-
-func createService(domain string, project string, name string, withInstance bool, shouldClear bool) {
- By(fmt.Sprintf("create service: %s, with instance: %t, should clear: %t", name, withInstance, shouldClear))
- svc := &pb.CreateServiceRequest{
- Service: &pb.MicroService{
- AppId: "clear",
- ServiceName: name,
- Version: "1.0",
- },
- }
- if withInstance {
- svc.Instances = []*pb.MicroServiceInstance{
- {
- Endpoints: []string{"http://127.0.0.1:80"},
- HostName: "1",
- },
- }
- }
- ctx := getContextWith(domain, project)
- svcResp, err := apt.ServiceAPI.Create(ctx, svc)
- Expect(err).To(BeNil())
- Expect(svcResp).NotTo(BeNil())
- Expect(svcResp.Response.GetCode()).To(Equal(pb.ResponseSuccess))
- info := &serviceCleanInfo{
- ServiceName: name,
- ServiceId: svcResp.ServiceId,
- WithInstance: withInstance,
- ShouldClear: shouldClear,
- }
- domainProject := domain + path.SPLIT + project
- m, ok := svcCleanInfos[domainProject]
- if !ok {
- m = make(map[string]*serviceCleanInfo)
- svcCleanInfos[domainProject] = m
- }
- m[name] = info
-}
-
-func checkServiceCleared(domain string, project string) {
- domainProject := domain + path.SPLIT + project
- m := svcCleanInfos[domainProject]
- for _, v := range m {
- By(fmt.Sprintf("check cleared, service: %s, should be cleared: %t", v.ServiceName, v.ShouldClear))
- getSvcReq := &pb.GetServiceRequest{
- ServiceId: v.ServiceId,
- }
- ctx := getContextWith(domain, project)
- getSvcResp, err := apt.ServiceAPI.GetOne(ctx, getSvcReq)
- Expect(err).To(BeNil())
- Expect(getSvcResp).NotTo(BeNil())
- Expect(getSvcResp.Response.GetCode() == pb.ResponseSuccess).To(Equal(!v.ShouldClear))
- }
-}
diff --git a/datasource/etcd/engine.go b/datasource/etcd/engine.go
index 3666aa3..c9345bb 100644
--- a/datasource/etcd/engine.go
+++ b/datasource/etcd/engine.go
@@ -20,158 +20,19 @@ package etcd
import (
"context"
"encoding/json"
- "errors"
- "fmt"
"os"
- "time"
- "github.com/apache/servicecomb-service-center/datasource"
"github.com/apache/servicecomb-service-center/datasource/etcd/mux"
"github.com/apache/servicecomb-service-center/datasource/etcd/path"
"github.com/apache/servicecomb-service-center/pkg/log"
"github.com/apache/servicecomb-service-center/server/config"
- "github.com/apache/servicecomb-service-center/server/core"
- discosvc "github.com/apache/servicecomb-service-center/server/service/disco"
"github.com/apache/servicecomb-service-center/version"
- pb "github.com/go-chassis/cari/discovery"
- "github.com/go-chassis/foundation/gopool"
"github.com/little-cui/etcdadpt"
)
type SCManager struct {
}
-func (sm *SCManager) SelfRegister(ctx context.Context) error {
- err := sm.selfRegister(ctx)
- if err != nil {
- return err
- }
- // start send heart beat job
- sm.autoSelfHeartBeat()
- return nil
-}
-
-func (sm *SCManager) selfRegister(pCtx context.Context) error {
- ctx := core.AddDefaultContextValue(pCtx)
- err := sm.registerService(ctx)
- if err != nil {
- return err
- }
- // 实例信息
- return sm.registerInstance(ctx)
-}
-
-func (sm *SCManager) registerService(ctx context.Context) error {
- respE, err := core.ServiceAPI.Exist(ctx, core.GetExistenceRequest())
- if err != nil {
- log.Error("query service center existence failed", err)
- return err
- }
- if respE.Response.GetCode() == pb.ResponseSuccess {
- log.Warn(fmt.Sprintf("service center service[%s] already registered", respE.ServiceId))
- respG, err := core.ServiceAPI.GetOne(ctx, core.GetServiceRequest(respE.ServiceId))
- if respG.Response.GetCode() != pb.ResponseSuccess {
- log.Error(fmt.Sprintf("query service center service[%s] info failed", respE.ServiceId), err)
- return datasource.ErrServiceNotExists
- }
- core.Service = respG.Service
- return nil
- }
-
- respS, err := core.ServiceAPI.Create(ctx, core.CreateServiceRequest())
- if err != nil {
- log.Error("register service center failed", err)
- return err
- }
- if respS.Response.GetCode() != pb.ResponseSuccess {
- log.Error("register service center failed, msg: "+respS.Response.GetMessage(), nil)
- return errors.New(respS.Response.GetMessage())
- }
- core.Service.ServiceId = respS.ServiceId
- log.Info(fmt.Sprintf("register service center service[%s]", respS.ServiceId))
- return nil
-}
-
-func (sm *SCManager) registerInstance(ctx context.Context) error {
- core.Instance.InstanceId = ""
- core.Instance.ServiceId = core.Service.ServiceId
- respI, err := discosvc.RegisterInstance(ctx, core.RegisterInstanceRequest())
- if err != nil {
- log.Error("register failed", err)
- return err
- }
- if respI.Response.GetCode() != pb.ResponseSuccess {
- log.Error(fmt.Sprintf("register service center[%s] instance failed, %s",
- core.Instance.ServiceId, respI.Response.GetMessage()), nil)
- return errors.New(respI.Response.GetMessage())
- }
- core.Instance.InstanceId = respI.InstanceId
- log.Info(fmt.Sprintf("register service center instance[%s/%s], endpoints is %s",
- core.Service.ServiceId, respI.InstanceId, core.Instance.Endpoints))
- return nil
-}
-
-func (sm *SCManager) selfHeartBeat(pCtx context.Context) error {
- ctx := core.AddDefaultContextValue(pCtx)
- respI, err := discosvc.Heartbeat(ctx, core.HeartbeatRequest())
- if err != nil {
- log.Error("send heartbeat failed", err)
- return err
- }
- if respI.Response.GetCode() == pb.ResponseSuccess {
- log.Debug(fmt.Sprintf("update service center instance[%s/%s] heartbeat",
- core.Instance.ServiceId, core.Instance.InstanceId))
- return nil
- }
- err = fmt.Errorf(respI.Response.GetMessage())
- log.Error(fmt.Sprintf("update service center instance[%s/%s] heartbeat failed",
- core.Instance.ServiceId, core.Instance.InstanceId), err)
- return err
-}
-
-func (sm *SCManager) autoSelfHeartBeat() {
- gopool.Go(func(ctx context.Context) {
- for {
- select {
- case <-ctx.Done():
- return
- case <-time.After(time.Duration(core.Instance.HealthCheck.Interval) * time.Second):
- err := sm.selfHeartBeat(ctx)
- if err == nil {
- continue
- }
- //服务不存在,创建服务
- err = sm.selfRegister(ctx)
- if err != nil {
- log.Error(fmt.Sprintf("retry to register[%s/%s/%s/%s] failed",
- core.Service.Environment, core.Service.AppId, core.Service.ServiceName, core.Service.Version), err)
- }
- }
- }
- })
-}
-
-func (sm *SCManager) SelfUnregister(pCtx context.Context) error {
- if len(core.Instance.InstanceId) == 0 {
- return nil
- }
- ctx := core.AddDefaultContextValue(pCtx)
- respI, err := discosvc.UnregisterInstance(ctx, core.UnregisterInstanceRequest())
- if err != nil {
- log.Error("unregister failed", err)
- return err
- }
- if respI.Response.GetCode() != pb.ResponseSuccess {
- err = fmt.Errorf("unregister service center instance[%s/%s] failed, %s",
- core.Instance.ServiceId, core.Instance.InstanceId, respI.Response.GetMessage())
- log.Error(err.Error(), nil)
- return err
- }
- log.Warn(fmt.Sprintf("unregister service center instance[%s/%s]",
- core.Service.ServiceId, core.Instance.InstanceId))
- return nil
-}
-
func (sm *SCManager) GetClusters(ctx context.Context) (etcdadpt.Clusters, error) {
return etcdadpt.ListCluster(ctx)
}
diff --git a/datasource/mongo/engine.go b/datasource/mongo/engine.go
index a5eb730..7be13da 100644
--- a/datasource/mongo/engine.go
+++ b/datasource/mongo/engine.go
@@ -19,60 +19,13 @@ package mongo
import (
"context"
- "fmt"
- "time"
- "github.com/apache/servicecomb-service-center/datasource"
- mutil "github.com/apache/servicecomb-service-center/datasource/mongo/util"
- "github.com/apache/servicecomb-service-center/pkg/log"
- "github.com/apache/servicecomb-service-center/server/core"
- "github.com/apache/servicecomb-service-center/server/metrics"
- discosvc "github.com/apache/servicecomb-service-center/server/service/disco"
- pb "github.com/go-chassis/cari/discovery"
- "github.com/go-chassis/foundation/gopool"
"github.com/little-cui/etcdadpt"
)
type SCManager struct {
}
-func (ds *SCManager) SelfRegister(ctx context.Context) error {
- err := ds.registryService(ctx)
- if err != nil {
- return err
- }
-
- // 实例信息
- err = ds.registryInstance(ctx)
-
- // wait heartbeat
- ds.autoSelfHeartBeat()
-
- metrics.ReportScInstance()
- return err
-}
-func (ds *SCManager) SelfUnregister(ctx context.Context) error {
- if len(core.Instance.InstanceId) == 0 {
- return nil
- }
-
- ctx = core.AddDefaultContextValue(ctx)
- respI, err := datasource.GetMetadataManager().UnregisterInstance(ctx, core.UnregisterInstanceRequest())
- if err != nil {
- log.Error("unregister failed", err)
- return err
- }
- if respI.Response.GetCode() != pb.ResponseSuccess {
- err = fmt.Errorf("unregister service center instance[%s/%s] failed, %s",
- core.Instance.ServiceId, core.Instance.InstanceId, respI.Response.GetMessage())
- log.Error(err.Error(), nil)
- return err
- }
- log.Warn(fmt.Sprintf("unregister service center instance[%s/%s]",
- core.Service.ServiceId, core.Instance.InstanceId))
- return nil
-}
-
func (ds *SCManager) UpgradeVersion(ctx context.Context) error {
return nil
}
@@ -80,94 +33,3 @@ func (ds *SCManager) UpgradeVersion(ctx context.Context) error {
func (ds *SCManager) GetClusters(ctx context.Context) (etcdadpt.Clusters, error) {
return nil, nil
}
-
-func (ds *SCManager) registryService(pCtx context.Context) error {
- ctx := core.AddDefaultContextValue(pCtx)
- respE, err := datasource.GetMetadataManager().ExistService(ctx, core.GetExistenceRequest())
- if err != nil {
- log.Error("query service center existence failed", err)
- return err
- }
- if respE.Response.GetCode() == pb.ResponseSuccess {
- log.Warn(fmt.Sprintf("service center service[%s] already registered", respE.ServiceId))
- service, err := datasource.GetMetadataManager().GetService(ctx, core.GetServiceRequest(respE.ServiceId))
- if err != nil {
- log.Error(fmt.Sprintf("query service center service[%s] info failed", respE.ServiceId), err)
- return mutil.ErrLostServiceFile
- }
- core.Service = service
- return nil
- }
-
- respS, err := datasource.GetMetadataManager().RegisterService(ctx, core.CreateServiceRequest())
- if err != nil {
- log.Error("register service center failed", err)
- return err
- }
- core.Service.ServiceId = respS.ServiceId
- log.Info(fmt.Sprintf("register service center service[%s]", respS.ServiceId))
- return nil
-}
-
-func (ds *SCManager) registryInstance(pCtx context.Context) error {
- core.Instance.InstanceId = ""
- core.Instance.ServiceId = core.Service.ServiceId
-
- ctx := core.AddDefaultContextValue(pCtx)
-
- respI, err := datasource.GetMetadataManager().RegisterInstance(ctx, core.RegisterInstanceRequest())
- if err != nil {
- log.Error("register failed", err)
- return err
- }
- if respI.Response.GetCode() != pb.ResponseSuccess {
- err = fmt.Errorf("register service center[%s] instance failed, %s",
- core.Instance.ServiceId, respI.Response.GetMessage())
- log.Error(err.Error(), nil)
- return err
- }
- core.Instance.InstanceId = respI.InstanceId
- log.Info(fmt.Sprintf("register service center instance[%s/%s], endpoints is %s",
- core.Service.ServiceId, respI.InstanceId, core.Instance.Endpoints))
- return nil
-}
-
-func (ds *SCManager) selfHeartBeat(pCtx context.Context) error {
- ctx := core.AddDefaultContextValue(pCtx)
- respI, err := discosvc.Heartbeat(ctx, core.HeartbeatRequest())
- if err != nil {
- log.Error("send heartbeat failed", err)
- return err
- }
- if respI.Response.GetCode() == pb.ResponseSuccess {
- log.Debug(fmt.Sprintf("update service center instance[%s/%s] heartbeat",
- core.Instance.ServiceId, core.Instance.InstanceId))
- return nil
- }
- err = fmt.Errorf(respI.Response.GetMessage())
- log.Error(fmt.Sprintf("update service center instance[%s/%s] heartbeat failed",
- core.Instance.ServiceId, core.Instance.InstanceId), err)
- return err
-}
-
-func (ds *SCManager) autoSelfHeartBeat() {
- gopool.Go(func(ctx context.Context) {
- for {
- select {
- case <-ctx.Done():
- return
- case <-time.After(time.Duration(core.Instance.HealthCheck.Interval) * time.Second):
- err := ds.selfHeartBeat(ctx)
- if err == nil {
- continue
- }
- //服务不存在,创建服务
- err = ds.SelfRegister(ctx)
- if err != nil {
- log.Error(fmt.Sprintf("retry to register[%s/%s/%s/%s] failed",
- core.Service.Environment, core.Service.AppId, core.Service.ServiceName, core.Service.Version), err)
- }
- }
- }
- })
-}
diff --git a/server/api_server.go b/server/api_server.go
index 235532e..61c3ed6 100644
--- a/server/api_server.go
+++ b/server/api_server.go
@@ -23,9 +23,6 @@ import (
"net"
"time"
- "github.com/apache/servicecomb-service-center/server/service/disco"
-
- "github.com/apache/servicecomb-service-center/datasource"
"github.com/apache/servicecomb-service-center/pkg/grace"
"github.com/apache/servicecomb-service-center/pkg/log"
"github.com/apache/servicecomb-service-center/pkg/rest"
@@ -33,6 +30,8 @@ import (
"github.com/apache/servicecomb-service-center/server/core"
"github.com/apache/servicecomb-service-center/server/metrics"
rs "github.com/apache/servicecomb-service-center/server/rest"
+ "github.com/apache/servicecomb-service-center/server/service/disco"
+ "github.com/apache/servicecomb-service-center/server/service/registry"
"github.com/go-chassis/foundation/gopool"
)
@@ -165,7 +164,7 @@ func (s *APIServer) Stop() {
}
func (s *APIServer) selfRegister() {
- err := datasource.GetSCManager().SelfRegister(context.Background())
+ err := registry.SelfRegister(context.Background())
if err != nil {
s.err <- err
return
@@ -177,7 +176,7 @@ func (s *APIServer) selfRegister() {
func (s *APIServer) selfUnregister() {
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
defer cancel()
- if err := datasource.GetSCManager().SelfUnregister(ctx); err != nil {
+ if err := registry.SelfUnregister(ctx); err != nil {
log.Error("stop registry engine failed", err)
}
}
diff --git a/server/datacache/cache.go b/server/datacache/cache.go
deleted file mode 100644
index 72e9f92..0000000
--- a/server/datacache/cache.go
+++ /dev/null
@@ -1,8 +0,0 @@
-package datacache
-
-import "context"
-
-type DataCache interface {
- Get(ctx context.Context, name string) (interface{}, error)
- List(ctx context.Context) ([]interface{}, error)
-}
diff --git a/datasource/etcd/engine.go b/server/service/registry/registry.go
similarity index 73%
copy from datasource/etcd/engine.go
copy to server/service/registry/registry.go
index 3666aa3..e2e04c8 100644
--- a/datasource/etcd/engine.go
+++ b/server/service/registry/registry.go
@@ -15,53 +15,43 @@
* limitations under the License.
*/
-package etcd
+package registry
import (
"context"
- "encoding/json"
"errors"
"fmt"
- "os"
"time"
"github.com/apache/servicecomb-service-center/datasource"
- "github.com/apache/servicecomb-service-center/datasource/etcd/mux"
- "github.com/apache/servicecomb-service-center/datasource/etcd/path"
"github.com/apache/servicecomb-service-center/pkg/log"
- "github.com/apache/servicecomb-service-center/server/config"
"github.com/apache/servicecomb-service-center/server/core"
discosvc "github.com/apache/servicecomb-service-center/server/service/disco"
- "github.com/apache/servicecomb-service-center/version"
pb "github.com/go-chassis/cari/discovery"
"github.com/go-chassis/foundation/gopool"
- "github.com/little-cui/etcdadpt"
)
-type SCManager struct {
-}
-
-func (sm *SCManager) SelfRegister(ctx context.Context) error {
- err := sm.selfRegister(ctx)
+func SelfRegister(ctx context.Context) error {
+ err := selfRegister(ctx)
if err != nil {
return err
}
// start send heart beat job
- sm.autoSelfHeartBeat()
+ autoSelfHeartBeat()
return nil
}
-func (sm *SCManager) selfRegister(pCtx context.Context) error {
+func selfRegister(pCtx context.Context) error {
ctx := core.AddDefaultContextValue(pCtx)
- err := sm.registerService(ctx)
+ err := registerService(ctx)
if err != nil {
return err
}
// 实例信息
- return sm.registerInstance(ctx)
+ return registerInstance(ctx)
}
-func (sm *SCManager) registerService(ctx context.Context) error {
+func registerService(ctx context.Context) error {
respE, err := core.ServiceAPI.Exist(ctx, core.GetExistenceRequest())
if err != nil {
log.Error("query service center existence failed", err)
@@ -92,7 +82,7 @@ func (sm *SCManager) registerService(ctx context.Context) error {
return nil
}
-func (sm *SCManager) registerInstance(ctx context.Context) error {
+func registerInstance(ctx context.Context) error {
core.Instance.InstanceId = ""
core.Instance.ServiceId = core.Service.ServiceId
respI, err := discosvc.RegisterInstance(ctx, core.RegisterInstanceRequest())
@@ -111,7 +101,7 @@ func (sm *SCManager) registerInstance(ctx context.Context) error {
return nil
}
-func (sm *SCManager) selfHeartBeat(pCtx context.Context) error {
+func selfHeartBeat(pCtx context.Context) error {
ctx := core.AddDefaultContextValue(pCtx)
respI, err := discosvc.Heartbeat(ctx, core.HeartbeatRequest())
if err != nil {
@@ -129,19 +119,19 @@ func (sm *SCManager) selfHeartBeat(pCtx context.Context) error {
return err
}
-func (sm *SCManager) autoSelfHeartBeat() {
+func autoSelfHeartBeat() {
gopool.Go(func(ctx context.Context) {
for {
select {
case <-ctx.Done():
return
case <-time.After(time.Duration(core.Instance.HealthCheck.Interval) * time.Second):
- err := sm.selfHeartBeat(ctx)
+ err := selfHeartBeat(ctx)
if err == nil {
continue
}
//服务不存在,创建服务
- err = sm.selfRegister(ctx)
+ err = selfRegister(ctx)
if err != nil {
log.Error(fmt.Sprintf("retry to register[%s/%s/%s/%s] failed",
core.Service.Environment, core.Service.AppId, core.Service.ServiceName, core.Service.Version), err)
@@ -151,7 +141,7 @@ func (sm *SCManager) autoSelfHeartBeat() {
})
}
-func (sm *SCManager) SelfUnregister(pCtx context.Context) error {
+func SelfUnregister(pCtx context.Context) error {
if len(core.Instance.InstanceId) == 0 {
return nil
}
@@ -171,35 +161,3 @@ func (sm *SCManager) SelfUnregister(pCtx context.Context) error {
core.Service.ServiceId, core.Instance.InstanceId))
return nil
}
-
-func (sm *SCManager) GetClusters(ctx context.Context) (etcdadpt.Clusters, error) {
- return etcdadpt.ListCluster(ctx)
-}
-func (sm *SCManager) UpgradeServerVersion(ctx context.Context) error {
- bytes, err := json.Marshal(config.Server)
- if err != nil {
- return err
- }
- return etcdadpt.PutBytes(ctx, path.GetServerInfoKey(), bytes)
-}
-func (sm *SCManager) UpgradeVersion(ctx context.Context) error {
- lock, err := mux.Lock(mux.GlobalLock)
-
- if err != nil {
- log.Error("wait for server ready failed", err)
- return err
- }
- if needUpgrade(ctx) {
- config.Server.Version = version.Ver().Version
-
- if err := sm.UpgradeServerVersion(ctx); err != nil {
- log.Error("upgrade server version failed", err)
- os.Exit(1)
- }
- }
- err = lock.Unlock()
- if err != nil {
- log.Error("", err)
- }
- return err
-}