You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@servicecomb.apache.org by GitBox <gi...@apache.org> on 2018/01/29 04:39:23 UTC

[GitHub] asifdxtreme closed pull request #261: SCB-296 Self preservation will never stop when SC do list-watch loop

asifdxtreme closed pull request #261: SCB-296 Self preservation will never stop when SC do list-watch loop
URL: https://github.com/apache/incubator-servicecomb-service-center/pull/261
 
 
   

This PR was merged from a forked repository. Because GitHub hides the
original diff of a foreign (fork) pull request once it has been merged,
the diff is reproduced below for the sake of provenance:

diff --git a/etc/conf/app.conf b/etc/conf/app.conf
index 9d54f2d6..a48f08c5 100644
--- a/etc/conf/app.conf
+++ b/etc/conf/app.conf
@@ -26,11 +26,12 @@ plugins_dir = ./plugins
 registry_plugin = etcd
 
 # registry address
-# registry_plugin equals to 'embeded_etcd', example:
+# 1. if registry_plugin equals to 'embeded_etcd'
 # manager_name = "sc-0"
 # manager_addr = "http://127.0.0.1:2380"
 # manager_cluster = "sc-0=http://127.0.0.1:2380"
-# registry_plugin equals to 'etcd'
+# 2. if registry_plugin equals to 'etcd'
+# manager_cluster = "127.0.0.1:2379"
 manager_cluster = "127.0.0.1:2379"
 
 #heartbeat that sync synchronizes client's endpoints with the known endpoints from the etcd membership,unit is second.
