You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicecomb.apache.org by li...@apache.org on 2018/01/08 01:54:34 UTC

[incubator-servicecomb-service-center] branch master updated: SCB-159 Watcher will get a wrong event sequence when SC modify resource concurrently. (#245)

This is an automated email from the ASF dual-hosted git repository.

littlecui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-servicecomb-service-center.git


The following commit(s) were added to refs/heads/master by this push:
     new 7c45b82  SCB-159 Watcher will get a wrong event sequence when SC modify resource concurrently. (#245)
7c45b82 is described below

commit 7c45b824abd34d50afe9d7c74e340cf0ec432b19
Author: little-cui <su...@qq.com>
AuthorDate: Mon Jan 8 09:54:32 2018 +0800

    SCB-159 Watcher will get a wrong event sequence when SC modify resource concurrently. (#245)
    
    * SCB-127 Indexer search with prefix in cache always return 0.
    
    * SCB-159 Watcher will get a wrong event sequence when SC modify resource concurrently.
---
 pkg/util/log.go                                      |  4 ++--
 server/core/backend/store/cacher.go                  | 14 +++++++++-----
 server/core/backend/store/defer.go                   |  7 +------
 server/core/backend/store/indexer.go                 | 11 ++---------
 server/govern/service.go                             |  2 +-
 server/service/notification/listwatcher.go           | 20 +++++++++++++++++---
 .../notification/notification_healthchecker.go       | 11 +++++++++--
 server/service/notification/notification_service.go  | 18 +++++++++---------
 server/service/notification/struct.go                |  7 +++++--
 server/service/util/instance_util.go                 |  2 +-
 10 files changed, 56 insertions(+), 40 deletions(-)

diff --git a/pkg/util/log.go b/pkg/util/log.go
index cc8e462..00de0b8 100644
--- a/pkg/util/log.go
+++ b/pkg/util/log.go
@@ -119,12 +119,12 @@ func getCalleeFuncName() string {
 	for i := 2; i <= 4; i++ {
 		pc, file, _, ok := runtime.Caller(i)
 
-		if strings.Index(file, "log.go") > 0 {
+		if strings.Index(file, "/log.go") > 0 {
 			continue
 		}
 
 		if ok {
-			idx := strings.LastIndex(file, "src")
+			idx := strings.LastIndex(file, "/src/")
 			switch {
 			case idx >= 0:
 				fullName = file[idx+4:]
diff --git a/server/core/backend/store/cacher.go b/server/core/backend/store/cacher.go
index b1ab867..7ca82cf 100644
--- a/server/core/backend/store/cacher.go
+++ b/server/core/backend/store/cacher.go
@@ -41,6 +41,7 @@ type Cache interface {
 	Version() int64
 	Data(interface{}) interface{}
 	Have(interface{}) bool
+	Size() int
 }
 
 type Cacher interface {
@@ -65,6 +66,10 @@ func (n *nullCache) Have(interface{}) bool {
 	return false
 }
 
+func (n *nullCache) Size() int {
+	return 0
+}
+
 type nullCacher struct {
 }
 
@@ -128,15 +133,15 @@ func (c *KvCache) Unlock() {
 	if c.size >= l &&
 		c.lastMaxSize > c.size*DEFAULT_COMPACT_TIMES &&
 		time.Now().Sub(c.lastRefresh) >= DEFAULT_COMPACT_TIMEOUT {
-		util.Logger().Infof("cache is empty and not in use over %s, compact capacity to size %d->%d",
-			DEFAULT_COMPACT_TIMEOUT, c.lastMaxSize, c.size)
+		util.Logger().Infof("cache %s is not in use over %s, compact capacity to size %d->%d",
+			c.owner.Cfg.Key, DEFAULT_COMPACT_TIMEOUT, c.lastMaxSize, c.size)
 		// gc
 		newCache := make(map[string]*mvccpb.KeyValue, c.size)
 		for k, v := range c.store {
 			newCache[k] = v
 		}
 		c.store = newCache
-		c.lastMaxSize = c.size
+		c.lastMaxSize = l
 		c.lastRefresh = time.Now()
 	}
 	c.rwMux.Unlock()
@@ -313,7 +318,7 @@ func (c *KvCacher) filter(rev int64, items []*mvccpb.KeyValue) []*Event {
 		max = nc
 	}
 
-	newStore := make(map[string]*mvccpb.KeyValue)
+	newStore := make(map[string]*mvccpb.KeyValue, nc)
 	for _, kv := range items {
 		newStore[util.BytesToStringWithNoCopy(kv.Key)] = kv
 	}
@@ -532,7 +537,6 @@ func NewKvCache(c *KvCacher, size int) *KvCache {
 	return &KvCache{
 		owner:       c,
 		size:        size,
-		lastMaxSize: size,
 		store:       make(map[string]*mvccpb.KeyValue, size),
 		lastRefresh: time.Now(),
 	}
diff --git a/server/core/backend/store/defer.go b/server/core/backend/store/defer.go
index 1c45a54..7f7bbfa 100644
--- a/server/core/backend/store/defer.go
+++ b/server/core/backend/store/defer.go
@@ -46,12 +46,7 @@ func (iedh *InstanceEventDeferHandler) deferMode(total int, del int) bool {
 }
 
 func (iedh *InstanceEventDeferHandler) needDefer(cache Cache, evts []*Event) bool {
-	kvCache, ok := cache.(*KvCache)
-	if !ok {
-		return false
-	}
-
-	if !iedh.deferMode(kvCache.Size(), len(evts)) {
+	if !iedh.deferMode(cache.Size(), len(evts)) {
 		return false
 	}
 
diff --git a/server/core/backend/store/indexer.go b/server/core/backend/store/indexer.go
index 65349bd..528922d 100644
--- a/server/core/backend/store/indexer.go
+++ b/server/core/backend/store/indexer.go
@@ -224,11 +224,7 @@ func (i *Indexer) getPrefixKey(arr *[]string, prefix string) (count int) {
 	}
 
 	for key := range keysRef {
-		var childs *[]string = nil
-		if arr != nil {
-			childs = &[]string{}
-		}
-		n := i.getPrefixKey(childs, key)
+		n := i.getPrefixKey(arr, key)
 		if n == 0 {
 			count += len(keysRef)
 			if arr != nil {
@@ -238,10 +234,7 @@ func (i *Indexer) getPrefixKey(arr *[]string, prefix string) (count int) {
 			}
 			break
 		}
-		/*count += n
-		if arr != nil {
-			*arr = append(*arr, *childs...)
-		}*/
+		count += n
 	}
 	return count
 }
diff --git a/server/govern/service.go b/server/govern/service.go
index e059aab..15e36d1 100644
--- a/server/govern/service.go
+++ b/server/govern/service.go
@@ -256,7 +256,7 @@ func getSchemaInfoUtil(ctx context.Context, domainProject string, serviceId stri
 		registry.WithStrKey(key),
 		registry.WithPrefix())
 	if err != nil {
-		util.Logger().Errorf(err, "Get schema failed,%s")
+		util.Logger().Errorf(err, "Get schema failed")
 		return make([]*pb.Schema, 0), err
 	}
 	schemas := make([]*pb.Schema, 0, len(resp.Kvs))
diff --git a/server/service/notification/listwatcher.go b/server/service/notification/listwatcher.go
index f90eefc..8a34076 100644
--- a/server/service/notification/listwatcher.go
+++ b/server/service/notification/listwatcher.go
@@ -19,6 +19,7 @@ package notification
 import (
 	"github.com/apache/incubator-servicecomb-service-center/pkg/util"
 	pb "github.com/apache/incubator-servicecomb-service-center/server/core/proto"
+	"time"
 )
 
 // 状态变化推送
@@ -64,7 +65,14 @@ func (w *ListWatcher) OnMessage(job NotifyJob) {
 		return
 	}
 
-	<-w.listCh
+	select {
+	case <-w.listCh:
+	case <-time.After(DEFAULT_ON_MESSAGE_TIMEOUT):
+		util.Logger().Errorf(nil,
+			"the %s listwatcher %s %s is not ready[over %s], drop the event %v",
+			w.Type(), w.Id(), w.Subject(), DEFAULT_ON_MESSAGE_TIMEOUT, job)
+		return
+	}
 
 	if job.(*WatchJob).Revision <= w.ListRevision {
 		util.Logger().Warnf(nil,
@@ -76,10 +84,16 @@ func (w *ListWatcher) OnMessage(job NotifyJob) {
 }
 
 func (w *ListWatcher) sendMessage(job NotifyJob) {
-	util.Logger().Debugf("start notify %s watcher %s %s, job is %v, current revision is %v", w.Type(),
+	util.Logger().Debugf("start to notify %s watcher %s %s, job is %v, current revision is %v", w.Type(),
 		w.Id(), w.Subject(), job, w.ListRevision)
 	defer util.RecoverAndReport()
-	w.Job <- job
+	select {
+	case w.Job <- job:
+	case <-time.After(DEFAULT_ON_MESSAGE_TIMEOUT):
+		util.Logger().Errorf(nil,
+			"the %s watcher %s %s event queue is full[over %s], drop the event %v",
+			w.Type(), w.Id(), w.Subject(), DEFAULT_ON_MESSAGE_TIMEOUT, job)
+	}
 }
 
 func (w *ListWatcher) Close() {
diff --git a/server/service/notification/notification_healthchecker.go b/server/service/notification/notification_healthchecker.go
index e3b8b74..310275b 100644
--- a/server/service/notification/notification_healthchecker.go
+++ b/server/service/notification/notification_healthchecker.go
@@ -36,8 +36,15 @@ type NotifyServiceHealthCheckJob struct {
 func (s *NotifyServiceHealthChecker) OnMessage(job NotifyJob) {
 	j := job.(*NotifyServiceHealthCheckJob)
 	err := j.ErrorSubscriber.Err()
-	util.Logger().Warnf(err, "notify server remove watcher %s %s",
-		j.ErrorSubscriber.Subject(), j.ErrorSubscriber.Id())
+
+	if j.ErrorSubscriber.Type() == NOTIFTY {
+		util.Logger().Errorf(nil, "remove %s watcher %s %s failed, here cause a dead lock",
+			j.ErrorSubscriber.Type(), j.ErrorSubscriber.Subject(), j.ErrorSubscriber.Id())
+		return
+	}
+
+	util.Logger().Warnf(err, "notification service remove %s watcher %s %s",
+		j.ErrorSubscriber.Type(), j.ErrorSubscriber.Subject(), j.ErrorSubscriber.Id())
 	s.Service().RemoveSubscriber(j.ErrorSubscriber)
 }
 
diff --git a/server/service/notification/notification_service.go b/server/service/notification/notification_service.go
index c76d22d..e2d7864 100644
--- a/server/service/notification/notification_service.go
+++ b/server/service/notification/notification_service.go
@@ -73,15 +73,15 @@ func (s *NotifyService) AddSubscriber(n Subscriber) error {
 
 	sr, ok := ss[n.Subject()]
 	if !ok {
-		sr = make(subscriberIndex)
-		ss[n.Subject()] = sr
+		sr = make(subscriberIndex, DEFAULT_INIT_SUBSCRIBERS)
+		ss[n.Subject()] = sr // add a subscriber
 	}
 
 	ns, ok := sr[n.Id()]
 	if !ok {
 		ns = list.New()
 	}
-	ns.PushBack(n)
+	ns.PushBack(n) // add a connection
 	sr[n.Id()] = ns
 
 	n.SetService(s)
@@ -170,7 +170,7 @@ func (s *NotifyService) publish2Subscriber(t NotifyType) {
 				ns, ok := m[job.SubscriberId()]
 				if ok {
 					for n := ns.Front(); n != nil; n = n.Next() {
-						go n.Value.(Subscriber).OnMessage(job)
+						n.Value.(Subscriber).OnMessage(job)
 					}
 				}
 				s.mutexes[t].Unlock()
@@ -179,7 +179,7 @@ func (s *NotifyService) publish2Subscriber(t NotifyType) {
 			for key := range m {
 				ns := m[key]
 				for n := ns.Front(); n != nil; n = n.Next() {
-					go n.Value.(Subscriber).OnMessage(job)
+					n.Value.(Subscriber).OnMessage(job)
 				}
 			}
 		}
@@ -199,12 +199,12 @@ func (s *NotifyService) init() {
 		s.Config.MaxQueue = DEFAULT_MAX_QUEUE
 	}
 
-	s.services = make(serviceIndex)
+	s.services = make(serviceIndex, typeEnd)
 	s.err = make(chan error, 1)
-	s.queues = make(map[NotifyType]chan NotifyJob)
-	s.mutexes = make(map[NotifyType]*sync.Mutex)
+	s.queues = make(map[NotifyType]chan NotifyJob, typeEnd)
+	s.mutexes = make(map[NotifyType]*sync.Mutex, typeEnd)
 	for i := NotifyType(0); i != typeEnd; i++ {
-		s.services[i] = make(subscriberSubjectIndex)
+		s.services[i] = make(subscriberSubjectIndex, DEFAULT_INIT_SUBSCRIBERS)
 		s.queues[i] = make(chan NotifyJob, s.Config.MaxQueue)
 		s.mutexes[i] = &sync.Mutex{}
 		s.waits.Add(1)
diff --git a/server/service/notification/struct.go b/server/service/notification/struct.go
index be53e6b..f6564ff 100644
--- a/server/service/notification/struct.go
+++ b/server/service/notification/struct.go
@@ -24,8 +24,10 @@ import (
 )
 
 const (
-	DEFAULT_MAX_QUEUE = 1000
-	DEFAULT_TIMEOUT   = 30 * time.Second
+	DEFAULT_MAX_QUEUE          = 1000
+	DEFAULT_INIT_SUBSCRIBERS   = 1000
+	DEFAULT_ON_MESSAGE_TIMEOUT = 100 * time.Millisecond
+	DEFAULT_TIMEOUT            = 30 * time.Second
 
 	NOTIFTY NotifyType = iota
 	INSTANCE
@@ -61,6 +63,7 @@ type Subscriber interface {
 	Service() *NotifyService
 	SetService(*NotifyService)
 	OnAccept()
+	// The event bus will callback this function, so it must be non-blocked.
 	OnMessage(job NotifyJob)
 	Close()
 }
diff --git a/server/service/util/instance_util.go b/server/service/util/instance_util.go
index 77e91f7..117ce38 100644
--- a/server/service/util/instance_util.go
+++ b/server/service/util/instance_util.go
@@ -245,7 +245,7 @@ func QueryAllProvidersInstances(ctx context.Context, selfServiceId string) (resu
 			}
 			results = append(results, &pb.WatchInstanceResponse{
 				Response: pb.CreateResponse(pb.Response_SUCCESS, "List instance successfully."),
-				Action:   string(pb.EVT_CREATE),
+				Action:   string(pb.EVT_INIT),
 				Key: &pb.MicroServiceKey{
 					Environment: service.Environment,
 					AppId:       service.AppId,

-- 
To stop receiving notification emails like this one, please contact
"commits@servicecomb.apache.org" <co...@servicecomb.apache.org>.