You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicecomb.apache.org by ti...@apache.org on 2021/04/15 07:33:46 UTC
[servicecomb-service-center] branch master updated: SCB-2094
BugFix: join the heartbeat cache (#943)
This is an automated email from the ASF dual-hosted git repository.
tianxiaoliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/servicecomb-service-center.git
The following commit(s) were added to refs/heads/master by this push:
new 677b100 SCB-2094 BugFix: join the heartbeat cache (#943)
677b100 is described below
commit 677b1006e130ea9512c63cb928b788614824d145
Author: robotljw <79...@qq.com>
AuthorDate: Thu Apr 15 15:33:37 2021 +0800
SCB-2094 BugFix: join the heartbeat cache (#943)
---
datasource/mongo/heartbeat/cache/heartbeat.go | 97 +++++++++++-----------
datasource/mongo/heartbeat/cache/heartbeat_test.go | 22 ++---
datasource/mongo/heartbeat/cache/heartbeatcache.go | 24 ++++--
.../mongo/heartbeat/cache/heartbeatcache_test.go | 9 +-
.../heartbeat/{healthcheck.go => cache/types.go} | 20 ++---
.../mongo/heartbeat/checker/heartbeatchecker.go | 5 ++
datasource/mongo/heartbeat/healthcheck.go | 3 +
datasource/mongo/ms.go | 8 +-
8 files changed, 106 insertions(+), 82 deletions(-)
diff --git a/datasource/mongo/heartbeat/cache/heartbeat.go b/datasource/mongo/heartbeat/cache/heartbeat.go
index 61e9b53..4094cfd 100644
--- a/datasource/mongo/heartbeat/cache/heartbeat.go
+++ b/datasource/mongo/heartbeat/cache/heartbeat.go
@@ -22,6 +22,7 @@ import (
"errors"
"fmt"
"runtime"
+ "sync"
"time"
"github.com/patrickmn/go-cache"
@@ -44,59 +45,61 @@ const (
ctxTimeout = 5 * time.Second
)
-// Store cache structure
-type instanceHeartbeatInfo struct {
- serviceID string
- instanceID string
- ttl int32
- lastRefresh time.Time
-}
+var ErrHeartbeatTimeout = errors.New("heartbeat task waiting for processing timeout. ")
var (
- cacheChan chan *instanceHeartbeatInfo
- instanceHeartbeatStore = cache.New(0, instanceCheckerInternal)
- workerNum = runtime.NumCPU()
- heartbeatTaskTimeout = config.GetInt("registry.mongo.heartbeat.timeout", defaultTimeout)
- ErrHeartbeatTimeout = errors.New("heartbeat task waiting for processing timeout. ")
+ once sync.Once
+ cfg cacheConfig
)
-func init() {
- capacity := config.GetInt("registry.mongo.heartbeat.cacheCapacity", defaultCacheCapacity)
- cacheChan = make(chan *instanceHeartbeatInfo, capacity)
- num := config.GetInt("registry.mongo.heartbeat.workerNum", defaultWorkNum)
- if num != 0 {
- workerNum = num
- }
- instanceHeartbeatStore.OnEvicted(func(k string, v interface{}) {
- instanceInfo, ok := v.(*instanceHeartbeatInfo)
- if ok && instanceInfo != nil {
- ctx, cancel := context.WithTimeout(context.Background(), ctxTimeout)
- defer cancel()
- err := cleanInstance(ctx, instanceInfo.serviceID, instanceInfo.instanceID)
- if err != nil {
- log.Error("failed to cleanInstance in mongodb.", err)
- }
- }
- })
+type cacheConfig struct {
+ cacheChan chan *instanceHeartbeatInfo
+ instanceHeartbeatStore *cache.Cache
+ workerNum int
+ heartbeatTaskTimeout int
+}
- for i := 1; i <= workerNum; i++ {
- gopool.Go(func(ctx context.Context) {
- for {
- select {
- case <-ctx.Done():
- log.Warn("heartbeat work protocol exit.")
- return
- case heartbeatInfo, ok := <-cacheChan:
- if ok {
- instanceHeartbeatStore.Set(heartbeatInfo.instanceID, heartbeatInfo, time.Duration(heartbeatInfo.ttl)*time.Second)
- }
+func configuration() *cacheConfig {
+ once.Do(func() {
+ cfg.workerNum = runtime.NumCPU()
+ num := config.GetInt("registry.mongo.heartbeat.workerNum", defaultWorkNum)
+ if num != 0 {
+ cfg.workerNum = num
+ }
+ cfg.heartbeatTaskTimeout = config.GetInt("registry.mongo.heartbeat.timeout", defaultTimeout)
+ cfg.cacheChan = make(chan *instanceHeartbeatInfo, config.GetInt("registry.mongo.heartbeat.cacheCapacity", defaultCacheCapacity))
+ cfg.instanceHeartbeatStore = cache.New(0, instanceCheckerInternal)
+ cfg.instanceHeartbeatStore.OnEvicted(func(k string, v interface{}) {
+ instanceInfo, ok := v.(*instanceHeartbeatInfo)
+ if ok && instanceInfo != nil {
+ ctx, cancel := context.WithTimeout(context.Background(), ctxTimeout)
+ defer cancel()
+ err := cleanInstance(ctx, instanceInfo.serviceID, instanceInfo.instanceID)
+ if err != nil {
+ log.Error("failed to cleanInstance in mongodb.", err)
}
}
})
- }
+ for i := 1; i <= cfg.workerNum; i++ {
+ gopool.Go(func(ctx context.Context) {
+ for {
+ select {
+ case <-ctx.Done():
+ log.Warn("heartbeat work protocol exit.")
+ return
+ case heartbeatInfo, ok := <-cfg.cacheChan:
+ if ok {
+ cfg.instanceHeartbeatStore.Set(heartbeatInfo.instanceID, heartbeatInfo, time.Duration(heartbeatInfo.ttl)*time.Second)
+ }
+ }
+ }
+ })
+ }
+ })
+ return &cfg
}
-func addHeartbeatTask(serviceID string, instanceID string, ttl int32) error {
+func (c *cacheConfig) AddHeartbeatTask(serviceID string, instanceID string, ttl int32) error {
// Unassigned setting default value is 30s
if ttl <= 0 {
ttl = defaultTTL
@@ -108,16 +111,16 @@ func addHeartbeatTask(serviceID string, instanceID string, ttl int32) error {
lastRefresh: time.Now(),
}
select {
- case cacheChan <- newInstance:
+ case c.cacheChan <- newInstance:
return nil
- case <-time.After(time.Duration(heartbeatTaskTimeout) * time.Second):
+ case <-time.After(time.Duration(c.heartbeatTaskTimeout) * time.Second):
log.Warn("the heartbeat's channel is full. ")
return ErrHeartbeatTimeout
}
}
-func RemoveCacheInstance(instanceID string) {
- instanceHeartbeatStore.Delete(instanceID)
+func (c *cacheConfig) RemoveCacheInstance(instanceID string) {
+ c.instanceHeartbeatStore.Delete(instanceID)
}
func cleanInstance(ctx context.Context, serviceID string, instanceID string) error {
diff --git a/datasource/mongo/heartbeat/cache/heartbeat_test.go b/datasource/mongo/heartbeat/cache/heartbeat_test.go
index d6845e6..26bbc4a 100644
--- a/datasource/mongo/heartbeat/cache/heartbeat_test.go
+++ b/datasource/mongo/heartbeat/cache/heartbeat_test.go
@@ -42,6 +42,8 @@ func init() {
client.NewMongoClient(config)
}
+var c = configuration()
+
func TestAddCacheInstance(t *testing.T) {
t.Run("add cache instance: set the ttl to 2 seconds", func(t *testing.T) {
instance1 := model.Instance{
@@ -55,11 +57,11 @@ func TestAddCacheInstance(t *testing.T) {
},
},
}
- err := addHeartbeatTask(instance1.Instance.ServiceId, instance1.Instance.InstanceId, instance1.Instance.HealthCheck.Interval*(instance1.Instance.HealthCheck.Times+1))
+ err := c.AddHeartbeatTask(instance1.Instance.ServiceId, instance1.Instance.InstanceId, instance1.Instance.HealthCheck.Interval*(instance1.Instance.HealthCheck.Times+1))
assert.Equal(t, nil, err)
_, err = client.GetMongoClient().Insert(context.Background(), model.CollectionInstance, instance1)
assert.Equal(t, nil, err)
- info, ok := instanceHeartbeatStore.Get(instance1.Instance.InstanceId)
+ info, ok := c.instanceHeartbeatStore.Get(instance1.Instance.InstanceId)
assert.Equal(t, true, ok)
if ok {
heartBeatInfo := info.(*instanceHeartbeatInfo)
@@ -67,7 +69,7 @@ func TestAddCacheInstance(t *testing.T) {
assert.Equal(t, instance1.Instance.HealthCheck.Interval*(instance1.Instance.HealthCheck.Times+1), heartBeatInfo.ttl)
}
time.Sleep(2 * time.Second)
- _, ok = instanceHeartbeatStore.Get(instance1.Instance.InstanceId)
+ _, ok = c.instanceHeartbeatStore.Get(instance1.Instance.InstanceId)
assert.Equal(t, false, ok)
_, err = client.GetMongoClient().Delete(context.Background(), model.CollectionInstance, instance1)
assert.Equal(t, nil, err)
@@ -85,11 +87,11 @@ func TestAddCacheInstance(t *testing.T) {
},
},
}
- err := addHeartbeatTask(instance1.Instance.ServiceId, instance1.Instance.InstanceId, instance1.Instance.HealthCheck.Interval*(instance1.Instance.HealthCheck.Times+1))
+ err := c.AddHeartbeatTask(instance1.Instance.ServiceId, instance1.Instance.InstanceId, instance1.Instance.HealthCheck.Interval*(instance1.Instance.HealthCheck.Times+1))
assert.Equal(t, nil, err)
_, err = client.GetMongoClient().Insert(context.Background(), model.CollectionInstance, instance1)
assert.Equal(t, nil, err)
- info, ok := instanceHeartbeatStore.Get(instance1.Instance.InstanceId)
+ info, ok := c.instanceHeartbeatStore.Get(instance1.Instance.InstanceId)
assert.Equal(t, true, ok)
if ok {
heartBeatInfo := info.(*instanceHeartbeatInfo)
@@ -97,7 +99,7 @@ func TestAddCacheInstance(t *testing.T) {
assert.Equal(t, int32(defaultTTL), heartBeatInfo.ttl)
}
time.Sleep(defaultTTL * time.Second)
- _, ok = instanceHeartbeatStore.Get(instance1.Instance.InstanceId)
+ _, ok = c.instanceHeartbeatStore.Get(instance1.Instance.InstanceId)
assert.Equal(t, false, ok)
_, err = client.GetMongoClient().Delete(context.Background(), model.CollectionInstance, instance1)
assert.Equal(t, nil, err)
@@ -117,11 +119,11 @@ func TestRemoveCacheInstance(t *testing.T) {
},
},
}
- err := addHeartbeatTask(instance1.Instance.ServiceId, instance1.Instance.InstanceId, instance1.Instance.HealthCheck.Interval*(instance1.Instance.HealthCheck.Times+1))
+ err := c.AddHeartbeatTask(instance1.Instance.ServiceId, instance1.Instance.InstanceId, instance1.Instance.HealthCheck.Interval*(instance1.Instance.HealthCheck.Times+1))
assert.Equal(t, nil, err)
_, err = client.GetMongoClient().Insert(context.Background(), model.CollectionInstance, instance1)
assert.Equal(t, nil, err)
- info, ok := instanceHeartbeatStore.Get(instance1.Instance.InstanceId)
+ info, ok := c.instanceHeartbeatStore.Get(instance1.Instance.InstanceId)
assert.Equal(t, true, ok)
if ok {
heartBeatInfo := info.(*instanceHeartbeatInfo)
@@ -129,8 +131,8 @@ func TestRemoveCacheInstance(t *testing.T) {
assert.Equal(t, instance1.Instance.HealthCheck.Interval*(instance1.Instance.HealthCheck.Times+1), heartBeatInfo.ttl)
}
time.Sleep(4 * time.Second)
- RemoveCacheInstance(instance1.Instance.InstanceId)
- _, ok = instanceHeartbeatStore.Get(instance1.Instance.InstanceId)
+ c.RemoveCacheInstance(instance1.Instance.InstanceId)
+ _, ok = c.instanceHeartbeatStore.Get(instance1.Instance.InstanceId)
assert.Equal(t, false, ok)
_, err = client.GetMongoClient().Delete(context.Background(), model.CollectionInstance, instance1)
assert.Equal(t, nil, err)
diff --git a/datasource/mongo/heartbeat/cache/heartbeatcache.go b/datasource/mongo/heartbeat/cache/heartbeatcache.go
index 31ae699..c6381e9 100644
--- a/datasource/mongo/heartbeat/cache/heartbeatcache.go
+++ b/datasource/mongo/heartbeat/cache/heartbeatcache.go
@@ -44,20 +44,26 @@ func init() {
}
type HeartBeatCache struct {
+ cfg *cacheConfig
}
func NewHeartBeatCache(opts heartbeat.Options) (heartbeat.HealthCheck, error) {
- return &HeartBeatCache{}, nil
+ return &HeartBeatCache{cfg: configuration()}, nil
}
func (h *HeartBeatCache) Heartbeat(ctx context.Context, request *pb.HeartbeatRequest) (*pb.HeartbeatResponse, error) {
- if ins, ok := instanceHeartbeatStore.Get(request.InstanceId); ok {
- return inCacheStrategy(ctx, request, ins)
+ if ins, ok := h.cfg.instanceHeartbeatStore.Get(request.InstanceId); ok {
+ return h.inCacheStrategy(ctx, request, ins)
}
- return notInCacheStrategy(ctx, request)
+ return h.notInCacheStrategy(ctx, request)
}
-func inCacheStrategy(ctx context.Context, request *pb.HeartbeatRequest, insHeartbeatInfo interface{}) (*pb.HeartbeatResponse, error) {
+// CheckInstance queues a heartbeat task for the instance so that its heartbeat information is added to the cache
+func (h *HeartBeatCache) CheckInstance(ctx context.Context, instance *pb.MicroServiceInstance) error {
+ return h.cfg.AddHeartbeatTask(instance.ServiceId, instance.InstanceId, instance.HealthCheck.Interval*(instance.HealthCheck.Times+1))
+}
+
+func (h *HeartBeatCache) inCacheStrategy(ctx context.Context, request *pb.HeartbeatRequest, insHeartbeatInfo interface{}) (*pb.HeartbeatResponse, error) {
remoteIP := util.GetIPFromContext(ctx)
heartbeatInfo, ok := insHeartbeatInfo.(*instanceHeartbeatInfo)
if !ok {
@@ -67,7 +73,7 @@ func inCacheStrategy(ctx context.Context, request *pb.HeartbeatRequest, insHeart
}
return resp, ErrHeartbeatConversionFailed
}
- err := addHeartbeatTask(request.ServiceId, request.InstanceId, heartbeatInfo.ttl)
+ err := h.cfg.AddHeartbeatTask(request.ServiceId, request.InstanceId, heartbeatInfo.ttl)
if err != nil {
log.Error(fmt.Sprintf("heartbeat failed, instance[%s]. operator %s", request.InstanceId, remoteIP), err)
resp := &pb.HeartbeatResponse{
@@ -88,7 +94,7 @@ func inCacheStrategy(ctx context.Context, request *pb.HeartbeatRequest, insHeart
}, nil
}
-func notInCacheStrategy(ctx context.Context, request *pb.HeartbeatRequest) (*pb.HeartbeatResponse, error) {
+func (h *HeartBeatCache) notInCacheStrategy(ctx context.Context, request *pb.HeartbeatRequest) (*pb.HeartbeatResponse, error) {
remoteIP := util.GetIPFromContext(ctx)
instance, err := findInstance(ctx, request.ServiceId, request.InstanceId)
if err != nil {
@@ -106,7 +112,7 @@ func notInCacheStrategy(ctx context.Context, request *pb.HeartbeatRequest) (*pb.
if times > maxTimes || times < minTimes {
times = maxTimes
}
- err = addHeartbeatTask(request.ServiceId, request.InstanceId, interval*(times+1))
+ err = h.cfg.AddHeartbeatTask(request.ServiceId, request.InstanceId, interval*(times+1))
if err != nil {
log.Error(fmt.Sprintf("heartbeat failed, instance[%s]. operator %s", request.InstanceId, remoteIP), err)
resp := &pb.HeartbeatResponse{
@@ -116,7 +122,7 @@ func notInCacheStrategy(ctx context.Context, request *pb.HeartbeatRequest) (*pb.
}
err = updateInstance(ctx, request.ServiceId, request.InstanceId)
if err != nil {
- RemoveCacheInstance(request.InstanceId)
+ h.cfg.RemoveCacheInstance(request.InstanceId)
log.Error(fmt.Sprintf("heartbeat failed, instance[%s]. operator %s", request.InstanceId, remoteIP), err)
resp := &pb.HeartbeatResponse{
Response: pb.CreateResponseWithSCErr(pb.NewError(pb.ErrInstanceNotExists, err.Error())),
diff --git a/datasource/mongo/heartbeat/cache/heartbeatcache_test.go b/datasource/mongo/heartbeat/cache/heartbeatcache_test.go
index da2308a..37bb0c2 100644
--- a/datasource/mongo/heartbeat/cache/heartbeatcache_test.go
+++ b/datasource/mongo/heartbeat/cache/heartbeatcache_test.go
@@ -31,9 +31,10 @@ import (
"github.com/apache/servicecomb-service-center/datasource/mongo/util"
)
+var heartBeatCheck = &HeartBeatCache{cfg: configuration()}
+
func TestHeartBeatCheck(t *testing.T) {
t.Run("heartbeat check: instance does not exist,it should be failed", func(t *testing.T) {
- heartBeatCheck := &HeartBeatCache{}
resp, err := heartBeatCheck.Heartbeat(context.Background(), &pb.HeartbeatRequest{
ServiceId: "serviceId1",
InstanceId: "not-exist-ins",
@@ -43,9 +44,8 @@ func TestHeartBeatCheck(t *testing.T) {
})
t.Run("heartbeat check: data exists in the cache,but not in db,it should be failed", func(t *testing.T) {
- err := addHeartbeatTask("not-exist-svc", "not-exist-ins", 30)
+ err := heartBeatCheck.cfg.AddHeartbeatTask("not-exist-svc", "not-exist-ins", 30)
assert.Nil(t, err)
- heartBeatCheck := &HeartBeatCache{}
resp, err := heartBeatCheck.Heartbeat(context.Background(), &pb.HeartbeatRequest{
ServiceId: "serviceId1",
InstanceId: "not-exist-ins",
@@ -55,7 +55,6 @@ func TestHeartBeatCheck(t *testing.T) {
})
t.Run("heartbeat check: data exists in the cache and db,it can be update successfully", func(t *testing.T) {
- heartBeatCheck := &HeartBeatCache{}
instanceDB := model.Instance{
RefreshTime: time.Now(),
Instance: &pb.MicroServiceInstance{
@@ -73,7 +72,7 @@ func TestHeartBeatCheck(t *testing.T) {
_, _ = client.GetMongoClient().Delete(context.Background(), model.CollectionInstance, filter)
_, err := client.GetMongoClient().Insert(context.Background(), model.CollectionInstance, instanceDB)
assert.Equal(t, nil, err)
- err = addHeartbeatTask(instanceDB.Instance.ServiceId, instanceDB.Instance.InstanceId, instanceDB.Instance.HealthCheck.Interval*(instanceDB.Instance.HealthCheck.Times+1))
+ err = heartBeatCheck.cfg.AddHeartbeatTask(instanceDB.Instance.ServiceId, instanceDB.Instance.InstanceId, instanceDB.Instance.HealthCheck.Interval*(instanceDB.Instance.HealthCheck.Times+1))
assert.Equal(t, nil, err)
resp, err := heartBeatCheck.Heartbeat(context.Background(), &pb.HeartbeatRequest{
ServiceId: "serviceIdDB",
diff --git a/datasource/mongo/heartbeat/healthcheck.go b/datasource/mongo/heartbeat/cache/types.go
similarity index 65%
copy from datasource/mongo/heartbeat/healthcheck.go
copy to datasource/mongo/heartbeat/cache/types.go
index 193b8c9..01da683 100644
--- a/datasource/mongo/heartbeat/healthcheck.go
+++ b/datasource/mongo/heartbeat/cache/types.go
@@ -3,26 +3,26 @@
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
+ * (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software
+ * Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package heartbeat
+package heartbeatcache
-import (
- "context"
+import "time"
- pb "github.com/go-chassis/cari/discovery"
-)
-
-type HealthCheck interface {
- Heartbeat(ctx context.Context, request *pb.HeartbeatRequest) (*pb.HeartbeatResponse, error)
+// Store cache structure
+type instanceHeartbeatInfo struct {
+ serviceID string
+ instanceID string
+ ttl int32
+ lastRefresh time.Time
}
diff --git a/datasource/mongo/heartbeat/checker/heartbeatchecker.go b/datasource/mongo/heartbeat/checker/heartbeatchecker.go
index 5183185..b3522b3 100644
--- a/datasource/mongo/heartbeat/checker/heartbeatchecker.go
+++ b/datasource/mongo/heartbeat/checker/heartbeatchecker.go
@@ -54,3 +54,8 @@ func (h *HeartBeatChecker) Heartbeat(ctx context.Context, request *pb.HeartbeatR
"Update service instance heartbeat successfully."),
}, nil
}
+
+func (h *HeartBeatChecker) CheckInstance(ctx context.Context, instance *pb.MicroServiceInstance) error {
+ // do nothing
+ return nil
+}
diff --git a/datasource/mongo/heartbeat/healthcheck.go b/datasource/mongo/heartbeat/healthcheck.go
index 193b8c9..5cb9fa1 100644
--- a/datasource/mongo/heartbeat/healthcheck.go
+++ b/datasource/mongo/heartbeat/healthcheck.go
@@ -24,5 +24,8 @@ import (
)
type HealthCheck interface {
+ // processing heartbeat request
Heartbeat(ctx context.Context, request *pb.HeartbeatRequest) (*pb.HeartbeatResponse, error)
+ // processing heartbeat check of instance after registration
+ CheckInstance(ctx context.Context, instance *pb.MicroServiceInstance) error
}
diff --git a/datasource/mongo/ms.go b/datasource/mongo/ms.go
index 5d91558..3d224b8 100644
--- a/datasource/mongo/ms.go
+++ b/datasource/mongo/ms.go
@@ -1933,7 +1933,7 @@ func registryInstance(ctx context.Context, request *discovery.RegisterInstanceRe
project := util.ParseProject(ctx)
remoteIP := util.GetIPFromContext(ctx)
instance := request.Instance
- ttl := int64(instance.HealthCheck.Interval * (instance.HealthCheck.Times + 1))
+ ttl := instance.HealthCheck.Interval * (instance.HealthCheck.Times + 1)
instanceFlag := fmt.Sprintf("ttl %ds, endpoints %v, host '%s', serviceID %s",
ttl, instance.Endpoints, instance.HostName, instance.ServiceId)
@@ -1960,6 +1960,12 @@ func registryInstance(ctx context.Context, request *discovery.RegisterInstanceRe
}, err
}
+ // start heartbeat checking right after registration so that an instance whose heartbeats never arrive can still be taken offline in time
+ err = heartbeat.Instance().CheckInstance(ctx, instance)
+ if err != nil {
+ log.Error(fmt.Sprintf("fail to check instance, instance[%s]. operator %s", instance.InstanceId, remoteIP), err)
+ }
+
log.Info(fmt.Sprintf("register instance %s, instanceID %s, operator %s",
instanceFlag, insertRes.InsertedID, remoteIP))
return &discovery.RegisterInstanceResponse{