You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicecomb.apache.org by li...@apache.org on 2021/03/30 03:33:01 UTC

[servicecomb-service-center] 01/01: SCB-2176 Auto grant lease if instance exist

This is an automated email from the ASF dual-hosted git repository.

littlecui pushed a commit to branch lease
in repository https://gitbox.apache.org/repos/asf/servicecomb-service-center.git

commit cb9248faf642c083d3be84df44907efe62aed5e5
Author: little-cui <su...@qq.com>
AuthorDate: Tue Mar 30 11:12:37 2021 +0800

    SCB-2176 Auto grant lease if instance exist
---
 server/core/backend/common.go                      |   7 +-
 .../{defer_test.go => defer_instance_test.go}      |   0
 server/core/backend/defer_lease.go                 | 138 +++++++++++++++++++++
 server/plugin/discovery/config.go                  |  10 +-
 server/plugin/discovery/etcd/cacher_kv.go          |   4 +
 server/plugin/discovery/etcd/common.go             |   1 +
 server/plugin/discovery/types.go                   |   1 +
 7 files changed, 157 insertions(+), 4 deletions(-)

diff --git a/server/core/backend/common.go b/server/core/backend/common.go
index e624abc..53fa826 100644
--- a/server/core/backend/common.go
+++ b/server/core/backend/common.go
@@ -49,12 +49,15 @@ var (
 )
 
 func registerInnerTypes() {
+	leaseEventDeferHandler := NewLeaseEventDeferHandler()
+
 	SERVICE = Store().MustInstall(NewAddOn("SERVICE",
 		discovery.Configure().WithPrefix(core.GetServiceRootKey("")).
 			WithInitSize(500).WithParser(proto.ServiceParser)))
 	INSTANCE = Store().MustInstall(NewAddOn("INSTANCE",
 		discovery.Configure().WithPrefix(core.GetInstanceRootKey("")).
-			WithInitSize(1000).WithParser(proto.InstanceParser)))
+			WithInitSize(1000).WithParser(proto.InstanceParser).WithLease().
+			WithDeferHandler(leaseEventDeferHandler)))
 	DOMAIN = Store().MustInstall(NewAddOn("DOMAIN",
 		discovery.Configure().WithPrefix(core.GetDomainRootKey()+core.SPLIT).
 			WithInitSize(100).WithParser(proto.StringParser)))
@@ -69,7 +72,7 @@ func registerInnerTypes() {
 			WithInitSize(100).WithParser(proto.RuleParser)))
 	LEASE = Store().MustInstall(NewAddOn("LEASE",
 		discovery.Configure().WithPrefix(core.GetInstanceLeaseRootKey("")).
-			WithInitSize(1000).WithParser(proto.StringParser)))
+			WithInitSize(1000).WithParser(proto.StringParser).WithLease()))
 	ServiceIndex = Store().MustInstall(NewAddOn("SERVICE_INDEX",
 		discovery.Configure().WithPrefix(core.GetServiceIndexRootKey("")).
 			WithInitSize(500).WithParser(proto.StringParser)))
