You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicecomb.apache.org by li...@apache.org on 2021/06/30 09:52:21 UTC

[servicecomb-service-center] branch master updated: Add mongo account lock impl (#1079)

This is an automated email from the ASF dual-hosted git repository.

littlecui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/servicecomb-service-center.git


The following commit(s) were added to refs/heads/master by this push:
     new 3879911  Add mongo account lock impl (#1079)
3879911 is described below

commit 387991111977fbba9f6f612cf7087f2cccede202
Author: little-cui <su...@qq.com>
AuthorDate: Wed Jun 30 17:52:12 2021 +0800

    Add mongo account lock impl (#1079)
    
    * Add mongo account lock impl
    
    * Fix golangci-lint check failure
---
 datasource/account.go                  |  21 +++---
 datasource/account_test.go             |  36 ++++++++++
 datasource/etcd/account_lock.go        |  13 ++--
 datasource/mongo/account_lock.go       |  72 ++++++++++++++-----
 datasource/mongo/client/model/types.go | 100 +++++++++++++-------------
 datasource/mongo/db.go                 | 128 +++++++++++++++++++++++++++++++++
 datasource/mongo/mongo.go              | 125 --------------------------------
 datasource/mongo/sd/dep_cache.go       |   4 +-
 datasource/mongo/sd/index_func.go      |  22 +++---
 datasource/mongo/sd/instance_cache.go  |   4 +-
 datasource/mongo/sd/rule_cache.go      |   4 +-
 datasource/mongo/sd/service_cache.go   |   4 +-
 datasource/mongo/util/db.go            |  18 +++++
 13 files changed, 322 insertions(+), 229 deletions(-)

diff --git a/datasource/account.go b/datasource/account.go
index 1c04cd7..623bd72 100644
--- a/datasource/account.go
+++ b/datasource/account.go
@@ -25,15 +25,16 @@ import (
 )
 
 var (
-	ErrAccountDuplicated   = errors.New("account is duplicated")
-	ErrAccountCanNotEdit   = errors.New("account can not be edited")
-	ErrDLockNotFound       = errors.New("dlock not found")
-	ErrCannotReleaseLock   = errors.New("can not release account")
-	ErrAccountLockNotExist = errors.New("account not exist")
-	ErrDeleteAccountFailed = errors.New("failed to delete account")
-	ErrQueryAccountFailed  = errors.New("failed to query account")
-	ErrAccountNotExist     = errors.New("account not exist")
-	ErrRoleBindingExist    = errors.New("role is bind to account")
+	ErrAccountDuplicated      = errors.New("account is duplicated")
+	ErrAccountCanNotEdit      = errors.New("account can not be edited")
+	ErrDLockNotFound          = errors.New("dlock not found")
+	ErrCannotReleaseLock      = errors.New("can not release account")
+	ErrAccountLockNotExist    = errors.New("account lock not exist")
+	ErrDeleteAccountFailed    = errors.New("failed to delete account")
+	ErrQueryAccountFailed     = errors.New("failed to query account")
+	ErrQueryAccountLockFailed = errors.New("failed to query account lock")
+	ErrAccountNotExist        = errors.New("account not exist")
+	ErrRoleBindingExist       = errors.New("role is bind to account")
 )
 
 const (
@@ -59,5 +60,5 @@ type AccountLockManager interface {
 type AccountLock struct {
 	Key       string `json:"key,omitempty"`
 	Status    string `json:"status,omitempty"`
-	ReleaseAt int64  `json:"releaseAt,omitempty"`
+	ReleaseAt int64  `json:"releaseAt,omitempty" bson:"release_at"`
 }
diff --git a/datasource/account_test.go b/datasource/account_test.go
index 84b9301..153e87d 100644
--- a/datasource/account_test.go
+++ b/datasource/account_test.go
@@ -22,6 +22,7 @@ package datasource_test
 import (
 	"context"
 	"testing"
+	"time"
 
 	"github.com/go-chassis/cari/rbac"
 
@@ -92,3 +93,38 @@ func TestAccount(t *testing.T) {
 		assert.NoError(t, err)
 	})
 }
+
+func TestAccountLock(t *testing.T) {
+	t.Run("ban account TestAccountLock, should return no error", func(t *testing.T) {
+		err := datasource.GetAccountLockManager().Ban(context.Background(), "TestAccountLock")
+		assert.NoError(t, err)
+
+		lock, err := datasource.GetAccountLockManager().GetLock(context.Background(), "TestAccountLock")
+		assert.NoError(t, err)
+		assert.Equal(t, datasource.StatusBanned, lock.Status)
+		assert.Less(t, time.Now().Unix(), lock.ReleaseAt)
+	})
+
+	t.Run("ban account TestAccountLock again, should refresh releaseAt", func(t *testing.T) {
+		lock1, err := datasource.GetAccountLockManager().GetLock(context.Background(), "TestAccountLock")
+		assert.NoError(t, err)
+		assert.Equal(t, datasource.StatusBanned, lock1.Status)
+
+		time.Sleep(time.Second)
+		err = datasource.GetAccountLockManager().Ban(context.Background(), "TestAccountLock")
+		assert.NoError(t, err)
+
+		lock2, err := datasource.GetAccountLockManager().GetLock(context.Background(), "TestAccountLock")
+		assert.NoError(t, err)
+		assert.Less(t, lock1.ReleaseAt, lock2.ReleaseAt)
+	})
+
+	t.Run("delete account lock, should return no error", func(t *testing.T) {
+		err := datasource.GetAccountLockManager().DeleteLock(context.Background(), "TestAccountLock")
+		assert.NoError(t, err)
+
+		lock, err := datasource.GetAccountLockManager().GetLock(context.Background(), "TestAccountLock")
+		assert.Equal(t, datasource.ErrAccountLockNotExist, err)
+		assert.Nil(t, lock)
+	})
+}
diff --git a/datasource/etcd/account_lock.go b/datasource/etcd/account_lock.go
index 24cfd5c..8a88449 100644
--- a/datasource/etcd/account_lock.go
+++ b/datasource/etcd/account_lock.go
@@ -43,23 +43,20 @@ func (al AccountLockManager) GetLock(ctx context.Context, key string) (*datasour
 	lock := &datasource.AccountLock{}
 	err = json.Unmarshal(resp.Kvs[0].Value, lock)
 	if err != nil {
-		log.Errorf(err, "format invalid")
+		log.Error(fmt.Sprintf("key %s format invalid", key), err)
 		return nil, err
 	}
 	return lock, nil
 }
 
 func (al AccountLockManager) DeleteLock(ctx context.Context, key string) error {
-	ok, err := client.Delete(ctx, key)
+	_, err := client.Delete(ctx, path.GenerateAccountLockKey(key))
 	if err != nil {
-		log.Errorf(err, "remove lock failed")
+		log.Error(fmt.Sprintf("remove lock %s failed", key), err)
 		return datasource.ErrCannotReleaseLock
 	}
-	if ok {
-		log.Info(fmt.Sprintf("%s is released", key))
-		return nil
-	}
-	return datasource.ErrCannotReleaseLock
+	log.Info(fmt.Sprintf("%s is released", key))
+	return nil
 }
 
 func NewAccountLockManager(ReleaseAfter time.Duration) datasource.AccountLockManager {
diff --git a/datasource/mongo/account_lock.go b/datasource/mongo/account_lock.go
index 7d21333..a373a9c 100644
--- a/datasource/mongo/account_lock.go
+++ b/datasource/mongo/account_lock.go
@@ -18,43 +18,77 @@ package mongo
 import (
 	"context"
 	"fmt"
-	"sync"
 	"time"
 
 	"github.com/apache/servicecomb-service-center/datasource"
+	"github.com/apache/servicecomb-service-center/datasource/mongo/client"
+	"github.com/apache/servicecomb-service-center/datasource/mongo/client/model"
+	mutil "github.com/apache/servicecomb-service-center/datasource/mongo/util"
 	"github.com/apache/servicecomb-service-center/pkg/log"
+	"go.mongodb.org/mongo-driver/mongo"
+	"go.mongodb.org/mongo-driver/mongo/options"
 )
 
 type AccountLockManager struct {
 	releaseAfter time.Duration
-	locks        sync.Map
 }
 
 func (al *AccountLockManager) GetLock(ctx context.Context, key string) (*datasource.AccountLock, error) {
-	l, ok := al.locks.Load(key)
-	if !ok {
-		log.Debug(fmt.Sprintf("%s is not locked", key))
-		return nil, datasource.ErrAccountLockNotExist
+	filter := mutil.NewFilter(mutil.AccountLockKey(key))
+	result, err := client.GetMongoClient().FindOne(ctx, model.CollectionAccountLock, filter)
+	if err != nil {
+		return nil, err
 	}
-	return l.(*datasource.AccountLock), nil
+	if err = result.Err(); err != nil {
+		if err == mongo.ErrNoDocuments {
+			return nil, datasource.ErrAccountLockNotExist
+		}
+		msg := fmt.Sprintf("failed to query account lock, key %s", key)
+		log.Error(msg, result.Err())
+		return nil, datasource.ErrQueryAccountLockFailed
+	}
+	var lock datasource.AccountLock
+	err = result.Decode(&lock)
+	if err != nil {
+		log.Error(fmt.Sprintf("failed to decode account lock %s", key), err)
+		return nil, err
+	}
+	return &lock, nil
 }
 
 func (al *AccountLockManager) DeleteLock(ctx context.Context, key string) error {
-	al.locks.Delete(key)
-	log.Warn(fmt.Sprintf("%s is released", key))
+	filter := mutil.NewFilter(mutil.AccountLockKey(key))
+	_, err := client.GetMongoClient().Delete(ctx, model.CollectionAccountLock, filter)
+	if err != nil {
+		log.Error(fmt.Sprintf("remove lock %s failed", key), err)
+		return datasource.ErrCannotReleaseLock
+	}
+	log.Info(fmt.Sprintf("%s is released", key))
 	return nil
 }
 
-func NewAccountLockManager(ReleaseAfter time.Duration) datasource.AccountLockManager {
-	return &AccountLockManager{releaseAfter: ReleaseAfter}
-}
-
 func (al *AccountLockManager) Ban(ctx context.Context, key string) error {
-	l := &datasource.AccountLock{}
-	l.Key = key
-	l.Status = datasource.StatusBanned
-	l.ReleaseAt = time.Now().Add(al.releaseAfter).Unix()
-	al.locks.Store(key, l)
-	log.Warn(fmt.Sprintf("%s is locked, release at %d", key, l.ReleaseAt))
+	releaseAt := time.Now().Add(al.releaseAfter).Unix()
+	filter := mutil.NewFilter(mutil.AccountLockKey(key))
+	updateFilter := mutil.NewFilter(mutil.Set(mutil.NewFilter(
+		mutil.AccountLockKey(key),
+		mutil.AccountLockStatus(datasource.StatusBanned),
+		mutil.AccountLockReleaseAt(releaseAt),
+	)))
+	result, err := client.GetMongoClient().FindOneAndUpdate(ctx, model.CollectionAccountLock, filter, updateFilter,
+		options.FindOneAndUpdate().SetUpsert(true))
+	if err != nil {
+		log.Error(fmt.Sprintf("can not save account lock %s", key), err)
+		return err
+	}
+	if result.Err() != nil && result.Err() != mongo.ErrNoDocuments {
+		log.Error(fmt.Sprintf("can not save account lock %s", key), result.Err())
+		return result.Err()
+	}
+	log.Info(fmt.Sprintf("%s is locked, release at %d", key, releaseAt))
 	return nil
 }
+
+func NewAccountLockManager(ReleaseAfter time.Duration) datasource.AccountLockManager {
+	return &AccountLockManager{releaseAfter: ReleaseAfter}
+}
diff --git a/datasource/mongo/client/model/types.go b/datasource/mongo/client/model/types.go
index bcaa11a..39ea6be 100644
--- a/datasource/mongo/client/model/types.go
+++ b/datasource/mongo/client/model/types.go
@@ -24,57 +24,61 @@ import (
 )
 
 const (
-	CollectionAccount  = "account"
-	CollectionService  = "service"
-	CollectionSchema   = "schema"
-	CollectionRule     = "rule"
-	CollectionInstance = "instance"
-	CollectionDep      = "dependency"
-	CollectionRole     = "role"
-	CollectionDomain   = "domain"
-	CollectionProject  = "project"
+	CollectionAccount     = "account"
+	CollectionAccountLock = "account_lock"
+	CollectionService     = "service"
+	CollectionSchema      = "schema"
+	CollectionRule        = "rule"
+	CollectionInstance    = "instance"
+	CollectionDep         = "dependency"
+	CollectionRole        = "role"
+	CollectionDomain      = "domain"
+	CollectionProject     = "project"
 )
 
 const (
-	ColumnDomain              = "domain"
-	ColumnProject             = "project"
-	ColumnTag                 = "tags"
-	ColumnSchemaID            = "schema_id"
-	ColumnServiceID           = "service_id"
-	ColumnRuleID              = "rule_id"
-	ColumnService             = "service"
-	ColumnProperty            = "properties"
-	ColumnModTime             = "mod_timestamp"
-	ColumnEnv                 = "env"
-	ColumnAppID               = "app"
-	ColumnServiceName         = "service_name"
-	ColumnAlias               = "alias"
-	ColumnVersion             = "version"
-	ColumnSchemas             = "schemas"
-	ColumnAttribute           = "attribute"
-	ColumnPattern             = "pattern"
-	ColumnDescription         = "description"
-	ColumnRuleType            = "rule_type"
-	ColumnSchema              = "schema"
-	ColumnSchemaSummary       = "schema_summary"
-	ColumnDep                 = "dep"
-	ColumnDependency          = "dependency"
-	ColumnRule                = "rule"
-	ColumnInstance            = "instance"
-	ColumnInstanceID          = "instance_id"
-	ColumnTenant              = "tenant"
-	ColumnServiceType         = "type"
-	ColumnServiceKey          = "service_key"
-	ColumnID                  = "id"
-	ColumnAccountName         = "name"
-	ColumnRoleName            = "name"
-	ColumnPerms               = "perms"
-	ColumnPassword            = "password"
-	ColumnRoles               = "roles"
-	ColumnTokenExpirationTime = "token_expiration_time"
-	ColumnCurrentPassword     = "current_password"
-	ColumnStatus              = "status"
-	ColumnRefreshTime         = "refresh_time"
+	ColumnDomain               = "domain"
+	ColumnProject              = "project"
+	ColumnTag                  = "tags"
+	ColumnSchemaID             = "schema_id"
+	ColumnServiceID            = "service_id"
+	ColumnRuleID               = "rule_id"
+	ColumnService              = "service"
+	ColumnProperty             = "properties"
+	ColumnModTime              = "mod_timestamp"
+	ColumnEnv                  = "env"
+	ColumnAppID                = "app"
+	ColumnServiceName          = "service_name"
+	ColumnAlias                = "alias"
+	ColumnVersion              = "version"
+	ColumnSchemas              = "schemas"
+	ColumnAttribute            = "attribute"
+	ColumnPattern              = "pattern"
+	ColumnDescription          = "description"
+	ColumnRuleType             = "rule_type"
+	ColumnSchema               = "schema"
+	ColumnSchemaSummary        = "schema_summary"
+	ColumnDep                  = "dep"
+	ColumnDependency           = "dependency"
+	ColumnRule                 = "rule"
+	ColumnInstance             = "instance"
+	ColumnInstanceID           = "instance_id"
+	ColumnTenant               = "tenant"
+	ColumnServiceType          = "type"
+	ColumnServiceKey           = "service_key"
+	ColumnID                   = "id"
+	ColumnAccountName          = "name"
+	ColumnRoleName             = "name"
+	ColumnPerms                = "perms"
+	ColumnPassword             = "password"
+	ColumnRoles                = "roles"
+	ColumnTokenExpirationTime  = "token_expiration_time"
+	ColumnCurrentPassword      = "current_password"
+	ColumnStatus               = "status"
+	ColumnRefreshTime          = "refresh_time"
+	ColumnAccountLockKey       = "key"
+	ColumnAccountLockStatus    = "status"
+	ColumnAccountLockReleaseAt = "release_at"
 )
 
 type Service struct {
diff --git a/datasource/mongo/db.go b/datasource/mongo/db.go
new file mode 100644
index 0000000..0d0fdb6
--- /dev/null
+++ b/datasource/mongo/db.go
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package mongo
+
+import (
+	"context"
+	"fmt"
+
+	"github.com/apache/servicecomb-service-center/datasource/mongo/client"
+	"github.com/apache/servicecomb-service-center/datasource/mongo/client/model"
+	mutil "github.com/apache/servicecomb-service-center/datasource/mongo/util"
+	"github.com/apache/servicecomb-service-center/pkg/log"
+	"github.com/apache/servicecomb-service-center/pkg/util"
+	"go.mongodb.org/mongo-driver/mongo"
+	"go.mongodb.org/mongo-driver/mongo/options"
+)
+
+func EnsureDB() {
+	EnsureService()
+	EnsureInstance()
+	EnsureRule()
+	EnsureSchema()
+	EnsureDep()
+	EnsureAccountLock()
+}
+
+func EnsureService() {
+	serviceIDIndex := mutil.BuildIndexDoc(
+		model.ColumnDomain,
+		model.ColumnProject,
+		mutil.ConnectWithDot([]string{model.ColumnService, model.ColumnServiceID}))
+	serviceIDIndex.Options = options.Index().SetUnique(true)
+
+	serviceIndex := mutil.BuildIndexDoc(
+		mutil.ConnectWithDot([]string{model.ColumnService, model.ColumnAppID}),
+		mutil.ConnectWithDot([]string{model.ColumnService, model.ColumnServiceName}),
+		mutil.ConnectWithDot([]string{model.ColumnService, model.ColumnEnv}),
+		mutil.ConnectWithDot([]string{model.ColumnService, model.ColumnVersion}),
+		model.ColumnDomain,
+		model.ColumnProject)
+	serviceIndex.Options = options.Index().SetUnique(true)
+
+	var serviceIndexes []mongo.IndexModel
+	serviceIndexes = append(serviceIndexes, serviceIDIndex, serviceIndex)
+
+	EnsureCollection(model.CollectionService, serviceIndexes)
+}
+
+func EnsureInstance() {
+	instanceIndex := mutil.BuildIndexDoc(model.ColumnRefreshTime)
+	instanceIndex.Options = options.Index().SetExpireAfterSeconds(defaultExpireTime)
+
+	instanceServiceIndex := mutil.BuildIndexDoc(mutil.ConnectWithDot([]string{model.ColumnInstance, model.ColumnServiceID}))
+
+	var instanceIndexes []mongo.IndexModel
+	instanceIndexes = append(instanceIndexes, instanceIndex, instanceServiceIndex)
+
+	EnsureCollection(model.CollectionInstance, instanceIndexes)
+}
+
+func EnsureSchema() {
+	EnsureCollection(model.CollectionSchema, []mongo.IndexModel{mutil.BuildIndexDoc(
+		model.ColumnDomain,
+		model.ColumnProject,
+		model.ColumnServiceID)})
+}
+
+func EnsureRule() {
+	EnsureCollection(model.CollectionRule, []mongo.IndexModel{mutil.BuildIndexDoc(
+		model.ColumnDomain,
+		model.ColumnProject,
+		model.ColumnServiceID)})
+}
+
+func EnsureDep() {
+	EnsureCollection(model.CollectionDep, []mongo.IndexModel{mutil.BuildIndexDoc(
+		model.ColumnDomain,
+		model.ColumnProject,
+		model.ColumnServiceKey)})
+}
+
+func EnsureAccountLock() {
+	EnsureCollection(model.CollectionAccountLock, []mongo.IndexModel{
+		mutil.BuildIndexDoc(model.ColumnAccountLockKey)})
+}
+
+func EnsureCollection(col string, indexes []mongo.IndexModel) {
+	err := client.GetMongoClient().GetDB().CreateCollection(context.Background(), col, options.CreateCollection().SetValidator(nil))
+	wrapCreateCollectionError(err)
+
+	err = client.GetMongoClient().CreateIndexes(context.Background(), col, indexes)
+	wrapCreateIndexesError(err)
+}
+
+func wrapCreateCollectionError(err error) {
+	if err != nil {
+		if client.IsCollectionsExist(err) {
+			log.Warn(fmt.Sprintf("collection already exist, err type: %s", util.Reflect(err).FullName))
+			return
+		}
+		log.Fatal(fmt.Sprintf("failed to create collection with validation, err type: %s", util.Reflect(err).FullName), err)
+	}
+}
+
+func wrapCreateIndexesError(err error) {
+	if err != nil {
+		if client.IsDuplicateKey(err) {
+			log.Warn(fmt.Sprintf("indexes already exist, err type: %s", util.Reflect(err).FullName))
+			return
+		}
+		log.Fatal(fmt.Sprintf("failed to create indexes, err type: %s", util.Reflect(err).FullName), err)
+	}
+}
diff --git a/datasource/mongo/mongo.go b/datasource/mongo/mongo.go
index 5356e77..311eb19 100644
--- a/datasource/mongo/mongo.go
+++ b/datasource/mongo/mongo.go
@@ -18,21 +18,15 @@
 package mongo
 
 import (
-	"context"
 	"fmt"
 
 	"github.com/apache/servicecomb-service-center/datasource"
 	"github.com/apache/servicecomb-service-center/datasource/mongo/client"
-	"github.com/apache/servicecomb-service-center/datasource/mongo/client/model"
 	"github.com/apache/servicecomb-service-center/datasource/mongo/heartbeat"
 	"github.com/apache/servicecomb-service-center/datasource/mongo/sd"
-	mutil "github.com/apache/servicecomb-service-center/datasource/mongo/util"
 	"github.com/apache/servicecomb-service-center/pkg/log"
-	"github.com/apache/servicecomb-service-center/pkg/util"
 	"github.com/apache/servicecomb-service-center/server/config"
 	"github.com/go-chassis/go-chassis/v2/storage"
-	"go.mongodb.org/mongo-driver/mongo"
-	"go.mongodb.org/mongo-driver/mongo/options"
 )
 
 const defaultExpireTime = 300
@@ -152,125 +146,6 @@ func (ds *DataSource) initClient() error {
 	}
 }
 
-func EnsureDB() {
-	EnsureService()
-	EnsureInstance()
-	EnsureRule()
-	EnsureSchema()
-	EnsureDep()
-}
-
-func EnsureService() {
-	err := client.GetMongoClient().GetDB().CreateCollection(context.Background(), model.CollectionService, options.CreateCollection().SetValidator(nil))
-	wrapCreateCollectionError(err)
-
-	serviceIDIndex := mutil.BuildIndexDoc(
-		model.ColumnDomain,
-		model.ColumnProject,
-		mutil.ConnectWithDot([]string{model.ColumnService, model.ColumnServiceID}))
-	serviceIDIndex.Options = options.Index().SetUnique(true)
-
-	serviceIndex := mutil.BuildIndexDoc(
-		mutil.ConnectWithDot([]string{model.ColumnService, model.ColumnAppID}),
-		mutil.ConnectWithDot([]string{model.ColumnService, model.ColumnServiceName}),
-		mutil.ConnectWithDot([]string{model.ColumnService, model.ColumnEnv}),
-		mutil.ConnectWithDot([]string{model.ColumnService, model.ColumnVersion}),
-		model.ColumnDomain,
-		model.ColumnProject)
-	serviceIndex.Options = options.Index().SetUnique(true)
-
-	var serviceIndexs []mongo.IndexModel
-	serviceIndexs = append(serviceIndexs, serviceIDIndex, serviceIndex)
-
-	err = client.GetMongoClient().CreateIndexes(context.Background(), model.CollectionService, serviceIndexs)
-	wrapCreateIndexesError(err)
-}
-
-func EnsureInstance() {
-	err := client.GetMongoClient().GetDB().CreateCollection(context.Background(), model.CollectionInstance, options.CreateCollection().SetValidator(nil))
-	wrapCreateCollectionError(err)
-
-	instanceIndex := mutil.BuildIndexDoc(model.ColumnRefreshTime)
-	instanceIndex.Options = options.Index().SetExpireAfterSeconds(defaultExpireTime)
-
-	instanceServiceIndex := mutil.BuildIndexDoc(mutil.ConnectWithDot([]string{model.ColumnInstance, model.ColumnServiceID}))
-
-	var instanceIndexs []mongo.IndexModel
-	instanceIndexs = append(instanceIndexs, instanceIndex, instanceServiceIndex)
-
-	err = client.GetMongoClient().CreateIndexes(context.Background(), model.CollectionInstance, instanceIndexs)
-	wrapCreateIndexesError(err)
-}
-
-func EnsureSchema() {
-	err := client.GetMongoClient().GetDB().CreateCollection(context.Background(), model.CollectionSchema, options.CreateCollection().SetValidator(nil))
-	wrapCreateCollectionError(err)
-
-	schemaServiceIndex := mutil.BuildIndexDoc(
-		model.ColumnDomain,
-		model.ColumnProject,
-		model.ColumnServiceID)
-
-	var schemaIndexs []mongo.IndexModel
-	schemaIndexs = append(schemaIndexs, schemaServiceIndex)
-
-	err = client.GetMongoClient().CreateIndexes(context.Background(), model.CollectionSchema, schemaIndexs)
-	wrapCreateIndexesError(err)
-}
-
-func EnsureRule() {
-	err := client.GetMongoClient().GetDB().CreateCollection(context.Background(), model.CollectionRule, options.CreateCollection().SetValidator(nil))
-	wrapCreateCollectionError(err)
-
-	ruleServiceIndex := mutil.BuildIndexDoc(
-		model.ColumnDomain,
-		model.ColumnProject,
-		model.ColumnServiceID)
-
-	var ruleIndexs []mongo.IndexModel
-	ruleIndexs = append(ruleIndexs, ruleServiceIndex)
-
-	err = client.GetMongoClient().CreateIndexes(context.Background(), model.CollectionRule, ruleIndexs)
-	wrapCreateIndexesError(err)
-}
-
-func EnsureDep() {
-	err := client.GetMongoClient().GetDB().CreateCollection(context.Background(), model.CollectionDep, options.CreateCollection().SetValidator(nil))
-	wrapCreateCollectionError(err)
-
-	depServiceIndex := mutil.BuildIndexDoc(
-		model.ColumnDomain,
-		model.ColumnProject,
-		model.ColumnServiceKey)
-
-	var depIndexs []mongo.IndexModel
-	depIndexs = append(depIndexs, depServiceIndex)
-
-	err = client.GetMongoClient().CreateIndexes(context.Background(), model.CollectionDep, depIndexs)
-	if err != nil {
-		log.Fatal("failed to create dep collection indexs", err)
-		return
-	}
-}
-
-func wrapCreateCollectionError(err error) {
-	if err != nil {
-		if client.IsCollectionsExist(err) {
-			return
-		}
-		log.Fatal(fmt.Sprintf("failed to create collection with validation, err type: %s", util.Reflect(err).FullName), err)
-	}
-}
-
-func wrapCreateIndexesError(err error) {
-	if err != nil {
-		if client.IsDuplicateKey(err) {
-			return
-		}
-		log.Fatal(fmt.Sprintf("failed to create indexes, err type: %s", util.Reflect(err).FullName), err)
-	}
-}
-
 func (ds *DataSource) initStore() {
 	if !config.GetRegistry().EnableCache {
 		log.Debug("cache is disabled")
diff --git a/datasource/mongo/sd/dep_cache.go b/datasource/mongo/sd/dep_cache.go
index a231b52..33e4d1f 100644
--- a/datasource/mongo/sd/dep_cache.go
+++ b/datasource/mongo/sd/dep_cache.go
@@ -125,7 +125,7 @@ func (s *depStore) ProcessUpdate(event MongoEvent) {
 	}
 	// set the document data.
 	s.concurrentMap.Set(event.DocumentID, event.Value)
-	for _, index := range DepIndexCols.GetIndexs(dep) {
+	for _, index := range DepIndexCols.GetIndexes(dep) {
 		// set the index sets.
 		s.indexSets.Put(index, event.DocumentID)
 	}
@@ -141,7 +141,7 @@ func (s *depStore) ProcessDelete(event MongoEvent) {
 		return
 	}
 	s.concurrentMap.Remove(event.DocumentID)
-	for _, index := range DepIndexCols.GetIndexs(dep) {
+	for _, index := range DepIndexCols.GetIndexes(dep) {
 		s.indexSets.Delete(index, event.DocumentID)
 	}
 }
diff --git a/datasource/mongo/sd/index_func.go b/datasource/mongo/sd/index_func.go
index 8399653..b6cbe7e 100644
--- a/datasource/mongo/sd/index_func.go
+++ b/datasource/mongo/sd/index_func.go
@@ -17,26 +17,26 @@
 
 package sd
 
-type indexFunc func(interface{}) string
+type IndexFunc func(interface{}) string
 
-type indexCols struct {
-	indexFuncs []indexFunc
+type IndexCols struct {
+	indexFuncs []IndexFunc
 }
 
-var DepIndexCols *indexCols
-var InstIndexCols *indexCols
-var ServiceIndexCols *indexCols
-var RuleIndexCols *indexCols
+var DepIndexCols *IndexCols
+var InstIndexCols *IndexCols
+var ServiceIndexCols *IndexCols
+var RuleIndexCols *IndexCols
 
-func NewIndexCols() *indexCols {
-	return &indexCols{indexFuncs: make([]indexFunc, 0)}
+func NewIndexCols() *IndexCols {
+	return &IndexCols{indexFuncs: make([]IndexFunc, 0)}
 }
 
-func (i *indexCols) AddIndexFunc(f indexFunc) {
+func (i *IndexCols) AddIndexFunc(f IndexFunc) {
 	i.indexFuncs = append(i.indexFuncs, f)
 }
 
-func (i *indexCols) GetIndexs(data interface{}) (res []string) {
+func (i *IndexCols) GetIndexes(data interface{}) (res []string) {
 	for _, f := range i.indexFuncs {
 		index := f(data)
 		if len(index) != 0 {
diff --git a/datasource/mongo/sd/instance_cache.go b/datasource/mongo/sd/instance_cache.go
index 8d58594..8ff80bb 100644
--- a/datasource/mongo/sd/instance_cache.go
+++ b/datasource/mongo/sd/instance_cache.go
@@ -124,7 +124,7 @@ func (s *instanceStore) ProcessUpdate(event MongoEvent) {
 	}
 	// set the document data.
 	s.concurrentMap.Set(event.DocumentID, event.Value)
-	for _, index := range InstIndexCols.GetIndexs(instData) {
+	for _, index := range InstIndexCols.GetIndexes(instData) {
 		// set the index sets.
 		s.indexSets.Put(index, event.DocumentID)
 	}
@@ -140,7 +140,7 @@ func (s *instanceStore) ProcessDelete(event MongoEvent) {
 		return
 	}
 	s.concurrentMap.Remove(event.DocumentID)
-	for _, index := range InstIndexCols.GetIndexs(instanceData) {
+	for _, index := range InstIndexCols.GetIndexes(instanceData) {
 		s.indexSets.Delete(index, event.DocumentID)
 	}
 }
diff --git a/datasource/mongo/sd/rule_cache.go b/datasource/mongo/sd/rule_cache.go
index 7372c60..8e669c6 100644
--- a/datasource/mongo/sd/rule_cache.go
+++ b/datasource/mongo/sd/rule_cache.go
@@ -124,7 +124,7 @@ func (s *ruleStore) ProcessUpdate(event MongoEvent) {
 	}
 	// set the document data.
 	s.concurrentMap.Set(event.DocumentID, event.Value)
-	for _, index := range RuleIndexCols.GetIndexs(ruleData) {
+	for _, index := range RuleIndexCols.GetIndexes(ruleData) {
 		// set the index sets.
 		s.indexSets.Put(index, event.DocumentID)
 	}
@@ -140,7 +140,7 @@ func (s *ruleStore) ProcessDelete(event MongoEvent) {
 		return
 	}
 	s.concurrentMap.Remove(event.DocumentID)
-	for _, index := range RuleIndexCols.GetIndexs(ruleData) {
+	for _, index := range RuleIndexCols.GetIndexes(ruleData) {
 		s.indexSets.Delete(index, event.DocumentID)
 	}
 }
diff --git a/datasource/mongo/sd/service_cache.go b/datasource/mongo/sd/service_cache.go
index 01a11fc..218a56d 100644
--- a/datasource/mongo/sd/service_cache.go
+++ b/datasource/mongo/sd/service_cache.go
@@ -127,7 +127,7 @@ func (s *serviceStore) ProcessUpdate(event MongoEvent) {
 	}
 	// set the document data.
 	s.concurrentMap.Set(event.DocumentID, event.Value)
-	for _, index := range ServiceIndexCols.GetIndexs(serviceData) {
+	for _, index := range ServiceIndexCols.GetIndexes(serviceData) {
 		// set the index sets.
 		s.indexSets.Put(index, event.DocumentID)
 	}
@@ -143,7 +143,7 @@ func (s *serviceStore) ProcessDelete(event MongoEvent) {
 		return
 	}
 	s.concurrentMap.Remove(event.DocumentID)
-	for _, index := range ServiceIndexCols.GetIndexs(serviceMongo) {
+	for _, index := range ServiceIndexCols.GetIndexes(serviceMongo) {
 		s.indexSets.Delete(index, event.DocumentID)
 	}
 }
diff --git a/datasource/mongo/util/db.go b/datasource/mongo/util/db.go
index 7c97c0e..f58cc99 100644
--- a/datasource/mongo/util/db.go
+++ b/datasource/mongo/util/db.go
@@ -97,6 +97,24 @@ func Perms(perms []*rbac.Permission) Option {
 	}
 }
 
+func AccountLockKey(key interface{}) Option {
+	return func(filter bson.M) {
+		filter[model.ColumnAccountLockKey] = key
+	}
+}
+
+func AccountLockStatus(status interface{}) Option {
+	return func(filter bson.M) {
+		filter[model.ColumnAccountLockStatus] = status
+	}
+}
+
+func AccountLockReleaseAt(releaseAt interface{}) Option {
+	return func(filter bson.M) {
+		filter[model.ColumnAccountLockReleaseAt] = releaseAt
+	}
+}
+
 func In(data interface{}) Option {
 	return func(filter bson.M) {
 		filter["$in"] = data