You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicecomb.apache.org by ti...@apache.org on 2021/04/15 07:33:46 UTC

[servicecomb-service-center] branch master updated: SCB-2094 BugFix: join the heartbeat cache (#943)

This is an automated email from the ASF dual-hosted git repository.

tianxiaoliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/servicecomb-service-center.git


The following commit(s) were added to refs/heads/master by this push:
     new 677b100  SCB-2094 BugFix: join the heartbeat cache (#943)
677b100 is described below

commit 677b1006e130ea9512c63cb928b788614824d145
Author: robotljw <79...@qq.com>
AuthorDate: Thu Apr 15 15:33:37 2021 +0800

    SCB-2094 BugFix: join the heartbeat cache (#943)
---
 datasource/mongo/heartbeat/cache/heartbeat.go      | 97 +++++++++++-----------
 datasource/mongo/heartbeat/cache/heartbeat_test.go | 22 ++---
 datasource/mongo/heartbeat/cache/heartbeatcache.go | 24 ++++--
 .../mongo/heartbeat/cache/heartbeatcache_test.go   |  9 +-
 .../heartbeat/{healthcheck.go => cache/types.go}   | 20 ++---
 .../mongo/heartbeat/checker/heartbeatchecker.go    |  5 ++
 datasource/mongo/heartbeat/healthcheck.go          |  3 +
 datasource/mongo/ms.go                             |  8 +-
 8 files changed, 106 insertions(+), 82 deletions(-)

diff --git a/datasource/mongo/heartbeat/cache/heartbeat.go b/datasource/mongo/heartbeat/cache/heartbeat.go
index 61e9b53..4094cfd 100644
--- a/datasource/mongo/heartbeat/cache/heartbeat.go
+++ b/datasource/mongo/heartbeat/cache/heartbeat.go
@@ -22,6 +22,7 @@ import (
 	"errors"
 	"fmt"
 	"runtime"
+	"sync"
 	"time"
 
 	"github.com/patrickmn/go-cache"
@@ -44,59 +45,61 @@ const (
 	ctxTimeout              = 5 * time.Second
 )
 
-// Store cache structure
-type instanceHeartbeatInfo struct {
-	serviceID   string
-	instanceID  string
-	ttl         int32
-	lastRefresh time.Time
-}
+var ErrHeartbeatTimeout = errors.New("heartbeat task waiting for processing timeout. ")
 
 var (
-	cacheChan              chan *instanceHeartbeatInfo
-	instanceHeartbeatStore = cache.New(0, instanceCheckerInternal)
-	workerNum              = runtime.NumCPU()
-	heartbeatTaskTimeout   = config.GetInt("registry.mongo.heartbeat.timeout", defaultTimeout)
-	ErrHeartbeatTimeout    = errors.New("heartbeat task waiting for processing timeout. ")
+	once sync.Once
+	cfg  cacheConfig
 )
 
-func init() {
-	capacity := config.GetInt("registry.mongo.heartbeat.cacheCapacity", defaultCacheCapacity)
-	cacheChan = make(chan *instanceHeartbeatInfo, capacity)
-	num := config.GetInt("registry.mongo.heartbeat.workerNum", defaultWorkNum)
-	if num != 0 {
-		workerNum = num
-	}
-	instanceHeartbeatStore.OnEvicted(func(k string, v interface{}) {
-		instanceInfo, ok := v.(*instanceHeartbeatInfo)
-		if ok && instanceInfo != nil {
-			ctx, cancel := context.WithTimeout(context.Background(), ctxTimeout)
-			defer cancel()
-			err := cleanInstance(ctx, instanceInfo.serviceID, instanceInfo.instanceID)
-			if err != nil {
-				log.Error("failed to cleanInstance in mongodb.", err)
-			}
-		}
-	})
+type cacheConfig struct {
+	cacheChan              chan *instanceHeartbeatInfo
+	instanceHeartbeatStore *cache.Cache
+	workerNum              int
+	heartbeatTaskTimeout   int
+}
 
-	for i := 1; i <= workerNum; i++ {
-		gopool.Go(func(ctx context.Context) {
-			for {
-				select {
-				case <-ctx.Done():
-					log.Warn("heartbeat work protocol exit.")
-					return
-				case heartbeatInfo, ok := <-cacheChan:
-					if ok {
-						instanceHeartbeatStore.Set(heartbeatInfo.instanceID, heartbeatInfo, time.Duration(heartbeatInfo.ttl)*time.Second)
-					}
+func configuration() *cacheConfig {
+	once.Do(func() {
+		cfg.workerNum = runtime.NumCPU()
+		num := config.GetInt("registry.mongo.heartbeat.workerNum", defaultWorkNum)
+		if num != 0 {
+			cfg.workerNum = num
+		}
+		cfg.heartbeatTaskTimeout = config.GetInt("registry.mongo.heartbeat.timeout", defaultTimeout)
+		cfg.cacheChan = make(chan *instanceHeartbeatInfo, config.GetInt("registry.mongo.heartbeat.cacheCapacity", defaultCacheCapacity))
+		cfg.instanceHeartbeatStore = cache.New(0, instanceCheckerInternal)
+		cfg.instanceHeartbeatStore.OnEvicted(func(k string, v interface{}) {
+			instanceInfo, ok := v.(*instanceHeartbeatInfo)
+			if ok && instanceInfo != nil {
+				ctx, cancel := context.WithTimeout(context.Background(), ctxTimeout)
+				defer cancel()
+				err := cleanInstance(ctx, instanceInfo.serviceID, instanceInfo.instanceID)
+				if err != nil {
+					log.Error("failed to cleanInstance in mongodb.", err)
 				}
 			}
 		})
-	}
+		for i := 1; i <= cfg.workerNum; i++ {
+			gopool.Go(func(ctx context.Context) {
+				for {
+					select {
+					case <-ctx.Done():
+						log.Warn("heartbeat work protocol exit.")
+						return
+					case heartbeatInfo, ok := <-cfg.cacheChan:
+						if ok {
+							cfg.instanceHeartbeatStore.Set(heartbeatInfo.instanceID, heartbeatInfo, time.Duration(heartbeatInfo.ttl)*time.Second)
+						}
+					}
+				}
+			})
+		}
+	})
+	return &cfg
 }
 
-func addHeartbeatTask(serviceID string, instanceID string, ttl int32) error {
+func (c *cacheConfig) AddHeartbeatTask(serviceID string, instanceID string, ttl int32) error {
 	// Unassigned setting default value is 30s
 	if ttl <= 0 {
 		ttl = defaultTTL
@@ -108,16 +111,16 @@ func addHeartbeatTask(serviceID string, instanceID string, ttl int32) error {
 		lastRefresh: time.Now(),
 	}
 	select {
-	case cacheChan <- newInstance:
+	case c.cacheChan <- newInstance:
 		return nil
-	case <-time.After(time.Duration(heartbeatTaskTimeout) * time.Second):
+	case <-time.After(time.Duration(c.heartbeatTaskTimeout) * time.Second):
 		log.Warn("the heartbeat's channel is full. ")
 		return ErrHeartbeatTimeout
 	}
 }
 
-func RemoveCacheInstance(instanceID string) {
-	instanceHeartbeatStore.Delete(instanceID)
+func (c *cacheConfig) RemoveCacheInstance(instanceID string) {
+	c.instanceHeartbeatStore.Delete(instanceID)
 }
 
 func cleanInstance(ctx context.Context, serviceID string, instanceID string) error {
diff --git a/datasource/mongo/heartbeat/cache/heartbeat_test.go b/datasource/mongo/heartbeat/cache/heartbeat_test.go
index d6845e6..26bbc4a 100644
--- a/datasource/mongo/heartbeat/cache/heartbeat_test.go
+++ b/datasource/mongo/heartbeat/cache/heartbeat_test.go
@@ -42,6 +42,8 @@ func init() {
 	client.NewMongoClient(config)
 }
 
+var c = configuration()
+
 func TestAddCacheInstance(t *testing.T) {
 	t.Run("add cache instance: set the ttl to 2 seconds", func(t *testing.T) {
 		instance1 := model.Instance{
@@ -55,11 +57,11 @@ func TestAddCacheInstance(t *testing.T) {
 				},
 			},
 		}
-		err := addHeartbeatTask(instance1.Instance.ServiceId, instance1.Instance.InstanceId, instance1.Instance.HealthCheck.Interval*(instance1.Instance.HealthCheck.Times+1))
+		err := c.AddHeartbeatTask(instance1.Instance.ServiceId, instance1.Instance.InstanceId, instance1.Instance.HealthCheck.Interval*(instance1.Instance.HealthCheck.Times+1))
 		assert.Equal(t, nil, err)
 		_, err = client.GetMongoClient().Insert(context.Background(), model.CollectionInstance, instance1)
 		assert.Equal(t, nil, err)
-		info, ok := instanceHeartbeatStore.Get(instance1.Instance.InstanceId)
+		info, ok := c.instanceHeartbeatStore.Get(instance1.Instance.InstanceId)
 		assert.Equal(t, true, ok)
 		if ok {
 			heartBeatInfo := info.(*instanceHeartbeatInfo)
@@ -67,7 +69,7 @@ func TestAddCacheInstance(t *testing.T) {
 			assert.Equal(t, instance1.Instance.HealthCheck.Interval*(instance1.Instance.HealthCheck.Times+1), heartBeatInfo.ttl)
 		}
 		time.Sleep(2 * time.Second)
-		_, ok = instanceHeartbeatStore.Get(instance1.Instance.InstanceId)
+		_, ok = c.instanceHeartbeatStore.Get(instance1.Instance.InstanceId)
 		assert.Equal(t, false, ok)
 		_, err = client.GetMongoClient().Delete(context.Background(), model.CollectionInstance, instance1)
 		assert.Equal(t, nil, err)
@@ -85,11 +87,11 @@ func TestAddCacheInstance(t *testing.T) {
 				},
 			},
 		}
-		err := addHeartbeatTask(instance1.Instance.ServiceId, instance1.Instance.InstanceId, instance1.Instance.HealthCheck.Interval*(instance1.Instance.HealthCheck.Times+1))
+		err := c.AddHeartbeatTask(instance1.Instance.ServiceId, instance1.Instance.InstanceId, instance1.Instance.HealthCheck.Interval*(instance1.Instance.HealthCheck.Times+1))
 		assert.Equal(t, nil, err)
 		_, err = client.GetMongoClient().Insert(context.Background(), model.CollectionInstance, instance1)
 		assert.Equal(t, nil, err)
-		info, ok := instanceHeartbeatStore.Get(instance1.Instance.InstanceId)
+		info, ok := c.instanceHeartbeatStore.Get(instance1.Instance.InstanceId)
 		assert.Equal(t, true, ok)
 		if ok {
 			heartBeatInfo := info.(*instanceHeartbeatInfo)
@@ -97,7 +99,7 @@ func TestAddCacheInstance(t *testing.T) {
 			assert.Equal(t, int32(defaultTTL), heartBeatInfo.ttl)
 		}
 		time.Sleep(defaultTTL * time.Second)
-		_, ok = instanceHeartbeatStore.Get(instance1.Instance.InstanceId)
+		_, ok = c.instanceHeartbeatStore.Get(instance1.Instance.InstanceId)
 		assert.Equal(t, false, ok)
 		_, err = client.GetMongoClient().Delete(context.Background(), model.CollectionInstance, instance1)
 		assert.Equal(t, nil, err)
@@ -117,11 +119,11 @@ func TestRemoveCacheInstance(t *testing.T) {
 				},
 			},
 		}
-		err := addHeartbeatTask(instance1.Instance.ServiceId, instance1.Instance.InstanceId, instance1.Instance.HealthCheck.Interval*(instance1.Instance.HealthCheck.Times+1))
+		err := c.AddHeartbeatTask(instance1.Instance.ServiceId, instance1.Instance.InstanceId, instance1.Instance.HealthCheck.Interval*(instance1.Instance.HealthCheck.Times+1))
 		assert.Equal(t, nil, err)
 		_, err = client.GetMongoClient().Insert(context.Background(), model.CollectionInstance, instance1)
 		assert.Equal(t, nil, err)
-		info, ok := instanceHeartbeatStore.Get(instance1.Instance.InstanceId)
+		info, ok := c.instanceHeartbeatStore.Get(instance1.Instance.InstanceId)
 		assert.Equal(t, true, ok)
 		if ok {
 			heartBeatInfo := info.(*instanceHeartbeatInfo)
@@ -129,8 +131,8 @@ func TestRemoveCacheInstance(t *testing.T) {
 			assert.Equal(t, instance1.Instance.HealthCheck.Interval*(instance1.Instance.HealthCheck.Times+1), heartBeatInfo.ttl)
 		}
 		time.Sleep(4 * time.Second)
-		RemoveCacheInstance(instance1.Instance.InstanceId)
-		_, ok = instanceHeartbeatStore.Get(instance1.Instance.InstanceId)
+		c.RemoveCacheInstance(instance1.Instance.InstanceId)
+		_, ok = c.instanceHeartbeatStore.Get(instance1.Instance.InstanceId)
 		assert.Equal(t, false, ok)
 		_, err = client.GetMongoClient().Delete(context.Background(), model.CollectionInstance, instance1)
 		assert.Equal(t, nil, err)
diff --git a/datasource/mongo/heartbeat/cache/heartbeatcache.go b/datasource/mongo/heartbeat/cache/heartbeatcache.go
index 31ae699..c6381e9 100644
--- a/datasource/mongo/heartbeat/cache/heartbeatcache.go
+++ b/datasource/mongo/heartbeat/cache/heartbeatcache.go
@@ -44,20 +44,26 @@ func init() {
 }
 
 type HeartBeatCache struct {
+	cfg *cacheConfig
 }
 
 func NewHeartBeatCache(opts heartbeat.Options) (heartbeat.HealthCheck, error) {
-	return &HeartBeatCache{}, nil
+	return &HeartBeatCache{cfg: configuration()}, nil
 }
 
 func (h *HeartBeatCache) Heartbeat(ctx context.Context, request *pb.HeartbeatRequest) (*pb.HeartbeatResponse, error) {
-	if ins, ok := instanceHeartbeatStore.Get(request.InstanceId); ok {
-		return inCacheStrategy(ctx, request, ins)
+	if ins, ok := h.cfg.instanceHeartbeatStore.Get(request.InstanceId); ok {
+		return h.inCacheStrategy(ctx, request, ins)
 	}
-	return notInCacheStrategy(ctx, request)
+	return h.notInCacheStrategy(ctx, request)
 }
 
-func inCacheStrategy(ctx context.Context, request *pb.HeartbeatRequest, insHeartbeatInfo interface{}) (*pb.HeartbeatResponse, error) {
+// CheckInstance queues a heartbeat task so that the newly registered instance is tracked in the cache.
+func (h *HeartBeatCache) CheckInstance(ctx context.Context, instance *pb.MicroServiceInstance) error {
+	return h.cfg.AddHeartbeatTask(instance.ServiceId, instance.InstanceId, instance.HealthCheck.Interval*(instance.HealthCheck.Times+1))
+}
+
+func (h *HeartBeatCache) inCacheStrategy(ctx context.Context, request *pb.HeartbeatRequest, insHeartbeatInfo interface{}) (*pb.HeartbeatResponse, error) {
 	remoteIP := util.GetIPFromContext(ctx)
 	heartbeatInfo, ok := insHeartbeatInfo.(*instanceHeartbeatInfo)
 	if !ok {
@@ -67,7 +73,7 @@ func inCacheStrategy(ctx context.Context, request *pb.HeartbeatRequest, insHeart
 		}
 		return resp, ErrHeartbeatConversionFailed
 	}
-	err := addHeartbeatTask(request.ServiceId, request.InstanceId, heartbeatInfo.ttl)
+	err := h.cfg.AddHeartbeatTask(request.ServiceId, request.InstanceId, heartbeatInfo.ttl)
 	if err != nil {
 		log.Error(fmt.Sprintf("heartbeat failed, instance[%s]. operator %s", request.InstanceId, remoteIP), err)
 		resp := &pb.HeartbeatResponse{
@@ -88,7 +94,7 @@ func inCacheStrategy(ctx context.Context, request *pb.HeartbeatRequest, insHeart
 	}, nil
 }
 
-func notInCacheStrategy(ctx context.Context, request *pb.HeartbeatRequest) (*pb.HeartbeatResponse, error) {
+func (h *HeartBeatCache) notInCacheStrategy(ctx context.Context, request *pb.HeartbeatRequest) (*pb.HeartbeatResponse, error) {
 	remoteIP := util.GetIPFromContext(ctx)
 	instance, err := findInstance(ctx, request.ServiceId, request.InstanceId)
 	if err != nil {
@@ -106,7 +112,7 @@ func notInCacheStrategy(ctx context.Context, request *pb.HeartbeatRequest) (*pb.
 	if times > maxTimes || times < minTimes {
 		times = maxTimes
 	}
-	err = addHeartbeatTask(request.ServiceId, request.InstanceId, interval*(times+1))
+	err = h.cfg.AddHeartbeatTask(request.ServiceId, request.InstanceId, interval*(times+1))
 	if err != nil {
 		log.Error(fmt.Sprintf("heartbeat failed, instance[%s]. operator %s", request.InstanceId, remoteIP), err)
 		resp := &pb.HeartbeatResponse{
@@ -116,7 +122,7 @@ func notInCacheStrategy(ctx context.Context, request *pb.HeartbeatRequest) (*pb.
 	}
 	err = updateInstance(ctx, request.ServiceId, request.InstanceId)
 	if err != nil {
-		RemoveCacheInstance(request.InstanceId)
+		h.cfg.RemoveCacheInstance(request.InstanceId)
 		log.Error(fmt.Sprintf("heartbeat failed, instance[%s]. operator %s", request.InstanceId, remoteIP), err)
 		resp := &pb.HeartbeatResponse{
 			Response: pb.CreateResponseWithSCErr(pb.NewError(pb.ErrInstanceNotExists, err.Error())),
diff --git a/datasource/mongo/heartbeat/cache/heartbeatcache_test.go b/datasource/mongo/heartbeat/cache/heartbeatcache_test.go
index da2308a..37bb0c2 100644
--- a/datasource/mongo/heartbeat/cache/heartbeatcache_test.go
+++ b/datasource/mongo/heartbeat/cache/heartbeatcache_test.go
@@ -31,9 +31,10 @@ import (
 	"github.com/apache/servicecomb-service-center/datasource/mongo/util"
 )
 
+var heartBeatCheck = &HeartBeatCache{cfg: configuration()}
+
 func TestHeartBeatCheck(t *testing.T) {
 	t.Run("heartbeat check: instance does not exist,it should be failed", func(t *testing.T) {
-		heartBeatCheck := &HeartBeatCache{}
 		resp, err := heartBeatCheck.Heartbeat(context.Background(), &pb.HeartbeatRequest{
 			ServiceId:  "serviceId1",
 			InstanceId: "not-exist-ins",
@@ -43,9 +44,8 @@ func TestHeartBeatCheck(t *testing.T) {
 	})
 
 	t.Run("heartbeat check: data exists in the cache,but not in db,it should be failed", func(t *testing.T) {
-		err := addHeartbeatTask("not-exist-svc", "not-exist-ins", 30)
+		err := heartBeatCheck.cfg.AddHeartbeatTask("not-exist-svc", "not-exist-ins", 30)
 		assert.Nil(t, err)
-		heartBeatCheck := &HeartBeatCache{}
 		resp, err := heartBeatCheck.Heartbeat(context.Background(), &pb.HeartbeatRequest{
 			ServiceId:  "serviceId1",
 			InstanceId: "not-exist-ins",
@@ -55,7 +55,6 @@ func TestHeartBeatCheck(t *testing.T) {
 	})
 
 	t.Run("heartbeat check: data exists in the cache and db,it can be update successfully", func(t *testing.T) {
-		heartBeatCheck := &HeartBeatCache{}
 		instanceDB := model.Instance{
 			RefreshTime: time.Now(),
 			Instance: &pb.MicroServiceInstance{
@@ -73,7 +72,7 @@ func TestHeartBeatCheck(t *testing.T) {
 		_, _ = client.GetMongoClient().Delete(context.Background(), model.CollectionInstance, filter)
 		_, err := client.GetMongoClient().Insert(context.Background(), model.CollectionInstance, instanceDB)
 		assert.Equal(t, nil, err)
-		err = addHeartbeatTask(instanceDB.Instance.ServiceId, instanceDB.Instance.InstanceId, instanceDB.Instance.HealthCheck.Interval*(instanceDB.Instance.HealthCheck.Times+1))
+		err = heartBeatCheck.cfg.AddHeartbeatTask(instanceDB.Instance.ServiceId, instanceDB.Instance.InstanceId, instanceDB.Instance.HealthCheck.Interval*(instanceDB.Instance.HealthCheck.Times+1))
 		assert.Equal(t, nil, err)
 		resp, err := heartBeatCheck.Heartbeat(context.Background(), &pb.HeartbeatRequest{
 			ServiceId:  "serviceIdDB",
diff --git a/datasource/mongo/heartbeat/healthcheck.go b/datasource/mongo/heartbeat/cache/types.go
similarity index 65%
copy from datasource/mongo/heartbeat/healthcheck.go
copy to datasource/mongo/heartbeat/cache/types.go
index 193b8c9..01da683 100644
--- a/datasource/mongo/heartbeat/healthcheck.go
+++ b/datasource/mongo/heartbeat/cache/types.go
@@ -3,26 +3,26 @@
  * contributor license agreements.  See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
  * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
+ * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
  *
  *     http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
+ * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
 
-package heartbeat
+package heartbeatcache
 
-import (
-	"context"
+import "time"
 
-	pb "github.com/go-chassis/cari/discovery"
-)
-
-type HealthCheck interface {
-	Heartbeat(ctx context.Context, request *pb.HeartbeatRequest) (*pb.HeartbeatResponse, error)
+// Store cache structure
+type instanceHeartbeatInfo struct {
+	serviceID   string
+	instanceID  string
+	ttl         int32
+	lastRefresh time.Time
 }
diff --git a/datasource/mongo/heartbeat/checker/heartbeatchecker.go b/datasource/mongo/heartbeat/checker/heartbeatchecker.go
index 5183185..b3522b3 100644
--- a/datasource/mongo/heartbeat/checker/heartbeatchecker.go
+++ b/datasource/mongo/heartbeat/checker/heartbeatchecker.go
@@ -54,3 +54,8 @@ func (h *HeartBeatChecker) Heartbeat(ctx context.Context, request *pb.HeartbeatR
 			"Update service instance heartbeat successfully."),
 	}, nil
 }
+
+func (h *HeartBeatChecker) CheckInstance(ctx context.Context, instance *pb.MicroServiceInstance) error {
+	// do nothing
+	return nil
+}
diff --git a/datasource/mongo/heartbeat/healthcheck.go b/datasource/mongo/heartbeat/healthcheck.go
index 193b8c9..5cb9fa1 100644
--- a/datasource/mongo/heartbeat/healthcheck.go
+++ b/datasource/mongo/heartbeat/healthcheck.go
@@ -24,5 +24,8 @@ import (
 )
 
 type HealthCheck interface {
+	// Heartbeat processes a heartbeat request.
 	Heartbeat(ctx context.Context, request *pb.HeartbeatRequest) (*pb.HeartbeatResponse, error)
+	// CheckInstance starts heartbeat checking for an instance after it has been registered.
+	CheckInstance(ctx context.Context, instance *pb.MicroServiceInstance) error
 }
diff --git a/datasource/mongo/ms.go b/datasource/mongo/ms.go
index 5d91558..3d224b8 100644
--- a/datasource/mongo/ms.go
+++ b/datasource/mongo/ms.go
@@ -1933,7 +1933,7 @@ func registryInstance(ctx context.Context, request *discovery.RegisterInstanceRe
 	project := util.ParseProject(ctx)
 	remoteIP := util.GetIPFromContext(ctx)
 	instance := request.Instance
-	ttl := int64(instance.HealthCheck.Interval * (instance.HealthCheck.Times + 1))
+	ttl := instance.HealthCheck.Interval * (instance.HealthCheck.Times + 1)
 
 	instanceFlag := fmt.Sprintf("ttl %ds, endpoints %v, host '%s', serviceID %s",
 		ttl, instance.Endpoints, instance.HostName, instance.ServiceId)
@@ -1960,6 +1960,12 @@ func registryInstance(ctx context.Context, request *discovery.RegisterInstanceRe
 		}, err
 	}
 
+	// Start heartbeat checking right after registration so that an instance which stops reporting can be taken offline in time.
+	err = heartbeat.Instance().CheckInstance(ctx, instance)
+	if err != nil {
+		log.Error(fmt.Sprintf("fail to check instance, instance[%s]. operator %s", instance.InstanceId, remoteIP), err)
+	}
+
 	log.Info(fmt.Sprintf("register instance %s, instanceID %s, operator %s",
 		instanceFlag, insertRes.InsertedID, remoteIP))
 	return &discovery.RegisterInstanceResponse{