You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicecomb.apache.org by as...@apache.org on 2018/01/29 04:39:21 UTC

[incubator-servicecomb-service-center] branch master updated: SCB-296 Self preservation will never stop when SC do list-watch loop (#261)

This is an automated email from the ASF dual-hosted git repository.

asifdxtreme pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-servicecomb-service-center.git


The following commit(s) were added to refs/heads/master by this push:
     new a4b6ce4  SCB-296 Self preservation will never stop when SC do list-watch loop (#261)
a4b6ce4 is described below

commit a4b6ce4d60309361940b9039b9a16ce641af183a
Author: little-cui <su...@qq.com>
AuthorDate: Mon Jan 29 12:39:18 2018 +0800

    SCB-296 Self preservation will never stop when SC do list-watch loop (#261)
    
    * SCB-296 Self preservation will never stop when SC do list-watch loop
---
 etc/conf/app.conf                                  |   5 +-
 integration/instances_test.go                      |   2 +-
 pkg/chain/chain.go                                 |   2 +-
 pkg/httplimiter/httpratelimiter.go                 |  13 +-
 pkg/logrotate/logrotate.go                         |   4 +-
 pkg/ratelimiter/ratelimiter.go                     |   5 +-
 pkg/rest/client.go                                 |   2 +-
 pkg/rest/route.go                                  |  16 +-
 server/core/backend/store/cacher.go                | 104 ++++++++-----
 server/core/backend/store/defer.go                 | 173 +++++++++++----------
 server/core/backend/store/defer_test.go            | 141 +++++++++++++++++
 server/core/backend/store/indexer.go               |   5 -
 server/core/backend/store/listwatch.go             |  10 +-
 server/core/backend/store/opt.go                   |   4 +-
 server/core/backend/store/store.go                 |   7 -
 server/infra/quota/quota.go                        |   2 +-
 server/plugin/infra/registry/etcd/etcd.go          |  75 ++++-----
 .../rest/controller/v4/microservice_controller.go  |   1 -
 server/service/event/dependency_event_handler.go   |  10 +-
 server/service/instances.go                        |   8 +-
 server/service/instances_test.go                   |   2 +-
 server/service/microservices.go                    |   3 +-
 server/service/rule.go                             |   1 -
 server/service/util/dependency.go                  |   8 +-
 24 files changed, 376 insertions(+), 227 deletions(-)

diff --git a/etc/conf/app.conf b/etc/conf/app.conf
index 9d54f2d..a48f08c 100644
--- a/etc/conf/app.conf
+++ b/etc/conf/app.conf
@@ -26,11 +26,12 @@ plugins_dir = ./plugins
 registry_plugin = etcd
 
 # registry address
-# registry_plugin equals to 'embeded_etcd', example:
+# 1. if registry_plugin equals to 'embeded_etcd'
 # manager_name = "sc-0"
 # manager_addr = "http://127.0.0.1:2380"
 # manager_cluster = "sc-0=http://127.0.0.1:2380"
-# registry_plugin equals to 'etcd'
+# 2. if registry_plugin equals to 'etcd'
+# manager_cluster = "127.0.0.1:2379"
 manager_cluster = "127.0.0.1:2379"
 
 #heartbeat that sync synchronizes client's endpoints with the known endpoints from the etcd membership,unit is second.
diff --git a/integration/instances_test.go b/integration/instances_test.go
index 8ab0a78..948c9f6 100644
--- a/integration/instances_test.go
+++ b/integration/instances_test.go
@@ -248,7 +248,7 @@ var _ = Describe("MicroService Api Test", func() {
 
 		By("Discover MicroService Instance API", func() {
 			It("Find Micro-service Info by AppID", func() {
-				req, _ := http.NewRequest(GET, SCURL+FINDINSTANCE+"?appId="+serviceAppId+"&serviceName="+serviceName+"&version="+serviceVersion, nil)
+				req, _ := http.NewRequest(GET, SCURL+FINDINSTANCE+"?noCache=1&appId="+serviceAppId+"&serviceName="+serviceName+"&version="+serviceVersion, nil)
 				req.Header.Set("X-Domain-Name", "default")
 				req.Header.Set("X-ConsumerId", serviceId)
 				resp, _ := scclient.Do(req)
diff --git a/pkg/chain/chain.go b/pkg/chain/chain.go
index 8517824..a48fc1c 100644
--- a/pkg/chain/chain.go
+++ b/pkg/chain/chain.go
@@ -60,7 +60,7 @@ func (c *Chain) syncNext(i *Invocation) {
 }
 
 func (c *Chain) Next(i *Invocation) {
-	go c.syncNext(i)
+	c.syncNext(i)
 }
 
 func NewChain(name string, handlers []Handler) Chain {
diff --git a/pkg/httplimiter/httpratelimiter.go b/pkg/httplimiter/httpratelimiter.go
index 9128d5b..3110056 100644
--- a/pkg/httplimiter/httpratelimiter.go
+++ b/pkg/httplimiter/httpratelimiter.go
@@ -18,13 +18,13 @@
 package httplimiter
 
 import (
+	"fmt"
+	"github.com/apache/incubator-servicecomb-service-center/pkg/ratelimiter"
 	"net/http"
 	"strconv"
 	"strings"
-	"time"
-	"github.com/apache/incubator-servicecomb-service-center/pkg/ratelimiter"
-	"fmt"
 	"sync"
+	"time"
 )
 
 type HTTPErrorMessage struct {
@@ -36,7 +36,6 @@ func (httpErrorMessage *HTTPErrorMessage) Error() string {
 	return fmt.Sprintf("%v: %v", httpErrorMessage.StatusCode, httpErrorMessage.Message)
 }
 
-
 type HttpLimiter struct {
 	HttpMessage    string
 	ContentType    string
@@ -51,8 +50,6 @@ type HttpLimiter struct {
 	sync.RWMutex
 }
 
-
-
 func LimitBySegments(limiter *HttpLimiter, keys []string) *HTTPErrorMessage {
 	if limiter.LimitExceeded(strings.Join(keys, "|")) {
 		return &HTTPErrorMessage{Message: limiter.HttpMessage, StatusCode: limiter.StatusCode}
@@ -199,7 +196,6 @@ func getRemoteIP(ipLookups []string, r *http.Request) string {
 	return ""
 }
 
-
 func NewHttpLimiter(max int64, ttl time.Duration) *HttpLimiter {
 	limiter := &HttpLimiter{RequestLimit: max, TTL: ttl}
 	limiter.ContentType = "text/plain; charset=utf-8"
@@ -211,7 +207,6 @@ func NewHttpLimiter(max int64, ttl time.Duration) *HttpLimiter {
 	return limiter
 }
 
-
 func (rateLimiter *HttpLimiter) LimitExceeded(key string) bool {
 	rateLimiter.Lock()
 	if _, found := rateLimiter.leakyBuckets[key]; !found {
@@ -224,5 +219,3 @@ func (rateLimiter *HttpLimiter) LimitExceeded(key string) bool {
 	}
 	return true
 }
-
-
diff --git a/pkg/logrotate/logrotate.go b/pkg/logrotate/logrotate.go
index c2e9e93..76de98d 100644
--- a/pkg/logrotate/logrotate.go
+++ b/pkg/logrotate/logrotate.go
@@ -240,9 +240,9 @@ func LogRotate(path string, MaxFileSize int, MaxBackupCount int) {
 	}
 }
 
-func isSkip(f os.FileInfo) bool{
+func isSkip(f os.FileInfo) bool {
 	//dir or non write permission,skip
-	return f.IsDir() || (f.Mode() & 0200 == 0000)
+	return f.IsDir() || (f.Mode()&0200 == 0000)
 }
 
 //path : where the file will be filtered
diff --git a/pkg/ratelimiter/ratelimiter.go b/pkg/ratelimiter/ratelimiter.go
index 8184108..a46bc39 100644
--- a/pkg/ratelimiter/ratelimiter.go
+++ b/pkg/ratelimiter/ratelimiter.go
@@ -18,8 +18,8 @@
 package ratelimiter
 
 import (
-	"time"
 	"sync"
+	"time"
 )
 
 type LeakyBucket struct {
@@ -32,7 +32,6 @@ type LeakyBucket struct {
 	availableTicker int64
 }
 
-
 func NewLeakyBucket(fillInterval time.Duration, capacity, quantum int64) *LeakyBucket {
 	if fillInterval <= 0 {
 		panic("leaky bucket fill interval is not > 0")
@@ -77,7 +76,6 @@ func (leakyBucket *LeakyBucket) MaximumTakeDuration(count int64, maxWait time.Du
 	return leakyBucket.take(time.Now(), count, maxWait)
 }
 
-
 func (leakyBucket *LeakyBucket) Rate() float64 {
 	return 1e9 * float64(leakyBucket.quantum) / float64(leakyBucket.interval)
 }
@@ -118,4 +116,3 @@ func (leakyBucket *LeakyBucket) adjust(now time.Time) (currentTick int64) {
 	leakyBucket.availableTicker = currentTick
 	return
 }
-
diff --git a/pkg/rest/client.go b/pkg/rest/client.go
index 3ce4e29..2cd68ab 100644
--- a/pkg/rest/client.go
+++ b/pkg/rest/client.go
@@ -28,9 +28,9 @@ import (
 	"io/ioutil"
 	"net"
 	"net/http"
+	"net/url"
 	"reflect"
 	"time"
-	"net/url"
 )
 
 const (
diff --git a/pkg/rest/route.go b/pkg/rest/route.go
index 6595021..3015aff 100644
--- a/pkg/rest/route.go
+++ b/pkg/rest/route.go
@@ -23,6 +23,7 @@ import (
 	errorsEx "github.com/apache/incubator-servicecomb-service-center/pkg/errors"
 	"github.com/apache/incubator-servicecomb-service-center/pkg/util"
 	"net/http"
+	"net/url"
 	"strings"
 )
 
@@ -77,7 +78,7 @@ func (this *ROAServerHandler) ServeHTTP(w http.ResponseWriter, r *http.Request)
 	for _, ph := range this.handlers[r.Method] {
 		if params, ok := ph.try(r.URL.Path); ok {
 			if len(params) > 0 {
-				r.URL.RawQuery = util.UrlEncode(params) + "&" + r.URL.RawQuery
+				r.URL.RawQuery = params + r.URL.RawQuery
 			}
 
 			this.serve(ph, w, r)
@@ -151,7 +152,7 @@ func (this *ROAServerHandler) serve(ph *urlPatternHandler, w http.ResponseWriter
 	<-ch
 }
 
-func (this *urlPatternHandler) try(path string) (p map[string]string, _ bool) {
+func (this *urlPatternHandler) try(path string) (p string, _ bool) {
 	var i, j int
 	l, sl := len(this.Path), len(path)
 	for i < sl {
@@ -160,7 +161,7 @@ func (this *urlPatternHandler) try(path string) (p map[string]string, _ bool) {
 			if this.Path != "/" && l > 0 && this.Path[l-1] == '/' {
 				return p, true
 			}
-			return nil, false
+			return "", false
 		case this.Path[j] == ':':
 			var val string
 			var nextc byte
@@ -168,19 +169,16 @@ func (this *urlPatternHandler) try(path string) (p map[string]string, _ bool) {
 			_, nextc, j = match(this.Path, isAlnum, 0, j+1)
 			val, _, i = match(path, matchParticial, nextc, i)
 
-			if p == nil {
-				p = make(map[string]string, 5)
-			}
-			p[this.Path[o:j]] = val
+			p += url.QueryEscape(this.Path[o:j]) + "=" + url.QueryEscape(val) + "&"
 		case path[i] == this.Path[j]:
 			i++
 			j++
 		default:
-			return nil, false
+			return "", false
 		}
 	}
 	if j != l {
-		return nil, false
+		return "", false
 	}
 	return p, true
 }
diff --git a/server/core/backend/store/cacher.go b/server/core/backend/store/cacher.go
index 7ca82cf..c6ade97 100644
--- a/server/core/backend/store/cacher.go
+++ b/server/core/backend/store/cacher.go
@@ -120,6 +120,15 @@ func (c *KvCache) Have(k interface{}) (ok bool) {
 	return
 }
 
+func (c *KvCache) RLock() map[string]*mvccpb.KeyValue {
+	c.rwMux.RLock()
+	return c.store
+}
+
+func (c *KvCache) RUnlock() {
+	c.rwMux.RUnlock()
+}
+
 func (c *KvCache) Lock() map[string]*mvccpb.KeyValue {
 	c.rwMux.Lock()
 	return c.store
@@ -251,15 +260,39 @@ func (c *KvCacher) handleWatcher(watcher *Watcher) error {
 }
 
 func (c *KvCacher) needDeferHandle(evts []*Event) bool {
-	if c.Cfg.DeferHander == nil {
+	if c.Cfg.DeferHandler == nil {
 		return false
 	}
 
-	return c.Cfg.DeferHander.OnCondition(c.Cache(), evts)
+	return c.Cfg.DeferHandler.OnCondition(c.Cache(), evts)
+}
+
+func (c *KvCacher) refresh(stopCh <-chan struct{}) {
+	util.Logger().Debugf("start to list and watch %s", c.Cfg)
+	ctx, cancel := context.WithCancel(context.Background())
+	c.goroute.Do(func(stopCh <-chan struct{}) {
+		defer cancel()
+		<-stopCh
+	})
+	for {
+		start := time.Now()
+		c.ListAndWatch(ctx)
+		watchDuration := time.Since(start)
+		nextPeriod := 0 * time.Second
+		if watchDuration > 0 && c.Cfg.Period > watchDuration {
+			nextPeriod = c.Cfg.Period - watchDuration
+		}
+		select {
+		case <-stopCh:
+			util.Logger().Debugf("stop to list and watch %s", c.Cfg)
+			return
+		case <-time.After(nextPeriod):
+		}
+	}
 }
 
 func (c *KvCacher) deferHandle(stopCh <-chan struct{}) {
-	if c.Cfg.DeferHander == nil {
+	if c.Cfg.DeferHandler == nil {
 		return
 	}
 
@@ -268,7 +301,7 @@ func (c *KvCacher) deferHandle(stopCh <-chan struct{}) {
 		select {
 		case <-stopCh:
 			return
-		case evt, ok := <-c.Cfg.DeferHander.HandleChan():
+		case evt, ok := <-c.Cfg.DeferHandler.HandleChan():
 			if !ok {
 				<-time.After(time.Second)
 				continue
@@ -281,7 +314,7 @@ func (c *KvCacher) deferHandle(stopCh <-chan struct{}) {
 
 			evts[i] = evt
 			i++
-		case <-time.After(time.Second):
+		case <-time.After(300 * time.Millisecond):
 			if i == 0 {
 				continue
 			}
@@ -305,9 +338,9 @@ func (c *KvCacher) sync(evts []*Event) {
 }
 
 func (c *KvCacher) filter(rev int64, items []*mvccpb.KeyValue) []*Event {
-	cache := c.Cache().(*KvCache)
-	store := cache.Lock()
-	defer cache.Unlock()
+	store := c.cache.RLock()
+	defer c.cache.RUnlock()
+
 	oc, nc := len(store), len(items)
 	tc := oc + nc
 	if tc == 0 {
@@ -359,7 +392,7 @@ func (c *KvCacher) filterDelete(store map[string]*mvccpb.KeyValue, newStore map[
 		block[i] = &Event{
 			Revision: rev,
 			Type:     proto.EVT_DELETE,
-			Key:      c.Cfg.Key,
+			Prefix:   c.Cfg.Key,
 			Object:   v,
 		}
 		i++
@@ -387,7 +420,7 @@ func (c *KvCacher) filterCreateOrUpdate(store map[string]*mvccpb.KeyValue, newSt
 			block[i] = &Event{
 				Revision: rev,
 				Type:     proto.EVT_CREATE,
-				Key:      c.Cfg.Key,
+				Prefix:   c.Cfg.Key,
 				Object:   v,
 			}
 			i++
@@ -407,7 +440,7 @@ func (c *KvCacher) filterCreateOrUpdate(store map[string]*mvccpb.KeyValue, newSt
 		block[i] = &Event{
 			Revision: rev,
 			Type:     proto.EVT_UPDATE,
-			Key:      c.Cfg.Key,
+			Prefix:   c.Cfg.Key,
 			Object:   v,
 		}
 		i++
@@ -424,10 +457,9 @@ func (c *KvCacher) filterCreateOrUpdate(store map[string]*mvccpb.KeyValue, newSt
 }
 
 func (c *KvCacher) onEvents(evts []*Event) {
-	cache := c.Cache().(*KvCache)
-	idx := 0
+	idx, init := 0, !c.IsReady()
 	kvEvts := make([]*KvEvent, len(evts))
-	store := cache.Lock()
+	store := c.cache.Lock()
 	for _, evt := range evts {
 		kv := evt.Object.(*mvccpb.KeyValue)
 		key := util.BytesToStringWithNoCopy(kv.Key)
@@ -456,7 +488,6 @@ func (c *KvCacher) onEvents(evts []*Event) {
 				Action:   t,
 				KV:       kv,
 			}
-			idx++
 		case proto.EVT_DELETE:
 			if !ok {
 				util.Logger().Warnf(nil, "unexpected %s event! key %s does not exist", evt.Type, key)
@@ -470,10 +501,15 @@ func (c *KvCacher) onEvents(evts []*Event) {
 				Action:   evt.Type,
 				KV:       prevKv,
 			}
-			idx++
 		}
+
+		if init && kvEvts[idx].Action == proto.EVT_CREATE {
+			kvEvts[idx].Action = proto.EVT_INIT
+		}
+
+		idx++
 	}
-	cache.Unlock()
+	c.cache.Unlock()
 
 	c.onKvEvents(kvEvts[:idx])
 }
@@ -488,30 +524,7 @@ func (c *KvCacher) onKvEvents(evts []*KvEvent) {
 }
 
 func (c *KvCacher) run() {
-	c.goroute.Do(func(stopCh <-chan struct{}) {
-		util.Logger().Debugf("start to list and watch %s", c.Cfg)
-		ctx, cancel := context.WithCancel(context.Background())
-		c.goroute.Do(func(stopCh <-chan struct{}) {
-			defer cancel()
-			<-stopCh
-		})
-		for {
-			start := time.Now()
-			c.ListAndWatch(ctx)
-			watchDuration := time.Now().Sub(start)
-			nextPeriod := 0 * time.Second
-			if watchDuration > 0 && c.Cfg.Period > watchDuration {
-				nextPeriod = c.Cfg.Period - watchDuration
-			}
-			select {
-			case <-stopCh:
-				util.Logger().Debugf("stop to list and watch %s", c.Cfg)
-				return
-			case <-time.After(nextPeriod):
-			}
-		}
-	})
-
+	c.goroute.Do(c.refresh)
 	c.goroute.Do(c.deferHandle)
 }
 
@@ -533,6 +546,15 @@ func (c *KvCacher) Ready() <-chan struct{} {
 	return c.ready
 }
 
+func (c *KvCacher) IsReady() bool {
+	select {
+	case <-c.ready:
+		return true
+	default:
+		return false
+	}
+}
+
 func NewKvCache(c *KvCacher, size int) *KvCache {
 	return &KvCache{
 		owner:       c,
diff --git a/server/core/backend/store/defer.go b/server/core/backend/store/defer.go
index 7f7bbfa..173a44a 100644
--- a/server/core/backend/store/defer.go
+++ b/server/core/backend/store/defer.go
@@ -30,122 +30,127 @@ type DeferHandler interface {
 	HandleChan() <-chan *Event
 }
 
+type deferItem struct {
+	ttl   *time.Timer
+	event *Event
+}
+
 type InstanceEventDeferHandler struct {
 	Percent float64
-	enabled bool
-	events  map[string]*Event
-	ttls    map[string]int64
-	mux     sync.RWMutex
-	deferCh chan *Event
-}
 
-func (iedh *InstanceEventDeferHandler) deferMode(total int, del int) bool {
-	return iedh.Percent > 0 && total > 0 &&
-		del > 1 &&
-		float64(del/total) >= iedh.Percent
+	cache     Cache
+	once      sync.Once
+	enabled   bool
+	items     map[string]*deferItem
+	pendingCh chan []*Event
+	deferCh   chan *Event
 }
 
-func (iedh *InstanceEventDeferHandler) needDefer(cache Cache, evts []*Event) bool {
-	if !iedh.deferMode(cache.Size(), len(evts)) {
+func (iedh *InstanceEventDeferHandler) OnCondition(cache Cache, evts []*Event) bool {
+	if iedh.Percent <= 0 {
 		return false
 	}
 
-	for _, evt := range evts {
-		if evt.Type != pb.EVT_DELETE {
-			return false
-		}
-	}
-	return true
-}
-
-func (iedh *InstanceEventDeferHandler) init() {
-	if iedh.deferCh == nil {
+	iedh.once.Do(func() {
+		iedh.cache = cache
+		iedh.items = make(map[string]*deferItem, event_block_size)
+		iedh.pendingCh = make(chan []*Event, event_block_size)
 		iedh.deferCh = make(chan *Event, event_block_size)
-	}
-
-	if iedh.events == nil {
-		iedh.events = make(map[string]*Event, event_block_size)
-		iedh.ttls = make(map[string]int64, event_block_size)
 		util.Go(iedh.check)
-	}
-}
-
-func (iedh *InstanceEventDeferHandler) OnCondition(cache Cache, evts []*Event) bool {
-	iedh.mux.Lock()
-	if !iedh.enabled && iedh.needDefer(cache, evts) {
-		util.Logger().Warnf(nil, "self preservation is enabled, caught %d(>=%.0f%%) DELETE events",
-			len(evts), iedh.Percent*100)
-		iedh.enabled = true
-	}
+	})
 
-	if !iedh.enabled {
-		iedh.mux.Unlock()
-		return false
-	}
-
-	iedh.init()
-
-	for _, evt := range evts {
-		kv, ok := evt.Object.(*mvccpb.KeyValue)
-		if !ok {
-			continue
-		}
-		key := util.BytesToStringWithNoCopy(kv.Key)
-		switch evt.Type {
-		case pb.EVT_CREATE, pb.EVT_UPDATE:
-			delete(iedh.events, key)
-			delete(iedh.ttls, key)
+	iedh.pendingCh <- evts
+	return true
+}
 
+func (iedh *InstanceEventDeferHandler) recoverOrDefer(evt *Event) error {
+	kv := evt.Object.(*mvccpb.KeyValue)
+	key := util.BytesToStringWithNoCopy(kv.Key)
+	_, ok := iedh.items[key]
+	switch evt.Type {
+	case pb.EVT_CREATE, pb.EVT_UPDATE:
+		if ok {
 			util.Logger().Infof("recovered key %s events", key)
+			// return nil // no need to publish event to subscribers?
+		}
+		iedh.recover(evt)
+	case pb.EVT_DELETE:
+		if ok {
+			return nil
+		}
 
-			iedh.deferCh <- evt
-		case pb.EVT_DELETE:
-			var instance pb.MicroServiceInstance
-			err := json.Unmarshal(kv.Value, &instance)
-			if err != nil {
-				util.Logger().Errorf(err, "unmarshal instance file failed, key is %s", key)
-				continue
-			}
-			iedh.events[key] = evt
-			iedh.ttls[key] = int64(instance.HealthCheck.Interval * (instance.HealthCheck.Times + 1))
+		var instance pb.MicroServiceInstance
+		err := json.Unmarshal(kv.Value, &instance)
+		if err != nil {
+			util.Logger().Errorf(err, "unmarshal instance file failed, key is %s", key)
+			return err
+		}
+		iedh.items[key] = &deferItem{
+			ttl: time.NewTimer(
+				time.Duration(instance.HealthCheck.Interval*(instance.HealthCheck.Times+1)) * time.Second),
+			event: evt,
 		}
 	}
-	iedh.mux.Unlock()
-	return true
+	return nil
+}
+
+func (iedh *InstanceEventDeferHandler) HandleChan() <-chan *Event {
+	return iedh.deferCh
 }
 
 func (iedh *InstanceEventDeferHandler) check(stopCh <-chan struct{}) {
 	defer util.RecoverAndReport()
+	t, n := iedh.newTimer(), false
 	for {
 		select {
 		case <-stopCh:
 			return
-		case <-time.After(time.Second):
-			iedh.mux.Lock()
-			for key, ttl := range iedh.ttls {
-				ttl--
-				if ttl > 0 {
-					iedh.ttls[key] = ttl
-					continue
-				}
-
-				evt := iedh.events[key]
-				delete(iedh.events, key)
-				delete(iedh.ttls, key)
+		case evts := <-iedh.pendingCh:
+			for _, evt := range evts {
+				iedh.recoverOrDefer(evt)
+			}
 
-				util.Logger().Warnf(nil, "defer handle timed out, removed key is %s", key)
+			del := len(iedh.items)
+			if del > 0 && !n {
+				t.Stop()
+				t, n = iedh.newTimer(), true
+			}
 
-				iedh.deferCh <- evt
+			total := iedh.cache.Size()
+			if del > 0 && total > 0 && float64(del) >= float64(total)*iedh.Percent {
+				iedh.enabled = true
+				util.Logger().Warnf(nil, "self preservation is enabled, caught %d/%d(>=%.0f%%) DELETE events",
+					del, total, iedh.Percent*100)
+			}
+		case <-t.C:
+			t, n = iedh.newTimer(), false
+
+			for key, item := range iedh.items {
+				if iedh.enabled {
+					select {
+					case <-item.ttl.C:
+					default:
+						continue
+					}
+					util.Logger().Warnf(nil, "defer handle timed out, removed key is %s", key)
+				}
+				iedh.recover(item.event)
 			}
-			if iedh.enabled && len(iedh.ttls) == 0 {
+
+			if iedh.enabled && len(iedh.items) == 0 {
 				iedh.enabled = false
 				util.Logger().Warnf(nil, "self preservation is stopped")
 			}
-			iedh.mux.Unlock()
 		}
 	}
 }
 
-func (iedh *InstanceEventDeferHandler) HandleChan() <-chan *Event {
-	return iedh.deferCh
+func (iedh *InstanceEventDeferHandler) newTimer() *time.Timer {
+	return time.NewTimer(2 * time.Second) // instance DELETE event will be delay.
+}
+
+func (iedh *InstanceEventDeferHandler) recover(evt *Event) {
+	key := util.BytesToStringWithNoCopy(evt.Object.(*mvccpb.KeyValue).Key)
+	delete(iedh.items, key)
+	iedh.deferCh <- evt
 }
diff --git a/server/core/backend/store/defer_test.go b/server/core/backend/store/defer_test.go
new file mode 100644
index 0000000..2c9c453
--- /dev/null
+++ b/server/core/backend/store/defer_test.go
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package store
+
+import (
+	"encoding/json"
+	"fmt"
+	"github.com/apache/incubator-servicecomb-service-center/pkg/util"
+	pb "github.com/apache/incubator-servicecomb-service-center/server/core/proto"
+	"github.com/coreos/etcd/mvcc/mvccpb"
+	"testing"
+	"time"
+)
+
+func TestInstanceEventDeferHandler_OnCondition(t *testing.T) {
+	iedh := &InstanceEventDeferHandler{
+		Percent: 0,
+	}
+
+	if iedh.OnCondition(nil, nil) {
+		fmt.Printf(`TestInstanceEventDeferHandler_OnCondition with 0%% failed`)
+		t.FailNow()
+	}
+
+	iedh.Percent = 0.01
+	if !iedh.OnCondition(nil, nil) {
+		fmt.Printf(`TestInstanceEventDeferHandler_OnCondition with 1%% failed`)
+		t.FailNow()
+	}
+}
+
+func TestInstanceEventDeferHandler_HandleChan(t *testing.T) {
+	inst := &pb.MicroServiceInstance{
+		HealthCheck: &pb.HealthCheck{
+			Interval: 4,
+			Times:    0,
+		},
+	}
+	b, _ := json.Marshal(inst)
+	kv1 := &mvccpb.KeyValue{
+		Key:   util.StringToBytesWithNoCopy("/1"),
+		Value: b,
+	}
+	kv2 := &mvccpb.KeyValue{
+		Key:   util.StringToBytesWithNoCopy("/2"),
+		Value: b,
+	}
+	kv3 := &mvccpb.KeyValue{
+		Key:   util.StringToBytesWithNoCopy("/3"),
+		Value: b,
+	}
+
+	cache := NewKvCache(nil, 1)
+	cache.store["/1"] = kv1
+	cache.store["/2"] = kv2
+	cache.store["/3"] = kv3
+
+	evts1 := []*Event{
+		{
+			Type:   pb.EVT_CREATE,
+			Object: kv1,
+		},
+		{
+			Type:   pb.EVT_UPDATE,
+			Object: kv1,
+		},
+	}
+	evts2 := []*Event{
+		{
+			Type:   pb.EVT_DELETE,
+			Object: kv2,
+		},
+		{
+			Type:   pb.EVT_DELETE,
+			Object: kv3,
+		},
+	}
+	evts3 := []*Event{
+		{
+			Type:   pb.EVT_CREATE,
+			Object: kv2,
+		},
+	}
+
+	iedh := &InstanceEventDeferHandler{
+		Percent: 0.01,
+	}
+
+	iedh.OnCondition(cache, evts1)
+	iedh.OnCondition(cache, evts2)
+	iedh.OnCondition(cache, evts3)
+
+	getEvents(t, iedh)
+
+	iedh.Percent = 0.8
+	iedh.OnCondition(cache, evts1)
+	iedh.OnCondition(cache, evts2)
+	iedh.OnCondition(cache, evts3)
+
+	getEvents(t, iedh)
+}
+
+func getEvents(t *testing.T, iedh *InstanceEventDeferHandler) {
+	fmt.Println(time.Now())
+	c := time.After(3 * time.Second)
+	var evt3 *Event
+	for {
+		select {
+		case evt := <-iedh.HandleChan():
+			fmt.Println(time.Now(), evt)
+			if string(evt.Object.(*mvccpb.KeyValue).Key) == "/3" {
+				evt3 = evt
+				if iedh.Percent == 0.01 && evt.Type == pb.EVT_DELETE {
+					fmt.Printf(`TestInstanceEventDeferHandler_HandleChan with 1%% failed`)
+					t.FailNow()
+				}
+			}
+			continue
+		case <-c:
+			if iedh.Percent == 0.8 && evt3 == nil {
+				fmt.Printf(`TestInstanceEventDeferHandler_HandleChan with 80%% failed`)
+				t.FailNow()
+			}
+		}
+		break
+	}
+}
diff --git a/server/core/backend/store/indexer.go b/server/core/backend/store/indexer.go
index 528922d..7236050 100644
--- a/server/core/backend/store/indexer.go
+++ b/server/core/backend/store/indexer.go
@@ -264,11 +264,6 @@ func (i *Indexer) deletePrefixKey(prefix, key string) {
 	if !ok {
 		return
 	}
-	// remove child
-	for k := range i.prefixIndex[key] {
-		i.deletePrefixKey(key, k)
-	}
-
 	delete(m, key)
 
 	// remove parent which has no child
diff --git a/server/core/backend/store/listwatch.go b/server/core/backend/store/listwatch.go
index 9e210bc..189f909 100644
--- a/server/core/backend/store/listwatch.go
+++ b/server/core/backend/store/listwatch.go
@@ -32,7 +32,7 @@ const EVENT_BUS_MAX_SIZE = 1000
 type Event struct {
 	Revision int64
 	Type     proto.EventType
-	Key      string
+	Prefix   string
 	Object   interface{}
 }
 
@@ -93,7 +93,7 @@ func (lw *ListWatcher) doWatch(ctx context.Context, f func(evt []*Event)) error
 
 				evts := make([]*Event, len(resp.Kvs))
 				for i, kv := range resp.Kvs {
-					evt := &Event{Key: lw.Key, Revision: kv.ModRevision}
+					evt := &Event{Prefix: lw.Key, Revision: kv.ModRevision}
 					switch {
 					case resp.Action == registry.Put && kv.Version == 1:
 						evt.Type, evt.Object = proto.EVT_CREATE, kv
@@ -148,9 +148,9 @@ func (w *Watcher) process() {
 	}
 }
 
-func (w *Watcher) sendEvent(evt []*Event) {
+func (w *Watcher) sendEvent(evts []*Event) {
 	defer util.RecoverAndReport()
-	w.bus <- evt
+	w.bus <- evts
 }
 
 func (w *Watcher) Stop() {
@@ -168,7 +168,7 @@ func (w *Watcher) Stop() {
 func errEvent(key string, err error) *Event {
 	return &Event{
 		Type:   proto.EVT_ERROR,
-		Key:    key,
+		Prefix: key,
 		Object: err,
 	}
 }
diff --git a/server/core/backend/store/opt.go b/server/core/backend/store/opt.go
index 252f244..86142b1 100644
--- a/server/core/backend/store/opt.go
+++ b/server/core/backend/store/opt.go
@@ -34,7 +34,7 @@ type KvCacherCfg struct {
 	Timeout            time.Duration
 	Period             time.Duration
 	OnEvent            KvEventFunc
-	DeferHander        DeferHandler
+	DeferHandler       DeferHandler
 }
 
 func (cfg KvCacherCfg) String() string {
@@ -65,7 +65,7 @@ func WithEventFunc(f KvEventFunc) KvCacherCfgOption {
 }
 
 func WithDeferHandler(h DeferHandler) KvCacherCfgOption {
-	return func(cfg *KvCacherCfg) { cfg.DeferHander = h }
+	return func(cfg *KvCacherCfg) { cfg.DeferHandler = h }
 }
 
 func DefaultKvCacherConfig() KvCacherCfg {
diff --git a/server/core/backend/store/store.go b/server/core/backend/store/store.go
index d2f89e7..6a5f38c 100644
--- a/server/core/backend/store/store.go
+++ b/server/core/backend/store/store.go
@@ -124,13 +124,6 @@ func (s *KvStore) Initialize() {
 
 func (s *KvStore) dispatchEvent(t StoreType, evt *KvEvent) {
 	s.indexers[t].OnCacheEvent(evt)
-	select {
-	case <-s.Ready():
-	default:
-		if evt.Action == pb.EVT_CREATE {
-			evt.Action = pb.EVT_INIT
-		}
-	}
 	EventProxy(t).OnEvent(evt)
 }
 
diff --git a/server/infra/quota/quota.go b/server/infra/quota/quota.go
index 84dc579..acff48e 100644
--- a/server/infra/quota/quota.go
+++ b/server/infra/quota/quota.go
@@ -18,8 +18,8 @@ package quota
 
 import (
 	"fmt"
-	"golang.org/x/net/context"
 	scerr "github.com/apache/incubator-servicecomb-service-center/server/error"
+	"golang.org/x/net/context"
 )
 
 type ApplyQuotaResult struct {
diff --git a/server/plugin/infra/registry/etcd/etcd.go b/server/plugin/infra/registry/etcd/etcd.go
index d848eb9..1e57ee1 100644
--- a/server/plugin/infra/registry/etcd/etcd.go
+++ b/server/plugin/infra/registry/etcd/etcd.go
@@ -501,49 +501,55 @@ func (c *EtcdClient) Watch(ctx context.Context, opts ...registry.PluginOpOption)
 				return
 			case resp, ok = <-ws:
 				if !ok {
-					err := errors.New("channel is closed")
-					return err
+					err = errors.New("channel is closed")
+					return
 				}
 				// cause a rpc ResourceExhausted error if watch response body larger then 4MB
 				if err = resp.Err(); err != nil {
-					return err
+					return
 				}
 
-				l := len(resp.Events)
-				kvs := make([]*mvccpb.KeyValue, l)
-				pIdx, prevAction := 0, mvccpb.PUT
-				pResp := &registry.PluginResponse{Action: registry.Put, Succeeded: true}
-
-				for _, evt := range resp.Events {
-					if prevAction != evt.Type {
-						prevAction = evt.Type
-
-						if pIdx > 0 {
-							err = setResponseAndCallback(pResp, kvs[:pIdx], op.WatchCallback)
-							if err != nil {
-								return
-							}
-							pIdx = 0
-						}
-					}
+				err = dispatch(resp.Events, op.WatchCallback)
+				if err != nil {
+					return
+				}
+			}
+		}
+	}
+	return fmt.Errorf("no key has been watched")
+}
 
-					pResp.Revision = evt.Kv.ModRevision
-					pResp.Action = setKvsAndConvertAction(kvs, pIdx, evt)
+func dispatch(evts []*clientv3.Event, cb registry.WatchCallback) error {
+	l := len(evts)
+	kvs := make([]*mvccpb.KeyValue, l)
+	sIdx, eIdx, prevAction := 0, 0, mvccpb.PUT
+	pResp := &registry.PluginResponse{Action: registry.Put, Succeeded: true}
 
-					pIdx++
-				}
+	for _, evt := range evts {
+		if prevAction != evt.Type {
+			prevAction = evt.Type
 
-				if pIdx > 0 {
-					err = setResponseAndCallback(pResp, kvs[:pIdx], op.WatchCallback)
-					if err != nil {
-						return
-					}
+			if eIdx > 0 {
+				err := setResponseAndCallback(pResp, kvs[sIdx:eIdx], cb)
+				if err != nil {
+					return err
 				}
+				sIdx = eIdx
 			}
 		}
+
+		if pResp.Revision < evt.Kv.ModRevision {
+			pResp.Revision = evt.Kv.ModRevision
+		}
+		pResp.Action = setKvsAndConvertAction(kvs, eIdx, evt)
+
+		eIdx++
 	}
-	err = fmt.Errorf("no key has been watched")
-	return
+
+	if eIdx > 0 {
+		return setResponseAndCallback(pResp, kvs[sIdx:eIdx], cb)
+	}
+	return nil
 }
 
 func setKvsAndConvertAction(kvs []*mvccpb.KeyValue, pIdx int, evt *clientv3.Event) registry.ActionType {
@@ -564,12 +570,7 @@ func setKvsAndConvertAction(kvs []*mvccpb.KeyValue, pIdx int, evt *clientv3.Even
 func setResponseAndCallback(pResp *registry.PluginResponse, kvs []*mvccpb.KeyValue, cb registry.WatchCallback) error {
 	pResp.Count = int64(len(kvs))
 	pResp.Kvs = kvs
-
-	err := cb("key information changed", pResp)
-	if err != nil {
-		return err
-	}
-	return nil
+	return cb("key information changed", pResp)
 }
 
 func NewRegistry() mgr.PluginInstance {
diff --git a/server/rest/controller/v4/microservice_controller.go b/server/rest/controller/v4/microservice_controller.go
index b0b177f..795c72c 100644
--- a/server/rest/controller/v4/microservice_controller.go
+++ b/server/rest/controller/v4/microservice_controller.go
@@ -102,7 +102,6 @@ func (this *MicroServiceService) Unregister(w http.ResponseWriter, r *http.Reque
 
 func (this *MicroServiceService) GetServices(w http.ResponseWriter, r *http.Request) {
 	request := &pb.GetServicesRequest{}
-	util.Logger().Debugf("domain is %s", util.ParseDomain(r.Context()))
 	resp, _ := core.ServiceAPI.GetServices(r.Context(), request)
 	respInternal := resp.Response
 	resp.Response = nil
diff --git a/server/service/event/dependency_event_handler.go b/server/service/event/dependency_event_handler.go
index 99808e1..d1bd311 100644
--- a/server/service/event/dependency_event_handler.go
+++ b/server/service/event/dependency_event_handler.go
@@ -91,9 +91,9 @@ func (h *DependencyEventHandler) loop() {
 }
 
 type DependencyEventHandlerResource struct {
-	dep    *pb.ConsumerDependency
-	kv     *mvccpb.KeyValue
-	domainProject  string
+	dep           *pb.ConsumerDependency
+	kv            *mvccpb.KeyValue
+	domainProject string
 }
 
 func NewDependencyEventHandlerResource(dep *pb.ConsumerDependency, kv *mvccpb.KeyValue, domainProject string) *DependencyEventHandlerResource {
@@ -150,7 +150,7 @@ func (h *DependencyEventHandler) Handle() error {
 
 	dependencyRuleHandleResults := make(chan error, len(resourcesMap))
 	for lockKey, resources := range resourcesMap {
-		go func(lockKey string, resources []*DependencyEventHandlerResource){
+		go func(lockKey string, resources []*DependencyEventHandlerResource) {
 			err := h.dependencyRuleHandle(ctx, lockKey, resources)
 			dependencyRuleHandleResults <- err
 		}(lockKey, resources)
@@ -169,7 +169,7 @@ func (h *DependencyEventHandler) Handle() error {
 	return lastErr
 }
 
-func (h *DependencyEventHandler)dependencyRuleHandle(ctx context.Context, lockKey string, resources []*DependencyEventHandlerResource) error{
+func (h *DependencyEventHandler) dependencyRuleHandle(ctx context.Context, lockKey string, resources []*DependencyEventHandlerResource) error {
 	lock, err := serviceUtil.DependencyLock(lockKey)
 	if err != nil {
 		util.Logger().Errorf(err, "create dependency rule locker failed")
diff --git a/server/service/instances.go b/server/service/instances.go
index 1f066fb..2e12fcf 100644
--- a/server/service/instances.go
+++ b/server/service/instances.go
@@ -589,9 +589,13 @@ func (s *InstanceService) Find(ctx context.Context, in *pb.FindInstancesRequest)
 		}, nil
 	}
 
-	instances := make([]*pb.MicroServiceInstance, 0)
+	var instances []*pb.MicroServiceInstance
+	cloneCtx := ctx
+	if s, ok := ctx.Value("noCache").(string); !ok || s != "1" {
+		cloneCtx = util.SetContext(util.CloneContext(ctx), "cacheOnly", "1")
+	}
 	for _, serviceId := range ids {
-		resp, err := s.GetInstances(ctx, &pb.GetInstancesRequest{
+		resp, err := s.GetInstances(cloneCtx, &pb.GetInstancesRequest{
 			ConsumerServiceId: in.ConsumerServiceId,
 			ProviderServiceId: serviceId,
 			Tags:              in.Tags,
diff --git a/server/service/instances_test.go b/server/service/instances_test.go
index 355a615..dfba6d3 100644
--- a/server/service/instances_test.go
+++ b/server/service/instances_test.go
@@ -909,7 +909,7 @@ var _ = Describe("'Instance' service", func() {
 
 				respFind, err = instanceResource.Find(
 					util.SetTargetDomainProject(
-						util.SetDomainProject(context.Background(), "user", "user"),
+						util.SetDomainProject(util.CloneContext(getContext()), "user", "user"),
 						"default", "default"),
 					&pb.FindInstancesRequest{
 						ConsumerServiceId: serviceId6,
diff --git a/server/service/microservices.go b/server/service/microservices.go
index 319c108..dcaa4be 100644
--- a/server/service/microservices.go
+++ b/server/service/microservices.go
@@ -18,6 +18,7 @@ package service
 
 import (
 	"encoding/json"
+	"errors"
 	"fmt"
 	"github.com/apache/incubator-servicecomb-service-center/pkg/util"
 	"github.com/apache/incubator-servicecomb-service-center/server/core"
@@ -33,7 +34,6 @@ import (
 	"golang.org/x/net/context"
 	"strconv"
 	"time"
-	"errors"
 )
 
 type MicroServiceService struct {
@@ -130,6 +130,7 @@ func (s *MicroServiceService) CreateServicePri(ctx context.Context, in *pb.Creat
 	index := apt.GenerateServiceIndexKey(serviceKey)
 	indexBytes := util.StringToBytesWithNoCopy(index)
 	aliasBytes := util.StringToBytesWithNoCopy(apt.GenerateServiceAliasKey(serviceKey))
+
 	opts := []registry.PluginOp{
 		registry.OpPut(registry.WithStrKey(key), registry.WithValue(data)),
 		registry.OpPut(registry.WithKey(indexBytes), registry.WithStrValue(serviceId)),
diff --git a/server/service/rule.go b/server/service/rule.go
index 287dbc2..4060907 100644
--- a/server/service/rule.go
+++ b/server/service/rule.go
@@ -64,7 +64,6 @@ func (s *MicroServiceService) AddRule(ctx context.Context, in *pb.AddServiceRule
 		return response, nil
 	}
 
-
 	ruleType, _, err := serviceUtil.GetServiceRuleType(ctx, domainProject, in.ServiceId)
 	util.Logger().Debugf("ruleType is %s", ruleType)
 	if err != nil {
diff --git a/server/service/util/dependency.go b/server/service/util/dependency.go
index 5bfb978..68c6bf7 100644
--- a/server/service/util/dependency.go
+++ b/server/service/util/dependency.go
@@ -21,6 +21,7 @@ import (
 	"errors"
 	"fmt"
 	"github.com/apache/incubator-servicecomb-service-center/pkg/cache"
+	"github.com/apache/incubator-servicecomb-service-center/pkg/etcdsync"
 	"github.com/apache/incubator-servicecomb-service-center/pkg/util"
 	apt "github.com/apache/incubator-servicecomb-service-center/server/core"
 	"github.com/apache/incubator-servicecomb-service-center/server/core/backend"
@@ -28,11 +29,10 @@ import (
 	pb "github.com/apache/incubator-servicecomb-service-center/server/core/proto"
 	scerr "github.com/apache/incubator-servicecomb-service-center/server/error"
 	"github.com/apache/incubator-servicecomb-service-center/server/infra/registry"
+	"github.com/apache/incubator-servicecomb-service-center/server/mux"
 	"golang.org/x/net/context"
 	"strings"
 	"time"
-	"github.com/apache/incubator-servicecomb-service-center/pkg/etcdsync"
-	"github.com/apache/incubator-servicecomb-service-center/server/mux"
 )
 
 var consumerCache *cache.Cache
@@ -1059,5 +1059,5 @@ func DependencyLock(lockKey string) (*etcdsync.DLock, error) {
 }
 
 func NewDependencyLockKey(domainProject, env string) string {
-	return util.StringJoin([]string{"","env-lock", domainProject, env}, "/")
-}
\ No newline at end of file
+	return util.StringJoin([]string{"", "env-lock", domainProject, env}, "/")
+}

-- 
To stop receiving notification emails like this one, please contact
asifdxtreme@apache.org.