You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicecomb.apache.org by ti...@apache.org on 2021/12/28 11:40:33 UTC
[servicecomb-service-center] branch master updated: Feature: add schema retire cron job (#1187)
This is an automated email from the ASF dual-hosted git repository.
tianxiaoliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/servicecomb-service-center.git
The following commit(s) were added to refs/heads/master by this push:
new a1387cc Feature: add schema retire cron job (#1187)
a1387cc is described below
commit a1387cc7a53d010f69437e0cc864761f6681406f
Author: little-cui <su...@qq.com>
AuthorDate: Tue Dec 28 19:40:25 2021 +0800
Feature: add schema retire cron job (#1187)
---
datasource/etcd/path/key_convertor.go | 89 +++++++++-------------
datasource/etcd/path/key_convertor_test.go | 8 +-
datasource/etcd/path/key_generator.go | 5 --
datasource/etcd/retire.go | 88 +--------------------
datasource/etcd/schema.go | 54 ++++++++++++-
datasource/mongo/retire.go | 2 +-
datasource/mongo/schema.go | 8 +-
datasource/retire.go | 11 +--
datasource/schema/schema.go | 4 +-
etc/conf/app.yaml | 4 +-
go.mod | 1 +
go.sum | 2 +
integration/instances_test.go | 24 +++---
server/job/disco/retire.go | 44 +++++------
.../disco/retire.go => job/disco/schema.go} | 28 ++++++-
server/service/disco/retire.go | 15 ++++
server/service/disco/retire_test.go | 55 ++++++++++++-
server/service/disco/schema_test.go | 28 +++++--
18 files changed, 255 insertions(+), 215 deletions(-)
diff --git a/datasource/etcd/path/key_convertor.go b/datasource/etcd/path/key_convertor.go
index 7c93604..7e0ee8c 100644
--- a/datasource/etcd/path/key_convertor.go
+++ b/datasource/etcd/path/key_convertor.go
@@ -23,39 +23,45 @@ import (
"github.com/apache/servicecomb-service-center/pkg/util"
"github.com/go-chassis/cari/discovery"
+ "github.com/go-chassis/foundation/stringutil"
)
-func ToResponse(key []byte) (keys []string) {
+func splitKey(key []byte) (keys []string) {
return strings.Split(util.BytesToStringWithNoCopy(key), SPLIT)
}
-func GetInfoFromSvcKV(key []byte) (serviceID, domainProject string) {
- keys := ToResponse(key)
+func getLast2Keys(key []byte) (string, string) {
+ keys := splitKey(key)
l := len(keys)
- if l < 4 {
- return
+ if l < 3 {
+ return "", ""
}
- serviceID = keys[l-1]
- domainProject = fmt.Sprintf("%s/%s", keys[l-3], keys[l-2])
- return
+ return fmt.Sprintf("%s/%s", keys[l-3], keys[l-2]), keys[l-1]
}
-func GetInfoFromInstKV(key []byte) (serviceID, instanceID, domainProject string) {
- keys := ToResponse(key)
+func getLast3Keys(key []byte) (string, string, string) {
+ keys := splitKey(key)
l := len(keys)
if l < 4 {
- return
+ return "", "", ""
}
- serviceID = keys[l-2]
- instanceID = keys[l-1]
- domainProject = fmt.Sprintf("%s/%s", keys[l-4], keys[l-3])
+ return fmt.Sprintf("%s/%s", keys[l-4], keys[l-3]), keys[l-2], keys[l-1]
+}
+
+func GetInfoFromSvcKV(key []byte) (serviceID, domainProject string) {
+ domainProject, serviceID = getLast2Keys(key)
+ return
+}
+
+func GetInfoFromInstKV(key []byte) (serviceID, instanceID, domainProject string) {
+ domainProject, serviceID, instanceID = getLast3Keys(key)
return
}
func GetInfoFromDomainKV(key []byte) (domain string) {
- keys := ToResponse(key)
+ keys := splitKey(key)
l := len(keys)
- if l < 2 {
+ if l < 1 {
return
}
domain = keys[l-1]
@@ -63,27 +69,21 @@ func GetInfoFromDomainKV(key []byte) (domain string) {
}
func GetInfoFromProjectKV(key []byte) (domain, project string) {
- keys := ToResponse(key)
+ keys := splitKey(key)
l := len(keys)
if l < 2 {
- return
+ return "", ""
}
return keys[l-2], keys[l-1]
}
func GetInfoFromTagKV(key []byte) (serviceID, domainProject string) {
- keys := ToResponse(key)
- l := len(keys)
- if l < 3 {
- return
- }
- serviceID = keys[l-1]
- domainProject = fmt.Sprintf("%s/%s", keys[l-3], keys[l-2])
+ domainProject, serviceID = getLast2Keys(key)
return
}
func GetInfoFromSvcIndexKV(key []byte) *discovery.MicroServiceKey {
- keys := ToResponse(key)
+ keys := splitKey(key)
l := len(keys)
if l < 6 {
return nil
@@ -103,43 +103,28 @@ func GetInfoFromSvcAliasKV(key []byte) *discovery.MicroServiceKey {
}
func GetInfoFromSchemaRefKV(key []byte) (domainProject, serviceID, schemaID string) {
- return GetInfoFromSchemaSummaryKV(key)
+ return getLast3Keys(key)
}
func GetInfoFromSchemaSummaryKV(key []byte) (domainProject, serviceID, schemaID string) {
- keys := ToResponse(key)
- l := len(keys)
- if l < 4 {
- return
- }
- domainProject = fmt.Sprintf("%s/%s", keys[l-4], keys[l-3])
- return domainProject, keys[l-2], keys[l-1]
+ return getLast3Keys(key)
}
func GetInfoFromSchemaKV(key []byte) (domainProject, serviceID, schemaID string) {
- keys := ToResponse(key)
- l := len(keys)
- if l < 4 {
- return
- }
- domainProject = fmt.Sprintf("%s/%s", keys[l-4], keys[l-3])
- return domainProject, keys[l-2], keys[l-1]
+ return getLast3Keys(key)
+}
+
+func GetInfoFromSchemaContentKV(key []byte) (domainProject, hash string) {
+ return getLast2Keys(key)
}
func GetInfoFromDependencyQueueKV(key []byte) (consumerID, domainProject, uuid string) {
- keys := ToResponse(key)
- l := len(keys)
- if l < 4 {
- return
- }
- consumerID = keys[l-2]
- domainProject = fmt.Sprintf("%s/%s", keys[l-4], keys[l-3])
- uuid = keys[l-1]
+ domainProject, consumerID, uuid = getLast3Keys(key)
return
}
func GetInfoFromDependencyRuleKV(key []byte) (t string, _ *discovery.MicroServiceKey) {
- keys := ToResponse(key)
+ keys := splitKey(key)
l := len(keys)
if l < 5 {
return "", nil
@@ -162,7 +147,5 @@ func GetInfoFromDependencyRuleKV(key []byte) (t string, _ *discovery.MicroServic
}
func SplitDomainProject(domainProject string) (string, string) {
- domain := domainProject[:strings.Index(domainProject, SPLIT)]
- project := domainProject[strings.Index(domainProject, SPLIT)+1:]
- return domain, project
+ return stringutil.SplitToTwo(domainProject, SPLIT)
}
diff --git a/datasource/etcd/path/key_convertor_test.go b/datasource/etcd/path/key_convertor_test.go
index add76d3..dc09366 100644
--- a/datasource/etcd/path/key_convertor_test.go
+++ b/datasource/etcd/path/key_convertor_test.go
@@ -43,7 +43,7 @@ func TestGetInfoFromKV(t *testing.T) {
assert.False(t, d != "a")
d = path.GetInfoFromDomainKV([]byte("sdf"))
- assert.False(t, d != "")
+ assert.False(t, d != "sdf")
p := ""
d, p = path.GetInfoFromProjectKV([]byte(path.GenerateProjectKey("a", "b")))
@@ -88,6 +88,12 @@ func TestGetInfoFromKV(t *testing.T) {
d, s, m = path.GetInfoFromSchemaKV([]byte("sdf"))
assert.False(t, m != "" || s != "" || d != "")
+ d, h := path.GetInfoFromSchemaContentKV([]byte(path.GenerateServiceSchemaContentKey("a/b", "c")))
+ assert.False(t, h != "c" || d != "a/b")
+
+ d, h = path.GetInfoFromSchemaContentKV([]byte("sdf"))
+ assert.False(t, h != "" || d != "")
+
u := ""
s, d, u = path.GetInfoFromDependencyQueueKV([]byte(path.GenerateConsumerDependencyQueueKey("a/b", "c", "d")))
assert.False(t, s != "c" || d != "a/b" || u != "d")
diff --git a/datasource/etcd/path/key_generator.go b/datasource/etcd/path/key_generator.go
index 31d4a7f..59137d6 100644
--- a/datasource/etcd/path/key_generator.go
+++ b/datasource/etcd/path/key_generator.go
@@ -46,7 +46,6 @@ const (
DepsQueueUUID = "0"
DepsConsumer = "c"
DepsProvider = "p"
- RegistryRetirePlan = "retire-plan"
)
func GetRootKey() string {
@@ -384,7 +383,3 @@ func GenerateMetricsKey(name, utc, domain string) string {
domain,
}, SPLIT)
}
-
-func GenerateRetirePlanKey() string {
- return util.StringJoin([]string{GetRootKey(), RegistryRetirePlan}, SPLIT)
-}
diff --git a/datasource/etcd/retire.go b/datasource/etcd/retire.go
index 3087be6..54d4d67 100644
--- a/datasource/etcd/retire.go
+++ b/datasource/etcd/retire.go
@@ -19,19 +19,14 @@ package etcd
import (
"context"
- "encoding/json"
- "errors"
"fmt"
"sync/atomic"
- "time"
"github.com/apache/servicecomb-service-center/datasource"
- "github.com/apache/servicecomb-service-center/datasource/etcd/mux"
"github.com/apache/servicecomb-service-center/datasource/etcd/path"
"github.com/apache/servicecomb-service-center/datasource/etcd/sd"
"github.com/apache/servicecomb-service-center/datasource/etcd/state/kvstore"
serviceUtil "github.com/apache/servicecomb-service-center/datasource/etcd/util"
- "github.com/apache/servicecomb-service-center/pkg/etcdsync"
"github.com/apache/servicecomb-service-center/pkg/goutil"
"github.com/apache/servicecomb-service-center/pkg/log"
"github.com/apache/servicecomb-service-center/pkg/util"
@@ -40,30 +35,14 @@ import (
"github.com/little-cui/etcdadpt"
)
-const (
- poolSizeOfRotation = 5
- retirementLockID mux.ID = "/cse-sr/lock/retirement"
-)
-
-var ErrAlreadyRetire = errors.New("already retired by other SC")
+const poolSizeOfRotation = 5
type RotateServiceIDKey struct {
DomainProject string
ServiceID string
}
-func (ds *MetadataManager) RetireService(ctx context.Context, localPlan *datasource.RetirePlan) error {
- lock, err := getRetirementLock()
- if err != nil {
- return err
- }
- defer releaseRetirementLock(lock)
-
- plan, err := ds.getRetirePlan(ctx, localPlan)
- if err != nil || !plan.ShouldRetire() {
- return err
- }
-
+func (ds *MetadataManager) RetireService(ctx context.Context, plan *datasource.RetirePlan) error {
key := path.GetServiceIndexRootKey("")
indexesResp, err := sd.ServiceIndex().Search(ctx, etcdadpt.WithStrKey(key), etcdadpt.WithPrefix())
if err != nil {
@@ -84,41 +63,7 @@ func (ds *MetadataManager) RetireService(ctx context.Context, localPlan *datasou
if n > 0 {
log.Warn(fmt.Sprintf("%d microservices retired", n))
}
-
- plan.LastRunAt = time.Now().Unix()
- return ds.UpsertRetirePlan(ctx, plan)
-}
-
-func releaseRetirementLock(lock *etcdsync.DLock) {
- err := lock.Unlock()
- if err != nil {
- log.Error("", err)
- }
-}
-
-func getRetirementLock() (*etcdsync.DLock, error) {
- lock, err := mux.Try(retirementLockID)
- if err != nil {
- return nil, err
- }
- if lock == nil {
- return nil, ErrAlreadyRetire
- }
- return lock, nil
-}
-
-func (ds *MetadataManager) getRetirePlan(ctx context.Context, localPlan *datasource.RetirePlan) (*datasource.RetirePlan, error) {
- plan, err := ds.GetRetirePlan(ctx)
- if err != nil {
- return nil, err
- }
- if plan == nil {
- plan = localPlan
- } else {
- plan.Interval = localPlan.Interval
- plan.Reserve = localPlan.Reserve
- }
- return plan, nil
+ return nil
}
func FilterNoInstance(ctx context.Context, serviceIDKeys []*RotateServiceIDKey) []*RotateServiceIDKey {
@@ -198,30 +143,3 @@ func GetRetireServiceIDs(indexesResp *kvstore.Response, reserveVersionCount int)
}
return serviceIDs
}
-
-func (ds *MetadataManager) GetRetirePlan(ctx context.Context) (*datasource.RetirePlan, error) {
- kv, err := etcdadpt.Get(ctx, path.GenerateRetirePlanKey())
- if err != nil {
- log.Error("", err)
- return nil, err
- }
- if kv == nil {
- return nil, nil
- }
- var plan datasource.RetirePlan
- err = json.Unmarshal(kv.Value, &plan)
- if err != nil {
- log.Error("decode retire plan failed", err)
- return nil, err
- }
- return &plan, nil
-}
-
-func (ds *MetadataManager) UpsertRetirePlan(ctx context.Context, plan *datasource.RetirePlan) error {
- bytes, err := json.Marshal(plan)
- if err != nil {
- log.Error("encode retire plan failed", err)
- return err
- }
- return etcdadpt.PutBytes(ctx, path.GenerateRetirePlanKey(), bytes)
-}
diff --git a/datasource/etcd/schema.go b/datasource/etcd/schema.go
index afb1057..892cb8b 100644
--- a/datasource/etcd/schema.go
+++ b/datasource/etcd/schema.go
@@ -32,6 +32,7 @@ import (
mapset "github.com/deckarep/golang-set"
"github.com/go-chassis/cari/discovery"
"github.com/little-cui/etcdadpt"
+ "go.etcd.io/etcd/api/v3/mvccpb"
)
func init() {
@@ -352,10 +353,55 @@ func getContentHashMap(ctx context.Context) (map[string]struct{}, error) {
return refMap, nil
}
-func (dao *SchemaDAO) ListHash(ctx context.Context) ([]*schema.Content, error) {
- panic("implement me")
+func (dao *SchemaDAO) DeleteNoRefContents(ctx context.Context) (int, error) {
+ contentPrefixKey := path.GetServiceSchemaContentRootKey("")
+ kvs, _, err := etcdadpt.List(ctx, contentPrefixKey, etcdadpt.WithKeyOnly())
+ if err != nil {
+ log.Error("list contents failed", err)
+ return 0, err
+ }
+ if len(kvs) == 0 {
+ return 0, nil
+ }
+
+ set, err := filterNoRefContentHashes(ctx, kvs)
+ if err != nil {
+ log.Error("filter no ref content hashes failed", err)
+ return 0, err
+ }
+ if set.Cardinality() == 0 {
+ return 0, nil
+ }
+
+ var ops []etcdadpt.OpOptions
+ for item := range set.Iter() {
+ ops = append(ops, etcdadpt.OpDel(etcdadpt.WithStrKey(contentPrefixKey+item.(string))))
+ }
+ err = etcdadpt.Txn(ctx, ops)
+ if err != nil {
+ log.Error("txn delete failed", err)
+ return 0, err
+ }
+ return len(ops), nil
}
-func (dao *SchemaDAO) ExistRef(ctx context.Context, hash *schema.ContentRequest) (*schema.Ref, error) {
- panic("implement me")
+func filterNoRefContentHashes(ctx context.Context, kvs []*mvccpb.KeyValue) (mapset.Set, error) {
+ set := mapset.NewThreadUnsafeSet()
+ for _, kv := range kvs {
+ domainProject, hash := path.GetInfoFromSchemaContentKV(kv.Key)
+ set.Add(domainProject + path.SPLIT + hash)
+ }
+
+ refPrefixKey := path.GetServiceSchemaRefRootKey("")
+ resp, err := sd.SchemaRef().Search(ctx, serviceUtil.ContextOptions(ctx,
+ etcdadpt.WithStrKey(refPrefixKey), etcdadpt.WithPrefix())...)
+ if err != nil {
+ return nil, err
+ }
+
+ for _, kv := range resp.Kvs {
+ domainProject, _, _ := path.GetInfoFromSchemaRefKV(kv.Key)
+ set.Remove(domainProject + path.SPLIT + kv.Value.(string))
+ }
+ return set, nil
}
diff --git a/datasource/mongo/retire.go b/datasource/mongo/retire.go
index 0ffc9a9..88e76e4 100644
--- a/datasource/mongo/retire.go
+++ b/datasource/mongo/retire.go
@@ -24,5 +24,5 @@ import (
)
func (ds *MetadataManager) RetireService(ctx context.Context, plan *datasource.RetirePlan) error {
- panic("implement me")
+ return nil
}
diff --git a/datasource/mongo/schema.go b/datasource/mongo/schema.go
index 6d45f96..85f16e3 100644
--- a/datasource/mongo/schema.go
+++ b/datasource/mongo/schema.go
@@ -88,10 +88,6 @@ func (s *SchemaDAO) DeleteContent(ctx context.Context, contentRequest *schema.Co
return schema.ErrSchemaContentNotFound
}
-func (s *SchemaDAO) ListHash(ctx context.Context) ([]*schema.Content, error) {
- panic("implement me")
-}
-
-func (s *SchemaDAO) ExistRef(ctx context.Context, contentRequest *schema.ContentRequest) (*schema.Ref, error) {
- panic("implement me")
+func (s *SchemaDAO) DeleteNoRefContents(ctx context.Context) (int, error) {
+ return 0, nil
}
diff --git a/datasource/retire.go b/datasource/retire.go
index 47f0d2d..5585680 100644
--- a/datasource/retire.go
+++ b/datasource/retire.go
@@ -17,14 +17,7 @@
package datasource
-import "time"
-
type RetirePlan struct {
- Interval time.Duration `json:"interval,omitempty"`
- Reserve int `json:"reserve,omitempty"`
- LastRunAt int64 `json:"lastRunAt,omitempty" bson:"last_run_at"`
-}
-
-func (r *RetirePlan) ShouldRetire() bool {
- return time.Now().Add(-r.Interval).Unix() >= r.LastRunAt
+ Cron string `json:"cron,omitempty"`
+ Reserve int `json:"reserve,omitempty"`
}
diff --git a/datasource/schema/schema.go b/datasource/schema/schema.go
index 6e4e463..b005569 100644
--- a/datasource/schema/schema.go
+++ b/datasource/schema/schema.go
@@ -82,9 +82,7 @@ type DAO interface {
PutContent(ctx context.Context, contentRequest *PutContentRequest) error
PutManyContent(ctx context.Context, contentRequest *PutManyContentRequest) error
DeleteContent(ctx context.Context, contentRequest *ContentRequest) error
- // ListHash return Content list without content
- ListHash(ctx context.Context) ([]*Content, error)
- ExistRef(ctx context.Context, contentRequest *ContentRequest) (*Ref, error)
+ DeleteNoRefContents(ctx context.Context) (int, error)
}
func Hash(schemaID, content string) string {
diff --git a/etc/conf/app.yaml b/etc/conf/app.yaml
index 8d190e9..2848946 100644
--- a/etc/conf/app.yaml
+++ b/etc/conf/app.yaml
@@ -151,7 +151,7 @@ registry:
# delete other versions which doesn't register any instances.
retire:
disable: false
- interval: 12h
+ cron: '0 1 * * *'
reserve: 3
instance:
# By default, instance TTL = (times + 1) * interval
@@ -168,7 +168,7 @@ registry:
notEditable: false
# remove the schema without refs every 7d
retire:
- interval: 7d
+ cron: '0 2 * * *'
# enable to register sc itself when startup
selfRegister: 1
diff --git a/go.mod b/go.mod
index 5055d94..63b01a4 100644
--- a/go.mod
+++ b/go.mod
@@ -6,6 +6,7 @@ replace (
)
require (
+ github.com/robfig/cron/v3 v3.0.1
github.com/NYTimes/gziphandler v1.1.1
github.com/apache/servicecomb-service-center/api v0.0.0
github.com/astaxie/beego v1.12.2
diff --git a/go.sum b/go.sum
index 8a38c15..7f895cf 100644
--- a/go.sum
+++ b/go.sum
@@ -563,6 +563,8 @@ github.com/prometheus/procfs v0.6.0/go.mod h1:cz+aTbrPOrUb4q7XlbU9ygM+/jj0fzG6c1
github.com/prometheus/tsdb v0.7.1/go.mod h1:qhTCs0VvXwvX/y3TZrWD7rabWM+ijKTux40TwIPHuXU=
github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a h1:9ZKAASQSHhDYGoxY8uLVpewe1GDZ2vu2Tr/vTdVAkFQ=
github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4=
+github.com/robfig/cron/v3 v3.0.1 h1:WdRxkvbJztn8LMz/QEvLN5sBU+xKpSqwwUO1Pjr4qDs=
+github.com/robfig/cron/v3 v3.0.1/go.mod h1:eQICP3HwyT7UooqI/z+Ov+PtYAWygg1TEWWzGIFLtro=
github.com/rogpeppe/fastuuid v0.0.0-20150106093220-6724a57986af/go.mod h1:XWv6SoW27p1b0cqNHllgS5HIMJraePCO15w5zCzIWYg=
github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6LYCDYWNEvQ=
github.com/rogpeppe/go-internal v1.1.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4=
diff --git a/integration/instances_test.go b/integration/instances_test.go
index f5ac910..457931b 100644
--- a/integration/instances_test.go
+++ b/integration/instances_test.go
@@ -17,28 +17,26 @@
package integrationtest_test
import (
+ "bytes"
"encoding/json"
"fmt"
+ "io/ioutil"
+ "math/rand"
"net/http"
+ "strconv"
"strings"
"sync"
-
- "github.com/go-chassis/cari/discovery"
- "github.com/gorilla/websocket"
- "github.com/stretchr/testify/assert"
+ "testing"
+ "time"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
- "github.com/widuu/gojson"
-
- "bytes"
- "io/ioutil"
- "math/rand"
- "strconv"
- "testing"
- "time"
. "github.com/apache/servicecomb-service-center/integration"
+ "github.com/go-chassis/cari/discovery"
+ "github.com/gorilla/websocket"
+ "github.com/stretchr/testify/assert"
+ "github.com/widuu/gojson"
)
var _ = Describe("MicroService Api Test", func() {
@@ -395,7 +393,7 @@ var _ = Describe("MicroService Api Test", func() {
})
It("Find Micro-service Info by alias", func() {
- req, _ := http.NewRequest(GET, SCURL+FINDINSTANCE+"?appId="+serviceAppId+"&serviceName="+alias+"&version="+serviceVersion, nil)
+ req, _ := http.NewRequest(GET, SCURL+FINDINSTANCE+"?noCache=true&appId="+serviceAppId+"&serviceName="+alias+"&version="+serviceVersion, nil)
req.Header.Set("X-Domain-Name", "default")
req.Header.Set("X-ConsumerId", consumerID)
resp, _ := scclient.Do(req)
diff --git a/server/job/disco/retire.go b/server/job/disco/retire.go
index 55e4fdf..fcc8f6e 100644
--- a/server/job/disco/retire.go
+++ b/server/job/disco/retire.go
@@ -20,49 +20,45 @@ package disco
import (
"context"
"fmt"
- "time"
"github.com/apache/servicecomb-service-center/datasource"
"github.com/apache/servicecomb-service-center/pkg/log"
"github.com/apache/servicecomb-service-center/server/config"
discosvc "github.com/apache/servicecomb-service-center/server/service/disco"
- "github.com/go-chassis/foundation/gopool"
+ "github.com/robfig/cron/v3"
)
const (
- defaultRetireMicroserviceInterval = 12 * time.Hour
- defaultReserveVersionCount = 3
+ defaultRetireMicroserviceCron = "0 1 * * *"
+ defaultReserveVersionCount = 3
)
func init() {
- startRetireServiceJob()
-}
-
-func startRetireServiceJob() {
disable := config.GetBool("registry.service.retire.disable", false)
if disable {
return
}
+ startRetireServiceJob()
+}
+func startRetireServiceJob() {
localPlan := &datasource.RetirePlan{
- Interval: config.GetDuration("registry.service.retire.interval", defaultRetireMicroserviceInterval),
- Reserve: config.GetInt("registry.service.retire.reserve", defaultReserveVersionCount),
+ Cron: config.GetString("registry.service.retire.cron", defaultRetireMicroserviceCron),
+ Reserve: config.GetInt("registry.service.retire.reserve", defaultReserveVersionCount),
}
-
log.Info(fmt.Sprintf("start retire microservice job, plan is %v", localPlan))
- gopool.Go(func(ctx context.Context) {
- tick := time.NewTicker(localPlan.Interval)
- defer tick.Stop()
- for {
- select {
- case <-ctx.Done():
- return
- case <-tick.C:
- err := discosvc.RetireService(ctx, localPlan)
- if err != nil {
- log.Error("retire microservice failed", err)
- }
- }
+
+ c := cron.New()
+ _, err := c.AddFunc(localPlan.Cron, func() {
+ //TODO use DLock
+ err := discosvc.RetireService(context.Background(), localPlan)
+ if err != nil {
+ log.Error("retire microservice failed", err)
}
})
+ if err != nil {
+ log.Error("cron add func failed", err)
+ return
+ }
+ c.Start()
}
diff --git a/server/service/disco/retire.go b/server/job/disco/schema.go
similarity index 53%
copy from server/service/disco/retire.go
copy to server/job/disco/schema.go
index 9b64b26..9ba782f 100644
--- a/server/service/disco/retire.go
+++ b/server/job/disco/schema.go
@@ -19,10 +19,32 @@ package disco
import (
"context"
+ "fmt"
- "github.com/apache/servicecomb-service-center/datasource"
+ "github.com/apache/servicecomb-service-center/pkg/log"
+ "github.com/apache/servicecomb-service-center/server/config"
+ discosvc "github.com/apache/servicecomb-service-center/server/service/disco"
+ "github.com/robfig/cron/v3"
)
-func RetireService(ctx context.Context, plan *datasource.RetirePlan) error {
- return datasource.GetMetadataManager().RetireService(ctx, plan)
+const (
+ defaultRetireSchemaCron = "0 2 * * *"
+)
+
+func init() {
+ cronExpr := config.GetString("registry.schema.retire.cron", defaultRetireSchemaCron)
+ log.Info(fmt.Sprintf("start retire schema job, plan is %v", cronExpr))
+ c := cron.New()
+ _, err := c.AddFunc(cronExpr, func() {
+ //TODO use DLock
+ err := discosvc.RetireSchema(context.Background())
+ if err != nil {
+ log.Error("retire schema failed", err)
+ }
+ })
+ if err != nil {
+ log.Error("cron add func failed", err)
+ return
+ }
+ c.Start()
}
diff --git a/server/service/disco/retire.go b/server/service/disco/retire.go
index 9b64b26..0d17d15 100644
--- a/server/service/disco/retire.go
+++ b/server/service/disco/retire.go
@@ -19,10 +19,25 @@ package disco
import (
"context"
+ "fmt"
"github.com/apache/servicecomb-service-center/datasource"
+ "github.com/apache/servicecomb-service-center/datasource/schema"
+ "github.com/apache/servicecomb-service-center/pkg/log"
)
func RetireService(ctx context.Context, plan *datasource.RetirePlan) error {
return datasource.GetMetadataManager().RetireService(ctx, plan)
}
+
+func RetireSchema(ctx context.Context) error {
+ n, err := schema.Instance().DeleteNoRefContents(ctx)
+ if err != nil {
+ log.Error("delete no ref contents failed", err)
+ return err
+ }
+ if n > 0 {
+ log.Warn(fmt.Sprintf("%d schema-contents retired", n))
+ }
+ return nil
+}
diff --git a/server/service/disco/retire_test.go b/server/service/disco/retire_test.go
index ca778c9..c41fbc6 100644
--- a/server/service/disco/retire_test.go
+++ b/server/service/disco/retire_test.go
@@ -22,6 +22,7 @@ import (
"testing"
"github.com/apache/servicecomb-service-center/datasource"
+ "github.com/apache/servicecomb-service-center/datasource/schema"
discosvc "github.com/apache/servicecomb-service-center/server/service/disco"
"github.com/apache/servicecomb-service-center/test"
pb "github.com/go-chassis/cari/discovery"
@@ -54,11 +55,12 @@ func TestRetireService(t *testing.T) {
idx := fmt.Sprintf("%d", i)
discosvc.UnregisterService(ctx, &pb.DeleteServiceRequest{
ServiceId: serviceIDPrefix + idx,
+ Force: true,
})
}
}()
- err := discosvc.RetireService(ctx, &datasource.RetirePlan{Interval: 0, Reserve: 1})
+ err := discosvc.RetireService(ctx, &datasource.RetirePlan{Reserve: 1})
assert.NoError(t, err)
resp, err := datasource.GetMetadataManager().ListServiceDetail(ctx, &pb.GetServicesInfoRequest{
@@ -69,3 +71,54 @@ func TestRetireService(t *testing.T) {
assert.Equal(t, serviceIDPrefix+"4", resp.AllServicesDetail[0].MicroService.ServiceId)
})
}
+
+func TestRetireSchema(t *testing.T) {
+ if !test.IsETCD() {
+ return
+ }
+
+ var serviceID string
+
+ ctx := getContext()
+ service, err := discosvc.RegisterService(ctx, &pb.CreateServiceRequest{
+ Service: &pb.MicroService{
+ ServiceName: "TestRetireSchema",
+ },
+ })
+ assert.NoError(t, err)
+ serviceID = service.ServiceId
+ schemaID := "schemaID_1"
+ content := "content_1"
+ hash := schema.Hash(schemaID, content)
+ defer schema.Instance().DeleteContent(ctx, &schema.ContentRequest{Hash: hash})
+ defer discosvc.UnregisterService(ctx, &pb.DeleteServiceRequest{ServiceId: serviceID, Force: true})
+
+ t.Run("retire schema with ref, should not delete it", func(t *testing.T) {
+ err = discosvc.PutSchema(ctx, &pb.ModifySchemaRequest{
+ ServiceId: serviceID,
+ SchemaId: schemaID,
+ Schema: content,
+ })
+ assert.NoError(t, err)
+
+ err := discosvc.RetireSchema(ctx)
+ assert.NoError(t, err)
+
+ _, err = discosvc.GetSchema(ctx, &pb.GetSchemaRequest{ServiceId: serviceID, SchemaId: schemaID})
+ assert.NoError(t, err)
+ })
+
+ t.Run("retire schema without ref, should delete it", func(t *testing.T) {
+ err := discosvc.DeleteSchema(ctx, &pb.DeleteSchemaRequest{
+ ServiceId: serviceID,
+ SchemaId: schemaID,
+ })
+ assert.NoError(t, err)
+
+ err = discosvc.RetireSchema(ctx)
+ assert.NoError(t, err)
+
+ _, err = schema.Instance().GetContent(ctx, &schema.ContentRequest{Hash: hash})
+ assert.ErrorIs(t, schema.ErrSchemaContentNotFound, err)
+ })
+}
diff --git a/server/service/disco/schema_test.go b/server/service/disco/schema_test.go
index 6930dc4..a77764c 100644
--- a/server/service/disco/schema_test.go
+++ b/server/service/disco/schema_test.go
@@ -1014,7 +1014,8 @@ func TestCompatibleOperateSchema(t *testing.T) {
})
assert.NoError(t, err)
assert.Equal(t, 2, len(schemas))
- schema = schemas[1]
+ schema = findSchemaBySchemaID(schemas, "schemaID_2")
+ assert.NotNil(t, schema)
assert.Equal(t, "schema_2", schema.Schema)
assert.Equal(t, "summary2", schema.Summary)
})
@@ -1040,10 +1041,14 @@ func TestCompatibleOperateSchema(t *testing.T) {
})
assert.NoError(t, err)
assert.Equal(t, 2, len(schemas))
- schema = schemas[0]
+
+ schema = findSchemaBySchemaID(schemas, "schemaID_1")
+ assert.NotNil(t, schema)
assert.Empty(t, schema.Schema)
assert.Empty(t, schema.Summary)
- schema = schemas[1]
+
+ schema = findSchemaBySchemaID(schemas, "schemaID_2")
+ assert.NotNil(t, schema)
assert.Equal(t, "schema_2", schema.Schema)
assert.Equal(t, "summary2", schema.Summary)
})
@@ -1069,11 +1074,24 @@ func TestCompatibleOperateSchema(t *testing.T) {
})
assert.NoError(t, err)
assert.Equal(t, 2, len(schemas))
- schema = schemas[0]
+
+ schema = findSchemaBySchemaID(schemas, "schemaID_1")
+ assert.NotNil(t, schema)
assert.Empty(t, schema.Schema)
assert.Empty(t, schema.Summary)
- schema = schemas[1]
+
+ schema = findSchemaBySchemaID(schemas, "schemaID_2")
+ assert.NotNil(t, schema)
assert.Empty(t, schema.Schema)
assert.Empty(t, schema.Summary)
})
}
+
+func findSchemaBySchemaID(schemas []*pb.Schema, schemaID string) *pb.Schema {
+ for _, schema := range schemas {
+ if schema.SchemaId == schemaID {
+ return schema
+ }
+ }
+ return nil
+}