You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicecomb.apache.org by ti...@apache.org on 2021/08/16 00:49:30 UTC
[servicecomb-service-center] branch master updated: Feature:
service retirement plan (#1133)
This is an automated email from the ASF dual-hosted git repository.
tianxiaoliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/servicecomb-service-center.git
The following commit(s) were added to refs/heads/master by this push:
new 8194a63 Feature: service retirement plan (#1133)
8194a63 is described below
commit 8194a632f58509277397f91f0f63c282988fa096
Author: little-cui <su...@qq.com>
AuthorDate: Mon Aug 16 08:49:26 2021 +0800
Feature: service retirement plan (#1133)
* Feature: Add cleanup unused microservice job
* Add rotate configuration
* Resolve comment
---
datasource/etcd/etcd_suite_test.go | 15 +-
datasource/etcd/event/dependency_event_handler.go | 6 +-
datasource/etcd/mux/mux.go | 14 +-
datasource/etcd/path/key_generator.go | 5 +
datasource/etcd/retire.go | 227 +++++++++++++++++++++
datasource/etcd/retire_test.go | 198 ++++++++++++++++++
datasource/etcd/util/instance_util.go | 2 +-
datasource/etcd/util/versionrule.go | 84 +++++---
datasource/etcd/util/versionrule_test.go | 97 ++++++---
.../{etcd/etcd_suite_test.go => mongo/retire.go} | 27 +--
datasource/ms.go | 4 +
datasource/{etcd/mux/mux.go => retire.go} | 35 +---
etc/conf/app.yaml | 8 +
server/bootstrap/bootstrap.go | 1 +
server/job/disco/retire.go | 68 ++++++
.../service/disco/retire.go | 27 +--
server/service/disco/retire_test.go | 71 +++++++
test/test.go | 18 +-
18 files changed, 753 insertions(+), 154 deletions(-)
diff --git a/datasource/etcd/etcd_suite_test.go b/datasource/etcd/etcd_suite_test.go
index b0ad473..02a0304 100644
--- a/datasource/etcd/etcd_suite_test.go
+++ b/datasource/etcd/etcd_suite_test.go
@@ -19,17 +19,16 @@ package etcd_test
// initialize
import (
"context"
+ "testing"
+ "time"
_ "github.com/apache/servicecomb-service-center/test"
+ . "github.com/onsi/ginkgo"
+ . "github.com/onsi/gomega"
"github.com/apache/servicecomb-service-center/datasource"
- . "github.com/onsi/ginkgo"
+ "github.com/apache/servicecomb-service-center/pkg/util"
"github.com/onsi/ginkgo/reporters"
-
- "testing"
- "time"
-
- . "github.com/onsi/gomega"
)
var timeLimit = 2 * time.Second
@@ -45,3 +44,7 @@ func TestEtcd(t *testing.T) {
junitReporter := reporters.NewJUnitReporter("etcd.junit.xml")
RunSpecsWithDefaultAndCustomReporters(t, "etcd Suite", []Reporter{junitReporter})
}
+
+func getContext() context.Context {
+ return util.WithNoCache(util.SetDomainProject(context.Background(), "default", "default"))
+}
diff --git a/datasource/etcd/event/dependency_event_handler.go b/datasource/etcd/event/dependency_event_handler.go
index e0e2aa5..c16ca17 100644
--- a/datasource/etcd/event/dependency_event_handler.go
+++ b/datasource/etcd/event/dependency_event_handler.go
@@ -41,6 +41,8 @@ import (
"github.com/little-cui/etcdadpt"
)
+const DepQueueLock mux.ID = "/cse-sr/lock/dep-queue"
+
// just for unit test
var testMux sync.Mutex
@@ -79,9 +81,9 @@ func (h *DependencyEventHandler) backoff(f func(), retries int) int {
func (h *DependencyEventHandler) tryWithBackoff(success func() error, backoff func(), retries int) (int, error) {
defer log.Recover()
- lock, err := mux.Try(mux.DepQueueLock)
+ lock, err := mux.Try(DepQueueLock)
if err != nil {
- log.Error(fmt.Sprintf("try to lock %s failed", mux.DepQueueLock), err)
+ log.Error(fmt.Sprintf("try to lock %s failed", DepQueueLock), err)
return h.backoff(backoff, retries), err
}
diff --git a/datasource/etcd/mux/mux.go b/datasource/etcd/mux/mux.go
index 6f3a33e..7b7e3d8 100644
--- a/datasource/etcd/mux/mux.go
+++ b/datasource/etcd/mux/mux.go
@@ -24,9 +24,9 @@ import (
"github.com/apache/servicecomb-service-center/pkg/etcdsync"
)
-type Type string
+type ID string
-func (m *Type) String() (s string) {
+func (m *ID) String() (s string) {
pMT := (*reflect.StringHeader)(unsafe.Pointer(m))
pStr := (*reflect.StringHeader)(unsafe.Pointer(&s))
pStr.Data = pMT.Data
@@ -34,16 +34,12 @@ func (m *Type) String() (s string) {
return
}
-const (
- GlobalLock Type = "/cse-sr/lock/global"
- DepQueueLock Type = "/cse-sr/lock/dep-queue"
- ServiceClearLock Type = "/cse-sr/lock/service-clear"
-)
+const GlobalLock ID = "/cse-sr/lock/global"
-func Lock(t Type) (*etcdsync.DLock, error) {
+func Lock(t ID) (*etcdsync.DLock, error) {
return etcdsync.Lock(t.String(), -1, true)
}
-func Try(t Type) (*etcdsync.DLock, error) {
+func Try(t ID) (*etcdsync.DLock, error) {
return etcdsync.Lock(t.String(), -1, false)
}
diff --git a/datasource/etcd/path/key_generator.go b/datasource/etcd/path/key_generator.go
index 2a7db02..4e63818 100644
--- a/datasource/etcd/path/key_generator.go
+++ b/datasource/etcd/path/key_generator.go
@@ -44,6 +44,7 @@ const (
DepsQueueUUID = "0"
DepsConsumer = "c"
DepsProvider = "p"
+ RegistryRetirePlan = "retire-plan"
)
func GetRootKey() string {
@@ -347,3 +348,7 @@ func GenerateMetricsKey(name, utc, domain string) string {
domain,
}, SPLIT)
}
+
+func GenerateRetirePlanKey() string {
+ return util.StringJoin([]string{GetRootKey(), RegistryRetirePlan}, SPLIT)
+}
diff --git a/datasource/etcd/retire.go b/datasource/etcd/retire.go
new file mode 100644
index 0000000..eda5b35
--- /dev/null
+++ b/datasource/etcd/retire.go
@@ -0,0 +1,227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package etcd
+
+import (
+ "context"
+ "encoding/json"
+ "errors"
+ "fmt"
+ "sync/atomic"
+ "time"
+
+ "github.com/apache/servicecomb-service-center/datasource"
+ "github.com/apache/servicecomb-service-center/datasource/etcd/mux"
+ "github.com/apache/servicecomb-service-center/datasource/etcd/path"
+ "github.com/apache/servicecomb-service-center/datasource/etcd/sd"
+ "github.com/apache/servicecomb-service-center/datasource/etcd/state/kvstore"
+ serviceUtil "github.com/apache/servicecomb-service-center/datasource/etcd/util"
+ "github.com/apache/servicecomb-service-center/pkg/etcdsync"
+ "github.com/apache/servicecomb-service-center/pkg/goutil"
+ "github.com/apache/servicecomb-service-center/pkg/log"
+ "github.com/apache/servicecomb-service-center/pkg/util"
+ pb "github.com/go-chassis/cari/discovery"
+ "github.com/go-chassis/foundation/gopool"
+ "github.com/little-cui/etcdadpt"
+)
+
+const (
+ poolSizeOfRotation = 5
+ retirementLockID mux.ID = "/cse-sr/lock/retirement"
+)
+
+var ErrAlreadyRetire = errors.New("already retired by other SC")
+
+type RotateServiceIDKey struct {
+ DomainProject string
+ ServiceID string
+}
+
+func (ds *MetadataManager) RetireService(ctx context.Context, localPlan *datasource.RetirePlan) error {
+ lock, err := getRetirementLock()
+ if err != nil {
+ return err
+ }
+ defer releaseRetirementLock(lock)
+
+ plan, err := ds.getRetirePlan(ctx, localPlan)
+ if err != nil || !plan.ShouldRetire() {
+ return err
+ }
+
+ key := path.GetServiceIndexRootKey("")
+ indexesResp, err := sd.ServiceIndex().Search(ctx, etcdadpt.WithStrKey(key), etcdadpt.WithPrefix())
+ if err != nil {
+ log.Error("query all microservices indexes failed", err)
+ return err
+ }
+ serviceIDKeys := GetRetireServiceIDs(indexesResp, plan.Reserve)
+ if len(serviceIDKeys) == 0 {
+ return nil
+ }
+ serviceIDKeys = FilterNoInstance(ctx, serviceIDKeys)
+ if len(serviceIDKeys) == 0 {
+ return nil
+ }
+
+ log.Warn(fmt.Sprintf("start retire %d microservices", len(serviceIDKeys)))
+ n := UnregisterManyService(ctx, serviceIDKeys)
+ if n > 0 {
+ log.Warn(fmt.Sprintf("%d microservices retired", n))
+ }
+
+ plan.LastRunAt = time.Now().Unix()
+ return ds.UpsertRetirePlan(ctx, plan)
+}
+
+func releaseRetirementLock(lock *etcdsync.DLock) {
+ err := lock.Unlock()
+ if err != nil {
+ log.Error("", err)
+ }
+}
+
+func getRetirementLock() (*etcdsync.DLock, error) {
+ lock, err := mux.Try(retirementLockID)
+ if err != nil {
+ return nil, err
+ }
+ if lock == nil {
+ return nil, ErrAlreadyRetire
+ }
+ return lock, nil
+}
+
+func (ds *MetadataManager) getRetirePlan(ctx context.Context, localPlan *datasource.RetirePlan) (*datasource.RetirePlan, error) {
+ plan, err := ds.GetRetirePlan(ctx)
+ if err != nil {
+ return nil, err
+ }
+ if plan == nil {
+ plan = localPlan
+ } else {
+ plan.Interval = localPlan.Interval
+ plan.Reserve = localPlan.Reserve
+ }
+ return plan, nil
+}
+
+func FilterNoInstance(ctx context.Context, serviceIDKeys []*RotateServiceIDKey) []*RotateServiceIDKey {
+ matched := make([]*RotateServiceIDKey, 0, len(serviceIDKeys))
+ for _, serviceIDKey := range serviceIDKeys {
+ serviceID := serviceIDKey.ServiceID
+ instanceKey := path.GenerateInstanceKey(serviceIDKey.DomainProject, serviceID, "")
+ resp, err := sd.Instance().Search(ctx, etcdadpt.WithStrKey(instanceKey),
+ etcdadpt.WithPrefix(), etcdadpt.WithCountOnly())
+ if err != nil {
+ log.Error(fmt.Sprintf("count microservice %s instance failed", serviceID), err)
+ continue
+ }
+ if resp.Count > 0 {
+ continue
+ }
+ matched = append(matched, serviceIDKey)
+ }
+ return matched
+}
+
+func UnregisterManyService(ctx context.Context, serviceIDKeys []*RotateServiceIDKey) (deleted int64) {
+ pool := goutil.New(gopool.Configure().WithContext(ctx).Workers(poolSizeOfRotation))
+ defer pool.Done()
+
+ for _, key := range serviceIDKeys {
+ domainProject := key.DomainProject
+ serviceID := key.ServiceID
+ pool.Do(func(ctx context.Context) {
+ resp, err := datasource.GetMetadataManager().UnregisterService(util.SetDomainProjectString(ctx, domainProject),
+ &pb.DeleteServiceRequest{ServiceId: serviceID})
+ if err == nil && resp.Response.IsSucceed() {
+ atomic.AddInt64(&deleted, 1)
+ }
+ })
+ }
+ return
+}
+
+func GetRetireServiceIDs(indexesResp *kvstore.Response, reserveVersionCount int) []*RotateServiceIDKey {
+ total := indexesResp.Count
+ if total == 0 {
+ return nil
+ }
+ var (
+ counter = make(map[string]int, total)
+ matched = make(map[string]*pb.MicroServiceKey)
+ serviceVersionMap = make(map[string][]*kvstore.KeyValue, total)
+ )
+ for _, kv := range indexesResp.Kvs {
+ serviceVersionKey := path.GetInfoFromSvcIndexKV(kv.Key)
+ serviceVersionKey.Version = ""
+ serviceKey := path.GenerateServiceIndexKey(serviceVersionKey)
+ count := counter[serviceKey]
+ count++
+ counter[serviceKey] = count
+ serviceVersionMap[serviceKey] = append(serviceVersionMap[serviceKey], kv)
+ if count > reserveVersionCount {
+ matched[serviceKey] = serviceVersionKey
+ }
+ }
+
+ if len(matched) == 0 {
+ return nil
+ }
+
+ var serviceIDs []*RotateServiceIDKey
+ for key, serviceKey := range matched {
+ kvs := serviceVersionMap[key]
+ serviceUtil.Sort(kvs, serviceUtil.Larger)
+ for _, kv := range kvs[reserveVersionCount:] {
+ serviceIDs = append(serviceIDs, &RotateServiceIDKey{
+ DomainProject: serviceKey.Tenant,
+ ServiceID: kv.Value.(string),
+ })
+ }
+ }
+ return serviceIDs
+}
+
+func (ds *MetadataManager) GetRetirePlan(ctx context.Context) (*datasource.RetirePlan, error) {
+ kv, err := etcdadpt.Get(ctx, path.GenerateRetirePlanKey())
+ if err != nil {
+ log.Error("", err)
+ return nil, err
+ }
+ if kv == nil {
+ return nil, nil
+ }
+ var plan datasource.RetirePlan
+ err = json.Unmarshal(kv.Value, &plan)
+ if err != nil {
+ log.Error("decode retire plan failed", err)
+ return nil, err
+ }
+ return &plan, nil
+}
+
+func (ds *MetadataManager) UpsertRetirePlan(ctx context.Context, plan *datasource.RetirePlan) error {
+ bytes, err := json.Marshal(plan)
+ if err != nil {
+ log.Error("encode retire plan failed", err)
+ return err
+ }
+ return etcdadpt.PutBytes(ctx, path.GenerateRetirePlanKey(), bytes)
+}
diff --git a/datasource/etcd/retire_test.go b/datasource/etcd/retire_test.go
new file mode 100644
index 0000000..1dc4ada
--- /dev/null
+++ b/datasource/etcd/retire_test.go
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package etcd_test
+
+import (
+ "fmt"
+ "reflect"
+ "testing"
+
+ "github.com/apache/servicecomb-service-center/datasource"
+ "github.com/apache/servicecomb-service-center/datasource/etcd"
+ "github.com/apache/servicecomb-service-center/datasource/etcd/path"
+ "github.com/apache/servicecomb-service-center/datasource/etcd/state/kvstore"
+ pb "github.com/go-chassis/cari/discovery"
+ "github.com/little-cui/etcdadpt"
+ "github.com/stretchr/testify/assert"
+)
+
+const domainProject = "default/default"
+
+func TestGetOldServiceIDs(t *testing.T) {
+ const (
+ reserve = 3
+ etcdKeyPrefix = "/cse-sr/ms/indexes/default/default//default/"
+ )
+
+ type args struct {
+ indexesResp *kvstore.Response
+ }
+ tests := []struct {
+ name string
+ args args
+ want []*etcd.RotateServiceIDKey
+ }{
+ {"input empty should return empty", args{indexesResp: &kvstore.Response{}}, nil},
+ {"less then reserve version count should return empty", args{indexesResp: &kvstore.Response{
+ Kvs: []*kvstore.KeyValue{
+ {Key: []byte(etcdKeyPrefix + "svc1/1.0"), Value: "svc1-1.0"},
+ {Key: []byte(etcdKeyPrefix + "svc1/1.2"), Value: "svc1-1.2"},
+ {Key: []byte(etcdKeyPrefix + "svc1/2.0"), Value: "svc1-2.0"},
+ {Key: []byte(etcdKeyPrefix + "svc2/1.0"), Value: "svc2-1.0"},
+ }, Count: 4,
+ }}, nil},
+ {"large then reserve version count should return 2 version", args{indexesResp: &kvstore.Response{
+ Kvs: []*kvstore.KeyValue{
+ {Key: []byte(etcdKeyPrefix + "svc1/1.0"), Value: "svc1-1.0"},
+ {Key: []byte(etcdKeyPrefix + "svc1/1.2"), Value: "svc1-1.2"},
+ {Key: []byte(etcdKeyPrefix + "svc1/1.3"), Value: "svc1-1.3"},
+ {Key: []byte(etcdKeyPrefix + "svc1/2.0"), Value: "svc1-2.0"},
+ {Key: []byte(etcdKeyPrefix + "svc1/3.0"), Value: "svc1-3.0"},
+ {Key: []byte(etcdKeyPrefix + "svc2/1.0"), Value: "svc2-1.0"},
+ }, Count: 6,
+ }}, []*etcd.RotateServiceIDKey{
+ {domainProject, "svc1-1.2"},
+ {domainProject, "svc1-1.0"},
+ }},
+ }
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ if got := etcd.GetRetireServiceIDs(tt.args.indexesResp, reserve); !reflect.DeepEqual(got, tt.want) {
+ t.Errorf("GetRetireServiceIDs() = %v, want %v", got, tt.want)
+ }
+ })
+ }
+}
+
+func TestFilterInUsed(t *testing.T) {
+ const (
+ unusedServiceID = "filterUnused"
+ notExistServiceID = "not-exist"
+ inusedServiceID = "filterInused"
+ )
+ ctx := getContext()
+
+ _, err := datasource.GetMetadataManager().RegisterService(ctx, &pb.CreateServiceRequest{
+ Service: &pb.MicroService{
+ ServiceId: unusedServiceID,
+ ServiceName: unusedServiceID,
+ },
+ })
+ assert.NoError(t, err)
+ defer datasource.GetMetadataManager().UnregisterService(ctx, &pb.DeleteServiceRequest{ServiceId: unusedServiceID, Force: true})
+
+ _, err = datasource.GetMetadataManager().RegisterService(ctx, &pb.CreateServiceRequest{
+ Service: &pb.MicroService{
+ ServiceId: inusedServiceID,
+ ServiceName: inusedServiceID,
+ },
+ })
+ assert.NoError(t, err)
+ defer datasource.GetMetadataManager().UnregisterService(ctx, &pb.DeleteServiceRequest{ServiceId: inusedServiceID, Force: true})
+
+ _, err = datasource.GetMetadataManager().RegisterInstance(ctx, &pb.RegisterInstanceRequest{
+ Instance: &pb.MicroServiceInstance{
+ ServiceId: inusedServiceID,
+ HostName: inusedServiceID,
+ },
+ })
+ assert.NoError(t, err)
+
+ type args struct {
+ serviceIDKeys []*etcd.RotateServiceIDKey
+ }
+ tests := []struct {
+ name string
+ args args
+ want []*etcd.RotateServiceIDKey
+ }{
+ {"input empty should return empty", args{serviceIDKeys: []*etcd.RotateServiceIDKey{}}, []*etcd.RotateServiceIDKey{}},
+ {"input only one unused should return it", args{serviceIDKeys: []*etcd.RotateServiceIDKey{
+ {DomainProject: domainProject, ServiceID: notExistServiceID},
+ {DomainProject: domainProject, ServiceID: inusedServiceID},
+ {DomainProject: domainProject, ServiceID: unusedServiceID},
+ }}, []*etcd.RotateServiceIDKey{
+ {DomainProject: domainProject, ServiceID: notExistServiceID},
+ {DomainProject: domainProject, ServiceID: unusedServiceID},
+ }},
+ }
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ if got := etcd.FilterNoInstance(ctx, tt.args.serviceIDKeys); !reflect.DeepEqual(got, tt.want) {
+ t.Errorf("FilterNoInstance() = %v, want %v", got, tt.want)
+ }
+ })
+ }
+}
+
+func TestUnregisterManyService(t *testing.T) {
+ const serviceIDPrefix = "TestUnregisterManyService"
+ ctx := getContext()
+
+ t.Run("delete many should ok", func(t *testing.T) {
+ const serviceVersionCount = 10
+ var serviceIDs []*etcd.RotateServiceIDKey
+ for i := 0; i < serviceVersionCount; i++ {
+ serviceID := serviceIDPrefix + fmt.Sprintf("%v", i)
+ _, err := datasource.GetMetadataManager().RegisterService(ctx, &pb.CreateServiceRequest{
+ Service: &pb.MicroService{
+ ServiceId: serviceID,
+ ServiceName: serviceID,
+ },
+ })
+ assert.NoError(t, err)
+ serviceIDs = append(serviceIDs, &etcd.RotateServiceIDKey{DomainProject: domainProject, ServiceID: serviceID})
+ }
+
+ deleted := etcd.UnregisterManyService(ctx, serviceIDs)
+ assert.Equal(t, int64(serviceVersionCount), deleted)
+
+ _, i, err := etcdadpt.List(ctx, path.GenerateServiceKey(domainProject, serviceIDPrefix))
+ assert.NoError(t, err)
+ assert.Equal(t, int64(0), i)
+ })
+
+ t.Run("delete inused should failed", func(t *testing.T) {
+ var serviceIDs []*etcd.RotateServiceIDKey
+ service, err := datasource.GetMetadataManager().RegisterService(ctx, &pb.CreateServiceRequest{
+ Service: &pb.MicroService{
+ ServiceId: serviceIDPrefix + "1",
+ ServiceName: serviceIDPrefix + "1",
+ },
+ })
+ assert.NoError(t, err)
+ serviceIDs = append(serviceIDs, &etcd.RotateServiceIDKey{DomainProject: domainProject, ServiceID: service.ServiceId})
+
+ defer datasource.GetMetadataManager().UnregisterService(ctx, &pb.DeleteServiceRequest{ServiceId: service.ServiceId, Force: true})
+
+ _, err = datasource.GetMetadataManager().RegisterInstance(ctx, &pb.RegisterInstanceRequest{
+ Instance: &pb.MicroServiceInstance{
+ ServiceId: service.ServiceId,
+ HostName: service.ServiceId,
+ },
+ })
+ assert.NoError(t, err)
+
+ deleted := etcd.UnregisterManyService(ctx, serviceIDs)
+ assert.Equal(t, int64(0), deleted)
+
+ _, i, err := etcdadpt.List(ctx, path.GenerateServiceKey(domainProject, serviceIDPrefix))
+ assert.NoError(t, err)
+ assert.Equal(t, int64(1), i)
+ })
+}
diff --git a/datasource/etcd/util/instance_util.go b/datasource/etcd/util/instance_util.go
index f9be4f7..6e68245 100644
--- a/datasource/etcd/util/instance_util.go
+++ b/datasource/etcd/util/instance_util.go
@@ -142,7 +142,6 @@ func DeleteServiceAllInstances(ctx context.Context, serviceID string) error {
return err
}
if resp.Count <= 0 {
- log.Warn(fmt.Sprintf("service[%s] has no deployment of instance.", serviceID))
return nil
}
for _, v := range resp.Kvs {
@@ -152,6 +151,7 @@ func DeleteServiceAllInstances(ctx context.Context, serviceID string) error {
log.Error("", err)
}
}
+ log.Warn(fmt.Sprintf("force delete service[%s] %d instance.", serviceID, resp.Count))
return nil
}
diff --git a/datasource/etcd/util/versionrule.go b/datasource/etcd/util/versionrule.go
index 19aa3a9..84c8af6 100644
--- a/datasource/etcd/util/versionrule.go
+++ b/datasource/etcd/util/versionrule.go
@@ -26,20 +26,34 @@ import (
"github.com/apache/servicecomb-service-center/pkg/validate"
)
-type VersionRule func(sorted []string, kvs map[string]*kvstore.KeyValue, start, end string) []string
+type VersionRule func(sorted []string, kvs []*kvstore.KeyValue, start, end string) []string
-func (vr VersionRule) Match(kvs []*kvstore.KeyValue, ops ...string) []string {
+func Sort(kvs []*kvstore.KeyValue, cmp func(start, end string) bool) {
+ sorter := newSorter(kvs, cmp, true)
+ sort.Sort(sorter)
+}
+
+func newSorter(kvs []*kvstore.KeyValue, cmp func(start string, end string) bool, ref bool) *serviceKeySorter {
+ tmp := kvs
+ if !ref {
+ tmp = make([]*kvstore.KeyValue, len(kvs))
+ }
sorter := &serviceKeySorter{
sortArr: make([]string, len(kvs)),
- kvs: make(map[string]*kvstore.KeyValue, len(kvs)),
- cmp: Larger,
+ kvs: tmp,
+ cmp: cmp,
}
for i, kv := range kvs {
key := util.BytesToStringWithNoCopy(kv.Key)
ver := key[strings.LastIndex(key, "/")+1:]
sorter.sortArr[i] = ver
- sorter.kvs[ver] = kv
+ sorter.kvs[i] = kv
}
+ return sorter
+}
+
+func (vr VersionRule) Match(kvs []*kvstore.KeyValue, ops ...string) []string {
+ sorter := newSorter(kvs, Larger, false)
sort.Sort(sorter)
args := [2]string{}
@@ -55,7 +69,7 @@ func (vr VersionRule) Match(kvs []*kvstore.KeyValue, ops ...string) []string {
type serviceKeySorter struct {
sortArr []string
- kvs map[string]*kvstore.KeyValue
+ kvs []*kvstore.KeyValue
cmp func(i, j string) bool
}
@@ -65,6 +79,7 @@ func (sks *serviceKeySorter) Len() int {
func (sks *serviceKeySorter) Swap(i, j int) {
sks.sortArr[i], sks.sortArr[j] = sks.sortArr[j], sks.sortArr[i]
+ sks.kvs[i], sks.kvs[j] = sks.kvs[j], sks.kvs[i]
}
func (sks *serviceKeySorter) Less(i, j int) bool {
@@ -81,50 +96,57 @@ func LessEqual(start, end string) bool {
return !Larger(start, end)
}
-func Latest(sorted []string, kvs map[string]*kvstore.KeyValue, start, end string) []string {
+// Latest return latest version kv
+func Latest(sorted []string, kvs []*kvstore.KeyValue, start, end string) []string {
if len(sorted) == 0 {
return []string{}
}
- return []string{kvs[sorted[0]].Value.(string)}
+ return []string{kvs[0].Value.(string)}
}
-func Range(sorted []string, kvs map[string]*kvstore.KeyValue, start, end string) []string {
- result := make([]string, len(sorted))
- i, flag := 0, 0
+// Range return start <= version < end
+func Range(sorted []string, kvs []*kvstore.KeyValue, start, end string) []string {
+ total := len(sorted)
+ if total == 0 {
+ return []string{}
+ }
+
+ result := make([]string, 0, total)
+ firstFound := false
if Larger(start, end) {
start, end = end, start
}
- l := len(sorted)
- if l == 0 || Larger(start, sorted[0]) || LessEqual(end, sorted[l-1]) {
+ eldest, latest := sorted[total-1], sorted[0]
+ if Larger(start, latest) || LessEqual(end, eldest) {
return []string{}
}
- for _, k := range sorted {
- // end >= k >= start
- switch flag {
- case 0:
+ for i, k := range sorted {
+ if !firstFound {
if LessEqual(end, k) {
continue
}
- flag = 1
- case 1:
- if Larger(start, k) {
- return result[:i]
- }
+ firstFound = true
+ } else if Larger(start, k) {
+ break
}
-
- result[i] = kvs[k].Value.(string)
- i++
+ // end >= k >= start
+ result = append(result, kvs[i].Value.(string))
}
- return result[:i]
+ return result
}
-func AtLess(sorted []string, kvs map[string]*kvstore.KeyValue, start, end string) []string {
- result := make([]string, len(sorted))
+// AtLess return version >= start
+func AtLess(sorted []string, kvs []*kvstore.KeyValue, start, end string) []string {
+ total := len(sorted)
+ if total == 0 {
+ return []string{}
+ }
- if len(sorted) == 0 || Larger(start, sorted[0]) {
+ result := make([]string, 0, total)
+ if Larger(start, sorted[0]) {
return []string{}
}
@@ -132,9 +154,9 @@ func AtLess(sorted []string, kvs map[string]*kvstore.KeyValue, start, end string
if Larger(start, k) {
return result[:i]
}
- result[i] = kvs[k].Value.(string)
+ result = append(result, kvs[i].Value.(string))
}
- return result[:]
+ return result
}
func ParseVersionRule(versionRule string) func(kvs []*kvstore.KeyValue) []string {
diff --git a/datasource/etcd/util/versionrule_test.go b/datasource/etcd/util/versionrule_test.go
index 44a05cb..787f7ff 100644
--- a/datasource/etcd/util/versionrule_test.go
+++ b/datasource/etcd/util/versionrule_test.go
@@ -18,11 +18,11 @@ package util
import (
"fmt"
- sd "github.com/apache/servicecomb-service-center/datasource/etcd/state/kvstore"
"reflect"
"sort"
"testing"
+ "github.com/apache/servicecomb-service-center/datasource/etcd/state/kvstore"
"github.com/apache/servicecomb-service-center/pkg/log"
"github.com/stretchr/testify/assert"
)
@@ -30,11 +30,11 @@ import (
const VERSIONRULE_BASE = 5000
func BenchmarkVersionRule_Latest_GetServicesIds(b *testing.B) {
- var kvs = make([]*sd.KeyValue, VERSIONRULE_BASE)
+ var kvs = make([]*kvstore.KeyValue, VERSIONRULE_BASE)
for i := 1; i <= VERSIONRULE_BASE; i++ {
- kvs[i-1] = &sd.KeyValue{
+ kvs[i-1] = &kvstore.KeyValue{
Key: []byte(fmt.Sprintf("/service/ver/1.%d", i)),
- Value: []byte(fmt.Sprintf("%d", i)),
+ Value: fmt.Sprintf("%d", i),
}
}
b.N = VERSIONRULE_BASE
@@ -44,14 +44,15 @@ func BenchmarkVersionRule_Latest_GetServicesIds(b *testing.B) {
}
b.ReportAllocs()
// 5000 7105020 ns/op 2180198 B/op 39068 allocs/op
+ // 5000 8364556 ns/op 123167 B/op 5 allocs/op
}
func BenchmarkVersionRule_Range_GetServicesIds(b *testing.B) {
- var kvs = make([]*sd.KeyValue, VERSIONRULE_BASE)
+ var kvs = make([]*kvstore.KeyValue, VERSIONRULE_BASE)
for i := 1; i <= VERSIONRULE_BASE; i++ {
- kvs[i-1] = &sd.KeyValue{
+ kvs[i-1] = &kvstore.KeyValue{
Key: []byte(fmt.Sprintf("/service/ver/1.%d", i)),
- Value: []byte(fmt.Sprintf("%d", i)),
+ Value: fmt.Sprintf("%d", i),
}
}
b.N = VERSIONRULE_BASE
@@ -61,14 +62,15 @@ func BenchmarkVersionRule_Range_GetServicesIds(b *testing.B) {
}
b.ReportAllocs()
// 5000 7244029 ns/op 2287389 B/op 39584 allocs/op
+ // 5000 8824243 ns/op 205161 B/op 9 allocs/op
}
func BenchmarkVersionRule_AtLess_GetServicesIds(b *testing.B) {
- var kvs = make([]*sd.KeyValue, VERSIONRULE_BASE)
+ var kvs = make([]*kvstore.KeyValue, VERSIONRULE_BASE)
for i := 1; i <= VERSIONRULE_BASE; i++ {
- kvs[i-1] = &sd.KeyValue{
+ kvs[i-1] = &kvstore.KeyValue{
Key: []byte(fmt.Sprintf("/service/ver/1.%d", i)),
- Value: []byte(fmt.Sprintf("%d", i)),
+ Value: fmt.Sprintf("%d", i),
}
}
b.N = VERSIONRULE_BASE
@@ -78,11 +80,12 @@ func BenchmarkVersionRule_AtLess_GetServicesIds(b *testing.B) {
}
b.ReportAllocs()
// 5000 11221098 ns/op 3174720 B/op 58064 allocs/op
+ // 5000 8723274 ns/op 205146 B/op 7 allocs/op
}
func BenchmarkParseVersionRule(b *testing.B) {
- f := ParseVersionRule("latest")
- kvs := []*sd.KeyValue{
+ f := ParseVersionRule("0.0.0.0+")
+ kvs := []*kvstore.KeyValue{
{
Key: []byte("/service/ver/1.0.300"),
Value: "1.0.300",
@@ -103,6 +106,8 @@ func BenchmarkParseVersionRule(b *testing.B) {
}
})
b.ReportAllocs()
+ // latest: 4786342 214 ns/op 160 B/op 4 allocs/op
+ // 0.0.0.0+: 2768061 374 ns/op 192 B/op 4 allocs/op
}
func TestSorter(t *testing.T) {
@@ -112,7 +117,7 @@ func TestSorter(t *testing.T) {
kvs := []string{"1.0.0", "1.0.1"}
sort.Sort(&serviceKeySorter{
sortArr: kvs,
- kvs: make(map[string]*sd.KeyValue),
+ kvs: make([]*kvstore.KeyValue, len(kvs)),
cmp: Larger,
})
assert.Equal(t, "1.0.1", kvs[0])
@@ -123,7 +128,7 @@ func TestSorter(t *testing.T) {
kvs := []string{"1.0.1", "1.0.0"}
sort.Sort(&serviceKeySorter{
sortArr: kvs,
- kvs: make(map[string]*sd.KeyValue),
+ kvs: make([]*kvstore.KeyValue, len(kvs)),
cmp: Larger,
})
assert.Equal(t, "1.0.1", kvs[0])
@@ -134,7 +139,7 @@ func TestSorter(t *testing.T) {
kvs := []string{"1.0.0.0", "1.0.1"}
sort.Sort(&serviceKeySorter{
sortArr: kvs,
- kvs: make(map[string]*sd.KeyValue),
+ kvs: make([]*kvstore.KeyValue, len(kvs)),
cmp: Larger,
})
assert.Equal(t, "1.0.1", kvs[0])
@@ -145,7 +150,7 @@ func TestSorter(t *testing.T) {
kvs := []string{"1.0.9", "1.0.10"}
sort.Sort(&serviceKeySorter{
sortArr: kvs,
- kvs: make(map[string]*sd.KeyValue),
+ kvs: make([]*kvstore.KeyValue, len(kvs)),
cmp: Larger,
})
assert.Equal(t, "1.0.10", kvs[0])
@@ -156,7 +161,7 @@ func TestSorter(t *testing.T) {
kvs := []string{"1.10", "4"}
sort.Sort(&serviceKeySorter{
sortArr: kvs,
- kvs: make(map[string]*sd.KeyValue),
+ kvs: make([]*kvstore.KeyValue, len(kvs)),
cmp: Larger,
})
assert.Equal(t, "4", kvs[0])
@@ -169,7 +174,7 @@ func TestSorter(t *testing.T) {
kvs := []string{"1.a", "1.0.1.a", ""}
sort.Sort(&serviceKeySorter{
sortArr: kvs,
- kvs: make(map[string]*sd.KeyValue),
+ kvs: make([]*kvstore.KeyValue, len(kvs)),
cmp: Larger,
})
assert.Equal(t, "1.a", kvs[0])
@@ -181,7 +186,7 @@ func TestSorter(t *testing.T) {
kvs := []string{"1.0", "1.0.1.32768"}
sort.Sort(&serviceKeySorter{
sortArr: kvs,
- kvs: make(map[string]*sd.KeyValue),
+ kvs: make([]*kvstore.KeyValue, len(kvs)),
cmp: Larger,
})
assert.Equal(t, "1.0", kvs[0])
@@ -189,7 +194,7 @@ func TestSorter(t *testing.T) {
kvs = []string{"1.0", "1.0.1.32767"}
sort.Sort(&serviceKeySorter{
sortArr: kvs,
- kvs: make(map[string]*sd.KeyValue),
+ kvs: make([]*kvstore.KeyValue, len(kvs)),
cmp: Larger,
})
assert.Equal(t, "1.0.1.32767", kvs[0])
@@ -199,9 +204,9 @@ func TestSorter(t *testing.T) {
func TestVersionRule(t *testing.T) {
const count = 10
- var kvs = [count]*sd.KeyValue{}
+ var kvs = [count]*kvstore.KeyValue{}
for i := 1; i <= count; i++ {
- kvs[i-1] = &sd.KeyValue{
+ kvs[i-1] = &kvstore.KeyValue{
Key: []byte(fmt.Sprintf("/service/ver/1.%d", i)),
Value: fmt.Sprintf("%d", i),
}
@@ -236,14 +241,14 @@ func TestVersionRule(t *testing.T) {
assert.Equal(t, "1", results[9])
})
- t.Run("range3 ver in [1.4.1, 1.9.1]", func(t *testing.T) {
+ t.Run("range3 ver in [1.4.1, 1.9.1)", func(t *testing.T) {
results := VersionRule(Range).Match(kvs[:], "1.4.1", "1.9.1")
assert.Equal(t, 5, len(results))
assert.Equal(t, "9", results[0])
assert.Equal(t, "5", results[4])
})
- t.Run("range4 ver in [2, 4]", func(t *testing.T) {
+ t.Run("range4 ver in [2, 4)", func(t *testing.T) {
results := VersionRule(Range).Match(kvs[:], "2", "4")
assert.Equal(t, len(results), 0)
})
@@ -300,7 +305,7 @@ func TestVersionRule(t *testing.T) {
assert.Equal(t, fmt.Sprintf("%d", count), results[0])
})
- t.Run("range ver in[1.4, 1.8]", func(t *testing.T) {
+ t.Run("range ver in[1.4, 1.8)", func(t *testing.T) {
match := ParseVersionRule("1.4-1.8")
results := match(kvs[:])
assert.Equal(t, 4, len(results))
@@ -322,7 +327,7 @@ func TestVersionRule(t *testing.T) {
assert.Equal(t, true, VersionMatchRule("1.0", "latest"))
})
- t.Run("range ver in [1.4, 1.8]", func(t *testing.T) {
+ t.Run("range ver in [1.4, 1.8)", func(t *testing.T) {
assert.Equal(t, true, VersionMatchRule("1.4", "1.4-1.8"))
assert.Equal(t, true, VersionMatchRule("1.6", "1.4-1.8"))
assert.Equal(t, false, VersionMatchRule("1.8", "1.4-1.8"))
@@ -336,3 +341,43 @@ func TestVersionRule(t *testing.T) {
assert.Equal(t, false, VersionMatchRule("1.0", "1.6+"))
})
}
+
+func TestSort(t *testing.T) {
+ type args struct {
+ kvs []*kvstore.KeyValue
+ cmp func(start, end string) bool
+ }
+ tests := []struct {
+ name string
+ args args
+ want []*kvstore.KeyValue
+ }{
+ {"sort asc order", args{kvs: []*kvstore.KeyValue{
+ {Key: []byte("/svc/1.1.0"), Value: "1"},
+ {Key: []byte("/svc/2.0.1"), Value: "2"},
+ {Key: []byte("/svc/1.0.0"), Value: "0"},
+ }, cmp: LessEqual}, []*kvstore.KeyValue{
+ {Key: []byte("/svc/1.0.0"), Value: "0"},
+ {Key: []byte("/svc/1.1.0"), Value: "1"},
+ {Key: []byte("/svc/2.0.1"), Value: "2"},
+ }},
+ {"sort desc order", args{kvs: []*kvstore.KeyValue{
+ {Key: []byte("/svc/1.1.0"), Value: "1"},
+ {Key: []byte("/svc/2.0.1"), Value: "2"},
+ {Key: []byte("/svc/1.0.0"), Value: "0"},
+ }, cmp: Larger}, []*kvstore.KeyValue{
+ {Key: []byte("/svc/2.0.1"), Value: "2"},
+ {Key: []byte("/svc/1.1.0"), Value: "1"},
+ {Key: []byte("/svc/1.0.0"), Value: "0"},
+ }},
+ }
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ got := tt.args.kvs
+ Sort(got, tt.args.cmp)
+ if !reflect.DeepEqual(got, tt.want) {
+ t.Errorf("Sort() = %v, want %v", got, tt.want)
+ }
+ })
+ }
+}
diff --git a/datasource/etcd/etcd_suite_test.go b/datasource/mongo/retire.go
similarity index 58%
copy from datasource/etcd/etcd_suite_test.go
copy to datasource/mongo/retire.go
index b0ad473..0ffc9a9 100644
--- a/datasource/etcd/etcd_suite_test.go
+++ b/datasource/mongo/retire.go
@@ -14,34 +14,15 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package etcd_test
-// initialize
+package mongo
+
import (
"context"
- _ "github.com/apache/servicecomb-service-center/test"
-
"github.com/apache/servicecomb-service-center/datasource"
- . "github.com/onsi/ginkgo"
- "github.com/onsi/ginkgo/reporters"
-
- "testing"
- "time"
-
- . "github.com/onsi/gomega"
)
-var timeLimit = 2 * time.Second
-
-var _ = BeforeSuite(func() {
- //clear service created in last test
- time.Sleep(timeLimit)
- _ = datasource.GetSCManager().ClearNoInstanceServices(context.Background(), timeLimit)
-})
-
-func TestEtcd(t *testing.T) {
- RegisterFailHandler(Fail)
- junitReporter := reporters.NewJUnitReporter("etcd.junit.xml")
- RunSpecsWithDefaultAndCustomReporters(t, "etcd Suite", []Reporter{junitReporter})
+func (ds *MetadataManager) RetireService(ctx context.Context, plan *datasource.RetirePlan) error {
+ panic("implement me")
}
diff --git a/datasource/ms.go b/datasource/ms.go
index 8ef2644..0e1f481 100644
--- a/datasource/ms.go
+++ b/datasource/ms.go
@@ -100,4 +100,8 @@ type MetadataManager interface {
GetTags(ctx context.Context, request *pb.GetServiceTagsRequest) (*pb.GetServiceTagsResponse, error)
UpdateTag(ctx context.Context, request *pb.UpdateServiceTagRequest) (*pb.UpdateServiceTagResponse, error)
DeleteTags(ctx context.Context, request *pb.DeleteServiceTagsRequest) (*pb.DeleteServiceTagsResponse, error)
+
+ // RetireService retire the 'RetirePlan.Reserve' latest versions for each of service,
+ // delete other versions which doesn't register any instances.
+ RetireService(ctx context.Context, plan *RetirePlan) error
}
diff --git a/datasource/etcd/mux/mux.go b/datasource/retire.go
similarity index 55%
copy from datasource/etcd/mux/mux.go
copy to datasource/retire.go
index 6f3a33e..47f0d2d 100644
--- a/datasource/etcd/mux/mux.go
+++ b/datasource/retire.go
@@ -15,35 +15,16 @@
* limitations under the License.
*/
-package mux
+package datasource
-import (
- "reflect"
- "unsafe"
+import "time"
- "github.com/apache/servicecomb-service-center/pkg/etcdsync"
-)
-
-type Type string
-
-func (m *Type) String() (s string) {
- pMT := (*reflect.StringHeader)(unsafe.Pointer(m))
- pStr := (*reflect.StringHeader)(unsafe.Pointer(&s))
- pStr.Data = pMT.Data
- pStr.Len = pMT.Len
- return
-}
-
-const (
- GlobalLock Type = "/cse-sr/lock/global"
- DepQueueLock Type = "/cse-sr/lock/dep-queue"
- ServiceClearLock Type = "/cse-sr/lock/service-clear"
-)
-
-func Lock(t Type) (*etcdsync.DLock, error) {
- return etcdsync.Lock(t.String(), -1, true)
+type RetirePlan struct {
+ Interval time.Duration `json:"interval,omitempty"`
+ Reserve int `json:"reserve,omitempty"`
+ LastRunAt int64 `json:"lastRunAt,omitempty" bson:"last_run_at"`
}
-func Try(t Type) (*etcdsync.DLock, error) {
- return etcdsync.Lock(t.String(), -1, false)
+func (r *RetirePlan) ShouldRetire() bool {
+ return time.Now().Add(-r.Interval).Unix() >= r.LastRunAt
}
diff --git a/etc/conf/app.yaml b/etc/conf/app.yaml
index 5315202..cae4628 100644
--- a/etc/conf/app.yaml
+++ b/etc/conf/app.yaml
@@ -129,7 +129,14 @@ registry:
queueSize: 0
service:
+ # globalVisible is the global microservice name list
globalVisible:
+ # retire the 'reserve' latest versions for each of service,
+ # delete other versions which doesn't register any instances.
+ retire:
+ disable: false
+ interval: 12h
+ reserve: 3
instance:
# By default, instance TTL = (times + 1) * interval
# if ttl > 0, the instance will always set this value, ignore the API request body
@@ -158,6 +165,7 @@ rbac:
publicKeyFile: ./public.key
releaseLockAfter: 15m # failure login attempt causes account blocking, that is block duration
scope: '*' # specify auth resource scope, can be account,role,service,service/schema,...
+
metrics:
# enable to start metrics gather
enable: true
diff --git a/server/bootstrap/bootstrap.go b/server/bootstrap/bootstrap.go
index d5574bf..23c292a 100644
--- a/server/bootstrap/bootstrap.go
+++ b/server/bootstrap/bootstrap.go
@@ -63,6 +63,7 @@ import (
//jobs
_ "github.com/apache/servicecomb-service-center/server/job/account"
+ _ "github.com/apache/servicecomb-service-center/server/job/disco"
"github.com/apache/servicecomb-service-center/pkg/log"
"github.com/apache/servicecomb-service-center/server/handler/accesslog"
diff --git a/server/job/disco/retire.go b/server/job/disco/retire.go
new file mode 100644
index 0000000..55e4fdf
--- /dev/null
+++ b/server/job/disco/retire.go
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package disco
+
+import (
+ "context"
+ "fmt"
+ "time"
+
+ "github.com/apache/servicecomb-service-center/datasource"
+ "github.com/apache/servicecomb-service-center/pkg/log"
+ "github.com/apache/servicecomb-service-center/server/config"
+ discosvc "github.com/apache/servicecomb-service-center/server/service/disco"
+ "github.com/go-chassis/foundation/gopool"
+)
+
+const (
+ defaultRetireMicroserviceInterval = 12 * time.Hour
+ defaultReserveVersionCount = 3
+)
+
+func init() {
+ startRetireServiceJob()
+}
+
+func startRetireServiceJob() {
+ disable := config.GetBool("registry.service.retire.disable", false)
+ if disable {
+ return
+ }
+
+ localPlan := &datasource.RetirePlan{
+ Interval: config.GetDuration("registry.service.retire.interval", defaultRetireMicroserviceInterval),
+ Reserve: config.GetInt("registry.service.retire.reserve", defaultReserveVersionCount),
+ }
+
+ log.Info(fmt.Sprintf("start retire microservice job, plan is %v", localPlan))
+ gopool.Go(func(ctx context.Context) {
+ tick := time.NewTicker(localPlan.Interval)
+ defer tick.Stop()
+ for {
+ select {
+ case <-ctx.Done():
+ return
+ case <-tick.C:
+ err := discosvc.RetireService(ctx, localPlan)
+ if err != nil {
+ log.Error("retire microservice failed", err)
+ }
+ }
+ }
+ })
+}
diff --git a/datasource/etcd/etcd_suite_test.go b/server/service/disco/retire.go
similarity index 59%
copy from datasource/etcd/etcd_suite_test.go
copy to server/service/disco/retire.go
index b0ad473..9b64b26 100644
--- a/datasource/etcd/etcd_suite_test.go
+++ b/server/service/disco/retire.go
@@ -14,34 +14,15 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package etcd_test
-// initialize
+package disco
+
import (
"context"
- _ "github.com/apache/servicecomb-service-center/test"
-
"github.com/apache/servicecomb-service-center/datasource"
- . "github.com/onsi/ginkgo"
- "github.com/onsi/ginkgo/reporters"
-
- "testing"
- "time"
-
- . "github.com/onsi/gomega"
)
-var timeLimit = 2 * time.Second
-
-var _ = BeforeSuite(func() {
- //clear service created in last test
- time.Sleep(timeLimit)
- _ = datasource.GetSCManager().ClearNoInstanceServices(context.Background(), timeLimit)
-})
-
-func TestEtcd(t *testing.T) {
- RegisterFailHandler(Fail)
- junitReporter := reporters.NewJUnitReporter("etcd.junit.xml")
- RunSpecsWithDefaultAndCustomReporters(t, "etcd Suite", []Reporter{junitReporter})
+func RetireService(ctx context.Context, plan *datasource.RetirePlan) error {
+ return datasource.GetMetadataManager().RetireService(ctx, plan)
}
diff --git a/server/service/disco/retire_test.go b/server/service/disco/retire_test.go
new file mode 100644
index 0000000..ca778c9
--- /dev/null
+++ b/server/service/disco/retire_test.go
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package disco_test
+
+import (
+ "fmt"
+ "testing"
+
+ "github.com/apache/servicecomb-service-center/datasource"
+ discosvc "github.com/apache/servicecomb-service-center/server/service/disco"
+ "github.com/apache/servicecomb-service-center/test"
+ pb "github.com/go-chassis/cari/discovery"
+ "github.com/stretchr/testify/assert"
+)
+
+func TestRetireService(t *testing.T) {
+ if !test.IsETCD() {
+ return
+ }
+
+ const serviceIDPrefix = "TestRetireMicroservice"
+ ctx := getContext()
+
+ t.Run("normal case should return ok", func(t *testing.T) {
+ const count = 5
+ for i := 0; i < count; i++ {
+ idx := fmt.Sprintf("%d", i)
+ _, err := discosvc.RegisterService(ctx, &pb.CreateServiceRequest{
+ Service: &pb.MicroService{
+ ServiceId: serviceIDPrefix + idx,
+ ServiceName: serviceIDPrefix,
+ Version: "1.0." + idx,
+ },
+ })
+ assert.NoError(t, err)
+ }
+ defer func() {
+ for i := 0; i < count; i++ {
+ idx := fmt.Sprintf("%d", i)
+ discosvc.UnregisterService(ctx, &pb.DeleteServiceRequest{
+ ServiceId: serviceIDPrefix + idx,
+ })
+ }
+ }()
+
+ err := discosvc.RetireService(ctx, &datasource.RetirePlan{Interval: 0, Reserve: 1})
+ assert.NoError(t, err)
+
+ resp, err := datasource.GetMetadataManager().ListServiceDetail(ctx, &pb.GetServicesInfoRequest{
+ ServiceName: serviceIDPrefix,
+ })
+ assert.NoError(t, err)
+ assert.Equal(t, 1, len(resp.AllServicesDetail))
+ assert.Equal(t, serviceIDPrefix+"4", resp.AllServicesDetail[0].MicroService.ServiceId)
+ })
+}
diff --git a/test/test.go b/test/test.go
index ac4e73c..7f14d47 100644
--- a/test/test.go
+++ b/test/test.go
@@ -33,23 +33,29 @@ import (
)
func init() {
- t := archaius.Get("TEST_MODE")
- if t == nil {
- t = "etcd"
- }
+ var kind = "etcd"
archaius.Set("rbac.releaseLockAfter", "3s")
- if t == "etcd" {
+ if IsETCD() {
archaius.Set("registry.cache.mode", 0)
archaius.Set("discovery.kind", "etcd")
archaius.Set("registry.kind", "etcd")
} else {
archaius.Set("registry.heartbeat.kind", "checker")
+ kind = "mongo"
}
datasource.Init(datasource.Options{
Config: etcdadpt.Config{
- Kind: t.(string),
+ Kind: kind,
},
ReleaseAccountAfter: 3 * time.Second,
})
core.ServiceAPI = disco.AssembleResources()
}
+
+func IsETCD() bool {
+ t := archaius.Get("TEST_MODE")
+ if t == nil {
+ t = "etcd"
+ }
+ return t == "etcd"
+}