You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicecomb.apache.org by ti...@apache.org on 2021/12/17 06:59:41 UTC

[servicecomb-service-center] branch master updated: [feat] add mongo implementation of datasource in eventbase (#1178)

This is an automated email from the ASF dual-hosted git repository.

tianxiaoliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/servicecomb-service-center.git


The following commit(s) were added to refs/heads/master by this push:
     new c9e7294  [feat] add mongo implementation of datasource in eventbase (#1178)
c9e7294 is described below

commit c9e729496827bdfbfa96fc4ebb04cf6eda1c6176
Author: robotljw <79...@qq.com>
AuthorDate: Fri Dec 17 14:58:36 2021 +0800

    [feat] add mongo implementation of datasource in eventbase (#1178)
---
 eventbase/datasource/mongo/client/client.go        | 217 +++++++++++++++++++++
 eventbase/datasource/mongo/mongo.go                | 142 ++++++++++++++
 eventbase/datasource/mongo/task/task_dao.go        | 124 ++++++++++++
 eventbase/datasource/mongo/task/task_dao_test.go   | 143 ++++++++++++++
 .../datasource/mongo/tombstone/tombstone_dao.go    | 124 ++++++++++++
 .../mongo/tombstone/tombstone_dao_test.go          | 110 +++++++++++
 eventbase/datasource/mongo/types.go                |  34 ++++
 eventbase/go.mod                                   |  11 ++
 eventbase/go.sum                                   |  72 +++++++
 eventbase/test/test.go                             |   6 +-
 scripts/ut_test_in_docker.sh                       |   1 +
 11 files changed, 982 insertions(+), 2 deletions(-)

diff --git a/eventbase/datasource/mongo/client/client.go b/eventbase/datasource/mongo/client/client.go
new file mode 100644
index 0000000..dea67d7
--- /dev/null
+++ b/eventbase/datasource/mongo/client/client.go
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package client
+
+import (
+	"context"
+	"crypto/tls"
+	"crypto/x509"
+	"errors"
+	"fmt"
+	"io/ioutil"
+	"time"
+
+	"github.com/go-chassis/cari/db"
+	"github.com/go-chassis/foundation/gopool"
+	"github.com/go-chassis/openlog"
+	"go.mongodb.org/mongo-driver/mongo"
+	"go.mongodb.org/mongo-driver/mongo/options"
+
+	dmongo "servicecomb-service-center/eventbase/datasource/mongo"
+)
+
+const (
+	MongoCheckDelay     = 2 * time.Second
+	HeathChekRetryTimes = 3
+)
+
+var (
+	ErrOpenDbFailed  = errors.New("open db failed")
+	ErrRootCAMissing = errors.New("rootCAFile is empty in config file")
+)
+
+var client *MongoClient
+
+type MongoClient struct {
+	client *mongo.Client
+	db     *mongo.Database
+	config *db.Config
+
+	err       chan error
+	ready     chan struct{}
+	goroutine *gopool.Pool
+}
+
+func NewMongoClient(config *db.Config) {
+	inst := &MongoClient{}
+	if err := inst.Initialize(config); err != nil {
+		openlog.Error("failed to init mongodb" + err.Error())
+		inst.err <- err
+	}
+	client = inst
+}
+
+func (mc *MongoClient) Err() <-chan error {
+	return mc.err
+}
+
+func (mc *MongoClient) Ready() <-chan struct{} {
+	return mc.ready
+}
+
+func (mc *MongoClient) Close() {
+	if mc.client != nil {
+		if err := mc.client.Disconnect(context.TODO()); err != nil {
+			openlog.Error("[close mongo client] failed disconnect the mongo client" + err.Error())
+		}
+	}
+}
+
+func (mc *MongoClient) Initialize(config *db.Config) (err error) {
+	mc.err = make(chan error, 1)
+	mc.ready = make(chan struct{})
+	mc.goroutine = gopool.New()
+	mc.config = config
+	err = mc.newClient(context.Background())
+	if err != nil {
+		return
+	}
+	mc.startHealthCheck()
+	close(mc.ready)
+	return nil
+}
+
+func (mc *MongoClient) newClient(ctx context.Context) (err error) {
+	clientOptions := []*options.ClientOptions{options.Client().ApplyURI(mc.config.URI)}
+	clientOptions = append(clientOptions, options.Client().SetMaxPoolSize(uint64(mc.config.PoolSize)))
+	if mc.config.SSLEnabled {
+		if mc.config.RootCA == "" {
+			err = ErrRootCAMissing
+			return
+		}
+		pool := x509.NewCertPool()
+		caCert, err := ioutil.ReadFile(mc.config.RootCA)
+		if err != nil {
+			err = fmt.Errorf("read ca cert file %s failed", mc.config.RootCA)
+			openlog.Error("ca cert :" + err.Error())
+			return err
+		}
+		pool.AppendCertsFromPEM(caCert)
+		clientCerts := make([]tls.Certificate, 0)
+		if mc.config.CertFile != "" && mc.config.KeyFile != "" {
+			cert, err := tls.LoadX509KeyPair(mc.config.CertFile, mc.config.KeyFile)
+			if err != nil {
+				openlog.Error("load X509 keyPair failed: " + err.Error())
+				return err
+			}
+			clientCerts = append(clientCerts, cert)
+		}
+		tc := &tls.Config{
+			RootCAs:            pool,
+			InsecureSkipVerify: !mc.config.VerifyPeer,
+			Certificates:       clientCerts,
+		}
+		clientOptions = append(clientOptions, options.Client().SetTLSConfig(tc))
+		openlog.Info("enabled ssl communication to mongodb")
+	}
+	mc.client, err = mongo.Connect(ctx, clientOptions...)
+	if err != nil {
+		openlog.Error("failed to connect to mongo" + err.Error())
+		if derr := mc.client.Disconnect(ctx); derr != nil {
+			openlog.Error("[init mongo client] failed to disconnect mongo clients" + err.Error())
+		}
+		return
+	}
+	mc.db = mc.client.Database(dmongo.DBName)
+	if mc.db == nil {
+		return ErrOpenDbFailed
+	}
+	return nil
+}
+
+func (mc *MongoClient) startHealthCheck() {
+	mc.goroutine.Do(mc.HealthCheck)
+}
+
+func (mc *MongoClient) HealthCheck(ctx context.Context) {
+	for {
+		select {
+		case <-ctx.Done():
+			mc.Close()
+			return
+		case <-time.After(MongoCheckDelay):
+			for i := 0; i < HeathChekRetryTimes; i++ {
+				err := mc.client.Ping(context.Background(), nil)
+				if err == nil {
+					break
+				}
+				openlog.Error(fmt.Sprintf("retry to connect to mongodb %s after %s", mc.config.URI, MongoCheckDelay) + err.Error())
+				select {
+				case <-ctx.Done():
+					mc.Close()
+					return
+				case <-time.After(MongoCheckDelay):
+				}
+			}
+		}
+	}
+}
+
+func GetMongoClient() *MongoClient {
+	return client
+}
+
+// ExecTxn execute a transaction command
+// want to abort transaction, return error in cmd fn impl, otherwise it will commit transaction
+func (mc *MongoClient) ExecTxn(ctx context.Context, cmd func(sessionContext mongo.SessionContext) error) error {
+	session, err := mc.client.StartSession()
+	if err != nil {
+		return err
+	}
+	if err = session.StartTransaction(); err != nil {
+		return err
+	}
+	defer session.EndSession(ctx)
+	if err = mongo.WithSession(ctx, session, func(sc mongo.SessionContext) error {
+		if err = cmd(sc); err != nil {
+			if err = session.AbortTransaction(sc); err != nil {
+				return err
+			}
+		} else {
+			if err = session.CommitTransaction(sc); err != nil {
+				return err
+			}
+		}
+		return nil
+	}); err != nil {
+		return err
+	}
+	return nil
+}
+
+func (mc *MongoClient) GetDB() *mongo.Database {
+	return mc.db
+}
+
+func (mc *MongoClient) CreateIndexes(ctx context.Context, Table string, indexes []mongo.IndexModel) error {
+	_, err := mc.db.Collection(Table).Indexes().CreateMany(ctx, indexes)
+	if err != nil {
+		return err
+	}
+	return nil
+}
diff --git a/eventbase/datasource/mongo/mongo.go b/eventbase/datasource/mongo/mongo.go
new file mode 100644
index 0000000..d156a17
--- /dev/null
+++ b/eventbase/datasource/mongo/mongo.go
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package mongo
+
+import (
+	"strings"
+
+	"github.com/go-chassis/cari/db"
+	"github.com/go-chassis/openlog"
+	"go.mongodb.org/mongo-driver/bson"
+	"gopkg.in/mgo.v2"
+
+	"servicecomb-service-center/eventbase/datasource"
+	"servicecomb-service-center/eventbase/datasource/mongo/client"
+	"servicecomb-service-center/eventbase/datasource/mongo/task"
+	"servicecomb-service-center/eventbase/datasource/mongo/tombstone"
+)
+
+type Datasource struct {
+	taskDao   datasource.TaskDao
+	tombstone datasource.TombstoneDao
+}
+
+func (d *Datasource) TaskDao() datasource.TaskDao {
+	return d.taskDao
+}
+
+func (d *Datasource) TombstoneDao() datasource.TombstoneDao {
+	return d.tombstone
+}
+
+func NewDatasource(config *db.Config) (datasource.DataSource, error) {
+	inst := &Datasource{}
+	inst.taskDao = &task.Dao{}
+	inst.tombstone = &tombstone.Dao{}
+	return inst, inst.initialize(config)
+}
+
+func (d *Datasource) initialize(config *db.Config) error {
+	err := d.initClient(config)
+	if err != nil {
+		return err
+	}
+	ensureDB(config)
+	return nil
+}
+
+func (d *Datasource) initClient(config *db.Config) error {
+	client.NewMongoClient(config)
+	select {
+	case err := <-client.GetMongoClient().Err():
+		return err
+	case <-client.GetMongoClient().Ready():
+		return nil
+	}
+}
+
+func init() {
+	datasource.RegisterPlugin("mongo", NewDatasource)
+}
+
+func ensureDB(config *db.Config) {
+	session := openSession(config)
+	defer session.Close()
+	session.SetMode(mgo.Primary, true)
+
+	ensureTask(session)
+	ensureTombstone(session)
+}
+
+func openSession(c *db.Config) *mgo.Session {
+	timeout := c.Timeout
+	var err error
+	session, err := mgo.DialWithTimeout(c.URI, timeout)
+	if err != nil {
+		openlog.Warn("can not dial db, retry once:" + err.Error())
+		session, err = mgo.DialWithTimeout(c.URI, timeout)
+		if err != nil {
+			openlog.Fatal("can not dial db:" + err.Error())
+		}
+	}
+	return session
+}
+
+func wrapError(err error, skipMsg ...string) {
+	if err != nil {
+		for _, str := range skipMsg {
+			if strings.Contains(err.Error(), str) {
+				openlog.Debug(err.Error())
+				return
+			}
+		}
+		openlog.Error(err.Error())
+	}
+}
+
+func ensureTask(session *mgo.Session) {
+	c := session.DB(DBName).C(CollectionTask)
+	err := c.Create(&mgo.CollectionInfo{Validator: bson.M{
+		ColumnTaskID:    bson.M{"$exists": true},
+		ColumnDomain:    bson.M{"$exists": true},
+		ColumnProject:   bson.M{"$exists": true},
+		ColumnTimestamp: bson.M{"$exists": true},
+	}})
+	wrapError(err)
+	err = c.EnsureIndex(mgo.Index{
+		Key:    []string{ColumnDomain, ColumnProject, ColumnTaskID, ColumnTimestamp},
+		Unique: true,
+	})
+	wrapError(err)
+}
+
+func ensureTombstone(session *mgo.Session) {
+	c := session.DB(DBName).C(CollectionTombstone)
+	err := c.Create(&mgo.CollectionInfo{Validator: bson.M{
+		ColumnResourceID:   bson.M{"$exists": true},
+		ColumnDomain:       bson.M{"$exists": true},
+		ColumnProject:      bson.M{"$exists": true},
+		ColumnResourceType: bson.M{"$exists": true},
+	}})
+	wrapError(err)
+	err = c.EnsureIndex(mgo.Index{
+		Key:    []string{ColumnDomain, ColumnProject, ColumnResourceID, ColumnResourceType},
+		Unique: true,
+	})
+	wrapError(err)
+}
diff --git a/eventbase/datasource/mongo/task/task_dao.go b/eventbase/datasource/mongo/task/task_dao.go
new file mode 100644
index 0000000..7deef48
--- /dev/null
+++ b/eventbase/datasource/mongo/task/task_dao.go
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package task
+
+import (
+	"context"
+
+	"github.com/go-chassis/cari/sync"
+	"github.com/go-chassis/openlog"
+	"go.mongodb.org/mongo-driver/bson"
+	"go.mongodb.org/mongo-driver/mongo"
+	mopts "go.mongodb.org/mongo-driver/mongo/options"
+
+	"servicecomb-service-center/eventbase/datasource"
+	dmongo "servicecomb-service-center/eventbase/datasource/mongo"
+	"servicecomb-service-center/eventbase/datasource/mongo/client"
+)
+
+type Dao struct {
+}
+
+func (d *Dao) Create(ctx context.Context, task *sync.Task) (*sync.Task, error) {
+	collection := client.GetMongoClient().GetDB().Collection(dmongo.CollectionTask)
+	_, err := collection.InsertOne(ctx, task)
+	if err != nil {
+		openlog.Error("fail to create task" + err.Error())
+		return nil, err
+	}
+	return task, nil
+}
+
+func (d *Dao) Update(ctx context.Context, task *sync.Task) error {
+	collection := client.GetMongoClient().GetDB().Collection(dmongo.CollectionTask)
+	result, err := collection.UpdateOne(ctx,
+		bson.M{dmongo.ColumnTaskID: task.TaskID, dmongo.ColumnDomain: task.Domain, dmongo.ColumnProject: task.Project, dmongo.ColumnTimestamp: task.Timestamp},
+		bson.D{{Key: "$set", Value: bson.D{
+			{Key: dmongo.ColumnStatus, Value: task.Status}}},
+		})
+	if err != nil {
+		openlog.Error("fail to update task" + err.Error())
+		return err
+	}
+	if result.ModifiedCount == 0 {
+		openlog.Error("fail to update task" + datasource.ErrTaskNotExists.Error())
+		return datasource.ErrTaskNotExists
+	}
+	return nil
+}
+
+func (d *Dao) Delete(ctx context.Context, tasks ...*sync.Task) error {
+	tasksIDs := make([]string, len(tasks))
+	filter := bson.A{}
+	for i, task := range tasks {
+		tasksIDs[i] = task.TaskID
+		dFilter := bson.D{
+			{dmongo.ColumnDomain, task.Domain},
+			{dmongo.ColumnProject, task.Project},
+			{dmongo.ColumnTaskID, task.TaskID},
+			{dmongo.ColumnTimestamp, task.Timestamp},
+		}
+		filter = append(filter, dFilter)
+	}
+
+	var deleteFunc = func(sessionContext mongo.SessionContext) error {
+		collection := client.GetMongoClient().GetDB().Collection(dmongo.CollectionTask)
+		_, err := collection.DeleteMany(sessionContext, bson.M{"$or": filter})
+		return err
+	}
+	err := client.GetMongoClient().ExecTxn(ctx, deleteFunc)
+	if err != nil {
+		openlog.Error(err.Error())
+	}
+	return err
+}
+func (d *Dao) List(ctx context.Context, domain string, project string, options ...datasource.TaskFindOption) ([]*sync.Task, error) {
+	opts := datasource.NewTaskFindOptions()
+	for _, o := range options {
+		o(&opts)
+	}
+	collection := client.GetMongoClient().GetDB().Collection(dmongo.CollectionTask)
+	filter := bson.M{dmongo.ColumnDomain: domain, dmongo.ColumnProject: project}
+	if opts.Action != "" {
+		filter[dmongo.ColumnAction] = opts.Action
+	}
+	if opts.DataType != "" {
+		filter[dmongo.ColumnDataType] = opts.DataType
+	}
+	if opts.Status != "" {
+		filter[dmongo.ColumnStatus] = opts.Status
+	}
+	opt := mopts.Find().SetSort(map[string]interface{}{
+		dmongo.ColumnTimestamp: 1,
+	})
+	cur, err := collection.Find(ctx, filter, opt)
+	if err != nil {
+		return nil, err
+	}
+	defer cur.Close(ctx)
+	tasks := make([]*sync.Task, 0)
+	for cur.Next(ctx) {
+		task := &sync.Task{}
+		if err := cur.Decode(task); err != nil {
+			openlog.Error("decode to task error: " + err.Error())
+			return nil, err
+		}
+		tasks = append(tasks, task)
+	}
+	return tasks, nil
+}
diff --git a/eventbase/datasource/mongo/task/task_dao_test.go b/eventbase/datasource/mongo/task/task_dao_test.go
new file mode 100644
index 0000000..88c4cfb
--- /dev/null
+++ b/eventbase/datasource/mongo/task/task_dao_test.go
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package task_test
+
+import (
+	"context"
+	"testing"
+	"time"
+
+	"github.com/go-chassis/cari/db"
+	"github.com/go-chassis/cari/sync"
+	"github.com/stretchr/testify/assert"
+
+	"servicecomb-service-center/eventbase/datasource"
+	"servicecomb-service-center/eventbase/datasource/mongo"
+	"servicecomb-service-center/eventbase/test"
+)
+
+var ds datasource.DataSource
+
+func init() {
+	cfg := &db.Config{
+		Kind:    test.Mongo,
+		URI:     test.MongoURI,
+		Timeout: 10 * time.Second,
+	}
+	ds, _ = mongo.NewDatasource(cfg)
+}
+
+func TestTask(t *testing.T) {
+	var (
+		task = sync.Task{
+			TaskID:    "30b93187-2a38-49e3-ae99-1961b28329b0",
+			Action:    "create",
+			DataType:  "config",
+			Domain:    "default",
+			Project:   "default",
+			Timestamp: 1638171566,
+			Status:    "pending"}
+		taskTwo = sync.Task{
+			TaskID:    "40b93187-2a38-49e3-ae99-1961b28329b0",
+			Action:    "update",
+			DataType:  "config",
+			Domain:    "default",
+			Project:   "default",
+			Timestamp: 1638171567,
+			Status:    "done"}
+		taskThree = sync.Task{
+			TaskID:    "50b93187-2a38-49e3-ae99-1961b28329b0",
+			Action:    "update",
+			DataType:  "config",
+			Domain:    "default",
+			Project:   "default",
+			Timestamp: 1638171568,
+			Status:    "pending"}
+	)
+
+	t.Run("create task", func(t *testing.T) {
+		t.Run("create a task should pass", func(t *testing.T) {
+			_, err := ds.TaskDao().Create(context.Background(), &task)
+			assert.NoError(t, err)
+		})
+
+		t.Run("create a same task should fail", func(t *testing.T) {
+			_, err := ds.TaskDao().Create(context.Background(), &task)
+			assert.NotNil(t, err)
+		})
+
+		t.Run("create taskTwo and taskThree should pass", func(t *testing.T) {
+			_, err := ds.TaskDao().Create(context.Background(), &taskTwo)
+			assert.NoError(t, err)
+			_, err = ds.TaskDao().Create(context.Background(), &taskThree)
+			assert.NoError(t, err)
+		})
+	})
+
+	t.Run("update task", func(t *testing.T) {
+		t.Run("update a existing task should pass", func(t *testing.T) {
+			task.Status = "done"
+			err := ds.TaskDao().Update(context.Background(), &task)
+			assert.NoError(t, err)
+		})
+
+		t.Run("update a not existing task should fail", func(t *testing.T) {
+			notExistTask := sync.Task{
+				TaskID:    "not-exist",
+				Action:    "create",
+				DataType:  "config",
+				Domain:    "default",
+				Project:   "default",
+				Timestamp: 1638171568,
+				Status:    "pending",
+			}
+			err := ds.TaskDao().Update(context.Background(), &notExistTask)
+			assert.NotNil(t, err)
+		})
+	})
+
+	t.Run("list task", func(t *testing.T) {
+		t.Run("list task with action ,dataType and status should pass", func(t *testing.T) {
+			opts := []datasource.TaskFindOption{
+				datasource.WithAction(task.Action),
+				datasource.WithDataType(task.DataType),
+				datasource.WithStatus(task.Status),
+			}
+			tasks, err := ds.TaskDao().List(context.Background(), task.Domain, task.Project, opts...)
+			assert.NoError(t, err)
+			assert.Equal(t, 1, len(tasks))
+		})
+
+		t.Run("list task without action ,dataType and status should pass", func(t *testing.T) {
+			tasks, err := ds.TaskDao().List(context.Background(), "default", "default")
+			assert.NoError(t, err)
+			assert.Equal(t, 3, len(tasks))
+			assert.Equal(t, tasks[0].Timestamp, task.Timestamp)
+			assert.Equal(t, tasks[1].Timestamp, taskTwo.Timestamp)
+			assert.Equal(t, tasks[2].Timestamp, taskThree.Timestamp)
+		})
+
+	})
+
+	t.Run("delete task", func(t *testing.T) {
+		t.Run("delete tasks should pass", func(t *testing.T) {
+			err := ds.TaskDao().Delete(context.Background(), []*sync.Task{&task, &taskTwo, &taskThree}...)
+			assert.NoError(t, err)
+		})
+	})
+}
diff --git a/eventbase/datasource/mongo/tombstone/tombstone_dao.go b/eventbase/datasource/mongo/tombstone/tombstone_dao.go
new file mode 100644
index 0000000..2ef60f2
--- /dev/null
+++ b/eventbase/datasource/mongo/tombstone/tombstone_dao.go
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package tombstone
+
+import (
+	"context"
+
+	"github.com/go-chassis/cari/sync"
+	"github.com/go-chassis/openlog"
+	"go.mongodb.org/mongo-driver/bson"
+	"go.mongodb.org/mongo-driver/mongo"
+
+	"servicecomb-service-center/eventbase/datasource"
+	dmongo "servicecomb-service-center/eventbase/datasource/mongo"
+	"servicecomb-service-center/eventbase/datasource/mongo/client"
+	"servicecomb-service-center/eventbase/model"
+)
+
+type Dao struct {
+}
+
+func (d *Dao) Get(ctx context.Context, req *model.GetTombstoneRequest) (*sync.Tombstone, error) {
+	collection := client.GetMongoClient().GetDB().Collection(dmongo.CollectionTombstone)
+	filter := bson.M{dmongo.ColumnDomain: req.Domain, dmongo.ColumnProject: req.Project, dmongo.ColumnResourceType: req.ResourceType, dmongo.ColumnResourceID: req.ResourceID}
+	result := collection.FindOne(ctx, filter)
+	if result != nil && result.Err() != nil {
+		openlog.Error("fail to get tombstone" + result.Err().Error())
+		return nil, result.Err()
+	}
+	if result == nil {
+		openlog.Error(datasource.ErrTombstoneNotExists.Error())
+		return nil, datasource.ErrTombstoneNotExists
+	}
+	var tombstone sync.Tombstone
+
+	err := result.Decode(&tombstone)
+	if err != nil {
+		openlog.Error("fail to decode tombstone" + err.Error())
+		return nil, err
+	}
+	return &tombstone, nil
+}
+
+func (d *Dao) Create(ctx context.Context, tombstone *sync.Tombstone) (*sync.Tombstone, error) {
+	collection := client.GetMongoClient().GetDB().Collection(dmongo.CollectionTombstone)
+	_, err := collection.InsertOne(ctx, tombstone)
+	if err != nil {
+		openlog.Error("fail to create tombstone" + err.Error())
+		return nil, err
+	}
+	return tombstone, nil
+}
+
+func (d *Dao) Delete(ctx context.Context, tombstones ...*sync.Tombstone) error {
+	tombstonesIDs := make([]string, len(tombstones))
+	filter := bson.A{}
+	for i, tombstone := range tombstones {
+		tombstonesIDs[i] = tombstone.ResourceID
+		dFilter := bson.D{
+			{dmongo.ColumnResourceID, tombstone.ResourceID},
+			{dmongo.ColumnResourceType, tombstone.ResourceType},
+			{dmongo.ColumnDomain, tombstone.Domain},
+			{dmongo.ColumnProject, tombstone.Project},
+		}
+		filter = append(filter, dFilter)
+	}
+	var deleteFunc = func(sessionContext mongo.SessionContext) error {
+		collection := client.GetMongoClient().GetDB().Collection(dmongo.CollectionTombstone)
+		_, err := collection.DeleteMany(sessionContext, bson.M{"$or": filter})
+		return err
+	}
+	err := client.GetMongoClient().ExecTxn(ctx, deleteFunc)
+	if err != nil {
+		openlog.Error(err.Error())
+	}
+	return err
+}
+
+func (d *Dao) List(ctx context.Context, domain string, project string,
+	options ...datasource.TombstoneFindOption) ([]*sync.Tombstone, error) {
+	opts := datasource.NewTombstoneFindOptions()
+	for _, o := range options {
+		o(&opts)
+	}
+	collection := client.GetMongoClient().GetDB().Collection(dmongo.CollectionTombstone)
+	filter := bson.M{dmongo.ColumnDomain: domain, dmongo.ColumnProject: project}
+	if opts.ResourceType != "" {
+		filter[dmongo.ColumnResourceType] = opts.ResourceType
+	}
+	if opts.BeforeTimestamp != 0 {
+		filter[dmongo.ColumnTimestamp] = bson.M{"$lte": opts.BeforeTimestamp}
+	}
+	cur, err := collection.Find(ctx, filter)
+	if err != nil {
+		return nil, err
+	}
+	defer cur.Close(ctx)
+	tombstones := make([]*sync.Tombstone, 0)
+	for cur.Next(ctx) {
+		tombstone := &sync.Tombstone{}
+		if err := cur.Decode(tombstone); err != nil {
+			openlog.Error("decode to tombstone error: " + err.Error())
+			return nil, err
+		}
+		tombstones = append(tombstones, tombstone)
+	}
+	return tombstones, nil
+
+}
diff --git a/eventbase/datasource/mongo/tombstone/tombstone_dao_test.go b/eventbase/datasource/mongo/tombstone/tombstone_dao_test.go
new file mode 100644
index 0000000..5f346b1
--- /dev/null
+++ b/eventbase/datasource/mongo/tombstone/tombstone_dao_test.go
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package tombstone_test
+
+import (
+	"context"
+	"testing"
+	"time"
+
+	"github.com/go-chassis/cari/db"
+	"github.com/go-chassis/cari/sync"
+	"github.com/stretchr/testify/assert"
+
+	"servicecomb-service-center/eventbase/datasource"
+	"servicecomb-service-center/eventbase/datasource/mongo"
+	"servicecomb-service-center/eventbase/model"
+	"servicecomb-service-center/eventbase/test"
+)
+
+var ds datasource.DataSource
+
+func init() {
+	cfg := &db.Config{
+		Kind:    test.Mongo,
+		URI:     test.MongoURI,
+		Timeout: 10 * time.Second,
+	}
+	ds, _ = mongo.NewDatasource(cfg)
+}
+
+func TestTombstone(t *testing.T) {
+	var (
+		tombstoneOne = sync.Tombstone{
+			ResourceID:   "app/test",
+			ResourceType: "config",
+			Domain:       "default",
+			Project:      "default",
+			Timestamp:    1638171566,
+		}
+		tombstoneTwo = sync.Tombstone{
+			ResourceID:   "property/test",
+			ResourceType: "config",
+			Domain:       "default",
+			Project:      "default",
+			Timestamp:    1638171567,
+		}
+	)
+
+	t.Run("create tombstone", func(t *testing.T) {
+		t.Run("create two tombstone should pass", func(t *testing.T) {
+			tombstone, err := ds.TombstoneDao().Create(context.Background(), &tombstoneOne)
+			assert.NoError(t, err)
+			assert.NotNil(t, tombstone)
+			tombstone, err = ds.TombstoneDao().Create(context.Background(), &tombstoneTwo)
+			assert.NoError(t, err)
+			assert.NotNil(t, tombstone)
+		})
+	})
+
+	t.Run("get tombstone", func(t *testing.T) {
+		t.Run("get one tombstone should pass", func(t *testing.T) {
+			req := model.GetTombstoneRequest{
+				Domain:       tombstoneOne.Domain,
+				Project:      tombstoneOne.Project,
+				ResourceType: tombstoneOne.ResourceType,
+				ResourceID:   tombstoneOne.ResourceID,
+			}
+			tombstone, err := ds.TombstoneDao().Get(context.Background(), &req)
+			assert.NoError(t, err)
+			assert.Equal(t, tombstone.Timestamp, tombstoneOne.Timestamp)
+		})
+	})
+
+	t.Run("list tombstone", func(t *testing.T) {
+		t.Run("list tombstone with ResourceType and BeforeTimestamp should pass", func(t *testing.T) {
+			opts := []datasource.TombstoneFindOption{
+				datasource.WithResourceType(tombstoneOne.ResourceType),
+				datasource.WithBeforeTimestamp(1638171600),
+			}
+			tombstones, err := ds.TombstoneDao().List(context.Background(), "default", "default", opts...)
+			assert.NoError(t, err)
+			assert.Equal(t, 2, len(tombstones))
+			assert.Equal(t, tombstones[0].Timestamp, tombstoneOne.Timestamp)
+			assert.Equal(t, tombstones[1].Timestamp, tombstoneTwo.Timestamp)
+		})
+	})
+
+	t.Run("delete tombstone", func(t *testing.T) {
+		t.Run("delete two tombstone should pass", func(t *testing.T) {
+			err := ds.TombstoneDao().Delete(context.Background(), []*sync.Tombstone{&tombstoneOne, &tombstoneTwo}...)
+			assert.NoError(t, err)
+		})
+	})
+
+}
diff --git a/eventbase/datasource/mongo/types.go b/eventbase/datasource/mongo/types.go
new file mode 100644
index 0000000..96f5c67
--- /dev/null
+++ b/eventbase/datasource/mongo/types.go
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package mongo
+
+const (
+	DBName = "servicecomb"
+
+	CollectionTask      = "task"
+	CollectionTombstone = "tombstone"
+	ColumnDomain        = "domain"
+	ColumnProject       = "project"
+	ColumnTaskID        = "task_id"
+	ColumnTimestamp     = "timestamp"
+	ColumnResourceID    = "resource_id"
+	ColumnResourceType  = "resource_type"
+	ColumnStatus        = "status"
+	ColumnAction        = "action"
+	ColumnDataType      = "data_type"
+)
diff --git a/eventbase/go.mod b/eventbase/go.mod
index 1eb7e5b..b8492b9 100644
--- a/eventbase/go.mod
+++ b/eventbase/go.mod
@@ -8,9 +8,12 @@ require (
 	github.com/go-chassis/openlog v1.1.3
 	github.com/little-cui/etcdadpt v0.2.1
 	github.com/stretchr/testify v1.7.0
+	go.mongodb.org/mongo-driver v1.4.2
+	gopkg.in/mgo.v2 v2.0.0-20190816093944-a6b53ec6cb22
 )
 
 require (
+	github.com/aws/aws-sdk-go v1.34.28 // indirect
 	github.com/beorn7/perks v1.0.1 // indirect
 	github.com/cespare/xxhash/v2 v2.1.1 // indirect
 	github.com/coreos/go-semver v0.3.0 // indirect
@@ -21,18 +24,23 @@ require (
 	github.com/dustin/go-humanize v1.0.0 // indirect
 	github.com/form3tech-oss/jwt-go v3.2.3+incompatible // indirect
 	github.com/fsnotify/fsnotify v1.4.7 // indirect
+	github.com/go-stack/stack v1.8.0 // indirect
 	github.com/gogo/protobuf v1.3.2 // indirect
 	github.com/golang/protobuf v1.5.2 // indirect
+	github.com/golang/snappy v0.0.1 // indirect
 	github.com/google/btree v1.0.1 // indirect
 	github.com/gorilla/websocket v1.4.3-0.20210424162022-e8629af678b7 // indirect
 	github.com/grpc-ecosystem/go-grpc-middleware v1.3.0 // indirect
 	github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0 // indirect
 	github.com/grpc-ecosystem/grpc-gateway v1.16.0 // indirect
+	github.com/jmespath/go-jmespath v0.4.0 // indirect
 	github.com/jonboulle/clockwork v0.2.2 // indirect
 	github.com/json-iterator/go v1.1.11 // indirect
+	github.com/klauspost/compress v1.9.5 // indirect
 	github.com/matttproud/golang_protobuf_extensions v1.0.1 // indirect
 	github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
 	github.com/modern-go/reflect2 v1.0.1 // indirect
+	github.com/pkg/errors v0.9.1 // indirect
 	github.com/pmezard/go-difflib v1.0.0 // indirect
 	github.com/prometheus/client_golang v1.11.0 // indirect
 	github.com/prometheus/client_model v0.2.0 // indirect
@@ -43,6 +51,8 @@ require (
 	github.com/spf13/cast v1.3.0 // indirect
 	github.com/spf13/pflag v1.0.5 // indirect
 	github.com/tmc/grpc-websocket-proxy v0.0.0-20201229170055-e5319fda7802 // indirect
+	github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c // indirect
+	github.com/xdg/stringprep v0.0.0-20180714160509-73f8eece6fdc // indirect
 	github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2 // indirect
 	go.etcd.io/bbolt v1.3.6 // indirect
 	go.etcd.io/etcd/api/v3 v3.5.0 // indirect
@@ -67,6 +77,7 @@ require (
 	go.uber.org/zap v1.17.0 // indirect
 	golang.org/x/crypto v0.0.0-20201002170205-7f63de1d35b0 // indirect
 	golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4 // indirect
+	golang.org/x/sync v0.0.0-20210220032951-036812b2e83c // indirect
 	golang.org/x/sys v0.0.0-20210603081109-ebe580a85c40 // indirect
 	golang.org/x/text v0.3.5 // indirect
 	golang.org/x/time v0.0.0-20210220033141-f8bda1e9f3ba // indirect
diff --git a/eventbase/go.sum b/eventbase/go.sum
index 174c015..4ed735a 100644
--- a/eventbase/go.sum
+++ b/eventbase/go.sum
@@ -36,6 +36,8 @@ github.com/antihax/optional v1.0.0/go.mod h1:uupD/76wgC+ih3iEmQUL+0Ugr19nfwCT1kd
 github.com/armon/circbuf v0.0.0-20150827004946-bbbad097214e/go.mod h1:3U/XgcO3hCbHZ8TKRvWD2dDTCfh9M9ya+I9JpbB7O8o=
 github.com/armon/go-metrics v0.0.0-20180917152333-f0300d1749da/go.mod h1:Q73ZrmVTwzkszR9V5SSuryQ31EELlFMUz1kKyl939pY=
 github.com/armon/go-radix v0.0.0-20180808171621-7fddfc383310/go.mod h1:ufUuZ+zHj4x4TnLV4JWEpy2hxWSpsRywHrMgIH9cCH8=
+github.com/aws/aws-sdk-go v1.34.28 h1:sscPpn/Ns3i0F4HPEWAVcwdIRaZZCuL7llJ2/60yPIk=
+github.com/aws/aws-sdk-go v1.34.28/go.mod h1:H7NKnBqNVzoTJpGfLrQkkD+ytBA93eiDYi/+8rV9s48=
 github.com/benbjohnson/clock v1.0.3 h1:vkLuvpK4fmtSCuo60+yC63p7y0BmQ8gm5ZXGuBCJyXg=
 github.com/benbjohnson/clock v1.0.3/go.mod h1:bGMdMPoPVvcYyt1gHDf4J2KE153Yf9BuiUKYMaxlTDM=
 github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q=
@@ -144,7 +146,33 @@ github.com/go-openapi/swag v0.0.0-20160704191624-1d0bd113de87/go.mod h1:DXUve3Dp
 github.com/go-playground/locales v0.13.0/go.mod h1:taPMhCMXrRLJO55olJkUXHZBHCxTMfnGwq/HNwmWNS8=
 github.com/go-playground/universal-translator v0.17.0/go.mod h1:UkSxE5sNxxRwHyU+Scu5vgOQjsIJAF8j9muTVoKLVtA=
 github.com/go-playground/validator v9.31.0+incompatible/go.mod h1:yrEkQXlcI+PugkyDjY2bRrL/UBU4f3rvrgkN3V8JEig=
+github.com/go-sql-driver/mysql v1.5.0/go.mod h1:DCzpHaOWr8IXmIStZouvnhqoel9Qv2LBy8hT2VhHyBg=
+github.com/go-stack/stack v1.8.0 h1:5SgMzNM5HxrEjV0ww2lTmX6E2Izsfxas4+YHWRs3Lsk=
 github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY=
+github.com/gobuffalo/attrs v0.0.0-20190224210810-a9411de4debd/go.mod h1:4duuawTqi2wkkpB4ePgWMaai6/Kc6WEz83bhFwpHzj0=
+github.com/gobuffalo/depgen v0.0.0-20190329151759-d478694a28d3/go.mod h1:3STtPUQYuzV0gBVOY3vy6CfMm/ljR4pABfrTeHNLHUY=
+github.com/gobuffalo/depgen v0.1.0/go.mod h1:+ifsuy7fhi15RWncXQQKjWS9JPkdah5sZvtHc2RXGlg=
+github.com/gobuffalo/envy v1.6.15/go.mod h1:n7DRkBerg/aorDM8kbduw5dN3oXGswK5liaSCx4T5NI=
+github.com/gobuffalo/envy v1.7.0/go.mod h1:n7DRkBerg/aorDM8kbduw5dN3oXGswK5liaSCx4T5NI=
+github.com/gobuffalo/flect v0.1.0/go.mod h1:d2ehjJqGOH/Kjqcoz+F7jHTBbmDb38yXA598Hb50EGs=
+github.com/gobuffalo/flect v0.1.1/go.mod h1:8JCgGVbRjJhVgD6399mQr4fx5rRfGKVzFjbj6RE/9UI=
+github.com/gobuffalo/flect v0.1.3/go.mod h1:8JCgGVbRjJhVgD6399mQr4fx5rRfGKVzFjbj6RE/9UI=
+github.com/gobuffalo/genny v0.0.0-20190329151137-27723ad26ef9/go.mod h1:rWs4Z12d1Zbf19rlsn0nurr75KqhYp52EAGGxTbBhNk=
+github.com/gobuffalo/genny v0.0.0-20190403191548-3ca520ef0d9e/go.mod h1:80lIj3kVJWwOrXWWMRzzdhW3DsrdjILVil/SFKBzF28=
+github.com/gobuffalo/genny v0.1.0/go.mod h1:XidbUqzak3lHdS//TPu2OgiFB+51Ur5f7CSnXZ/JDvo=
+github.com/gobuffalo/genny v0.1.1/go.mod h1:5TExbEyY48pfunL4QSXxlDOmdsD44RRq4mVZ0Ex28Xk=
+github.com/gobuffalo/gitgen v0.0.0-20190315122116-cc086187d211/go.mod h1:vEHJk/E9DmhejeLeNt7UVvlSGv3ziL+djtTr3yyzcOw=
+github.com/gobuffalo/gogen v0.0.0-20190315121717-8f38393713f5/go.mod h1:V9QVDIxsgKNZs6L2IYiGR8datgMhB577vzTDqypH360=
+github.com/gobuffalo/gogen v0.1.0/go.mod h1:8NTelM5qd8RZ15VjQTFkAW6qOMx5wBbW4dSCS3BY8gg=
+github.com/gobuffalo/gogen v0.1.1/go.mod h1:y8iBtmHmGc4qa3urIyo1shvOD8JftTtfcKi+71xfDNE=
+github.com/gobuffalo/logger v0.0.0-20190315122211-86e12af44bc2/go.mod h1:QdxcLw541hSGtBnhUc4gaNIXRjiDppFGaDqzbrBd3v8=
+github.com/gobuffalo/mapi v1.0.1/go.mod h1:4VAGh89y6rVOvm5A8fKFxYG+wIW6LO1FMTG9hnKStFc=
+github.com/gobuffalo/mapi v1.0.2/go.mod h1:4VAGh89y6rVOvm5A8fKFxYG+wIW6LO1FMTG9hnKStFc=
+github.com/gobuffalo/packd v0.0.0-20190315124812-a385830c7fc0/go.mod h1:M2Juc+hhDXf/PnmBANFCqx4DM3wRbgDvnVWeG2RIxq4=
+github.com/gobuffalo/packd v0.1.0/go.mod h1:M2Juc+hhDXf/PnmBANFCqx4DM3wRbgDvnVWeG2RIxq4=
+github.com/gobuffalo/packr/v2 v2.0.9/go.mod h1:emmyGweYTm6Kdper+iywB6YK5YzuKchGtJQZ0Odn4pQ=
+github.com/gobuffalo/packr/v2 v2.2.0/go.mod h1:CaAwI0GPIAv+5wKLtv8Afwl+Cm78K/I/VCm/3ptBN+0=
+github.com/gobuffalo/syncx v0.0.0-20190224160051-33c29581e754/go.mod h1:HhnNqWY95UYwwW3uSASeV7vtgYkT2t16hJgV3AEPUpw=
 github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA=
 github.com/gogo/protobuf v1.1.1/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ=
 github.com/gogo/protobuf v1.2.1/go.mod h1:hp+jE20tsWTFYpLwKvXlhS1hjn+gTNwPg2I6zVXpSg4=
@@ -177,6 +205,8 @@ github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaS
 github.com/golang/protobuf v1.5.1/go.mod h1:DopwsBzvsk0Fs44TXzsVbJyPhcCPeIwnvohx4u74HPM=
 github.com/golang/protobuf v1.5.2 h1:ROPKBNFfQgOUMifHyP+KYbvpjbdoFNs+aK7DXlji0Tw=
 github.com/golang/protobuf v1.5.2/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY=
+github.com/golang/snappy v0.0.1 h1:Qgr9rKW7uDUkrbSmQeiDsGa8SjGyCOGtuasMWwvp2P4=
+github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
 github.com/google/btree v0.0.0-20180813153112-4030bb1f1f0c/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ=
 github.com/google/btree v1.0.0/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ=
 github.com/google/btree v1.0.1 h1:gK4Kx5IaGY9CD5sPJ36FHiBJ6ZXl0kilRiiCj+jdYp4=
@@ -186,6 +216,7 @@ github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMyw
 github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
 github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
 github.com/google/go-cmp v0.5.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
+github.com/google/go-cmp v0.5.2/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
 github.com/google/go-cmp v0.5.4/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
 github.com/google/go-cmp v0.5.5 h1:Khx7svrCpmxxtHBq5j2mp/xVjsi8hQMfNLvJFAlrGgU=
 github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
@@ -239,6 +270,11 @@ github.com/hashicorp/serf v0.8.2/go.mod h1:6hOLApaqBFA1NXqRQAsxw9QxuDEvNxSQRwA/J
 github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU=
 github.com/imdario/mergo v0.3.5/go.mod h1:2EnlNZ0deacrJVfApfmtdGgDfMuh/nq6Ok1EcJh5FfA=
 github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANytuPF1OarO4DADm73n8=
+github.com/jmespath/go-jmespath v0.4.0 h1:BEgLn5cpjn8UN1mAw4NjwDrS35OdebyEtFe+9YPoQUg=
+github.com/jmespath/go-jmespath v0.4.0/go.mod h1:T8mJZnbsbmF+m6zOOFylbeCJqk5+pHWvzYPziyZiYoo=
+github.com/jmespath/go-jmespath/internal/testify v1.5.1 h1:shLQSRRSCCPj3f2gpwzGwWFoC7ycTf1rcQZHOlsJ6N8=
+github.com/jmespath/go-jmespath/internal/testify v1.5.1/go.mod h1:L3OGu8Wl2/fWfCI6z80xFu9LTZmf1ZRjMHUOPmWr69U=
+github.com/joho/godotenv v1.3.0/go.mod h1:7hK45KPybAkOC6peb+G5yklZfMxEjkZhHbwpqxOKXbg=
 github.com/jonboulle/clockwork v0.1.0/go.mod h1:Ii8DK3G1RaLaWxj9trq07+26W01tbo22gdxWY5EU2bo=
 github.com/jonboulle/clockwork v0.2.2 h1:UOGuzwb1PwsrDAObMuhUnj0p5ULPj8V/xJ7Kx9qUBdQ=
 github.com/jonboulle/clockwork v0.2.2/go.mod h1:Pkfl5aHPm1nk2H9h0bjmnJD/BcgbGXUBGnn1kMkgxc8=
@@ -256,11 +292,16 @@ github.com/julienschmidt/httprouter v1.2.0/go.mod h1:SYymIcj16QtmaHHD7aYtjjsJG7V
 github.com/julienschmidt/httprouter v1.3.0/go.mod h1:JR6WtHb+2LUe8TCKY3cZOxFyyO8IZAc4RVcycCCAKdM=
 github.com/karlseguin/ccache/v2 v2.0.8/go.mod h1:2BDThcfQMf/c0jnZowt16eW405XIqZPavt+HoYEtcxQ=
 github.com/karlseguin/expect v1.0.2-0.20190806010014-778a5f0c6003/go.mod h1:zNBxMY8P21owkeogJELCLeHIt+voOSduHYTFUbwRAV8=
+github.com/karrick/godirwalk v1.8.0/go.mod h1:H5KPZjojv4lE+QYImBI8xVtrBRgYrIVsaRPx4tDPEn4=
+github.com/karrick/godirwalk v1.10.3/go.mod h1:RoGL9dQei4vP9ilrpETWE8CLOZ1kiN0LhBygSwrAsHA=
 github.com/kisielk/errcheck v1.1.0/go.mod h1:EZBBE59ingxPouuu3KfxchcWSUPOHkagtvWXihfKN4Q=
 github.com/kisielk/errcheck v1.2.0/go.mod h1:/BMXB+zMLi60iA8Vv6Ksmxu/1UDYcXs4uQLJ+jE2L00=
 github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8=
 github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
+github.com/klauspost/compress v1.9.5 h1:U+CaK85mrNNb4k8BNOfgJtJ/gr6kswUCFj6miSzVC6M=
+github.com/klauspost/compress v1.9.5/go.mod h1:RyIbtBH6LamlWaDj8nUwkbUhJ87Yi3uG0guNDohfE1A=
 github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
+github.com/konsorten/go-windows-terminal-sequences v1.0.2/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
 github.com/konsorten/go-windows-terminal-sequences v1.0.3/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
 github.com/kr/logfmt v0.0.0-20140226030751-b84e30acd515/go.mod h1:+0opPa2QZZtGFBFZlji/RkVcI2GknAs/DXo4wKdlNEc=
 github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI=
@@ -274,6 +315,8 @@ github.com/little-cui/etcdadpt v0.2.1 h1:eT1A+BV1/2/dmmZA2Nl+cc7uTMuwd6T6DD+JrXr
 github.com/little-cui/etcdadpt v0.2.1/go.mod h1:727wftF2FS4vfkgFLmIvQue1XH+9u4lK2/hd6L7OAC8=
 github.com/magiconair/properties v1.8.1/go.mod h1:PppfXfuXeibc/6YijjN8zIbojt8czPbwD3XqdrwzmxQ=
 github.com/mailru/easyjson v0.0.0-20160728113105-d5b7844b561a/go.mod h1:C1wdFJiN94OJF2b5HbByQZoLdCWB1Yqtg26g4irojpc=
+github.com/markbates/oncer v0.0.0-20181203154359-bf2de49a0be2/go.mod h1:Ld9puTsIW75CHf65OeIOkyKbteujpZVXDpWK6YGZbxE=
+github.com/markbates/safe v1.0.1/go.mod h1:nAqgmRi7cY2nqMc92/bSEeQA+R4OheNU2T1kNSCBdG0=
 github.com/mattn/go-colorable v0.0.9/go.mod h1:9vuHe8Xs5qXnSaW/c/ABM9alt+Vo+STaOChaDxuIBZU=
 github.com/mattn/go-isatty v0.0.3/go.mod h1:M+lRXTBqGeGNdLjl/ufCoiOlB5xdOkqRJdNxMWT7Zi4=
 github.com/mattn/go-isatty v0.0.7/go.mod h1:Iq45c/XA43vh69/j3iqttzPXn0bhXyGjM0Hdxcsrc5s=
@@ -295,6 +338,7 @@ github.com/modern-go/reflect2 v0.0.0-20180320133207-05fbef0ca5da/go.mod h1:bx2lN
 github.com/modern-go/reflect2 v0.0.0-20180701023420-4b7aa43c6742/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0=
 github.com/modern-go/reflect2 v1.0.1 h1:9f412s+6RmYXLWZSEzVVgPGK7C2PphHj5RJrvfx9AWI=
 github.com/modern-go/reflect2 v1.0.1/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0=
+github.com/montanaflynn/stats v0.0.0-20171201202039-1bf9dbcd8cbe/go.mod h1:wL8QJuTMNUDYhXwkmfOly8iTdp5TEcJFWZD2D7SIkUc=
 github.com/munnerz/goautoneg v0.0.0-20120707110453-a547fc61f48d/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ=
 github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U=
 github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U=
@@ -309,6 +353,7 @@ github.com/opentracing/opentracing-go v1.1.0/go.mod h1:UkNAQd3GIcIGf0SeVgPpRdFSt
 github.com/pascaldekloe/goe v0.0.0-20180627143212-57f6aae5913c/go.mod h1:lzWF7FIEvWOWxwDKqyGYQf6ZUaNfKdP144TG7ZOy1lc=
 github.com/patrickmn/go-cache v2.1.0+incompatible/go.mod h1:3Qf8kWWT7OJRJbdiICTKqZju1ZixQ/KpMGzzAfe6+WQ=
 github.com/pelletier/go-toml v1.2.0/go.mod h1:5z9KED0ma1S8pY6P1sdut58dfprrGBbd/94hg7ilaic=
+github.com/pelletier/go-toml v1.7.0/go.mod h1:vwGMzjaWMwyfHwgIBhI2YUM4fB6nL6lVAvS1LBMMhTE=
 github.com/peterbourgon/diskv v2.0.1+incompatible/go.mod h1:uqqh8zWWbv1HBMNONnaR/tNboyR3/BZd58JJSHlUSCU=
 github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
 github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
@@ -345,12 +390,16 @@ github.com/prometheus/procfs v0.6.0/go.mod h1:cz+aTbrPOrUb4q7XlbU9ygM+/jj0fzG6c1
 github.com/prometheus/tsdb v0.7.1/go.mod h1:qhTCs0VvXwvX/y3TZrWD7rabWM+ijKTux40TwIPHuXU=
 github.com/rogpeppe/fastuuid v0.0.0-20150106093220-6724a57986af/go.mod h1:XWv6SoW27p1b0cqNHllgS5HIMJraePCO15w5zCzIWYg=
 github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6LYCDYWNEvQ=
+github.com/rogpeppe/go-internal v1.1.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4=
+github.com/rogpeppe/go-internal v1.2.2/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4=
 github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4=
 github.com/russross/blackfriday/v2 v2.0.1/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM=
 github.com/ryanuber/columnize v0.0.0-20160712163229-9b3edd62028f/go.mod h1:sm1tb6uqfes/u+d4ooFouqFdy9/2g9QGwK3SQygK0Ts=
 github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529/go.mod h1:DxrIzT+xaE7yg65j358z/aeFdxmN0P9QXhEzd20vsDc=
 github.com/shurcooL/sanitized_anchor_name v1.0.0/go.mod h1:1NzhyTcUVG4SuEtjjoZeVRXNmyL/1OwPU0+IJeTBvfc=
 github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo=
+github.com/sirupsen/logrus v1.4.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo=
+github.com/sirupsen/logrus v1.4.1/go.mod h1:ni0Sbl8bgC9z8RoU9G6nDWqqs/fq4eDPysMBDgk/93Q=
 github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE=
 github.com/sirupsen/logrus v1.6.0/go.mod h1:7uNnSEd1DgxDLC74fIahvMZmmYsHGZGEOFrfsX/uA88=
 github.com/sirupsen/logrus v1.7.0 h1:ShrD1U9pZB12TX0cVy0DtePoCH97K8EtX+mg7ZARUtM=
@@ -367,6 +416,7 @@ github.com/spf13/afero v1.2.2/go.mod h1:9ZxEEn6pIJ8Rxe320qSDBk6AsU0r9pR7Q4OcevTd
 github.com/spf13/cast v1.2.0/go.mod h1:r2rcYCSwa1IExKTDiTfzaxqT2FNHs8hODu4LnUfgKEg=
 github.com/spf13/cast v1.3.0 h1:oget//CVOEoFewqQxwr0Ej5yjygnqGkvggSE/gB35Q8=
 github.com/spf13/cast v1.3.0/go.mod h1:Qx5cxh0v+4UWYiBimWS+eyWzqEqokIECu5etghLkUJE=
+github.com/spf13/cobra v0.0.3/go.mod h1:1l0Ry5zgKvJasoi3XT1TypsSe7PqH0Sj9dhYf7v3XqQ=
 github.com/spf13/cobra v1.1.3/go.mod h1:pGADOWyqRD/YMrPZigI/zbliZ2wVD/23d+is3pSWzOo=
 github.com/spf13/jwalterweatherman v1.0.0/go.mod h1:cQK4TGJAtQXfYWX+Ddv3mKDzgVb68N+wFjFa4jdeBTo=
 github.com/spf13/pflag v0.0.0-20170130214245-9ff6c6923cff/go.mod h1:DYY7MBk1bdzusC3SYhjObp+wFpr4gzcvqqNjLnInEg4=
@@ -385,11 +435,17 @@ github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/
 github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY=
 github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
 github.com/subosito/gotenv v1.2.0/go.mod h1:N0PQaV/YGNqwC0u51sEeR/aUtSLEXKX9iv69rRypqCw=
+github.com/tidwall/pretty v1.0.0 h1:HsD+QiTn7sK6flMKIvNmpqz1qrpP3Ps6jOKIKMooyg4=
+github.com/tidwall/pretty v1.0.0/go.mod h1:XNkn88O1ChpSDQmQeStsy+sBenx6DDtFZJxhVysOjyk=
 github.com/tmc/grpc-websocket-proxy v0.0.0-20190109142713-0ad062ec5ee5/go.mod h1:ncp9v5uamzpCO7NfCPTXjqaC+bZgJeR0sMTm6dMHP7U=
 github.com/tmc/grpc-websocket-proxy v0.0.0-20201229170055-e5319fda7802 h1:uruHq4dN7GR16kFc5fp3d1RIYzJW5onx8Ybykw2YQFA=
 github.com/tmc/grpc-websocket-proxy v0.0.0-20201229170055-e5319fda7802/go.mod h1:ncp9v5uamzpCO7NfCPTXjqaC+bZgJeR0sMTm6dMHP7U=
 github.com/ugorji/go v1.1.4/go.mod h1:uQMGLiO92mf5W77hV/PUCpI3pbzQx3CRekS0kk+RGrc=
 github.com/wsxiaoys/terminal v0.0.0-20160513160801-0940f3fc43a0/go.mod h1:IXCdmsXIht47RaVFLEdVnh1t+pgYtTAhQGj73kz+2DM=
+github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c h1:u40Z8hqBAAQyv+vATcGgV0YCnDjqSL7/q/JyPhhJSPk=
+github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c/go.mod h1:lB8K/P019DLNhemzwFU4jHLhdvlE6uDZjXFejJXr49I=
+github.com/xdg/stringprep v0.0.0-20180714160509-73f8eece6fdc h1:n+nNi93yXLkJvKwXNP9d55HC7lGK4H/SRcwB5IaUZLo=
+github.com/xdg/stringprep v0.0.0-20180714160509-73f8eece6fdc/go.mod h1:Jhud4/sHMO4oL310DaZAKk9ZaJ08SJfe+sJh0HrGL1Y=
 github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2 h1:eY9dn8+vbi4tKz5Qo6v2eYzo7kUS51QINcR5jNpbZS8=
 github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2/go.mod h1:UETIi67q53MR2AWcXfiuqkDkRtnGDLqkBTpCHuJHxtU=
 github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
@@ -412,6 +468,8 @@ go.etcd.io/etcd/raft/v3 v3.5.0 h1:kw2TmO3yFTgE+F0mdKkG7xMxkit2duBDa2Hu6D/HMlw=
 go.etcd.io/etcd/raft/v3 v3.5.0/go.mod h1:UFOHSIvO/nKwd4lhkwabrTD3cqW5yVyYYf/KlD00Szc=
 go.etcd.io/etcd/server/v3 v3.5.0 h1:jk8D/lwGEDlQU9kZXUFMSANkE22Sg5+mW27ip8xcF9E=
 go.etcd.io/etcd/server/v3 v3.5.0/go.mod h1:3Ah5ruV+M+7RZr0+Y/5mNLwC+eQlni+mQmOVdCRJoS4=
+go.mongodb.org/mongo-driver v1.4.2 h1:WlnEglfTg/PfPq4WXs2Vkl/5ICC6hoG8+r+LraPmGk4=
+go.mongodb.org/mongo-driver v1.4.2/go.mod h1:WcMNYLx/IlOxLe6JRJiv2uXuCz6zBLndR4SoGjYphSc=
 go.opencensus.io v0.21.0/go.mod h1:mSImk1erAIZhrmZN+AvHh14ztQfjbGwt4TtuofqLduU=
 go.opencensus.io v0.22.0/go.mod h1:+kGneAE2xo2IficOXnaByMWTGM9T73dGwxeWcUqIpI8=
 go.opentelemetry.io/contrib v0.20.0 h1:ubFQUn0VCZ0gPwIoJfBJVpeBlyRMxu8Mm/huKWYd9p0=
@@ -453,7 +511,9 @@ golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnf
 golang.org/x/crypto v0.0.0-20181029021203-45a5f77698d3/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
 golang.org/x/crypto v0.0.0-20190211182817-74369b46fc67/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
 golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
+golang.org/x/crypto v0.0.0-20190422162423-af44ce270edf/go.mod h1:WFFai1msRO1wXaEeE5yQxYXgSfI8pQAWXbQop6sCtWE=
 golang.org/x/crypto v0.0.0-20190510104115-cbcb75029529/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
+golang.org/x/crypto v0.0.0-20190530122614-20be4c3c3ed5/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
 golang.org/x/crypto v0.0.0-20190605123033-f99c8df09eb5/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
 golang.org/x/crypto v0.0.0-20190820162420-60c769a6c586/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
 golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
@@ -502,6 +562,7 @@ golang.org/x/net v0.0.0-20190603091049-60506f45cf65/go.mod h1:HSz+uSET+XFnRR8LxR
 golang.org/x/net v0.0.0-20190613194153-d28f0bde5980/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
 golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
 golang.org/x/net v0.0.0-20191004110552-13f9640d40b9/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
+golang.org/x/net v0.0.0-20200202094626-16171245cfb2/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
 golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
 golang.org/x/net v0.0.0-20200625001655-4c5254603344/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA=
 golang.org/x/net v0.0.0-20200822124328-c89045814202/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA=
@@ -519,10 +580,12 @@ golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJ
 golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
 golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
 golang.org/x/sync v0.0.0-20190227155943-e225da77a7e6/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
+golang.org/x/sync v0.0.0-20190412183630-56d357773e84/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
 golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
 golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
 golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
 golang.org/x/sync v0.0.0-20201207232520-09787c993a3a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
+golang.org/x/sync v0.0.0-20210220032951-036812b2e83c h1:5KslGYwFpkhGh+Q16bwMP3cOontH8FOep7tGV86Y7SQ=
 golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
 golang.org/x/sys v0.0.0-20170830134202-bb24a47a89ea/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
 golang.org/x/sys v0.0.0-20180823144017-11551d06cbcc/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
@@ -536,10 +599,13 @@ golang.org/x/sys v0.0.0-20190209173611-3b5209105503/go.mod h1:STP8DvDyc/dI5b8T5h
 golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
 golang.org/x/sys v0.0.0-20190222072716-a9d3bda3a223/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
 golang.org/x/sys v0.0.0-20190312061237-fead79001313/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
+golang.org/x/sys v0.0.0-20190403152447-81d4e9dc473e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
 golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
+golang.org/x/sys v0.0.0-20190419153524-e8e3143a4f4a/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
 golang.org/x/sys v0.0.0-20190422165155-953cdadca894/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
 golang.org/x/sys v0.0.0-20190502145724-3ef323f4f1fd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
 golang.org/x/sys v0.0.0-20190507160741-ecd444e8653b/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
+golang.org/x/sys v0.0.0-20190531175056-4c3a928424d2/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
 golang.org/x/sys v0.0.0-20190606165138-5da285871e9c/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
 golang.org/x/sys v0.0.0-20190624142023-c5567b49c5d0/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
 golang.org/x/sys v0.0.0-20190826190057-c7b8b68b1456/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
@@ -579,9 +645,13 @@ golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3
 golang.org/x/tools v0.0.0-20190312151545-0bb0c0a6e846/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs=
 golang.org/x/tools v0.0.0-20190312170243-e65039ee4138/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs=
 golang.org/x/tools v0.0.0-20190328211700-ab21143f2384/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs=
+golang.org/x/tools v0.0.0-20190329151228-23e29df326fe/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs=
+golang.org/x/tools v0.0.0-20190416151739-9c9e1878f421/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs=
+golang.org/x/tools v0.0.0-20190420181800-aa740d480789/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs=
 golang.org/x/tools v0.0.0-20190425150028-36563e24a262/go.mod h1:RgjU9mgBXZiqYHBnxXauZ1Gv1EHHAz9KjViQ78xBX0Q=
 golang.org/x/tools v0.0.0-20190506145303-2d16b83fe98c/go.mod h1:RgjU9mgBXZiqYHBnxXauZ1Gv1EHHAz9KjViQ78xBX0Q=
 golang.org/x/tools v0.0.0-20190524140312-2c0ae7006135/go.mod h1:RgjU9mgBXZiqYHBnxXauZ1Gv1EHHAz9KjViQ78xBX0Q=
+golang.org/x/tools v0.0.0-20190531172133-b3315ee88b7d/go.mod h1:/rFqwRUd4F7ZHNgwSSTFct+R/Kf4OFW1sUzUTQQTgfc=
 golang.org/x/tools v0.0.0-20190606124116-d0a3d012864b/go.mod h1:/rFqwRUd4F7ZHNgwSSTFct+R/Kf4OFW1sUzUTQQTgfc=
 golang.org/x/tools v0.0.0-20190621195816-6e04913cbbac/go.mod h1:/rFqwRUd4F7ZHNgwSSTFct+R/Kf4OFW1sUzUTQQTgfc=
 golang.org/x/tools v0.0.0-20190628153133-6cdbf07be9d0/go.mod h1:/rFqwRUd4F7ZHNgwSSTFct+R/Kf4OFW1sUzUTQQTgfc=
@@ -661,6 +731,8 @@ gopkg.in/go-playground/assert.v1 v1.2.1/go.mod h1:9RXL0bg/zibRAgZUYszZSwO/z8Y/a8
 gopkg.in/go-playground/validator.v8 v8.18.2/go.mod h1:RX2a/7Ha8BgOhfk7j780h4/u/RRjR0eouCJSH80/M2Y=
 gopkg.in/inf.v0 v0.9.1/go.mod h1:cWUDdTG/fYaXco+Dcufb5Vnc6Gp2YChqWtbxRZE0mXw=
 gopkg.in/ini.v1 v1.51.0/go.mod h1:pNLf8WUiyNEtQjuu5G5vTm06TEv9tsIgeAvK8hOrP4k=
+gopkg.in/mgo.v2 v2.0.0-20190816093944-a6b53ec6cb22 h1:VpOs+IwYnYBaFnrNAeB8UUWtL3vEUnzSCL1nVjPhqrw=
+gopkg.in/mgo.v2 v2.0.0-20190816093944-a6b53ec6cb22/go.mod h1:yeKp02qBN3iKW1OzL3MGk2IdtZzaj7SFntXj72NppTA=
 gopkg.in/natefinch/lumberjack.v2 v2.0.0 h1:1Lc07Kr7qY4U2YPouBjpCLxpiyxIVoxqXgkXLknAOE8=
 gopkg.in/natefinch/lumberjack.v2 v2.0.0/go.mod h1:l0ndWWf7gzL7RNwBG7wST/UCcT4T24xpD6X8LsfU/+k=
 gopkg.in/resty.v1 v1.12.0/go.mod h1:mDo4pnntr5jdWRML875a/NmxYqAlA73dVijT2AXvQQo=
diff --git a/eventbase/test/test.go b/eventbase/test/test.go
index 5d0311b..c71d5ad 100644
--- a/eventbase/test/test.go
+++ b/eventbase/test/test.go
@@ -1,6 +1,8 @@
 package test
 
 var (
-	Etcd    = "etcd"
-	EtcdURI = "http://127.0.0.1:2379"
+	Etcd     = "etcd"
+	EtcdURI  = "http://127.0.0.1:2379"
+	Mongo    = "mongo"
+	MongoURI = "mongodb://127.0.0.1:27017"
 )
diff --git a/scripts/ut_test_in_docker.sh b/scripts/ut_test_in_docker.sh
index 15ddc6b..ab1e95f 100644
--- a/scripts/ut_test_in_docker.sh
+++ b/scripts/ut_test_in_docker.sh
@@ -72,6 +72,7 @@ elif [ ${db_name} == "mongo" ];then
   [ $? == 0 ] && ut_for_file datasource
   [ $? == 0 ] && ut_for_dir datasource/mongo
   [ $? == 0 ] && ut_for_dir server
+  [ $? == 0 ] && ut_for_dir eventbase/datasource/mongo
 else
   echo "${db_name} non-existent"
 	exit 1