You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicecomb.apache.org by ti...@apache.org on 2021/03/13 08:51:06 UTC
[servicecomb-service-center] branch master updated: self preservation can not replay events (#895)
This is an automated email from the ASF dual-hosted git repository.
tianxiaoliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/servicecomb-service-center.git
The following commit(s) were added to refs/heads/master by this push:
new 38f39d1 self preservation can not replay events (#895)
38f39d1 is described below
commit 38f39d1a315611f4930dea2a00d6906df6f4b8fb
Author: Shawn <xi...@gmail.com>
AuthorDate: Sat Mar 13 16:50:58 2021 +0800
self preservation can not replay events (#895)
---
datasource/etcd/kv/instance_event_handler.go | 89 ++++++++++++++--------------
datasource/etcd/sd/etcd/cacher_kv.go | 1 +
pkg/metrics/common.go | 14 ++++-
server/config/util.go | 3 +
server/metrics/meta.go | 29 +++++----
server/plugin/quota/buildin/buildin_test.go | 32 ++++++++--
server/plugin/quota/quota.go | 12 ++--
7 files changed, 105 insertions(+), 75 deletions(-)
diff --git a/datasource/etcd/kv/instance_event_handler.go b/datasource/etcd/kv/instance_event_handler.go
index 8650e50..d013a09 100644
--- a/datasource/etcd/kv/instance_event_handler.go
+++ b/datasource/etcd/kv/instance_event_handler.go
@@ -30,20 +30,20 @@ import (
)
type deferItem struct {
- ttl int32 // in seconds
- event sd.KvEvent
+ ReplayAfter int32 // in seconds
+ event sd.KvEvent
}
type InstanceEventDeferHandler struct {
Percent float64
- cache sd.CacheReader
- once sync.Once
- enabled bool
- items map[string]*deferItem
- pendingCh chan []sd.KvEvent
- deferCh chan sd.KvEvent
- resetCh chan struct{}
+ cache sd.CacheReader
+ once sync.Once
+ enabled bool
+ items map[string]*deferItem
+ evts chan []sd.KvEvent
+ replayCh chan sd.KvEvent
+ resetCh chan struct{}
}
func (iedh *InstanceEventDeferHandler) OnCondition(cache sd.CacheReader, evts []sd.KvEvent) bool {
@@ -54,19 +54,19 @@ func (iedh *InstanceEventDeferHandler) OnCondition(cache sd.CacheReader, evts []
iedh.once.Do(func() {
iedh.cache = cache
iedh.items = make(map[string]*deferItem)
- iedh.pendingCh = make(chan []sd.KvEvent, eventBlockSize)
- iedh.deferCh = make(chan sd.KvEvent, eventBlockSize)
+ iedh.evts = make(chan []sd.KvEvent, eventBlockSize)
+ iedh.replayCh = make(chan sd.KvEvent, eventBlockSize)
iedh.resetCh = make(chan struct{})
gopool.Go(iedh.check)
})
- iedh.pendingCh <- evts
+ iedh.evts <- evts
return true
}
func (iedh *InstanceEventDeferHandler) recoverOrDefer(evt sd.KvEvent) {
if evt.KV == nil {
- log.Errorf(nil, "defer or recover a %s nil KV", evt.Type)
+ log.Errorf(nil, "defer or replayEvent a %s nil KV", evt.Type)
return
}
kv := evt.KV
@@ -78,7 +78,7 @@ func (iedh *InstanceEventDeferHandler) recoverOrDefer(evt sd.KvEvent) {
log.Infof("recovered key %s events", key)
// return nil // no need to publish event to subscribers?
}
- iedh.recover(evt)
+ iedh.replayEvent(evt)
case discovery.EVT_DELETE:
if ok {
return
@@ -86,7 +86,7 @@ func (iedh *InstanceEventDeferHandler) recoverOrDefer(evt sd.KvEvent) {
instance := kv.Value.(*discovery.MicroServiceInstance)
if instance == nil {
- log.Errorf(nil, "defer or recover a %s nil Value, KV is %v", evt.Type, kv)
+ log.Errorf(nil, "defer or replayEvent a %s nil Value, KV is %v", evt.Type, kv)
return
}
ttl := instance.HealthCheck.Interval * (instance.HealthCheck.Times + 1)
@@ -94,27 +94,26 @@ func (iedh *InstanceEventDeferHandler) recoverOrDefer(evt sd.KvEvent) {
ttl = selfPreservationMaxTTL
}
iedh.items[key] = &deferItem{
- ttl: ttl,
- event: evt,
+ ReplayAfter: ttl,
+ event: evt,
}
}
}
func (iedh *InstanceEventDeferHandler) HandleChan() <-chan sd.KvEvent {
- return iedh.deferCh
+ return iedh.replayCh
}
func (iedh *InstanceEventDeferHandler) check(ctx context.Context) {
defer log.Recover()
-
t, n := time.NewTimer(deferCheckWindow), false
- interval := int32(deferCheckWindow / time.Second)
defer t.Stop()
for {
select {
case <-ctx.Done():
+ log.Error("self preservation routine dead", nil)
return
- case evts := <-iedh.pendingCh:
+ case evts := <-iedh.evts:
for _, evt := range evts {
iedh.recoverOrDefer(evt)
}
@@ -145,45 +144,45 @@ func (iedh *InstanceEventDeferHandler) check(ctx context.Context) {
if !iedh.enabled {
for _, item := range iedh.items {
- iedh.recover(item.event)
+ iedh.replayEvent(item.event)
}
continue
}
- for key, item := range iedh.items {
- item.ttl -= interval
- if item.ttl > 0 {
- continue
- }
- log.Warnf("defer handle timed out, removed key is %s", key)
- iedh.recover(item.event)
- }
- if len(iedh.items) == 0 {
- iedh.renew()
- log.Warnf("self preservation is stopped")
- }
+ iedh.ReplayEvents()
case <-iedh.resetCh:
- iedh.renew()
- log.Warnf("self preservation is reset")
-
+ iedh.ReplayEvents()
+ iedh.enabled = false
util.ResetTimer(t, deferCheckWindow)
}
}
}
-func (iedh *InstanceEventDeferHandler) recover(evt sd.KvEvent) {
- key := util.BytesToStringWithNoCopy(evt.KV.Key)
- delete(iedh.items, key)
- iedh.deferCh <- evt
+func (iedh *InstanceEventDeferHandler) ReplayEvents() {
+ interval := int32(deferCheckWindow / time.Second)
+ for key, item := range iedh.items {
+ item.ReplayAfter -= interval
+ if item.ReplayAfter > 0 {
+ continue
+ }
+ log.Warnf("replay delete event, remove key: %s", key)
+ iedh.replayEvent(item.event)
+ }
+ if len(iedh.items) == 0 {
+ iedh.enabled = false
+ log.Warnf("self preservation stopped")
+ }
}
-func (iedh *InstanceEventDeferHandler) renew() {
- iedh.enabled = false
- iedh.items = make(map[string]*deferItem)
+func (iedh *InstanceEventDeferHandler) replayEvent(evt sd.KvEvent) {
+ key := util.BytesToStringWithNoCopy(evt.KV.Key)
+ delete(iedh.items, key)
+ iedh.replayCh <- evt
}
func (iedh *InstanceEventDeferHandler) Reset() bool {
- if iedh.enabled {
+ if iedh.enabled || len(iedh.items) != 0 {
+ log.Warnf("self preservation is reset")
iedh.resetCh <- struct{}{}
return true
}
diff --git a/datasource/etcd/sd/etcd/cacher_kv.go b/datasource/etcd/sd/etcd/cacher_kv.go
index 9a7fa51..d65dca8 100644
--- a/datasource/etcd/sd/etcd/cacher_kv.go
+++ b/datasource/etcd/sd/etcd/cacher_kv.go
@@ -373,6 +373,7 @@ func (c *KvCacher) handleDeferEvents(ctx context.Context) {
return
case evt, ok := <-c.Cfg.DeferHandler.HandleChan():
if !ok {
+ log.Error("replay channel is closed", nil)
return
}
diff --git a/pkg/metrics/common.go b/pkg/metrics/common.go
index 459e9a4..56b4d47 100644
--- a/pkg/metrics/common.go
+++ b/pkg/metrics/common.go
@@ -23,7 +23,15 @@ import (
dto "github.com/prometheus/client_model/go"
)
-func getValue(name string, labels prometheus.Labels, apply func(m *dto.Metric) float64) float64 {
+// keys of gauge
+const (
+ KeyServiceTotal = "service_total"
+ KeyInstanceTotal = "instance_total"
+
+ SubSystem = "db"
+)
+
+func getValue(name string, labels prometheus.Labels) float64 {
f := Family(name)
if f == nil {
return 0
@@ -34,13 +42,13 @@ func getValue(name string, labels prometheus.Labels, apply func(m *dto.Metric) f
if !matchAll && !MatchLabels(m, labels) {
continue
}
- sum += apply(m)
+ sum += m.GetGauge().GetValue()
}
return sum
}
func GaugeValue(name string, labels prometheus.Labels) int64 {
- return int64(getValue(name, labels, func(m *dto.Metric) float64 { return m.GetGauge().GetValue() }))
+ return int64(getValue(name, labels))
}
func MatchLabels(m *dto.Metric, labels prometheus.Labels) bool {
diff --git a/server/config/util.go b/server/config/util.go
index a5cd29f..cb0f6c0 100644
--- a/server/config/util.go
+++ b/server/config/util.go
@@ -73,6 +73,9 @@ func GetInt(key string, def int, opts ...Option) int {
if archaius.Exist(key) {
return archaius.GetInt(key, def)
}
+ if archaius.Exist(options.Standby) {
+ return archaius.GetInt(options.Standby, def)
+ }
return beego.AppConfig.DefaultInt(options.Standby, def)
}
diff --git a/server/metrics/meta.go b/server/metrics/meta.go
index 4846719..7d17744 100644
--- a/server/metrics/meta.go
+++ b/server/metrics/meta.go
@@ -18,6 +18,7 @@
package metrics
import (
+ "strings"
"time"
"github.com/apache/servicecomb-service-center/pkg/metrics"
@@ -29,24 +30,20 @@ import (
// keys of gauge
const (
KeyDomainTotal = "domain_total"
- KeyServiceTotal = "service_total"
- KeyInstanceTotal = "instance_total"
KeySchemaTotal = "schema_total"
KeyFrameworkTotal = "framework_total"
-
- SubSystem = "db"
)
// Key return metrics key
func Key(name string) string {
- return util.StringJoin([]string{SubSystem, name}, "_")
+ return util.StringJoin([]string{metrics.SubSystem, name}, "_")
}
var (
domainCounter = helper.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: metrics.FamilyName,
- Subsystem: SubSystem,
+ Subsystem: metrics.SubSystem,
Name: KeyDomainTotal,
Help: "Gauge of domain created in Service Center",
}, []string{"instance"})
@@ -55,22 +52,22 @@ var (
prometheus.GaugeOpts{
Namespace: metrics.FamilyName,
Subsystem: "db",
- Name: KeyServiceTotal,
+ Name: metrics.KeyServiceTotal,
Help: "Gauge of microservice created in Service Center",
}, []string{"instance", "framework", "frameworkVersion", "domain"})
instanceCounter = helper.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: metrics.FamilyName,
- Subsystem: SubSystem,
- Name: KeyInstanceTotal,
+ Subsystem: metrics.SubSystem,
+ Name: metrics.KeyInstanceTotal,
Help: "Gauge of microservice created in Service Center",
}, []string{"instance", "domain"})
schemaCounter = helper.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: metrics.FamilyName,
- Subsystem: SubSystem,
+ Subsystem: metrics.SubSystem,
Name: KeySchemaTotal,
Help: "Gauge of schema created in Service Center",
}, []string{"instance", "domain"})
@@ -78,7 +75,7 @@ var (
frameworkCounter = helper.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: metrics.FamilyName,
- Subsystem: SubSystem,
+ Subsystem: metrics.SubSystem,
Name: KeyFrameworkTotal,
Help: "Gauge of client framework info in Service Center",
}, metrics.ToLabelNames(Framework{}))
@@ -126,12 +123,18 @@ func ReportServices(domain, framework, frameworkVersion string, c float64) {
instance := metrics.InstanceName()
serviceCounter.WithLabelValues(instance, framework, frameworkVersion, domain).Add(c)
}
-
+func GetTotalService(domain string) int64 {
+ return metrics.GaugeValue(strings.Join([]string{metrics.SubSystem, metrics.KeyServiceTotal}, "_"), prometheus.Labels{"domain": domain})
+}
func ReportInstances(domain string, c float64) {
instance := metrics.InstanceName()
instanceCounter.WithLabelValues(instance, domain).Add(c)
}
-
+func GetTotalInstance(domain string) int64 {
+ mn := strings.Join([]string{metrics.SubSystem, metrics.KeyInstanceTotal}, "_")
+ usage := metrics.GaugeValue(mn, prometheus.Labels{"domain": domain})
+ return usage
+}
func ReportSchemas(domain string, c float64) {
instance := metrics.InstanceName()
schemaCounter.WithLabelValues(instance, domain).Add(c)
diff --git a/server/plugin/quota/buildin/buildin_test.go b/server/plugin/quota/buildin/buildin_test.go
index bc69e26..799aaa1 100644
--- a/server/plugin/quota/buildin/buildin_test.go
+++ b/server/plugin/quota/buildin/buildin_test.go
@@ -17,6 +17,7 @@ package buildin_test
import (
"context"
+ "github.com/apache/servicecomb-service-center/pkg/util"
"github.com/apache/servicecomb-service-center/datasource"
_ "github.com/apache/servicecomb-service-center/server/init"
@@ -40,13 +41,15 @@ func init() {
}
func TestGetResourceLimit(t *testing.T) {
//var id string
+ ctx := context.TODO()
+ ctx = util.SetDomainProject(ctx, "quota", "quota")
t.Run("create service,should success", func(t *testing.T) {
- res := quota.NewApplyQuotaResource(quota.TypeService, "default/default", "", 1)
- err := quota.Apply(context.TODO(), res)
+ res := quota.NewApplyQuotaResource(quota.TypeService, "quota/quota", "", 1)
+ err := quota.Apply(ctx, res)
assert.Nil(t, err)
})
- t.Run("create instance,should success", func(t *testing.T) {
- resp, err := datasource.Instance().RegisterService(context.TODO(), &pb.CreateServiceRequest{
+ t.Run("create 1 instance,should success", func(t *testing.T) {
+ resp, err := datasource.Instance().RegisterService(ctx, &pb.CreateServiceRequest{
Service: &pb.MicroService{
ServiceName: "quota",
},
@@ -54,9 +57,26 @@ func TestGetResourceLimit(t *testing.T) {
assert.NoError(t, err)
assert.Equal(t, pb.ResponseSuccess, resp.Response.GetCode())
- res := quota.NewApplyQuotaResource(quota.TypeInstance, "default/default", resp.ServiceId, 1)
- err = quota.Apply(context.TODO(), res)
+ res := quota.NewApplyQuotaResource(quota.TypeInstance, "quota/quota", resp.ServiceId, 1)
+ err = quota.Apply(ctx, res)
assert.Nil(t, err)
+
+ res = quota.NewApplyQuotaResource(quota.TypeInstance, "quota/quota", resp.ServiceId, 150001)
+ err = quota.Apply(ctx, res)
+ assert.NotNil(t, err)
+ })
+ t.Run("create 150001 instance,should failed", func(t *testing.T) {
+ resp, err := datasource.Instance().RegisterService(ctx, &pb.CreateServiceRequest{
+ Service: &pb.MicroService{
+ ServiceName: "quota2",
+ },
+ })
+ assert.NoError(t, err)
+ assert.Equal(t, pb.ResponseSuccess, resp.Response.GetCode())
+
+ res := quota.NewApplyQuotaResource(quota.TypeInstance, "quota/quota", resp.ServiceId, 150001)
+ err = quota.Apply(ctx, res)
+ assert.NotNil(t, err)
})
}
diff --git a/server/plugin/quota/quota.go b/server/plugin/quota/quota.go
index 3460045..51210d4 100644
--- a/server/plugin/quota/quota.go
+++ b/server/plugin/quota/quota.go
@@ -23,9 +23,8 @@ import (
"fmt"
"github.com/apache/servicecomb-service-center/datasource"
"github.com/apache/servicecomb-service-center/pkg/log"
- "github.com/apache/servicecomb-service-center/pkg/metrics"
"github.com/apache/servicecomb-service-center/pkg/util"
- "github.com/prometheus/client_golang/prometheus"
+ "github.com/apache/servicecomb-service-center/server/metrics"
"strconv"
"github.com/apache/servicecomb-service-center/server/config"
@@ -50,10 +49,6 @@ const (
TypeService
TypeInstance
)
-const (
- TotalService = "db_service_total"
- TotalInstance = "db_instance_total"
-)
var (
DefaultServiceQuota = defaultServiceLimit
@@ -141,9 +136,10 @@ func GetResourceUsage(ctx context.Context, res *ApplyQuotaResource) (int64, erro
serviceID := res.ServiceID
switch res.QuotaType {
case TypeService:
- return metrics.GaugeValue(TotalService, prometheus.Labels{"domain": util.ParseDomain(ctx)}), nil
+ return metrics.GetTotalService(util.ParseDomain(ctx)), nil
case TypeInstance:
- return metrics.GaugeValue(TotalInstance, prometheus.Labels{"domain": util.ParseDomain(ctx)}), nil
+ usage := metrics.GetTotalInstance(util.ParseDomain(ctx))
+ return usage, nil
case TypeRule:
{
resp, err := datasource.Instance().GetRules(ctx, &pb.GetServiceRulesRequest{