You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicecomb.apache.org by as...@apache.org on 2018/06/25 04:44:15 UTC

[incubator-servicecomb-service-center] branch master updated: SCB-680 Optimize find instance api (#376)

This is an automated email from the ASF dual-hosted git repository.

asifdxtreme pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-servicecomb-service-center.git


The following commit(s) were added to refs/heads/master by this push:
     new 67e0f0b  SCB-680 Optimize find instance api (#376)
67e0f0b is described below

commit 67e0f0bd5b71e5d8034dc3ca84a13f88ba8692df
Author: little-cui <su...@qq.com>
AuthorDate: Mon Jun 25 12:44:12 2018 +0800

    SCB-680 Optimize find instance api (#376)
    
    * SCB-680 Optimize find instance api
---
 server/service/dependency.go                       |  2 -
 server/service/event/instance_event_handler.go     | 19 +++++-
 server/service/event/rule_event_handler.go         |  2 +-
 server/service/event/tag_event_handler.go          |  2 +-
 server/service/instance.go                         | 26 +++++++-
 server/service/{util => notification}/common.go    | 41 +++++++++++--
 server/service/notification/listwatcher.go         | 25 +++++---
 .../{util/common.go => notification/notice.go}     | 32 +++++++---
 server/service/notification/stream.go              | 54 ++++++++++++++++
 .../notification/{struct.go => subscriber.go}      | 58 ------------------
 .../notification/{watch_util.go => websocket.go}   | 57 +----------------
 server/service/util/common.go                      |  4 ++
 server/service/util/find_cache.go                  | 71 ++++++++++++++++++++++
 server/service/util/find_cache_test.go             | 60 ++++++++++++++++++
 server/service/util/instance_util.go               | 11 ++--
 server/service/util/instance_util_test.go          |  8 +--
 server/service/watch.go                            |  2 +-
 17 files changed, 322 insertions(+), 152 deletions(-)

diff --git a/server/service/dependency.go b/server/service/dependency.go
index eac1aa2..4c13dd0 100644
--- a/server/service/dependency.go
+++ b/server/service/dependency.go
@@ -140,7 +140,6 @@ func (s *MicroServiceService) GetProviderDependencies(ctx context.Context, in *p
 			Response: pb.CreateResponse(scerr.ErrInternal, err.Error()),
 		}, err
 	}
-	util.Logger().Debugf("GetProviderDependencies successfully, providerId is %s.", in.ServiceId)
 	return &pb.GetProDependenciesResponse{
 		Response:  pb.CreateResponse(pb.Response_SUCCESS, "Get all consumers successful."),
 		Consumers: services,
@@ -181,7 +180,6 @@ func (s *MicroServiceService) GetConsumerDependencies(ctx context.Context, in *p
 		}, err
 	}
 
-	util.Logger().Debugf("GetConsumerDependencies successfully, consumerId is %s.", consumerId)
 	return &pb.GetConDependenciesResponse{
 		Response:  pb.CreateResponse(pb.Response_SUCCESS, "Get all providers successfully."),
 		Providers: services,
diff --git a/server/service/event/instance_event_handler.go b/server/service/event/instance_event_handler.go
index 8144649..a80376d 100644
--- a/server/service/event/instance_event_handler.go
+++ b/server/service/event/instance_event_handler.go
@@ -93,7 +93,7 @@ func (h *InstanceEventHandler) OnEvent(evt backend.KvEvent) {
 		return
 	}
 
-	nf.PublishInstanceEvent(domainProject, action, &pb.MicroServiceKey{
+	PublishInstanceEvent(domainProject, action, &pb.MicroServiceKey{
 		Environment: ms.Environment,
 		AppId:       ms.AppId,
 		ServiceName: ms.ServiceName,
@@ -104,3 +104,20 @@ func (h *InstanceEventHandler) OnEvent(evt backend.KvEvent) {
 func NewInstanceEventHandler() *InstanceEventHandler {
 	return &InstanceEventHandler{}
 }
+
+func PublishInstanceEvent(domainProject string, action pb.EventType, serviceKey *pb.MicroServiceKey, instance *pb.MicroServiceInstance, rev int64, subscribers []string) {
+	response := &pb.WatchInstanceResponse{
+		Response: pb.CreateResponse(pb.Response_SUCCESS, "Watch instance successfully."),
+		Action:   string(action),
+		Key:      serviceKey,
+		Instance: instance,
+	}
+	for _, consumerId := range subscribers {
+		// expires cache
+		serviceUtil.FindInstancesCache.Delete(domainProject, consumerId, serviceKey)
+
+		// TODO add超时怎么处理?
+		job := nf.NewWatchJob(consumerId, apt.GetInstanceRootKey(domainProject)+"/", rev, response)
+		nf.GetNotifyService().AddJob(job)
+	}
+}
diff --git a/server/service/event/rule_event_handler.go b/server/service/event/rule_event_handler.go
index 0999dd0..474811e 100644
--- a/server/service/event/rule_event_handler.go
+++ b/server/service/event/rule_event_handler.go
@@ -69,7 +69,7 @@ func (apt *RulesChangedTask) publish(ctx context.Context, domainProject, provide
 	}
 	providerKey := pb.MicroServiceToKey(domainProject, provider)
 
-	nf.PublishInstanceEvent(domainProject, pb.EVT_EXPIRE, providerKey, nil, rev, consumerIds)
+	PublishInstanceEvent(domainProject, pb.EVT_EXPIRE, providerKey, nil, rev, consumerIds)
 	return nil
 }
 
diff --git a/server/service/event/tag_event_handler.go b/server/service/event/tag_event_handler.go
index f87f037..a83d2c5 100644
--- a/server/service/event/tag_event_handler.go
+++ b/server/service/event/tag_event_handler.go
@@ -73,7 +73,7 @@ func (apt *TagsChangedTask) publish(ctx context.Context, domainProject, consumer
 			util.Logger().Warnf(err, "get service %s file failed", providerId)
 			continue
 		}
-		nf.PublishInstanceEvent(domainProject, pb.EVT_EXPIRE,
+		PublishInstanceEvent(domainProject, pb.EVT_EXPIRE,
 			&pb.MicroServiceKey{
 				Environment: provider.Environment,
 				AppId:       provider.AppId,
diff --git a/server/service/instance.go b/server/service/instance.go
index bca599a..1618886 100644
--- a/server/service/instance.go
+++ b/server/service/instance.go
@@ -524,6 +524,7 @@ func (s *InstanceService) Find(ctx context.Context, in *pb.FindInstancesRequest)
 		AppId:       in.AppId,
 		ServiceName: in.ServiceName,
 		Alias:       in.ServiceName,
+		Version:     in.VersionRule,
 	}
 	if apt.IsShared(provider) {
 		// it means the shared micro-services must be the same env with SC.
@@ -536,6 +537,23 @@ func (s *InstanceService) Find(ctx context.Context, in *pb.FindInstancesRequest)
 		provider.Tenant = util.ParseTargetDomainProject(ctx)
 	}
 
+	// cache
+	if item := serviceUtil.FindInstancesCache.Get(provider.Tenant, in.ConsumerServiceId, provider); item != nil {
+		noCache, cacheOnly := ctx.Value(serviceUtil.CTX_NOCACHE) == "1", ctx.Value(serviceUtil.CTX_CACHEONLY) == "1"
+		rev, _ := ctx.Value(serviceUtil.CTX_REQUEST_REVISION).(int64)
+		if !noCache && (cacheOnly || rev <= item.Rev) {
+			instances := item.Instances
+			if rev == item.Rev {
+				instances = instances[:0]
+			}
+			util.SetContext(ctx, serviceUtil.CTX_RESPONSE_REVISION, item.Rev)
+			return &pb.FindInstancesResponse{
+				Response:  pb.CreateResponse(pb.Response_SUCCESS, "Query service instances successfully."),
+				Instances: instances,
+			}, nil
+		}
+	}
+
 	// 版本规则
 	ids, err := serviceUtil.FindServiceIds(ctx, in.VersionRule, provider)
 	if err != nil {
@@ -583,13 +601,19 @@ func (s *InstanceService) Find(ctx context.Context, in *pb.FindInstancesRequest)
 		}
 	}
 
-	instances, err := serviceUtil.GetAllInstancesOfServices(ctx, util.ParseTargetDomainProject(ctx), ids)
+	instances, rev, err := serviceUtil.GetAllInstancesOfServices(ctx, util.ParseTargetDomainProject(ctx), ids)
 	if err != nil {
 		util.Logger().Errorf(err, "find instance failed, %s: GetAllInstancesOfServices failed.", findFlag)
 		return &pb.FindInstancesResponse{
 			Response: pb.CreateResponse(scerr.ErrInternal, err.Error()),
 		}, err
 	}
+
+	serviceUtil.FindInstancesCache.Set(provider.Tenant, in.ConsumerServiceId, provider, &serviceUtil.VersionRuleCacheItem{
+		Instances: instances,
+		Rev:       rev,
+	})
+	util.SetContext(ctx, serviceUtil.CTX_RESPONSE_REVISION, rev)
 	return &pb.FindInstancesResponse{
 		Response:  pb.CreateResponse(pb.Response_SUCCESS, "Query service instances successfully."),
 		Instances: instances,
diff --git a/server/service/util/common.go b/server/service/notification/common.go
similarity index 52%
copy from server/service/util/common.go
copy to server/service/notification/common.go
index 6c25e51..7e2e16a 100644
--- a/server/service/util/common.go
+++ b/server/service/notification/common.go
@@ -14,12 +14,41 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package util
+package notification
+
+import (
+	"fmt"
+	"strconv"
+	"time"
+)
 
 const (
-	HEADER_REV            = "X-Resource-Revision"
-	CTX_NOCACHE           = "noCache"
-	CTX_CACHEONLY         = "cacheOnly"
-	CTX_REQUEST_REVISION  = "requestRev"
-	CTX_RESPONSE_REVISION = "responseRev"
+	DEFAULT_MAX_QUEUE          = 1000
+	DEFAULT_INIT_SUBSCRIBERS   = 1000
+	DEFAULT_ON_MESSAGE_TIMEOUT = 100 * time.Millisecond
+	DEFAULT_TIMEOUT            = 30 * time.Second
+
+	NOTIFTY NotifyType = iota
+	INSTANCE
+	typeEnd
 )
+
+type NotifyType int
+
+func (nt NotifyType) String() string {
+	if int(nt) < len(notifyTypeNames) {
+		return notifyTypeNames[nt]
+	}
+	return "NotifyType" + strconv.Itoa(int(nt))
+}
+
+type NotifyServiceConfig struct {
+	AddTimeout    time.Duration
+	NotifyTimeout time.Duration
+	MaxQueue      int64
+}
+
+func (nsc NotifyServiceConfig) String() string {
+	return fmt.Sprintf("{acceptQueue: %d, accept: %s, notify: %s}",
+		nsc.MaxQueue, nsc.AddTimeout, nsc.NotifyTimeout)
+}
diff --git a/server/service/notification/listwatcher.go b/server/service/notification/listwatcher.go
index 5d911a2..a6942c9 100644
--- a/server/service/notification/listwatcher.go
+++ b/server/service/notification/listwatcher.go
@@ -32,7 +32,7 @@ type WatchJob struct {
 
 type ListWatcher struct {
 	BaseSubscriber
-	Job          chan NotifyJob
+	Job          chan *WatchJob
 	ListRevision int64
 	ListFunc     func() (results []*pb.WatchInstanceResponse, rev int64)
 
@@ -56,7 +56,7 @@ func (w *ListWatcher) listAndPublishJobs(_ context.Context) {
 	results, rev := w.ListFunc()
 	w.ListRevision = rev
 	for _, response := range results {
-		w.sendMessage(NewWatchJob(w.Type(), w.Id(), w.Subject(), w.ListRevision, response))
+		w.sendMessage(NewWatchJob(w.Id(), w.Subject(), w.ListRevision, response))
 	}
 }
 
@@ -66,6 +66,11 @@ func (w *ListWatcher) OnMessage(job NotifyJob) {
 		return
 	}
 
+	wJob, ok := job.(*WatchJob)
+	if !ok {
+		return
+	}
+
 	timer := time.NewTimer(DEFAULT_ON_MESSAGE_TIMEOUT)
 	select {
 	case <-w.listCh:
@@ -77,16 +82,16 @@ func (w *ListWatcher) OnMessage(job NotifyJob) {
 		return
 	}
 
-	if job.(*WatchJob).Revision <= w.ListRevision {
+	if wJob.Revision <= w.ListRevision {
 		util.Logger().Warnf(nil,
 			"unexpected notify %s job is coming in, watcher %s %s, job is %v, current revision is %v",
 			w.Type(), w.Id(), w.Subject(), job, w.ListRevision)
 		return
 	}
-	w.sendMessage(job)
+	w.sendMessage(wJob)
 }
 
-func (w *ListWatcher) sendMessage(job NotifyJob) {
+func (w *ListWatcher) sendMessage(job *WatchJob) {
 	util.Logger().Debugf("start to notify %s watcher %s %s, job is %v, current revision is %v", w.Type(),
 		w.Id(), w.Subject(), job, w.ListRevision)
 	defer util.RecoverAndReport()
@@ -105,27 +110,27 @@ func (w *ListWatcher) Close() {
 	close(w.Job)
 }
 
-func NewWatchJob(nType NotifyType, subscriberId, subject string, rev int64, response *pb.WatchInstanceResponse) *WatchJob {
+func NewWatchJob(subscriberId, subject string, rev int64, response *pb.WatchInstanceResponse) *WatchJob {
 	return &WatchJob{
 		BaseNotifyJob: BaseNotifyJob{
 			subscriberId: subscriberId,
 			subject:      subject,
-			nType:        nType,
+			nType:        INSTANCE,
 		},
 		Revision: rev,
 		Response: response,
 	}
 }
 
-func NewListWatcher(nType NotifyType, id string, subject string,
+func NewListWatcher(id string, subject string,
 	listFunc func() (results []*pb.WatchInstanceResponse, rev int64)) *ListWatcher {
 	watcher := &ListWatcher{
 		BaseSubscriber: BaseSubscriber{
 			id:      id,
 			subject: subject,
-			nType:   nType,
+			nType:   INSTANCE,
 		},
-		Job:      make(chan NotifyJob, DEFAULT_MAX_QUEUE),
+		Job:      make(chan *WatchJob, DEFAULT_MAX_QUEUE),
 		ListFunc: listFunc,
 		listCh:   make(chan struct{}),
 	}
diff --git a/server/service/util/common.go b/server/service/notification/notice.go
similarity index 66%
copy from server/service/util/common.go
copy to server/service/notification/notice.go
index 6c25e51..498d435 100644
--- a/server/service/util/common.go
+++ b/server/service/notification/notice.go
@@ -14,12 +14,28 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package util
+package notification
 
-const (
-	HEADER_REV            = "X-Resource-Revision"
-	CTX_NOCACHE           = "noCache"
-	CTX_CACHEONLY         = "cacheOnly"
-	CTX_REQUEST_REVISION  = "requestRev"
-	CTX_RESPONSE_REVISION = "responseRev"
-)
+type NotifyJob interface {
+	SubscriberId() string
+	Subject() string
+	Type() NotifyType
+}
+
+type BaseNotifyJob struct {
+	subscriberId string
+	subject      string
+	nType        NotifyType
+}
+
+func (s *BaseNotifyJob) SubscriberId() string {
+	return s.subscriberId
+}
+
+func (s *BaseNotifyJob) Subject() string {
+	return s.subject
+}
+
+func (s *BaseNotifyJob) Type() NotifyType {
+	return s.nType
+}
diff --git a/server/service/notification/stream.go b/server/service/notification/stream.go
new file mode 100644
index 0000000..101473d
--- /dev/null
+++ b/server/service/notification/stream.go
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package notification
+
+import (
+	"errors"
+	"github.com/apache/incubator-servicecomb-service-center/pkg/util"
+	pb "github.com/apache/incubator-servicecomb-service-center/server/core/proto"
+	"time"
+)
+
+func HandleWatchJob(watcher *ListWatcher, stream pb.ServiceInstanceCtrl_WatchServer, timeout time.Duration) (err error) {
+	for {
+		timer := time.NewTimer(timeout)
+		select {
+		case <-timer.C:
+			// TODO grpc 长连接心跳?
+		case job := <-watcher.Job:
+			timer.Stop()
+
+			if job == nil {
+				err = errors.New("channel is closed")
+				util.Logger().Errorf(err, "watcher %s %s caught an exception",
+					watcher.Subject(), watcher.Id())
+				return
+			}
+			resp := job.Response
+			util.Logger().Infof("event is coming in, watcher %s %s",
+				watcher.Subject(), watcher.Id())
+
+			err = stream.Send(resp)
+			if err != nil {
+				util.Logger().Errorf(err, "send message error, watcher %s %s",
+					watcher.Subject(), watcher.Id())
+				watcher.SetError(err)
+				return
+			}
+		}
+	}
+}
diff --git a/server/service/notification/struct.go b/server/service/notification/subscriber.go
similarity index 64%
rename from server/service/notification/struct.go
rename to server/service/notification/subscriber.go
index f6564ff..9f0b4df 100644
--- a/server/service/notification/struct.go
+++ b/server/service/notification/subscriber.go
@@ -18,42 +18,8 @@ package notification
 
 import (
 	"errors"
-	"fmt"
-	"strconv"
-	"time"
 )
 
-const (
-	DEFAULT_MAX_QUEUE          = 1000
-	DEFAULT_INIT_SUBSCRIBERS   = 1000
-	DEFAULT_ON_MESSAGE_TIMEOUT = 100 * time.Millisecond
-	DEFAULT_TIMEOUT            = 30 * time.Second
-
-	NOTIFTY NotifyType = iota
-	INSTANCE
-	typeEnd
-)
-
-type NotifyType int
-
-func (nt NotifyType) String() string {
-	if int(nt) < len(notifyTypeNames) {
-		return notifyTypeNames[nt]
-	}
-	return "NotifyType" + strconv.Itoa(int(nt))
-}
-
-type NotifyServiceConfig struct {
-	AddTimeout    time.Duration
-	NotifyTimeout time.Duration
-	MaxQueue      int64
-}
-
-func (nsc NotifyServiceConfig) String() string {
-	return fmt.Sprintf("{acceptQueue: %d, accept: %s, notify: %s}",
-		nsc.MaxQueue, nsc.AddTimeout, nsc.NotifyTimeout)
-}
-
 type Subscriber interface {
 	Err() error
 	SetError(err error)
@@ -68,12 +34,6 @@ type Subscriber interface {
 	Close()
 }
 
-type NotifyJob interface {
-	SubscriberId() string
-	Subject() string
-	Type() NotifyType
-}
-
 type BaseSubscriber struct {
 	id      string
 	subject string
@@ -122,21 +82,3 @@ func (s *BaseSubscriber) OnMessage(job NotifyJob) {
 func (s *BaseSubscriber) Close() {
 
 }
-
-type BaseNotifyJob struct {
-	subscriberId string
-	subject      string
-	nType        NotifyType
-}
-
-func (s *BaseNotifyJob) SubscriberId() string {
-	return s.subscriberId
-}
-
-func (s *BaseNotifyJob) Subject() string {
-	return s.subject
-}
-
-func (s *BaseNotifyJob) Type() NotifyType {
-	return s.nType
-}
diff --git a/server/service/notification/watch_util.go b/server/service/notification/websocket.go
similarity index 82%
rename from server/service/notification/watch_util.go
rename to server/service/notification/websocket.go
index 3a13500..f53e641 100644
--- a/server/service/notification/watch_util.go
+++ b/server/service/notification/websocket.go
@@ -18,7 +18,6 @@ package notification
 
 import (
 	"encoding/json"
-	"errors"
 	"fmt"
 	"github.com/apache/incubator-servicecomb-service-center/pkg/util"
 	apt "github.com/apache/incubator-servicecomb-service-center/server/core"
@@ -29,36 +28,6 @@ import (
 	"time"
 )
 
-func HandleWatchJob(watcher *ListWatcher, stream pb.ServiceInstanceCtrl_WatchServer, timeout time.Duration) (err error) {
-	for {
-		timer := time.NewTimer(timeout)
-		select {
-		case <-timer.C:
-		// TODO grpc 长连接心跳?
-		case job := <-watcher.Job:
-			timer.Stop()
-
-			if job == nil {
-				err = errors.New("channel is closed")
-				util.Logger().Errorf(err, "watcher %s %s caught an exception",
-					watcher.Subject(), watcher.Id())
-				return
-			}
-			resp := job.(*WatchJob).Response
-			util.Logger().Infof("event is coming in, watcher %s %s",
-				watcher.Subject(), watcher.Id())
-
-			err = stream.Send(resp)
-			if err != nil {
-				util.Logger().Errorf(err, "send message error, watcher %s %s",
-					watcher.Subject(), watcher.Id())
-				watcher.SetError(err)
-				return
-			}
-		}
-	}
-}
-
 type WebSocketHandler struct {
 	ctx             context.Context
 	conn            *websocket.Conn
@@ -71,7 +40,7 @@ type WebSocketHandler struct {
 func (wh *WebSocketHandler) Init() error {
 	remoteAddr := wh.conn.RemoteAddr().String()
 	if err := GetNotifyService().AddSubscriber(wh.watcher); err != nil {
-		err = fmt.Errorf("establish[%s] websocket watch failed: notify service error, %s.",
+		err = fmt.Errorf("establish[%s] websocket watch failed: notify service error, %s",
 			remoteAddr, err.Error())
 		util.Logger().Errorf(nil, err.Error())
 
@@ -207,7 +176,7 @@ func (wh *WebSocketHandler) HandleWatchWebSocketJob() {
 				return
 			}
 
-			resp := job.(*WatchJob).Response
+			resp := job.Response
 
 			providerFlag := fmt.Sprintf("%s/%s/%s", resp.Key.AppId, resp.Key.ServiceName, resp.Key.Version)
 			if resp.Action != string(pb.EVT_EXPIRE) {
@@ -261,7 +230,7 @@ func DoWebSocketListAndWatch(ctx context.Context, serviceId string, f func() ([]
 	handler := &WebSocketHandler{
 		ctx:             ctx,
 		conn:            conn,
-		watcher:         NewInstanceListWatcher(serviceId, apt.GetInstanceRootKey(domainProject)+"/", f),
+		watcher:         NewListWatcher(serviceId, apt.GetInstanceRootKey(domainProject)+"/", f),
 		needPingWatcher: true,
 		closed:          make(chan struct{}),
 		goroutine:       util.NewGo(context.Background()),
@@ -283,23 +252,3 @@ func EstablishWebSocketError(conn *websocket.Conn, err error) {
 		util.Logger().Errorf(err, "establish[%s] websocket watch failed: write message failed.", remoteAddr)
 	}
 }
-
-func PublishInstanceEvent(domainProject string, action pb.EventType, serviceKey *pb.MicroServiceKey, instance *pb.MicroServiceInstance, rev int64, subscribers []string) {
-	response := &pb.WatchInstanceResponse{
-		Response: pb.CreateResponse(pb.Response_SUCCESS, "Watch instance successfully."),
-		Action:   string(action),
-		Key:      serviceKey,
-		Instance: instance,
-	}
-	for _, consumerId := range subscribers {
-		job := NewWatchJob(INSTANCE, consumerId, apt.GetInstanceRootKey(domainProject)+"/", rev, response)
-		util.Logger().Debugf("publish event to notify service, %v", job)
-
-		// TODO add超时怎么处理?
-		GetNotifyService().AddJob(job)
-	}
-}
-
-func NewInstanceListWatcher(selfServiceId, instanceRoot string, listFunc func() (results []*pb.WatchInstanceResponse, rev int64)) *ListWatcher {
-	return NewListWatcher(INSTANCE, selfServiceId, instanceRoot, listFunc)
-}
diff --git a/server/service/util/common.go b/server/service/util/common.go
index 6c25e51..9e31b3f 100644
--- a/server/service/util/common.go
+++ b/server/service/util/common.go
@@ -16,10 +16,14 @@
  */
 package util
 
+import "time"
+
 const (
 	HEADER_REV            = "X-Resource-Revision"
 	CTX_NOCACHE           = "noCache"
 	CTX_CACHEONLY         = "cacheOnly"
 	CTX_REQUEST_REVISION  = "requestRev"
 	CTX_RESPONSE_REVISION = "responseRev"
+
+	cacheTTL = 5 * time.Minute
 )
diff --git a/server/service/util/find_cache.go b/server/service/util/find_cache.go
new file mode 100644
index 0000000..d5c2ab7
--- /dev/null
+++ b/server/service/util/find_cache.go
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package util
+
+import (
+	"errors"
+	"github.com/apache/incubator-servicecomb-service-center/pkg/util"
+	pb "github.com/apache/incubator-servicecomb-service-center/server/core/proto"
+	"github.com/karlseguin/ccache"
+)
+
+var FindInstancesCache = &VersionRuleCache{
+	c: ccache.Layered(ccache.Configure()),
+}
+
+type VersionRuleCacheItem struct {
+	Instances []*pb.MicroServiceInstance
+	Rev       int64
+}
+
+type VersionRuleCache struct {
+	c *ccache.LayeredCache
+}
+
+func (c *VersionRuleCache) primaryKey(domainProject string, provider *pb.MicroServiceKey) string {
+	return util.StringJoin([]string{
+		domainProject,
+		provider.Environment,
+		provider.AppId,
+		provider.ServiceName}, "/")
+}
+
+func (c *VersionRuleCache) Get(domainProject, consumer string, provider *pb.MicroServiceKey) *VersionRuleCacheItem {
+	item, _ := c.c.Fetch(c.primaryKey(domainProject, provider), provider.Version, cacheTTL, func() (interface{}, error) {
+		return nil, errors.New("not exist")
+	})
+	if item == nil || item.Expired() {
+		return nil
+	}
+
+	if v, ok := item.Value().(*util.ConcurrentMap).Get(consumer); ok {
+		return v.(*VersionRuleCacheItem)
+	}
+	return nil
+}
+
+func (c *VersionRuleCache) Set(domainProject, consumer string, provider *pb.MicroServiceKey, item *VersionRuleCacheItem) {
+	c2, _ := c.c.Fetch(c.primaryKey(domainProject, provider), provider.Version, cacheTTL, func() (interface{}, error) {
+		// new one if not exist
+		return util.NewConcurrentMap(1), nil
+	})
+	c2.Value().(*util.ConcurrentMap).Put(consumer, item)
+}
+
+func (c *VersionRuleCache) Delete(domainProject, consumer string, provider *pb.MicroServiceKey) {
+	c.c.DeleteAll(c.primaryKey(domainProject, provider))
+}
diff --git a/server/service/util/find_cache_test.go b/server/service/util/find_cache_test.go
new file mode 100644
index 0000000..9275101
--- /dev/null
+++ b/server/service/util/find_cache_test.go
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package util
+
+import (
+	pb "github.com/apache/incubator-servicecomb-service-center/server/core/proto"
+	"testing"
+)
+
+func TestVersionRuleCache_Get(t *testing.T) {
+	p := &pb.MicroServiceKey{}
+	c := FindInstancesCache.Get("d", "c", p)
+	if c != nil {
+		t.Fatalf("TestVersionRuleCache_Get failed, %v", c)
+	}
+
+	r := &VersionRuleCacheItem{Rev: 1}
+	FindInstancesCache.Set("d", "c", p, r)
+	c = FindInstancesCache.Get("d", "c", p)
+	if c == nil {
+		t.Fatalf("TestVersionRuleCache_Get failed, %v", c)
+	}
+	if c.Rev != 1 {
+		t.Fatalf("TestVersionRuleCache_Get failed, rev %d != 1", c.Rev)
+	}
+	c = FindInstancesCache.Get("d", "c2", p)
+	if c != nil {
+		t.Fatalf("TestVersionRuleCache_Get failed, %v", c)
+	}
+
+	p2 := &pb.MicroServiceKey{ServiceName: "p2"}
+	FindInstancesCache.Set("d", "c2", p, r)
+	FindInstancesCache.Set("d", "c2", p2, r)
+	FindInstancesCache.Delete("d", "c", p)
+	c = FindInstancesCache.Get("d", "c2", p)
+	if c != nil {
+		t.Fatalf("TestVersionRuleCache_Get failed, %v", c)
+	}
+	c = FindInstancesCache.Get("d", "c2", p2)
+	if c == nil {
+		t.Fatalf("TestVersionRuleCache_Get failed, %v", c)
+	}
+	if c.Rev != 1 {
+		t.Fatalf("TestVersionRuleCache_Get failed, rev %d != 1", c.Rev)
+	}
+}
diff --git a/server/service/util/instance_util.go b/server/service/util/instance_util.go
index 6f3b3b3..bf91107 100644
--- a/server/service/util/instance_util.go
+++ b/server/service/util/instance_util.go
@@ -66,11 +66,12 @@ func GetInstance(ctx context.Context, domainProject string, serviceId string, in
 	return instance, nil
 }
 
-func GetAllInstancesOfServices(ctx context.Context, domainProject string, ids []string) (instances []*pb.MicroServiceInstance, err error) {
+func GetAllInstancesOfServices(ctx context.Context, domainProject string, ids []string) (
+	instances []*pb.MicroServiceInstance, rev int64, err error) {
 	cloneCtx := util.CloneContext(ctx)
 	noCache, cacheOnly := ctx.Value(CTX_NOCACHE) == "1", ctx.Value(CTX_CACHEONLY) == "1"
 
-	rev, _ := cloneCtx.Value(CTX_REQUEST_REVISION).(int64)
+	rev, _ = cloneCtx.Value(CTX_REQUEST_REVISION).(int64)
 	if !noCache && !cacheOnly && rev > 0 {
 		// force to find in cache at first time when rev > 0
 		util.SetContext(cloneCtx, CTX_CACHEONLY, "1")
@@ -86,7 +87,7 @@ func GetAllInstancesOfServices(ctx context.Context, domainProject string, ids []
 			opts := append(FromContext(cloneCtx), registry.WithStrKey(key), registry.WithPrefix())
 			resp, err := backend.Store().Instance().Search(cloneCtx, opts...)
 			if err != nil {
-				return nil, err
+				return nil, 0, err
 			}
 
 			if len(resp.Kvs) > 0 {
@@ -122,13 +123,13 @@ func GetAllInstancesOfServices(ctx context.Context, domainProject string, ids []
 		instance := &pb.MicroServiceInstance{}
 		err := json.Unmarshal(kv.Value, instance)
 		if err != nil {
-			return nil, fmt.Errorf("unmarshal %s faild, %s",
+			return nil, 0, fmt.Errorf("unmarshal %s faild, %s",
 				util.BytesToStringWithNoCopy(kv.Key), err.Error())
 		}
 		instances = append(instances, instance)
 	}
 
-	util.SetContext(ctx, CTX_RESPONSE_REVISION, max)
+	rev = max
 	return
 }
 
diff --git a/server/service/util/instance_util_test.go b/server/service/util/instance_util_test.go
index 635be35..b99d810 100644
--- a/server/service/util/instance_util_test.go
+++ b/server/service/util/instance_util_test.go
@@ -116,19 +116,19 @@ func TestGetInstanceCountOfOneService(t *testing.T) {
 }
 
 func TestGetInstanceCountOfServices(t *testing.T) {
-	_, err := GetAllInstancesOfServices(util.SetContext(context.Background(), CTX_CACHEONLY, "1"), "", []string{"1"})
+	_, _, err := GetAllInstancesOfServices(util.SetContext(context.Background(), CTX_CACHEONLY, "1"), "", []string{"1"})
 	if err != nil {
 		t.Fatalf(`GetAllInstancesOfServices CTX_CACHEONLY failed`)
 	}
-	_, err = GetAllInstancesOfServices(util.SetContext(context.Background(), CTX_NOCACHE, "1"), "", []string{"1"})
+	_, _, err = GetAllInstancesOfServices(util.SetContext(context.Background(), CTX_NOCACHE, "1"), "", []string{"1"})
 	if err == nil {
 		t.Fatalf(`GetAllInstancesOfServices CTX_NOCACHE failed`)
 	}
-	_, err = GetAllInstancesOfServices(util.SetContext(context.Background(), CTX_REQUEST_REVISION, 1), "", []string{"1"})
+	_, _, err = GetAllInstancesOfServices(util.SetContext(context.Background(), CTX_REQUEST_REVISION, 1), "", []string{"1"})
 	if err == nil {
 		t.Fatalf(`GetAllInstancesOfServices CTX_REQUEST_REVISION failed`)
 	}
-	_, err = GetAllInstancesOfServices(context.Background(), "", []string{"1"})
+	_, _, err = GetAllInstancesOfServices(context.Background(), "", []string{"1"})
 	if err == nil {
 		t.Fatalf(`GetAllInstancesOfServices failed`)
 	}
diff --git a/server/service/watch.go b/server/service/watch.go
index e0b5780..1fcefa0 100644
--- a/server/service/watch.go
+++ b/server/service/watch.go
@@ -45,7 +45,7 @@ func (s *InstanceService) Watch(in *pb.WatchInstanceRequest, stream pb.ServiceIn
 		return err
 	}
 	domainProject := util.ParseDomainProject(stream.Context())
-	watcher := nf.NewInstanceListWatcher(in.SelfServiceId, apt.GetInstanceRootKey(domainProject)+"/", nil)
+	watcher := nf.NewListWatcher(in.SelfServiceId, apt.GetInstanceRootKey(domainProject)+"/", nil)
 	err = nf.GetNotifyService().AddSubscriber(watcher)
 	util.Logger().Infof("start watch instance status, watcher %s %s", watcher.Subject(), watcher.Id())
 	return nf.HandleWatchJob(watcher, stream, nf.GetNotifyService().Config.NotifyTimeout)