You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicecomb.apache.org by li...@apache.org on 2018/01/08 01:54:34 UTC
[incubator-servicecomb-service-center] branch master updated:
SCB-159 Watcher will get a wrong event sequence when SC modify resource concurrently. (#245)
This is an automated email from the ASF dual-hosted git repository.
littlecui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-servicecomb-service-center.git
The following commit(s) were added to refs/heads/master by this push:
new 7c45b82 SCB-159 Watcher will get a wrong event sequence when SC modify resource concurrently. (#245)
7c45b82 is described below
commit 7c45b824abd34d50afe9d7c74e340cf0ec432b19
Author: little-cui <su...@qq.com>
AuthorDate: Mon Jan 8 09:54:32 2018 +0800
SCB-159 Watcher will get a wrong event sequence when SC modify resource concurrently. (#245)
* SCB-127 Indexer search with prefix in cache always return 0.
* SCB-159 Watcher will get a wrong event sequence when SC modify resource concurrently.
---
pkg/util/log.go | 4 ++--
server/core/backend/store/cacher.go | 14 +++++++++-----
server/core/backend/store/defer.go | 7 +------
server/core/backend/store/indexer.go | 11 ++---------
server/govern/service.go | 2 +-
server/service/notification/listwatcher.go | 20 +++++++++++++++++---
.../notification/notification_healthchecker.go | 11 +++++++++--
server/service/notification/notification_service.go | 18 +++++++++---------
server/service/notification/struct.go | 7 +++++--
server/service/util/instance_util.go | 2 +-
10 files changed, 56 insertions(+), 40 deletions(-)
diff --git a/pkg/util/log.go b/pkg/util/log.go
index cc8e462..00de0b8 100644
--- a/pkg/util/log.go
+++ b/pkg/util/log.go
@@ -119,12 +119,12 @@ func getCalleeFuncName() string {
for i := 2; i <= 4; i++ {
pc, file, _, ok := runtime.Caller(i)
- if strings.Index(file, "log.go") > 0 {
+ if strings.Index(file, "/log.go") > 0 {
continue
}
if ok {
- idx := strings.LastIndex(file, "src")
+ idx := strings.LastIndex(file, "/src/")
switch {
case idx >= 0:
fullName = file[idx+4:]
diff --git a/server/core/backend/store/cacher.go b/server/core/backend/store/cacher.go
index b1ab867..7ca82cf 100644
--- a/server/core/backend/store/cacher.go
+++ b/server/core/backend/store/cacher.go
@@ -41,6 +41,7 @@ type Cache interface {
Version() int64
Data(interface{}) interface{}
Have(interface{}) bool
+ Size() int
}
type Cacher interface {
@@ -65,6 +66,10 @@ func (n *nullCache) Have(interface{}) bool {
return false
}
+func (n *nullCache) Size() int {
+ return 0
+}
+
type nullCacher struct {
}
@@ -128,15 +133,15 @@ func (c *KvCache) Unlock() {
if c.size >= l &&
c.lastMaxSize > c.size*DEFAULT_COMPACT_TIMES &&
time.Now().Sub(c.lastRefresh) >= DEFAULT_COMPACT_TIMEOUT {
- util.Logger().Infof("cache is empty and not in use over %s, compact capacity to size %d->%d",
- DEFAULT_COMPACT_TIMEOUT, c.lastMaxSize, c.size)
+ util.Logger().Infof("cache %s is not in use over %s, compact capacity to size %d->%d",
+ c.owner.Cfg.Key, DEFAULT_COMPACT_TIMEOUT, c.lastMaxSize, c.size)
// gc
newCache := make(map[string]*mvccpb.KeyValue, c.size)
for k, v := range c.store {
newCache[k] = v
}
c.store = newCache
- c.lastMaxSize = c.size
+ c.lastMaxSize = l
c.lastRefresh = time.Now()
}
c.rwMux.Unlock()
@@ -313,7 +318,7 @@ func (c *KvCacher) filter(rev int64, items []*mvccpb.KeyValue) []*Event {
max = nc
}
- newStore := make(map[string]*mvccpb.KeyValue)
+ newStore := make(map[string]*mvccpb.KeyValue, nc)
for _, kv := range items {
newStore[util.BytesToStringWithNoCopy(kv.Key)] = kv
}
@@ -532,7 +537,6 @@ func NewKvCache(c *KvCacher, size int) *KvCache {
return &KvCache{
owner: c,
size: size,
- lastMaxSize: size,
store: make(map[string]*mvccpb.KeyValue, size),
lastRefresh: time.Now(),
}
diff --git a/server/core/backend/store/defer.go b/server/core/backend/store/defer.go
index 1c45a54..7f7bbfa 100644
--- a/server/core/backend/store/defer.go
+++ b/server/core/backend/store/defer.go
@@ -46,12 +46,7 @@ func (iedh *InstanceEventDeferHandler) deferMode(total int, del int) bool {
}
func (iedh *InstanceEventDeferHandler) needDefer(cache Cache, evts []*Event) bool {
- kvCache, ok := cache.(*KvCache)
- if !ok {
- return false
- }
-
- if !iedh.deferMode(kvCache.Size(), len(evts)) {
+ if !iedh.deferMode(cache.Size(), len(evts)) {
return false
}
diff --git a/server/core/backend/store/indexer.go b/server/core/backend/store/indexer.go
index 65349bd..528922d 100644
--- a/server/core/backend/store/indexer.go
+++ b/server/core/backend/store/indexer.go
@@ -224,11 +224,7 @@ func (i *Indexer) getPrefixKey(arr *[]string, prefix string) (count int) {
}
for key := range keysRef {
- var childs *[]string = nil
- if arr != nil {
- childs = &[]string{}
- }
- n := i.getPrefixKey(childs, key)
+ n := i.getPrefixKey(arr, key)
if n == 0 {
count += len(keysRef)
if arr != nil {
@@ -238,10 +234,7 @@ func (i *Indexer) getPrefixKey(arr *[]string, prefix string) (count int) {
}
break
}
- /*count += n
- if arr != nil {
- *arr = append(*arr, *childs...)
- }*/
+ count += n
}
return count
}
diff --git a/server/govern/service.go b/server/govern/service.go
index e059aab..15e36d1 100644
--- a/server/govern/service.go
+++ b/server/govern/service.go
@@ -256,7 +256,7 @@ func getSchemaInfoUtil(ctx context.Context, domainProject string, serviceId stri
registry.WithStrKey(key),
registry.WithPrefix())
if err != nil {
- util.Logger().Errorf(err, "Get schema failed,%s")
+ util.Logger().Errorf(err, "Get schema failed")
return make([]*pb.Schema, 0), err
}
schemas := make([]*pb.Schema, 0, len(resp.Kvs))
diff --git a/server/service/notification/listwatcher.go b/server/service/notification/listwatcher.go
index f90eefc..8a34076 100644
--- a/server/service/notification/listwatcher.go
+++ b/server/service/notification/listwatcher.go
@@ -19,6 +19,7 @@ package notification
import (
"github.com/apache/incubator-servicecomb-service-center/pkg/util"
pb "github.com/apache/incubator-servicecomb-service-center/server/core/proto"
+ "time"
)
// 状态变化推送
@@ -64,7 +65,14 @@ func (w *ListWatcher) OnMessage(job NotifyJob) {
return
}
- <-w.listCh
+ select {
+ case <-w.listCh:
+ case <-time.After(DEFAULT_ON_MESSAGE_TIMEOUT):
+ util.Logger().Errorf(nil,
+ "the %s listwatcher %s %s is not ready[over %s], drop the event %v",
+ w.Type(), w.Id(), w.Subject(), DEFAULT_ON_MESSAGE_TIMEOUT, job)
+ return
+ }
if job.(*WatchJob).Revision <= w.ListRevision {
util.Logger().Warnf(nil,
@@ -76,10 +84,16 @@ func (w *ListWatcher) OnMessage(job NotifyJob) {
}
func (w *ListWatcher) sendMessage(job NotifyJob) {
- util.Logger().Debugf("start notify %s watcher %s %s, job is %v, current revision is %v", w.Type(),
+ util.Logger().Debugf("start to notify %s watcher %s %s, job is %v, current revision is %v", w.Type(),
w.Id(), w.Subject(), job, w.ListRevision)
defer util.RecoverAndReport()
- w.Job <- job
+ select {
+ case w.Job <- job:
+ case <-time.After(DEFAULT_ON_MESSAGE_TIMEOUT):
+ util.Logger().Errorf(nil,
+ "the %s watcher %s %s event queue is full[over %s], drop the event %v",
+ w.Type(), w.Id(), w.Subject(), DEFAULT_ON_MESSAGE_TIMEOUT, job)
+ }
}
func (w *ListWatcher) Close() {
diff --git a/server/service/notification/notification_healthchecker.go b/server/service/notification/notification_healthchecker.go
index e3b8b74..310275b 100644
--- a/server/service/notification/notification_healthchecker.go
+++ b/server/service/notification/notification_healthchecker.go
@@ -36,8 +36,15 @@ type NotifyServiceHealthCheckJob struct {
func (s *NotifyServiceHealthChecker) OnMessage(job NotifyJob) {
j := job.(*NotifyServiceHealthCheckJob)
err := j.ErrorSubscriber.Err()
- util.Logger().Warnf(err, "notify server remove watcher %s %s",
- j.ErrorSubscriber.Subject(), j.ErrorSubscriber.Id())
+
+ if j.ErrorSubscriber.Type() == NOTIFTY {
+ util.Logger().Errorf(nil, "remove %s watcher %s %s failed, here cause a dead lock",
+ j.ErrorSubscriber.Type(), j.ErrorSubscriber.Subject(), j.ErrorSubscriber.Id())
+ return
+ }
+
+ util.Logger().Warnf(err, "notification service remove %s watcher %s %s",
+ j.ErrorSubscriber.Type(), j.ErrorSubscriber.Subject(), j.ErrorSubscriber.Id())
s.Service().RemoveSubscriber(j.ErrorSubscriber)
}
diff --git a/server/service/notification/notification_service.go b/server/service/notification/notification_service.go
index c76d22d..e2d7864 100644
--- a/server/service/notification/notification_service.go
+++ b/server/service/notification/notification_service.go
@@ -73,15 +73,15 @@ func (s *NotifyService) AddSubscriber(n Subscriber) error {
sr, ok := ss[n.Subject()]
if !ok {
- sr = make(subscriberIndex)
- ss[n.Subject()] = sr
+ sr = make(subscriberIndex, DEFAULT_INIT_SUBSCRIBERS)
+ ss[n.Subject()] = sr // add a subscriber
}
ns, ok := sr[n.Id()]
if !ok {
ns = list.New()
}
- ns.PushBack(n)
+ ns.PushBack(n) // add a connection
sr[n.Id()] = ns
n.SetService(s)
@@ -170,7 +170,7 @@ func (s *NotifyService) publish2Subscriber(t NotifyType) {
ns, ok := m[job.SubscriberId()]
if ok {
for n := ns.Front(); n != nil; n = n.Next() {
- go n.Value.(Subscriber).OnMessage(job)
+ n.Value.(Subscriber).OnMessage(job)
}
}
s.mutexes[t].Unlock()
@@ -179,7 +179,7 @@ func (s *NotifyService) publish2Subscriber(t NotifyType) {
for key := range m {
ns := m[key]
for n := ns.Front(); n != nil; n = n.Next() {
- go n.Value.(Subscriber).OnMessage(job)
+ n.Value.(Subscriber).OnMessage(job)
}
}
}
@@ -199,12 +199,12 @@ func (s *NotifyService) init() {
s.Config.MaxQueue = DEFAULT_MAX_QUEUE
}
- s.services = make(serviceIndex)
+ s.services = make(serviceIndex, typeEnd)
s.err = make(chan error, 1)
- s.queues = make(map[NotifyType]chan NotifyJob)
- s.mutexes = make(map[NotifyType]*sync.Mutex)
+ s.queues = make(map[NotifyType]chan NotifyJob, typeEnd)
+ s.mutexes = make(map[NotifyType]*sync.Mutex, typeEnd)
for i := NotifyType(0); i != typeEnd; i++ {
- s.services[i] = make(subscriberSubjectIndex)
+ s.services[i] = make(subscriberSubjectIndex, DEFAULT_INIT_SUBSCRIBERS)
s.queues[i] = make(chan NotifyJob, s.Config.MaxQueue)
s.mutexes[i] = &sync.Mutex{}
s.waits.Add(1)
diff --git a/server/service/notification/struct.go b/server/service/notification/struct.go
index be53e6b..f6564ff 100644
--- a/server/service/notification/struct.go
+++ b/server/service/notification/struct.go
@@ -24,8 +24,10 @@ import (
)
const (
- DEFAULT_MAX_QUEUE = 1000
- DEFAULT_TIMEOUT = 30 * time.Second
+ DEFAULT_MAX_QUEUE = 1000
+ DEFAULT_INIT_SUBSCRIBERS = 1000
+ DEFAULT_ON_MESSAGE_TIMEOUT = 100 * time.Millisecond
+ DEFAULT_TIMEOUT = 30 * time.Second
NOTIFTY NotifyType = iota
INSTANCE
@@ -61,6 +63,7 @@ type Subscriber interface {
Service() *NotifyService
SetService(*NotifyService)
OnAccept()
+ // The event bus will callback this function, so it must be non-blocked.
OnMessage(job NotifyJob)
Close()
}
diff --git a/server/service/util/instance_util.go b/server/service/util/instance_util.go
index 77e91f7..117ce38 100644
--- a/server/service/util/instance_util.go
+++ b/server/service/util/instance_util.go
@@ -245,7 +245,7 @@ func QueryAllProvidersInstances(ctx context.Context, selfServiceId string) (resu
}
results = append(results, &pb.WatchInstanceResponse{
Response: pb.CreateResponse(pb.Response_SUCCESS, "List instance successfully."),
- Action: string(pb.EVT_CREATE),
+ Action: string(pb.EVT_INIT),
Key: &pb.MicroServiceKey{
Environment: service.Environment,
AppId: service.AppId,
--
To stop receiving notification emails like this one, please contact
"commits@servicecomb.apache.org" <co...@servicecomb.apache.org>.