diff --git a/integration/instances_test.go b/integration/instances_test.go
index 8ab0a78f..948c9f62 100644
--- a/integration/instances_test.go
+++ b/integration/instances_test.go
@@ -248,7 +248,7 @@ var _ = Describe("MicroService Api Test", func() {
 
 		By("Discover MicroService Instance API", func() {
 			It("Find Micro-service Info by AppID", func() {
-				req, _ := http.NewRequest(GET, SCURL+FINDINSTANCE+"?appId="+serviceAppId+"&serviceName="+serviceName+"&version="+serviceVersion, nil)
+				req, _ := http.NewRequest(GET, SCURL+FINDINSTANCE+"?noCache=1&appId="+serviceAppId+"&serviceName="+serviceName+"&version="+serviceVersion, nil)
 				req.Header.Set("X-Domain-Name", "default")
 				req.Header.Set("X-ConsumerId", serviceId)
 				resp, _ := scclient.Do(req)
diff --git a/pkg/chain/chain.go b/pkg/chain/chain.go
index 85178243..a48fc1c5 100644
--- a/pkg/chain/chain.go
+++ b/pkg/chain/chain.go
@@ -60,7 +60,7 @@ func (c *Chain) syncNext(i *Invocation) {
 }
 
 func (c *Chain) Next(i *Invocation) {
-	go c.syncNext(i)
+	c.syncNext(i)
 }
 
 func NewChain(name string, handlers []Handler) Chain {
diff --git a/pkg/httplimiter/httpratelimiter.go b/pkg/httplimiter/httpratelimiter.go
index 9128d5b6..31100569 100644
--- a/pkg/httplimiter/httpratelimiter.go
+++ b/pkg/httplimiter/httpratelimiter.go
@@ -18,13 +18,13 @@
 package httplimiter
 
 import (
+	"fmt"
+	"github.com/apache/incubator-servicecomb-service-center/pkg/ratelimiter"
 	"net/http"
 	"strconv"
 	"strings"
-	"time"
-	"github.com/apache/incubator-servicecomb-service-center/pkg/ratelimiter"
-	"fmt"
 	"sync"
+	"time"
 )
 
 type HTTPErrorMessage struct {
@@ -36,7 +36,6 @@ func (httpErrorMessage *HTTPErrorMessage) Error() string {
 	return fmt.Sprintf("%v: %v", httpErrorMessage.StatusCode, httpErrorMessage.Message)
 }
 
-
 type HttpLimiter struct {
 	HttpMessage    string
 	ContentType    string
@@ -51,8 +50,6 @@ type HttpLimiter struct {
 	sync.RWMutex
 }
 
-
-
 func LimitBySegments(limiter *HttpLimiter, keys []string) *HTTPErrorMessage {
 	if limiter.LimitExceeded(strings.Join(keys, "|")) {
 		return &HTTPErrorMessage{Message: limiter.HttpMessage, StatusCode: limiter.StatusCode}
@@ -199,7 +196,6 @@ func getRemoteIP(ipLookups []string, r *http.Request) string {
 	return ""
 }
 
-
 func NewHttpLimiter(max int64, ttl time.Duration) *HttpLimiter {
 	limiter := &HttpLimiter{RequestLimit: max, TTL: ttl}
 	limiter.ContentType = "text/plain; charset=utf-8"
@@ -211,7 +207,6 @@ func NewHttpLimiter(max int64, ttl time.Duration) *HttpLimiter {
 	return limiter
 }
 
-
 func (rateLimiter *HttpLimiter) LimitExceeded(key string) bool {
 	rateLimiter.Lock()
 	if _, found := rateLimiter.leakyBuckets[key]; !found {
@@ -224,5 +219,3 @@ func (rateLimiter *HttpLimiter) LimitExceeded(key string) bool {
 	}
 	return true
 }
-
-
diff --git a/pkg/logrotate/logrotate.go b/pkg/logrotate/logrotate.go
index c2e9e93a..76de98d3 100644
--- a/pkg/logrotate/logrotate.go
+++ b/pkg/logrotate/logrotate.go
@@ -240,9 +240,9 @@ func LogRotate(path string, MaxFileSize int, MaxBackupCount int) {
 	}
 }
 
-func isSkip(f os.FileInfo) bool{
+func isSkip(f os.FileInfo) bool {
 	//dir or non write permission,skip
-	return f.IsDir() || (f.Mode() & 0200 == 0000)
+	return f.IsDir() || (f.Mode()&0200 == 0000)
 }
 
 //path : where the file will be filtered
diff --git a/pkg/ratelimiter/ratelimiter.go b/pkg/ratelimiter/ratelimiter.go
index 8184108e..a46bc399 100644
--- a/pkg/ratelimiter/ratelimiter.go
+++ b/pkg/ratelimiter/ratelimiter.go
@@ -18,8 +18,8 @@
 package ratelimiter
 
 import (
-	"time"
 	"sync"
+	"time"
 )
 
 type LeakyBucket struct {
@@ -32,7 +32,6 @@ type LeakyBucket struct {
 	availableTicker int64
 }
 
-
 func NewLeakyBucket(fillInterval time.Duration, capacity, quantum int64) *LeakyBucket {
 	if fillInterval <= 0 {
 		panic("leaky bucket fill interval is not > 0")
@@ -77,7 +76,6 @@ func (leakyBucket *LeakyBucket) MaximumTakeDuration(count int64, maxWait time.Du
 	return leakyBucket.take(time.Now(), count, maxWait)
 }
 
-
 func (leakyBucket *LeakyBucket) Rate() float64 {
 	return 1e9 * float64(leakyBucket.quantum) / float64(leakyBucket.interval)
 }
@@ -118,4 +116,3 @@ func (leakyBucket *LeakyBucket) adjust(now time.Time) (currentTick int64) {
 	leakyBucket.availableTicker = currentTick
 	return
 }
-
diff --git a/pkg/rest/client.go b/pkg/rest/client.go
index 3ce4e29c..2cd68abe 100644
--- a/pkg/rest/client.go
+++ b/pkg/rest/client.go
@@ -28,9 +28,9 @@ import (
 	"io/ioutil"
 	"net"
 	"net/http"
+	"net/url"
 	"reflect"
 	"time"
-	"net/url"
 )
 
 const (
diff --git a/pkg/rest/route.go b/pkg/rest/route.go
index 65950219..3015affa 100644
--- a/pkg/rest/route.go
+++ b/pkg/rest/route.go
@@ -23,6 +23,7 @@ import (
 	errorsEx "github.com/apache/incubator-servicecomb-service-center/pkg/errors"
 	"github.com/apache/incubator-servicecomb-service-center/pkg/util"
 	"net/http"
+	"net/url"
 	"strings"
 )
 
@@ -77,7 +78,7 @@ func (this *ROAServerHandler) ServeHTTP(w http.ResponseWriter, r *http.Request)
 	for _, ph := range this.handlers[r.Method] {
 		if params, ok := ph.try(r.URL.Path); ok {
 			if len(params) > 0 {
-				r.URL.RawQuery = util.UrlEncode(params) + "&" + r.URL.RawQuery
+				r.URL.RawQuery = params + r.URL.RawQuery
 			}
 
 			this.serve(ph, w, r)
@@ -151,7 +152,7 @@ func (this *ROAServerHandler) serve(ph *urlPatternHandler, w http.ResponseWriter
 	<-ch
 }
 
-func (this *urlPatternHandler) try(path string) (p map[string]string, _ bool) {
+func (this *urlPatternHandler) try(path string) (p string, _ bool) {
 	var i, j int
 	l, sl := len(this.Path), len(path)
 	for i < sl {
@@ -160,7 +161,7 @@ func (this *urlPatternHandler) try(path string) (p map[string]string, _ bool) {
 			if this.Path != "/" && l > 0 && this.Path[l-1] == '/' {
 				return p, true
 			}
-			return nil, false
+			return "", false
 		case this.Path[j] == ':':
 			var val string
 			var nextc byte
@@ -168,19 +169,16 @@ func (this *urlPatternHandler) try(path string) (p map[string]string, _ bool) {
 			_, nextc, j = match(this.Path, isAlnum, 0, j+1)
 			val, _, i = match(path, matchParticial, nextc, i)
 
-			if p == nil {
-				p = make(map[string]string, 5)
-			}
-			p[this.Path[o:j]] = val
+			p += url.QueryEscape(this.Path[o:j]) + "=" + url.QueryEscape(val) + "&"
 		case path[i] == this.Path[j]:
 			i++
 			j++
 		default:
-			return nil, false
+			return "", false
 		}
 	}
 	if j != l {
-		return nil, false
+		return "", false
 	}
 	return p, true
 }
diff --git a/server/core/backend/store/cacher.go b/server/core/backend/store/cacher.go
index 7ca82cf2..c6ade974 100644
--- a/server/core/backend/store/cacher.go
+++ b/server/core/backend/store/cacher.go
@@ -120,6 +120,15 @@ func (c *KvCache) Have(k interface{}) (ok bool) {
 	return
 }
 
+func (c *KvCache) RLock() map[string]*mvccpb.KeyValue {
+	c.rwMux.RLock()
+	return c.store
+}
+
+func (c *KvCache) RUnlock() {
+	c.rwMux.RUnlock()
+}
+
 func (c *KvCache) Lock() map[string]*mvccpb.KeyValue {
 	c.rwMux.Lock()
 	return c.store
@@ -251,15 +260,39 @@ func (c *KvCacher) handleWatcher(watcher *Watcher) error {
 }
 
 func (c *KvCacher) needDeferHandle(evts []*Event) bool {
-	if c.Cfg.DeferHander == nil {
+	if c.Cfg.DeferHandler == nil {
 		return false
 	}
 
-	return c.Cfg.DeferHander.OnCondition(c.Cache(), evts)
+	return c.Cfg.DeferHandler.OnCondition(c.Cache(), evts)
+}
+
+func (c *KvCacher) refresh(stopCh <-chan struct{}) {
+	util.Logger().Debugf("start to list and watch %s", c.Cfg)
+	ctx, cancel := context.WithCancel(context.Background())
+	c.goroute.Do(func(stopCh <-chan struct{}) {
+		defer cancel()
+		<-stopCh
+	})
+	for {
+		start := time.Now()
+		c.ListAndWatch(ctx)
+		watchDuration := time.Since(start)
+		nextPeriod := 0 * time.Second
+		if watchDuration > 0 && c.Cfg.Period > watchDuration {
+			nextPeriod = c.Cfg.Period - watchDuration
+		}
+		select {
+		case <-stopCh:
+			util.Logger().Debugf("stop to list and watch %s", c.Cfg)
+			return
+		case <-time.After(nextPeriod):
+		}
+	}
 }
 
 func (c *KvCacher) deferHandle(stopCh <-chan struct{}) {
-	if c.Cfg.DeferHander == nil {
+	if c.Cfg.DeferHandler == nil {
 		return
 	}
 
@@ -268,7 +301,7 @@ func (c *KvCacher) deferHandle(stopCh <-chan struct{}) {
 		select {
 		case <-stopCh:
 			return
-		case evt, ok := <-c.Cfg.DeferHander.HandleChan():
+		case evt, ok := <-c.Cfg.DeferHandler.HandleChan():
 			if !ok {
 				<-time.After(time.Second)
 				continue
@@ -281,7 +314,7 @@ func (c *KvCacher) deferHandle(stopCh <-chan struct{}) {
 
 			evts[i] = evt
 			i++
-		case <-time.After(time.Second):
+		case <-time.After(300 * time.Millisecond):
 			if i == 0 {
 				continue
 			}
@@ -305,9 +338,9 @@ func (c *KvCacher) sync(evts []*Event) {
 }
 
 func (c *KvCacher) filter(rev int64, items []*mvccpb.KeyValue) []*Event {
-	cache := c.Cache().(*KvCache)
-	store := cache.Lock()
-	defer cache.Unlock()
+	store := c.cache.RLock()
+	defer c.cache.RUnlock()
+
 	oc, nc := len(store), len(items)
 	tc := oc + nc
 	if tc == 0 {
@@ -359,7 +392,7 @@ func (c *KvCacher) filterDelete(store map[string]*mvccpb.KeyValue, newStore map[
 		block[i] = &Event{
 			Revision: rev,
 			Type:     proto.EVT_DELETE,
-			Key:      c.Cfg.Key,
+			Prefix:   c.Cfg.Key,
 			Object:   v,
 		}
 		i++
@@ -387,7 +420,7 @@ func (c *KvCacher) filterCreateOrUpdate(store map[string]*mvccpb.KeyValue, newSt
 			block[i] = &Event{
 				Revision: rev,
 				Type:     proto.EVT_CREATE,
-				Key:      c.Cfg.Key,
+				Prefix:   c.Cfg.Key,
 				Object:   v,
 			}
 			i++
@@ -407,7 +440,7 @@ func (c *KvCacher) filterCreateOrUpdate(store map[string]*mvccpb.KeyValue, newSt
 		block[i] = &Event{
 			Revision: rev,
 			Type:     proto.EVT_UPDATE,
-			Key:      c.Cfg.Key,
+			Prefix:   c.Cfg.Key,
 			Object:   v,
 		}
 		i++
@@ -424,10 +457,9 @@ func (c *KvCacher) filterCreateOrUpdate(store map[string]*mvccpb.KeyValue, newSt
 }
 
 func (c *KvCacher) onEvents(evts []*Event) {
-	cache := c.Cache().(*KvCache)
-	idx := 0
+	idx, init := 0, !c.IsReady()
 	kvEvts := make([]*KvEvent, len(evts))
-	store := cache.Lock()
+	store := c.cache.Lock()
 	for _, evt := range evts {
 		kv := evt.Object.(*mvccpb.KeyValue)
 		key := util.BytesToStringWithNoCopy(kv.Key)
@@ -456,7 +488,6 @@ func (c *KvCacher) onEvents(evts []*Event) {
 				Action:   t,
 				KV:       kv,
 			}
-			idx++
 		case proto.EVT_DELETE:
 			if !ok {
 				util.Logger().Warnf(nil, "unexpected %s event! key %s does not exist", evt.Type, key)
@@ -470,10 +501,15 @@ func (c *KvCacher) onEvents(evts []*Event) {
 				Action:   evt.Type,
 				KV:       prevKv,
 			}
-			idx++
 		}
+
+		if init && kvEvts[idx].Action == proto.EVT_CREATE {
+			kvEvts[idx].Action = proto.EVT_INIT
+		}
+
+		idx++
 	}
-	cache.Unlock()
+	c.cache.Unlock()
 
 	c.onKvEvents(kvEvts[:idx])
 }
@@ -488,30 +524,7 @@ func (c *KvCacher) onKvEvents(evts []*KvEvent) {
 }
 
 func (c *KvCacher) run() {
-	c.goroute.Do(func(stopCh <-chan struct{}) {
-		util.Logger().Debugf("start to list and watch %s", c.Cfg)
-		ctx, cancel := context.WithCancel(context.Background())
-		c.goroute.Do(func(stopCh <-chan struct{}) {
-			defer cancel()
-			<-stopCh
-		})
-		for {
-			start := time.Now()
-			c.ListAndWatch(ctx)
-			watchDuration := time.Now().Sub(start)
-			nextPeriod := 0 * time.Second
-			if watchDuration > 0 && c.Cfg.Period > watchDuration {
-				nextPeriod = c.Cfg.Period - watchDuration
-			}
-			select {
-			case <-stopCh:
-				util.Logger().Debugf("stop to list and watch %s", c.Cfg)
-				return
-			case <-time.After(nextPeriod):
-			}
-		}
-	})
-
+	c.goroute.Do(c.refresh)
 	c.goroute.Do(c.deferHandle)
 }
 
@@ -533,6 +546,15 @@ func (c *KvCacher) Ready() <-chan struct{} {
 	return c.ready
 }
 
+func (c *KvCacher) IsReady() bool {
+	select {
+	case <-c.ready:
+		return true
+	default:
+		return false
+	}
+}
+
 func NewKvCache(c *KvCacher, size int) *KvCache {
 	return &KvCache{
 		owner:       c,
diff --git a/server/core/backend/store/defer.go b/server/core/backend/store/defer.go
index 7f7bbfae..173a44a9 100644
--- a/server/core/backend/store/defer.go
+++ b/server/core/backend/store/defer.go
@@ -30,122 +30,127 @@ type DeferHandler interface {
 	HandleChan() <-chan *Event
 }
 
+type deferItem struct {
+	ttl   *time.Timer
+	event *Event
+}
+
 type InstanceEventDeferHandler struct {
 	Percent float64
-	enabled bool
-	events  map[string]*Event
-	ttls    map[string]int64
-	mux     sync.RWMutex
-	deferCh chan *Event
-}
 
-func (iedh *InstanceEventDeferHandler) deferMode(total int, del int) bool {
-	return iedh.Percent > 0 && total > 0 &&
-		del > 1 &&
-		float64(del/total) >= iedh.Percent
+	cache     Cache
+	once      sync.Once
+	enabled   bool
+	items     map[string]*deferItem
+	pendingCh chan []*Event
+	deferCh   chan *Event
 }
 
-func (iedh *InstanceEventDeferHandler) needDefer(cache Cache, evts []*Event) bool {
-	if !iedh.deferMode(cache.Size(), len(evts)) {
+func (iedh *InstanceEventDeferHandler) OnCondition(cache Cache, evts []*Event) bool {
+	if iedh.Percent <= 0 {
 		return false
 	}
 
-	for _, evt := range evts {
-		if evt.Type != pb.EVT_DELETE {
-			return false
-		}
-	}
-	return true
-}
-
-func (iedh *InstanceEventDeferHandler) init() {
-	if iedh.deferCh == nil {
+	iedh.once.Do(func() {
+		iedh.cache = cache
+		iedh.items = make(map[string]*deferItem, event_block_size)
+		iedh.pendingCh = make(chan []*Event, event_block_size)
 		iedh.deferCh = make(chan *Event, event_block_size)
-	}
-
-	if iedh.events == nil {
-		iedh.events = make(map[string]*Event, event_block_size)
-		iedh.ttls = make(map[string]int64, event_block_size)
 		util.Go(iedh.check)
-	}
-}
-
-func (iedh *InstanceEventDeferHandler) OnCondition(cache Cache, evts []*Event) bool {
-	iedh.mux.Lock()
-	if !iedh.enabled && iedh.needDefer(cache, evts) {
-		util.Logger().Warnf(nil, "self preservation is enabled, caught %d(>=%.0f%%) DELETE events",
-			len(evts), iedh.Percent*100)
-		iedh.enabled = true
-	}
+	})
 
-	if !iedh.enabled {
-		iedh.mux.Unlock()
-		return false
-	}
-
-	iedh.init()
-
-	for _, evt := range evts {
-		kv, ok := evt.Object.(*mvccpb.KeyValue)
-		if !ok {
-			continue
-		}
-		key := util.BytesToStringWithNoCopy(kv.Key)
-		switch evt.Type {
-		case pb.EVT_CREATE, pb.EVT_UPDATE:
-			delete(iedh.events, key)
-			delete(iedh.ttls, key)
+	iedh.pendingCh <- evts
+	return true
+}
 
+func (iedh *InstanceEventDeferHandler) recoverOrDefer(evt *Event) error {
+	kv := evt.Object.(*mvccpb.KeyValue)
+	key := util.BytesToStringWithNoCopy(kv.Key)
+	_, ok := iedh.items[key]
+	switch evt.Type {
+	case pb.EVT_CREATE, pb.EVT_UPDATE:
+		if ok {
 			util.Logger().Infof("recovered key %s events", key)
+			// return nil // no need to publish event to subscribers?
+		}
+		iedh.recover(evt)
+	case pb.EVT_DELETE:
+		if ok {
+			return nil
+		}
 
-			iedh.deferCh <- evt
-		case pb.EVT_DELETE:
-			var instance pb.MicroServiceInstance
-			err := json.Unmarshal(kv.Value, &instance)
-			if err != nil {
-				util.Logger().Errorf(err, "unmarshal instance file failed, key is %s", key)
-				continue
-			}
-			iedh.events[key] = evt
-			iedh.ttls[key] = int64(instance.HealthCheck.Interval * (instance.HealthCheck.Times + 1))
+		var instance pb.MicroServiceInstance
+		err := json.Unmarshal(kv.Value, &instance)
+		if err != nil {
+			util.Logger().Errorf(err, "unmarshal instance file failed, key is %s", key)
+			return err
+		}
+		iedh.items[key] = &deferItem{
+			ttl: time.NewTimer(
+				time.Duration(instance.HealthCheck.Interval*(instance.HealthCheck.Times+1)) * time.Second),
+			event: evt,
 		}
 	}
-	iedh.mux.Unlock()
-	return true
+	return nil
+}
+
+func (iedh *InstanceEventDeferHandler) HandleChan() <-chan *Event {
+	return iedh.deferCh
 }
 
 func (iedh *InstanceEventDeferHandler) check(stopCh <-chan struct{}) {
 	defer util.RecoverAndReport()
+	t, n := iedh.newTimer(), false
 	for {
 		select {
 		case <-stopCh:
 			return
-		case <-time.After(time.Second):
-			iedh.mux.Lock()
-			for key, ttl := range iedh.ttls {
-				ttl--
-				if ttl > 0 {
-					iedh.ttls[key] = ttl
-					continue
-				}
-
-				evt := iedh.events[key]
-				delete(iedh.events, key)
-				delete(iedh.ttls, key)
+		case evts := <-iedh.pendingCh:
+			for _, evt := range evts {
+				iedh.recoverOrDefer(evt)
+			}
 
-				util.Logger().Warnf(nil, "defer handle timed out, removed key is %s", key)
+			del := len(iedh.items)
+			if del > 0 && !n {
+				t.Stop()
+				t, n = iedh.newTimer(), true
+			}
 
-				iedh.deferCh <- evt
+			total := iedh.cache.Size()
+			if del > 0 && total > 0 && float64(del) >= float64(total)*iedh.Percent {
+				iedh.enabled = true
+				util.Logger().Warnf(nil, "self preservation is enabled, caught %d/%d(>=%.0f%%) DELETE events",
+					del, total, iedh.Percent*100)
+			}
+		case <-t.C:
+			t, n = iedh.newTimer(), false
+
+			for key, item := range iedh.items {
+				if iedh.enabled {
+					select {
+					case <-item.ttl.C:
+					default:
+						continue
+					}
+					util.Logger().Warnf(nil, "defer handle timed out, removed key is %s", key)
+				}
+				iedh.recover(item.event)
 			}
-			if iedh.enabled && len(iedh.ttls) == 0 {
+
+			if iedh.enabled && len(iedh.items) == 0 {
 				iedh.enabled = false
 				util.Logger().Warnf(nil, "self preservation is stopped")
 			}
-			iedh.mux.Unlock()
 		}
 	}
 }
 
-func (iedh *InstanceEventDeferHandler) HandleChan() <-chan *Event {
-	return iedh.deferCh
+func (iedh *InstanceEventDeferHandler) newTimer() *time.Timer {
+	return time.NewTimer(2 * time.Second) // instance DELETE event will be delay.
+}
+
+func (iedh *InstanceEventDeferHandler) recover(evt *Event) {
+	key := util.BytesToStringWithNoCopy(evt.Object.(*mvccpb.KeyValue).Key)
+	delete(iedh.items, key)
+	iedh.deferCh <- evt
 }
diff --git a/server/core/backend/store/defer_test.go b/server/core/backend/store/defer_test.go
new file mode 100644
index 00000000..2c9c4537
--- /dev/null
+++ b/server/core/backend/store/defer_test.go
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package store
+
+import (
+	"encoding/json"
+	"fmt"
+	"github.com/apache/incubator-servicecomb-service-center/pkg/util"
+	pb "github.com/apache/incubator-servicecomb-service-center/server/core/proto"
+	"github.com/coreos/etcd/mvcc/mvccpb"
+	"testing"
+	"time"
+)
+
+func TestInstanceEventDeferHandler_OnCondition(t *testing.T) {
+	iedh := &InstanceEventDeferHandler{
+		Percent: 0,
+	}
+
+	if iedh.OnCondition(nil, nil) {
+		fmt.Printf(`TestInstanceEventDeferHandler_OnCondition with 0%% failed`)
+		t.FailNow()
+	}
+
+	iedh.Percent = 0.01
+	if !iedh.OnCondition(nil, nil) {
+		fmt.Printf(`TestInstanceEventDeferHandler_OnCondition with 1%% failed`)
+		t.FailNow()
+	}
+}
+
+func TestInstanceEventDeferHandler_HandleChan(t *testing.T) {
+	inst := &pb.MicroServiceInstance{
+		HealthCheck: &pb.HealthCheck{
+			Interval: 4,
+			Times:    0,
+		},
+	}
+	b, _ := json.Marshal(inst)
+	kv1 := &mvccpb.KeyValue{
+		Key:   util.StringToBytesWithNoCopy("/1"),
+		Value: b,
+	}
+	kv2 := &mvccpb.KeyValue{
+		Key:   util.StringToBytesWithNoCopy("/2"),
+		Value: b,
+	}
+	kv3 := &mvccpb.KeyValue{
+		Key:   util.StringToBytesWithNoCopy("/3"),
+		Value: b,
+	}
+
+	cache := NewKvCache(nil, 1)
+	cache.store["/1"] = kv1
+	cache.store["/2"] = kv2
+	cache.store["/3"] = kv3
+
+	evts1 := []*Event{
+		{
+			Type:   pb.EVT_CREATE,
+			Object: kv1,
+		},
+		{
+			Type:   pb.EVT_UPDATE,
+			Object: kv1,
+		},
+	}
+	evts2 := []*Event{
+		{
+			Type:   pb.EVT_DELETE,
+			Object: kv2,
+		},
+		{
+			Type:   pb.EVT_DELETE,
+			Object: kv3,
+		},
+	}
+	evts3 := []*Event{
+		{
+			Type:   pb.EVT_CREATE,
+			Object: kv2,
+		},
+	}
+
+	iedh := &InstanceEventDeferHandler{
+		Percent: 0.01,
+	}
+
+	iedh.OnCondition(cache, evts1)
+	iedh.OnCondition(cache, evts2)
+	iedh.OnCondition(cache, evts3)
+
+	getEvents(t, iedh)
+
+	iedh.Percent = 0.8
+	iedh.OnCondition(cache, evts1)
+	iedh.OnCondition(cache, evts2)
+	iedh.OnCondition(cache, evts3)
+
+	getEvents(t, iedh)
+}
+
+func getEvents(t *testing.T, iedh *InstanceEventDeferHandler) {
+	fmt.Println(time.Now())
+	c := time.After(3 * time.Second)
+	var evt3 *Event
+	for {
+		select {
+		case evt := <-iedh.HandleChan():
+			fmt.Println(time.Now(), evt)
+			if string(evt.Object.(*mvccpb.KeyValue).Key) == "/3" {
+				evt3 = evt
+				if iedh.Percent == 0.01 && evt.Type == pb.EVT_DELETE {
+					fmt.Printf(`TestInstanceEventDeferHandler_HandleChan with 1%% failed`)
+					t.FailNow()
+				}
+			}
+			continue
+		case <-c:
+			if iedh.Percent == 0.8 && evt3 == nil {
+				fmt.Printf(`TestInstanceEventDeferHandler_HandleChan with 80%% failed`)
+				t.FailNow()
+			}
+		}
+		break
+	}
+}
diff --git a/server/core/backend/store/indexer.go b/server/core/backend/store/indexer.go
index 528922da..72360503 100644
--- a/server/core/backend/store/indexer.go
+++ b/server/core/backend/store/indexer.go
@@ -264,11 +264,6 @@ func (i *Indexer) deletePrefixKey(prefix, key string) {
 	if !ok {
 		return
 	}
-	// remove child
-	for k := range i.prefixIndex[key] {
-		i.deletePrefixKey(key, k)
-	}
-
 	delete(m, key)
 
 	// remove parent which has no child
diff --git a/server/core/backend/store/listwatch.go b/server/core/backend/store/listwatch.go
index 9e210bcf..189f9094 100644
--- a/server/core/backend/store/listwatch.go
+++ b/server/core/backend/store/listwatch.go
@@ -32,7 +32,7 @@ const EVENT_BUS_MAX_SIZE = 1000
 type Event struct {
 	Revision int64
 	Type     proto.EventType
-	Key      string
+	Prefix   string
 	Object   interface{}
 }
 
@@ -93,7 +93,7 @@ func (lw *ListWatcher) doWatch(ctx context.Context, f func(evt []*Event)) error
 
 				evts := make([]*Event, len(resp.Kvs))
 				for i, kv := range resp.Kvs {
-					evt := &Event{Key: lw.Key, Revision: kv.ModRevision}
+					evt := &Event{Prefix: lw.Key, Revision: kv.ModRevision}
 					switch {
 					case resp.Action == registry.Put && kv.Version == 1:
 						evt.Type, evt.Object = proto.EVT_CREATE, kv
@@ -148,9 +148,9 @@ func (w *Watcher) process() {
 	}
 }
 
-func (w *Watcher) sendEvent(evt []*Event) {
+func (w *Watcher) sendEvent(evts []*Event) {
 	defer util.RecoverAndReport()
-	w.bus <- evt
+	w.bus <- evts
 }
 
 func (w *Watcher) Stop() {
@@ -168,7 +168,7 @@ func (w *Watcher) Stop() {
 func errEvent(key string, err error) *Event {
 	return &Event{
 		Type:   proto.EVT_ERROR,
-		Key:    key,
+		Prefix: key,
 		Object: err,
 	}
 }
diff --git a/server/core/backend/store/opt.go b/server/core/backend/store/opt.go
index 252f2449..86142b15 100644
--- a/server/core/backend/store/opt.go
+++ b/server/core/backend/store/opt.go
@@ -34,7 +34,7 @@ type KvCacherCfg struct {
 	Timeout            time.Duration
 	Period             time.Duration
 	OnEvent            KvEventFunc
-	DeferHander        DeferHandler
+	DeferHandler       DeferHandler
 }
 
 func (cfg KvCacherCfg) String() string {
@@ -65,7 +65,7 @@ func WithEventFunc(f KvEventFunc) KvCacherCfgOption {
 }
 
 func WithDeferHandler(h DeferHandler) KvCacherCfgOption {
-	return func(cfg *KvCacherCfg) { cfg.DeferHander = h }
+	return func(cfg *KvCacherCfg) { cfg.DeferHandler = h }
 }
 
 func DefaultKvCacherConfig() KvCacherCfg {
diff --git a/server/core/backend/store/store.go b/server/core/backend/store/store.go
index d2f89e79..6a5f38c3 100644
--- a/server/core/backend/store/store.go
+++ b/server/core/backend/store/store.go
@@ -124,13 +124,6 @@ func (s *KvStore) Initialize() {
 
 func (s *KvStore) dispatchEvent(t StoreType, evt *KvEvent) {
 	s.indexers[t].OnCacheEvent(evt)
-	select {
-	case <-s.Ready():
-	default:
-		if evt.Action == pb.EVT_CREATE {
-			evt.Action = pb.EVT_INIT
-		}
-	}
 	EventProxy(t).OnEvent(evt)
 }
 
diff --git a/server/infra/quota/quota.go b/server/infra/quota/quota.go
index 84dc579c..acff48e5 100644
--- a/server/infra/quota/quota.go
+++ b/server/infra/quota/quota.go
@@ -18,8 +18,8 @@ package quota
 
 import (
 	"fmt"
-	"golang.org/x/net/context"
 	scerr "github.com/apache/incubator-servicecomb-service-center/server/error"
+	"golang.org/x/net/context"
 )
 
 type ApplyQuotaResult struct {
diff --git a/server/plugin/infra/registry/etcd/etcd.go b/server/plugin/infra/registry/etcd/etcd.go
index d848eb92..1e57ee1c 100644
--- a/server/plugin/infra/registry/etcd/etcd.go
+++ b/server/plugin/infra/registry/etcd/etcd.go
@@ -501,49 +501,55 @@ func (c *EtcdClient) Watch(ctx context.Context, opts ...registry.PluginOpOption)
 				return
 			case resp, ok = <-ws:
 				if !ok {
-					err := errors.New("channel is closed")
-					return err
+					err = errors.New("channel is closed")
+					return
 				}
 				// cause a rpc ResourceExhausted error if watch response body larger then 4MB
 				if err = resp.Err(); err != nil {
-					return err
+					return
 				}
 
-				l := len(resp.Events)
-				kvs := make([]*mvccpb.KeyValue, l)
-				pIdx, prevAction := 0, mvccpb.PUT
-				pResp := &registry.PluginResponse{Action: registry.Put, Succeeded: true}
-
-				for _, evt := range resp.Events {
-					if prevAction != evt.Type {
-						prevAction = evt.Type
-
-						if pIdx > 0 {
-							err = setResponseAndCallback(pResp, kvs[:pIdx], op.WatchCallback)
-							if err != nil {
-								return
-							}
-							pIdx = 0
-						}
-					}
+				err = dispatch(resp.Events, op.WatchCallback)
+				if err != nil {
+					return
+				}
+			}
+		}
+	}
+	return fmt.Errorf("no key has been watched")
+}
 
-					pResp.Revision = evt.Kv.ModRevision
-					pResp.Action = setKvsAndConvertAction(kvs, pIdx, evt)
+func dispatch(evts []*clientv3.Event, cb registry.WatchCallback) error {
+	l := len(evts)
+	kvs := make([]*mvccpb.KeyValue, l)
+	sIdx, eIdx, prevAction := 0, 0, mvccpb.PUT
+	pResp := &registry.PluginResponse{Action: registry.Put, Succeeded: true}
 
-					pIdx++
-				}
+	for _, evt := range evts {
+		if prevAction != evt.Type {
+			prevAction = evt.Type
 
-				if pIdx > 0 {
-					err = setResponseAndCallback(pResp, kvs[:pIdx], op.WatchCallback)
-					if err != nil {
-						return
-					}
+			if eIdx > 0 {
+				err := setResponseAndCallback(pResp, kvs[sIdx:eIdx], cb)
+				if err != nil {
+					return err
 				}
+				sIdx = eIdx
 			}
 		}
+
+		if pResp.Revision < evt.Kv.ModRevision {
+			pResp.Revision = evt.Kv.ModRevision
+		}
+		pResp.Action = setKvsAndConvertAction(kvs, eIdx, evt)
+
+		eIdx++
 	}
-	err = fmt.Errorf("no key has been watched")
-	return
+
+	if eIdx > 0 {
+		return setResponseAndCallback(pResp, kvs[sIdx:eIdx], cb)
+	}
+	return nil
 }
 
 func setKvsAndConvertAction(kvs []*mvccpb.KeyValue, pIdx int, evt *clientv3.Event) registry.ActionType {
@@ -564,12 +570,7 @@ func setKvsAndConvertAction(kvs []*mvccpb.KeyValue, pIdx int, evt *clientv3.Even
 func setResponseAndCallback(pResp *registry.PluginResponse, kvs []*mvccpb.KeyValue, cb registry.WatchCallback) error {
 	pResp.Count = int64(len(kvs))
 	pResp.Kvs = kvs
-
-	err := cb("key information changed", pResp)
-	if err != nil {
-		return err
-	}
-	return nil
+	return cb("key information changed", pResp)
 }
 
 func NewRegistry() mgr.PluginInstance {
diff --git a/server/rest/controller/v4/microservice_controller.go b/server/rest/controller/v4/microservice_controller.go
index b0b177ff..795c72c8 100644
--- a/server/rest/controller/v4/microservice_controller.go
+++ b/server/rest/controller/v4/microservice_controller.go
@@ -102,7 +102,6 @@ func (this *MicroServiceService) Unregister(w http.ResponseWriter, r *http.Reque
 
 func (this *MicroServiceService) GetServices(w http.ResponseWriter, r *http.Request) {
 	request := &pb.GetServicesRequest{}
-	util.Logger().Debugf("domain is %s", util.ParseDomain(r.Context()))
 	resp, _ := core.ServiceAPI.GetServices(r.Context(), request)
 	respInternal := resp.Response
 	resp.Response = nil
diff --git a/server/service/event/dependency_event_handler.go b/server/service/event/dependency_event_handler.go
index 99808e11..d1bd3118 100644
--- a/server/service/event/dependency_event_handler.go
+++ b/server/service/event/dependency_event_handler.go
@@ -91,9 +91,9 @@ func (h *DependencyEventHandler) loop() {
 }
 
 type DependencyEventHandlerResource struct {
-	dep    *pb.ConsumerDependency
-	kv     *mvccpb.KeyValue
-	domainProject  string
+	dep           *pb.ConsumerDependency
+	kv            *mvccpb.KeyValue
+	domainProject string
 }
 
 func NewDependencyEventHandlerResource(dep *pb.ConsumerDependency, kv *mvccpb.KeyValue, domainProject string) *DependencyEventHandlerResource {
@@ -150,7 +150,7 @@ func (h *DependencyEventHandler) Handle() error {
 
 	dependencyRuleHandleResults := make(chan error, len(resourcesMap))
 	for lockKey, resources := range resourcesMap {
-		go func(lockKey string, resources []*DependencyEventHandlerResource){
+		go func(lockKey string, resources []*DependencyEventHandlerResource) {
 			err := h.dependencyRuleHandle(ctx, lockKey, resources)
 			dependencyRuleHandleResults <- err
 		}(lockKey, resources)
@@ -169,7 +169,7 @@ func (h *DependencyEventHandler) Handle() error {
 	return lastErr
 }
 
-func (h *DependencyEventHandler)dependencyRuleHandle(ctx context.Context, lockKey string, resources []*DependencyEventHandlerResource) error{
+func (h *DependencyEventHandler) dependencyRuleHandle(ctx context.Context, lockKey string, resources []*DependencyEventHandlerResource) error {
 	lock, err := serviceUtil.DependencyLock(lockKey)
 	if err != nil {
 		util.Logger().Errorf(err, "create dependency rule locker failed")
diff --git a/server/service/instances.go b/server/service/instances.go
index 1f066fb4..2e12fcf6 100644
--- a/server/service/instances.go
+++ b/server/service/instances.go
@@ -589,9 +589,13 @@ func (s *InstanceService) Find(ctx context.Context, in *pb.FindInstancesRequest)
 		}, nil
 	}
 
-	instances := make([]*pb.MicroServiceInstance, 0)
+	var instances []*pb.MicroServiceInstance
+	cloneCtx := ctx
+	if s, ok := ctx.Value("noCache").(string); !ok || s != "1" {
+		cloneCtx = util.SetContext(util.CloneContext(ctx), "cacheOnly", "1")
+	}
 	for _, serviceId := range ids {
-		resp, err := s.GetInstances(ctx, &pb.GetInstancesRequest{
+		resp, err := s.GetInstances(cloneCtx, &pb.GetInstancesRequest{
 			ConsumerServiceId: in.ConsumerServiceId,
 			ProviderServiceId: serviceId,
 			Tags:              in.Tags,
diff --git a/server/service/instances_test.go b/server/service/instances_test.go
index 355a6153..dfba6d3a 100644
--- a/server/service/instances_test.go
+++ b/server/service/instances_test.go
@@ -909,7 +909,7 @@ var _ = Describe("'Instance' service", func() {
 
 				respFind, err = instanceResource.Find(
 					util.SetTargetDomainProject(
-						util.SetDomainProject(context.Background(), "user", "user"),
+						util.SetDomainProject(util.CloneContext(getContext()), "user", "user"),
 						"default", "default"),
 					&pb.FindInstancesRequest{
 						ConsumerServiceId: serviceId6,
diff --git a/server/service/microservices.go b/server/service/microservices.go
index 319c1088..dcaa4be6 100644
--- a/server/service/microservices.go
+++ b/server/service/microservices.go
@@ -18,6 +18,7 @@ package service
 
 import (
 	"encoding/json"
+	"errors"
 	"fmt"
 	"github.com/apache/incubator-servicecomb-service-center/pkg/util"
 	"github.com/apache/incubator-servicecomb-service-center/server/core"
@@ -33,7 +34,6 @@ import (
 	"golang.org/x/net/context"
 	"strconv"
 	"time"
-	"errors"
 )
 
 type MicroServiceService struct {
@@ -130,6 +130,7 @@ func (s *MicroServiceService) CreateServicePri(ctx context.Context, in *pb.Creat
 	index := apt.GenerateServiceIndexKey(serviceKey)
 	indexBytes := util.StringToBytesWithNoCopy(index)
 	aliasBytes := util.StringToBytesWithNoCopy(apt.GenerateServiceAliasKey(serviceKey))
+
 	opts := []registry.PluginOp{
 		registry.OpPut(registry.WithStrKey(key), registry.WithValue(data)),
 		registry.OpPut(registry.WithKey(indexBytes), registry.WithStrValue(serviceId)),
diff --git a/server/service/rule.go b/server/service/rule.go
index 287dbc2b..40609077 100644
--- a/server/service/rule.go
+++ b/server/service/rule.go
@@ -64,7 +64,6 @@ func (s *MicroServiceService) AddRule(ctx context.Context, in *pb.AddServiceRule
 		return response, nil
 	}
 
-
 	ruleType, _, err := serviceUtil.GetServiceRuleType(ctx, domainProject, in.ServiceId)
 	util.Logger().Debugf("ruleType is %s", ruleType)
 	if err != nil {
diff --git a/server/service/util/dependency.go b/server/service/util/dependency.go
index 5bfb9789..68c6bf71 100644
--- a/server/service/util/dependency.go
+++ b/server/service/util/dependency.go
@@ -21,6 +21,7 @@ import (
 	"errors"
 	"fmt"
 	"github.com/apache/incubator-servicecomb-service-center/pkg/cache"
+	"github.com/apache/incubator-servicecomb-service-center/pkg/etcdsync"
 	"github.com/apache/incubator-servicecomb-service-center/pkg/util"
 	apt "github.com/apache/incubator-servicecomb-service-center/server/core"
 	"github.com/apache/incubator-servicecomb-service-center/server/core/backend"
@@ -28,11 +29,10 @@ import (
 	pb "github.com/apache/incubator-servicecomb-service-center/server/core/proto"
 	scerr "github.com/apache/incubator-servicecomb-service-center/server/error"
 	"github.com/apache/incubator-servicecomb-service-center/server/infra/registry"
+	"github.com/apache/incubator-servicecomb-service-center/server/mux"
 	"golang.org/x/net/context"
 	"strings"
 	"time"
-	"github.com/apache/incubator-servicecomb-service-center/pkg/etcdsync"
-	"github.com/apache/incubator-servicecomb-service-center/server/mux"
 )
 
 var consumerCache *cache.Cache
@@ -1059,5 +1059,5 @@ func DependencyLock(lockKey string) (*etcdsync.DLock, error) {
 }
 
 func NewDependencyLockKey(domainProject, env string) string {
-	return util.StringJoin([]string{"","env-lock", domainProject, env}, "/")
-}
\ No newline at end of file
+	return util.StringJoin([]string{"", "env-lock", domainProject, env}, "/")
+}


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services