You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicecomb.apache.org by li...@apache.org on 2019/01/29 01:48:31 UTC
[servicecomb-service-center] branch master updated: SCB-1059
Optimize quota plugin (#529)
This is an automated email from the ASF dual-hosted git repository.
littlecui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/servicecomb-service-center.git
The following commit(s) were added to refs/heads/master by this push:
new 5782435 SCB-1059 Optimize quota plugin (#529)
5782435 is described below
commit 5782435c1dd45a712d02abfaba5fdb2ec4100b97
Author: little-cui <su...@qq.com>
AuthorDate: Tue Jan 29 09:48:27 2019 +0800
SCB-1059 Optimize quota plugin (#529)
---
server/plugin/pkg/quota/buildin/buildin.go | 2 +
server/plugin/pkg/quota/buildin/common.go | 6 +-
server/plugin/pkg/quota/buildin/counter.go | 57 ++++++
server/plugin/pkg/quota/buildin/counter_test.go | 50 +++++
server/plugin/pkg/quota/counter/counter.go | 49 +++++
server/plugin/pkg/quota/counter/event.go | 105 ++++++++++
server/plugin/pkg/quota/counter/event_test.go | 260 ++++++++++++++++++++++++
server/service/event/instance_event_handler.go | 16 +-
server/service/util/instance_util.go | 11 +-
9 files changed, 544 insertions(+), 12 deletions(-)
diff --git a/server/plugin/pkg/quota/buildin/buildin.go b/server/plugin/pkg/quota/buildin/buildin.go
index 06ef630..6d5f147 100644
--- a/server/plugin/pkg/quota/buildin/buildin.go
+++ b/server/plugin/pkg/quota/buildin/buildin.go
@@ -20,11 +20,13 @@ import (
"github.com/apache/servicecomb-service-center/pkg/log"
mgr "github.com/apache/servicecomb-service-center/server/plugin"
"github.com/apache/servicecomb-service-center/server/plugin/pkg/quota"
+ "github.com/apache/servicecomb-service-center/server/plugin/pkg/quota/counter"
"golang.org/x/net/context"
)
func init() {
mgr.RegisterPlugin(mgr.Plugin{mgr.QUOTA, "buildin", New})
+ counter.RegisterCounterListener("buildin")
}
func New() mgr.PluginInstance {
diff --git a/server/plugin/pkg/quota/buildin/common.go b/server/plugin/pkg/quota/buildin/common.go
index eba7eb5..6e47136 100644
--- a/server/plugin/pkg/quota/buildin/common.go
+++ b/server/plugin/pkg/quota/buildin/common.go
@@ -85,11 +85,9 @@ func resourceLimitHandler(ctx context.Context, res *quota.ApplyQuotaResource) (i
switch res.QuotaType {
case quota.MicroServiceInstanceQuotaType:
- key = core.GetInstanceRootKey("")
- indexer = backend.Store().Instance()
+ return globalCounter.InstanceCount, nil
case quota.MicroServiceQuotaType:
- key = core.GetServiceRootKey("")
- indexer = backend.Store().Service()
+ return globalCounter.ServiceCount, nil
case quota.RuleQuotaType:
key = core.GenerateServiceRuleKey(domainProject, serviceId, "")
indexer = backend.Store().Rule()
diff --git a/server/plugin/pkg/quota/buildin/counter.go b/server/plugin/pkg/quota/buildin/counter.go
new file mode 100644
index 0000000..86eb066
--- /dev/null
+++ b/server/plugin/pkg/quota/buildin/counter.go
@@ -0,0 +1,57 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package buildin
+
+import (
+ "github.com/apache/servicecomb-service-center/server/core/backend"
+ "github.com/apache/servicecomb-service-center/server/plugin/pkg/discovery"
+ "github.com/apache/servicecomb-service-center/server/plugin/pkg/quota/counter"
+)
+
+var globalCounter = &GlobalCounter{}
+
+func init() {
+ counter.RegisterCounter(globalCounter)
+}
+
+type GlobalCounter struct {
+ ServiceCount int64
+ InstanceCount int64
+}
+
+func (c *GlobalCounter) OnCreate(t discovery.Type, domainProject string) {
+ switch t {
+ case backend.SERVICE_INDEX:
+ c.ServiceCount++
+ case backend.INSTANCE:
+ c.InstanceCount++
+ }
+}
+
+func (c *GlobalCounter) OnDelete(t discovery.Type, domainProject string) {
+ switch t {
+ case backend.SERVICE_INDEX:
+ if c.ServiceCount == 0 {
+ return
+ }
+ c.ServiceCount--
+ case backend.INSTANCE:
+ if c.InstanceCount == 0 {
+ return
+ }
+ c.InstanceCount--
+ }
+}
diff --git a/server/plugin/pkg/quota/buildin/counter_test.go b/server/plugin/pkg/quota/buildin/counter_test.go
new file mode 100644
index 0000000..a8591cf
--- /dev/null
+++ b/server/plugin/pkg/quota/buildin/counter_test.go
@@ -0,0 +1,50 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package buildin
+
+import (
+ "github.com/apache/servicecomb-service-center/server/core/backend"
+ "testing"
+)
+
+func TestGlobalCounter_OnCreate(t *testing.T) {
+ var counter GlobalCounter
+ counter.OnCreate(backend.SERVICE, "a/b")
+ counter.OnCreate(backend.SERVICE_INDEX, "a/b")
+ counter.OnCreate(backend.INSTANCE, "a/b")
+ counter.OnCreate(backend.SERVICE_INDEX, "a/b")
+ counter.OnCreate(backend.INSTANCE, "a/b")
+ if counter.ServiceCount != 2 || counter.InstanceCount != 2 {
+ t.Fatal("TestGlobalCounter_OnCreate failed", counter)
+ }
+}
+
+func TestGlobalCounter_OnDelete(t *testing.T) {
+ var counter GlobalCounter
+ counter.OnDelete(backend.SERVICE, "a/b")
+ counter.OnDelete(backend.SERVICE_INDEX, "a/b")
+ counter.OnDelete(backend.INSTANCE, "a/b")
+ if counter.ServiceCount != 0 || counter.InstanceCount != 0 {
+ t.Fatal("TestGlobalCounter_OnDelete failed", counter)
+ }
+ counter.OnCreate(backend.SERVICE_INDEX, "a/b")
+ counter.OnCreate(backend.INSTANCE, "a/b")
+ counter.OnDelete(backend.SERVICE_INDEX, "a/b")
+ counter.OnDelete(backend.INSTANCE, "a/b")
+ if counter.ServiceCount != 0 || counter.InstanceCount != 0 {
+ t.Fatal("TestGlobalCounter_OnDelete failed", counter)
+ }
+}
diff --git a/server/plugin/pkg/quota/counter/counter.go b/server/plugin/pkg/quota/counter/counter.go
new file mode 100644
index 0000000..6aa7a63
--- /dev/null
+++ b/server/plugin/pkg/quota/counter/counter.go
@@ -0,0 +1,49 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package counter
+
+import (
+ "github.com/apache/servicecomb-service-center/server/plugin/pkg/discovery"
+)
+
+var counters = Counters{}
+
+type Counter interface {
+ OnCreate(t discovery.Type, domainProject string)
+ OnDelete(t discovery.Type, domainProject string)
+}
+
+type Counters []Counter
+
+func (cs Counters) OnCreate(t discovery.Type, domainProject string) {
+ for _, c := range cs {
+ c.OnCreate(t, domainProject)
+ }
+}
+
+func (cs Counters) OnDelete(t discovery.Type, domainProject string) {
+ for _, c := range cs {
+ c.OnDelete(t, domainProject)
+ }
+}
+
+func RegisterCounter(c Counter) {
+ counters = append(counters, c)
+}
+
+func GetCounters() Counters {
+ return counters
+}
diff --git a/server/plugin/pkg/quota/counter/event.go b/server/plugin/pkg/quota/counter/event.go
new file mode 100644
index 0000000..3dcf38a
--- /dev/null
+++ b/server/plugin/pkg/quota/counter/event.go
@@ -0,0 +1,105 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package counter
+
+import (
+ "github.com/apache/servicecomb-service-center/pkg/log"
+ "github.com/apache/servicecomb-service-center/pkg/util"
+ "github.com/apache/servicecomb-service-center/server/core"
+ "github.com/apache/servicecomb-service-center/server/core/backend"
+ pb "github.com/apache/servicecomb-service-center/server/core/proto"
+ "github.com/apache/servicecomb-service-center/server/plugin/pkg/discovery"
+ serviceUtil "github.com/apache/servicecomb-service-center/server/service/util"
+ "github.com/astaxie/beego"
+ "golang.org/x/net/context"
+)
+
+var (
+ SharedServiceIds util.ConcurrentMap
+)
+
+type ServiceIndexEventHandler struct {
+}
+
+func (h *ServiceIndexEventHandler) Type() discovery.Type {
+ return backend.SERVICE_INDEX
+}
+
+func (h *ServiceIndexEventHandler) OnEvent(evt discovery.KvEvent) {
+ key := core.GetInfoFromSvcIndexKV(evt.KV.Key)
+ if core.IsShared(key) {
+ SharedServiceIds.Put(key.Tenant+core.SPLIT+evt.KV.Value.(string), struct{}{})
+ return
+ }
+
+ switch evt.Type {
+ case pb.EVT_INIT, pb.EVT_CREATE:
+ GetCounters().OnCreate(h.Type(), key.Tenant)
+ case pb.EVT_DELETE:
+ GetCounters().OnDelete(h.Type(), key.Tenant)
+ default:
+ }
+}
+
+func NewServiceIndexEventHandler() *ServiceIndexEventHandler {
+ return &ServiceIndexEventHandler{}
+}
+
+type InstanceEventHandler struct {
+ SharedServiceIds map[string]struct{}
+}
+
+func (h *InstanceEventHandler) Type() discovery.Type {
+ return backend.INSTANCE
+}
+
+func (h *InstanceEventHandler) OnEvent(evt discovery.KvEvent) {
+ serviceId, _, domainProject := core.GetInfoFromInstKV(evt.KV.Key)
+ key := domainProject + core.SPLIT + serviceId
+ if _, ok := SharedServiceIds.Get(key); ok {
+ return
+ }
+
+ switch evt.Type {
+ case pb.EVT_INIT, pb.EVT_CREATE:
+ if domainProject == core.REGISTRY_DOMAIN_PROJECT {
+ service, err := serviceUtil.GetService(context.Background(), domainProject, serviceId)
+ if service == nil || err != nil {
+ log.Errorf(err, "GetService[%s] failed", key)
+ return
+ }
+ if core.IsShared(pb.MicroServiceToKey(domainProject, service)) {
+ SharedServiceIds.Put(key, struct{}{})
+ return
+ }
+ }
+ GetCounters().OnCreate(h.Type(), domainProject)
+ case pb.EVT_DELETE:
+ GetCounters().OnDelete(h.Type(), domainProject)
+ }
+}
+
+func NewInstanceEventHandler() *InstanceEventHandler {
+ return &InstanceEventHandler{SharedServiceIds: make(map[string]struct{})}
+}
+
+func RegisterCounterListener(pluginName string) {
+ if pluginName != beego.AppConfig.DefaultString("quota_plugin", "buildin") {
+ return
+ }
+ discovery.AddEventHandler(NewServiceIndexEventHandler())
+ discovery.AddEventHandler(NewInstanceEventHandler())
+}
diff --git a/server/plugin/pkg/quota/counter/event_test.go b/server/plugin/pkg/quota/counter/event_test.go
new file mode 100644
index 0000000..9254b8e
--- /dev/null
+++ b/server/plugin/pkg/quota/counter/event_test.go
@@ -0,0 +1,260 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package counter
+
+import (
+ "github.com/apache/servicecomb-service-center/server/core"
+ "github.com/apache/servicecomb-service-center/server/core/backend"
+ "github.com/apache/servicecomb-service-center/server/core/proto"
+ "github.com/apache/servicecomb-service-center/server/plugin/pkg/discovery"
+ "testing"
+)
+
+type mockCounter struct {
+ ServiceCount int64
+ InstanceCount int64
+}
+
+func (c *mockCounter) OnCreate(t discovery.Type, domainProject string) {
+ switch t {
+ case backend.SERVICE_INDEX:
+ c.ServiceCount++
+ case backend.INSTANCE:
+ c.InstanceCount++
+ default:
+ panic("error")
+ }
+}
+
+func (c *mockCounter) OnDelete(t discovery.Type, domainProject string) {
+ switch t {
+ case backend.SERVICE_INDEX:
+ c.ServiceCount--
+ case backend.INSTANCE:
+ c.InstanceCount--
+ default:
+ panic("error")
+ }
+}
+
+func TestNewServiceIndexEventHandler(t *testing.T) {
+
+ var counter = mockCounter{}
+ RegisterCounter(&counter)
+ h := NewServiceIndexEventHandler()
+
+ cases := []discovery.KvEvent{
+ {
+ Type: proto.EVT_INIT,
+ KV: &discovery.KeyValue{
+ Key: []byte(core.GenerateServiceIndexKey(&proto.MicroServiceKey{
+ Tenant: core.REGISTRY_DOMAIN_PROJECT,
+ Project: "",
+ AppId: core.REGISTRY_APP_ID,
+ ServiceName: core.REGISTRY_SERVICE_NAME,
+ Version: "e",
+ Environment: "f",
+ Alias: "g",
+ })),
+ Value: "1",
+ },
+ },
+ {
+ Type: proto.EVT_UPDATE,
+ KV: &discovery.KeyValue{
+ Key: []byte(core.GenerateServiceIndexKey(&proto.MicroServiceKey{
+ Tenant: core.REGISTRY_DOMAIN_PROJECT,
+ Project: "",
+ AppId: core.REGISTRY_APP_ID,
+ ServiceName: core.REGISTRY_SERVICE_NAME,
+ Version: "e",
+ Environment: "f",
+ Alias: "g",
+ })),
+ Value: "1",
+ },
+ },
+ {
+ Type: proto.EVT_DELETE,
+ KV: &discovery.KeyValue{
+ Key: []byte(core.GenerateServiceIndexKey(&proto.MicroServiceKey{
+ Tenant: core.REGISTRY_DOMAIN_PROJECT,
+ Project: "",
+ AppId: core.REGISTRY_APP_ID,
+ ServiceName: core.REGISTRY_SERVICE_NAME,
+ Version: "e",
+ Environment: "f",
+ Alias: "g",
+ })),
+ Value: "1",
+ },
+ },
+ {
+ Type: proto.EVT_CREATE,
+ KV: &discovery.KeyValue{
+ Key: []byte(core.GenerateServiceIndexKey(&proto.MicroServiceKey{
+ Tenant: core.REGISTRY_DOMAIN_PROJECT,
+ Project: "",
+ AppId: core.REGISTRY_APP_ID,
+ ServiceName: core.REGISTRY_SERVICE_NAME,
+ Version: "e",
+ Environment: "f",
+ Alias: "g",
+ })),
+ Value: "1",
+ },
+ },
+ {
+ Type: proto.EVT_INIT,
+ KV: &discovery.KeyValue{
+ Key: []byte(core.GenerateServiceIndexKey(&proto.MicroServiceKey{
+ Tenant: "a/b",
+ Project: "",
+ AppId: "c",
+ ServiceName: "d",
+ Version: "e",
+ Environment: "f",
+ Alias: "g",
+ })),
+ Value: "1",
+ },
+ },
+ {
+ Type: proto.EVT_DELETE,
+ KV: &discovery.KeyValue{
+ Key: []byte(core.GenerateServiceIndexKey(&proto.MicroServiceKey{
+ Tenant: "a/b",
+ Project: "",
+ AppId: "c",
+ ServiceName: "d",
+ Version: "e",
+ Environment: "f",
+ Alias: "g",
+ })),
+ Value: "1",
+ },
+ },
+ {
+ Type: proto.EVT_UPDATE,
+ KV: &discovery.KeyValue{
+ Key: []byte(core.GenerateServiceIndexKey(&proto.MicroServiceKey{
+ Tenant: "a/b",
+ Project: "",
+ AppId: "c",
+ ServiceName: "d",
+ Version: "e",
+ Environment: "f",
+ Alias: "g",
+ })),
+ Value: "1",
+ },
+ },
+ {
+ Type: proto.EVT_CREATE,
+ KV: &discovery.KeyValue{
+ Key: []byte(core.GenerateServiceIndexKey(&proto.MicroServiceKey{
+ Tenant: "a/b",
+ Project: "",
+ AppId: "c",
+ ServiceName: "d",
+ Version: "e",
+ Environment: "f",
+ Alias: "g",
+ })),
+ Value: "1",
+ },
+ },
+ }
+
+ for _, evt := range cases {
+ h.OnEvent(evt)
+ }
+ if counter.ServiceCount != 1 || counter.InstanceCount != 0 {
+ t.Fatal("TestNewServiceIndexEventHandler failed", counter)
+ }
+}
+
+func TestNewInstanceEventHandler(t *testing.T) {
+ var counter = mockCounter{}
+ RegisterCounter(&counter)
+ h := NewInstanceEventHandler()
+ SharedServiceIds.Put(core.REGISTRY_DOMAIN_PROJECT+core.SPLIT+"2", struct{}{})
+ cases := []discovery.KvEvent{
+ {
+ Type: proto.EVT_INIT,
+ KV: &discovery.KeyValue{
+ Key: []byte(core.GenerateInstanceKey(core.REGISTRY_DOMAIN_PROJECT, "2", "1")),
+ Value: nil,
+ },
+ },
+ {
+ Type: proto.EVT_UPDATE,
+ KV: &discovery.KeyValue{
+ Key: []byte(core.GenerateInstanceKey(core.REGISTRY_DOMAIN_PROJECT, "2", "1")),
+ Value: nil,
+ },
+ },
+ {
+ Type: proto.EVT_CREATE,
+ KV: &discovery.KeyValue{
+ Key: []byte(core.GenerateInstanceKey(core.REGISTRY_DOMAIN_PROJECT, "2", "1")),
+ Value: nil,
+ },
+ },
+ {
+ Type: proto.EVT_DELETE,
+ KV: &discovery.KeyValue{
+ Key: []byte(core.GenerateInstanceKey(core.REGISTRY_DOMAIN_PROJECT, "2", "1")),
+ Value: nil,
+ },
+ },
+ {
+ Type: proto.EVT_INIT,
+ KV: &discovery.KeyValue{
+ Key: []byte(core.GenerateInstanceKey("a/b", "1", "1")),
+ Value: nil,
+ },
+ },
+ {
+ Type: proto.EVT_DELETE,
+ KV: &discovery.KeyValue{
+ Key: []byte(core.GenerateInstanceKey("a/b", "1", "1")),
+ Value: nil,
+ },
+ },
+ {
+ Type: proto.EVT_UPDATE,
+ KV: &discovery.KeyValue{
+ Key: []byte(core.GenerateInstanceKey("a/b", "1", "1")),
+ Value: nil,
+ },
+ },
+ {
+ Type: proto.EVT_CREATE,
+ KV: &discovery.KeyValue{
+ Key: []byte(core.GenerateInstanceKey("a/b", "1", "1")),
+ Value: nil,
+ },
+ },
+ }
+
+ for _, evt := range cases {
+ h.OnEvent(evt)
+ }
+ if counter.InstanceCount != 1 || counter.ServiceCount != 0 {
+ t.Fatal("TestNewServiceIndexEventHandler failed", counter)
+ }
+}
diff --git a/server/service/event/instance_event_handler.go b/server/service/event/instance_event_handler.go
index 1ea5310..e82ec04 100644
--- a/server/service/event/instance_event_handler.go
+++ b/server/service/event/instance_event_handler.go
@@ -40,6 +40,7 @@ func (h *InstanceEventHandler) Type() discovery.Type {
func (h *InstanceEventHandler) OnEvent(evt discovery.KvEvent) {
action := evt.Type
+ instance := evt.KV.Value.(*pb.MicroServiceInstance)
providerId, providerInstanceId, domainProject := apt.GetInfoFromInstKV(evt.KV.Key)
idx := strings.Index(domainProject, "/")
domainName := domainProject[:idx]
@@ -59,8 +60,8 @@ func (h *InstanceEventHandler) OnEvent(evt discovery.KvEvent) {
}
if notify.NotifyCenter().Closed() {
- log.Warnf("caught [%s] instance[%s/%s] event, but notify service is closed",
- action, providerId, providerInstanceId)
+ log.Warnf("caught [%s] instance[%s/%s] event, endpoints %v, but notify service is closed",
+ action, providerId, providerInstanceId, instance.Endpoints)
return
}
@@ -70,13 +71,14 @@ func (h *InstanceEventHandler) OnEvent(evt discovery.KvEvent) {
serviceUtil.CTX_GLOBAL, "1")
ms, err := serviceUtil.GetService(ctx, domainProject, providerId)
if ms == nil {
- log.Errorf(err, "caught [%s] instance[%s/%s] event, get cached provider's file failed",
- action, providerId, providerInstanceId)
+ log.Errorf(err, "caught [%s] instance[%s/%s] event, endpoints %v, get cached provider's file failed",
+ action, providerId, providerInstanceId, instance.Endpoints)
return
}
- log.Infof("caught [%s] service[%s][%s/%s/%s/%s] instance[%s] event",
- action, providerId, ms.Environment, ms.AppId, ms.ServiceName, ms.Version, providerInstanceId)
+ log.Infof("caught [%s] service[%s][%s/%s/%s/%s] instance[%s] event, endpoints %v",
+ action, providerId, ms.Environment, ms.AppId, ms.ServiceName, ms.Version,
+ providerInstanceId, instance.Endpoints)
// 查询所有consumer
consumerIds, _, err := serviceUtil.GetAllConsumerIds(ctx, domainProject, ms)
@@ -87,7 +89,7 @@ func (h *InstanceEventHandler) OnEvent(evt discovery.KvEvent) {
}
PublishInstanceEvent(domainProject, action, pb.MicroServiceToKey(domainProject, ms),
- evt.KV.Value.(*pb.MicroServiceInstance), evt.Revision, consumerIds)
+ instance, evt.Revision, consumerIds)
}
func NewInstanceEventHandler() *InstanceEventHandler {
diff --git a/server/service/util/instance_util.go b/server/service/util/instance_util.go
index 188caf1..51cd5fe 100644
--- a/server/service/util/instance_util.go
+++ b/server/service/util/instance_util.go
@@ -210,6 +210,14 @@ func queryServiceInstancesKvs(ctx context.Context, serviceId string, rev int64)
}
func UpdateInstance(ctx context.Context, domainProject string, instance *pb.MicroServiceInstance) *scerr.Error {
+ leaseID, err := GetLeaseId(ctx, domainProject, instance.ServiceId, instance.InstanceId)
+ if err != nil {
+ return scerr.NewError(scerr.ErrInternal, err.Error())
+ }
+ if leaseID == -1 {
+ return scerr.NewError(scerr.ErrInstanceNotExists, "Instance's leaseId not exist.")
+ }
+
instance.ModTimestamp = strconv.FormatInt(time.Now().Unix(), 10)
data, err := json.Marshal(instance)
if err != nil {
@@ -217,11 +225,12 @@ func UpdateInstance(ctx context.Context, domainProject string, instance *pb.Micr
}
key := apt.GenerateInstanceKey(domainProject, instance.ServiceId, instance.InstanceId)
+
resp, err := backend.Registry().TxnWithCmp(ctx,
[]registry.PluginOp{registry.OpPut(
registry.WithStrKey(key),
registry.WithValue(data),
- registry.WithIgnoreLease())},
+ registry.WithLease(leaseID))},
[]registry.CompareOp{registry.OpCmp(
registry.CmpVer(util.StringToBytesWithNoCopy(apt.GenerateServiceKey(domainProject, instance.ServiceId))),
registry.CMP_NOT_EQUAL, 0)},