You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicecomb.apache.org by ro...@apache.org on 2021/06/16 08:10:03 UTC

[servicecomb-service-center] branch master updated: fix cache bugs

This is an automated email from the ASF dual-hosted git repository.

robotljw pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/servicecomb-service-center.git


The following commit(s) were added to refs/heads/master by this push:
     new 8476900  fix cache bugs
     new c2de051  Merge pull request #1047 from DFSOrange/319
8476900 is described below

commit 8476900a587156c91efb46752a4c9eefc10dd0fc
Author: DFSOrange <pq...@163.com>
AuthorDate: Wed Jun 16 09:28:24 2021 +0800

    fix cache bugs
---
 datasource/mongo/event/instance_event_handler.go |  3 --
 datasource/mongo/sd/dep_cache.go                 | 11 +++-
 datasource/mongo/sd/instance_cache.go            | 13 ++++-
 datasource/mongo/sd/instancec_test.go            | 66 ++++++++++++++++++++++++
 datasource/mongo/sd/mongo_cacher.go              |  5 ++
 datasource/mongo/sd/rule_cache.go                | 12 ++++-
 datasource/mongo/sd/service_cache.go             | 11 +++-
 pkg/event/bus_service.go                         |  2 +-
 server/handler/metrics/metrics.go                |  6 ++-
 9 files changed, 120 insertions(+), 9 deletions(-)

