You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicecomb.apache.org by ti...@apache.org on 2021/02/26 10:33:10 UTC
[servicecomb-service-center] branch master updated: [SCB-2094]
Bugfix-cache heartbeat mode in mongo (#874)
This is an automated email from the ASF dual-hosted git repository.
tianxiaoliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/servicecomb-service-center.git
The following commit(s) were added to refs/heads/master by this push:
new a0e76c3 [SCB-2094] Bugfix-cache heartbeat mode in mongo (#874)
a0e76c3 is described below
commit a0e76c30ca34a9340e177b8e4b5e212110f62d18
Author: robotLJW <79...@qq.com>
AuthorDate: Fri Feb 26 18:33:04 2021 +0800
[SCB-2094] Bugfix-cache heartbeat mode in mongo (#874)
1. When mongodb is used, SC does not implement its own heartbeat reporting interface
---
datasource/mongo/engine.go | 50 +++++++++++++++++++---
datasource/mongo/heartbeat/cache/heartbeatcache.go | 15 +++++++
datasource/mongo/mongo.go | 4 +-
3 files changed, 61 insertions(+), 8 deletions(-)
diff --git a/datasource/mongo/engine.go b/datasource/mongo/engine.go
index 07bca41..1024fd2 100644
--- a/datasource/mongo/engine.go
+++ b/datasource/mongo/engine.go
@@ -19,23 +19,23 @@ package mongo
import (
"context"
- "time"
-
- "github.com/apache/servicecomb-service-center/pkg/cluster"
-
"fmt"
"strconv"
"strings"
+ "time"
+
+ pb "github.com/go-chassis/cari/discovery"
+ "go.mongodb.org/mongo-driver/bson"
"github.com/apache/servicecomb-service-center/datasource"
"github.com/apache/servicecomb-service-center/datasource/etcd/path"
"github.com/apache/servicecomb-service-center/datasource/mongo/client"
+ "github.com/apache/servicecomb-service-center/pkg/cluster"
+ "github.com/apache/servicecomb-service-center/pkg/gopool"
"github.com/apache/servicecomb-service-center/pkg/log"
"github.com/apache/servicecomb-service-center/pkg/util"
"github.com/apache/servicecomb-service-center/server/core"
"github.com/apache/servicecomb-service-center/server/metrics"
- pb "github.com/go-chassis/cari/discovery"
- "go.mongodb.org/mongo-driver/bson"
)
func (ds *DataSource) SelfRegister(ctx context.Context) error {
@@ -192,8 +192,44 @@ func (ds *DataSource) registryInstance(pCtx context.Context) error {
return nil
}
+func (ds *DataSource) selfHeartBeat(pCtx context.Context) error {
+ ctx := core.AddDefaultContextValue(pCtx)
+ respI, err := core.InstanceAPI.Heartbeat(ctx, core.HeartbeatRequest())
+ if err != nil {
+ log.Error("send heartbeat failed", err)
+ return err
+ }
+ if respI.Response.GetCode() == pb.ResponseSuccess {
+ log.Debugf("update service center instance[%s/%s] heartbeat",
+ core.Instance.ServiceId, core.Instance.InstanceId)
+ return nil
+ }
+ err = fmt.Errorf(respI.Response.GetMessage())
+ log.Errorf(err, "update service center instance[%s/%s] heartbeat failed",
+ core.Instance.ServiceId, core.Instance.InstanceId)
+ return err
+}
+
func (ds *DataSource) autoSelfHeartBeat() {
- //todo
+ gopool.Go(func(ctx context.Context) {
+ for {
+ select {
+ case <-ctx.Done():
+ return
+ case <-time.After(time.Duration(core.Instance.HealthCheck.Interval) * time.Second):
+ err := ds.selfHeartBeat(ctx)
+ if err == nil {
+ continue
+ }
+ //服务不存在,创建服务
+ err = ds.SelfRegister(ctx)
+ if err != nil {
+ log.Errorf(err, "retry to register[%s/%s/%s/%s] failed",
+ core.Service.Environment, core.Service.AppId, core.Service.ServiceName, core.Service.Version)
+ }
+ }
+ }
+ })
}
func GetAllServicesAcrossDomainProject(ctx context.Context) (map[string][]*pb.MicroService, error) {
diff --git a/datasource/mongo/heartbeat/cache/heartbeatcache.go b/datasource/mongo/heartbeat/cache/heartbeatcache.go
index a9f4d80..e3d229f 100644
--- a/datasource/mongo/heartbeat/cache/heartbeatcache.go
+++ b/datasource/mongo/heartbeat/cache/heartbeatcache.go
@@ -29,6 +29,14 @@ import (
"github.com/apache/servicecomb-service-center/pkg/util"
)
+const (
+ maxInterval = 60
+ minInterval = 0
+ defaultInterval = 30
+ maxTimes = 3
+ minTimes = 0
+)
+
var ErrHeartbeatConversionFailed = errors.New("instanceHeartbeatInfo type conversion failed. ")
func init() {
@@ -91,6 +99,13 @@ func notInCacheStrategy(ctx context.Context, request *pb.HeartbeatRequest) (*pb.
return resp, err
}
interval, times := instance.Instance.HealthCheck.Interval, instance.Instance.HealthCheck.Times
+ // Set the range of interval and time
+ if interval > maxInterval || interval < minTimes {
+ interval = defaultInterval
+ }
+ if times > maxTimes || times < minTimes {
+ times = maxTimes
+ }
err = addHeartbeatTask(request.ServiceId, request.InstanceId, interval*(times+1))
if err != nil {
log.Error(fmt.Sprintf("heartbeat failed, instance[%s]. operator %s", request.InstanceId, remoteIP), err)
diff --git a/datasource/mongo/mongo.go b/datasource/mongo/mongo.go
index 2ba5388..af0f1d7 100644
--- a/datasource/mongo/mongo.go
+++ b/datasource/mongo/mongo.go
@@ -31,6 +31,8 @@ import (
"go.mongodb.org/mongo-driver/mongo/options"
)
+const defaultExpireTime = 300
+
func init() {
datasource.Install("mongo", NewDataSource)
}
@@ -139,7 +141,7 @@ func EnsureInstance() {
wrapCreateCollectionError(err)
instanceIndex := BuildIndexDoc(ColumnRefreshTime)
- instanceIndex.Options = options.Index().SetExpireAfterSeconds(60)
+ instanceIndex.Options = options.Index().SetExpireAfterSeconds(defaultExpireTime)
instanceServiceIndex := BuildIndexDoc(StringBuilder([]string{ColumnInstance, ColumnServiceID}))