You are viewing a plain text version of this content. The canonical version was referenced by a hyperlink that is not preserved in this plain-text copy.
Posted to commits@servicecomb.apache.org by ti...@apache.org on 2021/03/13 08:51:06 UTC

[servicecomb-service-center] branch master updated: self preservation can not replay events (#895)

This is an automated email from the ASF dual-hosted git repository.

tianxiaoliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/servicecomb-service-center.git


The following commit(s) were added to refs/heads/master by this push:
     new 38f39d1  self preservation can not replay events (#895)
38f39d1 is described below

commit 38f39d1a315611f4930dea2a00d6906df6f4b8fb
Author: Shawn <xi...@gmail.com>
AuthorDate: Sat Mar 13 16:50:58 2021 +0800

    self preservation can not replay events (#895)
---
 datasource/etcd/kv/instance_event_handler.go | 89 ++++++++++++++--------------
 datasource/etcd/sd/etcd/cacher_kv.go         |  1 +
 pkg/metrics/common.go                        | 14 ++++-
 server/config/util.go                        |  3 +
 server/metrics/meta.go                       | 29 +++++----
 server/plugin/quota/buildin/buildin_test.go  | 32 ++++++++--
 server/plugin/quota/quota.go                 | 12 ++--
 7 files changed, 105 insertions(+), 75 deletions(-)

diff --git a/datasource/etcd/kv/instance_event_handler.go b/datasource/etcd/kv/instance_event_handler.go
index 8650e50..d013a09 100644
--- a/datasource/etcd/kv/instance_event_handler.go
+++ b/datasource/etcd/kv/instance_event_handler.go
@@ -30,20 +30,20 @@ import (
 )
 
 type deferItem struct {
-	ttl   int32 // in seconds
-	event sd.KvEvent
+	ReplayAfter int32 // in seconds
+	event       sd.KvEvent
 }
 
 type InstanceEventDeferHandler struct {
 	Percent float64
 
-	cache     sd.CacheReader
-	once      sync.Once
-	enabled   bool
-	items     map[string]*deferItem
-	pendingCh chan []sd.KvEvent
-	deferCh   chan sd.KvEvent
-	resetCh   chan struct{}
+	cache    sd.CacheReader
+	once     sync.Once
+	enabled  bool
+	items    map[string]*deferItem
+	evts     chan []sd.KvEvent
+	replayCh chan sd.KvEvent
+	resetCh  chan struct{}
 }
 
 func (iedh *InstanceEventDeferHandler) OnCondition(cache sd.CacheReader, evts []sd.KvEvent) bool {
@@ -54,19 +54,19 @@ func (iedh *InstanceEventDeferHandler) OnCondition(cache sd.CacheReader, evts []
 	iedh.once.Do(func() {
 		iedh.cache = cache
 		iedh.items = make(map[string]*deferItem)
-		iedh.pendingCh = make(chan []sd.KvEvent, eventBlockSize)
-		iedh.deferCh = make(chan sd.KvEvent, eventBlockSize)
+		iedh.evts = make(chan []sd.KvEvent, eventBlockSize)
+		iedh.replayCh = make(chan sd.KvEvent, eventBlockSize)
 		iedh.resetCh = make(chan struct{})
 		gopool.Go(iedh.check)
 	})
 
-	iedh.pendingCh <- evts
+	iedh.evts <- evts
 	return true
 }
 
 func (iedh *InstanceEventDeferHandler) recoverOrDefer(evt sd.KvEvent) {
 	if evt.KV == nil {
-		log.Errorf(nil, "defer or recover a %s nil KV", evt.Type)
+		log.Errorf(nil, "defer or replayEvent a %s nil KV", evt.Type)
 		return
 	}
 	kv := evt.KV
@@ -78,7 +78,7 @@ func (iedh *InstanceEventDeferHandler) recoverOrDefer(evt sd.KvEvent) {
 			log.Infof("recovered key %s events", key)
 			// return nil // no need to publish event to subscribers?
 		}
-		iedh.recover(evt)
+		iedh.replayEvent(evt)
 	case discovery.EVT_DELETE:
 		if ok {
 			return
@@ -86,7 +86,7 @@ func (iedh *InstanceEventDeferHandler) recoverOrDefer(evt sd.KvEvent) {
 
 		instance := kv.Value.(*discovery.MicroServiceInstance)
 		if instance == nil {
-			log.Errorf(nil, "defer or recover a %s nil Value, KV is %v", evt.Type, kv)
+			log.Errorf(nil, "defer or replayEvent a %s nil Value, KV is %v", evt.Type, kv)
 			return
 		}
 		ttl := instance.HealthCheck.Interval * (instance.HealthCheck.Times + 1)
@@ -94,27 +94,26 @@ func (iedh *InstanceEventDeferHandler) recoverOrDefer(evt sd.KvEvent) {
 			ttl = selfPreservationMaxTTL
 		}
 		iedh.items[key] = &deferItem{
-			ttl:   ttl,
-			event: evt,
+			ReplayAfter: ttl,
+			event:       evt,
 		}
 	}
 }
 
 func (iedh *InstanceEventDeferHandler) HandleChan() <-chan sd.KvEvent {
-	return iedh.deferCh
+	return iedh.replayCh
 }
 
 func (iedh *InstanceEventDeferHandler) check(ctx context.Context) {
 	defer log.Recover()
-
 	t, n := time.NewTimer(deferCheckWindow), false
-	interval := int32(deferCheckWindow / time.Second)
 	defer t.Stop()
 	for {
 		select {
 		case <-ctx.Done():
+			log.Error("self preservation routine dead", nil)
 			return
-		case evts := <-iedh.pendingCh:
+		case evts := <-iedh.evts:
 			for _, evt := range evts {
 				iedh.recoverOrDefer(evt)
 			}
@@ -145,45 +144,45 @@ func (iedh *InstanceEventDeferHandler) check(ctx context.Context) {
 
 			if !iedh.enabled {
 				for _, item := range iedh.items {
-					iedh.recover(item.event)
+					iedh.replayEvent(item.event)
 				}
 				continue
 			}
 
-			for key, item := range iedh.items {
-				item.ttl -= interval
-				if item.ttl > 0 {
-					continue
-				}
-				log.Warnf("defer handle timed out, removed key is %s", key)
-				iedh.recover(item.event)
-			}
-			if len(iedh.items) == 0 {
-				iedh.renew()
-				log.Warnf("self preservation is stopped")
-			}
+			iedh.ReplayEvents()
 		case <-iedh.resetCh:
-			iedh.renew()
-			log.Warnf("self preservation is reset")
-
+			iedh.ReplayEvents()
+			iedh.enabled = false
 			util.ResetTimer(t, deferCheckWindow)
 		}
 	}
 }
 
-func (iedh *InstanceEventDeferHandler) recover(evt sd.KvEvent) {
-	key := util.BytesToStringWithNoCopy(evt.KV.Key)
-	delete(iedh.items, key)
-	iedh.deferCh <- evt
+func (iedh *InstanceEventDeferHandler) ReplayEvents() {
+	interval := int32(deferCheckWindow / time.Second)
+	for key, item := range iedh.items {
+		item.ReplayAfter -= interval
+		if item.ReplayAfter > 0 {
+			continue
+		}
+		log.Warnf("replay delete event, remove key: %s", key)
+		iedh.replayEvent(item.event)
+	}
+	if len(iedh.items) == 0 {
+		iedh.enabled = false
+		log.Warnf("self preservation stopped")
+	}
 }
 
-func (iedh *InstanceEventDeferHandler) renew() {
-	iedh.enabled = false
-	iedh.items = make(map[string]*deferItem)
+func (iedh *InstanceEventDeferHandler) replayEvent(evt sd.KvEvent) {
+	key := util.BytesToStringWithNoCopy(evt.KV.Key)
+	delete(iedh.items, key)
+	iedh.replayCh <- evt
 }
 
 func (iedh *InstanceEventDeferHandler) Reset() bool {
-	if iedh.enabled {
+	if iedh.enabled || len(iedh.items) != 0 {
+		log.Warnf("self preservation is reset")
 		iedh.resetCh <- struct{}{}
 		return true
 	}
diff --git a/datasource/etcd/sd/etcd/cacher_kv.go b/datasource/etcd/sd/etcd/cacher_kv.go
index 9a7fa51..d65dca8 100644
--- a/datasource/etcd/sd/etcd/cacher_kv.go
+++ b/datasource/etcd/sd/etcd/cacher_kv.go
@@ -373,6 +373,7 @@ func (c *KvCacher) handleDeferEvents(ctx context.Context) {
 			return
 		case evt, ok := <-c.Cfg.DeferHandler.HandleChan():
 			if !ok {
+				log.Error("replay channel is closed", nil)
 				return
 			}
 
diff --git a/pkg/metrics/common.go b/pkg/metrics/common.go
index 459e9a4..56b4d47 100644
--- a/pkg/metrics/common.go
+++ b/pkg/metrics/common.go
@@ -23,7 +23,15 @@ import (
 	dto "github.com/prometheus/client_model/go"
 )
 
-func getValue(name string, labels prometheus.Labels, apply func(m *dto.Metric) float64) float64 {
+// keys of gauge
+const (
+	KeyServiceTotal  = "service_total"
+	KeyInstanceTotal = "instance_total"
+
+	SubSystem = "db"
+)
+
+func getValue(name string, labels prometheus.Labels) float64 {
 	f := Family(name)
 	if f == nil {
 		return 0
@@ -34,13 +42,13 @@ func getValue(name string, labels prometheus.Labels, apply func(m *dto.Metric) f
 		if !matchAll && !MatchLabels(m, labels) {
 			continue
 		}
-		sum += apply(m)
+		sum += m.GetGauge().GetValue()
 	}
 	return sum
 }
 
 func GaugeValue(name string, labels prometheus.Labels) int64 {
-	return int64(getValue(name, labels, func(m *dto.Metric) float64 { return m.GetGauge().GetValue() }))
+	return int64(getValue(name, labels))
 }
 
 func MatchLabels(m *dto.Metric, labels prometheus.Labels) bool {
diff --git a/server/config/util.go b/server/config/util.go
index a5cd29f..cb0f6c0 100644
--- a/server/config/util.go
+++ b/server/config/util.go
@@ -73,6 +73,9 @@ func GetInt(key string, def int, opts ...Option) int {
 	if archaius.Exist(key) {
 		return archaius.GetInt(key, def)
 	}
+	if archaius.Exist(options.Standby) {
+		return archaius.GetInt(options.Standby, def)
+	}
 	return beego.AppConfig.DefaultInt(options.Standby, def)
 }
 
diff --git a/server/metrics/meta.go b/server/metrics/meta.go
index 4846719..7d17744 100644
--- a/server/metrics/meta.go
+++ b/server/metrics/meta.go
@@ -18,6 +18,7 @@
 package metrics
 
 import (
+	"strings"
 	"time"
 
 	"github.com/apache/servicecomb-service-center/pkg/metrics"
@@ -29,24 +30,20 @@ import (
 // keys of gauge
 const (
 	KeyDomainTotal    = "domain_total"
-	KeyServiceTotal   = "service_total"
-	KeyInstanceTotal  = "instance_total"
 	KeySchemaTotal    = "schema_total"
 	KeyFrameworkTotal = "framework_total"
-
-	SubSystem = "db"
 )
 
 // Key return metrics key
 func Key(name string) string {
-	return util.StringJoin([]string{SubSystem, name}, "_")
+	return util.StringJoin([]string{metrics.SubSystem, name}, "_")
 }
 
 var (
 	domainCounter = helper.NewGaugeVec(
 		prometheus.GaugeOpts{
 			Namespace: metrics.FamilyName,
-			Subsystem: SubSystem,
+			Subsystem: metrics.SubSystem,
 			Name:      KeyDomainTotal,
 			Help:      "Gauge of domain created in Service Center",
 		}, []string{"instance"})
@@ -55,22 +52,22 @@ var (
 		prometheus.GaugeOpts{
 			Namespace: metrics.FamilyName,
 			Subsystem: "db",
-			Name:      KeyServiceTotal,
+			Name:      metrics.KeyServiceTotal,
 			Help:      "Gauge of microservice created in Service Center",
 		}, []string{"instance", "framework", "frameworkVersion", "domain"})
 
 	instanceCounter = helper.NewGaugeVec(
 		prometheus.GaugeOpts{
 			Namespace: metrics.FamilyName,
-			Subsystem: SubSystem,
-			Name:      KeyInstanceTotal,
+			Subsystem: metrics.SubSystem,
+			Name:      metrics.KeyInstanceTotal,
 			Help:      "Gauge of microservice created in Service Center",
 		}, []string{"instance", "domain"})
 
 	schemaCounter = helper.NewGaugeVec(
 		prometheus.GaugeOpts{
 			Namespace: metrics.FamilyName,
-			Subsystem: SubSystem,
+			Subsystem: metrics.SubSystem,
 			Name:      KeySchemaTotal,
 			Help:      "Gauge of schema created in Service Center",
 		}, []string{"instance", "domain"})
@@ -78,7 +75,7 @@ var (
 	frameworkCounter = helper.NewGaugeVec(
 		prometheus.GaugeOpts{
 			Namespace: metrics.FamilyName,
-			Subsystem: SubSystem,
+			Subsystem: metrics.SubSystem,
 			Name:      KeyFrameworkTotal,
 			Help:      "Gauge of client framework info in Service Center",
 		}, metrics.ToLabelNames(Framework{}))
@@ -126,12 +123,18 @@ func ReportServices(domain, framework, frameworkVersion string, c float64) {
 	instance := metrics.InstanceName()
 	serviceCounter.WithLabelValues(instance, framework, frameworkVersion, domain).Add(c)
 }
-
+func GetTotalService(domain string) int64 {
+	return metrics.GaugeValue(strings.Join([]string{metrics.SubSystem, metrics.KeyServiceTotal}, "_"), prometheus.Labels{"domain": domain})
+}
 func ReportInstances(domain string, c float64) {
 	instance := metrics.InstanceName()
 	instanceCounter.WithLabelValues(instance, domain).Add(c)
 }
-
+func GetTotalInstance(domain string) int64 {
+	mn := strings.Join([]string{metrics.SubSystem, metrics.KeyInstanceTotal}, "_")
+	usage := metrics.GaugeValue(mn, prometheus.Labels{"domain": domain})
+	return usage
+}
 func ReportSchemas(domain string, c float64) {
 	instance := metrics.InstanceName()
 	schemaCounter.WithLabelValues(instance, domain).Add(c)
diff --git a/server/plugin/quota/buildin/buildin_test.go b/server/plugin/quota/buildin/buildin_test.go
index bc69e26..799aaa1 100644
--- a/server/plugin/quota/buildin/buildin_test.go
+++ b/server/plugin/quota/buildin/buildin_test.go
@@ -17,6 +17,7 @@ package buildin_test
 
 import (
 	"context"
+	"github.com/apache/servicecomb-service-center/pkg/util"
 
 	"github.com/apache/servicecomb-service-center/datasource"
 	_ "github.com/apache/servicecomb-service-center/server/init"
@@ -40,13 +41,15 @@ func init() {
 }
 func TestGetResourceLimit(t *testing.T) {
 	//var id string
+	ctx := context.TODO()
+	ctx = util.SetDomainProject(ctx, "quota", "quota")
 	t.Run("create service,should success", func(t *testing.T) {
-		res := quota.NewApplyQuotaResource(quota.TypeService, "default/default", "", 1)
-		err := quota.Apply(context.TODO(), res)
+		res := quota.NewApplyQuotaResource(quota.TypeService, "quota/quota", "", 1)
+		err := quota.Apply(ctx, res)
 		assert.Nil(t, err)
 	})
-	t.Run("create instance,should success", func(t *testing.T) {
-		resp, err := datasource.Instance().RegisterService(context.TODO(), &pb.CreateServiceRequest{
+	t.Run("create 1 instance,should success", func(t *testing.T) {
+		resp, err := datasource.Instance().RegisterService(ctx, &pb.CreateServiceRequest{
 			Service: &pb.MicroService{
 				ServiceName: "quota",
 			},
@@ -54,9 +57,26 @@ func TestGetResourceLimit(t *testing.T) {
 		assert.NoError(t, err)
 		assert.Equal(t, pb.ResponseSuccess, resp.Response.GetCode())
 
-		res := quota.NewApplyQuotaResource(quota.TypeInstance, "default/default", resp.ServiceId, 1)
-		err = quota.Apply(context.TODO(), res)
+		res := quota.NewApplyQuotaResource(quota.TypeInstance, "quota/quota", resp.ServiceId, 1)
+		err = quota.Apply(ctx, res)
 		assert.Nil(t, err)
+
+		res = quota.NewApplyQuotaResource(quota.TypeInstance, "quota/quota", resp.ServiceId, 150001)
+		err = quota.Apply(ctx, res)
+		assert.NotNil(t, err)
+	})
+	t.Run("create 150001 instance,should failed", func(t *testing.T) {
+		resp, err := datasource.Instance().RegisterService(ctx, &pb.CreateServiceRequest{
+			Service: &pb.MicroService{
+				ServiceName: "quota2",
+			},
+		})
+		assert.NoError(t, err)
+		assert.Equal(t, pb.ResponseSuccess, resp.Response.GetCode())
+
+		res := quota.NewApplyQuotaResource(quota.TypeInstance, "quota/quota", resp.ServiceId, 150001)
+		err = quota.Apply(ctx, res)
+		assert.NotNil(t, err)
 	})
 
 }
diff --git a/server/plugin/quota/quota.go b/server/plugin/quota/quota.go
index 3460045..51210d4 100644
--- a/server/plugin/quota/quota.go
+++ b/server/plugin/quota/quota.go
@@ -23,9 +23,8 @@ import (
 	"fmt"
 	"github.com/apache/servicecomb-service-center/datasource"
 	"github.com/apache/servicecomb-service-center/pkg/log"
-	"github.com/apache/servicecomb-service-center/pkg/metrics"
 	"github.com/apache/servicecomb-service-center/pkg/util"
-	"github.com/prometheus/client_golang/prometheus"
+	"github.com/apache/servicecomb-service-center/server/metrics"
 	"strconv"
 
 	"github.com/apache/servicecomb-service-center/server/config"
@@ -50,10 +49,6 @@ const (
 	TypeService
 	TypeInstance
 )
-const (
-	TotalService  = "db_service_total"
-	TotalInstance = "db_instance_total"
-)
 
 var (
 	DefaultServiceQuota  = defaultServiceLimit
@@ -141,9 +136,10 @@ func GetResourceUsage(ctx context.Context, res *ApplyQuotaResource) (int64, erro
 	serviceID := res.ServiceID
 	switch res.QuotaType {
 	case TypeService:
-		return metrics.GaugeValue(TotalService, prometheus.Labels{"domain": util.ParseDomain(ctx)}), nil
+		return metrics.GetTotalService(util.ParseDomain(ctx)), nil
 	case TypeInstance:
-		return metrics.GaugeValue(TotalInstance, prometheus.Labels{"domain": util.ParseDomain(ctx)}), nil
+		usage := metrics.GetTotalInstance(util.ParseDomain(ctx))
+		return usage, nil
 	case TypeRule:
 		{
 			resp, err := datasource.Instance().GetRules(ctx, &pb.GetServiceRulesRequest{