diff --git a/datasource/mongo/event/instance_event_handler.go b/datasource/mongo/event/instance_event_handler.go
index 691e07d..4f2d57d 100644
--- a/datasource/mongo/event/instance_event_handler.go
+++ b/datasource/mongo/event/instance_event_handler.go
@@ -50,9 +50,6 @@ func (h InstanceEventHandler) Type() string {
 
 func (h InstanceEventHandler) OnEvent(evt sd.MongoEvent) {
 	action := evt.Type
-	if evt.Type == discovery.EVT_UPDATE {
-		return
-	}
 	instance, ok := evt.Value.(model.Instance)
 	if !ok {
 		log.Error("failed to assert instance", datasource.ErrAssertFail)
diff --git a/datasource/mongo/sd/dep_cache.go b/datasource/mongo/sd/dep_cache.go
index 124cbec..cda7aa4 100644
--- a/datasource/mongo/sd/dep_cache.go
+++ b/datasource/mongo/sd/dep_cache.go
@@ -18,6 +18,7 @@
 package sd
 
 import (
+	"reflect"
 	"strings"
 
 	"github.com/apache/servicecomb-service-center/datasource/mongo/client/model"
@@ -123,7 +124,15 @@ func (s *depStore) ProcessDelete(event MongoEvent) {
 }
 
 func (s *depStore) isValueNotUpdated(value interface{}, newValue interface{}) bool {
-	return false
+	newDep, ok := newValue.(model.DependencyRule)
+	if !ok {
+		return true
+	}
+	oldDep, ok := value.(model.DependencyRule)
+	if !ok {
+		return true
+	}
+	return reflect.DeepEqual(newDep, oldDep)
 }
 
 func genDepServiceKey(dep model.DependencyRule) string {
diff --git a/datasource/mongo/sd/instance_cache.go b/datasource/mongo/sd/instance_cache.go
index db8d371..5072835 100644
--- a/datasource/mongo/sd/instance_cache.go
+++ b/datasource/mongo/sd/instance_cache.go
@@ -18,6 +18,8 @@
 package sd
 
 import (
+	"reflect"
+
 	"github.com/apache/servicecomb-service-center/datasource/mongo/client/model"
 	"github.com/apache/servicecomb-service-center/datasource/sdcommon"
 	rmodel "github.com/go-chassis/cari/discovery"
@@ -120,7 +122,16 @@ func (s *instanceStore) ProcessDelete(event MongoEvent) {
 }
 
 func (s *instanceStore) isValueNotUpdated(value interface{}, newValue interface{}) bool {
-	return true
+	newInst, ok := newValue.(model.Instance)
+	if !ok {
+		return true
+	}
+	oldInst, ok := value.(model.Instance)
+	if !ok {
+		return true
+	}
+	newInst.RefreshTime = oldInst.RefreshTime
+	return reflect.DeepEqual(newInst, oldInst)
 }
 
 func genInstServiceID(inst model.Instance) string {
diff --git a/datasource/mongo/sd/instancec_test.go b/datasource/mongo/sd/instancec_test.go
index 9b2501d..29aaf40 100644
--- a/datasource/mongo/sd/instancec_test.go
+++ b/datasource/mongo/sd/instancec_test.go
@@ -21,6 +21,7 @@ package sd
 
 import (
 	"testing"
+	"time"
 
 	"github.com/apache/servicecomb-service-center/datasource/mongo/client/model"
 	"github.com/go-chassis/cari/discovery"
@@ -83,3 +84,68 @@ func TestInstCacheBasicFunc(t *testing.T) {
 		assert.Len(t, instanceCache.cache.GetValue("svcid"), 0)
 	})
 }
+
+func TestInstValueUpdate(t *testing.T) {
+	inst1 := model.Instance{
+		Domain:      "d1",
+		Project:     "p1",
+		RefreshTime: time.Time{},
+		Instance: &discovery.MicroServiceInstance{
+			InstanceId: "123",
+			Version:    "1.0",
+		},
+	}
+	inst2 := model.Instance{
+		Domain:      "d1",
+		Project:     "p1",
+		RefreshTime: time.Time{},
+		Instance: &discovery.MicroServiceInstance{
+			InstanceId: "123",
+			Version:    "1.0",
+		},
+	}
+	inst3 := model.Instance{
+		Domain:      "d2",
+		Project:     "p2",
+		RefreshTime: time.Time{},
+		Instance: &discovery.MicroServiceInstance{
+			InstanceId: "123",
+			Version:    "1.0",
+		},
+	}
+	inst4 := model.Instance{
+		Domain:      "d2",
+		Project:     "p2",
+		RefreshTime: time.Time{},
+		Instance: &discovery.MicroServiceInstance{
+			InstanceId: "123",
+			Version:    "1.1",
+		},
+	}
+	inst5 := model.Instance{
+		Domain:      "d2",
+		Project:     "p2",
+		RefreshTime: time.Now(),
+		Instance: &discovery.MicroServiceInstance{
+			InstanceId: "123",
+			Version:    "1.1",
+		},
+	}
+
+	t.Run("given the same instances expect equal result", func(t *testing.T) {
+		res := instanceCache.cache.isValueNotUpdated(inst1, inst2)
+		assert.True(t, res)
+	})
+	t.Run("given instances with the different domain expect not equal result", func(t *testing.T) {
+		res := instanceCache.cache.isValueNotUpdated(inst2, inst3)
+		assert.False(t, res)
+	})
+	t.Run("given instances with the different version expect not equal result", func(t *testing.T) {
+		res := instanceCache.cache.isValueNotUpdated(inst3, inst4)
+		assert.False(t, res)
+	})
+	t.Run("given instances with the different refresh time expect  equal result", func(t *testing.T) {
+		res := instanceCache.cache.isValueNotUpdated(inst4, inst5)
+		assert.True(t, res)
+	})
+}
diff --git a/datasource/mongo/sd/mongo_cacher.go b/datasource/mongo/sd/mongo_cacher.go
index 986d15f..9a9c7b4 100644
--- a/datasource/mongo/sd/mongo_cacher.go
+++ b/datasource/mongo/sd/mongo_cacher.go
@@ -193,6 +193,10 @@ func (c *MongoCacher) handleEventBus(eventbus *sdcommon.EventBus) error {
 		}
 
 		for _, resource := range resp.Resources {
+			if resource.Value == nil {
+				log.Error(fmt.Sprintf("get nil value while watch for mongocache,the docID is %s", resource.Key), nil)
+				break
+			}
 			action := resp.Action
 			var event MongoEvent
 			switch action {
@@ -265,6 +269,7 @@ func (c *MongoCacher) filter(infos []*sdcommon.Resource) []MongoEvent {
 	for block := range eventsCh {
 		for _, e := range block {
 			if e.Value == nil {
+				log.Error(fmt.Sprintf("get nil value while do list, the docID is %s", e.DocumentID), nil)
 				break
 			}
 			events = append(events, e)
diff --git a/datasource/mongo/sd/rule_cache.go b/datasource/mongo/sd/rule_cache.go
index 8809eab..638256b 100644
--- a/datasource/mongo/sd/rule_cache.go
+++ b/datasource/mongo/sd/rule_cache.go
@@ -18,6 +18,8 @@
 package sd
 
 import (
+	"reflect"
+
 	"github.com/apache/servicecomb-service-center/datasource/mongo/client/model"
 	"github.com/apache/servicecomb-service-center/datasource/sdcommon"
 	"go.mongodb.org/mongo-driver/bson"
@@ -115,7 +117,15 @@ func (s *ruleStore) ProcessDelete(event MongoEvent) {
 }
 
 func (s *ruleStore) isValueNotUpdated(value interface{}, newValue interface{}) bool {
-	return false
+	newRule, ok := newValue.(model.Rule)
+	if !ok {
+		return true
+	}
+	oldRule, ok := value.(model.Rule)
+	if !ok {
+		return true
+	}
+	return reflect.DeepEqual(newRule, oldRule)
 }
 
 func genRuleServiceID(rule model.Rule) string {
diff --git a/datasource/mongo/sd/service_cache.go b/datasource/mongo/sd/service_cache.go
index 23d4f13..03d1764 100644
--- a/datasource/mongo/sd/service_cache.go
+++ b/datasource/mongo/sd/service_cache.go
@@ -18,6 +18,7 @@
 package sd
 
 import (
+	"reflect"
 	"strings"
 
 	"github.com/apache/servicecomb-service-center/datasource/mongo/client/model"
@@ -119,7 +120,15 @@ func (s *serviceStore) ProcessDelete(event MongoEvent) {
 }
 
 func (s *serviceStore) isValueNotUpdated(value interface{}, newValue interface{}) bool {
-	return false
+	newService, ok := newValue.(model.Service)
+	if !ok {
+		return true
+	}
+	oldService, ok := value.(model.Service)
+	if !ok {
+		return true
+	}
+	return reflect.DeepEqual(newService, oldService)
 }
 
 func genServiceID(svc model.Service) string {
diff --git a/pkg/event/bus_service.go b/pkg/event/bus_service.go
index 2b53597..812a241 100644
--- a/pkg/event/bus_service.go
+++ b/pkg/event/bus_service.go
@@ -128,7 +128,7 @@ func (s *BusService) Fire(evt Event) error {
 	bus, ok := s.buses[evt.Type()]
 	if !ok {
 		s.mux.RUnlock()
-		return fmt.Errorf("unknown event type[%s]", evt.Type())
+		return fmt.Errorf("no %s subscriber on this service center", evt.Type())
 	}
 	s.mux.RUnlock()
 	bus.Fire(evt)
diff --git a/server/handler/metrics/metrics.go b/server/handler/metrics/metrics.go
index 36dca1d..33c2bfa 100644
--- a/server/handler/metrics/metrics.go
+++ b/server/handler/metrics/metrics.go
@@ -21,6 +21,8 @@ import (
 	"net/http"
 	"time"
 
+	"github.com/apache/servicecomb-service-center/server/config"
+
 	"github.com/apache/servicecomb-service-center/pkg/chain"
 	"github.com/apache/servicecomb-service-center/pkg/log"
 	"github.com/apache/servicecomb-service-center/pkg/rest"
@@ -44,5 +46,7 @@ func (h *Handler) Handle(i *chain.Invocation) {
 }
 
 func RegisterHandlers() {
-	chain.RegisterHandler(rest.ServerChainName, &Handler{})
+	if config.GetBool("metrics.enable", false) {
+		chain.RegisterHandler(rest.ServerChainName, &Handler{})
+	}
 }