You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicecomb.apache.org by as...@apache.org on 2018/01/29 04:39:21 UTC
[incubator-servicecomb-service-center] branch master updated:
SCB-296 Self preservation will never stop when SC do list-watch loop (#261)
This is an automated email from the ASF dual-hosted git repository.
asifdxtreme pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-servicecomb-service-center.git
The following commit(s) were added to refs/heads/master by this push:
new a4b6ce4 SCB-296 Self preservation will never stop when SC do list-watch loop (#261)
a4b6ce4 is described below
commit a4b6ce4d60309361940b9039b9a16ce641af183a
Author: little-cui <su...@qq.com>
AuthorDate: Mon Jan 29 12:39:18 2018 +0800
SCB-296 Self preservation will never stop when SC do list-watch loop (#261)
* SCB-296 Self preservation will never stop when SC do list-watch loop
---
etc/conf/app.conf | 5 +-
integration/instances_test.go | 2 +-
pkg/chain/chain.go | 2 +-
pkg/httplimiter/httpratelimiter.go | 13 +-
pkg/logrotate/logrotate.go | 4 +-
pkg/ratelimiter/ratelimiter.go | 5 +-
pkg/rest/client.go | 2 +-
pkg/rest/route.go | 16 +-
server/core/backend/store/cacher.go | 104 ++++++++-----
server/core/backend/store/defer.go | 173 +++++++++++----------
server/core/backend/store/defer_test.go | 141 +++++++++++++++++
server/core/backend/store/indexer.go | 5 -
server/core/backend/store/listwatch.go | 10 +-
server/core/backend/store/opt.go | 4 +-
server/core/backend/store/store.go | 7 -
server/infra/quota/quota.go | 2 +-
server/plugin/infra/registry/etcd/etcd.go | 75 ++++-----
.../rest/controller/v4/microservice_controller.go | 1 -
server/service/event/dependency_event_handler.go | 10 +-
server/service/instances.go | 8 +-
server/service/instances_test.go | 2 +-
server/service/microservices.go | 3 +-
server/service/rule.go | 1 -
server/service/util/dependency.go | 8 +-
24 files changed, 376 insertions(+), 227 deletions(-)
diff --git a/etc/conf/app.conf b/etc/conf/app.conf
index 9d54f2d..a48f08c 100644
--- a/etc/conf/app.conf
+++ b/etc/conf/app.conf
@@ -26,11 +26,12 @@ plugins_dir = ./plugins
registry_plugin = etcd
# registry address
-# registry_plugin equals to 'embeded_etcd', example:
+# 1. if registry_plugin equals to 'embeded_etcd'
# manager_name = "sc-0"
# manager_addr = "http://127.0.0.1:2380"
# manager_cluster = "sc-0=http://127.0.0.1:2380"
-# registry_plugin equals to 'etcd'
+# 2. if registry_plugin equals to 'etcd'
+# manager_cluster = "127.0.0.1:2379"
manager_cluster = "127.0.0.1:2379"
#heartbeat that sync synchronizes client's endpoints with the known endpoints from the etcd membership,unit is second.
diff --git a/integration/instances_test.go b/integration/instances_test.go
index 8ab0a78..948c9f6 100644
--- a/integration/instances_test.go
+++ b/integration/instances_test.go
@@ -248,7 +248,7 @@ var _ = Describe("MicroService Api Test", func() {
By("Discover MicroService Instance API", func() {
It("Find Micro-service Info by AppID", func() {
- req, _ := http.NewRequest(GET, SCURL+FINDINSTANCE+"?appId="+serviceAppId+"&serviceName="+serviceName+"&version="+serviceVersion, nil)
+ req, _ := http.NewRequest(GET, SCURL+FINDINSTANCE+"?noCache=1&appId="+serviceAppId+"&serviceName="+serviceName+"&version="+serviceVersion, nil)
req.Header.Set("X-Domain-Name", "default")
req.Header.Set("X-ConsumerId", serviceId)
resp, _ := scclient.Do(req)
diff --git a/pkg/chain/chain.go b/pkg/chain/chain.go
index 8517824..a48fc1c 100644
--- a/pkg/chain/chain.go
+++ b/pkg/chain/chain.go
@@ -60,7 +60,7 @@ func (c *Chain) syncNext(i *Invocation) {
}
func (c *Chain) Next(i *Invocation) {
- go c.syncNext(i)
+ c.syncNext(i)
}
func NewChain(name string, handlers []Handler) Chain {
diff --git a/pkg/httplimiter/httpratelimiter.go b/pkg/httplimiter/httpratelimiter.go
index 9128d5b..3110056 100644
--- a/pkg/httplimiter/httpratelimiter.go
+++ b/pkg/httplimiter/httpratelimiter.go
@@ -18,13 +18,13 @@
package httplimiter
import (
+ "fmt"
+ "github.com/apache/incubator-servicecomb-service-center/pkg/ratelimiter"
"net/http"
"strconv"
"strings"
- "time"
- "github.com/apache/incubator-servicecomb-service-center/pkg/ratelimiter"
- "fmt"
"sync"
+ "time"
)
type HTTPErrorMessage struct {
@@ -36,7 +36,6 @@ func (httpErrorMessage *HTTPErrorMessage) Error() string {
return fmt.Sprintf("%v: %v", httpErrorMessage.StatusCode, httpErrorMessage.Message)
}
-
type HttpLimiter struct {
HttpMessage string
ContentType string
@@ -51,8 +50,6 @@ type HttpLimiter struct {
sync.RWMutex
}
-
-
func LimitBySegments(limiter *HttpLimiter, keys []string) *HTTPErrorMessage {
if limiter.LimitExceeded(strings.Join(keys, "|")) {
return &HTTPErrorMessage{Message: limiter.HttpMessage, StatusCode: limiter.StatusCode}
@@ -199,7 +196,6 @@ func getRemoteIP(ipLookups []string, r *http.Request) string {
return ""
}
-
func NewHttpLimiter(max int64, ttl time.Duration) *HttpLimiter {
limiter := &HttpLimiter{RequestLimit: max, TTL: ttl}
limiter.ContentType = "text/plain; charset=utf-8"
@@ -211,7 +207,6 @@ func NewHttpLimiter(max int64, ttl time.Duration) *HttpLimiter {
return limiter
}
-
func (rateLimiter *HttpLimiter) LimitExceeded(key string) bool {
rateLimiter.Lock()
if _, found := rateLimiter.leakyBuckets[key]; !found {
@@ -224,5 +219,3 @@ func (rateLimiter *HttpLimiter) LimitExceeded(key string) bool {
}
return true
}
-
-
diff --git a/pkg/logrotate/logrotate.go b/pkg/logrotate/logrotate.go
index c2e9e93..76de98d 100644
--- a/pkg/logrotate/logrotate.go
+++ b/pkg/logrotate/logrotate.go
@@ -240,9 +240,9 @@ func LogRotate(path string, MaxFileSize int, MaxBackupCount int) {
}
}
-func isSkip(f os.FileInfo) bool{
+func isSkip(f os.FileInfo) bool {
//dir or non write permission,skip
- return f.IsDir() || (f.Mode() & 0200 == 0000)
+ return f.IsDir() || (f.Mode()&0200 == 0000)
}
//path : where the file will be filtered
diff --git a/pkg/ratelimiter/ratelimiter.go b/pkg/ratelimiter/ratelimiter.go
index 8184108..a46bc39 100644
--- a/pkg/ratelimiter/ratelimiter.go
+++ b/pkg/ratelimiter/ratelimiter.go
@@ -18,8 +18,8 @@
package ratelimiter
import (
- "time"
"sync"
+ "time"
)
type LeakyBucket struct {
@@ -32,7 +32,6 @@ type LeakyBucket struct {
availableTicker int64
}
-
func NewLeakyBucket(fillInterval time.Duration, capacity, quantum int64) *LeakyBucket {
if fillInterval <= 0 {
panic("leaky bucket fill interval is not > 0")
@@ -77,7 +76,6 @@ func (leakyBucket *LeakyBucket) MaximumTakeDuration(count int64, maxWait time.Du
return leakyBucket.take(time.Now(), count, maxWait)
}
-
func (leakyBucket *LeakyBucket) Rate() float64 {
return 1e9 * float64(leakyBucket.quantum) / float64(leakyBucket.interval)
}
@@ -118,4 +116,3 @@ func (leakyBucket *LeakyBucket) adjust(now time.Time) (currentTick int64) {
leakyBucket.availableTicker = currentTick
return
}
-
diff --git a/pkg/rest/client.go b/pkg/rest/client.go
index 3ce4e29..2cd68ab 100644
--- a/pkg/rest/client.go
+++ b/pkg/rest/client.go
@@ -28,9 +28,9 @@ import (
"io/ioutil"
"net"
"net/http"
+ "net/url"
"reflect"
"time"
- "net/url"
)
const (
diff --git a/pkg/rest/route.go b/pkg/rest/route.go
index 6595021..3015aff 100644
--- a/pkg/rest/route.go
+++ b/pkg/rest/route.go
@@ -23,6 +23,7 @@ import (
errorsEx "github.com/apache/incubator-servicecomb-service-center/pkg/errors"
"github.com/apache/incubator-servicecomb-service-center/pkg/util"
"net/http"
+ "net/url"
"strings"
)
@@ -77,7 +78,7 @@ func (this *ROAServerHandler) ServeHTTP(w http.ResponseWriter, r *http.Request)
for _, ph := range this.handlers[r.Method] {
if params, ok := ph.try(r.URL.Path); ok {
if len(params) > 0 {
- r.URL.RawQuery = util.UrlEncode(params) + "&" + r.URL.RawQuery
+ r.URL.RawQuery = params + r.URL.RawQuery
}
this.serve(ph, w, r)
@@ -151,7 +152,7 @@ func (this *ROAServerHandler) serve(ph *urlPatternHandler, w http.ResponseWriter
<-ch
}
-func (this *urlPatternHandler) try(path string) (p map[string]string, _ bool) {
+func (this *urlPatternHandler) try(path string) (p string, _ bool) {
var i, j int
l, sl := len(this.Path), len(path)
for i < sl {
@@ -160,7 +161,7 @@ func (this *urlPatternHandler) try(path string) (p map[string]string, _ bool) {
if this.Path != "/" && l > 0 && this.Path[l-1] == '/' {
return p, true
}
- return nil, false
+ return "", false
case this.Path[j] == ':':
var val string
var nextc byte
@@ -168,19 +169,16 @@ func (this *urlPatternHandler) try(path string) (p map[string]string, _ bool) {
_, nextc, j = match(this.Path, isAlnum, 0, j+1)
val, _, i = match(path, matchParticial, nextc, i)
- if p == nil {
- p = make(map[string]string, 5)
- }
- p[this.Path[o:j]] = val
+ p += url.QueryEscape(this.Path[o:j]) + "=" + url.QueryEscape(val) + "&"
case path[i] == this.Path[j]:
i++
j++
default:
- return nil, false
+ return "", false
}
}
if j != l {
- return nil, false
+ return "", false
}
return p, true
}
diff --git a/server/core/backend/store/cacher.go b/server/core/backend/store/cacher.go
index 7ca82cf..c6ade97 100644
--- a/server/core/backend/store/cacher.go
+++ b/server/core/backend/store/cacher.go
@@ -120,6 +120,15 @@ func (c *KvCache) Have(k interface{}) (ok bool) {
return
}
+func (c *KvCache) RLock() map[string]*mvccpb.KeyValue {
+ c.rwMux.RLock()
+ return c.store
+}
+
+func (c *KvCache) RUnlock() {
+ c.rwMux.RUnlock()
+}
+
func (c *KvCache) Lock() map[string]*mvccpb.KeyValue {
c.rwMux.Lock()
return c.store
@@ -251,15 +260,39 @@ func (c *KvCacher) handleWatcher(watcher *Watcher) error {
}
func (c *KvCacher) needDeferHandle(evts []*Event) bool {
- if c.Cfg.DeferHander == nil {
+ if c.Cfg.DeferHandler == nil {
return false
}
- return c.Cfg.DeferHander.OnCondition(c.Cache(), evts)
+ return c.Cfg.DeferHandler.OnCondition(c.Cache(), evts)
+}
+
+func (c *KvCacher) refresh(stopCh <-chan struct{}) {
+ util.Logger().Debugf("start to list and watch %s", c.Cfg)
+ ctx, cancel := context.WithCancel(context.Background())
+ c.goroute.Do(func(stopCh <-chan struct{}) {
+ defer cancel()
+ <-stopCh
+ })
+ for {
+ start := time.Now()
+ c.ListAndWatch(ctx)
+ watchDuration := time.Since(start)
+ nextPeriod := 0 * time.Second
+ if watchDuration > 0 && c.Cfg.Period > watchDuration {
+ nextPeriod = c.Cfg.Period - watchDuration
+ }
+ select {
+ case <-stopCh:
+ util.Logger().Debugf("stop to list and watch %s", c.Cfg)
+ return
+ case <-time.After(nextPeriod):
+ }
+ }
}
func (c *KvCacher) deferHandle(stopCh <-chan struct{}) {
- if c.Cfg.DeferHander == nil {
+ if c.Cfg.DeferHandler == nil {
return
}
@@ -268,7 +301,7 @@ func (c *KvCacher) deferHandle(stopCh <-chan struct{}) {
select {
case <-stopCh:
return
- case evt, ok := <-c.Cfg.DeferHander.HandleChan():
+ case evt, ok := <-c.Cfg.DeferHandler.HandleChan():
if !ok {
<-time.After(time.Second)
continue
@@ -281,7 +314,7 @@ func (c *KvCacher) deferHandle(stopCh <-chan struct{}) {
evts[i] = evt
i++
- case <-time.After(time.Second):
+ case <-time.After(300 * time.Millisecond):
if i == 0 {
continue
}
@@ -305,9 +338,9 @@ func (c *KvCacher) sync(evts []*Event) {
}
func (c *KvCacher) filter(rev int64, items []*mvccpb.KeyValue) []*Event {
- cache := c.Cache().(*KvCache)
- store := cache.Lock()
- defer cache.Unlock()
+ store := c.cache.RLock()
+ defer c.cache.RUnlock()
+
oc, nc := len(store), len(items)
tc := oc + nc
if tc == 0 {
@@ -359,7 +392,7 @@ func (c *KvCacher) filterDelete(store map[string]*mvccpb.KeyValue, newStore map[
block[i] = &Event{
Revision: rev,
Type: proto.EVT_DELETE,
- Key: c.Cfg.Key,
+ Prefix: c.Cfg.Key,
Object: v,
}
i++
@@ -387,7 +420,7 @@ func (c *KvCacher) filterCreateOrUpdate(store map[string]*mvccpb.KeyValue, newSt
block[i] = &Event{
Revision: rev,
Type: proto.EVT_CREATE,
- Key: c.Cfg.Key,
+ Prefix: c.Cfg.Key,
Object: v,
}
i++
@@ -407,7 +440,7 @@ func (c *KvCacher) filterCreateOrUpdate(store map[string]*mvccpb.KeyValue, newSt
block[i] = &Event{
Revision: rev,
Type: proto.EVT_UPDATE,
- Key: c.Cfg.Key,
+ Prefix: c.Cfg.Key,
Object: v,
}
i++
@@ -424,10 +457,9 @@ func (c *KvCacher) filterCreateOrUpdate(store map[string]*mvccpb.KeyValue, newSt
}
func (c *KvCacher) onEvents(evts []*Event) {
- cache := c.Cache().(*KvCache)
- idx := 0
+ idx, init := 0, !c.IsReady()
kvEvts := make([]*KvEvent, len(evts))
- store := cache.Lock()
+ store := c.cache.Lock()
for _, evt := range evts {
kv := evt.Object.(*mvccpb.KeyValue)
key := util.BytesToStringWithNoCopy(kv.Key)
@@ -456,7 +488,6 @@ func (c *KvCacher) onEvents(evts []*Event) {
Action: t,
KV: kv,
}
- idx++
case proto.EVT_DELETE:
if !ok {
util.Logger().Warnf(nil, "unexpected %s event! key %s does not exist", evt.Type, key)
@@ -470,10 +501,15 @@ func (c *KvCacher) onEvents(evts []*Event) {
Action: evt.Type,
KV: prevKv,
}
- idx++
}
+
+ if init && kvEvts[idx].Action == proto.EVT_CREATE {
+ kvEvts[idx].Action = proto.EVT_INIT
+ }
+
+ idx++
}
- cache.Unlock()
+ c.cache.Unlock()
c.onKvEvents(kvEvts[:idx])
}
@@ -488,30 +524,7 @@ func (c *KvCacher) onKvEvents(evts []*KvEvent) {
}
func (c *KvCacher) run() {
- c.goroute.Do(func(stopCh <-chan struct{}) {
- util.Logger().Debugf("start to list and watch %s", c.Cfg)
- ctx, cancel := context.WithCancel(context.Background())
- c.goroute.Do(func(stopCh <-chan struct{}) {
- defer cancel()
- <-stopCh
- })
- for {
- start := time.Now()
- c.ListAndWatch(ctx)
- watchDuration := time.Now().Sub(start)
- nextPeriod := 0 * time.Second
- if watchDuration > 0 && c.Cfg.Period > watchDuration {
- nextPeriod = c.Cfg.Period - watchDuration
- }
- select {
- case <-stopCh:
- util.Logger().Debugf("stop to list and watch %s", c.Cfg)
- return
- case <-time.After(nextPeriod):
- }
- }
- })
-
+ c.goroute.Do(c.refresh)
c.goroute.Do(c.deferHandle)
}
@@ -533,6 +546,15 @@ func (c *KvCacher) Ready() <-chan struct{} {
return c.ready
}
+func (c *KvCacher) IsReady() bool {
+ select {
+ case <-c.ready:
+ return true
+ default:
+ return false
+ }
+}
+
func NewKvCache(c *KvCacher, size int) *KvCache {
return &KvCache{
owner: c,
diff --git a/server/core/backend/store/defer.go b/server/core/backend/store/defer.go
index 7f7bbfa..173a44a 100644
--- a/server/core/backend/store/defer.go
+++ b/server/core/backend/store/defer.go
@@ -30,122 +30,127 @@ type DeferHandler interface {
HandleChan() <-chan *Event
}
+type deferItem struct {
+ ttl *time.Timer
+ event *Event
+}
+
type InstanceEventDeferHandler struct {
Percent float64
- enabled bool
- events map[string]*Event
- ttls map[string]int64
- mux sync.RWMutex
- deferCh chan *Event
-}
-func (iedh *InstanceEventDeferHandler) deferMode(total int, del int) bool {
- return iedh.Percent > 0 && total > 0 &&
- del > 1 &&
- float64(del/total) >= iedh.Percent
+ cache Cache
+ once sync.Once
+ enabled bool
+ items map[string]*deferItem
+ pendingCh chan []*Event
+ deferCh chan *Event
}
-func (iedh *InstanceEventDeferHandler) needDefer(cache Cache, evts []*Event) bool {
- if !iedh.deferMode(cache.Size(), len(evts)) {
+func (iedh *InstanceEventDeferHandler) OnCondition(cache Cache, evts []*Event) bool {
+ if iedh.Percent <= 0 {
return false
}
- for _, evt := range evts {
- if evt.Type != pb.EVT_DELETE {
- return false
- }
- }
- return true
-}
-
-func (iedh *InstanceEventDeferHandler) init() {
- if iedh.deferCh == nil {
+ iedh.once.Do(func() {
+ iedh.cache = cache
+ iedh.items = make(map[string]*deferItem, event_block_size)
+ iedh.pendingCh = make(chan []*Event, event_block_size)
iedh.deferCh = make(chan *Event, event_block_size)
- }
-
- if iedh.events == nil {
- iedh.events = make(map[string]*Event, event_block_size)
- iedh.ttls = make(map[string]int64, event_block_size)
util.Go(iedh.check)
- }
-}
-
-func (iedh *InstanceEventDeferHandler) OnCondition(cache Cache, evts []*Event) bool {
- iedh.mux.Lock()
- if !iedh.enabled && iedh.needDefer(cache, evts) {
- util.Logger().Warnf(nil, "self preservation is enabled, caught %d(>=%.0f%%) DELETE events",
- len(evts), iedh.Percent*100)
- iedh.enabled = true
- }
+ })
- if !iedh.enabled {
- iedh.mux.Unlock()
- return false
- }
-
- iedh.init()
-
- for _, evt := range evts {
- kv, ok := evt.Object.(*mvccpb.KeyValue)
- if !ok {
- continue
- }
- key := util.BytesToStringWithNoCopy(kv.Key)
- switch evt.Type {
- case pb.EVT_CREATE, pb.EVT_UPDATE:
- delete(iedh.events, key)
- delete(iedh.ttls, key)
+ iedh.pendingCh <- evts
+ return true
+}
+func (iedh *InstanceEventDeferHandler) recoverOrDefer(evt *Event) error {
+ kv := evt.Object.(*mvccpb.KeyValue)
+ key := util.BytesToStringWithNoCopy(kv.Key)
+ _, ok := iedh.items[key]
+ switch evt.Type {
+ case pb.EVT_CREATE, pb.EVT_UPDATE:
+ if ok {
util.Logger().Infof("recovered key %s events", key)
+ // return nil // no need to publish event to subscribers?
+ }
+ iedh.recover(evt)
+ case pb.EVT_DELETE:
+ if ok {
+ return nil
+ }
- iedh.deferCh <- evt
- case pb.EVT_DELETE:
- var instance pb.MicroServiceInstance
- err := json.Unmarshal(kv.Value, &instance)
- if err != nil {
- util.Logger().Errorf(err, "unmarshal instance file failed, key is %s", key)
- continue
- }
- iedh.events[key] = evt
- iedh.ttls[key] = int64(instance.HealthCheck.Interval * (instance.HealthCheck.Times + 1))
+ var instance pb.MicroServiceInstance
+ err := json.Unmarshal(kv.Value, &instance)
+ if err != nil {
+ util.Logger().Errorf(err, "unmarshal instance file failed, key is %s", key)
+ return err
+ }
+ iedh.items[key] = &deferItem{
+ ttl: time.NewTimer(
+ time.Duration(instance.HealthCheck.Interval*(instance.HealthCheck.Times+1)) * time.Second),
+ event: evt,
}
}
- iedh.mux.Unlock()
- return true
+ return nil
+}
+
+func (iedh *InstanceEventDeferHandler) HandleChan() <-chan *Event {
+ return iedh.deferCh
}
func (iedh *InstanceEventDeferHandler) check(stopCh <-chan struct{}) {
defer util.RecoverAndReport()
+ t, n := iedh.newTimer(), false
for {
select {
case <-stopCh:
return
- case <-time.After(time.Second):
- iedh.mux.Lock()
- for key, ttl := range iedh.ttls {
- ttl--
- if ttl > 0 {
- iedh.ttls[key] = ttl
- continue
- }
-
- evt := iedh.events[key]
- delete(iedh.events, key)
- delete(iedh.ttls, key)
+ case evts := <-iedh.pendingCh:
+ for _, evt := range evts {
+ iedh.recoverOrDefer(evt)
+ }
- util.Logger().Warnf(nil, "defer handle timed out, removed key is %s", key)
+ del := len(iedh.items)
+ if del > 0 && !n {
+ t.Stop()
+ t, n = iedh.newTimer(), true
+ }
- iedh.deferCh <- evt
+ total := iedh.cache.Size()
+ if del > 0 && total > 0 && float64(del) >= float64(total)*iedh.Percent {
+ iedh.enabled = true
+ util.Logger().Warnf(nil, "self preservation is enabled, caught %d/%d(>=%.0f%%) DELETE events",
+ del, total, iedh.Percent*100)
+ }
+ case <-t.C:
+ t, n = iedh.newTimer(), false
+
+ for key, item := range iedh.items {
+ if iedh.enabled {
+ select {
+ case <-item.ttl.C:
+ default:
+ continue
+ }
+ util.Logger().Warnf(nil, "defer handle timed out, removed key is %s", key)
+ }
+ iedh.recover(item.event)
}
- if iedh.enabled && len(iedh.ttls) == 0 {
+
+ if iedh.enabled && len(iedh.items) == 0 {
iedh.enabled = false
util.Logger().Warnf(nil, "self preservation is stopped")
}
- iedh.mux.Unlock()
}
}
}
-func (iedh *InstanceEventDeferHandler) HandleChan() <-chan *Event {
- return iedh.deferCh
+func (iedh *InstanceEventDeferHandler) newTimer() *time.Timer {
+ return time.NewTimer(2 * time.Second) // instance DELETE event will be delay.
+}
+
+func (iedh *InstanceEventDeferHandler) recover(evt *Event) {
+ key := util.BytesToStringWithNoCopy(evt.Object.(*mvccpb.KeyValue).Key)
+ delete(iedh.items, key)
+ iedh.deferCh <- evt
}
diff --git a/server/core/backend/store/defer_test.go b/server/core/backend/store/defer_test.go
new file mode 100644
index 0000000..2c9c453
--- /dev/null
+++ b/server/core/backend/store/defer_test.go
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package store
+
+import (
+ "encoding/json"
+ "fmt"
+ "github.com/apache/incubator-servicecomb-service-center/pkg/util"
+ pb "github.com/apache/incubator-servicecomb-service-center/server/core/proto"
+ "github.com/coreos/etcd/mvcc/mvccpb"
+ "testing"
+ "time"
+)
+
+func TestInstanceEventDeferHandler_OnCondition(t *testing.T) {
+ iedh := &InstanceEventDeferHandler{
+ Percent: 0,
+ }
+
+ if iedh.OnCondition(nil, nil) {
+ fmt.Printf(`TestInstanceEventDeferHandler_OnCondition with 0%% failed`)
+ t.FailNow()
+ }
+
+ iedh.Percent = 0.01
+ if !iedh.OnCondition(nil, nil) {
+ fmt.Printf(`TestInstanceEventDeferHandler_OnCondition with 1%% failed`)
+ t.FailNow()
+ }
+}
+
+func TestInstanceEventDeferHandler_HandleChan(t *testing.T) {
+ inst := &pb.MicroServiceInstance{
+ HealthCheck: &pb.HealthCheck{
+ Interval: 4,
+ Times: 0,
+ },
+ }
+ b, _ := json.Marshal(inst)
+ kv1 := &mvccpb.KeyValue{
+ Key: util.StringToBytesWithNoCopy("/1"),
+ Value: b,
+ }
+ kv2 := &mvccpb.KeyValue{
+ Key: util.StringToBytesWithNoCopy("/2"),
+ Value: b,
+ }
+ kv3 := &mvccpb.KeyValue{
+ Key: util.StringToBytesWithNoCopy("/3"),
+ Value: b,
+ }
+
+ cache := NewKvCache(nil, 1)
+ cache.store["/1"] = kv1
+ cache.store["/2"] = kv2
+ cache.store["/3"] = kv3
+
+ evts1 := []*Event{
+ {
+ Type: pb.EVT_CREATE,
+ Object: kv1,
+ },
+ {
+ Type: pb.EVT_UPDATE,
+ Object: kv1,
+ },
+ }
+ evts2 := []*Event{
+ {
+ Type: pb.EVT_DELETE,
+ Object: kv2,
+ },
+ {
+ Type: pb.EVT_DELETE,
+ Object: kv3,
+ },
+ }
+ evts3 := []*Event{
+ {
+ Type: pb.EVT_CREATE,
+ Object: kv2,
+ },
+ }
+
+ iedh := &InstanceEventDeferHandler{
+ Percent: 0.01,
+ }
+
+ iedh.OnCondition(cache, evts1)
+ iedh.OnCondition(cache, evts2)
+ iedh.OnCondition(cache, evts3)
+
+ getEvents(t, iedh)
+
+ iedh.Percent = 0.8
+ iedh.OnCondition(cache, evts1)
+ iedh.OnCondition(cache, evts2)
+ iedh.OnCondition(cache, evts3)
+
+ getEvents(t, iedh)
+}
+
+func getEvents(t *testing.T, iedh *InstanceEventDeferHandler) {
+ fmt.Println(time.Now())
+ c := time.After(3 * time.Second)
+ var evt3 *Event
+ for {
+ select {
+ case evt := <-iedh.HandleChan():
+ fmt.Println(time.Now(), evt)
+ if string(evt.Object.(*mvccpb.KeyValue).Key) == "/3" {
+ evt3 = evt
+ if iedh.Percent == 0.01 && evt.Type == pb.EVT_DELETE {
+ fmt.Printf(`TestInstanceEventDeferHandler_HandleChan with 1%% failed`)
+ t.FailNow()
+ }
+ }
+ continue
+ case <-c:
+ if iedh.Percent == 0.8 && evt3 == nil {
+ fmt.Printf(`TestInstanceEventDeferHandler_HandleChan with 80%% failed`)
+ t.FailNow()
+ }
+ }
+ break
+ }
+}
diff --git a/server/core/backend/store/indexer.go b/server/core/backend/store/indexer.go
index 528922d..7236050 100644
--- a/server/core/backend/store/indexer.go
+++ b/server/core/backend/store/indexer.go
@@ -264,11 +264,6 @@ func (i *Indexer) deletePrefixKey(prefix, key string) {
if !ok {
return
}
- // remove child
- for k := range i.prefixIndex[key] {
- i.deletePrefixKey(key, k)
- }
-
delete(m, key)
// remove parent which has no child
diff --git a/server/core/backend/store/listwatch.go b/server/core/backend/store/listwatch.go
index 9e210bc..189f909 100644
--- a/server/core/backend/store/listwatch.go
+++ b/server/core/backend/store/listwatch.go
@@ -32,7 +32,7 @@ const EVENT_BUS_MAX_SIZE = 1000
type Event struct {
Revision int64
Type proto.EventType
- Key string
+ Prefix string
Object interface{}
}
@@ -93,7 +93,7 @@ func (lw *ListWatcher) doWatch(ctx context.Context, f func(evt []*Event)) error
evts := make([]*Event, len(resp.Kvs))
for i, kv := range resp.Kvs {
- evt := &Event{Key: lw.Key, Revision: kv.ModRevision}
+ evt := &Event{Prefix: lw.Key, Revision: kv.ModRevision}
switch {
case resp.Action == registry.Put && kv.Version == 1:
evt.Type, evt.Object = proto.EVT_CREATE, kv
@@ -148,9 +148,9 @@ func (w *Watcher) process() {
}
}
-func (w *Watcher) sendEvent(evt []*Event) {
+func (w *Watcher) sendEvent(evts []*Event) {
defer util.RecoverAndReport()
- w.bus <- evt
+ w.bus <- evts
}
func (w *Watcher) Stop() {
@@ -168,7 +168,7 @@ func (w *Watcher) Stop() {
func errEvent(key string, err error) *Event {
return &Event{
Type: proto.EVT_ERROR,
- Key: key,
+ Prefix: key,
Object: err,
}
}
diff --git a/server/core/backend/store/opt.go b/server/core/backend/store/opt.go
index 252f244..86142b1 100644
--- a/server/core/backend/store/opt.go
+++ b/server/core/backend/store/opt.go
@@ -34,7 +34,7 @@ type KvCacherCfg struct {
Timeout time.Duration
Period time.Duration
OnEvent KvEventFunc
- DeferHander DeferHandler
+ DeferHandler DeferHandler
}
func (cfg KvCacherCfg) String() string {
@@ -65,7 +65,7 @@ func WithEventFunc(f KvEventFunc) KvCacherCfgOption {
}
func WithDeferHandler(h DeferHandler) KvCacherCfgOption {
- return func(cfg *KvCacherCfg) { cfg.DeferHander = h }
+ return func(cfg *KvCacherCfg) { cfg.DeferHandler = h }
}
func DefaultKvCacherConfig() KvCacherCfg {
diff --git a/server/core/backend/store/store.go b/server/core/backend/store/store.go
index d2f89e7..6a5f38c 100644
--- a/server/core/backend/store/store.go
+++ b/server/core/backend/store/store.go
@@ -124,13 +124,6 @@ func (s *KvStore) Initialize() {
func (s *KvStore) dispatchEvent(t StoreType, evt *KvEvent) {
s.indexers[t].OnCacheEvent(evt)
- select {
- case <-s.Ready():
- default:
- if evt.Action == pb.EVT_CREATE {
- evt.Action = pb.EVT_INIT
- }
- }
EventProxy(t).OnEvent(evt)
}
diff --git a/server/infra/quota/quota.go b/server/infra/quota/quota.go
index 84dc579..acff48e 100644
--- a/server/infra/quota/quota.go
+++ b/server/infra/quota/quota.go
@@ -18,8 +18,8 @@ package quota
import (
"fmt"
- "golang.org/x/net/context"
scerr "github.com/apache/incubator-servicecomb-service-center/server/error"
+ "golang.org/x/net/context"
)
type ApplyQuotaResult struct {
diff --git a/server/plugin/infra/registry/etcd/etcd.go b/server/plugin/infra/registry/etcd/etcd.go
index d848eb9..1e57ee1 100644
--- a/server/plugin/infra/registry/etcd/etcd.go
+++ b/server/plugin/infra/registry/etcd/etcd.go
@@ -501,49 +501,55 @@ func (c *EtcdClient) Watch(ctx context.Context, opts ...registry.PluginOpOption)
return
case resp, ok = <-ws:
if !ok {
- err := errors.New("channel is closed")
- return err
+ err = errors.New("channel is closed")
+ return
}
// cause a rpc ResourceExhausted error if watch response body larger then 4MB
if err = resp.Err(); err != nil {
- return err
+ return
}
- l := len(resp.Events)
- kvs := make([]*mvccpb.KeyValue, l)
- pIdx, prevAction := 0, mvccpb.PUT
- pResp := &registry.PluginResponse{Action: registry.Put, Succeeded: true}
-
- for _, evt := range resp.Events {
- if prevAction != evt.Type {
- prevAction = evt.Type
-
- if pIdx > 0 {
- err = setResponseAndCallback(pResp, kvs[:pIdx], op.WatchCallback)
- if err != nil {
- return
- }
- pIdx = 0
- }
- }
+ err = dispatch(resp.Events, op.WatchCallback)
+ if err != nil {
+ return
+ }
+ }
+ }
+ }
+ return fmt.Errorf("no key has been watched")
+}
- pResp.Revision = evt.Kv.ModRevision
- pResp.Action = setKvsAndConvertAction(kvs, pIdx, evt)
+func dispatch(evts []*clientv3.Event, cb registry.WatchCallback) error {
+ l := len(evts)
+ kvs := make([]*mvccpb.KeyValue, l)
+ sIdx, eIdx, prevAction := 0, 0, mvccpb.PUT
+ pResp := &registry.PluginResponse{Action: registry.Put, Succeeded: true}
- pIdx++
- }
+ for _, evt := range evts {
+ if prevAction != evt.Type {
+ prevAction = evt.Type
- if pIdx > 0 {
- err = setResponseAndCallback(pResp, kvs[:pIdx], op.WatchCallback)
- if err != nil {
- return
- }
+ if eIdx > 0 {
+ err := setResponseAndCallback(pResp, kvs[sIdx:eIdx], cb)
+ if err != nil {
+ return err
}
+ sIdx = eIdx
}
}
+
+ if pResp.Revision < evt.Kv.ModRevision {
+ pResp.Revision = evt.Kv.ModRevision
+ }
+ pResp.Action = setKvsAndConvertAction(kvs, eIdx, evt)
+
+ eIdx++
}
- err = fmt.Errorf("no key has been watched")
- return
+
+ if eIdx > 0 {
+ return setResponseAndCallback(pResp, kvs[sIdx:eIdx], cb)
+ }
+ return nil
}
func setKvsAndConvertAction(kvs []*mvccpb.KeyValue, pIdx int, evt *clientv3.Event) registry.ActionType {
@@ -564,12 +570,7 @@ func setKvsAndConvertAction(kvs []*mvccpb.KeyValue, pIdx int, evt *clientv3.Even
func setResponseAndCallback(pResp *registry.PluginResponse, kvs []*mvccpb.KeyValue, cb registry.WatchCallback) error {
pResp.Count = int64(len(kvs))
pResp.Kvs = kvs
-
- err := cb("key information changed", pResp)
- if err != nil {
- return err
- }
- return nil
+ return cb("key information changed", pResp)
}
func NewRegistry() mgr.PluginInstance {
diff --git a/server/rest/controller/v4/microservice_controller.go b/server/rest/controller/v4/microservice_controller.go
index b0b177f..795c72c 100644
--- a/server/rest/controller/v4/microservice_controller.go
+++ b/server/rest/controller/v4/microservice_controller.go
@@ -102,7 +102,6 @@ func (this *MicroServiceService) Unregister(w http.ResponseWriter, r *http.Reque
func (this *MicroServiceService) GetServices(w http.ResponseWriter, r *http.Request) {
request := &pb.GetServicesRequest{}
- util.Logger().Debugf("domain is %s", util.ParseDomain(r.Context()))
resp, _ := core.ServiceAPI.GetServices(r.Context(), request)
respInternal := resp.Response
resp.Response = nil
diff --git a/server/service/event/dependency_event_handler.go b/server/service/event/dependency_event_handler.go
index 99808e1..d1bd311 100644
--- a/server/service/event/dependency_event_handler.go
+++ b/server/service/event/dependency_event_handler.go
@@ -91,9 +91,9 @@ func (h *DependencyEventHandler) loop() {
}
type DependencyEventHandlerResource struct {
- dep *pb.ConsumerDependency
- kv *mvccpb.KeyValue
- domainProject string
+ dep *pb.ConsumerDependency
+ kv *mvccpb.KeyValue
+ domainProject string
}
func NewDependencyEventHandlerResource(dep *pb.ConsumerDependency, kv *mvccpb.KeyValue, domainProject string) *DependencyEventHandlerResource {
@@ -150,7 +150,7 @@ func (h *DependencyEventHandler) Handle() error {
dependencyRuleHandleResults := make(chan error, len(resourcesMap))
for lockKey, resources := range resourcesMap {
- go func(lockKey string, resources []*DependencyEventHandlerResource){
+ go func(lockKey string, resources []*DependencyEventHandlerResource) {
err := h.dependencyRuleHandle(ctx, lockKey, resources)
dependencyRuleHandleResults <- err
}(lockKey, resources)
@@ -169,7 +169,7 @@ func (h *DependencyEventHandler) Handle() error {
return lastErr
}
-func (h *DependencyEventHandler)dependencyRuleHandle(ctx context.Context, lockKey string, resources []*DependencyEventHandlerResource) error{
+func (h *DependencyEventHandler) dependencyRuleHandle(ctx context.Context, lockKey string, resources []*DependencyEventHandlerResource) error {
lock, err := serviceUtil.DependencyLock(lockKey)
if err != nil {
util.Logger().Errorf(err, "create dependency rule locker failed")
diff --git a/server/service/instances.go b/server/service/instances.go
index 1f066fb..2e12fcf 100644
--- a/server/service/instances.go
+++ b/server/service/instances.go
@@ -589,9 +589,13 @@ func (s *InstanceService) Find(ctx context.Context, in *pb.FindInstancesRequest)
}, nil
}
- instances := make([]*pb.MicroServiceInstance, 0)
+ var instances []*pb.MicroServiceInstance
+ cloneCtx := ctx
+ if s, ok := ctx.Value("noCache").(string); !ok || s != "1" {
+ cloneCtx = util.SetContext(util.CloneContext(ctx), "cacheOnly", "1")
+ }
for _, serviceId := range ids {
- resp, err := s.GetInstances(ctx, &pb.GetInstancesRequest{
+ resp, err := s.GetInstances(cloneCtx, &pb.GetInstancesRequest{
ConsumerServiceId: in.ConsumerServiceId,
ProviderServiceId: serviceId,
Tags: in.Tags,
diff --git a/server/service/instances_test.go b/server/service/instances_test.go
index 355a615..dfba6d3 100644
--- a/server/service/instances_test.go
+++ b/server/service/instances_test.go
@@ -909,7 +909,7 @@ var _ = Describe("'Instance' service", func() {
respFind, err = instanceResource.Find(
util.SetTargetDomainProject(
- util.SetDomainProject(context.Background(), "user", "user"),
+ util.SetDomainProject(util.CloneContext(getContext()), "user", "user"),
"default", "default"),
&pb.FindInstancesRequest{
ConsumerServiceId: serviceId6,
diff --git a/server/service/microservices.go b/server/service/microservices.go
index 319c108..dcaa4be 100644
--- a/server/service/microservices.go
+++ b/server/service/microservices.go
@@ -18,6 +18,7 @@ package service
import (
"encoding/json"
+ "errors"
"fmt"
"github.com/apache/incubator-servicecomb-service-center/pkg/util"
"github.com/apache/incubator-servicecomb-service-center/server/core"
@@ -33,7 +34,6 @@ import (
"golang.org/x/net/context"
"strconv"
"time"
- "errors"
)
type MicroServiceService struct {
@@ -130,6 +130,7 @@ func (s *MicroServiceService) CreateServicePri(ctx context.Context, in *pb.Creat
index := apt.GenerateServiceIndexKey(serviceKey)
indexBytes := util.StringToBytesWithNoCopy(index)
aliasBytes := util.StringToBytesWithNoCopy(apt.GenerateServiceAliasKey(serviceKey))
+
opts := []registry.PluginOp{
registry.OpPut(registry.WithStrKey(key), registry.WithValue(data)),
registry.OpPut(registry.WithKey(indexBytes), registry.WithStrValue(serviceId)),
diff --git a/server/service/rule.go b/server/service/rule.go
index 287dbc2..4060907 100644
--- a/server/service/rule.go
+++ b/server/service/rule.go
@@ -64,7 +64,6 @@ func (s *MicroServiceService) AddRule(ctx context.Context, in *pb.AddServiceRule
return response, nil
}
-
ruleType, _, err := serviceUtil.GetServiceRuleType(ctx, domainProject, in.ServiceId)
util.Logger().Debugf("ruleType is %s", ruleType)
if err != nil {
diff --git a/server/service/util/dependency.go b/server/service/util/dependency.go
index 5bfb978..68c6bf7 100644
--- a/server/service/util/dependency.go
+++ b/server/service/util/dependency.go
@@ -21,6 +21,7 @@ import (
"errors"
"fmt"
"github.com/apache/incubator-servicecomb-service-center/pkg/cache"
+ "github.com/apache/incubator-servicecomb-service-center/pkg/etcdsync"
"github.com/apache/incubator-servicecomb-service-center/pkg/util"
apt "github.com/apache/incubator-servicecomb-service-center/server/core"
"github.com/apache/incubator-servicecomb-service-center/server/core/backend"
@@ -28,11 +29,10 @@ import (
pb "github.com/apache/incubator-servicecomb-service-center/server/core/proto"
scerr "github.com/apache/incubator-servicecomb-service-center/server/error"
"github.com/apache/incubator-servicecomb-service-center/server/infra/registry"
+ "github.com/apache/incubator-servicecomb-service-center/server/mux"
"golang.org/x/net/context"
"strings"
"time"
- "github.com/apache/incubator-servicecomb-service-center/pkg/etcdsync"
- "github.com/apache/incubator-servicecomb-service-center/server/mux"
)
var consumerCache *cache.Cache
@@ -1059,5 +1059,5 @@ func DependencyLock(lockKey string) (*etcdsync.DLock, error) {
}
func NewDependencyLockKey(domainProject, env string) string {
- return util.StringJoin([]string{"","env-lock", domainProject, env}, "/")
-}
\ No newline at end of file
+ return util.StringJoin([]string{"", "env-lock", domainProject, env}, "/")
+}
--
To stop receiving notification emails like this one, please contact
asifdxtreme@apache.org.