You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicecomb.apache.org by li...@apache.org on 2021/03/30 03:33:01 UTC
[servicecomb-service-center] 01/01: SCB-2176 Auto grant lease if
instance exist
This is an automated email from the ASF dual-hosted git repository.
littlecui pushed a commit to branch lease
in repository https://gitbox.apache.org/repos/asf/servicecomb-service-center.git
commit cb9248faf642c083d3be84df44907efe62aed5e5
Author: little-cui <su...@qq.com>
AuthorDate: Tue Mar 30 11:12:37 2021 +0800
SCB-2176 Auto grant lease if instance exist
---
server/core/backend/common.go | 7 +-
.../{defer_test.go => defer_instance_test.go} | 0
server/core/backend/defer_lease.go | 138 +++++++++++++++++++++
server/plugin/discovery/config.go | 10 +-
server/plugin/discovery/etcd/cacher_kv.go | 4 +
server/plugin/discovery/etcd/common.go | 1 +
server/plugin/discovery/types.go | 1 +
7 files changed, 157 insertions(+), 4 deletions(-)
diff --git a/server/core/backend/common.go b/server/core/backend/common.go
index e624abc..53fa826 100644
--- a/server/core/backend/common.go
+++ b/server/core/backend/common.go
@@ -49,12 +49,15 @@ var (
)
func registerInnerTypes() {
+ leaseEventDeferHandler := NewLeaseEventDeferHandler()
+
SERVICE = Store().MustInstall(NewAddOn("SERVICE",
discovery.Configure().WithPrefix(core.GetServiceRootKey("")).
WithInitSize(500).WithParser(proto.ServiceParser)))
INSTANCE = Store().MustInstall(NewAddOn("INSTANCE",
discovery.Configure().WithPrefix(core.GetInstanceRootKey("")).
- WithInitSize(1000).WithParser(proto.InstanceParser)))
+ WithInitSize(1000).WithParser(proto.InstanceParser).WithLease().
+ WithDeferHandler(leaseEventDeferHandler)))
DOMAIN = Store().MustInstall(NewAddOn("DOMAIN",
discovery.Configure().WithPrefix(core.GetDomainRootKey()+core.SPLIT).
WithInitSize(100).WithParser(proto.StringParser)))
@@ -69,7 +72,7 @@ func registerInnerTypes() {
WithInitSize(100).WithParser(proto.RuleParser)))
LEASE = Store().MustInstall(NewAddOn("LEASE",
discovery.Configure().WithPrefix(core.GetInstanceLeaseRootKey("")).
- WithInitSize(1000).WithParser(proto.StringParser)))
+ WithInitSize(1000).WithParser(proto.StringParser).WithLease()))
ServiceIndex = Store().MustInstall(NewAddOn("SERVICE_INDEX",
discovery.Configure().WithPrefix(core.GetServiceIndexRootKey("")).
WithInitSize(500).WithParser(proto.StringParser)))
diff --git a/server/core/backend/defer_test.go b/server/core/backend/defer_instance_test.go
similarity index 100%
rename from server/core/backend/defer_test.go
rename to server/core/backend/defer_instance_test.go
diff --git a/server/core/backend/defer_lease.go b/server/core/backend/defer_lease.go
new file mode 100644
index 0000000..2abae36
--- /dev/null
+++ b/server/core/backend/defer_lease.go
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package backend
+
+import (
+ "context"
+ "encoding/json"
+ "fmt"
+ "github.com/apache/servicecomb-service-center/pkg/gopool"
+ "github.com/apache/servicecomb-service-center/pkg/log"
+ rmodel "github.com/apache/servicecomb-service-center/pkg/registry"
+ "github.com/apache/servicecomb-service-center/pkg/util"
+ apt "github.com/apache/servicecomb-service-center/server/core"
+ "github.com/apache/servicecomb-service-center/server/plugin/discovery"
+ "github.com/apache/servicecomb-service-center/server/plugin/registry"
+ "sync"
+)
+
+// LeaseEventDeferHandler is a defer handler that passes every event it is
+// given through to deferCh and queues the events whose key-value carries no
+// lease (Lease == 0) so that a background loop can grant one.
+type LeaseEventDeferHandler struct {
+ // once guards the lazy creation of both channels and the start of recoverLoop.
+ once sync.Once
+ // pendingCh carries batches of lease-less events to recoverLoop.
+ pendingCh chan []discovery.KvEvent
+ // deferCh receives every event passed to OnCondition; exposed by HandleChan.
+ deferCh chan discovery.KvEvent
+}
+
+func (iedh *LeaseEventDeferHandler) OnCondition(cache discovery.CacheReader, evts []discovery.KvEvent) bool {
+ iedh.once.Do(func() {
+ iedh.pendingCh = make(chan []discovery.KvEvent, eventBlockSize)
+ iedh.deferCh = make(chan discovery.KvEvent, eventBlockSize)
+ gopool.Go(iedh.recoverLoop)
+ })
+
+ pendingEvts := make([]discovery.KvEvent, 0, eventBlockSize)
+ for _, evt := range evts {
+ //
+ iedh.deferCh <- evt
+ if evt.Type != rmodel.EVT_DELETE && evt.KV != nil && evt.KV.Lease == 0 {
+ pendingEvts = append(pendingEvts, evt)
+ }
+ }
+ if len(pendingEvts) == 0 {
+ return true
+ }
+
+ iedh.pendingCh <- pendingEvts
+ return true
+}
+
+// recoverLoop receives batches from pendingCh and runs recoverLease on each
+// event of every batch. It returns when ctx is done or pendingCh is closed.
+func (iedh *LeaseEventDeferHandler) recoverLoop(ctx context.Context) {
+	defer log.Recover()
+
+	for {
+		select {
+		case batch, open := <-iedh.pendingCh:
+			if open {
+				log.Info(fmt.Sprintf("start to recover leases[%d]", len(batch)))
+				for i := range batch {
+					iedh.recoverLease(batch[i])
+				}
+				continue
+			}
+			log.Error("lease pending chan is closed!", nil)
+			return
+		case <-ctx.Done():
+			return
+		}
+	}
+}
+
+// recoverLease grants a new lease for the instance carried by evt and
+// rewrites the instance key and its lease key, both bound to that lease.
+//
+// The lease TTL is HealthCheck.Interval * (HealthCheck.Times + 1). The two
+// puts run in a single transaction guarded by a compare that requires the
+// version of the owning service key to be non-zero. Failures are logged and
+// not returned to the caller.
+func (iedh *LeaseEventDeferHandler) recoverLease(evt discovery.KvEvent) {
+	ctx := context.Background()
+	kv := evt.KV
+	key := util.BytesToStringWithNoCopy(kv.Key)
+	instance, ok := kv.Value.(*rmodel.MicroServiceInstance)
+	if !ok {
+		log.Error(fmt.Sprintf("[%s] value covert to MicroServiceInstance failed", key), nil)
+		return
+	}
+	// A nil instance or health check would panic below; the panic would
+	// unwind recoverLoop and leave nobody draining the pending channel.
+	if instance == nil || instance.HealthCheck == nil {
+		log.Error(fmt.Sprintf("[%s]instance has no health check, can not compute lease ttl", key), nil)
+		return
+	}
+
+	// Marshal before granting the lease so that a marshal failure does not
+	// leave an unused lease behind.
+	data, err := json.Marshal(kv.Value)
+	if err != nil {
+		log.Error(fmt.Sprintf("[%s]Marshal instance failed", key), err)
+		return
+	}
+
+	ttl := instance.HealthCheck.Interval * (instance.HealthCheck.Times + 1)
+	leaseID, err := Registry().LeaseGrant(ctx, int64(ttl))
+	if err != nil {
+		log.Error(fmt.Sprintf("[%s]grant lease failed", key), err)
+		return
+	}
+
+	serviceID, instanceID, domainProject := apt.GetInfoFromInstKV(kv.Key)
+	hbKey := apt.GenerateInstanceLeaseKey(domainProject, serviceID, instanceID)
+	opts := []registry.PluginOp{
+		registry.OpPut(registry.WithStrKey(key), registry.WithValue(data),
+			registry.WithLease(leaseID)),
+		registry.OpPut(registry.WithStrKey(hbKey), registry.WithStrValue(fmt.Sprintf("%d", leaseID)),
+			registry.WithLease(leaseID)),
+	}
+
+	resp, err := Registry().TxnWithCmp(ctx, opts,
+		[]registry.CompareOp{registry.OpCmp(
+			registry.CmpVer(util.StringToBytesWithNoCopy(apt.GenerateServiceKey(domainProject, serviceID))),
+			registry.CmpNotEqual, 0)}, nil)
+	if err != nil {
+		// Pass the real error to the logger instead of dropping it.
+		log.Error(fmt.Sprintf("recover instance lease failed, serviceID %s, instanceID %s", serviceID, instanceID), err)
+		return
+	}
+	if !resp.Succeeded {
+		log.Error(fmt.Sprintf("recover instance lease failed, serviceID %s, instanceID %s: service does not exist", serviceID, instanceID), nil)
+		return
+	}
+}
+
+// HandleChan returns deferCh as a receive-only channel; OnCondition sends
+// every event it is given to that channel.
+// NOTE(review): deferCh is only created on the first OnCondition call, so a
+// nil channel is returned before then — confirm callers tolerate that.
+func (iedh *LeaseEventDeferHandler) HandleChan() <-chan discovery.KvEvent {
+ return iedh.deferCh
+}
+
+// Reset always returns false and changes no state.
+// NOTE(review): presumably part of the defer handler interface; the meaning
+// of the return value is not visible here — confirm against the interface.
+func (iedh *LeaseEventDeferHandler) Reset() bool {
+ return false
+}
+
+// NewLeaseEventDeferHandler returns a zero-valued handler. Its channels and
+// background goroutine are not created here.
+func NewLeaseEventDeferHandler() *LeaseEventDeferHandler {
+	return new(LeaseEventDeferHandler)
+}
diff --git a/server/plugin/discovery/config.go b/server/plugin/discovery/config.go
index b14cbab..d0e53f0 100644
--- a/server/plugin/discovery/config.go
+++ b/server/plugin/discovery/config.go
@@ -31,11 +31,12 @@ type Config struct {
DeferHandler DeferHandler
OnEvent KvEventFunc
Parser proto.Parser
+ Lease bool
}
func (cfg *Config) String() string {
- return fmt.Sprintf("{key: %s, timeout: %s, period: %s}",
- cfg.Key, cfg.Timeout, cfg.Period)
+ return fmt.Sprintf("{key: %s, timeout: %s, period: %s, lease: %v}",
+ cfg.Key, cfg.Timeout, cfg.Period, cfg.Lease)
}
func (cfg *Config) WithPrefix(key string) *Config {
@@ -85,6 +86,11 @@ func (cfg *Config) WithParser(parser proto.Parser) *Config {
return cfg
}
+// WithLease sets the Lease flag on the config and returns the same config so
+// that calls can be chained.
+func (cfg *Config) WithLease() *Config {
+ cfg.Lease = true
+ return cfg
+}
+
func Configure() *Config {
return &Config{
Key: "/",
diff --git a/server/plugin/discovery/etcd/cacher_kv.go b/server/plugin/discovery/etcd/cacher_kv.go
index eaa302b..c37aadd 100644
--- a/server/plugin/discovery/etcd/cacher_kv.go
+++ b/server/plugin/discovery/etcd/cacher_kv.go
@@ -362,6 +362,7 @@ func (c *KvCacher) handleDeferEvents(ctx context.Context) {
evts = make([]discovery.KvEvent, eventBlockSize)
i int
)
+ // interval for packing received events into one block
interval := 300 * time.Millisecond
timer := time.NewTimer(interval)
defer timer.Stop()
@@ -371,6 +372,7 @@ func (c *KvCacher) handleDeferEvents(ctx context.Context) {
return
case evt, ok := <-c.Cfg.DeferHandler.HandleChan():
if !ok {
+ log.Error(fmt.Sprintf("[%s]defer handle chan is closed!", c.Cfg.Key), nil)
return
}
@@ -391,6 +393,8 @@ func (c *KvCacher) handleDeferEvents(ctx context.Context) {
continue
}
+ log.Debug(fmt.Sprintf("[%s]recv one defer events block[%d]", c.Cfg.Key, i))
+
c.onEvents(evts[:i])
evts = make([]discovery.KvEvent, eventBlockSize)
i = 0
diff --git a/server/plugin/discovery/etcd/common.go b/server/plugin/discovery/etcd/common.go
index b0c1a04..31d060f 100644
--- a/server/plugin/discovery/etcd/common.go
+++ b/server/plugin/discovery/etcd/common.go
@@ -43,6 +43,7 @@ func FromEtcdKeyValue(dist *discovery.KeyValue, src *mvccpb.KeyValue, parser pro
dist.Version = src.Version
dist.CreateRevision = src.CreateRevision
dist.ModRevision = src.ModRevision
+ dist.Lease = src.Lease
if parser == nil {
return
}
diff --git a/server/plugin/discovery/types.go b/server/plugin/discovery/types.go
index 7e75723..f5be39b 100644
--- a/server/plugin/discovery/types.go
+++ b/server/plugin/discovery/types.go
@@ -65,6 +65,7 @@ type KeyValue struct {
Version int64
CreateRevision int64
ModRevision int64
+ Lease int64
ClusterName string
}