You are viewing a plain-text version of this content. The canonical link to it (shown as "here" on the original archive page) is not preserved in this plain-text rendering.
Posted to dev@servicecomb.apache.org by GitBox <gi...@apache.org> on 2018/01/08 01:54:37 UTC

[GitHub] little-cui closed pull request #245: SCB-159 Watcher will get a wrong event sequence when SC modify resource concurrently.

little-cui closed pull request #245: SCB-159 Watcher will get a wrong event sequence when SC modify resource concurrently.
URL: https://github.com/apache/incubator-servicecomb-service-center/pull/245
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

Because this is a foreign pull request (from a fork), the diff is supplied
below; GitHub would not otherwise display it:

diff --git a/pkg/util/log.go b/pkg/util/log.go
index cc8e462a..00de0b8f 100644
--- a/pkg/util/log.go
+++ b/pkg/util/log.go
@@ -119,12 +119,12 @@ func getCalleeFuncName() string {
 	for i := 2; i <= 4; i++ {
 		pc, file, _, ok := runtime.Caller(i)
 
-		if strings.Index(file, "log.go") > 0 {
+		if strings.Index(file, "/log.go") > 0 {
 			continue
 		}
 
 		if ok {
-			idx := strings.LastIndex(file, "src")
+			idx := strings.LastIndex(file, "/src/")
 			switch {
 			case idx >= 0:
 				fullName = file[idx+4:]
diff --git a/server/core/backend/store/cacher.go b/server/core/backend/store/cacher.go
index b1ab8677..7ca82cf2 100644
--- a/server/core/backend/store/cacher.go
+++ b/server/core/backend/store/cacher.go
@@ -41,6 +41,7 @@ type Cache interface {
 	Version() int64
 	Data(interface{}) interface{}
 	Have(interface{}) bool
+	Size() int
 }
 
 type Cacher interface {
@@ -65,6 +66,10 @@ func (n *nullCache) Have(interface{}) bool {
 	return false
 }
 
+func (n *nullCache) Size() int {
+	return 0
+}
+
 type nullCacher struct {
 }
 
@@ -128,15 +133,15 @@ func (c *KvCache) Unlock() {
 	if c.size >= l &&
 		c.lastMaxSize > c.size*DEFAULT_COMPACT_TIMES &&
 		time.Now().Sub(c.lastRefresh) >= DEFAULT_COMPACT_TIMEOUT {
-		util.Logger().Infof("cache is empty and not in use over %s, compact capacity to size %d->%d",
-			DEFAULT_COMPACT_TIMEOUT, c.lastMaxSize, c.size)
+		util.Logger().Infof("cache %s is not in use over %s, compact capacity to size %d->%d",
+			c.owner.Cfg.Key, DEFAULT_COMPACT_TIMEOUT, c.lastMaxSize, c.size)
 		// gc
 		newCache := make(map[string]*mvccpb.KeyValue, c.size)
 		for k, v := range c.store {
 			newCache[k] = v
 		}
 		c.store = newCache
-		c.lastMaxSize = c.size
+		c.lastMaxSize = l
 		c.lastRefresh = time.Now()
 	}
 	c.rwMux.Unlock()
@@ -313,7 +318,7 @@ func (c *KvCacher) filter(rev int64, items []*mvccpb.KeyValue) []*Event {
 		max = nc
 	}
 
-	newStore := make(map[string]*mvccpb.KeyValue)
+	newStore := make(map[string]*mvccpb.KeyValue, nc)
 	for _, kv := range items {
 		newStore[util.BytesToStringWithNoCopy(kv.Key)] = kv
 	}
@@ -532,7 +537,6 @@ func NewKvCache(c *KvCacher, size int) *KvCache {
 	return &KvCache{
 		owner:       c,
 		size:        size,
-		lastMaxSize: size,
 		store:       make(map[string]*mvccpb.KeyValue, size),
 		lastRefresh: time.Now(),
 	}
diff --git a/server/core/backend/store/defer.go b/server/core/backend/store/defer.go
index 1c45a54b..7f7bbfae 100644
--- a/server/core/backend/store/defer.go
+++ b/server/core/backend/store/defer.go
@@ -46,12 +46,7 @@ func (iedh *InstanceEventDeferHandler) deferMode(total int, del int) bool {
 }
 
 func (iedh *InstanceEventDeferHandler) needDefer(cache Cache, evts []*Event) bool {
-	kvCache, ok := cache.(*KvCache)
-	if !ok {
-		return false
-	}
-
-	if !iedh.deferMode(kvCache.Size(), len(evts)) {
+	if !iedh.deferMode(cache.Size(), len(evts)) {
 		return false
 	}
 
diff --git a/server/core/backend/store/indexer.go b/server/core/backend/store/indexer.go
index 65349bdf..528922da 100644
--- a/server/core/backend/store/indexer.go
+++ b/server/core/backend/store/indexer.go
@@ -224,11 +224,7 @@ func (i *Indexer) getPrefixKey(arr *[]string, prefix string) (count int) {
 	}
 
 	for key := range keysRef {
-		var childs *[]string = nil
-		if arr != nil {
-			childs = &[]string{}
-		}
-		n := i.getPrefixKey(childs, key)
+		n := i.getPrefixKey(arr, key)
 		if n == 0 {
 			count += len(keysRef)
 			if arr != nil {
@@ -238,10 +234,7 @@ func (i *Indexer) getPrefixKey(arr *[]string, prefix string) (count int) {
 			}
 			break
 		}
-		/*count += n
-		if arr != nil {
-			*arr = append(*arr, *childs...)
-		}*/
+		count += n
 	}
 	return count
 }
diff --git a/server/govern/service.go b/server/govern/service.go
index e059aabd..15e36d11 100644
--- a/server/govern/service.go
+++ b/server/govern/service.go
@@ -256,7 +256,7 @@ func getSchemaInfoUtil(ctx context.Context, domainProject string, serviceId stri
 		registry.WithStrKey(key),
 		registry.WithPrefix())
 	if err != nil {
-		util.Logger().Errorf(err, "Get schema failed,%s")
+		util.Logger().Errorf(err, "Get schema failed")
 		return make([]*pb.Schema, 0), err
 	}
 	schemas := make([]*pb.Schema, 0, len(resp.Kvs))
diff --git a/server/service/notification/listwatcher.go b/server/service/notification/listwatcher.go
index f90eefc1..8a340760 100644
--- a/server/service/notification/listwatcher.go
+++ b/server/service/notification/listwatcher.go
@@ -19,6 +19,7 @@ package notification
 import (
 	"github.com/apache/incubator-servicecomb-service-center/pkg/util"
 	pb "github.com/apache/incubator-servicecomb-service-center/server/core/proto"
+	"time"
 )
 
 // ??????
@@ -64,7 +65,14 @@ func (w *ListWatcher) OnMessage(job NotifyJob) {
 		return
 	}
 
-	<-w.listCh
+	select {
+	case <-w.listCh:
+	case <-time.After(DEFAULT_ON_MESSAGE_TIMEOUT):
+		util.Logger().Errorf(nil,
+			"the %s listwatcher %s %s is not ready[over %s], drop the event %v",
+			w.Type(), w.Id(), w.Subject(), DEFAULT_ON_MESSAGE_TIMEOUT, job)
+		return
+	}
 
 	if job.(*WatchJob).Revision <= w.ListRevision {
 		util.Logger().Warnf(nil,
@@ -76,10 +84,16 @@ func (w *ListWatcher) OnMessage(job NotifyJob) {
 }
 
 func (w *ListWatcher) sendMessage(job NotifyJob) {
-	util.Logger().Debugf("start notify %s watcher %s %s, job is %v, current revision is %v", w.Type(),
+	util.Logger().Debugf("start to notify %s watcher %s %s, job is %v, current revision is %v", w.Type(),
 		w.Id(), w.Subject(), job, w.ListRevision)
 	defer util.RecoverAndReport()
-	w.Job <- job
+	select {
+	case w.Job <- job:
+	case <-time.After(DEFAULT_ON_MESSAGE_TIMEOUT):
+		util.Logger().Errorf(nil,
+			"the %s watcher %s %s event queue is full[over %s], drop the event %v",
+			w.Type(), w.Id(), w.Subject(), DEFAULT_ON_MESSAGE_TIMEOUT, job)
+	}
 }
 
 func (w *ListWatcher) Close() {
diff --git a/server/service/notification/notification_healthchecker.go b/server/service/notification/notification_healthchecker.go
index e3b8b740..310275b7 100644
--- a/server/service/notification/notification_healthchecker.go
+++ b/server/service/notification/notification_healthchecker.go
@@ -36,8 +36,15 @@ type NotifyServiceHealthCheckJob struct {
 func (s *NotifyServiceHealthChecker) OnMessage(job NotifyJob) {
 	j := job.(*NotifyServiceHealthCheckJob)
 	err := j.ErrorSubscriber.Err()
-	util.Logger().Warnf(err, "notify server remove watcher %s %s",
-		j.ErrorSubscriber.Subject(), j.ErrorSubscriber.Id())
+
+	if j.ErrorSubscriber.Type() == NOTIFTY {
+		util.Logger().Errorf(nil, "remove %s watcher %s %s failed, here cause a dead lock",
+			j.ErrorSubscriber.Type(), j.ErrorSubscriber.Subject(), j.ErrorSubscriber.Id())
+		return
+	}
+
+	util.Logger().Warnf(err, "notification service remove %s watcher %s %s",
+		j.ErrorSubscriber.Type(), j.ErrorSubscriber.Subject(), j.ErrorSubscriber.Id())
 	s.Service().RemoveSubscriber(j.ErrorSubscriber)
 }
 
diff --git a/server/service/notification/notification_service.go b/server/service/notification/notification_service.go
index c76d22d9..e2d78640 100644
--- a/server/service/notification/notification_service.go
+++ b/server/service/notification/notification_service.go
@@ -73,15 +73,15 @@ func (s *NotifyService) AddSubscriber(n Subscriber) error {
 
 	sr, ok := ss[n.Subject()]
 	if !ok {
-		sr = make(subscriberIndex)
-		ss[n.Subject()] = sr
+		sr = make(subscriberIndex, DEFAULT_INIT_SUBSCRIBERS)
+		ss[n.Subject()] = sr // add a subscriber
 	}
 
 	ns, ok := sr[n.Id()]
 	if !ok {
 		ns = list.New()
 	}
-	ns.PushBack(n)
+	ns.PushBack(n) // add a connection
 	sr[n.Id()] = ns
 
 	n.SetService(s)
@@ -170,7 +170,7 @@ func (s *NotifyService) publish2Subscriber(t NotifyType) {
 				ns, ok := m[job.SubscriberId()]
 				if ok {
 					for n := ns.Front(); n != nil; n = n.Next() {
-						go n.Value.(Subscriber).OnMessage(job)
+						n.Value.(Subscriber).OnMessage(job)
 					}
 				}
 				s.mutexes[t].Unlock()
@@ -179,7 +179,7 @@ func (s *NotifyService) publish2Subscriber(t NotifyType) {
 			for key := range m {
 				ns := m[key]
 				for n := ns.Front(); n != nil; n = n.Next() {
-					go n.Value.(Subscriber).OnMessage(job)
+					n.Value.(Subscriber).OnMessage(job)
 				}
 			}
 		}
@@ -199,12 +199,12 @@ func (s *NotifyService) init() {
 		s.Config.MaxQueue = DEFAULT_MAX_QUEUE
 	}
 
-	s.services = make(serviceIndex)
+	s.services = make(serviceIndex, typeEnd)
 	s.err = make(chan error, 1)
-	s.queues = make(map[NotifyType]chan NotifyJob)
-	s.mutexes = make(map[NotifyType]*sync.Mutex)
+	s.queues = make(map[NotifyType]chan NotifyJob, typeEnd)
+	s.mutexes = make(map[NotifyType]*sync.Mutex, typeEnd)
 	for i := NotifyType(0); i != typeEnd; i++ {
-		s.services[i] = make(subscriberSubjectIndex)
+		s.services[i] = make(subscriberSubjectIndex, DEFAULT_INIT_SUBSCRIBERS)
 		s.queues[i] = make(chan NotifyJob, s.Config.MaxQueue)
 		s.mutexes[i] = &sync.Mutex{}
 		s.waits.Add(1)
diff --git a/server/service/notification/struct.go b/server/service/notification/struct.go
index be53e6b5..f6564ffa 100644
--- a/server/service/notification/struct.go
+++ b/server/service/notification/struct.go
@@ -24,8 +24,10 @@ import (
 )
 
 const (
-	DEFAULT_MAX_QUEUE = 1000
-	DEFAULT_TIMEOUT   = 30 * time.Second
+	DEFAULT_MAX_QUEUE          = 1000
+	DEFAULT_INIT_SUBSCRIBERS   = 1000
+	DEFAULT_ON_MESSAGE_TIMEOUT = 100 * time.Millisecond
+	DEFAULT_TIMEOUT            = 30 * time.Second
 
 	NOTIFTY NotifyType = iota
 	INSTANCE
@@ -61,6 +63,7 @@ type Subscriber interface {
 	Service() *NotifyService
 	SetService(*NotifyService)
 	OnAccept()
+	// The event bus will callback this function, so it must be non-blocked.
 	OnMessage(job NotifyJob)
 	Close()
 }
diff --git a/server/service/util/instance_util.go b/server/service/util/instance_util.go
index 77e91f7c..117ce388 100644
--- a/server/service/util/instance_util.go
+++ b/server/service/util/instance_util.go
@@ -245,7 +245,7 @@ func QueryAllProvidersInstances(ctx context.Context, selfServiceId string) (resu
 			}
 			results = append(results, &pb.WatchInstanceResponse{
 				Response: pb.CreateResponse(pb.Response_SUCCESS, "List instance successfully."),
-				Action:   string(pb.EVT_CREATE),
+				Action:   string(pb.EVT_INIT),
 				Key: &pb.MicroServiceKey{
 					Environment: service.Environment,
 					AppId:       service.AppId,


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services