diff --git a/server/core/backend/defer_test.go b/server/core/backend/defer_instance_test.go
similarity index 100%
rename from server/core/backend/defer_test.go
rename to server/core/backend/defer_instance_test.go
diff --git a/server/core/backend/defer_lease.go b/server/core/backend/defer_lease.go
new file mode 100644
index 0000000..2abae36
--- /dev/null
+++ b/server/core/backend/defer_lease.go
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package backend
+
+import (
+	"context"
+	"encoding/json"
+	"fmt"
+	"github.com/apache/servicecomb-service-center/pkg/gopool"
+	"github.com/apache/servicecomb-service-center/pkg/log"
+	rmodel "github.com/apache/servicecomb-service-center/pkg/registry"
+	"github.com/apache/servicecomb-service-center/pkg/util"
+	apt "github.com/apache/servicecomb-service-center/server/core"
+	"github.com/apache/servicecomb-service-center/server/plugin/discovery"
+	"github.com/apache/servicecomb-service-center/server/plugin/registry"
+	"sync"
+)
+
+type LeaseEventDeferHandler struct {
+	once      sync.Once
+	pendingCh chan []discovery.KvEvent
+	deferCh   chan discovery.KvEvent
+}
+
+func (iedh *LeaseEventDeferHandler) OnCondition(cache discovery.CacheReader, evts []discovery.KvEvent) bool {
+	iedh.once.Do(func() {
+		iedh.pendingCh = make(chan []discovery.KvEvent, eventBlockSize)
+		iedh.deferCh = make(chan discovery.KvEvent, eventBlockSize)
+		gopool.Go(iedh.recoverLoop)
+	})
+
+	pendingEvts := make([]discovery.KvEvent, 0, eventBlockSize)
+	for _, evt := range evts {
+		//
+		iedh.deferCh <- evt
+		if evt.Type != rmodel.EVT_DELETE && evt.KV != nil && evt.KV.Lease == 0 {
+			pendingEvts = append(pendingEvts, evt)
+		}
+	}
+	if len(pendingEvts) == 0 {
+		return true
+	}
+
+	iedh.pendingCh <- pendingEvts
+	return true
+}
+
+func (iedh *LeaseEventDeferHandler) recoverLoop(ctx context.Context) {
+	defer log.Recover()
+
+	for {
+		select {
+		case <-ctx.Done():
+			return
+		case evts, ok := <-iedh.pendingCh:
+			if !ok {
+				log.Error("lease pending chan is closed!", nil)
+				return
+			}
+			log.Info(fmt.Sprintf("start to recover leases[%d]", len(evts)))
+			for _, evt := range evts {
+				iedh.recoverLease(evt)
+			}
+		}
+	}
+}
+
+func (iedh *LeaseEventDeferHandler) recoverLease(evt discovery.KvEvent) {
+	ctx := context.Background()
+	kv := evt.KV
+	key := util.BytesToStringWithNoCopy(kv.Key)
+	instance, ok := kv.Value.(*rmodel.MicroServiceInstance)
+	if !ok {
+		log.Error(fmt.Sprintf("[%s] value covert to MicroServiceInstance failed", key), nil)
+		return
+	}
+
+	ttl := instance.HealthCheck.Interval * (instance.HealthCheck.Times + 1)
+	leaseID, err := Registry().LeaseGrant(ctx, int64(ttl))
+	if err != nil {
+		log.Error(fmt.Sprintf("[%s]grant lease failed", key), err)
+		return
+	}
+	data, err := json.Marshal(kv.Value)
+	if err != nil {
+		log.Error(fmt.Sprintf("[%s]Marshal instance failed", key), err)
+		return
+	}
+
+	serviceID, instanceID, domainProject := apt.GetInfoFromInstKV(kv.Key)
+	hbKey := apt.GenerateInstanceLeaseKey(domainProject, serviceID, instanceID)
+	opts := []registry.PluginOp{
+		registry.OpPut(registry.WithStrKey(key), registry.WithValue(data),
+			registry.WithLease(leaseID)),
+		registry.OpPut(registry.WithStrKey(hbKey), registry.WithStrValue(fmt.Sprintf("%d", leaseID)),
+			registry.WithLease(leaseID)),
+	}
+
+	resp, err := Registry().TxnWithCmp(ctx, opts,
+		[]registry.CompareOp{registry.OpCmp(
+			registry.CmpVer(util.StringToBytesWithNoCopy(apt.GenerateServiceKey(domainProject, serviceID))),
+			registry.CmpNotEqual, 0)}, nil)
+	if err != nil {
+		log.Error(fmt.Sprintf("recover instance lease failed, serviceID %s, instanceID %s", serviceID, instanceID), nil)
+		return
+	}
+	if !resp.Succeeded {
+		log.Error(fmt.Sprintf("recover instance lease failed, serviceID %s, instanceID %s: service does not exist", serviceID, instanceID), nil)
+		return
+	}
+}
+
+func (iedh *LeaseEventDeferHandler) HandleChan() <-chan discovery.KvEvent {
+	return iedh.deferCh
+}
+
+func (iedh *LeaseEventDeferHandler) Reset() bool {
+	return false
+}
+
+func NewLeaseEventDeferHandler() *LeaseEventDeferHandler {
+	return &LeaseEventDeferHandler{}
+}
diff --git a/server/plugin/discovery/config.go b/server/plugin/discovery/config.go
index b14cbab..d0e53f0 100644
--- a/server/plugin/discovery/config.go
+++ b/server/plugin/discovery/config.go
@@ -31,11 +31,12 @@ type Config struct {
 	DeferHandler DeferHandler
 	OnEvent      KvEventFunc
 	Parser       proto.Parser
+	Lease        bool
 }
 
 func (cfg *Config) String() string {
-	return fmt.Sprintf("{key: %s, timeout: %s, period: %s}",
-		cfg.Key, cfg.Timeout, cfg.Period)
+	return fmt.Sprintf("{key: %s, timeout: %s, period: %s, lease: %v}",
+		cfg.Key, cfg.Timeout, cfg.Period, cfg.Lease)
 }
 
 func (cfg *Config) WithPrefix(key string) *Config {
@@ -85,6 +86,11 @@ func (cfg *Config) WithParser(parser proto.Parser) *Config {
 	return cfg
 }
 
+func (cfg *Config) WithLease() *Config {
+	cfg.Lease = true
+	return cfg
+}
+
 func Configure() *Config {
 	return &Config{
 		Key:      "/",
diff --git a/server/plugin/discovery/etcd/cacher_kv.go b/server/plugin/discovery/etcd/cacher_kv.go
index eaa302b..c37aadd 100644
--- a/server/plugin/discovery/etcd/cacher_kv.go
+++ b/server/plugin/discovery/etcd/cacher_kv.go
@@ -362,6 +362,7 @@ func (c *KvCacher) handleDeferEvents(ctx context.Context) {
 		evts = make([]discovery.KvEvent, eventBlockSize)
 		i    int
 	)
+	// the interval of pack recv events into one block
 	interval := 300 * time.Millisecond
 	timer := time.NewTimer(interval)
 	defer timer.Stop()
@@ -371,6 +372,7 @@ func (c *KvCacher) handleDeferEvents(ctx context.Context) {
 			return
 		case evt, ok := <-c.Cfg.DeferHandler.HandleChan():
 			if !ok {
+				log.Error(fmt.Sprintf("[%s]defer handle chan is closed!", c.Cfg.Key), nil)
 				return
 			}
 
@@ -391,6 +393,8 @@ func (c *KvCacher) handleDeferEvents(ctx context.Context) {
 				continue
 			}
 
+			log.Debug(fmt.Sprintf("[%s]recv one defer events block[%d]", c.Cfg.Key, i))
+
 			c.onEvents(evts[:i])
 			evts = make([]discovery.KvEvent, eventBlockSize)
 			i = 0
diff --git a/server/plugin/discovery/etcd/common.go b/server/plugin/discovery/etcd/common.go
index b0c1a04..31d060f 100644
--- a/server/plugin/discovery/etcd/common.go
+++ b/server/plugin/discovery/etcd/common.go
@@ -43,6 +43,7 @@ func FromEtcdKeyValue(dist *discovery.KeyValue, src *mvccpb.KeyValue, parser pro
 	dist.Version = src.Version
 	dist.CreateRevision = src.CreateRevision
 	dist.ModRevision = src.ModRevision
+	dist.Lease = src.Lease
 	if parser == nil {
 		return
 	}
diff --git a/server/plugin/discovery/types.go b/server/plugin/discovery/types.go
index 7e75723..f5be39b 100644
--- a/server/plugin/discovery/types.go
+++ b/server/plugin/discovery/types.go
@@ -65,6 +65,7 @@ type KeyValue struct {
 	Version        int64
 	CreateRevision int64
 	ModRevision    int64
+	Lease          int64
 	ClusterName    string
 }