You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicecomb.apache.org by ti...@apache.org on 2021/02/26 10:33:10 UTC

[servicecomb-service-center] branch master updated: [SCB-2094] Bugfix-cache heartbeat mode in mongo (#874)

This is an automated email from the ASF dual-hosted git repository.

tianxiaoliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/servicecomb-service-center.git


The following commit(s) were added to refs/heads/master by this push:
     new a0e76c3  [SCB-2094] Bugfix-cache heartbeat mode in mongo (#874)
a0e76c3 is described below

commit a0e76c30ca34a9340e177b8e4b5e212110f62d18
Author: robotLJW <79...@qq.com>
AuthorDate: Fri Feb 26 18:33:04 2021 +0800

    [SCB-2094] Bugfix-cache heartbeat mode in mongo (#874)
    
    1. When MongoDB is used, Service Center (SC) did not implement heartbeat reporting for its own instance; this commit adds it.
---
 datasource/mongo/engine.go                         | 50 +++++++++++++++++++---
 datasource/mongo/heartbeat/cache/heartbeatcache.go | 15 +++++++
 datasource/mongo/mongo.go                          |  4 +-
 3 files changed, 61 insertions(+), 8 deletions(-)

diff --git a/datasource/mongo/engine.go b/datasource/mongo/engine.go
index 07bca41..1024fd2 100644
--- a/datasource/mongo/engine.go
+++ b/datasource/mongo/engine.go
@@ -19,23 +19,23 @@ package mongo
 
 import (
 	"context"
-	"time"
-
-	"github.com/apache/servicecomb-service-center/pkg/cluster"
-
 	"fmt"
 	"strconv"
 	"strings"
+	"time"
+
+	pb "github.com/go-chassis/cari/discovery"
+	"go.mongodb.org/mongo-driver/bson"
 
 	"github.com/apache/servicecomb-service-center/datasource"
 	"github.com/apache/servicecomb-service-center/datasource/etcd/path"
 	"github.com/apache/servicecomb-service-center/datasource/mongo/client"
+	"github.com/apache/servicecomb-service-center/pkg/cluster"
+	"github.com/apache/servicecomb-service-center/pkg/gopool"
 	"github.com/apache/servicecomb-service-center/pkg/log"
 	"github.com/apache/servicecomb-service-center/pkg/util"
 	"github.com/apache/servicecomb-service-center/server/core"
 	"github.com/apache/servicecomb-service-center/server/metrics"
-	pb "github.com/go-chassis/cari/discovery"
-	"go.mongodb.org/mongo-driver/bson"
 )
 
 func (ds *DataSource) SelfRegister(ctx context.Context) error {
@@ -192,8 +192,44 @@ func (ds *DataSource) registryInstance(pCtx context.Context) error {
 	return nil
 }
 
+// selfHeartBeat sends one heartbeat for the service center's own instance through
+// core.InstanceAPI and returns nil only when the response code is pb.ResponseSuccess.
+func (ds *DataSource) selfHeartBeat(pCtx context.Context) error {
+	ctx := core.AddDefaultContextValue(pCtx)
+	respI, err := core.InstanceAPI.Heartbeat(ctx, core.HeartbeatRequest())
+	if err != nil {
+		log.Error("send heartbeat failed", err)
+		return err
+	}
+	if respI.Response.GetCode() == pb.ResponseSuccess {
+		log.Debugf("update service center instance[%s/%s] heartbeat", core.Instance.ServiceId, core.Instance.InstanceId)
+		return nil
+	}
+	err = fmt.Errorf("%s", respI.Response.GetMessage()) // the message is data, never a format string
+	log.Errorf(err, "update service center instance[%s/%s] heartbeat failed", core.Instance.ServiceId, core.Instance.InstanceId)
+	return err
+}
+
 func (ds *DataSource) autoSelfHeartBeat() {
-	//todo
+	gopool.Go(func(ctx context.Context) { // background loop: one self-heartbeat per HealthCheck.Interval seconds until ctx is done
+		for {
+			select {
+			case <-ctx.Done():
+				return
+			case <-time.After(time.Duration(core.Instance.HealthCheck.Interval) * time.Second):
+				err := ds.selfHeartBeat(ctx)
+				if err == nil {
+					continue
+				}
+				// Heartbeat failed (presumably the service no longer exists), so register again.
+				err = ds.SelfRegister(ctx)
+				if err != nil {
+					log.Errorf(err, "retry to register[%s/%s/%s/%s] failed",
+						core.Service.Environment, core.Service.AppId, core.Service.ServiceName, core.Service.Version)
+				}
+			}
+		}
+	})
 }
 
 func GetAllServicesAcrossDomainProject(ctx context.Context) (map[string][]*pb.MicroService, error) {
diff --git a/datasource/mongo/heartbeat/cache/heartbeatcache.go b/datasource/mongo/heartbeat/cache/heartbeatcache.go
index a9f4d80..e3d229f 100644
--- a/datasource/mongo/heartbeat/cache/heartbeatcache.go
+++ b/datasource/mongo/heartbeat/cache/heartbeatcache.go
@@ -29,6 +29,14 @@ import (
 	"github.com/apache/servicecomb-service-center/pkg/util"
 )
 
+const (
+	maxInterval     = 60 // upper bound accepted for HealthCheck.Interval (presumably seconds — confirm)
+	minInterval     = 0  // intended lower bound for HealthCheck.Interval; NOTE(review): the range check compares against minTimes instead
+	defaultInterval = 30 // interval substituted when the configured one is out of range
+	maxTimes        = 3  // upper bound for HealthCheck.Times, also the fallback when times is out of range
+	minTimes        = 0  // lower bound for HealthCheck.Times
+)
+
 var ErrHeartbeatConversionFailed = errors.New("instanceHeartbeatInfo type conversion failed. ")
 
 func init() {
@@ -91,6 +99,13 @@ func notInCacheStrategy(ctx context.Context, request *pb.HeartbeatRequest) (*pb.
 		return resp, err
 	}
 	interval, times := instance.Instance.HealthCheck.Interval, instance.Instance.HealthCheck.Times
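+	// Clamp interval and times to their allowed ranges. NOTE(review): interval is compared against minTimes below; minInterval looks intended (both are 0).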
+	if interval > maxInterval || interval < minTimes {
+		interval = defaultInterval
+	}
+	if times > maxTimes || times < minTimes {
+		times = maxTimes
+	}
 	err = addHeartbeatTask(request.ServiceId, request.InstanceId, interval*(times+1))
 	if err != nil {
 		log.Error(fmt.Sprintf("heartbeat failed, instance[%s]. operator %s", request.InstanceId, remoteIP), err)
diff --git a/datasource/mongo/mongo.go b/datasource/mongo/mongo.go
index 2ba5388..af0f1d7 100644
--- a/datasource/mongo/mongo.go
+++ b/datasource/mongo/mongo.go
@@ -31,6 +31,8 @@ import (
 	"go.mongodb.org/mongo-driver/mongo/options"
 )
 
+const defaultExpireTime = 300 // seconds passed to SetExpireAfterSeconds for the instance refresh-time index
+
 func init() {
 	datasource.Install("mongo", NewDataSource)
 }
@@ -139,7 +141,7 @@ func EnsureInstance() {
 	wrapCreateCollectionError(err)
 
 	instanceIndex := BuildIndexDoc(ColumnRefreshTime)
-	instanceIndex.Options = options.Index().SetExpireAfterSeconds(60)
+	instanceIndex.Options = options.Index().SetExpireAfterSeconds(defaultExpireTime)
 
 	instanceServiceIndex := BuildIndexDoc(StringBuilder([]string{ColumnInstance, ColumnServiceID}))