You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicecomb.apache.org by as...@apache.org on 2018/06/25 04:44:15 UTC
[incubator-servicecomb-service-center] branch master updated:
SCB-680 Optimize find instance api (#376)
This is an automated email from the ASF dual-hosted git repository.
asifdxtreme pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-servicecomb-service-center.git
The following commit(s) were added to refs/heads/master by this push:
new 67e0f0b SCB-680 Optimize find instance api (#376)
67e0f0b is described below
commit 67e0f0bd5b71e5d8034dc3ca84a13f88ba8692df
Author: little-cui <su...@qq.com>
AuthorDate: Mon Jun 25 12:44:12 2018 +0800
SCB-680 Optimize find instance api (#376)
* SCB-680 Optimize find instance api
---
server/service/dependency.go | 2 -
server/service/event/instance_event_handler.go | 19 +++++-
server/service/event/rule_event_handler.go | 2 +-
server/service/event/tag_event_handler.go | 2 +-
server/service/instance.go | 26 +++++++-
server/service/{util => notification}/common.go | 41 +++++++++++--
server/service/notification/listwatcher.go | 25 +++++---
.../{util/common.go => notification/notice.go} | 32 +++++++---
server/service/notification/stream.go | 54 ++++++++++++++++
.../notification/{struct.go => subscriber.go} | 58 ------------------
.../notification/{watch_util.go => websocket.go} | 57 +----------------
server/service/util/common.go | 4 ++
server/service/util/find_cache.go | 71 ++++++++++++++++++++++
server/service/util/find_cache_test.go | 60 ++++++++++++++++++
server/service/util/instance_util.go | 11 ++--
server/service/util/instance_util_test.go | 8 +--
server/service/watch.go | 2 +-
17 files changed, 322 insertions(+), 152 deletions(-)
diff --git a/server/service/dependency.go b/server/service/dependency.go
index eac1aa2..4c13dd0 100644
--- a/server/service/dependency.go
+++ b/server/service/dependency.go
@@ -140,7 +140,6 @@ func (s *MicroServiceService) GetProviderDependencies(ctx context.Context, in *p
Response: pb.CreateResponse(scerr.ErrInternal, err.Error()),
}, err
}
- util.Logger().Debugf("GetProviderDependencies successfully, providerId is %s.", in.ServiceId)
return &pb.GetProDependenciesResponse{
Response: pb.CreateResponse(pb.Response_SUCCESS, "Get all consumers successful."),
Consumers: services,
@@ -181,7 +180,6 @@ func (s *MicroServiceService) GetConsumerDependencies(ctx context.Context, in *p
}, err
}
- util.Logger().Debugf("GetConsumerDependencies successfully, consumerId is %s.", consumerId)
return &pb.GetConDependenciesResponse{
Response: pb.CreateResponse(pb.Response_SUCCESS, "Get all providers successfully."),
Providers: services,
diff --git a/server/service/event/instance_event_handler.go b/server/service/event/instance_event_handler.go
index 8144649..a80376d 100644
--- a/server/service/event/instance_event_handler.go
+++ b/server/service/event/instance_event_handler.go
@@ -93,7 +93,7 @@ func (h *InstanceEventHandler) OnEvent(evt backend.KvEvent) {
return
}
- nf.PublishInstanceEvent(domainProject, action, &pb.MicroServiceKey{
+ PublishInstanceEvent(domainProject, action, &pb.MicroServiceKey{
Environment: ms.Environment,
AppId: ms.AppId,
ServiceName: ms.ServiceName,
@@ -104,3 +104,20 @@ func (h *InstanceEventHandler) OnEvent(evt backend.KvEvent) {
func NewInstanceEventHandler() *InstanceEventHandler {
return &InstanceEventHandler{}
}
+
+func PublishInstanceEvent(domainProject string, action pb.EventType, serviceKey *pb.MicroServiceKey, instance *pb.MicroServiceInstance, rev int64, subscribers []string) {
+ response := &pb.WatchInstanceResponse{
+ Response: pb.CreateResponse(pb.Response_SUCCESS, "Watch instance successfully."),
+ Action: string(action),
+ Key: serviceKey,
+ Instance: instance,
+ }
+ for _, consumerId := range subscribers {
+ // expires cache
+ serviceUtil.FindInstancesCache.Delete(domainProject, consumerId, serviceKey)
+
+ // TODO add超时怎么处理?
+ job := nf.NewWatchJob(consumerId, apt.GetInstanceRootKey(domainProject)+"/", rev, response)
+ nf.GetNotifyService().AddJob(job)
+ }
+}
diff --git a/server/service/event/rule_event_handler.go b/server/service/event/rule_event_handler.go
index 0999dd0..474811e 100644
--- a/server/service/event/rule_event_handler.go
+++ b/server/service/event/rule_event_handler.go
@@ -69,7 +69,7 @@ func (apt *RulesChangedTask) publish(ctx context.Context, domainProject, provide
}
providerKey := pb.MicroServiceToKey(domainProject, provider)
- nf.PublishInstanceEvent(domainProject, pb.EVT_EXPIRE, providerKey, nil, rev, consumerIds)
+ PublishInstanceEvent(domainProject, pb.EVT_EXPIRE, providerKey, nil, rev, consumerIds)
return nil
}
diff --git a/server/service/event/tag_event_handler.go b/server/service/event/tag_event_handler.go
index f87f037..a83d2c5 100644
--- a/server/service/event/tag_event_handler.go
+++ b/server/service/event/tag_event_handler.go
@@ -73,7 +73,7 @@ func (apt *TagsChangedTask) publish(ctx context.Context, domainProject, consumer
util.Logger().Warnf(err, "get service %s file failed", providerId)
continue
}
- nf.PublishInstanceEvent(domainProject, pb.EVT_EXPIRE,
+ PublishInstanceEvent(domainProject, pb.EVT_EXPIRE,
&pb.MicroServiceKey{
Environment: provider.Environment,
AppId: provider.AppId,
diff --git a/server/service/instance.go b/server/service/instance.go
index bca599a..1618886 100644
--- a/server/service/instance.go
+++ b/server/service/instance.go
@@ -524,6 +524,7 @@ func (s *InstanceService) Find(ctx context.Context, in *pb.FindInstancesRequest)
AppId: in.AppId,
ServiceName: in.ServiceName,
Alias: in.ServiceName,
+ Version: in.VersionRule,
}
if apt.IsShared(provider) {
// it means the shared micro-services must be the same env with SC.
@@ -536,6 +537,23 @@ func (s *InstanceService) Find(ctx context.Context, in *pb.FindInstancesRequest)
provider.Tenant = util.ParseTargetDomainProject(ctx)
}
+ // cache
+ if item := serviceUtil.FindInstancesCache.Get(provider.Tenant, in.ConsumerServiceId, provider); item != nil {
+ noCache, cacheOnly := ctx.Value(serviceUtil.CTX_NOCACHE) == "1", ctx.Value(serviceUtil.CTX_CACHEONLY) == "1"
+ rev, _ := ctx.Value(serviceUtil.CTX_REQUEST_REVISION).(int64)
+ if !noCache && (cacheOnly || rev <= item.Rev) {
+ instances := item.Instances
+ if rev == item.Rev {
+ instances = instances[:0]
+ }
+ util.SetContext(ctx, serviceUtil.CTX_RESPONSE_REVISION, item.Rev)
+ return &pb.FindInstancesResponse{
+ Response: pb.CreateResponse(pb.Response_SUCCESS, "Query service instances successfully."),
+ Instances: instances,
+ }, nil
+ }
+ }
+
// 版本规则
ids, err := serviceUtil.FindServiceIds(ctx, in.VersionRule, provider)
if err != nil {
@@ -583,13 +601,19 @@ func (s *InstanceService) Find(ctx context.Context, in *pb.FindInstancesRequest)
}
}
- instances, err := serviceUtil.GetAllInstancesOfServices(ctx, util.ParseTargetDomainProject(ctx), ids)
+ instances, rev, err := serviceUtil.GetAllInstancesOfServices(ctx, util.ParseTargetDomainProject(ctx), ids)
if err != nil {
util.Logger().Errorf(err, "find instance failed, %s: GetAllInstancesOfServices failed.", findFlag)
return &pb.FindInstancesResponse{
Response: pb.CreateResponse(scerr.ErrInternal, err.Error()),
}, err
}
+
+ serviceUtil.FindInstancesCache.Set(provider.Tenant, in.ConsumerServiceId, provider, &serviceUtil.VersionRuleCacheItem{
+ Instances: instances,
+ Rev: rev,
+ })
+ util.SetContext(ctx, serviceUtil.CTX_RESPONSE_REVISION, rev)
return &pb.FindInstancesResponse{
Response: pb.CreateResponse(pb.Response_SUCCESS, "Query service instances successfully."),
Instances: instances,
diff --git a/server/service/util/common.go b/server/service/notification/common.go
similarity index 52%
copy from server/service/util/common.go
copy to server/service/notification/common.go
index 6c25e51..7e2e16a 100644
--- a/server/service/util/common.go
+++ b/server/service/notification/common.go
@@ -14,12 +14,41 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package util
+package notification
+
+import (
+ "fmt"
+ "strconv"
+ "time"
+)
const (
- HEADER_REV = "X-Resource-Revision"
- CTX_NOCACHE = "noCache"
- CTX_CACHEONLY = "cacheOnly"
- CTX_REQUEST_REVISION = "requestRev"
- CTX_RESPONSE_REVISION = "responseRev"
+ DEFAULT_MAX_QUEUE = 1000
+ DEFAULT_INIT_SUBSCRIBERS = 1000
+ DEFAULT_ON_MESSAGE_TIMEOUT = 100 * time.Millisecond
+ DEFAULT_TIMEOUT = 30 * time.Second
+
+ NOTIFTY NotifyType = iota
+ INSTANCE
+ typeEnd
)
+
+type NotifyType int
+
+func (nt NotifyType) String() string {
+ if int(nt) < len(notifyTypeNames) {
+ return notifyTypeNames[nt]
+ }
+ return "NotifyType" + strconv.Itoa(int(nt))
+}
+
+type NotifyServiceConfig struct {
+ AddTimeout time.Duration
+ NotifyTimeout time.Duration
+ MaxQueue int64
+}
+
+func (nsc NotifyServiceConfig) String() string {
+ return fmt.Sprintf("{acceptQueue: %d, accept: %s, notify: %s}",
+ nsc.MaxQueue, nsc.AddTimeout, nsc.NotifyTimeout)
+}
diff --git a/server/service/notification/listwatcher.go b/server/service/notification/listwatcher.go
index 5d911a2..a6942c9 100644
--- a/server/service/notification/listwatcher.go
+++ b/server/service/notification/listwatcher.go
@@ -32,7 +32,7 @@ type WatchJob struct {
type ListWatcher struct {
BaseSubscriber
- Job chan NotifyJob
+ Job chan *WatchJob
ListRevision int64
ListFunc func() (results []*pb.WatchInstanceResponse, rev int64)
@@ -56,7 +56,7 @@ func (w *ListWatcher) listAndPublishJobs(_ context.Context) {
results, rev := w.ListFunc()
w.ListRevision = rev
for _, response := range results {
- w.sendMessage(NewWatchJob(w.Type(), w.Id(), w.Subject(), w.ListRevision, response))
+ w.sendMessage(NewWatchJob(w.Id(), w.Subject(), w.ListRevision, response))
}
}
@@ -66,6 +66,11 @@ func (w *ListWatcher) OnMessage(job NotifyJob) {
return
}
+ wJob, ok := job.(*WatchJob)
+ if !ok {
+ return
+ }
+
timer := time.NewTimer(DEFAULT_ON_MESSAGE_TIMEOUT)
select {
case <-w.listCh:
@@ -77,16 +82,16 @@ func (w *ListWatcher) OnMessage(job NotifyJob) {
return
}
- if job.(*WatchJob).Revision <= w.ListRevision {
+ if wJob.Revision <= w.ListRevision {
util.Logger().Warnf(nil,
"unexpected notify %s job is coming in, watcher %s %s, job is %v, current revision is %v",
w.Type(), w.Id(), w.Subject(), job, w.ListRevision)
return
}
- w.sendMessage(job)
+ w.sendMessage(wJob)
}
-func (w *ListWatcher) sendMessage(job NotifyJob) {
+func (w *ListWatcher) sendMessage(job *WatchJob) {
util.Logger().Debugf("start to notify %s watcher %s %s, job is %v, current revision is %v", w.Type(),
w.Id(), w.Subject(), job, w.ListRevision)
defer util.RecoverAndReport()
@@ -105,27 +110,27 @@ func (w *ListWatcher) Close() {
close(w.Job)
}
-func NewWatchJob(nType NotifyType, subscriberId, subject string, rev int64, response *pb.WatchInstanceResponse) *WatchJob {
+func NewWatchJob(subscriberId, subject string, rev int64, response *pb.WatchInstanceResponse) *WatchJob {
return &WatchJob{
BaseNotifyJob: BaseNotifyJob{
subscriberId: subscriberId,
subject: subject,
- nType: nType,
+ nType: INSTANCE,
},
Revision: rev,
Response: response,
}
}
-func NewListWatcher(nType NotifyType, id string, subject string,
+func NewListWatcher(id string, subject string,
listFunc func() (results []*pb.WatchInstanceResponse, rev int64)) *ListWatcher {
watcher := &ListWatcher{
BaseSubscriber: BaseSubscriber{
id: id,
subject: subject,
- nType: nType,
+ nType: INSTANCE,
},
- Job: make(chan NotifyJob, DEFAULT_MAX_QUEUE),
+ Job: make(chan *WatchJob, DEFAULT_MAX_QUEUE),
ListFunc: listFunc,
listCh: make(chan struct{}),
}
diff --git a/server/service/util/common.go b/server/service/notification/notice.go
similarity index 66%
copy from server/service/util/common.go
copy to server/service/notification/notice.go
index 6c25e51..498d435 100644
--- a/server/service/util/common.go
+++ b/server/service/notification/notice.go
@@ -14,12 +14,28 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package util
+package notification
-const (
- HEADER_REV = "X-Resource-Revision"
- CTX_NOCACHE = "noCache"
- CTX_CACHEONLY = "cacheOnly"
- CTX_REQUEST_REVISION = "requestRev"
- CTX_RESPONSE_REVISION = "responseRev"
-)
+type NotifyJob interface {
+ SubscriberId() string
+ Subject() string
+ Type() NotifyType
+}
+
+type BaseNotifyJob struct {
+ subscriberId string
+ subject string
+ nType NotifyType
+}
+
+func (s *BaseNotifyJob) SubscriberId() string {
+ return s.subscriberId
+}
+
+func (s *BaseNotifyJob) Subject() string {
+ return s.subject
+}
+
+func (s *BaseNotifyJob) Type() NotifyType {
+ return s.nType
+}
diff --git a/server/service/notification/stream.go b/server/service/notification/stream.go
new file mode 100644
index 0000000..101473d
--- /dev/null
+++ b/server/service/notification/stream.go
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package notification
+
+import (
+ "errors"
+ "github.com/apache/incubator-servicecomb-service-center/pkg/util"
+ pb "github.com/apache/incubator-servicecomb-service-center/server/core/proto"
+ "time"
+)
+
+func HandleWatchJob(watcher *ListWatcher, stream pb.ServiceInstanceCtrl_WatchServer, timeout time.Duration) (err error) {
+ for {
+ timer := time.NewTimer(timeout)
+ select {
+ case <-timer.C:
+ // TODO grpc 长连接心跳?
+ case job := <-watcher.Job:
+ timer.Stop()
+
+ if job == nil {
+ err = errors.New("channel is closed")
+ util.Logger().Errorf(err, "watcher %s %s caught an exception",
+ watcher.Subject(), watcher.Id())
+ return
+ }
+ resp := job.Response
+ util.Logger().Infof("event is coming in, watcher %s %s",
+ watcher.Subject(), watcher.Id())
+
+ err = stream.Send(resp)
+ if err != nil {
+ util.Logger().Errorf(err, "send message error, watcher %s %s",
+ watcher.Subject(), watcher.Id())
+ watcher.SetError(err)
+ return
+ }
+ }
+ }
+}
diff --git a/server/service/notification/struct.go b/server/service/notification/subscriber.go
similarity index 64%
rename from server/service/notification/struct.go
rename to server/service/notification/subscriber.go
index f6564ff..9f0b4df 100644
--- a/server/service/notification/struct.go
+++ b/server/service/notification/subscriber.go
@@ -18,42 +18,8 @@ package notification
import (
"errors"
- "fmt"
- "strconv"
- "time"
)
-const (
- DEFAULT_MAX_QUEUE = 1000
- DEFAULT_INIT_SUBSCRIBERS = 1000
- DEFAULT_ON_MESSAGE_TIMEOUT = 100 * time.Millisecond
- DEFAULT_TIMEOUT = 30 * time.Second
-
- NOTIFTY NotifyType = iota
- INSTANCE
- typeEnd
-)
-
-type NotifyType int
-
-func (nt NotifyType) String() string {
- if int(nt) < len(notifyTypeNames) {
- return notifyTypeNames[nt]
- }
- return "NotifyType" + strconv.Itoa(int(nt))
-}
-
-type NotifyServiceConfig struct {
- AddTimeout time.Duration
- NotifyTimeout time.Duration
- MaxQueue int64
-}
-
-func (nsc NotifyServiceConfig) String() string {
- return fmt.Sprintf("{acceptQueue: %d, accept: %s, notify: %s}",
- nsc.MaxQueue, nsc.AddTimeout, nsc.NotifyTimeout)
-}
-
type Subscriber interface {
Err() error
SetError(err error)
@@ -68,12 +34,6 @@ type Subscriber interface {
Close()
}
-type NotifyJob interface {
- SubscriberId() string
- Subject() string
- Type() NotifyType
-}
-
type BaseSubscriber struct {
id string
subject string
@@ -122,21 +82,3 @@ func (s *BaseSubscriber) OnMessage(job NotifyJob) {
func (s *BaseSubscriber) Close() {
}
-
-type BaseNotifyJob struct {
- subscriberId string
- subject string
- nType NotifyType
-}
-
-func (s *BaseNotifyJob) SubscriberId() string {
- return s.subscriberId
-}
-
-func (s *BaseNotifyJob) Subject() string {
- return s.subject
-}
-
-func (s *BaseNotifyJob) Type() NotifyType {
- return s.nType
-}
diff --git a/server/service/notification/watch_util.go b/server/service/notification/websocket.go
similarity index 82%
rename from server/service/notification/watch_util.go
rename to server/service/notification/websocket.go
index 3a13500..f53e641 100644
--- a/server/service/notification/watch_util.go
+++ b/server/service/notification/websocket.go
@@ -18,7 +18,6 @@ package notification
import (
"encoding/json"
- "errors"
"fmt"
"github.com/apache/incubator-servicecomb-service-center/pkg/util"
apt "github.com/apache/incubator-servicecomb-service-center/server/core"
@@ -29,36 +28,6 @@ import (
"time"
)
-func HandleWatchJob(watcher *ListWatcher, stream pb.ServiceInstanceCtrl_WatchServer, timeout time.Duration) (err error) {
- for {
- timer := time.NewTimer(timeout)
- select {
- case <-timer.C:
- // TODO grpc 长连接心跳?
- case job := <-watcher.Job:
- timer.Stop()
-
- if job == nil {
- err = errors.New("channel is closed")
- util.Logger().Errorf(err, "watcher %s %s caught an exception",
- watcher.Subject(), watcher.Id())
- return
- }
- resp := job.(*WatchJob).Response
- util.Logger().Infof("event is coming in, watcher %s %s",
- watcher.Subject(), watcher.Id())
-
- err = stream.Send(resp)
- if err != nil {
- util.Logger().Errorf(err, "send message error, watcher %s %s",
- watcher.Subject(), watcher.Id())
- watcher.SetError(err)
- return
- }
- }
- }
-}
-
type WebSocketHandler struct {
ctx context.Context
conn *websocket.Conn
@@ -71,7 +40,7 @@ type WebSocketHandler struct {
func (wh *WebSocketHandler) Init() error {
remoteAddr := wh.conn.RemoteAddr().String()
if err := GetNotifyService().AddSubscriber(wh.watcher); err != nil {
- err = fmt.Errorf("establish[%s] websocket watch failed: notify service error, %s.",
+ err = fmt.Errorf("establish[%s] websocket watch failed: notify service error, %s",
remoteAddr, err.Error())
util.Logger().Errorf(nil, err.Error())
@@ -207,7 +176,7 @@ func (wh *WebSocketHandler) HandleWatchWebSocketJob() {
return
}
- resp := job.(*WatchJob).Response
+ resp := job.Response
providerFlag := fmt.Sprintf("%s/%s/%s", resp.Key.AppId, resp.Key.ServiceName, resp.Key.Version)
if resp.Action != string(pb.EVT_EXPIRE) {
@@ -261,7 +230,7 @@ func DoWebSocketListAndWatch(ctx context.Context, serviceId string, f func() ([]
handler := &WebSocketHandler{
ctx: ctx,
conn: conn,
- watcher: NewInstanceListWatcher(serviceId, apt.GetInstanceRootKey(domainProject)+"/", f),
+ watcher: NewListWatcher(serviceId, apt.GetInstanceRootKey(domainProject)+"/", f),
needPingWatcher: true,
closed: make(chan struct{}),
goroutine: util.NewGo(context.Background()),
@@ -283,23 +252,3 @@ func EstablishWebSocketError(conn *websocket.Conn, err error) {
util.Logger().Errorf(err, "establish[%s] websocket watch failed: write message failed.", remoteAddr)
}
}
-
-func PublishInstanceEvent(domainProject string, action pb.EventType, serviceKey *pb.MicroServiceKey, instance *pb.MicroServiceInstance, rev int64, subscribers []string) {
- response := &pb.WatchInstanceResponse{
- Response: pb.CreateResponse(pb.Response_SUCCESS, "Watch instance successfully."),
- Action: string(action),
- Key: serviceKey,
- Instance: instance,
- }
- for _, consumerId := range subscribers {
- job := NewWatchJob(INSTANCE, consumerId, apt.GetInstanceRootKey(domainProject)+"/", rev, response)
- util.Logger().Debugf("publish event to notify service, %v", job)
-
- // TODO add超时怎么处理?
- GetNotifyService().AddJob(job)
- }
-}
-
-func NewInstanceListWatcher(selfServiceId, instanceRoot string, listFunc func() (results []*pb.WatchInstanceResponse, rev int64)) *ListWatcher {
- return NewListWatcher(INSTANCE, selfServiceId, instanceRoot, listFunc)
-}
diff --git a/server/service/util/common.go b/server/service/util/common.go
index 6c25e51..9e31b3f 100644
--- a/server/service/util/common.go
+++ b/server/service/util/common.go
@@ -16,10 +16,14 @@
*/
package util
+import "time"
+
const (
HEADER_REV = "X-Resource-Revision"
CTX_NOCACHE = "noCache"
CTX_CACHEONLY = "cacheOnly"
CTX_REQUEST_REVISION = "requestRev"
CTX_RESPONSE_REVISION = "responseRev"
+
+ cacheTTL = 5 * time.Minute
)
diff --git a/server/service/util/find_cache.go b/server/service/util/find_cache.go
new file mode 100644
index 0000000..d5c2ab7
--- /dev/null
+++ b/server/service/util/find_cache.go
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package util
+
+import (
+ "errors"
+ "github.com/apache/incubator-servicecomb-service-center/pkg/util"
+ pb "github.com/apache/incubator-servicecomb-service-center/server/core/proto"
+ "github.com/karlseguin/ccache"
+)
+
+var FindInstancesCache = &VersionRuleCache{
+ c: ccache.Layered(ccache.Configure()),
+}
+
+type VersionRuleCacheItem struct {
+ Instances []*pb.MicroServiceInstance
+ Rev int64
+}
+
+type VersionRuleCache struct {
+ c *ccache.LayeredCache
+}
+
+func (c *VersionRuleCache) primaryKey(domainProject string, provider *pb.MicroServiceKey) string {
+ return util.StringJoin([]string{
+ domainProject,
+ provider.Environment,
+ provider.AppId,
+ provider.ServiceName}, "/")
+}
+
+func (c *VersionRuleCache) Get(domainProject, consumer string, provider *pb.MicroServiceKey) *VersionRuleCacheItem {
+ item, _ := c.c.Fetch(c.primaryKey(domainProject, provider), provider.Version, cacheTTL, func() (interface{}, error) {
+ return nil, errors.New("not exist")
+ })
+ if item == nil || item.Expired() {
+ return nil
+ }
+
+ if v, ok := item.Value().(*util.ConcurrentMap).Get(consumer); ok {
+ return v.(*VersionRuleCacheItem)
+ }
+ return nil
+}
+
+func (c *VersionRuleCache) Set(domainProject, consumer string, provider *pb.MicroServiceKey, item *VersionRuleCacheItem) {
+ c2, _ := c.c.Fetch(c.primaryKey(domainProject, provider), provider.Version, cacheTTL, func() (interface{}, error) {
+ // new one if not exist
+ return util.NewConcurrentMap(1), nil
+ })
+ c2.Value().(*util.ConcurrentMap).Put(consumer, item)
+}
+
+func (c *VersionRuleCache) Delete(domainProject, consumer string, provider *pb.MicroServiceKey) {
+ c.c.DeleteAll(c.primaryKey(domainProject, provider))
+}
diff --git a/server/service/util/find_cache_test.go b/server/service/util/find_cache_test.go
new file mode 100644
index 0000000..9275101
--- /dev/null
+++ b/server/service/util/find_cache_test.go
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package util
+
+import (
+ pb "github.com/apache/incubator-servicecomb-service-center/server/core/proto"
+ "testing"
+)
+
+func TestVersionRuleCache_Get(t *testing.T) {
+ p := &pb.MicroServiceKey{}
+ c := FindInstancesCache.Get("d", "c", p)
+ if c != nil {
+ t.Fatalf("TestVersionRuleCache_Get failed, %v", c)
+ }
+
+ r := &VersionRuleCacheItem{Rev: 1}
+ FindInstancesCache.Set("d", "c", p, r)
+ c = FindInstancesCache.Get("d", "c", p)
+ if c == nil {
+ t.Fatalf("TestVersionRuleCache_Get failed, %v", c)
+ }
+ if c.Rev != 1 {
+ t.Fatalf("TestVersionRuleCache_Get failed, rev %d != 1", c.Rev)
+ }
+ c = FindInstancesCache.Get("d", "c2", p)
+ if c != nil {
+ t.Fatalf("TestVersionRuleCache_Get failed, %v", c)
+ }
+
+ p2 := &pb.MicroServiceKey{ServiceName: "p2"}
+ FindInstancesCache.Set("d", "c2", p, r)
+ FindInstancesCache.Set("d", "c2", p2, r)
+ FindInstancesCache.Delete("d", "c", p)
+ c = FindInstancesCache.Get("d", "c2", p)
+ if c != nil {
+ t.Fatalf("TestVersionRuleCache_Get failed, %v", c)
+ }
+ c = FindInstancesCache.Get("d", "c2", p2)
+ if c == nil {
+ t.Fatalf("TestVersionRuleCache_Get failed, %v", c)
+ }
+ if c.Rev != 1 {
+ t.Fatalf("TestVersionRuleCache_Get failed, rev %d != 1", c.Rev)
+ }
+}
diff --git a/server/service/util/instance_util.go b/server/service/util/instance_util.go
index 6f3b3b3..bf91107 100644
--- a/server/service/util/instance_util.go
+++ b/server/service/util/instance_util.go
@@ -66,11 +66,12 @@ func GetInstance(ctx context.Context, domainProject string, serviceId string, in
return instance, nil
}
-func GetAllInstancesOfServices(ctx context.Context, domainProject string, ids []string) (instances []*pb.MicroServiceInstance, err error) {
+func GetAllInstancesOfServices(ctx context.Context, domainProject string, ids []string) (
+ instances []*pb.MicroServiceInstance, rev int64, err error) {
cloneCtx := util.CloneContext(ctx)
noCache, cacheOnly := ctx.Value(CTX_NOCACHE) == "1", ctx.Value(CTX_CACHEONLY) == "1"
- rev, _ := cloneCtx.Value(CTX_REQUEST_REVISION).(int64)
+ rev, _ = cloneCtx.Value(CTX_REQUEST_REVISION).(int64)
if !noCache && !cacheOnly && rev > 0 {
// force to find in cache at first time when rev > 0
util.SetContext(cloneCtx, CTX_CACHEONLY, "1")
@@ -86,7 +87,7 @@ func GetAllInstancesOfServices(ctx context.Context, domainProject string, ids []
opts := append(FromContext(cloneCtx), registry.WithStrKey(key), registry.WithPrefix())
resp, err := backend.Store().Instance().Search(cloneCtx, opts...)
if err != nil {
- return nil, err
+ return nil, 0, err
}
if len(resp.Kvs) > 0 {
@@ -122,13 +123,13 @@ func GetAllInstancesOfServices(ctx context.Context, domainProject string, ids []
instance := &pb.MicroServiceInstance{}
err := json.Unmarshal(kv.Value, instance)
if err != nil {
- return nil, fmt.Errorf("unmarshal %s faild, %s",
+ return nil, 0, fmt.Errorf("unmarshal %s faild, %s",
util.BytesToStringWithNoCopy(kv.Key), err.Error())
}
instances = append(instances, instance)
}
- util.SetContext(ctx, CTX_RESPONSE_REVISION, max)
+ rev = max
return
}
diff --git a/server/service/util/instance_util_test.go b/server/service/util/instance_util_test.go
index 635be35..b99d810 100644
--- a/server/service/util/instance_util_test.go
+++ b/server/service/util/instance_util_test.go
@@ -116,19 +116,19 @@ func TestGetInstanceCountOfOneService(t *testing.T) {
}
func TestGetInstanceCountOfServices(t *testing.T) {
- _, err := GetAllInstancesOfServices(util.SetContext(context.Background(), CTX_CACHEONLY, "1"), "", []string{"1"})
+ _, _, err := GetAllInstancesOfServices(util.SetContext(context.Background(), CTX_CACHEONLY, "1"), "", []string{"1"})
if err != nil {
t.Fatalf(`GetAllInstancesOfServices CTX_CACHEONLY failed`)
}
- _, err = GetAllInstancesOfServices(util.SetContext(context.Background(), CTX_NOCACHE, "1"), "", []string{"1"})
+ _, _, err = GetAllInstancesOfServices(util.SetContext(context.Background(), CTX_NOCACHE, "1"), "", []string{"1"})
if err == nil {
t.Fatalf(`GetAllInstancesOfServices CTX_NOCACHE failed`)
}
- _, err = GetAllInstancesOfServices(util.SetContext(context.Background(), CTX_REQUEST_REVISION, 1), "", []string{"1"})
+ _, _, err = GetAllInstancesOfServices(util.SetContext(context.Background(), CTX_REQUEST_REVISION, 1), "", []string{"1"})
if err == nil {
t.Fatalf(`GetAllInstancesOfServices CTX_REQUEST_REVISION failed`)
}
- _, err = GetAllInstancesOfServices(context.Background(), "", []string{"1"})
+ _, _, err = GetAllInstancesOfServices(context.Background(), "", []string{"1"})
if err == nil {
t.Fatalf(`GetAllInstancesOfServices failed`)
}
diff --git a/server/service/watch.go b/server/service/watch.go
index e0b5780..1fcefa0 100644
--- a/server/service/watch.go
+++ b/server/service/watch.go
@@ -45,7 +45,7 @@ func (s *InstanceService) Watch(in *pb.WatchInstanceRequest, stream pb.ServiceIn
return err
}
domainProject := util.ParseDomainProject(stream.Context())
- watcher := nf.NewInstanceListWatcher(in.SelfServiceId, apt.GetInstanceRootKey(domainProject)+"/", nil)
+ watcher := nf.NewListWatcher(in.SelfServiceId, apt.GetInstanceRootKey(domainProject)+"/", nil)
err = nf.GetNotifyService().AddSubscriber(watcher)
util.Logger().Infof("start watch instance status, watcher %s %s", watcher.Subject(), watcher.Id())
return nf.HandleWatchJob(watcher, stream, nf.GetNotifyService().Config.NotifyTimeout)